DBZ-7777 Avoid using List#get() in ParallelSmtConsumerProcessor

Before the change:

    Benchmark                                        (processingOrder)  (recordCount)  (threadCount)  Mode  Cnt   Score   Error  Units
    DebeziumEnginePerf.processRecordsAsyncEngine               ORDERED         100000              1    ss       88.006           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine               ORDERED         100000              2    ss       90.226           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine               ORDERED         100000              4    ss       83.129           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine               ORDERED         100000              8    ss       81.417           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine               ORDERED         100000             16    ss       90.936           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine             UNORDERED         100000              1    ss       83.645           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine             UNORDERED         100000              2    ss       77.527           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine             UNORDERED         100000              4    ss       88.495           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine             UNORDERED         100000              8    ss       79.590           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine             UNORDERED         100000             16    ss       90.246           s/op
    DebeziumEnginePerf.processRecordsEmbeddedEngine                N/A         100000            N/A    ss        0.125           s/op

After the change:

    Benchmark                                        (processingOrder)  (recordCount)  (threadCount)  Mode  Cnt  Score   Error  Units
    DebeziumEnginePerf.processRecordsAsyncEngine               ORDERED         100000              1    ss       0.514           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine               ORDERED         100000              2    ss       0.391           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine               ORDERED         100000              4    ss       0.467           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine               ORDERED         100000              8    ss       0.393           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine               ORDERED         100000             16    ss       0.462           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine             UNORDERED         100000              1    ss       0.410           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine             UNORDERED         100000              2    ss       0.431           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine             UNORDERED         100000              4    ss       0.460           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine             UNORDERED         100000              8    ss       0.419           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine             UNORDERED         100000             16    ss       0.448           s/op
    DebeziumEnginePerf.processRecordsEmbeddedEngine                N/A         100000            N/A    ss       0.230           s/op
This commit is contained in:
Vojtech Juranek 2024-04-18 15:21:26 +02:00 committed by Jiri Pechanec
parent 8f0d5a8e66
commit c4a54962ad
2 changed files with 21 additions and 12 deletions

View File

@ -5,7 +5,6 @@
*/
package io.debezium.embedded.async;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.Consumer;
@ -37,13 +36,18 @@ public class ParallelSmtAsyncConsumerProcessor extends AbstractRecordProcessor<S
@Override
public void processRecords(final List<SourceRecord> records) throws Exception {
LOGGER.debug("Thread {} is submitting {} records for processing.", Thread.currentThread().getName(), records.size());
final List<Future<Void>> recordFutures = new ArrayList<>(records.size());
records.stream().forEachOrdered(r -> recordFutures.add(recordService.submit(new ProcessingCallables.TransformAndConsumeRecord(r, transformations, consumer))));
final Future<Void>[] recordFutures = new Future[records.size()];
int i = 0;
for (SourceRecord r : records) {
recordFutures[i] = recordService.submit(new ProcessingCallables.TransformAndConsumeRecord(r, transformations, consumer));
i++;
}
LOGGER.trace("Waiting for the batch to finish processing.");
for (int i = 0; i < records.size(); i++) {
recordFutures.get(i);
committer.markProcessed(records.get(i));
i = 0;
for (SourceRecord r : records) {
recordFutures[i].get();
committer.markProcessed(r);
}
LOGGER.trace("Marking batch as finished.");

View File

@ -5,7 +5,6 @@
*/
package io.debezium.embedded.async;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.Consumer;
@ -37,13 +36,19 @@ public class ParallelSmtConsumerProcessor extends AbstractRecordProcessor<Source
@Override
public void processRecords(final List<SourceRecord> records) throws Exception {
LOGGER.debug("Thread {} is submitting {} records for processing.", Thread.currentThread().getName(), records.size());
final List<Future<SourceRecord>> recordFutures = new ArrayList<>(records.size());
records.stream().forEachOrdered(r -> recordFutures.add(recordService.submit(new ProcessingCallables.TransformRecord(r, transformations))));
final Future<SourceRecord>[] recordFutures = new Future[records.size()];
int i = 0;
for (SourceRecord r : records) {
recordFutures[i] = recordService.submit(new ProcessingCallables.TransformRecord(r, transformations));
i++;
}
LOGGER.trace("Calling user consumer.");
for (int i = 0; i < records.size(); i++) {
consumer.accept(recordFutures.get(i).get());
committer.markProcessed(records.get(i));
i = 0;
for (SourceRecord r : records) {
consumer.accept(recordFutures[i].get());
committer.markProcessed(r);
i++;
}
LOGGER.trace("Marking batch as finished.");