DBZ-1407 Replacing custom CassandraTopicSelector with Debezium TopicSelector

This commit is contained in:
Bingqin Zhou 2020-04-01 01:42:24 -07:00 committed by GitHub
parent 82d7103acf
commit 90524d27e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 19 additions and 105 deletions

View File

@@ -5,110 +5,14 @@
*/
package io.debezium.connector.cassandra;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.schema.TopicSelector;
/**
* Responsible for selecting the Kafka topic that the record will get sent to.
*/
public class CassandraTopicSelector {
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraTopicSelector.class);
private static final String DEFAULT_DELIMITER = ".";
private final String prefix;
private final String delimiter;
private final TableTopicNamer tableTopicNamer;
private CassandraTopicSelector(String prefix, String delimiter, TableTopicNamer tableTopicNamer) {
this.prefix = prefix;
this.delimiter = delimiter;
this.tableTopicNamer = new TopicNameCache(new TopicNameSanitizer(tableTopicNamer));
}
static CassandraTopicSelector defaultSelector(String topicPrefix) {
return new CassandraTopicSelector(
topicPrefix,
DEFAULT_DELIMITER,
(keyspaceTable, prefix, delimiter) -> String.join(delimiter, prefix, keyspaceTable.keyspace, keyspaceTable.table));
}
String topicNameFor(KeyspaceTable keyspaceTable) {
return tableTopicNamer.topicNameFor(keyspaceTable, prefix, delimiter);
}
@FunctionalInterface
public interface TableTopicNamer {
String topicNameFor(KeyspaceTable keyspaceTable, String prefix, String delimiter);
}
/**
* A topic namer that replaces any characters invalid in a topic name with {@code _}.
*/
private static class TopicNameSanitizer implements TableTopicNamer {
private static final String REPLACEMENT_CHAR = "_";
private final TableTopicNamer delegate;
TopicNameSanitizer(TableTopicNamer delegate) {
this.delegate = delegate;
}
@Override
public String topicNameFor(KeyspaceTable keyspaceTable, String prefix, String delimiter) {
String topicName = delegate.topicNameFor(keyspaceTable, prefix, delimiter);
StringBuilder sanitizedNameBuilder = new StringBuilder(topicName.length());
boolean changed = false;
for (int i = 0; i < topicName.length(); i++) {
char c = topicName.charAt(i);
if (isValidTopicNameCharacter(c)) {
sanitizedNameBuilder.append(c);
}
else {
sanitizedNameBuilder.append(REPLACEMENT_CHAR);
changed = true;
}
}
if (changed) {
String sanitizedName = sanitizedNameBuilder.toString();
LOGGER.warn("Topic '{}' name isn't a valid topic, replacing it with '{}'", topicName, sanitizedName);
return sanitizedName;
}
else {
return topicName;
}
}
/**
* Whether the given character is a legal character of a Kafka topic name. Legal characters are
* {@code [a-zA-Z0-9._-]}.
*
* {@see https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java}
*/
private boolean isValidTopicNameCharacter(char c) {
return c == '.' || c == '_' || c == '-' || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9');
}
}
@ThreadSafe
private static class TopicNameCache implements TableTopicNamer {
private final ConcurrentHashMap<KeyspaceTable, String> topicNames;
private final TableTopicNamer delegate;
TopicNameCache(TableTopicNamer delegate) {
this.topicNames = new ConcurrentHashMap<>();
this.delegate = delegate;
}
@Override
public String topicNameFor(KeyspaceTable keyspaceTable, String prefix, String delimiter) {
return topicNames.computeIfAbsent(keyspaceTable, i -> delegate.topicNameFor(i, prefix, delimiter));
}
public static TopicSelector<KeyspaceTable> defaultSelector(String prefix, String heartbeatPrefix) {
return TopicSelector.defaultSelector(prefix, heartbeatPrefix, ".",
(keyspaceTable, pref, delimiter) -> String.join(delimiter, pref, keyspaceTable.keyspace, keyspaceTable.table));
}
}

View File

@@ -19,6 +19,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.schema.TopicSelector;
/**
* This emitter is responsible for emitting records to Kafka broker and managing offsets post send.
*/
@@ -26,7 +28,7 @@ public class KafkaRecordEmitter implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordEmitter.class);
private final KafkaProducer<byte[], byte[]> producer;
private final CassandraTopicSelector topicSelector;
private final TopicSelector<KeyspaceTable> topicSelector;
private final OffsetWriter offsetWriter;
private final OffsetFlushPolicy offsetFlushPolicy;
private final Map<Record, Future<RecordMetadata>> futures = new LinkedHashMap<>();
@@ -36,11 +38,11 @@ public class KafkaRecordEmitter implements AutoCloseable {
private Converter keyConverter;
private Converter valueConverter;
public KafkaRecordEmitter(String kafkaTopicPrefix, Properties kafkaProperties, OffsetWriter offsetWriter,
Duration offsetFlushIntervalMs, long maxOffsetFlushSize,
public KafkaRecordEmitter(String kafkaTopicPrefix, String heartbeatPrefix, Properties kafkaProperties,
OffsetWriter offsetWriter, Duration offsetFlushIntervalMs, long maxOffsetFlushSize,
Converter keyConverter, Converter valueConverter) {
this.producer = new KafkaProducer<>(kafkaProperties);
this.topicSelector = CassandraTopicSelector.defaultSelector(kafkaTopicPrefix);
this.topicSelector = CassandraTopicSelector.defaultSelector(kafkaTopicPrefix, heartbeatPrefix);
this.offsetWriter = offsetWriter;
this.offsetFlushPolicy = offsetFlushIntervalMs.isZero() ? OffsetFlushPolicy.always() : OffsetFlushPolicy.periodic(offsetFlushIntervalMs, maxOffsetFlushSize);
this.keyConverter = keyConverter;

View File

@@ -9,10 +9,12 @@
import com.datastax.driver.core.TableMetadata;
import io.debezium.schema.DataCollectionId;
/**
* The KeyspaceTable uniquely identifies each table in the Cassandra cluster
*/
public class KeyspaceTable {
public class KeyspaceTable implements DataCollectionId {
public final String keyspace;
public final String table;
@@ -51,4 +53,9 @@ public int hashCode() {
public String toString() {
return name();
}
@Override
public String identifier() {
return keyspace + "." + table;
}
}

View File

@@ -35,6 +35,7 @@ public class QueueProcessor extends AbstractProcessor {
public QueueProcessor(CassandraConnectorContext context) {
this(context, new KafkaRecordEmitter(
context.getCassandraConnectorConfig().kafkaTopicPrefix(),
context.getCassandraConnectorConfig().getHeartbeatTopicsPrefix(),
context.getCassandraConnectorConfig().getKafkaConfigs(),
context.getOffsetWriter(),
context.getCassandraConnectorConfig().offsetFlushIntervalMs(),