DBZ-1 Renamed EmbeddedConnector to EmbeddedEngine and improved README

Randall Hauch 2016-02-03 15:33:57 -06:00
parent 37d6a5e7da
commit fbae6d75c8
5 changed files with 82 additions and 115 deletions


@@ -1,6 +1,6 @@
# Embedding Debezium connectors in applications
Debezium connectors are normally operated by deploying them to a Kafka Connect service, and configuring one or more connectors to monitor upstream databases and produce data change events for all changes that they see in the upstream databases. Those data change events are written to Kafka, where they can be independently consumed by many different applications. Kafka Connect provides excellent fault tolerance and scalability, since it runs as a distributed service and ensures that all registered and configured connectors are always running. For example, even if one of the Kafka Connect endpoints in a cluster goes down, the remaining Kafka Connect endpoints will restart any connectors that were previously running on the now-terminated endpoint.
Debezium connectors are normally operated by deploying them to a Kafka Connect service, and configuring one or more connectors to monitor upstream databases and produce data change events for all changes that they see in the upstream databases. Those data change events are written to Kafka, where they can be independently consumed by many different applications. Kafka Connect provides excellent fault tolerance and scalability, since it runs as a distributed service and ensures that all registered and configured connectors are always running. For example, even if one of the Kafka Connect endpoints in a cluster goes down, the remaining Kafka Connect endpoints will restart any connectors that were previously running on the now-terminated endpoint, minimizing downtime and eliminating administrative activities.
Not every application needs this level of fault tolerance and reliability, and some may not want to rely upon an external cluster of Kafka brokers and Kafka Connect services. Instead, some applications would prefer to *embed* Debezium connectors directly within the application space. They still want the same data change events, but prefer to have the connectors send them directly to the application rather than persist them inside Kafka.
@@ -18,7 +18,7 @@ To use this module, add the `debezium-embedded` module to your application's dep
where `${version.debezium}` is either the version of Debezium you're using or a Maven property whose value contains the Debezium version string.
Likewise, add dependencies for any of Debezium's connectors that your application will use. For example, the following can be added to your application's Maven POM file so your application can use the MySQL connector:
Likewise, add dependencies for each of the Debezium connectors that your application will use. For example, the following can be added to your application's Maven POM file so your application can use the MySQL connector:
<dependency>
<groupId>io.debezium</groupId>
@@ -28,64 +28,19 @@ Likewise, add dependencies for any of Debezium's connectors that your applicatio
## In the code
Your application needs to set up an `EmbeddedConnector` for each connector instance you want to run. The `io.debezium.embedded.EmbeddedConnector` class serves as an easy-to-use wrapper around any Kafka Connect connector and completely manages the real connector's lifecycle. Basically, you create the `EmbeddedConnector` with a configuration that defines the environment for the `EmbeddedConnector` and the properties for the underlying connector. You also provide it with a function that the `EmbeddedConnector` will call whenever the underlying connector produces a data change event.
Let's see what this looks like when we use the MySQL connector:
// Define the configuration for the embedded and MySQL connector ...
Configuration config = Configuration.create()
.with(EmbeddedConnector.CONNECTOR_NAME, "file-connector")
.with(EmbeddedConnector.CONNECTOR_CLASS, "io.debezium.connector.mysql.MySqlConnector")
.with(MySqlConnectorConfig.HOSTNAME, "localhost")
.with(MySqlConnectorConfig.PORT, 3306)
.with(MySqlConnectorConfig.USER, "mysqluser")
.with(MySqlConnectorConfig.PASSWORD, "mysqlpw")
.with(MySqlConnectorConfig.SERVER_ID, 85744)
.with(MySqlConnectorConfig.SERVER_NAME, "my-app-connector")
.build();
# Embedding Debezium connectors in applications
Debezium connectors are normally operated by deploying them to a Kafka Connect service, and configuring one or more connectors to monitor upstream databases and produce data change events for all changes that they see in the upstream databases. Those data change events are written to Kafka, where they can be independently consumed by many different applications. Kafka Connect provides excellent fault tolerance and scalability, since it runs as a distributed service and ensures that all registered and configured connectors are always running. For example, even if one of the Kafka Connect endpoints in a cluster goes down, the remaining Kafka Connect endpoints will restart any connectors that were previously running on the now-terminated endpoint.
Not every application needs this level of fault tolerance and reliability, and some may not want to rely upon an external cluster of Kafka brokers and Kafka Connect services. Instead, some applications would prefer to *embed* Debezium connectors directly within the application space. They still want the same data change events, but prefer to have the connectors send them directly to the application rather than persist them inside Kafka.
This `debezium-embedded` module defines a small library that allows an application to easily configure and run Debezium connectors.
## Dependencies
To use this module, add the `debezium-embedded` module to your application's dependencies. For Maven, this entails adding the following to your application's POM:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
</dependency>
where `${version.debezium}` is either the version of Debezium you're using or a Maven property whose value contains the Debezium version string.
Likewise, add dependencies for any of Debezium's connectors that your application will use. For example, the following can be added to your application's Maven POM file so your application can use the MySQL connector:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${version.debezium}</version>
</dependency>
## In the code
Your application needs to set up an `EmbeddedConnector` for each connector instance you want to run. The `io.debezium.embedded.EmbeddedConnector` class serves as an easy-to-use wrapper around any Kafka Connect connector and completely manages the real connector's lifecycle. Basically, you create the `EmbeddedConnector` with a configuration that defines the environment for the `EmbeddedConnector` and the properties for the underlying connector. You also provide it with a function that the `EmbeddedConnector` will call whenever the underlying connector produces a data change event.
Your application needs to set up an embedded engine for each connector instance you want to run. The `io.debezium.embedded.EmbeddedEngine` class serves as an easy-to-use wrapper around any standard Kafka Connect connector and completely manages the connector's lifecycle. Basically, you create the `EmbeddedEngine` with a configuration (perhaps loaded from a properties file) that defines the environment for both the engine and the connector. You also provide the engine with a function that it will call for every data change event produced by the connector.
Here's an example of code that configures and runs an embedded MySQL connector:
// Define the configuration for the embedded and MySQL connector ...
Configuration config = Configuration.create()
/* begin embedded connector properties */
/* begin engine properties */
.with("name", "my-sql-connector")
.with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", "/path/to/storage/offset.dat")
.with("offset.flush.interval.ms", 60000)
/* begin wrapped connector properties */
/* begin connector properties */
.with("database.hostname", "localhost")
.with("database.port", 3306)
.with("database.user", "mysqluser")
@@ -96,37 +51,37 @@ Here's an example of code that configures and runs an embedded MySQL connector:
.with("database.history.file.filename", "/path/to/storage/dbhistory.dat")
.build();
// Create the connector with this configuration ...
EmbeddedConnector connector = EmbeddedConnector.create()
.using(config)
.notifying(this::handleEvent)
.build();
// Create the engine with this configuration ...
EmbeddedEngine engine = EmbeddedEngine.create()
.using(config)
.notifying(this::handleEvent)
.build();
// Run the connector asynchronously ...
// Run the engine asynchronously ...
Executor executor = ...
executor.execute(connector);
executor.execute(engine);
// At some later time ...
connector.stop();
engine.stop();
Let's look into this code in more detail, starting with the first few lines that we repeat here:
// Define the configuration for the embedded and MySQL connector ...
Configuration config = Configuration.create()
/* begin embedded connector properties */
/* begin engine properties */
.with("name", "mysql-connector")
.with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", "/path/to/storage/offset.dat")
.with("offset.flush.interval.ms", 60000)
This creates a new `Configuration` object and, using a builder-style API, sets several fields required by the `EmbeddedConnector`. The first is a name for the connector that will be used within the source records produced by the connector, so use something meaningful in your application. The `connector.class` field defines the name of the class that implements the Kafka Connect `org.apache.kafka.connect.source.SourceConnector` abstract class; in this example, we specify Debezium's `MySqlConnector` class.
This creates a new `Configuration` object and uses a fluent-style builder API to set several fields required by the engine regardless of which connector is being used. The first is a name for the engine, which is used within the source records produced by the connector and in the engine's internal state, so use something meaningful in your application. The `connector.class` field defines the name of the class that extends the Kafka Connect `org.apache.kafka.connect.source.SourceConnector` abstract class; in this example, we specify Debezium's `MySqlConnector` class.
When a Kafka Connect connector runs, it reads information from the source and periodically records "offsets" that define how much of that information it has processed. Should the connector be restarted, it will use the last recorded offset to know where in the source information it should resume reading. The next few fields define that the embedded connector should use the `FileOffsetBackingStore` class to store offsets in the `/path/to/storage/offset.dat` file on the local file system (which can be named anything). Additionally, although the connector produces offsets with each source record it produces, offsets should be flushed to the store once every minute. These fields can be tailored as needed for your application.
When a Kafka Connect connector runs, it reads information from the source and periodically records "offsets" that define how much of that information it has processed. Should the connector be restarted, it will use the last recorded offset to know where in the source information it should resume reading. Since connectors don't know or care *how* the offsets are stored, it is up to the engine to provide a way to store and recover these offsets. The next few fields of our configuration specify that our engine should use the `FileOffsetBackingStore` class to store offsets in the `/path/to/storage/offset.dat` file on the local file system (the file can be named anything and stored anywhere). Additionally, although the connector records the offsets with every source record it produces, the engine flushes the offsets to the backing store periodically (in our case, once each minute). These fields can be tailored as needed for your application.
The next few lines define the fields that are specific to the `MySqlConnector`:
The next few lines define the fields that are specific to the connector, which in our example is the `MySqlConnector` connector:
/* begin wrapped connector properties */
/* begin connector properties */
.with("database.hostname", "localhost")
.with("database.port", 3306)
.with("database.user", "mysqluser")
@@ -137,47 +92,58 @@ The next few lines define the fields that are specific to the `MySqlConnector`:
.with("database.history.file.filename", "/path/to/storage/dbhistory.dat")
.build();
Here, we set the name of the host machine and port number where the MySQL database server is running, and we define the username and password that will be used to connect to the MySQL database. The username and password should correspond to a MySQL database user that has been granted the [`REPLICATION SLAVE` privilege](http://dev.mysql.com/doc/refman/5.7/en/replication-howto-repuser.html).
Here, we set the name of the host machine and port number where the MySQL database server is running, and we define the username and password that will be used to connect to the MySQL database. Note that for MySQL the username and password should correspond to a MySQL database user that has been granted the [`REPLICATION SLAVE` privilege](http://dev.mysql.com/doc/refman/5.7/en/replication-howto-repuser.html), allowing the connector to read the server's binlog that is normally used for MySQL replication.
The configuration also includes a numeric identifier for the `server.id`. The `MySqlConnector` class joins the MySQL server group so that it can read the database server's binlog, so this server ID must be [unique within all processes that make up the MySQL server group](http://dev.mysql.com/doc/refman/5.7/en/replication-howto-masterbaseconfig.html) and is any integer between 1 and (2^32)-1.
The configuration also includes a numeric identifier for the `server.id`. Since MySQL's binlog is part of the MySQL replication mechanism, in order to read the binlog the `MySqlConnector` instance must join the MySQL server group, and that means this server ID must be [unique within all processes that make up the MySQL server group](http://dev.mysql.com/doc/refman/5.7/en/replication-howto-masterbaseconfig.html) and is any integer between 1 and (2^32)-1. In our code we set it to a fairly large but somewhat random value we'll use only for our application.
The configuration also specifies a logical name for the server, which again is included in the topic field of every source records produced by the connector and allows your application to discern the origin of those records. Our example uses a server name of "products", presumably because the database contains product information. Of course, you can name this anything meaningful to your application.
The configuration also specifies a logical name for the MySQL server. The connector includes this logical name within the topic field of every source record it produces, enabling your application to discern the origin of those records. Our example uses a server name of "products", presumably because the database contains product information. Of course, you can name this anything meaningful to your application.
When the `MySqlConnector` class runs, it reads the MySQL server's binlog that includes all data changes and schema changes that are made to the databases hosted by the server. The connector records the schema information so that, should the connector be restarted and resume reading from the last recorded offset, it knows exactly what the database schemas looked like at that point in time. Having accurate schema information is essential to properly decode the data changes recorded in the binlog. The last two fields of our configuration specify that our connector should use the `FileDatabaseHistory` class to store database schema history changes in the `/path/to/storage/dbhistory.dat` file on the local file system (which can be named anything).
When the `MySqlConnector` class runs, it reads the MySQL server's binlog, which includes all data changes and schema changes made to the databases hosted by the server. Since all changes to data are structured in terms of the owning table's schema at the time the change was recorded, the connector needs to track all of the schema changes so that it can properly decode the change events. The connector records the schema information so that, should the connector be restarted and resume reading from the last recorded offset, it knows exactly what the database schemas looked like at that offset. How the connector records the database schema history is defined in the last two fields of our configuration, namely that our connector should use the `FileDatabaseHistory` class to store database schema history changes in the `/path/to/storage/dbhistory.dat` file on the local file system (again, this file can be named anything and stored anywhere).
Finally the immutable configuration is built using the `build()` method.
Finally the immutable configuration is built using the `build()` method. (Incidentally, rather than build it programmatically, we could have *read* the configuration from a properties file using one of the `Configuration.read(...)` methods.)
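For illustration, here is a minimal sketch of that properties-file alternative built only from the builder API shown above; the file name and the `Configuration.Builder` type name are assumptions, and the exact `Configuration.read(...)` overloads may differ:

    // Sketch: load the connector settings from a properties file (file name is hypothetical)
    Properties props = new Properties();
    try (InputStream in = new FileInputStream("connector.properties")) {
        props.load(in);
    }
    Configuration.Builder builder = Configuration.create(); // assumed builder type name
    for (String key : props.stringPropertyNames()) {
        builder = builder.with(key, props.getProperty(key)); // copy every key-value pair
    }
    Configuration config = builder.build();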
The next few lines of our sample code create the `EmbeddedConnector` instance, and are repeated here:
Now that we have a configuration, we can create our engine. Here again are the relevant lines of code:
// Create the connector with this configuration ...
EmbeddedConnector connector = EmbeddedConnector.create()
.using(config)
.notifying(this::handleEvent)
.build();
// Create the engine with this configuration ...
EmbeddedEngine engine = EmbeddedEngine.create()
.using(config)
.notifying(this::handleEvent)
.build();
Again, a fluent-style builder API is used to create a connector that uses our `Configuration` object and that sends all data change records to the `handleEvent(SourceRecord)` method. However, your application can reference any method that matches the signature of `java.util.function.Consumer<SourceRecord>` method, where `SourceRecord` is the `org.apache.kafka.connect.source.SourceRecord` class. However, your applications method that handles all `SourceRecord` objects produced by the connector should handle all possible errors; although any exception thrown by the method will be logged and the connector will continue operating with the next source record, your application will not have another chance to handle that particular source record.
A fluent-style builder API is used to create an engine that uses our `Configuration` object and that sends all data change records to the `handleEvent(SourceRecord)` method, which can be any method that matches the signature of the `java.util.function.Consumer<SourceRecord>` functional interface, where `SourceRecord` is the `org.apache.kafka.connect.source.SourceRecord` class. Note that your application's handler function should not throw any exceptions; if it does, the engine will log any exception thrown by the method and will continue to operate on the next source record, but your application will not have another chance to handle the particular source record that caused the exception, meaning your application might become inconsistent with the database.
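For example, a handler along the following lines catches everything itself; the `process(...)` method and `logger` are hypothetical members of your application:

    public void handleEvent(SourceRecord record) {
        try {
            process(record); // hypothetical: apply the data change event to the application
        } catch (Exception e) {
            // Never let an exception escape: the engine would only log it,
            // and this record would not be delivered again.
            logger.error("Failed to handle record: " + record, e);
        }
    }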
At this point, we have an existing `EmbeddedConnector` object that is configured and ready to run, but it doesn't do anything. To execute the connector, we recommend having an `Executor` or `ExecutorService` execute the connector asynchronously:
At this point, we have an existing `EmbeddedEngine` object that is configured and ready to run, but it doesn't do anything. The `EmbeddedEngine` is designed to be executed asynchronously by an `Executor` or `ExecutorService`:
// Run the connector asynchronously ...
// Run the engine asynchronously ...
Executor executor = ...
executor.execute(connector);
executor.execute(engine);
Your application can stop the connector safely and gracefully by calling the `stop()` method on the connector:
Your application can stop the engine safely and gracefully by calling its `stop()` method:
// At some later time ...
connector.stop();
engine.stop();
The connector will stop reading information from the source system, forward all remaining `SourceRecord` objects to your handler function, and flush the latest offsets to offset storage. Only after all of this completes will the connector execution complete. You can optionally wait for the connector to complete. For example, you can wait for at most 30 seconds using code similar to the following:
The engine's connector will stop reading information from the source system, forward all remaining `SourceRecord` objects to your handler function, and flush the latest offsets to offset storage. Only after all of this completes will the engine's `run()` method return. If your application needs to wait for the engine to completely stop before exiting, you can do this with the engine's `await(...)` method:
try {
connector.await(30, TimeUnit.SECONDS);
while (!engine.await(30, TimeUnit.SECONDS)) {
logger.info("Wating another 30 seconds for the embedded enging to shut down");
}
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
}
Your application will likely want to use the `boolean` response from `await(...)` to determine if the connector actually did complete before the 30 second timeout period, and if not to perhaps wait again. (Only if the connector is running on a non-daemon thread will the VM wait for the thread to complete before exiting.)
Recall that when the JVM shuts down, it waits only for non-daemon threads. Therefore, before your application exits, be sure either to wait for completion of the engine or to run the engine on a daemon thread.
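For example, one way to run the engine on a daemon thread is a sketch like the following; the thread name is illustrative:

    // Run the engine on a daemon thread so it cannot prevent JVM exit
    ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "embedded-engine");
        thread.setDaemon(true);
        return thread;
    });
    executor.execute(engine);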
Note that care should be taken to properly stop the connector rather than simply shutdown the `ExecutorService`. Although the `EmbeddedConnector` will respond correctly when interrupted, interrupting the execution may happen during the application's handler function, which may result in inconsistent or incomplete handling of the source record. The result might be that upon restart the application receives some `SourceRecord`s that it had already processed prior to interruption. Therefore, using `stop()` is far superior since it allows the connector to gracefully complete all work and to ensure that the application never receives any duplicate source records or misses any source records.
Your application should always properly stop the engine to ensure a graceful and complete shutdown, and so that each source record is sent to the application exactly once. For example, do not rely upon shutting down the `ExecutorService`, since that interrupts the running threads. Although the `EmbeddedEngine` will indeed terminate when its thread is interrupted, the engine may not terminate cleanly, and when your application is restarted it may see some of the same source records that it had processed just prior to the shutdown.
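Putting this together, a complete shutdown sequence might look like the following sketch, assuming the executor above is an `ExecutorService`:

    // Stop the engine first, wait for it to finish, and only then release the executor
    engine.stop();
    try {
        while (!engine.await(30, TimeUnit.SECONDS)) {
            logger.info("Waiting another 30 seconds for the embedded engine to shut down");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    executor.shutdown();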
## Handling failures
When the engine executes, its connector is actively recording the source offset inside each source record, and the engine is periodically flushing those offsets to persistent storage. Whether the application and engine shut down normally or crash, upon restart the engine and its connector will resume reading the source information *from the last recorded offset*.
So, what happens when your application fails while an embedded engine is running? The net effect is that the application will likely receive some source records after restart that it had already processed right before the crash. How many depends upon how frequently the engine flushes offsets to its store (via the `offset.flush.interval.ms` property) and how many source records the specific connector returns in one batch. The best case is that the offsets are flushed every time (e.g., `offset.flush.interval.ms` is set to 0), but even then the embedded engine will still only flush the offsets after each batch of source records is received from the connector.
For example, the MySQL connector uses the `max.batch.size` property to specify the maximum number of source records that can appear in a batch. Even with `offset.flush.interval.ms` set to 0, when an application restarts after a crash it may see up to *n* duplicates, where *n* is the maximum size of the batches. If the `offset.flush.interval.ms` property is set higher, then the application may see up to `n * m` duplicates, where *n* is the maximum size of the batches and *m* is the number of batches that might accumulate during a single offset flush interval. (Obviously it is possible to configure embedded connectors to use no batching and to always flush offsets, resulting in an application never receiving any duplicate source records. However, this dramatically increases the overhead and decreases the throughput of the connectors.)
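For example, the window for duplicates can be minimized, at a significant cost in throughput, with settings along these lines (a sketch; whether a batch size this small is sensible depends on the connector):

    .with("offset.flush.interval.ms", 0) // flush offsets after every batch
    .with("max.batch.size", 1)           // effectively disable batching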
The bottom line is that when using embedded connectors, applications will receive each source record exactly once during normal operation (including restart after a graceful shutdown), but do need to be tolerant of receiving duplicate events immediately following a restart after a crash or improper shutdown. If applications need more rigorous exactly-once behavior, then they should use the full Debezium platform that can provide exactly-once guarantees (even after crashes and restarts).
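One common way to tolerate such duplicates is to make the handler idempotent, for example by durably recording each record's source partition and offset together with the application's own state. A rough sketch, where `appliedOffsets` and `applyAndRecord(...)` are hypothetical application components whose state survives restarts:

    public void handleEvent(SourceRecord record) {
        // 'appliedOffsets' must be stored durably, atomically with the application
        // state, so that replayed records can be recognized after a crash.
        if (appliedOffsets.contains(record.sourcePartition(), record.sourceOffset())) {
            return; // a duplicate replayed after a restart; already applied
        }
        applyAndRecord(record); // hypothetical: apply the change and record its offset atomically
    }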


@@ -58,13 +58,13 @@
* @author Randall Hauch
*/
@ThreadSafe
public final class EmbeddedConnector implements Runnable {
public final class EmbeddedEngine implements Runnable {
/**
* A required field for an embedded connector that specifies the unique name for the connector instance.
*/
@SuppressWarnings("unchecked")
public static final Field CONNECTOR_NAME = Field.create("name")
public static final Field ENGINE_NAME = Field.create("name")
.withDescription("Unique name for this connector instance.")
.withValidation(Field::isRequired);
@@ -119,10 +119,10 @@ public final class EmbeddedConnector implements Runnable {
/**
* The array of fields that are required by each connector.
*/
public static final Field[] CONNECTOR_FIELDS = { CONNECTOR_NAME, CONNECTOR_CLASS };
public static final Field[] CONNECTOR_FIELDS = { ENGINE_NAME, CONNECTOR_CLASS };
/**
* A builder to set up and create {@link EmbeddedConnector} instances.
* A builder to set up and create {@link EmbeddedEngine} instances.
*/
public static interface Builder {
@@ -169,11 +169,11 @@ public static interface Builder {
* @throws IllegalArgumentException if a {@link #using(Configuration) configuration} or {@link #notifying(Consumer)
* consumer function} were not supplied before this method is called
*/
EmbeddedConnector build();
EmbeddedEngine build();
}
/**
* Obtain a new {@link Builder} instance that can be used to construct runnable {@link EmbeddedConnector} instances.
* Obtain a new {@link Builder} instance that can be used to construct runnable {@link EmbeddedEngine} instances.
*
* @return the new builder; never null
*/
@@ -209,12 +209,12 @@ public Builder notifying(Consumer<SourceRecord> consumer) {
}
@Override
public EmbeddedConnector build() {
public EmbeddedEngine build() {
if (classLoader == null) classLoader = getClass().getClassLoader();
if (clock == null) clock = Clock.system();
Objects.requireNonNull(config, "A connector configuration must be specified.");
Objects.requireNonNull(consumer, "A connector consumer must be specified.");
return new EmbeddedConnector(config, classLoader, clock, consumer);
return new EmbeddedEngine(config, classLoader, clock, consumer);
}
};
@@ -232,7 +232,7 @@ public EmbeddedConnector build() {
private long recordsSinceLastCommit = 0;
private long timeSinceLastCommitMillis = 0;
private EmbeddedConnector(Configuration config, ClassLoader classLoader, Clock clock, Consumer<SourceRecord> consumer) {
private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, Consumer<SourceRecord> consumer) {
this.config = config;
this.consumer = consumer;
this.classLoader = classLoader;
@@ -284,7 +284,7 @@ public void run() {
try {
if (config.validate(CONNECTOR_FIELDS, logger::error)) {
// Instantiate the connector ...
final String connectorName = config.getString(CONNECTOR_NAME);
final String engineName = config.getString(ENGINE_NAME);
final String connectorClassName = config.getString(CONNECTOR_CLASS);
SourceConnector connector = null;
try {
@@ -325,9 +325,9 @@ public void run() {
// Initialize the connector using a context that does NOT respond to requests to reconfigure tasks ...
ConnectorContext context = () -> {};
connector.initialize(context);
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, connectorName,
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, engineName,
keyConverter, valueConverter);
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, connectorName,
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, engineName,
keyConverter, valueConverter);
long commitTimeoutMs = config.getLong(OFFSET_COMMIT_TIMEOUT_MS);
@@ -479,6 +479,6 @@ public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
@Override
public String toString() {
return "EmbeddedConnector{id=" + config.getString(CONNECTOR_NAME) + '}';
return "EmbeddedConnector{id=" + config.getString(ENGINE_NAME) + '}';
}
}


@@ -15,7 +15,7 @@
* @author Randall Hauch
*/
@FunctionalInterface
public interface OffsetCommitPolicy {
interface OffsetCommitPolicy {
/**
* Get an {@link OffsetCommitPolicy} that will commit offsets as frequently as possible. This may result in reduced


@@ -20,24 +20,25 @@
* restarts its connector, the connector can continue processing exactly where it left off.
* <h2>Usage</h2>
* <p>
* Applications do not directly work with Debezium connectors, but instead use the {@link io.debezium.embedded.EmbeddedConnector}
* class to configure and build an {@link io.debezium.embedded.EmbeddedConnector} instance that wraps and completely manages
* a standard Debezium connector. The application also provides, among other things, a function that the EmbeddedConnector will
* use to deliver data change events to the application.
* Applications do not directly work with Debezium connectors, but instead configure and build an
* {@link io.debezium.embedded.EmbeddedEngine} instance that wraps and completely manages a single standard Debezium connector.
* The application also provides the engine with a function that it will use to deliver data change events to the application.
* <p>
* Once the application has configured its {@link io.debezium.embedded.EmbeddedConnector} instance and is ready to start receiving
* data change events, the application submits the EmbeddedConnector to an {@link java.util.concurrent.Executor} or
* {@link java.util.concurrent.ExecutorService} managed by the application. The EmbeddedConnector's
* Once the application has configured its {@link io.debezium.embedded.EmbeddedEngine} instance and is ready to start receiving
* data change events, the application submits the EmbeddedEngine to an {@link java.util.concurrent.Executor} or
* {@link java.util.concurrent.ExecutorService} managed by the application. The EmbeddedEngine's
* {@link io.debezium.embedded.EmbeddedEngine#run()} method will start the standard Debezium connector and continuously
* deliver any data change events to the application.
* <p>
* When the application is ready to shut down the connector, it should call {@link EmbeddedConnector#stop()} on the
* EmbeddedConnector, which will then stop monitoring the source database, complete any current work, and gracefully shut down.
* The application can wait for the connector to complete by using the
* {@link io.debezium.embedded.EmbeddedConnector#await(long, java.util.concurrent.TimeUnit)} method.
* When the application is ready to shut down the engine, it should call {@link io.debezium.embedded.EmbeddedEngine#stop()} on the
* engine, which will then stop the connector and have it gracefully complete all current work and shut down.
* The application can wait for the engine to complete by using the
* {@link io.debezium.embedded.EmbeddedEngine#await(long, java.util.concurrent.TimeUnit)} method.
* <h2>Storing connector state</h2>
* <p>
* As Debezium connectors operate, they keep track of which information from the source database they have processed, and they
* All connector state is managed by components defined in the engine's configuration.
* <p>
* As Debezium connectors operate, they keep track of how much information from the source database they have processed, and they
* record this <em>offset information</em> in an {@link org.apache.kafka.connect.storage.OffsetBackingStore}. Kafka Connect
* provides several implementations that can be used by an application, including a
* {@link org.apache.kafka.connect.storage.FileOffsetBackingStore file-based store} and an


@@ -43,7 +43,7 @@ public class EmbeddedConnectorTest implements Testing {
private static final Charset UTF8 = StandardCharsets.UTF_8;
private ExecutorService executor;
private EmbeddedConnector connector;
private EmbeddedEngine connector;
private File inputFile;
private BlockingQueue<SourceRecord> consumedLines;
private int nextConsumedLineNumber;
@@ -71,12 +71,12 @@ public void shouldStartAndUseFileConnectorUsingMemoryOffsetStorage() throws Exce
appendLinesToSource(NUMBER_OF_LINES);
// Create the connector ...
connector = EmbeddedConnector.create()
connector = EmbeddedEngine.create()
.using(Configuration.create()
.with(EmbeddedConnector.CONNECTOR_NAME, "file-connector")
.with(EmbeddedConnector.CONNECTOR_CLASS, FileStreamSourceConnector.class.getName())
.with(EmbeddedEngine.ENGINE_NAME, "file-connector")
.with(EmbeddedEngine.CONNECTOR_CLASS, FileStreamSourceConnector.class.getName())
.with(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH)
.with(EmbeddedConnector.OFFSET_FLUSH_INTERVAL_MS, 0)
.with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)
.with(FileStreamSourceConnector.FILE_CONFIG, TEST_FILE_PATH)
.with(FileStreamSourceConnector.TOPIC_CONFIG, "topicX")
.build())