DBZ-7425 Use adapter-specific offset loaders when copying offsets

This commit is contained in:
Chris Cranford 2024-02-05 10:59:15 -05:00 committed by Chris Cranford
parent a597a82c19
commit e2d03d9936
5 changed files with 24 additions and 2 deletions

View File

@ -21,7 +21,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.oracle.logminer.LogMinerOracleOffsetContextLoader;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.EventDispatcher;
@ -308,6 +307,6 @@ private static class OracleSnapshotContext extends RelationalSnapshotContext<Ora
@Override
protected OracleOffsetContext copyOffset(RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext) {
return new LogMinerOracleOffsetContextLoader(connectorConfig).load(snapshotContext.offset.getOffset());
return connectorConfig.getAdapter().copyOffset(connectorConfig, snapshotContext.offset);
}
}

View File

@ -105,4 +105,12 @@ default OracleValueConverters getValueConverter(OracleConnectorConfig connectorC
return new OracleValueConverters(connectorConfig, connection);
}
/**
* Creates a copy of the existing offsets.
*
* @param connectorConfig the connector configuration, should never be {@code null}
* @param offsetContext the current offset context, should never be {@code null}
* @return a copy of the offset context for this adapter
*/
OracleOffsetContext copyOffset(OracleConnectorConfig connectorConfig, OracleOffsetContext offsetContext);
}

View File

@ -150,6 +150,11 @@ public OracleOffsetContext determineSnapshotOffset(RelationalSnapshotContext<Ora
}
}
@Override
public OracleOffsetContext copyOffset(OracleConnectorConfig connectorConfig, OracleOffsetContext offsetContext) {
return new LogMinerOracleOffsetContextLoader(connectorConfig).load(offsetContext.getOffset());
}
private Optional<Scn> getCurrentScn(Scn latestTableDdlScn, OracleConnection connection) throws SQLException {
final String query = "SELECT CURRENT_SCN FROM V$DATABASE";

View File

@ -129,4 +129,8 @@ public OracleValueConverters getValueConverter(OracleConnectorConfig connectorCo
return new OpenLogReplicatorValueConverter(connectorConfig, connection);
}
@Override
public OracleOffsetContext copyOffset(OracleConnectorConfig connectorConfig, OracleOffsetContext offsetContext) {
return new OpenLogReplicatorOracleOffsetContextLoader(connectorConfig).load(offsetContext.getOffset());
}
}

View File

@ -142,4 +142,10 @@ public OracleOffsetContext determineSnapshotOffset(RelationalSnapshotContext<Ora
.incrementalSnapshotContext(new SignalBasedIncrementalSnapshotContext<>())
.build();
}
@Override
public OracleOffsetContext copyOffset(OracleConnectorConfig connectorConfig, OracleOffsetContext offsetContext) {
return new XStreamOracleOffsetContextLoader(connectorConfig).load(offsetContext.getOffset());
}
}