DBZ-2124 Do not call markBatchFinished on failure
This commit is contained in:
parent
966b5ce7f0
commit
6b5ac2494b
@ -459,23 +459,19 @@ private static ChangeConsumer buildDefaultChangeConsumer(Consumer<SourceRecord>
|
||||
*/
|
||||
@Override
|
||||
public void handleBatch(List<SourceRecord> records, DebeziumEngine.RecordCommitter<SourceRecord> committer) throws InterruptedException {
|
||||
try {
|
||||
for (SourceRecord record : records) {
|
||||
try {
|
||||
consumer.accept(record);
|
||||
committer.markProcessed(record);
|
||||
}
|
||||
catch (StopConnectorException | StopEngineException ex) {
|
||||
// ensure that we mark the record as finished
|
||||
// in this case
|
||||
committer.markProcessed(record);
|
||||
throw ex;
|
||||
}
|
||||
for (SourceRecord record : records) {
|
||||
try {
|
||||
consumer.accept(record);
|
||||
committer.markProcessed(record);
|
||||
}
|
||||
catch (StopConnectorException | StopEngineException ex) {
|
||||
// ensure that we mark the record as finished
|
||||
// in this case
|
||||
committer.markProcessed(record);
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
finally {
|
||||
committer.markBatchFinished();
|
||||
}
|
||||
committer.markBatchFinished();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user