DBZ-8137 Fix JMX signal and notification channel for multiple tasks connectors

This commit is contained in:
mfvitale 2024-08-26 12:53:36 +02:00 committed by Jiri Pechanec
parent 2fe9b54dae
commit 08c50fb619
3 changed files with 114 additions and 2 deletions

View File

@ -0,0 +1,105 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import static org.assertj.core.api.Assertions.assertThat;
import java.lang.management.ManagementFactory;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.signal.actions.Log;
import io.debezium.util.Testing;
public class SignalsIT extends AbstractAsyncEngineConnectorTest {
private SqlServerConnection connection;
@Before
public void before() throws SQLException {
TestHelper.createTestDatabases(TestHelper.TEST_DATABASE_1, TestHelper.TEST_DATABASE_2);
connection = TestHelper.multiPartitionTestConnection();
connection.execute(
"USE " + TestHelper.TEST_DATABASE_1,
"CREATE TABLE tableA (id int primary key, colA varchar(32))",
"CREATE TABLE tableB (id int primary key, colB varchar(32))",
"INSERT INTO tableA VALUES(1, 'a1')",
"INSERT INTO tableB VALUES(2, 'b')");
TestHelper.enableTableCdc(connection, "tableA");
TestHelper.enableTableCdc(connection, "tableB");
connection.execute(
"USE " + TestHelper.TEST_DATABASE_2,
"CREATE TABLE tableA (id int primary key, colA varchar(32))",
"CREATE TABLE tableC (id int primary key, colC varchar(32))",
"INSERT INTO tableA VALUES(3, 'a2')",
"INSERT INTO tableC VALUES(4, 'c')");
TestHelper.enableTableCdc(connection, "tableA");
TestHelper.enableTableCdc(connection, "tableC");
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
}
@After
public void after() throws SQLException {
if (connection != null) {
connection.close();
}
}
@Test
public void jmxSignals() throws Exception {
// Testing.Print.enable();
final LogInterceptor logInterceptor = new LogInterceptor(Log.class);
final Configuration config = TestHelper.defaultConfig(
TestHelper.TEST_DATABASE_1,
TestHelper.TEST_DATABASE_2)
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA)
.with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, "500")
.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "jmx")
.with("tasks.max", 2)
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
sendLogSignalWithJmx("1", "log", "{\"message\": \"Signal message at offset ''{}''\"}", "0");
sendLogSignalWithJmx("1", "log", "{\"message\": \"Signal message at offset ''{}''\"}", "1");
waitForAvailableRecords(800, TimeUnit.MILLISECONDS);
assertThat(logInterceptor.countOccurrences("Signal message at offset")).isEqualTo(2);
}
private void sendLogSignalWithJmx(String id, String type, String data, String taskId)
throws MalformedObjectNameException, ReflectionException, InstanceNotFoundException, MBeanException {
ObjectName objectName = new ObjectName(String.format("debezium.sql_server:type=management,context=signals,server=server1,task=%s", taskId));
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
server.invoke(objectName, "signal", new Object[]{ id, type, data }, new String[]{ String.class.getName(), String.class.getName(), String.class.getName() });
}
}

View File

@ -116,7 +116,10 @@ public static void unregisterMXBean(CommonConnectorConfig connectorConfig, Strin
private static String getManagementJmxObjectName(String type, String context, CommonConnectorConfig connectorConfig) {
String tags = String.format(JMX_OBJECT_NAME_FORMAT, connectorConfig.getContextName().toLowerCase(), type, context, connectorConfig.getLogicalName());
if (connectorConfig.getCustomMetricTags().size() > 0) {
if (connectorConfig.getTaskId() != null) {
tags += ",task=" + connectorConfig.getTaskId();
}
if (!connectorConfig.getCustomMetricTags().isEmpty()) {
String customTags = connectorConfig.getCustomMetricTags().entrySet().stream()
.map(e -> e.getKey() + "=" + Sanitizer.jmxSanitize(e.getValue()))
.collect(Collectors.joining(","));

View File

@ -353,7 +353,11 @@ private MBeanNotificationInfo[] readJmxNotifications()
private ObjectName getObjectName() throws MalformedObjectNameException {
return new ObjectName(String.format("debezium.%s:type=management,context=notifications,server=%s", connector(), server()));
String objName = String.format("debezium.%s:type=management,context=notifications,server=%s", connector(), server());
if (task() != null) {
objName += ",task=" + task();
}
return new ObjectName(objName);
}
private List<javax.management.Notification> registerJmxNotificationListener()