diff --git a/debezium-core/pom.xml b/debezium-core/pom.xml index 908beefad..4e4b96265 100644 --- a/debezium-core/pom.xml +++ b/debezium-core/pom.xml @@ -46,6 +46,21 @@ org.easytesting fest-assert + + + org.apache.kafka + kafka_${version.kafka.scala} + test + + + org.apache.curator + curator-test + + + org.apache.zookeeper + zookeeper + test + diff --git a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java index d11e92bd7..a77c63f5b 100644 --- a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java +++ b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java @@ -251,7 +251,7 @@ protected void parseNextStatement(Marker marker) { */ protected void parseComment(Marker marker) { String comment = tokens.consume(); - logger.debug("COMMENT: {}", comment); + logger.trace("COMMENT: {}", comment); } /** @@ -304,16 +304,16 @@ protected void parseUnknownStatement(Marker marker) { } protected void debugParsed(Marker statementStart) { - if (logger.isDebugEnabled()) { + if (logger.isTraceEnabled()) { String statement = removeLineFeeds(tokens.getContentFrom(statementStart)); - logger.debug("PARSED: {}", statement); + logger.trace("PARSED: {}", statement); } } protected void debugSkipped(Marker statementStart) { - if (logger.isDebugEnabled()) { + if (logger.isTraceEnabled()) { String statement = removeLineFeeds(tokens.getContentFrom(statementStart)); - logger.debug("SKIPPED: {}", statement); + logger.trace("SKIPPED: {}", statement); } } diff --git a/debezium-core/src/main/java/io/debezium/util/IoUtil.java b/debezium-core/src/main/java/io/debezium/util/IoUtil.java index b42e12a05..ac2a3d009 100644 --- a/debezium-core/src/main/java/io/debezium/util/IoUtil.java +++ b/debezium-core/src/main/java/io/debezium/util/IoUtil.java @@ -380,7 +380,7 @@ public static void delete(File... filesOrFolder) throws IOException { public static void delete(Path path) throws IOException { if (path != null) { if (path.toAbsolutePath().toFile().exists()) { - LOGGER.info("Deleting '" + path + "'..."); + LOGGER.debug("Deleting '" + path + "'..."); Set options = EnumSet.noneOf(FileVisitOption.class); int maxDepth = 10; FileVisitor removingVisitor = new SimpleFileVisitor() { diff --git a/debezium-core/src/test/java/io/debezium/kafka/KafkaCluster.java b/debezium-core/src/test/java/io/debezium/kafka/KafkaCluster.java new file mode 100644 index 000000000..2306c6ed9 --- /dev/null +++ b/debezium-core/src/test/java/io/debezium/kafka/KafkaCluster.java @@ -0,0 +1,1041 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.kafka; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.StringJoiner; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiPredicate; +import java.util.function.BooleanSupplier; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.annotation.ThreadSafe; +import io.debezium.document.Document; +import io.debezium.document.DocumentSerdes; +import io.debezium.util.IoUtil; + +/** + * An embeddable cluster of Kafka servers and a single Zookeeper server. This may be useful when creating a complete environment + * within a single process, but doing so offers limited durability and fault tolerance compared to the normal approach of + * using an external cluster of Kafka servers and Zookeeper servers with proper replication and fault tolerance. + * + * @author Randall Hauch + */ +@ThreadSafe +public class KafkaCluster { + + public static final boolean DEFAULT_DELETE_DATA_UPON_SHUTDOWN = true; + public static final boolean DEFAULT_DELETE_DATA_PRIOR_TO_STARTUP = false; + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCluster.class); + + private final ConcurrentMap kafkaServers = new ConcurrentHashMap<>(); + private final ZookeeperServer zkServer = new ZookeeperServer(); + private volatile File dataDir = null; + private volatile boolean deleteDataUponShutdown = DEFAULT_DELETE_DATA_UPON_SHUTDOWN; + private volatile boolean deleteDataPriorToStartup = DEFAULT_DELETE_DATA_PRIOR_TO_STARTUP; + private volatile boolean running = false; + private volatile Properties kafkaConfig = null; + private volatile int startingKafkaPort = -1; + private final AtomicLong nextKafkaPort = new AtomicLong(startingKafkaPort); + + /** + * Create a new embedded cluster. + */ + public KafkaCluster() { + } + + /** + * Specify whether the data is to be deleted upon {@link #shutdown()}. 
+ * + * @param delete true if the data is to be deleted upon shutdown, or false otherwise + * @return this instance to allow chaining methods; never null + * @throws IllegalStateException if the cluster is running + */ + public KafkaCluster deleteDataUponShutdown(boolean delete) { + if (running) throw new IllegalStateException("Unable to change cluster settings when running"); + this.deleteDataUponShutdown = delete; + return this; + } + + /** + * Specify whether the data is to be deleted prior to {@link #startup()}. + * + * @param delete true if the data is to be deleted prior to startup, or false otherwise + * @return this instance to allow chaining methods; never null + * @throws IllegalStateException if the cluster is running + */ + public KafkaCluster deleteDataPriorToStartup(boolean delete) { + if (running) throw new IllegalStateException("Unable to change cluster settings when running"); + this.deleteDataPriorToStartup = delete; + return this; + } + + /** + * Add a number of new Kafka brokers to the cluster. The broker IDs will be generated. + * + * @param count the number of new brokers to add + * @return this instance to allow chaining methods; never null + * @throws IllegalStateException if the cluster is running + */ + public KafkaCluster addBrokers(int count) { + if (running) throw new IllegalStateException("Unable to add a broker when the cluster is already running"); + AtomicLong added = new AtomicLong(); + while (added.intValue() < count) { + kafkaServers.computeIfAbsent(new Integer(added.intValue() + 1), id -> { + added.incrementAndGet(); + KafkaServer server = new KafkaServer(zkServer::getConnection, id); + if (dataDir != null) server.setStateDirectory(dataDir); + if (kafkaConfig != null) server.setProperties(kafkaConfig); + if (startingKafkaPort >= 0) server.setPort((int) this.nextKafkaPort.getAndIncrement()); + return server; + }); + } + return this; + } + + /** + * Set the parent directory where the brokers' logs and the Zookeeper server's logs and snapshots will be kept. + * + * @param dataDir the parent directory for the server's logs and snapshots; may be null if a temporary directory will be used + * @return this instance to allow chaining methods; never null + * @throws IllegalStateException if the cluster is running + * @throws IllegalArgumentException if the supplied file is not a directory or not writable + */ + public KafkaCluster usingDirectory(File dataDir) { + if (running) throw new IllegalStateException("Unable to add a broker when the cluster is already running"); + if (dataDir != null && dataDir.exists() && (!dataDir.isDirectory() || !dataDir.canWrite() || !dataDir.canRead())) { + throw new IllegalArgumentException("The directory must be readable and writable"); + } + this.dataDir = dataDir; + return this; + } + + /** + * Set the configuration properties for each of the brokers. This method does nothing if the supplied properties are null or + * empty. + * + * @param properties the Kafka configuration properties + * @return this instance to allow chaining methods; never null + * @throws IllegalStateException if the cluster is running + */ + public KafkaCluster withKafkaConfiguration(Properties properties) { + if (running) throw new IllegalStateException("Unable to add a broker when the cluster is already running"); + if (properties != null && !properties.isEmpty()) { + kafkaConfig = new Properties(properties); + kafkaServers.values().forEach(kafka -> kafka.setProperties(kafkaConfig)); + } + return this; + } + + /** + * Set the port numbers for Zookeeper and the Kafka brokers.
+ * + * @param zkPort the port number that Zookeeper should use; may be -1 if an available port should be discovered + * @param firstKafkaPort the port number for the first Kafka broker (additional brokers will use subsequent port numbers); + * may be -1 if available ports should be discovered + * @return this instance to allow chaining methods; never null + * @throws IllegalStateException if the cluster is running + */ + public KafkaCluster withPorts(int zkPort, int firstKafkaPort) { + if (running) throw new IllegalStateException("Unable to add a broker when the cluster is already running"); + this.zkServer.setPort(zkPort); + this.startingKafkaPort = firstKafkaPort; + if (this.startingKafkaPort >= 0) { + this.nextKafkaPort.set(this.startingKafkaPort); + kafkaServers.values().forEach(kafka -> kafka.setPort((int) this.nextKafkaPort.getAndIncrement())); + } + return this; + } + + /** + * Determine if the cluster is running. + * + * @return true if the cluster is running, or false otherwise + */ + public boolean isRunning() { + return running; + } + + /** + * Start the embedded Zookeeper server and the Kafka servers {@link #addBrokers(int) in the cluster}. + * This method does nothing if the cluster is already running. + * + * @return this instance to allow chaining methods; never null + * @throws IOException if there is an error during startup + */ + public synchronized KafkaCluster startup() throws IOException { + if (!running) { + if (dataDir == null) { + try { + File temp = File.createTempFile("kafka", "suffix"); + dataDir = new File(temp.getParentFile(), "cluster"); + dataDir.mkdirs(); + temp.delete(); + } catch (IOException e) { + throw new RuntimeException("Unable to create temporary directory", e); + } + } else if (deleteDataPriorToStartup) { + IoUtil.delete(dataDir); + dataDir.mkdirs(); + } + File zkDir = new File(dataDir, "zk"); + zkServer.setStateDirectory(zkDir); // does error checking + this.dataDir = dataDir; + File kafkaDir = new File(dataDir, "kafka"); + kafkaServers.values().forEach(server -> server.setStateDirectory(new File(kafkaDir, "broker" + server.brokerId()))); + + zkServer.startup(); + LOGGER.debug("Starting {} brokers", kafkaServers.size()); + kafkaServers.values().forEach(KafkaServer::startup); + running = true; + } + return this; + } + + /** + * Shutdown the embedded Zookeeper server and the Kafka servers {@link #addBrokers(int) in the cluster}. + * This method does nothing if the cluster is not running. + * + * @return this instance to allow chaining methods; never null + */ + public synchronized KafkaCluster shutdown() { + if (running) { + try { + kafkaServers.values().forEach(this::shutdownReliably); + } finally { + try { + zkServer.shutdown(deleteDataUponShutdown); + } catch (Throwable t) { + LOGGER.error("Error while shutting down {}", zkServer, t); + } finally { + if (deleteDataUponShutdown) { + try { + kafkaServers.values().forEach(KafkaServer::deleteData); + } finally { + try { + IoUtil.delete(this.dataDir); + } catch (IOException e) { + LOGGER.error("Error while deleting cluster data", e); + } + } + } + running = false; + } + } + } + return this; + } + + /** + * Create the specified topics. + * + * @param topics the names of the topics to create + * @throws IllegalStateException if the cluster is not running + */ + public void createTopics(String... 
topics) { + LOGGER.debug("Creating topics: {}", Arrays.toString(topics)); + if (!running) throw new IllegalStateException("The cluster must be running to create topics"); + kafkaServers.values().stream().findFirst().ifPresent(server -> server.createTopics(topics)); + } + + /** + * Create the specified topics. + * + * @param topics the names of the topics to create + * @throws IllegalStateException if the cluster is not running + */ + public void createTopics(Set topics) { + createTopics(topics.toArray(new String[topics.size()])); + } + + /** + * Create the specified topics. + * + * @param numPartitions the number of partitions for each topic + * @param replicationFactor the replication factor for each topic + * @param topics the names of the topics to create + */ + public void createTopics(int numPartitions, int replicationFactor, String... topics) { + LOGGER.debug("Creating topics with {} partitions and {} replicas each: {}", numPartitions, replicationFactor, + Arrays.toString(topics)); + if (!running) throw new IllegalStateException("The cluster must be running to create topics"); + kafkaServers.values().stream().findFirst().ifPresent(server -> server.createTopics(numPartitions, replicationFactor, topics)); + } + + /** + * Create the specified topics. + * + * @param numPartitions the number of partitions for each topic + * @param replicationFactor the replication factor for each topic + * @param topics the names of the topics to create + */ + public void createTopics(int numPartitions, int replicationFactor, Set topics) { + createTopics(numPartitions, replicationFactor, topics.toArray(new String[topics.size()])); + } + + /** + * Create the specified topic. + * + * @param topic the name of the topic to create + * @param numPartitions the number of partitions for the topic + * @param replicationFactor the replication factor for the topic + */ + public void createTopic(String topic, int numPartitions, int replicationFactor) { + LOGGER.debug("Creating topic '{}' with {} partitions and {} replicas", topic, numPartitions, replicationFactor); + if (!running) throw new IllegalStateException("The cluster must be running to create topics"); + kafkaServers.values().stream().findFirst().ifPresent(server -> server.createTopic(topic, numPartitions, replicationFactor)); + } + + /** + * Perform the supplied function on each directory used by this cluster. + * + * @param consumer the consumer function; may not be null + */ + void onEachDirectory(java.util.function.Consumer consumer) { + consumer.accept(zkServer.getSnapshotDirectory()); + consumer.accept(zkServer.getLogDirectory()); + kafkaServers.values().forEach(server -> consumer.accept(server.getStateDirectory())); + } + + /** + * Get the list of brokers. + * + * @return the broker list + */ + public String brokerList() { + StringJoiner joiner = new StringJoiner(","); + kafkaServers.values().forEach(server -> { + joiner.add(server.getConnection()); + }); + return joiner.toString(); + } + + private void shutdownReliably(KafkaServer server) { + try { + server.shutdown(); + } catch (Throwable t) { + LOGGER.error("Error while shutting down {}", server, t); + } + } + + /** + * Obtain the interface for using this cluster. + * + * @return the usage interface; never null + * @throws IllegalStateException if the cluster is not running + */ + public Usage useTo() { + if (!running) throw new IllegalStateException("Unable to use the cluster it is not running"); + return new Usage(); + } + + /** + * A simple interactive Kafka producer for use with the cluster. 
+ * + * @param the type of key + * @param the type of value + */ + public static interface InteractiveProducer extends Closeable { + /** + * Write to the topic with the given name a record with the specified key and value. The message is flushed immediately. + * + * @param topic the name of the topic; may not be null + * @param key the key; may not be null + * @param value the value; may not be null + * @return this producer instance to allow chaining methods together + */ + default InteractiveProducer write(String topic, K key, V value) { + return write(new ProducerRecord<>(topic, key, value)); + } + + /** + * Write the specified record to the topic with the given name. The message is flushed immediately. + * + * @param record the record; may not be null + * @return this producer instance to allow chaining methods together + */ + InteractiveProducer write(ProducerRecord record); + + /** + * Close this producer's connection to Kafka and clean up all resources. + */ + @Override + public void close(); + } + + /** + * A simple interactive Kafka consumer for use with the cluster. + * + * @param the type of key + * @param the type of value + */ + public static interface InteractiveConsumer extends Closeable { + /** + * Block until a record can be read from this consumer's topic, and return the value in that record. + * + * @return the value; never null + * @throws InterruptedException if the thread is interrupted while blocking + */ + default V nextValue() throws InterruptedException { + return nextRecord().value(); + } + + /** + * Block until a record can be read from this consumer's topic, and return the record. + * + * @return the record; never null + * @throws InterruptedException if the thread is interrupted while blocking + */ + ConsumerRecord nextRecord() throws InterruptedException; + + /** + * Block until a record can be read from this consumer's topic or until the timeout occurs, and if a record was read + * return the value in that record. + * + * @param timeout the maximum amount of time to block to wait for a record + * @param unit the unit of time for the {@code timeout} + * @return the value, or null if the method timed out + * @throws InterruptedException if the thread is interrupted while blocking + */ + default V nextValue(long timeout, TimeUnit unit) throws InterruptedException { + ConsumerRecord record = nextRecord(timeout, unit); + return record != null ? record.value() : null; + } + + /** + * Block until a record can be read from this consumer's topic or until the timeout occurs, and if a record was read + * return the record. + * + * @param timeout the maximum amount of time to block to wait for a record + * @param unit the unit of time for the {@code timeout} + * @return the record, or null if the method timed out + * @throws InterruptedException if the thread is interrupted while blocking + */ + ConsumerRecord nextRecord(long timeout, TimeUnit unit) throws InterruptedException; + + /** + * Obtain a stream to consume the input messages. This method can be used in place of repeated calls to the + * {@code next...()} methods. + * + * @return the stream of all messages. + */ + Stream> stream(); + + /** + * Obtain a stream over all of accumulated input messages. 
This method can be called multiple times, and each time + * the resulting stream will operate over all messages that received by this consumer, and is completely + * independent of the {@link #stream()}, {@link #nextRecord()}, {@link #nextRecord(long, TimeUnit)}, {@link #nextValue()} + * and {@link #nextValue(long, TimeUnit)} methods. + * + * @return the stream of all messages. + */ + Stream> streamAll(); + + /** + * Asynchronously close this consumer's connection to Kafka and begin to clean up all resources. + */ + @Override + public void close(); + } + + /** + * A set of methods to use a running KafkaCluster. + */ + public class Usage { + + /** + * Get a new set of properties for consumers that want to talk to this server. + * + * @param groupId the group ID for the consumer; may not be null + * @param clientId the optional identifier for the client; may be null if not needed + * @param autoOffsetReset how to pick a starting offset when there is no initial offset in ZooKeeper or if an offset is + * out of range; may be null for the default to be used + * @return the mutable consumer properties + * @see #getProducerProperties(String) + */ + public Properties getConsumerProperties(String groupId, String clientId, OffsetResetStrategy autoOffsetReset) { + if (groupId == null) throw new IllegalArgumentException("The groupId is required"); + Properties props = new Properties(); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList()); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + if (autoOffsetReset != null) { + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset.toString().toLowerCase()); + } + if (clientId != null) props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId); + return props; + } + + /** + * Get a new set of properties for producers that want to talk to this server. + * + * @param clientId the optional identifier for the client; may be null if not needed + * @return the mutable producer properties + * @see #getConsumerProperties(String, String, OffsetResetStrategy) + */ + public Properties getProducerProperties(String clientId) { + Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList()); + props.setProperty(ProducerConfig.ACKS_CONFIG, Integer.toString(1)); + if (clientId != null) props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, clientId); + return props; + } + + /** + * Create an {@link InteractiveProducer simple producer} that can be used to write messages to the cluster. + * + * @param producerName the name of the producer; may not be null + * @param keySerializer the serializer for the keys; may not be null + * @param valueSerializer the serializer for the values; may not be null + * @return the object that can be used to produce messages; never null + */ + public InteractiveProducer createProducer(String producerName, Serializer keySerializer, + Serializer valueSerializer) { + Properties props = getProducerProperties(producerName); + KafkaProducer producer = new KafkaProducer<>(props, keySerializer, valueSerializer); + return new InteractiveProducer() { + @Override + public InteractiveProducer write(ProducerRecord record) { + producer.send(record); + producer.flush(); + return this; + } + + @Override + public void close() { + producer.close(); + } + }; + } + + /** + * Create an {@link InteractiveProducer simple producer} that can be used to write {@link Document} messages to the + * cluster. 
+ * + * @param producerName the name of the producer; may not be null + * @return the object that can be used to produce messages; never null + */ + public InteractiveProducer createProducer(String producerName) { + return createProducer(producerName, new StringSerializer(), new DocumentSerdes()); + } + + /** + * Create an {@link InteractiveConsumer simple consumer} that can be used to read messages from the cluster. + * + * @param groupId the name of the group; may not be null + * @param clientId the name of the client; may not be null + * @param topicName the name of the topic to read; may not be null and may not be empty + * @param keyDeserializer the deserializer for the keys; may not be null + * @param valueDeserializer the deserializer for the values; may not be null + * @param completion the function to call when the consumer terminates; may be null + * @return the running interactive consumer; never null + */ + public InteractiveConsumer createConsumer(String groupId, String clientId, String topicName, + Deserializer keyDeserializer, + Deserializer valueDeserializer, Runnable completion) { + Set topicNames = Collections.singleton(topicName); + return createConsumer(groupId, clientId, topicNames, keyDeserializer, valueDeserializer, completion); + } + + /** + * Create an {@link InteractiveConsumer simple consumer} that can be used to read messages from the cluster. + * + * @param groupId the name of the group; may not be null + * @param clientId the name of the client; may not be null + * @param topicNames the names of the topics to read; may not be null and may not be empty + * @param keyDeserializer the deserializer for the keys; may not be null + * @param valueDeserializer the deserializer for the values; may not be null + * @param completion the function to call when the consumer terminates; may be null + * @return the running interactive consumer; never null + */ + public InteractiveConsumer createConsumer(String groupId, String clientId, Set topicNames, + Deserializer keyDeserializer, + Deserializer valueDeserializer, Runnable completion) { + BlockingQueue> consumed = new LinkedBlockingQueue<>(); + List> allMessages = new LinkedList<>(); + AtomicBoolean keepReading = new AtomicBoolean(); + consume(groupId, clientId, OffsetResetStrategy.EARLIEST, keyDeserializer, valueDeserializer, () -> keepReading.get(), + completion, topicNames, record -> { + consumed.add(record); + allMessages.add(record); + }); + return new InteractiveConsumer() { + @Override + public ConsumerRecord nextRecord() throws InterruptedException { + return consumed.take(); + } + + @Override + public ConsumerRecord nextRecord(long timeout, TimeUnit unit) throws InterruptedException { + return consumed.poll(timeout, unit); + } + + @Override + public void close() { + keepReading.set(false); + } + + @Override + public Stream> stream() { + return consumed.stream(); + } + + @Override + public Stream> streamAll() { + return allMessages.stream(); + } + }; + } + + /** + * Create an {@link InteractiveConsumer simple consumer} that can be used to read messages from the cluster. 
+ * + * @param groupId the name of the group; may not be null + * @param clientId the name of the client; may not be null + * @param topicName the name of the topic to read; may not be null and may not be empty + * @param completion the function to call when the consumer terminates; may be null + * @return the running interactive consumer; never null + */ + public InteractiveConsumer createConsumer(String groupId, String clientId, String topicName, + Runnable completion) { + Set topicNames = Collections.singleton(topicName); + return createConsumer(groupId, clientId, topicNames, new StringDeserializer(), new DocumentSerdes(), completion); + } + + /** + * Create an {@link InteractiveConsumer simple consumer} that can be used to read messages from the cluster. + * + * @param groupId the name of the group; may not be null + * @param clientId the name of the client; may not be null + * @param topicNames the names of the topics to read; may not be null and may not be empty + * @param completion the function to call when the consumer terminates; may be null + * @return the running interactive consumer; never null + */ + public InteractiveConsumer createConsumer(String groupId, String clientId, Set topicNames, + Runnable completion) { + return createConsumer(groupId, clientId, topicNames, new StringDeserializer(), new DocumentSerdes(), completion); + } + + /** + * Use the supplied function to asynchronously produce {@link Document} messages and write them to the cluster. + * + * @param producerName the name of the producer; may not be null + * @param producer the function that will asynchronously + */ + public void produce(String producerName, Consumer> producer) { + produce(producerName, new StringSerializer(), new DocumentSerdes(), producer); + } + + /** + * Use the supplied function to asynchronously produce messages and write them to the cluster. + * + * @param producerName the name of the producer; may not be null + * @param keySerializer the serializer for the keys; may not be null + * @param valueSerializer the serializer for the values; may not be null + * @param producer the function that will asynchronously + */ + public void produce(String producerName, Serializer keySerializer, Serializer valueSerializer, + Consumer> producer) { + Properties props = getProducerProperties(producerName); + KafkaProducer kafkaProducer = new KafkaProducer<>(props, keySerializer, valueSerializer); + InteractiveProducer interactive = new InteractiveProducer() { + @Override + public InteractiveProducer write(ProducerRecord record) { + kafkaProducer.send(record); + kafkaProducer.flush(); + return this; + } + + @Override + public void close() { + kafkaProducer.close(); + } + }; + Thread t = new Thread(() -> { + try { + producer.accept(interactive); + } finally { + interactive.close(); + } + }); + t.setName(producerName + "-thread"); + t.start(); + } + + /** + * Use the supplied function to asynchronously produce messages and write them to the cluster. 
+ * + * @param producerName the name of the producer; may not be null + * @param messageCount the number of messages to produce; must be positive + * @param keySerializer the serializer for the keys; may not be null + * @param valueSerializer the serializer for the values; may not be null + * @param completionCallback the function to be called when the producer is completed; may be null + * @param messageSupplier the function to produce messages; may not be null + */ + public void produce(String producerName, int messageCount, + Serializer keySerializer, Serializer valueSerializer, + Runnable completionCallback, + Supplier> messageSupplier) { + Properties props = getProducerProperties(producerName); + Thread t = new Thread(() -> { + LOGGER.debug("Starting producer {} to write {} messages", producerName, messageCount); + try (KafkaProducer producer = new KafkaProducer<>(props, keySerializer, valueSerializer)) { + for (int i = 0; i != messageCount; ++i) { + ProducerRecord record = messageSupplier.get(); + producer.send(record); + producer.flush(); + LOGGER.debug("Producer {}: sent message {}", producerName, record); + } + } finally { + if (completionCallback != null) completionCallback.run(); + LOGGER.debug("Stopping producer {}", producerName); + } + }); + t.setName(producerName + "-thread"); + t.start(); + } + + /** + * Use the supplied function to asynchronously produce messages with String keys and values, and write them to the + * cluster. + * + * @param messageCount the number of messages to produce; must be positive + * @param completionCallback the function to be called when the producer is completed; may be null + * @param messageSupplier the function to produce messages; may not be null + */ + public void produceStrings(int messageCount, + Runnable completionCallback, Supplier> messageSupplier) { + Serializer keySer = new StringSerializer(); + Serializer valSer = keySer; + String randomId = UUID.randomUUID().toString(); + produce(randomId, messageCount, keySer, valSer, completionCallback, messageSupplier); + } + + /** + * Use the supplied function to asynchronously produce messages with String keys and {@link Document} values, and write + * them to the cluster. + * + * @param messageCount the number of messages to produce; must be positive + * @param completionCallback the function to be called when the producer is completed; may be null + * @param messageSupplier the function to produce messages; may not be null + */ + public void produceDocuments(int messageCount, + Runnable completionCallback, Supplier> messageSupplier) { + Serializer keySer = new StringSerializer(); + Serializer valSer = new DocumentSerdes(); + String randomId = UUID.randomUUID().toString(); + produce(randomId, messageCount, keySer, valSer, completionCallback, messageSupplier); + } + + /** + * Use the supplied function to asynchronously produce messages with String keys and Integer values, and write them to the + * cluster. 
+ * + * @param messageCount the number of messages to produce; must be positive + * @param completionCallback the function to be called when the producer is completed; may be null + * @param messageSupplier the function to produce messages; may not be null + */ + public void produceIntegers(int messageCount, + Runnable completionCallback, Supplier> messageSupplier) { + Serializer keySer = new StringSerializer(); + Serializer valSer = new IntegerSerializer(); + String randomId = UUID.randomUUID().toString(); + produce(randomId, messageCount, keySer, valSer, completionCallback, messageSupplier); + } + + /** + * Asynchronously produce messages with String keys and sequential Integer values, and write them to the cluster. + * + * @param topic the name of the topic to which the messages should be written; may not be null + * @param messageCount the number of messages to produce; must be positive + * @param initialValue the first integer value to produce + * @param completionCallback the function to be called when the producer is completed; may be null + */ + public void produceIntegers(String topic, int messageCount, int initialValue, + Runnable completionCallback) { + AtomicLong counter = new AtomicLong(initialValue); + produceIntegers(messageCount, completionCallback, () -> { + long i = counter.incrementAndGet(); + String keyAndValue = Long.toString(i); + return new ProducerRecord(topic, keyAndValue, new Integer((int) i)); + }); + } + + /** + * Asynchronously produce messages with monotonically increasing String keys and values obtained from the supplied + * function, and write them to the cluster. + * + * @param topic the name of the topic to which the messages should be written; may not be null + * @param messageCount the number of messages to produce; must be positive + * @param completionCallback the function to be called when the producer is completed; may be null + * @param valueSupplier the value supplier; may not be null + */ + public void produceStrings(String topic, int messageCount, + Runnable completionCallback, Supplier valueSupplier) { + AtomicLong counter = new AtomicLong(0); + produceStrings(messageCount, completionCallback, () -> { + long i = counter.incrementAndGet(); + String keyAndValue = Long.toString(i); + return new ProducerRecord(topic, keyAndValue, valueSupplier.get()); + }); + } + + /** + * Asynchronously produce messages with monotonically increasing String keys and values obtained from the supplied + * function, and write them to the cluster. + * + * @param topic the name of the topic to which the messages should be written; may not be null + * @param messageCount the number of messages to produce; must be positive + * @param completionCallback the function to be called when the producer is completed; may be null + * @param valueSupplier the value supplier; may not be null + */ + public void produceDocuments(String topic, int messageCount, + Runnable completionCallback, Supplier valueSupplier) { + AtomicLong counter = new AtomicLong(0); + produceDocuments(messageCount, completionCallback, () -> { + long i = counter.incrementAndGet(); + String keyAndValue = Long.toString(i); + return new ProducerRecord(topic, keyAndValue, valueSupplier.get()); + }); + } + + /** + * Use the supplied function to asynchronously consume messages from the cluster. 
+ * + * @param groupId the name of the group; may not be null + * @param clientId the name of the client; may not be null + * @param autoOffsetReset how to pick a starting offset when there is no initial offset in ZooKeeper or if an offset is + * out of range; may be null for the default to be used + * @param keyDeserializer the deserializer for the keys; may not be null + * @param valueDeserializer the deserializer for the values; may not be null + * @param continuation the function that determines if the consumer should continue; may not be null + * @param completion the function to call when the consumer terminates; may be null + * @param topics the set of topics to consume; may not be null or empty + * @param consumerFunction the function to consume the messages; may not be null + */ + public void consume(String groupId, String clientId, OffsetResetStrategy autoOffsetReset, + Deserializer keyDeserializer, Deserializer valueDeserializer, + BooleanSupplier continuation, Runnable completion, Collection topics, + java.util.function.Consumer> consumerFunction) { + Properties props = getConsumerProperties(groupId, clientId, autoOffsetReset); + Thread t = new Thread(() -> { + LOGGER.debug("Starting consumer {} to read messages", clientId); + try (KafkaConsumer consumer = new KafkaConsumer<>(props, keyDeserializer, valueDeserializer)) { + consumer.subscribe(new ArrayList<>(topics)); + while (continuation.getAsBoolean()) { + consumer.poll(10).forEach(record -> { + LOGGER.debug("Consumer {}: consuming message {}", clientId, record); + consumerFunction.accept(record); + consumer.commitAsync(); + }); + } + } finally { + if (completion != null) completion.run(); + LOGGER.debug("Stopping consumer {}", clientId); + } + }); + t.setName(clientId + "-thread"); + t.start(); + } + + /** + * Asynchronously consume all messages from the cluster. + * + * @param continuation the function that determines if the consumer should continue; may not be null + * @param completion the function to call when all messages have been consumed; may be null + * @param topics the set of topics to consume; may not be null or empty + * @param consumerFunction the function to consume the messages; may not be null + */ + public void consumeDocuments(BooleanSupplier continuation, Runnable completion, Collection topics, + java.util.function.Consumer> consumerFunction) { + Deserializer keyDes = new StringDeserializer(); + Deserializer valDes = new DocumentSerdes(); + String randomId = UUID.randomUUID().toString(); + consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, completion, topics, consumerFunction); + } + + /** + * Asynchronously consume all messages from the cluster. 
+ * + * @param continuation the function that determines if the consumer should continue; may not be null + * @param completion the function to call when all messages have been consumed; may be null + * @param topics the set of topics to consume; may not be null or empty + * @param consumerFunction the function to consume the messages; may not be null + */ + public void consumeStrings(BooleanSupplier continuation, Runnable completion, Collection topics, + java.util.function.Consumer> consumerFunction) { + Deserializer keyDes = new StringDeserializer(); + Deserializer valDes = keyDes; + String randomId = UUID.randomUUID().toString(); + consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, completion, topics, consumerFunction); + } + + /** + * Asynchronously consume all messages from the cluster. + * + * @param continuation the function that determines if the consumer should continue; may not be null + * @param completion the function to call when all messages have been consumed; may be null + * @param topics the set of topics to consume; may not be null or empty + * @param consumerFunction the function to consume the messages; may not be null + */ + public void consumeIntegers(BooleanSupplier continuation, Runnable completion, Collection topics, + java.util.function.Consumer> consumerFunction) { + Deserializer keyDes = new StringDeserializer(); + Deserializer valDes = new IntegerDeserializer(); + String randomId = UUID.randomUUID().toString(); + consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, completion, topics, consumerFunction); + } + + /** + * Asynchronously consume all messages on the given topic from the cluster. + * + * @param topicName the name of the topic; may not be null + * @param count the expected number of messages to read before terminating; may not be null + * @param timeout the maximum time that this consumer should run before terminating; must be positive + * @param unit the unit of time for the timeout; may not be null + * @param completion the function to call when all messages have been consumed; may be null + * @param consumer the function to consume the messages; may not be null + */ + public void consumeStrings(String topicName, int count, long timeout, TimeUnit unit, Runnable completion, + BiPredicate consumer) { + AtomicLong readCounter = new AtomicLong(); + consumeStrings(continueIfNotExpired(() -> readCounter.get() < count, timeout, unit), + completion, + Collections.singleton(topicName), + record -> { + if (consumer.test(record.key(), record.value())) readCounter.incrementAndGet(); + }); + } + + /** + * Asynchronously consume all messages on the given topic from the cluster. 
+ * + * @param topicName the name of the topic; may not be null + * @param count the expected number of messages to read before terminating; may not be null + * @param timeout the maximum time that this consumer should run before terminating; must be positive + * @param unit the unit of time for the timeout; may not be null + * @param completion the function to call when all messages have been consumed; may be null + * @param consumer the function to consume the messages; may not be null + */ + public void consumeDocuments(String topicName, int count, long timeout, TimeUnit unit, Runnable completion, + BiPredicate consumer) { + AtomicLong readCounter = new AtomicLong(); + consumeDocuments(continueIfNotExpired(() -> readCounter.get() < count, timeout, unit), + completion, + Collections.singleton(topicName), + record -> { + if (consumer.test(record.key(), record.value())) readCounter.incrementAndGet(); + }); + } + + /** + * Asynchronously consume all messages on the given topic from the cluster. + * + * @param topicName the name of the topic; may not be null + * @param count the expected number of messages to read before terminating; may not be null + * @param timeout the maximum time that this consumer should run before terminating; must be positive + * @param unit the unit of time for the timeout; may not be null + * @param completion the function to call when all messages have been consumed; may be null + * @param consumer the function to consume the messages; may not be null + */ + public void consumeIntegers(String topicName, int count, long timeout, TimeUnit unit, Runnable completion, + BiPredicate consumer) { + AtomicLong readCounter = new AtomicLong(); + consumeIntegers(continueIfNotExpired(() -> readCounter.get() < count, timeout, unit), + completion, + Collections.singleton(topicName), + record -> { + if (consumer.test(record.key(), record.value())) readCounter.incrementAndGet(); + }); + } + + /** + * Asynchronously consume all messages on the given topic from the cluster. + * + * @param topicName the name of the topic; may not be null + * @param count the expected number of messages to read before terminating; may not be null + * @param timeout the maximum time that this consumer should run before terminating; must be positive + * @param unit the unit of time for the timeout; may not be null + * @param completion the function to call when all messages have been consumed; may be null + */ + public void consumeStrings(String topicName, int count, long timeout, TimeUnit unit, Runnable completion) { + consumeStrings(topicName, count, timeout, unit, completion, (key, value) -> true); + } + + /** + * Asynchronously consume all messages on the given topic from the cluster. + * + * @param topicName the name of the topic; may not be null + * @param count the expected number of messages to read before terminating; may not be null + * @param timeout the maximum time that this consumer should run before terminating; must be positive + * @param unit the unit of time for the timeout; may not be null + * @param completion the function to call when all messages have been consumed; may be null + */ + public void consumeDocuments(String topicName, int count, long timeout, TimeUnit unit, Runnable completion) { + consumeDocuments(topicName, count, timeout, unit, completion, (key, value) -> true); + } + + /** + * Asynchronously consume all messages on the given topic from the cluster. 
+ * + * @param topicName the name of the topic; may not be null + * @param count the expected number of messages to read before terminating; may not be null + * @param timeout the maximum time that this consumer should run before terminating; must be positive + * @param unit the unit of time for the timeout; may not be null + * @param completion the function to call when all messages have been consumed; may be null + */ + public void consumeIntegers(String topicName, int count, long timeout, TimeUnit unit, Runnable completion) { + consumeIntegers(topicName, count, timeout, unit, completion, (key, value) -> true); + } + + protected BooleanSupplier continueIfNotExpired(BooleanSupplier continuation, long timeout, TimeUnit unit) { + return new BooleanSupplier() { + long stopTime = 0L; + + @Override + public boolean getAsBoolean() { + if (stopTime == 0L) stopTime = System.currentTimeMillis() + unit.toMillis(timeout); + return continuation.getAsBoolean() && System.currentTimeMillis() <= stopTime; + } + }; + } + } +} \ No newline at end of file diff --git a/debezium-core/src/test/java/io/debezium/kafka/KafkaClusterTest.java b/debezium-core/src/test/java/io/debezium/kafka/KafkaClusterTest.java new file mode 100644 index 000000000..e2dead2d7 --- /dev/null +++ b/debezium-core/src/test/java/io/debezium/kafka/KafkaClusterTest.java @@ -0,0 +1,196 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.kafka; + +import java.io.File; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.fest.assertions.Assertions.assertThat; + +import io.debezium.util.Stopwatch; +import io.debezium.util.Testing; + +/** + * @author Randall Hauch + */ +public class KafkaClusterTest { + + private KafkaCluster cluster; + private File dataDir; + + @Before + public void beforeEach() { + dataDir = Testing.Files.createTestingDirectory("cluster"); + Testing.Files.delete(dataDir); + cluster = new KafkaCluster().usingDirectory(dataDir); + } + + @After + public void afterEach() { + cluster.shutdown(); + Testing.Files.delete(dataDir); + } + + @Test + public void shouldStartClusterWithOneBrokerAndRemoveData() throws Exception { + cluster.deleteDataUponShutdown(true).addBrokers(1).startup(); + cluster.onEachDirectory(this::assertValidDataDirectory); + cluster.shutdown(); + cluster.onEachDirectory(this::assertDoesNotExist); + } + + @Test + public void shouldStartClusterWithMultipleBrokerAndRemoveData() throws Exception { + cluster.deleteDataUponShutdown(true).addBrokers(3).startup(); + cluster.onEachDirectory(this::assertValidDataDirectory); + cluster.shutdown(); + cluster.onEachDirectory(this::assertDoesNotExist); + } + + @Test + public void shouldStartClusterWithOneBrokerAndLeaveData() throws Exception { + cluster.deleteDataUponShutdown(false).addBrokers(1).startup(); + cluster.onEachDirectory(this::assertValidDataDirectory); + cluster.shutdown(); + cluster.onEachDirectory(this::assertValidDataDirectory); + } + + @Test + public void shouldStartClusterWithMultipleBrokerAndLeaveData() throws Exception { + cluster.deleteDataUponShutdown(false).addBrokers(3).startup(); + 
cluster.onEachDirectory(this::assertValidDataDirectory); + cluster.shutdown(); + cluster.onEachDirectory(this::assertValidDataDirectory); + } + + @Test + public void shouldStartClusterAndAllowProducersAndConsumersToUseIt() throws Exception { + Testing.Debug.enable(); + final String topicName = "topicA"; + final CountDownLatch completion = new CountDownLatch(2); + final int numMessages = 100; + final AtomicLong messagesRead = new AtomicLong(0); + + // Start a cluster and create a topic ... + cluster.deleteDataUponShutdown(false).addBrokers(1).startup(); + cluster.createTopics(topicName); + + // Consume messages asynchronously ... + Stopwatch sw = Stopwatch.reusable().start(); + cluster.useTo().consumeIntegers(topicName, numMessages, 10, TimeUnit.SECONDS, completion::countDown, (key, value) -> { + messagesRead.incrementAndGet(); + return true; + }); + + // Produce some messages asynchronously ... + cluster.useTo().produceIntegers(topicName, numMessages, 1, completion::countDown); + + // Wait for both to complete ... + if (completion.await(10, TimeUnit.SECONDS)) { + sw.stop(); + Testing.debug("Both consumer and producer completed normally in " + sw.durations()); + } else { + Testing.debug("Consumer and/or producer did not complete normally"); + } + + assertThat(messagesRead.get()).isEqualTo(numMessages); + } + + @Test + public void shouldStartClusterAndAllowInteractiveProductionAndAutomaticConsumersToUseIt() throws Exception { + Testing.Debug.enable(); + final String topicName = "topicA"; + final CountDownLatch completion = new CountDownLatch(1); + final int numMessages = 3; + final AtomicLong messagesRead = new AtomicLong(0); + + // Start a cluster and create a topic ... + cluster.deleteDataUponShutdown(false).addBrokers(1).startup(); + cluster.createTopics(topicName); + + // Consume messages asynchronously ... + Stopwatch sw = Stopwatch.reusable().start(); + cluster.useTo().consumeIntegers(topicName, numMessages, 10, TimeUnit.SECONDS, completion::countDown, (key, value) -> { + messagesRead.incrementAndGet(); + return true; + }); + + // Produce some messages interactively ... + cluster.useTo() + .createProducer("manual", new StringSerializer(), new IntegerSerializer()) + .write(topicName, "key1", 1) + .write(topicName, "key2", 2) + .write(topicName, "key3", 3) + .close(); + + // Wait for the consumer to complete ... + if (completion.await(10, TimeUnit.SECONDS)) { + sw.stop(); + Testing.debug("The consumer completed normally in " + sw.durations()); + } else { + Testing.debug("Consumer did not complete normally"); + } + + assertThat(messagesRead.get()).isEqualTo(numMessages); + } + + @Test + public void shouldStartClusterAndAllowAsynchronousProductionAndAutomaticConsumersToUseIt() throws Exception { + Testing.Debug.enable(); + final String topicName = "topicA"; + final CountDownLatch completion = new CountDownLatch(2); + final int numMessages = 3; + final AtomicLong messagesRead = new AtomicLong(0); + + // Start a cluster and create a topic ... + cluster.deleteDataUponShutdown(false).addBrokers(1).startup(); + cluster.createTopics(topicName); + + // Consume messages asynchronously ... + Stopwatch sw = Stopwatch.reusable().start(); + cluster.useTo().consumeIntegers(topicName, numMessages, 10, TimeUnit.SECONDS, completion::countDown, (key, value) -> { + messagesRead.incrementAndGet(); + return true; + }); + + // Produce some messages asynchronously ...
+ cluster.useTo().produce("manual", new StringSerializer(), new IntegerSerializer(), producer -> { + producer.write(topicName, "key1", 1); + producer.write(topicName, "key2", 2); + producer.write(topicName, "key3", 3); + completion.countDown(); + }); + + // Wait for the consumer to complete ... + if (completion.await(10, TimeUnit.SECONDS)) { + sw.stop(); + Testing.debug("The consumer completed normally in " + sw.durations()); + } else { + Testing.debug("Consumer did not complete normally"); + } + assertThat(messagesRead.get()).isEqualTo(numMessages); + } + + protected void assertValidDataDirectory(File dir) { + assertThat(dir.exists()).isTrue(); + assertThat(dir.isDirectory()).isTrue(); + assertThat(dir.canWrite()).isTrue(); + assertThat(dir.canRead()).isTrue(); + assertThat(Testing.Files.inTargetDir(dir)).isTrue(); + } + + protected void assertDoesNotExist(File dir) { + assertThat(dir.exists()).isFalse(); + } +} \ No newline at end of file diff --git a/debezium-core/src/test/java/io/debezium/kafka/KafkaServer.java b/debezium-core/src/test/java/io/debezium/kafka/KafkaServer.java new file mode 100644 index 000000000..81ed7b953 --- /dev/null +++ b/debezium-core/src/test/java/io/debezium/kafka/KafkaServer.java @@ -0,0 +1,339 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.kafka; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.annotation.ThreadSafe; +import io.debezium.util.IoUtil; +import kafka.admin.AdminUtils; +import kafka.server.KafkaConfig; +import kafka.utils.ZkUtils; + +/** + * A small embedded Kafka server. + * + * @author Randall Hauch + */ +@ThreadSafe +public class KafkaServer { + + public static final int DEFAULT_BROKER_ID = 1; + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaServer.class); + + private final Supplier zkConnection; + private final int brokerId; + private volatile File logsDir; + private final Properties config; + private volatile int desiredPort = -1; + private volatile int port = -1; + private volatile kafka.server.KafkaServer server; + + /** + * Create a new server instance. + * + * @param zookeeperConnection the supplier of the Zookeeper connection string; may not be null + */ + public KafkaServer(Supplier zookeeperConnection) { + this(zookeeperConnection, DEFAULT_BROKER_ID); + } + + /** + * Create a new server instance. + * + * @param zookeeperConnection the supplier of the Zookeeper connection string; may not be null + * @param brokerId the unique broker ID + */ + public KafkaServer(Supplier zookeeperConnection, int brokerId) { + this(zookeeperConnection, brokerId, -1); + } + + /** + * Create a new server instance.
+ * + * @param zookeeperConnection the supplier of the Zookeeper connection string; may not be null + * @param brokerId the unique broker ID + * @param port the desired port + */ + public KafkaServer(Supplier zookeeperConnection, int brokerId, int port) { + if (zookeeperConnection == null) throw new IllegalArgumentException("The Zookeeper connection string supplier may not be null"); + this.zkConnection = zookeeperConnection; + this.brokerId = brokerId; + this.config = new Properties(); + setPort(port); + populateDefaultConfiguration(this.config); + } + + protected int brokerId() { + return brokerId; + } + + protected String zookeeperConnection() { + return this.zkConnection.get(); + } + + /** + * Set the initial default configuration properties. This method is called from the constructors and can be overridden + * to customize these properties. + * + * @param props the configuration properties; never null + */ + protected void populateDefaultConfiguration(Properties props) { + config.setProperty(KafkaConfig.NumPartitionsProp(), String.valueOf(1)); + config.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(1)); + } + + /** + * Set a configuration property. Several key properties that deal with Zookeeper, the host name, and the broker ID, + * may not be set via this method and are ignored since they are controlled elsewhere in this instance. + * + * @param name the property name; may not be null + * @param value the property value; may be null + * @return this instance to allow chaining methods; never null + * @throws IllegalStateException if the server is running when this method is called + */ + public KafkaServer setProperty(String name, String value) { + if (server != null) throw new IllegalStateException("Unable to change the properties when already running"); + if (!KafkaConfig.ZkConnectProp().equalsIgnoreCase(name) + && !KafkaConfig.BrokerIdProp().equalsIgnoreCase(name) + && !KafkaConfig.HostNameProp().equalsIgnoreCase(name)) { + this.config.setProperty(name, value); + } + return this; + } + + /** + * Set multiple configuration properties. Several key properties that deal with Zookeeper, the host name, and the broker ID, + * may not be set via this method and are ignored since they are controlled elsewhere in this instance. + * + * @param properties the configuration properties; may be null or empty + * @return this instance to allow chaining methods; never null + * @throws IllegalStateException if the server is running when this method is called + */ + public KafkaServer setProperties( Properties properties ) { + if (server != null) throw new IllegalStateException("Unable to change the properties when already running"); + properties.stringPropertyNames().forEach(propName -> { + setProperty(propName, properties.getProperty(propName)); + }); + return this; + } + + + + /** + * Set the port for the server. + * + * @param port the desired port, or {@code -1} if a random available port should be found and used + * @return this instance to allow chaining methods; never null + */ + public KafkaServer setPort(int port) { + this.desiredPort = port > 0 ? port : -1; + this.port = desiredPort; + return this; + } + + /** + * Get a copy of the complete configuration that is or will be used by the running server. 
+ * + * @return the properties for the currently-running server; may be empty if not running + */ + public Properties config() { + Properties runningConfig = new Properties(config); + runningConfig.setProperty(KafkaConfig.ZkConnectProp(), zookeeperConnection()); + runningConfig.setProperty(KafkaConfig.BrokerIdProp(), Integer.toString(brokerId)); + runningConfig.setProperty(KafkaConfig.HostNameProp(), "localhost"); + runningConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), String.valueOf(Boolean.TRUE)); + return runningConfig; + } + + /** + * Get the connection string. If the server is not {@link #startup() running} and the port is to be dynamically discovered + * upon startup, then this method returns "{@code localhost:-1}". + * + * @return the connection string; never null + */ + public String getConnection() { + return "localhost:" + port; + } + + /** + * Start the embedded Kafka server. + * + * @return this instance to allow chaining methods; never null + * @throws IllegalStateException if the server is already running + */ + public synchronized KafkaServer startup() { + if (server != null) throw new IllegalStateException("" + this + " is already running"); + + // Determine the storage directory and adjust the configuration ... + Properties config = config(); + if (logsDir == null) { + try { + File temp = File.createTempFile("kafka", "suffix"); + this.logsDir = temp.getParentFile(); + temp.delete(); + } catch (IOException e) { + throw new RuntimeException("Unable to create temporary directory", e); + } + } + config.setProperty(KafkaConfig.LogDirProp(), logsDir.getAbsolutePath()); + + // Determine the port and adjust the configuration ... + port = desiredPort > 0 ? desiredPort : IoUtil.getAvailablePort(); + config.setProperty(KafkaConfig.PortProp(), Integer.toString(port)); + // config.setProperty("metadata.broker.list", getConnection()); + + // Start the server ... + try { + LOGGER.debug("Starting Kafka broker {} @ {} with storage in {}", brokerId, getConnection(), logsDir.getAbsolutePath()); + server = new kafka.server.KafkaServer(new KafkaConfig(config), new SystemTime(), scala.Option.apply(null)); + server.startup(); + LOGGER.info("Started Kafka server {} @ {} with storage in {}", brokerId, getConnection(), logsDir.getAbsolutePath()); + return this; + } catch (RuntimeException e) { + server = null; + throw e; + } + } + + /** + * Shutdown the embedded Kafka server and delete all data. + */ + public synchronized void shutdown() { + if (server != null) { + try { + server.shutdown(); + } finally { + server = null; + port = desiredPort; + } + } + } + + /** + * Delete all of the data associated with this server. + */ + public synchronized void deleteData() { + if (server == null) { + // Delete all data ... + try { + IoUtil.delete(this.logsDir); + } catch (IOException e) { + LOGGER.error("Unable to delete directory '{}'", this.logsDir, e); + } + } + } + + /** + * Get the Zookeeper utilities used by the running Kafka server. + * + * @return the Zookeeper utilities, or null if the Kafka server is not running + */ + public ZkUtils getZkUtils() { + return server != null ? server.zkUtils() : null; + } + + /** + * Create the specified topics. + * + * @param topics the names of the topics to create + */ + public void createTopics(String... topics) { + createTopics(1,1,topics); + } + + /** + * Create the specified topics. 
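+     * <p>
+     * For example (a sketch; {@code server} is an assumed running {@code KafkaServer} and the topic names are
+     * arbitrary):
+     * <pre>
+     * server.createTopics(3, 1, "topicA", "topicB");
+     * </pre>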
+     *
+     * @param numPartitions the number of partitions for each topic
+     * @param replicationFactor the replication factor for each topic
+     * @param topics the names of the topics to create
+     */
+    public void createTopics(int numPartitions, int replicationFactor, String... topics) {
+        for (String topic : topics) {
+            if (topic != null) createTopic(topic, numPartitions, replicationFactor);
+        }
+    }
+
+    /**
+     * Create the specified topic.
+     *
+     * @param topic the name of the topic to create
+     * @param numPartitions the number of partitions for the topic
+     * @param replicationFactor the replication factor for the topic
+     */
+    public void createTopic(String topic, int numPartitions, int replicationFactor) {
+        AdminUtils.createTopic(getZkUtils(), topic, numPartitions, replicationFactor, new Properties());
+    }
+
+    /**
+     * Perform the supplied function on each directory used by this server.
+     *
+     * @param consumer the consumer function; may not be null
+     */
+    void onEachDirectory(Consumer consumer) {
+        consumer.accept(getStateDirectory());
+    }
+
+    /**
+     * Get the parent directory where the broker's state will be kept. The broker will create a subdirectory for itself
+     * under this directory.
+     *
+     * @return the parent directory for the broker's state; may be null if a temporary directory will be used
+     */
+    public File getStateDirectory() {
+        return this.logsDir;
+    }
+
+    /**
+     * Set the parent directory where the broker's state will be kept. The broker will create a subdirectory for itself
+     * under this directory.
+     *
+     * @param stateDirectory the parent directory for the broker's state; may be null if a temporary directory will be used
+     * @throws IllegalArgumentException if the supplied file is not a directory or not writable
+     */
+    public void setStateDirectory(File stateDirectory) {
+        if (stateDirectory != null && stateDirectory.exists()
+                && (!stateDirectory.isDirectory() || !stateDirectory.canWrite() || !stateDirectory.canRead())) {
+            throw new IllegalArgumentException("The directory must be readable and writable");
+        }
+        this.logsDir = stateDirectory;
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaServer{" + getConnection() + "}";
+    }
+
+    protected static class SystemTime implements kafka.utils.Time {
+        @Override
+        public long milliseconds() {
+            return System.currentTimeMillis();
+        }
+
+        @Override
+        public long nanoseconds() {
+            return System.nanoTime();
+        }
+
+        @Override
+        public void sleep(long ms) {
+            try {
+                Thread.sleep(ms);
+            } catch (InterruptedException e) {
+                // Ignore
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/debezium-core/src/test/java/io/debezium/kafka/ZookeeperServer.java b/debezium-core/src/test/java/io/debezium/kafka/ZookeeperServer.java
new file mode 100644
index 000000000..365a5fb4e
--- /dev/null
+++ b/debezium-core/src/test/java/io/debezium/kafka/ZookeeperServer.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.function.Consumer;
+
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.annotation.ThreadSafe;
+import io.debezium.util.IoUtil;
+
+/**
+ * A lightweight embeddable Zookeeper server useful for unit testing.
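+ * <p>
+ * A minimal usage sketch (hypothetical; the port is arbitrary, and omitting {@link #setPort(int)} picks a random
+ * available port):
+ * <pre>
+ * ZookeeperServer zk = new ZookeeperServer().setPort(2181);
+ * zk.startup();
+ * // ... point Kafka brokers or other clients at zk.getConnection() ...
+ * zk.shutdown(true);
+ * </pre>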
+ * + * @author Randall Hauch + * @see KafkaCluster + */ +@ThreadSafe +public class ZookeeperServer { + + private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperServer.class); + public static int DEFAULT_TICK_TIME = 500; + + /** + * The basic time unit in milliseconds used by ZooKeeper. It is used to do heartbeats and the minimum session timeout will be + * twice the tickTime. + */ + private volatile int tickTime = DEFAULT_TICK_TIME; + private volatile int port = -1; + + private volatile ServerCnxnFactory factory; + private volatile File dataDir; + private volatile File snapshotDir; + private volatile File logDir; + + /** + * Create a new server instance. + */ + public ZookeeperServer() { + } + + /** + * Start the embedded Zookeeper server. + * + * @return this instance to allow chaining methods; never null + * @throws IOException if there is an error during startup + * @throws IllegalStateException if the server is already running + */ + public synchronized ZookeeperServer startup() throws IOException { + if (factory != null) throw new IllegalStateException("" + this + " is already running"); + + if (this.port == -1) this.port = IoUtil.getAvailablePort(); + this.factory = ServerCnxnFactory.createFactory(new InetSocketAddress("localhost", port), 1024); + if ( this.dataDir == null ) { + try { + File temp = File.createTempFile("kafka","suffix"); + this.dataDir = temp.getParentFile(); + temp.delete(); + } catch ( IOException e ) { + throw new RuntimeException("Unable to create temporary directory",e); + } + } + this.snapshotDir = new File(this.dataDir,"snapshot"); + this.logDir = new File(this.dataDir,"log"); + this.snapshotDir.mkdirs(); + this.logDir.mkdirs(); + + try { + factory.startup(new ZooKeeperServer(snapshotDir, logDir, tickTime)); + return this; + } catch (InterruptedException e) { + factory = null; + Thread.interrupted(); + throw new IOException(e); + } + } + + /** + * Shutdown the embedded Zookeeper server and delete all data. + */ + public void shutdown() { + shutdown(true); + } + + /** + * Shutdown the embedded Kafka server. + * + * @param deleteData true if the data should be removed, or false otherwise + */ + public synchronized void shutdown(boolean deleteData) { + if (factory != null) { + try { + factory.shutdown(); + } finally { + factory = null; + if (deleteData) { + // Delete all data ... + try { + IoUtil.delete(this.snapshotDir,this.logDir); + } catch ( IOException e ) { + LOGGER.error("Unable to delete data upon shutdown",e); + } + } + } + } + } + + /** + * Get the connection string. If the server is not {@link #startup() running} and the port is to be dynamically discovered + * upon startup, then this method returns "{@code localhost:-1}". + * + * @return the connection string; never null + */ + public String getConnection() { + return "localhost:" + port; + } + + /** + * Set the port for the server. + * + * @param port the desired port, or {@code -1} if a random available port should be found and used + * @return this instance to allow chaining methods; never null + */ + public ZookeeperServer setPort(int port) { + this.port = port; + return this; + } + + /** + * Set the basic time unit in milliseconds used by ZooKeeper. It is used to do heartbeats and the minimum session timeout will + * be twice the tickTime. 
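+     * <p>
+     * For example, with the default tick time of {@code 500} milliseconds the minimum session timeout is one second.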
+     *
+     * @param tickTime the desired value, or non-positive if the default of {@value #DEFAULT_TICK_TIME} should be used
+     * @return this instance to allow chaining methods; never null
+     */
+    public ZookeeperServer setTickTime(int tickTime) {
+        this.tickTime = tickTime > 0 ? tickTime : DEFAULT_TICK_TIME;
+        return this;
+    }
+
+    /**
+     * Get the current port.
+     *
+     * @return the port number, or {@code -1} if the port should be discovered upon {@link #startup()}
+     */
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * Get the basic time unit in milliseconds used by ZooKeeper.
+     *
+     * @return tick time; always positive
+     */
+    public int getTickTime() {
+        return tickTime;
+    }
+
+    /**
+     * Perform the supplied function on each directory used by this server.
+     *
+     * @param consumer the consumer function; may not be null
+     */
+    void onEachDirectory(Consumer consumer) {
+        consumer.accept(getSnapshotDirectory());
+        consumer.accept(getLogDirectory());
+    }
+
+    /**
+     * Get the parent directory where the server's snapshots are kept.
+     * @return the parent directory for the server's snapshots; never null once the server is running
+     */
+    File getSnapshotDirectory() {
+        return this.snapshotDir;
+    }
+
+    /**
+     * Get the parent directory where the server's logs are kept.
+     * @return the parent directory for the server's logs; never null once the server is running
+     */
+    File getLogDirectory() {
+        return this.logDir;
+    }
+
+    /**
+     * Get the parent directory where the server's logs and snapshots will be kept.
+     * @return the parent directory for the server's logs and snapshots; may be null if a temporary directory will be used
+     */
+    public File getStateDirectory() {
+        return this.dataDir;
+    }
+
+    /**
+     * Set the parent directory where the server's logs and snapshots will be kept.
+     * @param dataDir the parent directory for the server's logs and snapshots; may be null if a temporary directory will be used
+     * @return this instance to allow method chaining; never null
+     * @throws IllegalArgumentException if the supplied file is not a directory or not writable
+     */
+    public ZookeeperServer setStateDirectory(File dataDir) {
+        if (dataDir != null && dataDir.exists()
+                && (!dataDir.isDirectory() || !dataDir.canWrite() || !dataDir.canRead())) {
+            throw new IllegalArgumentException("The directory must be readable and writable");
+        }
+        this.dataDir = dataDir;
+        return this;
+    }
+
+    @Override
+    public String toString() {
+        return "ZookeeperServer{" + getConnection() + "}";
+    }
+}
\ No newline at end of file
diff --git a/debezium-core/src/test/java/io/debezium/kafka/ZookeeperServerTest.java b/debezium-core/src/test/java/io/debezium/kafka/ZookeeperServerTest.java
new file mode 100644
index 000000000..3fcabd5a6
--- /dev/null
+++ b/debezium-core/src/test/java/io/debezium/kafka/ZookeeperServerTest.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright Debezium Authors.
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.kafka; + +import java.io.File; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.fest.assertions.Assertions.assertThat; + +import io.debezium.util.Testing; + +/** + * @author Randall Hauch + * + */ +public class ZookeeperServerTest { + + private ZookeeperServer server; + private File dataDir; + + @Before + public void beforeEach() { + dataDir = Testing.Files.createTestingDirectory("zk"); + Testing.Files.delete(dataDir); + server = new ZookeeperServer(); + server.setStateDirectory(dataDir); + } + + @After + public void afterEach() { + Testing.Files.delete(dataDir); + } + + @Test + public void shouldStartServerAndRemoveData() throws Exception { + Testing.debug("Running 1"); + server.startup(); + server.onEachDirectory(this::assertValidDataDirectory); + server.shutdown(true); + server.onEachDirectory(this::assertDoesNotExist); + } + + @Test + public void shouldStartServerAndLeaveData() throws Exception { + Testing.debug("Running 2"); + server.startup(); + server.onEachDirectory(this::assertValidDataDirectory); + server.shutdown(false); + server.onEachDirectory(this::assertValidDataDirectory); + } + + protected void assertValidDataDirectory(File dir) { + assertThat(dir.exists()).isTrue(); + assertThat(dir.isDirectory()).isTrue(); + assertThat(dir.canWrite()).isTrue(); + assertThat(dir.canRead()).isTrue(); + assertThat(Testing.Files.inTargetDir(dir)).isTrue(); + } + + protected void assertDoesNotExist(File dir) { + assertThat(dir.exists()).isFalse(); + } +} \ No newline at end of file diff --git a/debezium-core/src/test/resources/log4j.properties b/debezium-core/src/test/resources/log4j.properties index bb0bd4546..f962d866f 100644 --- a/debezium-core/src/test/resources/log4j.properties +++ b/debezium-core/src/test/resources/log4j.properties @@ -4,8 +4,56 @@ log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %m (%c)%n +# Direct Zookeeper log messages to stdout with special prefix +log4j.appender.zk=org.apache.log4j.ConsoleAppender +log4j.appender.zk.Target=System.out +log4j.appender.zk.layout=org.apache.log4j.PatternLayout +log4j.appender.zk.layout.ConversionPattern=%d{ISO8601} %-5p ZOO %m%n +#log4j.appender.zk.layout.ConversionPattern=%d{ISO8601} %-5p ZOO %m (%c)%n + +# Direct Zookeeper Client log messages to stdout with special prefix +log4j.appender.zkclient=org.apache.log4j.ConsoleAppender +log4j.appender.zkclient.Target=System.out +log4j.appender.zkclient.layout=org.apache.log4j.PatternLayout +log4j.appender.zkclient.layout.ConversionPattern=%d{ISO8601} %-5p ZKC %m%n +#log4j.appender.zkclient.layout.ConversionPattern=%d{ISO8601} %-5p ZKC %m (%c)%n + +# Direct Kafka log messages to stdout with special prefix +log4j.appender.kafka=org.apache.log4j.ConsoleAppender +log4j.appender.kafka.Target=System.out +log4j.appender.kafka.layout=org.apache.log4j.PatternLayout +log4j.appender.kafka.layout.ConversionPattern=%d{ISO8601} %-5p KAF %m%n +#log4j.appender.kafka.layout.ConversionPattern=%d{ISO8601} %-5p KAF %m (%c)%n + +# Direct Debezium log messages to stdout with special prefix +log4j.appender.debezium=org.apache.log4j.ConsoleAppender +log4j.appender.debezium.Target=System.out +log4j.appender.debezium.layout=org.apache.log4j.PatternLayout 
+log4j.appender.debezium.layout.ConversionPattern=%d{ISO8601} %-5p DBZ %m (%c)%n + # Root logger option log4j.rootLogger=INFO, stdout # Set up the default logging to be INFO level, then override specific units -log4j.logger.io.debezium=INFO \ No newline at end of file +log4j.logger.io.debezium=INFO, debezium +log4j.additivity.io.debezium=false + +# Kafka is pretty verbose at INFO level, so for brevity use ERROR everywhere except INFO at kafka.server.KafkaServer +log4j.logger.org.apache.kafka=ERROR, kafka +log4j.additivity.org.apache.kafka=false +log4j.logger.kafka=ERROR, kafka +log4j.additivity.kafka=false +log4j.logger.kafka.server.KafkaServer=ERROR, kafka +log4j.additivity.kafka.server.KafkaServer=false +log4j.logger.state.change.logger=ERROR, kafka +log4j.additivity.state.change.logger=false + +# Zookeeper is pretty verbose at INFO level, so for brevity use ERROR everywhere except INFO at org.apache.zookeeper.server.ZooKeeperServer +log4j.logger.org.apache.zookeeper=ERROR, zk +log4j.additivity.org.apache.zookeeper=false +log4j.logger.org.apache.zookeeper.server.ZooKeeperServer=ERROR, zk +log4j.additivity.org.apache.zookeeper.server.ZooKeeperServer=false + +# Zookeeper Client is pretty verbose at INFO level, so for brevity use ERROR everywhere ... +log4j.logger.org.I0Itec.zkclient=ERROR, zkclient +log4j.additivity.org.I0Itec.zkclient=false \ No newline at end of file diff --git a/pom.xml b/pom.xml index 63158ec4c..808008012 100644 --- a/pom.xml +++ b/pom.xml @@ -79,6 +79,12 @@ Debezium community + + 0.9.0.0 + 2.11 + 2.11.7 + 2.4.0 + 3.4.6 support/checkstyle @@ -158,6 +164,36 @@ test + + + org.apache.kafka + kafka_${version.kafka.scala} + ${version.kafka} + + + org.apache.kafka + kafka-clients + ${version.kafka} + + + org.apache.zookeeper + zookeeper + ${version.zookeeper} + + + org.apache.kafka + kafka_${version.kafka.scala} + ${version.kafka} + test + test + + + org.apache.curator + curator-test + ${version.curator} + test + + junit