DBZ-2282 updated to use batching

This commit is contained in:
abhishek 2020-07-15 18:14:16 +05:30 committed by Gunnar Morling
parent c551f80855
commit 520cf62049

View File

@ -92,8 +92,7 @@ void connect() {
try { try {
producer = new EventHubClientBuilder().connectionString(finalConnectionString).buildProducerClient(); producer = new EventHubClientBuilder().connectionString(finalConnectionString).buildProducerClient();
} } catch (Exception e) {
catch (Exception e) {
throw new DebeziumException(e); throw new DebeziumException(e);
} }
@ -105,17 +104,15 @@ void close() {
try { try {
producer.close(); producer.close();
LOGGER.info("Closed Event Hubs producer client"); LOGGER.info("Closed Event Hubs producer client");
} } catch (Exception e) {
catch (Exception e) {
LOGGER.warn("Exception while closing Event Hubs producer: {}", e); LOGGER.warn("Exception while closing Event Hubs producer: {}", e);
} }
} }
@Override @Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, public void handleBatch(List<ChangeEvent<Object, Object>> records,
RecordCommitter<ChangeEvent<Object, Object>> committer) RecordCommitter<ChangeEvent<Object, Object>> committer) throws InterruptedException {
throws InterruptedException { LOGGER.trace("Event Hubs sink adapter processing change events");
LOGGER.trace("Processing change events...");
CreateBatchOptions op = new CreateBatchOptions().setPartitionId(partitionID); CreateBatchOptions op = new CreateBatchOptions().setPartitionId(partitionID);
if (partitionKey != "") { if (partitionKey != "") {
@ -127,6 +124,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records,
EventDataBatch batch = producer.createBatch(op); EventDataBatch batch = producer.createBatch(op);
// this loop just adds records to the batch
for (ChangeEvent<Object, Object> record : records) { for (ChangeEvent<Object, Object> record : records) {
LOGGER.trace("Received event '{}'", record); LOGGER.trace("Received event '{}'", record);
if (null == record.value()) { if (null == record.value()) {
@ -136,51 +134,40 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records,
if (record.value() instanceof String) { if (record.value() instanceof String) {
eventData = new EventData((String) record.value()); eventData = new EventData((String) record.value());
} } else if (record.value() instanceof byte[]) {
else if (record.value() instanceof byte[]) {
eventData = new EventData(getBytes(record.value())); eventData = new EventData(getBytes(record.value()));
} }
try { try {
if (!batch.tryAdd(eventData)) { if (!batch.tryAdd(eventData)) {
LOGGER.warn("event was too large to fit in the batch - {}", record); LOGGER.warn("Event data was too large to fit in the batch - {}", record);
continue;
} }
} catch (IllegalArgumentException e) {
LOGGER.warn("Event data was null - {}", e.getMessage());
} catch (AmqpException e) {
LOGGER.warn("Event data is larger than the maximum size of the EventDataBatch - {}", e.getMessage());
} catch (Exception e) {
LOGGER.warn("Failed to add event data to batch - {}", e.getMessage());
} }
catch (IllegalArgumentException e) { }
LOGGER.warn("EventData was null - {}", e.getMessage());
continue;
}
catch (AmqpException e) {
LOGGER.warn("EventData is larger than the maximum size of the EventDataBatch - {}", e.getMessage());
continue;
}
catch (Exception e) {
LOGGER.warn("Failed to add EventData to batch {}", e.getMessage());
continue;
}
// Event Hubs producer only supports "batch"ed sends. Each record is sent as a try {
// separate batch which is then acknowledged/committed producer.send(batch);
LOGGER.trace("Sent record batch to Event Hubs");
} catch (Exception e) {
LOGGER.warn("Failed to send record to Event Hubs {}", e.getMessage());
}
try { // this loop commits each record
producer.send(batch); for (ChangeEvent<Object, Object> record : records) {
LOGGER.trace("Sent record to Event Hubs");
}
catch (Exception e) {
LOGGER.warn("Failed to send record to Event Hubs {}", e.getMessage());
// do not mark the record as processed it its not sent to Event Hubs
continue;
}
try { try {
committer.markProcessed(record); committer.markProcessed(record);
LOGGER.trace("Record marked processed"); LOGGER.trace("Record marked processed");
} } catch (Exception e) {
catch (Exception e) {
LOGGER.warn("Failed to mark record as processed {}", e.getMessage()); LOGGER.warn("Failed to mark record as processed {}", e.getMessage());
} }
} }
committer.markBatchFinished(); committer.markBatchFinished();
LOGGER.info("Batch marked finished"); LOGGER.trace("Batch marked finished");
} }
} }