DBZ-7917 Implement read-only incremental snapshot for PostgreSQL

This commit is contained in:
mfvitale 2024-06-06 09:45:25 +02:00 committed by Jiri Pechanec
parent 22efd5a367
commit 3bf9f4d9d5
9 changed files with 478 additions and 10 deletions

View File

@ -0,0 +1,90 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import io.debezium.DebeziumException;
/**
 * Value object holding the three components of a PostgreSQL snapshot as returned by
 * {@code pg_current_snapshot()}: {@code xmin}, {@code xmax} and the list of transaction ids
 * ({@code xip}) that follows the second colon in the textual form {@code xmin:xmax:xip_list}.
 */
public class PgSnapshot {

    // Textual form is "xmin:xmax:xip_list"; xip_list is an optional comma-separated list of ids.
    // The list is matched as "id(,id)*" so that the expression cannot backtrack exponentially and
    // so that empty list elements (e.g. "1:2:1,,2") are rejected by the pattern itself.
    private static final String SNAPSHOT_FORMAT = "(\\d+):(\\d+):(\\d+(?:,\\d+)*)?";
    private static final Pattern SNAPSHOT_PATTERN = Pattern.compile(SNAPSHOT_FORMAT);
    private static final String SEPARATOR = ",";

    private final Long xMin;
    private final Long xMax;
    private final Set<Long> xip;

    /**
     * @param xMin the {@code xmin} component of the snapshot
     * @param xMax the {@code xmax} component of the snapshot
     * @param xip the transaction ids listed in the snapshot
     */
    public PgSnapshot(Long xMin, Long xMax, Set<Long> xip) {
        this.xMin = xMin;
        this.xMax = xMax;
        this.xip = xip;
    }

    public Long getXMin() {
        return xMin;
    }

    public Long getXMax() {
        return xMax;
    }

    public Set<Long> getXip() {
        return xip;
    }

    /**
     * Parses the textual snapshot representation {@code xmin:xmax:xip_list}.
     *
     * @param snapshotString the text to parse, e.g. {@code "795:799:795,797"} or {@code "795:799:"}
     * @return the parsed snapshot; {@link #getXip()} is empty when the list part is absent
     * @throws DebeziumException if the text is {@code null}, does not follow the expected format,
     *         or contains a number that does not fit into a {@code long}
     */
    public static PgSnapshot from(String snapshotString) {
        if (snapshotString == null) {
            throw new DebeziumException(parseError(snapshotString));
        }

        Matcher matcher = SNAPSHOT_PATTERN.matcher(snapshotString);
        if (!matcher.matches()) {
            throw new DebeziumException(parseError(snapshotString));
        }

        try {
            Long xmin = Long.parseLong(matcher.group(1));
            Long xmax = Long.parseLong(matcher.group(2));

            Set<Long> xip = Set.of();
            // Group 3 is null when nothing follows the second colon.
            if (matcher.group(3) != null) {
                xip = Arrays.stream(matcher.group(3).split(SEPARATOR))
                        .map(Long::parseLong)
                        .collect(Collectors.toSet());
            }
            return new PgSnapshot(xmin, xmax, xip);
        }
        catch (NumberFormatException e) {
            // Digits-only values can still overflow a long; report it like any other parse failure.
            throw new DebeziumException(parseError(snapshotString), e);
        }
    }

    private static String parseError(String snapshotString) {
        return String.format("Unable to parse PgCurrentSnapshot result %s.", snapshotString);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        PgSnapshot that = (PgSnapshot) o;
        return Objects.equals(xMin, that.xMin) && Objects.equals(xMax, that.xMax) && Objects.equals(xip, that.xip);
    }

    @Override
    public int hashCode() {
        return Objects.hash(xMin, xMax, xip);
    }

    @Override
    public String toString() {
        return "PgSnapshot{" +
                "xMin=" + xMin +
                ", xMax=" + xMax +
                ", xip=" + xip +
                '}';
    }
}

