DBZ-6605 Fix DataCollections for snapshot completion notification

DBZ-6605 Use new 'scanned_collection' field for snapshot completion signal

Updated the docs for fixed snapshot completion notification
This commit is contained in:
bdbene 2023-06-23 15:08:58 -04:00 committed by Fiore Mario Vitale
parent fa78e77797
commit 6f29ce08ef
5 changed files with 13 additions and 6 deletions

View File

@ -55,6 +55,7 @@ Aykut Farsak
Babur Duisenov
Balázs Németh
Balázs Sipos
Balint Bene
Barry LaFond
Bartosz Miedlar
Ben Hardesty

View File

@ -23,6 +23,7 @@ public class IncrementalSnapshotNotificationService<P extends Partition, O exten
public static final String INCREMENTAL_SNAPSHOT = "Incremental Snapshot";
public static final String DATA_COLLECTIONS = "data_collections";
public static final String SCANNED_COLLECTION = "scanned_collection";
public static final String CURRENT_COLLECTION_IN_PROGRESS = "current_collection_in_progress";
public static final String MAXIMUM_KEY = "maximum_key";
public static final String LAST_PROCESSED_KEY = "last_processed_key";
@ -99,6 +100,7 @@ public <T extends DataCollectionId> void notifyAborted(IncrementalSnapshotContex
public <T extends DataCollectionId> void notifyTableScanCompleted(IncrementalSnapshotContext<T> incrementalSnapshotContext, P partition, OffsetContext offsetContext,
long totalRowsScanned, TableScanCompletionStatus status) {
String scannedCollection = incrementalSnapshotContext.currentDataCollectionId().getId().identifier();
String dataCollections = incrementalSnapshotContext.getDataCollections().stream().map(DataCollection::getId)
.map(DataCollectionId::identifier)
.collect(Collectors.joining(LIST_DELIMITER));
@ -106,6 +108,7 @@ public <T extends DataCollectionId> void notifyTableScanCompleted(IncrementalSna
notificationService.notify(buildNotificationWith(incrementalSnapshotContext, SnapshotStatus.TABLE_SCAN_COMPLETED,
Map.of(
DATA_COLLECTIONS, dataCollections,
SCANNED_COLLECTION, scannedCollection,
TOTAL_ROWS_SCANNED, String.valueOf(totalRowsScanned),
STATUS, status.name()),
offsetContext),

View File

@ -126,6 +126,7 @@ public void notifyTableScanCompleted() {
Notification expectedNotification = new Notification("12345", "Incremental Snapshot", "TABLE_SCAN_COMPLETED", Map.of(
"connector_name", "connector-test",
"data_collections", "db.inventory.product,db.inventory.customer",
"scanned_collection", "db.inventory.product",
"total_rows_scanned", "100",
"status", "SUCCEEDED"));

View File

@ -157,7 +157,8 @@ a|[source, json]
"type":"TABLE_SCAN_COMPLETED",
"additional_data":{
"connector_name":"my-connector",
"data_collection":"table1",
"data_collection":"table1, table2",
"scanned_collection":"table1",
"total_rows_scanned":"100",
"status":"SUCCEEDED" // <1>
}
@ -225,7 +226,7 @@ To enable an application to listen for the JMX notifications that an MBean emits
[id="debezium-notification-custom-channel"]
== Custom notification channels
The notification mechanism is designed to be extensible.
The notification mechanism is designed to be extensible.
You can implement channels as needed to deliver notifications in a manner that works best in your environment.
Adding a notification channel involves several steps:
@ -257,9 +258,9 @@ public interface NotificationChannel {
<1> The name of the channel.
To enable {prodname} to use the channel, specify this name in the connector's `notification.enabled.channels` property.
<2> Initializes specific configuration, variables, or connections that the channel requires.
<3> Sends the notification on the channel.
<3> Sends the notification on the channel.
{prodname} calls this method to report its status.
<4> Closes all allocated resources.
<4> Closes all allocated resources.
{prodname} calls this method when the connector is stopped.
// Type: concept
@ -300,5 +301,5 @@ NOTE: To use a custom notification channel with multiple connectors, you must pl
[id="configuring-connectors-to-use-a-custom-notification-channel"]
=== Configuring connectors to use a custom notification channel
Add the name of the custom notification channel to the `notification.enabled.channels` configuration property.
Add the name of the custom notification channel to the `notification.enabled.channels` configuration property.

View File

@ -213,4 +213,5 @@ gongchanghua,Gong Chang Hua
angsdey2,Angshuman Dey
jehrenzweig-pi,Jesse Ehrenzweig
TechIsCool,David Beck
cjmencias,Christian Jacob Mencias
cjmencias,Christian Jacob Mencias
bdbene,Balint Bene