diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java index 3f4772344..7112528f4 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import io.debezium.DebeziumException; +import io.debezium.annotation.VisibleForTesting; import io.debezium.connector.oracle.OracleConnection; import io.debezium.connector.oracle.OracleConnection.NonRelationalTableException; import io.debezium.connector.oracle.OracleConnectorConfig; @@ -262,7 +263,14 @@ public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedExc } metrics.setLastProcessedRowsCount(counters.rows); - return calculateNewStartScn(endScn, offsetContext.getCommitScn().getMaxCommittedScn()); + + if (counters.rows == 0) { + // When no rows are processed, don't advance the SCN + return startScn; + } + else { + return calculateNewStartScn(endScn, offsetContext.getCommitScn().getMaxCommittedScn()); + } } } } @@ -1260,7 +1268,8 @@ private Table dispatchSchemaChangeEventAndGetTableForNewCapturedTable(TableId ta * @throws SQLException if an exception occurred obtaining the DDL statement * @throws NonRelationalTableException if the table is not a relational table */ - private String getTableMetadataDdl(TableId tableId) throws SQLException, NonRelationalTableException { + @VisibleForTesting + public String getTableMetadataDdl(TableId tableId) throws SQLException, NonRelationalTableException { counters.tableMetadataCount++; LOGGER.info("Getting database metadata for table '{}'", tableId); // A separate connection must be used for this out-of-bands query while processing LogMiner results. diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/processor/AbstractProcessorUnitTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/processor/AbstractProcessorUnitTest.java index d39f7b69c..814392026 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/processor/AbstractProcessorUnitTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/processor/AbstractProcessorUnitTest.java @@ -90,6 +90,7 @@ public void before() throws Exception { this.offsetContext = Mockito.mock(OracleOffsetContext.class); final CommitScn commitScn = CommitScn.valueOf((String) null); Mockito.when(this.offsetContext.getCommitScn()).thenReturn(commitScn); + Mockito.when(this.offsetContext.getSnapshotScn()).thenReturn(Scn.valueOf("1")); this.connection = createOracleConnection(false); this.schema = createOracleDatabaseSchema(); this.metrics = createMetrics(schema); @@ -235,6 +236,58 @@ public void testCalculateScnWhenSecondTransactionIsCommitted() throws Exception } } + @Test + @FixFor("DBZ-6679") + public void testEmptyResultSetWithMineRangeAdvancesCorrectly() throws Exception { + if (!isTransactionAbandonmentSupported()) { + return; + } + + final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); + try (T processor = getProcessor(config)) { + final ResultSet rs = Mockito.mock(ResultSet.class); + Mockito.when(rs.next()).thenReturn(false); + + final PreparedStatement ps = Mockito.mock(PreparedStatement.class); + Mockito.when(processor.createQueryStatement()).thenReturn(ps); + Mockito.when(ps.executeQuery()).thenReturn(rs); + + Scn nextStartScn = processor.process(Scn.valueOf(100), Scn.valueOf(200)); + assertThat(nextStartScn).isEqualTo(Scn.valueOf(100)); + } + } + + @Test + @FixFor("DBZ-6679") + public void testNonEmptyResultSetWithMineRangeAdvancesCorrectly() throws Exception { + if (!isTransactionAbandonmentSupported()) { + return; + } + + final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); + try (T processor = getProcessor(config)) { + final ResultSet rs = Mockito.mock(ResultSet.class); + Mockito.when(rs.next()).thenReturn(true, false); + Mockito.when(rs.getString(1)).thenReturn("101"); + Mockito.when(rs.getString(2)).thenReturn("insert into \"DEBEZIUM\".\"ABC\"(\"ID\",\"DATA\") values ('1','test');"); + Mockito.when(rs.getInt(3)).thenReturn(EventType.INSERT.getValue()); + Mockito.when(rs.getString(7)).thenReturn("ABC"); + Mockito.when(rs.getString(8)).thenReturn("DEBEZIUM"); + + final PreparedStatement ps = Mockito.mock(PreparedStatement.class); + Mockito.when(processor.createQueryStatement()).thenReturn(ps); + Mockito.when(ps.executeQuery()).thenReturn(rs); + + T processorMock = Mockito.spy(processor); + Mockito.doReturn("CREATE TABLE DEBEZIUM.ABC (ID primary key(9,0), data varchar2(50))") + .when(processorMock) + .getTableMetadataDdl(Mockito.any(TableId.class)); + + Scn nextStartScn = processorMock.process(Scn.valueOf(100), Scn.valueOf(200)); + assertThat(nextStartScn).isEqualTo(Scn.valueOf(101)); + } + } + @Test public void testAbandonOneTransaction() throws Exception { if (!isTransactionAbandonmentSupported()) { @@ -351,6 +404,7 @@ private OracleConnection createOracleConnection(boolean singleOptionalValueThrow OracleConnection connection = Mockito.mock(OracleConnection.class); Mockito.when(connection.connection(Mockito.anyBoolean())).thenReturn(conn); + Mockito.when(connection.connection()).thenReturn(conn); if (!singleOptionalValueThrowException) { Mockito.when(connection.singleOptionalValue(anyString(), any())).thenReturn(BigInteger.TWO); }