DBZ-1408 Applying project formatter

Gunnar Morling 2019-11-08 12:21:19 +01:00
parent e3466f78bd
commit 9408cc4d9d
65 changed files with 440 additions and 381 deletions

View File

@ -5,11 +5,11 @@
*/
package io.debezium.connector.cassandra;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* An abstract processor designed to be a convenient superclass for all concrete processors of the Cassandra
* connector task. The class handles concurrency control for starting and stopping the processor.
@ -36,12 +36,14 @@ public AbstractProcessor(String name, long delayMillis) {
/**
* Override initialize to initialize resources before starting the processor
*/
public void initialize() throws Exception { }
public void initialize() throws Exception {
}
/**
* Override destroy to clean up resources after stopping the processor
*/
public void destroy() throws Exception { }
public void destroy() throws Exception {
}
public boolean isRunning() {
return running;
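For orientation, a minimal sketch of a concrete subclass; the constructor signature, initialize() and destroy() come from the hunk above, while process() as the periodic hook is inferred from the test overrides later in this commit:

public class NoOpProcessor extends AbstractProcessor {
    public NoOpProcessor() {
        // processor name and poll delay, per the constructor shown above
        super("no-op-processor", 1000);
    }

    @Override
    public void initialize() {
        // acquire resources before the processor starts
    }

    public void process() {
        // inferred periodic hook: work performed while isRunning() is true
    }

    @Override
    public void destroy() {
        // release resources after the processor stops
    }
}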

View File

@ -5,15 +5,16 @@
*/
package io.debezium.connector.cassandra;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
/**
* This queue stores the change events sent from the readers and gets processed by {@link QueueProcessor}
* where the events get emitted to Kafka.
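The flow this comment describes, as a hedged sketch: the enqueue call appears verbatim in later hunks, while the draining method and emitter call are illustrative names only, not the confirmed project API:

// reader side: snapshot and commit log readers push change events,
// blocking when the queue (backed by a LinkedBlockingDeque) is full
queue.enqueue(event);

// QueueProcessor side: drain one event and hand it to the Kafka emitter
Event event = queue.poll();
emitter.emit(event);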

View File

@ -5,6 +5,18 @@
*/
package io.debezium.connector.cassandra;
import static com.codahale.metrics.MetricRegistry.name;
import static io.debezium.connector.cassandra.CassandraConnectorTask.METRIC_REGISTRY_INSTANCE;
import static io.debezium.connector.cassandra.network.SslContextFactory.createSslContext;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
@ -20,20 +32,9 @@
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.google.common.annotations.VisibleForTesting;
import io.netty.handler.ssl.SslContext;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static com.codahale.metrics.MetricRegistry.name;
import static io.debezium.connector.cassandra.CassandraConnectorTask.METRIC_REGISTRY_INSTANCE;
import static io.debezium.connector.cassandra.network.SslContextFactory.createSslContext;
/**
* A wrapper around the Cassandra driver that is used to query Cassandra tables and table schemas.
*/
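One call on this wrapper appears verbatim in the SchemaHolder hunk further down; a usage sketch with illustrative arguments:

// fetch the latest metadata for a table, used to detect schema changes
TableMetadata latest = cassandraClient.getCdcEnabledTableMetadata("my_keyspace", "my_table");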

View File

@ -12,17 +12,19 @@
import java.util.Properties;
import java.util.stream.Collectors;
import io.debezium.config.Field;
import io.debezium.config.Configuration;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.connect.storage.Converter;
import com.datastax.driver.core.ConsistencyLevel;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorConfigException;
import com.datastax.driver.core.ConsistencyLevel;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.serialization.ByteArraySerializer;
/**
* All configs used by a Cassandra connector agent.
@ -184,7 +186,6 @@ public static Optional<SnapshotMode> fromText(String text) {
public static final Field COMMIT_LOG_TRANSFER_CLASS = Field.create("commit.log.transfer.class")
.withType(Type.STRING).withDefault(DEFAULT_COMMIT_LOG_TRANSFER_CLASS);
/**
* The directory to store offset tracking files.
*/
@ -206,10 +207,10 @@ public static Optional<SnapshotMode> fromText(String text) {
public static final Field MAX_OFFSET_FLUSH_SIZE = Field.create("max.offset.flush.size")
.withType(Type.INT).withDefault(DEFAULT_MAX_OFFSET_FLUSH_SIZE);
/**
* Positive integer value that specifies the number of milliseconds the schema processor should wait before
* refreshing the cached Cassandra table schemas.
*/
public static final int DEFAULT_SCHEMA_POLL_INTERVAL_MS = 10000;
public static final Field SCHEMA_POLL_INTERVAL_MS = Field.create("schema.refresh.interval.ms")
.withType(Type.INT).withDefault(DEFAULT_SCHEMA_POLL_INTERVAL_MS);
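A hedged sketch of how the fields above are supplied: the property keys and the 10000 ms default come from this hunk, the Configuration.from(new Properties()) pattern appears in the test hunks near the end, and the transfer class name is hypothetical:

Properties props = new Properties();
// custom handling for fully processed commit logs (hypothetical implementation class)
props.setProperty("commit.log.transfer.class", "com.example.ArchivingCommitLogTransfer");
// cap the number of offsets written per flush cycle
props.setProperty("max.offset.flush.size", "200");
// re-read cached table schemas every 10 seconds (the default above)
props.setProperty("schema.refresh.interval.ms", "10000");

CassandraConnectorConfig config = new CassandraConnectorConfig(Configuration.from(props));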

View File

@ -5,13 +5,12 @@
*/
package io.debezium.connector.cassandra;
import java.io.IOException;
import java.security.GeneralSecurityException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import java.io.IOException;
import java.security.GeneralSecurityException;
/**
* Contains contextual information and objects scoped to the lifecycle
* of {@link CassandraConnectorTask} implementation.

View File

@ -5,21 +5,6 @@
*/
package io.debezium.connector.cassandra;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.health.HealthCheckRegistry;
import com.codahale.metrics.jmx.JmxReporter;
import com.codahale.metrics.servlets.HealthCheckServlet;
import com.codahale.metrics.servlets.MetricsServlet;
import com.codahale.metrics.servlets.PingServlet;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorConfigException;
import io.debezium.connector.cassandra.network.BuildInfoServlet;
import io.debezium.config.Configuration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
@ -30,6 +15,23 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.health.HealthCheckRegistry;
import com.codahale.metrics.jmx.JmxReporter;
import com.codahale.metrics.servlets.HealthCheckServlet;
import com.codahale.metrics.servlets.MetricsServlet;
import com.codahale.metrics.servlets.PingServlet;
import io.debezium.config.Configuration;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorConfigException;
import io.debezium.connector.cassandra.network.BuildInfoServlet;
/**
* A task that reads the Cassandra commit log in the CDC directory and generates corresponding data
* change events, which are emitted to Kafka. If the table has not been bootstrapped,

View File

@ -5,13 +5,13 @@
*/
package io.debezium.connector.cassandra;
import java.util.HashMap;
import java.util.Map;
import com.codahale.metrics.health.HealthCheck;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
public class CassandraConnectorTaskHealthCheck extends HealthCheck {
private static final ObjectMapper mapper = new ObjectMapper();
private final CassandraConnectorTask.ProcessorGroup processorGroup;

View File

@ -7,11 +7,11 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.AbstractSourceInfoStructMaker;
import io.debezium.util.SchemaNameAdjuster;
public class CassandraSourceInfoStructMaker extends AbstractSourceInfoStructMaker<SourceInfo> {
private final Schema schema;

View File

@ -5,11 +5,12 @@
*/
package io.debezium.connector.cassandra;
import io.debezium.annotation.ThreadSafe;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import io.debezium.annotation.ThreadSafe;
/**
* Responsible for selecting the Kafka topic that the record will get sent to.
@ -91,7 +92,7 @@ public String topicNameFor(KeyspaceTable keyspaceTable, String prefix, String de
* {@see https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java}
*/
private boolean isValidTopicNameCharacter(char c) {
return c == '.' || c == '_' || c == '-' || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9');
}
}
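A quick illustration of what the predicate admits, assuming callers use it to validate or sanitize candidate topic names (the sanitizing step itself is not part of this hunk):

isValidTopicNameCharacter('a'); // true: ASCII letters and digits pass
isValidTopicNameCharacter('-'); // true: '.', '_' and '-' are the only allowed punctuation
isValidTopicNameCharacter('$'); // false: any other character must be replaced or dropped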

View File

@ -5,15 +5,17 @@
*/
package io.debezium.connector.cassandra;
import com.datastax.driver.core.ColumnMetadata;
import io.debezium.connector.cassandra.transforms.CassandraTypeConverter;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import org.apache.kafka.connect.data.Struct;
import java.util.Objects;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.kafka.connect.data.Struct;
import java.util.Objects;
import com.datastax.driver.core.ColumnMetadata;
import io.debezium.connector.cassandra.transforms.CassandraTypeConverter;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
/**
* Cell-level data about the source event. Each cell contains the name, value and

View File

@ -5,9 +5,7 @@
*/
package io.debezium.connector.cassandra;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import java.io.File;
import java.io.IOException;
@ -16,8 +14,9 @@
import java.util.Arrays;
import java.util.Collections;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link CommitLogProcessor} is used to process commit logs in the CDC directory.

View File

@ -5,12 +5,12 @@
*/
package io.debezium.connector.cassandra;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import static io.debezium.connector.cassandra.CassandraConnectorTask.METRIC_REGISTRY_INSTANCE;
import java.util.concurrent.atomic.AtomicLong;
import static io.debezium.connector.cassandra.CassandraConnectorTask.METRIC_REGISTRY_INSTANCE;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
public class CommitLogProcessorMetrics {
private String commitLogFilename = null;

View File

@ -5,13 +5,19 @@
*/
package io.debezium.connector.cassandra;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.TableMetadata;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorSchemaException;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import io.debezium.time.Conversions;
import org.apache.kafka.connect.data.Schema;
import static io.debezium.connector.cassandra.CommitLogReadHandlerImpl.RowType.DELETE;
import static io.debezium.connector.cassandra.CommitLogReadHandlerImpl.RowType.INSERT;
import static io.debezium.connector.cassandra.CommitLogReadHandlerImpl.RowType.UPDATE;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.LivenessInfo;
@ -24,21 +30,17 @@
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.kafka.connect.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.TableMetadata;
import static io.debezium.connector.cassandra.CommitLogReadHandlerImpl.RowType.DELETE;
import static io.debezium.connector.cassandra.CommitLogReadHandlerImpl.RowType.INSERT;
import static io.debezium.connector.cassandra.CommitLogReadHandlerImpl.RowType.UPDATE;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorSchemaException;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import io.debezium.time.Conversions;
/**
* Handler that implements {@link CommitLogReadHandler} interface provided by Cassandra source code.
@ -58,7 +60,6 @@ public class CommitLogReadHandlerImpl implements CommitLogReadHandler {
private final SchemaHolder schemaHolder;
private final CommitLogProcessorMetrics metrics;
CommitLogReadHandlerImpl(SchemaHolder schemaHolder,
BlockingEventQueue<Event> queue,
OffsetWriter offsetWriter,
@ -166,7 +167,6 @@ enum RowType {
*/
RANGE_TOMBSTONE,
/**
* Unknown row-level operation
*/
@ -330,7 +330,8 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo
after.addCell(cellData);
}
recordMaker.getSourceInfo().update(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false, Conversions.toInstantFromMicros(pu.maxTimestamp()));
recordMaker.getSourceInfo().update(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(pu.maxTimestamp()));
recordMaker.delete(after, keySchema, valueSchema, MARK_OFFSET, queue::enqueue);
}
catch (Exception e) {
@ -449,7 +450,7 @@ private static List<Object> getPartitionKeys(PartitionUpdate pu) {
Object value = CassandraTypeDeserializer.deserialize(type, bb);
values.add(value);
// composite partition key
}
else {
ByteBuffer keyBytes = pu.partitionKey().getKey().duplicate();
@ -463,9 +464,9 @@ private static List<Object> getPartitionKeys(PartitionUpdate pu) {
}
// the encoding of columns in the partition key byte buffer is
// <col><col><col>...
// where <col> is:
// <length of value><value><end-of-component byte>
// <length of value> is a 2 bytes unsigned short (excluding 0xFFFF used to encode "static columns")
// <end-of-component byte> should always be 0 for columns (1 for query bounds)
// this section reads the bytes for each column and deserializes them into objects based on each column type
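A hedged sketch of the loop those comments describe: per column, read a 2-byte unsigned length, then that many value bytes, then skip the end-of-component byte. The column-iteration call and variable names are illustrative; the deserialize call shape comes from this hunk:

ByteBuffer keyBytes = pu.partitionKey().getKey().duplicate();
for (ColumnDefinition cd : pu.metadata().partitionKeyColumns()) {
    int length = keyBytes.getShort() & 0xFFFF; // <length of value>, 2-byte unsigned short
    byte[] value = new byte[length];
    keyBytes.get(value);                       // <value>
    keyBytes.get();                            // <end-of-component byte>, 0 for columns
    values.add(CassandraTypeDeserializer.deserialize(cd.type, ByteBuffer.wrap(value)));
}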

View File

@ -16,13 +16,15 @@ public interface CommitLogTransfer {
/**
* Initialize resources required by the commit log transfer
*/
default void init(Properties commitLogTransferConfigs) throws Exception { }
default void init(Properties commitLogTransferConfigs) throws Exception {
}
/**
* Destroy resources used by the commit log transfer
*/
default void destroy() throws Exception { }
default void destroy() throws Exception {
}
/**
* Transfer a commit log that has been successfully processed.
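A hedged sketch of an implementation; onSuccessTransfer and onErrorTransfer are taken from the overrides visible in the CommitLogPostProcessorTest hunk below, and the archive-directory plumbing is illustrative:

public class ArchivingCommitLogTransfer implements CommitLogTransfer {
    private File archiveDir;

    @Override
    public void init(Properties commitLogTransferConfigs) {
        // "archive.dir" is a hypothetical property key for this sketch
        archiveDir = new File(commitLogTransferConfigs.getProperty("archive.dir"));
    }

    @Override
    public void onSuccessTransfer(File file) {
        // move a fully processed commit log out of the CDC directory
    }

    @Override
    public void onErrorTransfer(File file) {
        // park a commit log that failed processing for later inspection
    }
}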

View File

@ -5,9 +5,7 @@
*/
package io.debezium.connector.cassandra;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorDataException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import java.io.File;
import java.nio.file.Files;
@ -15,7 +13,10 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorDataException;
/**
* Utility class used by the {@link CommitLogProcessor} to compare/delete commit log files.
@ -25,7 +26,8 @@ public final class CommitLogUtil {
private static final Pattern FILENAME_REGEX_PATTERN = Pattern.compile("CommitLog-\\d+-(\\d+).log");
private CommitLogUtil() { }
private CommitLogUtil() {
}
/**
* Move a commit log to a new directory. If the commit log already exists in the new directory, it would be replaced.

View File

@ -5,11 +5,6 @@
*/
package io.debezium.connector.cassandra;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorConfigException;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@ -23,6 +18,12 @@
import java.nio.file.StandardOpenOption;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorConfigException;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
/**
* A concrete implementation of {@link OffsetWriter} which tracks the progress of events
* being processed by the {@link SnapshotProcessor} and {@link CommitLogProcessor} to
@ -171,7 +172,8 @@ private FileLock init(File offsetFile) throws IOException {
FileChannel channel = FileChannel.open(offsetFile.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE);
FileLock lock = channel.tryLock();
if (lock == null) {
throw new CassandraConnectorTaskException("Failed to acquire file lock on " + offsetFile.getName() + ". There might be another Cassandra Connector Task running");
throw new CassandraConnectorTaskException(
"Failed to acquire file lock on " + offsetFile.getName() + ". There might be another Cassandra Connector Task running");
}
return lock;
}
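How the writer is driven, pieced together from calls that appear verbatim in the FileOffsetWriterTest hunk below:

// record that the event at this position has been processed
offsetWriter.markOffset(
        record.getSource().keyspaceTable.name(),
        record.getSource().offsetPosition.serialize(),
        record.getSource().snapshot);

// later, check whether a position was already handled (e.g. on restart)
boolean seen = offsetWriter.isOffsetProcessed(
        record.getSource().keyspaceTable.name(),
        record.getSource().offsetPosition.serialize(),
        record.getSource().snapshot);

// persist the in-memory offsets to the offset files
offsetWriter.flush();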

View File

@ -5,9 +5,8 @@
*/
package io.debezium.connector.cassandra;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
/**
* An interface that indicates the record can be converted to a {@link Struct}.

View File

@ -5,13 +5,6 @@
*/
package io.debezium.connector.cassandra;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
@ -19,6 +12,13 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This emitter is responsible for emitting records to Kafka broker and managing offsets post send.
*/

View File

@ -5,10 +5,10 @@
*/
package io.debezium.connector.cassandra;
import com.datastax.driver.core.TableMetadata;
import java.util.Objects;
import com.datastax.driver.core.TableMetadata;
/**
* The KeyspaceTable uniquely identifies each table in the Cassandra cluster
*/

View File

@ -5,10 +5,10 @@
*/
package io.debezium.connector.cassandra;
import io.debezium.util.IoUtil;
import java.util.Properties;
import io.debezium.util.IoUtil;
/**
* Information about this module.
*/
@ -16,7 +16,6 @@ public final class Module {
private static final Properties INFO = IoUtil.loadProperties(Module.class, "io/debezium/connector/cassandra/build.version");
public static String version() {
return INFO.getProperty("version");
}

View File

@ -5,12 +5,12 @@
*/
package io.debezium.connector.cassandra;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorDataException;
import java.io.File;
import java.util.Arrays;
import java.util.Objects;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorDataException;
/**
* The OffsetPosition uniquely identifies a specific {@link org.apache.cassandra.db.Mutation} in a specific commit log.
*/
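Constructor and serialization usage as they appear in the test hunks below; the numbers are illustrative:

// a position is a commit log file name plus an offset within that file
OffsetPosition pos = new OffsetPosition("CommitLog-6-123.log", 42);
String serialized = pos.serialize(); // the form persisted by FileOffsetWriter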

View File

@ -33,7 +33,6 @@ public interface OffsetWriter {
*/
void flush();
/**
* Close all resources used by this class.
*/

View File

@ -5,16 +5,18 @@
*/
package io.debezium.connector.cassandra;
import com.google.common.annotations.VisibleForTesting;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
/**
* A thread that constantly polls records from the queue and emits them to Kafka via the KafkaRecordEmitter.
* The processor is also responsible for marking the offset to file and deleting the commit log files.
@ -37,8 +39,7 @@ public QueueProcessor(CassandraConnectorContext context) {
context.getCassandraConnectorConfig().offsetFlushIntervalMs(),
context.getCassandraConnectorConfig().maxOffsetFlushSize(),
context.getCassandraConnectorConfig().getKeyConverter(),
context.getCassandraConnectorConfig().getValueConverter()
));
context.getCassandraConnectorConfig().getValueConverter()));
}
@VisibleForTesting

View File

@ -5,13 +5,13 @@
*/
package io.debezium.connector.cassandra;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Schema;
import static io.debezium.connector.cassandra.SchemaHolder.getFieldSchema;
import java.util.List;
import java.util.Objects;
import static io.debezium.connector.cassandra.SchemaHolder.getFieldSchema;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
/**
* An immutable data structure representing a change event, and can be converted

View File

@ -5,10 +5,10 @@
*/
package io.debezium.connector.cassandra;
import org.apache.kafka.connect.data.Schema;
import java.util.function.Consumer;
import org.apache.kafka.connect.data.Schema;
/**
* Responsible for generating ChangeRecord and/or TombstoneRecord for create/update/delete events, as well as EOF events.
*/
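The insert and delete paths are exercised verbatim in the CommitLogReadHandlerImpl and SnapshotReader hunks of this commit; condensed:

// emit a row insert; the consumer enqueues the resulting record
recordMaker.insert(after, keySchema, valueSchema, markOffset, queue::enqueue);
// emit a partition deletion
recordMaker.delete(after, keySchema, valueSchema, MARK_OFFSET, queue::enqueue);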

View File

@ -5,12 +5,7 @@
*/
package io.debezium.connector.cassandra;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.TableMetadata;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Field;
import static io.debezium.connector.cassandra.SchemaHolder.getFieldSchema;
import java.util.LinkedHashMap;
import java.util.List;
@ -18,7 +13,13 @@
import java.util.Objects;
import java.util.stream.Collectors;
import static io.debezium.connector.cassandra.SchemaHolder.getFieldSchema;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.TableMetadata;
/**
* Row-level data about the source event. Contains a map where the key is the table column

View File

@ -5,23 +5,25 @@
*/
package io.debezium.connector.cassandra;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.TableMetadata;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorSchemaException;
import io.debezium.connector.cassandra.transforms.CassandraTypeConverter;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.TableMetadata;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorSchemaException;
import io.debezium.connector.cassandra.transforms.CassandraTypeConverter;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
/**
* Caches the key and value schema for all CDC-enabled tables. This cache gets updated
@ -82,7 +84,7 @@ public static Schema getFieldSchema(String fieldName, Schema schema) {
private void refreshSchema(KeyspaceTable keyspaceTable) {
LOGGER.debug("Refreshing schema for {}", keyspaceTable);
TableMetadata existing = tableToKVSchemaMap.containsKey(keyspaceTable) ? tableToKVSchemaMap.get(keyspaceTable).tableMetadata() : null;
TableMetadata latest = cassandraClient.getCdcEnabledTableMetadata(keyspaceTable.keyspace, keyspaceTable.table);
if (existing != latest) {
if (existing == null) {

View File

@ -5,23 +5,6 @@
*/
package io.debezium.connector.cassandra;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import io.debezium.time.Conversions;
import org.apache.kafka.connect.data.Schema;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.HashSet;
@ -31,6 +14,24 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.kafka.connect.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import io.debezium.time.Conversions;
/**
* This reader is responsible for initial bootstrapping of a table,
@ -85,7 +86,7 @@ public void destroy() {
@Override
public void process() {
if (snapshotMode ==CassandraConnectorConfig.SnapshotMode.ALWAYS) {
if (snapshotMode == CassandraConnectorConfig.SnapshotMode.ALWAYS) {
snapshot();
}
else if (snapshotMode == CassandraConnectorConfig.SnapshotMode.INITIAL && initial) {
@ -210,7 +211,8 @@ private void processResultSet(TableMetadata tableMetadata, ResultSet resultSet)
RowData after = extractRowData(row, tableMetadata.getColumns(), partitionKeyNames, clusteringKeyNames, writeTimeHolder);
// only mark offset if there are no more rows left
boolean markOffset = !rowIter.hasNext();
recordMaker.getSourceInfo().update(DatabaseDescriptor.getClusterName(), OffsetPosition.defaultOffsetPosition(), keyspaceTable, true, Conversions.toInstantFromMicros(writeTimeHolder.get()));
recordMaker.getSourceInfo().update(DatabaseDescriptor.getClusterName(), OffsetPosition.defaultOffsetPosition(), keyspaceTable, true,
Conversions.toInstantFromMicros(writeTimeHolder.get()));
recordMaker.insert(after, keySchema, valueSchema, markOffset, queue::enqueue);
rowNum++;
if (rowNum % 10_000 == 0) {
@ -230,7 +232,8 @@ private void processResultSet(TableMetadata tableMetadata, ResultSet resultSet)
/**
* This function extracts the relevant row data from {@link Row} and updates the maximum writetime for each row.
*/
private static RowData extractRowData(Row row, List<ColumnMetadata> columns, Set<String> partitionKeyNames, Set<String> clusteringKeyNames, WriteTimeHolder writeTimeHolder) {
private static RowData extractRowData(Row row, List<ColumnMetadata> columns, Set<String> partitionKeyNames, Set<String> clusteringKeyNames,
WriteTimeHolder writeTimeHolder) {
RowData rowData = new RowData();
Object executionTime = readExecutionTime(row);

View File

@ -5,7 +5,7 @@
*/
package io.debezium.connector.cassandra;
import com.codahale.metrics.Gauge;
import static io.debezium.connector.cassandra.CassandraConnectorTask.METRIC_REGISTRY_INSTANCE;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -14,7 +14,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static io.debezium.connector.cassandra.CassandraConnectorTask.METRIC_REGISTRY_INSTANCE;
import com.codahale.metrics.Gauge;
public class SnapshotProcessorMetrics {
private final AtomicInteger tableCount = new AtomicInteger();

View File

@ -5,17 +5,16 @@
*/
package io.debezium.connector.cassandra;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SnapshotRecord;
import io.debezium.time.Conversions;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.time.Instant;
/**
* Metadata about the source of the change event
*/

View File

@ -5,16 +5,17 @@
*/
package io.debezium.connector.cassandra.network;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Map;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.fasterxml.jackson.databind.ObjectMapper;
public class BuildInfoServlet extends HttpServlet {
private static final String CONTENT_TYPE = "application/json";
private static final String CACHE_CONTROL = "Cache-Control";
@ -30,7 +31,8 @@ public BuildInfoServlet(Map<String, String> buildInfo) {
@Override
protected void doGet(HttpServletRequest req,
HttpServletResponse resp) throws IOException {
HttpServletResponse resp)
throws IOException {
resp.setContentType(CONTENT_TYPE);
resp.setHeader(CACHE_CONTROL, NO_CACHE);
resp.setStatus(HttpServletResponse.SC_OK);

View File

@ -5,25 +5,29 @@
*/
package io.debezium.connector.cassandra.network;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorConfigException;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Properties;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorConfigException;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
public class SslContextFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(SslContextFactory.class);
private SslContextFactory() { }
private SslContextFactory() {
}
/**
* Return an {@link SslContext} containing all SSL configurations parsed

View File

@ -5,14 +5,9 @@
*/
package io.debezium.connector.cassandra.transforms;
import com.datastax.driver.core.DataType;
import io.debezium.connector.cassandra.transforms.type.converter.BasicTypeConverter;
import io.debezium.connector.cassandra.transforms.type.converter.ListTypeConverter;
import io.debezium.connector.cassandra.transforms.type.converter.MapTypeConverter;
import io.debezium.connector.cassandra.transforms.type.converter.SetTypeConverter;
import io.debezium.connector.cassandra.transforms.type.converter.TupleTypeConverter;
import io.debezium.connector.cassandra.transforms.type.converter.TypeConverter;
import io.debezium.connector.cassandra.transforms.type.converter.UserTypeConverter;
import java.util.HashMap;
import java.util.Map;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BooleanType;
@ -34,12 +29,20 @@
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UUIDType;
import java.util.HashMap;
import java.util.Map;
import com.datastax.driver.core.DataType;
import io.debezium.connector.cassandra.transforms.type.converter.BasicTypeConverter;
import io.debezium.connector.cassandra.transforms.type.converter.ListTypeConverter;
import io.debezium.connector.cassandra.transforms.type.converter.MapTypeConverter;
import io.debezium.connector.cassandra.transforms.type.converter.SetTypeConverter;
import io.debezium.connector.cassandra.transforms.type.converter.TupleTypeConverter;
import io.debezium.connector.cassandra.transforms.type.converter.TypeConverter;
import io.debezium.connector.cassandra.transforms.type.converter.UserTypeConverter;
public final class CassandraTypeConverter {
private CassandraTypeConverter() { }
private CassandraTypeConverter() {
}
private static final Map<DataType.Name, TypeConverter<?>> typeMap = new HashMap<>();
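The entry point implied by the map above, with the call shape taken from the tuple and user-type converters later in this diff:

// map a driver-level DataType onto Cassandra's internal AbstractType
AbstractType<?> abstractType = CassandraTypeConverter.convert(dataType);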

View File

@ -58,7 +58,8 @@
@Immutable
public final class CassandraTypeDeserializer {
private CassandraTypeDeserializer() { }
private CassandraTypeDeserializer() {
}
private static final Map<Class<? extends AbstractType>, TypeDeserializer> TYPE_MAP;

View File

@ -11,7 +11,8 @@
public final class UuidUtil {
private static final int BYTE_SIZE = 16;
private UuidUtil() { }
private UuidUtil() {
}
public static UUID asUuid(final byte[] bytes) {
ByteBuffer bb = ByteBuffer.wrap(bytes);
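The hunk is cut off here; the conventional completion, reading the two halves of the 16-byte buffer, would be along these lines (a sketch, not necessarily the exact project code):

long high = bb.getLong(); // first 8 bytes: most significant bits
long low = bb.getLong();  // last 8 bytes: least significant bits
return new UUID(high, low);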

View File

@ -5,9 +5,10 @@
*/
package io.debezium.connector.cassandra.transforms.type.converter;
import com.datastax.driver.core.DataType;
import org.apache.cassandra.db.marshal.AbstractType;
import com.datastax.driver.core.DataType;
public class BasicTypeConverter<T extends AbstractType<?>> implements TypeConverter<T> {
private T abstractType;

View File

@ -5,11 +5,13 @@
*/
package io.debezium.connector.cassandra.transforms.type.converter;
import com.datastax.driver.core.DataType;
import io.debezium.connector.cassandra.transforms.CassandraTypeConverter;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.ListType;
import com.datastax.driver.core.DataType;
import io.debezium.connector.cassandra.transforms.CassandraTypeConverter;
public class ListTypeConverter implements TypeConverter<ListType<?>> {
@Override

View File

@ -5,11 +5,13 @@
*/
package io.debezium.connector.cassandra.transforms.type.converter;
import com.datastax.driver.core.DataType;
import io.debezium.connector.cassandra.transforms.CassandraTypeConverter;
import java.util.List;
import org.apache.cassandra.db.marshal.MapType;
import java.util.List;
import com.datastax.driver.core.DataType;
import io.debezium.connector.cassandra.transforms.CassandraTypeConverter;
public class MapTypeConverter implements TypeConverter<MapType<?, ?>> {
@Override

View File

@ -5,11 +5,13 @@
*/
package io.debezium.connector.cassandra.transforms.type.converter;
import com.datastax.driver.core.DataType;
import io.debezium.connector.cassandra.transforms.CassandraTypeConverter;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.SetType;
import com.datastax.driver.core.DataType;
import io.debezium.connector.cassandra.transforms.CassandraTypeConverter;
public class SetTypeConverter implements TypeConverter<SetType<?>> {
@Override
public SetType convert(DataType dataType) {

View File

@ -5,13 +5,15 @@
*/
package io.debezium.connector.cassandra.transforms.type.converter;
import com.datastax.driver.core.DataType;
import io.debezium.connector.cassandra.transforms.CassandraTypeConverter;
import java.util.ArrayList;
import java.util.List;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.TupleType;
import java.util.ArrayList;
import java.util.List;
import com.datastax.driver.core.DataType;
import io.debezium.connector.cassandra.transforms.CassandraTypeConverter;
public class TupleTypeConverter implements TypeConverter<TupleType> {
@Override
@ -19,7 +21,7 @@ public TupleType convert(DataType dataType) {
com.datastax.driver.core.TupleType tupleDataType = (com.datastax.driver.core.TupleType) dataType;
List<DataType> innerTypes = tupleDataType.getComponentTypes();
List<AbstractType<?>> innerAbstractTypes = new ArrayList<>(innerTypes.size());
for (DataType dt: innerTypes) {
for (DataType dt : innerTypes) {
innerAbstractTypes.add(CassandraTypeConverter.convert(dt));
}
return new TupleType(innerAbstractTypes);

View File

@ -5,9 +5,10 @@
*/
package io.debezium.connector.cassandra.transforms.type.converter;
import com.datastax.driver.core.DataType;
import org.apache.cassandra.db.marshal.AbstractType;
import com.datastax.driver.core.DataType;
public interface TypeConverter<T extends AbstractType<?>> {
T convert(DataType dataType);

View File

@ -5,17 +5,19 @@
*/
package io.debezium.connector.cassandra.transforms.type.converter;
import com.datastax.driver.core.DataType;
import io.debezium.connector.cassandra.transforms.CassandraTypeConverter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.cassandra.cql3.FieldIdentifier;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UserType;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import com.datastax.driver.core.DataType;
import io.debezium.connector.cassandra.transforms.CassandraTypeConverter;
public class UserTypeConverter implements TypeConverter<UserType> {
@ -24,7 +26,7 @@ public UserType convert(DataType dataType) {
com.datastax.driver.core.UserType userType = (com.datastax.driver.core.UserType) dataType;
List<DataType> innerTypes = dataType.getTypeArguments();
List<AbstractType<?>> innerAbstractTypes = new ArrayList<>(innerTypes.size());
for (DataType dt: innerTypes) {
for (DataType dt : innerTypes) {
innerAbstractTypes.add(CassandraTypeConverter.convert(dt));
}
@ -34,15 +36,15 @@ public UserType convert(DataType dataType) {
ByteBuffer typeNameBuffer = UTF8Type.instance.fromString(typeNameString);
List<FieldIdentifier> fieldIdentifiers = new ArrayList<>(fieldNames.size());
for (String fieldName: fieldNames) {
for (String fieldName : fieldNames) {
fieldIdentifiers.add(FieldIdentifier.forInternalString(fieldName));
}
return new UserType(userType.getKeyspace(),
typeNameBuffer,
fieldIdentifiers,
innerAbstractTypes,
!userType.isFrozen());
}
}

View File

@ -5,12 +5,13 @@
*/
package io.debezium.connector.cassandra.transforms.type.deserializer;
import io.debezium.connector.cassandra.transforms.CassandraTypeKafkaSchemaBuilders;
import org.apache.cassandra.db.marshal.AbstractType;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import org.apache.cassandra.db.marshal.AbstractType;
import io.debezium.connector.cassandra.transforms.CassandraTypeKafkaSchemaBuilders;
public class InetAddressDeserializer extends BasicTypeDeserializer {
public InetAddressDeserializer() {

View File

@ -5,15 +5,16 @@
*/
package io.debezium.connector.cassandra.transforms.type.deserializer;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.ListType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Values;
import java.nio.ByteBuffer;
import java.util.List;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
public class ListTypeDeserializer extends TypeDeserializer {

View File

@ -5,15 +5,16 @@
*/
package io.debezium.connector.cassandra.transforms.type.deserializer;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Values;
import java.nio.ByteBuffer;
import java.util.Map;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
public class MapTypeDeserializer extends TypeDeserializer {

View File

@ -5,17 +5,18 @@
*/
package io.debezium.connector.cassandra.transforms.type.deserializer;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Values;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.List;
import java.util.ArrayList;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
public class SetTypeDeserializer extends TypeDeserializer {
@ -23,7 +24,7 @@ public class SetTypeDeserializer extends TypeDeserializer {
@SuppressWarnings("unchecked")
public Object deserialize(AbstractType<?> abstractType, ByteBuffer bb) {
Set<?> deserializedSet = (Set<?>) super.deserialize(abstractType, bb);
List<?> deserializedList = (new ArrayList<> (deserializedSet));
List<?> deserializedList = (new ArrayList<>(deserializedSet));
return Values.convertToList(getSchemaBuilder(abstractType).build(), deserializedList);
}
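The detour through ArrayList is presumably because Kafka Connect models collections as lists and maps only, so a Cassandra set has to be surfaced as a list before Values.convertToList can apply the array schema built for the element type.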

View File

@ -5,14 +5,15 @@
*/
package io.debezium.connector.cassandra.transforms.type.deserializer;
import io.debezium.connector.cassandra.transforms.UuidUtil;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.kafka.connect.data.Values;
import org.apache.kafka.connect.data.SchemaBuilder;
import static io.debezium.connector.cassandra.transforms.CassandraTypeKafkaSchemaBuilders.UUID_TYPE;
import java.nio.ByteBuffer;
import static io.debezium.connector.cassandra.transforms.CassandraTypeKafkaSchemaBuilders.UUID_TYPE;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Values;
import io.debezium.connector.cassandra.transforms.UuidUtil;
public class TimeUUIDTypeDeserializer extends TypeDeserializer {

View File

@ -5,12 +5,13 @@
*/
package io.debezium.connector.cassandra.transforms.type.deserializer;
import io.debezium.connector.cassandra.transforms.CassandraTypeKafkaSchemaBuilders;
import java.nio.ByteBuffer;
import java.util.Date;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.kafka.connect.data.SchemaBuilder;
import java.nio.ByteBuffer;
import java.util.Date;
import io.debezium.connector.cassandra.transforms.CassandraTypeKafkaSchemaBuilders;
public class TimestampTypeDeserializer extends TypeDeserializer {

View File

@ -5,14 +5,15 @@
*/
package io.debezium.connector.cassandra.transforms.type.deserializer;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.TupleType;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import java.nio.ByteBuffer;
import java.util.List;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
public class TupleTypeDeserializer extends TypeDeserializer {
@ -31,8 +32,7 @@ public Object deserialize(AbstractType<?> abstractType, ByteBuffer bb) {
for (int i = 0; i < innerTypes.size(); i++) {
AbstractType<?> currentInnerType = innerTypes.get(i);
String fieldName = createFieldNameForIndex(i);
Object deserializedInnerObject =
CassandraTypeDeserializer.deserialize(currentInnerType, innerValueByteBuffers[i]);
Object deserializedInnerObject = CassandraTypeDeserializer.deserialize(currentInnerType, innerValueByteBuffers[i]);
struct.put(fieldName, deserializedInnerObject);
}

View File

@ -5,11 +5,11 @@
*/
package io.debezium.connector.cassandra.transforms.type.deserializer;
import java.nio.ByteBuffer;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.kafka.connect.data.SchemaBuilder;
import java.nio.ByteBuffer;
public abstract class TypeDeserializer {
public Object deserialize(AbstractType<?> abstractType, ByteBuffer bb) {

View File

@ -5,14 +5,15 @@
*/
package io.debezium.connector.cassandra.transforms.type.deserializer;
import io.debezium.connector.cassandra.transforms.UuidUtil;
import org.apache.cassandra.db.marshal.AbstractType;
import static io.debezium.connector.cassandra.transforms.CassandraTypeKafkaSchemaBuilders.UUID_TYPE;
import java.nio.ByteBuffer;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Values;
import static io.debezium.connector.cassandra.transforms.CassandraTypeKafkaSchemaBuilders.UUID_TYPE;
import io.debezium.connector.cassandra.transforms.UuidUtil;
public class UUIDTypeDeserializer extends TypeDeserializer {

View File

@ -5,7 +5,9 @@
*/
package io.debezium.connector.cassandra.transforms.type.deserializer;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.cassandra.cql3.UserTypes;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.UserType;
@ -13,8 +15,7 @@
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import java.nio.ByteBuffer;
import java.util.List;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
public class UserTypeDeserializer extends TypeDeserializer {

View File

@ -5,18 +5,18 @@
*/
package io.debezium.connector.cassandra;
import io.debezium.config.Configuration;
import org.junit.Test;
import java.util.Properties;
import java.util.HashMap;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.Properties;
import org.junit.Test;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.debezium.config.Configuration;
public class CassandraConnectorConfigTest {
@Test
@ -133,7 +133,7 @@ public void testConfigs() {
String keyConverterClass = "io.confluent.connect.avro.AvroConverter";
HashMap<String, Object> keyConverterConfigs = new HashMap<>();
keyConverterConfigs.put(CassandraConnectorConfig.KEY_CONVERTER_CLASS_CONFIG.name(), keyConverterClass);
keyConverterConfigs.put(CassandraConnectorConfig.KEY_CONVERTER_PREFIX+AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
keyConverterConfigs.put(CassandraConnectorConfig.KEY_CONVERTER_PREFIX + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
config = buildTaskConfigs(keyConverterConfigs);
assertEquals(keyConverterClass, config.getKeyConverter().getClass().getName());

View File

@ -5,14 +5,14 @@
*/
package io.debezium.connector.cassandra;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
public class CassandraConnectorTaskTest {
@Test(timeout = 60000)
@ -26,6 +26,7 @@ public void testProcessorGroup() throws Exception {
public void initialize() {
running.incrementAndGet();
}
@Override
public void destroy() {
running.decrementAndGet();
@ -41,6 +42,7 @@ public void process() {
public void initialize() {
running.incrementAndGet();
}
@Override
public void destroy() {
running.decrementAndGet();

View File

@ -5,15 +5,16 @@
*/
package io.debezium.connector.cassandra;
import io.debezium.config.Configuration;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import org.junit.Test;
import io.debezium.config.Configuration;
public class CommitLogPostProcessorTest extends EmbeddedCassandraConnectorTestBase {
@ -29,6 +30,7 @@ public void testPostProcessor() throws Exception {
public void onSuccessTransfer(File file) {
archivedFileCount.incrementAndGet();
}
@Override
public void onErrorTransfer(File file) {
errorFileCount.incrementAndGet();

View File

@ -5,6 +5,13 @@
*/
package io.debezium.connector.cassandra;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.List;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
@ -17,13 +24,6 @@
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class CommitLogProcessorTest extends EmbeddedCassandraConnectorTestBase {
private CassandraConnectorContext context;
private CommitLogProcessor commitLogProcessor;
@ -67,7 +67,7 @@ public void testProcessCommitLogs() throws Exception {
// process the logs in commit log directory
File cdcLoc = new File(DatabaseDescriptor.getCommitLogLocation());
File[] commitLogs = CommitLogUtil.getCommitLogs(cdcLoc);
for (File commitLog: commitLogs) {
for (File commitLog : commitLogs) {
commitLogProcessor.processCommitLog(commitLog);
}

View File

@ -5,17 +5,6 @@
*/
package io.debezium.connector.cassandra;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TableMetadata;
import io.debezium.config.Configuration;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.cassandraunit.utils.CqlOperations;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
@ -26,6 +15,19 @@
import java.util.Map;
import java.util.Properties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.cassandraunit.utils.CqlOperations;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TableMetadata;
import io.debezium.config.Configuration;
/**
* Base class used to automatically spin up a single-node embedded Cassandra cluster before tests
* and handle cleanup after tests.

View File

@ -5,12 +5,9 @@
*/
package io.debezium.connector.cassandra;
import io.debezium.config.Configuration;
import io.debezium.time.Conversions;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import org.apache.kafka.connect.data.Schema;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.FileInputStream;
import java.io.IOException;
@ -18,9 +15,13 @@
import java.nio.file.Path;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.kafka.connect.data.Schema;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import io.debezium.time.Conversions;
public class FileOffsetWriterTest {
@ -81,7 +82,7 @@ public void testMarkOffset() {
@Test
public void testFlush() throws IOException {
offsetWriter.flush();
try (FileInputStream fis = new FileInputStream(offsetDir.toString() + "/" + FileOffsetWriter.SNAPSHOT_OFFSET_FILE)) {
snapshotProps.load(fis);
}
try (FileInputStream fis = new FileInputStream(offsetDir.toString() + "/" + FileOffsetWriter.COMMITLOG_OFFSET_FILE)) {
@ -105,7 +106,7 @@ public void testFlush() throws IOException {
process(commitLogRecordDiffTable);
offsetWriter.flush();
try (FileInputStream fis = new FileInputStream(offsetDir.toString() + "/" + FileOffsetWriter.SNAPSHOT_OFFSET_FILE)) {
snapshotProps.load(fis);
}
try (FileInputStream fis = new FileInputStream(offsetDir.toString() + "/" + FileOffsetWriter.COMMITLOG_OFFSET_FILE)) {
@ -130,22 +131,20 @@ private ChangeRecord generateRecord(boolean markOffset, boolean isSnapshot, Offs
CassandraConnectorConfig config = new CassandraConnectorConfig(Configuration.from(new Properties()));
SourceInfo sourceInfo = new SourceInfo(config);
sourceInfo.update("test-cluster", offsetPosition, keyspaceTable, isSnapshot, Conversions.toInstantFromMicros(System.currentTimeMillis() * 1000));
return new ChangeRecord(sourceInfo, new RowData(), Schema.INT32_SCHEMA, Schema.INT32_SCHEMA, Record.Operation.INSERT, markOffset);
}
private boolean isProcessed(ChangeRecord record) {
return offsetWriter.isOffsetProcessed(
record.getSource().keyspaceTable.name(),
record.getSource().offsetPosition.serialize(),
record.getSource().snapshot
);
record.getSource().snapshot);
}
private void process(ChangeRecord record) {
offsetWriter.markOffset(
record.getSource().keyspaceTable.name(),
record.getSource().offsetPosition.serialize(),
record.getSource().snapshot
);
record.getSource().snapshot);
}
}

View File

@ -5,17 +5,6 @@
*/
package io.debezium.connector.cassandra;
import io.debezium.config.Configuration;
import io.debezium.time.Conversions;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.kafka.connect.data.Schema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@ -24,6 +13,18 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.File;
import java.util.Properties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.kafka.connect.data.Schema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.time.Conversions;
public class QueueProcessorTest extends EmbeddedCassandraConnectorTestBase {
private CassandraConnectorContext context;
private QueueProcessor queueProcessor;
@ -50,8 +51,9 @@ public void testProcessChangeRecords() throws Exception {
for (int i = 0; i < recordSize; i++) {
CassandraConnectorConfig config = new CassandraConnectorConfig(Configuration.from(new Properties()));
SourceInfo sourceInfo = new SourceInfo(config);
sourceInfo.update(DatabaseDescriptor.getClusterName(), new OffsetPosition("CommitLog-6-123.log", i), new KeyspaceTable(TEST_KEYSPACE, "cdc_table"), false, Conversions.toInstantFromMicros(System.currentTimeMillis() * 1000));
sourceInfo.update(DatabaseDescriptor.getClusterName(), new OffsetPosition("CommitLog-6-123.log", i), new KeyspaceTable(TEST_KEYSPACE, "cdc_table"), false,
Conversions.toInstantFromMicros(System.currentTimeMillis() * 1000));
Record record = new ChangeRecord(sourceInfo, new RowData(), Schema.INT32_SCHEMA, Schema.INT32_SCHEMA, Record.Operation.INSERT, false);
queue.enqueue(record);
}
@ -70,7 +72,8 @@ public void testProcessTombstoneRecords() throws Exception {
for (int i = 0; i < recordSize; i++) {
CassandraConnectorConfig config = new CassandraConnectorConfig(Configuration.from(new Properties()));
SourceInfo sourceInfo = new SourceInfo(config);
sourceInfo.update(DatabaseDescriptor.getClusterName(), new OffsetPosition("CommitLog-6-123.log", i), new KeyspaceTable(TEST_KEYSPACE, "cdc_table"), false, Conversions.toInstantFromMicros(System.currentTimeMillis() * 1000));
sourceInfo.update(DatabaseDescriptor.getClusterName(), new OffsetPosition("CommitLog-6-123.log", i), new KeyspaceTable(TEST_KEYSPACE, "cdc_table"), false,
Conversions.toInstantFromMicros(System.currentTimeMillis() * 1000));
Record record = new TombstoneRecord(sourceInfo, new RowData(), Schema.INT32_SCHEMA);
queue.enqueue(record);
}

View File

@ -5,13 +5,14 @@
*/
package io.debezium.connector.cassandra;
import com.datastax.driver.core.TableMetadata;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import org.junit.Test;
import com.datastax.driver.core.TableMetadata;
public class SchemaProcessorTest extends EmbeddedCassandraConnectorTestBase {
@Test

View File

@ -5,15 +5,6 @@
*/
package io.debezium.connector.cassandra;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doNothing;
@ -22,6 +13,15 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
public class SnapshotProcessorTest extends EmbeddedCassandraConnectorTestBase {
@Test
public void testSnapshotTable() throws Exception {

View File

@ -5,9 +5,11 @@
*/
package io.debezium.connector.cassandra.transforms;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.TupleType;
import com.datastax.driver.core.UserType;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import org.apache.cassandra.cql3.FieldIdentifier;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.AsciiType;
@ -36,10 +38,9 @@
import org.junit.Test;
import org.mockito.Mockito;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.TupleType;
import com.datastax.driver.core.UserType;
public class CassandraTypeConverterTest {
@ -321,22 +322,20 @@ public void testUdt() {
expectedFieldTypes.add(DoubleType.instance);
// non-frozen
org.apache.cassandra.db.marshal.UserType expectedAbstractType =
new org.apache.cassandra.db.marshal.UserType("barspace",
expectedTypeName,
expectedFieldIdentifiers,
expectedFieldTypes,
true);
org.apache.cassandra.db.marshal.UserType expectedAbstractType = new org.apache.cassandra.db.marshal.UserType("barspace",
expectedTypeName,
expectedFieldIdentifiers,
expectedFieldTypes,
true);
AbstractType<?> convertedType = CassandraTypeConverter.convert(userType);
Assert.assertEquals(expectedAbstractType, convertedType);
// frozen
expectedAbstractType =
new org.apache.cassandra.db.marshal.UserType("barspace",
expectedTypeName,
expectedFieldIdentifiers,
expectedFieldTypes,
false);
expectedAbstractType = new org.apache.cassandra.db.marshal.UserType("barspace",
expectedTypeName,
expectedFieldIdentifiers,
expectedFieldTypes,
false);
convertedType = CassandraTypeConverter.convert(userType);
Assert.assertEquals(expectedAbstractType, convertedType);
}
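A note on the constructor exercised above, since the flag is easy to misread: the trailing boolean on org.apache.cassandra.db.marshal.UserType is isMultiCell, so true corresponds to a non-frozen UDT and false to a frozen one. A small stand-alone illustration follows; the class name, keyspace, and empty field lists are assumptions for brevity:

import java.nio.ByteBuffer;
import java.util.Collections;

import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UserType;

public class FrozenFlagSketch {
    public static void main(String[] args) {
        ByteBuffer typeName = UTF8Type.instance.decompose("some_udt");
        // Trailing argument is isMultiCell: true => non-frozen, false => frozen.
        UserType nonFrozen = new UserType("ks", typeName, Collections.emptyList(), Collections.emptyList(), true);
        UserType frozen = new UserType("ks", typeName, Collections.emptyList(), Collections.emptyList(), false);
        System.out.println(nonFrozen.isMultiCell()); // true
        System.out.println(frozen.isMultiCell()); // false
    }
}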

View File

@ -174,7 +174,7 @@ public void testFloatType() {
public void testInetAddressType() throws UnknownHostException {
InetAddress sourceInetAddress = InetAddress.getLocalHost();
// the address is the only thing that Cassandra will serialize for an InetAddress.
String expectedInetAddress = "/" + sourceInetAddress.getHostAddress();
ByteBuffer serializedInetAddress = InetAddressType.instance.decompose(sourceInetAddress);
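The comment above is the crux of this test: InetAddressType serializes only the raw address bytes, so an InetAddress rebuilt from them carries no hostname and its toString() form is "/" followed by the numeric address. A short stand-alone sketch of that round trip (class name assumed):

import java.net.InetAddress;
import java.nio.ByteBuffer;

import org.apache.cassandra.db.marshal.InetAddressType;

public class InetRoundTripSketch {
    public static void main(String[] args) throws Exception {
        InetAddress source = InetAddress.getLocalHost();
        // Only the address bytes survive serialization; the hostname is dropped.
        ByteBuffer serialized = InetAddressType.instance.decompose(source);
        InetAddress roundTripped = InetAddressType.instance.compose(serialized);
        System.out.println(roundTripped); // e.g. "/192.168.1.10" -- no hostname part
        System.out.println("/" + source.getHostAddress()); // the same string
    }
}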

View File

@ -5,8 +5,9 @@
*/
package io.debezium.connector.cassandra.utils;
import io.debezium.connector.cassandra.CommitLogUtil;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
@ -14,9 +15,9 @@
import java.nio.file.Path;
import java.util.Objects;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import io.debezium.connector.cassandra.CommitLogUtil;
public class CommitLogUtilTest {

View File

@ -5,12 +5,13 @@
*/
package io.debezium.connector.cassandra.utils;
import io.debezium.connector.cassandra.transforms.UuidUtil;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import io.debezium.connector.cassandra.transforms.UuidUtil;
public class UuidUtilTest {