From 1ce65fa628a1854f6e9a48b48e8e42b147272597 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Wed, 9 Jun 2021 15:09:37 +0200 Subject: [PATCH] DBZ-2975 Avoiding raw types in some places; Also parameterizing offset loader contract, so to avoid casts later on. --- .../connector/mysql/MySqlConnectorTask.java | 2 +- .../connector/mysql/MySqlOffsetContext.java | 4 ++-- .../connector/oracle/OracleConnectorTask.java | 10 ++++------ .../connector/oracle/StreamingAdapter.java | 2 +- .../oracle/logminer/LogMinerAdapter.java | 2 +- .../LogMinerOracleOffsetContextLoader.java | 4 ++-- .../connector/oracle/xstream/XStreamAdapter.java | 2 +- .../XStreamOracleOffsetContextLoader.java | 4 ++-- .../PostgresChangeEventSourceCoordinator.java | 16 +++++++--------- .../postgresql/PostgresConnectorTask.java | 4 ++-- .../postgresql/PostgresOffsetContext.java | 4 ++-- .../sqlserver/SqlServerConnectorTask.java | 2 +- .../sqlserver/SqlServerOffsetContext.java | 4 ++-- .../connector/common/BaseSourceTask.java | 6 +++--- .../pipeline/ChangeEventSourceCoordinator.java | 4 ++-- .../io/debezium/pipeline/spi/OffsetContext.java | 6 ++++-- 16 files changed, 37 insertions(+), 39 deletions(-) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java index e319b6155..db6019867 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java @@ -88,7 +88,7 @@ public ChangeEventSourceCoordinator start(Configuration conf validateBinlogConfiguration(connectorConfig); - MySqlOffsetContext previousOffset = (MySqlOffsetContext) getPreviousOffset(new MySqlOffsetContext.Loader(connectorConfig)); + MySqlOffsetContext previousOffset = getPreviousOffset(new MySqlOffsetContext.Loader(connectorConfig)); if (previousOffset == null) { LOGGER.info("No previous offset found"); } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java index d515c4505..6f894f573 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java @@ -186,7 +186,7 @@ public static MySqlOffsetContext initial(MySqlConnectorConfig config) { return offset; } - public static class Loader implements OffsetContext.Loader { + public static class Loader implements OffsetContext.Loader { private final MySqlConnectorConfig connectorConfig; @@ -200,7 +200,7 @@ public Loader(MySqlConnectorConfig connectorConfig) { } @Override - public OffsetContext load(Map offset) { + public MySqlOffsetContext load(Map offset) { boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY)) || "true".equals(offset.get(SourceInfo.SNAPSHOT_KEY)); boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(SNAPSHOT_COMPLETED_KEY)) || "true".equals(offset.get(SNAPSHOT_COMPLETED_KEY)); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java index 3555a19bc..56d5a44fa 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java @@ -22,13 +22,12 @@ import io.debezium.pipeline.DataChangeEvent; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; -import io.debezium.pipeline.spi.OffsetContext; import io.debezium.relational.TableId; import io.debezium.schema.TopicSelector; import io.debezium.util.Clock; import io.debezium.util.SchemaNameAdjuster; -public class OracleConnectorTask extends BaseSourceTask { +public class OracleConnectorTask extends BaseSourceTask { private static final Logger LOGGER = LoggerFactory.getLogger(OracleConnectorTask.class); private static final String CONTEXT_NAME = "oracle-connector-task"; @@ -45,7 +44,7 @@ public String version() { } @Override - public ChangeEventSourceCoordinator start(Configuration config) { + public ChangeEventSourceCoordinator start(Configuration config) { OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config); TopicSelector topicSelector = OracleTopicSelector.defaultSelector(connectorConfig); SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(); @@ -57,8 +56,7 @@ public ChangeEventSourceCoordinator start(Configuration config) { TableNameCaseSensitivity tableNameCaseSensitivity = connectorConfig.getAdapter().getTableNameCaseSensitivity(jdbcConnection); this.schema = new OracleDatabaseSchema(connectorConfig, valueConverters, schemaNameAdjuster, topicSelector, tableNameCaseSensitivity); this.schema.initializeStorage(); - - OffsetContext previousOffset = getPreviousOffset(connectorConfig.getAdapter().getOffsetContextLoader()); + OracleOffsetContext previousOffset = getPreviousOffset(connectorConfig.getAdapter().getOffsetContextLoader()); if (previousOffset != null) { schema.recover(previousOffset); @@ -93,7 +91,7 @@ public ChangeEventSourceCoordinator start(Configuration config) { final OracleStreamingChangeEventSourceMetrics streamingMetrics = new OracleStreamingChangeEventSourceMetrics(taskContext, queue, metadataProvider, connectorConfig); - ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator( + ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator<>( previousOffset, errorHandler, OracleConnector.class, diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/StreamingAdapter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/StreamingAdapter.java index 465b0aa90..3a9ad1d5c 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/StreamingAdapter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/StreamingAdapter.java @@ -46,7 +46,7 @@ enum TableNameCaseSensitivity { HistoryRecordComparator getHistoryRecordComparator(); - OffsetContext.Loader getOffsetContextLoader(); + OffsetContext.Loader getOffsetContextLoader(); StreamingChangeEventSource getSource(OracleConnection connection, EventDispatcher dispatcher, ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema schema, diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java index c693b6d51..a0b5c2f20 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java @@ -49,7 +49,7 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) { } @Override - public OffsetContext.Loader getOffsetContextLoader() { + public OffsetContext.Loader getOffsetContextLoader() { return new LogMinerOracleOffsetContextLoader(connectorConfig); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerOracleOffsetContextLoader.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerOracleOffsetContextLoader.java index 4e18810a5..7ea04e615 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerOracleOffsetContextLoader.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerOracleOffsetContextLoader.java @@ -18,7 +18,7 @@ /** * @author Chris Cranford */ -public class LogMinerOracleOffsetContextLoader implements OffsetContext.Loader { +public class LogMinerOracleOffsetContextLoader implements OffsetContext.Loader { private final OracleConnectorConfig connectorConfig; @@ -32,7 +32,7 @@ public LogMinerOracleOffsetContextLoader(OracleConnectorConfig connectorConfig) } @Override - public OffsetContext load(Map offset) { + public OracleOffsetContext load(Map offset) { boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY)); boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(OracleOffsetContext.SNAPSHOT_COMPLETED_KEY)); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamAdapter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamAdapter.java index 7e139be3b..b4348c2bc 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamAdapter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamAdapter.java @@ -60,7 +60,7 @@ public boolean isPositionAtOrBefore(Document recorded, Document desired) { } @Override - public OffsetContext.Loader getOffsetContextLoader() { + public OffsetContext.Loader getOffsetContextLoader() { return new XStreamOracleOffsetContextLoader(connectorConfig); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamOracleOffsetContextLoader.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamOracleOffsetContextLoader.java index 1f7d22ba0..a7beb8a51 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamOracleOffsetContextLoader.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamOracleOffsetContextLoader.java @@ -20,7 +20,7 @@ * * @author Chris Cranford */ -public class XStreamOracleOffsetContextLoader implements OffsetContext.Loader { +public class XStreamOracleOffsetContextLoader implements OffsetContext.Loader { private final OracleConnectorConfig connectorConfig; @@ -34,7 +34,7 @@ public XStreamOracleOffsetContextLoader(OracleConnectorConfig connectorConfig) { } @Override - public OffsetContext load(Map offset) { + public OracleOffsetContext load(Map offset) { boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY)); boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(OracleOffsetContext.SNAPSHOT_COMPLETED_KEY)); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java index a8d7b1032..14f997e1b 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java @@ -21,26 +21,24 @@ import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory; import io.debezium.pipeline.source.spi.ChangeEventSource; import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext; -import io.debezium.pipeline.source.spi.ChangeEventSourceFactory; import io.debezium.pipeline.source.spi.SnapshotChangeEventSource; -import io.debezium.pipeline.spi.OffsetContext; import io.debezium.schema.DatabaseSchema; /** * Coordinates one or more {@link ChangeEventSource}s and executes them in order. Extends the base * {@link ChangeEventSourceCoordinator} to support a pre-snapshot catch up streaming phase. */ -public class PostgresChangeEventSourceCoordinator extends ChangeEventSourceCoordinator { +public class PostgresChangeEventSourceCoordinator extends ChangeEventSourceCoordinator { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresChangeEventSourceCoordinator.class); private final Snapshotter snapshotter; private final SlotState slotInfo; - public PostgresChangeEventSourceCoordinator(OffsetContext previousOffset, ErrorHandler errorHandler, + public PostgresChangeEventSourceCoordinator(PostgresOffsetContext previousOffset, ErrorHandler errorHandler, Class connectorType, CommonConnectorConfig connectorConfig, - ChangeEventSourceFactory changeEventSourceFactory, + PostgresChangeEventSourceFactory changeEventSourceFactory, ChangeEventSourceMetricsFactory changeEventSourceMetricsFactory, EventDispatcher eventDispatcher, DatabaseSchema schema, Snapshotter snapshotter, SlotState slotInfo) { @@ -50,19 +48,19 @@ public PostgresChangeEventSourceCoordinator(OffsetContext previousOffset, ErrorH } @Override - protected CatchUpStreamingResult executeCatchUpStreaming(OffsetContext previousOffset, ChangeEventSourceContext context, - SnapshotChangeEventSource snapshotSource) + protected CatchUpStreamingResult executeCatchUpStreaming(PostgresOffsetContext previousOffset, ChangeEventSourceContext context, + SnapshotChangeEventSource snapshotSource) throws InterruptedException { if (previousOffset != null && !snapshotter.shouldStreamEventsStartingFromSnapshot() && slotInfo != null) { try { setSnapshotStartLsn((PostgresSnapshotChangeEventSource) snapshotSource, - (PostgresOffsetContext) previousOffset); + previousOffset); } catch (SQLException e) { throw new DebeziumException("Failed to determine catch-up streaming stopping LSN"); } LOGGER.info("Previous connector state exists and will stream events until {} then perform snapshot", - ((PostgresOffsetContext) previousOffset).getStreamingStoppingLsn()); + previousOffset.getStreamingStoppingLsn()); streamEvents(previousOffset, context); return new CatchUpStreamingResult(true); } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java index 596d86c9c..56c9cb383 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java @@ -93,7 +93,7 @@ public ChangeEventSourceCoordinator start(Configuration c schema = new PostgresSchema(connectorConfig, typeRegistry, topicSelector, valueConverterBuilder.build(typeRegistry)); this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicSelector); - final PostgresOffsetContext previousOffset = (PostgresOffsetContext) getPreviousOffset(new PostgresOffsetContext.Loader(connectorConfig)); + final PostgresOffsetContext previousOffset = getPreviousOffset(new PostgresOffsetContext.Loader(connectorConfig)); final Clock clock = Clock.system(); LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME); @@ -197,7 +197,7 @@ public ChangeEventSourceCoordinator start(Configuration c schemaNameAdjuster, jdbcConnection); - ChangeEventSourceCoordinator coordinator = new PostgresChangeEventSourceCoordinator( + ChangeEventSourceCoordinator coordinator = new PostgresChangeEventSourceCoordinator ( previousOffset, errorHandler, PostgresConnector.class, diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java index 0ed95d6c8..5cb343b1e 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java @@ -187,7 +187,7 @@ Long xmin() { return sourceInfo.xmin(); } - public static class Loader implements OffsetContext.Loader { + public static class Loader implements OffsetContext.Loader { private final PostgresConnectorConfig connectorConfig; @@ -207,7 +207,7 @@ private Long readOptionalLong(Map offset, String key) { @SuppressWarnings("unchecked") @Override - public OffsetContext load(Map offset) { + public PostgresOffsetContext load(Map offset) { final Lsn lsn = Lsn.valueOf(readOptionalLong(offset, SourceInfo.LSN_KEY)); final Lsn lastCompletelyProcessedLsn = Lsn.valueOf(readOptionalLong(offset, LAST_COMPLETELY_PROCESSED_LSN_KEY)); final Lsn lastCommitLsn = Lsn.valueOf(readOptionalLong(offset, LAST_COMPLETELY_PROCESSED_LSN_KEY)); diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java index a443de2a7..e1fe61f64 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java @@ -83,7 +83,7 @@ public ChangeEventSourceCoordinator start(Configuration this.schema = new SqlServerDatabaseSchema(connectorConfig, valueConverters, topicSelector, schemaNameAdjuster); this.schema.initializeStorage(); - final SqlServerOffsetContext previousOffset = (SqlServerOffsetContext) getPreviousOffset(new SqlServerOffsetContext.Loader(connectorConfig)); + final SqlServerOffsetContext previousOffset = getPreviousOffset(new SqlServerOffsetContext.Loader(connectorConfig)); if (previousOffset != null) { schema.recover(previousOffset); } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java index e7fb68101..17fb9275b 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java @@ -140,7 +140,7 @@ public void postSnapshotCompletion() { sourceInfo.setSnapshot(SnapshotRecord.FALSE); } - public static class Loader implements OffsetContext.Loader { + public static class Loader implements OffsetContext.Loader { private final SqlServerConnectorConfig connectorConfig; @@ -154,7 +154,7 @@ public Loader(SqlServerConnectorConfig connectorConfig) { } @Override - public OffsetContext load(Map offset) { + public SqlServerOffsetContext load(Map offset) { final Lsn changeLsn = Lsn.valueOf((String) offset.get(SourceInfo.CHANGE_LSN_KEY)); final Lsn commitLsn = Lsn.valueOf((String) offset.get(SourceInfo.COMMIT_LSN_KEY)); boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY)); diff --git a/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java b/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java index ec51bdf49..8fdbb2c5d 100644 --- a/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java +++ b/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java @@ -298,11 +298,11 @@ public void commit() throws InterruptedException { /** * Loads the connector's persistent offset (if present) via the given loader. */ - protected OffsetContext getPreviousOffset(OffsetContext.Loader loader) { + protected O getPreviousOffset(OffsetContext.Loader loader) { Map partition = loader.getPartition(); if (lastOffset != null) { - OffsetContext offsetContext = loader.load(lastOffset); + O offsetContext = loader.load(lastOffset); LOGGER.info("Found previous offset after restart {}", offsetContext); return offsetContext; } @@ -312,7 +312,7 @@ protected OffsetContext getPreviousOffset(OffsetContext.Loader loader) { .get(partition); if (previousOffset != null) { - OffsetContext offsetContext = loader.load(previousOffset); + O offsetContext = loader.load(previousOffset); LOGGER.info("Found previous offset {}", offsetContext); return offsetContext; } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java index e38a60747..ec09e4fa9 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java @@ -82,7 +82,7 @@ public ChangeEventSourceCoordinator(O previousOffset, ErrorHandler errorHandler, this.schema = schema; } - public synchronized void start(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics, + public synchronized void start(CdcSourceTaskContext taskContext, ChangeEventQueueMetrics changeEventQueueMetrics, EventMetadataProvider metadataProvider) { AtomicReference previousLogContext = new AtomicReference<>(); try { @@ -141,7 +141,7 @@ public synchronized void start(T taskContext, C } } - protected CatchUpStreamingResult executeCatchUpStreaming(OffsetContext previousOffset, ChangeEventSourceContext context, + protected CatchUpStreamingResult executeCatchUpStreaming(O previousOffset, ChangeEventSourceContext context, SnapshotChangeEventSource snapshotSource) throws InterruptedException { return new CatchUpStreamingResult(false); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java b/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java index c78f3f0d6..c220788c6 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java @@ -11,8 +11,10 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; +import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.txmetadata.TransactionContext; +import io.debezium.pipeline.txmetadata.TransactionMonitor; import io.debezium.schema.DataCollectionId; /** @@ -27,10 +29,10 @@ public interface OffsetContext { /** * Implementations load a connector-specific offset context based on the offset values stored in Kafka. */ - interface Loader { + interface Loader { Map getPartition(); - OffsetContext load(Map offset); + O load(Map offset); } Map getPartition();