DBZ-1358 Avoiding instantiation of SystemTime

This commit is contained in:
Gunnar Morling 2019-06-28 08:05:56 +02:00
parent a0df2c140b
commit 254dba97ee

View File

@ -11,7 +11,7 @@
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -25,7 +25,7 @@
/**
* A small embedded Kafka server.
*
*
* @author Randall Hauch
*/
@ThreadSafe
@ -46,7 +46,7 @@ public class KafkaServer {
/**
* Create a new server instance.
*
*
* @param zookeeperConnection the supplier of the Zookeeper connection string; may not be null
*/
public KafkaServer(Supplier<String> zookeeperConnection) {
@ -55,7 +55,7 @@ public KafkaServer(Supplier<String> zookeeperConnection) {
/**
* Create a new server instance.
*
*
* @param zookeeperConnection the supplier of the Zookeeper connection string; may not be null
* @param brokerId the unique broker ID
*/
@ -65,7 +65,7 @@ public KafkaServer(Supplier<String> zookeeperConnection, int brokerId) {
/**
* Create a new server instance.
*
*
* @param zookeeperConnection the supplier of the Zookeeper connection string; may not be null
* @param brokerId the unique broker ID
* @param port the desired port
@ -92,7 +92,7 @@ protected String zookeeperConnection() {
/**
* Set the initial default configuration properties. This method is called from the constructors and can be overridden
* to customize these properties.
*
*
* @param props the configuration properties; never null
*/
protected void populateDefaultConfiguration(Properties props) {
@ -103,7 +103,7 @@ protected void populateDefaultConfiguration(Properties props) {
/**
* Set a configuration property. Several key properties that deal with Zookeeper, the host name, and the broker ID,
* may not be set via this method and are ignored since they are controlled elsewhere in this instance.
*
*
* @param name the property name; may not be null
* @param value the property value; may be null
* @return this instance to allow chaining methods; never null
@ -120,11 +120,11 @@ public KafkaServer setProperty(String name, String value) {
}
return this;
}
/**
* Set multiple configuration properties. Several key properties that deal with Zookeeper, the host name, and the broker ID,
* may not be set via this method and are ignored since they are controlled elsewhere in this instance.
*
*
* @param properties the configuration properties; may be null or empty
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the server is running when this method is called
@ -138,12 +138,12 @@ public KafkaServer setProperties( Properties properties ) {
});
return this;
}
/**
* Set the port for the server.
*
*
* @param port the desired port, or {@code -1} if a random available port should be found and used
* @return this instance to allow chaining methods; never null
*/
@ -155,7 +155,7 @@ public KafkaServer setPort(int port) {
/**
* Get a copy of the complete configuration that is or will be used by the running server.
*
*
* @return the properties for the currently-running server; may be empty if not running
*/
public Properties config() {
@ -175,7 +175,7 @@ public Properties config() {
/**
* Get the connection string. If the server is not {@link #startup() running} and the port is to be dynamically discovered
* upon startup, then this method returns "{@code localhost:-1}".
*
*
* @return the connection string; never null
*/
public String getConnection() {
@ -184,7 +184,7 @@ public String getConnection() {
/**
* Start the embedded Kafka server.
*
*
* @return this instance to allow chaining methods; never null
* @throws IllegalStateException if the server is already running
*/
@ -215,7 +215,7 @@ public synchronized KafkaServer startup() {
// Start the server ...
try {
LOGGER.debug("Starting Kafka broker {} at {} with storage in {}", brokerId, getConnection(), logsDir.getAbsolutePath());
server = new kafka.server.KafkaServer(new KafkaConfig(config), new SystemTime(), scala.Option.apply(null),
server = new kafka.server.KafkaServer(new KafkaConfig(config), Time.SYSTEM, scala.Option.apply(null),
new scala.collection.mutable.ArraySeq<>(0));
server.startup();
LOGGER.info("Started Kafka server {} at {} with storage in {}", brokerId, getConnection(), logsDir.getAbsolutePath());
@ -229,7 +229,7 @@ public synchronized KafkaServer startup() {
/**
* Shutdown the embedded Kafka server and delete all data.
*
*
* @param deleteLogs whether or not to remove all the log files after shutting down
*/
public synchronized void shutdown(boolean deleteLogs) {
@ -249,7 +249,7 @@ public synchronized void shutdown(boolean deleteLogs) {
}
}
}
/**
* Delete all of the data associated with this server.
*/
@ -266,7 +266,7 @@ public synchronized void deleteData() {
/**
* Get the Zookeeper admin client used by the running Kafka server.
*
*
* @return the Zookeeper admin client, or null if the Kafka server is not running
*/
public AdminZkClient getAdminZkClient() {
@ -275,7 +275,7 @@ public AdminZkClient getAdminZkClient() {
/**
* Create the specified topics.
*
*
* @param topics the names of the topics to create
*/
public void createTopics(String... topics) {
@ -284,7 +284,7 @@ public void createTopics(String... topics) {
/**
* Create the specified topics.
*
*
* @param numPartitions the number of partitions for each topic
* @param replicationFactor the replication factor for each topic
* @param topics the names of the topics to create
@ -296,10 +296,10 @@ public void createTopics(int numPartitions, int replicationFactor, String... top
}
}
}
/**
* Create the specified topic.
*
*
* @param topic the name of the topic to create
* @param numPartitions the number of partitions for the topic
* @param replicationFactor the replication factor for the topic
@ -311,7 +311,7 @@ public void createTopic( String topic, int numPartitions, int replicationFactor
/**
* Perform the supplied function on each directory used by this server.
*
*
* @param consumer the consumer function; may not be null
*/
void onEachDirectory(Consumer<File> consumer) {
@ -321,7 +321,7 @@ void onEachDirectory(Consumer<File> consumer) {
/**
* Get the parent directory where the broker's state will be kept. The broker will create a subdirectory for itself
* under this directory.
*
*
* @return the parent directory for the broker's state; may be null if a temporary directory will be used
*/
public File getStateDirectory() {
@ -331,7 +331,7 @@ public File getStateDirectory() {
/**
* Set the parent directory where the broker's state will be kept. The broker will create a subdirectory for itself
* under this directory.
*
*
* @param stateDirectory the parent directory for the broker's state; may be null if a temporary directory will be used
* @throws IllegalArgumentException if the supplied file is not a directory or not writable
*/