DBZ-3621 Added debezium jdbc storage module

This commit is contained in:
Kanthi Subramanian 2023-04-05 15:05:03 -04:00 committed by Jiri Pechanec
parent 8be3806525
commit aca0da75e1
13 changed files with 2092 additions and 0 deletions

View File

@@ -0,0 +1,2 @@
Run `mvn clean verify` to run the integration tests.
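The module pom below also defines the usual Debezium build switches: pass `-DskipITs` to skip the Docker-based integration tests, `-Dquick` to skip Docker handling entirely, and `-Passembly` to run the integration tests against the full set of database containers.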

View File

@@ -0,0 +1,541 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage</artifactId>
<version>2.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-storage-jdbc</artifactId>
<name>Debezium Storage JDBC Module</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<scope>provided</scope>
</dependency>
<!-- Testing -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-ddl-parser</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.36.0.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<version>7.3.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>7.3.1</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<!--
Specify the properties for the various Docker containers.
-->
<mysql.user>mysqluser</mysql.user>
<mysql.password>mysqlpw</mysql.password>
<mysql.replica.user>mysqlreplica</mysql.replica.user>
<mysql.replica.password>mysqlpw</mysql.replica.password>
<mysql.port>3306</mysql.port>
<mysql.percona.port>3306</mysql.percona.port>
<mysql.gtid.port>3306</mysql.gtid.port>
<mysql.gtid.replica.port>3306</mysql.gtid.replica.port>
<mysql.ssl.port>3306</mysql.ssl.port>
<mysql.replica.port>3306</mysql.replica.port> <!-- by default use primary as 'replica' -->
<mysql.init.timeout>60000</mysql.init.timeout> <!-- 60 seconds -->
<apicurio.port>8080</apicurio.port>
<apicurio.init.timeout>60000</apicurio.init.timeout> <!-- 60 seconds -->
<!--
By default, use the Docker image maintained by the MySQL team; the profiles below override this property.
Note that we run one container with GTIDs enabled and one without.
-->
<docker.dbs>debezium/mysql-server-test-database</docker.dbs>
<docker.filter>${docker.dbs}</docker.filter>
<docker.skip>false</docker.skip>
<docker.initimage>rm -f /etc/localtime; ln -s /usr/share/zoneinfo/US/Samoa /etc/localtime</docker.initimage>
</properties>
<build>
<plugins>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<configuration>
<watchInterval>500</watchInterval>
<logDate>default</logDate>
<verbose>true</verbose>
<!--autoPull>always</autoPull-->
<images>
</images>
</configuration>
<!--
Connect this plugin to the maven lifecycle around the integration-test phase:
start the container in pre-integration-test and stop it in post-integration-test.
-->
<executions>
<execution>
<id>start</id>
<phase>pre-integration-test</phase>
<goals>
<goal>build</goal>
<goal>start</goal>
</goals>
</execution>
<execution>
<id>stop</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
<!--
Unlike surefire, the failsafe plugin ensures 'post-integration-test' phase always runs, even
when there are failed integration tests. We rely upon this to always shut down the Docker container
after the integration tests (defined as '*IT.java') are run.
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<skipTests>${skipITs}</skipTests>
<enableAssertions>true</enableAssertions>
<systemPropertyVariables>
<!-- Make these available to the tests via system properties -->
<database.hostname>${docker.host.address}</database.hostname>
<database.port>${mysql.port}</database.port>
<database.user>${mysql.user}</database.user>
<database.password>${mysql.password}</database.password>
<database.replica.hostname>${docker.host.address}</database.replica.hostname>
<database.replica.port>${mysql.replica.port}</database.replica.port>
<skipLongRunningTests>${skipLongRunningTests}</skipLongRunningTests>
</systemPropertyVariables>
<runOrder>${runOrder}</runOrder>
</configuration>
<executions>
<execution>
<id>integration-test</id>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
<execution>
<id>verify</id>
<goals>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
</plugin>
</plugins>
<resources>
<!-- Apply the properties set in the POM to the resource files -->
<resource>
<filtering>true</filtering>
<directory>src/main/resources</directory>
<includes>
<include>*</include>
<include>**/*</include>
</includes>
</resource>
</resources>
<testResources>
<testResource>
<directory>src/test/resources</directory>
<filtering>false</filtering>
<includes>
<include>*</include>
<include>**/*</include>
</includes>
</testResource>
</testResources>
</build>
<!--
Define several useful profiles
-->
<profiles>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This assembly profile is used during official builds. In addition to
compiling, and running the unit and integration tests like the non-assembly
profiles, this profile creates additional artifacts (like the connector plugin archives),
starts up all three Docker containers (normal MySQL, MySQL+GTIDs, and alt-MySQL)
and runs the integration tests against each of them.
To use, specify "-Passembly" on the Maven command line.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<profile>
<id>assembly</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<!-- Run multiple images at the same time, but use different ports for all MySQL servers -->
<docker.dbs>
debezium/mysql-server-test-database,debezium/mysql-server-gtids-test-database,debezium/mysql-server-gtids-test-database-replica,debezium/percona-server-test-database,debezium/mysql-server-test-database-ssl
</docker.dbs>
<mysql.port>4301</mysql.port>
<mysql.gtid.port>4302</mysql.gtid.port>
<mysql.gtid.replica.port>4303</mysql.gtid.replica.port>
<mysql.percona.port>4304</mysql.percona.port>
<mysql.ssl.port>4305</mysql.ssl.port>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${version.assembly.plugin}</version>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-assembly-descriptors</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>default</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-${project.version}</finalName>
<attach>true</attach> <!-- we want to attach & deploy these to Maven -->
<descriptorRefs>
<descriptorRef>${assembly.descriptor}</descriptorRef>
</descriptorRefs>
<tarLongFileMode>posix</tarLongFileMode>
</configuration>
</execution>
</executions>
</plugin>
<!--
Override the failsafe plugin to run the integration tests for each set of databases.
But make sure each database server is used only once ...
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${version.failsafe.plugin}</version>
<configuration>
<skipTests>${skipITs}</skipTests>
<enableAssertions>true</enableAssertions>
<systemPropertyVariables>
<!-- Make these available to the tests via system properties -->
<database.hostname>${docker.host.address}</database.hostname>
<database.user>${mysql.user}</database.user>
<database.password>${mysql.password}</database.password>
<database.replica.hostname>${docker.host.address}</database.replica.hostname>
<database.replica.user>${mysql.replica.user}</database.replica.user>
<database.replica.password>${mysql.replica.password}</database.replica.password>
<database.port>${mysql.port}</database.port>
<database.replica.port>${mysql.port}</database.replica.port>
<database.ssl.mode>disabled</database.ssl.mode>
<skipLongRunningTests>false</skipLongRunningTests>
</systemPropertyVariables>
<runOrder>${runOrder}</runOrder>
</configuration>
<executions>
<!-- First run the integration tests with the non-GTID server alone -->
<execution>
<id>integration-test-mysql</id>
<goals>
<goal>integration-test</goal>
</goals>
<configuration>
<systemPropertyVariables>
<!-- same port for both, since we're only running one server -->
<database.port>${mysql.port}</database.port>
<database.replica.port>${mysql.port}</database.replica.port>
</systemPropertyVariables>
</configuration>
</execution>
<!-- Then run the integration tests with the GTID server + replica server -->
<execution>
<id>integration-test-mysql-gtids-with-replica</id>
<goals>
<goal>integration-test</goal>
</goals>
<configuration>
<systemPropertyVariables>
<database.port>${mysql.gtid.port}</database.port>
<database.replica.port>${mysql.gtid.replica.port}</database.replica.port>
</systemPropertyVariables>
</configuration>
</execution>
<!-- Then just Percona Server -->
<execution>
<id>integration-test-percona-server</id>
<goals>
<goal>integration-test</goal>
</goals>
<configuration>
<systemPropertyVariables>
<!-- same port for both, since we're only running one server -->
<database.port>${mysql.percona.port}</database.port>
<database.replica.port>${mysql.percona.port}</database.replica.port>
</systemPropertyVariables>
</configuration>
</execution>
<!-- SSL -->
<execution>
<id>integration-test-ssl</id>
<goals>
<goal>integration-test</goal>
</goals>
<configuration>
<systemPropertyVariables>
<database.ssl.mode>verify_ca</database.ssl.mode>
<database.ssl.truststore>${project.basedir}/src/test/resources/ssl/truststore</database.ssl.truststore>
<database.ssl.truststore.password>debezium</database.ssl.truststore.password>
<database.ssl.keystore>${project.basedir}/src/test/resources/ssl/keystore</database.ssl.keystore>
<database.ssl.keystore.password>debezium</database.ssl.keystore.password>
<!-- same port for both, since we're only running one server -->
<database.port>${mysql.ssl.port}</database.port>
<database.replica.port>${mysql.ssl.port}</database.replica.port>
</systemPropertyVariables>
</configuration>
</execution>
<execution>
<id>verify</id>
<goals>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Do not perform any Docker-related functionality
To use, specify "-DskipITs" on the Maven command line.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<profile>
<id>skip-integration-tests</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>skipITs</name>
</property>
</activation>
<properties>
<docker.skip>true</docker.skip>
</properties>
</profile>
<profile>
<id>quick</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>quick</name>
</property>
</activation>
<properties>
<skipITs>true</skipITs>
<docker.skip>true</docker.skip>
</properties>
</profile>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Use the alternative Docker image for MySQL.
To use, specify "-Dmysql-gtids" or -Pmysql-gtids on the Maven command line.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<profile>
<id>mysql-gtids</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>mysql-gtids</name>
</property>
</activation>
<properties>
<!-- Docker properties -->
<docker.dbs>debezium/mysql-server-gtids-test-database</docker.dbs>
<!-- Integration test properties -->
<database.port>${mysql.gtid.port}</database.port>
<database.replica.port>${mysql.gtid.port}</database.replica.port>
</properties>
</profile>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Use the Docker image for Percona Server.
To use, specify "-Dpercona-server" or -Ppercona-server on the Maven command line.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<profile>
<id>percona-server</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>percona-server</name>
</property>
</activation>
<properties>
<!-- Docker properties -->
<docker.dbs>debezium/percona-server-test-database</docker.dbs>
<!-- Integration test properties -->
<database.port>${mysql.percona.port}</database.port>
<database.replica.port>${mysql.percona.port}</database.replica.port>
<docker.initimage>ln -s /usr/share/zoneinfo/Pacific/Pago_Pago /etc/localtime</docker.initimage>
</properties>
</profile>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Use the Docker image for a MySQL replica of another MySQL server
configured to use GTIDs. To use, specify "-Dmysql-replica"
or -Pmysql-replica on the Maven command line.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<profile>
<id>mysql-replica</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>mysql-replica</name>
</property>
</activation>
<properties>
<!-- Docker properties -->
<mysql.gtid.port>3306</mysql.gtid.port>
<mysql.gtid.replica.port>4306</mysql.gtid.replica.port>
<docker.dbs>debezium/mysql-server-gtids-test-database,debezium/mysql-server-gtids-test-database-replica</docker.dbs>
<!-- Integration test properties -->
<database.port>${mysql.gtid.port}</database.port>
<database.replica.port>${mysql.gtid.replica.port}</database.replica.port>
</properties>
</profile>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Use the alternative Docker image for MySQL.
To use, specify "-Dmysql-ssl" or -Pmysql-ssl on the Maven command line.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<profile>
<id>mysql-ssl</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>mysql-ssl</name>
</property>
</activation>
<properties>
<!-- Docker properties -->
<docker.dbs>debezium/mysql-server-test-database-ssl</docker.dbs>
<!-- Integration test properties -->
<database.port>${mysql.ssl.port}</database.port>
<database.replica.port>${mysql.ssl.port}</database.replica.port>
<docker.initimage>ln -s /usr/share/zoneinfo/Pacific/Pago_Pago /etc/localtime</docker.initimage>
</properties>
</profile>
<profile>
<id>apicurio</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>apicurio</name>
</property>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<use.apicurio>true</use.apicurio>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<docker.filter>${docker.dbs},apicurio/apicurio-registry-mem:${version.apicurio}</docker.filter>
</properties>
</profile>
</profiles>
</project>

View File

@@ -0,0 +1,44 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.jdbc;
import static io.debezium.storage.jdbc.JdbcOffsetBackingStore.OFFSET_STORAGE_JDBC_PASSWORD;
import static io.debezium.storage.jdbc.JdbcOffsetBackingStore.OFFSET_STORAGE_JDBC_URI;
import static io.debezium.storage.jdbc.JdbcOffsetBackingStore.OFFSET_STORAGE_JDBC_USER;
import static io.debezium.storage.jdbc.JdbcOffsetBackingStore.OFFSET_STORAGE_TABLE_NAME;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.runtime.WorkerConfig;
public class JdbcConfig extends WorkerConfig {
private static final ConfigDef CONFIG;
static {
CONFIG = WorkerConfig.baseConfigDef()
.define(OFFSET_STORAGE_JDBC_URI.name(),
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
"Jdbc database uri")
.define(OFFSET_STORAGE_JDBC_USER.name(),
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
"Jdbc database username")
.define(OFFSET_STORAGE_JDBC_PASSWORD.name(),
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
"Jdbc database password")
.define(OFFSET_STORAGE_TABLE_NAME.name(),
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
"Name of the table to store offsets");
}
public JdbcConfig(Map<String, String> props) {
super(CONFIG, props);
}
}

View File

@@ -0,0 +1,252 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.jdbc;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Field;
/**
* Implementation of {@link OffsetBackingStore} that saves data to a database table.
*/
public class JdbcOffsetBackingStore implements OffsetBackingStore {
public static final Field OFFSET_STORAGE_JDBC_URI = Field.create("offset.storage.jdbc.uri")
.withDescription("URI of the database which will be used to record the database history")
.withValidation(Field::isRequired);
public static final Field OFFSET_STORAGE_JDBC_USER = Field.create("offset.storage.jdbc.user")
.withDescription("Username of the database which will be used to record the database history")
.withValidation(Field::isRequired);
public static final Field OFFSET_STORAGE_JDBC_PASSWORD = Field.create("offset.storage.jdbc.password")
.withDescription("Password of the database which will be used to record the database history")
.withValidation(Field::isRequired);
public static final String DEFAULT_OFFSET_STORAGE_TABLE_NAME = "debezium_offset_storage";
public static final Field OFFSET_STORAGE_TABLE_NAME = Field.create("offset.storage.jdbc.offset_table_name")
.withDescription("Name of the table to store offsets")
.withDefault(DEFAULT_OFFSET_STORAGE_TABLE_NAME);
public static final String OFFSET_STORAGE_TABLE_DDL = "CREATE TABLE %s(id VARCHAR(36) NOT NULL, offset_key VARCHAR(1255), offset_val VARCHAR(1255)," +
"record_insert_ts TIMESTAMP NOT NULL," +
"record_insert_seq INTEGER NOT NULL" +
")";
public static final String OFFSET_STORAGE_TABLE_SELECT = "SELECT id, offset_key, offset_val FROM %s ORDER BY record_insert_ts, record_insert_seq";
public static final String OFFSET_STORAGE_TABLE_INSERT = "INSERT INTO %s VALUES ( ?, ?, ?, ?, ? )";
public static final String OFFSET_STORAGE_TABLE_DELETE = "DELETE FROM %s";
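// With the default table name, OFFSET_STORAGE_TABLE_DDL renders as:
// CREATE TABLE debezium_offset_storage(id VARCHAR(36) NOT NULL, offset_key VARCHAR(1255), offset_val VARCHAR(1255),record_insert_ts TIMESTAMP NOT NULL,record_insert_seq INTEGER NOT NULL)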
private static final Logger LOG = LoggerFactory.getLogger(JdbcOffsetBackingStore.class);
protected ConcurrentHashMap<String, String> data = new ConcurrentHashMap<>();
protected ExecutorService executor;
private final AtomicInteger recordInsertSeq = new AtomicInteger(0);
private Connection conn;
private String jdbcUri;
private String offsetStorageTableName;
public JdbcOffsetBackingStore() {
}
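// Offsets arrive from Connect as ByteBuffers but are persisted as strings;
// these helpers round-trip between the two representations using UTF-16.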
public String fromByteBuffer(ByteBuffer data) {
return (data != null) ? String.valueOf(StandardCharsets.UTF_16.decode(data.asReadOnlyBuffer())) : null;
}
public ByteBuffer toByteBuffer(String data) {
return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_16)) : null;
}
@Override
public void configure(WorkerConfig config) {
try {
jdbcUri = config.getString("offset.storage.jdbc.uri");
offsetStorageTableName = config.getString(OFFSET_STORAGE_TABLE_NAME.name());
conn = DriverManager.getConnection(jdbcUri, config.getString(OFFSET_STORAGE_JDBC_USER.name()), config.getString(OFFSET_STORAGE_JDBC_PASSWORD.name()));
conn.setAutoCommit(false);
}
catch (Exception e) {
throw new IllegalStateException("Failed to connect JDBC offset backing store: " + jdbcUri, e);
}
}
@Override
public synchronized void start() {
executor = Executors.newFixedThreadPool(1, ThreadUtils.createThreadFactory(
this.getClass().getSimpleName() + "-%d", false));
LOG.info("Starting JdbcOffsetBackingStore db {}", jdbcUri);
try {
initializeTable();
}
catch (SQLException e) {
throw new IllegalStateException("Failed to create JDBC offset table: " + jdbcUri, e);
}
load();
}
private void initializeTable() throws SQLException {
DatabaseMetaData dbMeta = conn.getMetaData();
ResultSet tableExists = dbMeta.getTables(null, null, offsetStorageTableName, null);
if (tableExists.next()) {
return;
}
LOG.debug("Creating table {} to store offset", offsetStorageTableName);
conn.prepareStatement(String.format(OFFSET_STORAGE_TABLE_DDL, offsetStorageTableName)).execute();
}
protected void save() {
try {
LOG.debug("Saving data to state table...");
try (PreparedStatement sqlDelete = conn.prepareStatement(String.format(OFFSET_STORAGE_TABLE_DELETE, offsetStorageTableName))) {
sqlDelete.executeUpdate();
for (Map.Entry<String, String> mapEntry : data.entrySet()) {
Timestamp currentTs = new Timestamp(System.currentTimeMillis());
String key = mapEntry.getKey();
String value = mapEntry.getValue();
// Execute a query
try (PreparedStatement sql = conn.prepareStatement(String.format(OFFSET_STORAGE_TABLE_INSERT, offsetStorageTableName))) {
sql.setString(1, UUID.randomUUID().toString());
sql.setString(2, key);
sql.setString(3, value);
sql.setTimestamp(4, currentTs);
sql.setInt(5, recordInsertSeq.incrementAndGet());
sql.executeUpdate();
}
}
}
conn.commit();
}
catch (SQLException e) {
try {
conn.rollback();
}
catch (SQLException ex) {
// ignore
}
throw new ConnectException(e);
}
}
private void load() {
try {
ConcurrentHashMap<String, String> tmpData = new ConcurrentHashMap<>();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(String.format(OFFSET_STORAGE_TABLE_SELECT, offsetStorageTableName));
while (rs.next()) {
String key = rs.getString("offset_key");
String val = rs.getString("offset_val");
tmpData.put(key, val);
}
data = tmpData;
}
catch (SQLException e) {
LOG.error("Failed recover records from database: {}", jdbcUri, e);
}
}
private void stopExecutor() {
if (executor != null) {
executor.shutdown();
// Best effort wait for any get() and set() tasks (and caller's callbacks) to complete.
try {
executor.awaitTermination(30, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (!executor.shutdownNow().isEmpty()) {
throw new ConnectException("Failed to stop JdbcOffsetBackingStore. Exiting without cleanly " +
"shutting down pending tasks and/or callbacks.");
}
executor = null;
}
}
@Override
public synchronized void stop() {
stopExecutor();
try {
if (conn != null) {
conn.close();
}
}
catch (SQLException e) {
LOG.error("Exception while stopping JdbcOffsetBackingStore", e);
}
LOG.info("Stopped JdbcOffsetBackingStore");
}
@Override
public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values,
final Callback<Void> callback) {
return executor.submit(new Callable<>() {
@Override
public Void call() {
for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
if (entry.getKey() == null) {
continue;
}
data.put(fromByteBuffer(entry.getKey()), fromByteBuffer(entry.getValue()));
}
save();
if (callback != null) {
callback.onCompletion(null, null);
}
return null;
}
});
}
@Override
public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) {
return executor.submit(new Callable<Map<ByteBuffer, ByteBuffer>>() {
@Override
public Map<ByteBuffer, ByteBuffer> call() {
Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
for (ByteBuffer key : keys) {
result.put(key, toByteBuffer(data.get(fromByteBuffer(key))));
}
return result;
}
});
}
}
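A minimal standalone usage sketch, mirroring JdbcOffsetBackingStoreTest later in this commit; the SQLite URI and credentials are illustrative and assume the sqlite-jdbc driver (a test-scope dependency above) is on the classpath:

Map<String, String> props = new HashMap<>();
props.put(JdbcOffsetBackingStore.OFFSET_STORAGE_JDBC_URI.name(), "jdbc:sqlite:/tmp/offsets.db"); // illustrative path
props.put(JdbcOffsetBackingStore.OFFSET_STORAGE_JDBC_USER.name(), "user");
props.put(JdbcOffsetBackingStore.OFFSET_STORAGE_JDBC_PASSWORD.name(), "pass");
props.put(JdbcOffsetBackingStore.OFFSET_STORAGE_TABLE_NAME.name(), "offsets_jdbc");
props.put(StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
props.put(StandaloneConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
JdbcOffsetBackingStore store = new JdbcOffsetBackingStore();
store.configure(new JdbcConfig(props));
store.start();
// set(...) and get(...) then operate on ByteBuffer keys and values, per the OffsetBackingStore contract
store.stop();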

View File

@@ -0,0 +1,303 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.jdbc.history;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.common.annotation.Incubating;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.util.Collect;
import io.debezium.util.FunctionalReadWriteLock;
/**
* A {@link SchemaHistory} implementation that stores the schema history in a database table
*
* @author Ismail Simsek
*/
@ThreadSafe
@Incubating
public final class JdbcSchemaHistory extends AbstractSchemaHistory {
private static final Logger LOG = LoggerFactory.getLogger(JdbcSchemaHistory.class);
public static final Field JDBC_URI = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "jdbc.uri")
.withDescription("URI of the database which will be used to record the database history")
.withValidation(Field::isRequired);
public static final Field JDBC_USER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "jdbc.user")
.withDescription("Username of the database which will be used to record the database history")
.withValidation(Field::isRequired);
public static final Field JDBC_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "jdbc.password")
.withDescription("Password of the database which will be used to record the database history")
.withValidation(Field::isRequired);
private static final String DATABASE_HISTORY_TABLE_NAME = "debezium_database_history";
private static final String DATABASE_HISTORY_TABLE_DDL = "CREATE TABLE " + DATABASE_HISTORY_TABLE_NAME +
"(" +
"id VARCHAR(36) NOT NULL," +
"history_data VARCHAR(65000)," +
"history_data_seq INTEGER," +
"record_insert_ts TIMESTAMP NOT NULL," +
"record_insert_seq INTEGER NOT NULL" +
")";
private static final String DATABASE_HISTORY_TABLE_SELECT = "SELECT id, history_data, history_data_seq FROM " + DATABASE_HISTORY_TABLE_NAME
+ " ORDER BY record_insert_ts, record_insert_seq, id, history_data_seq";
private static final String DATABASE_HISTORY_TABLE_DATA_EXISTS_SELECT = "SELECT * FROM " + DATABASE_HISTORY_TABLE_NAME + " LIMIT 1";
private static final String DATABASE_HISTORY_TABLE_INSERT = "INSERT INTO " + DATABASE_HISTORY_TABLE_NAME + " VALUES ( ?, ?, ?, ?, ? )";
private static final Collection<Field> ALL_FIELDS = Collect.arrayListOf(JDBC_URI, JDBC_USER, JDBC_PASSWORD);
private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant();
private final DocumentWriter writer = DocumentWriter.defaultWriter();
private final DocumentReader reader = DocumentReader.defaultReader();
private final AtomicBoolean running = new AtomicBoolean();
private final AtomicInteger recordInsertSeq = new AtomicInteger(0);
private Connection conn;
private String jdbcUri;
@Override
public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
if (!config.validateAndRecord(ALL_FIELDS, LOG::error)) {
throw new ConnectException(
"Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
}
if (running.get()) {
throw new IllegalStateException("Database history already initialized db: " + config.getString(JDBC_URI));
}
super.configure(config, comparator, listener, useCatalogBeforeSchema);
try {
jdbcUri = config.getString(JDBC_URI.name());
conn = DriverManager.getConnection(config.getString(JDBC_URI.name()), config.getString(JDBC_USER.name()), config.getString(JDBC_PASSWORD.name()));
conn.setAutoCommit(false);
}
catch (SQLException e) {
throw new IllegalStateException("Failed to connect " + jdbcUri);
}
}
@Override
public void start() {
super.start();
lock.write(() -> {
if (running.compareAndSet(false, true)) {
if (conn == null) {
throw new IllegalStateException("Database connection must be set before it is started");
}
try {
if (!storageExists()) {
initializeStorage();
}
}
catch (Exception e) {
throw new SchemaHistoryException("Unable to create history table " + jdbcUri + ": " + e.getMessage(), e);
}
}
});
}
@Override
protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
if (record == null) {
return;
}
lock.write(() -> {
if (!running.get()) {
throw new IllegalStateException("The history has been stopped and will not accept more records");
}
try {
String line = writer.write(record.document());
Timestamp currentTs = new Timestamp(System.currentTimeMillis());
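// history_data is declared as VARCHAR(65000), so longer serialized records are stored as ordered chunks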
List<String> substrings = split(line, 65000);
int partSeq = 0;
for (String dataPart : substrings) {
PreparedStatement sql = conn.prepareStatement(DATABASE_HISTORY_TABLE_INSERT);
sql.setString(1, UUID.randomUUID().toString());
sql.setString(2, dataPart);
sql.setInt(3, partSeq);
sql.setTimestamp(4, currentTs);
sql.setInt(5, recordInsertSeq.incrementAndGet());
sql.executeUpdate();
partSeq++;
}
conn.commit();
}
catch (IOException | SQLException e) {
try {
conn.rollback();
}
catch (SQLException ex) {
// ignore
}
throw new SchemaHistoryException("Failed to store record: " + record, e);
}
});
}
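/**
* Splits a string into consecutive chunks of at most {@code chunkSize} characters.
*/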
private static List<String> split(String s, int chunkSize) {
List<String> chunks = new ArrayList<>();
for (int i = 0; i < s.length(); i += chunkSize) {
chunks.add(s.substring(i, Math.min(s.length(), i + chunkSize)));
}
return chunks;
}
@Override
public void stop() {
running.set(false);
super.stop();
try {
conn.close();
}
catch (SQLException e) {
LOG.error("Exception during stop", e);
}
}
@Override
protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
lock.write(() -> {
try {
if (exists()) {
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(DATABASE_HISTORY_TABLE_SELECT);
String prevHistoryId = "-1";
// StringBuilder historyData = new StringBuilder();
while (rs.next()) {
String historyId = rs.getString("id");
String historyData = rs.getString("history_data");
if (historyData.isEmpty() == false) {
records.accept(new HistoryRecord(reader.read(historyData)));
}
// String historyId = rs.getString("id");
// // System.out.println("HISTORY ID" + historyId);
// if (!historyId.equals(prevHistoryId)) {
// if (historyData.length() > 0) {
// // System.out.println("RECORDS ACCEPT");
// records.accept(new HistoryRecord(reader.read(historyData.toString())));
// }
// else {
// // System.out.println("HISTORY DATA EMPTY");
// }
// prevHistoryId = historyId;
// historyData = new StringBuilder();
// }
// historyData.append(rs.getString("history_data"));
// // System.out.println("HISTORY DATA" + historyData);
}
}
else {
// System.out.println("STORAGE DOES NOT EXIST");
LOG.error("Storage does not exist when recovering records");
}
}
catch (IOException | SQLException e) {
throw new SchemaHistoryException("Failed to recover records", e);
}
});
}
@Override
public boolean storageExists() {
boolean sExists = false;
try {
DatabaseMetaData dbMeta = conn.getMetaData();
ResultSet tableExists = dbMeta.getTables(null, null, DATABASE_HISTORY_TABLE_NAME, null);
if (tableExists.next()) {
sExists = true;
}
}
catch (SQLException e) {
throw new SchemaHistoryException("Failed to check database history storage", e);
}
return sExists;
}
@Override
public boolean exists() {
if (!storageExists()) {
return false;
}
boolean isExists = false;
try {
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(DATABASE_HISTORY_TABLE_DATA_EXISTS_SELECT);
if (rs.next()) {
isExists = true;
}
}
catch (SQLException e) {
throw new SchemaHistoryException("Failed to recover records", e);
}
return isExists;
}
@Override
public String toString() {
return "Jdbc database: " + (jdbcUri != null ? jdbcUri : "(unstarted)");
}
@Override
public void initializeStorage() {
try {
DatabaseMetaData dbMeta = conn.getMetaData();
ResultSet tableExists = dbMeta.getTables(null, null, DATABASE_HISTORY_TABLE_NAME, null);
if (tableExists.next()) {
return;
}
LOG.debug("Creating table {} to store database history", DATABASE_HISTORY_TABLE_NAME);
conn.prepareStatement(DATABASE_HISTORY_TABLE_DDL).execute();
LOG.info("Created table in given database...");
}
catch (SQLException e) {
throw new SchemaHistoryException("Error initializing Database history storage", e);
}
}
}
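A sketch of selecting this implementation in a connector configuration, mirroring the FileSchemaHistory wiring used in the integration test later in this commit; the SQLite URI is illustrative, and JDBC_URI.name() resolves to the prefixed property name (schema.history.internal.jdbc.uri, assuming the standard Debezium 2.x CONFIGURATION_FIELD_PREFIX_STRING):

Configuration config = Configuration.create()
.with(MySqlConnectorConfig.SCHEMA_HISTORY, JdbcSchemaHistory.class)
.with(JdbcSchemaHistory.JDBC_URI.name(), "jdbc:sqlite:/tmp/schema-history.db") // illustrative path
.with(JdbcSchemaHistory.JDBC_USER.name(), "user")
.with(JdbcSchemaHistory.JDBC_PASSWORD.name(), "pass")
.build();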

View File

@@ -0,0 +1,222 @@
-- In production you would almost certainly limit the replication user to the follower (slave) machine,
-- to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
-- However, in this database we'll grant 3 users different privileges:
--
-- 1) 'replicator' - all privileges required by the binlog reader (setup through 'readbinlog.sql')
-- 2) 'snapper' - all privileges required by the snapshot reader AND binlog reader
-- 3) 'mysqluser' - all privileges
--
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator' IDENTIFIED BY 'replpass';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'snapper'@'%' IDENTIFIED BY 'snapperpass';
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: emptydb
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE emptydb;
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: readbinlog_test
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE readbinlog_test;
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: connector_test
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE connector_test;
USE connector_test;
-- Create and populate our products using a single insert with many rows
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter",3.14),
(default,"car battery","12V car battery",8.1),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
(default,"hammer","12oz carpenter's hammer",0.75),
(default,"hammer","14oz carpenter's hammer",0.875),
(default,"hammer","16oz carpenter's hammer",1.0),
(default,"rocks","box of assorted rocks",5.3),
(default,"jacket","water resistent black wind breaker",0.1),
(default,"spare tire","24 inch spare tire",22.2);
-- Create and populate the products on hand using multiple inserts
CREATE TABLE products_on_hand (
product_id INTEGER NOT NULL PRIMARY KEY,
quantity INTEGER NOT NULL,
FOREIGN KEY (product_id) REFERENCES products(id)
);
INSERT INTO products_on_hand VALUES (101,3);
INSERT INTO products_on_hand VALUES (102,8);
INSERT INTO products_on_hand VALUES (103,18);
INSERT INTO products_on_hand VALUES (104,4);
INSERT INTO products_on_hand VALUES (105,5);
INSERT INTO products_on_hand VALUES (106,0);
INSERT INTO products_on_hand VALUES (107,44);
INSERT INTO products_on_hand VALUES (108,2);
INSERT INTO products_on_hand VALUES (109,5);
-- Create some customers ...
CREATE TABLE customers (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;
INSERT INTO customers
VALUES (default,"Sally","Thomas","sally.thomas@acme.com"),
(default,"George","Bailey","gbailey@foobar.com"),
(default,"Edward","Walker","ed@walker.com"),
(default,"Anne","Kretchmar","annek@noanswer.org");
-- Create some very simple orders
CREATE TABLE orders (
order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATE NOT NULL,
purchaser INTEGER NOT NULL,
quantity INTEGER NOT NULL,
product_id INTEGER NOT NULL,
FOREIGN KEY order_customer (purchaser) REFERENCES customers(id),
FOREIGN KEY ordered_product (product_id) REFERENCES products(id)
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2016-01-16', 1001, 1, 102),
(default, '2016-01-17', 1002, 2, 105),
(default, '2016-02-18', 1004, 3, 109),
(default, '2016-02-19', 1002, 2, 106),
(default, '2016-02-21', 1003, 1, 107);
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: connector_test_ro
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE connector_test_ro;
USE connector_test_ro;
-- Create and populate our products using a single insert with many rows
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter",3.14),
(default,"car battery","12V car battery",8.1),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
(default,"hammer","12oz carpenter's hammer",0.75),
(default,"hammer","14oz carpenter's hammer",0.875),
(default,"hammer","16oz carpenter's hammer",1.0),
(default,"rocks","box of assorted rocks",5.3),
(default,"jacket","water resistent black wind breaker",0.1),
(default,"spare tire","24 inch spare tire",22.2);
-- Create and populate the products on hand using multiple inserts
CREATE TABLE products_on_hand (
product_id INTEGER NOT NULL PRIMARY KEY,
quantity INTEGER NOT NULL,
FOREIGN KEY (product_id) REFERENCES products(id)
);
INSERT INTO products_on_hand VALUES (101,3);
INSERT INTO products_on_hand VALUES (102,8);
INSERT INTO products_on_hand VALUES (103,18);
INSERT INTO products_on_hand VALUES (104,4);
INSERT INTO products_on_hand VALUES (105,5);
INSERT INTO products_on_hand VALUES (106,0);
INSERT INTO products_on_hand VALUES (107,44);
INSERT INTO products_on_hand VALUES (108,2);
INSERT INTO products_on_hand VALUES (109,5);
-- Create some customers ...
CREATE TABLE customers (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;
INSERT INTO customers
VALUES (default,"Sally","Thomas","sally.thomas@acme.com"),
(default,"George","Bailey","gbailey@foobar.com"),
(default,"Edward","Walker","ed@walker.com"),
(default,"Anne","Kretchmar","annek@noanswer.org");
-- Create some very simple orders
CREATE TABLE orders (
order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATE NOT NULL,
purchaser INTEGER NOT NULL,
quantity INTEGER NOT NULL,
product_id INTEGER NOT NULL,
FOREIGN KEY order_customer (purchaser) REFERENCES customers(id),
FOREIGN KEY ordered_product (product_id) REFERENCES products(id)
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2016-01-16', 1001, 1, 102),
(default, '2016-01-17', 1002, 2, 105),
(default, '2016-02-18', 1004, 3, 109),
(default, '2016-02-19', 1002, 2, 106),
(default, '2016-02-21', 1003, 1, 107);
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: regression_test
-- ----------------------------------------------------------------------------------------------------------------
-- The integration test for this database expects to scan all of the binlog events associated with this database
-- without errors or problems. The integration test does not modify any records in this database, so this script
-- must contain all operations performed against these tables.
CREATE DATABASE regression_test;
USE regression_test;
-- DBZ-61 handle binary value recorded as hex string value
CREATE TABLE t1464075356413_testtable6 (
pk_column int auto_increment NOT NULL,
varbinary_col varbinary(20) NOT NULL,
PRIMARY KEY(pk_column)
);
INSERT INTO t1464075356413_testtable6 (pk_column, varbinary_col)
VALUES(default, 0x4D7953514C);
-- DBZ-84 Handle TINYINT
CREATE TABLE dbz84_integer_types_table (
-- The column lengths are used for display purposes, and do not affect the range of values
colTinyIntA tinyint NOT NULL DEFAULT 100,
colTinyIntB tinyint(1) NOT NULL DEFAULT 101,
colTinyIntC tinyint(2) UNSIGNED NOT NULL DEFAULT 102,
colTinyIntD tinyint(3) UNSIGNED NOT NULL DEFAULT 103,
colSmallIntA smallint NOT NULL DEFAULT 200,
colSmallIntB smallint(1) NOT NULL DEFAULT 201,
colSmallIntC smallint(2) NOT NULL DEFAULT 201,
colSmallIntD smallint(3) NOT NULL DEFAULT 201,
colMediumIntA mediumint NOT NULL DEFAULT 300,
colMediumIntB mediumint(1) NOT NULL DEFAULT 301,
colMediumIntC mediumint(2) NOT NULL DEFAULT 302,
colMediumIntD mediumint(3) NOT NULL DEFAULT 303,
colIntA int NOT NULL DEFAULT 400,
colIntB int(1) NOT NULL DEFAULT 401,
colIntC int(2) NOT NULL DEFAULT 402,
colIntD int(3) NOT NULL DEFAULT 403,
colBigIntA bigint NOT NULL DEFAULT 500,
colBigIntB bigint(1) NOT NULL DEFAULT 501,
colBigIntC bigint(2) NOT NULL DEFAULT 502,
colBigIntD bigint(3) NOT NULL DEFAULT 503
);
INSERT INTO dbz84_integer_types_table
VALUES(127,-128,128,255, default,201,202,203, default,301,302,303, default,401,402,403, default,501,502,503);

View File

@@ -0,0 +1,61 @@
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
# --------------------------------------------------------------------------------------------
# This section specifies 5.5 and cross-version common configurations
# --------------------------------------------------------------------------------------------
[mysqld]
#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
skip-host-cache
skip-name-resolve
#datadir=/var/lib/mysql
#socket=/var/lib/mysql/mysql.sock
#secure-file-priv=/var/lib/mysql-files
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
#log-error=/var/log/mysqld.log
#pid-file=/var/run/mysqld/mysqld.pid
# Enable binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary, expiration can be short for integration tests but would
# be longer on a production system. Row-level info is required for ingest to work.
# Server ID is required, but this will vary on production systems
server-id = 112233
log_bin = mysql-bin
expire_logs_days = 1
binlog_format = row
# --------------------------------------------------------------------------------------------
# This section specifies 5.6 specific configurations
# --------------------------------------------------------------------------------------------
[mysqld-5.6]
default_authentication_plugin = mysql_native_password
# --------------------------------------------------------------------------------------------
# This section specifies 5.7 specific configurations
# --------------------------------------------------------------------------------------------
[mysqld-5.7]
default_authentication_plugin = mysql_native_password
# --------------------------------------------------------------------------------------------
# This section specifies 8.0 specific configurations
# --------------------------------------------------------------------------------------------
[mysqld-8.0]
default_authentication_plugin = mysql_native_password

View File

@@ -0,0 +1,156 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.jdbc;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static io.debezium.storage.jdbc.JdbcOffsetBackingStore.OFFSET_STORAGE_JDBC_PASSWORD;
import static io.debezium.storage.jdbc.JdbcOffsetBackingStore.OFFSET_STORAGE_JDBC_URI;
import static io.debezium.storage.jdbc.JdbcOffsetBackingStore.OFFSET_STORAGE_JDBC_USER;
import static io.debezium.storage.jdbc.JdbcOffsetBackingStore.OFFSET_STORAGE_TABLE_NAME;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.UniqueDatabase;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.storage.file.history.FileSchemaHistory;
import io.debezium.util.Testing;
/**
* @author Kanthi Subramanian
*/
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")
public class JdbcOffsetBackingStoreIT extends AbstractConnectorTest {
private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-connect.txt").toAbsolutePath();
private final UniqueDatabase DATABASE = new UniqueDatabase("myServer1", "connector_test")
.withDbHistoryPath(SCHEMA_HISTORY_PATH);
private final UniqueDatabase RO_DATABASE = new UniqueDatabase("myServer2", "connector_test_ro", DATABASE)
.withDbHistoryPath(SCHEMA_HISTORY_PATH);
private Configuration config;
@Before
public void beforeEach() {
initializeConnectorTestFramework();
}
@After
public void afterEach() {
stopConnector();
}
@Test
public void shouldStartCorrectlyWithJDBCOffsetStorage() throws SQLException, InterruptedException, IOException {
String masterPort = System.getProperty("database.port", "3306");
String replicaPort = System.getProperty("database.replica.port", "3306");
boolean replicaIsMaster = masterPort.equals(replicaPort);
if (!replicaIsMaster) {
// Give time for the replica to catch up to the master ...
Thread.sleep(5000L);
}
File dbFile = File.createTempFile("test-", "db");
String jdbcUri = String.format("jdbc:sqlite:%s", dbFile.getAbsolutePath());
// Use the DB configuration to define the connector's configuration to use the "replica"
// which may be the same as the "master" ...
config = Configuration.create()
.with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost"))
.with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306"))
.with(MySqlConnectorConfig.USER, "mysqluser")
.with(MySqlConnectorConfig.PASSWORD, "mysqlpw")
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(CommonConnectorConfig.TOPIC_PREFIX, DATABASE.getServerName())
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, "connector_test")
.with("database.whitelist", "connector_test")
.with(MySqlConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER, 10_000)
.with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
.with(OFFSET_STORAGE_JDBC_URI.name(), jdbcUri)
.with(OFFSET_STORAGE_JDBC_USER.name(), "user")
.with(OFFSET_STORAGE_JDBC_PASSWORD.name(), "pass")
.with(OFFSET_STORAGE_TABLE_NAME.name(), "offsets_jdbc")
.with("snapshot.mode", "initial")
.with("offset.flush.interval.ms", "1000")
.with("offset.storage", "io.debezium.storage.jdbc.JdbcOffsetBackingStore")
.build();
// Start the connector ...
start(MySqlConnector.class, config);
Thread.sleep(4000);
validateIfDataIsCreatedInJDBCDatabase(jdbcUri, "user", "pass", "offsets_jdbc");
}
/**
* Validates the offset storage data that the connector created
* in the JDBC database.
*
* @param jdbcUri JDBC URI of the offset database
* @param jdbcUser database username
* @param jdbcPassword database password
* @param jdbcTableName name of the offsets table
*/
private void validateIfDataIsCreatedInJDBCDatabase(String jdbcUri, String jdbcUser,
String jdbcPassword, String jdbcTableName) {
// create a database connection and verify each stored row
try (Connection connection = DriverManager.getConnection(jdbcUri, jdbcUser, jdbcPassword);
Statement statement = connection.createStatement()) {
statement.setQueryTimeout(30); // set timeout to 30 sec.
ResultSet rs = statement.executeQuery(String.format("select * from %s", jdbcTableName));
while (rs.next()) {
String offsetKey = rs.getString("offset_key");
String offsetValue = rs.getString("offset_val");
String recordInsertTimestamp = rs.getString("record_insert_ts");
String recordInsertSequence = rs.getString("record_insert_seq");
// a blank string is also empty, so a single isBlank() check covers both
Assert.assertFalse(offsetKey.isBlank());
Assert.assertFalse(offsetValue.isBlank());
Assert.assertFalse(recordInsertTimestamp.isBlank());
Assert.assertFalse(recordInsertSequence.isBlank());
}
}
catch (SQLException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -0,0 +1,115 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.jdbc;
import static io.debezium.storage.jdbc.JdbcOffsetBackingStore.OFFSET_STORAGE_JDBC_PASSWORD;
import static io.debezium.storage.jdbc.JdbcOffsetBackingStore.OFFSET_STORAGE_JDBC_URI;
import static io.debezium.storage.jdbc.JdbcOffsetBackingStore.OFFSET_STORAGE_JDBC_USER;
import static io.debezium.storage.jdbc.JdbcOffsetBackingStore.OFFSET_STORAGE_TABLE_NAME;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.util.Callback;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* @author Ismail Simsek
*/
public class JdbcOffsetBackingStoreTest {
private final Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>();
private final Map<ByteBuffer, ByteBuffer> secondSet = new HashMap<>();
JdbcOffsetBackingStore store;
Map<String, String> props;
JdbcConfig config;
File dbFile;
@Before
public void setup() throws IOException {
dbFile = File.createTempFile("test-", "db");
store = new JdbcOffsetBackingStore();
props = new HashMap<>();
props.put(OFFSET_STORAGE_JDBC_URI.name(), "jdbc:sqlite:" + dbFile.getAbsolutePath());
props.put(OFFSET_STORAGE_JDBC_USER.name(), "user");
props.put(OFFSET_STORAGE_JDBC_PASSWORD.name(), "pass");
props.put(OFFSET_STORAGE_TABLE_NAME.name(), "offsets_jdbc");
props.put(StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
props.put(StandaloneConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
config = new JdbcConfig(props);
store.configure(config);
store.start();
firstSet.put(store.toByteBuffer("key"), store.toByteBuffer("value"));
firstSet.put(null, null);
secondSet.put(store.toByteBuffer("key1secondSet"), store.toByteBuffer("value1secondSet"));
secondSet.put(store.toByteBuffer("key2secondSet"), store.toByteBuffer("value2secondSet"));
}
@After
public void teardown() {
dbFile.delete();
}
@Test
public void testInitialize() {
// multiple initializations should not fail;
// the first one creates the table and subsequent starts reuse it
store.start();
store.start();
store.start();
}
@Test
public void testGetSet() throws Exception {
Callback<Void> cb = (error, result) -> {
};
store.set(firstSet, cb).get();
Map<ByteBuffer, ByteBuffer> values = store.get(Arrays.asList(store.toByteBuffer("key"), store.toByteBuffer("bad"))).get();
assertEquals(store.toByteBuffer("value"), values.get(store.toByteBuffer("key")));
Assert.assertNull(values.get(store.toByteBuffer("bad")));
}
@Test
public void testSaveRestore() throws Exception {
Callback<Void> cb = (error, result) -> {
};
store.set(firstSet, cb).get();
store.set(secondSet, cb).get();
store.stop();
// Restore into a new store and make sure it is correctly reloaded
JdbcOffsetBackingStore restore = new JdbcOffsetBackingStore();
restore.configure(config);
restore.start();
Map<ByteBuffer, ByteBuffer> values = restore.get(Collections.singletonList(store.toByteBuffer("key"))).get();
Map<ByteBuffer, ByteBuffer> values2 = restore.get(Collections.singletonList(store.toByteBuffer("key1secondSet"))).get();
Map<ByteBuffer, ByteBuffer> values3 = restore.get(Collections.singletonList(store.toByteBuffer("key2secondSet"))).get();
assertEquals(store.toByteBuffer("value"), values.get(store.toByteBuffer("key")));
assertEquals(store.toByteBuffer("value1secondSet"), values2.get(store.toByteBuffer("key1secondSet")));
assertEquals(store.toByteBuffer("value2secondSet"), values3.get(store.toByteBuffer("key2secondSet")));
}
}

View File

@ -0,0 +1,212 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.jdbc.history;
import static io.debezium.storage.jdbc.JdbcOffsetBackingStore.OFFSET_STORAGE_JDBC_PASSWORD;
import static io.debezium.storage.jdbc.JdbcOffsetBackingStore.OFFSET_STORAGE_JDBC_URI;
import static io.debezium.storage.jdbc.JdbcOffsetBackingStore.OFFSET_STORAGE_JDBC_USER;
import static io.debezium.storage.jdbc.JdbcOffsetBackingStore.OFFSET_STORAGE_TABLE_NAME;
import static io.debezium.storage.jdbc.history.JdbcSchemaHistory.JDBC_PASSWORD;
import static io.debezium.storage.jdbc.history.JdbcSchemaHistory.JDBC_URI;
import static io.debezium.storage.jdbc.history.JdbcSchemaHistory.JDBC_USER;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Duration;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Configuration.Builder;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.connector.mysql.MySqlTestConnection;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.util.Testing;
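/**
 * Integration test that stores the MySQL connector's schema history in a SQLite
 * database via {@link JdbcSchemaHistory} and verifies streaming across restarts.
 */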
public class JdbcSchemaHistoryIT extends AbstractConnectorTest {
private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("schema-history.db").toAbsolutePath();
private static final String USER = "debezium";
private static final String PASSWORD = "dbz";
private static final String PRIVILEGED_USER = "mysqluser";
private static final String PRIVILEGED_PASSWORD = "mysqlpassword";
private static final String ROOT_PASSWORD = "debezium";
private static final String DBNAME = "inventory";
private static final String IMAGE = "debezium/example-mysql";
private static final Integer PORT = 3306;
private static final String TOPIC_PREFIX = "test";
private static final String TABLE_NAME = "schematest";
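// MySQL instance managed by Testcontainers; the "ready for connections" message is
// expected twice because the image's initialization restarts mysqld once.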
private static final GenericContainer<?> container = new GenericContainer<>(IMAGE)
.waitingFor(Wait.forLogMessage(".*mysqld: ready for connections.*", 2))
.withEnv("MYSQL_ROOT_PASSWORD", ROOT_PASSWORD)
.withEnv("MYSQL_USER", PRIVILEGED_USER)
.withEnv("MYSQL_PASSWORD", PRIVILEGED_PASSWORD)
.withExposedPorts(PORT)
.withStartupTimeout(Duration.ofSeconds(180));
@BeforeClass
public static void startDatabase() {
container.start();
}
@AfterClass
public static void stopDatabase() {
container.stop();
}
@Before
public void beforeEach() throws SQLException {
initializeConnectorTestFramework();
Testing.Files.delete(SCHEMA_HISTORY_PATH);
try (MySqlTestConnection conn = testConnection()) {
conn.execute(
"DROP TABLE IF EXISTS schematest",
"CREATE TABLE schematest (id INT PRIMARY KEY, val VARCHAR(16))",
"INSERT INTO schematest VALUES (1, 'one'), (2, 'two'), (3, 'three'), (4, 'four')");
}
stopConnector();
}
@After
public void afterEach() throws SQLException {
try {
stopConnector();
}
finally {
Testing.Files.delete(SCHEMA_HISTORY_PATH);
}
try (MySqlTestConnection conn = testConnection()) {
conn.execute("DROP TABLE IF EXISTS schematest");
}
}
private String topicName() {
return String.format("%s.%s.%s", TOPIC_PREFIX, DBNAME, TABLE_NAME);
}
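// Routes the connector's schema history into the SQLite file under test.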
protected Configuration.Builder schemaHistory(Configuration.Builder builder) {
return builder
.with(JDBC_URI, "jdbc:sqlite:" + SCHEMA_HISTORY_PATH)
.with(JDBC_USER, "user")
.with(JDBC_PASSWORD, "pass");
}
private Configuration.Builder config() throws IOException {
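// Each run gets a fresh throwaway SQLite file for offsets; the schema history location is added by schemaHistory().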
File dbFile = File.createTempFile("test-", "db");
String jdbcUri = String.format("jdbc:sqlite:%s", dbFile.getAbsolutePath());
final Builder builder = Configuration.create()
.with(MySqlConnectorConfig.HOSTNAME, container.getHost())
.with(MySqlConnectorConfig.PORT, container.getMappedPort(PORT))
.with(MySqlConnectorConfig.USER, USER)
.with(MySqlConnectorConfig.PASSWORD, PASSWORD)
.with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, DBNAME)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DBNAME + "." + TABLE_NAME)
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.SCHEMA_HISTORY, JdbcSchemaHistory.class)
.with(CommonConnectorConfig.TOPIC_PREFIX, TOPIC_PREFIX)
.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(OFFSET_STORAGE_JDBC_URI.name(), jdbcUri)
.with(OFFSET_STORAGE_JDBC_USER.name(), "user")
.with(OFFSET_STORAGE_JDBC_PASSWORD.name(), "pass")
.with(OFFSET_STORAGE_TABLE_NAME.name(), "offsets_jdbc");
return schemaHistory(builder);
}
private MySqlTestConnection testConnection() {
final JdbcConfiguration jdbcConfig = JdbcConfiguration.create()
.withHostname(container.getHost())
.withPort(container.getMappedPort(PORT))
.withUser(PRIVILEGED_USER)
.withPassword(PRIVILEGED_PASSWORD)
.withDatabase(DBNAME)
.build();
return new MySqlTestConnection(jdbcConfig);
}
@Test
public void shouldStreamChanges() throws InterruptedException, IOException {
Configuration config = config().build();
// Start the connector ...
start(MySqlConnector.class, config);
// Testing.Print.enable();
SourceRecords records = consumeRecordsByTopic(4); // 4 DML changes
assertThat(records.topics()).hasSize(1);
assertThat(records.recordsForTopic(topicName())).hasSize(4);
stopConnector();
}
@Test
public void shouldStreamChangesAfterRestart() throws InterruptedException, SQLException, IOException {
Configuration config = config().build();
// Start the connector ...
start(MySqlConnector.class, config);
waitForStreamingRunning("mysql", TOPIC_PREFIX);
// Testing.Print.enable();
SourceRecords records = consumeRecordsByTopic(4); // 4 DML changes
assertThat(records.topics()).hasSize(1);
assertThat(records.recordsForTopic(topicName())).hasSize(4);
stopConnector();
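// Insert a row while the connector is down; streaming must resume from the
// stored offset and emit exactly this one change after restart.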
try (MySqlTestConnection conn = testConnection()) {
conn.execute("INSERT INTO schematest VALUES (5, 'five')");
}
// Start the connector ...
start(MySqlConnector.class, config);
waitForStreamingRunning("mysql", TOPIC_PREFIX);
records = consumeRecordsByTopic(1); // 1 DML change
assertThat(records.topics()).hasSize(1);
assertThat(records.recordsForTopic(topicName())).hasSize(1);
final SourceRecord record = records.recordsForTopic(topicName()).get(0);
assertThat(((Struct) record.key()).get("id")).isEqualTo(5);
stopConnector();
}
}

View File

@ -0,0 +1,154 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.jdbc.history;
import static io.debezium.storage.jdbc.history.JdbcSchemaHistory.JDBC_PASSWORD;
import static io.debezium.storage.jdbc.history.JdbcSchemaHistory.JDBC_URI;
import static io.debezium.storage.jdbc.history.JdbcSchemaHistory.JDBC_USER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Types;
import java.time.Instant;
import java.util.Map;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryMetrics;
import io.debezium.relational.history.TableChanges;
import io.debezium.util.Collect;
/**
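 * Unit tests for {@link JdbcSchemaHistory} against a local SQLite database.
 *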
 * @author Ismail Simsek
*/
public class JdbcSchemaHistoryTest {
protected SchemaHistory history;
String dbFile = Paths.get(System.getProperty("java.io.tmpdir"), "test.db").toString();
static String databaseName = "db";
static String schemaName = "myschema";
static String ddl = "CREATE TABLE foo ( first VARCHAR(22) NOT NULL );";
static Map<String, Object> source;
static Map<String, Object> position;
static TableId tableId;
static Table table;
static TableChanges tableChanges;
static HistoryRecord historyRecord;
static Map<String, Object> position2;
static TableId tableId2;
static Table table2;
static TableChanges tableChanges2;
static HistoryRecord historyRecord2;
static DdlParser ddlParser = new TestingAntlrDdlParser();
static Instant currentInstant = Instant.now();
@BeforeClass
public static void beforeClass() {
source = Collect.linkMapOf("server", "abc");
position = Collect.linkMapOf("file", "x.log", "positionInt", 100, "positionLong", Long.MAX_VALUE, "entry", 1);
tableId = new TableId(databaseName, schemaName, "foo");
table = Table.editor()
.tableId(tableId)
.addColumn(Column.editor()
.name("first")
.jdbcType(Types.VARCHAR)
.type("VARCHAR")
.length(22)
.optional(false)
.create())
.setPrimaryKeyNames("first")
.create();
tableChanges = new TableChanges().create(table);
historyRecord = new HistoryRecord(source, position, databaseName, schemaName, ddl, tableChanges, currentInstant);
// second fixture: a different table recorded at a later position
position2 = Collect.linkMapOf("file", "x.log", "positionInt", 100, "positionLong", Long.MAX_VALUE, "entry", 2);
tableId2 = new TableId(databaseName, schemaName, "bar");
table2 = Table.editor()
.tableId(tableId2)
.addColumn(Column.editor()
.name("first")
.jdbcType(Types.VARCHAR)
.type("VARCHAR")
.length(22)
.optional(false)
.create())
.setPrimaryKeyNames("first")
.create();
tableChanges2 = new TableChanges().create(table2);
historyRecord2 = new HistoryRecord(source, position2, databaseName, schemaName, ddl, tableChanges2, currentInstant);
}
@Before
public void beforeEach() {
history = new JdbcSchemaHistory();
history.configure(Configuration.create()
.with(JDBC_URI, "jdbc:sqlite:" + dbFile)
.with(JDBC_USER, "user")
.with(JDBC_PASSWORD, "pass")
.build(), null, SchemaHistoryMetrics.NOOP, true);
history.start();
}
@After
public void afterEach() throws IOException {
if (history != null) {
history.stop();
}
Files.delete(Paths.get(dbFile));
}
@Test
public void shouldNotFailMultipleInitializeStorage() {
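// initializeStorage() must be idempotent: the first call creates the table and
// later calls reuse it. Storage then exists but holds no history records yet.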
history.initializeStorage();
history.initializeStorage();
history.initializeStorage();
assertTrue(history.storageExists());
assertFalse(history.exists());
}
@Test
public void shouldRecordChangesAndRecover() throws InterruptedException {
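// Recording the same change twice must still recover to a single table definition.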
history.record(source, position, databaseName, schemaName, ddl, tableChanges, currentInstant);
history.record(source, position, databaseName, schemaName, ddl, tableChanges, currentInstant);
Tables tables = new Tables();
history.recover(source, position, tables, ddlParser);
assertEquals(1, tables.size());
assertEquals(table, tables.forTable(tableId));
history.record(source, position2, databaseName, schemaName, ddl, tableChanges2, currentInstant);
history.record(source, position2, databaseName, schemaName, ddl, tableChanges2, currentInstant);
history.stop();
// after restart, it should recover history correctly
JdbcSchemaHistory history2 = new JdbcSchemaHistory();
history2.configure(Configuration.create()
.with(JDBC_URI, "jdbc:sqlite:" + dbFile)
.with(JDBC_USER, "user")
.with(JDBC_PASSWORD, "pass")
.build(), null, SchemaHistoryMetrics.NOOP, true);
history2.start();
assertTrue(history2.storageExists());
assertTrue(history2.exists());
Tables tables2 = new Tables();
history2.recover(source, position2, tables2, ddlParser);
assertEquals(2, tables2.size());
assertEquals(table2, tables2.forTable(tableId2));
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.jdbc.history;
import io.debezium.relational.SystemVariables;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.AbstractDdlParser;
/**
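 * A no-op {@link AbstractDdlParser} for tests where recovery reads the stored
 * table changes and never needs to parse DDL.
 *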
 * @author Ismail Simsek
*/
public class TestingAntlrDdlParser extends AbstractDdlParser {
public TestingAntlrDdlParser() {
super(false, false);
}
@Override
protected SystemVariables createNewSystemVariablesInstance() {
return null;
}
@Override
public void parse(String ddlContent, Tables databaseTables) {
}
}

View File

@ -24,5 +24,6 @@
<module>debezium-storage-tests</module>
<module>debezium-storage-rocketmq</module>
<module>debezium-storage-azure-blob</module>
<module>debezium-storage-jdbc</module>
</modules>
</project>