DBZ-6522 Resume token handling for documents with complex ids

This commit is contained in:
ani-sha 2024-06-10 18:14:19 +05:30 committed by Jakub Cechacek
parent 76108b4835
commit 774edcacfa
7 changed files with 159 additions and 33 deletions

View File

@ -164,7 +164,16 @@ public String lastResumeToken() {
public BsonDocument lastResumeTokenDoc() {
final String data = sourceInfo.lastResumeToken();
return (data == null) ? null : ResumeTokens.fromData(data);
if (data == null) {
return null;
}
try {
return ResumeTokens.fromBase64(data);
}
catch (Exception e) {
LOGGER.info("Old resume token format detected, attempting to parse as string " + data);
return ResumeTokens.fromData(data);
}
}
public BsonTimestamp lastTimestamp() {

View File

@ -8,7 +8,6 @@
import java.util.concurrent.TimeUnit;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -204,10 +203,7 @@ protected ChangeStreamIterable<BsonDocument> initChangeStream(MongoClient client
}
if (offsetContext.lastResumeToken() != null) {
LOGGER.info("Resuming streaming from token '{}'", offsetContext.lastResumeToken());
final BsonDocument doc = new BsonDocument();
doc.put("_data", new BsonString(offsetContext.lastResumeToken()));
stream.resumeAfter(doc);
stream.resumeAfter(offsetContext.lastResumeTokenDoc());
}
else if (offsetContext.lastTimestamp() != null) {
LOGGER.info("Resuming streaming from operation time '{}'", offsetContext.lastTimestamp());

View File

@ -7,11 +7,18 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Base64;
import org.bson.BsonBinaryReader;
import org.bson.BsonBinaryWriter;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.codecs.BsonDocumentCodec;
import org.bson.codecs.DecoderContext;
import org.bson.codecs.EncoderContext;
import org.bson.io.BasicOutputBuffer;
import io.debezium.util.HexConverter;
@ -37,6 +44,25 @@ public static BsonTimestamp getTimestamp(BsonDocument resumeToken) {
return new BsonTimestamp(timestampAsLong);
}
public static String toBase64(BsonDocument resumeToken) {
var out = new BasicOutputBuffer();
var writer = new BsonBinaryWriter(out);
var codec = new BsonDocumentCodec();
var context = EncoderContext.builder().build();
codec.encode(writer, resumeToken, context);
return Base64.getEncoder().encodeToString(out.toByteArray());
}
public static BsonDocument fromBase64(String data) {
var bytes = Base64.getDecoder().decode(data);
var reader = new BsonBinaryReader(ByteBuffer.wrap(bytes));
var codec = new BsonDocumentCodec();
var context = DecoderContext.builder().build();
return codec.decode(reader, context);
}
public static BsonValue getData(BsonDocument resumeToken) {
if (!resumeToken.containsKey("_data")) {
throw new IllegalArgumentException("Expected _data field in resume token");
@ -45,10 +71,6 @@ public static BsonValue getData(BsonDocument resumeToken) {
return resumeToken.get("_data");
}
public static String getDataString(BsonDocument resumeToken) {
return getData(resumeToken).asString().getValue();
}
public static BsonDocument fromData(String data) {
return (data == null) ? null : new BsonDocument("_data", new BsonString(data));
}
@ -78,5 +100,4 @@ else if (data.isBinary()) {
private ResumeTokens() {
throw new AssertionError(getClass().getName() + " should not be instantiated");
}
}

View File

@ -193,14 +193,14 @@ public void noEvent(ResumableChangeStreamEvent<BsonDocument> event) {
if (event.hasDocument()) {
return;
}
noEvent(ResumeTokens.getDataString(event.resumeToken));
noEvent(ResumeTokens.toBase64(event.resumeToken));
}
public void noEvent(MongoChangeStreamCursor<?> cursor) {
if (cursor == null || cursor.getResumeToken() == null) {
return;
}
noEvent(ResumeTokens.getDataString(cursor.getResumeToken()));
noEvent(ResumeTokens.toBase64(cursor.getResumeToken()));
}
public void noEvent(BsonTimestamp timestamp) {
@ -230,7 +230,7 @@ public void changeStreamEvent(ChangeStreamDocument<BsonDocument> changeStreamEve
String namespace = "";
long wallTime = 0L;
if (changeStreamEvent != null) {
String resumeToken = ResumeTokens.getDataString(changeStreamEvent.getResumeToken());
String resumeToken = ResumeTokens.toBase64(changeStreamEvent.getResumeToken());
BsonTimestamp ts = changeStreamEvent.getClusterTime();
position = Position.changeStreamPosition(ts, resumeToken, MongoUtils.getChangeStreamSessionTransactionId(changeStreamEvent));
namespace = changeStreamEvent.getNamespace().getFullName();

View File

@ -15,7 +15,6 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.bson.Document;
import org.junit.After;
@ -33,13 +32,13 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.MongoDbConnectorConfig.CaptureScope;
import io.debezium.connector.mongodb.connection.MongoDbConnections;
import io.debezium.connector.mongodb.junit.MongoDbDatabaseProvider;
import io.debezium.connector.mongodb.junit.MongoDbDatabaseVersionResolver;
import io.debezium.connector.mongodb.junit.MongoDbPlatform;
import io.debezium.data.Envelope;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.testing.testcontainers.MongoDbReplicaSet;
import io.debezium.testing.testcontainers.util.DockerUtils;
@ -159,7 +158,7 @@ public void shouldConsumeAllEventsFromDatabaseWithPermissions() throws Interrupt
@Test
public void shouldFailWithoutPermissions() {
var logInterceptor = new LogInterceptor(ErrorHandler.class);
var logInterceptor = new LogInterceptor(MongoDbConnections.class);
// Populate collection
populateCollection(TEST_DATABASE, TEST_COLLECTION, INIT_DOCUMENT_COUNT);
@ -170,9 +169,12 @@ public void shouldFailWithoutPermissions() {
// Start the connector ...
start(MongoDbConnector.class, config);
// Connector should fail while authenticating with the database
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.until(() -> logInterceptor.containsMessage("Error while attempting to Setting resume token"));
// Connector should fail after 2 retries
Awaitility.await().pollDelay(10, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS).until(() -> !isEngineRunning.get());
Assertions.assertThat(logInterceptor.containsMessage("The maximum number of 2 retries has been attempted")).isTrue();
// Awaitility.await().pollDelay(10, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS).until(() -> !isEngineRunning.get());
// Assertions.assertThat(logInterceptor.containsMessage("The maximum number of 2 retries has been attempted")).isTrue();
}
protected void consumeAndVerifyFromInitialSnapshot(String topic, int expectedRecords) throws InterruptedException {

View File

@ -34,6 +34,7 @@
import org.apache.kafka.common.config.Config;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.AssertionsForInterfaceTypes;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
@ -2703,6 +2704,94 @@ public void shouldGenerateRecordForUpdateEventUsingLegacyV1SourceInfo() throws E
assertThat(value.getInt64(Envelope.FieldName.TIMESTAMP)).isGreaterThanOrEqualTo(timestamp.toEpochMilli());
}
@FixFor("DBZ-6522")
@Test
public void shouldConsumeDocumentsWithComplexIds() throws Exception {
config = TestHelper.getConfiguration(mongo).edit()
.with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
.with(CommonConnectorConfig.TOPIC_PREFIX, "mongo")
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
.build();
context = new MongoDbTaskContext(config);
TestHelper.cleanDatabase(mongo, "dbit");
start(MongoDbConnector.class, config);
waitForStreamingRunning("mongodb", "mongo");
Document doc = new Document("_id", 4367438483L).append("name", "John Doe").append("age", 25);
insertDocuments("dbit", "colA", doc);
SourceRecords records = consumeRecordsByTopic(1);
AssertionsForInterfaceTypes.assertThat(records.recordsForTopic("mongo.dbit.colA")).hasSize(1);
stopConnector();
start(MongoDbConnector.class, config);
waitForStreamingRunning("mongodb", "mongo");
Document doc1 = new Document("_id", 1).append("name", "Jane Doe").append("age", 22);
insertDocuments("dbit", "colA", doc1);
records = consumeRecordsByTopic(1);
AssertionsForInterfaceTypes.assertThat(records.recordsForTopic("mongo.dbit.colA")).hasSize(1);
stopConnector();
}
@FixFor("DBZ-6522")
@Test
public void shouldConsumeEventsFromOffsetWithDataResumeToken() throws InterruptedException {
LogInterceptor logInterceptor = new LogInterceptor(MongoDbOffsetContext.class);
config = TestHelper.getConfiguration(mongo).edit()
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
.with(CommonConnectorConfig.TOPIC_PREFIX, "mongo")
.build();
Map<Map<String, ?>, Map<String, ?>> offset = Map.of(
Collect.hashMapOf("server_id", "mongo"),
Collect.hashMapOf(
SourceInfo.TIMESTAMP, 0,
SourceInfo.ORDER, -1,
SourceInfo.RESUME_TOKEN,
"826666EDD6000000032B0229296E04"));
storeOffsets(config, offset);
// Set up the replication context for connections ...
context = new MongoDbTaskContext(config);
// Cleanup database
TestHelper.cleanDatabase(mongo, "dbit");
// Before starting the connector, add data to the databases ...
insertDocuments("dbit", "colA", new Document("_id", 1).append("name", "John"));
// Start the connector ...
start(MongoDbConnector.class, config);
waitForStreamingRunning("mongodb", "mongo");
insertDocuments("dbit", "colA", new Document("_id", 24734982398L).append("name", "Jane"));
// Consume the records ...
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic("mongo.dbit.colA")).hasSize(1);
assertThat(logInterceptor.containsMessage("Old resume token format detected, attempting to parse as string 826666EDD6000000032B0229296E04")).isTrue();
// Stop the connector ...
stopConnector();
// Restart the connector ...
start(MongoDbConnector.class, config);
insertDocuments("dbit", "colA", new Document("_id", 24734982399L).append("name", "Jack"));
// Consume the records ...
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic("mongo.dbit.colA")).hasSize(1);
}
@Test
public void shouldAlwaysSnapshot() throws Exception {

View File

@ -37,17 +37,26 @@ public class SourceInfoTest {
private static final String REPLICA_SET_NAME = "myReplicaSet";
// FROM: https://www.mongodb.com/docs/manual/changeStreams/#resume-tokens-from-change-events
private static final String CHANGE_RESUME_TOKEN_DATA = "82635019A0000000012B042C0100296E5A1004AB1154ACA" +
"CD849A48C61756D70D3B21F463C6F7065726174696F6E54" +
"797065003C696E736572740046646F63756D656E744B657" +
"90046645F69640064635019A078BE67426D7CF4D2000004";
private static final BsonDocument CHANGE_RESUME_TOKEN = ResumeTokens.fromData(CHANGE_RESUME_TOKEN_DATA);
// For resume token with all fields included
private static final String CHANGE_RESUME_TOKEN_JSON = "{\n" +
" \"_data\": \"82647A41A6000000012B022C0100296E5A10042409C0859BCF45ABBE0E0BD72AB4040346465F696400461E666F6F002B021E626172002B04000004\",\n" +
" \"_typeBits\":\n" +
" {\n" +
" \"$binary\":\n" +
" {\n" +
" \"base64\": \"gkAB\",\n" +
" \"subType\": \"00\"\n" +
" }\n" +
" }\n" +
"}";
private static final BsonDocument CHANGE_RESUME_TOKEN = BsonDocument.parse(CHANGE_RESUME_TOKEN_JSON);
private static final String CHANGE_RESUME_TOKEN_STRING = ResumeTokens.toBase64(CHANGE_RESUME_TOKEN);
private static final BsonTimestamp CHANGE_TIMESTAMP = new BsonTimestamp(1666193824, 1);
// FROM: https://www.mongodb.com/docs/manual/changeStreams/#resume-tokens-from-aggregate
private static final String CURSOR_RESUME_TOKEN_DATA = "8263515EAC000000022B0429296E1404";
private static final BsonDocument CURSOR_RESUME_TOKEN = ResumeTokens.fromData(CURSOR_RESUME_TOKEN_DATA);
private static final String CURSOR_RESUME_TOKEN_JSON = "{ \"_data\" : \"8263515EAC000000022B0429296E1404\" }";
private static final BsonDocument CURSOR_RESUME_TOKEN = BsonDocument.parse(CURSOR_RESUME_TOKEN_JSON);
private static final String CURSOR_RESUME_TOKEN_STRING = ResumeTokens.toBase64(CURSOR_RESUME_TOKEN);
private SourceInfo source;
private MongoDbOffsetContext context;
@ -151,14 +160,14 @@ public void assertSourceInfoContents(SourceInfo source,
@Test
public void shouldSetAndReturnRecordedOffset() {
var cursor = mockEventChangeStreamCursor();
assertSourceInfoContents(source, cursor, CHANGE_RESUME_TOKEN_DATA, CHANGE_TIMESTAMP, null);
assertSourceInfoContents(source, cursor, CHANGE_RESUME_TOKEN_STRING, CHANGE_TIMESTAMP, null);
// Create a new source info and set the offset ...
var position = source.position();
SourceInfo newSource = createSourceInfo();
newSource.setPosition(position);
assertSourceInfoContents(newSource, true, CHANGE_RESUME_TOKEN_DATA, CHANGE_TIMESTAMP, null);
assertSourceInfoContents(newSource, true, CHANGE_RESUME_TOKEN_STRING, CHANGE_TIMESTAMP, null);
}
@Test
@ -170,13 +179,13 @@ public void shouldReturnOffsetForUnusedReplicaName() {
@Test
public void shouldReturnRecordedOffsetForUsedReplicaName() {
var cursor = mockEventChangeStreamCursor();
assertSourceInfoContents(source, cursor, CHANGE_RESUME_TOKEN_DATA, CHANGE_TIMESTAMP, null);
assertSourceInfoContents(source, cursor, CHANGE_RESUME_TOKEN_STRING, CHANGE_TIMESTAMP, null);
}
@Test
public void shouldReturnRecordedOffsetForUsedReplicaNameWithoutEvent() {
var cursor = mockNoEventChangeStreamCursor();
assertSourceInfoContents(source, cursor, CURSOR_RESUME_TOKEN_DATA, null, null);
assertSourceInfoContents(source, cursor, CURSOR_RESUME_TOKEN_STRING, null, null);
}
@Test
@ -190,7 +199,7 @@ public void shouldReturnRecordedOffsetForUsedReplicaNameDuringInitialSnapshot()
source.startInitialSnapshot();
var cursor = mockEventChangeStreamCursor();
assertSourceInfoContents(source, cursor, CHANGE_RESUME_TOKEN_DATA, CHANGE_TIMESTAMP, "true");
assertSourceInfoContents(source, cursor, CHANGE_RESUME_TOKEN_STRING, CHANGE_TIMESTAMP, "true");
}
@Test
@ -198,7 +207,7 @@ public void shouldReturnRecordedOffsetForUsedReplicaNameDuringInitialSnapshotWit
source.startInitialSnapshot();
var cursor = mockNoEventChangeStreamCursor();
assertSourceInfoContents(source, cursor, CURSOR_RESUME_TOKEN_DATA, null, "true");
assertSourceInfoContents(source, cursor, CURSOR_RESUME_TOKEN_STRING, null, "true");
}
@Test