From ee91eac4183b15ec6db449f479d5de27e1c662be Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Tue, 10 Mar 2020 15:10:37 +0100 Subject: [PATCH] DBZ-1730 Stream from replica slot position not end of tx log --- .../PostgresStreamingChangeEventSource.java | 8 +++-- .../PostgresReplicationConnection.java | 6 +++- .../connection/ReplicationStream.java | 7 ++++ .../postgresql/RecordsStreamProducerIT.java | 36 +++++++++++++++++++ 4 files changed, 54 insertions(+), 3 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index d39751a8e..f291ea870 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -60,6 +60,7 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS private final AtomicReference replicationStream = new AtomicReference<>(); private final Snapshotter snapshotter; private final Metronome pauseNoMessage; + private final boolean hasStartLsnStoredInContext; /** * The minimum of (number of event received since the last event sent to Kafka, @@ -78,6 +79,9 @@ public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfi this.clock = clock; this.schema = schema; this.offsetContext = (offsetContext != null) ? offsetContext : PostgresOffsetContext.initialContext(connectorConfig, connection, clock); + // replication slot could exist at the time of starting Debezium so we will stream from the position in the slot + // instead of the last position in the database + this.hasStartLsnStoredInContext = (offsetContext != null); pauseNoMessage = Metronome.sleeper(taskContext.getConfig().getPollInterval(), Clock.SYSTEM); this.taskContext = taskContext; this.snapshotter = snapshotter; @@ -92,7 +96,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio } try { - if (offsetContext.hasLastKnownPosition()) { + if (hasStartLsnStoredInContext) { // start streaming from the last recorded position in the offset final Long lsn = offsetContext.lastCompletelyProcessedLsn() != null ? offsetContext.lastCompletelyProcessedLsn() : offsetContext.lsn(); if (LOGGER.isDebugEnabled()) { @@ -113,7 +117,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio // refresh the schema so we have a latest view of the DB tables taskContext.refreshSchema(connection, true); - this.lastCompletelyProcessedLsn = offsetContext.lsn(); + this.lastCompletelyProcessedLsn = replicationStream.get().startLsn(); while (context.isRunning()) { int noMessageIterations = 0; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index 323c0caf7..d798fdbb2 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -152,7 +152,6 @@ protected void initPublication() { } protected void initReplicationSlot() throws SQLException, InterruptedException { - final String postgresPluginName = plugin.getPostgresPluginName(); ServerInfo.ReplicationSlot slotInfo = getSlotInfo(); boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID == slotInfo; @@ -480,6 +479,11 @@ private void processWarnings(final boolean forced) throws SQLException { } } } + + @Override + public Long startLsn() { + return startingLsn; + } }; } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationStream.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationStream.java index 14f3819c7..a00db0a0c 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationStream.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationStream.java @@ -74,6 +74,13 @@ public interface ReplicationMessageProcessor { */ Long lastReceivedLsn(); + /** + * Returns the value for the LSN form which the streaming is executed. + * + * @return a {@link Long} value, possibly null if starting LSN is undefined + */ + Long startLsn(); + /** * Starts a background thread to ensure the slot is kept alive, useful for when temporarily * stopping reads from the stream such as querying metadata, etc diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index fd33764f1..45a65c456 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -39,6 +39,7 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.storage.MemoryOffsetBackingStore; import org.awaitility.Awaitility; import org.fest.assertions.Assertions; import org.junit.Before; @@ -65,6 +66,7 @@ import io.debezium.data.VerifyRecord; import io.debezium.data.geometry.Point; import io.debezium.doc.FixFor; +import io.debezium.embedded.EmbeddedEngine; import io.debezium.heartbeat.Heartbeat; import io.debezium.jdbc.JdbcValueConverters.DecimalMode; import io.debezium.jdbc.TemporalPrecisionMode; @@ -74,6 +76,7 @@ import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode; import io.debezium.relational.Table; import io.debezium.relational.TableId; +import io.debezium.util.Collect; import io.debezium.util.Stopwatch; import io.debezium.util.Testing; @@ -1613,6 +1616,39 @@ public void stopInTheMiddleOfTxAndResume() throws Exception { } } + @Test + @FixFor("DBZ-1730") + public void shouldStartConsumingFromSlotLocation() throws Exception { + Testing.Print.enable(); + + startConnector(config -> config + .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false) + .with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class), true); + waitForStreamingToStart(); + + consumer = testConsumer(1); + executeAndWait("INSERT INTO test_table (text) VALUES ('insert2')"); + consumer.remove(); + + stopConnector(); + TestHelper.execute( + "INSERT INTO test_table (text) VALUES ('insert3');", + "INSERT INTO test_table (text) VALUES ('insert4')"); + startConnector(config -> config + .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, true) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER) + .with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class), false); + + consumer.expects(2); + consumer.await(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS); + + // We cannot guarantee the flush timing so it is possible that insert2 record will be redelivered + Assertions.assertThat(((Struct) consumer.remove().value()).getStruct("after").getString("text")).isIn(Collect.unmodifiableSet("insert2", "insert3")); + Assertions.assertThat(((Struct) consumer.remove().value()).getStruct("after").getString("text")).isIn(Collect.unmodifiableSet("insert3", "insert4")); + + stopConnector(); + } + @Test @FixFor("DBZ-1824") @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.WAL2JSON, reason = "wal2json cannot resume transaction in the middle of processing")