DBZ-2897 Extended RecordCommitter interface to handle updated source offsets.

Extended the DebeziumEngine.RecordCommitter interface to include a new method
for marking a record as processed with updated source offsets. Implemented the
new method in both ConvertingEngineBuilder and EmbeddedEngine. EmbeddedEngine
creates a new SourceRecord with the updated source offsets and passes it
to the original markProcessed method. Added a unit test in EmbeddedEngineTest
that verifies that the file offset storage contains the partition key of the
updated source offset. Added a Revapi ignore entry so that the API check does
not fail because a method was added to an interface without a default
implementation. Updated the Debezium Engine docs to describe the new functionality.
Thomas Thornton 2021-01-29 14:04:01 -08:00 committed by Gunnar Morling
parent 673a1657a7
commit 8d87f5feae
6 changed files with 112 additions and 0 deletions
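
For illustration, a ChangeConsumer that uses the new overload might look like the
sketch below. The partition key "custom_partition" and the offset value 42L are
hypothetical placeholders, not part of this commit.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.connect.source.SourceRecord;

    import io.debezium.engine.DebeziumEngine;

    class OffsetOverridingConsumer implements DebeziumEngine.ChangeConsumer<SourceRecord> {
        @Override
        public void handleBatch(List<SourceRecord> records, DebeziumEngine.RecordCommitter<SourceRecord> committer)
                throws InterruptedException {
            for (SourceRecord record : records) {
                // Hypothetical custom offset map; replaces the record's original source offsets
                Map<String, Long> newOffset = Collections.singletonMap("custom_partition", 42L);
                committer.markProcessed(record, newOffset);
            }
            committer.markBatchFinished();
        }
    }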

View File: DebeziumEngine.java

@@ -121,6 +121,14 @@ public static interface RecordCommitter<R> {
     * Should be called when a batch of records is finished being processed.
     */
    void markBatchFinished() throws InterruptedException;

    /**
     * Marks a record with updated source offsets as processed.
     *
     * @param record the record to commit
     * @param sourceOffset the source offset to update the record with
     */
    void markProcessed(R record, Map<String, ?> sourceOffset) throws InterruptedException;
}

/**

View File: ConvertingEngineBuilder.java

@@ -7,6 +7,7 @@
import java.io.IOException;
import java.time.Clock;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -93,6 +94,11 @@ public void markProcessed(R record) throws InterruptedException {
        public void markBatchFinished() throws InterruptedException {
            committer.markBatchFinished();
        }

        @Override
        public void markProcessed(R record, Map<String, ?> sourceOffset) throws InterruptedException {
            // Convert the record back to the native SourceRecord format before delegating
            committer.markProcessed(fromFormat.apply(record), sourceOffset);
        }
    }));
    return this;
}

View File: EmbeddedEngine.java

@@ -906,6 +906,14 @@ public synchronized void markProcessed(SourceRecord record) throws InterruptedException {
        public synchronized void markBatchFinished() throws InterruptedException {
            maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeout, task);
        }

        @Override
        public synchronized void markProcessed(SourceRecord record, Map<String, ?> sourceOffset) throws InterruptedException {
            // Copy the record, substituting the caller-supplied offsets, then reuse the existing commit path
            SourceRecord recordWithUpdatedOffsets = new SourceRecord(record.sourcePartition(), sourceOffset, record.topic(),
                    record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(),
                    record.timestamp(), record.headers());
            markProcessed(recordWithUpdatedOffsets);
        }
    };
}

View File: EmbeddedEngineTest.java

@@ -7,7 +7,9 @@
import static org.fest.assertions.Assertions.assertThat;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
@@ -15,6 +17,7 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -328,6 +331,80 @@ public void shouldRunDebeziumEngine() throws Exception {
    stopConnector();
}

@Test
@FixFor("DBZ-2897")
public void shouldRunEngineWithConsumerSettingOffsets() throws Exception {
    // Add initial content to the file ...
    appendLinesToSource(NUMBER_OF_LINES);

    final Properties props = new Properties();
    props.setProperty("name", "debezium-engine");
    props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
    props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
    props.setProperty("offset.flush.interval.ms", "0");
    props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
    props.setProperty("topic", "topicX");

    String CUSTOM_SOURCE_OFFSET_PARTITION = "test_topic_partition1";
    CountDownLatch firstLatch = new CountDownLatch(1);
    CountDownLatch allLatch = new CountDownLatch(6);

    // create an engine with our custom class
    final DebeziumEngine<RecordChangeEvent<SourceRecord>> engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
            .using(props)
            .notifying((records, committer) -> {
                assertThat(records.size()).isGreaterThanOrEqualTo(NUMBER_OF_LINES);
                Integer groupCount = records.size() / NUMBER_OF_LINES;

                for (RecordChangeEvent<SourceRecord> r : records) {
                    // Override each record's offsets with the custom partition entry before committing
                    Map<String, Long> newSourceOffset = Collections.singletonMap(CUSTOM_SOURCE_OFFSET_PARTITION, 1L);
                    logger.info(r.record().sourceOffset().toString());
                    committer.markProcessed(r, newSourceOffset);
                }
                committer.markBatchFinished();
                firstLatch.countDown();
                for (int i = 0; i < groupCount; i++) {
                    allLatch.countDown();
                }
            })
            .using(this.getClass().getClassLoader())
            .build();

    ExecutorService exec = Executors.newFixedThreadPool(1);
    exec.execute(() -> {
        LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
        engine.run();
    });

    firstLatch.await(5000, TimeUnit.MILLISECONDS);
    assertThat(firstLatch.getCount()).isEqualTo(0);

    for (int i = 0; i < 5; i++) {
        // Add a few more lines, and then verify they are consumed ...
        appendLinesToSource(NUMBER_OF_LINES);
        Thread.sleep(10);
    }
    allLatch.await(5000, TimeUnit.MILLISECONDS);
    assertThat(allLatch.getCount()).isEqualTo(0);

    // Verify that the offset store file contains the custom partition key
    boolean containsCustomPartition = false;
    try (BufferedReader br = new BufferedReader(new FileReader(OFFSET_STORE_PATH.toString()))) {
        String line;
        while ((line = br.readLine()) != null) {
            logger.info(line);
            if (line.contains(CUSTOM_SOURCE_OFFSET_PARTITION)) {
                containsCustomPartition = true;
            }
        }
    }
    assertThat(containsCustomPartition).isTrue();

    // Stop the connector ...
    stopConnector();
}

@Test
public void shouldExecuteSmt() throws Exception {
// Add initial content to the file ...

View File: Debezium Engine documentation

@@ -333,6 +333,10 @@ This interface has a single function with the following signature:
As mentioned in the Javadoc, the `RecordCommitter` object is to be called for each record and once each batch is finished.
The `RecordCommitter` interface is thread-safe, which allows for flexible processing of records.
You can optionally override the offsets of the records that are processed. This is done by calling
`RecordCommitter#markProcessed(SourceRecord record, Map<String, ?> sourceOffset)`, where `sourceOffset` specifies
the new offsets that will replace the original offsets of the record.
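For example, a consumer could replace a record's offsets with a single custom entry, as in this minimal sketch (the key `custom_partition` and the value `42L` are placeholders):
[source,java,indent=0]
----
Map<String, Long> newOffset = Collections.singletonMap("custom_partition", 42L);
committer.markProcessed(record, newOffset);
----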
To use the `ChangeConsumer` API, you must pass an implementation of the interface to the `notifying` API, as seen below:
[source,java,indent=0]

View File: Revapi configuration

@@ -1,5 +1,14 @@
<?xml version='1.0' encoding='UTF-8'?>
<analysisConfiguration>
    <version-1.5.0>
        <revapi.ignore>
            <item>
                <code>java.method.addedToInterface</code>
                <class>io.debezium.engine.DebeziumEngine.RecordCommitter</class>
                <justification>This interface is not supposed to be implemented by clients.</justification>
            </item>
        </revapi.ignore>
    </version-1.5.0>
    <!-- No changes as of yet. This is just an example of how to tell Revapi to ignore intentional changes.
    <version-1.2.0>
        <revapi.ignore>