From e852f082a222575810b26f2a308d1f5ccef39997 Mon Sep 17 00:00:00 2001
From: Chris Cranford
Date: Thu, 25 Jul 2019 12:27:22 -0400
Subject: [PATCH] DBZ-1035 Support PostgreSQL lock-free snapshot using exported snapshots

---
 .../postgresql/PostgresConnectorConfig.java |  7 ++
 .../postgresql/RecordsSnapshotProducer.java | 13 +++-
 .../snapshot/ExportedSnapshotter.java       | 72 +++++++++++++++++++
 .../postgresql/PostgresConnectorIT.java     | 62 ++++++++++++++++
 4 files changed, 153 insertions(+), 1 deletion(-)
 create mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ExportedSnapshotter.java

diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
index d882d9e13..48688c60a 100755
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
@@ -30,6 +30,7 @@
 import io.debezium.connector.postgresql.connection.wal2json.NonStreamingWal2JsonMessageDecoder;
 import io.debezium.connector.postgresql.connection.wal2json.StreamingWal2JsonMessageDecoder;
 import io.debezium.connector.postgresql.snapshot.AlwaysSnapshotter;
+import io.debezium.connector.postgresql.snapshot.ExportedSnapshotter;
 import io.debezium.connector.postgresql.snapshot.InitialOnlySnapshotter;
 import io.debezium.connector.postgresql.snapshot.InitialSnapshotter;
 import io.debezium.connector.postgresql.snapshot.NeverSnapshotter;
@@ -133,6 +134,11 @@ public enum SnapshotMode implements EnumeratedValue {
          */
         INITIAL_ONLY("initial_only", (c) -> new InitialOnlySnapshotter()),
 
+        /**
+         * Perform an exported snapshot
+         */
+        EXPORTED("exported", (c) -> new ExportedSnapshotter()),
+
         /**
          * Inject a custom snapshotter, which allows for more control over snapshots.
          */
@@ -655,6 +661,7 @@ public static SchemaRefreshMode parse(String value) {
                     + "'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; "
                     + "'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally start emitting changes;"
                     + "'never' to specify the connector should never run a snapshot and that upon first startup the connector should read from the last position (LSN) recorded by the server; and"
+                    + "'exported' to specify the connector should run a snapshot based on the position when the replication slot was created; "
                     + "'custom' to specify a custom class with 'snapshot.custom_class' which will be loaded and used to determine the snapshot, see docs for more details.");
 
     public static final Field SNAPSHOT_MODE_CLASS = Field.create("snapshot.custom.class")
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java
index 419169f31..e3cba83cf 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java
@@ -225,7 +225,7 @@ private void takeSnapshot(BlockingConsumer<ChangeEvent> consumer) {
                 }
             }
 
-            long xlogStart = connection.currentXLogLocation();
+            long xlogStart = getTransactionStartLsn(connection);
             long txId = connection.currentTransactionId().longValue();
             if (logger.isInfoEnabled()) {
                 logger.info("\t read xlogStart at '{}' from transaction '{}'", ReplicationConnection.format(xlogStart), txId);
@@ -320,6 +320,17 @@ private void takeSnapshot(BlockingConsumer<ChangeEvent> consumer) {
         }
     }
 
+    private long getTransactionStartLsn(PostgresConnection connection) throws SQLException {
+        if (snapshotter.exportSnapshot() && slotCreatedInfo != null) {
+            // When performing an exported snapshot based on a newly created replication slot, the txLogStart position
+            // should be based on the replication slot snapshot transaction point. This is crucial so that any
+            // SQL operations that occur mid-snapshot are properly captured when streaming begins; otherwise
+            // they'll be lost.
+            return slotCreatedInfo.startLsn();
+        }
+        return connection.currentXLogLocation();
+    }
+
     private void changeSourceToLastSnapshotRecord(SourceRecord currentRecord) {
         final Struct envelope = (Struct) currentRecord.value();
         final Struct source = (Struct) envelope.get(Envelope.FieldName.SOURCE);
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ExportedSnapshotter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ExportedSnapshotter.java
new file mode 100644
index 000000000..3cc6db9bf
--- /dev/null
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ExportedSnapshotter.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.connector.postgresql.snapshot;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.postgresql.replication.LogSequenceNumber;
+
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.spi.OffsetState;
+import io.debezium.connector.postgresql.spi.SlotCreationResult;
+import io.debezium.connector.postgresql.spi.SlotState;
+import io.debezium.connector.postgresql.spi.Snapshotter;
+import io.debezium.relational.TableId;
+
+/**
+ * @author Chris Cranford
+ */
+public class ExportedSnapshotter implements Snapshotter {
+
+    private Map<TableId, String> snapshotOverrides;
+
+    @Override
+    public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
+        this.snapshotOverrides = config.getSnapshotSelectOverridesByTable();
+    }
+
+    @Override
+    public boolean shouldSnapshot() {
+        return true;
+    }
+
+    @Override
+    public boolean shouldStream() {
+        return true;
+    }
+
+    @Override
+    public boolean exportSnapshot() {
+        return true;
+    }
+
+    @Override
+    public Optional<String> buildSnapshotQuery(TableId tableId) {
+        if (snapshotOverrides.containsKey(tableId)) {
+            return Optional.of(snapshotOverrides.get(tableId));
+        }
+        else {
+            return Optional.of("select * from " + tableId.toDoubleQuotedString());
+        }
+    }
+
+    @Override
+    public Optional<String> snapshotTableLockingStatement(Duration lockTimeout, Set<TableId> tableIds) {
+        return Optional.empty();
+    }
+
+    @Override
+    public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) {
+        if (newSlotInfo != null) {
+            String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName());
+            return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet;
+        }
+        return Snapshotter.super.snapshotTransactionIsolationLevelStatement(newSlotInfo);
+    }
+}
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
index f97011b4a..93cc73982 100644
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
@@ -851,6 +851,68 @@ record = s2recs.get(0);
         VerifyRecord.isValidRead(s2recs.get(1), PK_FIELD, 2);
     }
 
