diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java index 52b30effb..6c29135b0 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java @@ -43,6 +43,7 @@ public class PostgresChangeRecordEmitter extends RelationalChangeRecordEmitter { private final PostgresSchema schema; private final PostgresConnectorConfig connectorConfig; private final PostgresConnection connection; + private final TableId tableId; public PostgresChangeRecordEmitter(OffsetContext offset, Clock clock, PostgresConnectorConfig connectorConfig, PostgresSchema schema, PostgresConnection connection, ReplicationMessage message) { super(offset, clock); @@ -51,6 +52,9 @@ public PostgresChangeRecordEmitter(OffsetContext offset, Clock clock, PostgresCo this.message = message; this.connectorConfig = connectorConfig; this.connection = connection; + + this.tableId = PostgresSchema.parse(message.getTable()); + Objects.requireNonNull(tableId); } @Override @@ -68,10 +72,13 @@ protected Operation getOperation() { } @Override - protected Object[] getOldColumnValues() { - final TableId tableId = PostgresSchema.parse(message.getTable()); - Objects.requireNonNull(tableId); + public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException { + schema = synchronizeTableSchema(schema); + super.emitChangeRecords(schema, receiver); + } + @Override + protected Object[] getOldColumnValues() { try { switch (getOperation()) { case CREATE: @@ -89,9 +96,6 @@ protected Object[] getOldColumnValues() { @Override protected Object[] getNewColumnValues() { - final TableId tableId = PostgresSchema.parse(message.getTable()); - Objects.requireNonNull(tableId); - try { switch (getOperation()) { case CREATE: @@ -107,8 +111,7 @@ protected Object[] getNewColumnValues() { } } - @Override - public DataCollectionSchema synchronizeTableSchema(DataCollectionSchema tableSchema) { + private DataCollectionSchema synchronizeTableSchema(DataCollectionSchema tableSchema) { final boolean metadataInMessage = message.hasTypeMetadata(); final TableId tableId = (TableId) tableSchema.id(); final Table table = schema.tableFor(tableId); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java index bb5ac4048..fde09b016 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java @@ -145,8 +145,9 @@ public void start(Configuration config) { schema, queue, connectorConfig.getTableFilters().dataCollectionFilter(), - DataChangeEvent::new); - dispatcher.setInconsistentSchemaHandler(PostgresChangeRecordEmitter::updateSchema); + DataChangeEvent::new, + PostgresChangeRecordEmitter::updateSchema + ); coordinator = new ChangeEventSourceCoordinator( previousOffset, diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ExportedSnapshotter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ExportedSnapshotter.java index 26d38bfc6..ef378f0db 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ExportedSnapshotter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ExportedSnapshotter.java @@ -6,7 +6,6 @@ package io.debezium.connector.postgresql.snapshot; import java.time.Duration; -import java.util.Map; import java.util.Optional; import java.util.Set; @@ -22,11 +21,8 @@ */ public class ExportedSnapshotter implements Snapshotter { - private Map snapshotOverrides; - @Override public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) { - this.snapshotOverrides = config.getSnapshotSelectOverridesByTable(); } @Override @@ -46,12 +42,7 @@ public boolean exportSnapshot() { @Override public Optional buildSnapshotQuery(TableId tableId) { - if (snapshotOverrides.containsKey(tableId)) { - return Optional.of(snapshotOverrides.get(tableId)); - } - else { - return Optional.of("select * from " + tableId.toDoubleQuotedString()); - } + return Optional.of("select * from " + tableId.toDoubleQuotedString()); } @Override diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/QueryingSnapshotter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/QueryingSnapshotter.java index 9d3cceaa6..02d8a24bf 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/QueryingSnapshotter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/QueryingSnapshotter.java @@ -5,7 +5,6 @@ */ package io.debezium.connector.postgresql.snapshot; -import java.util.Map; import java.util.Optional; import io.debezium.connector.postgresql.PostgresConnectorConfig; @@ -16,26 +15,16 @@ public abstract class QueryingSnapshotter implements Snapshotter { - private Map snapshotOverrides; - @Override public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) { - this.snapshotOverrides = config.getSnapshotSelectOverridesByTable(); } @Override public Optional buildSnapshotQuery(TableId tableId) { - if (snapshotOverrides.containsKey(tableId)) { - return Optional.of(snapshotOverrides.get(tableId)); - } - else { - // DBZ-298 Quoting name in case it has been quoted originally; it doesn't do harm if it hasn't been quoted - StringBuilder q = new StringBuilder(); - q.append("SELECT * FROM "); - q.append(tableId.toDoubleQuotedString()); - return Optional.of(q.toString()); - } + // DBZ-298 Quoting name in case it has been quoted originally; it doesn't do harm if it hasn't been quoted + StringBuilder q = new StringBuilder(); + q.append("SELECT * FROM "); + q.append(tableId.toDoubleQuotedString()); + return Optional.of(q.toString()); } - - } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java index ffac8fa96..e2002fa42 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java @@ -57,7 +57,7 @@ public class EventDispatcher { private final Heartbeat heartbeat; private DataChangeEventListener eventListener = DataChangeEventListener.NO_OP; private final boolean emitTombstonesOnDelete; - private InconsistentSchemaHandler inconsistentSchemaHandler = this::errorOnMissingSchema; + private final InconsistentSchemaHandler inconsistentSchemaHandler; /** * Change event receiver for events dispatched from a streaming change event source. @@ -67,6 +67,11 @@ public class EventDispatcher { public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector topicSelector, DatabaseSchema schema, ChangeEventQueue queue, DataCollectionFilter filter, ChangeEventCreator changeEventCreator) { + this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null); + } + public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector topicSelector, + DatabaseSchema schema, ChangeEventQueue queue, DataCollectionFilter filter, + ChangeEventCreator changeEventCreator, InconsistentSchemaHandler inconsistentSchemaHandler) { this.topicSelector = topicSelector; this.schema = schema; this.historizedSchema = schema instanceof HistorizedDatabaseSchema @@ -77,6 +82,7 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector t this.changeEventCreator = changeEventCreator; this.streamingReceiver = new StreamingChangeRecordReceiver(); this.emitTombstonesOnDelete = connectorConfig.isEmitTombstoneOnDelete(); + this.inconsistentSchemaHandler = inconsistentSchemaHandler != null ? inconsistentSchemaHandler : this::errorOnMissingSchema; heartbeat = Heartbeat.create(connectorConfig.getConfig(), topicSelector.getHeartbeatTopic(), connectorConfig.getLogicalName()); @@ -125,13 +131,12 @@ public void dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter chan // TODO handle as per inconsistent schema info option if (dataCollectionSchema == null) { - final Optional replacementSchema = inconsistentSchemaHandler.handle(dataCollectionId, changeRecordEmitter); + final Optional replacementSchema = inconsistentSchemaHandler.handle(dataCollectionId, changeRecordEmitter); if (!replacementSchema.isPresent()) { return; } dataCollectionSchema = replacementSchema.get(); } - dataCollectionSchema = changeRecordEmitter.synchronizeTableSchema(dataCollectionSchema); changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new Receiver() { @@ -151,10 +156,6 @@ public void changeRecord(DataCollectionSchema schema, Operation operation, Objec ); } - public void setInconsistentSchemaHandler(InconsistentSchemaHandler inconsistentSchemaHandler) { - this.inconsistentSchemaHandler = inconsistentSchemaHandler; - } - public Optional errorOnMissingSchema(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter) { eventListener.onErroneousEvent("source = " + dataCollectionId); throw new IllegalArgumentException("No metadata registered for captured table " + dataCollectionId); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeRecordEmitter.java b/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeRecordEmitter.java index 65abee078..55deee744 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeRecordEmitter.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeRecordEmitter.java @@ -29,10 +29,6 @@ public interface ChangeRecordEmitter { */ OffsetContext getOffset(); - default DataCollectionSchema synchronizeTableSchema(DataCollectionSchema dataCollectionSchema) { - return dataCollectionSchema; - } - public interface Receiver { void changeRecord(DataCollectionSchema schema, Operation operation, Object key, Struct value, OffsetContext offset) throws InterruptedException; } diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java index 94d8e93f0..a80bd9568 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java @@ -51,8 +51,6 @@ * * @author Gunnar Morling */ -// TODO Mostly, this should be usable for Postgres as well; only the aspect of managing the schema history will have to -// be made optional based on the connector public abstract class RelationalSnapshotChangeEventSource implements SnapshotChangeEventSource { private static final Logger LOGGER = LoggerFactory.getLogger(RelationalSnapshotChangeEventSource.class); @@ -228,7 +226,7 @@ private void delaySnapshotIfNeeded(ChangeEventSourceContext context) throws Inte */ protected void connectionCreated(SnapshotContext snapshotContext) throws Exception { } - + private Stream toTableIds(Set tableIds, Pattern pattern) { return tableIds .stream()