DBZ-6595 Remove magic constant IGNORE_SINK_RECORD_FOR_TABLE

This commit is contained in:
nicholas-fwang 2023-06-23 21:08:56 +09:00 committed by Jiri Pechanec
parent 9896e8adc9
commit 0ecaa3c6ce
4 changed files with 14 additions and 8 deletions

View File

@ -6,7 +6,6 @@
package io.debezium.connector.jdbc;
import static io.debezium.connector.jdbc.JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE;
import static io.debezium.connector.jdbc.naming.TableNamingStrategy.IGNORE_SINK_RECORD_FOR_TABLE;
import java.sql.SQLException;
import java.util.List;
@ -70,7 +69,7 @@ public void execute(SinkRecord record) {
.build();
String tableName = tableNamingStrategy.resolveTableName(config, record);
if (IGNORE_SINK_RECORD_FOR_TABLE.equals(tableName)) {
if (tableName == null) {
LOGGER.warn("Ignored to write record from topic '{}' partition '{}' offset '{}'", record.topic(), record.kafkaPartition(), record.kafkaOffset());
return;
}

View File

@ -42,9 +42,9 @@ public String resolveTableName(JdbcSinkConnectorConfig config, SinkRecord record
private String resolveTableNameBySource(JdbcSinkConnectorConfig config, SinkRecord record, String tableFormat) {
String table = tableFormat;
if (table.contains("${source.")) {
if (isMaybeTombstone(record)) {
if (isTombstone(record)) {
LOGGER.warn("Ignore this record because it seems to be a tombstone that doesn't have source field, then cannot resolve table name in topic '{}', partition '{}', offset '{}'", record.topic(), record.kafkaPartition(), record.kafkaOffset());
return IGNORE_SINK_RECORD_FOR_TABLE;
return null;
}
try {
@ -63,7 +63,7 @@ private String resolveTableNameBySource(JdbcSinkConnectorConfig config, SinkReco
return table;
}
private boolean isMaybeTombstone(SinkRecord record) {
private boolean isTombstone(SinkRecord record) {
return record.value() == null;
}
}

View File

@ -15,7 +15,6 @@
* @author Chris Cranford
*/
public interface TableNamingStrategy {
String IGNORE_SINK_RECORD_FOR_TABLE = "__IGNORE_SINK_RECORD_FOR_TABLE";
/**
* Resolves the logical table name from the sink record.
*

View File

@ -5,7 +5,6 @@
*/
package io.debezium.connector.jdbc;
import static io.debezium.connector.jdbc.naming.TableNamingStrategy.IGNORE_SINK_RECORD_FOR_TABLE;
import static org.fest.assertions.Assertions.assertThat;
import java.util.Map;
@ -78,6 +77,15 @@ public void testDefaultTableNamingStrategyWithDebeziumSourceAndTombstone() {
final SinkRecordFactory factory = new DebeziumSinkRecordFactory();
final DefaultTableNamingStrategy strategy = new DefaultTableNamingStrategy();
SinkRecord sinkRecord = factory.tombstoneRecord("database.schema.table");
assertThat(strategy.resolveTableName(config, sinkRecord)).isEqualTo(IGNORE_SINK_RECORD_FOR_TABLE);
assertThat(strategy.resolveTableName(config, sinkRecord)).isNull();
}
@Test
public void testDefaultTableNamingStrategyWithTopicAndTombstone() {
final JdbcSinkConnectorConfig config = new JdbcSinkConnectorConfig(Map.of("table.name.format", "kafka_${topic}"));
final SinkRecordFactory factory = new DebeziumSinkRecordFactory();
final DefaultTableNamingStrategy strategy = new DefaultTableNamingStrategy();
SinkRecord sinkRecord = factory.tombstoneRecord("database.schema.table");
assertThat(strategy.resolveTableName(config, sinkRecord)).isEqualTo("kafka_database_schema_table");
}
}