+    @Test
+    @FixFor("DBZ-1035")
+    public void shouldAllowForExportedSnapshot() throws Exception {
+        TestHelper.dropDefaultReplicationSlot();
+
+        // Inside RecordsSnapshotProducer, we inject a new row into s1.a with aa=5 prior to executing the
+        // actual snapshot. The snapshot reference is what the tables looked like at the time
+        // the replication slot was created.
+        TestHelper.execute(SETUP_TABLES_STMT);
+        Configuration config = TestHelper.defaultConfig()
+                .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.EXPORTED.getValue())
+                .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
+                .build();
+        start(PostgresConnector.class, config);
+        assertConnectorIsRunning();
+
+        // Consume records from the snapshot
+        SourceRecords actualRecords = consumeRecordsByTopic(2);
+
+        List<SourceRecord> s1recs = actualRecords.recordsForTopic(topicName("s1.a"));
+        List<SourceRecord> s2recs = actualRecords.recordsForTopic(topicName("s2.a"));
+        assertThat(s1recs.size()).isEqualTo(1);
+        assertThat(s2recs.size()).isEqualTo(1);
+
+        VerifyRecord.isValidRead(s1recs.get(0), PK_FIELD, 1);
+        VerifyRecord.isValidRead(s2recs.get(0), PK_FIELD, 1);
+
+        // Insert 2 more rows
+        // These are captured by the stream
+        // NOTE: Manually tested that if records were inserted between the creation of the replication slot and
+        // the finalization of the snapshot, those records would be captured and streamed at this point.
+        TestHelper.execute(INSERT_STMT);
+        actualRecords = consumeRecordsByTopic(2);
+
+        s1recs = actualRecords.recordsForTopic(topicName("s1.a"));
+        s2recs = actualRecords.recordsForTopic(topicName("s2.a"));
+        assertThat(s1recs.size()).isEqualTo(1);
+        assertThat(s2recs.size()).isEqualTo(1);
+
+        VerifyRecord.isValidInsert(s1recs.get(0), PK_FIELD, 2);
+        VerifyRecord.isValidInsert(s2recs.get(0), PK_FIELD, 2);
+        stopConnector();
+
+        config = TestHelper.defaultConfig()
+                .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.EXPORTED.getValue())
+                .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
+                .build();
+        start(PostgresConnector.class, config);
+        assertConnectorIsRunning();
+
+        actualRecords = consumeRecordsByTopic(4);
+
+        s1recs = actualRecords.recordsForTopic(topicName("s1.a"));
+        s2recs = actualRecords.recordsForTopic(topicName("s2.a"));
+        assertThat(s1recs.size()).isEqualTo(2);
+        assertThat(s2recs.size()).isEqualTo(2);
+        VerifyRecord.isValidRead(s1recs.get(0), PK_FIELD, 1);
+        VerifyRecord.isValidRead(s1recs.get(1), PK_FIELD, 2);
+        VerifyRecord.isValidRead(s2recs.get(0), PK_FIELD, 1);
+        VerifyRecord.isValidRead(s2recs.get(1), PK_FIELD, 2);
+    }
+
     private String getConfirmedFlushLsn(PostgresConnection connection) throws SQLException {
         return connection.prepareQueryAndMap(
                 "select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ?", statement -> {