From 70c526eb1690732fec4ed348910025d40af77ca3 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Wed, 8 May 2024 15:46:14 +0200 Subject: [PATCH] DBZ-7806 Skip search for WAL position when offset exists and is just after the initial snapshot. --- .../BinlogReadOnlyIncrementalSnapshotIT.java | 14 -------- .../PostgresStreamingChangeEventSource.java | 2 +- .../postgresql/BlockingSnapshotIT.java | 33 +++++++++++++++++++ .../postgresql/RecordsSnapshotProducerIT.java | 30 +++++++++++++++++ .../incremental/AbstractSnapshotTest.java | 20 +++++++++++ 5 files changed, 84 insertions(+), 15 deletions(-) diff --git a/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogReadOnlyIncrementalSnapshotIT.java b/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogReadOnlyIncrementalSnapshotIT.java index d373c1772..d3c0bcb32 100644 --- a/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogReadOnlyIncrementalSnapshotIT.java +++ b/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogReadOnlyIncrementalSnapshotIT.java @@ -9,9 +9,6 @@ import static org.assertj.core.api.Assertions.entry; import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; import java.sql.SQLException; import java.time.Duration; import java.util.ArrayList; @@ -54,7 +51,6 @@ public abstract class BinlogReadOnlyIncrementalSnapshotIT mutableConfig(false, false) + .with(FileSignalChannel.SIGNAL_FILE, signalsFile.toString()) + .with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "file")); + + waitForSnapshotToBeCompleted(connector(), server(), task(), database()); + + SourceRecords consumedRecordsByTopic = consumeRecordsByTopic(ROW_COUNT, 10); + assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT, consumedRecordsByTopic); + + sendExecuteSnapshotFileSignal(tableDataCollectionId(), BLOCKING.name(), signalsFile); + + waitForLogMessage("Snapshot completed", 
AbstractSnapshotChangeEventSource.class); + + assertRecordsFromSnapshotAndStreamingArePresent((ROW_COUNT), consumeRecordsByTopic((ROW_COUNT), 10)); + + insertRecords(ROW_COUNT, ROW_COUNT * 2); + + assertStreamingRecordsArePresent(ROW_COUNT, consumeRecordsByTopic(ROW_COUNT, 10)); + + } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java index 979004ffd..e7d05b756 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java @@ -51,6 +51,7 @@ import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.junit.SkipTestRule; import io.debezium.junit.SkipWhenDatabaseVersion; +import io.debezium.junit.logging.LogInterceptor; import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode; import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; @@ -294,6 +295,35 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception { assertSourceInfo(second, TestHelper.TEST_DATABASE, "s2", "a"); } + @Test + public void shouldStreamAfterSnapshot() throws Exception { + + LogInterceptor logInterceptor = new LogInterceptor(PostgresStreamingChangeEventSource.class); + TestHelper.dropAllSchemas(); + TestHelper.executeDDL("postgres_create_tables.ddl"); + + String insertStmt = "INSERT INTO s1.a (aa) VALUES (1);" + + "INSERT INTO s2.a (aa) VALUES (1);"; + + String statements = "CREATE SCHEMA s1; " + + "CREATE SCHEMA s2; " + + "CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" + + "CREATE TABLE s2.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" + + insertStmt; + TestHelper.execute(statements); + + buildWithStreamProducer(TestHelper.defaultConfig()); + + 
TestConsumer consumer = testConsumer(2, "s1", "s2"); + waitForSnapshotToBeCompleted(); + + // first make sure we get the initial records from both schemas... + consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); + consumer.clear(); + + assertThat(logInterceptor.containsMessage("Processing messages")).isTrue(); + } + @Test @FixFor("DBZ-1564") public void shouldCloseTransactionsAfterSnapshot() throws Exception { diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractSnapshotTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractSnapshotTest.java index 17a918900..adffbeed0 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractSnapshotTest.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractSnapshotTest.java @@ -7,7 +7,9 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.io.IOException; import java.nio.file.Path; +import java.nio.file.Paths; import java.sql.SQLException; import java.util.Arrays; import java.util.HashMap; @@ -38,6 +40,8 @@ public abstract class AbstractSnapshotTest extends Ab protected static final int PARTITION_NO = 0; protected static final String SERVER_NAME = "test_server"; private static final int MAXIMUM_NO_RECORDS_CONSUMES = 5; + protected final Path signalsFile = Paths.get("src", "test", "resources") + .resolve("debezium_signaling_file.txt"); protected abstract Class connectorClass(); @@ -301,6 +305,22 @@ protected int getMaximumEnqueuedRecordCount() { return ROW_COUNT * 3; } + protected void sendExecuteSnapshotFileSignal(String fullTableNames) throws IOException { + + sendExecuteSnapshotFileSignal(fullTableNames, "INCREMENTAL", signalsFile); + + } + + protected void sendExecuteSnapshotFileSignal(String fullTableNames, String type, Path signalFile) throws IOException { + + String signalValue = String.format( + 
"{\"id\":\"12345\",\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"%s\"}}", + fullTableNames, type); + + java.nio.file.Files.write(signalFile, signalValue.getBytes()); + + } + protected void sendAdHocSnapshotSignal(String... dataCollectionIds) throws SQLException { sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", dataCollectionIds); }