diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CassandraTopicSelector.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CassandraTopicSelector.java index a877cbe86..46887d9a4 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CassandraTopicSelector.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CassandraTopicSelector.java @@ -5,110 +5,14 @@ */ package io.debezium.connector.cassandra; -import java.util.concurrent.ConcurrentHashMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.debezium.annotation.ThreadSafe; +import io.debezium.schema.TopicSelector; /** * Responsible for selecting the Kafka topic that the record will get send to. */ public class CassandraTopicSelector { - private static final Logger LOGGER = LoggerFactory.getLogger(CassandraTopicSelector.class); - - private static final String DEFAULT_DELIMITER = "."; - - private final String prefix; - private final String delimiter; - private final TableTopicNamer tableTopicNamer; - - private CassandraTopicSelector(String prefix, String delimiter, TableTopicNamer tableTopicNamer) { - this.prefix = prefix; - this.delimiter = delimiter; - this.tableTopicNamer = new TopicNameCache(new TopicNameSanitizer(tableTopicNamer)); - } - - static CassandraTopicSelector defaultSelector(String topicPrefix) { - return new CassandraTopicSelector( - topicPrefix, - DEFAULT_DELIMITER, - (keyspaceTable, prefix, delimiter) -> String.join(delimiter, prefix, keyspaceTable.keyspace, keyspaceTable.table)); - } - - String topicNameFor(KeyspaceTable keyspaceTable) { - return tableTopicNamer.topicNameFor(keyspaceTable, prefix, delimiter); - } - - @FunctionalInterface - public interface TableTopicNamer { - String topicNameFor(KeyspaceTable keyspaceTable, String prefix, String delimiter); - } - - /** - * A topic namer that replaces any characters invalid in a topic name with {@code _}. - */ - private static class TopicNameSanitizer implements TableTopicNamer { - private static final String REPLACEMENT_CHAR = "_"; - private final TableTopicNamer delegate; - - TopicNameSanitizer(TableTopicNamer delegate) { - this.delegate = delegate; - } - - @Override - public String topicNameFor(KeyspaceTable keyspaceTable, String prefix, String delimiter) { - String topicName = delegate.topicNameFor(keyspaceTable, prefix, delimiter); - - StringBuilder sanitizedNameBuilder = new StringBuilder(topicName.length()); - boolean changed = false; - - for (int i = 0; i < topicName.length(); i++) { - char c = topicName.charAt(i); - if (isValidTopicNameCharacter(c)) { - sanitizedNameBuilder.append(c); - } - else { - sanitizedNameBuilder.append(REPLACEMENT_CHAR); - changed = true; - } - } - - if (changed) { - String sanitizedName = sanitizedNameBuilder.toString(); - LOGGER.warn("Topic '{}' name isn't a valid topic, replacing it with '{}'", topicName, sanitizedName); - return sanitizedName; - } - else { - return topicName; - } - } - - /** - * Whether the given character is a legal character of a Kafka topic name. Legal characters are - * {@code [a-zA-Z0-9._-]}. - * - * {@see https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java} - */ - private boolean isValidTopicNameCharacter(char c) { - return c == '.' || c == '_' || c == '-' || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9'); - } - } - - @ThreadSafe - private static class TopicNameCache implements TableTopicNamer { - private final ConcurrentHashMap topicNames; - private final TableTopicNamer delegate; - - TopicNameCache(TableTopicNamer delegate) { - this.topicNames = new ConcurrentHashMap<>(); - this.delegate = delegate; - } - - @Override - public String topicNameFor(KeyspaceTable keyspaceTable, String prefix, String delimiter) { - return topicNames.computeIfAbsent(keyspaceTable, i -> delegate.topicNameFor(i, prefix, delimiter)); - } + public static TopicSelector defaultSelector(String prefix, String heartbeatPrefix) { + return TopicSelector.defaultSelector(prefix, heartbeatPrefix, ".", + (keyspaceTable, pref, delimiter) -> String.join(delimiter, pref, keyspaceTable.keyspace, keyspaceTable.table)); } } diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/KafkaRecordEmitter.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/KafkaRecordEmitter.java index beb7f8c94..977483d47 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/KafkaRecordEmitter.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/KafkaRecordEmitter.java @@ -19,6 +19,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.schema.TopicSelector; + /** * This emitter is responsible for emitting records to Kafka broker and managing offsets post send. */ @@ -26,7 +28,7 @@ public class KafkaRecordEmitter implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordEmitter.class); private final KafkaProducer producer; - private final CassandraTopicSelector topicSelector; + private final TopicSelector topicSelector; private final OffsetWriter offsetWriter; private final OffsetFlushPolicy offsetFlushPolicy; private final Map> futures = new LinkedHashMap<>(); @@ -36,11 +38,11 @@ public class KafkaRecordEmitter implements AutoCloseable { private Converter keyConverter; private Converter valueConverter; - public KafkaRecordEmitter(String kafkaTopicPrefix, Properties kafkaProperties, OffsetWriter offsetWriter, - Duration offsetFlushIntervalMs, long maxOffsetFlushSize, + public KafkaRecordEmitter(String kafkaTopicPrefix, String heartbeatPrefix, Properties kafkaProperties, + OffsetWriter offsetWriter, Duration offsetFlushIntervalMs, long maxOffsetFlushSize, Converter keyConverter, Converter valueConverter) { this.producer = new KafkaProducer<>(kafkaProperties); - this.topicSelector = CassandraTopicSelector.defaultSelector(kafkaTopicPrefix); + this.topicSelector = CassandraTopicSelector.defaultSelector(kafkaTopicPrefix, heartbeatPrefix); this.offsetWriter = offsetWriter; this.offsetFlushPolicy = offsetFlushIntervalMs.isZero() ? OffsetFlushPolicy.always() : OffsetFlushPolicy.periodic(offsetFlushIntervalMs, maxOffsetFlushSize); this.keyConverter = keyConverter; diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/KeyspaceTable.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/KeyspaceTable.java index 98292d8ad..42f6cd297 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/KeyspaceTable.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/KeyspaceTable.java @@ -9,10 +9,12 @@ import com.datastax.driver.core.TableMetadata; +import io.debezium.schema.DataCollectionId; + /** * The KeyspaceTable uniquely identifies each table in the Cassandra cluster */ -public class KeyspaceTable { +public class KeyspaceTable implements DataCollectionId { public final String keyspace; public final String table; @@ -51,4 +53,9 @@ public int hashCode() { public String toString() { return name(); } + + @Override + public String identifier() { + return keyspace + "." + table; + } } diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/QueueProcessor.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/QueueProcessor.java index b94a922ff..4a2d5e534 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/QueueProcessor.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/QueueProcessor.java @@ -35,6 +35,7 @@ public class QueueProcessor extends AbstractProcessor { public QueueProcessor(CassandraConnectorContext context) { this(context, new KafkaRecordEmitter( context.getCassandraConnectorConfig().kafkaTopicPrefix(), + context.getCassandraConnectorConfig().getHeartbeatTopicsPrefix(), context.getCassandraConnectorConfig().getKafkaConfigs(), context.getOffsetWriter(), context.getCassandraConnectorConfig().offsetFlushIntervalMs(),