DBZ-1405 Use Kafka Connect converters instead of serializers.

Bingqin Zhou 2019-09-09 14:23:53 -07:00 committed by Gunnar Morling
parent 58f43b4cae
commit 1287f51b46
5 changed files with 107 additions and 17 deletions

View File

@@ -8,21 +8,26 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kafka.clients.producer.ProducerConfig;
import com.datastax.driver.core.ConsistencyLevel;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorConfigException;
import com.datastax.driver.core.ConsistencyLevel;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.connect.storage.Converter;
/**
* All configs used by a Cassandra connector agent.
*/
public class CassandraConnectorConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordEmitter.class);
/**
* The set of predefined SnapshotMode options.
*/
@@ -68,6 +73,26 @@ public static Optional<SnapshotMode> fromText(String text) {
*/
public static final String KAFKA_PRODUCER_CONFIG_PREFIX = "kafka.producer.";
/**
* The prefix prepended to all Kafka key converter configurations, including schema registry.
*/
public static final String KEY_CONVERTER_PREFIX = "key.converter.";
/**
* The prefix prepended to all Kafka value converter configurations, including schema registry.
*/
public static final String VALUE_CONVERTER_PREFIX = "value.converter.";
/**
* Required config for Kafka key converter.
*/
public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
/**
* Required config for Kafka value converter.
*/
public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter";
/**
* Specifies the criteria for running a snapshot (eg. initial sync) upon startup of the cassandra connector agent.
* Must be one of 'INITIAL', 'ALWAYS', or 'NEVER'. The default snapshot mode is 'INITIAL'.
@@ -177,7 +202,7 @@ public static Optional<SnapshotMode> fromText(String text) {
* are not included in the offsets periodically recorded by this connector. Defaults to 8192, and should always be
* larger than the maximum batch size specified in the max.batch.size property.
* The capacity of the queue to hold deserialized {@link Record}
* before they are converted to Avro Records and emitted to Kafka.
* before they are converted to Kafka Connect Struct records and emitted to Kafka.
*/
public static final String MAX_QUEUE_SIZE = "max.queue.size";
public static final int DEFAULT_MAX_QUEUE_SIZE = 8192;
@@ -257,8 +282,8 @@ public Properties getKafkaConfigs() {
Properties props = new Properties();
// default configs
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
configs.entrySet().stream()
.filter(entry -> entry.getKey().toString().startsWith(KAFKA_PRODUCER_CONFIG_PREFIX))
@@ -413,6 +438,42 @@ public boolean tombstonesOnDelete() {
Boolean.parseBoolean((String) configs.get(TOMBSTONES_ON_DELETE)) : DEFAULT_TOMBSTONES_ON_DELETE;
}
public Converter getKeyConverter() throws CassandraConnectorConfigException {
try {
Class keyConverterClass = Class.forName((String) configs.get(KEY_CONVERTER_CLASS_CONFIG));
Converter keyConverter = (Converter) keyConverterClass.newInstance();
HashMap<String, Object> keyConverterConfigs = keyValueConverterConfigs(KEY_CONVERTER_PREFIX);
keyConverter.configure(keyConverterConfigs, true);
return keyConverter;
} catch (Exception e) {
throw new CassandraConnectorConfigException(e);
}
}
public Converter getValueConverter() throws CassandraConnectorConfigException {
try {
Class valueConverterClass = Class.forName((String) configs.get(VALUE_CONVERTER_CLASS_CONFIG));
Converter valueConverter = (Converter) valueConverterClass.newInstance();
HashMap<String, Object> valueConverterConfigs = keyValueConverterConfigs(VALUE_CONVERTER_PREFIX);
valueConverter.configure(valueConverterConfigs, false);
return valueConverter;
} catch (Exception e) {
throw new CassandraConnectorConfigException(e);
}
}
private HashMap<String, Object> keyValueConverterConfigs(String converterPrefix) {
HashMap<String, Object> converterConfigs = new HashMap<> ();
configs.entrySet().stream()
.filter(entry -> entry.getKey().toString().startsWith(converterPrefix))
.forEach(entry -> {
String k = entry.getKey().toString().replace(converterPrefix, "");
Object v = entry.getValue();
converterConfigs.put(k, v);
});
return converterConfigs;
}
@Override
public String toString() {
return configs.entrySet().stream()
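For reference, a minimal usage sketch (not part of this commit) of how the new converter settings might be wired through the connector properties. The converter classes and registry URL are illustrative and mirror the unit test added below; anything under the key/value converter prefix is forwarded to Converter.configure() with the prefix stripped.

Properties props = new Properties();
// Pick the converter implementations (any org.apache.kafka.connect.storage.Converter works).
props.put(CassandraConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "io.confluent.connect.avro.AvroConverter");
props.put(CassandraConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
// Prefixed settings are passed through to the converters; these two values are illustrative.
props.put(CassandraConnectorConfig.KEY_CONVERTER_PREFIX + "schema.registry.url", "http://localhost:8081");
props.put(CassandraConnectorConfig.VALUE_CONVERTER_PREFIX + "schemas.enable", "false");
CassandraConnectorConfig config = new CassandraConnectorConfig(props);
Converter keyConverter = config.getKeyConverter();     // AvroConverter configured with the registry URL
Converter valueConverter = config.getValueConverter(); // JsonConverter configured with schemas.enable=false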

View File

@@ -5,10 +5,10 @@
*/
package io.debezium.connector.cassandra;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,7 +25,7 @@
public class KafkaRecordEmitter implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordEmitter.class);
private final KafkaProducer<Struct, Struct> producer;
private final KafkaProducer<byte[], byte[]> producer;
private final CassandraTopicSelector topicSelector;
private final OffsetWriter offsetWriter;
private final OffsetFlushPolicy offsetFlushPolicy;
@@ -33,26 +33,34 @@ public class KafkaRecordEmitter implements AutoCloseable {
private final Object lock = new Object();
private long timeOfLastFlush;
private long emitCount = 0;
private Converter keyConverter;
private Converter valueConverter;
public KafkaRecordEmitter(String kafkaTopicPrefix, Properties kafkaProperties, OffsetWriter offsetWriter, Duration offsetFlushIntervalMs, long maxOffsetFlushSize) {
public KafkaRecordEmitter(String kafkaTopicPrefix, Properties kafkaProperties, OffsetWriter offsetWriter,
Duration offsetFlushIntervalMs, long maxOffsetFlushSize,
Converter keyConverter, Converter valueConverter) {
this.producer = new KafkaProducer<>(kafkaProperties);
this.topicSelector = CassandraTopicSelector.defaultSelector(kafkaTopicPrefix);
this.offsetWriter = offsetWriter;
this.offsetFlushPolicy = offsetFlushIntervalMs.isZero() ? OffsetFlushPolicy.always() : OffsetFlushPolicy.periodic(offsetFlushIntervalMs, maxOffsetFlushSize);
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
}
public void emit(Record record) {
synchronized (lock) {
ProducerRecord<Struct, Struct> producerRecord = toProducerRecord(record);
ProducerRecord<byte[], byte[]> producerRecord = toProducerRecord(record);
Future<RecordMetadata> future = producer.send(producerRecord);
futures.put(record, future);
maybeFlushAndMarkOffset();
}
}
private ProducerRecord<Struct, Struct> toProducerRecord(Record record) {
private ProducerRecord<byte[], byte[]> toProducerRecord(Record record) {
String topic = topicSelector.topicNameFor(record.getSource().keyspaceTable);
return new ProducerRecord<>(topic, record.buildKey(), record.buildValue());
byte[] serializedKey = keyConverter.fromConnectData(topic, record.getKeySchema(), record.buildKey());
byte[] serializedValue = valueConverter.fromConnectData(topic, record.getValueSchema(), record.buildValue());
return new ProducerRecord<>(topic, serializedKey, serializedValue);
}
private void maybeFlushAndMarkOffset() {
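To make the new emit path concrete, here is a standalone sketch (not the connector's actual wiring; the JsonConverter choice, topic name, schema, and broker address are assumptions) of how a Connect Struct is turned into bytes by a Converter and shipped with a producer that uses the ByteArraySerializer defaults set in CassandraConnectorConfig.getKafkaConfigs():

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.Converter;

public class ConverterEmitSketch {
    public static void main(String[] args) {
        // Configure a value converter (isKey = false), as getValueConverter() does.
        Converter valueConverter = new JsonConverter();
        valueConverter.configure(Collections.singletonMap("schemas.enable", "false"), false);

        // Build a Connect Struct, analogous to record.buildValue() with record.getValueSchema().
        Schema schema = SchemaBuilder.struct().field("id", Schema.INT32_SCHEMA).build();
        Struct value = new Struct(schema).put("id", 42);

        // Converter.fromConnectData() replaces the old KafkaAvroSerializer step.
        byte[] serializedValue = valueConverter.fromConnectData("test_topic", schema, value);

        // The producer now deals only in byte[], matching the ByteArraySerializer defaults.
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("test_topic", null, serializedValue));
        }
    }
}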

View File

@@ -35,7 +35,9 @@ public QueueProcessor(CassandraConnectorContext context) {
context.getCassandraConnectorConfig().getKafkaConfigs(),
context.getOffsetWriter(),
context.getCassandraConnectorConfig().offsetFlushIntervalMs(),
context.getCassandraConnectorConfig().maxOffsetFlushSize()
context.getCassandraConnectorConfig().maxOffsetFlushSize(),
context.getCassandraConnectorConfig().getKeyConverter(),
context.getCassandraConnectorConfig().getValueConverter()
));
}

View File

@@ -8,6 +8,7 @@
import org.junit.Test;
import java.util.Properties;
import java.util.HashMap;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -126,6 +127,26 @@ public void testConfigs() {
String transferClazz = "io.debezium.connector.cassandra.BlackHoleCommitLogTransfer";
config = buildTaskConfig(CassandraConnectorConfig.COMMIT_LOG_TRANSFER_CLASS, transferClazz);
assertEquals(transferClazz, config.getCommitLogTransfer().getClass().getName());
String keyConverterClass = "io.confluent.connect.avro.AvroConverter";
HashMap<String, Object> keyConverterConfigs = new HashMap<> ();
keyConverterConfigs.put(CassandraConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, keyConverterClass);
keyConverterConfigs.put(CassandraConnectorConfig.KEY_CONVERTER_PREFIX+"schema.registry.url", "http://localhost:8081");
config = buildTaskConfigs(keyConverterConfigs);
assertEquals(keyConverterClass, config.getKeyConverter().getClass().getName());
String valueConverterClass = "org.apache.kafka.connect.json.JsonConverter";
config = buildTaskConfig(CassandraConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, valueConverterClass);
assertEquals(valueConverterClass, config.getValueConverter().getClass().getName());
}
private CassandraConnectorConfig buildTaskConfigs(HashMap<String, Object> map) {
Properties props = new Properties();
for (String key : map.keySet()) {
props.put(key, map.get(key));
}
return new CassandraConnectorConfig(props);
}
private CassandraConnectorConfig buildTaskConfig(String key, Object value) {

View File

@@ -8,7 +8,6 @@
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TableMetadata;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.cassandraunit.utils.CqlOperations;
@@ -174,7 +173,6 @@ protected static Properties generateDefaultConfigMap() throws IOException {
props.put(CassandraConnectorConfig.CASSANDRA_PORT, String.valueOf(TEST_CASSANDRA_PORT));
props.put(CassandraConnectorConfig.OFFSET_BACKING_STORE_DIR, Files.createTempDirectory("offset").toString());
props.put(CassandraConnectorConfig.KAFKA_PRODUCER_CONFIG_PREFIX + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TEST_KAFKA_SERVERS);
props.put(CassandraConnectorConfig.KAFKA_PRODUCER_CONFIG_PREFIX + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, TEST_SCHEMA_REGISTRY_URL);
props.put(CassandraConnectorConfig.COMMIT_LOG_RELOCATION_DIR, Files.createTempDirectory("cdc_raw_relocation").toString());
return props;
}