DBZ-6038 Add wallTime in mongo source info

This commit is contained in:
Yang Wu 2023-01-24 16:38:33 -08:00 committed by Jiri Pechanec
parent 0aed5d5fe9
commit 2a366cf398
5 changed files with 101 additions and 11 deletions

View File

@ -24,6 +24,7 @@ public MongoDbSourceInfoStructMaker(String connector, String version, CommonConn
.field(SourceInfo.ORDER, Schema.INT32_SCHEMA)
.field(SourceInfo.LSID, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.TXN_NUMBER, Schema.OPTIONAL_INT64_SCHEMA)
.field(SourceInfo.WALL_TIME, Schema.OPTIONAL_INT64_SCHEMA)
.build();
}
@ -44,6 +45,10 @@ public Struct struct(SourceInfo sourceInfo) {
.put(SourceInfo.TXN_NUMBER, sourceInfo.position().getChangeStreamSessionTxnId().txnNumber);
}
if (sourceInfo.wallTime() != 0L) {
struct.put(SourceInfo.WALL_TIME, sourceInfo.wallTime());
}
return struct;
}
}

View File

@ -97,7 +97,7 @@ public boolean hasOffset() {
}
public void readEvent(CollectionId collectionId, Instant timestamp) {
sourceInfo.collectionEvent(replicaSetName, collectionId);
sourceInfo.collectionEvent(replicaSetName, collectionId, 0L);
sourceInfo.lastOffset(replicaSetName);
}

View File

