DBZ-4907 Increment Commit SCN when any COMMIT is detected

This commit is contained in:
Chris Cranford 2022-03-30 12:31:35 -04:00 committed by Chris Cranford
parent 1355067ef0
commit 13a96549bc
3 changed files with 81 additions and 11 deletions

View File

@ -60,10 +60,19 @@ public static String build(OracleConnectorConfig connectorConfig, OracleDatabase
// These bind parameters will be bound when the query is executed by the caller.
query.append("WHERE SCN > ? AND SCN <= ? ");
// Restrict to configured PDB if one is supplied
// The connector currently requires a "database.pdb.name" configuration property when using CDB mode.
// If this property is provided, build a predicate that will be used in later predicates.
final String pdbName = connectorConfig.getPdbName();
final String pdbPredicate;
if (!Strings.isNullOrEmpty(pdbName)) {
query.append("AND ").append("SRC_CON_NAME = '").append(pdbName.toUpperCase()).append("' ");
// This predicate is used later to explicitly restrict certain OPERATION_CODE and DDL events by the
// PDB database name while allowing all START, COMMIT, MISSING_SCN, and ROLLBACK operations
// regardless of where they originate, i.e. the PDB or CDB$ROOT.
pdbPredicate = "SRC_CON_NAME = '" + pdbName + "'";
}
else {
// No PDB configuration provided, no PDB predicate is necessary.
pdbPredicate = null;
}
// Excluded schemas, if defined
@ -81,8 +90,9 @@ public static String build(OracleConnectorConfig connectorConfig, OracleDatabase
if (!schema.storeOnlyCapturedTables()) {
// In this mode, the connector will always be fed DDL operations for all tables even if they
// are not part of the inclusion/exclusion lists.
query.append(" OR ").append(buildDdlPredicate()).append(" ");
// are not part of the inclusion/exclusion lists. We will pass the PDB predicate here to then
// restrict DDL operations to only the PDB database if not null.
query.append(" OR ").append(buildDdlPredicate(pdbPredicate)).append(" ");
// Insert, Update, Delete, SelectLob, LobWrite, LobTrim, and LobErase
if (connectorConfig.isLobEnabled()) {
query.append(") OR (OPERATION_CODE IN (1,2,3,9,10,11,29) ");
@ -90,17 +100,30 @@ public static String build(OracleConnectorConfig connectorConfig, OracleDatabase
else {
query.append(") OR (OPERATION_CODE IN (1,2,3) ");
}
if (pdbPredicate != null) {
// Restrict Insert, Update, Delete, and optionally SelectLob, LobWrite, LobTrim, and LobErase by PDB
query.append("AND ").append(pdbPredicate).append(' ');
}
}
else {
query.append(") OR (");
if (pdbPredicate != null) {
// We specify the PDB predicate here because it applies to the OPERATION_CODE predicates but
// also the DDL predicate that is to follow later due to predicate groups, effectively
// restricting all DML operations and DDL changes to the PDB only.
query.append(pdbPredicate).append(" AND ");
}
// Insert, Update, Delete, SelectLob, LobWrite, LobTrim, and LobErase
if (connectorConfig.isLobEnabled()) {
query.append(") OR ((OPERATION_CODE IN (1,2,3,9,10,11,29) ");
query.append("(OPERATION_CODE IN (1,2,3,9,10,11,29) ");
}
else {
query.append(") OR ((OPERATION_CODE IN (1,2,3) ");
query.append("(OPERATION_CODE IN (1,2,3) ");
}
// In this mode, the connector will filter DDL operations based on the table inclusion/exclusion lists
query.append("OR ").append(buildDdlPredicate()).append(") ");
// We pass "null" to the DDL predicate because we will have added the predicate earlier as a part of
// the outer predicate group to also be applied to OPERATION_CODE
query.append("OR ").append(buildDdlPredicate(null)).append(") ");
}
// Always ignore the flush table
@ -124,13 +147,18 @@ public static String build(OracleConnectorConfig connectorConfig, OracleDatabase
/**
* Builds a common SQL fragment used to obtain DDL operations via LogMiner.
*
* @param pdbPredicate pluggable database predicate, maybe {@code null}
* @return predicate that can be used to obtain DDL operations via LogMiner
*/
private static String buildDdlPredicate() {
private static String buildDdlPredicate(String pdbPredicate) {
final StringBuilder predicate = new StringBuilder(256);
predicate.append("(OPERATION_CODE = 5 ");
predicate.append("AND USERNAME NOT IN ('SYS','SYSTEM') ");
predicate.append("AND INFO NOT LIKE 'INTERNAL DDL%' ");
if (pdbPredicate != null) {
// DDL changes should be restricted to only the PDB database if supplied
predicate.append("AND ").append(pdbPredicate).append(' ');
}
predicate.append("AND (TABLE_NAME IS NULL OR TABLE_NAME NOT LIKE 'ORA_TEMP_%'))");
return predicate.toString();
}

View File

@ -3292,6 +3292,46 @@ public void shouldCaptureChangeForTableWithUnsupportedColumnTypeLong() throws Ex
}
}
@Test
@FixFor("DBZ-4907")
public void shouldContinueToUpdateOffsetsEvenWhenTableIsNotChanged() throws Exception {
TestHelper.dropTable(connection, "dbz4907");
try {
connection.execute("CREATE TABLE dbz4907 (id numeric(9,0) primary key, state varchar2(50))");
connection.execute("INSERT INTO dbz4907 (id,state) values (1, 'snapshot')");
TestHelper.streamTable(connection, "dbz4907");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4907")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> table = records.recordsForTopic("server1.DEBEZIUM.DBZ4907");
assertThat(table).hasSize(1);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// There should at least be a commit by the flush policy that triggers the advancement
// of the SCN values in the offsets within a few seconds of the polling mechanism.
final String offsetScn = getStreamingMetric("OffsetScn");
final String committedScn = getStreamingMetric("CommittedScn");
Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
final String newOffsetScn = getStreamingMetric("OffsetScn");
final String newCommittedScn = getStreamingMetric("CommittedScn");
return !newOffsetScn.equals(offsetScn) && !newCommittedScn.equals(committedScn);
});
}
finally {
TestHelper.dropTable(connection, "dbz4907");
}
}
private void waitForCurrentScnToHaveBeenSeenByConnector() throws SQLException {
try (OracleConnection admin = TestHelper.adminConnection()) {
admin.resetSessionToCdb();

View File

@ -62,14 +62,15 @@ public class LogMinerQueryBuilderTest {
private static final String LOG_MINER_CONTENT_QUERY_TEMPLATE1 = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, " +
"XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID " +
"FROM V$LOGMNR_CONTENTS WHERE SCN > ? AND SCN <= ? " +
"AND SRC_CON_NAME = '" + TestHelper.DATABASE + "' " +
"${systemTablePredicate}" +
"AND ((" +
"OPERATION_CODE IN (6,7,34,36) OR " +
"(OPERATION_CODE = 5 AND USERNAME NOT IN ('SYS','SYSTEM') " +
"AND INFO NOT LIKE 'INTERNAL DDL%' " +
"AND SRC_CON_NAME = '" + TestHelper.DATABASE + "' " +
"AND (TABLE_NAME IS NULL OR TABLE_NAME NOT LIKE 'ORA_TEMP_%')) ) " +
"OR (OPERATION_CODE IN ${operationCodes} " +
"AND SRC_CON_NAME = '" + TestHelper.DATABASE + "' " +
"AND TABLE_NAME != '" + LogWriterFlushStrategy.LOGMNR_FLUSH_TABLE + "' " +
"${schemaPredicate}" +
"${tablePredicate}" +
@ -82,11 +83,12 @@ public class LogMinerQueryBuilderTest {
private static final String LOG_MINER_CONTENT_QUERY_TEMPLATE2 = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, " +
"XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID " +
"FROM V$LOGMNR_CONTENTS WHERE SCN > ? AND SCN <= ? " +
"AND SRC_CON_NAME = '" + TestHelper.DATABASE + "' " +
"${systemTablePredicate}" +
"AND ((" +
"OPERATION_CODE IN (6,7,34,36)) OR " +
"((OPERATION_CODE IN ${operationCodes} OR " +
"(" +
"SRC_CON_NAME = '" + TestHelper.DATABASE + "' AND " +
"(OPERATION_CODE IN ${operationCodes} OR " +
"(OPERATION_CODE = 5 AND USERNAME NOT IN ('SYS','SYSTEM') " +
"AND INFO NOT LIKE 'INTERNAL DDL%' " +
"AND (TABLE_NAME IS NULL OR TABLE_NAME NOT LIKE 'ORA_TEMP_%'))) " +