DBZ-7024 Make default RecordCommitter thread unsafe

Default implmentation of `RecordCommitter`, the `SourceRecordCommitter`,
is always created for each task and withing given task is called
sequentially, always in the same thread. There's no need to aquire locks
for each method call.

Make `SourceRecordCommitter` thread unsafe.
This commit is contained in:
Vojtech Juranek 2024-01-24 17:35:50 +01:00 committed by Jiri Pechanec
parent 998f00f811
commit 293b84645d

View File

@ -1200,6 +1200,7 @@ public void processRecords(final List<SourceRecord> records) throws Exception {
/**
* The default implementation of {@link DebeziumEngine.RecordCommitter}.
* The implementation is not thread safe and the user has to ensure it's used in thread safe manner.
*/
private static class SourceRecordCommitter implements DebeziumEngine.RecordCommitter<SourceRecord> {
@ -1221,15 +1222,14 @@ private static class SourceRecordCommitter implements DebeziumEngine.RecordCommi
}
@Override
// TODO do we need synchronized here? Possibly not.
public synchronized void markProcessed(SourceRecord record) throws InterruptedException {
public void markProcessed(SourceRecord record) throws InterruptedException {
task.commitRecord(record);
recordsSinceLastCommit += 1;
offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
}
@Override
public synchronized void markBatchFinished() throws InterruptedException {
public void markBatchFinished() throws InterruptedException {
final Duration durationSinceLastCommit = Duration.ofMillis(clock.currentTimeInMillis() - timeOfLastCommitMillis);
if (offsetCommitPolicy.performCommit(recordsSinceLastCommit, durationSinceLastCommit)) {
try {
@ -1245,7 +1245,7 @@ public synchronized void markBatchFinished() throws InterruptedException {
}
@Override
public synchronized void markProcessed(SourceRecord record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException {
public void markProcessed(SourceRecord record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException {
DebeziumEngineCommon.SourceRecordOffsets offsets = (DebeziumEngineCommon.SourceRecordOffsets) sourceOffsets;
SourceRecord recordWithUpdatedOffsets = new SourceRecord(record.sourcePartition(), offsets.getOffsets(), record.topic(),
record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(),