DBZ-2975 Avoiding raw types in some places;

Also parameterizing the offset loader contract, so as to avoid casts later on.
Gunnar Morling 2021-06-09 15:09:37 +02:00
parent 657a6d2542
commit 1ce65fa628
16 changed files with 37 additions and 39 deletions
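
At its core, the change replaces the raw OffsetContext.Loader contract with one parameterized by the concrete offset type, so that load() returns that type directly. A minimal sketch of the new shape, simplified from io.debezium.pipeline.spi.OffsetContext (only the members touched by this commit are shown):

    import java.util.Map;

    // Simplified sketch: Loader carries the concrete offset type O,
    // so load() returns it directly and call sites no longer cast.
    interface OffsetContext {

        Map<String, ?> getPartition();

        interface Loader<O extends OffsetContext> {

            Map<String, ?> getPartition();

            // Connector-specific offset context instead of the raw interface
            O load(Map<String, ?> offset);
        }
    }

Since BaseSourceTask and ChangeEventSourceCoordinator share the same type parameter, each connector task can now bind O to its own offset type, as the hunks below show.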

MySqlConnectorTask.java

@@ -88,7 +88,7 @@ public ChangeEventSourceCoordinator<MySqlOffsetContext> start(Configuration conf
validateBinlogConfiguration(connectorConfig);
- MySqlOffsetContext previousOffset = (MySqlOffsetContext) getPreviousOffset(new MySqlOffsetContext.Loader(connectorConfig));
+ MySqlOffsetContext previousOffset = getPreviousOffset(new MySqlOffsetContext.Loader(connectorConfig));
if (previousOffset == null) {
LOGGER.info("No previous offset found");
}

MySqlOffsetContext.java

@@ -186,7 +186,7 @@ public static MySqlOffsetContext initial(MySqlConnectorConfig config) {
return offset;
}
- public static class Loader implements OffsetContext.Loader {
+ public static class Loader implements OffsetContext.Loader<MySqlOffsetContext> {
private final MySqlConnectorConfig connectorConfig;
@@ -200,7 +200,7 @@ public Loader(MySqlConnectorConfig connectorConfig) {
}
@Override
- public OffsetContext load(Map<String, ?> offset) {
+ public MySqlOffsetContext load(Map<String, ?> offset) {
boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY)) || "true".equals(offset.get(SourceInfo.SNAPSHOT_KEY));
boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(SNAPSHOT_COMPLETED_KEY)) || "true".equals(offset.get(SNAPSHOT_COMPLETED_KEY));

OracleConnectorTask.java

@@ -22,13 +22,12 @@
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
- import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock;
import io.debezium.util.SchemaNameAdjuster;
- public class OracleConnectorTask extends BaseSourceTask {
+ public class OracleConnectorTask extends BaseSourceTask<OracleOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(OracleConnectorTask.class);
private static final String CONTEXT_NAME = "oracle-connector-task";
@@ -45,7 +44,7 @@ public String version() {
}
@Override
- public ChangeEventSourceCoordinator start(Configuration config) {
+ public ChangeEventSourceCoordinator<OracleOffsetContext> start(Configuration config) {
OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
@@ -57,8 +56,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
TableNameCaseSensitivity tableNameCaseSensitivity = connectorConfig.getAdapter().getTableNameCaseSensitivity(jdbcConnection);
this.schema = new OracleDatabaseSchema(connectorConfig, valueConverters, schemaNameAdjuster, topicSelector, tableNameCaseSensitivity);
this.schema.initializeStorage();
- OffsetContext previousOffset = getPreviousOffset(connectorConfig.getAdapter().getOffsetContextLoader());
+ OracleOffsetContext previousOffset = getPreviousOffset(connectorConfig.getAdapter().getOffsetContextLoader());
if (previousOffset != null) {
schema.recover(previousOffset);
@@ -93,7 +91,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
final OracleStreamingChangeEventSourceMetrics streamingMetrics = new OracleStreamingChangeEventSourceMetrics(taskContext, queue, metadataProvider,
connectorConfig);
- ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator(
+ ChangeEventSourceCoordinator<OracleOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
previousOffset,
errorHandler,
OracleConnector.class,

StreamingAdapter.java

@@ -46,7 +46,7 @@ enum TableNameCaseSensitivity {
HistoryRecordComparator getHistoryRecordComparator();
- OffsetContext.Loader getOffsetContextLoader();
+ OffsetContext.Loader<OracleOffsetContext> getOffsetContextLoader();
StreamingChangeEventSource<OracleOffsetContext> getSource(OracleConnection connection, EventDispatcher<TableId> dispatcher,
ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema schema,

LogMinerAdapter.java

@@ -49,7 +49,7 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
}
@Override
- public OffsetContext.Loader getOffsetContextLoader() {
+ public OffsetContext.Loader<OracleOffsetContext> getOffsetContextLoader() {
return new LogMinerOracleOffsetContextLoader(connectorConfig);
}

LogMinerOracleOffsetContextLoader.java

@@ -18,7 +18,7 @@
/**
* @author Chris Cranford
*/
- public class LogMinerOracleOffsetContextLoader implements OffsetContext.Loader {
+ public class LogMinerOracleOffsetContextLoader implements OffsetContext.Loader<OracleOffsetContext> {
private final OracleConnectorConfig connectorConfig;
@@ -32,7 +32,7 @@ public LogMinerOracleOffsetContextLoader(OracleConnectorConfig connectorConfig)
}
@Override
- public OffsetContext load(Map<String, ?> offset) {
+ public OracleOffsetContext load(Map<String, ?> offset) {
boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY));
boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(OracleOffsetContext.SNAPSHOT_COMPLETED_KEY));

XStreamAdapter.java

@@ -60,7 +60,7 @@ public boolean isPositionAtOrBefore(Document recorded, Document desired) {
}
@Override
- public OffsetContext.Loader getOffsetContextLoader() {
+ public OffsetContext.Loader<OracleOffsetContext> getOffsetContextLoader() {
return new XStreamOracleOffsetContextLoader(connectorConfig);
}

XStreamOracleOffsetContextLoader.java

@@ -20,7 +20,7 @@
*
* @author Chris Cranford
*/
- public class XStreamOracleOffsetContextLoader implements OffsetContext.Loader {
+ public class XStreamOracleOffsetContextLoader implements OffsetContext.Loader<OracleOffsetContext> {
private final OracleConnectorConfig connectorConfig;
@@ -34,7 +34,7 @@ public XStreamOracleOffsetContextLoader(OracleConnectorConfig connectorConfig) {
}
@Override
- public OffsetContext load(Map<String, ?> offset) {
+ public OracleOffsetContext load(Map<String, ?> offset) {
boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY));
boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(OracleOffsetContext.SNAPSHOT_COMPLETED_KEY));

PostgresChangeEventSourceCoordinator.java

@@ -21,26 +21,24 @@
import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
- import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
- import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DatabaseSchema;
/**
* Coordinates one or more {@link ChangeEventSource}s and executes them in order. Extends the base
* {@link ChangeEventSourceCoordinator} to support a pre-snapshot catch up streaming phase.
*/
- public class PostgresChangeEventSourceCoordinator extends ChangeEventSourceCoordinator {
+ public class PostgresChangeEventSourceCoordinator extends ChangeEventSourceCoordinator<PostgresOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresChangeEventSourceCoordinator.class);
private final Snapshotter snapshotter;
private final SlotState slotInfo;
- public PostgresChangeEventSourceCoordinator(OffsetContext previousOffset, ErrorHandler errorHandler,
+ public PostgresChangeEventSourceCoordinator(PostgresOffsetContext previousOffset, ErrorHandler errorHandler,
Class<? extends SourceConnector> connectorType,
CommonConnectorConfig connectorConfig,
- ChangeEventSourceFactory changeEventSourceFactory,
+ PostgresChangeEventSourceFactory changeEventSourceFactory,
ChangeEventSourceMetricsFactory changeEventSourceMetricsFactory,
EventDispatcher<?> eventDispatcher, DatabaseSchema<?> schema,
Snapshotter snapshotter, SlotState slotInfo) {
@@ -50,19 +48,19 @@ public PostgresChangeEventSourceCoordinator(OffsetContext previousOffset, ErrorH
}
@Override
- protected CatchUpStreamingResult executeCatchUpStreaming(OffsetContext previousOffset, ChangeEventSourceContext context,
- SnapshotChangeEventSource snapshotSource)
+ protected CatchUpStreamingResult executeCatchUpStreaming(PostgresOffsetContext previousOffset, ChangeEventSourceContext context,
+ SnapshotChangeEventSource<PostgresOffsetContext> snapshotSource)
throws InterruptedException {
if (previousOffset != null && !snapshotter.shouldStreamEventsStartingFromSnapshot() && slotInfo != null) {
try {
setSnapshotStartLsn((PostgresSnapshotChangeEventSource) snapshotSource,
- (PostgresOffsetContext) previousOffset);
+ previousOffset);
}
catch (SQLException e) {
throw new DebeziumException("Failed to determine catch-up streaming stopping LSN");
}
LOGGER.info("Previous connector state exists and will stream events until {} then perform snapshot",
- ((PostgresOffsetContext) previousOffset).getStreamingStoppingLsn());
+ previousOffset.getStreamingStoppingLsn());
streamEvents(previousOffset, context);
return new CatchUpStreamingResult(true);
}
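
The coordinator change above follows the same pattern: once the base class is generic in its offset type, an overriding hook such as executeCatchUpStreaming() receives the concrete type and the (PostgresOffsetContext) downcasts disappear. A minimal self-contained sketch of the mechanism (Coordinator, PgOffset, and PgCoordinator are illustrative names, not Debezium classes):

    // Generic base class: the hook is typed by O.
    abstract class Coordinator<O> {

        // Default: no catch-up streaming phase.
        protected void executeCatchUpStreaming(O previousOffset) {
        }
    }

    // Stand-in for a connector-specific offset type.
    class PgOffset {
        long streamingStoppingLsn() {
            return 42L; // placeholder value
        }
    }

    // Subclass fixes O, so the override needs no downcast in its body.
    class PgCoordinator extends Coordinator<PgOffset> {

        @Override
        protected void executeCatchUpStreaming(PgOffset previousOffset) {
            System.out.println("stream until LSN " + previousOffset.streamingStoppingLsn());
        }
    }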

PostgresConnectorTask.java

@@ -93,7 +93,7 @@ public ChangeEventSourceCoordinator<PostgresOffsetContext> start(Configuration c
schema = new PostgresSchema(connectorConfig, typeRegistry, topicSelector, valueConverterBuilder.build(typeRegistry));
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicSelector);
- final PostgresOffsetContext previousOffset = (PostgresOffsetContext) getPreviousOffset(new PostgresOffsetContext.Loader(connectorConfig));
+ final PostgresOffsetContext previousOffset = getPreviousOffset(new PostgresOffsetContext.Loader(connectorConfig));
final Clock clock = Clock.system();
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
@@ -197,7 +197,7 @@ public ChangeEventSourceCoordinator<PostgresOffsetContext> start(Configuration c
schemaNameAdjuster,
jdbcConnection);
- ChangeEventSourceCoordinator coordinator = new PostgresChangeEventSourceCoordinator(
+ ChangeEventSourceCoordinator<PostgresOffsetContext> coordinator = new PostgresChangeEventSourceCoordinator(
previousOffset,
errorHandler,
PostgresConnector.class,

PostgresOffsetContext.java

@@ -187,7 +187,7 @@ Long xmin() {
return sourceInfo.xmin();
}
- public static class Loader implements OffsetContext.Loader {
+ public static class Loader implements OffsetContext.Loader<PostgresOffsetContext> {
private final PostgresConnectorConfig connectorConfig;
@@ -207,7 +207,7 @@ private Long readOptionalLong(Map<String, ?> offset, String key) {
@SuppressWarnings("unchecked")
@Override
- public OffsetContext load(Map<String, ?> offset) {
+ public PostgresOffsetContext load(Map<String, ?> offset) {
final Lsn lsn = Lsn.valueOf(readOptionalLong(offset, SourceInfo.LSN_KEY));
final Lsn lastCompletelyProcessedLsn = Lsn.valueOf(readOptionalLong(offset, LAST_COMPLETELY_PROCESSED_LSN_KEY));
final Lsn lastCommitLsn = Lsn.valueOf(readOptionalLong(offset, LAST_COMPLETELY_PROCESSED_LSN_KEY));

SqlServerConnectorTask.java

@@ -83,7 +83,7 @@ public ChangeEventSourceCoordinator<SqlServerOffsetContext> start(Configuration
this.schema = new SqlServerDatabaseSchema(connectorConfig, valueConverters, topicSelector, schemaNameAdjuster);
this.schema.initializeStorage();
- final SqlServerOffsetContext previousOffset = (SqlServerOffsetContext) getPreviousOffset(new SqlServerOffsetContext.Loader(connectorConfig));
+ final SqlServerOffsetContext previousOffset = getPreviousOffset(new SqlServerOffsetContext.Loader(connectorConfig));
if (previousOffset != null) {
schema.recover(previousOffset);
}

SqlServerOffsetContext.java

@@ -140,7 +140,7 @@ public void postSnapshotCompletion() {
sourceInfo.setSnapshot(SnapshotRecord.FALSE);
}
- public static class Loader implements OffsetContext.Loader {
+ public static class Loader implements OffsetContext.Loader<SqlServerOffsetContext> {
private final SqlServerConnectorConfig connectorConfig;
@@ -154,7 +154,7 @@ public Loader(SqlServerConnectorConfig connectorConfig) {
}
@Override
- public OffsetContext load(Map<String, ?> offset) {
+ public SqlServerOffsetContext load(Map<String, ?> offset) {
final Lsn changeLsn = Lsn.valueOf((String) offset.get(SourceInfo.CHANGE_LSN_KEY));
final Lsn commitLsn = Lsn.valueOf((String) offset.get(SourceInfo.COMMIT_LSN_KEY));
boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY));

BaseSourceTask.java

@@ -298,11 +298,11 @@ public void commit() throws InterruptedException {
/**
* Loads the connector's persistent offset (if present) via the given loader.
*/
- protected OffsetContext getPreviousOffset(OffsetContext.Loader loader) {
+ protected O getPreviousOffset(OffsetContext.Loader<O> loader) {
Map<String, ?> partition = loader.getPartition();
if (lastOffset != null) {
- OffsetContext offsetContext = loader.load(lastOffset);
+ O offsetContext = loader.load(lastOffset);
LOGGER.info("Found previous offset after restart {}", offsetContext);
return offsetContext;
}
@@ -312,7 +312,7 @@ protected OffsetContext getPreviousOffset(OffsetContext.Loader loader) {
.get(partition);
if (previousOffset != null) {
- OffsetContext offsetContext = loader.load(previousOffset);
+ O offsetContext = loader.load(previousOffset);
LOGGER.info("Found previous offset {}", offsetContext);
return offsetContext;
}
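
Because BaseSourceTask is itself parameterized as BaseSourceTask<O extends OffsetContext>, tying getPreviousOffset() to OffsetContext.Loader<O> lets the loaded offset flow back out as O. A reduced sketch of that flow, assuming the simplified OffsetContext interface sketched near the top of this page (SketchSourceTask is an illustrative name):

    import java.util.Map;

    // The type parameter O links the loader argument to the returned
    // offset; this is what removes the casts in the MySQL, Postgres,
    // and SQL Server task hunks above.
    abstract class SketchSourceTask<O extends OffsetContext> {

        private Map<String, ?> lastOffset; // offset captured from an earlier run, if any

        protected O getPreviousOffset(OffsetContext.Loader<O> loader) {
            return lastOffset != null ? loader.load(lastOffset) : null;
        }
    }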

ChangeEventSourceCoordinator.java

@@ -82,7 +82,7 @@ public ChangeEventSourceCoordinator(O previousOffset, ErrorHandler errorHandler,
this.schema = schema;
}
- public synchronized <T extends CdcSourceTaskContext> void start(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
+ public synchronized void start(CdcSourceTaskContext taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider metadataProvider) {
AtomicReference<LoggingContext.PreviousContext> previousLogContext = new AtomicReference<>();
try {
@@ -141,7 +141,7 @@ public synchronized <T extends CdcSourceTaskContext> void start(T taskContext, C
}
}
- protected CatchUpStreamingResult executeCatchUpStreaming(OffsetContext previousOffset, ChangeEventSourceContext context,
+ protected CatchUpStreamingResult executeCatchUpStreaming(O previousOffset, ChangeEventSourceContext context,
SnapshotChangeEventSource<O> snapshotSource)
throws InterruptedException {
return new CatchUpStreamingResult(false);

OffsetContext.java

@@ -11,8 +11,10 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.pipeline.txmetadata.TransactionMonitor;
import io.debezium.schema.DataCollectionId;
/**
@@ -27,10 +29,10 @@ public interface OffsetContext {
/**
* Implementations load a connector-specific offset context based on the offset values stored in Kafka.
*/
- interface Loader {
+ interface Loader<O extends OffsetContext> {
Map<String, ?> getPartition();
- OffsetContext load(Map<String, ?> offset);
+ O load(Map<String, ?> offset);
}
Map<String, ?> getPartition();
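
For completeness, a hedged sketch of what a connector-specific loader looks like against the parameterized interface, again using the simplified OffsetContext sketched at the top of this page (DemoOffsetContext and DemoLoader are hypothetical names standing in for pairs like OracleOffsetContext and LogMinerOracleOffsetContextLoader):

    import java.util.Collections;
    import java.util.Map;

    // Hypothetical concrete offset type.
    class DemoOffsetContext implements OffsetContext {

        private final boolean snapshot;

        DemoOffsetContext(boolean snapshot) {
            this.snapshot = snapshot;
        }

        boolean isSnapshot() {
            return snapshot;
        }

        @Override
        public Map<String, ?> getPartition() {
            return Collections.singletonMap("server", "demo");
        }
    }

    // Its loader returns the concrete type, mirroring the connector loaders above.
    class DemoLoader implements OffsetContext.Loader<DemoOffsetContext> {

        @Override
        public Map<String, ?> getPartition() {
            return Collections.singletonMap("server", "demo");
        }

        @Override
        public DemoOffsetContext load(Map<String, ?> offset) {
            boolean snapshot = Boolean.TRUE.equals(offset.get("snapshot"));
            return new DemoOffsetContext(snapshot);
        }
    }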