diff --git a/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogMetricsIT.java b/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogMetricsIT.java index 272e557b6..2f9cba9c1 100644 --- a/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogMetricsIT.java +++ b/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogMetricsIT.java @@ -5,41 +5,30 @@ */ package io.debezium.connector.binlog; -import static org.assertj.core.api.Assertions.assertThat; - -import java.lang.management.ManagementFactory; import java.nio.file.Path; import java.sql.Connection; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; +import java.sql.SQLException; -import javax.management.InstanceNotFoundException; -import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import org.apache.kafka.connect.source.SourceConnector; -import org.apache.kafka.connect.source.SourceRecord; -import org.awaitility.Awaitility; import org.junit.After; -import org.junit.Assert; import org.junit.Before; -import org.junit.Test; -import io.debezium.config.CommonConnectorConfig; +import io.debezium.config.Configuration; import io.debezium.connector.binlog.BinlogConnectorConfig.SnapshotMode; +import io.debezium.connector.binlog.util.BinlogTestConnection; import io.debezium.connector.binlog.util.TestHelper; import io.debezium.connector.binlog.util.UniqueDatabase; -import io.debezium.data.VerifyRecord; +import io.debezium.pipeline.AbstractMetricsTest; import io.debezium.relational.history.SchemaHistory; import io.debezium.storage.file.history.FileSchemaHistory; /** * @author Chris Cranford */ -public abstract class BinlogMetricsIT extends AbstractBinlogConnectorIT { +public abstract class BinlogMetricsIT extends AbstractMetricsTest { private static final Path SCHEMA_HISTORY_PATH = Files.createTestingPath("file-schema-history-metrics.txt").toAbsolutePath(); private static final String SERVER_NAME = "myserver"; @@ -48,6 +37,58 @@ public abstract class BinlogMetricsIT extends Abstrac private static final String INSERT1 = "INSERT INTO simple (val) VALUES (25);"; private static final String INSERT2 = "INSERT INTO simple (val) VALUES (50);"; + protected abstract String getConnectorName(); + + protected abstract BinlogTestConnection getTestDatabaseConnection(String databaseName); + + @Override + protected String connector() { + return getConnectorName(); + } + + @Override + protected String server() { + return SERVER_NAME; + } + + @Override + protected Configuration.Builder config() { + return DATABASE.defaultConfig() + .with(BinlogConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(BinlogConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class) + .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH) + .with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("simple")) + .with(BinlogConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE) + .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE); + } + + protected Configuration.Builder noSnapshot(Configuration.Builder config) { + return config.with(BinlogConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA); + } + + @Override + protected void executeInsertStatements() throws SQLException { + try (Connection connection = getTestDatabaseConnection(DATABASE.getDatabaseName()).connection()) { + connection.createStatement().execute(INSERT1); + connection.createStatement().execute(INSERT2); + } + } + + @Override + protected String tableName() { + return DATABASE.qualifiedTableName("simple"); + } + + @Override + protected long expectedEvents() { + return 2L; + } + + @Override + protected boolean snapshotCompleted() { + return true; + } + @Before public void before() throws Exception { // Testing.Print.enable(); @@ -67,289 +108,12 @@ public void after() throws Exception { } } - @Test - public void testLifecycle() throws Exception { - // start connector - start(getConnectorClass(), - DATABASE.defaultConfig() - .with(BinlogConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) - .with(BinlogConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class) - .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH) - .with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("simple")) - .with(BinlogConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE) - .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE) - .build()); - - assertConnectorIsRunning(); - - // These methods use the JMX metrics, this simply checks they're available as expected - waitForSnapshotToBeCompleted(); - waitForStreamingToStart(); - - // Stop the connector - stopConnector(); - - // Verify snapshot metrics no longer exist - final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - try { - mBeanServer.getMBeanInfo(getSnapshotMetricsObjectName()); - Assert.fail("Expected Snapshot Metrics no longer to exist"); - } - catch (InstanceNotFoundException e) { - // expected - } - - // Verify streaming metrics no longer exist - try { - mBeanServer.getMBeanInfo(getStreamingMetricsObjectName()); - Assert.fail("Expected Streaming Metrics no longer to exist"); - } - catch (InstanceNotFoundException e) { - // expected - } - } - - @Test - public void testSnapshotOnlyMetrics() throws Exception { - // Setup - try (Connection connection = getTestDatabaseConnection(DATABASE.getDatabaseName()).connection()) { - connection.createStatement().execute(INSERT1); - connection.createStatement().execute(INSERT2); - } - - // start connector - start(getConnectorClass(), - DATABASE.defaultConfig() - .with(BinlogConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY) - .with(BinlogConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class) - .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH) - .with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("simple")) - .with(BinlogConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE) - .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE) - .build()); - - assertSnapshotMetrics(); - assertStreamingMetricsExist(); - } - - @Test - public void testPauseResumeSnapshotMetrics() throws Exception { - final int NUM_RECORDS = 1_000; - final String TABLE_NAME = DATABASE.qualifiedTableName("simple"); - final String SIGNAL_TABLE_NAME = DATABASE.qualifiedTableName("debezium_signal"); - - try (Connection connection = getTestDatabaseConnection(DATABASE.getDatabaseName()).connection()) { - for (int i = 1; i < NUM_RECORDS; i++) { - connection.createStatement().execute(String.format("INSERT INTO %s (val) VALUES (%d);", TABLE_NAME, i)); - } - } - - // Start connector. - start(getConnectorClass(), - DATABASE.defaultConfig() - .with(BinlogConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) - .with(BinlogConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class) - .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH) - .with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, String.format("%s", TABLE_NAME)) - .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE) - .with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1) - .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5) - .with(BinlogConnectorConfig.SIGNAL_DATA_COLLECTION, SIGNAL_TABLE_NAME) - .build()); - - assertConnectorIsRunning(); - waitForSnapshotToBeCompleted(); - waitForStreamingToStart(); - - // Consume initial snapshot records. - List records = new ArrayList<>(); - consumeRecords(NUM_RECORDS, record -> { - records.add(record); - }); - - try (Connection connection = getTestDatabaseConnection(DATABASE.getDatabaseName()).connection()) { - // Start incremental snapshot. - connection.createStatement().execute(String.format( - "INSERT INTO debezium_signal VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [\"%s\"]}')", TABLE_NAME)); - // Pause incremental snapshot. - connection.createStatement().execute(String.format("INSERT INTO %s VALUES('test-pause', 'pause-snapshot', '')", SIGNAL_TABLE_NAME)); - // Sleep more than 1 second, we get the pause in seconds. - Thread.sleep(1500); - // Resume incremental snapshot. - connection.createStatement().execute(String.format("INSERT INTO debezium_signal VALUES('test-resume', 'resume-snapshot', '')", SIGNAL_TABLE_NAME)); - } - - // Consume incremental snapshot records. - consumeRecords(NUM_RECORDS, record -> { - records.add(record); - }); - - Assert.assertTrue(records.size() >= 2 * NUM_RECORDS); - assertSnapshotPauseNotZero(); - } - - @Test - public void testSnapshotAndStreamingMetrics() throws Exception { - // Setup - try (Connection connection = getTestDatabaseConnection(DATABASE.getDatabaseName()).connection()) { - connection.createStatement().execute(INSERT1); - connection.createStatement().execute(INSERT2); - } - - // start connector - start(getConnectorClass(), - DATABASE.defaultConfig() - .with(BinlogConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) - .with(BinlogConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class) - .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH) - .with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("simple")) - .with(BinlogConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE) - .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE) - .build()); - - assertSnapshotMetrics(); - assertStreamingMetrics(0); - } - - @Test - public void testStreamingOnlyMetrics() throws Exception { - // start connector - start(getConnectorClass(), - DATABASE.defaultConfig() - .with(BinlogConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) - .with(BinlogConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class) - .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH) - .with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("simple")) - .with(BinlogConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE) - .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE) - .build()); - - // CREATE DATABASE, CREATE TABLE, and 2 INSERT - assertStreamingMetrics(4); - assertSnapshotMetricsExist(); - } - - private void assertNoSnapshotMetricsExist() throws Exception { - final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - try { - mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted"); - Assert.fail("Expected Snapshot Metrics to not exist"); - } - catch (InstanceNotFoundException e) { - // expected - } - } - - private void assertNoStreamingMetricsExist() throws Exception { - final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - try { - mBeanServer.getAttribute(getStreamingMetricsObjectName(), "TotalNumberOfEventsSeen"); - Assert.fail("Expected Streaming Metrics to not exist"); - } - catch (InstanceNotFoundException e) { - // expected - } - } - - private void assertStreamingMetricsExist() throws Exception { - final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - try { - mBeanServer.getAttribute(getStreamingMetricsObjectName(), "TotalNumberOfEventsSeen"); - } - catch (InstanceNotFoundException e) { - Assert.fail("Streaming Metrics should exist"); - } - } - - private void assertSnapshotMetricsExist() throws Exception { - final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - try { - mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted"); - } - catch (InstanceNotFoundException e) { - Assert.fail("Snapshot Metrics should exist"); - } - } - - private void assertSnapshotPauseNotZero() throws Exception { - final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - try { - final long snapshotPauseDuration = (Long) mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPausedDurationInSeconds"); - Assert.assertTrue(snapshotPauseDuration > 0); - } - catch (InstanceNotFoundException e) { - Assert.fail("Snapshot Metrics should exist"); - } - } - - private void assertSnapshotMetrics() throws Exception { - final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - - // Wait for the snapshot to complete to verify metrics - waitForSnapshotToBeCompleted(); - - // 4 meta, 1 USE, 1 CREATE, 2 INSERT - consumeRecords(8); - - // Check snapshot metrics - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalTableCount")).isEqualTo(1); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "CapturedTables")) - .isEqualTo(new String[]{ DATABASE.qualifiedTableName("simple") }); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(2L); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "NumberOfEventsFiltered")).isEqualTo(0L); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "NumberOfErroneousEvents")).isEqualTo(0L); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "RemainingTableCount")).isEqualTo(0); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotRunning")).isEqualTo(false); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotAborted")).isEqualTo(false); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(true); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPaused")).isEqualTo(false); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPausedDurationInSeconds")).isEqualTo(0L); - } - - private void assertStreamingMetrics(long events) throws Exception { - final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - - // Wait for the streaming to begin - waitForStreamingToStart(); - - // Insert new records and wait for them to become available - try (Connection connection = getTestDatabaseConnection(DATABASE.getDatabaseName()).connection()) { - connection.createStatement().execute(INSERT1); - connection.createStatement().execute(INSERT2); - } - - waitForAvailableRecords(30, TimeUnit.SECONDS); - - // Testing.Print.enable(); - int size = consumeAvailableRecords(VerifyRecord::print); - - // Check streaming metrics - assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "Connected")).isEqualTo(true); - // note: other connectors would report the physical number of operations, e.g. inserts/updates. - // the MySQL BinaryLogClientStatistics bean which this value is based upon tracks number of events - // read from the log, which may be more than the insert/update/delete operations. - assertThat((Long) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "TotalNumberOfEventsSeen")) - .isGreaterThanOrEqualTo(events); - - Awaitility.await().atMost(Duration.ofMinutes(1)).until(() -> ((String[]) mBeanServer - .getAttribute(getStreamingMetricsObjectName(), "CapturedTables")).length > 0); - assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CapturedTables")) - .isEqualTo(new String[]{ DATABASE.qualifiedTableName("simple") }); - } - - private ObjectName getSnapshotMetricsObjectName() throws MalformedObjectNameException { + protected ObjectName getSnapshotMetricsObjectName() throws MalformedObjectNameException { return getSnapshotMetricsObjectName(getConnectorName(), SERVER_NAME); } - private ObjectName getStreamingMetricsObjectName() throws MalformedObjectNameException { + public ObjectName getStreamingMetricsObjectName() throws MalformedObjectNameException { return getStreamingMetricsObjectName(getConnectorName(), SERVER_NAME, getStreamingNamespace()); } - private void waitForSnapshotToBeCompleted() throws InterruptedException { - waitForSnapshotToBeCompleted(getConnectorName(), SERVER_NAME); - } - - private void waitForStreamingToStart() throws InterruptedException { - waitForStreamingRunning(getConnectorName(), SERVER_NAME, getStreamingNamespace()); - } } diff --git a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MariaDbMetricsIT.java b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MariaDbMetricsIT.java new file mode 100644 index 000000000..a6241404c --- /dev/null +++ b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MariaDbMetricsIT.java @@ -0,0 +1,31 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mariadb; + +import io.debezium.connector.binlog.BinlogMetricsIT; +import io.debezium.connector.binlog.util.BinlogTestConnection; + +/** + * @author Chris Cranford + */ +public class MariaDbMetricsIT extends BinlogMetricsIT implements MariaDbCommon { + + @Override + public Class getConnectorClass() { + return MariaDbCommon.super.getConnectorClass(); + } + + @Override + public BinlogTestConnection getTestDatabaseConnection(String databaseName) { + return MariaDbCommon.super.getTestDatabaseConnection(databaseName); + } + + @Override + public String getConnectorName() { + return MariaDbCommon.super.getConnectorName(); + } + +} diff --git a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MetricsIT.java b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MetricsIT.java deleted file mode 100644 index 74ae3c819..000000000 --- a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MetricsIT.java +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.mariadb; - -import io.debezium.connector.binlog.BinlogMetricsIT; - -/** - * @author Chris Cranford - */ -public class MetricsIT extends BinlogMetricsIT implements MariaDbCommon { - -} diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlCommon.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlCommon.java index 714eecd03..34942298f 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlCommon.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlCommon.java @@ -17,6 +17,7 @@ * @author Chris Cranford */ public interface MySqlCommon extends BinlogConnectorTest { + @Override default String getConnectorName() { return Module.name(); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlMetricsIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlMetricsIT.java index b3de38540..a70e092f4 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlMetricsIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlMetricsIT.java @@ -6,10 +6,25 @@ package io.debezium.connector.mysql; import io.debezium.connector.binlog.BinlogMetricsIT; +import io.debezium.connector.binlog.util.BinlogTestConnection; /** * @author Chris Cranford */ public class MySqlMetricsIT extends BinlogMetricsIT implements MySqlCommon { + @Override + public String getConnectorName() { + return MySqlCommon.super.getConnectorName(); + } + + @Override + public Class getConnectorClass() { + return MySqlCommon.super.getConnectorClass(); + } + + @Override + public BinlogTestConnection getTestDatabaseConnection(String databaseName) { + return MySqlCommon.super.getTestDatabaseConnection(databaseName); + } } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleMetricsIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleMetricsIT.java new file mode 100644 index 000000000..51477e43c --- /dev/null +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleMetricsIT.java @@ -0,0 +1,108 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle; + +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import io.debezium.config.Configuration; +import io.debezium.connector.oracle.util.TestHelper; +import io.debezium.pipeline.AbstractMetricsTest; +import io.debezium.util.Testing; + +public class OracleMetricsIT extends AbstractMetricsTest { + + private static OracleConnection connection; + + @Override + protected Class getConnectorClass() { + return OracleConnector.class; + } + + @Override + protected String connector() { + return TestHelper.CONNECTOR_NAME; + } + + @Override + protected String server() { + return TestHelper.SERVER_NAME; + } + + @Override + protected Configuration.Builder config() { + return TestHelper.defaultConfig() + .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL); + } + + @Override + protected Configuration.Builder noSnapshot(Configuration.Builder config) { + return config.with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA); + } + + @Override + protected void executeInsertStatements() throws SQLException { + connection.execute("INSERT INTO debezium.customer VALUES (NULL, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))"); + connection.execute("INSERT INTO debezium.customer VALUES (NULL, 'Bruce', 2345.67, null)"); + connection.execute("COMMIT"); + } + + @Override + protected String tableName() { + return "ORCLPDB1.DEBEZIUM.CUSTOMER"; + } + + @Override + protected long expectedEvents() { + return 2L; + } + + @Override + protected boolean snapshotCompleted() { + return true; + } + + @BeforeClass + public static void beforeClass() throws SQLException { + connection = TestHelper.testConnection(); + + TestHelper.dropAllTables(); + + TestHelper.dropTable(connection, "debezium.customer"); + + String ddl = "create table debezium.customer (" + + " id numeric(9,0) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 101) not null, " + + " name varchar2(1000), " + + " score decimal(6, 2), " + + " registered timestamp, " + + " primary key (id)" + + ")"; + + connection.execute(ddl); + connection.execute("GRANT SELECT ON debezium.customer to " + TestHelper.getConnectorUserName()); + connection.execute("ALTER TABLE debezium.customer ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); + } + + @AfterClass + public static void closeConnection() throws SQLException { + if (connection != null) { + TestHelper.dropTable(connection, "debezium.customer"); + connection.close(); + } + } + + @Before + public void before() throws SQLException { + connection.execute("delete from debezium.customer"); + setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS); + initializeConnectorTestFramework(); + Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH); + } +} diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java index 61695f68b..abd673158 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java @@ -8,22 +8,14 @@ import static org.assertj.core.api.Assertions.assertThat; import java.lang.management.ManagementFactory; -import java.time.Duration; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.management.InstanceNotFoundException; -import javax.management.MBeanException; import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import javax.management.ReflectionException; -import javax.management.openmbean.TabularDataSupport; import org.awaitility.Awaitility; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -31,15 +23,15 @@ import io.debezium.config.Configuration; import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode; -import io.debezium.doc.FixFor; import io.debezium.junit.EqualityCheck; import io.debezium.junit.SkipWhenJavaVersion; -import io.debezium.util.Testing; +import io.debezium.pipeline.AbstractMetricsTest; /** * @author Chris Cranford + * @author Mario Fiore Vitale */ -public class PostgresMetricsIT extends AbstractRecordsProducerTest { +public class PostgresMetricsIT extends AbstractMetricsTest { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresMetricsIT.class); @@ -48,10 +40,58 @@ public class PostgresMetricsIT extends AbstractRecordsProducerTest { private static final String INSERT_STATEMENTS = "INSERT INTO simple (val) VALUES (25); " + "INSERT INTO simple (val) VALUES (50);"; + @Override + protected Class getConnectorClass() { + return PostgresConnector.class; + } + + @Override + protected String connector() { + return "postgres"; + } + + @Override + protected String server() { + return "test_server"; + } + + @Override + protected Configuration.Builder config() { + return TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS) + .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE); + } + + protected Configuration.Builder noSnapshot(Configuration.Builder config) { + return config.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA); + } + + @Override + protected void executeInsertStatements() { + TestHelper.execute(INSERT_STATEMENTS); + } + + @Override + protected String tableName() { + return "public.simple"; + } + + @Override + protected long expectedEvents() { + return 2L; + } + + @Override + protected boolean snapshotCompleted() { + return false; + } + @Before public void before() throws Exception { TestHelper.dropDefaultReplicationSlot(); TestHelper.dropAllSchemas(); + + TestHelper.execute(INIT_STATEMENTS); } @After @@ -59,293 +99,12 @@ public void after() throws Exception { stopConnector(); } - @Test - public void testLifecycle() throws Exception { - // start connector - start(PostgresConnector.class, - TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS) - .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) - .build()); - - assertConnectorIsRunning(); - - // These methods use the JMX metrics, this simply checks they're available as expected - waitForSnapshotToBeCompleted(); - waitForStreamingToStart(); - - // Stop the connector - stopConnector(); - - // Verify snapshot metrics no longer exist - final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - try { - mBeanServer.getMBeanInfo(getSnapshotMetricsObjectName()); - Assert.fail("Expected Snapshot Metrics no longer to exist"); - } - catch (InstanceNotFoundException e) { - // expected - } - - // Verify streaming metrics no longer exist - try { - mBeanServer.getMBeanInfo(getStreamingMetricsObjectName()); - Assert.fail("Expected Streaming Metrics no longer to exist"); - } - catch (InstanceNotFoundException e) { - // expected - } - } - - @Test - public void testSnapshotOnlyMetrics() throws Exception { - // Setup - TestHelper.execute(INIT_STATEMENTS, INSERT_STATEMENTS); - - // start connector - start(PostgresConnector.class, - TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY) - .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) - .build()); - - assertSnapshotMetrics(); - } - - @Test - public void testSnapshotAndStreamingMetrics() throws Exception { - // Setup - TestHelper.execute(INIT_STATEMENTS, INSERT_STATEMENTS); - - // start connector - start(PostgresConnector.class, - TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS) - .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) - .build()); - - assertSnapshotMetrics(); - assertStreamingMetrics(false); - } - - @Test - @FixFor("DBZ-6603") - public void testSnapshotAndStreamingWithCustomMetrics() throws Exception { - // Setup - TestHelper.execute(INIT_STATEMENTS, INSERT_STATEMENTS); - - // start connector - Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS) - .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) - .with(PostgresConnectorConfig.CUSTOM_METRIC_TAGS, "env=test,bu=bigdata") - .build(); - Map customMetricTags = new PostgresConnectorConfig(config).getCustomMetricTags(); - start(PostgresConnector.class, config); - - assertSnapshotWithCustomMetrics(customMetricTags); - assertStreamingWithCustomMetrics(customMetricTags); - } - - @Test - public void testStreamingOnlyMetrics() throws Exception { - // Setup - TestHelper.execute(INIT_STATEMENTS); - - // start connector - start(PostgresConnector.class, - TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) - .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) - .build()); - - assertSnapshotNotExecutedMetrics(); - assertStreamingMetrics(false); - } - - @Test - public void testAdvancedStreamingMetrics() throws Exception { - // Setup - TestHelper.execute(INIT_STATEMENTS); - - // start connector - start(PostgresConnector.class, - TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) - .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) - .with(PostgresConnectorConfig.ADVANCED_METRICS_ENABLE, Boolean.TRUE) - .build()); - - assertSnapshotNotExecutedMetrics(); - assertStreamingMetrics(true); - } - - @Test - public void testPauseAndResumeAdvancedStreamingMetrics() throws Exception { - // Setup - TestHelper.execute(INIT_STATEMENTS); - - // start connector - start(PostgresConnector.class, - TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) - .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) - .with(PostgresConnectorConfig.ADVANCED_METRICS_ENABLE, Boolean.TRUE) - .build()); - - assertSnapshotNotExecutedMetrics(); - assertStreamingMetrics(true); - - invokeOperation(getStreamingMetricsObjectName(), "pause"); - insertRecords(); - assertAdvancedMetrics(2); - - invokeOperation(getStreamingMetricsObjectName(), "resume"); - insertRecords(); - assertAdvancedMetrics(4); - } - - private void insertRecords() throws InterruptedException { - // Wait for the streaming to begin - TestConsumer consumer = testConsumer(2, "public"); - TestHelper.execute(INSERT_STATEMENTS); - consumer.await(TestHelper.waitTimeForRecords() * 30L, TimeUnit.SECONDS); - Thread.sleep(Duration.ofSeconds(2).toMillis()); - } - - private void assertSnapshotMetrics() throws Exception { - final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - - // Wait for the snapshot to complete to verify metrics - waitForSnapshotToBeCompleted(); - - // Check snapshot metrics - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalTableCount")).isEqualTo(1); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "CapturedTables")).isEqualTo(new String[]{ "public.simple" }); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(2L); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "RemainingTableCount")).isEqualTo(0); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotRunning")).isEqualTo(false); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotAborted")).isEqualTo(false); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(true); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPaused")).isEqualTo(false); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPausedDurationInSeconds")).isEqualTo(0L); - } - - private void assertSnapshotWithCustomMetrics(Map customMetricTags) throws Exception { - final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - final ObjectName objectName = getSnapshotMetricsObjectName("postgres", TestHelper.TEST_SERVER, customMetricTags); - - // Wait for the snapshot to complete to verify metrics - waitForSnapshotWithCustomMetricsToBeCompleted(customMetricTags); - - // Check snapshot metrics - assertThat(mBeanServer.getAttribute(objectName, "TotalTableCount")).isEqualTo(1); - assertThat(mBeanServer.getAttribute(objectName, "CapturedTables")).isEqualTo(new String[]{ "public.simple" }); - assertThat(mBeanServer.getAttribute(objectName, "TotalNumberOfEventsSeen")).isEqualTo(2L); - assertThat(mBeanServer.getAttribute(objectName, "RemainingTableCount")).isEqualTo(0); - assertThat(mBeanServer.getAttribute(objectName, "SnapshotRunning")).isEqualTo(false); - assertThat(mBeanServer.getAttribute(objectName, "SnapshotAborted")).isEqualTo(false); - assertThat(mBeanServer.getAttribute(objectName, "SnapshotCompleted")).isEqualTo(true); - assertThat(mBeanServer.getAttribute(objectName, "SnapshotPaused")).isEqualTo(false); - assertThat(mBeanServer.getAttribute(objectName, "SnapshotPausedDurationInSeconds")).isEqualTo(0L); - } - - private void assertSnapshotNotExecutedMetrics() throws Exception { - final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - - Awaitility.await("Waiting for snapshot metrics to appear").atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> { - try { - mBeanServer.getObjectInstance(getSnapshotMetricsObjectName()); - return true; - } - catch (InstanceNotFoundException e) { - return false; - } - }); - - // Check snapshot metrics - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalTableCount")).isEqualTo(0); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "CapturedTables")).isEqualTo(new String[]{}); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(0L); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "RemainingTableCount")).isEqualTo(0); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotRunning")).isEqualTo(false); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotAborted")).isEqualTo(false); - assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(false); - } - - private void assertStreamingMetrics(boolean checkAdvancedMetrics) throws Exception { - final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - - // Wait for the streaming to begin - TestConsumer consumer = testConsumer(2, "public"); - waitForStreamingToStart(); - - // Insert new records and wait for them to become available - TestHelper.execute(INSERT_STATEMENTS); - consumer.await(TestHelper.waitTimeForRecords() * 30L, TimeUnit.SECONDS); - Thread.sleep(Duration.ofSeconds(2).toMillis()); - - // Check streaming metrics - Testing.print("****ASSERTIONS****"); - assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "Connected")).isEqualTo(true); - assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(2L); - // todo: this does not seem to be populated? - // Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CapturedTables")).isEqualTo(new String[] {"public.simple"}); - - if (checkAdvancedMetrics) { - assertAdvancedMetrics(2); - } - } - - public void assertAdvancedMetrics(int expectedInsert) throws Exception { - - final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - - TabularDataSupport numberOfCreateEventsSeen = (TabularDataSupport) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "NumberOfCreateEventsSeen"); - - String values = numberOfCreateEventsSeen.values().stream() - .limit(1) - .toList() - .get(0) - .toString(); - assertThat(values).isEqualTo( - "javax.management.openmbean.CompositeDataSupport(compositeType=javax.management.openmbean.CompositeType(name=java.util.Map,items=((itemName=key,itemType=javax.management.openmbean.SimpleType(name=java.lang.String)),(itemName=value,itemType=javax.management.openmbean.SimpleType(name=java.lang.Long)))),contents={key=public.simple, value=" - + expectedInsert + "})"); - } - - private void invokeOperation(ObjectName objectName, String operation) - throws MalformedObjectNameException, ReflectionException, InstanceNotFoundException, MBeanException { - - MBeanServer server = ManagementFactory.getPlatformMBeanServer(); - - server.invoke(objectName, operation, new Object[]{}, new String[]{}); - } - - private void assertStreamingWithCustomMetrics(Map customMetricTags) throws Exception { - final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - final ObjectName objectName = getStreamingMetricsObjectName("postgres", TestHelper.TEST_SERVER, customMetricTags); - - // Wait for the streaming to begin - TestConsumer consumer = testConsumer(2, "public"); - waitForStreamingWithCustomMetricsToStart(customMetricTags); - - // Insert new records and wait for them to become available - TestHelper.execute(INSERT_STATEMENTS); - consumer.await(TestHelper.waitTimeForRecords() * 30L, TimeUnit.SECONDS); - Thread.sleep(Duration.ofSeconds(2).toMillis()); - - // Check streaming metrics - Testing.print("****ASSERTIONS****"); - assertThat(mBeanServer.getAttribute(objectName, "Connected")).isEqualTo(true); - assertThat(mBeanServer.getAttribute(objectName, "TotalNumberOfEventsSeen")).isEqualTo(2L); - } - @Test @SkipWhenJavaVersion(check = EqualityCheck.GREATER_THAN_OR_EQUAL, value = 16, description = "Deep reflection not allowed by default on this Java version") public void oneRecordInQueue() throws Exception { // Testing.Print.enable(); final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - TestHelper.execute(INIT_STATEMENTS, INSERT_STATEMENTS); + executeInsertStatements(); final CountDownLatch step1 = new CountDownLatch(1); final CountDownLatch step2 = new CountDownLatch(1); @@ -368,8 +127,8 @@ public void oneRecordInQueue() throws Exception { LOGGER.info("Record processing completed"); }, true); - waitForStreamingToStart(); - TestHelper.execute(INSERT_STATEMENTS); + waitForStreamingRunning(connector(), server()); + executeInsertStatements(); LOGGER.info("Waiting for the first record to arrive"); step1.await(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS); LOGGER.info("First record arrived"); @@ -417,11 +176,4 @@ public void oneRecordInQueue() throws Exception { stopConnector(); } - private ObjectName getSnapshotMetricsObjectName() throws MalformedObjectNameException { - return getSnapshotMetricsObjectName("postgres", TestHelper.TEST_SERVER); - } - - private ObjectName getStreamingMetricsObjectName() throws MalformedObjectNameException { - return getStreamingMetricsObjectName("postgres", TestHelper.TEST_SERVER); - } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/metrics/AbstractSqlServerPartitionMetrics.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/metrics/AbstractSqlServerPartitionMetrics.java index 9b040e177..09a95cf8a 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/metrics/AbstractSqlServerPartitionMetrics.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/metrics/AbstractSqlServerPartitionMetrics.java @@ -17,6 +17,7 @@ import io.debezium.pipeline.source.spi.EventMetadataProvider; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.spi.schema.DataCollectionId; +import io.debezium.transforms.ActivityMonitoringMeter; /** * Base implementation of partition-scoped multi-partition SQL Server connector metrics. @@ -24,11 +25,15 @@ abstract public class AbstractSqlServerPartitionMetrics extends Metrics implements SqlServerPartitionMetricsMXBean { private final CommonEventMeter commonEventMeter; + private final ActivityMonitoringMeter activityMonitoringMeter; + protected final CdcSourceTaskContext taskContext; AbstractSqlServerPartitionMetrics(CdcSourceTaskContext taskContext, Map tags, EventMetadataProvider metadataProvider) { super(taskContext, tags); + this.taskContext = taskContext; this.commonEventMeter = new CommonEventMeter(taskContext.getClock(), metadataProvider); + this.activityMonitoringMeter = new ActivityMonitoringMeter(); } @Override @@ -71,11 +76,39 @@ public long getNumberOfErroneousEvents() { return commonEventMeter.getNumberOfErroneousEvents(); } + @Override + public Map getNumberOfCreateEventsSeen() { + return activityMonitoringMeter.getNumberOfCreateEventsSeen(); + } + + @Override + public Map getNumberOfDeleteEventsSeen() { + return activityMonitoringMeter.getNumberOfDeleteEventsSeen(); + } + + @Override + public Map getNumberOfUpdateEventsSeen() { + return activityMonitoringMeter.getNumberOfUpdateEventsSeen(); + } + + @Override + public void pause() { + activityMonitoringMeter.pause(); + } + + @Override + public void resume() { + activityMonitoringMeter.resume(); + } + /** * Invoked if an event is processed for a captured table. */ void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value, Operation operation) { commonEventMeter.onEvent(source, offset, key, value, operation); + if (taskContext.getConfig().isAdvancedMetricsEnabled()) { + activityMonitoringMeter.onEvent(source, offset, key, value, operation); + } } /** @@ -114,6 +147,8 @@ void onConnectorEvent(ConnectorEvent event) { @Override public void reset() { + commonEventMeter.reset(); + activityMonitoringMeter.reset(); } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/metrics/SqlServerPartitionMetricsMXBean.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/metrics/SqlServerPartitionMetricsMXBean.java index 7a6dd52a3..e24836d35 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/metrics/SqlServerPartitionMetricsMXBean.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/metrics/SqlServerPartitionMetricsMXBean.java @@ -7,11 +7,12 @@ import io.debezium.pipeline.metrics.traits.CommonEventMetricsMXBean; import io.debezium.pipeline.metrics.traits.SchemaMetricsMXBean; +import io.debezium.transforms.ActivityMonitoringMXBean; /** * Metrics scoped to a source partition that are common for both snapshot and streaming change event sources */ -public interface SqlServerPartitionMetricsMXBean extends CommonEventMetricsMXBean, SchemaMetricsMXBean { +public interface SqlServerPartitionMetricsMXBean extends CommonEventMetricsMXBean, SchemaMetricsMXBean, ActivityMonitoringMXBean { void reset(); } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerMetricsIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerMetricsIT.java new file mode 100644 index 000000000..772d78416 --- /dev/null +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerMetricsIT.java @@ -0,0 +1,163 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import static io.debezium.connector.sqlserver.util.TestHelper.SCHEMA_HISTORY_PATH; +import static io.debezium.connector.sqlserver.util.TestHelper.TEST_DATABASE_1; +import static io.debezium.connector.sqlserver.util.TestHelper.TEST_SERVER_NAME; + +import java.sql.SQLException; +import java.util.Map; + +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.config.Configuration; +import io.debezium.connector.sqlserver.util.TestHelper; +import io.debezium.pipeline.AbstractMetricsTest; +import io.debezium.util.Testing; + +public class SqlServerMetricsIT extends AbstractMetricsTest { + + @Override + protected Class getConnectorClass() { + return SqlServerConnector.class; + } + + @Override + protected String connector() { + return "sql_server"; + } + + @Override + protected String server() { + return TEST_SERVER_NAME; + } + + @Override + protected Configuration.Builder config() { + return TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL); + } + + @Override + protected Configuration.Builder noSnapshot(Configuration.Builder config) { + return TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA); + } + + @Override + protected void executeInsertStatements() throws Exception { + connection.execute("INSERT INTO tablea VALUES('a')", "INSERT INTO tablea VALUES('b')"); + TestHelper.enableTableCdc(connection, "tablea"); + TestHelper.waitForEnabledCdc(connection, "tablea"); + } + + @Override + protected String tableName() { + return "testDB1.dbo.tablea"; + } + + @Override + protected long expectedEvents() { + return 2L; + } + + @Override + protected boolean snapshotCompleted() { + return true; + } + + @Override + protected String task() { + return "0"; + } + + @Override + protected String database() { + return "testDB1"; + } + + private SqlServerConnection connection; + + @Before + public void before() throws SQLException { + TestHelper.createTestDatabase(); + connection = TestHelper.testConnection(); + connection.execute( + "CREATE TABLE tablea (id int IDENTITY(1,1) primary key, cola varchar(30))"); + TestHelper.enableTableCdc(connection, "tablea"); + TestHelper.adjustCdcPollingInterval(connection, 1); + initializeConnectorTestFramework(); + Testing.Files.delete(SCHEMA_HISTORY_PATH); + } + + @After + public void after() throws SQLException { + if (connection != null) { + connection.close(); + } + } + + @Override + protected ObjectName getSnapshotMetricsObjectName() throws MalformedObjectNameException { + return getSnapshotMetricsObjectName(connector(), server(), task(), TEST_DATABASE_1); + } + + @Override + protected ObjectName getStreamingMetricsObjectName() throws MalformedObjectNameException { + return getStreamingMetricsObjectName(connector(), server(), getStreamingNamespace(), task()); + } + + @Override + protected ObjectName getMultiplePartitionStreamingMetricsObjectName() throws MalformedObjectNameException { + return getStreamingMetricsObjectName(connector(), server(), getStreamingNamespace(), task(), TEST_DATABASE_1); + } + + @Override + protected ObjectName getMultiplePartitionStreamingMetricsObjectNameCustomTags(Map customTags) throws MalformedObjectNameException { + + return getStreamingMetricsObjectName(connector(), server(), task(), TEST_DATABASE_1, customTags); + } + + @Test + @Override + public void testSnapshotAndStreamingMetrics() throws Exception { + // Setup + executeInsertStatements(); + + // start connector + start(); + assertConnectorIsRunning(); + + assertSnapshotMetrics(); + // For SQL Server we have two more since when the streaming will start from an empty offset + // it will take the last commited transaction in the log and so also the initial inserts will be streamed. + assertStreamingMetrics(false, expectedEvents() + 2); + } + + @Test + @Override + public void testSnapshotAndStreamingWithCustomMetrics() throws Exception { + // Setup + executeInsertStatements(); + + // start connector + + Map customMetricTags = Map.of("env", "test", "bu", "bigdata"); + start(x -> x.with(CommonConnectorConfig.CUSTOM_METRIC_TAGS, "env=test,bu=bigdata")); + + assertSnapshotWithCustomMetrics(customMetricTags); + // For SQL Server we have two more since when the streaming will start from an empty offset + // it will take the last commited transaction in the log and so also the initial inserts will be streamed. + assertStreamingWithCustomMetrics(customMetricTags, expectedEvents() + 2); + } +} diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index b252f751a..b923b8c85 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -33,6 +33,7 @@ import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.management.InstanceNotFoundException; import javax.management.JMException; @@ -1325,6 +1326,19 @@ public static void waitForSnapshotWithCustomMetricsToBeCompleted(String connecto .getAttribute(getSnapshotMetricsObjectName(connector, server, props), "SnapshotCompleted")); } + public static void waitForSnapshotWithCustomMetricsToBeCompleted(String connector, String server, String task, String database, Map props) + throws InterruptedException { + final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); + + Awaitility.await() + .alias("Streaming was not started on time") + .pollInterval(100, TimeUnit.MILLISECONDS) + .atMost(waitTimeForRecords() * 30L, TimeUnit.SECONDS) + .ignoreException(InstanceNotFoundException.class) + .until(() -> (boolean) mbeanServer + .getAttribute(getSnapshotMetricsObjectName(connector, server, task, database, props), "SnapshotCompleted")); + } + private static void waitForSnapshotEvent(String connector, String server, String event, String task, String database) throws InterruptedException { final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); @@ -1360,7 +1374,16 @@ public static void waitForStreamingWithCustomMetricsToStart(String connector, St .pollInterval(100, TimeUnit.MILLISECONDS) .atMost(waitTimeForRecords() * 30L, TimeUnit.SECONDS) .ignoreException(InstanceNotFoundException.class) - .until(() -> isStreamingRunning(connector, server, props)); + .until(() -> isStreamingRunning(connector, server, null, null, props)); + } + + public static void waitForStreamingWithCustomMetricsToStart(String connector, String server, String task, String database, Map props) { + Awaitility.await() + .alias("Streaming was not started on time") + .pollInterval(100, TimeUnit.MILLISECONDS) + .atMost(waitTimeForRecords() * 30L, TimeUnit.SECONDS) + .ignoreException(InstanceNotFoundException.class) + .until(() -> isStreamingRunning(connector, server, task, database, props)); } public static void waitForConnectorShutdown(String connector, String server) { @@ -1391,11 +1414,11 @@ public static boolean isStreamingRunning(String connector, String server, String return false; } - public static boolean isStreamingRunning(String connector, String server, Map props) { + public static boolean isStreamingRunning(String connector, String server, String task, String database, Map props) { final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); try { - ObjectName streamingMetricsObjectName = getStreamingMetricsObjectName(connector, server, props); + ObjectName streamingMetricsObjectName = getStreamingMetricsObjectName(connector, server, task, null, props); return (boolean) mbeanServer.getAttribute(streamingMetricsObjectName, "Connected"); } catch (JMException ignored) { @@ -1429,10 +1452,50 @@ public static ObjectName getSnapshotMetricsObjectName(String connector, String s return getSnapshotMetricsObjectName(connector, server); } + public static ObjectName getSnapshotMetricsObjectName(String connector, String server, String task, String database, Map props) + throws MalformedObjectNameException { + + Map taskAndDatabase = new HashMap<>(); + taskAndDatabase.put("task", task); + taskAndDatabase.put("database", database); + + String additionalProperties = Stream.of(props.entrySet(), taskAndDatabase.entrySet()).flatMap(Set::stream) + .filter(e -> e.getValue() != null) + .map(e -> String.format("%s=%s", e.getKey(), e.getValue())) + .collect(Collectors.joining(",")); + + if (additionalProperties.length() != 0) { + return new ObjectName("debezium." + connector + ":type=connector-metrics,context=snapshot,server=" + server + "," + additionalProperties); + } + + return getSnapshotMetricsObjectName(connector, server); + } + public static ObjectName getStreamingMetricsObjectName(String connector, String server) throws MalformedObjectNameException { return getStreamingMetricsObjectName(connector, server, getStreamingNamespace()); } + public static ObjectName getStreamingMetricsObjectName(String connector, String server, String context, String task, String database) + throws MalformedObjectNameException { + + Map props = new HashMap<>(); + props.put("task", task); + props.put("database", database); + + return getStreamingMetricsObjectName(connector, server, props); + } + + public static ObjectName getStreamingMetricsObjectName(String connector, String server, String task, String database, Map customTags) + throws MalformedObjectNameException { + + Map props = new HashMap<>(); + props.put("task", task); + props.put("database", database); + props.putAll(customTags); + + return getStreamingMetricsObjectName(connector, server, props); + } + public static ObjectName getStreamingMetricsObjectName(String connector, String server, String context) throws MalformedObjectNameException { return new ObjectName("debezium." + connector + ":type=connector-metrics,context=" + context + ",server=" + server); } diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractMetricsTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractMetricsTest.java new file mode 100644 index 000000000..eed617e19 --- /dev/null +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractMetricsTest.java @@ -0,0 +1,333 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.pipeline; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.lang.management.ManagementFactory; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import javax.management.InstanceNotFoundException; +import javax.management.MBeanException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.ReflectionException; +import javax.management.openmbean.TabularDataSupport; + +import org.apache.kafka.connect.source.SourceConnector; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.Test; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.config.Configuration; +import io.debezium.doc.FixFor; +import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.util.Testing; + +public abstract class AbstractMetricsTest extends AbstractConnectorTest { + + protected abstract Class getConnectorClass(); + + protected abstract String connector(); + + protected abstract String server(); + + protected abstract Configuration.Builder config(); + + protected abstract Configuration.Builder noSnapshot(Configuration.Builder config); + + protected abstract void executeInsertStatements() throws Exception; + + protected abstract String tableName(); + + protected abstract long expectedEvents(); + + protected abstract boolean snapshotCompleted(); + + protected String task() { + return null; + } + + protected String database() { + return null; + } + + protected void start() { + final Configuration config = config().build(); + start(getConnectorClass(), config, loggingCompletion(), null); + } + + protected void start(Function custConfig) { + final Configuration config = custConfig.apply(config()).build(); + start(getConnectorClass(), config, loggingCompletion(), null); + } + + @Test + public void testLifecycle() throws Exception { + + start(); + + assertConnectorIsRunning(); + + // These methods use the JMX metrics, this simply checks they're available as expected + waitForSnapshotToBeCompleted(connector(), server(), task(), database()); + waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task()); + + // Stop the connector + stopConnector(); + + // Verify snapshot metrics no longer exist + final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + try { + mBeanServer.getMBeanInfo(getSnapshotMetricsObjectName()); + Assert.fail("Expected Snapshot Metrics no longer to exist"); + } + catch (InstanceNotFoundException e) { + // expected + } + + // Verify streaming metrics no longer exist + try { + mBeanServer.getMBeanInfo(getStreamingMetricsObjectName()); + Assert.fail("Expected Streaming Metrics no longer to exist"); + } + catch (InstanceNotFoundException e) { + // expected + } + } + + @Test + public void testSnapshotOnlyMetrics() throws Exception { + // Setup + executeInsertStatements(); + + // start connector + start(); + + assertSnapshotMetrics(); + } + + @Test + public void testSnapshotAndStreamingMetrics() throws Exception { + // Setup + executeInsertStatements(); + + // start connector + start(); + assertConnectorIsRunning(); + + assertSnapshotMetrics(); + assertStreamingMetrics(false, expectedEvents()); + } + + @Test + @FixFor("DBZ-6603") + public void testSnapshotAndStreamingWithCustomMetrics() throws Exception { + // Setup + executeInsertStatements(); + + // start connector + + Map customMetricTags = Map.of("env", "test", "bu", "bigdata"); + start(x -> x.with(CommonConnectorConfig.CUSTOM_METRIC_TAGS, "env=test,bu=bigdata")); + + assertSnapshotWithCustomMetrics(customMetricTags); + assertStreamingWithCustomMetrics(customMetricTags, expectedEvents()); + } + + @Test + public void testStreamingOnlyMetrics() throws Exception { + + // start connector + start(this::noSnapshot); + + waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task()); + assertSnapshotNotExecutedMetrics(); + assertStreamingMetrics(false, expectedEvents()); + } + + @Test + public void testAdvancedStreamingMetrics() throws Exception { + + // start connector + start(x -> noSnapshot(x) + .with(CommonConnectorConfig.ADVANCED_METRICS_ENABLE, Boolean.TRUE)); + + waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task()); + assertSnapshotNotExecutedMetrics(); + assertStreamingMetrics(true, expectedEvents()); + } + + @Test + public void testPauseAndResumeAdvancedStreamingMetrics() throws Exception { + + // start connector + start(x -> noSnapshot(x) + .with(CommonConnectorConfig.ADVANCED_METRICS_ENABLE, Boolean.TRUE)); + + waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task()); + assertSnapshotNotExecutedMetrics(); + assertStreamingMetrics(true, expectedEvents()); + + invokeOperation(getMultiplePartitionStreamingMetricsObjectName(), "pause"); + insertRecords(); + assertAdvancedMetrics(2); + + invokeOperation(getMultiplePartitionStreamingMetricsObjectName(), "resume"); + insertRecords(); + assertAdvancedMetrics(4); + } + + private void insertRecords() throws Exception { + // Wait for the streaming to begin + executeInsertStatements(); + waitForAvailableRecords(30, TimeUnit.SECONDS); + Thread.sleep(Duration.ofSeconds(2).toMillis()); + } + + protected void assertSnapshotMetrics() throws Exception { + final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + + // Wait for the snapshot to complete to verify metrics + waitForSnapshotToBeCompleted(connector(), server(), task(), database()); + + // Check snapshot metrics + assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalTableCount")).isEqualTo(1); + assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "CapturedTables")).isEqualTo(new String[]{ tableName() }); + assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(2L); + assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "RemainingTableCount")).isEqualTo(0); + assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotRunning")).isEqualTo(false); + assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotAborted")).isEqualTo(false); + assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(true); + assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPaused")).isEqualTo(false); + assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPausedDurationInSeconds")).isEqualTo(0L); + } + + protected void assertSnapshotWithCustomMetrics(Map customMetricTags) throws Exception { + final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + final ObjectName objectName = getSnapshotMetricsObjectName(connector(), server(), task(), database(), customMetricTags); + + // Wait for the snapshot to complete to verify metrics + waitForSnapshotWithCustomMetricsToBeCompleted(connector(), server(), task(), database(), customMetricTags); + + // Check snapshot metrics + assertThat(mBeanServer.getAttribute(objectName, "TotalTableCount")).isEqualTo(1); + assertThat(mBeanServer.getAttribute(objectName, "CapturedTables")).isEqualTo(new String[]{ tableName() }); + assertThat(mBeanServer.getAttribute(objectName, "TotalNumberOfEventsSeen")).isEqualTo(2L); + assertThat(mBeanServer.getAttribute(objectName, "RemainingTableCount")).isEqualTo(0); + assertThat(mBeanServer.getAttribute(objectName, "SnapshotRunning")).isEqualTo(false); + assertThat(mBeanServer.getAttribute(objectName, "SnapshotAborted")).isEqualTo(false); + assertThat(mBeanServer.getAttribute(objectName, "SnapshotCompleted")).isEqualTo(true); + assertThat(mBeanServer.getAttribute(objectName, "SnapshotPaused")).isEqualTo(false); + assertThat(mBeanServer.getAttribute(objectName, "SnapshotPausedDurationInSeconds")).isEqualTo(0L); + } + + private void assertSnapshotNotExecutedMetrics() throws Exception { + final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + + Awaitility.await("Waiting for snapshot metrics to appear").atMost(waitTimeForRecords(), TimeUnit.SECONDS).until(() -> { + try { + mBeanServer.getObjectInstance(getSnapshotMetricsObjectName()); + return true; + } + catch (InstanceNotFoundException e) { + return false; + } + }); + + // Check snapshot metrics + assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(0L); + assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(snapshotCompleted()); + } + + protected void assertStreamingMetrics(boolean checkAdvancedMetrics, long expectedEvents) throws Exception { + final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + + waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task()); + + // Insert new records and wait for them to become available + executeInsertStatements(); + // Testing.Print.enable(); + consumeRecordsByTopic((int) expectedEvents); + Thread.sleep(Duration.ofSeconds(2).toMillis()); + + // Check streaming metrics + Testing.print("****ASSERTIONS****"); + assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "Connected")).isEqualTo(true); + assertThat(mBeanServer.getAttribute(getMultiplePartitionStreamingMetricsObjectName(), "TotalNumberOfCreateEventsSeen")).isEqualTo(expectedEvents); + + if (checkAdvancedMetrics) { + assertAdvancedMetrics(2L); + } + } + + public void assertAdvancedMetrics(long expectedInsert) throws Exception { + + final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + + TabularDataSupport numberOfCreateEventsSeen = (TabularDataSupport) mBeanServer + .getAttribute(getStreamingMetricsObjectName(connector(), server(), getStreamingNamespace(), task(), database()), "NumberOfCreateEventsSeen"); + + String values = numberOfCreateEventsSeen.values().stream() + .limit(1) + .toList() + .get(0) + .toString(); + assertThat(values).isEqualTo( + "javax.management.openmbean.CompositeDataSupport(compositeType=javax.management.openmbean.CompositeType(name=java.util.Map,items=((itemName=key,itemType=javax.management.openmbean.SimpleType(name=java.lang.String)),(itemName=value,itemType=javax.management.openmbean.SimpleType(name=java.lang.Long)))),contents={key=" + + tableName() + ", value=" + + expectedInsert + "})"); + } + + private void invokeOperation(ObjectName objectName, String operation) + throws ReflectionException, InstanceNotFoundException, MBeanException { + + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + + server.invoke(objectName, operation, new Object[]{}, new String[]{}); + } + + protected void assertStreamingWithCustomMetrics(Map customMetricTags, long expectedEvents) throws Exception { + final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + + waitForStreamingWithCustomMetricsToStart(connector(), server(), task(), database(), customMetricTags); + + // Insert new records and wait for them to become available + executeInsertStatements(); + waitForAvailableRecords(30, TimeUnit.SECONDS); + consumeRecords((int) expectedEvents); + Thread.sleep(Duration.ofSeconds(2).toMillis()); + + // Check streaming metrics + Testing.print("****ASSERTIONS****"); + assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(connector(), server(), task(), null, customMetricTags), "Connected")).isEqualTo(true); + assertThat(mBeanServer.getAttribute(getMultiplePartitionStreamingMetricsObjectNameCustomTags(customMetricTags), "TotalNumberOfCreateEventsSeen")) + .isEqualTo(expectedEvents); + } + + protected ObjectName getSnapshotMetricsObjectName() throws MalformedObjectNameException { + return getSnapshotMetricsObjectName(connector(), server()); + } + + protected ObjectName getStreamingMetricsObjectName() throws MalformedObjectNameException { + return getStreamingMetricsObjectName(connector(), server()); + } + + protected ObjectName getMultiplePartitionStreamingMetricsObjectName() throws MalformedObjectNameException { + // Only SQL Server manage partition scoped metrics + return getStreamingMetricsObjectName(connector(), server()); + } + + protected ObjectName getMultiplePartitionStreamingMetricsObjectNameCustomTags(Map customTags) throws MalformedObjectNameException { + // Only SQL Server manage partition scoped metrics + return getStreamingMetricsObjectName(connector(), server(), customTags); + } +}