diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/IncrementalSnapshotIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/IncrementalSnapshotIT.java index 1a21af04b..76353c59c 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/IncrementalSnapshotIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/IncrementalSnapshotIT.java @@ -102,6 +102,16 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s .with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, SchemaNameAdjustmentMode.AVRO); } + @Override + protected String connector() { + return "mysql"; + } + + @Override + protected String server() { + return DATABASE.getServerName(); + } + @Override protected Class connectorClass() { return MySqlConnector.class; diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotCaseSensitiveIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotCaseSensitiveIT.java index 0f5152eb7..4018df32b 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotCaseSensitiveIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotCaseSensitiveIT.java @@ -185,6 +185,16 @@ protected int defaultIncrementalSnapshotChunkSize() { return 250; } + @Override + protected String connector() { + return "oracle"; + } + + @Override + protected String server() { + return TestHelper.SERVER_NAME; + } + @Test public void snapshotPreceededBySchemaChange() throws Exception { // TODO: remove once https://github.com/Apicurio/apicurio-registry/issues/2980 is fixed diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotIT.java index 70a0861f7..88ec3be9a 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotIT.java @@ -180,6 +180,16 @@ protected int defaultIncrementalSnapshotChunkSize() { return 250; } + @Override + protected String connector() { + return "oracle"; + } + + @Override + protected String server() { + return TestHelper.SERVER_NAME; + } + @Test public void snapshotPreceededBySchemaChange() throws Exception { // TODO: remove once https://github.com/Apicurio/apicurio-registry/issues/2980 is fixed diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java index 9ef545e3d..e9182fcd2 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java @@ -167,6 +167,16 @@ protected void waitForConnectorToStart() { TestHelper.waitForDefaultReplicationSlotBeActive(); } + @Override + protected String connector() { + return "postgres"; + } + + @Override + protected String server() { + return TestHelper.TEST_SERVER; + } + @Test public void inserts4Pks() throws Exception { // Testing.Print.enable(); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotIT.java index 16ff8c9d4..9a4a3a010 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotIT.java @@ -178,4 +178,24 @@ protected Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCaptur protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception { TestHelper.waitForCdcTransactionPropagation(connection, TestHelper.TEST_DATABASE_1, expectedTransactions); } + + @Override + protected String connector() { + return "sql_server"; + } + + @Override + protected String server() { + return TestHelper.TEST_SERVER_NAME; + } + + @Override + protected String task() { + return "0"; + } + + @Override + protected String database() { + return TestHelper.TEST_DATABASE_1; + } } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotWithRecompileIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotWithRecompileIT.java index 7e11d0ea9..aebeb1519 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotWithRecompileIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotWithRecompileIT.java @@ -122,6 +122,26 @@ protected Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCaptur .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl); } + @Override + protected String connector() { + return "sql_server"; + } + + @Override + protected String server() { + return TestHelper.TEST_SERVER_NAME; + } + + @Override + protected String task() { + return "0"; + } + + @Override + protected String database() { + return TestHelper.TEST_DATABASE_1; + } + @Override protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception { TestHelper.waitForCdcTransactionPropagation(connection, TestHelper.TEST_DATABASE_1, expectedTransactions); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/notification/IncrementalSnapshotNotificationService.java b/debezium-core/src/main/java/io/debezium/pipeline/notification/IncrementalSnapshotNotificationService.java index 3e56318e3..bb8dbae80 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/notification/IncrementalSnapshotNotificationService.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/notification/IncrementalSnapshotNotificationService.java @@ -11,9 +11,7 @@ import java.util.UUID; import java.util.stream.Collectors; -import org.apache.kafka.connect.errors.DataException; - -import io.debezium.connector.common.BaseSourceInfo; +import io.debezium.config.CommonConnectorConfig; import io.debezium.pipeline.source.snapshot.incremental.DataCollection; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; @@ -36,6 +34,8 @@ public class IncrementalSnapshotNotificationService

notificationService; + private final CommonConnectorConfig connectorConfig; + public enum TableScanCompletionStatus { EMPTY, NO_PRIMARY_KEY, @@ -45,8 +45,9 @@ public enum TableScanCompletionStatus { UNKNOWN_SCHEMA } - public IncrementalSnapshotNotificationService(NotificationService notificationService) { + public IncrementalSnapshotNotificationService(NotificationService notificationService, CommonConnectorConfig config) { this.notificationService = notificationService; + connectorConfig = config; } public void notifyStarted(IncrementalSnapshotContext incrementalSnapshotContext, P partition, OffsetContext offsetContext) { @@ -139,14 +140,7 @@ private Notification buildNotificationWith(Incremen Map fullMap = new HashMap<>(additionalData); - String connectorName; - try { - connectorName = offsetContext.getSourceInfo().getString(BaseSourceInfo.SERVER_NAME_KEY); - } - catch (DataException e) { - connectorName = NONE; - } - fullMap.put(CONNECTOR_NAME, connectorName); + fullMap.put(CONNECTOR_NAME, connectorConfig.getLogicalName()); String id = incrementalSnapshotContext.getCorrelationId() != null ? incrementalSnapshotContext.getCorrelationId() : UUID.randomUUID().toString(); return Notification.Builder.builder() diff --git a/debezium-core/src/main/java/io/debezium/pipeline/notification/NotificationService.java b/debezium-core/src/main/java/io/debezium/pipeline/notification/NotificationService.java index 3b85699d9..8c7df6692 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/notification/NotificationService.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/notification/NotificationService.java @@ -48,7 +48,7 @@ public NotificationService(List notificationChannels, .filter(isConnectChannel()) .forEach(channel -> ((ConnectChannel) channel).initConnectChannel(schemaFactory, consumer)); - incrementalSnapshotNotificationService = new IncrementalSnapshotNotificationService<>(this); + incrementalSnapshotNotificationService = new IncrementalSnapshotNotificationService<>(this, config); initialSnapshotNotificationService = new InitialSnapshotNotificationService<>(this, config); } diff --git a/debezium-core/src/test/java/io/debezium/pipeline/notification/IncrementalSnapshotNotificationServiceTest.java b/debezium-core/src/test/java/io/debezium/pipeline/notification/IncrementalSnapshotNotificationServiceTest.java index 13d6cb7ca..5de0a3b4e 100644 --- a/debezium-core/src/test/java/io/debezium/pipeline/notification/IncrementalSnapshotNotificationServiceTest.java +++ b/debezium-core/src/test/java/io/debezium/pipeline/notification/IncrementalSnapshotNotificationServiceTest.java @@ -8,7 +8,6 @@ import static io.debezium.pipeline.notification.IncrementalSnapshotNotificationService.TableScanCompletionStatus.SUCCEEDED; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -16,7 +15,6 @@ import java.util.Map; import java.util.Optional; -import org.apache.kafka.connect.data.Struct; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -24,6 +22,7 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import io.debezium.config.CommonConnectorConfig; import io.debezium.pipeline.source.snapshot.incremental.DataCollection; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; @@ -42,14 +41,15 @@ public class IncrementalSnapshotNotificationServiceTest { private NotificationService notificationService; @Mock private IncrementalSnapshotContext incrementalSnapshotContext; + + @Mock + private CommonConnectorConfig connectorConfig; @InjectMocks private IncrementalSnapshotNotificationService incrementalSnapshotNotificationService; @Before public void setUp() { - Struct mockedSourceStruct = mock(Struct.class); - when(mockedSourceStruct.getString("name")).thenReturn("connector-test"); - when(offsetContext.getSourceInfo()).thenReturn(mockedSourceStruct); + when(connectorConfig.getLogicalName()).thenReturn("connector-test"); when(incrementalSnapshotContext.getCorrelationId()).thenReturn("12345"); when(incrementalSnapshotContext.getDataCollections()).thenReturn(List.of( new DataCollection<>(new TableId("db", "inventory", "product")), diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index 88a63bb33..6ba7c5121 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -1242,12 +1242,16 @@ public static void waitForStreamingRunning(String connector, String server) thro } public static void waitForStreamingRunning(String connector, String server, String contextName) { + waitForStreamingRunning(connector, server, contextName, null); + } + + public static void waitForStreamingRunning(String connector, String server, String contextName, String task) { Awaitility.await() .alias("Streaming was not started on time") .pollInterval(100, TimeUnit.MILLISECONDS) .atMost(waitTimeForRecords() * 30, TimeUnit.SECONDS) .ignoreException(InstanceNotFoundException.class) - .until(() -> isStreamingRunning(connector, server, contextName)); + .until(() -> isStreamingRunning(connector, server, contextName, task)); } public static void waitForConnectorShutdown(String connector, String server) { @@ -1258,14 +1262,20 @@ public static void waitForConnectorShutdown(String connector, String server) { } public static boolean isStreamingRunning(String connector, String server) { - return isStreamingRunning(connector, server, getStreamingNamespace()); + return isStreamingRunning(connector, server, getStreamingNamespace(), null); } public static boolean isStreamingRunning(String connector, String server, String contextName) { + return isStreamingRunning(connector, server, contextName, null); + } + + public static boolean isStreamingRunning(String connector, String server, String contextName, String task) { final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); try { - return (boolean) mbeanServer.getAttribute(getStreamingMetricsObjectName(connector, server, contextName), "Connected"); + ObjectName streamingMetricsObjectName = task != null ? getStreamingMetricsObjectName(connector, server, contextName, task) + : getStreamingMetricsObjectName(connector, server, contextName); + return (boolean) mbeanServer.getAttribute(streamingMetricsObjectName, "Connected"); } catch (JMException ignored) { } @@ -1301,6 +1311,10 @@ public static ObjectName getStreamingMetricsObjectName(String connector, String return new ObjectName("debezium." + connector + ":type=connector-metrics,context=" + context + ",server=" + server); } + public static ObjectName getStreamingMetricsObjectName(String connector, String server, String context, String task) throws MalformedObjectNameException { + return new ObjectName("debezium." + connector + ":type=connector-metrics,context=" + context + ",server=" + server + ",task=" + task); + } + protected static String getStreamingNamespace() { return System.getProperty("test.streaming.metrics.namespace", "streaming"); } diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java index 13c7f1475..89d18e793 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java @@ -88,6 +88,18 @@ protected String signalTableNameSanitized() { protected abstract Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl); + protected abstract String connector(); + + protected abstract String server(); + + protected String task() { + return null; + } + + protected String database() { + return null; + } + protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception { } @@ -1094,6 +1106,7 @@ public void snapshotWithAdditionalConditionWithSurrogateKey() throws Exception { @Test public void testNotification() throws Exception { + populateTable(); startConnector(x -> x.with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink") .with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, defaultIncrementalSnapshotChunkSize()) @@ -1103,6 +1116,8 @@ public void testNotification() throws Exception { waitForAvailableRecords(1, TimeUnit.SECONDS); + waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task()); + sendAdHocSnapshotSignal(); List records = new ArrayList<>();