From 13a96549bc5ef15f081b6c3d15945c3db47f7645 Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Wed, 30 Mar 2022 12:31:35 -0400 Subject: [PATCH] DBZ-4907 Increment Commit SCN when any COMMIT is detected --- .../oracle/logminer/LogMinerQueryBuilder.java | 44 +++++++++++++++---- .../connector/oracle/OracleConnectorIT.java | 40 +++++++++++++++++ .../logminer/LogMinerQueryBuilderTest.java | 8 ++-- 3 files changed, 81 insertions(+), 11 deletions(-) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java index d3f6d2a6a..d92ac454b 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java @@ -60,10 +60,19 @@ public static String build(OracleConnectorConfig connectorConfig, OracleDatabase // These bind parameters will be bound when the query is executed by the caller. query.append("WHERE SCN > ? AND SCN <= ? "); - // Restrict to configured PDB if one is supplied + // The connector currently requires a "database.pdb.name" configuration property when using CDB mode. + // If this property is provided, build a predicate that will be used in later predicates. final String pdbName = connectorConfig.getPdbName(); + final String pdbPredicate; if (!Strings.isNullOrEmpty(pdbName)) { - query.append("AND ").append("SRC_CON_NAME = '").append(pdbName.toUpperCase()).append("' "); + // This predicate is used later to explicitly restrict certain OPERATION_CODE and DDL events by the + // PDB database name while allowing all START, COMMIT, MISSING_SCN, and ROLLBACK operations + // regardless of where they originate, i.e. the PDB or CDB$ROOT. + pdbPredicate = "SRC_CON_NAME = '" + pdbName + "'"; + } + else { + // No PDB configuration provided, no PDB predicate is necessary. + pdbPredicate = null; } // Excluded schemas, if defined @@ -81,8 +90,9 @@ public static String build(OracleConnectorConfig connectorConfig, OracleDatabase if (!schema.storeOnlyCapturedTables()) { // In this mode, the connector will always be fed DDL operations for all tables even if they - // are not part of the inclusion/exclusion lists. - query.append(" OR ").append(buildDdlPredicate()).append(" "); + // are not part of the inclusion/exclusion lists. We will pass the PDB predicate here to then + // restrict DDL operations to only the PDB database if not null. + query.append(" OR ").append(buildDdlPredicate(pdbPredicate)).append(" "); // Insert, Update, Delete, SelectLob, LobWrite, LobTrim, and LobErase if (connectorConfig.isLobEnabled()) { query.append(") OR (OPERATION_CODE IN (1,2,3,9,10,11,29) "); @@ -90,17 +100,30 @@ public static String build(OracleConnectorConfig connectorConfig, OracleDatabase else { query.append(") OR (OPERATION_CODE IN (1,2,3) "); } + if (pdbPredicate != null) { + // Restrict Insert, Update, Delete, and optionally SelectLob, LobWrite, LobTrim, and LobErase by PDB + query.append("AND ").append(pdbPredicate).append(' '); + } } else { + query.append(") OR ("); + if (pdbPredicate != null) { + // We specify the PDB predicate here because it applies to the OPERATION_CODE predicates but + // also the DDL predicate that is to follow later due to predicate groups, effectively + // restricting all DML operations and DDL changes to the PDB only. + query.append(pdbPredicate).append(" AND "); + } // Insert, Update, Delete, SelectLob, LobWrite, LobTrim, and LobErase if (connectorConfig.isLobEnabled()) { - query.append(") OR ((OPERATION_CODE IN (1,2,3,9,10,11,29) "); + query.append("(OPERATION_CODE IN (1,2,3,9,10,11,29) "); } else { - query.append(") OR ((OPERATION_CODE IN (1,2,3) "); + query.append("(OPERATION_CODE IN (1,2,3) "); } // In this mode, the connector will filter DDL operations based on the table inclusion/exclusion lists - query.append("OR ").append(buildDdlPredicate()).append(") "); + // We pass "null" to the DDL predicate because we will have added the predicate earlier as a part of + // the outer predicate group to also be applied to OPERATION_CODE + query.append("OR ").append(buildDdlPredicate(null)).append(") "); } // Always ignore the flush table @@ -124,13 +147,18 @@ public static String build(OracleConnectorConfig connectorConfig, OracleDatabase /** * Builds a common SQL fragment used to obtain DDL operations via LogMiner. * + * @param pdbPredicate pluggable database predicate, maybe {@code null} * @return predicate that can be used to obtain DDL operations via LogMiner */ - private static String buildDdlPredicate() { + private static String buildDdlPredicate(String pdbPredicate) { final StringBuilder predicate = new StringBuilder(256); predicate.append("(OPERATION_CODE = 5 "); predicate.append("AND USERNAME NOT IN ('SYS','SYSTEM') "); predicate.append("AND INFO NOT LIKE 'INTERNAL DDL%' "); + if (pdbPredicate != null) { + // DDL changes should be restricted to only the PDB database if supplied + predicate.append("AND ").append(pdbPredicate).append(' '); + } predicate.append("AND (TABLE_NAME IS NULL OR TABLE_NAME NOT LIKE 'ORA_TEMP_%'))"); return predicate.toString(); } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java index a0e111134..7ab4c0542 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java @@ -3292,6 +3292,46 @@ public void shouldCaptureChangeForTableWithUnsupportedColumnTypeLong() throws Ex } } + @Test + @FixFor("DBZ-4907") + public void shouldContinueToUpdateOffsetsEvenWhenTableIsNotChanged() throws Exception { + TestHelper.dropTable(connection, "dbz4907"); + try { + + connection.execute("CREATE TABLE dbz4907 (id numeric(9,0) primary key, state varchar2(50))"); + connection.execute("INSERT INTO dbz4907 (id,state) values (1, 'snapshot')"); + TestHelper.streamTable(connection, "dbz4907"); + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4907") + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + SourceRecords records = consumeRecordsByTopic(1); + List table = records.recordsForTopic("server1.DEBEZIUM.DBZ4907"); + assertThat(table).hasSize(1); + + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + // There should at least be a commit by the flush policy that triggers the advancement + // of the SCN values in the offsets within a few seconds of the polling mechanism. + final String offsetScn = getStreamingMetric("OffsetScn"); + final String committedScn = getStreamingMetric("CommittedScn"); + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> { + final String newOffsetScn = getStreamingMetric("OffsetScn"); + final String newCommittedScn = getStreamingMetric("CommittedScn"); + return !newOffsetScn.equals(offsetScn) && !newCommittedScn.equals(committedScn); + }); + } + finally { + TestHelper.dropTable(connection, "dbz4907"); + } + } + private void waitForCurrentScnToHaveBeenSeenByConnector() throws SQLException { try (OracleConnection admin = TestHelper.adminConnection()) { admin.resetSessionToCdb(); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilderTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilderTest.java index 3fe60724e..c6ad0b894 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilderTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilderTest.java @@ -62,14 +62,15 @@ public class LogMinerQueryBuilderTest { private static final String LOG_MINER_CONTENT_QUERY_TEMPLATE1 = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, " + "XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID " + "FROM V$LOGMNR_CONTENTS WHERE SCN > ? AND SCN <= ? " + - "AND SRC_CON_NAME = '" + TestHelper.DATABASE + "' " + "${systemTablePredicate}" + "AND ((" + "OPERATION_CODE IN (6,7,34,36) OR " + "(OPERATION_CODE = 5 AND USERNAME NOT IN ('SYS','SYSTEM') " + "AND INFO NOT LIKE 'INTERNAL DDL%' " + + "AND SRC_CON_NAME = '" + TestHelper.DATABASE + "' " + "AND (TABLE_NAME IS NULL OR TABLE_NAME NOT LIKE 'ORA_TEMP_%')) ) " + "OR (OPERATION_CODE IN ${operationCodes} " + + "AND SRC_CON_NAME = '" + TestHelper.DATABASE + "' " + "AND TABLE_NAME != '" + LogWriterFlushStrategy.LOGMNR_FLUSH_TABLE + "' " + "${schemaPredicate}" + "${tablePredicate}" + @@ -82,11 +83,12 @@ public class LogMinerQueryBuilderTest { private static final String LOG_MINER_CONTENT_QUERY_TEMPLATE2 = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, " + "XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID " + "FROM V$LOGMNR_CONTENTS WHERE SCN > ? AND SCN <= ? " + - "AND SRC_CON_NAME = '" + TestHelper.DATABASE + "' " + "${systemTablePredicate}" + "AND ((" + "OPERATION_CODE IN (6,7,34,36)) OR " + - "((OPERATION_CODE IN ${operationCodes} OR " + + "(" + + "SRC_CON_NAME = '" + TestHelper.DATABASE + "' AND " + + "(OPERATION_CODE IN ${operationCodes} OR " + "(OPERATION_CODE = 5 AND USERNAME NOT IN ('SYS','SYSTEM') " + "AND INFO NOT LIKE 'INTERNAL DDL%' " + "AND (TABLE_NAME IS NULL OR TABLE_NAME NOT LIKE 'ORA_TEMP_%'))) " +