DBZ-5986 Add Debezium server sink for Infinispan

This commit is contained in:
Vojtech Juranek 2023-01-09 14:11:16 +01:00 committed by Jiri Pechanec
parent 5e56ad75f8
commit bf699ec8b7
11 changed files with 512 additions and 0 deletions

View File

@ -19,6 +19,7 @@
<version.pravega>0.9.1</version.pravega> <version.pravega>0.9.1</version.pravega>
<version.nats>2.16.3</version.nats> <version.nats>2.16.3</version.nats>
<version.stan>2.2.3</version.stan> <version.stan>2.2.3</version.stan>
<version.infinispan>14.0.4.Final</version.infinispan>
<version.commons.logging>1.2</version.commons.logging> <version.commons.logging>1.2</version.commons.logging>
</properties> </properties>
@ -135,6 +136,11 @@
<artifactId>debezium-server-nats-jetstream</artifactId> <artifactId>debezium-server-nats-jetstream</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-infinispan</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>io.debezium</groupId> <groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId> <artifactId>debezium-server-core</artifactId>
@ -198,6 +204,13 @@
<artifactId>pravega-client</artifactId> <artifactId>pravega-client</artifactId>
<version>${version.pravega}</version> <version>${version.pravega}</version>
</dependency> </dependency>
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-bom</artifactId>
<version>${version.infinispan}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
</project> </project>

View File

@ -121,6 +121,10 @@
<groupId>io.debezium</groupId> <groupId>io.debezium</groupId>
<artifactId>debezium-server-nats-jetstream</artifactId> <artifactId>debezium-server-nats-jetstream</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-infinispan</artifactId>
</dependency>
<dependency> <dependency>
<groupId>io.quarkus</groupId> <groupId>io.quarkus</groupId>
<artifactId>quarkus-logging-json</artifactId> <artifactId>quarkus-logging-json</artifactId>

View File

@ -0,0 +1,137 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-server</artifactId>
<version>2.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-server-infinispan</artifactId>
<name>Debezium Server Infinispan Sink Adapter</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
</dependency>
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-client-hotrod</artifactId>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-testing-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jboss.jandex</groupId>
<artifactId>jandex-maven-plugin</artifactId>
<executions>
<execution>
<id>make-index</id>
<goals>
<goal>jandex</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<id>integration-test</id>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
<execution>
<id>verify</id>
<goals>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<skipTests>${skipITs}</skipTests>
<enableAssertions>true</enableAssertions>
<systemProperties>
<test.type>IT</test.type>
</systemProperties>
<runOrder>${runOrder}</runOrder>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<show>private</show>
<nohelp>true</nohelp>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>quick</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>quick</name>
</property>
</activation>
<properties>
<skipITs>true</skipITs>
<docker.skip>true</docker.skip>
</properties>
</profile>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Do not perform any Docker-related functionality
To use, specify "-DskipITs" on the Maven command line.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<profile>
<id>skip-integration-tests</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>skipITs</name>
</property>
</activation>
<properties>
<docker.skip>true</docker.skip>
</properties>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,128 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.infinispan;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import javax.inject.Named;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.impl.ConfigurationProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
/**
* An implementation of the {@link DebeziumEngine.ChangeConsumer} interface that publishes change event messages to predefined Infinispan cache.
*
* @author vjuranek
*/
@Named("infinispan")
@Dependent
public class InfinispanSinkConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanSinkConsumer.class);
private static final String CONF_PREFIX = "debezium.sink.infinispan.";
private static final String SERVER_HOST = CONF_PREFIX + "server.host";
private static final String SERVER_PORT = CONF_PREFIX + "server.port";
private static final String CACHE_NAME = CONF_PREFIX + "cache";
private static final String USER_NAME = CONF_PREFIX + "user";
private static final String PASSWORD = CONF_PREFIX + "password";
private RemoteCacheManager remoteCacheManager;
private RemoteCache cache;
@Inject
@CustomConsumerBuilder
Instance<RemoteCache> customCache;
@PostConstruct
void connect() {
if (customCache.isResolvable()) {
cache = customCache.get();
LOGGER.info("Obtained custom cache with configuration '{}'", cache.getRemoteCacheContainer().getConfiguration());
return;
}
final Config config = ConfigProvider.getConfig();
final String serverHost = config.getValue(SERVER_HOST, String.class);
final String cacheName = config.getValue(CACHE_NAME, String.class);
final Integer serverPort = config.getOptionalValue(SERVER_PORT, Integer.class).orElse(ConfigurationProperties.DEFAULT_HOTROD_PORT);
final Optional<String> user = config.getOptionalValue(USER_NAME, String.class);
final Optional<String> password = config.getOptionalValue(PASSWORD, String.class);
ConfigurationBuilder builder = new ConfigurationBuilder();
String uri;
if (user.isPresent() && password.isPresent()) {
uri = String.format("hotrod://%s:%s@%s:%d", user.get(), password.get(), serverHost, serverPort);
}
else {
uri = String.format("hotrod://%s:%d", serverHost, serverPort);
}
LOGGER.info("Connecting to the Infinispan server using URI '{}'", uri);
builder.uri(uri);
remoteCacheManager = new RemoteCacheManager(builder.build());
cache = remoteCacheManager.getCache(cacheName);
LOGGER.info("Connected to the Infinispan server {}", remoteCacheManager.getServers()[0]);
}
@PreDestroy
void close() {
try {
if (remoteCacheManager != null) {
remoteCacheManager.close();
LOGGER.info("Connection to Infinispan server closed.");
}
}
catch (Exception e) {
throw new DebeziumException(e);
}
}
@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
Map<Object, Object> entries = new HashMap<>(records.size());
for (ChangeEvent<Object, Object> record : records) {
if (record.value() != null) {
LOGGER.trace("Received event {} = '{}'", getString(record.key()), getString(record.value()));
entries.put(record.key(), record.value());
}
}
try {
cache.putAll(entries);
}
catch (Exception e) {
throw new DebeziumException(e);
}
for (ChangeEvent<Object, Object> record : records) {
committer.markProcessed(record);
}
committer.markBatchFinished();
}
}