@ -82,6 +82,8 @@ public final class SourceInfo extends BaseSourceInfo {
public static final String LSID = "lsid";
public static final String TXN_NUMBER = "txnNumber";
public static final String WALL_TIME = "wallTime";
// Change Stream fields
private static final BsonTimestamp INITIAL_TIMESTAMP = new BsonTimestamp();
@ -100,6 +102,8 @@ public final class SourceInfo extends BaseSourceInfo {
private CollectionId collectionId;
private Position position = new Position(INITIAL_TIMESTAMP, null, null);
private long wallTime;
@Immutable
protected static final class Position {
private final BsonTimestamp ts;
@ -268,8 +272,8 @@ public Position lastPosition(String replicaSetName) {
* @return the source partition and offset {@link Struct}; never null
* @see #schema()
*/
public void collectionEvent(String replicaSetName, CollectionId collectionId) {
onEvent(replicaSetName, collectionId, positionsByReplicaSetName.get(replicaSetName));
public void collectionEvent(String replicaSetName, CollectionId collectionId, long wallTime) {
onEvent(replicaSetName, collectionId, positionsByReplicaSetName.get(replicaSetName), wallTime);
}
/**
@ -292,21 +296,25 @@ public void initialPosition(String replicaSetName, BsonDocument oplogEvent) {
}
positionsByReplicaSetName.put(replicaSetName, position);
onEvent(replicaSetName, CollectionId.parse(replicaSetName, namespace), position);
onEvent(replicaSetName, CollectionId.parse(replicaSetName, namespace), position, 0L);
}
public void changeStreamEvent(String replicaSetName, ChangeStreamDocument<BsonDocument> changeStreamEvent) {
Position position = INITIAL_POSITION;
String namespace = "";
long wallTime = 0L;
if (changeStreamEvent != null) {
BsonTimestamp ts = changeStreamEvent.getClusterTime();
position = Position.changeStreamPosition(ts, changeStreamEvent.getResumeToken().getString("_data").getValue(),
MongoUtil.getChangeStreamSessionTransactionId(changeStreamEvent));
namespace = changeStreamEvent.getNamespace().getFullName();
if (changeStreamEvent.getWallTime() != null) {
wallTime = changeStreamEvent.getWallTime().getValue();
}
}
positionsByReplicaSetName.put(replicaSetName, position);
onEvent(replicaSetName, CollectionId.parse(replicaSetName, namespace), position);
onEvent(replicaSetName, CollectionId.parse(replicaSetName, namespace), position, wallTime);
}
/**
@ -319,10 +327,11 @@ protected static BsonTimestamp extractEventTimestamp(BsonDocument oplogEvent) {
return oplogEvent != null ? oplogEvent.getTimestamp("ts") : null;
}
private void onEvent(String replicaSetName, CollectionId collectionId, Position position) {
private void onEvent(String replicaSetName, CollectionId collectionId, Position position, long wallTime) {
this.replicaSetName = replicaSetName;
this.position = (position == null) ? INITIAL_POSITION : position;
this.collectionId = collectionId;
this.wallTime = wallTime;
}
/**
@ -489,6 +498,10 @@ String replicaSetName() {
return replicaSetName;
}
long wallTime() {
return wallTime;
}
@Override
public String toString() {
return "SourceInfo [sourcePartitionsByReplicaSetName=" + sourcePartitionsByReplicaSetName

View File

@ -6,8 +6,10 @@
package io.debezium.connector.mongodb;
import static io.debezium.connector.mongodb.JsonSerialization.COMPACT_JSON_SETTINGS;
import static io.debezium.junit.EqualityCheck.GREATER_THAN_OR_EQUAL;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
import static java.time.temporal.ChronoUnit.MILLIS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;
@ -239,6 +241,64 @@ public void shouldValidateAcceptableConfiguration() {
assertNoConfigurationErrors(result, MongoDbConnectorConfig.CAPTURE_MODE);
}
@Test
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 6, reason = "wallTime support in Change Stream is officially released in Mongo 6.0.")
public void shouldProvideWallTime() throws InterruptedException {
config = TestHelper.getConfiguration(mongo).edit()
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
.with(CommonConnectorConfig.TOPIC_PREFIX, "mongo")
.build();
context = new MongoDbTaskContext(config);
TestHelper.cleanDatabase(mongo, "dbit");
start(MongoDbConnector.class, config);
waitForStreamingRunning("mongodb", "mongo");
// Insert record
final Instant timestamp = Instant.now();
ObjectId objId = new ObjectId();
Document obj = new Document("_id", objId);
insertDocuments("dbit", "c1", obj);
final SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.allRecordsInOrder().size()).isEqualTo(1);
assertNoRecordsToConsume();
final SourceRecord record = records.allRecordsInOrder().get(0);
final Struct value = (Struct) record.value();
final long wallTime = value.getStruct(Envelope.FieldName.SOURCE).getInt64(SourceInfo.WALL_TIME);
Instant instant = Instant.ofEpochMilli(wallTime);
assertThat(instant.truncatedTo(MILLIS).getNano()).isNotZero();
assertThat(wallTime).isGreaterThanOrEqualTo(timestamp.toEpochMilli());
}
@Test
@SkipWhenDatabaseVersion(check = GREATER_THAN_OR_EQUAL, major = 6, reason = "wallTime support in Change Stream is officially released in Mongo 6.0.")
public void shouldNotProvideWallTimeForOlderVersions() throws InterruptedException {
config = TestHelper.getConfiguration(mongo).edit()
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
.with(CommonConnectorConfig.TOPIC_PREFIX, "mongo")
.build();
context = new MongoDbTaskContext(config);
TestHelper.cleanDatabase(mongo, "dbit");
start(MongoDbConnector.class, config);
waitForStreamingRunning("mongodb", "mongo");
// Insert record
ObjectId objId = new ObjectId();
Document obj = new Document("_id", objId);
insertDocuments("dbit", "c1", obj);
final SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.allRecordsInOrder().size()).isEqualTo(1);
assertNoRecordsToConsume();
final SourceRecord record = records.allRecordsInOrder().get(0);
final Struct value = (Struct) record.value();
// For pre-6.0 version, wallTime should not be presented
assertThat(value.getStruct(Envelope.FieldName.SOURCE).getInt64(SourceInfo.WALL_TIME)).isNull();
}
@Test
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 6, reason = "Pre-image support in Change Stream is officially released in Mongo 6.0.")
public void shouldConsumePreImage() throws InterruptedException {

View File

@ -98,7 +98,7 @@ public void shouldSetAndReturnRecordedOffset() {
assertThat(ts.getTime()).isEqualTo(100);
assertThat(ts.getInc()).isEqualTo(2);
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "dbA", "collectA"));
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "dbA", "collectA"), 0L);
Struct struct = source.struct();
assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo(100_000);
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo(2);
@ -121,7 +121,7 @@ public void shouldReturnOffsetForUnusedReplicaName() {
assertThat(ts.getTime()).isEqualTo(0);
assertThat(ts.getInc()).isEqualTo(0);
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "dbA", "collectA"));
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "dbA", "collectA"), 0L);
Struct struct = source.struct();
assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo(0);
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo(0);
@ -152,7 +152,7 @@ public void shouldReturnRecordedOffsetForUsedReplicaName() {
assertThat(ts.getTime()).isEqualTo(100);
assertThat(ts.getInc()).isEqualTo(2);
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "dbA", "collectA"));
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "dbA", "collectA"), 0L);
Struct struct = source.struct();
assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo(100_000);
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo(2);
@ -176,7 +176,7 @@ public void shouldReturnOffsetForUnusedReplicaNameDuringInitialSync() {
assertThat(ts.getTime()).isEqualTo(0);
assertThat(ts.getInc()).isEqualTo(0);
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "dbA", "collectA"));
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "dbA", "collectA"), 0L);
Struct struct = source.struct();
assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo(0);
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo(0);
@ -209,7 +209,7 @@ public void shouldReturnRecordedOffsetForUsedReplicaNameDuringInitialSync() {
assertThat(ts.getTime()).isEqualTo(100);
assertThat(ts.getInc()).isEqualTo(2);
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "dbA", "collectA"));
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "dbA", "collectA"), 0L);
Struct struct = source.struct();
assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo(100_000);
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo(2);
@ -238,6 +238,17 @@ public void connectorIsPresent() {
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name());
}
@Test
public void wallTimeIsPresent() {
final BsonDocument event = new BsonDocument().append("ts", new BsonTimestamp(100, 2))
.append("h", new BsonInt64(Long.valueOf(1987654321)))
.append("ns", new BsonString("dbA.collectA"));
source.initialPosition("rs", event);
assertThat(source.struct().getInt64(SourceInfo.WALL_TIME)).isNull();
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "dbA", "collectA"), 10L);
assertThat(source.struct().getInt64(SourceInfo.WALL_TIME)).isEqualTo(10L);
}
@Test
public void schemaIsCorrect() {
final Schema schema = SchemaBuilder.struct()
@ -254,6 +265,7 @@ public void schemaIsCorrect() {
.field("ord", Schema.INT32_SCHEMA)
.field("lsid", Schema.OPTIONAL_STRING_SCHEMA)
.field("txnNumber", Schema.OPTIONAL_INT64_SCHEMA)
.field("wallTime", Schema.OPTIONAL_INT64_SCHEMA)
.build();
assertConnectSchemasAreEqual(null, source.schema(), schema);