DBZ-8035 Uniform metrics tests

This commit is contained in:
mfvitale 2024-08-06 19:57:43 +02:00 committed by Jiri Pechanec
parent f71ddd9219
commit f3d43a8755
12 changed files with 867 additions and 616 deletions

View File

@ -5,41 +5,30 @@
*/
package io.debezium.connector.binlog;
import static org.assertj.core.api.Assertions.assertThat;
import java.lang.management.ManagementFactory;
import java.nio.file.Path;
import java.sql.Connection;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.sql.SQLException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.binlog.BinlogConnectorConfig.SnapshotMode;
import io.debezium.connector.binlog.util.BinlogTestConnection;
import io.debezium.connector.binlog.util.TestHelper;
import io.debezium.connector.binlog.util.UniqueDatabase;
import io.debezium.data.VerifyRecord;
import io.debezium.pipeline.AbstractMetricsTest;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.storage.file.history.FileSchemaHistory;
/**
* @author Chris Cranford
*/
public abstract class BinlogMetricsIT<C extends SourceConnector> extends AbstractBinlogConnectorIT<C> {
public abstract class BinlogMetricsIT<C extends SourceConnector> extends AbstractMetricsTest<C> {
private static final Path SCHEMA_HISTORY_PATH = Files.createTestingPath("file-schema-history-metrics.txt").toAbsolutePath();
private static final String SERVER_NAME = "myserver";
@ -48,6 +37,58 @@ public abstract class BinlogMetricsIT<C extends SourceConnector> extends Abstrac
private static final String INSERT1 = "INSERT INTO simple (val) VALUES (25);";
private static final String INSERT2 = "INSERT INTO simple (val) VALUES (50);";
protected abstract String getConnectorName();
protected abstract BinlogTestConnection getTestDatabaseConnection(String databaseName);
@Override
protected String connector() {
return getConnectorName();
}
@Override
protected String server() {
return SERVER_NAME;
}
@Override
protected Configuration.Builder config() {
return DATABASE.defaultConfig()
.with(BinlogConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(BinlogConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class)
.with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
.with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("simple"))
.with(BinlogConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE)
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE);
}
protected Configuration.Builder noSnapshot(Configuration.Builder config) {
return config.with(BinlogConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA);
}
@Override
protected void executeInsertStatements() throws SQLException {
try (Connection connection = getTestDatabaseConnection(DATABASE.getDatabaseName()).connection()) {
connection.createStatement().execute(INSERT1);
connection.createStatement().execute(INSERT2);
}
}
@Override
protected String tableName() {
return DATABASE.qualifiedTableName("simple");
}
@Override
protected long expectedEvents() {
return 2L;
}
@Override
protected boolean snapshotCompleted() {
return true;
}
@Before
public void before() throws Exception {
// Testing.Print.enable();
@ -67,289 +108,12 @@ public void after() throws Exception {
}
}
@Test
public void testLifecycle() throws Exception {
// start connector
start(getConnectorClass(),
DATABASE.defaultConfig()
.with(BinlogConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(BinlogConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class)
.with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
.with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("simple"))
.with(BinlogConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE)
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE)
.build());
assertConnectorIsRunning();
// These methods use the JMX metrics, this simply checks they're available as expected
waitForSnapshotToBeCompleted();
waitForStreamingToStart();
// Stop the connector
stopConnector();
// Verify snapshot metrics no longer exist
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
try {
mBeanServer.getMBeanInfo(getSnapshotMetricsObjectName());
Assert.fail("Expected Snapshot Metrics no longer to exist");
}
catch (InstanceNotFoundException e) {
// expected
}
// Verify streaming metrics no longer exist
try {
mBeanServer.getMBeanInfo(getStreamingMetricsObjectName());
Assert.fail("Expected Streaming Metrics no longer to exist");
}
catch (InstanceNotFoundException e) {
// expected
}
}
@Test
public void testSnapshotOnlyMetrics() throws Exception {
// Setup
try (Connection connection = getTestDatabaseConnection(DATABASE.getDatabaseName()).connection()) {
connection.createStatement().execute(INSERT1);
connection.createStatement().execute(INSERT2);
}
// start connector
start(getConnectorClass(),
DATABASE.defaultConfig()
.with(BinlogConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY)
.with(BinlogConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class)
.with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
.with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("simple"))
.with(BinlogConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE)
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE)
.build());
assertSnapshotMetrics();
assertStreamingMetricsExist();
}
@Test
public void testPauseResumeSnapshotMetrics() throws Exception {
final int NUM_RECORDS = 1_000;
final String TABLE_NAME = DATABASE.qualifiedTableName("simple");
final String SIGNAL_TABLE_NAME = DATABASE.qualifiedTableName("debezium_signal");
try (Connection connection = getTestDatabaseConnection(DATABASE.getDatabaseName()).connection()) {
for (int i = 1; i < NUM_RECORDS; i++) {
connection.createStatement().execute(String.format("INSERT INTO %s (val) VALUES (%d);", TABLE_NAME, i));
}
}
// Start connector.
start(getConnectorClass(),
DATABASE.defaultConfig()
.with(BinlogConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(BinlogConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class)
.with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
.with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, String.format("%s", TABLE_NAME))
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE)
.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1)
.with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5)
.with(BinlogConnectorConfig.SIGNAL_DATA_COLLECTION, SIGNAL_TABLE_NAME)
.build());
assertConnectorIsRunning();
waitForSnapshotToBeCompleted();
waitForStreamingToStart();
// Consume initial snapshot records.
List<SourceRecord> records = new ArrayList<>();
consumeRecords(NUM_RECORDS, record -> {
records.add(record);
});
try (Connection connection = getTestDatabaseConnection(DATABASE.getDatabaseName()).connection()) {
// Start incremental snapshot.
connection.createStatement().execute(String.format(
"INSERT INTO debezium_signal VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [\"%s\"]}')", TABLE_NAME));
// Pause incremental snapshot.
connection.createStatement().execute(String.format("INSERT INTO %s VALUES('test-pause', 'pause-snapshot', '')", SIGNAL_TABLE_NAME));
// Sleep more than 1 second, we get the pause in seconds.
Thread.sleep(1500);
// Resume incremental snapshot.
connection.createStatement().execute(String.format("INSERT INTO debezium_signal VALUES('test-resume', 'resume-snapshot', '')", SIGNAL_TABLE_NAME));
}
// Consume incremental snapshot records.
consumeRecords(NUM_RECORDS, record -> {
records.add(record);
});
Assert.assertTrue(records.size() >= 2 * NUM_RECORDS);
assertSnapshotPauseNotZero();
}
@Test
public void testSnapshotAndStreamingMetrics() throws Exception {
// Setup
try (Connection connection = getTestDatabaseConnection(DATABASE.getDatabaseName()).connection()) {
connection.createStatement().execute(INSERT1);
connection.createStatement().execute(INSERT2);
}
// start connector
start(getConnectorClass(),
DATABASE.defaultConfig()
.with(BinlogConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(BinlogConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class)
.with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
.with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("simple"))
.with(BinlogConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE)
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE)
.build());
assertSnapshotMetrics();
assertStreamingMetrics(0);
}
@Test
public void testStreamingOnlyMetrics() throws Exception {
// start connector
start(getConnectorClass(),
DATABASE.defaultConfig()
.with(BinlogConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
.with(BinlogConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class)
.with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
.with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("simple"))
.with(BinlogConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE)
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE)
.build());
// CREATE DATABASE, CREATE TABLE, and 2 INSERT
assertStreamingMetrics(4);
assertSnapshotMetricsExist();
}
private void assertNoSnapshotMetricsExist() throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
try {
mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted");
Assert.fail("Expected Snapshot Metrics to not exist");
}
catch (InstanceNotFoundException e) {
// expected
}
}
private void assertNoStreamingMetricsExist() throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
try {
mBeanServer.getAttribute(getStreamingMetricsObjectName(), "TotalNumberOfEventsSeen");
Assert.fail("Expected Streaming Metrics to not exist");
}
catch (InstanceNotFoundException e) {
// expected
}
}
private void assertStreamingMetricsExist() throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
try {
mBeanServer.getAttribute(getStreamingMetricsObjectName(), "TotalNumberOfEventsSeen");
}
catch (InstanceNotFoundException e) {
Assert.fail("Streaming Metrics should exist");
}
}
private void assertSnapshotMetricsExist() throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
try {
mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted");
}
catch (InstanceNotFoundException e) {
Assert.fail("Snapshot Metrics should exist");
}
}
private void assertSnapshotPauseNotZero() throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
try {
final long snapshotPauseDuration = (Long) mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPausedDurationInSeconds");
Assert.assertTrue(snapshotPauseDuration > 0);
}
catch (InstanceNotFoundException e) {
Assert.fail("Snapshot Metrics should exist");
}
}
private void assertSnapshotMetrics() throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
// Wait for the snapshot to complete to verify metrics
waitForSnapshotToBeCompleted();
// 4 meta, 1 USE, 1 CREATE, 2 INSERT
consumeRecords(8);
// Check snapshot metrics
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalTableCount")).isEqualTo(1);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "CapturedTables"))
.isEqualTo(new String[]{ DATABASE.qualifiedTableName("simple") });
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(2L);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "NumberOfEventsFiltered")).isEqualTo(0L);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "NumberOfErroneousEvents")).isEqualTo(0L);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "RemainingTableCount")).isEqualTo(0);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotRunning")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotAborted")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(true);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPaused")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPausedDurationInSeconds")).isEqualTo(0L);
}
private void assertStreamingMetrics(long events) throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
// Wait for the streaming to begin
waitForStreamingToStart();
// Insert new records and wait for them to become available
try (Connection connection = getTestDatabaseConnection(DATABASE.getDatabaseName()).connection()) {
connection.createStatement().execute(INSERT1);
connection.createStatement().execute(INSERT2);
}
waitForAvailableRecords(30, TimeUnit.SECONDS);
// Testing.Print.enable();
int size = consumeAvailableRecords(VerifyRecord::print);
// Check streaming metrics
assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "Connected")).isEqualTo(true);
// note: other connectors would report the physical number of operations, e.g. inserts/updates.
// the MySQL BinaryLogClientStatistics bean which this value is based upon tracks number of events
// read from the log, which may be more than the insert/update/delete operations.
assertThat((Long) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "TotalNumberOfEventsSeen"))
.isGreaterThanOrEqualTo(events);
Awaitility.await().atMost(Duration.ofMinutes(1)).until(() -> ((String[]) mBeanServer
.getAttribute(getStreamingMetricsObjectName(), "CapturedTables")).length > 0);
assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CapturedTables"))
.isEqualTo(new String[]{ DATABASE.qualifiedTableName("simple") });
}
private ObjectName getSnapshotMetricsObjectName() throws MalformedObjectNameException {
protected ObjectName getSnapshotMetricsObjectName() throws MalformedObjectNameException {
return getSnapshotMetricsObjectName(getConnectorName(), SERVER_NAME);
}
private ObjectName getStreamingMetricsObjectName() throws MalformedObjectNameException {
public ObjectName getStreamingMetricsObjectName() throws MalformedObjectNameException {
return getStreamingMetricsObjectName(getConnectorName(), SERVER_NAME, getStreamingNamespace());
}
private void waitForSnapshotToBeCompleted() throws InterruptedException {
waitForSnapshotToBeCompleted(getConnectorName(), SERVER_NAME);
}
private void waitForStreamingToStart() throws InterruptedException {
waitForStreamingRunning(getConnectorName(), SERVER_NAME, getStreamingNamespace());
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mariadb;
import io.debezium.connector.binlog.BinlogMetricsIT;
import io.debezium.connector.binlog.util.BinlogTestConnection;
/**
* @author Chris Cranford
*/
public class MariaDbMetricsIT extends BinlogMetricsIT<MariaDbConnector> implements MariaDbCommon {
@Override
public Class<MariaDbConnector> getConnectorClass() {
return MariaDbCommon.super.getConnectorClass();
}
@Override
public BinlogTestConnection getTestDatabaseConnection(String databaseName) {
return MariaDbCommon.super.getTestDatabaseConnection(databaseName);
}
@Override
public String getConnectorName() {
return MariaDbCommon.super.getConnectorName();
}
}

View File

@ -1,15 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mariadb;
import io.debezium.connector.binlog.BinlogMetricsIT;
/**
* @author Chris Cranford
*/
public class MetricsIT extends BinlogMetricsIT<MariaDbConnector> implements MariaDbCommon {
}

View File

@ -17,6 +17,7 @@
* @author Chris Cranford
*/
public interface MySqlCommon extends BinlogConnectorTest<MySqlConnector> {
@Override
default String getConnectorName() {
return Module.name();

View File

@ -6,10 +6,25 @@
package io.debezium.connector.mysql;
import io.debezium.connector.binlog.BinlogMetricsIT;
import io.debezium.connector.binlog.util.BinlogTestConnection;
/**
* @author Chris Cranford
*/
public class MySqlMetricsIT extends BinlogMetricsIT<MySqlConnector> implements MySqlCommon {
@Override
public String getConnectorName() {
return MySqlCommon.super.getConnectorName();
}
@Override
public Class<MySqlConnector> getConnectorClass() {
return MySqlCommon.super.getConnectorClass();
}
@Override
public BinlogTestConnection getTestDatabaseConnection(String databaseName) {
return MySqlCommon.super.getTestDatabaseConnection(databaseName);
}
}

View File

@ -0,0 +1,108 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.pipeline.AbstractMetricsTest;
import io.debezium.util.Testing;
public class OracleMetricsIT extends AbstractMetricsTest<OracleConnector> {
private static OracleConnection connection;
@Override
protected Class<OracleConnector> getConnectorClass() {
return OracleConnector.class;
}
@Override
protected String connector() {
return TestHelper.CONNECTOR_NAME;
}
@Override
protected String server() {
return TestHelper.SERVER_NAME;
}
@Override
protected Configuration.Builder config() {
return TestHelper.defaultConfig()
.with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL);
}
@Override
protected Configuration.Builder noSnapshot(Configuration.Builder config) {
return config.with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA);
}
@Override
protected void executeInsertStatements() throws SQLException {
connection.execute("INSERT INTO debezium.customer VALUES (NULL, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
connection.execute("INSERT INTO debezium.customer VALUES (NULL, 'Bruce', 2345.67, null)");
connection.execute("COMMIT");
}
@Override
protected String tableName() {
return "ORCLPDB1.DEBEZIUM.CUSTOMER";
}
@Override
protected long expectedEvents() {
return 2L;
}
@Override
protected boolean snapshotCompleted() {
return true;
}
@BeforeClass
public static void beforeClass() throws SQLException {
connection = TestHelper.testConnection();
TestHelper.dropAllTables();
TestHelper.dropTable(connection, "debezium.customer");
String ddl = "create table debezium.customer (" +
" id numeric(9,0) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 101) not null, " +
" name varchar2(1000), " +
" score decimal(6, 2), " +
" registered timestamp, " +
" primary key (id)" +
")";
connection.execute(ddl);
connection.execute("GRANT SELECT ON debezium.customer to " + TestHelper.getConnectorUserName());
connection.execute("ALTER TABLE debezium.customer ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
}
@AfterClass
public static void closeConnection() throws SQLException {
if (connection != null) {
TestHelper.dropTable(connection, "debezium.customer");
connection.close();
}
}
@Before
public void before() throws SQLException {
connection.execute("delete from debezium.customer");
setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
}
}

View File

@ -8,22 +8,14 @@
import static org.assertj.core.api.Assertions.assertThat;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import javax.management.openmbean.TabularDataSupport;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@ -31,15 +23,15 @@
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.doc.FixFor;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenJavaVersion;
import io.debezium.util.Testing;
import io.debezium.pipeline.AbstractMetricsTest;
/**
* @author Chris Cranford
* @author Mario Fiore Vitale
*/
public class PostgresMetricsIT extends AbstractRecordsProducerTest {
public class PostgresMetricsIT extends AbstractMetricsTest<PostgresConnector> {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresMetricsIT.class);
@ -48,10 +40,58 @@ public class PostgresMetricsIT extends AbstractRecordsProducerTest {
private static final String INSERT_STATEMENTS = "INSERT INTO simple (val) VALUES (25); "
+ "INSERT INTO simple (val) VALUES (50);";
@Override
protected Class<PostgresConnector> getConnectorClass() {
return PostgresConnector.class;
}
@Override
protected String connector() {
return "postgres";
}
@Override
protected String server() {
return "test_server";
}
@Override
protected Configuration.Builder config() {
return TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE);
}
protected Configuration.Builder noSnapshot(Configuration.Builder config) {
return config.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA);
}
@Override
protected void executeInsertStatements() {
TestHelper.execute(INSERT_STATEMENTS);
}
@Override
protected String tableName() {
return "public.simple";
}
@Override
protected long expectedEvents() {
return 2L;
}
@Override
protected boolean snapshotCompleted() {
return false;
}
@Before
public void before() throws Exception {
TestHelper.dropDefaultReplicationSlot();
TestHelper.dropAllSchemas();
TestHelper.execute(INIT_STATEMENTS);
}
@After
@ -59,293 +99,12 @@ public void after() throws Exception {
stopConnector();
}
@Test
public void testLifecycle() throws Exception {
// start connector
start(PostgresConnector.class,
TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.build());
assertConnectorIsRunning();
// These methods use the JMX metrics, this simply checks they're available as expected
waitForSnapshotToBeCompleted();
waitForStreamingToStart();
// Stop the connector
stopConnector();
// Verify snapshot metrics no longer exist
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
try {
mBeanServer.getMBeanInfo(getSnapshotMetricsObjectName());
Assert.fail("Expected Snapshot Metrics no longer to exist");
}
catch (InstanceNotFoundException e) {
// expected
}
// Verify streaming metrics no longer exist
try {
mBeanServer.getMBeanInfo(getStreamingMetricsObjectName());
Assert.fail("Expected Streaming Metrics no longer to exist");
}
catch (InstanceNotFoundException e) {
// expected
}
}
@Test
public void testSnapshotOnlyMetrics() throws Exception {
// Setup
TestHelper.execute(INIT_STATEMENTS, INSERT_STATEMENTS);
// start connector
start(PostgresConnector.class,
TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.build());
assertSnapshotMetrics();
}
@Test
public void testSnapshotAndStreamingMetrics() throws Exception {
// Setup
TestHelper.execute(INIT_STATEMENTS, INSERT_STATEMENTS);
// start connector
start(PostgresConnector.class,
TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.build());
assertSnapshotMetrics();
assertStreamingMetrics(false);
}
@Test
@FixFor("DBZ-6603")
public void testSnapshotAndStreamingWithCustomMetrics() throws Exception {
// Setup
TestHelper.execute(INIT_STATEMENTS, INSERT_STATEMENTS);
// start connector
Configuration config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.CUSTOM_METRIC_TAGS, "env=test,bu=bigdata")
.build();
Map<String, String> customMetricTags = new PostgresConnectorConfig(config).getCustomMetricTags();
start(PostgresConnector.class, config);
assertSnapshotWithCustomMetrics(customMetricTags);
assertStreamingWithCustomMetrics(customMetricTags);
}
@Test
public void testStreamingOnlyMetrics() throws Exception {
// Setup
TestHelper.execute(INIT_STATEMENTS);
// start connector
start(PostgresConnector.class,
TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.build());
assertSnapshotNotExecutedMetrics();
assertStreamingMetrics(false);
}
@Test
public void testAdvancedStreamingMetrics() throws Exception {
// Setup
TestHelper.execute(INIT_STATEMENTS);
// start connector
start(PostgresConnector.class,
TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.ADVANCED_METRICS_ENABLE, Boolean.TRUE)
.build());
assertSnapshotNotExecutedMetrics();
assertStreamingMetrics(true);
}
@Test
public void testPauseAndResumeAdvancedStreamingMetrics() throws Exception {
// Setup
TestHelper.execute(INIT_STATEMENTS);
// start connector
start(PostgresConnector.class,
TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.ADVANCED_METRICS_ENABLE, Boolean.TRUE)
.build());
assertSnapshotNotExecutedMetrics();
assertStreamingMetrics(true);
invokeOperation(getStreamingMetricsObjectName(), "pause");
insertRecords();
assertAdvancedMetrics(2);
invokeOperation(getStreamingMetricsObjectName(), "resume");
insertRecords();
assertAdvancedMetrics(4);
}
private void insertRecords() throws InterruptedException {
// Wait for the streaming to begin
TestConsumer consumer = testConsumer(2, "public");
TestHelper.execute(INSERT_STATEMENTS);
consumer.await(TestHelper.waitTimeForRecords() * 30L, TimeUnit.SECONDS);
Thread.sleep(Duration.ofSeconds(2).toMillis());
}
private void assertSnapshotMetrics() throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
// Wait for the snapshot to complete to verify metrics
waitForSnapshotToBeCompleted();
// Check snapshot metrics
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalTableCount")).isEqualTo(1);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "CapturedTables")).isEqualTo(new String[]{ "public.simple" });
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(2L);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "RemainingTableCount")).isEqualTo(0);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotRunning")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotAborted")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(true);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPaused")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPausedDurationInSeconds")).isEqualTo(0L);
}
private void assertSnapshotWithCustomMetrics(Map<String, String> customMetricTags) throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
final ObjectName objectName = getSnapshotMetricsObjectName("postgres", TestHelper.TEST_SERVER, customMetricTags);
// Wait for the snapshot to complete to verify metrics
waitForSnapshotWithCustomMetricsToBeCompleted(customMetricTags);
// Check snapshot metrics
assertThat(mBeanServer.getAttribute(objectName, "TotalTableCount")).isEqualTo(1);
assertThat(mBeanServer.getAttribute(objectName, "CapturedTables")).isEqualTo(new String[]{ "public.simple" });
assertThat(mBeanServer.getAttribute(objectName, "TotalNumberOfEventsSeen")).isEqualTo(2L);
assertThat(mBeanServer.getAttribute(objectName, "RemainingTableCount")).isEqualTo(0);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotRunning")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotAborted")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotCompleted")).isEqualTo(true);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotPaused")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotPausedDurationInSeconds")).isEqualTo(0L);
}
private void assertSnapshotNotExecutedMetrics() throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
Awaitility.await("Waiting for snapshot metrics to appear").atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
try {
mBeanServer.getObjectInstance(getSnapshotMetricsObjectName());
return true;
}
catch (InstanceNotFoundException e) {
return false;
}
});
// Check snapshot metrics
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalTableCount")).isEqualTo(0);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "CapturedTables")).isEqualTo(new String[]{});
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(0L);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "RemainingTableCount")).isEqualTo(0);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotRunning")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotAborted")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(false);
}
private void assertStreamingMetrics(boolean checkAdvancedMetrics) throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
// Wait for the streaming to begin
TestConsumer consumer = testConsumer(2, "public");
waitForStreamingToStart();
// Insert new records and wait for them to become available
TestHelper.execute(INSERT_STATEMENTS);
consumer.await(TestHelper.waitTimeForRecords() * 30L, TimeUnit.SECONDS);
Thread.sleep(Duration.ofSeconds(2).toMillis());
// Check streaming metrics
Testing.print("****ASSERTIONS****");
assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "Connected")).isEqualTo(true);
assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(2L);
// todo: this does not seem to be populated?
// Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CapturedTables")).isEqualTo(new String[] {"public.simple"});
if (checkAdvancedMetrics) {
assertAdvancedMetrics(2);
}
}
public void assertAdvancedMetrics(int expectedInsert) throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
TabularDataSupport numberOfCreateEventsSeen = (TabularDataSupport) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "NumberOfCreateEventsSeen");
String values = numberOfCreateEventsSeen.values().stream()
.limit(1)
.toList()
.get(0)
.toString();
assertThat(values).isEqualTo(
"javax.management.openmbean.CompositeDataSupport(compositeType=javax.management.openmbean.CompositeType(name=java.util.Map<java.lang.String, java.lang.Long>,items=((itemName=key,itemType=javax.management.openmbean.SimpleType(name=java.lang.String)),(itemName=value,itemType=javax.management.openmbean.SimpleType(name=java.lang.Long)))),contents={key=public.simple, value="
+ expectedInsert + "})");
}
private void invokeOperation(ObjectName objectName, String operation)
throws MalformedObjectNameException, ReflectionException, InstanceNotFoundException, MBeanException {
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
server.invoke(objectName, operation, new Object[]{}, new String[]{});
}
private void assertStreamingWithCustomMetrics(Map<String, String> customMetricTags) throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
final ObjectName objectName = getStreamingMetricsObjectName("postgres", TestHelper.TEST_SERVER, customMetricTags);
// Wait for the streaming to begin
TestConsumer consumer = testConsumer(2, "public");
waitForStreamingWithCustomMetricsToStart(customMetricTags);
// Insert new records and wait for them to become available
TestHelper.execute(INSERT_STATEMENTS);
consumer.await(TestHelper.waitTimeForRecords() * 30L, TimeUnit.SECONDS);
Thread.sleep(Duration.ofSeconds(2).toMillis());
// Check streaming metrics
Testing.print("****ASSERTIONS****");
assertThat(mBeanServer.getAttribute(objectName, "Connected")).isEqualTo(true);
assertThat(mBeanServer.getAttribute(objectName, "TotalNumberOfEventsSeen")).isEqualTo(2L);
}
@Test
@SkipWhenJavaVersion(check = EqualityCheck.GREATER_THAN_OR_EQUAL, value = 16, description = "Deep reflection not allowed by default on this Java version")
public void oneRecordInQueue() throws Exception {
// Testing.Print.enable();
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
TestHelper.execute(INIT_STATEMENTS, INSERT_STATEMENTS);
executeInsertStatements();
final CountDownLatch step1 = new CountDownLatch(1);
final CountDownLatch step2 = new CountDownLatch(1);
@ -368,8 +127,8 @@ public void oneRecordInQueue() throws Exception {
LOGGER.info("Record processing completed");
}, true);
waitForStreamingToStart();
TestHelper.execute(INSERT_STATEMENTS);
waitForStreamingRunning(connector(), server());
executeInsertStatements();
LOGGER.info("Waiting for the first record to arrive");
step1.await(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS);
LOGGER.info("First record arrived");
@ -417,11 +176,4 @@ public void oneRecordInQueue() throws Exception {
stopConnector();
}
private ObjectName getSnapshotMetricsObjectName() throws MalformedObjectNameException {
return getSnapshotMetricsObjectName("postgres", TestHelper.TEST_SERVER);
}
private ObjectName getStreamingMetricsObjectName() throws MalformedObjectNameException {
return getStreamingMetricsObjectName("postgres", TestHelper.TEST_SERVER);
}
}

