[DBZ-1579] Fix regression of large refreshes causing connection timeout

In DBZ-1214, an issue was fixed where during the initial schema refresh,
the replication stream connection would timeout. This occurred in cases
of many schemas that need to be refreshed.

This was fixed by adding a keep alive thread that ensure that we
periodically send an empty update to the DB.

It appears that in the refactor to move the new shared interface, the
call to this keep alive thread was removed, but the remaining keep alive
interface still exists

This simply wires the keep alive thread back up
This commit is contained in:
Addison Higham 2019-10-26 15:13:23 -06:00 committed by Gunnar Morling
parent c25bb23a04
commit b1b657b8bd

View File

@ -8,6 +8,7 @@
import java.sql.SQLException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.errors.ConnectException;
@ -101,13 +102,17 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
LOGGER.info("no previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN...");
replicationStream.compareAndSet(null, replicationConnection.startStreaming());
}
// for large dbs, the refresh of schema can take too much time
// such that the connection times out. We must enable keep
// alive to ensure that it doesn't time out
final ReplicationStream stream = this.replicationStream.get();
stream.startKeepAlive(Executors.newSingleThreadExecutor());
// refresh the schema so we have a latest view of the DB tables
taskContext.refreshSchema(connection, true);
this.lastCompletelyProcessedLsn = offsetContext.lsn();
final ReplicationStream stream = this.replicationStream.get();
while (context.isRunning()) {
int noMessageIterations = 0;
if (!stream.readPending(message -> {
@ -163,6 +168,12 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
finally {
if (replicationConnection != null) {
LOGGER.debug("stopping streaming...");
// stop the keep alive thread, this also shuts down the
// executor pool
ReplicationStream stream = replicationStream.get();
if (stream != null) {
stream.stopKeepAlive();
}
//TODO author=Horia Chiorean date=08/11/2016 description=Ideally we'd close the stream, but it's not reliable atm (see javadoc)
//replicationStream.close();
// close the connection - this should also disconnect the current stream even if it's blocking