DBZ-6679 Only advance LogMiner start position when query has data

This commit is contained in:
Chris Cranford 2024-02-18 02:41:50 -05:00 committed by Jiri Pechanec
parent 1632646525
commit 340234c2ac
2 changed files with 65 additions and 2 deletions

View File

@ -30,6 +30,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.annotation.VisibleForTesting;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnection.NonRelationalTableException;
import io.debezium.connector.oracle.OracleConnectorConfig;
@ -262,10 +263,17 @@ public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedExc
}
metrics.setLastProcessedRowsCount(counters.rows);
if (counters.rows == 0) {
// When no rows are processed, don't advance the SCN
return startScn;
}
else {
return calculateNewStartScn(endScn, offsetContext.getCommitScn().getMaxCommittedScn());
}
}
}
}
/**
* Get the LogMiner query that will be used to fetch results.
@ -1260,7 +1268,8 @@ private Table dispatchSchemaChangeEventAndGetTableForNewCapturedTable(TableId ta
* @throws SQLException if an exception occurred obtaining the DDL statement
* @throws NonRelationalTableException if the table is not a relational table
*/
private String getTableMetadataDdl(TableId tableId) throws SQLException, NonRelationalTableException {
@VisibleForTesting
public String getTableMetadataDdl(TableId tableId) throws SQLException, NonRelationalTableException {
counters.tableMetadataCount++;
LOGGER.info("Getting database metadata for table '{}'", tableId);
// A separate connection must be used for this out-of-bands query while processing LogMiner results.

View File

@ -90,6 +90,7 @@ public void before() throws Exception {
this.offsetContext = Mockito.mock(OracleOffsetContext.class);
final CommitScn commitScn = CommitScn.valueOf((String) null);
Mockito.when(this.offsetContext.getCommitScn()).thenReturn(commitScn);
Mockito.when(this.offsetContext.getSnapshotScn()).thenReturn(Scn.valueOf("1"));
this.connection = createOracleConnection(false);
this.schema = createOracleDatabaseSchema();
this.metrics = createMetrics(schema);
@ -235,6 +236,58 @@ public void testCalculateScnWhenSecondTransactionIsCommitted() throws Exception
}
}
@Test
@FixFor("DBZ-6679")
public void testEmptyResultSetWithMineRangeAdvancesCorrectly() throws Exception {
if (!isTransactionAbandonmentSupported()) {
return;
}
final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build());
try (T processor = getProcessor(config)) {
final ResultSet rs = Mockito.mock(ResultSet.class);
Mockito.when(rs.next()).thenReturn(false);
final PreparedStatement ps = Mockito.mock(PreparedStatement.class);
Mockito.when(processor.createQueryStatement()).thenReturn(ps);
Mockito.when(ps.executeQuery()).thenReturn(rs);
Scn nextStartScn = processor.process(Scn.valueOf(100), Scn.valueOf(200));
assertThat(nextStartScn).isEqualTo(Scn.valueOf(100));
}
}
@Test
@FixFor("DBZ-6679")
public void testNonEmptyResultSetWithMineRangeAdvancesCorrectly() throws Exception {
if (!isTransactionAbandonmentSupported()) {
return;
}
final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build());
try (T processor = getProcessor(config)) {
final ResultSet rs = Mockito.mock(ResultSet.class);
Mockito.when(rs.next()).thenReturn(true, false);
Mockito.when(rs.getString(1)).thenReturn("101");
Mockito.when(rs.getString(2)).thenReturn("insert into \"DEBEZIUM\".\"ABC\"(\"ID\",\"DATA\") values ('1','test');");
Mockito.when(rs.getInt(3)).thenReturn(EventType.INSERT.getValue());
Mockito.when(rs.getString(7)).thenReturn("ABC");
Mockito.when(rs.getString(8)).thenReturn("DEBEZIUM");
final PreparedStatement ps = Mockito.mock(PreparedStatement.class);
Mockito.when(processor.createQueryStatement()).thenReturn(ps);
Mockito.when(ps.executeQuery()).thenReturn(rs);
T processorMock = Mockito.spy(processor);
Mockito.doReturn("CREATE TABLE DEBEZIUM.ABC (ID primary key(9,0), data varchar2(50))")
.when(processorMock)
.getTableMetadataDdl(Mockito.any(TableId.class));
Scn nextStartScn = processorMock.process(Scn.valueOf(100), Scn.valueOf(200));
assertThat(nextStartScn).isEqualTo(Scn.valueOf(101));
}
}
@Test
public void testAbandonOneTransaction() throws Exception {
if (!isTransactionAbandonmentSupported()) {
@ -351,6 +404,7 @@ private OracleConnection createOracleConnection(boolean singleOptionalValueThrow
OracleConnection connection = Mockito.mock(OracleConnection.class);
Mockito.when(connection.connection(Mockito.anyBoolean())).thenReturn(conn);
Mockito.when(connection.connection()).thenReturn(conn);
if (!singleOptionalValueThrowException) {
Mockito.when(connection.singleOptionalValue(anyString(), any())).thenReturn(BigInteger.TWO);
}