DBZ-1726 Introduce DataCollectionId#identifier()

This commit is contained in:
Chris Cranford 2020-03-09 11:08:23 -04:00 committed by Gunnar Morling
parent 4b6f0588c9
commit 38cf4ceae1
7 changed files with 109 additions and 17 deletions

View File

@ -79,6 +79,11 @@ public String replicaSetName() {
return replicaSetName; return replicaSetName;
} }
@Override
public String identifier() {
return replicaSetName + "." + dbName + "." + name;
}
@Override @Override
public int hashCode() { public int hashCode() {
return name.hashCode(); return name.hashCode();
@ -109,6 +114,6 @@ public String namespace() {
@Override @Override
public String toString() { public String toString() {
return replicaSetName + "." + dbName + "." + name; return identifier();
} }
} }

View File

@ -526,7 +526,7 @@ else if (snapshotContext.lastCollection) {
LOGGER.info("\t Finished exporting {} records for collection '{}'; total duration '{}'", docs, collectionId, LOGGER.info("\t Finished exporting {} records for collection '{}'; total duration '{}'", docs, collectionId,
Strings.duration(clock.currentTimeInMillis() - exportStart)); Strings.duration(clock.currentTimeInMillis() - exportStart));
snapshotProgressListener.dataCollectionSnapshotCompleted(collectionId.toString(), docs); snapshotProgressListener.dataCollectionSnapshotCompleted(collectionId, docs);
} }
}); });
} }

View File

@ -20,6 +20,7 @@
import io.debezium.pipeline.source.spi.EventMetadataProvider; import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
/** /**
* Metrics related to the initial snapshot of a connector. * Metrics related to the initial snapshot of a connector.
@ -89,20 +90,20 @@ public String[] getMonitoredTables() {
} }
@Override @Override
public void monitoredTablesDetermined(Iterable<TableId> tableIds) { public void monitoredCollectionsDetermined(Iterable<DataCollectionId> dataCollectionIds) {
Iterator<TableId> it = tableIds.iterator(); Iterator<DataCollectionId> it = dataCollectionIds.iterator();
while (it.hasNext()) { while (it.hasNext()) {
TableId tableId = it.next(); DataCollectionId dataCollectionId = it.next();
this.remainingTables.put(tableId.toString(), ""); this.remainingTables.put(dataCollectionId.identifier(), "");
monitoredTables.add(tableId.toString()); monitoredTables.add(dataCollectionId.identifier());
} }
} }
@Override @Override
public void dataCollectionSnapshotCompleted(String dataCollectionId, long numRows) { public void dataCollectionSnapshotCompleted(DataCollectionId dataCollectionId, long numRows) {
rowsScanned.put(dataCollectionId, numRows); rowsScanned.put(dataCollectionId.identifier(), numRows);
remainingTables.remove(dataCollectionId); remainingTables.remove(dataCollectionId.identifier());
} }
@Override @Override

View File

@ -5,7 +5,11 @@
*/ */
package io.debezium.pipeline.source.spi; package io.debezium.pipeline.source.spi;
import static io.debezium.util.Iterators.toIterable;
import static io.debezium.util.Iterators.transform;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
/** /**
* A class invoked by {@link SnapshotChangeEventSource} whenever an important event or change of state happens. * A class invoked by {@link SnapshotChangeEventSource} whenever an important event or change of state happens.
@ -16,21 +20,29 @@ public interface SnapshotProgressListener {
void snapshotStarted(); void snapshotStarted();
void monitoredTablesDetermined(Iterable<TableId> tableIds); /**
* @deprecated Since 1.1, use {@link #monitoredCollectionsDetermined(Iterable)} instead.
*/
@Deprecated
default void monitoredTablesDetermined(Iterable<TableId> tableIds) {
monitoredCollectionsDetermined(toIterable(transform(tableIds.iterator(), tableId -> tableId)));
}
void monitoredCollectionsDetermined(Iterable<DataCollectionId> dataCollectionIds);
void snapshotCompleted(); void snapshotCompleted();
void snapshotAborted(); void snapshotAborted();
/** /**
* @deprecated Since 1.1, use {@link #dataCollectionSnapshotCompleted(String, long)} instead. * @deprecated Since 1.1, use {@link #dataCollectionSnapshotCompleted(DataCollectionId, long)} instead.
*/ */
@Deprecated @Deprecated
default void tableSnapshotCompleted(TableId id, long numRows) { default void tableSnapshotCompleted(TableId id, long numRows) {
dataCollectionSnapshotCompleted(id.toString(), numRows); dataCollectionSnapshotCompleted(id, numRows);
} }
void dataCollectionSnapshotCompleted(String dataCollectionId, long numRows); void dataCollectionSnapshotCompleted(DataCollectionId dataCollectionId, long numRows);
void rowsScanned(TableId tableId, long numRows); void rowsScanned(TableId tableId, long numRows);
@ -45,11 +57,11 @@ public void rowsScanned(TableId tableId, long numRows) {
} }
@Override @Override
public void monitoredTablesDetermined(Iterable<TableId> tableIds) { public void monitoredCollectionsDetermined(Iterable<DataCollectionId> dataCollectionIds) {
} }
@Override @Override
public void dataCollectionSnapshotCompleted(String dataCollectionId, long numRows) { public void dataCollectionSnapshotCompleted(DataCollectionId dataCollectionId, long numRows) {
} }
@Override @Override

View File

@ -117,6 +117,11 @@ public String table() {
return tableName; return tableName;
} }
@Override
public String identifier() {
return id;
}
@Override @Override
public int compareTo(TableId that) { public int compareTo(TableId that) {
if (this == that) { if (this == that) {
@ -147,7 +152,7 @@ public boolean equals(Object obj) {
@Override @Override
public String toString() { public String toString() {
return id; return identifier();
} }
/** /**

View File

@ -11,4 +11,11 @@
* @author Gunnar Morling * @author Gunnar Morling
*/ */
public interface DataCollectionId { public interface DataCollectionId {
/**
* Get the fully qualified identifier of the data collection.
*
* @return the collection's fully qualified identifier.
*/
String identifier();
} }

View File

@ -330,6 +330,68 @@ public void remove() {
}; };
} }
/**
* Get an {@link Iterable} from an {@link Iterator}.
*
* @param iterator the source iterator
* @param <T> the iterator type
*
* @return the iterable
*/
public static <T> Iterable<T> toIterable(Iterator<T> iterator) {
return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
return iterator;
}
};
}
/**
* An iterator that is able to transform its contents to another type.
*
* @param <F> the source transform type
* @param <T> the destination transform type
*/
public static interface TransformedIterator<F, T> extends Iterator<T> {
T transform(F from);
}
/**
* Transform an iterator from a given type to super types.
*
* @param fromIterator the source iterator
* @param function the function to be applied when performing element transformation
*
* @param <F> the source transform type
* @param <T> the destination transform type
*
* @return the transformed iterator
*/
public static <F, T> Iterator<T> transform(Iterator<F> fromIterator, Function<? super F, ? extends T> function) {
return new TransformedIterator<F, T>() {
@Override
public boolean hasNext() {
return fromIterator.hasNext();
}
@Override
public T next() {
return transform(fromIterator.next());
}
@Override
public void remove() {
fromIterator.remove();
}
@Override
public T transform(F from) {
return function.apply(from);
}
};
}
/** /**
* A read only iterator that is able to preview the next value without consuming it or altering the behavior or semantics * A read only iterator that is able to preview the next value without consuming it or altering the behavior or semantics
* of the normal {@link Iterator} methods. * of the normal {@link Iterator} methods.