From bf699ec8b7dfe597001923b6cfbd2912c888fa19 Mon Sep 17 00:00:00 2001 From: Vojtech Juranek Date: Mon, 9 Jan 2023 14:11:16 +0100 Subject: [PATCH] DBZ-5986 Add Debezium server sink for Infinispan --- debezium-server/debezium-server-bom/pom.xml | 13 ++ debezium-server/debezium-server-dist/pom.xml | 4 + .../debezium-server-infinispan/pom.xml | 137 ++++++++++++++++++ .../infinispan/InfinispanSinkConsumer.java | 128 ++++++++++++++++ .../src/main/resources/META-INF/beans.xml | 0 .../infinispan/InfinispanSinkConsumerIT.java | 73 ++++++++++ .../InfinispanTestConfigSource.java | 45 ++++++ ...nfinispanTestResourceLifecycleManager.java | 70 +++++++++ ...lipse.microprofile.config.spi.ConfigSource | 1 + .../src/test/resources/infinispan-local.xml | 40 +++++ debezium-server/pom.xml | 1 + 11 files changed, 512 insertions(+) create mode 100644 debezium-server/debezium-server-infinispan/pom.xml create mode 100644 debezium-server/debezium-server-infinispan/src/main/java/io/debezium/server/infinispan/InfinispanSinkConsumer.java create mode 100644 debezium-server/debezium-server-infinispan/src/main/resources/META-INF/beans.xml create mode 100644 debezium-server/debezium-server-infinispan/src/test/java/io/debezium/server/infinispan/InfinispanSinkConsumerIT.java create mode 100644 debezium-server/debezium-server-infinispan/src/test/java/io/debezium/server/infinispan/InfinispanTestConfigSource.java create mode 100644 debezium-server/debezium-server-infinispan/src/test/java/io/debezium/server/infinispan/InfinispanTestResourceLifecycleManager.java create mode 100644 debezium-server/debezium-server-infinispan/src/test/resources/META-INF/services/org.eclipse.microprofile.config.spi.ConfigSource create mode 100644 debezium-server/debezium-server-infinispan/src/test/resources/infinispan-local.xml diff --git a/debezium-server/debezium-server-bom/pom.xml b/debezium-server/debezium-server-bom/pom.xml index 1b5a770b1..f572f5546 100644 --- a/debezium-server/debezium-server-bom/pom.xml +++ b/debezium-server/debezium-server-bom/pom.xml @@ -19,6 +19,7 @@ 0.9.1 2.16.3 2.2.3 + 14.0.4.Final 1.2 @@ -135,6 +136,11 @@ debezium-server-nats-jetstream ${project.version} + + io.debezium + debezium-server-infinispan + ${project.version} + io.debezium debezium-server-core @@ -198,6 +204,13 @@ pravega-client ${version.pravega} + + org.infinispan + infinispan-bom + ${version.infinispan} + pom + import + diff --git a/debezium-server/debezium-server-dist/pom.xml b/debezium-server/debezium-server-dist/pom.xml index f99023431..eddaceddc 100644 --- a/debezium-server/debezium-server-dist/pom.xml +++ b/debezium-server/debezium-server-dist/pom.xml @@ -121,6 +121,10 @@ io.debezium debezium-server-nats-jetstream + + io.debezium + debezium-server-infinispan + io.quarkus quarkus-logging-json diff --git a/debezium-server/debezium-server-infinispan/pom.xml b/debezium-server/debezium-server-infinispan/pom.xml new file mode 100644 index 000000000..60484db70 --- /dev/null +++ b/debezium-server/debezium-server-infinispan/pom.xml @@ -0,0 +1,137 @@ + + + + io.debezium + debezium-server + 2.2.0-SNAPSHOT + ../pom.xml + + 4.0.0 + debezium-server-infinispan + Debezium Server Infinispan Sink Adapter + jar + + + + io.debezium + debezium-server-core + + + org.infinispan + infinispan-client-hotrod + + + + + io.debezium + debezium-server-core + test-jar + test + + + io.debezium + debezium-testing-testcontainers + test + + + org.infinispan + infinispan-core + test + + + + + + + io.quarkus + quarkus-maven-plugin + ${quarkus.version} + + + + build + + + + + + org.jboss.jandex + jandex-maven-plugin + + + make-index + + jandex + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + + + integration-test + + integration-test + + + + verify + + verify + + + + + ${skipITs} + true + + IT + + ${runOrder} + + + + org.apache.maven.plugins + maven-javadoc-plugin + + private + true + + + + + + + quick + + false + + quick + + + + true + true + + + + + skip-integration-tests + + false + + skipITs + + + + true + + + + diff --git a/debezium-server/debezium-server-infinispan/src/main/java/io/debezium/server/infinispan/InfinispanSinkConsumer.java b/debezium-server/debezium-server-infinispan/src/main/java/io/debezium/server/infinispan/InfinispanSinkConsumer.java new file mode 100644 index 000000000..ef9e57af7 --- /dev/null +++ b/debezium-server/debezium-server-infinispan/src/main/java/io/debezium/server/infinispan/InfinispanSinkConsumer.java @@ -0,0 +1,128 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.server.infinispan; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.enterprise.context.Dependent; +import javax.enterprise.inject.Instance; +import javax.inject.Inject; +import javax.inject.Named; + +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.ConfigProvider; +import org.infinispan.client.hotrod.RemoteCache; +import org.infinispan.client.hotrod.RemoteCacheManager; +import org.infinispan.client.hotrod.configuration.ConfigurationBuilder; +import org.infinispan.client.hotrod.impl.ConfigurationProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.DebeziumException; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.server.BaseChangeConsumer; +import io.debezium.server.CustomConsumerBuilder; + +/** + * An implementation of the {@link DebeziumEngine.ChangeConsumer} interface that publishes change event messages to predefined Infinispan cache. + * + * @author vjuranek + */ +@Named("infinispan") +@Dependent +public class InfinispanSinkConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer> { + + private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanSinkConsumer.class); + + private static final String CONF_PREFIX = "debezium.sink.infinispan."; + private static final String SERVER_HOST = CONF_PREFIX + "server.host"; + private static final String SERVER_PORT = CONF_PREFIX + "server.port"; + private static final String CACHE_NAME = CONF_PREFIX + "cache"; + private static final String USER_NAME = CONF_PREFIX + "user"; + private static final String PASSWORD = CONF_PREFIX + "password"; + + private RemoteCacheManager remoteCacheManager; + private RemoteCache cache; + + @Inject + @CustomConsumerBuilder + Instance customCache; + + @PostConstruct + void connect() { + if (customCache.isResolvable()) { + cache = customCache.get(); + LOGGER.info("Obtained custom cache with configuration '{}'", cache.getRemoteCacheContainer().getConfiguration()); + return; + } + + final Config config = ConfigProvider.getConfig(); + final String serverHost = config.getValue(SERVER_HOST, String.class); + final String cacheName = config.getValue(CACHE_NAME, String.class); + final Integer serverPort = config.getOptionalValue(SERVER_PORT, Integer.class).orElse(ConfigurationProperties.DEFAULT_HOTROD_PORT); + final Optional user = config.getOptionalValue(USER_NAME, String.class); + final Optional password = config.getOptionalValue(PASSWORD, String.class); + + ConfigurationBuilder builder = new ConfigurationBuilder(); + String uri; + if (user.isPresent() && password.isPresent()) { + uri = String.format("hotrod://%s:%s@%s:%d", user.get(), password.get(), serverHost, serverPort); + } + else { + uri = String.format("hotrod://%s:%d", serverHost, serverPort); + } + LOGGER.info("Connecting to the Infinispan server using URI '{}'", uri); + builder.uri(uri); + + remoteCacheManager = new RemoteCacheManager(builder.build()); + cache = remoteCacheManager.getCache(cacheName); + LOGGER.info("Connected to the Infinispan server {}", remoteCacheManager.getServers()[0]); + } + + @PreDestroy + void close() { + try { + if (remoteCacheManager != null) { + remoteCacheManager.close(); + LOGGER.info("Connection to Infinispan server closed."); + } + } + catch (Exception e) { + throw new DebeziumException(e); + } + } + + @Override + public void handleBatch(List> records, DebeziumEngine.RecordCommitter> committer) + throws InterruptedException { + Map entries = new HashMap<>(records.size()); + for (ChangeEvent record : records) { + if (record.value() != null) { + LOGGER.trace("Received event {} = '{}'", getString(record.key()), getString(record.value())); + entries.put(record.key(), record.value()); + } + } + + try { + cache.putAll(entries); + } + catch (Exception e) { + throw new DebeziumException(e); + } + + for (ChangeEvent record : records) { + committer.markProcessed(record); + } + + committer.markBatchFinished(); + } +} diff --git a/debezium-server/debezium-server-infinispan/src/main/resources/META-INF/beans.xml b/debezium-server/debezium-server-infinispan/src/main/resources/META-INF/beans.xml new file mode 100644 index 000000000..e69de29bb diff --git a/debezium-server/debezium-server-infinispan/src/test/java/io/debezium/server/infinispan/InfinispanSinkConsumerIT.java b/debezium-server/debezium-server-infinispan/src/test/java/io/debezium/server/infinispan/InfinispanSinkConsumerIT.java new file mode 100644 index 000000000..80222a7cf --- /dev/null +++ b/debezium-server/debezium-server-infinispan/src/test/java/io/debezium/server/infinispan/InfinispanSinkConsumerIT.java @@ -0,0 +1,73 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.server.infinispan; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; + +import org.awaitility.Awaitility; +import org.infinispan.Cache; +import org.infinispan.client.hotrod.RemoteCache; +import org.infinispan.client.hotrod.RemoteCacheManager; +import org.infinispan.client.hotrod.configuration.ConfigurationBuilder; +import org.infinispan.manager.DefaultCacheManager; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; + +import io.debezium.server.DebeziumServer; +import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager; +import io.debezium.util.Testing; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; + +/** + * @author vjuranek + */ +@QuarkusTest +@QuarkusTestResource(PostgresTestResourceLifecycleManager.class) +@QuarkusTestResource(InfinispanTestResourceLifecycleManager.class) +public class InfinispanSinkConsumerIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanSinkConsumerIT.class); + private static final int MESSAGE_COUNT = 4; + + static { + Testing.Files.delete(InfinispanTestConfigSource.OFFSET_STORE_PATH); + Testing.Files.createTestingFile(InfinispanTestConfigSource.OFFSET_STORE_PATH); + } + + @Inject + DebeziumServer server; + + private DefaultCacheManager cacheManager; + private Cache cache; + + @Test + public void testStreaming() throws Exception { + ConfigurationBuilder builder = new ConfigurationBuilder(); + String uri = String.format("hotrod://%s:%s@%s:%d", + InfinispanTestConfigSource.USER_NAME, + InfinispanTestConfigSource.PASSWORD, + InfinispanTestResourceLifecycleManager.getHost(), + InfinispanTestResourceLifecycleManager.getPort()); + LOGGER.info("Connected to Infinispan server using URI '{}'", uri); + builder.uri(uri); + RemoteCacheManager remoteCacheManager = new RemoteCacheManager(builder.build()); + RemoteCache remoteCache = remoteCacheManager.getCache(InfinispanTestConfigSource.CACHE_NAME); + + assertThat(remoteCache.size()).isEqualTo(0); + + Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> { + return remoteCache.size() == MESSAGE_COUNT; + }); + + assertThat(remoteCache.size()).isEqualTo(MESSAGE_COUNT); + } +} diff --git a/debezium-server/debezium-server-infinispan/src/test/java/io/debezium/server/infinispan/InfinispanTestConfigSource.java b/debezium-server/debezium-server-infinispan/src/test/java/io/debezium/server/infinispan/InfinispanTestConfigSource.java new file mode 100644 index 000000000..db4696ab1 --- /dev/null +++ b/debezium-server/debezium-server-infinispan/src/test/java/io/debezium/server/infinispan/InfinispanTestConfigSource.java @@ -0,0 +1,45 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.server.infinispan; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; + +import io.debezium.server.TestConfigSource; + +public class InfinispanTestConfigSource extends TestConfigSource { + + public static final String CACHE_NAME = "debezium_test"; + public static final String USER_NAME = "admin"; + public static final String PASSWORD = "secret"; + public static final String CONFIG_FILE = "infinispan-local.xml"; + + public InfinispanTestConfigSource() { + Map infinispanTest = new HashMap<>(); + + infinispanTest.put("debezium.sink.type", "infinispan"); + infinispanTest.put("debezium.sink.infinispan.cache", CACHE_NAME); + infinispanTest.put("debezium.sink.infinispan.user", USER_NAME); + infinispanTest.put("debezium.sink.infinispan.password", PASSWORD); + infinispanTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); + infinispanTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); + infinispanTest.put("debezium.source.offset.flush.interval.ms", "0"); + infinispanTest.put("debezium.source.topic.prefix", "testc"); + infinispanTest.put("debezium.source.schema.include.list", "inventory"); + infinispanTest.put("debezium.source.table.include.list", "inventory.customers"); + + config = infinispanTest; + } + + @Override + public int getOrdinal() { + // Configuration property precedence is based on ordinal values and since we override the + // properties in TestConfigSource, we should give this a higher priority. + return super.getOrdinal() + 1; + } +} diff --git a/debezium-server/debezium-server-infinispan/src/test/java/io/debezium/server/infinispan/InfinispanTestResourceLifecycleManager.java b/debezium-server/debezium-server-infinispan/src/test/java/io/debezium/server/infinispan/InfinispanTestResourceLifecycleManager.java new file mode 100644 index 000000000..4fa41bd88 --- /dev/null +++ b/debezium-server/debezium-server-infinispan/src/test/java/io/debezium/server/infinispan/InfinispanTestResourceLifecycleManager.java @@ -0,0 +1,70 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.server.infinispan; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.infinispan.client.hotrod.impl.ConfigurationProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +public class InfinispanTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanTestResourceLifecycleManager.class); + public static final String INFINISPAN_IMAGE = "quay.io/infinispan/server:14.0.4.Final"; + public static final int PORT = ConfigurationProperties.DEFAULT_HOTROD_PORT; + public static final String CONFIG_PATH = "/etc/infinispan-local.xml"; + private static final GenericContainer container = new GenericContainer<>(INFINISPAN_IMAGE) + .withExposedPorts(PORT) + .withClasspathResourceMapping(InfinispanTestConfigSource.CONFIG_FILE, CONFIG_PATH, BindMode.READ_ONLY) + .withCommand("-c", CONFIG_PATH) + .withEnv("USER", InfinispanTestConfigSource.USER_NAME) + .withEnv("PASS", InfinispanTestConfigSource.PASSWORD); + + private static final AtomicBoolean running = new AtomicBoolean(false); + + private static synchronized void init() { + if (!running.get()) { + container.start(); + running.set(true); + } + } + + public static String getHost() { + return container.getHost(); + } + + public static int getPort() { + return container.getMappedPort(PORT); + } + + @Override + public Map start() { + init(); + + Map params = new ConcurrentHashMap<>(); + params.put("debezium.sink.infinispan.server.host", getHost()); + params.put("debezium.sink.infinispan.server.port", String.valueOf(getPort())); + return params; + } + + @Override + public void stop() { + try { + container.stop(); + } + catch (Exception e) { + // ignored + } + running.set(false); + } +} diff --git a/debezium-server/debezium-server-infinispan/src/test/resources/META-INF/services/org.eclipse.microprofile.config.spi.ConfigSource b/debezium-server/debezium-server-infinispan/src/test/resources/META-INF/services/org.eclipse.microprofile.config.spi.ConfigSource new file mode 100644 index 000000000..5cc056a6c --- /dev/null +++ b/debezium-server/debezium-server-infinispan/src/test/resources/META-INF/services/org.eclipse.microprofile.config.spi.ConfigSource @@ -0,0 +1 @@ +io.debezium.server.infinispan.InfinispanTestConfigSource \ No newline at end of file diff --git a/debezium-server/debezium-server-infinispan/src/test/resources/infinispan-local.xml b/debezium-server/debezium-server-infinispan/src/test/resources/infinispan-local.xml new file mode 100644 index 000000000..a934b6858 --- /dev/null +++ b/debezium-server/debezium-server-infinispan/src/test/resources/infinispan-local.xml @@ -0,0 +1,40 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/debezium-server/pom.xml b/debezium-server/pom.xml index b0cc2f59e..6277983c6 100644 --- a/debezium-server/pom.xml +++ b/debezium-server/pom.xml @@ -25,6 +25,7 @@ debezium-server-pravega debezium-server-nats-streaming debezium-server-nats-jetstream + debezium-server-infinispan