View File

@ -94,12 +94,26 @@ public StreamingChangeEventSource<PostgresPartition, PostgresOffsetContext> getS
SnapshotProgressListener<PostgresPartition> snapshotProgressListener, SnapshotProgressListener<PostgresPartition> snapshotProgressListener,
DataChangeEventListener<PostgresPartition> dataChangeEventListener, DataChangeEventListener<PostgresPartition> dataChangeEventListener,
NotificationService<PostgresPartition, PostgresOffsetContext> notificationService) { NotificationService<PostgresPartition, PostgresOffsetContext> notificationService) {
if (configuration.isReadOnlyConnection()) {
return Optional.of(new PostgresReadOnlyIncrementalSnapshotChangeEventSource<>(
configuration,
connectionFactory.mainConnection(),
dispatcher,
schema,
clock,
snapshotProgressListener,
dataChangeEventListener,
notificationService));
}
// If no data collection id is provided, don't return an instance as the implementation requires // If no data collection id is provided, don't return an instance as the implementation requires
// that a signal data collection id be provided to work. // that a signal data collection id be provided to work.
if (Strings.isNullOrEmpty(configuration.getSignalingDataCollectionId())) { if (Strings.isNullOrEmpty(configuration.getSignalingDataCollectionId())) {
return Optional.empty(); return Optional.empty();
} }
final PostgresSignalBasedIncrementalSnapshotChangeEventSource incrementalSnapshotChangeEventSource = new PostgresSignalBasedIncrementalSnapshotChangeEventSource(
return Optional.of(new PostgresSignalBasedIncrementalSnapshotChangeEventSource(
configuration, configuration,
connectionFactory.mainConnection(), connectionFactory.mainConnection(),
dispatcher, dispatcher,
@ -107,7 +121,6 @@ public StreamingChangeEventSource<PostgresPartition, PostgresOffsetContext> getS
clock, clock,
snapshotProgressListener, snapshotProgressListener,
dataChangeEventListener, dataChangeEventListener,
notificationService); notificationService));
return Optional.of(incrementalSnapshotChangeEventSource);
} }
} }

View File

