diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java index 6bb85c0a4..f682a5ef3 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java @@ -483,6 +483,15 @@ public void run() { finally { // Close the offset storage and finally the connector ... stopOffsetStoreAndConnector(connector, connectorClassName, offsetStore, connectorCallback); + // Close the transformation chain. + if (transformations != null) { + try { + transformations.close(); + } + catch (IOException e) { + fail("Failed to close transformations: ", e); + } + } } } catch (EmbeddedEngineRuntimeException e) { diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java b/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java index ec56a4b40..5ff804eef 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java @@ -283,6 +283,14 @@ private void close(final State stateBeforeStop) { LOGGER.warn("Failed to close header converter: ", e); } } + if (transformations != null) { + try { + transformations.close(); + } + catch (IOException e) { + LOGGER.warn("Failed to close transformations: ", e); + } + } shutDownLatch.countDown(); } diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/async/AsyncEmbeddedEngineTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/async/AsyncEmbeddedEngineTest.java index 09fcf0e5e..b8ff8b7cb 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/async/AsyncEmbeddedEngineTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/async/AsyncEmbeddedEngineTest.java @@ -21,12 +21,14 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.file.FileStreamSourceConnector; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.transforms.Transformation; import org.awaitility.Awaitility; import org.awaitility.core.ConditionTimeoutException; import org.junit.Before; @@ -451,6 +453,46 @@ public void testExecuteSmt() throws Exception { stopEngine(); } + @Test + @FixFor("DBZ-8106") + public void testCloseSmt() throws Exception { + final Properties props = new Properties(); + props.setProperty(ConnectorConfig.NAME_CONFIG, "debezium-engine"); + props.setProperty(ConnectorConfig.TASKS_MAX_CONFIG, "1"); + props.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, FileStreamSourceConnector.class.getName()); + props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); + props.setProperty(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "0"); + props.setProperty(FileStreamSourceConnector.FILE_CONFIG, TEST_FILE_PATH.toAbsolutePath().toString()); + props.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, "testTopic"); + props.setProperty("transforms", "close"); + props.setProperty("transforms.close.type", "io.debezium.embedded.async.AsyncEmbeddedEngineTest$CloseTestTransform"); + + CountDownLatch callbackLatch = new CountDownLatch(1); + + DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder(); + engine = builder + .using(props) + .using(new TestEngineConnectorCallback()) + .using((success, message, error) -> { + if (success) { + callbackLatch.countDown(); + } + }) + .notifying((records, committer) -> { + }) + .build(); + + engineExecSrv.submit(() -> { + LoggingContext.forConnector(getClass().getSimpleName(), "", "engine"); + engine.run(); + }); + + callbackLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS); + stopEngine(); + + assertThat(CloseTestTransform.wasClosed).isTrue(); + } + @Test public void testPollingIsRetriedUponFailure() throws Exception { final Properties props = new Properties(); @@ -756,4 +798,31 @@ public List poll() throws InterruptedException { } } + public static class CloseTestTransform implements Transformation { + + public static boolean wasClosed; + + @Override + public SourceRecord apply(SourceRecord record) { + // Nothing to do. + return null; + } + + @Override + public ConfigDef config() { + // Nothing to do. + return null; + } + + @Override + public void close() { + wasClosed = true; + } + + @Override + public void configure(Map map) { + wasClosed = false; + } + } + }