Merge pull request #140 from hchiorean/DBZ-156

DBZ-156 Updates EmbeddedEngine to better handle exceptional cases and provide more feedback during startup
This commit is contained in:
Randall Hauch 2016-11-17 19:15:07 -06:00 committed by GitHub
commit 0dc86dbdae
2 changed files with 154 additions and 54 deletions

View File

@ -8,6 +8,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@ -155,7 +156,7 @@ public final class EmbeddedEngine implements Runnable {
/**
* A callback function to be notified when the connector completes.
*/
public static interface CompletionCallback {
public interface CompletionCallback {
/**
* Handle the completion of the embedded connector engine.
*
@ -166,6 +167,44 @@ public static interface CompletionCallback {
*/
void handle(boolean success, String message, Throwable error);
}
/**
* Callback function which informs users about the various stages a connector goes through during startup
*/
public interface ConnectorCallback {
/**
* Called after a connector has been successfully started by the engine; i.e. {@link SourceConnector#start(Map)} has
* completed successfully
*/
default void connectorStarted() {
//nothing by default
}
/**
* Called after a connector has been successfully stopped by the engine; i.e. {@link SourceConnector#stop()} has
* completed successfully
*/
default void connectorStopped() {
//nothing by default
}
/**
* Called after a connector task has been successfully started by the engine; i.e. {@link SourceTask#start(Map)} has
* completed successfully
*/
default void taskStarted() {
//nothing by default
}
/**
* Called after a connector task has been successfully stopped by the engine; i.e. {@link SourceTask#stop()} has
* completed successfully
*/
default void taskStopped() {
//nothing by default
}
}
/**
* A callback function to be notified when the connector completes.
@ -309,6 +348,15 @@ public static interface Builder {
*/
Builder using(CompletionCallback completionCallback);
/**
* During the engine's {@link EmbeddedEngine#run()} method, call the supplied the supplied function at different
* stages according to the completion state of each component running within the engine (connectors, tasks etc)
*
* @param connectorCallback the callback function; may be null
* @return this builder object so methods can be chained together; never null
*/
Builder using(ConnectorCallback connectorCallback);
/**
* Build a new connector with the information previously supplied to this builder.
*
@ -331,6 +379,7 @@ public static Builder create() {
private ClassLoader classLoader;
private Clock clock;
private CompletionCallback completionCallback;
private ConnectorCallback connectorCallback;
@Override
public Builder using(Configuration config) {
@ -355,7 +404,13 @@ public Builder using(CompletionCallback completionCallback) {
this.completionCallback = completionCallback;
return this;
}
@Override
public Builder using(ConnectorCallback connectorCallback) {
this.connectorCallback = connectorCallback;
return this;
}
@Override
public Builder notifying(Consumer<SourceRecord> consumer) {
this.consumer = consumer;
@ -368,7 +423,7 @@ public EmbeddedEngine build() {
if (clock == null) clock = Clock.system();
Objects.requireNonNull(config, "A connector configuration must be specified.");
Objects.requireNonNull(consumer, "A connector consumer must be specified.");
return new EmbeddedEngine(config, classLoader, clock, consumer, completionCallback);
return new EmbeddedEngine(config, classLoader, clock, consumer, completionCallback, connectorCallback);
}
};
@ -380,16 +435,18 @@ public EmbeddedEngine build() {
private final ClassLoader classLoader;
private final Consumer<SourceRecord> consumer;
private final CompletionCallback completionCallback;
private final ConnectorCallback connectorCallback;
private final AtomicReference<Thread> runningThread = new AtomicReference<>();
private final VariableLatch latch = new VariableLatch(0);
private final Converter keyConverter;
private final Converter valueConverter;
private final WorkerConfig workerConfig;
private final CompletionResult completionResult;
private long recordsSinceLastCommit = 0;
private long timeSinceLastCommitMillis = 0;
private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, Consumer<SourceRecord> consumer,
CompletionCallback completionCallback) {
CompletionCallback completionCallback, ConnectorCallback connectorCallback) {
this.config = config;
this.consumer = consumer;
this.classLoader = classLoader;
@ -397,6 +454,8 @@ private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock cloc
this.completionCallback = completionCallback != null ? completionCallback : (success, msg, error) -> {
if (success) logger.error(msg, error);
};
this.connectorCallback = connectorCallback;
this.completionResult = new CompletionResult();
assert this.config != null;
assert this.consumer != null;
assert this.classLoader != null;
@ -433,11 +492,13 @@ private void fail(String msg) {
}
private void fail(String msg, Throwable error) {
completionCallback.handle(false, msg, error);
// don't use the completion callback here because we want to store the error and message only
completionResult.handle(false, msg, error);
}
private void succeed(String msg) {
completionCallback.handle(true, msg, null);
// don't use the completion callback here because we want to store the error and message only
completionResult.handle(true, msg, null);
}
/**
@ -461,6 +522,7 @@ public void run() {
final String engineName = config.getString(ENGINE_NAME);
final String connectorClassName = config.getString(CONNECTOR_CLASS);
final Optional<ConnectorCallback> connectorCallback = Optional.ofNullable(this.connectorCallback);
// Only one thread can be in this part of the method at a time ...
latch.countUp();
try {
@ -529,6 +591,7 @@ public void raiseError(Exception e) {
try {
// Start the connector with the given properties and get the task configurations ...
connector.start(config.asMap());
connectorCallback.ifPresent(ConnectorCallback::connectorStarted);
List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
Class<? extends Task> taskClass = connector.taskClass();
SourceTask task = null;
@ -542,6 +605,7 @@ public void raiseError(Exception e) {
SourceTaskContext taskContext = () -> offsetReader;
task.initialize(taskContext);
task.start(taskConfigs.get(0));
connectorCallback.ifPresent(ConnectorCallback::taskStarted);
} catch (Throwable t) {
String msg = "Unable to initialize and start connector's task class '" + taskClass.getName() + "' with config: "
+ taskConfigs.get(0);
@ -551,50 +615,55 @@ public void raiseError(Exception e) {
recordsSinceLastCommit = 0;
Throwable handlerError = null;
timeSinceLastCommitMillis = clock.currentTimeInMillis();
while (runningThread.get() != null && handlerError == null) {
try {
logger.debug("Embedded engine is polling task for records");
List<SourceRecord> changeRecords = task.poll(); // blocks until there are values ...
if (changeRecords != null && !changeRecords.isEmpty()) {
logger.debug("Received {} records from the task", changeRecords.size());
// First forward the records to the connector's consumer ...
for (SourceRecord record : changeRecords) {
try {
consumer.accept(record);
} catch (Throwable t) {
handlerError = t;
break;
}
// Record the offset for this record's partition
offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
recordsSinceLastCommit += 1;
}
// Flush the offsets to storage if necessary ...
maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeoutMs);
} else {
logger.debug("Received no records from the task");
}
} catch (InterruptedException e) {
// This thread was interrupted, which signals that the thread should stop work.
// We first try to commit the offsets, since we record them only after the records were handled
// by the consumer ...
maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeoutMs);
// Then clear the interrupted status ...
Thread.interrupted();
break;
}
}
try {
timeSinceLastCommitMillis = clock.currentTimeInMillis();
while (runningThread.get() != null && handlerError == null) {
try {
logger.debug("Embedded engine is polling task for records");
List<SourceRecord> changeRecords = task.poll(); // blocks until there are values ...
if (changeRecords != null && !changeRecords.isEmpty()) {
logger.debug("Received {} records from the task", changeRecords.size());
// First forward the records to the connector's consumer ...
for (SourceRecord record : changeRecords) {
try {
consumer.accept(record);
task.commitRecord(record);
} catch (Throwable t) {
handlerError = t;
break;
}
// Record the offset for this record's partition
offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
recordsSinceLastCommit += 1;
}
// Flush the offsets to storage if necessary ...
maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeoutMs, task);
} else {
logger.debug("Received no records from the task");
}
} catch (Throwable t) {
// There was some sort of unexpected exception, so we should stop work
// We first try to commit the offsets, since we record them only after the records were handled
// by the consumer ...
maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeoutMs, task);
if (t instanceof InterruptedException) {
// Then clear the interrupted status ...
Thread.interrupted();
}
break;
}
}
} finally {
// First stop the task ...
logger.debug("Stopping the task and engine");
task.stop();
} finally {
connectorCallback.ifPresent(ConnectorCallback::taskStopped);
// Always commit offsets that were captured from the source records we actually processed ...
commitOffsets(offsetWriter, commitTimeoutMs);
commitOffsets(offsetWriter, commitTimeoutMs, task);
if (handlerError != null) {
// There was an error in the handler ...
fail("Stopping connector after error in the application's handler method: " + handlerError.getMessage(),
@ -606,44 +675,49 @@ public void raiseError(Exception e) {
}
} catch (Throwable t) {
fail("Error while trying to run connector class '" + connectorClassName + "'", t);
return;
} finally {
// Close the offset storage and finally the connector ...
try {
offsetStore.stop();
} finally {
connector.stop();
connectorCallback.ifPresent(ConnectorCallback::connectorStopped);
}
}
} finally {
latch.countDown();
runningThread.set(null);
// after we've "shut down" the engine, fire the completion callback based on the results we collected
completionCallback.handle(completionResult.success(), completionResult.message(), completionResult.error());
}
}
}
/**
* Determine if we should flush offsets to storage, and if so then attempt to flush offsets.
*
*
* @param offsetWriter the offset storage writer; may not be null
* @param policy the offset commit policy; may not be null
* @param commitTimeoutMs the timeout to wait for commit results
* @param task the task which produced the records for which the offsets have been committed
*/
protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy policy, long commitTimeoutMs) {
protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy policy, long commitTimeoutMs,
SourceTask task) {
// Determine if we need to commit to offset storage ...
if (policy.performCommit(recordsSinceLastCommit, timeSinceLastCommitMillis,
TimeUnit.MILLISECONDS)) {
commitOffsets(offsetWriter, commitTimeoutMs);
commitOffsets(offsetWriter, commitTimeoutMs, task);
}
}
/**
* Flush offsets to storage.
*
*
* @param offsetWriter the offset storage writer; may not be null
* @param commitTimeoutMs the timeout to wait for commit results
* @param task the task which produced the records for which the offsets have been committed
*/
protected void commitOffsets(OffsetStorageWriter offsetWriter, long commitTimeoutMs) {
protected void commitOffsets(OffsetStorageWriter offsetWriter, long commitTimeoutMs, SourceTask task) {
long started = clock.currentTimeInMillis();
long timeout = started + commitTimeoutMs;
if (!offsetWriter.beginFlush()) return;
@ -653,6 +727,8 @@ protected void commitOffsets(OffsetStorageWriter offsetWriter, long commitTimeou
// Wait until the offsets are flushed ...
try {
flush.get(Math.max(timeout - clock.currentTimeInMillis(), 0), TimeUnit.MILLISECONDS);
// if we've gotten this far, the offsets have been committed so notify the task
task.commit();
recordsSinceLastCommit = 0;
timeSinceLastCommitMillis = clock.currentTimeInMillis();
} catch (InterruptedException e) {

View File

@ -55,6 +55,7 @@
import io.debezium.data.SchemaUtil;
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.EmbeddedEngine.CompletionCallback;
import io.debezium.embedded.EmbeddedEngine.ConnectorCallback;
import io.debezium.embedded.EmbeddedEngine.EmbeddedConfig;
import io.debezium.function.BooleanConsumer;
import io.debezium.relational.history.HistoryRecord;
@ -250,10 +251,22 @@ protected void start(Class<? extends SourceConnector> connectorClass, Configurat
try {
if (callback != null) callback.handle(success, msg, error);
} finally {
latch.countDown();
if (!success) {
// we only unblock if there was an error; in all other cases we're unblocking when a task has been started
latch.countDown();
}
}
Testing.debug("Stopped connector");
};
ConnectorCallback connectorCallback = new ConnectorCallback() {
@Override
public void taskStarted() {
// if this is called, it means a task has been started successfully so we can continue
latch.countDown();
}
};
// Create the connector ...
engine = EmbeddedEngine.create()
.using(config)
@ -270,6 +283,7 @@ protected void start(Class<? extends SourceConnector> connectorClass, Configurat
})
.using(this.getClass().getClassLoader())
.using(wrapperCallback)
.using(connectorCallback)
.build();
// Submit the connector for asynchronous execution ...
@ -279,6 +293,16 @@ protected void start(Class<? extends SourceConnector> connectorClass, Configurat
LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
engine.run();
});
try {
if (!latch.await(10, TimeUnit.SECONDS)) {
// maybe it takes more time to start up, so just log a warning and continue
logger.warn("The connector did not finish starting its task(s) or complete in the expected amount of time");
}
} catch (InterruptedException e) {
if (Thread.interrupted()) {
fail("Interrupted while waiting for engine startup");
}
}
}
/**