DBZ-1052 Change TX topic name

This commit is contained in:
Jiri Pechanec 2020-01-28 13:52:13 +01:00 committed by Gunnar Morling
parent 5749acda7e
commit 4867c8a298
2 changed files with 4 additions and 4 deletions

View File

@ -92,7 +92,7 @@ public void transactionMetadata() throws Exception {
final SourceRecords records = consumeRecordsByTopic(1 + RECORDS_PER_TABLE * 2 + 1 + 1 + 1);
final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablea");
final List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
final List<SourceRecord> tx = records.recordsForTopic("__debezium.transaction.server1");
final List<SourceRecord> tx = records.recordsForTopic("server1.transaction");
Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE + 1);
Assertions.assertThat(tx).hasSize(3);
@ -242,7 +242,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af
sourceRecords = consumeRecordsByTopic(1 + RECORDS_PER_TABLE * TABLES + (2 * RECORDS_PER_TABLE - 1));
tableA = sourceRecords.recordsForTopic("server1.dbo.tablea");
tableB = sourceRecords.recordsForTopic("server1.dbo.tableb");
List<SourceRecord> txMetadata = sourceRecords.recordsForTopic("__debezium.transaction.server1");
List<SourceRecord> txMetadata = sourceRecords.recordsForTopic("server1.transaction");
Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);

View File

@ -45,7 +45,7 @@ public class TransactionMonitor {
private static final Logger LOGGER = LoggerFactory.getLogger(TransactionMonitor.class);
private static final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
private static final String TOPIC_PREFIX = "__debezium.transaction.";
private static final String TOPIC_SUFFIX = ".transaction";
public static final String DEBEZIUM_TRANSACTION_KEY = "transaction";
public static final String DEBEZIUM_TRANSACTION_ID_KEY = "id";
@ -78,7 +78,7 @@ public class TransactionMonitor {
public TransactionMonitor(CommonConnectorConfig connectorConfig, EventMetadataProvider eventMetadataProvider, BlockingConsumer<SourceRecord> sender) {
Objects.requireNonNull(eventMetadataProvider);
this.topicName = TOPIC_PREFIX + connectorConfig.getLogicalName();
this.topicName = connectorConfig.getLogicalName() + TOPIC_SUFFIX;
this.eventMetadataProvider = eventMetadataProvider;
this.sender = sender;
this.connectorConfig = connectorConfig;