View File

@ -17,6 +17,7 @@
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.transforms.ActivityMonitoringMeter;
/**
* Base implementation of partition-scoped multi-partition SQL Server connector metrics.
@ -24,11 +25,15 @@
abstract public class AbstractSqlServerPartitionMetrics extends Metrics implements SqlServerPartitionMetricsMXBean {
private final CommonEventMeter commonEventMeter;
private final ActivityMonitoringMeter activityMonitoringMeter;
protected final CdcSourceTaskContext taskContext;
AbstractSqlServerPartitionMetrics(CdcSourceTaskContext taskContext, Map<String, String> tags,
EventMetadataProvider metadataProvider) {
super(taskContext, tags);
this.taskContext = taskContext;
this.commonEventMeter = new CommonEventMeter(taskContext.getClock(), metadataProvider);
this.activityMonitoringMeter = new ActivityMonitoringMeter();
}
@Override
@ -71,11 +76,39 @@ public long getNumberOfErroneousEvents() {
return commonEventMeter.getNumberOfErroneousEvents();
}
@Override
public Map<String, Long> getNumberOfCreateEventsSeen() {
return activityMonitoringMeter.getNumberOfCreateEventsSeen();
}
@Override
public Map<String, Long> getNumberOfDeleteEventsSeen() {
return activityMonitoringMeter.getNumberOfDeleteEventsSeen();
}
@Override
public Map<String, Long> getNumberOfUpdateEventsSeen() {
return activityMonitoringMeter.getNumberOfUpdateEventsSeen();
}
@Override
public void pause() {
activityMonitoringMeter.pause();
}
@Override
public void resume() {
activityMonitoringMeter.resume();
}
/**
* Invoked if an event is processed for a captured table.
*/
void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value, Operation operation) {
commonEventMeter.onEvent(source, offset, key, value, operation);
if (taskContext.getConfig().isAdvancedMetricsEnabled()) {
activityMonitoringMeter.onEvent(source, offset, key, value, operation);
}
}
/**
@ -114,6 +147,8 @@ void onConnectorEvent(ConnectorEvent event) {
@Override
public void reset() {
commonEventMeter.reset();
activityMonitoringMeter.reset();
}
}

View File

@ -7,11 +7,12 @@
import io.debezium.pipeline.metrics.traits.CommonEventMetricsMXBean;
import io.debezium.pipeline.metrics.traits.SchemaMetricsMXBean;
import io.debezium.transforms.ActivityMonitoringMXBean;
/**
* Metrics scoped to a source partition that are common for both snapshot and streaming change event sources
*/
public interface SqlServerPartitionMetricsMXBean extends CommonEventMetricsMXBean, SchemaMetricsMXBean {
public interface SqlServerPartitionMetricsMXBean extends CommonEventMetricsMXBean, SchemaMetricsMXBean, ActivityMonitoringMXBean {
void reset();
}

View File

@ -0,0 +1,163 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import static io.debezium.connector.sqlserver.util.TestHelper.SCHEMA_HISTORY_PATH;
import static io.debezium.connector.sqlserver.util.TestHelper.TEST_DATABASE_1;
import static io.debezium.connector.sqlserver.util.TestHelper.TEST_SERVER_NAME;
import java.sql.SQLException;
import java.util.Map;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.pipeline.AbstractMetricsTest;
import io.debezium.util.Testing;
public class SqlServerMetricsIT extends AbstractMetricsTest<SqlServerConnector> {
@Override
protected Class<SqlServerConnector> getConnectorClass() {
return SqlServerConnector.class;
}
@Override
protected String connector() {
return "sql_server";
}
@Override
protected String server() {
return TEST_SERVER_NAME;
}
@Override
protected Configuration.Builder config() {
return TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL);
}
@Override
protected Configuration.Builder noSnapshot(Configuration.Builder config) {
return TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA);
}
@Override
protected void executeInsertStatements() throws Exception {
connection.execute("INSERT INTO tablea VALUES('a')", "INSERT INTO tablea VALUES('b')");
TestHelper.enableTableCdc(connection, "tablea");
TestHelper.waitForEnabledCdc(connection, "tablea");
}
@Override
protected String tableName() {
return "testDB1.dbo.tablea";
}
@Override
protected long expectedEvents() {
return 2L;
}
@Override
protected boolean snapshotCompleted() {
return true;
}
@Override
protected String task() {
return "0";
}
@Override
protected String database() {
return "testDB1";
}
private SqlServerConnection connection;
@Before
public void before() throws SQLException {
TestHelper.createTestDatabase();
connection = TestHelper.testConnection();
connection.execute(
"CREATE TABLE tablea (id int IDENTITY(1,1) primary key, cola varchar(30))");
TestHelper.enableTableCdc(connection, "tablea");
TestHelper.adjustCdcPollingInterval(connection, 1);
initializeConnectorTestFramework();
Testing.Files.delete(SCHEMA_HISTORY_PATH);
}
@After
public void after() throws SQLException {
if (connection != null) {
connection.close();
}
}
@Override
protected ObjectName getSnapshotMetricsObjectName() throws MalformedObjectNameException {
return getSnapshotMetricsObjectName(connector(), server(), task(), TEST_DATABASE_1);
}
@Override
protected ObjectName getStreamingMetricsObjectName() throws MalformedObjectNameException {
return getStreamingMetricsObjectName(connector(), server(), getStreamingNamespace(), task());
}
@Override
protected ObjectName getMultiplePartitionStreamingMetricsObjectName() throws MalformedObjectNameException {
return getStreamingMetricsObjectName(connector(), server(), getStreamingNamespace(), task(), TEST_DATABASE_1);
}
@Override
protected ObjectName getMultiplePartitionStreamingMetricsObjectNameCustomTags(Map<String, String> customTags) throws MalformedObjectNameException {
return getStreamingMetricsObjectName(connector(), server(), task(), TEST_DATABASE_1, customTags);
}
@Test
@Override
public void testSnapshotAndStreamingMetrics() throws Exception {
// Setup
executeInsertStatements();
// start connector
start();
assertConnectorIsRunning();
assertSnapshotMetrics();
// For SQL Server we have two more since when the streaming will start from an empty offset
// it will take the last commited transaction in the log and so also the initial inserts will be streamed.
assertStreamingMetrics(false, expectedEvents() + 2);
}
@Test
@Override
public void testSnapshotAndStreamingWithCustomMetrics() throws Exception {
// Setup
executeInsertStatements();
// start connector
Map<String, String> customMetricTags = Map.of("env", "test", "bu", "bigdata");
start(x -> x.with(CommonConnectorConfig.CUSTOM_METRIC_TAGS, "env=test,bu=bigdata"));
assertSnapshotWithCustomMetrics(customMetricTags);
// For SQL Server we have two more since when the streaming will start from an empty offset
// it will take the last commited transaction in the log and so also the initial inserts will be streamed.
assertStreamingWithCustomMetrics(customMetricTags, expectedEvents() + 2);
}
}

