diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
index 860e98b23..b6e08d712 100755
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
@@ -1124,7 +1124,8 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStructMaker(Version version) {
                     HSTORE_HANDLING_MODE,
                     BINARY_HANDLING_MODE,
                     INTERVAL_HANDLING_MODE,
-                    SCHEMA_REFRESH_MODE)
+                    SCHEMA_REFRESH_MODE,
+                    INCREMENTAL_SNAPSHOT_CHUNK_SIZE)
             .excluding(INCLUDE_SCHEMA_CHANGES)
             .create();
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java
index ee830ccb7..a35300af1 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java
@@ -187,7 +187,8 @@ public ChangeEventSourceCoordinator start(Configuration config) {
                 PostgresChangeRecordEmitter::updateSchema,
                 metadataProvider,
                 heartbeat,
-                schemaNameAdjuster);
+                schemaNameAdjuster,
+                jdbcConnection);
 
         ChangeEventSourceCoordinator coordinator = new PostgresChangeEventSourceCoordinator(
                 previousOffset,
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java
index a7b76aebd..75a117cdd 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java
@@ -275,4 +275,9 @@ public void event(DataCollectionId tableId, Instant instant) {
     public TransactionContext getTransactionContext() {
         return transactionContext;
     }
+
+    @Override
+    public void incrementalSnapshotWindow() {
+        sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
+    }
 }
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java
new file mode 100644
index 000000000..f59fbb438
--- /dev/null
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java
@@ -0,0 +1,197 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.postgresql;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.fest.assertions.Assertions;
+import org.fest.assertions.MapAssert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.embedded.AbstractConnectorTest;
+import io.debezium.util.Testing;
+
+public class IncrementalSnapshotIT extends AbstractConnectorTest {
+
+    private static final int ROW_COUNT = 1_000;
+    private static final String TOPIC_NAME = "test_server.s1.a";
+    private static final int MAXIMUM_NO_RECORDS_CONSUMES = 3;
+
+    private static final String SETUP_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;" +
+            "CREATE SCHEMA s1; " +
+            "CREATE SCHEMA s2; " +
+            "CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
+            "CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048));";
+
+    @Before
+    public void before() throws SQLException {
+        TestHelper.dropAllSchemas();
+        initializeConnectorTestFramework();
+    }
+
+    @After
+    public void after() {
+        stopConnector();
+        TestHelper.dropDefaultReplicationSlot();
+        TestHelper.dropPublication();
+    }
+
+    private void populateTable() throws SQLException {
+        try (final PostgresConnection pgConnection = TestHelper.create()) {
+            pgConnection.setAutoCommit(false);
+            for (int i = 0; i < ROW_COUNT; i++) {
+                pgConnection.executeWithoutCommitting(String.format("INSERT INTO s1.a (aa) VALUES (%s)", i));
+            }
+            pgConnection.commit();
+        }
+    }
+
+    protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException {
+        final Map<Integer, Integer> dbChanges = new HashMap<>();
+        int noRecords = 0;
+        for (;;) {
+            final SourceRecords records = consumeRecordsByTopic(1);
+            final List<SourceRecord> dataRecords = records.recordsForTopic(TOPIC_NAME);
+            if (dataRecords == null || dataRecords.isEmpty()) {
+                noRecords++;
+                Assertions.assertThat(noRecords)
+                        .describedAs("Too many no data record results")
+                        .isLessThan(MAXIMUM_NO_RECORDS_CONSUMES);
+                continue;
+            }
+            noRecords = 0;
+            dataRecords.forEach(record -> {
+                final int id = ((Struct) record.key()).getInt32("pk");
+                final int value = ((Struct) record.value()).getStruct("after").getInt32("aa");
+                dbChanges.put(id, value);
+            });
+            if (dbChanges.size() >= recordCount) {
+                break;
+            }
+        }
+
+        Assertions.assertThat(dbChanges).hasSize(recordCount);
+        return dbChanges;
+    }
+
+    @Test
+    public void snapshotOnly() throws Exception {
+        Testing.Print.enable();
+
+        TestHelper.dropDefaultReplicationSlot();
+        TestHelper.execute(SETUP_TABLES_STMT);
+        populateTable();
+        Configuration config = TestHelper.defaultConfig()
+                .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
+                .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
+                .with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal")
+                .with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
+                .build();
+        start(PostgresConnector.class, config);
+        assertConnectorIsRunning();
+        TestHelper.waitForDefaultReplicationSlotBeActive();
+
+        waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
+        // there shouldn't be any snapshot records
+        assertNoRecordsToConsume();
+
+        // Insert the signal record
+        TestHelper.execute("INSERT INTO s1.debezium_signal VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [\"s1.a\"]}')");
+
+        final int expectedRecordCount = ROW_COUNT;
+        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
+        for (int i = 0; i < expectedRecordCount; i++) {
+            Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
+        }
+    }
+
+    @Test
+    public void inserts() throws Exception {
+        Testing.Print.enable();
+
+        TestHelper.dropDefaultReplicationSlot();
+        TestHelper.execute(SETUP_TABLES_STMT);
+        populateTable();
+        Configuration config = TestHelper.defaultConfig()
+                .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
+                .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
+                .with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal")
+                .with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
+                .build();
+        start(PostgresConnector.class, config);
+        assertConnectorIsRunning();
+        TestHelper.waitForDefaultReplicationSlotBeActive();
+
+        waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
+        // there shouldn't be any snapshot records
+        assertNoRecordsToConsume();
+
+        // Insert the signal record
+        TestHelper.execute("INSERT INTO s1.debezium_signal VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [\"s1.a\"]}')");
+
+        try (final PostgresConnection pgConnection = TestHelper.create()) {
+            for (int i = 0; i < ROW_COUNT; i++) {
+                pgConnection.executeWithoutCommitting(String.format("INSERT INTO s1.a (aa) VALUES (%s)", i + ROW_COUNT));
+            }
+        }
+
+        final int expectedRecordCount = ROW_COUNT * 2;
+        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
+        for (int i = 0; i < expectedRecordCount; i++) {
+            Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
+        }
+    }
+
+    @Test
+    public void updates() throws Exception {
+        Testing.Print.enable();
+
+        TestHelper.dropDefaultReplicationSlot();
+        TestHelper.execute(SETUP_TABLES_STMT);
+        populateTable();
+        Configuration config = TestHelper.defaultConfig()
+                .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
+                .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
+                .with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal")
+                .with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
+                .build();
+        start(PostgresConnector.class, config);
+        assertConnectorIsRunning();
+        TestHelper.waitForDefaultReplicationSlotBeActive();
+
+        waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
+        // there shouldn't be any snapshot records
+        assertNoRecordsToConsume();
+
+        // Insert the signal record
+        TestHelper.execute("INSERT INTO s1.debezium_signal VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [\"s1.a\"]}')");
+
+        final int batchSize = 10;
+        try (final PostgresConnection pgConnection = TestHelper.create()) {
+            for (int i = 0; i < ROW_COUNT / batchSize; i++) {
+                TestHelper.execute(String.format("UPDATE s1.a SET aa = aa + 1000 WHERE pk > %s AND pk <= %s", i * batchSize, (i + 1) * batchSize));
+            }
+        }
+
+        final int expectedRecordCount = ROW_COUNT;
+        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
+        for (int i = 0; i < expectedRecordCount; i++) {
+            Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i + 1000));
+        }
+    }
+}
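Note: the signal row these tests insert is the entire user-facing interface of the feature — any client with write access to the signaling table can trigger an ad-hoc incremental snapshot. A minimal standalone sketch of sending the same signal from application code (the JDBC URL and credentials are placeholders, not part of this change; it assumes the signal table created by the test setup above):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.UUID;

public class SendExecuteSnapshotSignal {
    public static void main(String[] args) throws Exception {
        // Signal table shape, as in SETUP_TABLES_STMT:
        // CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048))
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
                PreparedStatement ps = conn.prepareStatement(
                        "INSERT INTO s1.debezium_signal (id, type, data) VALUES (?, ?, ?)")) {
            ps.setString(1, UUID.randomUUID().toString()); // arbitrary unique signal id
            ps.setString(2, "execute-snapshot"); // ExecuteSnapshot.NAME
            ps.setString(3, "{\"data-collections\": [\"s1.a\"]}"); // tables to snapshot
            ps.executeUpdate();
        }
    }
}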
diff --git a/debezium-connector-postgres/src/test/resources/log4j.properties b/debezium-connector-postgres/src/test/resources/log4j.properties
index a76c9e1a9..d2a6b43af 100644
--- a/debezium-connector-postgres/src/test/resources/log4j.properties
+++ b/debezium-connector-postgres/src/test/resources/log4j.properties
@@ -3,14 +3,15 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.Target=System.out
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n
-log4j.appender.stdout.threshold=WARN
+log4j.appender.stdout.threshold=DEBUG
 
 # Root logger option
 log4j.rootLogger=INFO, stdout
 
 # Set up the default logging to be INFO level, then override specific units
 log4j.logger.io.debezium=INFO
-log4j.logger.io.debezium.connector.postgresql=INFO
+log4j.logger.io.debezium.pipeline=DEBUG
+log4j.logger.io.debezium.connector.postgresql=DEBUG
 log4j.logger.io.debezium.connector.postgresql.connection.PostgresReplicationConnection=DEBUG
 
 # Needed for PostgresConnectorIT.shouldClearDatabaseWarnings()
 log4j.logger.io.debezium.embedded.EmbeddedEngine$EmbeddedConfig=WARN
 #log4j.logger.io.debezium.embedded.EmbeddedEngine=DEBUG
diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java
index baf58abda..57a75d09a 100644
--- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java
+++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java
@@ -323,6 +323,15 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
             .withDescription("The maximum number of records that should be loaded into memory while performing a snapshot")
             .withValidation(Field::isNonNegativeInteger);
 
+    public static final Field INCREMENTAL_SNAPSHOT_CHUNK_SIZE = Field.create("incremental.snapshot.chunk.size")
+            .withDisplayName("Incremental snapshot chunk size")
+            .withType(Type.INT)
+            .withWidth(Width.MEDIUM)
+            .withImportance(Importance.MEDIUM)
+            .withDescription("The maximum number of rows fetched in a single chunk during incremental snapshotting")
+            .withDefault(1024)
+            .withValidation(Field::isNonNegativeInteger);
+
     public static final Field SNAPSHOT_MODE_TABLES = Field.create("snapshot.include.collection.list")
             .withDisplayName("Snapshot mode include data collection")
             .withType(Type.LIST)
@@ -454,6 +463,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
     private final Duration snapshotDelayMs;
     private final Duration retriableRestartWait;
     private final int snapshotFetchSize;
+    private final int incrementalSnapshotChunkSize;
     private final int snapshotMaxThreads;
     private final Integer queryFetchSize;
     private final SourceInfoStructMaker sourceInfoStructMaker;
@@ -479,6 +489,7 @@ protected CommonConnectorConfig(Configuration config, String logicalName, int defaultSnapshotFetchSize) {
         this.snapshotFetchSize = config.getInteger(SNAPSHOT_FETCH_SIZE, defaultSnapshotFetchSize);
         this.snapshotMaxThreads = config.getInteger(SNAPSHOT_MAX_THREADS);
         this.queryFetchSize = config.getInteger(QUERY_FETCH_SIZE);
+        this.incrementalSnapshotChunkSize = config.getInteger(INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
         this.sourceInfoStructMaker = getSourceInfoStructMaker(Version.parse(config.getString(SOURCE_STRUCT_MAKER_VERSION)));
         this.sanitizeFieldNames = config.getBoolean(SANITIZE_FIELD_NAMES) || isUsingAvroConverter(config);
         this.shouldProvideTransactionMetadata = config.getBoolean(PROVIDE_TRANSACTION_METADATA);
@@ -565,6 +576,10 @@ public int getQueryFetchSize() {
         return queryFetchSize;
     }
 
+    public int getIncrementalSnapshotChunkSize() {
+        return incrementalSnapshotChunkSize;
+    }
+
     public boolean shouldProvideTransactionMetadata() {
         return shouldProvideTransactionMetadata;
     }
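For a deployed connector the new option is set like any other property, under the name passed to Field.create above; for example in a connector properties file (values are illustrative, and signal.data.collection is assumed to be the name of the signaling-table option, which is defined outside this patch):

connector.class=io.debezium.connector.postgresql.PostgresConnector
signal.data.collection=s1.debezium_signal
incremental.snapshot.chunk.size=1024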
@@ -91,20 +94,23 @@ public class EventDispatcher { public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector topicSelector, DatabaseSchema schema, ChangeEventQueue queue, DataCollectionFilter filter, ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider, SchemaNameAdjuster schemaNameAdjuster) { - this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider, null, schemaNameAdjuster); + this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider, + null, schemaNameAdjuster, null); } public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector topicSelector, DatabaseSchema schema, ChangeEventQueue queue, DataCollectionFilter filter, ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider, Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster) { - this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider, heartbeat, schemaNameAdjuster); + this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider, + heartbeat, schemaNameAdjuster, null); } public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector topicSelector, DatabaseSchema schema, ChangeEventQueue queue, DataCollectionFilter filter, ChangeEventCreator changeEventCreator, InconsistentSchemaHandler inconsistentSchemaHandler, - EventMetadataProvider metadataProvider, Heartbeat customHeartbeat, SchemaNameAdjuster schemaNameAdjuster) { + EventMetadataProvider metadataProvider, Heartbeat customHeartbeat, SchemaNameAdjuster schemaNameAdjuster, + JdbcConnection jdbcConnection) { this.connectorConfig = connectorConfig; this.topicSelector = topicSelector; this.schema = schema; @@ -121,6 +127,8 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector t this.neverSkip = connectorConfig.supportsOperationFiltering() || this.skippedOperations.isEmpty(); this.transactionMonitor = new TransactionMonitor(connectorConfig, metadataProvider, this::enqueueTransactionMessage); + this.incrementalSnapshotChangeEventSource = new IncrementalSnapshotChangeEventSource( + connectorConfig, jdbcConnection, schema, this); this.signal = new Signal(connectorConfig, this); if (customHeartbeat != null) { heartbeat = customHeartbeat; @@ -176,6 +184,10 @@ public SnapshotReceiver getSnapshotChangeEventReceiver() { return new BufferingSnapshotChangeRecordReceiver(); } + public SnapshotReceiver getIncrementalSnapshotChangeEventReceiver() { + return new IncrementalSnapshotChangeRecordReceiver(); + } + /** * Dispatches one or more {@link DataChangeEvent}s. 
     * Dispatches one or more {@link DataChangeEvent}s. If the given data collection is included in the currently
     * captured set of collections, the given emitter will be invoked, so it can emit one or more events (in the common
@@ -220,6 +232,7 @@ public void changeRecord(DataCollectionSchema schema,
             if (neverSkip || !skippedOperations.contains(operation)) {
                 transactionMonitor.dataEvent(dataCollectionId, offset, key, value);
                 eventListener.onEvent(dataCollectionId, offset, key, value);
+                incrementalSnapshotChangeEventSource.processMessage(dataCollectionId, key);
                 streamingReceiver.changeRecord(schema, operation, key, value, offset, headers);
             }
         }
@@ -364,7 +377,8 @@ public void changeRecord(DataCollectionSchema dataCollectionSchema,
             String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());
 
             SourceRecord record = new SourceRecord(offsetContext.getPartition(),
-                    offsetContext.getOffset(), topicName, null,
+                    incrementalSnapshotChangeEventSource.store(offsetContext.getOffset()),
+                    topicName, null,
                     keySchema, key,
                     dataCollectionSchema.getEnvelopeSchema().schema(), value,
@@ -445,6 +459,37 @@ public void completeSnapshot() throws InterruptedException {
         }
     }
 
+    private final class IncrementalSnapshotChangeRecordReceiver implements SnapshotReceiver {
+
+        @Override
+        public void changeRecord(DataCollectionSchema dataCollectionSchema,
+                                 Operation operation,
+                                 Object key, Struct value,
+                                 OffsetContext offsetContext,
+                                 ConnectHeaders headers)
+                throws InterruptedException {
+            Objects.requireNonNull(value, "value must not be null");
+
+            LOGGER.trace("Received change record for {} operation on key {}", operation, key);
+
+            Schema keySchema = dataCollectionSchema.keySchema();
+            String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());
+
+            SourceRecord record = new SourceRecord(
+                    offsetContext.getPartition(),
+                    incrementalSnapshotChangeEventSource.store(offsetContext.getOffset()),
+                    topicName, null,
+                    keySchema, key,
+                    dataCollectionSchema.getEnvelopeSchema().schema(), value,
+                    null, headers);
+            queue.enqueue(changeEventCreator.createDataChangeEvent(record));
+        }
+
+        @Override
+        public void completeSnapshot() throws InterruptedException {
+        }
+    }
+
     private final class SchemaChangeEventReceiver implements SchemaChangeEventEmitter.Receiver {
 
         private Struct schemaChangeRecordKey(SchemaChangeEvent event) {
@@ -507,4 +552,8 @@ public DatabaseSchema<T> getSchema() {
     public HistorizedDatabaseSchema getHistorizedSchema() {
         return historizedSchema;
     }
+
+    public IncrementalSnapshotChangeEventSource<T> getIncrementalSnapshotChangeEventSource() {
+        return incrementalSnapshotChangeEventSource;
+    }
 }
diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/ExecuteSnapshot.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/ExecuteSnapshot.java
new file mode 100644
index 000000000..44e0b7b38
--- /dev/null
+++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/ExecuteSnapshot.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.pipeline.signal;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.document.Array;
+import io.debezium.pipeline.signal.Signal.Payload;
+import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
+
+/**
+ * The action to trigger an ad-hoc snapshot.
+ * The action parameters are {@code type} of snapshot and list of {@code data-collections} on which the
+ * snapshot will be executed.
+ *
+ * @author Jiri Pechanec
+ *
+ */
+public class ExecuteSnapshot implements Signal.Action {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ExecuteSnapshot.class);
+    private static final String FIELD_DATA_COLLECTIONS = "data-collections";
+    private static final String FIELD_TYPE = "type";
+
+    public static final String NAME = "execute-snapshot";
+
+    public enum SnapshotType {
+        INCREMENTAL
+    }
+
+    private final IncrementalSnapshotChangeEventSource<?> eventSource;
+
+    public ExecuteSnapshot(IncrementalSnapshotChangeEventSource<?> eventSource) {
+        this.eventSource = eventSource;
+    }
+
+    @Override
+    public boolean arrived(Payload signalPayload) {
+        final Array dataCollectionsArray = signalPayload.data.getArray(FIELD_DATA_COLLECTIONS);
+        if (dataCollectionsArray == null || dataCollectionsArray.isEmpty()) {
+            LOGGER.warn(
+                    "Execute snapshot signal '{}' has arrived but the requested field '{}' is missing from data or is empty",
+                    signalPayload, FIELD_DATA_COLLECTIONS);
+            return false;
+        }
+        final List<String> dataCollections = dataCollectionsArray.streamValues().map(v -> v.asString().trim())
+                .collect(Collectors.toList());
+        final String typeStr = signalPayload.data.getString(FIELD_TYPE);
+        SnapshotType type = SnapshotType.INCREMENTAL;
+        if (typeStr != null) {
+            type = SnapshotType.valueOf(typeStr);
+        }
+        LOGGER.info("Requested '{}' snapshot of data collections '{}'", type, dataCollections);
+        switch (type) {
+            case INCREMENTAL:
+                eventSource.addDataCollectionNamesToSnapshot(dataCollections);
+                break;
+        }
+        return true;
+    }
+
+}
diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/Signal.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/Signal.java
index 676c05981..9f2dd4654 100644
--- a/debezium-core/src/main/java/io/debezium/pipeline/signal/Signal.java
+++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/Signal.java
@@ -21,6 +21,9 @@ import io.debezium.document.Document;
 import io.debezium.document.DocumentReader;
 import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.source.snapshot.incremental.CloseIncrementalSnapshotWindow;
+import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
+import io.debezium.pipeline.source.snapshot.incremental.OpenIncrementalSnapshotWindow;
 import io.debezium.pipeline.spi.OffsetContext;
 import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
 import io.debezium.schema.DataCollectionId;
@@ -102,6 +105,10 @@ public Signal(CommonConnectorConfig connectorConfig, EventDispatcher<? extends DataCollectionId> eventDispatcher) {
diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSource.java
new file mode 100644
--- /dev/null
+++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSource.java
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.pipeline.source.snapshot.incremental;
+
+public class IncrementalSnapshotChangeEventSource<T extends DataCollectionId> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(IncrementalSnapshotChangeEventSource.class);
+
+    public static final String INCREMENTAL_SNAPSHOT_KEY = "incremental_snapshot";
+    public static final String DATA_COLLECTIONS_TO_SNAPSHOT_KEY = INCREMENTAL_SNAPSHOT_KEY + "_collections";
+    public static final String EVENT_PRIMARY_KEY = INCREMENTAL_SNAPSHOT_KEY + "_primary_key";
+    public static final String TABLE_MAXIMUM_KEY = INCREMENTAL_SNAPSHOT_KEY + "_maximum_key";
+
+    // TODO Metrics
+    // Need to discuss and decide if
+    // - SnapshotChangeEventSourceMetricsMXBean would be extended with window metrics
+    // - new MXBean would be introduced with subset of SnapshotChangeEventSourceMetricsMXBean and additional window metrics
+    // - SnapshotChangeEventSourceMetricsMXBean would be reused and a new MXBean should be introduced for window metrics
+
+    // The key must implement the hashCode/equals contract (Kafka Connect's Struct does)
+    private Map<Struct, Object[]> window = new LinkedHashMap<>();
+    private CommonConnectorConfig connectorConfig;
+    private JdbcConnection jdbcConnection;
+    // TODO Pass Clock
+    private final Clock clock = Clock.system();
+    private final String signalWindowStatement;
+    private final RelationalDatabaseSchema databaseSchema;
+    private final EventDispatcher<T> dispatcher;
+
+    private boolean windowOpened = false;
+    private Object[] chunkEndPosition;
+    private Table currentTable;
+
+    // TODO Extract into a separate IncrementalSnapshotContext
+    // TODO After extracting add into source info optional block incrementalSnapshotWindow{String from, String to}
+    // State to be stored and recovered from offsets
+    private final Queue<T> dataCollectionsToSnapshot = new LinkedList<>();
+    // The PK of the last record that was passed to Kafka Connect
+    // In case of connector restart the start of the first window will be populated from it
+    private Object[] lastEventSentKey;
+    // The largest PK in the table at the start of snapshot
+    private Object[] maximumKey;
+
+    public IncrementalSnapshotChangeEventSource(CommonConnectorConfig config, JdbcConnection jdbcConnection,
+                                                DatabaseSchema<T> databaseSchema, EventDispatcher<T> dispatcher) {
+        this.connectorConfig = config;
+        this.jdbcConnection = jdbcConnection;
+        signalWindowStatement = "INSERT INTO " + connectorConfig.getSignalingDataCollectionId()
+                + " VALUES (?, ?, null)";
+        this.databaseSchema = (RelationalDatabaseSchema) databaseSchema;
+        this.dispatcher = dispatcher;
+    }
+
+    public void windowOpen() {
+        LOGGER.info("Opening window for incremental snapshot batch");
+        windowOpened = true;
+    }
+
+    public void windowClosed(OffsetContext offsetContext) {
+        try {
+            LOGGER.info("Closing window for incremental snapshot chunk");
+            windowOpened = false;
+            // TODO There is an issue here
+            // Events are emitted with tx log coordinates of the CloseIncrementalSnapshotWindow.
+            // This means that if the connector is restarted in the middle of emptying the buffer,
+            // the rest of the buffer might not be re-sent, or the snapshot might not even be resumed,
+            // as there are no more such events.
+            // Most probably this could be solved by injecting a windowOpen/windowClosed sequence upon start
+            offsetContext.incrementalSnapshotWindow();
+            for (Object[] row : window.values()) {
+                sendEvent(offsetContext, row);
+            }
+            offsetContext.postSnapshotCompletion();
+            window.clear();
+            populateWindow();
+        }
+        catch (InterruptedException e) {
+            LOGGER.warn("Emptying of the window buffer was interrupted", e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    protected void sendEvent(OffsetContext offsetContext, Object[] row) throws InterruptedException {
+        lastEventSentKey = keyFromRow(row);
+        offsetContext.event((T) currentDataCollectionId(), clock.currentTimeAsInstant());
+        dispatcher.dispatchSnapshotEvent((T) currentDataCollectionId(),
+                getChangeRecordEmitter(currentDataCollectionId(), offsetContext, row),
+                dispatcher.getIncrementalSnapshotChangeEventReceiver());
+    }
+
+    /**
+     * Returns a {@link ChangeRecordEmitter} producing the change records for
+     * the given table row.
+     */
+    protected ChangeRecordEmitter getChangeRecordEmitter(T dataCollectionId, OffsetContext offsetContext,
+                                                         Object[] row) {
+        return new SnapshotChangeRecordEmitter(offsetContext, row, clock);
+    }
+
+    public void processMessage(DataCollectionId dataCollectionId, Object key) {
+        LOGGER.trace("Checking window for table '{}', key '{}', window contains '{}'", dataCollectionId, key, window);
+        if (!windowOpened || window.isEmpty()) {
+            return;
+        }
+        if (!currentDataCollectionId().equals(dataCollectionId)) {
+            return;
+        }
+        if (key instanceof Struct) {
+            if (window.remove((Struct) key) != null) {
+                LOGGER.info("Removed '{}' from window", key);
+            }
+        }
+    }
+
+    protected T currentDataCollectionId() {
+        return dataCollectionsToSnapshot.peek();
+    }
+
+    private void emitWindowOpen() throws SQLException {
+        jdbcConnection.prepareUpdate(signalWindowStatement, x -> {
+            x.setString(1, UUID.randomUUID().toString());
+            x.setString(2, OpenIncrementalSnapshotWindow.NAME);
+        });
+    }
+
+    private void emitWindowClose() throws SQLException {
+        jdbcConnection.prepareUpdate(signalWindowStatement, x -> {
+            x.setString(1, UUID.randomUUID().toString());
+            x.setString(2, CloseIncrementalSnapshotWindow.NAME);
+        });
+    }
+
+    protected String buildChunkQuery(Table table) {
+        final StringBuilder sql = new StringBuilder("SELECT * FROM ");
+        sql.append(table.id().toString());
+
+        // Add condition when this is not the first query
+        if (isNonInitialChunk()) {
+            // Window boundaries
+            sql.append(" WHERE ");
+            addKeyColumnsToCondition(table, sql, " >= ?");
+            sql.append(" AND NOT (");
+            addKeyColumnsToCondition(table, sql, " = ?");
+            sql.append(")");
+            // Table boundaries
+            sql.append(" AND ");
+            addKeyColumnsToCondition(table, sql, " <= ?");
+        }
+        // TODO limiting is db dialect based
+        sql.append(" ORDER BY ")
+                .append(table.primaryKeyColumns().stream().map(Column::name).collect(Collectors.joining(", ")))
+                .append(" LIMIT ").append(connectorConfig.getIncrementalSnapshotChunkSize());
+        return sql.toString();
+    }
+
+    private boolean isNonInitialChunk() {
+        return chunkEndPosition != null;
+    }
+
+    protected String buildMaxPrimaryKeyQuery(Table table) {
+        final StringBuilder sql = new StringBuilder("SELECT * FROM ");
+        sql.append(table.id().toString());
+
+        // TODO limiting is db dialect based
+        sql.append(" ORDER BY ")
+                .append(table.primaryKeyColumns().stream().map(Column::name).collect(Collectors.joining(" DESC, ")))
+                .append(" DESC LIMIT ").append(1);
+        return sql.toString();
+    }
+
+    public void nextChunk(Object[] end) {
+        chunkEndPosition = end;
+    }
+
+    private void resetChunk() {
+        chunkEndPosition = null;
+        maximumKey = null;
+    }
+
+    protected boolean tablesAvailable() {
+        return !dataCollectionsToSnapshot.isEmpty();
+    }
+
+    protected void setMaximumKey(Object[] key) {
+        maximumKey = key;
+    }
+
+    private boolean hasMaximumKey() {
+        return maximumKey != null;
+    }
+
+    private void populateWindow() throws InterruptedException {
+        if (!tablesAvailable()) {
+            return;
+        }
+        try {
+            emitWindowOpen();
+            while (tablesAvailable()) {
+                final TableId currentTableId = (TableId) currentDataCollectionId();
+                currentTable = databaseSchema.tableFor(currentTableId);
+                if (!hasMaximumKey()) {
+                    setMaximumKey(jdbcConnection.queryAndMap(buildMaxPrimaryKeyQuery(currentTable), rs -> {
+                        if (!rs.next()) {
+                            return null;
+                        }
+                        return keyFromRow(rowToArray(currentTable, rs, ColumnUtils.toArray(rs, currentTable)));
+                    }));
+                    if (!hasMaximumKey()) {
+                        LOGGER.info(
+                                "No maximum key returned by the query, incremental snapshotting of table '{}' finished as it is empty",
+                                currentTableId);
+                        nextDataCollection();
+                        continue;
+                    }
+                    LOGGER.info("Incremental snapshot for table '{}' will end at position {}", currentTableId,
+                            maximumKey);
+                }
+                createDataEventsForTable(dataCollectionsToSnapshot.size());
+                if (window.isEmpty()) {
+                    LOGGER.info("No data returned by the query, incremental snapshotting of table '{}' finished",
+                            currentTableId);
+                    nextDataCollection();
+                }
+                else {
+                    break;
+                }
+            }
+            emitWindowClose();
+        }
+        catch (SQLException e) {
+            throw new DebeziumException("Database error while executing incremental snapshot", e);
+        }
+    }
+
+    protected T nextDataCollection() {
+        resetChunk();
+        return dataCollectionsToSnapshot.poll();
+    }
+
+    private void addTablesIdsToSnapshot(List<T> dataCollectionIds) {
+        boolean shouldPopulateWindow = false;
+        if (!tablesAvailable()) {
+            shouldPopulateWindow = true;
+        }
+        dataCollectionsToSnapshot.addAll(dataCollectionIds);
+        if (shouldPopulateWindow) {
+            try {
+                populateWindow();
+            }
+            catch (InterruptedException e) {
+                LOGGER.warn("Window population was interrupted", e);
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void addDataCollectionNamesToSnapshot(List<String> dataCollectionIds) {
+        addTablesIdsToSnapshot(dataCollectionIds.stream().map(x -> (T) TableId.parse(x)).collect(Collectors.toList()));
+    }
+
+    protected void addKeyColumnsToCondition(Table table, StringBuilder sql, String predicate) {
+        for (Iterator<Column> i = table.primaryKeyColumns().iterator(); i.hasNext();) {
+            final Column key = i.next();
+            sql.append(key.name()).append(predicate);
+            if (i.hasNext()) {
+                sql.append(" AND ");
+            }
+        }
+    }
+
+    /**
+     * Dispatches the data change events for the records of a single table.
+     */
+    private void createDataEventsForTable(int tableCount) throws InterruptedException {
+        long exportStart = clock.currentTimeInMillis();
+        LOGGER.debug("Exporting data chunk from table '{}' (total {} tables)", currentTable.id(), tableCount);
+
+        final String selectStatement = buildChunkQuery(currentTable);
+        LOGGER.debug("\t For table '{}' using select statement: '{}', key: '{}', maximum key: '{}'", currentTable.id(),
+                selectStatement, chunkEndPosition, maximumKey);
+
+        final TableSchema tableSchema = databaseSchema.schemaFor(currentTable.id());
+
+        try (PreparedStatement statement = readTableStatement(selectStatement);
+                ResultSet rs = statement.executeQuery()) {
+
+            final ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, currentTable);
+            long rows = 0;
+            Timer logTimer = getTableScanLogTimer();
+
+            Object[] lastRow = null;
+            while (rs.next()) {
+                rows++;
+                final Object[] row = rowToArray(currentTable, rs, columnArray);
+                final Struct keyStruct = tableSchema.keyFromColumnData(row);
+                window.put(keyStruct, row);
+                if (logTimer.expired()) {
+                    long stop = clock.currentTimeInMillis();
+                    LOGGER.debug("\t Exported {} records for table '{}' after {}", rows, currentTable.id(),
+                            Strings.duration(stop - exportStart));
+                    logTimer = getTableScanLogTimer();
+                }
+                lastRow = row;
+            }
+            nextChunk(keyFromRow(lastRow));
+            if (lastRow != null) {
+                LOGGER.debug("\t Next window will resume from '{}'", chunkEndPosition);
+            }
+
+            LOGGER.debug("\t Finished exporting {} records for window of table '{}'; total duration '{}'", rows,
+                    currentTable.id(), Strings.duration(clock.currentTimeInMillis() - exportStart));
+        }
+        catch (SQLException e) {
+            throw new DebeziumException("Snapshotting of table " + currentTable.id() + " failed", e);
+        }
+    }
+
+    // Extract to JdbcConnection, same as in RelationalSnapshotChangeEventSource
+    protected Object[] rowToArray(Table table, ResultSet rs, ColumnUtils.ColumnArray columnArray) throws SQLException {
+        final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
+        for (int i = 0; i < columnArray.getColumns().length; i++) {
+            row[columnArray.getColumns()[i].position() - 1] = getColumnValue(rs, i + 1, columnArray.getColumns()[i],
+                    table);
+        }
+        return row;
+    }
+
+    // TODO Parameterize the method and extract it to JdbcConnection
+    /**
+     * Allow per-connector query creation to override for best database
+     * performance depending on the table size.
+     */
+    protected PreparedStatement readTableStatement(String sql) throws SQLException {
+        int fetchSize = connectorConfig.getSnapshotFetchSize();
+        PreparedStatement statement = jdbcConnection.connection().prepareStatement(sql);
+        if (isNonInitialChunk()) {
+            for (int i = 0; i < chunkEndPosition.length; i++) {
+                statement.setObject(i + 1, chunkEndPosition[i]);
+                statement.setObject(i + 1 + chunkEndPosition.length, chunkEndPosition[i]);
+                statement.setObject(i + 1 + 2 * chunkEndPosition.length, maximumKey[i]);
+            }
+        }
+        statement.setFetchSize(fetchSize);
+        return statement;
+    }
+
+    private Timer getTableScanLogTimer() {
+        return Threads.timer(clock, RelationalSnapshotChangeEventSource.LOG_INTERVAL);
+    }
+
+    // TODO Extract these two methods from *SnapshotChangeEventSource to JdbcValueConverters or JdbcConnection
+    protected Object getColumnValue(ResultSet rs, int columnIndex, Column column, Table table) throws SQLException {
+        return getColumnValue(rs, columnIndex, column);
+    }
+
+    @Deprecated
+    protected Object getColumnValue(ResultSet rs, int columnIndex, Column column) throws SQLException {
+        return rs.getObject(columnIndex);
+    }
+
+    private Object[] keyFromRow(Object[] row) {
+        if (row == null) {
+            return null;
+        }
+        final List<Column> keyColumns = currentTable.primaryKeyColumns();
+        final Object[] key = new Object[keyColumns.size()];
+        for (int i = 0; i < keyColumns.size(); i++) {
+            key[i] = row[keyColumns.get(i).position() - 1];
+        }
+        return key;
+    }
+
+    private String arrayToSerializedString(Object[] array) {
+        try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+            oos.writeObject(array);
+            return HexConverter.convertToHexString(bos.toByteArray());
+        }
+        catch (IOException e) {
+            throw new DebeziumException(String.format("Cannot serialize chunk information %s", Arrays.toString(array)), e);
+        }
+    }
+
+    private Object[] serializedStringToArray(String field, String serialized) {
+        try (final ByteArrayInputStream bis = new ByteArrayInputStream(HexConverter.convertFromHex(serialized));
+                ObjectInputStream ois = new ObjectInputStream(bis)) {
+            return (Object[]) ois.readObject();
+        }
+        catch (Exception e) {
+            throw new DebeziumException(String.format("Failed to deserialize '%s' with value '%s'", field, serialized), e);
+        }
+    }
+
+    private String dataCollectionsToSnapshotAsString() {
+        // TODO Handle non-standard table ids containing dots, commas etc.
+        return dataCollectionsToSnapshot.stream()
+                .map(x -> x.toString())
+                .collect(Collectors.joining(","));
+    }
+
+    private List<String> stringToDataCollections(String dataCollectionsStr) {
+        return Arrays.asList(dataCollectionsStr.split(","));
+    }
+
+    public Map<String, Object> store(Map<String, ?> iOffset) {
+        @SuppressWarnings("unchecked")
+        final Map<String, Object> offset = (Map<String, Object>) iOffset;
+        if (!tablesAvailable()) {
+            return offset;
+        }
+        offset.put(EVENT_PRIMARY_KEY, arrayToSerializedString(lastEventSentKey));
+        offset.put(TABLE_MAXIMUM_KEY, arrayToSerializedString(maximumKey));
+        offset.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY, dataCollectionsToSnapshotAsString());
+        return offset;
+    }
+
+    // TODO Call on connector start
+    public void load(Map<String, ?> offsets) {
+        if (offsets == null) {
+            return;
+        }
+        final String lastEventSentKeyStr = (String) offsets.get(EVENT_PRIMARY_KEY);
+        chunkEndPosition = (lastEventSentKeyStr != null) ? serializedStringToArray(EVENT_PRIMARY_KEY, lastEventSentKeyStr) : null;
+        lastEventSentKey = null;
+        final String maximumKeyStr = (String) offsets.get(TABLE_MAXIMUM_KEY);
+        maximumKey = (maximumKeyStr != null) ? serializedStringToArray(TABLE_MAXIMUM_KEY, maximumKeyStr) : null;
+        final String dataCollectionsStr = (String) offsets.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY);
+        dataCollectionsToSnapshot.clear();
+        if (dataCollectionsStr != null) {
+            addDataCollectionNamesToSnapshot(stringToDataCollections(dataCollectionsStr));
+        }
+    }
+}
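The window buffer above is the heart of the watermark approach: every streamed change that arrives between the open and close signals evicts the matching buffered snapshot row (see processMessage), because the streamed event is strictly newer than the SELECT that filled the buffer. A toy, self-contained model of just that rule, with illustrative names that are not part of the change:

import java.util.LinkedHashMap;
import java.util.Map;

// Chunk rows are buffered by primary key; a streamed change for the same key
// evicts the stale snapshot row so only the newer streamed event is emitted.
public class WindowDedupSketch {
    private final Map<Integer, String> window = new LinkedHashMap<>();

    void bufferSnapshotRow(int pk, String row) {
        window.put(pk, row);
    }

    void onStreamedChange(int pk) {
        // The streamed event postdates the chunk SELECT, so drop the buffered copy.
        window.remove(pk);
    }

    Iterable<String> rowsToEmitOnWindowClose() {
        return window.values();
    }

    public static void main(String[] args) {
        WindowDedupSketch w = new WindowDedupSketch();
        w.bufferSnapshotRow(1, "snapshot row 1");
        w.bufferSnapshotRow(2, "snapshot row 2");
        w.onStreamedChange(1); // pk 1 changed while the window was open
        System.out.println(w.rowsToEmitOnWindowClose()); // prints [snapshot row 2]
    }
}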
diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/OpenIncrementalSnapshotWindow.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/OpenIncrementalSnapshotWindow.java
new file mode 100644
index 000000000..91da1bcc9
--- /dev/null
+++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/OpenIncrementalSnapshotWindow.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.pipeline.source.snapshot.incremental;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.pipeline.signal.Signal;
+import io.debezium.pipeline.signal.Signal.Payload;
+
+public class OpenIncrementalSnapshotWindow implements Signal.Action {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(OpenIncrementalSnapshotWindow.class);
+
+    public static final String NAME = "snapshot-window-open";
+
+    private IncrementalSnapshotChangeEventSource<?> eventSource;
+
+    public OpenIncrementalSnapshotWindow(IncrementalSnapshotChangeEventSource<?> eventSource) {
+        this.eventSource = eventSource;
+    }
+
+    @Override
+    public boolean arrived(Payload signalPayload) {
+        eventSource.windowOpen();
+        return true;
+    }
+
+}
diff --git a/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java b/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java
index 4dedd928f..ff38fc938 100644
--- a/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java
+++ b/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java
@@ -78,4 +78,12 @@ interface Loader {
      * @return transaction context
      */
     TransactionContext getTransactionContext();
+
+    default void incrementalSnapshotWindow() {
+
+    }
+
+    default void incrementalSnapshotStop() {
+
+    }
 }
diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java
index 83ebe5894..1d35ff01b 100644
--- a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java
+++ b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java
@@ -57,7 +57,7 @@ public abstract class RelationalSnapshotChangeEventSource extends AbstractSnapshotChangeEventSource {
     /**
      * Interval for showing a log statement with the progress while scanning a single table.
     */
-    private static final Duration LOG_INTERVAL = Duration.ofMillis(10_000);
+    public static final Duration LOG_INTERVAL = Duration.ofMillis(10_000);
 
     private final RelationalDatabaseConnectorConfig connectorConfig;
     private final OffsetContext previousOffset;
diff --git a/debezium-core/src/test/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSourceTest.java b/debezium-core/src/test/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSourceTest.java
new file mode 100644
index 000000000..ffe0326df
--- /dev/null
+++ b/debezium-core/src/test/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSourceTest.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.pipeline.source.snapshot.incremental;
+
+import org.fest.assertions.Assertions;
+import org.junit.Test;
+
+import io.debezium.config.CommonConnectorConfig;
+import io.debezium.config.Configuration;
+import io.debezium.connector.SourceInfoStructMaker;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+
+public class IncrementalSnapshotChangeEventSourceTest {
+
+    protected CommonConnectorConfig config() {
+        return new CommonConnectorConfig(
+                Configuration.create().with(CommonConnectorConfig.SIGNAL_DATA_COLLECTION, "debezium.signal").build(),
+                "core", 0) {
+            @Override
+            protected SourceInfoStructMaker getSourceInfoStructMaker(Version version) {
+                return null;
+            }
+
+            @Override
+            public String getContextName() {
+                return null;
+            }
+
+            @Override
+            public String getConnectorName() {
+                return null;
+            }
+        };
+    }
+
+    @Test
+    public void testBuildQuery() {
+        final IncrementalSnapshotChangeEventSource<TableId> source = new IncrementalSnapshotChangeEventSource<>(config(), null, null, null);
+        final Column pk1 = Column.editor().name("pk1").create();
+        final Column pk2 = Column.editor().name("pk2").create();
+        final Column val1 = Column.editor().name("val1").create();
+        final Column val2 = Column.editor().name("val2").create();
+        final Table table = Table.editor().tableId(new TableId(null, "s1", "table1")).addColumn(pk1).addColumn(pk2)
+                .addColumn(val1).addColumn(val2).setPrimaryKeyNames("pk1", "pk2").create();
+        Assertions.assertThat(source.buildChunkQuery(table)).isEqualTo("SELECT * FROM s1.table1 ORDER BY pk1, pk2 LIMIT 1024");
+        source.nextChunk(new Object[]{ 1, 5 });
+        source.setMaximumKey(new Object[]{ 10, 50 });
+        Assertions.assertThat(source.buildChunkQuery(table)).isEqualTo(
+                "SELECT * FROM s1.table1 WHERE pk1 >= ? AND pk2 >= ? AND NOT (pk1 = ? AND pk2 = ?) AND pk1 <= ? AND pk2 <= ? ORDER BY pk1, pk2 LIMIT 1024");
+    }
+
+    @Test
+    public void testMaxQuery() {
+        final IncrementalSnapshotChangeEventSource<TableId> source = new IncrementalSnapshotChangeEventSource<>(config(), null, null, null);
+        final Column pk1 = Column.editor().name("pk1").create();
+        final Column pk2 = Column.editor().name("pk2").create();
+        final Column val1 = Column.editor().name("val1").create();
+        final Column val2 = Column.editor().name("val2").create();
+        final Table table = Table.editor().tableId(new TableId(null, "s1", "table1")).addColumn(pk1).addColumn(pk2)
+                .addColumn(val1).addColumn(val2).setPrimaryKeyNames("pk1", "pk2").create();
+        Assertions.assertThat(source.buildMaxPrimaryKeyQuery(table)).isEqualTo("SELECT * FROM s1.table1 ORDER BY pk1 DESC, pk2 DESC LIMIT 1");
+    }
+}
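The expected SQL in the test above also documents the pagination contract: each chunk re-reads the table ordered by primary key, resuming after the last key already sent and never going past the maximum key captured when the table's snapshot started. A toy, in-memory rendering of that keyset pagination with a single integer key (the real query generalizes the resume condition to composite keys via the >=/NOT(=) predicates built in buildChunkQuery):

import java.util.ArrayList;
import java.util.List;

public class KeysetPaginationSketch {
    public static void main(String[] args) {
        final int chunkSize = 10;   // incremental.snapshot.chunk.size analog
        final int maximumKey = 95;  // captured once, when the table's snapshot starts
        Integer lastKey = null;     // chunkEndPosition analog; null means initial chunk

        while (true) {
            // Stands in for: SELECT * FROM t WHERE pk > lastKey AND pk <= maximumKey
            //                ORDER BY pk LIMIT chunkSize
            List<Integer> chunk = new ArrayList<>();
            for (int pk = (lastKey == null ? 1 : lastKey + 1); pk <= maximumKey && chunk.size() < chunkSize; pk++) {
                chunk.add(pk);
            }
            if (chunk.isEmpty()) {
                break; // table finished; the real code moves on to the next data collection
            }
            System.out.println("chunk " + chunk.get(0) + ".." + chunk.get(chunk.size() - 1));
            lastKey = chunk.get(chunk.size() - 1); // nextChunk(keyFromRow(lastRow)) analog
        }
    }
}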