DBZ-10 Added small utility so unit tests can run an embedded Kafka cluster within the same process.

This utility is only suitable for unit tests and therefore is defined in the test JAR of the `debezium-core` module. It certainly should never be used for production purposes.
Randall Hauch 2016-02-04 15:18:27 -06:00
parent 52a7c096a4
commit 54b822bb72
10 changed files with 1971 additions and 7 deletions
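
A minimal usage sketch, mirroring the new KafkaClusterTest included in this commit (topic name and message counts are illustrative; assumes a test method that declares throws Exception): start a one-broker cluster backed by a temporary directory, create a topic, asynchronously consume and produce a batch of integer messages, and shut down.

File dataDir = Testing.Files.createTestingDirectory("cluster");
KafkaCluster cluster = new KafkaCluster().usingDirectory(dataDir)
                                         .deleteDataUponShutdown(true)
                                         .addBrokers(1)
                                         .startup();
cluster.createTopics("topicA");
// Read up to 100 integer messages, giving up after 10 seconds ...
cluster.useTo().consumeIntegers("topicA", 100, 10, TimeUnit.SECONDS, null, (key, value) -> true);
// Asynchronously write 100 messages with sequential integer values ...
cluster.useTo().produceIntegers("topicA", 100, 1, null);
// ... assert on what was read, then clean up ...
cluster.shutdown();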


@@ -46,6 +46,21 @@
<groupId>org.easytesting</groupId>
<artifactId>fest-assert</artifactId>
</dependency>
<!-- Used for unit testing with Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${version.kafka.scala}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>


@@ -251,7 +251,7 @@ protected void parseNextStatement(Marker marker) {
*/
protected void parseComment(Marker marker) {
String comment = tokens.consume();
logger.debug("COMMENT: {}", comment);
logger.trace("COMMENT: {}", comment);
}
/**
@@ -304,16 +304,16 @@ protected void parseUnknownStatement(Marker marker) {
}
protected void debugParsed(Marker statementStart) {
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
String statement = removeLineFeeds(tokens.getContentFrom(statementStart));
logger.debug("PARSED: {}", statement);
logger.trace("PARSED: {}", statement);
}
}
protected void debugSkipped(Marker statementStart) {
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
String statement = removeLineFeeds(tokens.getContentFrom(statementStart));
logger.debug("SKIPPED: {}", statement);
logger.trace("SKIPPED: {}", statement);
}
}


@@ -380,7 +380,7 @@ public static void delete(File... filesOrFolder) throws IOException {
public static void delete(Path path) throws IOException {
if (path != null) {
if (path.toAbsolutePath().toFile().exists()) {
LOGGER.info("Deleting '" + path + "'...");
LOGGER.debug("Deleting '" + path + "'...");
Set<FileVisitOption> options = EnumSet.noneOf(FileVisitOption.class);
int maxDepth = 10;
FileVisitor<Path> removingVisitor = new SimpleFileVisitor<Path>() {


@@ -0,0 +1,1041 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.kafka;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.document.Document;
import io.debezium.document.DocumentSerdes;
import io.debezium.util.IoUtil;
/**
* An embeddable cluster of Kafka servers and a single Zookeeper server. This may be useful when creating a complete environment
* within a single process, but doing so offers limited durability and fault tolerance compared to the normal approach of
* using an external cluster of Kafka servers and Zookeeper servers with proper replication and fault tolerance.
*
* @author Randall Hauch
*/
@ThreadSafe
public class KafkaCluster {
public static final boolean DEFAULT_DELETE_DATA_UPON_SHUTDOWN = true;
public static final boolean DEFAULT_DELETE_DATA_PRIOR_TO_STARTUP = false;
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCluster.class);
private final ConcurrentMap<Integer, KafkaServer> kafkaServers = new ConcurrentHashMap<>();
private final ZookeeperServer zkServer = new ZookeeperServer();
private volatile File dataDir = null;
private volatile boolean deleteDataUponShutdown = DEFAULT_DELETE_DATA_UPON_SHUTDOWN;
private volatile boolean deleteDataPriorToStartup = DEFAULT_DELETE_DATA_PRIOR_TO_STARTUP;
private volatile boolean running = false;
private volatile Properties kafkaConfig = null;
private volatile int startingKafkaPort = -1;
private final AtomicLong nextKafkaPort = new AtomicLong(startingKafkaPort);
/**
* Create a new embedded cluster.
*/
public KafkaCluster() {
}
/**
* Specify whether the data is to be deleted upon {@link #shutdown()}.
*
* @param delete true if the data is to be deleted upon shutdown, or false otherwise
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the cluster is running
*/
public KafkaCluster deleteDataUponShutdown(boolean delete) {
if (running) throw new IllegalStateException("Unable to change cluster settings when running");
this.deleteDataUponShutdown = delete;
return this;
}
/**
* Specify whether the data is to be deleted prior to {@link #startup()}.
*
* @param delete true if the data is to be deleted prior to startup, or false otherwise
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the cluster is running
*/
public KafkaCluster deleteDataPriorToStartup(boolean delete) {
if (running) throw new IllegalStateException("Unable to change cluster settings when running");
this.deleteDataPriorToStartup = delete;
return this;
}
/**
* Add a number of new Kafka brokers to the cluster. The broker IDs will be generated.
*
* @param count the number of new brokers to add
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the cluster is running
*/
public KafkaCluster addBrokers(int count) {
if (running) throw new IllegalStateException("Unable to add a broker when the cluster is already running");
AtomicLong added = new AtomicLong();
while (added.intValue() < count) {
kafkaServers.computeIfAbsent(new Integer(added.intValue() + 1), id -> {
added.incrementAndGet();
KafkaServer server = new KafkaServer(zkServer::getConnection, id);
if (dataDir != null) server.setStateDirectory(dataDir);
if (kafkaConfig != null) server.setProperties(kafkaConfig);
if (startingKafkaPort >= 0) server.setPort((int) this.nextKafkaPort.getAndIncrement());
return server;
});
}
return this;
}
/**
* Set the parent directory where the brokers' logs and the Zookeeper server's logs and snapshots will be kept.
*
* @param dataDir the parent directory for the server's logs and snapshots; may be null if a temporary directory will be used
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the cluster is running
* @throws IllegalArgumentException if the supplied file is not a directory or not writable
*/
public KafkaCluster usingDirectory(File dataDir) {
if (running) throw new IllegalStateException("Unable to add a broker when the cluster is already running");
if (dataDir != null && dataDir.exists() && !dataDir.isDirectory() && !dataDir.canWrite() && !dataDir.canRead()) {
throw new IllegalArgumentException("The directory must be readable and writable");
}
this.dataDir = dataDir;
return this;
}
/**
* Set the configuration properties for each of the brokers. This method does nothing if the supplied properties are null or
* empty.
*
* @param properties the Kafka configuration properties
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the cluster is running
*/
public KafkaCluster withKafkaConfiguration(Properties properties) {
if (running) throw new IllegalStateException("Unable to add a broker when the cluster is already running");
if (properties != null && !properties.isEmpty()) {
kafkaConfig = new Properties(properties);
kafkaServers.values().forEach(kafka -> kafka.setProperties(kafkaConfig));
}
return this;
}
/**
* Set the port numbers for Zookeeper and the Kafka brokers.
*
* @param zkPort the port number that Zookeeper should use; may be -1 if an available port should be discovered
* @param firstKafkaPort the port number for the first Kafka broker (additional brokers will use subsequent port numbers);
* may be -1 if available ports should be discovered
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the cluster is running
*/
public KafkaCluster withPorts(int zkPort, int firstKafkaPort) {
if (running) throw new IllegalStateException("Unable to add a broker when the cluster is already running");
this.zkServer.setPort(zkPort);
this.startingKafkaPort = firstKafkaPort;
if (this.startingKafkaPort >= 0) {
this.nextKafkaPort.set(this.startingKafkaPort);
kafkaServers.values().forEach(kafka -> kafka.setPort((int) this.nextKafkaPort.getAndIncrement()));
}
return this;
}
/**
* Determine if the cluster is running.
*
* @return true if the cluster is running, or false otherwise
*/
public boolean isRunning() {
return running;
}
/**
* Start the embedded Zookeeper server and the Kafka servers {@link #addBrokers(int) in the cluster}.
* This method does nothing if the cluster is already running.
*
* @return this instance to allow chaining methods; never null
* @throws IOException if there is an error during startup
*/
public synchronized KafkaCluster startup() throws IOException {
if (!running) {
if (dataDir == null) {
try {
File temp = File.createTempFile("kafka", "suffix");
dataDir = new File(temp.getParentFile(), "cluster");
dataDir.mkdirs();
temp.delete();
} catch (IOException e) {
throw new RuntimeException("Unable to create temporary directory", e);
}
} else if (deleteDataPriorToStartup) {
IoUtil.delete(dataDir);
dataDir.mkdirs();
}
File zkDir = new File(dataDir, "zk");
zkServer.setStateDirectory(zkDir); // does error checking
this.dataDir = dataDir;
File kafkaDir = new File(dataDir, "kafka");
kafkaServers.values().forEach(server -> server.setStateDirectory(new File(kafkaDir, "broker" + server.brokerId())));
zkServer.startup();
LOGGER.debug("Starting {} brokers", kafkaServers.size());
kafkaServers.values().forEach(KafkaServer::startup);
running = true;
}
return this;
}
/**
* Shutdown the embedded Zookeeper server and the Kafka servers {@link #addBrokers(int) in the cluster}.
* This method does nothing if the cluster is not running.
*
* @return this instance to allow chaining methods; never null
*/
public synchronized KafkaCluster shutdown() {
if (running) {
try {
kafkaServers.values().forEach(this::shutdownReliably);
} finally {
try {
zkServer.shutdown(deleteDataUponShutdown);
} catch (Throwable t) {
LOGGER.error("Error while shutting down {}", zkServer, t);
} finally {
if (deleteDataUponShutdown) {
try {
kafkaServers.values().forEach(KafkaServer::deleteData);
} finally {
try {
IoUtil.delete(this.dataDir);
} catch (IOException e) {
LOGGER.error("Error while deleting cluster data", e);
}
}
}
running = false;
}
}
}
return this;
}
/**
* Create the specified topics.
*
* @param topics the names of the topics to create
* @throws IllegalStateException if the cluster is not running
*/
public void createTopics(String... topics) {
LOGGER.debug("Creating topics: {}", Arrays.toString(topics));
if (!running) throw new IllegalStateException("The cluster must be running to create topics");
kafkaServers.values().stream().findFirst().ifPresent(server -> server.createTopics(topics));
}
/**
* Create the specified topics.
*
* @param topics the names of the topics to create
* @throws IllegalStateException if the cluster is not running
*/
public void createTopics(Set<String> topics) {
createTopics(topics.toArray(new String[topics.size()]));
}
/**
* Create the specified topics.
*
* @param numPartitions the number of partitions for each topic
* @param replicationFactor the replication factor for each topic
* @param topics the names of the topics to create
*/
public void createTopics(int numPartitions, int replicationFactor, String... topics) {
LOGGER.debug("Creating topics with {} partitions and {} replicas each: {}", numPartitions, replicationFactor,
Arrays.toString(topics));
if (!running) throw new IllegalStateException("The cluster must be running to create topics");
kafkaServers.values().stream().findFirst().ifPresent(server -> server.createTopics(numPartitions, replicationFactor, topics));
}
/**
* Create the specified topics.
*
* @param numPartitions the number of partitions for each topic
* @param replicationFactor the replication factor for each topic
* @param topics the names of the topics to create
*/
public void createTopics(int numPartitions, int replicationFactor, Set<String> topics) {
createTopics(numPartitions, replicationFactor, topics.toArray(new String[topics.size()]));
}
/**
* Create the specified topic.
*
* @param topic the name of the topic to create
* @param numPartitions the number of partitions for the topic
* @param replicationFactor the replication factor for the topic
*/
public void createTopic(String topic, int numPartitions, int replicationFactor) {
LOGGER.debug("Creating topic '{}' with {} partitions and {} replicas", topic, numPartitions, replicationFactor);
if (!running) throw new IllegalStateException("The cluster must be running to create topics");
kafkaServers.values().stream().findFirst().ifPresent(server -> server.createTopic(topic, numPartitions, replicationFactor));
}
/**
* Perform the supplied function on each directory used by this cluster.
*
* @param consumer the consumer function; may not be null
*/
void onEachDirectory(java.util.function.Consumer<File> consumer) {
consumer.accept(zkServer.getSnapshotDirectory());
consumer.accept(zkServer.getLogDirectory());
kafkaServers.values().forEach(server -> consumer.accept(server.getStateDirectory()));
}
/**
* Get the list of brokers.
*
* @return the broker list
*/
public String brokerList() {
StringJoiner joiner = new StringJoiner(",");
kafkaServers.values().forEach(server -> {
joiner.add(server.getConnection());
});
return joiner.toString();
}
private void shutdownReliably(KafkaServer server) {
try {
server.shutdown();
} catch (Throwable t) {
LOGGER.error("Error while shutting down {}", server, t);
}
}
/**
* Obtain the interface for using this cluster.
*
* @return the usage interface; never null
* @throws IllegalStateException if the cluster is not running
*/
public Usage useTo() {
if (!running) throw new IllegalStateException("Unable to use the cluster it is not running");
return new Usage();
}
/**
* A simple interactive Kafka producer for use with the cluster.
*
* @param <K> the type of key
* @param <V> the type of value
*/
public static interface InteractiveProducer<K, V> extends Closeable {
/**
* Write to the topic with the given name a record with the specified key and value. The message is flushed immediately.
*
* @param topic the name of the topic; may not be null
* @param key the key; may not be null
* @param value the value; may not be null
* @return this producer instance to allow chaining methods together
*/
default InteractiveProducer<K, V> write(String topic, K key, V value) {
return write(new ProducerRecord<>(topic, key, value));
}
/**
* Write the specified record to the topic with the given name. The message is flushed immediately.
*
* @param record the record; may not be null
* @return this producer instance to allow chaining methods together
*/
InteractiveProducer<K, V> write(ProducerRecord<K, V> record);
/**
* Close this producer's connection to Kafka and clean up all resources.
*/
@Override
public void close();
}
/**
* A simple interactive Kafka consumer for use with the cluster.
*
* @param <K> the type of key
* @param <V> the type of value
*/
public static interface InteractiveConsumer<K, V> extends Closeable {
/**
* Block until a record can be read from this consumer's topic, and return the value in that record.
*
* @return the value; never null
* @throws InterruptedException if the thread is interrupted while blocking
*/
default V nextValue() throws InterruptedException {
return nextRecord().value();
}
/**
* Block until a record can be read from this consumer's topic, and return the record.
*
* @return the record; never null
* @throws InterruptedException if the thread is interrupted while blocking
*/
ConsumerRecord<K, V> nextRecord() throws InterruptedException;
/**
* Block until a record can be read from this consumer's topic or until the timeout occurs, and if a record was read
* return the value in that record.
*
* @param timeout the maximum amount of time to block to wait for a record
* @param unit the unit of time for the {@code timeout}
* @return the value, or null if the method timed out
* @throws InterruptedException if the thread is interrupted while blocking
*/
default V nextValue(long timeout, TimeUnit unit) throws InterruptedException {
ConsumerRecord<K, V> record = nextRecord(timeout, unit);
return record != null ? record.value() : null;
}
/**
* Block until a record can be read from this consumer's topic or until the timeout occurs, and if a record was read
* return the record.
*
* @param timeout the maximum amount of time to block to wait for a record
* @param unit the unit of time for the {@code timeout}
* @return the record, or null if the method timed out
* @throws InterruptedException if the thread is interrupted while blocking
*/
ConsumerRecord<K, V> nextRecord(long timeout, TimeUnit unit) throws InterruptedException;
/**
* Obtain a stream to consume the input messages. This method can be used in place of repeated calls to the
* {@code next...()} methods.
*
* @return the stream of all messages.
*/
Stream<ConsumerRecord<K, V>> stream();
/**
* Obtain a stream over all of the accumulated input messages. This method can be called multiple times, and each time
* the resulting stream will operate over <em>all messages</em> received by this consumer, and is completely
* independent of the {@link #stream()}, {@link #nextRecord()}, {@link #nextRecord(long, TimeUnit)}, {@link #nextValue()}
* and {@link #nextValue(long, TimeUnit)} methods.
*
* @return the stream of all messages.
*/
Stream<ConsumerRecord<K, V>> streamAll();
/**
* Asynchronously close this consumer's connection to Kafka and begin to clean up all resources.
*/
@Override
public void close();
}
/**
* A set of methods to use a running KafkaCluster.
*/
public class Usage {
/**
* Get a new set of properties for consumers that want to talk to this server.
*
* @param groupId the group ID for the consumer; may not be null
* @param clientId the optional identifier for the client; may be null if not needed
* @param autoOffsetReset how to pick a starting offset when there is no initial offset in ZooKeeper or if an offset is
* out of range; may be null for the default to be used
* @return the mutable consumer properties
* @see #getProducerProperties(String)
*/
public Properties getConsumerProperties(String groupId, String clientId, OffsetResetStrategy autoOffsetReset) {
if (groupId == null) throw new IllegalArgumentException("The groupId is required");
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
if (autoOffsetReset != null) {
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset.toString().toLowerCase());
}
if (clientId != null) props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
return props;
}
/**
* Get a new set of properties for producers that want to talk to this server.
*
* @param clientId the optional identifier for the client; may be null if not needed
* @return the mutable producer properties
* @see #getConsumerProperties(String, String, OffsetResetStrategy)
*/
public Properties getProducerProperties(String clientId) {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());
props.setProperty(ProducerConfig.ACKS_CONFIG, Integer.toString(1));
if (clientId != null) props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, clientId);
return props;
}
/**
* Create an {@link InteractiveProducer simple producer} that can be used to write messages to the cluster.
*
* @param producerName the name of the producer; may not be null
* @param keySerializer the serializer for the keys; may not be null
* @param valueSerializer the serializer for the values; may not be null
* @return the object that can be used to produce messages; never null
*/
public <K, V> InteractiveProducer<K, V> createProducer(String producerName, Serializer<K> keySerializer,
Serializer<V> valueSerializer) {
Properties props = getProducerProperties(producerName);
KafkaProducer<K, V> producer = new KafkaProducer<>(props, keySerializer, valueSerializer);
return new InteractiveProducer<K, V>() {
@Override
public InteractiveProducer<K, V> write(ProducerRecord<K, V> record) {
producer.send(record);
producer.flush();
return this;
}
@Override
public void close() {
producer.close();
}
};
}
/**
* Create an {@link InteractiveProducer simple producer} that can be used to write {@link Document} messages to the
* cluster.
*
* @param producerName the name of the producer; may not be null
* @return the object that can be used to produce messages; never null
*/
public InteractiveProducer<String, Document> createProducer(String producerName) {
return createProducer(producerName, new StringSerializer(), new DocumentSerdes());
}
/**
* Create an {@link InteractiveConsumer simple consumer} that can be used to read messages from the cluster.
*
* @param groupId the name of the group; may not be null
* @param clientId the name of the client; may not be null
* @param topicName the name of the topic to read; may not be null and may not be empty
* @param keyDeserializer the deserializer for the keys; may not be null
* @param valueDeserializer the deserializer for the values; may not be null
* @param completion the function to call when the consumer terminates; may be null
* @return the running interactive consumer; never null
*/
public <K, V> InteractiveConsumer<K, V> createConsumer(String groupId, String clientId, String topicName,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer, Runnable completion) {
Set<String> topicNames = Collections.singleton(topicName);
return createConsumer(groupId, clientId, topicNames, keyDeserializer, valueDeserializer, completion);
}
/**
* Create an {@link InteractiveConsumer simple consumer} that can be used to read messages from the cluster.
*
* @param groupId the name of the group; may not be null
* @param clientId the name of the client; may not be null
* @param topicNames the names of the topics to read; may not be null and may not be empty
* @param keyDeserializer the deserializer for the keys; may not be null
* @param valueDeserializer the deserializer for the values; may not be null
* @param completion the function to call when the consumer terminates; may be null
* @return the running interactive consumer; never null
*/
public <K, V> InteractiveConsumer<K, V> createConsumer(String groupId, String clientId, Set<String> topicNames,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer, Runnable completion) {
BlockingQueue<ConsumerRecord<K, V>> consumed = new LinkedBlockingQueue<>();
List<ConsumerRecord<K, V>> allMessages = new LinkedList<>();
AtomicBoolean keepReading = new AtomicBoolean(true);
consume(groupId, clientId, OffsetResetStrategy.EARLIEST, keyDeserializer, valueDeserializer, () -> keepReading.get(),
completion, topicNames, record -> {
consumed.add(record);
allMessages.add(record);
});
return new InteractiveConsumer<K, V>() {
@Override
public ConsumerRecord<K, V> nextRecord() throws InterruptedException {
return consumed.take();
}
@Override
public ConsumerRecord<K, V> nextRecord(long timeout, TimeUnit unit) throws InterruptedException {
return consumed.poll(timeout, unit);
}
@Override
public void close() {
keepReading.set(false);
}
@Override
public Stream<ConsumerRecord<K, V>> stream() {
return consumed.stream();
}
@Override
public Stream<ConsumerRecord<K, V>> streamAll() {
return allMessages.stream();
}
};
}
/**
* Create an {@link InteractiveConsumer simple consumer} that can be used to read messages from the cluster.
*
* @param groupId the name of the group; may not be null
* @param clientId the name of the client; may not be null
* @param topicName the name of the topic to read; may not be null and may not be empty
* @param completion the function to call when the consumer terminates; may be null
* @return the running interactive consumer; never null
*/
public InteractiveConsumer<String, Document> createConsumer(String groupId, String clientId, String topicName,
Runnable completion) {
Set<String> topicNames = Collections.singleton(topicName);
return createConsumer(groupId, clientId, topicNames, new StringDeserializer(), new DocumentSerdes(), completion);
}
/**
* Create an {@link InteractiveConsumer simple consumer} that can be used to read messages from the cluster.
*
* @param groupId the name of the group; may not be null
* @param clientId the name of the client; may not be null
* @param topicNames the names of the topics to read; may not be null and may not be empty
* @param completion the function to call when the consumer terminates; may be null
* @return the running interactive consumer; never null
*/
public InteractiveConsumer<String, Document> createConsumer(String groupId, String clientId, Set<String> topicNames,
Runnable completion) {
return createConsumer(groupId, clientId, topicNames, new StringDeserializer(), new DocumentSerdes(), completion);
}
/**
* Use the supplied function to asynchronously produce {@link Document} messages and write them to the cluster.
*
* @param producerName the name of the producer; may not be null
* @param producer the function that will asynchronously use the supplied producer to write messages; may not be null
*/
public <K, V> void produce(String producerName, Consumer<InteractiveProducer<String, Document>> producer) {
produce(producerName, new StringSerializer(), new DocumentSerdes(), producer);
}
/**
* Use the supplied function to asynchronously produce messages and write them to the cluster.
*
* @param producerName the name of the producer; may not be null
* @param keySerializer the serializer for the keys; may not be null
* @param valueSerializer the serializer for the values; may not be null
* @param producer the function that will asynchronously use the supplied producer to write messages; may not be null
*/
public <K, V> void produce(String producerName, Serializer<K> keySerializer, Serializer<V> valueSerializer,
Consumer<InteractiveProducer<K, V>> producer) {
Properties props = getProducerProperties(producerName);
KafkaProducer<K, V> kafkaProducer = new KafkaProducer<>(props, keySerializer, valueSerializer);
InteractiveProducer<K, V> interactive = new InteractiveProducer<K, V>() {
@Override
public InteractiveProducer<K, V> write(ProducerRecord<K, V> record) {
kafkaProducer.send(record);
kafkaProducer.flush();
return this;
}
@Override
public void close() {
kafkaProducer.close();
}
};
Thread t = new Thread(() -> {
try {
producer.accept(interactive);
} finally {
interactive.close();
}
});
t.setName(producerName + "-thread");
t.start();
}
/**
* Use the supplied function to asynchronously produce messages and write them to the cluster.
*
* @param producerName the name of the producer; may not be null
* @param messageCount the number of messages to produce; must be positive
* @param keySerializer the serializer for the keys; may not be null
* @param valueSerializer the serializer for the values; may not be null
* @param completionCallback the function to be called when the producer is completed; may be null
* @param messageSupplier the function to produce messages; may not be null
*/
public <K, V> void produce(String producerName, int messageCount,
Serializer<K> keySerializer, Serializer<V> valueSerializer,
Runnable completionCallback,
Supplier<ProducerRecord<K, V>> messageSupplier) {
Properties props = getProducerProperties(producerName);
Thread t = new Thread(() -> {
LOGGER.debug("Starting producer {} to write {} messages", producerName, messageCount);
try (KafkaProducer<K, V> producer = new KafkaProducer<>(props, keySerializer, valueSerializer)) {
for (int i = 0; i != messageCount; ++i) {
ProducerRecord<K, V> record = messageSupplier.get();
producer.send(record);
producer.flush();
LOGGER.debug("Producer {}: sent message {}", producerName, record);
}
} finally {
if (completionCallback != null) completionCallback.run();
LOGGER.debug("Stopping producer {}", producerName);
}
});
t.setName(producerName + "-thread");
t.start();
}
/**
* Use the supplied function to asynchronously produce messages with String keys and values, and write them to the
* cluster.
*
* @param messageCount the number of messages to produce; must be positive
* @param completionCallback the function to be called when the producer is completed; may be null
* @param messageSupplier the function to produce messages; may not be null
*/
public void produceStrings(int messageCount,
Runnable completionCallback, Supplier<ProducerRecord<String, String>> messageSupplier) {
Serializer<String> keySer = new StringSerializer();
Serializer<String> valSer = keySer;
String randomId = UUID.randomUUID().toString();
produce(randomId, messageCount, keySer, valSer, completionCallback, messageSupplier);
}
/**
* Use the supplied function to asynchronously produce messages with String keys and {@link Document} values, and write
* them to the cluster.
*
* @param messageCount the number of messages to produce; must be positive
* @param completionCallback the function to be called when the producer is completed; may be null
* @param messageSupplier the function to produce messages; may not be null
*/
public void produceDocuments(int messageCount,
Runnable completionCallback, Supplier<ProducerRecord<String, Document>> messageSupplier) {
Serializer<String> keySer = new StringSerializer();
Serializer<Document> valSer = new DocumentSerdes();
String randomId = UUID.randomUUID().toString();
produce(randomId, messageCount, keySer, valSer, completionCallback, messageSupplier);
}
/**
* Use the supplied function to asynchronously produce messages with String keys and Integer values, and write them to the
* cluster.
*
* @param messageCount the number of messages to produce; must be positive
* @param completionCallback the function to be called when the producer is completed; may be null
* @param messageSupplier the function to produce messages; may not be null
*/
public void produceIntegers(int messageCount,
Runnable completionCallback, Supplier<ProducerRecord<String, Integer>> messageSupplier) {
Serializer<String> keySer = new StringSerializer();
Serializer<Integer> valSer = new IntegerSerializer();
String randomId = UUID.randomUUID().toString();
produce(randomId, messageCount, keySer, valSer, completionCallback, messageSupplier);
}
/**
* Asynchronously produce messages with String keys and sequential Integer values, and write them to the cluster.
*
* @param topic the name of the topic to which the messages should be written; may not be null
* @param messageCount the number of messages to produce; must be positive
* @param initialValue the first integer value to produce
* @param completionCallback the function to be called when the producer is completed; may be null
*/
public void produceIntegers(String topic, int messageCount, int initialValue,
Runnable completionCallback) {
AtomicLong counter = new AtomicLong(initialValue);
produceIntegers(messageCount, completionCallback, () -> {
long i = counter.getAndIncrement();
String keyAndValue = Long.toString(i);
return new ProducerRecord<String, Integer>(topic, keyAndValue, new Integer((int) i));
});
}
/**
* Asynchronously produce messages with monotonically increasing String keys and values obtained from the supplied
* function, and write them to the cluster.
*
* @param topic the name of the topic to which the messages should be written; may not be null
* @param messageCount the number of messages to produce; must be positive
* @param completionCallback the function to be called when the producer is completed; may be null
* @param valueSupplier the value supplier; may not be null
*/
public void produceStrings(String topic, int messageCount,
Runnable completionCallback, Supplier<String> valueSupplier) {
AtomicLong counter = new AtomicLong(0);
produceStrings(messageCount, completionCallback, () -> {
long i = counter.incrementAndGet();
String keyAndValue = Long.toString(i);
return new ProducerRecord<String, String>(topic, keyAndValue, valueSupplier.get());
});
}
/**
* Asynchronously produce messages with monotonically increasing String keys and values obtained from the supplied
* function, and write them to the cluster.
*
* @param topic the name of the topic to which the messages should be written; may not be null
* @param messageCount the number of messages to produce; must be positive
* @param completionCallback the function to be called when the producer is completed; may be null
* @param valueSupplier the value supplier; may not be null
*/
public void produceDocuments(String topic, int messageCount,
Runnable completionCallback, Supplier<Document> valueSupplier) {
AtomicLong counter = new AtomicLong(0);
produceDocuments(messageCount, completionCallback, () -> {
long i = counter.incrementAndGet();
String keyAndValue = Long.toString(i);
return new ProducerRecord<String, Document>(topic, keyAndValue, valueSupplier.get());
});
}
/**
* Use the supplied function to asynchronously consume messages from the cluster.
*
* @param groupId the name of the group; may not be null
* @param clientId the name of the client; may not be null
* @param autoOffsetReset how to pick a starting offset when there is no initial offset in ZooKeeper or if an offset is
* out of range; may be null for the default to be used
* @param keyDeserializer the deserializer for the keys; may not be null
* @param valueDeserializer the deserializer for the values; may not be null
* @param continuation the function that determines if the consumer should continue; may not be null
* @param completion the function to call when the consumer terminates; may be null
* @param topics the set of topics to consume; may not be null or empty
* @param consumerFunction the function to consume the messages; may not be null
*/
public <K, V> void consume(String groupId, String clientId, OffsetResetStrategy autoOffsetReset,
Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer,
BooleanSupplier continuation, Runnable completion, Collection<String> topics,
java.util.function.Consumer<ConsumerRecord<K, V>> consumerFunction) {
Properties props = getConsumerProperties(groupId, clientId, autoOffsetReset);
Thread t = new Thread(() -> {
LOGGER.debug("Starting consumer {} to read messages", clientId);
try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(props, keyDeserializer, valueDeserializer)) {
consumer.subscribe(new ArrayList<>(topics));
while (continuation.getAsBoolean()) {
consumer.poll(10).forEach(record -> {
LOGGER.debug("Consumer {}: consuming message {}", clientId, record);
consumerFunction.accept(record);
consumer.commitAsync();
});
}
} finally {
if (completion != null) completion.run();
LOGGER.debug("Stopping consumer {}", clientId);
}
});
t.setName(clientId + "-thread");
t.start();
}
/**
* Asynchronously consume all messages from the cluster.
*
* @param continuation the function that determines if the consumer should continue; may not be null
* @param completion the function to call when all messages have been consumed; may be null
* @param topics the set of topics to consume; may not be null or empty
* @param consumerFunction the function to consume the messages; may not be null
*/
public void consumeDocuments(BooleanSupplier continuation, Runnable completion, Collection<String> topics,
java.util.function.Consumer<ConsumerRecord<String, Document>> consumerFunction) {
Deserializer<String> keyDes = new StringDeserializer();
Deserializer<Document> valDes = new DocumentSerdes();
String randomId = UUID.randomUUID().toString();
consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, completion, topics, consumerFunction);
}
/**
* Asynchronously consume all messages from the cluster.
*
* @param continuation the function that determines if the consumer should continue; may not be null
* @param completion the function to call when all messages have been consumed; may be null
* @param topics the set of topics to consume; may not be null or empty
* @param consumerFunction the function to consume the messages; may not be null
*/
public void consumeStrings(BooleanSupplier continuation, Runnable completion, Collection<String> topics,
java.util.function.Consumer<ConsumerRecord<String, String>> consumerFunction) {
Deserializer<String> keyDes = new StringDeserializer();
Deserializer<String> valDes = keyDes;
String randomId = UUID.randomUUID().toString();
consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, completion, topics, consumerFunction);
}
/**
* Asynchronously consume all messages from the cluster.
*
* @param continuation the function that determines if the consumer should continue; may not be null
* @param completion the function to call when all messages have been consumed; may be null
* @param topics the set of topics to consume; may not be null or empty
* @param consumerFunction the function to consume the messages; may not be null
*/
public void consumeIntegers(BooleanSupplier continuation, Runnable completion, Collection<String> topics,
java.util.function.Consumer<ConsumerRecord<String, Integer>> consumerFunction) {
Deserializer<String> keyDes = new StringDeserializer();
Deserializer<Integer> valDes = new IntegerDeserializer();
String randomId = UUID.randomUUID().toString();
consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, completion, topics, consumerFunction);
}
/**
* Asynchronously consume all messages on the given topic from the cluster.
*
* @param topicName the name of the topic; may not be null
* @param count the expected number of messages to read before terminating; must be positive
* @param timeout the maximum time that this consumer should run before terminating; must be positive
* @param unit the unit of time for the timeout; may not be null
* @param completion the function to call when all messages have been consumed; may be null
* @param consumer the function to consume the messages; may not be null
*/
public void consumeStrings(String topicName, int count, long timeout, TimeUnit unit, Runnable completion,
BiPredicate<String, String> consumer) {
AtomicLong readCounter = new AtomicLong();
consumeStrings(continueIfNotExpired(() -> readCounter.get() < count, timeout, unit),
completion,
Collections.singleton(topicName),
record -> {
if (consumer.test(record.key(), record.value())) readCounter.incrementAndGet();
});
}
/**
* Asynchronously consume all messages on the given topic from the cluster.
*
* @param topicName the name of the topic; may not be null
* @param count the expected number of messages to read before terminating; must be positive
* @param timeout the maximum time that this consumer should run before terminating; must be positive
* @param unit the unit of time for the timeout; may not be null
* @param completion the function to call when all messages have been consumed; may be null
* @param consumer the function to consume the messages; may not be null
*/
public void consumeDocuments(String topicName, int count, long timeout, TimeUnit unit, Runnable completion,
BiPredicate<String, Document> consumer) {
AtomicLong readCounter = new AtomicLong();
consumeDocuments(continueIfNotExpired(() -> readCounter.get() < count, timeout, unit),
completion,
Collections.singleton(topicName),
record -> {
if (consumer.test(record.key(), record.value())) readCounter.incrementAndGet();
});
}
/**
* Asynchronously consume all messages on the given topic from the cluster.
*
* @param topicName the name of the topic; may not be null
* @param count the expected number of messages to read before terminating; must be positive
* @param timeout the maximum time that this consumer should run before terminating; must be positive
* @param unit the unit of time for the timeout; may not be null
* @param completion the function to call when all messages have been consumed; may be null
* @param consumer the function to consume the messages; may not be null
*/
public void consumeIntegers(String topicName, int count, long timeout, TimeUnit unit, Runnable completion,
BiPredicate<String, Integer> consumer) {
AtomicLong readCounter = new AtomicLong();
consumeIntegers(continueIfNotExpired(() -> readCounter.get() < count, timeout, unit),
completion,
Collections.singleton(topicName),
record -> {
if (consumer.test(record.key(), record.value())) readCounter.incrementAndGet();
});
}
/**
* Asynchronously consume all messages on the given topic from the cluster.
*
* @param topicName the name of the topic; may not be null
* @param count the expected number of messages to read before terminating; must be positive
* @param timeout the maximum time that this consumer should run before terminating; must be positive
* @param unit the unit of time for the timeout; may not be null
* @param completion the function to call when all messages have been consumed; may be null
*/
public void consumeStrings(String topicName, int count, long timeout, TimeUnit unit, Runnable completion) {
consumeStrings(topicName, count, timeout, unit, completion, (key, value) -> true);
}
/**
* Asynchronously consume all messages on the given topic from the cluster.
*
* @param topicName the name of the topic; may not be null
* @param count the expected number of messages to read before terminating; must be positive
* @param timeout the maximum time that this consumer should run before terminating; must be positive
* @param unit the unit of time for the timeout; may not be null
* @param completion the function to call when all messages have been consumed; may be null
*/
public void consumeDocuments(String topicName, int count, long timeout, TimeUnit unit, Runnable completion) {
consumeDocuments(topicName, count, timeout, unit, completion, (key, value) -> true);
}
/**
* Asynchronously consume all messages on the given topic from the cluster.
*
* @param topicName the name of the topic; may not be null
* @param count the expected number of messages to read before terminating; must be positive
* @param timeout the maximum time that this consumer should run before terminating; must be positive
* @param unit the unit of time for the timeout; may not be null
* @param completion the function to call when all messages have been consumed; may be null
*/
public void consumeIntegers(String topicName, int count, long timeout, TimeUnit unit, Runnable completion) {
consumeIntegers(topicName, count, timeout, unit, completion, (key, value) -> true);
}
protected BooleanSupplier continueIfNotExpired(BooleanSupplier continuation, long timeout, TimeUnit unit) {
return new BooleanSupplier() {
long stopTime = 0L;
@Override
public boolean getAsBoolean() {
if (stopTime == 0L) stopTime = System.currentTimeMillis() + unit.toMillis(timeout);
return continuation.getAsBoolean() && System.currentTimeMillis() <= stopTime;
}
};
}
}
}
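
To illustrate the Usage API above, a short sketch (group, client, and topic names are illustrative; assumes an already-started cluster with the topic created, inside a test method that declares throws Exception) pairing the interactive producer with the blocking consumer:

KafkaCluster.Usage usage = cluster.useTo();
// Start an asynchronous consumer that queues incoming records ...
KafkaCluster.InteractiveConsumer<String, Integer> consumer =
        usage.createConsumer("groupA", "clientA", "topicA",
                             new StringDeserializer(), new IntegerDeserializer(), null);
// Write a few records, flushing each one immediately, then release the producer ...
usage.createProducer("manual", new StringSerializer(), new IntegerSerializer())
     .write("topicA", "key1", 1)
     .write("topicA", "key2", 2)
     .close();
// Block for a record's value, or give up after 10 seconds ...
Integer value = consumer.nextValue(10, TimeUnit.SECONDS);
consumer.close();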


@@ -0,0 +1,196 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.kafka;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.util.Stopwatch;
import io.debezium.util.Testing;
/**
* @author Randall Hauch
*/
public class KafkaClusterTest {
private KafkaCluster cluster;
private File dataDir;
@Before
public void beforeEach() {
dataDir = Testing.Files.createTestingDirectory("cluster");
Testing.Files.delete(dataDir);
cluster = new KafkaCluster().usingDirectory(dataDir);
}
@After
public void afterEach() {
cluster.shutdown();
Testing.Files.delete(dataDir);
}
@Test
public void shouldStartClusterWithOneBrokerAndRemoveData() throws Exception {
cluster.deleteDataUponShutdown(true).addBrokers(1).startup();
cluster.onEachDirectory(this::assertValidDataDirectory);
cluster.shutdown();
cluster.onEachDirectory(this::assertDoesNotExist);
}
@Test
public void shouldStartClusterWithMultipleBrokerAndRemoveData() throws Exception {
cluster.deleteDataUponShutdown(true).addBrokers(3).startup();
cluster.onEachDirectory(this::assertValidDataDirectory);
cluster.shutdown();
cluster.onEachDirectory(this::assertDoesNotExist);
}
@Test
public void shouldStartClusterWithOneBrokerAndLeaveData() throws Exception {
cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
cluster.onEachDirectory(this::assertValidDataDirectory);
cluster.shutdown();
cluster.onEachDirectory(this::assertValidDataDirectory);
}
@Test
public void shouldStartClusterWithMultipleBrokerAndLeaveData() throws Exception {
cluster.deleteDataUponShutdown(false).addBrokers(3).startup();
cluster.onEachDirectory(this::assertValidDataDirectory);
cluster.shutdown();
cluster.onEachDirectory(this::assertValidDataDirectory);
}
@Test
public void shouldStartClusterAndAllowProducersAndConsumersToUseIt() throws Exception {
Testing.Debug.enable();
final String topicName = "topicA";
final CountDownLatch completion = new CountDownLatch(2);
final int numMessages = 100;
final AtomicLong messagesRead = new AtomicLong(0);
// Start a cluster and create a topic ...
cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
cluster.createTopics(topicName);
// Consume messages asynchronously ...
Stopwatch sw = Stopwatch.reusable().start();
cluster.useTo().consumeIntegers(topicName, numMessages, 10, TimeUnit.SECONDS, completion::countDown, (key, value) -> {
messagesRead.incrementAndGet();
return true;
});
// Produce some messages asynchronously ...
cluster.useTo().produceIntegers(topicName, numMessages, 1, completion::countDown);
// Wait for both to complete ...
if (completion.await(10, TimeUnit.SECONDS)) {
sw.stop();
Testing.debug("Both consumer and producer completed normally in " + sw.durations());
} else {
Testing.debug("Consumer and/or producer did not completed normally");
}
assertThat(messagesRead.get()).isEqualTo(numMessages);
}
@Test
public void shouldStartClusterAndAllowInteractiveProductionAndAutomaticConsumersToUseIt() throws Exception {
Testing.Debug.enable();
final String topicName = "topicA";
final CountDownLatch completion = new CountDownLatch(1);
final int numMessages = 3;
final AtomicLong messagesRead = new AtomicLong(0);
// Start a cluster and create a topic ...
cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
cluster.createTopics(topicName);
// Consume messages asynchronously ...
Stopwatch sw = Stopwatch.reusable().start();
cluster.useTo().consumeIntegers(topicName, numMessages, 10, TimeUnit.SECONDS, completion::countDown, (key, value) -> {
messagesRead.incrementAndGet();
return true;
});
// Produce some messages interactively ...
cluster.useTo()
.createProducer("manual", new StringSerializer(), new IntegerSerializer())
.write(topicName, "key1", 1)
.write(topicName, "key2", 2)
.write(topicName, "key3", 3)
.close();
// Wait for the consumer to complete ...
if (completion.await(10, TimeUnit.SECONDS)) {
sw.stop();
Testing.debug("The consumer completed normally in " + sw.durations());
} else {
Testing.debug("Consumer did not completed normally");
}
assertThat(messagesRead.get()).isEqualTo(numMessages);
}
@Test
public void shouldStartClusterAndAllowAsynchronousProductionAndAutomaticConsumersToUseIt() throws Exception {
Testing.Debug.enable();
final String topicName = "topicA";
final CountDownLatch completion = new CountDownLatch(2);
final int numMessages = 3;
final AtomicLong messagesRead = new AtomicLong(0);
// Start a cluster and create a topic ...
cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
cluster.createTopics(topicName);
// Consume messages asynchronously ...
Stopwatch sw = Stopwatch.reusable().start();
cluster.useTo().consumeIntegers(topicName, numMessages, 10, TimeUnit.SECONDS, completion::countDown, (key, value) -> {
messagesRead.incrementAndGet();
return true;
});
// Produce some messages interactively ...
cluster.useTo().produce("manual", new StringSerializer(), new IntegerSerializer(), produer -> {
produer.write(topicName, "key1", 1);
produer.write(topicName, "key2", 2);
produer.write(topicName, "key3", 3);
completion.countDown();
});
// Wait for the consumer to complete ...
if (completion.await(10, TimeUnit.SECONDS)) {
sw.stop();
Testing.debug("The consumer completed normally in " + sw.durations());
} else {
Testing.debug("Consumer did not completed normally");
}
assertThat(messagesRead.get()).isEqualTo(numMessages);
}
protected void assertValidDataDirectory(File dir) {
assertThat(dir.exists()).isTrue();
assertThat(dir.isDirectory()).isTrue();
assertThat(dir.canWrite()).isTrue();
assertThat(dir.canRead()).isTrue();
assertThat(Testing.Files.inTargetDir(dir)).isTrue();
}
protected void assertDoesNotExist(File dir) {
assertThat(dir.exists()).isFalse();
}
}


@@ -0,0 +1,339 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.kafka;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.util.IoUtil;
import kafka.admin.AdminUtils;
import kafka.server.KafkaConfig;
import kafka.utils.ZkUtils;
/**
* A small embedded Kafka server.
*
* @author Randall Hauch
*/
@ThreadSafe
public class KafkaServer {
public static final int DEFAULT_BROKER_ID = 1;
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaServer.class);
private final Supplier<String> zkConnection;
private final int brokerId;
private volatile File logsDir;
private final Properties config;
private volatile int desiredPort = -1;
private volatile int port = -1;
private volatile kafka.server.KafkaServer server;
/**
* Create a new server instance.
*
* @param zookeeperConnection the supplier of the Zookeeper connection string; may not be null
*/
public KafkaServer(Supplier<String> zookeeperConnection) {
this(zookeeperConnection, DEFAULT_BROKER_ID);
}
/**
* Create a new server instance.
*
* @param zookeeperConnection the supplier of the Zookeeper connection string; may not be null
* @param brokerId the unique broker ID
*/
public KafkaServer(Supplier<String> zookeeperConnection, int brokerId) {
this(zookeeperConnection, brokerId, -1);
}
/**
* Create a new server instance.
*
* @param zookeeperConnection the supplier of the Zookeeper connection string; may not be null
* @param brokerId the unique broker ID
* @param port the desired port
*/
public KafkaServer(Supplier<String> zookeeperConnection, int brokerId, int port) {
if (zookeeperConnection == null) throw new IllegalArgumentException("The Zookeeper connection string supplier may not be null");
this.zkConnection = zookeeperConnection;
this.brokerId = brokerId;
this.config = new Properties();
setPort(port);
populateDefaultConfiguration(this.config);
}
protected int brokerId() {
return brokerId;
}
protected String zookeeperConnection() {
return this.zkConnection.get();
}
/**
* Set the initial default configuration properties. This method is called from the constructors and can be overridden
* to customize these properties.
*
* @param props the configuration properties; never null
*/
protected void populateDefaultConfiguration(Properties props) {
props.setProperty(KafkaConfig.NumPartitionsProp(), String.valueOf(1));
props.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(1));
}
/**
* Set a configuration property. Several key properties that deal with Zookeeper, the host name, and the broker ID,
* may not be set via this method and are ignored since they are controlled elsewhere in this instance.
*
* @param name the property name; may not be null
* @param value the property value; may be null
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the server is running when this method is called
*/
public KafkaServer setProperty(String name, String value) {
if (server != null) throw new IllegalStateException("Unable to change the properties when already running");
if (!KafkaConfig.ZkConnectProp().equalsIgnoreCase(name)
&& !KafkaConfig.BrokerIdProp().equalsIgnoreCase(name)
&& !KafkaConfig.HostNameProp().equalsIgnoreCase(name)) {
this.config.setProperty(name, value);
}
return this;
}
/**
* Set multiple configuration properties. Several key properties that deal with Zookeeper, the host name, and the broker ID,
* may not be set via this method and are ignored since they are controlled elsewhere in this instance.
*
* @param properties the configuration properties; may be null or empty
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the server is running when this method is called
*/
public KafkaServer setProperties( Properties properties ) {
if (server != null) throw new IllegalStateException("Unable to change the properties when already running");
properties.stringPropertyNames().forEach(propName -> {
setProperty(propName, properties.getProperty(propName));
});
return this;
}
/**
* Set the port for the server.
*
* @param port the desired port, or {@code -1} if a random available port should be found and used
* @return this instance to allow chaining methods; never null
*/
public KafkaServer setPort(int port) {
this.desiredPort = port > 0 ? port : -1;
this.port = desiredPort;
return this;
}
/**
* Get a copy of the complete configuration that is or will be used by the running server.
*
* @return the properties for the currently-running server; may be empty if not running
*/
public Properties config() {
Properties runningConfig = new Properties(config);
runningConfig.setProperty(KafkaConfig.ZkConnectProp(), zookeeperConnection());
runningConfig.setProperty(KafkaConfig.BrokerIdProp(), Integer.toString(brokerId));
runningConfig.setProperty(KafkaConfig.HostNameProp(), "localhost");
runningConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), String.valueOf(Boolean.TRUE));
return runningConfig;
}
/**
* Get the connection string. If the server is not {@link #startup() running} and the port is to be dynamically discovered
* upon startup, then this method returns "{@code localhost:-1}".
*
* @return the connection string; never null
*/
public String getConnection() {
return "localhost:" + port;
}
/**
* Start the embedded Kafka server.
*
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the server is already running
*/
public synchronized KafkaServer startup() {
if (server != null) throw new IllegalStateException("" + this + " is already running");
// Determine the storage directory and adjust the configuration ...
Properties config = config();
if (logsDir == null) {
try {
File temp = File.createTempFile("kafka", "suffix");
this.logsDir = temp.getParentFile();
temp.delete();
} catch (IOException e) {
throw new RuntimeException("Unable to create temporary directory", e);
}
}
config.setProperty(KafkaConfig.LogDirProp(), logsDir.getAbsolutePath());
// Determine the port and adjust the configuration ...
port = desiredPort > 0 ? desiredPort : IoUtil.getAvailablePort();
config.setProperty(KafkaConfig.PortProp(), Integer.toString(port));
// config.setProperty("metadata.broker.list", getConnection());
// Start the server ...
try {
LOGGER.debug("Starting Kafka broker {} @ {} with storage in {}", brokerId, getConnection(), logsDir.getAbsolutePath());
server = new kafka.server.KafkaServer(new KafkaConfig(config), new SystemTime(), scala.Option.apply(null));
server.startup();
LOGGER.info("Started Kafka server {} @ {} with storage in {}", brokerId, getConnection(), logsDir.getAbsolutePath());
return this;
} catch (RuntimeException e) {
server = null;
throw e;
}
}
/**
* Shutdown the embedded Kafka server. Data is retained and can be removed afterwards via {@link #deleteData()}.
*/
public synchronized void shutdown() {
if (server != null) {
try {
server.shutdown();
} finally {
server = null;
port = desiredPort;
}
}
}
/**
* Delete all of the data associated with this server.
*/
public synchronized void deleteData() {
if (server == null) {
// Delete all data ...
try {
IoUtil.delete(this.logsDir);
} catch (IOException e) {
LOGGER.error("Unable to delete directory '{}'", this.logsDir, e);
}
}
}
/**
* Get the Zookeeper utilities used by the running Kafka server.
*
* @return the Zookeeper utilities, or null if the Kafka server is not running
*/
public ZkUtils getZkUtils() {
return server != null ? server.zkUtils() : null;
}
/**
* Create the specified topics.
*
* @param topics the names of the topics to create
*/
public void createTopics(String... topics) {
createTopics(1, 1, topics);
}
/**
* Create the specified topics.
*
* @param numPartitions the number of partitions for each topic
* @param replicationFactor the replication factor for each topic
* @param topics the names of the topics to create
*/
public void createTopics(int numPartitions, int replicationFactor, String... topics) {
for (String topic : topics) {
if ( topic != null ) createTopic(topic, numPartitions, replicationFactor);
}
}
/**
* Create the specified topic.
*
* @param topic the name of the topic to create
* @param numPartitions the number of partitions for the topic
* @param replicationFactor the replication factor for the topic
*/
public void createTopic(String topic, int numPartitions, int replicationFactor) {
AdminUtils.createTopic(getZkUtils(), topic, numPartitions, replicationFactor, new Properties());
}
/**
* Perform the supplied function on each directory used by this server.
*
* @param consumer the consumer function; may not be null
*/
void onEachDirectory(Consumer<File> consumer) {
consumer.accept(getStateDirectory());
}
/**
* Get the parent directory where the broker's state will be kept. The broker will create a subdirectory for itself
* under this directory.
*
* @return the parent directory for the broker's state; may be null if a temporary directory will be used
*/
public File getStateDirectory() {
return this.logsDir;
}
/**
* Set the parent directory where the broker's state will be kept. The broker will create a subdirectory for itself
* under this directory.
*
* @param stateDirectory the parent directory for the broker's state; may be null if a temporary directory will be used
* @throws IllegalArgumentException if the supplied file is not a directory or not writable
*/
public void setStateDirectory(File stateDirectory) {
if (stateDirectory != null && stateDirectory.exists()
&& (!stateDirectory.isDirectory() || !stateDirectory.canWrite() || !stateDirectory.canRead())) {
throw new IllegalArgumentException("The directory must be readable and writable");
}
this.logsDir = stateDirectory;
}
@Override
public String toString() {
return "KafkaServer{" + getConnection() + "}";
}
protected static class SystemTime implements kafka.utils.Time {
@Override
public long milliseconds() {
return System.currentTimeMillis();
}
@Override
public long nanoseconds() {
return System.nanoTime();
}
@Override
public void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
// Ignore
}
}
}
}
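Editorial usage note (not part of the commit): a minimal sketch of how a test might drive this embedded broker together with the ZookeeperServer defined below. The KafkaServer constructor is declared earlier in this file and is not shown in this hunk; the sketch assumes it accepts a supplier of the Zookeeper connection string, so adjust the construction to the actual signature. Everything else uses only methods shown above, inside a test method that declares throws Exception.

    // Hedged sketch: the constructor shape is assumed; all other calls appear in this class.
    ZookeeperServer zookeeper = new ZookeeperServer().setPort(-1);
    zookeeper.startup();
    KafkaServer kafka = new KafkaServer(zookeeper::getConnection);  // assumed constructor
    kafka.setProperty("num.partitions", "1")    // broker config; zookeeper/broker-id/host properties are ignored here
         .setPort(-1)                           // -1 => pick a free port during startup()
         .startup();
    try {
        kafka.createTopics(2, 1, "topic-a", "topic-b");   // partitions, replication factor, topic names
        String brokers = kafka.getConnection();           // "localhost:<port>" for producer/consumer configs
        // ... exercise producers and consumers against 'brokers' ...
    } finally {
        kafka.shutdown();
        kafka.deleteData();
        zookeeper.shutdown(true);
    }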


@ -0,0 +1,220 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.kafka;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.function.Consumer;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.util.IoUtil;
/**
* A lightweight embeddable Zookeeper server useful for unit testing.
*
* @author Randall Hauch
* @see KafkaCluster
*/
@ThreadSafe
public class ZookeeperServer {
private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperServer.class);
public static final int DEFAULT_TICK_TIME = 500;
/**
* The basic time unit in milliseconds used by ZooKeeper. It is used to do heartbeats and the minimum session timeout will be
* twice the tickTime.
*/
private volatile int tickTime = DEFAULT_TICK_TIME;
private volatile int port = -1;
private volatile ServerCnxnFactory factory;
private volatile File dataDir;
private volatile File snapshotDir;
private volatile File logDir;
/**
* Create a new server instance.
*/
public ZookeeperServer() {
}
/**
* Start the embedded Zookeeper server.
*
* @return this instance to allow chaining methods; never null
* @throws IOException if there is an error during startup
* @throws IllegalStateException if the server is already running
*/
public synchronized ZookeeperServer startup() throws IOException {
if (factory != null) throw new IllegalStateException("" + this + " is already running");
if (this.port == -1) this.port = IoUtil.getAvailablePort();
this.factory = ServerCnxnFactory.createFactory(new InetSocketAddress("localhost", port), 1024);
if (this.dataDir == null) {
try {
// Use a dedicated temporary directory so concurrent servers do not share (and later delete) the same data.
this.dataDir = java.nio.file.Files.createTempDirectory("zookeeper").toFile();
} catch (IOException e) {
throw new RuntimeException("Unable to create temporary directory", e);
}
}
this.snapshotDir = new File(this.dataDir, "snapshot");
this.logDir = new File(this.dataDir, "log");
this.snapshotDir.mkdirs();
this.logDir.mkdirs();
try {
factory.startup(new ZooKeeperServer(snapshotDir, logDir, tickTime));
return this;
} catch (InterruptedException e) {
factory = null;
Thread.currentThread().interrupt(); // preserve the interrupt status rather than clearing it
throw new IOException(e);
}
}
/**
* Shutdown the embedded Zookeeper server and delete all data.
*/
public void shutdown() {
shutdown(true);
}
/**
* Shutdown the embedded Zookeeper server.
*
* @param deleteData true if the data should be removed, or false otherwise
*/
public synchronized void shutdown(boolean deleteData) {
if (factory != null) {
try {
factory.shutdown();
} finally {
factory = null;
if (deleteData) {
// Delete all data ...
try {
IoUtil.delete(this.snapshotDir,this.logDir);
} catch ( IOException e ) {
LOGGER.error("Unable to delete data upon shutdown",e);
}
}
}
}
}
/**
* Get the connection string. If the server is not {@link #startup() running} and the port is to be dynamically discovered
* upon startup, then this method returns "{@code localhost:-1}".
*
* @return the connection string; never null
*/
public String getConnection() {
return "localhost:" + port;
}
/**
* Set the port for the server.
*
* @param port the desired port, or {@code -1} if a random available port should be found and used
* @return this instance to allow chaining methods; never null
*/
public ZookeeperServer setPort(int port) {
this.port = port;
return this;
}
/**
* Set the basic time unit in milliseconds used by ZooKeeper. It is used to do heartbeats and the minimum session timeout will
* be twice the tickTime.
*
* @param tickTime the desired value, or non-positive if the default of {@value #DEFAULT_TICK_TIME} should be used
* @return this instance to allow chaining methods; never null
*/
public ZookeeperServer setTickTime(int tickTime) {
this.tickTime = tickTime > 0 ? tickTime : DEFAULT_TICK_TIME;
return this;
}
/**
* Get the current port.
*
* @return the port number, or {@code -1} if the port should be discovered upon {@link #startup()}
*/
public int getPort() {
return port;
}
/**
* Get the basic time unit in milliseconds used by ZooKeeper.
*
* @return tick time; always positive
*/
public int getTickTime() {
return tickTime;
}
/**
* Perform the supplied function on each directory used by this server.
*
* @param consumer the consumer function; may not be null
*/
void onEachDirectory(Consumer<File> consumer) {
consumer.accept(getSnapshotDirectory());
consumer.accept(getLogDirectory());
}
/**
* Get the parent directory where the server's snapshots are kept.
* @return the parent directory for the server's snapshots; never null once the server is running
*/
File getSnapshotDirectory() {
return this.snapshotDir;
}
/**
* Get the parent directory where the server's logs are kept.
* @return the parent directory for the server's logs; never null once the server is running
*/
File getLogDirectory() {
return this.logDir;
}
/**
* Get the parent directory where the server's logs and snapshots will be kept.
* @return the parent directory for the server's logs and snapshots; may be null if a temporary directory will be used
*/
public File getStateDirectory() {
return this.dataDir;
}
/**
* Set the parent directory where the server's logs and snapshots will be kept.
* @param dataDir the parent directory for the server's logs and snapshots; may be null if a temporary directory will be used
* @return this instance to allow method chaining; never null
* @throws IllegalArgumentException if the supplied file is not a directory or not writable
*/
public ZookeeperServer setStateDirectory(File dataDir) {
if (dataDir != null && dataDir.exists() && (!dataDir.isDirectory() || !dataDir.canWrite() || !dataDir.canRead())) {
throw new IllegalArgumentException("The directory must be readable and writable");
}
this.dataDir = dataDir;
return this;
}
@Override
public String toString() {
return "ZookeeperServer{" + getConnection() + "}";
}
}
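Editorial usage note (not part of the commit): the lifecycle exercised by the test that follows, as a minimal sketch using only methods defined in this class; assume it runs inside a test method that declares throws Exception.

    // Minimal lifecycle sketch; the directory under target/ mirrors the test conventions used below.
    File dataDir = new File("target", "zk");             // parent for the "snapshot" and "log" subdirectories
    ZookeeperServer zookeeper = new ZookeeperServer()
            .setStateDirectory(dataDir)
            .setPort(-1)                                  // -1 => find a free port at startup()
            .setTickTime(500);                            // minimum session timeout will be 2 x tickTime
    zookeeper.startup();
    String connect = zookeeper.getConnection();           // "localhost:<port>" for clients or an embedded Kafka broker
    // ... point Zookeeper clients or a KafkaServer at 'connect' ...
    zookeeper.shutdown(true);                             // stop and delete the snapshot and log directories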


@ -0,0 +1,69 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.kafka;
import java.io.File;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.util.Testing;
/**
* @author Randall Hauch
*
*/
public class ZookeeperServerTest {
private ZookeeperServer server;
private File dataDir;
@Before
public void beforeEach() {
dataDir = Testing.Files.createTestingDirectory("zk");
Testing.Files.delete(dataDir);
server = new ZookeeperServer();
server.setStateDirectory(dataDir);
}
@After
public void afterEach() {
Testing.Files.delete(dataDir);
}
@Test
public void shouldStartServerAndRemoveData() throws Exception {
Testing.debug("Running 1");
server.startup();
server.onEachDirectory(this::assertValidDataDirectory);
server.shutdown(true);
server.onEachDirectory(this::assertDoesNotExist);
}
@Test
public void shouldStartServerAndLeaveData() throws Exception {
Testing.debug("Running 2");
server.startup();
server.onEachDirectory(this::assertValidDataDirectory);
server.shutdown(false);
server.onEachDirectory(this::assertValidDataDirectory);
}
protected void assertValidDataDirectory(File dir) {
assertThat(dir.exists()).isTrue();
assertThat(dir.isDirectory()).isTrue();
assertThat(dir.canWrite()).isTrue();
assertThat(dir.canRead()).isTrue();
assertThat(Testing.Files.inTargetDir(dir)).isTrue();
}
protected void assertDoesNotExist(File dir) {
assertThat(dir.exists()).isFalse();
}
}


@ -4,8 +4,56 @@ log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %m (%c)%n
# Direct Zookeeper log messages to stdout with special prefix
log4j.appender.zk=org.apache.log4j.ConsoleAppender
log4j.appender.zk.Target=System.out
log4j.appender.zk.layout=org.apache.log4j.PatternLayout
log4j.appender.zk.layout.ConversionPattern=%d{ISO8601} %-5p ZOO %m%n
#log4j.appender.zk.layout.ConversionPattern=%d{ISO8601} %-5p ZOO %m (%c)%n
# Direct Zookeeper Client log messages to stdout with special prefix
log4j.appender.zkclient=org.apache.log4j.ConsoleAppender
log4j.appender.zkclient.Target=System.out
log4j.appender.zkclient.layout=org.apache.log4j.PatternLayout
log4j.appender.zkclient.layout.ConversionPattern=%d{ISO8601} %-5p ZKC %m%n
#log4j.appender.zkclient.layout.ConversionPattern=%d{ISO8601} %-5p ZKC %m (%c)%n
# Direct Kafka log messages to stdout with special prefix
log4j.appender.kafka=org.apache.log4j.ConsoleAppender
log4j.appender.kafka.Target=System.out
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{ISO8601} %-5p KAF %m%n
#log4j.appender.kafka.layout.ConversionPattern=%d{ISO8601} %-5p KAF %m (%c)%n
# Direct Debezium log messages to stdout with special prefix
log4j.appender.debezium=org.apache.log4j.ConsoleAppender
log4j.appender.debezium.Target=System.out
log4j.appender.debezium.layout=org.apache.log4j.PatternLayout
log4j.appender.debezium.layout.ConversionPattern=%d{ISO8601} %-5p DBZ %m (%c)%n
# Root logger option
log4j.rootLogger=INFO, stdout
# Set up the default logging to be INFO level, then override specific units
log4j.logger.io.debezium=INFO
log4j.logger.io.debezium=INFO, debezium
log4j.additivity.io.debezium=false
# Kafka is pretty verbose at INFO level, so for brevity use ERROR everywhere except INFO at kafka.server.KafkaServer
log4j.logger.org.apache.kafka=ERROR, kafka
log4j.additivity.org.apache.kafka=false
log4j.logger.kafka=ERROR, kafka
log4j.additivity.kafka=false
log4j.logger.kafka.server.KafkaServer=INFO, kafka
log4j.additivity.kafka.server.KafkaServer=false
log4j.logger.state.change.logger=ERROR, kafka
log4j.additivity.state.change.logger=false
# Zookeeper is pretty verbose at INFO level, so for brevity use ERROR everywhere except INFO at org.apache.zookeeper.server.ZooKeeperServer
log4j.logger.org.apache.zookeeper=ERROR, zk
log4j.additivity.org.apache.zookeeper=false
log4j.logger.org.apache.zookeeper.server.ZooKeeperServer=INFO, zk
log4j.additivity.org.apache.zookeeper.server.ZooKeeperServer=false
# Zookeeper Client is pretty verbose at INFO level, so for brevity use ERROR everywhere ...
log4j.logger.org.I0Itec.zkclient=ERROR, zkclient
log4j.additivity.org.I0Itec.zkclient=false

pom.xml

@ -79,6 +79,12 @@
<!-- Dockerfiles -->
<docker.maintainer>Debezium community</docker.maintainer>
<!-- Kafka -->
<version.kafka>0.9.0.0</version.kafka>
<version.kafka.scala>2.11</version.kafka.scala>
<version.scala>2.11.7</version.scala>
<version.curator>2.4.0</version.curator>
<version.zookeeper>3.4.6</version.zookeeper>
</properties>
<modules>
<module>support/checkstyle</module>
@ -158,6 +164,36 @@
<scope>test</scope>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${version.kafka.scala}</artifactId>
<version>${version.kafka}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${version.kafka}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${version.zookeeper}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${version.kafka.scala}</artifactId>
<version>${version.kafka}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>${version.curator}</version>
<scope>test</scope>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>