DBZ-4610 Upgrade to Kafka 3.1.0

This commit is contained in:
Jiri Pechanec 2022-01-25 08:32:21 +01:00 committed by Gunnar Morling
parent 4236e3291a
commit 55894e57c2
3 changed files with 9 additions and 9 deletions

View File

@ -19,7 +19,7 @@
import io.debezium.util.IoUtil; import io.debezium.util.IoUtil;
import kafka.admin.RackAwareMode; import kafka.admin.RackAwareMode;
import kafka.log.Log; import kafka.log.UnifiedLog;
import kafka.server.KafkaConfig; import kafka.server.KafkaConfig;
import kafka.zk.AdminZkClient; import kafka.zk.AdminZkClient;
import scala.collection.JavaConverters; import scala.collection.JavaConverters;
@ -238,7 +238,7 @@ public synchronized void shutdown(boolean deleteLogs) {
if (deleteLogs) { if (deleteLogs) {
// as of 0.10.1.1 if logs are not deleted explicitly, there are open File Handles left on .timeindex files // as of 0.10.1.1 if logs are not deleted explicitly, there are open File Handles left on .timeindex files
// at least on Windows courtesy of the TimeIndex.scala class // at least on Windows courtesy of the TimeIndex.scala class
JavaConverters.asJavaIterableConverter(server.logManager().allLogs()).asJava().forEach(Log::delete); JavaConverters.asJavaIterableConverter(server.logManager().allLogs()).asJava().forEach(UnifiedLog::delete);
} }
LOGGER.info("Stopped Kafka server {} at {}", brokerId, getConnection()); LOGGER.info("Stopped Kafka server {} at {}", brokerId, getConnection());
} }

View File

@ -908,7 +908,7 @@ protected RecordCommitter buildRecordCommitter(OffsetStorageWriter offsetWriter,
public synchronized void markProcessed(SourceRecord record) throws InterruptedException { public synchronized void markProcessed(SourceRecord record) throws InterruptedException {
task.commitRecord(record); task.commitRecord(record);
recordsSinceLastCommit += 1; recordsSinceLastCommit += 1;
offsetWriter.offset(record.sourcePartition(), record.sourceOffset()); offsetWriter.offset((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset());
} }
@Override @Override

12
pom.xml
View File

@ -95,16 +95,16 @@
<release.endpoint>https://s01.oss.sonatype.org/</release.endpoint> <release.endpoint>https://s01.oss.sonatype.org/</release.endpoint>
<!-- Kafka and it's dependencies MUST reflect what the Kafka version uses --> <!-- Kafka and it's dependencies MUST reflect what the Kafka version uses -->
<version.kafka>3.0.0</version.kafka> <version.kafka>3.1.0</version.kafka>
<version.zookeeper>3.5.9</version.zookeeper> <version.zookeeper>3.6.3</version.zookeeper>
<version.jackson>2.10.5</version.jackson> <version.jackson>2.12.3</version.jackson>
<version.jackson.databind>2.10.5.1</version.jackson.databind> <version.jackson.databind>2.12.3</version.jackson.databind>
<version.org.slf4j>1.7.30</version.org.slf4j> <version.org.slf4j>1.7.30</version.org.slf4j>
<version.log4j>1.2.17</version.log4j> <version.log4j>1.2.17</version.log4j>
<version.netty>4.1.51.Final</version.netty> <version.netty>4.1.68.Final</version.netty>
<!-- Scala version used to build Kafka --> <!-- Scala version used to build Kafka -->
<version.kafka.scala>2.12</version.kafka.scala> <version.kafka.scala>2.13</version.kafka.scala>
<!-- ANTLR --> <!-- ANTLR -->
<!-- Align with Antlr runtime version pulled in via Quarkus --> <!-- Align with Antlr runtime version pulled in via Quarkus -->