DBZ-859 Force heartbeat after snapshot completion

This commit is contained in:
Jiri Pechanec 2018-08-16 16:51:26 +02:00 committed by Gunnar Morling
parent 49214491a7
commit 985e03e7b5
3 changed files with 30 additions and 2 deletions

View File

@ -38,6 +38,7 @@
import io.debezium.connector.mysql.RecordMakers.RecordsForTable;
import io.debezium.function.BufferedBlockingConsumer;
import io.debezium.function.Predicates;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcConnection.StatementFactory;
import io.debezium.relational.Column;
@ -63,6 +64,7 @@ public class SnapshotReader extends AbstractReader {
private RecordRecorder recorder;
private final SnapshotReaderMetrics metrics;
private ExecutorService executorService;
private final Heartbeat heartbeat;
private final MySqlConnectorConfig.SnapshotLockingMode snapshotLockingMode;
@ -78,6 +80,9 @@ public SnapshotReader(String name, MySqlTaskContext context) {
this.snapshotLockingMode = context.getConnectorConfig().getSnapshotLockingMode();
recorder = this::recordRowAsRead;
metrics = new SnapshotReaderMetrics(context.getClock(), context.dbSchema());
heartbeat = Heartbeat.create(context.config(), context.topicSelector().getHeartbeatTopic(),
context.getConnectorConfig().getLogicalName());
}
/**
@ -683,6 +688,7 @@ protected void execute() {
// Mark the source as having completed the snapshot. This will ensure the `source` field on records
// are not denoted as a snapshot ...
source.completeSnapshot();
heartbeat.forcedBeat(source.partition(), source.offset(), this::enqueueRecord);
} finally {
// Set the completion flag ...
completeSuccessfully();

View File

@ -56,6 +56,11 @@ public interface Heartbeat {
@Override
public void heartbeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
}
@Override
public void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer)
throws InterruptedException {
}
};
/**
@ -69,6 +74,17 @@ public void heartbeat(Map<String, ?> partition, Map<String, ?> offset, BlockingC
// too
void heartbeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException;
/**
* Generates a heartbeat record unconditionaly
*
* @param partition partition for the heartbeat record
* @param offset offset for the heartbeat record
* @param consumer - a code to place record among others to be sent into Connect
*/
// TODO would be nice to pass OffsetContext here; not doing it for now, though, until MySQL is using OffsetContext,
// too
void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException;
/**
* Provide an instance of Heartbeat object
*

View File

@ -68,12 +68,18 @@ class HeartbeatImpl implements Heartbeat {
@Override
public void heartbeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
if (heartbeatTimeout.expired()) {
LOGGER.debug("Generating heartbeat event");
consumer.accept(heartbeatRecord(partition, offset));
forcedBeat(partition, offset, consumer);
heartbeatTimeout = resetHeartbeat();
}
}
@Override
public void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer)
throws InterruptedException {
LOGGER.debug("Generating heartbeat event");
consumer.accept(heartbeatRecord(partition, offset));
}
/**
* Produce a key struct based on the server name and KEY_SCHEMA
*