diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java index 9cafee11c..9bba0461e 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java @@ -92,7 +92,7 @@ public void transactionMetadata() throws Exception { final SourceRecords records = consumeRecordsByTopic(1 + RECORDS_PER_TABLE * 2 + 1 + 1 + 1); final List tableA = records.recordsForTopic("server1.dbo.tablea"); final List tableB = records.recordsForTopic("server1.dbo.tableb"); - final List tx = records.recordsForTopic("__debezium.transaction.server1"); + final List tx = records.recordsForTopic("server1.transaction"); Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE); Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE + 1); Assertions.assertThat(tx).hasSize(3); @@ -242,7 +242,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af sourceRecords = consumeRecordsByTopic(1 + RECORDS_PER_TABLE * TABLES + (2 * RECORDS_PER_TABLE - 1)); tableA = sourceRecords.recordsForTopic("server1.dbo.tablea"); tableB = sourceRecords.recordsForTopic("server1.dbo.tableb"); - List txMetadata = sourceRecords.recordsForTopic("__debezium.transaction.server1"); + List txMetadata = sourceRecords.recordsForTopic("server1.transaction"); Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE); Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/TransactionMonitor.java b/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/TransactionMonitor.java index e72041634..a9ca431eb 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/TransactionMonitor.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/TransactionMonitor.java @@ -45,7 +45,7 @@ public class TransactionMonitor { private static final Logger LOGGER = LoggerFactory.getLogger(TransactionMonitor.class); private static final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER); - private static final String TOPIC_PREFIX = "__debezium.transaction."; + private static final String TOPIC_SUFFIX = ".transaction"; public static final String DEBEZIUM_TRANSACTION_KEY = "transaction"; public static final String DEBEZIUM_TRANSACTION_ID_KEY = "id"; @@ -78,7 +78,7 @@ public class TransactionMonitor { public TransactionMonitor(CommonConnectorConfig connectorConfig, EventMetadataProvider eventMetadataProvider, BlockingConsumer sender) { Objects.requireNonNull(eventMetadataProvider); - this.topicName = TOPIC_PREFIX + connectorConfig.getLogicalName(); + this.topicName = connectorConfig.getLogicalName() + TOPIC_SUFFIX; this.eventMetadataProvider = eventMetadataProvider; this.sender = sender; this.connectorConfig = connectorConfig;