DBZ-777 Fix heartbeats

This commit is contained in:
Jiri Pechanec 2019-08-14 13:46:39 +02:00 committed by Gunnar Morling
parent ffb7e6d7d8
commit 037d581844
5 changed files with 26 additions and 6 deletions

View File

@ -129,6 +129,10 @@ boolean hasLastKnownPosition() {
return sourceInfo.lsn() != null; return sourceInfo.lsn() != null;
} }
boolean hasCompletelyProcessedPosition() {
return this.lastCompletelyProcessedLsn != null;
}
Long lsn() { Long lsn() {
return sourceInfo.lsn(); return sourceInfo.lsn();
} }

View File

@ -91,12 +91,11 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
final ReplicationStream stream = this.replicationStream.get(); final ReplicationStream stream = this.replicationStream.get();
while (context.isRunning()) { while (context.isRunning()) {
stream.readPending(message -> { if (!stream.readPending(message -> {
final Long lsn = stream.lastReceivedLsn(); final Long lsn = stream.lastReceivedLsn();
if (message == null) { if (message == null) {
LOGGER.trace("Received empty message"); LOGGER.trace("Received empty message");
lastCompletelyProcessedLsn = lsn; lastCompletelyProcessedLsn = lsn;
pauseNoMessage.pause();
return; return;
} }
if (message.isLastEventForLsn()) { if (message.isLastEventForLsn()) {
@ -119,7 +118,12 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
message message
) )
); );
}); })) {
if (offsetContext.hasCompletelyProcessedPosition()) {
dispatcher.dispatchHeartbeatEvent(offsetContext);
pauseNoMessage.pause();
}
}
} }
} }
catch (Throwable e) { catch (Throwable e) {

View File

@ -980,6 +980,8 @@ public void shouldReceiveHeartbeatAlsoWhenChangingNonWhitelistedTable() throws E
.with(Heartbeat.HEARTBEAT_INTERVAL, "1") .with(Heartbeat.HEARTBEAT_INTERVAL, "1")
.with(PostgresConnectorConfig.POLL_INTERVAL_MS, "50") .with(PostgresConnectorConfig.POLL_INTERVAL_MS, "50")
.with(PostgresConnectorConfig.TABLE_WHITELIST, "s1\\.b") .with(PostgresConnectorConfig.TABLE_WHITELIST, "s1\\.b")
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER),
false
); );
String statement = "CREATE SCHEMA s1;" + String statement = "CREATE SCHEMA s1;" +
@ -990,11 +992,13 @@ public void shouldReceiveHeartbeatAlsoWhenChangingNonWhitelistedTable() throws E
// streaming from database is non-blocking so we should receive many heartbeats // streaming from database is non-blocking so we should receive many heartbeats
final int expectedHeartbeats = 5; final int expectedHeartbeats = 5;
consumer = testConsumer(1 + expectedHeartbeats); // heartbeat for unfiltered table, data change, heartbeats
consumer = testConsumer(1 + 1 + expectedHeartbeats);
consumer.setIgnoreExtraRecords(true); consumer.setIgnoreExtraRecords(true);
executeAndWait(statement); executeAndWait(statement);
// change record for s1.b and heartbeats // change record for s1.b and heartbeats
assertHeartBeatRecordInserted();
assertRecordInserted("s1.b", PK_FIELD, 1); assertRecordInserted("s1.b", PK_FIELD, 1);
for (int i = 0; i < expectedHeartbeats; i++) { for (int i = 0; i < expectedHeartbeats; i++) {
assertHeartBeatRecordInserted(); assertHeartBeatRecordInserted();

View File

@ -173,7 +173,7 @@ public void dispatchSchemaChangeEvent(T dataCollectionId, SchemaChangeEventEmitt
schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver()); schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver());
} }
public void dispatchHeartbeatEvent(OffsetContext offset) throws InterruptedException { public void alwaysDispatchHeartbeatEvent(OffsetContext offset) throws InterruptedException {
heartbeat.forcedBeat( heartbeat.forcedBeat(
offset.getPartition(), offset.getPartition(),
offset.getOffset(), offset.getOffset(),
@ -181,6 +181,14 @@ public void dispatchHeartbeatEvent(OffsetContext offset) throws InterruptedExcep
); );
} }
public void dispatchHeartbeatEvent(OffsetContext offset) throws InterruptedException {
heartbeat.heartbeat(
offset.getPartition(),
offset.getOffset(),
this::enqueueHeartbeat
);
}
private void enqueueHeartbeat(SourceRecord record) throws InterruptedException { private void enqueueHeartbeat(SourceRecord record) throws InterruptedException {
queue.enqueue(new DataChangeEvent(record)); queue.enqueue(new DataChangeEvent(record));
} }

View File

@ -164,7 +164,7 @@ public SnapshotResult execute(ChangeEventSourceContext context) throws Interrupt
ctx.offset.postSnapshotCompletion(); ctx.offset.postSnapshotCompletion();
} }
dispatcher.dispatchHeartbeatEvent(ctx.offset); dispatcher.alwaysDispatchHeartbeatEvent(ctx.offset);
snapshotProgressListener.snapshotCompleted(); snapshotProgressListener.snapshotCompleted();
return SnapshotResult.completed(ctx.offset); return SnapshotResult.completed(ctx.offset);
} }