From 08c50fb619426a33e216af45aa22a65c9df35d22 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Mon, 26 Aug 2024 12:53:36 +0200 Subject: [PATCH] DBZ-8137 Fix JMX signal and notification channel for multiple tasks connectors --- .../connector/sqlserver/SignalsIT.java | 105 ++++++++++++++++++ .../java/io/debezium/pipeline/JmxUtils.java | 5 +- .../notification/AbstractNotificationsIT.java | 6 +- 3 files changed, 114 insertions(+), 2 deletions(-) create mode 100644 debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SignalsIT.java diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SignalsIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SignalsIT.java new file mode 100644 index 000000000..b26ba28d1 --- /dev/null +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SignalsIT.java @@ -0,0 +1,105 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.lang.management.ManagementFactory; +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; + +import javax.management.InstanceNotFoundException; +import javax.management.MBeanException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.ReflectionException; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.config.Configuration; +import io.debezium.connector.sqlserver.util.TestHelper; +import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest; +import io.debezium.junit.logging.LogInterceptor; +import io.debezium.pipeline.signal.actions.Log; +import io.debezium.util.Testing; + +public class SignalsIT extends AbstractAsyncEngineConnectorTest { + + private SqlServerConnection connection; + + @Before + public void before() throws SQLException { + TestHelper.createTestDatabases(TestHelper.TEST_DATABASE_1, TestHelper.TEST_DATABASE_2); + connection = TestHelper.multiPartitionTestConnection(); + connection.execute( + "USE " + TestHelper.TEST_DATABASE_1, + "CREATE TABLE tableA (id int primary key, colA varchar(32))", + "CREATE TABLE tableB (id int primary key, colB varchar(32))", + "INSERT INTO tableA VALUES(1, 'a1')", + "INSERT INTO tableB VALUES(2, 'b')"); + TestHelper.enableTableCdc(connection, "tableA"); + TestHelper.enableTableCdc(connection, "tableB"); + connection.execute( + "USE " + TestHelper.TEST_DATABASE_2, + "CREATE TABLE tableA (id int primary key, colA varchar(32))", + "CREATE TABLE tableC (id int primary key, colC varchar(32))", + "INSERT INTO tableA VALUES(3, 'a2')", + "INSERT INTO tableC VALUES(4, 'c')"); + TestHelper.enableTableCdc(connection, "tableA"); + TestHelper.enableTableCdc(connection, "tableC"); + + initializeConnectorTestFramework(); + Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH); + } + + @After + public void after() throws SQLException { + if (connection != null) { + connection.close(); + } + } + + @Test + public void jmxSignals() throws Exception { + // Testing.Print.enable(); + + final LogInterceptor logInterceptor = new LogInterceptor(Log.class); + + final Configuration config = TestHelper.defaultConfig( + TestHelper.TEST_DATABASE_1, + TestHelper.TEST_DATABASE_2) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA) + .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, "500") + .with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "jmx") + .with("tasks.max", 2) + .build(); + + start(SqlServerConnector.class, config); + + assertConnectorIsRunning(); + + sendLogSignalWithJmx("1", "log", "{\"message\": \"Signal message at offset ''{}''\"}", "0"); + sendLogSignalWithJmx("1", "log", "{\"message\": \"Signal message at offset ''{}''\"}", "1"); + + waitForAvailableRecords(800, TimeUnit.MILLISECONDS); + + assertThat(logInterceptor.countOccurrences("Signal message at offset")).isEqualTo(2); + + } + + private void sendLogSignalWithJmx(String id, String type, String data, String taskId) + throws MalformedObjectNameException, ReflectionException, InstanceNotFoundException, MBeanException { + + ObjectName objectName = new ObjectName(String.format("debezium.sql_server:type=management,context=signals,server=server1,task=%s", taskId)); + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + + server.invoke(objectName, "signal", new Object[]{ id, type, data }, new String[]{ String.class.getName(), String.class.getName(), String.class.getName() }); + } +} diff --git a/debezium-core/src/main/java/io/debezium/pipeline/JmxUtils.java b/debezium-core/src/main/java/io/debezium/pipeline/JmxUtils.java index 7407a7fcf..aa8e2fcd1 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/JmxUtils.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/JmxUtils.java @@ -116,7 +116,10 @@ public static void unregisterMXBean(CommonConnectorConfig connectorConfig, Strin private static String getManagementJmxObjectName(String type, String context, CommonConnectorConfig connectorConfig) { String tags = String.format(JMX_OBJECT_NAME_FORMAT, connectorConfig.getContextName().toLowerCase(), type, context, connectorConfig.getLogicalName()); - if (connectorConfig.getCustomMetricTags().size() > 0) { + if (connectorConfig.getTaskId() != null) { + tags += ",task=" + connectorConfig.getTaskId(); + } + if (!connectorConfig.getCustomMetricTags().isEmpty()) { String customTags = connectorConfig.getCustomMetricTags().entrySet().stream() .map(e -> e.getKey() + "=" + Sanitizer.jmxSanitize(e.getValue())) .collect(Collectors.joining(",")); diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/notification/AbstractNotificationsIT.java b/debezium-embedded/src/test/java/io/debezium/pipeline/notification/AbstractNotificationsIT.java index 1fbcb4b1f..9a17e08db 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/notification/AbstractNotificationsIT.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/notification/AbstractNotificationsIT.java @@ -353,7 +353,11 @@ private MBeanNotificationInfo[] readJmxNotifications() private ObjectName getObjectName() throws MalformedObjectNameException { - return new ObjectName(String.format("debezium.%s:type=management,context=notifications,server=%s", connector(), server())); + String objName = String.format("debezium.%s:type=management,context=notifications,server=%s", connector(), server()); + if (task() != null) { + objName += ",task=" + task(); + } + return new ObjectName(objName); } private List registerJmxNotificationListener()