DBZ-604 Making sure connector cleanly shuts down also if records are produced after polling loop has stopped

This commit is contained in:
Gunnar Morling 2018-11-23 10:53:20 +01:00 committed by Jiri Pechanec
parent 2fd16a6bc0
commit 62fe7d1ee7

View File

@ -98,9 +98,17 @@ public void start() {
@Override
public void stop() {
try {
// Emptying the queue so to make sure that enqueue() won't block indefinitely when adding records after
// poll() isn't called anymore but before the binlog reader is stopped; note there's still a tiny chance for
// this to happen if enough records are added again between here and the call to disconnect(); protecting
// against it seems not worth though it as shouldn't happen for any practical queue size
List<SourceRecord> unsent = new ArrayList<>();
records.drainTo(unsent);
logger.info("Discarding {} unsent record(s) due to the connector shutting down", unsent.size());
doStop();
running.set(false);
} finally {
}
finally {
if (failure.get() != null) {
// We had a failure and it was propagated via poll(), after which Kafka Connect will stop
// the connector, which will stop the task that will then stop this reader via this method.