Merge pull request #10 from rhauch/dbz-10

DBZ-10 Added small utility so unit tests can run an embedded Kafka cluster in-process
Randall Hauch 2016-02-04 15:29:25 -06:00
commit 64187eb390
10 changed files with 1971 additions and 7 deletions
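
A minimal sketch of how a unit test can drive the new utility. The KafkaCluster, Usage, produceIntegers, and consumeIntegers names come from the files added below; the test class, topic name, and data directory are illustrative only:

    import java.io.File;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import org.junit.Test;
    import io.debezium.kafka.KafkaCluster;

    public class EmbeddedKafkaSketchTest {
        @Test
        public void shouldRoundTripMessagesThroughEmbeddedCluster() throws Exception {
            KafkaCluster cluster = new KafkaCluster()
                    .usingDirectory(new File("target/kafka-test"))   // any writable directory; a temp dir is used when omitted
                    .deleteDataUponShutdown(true)
                    .addBrokers(1)
                    .startup();
            try {
                cluster.createTopic("test-topic", 1, 1);
                CountDownLatch done = new CountDownLatch(2);
                // Write 10 integers asynchronously, then read them back within a timeout ...
                cluster.useTo().produceIntegers("test-topic", 10, 1, done::countDown);
                cluster.useTo().consumeIntegers("test-topic", 10, 10, TimeUnit.SECONDS, done::countDown, (key, value) -> true);
                done.await(30, TimeUnit.SECONDS);
            } finally {
                cluster.shutdown();
            }
        }
    }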

View File

@@ -46,6 +46,21 @@
<groupId>org.easytesting</groupId>
<artifactId>fest-assert</artifactId>
</dependency>
<!-- Used for unit testing with Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${version.kafka.scala}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>

View File

@@ -251,7 +251,7 @@ protected void parseNextStatement(Marker marker) {
*/
protected void parseComment(Marker marker) {
String comment = tokens.consume();
logger.debug("COMMENT: {}", comment);
logger.trace("COMMENT: {}", comment);
}
/**
@@ -304,16 +304,16 @@ protected void parseUnknownStatement(Marker marker) {
}
protected void debugParsed(Marker statementStart) {
if (logger.isDebugEnabled()) {
if (logger.isTraceEnabled()) {
String statement = removeLineFeeds(tokens.getContentFrom(statementStart));
logger.debug("PARSED: {}", statement);
logger.trace("PARSED: {}", statement);
}
}
protected void debugSkipped(Marker statementStart) {
if (logger.isDebugEnabled()) {
if (logger.isTraceEnabled()) {
String statement = removeLineFeeds(tokens.getContentFrom(statementStart));
logger.debug("SKIPPED: {}", statement);
logger.trace("SKIPPED: {}", statement);
}
}

View File

@@ -380,7 +380,7 @@ public static void delete(File... filesOrFolder) throws IOException {
public static void delete(Path path) throws IOException {
if (path != null) {
if (path.toAbsolutePath().toFile().exists()) {
LOGGER.info("Deleting '" + path + "'...");
LOGGER.debug("Deleting '" + path + "'...");
Set<FileVisitOption> options = EnumSet.noneOf(FileVisitOption.class);
int maxDepth = 10;
FileVisitor<Path> removingVisitor = new SimpleFileVisitor<Path>() {

View File

@@ -0,0 +1,1041 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.kafka;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.document.Document;
import io.debezium.document.DocumentSerdes;
import io.debezium.util.IoUtil;
/**
* An embeddable cluster of Kafka servers and a single Zookeeper server. This may be useful when creating a complete environment
* within a single process, but doing so offers limited durability and fault tolerance compared to the normal approach of
* using an external cluster of Kafka servers and Zookeeper servers with proper replication and fault tolerance.
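* <p>
* For example, a cluster can be configured fluently before startup. This is an illustrative sketch only; the port
* numbers below are typical defaults, and {@code -1} may be passed instead so that available ports are discovered:
* <pre>{@code
* KafkaCluster cluster = new KafkaCluster()
*         .withPorts(2181, 9092)                 // Zookeeper port, then the first Kafka broker port
*         .addBrokers(3)
*         .deleteDataPriorToStartup(true)
*         .deleteDataUponShutdown(true)
*         .startup();
* String brokers = cluster.brokerList();         // e.g. "localhost:9092,localhost:9093,localhost:9094"
* cluster.shutdown();
* }</pre>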
*
* @author Randall Hauch
*/
@ThreadSafe
public class KafkaCluster {
public static final boolean DEFAULT_DELETE_DATA_UPON_SHUTDOWN = true;
public static final boolean DEFAULT_DELETE_DATA_PRIOR_TO_STARTUP = false;
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCluster.class);
private final ConcurrentMap<Integer, KafkaServer> kafkaServers = new ConcurrentHashMap<>();
private final ZookeeperServer zkServer = new ZookeeperServer();
private volatile File dataDir = null;
private volatile boolean deleteDataUponShutdown = DEFAULT_DELETE_DATA_UPON_SHUTDOWN;
private volatile boolean deleteDataPriorToStartup = DEFAULT_DELETE_DATA_PRIOR_TO_STARTUP;
private volatile boolean running = false;
private volatile Properties kafkaConfig = null;
private volatile int startingKafkaPort = -1;
private final AtomicLong nextKafkaPort = new AtomicLong(startingKafkaPort);
/**
* Create a new embedded cluster.
*/
public KafkaCluster() {
}
/**
* Specify whether the data is to be deleted upon {@link #shutdown()}.
*
* @param delete true if the data is to be deleted upon shutdown, or false otherwise
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the cluster is running
*/
public KafkaCluster deleteDataUponShutdown(boolean delete) {
if (running) throw new IllegalStateException("Unable to change cluster settings when running");
this.deleteDataUponShutdown = delete;
return this;
}
/**
* Specify whether the data is to be deleted prior to {@link #startup()}.
*
* @param delete true if the data is to be deleted prior to startup, or false otherwise
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the cluster is running
*/
public KafkaCluster deleteDataPriorToStartup(boolean delete) {
if (running) throw new IllegalStateException("Unable to change cluster settings when running");
this.deleteDataPriorToStartup = delete;
return this;
}
/**
* Add a number of new Kafka brokers to the cluster. The broker IDs will be generated.
*
* @param count the number of new brokers to add
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the cluster is running
*/
public KafkaCluster addBrokers(int count) {
if (running) throw new IllegalStateException("Unable to add a broker when the cluster is already running");
AtomicLong added = new AtomicLong();
while (added.intValue() < count) {
kafkaServers.computeIfAbsent(new Integer(added.intValue() + 1), id -> {
added.incrementAndGet();
KafkaServer server = new KafkaServer(zkServer::getConnection, id);
if (dataDir != null) server.setStateDirectory(dataDir);
if (kafkaConfig != null) server.setProperties(kafkaConfig);
if (startingKafkaPort >= 0) server.setPort((int) this.nextKafkaPort.getAndIncrement());
return server;
});
}
return this;
}
/**
* Set the parent directory where the brokers' logs and the Zookeeper server's logs and snapshots will be kept.
*
* @param dataDir the parent directory for the server's logs and snapshots; may be null if a temporary directory will be used
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the cluster is running
* @throws IllegalArgumentException if the supplied file is not a directory or not writable
*/
public KafkaCluster usingDirectory(File dataDir) {
if (running) throw new IllegalStateException("Unable to add a broker when the cluster is already running");
if (dataDir != null && dataDir.exists() && !dataDir.isDirectory() && !dataDir.canWrite() && !dataDir.canRead()) {
throw new IllegalArgumentException("The directory must be readable and writable");
}
this.dataDir = dataDir;
return this;
}
/**
* Set the configuration properties for each of the brokers. This method does nothing if the supplied properties are null or
* empty.
*
* @param properties the Kafka configuration properties
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the cluster is running
*/
public KafkaCluster withKafkaConfiguration(Properties properties) {
if (running) throw new IllegalStateException("Unable to add a broker when the cluster is already running");
if (properties != null && !properties.isEmpty()) {
kafkaConfig = new Properties(properties);
kafkaServers.values().forEach(kafka -> kafka.setProperties(kafkaConfig));
}
return this;
}
/**
* Set the port numbers for Zookeeper and the Kafka brokers.
*
* @param zkPort the port number that Zookeeper should use; may be -1 if an available port should be discovered
* @param firstKafkaPort the port number for the first Kafka broker (additional brokers will use subsequent port numbers);
* may be -1 if available ports should be discovered
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the cluster is running
*/
public KafkaCluster withPorts(int zkPort, int firstKafkaPort) {
if (running) throw new IllegalStateException("Unable to add a broker when the cluster is already running");
this.zkServer.setPort(zkPort);
this.startingKafkaPort = firstKafkaPort;
if (this.startingKafkaPort >= 0) {
this.nextKafkaPort.set(this.startingKafkaPort);
kafkaServers.values().forEach(kafka -> kafka.setPort((int) this.nextKafkaPort.getAndIncrement()));
}
return this;
}
/**
* Determine if the cluster is running.
*
* @return true if the cluster is running, or false otherwise
*/
public boolean isRunning() {
return running;
}
/**
* Start the embedded Zookeeper server and the Kafka servers {@link #addBrokers(int) in the cluster}.
* This method does nothing if the cluster is already running.
*
* @return this instance to allow chaining methods; never null
* @throws IOException if there is an error during startup
*/
public synchronized KafkaCluster startup() throws IOException {
if (!running) {
if (dataDir == null) {
try {
File temp = File.createTempFile("kafka", "suffix");
dataDir = new File(temp.getParentFile(), "cluster");
dataDir.mkdirs();
temp.delete();
} catch (IOException e) {
throw new RuntimeException("Unable to create temporary directory", e);
}
} else if (deleteDataPriorToStartup) {
IoUtil.delete(dataDir);
dataDir.mkdirs();
}
File zkDir = new File(dataDir, "zk");
zkServer.setStateDirectory(zkDir); // does error checking
this.dataDir = dataDir;
File kafkaDir = new File(dataDir, "kafka");
kafkaServers.values().forEach(server -> server.setStateDirectory(new File(kafkaDir, "broker" + server.brokerId())));
zkServer.startup();
LOGGER.debug("Starting {} brokers", kafkaServers.size());
kafkaServers.values().forEach(KafkaServer::startup);
running = true;
}
return this;
}
/**
* Shutdown the embedded Zookeeper server and the Kafka servers {@link #addBrokers(int) in the cluster}.
* This method does nothing if the cluster is not running.
*
* @return this instance to allow chaining methods; never null
*/
public synchronized KafkaCluster shutdown() {
if (running) {
try {
kafkaServers.values().forEach(this::shutdownReliably);
} finally {
try {
zkServer.shutdown(deleteDataUponShutdown);
} catch (Throwable t) {
LOGGER.error("Error while shutting down {}", zkServer, t);
} finally {
if (deleteDataUponShutdown) {
try {
kafkaServers.values().forEach(KafkaServer::deleteData);
} finally {
try {
IoUtil.delete(this.dataDir);
} catch (IOException e) {
LOGGER.error("Error while deleting cluster data", e);
}
}
}
running = false;
}
}
}
return this;
}
/**
* Create the specified topics.
*
* @param topics the names of the topics to create
* @throws IllegalStateException if the cluster is not running
*/
public void createTopics(String... topics) {
LOGGER.debug("Creating topics: {}", Arrays.toString(topics));
if (!running) throw new IllegalStateException("The cluster must be running to create topics");
kafkaServers.values().stream().findFirst().ifPresent(server -> server.createTopics(topics));
}
/**
* Create the specified topics.
*
* @param topics the names of the topics to create
* @throws IllegalStateException if the cluster is not running
*/
public void createTopics(Set<String> topics) {
createTopics(topics.toArray(new String[topics.size()]));
}
/**
* Create the specified topics.
*
* @param numPartitions the number of partitions for each topic
* @param replicationFactor the replication factor for each topic
* @param topics the names of the topics to create
*/
public void createTopics(int numPartitions, int replicationFactor, String... topics) {
LOGGER.debug("Creating topics with {} partitions and {} replicas each: {}", numPartitions, replicationFactor,
Arrays.toString(topics));
if (!running) throw new IllegalStateException("The cluster must be running to create topics");
kafkaServers.values().stream().findFirst().ifPresent(server -> server.createTopics(numPartitions, replicationFactor, topics));
}
/**
* Create the specified topics.
*
* @param numPartitions the number of partitions for each topic
* @param replicationFactor the replication factor for each topic
* @param topics the names of the topics to create
*/
public void createTopics(int numPartitions, int replicationFactor, Set<String> topics) {
createTopics(numPartitions, replicationFactor, topics.toArray(new String[topics.size()]));
}
/**
* Create the specified topic.
*
* @param topic the name of the topic to create
* @param numPartitions the number of partitions for the topic
* @param replicationFactor the replication factor for the topic
*/
public void createTopic(String topic, int numPartitions, int replicationFactor) {
LOGGER.debug("Creating topic '{}' with {} partitions and {} replicas", topic, numPartitions, replicationFactor);
if (!running) throw new IllegalStateException("The cluster must be running to create topics");
kafkaServers.values().stream().findFirst().ifPresent(server -> server.createTopic(topic, numPartitions, replicationFactor));
}
/**
* Perform the supplied function on each directory used by this cluster.
*
* @param consumer the consumer function; may not be null
*/
void onEachDirectory(java.util.function.Consumer<File> consumer) {
consumer.accept(zkServer.getSnapshotDirectory());
consumer.accept(zkServer.getLogDirectory());
kafkaServers.values().forEach(server -> consumer.accept(server.getStateDirectory()));
}
/**
* Get the list of brokers.
*
* @return the broker list
*/
public String brokerList() {
StringJoiner joiner = new StringJoiner(",");
kafkaServers.values().forEach(server -> {
joiner.add(server.getConnection());
});
return joiner.toString();
}
private void shutdownReliably(KafkaServer server) {
try {
server.shutdown();
} catch (Throwable t) {
LOGGER.error("Error while shutting down {}", server, t);
}
}
/**
* Obtain the interface for using this cluster.
*
* @return the usage interface; never null
* @throws IllegalStateException if the cluster is not running
*/
public Usage useTo() {
if (!running) throw new IllegalStateException("Unable to use the cluster it is not running");
return new Usage();
}
/**
* A simple interactive Kafka producer for use with the cluster.
*
* @param <K> the type of key
* @param <V> the type of value
*/
public static interface InteractiveProducer<K, V> extends Closeable {
/**
* Write to the topic with the given name a record with the specified key and value. The message is flushed immediately.
*
* @param topic the name of the topic; may not be null
* @param key the key; may not be null
* @param value the value; may not be null
* @return this producer instance to allow chaining methods together
*/
default InteractiveProducer<K, V> write(String topic, K key, V value) {
return write(new ProducerRecord<>(topic, key, value));
}
/**
* Write the specified record to the topic with the given name. The message is flushed immediately.
*
* @param record the record; may not be null
* @return this producer instance to allow chaining methods together
*/
InteractiveProducer<K, V> write(ProducerRecord<K, V> record);
/**
* Close this producer's connection to Kafka and clean up all resources.
*/
@Override
public void close();
}
/**
* A simple interactive Kafka consumer for use with the cluster.
*
* @param <K> the type of key
* @param <V> the type of value
*/
public static interface InteractiveConsumer<K, V> extends Closeable {
/**
* Block until a record can be read from this consumer's topic, and return the value in that record.
*
* @return the value; never null
* @throws InterruptedException if the thread is interrupted while blocking
*/
default V nextValue() throws InterruptedException {
return nextRecord().value();
}
/**
* Block until a record can be read from this consumer's topic, and return the record.
*
* @return the record; never null
* @throws InterruptedException if the thread is interrupted while blocking
*/
ConsumerRecord<K, V> nextRecord() throws InterruptedException;
/**
* Block until a record can be read from this consumer's topic or until the timeout occurs, and if a record was read
* return the value in that record.
*
* @param timeout the maximum amount of time to block to wait for a record
* @param unit the unit of time for the {@code timeout}
* @return the value, or null if the method timed out
* @throws InterruptedException if the thread is interrupted while blocking
*/
default V nextValue(long timeout, TimeUnit unit) throws InterruptedException {
ConsumerRecord<K, V> record = nextRecord(timeout, unit);
return record != null ? record.value() : null;
}
/**
* Block until a record can be read from this consumer's topic or until the timeout occurs, and if a record was read
* return the record.
*
* @param timeout the maximum amount of time to block to wait for a record
* @param unit the unit of time for the {@code timeout}
* @return the record, or null if the method timed out
* @throws InterruptedException if the thread is interrupted while blocking
*/
ConsumerRecord<K, V> nextRecord(long timeout, TimeUnit unit) throws InterruptedException;
/**
* Obtain a stream to consume the input messages. This method can be used in place of repeated calls to the
* {@code next...()} methods.
*
* @return the stream of all messages.
*/
Stream<ConsumerRecord<K, V>> stream();
/**
* Obtain a stream over all of the accumulated input messages. This method can be called multiple times, and each time
* the resulting stream will operate over <em>all messages</em> received by this consumer, and is completely
* independent of the {@link #stream()}, {@link #nextRecord()}, {@link #nextRecord(long, TimeUnit)}, {@link #nextValue()}
* and {@link #nextValue(long, TimeUnit)} methods.
*
* @return the stream of all messages.
*/
Stream<ConsumerRecord<K, V>> streamAll();
/**
* Asynchronously close this consumer's connection to Kafka and begin to clean up all resources.
*/
@Override
public void close();
}
/**
* A set of methods to use a running KafkaCluster.
*/
public class Usage {
/**
* Get a new set of properties for consumers that want to talk to this server.
*
* @param groupId the group ID for the consumer; may not be null
* @param clientId the optional identifier for the client; may be null if not needed
* @param autoOffsetReset how to pick a starting offset when there is no initial offset in ZooKeeper or if an offset is
* out of range; may be null for the default to be used
* @return the mutable consumer properties
* @see #getProducerProperties(String)
*/
public Properties getConsumerProperties(String groupId, String clientId, OffsetResetStrategy autoOffsetReset) {
if (groupId == null) throw new IllegalArgumentException("The groupId is required");
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
if (autoOffsetReset != null) {
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset.toString().toLowerCase());
}
if (clientId != null) props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
return props;
}
/**
* Get a new set of properties for producers that want to talk to this server.
*
* @param clientId the optional identifier for the client; may be null if not needed
* @return the mutable producer properties
* @see #getConsumerProperties(String, String, OffsetResetStrategy)
*/
public Properties getProducerProperties(String clientId) {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());
props.setProperty(ProducerConfig.ACKS_CONFIG, Integer.toString(1));
if (clientId != null) props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, clientId);
return props;
}
/**
* Create an {@link InteractiveProducer simple producer} that can be used to write messages to the cluster.
*
* @param producerName the name of the producer; may not be null
* @param keySerializer the serializer for the keys; may not be null
* @param valueSerializer the serializer for the values; may not be null
* @return the object that can be used to produce messages; never null
*/
public <K, V> InteractiveProducer<K, V> createProducer(String producerName, Serializer<K> keySerializer,
Serializer<V> valueSerializer) {
Properties props = getProducerProperties(producerName);
KafkaProducer<K, V> producer = new KafkaProducer<>(props, keySerializer, valueSerializer);
return new InteractiveProducer<K, V>() {
@Override
public InteractiveProducer<K, V> write(ProducerRecord<K, V> record) {
producer.send(record);
producer.flush();
return this;
}
@Override
public void close() {
producer.close();
}
};
}
/**
* Create an {@link InteractiveProducer simple producer} that can be used to write {@link Document} messages to the
* cluster.
*
* @param producerName the name of the producer; may not be null
* @return the object that can be used to produce messages; never null
*/
public InteractiveProducer<String, Document> createProducer(String producerName) {
return createProducer(producerName, new StringSerializer(), new DocumentSerdes());
}
/**
* Create an {@link InteractiveConsumer simple consumer} that can be used to read messages from the cluster.
*
* @param groupId the name of the group; may not be null
* @param clientId the name of the client; may not be null
* @param topicName the name of the topic to read; may not be null and may not be empty
* @param keyDeserializer the deserializer for the keys; may not be null
* @param valueDeserializer the deserializer for the values; may not be null
* @param completion the function to call when the consumer terminates; may be null
* @return the running interactive consumer; never null
*/
public <K, V> InteractiveConsumer<K, V> createConsumer(String groupId, String clientId, String topicName,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer, Runnable completion) {
Set<String> topicNames = Collections.singleton(topicName);
return createConsumer(groupId, clientId, topicNames, keyDeserializer, valueDeserializer, completion);
}
/**
* Create an {@link InteractiveConsumer simple consumer} that can be used to read messages from the cluster.
*
* @param groupId the name of the group; may not be null
* @param clientId the name of the client; may not be null
* @param topicNames the names of the topics to read; may not be null and may not be empty
* @param keyDeserializer the deserializer for the keys; may not be null
* @param valueDeserializer the deserializer for the values; may not be null
* @param completion the function to call when the consumer terminates; may be null
* @return the running interactive consumer; never null
*/
public <K, V> InteractiveConsumer<K, V> createConsumer(String groupId, String clientId, Set<String> topicNames,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer, Runnable completion) {
BlockingQueue<ConsumerRecord<K, V>> consumed = new LinkedBlockingQueue<>();
List<ConsumerRecord<K, V>> allMessages = new LinkedList<>();
AtomicBoolean keepReading = new AtomicBoolean();
consume(groupId, clientId, OffsetResetStrategy.EARLIEST, keyDeserializer, valueDeserializer, () -> keepReading.get(),
completion, topicNames, record -> {
consumed.add(record);
allMessages.add(record);
});
return new InteractiveConsumer<K, V>() {
@Override
public ConsumerRecord<K, V> nextRecord() throws InterruptedException {
return consumed.take();
}
@Override
public ConsumerRecord<K, V> nextRecord(long timeout, TimeUnit unit) throws InterruptedException {
return consumed.poll(timeout, unit);
}
@Override
public void close() {
keepReading.set(false);
}
@Override
public Stream<ConsumerRecord<K, V>> stream() {
return consumed.stream();
}
@Override
public Stream<ConsumerRecord<K, V>> streamAll() {
return allMessages.stream();
}
};
}
/**
* Create an {@link InteractiveConsumer simple consumer} that can be used to read messages from the cluster.
*
* @param groupId the name of the group; may not be null
* @param clientId the name of the client; may not be null
* @param topicName the name of the topic to read; may not be null and may not be empty
* @param completion the function to call when the consumer terminates; may be null
* @return the running interactive consumer; never null
*/
public InteractiveConsumer<String, Document> createConsumer(String groupId, String clientId, String topicName,
Runnable completion) {
Set<String> topicNames = Collections.singleton(topicName);
return createConsumer(groupId, clientId, topicNames, new StringDeserializer(), new DocumentSerdes(), completion);
}
/**
* Create an {@link InteractiveConsumer simple consumer} that can be used to read messages from the cluster.
*
* @param groupId the name of the group; may not be null
* @param clientId the name of the client; may not be null
* @param topicNames the names of the topics to read; may not be null and may not be empty
* @param completion the function to call when the consumer terminates; may be null
* @return the running interactive consumer; never null
*/
public InteractiveConsumer<String, Document> createConsumer(String groupId, String clientId, Set<String> topicNames,
Runnable completion) {
return createConsumer(groupId, clientId, topicNames, new StringDeserializer(), new DocumentSerdes(), completion);
}
/**
* Use the supplied function to asynchronously produce {@link Document} messages and write them to the cluster.
*
* @param producerName the name of the producer; may not be null
* @param producer the function that will asynchronously write messages using the supplied producer; may not be null
*/
public <K, V> void produce(String producerName, Consumer<InteractiveProducer<String, Document>> producer) {
produce(producerName, new StringSerializer(), new DocumentSerdes(), producer);
}
/**
* Use the supplied function to asynchronously produce messages and write them to the cluster.
*
* @param producerName the name of the producer; may not be null
* @param keySerializer the serializer for the keys; may not be null
* @param valueSerializer the serializer for the values; may not be null
* @param producer the function that will asynchronously write messages using the supplied producer; may not be null
*/
public <K, V> void produce(String producerName, Serializer<K> keySerializer, Serializer<V> valueSerializer,
Consumer<InteractiveProducer<K, V>> producer) {
Properties props = getProducerProperties(producerName);
KafkaProducer<K, V> kafkaProducer = new KafkaProducer<>(props, keySerializer, valueSerializer);
InteractiveProducer<K, V> interactive = new InteractiveProducer<K, V>() {
@Override
public InteractiveProducer<K, V> write(ProducerRecord<K, V> record) {
kafkaProducer.send(record);
kafkaProducer.flush();
return this;
}
@Override
public void close() {
kafkaProducer.close();
}
};
Thread t = new Thread(() -> {
try {
producer.accept(interactive);
} finally {
interactive.close();
}
});
t.setName(producerName + "-thread");
t.start();
}
/**
* Use the supplied function to asynchronously produce messages and write them to the cluster.
*
* @param producerName the name of the producer; may not be null
* @param messageCount the number of messages to produce; must be positive
* @param keySerializer the serializer for the keys; may not be null
* @param valueSerializer the serializer for the values; may not be null
* @param completionCallback the function to be called when the producer is completed; may be null
* @param messageSupplier the function to produce messages; may not be null
*/
public <K, V> void produce(String producerName, int messageCount,
Serializer<K> keySerializer, Serializer<V> valueSerializer,
Runnable completionCallback,
Supplier<ProducerRecord<K, V>> messageSupplier) {
Properties props = getProducerProperties(producerName);
Thread t = new Thread(() -> {
LOGGER.debug("Starting producer {} to write {} messages", producerName, messageCount);
try (KafkaProducer<K, V> producer = new KafkaProducer<>(props, keySerializer, valueSerializer)) {
for (int i = 0; i != messageCount; ++i) {
ProducerRecord<K, V> record = messageSupplier.get();
producer.send(record);
producer.flush();
LOGGER.debug("Producer {}: sent message {}", producerName, record);
}
} finally {
if (completionCallback != null) completionCallback.run();
LOGGER.debug("Stopping producer {}", producerName);
}
});
t.setName(producerName + "-thread");
t.start();
}
/**
* Use the supplied function to asynchronously produce messages with String keys and values, and write them to the
* cluster.
*
* @param messageCount the number of messages to produce; must be positive
* @param completionCallback the function to be called when the producer is completed; may be null
* @param messageSupplier the function to produce messages; may not be null
*/
public void produceStrings(int messageCount,
Runnable completionCallback, Supplier<ProducerRecord<String, String>> messageSupplier) {
Serializer<String> keySer = new StringSerializer();
Serializer<String> valSer = keySer;
String randomId = UUID.randomUUID().toString();
produce(randomId, messageCount, keySer, valSer, completionCallback, messageSupplier);
}
/**
* Use the supplied function to asynchronously produce messages with String keys and {@link Document} values, and write
* them to the cluster.
*
* @param messageCount the number of messages to produce; must be positive
* @param completionCallback the function to be called when the producer is completed; may be null
* @param messageSupplier the function to produce messages; may not be null
*/
public void produceDocuments(int messageCount,
Runnable completionCallback, Supplier<ProducerRecord<String, Document>> messageSupplier) {
Serializer<String> keySer = new StringSerializer();
Serializer<Document> valSer = new DocumentSerdes();
String randomId = UUID.randomUUID().toString();
produce(randomId, messageCount, keySer, valSer, completionCallback, messageSupplier);
}
/**
* Use the supplied function to asynchronously produce messages with String keys and Integer values, and write them to the
* cluster.
*
* @param messageCount the number of messages to produce; must be positive
* @param completionCallback the function to be called when the producer is completed; may be null
* @param messageSupplier the function to produce messages; may not be null
*/
public void produceIntegers(int messageCount,
Runnable completionCallback, Supplier<ProducerRecord<String, Integer>> messageSupplier) {
Serializer<String> keySer = new StringSerializer();
Serializer<Integer> valSer = new IntegerSerializer();
String randomId = UUID.randomUUID().toString();
produce(randomId, messageCount, keySer, valSer, completionCallback, messageSupplier);
}
/**
* Asynchronously produce messages with String keys and sequential Integer values, and write them to the cluster.
*
* @param topic the name of the topic to which the messages should be written; may not be null
* @param messageCount the number of messages to produce; must be positive
* @param initialValue the first integer value to produce
* @param completionCallback the function to be called when the producer is completed; may be null
*/
public void produceIntegers(String topic, int messageCount, int initialValue,
Runnable completionCallback) {
AtomicLong counter = new AtomicLong(initialValue);
produceIntegers(messageCount, completionCallback, () -> {
long i = counter.incrementAndGet();
String keyAndValue = Long.toString(i);
return new ProducerRecord<String, Integer>(topic, keyAndValue, new Integer((int) i));
});
}
/**
* Asynchronously produce messages with monotonically increasing String keys and values obtained from the supplied
* function, and write them to the cluster.
*
* @param topic the name of the topic to which the messages should be written; may not be null
* @param messageCount the number of messages to produce; must be positive
* @param completionCallback the function to be called when the producer is completed; may be null
* @param valueSupplier the value supplier; may not be null
*/
public void produceStrings(String topic, int messageCount,
Runnable completionCallback, Supplier<String> valueSupplier) {
AtomicLong counter = new AtomicLong(0);
produceStrings(messageCount, completionCallback, () -> {
long i = counter.incrementAndGet();
String keyAndValue = Long.toString(i);
return new ProducerRecord<String, String>(topic, keyAndValue, valueSupplier.get());
});
}
/**
* Asynchronously produce messages with monotonically increasing String keys and values obtained from the supplied
* function, and write them to the cluster.
*
* @param topic the name of the topic to which the messages should be written; may not be null
* @param messageCount the number of messages to produce; must be positive
* @param completionCallback the function to be called when the producer is completed; may be null
* @param valueSupplier the value supplier; may not be null
*/
public void produceDocuments(String topic, int messageCount,
Runnable completionCallback, Supplier<Document> valueSupplier) {
AtomicLong counter = new AtomicLong(0);
produceDocuments(messageCount, completionCallback, () -> {
long i = counter.incrementAndGet();
String keyAndValue = Long.toString(i);
return new ProducerRecord<String, Document>(topic, keyAndValue, valueSupplier.get());
});
}
/**
* Use the supplied function to asynchronously consume messages from the cluster.
*
* @param groupId the name of the group; may not be null
* @param clientId the name of the client; may not be null
* @param autoOffsetReset how to pick a starting offset when there is no initial offset in ZooKeeper or if an offset is
* out of range; may be null for the default to be used
* @param keyDeserializer the deserializer for the keys; may not be null
* @param valueDeserializer the deserializer for the values; may not be null
* @param continuation the function that determines if the consumer should continue; may not be null
* @param completion the function to call when the consumer terminates; may be null
* @param topics the set of topics to consume; may not be null or empty
* @param consumerFunction the function to consume the messages; may not be null
*/
public <K, V> void consume(String groupId, String clientId, OffsetResetStrategy autoOffsetReset,
Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer,
BooleanSupplier continuation, Runnable completion, Collection<String> topics,
java.util.function.Consumer<ConsumerRecord<K, V>> consumerFunction) {
Properties props = getConsumerProperties(groupId, clientId, autoOffsetReset);
Thread t = new Thread(() -> {
LOGGER.debug("Starting consumer {} to read messages", clientId);
try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(props, keyDeserializer, valueDeserializer)) {
consumer.subscribe(new ArrayList<>(topics));
while (continuation.getAsBoolean()) {
consumer.poll(10).forEach(record -> {
LOGGER.debug("Consumer {}: consuming message {}", clientId, record);
consumerFunction.accept(record);
consumer.commitAsync();
});
}
} finally {
if (completion != null) completion.run();
LOGGER.debug("Stopping consumer {}", clientId);
}
});
t.setName(clientId + "-thread");
t.start();
}
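// Illustrative use of the general-purpose consume(...) method above; 'keepRunning' (an AtomicBoolean) and
// 'latch' (a CountDownLatch) are placeholder names for state owned by the calling test:
//
// cluster.useTo().consume("group-1", "client-1", OffsetResetStrategy.EARLIEST,
//         new StringDeserializer(), new StringDeserializer(),
//         keepRunning::get, latch::countDown,
//         Collections.singleton("topic-a"),
//         record -> LOGGER.debug("read {}", record.value()));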
/**
* Asynchronously consume all messages from the cluster.
*
* @param continuation the function that determines if the consumer should continue; may not be null
* @param completion the function to call when all messages have been consumed; may be null
* @param topics the set of topics to consume; may not be null or empty
* @param consumerFunction the function to consume the messages; may not be null
*/
public void consumeDocuments(BooleanSupplier continuation, Runnable completion, Collection<String> topics,
java.util.function.Consumer<ConsumerRecord<String, Document>> consumerFunction) {
Deserializer<String> keyDes = new StringDeserializer();
Deserializer<Document> valDes = new DocumentSerdes();
String randomId = UUID.randomUUID().toString();
consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, completion, topics, consumerFunction);
}
/**
* Asynchronously consume all messages from the cluster.
*
* @param continuation the function that determines if the consumer should continue; may not be null
* @param completion the function to call when all messages have been consumed; may be null
* @param topics the set of topics to consume; may not be null or empty
* @param consumerFunction the function to consume the messages; may not be null
*/
public void consumeStrings(BooleanSupplier continuation, Runnable completion, Collection<String> topics,
java.util.function.Consumer<ConsumerRecord<String, String>> consumerFunction) {
Deserializer<String> keyDes = new StringDeserializer();
Deserializer<String> valDes = keyDes;
String randomId = UUID.randomUUID().toString();
consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, completion, topics, consumerFunction);
}
/**
* Asynchronously consume all messages from the cluster.
*
* @param continuation the function that determines if the consumer should continue; may not be null
* @param completion the function to call when all messages have been consumed; may be null
* @param topics the set of topics to consume; may not be null or empty
* @param consumerFunction the function to consume the messages; may not be null
*/
public void consumeIntegers(BooleanSupplier continuation, Runnable completion, Collection<String> topics,
java.util.function.Consumer<ConsumerRecord<String, Integer>> consumerFunction) {
Deserializer<String> keyDes = new StringDeserializer();
Deserializer<Integer> valDes = new IntegerDeserializer();
String randomId = UUID.randomUUID().toString();
consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, completion, topics, consumerFunction);
}
/**
* Asynchronously consume all messages on the given topic from the cluster.
*
* @param topicName the name of the topic; may not be null
* @param count the expected number of messages to read before terminating; must be positive
* @param timeout the maximum time that this consumer should run before terminating; must be positive
* @param unit the unit of time for the timeout; may not be null
* @param completion the function to call when all messages have been consumed; may be null
* @param consumer the function to consume the messages; may not be null
*/
public void consumeStrings(String topicName, int count, long timeout, TimeUnit unit, Runnable completion,
BiPredicate<String, String> consumer) {
AtomicLong readCounter = new AtomicLong();
consumeStrings(continueIfNotExpired(() -> readCounter.get() < count, timeout, unit),
completion,
Collections.singleton(topicName),
record -> {
if (consumer.test(record.key(), record.value())) readCounter.incrementAndGet();
});
}
/**
* Asynchronously consume all messages on the given topic from the cluster.
*
* @param topicName the name of the topic; may not be null
* @param count the expected number of messages to read before terminating; must be positive
* @param timeout the maximum time that this consumer should run before terminating; must be positive
* @param unit the unit of time for the timeout; may not be null
* @param completion the function to call when all messages have been consumed; may be null
* @param consumer the function to consume the messages; may not be null
*/
public void consumeDocuments(String topicName, int count, long timeout, TimeUnit unit, Runnable completion,
BiPredicate<String, Document> consumer) {
AtomicLong readCounter = new AtomicLong();
consumeDocuments(continueIfNotExpired(() -> readCounter.get() < count, timeout, unit),
completion,
Collections.singleton(topicName),
record -> {
if (consumer.test(record.key(), record.value())) readCounter.incrementAndGet();
});
}
/**
* Asynchronously consume all messages on the given topic from the cluster.
*
* @param topicName the name of the topic; may not be null
* @param count the expected number of messages to read before terminating; must be positive
* @param timeout the maximum time that this consumer should run before terminating; must be positive
* @param unit the unit of time for the timeout; may not be null
* @param completion the function to call when all messages have been consumed; may be null
* @param consumer the function to consume the messages; may not be null
*/
public void consumeIntegers(String topicName, int count, long timeout, TimeUnit unit, Runnable completion,
BiPredicate<String, Integer> consumer) {
AtomicLong readCounter = new AtomicLong();
consumeIntegers(continueIfNotExpired(() -> readCounter.get() < count, timeout, unit),
completion,
Collections.singleton(topicName),
record -> {
if (consumer.test(record.key(), record.value())) readCounter.incrementAndGet();
});
}
/**
* Asynchronously consume all messages on the given topic from the cluster.
*
* @param topicName the name of the topic; may not be null
* @param count the expected number of messages to read before terminating; must be positive
* @param timeout the maximum time that this consumer should run before terminating; must be positive
* @param unit the unit of time for the timeout; may not be null
* @param completion the function to call when all messages have been consumed; may be null
*/
public void consumeStrings(String topicName, int count, long timeout, TimeUnit unit, Runnable completion) {
consumeStrings(topicName, count, timeout, unit, completion, (key, value) -> true);
}
/**
* Asynchronously consume all messages on the given topic from the cluster.
*
* @param topicName the name of the topic; may not be null
* @param count the expected number of messages to read before terminating; must be positive
* @param timeout the maximum time that this consumer should run before terminating; must be positive
* @param unit the unit of time for the timeout; may not be null
* @param completion the function to call when all messages have been consumed; may be null
*/
public void consumeDocuments(String topicName, int count, long timeout, TimeUnit unit, Runnable completion) {
consumeDocuments(topicName, count, timeout, unit, completion, (key, value) -> true);
}
/**
* Asynchronously consume all messages on the given topic from the cluster.
*
* @param topicName the name of the topic; may not be null
* @param count the expected number of messages to read before terminating; must be positive
* @param timeout the maximum time that this consumer should run before terminating; must be positive
* @param unit the unit of time for the timeout; may not be null
* @param completion the function to call when all messages have been consumed; may be null
*/
public void consumeIntegers(String topicName, int count, long timeout, TimeUnit unit, Runnable completion) {
consumeIntegers(topicName, count, timeout, unit, completion, (key, value) -> true);
}
protected BooleanSupplier continueIfNotExpired(BooleanSupplier continuation, long timeout, TimeUnit unit) {
return new BooleanSupplier() {
long stopTime = 0L;
@Override
public boolean getAsBoolean() {
if (stopTime == 0L) stopTime = System.currentTimeMillis() + unit.toMillis(timeout);
return continuation.getAsBoolean() && System.currentTimeMillis() <= stopTime;
}
};
}
}
}

View File

@@ -0,0 +1,196 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.kafka;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.util.Stopwatch;
import io.debezium.util.Testing;
/**
* @author Randall Hauch
*/
public class KafkaClusterTest {
private KafkaCluster cluster;
private File dataDir;
@Before
public void beforeEach() {
dataDir = Testing.Files.createTestingDirectory("cluster");
Testing.Files.delete(dataDir);
cluster = new KafkaCluster().usingDirectory(dataDir);
}
@After
public void afterEach() {
cluster.shutdown();
Testing.Files.delete(dataDir);
}
@Test
public void shouldStartClusterWithOneBrokerAndRemoveData() throws Exception {
cluster.deleteDataUponShutdown(true).addBrokers(1).startup();
cluster.onEachDirectory(this::assertValidDataDirectory);
cluster.shutdown();
cluster.onEachDirectory(this::assertDoesNotExist);
}
@Test
public void shouldStartClusterWithMultipleBrokerAndRemoveData() throws Exception {
cluster.deleteDataUponShutdown(true).addBrokers(3).startup();
cluster.onEachDirectory(this::assertValidDataDirectory);
cluster.shutdown();
cluster.onEachDirectory(this::assertDoesNotExist);
}
@Test
public void shouldStartClusterWithOneBrokerAndLeaveData() throws Exception {
cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
cluster.onEachDirectory(this::assertValidDataDirectory);
cluster.shutdown();
cluster.onEachDirectory(this::assertValidDataDirectory);
}
@Test
public void shouldStartClusterWithMultipleBrokerAndLeaveData() throws Exception {
cluster.deleteDataUponShutdown(false).addBrokers(3).startup();
cluster.onEachDirectory(this::assertValidDataDirectory);
cluster.shutdown();
cluster.onEachDirectory(this::assertValidDataDirectory);
}
@Test
public void shouldStartClusterAndAllowProducersAndConsumersToUseIt() throws Exception {
Testing.Debug.enable();
final String topicName = "topicA";
final CountDownLatch completion = new CountDownLatch(2);
final int numMessages = 100;
final AtomicLong messagesRead = new AtomicLong(0);
// Start a cluster and create a topic ...
cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
cluster.createTopics(topicName);
// Consume messages asynchronously ...
Stopwatch sw = Stopwatch.reusable().start();
cluster.useTo().consumeIntegers(topicName, numMessages, 10, TimeUnit.SECONDS, completion::countDown, (key, value) -> {
messagesRead.incrementAndGet();
return true;
});
// Produce some messages asynchronously ...
cluster.useTo().produceIntegers(topicName, numMessages, 1, completion::countDown);
// Wait for both to complete ...
if (completion.await(10, TimeUnit.SECONDS)) {
sw.stop();
Testing.debug("Both consumer and producer completed normally in " + sw.durations());
} else {
Testing.debug("Consumer and/or producer did not completed normally");
}
assertThat(messagesRead.get()).isEqualTo(numMessages);
}
@Test
public void shouldStartClusterAndAllowInteractiveProductionAndAutomaticConsumersToUseIt() throws Exception {
Testing.Debug.enable();
final String topicName = "topicA";
final CountDownLatch completion = new CountDownLatch(1);
final int numMessages = 3;
final AtomicLong messagesRead = new AtomicLong(0);
// Start a cluster and create a topic ...
cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
cluster.createTopics(topicName);
// Consume messages asynchronously ...
Stopwatch sw = Stopwatch.reusable().start();
cluster.useTo().consumeIntegers(topicName, numMessages, 10, TimeUnit.SECONDS, completion::countDown, (key, value) -> {
messagesRead.incrementAndGet();
return true;
});
// Produce some messages interactively ...
cluster.useTo()
.createProducer("manual", new StringSerializer(), new IntegerSerializer())
.write(topicName, "key1", 1)
.write(topicName, "key2", 2)
.write(topicName, "key3", 3)
.close();
// Wait for the consumer to complete ...
if (completion.await(10, TimeUnit.SECONDS)) {
sw.stop();
Testing.debug("The consumer completed normally in " + sw.durations());
} else {
Testing.debug("Consumer did not completed normally");
}
assertThat(messagesRead.get()).isEqualTo(numMessages);
}
@Test
public void shouldStartClusterAndAllowAsynchronousProductionAndAutomaticConsumersToUseIt() throws Exception {
Testing.Debug.enable();
final String topicName = "topicA";
final CountDownLatch completion = new CountDownLatch(2);
final int numMessages = 3;
final AtomicLong messagesRead = new AtomicLong(0);
// Start a cluster and create a topic ...
cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
cluster.createTopics(topicName);
// Consume messages asynchronously ...
Stopwatch sw = Stopwatch.reusable().start();
cluster.useTo().consumeIntegers(topicName, numMessages, 10, TimeUnit.SECONDS, completion::countDown, (key, value) -> {
messagesRead.incrementAndGet();
return true;
});
// Produce some messages asynchronously ...
cluster.useTo().produce("manual", new StringSerializer(), new IntegerSerializer(), producer -> {
producer.write(topicName, "key1", 1);
producer.write(topicName, "key2", 2);
producer.write(topicName, "key3", 3);
completion.countDown();
});
// Wait for the consumer to complete ...
if (completion.await(10, TimeUnit.SECONDS)) {
sw.stop();
Testing.debug("The consumer completed normally in " + sw.durations());
} else {
Testing.debug("Consumer did not completed normally");
}
assertThat(messagesRead.get()).isEqualTo(numMessages);
}
protected void assertValidDataDirectory(File dir) {
assertThat(dir.exists()).isTrue();
assertThat(dir.isDirectory()).isTrue();
assertThat(dir.canWrite()).isTrue();
assertThat(dir.canRead()).isTrue();
assertThat(Testing.Files.inTargetDir(dir)).isTrue();
}
protected void assertDoesNotExist(File dir) {
assertThat(dir.exists()).isFalse();
}
}

View File

@@ -0,0 +1,339 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.kafka;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.util.IoUtil;
import kafka.admin.AdminUtils;
import kafka.server.KafkaConfig;
import kafka.utils.ZkUtils;
/**
* A small embedded Kafka server.
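* <p>
* Instances are normally created and managed by a {@link KafkaCluster}, but a server can also be used on its own.
* An illustrative sketch (the Zookeeper connection string and port are example values only):
* <pre>{@code
* KafkaServer broker = new KafkaServer(() -> "localhost:2181", 1);   // Zookeeper connection string supplier, broker ID
* broker.setPort(9092).startup();
* broker.createTopics("topic-a", "topic-b");
* broker.shutdown();
* broker.deleteData();
* }</pre>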
*
* @author Randall Hauch
*/
@ThreadSafe
public class KafkaServer {
public static final int DEFAULT_BROKER_ID = 1;
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaServer.class);
private final Supplier<String> zkConnection;
private final int brokerId;
private volatile File logsDir;
private final Properties config;
private volatile int desiredPort = -1;
private volatile int port = -1;
private volatile kafka.server.KafkaServer server;
/**
* Create a new server instance.
*
* @param zookeeperConnection the supplier of the Zookeeper connection string; may not be null
*/
public KafkaServer(Supplier<String> zookeeperConnection) {
this(zookeeperConnection, DEFAULT_BROKER_ID);
}
/**
* Create a new server instance.
*
* @param zookeeperConnection the supplier of the Zookeeper connection string; may not be null
* @param brokerId the unique broker ID
*/
public KafkaServer(Supplier<String> zookeeperConnection, int brokerId) {
this(zookeeperConnection, brokerId, -1);
}
/**
* Create a new server instance.
*
* @param zookeeperConnection the supplier of the Zookeeper connection string; may not be null
* @param brokerId the unique broker ID
* @param port the desired port
*/
public KafkaServer(Supplier<String> zookeeperConnection, int brokerId, int port) {
if (zookeeperConnection == null) throw new IllegalArgumentException("The Zookeeper connection string supplier may not be null");
this.zkConnection = zookeeperConnection;
this.brokerId = brokerId;
this.config = new Properties();
setPort(port);
populateDefaultConfiguration(this.config);
}
protected int brokerId() {
return brokerId;
}
protected String zookeeperConnection() {
return this.zkConnection.get();
}
/**
* Set the initial default configuration properties. This method is called from the constructors and can be overridden
* to customize these properties.
*
* @param props the configuration properties; never null
*/
protected void populateDefaultConfiguration(Properties props) {
config.setProperty(KafkaConfig.NumPartitionsProp(), String.valueOf(1));
config.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(1));
}
/**
* Set a configuration property. Several key properties that deal with Zookeeper, the host name, and the broker ID,
* may not be set via this method and are ignored since they are controlled elsewhere in this instance.
*
* @param name the property name; may not be null
* @param value the property value; may be null
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the server is running when this method is called
*/
public KafkaServer setProperty(String name, String value) {
if (server != null) throw new IllegalStateException("Unable to change the properties when already running");
if (!KafkaConfig.ZkConnectProp().equalsIgnoreCase(name)
&& !KafkaConfig.BrokerIdProp().equalsIgnoreCase(name)
&& !KafkaConfig.HostNameProp().equalsIgnoreCase(name)) {
this.config.setProperty(name, value);
}
return this;
}
/**
* Set multiple configuration properties. Several key properties that deal with Zookeeper, the host name, and the broker ID,
* may not be set via this method and are ignored since they are controlled elsewhere in this instance.
*
* @param properties the configuration properties; may be null or empty
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the server is running when this method is called
*/
public KafkaServer setProperties( Properties properties ) {
if (server != null) throw new IllegalStateException("Unable to change the properties when already running");
properties.stringPropertyNames().forEach(propName -> {
setProperty(propName, properties.getProperty(propName));
});
return this;
}
/**
* Set the port for the server.
*
* @param port the desired port, or {@code -1} if a random available port should be found and used
* @return this instance to allow chaining methods; never null
*/
public KafkaServer setPort(int port) {
this.desiredPort = port > 0 ? port : -1;
this.port = desiredPort;
return this;
}
/**
* Get a copy of the complete configuration that is or will be used by the running server.
*
* @return the properties for the currently-running server; may be empty if not running
*/
public Properties config() {
Properties runningConfig = new Properties(config);
runningConfig.setProperty(KafkaConfig.ZkConnectProp(), zookeeperConnection());
runningConfig.setProperty(KafkaConfig.BrokerIdProp(), Integer.toString(brokerId));
runningConfig.setProperty(KafkaConfig.HostNameProp(), "localhost");
runningConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), String.valueOf(Boolean.TRUE));
return runningConfig;
}
/**
* Get the connection string. If the server is not {@link #startup() running} and the port is to be dynamically discovered
* upon startup, then this method returns "{@code localhost:-1}".
*
* @return the connection string; never null
*/
public String getConnection() {
return "localhost:" + port;
}
/**
* Start the embedded Kafka server.
*
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the server is already running
*/
public synchronized KafkaServer startup() {
if (server != null) throw new IllegalStateException("" + this + " is already running");
// Determine the storage directory and adjust the configuration ...
Properties config = config();
if (logsDir == null) {
try {
File temp = File.createTempFile("kafka", "suffix");
this.logsDir = temp.getParentFile();
temp.delete();
} catch (IOException e) {
throw new RuntimeException("Unable to create temporary directory", e);
}
}
config.setProperty(KafkaConfig.LogDirProp(), logsDir.getAbsolutePath());
// Determine the port and adjust the configuration ...
port = desiredPort > 0 ? desiredPort : IoUtil.getAvailablePort();
config.setProperty(KafkaConfig.PortProp(), Integer.toString(port));
// config.setProperty("metadata.broker.list", getConnection());
// Start the server ...
try {
LOGGER.debug("Starting Kafka broker {} @ {} with storage in {}", brokerId, getConnection(), logsDir.getAbsolutePath());
server = new kafka.server.KafkaServer(new KafkaConfig(config), new SystemTime(), scala.Option.apply(null));
server.startup();
LOGGER.info("Started Kafka server {} @ {} with storage in {}", brokerId, getConnection(), logsDir.getAbsolutePath());
return this;
} catch (RuntimeException e) {
server = null;
throw e;
}
}
/**
* Shutdown the embedded Kafka server. Data is not removed; call {@link #deleteData()} to remove it.
*/
public synchronized void shutdown() {
if (server != null) {
try {
server.shutdown();
} finally {
server = null;
port = desiredPort;
}
}
}
/**
* Delete all of the data associated with this server. This method has no effect while the server is running.
*/
public synchronized void deleteData() {
if (server == null) {
// Delete all data ...
try {
IoUtil.delete(this.logsDir);
} catch (IOException e) {
LOGGER.error("Unable to delete directory '{}'", this.logsDir, e);
}
}
}
/**
* Get the Zookeeper utilities used by the running Kafka server.
*
* @return the Zookeeper utilities, or null if the Kafka server is not running
*/
public ZkUtils getZkUtils() {
return server != null ? server.zkUtils() : null;
}
/**
* Create the specified topics, each with one partition and a replication factor of 1.
*
* @param topics the names of the topics to create
*/
public void createTopics(String... topics) {
createTopics(1, 1, topics);
}
/**
* Create the specified topics.
*
* @param numPartitions the number of partitions for each topic
* @param replicationFactor the replication factor for each topic
* @param topics the names of the topics to create
*/
public void createTopics(int numPartitions, int replicationFactor, String... topics) {
for (String topic : topics) {
if (topic != null) createTopic(topic, numPartitions, replicationFactor);
}
}
/**
* Create the specified topic.
*
* @param topic the name of the topic to create
* @param numPartitions the number of partitions for the topic
* @param replicationFactor the replication factor for the topic
*/
public void createTopic(String topic, int numPartitions, int replicationFactor) {
AdminUtils.createTopic(getZkUtils(), topic, numPartitions, replicationFactor, new Properties());
}
/**
* Perform the supplied function on each directory used by this server.
*
* @param consumer the consumer function; may not be null
*/
void onEachDirectory(Consumer<File> consumer) {
consumer.accept(getStateDirectory());
}
/**
* Get the parent directory where the broker's state will be kept. The broker will create a subdirectory for itself
* under this directory.
*
* @return the parent directory for the broker's state; may be null if a temporary directory will be used
*/
public File getStateDirectory() {
return this.logsDir;
}
/**
* Set the parent directory where the broker's state will be kept. The broker will create a subdirectory for itself
* under this directory.
*
* @param stateDirectory the parent directory for the broker's state; may be null if a temporary directory will be used
* @throws IllegalArgumentException if the supplied file is not a directory or not writable
*/
public void setStateDirectory(File stateDirectory) {
if (stateDirectory != null && stateDirectory.exists()
&& (!stateDirectory.isDirectory() || !stateDirectory.canWrite() || !stateDirectory.canRead())) {
throw new IllegalArgumentException("The directory must be readable and writable");
}
}
this.logsDir = stateDirectory;
}
@Override
public String toString() {
return "KafkaServer{" + getConnection() + "}";
}
protected static class SystemTime implements kafka.utils.Time {
@Override
public long milliseconds() {
return System.currentTimeMillis();
}
@Override
public long nanoseconds() {
return System.nanoTime();
}
@Override
public void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
// Ignore
}
}
}
}
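
For context, a rough usage sketch (not part of this diff) showing how a unit test might wire this class together with the ZookeeperServer added below. The KafkaServer constructor shown here is an assumption inferred from the zookeeperConnection() call in config() and does not appear in this hunk; everything else uses only methods defined above plus the existing Testing.Files utility.

    ZookeeperServer zk = new ZookeeperServer();
    zk.setStateDirectory(Testing.Files.createTestingDirectory("zk"));
    zk.startup();

    KafkaServer kafka = new KafkaServer(zk::getConnection); // assumed constructor signature
    kafka.setStateDirectory(Testing.Files.createTestingDirectory("kafka"));
    kafka.startup();                        // binds to a free port since none was set
    kafka.createTopic("test-topic", 1, 1);  // hypothetical topic name
    // ... produce to and consume from kafka.getConnection() ...
    kafka.shutdown();
    kafka.deleteData();                     // only removes data once the broker is stopped
    zk.shutdown(true);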

View File

@ -0,0 +1,220 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.kafka;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.function.Consumer;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.util.IoUtil;
/**
* A lightweight embeddable Zookeeper server useful for unit testing.
*
* @author Randall Hauch
* @see KafkaCluster
*/
@ThreadSafe
public class ZookeeperServer {
private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperServer.class);
public static final int DEFAULT_TICK_TIME = 500;
/**
* The basic time unit in milliseconds used by ZooKeeper. It is used to do heartbeats and the minimum session timeout will be
* twice the tickTime.
*/
private volatile int tickTime = DEFAULT_TICK_TIME;
private volatile int port = -1;
private volatile ServerCnxnFactory factory;
private volatile File dataDir;
private volatile File snapshotDir;
private volatile File logDir;
/**
* Create a new server instance.
*/
public ZookeeperServer() {
}
/**
* Start the embedded Zookeeper server.
*
* @return this instance to allow chaining methods; never null
* @throws IOException if there is an error during startup
* @throws IllegalStateException if the server is already running
*/
public synchronized ZookeeperServer startup() throws IOException {
if (factory != null) throw new IllegalStateException("" + this + " is already running");
if (this.port == -1) this.port = IoUtil.getAvailablePort();
this.factory = ServerCnxnFactory.createFactory(new InetSocketAddress("localhost", port), 1024);
if (this.dataDir == null) {
try {
File temp = File.createTempFile("kafka", "suffix");
this.dataDir = temp.getParentFile();
temp.delete();
} catch (IOException e) {
throw new RuntimeException("Unable to create temporary directory", e);
}
}
this.snapshotDir = new File(this.dataDir, "snapshot");
this.logDir = new File(this.dataDir, "log");
this.snapshotDir.mkdirs();
this.logDir.mkdirs();
try {
factory.startup(new ZooKeeperServer(snapshotDir, logDir, tickTime));
return this;
} catch (InterruptedException e) {
factory = null;
Thread.currentThread().interrupt();
throw new IOException(e);
}
}
/**
* Shutdown the embedded Zookeeper server and delete all data.
*/
public void shutdown() {
shutdown(true);
}
/**
* Shutdown the embedded Zookeeper server.
*
* @param deleteData true if the data should be removed, or false otherwise
*/
public synchronized void shutdown(boolean deleteData) {
if (factory != null) {
try {
factory.shutdown();
} finally {
factory = null;
if (deleteData) {
// Delete all data ...
try {
IoUtil.delete(this.snapshotDir, this.logDir);
} catch (IOException e) {
LOGGER.error("Unable to delete data upon shutdown", e);
}
}
}
}
}
/**
* Get the connection string. If the server is not {@link #startup() running} and the port is to be dynamically discovered
* upon startup, then this method returns "{@code localhost:-1}".
*
* @return the connection string; never null
*/
public String getConnection() {
return "localhost:" + port;
}
/**
* Set the port for the server.
*
* @param port the desired port, or {@code -1} if a random available port should be found and used
* @return this instance to allow chaining methods; never null
*/
public ZookeeperServer setPort(int port) {
this.port = port;
return this;
}
/**
* Set the basic time unit in milliseconds used by ZooKeeper. It is used to do heartbeats and the minimum session timeout will
* be twice the tickTime.
*
* @param tickTime the desired value, or a non-positive value if the default of {@value #DEFAULT_TICK_TIME} should be used
* @return this instance to allow chaining methods; never null
*/
public ZookeeperServer setTickTime(int tickTime) {
this.tickTime = tickTime > 0 ? tickTime : DEFAULT_TICK_TIME;
return this;
}
/**
* Get the current port.
*
* @return the port number, or {@code -1} if the port should be discovered upon {@link #startup()}
*/
public int getPort() {
return port;
}
/**
* Get the basic time unit in milliseconds used by ZooKeeper.
*
* @return tick time; always positive
*/
public int getTickTime() {
return tickTime;
}
/**
* Perform the supplied function on each directory used by this server.
*
* @param consumer the consumer function; may not be null
*/
void onEachDirectory(Consumer<File> consumer) {
consumer.accept(getSnapshotDirectory());
consumer.accept(getLogDirectory());
}
/**
* Get the parent directory where the server's snapshots are kept.
* @return the parent directory for the server's snapshots; never null once the server is running
*/
File getSnapshotDirectory() {
return this.snapshotDir;
}
/**
* Get the parent directory where the server's logs are kept.
* @return the parent directory for the server's logs; never null once the server is running
*/
File getLogDirectory() {
return this.logDir;
}
/**
* Get the parent directory where the server's logs and snapshots will be kept.
* @return the parent directory for the server's logs and snapshots; may be null if a temporary directory will be used
*/
public File getStateDirectory() {
return this.dataDir;
}
/**
* Set the parent directory where the server's logs and snapshots will be kept.
* @param dataDir the parent directory for the server's logs and snapshots; may be null if a temporary directory will be used
* @return this instance to allow method chaining; never null
* @throws IllegalArgumentException if the supplied file is not a directory or not writable
*/
public ZookeeperServer setStateDirectory(File dataDir) {
if (dataDir != null && dataDir.exists()
&& (!dataDir.isDirectory() || !dataDir.canWrite() || !dataDir.canRead())) {
throw new IllegalArgumentException("The directory must be readable and writable");
}
this.dataDir = dataDir;
return this;
}
@Override
public String toString() {
return "ZookeeperServer{" + getConnection() + "}";
}
}
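
A quick orientation sketch (not part of this diff) of the ZookeeperServer lifecycle on its own, using only the methods defined above; Testing.Files comes from the existing io.debezium.util.Testing utility, and the directory name is illustrative.

    ZookeeperServer zk = new ZookeeperServer();
    zk.setStateDirectory(Testing.Files.createTestingDirectory("zk")); // otherwise a temp dir is used
    zk.setTickTime(500);                  // same as DEFAULT_TICK_TIME
    zk.startup();                         // picks a free port since none was set
    String connect = zk.getConnection();  // "localhost:<port>" for clients
    // ... exercise code that needs a Zookeeper at 'connect' ...
    zk.shutdown(false);                   // false keeps the snapshot and log directories on disk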

View File

@ -0,0 +1,69 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.kafka;
import java.io.File;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.util.Testing;
/**
* @author Randall Hauch
*
*/
public class ZookeeperServerTest {
private ZookeeperServer server;
private File dataDir;
@Before
public void beforeEach() {
dataDir = Testing.Files.createTestingDirectory("zk");
Testing.Files.delete(dataDir);
server = new ZookeeperServer();
server.setStateDirectory(dataDir);
}
@After
public void afterEach() {
Testing.Files.delete(dataDir);
}
@Test
public void shouldStartServerAndRemoveData() throws Exception {
Testing.debug("Running 1");
server.startup();
server.onEachDirectory(this::assertValidDataDirectory);
server.shutdown(true);
server.onEachDirectory(this::assertDoesNotExist);
}
@Test
public void shouldStartServerAndLeaveData() throws Exception {
Testing.debug("Running 2");
server.startup();
server.onEachDirectory(this::assertValidDataDirectory);
server.shutdown(false);
server.onEachDirectory(this::assertValidDataDirectory);
}
protected void assertValidDataDirectory(File dir) {
assertThat(dir.exists()).isTrue();
assertThat(dir.isDirectory()).isTrue();
assertThat(dir.canWrite()).isTrue();
assertThat(dir.canRead()).isTrue();
assertThat(Testing.Files.inTargetDir(dir)).isTrue();
}
protected void assertDoesNotExist(File dir) {
assertThat(dir.exists()).isFalse();
}
}

View File

@ -4,8 +4,56 @@ log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %m (%c)%n
# Direct Zookeeper log messages to stdout with special prefix
log4j.appender.zk=org.apache.log4j.ConsoleAppender
log4j.appender.zk.Target=System.out
log4j.appender.zk.layout=org.apache.log4j.PatternLayout
log4j.appender.zk.layout.ConversionPattern=%d{ISO8601} %-5p ZOO %m%n
#log4j.appender.zk.layout.ConversionPattern=%d{ISO8601} %-5p ZOO %m (%c)%n
# Direct Zookeeper Client log messages to stdout with special prefix
log4j.appender.zkclient=org.apache.log4j.ConsoleAppender
log4j.appender.zkclient.Target=System.out
log4j.appender.zkclient.layout=org.apache.log4j.PatternLayout
log4j.appender.zkclient.layout.ConversionPattern=%d{ISO8601} %-5p ZKC %m%n
#log4j.appender.zkclient.layout.ConversionPattern=%d{ISO8601} %-5p ZKC %m (%c)%n
# Direct Kafka log messages to stdout with special prefix
log4j.appender.kafka=org.apache.log4j.ConsoleAppender
log4j.appender.kafka.Target=System.out
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{ISO8601} %-5p KAF %m%n
#log4j.appender.kafka.layout.ConversionPattern=%d{ISO8601} %-5p KAF %m (%c)%n
# Direct Debezium log messages to stdout with special prefix
log4j.appender.debezium=org.apache.log4j.ConsoleAppender
log4j.appender.debezium.Target=System.out
log4j.appender.debezium.layout=org.apache.log4j.PatternLayout
log4j.appender.debezium.layout.ConversionPattern=%d{ISO8601} %-5p DBZ %m (%c)%n
# Root logger option
log4j.rootLogger=INFO, stdout
# Set up the default logging to be INFO level, then override specific units
log4j.logger.io.debezium=INFO
log4j.logger.io.debezium=INFO, debezium
log4j.additivity.io.debezium=false
# Kafka is pretty verbose at INFO level, so for brevity use ERROR everywhere except INFO at kafka.server.KafkaServer
log4j.logger.org.apache.kafka=ERROR, kafka
log4j.additivity.org.apache.kafka=false
log4j.logger.kafka=ERROR, kafka
log4j.additivity.kafka=false
log4j.logger.kafka.server.KafkaServer=ERROR, kafka
log4j.additivity.kafka.server.KafkaServer=false
log4j.logger.state.change.logger=ERROR, kafka
log4j.additivity.state.change.logger=false
# Zookeeper is pretty verbose at INFO level, so for brevity use ERROR everywhere except INFO at org.apache.zookeeper.server.ZooKeeperServer
log4j.logger.org.apache.zookeeper=ERROR, zk
log4j.additivity.org.apache.zookeeper=false
log4j.logger.org.apache.zookeeper.server.ZooKeeperServer=ERROR, zk
log4j.additivity.org.apache.zookeeper.server.ZooKeeperServer=false
# Zookeeper Client is pretty verbose at INFO level, so for brevity use ERROR everywhere ...
log4j.logger.org.I0Itec.zkclient=ERROR, zkclient
log4j.additivity.org.I0Itec.zkclient=false

36
pom.xml
View File

@ -79,6 +79,12 @@
<!-- Dockerfiles -->
<docker.maintainer>Debezium community</docker.maintainer>
<!-- Kafka -->
<version.kafka>0.9.0.0</version.kafka>
<version.kafka.scala>2.11</version.kafka.scala>
<version.scala>2.11.7</version.scala>
<version.curator>2.4.0</version.curator>
<version.zookeeper>3.4.6</version.zookeeper>
</properties>
<modules>
<module>support/checkstyle</module>
@ -158,6 +164,36 @@
<scope>test</scope>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${version.kafka.scala}</artifactId>
<version>${version.kafka}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${version.kafka}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${version.zookeeper}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${version.kafka.scala}</artifactId>
<version>${version.kafka}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>${version.curator}</version>
<scope>test</scope>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>