DBZ-7906 Enhancing the threads utility class for broader use

This commit is contained in:
Ankur Gupta 2024-05-30 12:18:43 +05:30 committed by Jiri Pechanec
parent 2917b78881
commit 1cf3b72f1b
2 changed files with 22 additions and 22 deletions

View File

@ -45,6 +45,7 @@ Angshuman Dey
Anil Dasari
Animesh Kumar
Anisha Mohanty
Ankur Gupta
Ant Kutschera
Anton Kondratev
Anton Martynov

View File

@ -16,7 +16,6 @@
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -79,7 +78,7 @@ public void reset() {
@Override
public long elapsedTime() {
long elapsed = clock.currentTimeInMillis() - lastTimeInMillis;
return elapsed <= 0L ? 0L : elapsed;
return Math.max(elapsed, 0L);
}
};
}
@ -240,35 +239,35 @@ private Threads() {
/**
* Returns a thread factory that creates threads conforming to Debezium thread naming
* pattern {@code debezium-<connector class>-<connector-id>-<thread-name>}.
* pattern {@code debezium-<component class>-<component-id>-<thread-name>}.
*
* @param connector - the source connector class
* @param connectorId - the identifier to differentiate between connector instances
* @param component - the source connector or sink change consumer class
* @param componentId - the identifier to differentiate between component instances
* @param name - the name of the thread
* @param indexed - true if the thread name should be appended with an index
* @param daemon - true if the thread should be a daemon thread
* @return the thread factory setting the correct name
*/
public static ThreadFactory threadFactory(Class<? extends SourceConnector> connector, String connectorId, String name, boolean indexed, boolean daemon) {
return threadFactory(connector, connectorId, name, indexed, daemon, null);
public static ThreadFactory threadFactory(Class<?> component, String componentId, String name, boolean indexed, boolean daemon) {
return threadFactory(component, componentId, name, indexed, daemon, null);
}
/**
* Returns a thread factory that creates threads conforming to Debezium thread naming
* pattern {@code debezium-<connector class>-<connector-id>-<thread-name>}.
* pattern {@code debezium-<component class>-<component-id>-<thread-name>}.
*
* @param connector - the source connector class
* @param connectorId - the identifier to differentiate between connector instances
* @param component - the source or sink component class
* @param componentId - the identifier to differentiate between componentId instances
* @param name - the name of the thread
* @param indexed - true if the thread name should be appended with an index
* @param daemon - true if the thread should be a daemon thread
* @param callback - a callback called on every thread created
* @return the thread factory setting the correct name
*/
public static ThreadFactory threadFactory(Class<? extends SourceConnector> connector, String connectorId, String name, boolean indexed, boolean daemon,
public static ThreadFactory threadFactory(Class<?> component, String componentId, String name, boolean indexed, boolean daemon,
Consumer<Thread> callback) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Requested thread factory for connector {}, id = {} named = {}", connector.getSimpleName(), connectorId, name);
LOGGER.info("Requested thread factory for component {}, id = {} named = {}", component.getSimpleName(), componentId, name);
}
return new ThreadFactory() {
@ -277,9 +276,9 @@ public static ThreadFactory threadFactory(Class<? extends SourceConnector> conne
@Override
public Thread newThread(Runnable r) {
StringBuilder threadName = new StringBuilder(DEBEZIUM_THREAD_NAME_PREFIX)
.append(connector.getSimpleName().toLowerCase())
.append(component.getSimpleName().toLowerCase())
.append('-')
.append(connectorId)
.append(componentId)
.append('-')
.append(name);
if (indexed) {
@ -296,19 +295,19 @@ public Thread newThread(Runnable r) {
};
}
public static ExecutorService newSingleThreadExecutor(Class<? extends SourceConnector> connector, String connectorId, String name, boolean daemon) {
return Executors.newSingleThreadExecutor(threadFactory(connector, connectorId, name, false, daemon));
public static ExecutorService newSingleThreadExecutor(Class<?> component, String componentId, String name, boolean daemon) {
return Executors.newSingleThreadExecutor(threadFactory(component, componentId, name, false, daemon));
}
public static ExecutorService newFixedThreadPool(Class<? extends SourceConnector> connector, String connectorId, String name, int threadCount) {
return Executors.newFixedThreadPool(threadCount, threadFactory(connector, connectorId, name, true, false));
public static ExecutorService newFixedThreadPool(Class<?> component, String componentId, String name, int threadCount) {
return Executors.newFixedThreadPool(threadCount, threadFactory(component, componentId, name, true, false));
}
public static ExecutorService newSingleThreadExecutor(Class<? extends SourceConnector> connector, String connectorId, String name) {
return newSingleThreadExecutor(connector, connectorId, name, false);
public static ExecutorService newSingleThreadExecutor(Class<?> component, String componentId, String name) {
return newSingleThreadExecutor(component, componentId, name, false);
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(Class<? extends SourceConnector> connector, String connectorId, String name, boolean daemon) {
return Executors.newSingleThreadScheduledExecutor(threadFactory(connector, connectorId, name, false, daemon));
public static ScheduledExecutorService newSingleThreadScheduledExecutor(Class<?> component, String componentId, String name, boolean daemon) {
return Executors.newSingleThreadScheduledExecutor(threadFactory(component, componentId, name, false, daemon));
}
}