DBZ-1035 Support PostgreSQL lock-free snapshot using exported snapshots

Chris Cranford 2019-07-25 12:27:22 -04:00 committed by Jiri Pechanec
parent 4b44e5f925
commit e852f082a2
4 changed files with 153 additions and 1 deletion

View File

@@ -30,6 +30,7 @@
import io.debezium.connector.postgresql.connection.wal2json.NonStreamingWal2JsonMessageDecoder;
import io.debezium.connector.postgresql.connection.wal2json.StreamingWal2JsonMessageDecoder;
import io.debezium.connector.postgresql.snapshot.AlwaysSnapshotter;
import io.debezium.connector.postgresql.snapshot.ExportedSnapshotter;
import io.debezium.connector.postgresql.snapshot.InitialOnlySnapshotter;
import io.debezium.connector.postgresql.snapshot.InitialSnapshotter;
import io.debezium.connector.postgresql.snapshot.NeverSnapshotter;
@@ -133,6 +134,11 @@ public enum SnapshotMode implements EnumeratedValue {
*/
INITIAL_ONLY("initial_only", (c) -> new InitialOnlySnapshotter()),
/**
* Perform an exported snapshot
*/
EXPORTED("exported", (c) -> new ExportedSnapshotter()),
/**
* Inject a custom snapshotter, which allows for more control over snapshots.
*/
@@ -655,6 +661,7 @@ public static SchemaRefreshMode parse(String value) {
+ "'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; "
+ "'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally start emitting changes;"
+ "'never' to specify the connector should never run a snapshot and that upon first startup the connector should read from the last position (LSN) recorded by the server; and"
+ "'exported' to specify the connector should run a snapshot based on the position when the replication slot was created; "
+ "'custom' to specify a custom class with 'snapshot.custom_class' which will be loaded and used to determine the snapshot, see docs for more details.");
public static final Field SNAPSHOT_MODE_CLASS = Field.create("snapshot.custom.class")
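For orientation, a minimal sketch (not part of this commit's diff) of selecting the new mode through Debezium's Configuration API; 'snapshot.mode' is the key of the SNAPSHOT_MODE field documented above, and 'exported' is the value introduced here:

import io.debezium.config.Configuration;

public class ExportedModeConfigExample {
    public static void main(String[] args) {
        // "exported" resolves to the new ExportedSnapshotter through the enum above.
        Configuration config = Configuration.create()
                .with("snapshot.mode", "exported")
                .build();
        System.out.println(config.getString("snapshot.mode")); // prints: exported
    }
}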

View File

@@ -225,7 +225,7 @@ private void takeSnapshot(BlockingConsumer<ChangeEvent> consumer) {
}
}
-        long xlogStart = connection.currentXLogLocation();
+        long xlogStart = getTransactionStartLsn(connection);
long txId = connection.currentTransactionId().longValue();
if (logger.isInfoEnabled()) {
logger.info("\t read xlogStart at '{}' from transaction '{}'", ReplicationConnection.format(xlogStart), txId);
@@ -320,6 +320,17 @@ private void takeSnapshot(BlockingConsumer<ChangeEvent> consumer) {
}
}
private long getTransactionStartLsn(PostgresConnection connection) throws SQLException {
if (snapshotter.exportSnapshot() && slotCreatedInfo != null) {
// When performing an exported snapshot based on a newly created replication slot, the txLogStart position
// should be based on the replication slot's snapshot transaction point. This is crucial so that any
// SQL operations that occur mid-snapshot are properly captured when streaming begins; otherwise
// they would be lost.
return slotCreatedInfo.startLsn();
}
return connection.currentXLogLocation();
}
private void changeSourceToLastSnapshotRecord(SourceRecord currentRecord) {
final Struct envelope = (Struct) currentRecord.value();
final Struct source = (Struct) envelope.get(Envelope.FieldName.SOURCE);
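To make the comment in getTransactionStartLsn above concrete, a toy illustration with made-up LSN values; none of these identifiers exist in the connector:

public class LsnChoiceIllustration {
    public static void main(String[] args) {
        long slotStartLsn = 100;        // consistent point reported when the slot was created
        long midSnapshotWrite = 105;    // a write committed while the snapshot SELECTs run
        long currentXLogLocation = 110; // the server's write head once the snapshot finishes

        // Streaming from the slot's start LSN replays the mid-snapshot write:
        System.out.println(midSnapshotWrite >= slotStartLsn);        // true  -> captured
        // Streaming from the current head would silently skip it:
        System.out.println(midSnapshotWrite >= currentXLogLocation); // false -> lost
    }
}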

View File

@@ -0,0 +1,72 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.postgresql.replication.LogSequenceNumber;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.relational.TableId;
/**
* @author Chris Cranford
*/
public class ExportedSnapshotter implements Snapshotter {
private Map<TableId, String> snapshotOverrides;
@Override
public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
this.snapshotOverrides = config.getSnapshotSelectOverridesByTable();
}
@Override
public boolean shouldSnapshot() {
return true;
}
@Override
public boolean shouldStream() {
return true;
}
@Override
public boolean exportSnapshot() {
return true;
}
@Override
public Optional<String> buildSnapshotQuery(TableId tableId) {
if (snapshotOverrides.containsKey(tableId)) {
return Optional.of(snapshotOverrides.get(tableId));
}
else {
return Optional.of("select * from " + tableId.toDoubleQuotedString());
}
}
@Override
public Optional<String> snapshotTableLockingStatement(Duration lockTimeout, Set<TableId> tableIds) {
return Optional.empty();
}
@Override
public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) {
if (newSlotInfo != null) {
String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName());
return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet;
}
return Snapshotter.super.snapshotTransactionIsolationLevelStatement(newSlotInfo);
}
}
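Background for the statements built above: when PostgreSQL creates a logical replication slot it can export the snapshot the slot is consistent with, and any other session can then import and read exactly that state. A standalone JDBC sketch of the round trip, with placeholder host, slot, and plugin names (EXPORT_SNAPSHOT is the explicit PostgreSQL 10+ spelling of behavior earlier versions apply by default):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class ExportedSnapshotFlow {
    public static void main(String[] args) throws Exception {
        // 1. A replication connection: replication=database switches pgjdbc to the
        //    walsender protocol, where CREATE_REPLICATION_SLOT is a valid command.
        Properties props = new Properties();
        props.setProperty("user", "postgres");
        props.setProperty("replication", "database");
        props.setProperty("assumeMinServerVersion", "9.4");
        props.setProperty("preferQueryMode", "simple");
        try (Connection replConn = DriverManager.getConnection("jdbc:postgresql://localhost/postgres", props);
             Statement replStmt = replConn.createStatement();
             ResultSet rs = replStmt.executeQuery(
                     "CREATE_REPLICATION_SLOT my_slot LOGICAL wal2json EXPORT_SNAPSHOT")) {
            rs.next();
            String snapshotName = rs.getString("snapshot_name");       // what ExportedSnapshotter interpolates
            String consistentPoint = rs.getString("consistent_point"); // what getTransactionStartLsn prefers

            // 2. A regular connection imports the snapshot; the SETs must be the first
            //    statements of the transaction, matching the statement built above. The
            //    exported snapshot stays importable only while the replication connection
            //    has issued no further commands.
            try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost/postgres", "postgres", "");
                 Statement stmt = conn.createStatement()) {
                conn.setAutoCommit(false);
                stmt.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
                stmt.execute("SET TRANSACTION SNAPSHOT '" + snapshotName + "'");
                // ... snapshot SELECTs here see the table state as of consistentPoint ...
                conn.commit();
            }
        }
    }
}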

View File

@@ -851,6 +851,68 @@ record = s2recs.get(0);
VerifyRecord.isValidRead(s2recs.get(1), PK_FIELD, 2);
}
@Test
@FixFor("DBZ-1035")
public void shouldAllowForExportedSnapshot() throws Exception {
TestHelper.dropDefaultReplicationSlot();
// Inside RecordsSnapshotProducer, we inject a new row into s1.a with aa=5 prior to executing the
// actual snapshot. The snapshot should reflect the state of the tables at the time the
// replication slot was created.
TestHelper.execute(SETUP_TABLES_STMT);
Configuration config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.EXPORTED.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
.build();
start(PostgresConnector.class, config);
assertConnectorIsRunning();
// Consume records from the snapshot
SourceRecords actualRecords = consumeRecordsByTopic(2);
List<SourceRecord> s1recs = actualRecords.recordsForTopic(topicName("s1.a"));
List<SourceRecord> s2recs = actualRecords.recordsForTopic(topicName("s2.a"));
assertThat(s1recs.size()).isEqualTo(1);
assertThat(s2recs.size()).isEqualTo(1);
VerifyRecord.isValidRead(s1recs.get(0), PK_FIELD, 1);
VerifyRecord.isValidRead(s2recs.get(0), PK_FIELD, 1);
// Insert 2 more rows
// These are captured by the stream
// NOTE: Manually verified that records inserted between the creation of the replication slot and
// the finalization of the snapshot are captured and streamed at this point.
TestHelper.execute(INSERT_STMT);
actualRecords = consumeRecordsByTopic(2);
s1recs = actualRecords.recordsForTopic(topicName("s1.a"));
s2recs = actualRecords.recordsForTopic(topicName("s2.a"));
assertThat(s1recs.size()).isEqualTo(1);
assertThat(s2recs.size()).isEqualTo(1);
VerifyRecord.isValidInsert(s1recs.get(0), PK_FIELD, 2);
VerifyRecord.isValidInsert(s2recs.get(0), PK_FIELD, 2);
stopConnector();
config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.EXPORTED.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.build();
start(PostgresConnector.class, config);
assertConnectorIsRunning();
actualRecords = consumeRecordsByTopic(4);
s1recs = actualRecords.recordsForTopic(topicName("s1.a"));
s2recs = actualRecords.recordsForTopic(topicName("s2.a"));
assertThat(s1recs.size()).isEqualTo(2);
assertThat(s2recs.size()).isEqualTo(2);
VerifyRecord.isValidRead(s1recs.get(0), PK_FIELD, 1);
VerifyRecord.isValidRead(s1recs.get(1), PK_FIELD, 2);
VerifyRecord.isValidRead(s2recs.get(0), PK_FIELD, 1);
VerifyRecord.isValidRead(s2recs.get(1), PK_FIELD, 2);
}
private String getConfirmedFlushLsn(PostgresConnection connection) throws SQLException {
return connection.prepareQueryAndMap(
"select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ?", statement -> {