DBZ-8137 Stabilize SignalsIT for SQLServer

This commit is contained in:
mfvitale 2024-08-27 15:17:10 +02:00 committed by Jiri Pechanec
parent 08c50fb619
commit a4ff03716b

View File

@ -18,6 +18,7 @@
import javax.management.ObjectName; import javax.management.ObjectName;
import javax.management.ReflectionException; import javax.management.ReflectionException;
import org.awaitility.Awaitility;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -88,7 +89,8 @@ public void jmxSignals() throws Exception {
sendLogSignalWithJmx("1", "log", "{\"message\": \"Signal message at offset ''{}''\"}", "0"); sendLogSignalWithJmx("1", "log", "{\"message\": \"Signal message at offset ''{}''\"}", "0");
sendLogSignalWithJmx("1", "log", "{\"message\": \"Signal message at offset ''{}''\"}", "1"); sendLogSignalWithJmx("1", "log", "{\"message\": \"Signal message at offset ''{}''\"}", "1");
waitForAvailableRecords(800, TimeUnit.MILLISECONDS); Awaitility.await("Waiting for metrics to appear").atMost(waitTimeForRecords(), TimeUnit.SECONDS)
.until(() -> logInterceptor.countOccurrences("Signal message at offset") == 2);
assertThat(logInterceptor.countOccurrences("Signal message at offset")).isEqualTo(2); assertThat(logInterceptor.countOccurrences("Signal message at offset")).isEqualTo(2);
@ -100,6 +102,16 @@ private void sendLogSignalWithJmx(String id, String type, String data, String ta
ObjectName objectName = new ObjectName(String.format("debezium.sql_server:type=management,context=signals,server=server1,task=%s", taskId)); ObjectName objectName = new ObjectName(String.format("debezium.sql_server:type=management,context=signals,server=server1,task=%s", taskId));
MBeanServer server = ManagementFactory.getPlatformMBeanServer(); MBeanServer server = ManagementFactory.getPlatformMBeanServer();
Awaitility.await("Waiting for metrics to appear").atMost(waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
try {
server.getObjectInstance(objectName);
return true;
}
catch (InstanceNotFoundException e) {
return false;
}
});
server.invoke(objectName, "signal", new Object[]{ id, type, data }, new String[]{ String.class.getName(), String.class.getName(), String.class.getName() }); server.invoke(objectName, "signal", new Object[]{ id, type, data }, new String[]{ String.class.getName(), String.class.getName(), String.class.getName() });
} }
} }