DBZ-1437 Fix InitialOnlySnapshotter to properly determine when to run a snapshot.

This commit is contained in:
Chris Cranford 2019-08-19 13:12:09 -04:00 committed by Gunnar Morling
parent 68f0bba7b3
commit 8504f6aafd
2 changed files with 80 additions and 1 deletions

View File

@ -5,7 +5,24 @@
*/
package io.debezium.connector.postgresql.snapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotState;
public class InitialOnlySnapshotter extends QueryingSnapshotter {
private final static Logger LOGGER = LoggerFactory.getLogger(InitialOnlySnapshotter.class);
private OffsetState sourceInfo;
@Override
public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
super.init(config, sourceInfo, slotState);
this.sourceInfo = sourceInfo;
}
@Override
public boolean shouldStream() {
return false;
@ -13,6 +30,17 @@ public boolean shouldStream() {
@Override
public boolean shouldSnapshot() {
if (sourceInfo == null) {
LOGGER.info("Taking initial snapshot for new datasource");
return true;
}
else if (sourceInfo.snapshotInEffect()) {
LOGGER.info("Found previous incomplete snapshot");
return true;
}
else {
LOGGER.info("Previous initial snapshot completed, no snapshot will be performed");
return false;
}
}
}

View File

@ -916,6 +916,57 @@ public void shouldAllowForExportedSnapshot() throws Exception {
VerifyRecord.isValidInsert(s2recs.get(0), PK_FIELD, 3);
}
@Test
@FixFor("DBZ-1437")
public void shouldPeformSnapshotOnceForInitialOnlySnapshotMode() throws Exception {
// This captures all logged messages, allowing us to verify log message was written.
final LogInterceptor logInterceptor = new LogInterceptor();
TestHelper.dropDefaultReplicationSlot();
TestHelper.execute(SETUP_TABLES_STMT);
Configuration config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
.build();
start(PostgresConnector.class, config);
assertConnectorIsRunning();
// Lets wait for snapshot to finish before proceeding
waitForSnapshotToBeCompleted("postgres", "test_server");
// Lets perform some inserts, this shouldn't be captured.
// This is because we only did snapshot and these would be added afterward.
TestHelper.execute(INSERT_STMT);
waitForAvailableRecords(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
// Consume stream and make sure only the records at snapshot were generated
SourceRecords actualRecords = consumeRecordsByTopic(2);
List<SourceRecord> s1recs = actualRecords.recordsForTopic(topicName("s1.a"));
List<SourceRecord> s2recs = actualRecords.recordsForTopic(topicName("s2.a"));
assertThat(s1recs.size()).isEqualTo(1);
assertThat(s2recs.size()).isEqualTo(1);
VerifyRecord.isValidRead(s1recs.get(0), PK_FIELD, 1);
VerifyRecord.isValidRead(s2recs.get(0), PK_FIELD, 1);
// Stop the connector
stopConnector();
assertConnectorNotRunning();
// Restart the connector again with initial-only
// No snapshot should be produced and no records generated
config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.build();
start(PostgresConnector.class, config);
assertConnectorIsRunning();
// Stop the connector, verify that no snapshot was performed
stopConnector(value -> assertThat(logInterceptor.containsMessage("Previous initial snapshot completed, no snapshot will be performed")).isTrue());
}
private String getConfirmedFlushLsn(PostgresConnection connection) throws SQLException {
return connection.prepareQueryAndMap(
"select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ?", statement -> {