@ -959,6 +959,16 @@ public static AutoCreateMode parse(String value, String defaultValue) {
.withDefault(Boolean.TRUE) .withDefault(Boolean.TRUE)
.withValidation(Field::isBoolean, PostgresConnectorConfig::validateFlushLsnSource); .withValidation(Field::isBoolean, PostgresConnectorConfig::validateFlushLsnSource);
/**
 * Boolean connector option {@code read.only}, defaulting to {@code false}.
 * Its value is read once in the {@code PostgresConnectorConfig} constructor and exposed through
 * {@code isReadOnlyConnection()}.
 */
public static final Field READ_ONLY_CONNECTION = Field.create("read.only")
.withDisplayName("Read only connection")
.withType(ConfigDef.Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 100))
.withDefault(false)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDescription("Switched connector to use alternative methods to deliver signals to Debezium instead "
+ "of writing to signaling table");
public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
.withDefault(PostgresSourceInfoStructMaker.class.getName()); .withDefault(PostgresSourceInfoStructMaker.class.getName());
@ -971,6 +981,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
private final SnapshotMode snapshotMode; private final SnapshotMode snapshotMode;
private final SnapshotLockingMode snapshotLockingMode; private final SnapshotLockingMode snapshotLockingMode;
private final boolean readOnlyConnection;
public PostgresConnectorConfig(Configuration config) { public PostgresConnectorConfig(Configuration config) {
super( super(
@ -992,6 +1003,7 @@ public PostgresConnectorConfig(Configuration config) {
this.replicaIdentityMapper = (replicaIdentityMapping != null) ? new ReplicaIdentityMapper(replicaIdentityMapping) : null; this.replicaIdentityMapper = (replicaIdentityMapping != null) ? new ReplicaIdentityMapper(replicaIdentityMapping) : null;
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString()); this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString());
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString()); this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
this.readOnlyConnection = config.getBoolean(READ_ONLY_CONNECTION);
} }
protected String hostname() { protected String hostname() {
@ -1105,6 +1117,13 @@ public Optional<SnapshotLockingMode> getSnapshotLockingMode() {
return Optional.of(this.snapshotLockingMode); return Optional.of(this.snapshotLockingMode);
} }
/**
 * Returns the value of the {@code read.only} option as captured when this configuration was created.
 *
 * @return whether database connection should be treated as read-only.
 */
public boolean isReadOnlyConnection() {
return readOnlyConnection;
}
protected int moneyFractionDigits() { protected int moneyFractionDigits() {
return getConfig().getInteger(MONEY_FRACTION_DIGITS); return getConfig().getInteger(MONEY_FRACTION_DIGITS);
} }

View File

@ -209,8 +209,12 @@ public PostgresOffsetContext load(Map<String, ?> offset) {
final Instant useconds = Conversions.toInstantFromMicros((Long) ((Map<String, Object>) offset).getOrDefault(SourceInfo.TIMESTAMP_USEC_KEY, 0L)); final Instant useconds = Conversions.toInstantFromMicros((Long) ((Map<String, Object>) offset).getOrDefault(SourceInfo.TIMESTAMP_USEC_KEY, 0L));
final boolean snapshot = (boolean) ((Map<String, Object>) offset).getOrDefault(SourceInfo.SNAPSHOT_KEY, Boolean.FALSE); final boolean snapshot = (boolean) ((Map<String, Object>) offset).getOrDefault(SourceInfo.SNAPSHOT_KEY, Boolean.FALSE);
final boolean lastSnapshotRecord = (boolean) ((Map<String, Object>) offset).getOrDefault(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, Boolean.FALSE); final boolean lastSnapshotRecord = (boolean) ((Map<String, Object>) offset).getOrDefault(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, Boolean.FALSE);
return new PostgresOffsetContext(connectorConfig, lsn, lastCompletelyProcessedLsn, lastCommitLsn, txId, messageType, useconds, snapshot, lastSnapshotRecord, return new PostgresOffsetContext(connectorConfig, lsn,
TransactionContext.load(offset), SignalBasedIncrementalSnapshotContext.load(offset, false)); lastCompletelyProcessedLsn, lastCommitLsn, txId, messageType, useconds, snapshot, lastSnapshotRecord,
TransactionContext.load(offset),
connectorConfig.isReadOnlyConnection()
? PostgresReadOnlyIncrementalSnapshotContext.load(offset)
: SignalBasedIncrementalSnapshotContext.load(offset, false));
} }
} }
@ -245,7 +249,9 @@ public static PostgresOffsetContext initialContext(PostgresConnectorConfig conne
false, false,
false, false,
new TransactionContext(), new TransactionContext(),
new SignalBasedIncrementalSnapshotContext<>(false)); connectorConfig.isReadOnlyConnection()
? new PostgresReadOnlyIncrementalSnapshotContext<>()
: new SignalBasedIncrementalSnapshotContext<>(false));
} }
catch (SQLException e) { catch (SQLException e) {
throw new ConnectException("Database processing error", e); throw new ConnectException("Database processing error", e);

View File

@ -0,0 +1,182 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import java.sql.SQLException;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
/**
 * Incremental snapshot change event source for PostgreSQL that marks the chunk window with
 * database snapshots instead of emitting window open/close signals: the low and high watermarks
 * are the results of {@code pg_current_snapshot()} taken when the window is opened and closed.
 * Streamed events and transaction commits are then compared against those watermarks through
 * {@link PostgresReadOnlyIncrementalSnapshotContext}.
 *
 * @param <P> the partition type
 */
public class PostgresReadOnlyIncrementalSnapshotChangeEventSource<P extends PostgresPartition>
        extends AbstractIncrementalSnapshotChangeEventSource<P, TableId> {

    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresReadOnlyIncrementalSnapshotChangeEventSource.class);

    private static final String FORCE_NEW_TRANSACTION = "SELECT * FROM pg_current_xact_id();";
    private static final String CURRENT_SNAPSHOT = "SELECT * FROM pg_current_snapshot();";

    public PostgresReadOnlyIncrementalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig config,
                                                                JdbcConnection jdbcConnection,
                                                                EventDispatcher<P, TableId> dispatcher,
                                                                DatabaseSchema<?> databaseSchema,
                                                                Clock clock,
                                                                SnapshotProgressListener<P> progressListener,
                                                                DataChangeEventListener<P> dataChangeEventListener,
                                                                NotificationService<P, ? extends OffsetContext> notificationService) {
        super(config, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener, notificationService);
    }

    /**
     * Runs the parent hook and then queries {@code pg_current_xact_id()} on the connection.
     */
    @Override
    protected void preIncrementalSnapshotStart() {
        super.preIncrementalSnapshotStart();
        forceNewTransactionId();
    }

    private PostgresReadOnlyIncrementalSnapshotContext<TableId> getContext() {
        return (PostgresReadOnlyIncrementalSnapshotContext<TableId>) context;
    }

    /**
     * Records the current database snapshot as the low watermark of the window.
     */
    @Override
    protected void emitWindowOpen() {
        getCurrentSnapshot(getContext()::setLowWatermark);
    }

    /**
     * Records the current database snapshot as the high watermark of the window.
     */
    @Override
    protected void emitWindowClose(P partition, OffsetContext offsetContext) {
        getCurrentSnapshot(getContext()::setHighWatermark);
    }

    /**
     * Updates the window state from the event's transaction id. When the window is closed the
     * buffered window events are sent and the next chunk is read; otherwise the event key is
     * de-duplicated against a non-empty window if the context requires it.
     */
    @Override
    public void processMessage(P partition, DataCollectionId dataCollectionId, Object key, OffsetContext offsetContext) throws InterruptedException {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
            return;
        }
        LOGGER.debug("Checking window for table '{}', key '{}', window contains '{}'", dataCollectionId, key, window);
        getContext().updateWindowState(offsetContext);

        boolean windowClosed = getContext().isWindowClosed();
        if (windowClosed) {
            // updateWindowState() already closed the window, so only flush and continue.
            sendWindowEvents(partition, offsetContext);
            readChunk(partition, offsetContext);
        }
        else if (!window.isEmpty() && getContext().deduplicationNeeded()) {
            LOGGER.trace("Deduplicating");
            deduplicateWindow(dataCollectionId, key);
        }
    }

    @Override
    public void processTransactionCommittedEvent(P partition, OffsetContext offsetContext) throws InterruptedException {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
            return;
        }
        LOGGER.trace("Processing transaction event");
        readUntilNewTransactionChange(partition, offsetContext);
        LOGGER.trace("Finished processing transaction event");
    }

    /**
     * Advances the snapshot on a transaction commit: closes the window and reads the next chunk
     * when the committed transaction equals the high watermark's {@code xmax}, or repeatedly
     * while the context reports the committed transaction as visible and the watermarks stay
     * unchanged.
     */
    private void readUntilNewTransactionChange(P partition, OffsetContext offsetContext) throws InterruptedException {
        Long eventTxId = offsetContext.getSourceInfo().getInt64(SourceInfo.TXID_KEY);
        LOGGER.debug("Event txId {}, snapshot is running {}, reachedHighWatermark {}",
                eventTxId, getContext().snapshotRunning(), getContext().isTransactionVisible(eventTxId));

        if (getContext().snapshotRunning() && maxInProgressTransactionCommitted(eventTxId)) {
            closeWindowAndReadNextChunk(partition, offsetContext);
            return;
        }

        while (getContext().snapshotRunning() && getContext().isTransactionVisible(eventTxId)) {
            LOGGER.debug("Finishing snapshot, snapshot is running {}, reachedHighWatermark {}", getContext().snapshotRunning(),
                    getContext().isTransactionVisible(eventTxId));
            closeWindowAndReadNextChunk(partition, offsetContext);
            if (getContext().watermarksChanged()) {
                LOGGER.trace("Watermarks changed");
                return;
            }
            LOGGER.trace("Re read chunk finished, snapshot is running {}, reachedHighWatermark {}", getContext().snapshotRunning(),
                    getContext().isTransactionVisible(eventTxId));
        }
    }

    // Closes the current window, flushes the buffered window events and reads the next chunk.
    private void closeWindowAndReadNextChunk(P partition, OffsetContext offsetContext) throws InterruptedException {
        getContext().closeWindow();
        sendWindowEvents(partition, offsetContext);
        readChunk(partition, offsetContext);
    }

    /**
     * Queries {@code pg_current_snapshot()} and hands the parsed result to the given consumer.
     *
     * @throws DebeziumException if the query fails with an {@link SQLException}
     */
    private void getCurrentSnapshot(Consumer<PgSnapshot> watermark) {
        try {
            PgSnapshot pgSnapshot = jdbcConnection.queryAndMap(CURRENT_SNAPSHOT, singleResultMapper(rs -> {
                String currentSnapshot = rs.getString(1);
                LOGGER.trace("Current snapshot {}", currentSnapshot);
                return PgSnapshot.from(currentSnapshot);
            }, "Unable to get current snapshot"));
            watermark.accept(pgSnapshot);
        }
        catch (SQLException e) {
            throw new DebeziumException(e);
        }
    }

    private boolean maxInProgressTransactionCommitted(Long eventTxId) {
        final PgSnapshot highWatermark = getContext().getHighWatermark();
        // The high watermark is reset to null whenever the window is closed; without it there is
        // no xmax to compare against, so the committed transaction cannot be the one awaited.
        return highWatermark != null && highWatermark.getXMax().equals(eventTxId);
    }

    private void forceNewTransactionId() {
        try {
            jdbcConnection.query(FORCE_NEW_TRANSACTION, rs -> {
                if (rs.next()) {
                    LOGGER.trace("Created new transaction ID {}", rs.getString(1));
                }
            });
        }
        catch (SQLException e) {
            throw new DebeziumException(e);
        }
    }

    /**
     * Builds a mapper that expects exactly one row in the result set.
     *
     * @param extractor converts the single row into the result value
     * @param error message of the {@link IllegalStateException} raised when the result set has
     *        no rows or more than one row
     */
    private <T> JdbcConnection.ResultSetMapper<T> singleResultMapper(JdbcConnection.ResultSetExtractor<T> extractor, String error) {
        return (rs) -> {
            if (rs.next()) {
                final T ret = extractor.apply(rs);
                if (!rs.next()) {
                    return ret;
                }
            }
            throw new IllegalStateException(error);
        };
    }
}

