DBZ-7917 Add javadocs to PostgresReadOnlyIncrementalSnapshotChangeEventSource, PgSnapshot and PostgresReadOnlyIncrementalSnapshotContext

This commit is contained in:
mfvitale 2024-06-12 15:52:55 +02:00 committed by Jiri Pechanec
parent 8edb87a881
commit 0fbc63c0c7
4 changed files with 61 additions and 18 deletions

View File

@ -14,6 +14,11 @@
import io.debezium.DebeziumException;
/**
* This class contains the information returned by the pg_current_snapshot function.
*
* @author Mario Fiore Vitale
*/
public class PgSnapshot {
private static final String SNAPSHOT_FORMAT = "(\\d+):(\\d+):((\\d+,*)+)*";
@ -42,7 +47,13 @@ public Set<Long> getXip() {
return xip;
}
public static PgSnapshot from(String snapshotString) {
/**
* Returns a PgSnapshot instance representing the specified snapshot string
*
* @param snapshotString is the string returned by the pg_current_snapshot function
* @return a PgSnapshot representing the {@code snapshotString}
*/
public static PgSnapshot valueOf(String snapshotString) {
Matcher matcher = SNAPSHOT_PATTERN.matcher(snapshotString);

View File

@ -27,6 +27,44 @@
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
/**
* A PostgreSQL specific read-only incremental snapshot change event source.
* Uses {@code PgSnapshot} as low/high watermarks for incremental snapshot window.
* <p>
* <b>Prerequisites</b>
* <ul>
* <li> PostgreSQL version >= 13 </li>
* </ul>
* </p>
* <p>
* <b>How a chunk is snapshotted</b>
* <ul>
* <li> a pg_current_snapshot() query is executed and the low watermark is set to lowWatermark </li>
* <li> a new data chunk is read from a database by generating the SELECT statement and placed into a window buffer keyed by primary keys </li>
* <li> a pg_current_snapshot() query is executed and the high watermark is set to highWatermark </li>
* </ul>
* </p>
* <p>
* <b>During the subsequent streaming</b>
* <ul>
* <li> if WAL event is received and its txId is greater then or equal to xMin of the low watermark, then window processing mode is enabled </li>
* <li> if WAL event is received and its txId is greater then the xMax of high watermark, then window processing mode is disabled and the rest of the windows buffer is streamed </li>
* <li> if window processing mode is enabled then if the event key is contained in the window buffer then it is removed from the window buffer </li>
* <li> event is streamed </li>
* </ul>
* </p>
* <br/>
* <br/>
* <b>No binlog events</b>
* <p>Heartbeat events (events that are sent when there are not changes from the WAL) are used to update the window processing mode when the rate of WAL updates is low.</p>
* <p>The heartbeat has the same txId as the latest WAL event at the moment and it will used to continue to read chunks even there are not event coming from the WAL. This processing will end if the watermarks changes because that means there are some transaction potentially modifying data.</p>
* <br/>
* <b>No changes between watermarks</b>
* <p>A window can be opened and closed right away by the same event. This can happen when a high watermark and low watermark are the same, which means there were no active transaction during the chunk select. Chunk will get inserted right after the low watermark, no events will be deduplicated from the chunk</p>
* <br/>
*
* @author Mario Fiore Vitale
*/
public class PostgresReadOnlyIncrementalSnapshotChangeEventSource<P extends PostgresPartition>
extends AbstractIncrementalSnapshotChangeEventSource<P, TableId> {
@ -168,12 +206,12 @@ private void getCurrentSnapshot(Consumer<PgSnapshot> watermark) {
try {
PgSnapshot pgSnapshot = jdbcConnection.queryAndMap(CURRENT_SNAPSHOT, singleResultMapper(rs -> {
PgSnapshot pgSnapshot = jdbcConnection.queryAndMap(CURRENT_SNAPSHOT, jdbcConnection.singleResultMapper(rs -> {
String currentSnapshot = rs.getString(1);
LOGGER.trace("Current snapshot {}", currentSnapshot);
return PgSnapshot.from(currentSnapshot);
return PgSnapshot.valueOf(currentSnapshot);
}, "Unable to get current snapshot"));
watermark.accept(pgSnapshot);
@ -205,15 +243,4 @@ private void forceNewTransactionId() {
}
}
private <T> JdbcConnection.ResultSetMapper<T> singleResultMapper(JdbcConnection.ResultSetExtractor<T> extractor, String error) throws SQLException {
return (rs) -> {
if (rs.next()) {
final T ret = extractor.apply(rs);
if (!rs.next()) {
return ret;
}
}
throw new IllegalStateException(error);
};
}
}

View File

@ -13,6 +13,11 @@
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
/**
* A class describing PostgreSQL read only incremental snapshot context state.
*
* @author Mario Fiore Vitale
*/
public class PostgresReadOnlyIncrementalSnapshotContext<T> extends AbstractIncrementalSnapshotContext<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresReadOnlyIncrementalSnapshotContext.class);
@ -62,7 +67,7 @@ public void updateWindowState(OffsetContext offsetContext) {
Long eventTxId = offsetContext.getSourceInfo().getInt64(SourceInfo.TXID_KEY);
LOGGER.trace("Received event with TxId {}", eventTxId);
LOGGER.trace("Updating window. Window oped: {}, low watermark {}, high watermark {}", windowOpened, lowWatermark, highWatermark);
LOGGER.trace("Updating window. Window opened: {}, low watermark {}, high watermark {}", windowOpened, lowWatermark, highWatermark);
if (!windowOpened && lowWatermark != null) {
boolean pastLowWatermark = eventTxId >= lowWatermark.getXMin();

View File

@ -15,7 +15,7 @@ public class PgSnapshotTest {
@Test
public void parseCorrectPgSnapshotWithInProgressTransactions() {
PgSnapshot snapshot = PgSnapshot.from("795:799:795,797");
PgSnapshot snapshot = PgSnapshot.valueOf("795:799:795,797");
Assertions.assertThat(snapshot.getXMin()).isEqualTo(795L);
Assertions.assertThat(snapshot.getXMax()).isEqualTo(799L);
@ -25,7 +25,7 @@ public void parseCorrectPgSnapshotWithInProgressTransactions() {
@Test
public void parseCorrectPgSnapshotWithoutInProgressTransactions() {
PgSnapshot snapshot = PgSnapshot.from("795:799:");
PgSnapshot snapshot = PgSnapshot.valueOf("795:799:");
Assertions.assertThat(snapshot.getXMin()).isEqualTo(795L);
Assertions.assertThat(snapshot.getXMax()).isEqualTo(799L);
@ -35,7 +35,7 @@ public void parseCorrectPgSnapshotWithoutInProgressTransactions() {
@Test
public void parseAWrongPgSnapshotWillThrowException() {
Assertions.assertThatThrownBy(() -> PgSnapshot.from("795::"))
Assertions.assertThatThrownBy(() -> PgSnapshot.valueOf("795::"))
.isInstanceOf(DebeziumException.class);
}
}