DBZ-7946 Handle partition rebalances

There are two things at play here to support partition rebalances. The first is to handle
the close callback: when a partition rebalance happens, the entries for the revoked
partitions must be removed from the offset map maintained by the connector.
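For background, Kafka Connect signals a rebalance by invoking SinkTask#close with the
revoked partitions before they are reassigned. A minimal sketch of that contract
(hypothetical class name and trimmed bodies, not the connector's actual task):

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class RebalanceAwareSinkTask extends SinkTask {

    private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    @Override
    public void close(Collection<TopicPartition> partitions) {
        // Invoked by the framework when these partitions are revoked during a
        // rebalance; drop their entries so preCommit never reports offsets for
        // partitions this task no longer owns.
        partitions.forEach(offsets::remove);
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Commit only what this task has actually processed.
        return new HashMap<>(offsets);
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // Process records and advance the tracked offsets (elided).
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public void stop() {
    }

    @Override
    public String version() {
        return "sketch";
    }
}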

The second addresses the fact that Kafka 3.6 changed the SinkRecord API: in order to
match the TopicPartition list passed to close with the managed offsets, we must always
use the originalXXXX methods added in Kafka 3.6, falling back to the old behavior on
Kafka 3.5 or earlier.
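To illustrate why the original coordinates matter, here is a self-contained sketch
(illustrative names, not connector code): a topic-renaming SMT such as RegexRouter
rewrites record.topic() before put() sees the record, while close and preCommit operate
on the physical coordinates the record was consumed from, which the KIP-793 originalXXXX
accessors preserve.

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

public class OriginalCoordinatesDemo {
    public static void main(String[] args) {
        // The record as consumed from the broker.
        SinkRecord consumed = new SinkRecord(
                "server1.inventory.orders", 0,
                Schema.STRING_SCHEMA, "key",
                Schema.STRING_SCHEMA, "value",
                42L);

        // What a topic-renaming SMT effectively does before put() sees the record.
        SinkRecord routed = consumed.newRecord(
                "server1_inventory_orders", consumed.kafkaPartition(),
                consumed.keySchema(), consumed.key(),
                consumed.valueSchema(), consumed.value(),
                consumed.timestamp());

        // Post-transform coordinates: do NOT match the TopicPartition the
        // framework passes to close()/preCommit().
        System.out.println(new TopicPartition(routed.topic(), routed.kafkaPartition()));

        // Pre-transform coordinates (Kafka 3.6+, KIP-793): still match.
        System.out.println(new TopicPartition(routed.originalTopic(), routed.originalKafkaPartition()));
        System.out.println(routed.originalKafkaOffset()); // 42
    }
}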
Chris Cranford, 2024-06-13 15:03:50 -04:00 (committed by Chris Cranford)
parent 5f17e289bc
commit d95e496df0


@@ -122,6 +122,16 @@ public void put(Collection<SinkRecord> records) {
         LOGGER.trace("[PERF] Mark processed execution time {}", markProcessedStopWatch.durations());
     }
 
+    @Override
+    public void close(Collection<TopicPartition> partitions) {
+        for (TopicPartition partition : partitions) {
+            if (offsets.containsKey(partition)) {
+                LOGGER.trace("Requested close TopicPartition request for '{}'", partition);
+                offsets.remove(partition);
+            }
+        }
+    }
+
     @Override
     public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
         // Flush only up to the records processed by this sink
@@ -176,15 +186,16 @@ private void markProcessed(SinkRecord record) {
         LOGGER.trace("Marking processed record for topic {}", topicName);
 
-        final TopicPartition topicPartition = new TopicPartition(topicName, record.kafkaPartition());
-        final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.kafkaOffset() + 1L);
+        final long kafkaOffset = getOriginalKafkaOffset(record);
+        final TopicPartition topicPartition = new TopicPartition(topicName, getOriginalKafkaPartition(record));
+        final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(kafkaOffset + 1L);
         final OffsetAndMetadata existing = offsets.put(topicPartition, offsetAndMetadata);
 
         if (existing == null) {
-            LOGGER.trace("Advanced topic {} to offset {}.", topicName, record.kafkaOffset());
+            LOGGER.trace("Advanced topic {} to offset {}.", topicName, kafkaOffset);
         }
         else {
-            LOGGER.trace("Updated topic {} from offset {} to {}.", topicName, existing.offset(), record.kafkaOffset());
+            LOGGER.trace("Updated topic {} from offset {} to {}.", topicName, existing.offset(), kafkaOffset);
         }
     }
@@ -200,10 +211,14 @@ private void markNotProcessed(SinkRecord record) {
         // in doing this if this tuple is not already in the map as a previous entry could have been
         // added because an earlier record was either processed or marked as not processed since any
         // remaining entries in the batch call this method on failures.
-        final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
+        final String topicName = getOriginalTopicName(record);
+        final Integer kafkaPartition = getOriginalKafkaPartition(record);
+        final long kafkaOffset = getOriginalKafkaOffset(record);
+        final TopicPartition topicPartition = new TopicPartition(topicName, kafkaPartition);
         if (!offsets.containsKey(topicPartition)) {
-            LOGGER.debug("Rewinding topic {} offset to {}.", record.topic(), record.kafkaOffset());
-            final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.kafkaOffset());
+            LOGGER.debug("Rewinding topic {} offset to {}.", topicName, kafkaOffset);
+            final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(kafkaOffset);
             offsets.put(topicPartition, offsetAndMetadata);
         }
     }
@@ -225,4 +240,25 @@ private String getOriginalTopicName(SinkRecord record) {
         return null;
     }
+
+    private Integer getOriginalKafkaPartition(SinkRecord record) {
+        try {
+            // Added in Kafka 3.6 and should be used as it will contain the details pre-transformations
+            return record.originalKafkaPartition();
+        }
+        catch (NoSuchMethodError e) {
+            // Fallback to old method for Kafka 3.5 or earlier
+            return record.kafkaPartition();
+        }
+    }
+
+    private long getOriginalKafkaOffset(SinkRecord record) {
+        try {
+            // Added in Kafka 3.6 and should be used as it will contain the details pre-transformations
+            return record.originalKafkaOffset();
+        }
+        catch (NoSuchMethodError e) {
+            // Fallback to old method for Kafka 3.5 or earlier
+            return record.kafkaOffset();
+        }
+    }
 }
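A note on the fallback: the connector is compiled against the Kafka 3.6+ API, so on an
older runtime the missing accessor only surfaces as a NoSuchMethodError at the call
site, which the catch converts into the pre-3.6 behavior. Below is a standalone sketch
of the same idea (a hypothetical helper, not part of this commit) that additionally
caches the probe result so the error is not raised for every record:

import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical helper illustrating the version-fallback pattern used above.
// Compiled against Kafka 3.6+, it still loads and runs on a 3.5 runtime,
// because the missing originalKafkaOffset() only fails when first invoked.
final class SinkRecordCoordinates {

    // Remembers whether the 3.6+ accessors are available, so the
    // NoSuchMethodError is paid at most once rather than per record.
    private static volatile boolean originalAccessorsAvailable = true;

    static long kafkaOffset(SinkRecord record) {
        if (originalAccessorsAvailable) {
            try {
                return record.originalKafkaOffset();
            }
            catch (NoSuchMethodError e) {
                originalAccessorsAvailable = false;
            }
        }
        return record.kafkaOffset();
    }

    private SinkRecordCoordinates() {
    }
}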