View File

@ -0,0 +1,108 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import java.util.Map;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
/**
 * Incremental snapshot context that tracks the chunk window with two {@link PgSnapshot}
 * watermarks (low and high) and remembers the pair that was active when the window was last
 * closed, so that callers can detect whether a re-read produced new watermarks.
 *
 * @param <T> the data collection id type
 */
public class PostgresReadOnlyIncrementalSnapshotContext<T> extends AbstractIncrementalSnapshotContext<T> {

    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresReadOnlyIncrementalSnapshotContext.class);

    private PgSnapshot lowWatermark;
    private PgSnapshot highWatermark;
    private PgSnapshot previousHighWatermark;
    private PgSnapshot previousLowWatermark;

    public PostgresReadOnlyIncrementalSnapshotContext() {
        this(false);
    }

    public PostgresReadOnlyIncrementalSnapshotContext(boolean useCatalogBeforeSchema) {
        super(useCatalogBeforeSchema);
    }

    public static <U> PostgresReadOnlyIncrementalSnapshotContext<U> load(Map<String, ?> offsets) {
        return load(offsets, false);
    }

    public static <U> PostgresReadOnlyIncrementalSnapshotContext<U> load(Map<String, ?> offsets, boolean useCatalogBeforeSchema) {
        PostgresReadOnlyIncrementalSnapshotContext<U> context = new PostgresReadOnlyIncrementalSnapshotContext<>(useCatalogBeforeSchema);
        init(context, offsets);
        return context;
    }

    public PgSnapshot getLowWatermark() {
        return lowWatermark;
    }

    public void setLowWatermark(PgSnapshot lowWatermark) {
        LOGGER.trace("Setting low watermark to {}", lowWatermark);
        this.lowWatermark = lowWatermark;
    }

    public PgSnapshot getHighWatermark() {
        return highWatermark;
    }

    public void setHighWatermark(PgSnapshot highWatermark) {
        LOGGER.trace("Setting high watermark to {}", highWatermark);
        this.highWatermark = highWatermark;
    }

    /**
     * Opens or closes the window based on the transaction id of the given event.
     * The window opens once the id is at or past the low watermark's {@code xmin}; it closes
     * once the id is greater than the larger {@code xmax} of the two watermarks.
     */
    public void updateWindowState(OffsetContext offsetContext) {
        Long eventTxId = offsetContext.getSourceInfo().getInt64(SourceInfo.TXID_KEY);
        LOGGER.trace("Received event with TxId {}", eventTxId);
        LOGGER.trace("Updating window. Window oped: {}, low watermark {}, high watermark {}", windowOpened, lowWatermark, highWatermark);

        if (!windowOpened && lowWatermark != null) {
            boolean pastLowWatermark = eventTxId >= lowWatermark.getXMin();
            if (pastLowWatermark) {
                LOGGER.debug("Current event txId {}, low watermark {}", eventTxId, lowWatermark);
                windowOpened = true;
            }
        }
        if (windowOpened && highWatermark != null) {
            boolean pastHighWatermark = eventTxId > Math.max(highWatermark.getXMax(), lowWatermark.getXMax());
            if (pastHighWatermark) {
                LOGGER.debug("Current event txId {}, high watermark {}", eventTxId, highWatermark);
                closeWindow();
            }
        }
    }

    public boolean isWindowClosed() {
        return !windowOpened;
    }

    /**
     * Marks the window as closed, remembers the current watermarks as the previous ones and
     * clears the current watermarks.
     */
    public void closeWindow() {
        LOGGER.trace("Window closed. Low and High watermark cleaned");
        windowOpened = false;
        previousHighWatermark = highWatermark;
        highWatermark = null;
        previousLowWatermark = lowWatermark;
        lowWatermark = null;
    }

    /**
     * Tells whether the given transaction id is at or below the high watermark's {@code xmin}.
     *
     * @return {@code false} when no high watermark is currently set, as there is no window to
     *         compare the transaction against
     */
    public boolean isTransactionVisible(Long eventTxId) {
        // The comparison needs the high watermark, so that is the reference that must be guarded.
        // Reporting "visible" without watermarks would make the caller close an already cleared
        // window and compare null watermarks afterwards.
        if (highWatermark == null) {
            return false;
        }
        return eventTxId.compareTo(highWatermark.getXMin()) <= 0;
    }

    /**
     * Tells whether the current watermarks differ from the ones saved by the last
     * {@link #closeWindow()}. Unset watermarks are compared null-safely.
     */
    public boolean watermarksChanged() {
        LOGGER.trace("previousLowWatermark {}, lowWatermark {}, previousHighWatermark {}, highWatermark {}", previousLowWatermark, lowWatermark, previousHighWatermark,
                highWatermark);
        return !Objects.equals(previousLowWatermark, lowWatermark) || !Objects.equals(previousHighWatermark, highWatermark);
    }
}

