DBZ-8035 Add pause/resume operation for ActivityMonitoringMeter JMXBean
This commit is contained in:
parent
d38c7b4325
commit
f71ddd9219
@ -14,9 +14,11 @@
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.management.InstanceNotFoundException;
|
||||
import javax.management.MBeanException;
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.ReflectionException;
|
||||
import javax.management.openmbean.TabularDataSupport;
|
||||
|
||||
import org.awaitility.Awaitility;
|
||||
@ -178,6 +180,39 @@ public void testAdvancedStreamingMetrics() throws Exception {
|
||||
assertStreamingMetrics(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPauseAndResumeAdvancedStreamingMetrics() throws Exception {
|
||||
// Setup
|
||||
TestHelper.execute(INIT_STATEMENTS);
|
||||
|
||||
// start connector
|
||||
start(PostgresConnector.class,
|
||||
TestHelper.defaultConfig()
|
||||
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA)
|
||||
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
|
||||
.with(PostgresConnectorConfig.ADVANCED_METRICS_ENABLE, Boolean.TRUE)
|
||||
.build());
|
||||
|
||||
assertSnapshotNotExecutedMetrics();
|
||||
assertStreamingMetrics(true);
|
||||
|
||||
invokeOperation(getStreamingMetricsObjectName(), "pause");
|
||||
insertRecords();
|
||||
assertAdvancedMetrics(2);
|
||||
|
||||
invokeOperation(getStreamingMetricsObjectName(), "resume");
|
||||
insertRecords();
|
||||
assertAdvancedMetrics(4);
|
||||
}
|
||||
|
||||
private void insertRecords() throws InterruptedException {
|
||||
// Wait for the streaming to begin
|
||||
TestConsumer consumer = testConsumer(2, "public");
|
||||
TestHelper.execute(INSERT_STATEMENTS);
|
||||
consumer.await(TestHelper.waitTimeForRecords() * 30L, TimeUnit.SECONDS);
|
||||
Thread.sleep(Duration.ofSeconds(2).toMillis());
|
||||
}
|
||||
|
||||
private void assertSnapshotMetrics() throws Exception {
|
||||
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
|
||||
|
||||
@ -258,17 +293,32 @@ private void assertStreamingMetrics(boolean checkAdvancedMetrics) throws Excepti
|
||||
// Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CapturedTables")).isEqualTo(new String[] {"public.simple"});
|
||||
|
||||
if (checkAdvancedMetrics) {
|
||||
TabularDataSupport numberOfCreateEventsSeen = (TabularDataSupport) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "NumberOfCreateEventsSeen");
|
||||
|
||||
String values = numberOfCreateEventsSeen.values().stream()
|
||||
.limit(1)
|
||||
.toList()
|
||||
.get(0)
|
||||
.toString();
|
||||
assertThat(values).isEqualTo(
|
||||
"javax.management.openmbean.CompositeDataSupport(compositeType=javax.management.openmbean.CompositeType(name=java.util.Map<java.lang.String, java.lang.Long>,items=((itemName=key,itemType=javax.management.openmbean.SimpleType(name=java.lang.String)),(itemName=value,itemType=javax.management.openmbean.SimpleType(name=java.lang.Long)))),contents={key=public.simple, value=2})");
|
||||
assertAdvancedMetrics(2);
|
||||
}
|
||||
}
|
||||
|
||||
public void assertAdvancedMetrics(int expectedInsert) throws Exception {
|
||||
|
||||
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
|
||||
|
||||
TabularDataSupport numberOfCreateEventsSeen = (TabularDataSupport) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "NumberOfCreateEventsSeen");
|
||||
|
||||
String values = numberOfCreateEventsSeen.values().stream()
|
||||
.limit(1)
|
||||
.toList()
|
||||
.get(0)
|
||||
.toString();
|
||||
assertThat(values).isEqualTo(
|
||||
"javax.management.openmbean.CompositeDataSupport(compositeType=javax.management.openmbean.CompositeType(name=java.util.Map<java.lang.String, java.lang.Long>,items=((itemName=key,itemType=javax.management.openmbean.SimpleType(name=java.lang.String)),(itemName=value,itemType=javax.management.openmbean.SimpleType(name=java.lang.Long)))),contents={key=public.simple, value="
|
||||
+ expectedInsert + "})");
|
||||
}
|
||||
|
||||
private void invokeOperation(ObjectName objectName, String operation)
|
||||
throws MalformedObjectNameException, ReflectionException, InstanceNotFoundException, MBeanException {
|
||||
|
||||
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
|
||||
|
||||
server.invoke(objectName, operation, new Object[]{}, new String[]{});
|
||||
}
|
||||
|
||||
private void assertStreamingWithCustomMetrics(Map<String, String> customMetricTags) throws Exception {
|
||||
|
@ -106,6 +106,16 @@ public void reset() {
|
||||
activityMonitoringMeter.reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pause() {
|
||||
activityMonitoringMeter.pause();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resume() {
|
||||
activityMonitoringMeter.resume();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Long> getNumberOfCreateEventsSeen() {
|
||||
return activityMonitoringMeter.getNumberOfCreateEventsSeen();
|
||||
|
@ -7,6 +7,9 @@
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Exposes advanced metrics used for monitoring DB activity.
|
||||
*/
|
||||
public interface ActivityMonitoringMXBean {
|
||||
|
||||
Map<String, Long> getNumberOfCreateEventsSeen();
|
||||
@ -14,4 +17,8 @@ public interface ActivityMonitoringMXBean {
|
||||
Map<String, Long> getNumberOfDeleteEventsSeen();
|
||||
|
||||
Map<String, Long> getNumberOfUpdateEventsSeen();
|
||||
|
||||
void pause();
|
||||
|
||||
void resume();
|
||||
}
|
||||
|
@ -27,8 +27,15 @@ public class ActivityMonitoringMeter implements ActivityMonitoringMXBean {
|
||||
private final ActivityCounter updateCount = new ActivityCounter();
|
||||
private final ActivityCounter deleteCount = new ActivityCounter();
|
||||
|
||||
private boolean isPaused = false;
|
||||
|
||||
public void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value, Envelope.Operation operation) {
|
||||
|
||||
if (isPaused) {
|
||||
LOGGER.trace("ActivityMonitoringMeter is paused, no metric will be collected.");
|
||||
return;
|
||||
}
|
||||
|
||||
LOGGER.trace("Received record {} with key {}", value, key);
|
||||
String tableName = source.identifier();
|
||||
switch (operation) {
|
||||
@ -63,6 +70,16 @@ public Map<String, Long> getNumberOfUpdateEventsSeen() {
|
||||
return updateCount.getCounter();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pause() {
|
||||
isPaused = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resume() {
|
||||
isPaused = false;
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
createCount.reset();
|
||||
updateCount.reset();
|
||||
|
Loading…
Reference in New Issue
Block a user