DBZ-1052 Transaction metadata for SQL Server
This commit is contained in:
parent
6f7b8d1707
commit
b0eb571f9f
@ -20,6 +20,7 @@
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.AbstractSourceInfoStructMaker;
|
||||
import io.debezium.pipeline.txmetadata.TransactionMonitor;
|
||||
|
||||
/**
|
||||
* @author Randall Hauch
|
||||
@ -259,6 +260,7 @@ public void schemaIsCorrect() {
|
||||
.field("ts_ms", Schema.INT64_SCHEMA)
|
||||
.field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA)
|
||||
.field("db", Schema.STRING_SCHEMA)
|
||||
.field("transaction", TransactionMonitor.TRANSACTION_BLOCK_SCHEMA)
|
||||
.field("rs", Schema.STRING_SCHEMA)
|
||||
.field("collection", Schema.STRING_SCHEMA)
|
||||
.field("ord", Schema.INT32_SCHEMA)
|
||||
|
@ -28,6 +28,7 @@
|
||||
import io.debezium.data.VerifyRecord;
|
||||
import io.debezium.doc.FixFor;
|
||||
import io.debezium.document.Document;
|
||||
import io.debezium.pipeline.txmetadata.TransactionMonitor;
|
||||
|
||||
public class SourceInfoTest {
|
||||
|
||||
@ -655,6 +656,7 @@ public void schemaIsCorrect() {
|
||||
.field("ts_ms", Schema.INT64_SCHEMA)
|
||||
.field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA)
|
||||
.field("db", Schema.STRING_SCHEMA)
|
||||
.field("transaction", TransactionMonitor.TRANSACTION_BLOCK_SCHEMA)
|
||||
.field("table", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field("server_id", Schema.INT64_SCHEMA)
|
||||
.field("gtid", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
|
@ -156,7 +156,7 @@ public void start(Configuration config) {
|
||||
previousOffset,
|
||||
errorHandler,
|
||||
PostgresConnector.class,
|
||||
connectorConfig.getLogicalName(),
|
||||
connectorConfig,
|
||||
new PostgresChangeEventSourceFactory(
|
||||
connectorConfig,
|
||||
snapshotter,
|
||||
|
@ -22,6 +22,7 @@
|
||||
import io.debezium.connector.postgresql.connection.ReplicationConnection;
|
||||
import io.debezium.connector.postgresql.spi.OffsetState;
|
||||
import io.debezium.pipeline.spi.OffsetContext;
|
||||
import io.debezium.pipeline.txmetadata.TransactionContext;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.schema.DataCollectionId;
|
||||
import io.debezium.time.Conversions;
|
||||
@ -38,9 +39,10 @@ public class PostgresOffsetContext implements OffsetContext {
|
||||
private final Map<String, String> partition;
|
||||
private boolean lastSnapshotRecord;
|
||||
private Long lastCompletelyProcessedLsn;
|
||||
private final TransactionContext transactionContext;
|
||||
|
||||
private PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Long lsn, Long lastCompletelyProcessedLsn, Long txId, Instant time, boolean snapshot,
|
||||
boolean lastSnapshotRecord) {
|
||||
boolean lastSnapshotRecord, TransactionContext transactionContext) {
|
||||
partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
|
||||
sourceInfo = new SourceInfo(connectorConfig);
|
||||
|
||||
@ -55,6 +57,7 @@ private PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Long lsn,
|
||||
else {
|
||||
sourceInfo.setSnapshot(snapshot ? SnapshotRecord.TRUE : SnapshotRecord.FALSE);
|
||||
}
|
||||
this.transactionContext = transactionContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -175,7 +178,8 @@ public OffsetContext load(Map<String, ?> offset) {
|
||||
final Instant useconds = Conversions.toInstantFromMicros((Long) offset.get(SourceInfo.TIMESTAMP_USEC_KEY));
|
||||
final boolean snapshot = (boolean) ((Map<String, Object>) offset).getOrDefault(SourceInfo.SNAPSHOT_KEY, Boolean.FALSE);
|
||||
final boolean lastSnapshotRecord = (boolean) ((Map<String, Object>) offset).getOrDefault(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, Boolean.FALSE);
|
||||
return new PostgresOffsetContext(connectorConfig, lsn, lastCompletelyProcessedLsn, txId, useconds, snapshot, lastSnapshotRecord);
|
||||
return new PostgresOffsetContext(connectorConfig, lsn, lastCompletelyProcessedLsn, txId, useconds, snapshot, lastSnapshotRecord,
|
||||
TransactionContext.load(offset));
|
||||
}
|
||||
}
|
||||
|
||||
@ -199,7 +203,8 @@ public static PostgresOffsetContext initialContext(PostgresConnectorConfig conne
|
||||
txId,
|
||||
clock.currentTimeAsInstant(),
|
||||
false,
|
||||
false);
|
||||
false,
|
||||
new TransactionContext());
|
||||
}
|
||||
catch (SQLException e) {
|
||||
throw new ConnectException("Database processing error", e);
|
||||
@ -224,4 +229,9 @@ public void markLastSnapshotRecord() {
|
||||
public void event(DataCollectionId tableId, Instant instant) {
|
||||
sourceInfo.update(instant, (TableId) tableId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TransactionContext getTransactionContext() {
|
||||
return transactionContext;
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
import io.debezium.connector.AbstractSourceInfoStructMaker;
|
||||
import io.debezium.data.VerifyRecord;
|
||||
import io.debezium.doc.FixFor;
|
||||
import io.debezium.pipeline.txmetadata.TransactionMonitor;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.time.Conversions;
|
||||
|
||||
@ -68,6 +69,7 @@ public void schemaIsCorrect() {
|
||||
.field("ts_ms", Schema.INT64_SCHEMA)
|
||||
.field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA)
|
||||
.field("db", Schema.STRING_SCHEMA)
|
||||
.field("transaction", TransactionMonitor.TRANSACTION_BLOCK_SCHEMA)
|
||||
.field("schema", Schema.STRING_SCHEMA)
|
||||
.field("table", Schema.STRING_SCHEMA)
|
||||
.field("txId", Schema.OPTIONAL_INT64_SCHEMA)
|
||||
|
@ -6,7 +6,6 @@
|
||||
package io.debezium.connector.sqlserver;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
@ -128,7 +127,7 @@ public void start(Configuration config) {
|
||||
previousOffset,
|
||||
errorHandler,
|
||||
SqlServerConnector.class,
|
||||
connectorConfig.getLogicalName(),
|
||||
connectorConfig,
|
||||
new SqlServerChangeEventSourceFactory(connectorConfig, dataConnection, metadataConnection, errorHandler, dispatcher, clock, schema),
|
||||
dispatcher,
|
||||
schema);
|
||||
@ -136,27 +135,6 @@ public void start(Configuration config) {
|
||||
coordinator.start(taskContext, this.queue, new SqlServerEventMetadataProvider());
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads the connector's persistent offset (if present) via the given loader.
|
||||
*/
|
||||
@Override
|
||||
protected OffsetContext getPreviousOffset(OffsetContext.Loader loader) {
|
||||
Map<String, ?> partition = loader.getPartition();
|
||||
|
||||
Map<String, Object> previousOffset = context.offsetStorageReader()
|
||||
.offsets(Collections.singleton(partition))
|
||||
.get(partition);
|
||||
|
||||
if (previousOffset != null) {
|
||||
OffsetContext offsetContext = loader.load(previousOffset);
|
||||
LOGGER.info("Found previous offset {}", offsetContext);
|
||||
return offsetContext;
|
||||
}
|
||||
else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SourceRecord> poll() throws InterruptedException {
|
||||
final List<DataChangeEvent> records = queue.poll();
|
||||
|
@ -14,6 +14,7 @@
|
||||
|
||||
import io.debezium.connector.SnapshotRecord;
|
||||
import io.debezium.pipeline.spi.OffsetContext;
|
||||
import io.debezium.pipeline.txmetadata.TransactionContext;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.schema.DataCollectionId;
|
||||
import io.debezium.util.Collect;
|
||||
@ -27,13 +28,15 @@ public class SqlServerOffsetContext implements OffsetContext {
|
||||
private final SourceInfo sourceInfo;
|
||||
private final Map<String, String> partition;
|
||||
private boolean snapshotCompleted;
|
||||
private final TransactionContext transactionContext;
|
||||
|
||||
/**
|
||||
* The index of the current event within the current transaction.
|
||||
*/
|
||||
private long eventSerialNo;
|
||||
|
||||
public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot, boolean snapshotCompleted, long eventSerialNo) {
|
||||
public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot, boolean snapshotCompleted, long eventSerialNo,
|
||||
TransactionContext transactionContext) {
|
||||
partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
|
||||
sourceInfo = new SourceInfo(connectorConfig);
|
||||
|
||||
@ -49,10 +52,11 @@ public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPos
|
||||
sourceInfo.setSnapshot(snapshot ? SnapshotRecord.TRUE : SnapshotRecord.FALSE);
|
||||
}
|
||||
this.eventSerialNo = eventSerialNo;
|
||||
this.transactionContext = transactionContext;
|
||||
}
|
||||
|
||||
public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot, boolean snapshotCompleted) {
|
||||
this(connectorConfig, position, snapshot, snapshotCompleted, 1);
|
||||
this(connectorConfig, position, snapshot, snapshotCompleted, 1, new TransactionContext());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -69,11 +73,11 @@ public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPos
|
||||
SourceInfo.COMMIT_LSN_KEY, sourceInfo.getCommitLsn().toString());
|
||||
}
|
||||
else {
|
||||
return Collect.hashMapOf(
|
||||
return transactionContext.store(Collect.hashMapOf(
|
||||
SourceInfo.COMMIT_LSN_KEY, sourceInfo.getCommitLsn().toString(),
|
||||
SourceInfo.CHANGE_LSN_KEY,
|
||||
sourceInfo.getChangeLsn() == null ? null : sourceInfo.getChangeLsn().toString(),
|
||||
SourceInfo.EVENT_SERIAL_NO_KEY, eventSerialNo);
|
||||
SourceInfo.EVENT_SERIAL_NO_KEY, eventSerialNo));
|
||||
}
|
||||
}
|
||||
|
||||
@ -158,7 +162,8 @@ public OffsetContext load(Map<String, ?> offset) {
|
||||
eventSerialNo = Long.valueOf(0);
|
||||
}
|
||||
|
||||
return new SqlServerOffsetContext(connectorConfig, TxLogPosition.valueOf(commitLsn, changeLsn), snapshot, snapshotCompleted, eventSerialNo);
|
||||
return new SqlServerOffsetContext(connectorConfig, TxLogPosition.valueOf(commitLsn, changeLsn), snapshot, snapshotCompleted, eventSerialNo,
|
||||
TransactionContext.load(offset));
|
||||
}
|
||||
}
|
||||
|
||||
@ -183,4 +188,9 @@ public void event(DataCollectionId tableId, Instant timestamp) {
|
||||
sourceInfo.setSourceTime(timestamp);
|
||||
sourceInfo.setTableId((TableId) tableId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TransactionContext getTransactionContext() {
|
||||
return transactionContext;
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.AbstractSourceInfoStructMaker;
|
||||
import io.debezium.connector.SnapshotRecord;
|
||||
import io.debezium.pipeline.txmetadata.TransactionMonitor;
|
||||
import io.debezium.relational.TableId;
|
||||
|
||||
public class SourceInfoTest {
|
||||
@ -95,6 +96,7 @@ public void schemaIsCorrect() {
|
||||
.field("ts_ms", Schema.INT64_SCHEMA)
|
||||
.field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA)
|
||||
.field("db", Schema.STRING_SCHEMA)
|
||||
.field("transaction", TransactionMonitor.TRANSACTION_BLOCK_SCHEMA)
|
||||
.field("schema", Schema.STRING_SCHEMA)
|
||||
.field("table", Schema.STRING_SCHEMA)
|
||||
.field("change_lsn", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
|
@ -379,6 +379,70 @@ public void update() throws Exception {
|
||||
stopConnector();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void transactionMetadata() throws Exception {
|
||||
final int RECORDS_PER_TABLE = 5;
|
||||
final int ID_START = 10;
|
||||
final Configuration config = TestHelper.defaultConfig()
|
||||
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
|
||||
.with(SqlServerConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
|
||||
.build();
|
||||
|
||||
start(SqlServerConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
Testing.Print.enable();
|
||||
// Wait for snapshot completion
|
||||
consumeRecordsByTopic(1);
|
||||
|
||||
connection.setAutoCommit(false);
|
||||
final String[] inserts = new String[RECORDS_PER_TABLE * 2];
|
||||
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
||||
final int id = ID_START + i;
|
||||
inserts[2 * i] = "INSERT INTO tablea VALUES(" + id + ", 'a')";
|
||||
inserts[2 * i + 1] = "INSERT INTO tableb VALUES(" + id + ", 'b')";
|
||||
}
|
||||
connection.execute(inserts);
|
||||
connection.setAutoCommit(true);
|
||||
|
||||
connection.execute("INSERT INTO tableb VALUES(1000, 'b')");
|
||||
|
||||
// BEGIN, data, END, BEGIN, data
|
||||
final SourceRecords records = consumeRecordsByTopic(1 + RECORDS_PER_TABLE * 2 + 1 + 1 + 1);
|
||||
final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablea");
|
||||
final List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
|
||||
final List<SourceRecord> tx = records.recordsForTopic("__debezium.transaction.server1");
|
||||
Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
|
||||
Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE + 1);
|
||||
Assertions.assertThat(tx).hasSize(3);
|
||||
|
||||
final List<SourceRecord> all = records.allRecordsInOrder();
|
||||
final Struct begin = (Struct) all.get(0).value();
|
||||
final Struct beginKey = (Struct) all.get(0).key();
|
||||
Assertions.assertThat(begin.getString("status")).isEqualTo("BEGIN");
|
||||
Assertions.assertThat(begin.getInt64("event_count")).isNull();
|
||||
final String txId = begin.getString("id");
|
||||
Assertions.assertThat(beginKey.getString("id")).isEqualTo(txId);
|
||||
|
||||
long counter = 1;
|
||||
for (int i = 1; i <= 2 * RECORDS_PER_TABLE; i++) {
|
||||
final Struct change = ((Struct) all.get(i).value()).getStruct("source").getStruct("transaction");
|
||||
Assertions.assertThat(change.getString("id")).isEqualTo(txId);
|
||||
Assertions.assertThat(change.getInt64("total_order")).isEqualTo(counter);
|
||||
Assertions.assertThat(change.getInt64("data_collection_order")).isEqualTo((counter + 1) / 2);
|
||||
counter++;
|
||||
}
|
||||
|
||||
final Struct end = (Struct) all.get(2 * RECORDS_PER_TABLE + 1).value();
|
||||
final Struct endKey = (Struct) all.get(2 * RECORDS_PER_TABLE + 1).key();
|
||||
Assertions.assertThat(end.getString("status")).isEqualTo("END");
|
||||
Assertions.assertThat(end.getString("id")).isEqualTo(txId);
|
||||
Assertions.assertThat(end.getInt64("event_count")).isEqualTo(2 * RECORDS_PER_TABLE);
|
||||
Assertions.assertThat(endKey.getString("id")).isEqualTo(txId);
|
||||
|
||||
stopConnector();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void updatePrimaryKey() throws Exception {
|
||||
|
||||
|
@ -0,0 +1,324 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.sqlserver;
|
||||
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
import org.fest.assertions.Assertions;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode;
|
||||
import io.debezium.connector.sqlserver.util.TestHelper;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.data.SchemaAndValueField;
|
||||
import io.debezium.embedded.AbstractConnectorTest;
|
||||
import io.debezium.util.Testing;
|
||||
|
||||
/**
|
||||
* Integration test for the Debezium SQL Server connector.
|
||||
*
|
||||
* @author Jiri Pechanec
|
||||
*/
|
||||
public class TransactionMetadataIT extends AbstractConnectorTest {
|
||||
|
||||
private SqlServerConnection connection;
|
||||
|
||||
@Before
|
||||
public void before() throws SQLException {
|
||||
TestHelper.createTestDatabase();
|
||||
connection = TestHelper.testConnection();
|
||||
connection.execute(
|
||||
"CREATE TABLE tablea (id int primary key, cola varchar(30))",
|
||||
"CREATE TABLE tableb (id int primary key, colb varchar(30))",
|
||||
"INSERT INTO tablea VALUES(1, 'a')");
|
||||
TestHelper.enableTableCdc(connection, "tablea");
|
||||
TestHelper.enableTableCdc(connection, "tableb");
|
||||
|
||||
initializeConnectorTestFramework();
|
||||
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
|
||||
// Testing.Print.enable();
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() throws SQLException {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void transactionMetadata() throws Exception {
|
||||
final int RECORDS_PER_TABLE = 5;
|
||||
final int ID_START = 10;
|
||||
final Configuration config = TestHelper.defaultConfig()
|
||||
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
|
||||
.with(SqlServerConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
|
||||
.build();
|
||||
|
||||
start(SqlServerConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
// Testing.Print.enable();
|
||||
// Wait for snapshot completion
|
||||
consumeRecordsByTopic(1);
|
||||
|
||||
connection.setAutoCommit(false);
|
||||
final String[] inserts = new String[RECORDS_PER_TABLE * 2];
|
||||
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
||||
final int id = ID_START + i;
|
||||
inserts[2 * i] = "INSERT INTO tablea VALUES(" + id + ", 'a')";
|
||||
inserts[2 * i + 1] = "INSERT INTO tableb VALUES(" + id + ", 'b')";
|
||||
}
|
||||
connection.execute(inserts);
|
||||
connection.setAutoCommit(true);
|
||||
|
||||
connection.execute("INSERT INTO tableb VALUES(1000, 'b')");
|
||||
|
||||
// BEGIN, data, END, BEGIN, data
|
||||
final SourceRecords records = consumeRecordsByTopic(1 + RECORDS_PER_TABLE * 2 + 1 + 1 + 1);
|
||||
final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablea");
|
||||
final List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
|
||||
final List<SourceRecord> tx = records.recordsForTopic("__debezium.transaction.server1");
|
||||
Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
|
||||
Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE + 1);
|
||||
Assertions.assertThat(tx).hasSize(3);
|
||||
|
||||
final List<SourceRecord> all = records.allRecordsInOrder();
|
||||
final String txId = assertBeginTransaction(all.get(0));
|
||||
|
||||
long counter = 1;
|
||||
for (int i = 1; i <= 2 * RECORDS_PER_TABLE; i++) {
|
||||
assertRecordTransactionMetadata(all.get(i), txId, counter, (counter + 1) / 2);
|
||||
counter++;
|
||||
}
|
||||
|
||||
assertEndTransaction(all.get(2 * RECORDS_PER_TABLE + 1), txId, 2 * RECORDS_PER_TABLE);
|
||||
stopConnector();
|
||||
}
|
||||
|
||||
private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean afterStreaming) throws Exception {
|
||||
final int RECORDS_PER_TABLE = 30;
|
||||
final int TABLES = 2;
|
||||
final int ID_START = 10;
|
||||
final int ID_RESTART = 1000;
|
||||
final int HALF_ID = ID_START + RECORDS_PER_TABLE / 2;
|
||||
final Configuration config = TestHelper.defaultConfig()
|
||||
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
|
||||
.with(SqlServerConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
|
||||
.build();
|
||||
|
||||
// Testing.Print.enable();
|
||||
|
||||
if (restartJustAfterSnapshot) {
|
||||
start(SqlServerConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
// Wait for snapshot to be completed
|
||||
consumeRecordsByTopic(1);
|
||||
stopConnector();
|
||||
connection.execute("INSERT INTO tablea VALUES(-1, '-a')");
|
||||
}
|
||||
|
||||
start(SqlServerConnector.class, config, record -> {
|
||||
if (!"server1.dbo.tablea.Envelope".equals(record.valueSchema().name())) {
|
||||
return false;
|
||||
}
|
||||
final Struct envelope = (Struct) record.value();
|
||||
final Struct after = envelope.getStruct("after");
|
||||
final Integer id = after.getInt32("id");
|
||||
final String value = after.getString("cola");
|
||||
return id != null && id == HALF_ID && "a".equals(value);
|
||||
});
|
||||
assertConnectorIsRunning();
|
||||
|
||||
String firstTxId = null;
|
||||
if (restartJustAfterSnapshot) {
|
||||
// Transaction begin
|
||||
SourceRecord begin = consumeRecordsByTopic(1).allRecordsInOrder().get(0);
|
||||
firstTxId = assertBeginTransaction(begin);
|
||||
}
|
||||
|
||||
// Wait for snapshot to be completed or a first streaming message delivered
|
||||
consumeRecordsByTopic(1);
|
||||
|
||||
if (afterStreaming) {
|
||||
connection.execute("INSERT INTO tablea VALUES(-2, '-a')");
|
||||
final SourceRecords records = consumeRecordsByTopic(2);
|
||||
final List<SchemaAndValueField> expectedRow = Arrays.asList(
|
||||
new SchemaAndValueField("id", Schema.INT32_SCHEMA, -2),
|
||||
new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "-a"));
|
||||
assertRecord(((Struct) records.allRecordsInOrder().get(1).value()).getStruct(Envelope.FieldName.AFTER), expectedRow);
|
||||
SourceRecord begin = records.allRecordsInOrder().get(0);
|
||||
firstTxId = assertBeginTransaction(begin);
|
||||
}
|
||||
|
||||
connection.setAutoCommit(false);
|
||||
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
||||
final int id = ID_START + i;
|
||||
connection.executeWithoutCommitting(
|
||||
"INSERT INTO tablea VALUES(" + id + ", 'a')");
|
||||
connection.executeWithoutCommitting(
|
||||
"INSERT INTO tableb VALUES(" + id + ", 'b')");
|
||||
}
|
||||
connection.connection().commit();
|
||||
|
||||
// End of previous TX, BEGIN of new TX, change records
|
||||
final int txBeginIndex = firstTxId != null ? 1 : 0;
|
||||
int expectedRecords = txBeginIndex + 1 + RECORDS_PER_TABLE;
|
||||
List<SourceRecord> records = consumeRecordsByTopic(expectedRecords).allRecordsInOrder();
|
||||
|
||||
assertThat(records).hasSize(expectedRecords);
|
||||
|
||||
if (firstTxId != null) {
|
||||
assertEndTransaction(records.get(0), firstTxId, 1);
|
||||
}
|
||||
final String batchTxId = assertBeginTransaction(records.get(txBeginIndex));
|
||||
|
||||
SourceRecord lastRecordForOffset = records.get(RECORDS_PER_TABLE + txBeginIndex);
|
||||
Struct value = (Struct) lastRecordForOffset.value();
|
||||
final List<SchemaAndValueField> expectedLastRow = Arrays.asList(
|
||||
new SchemaAndValueField("id", Schema.INT32_SCHEMA, HALF_ID - 1),
|
||||
new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
|
||||
assertRecord((Struct) value.get("after"), expectedLastRow);
|
||||
assertRecordTransactionMetadata(lastRecordForOffset, batchTxId, RECORDS_PER_TABLE, RECORDS_PER_TABLE / 2);
|
||||
|
||||
stopConnector();
|
||||
start(SqlServerConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
SourceRecords sourceRecords = consumeRecordsByTopic(RECORDS_PER_TABLE);
|
||||
records = sourceRecords.allRecordsInOrder();
|
||||
assertThat(records).hasSize(RECORDS_PER_TABLE);
|
||||
|
||||
List<SourceRecord> tableA = sourceRecords.recordsForTopic("server1.dbo.tablea");
|
||||
List<SourceRecord> tableB = sourceRecords.recordsForTopic("server1.dbo.tableb");
|
||||
for (int i = 0; i < RECORDS_PER_TABLE / 2; i++) {
|
||||
final int id = HALF_ID + i;
|
||||
final SourceRecord recordA = tableA.get(i);
|
||||
final SourceRecord recordB = tableB.get(i);
|
||||
final List<SchemaAndValueField> expectedRowA = Arrays.asList(
|
||||
new SchemaAndValueField("id", Schema.INT32_SCHEMA, id),
|
||||
new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
|
||||
final List<SchemaAndValueField> expectedRowB = Arrays.asList(
|
||||
new SchemaAndValueField("id", Schema.INT32_SCHEMA, id),
|
||||
new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
|
||||
|
||||
final Struct valueA = (Struct) recordA.value();
|
||||
assertRecord((Struct) valueA.get("after"), expectedRowA);
|
||||
assertNull(valueA.get("before"));
|
||||
|
||||
final Struct valueB = (Struct) recordB.value();
|
||||
assertRecord((Struct) valueB.get("after"), expectedRowB);
|
||||
assertNull(valueB.get("before"));
|
||||
|
||||
assertRecordTransactionMetadata(recordA, batchTxId, RECORDS_PER_TABLE + 2 * i + 1, RECORDS_PER_TABLE / 2 + i + 1);
|
||||
assertRecordTransactionMetadata(recordB, batchTxId, RECORDS_PER_TABLE + 2 * i + 2, RECORDS_PER_TABLE / 2 + i + 1);
|
||||
}
|
||||
|
||||
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
||||
final int id = ID_RESTART + i;
|
||||
connection.executeWithoutCommitting(
|
||||
"INSERT INTO tablea VALUES(" + id + ", 'a')");
|
||||
connection.executeWithoutCommitting(
|
||||
"INSERT INTO tableb VALUES(" + id + ", 'b')");
|
||||
connection.connection().commit();
|
||||
}
|
||||
|
||||
// END of previous TX, data records, BEGIN of TX for every pair of record, END of TX for every pair of record but last
|
||||
sourceRecords = consumeRecordsByTopic(1 + RECORDS_PER_TABLE * TABLES + (2 * RECORDS_PER_TABLE - 1));
|
||||
tableA = sourceRecords.recordsForTopic("server1.dbo.tablea");
|
||||
tableB = sourceRecords.recordsForTopic("server1.dbo.tableb");
|
||||
List<SourceRecord> txMetadata = sourceRecords.recordsForTopic("__debezium.transaction.server1");
|
||||
|
||||
Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
|
||||
Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);
|
||||
Assertions.assertThat(txMetadata).hasSize(1 + 2 * RECORDS_PER_TABLE - 1);
|
||||
assertEndTransaction(txMetadata.get(0), batchTxId, 2 * RECORDS_PER_TABLE);
|
||||
|
||||
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
||||
final int id = i + ID_RESTART;
|
||||
final SourceRecord recordA = tableA.get(i);
|
||||
final SourceRecord recordB = tableB.get(i);
|
||||
final List<SchemaAndValueField> expectedRowA = Arrays.asList(
|
||||
new SchemaAndValueField("id", Schema.INT32_SCHEMA, id),
|
||||
new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
|
||||
final List<SchemaAndValueField> expectedRowB = Arrays.asList(
|
||||
new SchemaAndValueField("id", Schema.INT32_SCHEMA, id),
|
||||
new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
|
||||
|
||||
final Struct valueA = (Struct) recordA.value();
|
||||
assertRecord((Struct) valueA.get("after"), expectedRowA);
|
||||
assertNull(valueA.get("before"));
|
||||
|
||||
final Struct valueB = (Struct) recordB.value();
|
||||
assertRecord((Struct) valueB.get("after"), expectedRowB);
|
||||
assertNull(valueB.get("before"));
|
||||
|
||||
final String txId = assertBeginTransaction(txMetadata.get(2 * i + 1));
|
||||
assertRecordTransactionMetadata(recordA, txId, 1, 1);
|
||||
assertRecordTransactionMetadata(recordB, txId, 2, 1);
|
||||
if (i < RECORDS_PER_TABLE - 1) {
|
||||
assertEndTransaction(txMetadata.get(2 * i + 2), txId, 2);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void restartInTheMiddleOfTxAfterSnapshot() throws Exception {
|
||||
restartInTheMiddleOfTx(true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void restartInTheMiddleOfTxAfterCompletedTx() throws Exception {
|
||||
restartInTheMiddleOfTx(false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void restartInTheMiddleOfTx() throws Exception {
|
||||
restartInTheMiddleOfTx(false, false);
|
||||
}
|
||||
|
||||
private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
|
||||
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
|
||||
}
|
||||
|
||||
private String assertBeginTransaction(SourceRecord record) {
|
||||
final Struct begin = (Struct) record.value();
|
||||
final Struct beginKey = (Struct) record.key();
|
||||
Assertions.assertThat(begin.getString("status")).isEqualTo("BEGIN");
|
||||
Assertions.assertThat(begin.getInt64("event_count")).isNull();
|
||||
final String txId = begin.getString("id");
|
||||
Assertions.assertThat(beginKey.getString("id")).isEqualTo(txId);
|
||||
return txId;
|
||||
}
|
||||
|
||||
private void assertEndTransaction(SourceRecord record, String expectedTxId, long expectedEventCount) {
|
||||
final Struct end = (Struct) record.value();
|
||||
final Struct endKey = (Struct) record.key();
|
||||
Assertions.assertThat(end.getString("status")).isEqualTo("END");
|
||||
Assertions.assertThat(end.getString("id")).isEqualTo(expectedTxId);
|
||||
Assertions.assertThat(end.getInt64("event_count")).isEqualTo(expectedEventCount);
|
||||
Assertions.assertThat(endKey.getString("id")).isEqualTo(expectedTxId);
|
||||
}
|
||||
|
||||
private void assertRecordTransactionMetadata(SourceRecord record, String expectedTxId, long expectedTotalOrder, long expectedCollectionOrder) {
|
||||
final Struct change = ((Struct) record.value()).getStruct("source").getStruct("transaction");
|
||||
Assertions.assertThat(change.getString("id")).isEqualTo(expectedTxId);
|
||||
Assertions.assertThat(change.getInt64("total_order")).isEqualTo(expectedTotalOrder);
|
||||
Assertions.assertThat(change.getInt64("data_collection_order")).isEqualTo(expectedCollectionOrder);
|
||||
}
|
||||
}
|
@ -158,6 +158,14 @@ public static Version parse(String value, String defaultValue) {
|
||||
.withDescription("Whether field names will be sanitized to Avro naming conventions")
|
||||
.withDefault(Boolean.FALSE);
|
||||
|
||||
public static final Field PROVIDE_TRANSACTION_METADATA = Field.create("provide.transaction.metadata")
|
||||
.withDisplayName("Store transaction metadata information in a dedicated topic.")
|
||||
.withType(Type.BOOLEAN)
|
||||
.withWidth(Width.SHORT)
|
||||
.withImportance(Importance.LOW)
|
||||
.withDescription("Enables transaction metadata extraction together with event counting")
|
||||
.withDefault(Boolean.FALSE);
|
||||
|
||||
private final Configuration config;
|
||||
private final boolean emitTombstoneOnDelete;
|
||||
private final int maxQueueSize;
|
||||
@ -169,6 +177,7 @@ public static Version parse(String value, String defaultValue) {
|
||||
private final int snapshotFetchSize;
|
||||
private final SourceInfoStructMaker<? extends AbstractSourceInfo> sourceInfoStructMaker;
|
||||
private final boolean sanitizeFieldNames;
|
||||
private final boolean shouldProvideTransactionMetadata;
|
||||
|
||||
protected CommonConnectorConfig(Configuration config, String logicalName, int defaultSnapshotFetchSize) {
|
||||
this.config = config;
|
||||
@ -182,6 +191,7 @@ protected CommonConnectorConfig(Configuration config, String logicalName, int de
|
||||
this.snapshotFetchSize = config.getInteger(SNAPSHOT_FETCH_SIZE, defaultSnapshotFetchSize);
|
||||
this.sourceInfoStructMaker = getSourceInfoStructMaker(Version.parse(config.getString(SOURCE_STRUCT_MAKER_VERSION)));
|
||||
this.sanitizeFieldNames = config.getBoolean(SANITIZE_FIELD_NAMES) || isUsingAvroConverter(config);
|
||||
this.shouldProvideTransactionMetadata = config.getBoolean(PROVIDE_TRANSACTION_METADATA);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -226,6 +236,10 @@ public int getSnapshotFetchSize() {
|
||||
return snapshotFetchSize;
|
||||
}
|
||||
|
||||
public boolean shouldProvideTransactionMetadata() {
|
||||
return shouldProvideTransactionMetadata;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T extends AbstractSourceInfo> SourceInfoStructMaker<T> getSourceInfoStructMaker() {
|
||||
return (SourceInfoStructMaker<T>) sourceInfoStructMaker;
|
||||
|
@ -13,6 +13,7 @@
|
||||
|
||||
import io.debezium.config.CommonConnectorConfig;
|
||||
import io.debezium.data.Enum;
|
||||
import io.debezium.pipeline.txmetadata.TransactionMonitor;
|
||||
|
||||
/**
|
||||
* Common information provided by all connectors in either source field or offsets.
|
||||
@ -42,7 +43,8 @@ protected SchemaBuilder commonSchemaBuilder() {
|
||||
.field(AbstractSourceInfo.SERVER_NAME_KEY, Schema.STRING_SCHEMA)
|
||||
.field(AbstractSourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA)
|
||||
.field(AbstractSourceInfo.SNAPSHOT_KEY, SNAPSHOT_RECORD_SCHEMA)
|
||||
.field(AbstractSourceInfo.DATABASE_NAME_KEY, Schema.STRING_SCHEMA);
|
||||
.field(AbstractSourceInfo.DATABASE_NAME_KEY, Schema.STRING_SCHEMA)
|
||||
.field(TransactionMonitor.DEBEZIUM_TRANSACTION_KEY, TransactionMonitor.TRANSACTION_BLOCK_SCHEMA);
|
||||
}
|
||||
|
||||
protected Struct commonStruct(T sourceInfo) {
|
||||
|
@ -15,6 +15,7 @@
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.annotation.ThreadSafe;
|
||||
import io.debezium.config.CommonConnectorConfig;
|
||||
import io.debezium.connector.base.ChangeEventQueueMetrics;
|
||||
import io.debezium.connector.common.CdcSourceTaskContext;
|
||||
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
|
||||
@ -28,6 +29,7 @@
|
||||
import io.debezium.pipeline.spi.OffsetContext;
|
||||
import io.debezium.pipeline.spi.SnapshotResult;
|
||||
import io.debezium.pipeline.spi.SnapshotResult.SnapshotResultStatus;
|
||||
import io.debezium.pipeline.txmetadata.TransactionMonitor;
|
||||
import io.debezium.relational.RelationalDatabaseSchema;
|
||||
import io.debezium.util.Threads;
|
||||
|
||||
@ -49,6 +51,7 @@ public class ChangeEventSourceCoordinator {
|
||||
private final ExecutorService executor;
|
||||
private final EventDispatcher<?> eventDispatcher;
|
||||
private final RelationalDatabaseSchema schema;
|
||||
private final CommonConnectorConfig connectorConfig;
|
||||
|
||||
private volatile boolean running;
|
||||
private volatile StreamingChangeEventSource streamingSource;
|
||||
@ -56,12 +59,14 @@ public class ChangeEventSourceCoordinator {
|
||||
private SnapshotChangeEventSourceMetrics snapshotMetrics;
|
||||
private StreamingChangeEventSourceMetrics streamingMetrics;
|
||||
|
||||
public ChangeEventSourceCoordinator(OffsetContext previousOffset, ErrorHandler errorHandler, Class<? extends SourceConnector> connectorType, String logicalName,
|
||||
public ChangeEventSourceCoordinator(OffsetContext previousOffset, ErrorHandler errorHandler, Class<? extends SourceConnector> connectorType,
|
||||
CommonConnectorConfig connectorConfig,
|
||||
ChangeEventSourceFactory changeEventSourceFactory, EventDispatcher<?> eventDispatcher, RelationalDatabaseSchema schema) {
|
||||
this.previousOffset = previousOffset;
|
||||
this.errorHandler = errorHandler;
|
||||
this.connectorConfig = connectorConfig;
|
||||
this.changeEventSourceFactory = changeEventSourceFactory;
|
||||
this.executor = Threads.newSingleThreadExecutor(connectorType, logicalName, "change-event-source-coordinator");
|
||||
this.executor = Threads.newSingleThreadExecutor(connectorType, connectorConfig.getLogicalName(), "change-event-source-coordinator");
|
||||
this.eventDispatcher = eventDispatcher;
|
||||
this.schema = schema;
|
||||
}
|
||||
@ -93,7 +98,8 @@ public synchronized <T extends CdcSourceTaskContext> void start(T taskContext, C
|
||||
|
||||
if (running && snapshotResult.isCompletedOrSkipped()) {
|
||||
streamingSource = changeEventSourceFactory.getStreamingChangeEventSource(snapshotResult.getOffset());
|
||||
eventDispatcher.setEventListener(streamingMetrics);
|
||||
eventDispatcher.setEventListener(new CompositeDataChangeEventListener(streamingMetrics,
|
||||
new TransactionMonitor(connectorConfig, metadataProvider, eventDispatcher::dispatchTransactionMessage)));
|
||||
streamingMetrics.connected(true);
|
||||
LOGGER.info("Starting streaming");
|
||||
streamingSource.execute(context);
|
||||
|
@ -0,0 +1,57 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.pipeline;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.pipeline.source.spi.DataChangeEventListener;
|
||||
import io.debezium.pipeline.spi.OffsetContext;
|
||||
import io.debezium.schema.DataCollectionId;
|
||||
|
||||
/**
|
||||
* Composite design pattern to aggregate multiple instances of {@link DataChangeEventListener}.
|
||||
*
|
||||
* @author Jiri Pechanec
|
||||
*
|
||||
*/
|
||||
public class CompositeDataChangeEventListener implements DataChangeEventListener {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(CompositeDataChangeEventListener.class);
|
||||
|
||||
final List<DataChangeEventListener> elements = new ArrayList<>();
|
||||
|
||||
public CompositeDataChangeEventListener(DataChangeEventListener... components) {
|
||||
Collections.addAll(this.elements, components);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value) throws InterruptedException {
|
||||
for (DataChangeEventListener c : elements) {
|
||||
c.onEvent(source, offset, key, value);
|
||||
}
|
||||
LOGGER.debug("Created instance with {} elements", elements.size());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFilteredEvent(String event) {
|
||||
for (DataChangeEventListener c : elements) {
|
||||
c.onFilteredEvent(event);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onErroneousEvent(String event) {
|
||||
for (DataChangeEventListener c : elements) {
|
||||
c.onErroneousEvent(event);
|
||||
}
|
||||
}
|
||||
}
|
@ -203,6 +203,10 @@ private void enqueueHeartbeat(SourceRecord record) throws InterruptedException {
|
||||
queue.enqueue(new DataChangeEvent(record));
|
||||
}
|
||||
|
||||
public void dispatchTransactionMessage(SourceRecord record) throws InterruptedException {
|
||||
queue.enqueue(new DataChangeEvent(record));
|
||||
}
|
||||
|
||||
/**
|
||||
* Change record receiver used during snapshotting. Allows for a deferred submission of records, which is needed in
|
||||
* order to set the "snapshot completed" offset field, which we can't send to Kafka Connect without sending an
|
||||
|
@ -22,7 +22,7 @@ public interface DataChangeEventListener {
|
||||
/**
|
||||
* Invoked if an event is processed for a captured table.
|
||||
*/
|
||||
void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value);
|
||||
void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value) throws InterruptedException;
|
||||
|
||||
/**
|
||||
* Invoked for events pertaining to non-whitelisted tables.
|
||||
|
@ -11,6 +11,7 @@
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
|
||||
import io.debezium.pipeline.txmetadata.TransactionContext;
|
||||
import io.debezium.schema.DataCollectionId;
|
||||
|
||||
/**
|
||||
@ -69,4 +70,6 @@ interface Loader {
|
||||
* Records the name of the collection and the timestamp of the last event
|
||||
*/
|
||||
void event(DataCollectionId collectionId, Instant timestamp);
|
||||
|
||||
TransactionContext getTransactionContext();
|
||||
}
|
||||
|
@ -0,0 +1,113 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.pipeline.txmetadata;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import io.debezium.annotation.NotThreadSafe;
|
||||
import io.debezium.schema.DataCollectionId;
|
||||
|
||||
/**
|
||||
* The context holds internal state necessary for book-keeping of events in active transaction.
|
||||
* The main data tracked are
|
||||
* <ul>
|
||||
* <li>active transaction id</li>
|
||||
* <li>the total event number seen from the transaction</li>
|
||||
* <li>the number of events per table/collection seen in the transaction</li>
|
||||
* </ul>
|
||||
*
|
||||
* The state of this context is stored in offsets and is recovered upon restart.
|
||||
*
|
||||
*
|
||||
* @author Jiri Pechanec
|
||||
*/
|
||||
@NotThreadSafe
|
||||
public class TransactionContext {
|
||||
|
||||
private static final String OFFSET_TRANSACTION_ID = TransactionMonitor.DEBEZIUM_TRANSACTION_KEY + "_" + TransactionMonitor.DEBEZIUM_TRANSACTION_ID_KEY;
|
||||
private static final String OFFSET_TABLE_COUNT_PREFIX = TransactionMonitor.DEBEZIUM_TRANSACTION_KEY + "_"
|
||||
+ TransactionMonitor.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY + "_";
|
||||
private static final int OFFSET_TABLE_COUNT_PREFIX_LENGTH = OFFSET_TABLE_COUNT_PREFIX.length();
|
||||
|
||||
private String transactionId = null;
|
||||
private Map<String, Long> perTableEventCount = new HashMap<>();
|
||||
private long totalEventCount = 0;
|
||||
|
||||
public TransactionContext() {
|
||||
}
|
||||
|
||||
private void reset() {
|
||||
transactionId = null;
|
||||
totalEventCount = 0;
|
||||
perTableEventCount.clear();
|
||||
}
|
||||
|
||||
public Map<String, Object> store(Map<String, Object> offset) {
|
||||
offset.put(OFFSET_TRANSACTION_ID, transactionId);
|
||||
final String tableCountPrefix = OFFSET_TABLE_COUNT_PREFIX;
|
||||
for (final Entry<String, Long> e : perTableEventCount.entrySet()) {
|
||||
offset.put(tableCountPrefix + e.getKey(), e.getValue());
|
||||
}
|
||||
return offset;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static TransactionContext load(Map<String, ?> offsets) {
|
||||
final Map<String, Object> o = (Map<String, Object>) offsets;
|
||||
final TransactionContext context = new TransactionContext();
|
||||
|
||||
context.transactionId = (String) o.get(OFFSET_TRANSACTION_ID);
|
||||
|
||||
for (final Entry<String, Object> offset : o.entrySet()) {
|
||||
if (offset.getKey().startsWith(OFFSET_TABLE_COUNT_PREFIX)) {
|
||||
final String dataCollectionId = offset.getKey().substring(OFFSET_TABLE_COUNT_PREFIX_LENGTH);
|
||||
final Long count = (Long) offset.getValue();
|
||||
context.perTableEventCount.put(dataCollectionId, count);
|
||||
}
|
||||
}
|
||||
|
||||
context.totalEventCount = context.perTableEventCount.values().stream().mapToLong(x -> x).sum();
|
||||
|
||||
return context;
|
||||
}
|
||||
|
||||
public boolean isTransactionInProgress() {
|
||||
return transactionId != null;
|
||||
}
|
||||
|
||||
public String getTransactionId() {
|
||||
return transactionId;
|
||||
}
|
||||
|
||||
public long getTotalEventCount() {
|
||||
return totalEventCount;
|
||||
}
|
||||
|
||||
public void beginTransaction(String txId) {
|
||||
reset();
|
||||
transactionId = txId;
|
||||
}
|
||||
|
||||
public void endTransaction() {
|
||||
reset();
|
||||
}
|
||||
|
||||
public long event(DataCollectionId source) {
|
||||
totalEventCount++;
|
||||
final String sourceName = source.toString();
|
||||
final long dataCollectionEventOrder = perTableEventCount.getOrDefault(sourceName, 0L).longValue() + 1;
|
||||
perTableEventCount.put(sourceName, Long.valueOf(dataCollectionEventOrder));
|
||||
return dataCollectionEventOrder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TransactionContext [currentTransactionId=" + transactionId + ", perTableEventCount="
|
||||
+ perTableEventCount + ", totalEventCount=" + totalEventCount + "]";
|
||||
}
|
||||
}
|
@ -0,0 +1,164 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.pipeline.txmetadata;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.annotation.NotThreadSafe;
|
||||
import io.debezium.config.CommonConnectorConfig;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.function.BlockingConsumer;
|
||||
import io.debezium.pipeline.source.spi.DataChangeEventListener;
|
||||
import io.debezium.pipeline.source.spi.EventMetadataProvider;
|
||||
import io.debezium.pipeline.spi.OffsetContext;
|
||||
import io.debezium.schema.DataCollectionId;
|
||||
import io.debezium.util.SchemaNameAdjuster;
|
||||
|
||||
/**
|
||||
* The class has externalized its state in {@link TransactionContext} context class so it can be stored in and recovered from offsets.
|
||||
* The class receives all processed events and keeps the transaction tracking depending on transaction id.
|
||||
* Upon transaction change the metadata events are delivered to a dedicated topic informing about {@code START/END} of the transaction,
|
||||
* including transaction id and in case of {@code END} event the amount of events generated by the transaction.
|
||||
* <p>
|
||||
* Every event seen has its {@code source} block enriched to contain
|
||||
*
|
||||
* <ul>
|
||||
* <li>transaction id</li>
|
||||
* <li>the total event order in the transaction</li>
|
||||
* <li>the order of event per table/collection source in the transaction</li>
|
||||
* </ul>
|
||||
*
|
||||
* @author Jiri Pechanec
|
||||
*/
|
||||
@NotThreadSafe
|
||||
public class TransactionMonitor implements DataChangeEventListener {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TransactionMonitor.class);
|
||||
private static final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
|
||||
|
||||
private static final String TOPIC_PREFIX = "__debezium.transaction.";
|
||||
|
||||
public static final String DEBEZIUM_TRANSACTION_KEY = "transaction";
|
||||
public static final String DEBEZIUM_TRANSACTION_ID_KEY = "id";
|
||||
public static final String DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY = "total_order";
|
||||
public static final String DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY = "data_collection_order";
|
||||
public static final String DEBEZIUM_TRANSACTION_STATUS_KEY = "status";
|
||||
public static final String DEBEZIUM_TRANSACTION_EVENT_COUNT_KEY = "event_count";
|
||||
|
||||
public static final Schema TRANSACTION_BLOCK_SCHEMA = SchemaBuilder.struct().optional()
|
||||
.field(DEBEZIUM_TRANSACTION_ID_KEY, Schema.STRING_SCHEMA)
|
||||
.field(DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, Schema.INT64_SCHEMA)
|
||||
.field(DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY, Schema.INT64_SCHEMA)
|
||||
.build();
|
||||
|
||||
private static Schema TRANSACTION_KEY_SCHEMA = SchemaBuilder.struct()
|
||||
.name(schemaNameAdjuster.adjust("io.debezium.connector.common.TransactionMetadataKey"))
|
||||
.field(DEBEZIUM_TRANSACTION_ID_KEY, Schema.STRING_SCHEMA)
|
||||
.build();
|
||||
private static Schema TRANSACTION_VALUE_SCHEMA = SchemaBuilder.struct()
|
||||
.name(schemaNameAdjuster.adjust("io.debezium.connector.common.TransactionMetadataValue"))
|
||||
.field(DEBEZIUM_TRANSACTION_STATUS_KEY, Schema.STRING_SCHEMA)
|
||||
.field(DEBEZIUM_TRANSACTION_ID_KEY, Schema.STRING_SCHEMA)
|
||||
.field(DEBEZIUM_TRANSACTION_EVENT_COUNT_KEY, Schema.OPTIONAL_INT64_SCHEMA)
|
||||
.build();
|
||||
|
||||
private final EventMetadataProvider eventMetadataProvider;
|
||||
private final String topicName;
|
||||
private final BlockingConsumer<SourceRecord> sender;
|
||||
private final CommonConnectorConfig connectorConfig;
|
||||
|
||||
public TransactionMonitor(CommonConnectorConfig connectorConfig, EventMetadataProvider eventMetadataProvider, BlockingConsumer<SourceRecord> sender) {
|
||||
Objects.requireNonNull(eventMetadataProvider);
|
||||
this.topicName = TOPIC_PREFIX + connectorConfig.getLogicalName();
|
||||
this.eventMetadataProvider = eventMetadataProvider;
|
||||
this.sender = sender;
|
||||
this.connectorConfig = connectorConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value) throws InterruptedException {
|
||||
if (!connectorConfig.shouldProvideTransactionMetadata()) {
|
||||
return;
|
||||
}
|
||||
final TransactionContext transactionContext = offset.getTransactionContext();
|
||||
|
||||
final String txId = eventMetadataProvider.getTransactionId(source, offset, key, value);
|
||||
if (txId == null) {
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace("Event '{}' has no transaction id", eventMetadataProvider.toSummaryString(source, offset, key, value));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (!transactionContext.isTransactionInProgress()) {
|
||||
transactionContext.beginTransaction(txId);
|
||||
beginTransaction(offset);
|
||||
}
|
||||
else if (!transactionContext.getTransactionId().equals(txId)) {
|
||||
endTransaction(offset);
|
||||
transactionContext.endTransaction();
|
||||
transactionContext.beginTransaction(txId);
|
||||
beginTransaction(offset);
|
||||
}
|
||||
transactionEvent(offset, source, value);
|
||||
}
|
||||
|
||||
private void transactionEvent(OffsetContext offsetContext, DataCollectionId source, Struct value) {
|
||||
final long dataCollectionEventOrder = offsetContext.getTransactionContext().event(source);
|
||||
if (value == null) {
|
||||
LOGGER.debug("Event with key {} without value. Cannot enrich source block.");
|
||||
return;
|
||||
}
|
||||
final Struct sourceBlock = value.getStruct(Envelope.FieldName.SOURCE);
|
||||
if (sourceBlock == null) {
|
||||
LOGGER.debug("Event with key {} with value {} lacks source block. Cannot be enriched.");
|
||||
return;
|
||||
}
|
||||
final Struct txStruct = new Struct(TRANSACTION_BLOCK_SCHEMA);
|
||||
txStruct.put(DEBEZIUM_TRANSACTION_ID_KEY, offsetContext.getTransactionContext().getTransactionId());
|
||||
txStruct.put(DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, offsetContext.getTransactionContext().getTotalEventCount());
|
||||
txStruct.put(DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY, dataCollectionEventOrder);
|
||||
sourceBlock.put(DEBEZIUM_TRANSACTION_KEY, txStruct);
|
||||
}
|
||||
|
||||
private void beginTransaction(OffsetContext offsetContext) throws InterruptedException {
|
||||
final Struct key = new Struct(TRANSACTION_KEY_SCHEMA);
|
||||
key.put(DEBEZIUM_TRANSACTION_ID_KEY, offsetContext.getTransactionContext().getTransactionId());
|
||||
final Struct value = new Struct(TRANSACTION_VALUE_SCHEMA);
|
||||
value.put(DEBEZIUM_TRANSACTION_STATUS_KEY, TransactionStatus.BEGIN.name());
|
||||
value.put(DEBEZIUM_TRANSACTION_ID_KEY, offsetContext.getTransactionContext().getTransactionId());
|
||||
|
||||
sender.accept(new SourceRecord(offsetContext.getPartition(), offsetContext.getOffset(),
|
||||
topicName, null, key.schema(), key, value.schema(), value));
|
||||
}
|
||||
|
||||
private void endTransaction(OffsetContext offsetContext) throws InterruptedException {
|
||||
final Struct key = new Struct(TRANSACTION_KEY_SCHEMA);
|
||||
key.put(DEBEZIUM_TRANSACTION_ID_KEY, offsetContext.getTransactionContext().getTransactionId());
|
||||
final Struct value = new Struct(TRANSACTION_VALUE_SCHEMA);
|
||||
value.put(DEBEZIUM_TRANSACTION_STATUS_KEY, TransactionStatus.END.name());
|
||||
value.put(DEBEZIUM_TRANSACTION_ID_KEY, offsetContext.getTransactionContext().getTransactionId());
|
||||
value.put(DEBEZIUM_TRANSACTION_EVENT_COUNT_KEY, offsetContext.getTransactionContext().getTotalEventCount());
|
||||
|
||||
sender.accept(new SourceRecord(offsetContext.getPartition(), offsetContext.getOffset(),
|
||||
topicName, null, key.schema(), key, value.schema(), value));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFilteredEvent(String event) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onErroneousEvent(String event) {
|
||||
}
|
||||
}
|
@ -0,0 +1,17 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.pipeline.txmetadata;
|
||||
|
||||
/**
|
||||
* Describes the transition of transaction from start to end.
|
||||
*
|
||||
* @author Jiri Pechanec
|
||||
*/
|
||||
public enum TransactionStatus {
|
||||
|
||||
BEGIN,
|
||||
END;
|
||||
}
|
Loading…
Reference in New Issue
Block a user