DBZ-8105 Make JDBC connector compatible with both Connect 3.7 and 3.8

This commit is contained in:
Jiri Pechanec 2024-07-31 07:42:44 +02:00
parent 772cc7e280
commit bf497adf7f

View File

@ -5,12 +5,15 @@
*/
package io.debezium.connector.jdbc;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
@ -22,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.dialect.DatabaseDialectResolver;
import io.debezium.pipeline.sink.spi.ChangeEventSink;
@ -37,6 +41,9 @@
public class JdbcSinkConnectorTask extends SinkTask {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSinkConnectorTask.class);
private static final Class[] EMPTY_CLASS_ARRAY = new Class[0];
private SessionFactory sessionFactory;
private enum State {
@ -51,6 +58,24 @@ private enum State {
private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
private Throwable previousPutException;
/**
* There is a change in {@link InternalSinkRecord} API between Connect 3.7 and 3.8.
* The code now uses 3.8 and use reflection to call old API if new one is not available.
*/
private boolean usePre380OriginalRecordAccess = false;
private Method pre380OriginalRecordMethod = null;
public JdbcSinkConnectorTask() {
try {
pre380OriginalRecordMethod = InternalSinkRecord.class.getMethod("originalRecord", EMPTY_CLASS_ARRAY);
usePre380OriginalRecordAccess = true;
LOGGER.info("Old InternalSinkRecord class found, will use reflection for calls");
}
catch (NoSuchMethodException | SecurityException e) {
LOGGER.info("New InternalSinkRecord class found");
}
}
@Override
public String version() {
return Module.version();
@ -244,6 +269,14 @@ private String getOriginalTopicName(SinkRecord record) {
// Kafka Connect implementation at this point, it's a fair workaround.
//
if (record instanceof InternalSinkRecord) {
if (usePre380OriginalRecordAccess) {
try {
return ((ConsumerRecord<byte[], byte[]>) pre380OriginalRecordMethod.invoke(record, EMPTY_CLASS_ARRAY)).topic();
}
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
throw new DebeziumException("Failed to access original record data", e);
}
}
return ((InternalSinkRecord) record).context().original().topic();
}
return null;