diff --git a/debezium-core/pom.xml b/debezium-core/pom.xml
index 908beefad..4e4b96265 100644
--- a/debezium-core/pom.xml
+++ b/debezium-core/pom.xml
@@ -46,6 +46,21 @@
 <groupId>org.easytesting</groupId>
 <artifactId>fest-assert</artifactId>
 </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${version.kafka.scala}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <scope>test</scope>
+ </dependency>
diff --git a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java
index d11e92bd7..a77c63f5b 100644
--- a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java
+++ b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java
@@ -251,7 +251,7 @@ protected void parseNextStatement(Marker marker) {
*/
protected void parseComment(Marker marker) {
String comment = tokens.consume();
- logger.debug("COMMENT: {}", comment);
+ logger.trace("COMMENT: {}", comment);
}
/**
@@ -304,16 +304,16 @@ protected void parseUnknownStatement(Marker marker) {
}
protected void debugParsed(Marker statementStart) {
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
String statement = removeLineFeeds(tokens.getContentFrom(statementStart));
- logger.debug("PARSED: {}", statement);
+ logger.trace("PARSED: {}", statement);
}
}
protected void debugSkipped(Marker statementStart) {
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
String statement = removeLineFeeds(tokens.getContentFrom(statementStart));
- logger.debug("SKIPPED: {}", statement);
+ logger.trace("SKIPPED: {}", statement);
}
}
diff --git a/debezium-core/src/main/java/io/debezium/util/IoUtil.java b/debezium-core/src/main/java/io/debezium/util/IoUtil.java
index b42e12a05..ac2a3d009 100644
--- a/debezium-core/src/main/java/io/debezium/util/IoUtil.java
+++ b/debezium-core/src/main/java/io/debezium/util/IoUtil.java
@@ -380,7 +380,7 @@ public static void delete(File... filesOrFolder) throws IOException {
public static void delete(Path path) throws IOException {
if (path != null) {
if (path.toAbsolutePath().toFile().exists()) {
- LOGGER.info("Deleting '" + path + "'...");
+ LOGGER.debug("Deleting '" + path + "'...");
 Set<FileVisitOption> options = EnumSet.noneOf(FileVisitOption.class);
 int maxDepth = 10;
 FileVisitor<Path> removingVisitor = new SimpleFileVisitor<Path>() {
diff --git a/debezium-core/src/test/java/io/debezium/kafka/KafkaCluster.java b/debezium-core/src/test/java/io/debezium/kafka/KafkaCluster.java
new file mode 100644
index 000000000..2306c6ed9
--- /dev/null
+++ b/debezium-core/src/test/java/io/debezium/kafka/KafkaCluster.java
@@ -0,0 +1,1041 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.kafka;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiPredicate;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.annotation.ThreadSafe;
+import io.debezium.document.Document;
+import io.debezium.document.DocumentSerdes;
+import io.debezium.util.IoUtil;
+
+/**
+ * An embeddable cluster of Kafka servers and a single Zookeeper server. This may be useful when creating a complete environment
+ * within a single process, but doing so offers limited durability and fault tolerance compared to the normal approach of
+ * using an external cluster of Kafka servers and Zookeeper servers with proper replication and fault tolerance.
+ *
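+ * <p>
+ * A minimal usage sketch (the broker count, topic name, and message counts are illustrative):
+ * <pre>
+ * KafkaCluster cluster = new KafkaCluster().addBrokers(1).startup();
+ * cluster.createTopic("test-topic", 1, 1);
+ * cluster.useTo().produceIntegers("test-topic", 10, 1, null);
+ * cluster.shutdown();
+ * </pre>
+ *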
+ * @author Randall Hauch
+ */
+@ThreadSafe
+public class KafkaCluster {
+
+ public static final boolean DEFAULT_DELETE_DATA_UPON_SHUTDOWN = true;
+ public static final boolean DEFAULT_DELETE_DATA_PRIOR_TO_STARTUP = false;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCluster.class);
+
+ private final ConcurrentMap<Integer, KafkaServer> kafkaServers = new ConcurrentHashMap<>();
+ private final ZookeeperServer zkServer = new ZookeeperServer();
+ private volatile File dataDir = null;
+ private volatile boolean deleteDataUponShutdown = DEFAULT_DELETE_DATA_UPON_SHUTDOWN;
+ private volatile boolean deleteDataPriorToStartup = DEFAULT_DELETE_DATA_PRIOR_TO_STARTUP;
+ private volatile boolean running = false;
+ private volatile Properties kafkaConfig = null;
+ private volatile int startingKafkaPort = -1;
+ private final AtomicLong nextKafkaPort = new AtomicLong(startingKafkaPort);
+
+ /**
+ * Create a new embedded cluster.
+ */
+ public KafkaCluster() {
+ }
+
+ /**
+ * Specify whether the data is to be deleted upon {@link #shutdown()}.
+ *
+ * @param delete true if the data is to be deleted upon shutdown, or false otherwise
+ * @return this instance to allow chaining methods; never null
+ * @throws IllegalStateException if the cluster is running
+ */
+ public KafkaCluster deleteDataUponShutdown(boolean delete) {
+ if (running) throw new IllegalStateException("Unable to change cluster settings when running");
+ this.deleteDataUponShutdown = delete;
+ return this;
+ }
+
+ /**
+ * Specify whether the data is to be deleted prior to {@link #startup()}.
+ *
+ * @param delete true if the data is to be deleted prior to startup, or false otherwise
+ * @return this instance to allow chaining methods; never null
+ * @throws IllegalStateException if the cluster is running
+ */
+ public KafkaCluster deleteDataPriorToStartup(boolean delete) {
+ if (running) throw new IllegalStateException("Unable to change cluster settings when running");
+ this.deleteDataPriorToStartup = delete;
+ return this;
+ }
+
+ /**
+ * Add a number of new Kafka brokers to the cluster. The broker IDs will be generated.
+ *
+ * @param count the number of new brokers to add
+ * @return this instance to allow chaining methods; never null
+ * @throws IllegalStateException if the cluster is running
+ */
+ public KafkaCluster addBrokers(int count) {
+ if (running) throw new IllegalStateException("Unable to add a broker when the cluster is already running");
+ AtomicLong added = new AtomicLong();
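+ // 'added' is incremented inside computeIfAbsent, so it counts only the brokers that are
+ // actually created by this call.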
+ while (added.intValue() < count) {
+ kafkaServers.computeIfAbsent(new Integer(added.intValue() + 1), id -> {
+ added.incrementAndGet();
+ KafkaServer server = new KafkaServer(zkServer::getConnection, id);
+ if (dataDir != null) server.setStateDirectory(dataDir);
+ if (kafkaConfig != null) server.setProperties(kafkaConfig);
+ if (startingKafkaPort >= 0) server.setPort((int) this.nextKafkaPort.getAndIncrement());
+ return server;
+ });
+ }
+ return this;
+ }
+
+ /**
+ * Set the parent directory where the brokers' logs and the Zookeeper server's logs and snapshots will be kept.
+ *
+ * @param dataDir the parent directory for the server's logs and snapshots; may be null if a temporary directory will be used
+ * @return this instance to allow chaining methods; never null
+ * @throws IllegalStateException if the cluster is running
+ * @throws IllegalArgumentException if the supplied file is not a directory or not writable
+ */
+ public KafkaCluster usingDirectory(File dataDir) {
+ if (running) throw new IllegalStateException("Unable to change the data directory when the cluster is already running");
+ if (dataDir != null && dataDir.exists() && (!dataDir.isDirectory() || !dataDir.canWrite() || !dataDir.canRead())) {
+ throw new IllegalArgumentException("The directory must be readable and writable");
+ }
+ this.dataDir = dataDir;
+ return this;
+ }
+
+ /**
+ * Set the configuration properties for each of the brokers. This method does nothing if the supplied properties are null or
+ * empty.
+ *
+ * @param properties the Kafka configuration properties
+ * @return this instance to allow chaining methods; never null
+ * @throws IllegalStateException if the cluster is running
+ */
+ public KafkaCluster withKafkaConfiguration(Properties properties) {
+ if (running) throw new IllegalStateException("Unable to change the broker configuration when the cluster is already running");
+ if (properties != null && !properties.isEmpty()) {
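+ // The supplied properties are used as the defaults of the copied Properties object.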
+ kafkaConfig = new Properties(properties);
+ kafkaServers.values().forEach(kafka -> kafka.setProperties(kafkaConfig));
+ }
+ return this;
+ }
+
+ /**
+ * Set the port numbers for Zookeeper and the Kafka brokers.
+ *
+ * @param zkPort the port number that Zookeeper should use; may be -1 if an available port should be discovered
+ * @param firstKafkaPort the port number for the first Kafka broker (additional brokers will use subsequent port numbers);
+ * may be -1 if available ports should be discovered
+ * @return this instance to allow chaining methods; never null
+ * @throws IllegalStateException if the cluster is running
+ */
+ public KafkaCluster withPorts(int zkPort, int firstKafkaPort) {
+ if (running) throw new IllegalStateException("Unable to change the ports when the cluster is already running");
+ this.zkServer.setPort(zkPort);
+ this.startingKafkaPort = firstKafkaPort;
+ if (this.startingKafkaPort >= 0) {
+ this.nextKafkaPort.set(this.startingKafkaPort);
+ kafkaServers.values().forEach(kafka -> kafka.setPort((int) this.nextKafkaPort.getAndIncrement()));
+ }
+ return this;
+ }
+
+ /**
+ * Determine if the cluster is running.
+ *
+ * @return true if the cluster is running, or false otherwise
+ */
+ public boolean isRunning() {
+ return running;
+ }
+
+ /**
+ * Start the embedded Zookeeper server and the Kafka servers {@link #addBrokers(int) in the cluster}.
+ * This method does nothing if the cluster is already running.
+ *
+ * @return this instance to allow chaining methods; never null
+ * @throws IOException if there is an error during startup
+ */
+ public synchronized KafkaCluster startup() throws IOException {
+ if (!running) {
+ if (dataDir == null) {
+ try {
+ File temp = File.createTempFile("kafka", "suffix");
+ dataDir = new File(temp.getParentFile(), "cluster");
+ dataDir.mkdirs();
+ temp.delete();
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to create temporary directory", e);
+ }
+ } else if (deleteDataPriorToStartup) {
+ IoUtil.delete(dataDir);
+ dataDir.mkdirs();
+ }
+ File zkDir = new File(dataDir, "zk");
+ zkServer.setStateDirectory(zkDir); // does error checking
+ this.dataDir = dataDir;
+ File kafkaDir = new File(dataDir, "kafka");
+ kafkaServers.values().forEach(server -> server.setStateDirectory(new File(kafkaDir, "broker" + server.brokerId())));
+
+ zkServer.startup();
+ LOGGER.debug("Starting {} brokers", kafkaServers.size());
+ kafkaServers.values().forEach(KafkaServer::startup);
+ running = true;
+ }
+ return this;
+ }
+
+ /**
+ * Shutdown the embedded Zookeeper server and the Kafka servers {@link #addBrokers(int) in the cluster}.
+ * This method does nothing if the cluster is not running.
+ *
+ * @return this instance to allow chaining methods; never null
+ */
+ public synchronized KafkaCluster shutdown() {
+ if (running) {
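+ // Shut down the brokers first and then Zookeeper, and only afterward delete the data;
+ // each cleanup step runs even if an earlier step fails.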
+ try {
+ kafkaServers.values().forEach(this::shutdownReliably);
+ } finally {
+ try {
+ zkServer.shutdown(deleteDataUponShutdown);
+ } catch (Throwable t) {
+ LOGGER.error("Error while shutting down {}", zkServer, t);
+ } finally {
+ if (deleteDataUponShutdown) {
+ try {
+ kafkaServers.values().forEach(KafkaServer::deleteData);
+ } finally {
+ try {
+ IoUtil.delete(this.dataDir);
+ } catch (IOException e) {
+ LOGGER.error("Error while deleting cluster data", e);
+ }
+ }
+ }
+ running = false;
+ }
+ }
+ }
+ return this;
+ }
+
+ /**
+ * Create the specified topics.
+ *
+ * @param topics the names of the topics to create
+ * @throws IllegalStateException if the cluster is not running
+ */
+ public void createTopics(String... topics) {
+ LOGGER.debug("Creating topics: {}", Arrays.toString(topics));
+ if (!running) throw new IllegalStateException("The cluster must be running to create topics");
+ kafkaServers.values().stream().findFirst().ifPresent(server -> server.createTopics(topics));
+ }
+
+ /**
+ * Create the specified topics.
+ *
+ * @param topics the names of the topics to create
+ * @throws IllegalStateException if the cluster is not running
+ */
+ public void createTopics(Set<String> topics) {
+ createTopics(topics.toArray(new String[topics.size()]));
+ }
+
+ /**
+ * Create the specified topics.
+ *
+ * @param numPartitions the number of partitions for each topic
+ * @param replicationFactor the replication factor for each topic
+ * @param topics the names of the topics to create
+ */
+ public void createTopics(int numPartitions, int replicationFactor, String... topics) {
+ LOGGER.debug("Creating topics with {} partitions and {} replicas each: {}", numPartitions, replicationFactor,
+ Arrays.toString(topics));
+ if (!running) throw new IllegalStateException("The cluster must be running to create topics");
+ kafkaServers.values().stream().findFirst().ifPresent(server -> server.createTopics(numPartitions, replicationFactor, topics));
+ }
+
+ /**
+ * Create the specified topics.
+ *
+ * @param numPartitions the number of partitions for each topic
+ * @param replicationFactor the replication factor for each topic
+ * @param topics the names of the topics to create
+ */
+ public void createTopics(int numPartitions, int replicationFactor, Set<String> topics) {
+ createTopics(numPartitions, replicationFactor, topics.toArray(new String[topics.size()]));
+ }
+
+ /**
+ * Create the specified topic.
+ *
+ * @param topic the name of the topic to create
+ * @param numPartitions the number of partitions for the topic
+ * @param replicationFactor the replication factor for the topic
+ */
+ public void createTopic(String topic, int numPartitions, int replicationFactor) {
+ LOGGER.debug("Creating topic '{}' with {} partitions and {} replicas", topic, numPartitions, replicationFactor);
+ if (!running) throw new IllegalStateException("The cluster must be running to create topics");
+ kafkaServers.values().stream().findFirst().ifPresent(server -> server.createTopic(topic, numPartitions, replicationFactor));
+ }
+
+ /**
+ * Perform the supplied function on each directory used by this cluster.
+ *
+ * @param consumer the consumer function; may not be null
+ */
+ void onEachDirectory(java.util.function.Consumer<File> consumer) {
+ consumer.accept(zkServer.getSnapshotDirectory());
+ consumer.accept(zkServer.getLogDirectory());
+ kafkaServers.values().forEach(server -> consumer.accept(server.getStateDirectory()));
+ }
+
+ /**
+ * Get the list of brokers.
+ *
+ * @return the broker list
+ */
+ public String brokerList() {
+ StringJoiner joiner = new StringJoiner(",");
+ kafkaServers.values().forEach(server -> {
+ joiner.add(server.getConnection());
+ });
+ return joiner.toString();
+ }
+
+ private void shutdownReliably(KafkaServer server) {
+ try {
+ server.shutdown();
+ } catch (Throwable t) {
+ LOGGER.error("Error while shutting down {}", server, t);
+ }
+ }
+
+ /**
+ * Obtain the interface for using this cluster.
+ *
+ * @return the usage interface; never null
+ * @throws IllegalStateException if the cluster is not running
+ */
+ public Usage useTo() {
+ if (!running) throw new IllegalStateException("Unable to use the cluster because it is not running");
+ return new Usage();
+ }
+
+ /**
+ * A simple interactive Kafka producer for use with the cluster.
+ *
+ * @param <K> the type of key
+ * @param <V> the type of value
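+ * <p>
+ * A usage sketch (the topic name, keys, and values are illustrative):
+ * <pre>
+ * producer.write("test-topic", key1, value1)
+ *         .write("test-topic", key2, value2)
+ *         .close();
+ * </pre>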
+ */
+ public static interface InteractiveProducer<K, V> extends Closeable {
+ /**
+ * Write to the topic with the given name a record with the specified key and value. The message is flushed immediately.
+ *
+ * @param topic the name of the topic; may not be null
+ * @param key the key; may not be null
+ * @param value the value; may not be null
+ * @return this producer instance to allow chaining methods together
+ */
+ default InteractiveProducer<K, V> write(String topic, K key, V value) {
+ return write(new ProducerRecord<>(topic, key, value));
+ }
+
+ /**
+ * Write the specified record to the topic with the given name. The message is flushed immediately.
+ *
+ * @param record the record; may not be null
+ * @return this producer instance to allow chaining methods together
+ */
+ InteractiveProducer<K, V> write(ProducerRecord<K, V> record);
+
+ /**
+ * Close this producer's connection to Kafka and clean up all resources.
+ */
+ @Override
+ public void close();
+ }
+
+ /**
+ * A simple interactive Kafka consumer for use with the cluster.
+ *
+ * @param <K> the type of key
+ * @param <V> the type of value
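+ * <p>
+ * For example, to wait up to ten seconds for the next value (the timeout is illustrative):
+ * <pre>
+ * V value = consumer.nextValue(10, TimeUnit.SECONDS);
+ * if (value != null) {
+ *     // handle the value
+ * }
+ * </pre>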
+ */
+ public static interface InteractiveConsumer<K, V> extends Closeable {
+ /**
+ * Block until a record can be read from this consumer's topic, and return the value in that record.
+ *
+ * @return the value; never null
+ * @throws InterruptedException if the thread is interrupted while blocking
+ */
+ default V nextValue() throws InterruptedException {
+ return nextRecord().value();
+ }
+
+ /**
+ * Block until a record can be read from this consumer's topic, and return the record.
+ *
+ * @return the record; never null
+ * @throws InterruptedException if the thread is interrupted while blocking
+ */
+ ConsumerRecord<K, V> nextRecord() throws InterruptedException;
+
+ /**
+ * Block until a record can be read from this consumer's topic or until the timeout occurs, and if a record was read
+ * return the value in that record.
+ *
+ * @param timeout the maximum amount of time to block to wait for a record
+ * @param unit the unit of time for the {@code timeout}
+ * @return the value, or null if the method timed out
+ * @throws InterruptedException if the thread is interrupted while blocking
+ */
+ default V nextValue(long timeout, TimeUnit unit) throws InterruptedException {
+ ConsumerRecord<K, V> record = nextRecord(timeout, unit);
+ return record != null ? record.value() : null;
+ }
+
+ /**
+ * Block until a record can be read from this consumer's topic or until the timeout occurs, and if a record was read
+ * return the record.
+ *
+ * @param timeout the maximum amount of time to block to wait for a record
+ * @param unit the unit of time for the {@code timeout}
+ * @return the record, or null if the method timed out
+ * @throws InterruptedException if the thread is interrupted while blocking
+ */
+ ConsumerRecord<K, V> nextRecord(long timeout, TimeUnit unit) throws InterruptedException;
+
+ /**
+ * Obtain a stream to consume the input messages. This method can be used in place of repeated calls to the
+ * {@code next...()} methods.
+ *
+ * @return the stream of all messages.
+ */
+ Stream<ConsumerRecord<K, V>> stream();
+
+ /**
+ * Obtain a stream over all of the accumulated input messages. This method can be called multiple times, and each time
+ * the resulting stream will operate over all messages that have been received by this consumer, completely
+ * independently of the {@link #stream()}, {@link #nextRecord()}, {@link #nextRecord(long, TimeUnit)}, {@link #nextValue()}
+ * and {@link #nextValue(long, TimeUnit)} methods.
+ *
+ * @return the stream of all messages.
+ */
+ Stream<ConsumerRecord<K, V>> streamAll();
+
+ /**
+ * Asynchronously close this consumer's connection to Kafka and begin to clean up all resources.
+ */
+ @Override
+ public void close();
+ }
+
+ /**
+ * A set of methods to use a running KafkaCluster.
+ */
+ public class Usage {
+
+ /**
+ * Get a new set of properties for consumers that want to talk to this server.
+ *
+ * @param groupId the group ID for the consumer; may not be null
+ * @param clientId the optional identifier for the client; may be null if not needed
+ * @param autoOffsetReset how to pick a starting offset when there is no initial offset in ZooKeeper or if an offset is
+ * out of range; may be null for the default to be used
+ * @return the mutable consumer properties
+ * @see #getProducerProperties(String)
+ */
+ public Properties getConsumerProperties(String groupId, String clientId, OffsetResetStrategy autoOffsetReset) {
+ if (groupId == null) throw new IllegalArgumentException("The groupId is required");
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ if (autoOffsetReset != null) {
+ props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset.toString().toLowerCase());
+ }
+ if (clientId != null) props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+ return props;
+ }
+
+ /**
+ * Get a new set of properties for producers that want to talk to this server.
+ *
+ * @param clientId the optional identifier for the client; may be null if not needed
+ * @return the mutable producer properties
+ * @see #getConsumerProperties(String, String, OffsetResetStrategy)
+ */
+ public Properties getProducerProperties(String clientId) {
+ Properties props = new Properties();
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());
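+ // acks=1: the leader must acknowledge each write, which is sufficient for these tests.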
+ props.setProperty(ProducerConfig.ACKS_CONFIG, Integer.toString(1));
+ if (clientId != null) props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+ return props;
+ }
+
+ /**
+ * Create an {@link InteractiveProducer simple producer} that can be used to write messages to the cluster.
+ *
+ * @param producerName the name of the producer; may not be null
+ * @param keySerializer the serializer for the keys; may not be null
+ * @param valueSerializer the serializer for the values; may not be null
+ * @return the object that can be used to produce messages; never null
+ */
+ public <K, V> InteractiveProducer<K, V> createProducer(String producerName, Serializer<K> keySerializer,
+ Serializer<V> valueSerializer) {
+ Properties props = getProducerProperties(producerName);
+ KafkaProducer<K, V> producer = new KafkaProducer<>(props, keySerializer, valueSerializer);
+ return new InteractiveProducer<K, V>() {
+ @Override
+ public InteractiveProducer<K, V> write(ProducerRecord<K, V> record) {
+ producer.send(record);
+ producer.flush();
+ return this;
+ }
+
+ @Override
+ public void close() {
+ producer.close();
+ }
+ };
+ }
+
+ /**
+ * Create an {@link InteractiveProducer simple producer} that can be used to write {@link Document} messages to the
+ * cluster.
+ *
+ * @param producerName the name of the producer; may not be null
+ * @return the object that can be used to produce messages; never null
+ */
+ public InteractiveProducer<String, Document> createProducer(String producerName) {
+ return createProducer(producerName, new StringSerializer(), new DocumentSerdes());
+ }
+
+ /**
+ * Create an {@link InteractiveConsumer simple consumer} that can be used to read messages from the cluster.
+ *
+ * @param groupId the name of the group; may not be null
+ * @param clientId the name of the client; may not be null
+ * @param topicName the name of the topic to read; may not be null and may not be empty
+ * @param keyDeserializer the deserializer for the keys; may not be null
+ * @param valueDeserializer the deserializer for the values; may not be null
+ * @param completion the function to call when the consumer terminates; may be null
+ * @return the running interactive consumer; never null
+ */
+ public <K, V> InteractiveConsumer<K, V> createConsumer(String groupId, String clientId, String topicName,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valueDeserializer, Runnable completion) {
+ Set<String> topicNames = Collections.singleton(topicName);
+ return createConsumer(groupId, clientId, topicNames, keyDeserializer, valueDeserializer, completion);
+ }
+
+ /**
+ * Create an {@link InteractiveConsumer simple consumer} that can be used to read messages from the cluster.
+ *
+ * @param groupId the name of the group; may not be null
+ * @param clientId the name of the client; may not be null
+ * @param topicNames the names of the topics to read; may not be null and may not be empty
+ * @param keyDeserializer the deserializer for the keys; may not be null
+ * @param valueDeserializer the deserializer for the values; may not be null
+ * @param completion the function to call when the consumer terminates; may be null
+ * @return the running interactive consumer; never null
+ */
+ public <K, V> InteractiveConsumer<K, V> createConsumer(String groupId, String clientId, Set<String> topicNames,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valueDeserializer, Runnable completion) {
+ BlockingQueue<ConsumerRecord<K, V>> consumed = new LinkedBlockingQueue<>();
+ List<ConsumerRecord<K, V>> allMessages = new LinkedList<>();
+ AtomicBoolean keepReading = new AtomicBoolean(true);
+ consume(groupId, clientId, OffsetResetStrategy.EARLIEST, keyDeserializer, valueDeserializer, () -> keepReading.get(),
+ completion, topicNames, record -> {
+ consumed.add(record);
+ allMessages.add(record);
+ });
+ return new InteractiveConsumer<K, V>() {
+ @Override
+ public ConsumerRecord<K, V> nextRecord() throws InterruptedException {
+ return consumed.take();
+ }
+
+ @Override
+ public ConsumerRecord<K, V> nextRecord(long timeout, TimeUnit unit) throws InterruptedException {
+ return consumed.poll(timeout, unit);
+ }
+
+ @Override
+ public void close() {
+ keepReading.set(false);
+ }
+
+ @Override
+ public Stream<ConsumerRecord<K, V>> stream() {
+ return consumed.stream();
+ }
+
+ @Override
+ public Stream<ConsumerRecord<K, V>> streamAll() {
+ return allMessages.stream();
+ }
+ };
+ }
+
+ /**
+ * Create an {@link InteractiveConsumer simple consumer} that can be used to read messages from the cluster.
+ *
+ * @param groupId the name of the group; may not be null
+ * @param clientId the name of the client; may not be null
+ * @param topicName the name of the topic to read; may not be null and may not be empty
+ * @param completion the function to call when the consumer terminates; may be null
+ * @return the running interactive consumer; never null
+ */
+ public InteractiveConsumer<String, Document> createConsumer(String groupId, String clientId, String topicName,
+ Runnable completion) {
+ Set<String> topicNames = Collections.singleton(topicName);
+ return createConsumer(groupId, clientId, topicNames, new StringDeserializer(), new DocumentSerdes(), completion);
+ }
+
+ /**
+ * Create an {@link InteractiveConsumer simple consumer} that can be used to read messages from the cluster.
+ *
+ * @param groupId the name of the group; may not be null
+ * @param clientId the name of the client; may not be null
+ * @param topicNames the names of the topics to read; may not be null and may not be empty
+ * @param completion the function to call when the consumer terminates; may be null
+ * @return the running interactive consumer; never null
+ */
+ public InteractiveConsumer<String, Document> createConsumer(String groupId, String clientId, Set<String> topicNames,
+ Runnable completion) {
+ return createConsumer(groupId, clientId, topicNames, new StringDeserializer(), new DocumentSerdes(), completion);
+ }
+
+ /**
+ * Use the supplied function to asynchronously produce {@link Document} messages and write them to the cluster.
+ *
+ * @param producerName the name of the producer; may not be null
+ * @param producer the function that will asynchronously use the supplied interactive producer to write messages; may not be null
+ */
+ public void produce(String producerName, Consumer<InteractiveProducer<String, Document>> producer) {
+ produce(producerName, new StringSerializer(), new DocumentSerdes(), producer);
+ }
+
+ /**
+ * Use the supplied function to asynchronously produce messages and write them to the cluster.
+ *
+ * @param producerName the name of the producer; may not be null
+ * @param keySerializer the serializer for the keys; may not be null
+ * @param valueSerializer the serializer for the values; may not be null
+ * @param producer the function that will asynchronously use the supplied interactive producer to write messages; may not be null
+ */
+ public <K, V> void produce(String producerName, Serializer<K> keySerializer, Serializer<V> valueSerializer,
+ Consumer<InteractiveProducer<K, V>> producer) {
+ Properties props = getProducerProperties(producerName);
+ KafkaProducer<K, V> kafkaProducer = new KafkaProducer<>(props, keySerializer, valueSerializer);
+ InteractiveProducer<K, V> interactive = new InteractiveProducer<K, V>() {
+ @Override
+ public InteractiveProducer<K, V> write(ProducerRecord<K, V> record) {
+ kafkaProducer.send(record);
+ kafkaProducer.flush();
+ return this;
+ }
+
+ @Override
+ public void close() {
+ kafkaProducer.close();
+ }
+ };
+ Thread t = new Thread(() -> {
+ try {
+ producer.accept(interactive);
+ } finally {
+ interactive.close();
+ }
+ });
+ t.setName(producerName + "-thread");
+ t.start();
+ }
+
+ /**
+ * Use the supplied function to asynchronously produce messages and write them to the cluster.
+ *
+ * @param producerName the name of the producer; may not be null
+ * @param messageCount the number of messages to produce; must be positive
+ * @param keySerializer the serializer for the keys; may not be null
+ * @param valueSerializer the serializer for the values; may not be null
+ * @param completionCallback the function to be called when the producer is completed; may be null
+ * @param messageSupplier the function to produce messages; may not be null
+ */
+ public <K, V> void produce(String producerName, int messageCount,
+ Serializer<K> keySerializer, Serializer<V> valueSerializer,
+ Runnable completionCallback,
+ Supplier<ProducerRecord<K, V>> messageSupplier) {
+ Properties props = getProducerProperties(producerName);
+ Thread t = new Thread(() -> {
+ LOGGER.debug("Starting producer {} to write {} messages", producerName, messageCount);
+ try (KafkaProducer<K, V> producer = new KafkaProducer<>(props, keySerializer, valueSerializer)) {
+ for (int i = 0; i != messageCount; ++i) {
+ ProducerRecord<K, V> record = messageSupplier.get();
+ producer.send(record);
+ producer.flush();
+ LOGGER.debug("Producer {}: sent message {}", producerName, record);
+ }
+ } finally {
+ if (completionCallback != null) completionCallback.run();
+ LOGGER.debug("Stopping producer {}", producerName);
+ }
+ });
+ t.setName(producerName + "-thread");
+ t.start();
+ }
+
+ /**
+ * Use the supplied function to asynchronously produce messages with String keys and values, and write them to the
+ * cluster.
+ *
+ * @param messageCount the number of messages to produce; must be positive
+ * @param completionCallback the function to be called when the producer is completed; may be null
+ * @param messageSupplier the function to produce messages; may not be null
+ */
+ public void produceStrings(int messageCount,
+ Runnable completionCallback, Supplier<ProducerRecord<String, String>> messageSupplier) {
+ Serializer<String> keySer = new StringSerializer();
+ Serializer<String> valSer = keySer;
+ String randomId = UUID.randomUUID().toString();
+ produce(randomId, messageCount, keySer, valSer, completionCallback, messageSupplier);
+ }
+
+ /**
+ * Use the supplied function to asynchronously produce messages with String keys and {@link Document} values, and write
+ * them to the cluster.
+ *
+ * @param messageCount the number of messages to produce; must be positive
+ * @param completionCallback the function to be called when the producer is completed; may be null
+ * @param messageSupplier the function to produce messages; may not be null
+ */
+ public void produceDocuments(int messageCount,
+ Runnable completionCallback, Supplier<ProducerRecord<String, Document>> messageSupplier) {
+ Serializer<String> keySer = new StringSerializer();
+ Serializer<Document> valSer = new DocumentSerdes();
+ String randomId = UUID.randomUUID().toString();
+ produce(randomId, messageCount, keySer, valSer, completionCallback, messageSupplier);
+ }
+
+ /**
+ * Use the supplied function to asynchronously produce messages with String keys and Integer values, and write them to the
+ * cluster.
+ *
+ * @param messageCount the number of messages to produce; must be positive
+ * @param completionCallback the function to be called when the producer is completed; may be null
+ * @param messageSupplier the function to produce messages; may not be null
+ */
+ public void produceIntegers(int messageCount,
+ Runnable completionCallback, Supplier<ProducerRecord<String, Integer>> messageSupplier) {
+ Serializer<String> keySer = new StringSerializer();
+ Serializer<Integer> valSer = new IntegerSerializer();
+ String randomId = UUID.randomUUID().toString();
+ produce(randomId, messageCount, keySer, valSer, completionCallback, messageSupplier);
+ }
+
+ /**
+ * Asynchronously produce messages with String keys and sequential Integer values, and write them to the cluster.
+ *
+ * @param topic the name of the topic to which the messages should be written; may not be null
+ * @param messageCount the number of messages to produce; must be positive
+ * @param initialValue the first integer value to produce
+ * @param completionCallback the function to be called when the producer is completed; may be null
+ */
+ public void produceIntegers(String topic, int messageCount, int initialValue,
+ Runnable completionCallback) {
+ AtomicLong counter = new AtomicLong(initialValue);
+ produceIntegers(messageCount, completionCallback, () -> {
+ long i = counter.getAndIncrement();
+ String keyAndValue = Long.toString(i);
+ return new ProducerRecord<>(topic, keyAndValue, new Integer((int) i));
+ });
+ }
+
+ /**
+ * Asynchronously produce messages with monotonically increasing String keys and values obtained from the supplied
+ * function, and write them to the cluster.
+ *
+ * @param topic the name of the topic to which the messages should be written; may not be null
+ * @param messageCount the number of messages to produce; must be positive
+ * @param completionCallback the function to be called when the producer is completed; may be null
+ * @param valueSupplier the value supplier; may not be null
+ */
+ public void produceStrings(String topic, int messageCount,
+ Runnable completionCallback, Supplier valueSupplier) {
+ AtomicLong counter = new AtomicLong(0);
+ produceStrings(messageCount, completionCallback, () -> {
+ long i = counter.incrementAndGet();
+ String keyAndValue = Long.toString(i);
+ return new ProducerRecord<>(topic, keyAndValue, valueSupplier.get());
+ });
+ }
+
+ /**
+ * Asynchronously produce messages with monotonically increasing String keys and values obtained from the supplied
+ * function, and write them to the cluster.
+ *
+ * @param topic the name of the topic to which the messages should be written; may not be null
+ * @param messageCount the number of messages to produce; must be positive
+ * @param completionCallback the function to be called when the producer is completed; may be null
+ * @param valueSupplier the value supplier; may not be null
+ */
+ public void produceDocuments(String topic, int messageCount,
+ Runnable completionCallback, Supplier valueSupplier) {
+ AtomicLong counter = new AtomicLong(0);
+ produceDocuments(messageCount, completionCallback, () -> {
+ long i = counter.incrementAndGet();
+ String keyAndValue = Long.toString(i);
+ return new ProducerRecord<>(topic, keyAndValue, valueSupplier.get());
+ });
+ }
+
+ /**
+ * Use the supplied function to asynchronously consume messages from the cluster.
+ *
+ * @param groupId the name of the group; may not be null
+ * @param clientId the name of the client; may not be null
+ * @param autoOffsetReset how to pick a starting offset when there is no initial offset in ZooKeeper or if an offset is
+ * out of range; may be null for the default to be used
+ * @param keyDeserializer the deserializer for the keys; may not be null
+ * @param valueDeserializer the deserializer for the values; may not be null
+ * @param continuation the function that determines if the consumer should continue; may not be null
+ * @param completion the function to call when the consumer terminates; may be null
+ * @param topics the set of topics to consume; may not be null or empty
+ * @param consumerFunction the function to consume the messages; may not be null
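+ * <p>
+ * For example, to read up to ten messages and then stop (the names are illustrative, and
+ * {@code keyDes} and {@code valDes} are deserializers created by the caller):
+ * <pre>
+ * AtomicLong count = new AtomicLong();
+ * consume("group-1", "client-1", OffsetResetStrategy.EARLIEST, keyDes, valDes,
+ *         () -> count.get() < 10, null, Collections.singleton("test-topic"),
+ *         record -> count.incrementAndGet());
+ * </pre>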
+ */
+ public <K, V> void consume(String groupId, String clientId, OffsetResetStrategy autoOffsetReset,
+ Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer,
+ BooleanSupplier continuation, Runnable completion, Collection<String> topics,
+ java.util.function.Consumer<ConsumerRecord<K, V>> consumerFunction) {
+ Properties props = getConsumerProperties(groupId, clientId, autoOffsetReset);
+ Thread t = new Thread(() -> {
+ LOGGER.debug("Starting consumer {} to read messages", clientId);
+ try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(props, keyDeserializer, valueDeserializer)) {
+ consumer.subscribe(new ArrayList<>(topics));
+ while (continuation.getAsBoolean()) {
+ consumer.poll(10).forEach(record -> {
+ LOGGER.debug("Consumer {}: consuming message {}", clientId, record);
+ consumerFunction.accept(record);
+ consumer.commitAsync();
+ });
+ }
+ } finally {
+ if (completion != null) completion.run();
+ LOGGER.debug("Stopping consumer {}", clientId);
+ }
+ });
+ t.setName(clientId + "-thread");
+ t.start();
+ }
+
+ /**
+ * Asynchronously consume all messages from the cluster.
+ *
+ * @param continuation the function that determines if the consumer should continue; may not be null
+ * @param completion the function to call when all messages have been consumed; may be null
+ * @param topics the set of topics to consume; may not be null or empty
+ * @param consumerFunction the function to consume the messages; may not be null
+ */
+ public void consumeDocuments(BooleanSupplier continuation, Runnable completion, Collection<String> topics,
+ java.util.function.Consumer<ConsumerRecord<String, Document>> consumerFunction) {
+ Deserializer<String> keyDes = new StringDeserializer();
+ Deserializer<Document> valDes = new DocumentSerdes();
+ String randomId = UUID.randomUUID().toString();
+ consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, completion, topics, consumerFunction);
+ }
+
+ /**
+ * Asynchronously consume all messages from the cluster.
+ *
+ * @param continuation the function that determines if the consumer should continue; may not be null
+ * @param completion the function to call when all messages have been consumed; may be null
+ * @param topics the set of topics to consume; may not be null or empty
+ * @param consumerFunction the function to consume the messages; may not be null
+ */
+ public void consumeStrings(BooleanSupplier continuation, Runnable completion, Collection<String> topics,
+ java.util.function.Consumer<ConsumerRecord<String, String>> consumerFunction) {
+ Deserializer<String> keyDes = new StringDeserializer();
+ Deserializer<String> valDes = keyDes;
+ String randomId = UUID.randomUUID().toString();
+ consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, completion, topics, consumerFunction);
+ }
+
+ /**
+ * Asynchronously consume all messages from the cluster.
+ *
+ * @param continuation the function that determines if the consumer should continue; may not be null
+ * @param completion the function to call when all messages have been consumed; may be null
+ * @param topics the set of topics to consume; may not be null or empty
+ * @param consumerFunction the function to consume the messages; may not be null
+ */
+ public void consumeIntegers(BooleanSupplier continuation, Runnable completion, Collection<String> topics,
+ java.util.function.Consumer<ConsumerRecord<String, Integer>> consumerFunction) {
+ Deserializer<String> keyDes = new StringDeserializer();
+ Deserializer<Integer> valDes = new IntegerDeserializer();
+ String randomId = UUID.randomUUID().toString();
+ consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, completion, topics, consumerFunction);
+ }
+
+ /**
+ * Asynchronously consume all messages on the given topic from the cluster.
+ *
+ * @param topicName the name of the topic; may not be null
+ * @param count the expected number of messages to read before terminating; must be positive
+ * @param timeout the maximum time that this consumer should run before terminating; must be positive
+ * @param unit the unit of time for the timeout; may not be null
+ * @param completion the function to call when all messages have been consumed; may be null
+ * @param consumer the function to consume the messages; may not be null
+ */
+ public void consumeStrings(String topicName, int count, long timeout, TimeUnit unit, Runnable completion,
+ BiPredicate<String, String> consumer) {
+ AtomicLong readCounter = new AtomicLong();
+ consumeStrings(continueIfNotExpired(() -> readCounter.get() < count, timeout, unit),
+ completion,
+ Collections.singleton(topicName),
+ record -> {
+ if (consumer.test(record.key(), record.value())) readCounter.incrementAndGet();
+ });
+ }
+
+ /**
+ * Asynchronously consume all messages on the given topic from the cluster.
+ *
+ * @param topicName the name of the topic; may not be null
+ * @param count the expected number of messages to read before terminating; must be positive
+ * @param timeout the maximum time that this consumer should run before terminating; must be positive
+ * @param unit the unit of time for the timeout; may not be null
+ * @param completion the function to call when all messages have been consumed; may be null
+ * @param consumer the function to consume the messages; may not be null
+ */
+ public void consumeDocuments(String topicName, int count, long timeout, TimeUnit unit, Runnable completion,
+ BiPredicate<String, Document> consumer) {
+ AtomicLong readCounter = new AtomicLong();
+ consumeDocuments(continueIfNotExpired(() -> readCounter.get() < count, timeout, unit),
+ completion,
+ Collections.singleton(topicName),
+ record -> {
+ if (consumer.test(record.key(), record.value())) readCounter.incrementAndGet();
+ });
+ }
+
+ /**
+ * Asynchronously consume all messages on the given topic from the cluster.
+ *
+ * @param topicName the name of the topic; may not be null
+ * @param count the expected number of messages to read before terminating; must be positive
+ * @param timeout the maximum time that this consumer should run before terminating; must be positive
+ * @param unit the unit of time for the timeout; may not be null
+ * @param completion the function to call when all messages have been consumed; may be null
+ * @param consumer the function to consume the messages; may not be null
+ */
+ public void consumeIntegers(String topicName, int count, long timeout, TimeUnit unit, Runnable completion,
+ BiPredicate<String, Integer> consumer) {
+ AtomicLong readCounter = new AtomicLong();
+ consumeIntegers(continueIfNotExpired(() -> readCounter.get() < count, timeout, unit),
+ completion,
+ Collections.singleton(topicName),
+ record -> {
+ if (consumer.test(record.key(), record.value())) readCounter.incrementAndGet();
+ });
+ }
+
+ /**
+ * Asynchronously consume all messages on the given topic from the cluster.
+ *
+ * @param topicName the name of the topic; may not be null
+ * @param count the expected number of messages to read before terminating; must be positive
+ * @param timeout the maximum time that this consumer should run before terminating; must be positive
+ * @param unit the unit of time for the timeout; may not be null
+ * @param completion the function to call when all messages have been consumed; may be null
+ */
+ public void consumeStrings(String topicName, int count, long timeout, TimeUnit unit, Runnable completion) {
+ consumeStrings(topicName, count, timeout, unit, completion, (key, value) -> true);
+ }
+
+ /**
+ * Asynchronously consume all messages on the given topic from the cluster.
+ *
+ * @param topicName the name of the topic; may not be null
+ * @param count the expected number of messages to read before terminating; must be positive
+ * @param timeout the maximum time that this consumer should run before terminating; must be positive
+ * @param unit the unit of time for the timeout; may not be null
+ * @param completion the function to call when all messages have been consumed; may be null
+ */
+ public void consumeDocuments(String topicName, int count, long timeout, TimeUnit unit, Runnable completion) {
+ consumeDocuments(topicName, count, timeout, unit, completion, (key, value) -> true);
+ }
+
+ /**
+ * Asynchronously consume all messages on the given topic from the cluster.
+ *
+ * @param topicName the name of the topic; may not be null
+ * @param count the expected number of messages to read before terminating; must be positive
+ * @param timeout the maximum time that this consumer should run before terminating; must be positive
+ * @param unit the unit of time for the timeout; may not be null
+ * @param completion the function to call when all messages have been consumed; may be null
+ */
+ public void consumeIntegers(String topicName, int count, long timeout, TimeUnit unit, Runnable completion) {
+ consumeIntegers(topicName, count, timeout, unit, completion, (key, value) -> true);
+ }
+
+ protected BooleanSupplier continueIfNotExpired(BooleanSupplier continuation, long timeout, TimeUnit unit) {
+ return new BooleanSupplier() {
+ long stopTime = 0L;
+
+ @Override
+ public boolean getAsBoolean() {
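+ // Compute the stop time lazily on the first call so the timeout window starts
+ // when consumption actually begins rather than when this supplier is created.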
+ if (stopTime == 0L) stopTime = System.currentTimeMillis() + unit.toMillis(timeout);
+ return continuation.getAsBoolean() && System.currentTimeMillis() <= stopTime;
+ }
+ };
+ }
+ }
+}
\ No newline at end of file
diff --git a/debezium-core/src/test/java/io/debezium/kafka/KafkaClusterTest.java b/debezium-core/src/test/java/io/debezium/kafka/KafkaClusterTest.java
new file mode 100644
index 000000000..e2dead2d7
--- /dev/null
+++ b/debezium-core/src/test/java/io/debezium/kafka/KafkaClusterTest.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.kafka;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.fest.assertions.Assertions.assertThat;
+
+import io.debezium.util.Stopwatch;
+import io.debezium.util.Testing;
+
+/**
+ * @author Randall Hauch
+ */
+public class KafkaClusterTest {
+
+ private KafkaCluster cluster;
+ private File dataDir;
+
+ @Before
+ public void beforeEach() {
+ dataDir = Testing.Files.createTestingDirectory("cluster");
+ Testing.Files.delete(dataDir);
+ cluster = new KafkaCluster().usingDirectory(dataDir);
+ }
+
+ @After
+ public void afterEach() {
+ cluster.shutdown();
+ Testing.Files.delete(dataDir);
+ }
+
+ @Test
+ public void shouldStartClusterWithOneBrokerAndRemoveData() throws Exception {
+ cluster.deleteDataUponShutdown(true).addBrokers(1).startup();
+ cluster.onEachDirectory(this::assertValidDataDirectory);
+ cluster.shutdown();
+ cluster.onEachDirectory(this::assertDoesNotExist);
+ }
+
+ @Test
+ public void shouldStartClusterWithMultipleBrokerAndRemoveData() throws Exception {
+ cluster.deleteDataUponShutdown(true).addBrokers(3).startup();
+ cluster.onEachDirectory(this::assertValidDataDirectory);
+ cluster.shutdown();
+ cluster.onEachDirectory(this::assertDoesNotExist);
+ }
+
+ @Test
+ public void shouldStartClusterWithOneBrokerAndLeaveData() throws Exception {
+ cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
+ cluster.onEachDirectory(this::assertValidDataDirectory);
+ cluster.shutdown();
+ cluster.onEachDirectory(this::assertValidDataDirectory);
+ }
+
+ @Test
+ public void shouldStartClusterWithMultipleBrokerAndLeaveData() throws Exception {
+ cluster.deleteDataUponShutdown(false).addBrokers(3).startup();
+ cluster.onEachDirectory(this::assertValidDataDirectory);
+ cluster.shutdown();
+ cluster.onEachDirectory(this::assertValidDataDirectory);
+ }
+
+ @Test
+ public void shouldStartClusterAndAllowProducersAndConsumersToUseIt() throws Exception {
+ Testing.Debug.enable();
+ final String topicName = "topicA";
+ final CountDownLatch completion = new CountDownLatch(2);
+ final int numMessages = 100;
+ final AtomicLong messagesRead = new AtomicLong(0);
+
+ // Start a cluster and create a topic ...
+ cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
+ cluster.createTopics(topicName);
+
+ // Consume messages asynchronously ...
+ Stopwatch sw = Stopwatch.reusable().start();
+ cluster.useTo().consumeIntegers(topicName, numMessages, 10, TimeUnit.SECONDS, completion::countDown, (key, value) -> {
+ messagesRead.incrementAndGet();
+ return true;
+ });
+
+ // Produce some messages asynchronously ...
+ cluster.useTo().produceIntegers(topicName, numMessages, 1, completion::countDown);
+
+ // Wait for both to complete ...
+ if (completion.await(10, TimeUnit.SECONDS)) {
+ sw.stop();
+ Testing.debug("Both consumer and producer completed normally in " + sw.durations());
+ } else {
+ Testing.debug("Consumer and/or producer did not completed normally");
+ }
+
+ assertThat(messagesRead.get()).isEqualTo(numMessages);
+ }
+
+ @Test
+ public void shouldStartClusterAndAllowInteractiveProductionAndAutomaticConsumersToUseIt() throws Exception {
+ Testing.Debug.enable();
+ final String topicName = "topicA";
+ final CountDownLatch completion = new CountDownLatch(1);
+ final int numMessages = 3;
+ final AtomicLong messagesRead = new AtomicLong(0);
+
+ // Start a cluster and create a topic ...
+ cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
+ cluster.createTopics(topicName);
+
+ // Consume messages asynchronously ...
+ Stopwatch sw = Stopwatch.reusable().start();
+ cluster.useTo().consumeIntegers(topicName, numMessages, 10, TimeUnit.SECONDS, completion::countDown, (key, value) -> {
+ messagesRead.incrementAndGet();
+ return true;
+ });
+
+ // Produce some messages interactively ...
+ cluster.useTo()
+ .createProducer("manual", new StringSerializer(), new IntegerSerializer())
+ .write(topicName, "key1", 1)
+ .write(topicName, "key2", 2)
+ .write(topicName, "key3", 3)
+ .close();
+
+ // Wait for the consumer to complete ...
+ if (completion.await(10, TimeUnit.SECONDS)) {
+ sw.stop();
+ Testing.debug("The consumer completed normally in " + sw.durations());
+ } else {
+ Testing.debug("Consumer did not completed normally");
+ }
+
+ assertThat(messagesRead.get()).isEqualTo(numMessages);
+ }
+
+ @Test
+ public void shouldStartClusterAndAllowAsynchronousProductionAndAutomaticConsumersToUseIt() throws Exception {
+ Testing.Debug.enable();
+ final String topicName = "topicA";
+ final CountDownLatch completion = new CountDownLatch(2);
+ final int numMessages = 3;
+ final AtomicLong messagesRead = new AtomicLong(0);
+
+ // Start a cluster and create a topic ...
+ cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
+ cluster.createTopics(topicName);
+
+ // Consume messages asynchronously ...
+ Stopwatch sw = Stopwatch.reusable().start();
+ cluster.useTo().consumeIntegers(topicName, numMessages, 10, TimeUnit.SECONDS, completion::countDown, (key, value) -> {
+ messagesRead.incrementAndGet();
+ return true;
+ });
+
+ // Produce some messages asynchronously ...
+ cluster.useTo().produce("manual", new StringSerializer(), new IntegerSerializer(), producer -> {
+ producer.write(topicName, "key1", 1);
+ producer.write(topicName, "key2", 2);
+ producer.write(topicName, "key3", 3);
+ completion.countDown();
+ });
+
+ // Wait for the consumer to complete ...
+ if (completion.await(10, TimeUnit.SECONDS)) {
+ sw.stop();
+ Testing.debug("The consumer completed normally in " + sw.durations());
+ } else {
+ Testing.debug("Consumer did not completed normally");
+ }
+ assertThat(messagesRead.get()).isEqualTo(numMessages);
+ }
+
+ protected void assertValidDataDirectory(File dir) {
+ assertThat(dir.exists()).isTrue();
+ assertThat(dir.isDirectory()).isTrue();
+ assertThat(dir.canWrite()).isTrue();
+ assertThat(dir.canRead()).isTrue();
+ assertThat(Testing.Files.inTargetDir(dir)).isTrue();
+ }
+
+ protected void assertDoesNotExist(File dir) {
+ assertThat(dir.exists()).isFalse();
+ }
+}
\ No newline at end of file
diff --git a/debezium-core/src/test/java/io/debezium/kafka/KafkaServer.java b/debezium-core/src/test/java/io/debezium/kafka/KafkaServer.java
new file mode 100644
index 000000000..81ed7b953
--- /dev/null
+++ b/debezium-core/src/test/java/io/debezium/kafka/KafkaServer.java
@@ -0,0 +1,339 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.annotation.ThreadSafe;
+import io.debezium.util.IoUtil;
+import kafka.admin.AdminUtils;
+import kafka.server.KafkaConfig;
+import kafka.utils.ZkUtils;
+
+/**
+ * A small embedded Kafka server.
+ *
+ * @author Randall Hauch
+ */
+@ThreadSafe
+public class KafkaServer {
+
+ public static final int DEFAULT_BROKER_ID = 1;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaServer.class);
+
+ private final Supplier<String> zkConnection;
+ private final int brokerId;
+ private volatile File logsDir;
+ private final Properties config;
+ private volatile int desiredPort = -1;
+ private volatile int port = -1;
+ private volatile kafka.server.KafkaServer server;
+
+ /**
+ * Create a new server instance.
+ *
+ * @param zookeeperConnection the supplier of the Zookeeper connection string; may not be null
+ */
+ public KafkaServer(Supplier<String> zookeeperConnection) {
+ this(zookeeperConnection, DEFAULT_BROKER_ID);
+ }
+
+ /**
+ * Create a new server instance.
+ *
+ * @param zookeeperConnection the supplier of the Zookeeper connection string; may not be null
+ * @param brokerId the unique broker ID
+ */
+ public KafkaServer(Supplier<String> zookeeperConnection, int brokerId) {
+ this(zookeeperConnection, brokerId, -1);
+ }
+
+ /**
+ * Create a new server instance.
+ *
+ * @param zookeeperConnection the supplier of the Zookeeper connection string; may not be null
+ * @param brokerId the unique broker ID
+ * @param port the desired port
+ */
+ public KafkaServer(Supplier<String> zookeeperConnection, int brokerId, int port) {
+ if (zookeeperConnection == null) throw new IllegalArgumentException("The Zookeeper connection string supplier may not be null");
+ this.zkConnection = zookeeperConnection;
+ this.brokerId = brokerId;
+ this.config = new Properties();
+ setPort(port);
+ populateDefaultConfiguration(this.config);
+ }
+
+ protected int brokerId() {
+ return brokerId;
+ }
+
+ protected String zookeeperConnection() {
+ return this.zkConnection.get();
+ }
+
+ /**
+ * Set the initial default configuration properties. This method is called from the constructors and can be overridden
+ * to customize these properties.
+ *
+ * @param props the configuration properties; never null
+ */
+ protected void populateDefaultConfiguration(Properties props) {
+ props.setProperty(KafkaConfig.NumPartitionsProp(), String.valueOf(1));
+ props.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(1));
+ }
+
+ /**
+ * Set a configuration property. Several key properties that deal with Zookeeper, the host name, and the broker ID,
+ * may not be set via this method and are ignored since they are controlled elsewhere in this instance.
+ *
+ * @param name the property name; may not be null
+ * @param value the property value; may be null
+ * @return this instance to allow chaining methods; never null
+ * @throws IllegalStateException if the server is running when this method is called
+ */
+ public KafkaServer setProperty(String name, String value) {
+ if (server != null) throw new IllegalStateException("Unable to change the properties when already running");
+ if (!KafkaConfig.ZkConnectProp().equalsIgnoreCase(name)
+ && !KafkaConfig.BrokerIdProp().equalsIgnoreCase(name)
+ && !KafkaConfig.HostNameProp().equalsIgnoreCase(name)) {
+ this.config.setProperty(name, value);
+ }
+ return this;
+ }
+
+ /**
+ * Set multiple configuration properties. Several key properties that deal with Zookeeper, the host name, and the broker ID,
+ * may not be set via this method and are ignored since they are controlled elsewhere in this instance.
+ *
+ * @param properties the configuration properties; may be null or empty
+ * @return this instance to allow chaining methods; never null
+ * @throws IllegalStateException if the server is running when this method is called
+ */
+ public KafkaServer setProperties(Properties properties) {
+ if (server != null) throw new IllegalStateException("Unable to change the properties when already running");
+ properties.stringPropertyNames().forEach(propName -> {
+ setProperty(propName, properties.getProperty(propName));
+ });
+ return this;
+ }
+
+ /**
+ * Set the port for the server.
+ *
+ * @param port the desired port, or {@code -1} if a random available port should be found and used
+ * @return this instance to allow chaining methods; never null
+ */
+ public KafkaServer setPort(int port) {
+ this.desiredPort = port > 0 ? port : -1;
+ this.port = desiredPort;
+ return this;
+ }
+
+ /**
+ * Get a copy of the complete configuration that is or will be used by the running server.
+ *
+ * @return the properties for the currently-running server; may be empty if not running
+ */
+ public Properties config() {
+ // Copy the values rather than passing 'config' as Properties defaults, since defaults are not
+ // visible through Map-based views of a Properties object ...
+ Properties runningConfig = new Properties();
+ runningConfig.putAll(config);
+ runningConfig.setProperty(KafkaConfig.ZkConnectProp(), zookeeperConnection());
+ runningConfig.setProperty(KafkaConfig.BrokerIdProp(), Integer.toString(brokerId));
+ runningConfig.setProperty(KafkaConfig.HostNameProp(), "localhost");
+ runningConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), String.valueOf(Boolean.TRUE));
+ return runningConfig;
+ }
+
+ /**
+ * Get the connection string. If the server is not {@link #startup() running} and the port is to be dynamically discovered
+ * upon startup, then this method returns "{@code localhost:-1}".
+ *
+ * @return the connection string; never null
+ */
+ public String getConnection() {
+ return "localhost:" + port;
+ }
+
+ /**
+ * Start the embedded Kafka server.
+ *
+ * @return this instance to allow chaining methods; never null
+ * @throws IllegalStateException if the server is already running
+ */
+ public synchronized KafkaServer startup() {
+ if (server != null) throw new IllegalStateException(this + " is already running");
+
+ // Determine the storage directory and adjust the configuration ...
+ Properties config = config();
+ if (logsDir == null) {
+ try {
+ // Create a dedicated temporary directory; using the parent of a temporary file would point at
+ // the shared system temporary directory, which deleteData() would then delete recursively ...
+ File temp = File.createTempFile("kafka", "logs");
+ temp.delete();
+ temp.mkdirs();
+ this.logsDir = temp;
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to create temporary directory", e);
+ }
+ }
+ config.setProperty(KafkaConfig.LogDirProp(), logsDir.getAbsolutePath());
+
+ // Determine the port and adjust the configuration ...
+ port = desiredPort > 0 ? desiredPort : IoUtil.getAvailablePort();
+ config.setProperty(KafkaConfig.PortProp(), Integer.toString(port));
+ // config.setProperty("metadata.broker.list", getConnection());
+
+ // Start the server ...
+ try {
+ LOGGER.debug("Starting Kafka broker {} @ {} with storage in {}", brokerId, getConnection(), logsDir.getAbsolutePath());
+ server = new kafka.server.KafkaServer(new KafkaConfig(config), new SystemTime(), scala.Option.apply(null));
+ server.startup();
+ LOGGER.info("Started Kafka server {} @ {} with storage in {}", brokerId, getConnection(), logsDir.getAbsolutePath());
+ return this;
+ } catch (RuntimeException e) {
+ server = null;
+ throw e;
+ }
+ }
+
+ /**
+ * Shut down the embedded Kafka server. The data is left intact; use {@link #deleteData()} to remove it.
+ */
+ public synchronized void shutdown() {
+ if (server != null) {
+ try {
+ server.shutdown();
+ } finally {
+ server = null;
+ port = desiredPort;
+ }
+ }
+ }
+
+ /**
+ * Delete all of the data associated with this server. This method does nothing if the server is currently running.
+ */
+ public synchronized void deleteData() {
+ if (server == null) {
+ // Delete all data ...
+ try {
+ IoUtil.delete(this.logsDir);
+ } catch (IOException e) {
+ LOGGER.error("Unable to delete directory '{}'", this.logsDir, e);
+ }
+ }
+ }
+
+ /**
+ * Get the Zookeeper utilities used by the running Kafka server.
+ *
+ * @return the Zookeeper utilities, or null if the Kafka server is not running
+ */
+ public ZkUtils getZkUtils() {
+ return server != null ? server.zkUtils() : null;
+ }
+
+ /**
+ * Create the specified topics, each with one partition and a replication factor of 1.
+ *
+ * @param topics the names of the topics to create
+ */
+ public void createTopics(String... topics) {
+ createTopics(1, 1, topics);
+ }
+
+ /**
+ * Create the specified topics.
+ *
+ * @param numPartitions the number of partitions for each topic
+ * @param replicationFactor the replication factor for each topic
+ * @param topics the names of the topics to create
+ */
+ public void createTopics(int numPartitions, int replicationFactor, String... topics) {
+ for (String topic : topics) {
+ if (topic != null) createTopic(topic, numPartitions, replicationFactor);
+ }
+ }
+
+ /**
+ * Create the specified topic.
+ *
+ * @param topic the name of the topic to create
+ * @param numPartitions the number of partitions for the topic
+ * @param replicationFactor the replication factor for the topic
+ */
+ public void createTopic(String topic, int numPartitions, int replicationFactor) {
+ // Use the supplied partition and replication counts rather than hard-coded values ...
+ AdminUtils.createTopic(getZkUtils(), topic, numPartitions, replicationFactor, new Properties());
+ }
+
+ /**
+ * Perform the supplied function on each directory used by this server.
+ *
+ * @param consumer the consumer function; may not be null
+ */
+ void onEachDirectory(Consumer<File> consumer) {
+ consumer.accept(getStateDirectory());
+ }
+
+ /**
+ * Get the parent directory where the broker's state will be kept. The broker will create a subdirectory for itself
+ * under this directory.
+ *
+ * @return the parent directory for the broker's state; may be null if a temporary directory will be used
+ */
+ public File getStateDirectory() {
+ return this.logsDir;
+ }
+
+ /**
+ * Set the parent directory where the broker's state will be kept. The broker will create a subdirectory for itself
+ * under this directory.
+ *
+ * @param stateDirectory the parent directory for the broker's state; may be null if a temporary directory will be used
+ * @throws IllegalArgumentException if the supplied file is not a directory or not writable
+ */
+ public void setStateDirectory(File stateDirectory) {
+ if (stateDirectory != null && stateDirectory.exists()
+ && (!stateDirectory.isDirectory() || !stateDirectory.canWrite() || !stateDirectory.canRead())) {
+ throw new IllegalArgumentException("The directory must be readable and writable");
+ }
+ this.logsDir = stateDirectory;
+ }
+
+ @Override
+ public String toString() {
+ return "KafkaServer{" + getConnection() + "}";
+ }
+
+ protected static class SystemTime implements kafka.utils.Time {
+ @Override
+ public long milliseconds() {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public long nanoseconds() {
+ return System.nanoTime();
+ }
+
+ @Override
+ public void sleep(long ms) {
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt status rather than swallowing it
+ }
+ }
+ }
+}
\ No newline at end of file
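The broker above and the Zookeeper server that follows are designed to compose: KafkaServer takes a Supplier<String> for the Zookeeper connection string precisely so that a ZookeeperServer can be created first and its dynamically-assigned port resolved only when the broker starts. A minimal usage sketch, assuming a test method that propagates exceptions; the try/finally arrangement and topic names are illustrative, not part of this change:

    ZookeeperServer zk = new ZookeeperServer();
    zk.startup(); // picks a random available port
    KafkaServer kafka = new KafkaServer(zk::getConnection); // connection string is resolved lazily
    try {
        kafka.startup();
        kafka.createTopics("topic-a", "topic-b"); // one partition, replication factor 1
        // ... exercise the broker via kafka.getConnection() ...
    } finally {
        kafka.shutdown();
        kafka.deleteData();
        zk.shutdown(true); // also removes the snapshot and log directories
    }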
diff --git a/debezium-core/src/test/java/io/debezium/kafka/ZookeeperServer.java b/debezium-core/src/test/java/io/debezium/kafka/ZookeeperServer.java
new file mode 100644
index 000000000..365a5fb4e
--- /dev/null
+++ b/debezium-core/src/test/java/io/debezium/kafka/ZookeeperServer.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.function.Consumer;
+
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.annotation.ThreadSafe;
+import io.debezium.util.IoUtil;
+
+/**
+ * A lightweight embeddable Zookeeper server useful for unit testing.
+ *
+ * @author Randall Hauch
+ * @see KafkaCluster
+ */
+@ThreadSafe
+public class ZookeeperServer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperServer.class);
+ public static final int DEFAULT_TICK_TIME = 500;
+
+ /**
+ * The basic time unit in milliseconds used by ZooKeeper. It is used to do heartbeats and the minimum session timeout will be
+ * twice the tickTime.
+ */
+ private volatile int tickTime = DEFAULT_TICK_TIME;
+ private volatile int port = -1;
+
+ private volatile ServerCnxnFactory factory;
+ private volatile File dataDir;
+ private volatile File snapshotDir;
+ private volatile File logDir;
+
+ /**
+ * Create a new server instance.
+ */
+ public ZookeeperServer() {
+ }
+
+ /**
+ * Start the embedded Zookeeper server.
+ *
+ * @return this instance to allow chaining methods; never null
+ * @throws IOException if there is an error during startup
+ * @throws IllegalStateException if the server is already running
+ */
+ public synchronized ZookeeperServer startup() throws IOException {
+ if (factory != null) throw new IllegalStateException(this + " is already running");
+
+ if (this.port == -1) this.port = IoUtil.getAvailablePort();
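+ // The second argument to createFactory is the maximum number of concurrent client connections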
+ this.factory = ServerCnxnFactory.createFactory(new InetSocketAddress("localhost", port), 1024);
+ if (this.dataDir == null) {
+ try {
+ // Create a dedicated temporary directory; using the parent of a temporary file would point at
+ // the shared system temporary directory, and shutdown(true) would then delete data inside it ...
+ File temp = File.createTempFile("zookeeper", "data");
+ temp.delete();
+ temp.mkdirs();
+ this.dataDir = temp;
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to create temporary directory", e);
+ }
+ }
+ this.snapshotDir = new File(this.dataDir, "snapshot");
+ this.logDir = new File(this.dataDir, "log");
+ this.snapshotDir.mkdirs();
+ this.logDir.mkdirs();
+
+ try {
+ factory.startup(new ZooKeeperServer(snapshotDir, logDir, tickTime));
+ return this;
+ } catch (InterruptedException e) {
+ factory = null;
+ Thread.currentThread().interrupt(); // Thread.interrupted() would clear the flag; restore it instead
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Shut down the embedded Zookeeper server and delete all of its data.
+ */
+ public void shutdown() {
+ shutdown(true);
+ }
+
+ /**
+ * Shut down the embedded Zookeeper server, optionally deleting all of its data.
+ *
+ * @param deleteData true if the data should be removed, or false otherwise
+ */
+ public synchronized void shutdown(boolean deleteData) {
+ if (factory != null) {
+ try {
+ factory.shutdown();
+ } finally {
+ factory = null;
+ if (deleteData) {
+ // Delete all data ...
+ try {
+ IoUtil.delete(this.snapshotDir, this.logDir);
+ } catch (IOException e) {
+ LOGGER.error("Unable to delete data upon shutdown", e);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Get the connection string. If the server is not {@link #startup() running} and the port is to be dynamically discovered
+ * upon startup, then this method returns "{@code localhost:-1}".
+ *
+ * @return the connection string; never null
+ */
+ public String getConnection() {
+ return "localhost:" + port;
+ }
+
+ /**
+ * Set the port for the server.
+ *
+ * @param port the desired port, or {@code -1} if a random available port should be found and used
+ * @return this instance to allow chaining methods; never null
+ */
+ public ZookeeperServer setPort(int port) {
+ this.port = port;
+ return this;
+ }
+
+ /**
+ * Set the basic time unit in milliseconds used by ZooKeeper. It is used to do heartbeats and the minimum session timeout will
+ * be twice the tickTime.
+ *
+ * @param tickTime the desired value, or non-positive if the default of {@value #DEFAULT_TICK_TIME} should be used
+ * @return this instance to allow chaining methods; never null
+ */
+ public ZookeeperServer setTickTime(int tickTime) {
+ this.tickTime = tickTime > 0 ? tickTime : DEFAULT_TICK_TIME;
+ return this;
+ }
+
+ /**
+ * Get the current port.
+ *
+ * @return the port number, or {@code -1} if the port should be discovered upon {@link #startup()}
+ */
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Get the basic time unit in milliseconds used by ZooKeeper.
+ *
+ * @return tick time; always positive
+ */
+ public int getTickTime() {
+ return tickTime;
+ }
+
+ /**
+ * Perform the supplied function on each directory used by this server.
+ *
+ * @param consumer the consumer function; may not be null
+ */
+ void onEachDirectory(Consumer<File> consumer) {
+ consumer.accept(getSnapshotDirectory());
+ consumer.accept(getLogDirectory());
+ }
+
+ /**
+ * Get the parent directory where the server's snapshots are kept.
+ * @return the parent directory for the server's snapshots; never null once the server is running
+ */
+ File getSnapshotDirectory() {
+ return this.snapshotDir;
+ }
+
+ /**
+ * Get the parent directory where the server's logs are kept.
+ * @return the parent directory for the server's logs; never null once the server is running
+ */
+ File getLogDirectory() {
+ return this.logDir;
+ }
+
+ /**
+ * Get the parent directory where the server's logs and snapshots will be kept.
+ * @return the parent directory for the server's logs and snapshots; may be null if a temporary directory will be used
+ */
+ public File getStateDirectory() {
+ return this.dataDir;
+ }
+
+ /**
+ * Set the parent directory where the server's logs and snapshots will be kept.
+ * @param dataDir the parent directory for the server's logs and snapshots; may be null if a temporary directory will be used
+ * @return this instance to allow method chaining; never null
+ * @throws IllegalArgumentException if the supplied file is not a directory or not writable
+ */
+ public ZookeeperServer setStateDirectory(File dataDir) {
+ if (dataDir != null && dataDir.exists() && (!dataDir.isDirectory() || !dataDir.canWrite() || !dataDir.canRead())) {
+ throw new IllegalArgumentException("The directory must be readable and writable");
+ }
+ this.dataDir = dataDir;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "ZookeeperServer{" + getConnection() + "}";
+ }
+}
\ No newline at end of file
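Every setter on ZookeeperServer returns this, so an instance can be configured fluently before startup. A small sketch, assuming the caller propagates IOException; the fixed port and the target/zk path are arbitrary examples, not part of this change:

    ZookeeperServer zk = new ZookeeperServer()
            .setPort(2181) // use a fixed port rather than a random one
            .setTickTime(1000) // heartbeat unit in milliseconds
            .setStateDirectory(new File("target/zk"));
    zk.startup();
    // zk.getConnection() now returns "localhost:2181"
    zk.shutdown(false); // keep the snapshot and log directories for inspection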
diff --git a/debezium-core/src/test/java/io/debezium/kafka/ZookeeperServerTest.java b/debezium-core/src/test/java/io/debezium/kafka/ZookeeperServerTest.java
new file mode 100644
index 000000000..3fcabd5a6
--- /dev/null
+++ b/debezium-core/src/test/java/io/debezium/kafka/ZookeeperServerTest.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.kafka;
+
+import java.io.File;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.fest.assertions.Assertions.assertThat;
+
+import io.debezium.util.Testing;
+
+/**
+ * @author Randall Hauch
+ *
+ */
+public class ZookeeperServerTest {
+
+ private ZookeeperServer server;
+ private File dataDir;
+
+ @Before
+ public void beforeEach() {
+ dataDir = Testing.Files.createTestingDirectory("zk");
+ Testing.Files.delete(dataDir);
+ server = new ZookeeperServer();
+ server.setStateDirectory(dataDir);
+ }
+
+ @After
+ public void afterEach() {
+ Testing.Files.delete(dataDir);
+ }
+
+ @Test
+ public void shouldStartServerAndRemoveData() throws Exception {
+ Testing.debug("Running 1");
+ server.startup();
+ server.onEachDirectory(this::assertValidDataDirectory);
+ server.shutdown(true);
+ server.onEachDirectory(this::assertDoesNotExist);
+ }
+
+ @Test
+ public void shouldStartServerAndLeaveData() throws Exception {
+ Testing.debug("Running 2");
+ server.startup();
+ server.onEachDirectory(this::assertValidDataDirectory);
+ server.shutdown(false);
+ server.onEachDirectory(this::assertValidDataDirectory);
+ }
+
+ protected void assertValidDataDirectory(File dir) {
+ assertThat(dir.exists()).isTrue();
+ assertThat(dir.isDirectory()).isTrue();
+ assertThat(dir.canWrite()).isTrue();
+ assertThat(dir.canRead()).isTrue();
+ assertThat(Testing.Files.inTargetDir(dir)).isTrue();
+ }
+
+ protected void assertDoesNotExist(File dir) {
+ assertThat(dir.exists()).isFalse();
+ }
+}
\ No newline at end of file
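The same lifecycle pattern extends naturally to the broker. A hypothetical analogue for KafkaServer, not part of this change (the class name, directory names, and Zookeeper wiring are assumptions):

    package io.debezium.kafka;

    import java.io.File;

    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;

    import static org.fest.assertions.Assertions.assertThat;

    import io.debezium.util.Testing;

    public class KafkaServerTest {

        private ZookeeperServer zk;
        private KafkaServer kafka;
        private File zkDir;
        private File kafkaDir;

        @Before
        public void beforeEach() throws Exception {
            zkDir = Testing.Files.createTestingDirectory("zk");
            kafkaDir = Testing.Files.createTestingDirectory("kafka");
            zk = new ZookeeperServer().setStateDirectory(zkDir);
            zk.startup();
            kafka = new KafkaServer(zk::getConnection);
            kafka.setStateDirectory(kafkaDir);
        }

        @After
        public void afterEach() {
            kafka.shutdown(); // idempotent; does nothing if already stopped
            kafka.deleteData();
            zk.shutdown(true);
        }

        @Test
        public void shouldStartServerAndRemoveData() {
            kafka.startup();
            kafka.onEachDirectory(dir -> assertThat(dir.exists()).isTrue());
            kafka.shutdown();
            kafka.deleteData();
        }
    }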
diff --git a/debezium-core/src/test/resources/log4j.properties b/debezium-core/src/test/resources/log4j.properties
index bb0bd4546..f962d866f 100644
--- a/debezium-core/src/test/resources/log4j.properties
+++ b/debezium-core/src/test/resources/log4j.properties
@@ -4,8 +4,56 @@ log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %m (%c)%n
+# Direct Zookeeper log messages to stdout with special prefix
+log4j.appender.zk=org.apache.log4j.ConsoleAppender
+log4j.appender.zk.Target=System.out
+log4j.appender.zk.layout=org.apache.log4j.PatternLayout
+log4j.appender.zk.layout.ConversionPattern=%d{ISO8601} %-5p ZOO %m%n
+#log4j.appender.zk.layout.ConversionPattern=%d{ISO8601} %-5p ZOO %m (%c)%n
+
+# Direct Zookeeper Client log messages to stdout with special prefix
+log4j.appender.zkclient=org.apache.log4j.ConsoleAppender
+log4j.appender.zkclient.Target=System.out
+log4j.appender.zkclient.layout=org.apache.log4j.PatternLayout
+log4j.appender.zkclient.layout.ConversionPattern=%d{ISO8601} %-5p ZKC %m%n
+#log4j.appender.zkclient.layout.ConversionPattern=%d{ISO8601} %-5p ZKC %m (%c)%n
+
+# Direct Kafka log messages to stdout with special prefix
+log4j.appender.kafka=org.apache.log4j.ConsoleAppender
+log4j.appender.kafka.Target=System.out
+log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafka.layout.ConversionPattern=%d{ISO8601} %-5p KAF %m%n
+#log4j.appender.kafka.layout.ConversionPattern=%d{ISO8601} %-5p KAF %m (%c)%n
+
+# Direct Debezium log messages to stdout with special prefix
+log4j.appender.debezium=org.apache.log4j.ConsoleAppender
+log4j.appender.debezium.Target=System.out
+log4j.appender.debezium.layout=org.apache.log4j.PatternLayout
+log4j.appender.debezium.layout.ConversionPattern=%d{ISO8601} %-5p DBZ %m (%c)%n
+
# Root logger option
log4j.rootLogger=INFO, stdout
# Set up the default logging to be INFO level, then override specific units
-log4j.logger.io.debezium=INFO
\ No newline at end of file
+log4j.logger.io.debezium=INFO, debezium
+log4j.additivity.io.debezium=false
+
+# Kafka is pretty verbose at INFO level, so for brevity use ERROR for all Kafka loggers; dial individual loggers (e.g., kafka.server.KafkaServer) back up when debugging
+log4j.logger.org.apache.kafka=ERROR, kafka
+log4j.additivity.org.apache.kafka=false
+log4j.logger.kafka=ERROR, kafka
+log4j.additivity.kafka=false
+log4j.logger.kafka.server.KafkaServer=ERROR, kafka
+log4j.additivity.kafka.server.KafkaServer=false
+log4j.logger.state.change.logger=ERROR, kafka
+log4j.additivity.state.change.logger=false
+
+# Zookeeper is pretty verbose at INFO level, so for brevity use ERROR for all Zookeeper loggers; dial org.apache.zookeeper.server.ZooKeeperServer back up when debugging
+log4j.logger.org.apache.zookeeper=ERROR, zk
+log4j.additivity.org.apache.zookeeper=false
+log4j.logger.org.apache.zookeeper.server.ZooKeeperServer=ERROR, zk
+log4j.additivity.org.apache.zookeeper.server.ZooKeeperServer=false
+
+# Zookeeper Client is pretty verbose at INFO level, so for brevity use ERROR everywhere ...
+log4j.logger.org.I0Itec.zkclient=ERROR, zkclient
+log4j.additivity.org.I0Itec.zkclient=false
\ No newline at end of file
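Because each subsystem now has its own prefixed appender, a single component can be dialed back up while debugging a failing test without re-enabling all of the broker chatter. An illustrative override in the same file, not part of this change:

    # Temporarily restore INFO-level logging for the broker lifecycle only
    log4j.logger.kafka.server.KafkaServer=INFO, kafka
    log4j.additivity.kafka.server.KafkaServer=false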
diff --git a/pom.xml b/pom.xml
index 63158ec4c..808008012 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,6 +79,12 @@
Debezium community
+
+ <version.kafka>0.9.0.0</version.kafka>
+ <version.kafka.scala>2.11</version.kafka.scala>
+ <version.scala>2.11.7</version.scala> <!-- property name assumed; only the value appears in the original -->
+ <version.curator>2.4.0</version.curator>
+ <version.zookeeper>3.4.6</version.zookeeper>
support/checkstyle
@@ -158,6 +164,36 @@
test
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${version.kafka.scala}</artifactId>
+ <version>${version.kafka}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${version.kafka}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>${version.zookeeper}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${version.kafka.scala}</artifactId>
+ <version>${version.kafka}</version>
+ <classifier>test</classifier> <!-- classifier assumed from the duplicated "test" line in the original -->
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>${version.curator}</version>
+ <scope>test</scope>
+ </dependency>
junit