From 72f8c06bb617274bbfdf6f62bc59ad12704847e6 Mon Sep 17 00:00:00 2001 From: Thomas Thornton Date: Tue, 16 Feb 2021 17:50:10 -0800 Subject: [PATCH] DBZ-2897 Make Offsets interface non-static, refactor test for updating offsets --- .../io/debezium/engine/DebeziumEngine.java | 2 +- .../debezium/embedded/EmbeddedEngineTest.java | 39 +++++++++++-------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java b/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java index 09bd9e517..ff6b21c09 100644 --- a/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java +++ b/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java @@ -142,7 +142,7 @@ public static interface RecordCommitter { * Contract that should be passed to {@link RecordCommitter#markProcessed(Object, Offsets)} for marking a record * as processed with updated offsets. */ - public static interface Offsets { + public interface Offsets { /** * Associates a key with a specific value, overwrites the value if the key is already present. 
diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java index a485bdadc..e842259e1 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java @@ -7,9 +7,7 @@ import static org.fest.assertions.Assertions.assertThat; -import java.io.BufferedReader; import java.io.File; -import java.io.FileReader; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.Charset; @@ -21,6 +19,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -31,16 +30,20 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.file.FileStreamSourceConnector; +import org.apache.kafka.connect.json.JsonDeserializer; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.SafeObjectInputStream; import org.fest.assertions.Assertions; import org.junit.Before; import org.junit.Test; +import com.fasterxml.jackson.databind.JsonNode; + import io.debezium.DebeziumException; import io.debezium.config.Configuration; import io.debezium.connector.simple.SimpleSourceConnector; @@ -336,15 +339,17 @@ public void shouldRunEngineWithConsumerSettingOffsets() throws Exception { // Add initial content to the file ... 
appendLinesToSource(NUMBER_OF_LINES); + String TEST_TOPIC = "topicX"; + String CUSTOM_SOURCE_OFFSET_PARTITION = "test_topic_partition1"; + Long EXPECTED_CUSTOM_OFFSET = 1L; + final Properties props = new Properties(); props.setProperty("name", "debezium-engine"); props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector"); props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); props.setProperty("offset.flush.interval.ms", "0"); props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString()); - props.setProperty("topic", "topicX"); - - String CUSTOM_SOURCE_OFFSET_PARTITION = "test_topic_partition1"; + props.setProperty("topic", TEST_TOPIC); CountDownLatch firstLatch = new CountDownLatch(1); CountDownLatch allLatch = new CountDownLatch(6); @@ -358,7 +363,7 @@ public void shouldRunEngineWithConsumerSettingOffsets() throws Exception { for (RecordChangeEvent<SourceRecord> r : records) { DebeziumEngine.Offsets offsets = committer.buildOffsets(); - offsets.set(CUSTOM_SOURCE_OFFSET_PARTITION, 1L); + offsets.set(CUSTOM_SOURCE_OFFSET_PARTITION, EXPECTED_CUSTOM_OFFSET); logger.info(r.record().sourceOffset().toString()); committer.markProcessed(r, offsets); } @@ -389,17 +394,17 @@ public void shouldRunEngineWithConsumerSettingOffsets() throws Exception { allLatch.await(5000, TimeUnit.MILLISECONDS); assertThat(allLatch.getCount()).isEqualTo(0); - boolean containsCustomPartition = false; - try (BufferedReader br = new BufferedReader(new FileReader(OFFSET_STORE_PATH.toString()))) { - String line; - while ((line = br.readLine()) != null) { - logger.info(line); - if (line.contains(CUSTOM_SOURCE_OFFSET_PARTITION)) { - containsCustomPartition = true; - } - } - } - assertThat(containsCustomPartition).isTrue(); + SafeObjectInputStream inputStream = new SafeObjectInputStream(java.nio.file.Files.newInputStream(OFFSET_STORE_PATH.toAbsolutePath())); + Object obj = inputStream.readObject(); + Map<byte[], byte[]>
raw = (Map<byte[], byte[]>) obj; + Set<Map.Entry<byte[], byte[]>> fileOffsetStoreEntrySingleton = raw.entrySet(); + assertThat(fileOffsetStoreEntrySingleton.size()).isEqualTo(1); + Map.Entry<byte[], byte[]> fileOffsetEntry = fileOffsetStoreEntrySingleton.iterator().next(); + ByteBuffer offsetJsonString = fileOffsetEntry.getValue() != null ? ByteBuffer.wrap(fileOffsetEntry.getValue()) : null; + JsonDeserializer jsonDeserializer = new JsonDeserializer(); + JsonNode partitionToOffsetMap = jsonDeserializer.deserialize(TEST_TOPIC, offsetJsonString.array()); + Long actualOffset = partitionToOffsetMap.get(CUSTOM_SOURCE_OFFSET_PARTITION).asLong(); + assertThat(actualOffset).isEqualTo(EXPECTED_CUSTOM_OFFSET); // Stop the connector ... stopConnector();