DBZ-537 OffsetCommitPolicy supports Configuration constructor initialization

This commit is contained in:
Jiri Pechanec 2018-01-30 09:47:10 +01:00 committed by Gunnar Morling
parent a8750221cb
commit 5be55ae8ff
4 changed files with 72 additions and 44 deletions

View File

@ -1385,23 +1385,8 @@ default <T> T getInstance(String key, Class<T> type) {
* @return the new instance, or null if there is no such key-value pair in the configuration or if there is a key-value
* configuration but the value could not be converted to an existing class with a zero-argument constructor
*/
@SuppressWarnings("unchecked")
default <T> T getInstance(String key, Class<T> type, Supplier<ClassLoader> classloaderSupplier) {
String className = getString(key);
if (className != null) {
ClassLoader classloader = classloaderSupplier != null ? classloaderSupplier.get() : getClass().getClassLoader();
try {
Class<? extends T> clazz = (Class<? extends T>) classloader.loadClass(className);
return clazz.newInstance();
} catch (ClassNotFoundException e) {
LoggerFactory.getLogger(getClass()).error("Unable to find class {}", className, e);
} catch (InstantiationException e) {
LoggerFactory.getLogger(getClass()).error("Unable to instantiate class {}", className, e);
} catch (IllegalAccessException e) {
LoggerFactory.getLogger(getClass()).error("Unable to access class {}", className, e);
}
}
return null;
return ConfigurationHelper.doGetInstance(getString(key), classloaderSupplier, null);
}
/**
@ -1416,6 +1401,34 @@ default <T> T getInstance(Field field, Class<T> clazz) {
return getInstance(field, clazz, () -> getClass().getClassLoader());
}
/**
* Get an instance of the class given by the value in the configuration associated with the given key.
* The instance is created using {@code Instance(Configuration)} constructor.
*
* @param key the key for the configuration property
* @param clazz the Class of which the resulting object is expected to be an instance of; may not be null
* @param the {@link Configuration} object that is passed as a parameter to the constructor
* @return the new instance, or null if there is no such key-value pair in the configuration or if there is a key-value
* configuration but the value could not be converted to an existing class with a zero-argument constructor
*/
default <T> T getInstance(String key, Class<T> clazz, Configuration configuration) {
return ConfigurationHelper.doGetInstance(getString(key), () -> getClass().getClassLoader(), configuration);
}
/**
* Get an instance of the class given by the value in the configuration associated with the given field.
* The instance is created using {@code Instance(Configuration)} constructor.
*
* @param field the field for the configuration property
* @param clazz the Class of which the resulting object is expected to be an instance of; may not be null
* @param the {@link Configuration} object that is passed as a parameter to the constructor
* @return the new instance, or null if there is no such key-value pair in the configuration or if there is a key-value
* configuration but the value could not be converted to an existing class with a zero-argument constructor
*/
default <T> T getInstance(Field field, Class<T> clazz, Configuration configuration) {
return ConfigurationHelper.doGetInstance(getString(field), () -> getClass().getClassLoader(), configuration);
}
/**
* Get an instance of the class given by the value in the configuration associated with the given field.
*
@ -1426,23 +1439,8 @@ default <T> T getInstance(Field field, Class<T> clazz) {
* @return the new instance, or null if there is no such key-value pair in the configuration or if there is a key-value
* configuration but the value could not be converted to an existing class with a zero-argument constructor
*/
@SuppressWarnings("unchecked")
default <T> T getInstance(Field field, Class<T> type, Supplier<ClassLoader> classloaderSupplier) {
String className = getString(field);
if (className != null) {
ClassLoader classloader = classloaderSupplier != null ? classloaderSupplier.get() : getClass().getClassLoader();
try {
Class<? extends T> clazz = (Class<? extends T>) classloader.loadClass(className);
return clazz.newInstance();
} catch (ClassNotFoundException e) {
LoggerFactory.getLogger(getClass()).error("Unable to find class {}", className, e);
} catch (InstantiationException e) {
LoggerFactory.getLogger(getClass()).error("Unable to instantiate class {}", className, e);
} catch (IllegalAccessException e) {
LoggerFactory.getLogger(getClass()).error("Unable to access class {}", className, e);
}
}
return null;
return ConfigurationHelper.doGetInstance(getString(field), classloaderSupplier, null);
}
/**

View File

@ -0,0 +1,38 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.config;
import java.lang.reflect.InvocationTargetException;
import java.util.function.Supplier;
import org.slf4j.LoggerFactory;
class ConfigurationHelper {
@SuppressWarnings("unchecked")
static <T> T doGetInstance(String className, Supplier<ClassLoader> classloaderSupplier,
Configuration configuration) {
if (className != null) {
ClassLoader classloader = classloaderSupplier != null ? classloaderSupplier.get()
: Configuration.class.getClassLoader();
try {
Class<? extends T> clazz = (Class<? extends T>) classloader.loadClass(className);
return configuration == null ? clazz.newInstance()
: clazz.getConstructor(Configuration.class).newInstance(configuration);
} catch (ClassNotFoundException e) {
LoggerFactory.getLogger(Configuration.class).error("Unable to find class {}", className, e);
} catch (InstantiationException e) {
LoggerFactory.getLogger(Configuration.class).error("Unable to instantiate class {}", className, e);
} catch (IllegalAccessException e) {
LoggerFactory.getLogger(Configuration.class).error("Unable to access class {}", className, e);
} catch (IllegalArgumentException | InvocationTargetException | NoSuchMethodException | SecurityException e) {
LoggerFactory.getLogger(Configuration.class).error("Call constructor(Configuration) of class {} failed",
className, e);
}
}
return null;
}
}

View File

@ -633,8 +633,7 @@ public void run() {
// Set up the offset commit policy ...
if (offsetCommitPolicy == null) {
offsetCommitPolicy = config.getInstance(EmbeddedEngine.OFFSET_COMMIT_POLICY, OffsetCommitPolicy.class);
offsetCommitPolicy.configure(config);
offsetCommitPolicy = config.getInstance(EmbeddedEngine.OFFSET_COMMIT_POLICY, OffsetCommitPolicy.class, config);
}
// Initialize the connector using a context that does NOT respond to requests to reconfigure tasks ...

View File

@ -39,13 +39,10 @@ public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration time
*/
public static class PeriodicCommitOffsetPolicy implements OffsetCommitPolicy {
private Duration minimumTime;
private final Duration minimumTime;
@Override
public OffsetCommitPolicy configure(Configuration config) {
OffsetCommitPolicy.super.configure(config);
public PeriodicCommitOffsetPolicy(Configuration config) {
minimumTime = Duration.ofMillis(config.getLong(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS));
return this;
}
@Override
@ -59,7 +56,7 @@ static OffsetCommitPolicy always() {
}
static OffsetCommitPolicy periodic(Configuration config) {
return new PeriodicCommitOffsetPolicy().configure(config);
return new PeriodicCommitOffsetPolicy(config);
}
/**
@ -93,8 +90,4 @@ default OffsetCommitPolicy and(OffsetCommitPolicy other) {
if ( other == null ) return this;
return (number, time) -> this.performCommit(number, time) && other.performCommit(number, time);
}
default OffsetCommitPolicy configure(Configuration config) {
return this;
}
}