DBZ-1 Expanded documentation and improved EmbeddedConnector framework

Changed the EmbeddedConnector framework to initialize all major components via configuration properties rather than through the public builder. This increases the size of the configurations, but it simplifies what embedding applications must do to obtain an EmbeddedConnector instance.

The DatabaseHistory framework was also changed to be configurable in a similar way to the OffsetBackingStore. Essentially, connectors that want to use it (like the MySqlConnector) describe it as part of the connector's configuration, allowing more flexibility in which DatabaseHistory implementation is used and how it is configured, whether the connector runs in Kafka Connect or inside the EmbeddedConnector.
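For example, a connector configuration can now select and configure the history store purely through properties (a sketch; the property names come from the fields changed in this commit, while the topic and broker values are hypothetical):

    // A sketch: choose a DatabaseHistory implementation and configure it with
    // prefixed properties, all within the connector's own configuration.
    Configuration config = Configuration.create()
            .with("database.history", "io.debezium.relational.history.KafkaDatabaseHistory")
            .with("database.history.kafka.topic", "schema-changes.inventory")   // hypothetical topic name
            .with("database.history.kafka.bootstrap.servers", "kafka:9092")     // hypothetical broker address
            .build();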

Added a README.md to `debezium-embedded` to provide documentation and sample code showing how to use the EmbeddedConnector.
Randall Hauch 2016-02-03 14:11:53 -06:00
parent 0e58dba9d6
commit 37d6a5e7da
13 changed files with 369 additions and 107 deletions


@ -106,11 +106,13 @@ public void start(Map<String, String> props) {
// Create and configure the database history ...
this.dbHistory = config.getInstance(MySqlConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
if (this.dbHistory == null) {
this.logger.error("Unable to instantiate the database history class {}",config.getString(MySqlConnectorConfig.DATABASE_HISTORY));
this.logger.error("Unable to instantiate the database history class {}",
config.getString(MySqlConnectorConfig.DATABASE_HISTORY));
return;
}
Configuration dbHistoryConfig = config.subset(MySqlConnectorConfig.DATABASE_HISTORY.name() + ".", true);
Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIG_PREFIX, false); // do not remove prefix
this.dbHistory.configure(dbHistoryConfig);
this.dbHistory.start();
// Read the configuration ...
final String user = config.getString(MySqlConnectorConfig.USER);
@ -241,12 +243,18 @@ public List<SourceRecord> poll() throws InterruptedException {
@Override
public void stop() {
try {
dbHistory.stop();
} catch (Throwable e) {
logger.error("Unexpected error shutting down the database history", e);
} finally {
try {
client.disconnect();
} catch (IOException e) {
logger.error("Unexpected error when disconnecting from the MySQL binary log reader", e);
}
}
}
/**
* Adds the event into the queue for subsequent batch processing.


@ -6,10 +6,12 @@
package io.debezium.connector.mysql;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.KafkaDatabaseHistory;
import io.debezium.util.Collect;
@ -65,17 +67,20 @@ public class MySqlConnectorConfig {
.withValidation(Field::isPositiveInteger);
public static final Field POLL_INTERVAL_MS = Field.create("poll.interval.ms")
.withDescription("Frequency in milliseconds to poll for new change events.")
.withDefault(1 * 1000)
.withDescription("Frequency in milliseconds to poll for new change events. Defaults to 1 second (1000 ms)")
.withDefault(TimeUnit.SECONDS.toMillis(1))
.withValidation(Field::isPositiveInteger);
public static final Field DATABASE_HISTORY = Field.create("database.history")
.withDescription("The name of the DatabaseHistory class that should be used to store and recover database schema changes. "
+ "The configuration properties for the history can be specified with the 'database.history.' prefix.")
+ "The configuration properties for the history can be specified with the '"
+ DatabaseHistory.CONFIG_PREFIX + "' prefix.")
.withDefault(KafkaDatabaseHistory.class.getName());
public static final Field INCLUDE_SCHEMA_CHANGES = Field.create("include.schema.changes")
.withDescription("Whether schema changes should be included in the change events")
.withDescription("Whether schema changes should be included in the "
+ "change events (in addition to storing in the database "
+ "history). The default is 'false'.")
.withDefault(false)
.withValidation(Field::isBoolean);


@ -66,6 +66,17 @@ public static interface ConfigBuilder<C extends Configuration, B extends ConfigB
*/
B with(String key, String value);
/**
* Associate the given value with the specified key.
*
* @param key the key
* @param value the value
* @return this builder object so methods can be chained together; never null
*/
default B with(String key, Object value) {
return with(key, value != null ? value.toString() : null);
}
/**
* Associate the given value with the specified key.
*
@ -130,6 +141,17 @@ default B with(String key, boolean value) {
*/
B withDefault(String key, String value);
/**
* If there is no field with the specified key, then associate the given value with the specified key.
*
* @param key the key
* @param value the value
* @return this builder object so methods can be chained together; never null
*/
default B withDefault(String key, Object value) {
return withDefault(key, value != null ? value.toString() : null);
}
/**
* If there is no field with the specified key, then associate the given value with the specified key.
*
@ -417,6 +439,11 @@ public Set<String> keys() {
public String getString(String key) {
return null;
}
@Override
public String toString() {
return "{}";
}
};
}
@ -1029,6 +1056,11 @@ public String getString(String key) {
String oldKey = newToOld.get(key);
return Configuration.this.getString(oldKey);
}
@Override
public String toString() {
return asProperties().toString();
}
};
}
@ -1053,6 +1085,11 @@ public Set<String> keys() {
public String getString(String key) {
return matcher.test(key) ? Configuration.this.getString(key) : null;
}
@Override
public String toString() {
return asProperties().toString();
}
};
}


@ -311,6 +311,10 @@ public static boolean isInteger(String value) {
return true;
}
public static boolean isNonNegativeInteger(String value) {
return value != null ? Integer.parseInt(value) >= 0 : true;
}
public static boolean isPositiveInteger(String value) {
return value != null ? Integer.parseInt(value) > 0 : true;
}


@ -32,6 +32,11 @@ public void configure(Configuration config) {
this.config = config;
}
@Override
public void start() {
// do nothing
}
@Override
public final void record(Map<String, ?> source, Map<String, ?> position, String databaseName, Tables schema, String ddl) {
storeRecord(new HistoryRecord(source, position, databaseName, ddl));
@ -56,7 +61,7 @@ public final void recover(Map<String, ?> source, Map<String, ?> position, Tables
protected abstract void recoverRecords(Tables schema, DdlParser ddlParser, Consumer<HistoryRecord> records);
@Override
public void shutdown() {
public void stop() {
// do nothing
}
}


@ -20,12 +20,19 @@
*/
public interface DatabaseHistory {
public static final String CONFIG_PREFIX = "database.history.";
/**
* Configure this instance.
* @param config the configuration for this history store
*/
void configure(Configuration config);
/**
* Start the history.
*/
void start();
/**
* Record a change to the schema of the named database, and store it in the schema storage.
*
@ -55,5 +62,5 @@ public interface DatabaseHistory {
/**
* Stop recording history and release any resources acquired since {@link #configure(Configuration)}.
*/
void shutdown();
void stop();
}


@ -35,7 +35,7 @@
public final class FileDatabaseHistory extends AbstractDatabaseHistory {
@SuppressWarnings("unchecked")
public static final Field FILE_PATH = Field.create("path")
public static final Field FILE_PATH = Field.create(CONFIG_PREFIX + "file.filename")
.withDescription("The path to the file that will be used to record the database history")
.withValidation(Field::isRequired);


@ -37,12 +37,12 @@
public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
@SuppressWarnings("unchecked")
public static final Field TOPIC = Field.create("topic")
public static final Field TOPIC = Field.create(CONFIG_PREFIX + "kafka.topic")
.withDescription("The name of the topic for the database schema history")
.withValidation(Field::isRequired);
@SuppressWarnings("unchecked")
public static final Field BOOTSTRAP_SERVERS = Field.create("bootstrap.servers")
public static final Field BOOTSTRAP_SERVERS = Field.create(CONFIG_PREFIX + "kafka.bootstrap.servers")
.withDescription("A list of host/port pairs that the connector will use for establishing the initial "
+ "connection to the Kafka cluster for retrieving database schema history previously stored "
+ "by the connector. This should point to the same Kafka cluster used by the Kafka Connect "
@ -83,6 +83,11 @@ public void configure(Configuration config) {
.withDefault("key.deserializer", StringDeserializer.class.getName())
.withDefault("value.deserializer", StringDeserializer.class.getName())
.build();
}
@Override
public void start() {
super.start();
this.producer = new KafkaProducer<>(this.producerConfig.asProperties());
}
@ -114,12 +119,12 @@ protected void recoverRecords(Tables schema, DdlParser ddlParser, Consumer<Histo
}
@Override
public void shutdown() {
public void stop() {
try {
if (this.producer != null) this.producer.close();
} finally {
this.producer = null;
super.shutdown();
super.stop();
}
}
}

debezium-embedded/README.md (new file, 183 lines added)

@ -0,0 +1,183 @@
# Embedding Debezium connectors in applications
Debezium connectors are normally operated by deploying them to a Kafka Connect service, and configuring one or more connectors to monitor upstream databases and produce data change events for all changes that they see in the upstream databases. Those data change events are written to Kafka, where they can be independently consumed by many different applications. Kafka Connect provides excellent fault tolerance and scalability, since it runs as a distributed service and ensures that all registered and configured connectors are always running. For example, even if one of the Kafka Connect endpoints in a cluster goes down, the remaining Kafka Connect endpoints will restart any connectors that were previously running on the now-terminated endpoint.
Not every application needs this level of fault tolerance and reliability, and some may not want to rely upon an external cluster of Kafka brokers and Kafka Connect services. Instead, some applications would prefer to *embed* Debezium connectors directly within the application space. They still want the same data change events, but prefer to have the connectors send them directly to the application rather than persist them inside Kafka.
This `debezium-embedded` module defines a small library that allows an application to easily configure and run Debezium connectors.
## Dependencies
To use this module, add the `debezium-embedded` module to your application's dependencies. For Maven, this entails adding the following to your application's POM:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
</dependency>
where `${version.debezium}` is either the version of Debezium you're using or a Maven property whose value contains the Debezium version string.
Likewise, add dependencies for any of Debezium's connectors that your application will use. For example, the following can be added to your application's Maven POM file so your application can use the MySQL connector:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${version.debezium}</version>
</dependency>
## In the code
Your application needs to set up an `EmbeddedConnector` for each connector instance you want to run. The `io.debezium.embedded.EmbeddedConnector` class serves as an easy-to-use wrapper around any Kafka Connect connector and completely manages the real connector's lifecycle. Basically, you create the `EmbeddedConnector` with a configuration that defines the environment for the `EmbeddedConnector` and the properties for the underlying connector. You also provide it with a function that the `EmbeddedConnector` will call whenever the underlying connector produces a data change event.
Here's an example of code that configures and runs an embedded MySQL connector:
// Define the configuration for the embedded and MySQL connector ...
Configuration config = Configuration.create()
/* begin embedded connector properties */
.with("name", "my-sql-connector")
.with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", "/path/to/storage/offset.dat")
.with("offset.flush.interval.ms", 60000)
/* begin wrapped connector properties */
.with("database.hostname", "localhost")
.with("database.port", 3306)
.with("database.user", "mysqluser")
.with("database.password", "mysqlpw")
.with("server.id", 85744)
.with("server.name", "my-app-connector")
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename", "/path/to/storage/dbhistory.dat")
.build();
// Create the connector with this configuration ...
EmbeddedConnector connector = EmbeddedConnector.create()
.using(config)
.notifying(this::handleEvent)
.build();
// Run the connector asynchronously ...
Executor executor = ...
executor.execute(connector);
// At some later time ...
connector.stop();
Let's look into this code in more detail, starting with the first few lines that we repeat here:
// Define the configuration for the embedded and MySQL connector ...
Configuration config = Configuration.create()
/* begin embedded connector properties */
.with("name", "mysql-connector")
.with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", "/path/to/storage/offset.dat")
.with("offset.flush.interval.ms", 60000)
This creates a new `Configuration` object and, using a builder-style API, sets several fields required by the `EmbeddedConnector`. The first is a name for the connector that will be used within the source records produced by the connector, so use something meaningful in your application. The `connector.class` field defines the name of the class that extends the Kafka Connect `org.apache.kafka.connect.source.SourceConnector` abstract class; in this example, we specify Debezium's `MySqlConnector` class.
When a Kafka Connect connector runs, it reads information from the source and periodically records "offsets" that define how much of that information it has processed. Should the connector be restarted, it will use the last recorded offset to know where in the source information it should resume reading. The next few fields specify that the embedded connector should use the `FileOffsetBackingStore` class to store offsets in the `/path/to/storage/offset.dat` file on the local file system (the file can be named anything). Additionally, although the connector records an offset with every source record it produces, offsets are only flushed to the store once every minute. These fields can be tailored as needed for your application.
The next few lines define the fields that are specific to the `MySqlConnector`:
/* begin wrapped connector properties */
.with("database.hostname", "localhost")
.with("database.port", 3306)
.with("database.user", "mysqluser")
.with("database.password", "mysqlpw")
.with("server.id", 85744)
.with("server.name", "products")
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename", "/path/to/storage/dbhistory.dat")
.build();
Here, we set the name of the host machine and port number where the MySQL database server is running, and we define the username and password that will be used to connect to the MySQL database. The username and password should correspond to a MySQL database user that has been granted the [`REPLICATION SLAVE` privilege](http://dev.mysql.com/doc/refman/5.7/en/replication-howto-repuser.html).
The configuration also includes a numeric identifier for the `server.id`. The `MySqlConnector` class joins the MySQL server group so that it can read the database server's binlog, so this server ID must be [unique within all processes that make up the MySQL server group](http://dev.mysql.com/doc/refman/5.7/en/replication-howto-masterbaseconfig.html) and can be any integer between 1 and 2^32 - 1.
The configuration also specifies a logical name for the server, which again is included in the topic field of every source record produced by the connector and allows your application to discern the origin of those records. Our example uses a server name of "products", presumably because the database contains product information. Of course, you can name this anything meaningful to your application.
When the `MySqlConnector` class runs, it reads the MySQL server's binlog that includes all data changes and schema changes that are made to the databases hosted by the server. The connector records the schema information so that, should the connector be restarted and resume reading from the last recorded offset, it knows exactly what the database schemas looked like at that point in time. Having accurate schema information is essential to properly decode the data changes recorded in the binlog. The last two fields of our configuration specify that our connector should use the `FileDatabaseHistory` class to store database schema history changes in the `/path/to/storage/dbhistory.dat` file on the local file system (which can be named anything).
Finally, the immutable configuration is built using the `build()` method.
The next few lines of our sample code create the `EmbeddedConnector` instance, and are repeated here:
// Create the connector with this configuration ...
EmbeddedConnector connector = EmbeddedConnector.create()
.using(config)
.notifying(this::handleEvent)
.build();
Again, a fluent-style builder API is used to create a connector that uses our `Configuration` object and that sends all data change records to the `handleEvent(SourceRecord)` method. Your application can reference any method that matches the signature of the `java.util.function.Consumer<SourceRecord>` functional interface, where `SourceRecord` is the `org.apache.kafka.connect.source.SourceRecord` class. Note that the method your application uses to handle the `SourceRecord` objects produced by the connector should handle all possible errors; although any exception thrown by the method will be logged and the connector will continue operating with the next source record, your application will not have another chance to handle that particular source record.
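For instance, a handler method with the right signature might look like this (a sketch; `handleEvent` is the hypothetical method referenced above, and an SLF4J `logger` field is assumed):

    // A sketch of a handler matching java.util.function.Consumer<SourceRecord>.
    // It catches its own exceptions, since the connector will not redeliver a
    // record whose handling failed.
    private void handleEvent(SourceRecord record) {
        try {
            // Do something useful with the record, e.g. inspect its topic and value ...
            System.out.println(record.topic() + ": " + record.value());
        } catch (Throwable t) {
            logger.error("Failed to handle change event {}", record, t);
        }
    }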
At this point, we have an `EmbeddedConnector` object that is configured and ready to run, but it doesn't do anything yet. To run the connector, we recommend submitting it to an `Executor` or `ExecutorService` so that it executes asynchronously:
// Run the connector asynchronously ...
Executor executor = ...
executor.execute(connector);
Your application can stop the connector safely and gracefully by calling the `stop()` method on the connector:
// At some later time ...
connector.stop();
The connector will stop reading information from the source system, forward all remaining `SourceRecord` objects to your handler function, and flush the latest offsets to offset storage. Only after all of this completes will the connector execution complete. You can optionally wait for the connector to complete. For example, you can wait for at most 30 seconds using code similar to the following:
try {
connector.await(30, TimeUnit.SECONDS);
} catch ( InterruptedException e ) {
Thread.interrupted();
}
Your application will likely want to use the `boolean` response from `await(...)` to determine whether the connector actually did complete before the 30-second timeout period, and if not to perhaps wait again. (Only if the connector is running on a non-daemon thread will the JVM wait for the thread to complete before exiting.)
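For example, the `boolean` return value can drive a simple wait loop (a sketch):

    // A sketch: keep waiting in 30-second increments until the connector reports
    // that it has completely stopped.
    try {
        while (!connector.await(30, TimeUnit.SECONDS)) {
            System.out.println("Still waiting for the connector to shut down...");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();   // preserve the interrupted status
    }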
Note that care should be taken to properly stop the connector rather than simply shutting down the `ExecutorService`. Although the `EmbeddedConnector` will respond correctly when interrupted, the interrupt may arrive while the application's handler function is processing a record, which may result in inconsistent or incomplete handling of that source record. The result might be that upon restart the application receives some `SourceRecord`s that it had already processed prior to interruption. Therefore, using `stop()` is far superior, since it allows the connector to gracefully complete all work and ensures that the application never receives any duplicate source records or misses any source records.


@ -25,7 +25,7 @@
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
@ -76,12 +76,32 @@ public final class EmbeddedConnector implements Runnable {
.withDescription("The Java class for the connector")
.withValidation(Field::isRequired);
/**
* An optional field that specifies the name of the class that implements the {@link OffsetBackingStore} interface,
* and that will be used to store offsets recorded by the connector.
*/
public static final Field OFFSET_STORAGE = Field.create("offset.storage")
.withDescription("The Java class that implements the `OffsetBackingStore` "
+ "interface, used to periodically store offsets so that, upon "
+ "restart, the connector can resume where it last left off.")
.withDefault(FileOffsetBackingStore.class.getName());
/**
* An optional advanced field that specifies the maximum amount of time that the embedded connector should wait
* for an offset commit to complete.
*/
@SuppressWarnings("unchecked")
public static final Field OFFSET_COMMIT_TIMEOUT_MS_CONFIG = Field.create("offset.flush.timeout.ms")
public static final Field OFFSET_FLUSH_INTERVAL_MS = Field.create("offset.flush.interval.ms")
.withDescription("Interval at which to try committing offsets. The default is 1 minute.")
.withDefault(60000L)
.withValidation(Field::isNonNegativeInteger);
/**
* An optional advanced field that specifies the maximum amount of time that the embedded connector should wait
* for an offset commit to complete.
*/
@SuppressWarnings("unchecked")
public static final Field OFFSET_COMMIT_TIMEOUT_MS = Field.create("offset.flush.timeout.ms")
.withDescription("Maximum number of milliseconds to wait for records to flush and partition offset data to be"
+ " committed to offset storage before cancelling the process and restoring the offset "
+ "data to be committed in a future attempt.")
@ -124,32 +144,6 @@ public static interface Builder {
*/
Builder using(Configuration config);
/**
* Use the specified {@link OffsetCommitPolicy} to determine when offsets should be written to offset storage.
* <p>
* Passing <code>null</code> or not calling this method results in the connector using all offsets
* {@link OffsetCommitPolicy#always() always} being committed after each batch of records are received from the source
* system and processed by the {@link #notifying(Consumer) consumer function}.
*
* @param policy the policy for when to commit offsets to the offset store
* @return this builder object so methods can be chained together; never null
*/
Builder using(OffsetCommitPolicy policy);
/**
* Use the specified storage mechanism for tracking how much data change history in the source database the connector
* has processed.
* <p>
* Passing <code>null</code> or not calling this method results in the connector storing offsets in-memory, which means
* when the application stops it will lose all record of how far the connector has read from the source database. If the
* application upon restart should resume reading the source database where it left off, then a durable store must be
* supplied.
*
* @param offsetStorage the store for recording connector offsets
* @return this builder object so methods can be chained together; never null
*/
Builder using(OffsetBackingStore offsetStorage);
/**
* Use the specified class loader to find all necessary classes. Passing <code>null</code> or not calling this method
* results in the connector using this class's class loader.
@ -186,8 +180,6 @@ public static interface Builder {
public static Builder create() {
return new Builder() {
private Configuration config;
private OffsetBackingStore offsetStore;
private OffsetCommitPolicy offsetCommitPolicy;
private Consumer<SourceRecord> consumer;
private ClassLoader classLoader;
private Clock clock;
@ -198,18 +190,6 @@ public Builder using(Configuration config) {
return this;
}
@Override
public Builder using(OffsetBackingStore offsetStore) {
this.offsetStore = offsetStore;
return this;
}
@Override
public Builder using(OffsetCommitPolicy policy) {
this.offsetCommitPolicy = policy;
return this;
}
@Override
public Builder using(ClassLoader classLoader) {
this.classLoader = classLoader;
@ -230,13 +210,11 @@ public Builder notifying(Consumer<SourceRecord> consumer) {
@Override
public EmbeddedConnector build() {
if (offsetStore == null) offsetStore = new MemoryOffsetBackingStore();
if (offsetCommitPolicy == null) offsetCommitPolicy = OffsetCommitPolicy.always();
if (classLoader == null) classLoader = getClass().getClassLoader();
if (clock == null) clock = Clock.system();
Objects.requireNonNull(config, "A connector configuration must be specified.");
Objects.requireNonNull(consumer, "A connector consumer must be specified.");
return new EmbeddedConnector(config, offsetStore, offsetCommitPolicy, classLoader, clock, consumer);
return new EmbeddedConnector(config, classLoader, clock, consumer);
}
};
@ -244,8 +222,6 @@ public EmbeddedConnector build() {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Configuration config;
private final OffsetBackingStore offsetStore;
private final OffsetCommitPolicy offsetCommitPolicy;
private final Clock clock;
private final ClassLoader classLoader;
private final Consumer<SourceRecord> consumer;
@ -256,17 +232,12 @@ public EmbeddedConnector build() {
private long recordsSinceLastCommit = 0;
private long timeSinceLastCommitMillis = 0;
private EmbeddedConnector(Configuration config, OffsetBackingStore offsetStore,
OffsetCommitPolicy offsetCommitPolicy, ClassLoader classLoader, Clock clock, Consumer<SourceRecord> consumer) {
private EmbeddedConnector(Configuration config, ClassLoader classLoader, Clock clock, Consumer<SourceRecord> consumer) {
this.config = config;
this.offsetStore = offsetStore;
this.offsetCommitPolicy = offsetCommitPolicy;
this.consumer = consumer;
this.classLoader = classLoader;
this.clock = clock;
assert this.config != null;
assert this.offsetStore != null;
assert this.offsetCommitPolicy != null;
assert this.consumer != null;
assert this.classLoader != null;
assert this.clock != null;
@ -302,7 +273,8 @@ protected boolean isRunning() {
* {@link #stop() stopped}.
* <p>
* Note that there are two ways to stop a connector running on a thread: calling {@link #stop()} from another thread, or
* interrupting the thread (e.g., via {@link ExecutorService#shutdownNow()}).
* interrupting the thread (e.g., via {@link ExecutorService#shutdownNow()}). However, interrupting the thread may result
* in source records being repeated upon next startup, so {@link #stop()} should always be used when possible.
*/
@Override
public void run() {
@ -324,6 +296,32 @@ public void run() {
return;
}
// Instantiate the offset store ...
final String offsetStoreClassName = config.getString(OFFSET_STORAGE);
OffsetBackingStore offsetStore = null;
try {
@SuppressWarnings("unchecked")
Class<? extends OffsetBackingStore> offsetStoreClass = (Class<OffsetBackingStore>) classLoader.loadClass(offsetStoreClassName);
offsetStore = offsetStoreClass.newInstance();
} catch (Throwable t) {
logger.error("Unable to instantiate OffsetBackingStore class {}", offsetStoreClassName, t);
return;
}
// Initialize the offset store ...
try {
offsetStore.configure(config.subset(OFFSET_STORAGE.name() + ".", false).asMap()); // subset but do not
// remove prefixes
offsetStore.start();
} catch (Throwable t) {
logger.error("Unable to configure and start the {} offset backing store", offsetStoreClassName, t);
return;
}
// Set up the offset commit policy ...
long offsetPeriodMs = config.getLong(OFFSET_FLUSH_INTERVAL_MS);
OffsetCommitPolicy offsetCommitPolicy = OffsetCommitPolicy.periodic(offsetPeriodMs, TimeUnit.MILLISECONDS);
// Initialize the connector using a context that does NOT respond to requests to reconfigure tasks ...
ConnectorContext context = () -> {};
connector.initialize(context);
@ -331,7 +329,7 @@ public void run() {
keyConverter, valueConverter);
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, connectorName,
keyConverter, valueConverter);
long commitTimeoutMs = config.getLong(OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
long commitTimeoutMs = config.getLong(OFFSET_COMMIT_TIMEOUT_MS);
try {
// Start the connector with the given properties and get the task configurations ...
@ -362,35 +360,45 @@ public void run() {
List<SourceRecord> changeRecords = task.poll(); // blocks until there are values ...
if (changeRecords != null && !changeRecords.isEmpty()) {
// First write out the last partition to offset storage ...
// First forward the records to the connector's consumer ...
for (SourceRecord record : changeRecords) {
try {
consumer.accept(record);
} catch (Throwable t) {
logger.error("Error in the application's handler method, but continuing anyway", t);
}
}
// Only then do we write out the last partition to offset storage ...
SourceRecord lastRecord = changeRecords.get(changeRecords.size() - 1);
lastRecord.sourceOffset();
offsetWriter.offset(lastRecord.sourcePartition(), lastRecord.sourceOffset());
// Now forward the records to the connector's consumer ...
for (SourceRecord record : changeRecords) {
consumer.accept(record);
}
// Flush the offsets to storage if necessary ...
recordsSinceLastCommit += changeRecords.size();
maybeFlush(offsetWriter, commitTimeoutMs);
maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeoutMs);
}
} catch (InterruptedException e) {
// This thread was interrupted, which signals that the thread should stop work ...
// but first try to commit the offsets ...
maybeFlush(offsetWriter, commitTimeoutMs);
// This thread was interrupted, which signals that the thread should stop work.
// We first try to commit the offsets, since we record them only after the records were handled
// by the consumer ...
maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeoutMs);
// Then clear the interrupted status ...
Thread.interrupted();
return;
break;
}
}
} catch (Throwable t) {
logger.error("Error while running to instantiate connector class {}", connectorClassName, t);
} finally {
// Close the offset storage and finally the connector ...
try {
offsetStore.stop();
} finally {
connector.stop();
}
}
}
} finally {
latch.countDown();
running.set(false);
@ -402,11 +410,12 @@ public void run() {
* Determine if we should flush offsets to storage, and if so then attempt to flush offsets.
*
* @param offsetWriter the offset storage writer; may not be null
* @param policy the offset commit policy; may not be null
* @param commitTimeoutMs the timeout to wait for commit results
*/
protected void maybeFlush(OffsetStorageWriter offsetWriter, long commitTimeoutMs) {
protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy policy, long commitTimeoutMs) {
// Determine if we need to commit to offset storage ...
if (this.offsetCommitPolicy.performCommit(recordsSinceLastCommit, timeSinceLastCommitMillis,
if (policy.performCommit(recordsSinceLastCommit, timeSinceLastCommitMillis,
TimeUnit.MILLISECONDS)) {
long started = clock.currentTimeInMillis();


@ -28,13 +28,15 @@ static OffsetCommitPolicy always() {
}
/**
* Get an {@link OffsetCommitPolicy} that will commit offsets no more than the specified time period.
* Get an {@link OffsetCommitPolicy} that will commit offsets no more than the specified time period. If the {@code minimumTime}
* is not positive, then this method returns {@link #always()}.
*
* @param minimumTime the minimum amount of time between committing offsets; must be positive
* @param minimumTime the minimum amount of time between committing offsets
* @param timeUnit the time unit for {@code minimumTime}; may not be null
* @return the offset commit policy; never null
*/
static OffsetCommitPolicy periodic(long minimumTime, TimeUnit timeUnit) {
if ( minimumTime <= 0 ) return always();
return (number, actualTime, actualUnit) -> {
return timeUnit.convert(actualTime, actualUnit) >= minimumTime;
};
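For illustration, this guard means callers can pass any period and still get a usable policy (a usage sketch, not part of the diff):

    // A sketch: commit offsets at most once per minute; a non-positive period now
    // simply falls back to committing after every batch.
    OffsetCommitPolicy everyMinute = OffsetCommitPolicy.periodic(1, TimeUnit.MINUTES);
    OffsetCommitPolicy everyBatch = OffsetCommitPolicy.periodic(0, TimeUnit.SECONDS); // same as always()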


@ -20,8 +20,7 @@
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -39,7 +38,8 @@ public class EmbeddedConnectorTest implements Testing {
private static final int NUMBER_OF_LINES = 10;
private static final Path TEST_FILE_PATH = Testing.Files.createTestingPath("file-connector-input.txt");
private static final Path TEST_FILE_PATH = Testing.Files.createTestingPath("file-connector-input.txt").toAbsolutePath();
private static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath("file-connector-offsets.txt").toAbsolutePath();
private static final Charset UTF8 = StandardCharsets.UTF_8;
private ExecutorService executor;
@ -48,7 +48,6 @@ public class EmbeddedConnectorTest implements Testing {
private BlockingQueue<SourceRecord> consumedLines;
private int nextConsumedLineNumber;
private int linesAdded;
private OffsetBackingStore offsetStore;
@Before
public void beforeEach() throws Exception {
@ -56,6 +55,7 @@ public void beforeEach() throws Exception {
linesAdded = 0;
consumedLines = new ArrayBlockingQueue<>(100);
Testing.Files.delete(TEST_FILE_PATH);
Testing.Files.delete(OFFSET_STORE_PATH);
inputFile = Testing.Files.createTestingFile(TEST_FILE_PATH);
executor = Executors.newFixedThreadPool(1);
}
@ -67,9 +67,6 @@ public void afterEach() {
@Test
public void shouldStartAndUseFileConnectorUsingMemoryOffsetStorage() throws Exception {
// Set up the offset store ...
offsetStore = new MemoryOffsetBackingStore();
// Add initial content to the file ...
appendLinesToSource(NUMBER_OF_LINES);
@ -78,11 +75,11 @@ public void shouldStartAndUseFileConnectorUsingMemoryOffsetStorage() throws Exce
.using(Configuration.create()
.with(EmbeddedConnector.CONNECTOR_NAME, "file-connector")
.with(EmbeddedConnector.CONNECTOR_CLASS, FileStreamSourceConnector.class.getName())
.with(FileStreamSourceConnector.FILE_CONFIG, inputFile.getAbsolutePath())
.with(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH)
.with(EmbeddedConnector.OFFSET_FLUSH_INTERVAL_MS, 0)
.with(FileStreamSourceConnector.FILE_CONFIG, TEST_FILE_PATH)
.with(FileStreamSourceConnector.TOPIC_CONFIG, "topicX")
.build())
.using(offsetStore)
.using(OffsetCommitPolicy.always())
.notifying(consumedLines::add)
.build();


@ -24,7 +24,7 @@
<scm>
<connection>scm:git:git@github.com:debezium/debezium.git</connection>
<developerConnection>scm:git:git@github.com:debezium/debezium.git</developerConnection>
<url>git@github.com:debezium/debezium.git</url>
<url>https://github.com/debezium/debezium</url>
</scm>
<issueManagement>
<system>jira</system>