diff --git a/debezium-core/src/main/java/io/debezium/connector/common/CdcSourceTaskContext.java b/debezium-core/src/main/java/io/debezium/connector/common/CdcSourceTaskContext.java index 8eb5aeb2f..5c89737f3 100644 --- a/debezium-core/src/main/java/io/debezium/connector/common/CdcSourceTaskContext.java +++ b/debezium-core/src/main/java/io/debezium/connector/common/CdcSourceTaskContext.java @@ -6,6 +6,7 @@ package io.debezium.connector.common; import java.util.Collection; +import java.util.Collections; import java.util.function.Supplier; import javax.management.MalformedObjectNameException; @@ -24,17 +25,19 @@ */ public class CdcSourceTaskContext { - private static final String[] EMPTY_CAPTURED_LIST = new String[0]; - private final String connectorType; private final String connectorName; private final Clock clock; + + /** + * Obtains the data collections captured at the point of invocation. + */ private final Supplier> collectionsSupplier; public CdcSourceTaskContext(String connectorType, String connectorName, Supplier> collectionsSupplier) { this.connectorType = connectorType; this.connectorName = connectorName; - this.collectionsSupplier = collectionsSupplier; + this.collectionsSupplier = collectionsSupplier != null ? collectionsSupplier : Collections::emptyList; this.clock = Clock.system(); } @@ -68,18 +71,9 @@ public ObjectName metricName(String contextName) throws MalformedObjectNameExcep } public String[] capturedDataCollections() { - if (collectionsSupplier == null) { - return EMPTY_CAPTURED_LIST; - } - final Collection collections = collectionsSupplier.get(); - if (collections == null) { - return EMPTY_CAPTURED_LIST; - } - String[] ret = new String[collections.size()]; - int i = 0; - for (DataCollectionId collection: collections) { - ret[i++] = collection.toString(); - } - return ret; + return collectionsSupplier.get() + .stream() + .map(DataCollectionId::toString) + .toArray(String[]::new); } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java index 0fed83734..1c9c8d07e 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java @@ -76,13 +76,9 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector t connectorConfig.getLogicalName()); } - // TODO One could argue that snapshot events shouldn't have to go through the dispatcher but rather to the queue - // directly; It's done though in order to handle heartbeat, JMX and other things consistently with streaming, so - // it might be beneficial eventually public void dispatchSnapshotEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, SnapshotReceiver receiver) throws InterruptedException { // TODO Handle Heartbeat - DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId); // TODO handle as per inconsistent schema info option diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/DataChangeEventListener.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/DataChangeEventListener.java index 23a20bd78..edb9f0496 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/DataChangeEventListener.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/DataChangeEventListener.java @@ -15,10 +15,18 @@ */ public interface DataChangeEventListener { + /** + * Invoked if an event is processed for a captured table. + */ void onEvent(String event); + + /** + * Invoked for events pertaining to non-whitelisted tables. + */ void onSkippedEvent(String event); static DataChangeEventListener NO_OP = new DataChangeEventListener() { + @Override public void onSkippedEvent(String event) { }