DBZ-234 Create API module with engine API

This commit is contained in:
Jiri Pechanec 2020-02-10 10:28:37 +01:00 committed by Gunnar Morling
parent 436615934d
commit 38df24a76b
15 changed files with 787 additions and 223 deletions

76
debezium-api/pom.xml Normal file
View File

@ -0,0 +1,76 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-parent</artifactId>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-api</artifactId>
<name>Debezium API</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easytesting</groupId>
<artifactId>fest-assert</artifactId>
<scope>test</scope>
</dependency>
<!-- Used for unit testing with Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${version.kafka.scala}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<!-- Apply the properties set in the POM to the resource files -->
<resource>
<filtering>true</filtering>
<directory>src/main/resources</directory>
<includes>
<include>**/build.properties</include>
</includes>
</resource>
</resources>
</build>
</project>

View File

@ -0,0 +1,23 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
/**
* Marks the annotated element as incubating. The contract of incubating elements (e.g. packages, types, methods,
* constants etc.) is under active development and may be incompatibly altered - or removed - in subsequent releases.
* <p>
* Usage of incubating API/SPI members is encouraged (so the development team can get feedback on these new features)
* but you should be prepared for updating code which is using them as needed.
*
*/
@Documented
@Retention(RetentionPolicy.CLASS)
public @interface Incubating {
}

View File

