From def7379a0df65813666dcea2117e959d7e401e0c Mon Sep 17 00:00:00 2001
From: mfvitale
Date: Wed, 19 Jul 2023 14:42:30 +0200
Subject: [PATCH] DBZ-6566 Support Blocking snapshot for MySQL

---
 debezium-connector-mysql/pom.xml              |   6 +
 .../mysql/MySqlChangeEventSourceFactory.java  |   6 +-
 .../mysql/MySqlSnapshotChangeEventSource.java |  11 +-
 .../MySqlStreamingChangeEventSource.java      |   6 +
 .../connector/mysql/BlockingSnapshotIT.java   | 149 ++++++++
 .../resources/ddl/blocking_snapshot-test.sql  |  16 +
 .../src/test/resources/logback-test.xml       |  13 +
 debezium-connector-postgres/pom.xml           |   6 +
 .../PostgresStreamingChangeEventSource.java   |   1 -
 .../postgresql/BlockingSnapshotIT.java        |  85 +----
 .../connector/base/ChangeEventQueue.java      |   9 +-
 .../ChangeEventSourceCoordinator.java         |   2 -
 .../io/debezium/pipeline/EventDispatcher.java |   6 +
 .../RelationalSnapshotChangeEventSource.java  |  11 +-
 .../junit/logging/LogInterceptor.java         |  11 +
 .../AbstractBlockingSnapshotTest.java         | 240 +++++++++++++
 .../AbstractIncrementalSnapshotTest.java      | 309 +---------
 .../incremental/AbstractSnapshotTest.java     | 332 ++++++++++++++++++
 18 files changed, 822 insertions(+), 397 deletions(-)
 create mode 100644 debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BlockingSnapshotIT.java
 create mode 100644 debezium-connector-mysql/src/test/resources/ddl/blocking_snapshot-test.sql
 create mode 100644 debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java
 create mode 100644 debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractSnapshotTest.java

diff --git a/debezium-connector-mysql/pom.xml b/debezium-connector-mysql/pom.xml
index aa34f1149..2f865e537 100644
--- a/debezium-connector-mysql/pom.xml
+++ b/debezium-connector-mysql/pom.xml
@@ -105,6 +105,12 @@
             <groupId>io.apicurio</groupId>
             <artifactId>apicurio-registry-utils-converter</artifactId>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-jboss-logmanager</artifactId>
+                    <groupId>org.jboss.slf4j</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.awaitility</groupId>
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeEventSourceFactory.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeEventSourceFactory.java
index e10fb1fb8..fe397f164 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeEventSourceFactory.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeEventSourceFactory.java
@@ -63,7 +63,11 @@ public MySqlChangeEventSourceFactory(MySqlConnectorConfig configuration, MainCon
     public SnapshotChangeEventSource<MySqlPartition, MySqlOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener<MySqlPartition> snapshotProgressListener,
                                                                                                       NotificationService<MySqlPartition, MySqlOffsetContext> notificationService) {
         return new MySqlSnapshotChangeEventSource(configuration, connectionFactory, taskContext.getSchema(), dispatcher, clock,
-                (MySqlSnapshotChangeEventSourceMetrics) snapshotProgressListener, this::modifyAndFlushLastRecord, notificationService);
+                (MySqlSnapshotChangeEventSourceMetrics) snapshotProgressListener, this::modifyAndFlushLastRecord, this::preSnapshot, notificationService);
+    }
+
+    private void preSnapshot() {
+        queue.enableBuffering();
     }
 
     private void modifyAndFlushLastRecord(Function<SourceRecord, SourceRecord> modify) throws InterruptedException {
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java
index b9950fa09..d9f963327 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java
@@ -67,11 +67,13 @@ public class MySqlSnapshotChangeEventSource extends RelationalSnapshotChangeEven
     private final List<SchemaChangeEvent> schemaEvents = new ArrayList<>();
     private Set<TableId> delayedSchemaSnapshotTables = Collections.emptySet();
     private final BlockingConsumer<Function<SourceRecord, SourceRecord>> lastEventProcessor;
+    private final Runnable preSnapshotAction;
 
     public MySqlSnapshotChangeEventSource(MySqlConnectorConfig connectorConfig, MainConnectionProvidingConnectionFactory<MySqlConnection> connectionFactory,
                                           MySqlDatabaseSchema schema, EventDispatcher<MySqlPartition, TableId> dispatcher, Clock clock,
                                           MySqlSnapshotChangeEventSourceMetrics metrics,
                                           BlockingConsumer<Function<SourceRecord, SourceRecord>> lastEventProcessor,
+                                          Runnable preSnapshotAction,
                                           NotificationService<MySqlPartition, MySqlOffsetContext> notificationService) {
         super(connectorConfig, connectionFactory, schema, dispatcher, clock, metrics, notificationService);
         this.connectorConfig = connectorConfig;
@@ -80,6 +82,7 @@ public MySqlSnapshotChangeEventSource(MySqlConnectorConfig connectorConfig, Main
         this.metrics = metrics;
         this.databaseSchema = schema;
         this.lastEventProcessor = lastEventProcessor;
+        this.preSnapshotAction = preSnapshotAction;
     }
 
     @Override
@@ -88,7 +91,7 @@ protected SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOf
         boolean snapshotData = true;
 
         // found a previous offset and the earlier snapshot has completed
-        if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
+        if (previousOffset != null && !previousOffset.isSnapshotRunning() && false /* TODO check if streaming is paused */) {
             LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
             snapshotSchema = databaseSchema.isStorageInitializationExecuted();
             snapshotData = false;
@@ -652,6 +655,12 @@ protected void postSnapshot() throws InterruptedException {
         super.postSnapshot();
     }
 
+    @Override
+    protected void preSnapshot() throws InterruptedException {
+        preSnapshotAction.run();
+        super.preSnapshot();
+    }
+
     @Override
     protected MySqlOffsetContext copyOffset(RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext) {
         return new Loader(connectorConfig).load(snapshotContext.offset.getOffset());
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
index c3bc23d4d..de4bf33ca 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
@@ -1038,6 +1038,12 @@ public void execute(ChangeEventSourceContext context, MySqlPartition partition,
             }
             while (context.isRunning()) {
                 Thread.sleep(100);
+                if (context.isPaused()) {
+                    LOGGER.info("Streaming will now pause");
+                    context.streamingPaused();
+                    context.waitSnapshotCompletion();
+                    LOGGER.info("Streaming resumed");
+                }
             }
         }
         finally {
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BlockingSnapshotIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BlockingSnapshotIT.java
new file mode 100644
index 000000000..f1d8b658d
--- /dev/null
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BlockingSnapshotIT.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright Debezium Authors.
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.mysql; + +import java.sql.SQLException; +import java.util.List; + +import org.junit.After; +import org.junit.Before; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.config.Configuration; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.pipeline.AbstractBlockingSnapshotTest; +import io.debezium.relational.TableId; +import io.debezium.relational.history.SchemaHistory; +import io.debezium.util.Testing; + +public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest { + + protected static final String SERVER_NAME = "is_test"; + protected final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "blocking_snapshot-test").withDbHistoryPath(SCHEMA_HISTORY_PATH); + + @Before + public void before() throws SQLException { + stopConnector(); + DATABASE.createAndInitialize(); + initializeConnectorTestFramework(); + Testing.Files.delete(SCHEMA_HISTORY_PATH); + } + + @After + public void after() { + try { + stopConnector(); + } + finally { + Testing.Files.delete(SCHEMA_HISTORY_PATH); + } + } + + protected Configuration.Builder config() { + return DATABASE.defaultConfig() + .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true) + .with(MySqlConnectorConfig.USER, "mysqluser") + .with(MySqlConnectorConfig.PASSWORD, "mysqlpw") + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY.getValue()) + .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) + .with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, DATABASE.qualifiedTableName("debezium_signal")) + .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 1); + } + + @Override + protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) { + final String tableIncludeList; + if (signalTableOnly) { + tableIncludeList = DATABASE.qualifiedTableName("c"); + } + else { + tableIncludeList = DATABASE.qualifiedTableName("a") + ", " + DATABASE.qualifiedTableName("c"); + } + return DATABASE.defaultConfig() + .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true) + .with(MySqlConnectorConfig.USER, "mysqluser") + .with(MySqlConnectorConfig.PASSWORD, "mysqlpw") + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL.getValue()) + .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) + .with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, DATABASE.qualifiedTableName("debezium_signal")) + .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5) + .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList) + .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl) + .with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO); + } + + @Override + protected String connector() { + return "mysql"; + } + + @Override + protected String server() { + return DATABASE.getServerName(); + } + + @Override + protected Class connectorClass() { + return MySqlConnector.class; + } + + @Override + protected JdbcConnection databaseConnection() { + return MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()); + } + + @Override + protected String topicName() { + return DATABASE.topicForTable("a"); + } + + @Override + protected List topicNames() { + return List.of(DATABASE.topicForTable("a"), DATABASE.topicForTable("c")); + } + + @Override + protected String tableName() { + return tableNameId().toQuotedString('`'); 
+ } + + @Override + protected List tableNames() { + final String tableA = TableId.parse(DATABASE.qualifiedTableName("a")).toQuotedString('`'); + final String tableB = TableId.parse(DATABASE.qualifiedTableName("c")).toQuotedString('`'); + return List.of(tableA, tableB); + } + + @Override + protected String signalTableName() { + return tableNameId("debezium_signal").toQuotedString('`'); + } + + @Override + protected String signalTableNameSanitized() { + return DATABASE.qualifiedTableName("debezium_signal"); + } + + @Override + protected String tableDataCollectionId() { + return tableNameId().toString(); + } + + @Override + protected List tableDataCollectionIds() { + return List.of(tableNameId().toString(), tableNameId("c").toString()); + } + + private TableId tableNameId() { + return tableNameId("a"); + } + + private TableId tableNameId(String table) { + return TableId.parse(DATABASE.qualifiedTableName(table)); + } + +} diff --git a/debezium-connector-mysql/src/test/resources/ddl/blocking_snapshot-test.sql b/debezium-connector-mysql/src/test/resources/ddl/blocking_snapshot-test.sql new file mode 100644 index 000000000..fcb9906ea --- /dev/null +++ b/debezium-connector-mysql/src/test/resources/ddl/blocking_snapshot-test.sql @@ -0,0 +1,16 @@ +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: incremental_snapshot_test +-- ---------------------------------------------------------------------------------------------------------------- + +CREATE TABLE a ( + pk INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + aa INTEGER +) AUTO_INCREMENT = 1; + +CREATE TABLE debezium_signal ( + id varchar(64), + type varchar(32), + data varchar(2048) +); + +CREATE DATABASE IF NOT EXISTS emptydb; diff --git a/debezium-connector-mysql/src/test/resources/logback-test.xml b/debezium-connector-mysql/src/test/resources/logback-test.xml index a74cd6883..5273d3656 100644 --- a/debezium-connector-mysql/src/test/resources/logback-test.xml +++ b/debezium-connector-mysql/src/test/resources/logback-test.xml @@ -26,4 +26,17 @@ level="error" additivity="false"> + + + + + + + + + diff --git a/debezium-connector-postgres/pom.xml b/debezium-connector-postgres/pom.xml index 9a49ceea1..bc4741749 100644 --- a/debezium-connector-postgres/pom.xml +++ b/debezium-connector-postgres/pom.xml @@ -138,6 +138,12 @@ io.apicurio apicurio-registry-utils-converter test + + + slf4j-jboss-logmanager + org.jboss.slf4j + + diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index 33adbf1c9..16741c48b 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -244,7 +244,6 @@ private void processMessages(ChangeEventSourceContext context, PostgresPartition connection.commit(); } - // LOGGER.info("Checking stream paused"); if (context.isPaused()) { LOGGER.info("Streaming will now pause"); context.streamingPaused(); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/BlockingSnapshotIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/BlockingSnapshotIT.java index ef37d9072..405bfd24e 100644 --- 
a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/BlockingSnapshotIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/BlockingSnapshotIT.java @@ -6,28 +6,20 @@ package io.debezium.connector.postgresql; -import static io.debezium.pipeline.signal.actions.AbstractSnapshotSignal.SnapshotType.INITIAL_BLOCKING; -import static org.assertj.core.api.Assertions.assertThat; - import java.sql.SQLException; import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import org.apache.kafka.connect.data.Struct; import org.junit.After; import org.junit.Before; -import org.junit.Test; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode; import io.debezium.jdbc.JdbcConnection; -import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest; +import io.debezium.pipeline.AbstractBlockingSnapshotTest; import io.debezium.relational.RelationalDatabaseConnectorConfig; -public class BlockingSnapshotIT extends AbstractIncrementalSnapshotTest { +public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest { private static final String TOPIC_NAME = "test_server.s1.a"; @@ -64,9 +56,7 @@ protected Configuration.Builder config() { .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1") .with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source") .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5) - .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "s1.a42:pk1,pk2,pk3,pk4") - // DBZ-4272 required to allow dropping columns just before an incremental snapshot - .with("database.autosave", "conservative"); + .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "s1.a42:pk1,pk2,pk3,pk4"); } @Override @@ -79,9 +69,7 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5) .with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10) .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1") - .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.a") - // DBZ-4272 required to allow dropping columns just before an incremental snapshot - .with("database.autosave", "conservative"); + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.a"); } @Override @@ -119,12 +107,6 @@ protected String signalTableName() { return "s1.debezium_signal"; } - @Override - protected void waitForConnectorToStart() { - super.waitForConnectorToStart(); - TestHelper.waitForDefaultReplicationSlotBeActive(); - } - @Override protected String connector() { return "postgres"; @@ -135,63 +117,4 @@ protected String server() { return TestHelper.TEST_SERVER; } - @Test - public void executeBlockingSnapshot() throws Exception { - // Testing.Print.enable(); - - populateTable(); - - startConnectorWithSnapshot(x -> mutableConfig(false, false)); - - waitForSnapshotToBeCompleted(connector(), server(), task(), database()); - - try (JdbcConnection connection = databaseConnection()) { - connection.setAutoCommit(false); - for (int i = 0; i < ROW_COUNT; i++) { - connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", - tableName(), - connection.quotedColumnIdString(pkFieldName()), - i + ROW_COUNT + 1, - i + ROW_COUNT)); - } - connection.commit(); - } - - SourceRecords snapshotAndStreamingRecords = consumeRecordsByTopic(ROW_COUNT * 2); - 
assertThat(snapshotAndStreamingRecords.allRecordsInOrder().size()).isEqualTo(ROW_COUNT * 2);
-        List<Integer> actual = snapshotAndStreamingRecords.recordsForTopic(topicName()).stream()
-                .map(s -> ((Struct) s.value()).getStruct("after").getInt32("aa"))
-                .collect(Collectors.toList());
-        assertThat(actual).containsAll(IntStream.range(0, 1999).boxed().collect(Collectors.toList()));
-
-        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), INITIAL_BLOCKING, tableDataCollectionId());
-
-        waitForSnapshotToBeCompleted(connector(), server(), task(), database());
-
-        snapshotAndStreamingRecords = consumeRecordsByTopic((ROW_COUNT * 2) + 1);
-        assertThat(snapshotAndStreamingRecords.allRecordsInOrder().size()).isEqualTo((ROW_COUNT * 2) + 1);
-        actual = snapshotAndStreamingRecords.recordsForTopic(topicName()).stream()
-                .map(s -> ((Struct) s.value()).getStruct("after").getInt32("aa"))
-                .collect(Collectors.toList());
-        assertThat(actual).containsAll(IntStream.range(0, 1999).boxed().collect(Collectors.toList()));
-
-        try (JdbcConnection connection = databaseConnection()) {
-            connection.setAutoCommit(false);
-            for (int i = 0; i < ROW_COUNT; i++) {
-                connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
-                        tableName(),
-                        connection.quotedColumnIdString(pkFieldName()),
-                        i + (ROW_COUNT * 2) + 1,
-                        i + (ROW_COUNT * 2)));
-            }
-            connection.commit();
-        }
-
-        snapshotAndStreamingRecords = consumeRecordsByTopic(ROW_COUNT + 1);
-        assertThat(snapshotAndStreamingRecords.allRecordsInOrder().size()).isEqualTo(ROW_COUNT + 1);
-        actual = snapshotAndStreamingRecords.recordsForTopic(topicName()).stream()
-                .map(s -> ((Struct) s.value()).getStruct("after").getInt32("aa"))
-                .collect(Collectors.toList());
-        assertThat(actual).containsAll(IntStream.range(2000, 2999).boxed().collect(Collectors.toList()));
-    }
 }
diff --git a/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java b/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java
index 3de81d2e3..984e70a9e 100644
--- a/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java
+++ b/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java
@@ -187,7 +187,7 @@ record = bufferedEvent.getAndSet(record);
      * @throws InterruptedException
      */
     public void flushBuffer(Function<T, T> recordModifier) throws InterruptedException {
-        assert buffering : "Unsuported for queues with disabled buffering";
+        assert buffering : "Unsupported for queues with disabled buffering";
         T record = bufferedEvent.getAndSet(null);
         if (record != null) {
             doEnqueue(recordModifier.apply(record));
         }
@@ -202,6 +202,13 @@ public void disableBuffering() {
         buffering = false;
     }
 
+    /**
+     * Enable buffering for the queue
+     */
+    public void enableBuffering() {
+        buffering = true;
+    }
+
     protected void doEnqueue(T record) throws InterruptedException {
         if (LOGGER.isTraceEnabled()) {
             LOGGER.trace("Enqueuing source record '{}'", record);
diff --git a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java
index c127bede6..ea6c2e17b 100644
--- a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java
+++ b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java
@@ -11,7 +11,6 @@
 import java.util.Optional;
 import java.util.ServiceLoader;
 import java.util.concurrent.ExecutorService;
-import 
java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; @@ -260,7 +259,6 @@ protected CatchUpStreamingResult executeCatchUpStreaming(ChangeEventSourceContex protected void streamEvents(ChangeEventSourceContext context, P partition, O offsetContext) throws InterruptedException { initStreamEvents(partition, offsetContext); LOGGER.info("Starting streaming"); - // Maybe add a pause and restart method that should be called from the action through the coordinator streamingSource.execute(context, partition, offsetContext); LOGGER.info("Finished streaming"); } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java index 1d0ea291d..e11cb40df 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java @@ -214,6 +214,9 @@ public void changeRecord(P partition, OffsetContext offset, ConnectHeaders headers) throws InterruptedException { + + LOGGER.trace("Received change record {} for {} operation on key {} with context {}", value, operation, key, offset); + eventListener.onEvent(partition, dataCollectionSchema.id(), offset, key, value, operation); receiver.changeRecord(partition, dataCollectionSchema, operation, key, value, offset, headers); } @@ -268,6 +271,9 @@ public void changeRecord(P partition, OffsetContext offset, ConnectHeaders headers) throws InterruptedException { + + LOGGER.trace("Received change record {} for {} operation on key {} with context {}", value, operation, key, offset); + if (operation == Operation.CREATE && connectorConfig.isSignalDataCollection(dataCollectionId) && sourceSignalChannel != null) { sourceSignalChannel.process(value); diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java index 30e720a13..02ea4ef00 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java @@ -106,8 +106,11 @@ public SnapshotResult doExecute(ChangeEventSourceContext context, O previousO final RelationalSnapshotContext ctx = (RelationalSnapshotContext) snapshotContext; Connection connection = null; - Exception exceptionWhileSnapshot = null; + Throwable exceptionWhileSnapshot = null; try { + + preSnapshot(); + LOGGER.info("Snapshot step 1 - Preparing"); if (previousOffset != null && previousOffset.isSnapshotRunning()) { @@ -165,7 +168,7 @@ public SnapshotResult doExecute(ChangeEventSourceContext context, O previousO dispatcher.alwaysDispatchHeartbeatEvent(ctx.partition, ctx.offset); return SnapshotResult.completed(ctx.offset); } - catch (final Exception e) { + catch (final Throwable e) { LOGGER.error("Error during snapshot", e); exceptionWhileSnapshot = e; throw e; @@ -749,4 +752,8 @@ protected Clock getClock() { protected void postSnapshot() throws InterruptedException { } + + protected void preSnapshot() throws InterruptedException { + + } } diff --git a/debezium-core/src/test/java/io/debezium/junit/logging/LogInterceptor.java b/debezium-core/src/test/java/io/debezium/junit/logging/LogInterceptor.java index 009be2f3e..06d1e6ed4 100644 --- 
a/debezium-core/src/test/java/io/debezium/junit/logging/LogInterceptor.java +++ b/debezium-core/src/test/java/io/debezium/junit/logging/LogInterceptor.java @@ -7,6 +7,7 @@ import static org.slf4j.Logger.ROOT_LOGGER_NAME; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; @@ -95,6 +96,16 @@ public boolean containsMessage(String text) { return false; } + public List getLoggingEvents(String text) { + List matchEvents = new ArrayList<>(); + for (ILoggingEvent event : events) { + if (event.getFormattedMessage().toString().contains(text)) { + matchEvents.add(event); + } + } + return matchEvents; + } + public boolean containsWarnMessage(String text) { return containsMessage(Level.WARN, text); } diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java new file mode 100644 index 000000000..0f4be1771 --- /dev/null +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java @@ -0,0 +1,240 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.pipeline; + +import static io.debezium.pipeline.signal.actions.AbstractSnapshotSignal.SnapshotType.INITIAL_BLOCKING; +import static org.assertj.core.api.Assertions.assertThat; + +import java.lang.management.ManagementFactory; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import javax.management.AttributeNotFoundException; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ReflectionException; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.TabularDataSupport; + +import org.apache.kafka.connect.data.Struct; +import org.awaitility.Awaitility; +import org.junit.Test; + +import io.debezium.config.Configuration; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.junit.logging.LogInterceptor; +import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource; +import io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest; +import io.debezium.util.Testing; + +public abstract class AbstractBlockingSnapshotTest extends AbstractSnapshotTest { + private int signalingRecords; + + protected static final int ROW_COUNT = 1000; + + protected abstract Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl); + + protected abstract JdbcConnection databaseConnection(); + + @Override + protected abstract String topicName(); + + @Override + protected abstract String tableName(); + + @Override + protected abstract String connector(); + + @Override + protected abstract String server(); + + @Test + public void executeBlockingSnapshot() throws Exception { + // Testing.Print.enable(); + + populateTable(); + + startConnectorWithSnapshot(x -> mutableConfig(false, false)); + + waitForSnapshotToBeCompleted(connector(), server(), 
task(), database());
+
+        insertRecords(ROW_COUNT, ROW_COUNT);
+
+        assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2);
+
+        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), INITIAL_BLOCKING, tableDataCollectionId());
+
+        waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
+
+        assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2);
+
+        insertRecords(ROW_COUNT, (ROW_COUNT * 2));
+
+        signalingRecords = 1;
+
+        assertStreamingRecordsArePresent(ROW_COUNT + signalingRecords);
+
+    }
+
+    @Test
+    public void executeBlockingSnapshotWhileStreaming() throws Exception {
+        // Testing.Debug.enable();
+
+        populateTable();
+
+        startConnectorWithSnapshot(x -> mutableConfig(false, false));
+
+        waitForSnapshotToBeCompleted(connector(), server(), task(), database());
+
+        Future<?> batchInserts = executeAsync(insertTask());
+
+        Thread.sleep(2000); // Let the connector stream some inserts first
+
+        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), INITIAL_BLOCKING, tableDataCollectionId());
+
+        waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
+
+        waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
+
+        Long totalSnapshotRecords = getTotalSnapshotRecords(tableName(), connector(), server(), task(), database());
+
+        batchInserts.get(120, TimeUnit.SECONDS);
+
+        insertRecords(ROW_COUNT, (ROW_COUNT * 2));
+
+        signalingRecords = 1 + // from streaming
+                1; // from snapshot
+
+        assertRecordsWithValuesPresent((int) ((ROW_COUNT * 3) + totalSnapshotRecords + signalingRecords),
+                AbstractBlockingSnapshotTest.getExpectedValues(totalSnapshotRecords));
+    }
+
+    private Runnable insertTask() {
+        return () -> {
+            try {
+                insertRecordsWithRandomSleep(ROW_COUNT, ROW_COUNT, 2);
+            }
+            catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    private Long getTotalSnapshotRecords(String table, String connector, String server, String task, String database) throws MalformedObjectNameException,
+            ReflectionException, AttributeNotFoundException, InstanceNotFoundException,
+            MBeanException {
+
+        final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+
+        TabularDataSupport rowsScanned = (TabularDataSupport) mbeanServer.getAttribute(getSnapshotMetricsObjectName(connector, server, task, database),
+                "RowsScanned");
+
+        Map<String, Object> scannedRowsByTable = rowsScanned.values().stream().map(c -> ((CompositeDataSupport) c))
+                .collect(Collectors.toMap(compositeDataSupport -> compositeDataSupport.get("key").toString(), compositeDataSupport -> compositeDataSupport.get("value")));
+
+        String unquotedTableName = table.replace("`", "");
+        return (Long) scannedRowsByTable.get(unquotedTableName);
+    }
+
+    private static List<Integer> getExpectedValues(Long totalSnapshotRecords) {
+
+        List<Integer> initialSnapShotValues = IntStream.rangeClosed(0, 999).boxed().collect(Collectors.toList());
+        List<Integer> firstStreamingBatchValues = IntStream.rangeClosed(1000, 1999).boxed().collect(Collectors.toList());
+        List<Integer> blockingSnapshotValues = Stream.of(
+                initialSnapShotValues,
+                IntStream.rangeClosed(1000, Math.toIntExact(totalSnapshotRecords)).boxed().collect(Collectors.toList())).flatMap(List::stream)
+                .collect(Collectors.toList());
+        List<Integer> secondStreamingBatchValues = IntStream.rangeClosed(2000, 2999).boxed().collect(Collectors.toList());
+        return Stream.of(initialSnapShotValues, firstStreamingBatchValues, blockingSnapshotValues,
secondStreamingBatchValues).flatMap(List::stream) + .collect(Collectors.toList()); + } + + private static void waitForLogMessage(String message, Class logEmitterClass) { + LogInterceptor interceptor = new LogInterceptor(logEmitterClass); + Awaitility.await() + .alias("Snapshot not completed on time") + .pollInterval(100, TimeUnit.MILLISECONDS) + .atMost(waitTimeForRecords() * 30L, TimeUnit.SECONDS) + .until(() -> interceptor.containsMessage(message)); + } + + private Future executeAsync(Runnable operation) { + return Executors.newSingleThreadExecutor().submit(operation); + } + + private void assertStreamingRecordsArePresent(int expectedRecords) throws InterruptedException { + + assertRecordsWithValuesPresent(expectedRecords, IntStream.range(2000, 2999).boxed().collect(Collectors.toList())); + } + + private void assertRecordsFromSnapshotAndStreamingArePresent(int expectedRecords) throws InterruptedException { + + assertRecordsWithValuesPresent(expectedRecords, IntStream.range(0, expectedRecords - 1).boxed().collect(Collectors.toList())); + } + + private void assertRecordsWithValuesPresent(int expectedRecords, List expectedValues) throws InterruptedException { + + SourceRecords snapshotAndStreamingRecords = consumeRecordsByTopic(expectedRecords, 10); + assertThat(snapshotAndStreamingRecords.allRecordsInOrder().size()).isEqualTo(expectedRecords); + List actual = snapshotAndStreamingRecords.recordsForTopic(topicName()).stream() + .map(s -> ((Struct) s.value()).getStruct("after").getInt32("aa")) + .collect(Collectors.toList()); + assertThat(actual).containsAll(expectedValues); + } + + private void insertRecords(int rowCount, int startingPkId) throws SQLException { + + try (JdbcConnection connection = databaseConnection()) { + connection.setAutoCommit(false); + for (int i = 0; i < rowCount; i++) { + connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", + tableName(), + connection.quotedColumnIdString(pkFieldName()), + i + startingPkId + 1, + i + startingPkId)); + } + connection.commit(); + } + } + + private void insertRecordsWithRandomSleep(int rowCount, int startingPkId, int maxSleep, Runnable actionOnInsert) throws SQLException { + + try (JdbcConnection connection = databaseConnection()) { + connection.setAutoCommit(true); + for (int i = 0; i < rowCount; i++) { + connection.execute(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", + tableName(), + connection.quotedColumnIdString(pkFieldName()), + i + startingPkId + 1, + i + startingPkId)); + actionOnInsert.run(); + int sleepTime = ThreadLocalRandom.current().nextInt(1, maxSleep); + Thread.sleep(sleepTime); + } + Testing.debug(String.format("Insert of %s records completed", rowCount)); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void insertRecordsWithRandomSleep(int rowCount, int startingPkId, int maxSleep) throws SQLException { + + insertRecordsWithRandomSleep(rowCount, startingPkId, maxSleep, () -> { + }); + } +} diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java index 9ddbdd3a7..59292a770 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java @@ -10,11 
+10,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.nio.file.Path; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -23,9 +21,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiPredicate; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.kafka.clients.producer.KafkaProducer; @@ -42,8 +37,6 @@ import io.debezium.config.Configuration; import io.debezium.data.Envelope; import io.debezium.doc.FixFor; -import io.debezium.embedded.AbstractConnectorTest; -import io.debezium.engine.DebeziumEngine; import io.debezium.jdbc.JdbcConnection; import io.debezium.junit.EqualityCheck; import io.debezium.junit.SkipWhenConnectorUnderTest; @@ -51,275 +44,16 @@ import io.debezium.junit.logging.LogInterceptor; import io.debezium.kafka.KafkaCluster; import io.debezium.pipeline.notification.channels.SinkNotificationChannel; -import io.debezium.pipeline.signal.actions.AbstractSnapshotSignal; import io.debezium.pipeline.signal.actions.snapshotting.StopSnapshot; -import io.debezium.util.Testing; -public abstract class AbstractIncrementalSnapshotTest extends AbstractConnectorTest { - - protected static final int ROW_COUNT = 1_000; - private static final int MAXIMUM_NO_RECORDS_CONSUMES = 5; - - protected static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-is.txt") - .toAbsolutePath(); - private static final int PARTITION_NO = 0; - private static final String SERVER_NAME = "test_server"; +public abstract class AbstractIncrementalSnapshotTest extends AbstractSnapshotTest { protected static KafkaCluster kafka; - protected abstract Class connectorClass(); - - protected abstract JdbcConnection databaseConnection(); - - protected abstract String topicName(); - - protected abstract String tableName(); - - protected abstract List topicNames(); - - protected abstract List tableNames(); - - protected abstract String signalTableName(); - - protected String signalTableNameSanitized() { - return signalTableName(); - } - - protected abstract Configuration.Builder config(); - - protected abstract Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl); - - protected abstract String connector(); - - protected abstract String server(); - - protected String task() { - return null; - } - - protected String database() { - return null; - } - - protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception { - } - - protected String alterTableAddColumnStatement(String tableName) { - return "ALTER TABLE " + tableName + " add col3 int default 0"; - } - - protected String alterTableDropColumnStatement(String tableName) { - return "ALTER TABLE " + tableName + " drop column col3"; - } - - protected String tableDataCollectionId() { - return tableName(); - } - - protected List tableDataCollectionIds() { - return tableNames(); - } - - protected void populateTable(JdbcConnection connection, String tableName) throws SQLException { - connection.setAutoCommit(false); - for (int i = 0; i < ROW_COUNT; i++) { - connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", - tableName, connection.quotedColumnIdString(pkFieldName()), i + 
1, i)); - } - connection.commit(); - } - - protected void populateTable(JdbcConnection connection) throws SQLException { - populateTable(connection, tableName()); - } - - protected void populateTables(JdbcConnection connection) throws SQLException { - for (String tableName : tableNames()) { - populateTable(connection, tableName); - } - } - - protected void populateTable() throws SQLException { - try (JdbcConnection connection = databaseConnection()) { - populateTable(connection); - } - } - - protected void populateTableWithSpecificValue(int startRow, int count, int value) throws SQLException { - try (JdbcConnection connection = databaseConnection()) { - populateTableWithSpecificValue(connection, tableName(), startRow, count, value); - } - } - - private void populateTableWithSpecificValue(JdbcConnection connection, String tableName, int startRow, int count, int value) - throws SQLException { - connection.setAutoCommit(false); - for (int i = startRow + 1; i <= startRow + count; i++) { - connection.executeWithoutCommitting( - String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", - tableName, connection.quotedColumnIdString(pkFieldName()), count + i, value)); - } - connection.commit(); - } - - protected void populateTables() throws SQLException { - try (JdbcConnection connection = databaseConnection()) { - populateTables(connection); - } - } - - protected void populate4PkTable(JdbcConnection connection, String tableName) throws SQLException { - connection.setAutoCommit(false); - for (int i = 0; i < ROW_COUNT; i++) { - final int id = i + 1; - final int pk1 = id / 1000; - final int pk2 = (id / 100) % 10; - final int pk3 = (id / 10) % 10; - final int pk4 = id % 10; - connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk1, pk2, pk3, pk4, aa) VALUES (%s, %s, %s, %s, %s)", - tableName, - pk1, - pk2, - pk3, - pk4, - i)); - } - connection.commit(); - } - - protected Map consumeMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException { - return consumeMixedWithIncrementalSnapshot(recordCount, topicName()); - } - - protected Map consumeMixedWithIncrementalSnapshot(int recordCount, String topicName) throws InterruptedException { - return consumeMixedWithIncrementalSnapshot(recordCount, record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()), x -> true, null, - topicName); - } - - protected Map consumeMixedWithIncrementalSnapshot(int recordCount, Function valueConverter, - Predicate> dataCompleted, - Consumer> recordConsumer, - String topicName) - throws InterruptedException { - return consumeMixedWithIncrementalSnapshot(recordCount, dataCompleted, k -> k.getInt32(pkFieldName()), valueConverter, topicName, recordConsumer); - } - - protected Map consumeMixedWithIncrementalSnapshot(int recordCount, - Predicate> dataCompleted, - Function idCalculator, - Function valueConverter, - String topicName, - Consumer> recordConsumer) - throws InterruptedException { - final Map dbChanges = new HashMap<>(); - int noRecords = 0; - for (;;) { - final SourceRecords records = consumeRecordsByTopic(1); - final List dataRecords = records.recordsForTopic(topicName); - if (records.allRecordsInOrder().isEmpty()) { - noRecords++; - assertThat(noRecords).describedAs(String.format("Too many no data record results, %d < %d", dbChanges.size(), recordCount)) - .isLessThanOrEqualTo(MAXIMUM_NO_RECORDS_CONSUMES); - continue; - } - noRecords = 0; - if (dataRecords == null || dataRecords.isEmpty()) { - continue; - } - dataRecords.forEach(record -> { - final int id = 
idCalculator.apply((Struct) record.key()); - final V value = valueConverter.apply(record); - dbChanges.put(id, value); - }); - if (recordConsumer != null) { - recordConsumer.accept(dataRecords); - } - if (dbChanges.size() >= recordCount) { - if (!dbChanges.entrySet().stream().anyMatch(dataCompleted.negate())) { - break; - } - } - } - - assertThat(dbChanges).hasSize(recordCount); - return dbChanges; - } - - protected Map consumeRecordsMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException { - return consumeMixedWithIncrementalSnapshot(recordCount, Function.identity(), x -> true, null, topicName()); - } - - protected Map consumeMixedWithIncrementalSnapshot(int recordCount, Predicate> dataCompleted, - Consumer> recordConsumer) - throws InterruptedException { - return consumeMixedWithIncrementalSnapshot(recordCount, record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()), dataCompleted, - recordConsumer, topicName()); - } - - protected Map consumeRecordsMixedWithIncrementalSnapshot(int recordCount, Predicate> dataCompleted, - Consumer> recordConsumer) - throws InterruptedException { - return consumeMixedWithIncrementalSnapshot(recordCount, Function.identity(), dataCompleted, recordConsumer, topicName()); - } - - protected String valueFieldName() { - return "aa"; - } - - protected String pkFieldName() { - return "pk"; - } - protected String getSignalTypeFieldName() { return "type"; } - protected void sendAdHocSnapshotSignal(String... dataCollectionIds) throws SQLException { - sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), dataCollectionIds); - } - - protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional additionalCondition, Optional surrogateKey, - String... dataCollectionIds) { - sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(additionalCondition, surrogateKey, AbstractSnapshotSignal.SnapshotType.INCREMENTAL, - dataCollectionIds); - } - - protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional additionalCondition, Optional surrogateKey, - AbstractSnapshotSignal.SnapshotType snapshotType, - String... 
dataCollectionIds) { - final String dataCollectionIdsList = Arrays.stream(dataCollectionIds) - .map(x -> '"' + x + '"') - .collect(Collectors.joining(", ")); - try (JdbcConnection connection = databaseConnection()) { - String query; - if (additionalCondition.isPresent() && surrogateKey.isPresent()) { - query = String.format( - "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-condition\": %s, \"surrogate-key\": %s}')", - signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition.get(), surrogateKey.get()); - } - else if (additionalCondition.isPresent()) { - query = String.format( - "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-condition\": %s}')", - signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition.get()); - } - else if (surrogateKey.isPresent()) { - query = String.format( - "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"surrogate-key\": %s}')", - signalTableName(), snapshotType.toString(), dataCollectionIdsList, surrogateKey.get()); - } - else { - query = String.format( - "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s]}')", - signalTableName(), snapshotType.toString(), dataCollectionIdsList); - } - logger.info("Sending signal with query {}", query); - connection.execute(query); - } - catch (Exception e) { - logger.warn("Failed to send signal", e); - } - } - protected void sendAdHocSnapshotStopSignal(String... dataCollectionIds) throws SQLException { String collections = ""; if (dataCollectionIds.length > 0) { @@ -396,39 +130,6 @@ protected void sendResumeSignal() { } } - protected void startConnector(DebeziumEngine.CompletionCallback callback) { - startConnector(Function.identity(), callback, true); - } - - protected void startConnector(Function custConfig) { - startConnector(custConfig, loggingCompletion(), true); - } - - protected void startConnector(Function custConfig, - DebeziumEngine.CompletionCallback callback, boolean expectNoRecords) { - final Configuration config = custConfig.apply(config()).build(); - start(connectorClass(), config, callback); - waitForConnectorToStart(); - - waitForAvailableRecords(5, TimeUnit.SECONDS); - if (expectNoRecords) { - // there shouldn't be any snapshot records - assertNoRecordsToConsume(); - } - } - - protected void startConnectorWithSnapshot(Function custConfig) { - startConnector(custConfig, loggingCompletion(), false); - } - - protected void startConnector() { - startConnector(Function.identity(), loggingCompletion(), true); - } - - protected void waitForConnectorToStart() { - assertConnectorIsRunning(); - } - @Test public void snapshotOnly() throws Exception { // Testing.Print.enable(); @@ -1208,10 +909,6 @@ private void assertCorrectIncrementalSnapshotNotification(List not .containsEntry("total_rows_scanned", "1000"); } - private Function getRecordValue() { - return s -> s.getStruct("after").getInt32(valueFieldName()); - } - protected void sendAdHocSnapshotSignalAndWait(String... 
collectionIds) throws Exception {
         // Sends the adhoc snapshot signal and waits for the signal event to have been received
         if (collectionIds.length == 0) {
@@ -1271,8 +968,4 @@ protected boolean consumeAnyRemainingIncrementalSnapshotEventsAndCheckForStopMes
         return stopMessageFound.get();
     }
 
-    @Override
-    protected int getMaximumEnqueuedRecordCount() {
-        return ROW_COUNT * 3;
-    }
 }
diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractSnapshotTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractSnapshotTest.java
new file mode 100644
index 000000000..12d75d531
--- /dev/null
+++ b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractSnapshotTest.java
@@ -0,0 +1,332 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.pipeline.source.snapshot.incremental;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.file.Path;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import io.debezium.config.Configuration;
+import io.debezium.embedded.AbstractConnectorTest;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.pipeline.signal.actions.AbstractSnapshotSignal;
+import io.debezium.util.Testing;
+
+public abstract class AbstractSnapshotTest<T extends SourceConnector> extends AbstractConnectorTest {
+
+    protected static final int ROW_COUNT = 1000;
+    protected static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-is.txt")
+            .toAbsolutePath();
+    protected static final int PARTITION_NO = 0;
+    protected static final String SERVER_NAME = "test_server";
+    private static final int MAXIMUM_NO_RECORDS_CONSUMES = 5;
+
+    protected abstract Class<T> connectorClass();
+
+    protected abstract JdbcConnection databaseConnection();
+
+    protected abstract String topicName();
+
+    protected abstract String tableName();
+
+    protected abstract List<String> topicNames();
+
+    protected abstract List<String> tableNames();
+
+    protected abstract String signalTableName();
+
+    protected String signalTableNameSanitized() {
+        return signalTableName();
+    }
+
+    protected abstract Configuration.Builder config();
+
+    protected abstract Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl);
+
+    protected abstract String connector();
+
+    protected abstract String server();
+
+    protected String task() {
+        return null;
+    }
+
+    protected String database() {
+        return null;
+    }
+
+    protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception {
+    }
+
+    protected String alterTableAddColumnStatement(String tableName) {
+        return "ALTER TABLE " + tableName + " add col3 int default 0";
+    }
+
+    protected String alterTableDropColumnStatement(String tableName) {
+        return "ALTER TABLE " + tableName + " drop column col3";
+    }
+
+    protected String tableDataCollectionId() {
+        return tableName();
+    }
+
+    protected List<String> tableDataCollectionIds() {
return tableNames(); + } + + protected void populateTable(JdbcConnection connection, String tableName) throws SQLException { + connection.setAutoCommit(false); + for (int i = 0; i < ROW_COUNT; i++) { + connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", + tableName, connection.quotedColumnIdString(pkFieldName()), i + 1, i)); + } + connection.commit(); + } + + protected void populateTable(JdbcConnection connection) throws SQLException { + populateTable(connection, tableName()); + } + + protected void populateTables(JdbcConnection connection) throws SQLException { + for (String tableName : tableNames()) { + populateTable(connection, tableName); + } + } + + protected void populateTable() throws SQLException { + try (JdbcConnection connection = databaseConnection()) { + populateTable(connection); + } + } + + protected void populateTableWithSpecificValue(int startRow, int count, int value) throws SQLException { + try (JdbcConnection connection = databaseConnection()) { + populateTableWithSpecificValue(connection, tableName(), startRow, count, value); + } + } + + private void populateTableWithSpecificValue(JdbcConnection connection, String tableName, int startRow, int count, int value) + throws SQLException { + connection.setAutoCommit(false); + for (int i = startRow + 1; i <= startRow + count; i++) { + connection.executeWithoutCommitting( + String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", + tableName, connection.quotedColumnIdString(pkFieldName()), count + i, value)); + } + connection.commit(); + } + + protected void populateTables() throws SQLException { + try (JdbcConnection connection = databaseConnection()) { + populateTables(connection); + } + } + + protected void populate4PkTable(JdbcConnection connection, String tableName) throws SQLException { + connection.setAutoCommit(false); + for (int i = 0; i < ROW_COUNT; i++) { + final int id = i + 1; + final int pk1 = id / 1000; + final int pk2 = (id / 100) % 10; + final int pk3 = (id / 10) % 10; + final int pk4 = id % 10; + connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk1, pk2, pk3, pk4, aa) VALUES (%s, %s, %s, %s, %s)", + tableName, + pk1, + pk2, + pk3, + pk4, + i)); + } + connection.commit(); + } + + protected Map consumeMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException { + return consumeMixedWithIncrementalSnapshot(recordCount, topicName()); + } + + protected Map consumeMixedWithIncrementalSnapshot(int recordCount, String topicName) throws InterruptedException { + return consumeMixedWithIncrementalSnapshot(recordCount, record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()), x -> true, null, + topicName); + } + + protected Map consumeMixedWithIncrementalSnapshot(int recordCount, Function valueConverter, + Predicate> dataCompleted, + Consumer> recordConsumer, + String topicName) + throws InterruptedException { + return consumeMixedWithIncrementalSnapshot(recordCount, dataCompleted, k -> k.getInt32(pkFieldName()), valueConverter, topicName, recordConsumer); + } + + protected Map consumeMixedWithIncrementalSnapshot(int recordCount, + Predicate> dataCompleted, + Function idCalculator, + Function valueConverter, + String topicName, + Consumer> recordConsumer) + throws InterruptedException { + final Map dbChanges = new HashMap<>(); + int noRecords = 0; + for (;;) { + final SourceRecords records = consumeRecordsByTopic(1); + final List dataRecords = records.recordsForTopic(topicName); + if (records.allRecordsInOrder().isEmpty()) { 
+ noRecords++; + assertThat(noRecords).describedAs(String.format("Too many no data record results, %d < %d", dbChanges.size(), recordCount)) + .isLessThanOrEqualTo(MAXIMUM_NO_RECORDS_CONSUMES); + continue; + } + noRecords = 0; + if (dataRecords == null || dataRecords.isEmpty()) { + continue; + } + dataRecords.forEach(record -> { + final int id = idCalculator.apply((Struct) record.key()); + final V value = valueConverter.apply(record); + dbChanges.put(id, value); + }); + if (recordConsumer != null) { + recordConsumer.accept(dataRecords); + } + if (dbChanges.size() >= recordCount) { + if (!dbChanges.entrySet().stream().anyMatch(dataCompleted.negate())) { + break; + } + } + } + + assertThat(dbChanges).hasSize(recordCount); + return dbChanges; + } + + protected Map consumeRecordsMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException { + return consumeMixedWithIncrementalSnapshot(recordCount, Function.identity(), x -> true, null, topicName()); + } + + protected Map consumeMixedWithIncrementalSnapshot(int recordCount, Predicate> dataCompleted, + Consumer> recordConsumer) + throws InterruptedException { + return consumeMixedWithIncrementalSnapshot(recordCount, record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()), dataCompleted, + recordConsumer, topicName()); + } + + protected Map consumeRecordsMixedWithIncrementalSnapshot(int recordCount, Predicate> dataCompleted, + Consumer> recordConsumer) + throws InterruptedException { + return consumeMixedWithIncrementalSnapshot(recordCount, Function.identity(), dataCompleted, recordConsumer, topicName()); + } + + protected String valueFieldName() { + return "aa"; + } + + protected String pkFieldName() { + return "pk"; + } + + protected void startConnector(DebeziumEngine.CompletionCallback callback) { + startConnector(Function.identity(), callback, true); + } + + protected void startConnector(Function custConfig) { + startConnector(custConfig, loggingCompletion(), true); + } + + protected void startConnector(Function custConfig, + DebeziumEngine.CompletionCallback callback, boolean expectNoRecords) { + final Configuration config = custConfig.apply(config()).build(); + start(connectorClass(), config, callback); + waitForConnectorToStart(); + + waitForAvailableRecords(5, TimeUnit.SECONDS); + if (expectNoRecords) { + // there shouldn't be any snapshot records + assertNoRecordsToConsume(); + } + } + + protected void startConnectorWithSnapshot(Function custConfig) { + startConnector(custConfig, loggingCompletion(), false); + } + + protected void startConnector() { + startConnector(Function.identity(), loggingCompletion(), true); + } + + protected void waitForConnectorToStart() { + assertConnectorIsRunning(); + } + + protected Function getRecordValue() { + return s -> s.getStruct("after").getInt32(valueFieldName()); + } + + @Override + protected int getMaximumEnqueuedRecordCount() { + return ROW_COUNT * 3; + } + + protected void sendAdHocSnapshotSignal(String... dataCollectionIds) throws SQLException { + sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), dataCollectionIds); + } + + protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional additionalCondition, Optional surrogateKey, + String... 
dataCollectionIds) { + sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(additionalCondition, surrogateKey, AbstractSnapshotSignal.SnapshotType.INCREMENTAL, + dataCollectionIds); + } + + protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional additionalCondition, Optional surrogateKey, + AbstractSnapshotSignal.SnapshotType snapshotType, + String... dataCollectionIds) { + final String dataCollectionIdsList = Arrays.stream(dataCollectionIds) + .map(x -> '"' + x + '"') + .collect(Collectors.joining(", ")); + try (JdbcConnection connection = databaseConnection()) { + String query; + if (additionalCondition.isPresent() && surrogateKey.isPresent()) { + query = String.format( + "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-condition\": %s, \"surrogate-key\": %s}')", + signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition.get(), surrogateKey.get()); + } + else if (additionalCondition.isPresent()) { + query = String.format( + "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-condition\": %s}')", + signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition.get()); + } + else if (surrogateKey.isPresent()) { + query = String.format( + "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"surrogate-key\": %s}')", + signalTableName(), snapshotType.toString(), dataCollectionIdsList, surrogateKey.get()); + } + else { + query = String.format( + "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s]}')", + signalTableName(), snapshotType.toString(), dataCollectionIdsList); + } + logger.info("Sending signal with query {}", query); + connection.execute(query); + } + catch (Exception e) { + logger.warn("Failed to send signal", e); + } + } +}
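
Usage note (illustrative, not part of the patch itself): with this series applied, a blocking snapshot is requested through the same signaling table used for ad-hoc incremental snapshots, using the INITIAL_BLOCKING snapshot type exercised by the tests above. Assuming a signal table named debezium_signal with the (id, type, data) layout created in blocking_snapshot-test.sql and a captured table s1.a (the id value is arbitrary; names follow the tests above), the signal row generated by the test helper looks like:

    INSERT INTO debezium_signal (id, type, data)
    VALUES ('ad-hoc-blocking', 'execute-snapshot',
            '{"type": "INITIAL_BLOCKING", "data-collections": ["s1.a"]}');

While the signal is handled, the streaming source parks in ChangeEventSourceContext.streamingPaused(), the snapshot re-runs with buffering re-enabled on the ChangeEventQueue, and streaming resumes once waitSnapshotCompletion() returns.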