DBZ-6355 fix unit tests errors
This commit is contained in:
parent
cac5485176
commit
fa42a1b0ce
@ -177,8 +177,8 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
|
||||
.withType(Type.DOUBLE)
|
||||
.withWidth(Width.SHORT)
|
||||
.withImportance(Importance.MEDIUM)
|
||||
.withDefault(0)
|
||||
.withValidation(Field::isNonNegativeInteger)
|
||||
.withDefault(0.0)
|
||||
.withValidation(Field::isNonNegativeDouble)
|
||||
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 18))
|
||||
.withDescription("Hours to keep long running transactions in transaction buffer between log mining " +
|
||||
"sessions. By default, all transactions are retained.");
|
||||
|
@ -201,6 +201,10 @@ public void testTransactionRetention() throws Exception {
|
||||
|
||||
config = Configuration.create().with(transactionRetentionField, -1).build();
|
||||
assertThat(config.validateAndRecord(Collections.singletonList(transactionRetentionField), LOGGER::error)).isFalse();
|
||||
|
||||
config = Configuration.create().with(transactionRetentionField, 0.25).build();
|
||||
connectorConfig = new OracleConnectorConfig(config);
|
||||
assertThat(connectorConfig.getLogMiningTransactionRetention()).isEqualTo(Duration.ofMinutes(15));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -203,4 +203,49 @@ public void shouldResumeLongRunningTransactionFromPersistedState() throws Except
|
||||
assertThat(after.get("ID")).isEqualTo(4);
|
||||
assertThat(after.get("NAME")).isEqualTo("Roger Rabbit");
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-6355")
|
||||
public void testBacklogTransactionShouldNotBeAbandon() throws Exception {
|
||||
if (!hasPersistedState()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Start thbacke connector using the specified buffer & not to drop the buffer across restarts.
|
||||
// The testing framework automatically specifies this as true, so we need to override it.
|
||||
Configuration config = getBufferImplementationConfig()
|
||||
.with(OracleConnectorConfig.LOG_MINING_TRANSACTION_RETENTION, 0.017) // 1 Minute retention
|
||||
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3752")
|
||||
.build();
|
||||
|
||||
// Insert one record.
|
||||
try (OracleConnection secondary = TestHelper.testConnection()) {
|
||||
secondary.execute("INSERT INTO dbz3752 (id,name) values (1, 'Gerald Jinx Mouse')");
|
||||
}
|
||||
|
||||
Thread.sleep(120000);
|
||||
|
||||
// Start connector and wait for streaming to begin
|
||||
start(OracleConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
|
||||
|
||||
// Get only record
|
||||
SourceRecords records = consumeRecordsByTopic(1);
|
||||
assertThat(records.allRecordsInOrder()).hasSize(1);
|
||||
List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ3752");
|
||||
assertThat(tableRecords).hasSize(1);
|
||||
|
||||
// Assert record state
|
||||
Struct after = ((Struct) tableRecords.get(0).value()).getStruct(Envelope.FieldName.AFTER);
|
||||
assertThat(after.get("ID")).isEqualTo(1);
|
||||
assertThat(after.get("NAME")).isEqualTo("Gerald Jinx Mouse");
|
||||
|
||||
// There should be no more records to consume.
|
||||
// The persisted state should contain the Thomas Jasper insert
|
||||
assertNoRecordsToConsume();
|
||||
|
||||
// Shutdown the connector
|
||||
stopConnector();
|
||||
}
|
||||
}
|
||||
|
@ -833,6 +833,16 @@ public Field withDefault(int defaultValue) {
|
||||
() -> defaultValue, validator, recommender, isRequired, group, allowedValues);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and return a new Field instance that is a copy of this field but with the given default value.
|
||||
* @param defaultValue the new default value for the new field
|
||||
* @return the new field; never null
|
||||
*/
|
||||
public Field withDefault(double defaultValue) {
|
||||
return new Field(name(), displayName(), type(), width, description(), importance(), dependents,
|
||||
() -> defaultValue, validator, recommender, isRequired, group, allowedValues);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and return a new Field instance that is a copy of this field but with the given default value.
|
||||
*
|
||||
@ -1267,6 +1277,22 @@ public static int isNonNegativeInteger(Configuration config, Field field, Valida
|
||||
return 1;
|
||||
}
|
||||
|
||||
public static int isNonNegativeDouble(Configuration config, Field field, ValidationOutput problems) {
|
||||
String value = config.getString(field);
|
||||
if (value == null) {
|
||||
return 0;
|
||||
}
|
||||
try {
|
||||
if (Double.parseDouble(value) >= 0) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
catch (Throwable e) {
|
||||
}
|
||||
problems.accept(field, value, "An non-negative double is expected");
|
||||
return 1;
|
||||
}
|
||||
|
||||
public static int isLong(Configuration config, Field field, ValidationOutput problems) {
|
||||
String value = config.getString(field);
|
||||
if (value == null) {
|
||||
|
Loading…
Reference in New Issue
Block a user