From 38df24a76bf5184228c2f79ec0c86ef1b280426b Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Mon, 10 Feb 2020 10:28:37 +0100 Subject: [PATCH] DBZ-234 Create API module with engine API --- debezium-api/pom.xml | 76 ++++ .../io/debezium/annotation/Incubating.java | 23 ++ .../io/debezium/engine/DebeziumEngine.java | 239 ++++++++++++ .../engine/spi/OffsetCommitPolicy.java | 97 +++++ .../io/debezium/errors/DebeziumException.java | 35 ++ .../debezium/errors/StopEngineException.java | 22 ++ .../engine/OffsetCommitPolicyTest.java | 90 +++++ debezium-core/pom.xml | 4 + debezium-embedded/pom.xml | 1 + .../io/debezium/embedded/EmbeddedEngine.java | 341 +++++++----------- .../embedded/StopConnectorException.java | 4 +- .../embedded/spi/OffsetCommitPolicy.java | 12 +- .../io.debezium.engine.DebeziumEngine$Builder | 1 + .../debezium/embedded/EmbeddedEngineTest.java | 59 +++ pom.xml | 6 + 15 files changed, 787 insertions(+), 223 deletions(-) create mode 100644 debezium-api/pom.xml create mode 100644 debezium-api/src/main/java/io/debezium/annotation/Incubating.java create mode 100644 debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java create mode 100644 debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java create mode 100644 debezium-api/src/main/java/io/debezium/errors/DebeziumException.java create mode 100644 debezium-api/src/main/java/io/debezium/errors/StopEngineException.java create mode 100644 debezium-api/src/test/java/io/debezium/engine/OffsetCommitPolicyTest.java create mode 100644 debezium-embedded/src/main/resources/META-INF/services/io.debezium.engine.DebeziumEngine$Builder diff --git a/debezium-api/pom.xml b/debezium-api/pom.xml new file mode 100644 index 000000000..88f7bac5c --- /dev/null +++ b/debezium-api/pom.xml @@ -0,0 +1,76 @@ + + + + io.debezium + debezium-parent + 1.1.0-SNAPSHOT + ../pom.xml + + 4.0.0 + debezium-api + Debezium API + jar + + + org.slf4j + slf4j-api + provided + + + + + org.slf4j + slf4j-log4j12 + test + + + 
log4j + log4j + test + + + junit + junit + test + + + org.easytesting + fest-assert + test + + + + + org.apache.kafka + kafka_${version.kafka.scala} + test + + + org.apache.curator + curator-test + test + + + org.apache.zookeeper + zookeeper + test + + + io.confluent + kafka-connect-avro-converter + test + + + + + + + true + src/main/resources + + **/build.properties + + + + + diff --git a/debezium-api/src/main/java/io/debezium/annotation/Incubating.java b/debezium-api/src/main/java/io/debezium/annotation/Incubating.java new file mode 100644 index 000000000..0991ce1b2 --- /dev/null +++ b/debezium-api/src/main/java/io/debezium/annotation/Incubating.java @@ -0,0 +1,23 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +/** + * Marks the annotated element as incubating. The contract of incubating elements (e.g. packages, types, methods, + * constants etc.) is under active development and may be incompatibly altered - or removed - in subsequent releases. + *
<p>
+ * Usage of incubating API/SPI members is encouraged (so the development team can get feedback on these new features) + * but you should be prepared for updating code which is using them as needed. + * + */ +@Documented +@Retention(RetentionPolicy.CLASS) +public @interface Incubating { +} diff --git a/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java b/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java new file mode 100644 index 000000000..d98c223e8 --- /dev/null +++ b/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java @@ -0,0 +1,239 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.engine; + +import java.time.Clock; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.ServiceLoader; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; + +import org.slf4j.LoggerFactory; + +import io.debezium.annotation.Incubating; +import io.debezium.engine.spi.OffsetCommitPolicy; +import io.debezium.errors.DebeziumException; + +/** + * A mechanism for running a single Kafka Connect {@link SourceConnector} within an application's process. The engine + * is entirely standalone and only talks with the source system; no Kafka, Kafka Connect, or Zookeeper processes are needed. + * Applications using the engine simply set one up and supply a {@link Consumer consumer function} to which the + * engine will pass all records containing database change events. + *
<p>
+ * With the engine, the application that runs the connector assumes all responsibility for fault tolerance, + * scalability, and durability. Additionally, applications must specify how the engine can store its relational database + * schema history and offsets. By default, this information will be stored in memory and will thus be lost upon application + * restart. + *
<p>
+ * The engine is designed to be submitted to an {@link Executor} or {@link ExecutorService} for execution by a single + * thread, and a running connector can be stopped either by calling {@link #stop()} from another thread or by interrupting + * the running thread (e.g., as is the case with {@link ExecutorService#shutdownNow()}). + * + * @author Randall Hauch + */ +@Incubating +public interface DebeziumEngine extends Runnable { + + public static final String OFFSET_FLUSH_INTERVAL_MS_PROP = "offset.flush.interval.ms"; + + /** + * A callback function to be notified when the connector completes. + */ + public interface CompletionCallback { + /** + * Handle the completion of the embedded connector engine. + * + * @param success {@code true} if the connector completed normally, or {@code false} if the connector produced an error + * that prevented startup or caused premature termination. + * @param message the completion message; never null + * @param error the error, or null if there was no exception + */ + void handle(boolean success, String message, Throwable error); + } + + /** + * Callback function which informs users about the various stages a connector goes through during startup + */ + public interface ConnectorCallback { + + /** + * Called after a connector has been successfully started by the engine; i.e. {@link SourceConnector#start(Map)} has + * completed successfully + */ + default void connectorStarted() { + // nothing by default + } + + /** + * Called after a connector has been successfully stopped by the engine; i.e. {@link SourceConnector#stop()} has + * completed successfully + */ + default void connectorStopped() { + // nothing by default + } + + /** + * Called after a connector task has been successfully started by the engine; i.e. {@link SourceTask#start(Map)} has + * completed successfully + */ + default void taskStarted() { + // nothing by default + } + + /** + * Called after a connector task has been successfully stopped by the engine; i.e. 
{@link SourceTask#stop()} has + * completed successfully + */ + default void taskStopped() { + // nothing by default + } + } + + /** + * Contract passed to {@link ChangeConsumer}s, allowing them to commit single records as they have been processed + * and to signal that offsets may be flushed eventually. + */ + public static interface RecordCommitter { + + /** + * Marks a single record as processed, must be called for each + * record. + * + * @param record the record to commit + */ + void markProcessed(R record) throws InterruptedException; + + /** + * Marks a batch as finished, this may result in committing offsets/flushing + * data. + *
<p>
+ * Should be called when a batch of records is finished being processed. + */ + void markBatchFinished(); + } + + /** + * A contract invoked by the embedded engine when it has received a batch of change records to be processed. Allows + * multiple records to be processed in one go, acknowledging their processing once that's done. + */ + public static interface ChangeConsumer { + + /** + * Handles a batch of records, calling the {@link RecordCommitter#markProcessed(Object)} + * for each record and {@link RecordCommitter#markBatchFinished()} when this batch is finished. + * @param records the records to be processed + * @param committer the committer that indicates to the system that we are finished + */ + void handleBatch(List records, RecordCommitter committer) throws InterruptedException; + } + + /** + * A builder to set up and create {@link DebeziumEngine} instances. + */ + public static interface Builder { + + /** + * Call the specified function for every {@link SourceRecord data change event} read from the source database. + * This method must be called with a non-null consumer. + * + * @param consumer the consumer function + * @return this builder object so methods can be chained together; never null + */ + Builder notifying(Consumer consumer); + + /** + * Pass a custom ChangeConsumer to override the default implementation; + * this allows for more complex handling of records, such as batch and asynchronous processing + * + * @param handler the consumer function + * @return this builder object so methods can be chained together; never null + */ + Builder notifying(ChangeConsumer handler); + + /** + * Use the specified configuration for the connector. The configuration is assumed to already be valid. + * + * @param config the configuration + * @return this builder object so methods can be chained together; never null + */ + Builder using(Properties config); + + /** + * Use the specified class loader to find all necessary classes. 
Passing null or not calling this method + * results in the connector using this class's class loader. + * + * @param classLoader the class loader + * @return this builder object so methods can be chained together; never null + */ + Builder using(ClassLoader classLoader); + + /** + * Use the specified clock when needing to determine the current time. Passing null or not calling this + * method results in the connector using the {@link Clock#system() system clock}. + * + * @param clock the clock + * @return this builder object so methods can be chained together; never null + */ + Builder using(Clock clock); + + /** + * When the engine's {@link DebeziumEngine#run()} method completes, call the supplied function with the results. + * + * @param completionCallback the callback function; may be null if errors should be written to the log + * @return this builder object so methods can be chained together; never null + */ + Builder using(CompletionCallback completionCallback); + + /** + * During the engine's {@link DebeziumEngine#run()} method, call the supplied function at different + * stages according to the completion state of each component running within the engine (connectors, tasks, etc.) + * + * @param connectorCallback the callback function; may be null + * @return this builder object so methods can be chained together; never null + */ + Builder using(ConnectorCallback connectorCallback); + + /** + * During the engine's {@link DebeziumEngine#run()} method, decide when the offsets + * should be committed into the {@link OffsetBackingStore}. + * @param policy the policy that decides when the offsets should be committed + * @return this builder object so methods can be chained together; never null + */ + Builder using(OffsetCommitPolicy policy); + + /** + * Build a new connector with the information previously supplied to this builder. 
+ * + * @return the embedded connector; never null + * @throws IllegalArgumentException if a {@link #using(Properties) configuration} or {@link #notifying(Consumer) + * consumer function} were not supplied before this method is called + */ + DebeziumEngine build(); + } + + /** + * Obtain a new {@link Builder} instance that can be used to construct runnable {@link DebeziumEngine} instances. + * + * @return the new builder; never null + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Builder create(Class recordClass) { + final ServiceLoader loader = ServiceLoader.load(Builder.class); + final Iterator iterator = loader.iterator(); + if (!iterator.hasNext()) { + throw new DebeziumException("No implementation of Debezium engine builder was found"); + } + final Builder builder = iterator.next(); + if (iterator.hasNext()) { + LoggerFactory.getLogger(Builder.class).warn("More than one Debezium engine builder implementation was found, using {}", builder.getClass()); + } + return builder; + } +} diff --git a/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java b/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java new file mode 100644 index 000000000..de16eead6 --- /dev/null +++ b/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java @@ -0,0 +1,97 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.engine.spi; + +import java.time.Duration; +import java.util.Properties; + +import io.debezium.annotation.Incubating; +import io.debezium.engine.DebeziumEngine; + +/** + * The policy that defines when the offsets should be committed to offset storage. + * + * @author Randall Hauch + */ +@Incubating +@FunctionalInterface +public interface OffsetCommitPolicy { + + /** + * An {@link OffsetCommitPolicy} that will commit offsets as frequently as possible. 
This may result in reduced + * performance, but it has the least potential for seeing source records more than once upon restart. + */ + public static class AlwaysCommitOffsetPolicy implements OffsetCommitPolicy { + + @Override + public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) { + return true; + } + } + + /** + * An {@link OffsetCommitPolicy} that will commit offsets no more often than once per the specified time period. If the specified + * time is less than {@code 0} then the policy will behave as {@link AlwaysCommitOffsetPolicy}. + * @see io.debezium.engine.DebeziumEngine#OFFSET_FLUSH_INTERVAL_MS_PROP + */ + public static class PeriodicCommitOffsetPolicy implements OffsetCommitPolicy { + + private final Duration minimumTime; + + public PeriodicCommitOffsetPolicy(Properties config) { + minimumTime = Duration.ofMillis(Long.valueOf(config.getProperty(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP))); + } + + @Override + public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) { + return timeSinceLastCommit.compareTo(minimumTime) >= 0; + } + } + + static OffsetCommitPolicy always() { + return new AlwaysCommitOffsetPolicy(); + } + + static OffsetCommitPolicy periodic(Properties config) { + return new PeriodicCommitOffsetPolicy(config); + } + + /** + * Determine if a commit of the offsets should be performed. + * + * @param numberOfMessagesSinceLastCommit the number of messages that have been received from the connector since + * the offsets were last committed; never negative + * @param timeSinceLastCommit the time that has elapsed since the offsets were last committed; never negative + * @return {@code true} if the offsets should be committed, or {@code false} otherwise + */ + boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit); + + /** + * Obtain a new {@link OffsetCommitPolicy} that will commit offsets if this policy OR the other requests it. 
+ * + * @param other the other commit policy; if null, then this policy instance is returned as is + * @return the resulting policy; never null + */ + default OffsetCommitPolicy or(OffsetCommitPolicy other) { + if (other == null) { + return this; + } + return (number, time) -> this.performCommit(number, time) || other.performCommit(number, time); + } + + /** + * Obtain a new {@link OffsetCommitPolicy} that will commit offsets if both this policy AND the other requests it. + * + * @param other the other commit policy; if null, then this policy instance is returned as is + * @return the resulting policy; never null + */ + default OffsetCommitPolicy and(OffsetCommitPolicy other) { + if (other == null) { + return this; + } + return (number, time) -> this.performCommit(number, time) && other.performCommit(number, time); + } +} diff --git a/debezium-api/src/main/java/io/debezium/errors/DebeziumException.java b/debezium-api/src/main/java/io/debezium/errors/DebeziumException.java new file mode 100644 index 000000000..3ea41a878 --- /dev/null +++ b/debezium-api/src/main/java/io/debezium/errors/DebeziumException.java @@ -0,0 +1,35 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.errors; + +/** + * Base exception returned by Debezium API. 
+ * + */ +public class DebeziumException extends RuntimeException { + + private static final long serialVersionUID = -829914184849944524L; + + public DebeziumException() { + } + + public DebeziumException(String message) { + super(message); + } + + public DebeziumException(Throwable cause) { + super(cause); + } + + public DebeziumException(String message, Throwable cause) { + super(message, cause); + } + + public DebeziumException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + +} diff --git a/debezium-api/src/main/java/io/debezium/errors/StopEngineException.java b/debezium-api/src/main/java/io/debezium/errors/StopEngineException.java new file mode 100644 index 000000000..efb87d6ca --- /dev/null +++ b/debezium-api/src/main/java/io/debezium/errors/StopEngineException.java @@ -0,0 +1,22 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.errors; + +/** + * An exception that is used to tell the engine to process the last source record and to then stop. When raised by + * {@link Consumer} implementations passed to {@link DebeziumEngine.Builder#notifying(Consumer)}, this exception should + * only be raised after that consumer has safely processed the passed event. + * + * @author Randall Hauch + */ +public class StopEngineException extends DebeziumException { + + private static final long serialVersionUID = 1L; + + public StopEngineException(String msg) { + super(msg); + } +} diff --git a/debezium-api/src/test/java/io/debezium/engine/OffsetCommitPolicyTest.java b/debezium-api/src/test/java/io/debezium/engine/OffsetCommitPolicyTest.java new file mode 100644 index 000000000..8e6e431b1 --- /dev/null +++ b/debezium-api/src/test/java/io/debezium/engine/OffsetCommitPolicyTest.java @@ -0,0 +1,90 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.engine; + +import static org.fest.assertions.Assertions.assertThat; + +import java.time.Duration; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; + +import io.debezium.engine.spi.OffsetCommitPolicy; + +/** + * @author Randall Hauch + */ +public class OffsetCommitPolicyTest { + + @Test + public void shouldAlwaysCommit() { + OffsetCommitPolicy policy = OffsetCommitPolicy.always(); + assertThat(policy.performCommit(0, Duration.ofNanos(0))).isTrue(); + assertThat(policy.performCommit(10000, Duration.ofDays(1000))).isTrue(); + } + + @Test + public void shouldCommitPeriodically() { + // 10 hours + final Properties props = new Properties(); + props.setProperty(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP, Integer.toString(10 * 60 * 60 * 1000)); + OffsetCommitPolicy policy = OffsetCommitPolicy.periodic(props); + assertThat(policy.performCommit(0, Duration.ofNanos(0))).isFalse(); + assertThat(policy.performCommit(10000, Duration.ofHours(9))).isFalse(); + assertThat(policy.performCommit(0, Duration.ofHours(10))).isTrue(); + } + + @Test + public void shouldCombineTwoPolicies() { + AtomicBoolean commitFirst = new AtomicBoolean(false); + AtomicBoolean commitSecond = new AtomicBoolean(false); + OffsetCommitPolicy policy1 = (num, time) -> commitFirst.get(); + OffsetCommitPolicy policy2 = (num, time) -> commitSecond.get(); + OffsetCommitPolicy both1 = policy1.and(policy2); + OffsetCommitPolicy both2 = policy2.and(policy1); + OffsetCommitPolicy either1 = policy1.or(policy2); + OffsetCommitPolicy either2 = policy2.or(policy1); + + assertThat(both1.performCommit(0, Duration.ofNanos(0))).isFalse(); + assertThat(both2.performCommit(0, Duration.ofNanos(0))).isFalse(); + assertThat(either1.performCommit(0, Duration.ofNanos(0))).isFalse(); + assertThat(either2.performCommit(0, 
Duration.ofNanos(0))).isFalse(); + + commitFirst.set(true); + assertThat(both1.performCommit(0, Duration.ofNanos(0))).isFalse(); + assertThat(both2.performCommit(0, Duration.ofNanos(0))).isFalse(); + assertThat(either1.performCommit(0, Duration.ofNanos(0))).isTrue(); + assertThat(either2.performCommit(0, Duration.ofNanos(0))).isTrue(); + + commitSecond.set(true); + assertThat(both1.performCommit(0, Duration.ofNanos(0))).isTrue(); + assertThat(both2.performCommit(0, Duration.ofNanos(0))).isTrue(); + assertThat(either1.performCommit(0, Duration.ofNanos(0))).isTrue(); + assertThat(either2.performCommit(0, Duration.ofNanos(0))).isTrue(); + + commitFirst.set(false); + assertThat(both1.performCommit(0, Duration.ofNanos(0))).isFalse(); + assertThat(both2.performCommit(0, Duration.ofNanos(0))).isFalse(); + assertThat(either1.performCommit(0, Duration.ofNanos(0))).isTrue(); + assertThat(either2.performCommit(0, Duration.ofNanos(0))).isTrue(); + + commitSecond.set(false); + assertThat(both1.performCommit(0, Duration.ofNanos(0))).isFalse(); + assertThat(both2.performCommit(0, Duration.ofNanos(0))).isFalse(); + assertThat(either1.performCommit(0, Duration.ofNanos(0))).isFalse(); + assertThat(either2.performCommit(0, Duration.ofNanos(0))).isFalse(); + } + + @Test + public void shouldCombineOnePolicyWithNull() { + AtomicBoolean commit = new AtomicBoolean(false); + OffsetCommitPolicy policy1 = (num, time) -> commit.get(); + assertThat(policy1.and(null)).isSameAs(policy1); + assertThat(policy1.or(null)).isSameAs(policy1); + } + +} diff --git a/debezium-core/pom.xml b/debezium-core/pom.xml index 56d72b042..f0e03a476 100644 --- a/debezium-core/pom.xml +++ b/debezium-core/pom.xml @@ -11,6 +11,10 @@ Debezium Core jar + + io.debezium + debezium-api + org.slf4j slf4j-api diff --git a/debezium-embedded/pom.xml b/debezium-embedded/pom.xml index 9c07a6ff3..88f1bf392 100644 --- a/debezium-embedded/pom.xml +++ b/debezium-embedded/pom.xml @@ -77,6 +77,7 @@ src/main/resources 
**/build.properties + **/META-INF/services/* diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java index 9653c4828..949a39d10 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java @@ -10,6 +10,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -44,7 +45,9 @@ import io.debezium.annotation.ThreadSafe; import io.debezium.config.Configuration; import io.debezium.config.Field; -import io.debezium.embedded.spi.OffsetCommitPolicy; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.spi.OffsetCommitPolicy; +import io.debezium.errors.StopEngineException; import io.debezium.util.Clock; import io.debezium.util.VariableLatch; @@ -66,7 +69,7 @@ * @author Randall Hauch */ @ThreadSafe -public final class EmbeddedEngine implements Runnable { +public final class EmbeddedEngine implements DebeziumEngine { /** * A required field for an embedded connector that specifies the unique name for the connector instance. @@ -160,7 +163,7 @@ public final class EmbeddedEngine implements Runnable { .withDescription("The fully-qualified class name of the commit policy type. This class must implement the interface " + OffsetCommitPolicy.class.getName() + ". 
The default is a periodic commit policy based upon time intervals.") - .withDefault(OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName()) + .withDefault(io.debezium.embedded.spi.OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName()) .withValidation(Field::isClassName); protected static final Field INTERNAL_KEY_CONVERTER_CLASS = Field.create("internal.key.converter") @@ -186,57 +189,116 @@ public final class EmbeddedEngine implements Runnable { private static final Duration WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_DEFAULT = Duration.ofSeconds(2); private static final String WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_PROP = "debezium.embedded.shutdown.pause.before.interrupt.ms"; + public static final class BuilderImpl implements Builder { + private Configuration config; + private DebeziumEngine.ChangeConsumer handler; + private ClassLoader classLoader; + private Clock clock; + private DebeziumEngine.CompletionCallback completionCallback; + private DebeziumEngine.ConnectorCallback connectorCallback; + private OffsetCommitPolicy offsetCommitPolicy = null; + + @Override + public Builder using(Configuration config) { + this.config = config; + return this; + } + + @Override + public Builder using(Properties config) { + this.config = Configuration.from(config); + return this; + } + + @Override + public Builder using(ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + + @Override + public Builder using(Clock clock) { + this.clock = clock; + return this; + } + + @Override + public Builder using(DebeziumEngine.CompletionCallback completionCallback) { + this.completionCallback = completionCallback; + return this; + } + + @Override + public Builder using(DebeziumEngine.ConnectorCallback connectorCallback) { + this.connectorCallback = connectorCallback; + return this; + } + + @Override + public Builder using(OffsetCommitPolicy offsetCommitPolicy) { + this.offsetCommitPolicy = offsetCommitPolicy; + return this; + } + + @Override + public 
Builder notifying(Consumer consumer) { + this.handler = buildDefaultChangeConsumer(consumer); + return this; + } + + @Override + public Builder notifying(DebeziumEngine.ChangeConsumer handler) { + this.handler = handler; + return this; + } + + @Override + public Builder using(java.time.Clock clock) { + return using(new Clock() { + + @Override + public long currentTimeInMillis() { + return clock.millis(); + } + }); + } + + @Override + public EmbeddedEngine build() { + if (classLoader == null) { + classLoader = getClass().getClassLoader(); + } + if (clock == null) { + clock = Clock.system(); + } + Objects.requireNonNull(config, "A connector configuration must be specified."); + Objects.requireNonNull(handler, "A connector consumer or changeHandler must be specified."); + return new EmbeddedEngine(config, classLoader, clock, + handler, completionCallback, connectorCallback, offsetCommitPolicy); + } + + // backward compatibility methods + @Override + public Builder using(CompletionCallback completionCallback) { + return using((DebeziumEngine.CompletionCallback) completionCallback); + } + + @Override + public Builder using(ConnectorCallback connectorCallback) { + return using((DebeziumEngine.ConnectorCallback) connectorCallback); + } + } + /** * A callback function to be notified when the connector completes. */ - public interface CompletionCallback { - /** - * Handle the completion of the embedded connector engine. - * - * @param success {@code true} if the connector completed normally, or {@code false} if the connector produced an error - * that prevented startup or premature termination. 
- * @param message the completion message; never null - * @param error the error, or null if there was no exception - */ - void handle(boolean success, String message, Throwable error); + public interface CompletionCallback extends DebeziumEngine.CompletionCallback { } /** * Callback function which informs users about the various stages a connector goes through during startup */ - public interface ConnectorCallback { - - /** - * Called after a connector has been successfully started by the engine; i.e. {@link SourceConnector#start(Map)} has - * completed successfully - */ - default void connectorStarted() { - // nothing by default - } - - /** - * Called after a connector has been successfully stopped by the engine; i.e. {@link SourceConnector#stop()} has - * completed successfully - */ - default void connectorStopped() { - // nothing by default - } - - /** - * Called after a connector task has been successfully started by the engine; i.e. {@link SourceTask#start(Map)} has - * completed successfully - */ - default void taskStarted() { - // nothing by default - } - - /** - * Called after a connector task has been successfully stopped by the engine; i.e. {@link SourceTask#stop()} has - * completed successfully - */ - default void taskStopped() { - // nothing by default - } + public interface ConnectorCallback extends DebeziumEngine.ConnectorCallback { } /** @@ -350,38 +412,14 @@ public boolean hasError() { * and to signal that offsets may be flushed eventually. */ @ThreadSafe - public static interface RecordCommitter { - - /** - * Marks a single record as processed, must be called for each - * record. - * - * @param record the record to commit - */ - void markProcessed(SourceRecord record) throws InterruptedException; - - /** - * Marks a batch as finished, this may result in committing offsets/flushing - * data. - *
<p>
- * Should be called when a batch of records is finished being processed. - */ - void markBatchFinished(); + public static interface RecordCommitter extends DebeziumEngine.RecordCommitter { } /** * A contract invoked by the embedded engine when it has received a batch of change records to be processed. Allows * to process multiple records in one go, acknowledging their processing once that's done. */ - public static interface ChangeConsumer { - - /** - * Handles a batch of records, calling the {@link RecordCommitter#markProcessed(SourceRecord)} - * for each record and {@link RecordCommitter#markBatchFinished()} when this batch is finished. - * @param records the records to be processed - * @param committer the committer that indicates to the system that we are finished - */ - void handleBatch(List records, RecordCommitter committer) throws InterruptedException; + public static interface ChangeConsumer extends DebeziumEngine.ChangeConsumer { } private static ChangeConsumer buildDefaultChangeConsumer(Consumer consumer) { @@ -399,14 +437,14 @@ private static ChangeConsumer buildDefaultChangeConsumer(Consumer * @throws Exception */ @Override - public void handleBatch(List records, RecordCommitter committer) throws InterruptedException { + public void handleBatch(List records, DebeziumEngine.RecordCommitter committer) throws InterruptedException { try { for (SourceRecord record : records) { try { consumer.accept(record); committer.markProcessed(record); } - catch (StopConnectorException ex) { + catch (StopConnectorException | StopEngineException ex) { // ensure that we mark the record as finished // in this case committer.markProcessed(record); @@ -424,25 +462,7 @@ public void handleBatch(List records, RecordCommitter committer) t /** * A builder to set up and create {@link EmbeddedEngine} instances. */ - public static interface Builder { - - /** - * Call the specified function for every {@link SourceRecord data change event} read from the source database. 
- * This method must be called with a non-null consumer. - * - * @param consumer the consumer function - * @return this builder object so methods can be chained together; never null - */ - Builder notifying(Consumer consumer); - - /** - * Pass a custom ChangeConsumer override the default implementation, - * this allows for more complex handling of records for batch and async handling - * - * @param handler the consumer function - * @return this builder object so methods can be chained together; never null - */ - Builder notifying(ChangeConsumer handler); + public static interface Builder extends DebeziumEngine.Builder { /** * Use the specified configuration for the connector. The configuration is assumed to already be valid. @@ -452,15 +472,6 @@ public static interface Builder { */ Builder using(Configuration config); - /** - * Use the specified class loader to find all necessary classes. Passing null or not calling this method - * results in the connector using this class's class loader. - * - * @param classLoader the class loader - * @return this builder object so methods can be chained together; never null - */ - Builder using(ClassLoader classLoader); - /** * Use the specified clock when needing to determine the current time. Passing null or not calling this * method results in the connector using the {@link Clock#system() system clock}. @@ -470,38 +481,19 @@ public static interface Builder { */ Builder using(Clock clock); - /** - * When the engine's {@link EmbeddedEngine#run()} method completes, call the supplied function with the results. 
- * - * @param completionCallback the callback function; may be null if errors should be written to the log - * @return this builder object so methods can be chained together; never null - */ + // backward compatibility methods + Builder notifying(Consumer consumer); + + Builder notifying(DebeziumEngine.ChangeConsumer handler); + + Builder using(ClassLoader classLoader); + Builder using(CompletionCallback completionCallback); - /** - * During the engine's {@link EmbeddedEngine#run()} method, call the supplied the supplied function at different - * stages according to the completion state of each component running within the engine (connectors, tasks etc) - * - * @param connectorCallback the callback function; may be null - * @return this builder object so methods can be chained together; never null - */ Builder using(ConnectorCallback connectorCallback); - /** - * During the engine's {@link EmbeddedEngine#run()} method, decide when the offsets - * should be committed into the {@link OffsetBackingStore}. - * @param policy - * @return this builder object so methods can be chained together; never null - */ Builder using(OffsetCommitPolicy policy); - /** - * Build a new connector with the information previously supplied to this builder. 
- * - * @return the embedded connector; never null - * @throws IllegalArgumentException if a {@link #using(Configuration) configuration} or {@link #notifying(Consumer) - * consumer function} were not supplied before this method is called - */ EmbeddedEngine build(); } @@ -511,87 +503,16 @@ public static interface Builder { * @return the new builder; never null */ public static Builder create() { - return new Builder() { - private Configuration config; - private ChangeConsumer handler; - private ClassLoader classLoader; - private Clock clock; - private CompletionCallback completionCallback; - private ConnectorCallback connectorCallback; - private OffsetCommitPolicy offsetCommitPolicy = null; - - @Override - public Builder using(Configuration config) { - this.config = config; - return this; - } - - @Override - public Builder using(ClassLoader classLoader) { - this.classLoader = classLoader; - return this; - } - - @Override - public Builder using(Clock clock) { - this.clock = clock; - return this; - } - - @Override - public Builder using(CompletionCallback completionCallback) { - this.completionCallback = completionCallback; - return this; - } - - @Override - public Builder using(ConnectorCallback connectorCallback) { - this.connectorCallback = connectorCallback; - return this; - } - - @Override - public Builder using(OffsetCommitPolicy offsetCommitPolicy) { - this.offsetCommitPolicy = offsetCommitPolicy; - return this; - } - - @Override - public Builder notifying(Consumer consumer) { - this.handler = buildDefaultChangeConsumer(consumer); - return this; - } - - @Override - public Builder notifying(ChangeConsumer handler) { - this.handler = handler; - return this; - } - - @Override - public EmbeddedEngine build() { - if (classLoader == null) { - classLoader = getClass().getClassLoader(); - } - if (clock == null) { - clock = Clock.system(); - } - Objects.requireNonNull(config, "A connector configuration must be specified."); - Objects.requireNonNull(handler, "A 
connector consumer or changeHandler must be specified."); - return new EmbeddedEngine(config, classLoader, clock, - handler, completionCallback, connectorCallback, offsetCommitPolicy); - } - - }; + return new BuilderImpl(); } private final Logger logger = LoggerFactory.getLogger(getClass()); private final Configuration config; private final Clock clock; private final ClassLoader classLoader; - private final ChangeConsumer handler; - private final CompletionCallback completionCallback; - private final ConnectorCallback connectorCallback; + private final DebeziumEngine.ChangeConsumer handler; + private final DebeziumEngine.CompletionCallback completionCallback; + private final DebeziumEngine.ConnectorCallback connectorCallback; private final AtomicReference runningThread = new AtomicReference<>(); private final VariableLatch latch = new VariableLatch(0); private final Converter keyConverter; @@ -604,8 +525,8 @@ public EmbeddedEngine build() { private SourceTask task; - private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, ChangeConsumer handler, - CompletionCallback completionCallback, ConnectorCallback connectorCallback, + private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, DebeziumEngine.ChangeConsumer handler, + DebeziumEngine.CompletionCallback completionCallback, DebeziumEngine.ConnectorCallback connectorCallback, OffsetCommitPolicy offsetCommitPolicy) { this.config = config; this.handler = handler; @@ -693,7 +614,7 @@ public void run() { final String engineName = config.getString(ENGINE_NAME); final String connectorClassName = config.getString(CONNECTOR_CLASS); - final Optional connectorCallback = Optional.ofNullable(this.connectorCallback); + final Optional connectorCallback = Optional.ofNullable(this.connectorCallback); // Only one thread can be in this part of the method at a time ... 
latch.countUp(); try { @@ -766,7 +687,7 @@ public void raiseError(Exception e) { try { // Start the connector with the given properties and get the task configurations ... connector.start(config.asMap()); - connectorCallback.ifPresent(ConnectorCallback::connectorStarted); + connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStarted); List<Map<String, String>> taskConfigs = connector.taskConfigs(1); Class<? extends Task> taskClass = connector.taskClass(); task = null; @@ -791,7 +712,7 @@ public Map<String, String> configs() { }; task.initialize(taskContext); task.start(taskConfigs.get(0)); - connectorCallback.ifPresent(ConnectorCallback::taskStarted); + connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStarted); } catch (Throwable t) { // Mask the passwords ... @@ -857,7 +778,7 @@ public Map<String, String> configs() { // First stop the task ... logger.debug("Stopping the task and engine"); task.stop(); - connectorCallback.ifPresent(ConnectorCallback::taskStopped); + connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStopped); // Always commit offsets that were captured from the source records we actually processed ... 
commitOffsets(offsetWriter, commitTimeout, task); if (handlerError == null) { @@ -884,7 +805,7 @@ public Map configs() { finally { try { connector.stop(); - connectorCallback.ifPresent(ConnectorCallback::connectorStopped); + connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStopped); } catch (Throwable t) { fail("Error while trying to stop connector class '" + connectorClassName + "'", t); diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/StopConnectorException.java b/debezium-embedded/src/main/java/io/debezium/embedded/StopConnectorException.java index 389c0d176..f98f2c656 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/StopConnectorException.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/StopConnectorException.java @@ -7,7 +7,7 @@ import java.util.function.Consumer; -import org.apache.kafka.connect.errors.ConnectException; +import io.debezium.errors.DebeziumException; /** * An exception that is used to tell the connector to process the last source record and to then stop. When raised by @@ -16,7 +16,7 @@ * * @author Randall Hauch */ -public class StopConnectorException extends ConnectException { +public class StopConnectorException extends DebeziumException { private static final long serialVersionUID = 1L; diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/spi/OffsetCommitPolicy.java b/debezium-embedded/src/main/java/io/debezium/embedded/spi/OffsetCommitPolicy.java index 768214658..6aff24831 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/spi/OffsetCommitPolicy.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/spi/OffsetCommitPolicy.java @@ -18,7 +18,7 @@ * @author Randall Hauch */ @FunctionalInterface -public interface OffsetCommitPolicy { +public interface OffsetCommitPolicy extends io.debezium.engine.spi.OffsetCommitPolicy { /** * An {@link OffsetCommitPolicy} that will commit offsets as frequently as possible. 
This may result in reduced @@ -59,16 +59,6 @@ static OffsetCommitPolicy periodic(Configuration config) { return new PeriodicCommitOffsetPolicy(config); } - /** - * Determine if a commit of the offsets should be performed. - * - * @param numberOfMessagesSinceLastCommit the number of messages that have been received from the connector since last - * the offsets were last committed; never negative - * @param timeSinceLastCommit the time that has elapsed since the offsets were last committed; never negative - * @return {@code true} if the offsets should be committed, or {@code false} otherwise - */ - boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit); - /** * Obtain a new {@link OffsetCommitPolicy} that will commit offsets if this policy OR the other requests it. * diff --git a/debezium-embedded/src/main/resources/META-INF/services/io.debezium.engine.DebeziumEngine$Builder b/debezium-embedded/src/main/resources/META-INF/services/io.debezium.engine.DebeziumEngine$Builder new file mode 100644 index 000000000..bb5cd439a --- /dev/null +++ b/debezium-embedded/src/main/resources/META-INF/services/io.debezium.engine.DebeziumEngine$Builder @@ -0,0 +1 @@ +io.debezium.embedded.EmbeddedEngine$BuilderImpl diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java index 968e4bab8..91ec29675 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java @@ -13,6 +13,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -26,6 +27,7 @@ import io.debezium.config.Configuration; import io.debezium.doc.FixFor; 
+import io.debezium.engine.DebeziumEngine; import io.debezium.util.Collect; import io.debezium.util.LoggingContext; import io.debezium.util.Testing; @@ -149,6 +151,63 @@ public void shouldWorkToUseCustomChangeConsumer() throws Exception { stopConnector(); } + @Test + public void shouldRunDebeziumEngine() throws Exception { + // Add initial content to the file ... + appendLinesToSource(NUMBER_OF_LINES); + + final Properties props = new Properties(); + props.setProperty("name", "debezium-engine"); + props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector"); + props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); + props.setProperty("offset.flush.interval.ms", "0"); + props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString()); + props.setProperty("topic", "topicX"); + + CountDownLatch firstLatch = new CountDownLatch(1); + CountDownLatch allLatch = new CountDownLatch(6); + + // create an engine with our custom class + final DebeziumEngine engine = DebeziumEngine.create(SourceRecord.class) + .using(props) + .notifying((records, committer) -> { + assertThat(records.size()).isGreaterThanOrEqualTo(NUMBER_OF_LINES); + Integer groupCount = records.size() / NUMBER_OF_LINES; + + for (SourceRecord r : records) { + committer.markProcessed(r); + } + + committer.markBatchFinished(); + firstLatch.countDown(); + for (int i = 0; i < groupCount; i++) { + allLatch.countDown(); + } + }) + .using(this.getClass().getClassLoader()) + .build(); + + ExecutorService exec = Executors.newFixedThreadPool(1); + exec.execute(() -> { + LoggingContext.forConnector(getClass().getSimpleName(), "", "engine"); + engine.run(); + }); + + firstLatch.await(5000, TimeUnit.MILLISECONDS); + assertThat(firstLatch.getCount()).isEqualTo(0); + + for (int i = 0; i < 5; i++) { + // Add a few more lines, and then verify they are consumed ... 
+ appendLinesToSource(NUMBER_OF_LINES); + Thread.sleep(10); + } + allLatch.await(5000, TimeUnit.MILLISECONDS); + assertThat(allLatch.getCount()).isEqualTo(0); + + // Stop the connector ... + stopConnector(); + } + protected void appendLinesToSource(int numberOfLines) throws IOException { CharSequence[] lines = new CharSequence[numberOfLines]; for (int i = 0; i != numberOfLines; ++i) { diff --git a/pom.xml b/pom.xml index b91876453..fd3a0091e 100644 --- a/pom.xml +++ b/pom.xml @@ -155,6 +155,7 @@ support/checkstyle support/ide-configs + debezium-api debezium-ddl-parser debezium-assembly-descriptors debezium-core @@ -408,6 +409,11 @@ + + io.debezium + debezium-api + ${project.version} + io.debezium debezium-core