DBZ-6023 Add integration tests

This commit is contained in:
PlugaruT 2023-02-09 19:55:40 +02:00 committed by Jiri Pechanec
parent 7d0492fee5
commit 3e05965d24

View File

@ -250,20 +250,31 @@ protected String getSignalTypeFieldName() {
}
protected void sendAdHocSnapshotSignal(String... dataCollectionIds) throws SQLException {
sendAdHocSnapshotSignalWithAdditionalCondition(Optional.empty(), dataCollectionIds);
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), dataCollectionIds);
}
protected void sendAdHocSnapshotSignalWithAdditionalCondition(Optional<String> additionalCondition, String... dataCollectionIds) {
protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional<String> additionalCondition, Optional<String> surrogateKey,
String... dataCollectionIds) {
final String dataCollectionIdsList = Arrays.stream(dataCollectionIds)
.map(x -> '"' + x + '"')
.collect(Collectors.joining(", "));
try (JdbcConnection connection = databaseConnection()) {
String query;
if (additionalCondition.isPresent()) {
if (additionalCondition.isPresent() && surrogateKey.isPresent()) {
query = String.format(
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [%s], \"additional-condition\": %s, \"surrogate-key\": %s}')",
signalTableName(), dataCollectionIdsList, additionalCondition.get(), surrogateKey.get());
}
else if (additionalCondition.isPresent()) {
query = String.format(
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [%s], \"additional-condition\": %s}')",
signalTableName(), dataCollectionIdsList, additionalCondition.get());
}
else if (surrogateKey.isPresent()) {
query = String.format(
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [%s], \"surrogate-key\": %s}')",
signalTableName(), dataCollectionIdsList, surrogateKey.get());
}
else {
query = String.format(
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [%s]}')",
@ -902,7 +913,7 @@ public void snapshotWithAdditionalCondition() throws Exception {
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
sendAdHocSnapshotSignalWithAdditionalCondition(Optional.of(String.format("\"aa = %s\"", expectedValue)),
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.of(String.format("\"aa = %s\"", expectedValue)), Optional.empty(),
tableDataCollectionId());
final Map<Integer, SourceRecord> dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedCount,
@ -921,7 +932,7 @@ public void shouldExecuteRegularSnapshotWhenAdditionalConditionEmpty() throws Ex
final int recordsCount = ROW_COUNT;
sendAdHocSnapshotSignalWithAdditionalCondition(Optional.of("\"\""), tableDataCollectionId());
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.of("\"\""), Optional.empty(), tableDataCollectionId());
final Map<Integer, SourceRecord> dbChanges = consumeRecordsMixedWithIncrementalSnapshot(recordsCount,
x -> true, null);
@ -944,7 +955,7 @@ public void snapshotWithAdditionalConditionWithRestart() throws Exception {
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
sendAdHocSnapshotSignalWithAdditionalCondition(Optional.of(String.format("\"aa = %s\"", expectedValue)),
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.of(String.format("\"aa = %s\"", expectedValue)), Optional.empty(),
tableDataCollectionId());
final AtomicInteger recordCounter = new AtomicInteger();
@ -964,6 +975,48 @@ public void snapshotWithAdditionalConditionWithRestart() throws Exception {
assertTrue(dbChanges.values().stream().allMatch(v -> v.equals(expectedValue)));
}
@Test
public void snapshotWithSurrogateKey() throws Exception {
// Testing.Print.enable();
populateTable();
startConnector();
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.of("\"pk\""), tableDataCollectionId());
final int expectedRecordCount = ROW_COUNT;
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
for (int i = 0; i < expectedRecordCount; i++) {
assertThat(dbChanges).contains(entry(i + 1, i));
}
}
@Test
public void snapshotWithAdditionalConditionWithSurrogateKey() throws Exception {
// Testing.Print.enable();
int expectedCount = 10, expectedValue = 12345678;
populateTable();
populateTableWithSpecificValue(2000, expectedCount, expectedValue);
waitForCdcTransactionPropagation(3);
final Configuration config = config().build();
startAndConsumeTillEnd(connectorClass(), config);
waitForConnectorToStart();
waitForAvailableRecords(1, TimeUnit.SECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.of(String.format("\"aa = %s\"", expectedValue)), Optional.of("\"pk\""),
tableDataCollectionId());
final Map<Integer, SourceRecord> dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedCount,
x -> true, null);
assertEquals(expectedCount, dbChanges.size());
assertTrue(dbChanges.values().stream().allMatch(v -> (((Struct) v.value()).getStruct("after")
.getInt32(valueFieldName())).equals(expectedValue)));
}
protected void sendAdHocSnapshotSignalAndWait(String... collectionIds) throws Exception {
// Sends the adhoc snapshot signal and waits for the signal event to have been received
if (collectionIds.length == 0) {