diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/SourceInfoTest.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/SourceInfoTest.java index dc3cd35c4..afd2e36d8 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/SourceInfoTest.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/SourceInfoTest.java @@ -20,6 +20,7 @@ import io.debezium.config.Configuration; import io.debezium.connector.AbstractSourceInfoStructMaker; +import io.debezium.pipeline.txmetadata.TransactionMonitor; /** * @author Randall Hauch @@ -259,6 +260,7 @@ public void schemaIsCorrect() { .field("ts_ms", Schema.INT64_SCHEMA) .field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA) .field("db", Schema.STRING_SCHEMA) + .field("transaction", TransactionMonitor.TRANSACTION_BLOCK_SCHEMA) .field("rs", Schema.STRING_SCHEMA) .field("collection", Schema.STRING_SCHEMA) .field("ord", Schema.INT32_SCHEMA) diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SourceInfoTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SourceInfoTest.java index 0f430117b..a5c2481dc 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SourceInfoTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SourceInfoTest.java @@ -28,6 +28,7 @@ import io.debezium.data.VerifyRecord; import io.debezium.doc.FixFor; import io.debezium.document.Document; +import io.debezium.pipeline.txmetadata.TransactionMonitor; public class SourceInfoTest { @@ -655,6 +656,7 @@ public void schemaIsCorrect() { .field("ts_ms", Schema.INT64_SCHEMA) .field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA) .field("db", Schema.STRING_SCHEMA) + .field("transaction", TransactionMonitor.TRANSACTION_BLOCK_SCHEMA) .field("table", Schema.OPTIONAL_STRING_SCHEMA) .field("server_id", Schema.INT64_SCHEMA) .field("gtid", Schema.OPTIONAL_STRING_SCHEMA) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java index 781e020a9..52af0f945 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java @@ -156,7 +156,7 @@ public void start(Configuration config) { previousOffset, errorHandler, PostgresConnector.class, - connectorConfig.getLogicalName(), + connectorConfig, new PostgresChangeEventSourceFactory( connectorConfig, snapshotter, diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java index eafb1ed7f..1749ae164 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java @@ -22,6 +22,7 @@ import io.debezium.connector.postgresql.connection.ReplicationConnection; import io.debezium.connector.postgresql.spi.OffsetState; import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.relational.TableId; import io.debezium.schema.DataCollectionId; import io.debezium.time.Conversions; @@ -38,9 +39,10 @@ public class PostgresOffsetContext implements OffsetContext { private final Map partition; private boolean lastSnapshotRecord; private Long lastCompletelyProcessedLsn; + private final TransactionContext transactionContext; private PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Long lsn, Long lastCompletelyProcessedLsn, Long txId, Instant time, boolean snapshot, - boolean lastSnapshotRecord) { + boolean lastSnapshotRecord, TransactionContext transactionContext) { partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName()); sourceInfo = new SourceInfo(connectorConfig); @@ -55,6 +57,7 @@ private PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Long lsn, else { sourceInfo.setSnapshot(snapshot ? SnapshotRecord.TRUE : SnapshotRecord.FALSE); } + this.transactionContext = transactionContext; } @Override @@ -175,7 +178,8 @@ public OffsetContext load(Map offset) { final Instant useconds = Conversions.toInstantFromMicros((Long) offset.get(SourceInfo.TIMESTAMP_USEC_KEY)); final boolean snapshot = (boolean) ((Map) offset).getOrDefault(SourceInfo.SNAPSHOT_KEY, Boolean.FALSE); final boolean lastSnapshotRecord = (boolean) ((Map) offset).getOrDefault(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, Boolean.FALSE); - return new PostgresOffsetContext(connectorConfig, lsn, lastCompletelyProcessedLsn, txId, useconds, snapshot, lastSnapshotRecord); + return new PostgresOffsetContext(connectorConfig, lsn, lastCompletelyProcessedLsn, txId, useconds, snapshot, lastSnapshotRecord, + TransactionContext.load(offset)); } } @@ -199,7 +203,8 @@ public static PostgresOffsetContext initialContext(PostgresConnectorConfig conne txId, clock.currentTimeAsInstant(), false, - false); + false, + new TransactionContext()); } catch (SQLException e) { throw new ConnectException("Database processing error", e); @@ -224,4 +229,9 @@ public void markLastSnapshotRecord() { public void event(DataCollectionId tableId, Instant instant) { sourceInfo.update(instant, (TableId) tableId); } + + @Override + public TransactionContext getTransactionContext() { + return transactionContext; + } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SourceInfoTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SourceInfoTest.java index b791bdf09..f73175707 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SourceInfoTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SourceInfoTest.java @@ -16,6 +16,7 @@ import io.debezium.connector.AbstractSourceInfoStructMaker; import io.debezium.data.VerifyRecord; import io.debezium.doc.FixFor; +import io.debezium.pipeline.txmetadata.TransactionMonitor; import io.debezium.relational.TableId; import io.debezium.time.Conversions; @@ -68,6 +69,7 @@ public void schemaIsCorrect() { .field("ts_ms", Schema.INT64_SCHEMA) .field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA) .field("db", Schema.STRING_SCHEMA) + .field("transaction", TransactionMonitor.TRANSACTION_BLOCK_SCHEMA) .field("schema", Schema.STRING_SCHEMA) .field("table", Schema.STRING_SCHEMA) .field("txId", Schema.OPTIONAL_INT64_SCHEMA) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java index 182b606ca..e1cf3820d 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java @@ -6,7 +6,6 @@ package io.debezium.connector.sqlserver; import java.sql.SQLException; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -128,7 +127,7 @@ public void start(Configuration config) { previousOffset, errorHandler, SqlServerConnector.class, - connectorConfig.getLogicalName(), + connectorConfig, new SqlServerChangeEventSourceFactory(connectorConfig, dataConnection, metadataConnection, errorHandler, dispatcher, clock, schema), dispatcher, schema); @@ -136,27 +135,6 @@ public void start(Configuration config) { coordinator.start(taskContext, this.queue, new SqlServerEventMetadataProvider()); } - /** - * Loads the connector's persistent offset (if present) via the given loader. - */ - @Override - protected OffsetContext getPreviousOffset(OffsetContext.Loader loader) { - Map partition = loader.getPartition(); - - Map previousOffset = context.offsetStorageReader() - .offsets(Collections.singleton(partition)) - .get(partition); - - if (previousOffset != null) { - OffsetContext offsetContext = loader.load(previousOffset); - LOGGER.info("Found previous offset {}", offsetContext); - return offsetContext; - } - else { - return null; - } - } - @Override public List poll() throws InterruptedException { final List records = queue.poll(); diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java index f4c6ad483..345b58db9 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java @@ -14,6 +14,7 @@ import io.debezium.connector.SnapshotRecord; import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.relational.TableId; import io.debezium.schema.DataCollectionId; import io.debezium.util.Collect; @@ -27,13 +28,15 @@ public class SqlServerOffsetContext implements OffsetContext { private final SourceInfo sourceInfo; private final Map partition; private boolean snapshotCompleted; + private final TransactionContext transactionContext; /** * The index of the current event within the current transaction. */ private long eventSerialNo; - public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot, boolean snapshotCompleted, long eventSerialNo) { + public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot, boolean snapshotCompleted, long eventSerialNo, + TransactionContext transactionContext) { partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName()); sourceInfo = new SourceInfo(connectorConfig); @@ -49,10 +52,11 @@ public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPos sourceInfo.setSnapshot(snapshot ? SnapshotRecord.TRUE : SnapshotRecord.FALSE); } this.eventSerialNo = eventSerialNo; + this.transactionContext = transactionContext; } public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot, boolean snapshotCompleted) { - this(connectorConfig, position, snapshot, snapshotCompleted, 1); + this(connectorConfig, position, snapshot, snapshotCompleted, 1, new TransactionContext()); } @Override @@ -69,11 +73,11 @@ public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPos SourceInfo.COMMIT_LSN_KEY, sourceInfo.getCommitLsn().toString()); } else { - return Collect.hashMapOf( + return transactionContext.store(Collect.hashMapOf( SourceInfo.COMMIT_LSN_KEY, sourceInfo.getCommitLsn().toString(), SourceInfo.CHANGE_LSN_KEY, sourceInfo.getChangeLsn() == null ? null : sourceInfo.getChangeLsn().toString(), - SourceInfo.EVENT_SERIAL_NO_KEY, eventSerialNo); + SourceInfo.EVENT_SERIAL_NO_KEY, eventSerialNo)); } } @@ -158,7 +162,8 @@ public OffsetContext load(Map offset) { eventSerialNo = Long.valueOf(0); } - return new SqlServerOffsetContext(connectorConfig, TxLogPosition.valueOf(commitLsn, changeLsn), snapshot, snapshotCompleted, eventSerialNo); + return new SqlServerOffsetContext(connectorConfig, TxLogPosition.valueOf(commitLsn, changeLsn), snapshot, snapshotCompleted, eventSerialNo, + TransactionContext.load(offset)); } } @@ -183,4 +188,9 @@ public void event(DataCollectionId tableId, Instant timestamp) { sourceInfo.setSourceTime(timestamp); sourceInfo.setTableId((TableId) tableId); } + + @Override + public TransactionContext getTransactionContext() { + return transactionContext; + } } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SourceInfoTest.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SourceInfoTest.java index b2a046c45..9fe0a8c1f 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SourceInfoTest.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SourceInfoTest.java @@ -17,6 +17,7 @@ import io.debezium.config.Configuration; import io.debezium.connector.AbstractSourceInfoStructMaker; import io.debezium.connector.SnapshotRecord; +import io.debezium.pipeline.txmetadata.TransactionMonitor; import io.debezium.relational.TableId; public class SourceInfoTest { @@ -95,6 +96,7 @@ public void schemaIsCorrect() { .field("ts_ms", Schema.INT64_SCHEMA) .field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA) .field("db", Schema.STRING_SCHEMA) + .field("transaction", TransactionMonitor.TRANSACTION_BLOCK_SCHEMA) .field("schema", Schema.STRING_SCHEMA) .field("table", Schema.STRING_SCHEMA) .field("change_lsn", Schema.OPTIONAL_STRING_SCHEMA) diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 8535a5981..6dcf9a7ef 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -379,6 +379,70 @@ public void update() throws Exception { stopConnector(); } + @Test + public void transactionMetadata() throws Exception { + final int RECORDS_PER_TABLE = 5; + final int ID_START = 10; + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(SqlServerConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) + .build(); + + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + + Testing.Print.enable(); + // Wait for snapshot completion + consumeRecordsByTopic(1); + + connection.setAutoCommit(false); + final String[] inserts = new String[RECORDS_PER_TABLE * 2]; + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final int id = ID_START + i; + inserts[2 * i] = "INSERT INTO tablea VALUES(" + id + ", 'a')"; + inserts[2 * i + 1] = "INSERT INTO tableb VALUES(" + id + ", 'b')"; + } + connection.execute(inserts); + connection.setAutoCommit(true); + + connection.execute("INSERT INTO tableb VALUES(1000, 'b')"); + + // BEGIN, data, END, BEGIN, data + final SourceRecords records = consumeRecordsByTopic(1 + RECORDS_PER_TABLE * 2 + 1 + 1 + 1); + final List tableA = records.recordsForTopic("server1.dbo.tablea"); + final List tableB = records.recordsForTopic("server1.dbo.tableb"); + final List tx = records.recordsForTopic("__debezium.transaction.server1"); + Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE); + Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE + 1); + Assertions.assertThat(tx).hasSize(3); + + final List all = records.allRecordsInOrder(); + final Struct begin = (Struct) all.get(0).value(); + final Struct beginKey = (Struct) all.get(0).key(); + Assertions.assertThat(begin.getString("status")).isEqualTo("BEGIN"); + Assertions.assertThat(begin.getInt64("event_count")).isNull(); + final String txId = begin.getString("id"); + Assertions.assertThat(beginKey.getString("id")).isEqualTo(txId); + + long counter = 1; + for (int i = 1; i <= 2 * RECORDS_PER_TABLE; i++) { + final Struct change = ((Struct) all.get(i).value()).getStruct("source").getStruct("transaction"); + Assertions.assertThat(change.getString("id")).isEqualTo(txId); + Assertions.assertThat(change.getInt64("total_order")).isEqualTo(counter); + Assertions.assertThat(change.getInt64("data_collection_order")).isEqualTo((counter + 1) / 2); + counter++; + } + + final Struct end = (Struct) all.get(2 * RECORDS_PER_TABLE + 1).value(); + final Struct endKey = (Struct) all.get(2 * RECORDS_PER_TABLE + 1).key(); + Assertions.assertThat(end.getString("status")).isEqualTo("END"); + Assertions.assertThat(end.getString("id")).isEqualTo(txId); + Assertions.assertThat(end.getInt64("event_count")).isEqualTo(2 * RECORDS_PER_TABLE); + Assertions.assertThat(endKey.getString("id")).isEqualTo(txId); + + stopConnector(); + } + @Test public void updatePrimaryKey() throws Exception { diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java new file mode 100644 index 000000000..971b95751 --- /dev/null +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java @@ -0,0 +1,324 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import static org.fest.assertions.Assertions.assertThat; +import static org.junit.Assert.assertNull; + +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.fest.assertions.Assertions; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.debezium.config.Configuration; +import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode; +import io.debezium.connector.sqlserver.util.TestHelper; +import io.debezium.data.Envelope; +import io.debezium.data.SchemaAndValueField; +import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.util.Testing; + +/** + * Integration test for the Debezium SQL Server connector. + * + * @author Jiri Pechanec + */ +public class TransactionMetadataIT extends AbstractConnectorTest { + + private SqlServerConnection connection; + + @Before + public void before() throws SQLException { + TestHelper.createTestDatabase(); + connection = TestHelper.testConnection(); + connection.execute( + "CREATE TABLE tablea (id int primary key, cola varchar(30))", + "CREATE TABLE tableb (id int primary key, colb varchar(30))", + "INSERT INTO tablea VALUES(1, 'a')"); + TestHelper.enableTableCdc(connection, "tablea"); + TestHelper.enableTableCdc(connection, "tableb"); + + initializeConnectorTestFramework(); + Testing.Files.delete(TestHelper.DB_HISTORY_PATH); + // Testing.Print.enable(); + } + + @After + public void after() throws SQLException { + if (connection != null) { + connection.close(); + } + } + + @Test + public void transactionMetadata() throws Exception { + final int RECORDS_PER_TABLE = 5; + final int ID_START = 10; + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(SqlServerConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) + .build(); + + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + + // Testing.Print.enable(); + // Wait for snapshot completion + consumeRecordsByTopic(1); + + connection.setAutoCommit(false); + final String[] inserts = new String[RECORDS_PER_TABLE * 2]; + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final int id = ID_START + i; + inserts[2 * i] = "INSERT INTO tablea VALUES(" + id + ", 'a')"; + inserts[2 * i + 1] = "INSERT INTO tableb VALUES(" + id + ", 'b')"; + } + connection.execute(inserts); + connection.setAutoCommit(true); + + connection.execute("INSERT INTO tableb VALUES(1000, 'b')"); + + // BEGIN, data, END, BEGIN, data + final SourceRecords records = consumeRecordsByTopic(1 + RECORDS_PER_TABLE * 2 + 1 + 1 + 1); + final List tableA = records.recordsForTopic("server1.dbo.tablea"); + final List tableB = records.recordsForTopic("server1.dbo.tableb"); + final List tx = records.recordsForTopic("__debezium.transaction.server1"); + Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE); + Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE + 1); + Assertions.assertThat(tx).hasSize(3); + + final List all = records.allRecordsInOrder(); + final String txId = assertBeginTransaction(all.get(0)); + + long counter = 1; + for (int i = 1; i <= 2 * RECORDS_PER_TABLE; i++) { + assertRecordTransactionMetadata(all.get(i), txId, counter, (counter + 1) / 2); + counter++; + } + + assertEndTransaction(all.get(2 * RECORDS_PER_TABLE + 1), txId, 2 * RECORDS_PER_TABLE); + stopConnector(); + } + + private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean afterStreaming) throws Exception { + final int RECORDS_PER_TABLE = 30; + final int TABLES = 2; + final int ID_START = 10; + final int ID_RESTART = 1000; + final int HALF_ID = ID_START + RECORDS_PER_TABLE / 2; + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(SqlServerConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) + .build(); + + // Testing.Print.enable(); + + if (restartJustAfterSnapshot) { + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + + // Wait for snapshot to be completed + consumeRecordsByTopic(1); + stopConnector(); + connection.execute("INSERT INTO tablea VALUES(-1, '-a')"); + } + + start(SqlServerConnector.class, config, record -> { + if (!"server1.dbo.tablea.Envelope".equals(record.valueSchema().name())) { + return false; + } + final Struct envelope = (Struct) record.value(); + final Struct after = envelope.getStruct("after"); + final Integer id = after.getInt32("id"); + final String value = after.getString("cola"); + return id != null && id == HALF_ID && "a".equals(value); + }); + assertConnectorIsRunning(); + + String firstTxId = null; + if (restartJustAfterSnapshot) { + // Transaction begin + SourceRecord begin = consumeRecordsByTopic(1).allRecordsInOrder().get(0); + firstTxId = assertBeginTransaction(begin); + } + + // Wait for snapshot to be completed or a first streaming message delivered + consumeRecordsByTopic(1); + + if (afterStreaming) { + connection.execute("INSERT INTO tablea VALUES(-2, '-a')"); + final SourceRecords records = consumeRecordsByTopic(2); + final List expectedRow = Arrays.asList( + new SchemaAndValueField("id", Schema.INT32_SCHEMA, -2), + new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "-a")); + assertRecord(((Struct) records.allRecordsInOrder().get(1).value()).getStruct(Envelope.FieldName.AFTER), expectedRow); + SourceRecord begin = records.allRecordsInOrder().get(0); + firstTxId = assertBeginTransaction(begin); + } + + connection.setAutoCommit(false); + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final int id = ID_START + i; + connection.executeWithoutCommitting( + "INSERT INTO tablea VALUES(" + id + ", 'a')"); + connection.executeWithoutCommitting( + "INSERT INTO tableb VALUES(" + id + ", 'b')"); + } + connection.connection().commit(); + + // End of previous TX, BEGIN of new TX, change records + final int txBeginIndex = firstTxId != null ? 1 : 0; + int expectedRecords = txBeginIndex + 1 + RECORDS_PER_TABLE; + List records = consumeRecordsByTopic(expectedRecords).allRecordsInOrder(); + + assertThat(records).hasSize(expectedRecords); + + if (firstTxId != null) { + assertEndTransaction(records.get(0), firstTxId, 1); + } + final String batchTxId = assertBeginTransaction(records.get(txBeginIndex)); + + SourceRecord lastRecordForOffset = records.get(RECORDS_PER_TABLE + txBeginIndex); + Struct value = (Struct) lastRecordForOffset.value(); + final List expectedLastRow = Arrays.asList( + new SchemaAndValueField("id", Schema.INT32_SCHEMA, HALF_ID - 1), + new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")); + assertRecord((Struct) value.get("after"), expectedLastRow); + assertRecordTransactionMetadata(lastRecordForOffset, batchTxId, RECORDS_PER_TABLE, RECORDS_PER_TABLE / 2); + + stopConnector(); + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + + SourceRecords sourceRecords = consumeRecordsByTopic(RECORDS_PER_TABLE); + records = sourceRecords.allRecordsInOrder(); + assertThat(records).hasSize(RECORDS_PER_TABLE); + + List tableA = sourceRecords.recordsForTopic("server1.dbo.tablea"); + List tableB = sourceRecords.recordsForTopic("server1.dbo.tableb"); + for (int i = 0; i < RECORDS_PER_TABLE / 2; i++) { + final int id = HALF_ID + i; + final SourceRecord recordA = tableA.get(i); + final SourceRecord recordB = tableB.get(i); + final List expectedRowA = Arrays.asList( + new SchemaAndValueField("id", Schema.INT32_SCHEMA, id), + new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a")); + final List expectedRowB = Arrays.asList( + new SchemaAndValueField("id", Schema.INT32_SCHEMA, id), + new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")); + + final Struct valueA = (Struct) recordA.value(); + assertRecord((Struct) valueA.get("after"), expectedRowA); + assertNull(valueA.get("before")); + + final Struct valueB = (Struct) recordB.value(); + assertRecord((Struct) valueB.get("after"), expectedRowB); + assertNull(valueB.get("before")); + + assertRecordTransactionMetadata(recordA, batchTxId, RECORDS_PER_TABLE + 2 * i + 1, RECORDS_PER_TABLE / 2 + i + 1); + assertRecordTransactionMetadata(recordB, batchTxId, RECORDS_PER_TABLE + 2 * i + 2, RECORDS_PER_TABLE / 2 + i + 1); + } + + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final int id = ID_RESTART + i; + connection.executeWithoutCommitting( + "INSERT INTO tablea VALUES(" + id + ", 'a')"); + connection.executeWithoutCommitting( + "INSERT INTO tableb VALUES(" + id + ", 'b')"); + connection.connection().commit(); + } + + // END of previous TX, data records, BEGIN of TX for every pair of record, END of TX for every pair of record but last + sourceRecords = consumeRecordsByTopic(1 + RECORDS_PER_TABLE * TABLES + (2 * RECORDS_PER_TABLE - 1)); + tableA = sourceRecords.recordsForTopic("server1.dbo.tablea"); + tableB = sourceRecords.recordsForTopic("server1.dbo.tableb"); + List txMetadata = sourceRecords.recordsForTopic("__debezium.transaction.server1"); + + Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE); + Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE); + Assertions.assertThat(txMetadata).hasSize(1 + 2 * RECORDS_PER_TABLE - 1); + assertEndTransaction(txMetadata.get(0), batchTxId, 2 * RECORDS_PER_TABLE); + + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final int id = i + ID_RESTART; + final SourceRecord recordA = tableA.get(i); + final SourceRecord recordB = tableB.get(i); + final List expectedRowA = Arrays.asList( + new SchemaAndValueField("id", Schema.INT32_SCHEMA, id), + new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a")); + final List expectedRowB = Arrays.asList( + new SchemaAndValueField("id", Schema.INT32_SCHEMA, id), + new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")); + + final Struct valueA = (Struct) recordA.value(); + assertRecord((Struct) valueA.get("after"), expectedRowA); + assertNull(valueA.get("before")); + + final Struct valueB = (Struct) recordB.value(); + assertRecord((Struct) valueB.get("after"), expectedRowB); + assertNull(valueB.get("before")); + + final String txId = assertBeginTransaction(txMetadata.get(2 * i + 1)); + assertRecordTransactionMetadata(recordA, txId, 1, 1); + assertRecordTransactionMetadata(recordB, txId, 2, 1); + if (i < RECORDS_PER_TABLE - 1) { + assertEndTransaction(txMetadata.get(2 * i + 2), txId, 2); + } + } + } + + @Test + public void restartInTheMiddleOfTxAfterSnapshot() throws Exception { + restartInTheMiddleOfTx(true, false); + } + + @Test + public void restartInTheMiddleOfTxAfterCompletedTx() throws Exception { + restartInTheMiddleOfTx(false, true); + } + + @Test + public void restartInTheMiddleOfTx() throws Exception { + restartInTheMiddleOfTx(false, false); + } + + private void assertRecord(Struct record, List expected) { + expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record)); + } + + private String assertBeginTransaction(SourceRecord record) { + final Struct begin = (Struct) record.value(); + final Struct beginKey = (Struct) record.key(); + Assertions.assertThat(begin.getString("status")).isEqualTo("BEGIN"); + Assertions.assertThat(begin.getInt64("event_count")).isNull(); + final String txId = begin.getString("id"); + Assertions.assertThat(beginKey.getString("id")).isEqualTo(txId); + return txId; + } + + private void assertEndTransaction(SourceRecord record, String expectedTxId, long expectedEventCount) { + final Struct end = (Struct) record.value(); + final Struct endKey = (Struct) record.key(); + Assertions.assertThat(end.getString("status")).isEqualTo("END"); + Assertions.assertThat(end.getString("id")).isEqualTo(expectedTxId); + Assertions.assertThat(end.getInt64("event_count")).isEqualTo(expectedEventCount); + Assertions.assertThat(endKey.getString("id")).isEqualTo(expectedTxId); + } + + private void assertRecordTransactionMetadata(SourceRecord record, String expectedTxId, long expectedTotalOrder, long expectedCollectionOrder) { + final Struct change = ((Struct) record.value()).getStruct("source").getStruct("transaction"); + Assertions.assertThat(change.getString("id")).isEqualTo(expectedTxId); + Assertions.assertThat(change.getInt64("total_order")).isEqualTo(expectedTotalOrder); + Assertions.assertThat(change.getInt64("data_collection_order")).isEqualTo(expectedCollectionOrder); + } +} diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index 151cbbf22..d1c3f317a 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -158,6 +158,14 @@ public static Version parse(String value, String defaultValue) { .withDescription("Whether field names will be sanitized to Avro naming conventions") .withDefault(Boolean.FALSE); + public static final Field PROVIDE_TRANSACTION_METADATA = Field.create("provide.transaction.metadata") + .withDisplayName("Store transaction metadata information in a dedicated topic.") + .withType(Type.BOOLEAN) + .withWidth(Width.SHORT) + .withImportance(Importance.LOW) + .withDescription("Enables transaction metadata extraction together with event counting") + .withDefault(Boolean.FALSE); + private final Configuration config; private final boolean emitTombstoneOnDelete; private final int maxQueueSize; @@ -169,6 +177,7 @@ public static Version parse(String value, String defaultValue) { private final int snapshotFetchSize; private final SourceInfoStructMaker sourceInfoStructMaker; private final boolean sanitizeFieldNames; + private final boolean shouldProvideTransactionMetadata; protected CommonConnectorConfig(Configuration config, String logicalName, int defaultSnapshotFetchSize) { this.config = config; @@ -182,6 +191,7 @@ protected CommonConnectorConfig(Configuration config, String logicalName, int de this.snapshotFetchSize = config.getInteger(SNAPSHOT_FETCH_SIZE, defaultSnapshotFetchSize); this.sourceInfoStructMaker = getSourceInfoStructMaker(Version.parse(config.getString(SOURCE_STRUCT_MAKER_VERSION))); this.sanitizeFieldNames = config.getBoolean(SANITIZE_FIELD_NAMES) || isUsingAvroConverter(config); + this.shouldProvideTransactionMetadata = config.getBoolean(PROVIDE_TRANSACTION_METADATA); } /** @@ -226,6 +236,10 @@ public int getSnapshotFetchSize() { return snapshotFetchSize; } + public boolean shouldProvideTransactionMetadata() { + return shouldProvideTransactionMetadata; + } + @SuppressWarnings("unchecked") public SourceInfoStructMaker getSourceInfoStructMaker() { return (SourceInfoStructMaker) sourceInfoStructMaker; diff --git a/debezium-core/src/main/java/io/debezium/connector/AbstractSourceInfoStructMaker.java b/debezium-core/src/main/java/io/debezium/connector/AbstractSourceInfoStructMaker.java index 964022175..e13cb4375 100644 --- a/debezium-core/src/main/java/io/debezium/connector/AbstractSourceInfoStructMaker.java +++ b/debezium-core/src/main/java/io/debezium/connector/AbstractSourceInfoStructMaker.java @@ -13,6 +13,7 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.data.Enum; +import io.debezium.pipeline.txmetadata.TransactionMonitor; /** * Common information provided by all connectors in either source field or offsets. @@ -42,7 +43,8 @@ protected SchemaBuilder commonSchemaBuilder() { .field(AbstractSourceInfo.SERVER_NAME_KEY, Schema.STRING_SCHEMA) .field(AbstractSourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA) .field(AbstractSourceInfo.SNAPSHOT_KEY, SNAPSHOT_RECORD_SCHEMA) - .field(AbstractSourceInfo.DATABASE_NAME_KEY, Schema.STRING_SCHEMA); + .field(AbstractSourceInfo.DATABASE_NAME_KEY, Schema.STRING_SCHEMA) + .field(TransactionMonitor.DEBEZIUM_TRANSACTION_KEY, TransactionMonitor.TRANSACTION_BLOCK_SCHEMA); } protected Struct commonStruct(T sourceInfo) { diff --git a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java index 4c0d10fb1..009f028d6 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java @@ -15,6 +15,7 @@ import org.slf4j.LoggerFactory; import io.debezium.annotation.ThreadSafe; +import io.debezium.config.CommonConnectorConfig; import io.debezium.connector.base.ChangeEventQueueMetrics; import io.debezium.connector.common.CdcSourceTaskContext; import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics; @@ -28,6 +29,7 @@ import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.SnapshotResult; import io.debezium.pipeline.spi.SnapshotResult.SnapshotResultStatus; +import io.debezium.pipeline.txmetadata.TransactionMonitor; import io.debezium.relational.RelationalDatabaseSchema; import io.debezium.util.Threads; @@ -49,6 +51,7 @@ public class ChangeEventSourceCoordinator { private final ExecutorService executor; private final EventDispatcher eventDispatcher; private final RelationalDatabaseSchema schema; + private final CommonConnectorConfig connectorConfig; private volatile boolean running; private volatile StreamingChangeEventSource streamingSource; @@ -56,12 +59,14 @@ public class ChangeEventSourceCoordinator { private SnapshotChangeEventSourceMetrics snapshotMetrics; private StreamingChangeEventSourceMetrics streamingMetrics; - public ChangeEventSourceCoordinator(OffsetContext previousOffset, ErrorHandler errorHandler, Class connectorType, String logicalName, + public ChangeEventSourceCoordinator(OffsetContext previousOffset, ErrorHandler errorHandler, Class connectorType, + CommonConnectorConfig connectorConfig, ChangeEventSourceFactory changeEventSourceFactory, EventDispatcher eventDispatcher, RelationalDatabaseSchema schema) { this.previousOffset = previousOffset; this.errorHandler = errorHandler; + this.connectorConfig = connectorConfig; this.changeEventSourceFactory = changeEventSourceFactory; - this.executor = Threads.newSingleThreadExecutor(connectorType, logicalName, "change-event-source-coordinator"); + this.executor = Threads.newSingleThreadExecutor(connectorType, connectorConfig.getLogicalName(), "change-event-source-coordinator"); this.eventDispatcher = eventDispatcher; this.schema = schema; } @@ -93,7 +98,8 @@ public synchronized void start(T taskContext, C if (running && snapshotResult.isCompletedOrSkipped()) { streamingSource = changeEventSourceFactory.getStreamingChangeEventSource(snapshotResult.getOffset()); - eventDispatcher.setEventListener(streamingMetrics); + eventDispatcher.setEventListener(new CompositeDataChangeEventListener(streamingMetrics, + new TransactionMonitor(connectorConfig, metadataProvider, eventDispatcher::dispatchTransactionMessage))); streamingMetrics.connected(true); LOGGER.info("Starting streaming"); streamingSource.execute(context); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/CompositeDataChangeEventListener.java b/debezium-core/src/main/java/io/debezium/pipeline/CompositeDataChangeEventListener.java new file mode 100644 index 000000000..0110445e7 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/pipeline/CompositeDataChangeEventListener.java @@ -0,0 +1,57 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.pipeline; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.pipeline.source.spi.DataChangeEventListener; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.schema.DataCollectionId; + +/** + * Composite design pattern to aggregate multiple instances of {@link DataChangeEventListener}. + * + * @author Jiri Pechanec + * + */ +public class CompositeDataChangeEventListener implements DataChangeEventListener { + + private static final Logger LOGGER = LoggerFactory.getLogger(CompositeDataChangeEventListener.class); + + final List elements = new ArrayList<>(); + + public CompositeDataChangeEventListener(DataChangeEventListener... components) { + Collections.addAll(this.elements, components); + } + + @Override + public void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value) throws InterruptedException { + for (DataChangeEventListener c : elements) { + c.onEvent(source, offset, key, value); + } + LOGGER.debug("Created instance with {} elements", elements.size()); + } + + @Override + public void onFilteredEvent(String event) { + for (DataChangeEventListener c : elements) { + c.onFilteredEvent(event); + } + } + + @Override + public void onErroneousEvent(String event) { + for (DataChangeEventListener c : elements) { + c.onErroneousEvent(event); + } + } +} diff --git a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java index 05ed780fc..41ac33f00 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java @@ -203,6 +203,10 @@ private void enqueueHeartbeat(SourceRecord record) throws InterruptedException { queue.enqueue(new DataChangeEvent(record)); } + public void dispatchTransactionMessage(SourceRecord record) throws InterruptedException { + queue.enqueue(new DataChangeEvent(record)); + } + /** * Change record receiver used during snapshotting. Allows for a deferred submission of records, which is needed in * order to set the "snapshot completed" offset field, which we can't send to Kafka Connect without sending an diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/DataChangeEventListener.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/DataChangeEventListener.java index 497560c8f..e0e31f15d 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/DataChangeEventListener.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/DataChangeEventListener.java @@ -22,7 +22,7 @@ public interface DataChangeEventListener { /** * Invoked if an event is processed for a captured table. */ - void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value); + void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value) throws InterruptedException; /** * Invoked for events pertaining to non-whitelisted tables. diff --git a/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java b/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java index ec418d16d..bc3737495 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java @@ -11,6 +11,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; +import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.schema.DataCollectionId; /** @@ -69,4 +70,6 @@ interface Loader { * Records the name of the collection and the timestamp of the last event */ void event(DataCollectionId collectionId, Instant timestamp); + + TransactionContext getTransactionContext(); } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/TransactionContext.java b/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/TransactionContext.java new file mode 100644 index 000000000..a8ba405a8 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/TransactionContext.java @@ -0,0 +1,113 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.pipeline.txmetadata; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import io.debezium.annotation.NotThreadSafe; +import io.debezium.schema.DataCollectionId; + +/** + * The context holds internal state necessary for book-keeping of events in active transaction. + * The main data tracked are + *
    + *
  • active transaction id
  • + *
  • the total event number seen from the transaction
  • + *
  • the number of events per table/collection seen in the transaction
  • + *
+ * + * The state of this context is stored in offsets and is recovered upon restart. + * + * + * @author Jiri Pechanec + */ +@NotThreadSafe +public class TransactionContext { + + private static final String OFFSET_TRANSACTION_ID = TransactionMonitor.DEBEZIUM_TRANSACTION_KEY + "_" + TransactionMonitor.DEBEZIUM_TRANSACTION_ID_KEY; + private static final String OFFSET_TABLE_COUNT_PREFIX = TransactionMonitor.DEBEZIUM_TRANSACTION_KEY + "_" + + TransactionMonitor.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY + "_"; + private static final int OFFSET_TABLE_COUNT_PREFIX_LENGTH = OFFSET_TABLE_COUNT_PREFIX.length(); + + private String transactionId = null; + private Map perTableEventCount = new HashMap<>(); + private long totalEventCount = 0; + + public TransactionContext() { + } + + private void reset() { + transactionId = null; + totalEventCount = 0; + perTableEventCount.clear(); + } + + public Map store(Map offset) { + offset.put(OFFSET_TRANSACTION_ID, transactionId); + final String tableCountPrefix = OFFSET_TABLE_COUNT_PREFIX; + for (final Entry e : perTableEventCount.entrySet()) { + offset.put(tableCountPrefix + e.getKey(), e.getValue()); + } + return offset; + } + + @SuppressWarnings("unchecked") + public static TransactionContext load(Map offsets) { + final Map o = (Map) offsets; + final TransactionContext context = new TransactionContext(); + + context.transactionId = (String) o.get(OFFSET_TRANSACTION_ID); + + for (final Entry offset : o.entrySet()) { + if (offset.getKey().startsWith(OFFSET_TABLE_COUNT_PREFIX)) { + final String dataCollectionId = offset.getKey().substring(OFFSET_TABLE_COUNT_PREFIX_LENGTH); + final Long count = (Long) offset.getValue(); + context.perTableEventCount.put(dataCollectionId, count); + } + } + + context.totalEventCount = context.perTableEventCount.values().stream().mapToLong(x -> x).sum(); + + return context; + } + + public boolean isTransactionInProgress() { + return transactionId != null; + } + + public String getTransactionId() { + return transactionId; + } + + public long getTotalEventCount() { + return totalEventCount; + } + + public void beginTransaction(String txId) { + reset(); + transactionId = txId; + } + + public void endTransaction() { + reset(); + } + + public long event(DataCollectionId source) { + totalEventCount++; + final String sourceName = source.toString(); + final long dataCollectionEventOrder = perTableEventCount.getOrDefault(sourceName, 0L).longValue() + 1; + perTableEventCount.put(sourceName, Long.valueOf(dataCollectionEventOrder)); + return dataCollectionEventOrder; + } + + @Override + public String toString() { + return "TransactionContext [currentTransactionId=" + transactionId + ", perTableEventCount=" + + perTableEventCount + ", totalEventCount=" + totalEventCount + "]"; + } +} diff --git a/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/TransactionMonitor.java b/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/TransactionMonitor.java new file mode 100644 index 000000000..a72f24741 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/TransactionMonitor.java @@ -0,0 +1,164 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.pipeline.txmetadata; + +import java.util.Objects; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.annotation.NotThreadSafe; +import io.debezium.config.CommonConnectorConfig; +import io.debezium.data.Envelope; +import io.debezium.function.BlockingConsumer; +import io.debezium.pipeline.source.spi.DataChangeEventListener; +import io.debezium.pipeline.source.spi.EventMetadataProvider; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.schema.DataCollectionId; +import io.debezium.util.SchemaNameAdjuster; + +/** + * The class has externalized its state in {@link TransactionContext} context class so it can be stored in and recovered from offsets. + * The class receives all processed events and keeps the transaction tracking depending on transaction id. + * Upon transaction change the metadata events are delivered to a dedicated topic informing about {@code START/END} of the transaction, + * including transaction id and in case of {@code END} event the amount of events generated by the transaction. + *

+ * Every event seen has its {@code source} block enriched to contain + * + *

    + *
  • transaction id
  • + *
  • the total event order in the transaction
  • + *
  • the order of event per table/collection source in the transaction
  • + *
+ * + * @author Jiri Pechanec + */ +@NotThreadSafe +public class TransactionMonitor implements DataChangeEventListener { + + private static final Logger LOGGER = LoggerFactory.getLogger(TransactionMonitor.class); + private static final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER); + + private static final String TOPIC_PREFIX = "__debezium.transaction."; + + public static final String DEBEZIUM_TRANSACTION_KEY = "transaction"; + public static final String DEBEZIUM_TRANSACTION_ID_KEY = "id"; + public static final String DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY = "total_order"; + public static final String DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY = "data_collection_order"; + public static final String DEBEZIUM_TRANSACTION_STATUS_KEY = "status"; + public static final String DEBEZIUM_TRANSACTION_EVENT_COUNT_KEY = "event_count"; + + public static final Schema TRANSACTION_BLOCK_SCHEMA = SchemaBuilder.struct().optional() + .field(DEBEZIUM_TRANSACTION_ID_KEY, Schema.STRING_SCHEMA) + .field(DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, Schema.INT64_SCHEMA) + .field(DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY, Schema.INT64_SCHEMA) + .build(); + + private static Schema TRANSACTION_KEY_SCHEMA = SchemaBuilder.struct() + .name(schemaNameAdjuster.adjust("io.debezium.connector.common.TransactionMetadataKey")) + .field(DEBEZIUM_TRANSACTION_ID_KEY, Schema.STRING_SCHEMA) + .build(); + private static Schema TRANSACTION_VALUE_SCHEMA = SchemaBuilder.struct() + .name(schemaNameAdjuster.adjust("io.debezium.connector.common.TransactionMetadataValue")) + .field(DEBEZIUM_TRANSACTION_STATUS_KEY, Schema.STRING_SCHEMA) + .field(DEBEZIUM_TRANSACTION_ID_KEY, Schema.STRING_SCHEMA) + .field(DEBEZIUM_TRANSACTION_EVENT_COUNT_KEY, Schema.OPTIONAL_INT64_SCHEMA) + .build(); + + private final EventMetadataProvider eventMetadataProvider; + private final String topicName; + private final BlockingConsumer sender; + private final CommonConnectorConfig connectorConfig; + + public TransactionMonitor(CommonConnectorConfig connectorConfig, EventMetadataProvider eventMetadataProvider, BlockingConsumer sender) { + Objects.requireNonNull(eventMetadataProvider); + this.topicName = TOPIC_PREFIX + connectorConfig.getLogicalName(); + this.eventMetadataProvider = eventMetadataProvider; + this.sender = sender; + this.connectorConfig = connectorConfig; + } + + @Override + public void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value) throws InterruptedException { + if (!connectorConfig.shouldProvideTransactionMetadata()) { + return; + } + final TransactionContext transactionContext = offset.getTransactionContext(); + + final String txId = eventMetadataProvider.getTransactionId(source, offset, key, value); + if (txId == null) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Event '{}' has no transaction id", eventMetadataProvider.toSummaryString(source, offset, key, value)); + } + return; + } + + if (!transactionContext.isTransactionInProgress()) { + transactionContext.beginTransaction(txId); + beginTransaction(offset); + } + else if (!transactionContext.getTransactionId().equals(txId)) { + endTransaction(offset); + transactionContext.endTransaction(); + transactionContext.beginTransaction(txId); + beginTransaction(offset); + } + transactionEvent(offset, source, value); + } + + private void transactionEvent(OffsetContext offsetContext, DataCollectionId source, Struct value) { + final long dataCollectionEventOrder = offsetContext.getTransactionContext().event(source); + if (value == null) { + LOGGER.debug("Event with key {} without value. Cannot enrich source block."); + return; + } + final Struct sourceBlock = value.getStruct(Envelope.FieldName.SOURCE); + if (sourceBlock == null) { + LOGGER.debug("Event with key {} with value {} lacks source block. Cannot be enriched."); + return; + } + final Struct txStruct = new Struct(TRANSACTION_BLOCK_SCHEMA); + txStruct.put(DEBEZIUM_TRANSACTION_ID_KEY, offsetContext.getTransactionContext().getTransactionId()); + txStruct.put(DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, offsetContext.getTransactionContext().getTotalEventCount()); + txStruct.put(DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY, dataCollectionEventOrder); + sourceBlock.put(DEBEZIUM_TRANSACTION_KEY, txStruct); + } + + private void beginTransaction(OffsetContext offsetContext) throws InterruptedException { + final Struct key = new Struct(TRANSACTION_KEY_SCHEMA); + key.put(DEBEZIUM_TRANSACTION_ID_KEY, offsetContext.getTransactionContext().getTransactionId()); + final Struct value = new Struct(TRANSACTION_VALUE_SCHEMA); + value.put(DEBEZIUM_TRANSACTION_STATUS_KEY, TransactionStatus.BEGIN.name()); + value.put(DEBEZIUM_TRANSACTION_ID_KEY, offsetContext.getTransactionContext().getTransactionId()); + + sender.accept(new SourceRecord(offsetContext.getPartition(), offsetContext.getOffset(), + topicName, null, key.schema(), key, value.schema(), value)); + } + + private void endTransaction(OffsetContext offsetContext) throws InterruptedException { + final Struct key = new Struct(TRANSACTION_KEY_SCHEMA); + key.put(DEBEZIUM_TRANSACTION_ID_KEY, offsetContext.getTransactionContext().getTransactionId()); + final Struct value = new Struct(TRANSACTION_VALUE_SCHEMA); + value.put(DEBEZIUM_TRANSACTION_STATUS_KEY, TransactionStatus.END.name()); + value.put(DEBEZIUM_TRANSACTION_ID_KEY, offsetContext.getTransactionContext().getTransactionId()); + value.put(DEBEZIUM_TRANSACTION_EVENT_COUNT_KEY, offsetContext.getTransactionContext().getTotalEventCount()); + + sender.accept(new SourceRecord(offsetContext.getPartition(), offsetContext.getOffset(), + topicName, null, key.schema(), key, value.schema(), value)); + } + + @Override + public void onFilteredEvent(String event) { + } + + @Override + public void onErroneousEvent(String event) { + } +} diff --git a/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/TransactionStatus.java b/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/TransactionStatus.java new file mode 100644 index 000000000..227b31010 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/TransactionStatus.java @@ -0,0 +1,17 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.pipeline.txmetadata; + +/** + * Describes the transition of transaction from start to end. + * + * @author Jiri Pechanec + */ +public enum TransactionStatus { + + BEGIN, + END; +}