DBZ-156 Updates EmbeddedEngine to better handle exceptional cases and provide more feedback during startup

It also updates EmbeddedEngine to use the Kafka commit callbacks introduced after 0.10, and updates AbstractConnectorTest to better synchronize with the embedded engine.
This commit is contained in:
Horia Chiorean 2016-11-17 18:54:12 +02:00
parent a82ae5691b
commit 506457c13b
2 changed files with 154 additions and 54 deletions

View File

@ -8,6 +8,7 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -155,7 +156,7 @@ public final class EmbeddedEngine implements Runnable {
/** /**
* A callback function to be notified when the connector completes. * A callback function to be notified when the connector completes.
*/ */
public static interface CompletionCallback { public interface CompletionCallback {
/** /**
* Handle the completion of the embedded connector engine. * Handle the completion of the embedded connector engine.
* *
@ -166,6 +167,44 @@ public static interface CompletionCallback {
*/ */
void handle(boolean success, String message, Throwable error); void handle(boolean success, String message, Throwable error);
} }
/**
* Callback function which informs users about the various stages a connector goes through during startup
* and shutdown. Every method has an empty default implementation, so implementers only need to override
* the notifications they are interested in.
*/
public interface ConnectorCallback {
/**
* Called after a connector has been successfully started by the engine; i.e. {@link SourceConnector#start(Map)} has
* completed successfully
*/
default void connectorStarted() {
// intentionally empty: this notification is optional for implementers
}
/**
* Called after a connector has been successfully stopped by the engine; i.e. {@link SourceConnector#stop()} has
* completed successfully
*/
default void connectorStopped() {
// intentionally empty: this notification is optional for implementers
}
/**
* Called after a connector task has been successfully started by the engine; i.e. {@link SourceTask#start(Map)} has
* completed successfully
*/
default void taskStarted() {
// intentionally empty: this notification is optional for implementers
}
/**
* Called after a connector task has been successfully stopped by the engine; i.e. {@link SourceTask#stop()} has
* completed successfully
*/
default void taskStopped() {
// intentionally empty: this notification is optional for implementers
}
}
/** /**
* A callback function to be notified when the connector completes. * A callback function to be notified when the connector completes.
@ -309,6 +348,15 @@ public static interface Builder {
*/ */
Builder using(CompletionCallback completionCallback); Builder using(CompletionCallback completionCallback);
/**
* During the engine's {@link EmbeddedEngine#run()} method, call the supplied function at different
* stages according to the completion state of each component running within the engine (connectors, tasks etc)
*
* @param connectorCallback the callback function; may be null
* @return this builder object so methods can be chained together; never null
*/
Builder using(ConnectorCallback connectorCallback);
/** /**
* Build a new connector with the information previously supplied to this builder. * Build a new connector with the information previously supplied to this builder.
* *
@ -331,6 +379,7 @@ public static Builder create() {
private ClassLoader classLoader; private ClassLoader classLoader;
private Clock clock; private Clock clock;
private CompletionCallback completionCallback; private CompletionCallback completionCallback;
private ConnectorCallback connectorCallback;
@Override @Override
public Builder using(Configuration config) { public Builder using(Configuration config) {
@ -355,7 +404,13 @@ public Builder using(CompletionCallback completionCallback) {
this.completionCallback = completionCallback; this.completionCallback = completionCallback;
return this; return this;
} }
@Override
public Builder using(ConnectorCallback connectorCallback) {
// Remember the (possibly null) callback; it is handed to the engine when build() is called.
this.connectorCallback = connectorCallback;
return this;
}
@Override @Override
public Builder notifying(Consumer<SourceRecord> consumer) { public Builder notifying(Consumer<SourceRecord> consumer) {
this.consumer = consumer; this.consumer = consumer;
@ -368,7 +423,7 @@ public EmbeddedEngine build() {
if (clock == null) clock = Clock.system(); if (clock == null) clock = Clock.system();
Objects.requireNonNull(config, "A connector configuration must be specified."); Objects.requireNonNull(config, "A connector configuration must be specified.");
Objects.requireNonNull(consumer, "A connector consumer must be specified."); Objects.requireNonNull(consumer, "A connector consumer must be specified.");
return new EmbeddedEngine(config, classLoader, clock, consumer, completionCallback); return new EmbeddedEngine(config, classLoader, clock, consumer, completionCallback, connectorCallback);
} }
}; };
@ -380,16 +435,18 @@ public EmbeddedEngine build() {
private final ClassLoader classLoader; private final ClassLoader classLoader;
private final Consumer<SourceRecord> consumer; private final Consumer<SourceRecord> consumer;
private final CompletionCallback completionCallback; private final CompletionCallback completionCallback;
private final ConnectorCallback connectorCallback;
private final AtomicReference<Thread> runningThread = new AtomicReference<>(); private final AtomicReference<Thread> runningThread = new AtomicReference<>();
private final VariableLatch latch = new VariableLatch(0); private final VariableLatch latch = new VariableLatch(0);
private final Converter keyConverter; private final Converter keyConverter;
private final Converter valueConverter; private final Converter valueConverter;
private final WorkerConfig workerConfig; private final WorkerConfig workerConfig;
private final CompletionResult completionResult;
private long recordsSinceLastCommit = 0; private long recordsSinceLastCommit = 0;
private long timeSinceLastCommitMillis = 0; private long timeSinceLastCommitMillis = 0;
private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, Consumer<SourceRecord> consumer, private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, Consumer<SourceRecord> consumer,
CompletionCallback completionCallback) { CompletionCallback completionCallback, ConnectorCallback connectorCallback) {
this.config = config; this.config = config;
this.consumer = consumer; this.consumer = consumer;
this.classLoader = classLoader; this.classLoader = classLoader;
@ -397,6 +454,8 @@ private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock cloc
this.completionCallback = completionCallback != null ? completionCallback : (success, msg, error) -> { this.completionCallback = completionCallback != null ? completionCallback : (success, msg, error) -> {
if (success) logger.error(msg, error); if (success) logger.error(msg, error);
}; };
this.connectorCallback = connectorCallback;
this.completionResult = new CompletionResult();
assert this.config != null; assert this.config != null;
assert this.consumer != null; assert this.consumer != null;
assert this.classLoader != null; assert this.classLoader != null;
@ -433,11 +492,13 @@ private void fail(String msg) {
} }
private void fail(String msg, Throwable error) { private void fail(String msg, Throwable error) {
completionCallback.handle(false, msg, error); // don't use the completion callback here because we want to store the error and message only
completionResult.handle(false, msg, error);
} }
private void succeed(String msg) { private void succeed(String msg) {
completionCallback.handle(true, msg, null); // don't use the completion callback here because we want to store the error and message only
completionResult.handle(true, msg, null);
} }
/** /**
@ -461,6 +522,7 @@ public void run() {
final String engineName = config.getString(ENGINE_NAME); final String engineName = config.getString(ENGINE_NAME);
final String connectorClassName = config.getString(CONNECTOR_CLASS); final String connectorClassName = config.getString(CONNECTOR_CLASS);
final Optional<ConnectorCallback> connectorCallback = Optional.ofNullable(this.connectorCallback);
// Only one thread can be in this part of the method at a time ... // Only one thread can be in this part of the method at a time ...
latch.countUp(); latch.countUp();
try { try {
@ -529,6 +591,7 @@ public void raiseError(Exception e) {
try { try {
// Start the connector with the given properties and get the task configurations ... // Start the connector with the given properties and get the task configurations ...
connector.start(config.asMap()); connector.start(config.asMap());
connectorCallback.ifPresent(ConnectorCallback::connectorStarted);
List<Map<String, String>> taskConfigs = connector.taskConfigs(1); List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
Class<? extends Task> taskClass = connector.taskClass(); Class<? extends Task> taskClass = connector.taskClass();
SourceTask task = null; SourceTask task = null;
@ -542,6 +605,7 @@ public void raiseError(Exception e) {
SourceTaskContext taskContext = () -> offsetReader; SourceTaskContext taskContext = () -> offsetReader;
task.initialize(taskContext); task.initialize(taskContext);
task.start(taskConfigs.get(0)); task.start(taskConfigs.get(0));
connectorCallback.ifPresent(ConnectorCallback::taskStarted);
} catch (Throwable t) { } catch (Throwable t) {
String msg = "Unable to initialize and start connector's task class '" + taskClass.getName() + "' with config: " String msg = "Unable to initialize and start connector's task class '" + taskClass.getName() + "' with config: "
+ taskConfigs.get(0); + taskConfigs.get(0);
@ -551,50 +615,55 @@ public void raiseError(Exception e) {
recordsSinceLastCommit = 0; recordsSinceLastCommit = 0;
Throwable handlerError = null; Throwable handlerError = null;
timeSinceLastCommitMillis = clock.currentTimeInMillis();
while (runningThread.get() != null && handlerError == null) {
try {
logger.debug("Embedded engine is polling task for records");
List<SourceRecord> changeRecords = task.poll(); // blocks until there are values ...
if (changeRecords != null && !changeRecords.isEmpty()) {
logger.debug("Received {} records from the task", changeRecords.size());
// First forward the records to the connector's consumer ...
for (SourceRecord record : changeRecords) {
try {
consumer.accept(record);
} catch (Throwable t) {
handlerError = t;
break;
}
// Record the offset for this record's partition
offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
recordsSinceLastCommit += 1;
}
// Flush the offsets to storage if necessary ...
maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeoutMs);
} else {
logger.debug("Received no records from the task");
}
} catch (InterruptedException e) {
// This thread was interrupted, which signals that the thread should stop work.
// We first try to commit the offsets, since we record them only after the records were handled
// by the consumer ...
maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeoutMs);
// Then clear the interrupted status ...
Thread.interrupted();
break;
}
}
try { try {
timeSinceLastCommitMillis = clock.currentTimeInMillis();
while (runningThread.get() != null && handlerError == null) {
try {
logger.debug("Embedded engine is polling task for records");
List<SourceRecord> changeRecords = task.poll(); // blocks until there are values ...
if (changeRecords != null && !changeRecords.isEmpty()) {
logger.debug("Received {} records from the task", changeRecords.size());
// First forward the records to the connector's consumer ...
for (SourceRecord record : changeRecords) {
try {
consumer.accept(record);
task.commitRecord(record);
} catch (Throwable t) {
handlerError = t;
break;
}
// Record the offset for this record's partition
offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
recordsSinceLastCommit += 1;
}
// Flush the offsets to storage if necessary ...
maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeoutMs, task);
} else {
logger.debug("Received no records from the task");
}
} catch (Throwable t) {
// There was some sort of unexpected exception, so we should stop work
// We first try to commit the offsets, since we record them only after the records were handled
// by the consumer ...
maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeoutMs, task);
if (t instanceof InterruptedException) {
// Then clear the interrupted status ...
Thread.interrupted();
}
break;
}
}
} finally {
// First stop the task ... // First stop the task ...
logger.debug("Stopping the task and engine"); logger.debug("Stopping the task and engine");
task.stop(); task.stop();
} finally { connectorCallback.ifPresent(ConnectorCallback::taskStopped);
// Always commit offsets that were captured from the source records we actually processed ... // Always commit offsets that were captured from the source records we actually processed ...
commitOffsets(offsetWriter, commitTimeoutMs); commitOffsets(offsetWriter, commitTimeoutMs, task);
if (handlerError != null) { if (handlerError != null) {
// There was an error in the handler ... // There was an error in the handler ...
fail("Stopping connector after error in the application's handler method: " + handlerError.getMessage(), fail("Stopping connector after error in the application's handler method: " + handlerError.getMessage(),
@ -606,44 +675,49 @@ public void raiseError(Exception e) {
} }
} catch (Throwable t) { } catch (Throwable t) {
fail("Error while trying to run connector class '" + connectorClassName + "'", t); fail("Error while trying to run connector class '" + connectorClassName + "'", t);
return;
} finally { } finally {
// Close the offset storage and finally the connector ... // Close the offset storage and finally the connector ...
try { try {
offsetStore.stop(); offsetStore.stop();
} finally { } finally {
connector.stop(); connector.stop();
connectorCallback.ifPresent(ConnectorCallback::connectorStopped);
} }
} }
} finally { } finally {
latch.countDown(); latch.countDown();
runningThread.set(null); runningThread.set(null);
// after we've "shut down" the engine, fire the completion callback based on the results we collected
completionCallback.handle(completionResult.success(), completionResult.message(), completionResult.error());
} }
} }
} }
/** /**
* Determine if we should flush offsets to storage, and if so then attempt to flush offsets. * Determine if we should flush offsets to storage, and if so then attempt to flush offsets.
* *
* @param offsetWriter the offset storage writer; may not be null * @param offsetWriter the offset storage writer; may not be null
* @param policy the offset commit policy; may not be null * @param policy the offset commit policy; may not be null
* @param commitTimeoutMs the timeout to wait for commit results * @param commitTimeoutMs the timeout to wait for commit results
* @param task the task which produced the records for which the offsets have been committed
*/ */
protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy policy, long commitTimeoutMs) { protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy policy, long commitTimeoutMs,
SourceTask task) {
// Determine if we need to commit to offset storage ... // Determine if we need to commit to offset storage ...
if (policy.performCommit(recordsSinceLastCommit, timeSinceLastCommitMillis, if (policy.performCommit(recordsSinceLastCommit, timeSinceLastCommitMillis,
TimeUnit.MILLISECONDS)) { TimeUnit.MILLISECONDS)) {
commitOffsets(offsetWriter, commitTimeoutMs); commitOffsets(offsetWriter, commitTimeoutMs, task);
} }
} }
/** /**
* Flush offsets to storage. * Flush offsets to storage.
* *
* @param offsetWriter the offset storage writer; may not be null * @param offsetWriter the offset storage writer; may not be null
* @param commitTimeoutMs the timeout to wait for commit results * @param commitTimeoutMs the timeout to wait for commit results
* @param task the task which produced the records for which the offsets have been committed
*/ */
protected void commitOffsets(OffsetStorageWriter offsetWriter, long commitTimeoutMs) { protected void commitOffsets(OffsetStorageWriter offsetWriter, long commitTimeoutMs, SourceTask task) {
long started = clock.currentTimeInMillis(); long started = clock.currentTimeInMillis();
long timeout = started + commitTimeoutMs; long timeout = started + commitTimeoutMs;
if (!offsetWriter.beginFlush()) return; if (!offsetWriter.beginFlush()) return;
@ -653,6 +727,8 @@ protected void commitOffsets(OffsetStorageWriter offsetWriter, long commitTimeou
// Wait until the offsets are flushed ... // Wait until the offsets are flushed ...
try { try {
flush.get(Math.max(timeout - clock.currentTimeInMillis(), 0), TimeUnit.MILLISECONDS); flush.get(Math.max(timeout - clock.currentTimeInMillis(), 0), TimeUnit.MILLISECONDS);
// if we've gotten this far, the offsets have been committed so notify the task
task.commit();
recordsSinceLastCommit = 0; recordsSinceLastCommit = 0;
timeSinceLastCommitMillis = clock.currentTimeInMillis(); timeSinceLastCommitMillis = clock.currentTimeInMillis();
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -55,6 +55,7 @@
import io.debezium.data.SchemaUtil; import io.debezium.data.SchemaUtil;
import io.debezium.data.VerifyRecord; import io.debezium.data.VerifyRecord;
import io.debezium.embedded.EmbeddedEngine.CompletionCallback; import io.debezium.embedded.EmbeddedEngine.CompletionCallback;
import io.debezium.embedded.EmbeddedEngine.ConnectorCallback;
import io.debezium.embedded.EmbeddedEngine.EmbeddedConfig; import io.debezium.embedded.EmbeddedEngine.EmbeddedConfig;
import io.debezium.function.BooleanConsumer; import io.debezium.function.BooleanConsumer;
import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecord;
@ -250,10 +251,22 @@ protected void start(Class<? extends SourceConnector> connectorClass, Configurat
try { try {
if (callback != null) callback.handle(success, msg, error); if (callback != null) callback.handle(success, msg, error);
} finally { } finally {
latch.countDown(); if (!success) {
// we only unblock if there was an error; in all other cases we're unblocking when a task has been started
latch.countDown();
}
} }
Testing.debug("Stopped connector"); Testing.debug("Stopped connector");
}; };
ConnectorCallback connectorCallback = new ConnectorCallback() {
@Override
public void taskStarted() {
// if this is called, it means a task has been started successfully so we can continue
latch.countDown();
}
};
// Create the connector ... // Create the connector ...
engine = EmbeddedEngine.create() engine = EmbeddedEngine.create()
.using(config) .using(config)
@ -270,6 +283,7 @@ protected void start(Class<? extends SourceConnector> connectorClass, Configurat
}) })
.using(this.getClass().getClassLoader()) .using(this.getClass().getClassLoader())
.using(wrapperCallback) .using(wrapperCallback)
.using(connectorCallback)
.build(); .build();
// Submit the connector for asynchronous execution ... // Submit the connector for asynchronous execution ...
@ -279,6 +293,16 @@ protected void start(Class<? extends SourceConnector> connectorClass, Configurat
LoggingContext.forConnector(getClass().getSimpleName(), "", "engine"); LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
engine.run(); engine.run();
}); });
try {
if (!latch.await(10, TimeUnit.SECONDS)) {
// maybe it takes more time to start up, so just log a warning and continue
logger.warn("The connector did not finish starting its task(s) or complete in the expected amount of time");
}
} catch (InterruptedException e) {
if (Thread.interrupted()) {
fail("Interrupted while waiting for engine startup");
}
}
} }
/** /**