View File

@ -33,6 +33,7 @@
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
@ -1325,6 +1326,19 @@ public static void waitForSnapshotWithCustomMetricsToBeCompleted(String connecto
.getAttribute(getSnapshotMetricsObjectName(connector, server, props), "SnapshotCompleted"));
}
public static void waitForSnapshotWithCustomMetricsToBeCompleted(String connector, String server, String task, String database, Map<String, String> props)
throws InterruptedException {
final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
Awaitility.await()
.alias("Streaming was not started on time")
.pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(waitTimeForRecords() * 30L, TimeUnit.SECONDS)
.ignoreException(InstanceNotFoundException.class)
.until(() -> (boolean) mbeanServer
.getAttribute(getSnapshotMetricsObjectName(connector, server, task, database, props), "SnapshotCompleted"));
}
private static void waitForSnapshotEvent(String connector, String server, String event, String task, String database) throws InterruptedException {
final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
@ -1360,7 +1374,16 @@ public static void waitForStreamingWithCustomMetricsToStart(String connector, St
.pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(waitTimeForRecords() * 30L, TimeUnit.SECONDS)
.ignoreException(InstanceNotFoundException.class)
.until(() -> isStreamingRunning(connector, server, props));
.until(() -> isStreamingRunning(connector, server, null, null, props));
}
public static void waitForStreamingWithCustomMetricsToStart(String connector, String server, String task, String database, Map<String, String> props) {
Awaitility.await()
.alias("Streaming was not started on time")
.pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(waitTimeForRecords() * 30L, TimeUnit.SECONDS)
.ignoreException(InstanceNotFoundException.class)
.until(() -> isStreamingRunning(connector, server, task, database, props));
}
public static void waitForConnectorShutdown(String connector, String server) {
@ -1391,11 +1414,11 @@ public static boolean isStreamingRunning(String connector, String server, String
return false;
}
public static boolean isStreamingRunning(String connector, String server, Map<String, String> props) {
public static boolean isStreamingRunning(String connector, String server, String task, String database, Map<String, String> props) {
final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
try {
ObjectName streamingMetricsObjectName = getStreamingMetricsObjectName(connector, server, props);
ObjectName streamingMetricsObjectName = getStreamingMetricsObjectName(connector, server, task, null, props);
return (boolean) mbeanServer.getAttribute(streamingMetricsObjectName, "Connected");
}
catch (JMException ignored) {
@ -1429,10 +1452,50 @@ public static ObjectName getSnapshotMetricsObjectName(String connector, String s
return getSnapshotMetricsObjectName(connector, server);
}
public static ObjectName getSnapshotMetricsObjectName(String connector, String server, String task, String database, Map<String, String> props)
throws MalformedObjectNameException {
Map<String, String> taskAndDatabase = new HashMap<>();
taskAndDatabase.put("task", task);
taskAndDatabase.put("database", database);
String additionalProperties = Stream.of(props.entrySet(), taskAndDatabase.entrySet()).flatMap(Set::stream)
.filter(e -> e.getValue() != null)
.map(e -> String.format("%s=%s", e.getKey(), e.getValue()))
.collect(Collectors.joining(","));
if (additionalProperties.length() != 0) {
return new ObjectName("debezium." + connector + ":type=connector-metrics,context=snapshot,server=" + server + "," + additionalProperties);
}
return getSnapshotMetricsObjectName(connector, server);
}
public static ObjectName getStreamingMetricsObjectName(String connector, String server) throws MalformedObjectNameException {
return getStreamingMetricsObjectName(connector, server, getStreamingNamespace());
}
public static ObjectName getStreamingMetricsObjectName(String connector, String server, String context, String task, String database)
throws MalformedObjectNameException {
Map<String, String> props = new HashMap<>();
props.put("task", task);
props.put("database", database);
return getStreamingMetricsObjectName(connector, server, props);
}
public static ObjectName getStreamingMetricsObjectName(String connector, String server, String task, String database, Map<String, String> customTags)
throws MalformedObjectNameException {
Map<String, String> props = new HashMap<>();
props.put("task", task);
props.put("database", database);
props.putAll(customTags);
return getStreamingMetricsObjectName(connector, server, props);
}
public static ObjectName getStreamingMetricsObjectName(String connector, String server, String context) throws MalformedObjectNameException {
return new ObjectName("debezium." + connector + ":type=connector-metrics,context=" + context + ",server=" + server);
}

View File

@ -0,0 +1,333 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline;
import static org.assertj.core.api.Assertions.assertThat;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import javax.management.openmbean.TabularDataSupport;
import org.apache.kafka.connect.source.SourceConnector;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
public abstract class AbstractMetricsTest<T extends SourceConnector> extends AbstractConnectorTest {
protected abstract Class<T> getConnectorClass();
protected abstract String connector();
protected abstract String server();
protected abstract Configuration.Builder config();
protected abstract Configuration.Builder noSnapshot(Configuration.Builder config);
protected abstract void executeInsertStatements() throws Exception;
protected abstract String tableName();
protected abstract long expectedEvents();
protected abstract boolean snapshotCompleted();
protected String task() {
return null;
}
protected String database() {
return null;
}
protected void start() {
final Configuration config = config().build();
start(getConnectorClass(), config, loggingCompletion(), null);
}
protected void start(Function<Configuration.Builder, Configuration.Builder> custConfig) {
final Configuration config = custConfig.apply(config()).build();
start(getConnectorClass(), config, loggingCompletion(), null);
}
@Test
public void testLifecycle() throws Exception {
start();
assertConnectorIsRunning();
// These methods use the JMX metrics, this simply checks they're available as expected
waitForSnapshotToBeCompleted(connector(), server(), task(), database());
waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
// Stop the connector
stopConnector();
// Verify snapshot metrics no longer exist
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
try {
mBeanServer.getMBeanInfo(getSnapshotMetricsObjectName());
Assert.fail("Expected Snapshot Metrics no longer to exist");
}
catch (InstanceNotFoundException e) {
// expected
}
// Verify streaming metrics no longer exist
try {
mBeanServer.getMBeanInfo(getStreamingMetricsObjectName());
Assert.fail("Expected Streaming Metrics no longer to exist");
}
catch (InstanceNotFoundException e) {
// expected
}
}
@Test
public void testSnapshotOnlyMetrics() throws Exception {
// Setup
executeInsertStatements();
// start connector
start();
assertSnapshotMetrics();
}
@Test
public void testSnapshotAndStreamingMetrics() throws Exception {
// Setup
executeInsertStatements();
// start connector
start();
assertConnectorIsRunning();
assertSnapshotMetrics();
assertStreamingMetrics(false, expectedEvents());
}
@Test
@FixFor("DBZ-6603")
public void testSnapshotAndStreamingWithCustomMetrics() throws Exception {
// Setup
executeInsertStatements();
// start connector
Map<String, String> customMetricTags = Map.of("env", "test", "bu", "bigdata");
start(x -> x.with(CommonConnectorConfig.CUSTOM_METRIC_TAGS, "env=test,bu=bigdata"));
assertSnapshotWithCustomMetrics(customMetricTags);
assertStreamingWithCustomMetrics(customMetricTags, expectedEvents());
}
@Test
public void testStreamingOnlyMetrics() throws Exception {
// start connector
start(this::noSnapshot);
waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
assertSnapshotNotExecutedMetrics();
assertStreamingMetrics(false, expectedEvents());
}
@Test
public void testAdvancedStreamingMetrics() throws Exception {
// start connector
start(x -> noSnapshot(x)
.with(CommonConnectorConfig.ADVANCED_METRICS_ENABLE, Boolean.TRUE));
waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
assertSnapshotNotExecutedMetrics();
assertStreamingMetrics(true, expectedEvents());
}
@Test
public void testPauseAndResumeAdvancedStreamingMetrics() throws Exception {
// start connector
start(x -> noSnapshot(x)
.with(CommonConnectorConfig.ADVANCED_METRICS_ENABLE, Boolean.TRUE));
waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
assertSnapshotNotExecutedMetrics();
assertStreamingMetrics(true, expectedEvents());
invokeOperation(getMultiplePartitionStreamingMetricsObjectName(), "pause");
insertRecords();
assertAdvancedMetrics(2);
invokeOperation(getMultiplePartitionStreamingMetricsObjectName(), "resume");
insertRecords();
assertAdvancedMetrics(4);
}
private void insertRecords() throws Exception {
// Wait for the streaming to begin
executeInsertStatements();
waitForAvailableRecords(30, TimeUnit.SECONDS);
Thread.sleep(Duration.ofSeconds(2).toMillis());
}
protected void assertSnapshotMetrics() throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
// Wait for the snapshot to complete to verify metrics
waitForSnapshotToBeCompleted(connector(), server(), task(), database());
// Check snapshot metrics
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalTableCount")).isEqualTo(1);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "CapturedTables")).isEqualTo(new String[]{ tableName() });
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(2L);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "RemainingTableCount")).isEqualTo(0);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotRunning")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotAborted")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(true);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPaused")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPausedDurationInSeconds")).isEqualTo(0L);
}
protected void assertSnapshotWithCustomMetrics(Map<String, String> customMetricTags) throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
final ObjectName objectName = getSnapshotMetricsObjectName(connector(), server(), task(), database(), customMetricTags);
// Wait for the snapshot to complete to verify metrics
waitForSnapshotWithCustomMetricsToBeCompleted(connector(), server(), task(), database(), customMetricTags);
// Check snapshot metrics
assertThat(mBeanServer.getAttribute(objectName, "TotalTableCount")).isEqualTo(1);
assertThat(mBeanServer.getAttribute(objectName, "CapturedTables")).isEqualTo(new String[]{ tableName() });
assertThat(mBeanServer.getAttribute(objectName, "TotalNumberOfEventsSeen")).isEqualTo(2L);
assertThat(mBeanServer.getAttribute(objectName, "RemainingTableCount")).isEqualTo(0);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotRunning")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotAborted")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotCompleted")).isEqualTo(true);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotPaused")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotPausedDurationInSeconds")).isEqualTo(0L);
}
private void assertSnapshotNotExecutedMetrics() throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
Awaitility.await("Waiting for snapshot metrics to appear").atMost(waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
try {
mBeanServer.getObjectInstance(getSnapshotMetricsObjectName());
return true;
}
catch (InstanceNotFoundException e) {
return false;
}
});
// Check snapshot metrics
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(0L);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(snapshotCompleted());
}
protected void assertStreamingMetrics(boolean checkAdvancedMetrics, long expectedEvents) throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
// Insert new records and wait for them to become available
executeInsertStatements();
// Testing.Print.enable();
consumeRecordsByTopic((int) expectedEvents);
Thread.sleep(Duration.ofSeconds(2).toMillis());
// Check streaming metrics
Testing.print("****ASSERTIONS****");
assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "Connected")).isEqualTo(true);
assertThat(mBeanServer.getAttribute(getMultiplePartitionStreamingMetricsObjectName(), "TotalNumberOfCreateEventsSeen")).isEqualTo(expectedEvents);
if (checkAdvancedMetrics) {
assertAdvancedMetrics(2L);
}
}
public void assertAdvancedMetrics(long expectedInsert) throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
TabularDataSupport numberOfCreateEventsSeen = (TabularDataSupport) mBeanServer
.getAttribute(getStreamingMetricsObjectName(connector(), server(), getStreamingNamespace(), task(), database()), "NumberOfCreateEventsSeen");
String values = numberOfCreateEventsSeen.values().stream()
.limit(1)
.toList()
.get(0)
.toString();
assertThat(values).isEqualTo(
"javax.management.openmbean.CompositeDataSupport(compositeType=javax.management.openmbean.CompositeType(name=java.util.Map<java.lang.String, java.lang.Long>,items=((itemName=key,itemType=javax.management.openmbean.SimpleType(name=java.lang.String)),(itemName=value,itemType=javax.management.openmbean.SimpleType(name=java.lang.Long)))),contents={key="
+ tableName() + ", value="
+ expectedInsert + "})");
}
private void invokeOperation(ObjectName objectName, String operation)
throws ReflectionException, InstanceNotFoundException, MBeanException {
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
server.invoke(objectName, operation, new Object[]{}, new String[]{});
}
protected void assertStreamingWithCustomMetrics(Map<String, String> customMetricTags, long expectedEvents) throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
waitForStreamingWithCustomMetricsToStart(connector(), server(), task(), database(), customMetricTags);
// Insert new records and wait for them to become available
executeInsertStatements();
waitForAvailableRecords(30, TimeUnit.SECONDS);
consumeRecords((int) expectedEvents);
Thread.sleep(Duration.ofSeconds(2).toMillis());
// Check streaming metrics
Testing.print("****ASSERTIONS****");
assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(connector(), server(), task(), null, customMetricTags), "Connected")).isEqualTo(true);
assertThat(mBeanServer.getAttribute(getMultiplePartitionStreamingMetricsObjectNameCustomTags(customMetricTags), "TotalNumberOfCreateEventsSeen"))
.isEqualTo(expectedEvents);
}
protected ObjectName getSnapshotMetricsObjectName() throws MalformedObjectNameException {
return getSnapshotMetricsObjectName(connector(), server());
}
protected ObjectName getStreamingMetricsObjectName() throws MalformedObjectNameException {
return getStreamingMetricsObjectName(connector(), server());
}
protected ObjectName getMultiplePartitionStreamingMetricsObjectName() throws MalformedObjectNameException {
// Only SQL Server manage partition scoped metrics
return getStreamingMetricsObjectName(connector(), server());
}
protected ObjectName getMultiplePartitionStreamingMetricsObjectNameCustomTags(Map<String, String> customTags) throws MalformedObjectNameException {
// Only SQL Server manage partition scoped metrics
return getStreamingMetricsObjectName(connector(), server(), customTags);
}
}