DBZ-7099 Provide default value for PeriodicCommitOffsetPolicy

In 7b4cf1901 deprecated `io.debezium.embedded.spi.OffsetCommitPolicy`,
which provided also constructor for `Configuration`, was removed.
This constructor was actually used for creating `OffsetCommitPolicy`.
`Configuration` provides default values for options which are not
explicitly set, while `Properties` based constructor cannot do that
and therefore with this switch the default value for
`offset.flush.interval.ms` is now missing.

As debezium-api package has no knowledge about `Configuration`
interface, which is part of debezium-core, and thus about the default
values, specify default value direcly in the `PeriodicCommitOffsetPolicy`
class.
This commit is contained in:
Vojtech Juranek 2023-11-13 14:57:05 +01:00 committed by Jiri Pechanec
parent 893d8eb29f
commit 60939c8965
2 changed files with 49 additions and 1 deletions

View File

@ -40,7 +40,10 @@ class PeriodicCommitOffsetPolicy implements OffsetCommitPolicy {
private final Duration minimumTime;
public PeriodicCommitOffsetPolicy(Properties config) {
minimumTime = Duration.ofMillis(Long.valueOf(config.getProperty(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP)));
final long interval = config.containsKey(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP)
? Long.valueOf(config.getProperty(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP))
: 60000L;
minimumTime = Duration.ofMillis(interval);
}
@Override

View File

@ -783,6 +783,51 @@ public void validationThrowsException() throws Exception {
assertThat(isEngineRunning.get()).isFalse();
}
@Test
@FixFor("DBZ-7099")
public void shouldHandleNoDefaultOffsetFlushInterval() throws IOException, InterruptedException {
final Properties props = new Properties();
props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), SimpleSourceConnector.class.getName());
props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
props.put(EmbeddedEngineConfig.WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS.name(), "10");
final CountDownLatch engineRunning = new CountDownLatch(1);
final CountDownLatch engineStopped = new CountDownLatch(1);
final AtomicBoolean engineSucceeded = new AtomicBoolean(false);
final DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
.using(props)
.notifying((records, committer) -> {
engineRunning.countDown();
})
.using(this.getClass().getClassLoader())
.using(new DebeziumEngine.ConnectorCallback() {
@Override
public void connectorStarted() {
isEngineRunning.compareAndExchange(false, true);
}
})
.using((success, message, error) -> {
engineSucceeded.set(success);
engineStopped.countDown();
})
.build();
ExecutorService exec = Executors.newFixedThreadPool(1);
exec.execute(() -> {
LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
engine.run();
});
engineRunning.await(100, TimeUnit.MILLISECONDS);
assertThat(isEngineRunning.get()).isTrue();
engine.close();
engineStopped.await(100, TimeUnit.MILLISECONDS);
assertThat(engineSucceeded.get()).isTrue();
}
protected void appendLinesToSource(int numberOfLines) throws IOException {
CharSequence[] lines = new CharSequence[numberOfLines];
for (int i = 0; i != numberOfLines; ++i) {