diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java index 0e257634c..80568bfee 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java @@ -250,20 +250,31 @@ protected String getSignalTypeFieldName() { } protected void sendAdHocSnapshotSignal(String... dataCollectionIds) throws SQLException { - sendAdHocSnapshotSignalWithAdditionalCondition(Optional.empty(), dataCollectionIds); + sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), dataCollectionIds); } - protected void sendAdHocSnapshotSignalWithAdditionalCondition(Optional additionalCondition, String... dataCollectionIds) { + protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional additionalCondition, Optional surrogateKey, + String... dataCollectionIds) { final String dataCollectionIdsList = Arrays.stream(dataCollectionIds) .map(x -> '"' + x + '"') .collect(Collectors.joining(", ")); try (JdbcConnection connection = databaseConnection()) { String query; - if (additionalCondition.isPresent()) { + if (additionalCondition.isPresent() && surrogateKey.isPresent()) { + query = String.format( + "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [%s], \"additional-condition\": %s, \"surrogate-key\": %s}')", + signalTableName(), dataCollectionIdsList, additionalCondition.get(), surrogateKey.get()); + } + else if (additionalCondition.isPresent()) { query = String.format( "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [%s], \"additional-condition\": %s}')", signalTableName(), dataCollectionIdsList, additionalCondition.get()); } + else if (surrogateKey.isPresent()) { + query = String.format( + "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [%s], \"surrogate-key\": %s}')", + signalTableName(), dataCollectionIdsList, surrogateKey.get()); + } else { query = String.format( "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [%s]}')", @@ -902,7 +913,7 @@ public void snapshotWithAdditionalCondition() throws Exception { // there shouldn't be any snapshot records assertNoRecordsToConsume(); - sendAdHocSnapshotSignalWithAdditionalCondition(Optional.of(String.format("\"aa = %s\"", expectedValue)), + sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.of(String.format("\"aa = %s\"", expectedValue)), Optional.empty(), tableDataCollectionId()); final Map dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedCount, @@ -921,7 +932,7 @@ public void shouldExecuteRegularSnapshotWhenAdditionalConditionEmpty() throws Ex final int recordsCount = ROW_COUNT; - sendAdHocSnapshotSignalWithAdditionalCondition(Optional.of("\"\""), tableDataCollectionId()); + sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.of("\"\""), Optional.empty(), tableDataCollectionId()); final Map dbChanges = consumeRecordsMixedWithIncrementalSnapshot(recordsCount, x -> true, null); @@ -944,7 +955,7 @@ public void snapshotWithAdditionalConditionWithRestart() throws Exception { // there shouldn't be any snapshot records assertNoRecordsToConsume(); - sendAdHocSnapshotSignalWithAdditionalCondition(Optional.of(String.format("\"aa = %s\"", expectedValue)), + sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.of(String.format("\"aa = %s\"", expectedValue)), Optional.empty(), tableDataCollectionId()); final AtomicInteger recordCounter = new AtomicInteger(); @@ -964,6 +975,48 @@ public void snapshotWithAdditionalConditionWithRestart() throws Exception { assertTrue(dbChanges.values().stream().allMatch(v -> v.equals(expectedValue))); } + @Test + public void snapshotWithSurrogateKey() throws Exception { + // Testing.Print.enable(); + + populateTable(); + startConnector(); + + sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.of("\"pk\""), tableDataCollectionId()); + + final int expectedRecordCount = ROW_COUNT; + final Map dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount); + for (int i = 0; i < expectedRecordCount; i++) { + assertThat(dbChanges).contains(entry(i + 1, i)); + } + } + + @Test + public void snapshotWithAdditionalConditionWithSurrogateKey() throws Exception { + // Testing.Print.enable(); + + int expectedCount = 10, expectedValue = 12345678; + populateTable(); + populateTableWithSpecificValue(2000, expectedCount, expectedValue); + waitForCdcTransactionPropagation(3); + final Configuration config = config().build(); + startAndConsumeTillEnd(connectorClass(), config); + waitForConnectorToStart(); + + waitForAvailableRecords(1, TimeUnit.SECONDS); + // there shouldn't be any snapshot records + assertNoRecordsToConsume(); + + sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.of(String.format("\"aa = %s\"", expectedValue)), Optional.of("\"pk\""), + tableDataCollectionId()); + + final Map dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedCount, + x -> true, null); + assertEquals(expectedCount, dbChanges.size()); + assertTrue(dbChanges.values().stream().allMatch(v -> (((Struct) v.value()).getStruct("after") + .getInt32(valueFieldName())).equals(expectedValue))); + } + protected void sendAdHocSnapshotSignalAndWait(String... collectionIds) throws Exception { // Sends the adhoc snapshot signal and waits for the signal event to have been received if (collectionIds.length == 0) {