@ -0,0 +1,239 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.engine;
import java.time.Clock;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.Incubating;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.errors.DebeziumException;
/**
* A mechanism for running a single Kafka Connect {@link SourceConnector} within an application's process. The engine
* is entirely standalone and only talks with the source system; no Kafka, Kafka Connect, or Zookeeper processes are needed.
* Applications using the engine simply set one up and supply a {@link Consumer consumer function} to which the
* engine will pass all records containing database change events.
* <p>
* With the engine, the application that runs the connector assumes all responsibility for fault tolerance,
* scalability, and durability. Additionally, applications must specify how the engine can store its relational database
* schema history and offsets. By default, this information will be stored in memory and will thus be lost upon application
* restart.
* <p>
* Engine Is designed to be submitted to an {@link Executor} or {@link ExecutorService} for execution by a single
* thread, and a running connector can be stopped either by calling {@link #stop()} from another thread or by interrupting
* the running thread (e.g., as is the case with {@link ExecutorService#shutdownNow()}).
*
* @author Randall Hauch
*/
@Incubating
public interface DebeziumEngine<R> extends Runnable {
public static final String OFFSET_FLUSH_INTERVAL_MS_PROP = "offset.flush.interval.ms";
/**
* A callback function to be notified when the connector completes.
*/
public interface CompletionCallback {
/**
* Handle the completion of the embedded connector engine.
*
* @param success {@code true} if the connector completed normally, or {@code false} if the connector produced an error
* that prevented startup or premature termination.
* @param message the completion message; never null
* @param error the error, or null if there was no exception
*/
void handle(boolean success, String message, Throwable error);
}
/**
* Callback function which informs users about the various stages a connector goes through during startup
*/
public interface ConnectorCallback {
/**
* Called after a connector has been successfully started by the engine; i.e. {@link SourceConnector#start(Map)} has
* completed successfully
*/
default void connectorStarted() {
// nothing by default
}
/**
* Called after a connector has been successfully stopped by the engine; i.e. {@link SourceConnector#stop()} has
* completed successfully
*/
default void connectorStopped() {
// nothing by default
}
/**
* Called after a connector task has been successfully started by the engine; i.e. {@link SourceTask#start(Map)} has
* completed successfully
*/
default void taskStarted() {
// nothing by default
}
/**
* Called after a connector task has been successfully stopped by the engine; i.e. {@link SourceTask#stop()} has
* completed successfully
*/
default void taskStopped() {
// nothing by default
}
}
/**
* Contract passed to {@link ChangeConsumer}s, allowing them to commit single records as they have been processed
* and to signal that offsets may be flushed eventually.
*/
public static interface RecordCommitter<R> {
/**
* Marks a single record as processed, must be called for each
* record.
*
* @param record the record to commit
*/
void markProcessed(R record) throws InterruptedException;
/**
* Marks a batch as finished, this may result in committing offsets/flushing
* data.
* <p>
* Should be called when a batch of records is finished being processed.
*/
void markBatchFinished();
}
/**
* A contract invoked by the embedded engine when it has received a batch of change records to be processed. Allows
* to process multiple records in one go, acknowledging their processing once that's done.
*/
public static interface ChangeConsumer<R> {
/**
* Handles a batch of records, calling the {@link RecordCommitter#markProcessed(SourceRecord)}
* for each record and {@link RecordCommitter#markBatchFinished()} when this batch is finished.
* @param records the records to be processed
* @param committer the committer that indicates to the system that we are finished
*/
void handleBatch(List<R> records, RecordCommitter<R> committer) throws InterruptedException;
}
/**
* A builder to set up and create {@link DebeziumEngine} instances.
*/
public static interface Builder<R> {
/**
* Call the specified function for every {@link SourceRecord data change event} read from the source database.
* This method must be called with a non-null consumer.
*
* @param consumer the consumer function
* @return this builder object so methods can be chained together; never null
*/
Builder<R> notifying(Consumer<R> consumer);
/**
* Pass a custom ChangeConsumer override the default implementation,
* this allows for more complex handling of records for batch and async handling
*
* @param handler the consumer function
* @return this builder object so methods can be chained together; never null
*/
Builder<R> notifying(ChangeConsumer<R> handler);
/**
* Use the specified configuration for the connector. The configuration is assumed to already be valid.
*
* @param config the configuration
* @return this builder object so methods can be chained together; never null
*/
Builder<R> using(Properties config);
/**
* Use the specified class loader to find all necessary classes. Passing <code>null</code> or not calling this method
* results in the connector using this class's class loader.
*
* @param classLoader the class loader
* @return this builder object so methods can be chained together; never null
*/
Builder<R> using(ClassLoader classLoader);
/**
* Use the specified clock when needing to determine the current time. Passing <code>null</code> or not calling this
* method results in the connector using the {@link Clock#system() system clock}.
*
* @param clock the clock
* @return this builder object so methods can be chained together; never null
*/
Builder<R> using(Clock clock);
/**
* When the engine's {@link DebeziumEngine#run()} method completes, call the supplied function with the results.
*
* @param completionCallback the callback function; may be null if errors should be written to the log
* @return this builder object so methods can be chained together; never null
*/
Builder<R> using(CompletionCallback completionCallback);
/**
* During the engine's {@link DebeziumEngine#run()} method, call the supplied the supplied function at different
* stages according to the completion state of each component running within the engine (connectors, tasks etc)
*
* @param connectorCallback the callback function; may be null
* @return this builder object so methods can be chained together; never null
*/
Builder<R> using(ConnectorCallback connectorCallback);
/**
* During the engine's {@link DebeziumEngine#run()} method, decide when the offsets
* should be committed into the {@link OffsetBackingStore}.
* @param policy
* @return this builder object so methods can be chained together; never null
*/
Builder<R> using(OffsetCommitPolicy policy);
/**
* Build a new connector with the information previously supplied to this builder.
*
* @return the embedded connector; never null
* @throws IllegalArgumentException if a {@link #using(Properties) configuration} or {@link #notifying(Consumer)
* consumer function} were not supplied before this method is called
*/
DebeziumEngine<R> build();
}
/**
* Obtain a new {@link Builder} instance that can be used to construct runnable {@link DebeziumEngine} instances.
*
* @return the new builder; never null
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Builder<T> create(Class<T> recordClass) {
final ServiceLoader<Builder> loader = ServiceLoader.load(Builder.class);
final Iterator<Builder> iterator = loader.iterator();
if (!iterator.hasNext()) {
throw new DebeziumException("No implementation of Debezium engine builder was found");
}
final Builder builder = iterator.next();
if (iterator.hasNext()) {
LoggerFactory.getLogger(Builder.class).warn("More than one Debezium engine builder implementation was found, using {}", builder.getClass());
}
return builder;
}
}

View File

@ -0,0 +1,97 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.engine.spi;
import java.time.Duration;
import java.util.Properties;
import io.debezium.annotation.Incubating;
import io.debezium.engine.DebeziumEngine;
/**
* The policy that defines when the offsets should be committed to offset storage.
*
* @author Randall Hauch
*/
@Incubating
@FunctionalInterface
public interface OffsetCommitPolicy {
/**
* An {@link OffsetCommitPolicy} that will commit offsets as frequently as possible. This may result in reduced
* performance, but it has the least potential for seeing source records more than once upon restart.
*/
public static class AlwaysCommitOffsetPolicy implements OffsetCommitPolicy {
@Override
public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) {
return true;
}
}
/**
* An {@link OffsetCommitPolicy} that will commit offsets no more than the specified time period. If the specified
* time is less than {@code 0} then the policy will behave as {@link AlwaysCommitOffsetPolicy}.
* @see io.debezium.engine.DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS
*/
public static class PeriodicCommitOffsetPolicy implements OffsetCommitPolicy {
private final Duration minimumTime;
public PeriodicCommitOffsetPolicy(Properties config) {
minimumTime = Duration.ofMillis(Long.valueOf(config.getProperty(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP)));
}
@Override
public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) {
return timeSinceLastCommit.compareTo(minimumTime) >= 0;
}
}
static OffsetCommitPolicy always() {
return new AlwaysCommitOffsetPolicy();
}
static OffsetCommitPolicy periodic(Properties config) {
return new PeriodicCommitOffsetPolicy(config);
}
/**
* Determine if a commit of the offsets should be performed.
*
* @param numberOfMessagesSinceLastCommit the number of messages that have been received from the connector since last
* the offsets were last committed; never negative
* @param timeSinceLastCommit the time that has elapsed since the offsets were last committed; never negative
* @return {@code true} if the offsets should be committed, or {@code false} otherwise
*/
boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit);
/**
* Obtain a new {@link OffsetCommitPolicy} that will commit offsets if this policy OR the other requests it.
*
* @param other the other commit policy; if null, then this policy instance is returned as is
* @return the resulting policy; never null
*/
default OffsetCommitPolicy or(OffsetCommitPolicy other) {
if (other == null) {
return this;
}
return (number, time) -> this.performCommit(number, time) || other.performCommit(number, time);
}
/**
* Obtain a new {@link OffsetCommitPolicy} that will commit offsets if both this policy AND the other requests it.
*
* @param other the other commit policy; if null, then this policy instance is returned as is
* @return the resulting policy; never null
*/
default OffsetCommitPolicy and(OffsetCommitPolicy other) {
if (other == null) {
return this;
}
return (number, time) -> this.performCommit(number, time) && other.performCommit(number, time);
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.errors;
/**
* Base exception returned by Debezium API.
*
*/
public class DebeziumException extends RuntimeException {
private static final long serialVersionUID = -829914184849944524L;
public DebeziumException() {
}
public DebeziumException(String message) {
super(message);
}
public DebeziumException(Throwable cause) {
super(cause);
}
public DebeziumException(String message, Throwable cause) {
super(message, cause);
}
public DebeziumException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.errors;
/**
* An exception that is used to tell the engine to process the last source record and to then stop. When raised by
* {@link Consumer} implementations passed to {@link DebeziumEngine.Builder#notifying(Consumer)}, this exception should
* only be raised after that consumer has safely processed the passed event.
*
* @author Randall Hauch
*/
public class StopEngineException extends DebeziumException {
private static final long serialVersionUID = 1L;
public StopEngineException(String msg) {
super(msg);
}
}

View File

@ -0,0 +1,90 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.engine;
import static org.fest.assertions.Assertions.assertThat;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;
import io.debezium.engine.spi.OffsetCommitPolicy;
/**
* @author Randall Hauch
*/
public class OffsetCommitPolicyTest {
@Test
public void shouldAlwaysCommit() {
OffsetCommitPolicy policy = OffsetCommitPolicy.always();
assertThat(policy.performCommit(0, Duration.ofNanos(0))).isTrue();
assertThat(policy.performCommit(10000, Duration.ofDays(1000))).isTrue();
}
@Test
public void shouldCommitPeriodically() {
// 10 hours
final Properties props = new Properties();
props.setProperty(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP, Integer.toString(10 * 60 * 60 * 1000));
OffsetCommitPolicy policy = OffsetCommitPolicy.periodic(props);
assertThat(policy.performCommit(0, Duration.ofNanos(0))).isFalse();
assertThat(policy.performCommit(10000, Duration.ofHours(9))).isFalse();
assertThat(policy.performCommit(0, Duration.ofHours(10))).isTrue();
}
@Test
public void shouldCombineTwoPolicies() {
AtomicBoolean commitFirst = new AtomicBoolean(false);
AtomicBoolean commitSecond = new AtomicBoolean(false);
OffsetCommitPolicy policy1 = (num, time) -> commitFirst.get();
OffsetCommitPolicy policy2 = (num, time) -> commitSecond.get();
OffsetCommitPolicy both1 = policy1.and(policy2);
OffsetCommitPolicy both2 = policy2.and(policy1);
OffsetCommitPolicy either1 = policy1.or(policy2);
OffsetCommitPolicy either2 = policy2.or(policy1);
assertThat(both1.performCommit(0, Duration.ofNanos(0))).isFalse();
assertThat(both2.performCommit(0, Duration.ofNanos(0))).isFalse();
assertThat(either1.performCommit(0, Duration.ofNanos(0))).isFalse();
assertThat(either2.performCommit(0, Duration.ofNanos(0))).isFalse();
commitFirst.set(true);
assertThat(both1.performCommit(0, Duration.ofNanos(0))).isFalse();
assertThat(both2.performCommit(0, Duration.ofNanos(0))).isFalse();
assertThat(either1.performCommit(0, Duration.ofNanos(0))).isTrue();
assertThat(either2.performCommit(0, Duration.ofNanos(0))).isTrue();
commitSecond.set(true);
assertThat(both1.performCommit(0, Duration.ofNanos(0))).isTrue();
assertThat(both2.performCommit(0, Duration.ofNanos(0))).isTrue();
assertThat(either1.performCommit(0, Duration.ofNanos(0))).isTrue();
assertThat(either2.performCommit(0, Duration.ofNanos(0))).isTrue();
commitFirst.set(false);
assertThat(both1.performCommit(0, Duration.ofNanos(0))).isFalse();
assertThat(both2.performCommit(0, Duration.ofNanos(0))).isFalse();
assertThat(either1.performCommit(0, Duration.ofNanos(0))).isTrue();
assertThat(either2.performCommit(0, Duration.ofNanos(0))).isTrue();
commitSecond.set(false);
assertThat(both1.performCommit(0, Duration.ofNanos(0))).isFalse();
assertThat(both2.performCommit(0, Duration.ofNanos(0))).isFalse();
assertThat(either1.performCommit(0, Duration.ofNanos(0))).isFalse();
assertThat(either2.performCommit(0, Duration.ofNanos(0))).isFalse();
}
@Test
public void shouldCombineOnePolicyWithNull() {
AtomicBoolean commit = new AtomicBoolean(false);
OffsetCommitPolicy policy1 = (num, time) -> commit.get();
assertThat(policy1.and(null)).isSameAs(policy1);
assertThat(policy1.or(null)).isSameAs(policy1);
}
}

View File

@ -11,6 +11,10 @@
<name>Debezium Core</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>

View File

@ -77,6 +77,7 @@
<directory>src/main/resources</directory>
<includes>
<include>**/build.properties</include>
<include>**/META-INF/services/*</include>
</includes>
</resource>
</resources>

View File

@ -10,6 +10,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@ -44,7 +45,9 @@
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.embedded.spi.OffsetCommitPolicy;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.errors.StopEngineException;
import io.debezium.util.Clock;
import io.debezium.util.VariableLatch;
@ -66,7 +69,7 @@
* @author Randall Hauch
*/
@ThreadSafe
public final class EmbeddedEngine implements Runnable {
public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
/**
* A required field for an embedded connector that specifies the unique name for the connector instance.
@ -160,7 +163,7 @@ public final class EmbeddedEngine implements Runnable {
.withDescription("The fully-qualified class name of the commit policy type. This class must implement the interface "
+ OffsetCommitPolicy.class.getName()
+ ". The default is a periodic commit policy based upon time intervals.")
.withDefault(OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName())
.withDefault(io.debezium.embedded.spi.OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName())
.withValidation(Field::isClassName);
protected static final Field INTERNAL_KEY_CONVERTER_CLASS = Field.create("internal.key.converter")
@ -186,57 +189,116 @@ public final class EmbeddedEngine implements Runnable {
private static final Duration WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_DEFAULT = Duration.ofSeconds(2);
private static final String WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_PROP = "debezium.embedded.shutdown.pause.before.interrupt.ms";
public static final class BuilderImpl implements Builder {
private Configuration config;
private DebeziumEngine.ChangeConsumer<SourceRecord> handler;
private ClassLoader classLoader;
private Clock clock;
private DebeziumEngine.CompletionCallback completionCallback;
private DebeziumEngine.ConnectorCallback connectorCallback;
private OffsetCommitPolicy offsetCommitPolicy = null;
@Override
public Builder using(Configuration config) {
this.config = config;
return this;
}
@Override
public Builder using(Properties config) {
this.config = Configuration.from(config);
return this;
}
@Override
public Builder using(ClassLoader classLoader) {
this.classLoader = classLoader;
return this;
}
@Override
public Builder using(Clock clock) {
this.clock = clock;
return this;
}
@Override
public Builder using(DebeziumEngine.CompletionCallback completionCallback) {
this.completionCallback = completionCallback;
return this;
}
@Override
public Builder using(DebeziumEngine.ConnectorCallback connectorCallback) {
this.connectorCallback = connectorCallback;
return this;
}
@Override
public Builder using(OffsetCommitPolicy offsetCommitPolicy) {
this.offsetCommitPolicy = offsetCommitPolicy;
return this;
}
@Override
public Builder notifying(Consumer<SourceRecord> consumer) {
this.handler = buildDefaultChangeConsumer(consumer);
return this;
}
@Override
public Builder notifying(DebeziumEngine.ChangeConsumer<SourceRecord> handler) {
this.handler = handler;
return this;
}
@Override
public Builder using(java.time.Clock clock) {
return using(new Clock() {
@Override
public long currentTimeInMillis() {
return clock.millis();
}
});
}
@Override
public EmbeddedEngine build() {
if (classLoader == null) {
classLoader = getClass().getClassLoader();
}
if (clock == null) {
clock = Clock.system();
}
Objects.requireNonNull(config, "A connector configuration must be specified.");
Objects.requireNonNull(handler, "A connector consumer or changeHandler must be specified.");
return new EmbeddedEngine(config, classLoader, clock,
handler, completionCallback, connectorCallback, offsetCommitPolicy);
}
// backward compatibility methods
@Override
public Builder using(CompletionCallback completionCallback) {
return using((DebeziumEngine.CompletionCallback) completionCallback);
}
@Override
public Builder using(ConnectorCallback connectorCallback) {
return using((DebeziumEngine.ConnectorCallback) connectorCallback);
}
}
/**
* A callback function to be notified when the connector completes.
*/
public interface CompletionCallback {
/**
* Handle the completion of the embedded connector engine.
*
* @param success {@code true} if the connector completed normally, or {@code false} if the connector produced an error
* that prevented startup or premature termination.
* @param message the completion message; never null
* @param error the error, or null if there was no exception
*/
void handle(boolean success, String message, Throwable error);
public interface CompletionCallback extends DebeziumEngine.CompletionCallback {
}
/**
* Callback function which informs users about the various stages a connector goes through during startup
*/
public interface ConnectorCallback {
/**
* Called after a connector has been successfully started by the engine; i.e. {@link SourceConnector#start(Map)} has
* completed successfully
*/
default void connectorStarted() {
// nothing by default
}
/**
* Called after a connector has been successfully stopped by the engine; i.e. {@link SourceConnector#stop()} has
* completed successfully
*/
default void connectorStopped() {
// nothing by default
}
/**
* Called after a connector task has been successfully started by the engine; i.e. {@link SourceTask#start(Map)} has
* completed successfully
*/
default void taskStarted() {
// nothing by default
}
/**
* Called after a connector task has been successfully stopped by the engine; i.e. {@link SourceTask#stop()} has
* completed successfully
*/
default void taskStopped() {
// nothing by default
}
public interface ConnectorCallback extends DebeziumEngine.ConnectorCallback {
}
/**
@ -350,38 +412,14 @@ public boolean hasError() {
* and to signal that offsets may be flushed eventually.
*/
@ThreadSafe
public static interface RecordCommitter {
/**
* Marks a single record as processed, must be called for each
* record.
*
* @param record the record to commit
*/
void markProcessed(SourceRecord record) throws InterruptedException;
/**
* Marks a batch as finished, this may result in committing offsets/flushing
* data.
* <p>
* Should be called when a batch of records is finished being processed.
*/
void markBatchFinished();
public static interface RecordCommitter extends DebeziumEngine.RecordCommitter<SourceRecord> {
}
/**
* A contract invoked by the embedded engine when it has received a batch of change records to be processed. Allows
* to process multiple records in one go, acknowledging their processing once that's done.
*/
public static interface ChangeConsumer {
/**
* Handles a batch of records, calling the {@link RecordCommitter#markProcessed(SourceRecord)}
* for each record and {@link RecordCommitter#markBatchFinished()} when this batch is finished.
* @param records the records to be processed
* @param committer the committer that indicates to the system that we are finished
*/
void handleBatch(List<SourceRecord> records, RecordCommitter committer) throws InterruptedException;
public static interface ChangeConsumer extends DebeziumEngine.ChangeConsumer<SourceRecord> {
}
private static ChangeConsumer buildDefaultChangeConsumer(Consumer<SourceRecord> consumer) {
@ -399,14 +437,14 @@ private static ChangeConsumer buildDefaultChangeConsumer(Consumer<SourceRecord>
* @throws Exception
*/
@Override
public void handleBatch(List<SourceRecord> records, RecordCommitter committer) throws InterruptedException {
public void handleBatch(List<SourceRecord> records, DebeziumEngine.RecordCommitter<SourceRecord> committer) throws InterruptedException {
try {
for (SourceRecord record : records) {
try {
consumer.accept(record);
committer.markProcessed(record);
}
catch (StopConnectorException ex) {
catch (StopConnectorException | StopEngineException ex) {
// ensure that we mark the record as finished
// in this case
committer.markProcessed(record);
@ -424,25 +462,7 @@ public void handleBatch(List<SourceRecord> records, RecordCommitter committer) t
/**
* A builder to set up and create {@link EmbeddedEngine} instances.
*/
public static interface Builder {
/**
* Call the specified function for every {@link SourceRecord data change event} read from the source database.
* This method must be called with a non-null consumer.
*
* @param consumer the consumer function
* @return this builder object so methods can be chained together; never null
*/
Builder notifying(Consumer<SourceRecord> consumer);
/**
* Pass a custom ChangeConsumer override the default implementation,
* this allows for more complex handling of records for batch and async handling
*
* @param handler the consumer function
* @return this builder object so methods can be chained together; never null
*/
Builder notifying(ChangeConsumer handler);
public static interface Builder extends DebeziumEngine.Builder<SourceRecord> {
/**
* Use the specified configuration for the connector. The configuration is assumed to already be valid.
@ -452,15 +472,6 @@ public static interface Builder {
*/
Builder using(Configuration config);
/**
* Use the specified class loader to find all necessary classes. Passing <code>null</code> or not calling this method
* results in the connector using this class's class loader.
*
* @param classLoader the class loader
* @return this builder object so methods can be chained together; never null
*/
Builder using(ClassLoader classLoader);
/**
* Use the specified clock when needing to determine the current time. Passing <code>null</code> or not calling this
* method results in the connector using the {@link Clock#system() system clock}.
@ -470,38 +481,19 @@ public static interface Builder {
*/
Builder using(Clock clock);
/**
* When the engine's {@link EmbeddedEngine#run()} method completes, call the supplied function with the results.
*
* @param completionCallback the callback function; may be null if errors should be written to the log
* @return this builder object so methods can be chained together; never null
*/
// backward compatibility methods
Builder notifying(Consumer<SourceRecord> consumer);
Builder notifying(DebeziumEngine.ChangeConsumer<SourceRecord> handler);
Builder using(ClassLoader classLoader);
Builder using(CompletionCallback completionCallback);
/**
* During the engine's {@link EmbeddedEngine#run()} method, call the supplied the supplied function at different
* stages according to the completion state of each component running within the engine (connectors, tasks etc)
*
* @param connectorCallback the callback function; may be null
* @return this builder object so methods can be chained together; never null
*/
Builder using(ConnectorCallback connectorCallback);
/**
* During the engine's {@link EmbeddedEngine#run()} method, decide when the offsets
* should be committed into the {@link OffsetBackingStore}.
* @param policy
* @return this builder object so methods can be chained together; never null
*/
Builder using(OffsetCommitPolicy policy);
/**
* Build a new connector with the information previously supplied to this builder.
*
* @return the embedded connector; never null
* @throws IllegalArgumentException if a {@link #using(Configuration) configuration} or {@link #notifying(Consumer)
* consumer function} were not supplied before this method is called
*/
EmbeddedEngine build();
}
@ -511,87 +503,16 @@ public static interface Builder {
* @return the new builder; never null
*/
public static Builder create() {
return new Builder() {
private Configuration config;
private ChangeConsumer handler;
private ClassLoader classLoader;
private Clock clock;
private CompletionCallback completionCallback;
private ConnectorCallback connectorCallback;
private OffsetCommitPolicy offsetCommitPolicy = null;
@Override
public Builder using(Configuration config) {
this.config = config;
return this;
}
@Override
public Builder using(ClassLoader classLoader) {
this.classLoader = classLoader;
return this;
}
@Override
public Builder using(Clock clock) {
this.clock = clock;
return this;
}
@Override
public Builder using(CompletionCallback completionCallback) {
this.completionCallback = completionCallback;
return this;
}
@Override
public Builder using(ConnectorCallback connectorCallback) {
this.connectorCallback = connectorCallback;
return this;
}
@Override
public Builder using(OffsetCommitPolicy offsetCommitPolicy) {
this.offsetCommitPolicy = offsetCommitPolicy;
return this;
}
@Override
public Builder notifying(Consumer<SourceRecord> consumer) {
this.handler = buildDefaultChangeConsumer(consumer);
return this;
}
@Override
public Builder notifying(ChangeConsumer handler) {
this.handler = handler;
return this;
}
@Override
public EmbeddedEngine build() {
if (classLoader == null) {
classLoader = getClass().getClassLoader();
}
if (clock == null) {
clock = Clock.system();
}
Objects.requireNonNull(config, "A connector configuration must be specified.");
Objects.requireNonNull(handler, "A connector consumer or changeHandler must be specified.");
return new EmbeddedEngine(config, classLoader, clock,
handler, completionCallback, connectorCallback, offsetCommitPolicy);
}
};
return new BuilderImpl();
}
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Configuration config;
private final Clock clock;
private final ClassLoader classLoader;
private final ChangeConsumer handler;
private final CompletionCallback completionCallback;
private final ConnectorCallback connectorCallback;
private final DebeziumEngine.ChangeConsumer<SourceRecord> handler;
private final DebeziumEngine.CompletionCallback completionCallback;
private final DebeziumEngine.ConnectorCallback connectorCallback;
private final AtomicReference<Thread> runningThread = new AtomicReference<>();
private final VariableLatch latch = new VariableLatch(0);
private final Converter keyConverter;
@ -604,8 +525,8 @@ public EmbeddedEngine build() {
private SourceTask task;
private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, ChangeConsumer handler,
CompletionCallback completionCallback, ConnectorCallback connectorCallback,
private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, DebeziumEngine.ChangeConsumer<SourceRecord> handler,
DebeziumEngine.CompletionCallback completionCallback, DebeziumEngine.ConnectorCallback connectorCallback,
OffsetCommitPolicy offsetCommitPolicy) {
this.config = config;
this.handler = handler;
@ -693,7 +614,7 @@ public void run() {
final String engineName = config.getString(ENGINE_NAME);
final String connectorClassName = config.getString(CONNECTOR_CLASS);
final Optional<ConnectorCallback> connectorCallback = Optional.ofNullable(this.connectorCallback);
final Optional<DebeziumEngine.ConnectorCallback> connectorCallback = Optional.ofNullable(this.connectorCallback);
// Only one thread can be in this part of the method at a time ...
latch.countUp();
try {
@ -766,7 +687,7 @@ public void raiseError(Exception e) {
try {
// Start the connector with the given properties and get the task configurations ...
connector.start(config.asMap());
connectorCallback.ifPresent(ConnectorCallback::connectorStarted);
connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStarted);
List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
Class<? extends Task> taskClass = connector.taskClass();
task = null;
@ -791,7 +712,7 @@ public Map<String, String> configs() {
};
task.initialize(taskContext);
task.start(taskConfigs.get(0));
connectorCallback.ifPresent(ConnectorCallback::taskStarted);
connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStarted);
}
catch (Throwable t) {
// Mask the passwords ...
@ -857,7 +778,7 @@ public Map<String, String> configs() {
// First stop the task ...
logger.debug("Stopping the task and engine");
task.stop();
connectorCallback.ifPresent(ConnectorCallback::taskStopped);
connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStopped);
// Always commit offsets that were captured from the source records we actually processed ...
commitOffsets(offsetWriter, commitTimeout, task);
if (handlerError == null) {
@ -884,7 +805,7 @@ public Map<String, String> configs() {
finally {
try {
connector.stop();
connectorCallback.ifPresent(ConnectorCallback::connectorStopped);
connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStopped);
}
catch (Throwable t) {
fail("Error while trying to stop connector class '" + connectorClassName + "'", t);

View File

@ -7,7 +7,7 @@
import java.util.function.Consumer;
import org.apache.kafka.connect.errors.ConnectException;
import io.debezium.errors.DebeziumException;
/**
* An exception that is used to tell the connector to process the last source record and to then stop. When raised by
@ -16,7 +16,7 @@
*
* @author Randall Hauch
*/
public class StopConnectorException extends ConnectException {
public class StopConnectorException extends DebeziumException {
private static final long serialVersionUID = 1L;

View File

@ -18,7 +18,7 @@
* @author Randall Hauch
*/
@FunctionalInterface
public interface OffsetCommitPolicy {
public interface OffsetCommitPolicy extends io.debezium.engine.spi.OffsetCommitPolicy {
/**
* An {@link OffsetCommitPolicy} that will commit offsets as frequently as possible. This may result in reduced
@ -59,16 +59,6 @@ static OffsetCommitPolicy periodic(Configuration config) {
return new PeriodicCommitOffsetPolicy(config);
}
/**
* Determine if a commit of the offsets should be performed.
*
* @param numberOfMessagesSinceLastCommit the number of messages that have been received from the connector since last
* the offsets were last committed; never negative
* @param timeSinceLastCommit the time that has elapsed since the offsets were last committed; never negative
* @return {@code true} if the offsets should be committed, or {@code false} otherwise
*/
boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit);
/**
* Obtain a new {@link OffsetCommitPolicy} that will commit offsets if this policy OR the other requests it.
*

View File

@ -0,0 +1 @@
io.debezium.embedded.EmbeddedEngine$BuilderImpl

View File

@ -13,6 +13,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -26,6 +27,7 @@
import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.engine.DebeziumEngine;
import io.debezium.util.Collect;
import io.debezium.util.LoggingContext;
import io.debezium.util.Testing;
@ -149,6 +151,63 @@ public void shouldWorkToUseCustomChangeConsumer() throws Exception {
stopConnector();
}
@Test
public void shouldRunDebeziumEngine() throws Exception {
// Add initial content to the file ...
appendLinesToSource(NUMBER_OF_LINES);
final Properties props = new Properties();
props.setProperty("name", "debezium-engine");
props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
props.setProperty("offset.flush.interval.ms", "0");
props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
props.setProperty("topic", "topicX");
CountDownLatch firstLatch = new CountDownLatch(1);
CountDownLatch allLatch = new CountDownLatch(6);
// create an engine with our custom class
final DebeziumEngine<SourceRecord> engine = DebeziumEngine.create(SourceRecord.class)
.using(props)
.notifying((records, committer) -> {
assertThat(records.size()).isGreaterThanOrEqualTo(NUMBER_OF_LINES);
Integer groupCount = records.size() / NUMBER_OF_LINES;
for (SourceRecord r : records) {
committer.markProcessed(r);
}
committer.markBatchFinished();
firstLatch.countDown();
for (int i = 0; i < groupCount; i++) {
allLatch.countDown();
}
})
.using(this.getClass().getClassLoader())
.build();
ExecutorService exec = Executors.newFixedThreadPool(1);
exec.execute(() -> {
LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
engine.run();
});
firstLatch.await(5000, TimeUnit.MILLISECONDS);
assertThat(firstLatch.getCount()).isEqualTo(0);
for (int i = 0; i < 5; i++) {
// Add a few more lines, and then verify they are consumed ...
appendLinesToSource(NUMBER_OF_LINES);
Thread.sleep(10);
}
allLatch.await(5000, TimeUnit.MILLISECONDS);
assertThat(allLatch.getCount()).isEqualTo(0);
// Stop the connector ...
stopConnector();
}
protected void appendLinesToSource(int numberOfLines) throws IOException {
CharSequence[] lines = new CharSequence[numberOfLines];
for (int i = 0; i != numberOfLines; ++i) {

View File

@ -155,6 +155,7 @@
<modules>
<module>support/checkstyle</module>
<module>support/ide-configs</module>
<module>debezium-api</module>
<module>debezium-ddl-parser</module>
<module>debezium-assembly-descriptors</module>
<module>debezium-core</module>
@ -408,6 +409,11 @@
</dependency>
<!-- Debezium artifacts -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>