diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/AbstractSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/AbstractSnapshotChangeEventSource.java index cbe774c6c..c6e4e8f9f 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/AbstractSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/AbstractSnapshotChangeEventSource.java @@ -113,12 +113,13 @@ public SnapshotResult execute(ChangeEventSourceContext context, P partition, completed(ctx); snapshotProgressListener.snapshotCompleted(partition); - notificationService.initialSnapshotNotificationService().notifyCompleted(offsets.getTheOnlyPartition(), offsets.getTheOnlyOffset()); + notificationService.initialSnapshotNotificationService().notifyCompleted(ctx.partition, ctx.offset); } else { LOGGER.warn("Snapshot was not completed successfully, it will be re-executed upon connector restart"); aborted(ctx); snapshotProgressListener.snapshotAborted(offsets.getTheOnlyPartition()); + notificationService.initialSnapshotNotificationService().notifyAborted(ctx.partition, ctx.offset); } } } diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/notification/AbstractNotificationsIT.java b/debezium-embedded/src/test/java/io/debezium/pipeline/notification/AbstractNotificationsIT.java index dc7c911a9..f4456ab0b 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/notification/AbstractNotificationsIT.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/notification/AbstractNotificationsIT.java @@ -46,6 +46,7 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; +import io.debezium.doc.FixFor; import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest; import io.debezium.junit.logging.LogInterceptor; import io.debezium.pipeline.notification.channels.SinkNotificationChannel; @@ -228,6 +229,54 @@ public void emittingDebeziumNotificationWillGenerateAJmxNotification() assertThat(notification.getTimestamp()).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1)); } + @Test + @FixFor("DBZ-7858") + public void sinkNotificationWillCorrectlySaveOffsetAfterSnapshot() throws InterruptedException { + // Testing.Print.enable(); + + startConnector(config -> config + .with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification") + .with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink")); + + assertConnectorIsRunning(); + + waitForSnapshotToBeCompleted(connector(), server(), task(), database()); + + List notifications = new ArrayList<>(); + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> { + + consumeAvailableRecords(r -> { + if (r.topic().equals("io.debezium.notification")) { + notifications.add(r); + } + }); + return notifications.size() == calculateNotificationSize(); + }); + + assertThat(notifications).hasSize(calculateNotificationSize()); + SourceRecord sourceRecord = notifications.get(0); + Assertions.assertThat(sourceRecord.topic()).isEqualTo("io.debezium.notification"); + Assertions.assertThat(((Struct) sourceRecord.value()).getString("aggregate_type")).isEqualTo("Initial Snapshot"); + Assertions.assertThat(((Struct) sourceRecord.value()).getString("type")).isEqualTo("STARTED"); + Assertions.assertThat(((Struct) sourceRecord.value()).getInt64("timestamp")).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1)); + + collections().forEach(tableName -> assertTableNotificationsSentToTopic(notifications, tableName)); + + sourceRecord = notifications.get(notifications.size() - 1); + Assertions.assertThat(sourceRecord.topic()).isEqualTo("io.debezium.notification"); + Assertions.assertThat(((Struct) sourceRecord.value()).getString("aggregate_type")).isEqualTo("Initial Snapshot"); + Assertions.assertThat(((Struct) sourceRecord.value()).getString("type")).isEqualTo(snapshotStatusResult()); + Assertions.assertThat(((Struct) sourceRecord.value()).getInt64("timestamp")).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1)); + + stopConnector(); + + startConnector(config -> config + .with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification") + .with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink")); + + assertConnectorIsRunning(); + } + private void assertTableNotificationsSentToJmx(List notifications, String tableName) { // For debugging purposes Optional tableNotification;