DBZ-2897 Make Offsets interface non-static, refactor test for updating offsets

This commit is contained in:
Thomas Thornton 2021-02-16 17:50:10 -08:00 committed by Gunnar Morling
parent dcb6b04835
commit 72f8c06bb6
2 changed files with 23 additions and 18 deletions

View File

@ -142,7 +142,7 @@ public static interface RecordCommitter<R> {
* Contract that should be passed to {@link RecordCommitter#markProcessed(Object, Offsets)} for marking a record * Contract that should be passed to {@link RecordCommitter#markProcessed(Object, Offsets)} for marking a record
* as processed with updated offsets. * as processed with updated offsets.
*/ */
public static interface Offsets { public interface Offsets {
/** /**
* Associates a key with a specific value, overwrites the value if the key is already present. * Associates a key with a specific value, overwrites the value if the key is already present.

View File

@ -7,9 +7,7 @@
import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.Assertions.assertThat;
import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
@ -21,6 +19,7 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -31,16 +30,20 @@
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.file.FileStreamSourceConnector; import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.SafeObjectInputStream;
import org.fest.assertions.Assertions; import org.fest.assertions.Assertions;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.DebeziumException; import io.debezium.DebeziumException;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.simple.SimpleSourceConnector; import io.debezium.connector.simple.SimpleSourceConnector;
@ -336,15 +339,17 @@ public void shouldRunEngineWithConsumerSettingOffsets() throws Exception {
// Add initial content to the file ... // Add initial content to the file ...
appendLinesToSource(NUMBER_OF_LINES); appendLinesToSource(NUMBER_OF_LINES);
String TEST_TOPIC = "topicX";
String CUSTOM_SOURCE_OFFSET_PARTITION = "test_topic_partition1";
Long EXPECTED_CUSTOM_OFFSET = 1L;
final Properties props = new Properties(); final Properties props = new Properties();
props.setProperty("name", "debezium-engine"); props.setProperty("name", "debezium-engine");
props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector"); props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
props.setProperty("offset.flush.interval.ms", "0"); props.setProperty("offset.flush.interval.ms", "0");
props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString()); props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
props.setProperty("topic", "topicX"); props.setProperty("topic", TEST_TOPIC);
String CUSTOM_SOURCE_OFFSET_PARTITION = "test_topic_partition1";
CountDownLatch firstLatch = new CountDownLatch(1); CountDownLatch firstLatch = new CountDownLatch(1);
CountDownLatch allLatch = new CountDownLatch(6); CountDownLatch allLatch = new CountDownLatch(6);
@ -358,7 +363,7 @@ public void shouldRunEngineWithConsumerSettingOffsets() throws Exception {
for (RecordChangeEvent<SourceRecord> r : records) { for (RecordChangeEvent<SourceRecord> r : records) {
DebeziumEngine.Offsets offsets = committer.buildOffsets(); DebeziumEngine.Offsets offsets = committer.buildOffsets();
offsets.set(CUSTOM_SOURCE_OFFSET_PARTITION, 1L); offsets.set(CUSTOM_SOURCE_OFFSET_PARTITION, EXPECTED_CUSTOM_OFFSET);
logger.info(r.record().sourceOffset().toString()); logger.info(r.record().sourceOffset().toString());
committer.markProcessed(r, offsets); committer.markProcessed(r, offsets);
} }
@ -389,17 +394,17 @@ public void shouldRunEngineWithConsumerSettingOffsets() throws Exception {
allLatch.await(5000, TimeUnit.MILLISECONDS); allLatch.await(5000, TimeUnit.MILLISECONDS);
assertThat(allLatch.getCount()).isEqualTo(0); assertThat(allLatch.getCount()).isEqualTo(0);
boolean containsCustomPartition = false; SafeObjectInputStream inputStream = new SafeObjectInputStream(java.nio.file.Files.newInputStream(OFFSET_STORE_PATH.toAbsolutePath()));
try (BufferedReader br = new BufferedReader(new FileReader(OFFSET_STORE_PATH.toString()))) { Object obj = inputStream.readObject();
String line; Map<byte[], byte[]> raw = (Map) obj;
while ((line = br.readLine()) != null) { Set<Map.Entry<byte[], byte[]>> fileOffsetStoreEntrySingleton = raw.entrySet();
logger.info(line); assertThat(fileOffsetStoreEntrySingleton.size()).isEqualTo(1);
if (line.contains(CUSTOM_SOURCE_OFFSET_PARTITION)) { Map.Entry<byte[], byte[]> fileOffsetEntry = fileOffsetStoreEntrySingleton.iterator().next();
containsCustomPartition = true; ByteBuffer offsetJsonString = fileOffsetEntry.getValue() != null ? ByteBuffer.wrap(fileOffsetEntry.getValue()) : null;
} JsonDeserializer jsonDeserializer = new JsonDeserializer();
} JsonNode partitionToOffsetMap = jsonDeserializer.deserialize(TEST_TOPIC, offsetJsonString.array());
} Long actualOffset = partitionToOffsetMap.get(CUSTOM_SOURCE_OFFSET_PARTITION).asLong();
assertThat(containsCustomPartition).isTrue(); assertThat(actualOffset).isEqualTo(EXPECTED_CUSTOM_OFFSET);
// Stop the connector ... // Stop the connector ...
stopConnector(); stopConnector();