diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/FileSignalChannel.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/FileSignalChannel.java new file mode 100644 index 000000000..d674b1c93 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/FileSignalChannel.java @@ -0,0 +1,132 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.pipeline.signal.channels; + +import java.io.File; +import java.io.FileWriter; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.debezium.DebeziumException; +import io.debezium.config.CommonConnectorConfig; +import io.debezium.config.Configuration; +import io.debezium.config.Field; +import io.debezium.pipeline.signal.SignalRecord; + +/** + * The class responsible for processing of signals delivered to Debezium via a file. + * The signal message must have the following structure, formatted as json line: + * + * + * @author Ismail Simsek + * + */ +public class FileSignalChannel implements SignalChannelReader { + public static final String CONFIGURATION_FIELD_PREFIX_STRING = "signal."; + public static final Field SIGNAL_FILE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "file") + .withDisplayName("Signal file name") + .withType(ConfigDef.Type.STRING) + .withWidth(ConfigDef.Width.LONG) + .withImportance(ConfigDef.Importance.HIGH) + .withDescription("The name of the file for the signals to the connector") + .withValidation(Field::isRequired); + public static final String CHANNEL_NAME = "file"; + private static final Logger LOGGER = LoggerFactory.getLogger(FileSignalChannel.class); + ObjectMapper mapper = new ObjectMapper(); + private File signalFile; + + @Override + public String name() { + return CHANNEL_NAME; + } + + @Override + public void init(CommonConnectorConfig connectorConfig) { + + Configuration signalConfig = connectorConfig.getConfig().subset(CONFIGURATION_FIELD_PREFIX_STRING, false) + .edit() + .withDefault(FileSignalChannel.SIGNAL_FILE, "file-signals.txt") + .build(); + this.signalFile = new File(signalConfig.getString(SIGNAL_FILE)); + LOGGER.info("Reading '{}' file for signals", signalFile.getAbsolutePath()); + } + + @Override + public List read() { + List signals = new ArrayList<>(); + + if (!signalFile.exists() || signalFile.isDirectory()) { + LOGGER.trace("Signal file not found '{}'", signalFile.getAbsolutePath()); + return signals; + } + + try { + // read all the signals from file. signal line must be json formatted + List lines = Files.readAllLines(signalFile.toPath()); + if (!lines.isEmpty()) { + // remove signals from file by overriding file with empty content + new FileWriter(signalFile, false).close(); + } + Iterator lineIterator = lines.iterator(); + while (lineIterator.hasNext()) { + String signalLine = lineIterator.next(); + if (signalLine == null || signalLine.isBlank()) { + LOGGER.debug("Ignoring empty signal line: `{}`", signalLine); + lineIterator.remove(); + continue; + } + try { + SignalRecord signal = readSignalString(signalLine); + signals.add(signal); + LOGGER.info("Processing signal: {}, {}, {}, {}", signal.getId(), signal.getType(), signal.getData(), signal.getAdditionalData()); + } + catch (final Exception e) { + LOGGER.warn("Skipped signal due to an error '{}'", signalLine, e); + } + lineIterator.remove(); + } + + } + catch (Exception e) { + throw new DebeziumException("Failed to read signal file " + signalFile.getAbsolutePath(), e); + } + + return signals; + } + + private SignalRecord readSignalString(String signalLine) throws JsonProcessingException { + LOGGER.trace("Processing signal line: {}", signalLine); + JsonNode signalJson = mapper.readTree(signalLine); + Map additionalData = signalJson.has("additionalData") ? mapper.convertValue(signalJson.get("additionalData"), new TypeReference<>() { + }) : Map.of(); + Long channelOffset = signalJson.has("channelOffset") ? signalJson.get("channelOffset").asLong(0L) : 0L; + String id = signalJson.get("id").asText(); + String type = signalJson.get("type").asText(); + String data = signalJson.get("data").toString(); + return new SignalRecord(id, type, data, channelOffset, additionalData); + } + + @Override + public void close() { + } + +} diff --git a/debezium-core/src/test/java/io/debezium/pipeline/signal/FileSignalChannelTest.java b/debezium-core/src/test/java/io/debezium/pipeline/signal/FileSignalChannelTest.java new file mode 100644 index 000000000..efd9fac89 --- /dev/null +++ b/debezium-core/src/test/java/io/debezium/pipeline/signal/FileSignalChannelTest.java @@ -0,0 +1,66 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.pipeline.signal; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.List; + +import org.junit.Test; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.config.Configuration; +import io.debezium.connector.SourceInfoStructMaker; +import io.debezium.pipeline.signal.channels.FileSignalChannel; + +/** + * @author Ismail Simsek + * + */ +public class FileSignalChannelTest { + + Path signalsData = Paths.get("src", "test", "resources").resolve("debezium_signaling_file.signals.txt"); + Path signalsFile = Paths.get("src", "test", "resources").resolve("debezium_signaling_file.txt"); + + @Test + public void shouldLoadFileSignalsTest() throws IOException { + Files.copy(signalsData, signalsFile, StandardCopyOption.REPLACE_EXISTING); + + final FileSignalChannel fileSignalChannel = new FileSignalChannel(); + fileSignalChannel.init(config()); + List signalRecords = fileSignalChannel.read(); + // only two whitespace lines are ignored + assertThat(signalRecords).hasSize(2); + assertThat(signalRecords.get(0).getData().contains("public.MyFirstTable")).isTrue(); + Files.deleteIfExists(signalsFile.toAbsolutePath()); + } + + protected CommonConnectorConfig config() { + return new CommonConnectorConfig(Configuration.create() + .with(FileSignalChannel.SIGNAL_FILE, signalsFile.toString()) + .build(), 0) { + @Override + protected SourceInfoStructMaker getSourceInfoStructMaker(Version version) { + return null; + } + + @Override + public String getContextName() { + return null; + } + + @Override + public String getConnectorName() { + return null; + } + }; + } +} diff --git a/debezium-core/src/test/resources/debezium_signaling_file.signals.txt b/debezium-core/src/test/resources/debezium_signaling_file.signals.txt new file mode 100644 index 000000000..685445473 --- /dev/null +++ b/debezium-core/src/test/resources/debezium_signaling_file.signals.txt @@ -0,0 +1,9 @@ + + + + + + + +{"id":"d139b9b7-7777-4547-917d-111111111111", "type":"execute-snapshot", "data":{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}} +{"id":"d139b9b7-7777-4547-917d-222222222222", "type":"execute-snapshot", "data":{"data-collections": ["public2.MyFirstTable", "public2.MySecondTable"]}} \ No newline at end of file