DBZ-4277 Clean-up variable usage, fix incrementation

This commit is contained in:
Chris Cranford 2021-11-16 09:50:18 -05:00 committed by Gunnar Morling
parent 9e846710a3
commit ceea7a7b5f

View File

@ -125,16 +125,15 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records,
op.setMaximumSizeInBytes(maxBatchSize);
}
for (int i = 0; i < records.size();) {
int added = 0;
int start = i;
for (int recordIndex = 0; recordIndex < records.size();) {
int start = recordIndex;
LOGGER.trace("Emitting events starting from index {}", start);
EventDataBatch batch = producer.createBatch(op);
// this loop adds as many records to the batch as possible
for (; i < records.size();) {
ChangeEvent<Object, Object> record = records.get(i);
for (; recordIndex < records.size(); recordIndex++) {
ChangeEvent<Object, Object> record = records.get(recordIndex);
LOGGER.trace("Received record '{}'", record.value());
if (null == record.value()) {
continue;
@ -150,18 +149,16 @@ else if (record.value() instanceof byte[]) {
try {
if (!batch.tryAdd(eventData)) {
if (added == 0) {
if (batch.getCount() == 0) {
// If we fail to add at least the very first event to the batch that is because
// the event's size exceeds the maxBatchSize in which case we cannot safely
// recover and dispatch the event, only option is to throw an exception.
throw new DebeziumException("Event data is too large to fit into batch");
}
// reached the maximum allowed size for the batch
LOGGER.trace("Maximum batch reached, dispatching {} events.", added);
LOGGER.trace("Maximum batch reached, dispatching {} events.", batch.getCount());
break;
}
added++;
i++;
}
catch (IllegalArgumentException e) {
// thrown by tryAdd if event data is null
@ -177,9 +174,10 @@ else if (record.value() instanceof byte[]) {
}
}
if (added > 0) {
final int batchEventSize = batch.getCount();
if (batchEventSize > 0) {
try {
LOGGER.trace("Sending batch of {} events to Event Hubs", added);
LOGGER.trace("Sending batch of {} events to Event Hubs", batchEventSize);
producer.send(batch);
LOGGER.trace("Sent record batch to Event Hubs");
}
@ -188,8 +186,8 @@ else if (record.value() instanceof byte[]) {
}
// this loop commits each record submitted in the event hubs batch
LOGGER.trace("Marking records at index {} to {} as processed", start, start + added);
for (int j = start; j < start + added; ++j) {
LOGGER.trace("Marking records at index {} to {} as processed", start, recordIndex);
for (int j = start; j < recordIndex; ++j) {
ChangeEvent<Object, Object> record = records.get(j);
try {
committer.markProcessed(record);