DBZ-8106 Close transfomations upon embedded engine stop

This commit is contained in:
Vojtech Juranek 2024-07-30 16:20:49 +02:00 committed by Jiri Pechanec
parent 63377d9d6a
commit fb94614419
3 changed files with 86 additions and 0 deletions

View File

@ -483,6 +483,15 @@ public void run() {
finally { finally {
// Close the offset storage and finally the connector ... // Close the offset storage and finally the connector ...
stopOffsetStoreAndConnector(connector, connectorClassName, offsetStore, connectorCallback); stopOffsetStoreAndConnector(connector, connectorClassName, offsetStore, connectorCallback);
// Close the transformation chain.
if (transformations != null) {
try {
transformations.close();
}
catch (IOException e) {
fail("Failed to close transformations: ", e);
}
}
} }
} }
catch (EmbeddedEngineRuntimeException e) { catch (EmbeddedEngineRuntimeException e) {

View File

@ -283,6 +283,14 @@ private void close(final State stateBeforeStop) {
LOGGER.warn("Failed to close header converter: ", e); LOGGER.warn("Failed to close header converter: ", e);
} }
} }
if (transformations != null) {
try {
transformations.close();
}
catch (IOException e) {
LOGGER.warn("Failed to close transformations: ", e);
}
}
shutDownLatch.countDown(); shutDownLatch.countDown();
} }

View File

@ -21,12 +21,14 @@
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.file.FileStreamSourceConnector; import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.awaitility.Awaitility; import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException; import org.awaitility.core.ConditionTimeoutException;
import org.junit.Before; import org.junit.Before;
@ -451,6 +453,46 @@ public void testExecuteSmt() throws Exception {
stopEngine(); stopEngine();
} }
@Test
@FixFor("DBZ-8106")
public void testCloseSmt() throws Exception {
final Properties props = new Properties();
props.setProperty(ConnectorConfig.NAME_CONFIG, "debezium-engine");
props.setProperty(ConnectorConfig.TASKS_MAX_CONFIG, "1");
props.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, FileStreamSourceConnector.class.getName());
props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
props.setProperty(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "0");
props.setProperty(FileStreamSourceConnector.FILE_CONFIG, TEST_FILE_PATH.toAbsolutePath().toString());
props.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, "testTopic");
props.setProperty("transforms", "close");
props.setProperty("transforms.close.type", "io.debezium.embedded.async.AsyncEmbeddedEngineTest$CloseTestTransform");
CountDownLatch callbackLatch = new CountDownLatch(1);
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
engine = builder
.using(props)
.using(new TestEngineConnectorCallback())
.using((success, message, error) -> {
if (success) {
callbackLatch.countDown();
}
})
.notifying((records, committer) -> {
})
.build();
engineExecSrv.submit(() -> {
LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
engine.run();
});
callbackLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
stopEngine();
assertThat(CloseTestTransform.wasClosed).isTrue();
}
@Test @Test
public void testPollingIsRetriedUponFailure() throws Exception { public void testPollingIsRetriedUponFailure() throws Exception {
final Properties props = new Properties(); final Properties props = new Properties();
@ -756,4 +798,31 @@ public List<SourceRecord> poll() throws InterruptedException {
} }
} }
public static class CloseTestTransform implements Transformation<SourceRecord> {
public static boolean wasClosed;
@Override
public SourceRecord apply(SourceRecord record) {
// Nothing to do.
return null;
}
@Override
public ConfigDef config() {
// Nothing to do.
return null;
}
@Override
public void close() {
wasClosed = true;
}
@Override
public void configure(Map<String, ?> map) {
wasClosed = false;
}
}
} }