View File

@ -0,0 +1,73 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.infinispan;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import org.awaitility.Awaitility;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import io.debezium.server.DebeziumServer;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
/**
* @author vjuranek
*/
@QuarkusTest
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
@QuarkusTestResource(InfinispanTestResourceLifecycleManager.class)
public class InfinispanSinkConsumerIT {
private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanSinkConsumerIT.class);
private static final int MESSAGE_COUNT = 4;
static {
Testing.Files.delete(InfinispanTestConfigSource.OFFSET_STORE_PATH);
Testing.Files.createTestingFile(InfinispanTestConfigSource.OFFSET_STORE_PATH);
}
@Inject
DebeziumServer server;
private DefaultCacheManager cacheManager;
private Cache<String, String> cache;
@Test
public void testStreaming() throws Exception {
ConfigurationBuilder builder = new ConfigurationBuilder();
String uri = String.format("hotrod://%s:%s@%s:%d",
InfinispanTestConfigSource.USER_NAME,
InfinispanTestConfigSource.PASSWORD,
InfinispanTestResourceLifecycleManager.getHost(),
InfinispanTestResourceLifecycleManager.getPort());
LOGGER.info("Connected to Infinispan server using URI '{}'", uri);
builder.uri(uri);
RemoteCacheManager remoteCacheManager = new RemoteCacheManager(builder.build());
RemoteCache<Object, Object> remoteCache = remoteCacheManager.getCache(InfinispanTestConfigSource.CACHE_NAME);
assertThat(remoteCache.size()).isEqualTo(0);
Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> {
return remoteCache.size() == MESSAGE_COUNT;
});
assertThat(remoteCache.size()).isEqualTo(MESSAGE_COUNT);
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.infinispan;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import io.debezium.server.TestConfigSource;
public class InfinispanTestConfigSource extends TestConfigSource {
public static final String CACHE_NAME = "debezium_test";
public static final String USER_NAME = "admin";
public static final String PASSWORD = "secret";
public static final String CONFIG_FILE = "infinispan-local.xml";
public InfinispanTestConfigSource() {
Map<String, String> infinispanTest = new HashMap<>();
infinispanTest.put("debezium.sink.type", "infinispan");
infinispanTest.put("debezium.sink.infinispan.cache", CACHE_NAME);
infinispanTest.put("debezium.sink.infinispan.user", USER_NAME);
infinispanTest.put("debezium.sink.infinispan.password", PASSWORD);
infinispanTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
infinispanTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
infinispanTest.put("debezium.source.offset.flush.interval.ms", "0");
infinispanTest.put("debezium.source.topic.prefix", "testc");
infinispanTest.put("debezium.source.schema.include.list", "inventory");
infinispanTest.put("debezium.source.table.include.list", "inventory.customers");
config = infinispanTest;
}
@Override
public int getOrdinal() {
// Configuration property precedence is based on ordinal values and since we override the
// properties in TestConfigSource, we should give this a higher priority.
return super.getOrdinal() + 1;
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.infinispan;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.infinispan.client.hotrod.impl.ConfigurationProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
public class InfinispanTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {
private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanTestResourceLifecycleManager.class);
public static final String INFINISPAN_IMAGE = "quay.io/infinispan/server:14.0.4.Final";
public static final int PORT = ConfigurationProperties.DEFAULT_HOTROD_PORT;
public static final String CONFIG_PATH = "/etc/infinispan-local.xml";
private static final GenericContainer<?> container = new GenericContainer<>(INFINISPAN_IMAGE)
.withExposedPorts(PORT)
.withClasspathResourceMapping(InfinispanTestConfigSource.CONFIG_FILE, CONFIG_PATH, BindMode.READ_ONLY)
.withCommand("-c", CONFIG_PATH)
.withEnv("USER", InfinispanTestConfigSource.USER_NAME)
.withEnv("PASS", InfinispanTestConfigSource.PASSWORD);
private static final AtomicBoolean running = new AtomicBoolean(false);
private static synchronized void init() {
if (!running.get()) {
container.start();
running.set(true);
}
}
public static String getHost() {
return container.getHost();
}
public static int getPort() {
return container.getMappedPort(PORT);
}
@Override
public Map<String, String> start() {
init();
Map<String, String> params = new ConcurrentHashMap<>();
params.put("debezium.sink.infinispan.server.host", getHost());
params.put("debezium.sink.infinispan.server.port", String.valueOf(getPort()));
return params;
}
@Override
public void stop() {
try {
container.stop();
}
catch (Exception e) {
// ignored
}
running.set(false);
}
}

View File

@ -0,0 +1 @@
io.debezium.server.infinispan.InfinispanTestConfigSource

View File

@ -0,0 +1,40 @@
<infinispan
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:14.0 https://infinispan.org/schemas/infinispan-config-14.0.xsd
urn:infinispan:server:14.0 https://infinispan.org/schemas/infinispan-server-14.0.xsd"
xmlns="urn:infinispan:config:14.0"
xmlns:server="urn:infinispan:server:14.0">
<cache-container name="default" statistics="true">
<security>
<authorization/>
</security>
<local-cache name="debezium_test"/>
</cache-container>
<server xmlns="urn:infinispan:server:14.0">
<interfaces>
<interface name="public">
<inet-address value="${infinispan.bind.address:127.0.0.1}"/>
</interface>
</interfaces>
<socket-bindings default-interface="public" port-offset="${infinispan.socket.binding.port-offset:0}">
<socket-binding name="default" port="${infinispan.bind.port:11222}"/>
<socket-binding name="memcached" port="11221"/>
</socket-bindings>
<security>
<security-realms>
<security-realm name="default">
<properties-realm groups-attribute="Roles">
<user-properties path="users.properties"/>
<group-properties path="groups.properties"/>
</properties-realm>
</security-realm>
</security-realms>
</security>
<endpoints socket-binding="default" security-realm="default" />
</server>
</infinispan>

View File

@ -25,6 +25,7 @@
<module>debezium-server-pravega</module> <module>debezium-server-pravega</module>
<module>debezium-server-nats-streaming</module> <module>debezium-server-nats-streaming</module>
<module>debezium-server-nats-jetstream</module> <module>debezium-server-nats-jetstream</module>
<module>debezium-server-infinispan</module>
</modules> </modules>
<dependencyManagement> <dependencyManagement>