View File

@ -258,20 +258,24 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff
// Tx BEGIN/END event // Tx BEGIN/END event
if (message.isTransactionalMessage()) { if (message.isTransactionalMessage()) {
offsetContext.updateWalPosition(lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()),
taskContext.getSlotXmin(connection),
null,
message.getOperation());
if (!connectorConfig.shouldProvideTransactionMetadata()) { if (!connectorConfig.shouldProvideTransactionMetadata()) {
LOGGER.trace("Received transactional message {}", message); LOGGER.trace("Received transactional message {}", message);
// Don't skip on BEGIN message as it would flush LSN for the whole transaction // Don't skip on BEGIN message as it would flush LSN for the whole transaction
// too early // too early
if (message.getOperation() == Operation.COMMIT) { if (message.getOperation() == Operation.COMMIT) {
commitMessage(partition, offsetContext, lsn); commitMessage(partition, offsetContext, lsn);
dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, message.getCommitTime());
} }
return; return;
} }
offsetContext.updateWalPosition(lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()),
taskContext.getSlotXmin(connection),
null,
message.getOperation());
if (message.getOperation() == Operation.BEGIN) { if (message.getOperation() == Operation.BEGIN) {
dispatcher.dispatchTransactionStartedEvent(partition, toString(message.getTransactionId()), offsetContext, message.getCommitTime()); dispatcher.dispatchTransactionStartedEvent(partition, toString(message.getTransactionId()), offsetContext, message.getCommitTime());
} }

