From 506457c13b2638dfd781dbde2167537269ae7374 Mon Sep 17 00:00:00 2001 From: Horia Chiorean Date: Thu, 17 Nov 2016 18:54:12 +0200 Subject: [PATCH] DBZ-156 Updates EmbeddedEngine to better handle exceptional cases and provide more feedback during startup It also updates EmbeddedEngine to use the Kafka commit callbacks introduced after 0.10 and updates AbstractConnectorTest to better synchronize with the embedded engine --- .../io/debezium/embedded/EmbeddedEngine.java | 182 +++++++++++++----- .../embedded/AbstractConnectorTest.java | 26 ++- 2 files changed, 154 insertions(+), 54 deletions(-) diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java index 64ec4d673..bace4e2cb 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java @@ -8,6 +8,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -155,7 +156,7 @@ public final class EmbeddedEngine implements Runnable { /** * A callback function to be notified when the connector completes. */ - public static interface CompletionCallback { + public interface CompletionCallback { /** * Handle the completion of the embedded connector engine. * @@ -166,6 +167,44 @@ public static interface CompletionCallback { */ void handle(boolean success, String message, Throwable error); } + + /** + * Callback function which informs users about the various stages a connector goes through during startup + */ + public interface ConnectorCallback { + + /** + * Called after a connector has been successfully started by the engine; i.e. {@link SourceConnector#start(Map)} has + * completed successfully + */ + default void connectorStarted() { + //nothing by default + } + + /** + * Called after a connector has been successfully stopped by the engine; i.e. {@link SourceConnector#stop()} has + * completed successfully + */ + default void connectorStopped() { + //nothing by default + } + + /** + * Called after a connector task has been successfully started by the engine; i.e. {@link SourceTask#start(Map)} has + * completed successfully + */ + default void taskStarted() { + //nothing by default + } + + /** + * Called after a connector task has been successfully stopped by the engine; i.e. {@link SourceTask#stop()} has + * completed successfully + */ + default void taskStopped() { + //nothing by default + } + } /** * A callback function to be notified when the connector completes. @@ -309,6 +348,15 @@ public static interface Builder { */ Builder using(CompletionCallback completionCallback); + /** + * During the engine's {@link EmbeddedEngine#run()} method, call the supplied the supplied function at different + * stages according to the completion state of each component running within the engine (connectors, tasks etc) + * + * @param connectorCallback the callback function; may be null + * @return this builder object so methods can be chained together; never null + */ + Builder using(ConnectorCallback connectorCallback); + /** * Build a new connector with the information previously supplied to this builder. * @@ -331,6 +379,7 @@ public static Builder create() { private ClassLoader classLoader; private Clock clock; private CompletionCallback completionCallback; + private ConnectorCallback connectorCallback; @Override public Builder using(Configuration config) { @@ -355,7 +404,13 @@ public Builder using(CompletionCallback completionCallback) { this.completionCallback = completionCallback; return this; } - + + @Override + public Builder using(ConnectorCallback connectorCallback) { + this.connectorCallback = connectorCallback; + return this; + } + @Override public Builder notifying(Consumer consumer) { this.consumer = consumer; @@ -368,7 +423,7 @@ public EmbeddedEngine build() { if (clock == null) clock = Clock.system(); Objects.requireNonNull(config, "A connector configuration must be specified."); Objects.requireNonNull(consumer, "A connector consumer must be specified."); - return new EmbeddedEngine(config, classLoader, clock, consumer, completionCallback); + return new EmbeddedEngine(config, classLoader, clock, consumer, completionCallback, connectorCallback); } }; @@ -380,16 +435,18 @@ public EmbeddedEngine build() { private final ClassLoader classLoader; private final Consumer consumer; private final CompletionCallback completionCallback; + private final ConnectorCallback connectorCallback; private final AtomicReference runningThread = new AtomicReference<>(); private final VariableLatch latch = new VariableLatch(0); private final Converter keyConverter; private final Converter valueConverter; private final WorkerConfig workerConfig; + private final CompletionResult completionResult; private long recordsSinceLastCommit = 0; private long timeSinceLastCommitMillis = 0; private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, Consumer consumer, - CompletionCallback completionCallback) { + CompletionCallback completionCallback, ConnectorCallback connectorCallback) { this.config = config; this.consumer = consumer; this.classLoader = classLoader; @@ -397,6 +454,8 @@ private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock cloc this.completionCallback = completionCallback != null ? completionCallback : (success, msg, error) -> { if (success) logger.error(msg, error); }; + this.connectorCallback = connectorCallback; + this.completionResult = new CompletionResult(); assert this.config != null; assert this.consumer != null; assert this.classLoader != null; @@ -433,11 +492,13 @@ private void fail(String msg) { } private void fail(String msg, Throwable error) { - completionCallback.handle(false, msg, error); + // don't use the completion callback here because we want to store the error and message only + completionResult.handle(false, msg, error); } private void succeed(String msg) { - completionCallback.handle(true, msg, null); + // don't use the completion callback here because we want to store the error and message only + completionResult.handle(true, msg, null); } /** @@ -461,6 +522,7 @@ public void run() { final String engineName = config.getString(ENGINE_NAME); final String connectorClassName = config.getString(CONNECTOR_CLASS); + final Optional connectorCallback = Optional.ofNullable(this.connectorCallback); // Only one thread can be in this part of the method at a time ... latch.countUp(); try { @@ -529,6 +591,7 @@ public void raiseError(Exception e) { try { // Start the connector with the given properties and get the task configurations ... connector.start(config.asMap()); + connectorCallback.ifPresent(ConnectorCallback::connectorStarted); List> taskConfigs = connector.taskConfigs(1); Class taskClass = connector.taskClass(); SourceTask task = null; @@ -542,6 +605,7 @@ public void raiseError(Exception e) { SourceTaskContext taskContext = () -> offsetReader; task.initialize(taskContext); task.start(taskConfigs.get(0)); + connectorCallback.ifPresent(ConnectorCallback::taskStarted); } catch (Throwable t) { String msg = "Unable to initialize and start connector's task class '" + taskClass.getName() + "' with config: " + taskConfigs.get(0); @@ -551,50 +615,55 @@ public void raiseError(Exception e) { recordsSinceLastCommit = 0; Throwable handlerError = null; - timeSinceLastCommitMillis = clock.currentTimeInMillis(); - while (runningThread.get() != null && handlerError == null) { - try { - logger.debug("Embedded engine is polling task for records"); - List changeRecords = task.poll(); // blocks until there are values ... - if (changeRecords != null && !changeRecords.isEmpty()) { - logger.debug("Received {} records from the task", changeRecords.size()); - - // First forward the records to the connector's consumer ... - for (SourceRecord record : changeRecords) { - try { - consumer.accept(record); - } catch (Throwable t) { - handlerError = t; - break; - } - - // Record the offset for this record's partition - offsetWriter.offset(record.sourcePartition(), record.sourceOffset()); - recordsSinceLastCommit += 1; - } - - // Flush the offsets to storage if necessary ... - maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeoutMs); - } else { - logger.debug("Received no records from the task"); - } - } catch (InterruptedException e) { - // This thread was interrupted, which signals that the thread should stop work. - // We first try to commit the offsets, since we record them only after the records were handled - // by the consumer ... - maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeoutMs); - // Then clear the interrupted status ... - Thread.interrupted(); - break; - } - } try { + timeSinceLastCommitMillis = clock.currentTimeInMillis(); + while (runningThread.get() != null && handlerError == null) { + try { + logger.debug("Embedded engine is polling task for records"); + List changeRecords = task.poll(); // blocks until there are values ... + if (changeRecords != null && !changeRecords.isEmpty()) { + logger.debug("Received {} records from the task", changeRecords.size()); + + // First forward the records to the connector's consumer ... + for (SourceRecord record : changeRecords) { + try { + consumer.accept(record); + task.commitRecord(record); + } catch (Throwable t) { + handlerError = t; + break; + } + + // Record the offset for this record's partition + offsetWriter.offset(record.sourcePartition(), record.sourceOffset()); + recordsSinceLastCommit += 1; + } + + // Flush the offsets to storage if necessary ... + maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeoutMs, task); + } else { + logger.debug("Received no records from the task"); + } + } catch (Throwable t) { + // There was some sort of unexpected exception, so we should stop work + // We first try to commit the offsets, since we record them only after the records were handled + // by the consumer ... + maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeoutMs, task); + if (t instanceof InterruptedException) { + // Then clear the interrupted status ... + Thread.interrupted(); + } + break; + } + } + } finally { // First stop the task ... logger.debug("Stopping the task and engine"); task.stop(); - } finally { + connectorCallback.ifPresent(ConnectorCallback::taskStopped); + // Always commit offsets that were captured from the source records we actually processed ... - commitOffsets(offsetWriter, commitTimeoutMs); + commitOffsets(offsetWriter, commitTimeoutMs, task); if (handlerError != null) { // There was an error in the handler ... fail("Stopping connector after error in the application's handler method: " + handlerError.getMessage(), @@ -606,44 +675,49 @@ public void raiseError(Exception e) { } } catch (Throwable t) { fail("Error while trying to run connector class '" + connectorClassName + "'", t); - return; } finally { // Close the offset storage and finally the connector ... try { offsetStore.stop(); } finally { connector.stop(); + connectorCallback.ifPresent(ConnectorCallback::connectorStopped); } } } finally { latch.countDown(); runningThread.set(null); + // after we've "shut down" the engine, fire the completion callback based on the results we collected + completionCallback.handle(completionResult.success(), completionResult.message(), completionResult.error()); } } } - + /** * Determine if we should flush offsets to storage, and if so then attempt to flush offsets. - * + * * @param offsetWriter the offset storage writer; may not be null * @param policy the offset commit policy; may not be null * @param commitTimeoutMs the timeout to wait for commit results + * @param task the task which produced the records for which the offsets have been committed */ - protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy policy, long commitTimeoutMs) { + protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy policy, long commitTimeoutMs, + SourceTask task) { // Determine if we need to commit to offset storage ... if (policy.performCommit(recordsSinceLastCommit, timeSinceLastCommitMillis, TimeUnit.MILLISECONDS)) { - commitOffsets(offsetWriter, commitTimeoutMs); + commitOffsets(offsetWriter, commitTimeoutMs, task); } } - + /** * Flush offsets to storage. - * + * * @param offsetWriter the offset storage writer; may not be null * @param commitTimeoutMs the timeout to wait for commit results + * @param task the task which produced the records for which the offsets have been committed */ - protected void commitOffsets(OffsetStorageWriter offsetWriter, long commitTimeoutMs) { + protected void commitOffsets(OffsetStorageWriter offsetWriter, long commitTimeoutMs, SourceTask task) { long started = clock.currentTimeInMillis(); long timeout = started + commitTimeoutMs; if (!offsetWriter.beginFlush()) return; @@ -653,6 +727,8 @@ protected void commitOffsets(OffsetStorageWriter offsetWriter, long commitTimeou // Wait until the offsets are flushed ... try { flush.get(Math.max(timeout - clock.currentTimeInMillis(), 0), TimeUnit.MILLISECONDS); + // if we've gotten this far, the offsets have been committed so notify the task + task.commit(); recordsSinceLastCommit = 0; timeSinceLastCommitMillis = clock.currentTimeInMillis(); } catch (InterruptedException e) { diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index 10ddc8435..041240041 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -55,6 +55,7 @@ import io.debezium.data.SchemaUtil; import io.debezium.data.VerifyRecord; import io.debezium.embedded.EmbeddedEngine.CompletionCallback; +import io.debezium.embedded.EmbeddedEngine.ConnectorCallback; import io.debezium.embedded.EmbeddedEngine.EmbeddedConfig; import io.debezium.function.BooleanConsumer; import io.debezium.relational.history.HistoryRecord; @@ -250,10 +251,22 @@ protected void start(Class connectorClass, Configurat try { if (callback != null) callback.handle(success, msg, error); } finally { - latch.countDown(); + if (!success) { + // we only unblock if there was an error; in all other cases we're unblocking when a task has been started + latch.countDown(); + } } Testing.debug("Stopped connector"); }; + + ConnectorCallback connectorCallback = new ConnectorCallback() { + @Override + public void taskStarted() { + // if this is called, it means a task has been started successfully so we can continue + latch.countDown(); + } + }; + // Create the connector ... engine = EmbeddedEngine.create() .using(config) @@ -270,6 +283,7 @@ protected void start(Class connectorClass, Configurat }) .using(this.getClass().getClassLoader()) .using(wrapperCallback) + .using(connectorCallback) .build(); // Submit the connector for asynchronous execution ... @@ -279,6 +293,16 @@ protected void start(Class connectorClass, Configurat LoggingContext.forConnector(getClass().getSimpleName(), "", "engine"); engine.run(); }); + try { + if (!latch.await(10, TimeUnit.SECONDS)) { + // maybe it takes more time to start up, so just log a warning and continue + logger.warn("The connector did not finish starting its task(s) or complete in the expected amount of time"); + } + } catch (InterruptedException e) { + if (Thread.interrupted()) { + fail("Interrupted while waiting for engine startup"); + } + } } /**