DBZ-2975: Introduce io.debezium.pipeline.spi.Offsets

Author: Sergei Morozov, 2021-08-06 14:11:36 -07:00 (committed by Gunnar Morling)
parent c7ac5fca52
commit 77e2ac8a12
9 changed files with 82 additions and 62 deletions
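
In short: the raw partition-to-offset Map that was threaded through the pipeline, together with the single-partition helper methods on BaseSourceTask, is replaced by a dedicated Offsets value object in io.debezium.pipeline.spi. Distilled from the MySQL hunks below into a before/after sketch (the two halves are alternatives, not one compilable unit):

    // Before: raw map plus helpers inherited from BaseSourceTask
    Map<MySqlPartition, MySqlOffsetContext> previousOffsets = getPreviousOffsets(
            new MySqlPartition.Provider(connectorConfig),
            new MySqlOffsetContext.Loader(connectorConfig));
    MySqlOffsetContext previousOffset = getTheOnlyOffset(previousOffsets);
    MySqlPartition partition = getTheOnlyPartition(previousOffsets);
    previousOffsets.put(partition, null); // reset to force a new snapshot

    // After: the Offsets wrapper owns the single-partition logic
    Offsets<MySqlPartition, MySqlOffsetContext> previousOffsets = getPreviousOffsets(
            new MySqlPartition.Provider(connectorConfig),
            new MySqlOffsetContext.Loader(connectorConfig));
    MySqlOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();
    MySqlPartition partition = previousOffsets.getTheOnlyPartition();
    previousOffsets.resetOffset(partition); // same effect, behind the new API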

View File

@@ -28,6 +28,7 @@
 import io.debezium.pipeline.DataChangeEvent;
 import io.debezium.pipeline.ErrorHandler;
 import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.spi.Offsets;
 import io.debezium.util.Clock;
 import io.debezium.util.LoggingContext.PreviousContext;
 import io.debezium.util.SchemaNameAdjuster;
@@ -105,7 +106,7 @@ public ChangeEventSourceCoordinator<MongoDbPartition, MongoDbOffsetContext> star
                 schemaNameAdjuster);
         ChangeEventSourceCoordinator<MongoDbPartition, MongoDbOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
-                Collections.singletonMap(new MongoDbPartition(), previousOffset),
+                new Offsets<>(Collections.singletonMap(new MongoDbPartition(), previousOffset)),
                 errorHandler,
                 MongoDbConnector.class,
                 connectorConfig,

View File

@@ -7,7 +7,6 @@
 import java.sql.SQLException;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;

 import org.apache.kafka.connect.source.SourceRecord;
@@ -29,6 +28,7 @@
 import io.debezium.pipeline.DataChangeEvent;
 import io.debezium.pipeline.ErrorHandler;
 import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.spi.Offsets;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.AbstractDatabaseHistory;
 import io.debezium.schema.TopicSelector;
@@ -83,7 +83,8 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
         validateBinlogConfiguration(connectorConfig);

-        Map<MySqlPartition, MySqlOffsetContext> previousOffsets = getPreviousOffsets(new MySqlPartition.Provider(connectorConfig),
+        Offsets<MySqlPartition, MySqlOffsetContext> previousOffsets = getPreviousOffsets(
+                new MySqlPartition.Provider(connectorConfig),
                 new MySqlOffsetContext.Loader(connectorConfig));

         final boolean tableIdCaseInsensitive = connection.isTableIdCaseSensitive();
@@ -99,7 +100,7 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
             throw new DebeziumException(e);
         }

-        MySqlOffsetContext previousOffset = getTheOnlyOffset(previousOffsets);
+        MySqlOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();

         validateAndLoadDatabaseHistory(connectorConfig, previousOffset, schema);
@@ -114,8 +115,8 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
         // If the binlog position is not available it is necessary to reexecute snapshot
         if (validateSnapshotFeasibility(connectorConfig, previousOffset)) {
-            MySqlPartition partition = getTheOnlyPartition(previousOffsets);
-            previousOffsets.put(partition, null);
+            MySqlPartition partition = previousOffsets.getTheOnlyPartition();
+            previousOffsets.resetOffset(partition);
         }

         taskContext = new MySqlTaskContext(connectorConfig, schema);

View File

@@ -7,7 +7,6 @@
 import java.sql.SQLException;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;

 import org.apache.kafka.connect.source.SourceRecord;
@@ -23,6 +22,7 @@
 import io.debezium.pipeline.DataChangeEvent;
 import io.debezium.pipeline.ErrorHandler;
 import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.spi.Offsets;
 import io.debezium.relational.TableId;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.Clock;
@@ -57,9 +57,9 @@ public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(
         TableNameCaseSensitivity tableNameCaseSensitivity = connectorConfig.getAdapter().getTableNameCaseSensitivity(jdbcConnection);
         this.schema = new OracleDatabaseSchema(connectorConfig, valueConverters, schemaNameAdjuster, topicSelector, tableNameCaseSensitivity);
         this.schema.initializeStorage();

-        Map<OraclePartition, OracleOffsetContext> previousOffsets = getPreviousOffsets(new OraclePartition.Provider(connectorConfig),
+        Offsets<OraclePartition, OracleOffsetContext> previousOffsets = getPreviousOffsets(new OraclePartition.Provider(connectorConfig),
                 connectorConfig.getAdapter().getOffsetContextLoader());
-        OracleOffsetContext previousOffset = getTheOnlyOffset(previousOffsets);
+        OracleOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();

         if (previousOffset != null) {
             schema.recover(previousOffset);

View File

@@ -6,7 +6,6 @@
 package io.debezium.connector.postgresql;

 import java.sql.SQLException;
-import java.util.Map;

 import org.apache.kafka.connect.source.SourceConnector;
 import org.slf4j.Logger;
@@ -23,6 +22,7 @@
 import io.debezium.pipeline.source.spi.ChangeEventSource;
 import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
 import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
+import io.debezium.pipeline.spi.Offsets;
 import io.debezium.schema.DatabaseSchema;

 /**
@@ -36,7 +36,7 @@ public class PostgresChangeEventSourceCoordinator extends ChangeEventSourceCoord
     private final Snapshotter snapshotter;
     private final SlotState slotInfo;

-    public PostgresChangeEventSourceCoordinator(Map<PostgresPartition, PostgresOffsetContext> previousOffsets,
+    public PostgresChangeEventSourceCoordinator(Offsets<PostgresPartition, PostgresOffsetContext> previousOffsets,
                                                 ErrorHandler errorHandler,
                                                 Class<? extends SourceConnector> connectorType,
                                                 CommonConnectorConfig connectorConfig,

View File

@@ -11,7 +11,6 @@
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;

 import org.apache.kafka.connect.errors.ConnectException;
@@ -38,6 +37,7 @@
 import io.debezium.pipeline.ErrorHandler;
 import io.debezium.pipeline.EventDispatcher;
 import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
+import io.debezium.pipeline.spi.Offsets;
 import io.debezium.relational.TableId;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.Clock;
@@ -94,10 +94,10 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
         schema = new PostgresSchema(connectorConfig, typeRegistry, topicSelector, valueConverterBuilder.build(typeRegistry));
         this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicSelector);
-        final Map<PostgresPartition, PostgresOffsetContext> previousOffsets = getPreviousOffsets(
+        final Offsets<PostgresPartition, PostgresOffsetContext> previousOffsets = getPreviousOffsets(
                 new PostgresPartition.Provider(connectorConfig), new PostgresOffsetContext.Loader(connectorConfig));
         final Clock clock = Clock.system();
-        final PostgresOffsetContext previousOffset = getTheOnlyOffset(previousOffsets);
+        final PostgresOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();

         LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
         try {

View File

@@ -7,7 +7,6 @@
 import java.sql.SQLException;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;

 import org.apache.kafka.connect.errors.ConnectException;
@@ -24,6 +23,7 @@
 import io.debezium.pipeline.ErrorHandler;
 import io.debezium.pipeline.EventDispatcher;
 import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
+import io.debezium.pipeline.spi.Offsets;
 import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.DatabaseHistory;
@@ -86,10 +86,10 @@ public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext>
         this.schema = new SqlServerDatabaseSchema(connectorConfig, valueConverters, topicSelector, schemaNameAdjuster);
         this.schema.initializeStorage();

-        Map<SqlServerPartition, SqlServerOffsetContext> offsets = getPreviousOffsets(
+        Offsets<SqlServerPartition, SqlServerOffsetContext> offsets = getPreviousOffsets(
                 new SqlServerPartition.Provider(connectorConfig),
                 new SqlServerOffsetContext.Loader(connectorConfig));
-        SqlServerOffsetContext previousOffset = getTheOnlyOffset(offsets);
+        SqlServerOffsetContext previousOffset = offsets.getTheOnlyOffset();

         if (previousOffset != null) {
             schema.recover(previousOffset);

View File

@@ -23,13 +23,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import io.debezium.DebeziumException;
 import io.debezium.annotation.SingleThreadAccess;
 import io.debezium.config.CommonConnectorConfig;
 import io.debezium.config.Configuration;
 import io.debezium.config.Field;
 import io.debezium.pipeline.ChangeEventSourceCoordinator;
 import io.debezium.pipeline.spi.OffsetContext;
+import io.debezium.pipeline.spi.Offsets;
 import io.debezium.pipeline.spi.Partition;
 import io.debezium.util.Clock;
 import io.debezium.util.ElapsedTimeStrategy;
@@ -301,7 +301,7 @@ public void commit() throws InterruptedException {
     /**
      * Loads the connector's persistent offsets (if present) via the given loader.
      */
-    protected Map<P, O> getPreviousOffsets(Partition.Provider<P> provider, OffsetContext.Loader<O> loader) {
+    protected Offsets<P, O> getPreviousOffsets(Partition.Provider<P> provider, OffsetContext.Loader<O> loader) {
         Set<P> partitions = provider.getPartitions();
         OffsetReader<P, O, OffsetContext.Loader<O>> reader = new OffsetReader<>(
                 context.offsetStorageReader(), loader);
@@ -321,36 +321,6 @@ protected Map<P, O> getPreviousOffsets(Partition.Provider<P> provider, OffsetCon
             LOGGER.info("No previous offsets found");
         }

-        return offsets;
-    }
-
-    /**
-     * Returns the offset of the only partition that the task is configured to use.
-     *
-     * This method is meant to be used only by the connectors that do not implement handling
-     * multiple partitions per task.
-     */
-    protected P getTheOnlyPartition(Map<P, O> offsets) {
-        if (offsets.size() != 1) {
-            throw new DebeziumException("The task must be configured to use exactly one partition, "
-                    + offsets.size() + " found");
-        }
-        return offsets.entrySet().iterator().next().getKey();
-    }
-
-    /**
-     * Returns the offset of the only partition that the task is configured to use.
-     *
-     * This method is meant to be used only by the connectors that do not implement handling
-     * multiple partitions per task.
-     */
-    protected O getTheOnlyOffset(Map<P, O> offsets) {
-        if (offsets.size() != 1) {
-            throw new DebeziumException("The task must be configured to use exactly one partition, "
-                    + offsets.size() + " found");
-        }
-        return offsets.entrySet().iterator().next().getValue();
+        return new Offsets<>(offsets);
     }
 }

View File

@@ -13,7 +13,6 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;

-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,6 +32,7 @@
 import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
 import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
 import io.debezium.pipeline.spi.OffsetContext;
+import io.debezium.pipeline.spi.Offsets;
 import io.debezium.pipeline.spi.Partition;
 import io.debezium.pipeline.spi.SnapshotResult;
 import io.debezium.pipeline.spi.SnapshotResult.SnapshotResultStatus;
@@ -56,7 +56,7 @@ public class ChangeEventSourceCoordinator<P extends Partition, O extends OffsetC
      */
     public static final Duration SHUTDOWN_WAIT_TIMEOUT = Duration.ofSeconds(90);

-    private final Map<P, O> previousOffsets;
+    private final Offsets<P, O> previousOffsets;
     private final ErrorHandler errorHandler;
     private final ChangeEventSourceFactory<P, O> changeEventSourceFactory;
     private final ChangeEventSourceMetricsFactory changeEventSourceMetricsFactory;
@@ -71,7 +71,7 @@ public class ChangeEventSourceCoordinator<P extends Partition, O extends OffsetC
     private SnapshotChangeEventSourceMetrics snapshotMetrics;
     private StreamingChangeEventSourceMetrics streamingMetrics;

-    public ChangeEventSourceCoordinator(Map<P, O> previousOffsets, ErrorHandler errorHandler, Class<? extends SourceConnector> connectorType,
+    public ChangeEventSourceCoordinator(Offsets<P, O> previousOffsets, ErrorHandler errorHandler, Class<? extends SourceConnector> connectorType,
                                         CommonConnectorConfig connectorConfig,
                                         ChangeEventSourceFactory<P, O> changeEventSourceFactory,
                                         ChangeEventSourceMetricsFactory changeEventSourceMetricsFactory, EventDispatcher<?> eventDispatcher, DatabaseSchema<?> schema) {
@@ -86,15 +86,8 @@ public ChangeEventSourceCoordinator(Map<P, O> previousOffsets, ErrorHandler erro
     public synchronized void start(CdcSourceTaskContext taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
                                    EventMetadataProvider metadataProvider) {
-        if (previousOffsets.size() != 1) {
-            throw new ConnectException("The coordinator must be provided with exactly one partition, "
-                    + previousOffsets.size() + " found");
-        }
-
-        Map.Entry<P, O> entry = previousOffsets.entrySet().iterator().next();
-        final P partition = entry.getKey();
-        final O previousOffset = entry.getValue();
+        final P partition = previousOffsets.getTheOnlyPartition();
+        final O previousOffset = previousOffsets.getTheOnlyOffset();

         AtomicReference<LoggingContext.PreviousContext> previousLogContext = new AtomicReference<>();
         try {

View File

@@ -0,0 +1,55 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.pipeline.spi;
+
+import java.util.Map;
+
+import io.debezium.DebeziumException;
+
+/**
+ * Keeps track of the source partitions to be processed by the connector task and their respective offsets.
+ */
+public final class Offsets<P extends Partition, O extends OffsetContext> {
+
+    private final Map<P, O> offsets;
+
+    public Offsets(Map<P, O> offsets) {
+        this.offsets = offsets;
+    }
+
+    public void resetOffset(P partition) {
+        offsets.put(partition, null);
+    }
+
+    /**
+     * Returns the only partition that the task is configured to use.
+     *
+     * This method is meant to be used only by the connectors that do not implement handling
+     * multiple partitions per task.
+     */
+    public P getTheOnlyPartition() {
+        if (offsets.size() != 1) {
+            throw new DebeziumException("The task must be configured to use exactly one partition, "
+                    + offsets.size() + " found");
+        }
+        return offsets.entrySet().iterator().next().getKey();
+    }
+
+    /**
+     * Returns the offset of the only partition that the task is configured to use.
+     *
+     * This method is meant to be used only by the connectors that do not implement handling
+     * multiple partitions per task.
+     */
+    public O getTheOnlyOffset() {
+        if (offsets.size() != 1) {
+            throw new DebeziumException("The task must be configured to use exactly one partition, "
+                    + offsets.size() + " found");
+        }
+        return offsets.entrySet().iterator().next().getValue();
+    }
+}
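
To see the intended single-partition usage end to end, here is a minimal, self-contained sketch. It exercises a simplified, dependency-free copy of the class above (SimpleOffsets, the String type arguments, and IllegalStateException are illustrative stand-ins only; the real Offsets bounds P and O to the Partition and OffsetContext SPI types and throws DebeziumException):

    import java.util.HashMap;
    import java.util.Map;

    // Simplified copy of Offsets for demonstration, so the demo runs without the
    // Debezium SPI on the classpath. Behavior mirrors the class added above.
    final class SimpleOffsets<P, O> {
        private final Map<P, O> offsets;

        SimpleOffsets(Map<P, O> offsets) {
            this.offsets = offsets;
        }

        // A null offset means "no previous offset": the task will take a new snapshot.
        void resetOffset(P partition) {
            offsets.put(partition, null);
        }

        P getTheOnlyPartition() {
            if (offsets.size() != 1) {
                throw new IllegalStateException("Expected exactly one partition, " + offsets.size() + " found");
            }
            return offsets.entrySet().iterator().next().getKey();
        }

        O getTheOnlyOffset() {
            if (offsets.size() != 1) {
                throw new IllegalStateException("Expected exactly one partition, " + offsets.size() + " found");
            }
            return offsets.entrySet().iterator().next().getValue();
        }
    }

    public class OffsetsDemo {
        public static void main(String[] args) {
            // Mirrors BaseSourceTask.getPreviousOffsets(): the map restored from offset
            // storage is wrapped once; it must be mutable for resetOffset() to work.
            Map<String, String> restored = new HashMap<>();
            restored.put("server=db1", "binlog.000003:154"); // hypothetical sample values
            SimpleOffsets<String, String> offsets = new SimpleOffsets<>(restored);

            // Single-partition accessors, as used by the connector tasks above.
            String partition = offsets.getTheOnlyPartition();
            System.out.println(partition + " -> " + offsets.getTheOnlyOffset());

            // Mirrors the MySQL task's re-snapshot path: resetOffset(partition)
            // replaces the previous previousOffsets.put(partition, null).
            offsets.resetOffset(partition);
            System.out.println(partition + " -> " + offsets.getTheOnlyOffset()); // "... -> null"
        }
    }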