DBZ-7661 Close properly offset backing store

This commit is contained in:
Vojtech Juranek 2024-03-18 22:32:38 +01:00 committed by Jiri Pechanec
parent 16a089abb6
commit 8e0c6ad88e
3 changed files with 42 additions and 2 deletions

View File

@ -70,6 +70,7 @@
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.engine.format.KeyValueChangeEventFormat;
import io.debezium.engine.format.KeyValueHeaderChangeEventFormat;
import io.debezium.engine.source.DebeziumSourceConnectorContext;
import io.debezium.engine.source.EngineSourceConnector;
import io.debezium.engine.source.EngineSourceConnectorContext;
import io.debezium.engine.source.EngineSourceTask;
@ -335,7 +336,7 @@ private Map<String, String> initializeConnector() throws Exception {
final OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, engineName, offsetKeyConverter, offsetValueConverter);
LOGGER.debug("Initializing Connect connector itself");
connector.initialize(new EngineSourceConnectorContext(this, offsetReader, offsetWriter));
connector.initialize(new EngineSourceConnectorContext(this, offsetStore, offsetReader, offsetWriter));
return connectorConfig;
}
@ -618,6 +619,27 @@ private void stopSourceTasks(final List<EngineSourceTask> tasks) {
}
}
/**
* Stops {@link OffsetBackingStore} used by engine if there is any.
* If engine fails during initialization phase before connector context is created, the reference may be {@code null} and in such this method does nothing.
*
* @param connectorContext {@link DebeziumSourceConnectorContext} used by the connector or {@code null} if the context hasn't been initialized yet.
*/
private void stopOffsetStore(final DebeziumSourceConnectorContext connectorContext) {
if (connectorContext == null || connectorContext.offsetStore() == null) {
LOGGER.debug("Offset store hasn't been initialized yet, closing of the offset store is skipped.");
return;
}
LOGGER.debug("Stopping offset backing store.");
try {
connectorContext.offsetStore().stop();
}
catch (Exception e) {
LOGGER.warn("Failed to stop offset backing store", e);
}
}
/**
* Stops connector's tasks if they are already running and then stops connector itself.
*
@ -630,6 +652,7 @@ private void stopConnector(final List<EngineSourceTask> tasks, final State engin
stopPollingIfNeeded();
stopSourceTasks(tasks);
}
stopOffsetStore(connector.context());
LOGGER.debug("Stopping the connector.");
connector.connectConnector().stop();
LOGGER.debug("Calling connector callback after connector stop");

View File

@ -5,6 +5,7 @@
*/
package io.debezium.engine.source;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
@ -18,6 +19,13 @@
@Incubating
public interface DebeziumSourceConnectorContext {
/**
* Returns the {@link OffsetBackingStore} used by this connector.
* This should be used mainly for proper closing the offset backing store.
* @return the {@link OffsetBackingStore} use by this connector.
*/
OffsetBackingStore offsetStore();
/**
* Returns the {@link OffsetStorageReader} for this DebeziumConnectorContext.
* @return the OffsetStorageReader for this connector.

View File

@ -6,6 +6,7 @@
package io.debezium.engine.source;
import org.apache.kafka.connect.source.SourceConnectorContext;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
@ -21,15 +22,23 @@
public class EngineSourceConnectorContext implements DebeziumSourceConnectorContext, SourceConnectorContext {
private final AsyncEmbeddedEngine engine;
private final OffsetBackingStore offsetStore;
private final OffsetStorageReader offsetReader;
private final OffsetStorageWriter offsetWriter;
public EngineSourceConnectorContext(final AsyncEmbeddedEngine engine, final OffsetStorageReader offsetReader, final OffsetStorageWriter offsetWriter) {
public EngineSourceConnectorContext(final AsyncEmbeddedEngine engine, final OffsetBackingStore offsetStore, final OffsetStorageReader offsetReader,
final OffsetStorageWriter offsetWriter) {
this.engine = engine;
this.offsetStore = offsetStore;
this.offsetReader = offsetReader;
this.offsetWriter = offsetWriter;
}
@Override
public OffsetBackingStore offsetStore() {
return offsetStore;
}
@Override
public OffsetStorageReader offsetStorageReader() {
return offsetReader;