diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java index d917ca0c6..783fe44b3 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java @@ -6,7 +6,6 @@ package io.debezium.connector.jdbc; import static io.debezium.connector.jdbc.JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE; -import static io.debezium.connector.jdbc.naming.TableNamingStrategy.IGNORE_SINK_RECORD_FOR_TABLE; import java.sql.SQLException; import java.util.List; @@ -70,7 +69,7 @@ public void execute(SinkRecord record) { .build(); String tableName = tableNamingStrategy.resolveTableName(config, record); - if (IGNORE_SINK_RECORD_FOR_TABLE.equals(tableName)) { + if (tableName == null) { LOGGER.warn("Ignored to write record from topic '{}' partition '{}' offset '{}'", record.topic(), record.kafkaPartition(), record.kafkaOffset()); return; } diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/naming/DefaultTableNamingStrategy.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/naming/DefaultTableNamingStrategy.java index b479b5369..19d78312c 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/naming/DefaultTableNamingStrategy.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/naming/DefaultTableNamingStrategy.java @@ -42,9 +42,9 @@ public String resolveTableName(JdbcSinkConnectorConfig config, SinkRecord record private String resolveTableNameBySource(JdbcSinkConnectorConfig config, SinkRecord record, String tableFormat) { String table = tableFormat; if (table.contains("${source.")) { - if (isMaybeTombstone(record)) { + if (isTombstone(record)) { LOGGER.warn("Ignore this record because it seems to be a tombstone that doesn't have source field, then cannot resolve table name in topic '{}', partition '{}', offset '{}'", record.topic(), record.kafkaPartition(), record.kafkaOffset()); - return IGNORE_SINK_RECORD_FOR_TABLE; + return null; } try { @@ -63,7 +63,7 @@ private String resolveTableNameBySource(JdbcSinkConnectorConfig config, SinkReco return table; } - private boolean isMaybeTombstone(SinkRecord record) { + private boolean isTombstone(SinkRecord record) { return record.value() == null; } } diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/naming/TableNamingStrategy.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/naming/TableNamingStrategy.java index 31a5a01dd..830055556 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/naming/TableNamingStrategy.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/naming/TableNamingStrategy.java @@ -15,7 +15,6 @@ * @author Chris Cranford */ public interface TableNamingStrategy { - String IGNORE_SINK_RECORD_FOR_TABLE = "__IGNORE_SINK_RECORD_FOR_TABLE"; /** * Resolves the logical table name from the sink record. * diff --git a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/TableNamingStrategyTest.java b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/TableNamingStrategyTest.java index 3854ba7d7..77d00ce83 100644 --- a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/TableNamingStrategyTest.java +++ b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/TableNamingStrategyTest.java @@ -5,7 +5,6 @@ */ package io.debezium.connector.jdbc; -import static io.debezium.connector.jdbc.naming.TableNamingStrategy.IGNORE_SINK_RECORD_FOR_TABLE; import static org.fest.assertions.Assertions.assertThat; import java.util.Map; @@ -78,6 +77,15 @@ public void testDefaultTableNamingStrategyWithDebeziumSourceAndTombstone() { final SinkRecordFactory factory = new DebeziumSinkRecordFactory(); final DefaultTableNamingStrategy strategy = new DefaultTableNamingStrategy(); SinkRecord sinkRecord = factory.tombstoneRecord("database.schema.table"); - assertThat(strategy.resolveTableName(config, sinkRecord)).isEqualTo(IGNORE_SINK_RECORD_FOR_TABLE); + assertThat(strategy.resolveTableName(config, sinkRecord)).isNull(); + } + + @Test + public void testDefaultTableNamingStrategyWithTopicAndTombstone() { + final JdbcSinkConnectorConfig config = new JdbcSinkConnectorConfig(Map.of("table.name.format", "kafka_${topic}")); + final SinkRecordFactory factory = new DebeziumSinkRecordFactory(); + final DefaultTableNamingStrategy strategy = new DefaultTableNamingStrategy(); + SinkRecord sinkRecord = factory.tombstoneRecord("database.schema.table"); + assertThat(strategy.resolveTableName(config, sinkRecord)).isEqualTo("kafka_database_schema_table"); } }