DBZ-7917 Support schema refresh for PostgreSQL read-only incremental snapshot

This commit is contained in:
mfvitale 2024-06-07 17:10:59 +02:00 committed by Jiri Pechanec
parent 8dc9605aac
commit 7225e93722
2 changed files with 21 additions and 2 deletions

View File

@ -66,10 +66,15 @@ public static PgSnapshot from(String snapshotString) {
@Override
public boolean equals(Object o) {
if (this == o)
if (this == o) {
return true;
if (o == null || getClass() != o.getClass())
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PgSnapshot that = (PgSnapshot) o;
return Objects.equals(xMin, that.xMin) && Objects.equals(xMax, that.xMax) && Objects.equals(xip, that.xip);
}

View File

@ -12,6 +12,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
@ -20,6 +21,7 @@
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.spi.schema.DataCollectionId;
@ -33,6 +35,9 @@ public class PostgresReadOnlyIncrementalSnapshotChangeEventSource<P extends Post
private static final String FORCE_NEW_TRANSACTION = "SELECT * FROM pg_current_xact_id();";
private static final String CURRENT_SNAPSHOT = "SELECT * FROM pg_current_snapshot();";
private final PostgresConnection jdbcConnection;
private final PostgresSchema schema;
public PostgresReadOnlyIncrementalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig config,
JdbcConnection jdbcConnection,
EventDispatcher<P, TableId> dispatcher,
@ -43,6 +48,8 @@ public PostgresReadOnlyIncrementalSnapshotChangeEventSource(RelationalDatabaseCo
NotificationService<P, ? extends OffsetContext> notificationService) {
super(config, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener, notificationService);
this.jdbcConnection = (PostgresConnection) jdbcConnection;
this.schema = (PostgresSchema) databaseSchema;
}
@Override
@ -116,6 +123,13 @@ public void processHeartbeat(P partition, OffsetContext offsetContext) throws In
LOGGER.trace("Finished processing heartbeat event");
}
@Override
protected Table refreshTableSchema(Table table) throws SQLException {
LOGGER.debug("Refreshing table '{}' schema for incremental snapshot.", table.id());
schema.refresh(jdbcConnection, table.id(), true);
return schema.tableFor(table.id());
}
private void readUntilNewTransactionChange(P partition, OffsetContext offsetContext) throws InterruptedException {
Long eventTxId = offsetContext.getSourceInfo().getInt64(SourceInfo.TXID_KEY);