DBZ-1437 Fix ExportedSnapshotter to properly determine when to run a snapshot.

This commit is contained in:
Chris Cranford 2019-08-16 16:07:58 -04:00 committed by Gunnar Morling
parent e7dc1dfb62
commit 68f0bba7b3
2 changed files with 26 additions and 8 deletions

View File

@ -9,6 +9,9 @@
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
@ -21,14 +24,29 @@
*/
public class ExportedSnapshotter implements Snapshotter {
private final static Logger LOGGER = LoggerFactory.getLogger(ExportedSnapshotter.class);
private OffsetState sourceInfo;
@Override
public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
this.sourceInfo = sourceInfo;
}
@Override
public boolean shouldSnapshot() {
if (sourceInfo == null) {
LOGGER.info("Taking exported snapshot for new datasource");
return true;
}
else if (sourceInfo.snapshotInEffect()) {
LOGGER.info("Found previous incomplete snapshot");
return true;
}
else {
LOGGER.info("Previous exported snapshot completed, streaming logical changes from last known position");
return false;
}
}
@Override
public boolean shouldStream() {

View File

@ -904,16 +904,16 @@ public void shouldAllowForExportedSnapshot() throws Exception {
start(PostgresConnector.class, config);
assertConnectorIsRunning();
actualRecords = consumeRecordsByTopic(4);
TestHelper.execute(INSERT_STMT);
actualRecords = consumeRecordsByTopic(2);
s1recs = actualRecords.recordsForTopic(topicName("s1.a"));
s2recs = actualRecords.recordsForTopic(topicName("s2.a"));
assertThat(s1recs.size()).isEqualTo(2);
assertThat(s2recs.size()).isEqualTo(2);
VerifyRecord.isValidRead(s1recs.get(0), PK_FIELD, 1);
VerifyRecord.isValidRead(s1recs.get(1), PK_FIELD, 2);
VerifyRecord.isValidRead(s2recs.get(0), PK_FIELD, 1);
VerifyRecord.isValidRead(s2recs.get(1), PK_FIELD, 2);
assertThat(s1recs.size()).isEqualTo(1);
assertThat(s2recs.size()).isEqualTo(1);
VerifyRecord.isValidInsert(s1recs.get(0), PK_FIELD, 3);
VerifyRecord.isValidInsert(s2recs.get(0), PK_FIELD, 3);
}
private String getConfirmedFlushLsn(PostgresConnection connection) throws SQLException {