DBZ-2550 Prevent duplicate events using catch up streaming

Normally, when a connector shuts down gracefully, the Connect framework attempts to commit offsets so that the latest committed state is acknowledged on the replication stream. While the connector is running, the framework commits offsets periodically; Debezium does not trigger offset commits itself. When the catch-up streaming phase ends, there may therefore be uncommitted state, and the connector cannot determine when the next commit will occur because the commit timing is managed externally. If no commit is triggered between the end of catch-up streaming and the start of normal streaming after the snapshot, the connector may produce duplicate messages.
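For context, that commit timing is a worker-level Kafka Connect setting rather than anything the connector controls. A minimal sketch of the relevant worker property (a standard Kafka Connect option; the 60000 ms value is its default and is shown purely for illustration):

    # Kafka Connect worker configuration (standard property; 60000 ms is the default)
    offset.flush.interval.ms=60000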

Although the replication stream may be out of date, the internal OffsetContext is aware of the latest committed offset. When the snapshot phase creates a new offset after catch-up streaming, the previous offset still has access to the latest state. Use the previous offset to forward that known state to the new offset.
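In outline, the change amounts to the following decision when the snapshot phase builds its offset (a simplified Java sketch; the names mirror the PostgresSnapshotChangeEventSource diff below):

    // Simplified sketch of the fix, mirroring the determineSnapshotOffset() diff below.
    if (previousOffset != null && !snapshotter.shouldStreamEventsStartingFromSnapshot()) {
        // Catch-up streaming ran first: seed the new offset with the last state it knew about,
        // so events it already produced are not emitted again after the snapshot completes.
        offset = PostgresOffsetContext.initialContext(connectorConfig, jdbcConnection, getClock(),
                previousOffset.lastCommitLsn(), previousOffset.lastCompletelyProcessedLsn());
    }
    else {
        // No previous offset, or the snapshotter streams from the snapshot's own position:
        // start from the database's current WAL location.
        offset = PostgresOffsetContext.initialContext(connectorConfig, jdbcConnection, getClock());
    }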
Commit:    ba074abe4c
Parent:    cc4dbdcc9c
Author:    cooksey (2020-09-17 09:46:19 -04:00)
Committer: Jiri Pechanec

3 changed files with 22 additions and 3 deletions

PostgresOffsetContext.java

@@ -226,6 +226,11 @@ public String toString() {
     }
 
     public static PostgresOffsetContext initialContext(PostgresConnectorConfig connectorConfig, PostgresConnection jdbcConnection, Clock clock) {
+        return initialContext(connectorConfig, jdbcConnection, clock, null, null);
+    }
+
+    public static PostgresOffsetContext initialContext(PostgresConnectorConfig connectorConfig, PostgresConnection jdbcConnection, Clock clock, Lsn lastCommitLsn,
+                                                       Lsn lastCompletelyProcessedLsn) {
         try {
             LOGGER.info("Creating initial offset context");
             final Lsn lsn = Lsn.valueOf(jdbcConnection.currentXLogLocation());
@@ -234,8 +239,8 @@ public static PostgresOffsetContext initialContext(PostgresConnectorConfig conne
             return new PostgresOffsetContext(
                     connectorConfig,
                     lsn,
-                    null,
-                    null,
+                    lastCompletelyProcessedLsn,
+                    lastCommitLsn,
                     txId,
                     clock.currentTimeAsInstant(),
                     false,

PostgresSnapshotChangeEventSource.java

@@ -44,6 +44,7 @@ public class PostgresSnapshotChangeEventSource extends RelationalSnapshotChangeE
     private final Snapshotter snapshotter;
     private final SlotCreationResult slotCreatedInfo;
     private final SlotState startingSlotInfo;
+    private final PostgresOffsetContext previousOffset;
 
     public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig, Snapshotter snapshotter, PostgresOffsetContext previousOffset,
                                              PostgresConnection jdbcConnection, PostgresSchema schema, EventDispatcher<TableId> dispatcher, Clock clock,
@@ -55,6 +56,7 @@ public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig
         this.snapshotter = snapshotter;
         this.slotCreatedInfo = slotCreatedInfo;
         this.startingSlotInfo = startingSlotInfo;
+        this.previousOffset = previousOffset;
     }
 
     @Override
@@ -130,7 +132,17 @@ protected void releaseSchemaSnapshotLocks(RelationalSnapshotContext snapshotCont
     protected void determineSnapshotOffset(RelationalSnapshotContext ctx) throws Exception {
         PostgresOffsetContext offset = (PostgresOffsetContext) ctx.offset;
         if (offset == null) {
-            offset = PostgresOffsetContext.initialContext(connectorConfig, jdbcConnection, getClock());
+            if (previousOffset != null && !snapshotter.shouldStreamEventsStartingFromSnapshot()) {
+                // The Connect framework, not the connector, manages triggering offset commits, so the
+                // replication stream may not have flushed the latest offset state during catch-up streaming.
+                // The previousOffset variable is shared between the catch-up streaming and snapshot phases
+                // and holds the latest known offset state.
+                offset = PostgresOffsetContext.initialContext(connectorConfig, jdbcConnection, getClock(),
+                        previousOffset.lastCommitLsn(), previousOffset.lastCompletelyProcessedLsn());
+            }
+            else {
+                offset = PostgresOffsetContext.initialContext(connectorConfig, jdbcConnection, getClock());
+            }
             ctx.offset = offset;
         }
     }

PostgresConnectorIT.java

@@ -1490,6 +1490,8 @@ public void customSnapshotterSkipsTablesOnRestart() throws Exception {
         VerifyRecord.isValidRead(s1recs.get(1), PK_FIELD, 1);
         VerifyRecord.isValidRead(s1recs.get(2), PK_FIELD, 2);
         assertNoRecordsToConsume();
+        TestHelper.assertNoOpenTransactions();
+
         stopConnector(value -> assertThat(logInterceptor.containsMessage("For table 's2.a' the select statement was not provided, skipping table")).isTrue());