DBZ-2216 Fix transaction-id resolution with MongoDB 4.2

This commit is contained in:
Chris Cranford 2020-06-17 13:47:45 -04:00 committed by Jiri Pechanec
parent 95b0e76aa3
commit 4f561600a4
7 changed files with 96 additions and 18 deletions

View File

@ -63,6 +63,10 @@ public String getTransactionId(DataCollectionId source, OffsetContext offset, Ob
if (source == null) {
return null;
}
final String sessionTxnId = sourceInfo.getString(SourceInfo.SESSION_TXN_ID);
if (sessionTxnId != null) {
return sessionTxnId;
}
final Long operationId = sourceInfo.getInt64(SourceInfo.OPERATION_ID);
if (operationId == null) {
return null;

View File

@ -25,6 +25,7 @@ public MongoDbSourceInfoStructMaker(String connector, String version, CommonConn
.field(SourceInfo.ORDER, Schema.INT32_SCHEMA)
.field(SourceInfo.OPERATION_ID, Schema.OPTIONAL_INT64_SCHEMA)
.field(SourceInfo.TX_ORD, Schema.OPTIONAL_INT64_SCHEMA)
.field(SourceInfo.SESSION_TXN_ID, Schema.OPTIONAL_STRING_SCHEMA)
.build();
}
@ -39,7 +40,8 @@ public Struct struct(SourceInfo sourceInfo) {
.put(SourceInfo.REPLICA_SET_NAME, sourceInfo.replicaSetName())
.put(SourceInfo.COLLECTION, sourceInfo.collectionId().name())
.put(SourceInfo.ORDER, sourceInfo.position().getInc())
.put(SourceInfo.OPERATION_ID, sourceInfo.position().getOperationId());
.put(SourceInfo.OPERATION_ID, sourceInfo.position().getOperationId())
.put(SourceInfo.SESSION_TXN_ID, sourceInfo.position().getSessionTxnId());
sourceInfo.transactionPosition().ifPresent(transactionPosition -> struct.put(SourceInfo.TX_ORD, transactionPosition));

View File

@ -304,8 +304,7 @@ private boolean handleOplogEvent(ServerAddress primaryAddress, Document event, D
return true;
}
try {
final Long operationId = event.getLong(SourceInfo.OPERATION_ID);
dispatcher.dispatchTransactionStartedEvent(Long.toString(operationId), oplogContext.getOffset());
dispatcher.dispatchTransactionStartedEvent(getTransactionId(event), oplogContext.getOffset());
for (Document change : txChanges) {
final boolean r = handleOplogEvent(primaryAddress, change, event, ++txOrder, oplogContext, context);
if (!r) {
@ -402,6 +401,15 @@ protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig connecto
return new MongoDbOffsetContext(new SourceInfo(connectorConfig), new TransactionContext(), positions);
}
private static String getTransactionId(Document event) {
final Long operationId = event.getLong(SourceInfo.OPERATION_ID);
if (operationId != null && operationId != 0L) {
return Long.toString(operationId);
}
return MongoUtil.getOplogSessionTransactionId(event);
}
/**
* A context associated with a given replica set oplog read operation.
*/

View File

@ -8,6 +8,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.regex.Matcher;
@ -22,6 +23,7 @@
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoIterable;
import io.debezium.DebeziumException;
import io.debezium.function.BlockingConsumer;
import io.debezium.util.Strings;
@ -219,6 +221,22 @@ public static ServerAddress parseAddress(String addressStr) {
return null;
}
/**
* Helper function to extract the session transaction-id from an oplog event.
*
* @param oplogEvent the oplog event
* @return the session transaction id from the oplog event
*/
public static String getOplogSessionTransactionId(Document oplogEvent) {
if (!(oplogEvent.containsKey("lsid") && oplogEvent.containsKey("txnNumber"))) {
throw new DebeziumException("Oplog event does not contain lsid and txnNumber fields");
}
final String lsid = oplogEvent.get("lsid", Document.class).get("id", UUID.class).toString();
final Long txnNumber = oplogEvent.getLong("txnNumber");
return lsid + ":" + txnNumber;
}
/**
* Parse the comma-separated list of server addresses. The format of the supplied string is one of the following:
*

View File

@ -75,11 +75,12 @@ public final class SourceInfo extends BaseSourceInfo {
public static final String ORDER = "ord";
public static final String OPERATION_ID = "h";
public static final String TX_ORD = "tord";
public static final String SESSION_TXN_ID = "stxnid";
public static final String INITIAL_SYNC = "initsync";
public static final String COLLECTION = "collection";
private static final BsonTimestamp INITIAL_TIMESTAMP = new BsonTimestamp();
private static final Position INITIAL_POSITION = new Position(INITIAL_TIMESTAMP, null, 0);
private static final Position INITIAL_POSITION = new Position(INITIAL_TIMESTAMP, null, 0, null);
private final ConcurrentMap<String, Map<String, String>> sourcePartitionsByReplicaSetName = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Position> positionsByReplicaSetName = new ConcurrentHashMap<>();
@ -99,15 +100,17 @@ protected static final class Position {
private final Long opId;
private final BsonTimestamp ts;
private final long txOrder;
private final String sessionTxnId;
public Position(int ts, int order, Long opId, long txOrder) {
this(new BsonTimestamp(ts, order), opId, txOrder);
public Position(int ts, int order, Long opId, long txOrder, String sessionTxnId) {
this(new BsonTimestamp(ts, order), opId, txOrder, sessionTxnId);
}
public Position(BsonTimestamp ts, Long opId, long txOrder) {
public Position(BsonTimestamp ts, Long opId, long txOrder, String sessionTxnId) {
this.ts = ts;
this.opId = opId;
this.txOrder = txOrder;
this.sessionTxnId = sessionTxnId;
assert this.ts != null;
}
@ -127,6 +130,10 @@ public Long getOperationId() {
return this.opId;
}
public String getSessionTxnId() {
return sessionTxnId;
}
public OptionalLong getTxOrder() {
return txOrder == 0 ? OptionalLong.empty() : OptionalLong.of(txOrder);
}
@ -210,11 +217,13 @@ public OptionalLong lastOffsetTxOrder(String replicaSetName) {
return Collect.hashMapOf(TIMESTAMP, Integer.valueOf(existing.getTime()),
ORDER, Integer.valueOf(existing.getInc()),
OPERATION_ID, existing.getOperationId(),
SESSION_TXN_ID, existing.getSessionTxnId(),
INITIAL_SYNC, true);
}
Map<String, Object> offset = Collect.hashMapOf(TIMESTAMP, Integer.valueOf(existing.getTime()),
ORDER, Integer.valueOf(existing.getInc()),
OPERATION_ID, existing.getOperationId());
OPERATION_ID, existing.getOperationId(),
SESSION_TXN_ID, existing.getSessionTxnId());
existing.getTxOrder().ifPresent(txOrder -> offset.put(TX_ORD, txOrder));
@ -251,7 +260,8 @@ public void opLogEvent(String replicaSetName, Document oplogEvent, Document mast
if (oplogEvent != null) {
BsonTimestamp ts = extractEventTimestamp(masterEvent);
Long opId = masterEvent.getLong("h");
position = new Position(ts, opId, orderInTx);
String sessionTxnId = extractSessionTxnId(masterEvent);
position = new Position(ts, opId, orderInTx, sessionTxnId);
namespace = oplogEvent.getString("ns");
}
positionsByReplicaSetName.put(replicaSetName, position);
@ -282,6 +292,27 @@ protected static BsonTimestamp extractEventTimestamp(Document oplogEvent) {
return oplogEvent != null ? oplogEvent.get("ts", BsonTimestamp.class) : null;
}
/**
* Utility to extract the {@link String unique transaction id} value from the event.
*
* @param oplogEvent the event
* @return the session transaction id or null
*/
protected static String extractSessionTxnId(Document oplogEvent) {
// In MongoDB prior to 4.2, the h field is populated.
// For backward compatibility if h is not present or contains a zero value, then proeeed to extract
// the session transaction unique identifier value.
Long opId = oplogEvent.getLong("h");
if (opId == null || opId == 0L) {
// For MongoDB 4.2+, the h field no longer has a non-zero value.
// In this case, the lsid and the associated txnNumber fields must be extracted and combined to
// represent a unique identifier for the individual operation. Therefore, the return value will
// carry the same semantics as h did for MongoDB platforms prior to 4.2.
return MongoUtil.getOplogSessionTransactionId(oplogEvent);
}
return null;
}
private void onEvent(String replicaSetName, CollectionId collectionId, Position position) {
this.replicaSetName = replicaSetName;
this.position = (position == null) ? INITIAL_POSITION : position;
@ -324,7 +355,8 @@ public boolean setOffsetFor(String replicaSetName, Map<String, ?> sourceOffset)
int order = intOffsetValue(sourceOffset, ORDER);
long operationId = longOffsetValue(sourceOffset, OPERATION_ID);
long txOrder = longOffsetValue(sourceOffset, TX_ORD);
positionsByReplicaSetName.put(replicaSetName, new Position(time, order, operationId, txOrder));
String sessionTxnId = stringOffsetValue(sourceOffset, SESSION_TXN_ID);
positionsByReplicaSetName.put(replicaSetName, new Position(time, order, operationId, txOrder, sessionTxnId));
return true;
}
@ -410,6 +442,14 @@ private static long longOffsetValue(Map<String, ?> values, String key) {
}
}
private static String stringOffsetValue(Map<String, ?> values, String key) {
Object obj = values.get(key);
if (obj == null) {
return null;
}
return (String) obj;
}
private static boolean booleanOffsetValue(Map<String, ?> values, String key) {
Object obj = values.get(key);
if (obj != null && obj instanceof Boolean) {

View File

@ -168,16 +168,21 @@ protected void insertDocumentsInTx(String dbName, String collectionName, Documen
final MongoCollection<Document> collection = db.getCollection(collectionName);
final ClientSession session = mongo.startSession();
session.startTransaction();
try {
session.startTransaction();
final InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(true);
for (Document document : documents) {
assertThat(document).isNotNull();
assertThat(document.size()).isGreaterThan(0);
collection.insertOne(session, document, insertOptions);
final InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(true);
for (Document document : documents) {
assertThat(document).isNotNull();
assertThat(document.size()).isGreaterThan(0);
collection.insertOne(session, document, insertOptions);
}
session.commitTransaction();
}
finally {
session.close();
}
session.commitTransaction();
});
}

View File

@ -264,6 +264,7 @@ public void schemaIsCorrect() {
.field("ord", Schema.INT32_SCHEMA)
.field("h", Schema.OPTIONAL_INT64_SCHEMA)
.field("tord", Schema.OPTIONAL_INT64_SCHEMA)
.field("stxnid", Schema.OPTIONAL_STRING_SCHEMA)
.build();
assertConnectSchemasAreEqual(null, source.schema(), schema);