DBZ-978 Misc. clean-up

This commit is contained in:
Gunnar Morling 2018-12-13 21:17:23 +01:00
parent edec1c3090
commit 47f22feddc
3 changed files with 18 additions and 20 deletions

View File

@ -6,6 +6,7 @@
package io.debezium.connector.common;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Supplier;
import javax.management.MalformedObjectNameException;
@ -24,17 +25,19 @@
*/
public class CdcSourceTaskContext {
private static final String[] EMPTY_CAPTURED_LIST = new String[0];
private final String connectorType;
private final String connectorName;
private final Clock clock;
/**
* Obtains the data collections captured at the point of invocation.
*/
private final Supplier<Collection<? extends DataCollectionId>> collectionsSupplier;
public CdcSourceTaskContext(String connectorType, String connectorName, Supplier<Collection<? extends DataCollectionId>> collectionsSupplier) {
this.connectorType = connectorType;
this.connectorName = connectorName;
this.collectionsSupplier = collectionsSupplier;
this.collectionsSupplier = collectionsSupplier != null ? collectionsSupplier : Collections::emptyList;
this.clock = Clock.system();
}
@ -68,18 +71,9 @@ public ObjectName metricName(String contextName) throws MalformedObjectNameExcep
}
public String[] capturedDataCollections() {
if (collectionsSupplier == null) {
return EMPTY_CAPTURED_LIST;
}
final Collection<? extends DataCollectionId> collections = collectionsSupplier.get();
if (collections == null) {
return EMPTY_CAPTURED_LIST;
}
String[] ret = new String[collections.size()];
int i = 0;
for (DataCollectionId collection: collections) {
ret[i++] = collection.toString();
}
return ret;
return collectionsSupplier.get()
.stream()
.map(DataCollectionId::toString)
.toArray(String[]::new);
}
}

View File

@ -76,13 +76,9 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> t
connectorConfig.getLogicalName());
}
// TODO One could argue that snapshot events shouldn't have to go through the dispatcher but rather to the queue
// directly; It's done though in order to handle heartbeat, JMX and other things consistently with streaming, so
// it might be beneficial eventually
public void dispatchSnapshotEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, SnapshotReceiver receiver) throws InterruptedException {
// TODO Handle Heartbeat
DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);
// TODO handle as per inconsistent schema info option

View File

@ -15,10 +15,18 @@
*/
public interface DataChangeEventListener {
/**
* Invoked if an event is processed for a captured table.
*/
void onEvent(String event);
/**
* Invoked for events pertaining to non-whitelisted tables.
*/
void onSkippedEvent(String event);
static DataChangeEventListener NO_OP = new DataChangeEventListener() {
@Override
public void onSkippedEvent(String event) {
}