From 68f0bba7b38cf034088fe71d6876b97048bf4891 Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Fri, 16 Aug 2019 16:07:58 -0400 Subject: [PATCH] DBZ-1437 Fix ExportedSnapshotter to properly determine when to run a snapshot. --- .../snapshot/ExportedSnapshotter.java | 20 ++++++++++++++++++- .../postgresql/PostgresConnectorIT.java | 14 ++++++------- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ExportedSnapshotter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ExportedSnapshotter.java index ef378f0db..ca671146e 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ExportedSnapshotter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ExportedSnapshotter.java @@ -9,6 +9,9 @@ import java.util.Optional; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.debezium.connector.postgresql.PostgresConnectorConfig; import io.debezium.connector.postgresql.spi.OffsetState; import io.debezium.connector.postgresql.spi.SlotCreationResult; @@ -21,13 +24,28 @@ */ public class ExportedSnapshotter implements Snapshotter { + private final static Logger LOGGER = LoggerFactory.getLogger(ExportedSnapshotter.class); + private OffsetState sourceInfo; + @Override public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) { + this.sourceInfo = sourceInfo; } @Override public boolean shouldSnapshot() { - return true; + if (sourceInfo == null) { + LOGGER.info("Taking exported snapshot for new datasource"); + return true; + } + else if (sourceInfo.snapshotInEffect()) { + LOGGER.info("Found previous incomplete snapshot"); + return true; + } + else { + LOGGER.info("Previous exported snapshot completed, streaming logical changes from last known position"); + return false; + } } @Override diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index b13948ad5..6c56b498a 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -904,16 +904,16 @@ public void shouldAllowForExportedSnapshot() throws Exception { start(PostgresConnector.class, config); assertConnectorIsRunning(); - actualRecords = consumeRecordsByTopic(4); + TestHelper.execute(INSERT_STMT); + + actualRecords = consumeRecordsByTopic(2); s1recs = actualRecords.recordsForTopic(topicName("s1.a")); s2recs = actualRecords.recordsForTopic(topicName("s2.a")); - assertThat(s1recs.size()).isEqualTo(2); - assertThat(s2recs.size()).isEqualTo(2); - VerifyRecord.isValidRead(s1recs.get(0), PK_FIELD, 1); - VerifyRecord.isValidRead(s1recs.get(1), PK_FIELD, 2); - VerifyRecord.isValidRead(s2recs.get(0), PK_FIELD, 1); - VerifyRecord.isValidRead(s2recs.get(1), PK_FIELD, 2); + assertThat(s1recs.size()).isEqualTo(1); + assertThat(s2recs.size()).isEqualTo(1); + VerifyRecord.isValidInsert(s1recs.get(0), PK_FIELD, 3); + VerifyRecord.isValidInsert(s2recs.get(0), PK_FIELD, 3); } private String getConfirmedFlushLsn(PostgresConnection connection) throws SQLException {