DBZ-6496 Fix performance issue on testNotification test with Oracle and MS SQL Server connectors

This commit is contained in:
mfvitale 2023-05-23 15:42:43 +02:00 committed by Jiri Pechanec
parent 7ba9f57035
commit e0659f1a0e
3 changed files with 18 additions and 3 deletions

View File

@ -175,6 +175,11 @@ protected String alterTableAddColumnStatement(String tableName) {
return "ALTER TABLE " + tableName + " ADD col3 INTEGER DEFAULT 0";
}
@Override
protected int defaultIncrementalSnapshotChunkSize() {
return 250;
}
@Test
public void snapshotPreceededBySchemaChange() throws Exception {
// TODO: remove once https://github.com/Apicurio/apicurio-registry/issues/2980 is fixed

View File

@ -126,4 +126,9 @@ protected Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCaptur
protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception {
TestHelper.waitForCdcTransactionPropagation(connection, TestHelper.TEST_DATABASE_1, expectedTransactions);
}
@Override
protected int defaultIncrementalSnapshotChunkSize() {
return 250;
}
}

View File

@ -1095,9 +1095,10 @@ public void snapshotWithAdditionalConditionWithSurrogateKey() throws Exception {
@Test
public void testNotification() throws Exception {
populateTable();
startConnector(x -> x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1)
.with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink")
startConnector(x -> x.with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink")
.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, defaultIncrementalSnapshotChunkSize())
.with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification"), loggingCompletion(), false);
waitForConnectorToStart();
waitForAvailableRecords(1, TimeUnit.SECONDS);
@ -1147,6 +1148,10 @@ public void testNotification() throws Exception {
assertCorrectIncrementalSnapshotNotification(notifications);
}
protected int defaultIncrementalSnapshotChunkSize() {
return 1;
}
private static BiPredicate<Integer, SourceRecord> incrementalSnapshotCompleted() {
return (recordsConsumed, record) -> record.topic().equals("io.debezium.notification") &&
((Struct) record.value()).getString("type").equals("COMPLETED");
@ -1172,7 +1177,7 @@ private void assertCorrectIncrementalSnapshotNotification(List<SourceRecord> not
assertThat(inProgress.getMap("additional_data"))
.containsEntry("current_collection_in_progress", tableDataCollectionId())
.containsEntry("maximum_key", "1000")
.containsEntry("last_processed_key", "1");
.containsEntry("last_processed_key", String.valueOf(defaultIncrementalSnapshotChunkSize()));
Struct completed = incrementalSnapshotNotification.stream().filter(s -> s.getString("type").equals("TABLE_SCAN_COMPLETED")).findFirst().get();
assertThat(completed.getMap("additional_data"))