View File

@ -0,0 +1,41 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import io.debezium.DebeziumException;
/**
 * Unit tests for {@link PgSnapshot#from(String)}.
 */
public class PgSnapshotTest {

    @Test
    public void parseCorrectPgSnapshotWithInProgressTransactions() {
        final PgSnapshot parsed = PgSnapshot.from("795:799:795,797");

        assertBounds(parsed, 795L, 799L);
        Assertions.assertThat(parsed.getXip()).contains(795L, 797L);
    }

    @Test
    public void parseCorrectPgSnapshotWithoutInProgressTransactions() {
        final PgSnapshot parsed = PgSnapshot.from("795:799:");

        assertBounds(parsed, 795L, 799L);
        Assertions.assertThat(parsed.getXip()).isEmpty();
    }

    @Test
    public void parseAWrongPgSnapshotWillThrowException() {
        // The xmax component is missing, so parsing must be rejected.
        Assertions
                .assertThatThrownBy(() -> PgSnapshot.from("795::"))
                .isInstanceOf(DebeziumException.class);
    }

    // Checks the xmin and xmax components of a parsed snapshot.
    private static void assertBounds(PgSnapshot parsed, Long expectedXMin, Long expectedXMax) {
        Assertions.assertThat(parsed.getXMin()).isEqualTo(expectedXMin);
        Assertions.assertThat(parsed.getXMax()).isEqualTo(expectedXMax);
    }
}

View File

@ -227,6 +227,7 @@ public void init(P partition, OffsetContext offsetContext) {
} }
LOGGER.info("Incremental snapshot in progress, need to read new chunk on start"); LOGGER.info("Incremental snapshot in progress, need to read new chunk on start");
try { try {
preIncrementalSnapshotStart();
progressListener.snapshotStarted(partition); progressListener.snapshotStarted(partition);
readChunk(partition, offsetContext); readChunk(partition, offsetContext);
} }
@ -683,6 +684,10 @@ protected void setContext(IncrementalSnapshotContext<T> context) {
this.context = context; this.context = context;
} }
protected void preIncrementalSnapshotStart() {
// no-op
}
protected void preReadChunk(IncrementalSnapshotContext<T> context) { protected void preReadChunk(IncrementalSnapshotContext<T> context) {
LOGGER.trace("Pre read chunk"); LOGGER.trace("Pre read chunk");