diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PgSnapshot.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PgSnapshot.java new file mode 100644 index 000000000..3f8f19256 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PgSnapshot.java @@ -0,0 +1,90 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql; + +import java.util.Arrays; +import java.util.Objects; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import io.debezium.DebeziumException; + +public class PgSnapshot { + + private static final String SNAPSHOT_FORMAT = "(\\d+):(\\d+):((\\d+,*)+)*"; + private static final Pattern SNAPSHOT_PATTERN = Pattern.compile(SNAPSHOT_FORMAT); + private static final String SEPARATOR = ","; + + private final Long xMin; + private final Long xMax; + private final Set xip; + + public PgSnapshot(Long xMin, Long xMax, Set xip) { + this.xMin = xMin; + this.xMax = xMax; + this.xip = xip; + } + + public Long getXMin() { + return xMin; + } + + public Long getXMax() { + return xMax; + } + + public Set getXip() { + return xip; + } + + public static PgSnapshot from(String snapshotString) { + + Matcher matcher = SNAPSHOT_PATTERN.matcher(snapshotString); + + if (matcher.matches()) { + + Long xmin = Long.parseLong(matcher.group(1)); + Long xmax = Long.parseLong(matcher.group(2)); + + Set xip = Set.of(); + if (matcher.group(3) != null) { + xip = Arrays.stream(matcher.group(3).split(SEPARATOR)) + .map(Long::parseLong) + .collect(Collectors.toSet()); + } + + return new PgSnapshot(xmin, xmax, xip); + } + + throw new DebeziumException(String.format("Unable to parse PgCurrentSnapshot result %s.", snapshotString)); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + PgSnapshot that = (PgSnapshot) o; + return Objects.equals(xMin, that.xMin) && Objects.equals(xMax, that.xMax) && Objects.equals(xip, that.xip); + } + + @Override + public int hashCode() { + return Objects.hash(xMin, xMax, xip); + } + + @Override + public String toString() { + return "PgSnapshot{" + + "xMin=" + xMin + + ", xMax=" + xMax + + ", xip=" + xip + + '}'; + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceFactory.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceFactory.java index 00c950444..b854493c9 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceFactory.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceFactory.java @@ -94,12 +94,26 @@ public StreamingChangeEventSource getS SnapshotProgressListener snapshotProgressListener, DataChangeEventListener dataChangeEventListener, NotificationService notificationService) { + if (configuration.isReadOnlyConnection()) { + + return Optional.of(new PostgresReadOnlyIncrementalSnapshotChangeEventSource<>( + configuration, + connectionFactory.mainConnection(), + dispatcher, + schema, + clock, + snapshotProgressListener, + dataChangeEventListener, + notificationService)); + } + // If no data collection id is provided, don't return an instance as the implementation requires // that a signal data collection id be provided to work. if (Strings.isNullOrEmpty(configuration.getSignalingDataCollectionId())) { return Optional.empty(); } - final PostgresSignalBasedIncrementalSnapshotChangeEventSource incrementalSnapshotChangeEventSource = new PostgresSignalBasedIncrementalSnapshotChangeEventSource( + + return Optional.of(new PostgresSignalBasedIncrementalSnapshotChangeEventSource( configuration, connectionFactory.mainConnection(), dispatcher, @@ -107,7 +121,6 @@ public StreamingChangeEventSource getS clock, snapshotProgressListener, dataChangeEventListener, - notificationService); - return Optional.of(incrementalSnapshotChangeEventSource); + notificationService)); } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index 44e6a34c3..9e1c1e972 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -959,6 +959,16 @@ public static AutoCreateMode parse(String value, String defaultValue) { .withDefault(Boolean.TRUE) .withValidation(Field::isBoolean, PostgresConnectorConfig::validateFlushLsnSource); + public static final Field READ_ONLY_CONNECTION = Field.create("read.only") + .withDisplayName("Read only connection") + .withType(ConfigDef.Type.BOOLEAN) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 100)) + .withDefault(false) + .withWidth(ConfigDef.Width.SHORT) + .withImportance(ConfigDef.Importance.LOW) + .withDescription("Switched connector to use alternative methods to deliver signals to Debezium instead " + + "of writing to signaling table"); + public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER .withDefault(PostgresSourceInfoStructMaker.class.getName()); @@ -971,6 +981,7 @@ public static AutoCreateMode parse(String value, String defaultValue) { private final SnapshotMode snapshotMode; private final SnapshotLockingMode snapshotLockingMode; + private final boolean readOnlyConnection; public PostgresConnectorConfig(Configuration config) { super( @@ -992,6 +1003,7 @@ public PostgresConnectorConfig(Configuration config) { this.replicaIdentityMapper = (replicaIdentityMapping != null) ? new ReplicaIdentityMapper(replicaIdentityMapping) : null; this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString()); this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString()); + this.readOnlyConnection = config.getBoolean(READ_ONLY_CONNECTION); } protected String hostname() { @@ -1105,6 +1117,13 @@ public Optional getSnapshotLockingMode() { return Optional.of(this.snapshotLockingMode); } + /** + * @return whether database connection should be treated as read-only. + */ + public boolean isReadOnlyConnection() { + return readOnlyConnection; + } + protected int moneyFractionDigits() { return getConfig().getInteger(MONEY_FRACTION_DIGITS); } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java index b83777609..d80502a49 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java @@ -209,8 +209,12 @@ public PostgresOffsetContext load(Map offset) { final Instant useconds = Conversions.toInstantFromMicros((Long) ((Map) offset).getOrDefault(SourceInfo.TIMESTAMP_USEC_KEY, 0L)); final boolean snapshot = (boolean) ((Map) offset).getOrDefault(SourceInfo.SNAPSHOT_KEY, Boolean.FALSE); final boolean lastSnapshotRecord = (boolean) ((Map) offset).getOrDefault(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, Boolean.FALSE); - return new PostgresOffsetContext(connectorConfig, lsn, lastCompletelyProcessedLsn, lastCommitLsn, txId, messageType, useconds, snapshot, lastSnapshotRecord, - TransactionContext.load(offset), SignalBasedIncrementalSnapshotContext.load(offset, false)); + return new PostgresOffsetContext(connectorConfig, lsn, + lastCompletelyProcessedLsn, lastCommitLsn, txId, messageType, useconds, snapshot, lastSnapshotRecord, + TransactionContext.load(offset), + connectorConfig.isReadOnlyConnection() + ? PostgresReadOnlyIncrementalSnapshotContext.load(offset) + : SignalBasedIncrementalSnapshotContext.load(offset, false)); } } @@ -245,7 +249,9 @@ public static PostgresOffsetContext initialContext(PostgresConnectorConfig conne false, false, new TransactionContext(), - new SignalBasedIncrementalSnapshotContext<>(false)); + connectorConfig.isReadOnlyConnection() + ? new PostgresReadOnlyIncrementalSnapshotContext<>() + : new SignalBasedIncrementalSnapshotContext<>(false)); } catch (SQLException e) { throw new ConnectException("Database processing error", e); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresReadOnlyIncrementalSnapshotChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresReadOnlyIncrementalSnapshotChangeEventSource.java new file mode 100644 index 000000000..d0f5e868f --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresReadOnlyIncrementalSnapshotChangeEventSource.java @@ -0,0 +1,182 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql; + +import java.sql.SQLException; +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.DebeziumException; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.notification.NotificationService; +import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource; +import io.debezium.pipeline.source.spi.DataChangeEventListener; +import io.debezium.pipeline.source.spi.SnapshotProgressListener; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.relational.RelationalDatabaseConnectorConfig; +import io.debezium.relational.TableId; +import io.debezium.schema.DatabaseSchema; +import io.debezium.spi.schema.DataCollectionId; +import io.debezium.util.Clock; + +public class PostgresReadOnlyIncrementalSnapshotChangeEventSource

+ extends AbstractIncrementalSnapshotChangeEventSource { + + private static final Logger LOGGER = LoggerFactory.getLogger(PostgresReadOnlyIncrementalSnapshotChangeEventSource.class); + + private static final String FORCE_NEW_TRANSACTION = "SELECT * FROM pg_current_xact_id();"; + private static final String CURRENT_SNAPSHOT = "SELECT * FROM pg_current_snapshot();"; + + public PostgresReadOnlyIncrementalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig config, + JdbcConnection jdbcConnection, + EventDispatcher dispatcher, + DatabaseSchema databaseSchema, + Clock clock, + SnapshotProgressListener

progressListener, + DataChangeEventListener

dataChangeEventListener, + NotificationService notificationService) { + + super(config, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener, notificationService); + } + + @Override + protected void preIncrementalSnapshotStart() { + super.preIncrementalSnapshotStart(); + + forceNewTransactionId(); + } + + private PostgresReadOnlyIncrementalSnapshotContext getContext() { + return (PostgresReadOnlyIncrementalSnapshotContext) context; + } + + @Override + protected void emitWindowOpen() { + + getCurrentSnapshot(getContext()::setLowWatermark); + } + + @Override + protected void emitWindowClose(P partition, OffsetContext offsetContext) { + + getCurrentSnapshot(getContext()::setHighWatermark); + } + + @Override + public void processMessage(P partition, DataCollectionId dataCollectionId, Object key, OffsetContext offsetContext) throws InterruptedException { + + if (getContext() == null) { + LOGGER.warn("Context is null, skipping message processing"); + return; + } + + LOGGER.debug("Checking window for table '{}', key '{}', window contains '{}'", dataCollectionId, key, window); + getContext().updateWindowState(offsetContext); + + boolean windowClosed = getContext().isWindowClosed(); + if (windowClosed) { + sendWindowEvents(partition, offsetContext); + readChunk(partition, offsetContext); + } + else if (!window.isEmpty() && getContext().deduplicationNeeded()) { + LOGGER.trace("Deduplicating"); + deduplicateWindow(dataCollectionId, key); + } + } + + @Override + public void processTransactionCommittedEvent(P partition, OffsetContext offsetContext) throws InterruptedException { + + if (getContext() == null) { + LOGGER.warn("Context is null, skipping message processing"); + return; + } + LOGGER.trace("Processing transaction event"); + readUntilNewTransactionChange(partition, offsetContext); + LOGGER.trace("Finished processing transaction event"); + } + + private void readUntilNewTransactionChange(P partition, OffsetContext offsetContext) throws InterruptedException { + + Long eventTxId = offsetContext.getSourceInfo().getInt64(SourceInfo.TXID_KEY); + + LOGGER.debug("Event txId {}, snapshot is running {}, reachedHighWatermark {}", + eventTxId, getContext().snapshotRunning(), getContext().isTransactionVisible(eventTxId)); + + if (getContext().snapshotRunning() && maxInProgressTransactionCommitted(eventTxId)) { + getContext().closeWindow(); + sendWindowEvents(partition, offsetContext); + readChunk(partition, offsetContext); + + return; + } + + while (getContext().snapshotRunning() && getContext().isTransactionVisible(eventTxId)) { + LOGGER.debug("Finishing snapshot, snapshot is running {}, reachedHighWatermark {}", getContext().snapshotRunning(), + getContext().isTransactionVisible(eventTxId)); + getContext().closeWindow(); + sendWindowEvents(partition, offsetContext); + readChunk(partition, offsetContext); + if (getContext().watermarksChanged()) { + LOGGER.trace("Watermarks changed"); + return; + } + LOGGER.trace("Re read chunk finished, snapshot is running {}, reachedHighWatermark {}", getContext().snapshotRunning(), + getContext().isTransactionVisible(eventTxId)); + } + } + + private void getCurrentSnapshot(Consumer watermark) { + + try { + + PgSnapshot pgSnapshot = jdbcConnection.queryAndMap(CURRENT_SNAPSHOT, singleResultMapper(rs -> { + + String currentSnapshot = rs.getString(1); + LOGGER.trace("Current snapshot {}", currentSnapshot); + + return PgSnapshot.from(currentSnapshot); + }, "Unable to get current snapshot")); + + watermark.accept(pgSnapshot); + } + catch (SQLException e) { + throw new DebeziumException(e); + } + } + + private boolean maxInProgressTransactionCommitted(Long eventTxId) { + return getContext().getHighWatermark().getXMax().equals(eventTxId); + } + + private void forceNewTransactionId() { + try { + jdbcConnection.query(FORCE_NEW_TRANSACTION, rs -> { + if (rs.next()) { + LOGGER.trace("Created new transaction ID {}", rs.getString(1)); + } + }); + } + catch (SQLException e) { + throw new DebeziumException(e); + } + } + + private JdbcConnection.ResultSetMapper singleResultMapper(JdbcConnection.ResultSetExtractor extractor, String error) throws SQLException { + return (rs) -> { + if (rs.next()) { + final T ret = extractor.apply(rs); + if (!rs.next()) { + return ret; + } + } + throw new IllegalStateException(error); + }; + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresReadOnlyIncrementalSnapshotContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresReadOnlyIncrementalSnapshotContext.java new file mode 100644 index 000000000..f99a5597f --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresReadOnlyIncrementalSnapshotContext.java @@ -0,0 +1,108 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotContext; +import io.debezium.pipeline.spi.OffsetContext; + +public class PostgresReadOnlyIncrementalSnapshotContext extends AbstractIncrementalSnapshotContext { + + private static final Logger LOGGER = LoggerFactory.getLogger(PostgresReadOnlyIncrementalSnapshotContext.class); + + private PgSnapshot lowWatermark; + private PgSnapshot highWatermark; + private PgSnapshot previousHighWatermark; + private PgSnapshot previousLowWatermark; + + public PostgresReadOnlyIncrementalSnapshotContext() { + this(false); + } + + public PostgresReadOnlyIncrementalSnapshotContext(boolean useCatalogBeforeSchema) { + super(useCatalogBeforeSchema); + } + + public static PostgresReadOnlyIncrementalSnapshotContext load(Map offsets) { + return load(offsets, false); + } + + public static PostgresReadOnlyIncrementalSnapshotContext load(Map offsets, boolean useCatalogBeforeSchema) { + PostgresReadOnlyIncrementalSnapshotContext context = new PostgresReadOnlyIncrementalSnapshotContext<>(useCatalogBeforeSchema); + init(context, offsets); + return context; + } + + public PgSnapshot getLowWatermark() { + return lowWatermark; + } + + public void setLowWatermark(PgSnapshot lowWatermark) { + LOGGER.trace("Setting low watermark to {}", lowWatermark); + this.lowWatermark = lowWatermark; + } + + public PgSnapshot getHighWatermark() { + return highWatermark; + } + + public void setHighWatermark(PgSnapshot highWatermark) { + LOGGER.trace("Setting high watermark to {}", highWatermark); + this.highWatermark = highWatermark; + } + + public void updateWindowState(OffsetContext offsetContext) { + + Long eventTxId = offsetContext.getSourceInfo().getInt64(SourceInfo.TXID_KEY); + LOGGER.trace("Received event with TxId {}", eventTxId); + LOGGER.trace("Updating window. Window oped: {}, low watermark {}, high watermark {}", windowOpened, lowWatermark, highWatermark); + + if (!windowOpened && lowWatermark != null) { + boolean pastLowWatermark = eventTxId >= lowWatermark.getXMin(); + if (pastLowWatermark) { + LOGGER.debug("Current event txId {}, low watermark {}", eventTxId, lowWatermark); + windowOpened = true; + } + } + if (windowOpened && highWatermark != null) { + boolean pastHighWatermark = eventTxId > Math.max(highWatermark.getXMax(), lowWatermark.getXMax()); + if (pastHighWatermark) { + LOGGER.debug("Current event txId {}, high watermark {}", eventTxId, highWatermark); + closeWindow(); + } + } + } + + public boolean isWindowClosed() { + return !windowOpened; + } + + public void closeWindow() { + LOGGER.trace("Window closed. Low and High watermark cleaned"); + windowOpened = false; + previousHighWatermark = highWatermark; + highWatermark = null; + previousLowWatermark = lowWatermark; + lowWatermark = null; + } + + public boolean isTransactionVisible(Long eventTxId) { + if (lowWatermark == null) { + return true; + } + return eventTxId.compareTo(highWatermark.getXMin()) <= 0; + } + + public boolean watermarksChanged() { + LOGGER.trace("previousLowWatermark {}, lowWatermark {}, previousHighWatermark {}, highWatermark {}", previousLowWatermark, lowWatermark, previousHighWatermark, + highWatermark); + return !previousLowWatermark.equals(lowWatermark) || !previousHighWatermark.equals(highWatermark); + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index c6ebaf418..83d5a8dc4 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -258,20 +258,24 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff // Tx BEGIN/END event if (message.isTransactionalMessage()) { + + offsetContext.updateWalPosition(lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()), + taskContext.getSlotXmin(connection), + null, + message.getOperation()); + if (!connectorConfig.shouldProvideTransactionMetadata()) { LOGGER.trace("Received transactional message {}", message); // Don't skip on BEGIN message as it would flush LSN for the whole transaction // too early if (message.getOperation() == Operation.COMMIT) { commitMessage(partition, offsetContext, lsn); + dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, message.getCommitTime()); } return; } - offsetContext.updateWalPosition(lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()), - taskContext.getSlotXmin(connection), - null, - message.getOperation()); + if (message.getOperation() == Operation.BEGIN) { dispatcher.dispatchTransactionStartedEvent(partition, toString(message.getTransactionId()), offsetContext, message.getCommitTime()); } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PgSnapshotTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PgSnapshotTest.java new file mode 100644 index 000000000..d5f9a76de --- /dev/null +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PgSnapshotTest.java @@ -0,0 +1,41 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import io.debezium.DebeziumException; + +public class PgSnapshotTest { + + @Test + public void parseCorrectPgSnapshotWithInProgressTransactions() { + + PgSnapshot snapshot = PgSnapshot.from("795:799:795,797"); + + Assertions.assertThat(snapshot.getXMin()).isEqualTo(795L); + Assertions.assertThat(snapshot.getXMax()).isEqualTo(799L); + Assertions.assertThat(snapshot.getXip()).contains(795L, 797L); + } + + @Test + public void parseCorrectPgSnapshotWithoutInProgressTransactions() { + + PgSnapshot snapshot = PgSnapshot.from("795:799:"); + + Assertions.assertThat(snapshot.getXMin()).isEqualTo(795L); + Assertions.assertThat(snapshot.getXMax()).isEqualTo(799L); + Assertions.assertThat(snapshot.getXip()).isEmpty(); + } + + @Test + public void parseAWrongPgSnapshotWillThrowException() { + + Assertions.assertThatThrownBy(() -> PgSnapshot.from("795::")) + .isInstanceOf(DebeziumException.class); + } +} diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java index ef7c091ed..17c969595 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java @@ -227,6 +227,7 @@ public void init(P partition, OffsetContext offsetContext) { } LOGGER.info("Incremental snapshot in progress, need to read new chunk on start"); try { + preIncrementalSnapshotStart(); progressListener.snapshotStarted(partition); readChunk(partition, offsetContext); } @@ -683,6 +684,10 @@ protected void setContext(IncrementalSnapshotContext context) { this.context = context; } + protected void preIncrementalSnapshotStart() { + // no-op + } + protected void preReadChunk(IncrementalSnapshotContext context) { LOGGER.trace("Pre read chunk");