DBZ-7806 Skip search for WAL position when offset exists and is just after the initial snapshot.

mfvitale 2024-05-08 15:46:14 +02:00 committed by Jiri Pechanec
parent c3a8ba4afb
commit 70c526eb16
5 changed files with 84 additions and 15 deletions


@@ -9,9 +9,6 @@
 import static org.assertj.core.api.Assertions.entry;

 import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.sql.SQLException;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -54,7 +51,6 @@ public abstract class BinlogReadOnlyIncrementalSnapshotIT<C extends SourceConnector>
     private static KafkaCluster kafka;
     private static final int PARTITION_NO = 0;
     public static final String EXCLUDED_TABLE = "b";
-    private final Path signalsFile = Paths.get("src", "test", "resources").resolve("debezium_signaling_file.txt");

     @Rule
     public ConditionalFail conditionalFail = new ConditionalFail();
@@ -351,16 +347,6 @@ public void insertDeleteWatermarkingStrategy() throws Exception {
         // test has not to be executed on read only
     }

-    private void sendExecuteSnapshotFileSignal(String fullTableNames) throws IOException {
-        String signalValue = String.format(
-                "{\"id\":\"12345\",\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}",
-                fullTableNames);
-        java.nio.file.Files.write(signalsFile, signalValue.getBytes());
-    }
-
     protected void populate4PkTable() throws SQLException {
         try (JdbcConnection connection = databaseConnection()) {
             populate4PkTable(connection, "a4");


@@ -159,7 +159,7 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partition
             this.lastCompletelyProcessedLsn = replicationStream.get().startLsn();

-            if (walPosition.searchingEnabled()) {
+            if (walPosition.searchingEnabled() && this.effectiveOffset.lastCompletelyProcessedLsn() != null) {
                 searchWalPosition(context, partition, this.effectiveOffset, stream, walPosition);
                 try {
                     if (!isInPreSnapshotCatchUpStreaming(this.effectiveOffset)) {
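Read as a before/after pair, the changed condition implements exactly what the commit message describes: immediately after an initial snapshot an offset already exists, but no event has been streamed yet, so the offset's lastCompletelyProcessedLsn is still null and there is no position to look for in the WAL. A minimal standalone sketch of that decision, with an illustrative Lsn placeholder rather than the connector's real types:

    // A minimal sketch, not the connector's real API; Lsn stands in for
    // Debezium's internal LSN value type.
    public class WalSearchDecision {
        record Lsn(long value) {}

        // The WAL-position search only makes sense when searching is enabled
        // AND at least one event was completely processed before the offset
        // was stored. Immediately after the initial snapshot the last
        // completely processed LSN is still null, so the potentially
        // expensive search over the replication stream can be skipped.
        static boolean shouldSearchWalPosition(boolean searchingEnabled, Lsn lastCompletelyProcessedLsn) {
            return searchingEnabled && lastCompletelyProcessedLsn != null;
        }
    }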


@@ -8,6 +8,8 @@
 import static io.debezium.pipeline.signal.actions.AbstractSnapshotSignal.SnapshotType.BLOCKING;

+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.sql.SQLException;
 import java.util.List;
@@ -21,6 +23,7 @@
 import io.debezium.doc.FixFor;
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.pipeline.AbstractBlockingSnapshotTest;
+import io.debezium.pipeline.signal.channels.FileSignalChannel;
 import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;

 public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest {
@@ -33,6 +36,9 @@ public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest {
             "CREATE TABLE s1.b (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
             "CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048))";

+    protected final Path signalsFile = Paths.get("src", "test", "resources")
+            .resolve("debezium_signaling_blocking_file.txt");
+
     @Before
     public void before() throws SQLException {
@@ -184,4 +190,31 @@ public void executeBlockingSnapshotWhenASnapshotAlreadyExecuted() throws Exception {
         assertStreamingRecordsArePresent(ROW_COUNT, consumeRecordsByTopic(ROW_COUNT, 10));
     }

+    @Test
+    public void executeBlockingSnapshotJustAfterInitialSnapshotAndNoEventStreamedYet() throws Exception {
+        // Testing.Print.enable();
+
+        populateTable();
+
+        startConnectorWithSnapshot(x -> mutableConfig(false, false)
+                .with(FileSignalChannel.SIGNAL_FILE, signalsFile.toString())
+                .with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "file"));
+
+        waitForSnapshotToBeCompleted(connector(), server(), task(), database());
+
+        SourceRecords consumedRecordsByTopic = consumeRecordsByTopic(ROW_COUNT, 10);
+        assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT, consumedRecordsByTopic);
+
+        sendExecuteSnapshotFileSignal(tableDataCollectionId(), BLOCKING.name(), signalsFile);
+
+        waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
+
+        assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT, consumeRecordsByTopic(ROW_COUNT, 10));
+
+        insertRecords(ROW_COUNT, ROW_COUNT * 2);
+
+        assertStreamingRecordsArePresent(ROW_COUNT, consumeRecordsByTopic(ROW_COUNT, 10));
+    }
 }
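The new test drives the blocking snapshot through the file signal channel instead of the signaling table. For reference, a sketch of the equivalent raw configuration built with Debezium's Configuration API; "signal.enabled.channels" is the documented key behind CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, while "signal.file" is my assumed resolution of FileSignalChannel.SIGNAL_FILE and should be verified against the Debezium version in use:

    import io.debezium.config.Configuration;

    public class FileSignalConfigExample {
        public static void main(String[] args) {
            // Enable only the file channel and point it at the watched signal file.
            Configuration signalConfig = Configuration.create()
                    .with("signal.enabled.channels", "file")
                    .with("signal.file", "src/test/resources/debezium_signaling_blocking_file.txt")
                    .build();
            System.out.println(signalConfig.getString("signal.file"));
        }
    }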


@@ -51,6 +51,7 @@
 import io.debezium.jdbc.TemporalPrecisionMode;
 import io.debezium.junit.SkipTestRule;
 import io.debezium.junit.SkipWhenDatabaseVersion;
+import io.debezium.junit.logging.LogInterceptor;
 import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode;
 import io.debezium.spi.converter.CustomConverter;
 import io.debezium.spi.converter.RelationalColumn;
@@ -294,6 +295,35 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
         assertSourceInfo(second, TestHelper.TEST_DATABASE, "s2", "a");
     }

+    @Test
+    public void shouldStreamAfterSnapshot() throws Exception {
+        LogInterceptor logInterceptor = new LogInterceptor(PostgresStreamingChangeEventSource.class);
+        TestHelper.dropAllSchemas();
+        TestHelper.executeDDL("postgres_create_tables.ddl");
+
+        String insertStmt = "INSERT INTO s1.a (aa) VALUES (1);" +
+                "INSERT INTO s2.a (aa) VALUES (1);";
+
+        String statements = "CREATE SCHEMA s1; " +
+                "CREATE SCHEMA s2; " +
+                "CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
+                "CREATE TABLE s2.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
+                insertStmt;
+
+        TestHelper.execute(statements);
+        buildWithStreamProducer(TestHelper.defaultConfig());
+
+        TestConsumer consumer = testConsumer(2, "s1", "s2");
+        waitForSnapshotToBeCompleted();
+
+        // first make sure we get the initial records from both schemas...
+        consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
+        consumer.clear();
+
+        assertThat(logInterceptor.containsMessage("Processing messages")).isTrue();
+    }

     @Test
     @FixFor("DBZ-1564")
     public void shouldCloseTransactionsAfterSnapshot() throws Exception {


@@ -7,7 +7,9 @@
 import static org.assertj.core.api.Assertions.assertThat;

 import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -38,6 +40,8 @@ public abstract class AbstractSnapshotTest<T extends SourceConnector> extends Ab
     protected static final int PARTITION_NO = 0;
     protected static final String SERVER_NAME = "test_server";
     private static final int MAXIMUM_NO_RECORDS_CONSUMES = 5;
+    protected final Path signalsFile = Paths.get("src", "test", "resources")
+            .resolve("debezium_signaling_file.txt");

     protected abstract Class<T> connectorClass();
@@ -301,6 +305,22 @@ protected int getMaximumEnqueuedRecordCount() {
         return ROW_COUNT * 3;
     }

+    protected void sendExecuteSnapshotFileSignal(String fullTableNames) throws IOException {
+        sendExecuteSnapshotFileSignal(fullTableNames, "INCREMENTAL", signalsFile);
+    }
+
+    protected void sendExecuteSnapshotFileSignal(String fullTableNames, String type, Path signalFile) throws IOException {
+        String signalValue = String.format(
+                "{\"id\":\"12345\",\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"%s\"}}",
+                fullTableNames, type);
+        java.nio.file.Files.write(signalFile, signalValue.getBytes());
+    }
+
     protected void sendAdHocSnapshotSignal(String... dataCollectionIds) throws SQLException {
         sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", dataCollectionIds);
     }
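With the helper promoted into AbstractSnapshotTest, any connector test can emit an ad hoc snapshot signal by writing one JSON document to the watched file. A self-contained sketch of what a call such as sendExecuteSnapshotFileSignal("s1.a", "BLOCKING", signalsFile) produces, using the same format string as the shared helper; "s1.a" is a hypothetical data collection id:

    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class SignalFileExample {
        public static void main(String[] args) throws Exception {
            Path signalFile = Path.of("debezium_signaling_file.txt");
            // Same format string as the helper above; only the data
            // collections and the snapshot type vary between signals.
            String signal = String.format(
                    "{\"id\":\"12345\",\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"%s\"}}",
                    "s1.a", "BLOCKING");
            Files.write(signalFile, signal.getBytes(StandardCharsets.UTF_8));
            // Prints the JSON document the file signal channel will pick up:
            // {"id":"12345","type":"execute-snapshot","data": {"data-collections": ["s1.a"], "type": "BLOCKING"}}
            System.out.println(Files.readString(signalFile));
        }
    }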