DBZ-4277 Multi-batch dispatch to EventHubs

This commit is contained in:
Chris Cranford 2021-11-15 19:05:33 -05:00 committed by Gunnar Morling
parent 4cd5b092e9
commit c57f793387

View File

@ -125,57 +125,74 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records,
op.setMaximumSizeInBytes(maxBatchSize);
}
EventDataBatch batch = producer.createBatch(op);
for (int i = 0; i < records.size();) {
int added = 0;
int start = i;
LOGGER.trace("Emitting events starting from index {}", start);
// this loop just adds records to the batch
for (ChangeEvent<Object, Object> record : records) {
LOGGER.trace("Received record '{}'", record.value());
if (null == record.value()) {
continue;
}
EventData eventData = null;
EventDataBatch batch = producer.createBatch(op);
if (record.value() instanceof String) {
eventData = new EventData((String) record.value());
}
else if (record.value() instanceof byte[]) {
eventData = new EventData(getBytes(record.value()));
}
try {
if (!batch.tryAdd(eventData)) {
throw new DebeziumException("Event data was too large to fit in the batch");
// this loop adds as many records to the batch as possible
for (; i < records.size();) {
ChangeEvent<Object, Object> record = records.get(i);
LOGGER.trace("Received record '{}'", record.value());
if (null == record.value()) {
continue;
}
EventData eventData = null;
if (record.value() instanceof String) {
eventData = new EventData((String) record.value());
}
else if (record.value() instanceof byte[]) {
eventData = new EventData(getBytes(record.value()));
}
try {
if (!batch.tryAdd(eventData)) {
// reached the maximum allowed size for the batch
LOGGER.trace("Maximum batch reached, dispatching {} events.", added);
break;
}
added++;
i++;
}
catch (IllegalArgumentException e) {
// thrown by tryAdd if event data is null
throw new DebeziumException(e);
}
catch (AmqpException e) {
// tryAdd throws AmqpException if "eventData is larger than the maximum size of
// the EventDataBatch."
throw new DebeziumException("Event data was larger than the maximum size of the batch", e);
}
catch (Exception e) {
throw new DebeziumException(e);
}
}
catch (IllegalArgumentException e) {
// thrown by tryAdd if event data is null
throw new DebeziumException(e);
}
catch (AmqpException e) {
// tryAdd throws AmqpException if "eventData is larger than the maximum size of
// the EventDataBatch."
throw new DebeziumException("Event data was larger than the maximum size of the batch", e);
}
catch (Exception e) {
throw new DebeziumException(e);
}
}
try {
producer.send(batch);
LOGGER.trace("Sent record batch to Event Hubs");
}
catch (Exception e) {
throw new DebeziumException(e);
}
if (added > 0) {
try {
LOGGER.trace("Sending batch of {} events to Event Hubs", added);
producer.send(batch);
LOGGER.trace("Sent record batch to Event Hubs");
}
catch (Exception e) {
throw new DebeziumException(e);
}
// this loop commits each record
for (ChangeEvent<Object, Object> record : records) {
try {
committer.markProcessed(record);
LOGGER.trace("Record marked processed");
}
catch (Exception e) {
throw new DebeziumException(e);
// this loop commits each record submitted in the event hubs batch
LOGGER.trace("Marking records at index {} to {} as processed", start, start + added);
for (int j = start; j < start + added; ++j) {
ChangeEvent<Object, Object> record = records.get(j);
try {
committer.markProcessed(record);
LOGGER.trace("Record marked processed");
}
catch (Exception e) {
throw new DebeziumException(e);
}
}
}
}