DBZ-7858 Initial snapshot completed/aborted notification will the correct offset

This commit is contained in:
mfvitale 2024-05-13 10:01:58 +02:00 committed by Jiri Pechanec
parent 5b14097a94
commit 730eabc1f9
2 changed files with 51 additions and 1 deletions

View File

@ -113,12 +113,13 @@ public SnapshotResult<O> execute(ChangeEventSourceContext context, P partition,
completed(ctx); completed(ctx);
snapshotProgressListener.snapshotCompleted(partition); snapshotProgressListener.snapshotCompleted(partition);
notificationService.initialSnapshotNotificationService().notifyCompleted(offsets.getTheOnlyPartition(), offsets.getTheOnlyOffset()); notificationService.initialSnapshotNotificationService().notifyCompleted(ctx.partition, ctx.offset);
} }
else { else {
LOGGER.warn("Snapshot was not completed successfully, it will be re-executed upon connector restart"); LOGGER.warn("Snapshot was not completed successfully, it will be re-executed upon connector restart");
aborted(ctx); aborted(ctx);
snapshotProgressListener.snapshotAborted(offsets.getTheOnlyPartition()); snapshotProgressListener.snapshotAborted(offsets.getTheOnlyPartition());
notificationService.initialSnapshotNotificationService().notifyAborted(ctx.partition, ctx.offset);
} }
} }
} }

View File

@ -46,6 +46,7 @@
import io.debezium.config.CommonConnectorConfig; import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest; import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.junit.logging.LogInterceptor; import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.notification.channels.SinkNotificationChannel; import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
@ -228,6 +229,54 @@ public void emittingDebeziumNotificationWillGenerateAJmxNotification()
assertThat(notification.getTimestamp()).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1)); assertThat(notification.getTimestamp()).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1));
} }
@Test
@FixFor("DBZ-7858")
public void sinkNotificationWillCorrectlySaveOffsetAfterSnapshot() throws InterruptedException {
// Testing.Print.enable();
startConnector(config -> config
.with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification")
.with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink"));
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(connector(), server(), task(), database());
List<SourceRecord> notifications = new ArrayList<>();
Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
consumeAvailableRecords(r -> {
if (r.topic().equals("io.debezium.notification")) {
notifications.add(r);
}
});
return notifications.size() == calculateNotificationSize();
});
assertThat(notifications).hasSize(calculateNotificationSize());
SourceRecord sourceRecord = notifications.get(0);
Assertions.assertThat(sourceRecord.topic()).isEqualTo("io.debezium.notification");
Assertions.assertThat(((Struct) sourceRecord.value()).getString("aggregate_type")).isEqualTo("Initial Snapshot");
Assertions.assertThat(((Struct) sourceRecord.value()).getString("type")).isEqualTo("STARTED");
Assertions.assertThat(((Struct) sourceRecord.value()).getInt64("timestamp")).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1));
collections().forEach(tableName -> assertTableNotificationsSentToTopic(notifications, tableName));
sourceRecord = notifications.get(notifications.size() - 1);
Assertions.assertThat(sourceRecord.topic()).isEqualTo("io.debezium.notification");
Assertions.assertThat(((Struct) sourceRecord.value()).getString("aggregate_type")).isEqualTo("Initial Snapshot");
Assertions.assertThat(((Struct) sourceRecord.value()).getString("type")).isEqualTo(snapshotStatusResult());
Assertions.assertThat(((Struct) sourceRecord.value()).getInt64("timestamp")).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1));
stopConnector();
startConnector(config -> config
.with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification")
.with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink"));
assertConnectorIsRunning();
}
private void assertTableNotificationsSentToJmx(List<Notification> notifications, String tableName) { private void assertTableNotificationsSentToJmx(List<Notification> notifications, String tableName) {
// For debugging purposes // For debugging purposes
Optional<Notification> tableNotification; Optional<Notification> tableNotification;