DBZ-5402 debezium-storage-s3 module

Hossein Torabi 2022-07-19 17:26:28 +02:00 committed by Jiri Pechanec
parent 9d38deb12b
commit 1004f803a5
7 changed files with 722 additions and 7 deletions

.github/workflows/debezium-workflow.yml

@@ -37,6 +37,7 @@ jobs:
mysql-ddl-parser-changed: ${{ steps.changed-files-mysql-ddl-parser.outputs.any_changed }}
oracle-ddl-parser-changed: ${{ steps.changed-files-oracle-ddl-parser.outputs.any_changed }}
documentation-only-changed: ${{ steps.changed-files-documentation.outputs.only_changed}}
storage-changed: ${{ steps.changed-files-storage.outputs.any_changed }}
steps:
- name: Checkout Action
uses: actions/checkout@v3
@@ -187,6 +188,13 @@ jobs:
files: |
documentation/**
- name: Get modified files (Storage)
id: changed-files-storage
uses: tj-actions/changed-files@v14.3
with:
files: |
debezium-storage/**
build_cache:
name: "Dependency Cache"
needs: [file_changes]
@@ -620,6 +628,38 @@ jobs:
-Dmaven.wagon.http.pool=false
-Dmaven.wagon.httpconnectionManager.ttlSeconds=120
build_storage:
needs: [check_style, file_changes]
if: ${{ needs.file_changes.outputs.common-changed == 'true' || needs.file_changes.outputs.mysql-changed == 'true' || needs.file_changes.outputs.mysql-ddl-parser-changed == 'true' || needs.file_changes.outputs.storage-changed == 'true' }}
name: "Storage"
runs-on: ubuntu-latest
steps:
- name: Checkout Action
uses: actions/checkout@v3
- name: Set up Java 17
uses: actions/setup-java@v2
with:
distribution: 'temurin'
java-version: 17
- name: Cache Maven Repository
uses: actions/cache@v2
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
restore-keys: |
maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
- name: Build Debezium Storage
run: >
./mvnw clean install -B -f debezium-storage/pom.xml -Passembly
-Dcheckstyle.skip=true
-Dformat.skip=true
-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn
-Dmaven.wagon.http.pool=false
-Dmaven.wagon.httpconnectionManager.ttlSeconds=120
build_cassandra:
needs: [check_style, file_changes]
if: ${{ needs.file_changes.outputs.common-changed == 'true' }}
@@ -714,7 +754,7 @@ jobs:
- name: Build Debezium (Core)
run: >
./core/mvnw clean install -f core/pom.xml
-pl debezium-assembly-descriptors,debezium-bom,debezium-core,debezium-embedded,:debezium-storage-file,:debezium-storage-kafka,:debezium-storage-redis,:debezium-ide-configs,:debezium-checkstyle,:debezium-revapi
-pl debezium-assembly-descriptors,debezium-bom,debezium-core,debezium-embedded,:debezium-ide-configs,:debezium-checkstyle,:debezium-revapi
-am
-DskipTests=true
-DskipITs=true

pom.xml

@@ -37,8 +37,12 @@
<version.graalvm.js>20.0.0</version.graalvm.js>
<!-- Storages -->
<!-- Redis -->
<version.jedis>4.1.1</version.jedis>
<!-- S3 -->
<version.s3>2.17.241</version.s3>
<!-- Testing -->
<version.junit>4.13.1</version.junit>
<version.fest>1.4</version.fest>
@@ -424,12 +428,21 @@
</dependency>
<!--Storages -->
<!-- Redis Storage -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${version.jedis}</version>
</dependency>
<!-- S3 Storage -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>${version.s3}</version>
</dependency>
<!-- Testing utilities -->
<dependency>
<groupId>org.testcontainers</groupId>
@@ -537,11 +550,26 @@
<artifactId>debezium-storage-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-s3</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-file</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-redis</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-tests</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-scripting</artifactId>
@@ -633,11 +661,6 @@
<artifactId>debezium-quarkus-outbox-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-redis</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Debezium test artifacts -->
<dependency>

debezium-storage/debezium-storage-s3/pom.xml

@@ -0,0 +1,87 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage</artifactId>
<version>2.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-storage-s3</artifactId>
<name>Debezium Storage S3 Module</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
</dependencies>
<!--
Define the assembly profile
-->
<profiles>
<profile>
<id>assembly</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${version.assembly.plugin}</version>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-assembly-descriptors</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>default</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-${project.version}</finalName>
<attach>true</attach> <!-- we want attach & deploy these to Maven -->
<descriptorRefs>
<descriptorRef>${assembly.descriptor}</descriptorRef>
</descriptorRefs>
<tarLongFileMode>posix</tarLongFileMode>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

debezium-storage/debezium-storage-s3/src/main/java/io/debezium/storage/s3/history/S3SchemaHistory.java

@@ -0,0 +1,291 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.s3.history;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.util.FunctionalReadWriteLock;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.Bucket;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
/** A {@link SchemaHistory} implementation that stores the schema history in a single S3 object.
*
* Because S3 does not support appending to an existing object, the history is also cached in main memory:
* {@link S3SchemaHistory#start()} fetches the history log from S3 and keeps the {@link HistoryRecord}s in memory,
* and each invocation of {@link S3SchemaHistory#storeRecord(HistoryRecord)} serializes the accumulated records and
* writes a new history object to S3.
*
* @author hossein.torabi
*/
@NotThreadSafe
public class S3SchemaHistory extends AbstractSchemaHistory {
private static final Logger LOGGER = LoggerFactory.getLogger(S3SchemaHistory.class);
public static final String ACCESS_KEY_ID_CONFIG = "s3.access.key.id";
public static final String SECRET_ACCESS_KEY_CONFIG = "s3.secret.access.key";
public static final String REGION_CONFIG = CONFIGURATION_FIELD_PREFIX_STRING + "s3.region.name";
public static final String BUCKET_CONFIG = CONFIGURATION_FIELD_PREFIX_STRING + "s3.bucket.name";
public static final String ENDPOINT_CONFIG = CONFIGURATION_FIELD_PREFIX_STRING + "s3.endpoint";
public static final String OBJECT_CONTENT_TYPE = "text/plain";
public static final Field ACCESS_KEY_ID = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + ACCESS_KEY_ID_CONFIG)
.withDisplayName("S3 access key id")
.withType(Type.STRING)
.withWidth(Width.LONG)
.withDefault("")
.withImportance(Importance.HIGH);
public static final Field SECRET_ACCESS_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + SECRET_ACCESS_KEY_CONFIG)
.withDisplayName("S3 secret access key")
.withType(Type.PASSWORD)
.withWidth(Width.LONG)
.withDefault("")
.withImportance(Importance.HIGH);
public static final Field REGION = Field.create(REGION_CONFIG)
.withDisplayName("S3 region")
.withWidth(Width.LONG)
.withType(Type.STRING)
.withImportance(Importance.MEDIUM);
public static final Field BUCKET = Field.create(BUCKET_CONFIG)
.withDisplayName("S3 object")
.withType(Type.STRING)
.withImportance(Importance.HIGH);
public static final Field ENDPOINT = Field.create(ENDPOINT_CONFIG)
.withDisplayName("S3 endpoint")
.withType(Type.STRING)
.withImportance(Importance.LOW);
public static final Field.Set ALL_FIELDS = Field.setOf(ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION, BUCKET, ENDPOINT);
private final AtomicBoolean running = new AtomicBoolean();
private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant();
private final DocumentWriter writer = DocumentWriter.defaultWriter();
private final DocumentReader reader = DocumentReader.defaultReader();
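// The whole history lives in one S3 object; the key embeds the name of the thread that created this instance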
private final String objectName = String.format("db-history-%s.log", Thread.currentThread().getName());
private String bucket = null;
private Region region = null;
private URI endpoint = null;
private AwsCredentialsProvider credentialsProvider = null;
private volatile S3Client client = null;
private volatile List<HistoryRecord> records = new ArrayList<>();
@Override
public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
super.configure(config, comparator, listener, useCatalogBeforeSchema);
if (!config.validateAndRecord(ALL_FIELDS, LOGGER::error)) {
throw new SchemaHistoryException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
}
bucket = config.getString(BUCKET);
region = Region.of(config.getString(REGION));
if (config.getString(ENDPOINT) != null) {
endpoint = URI.create(config.getString(ENDPOINT));
}
if (config.getString(ACCESS_KEY_ID).isEmpty() && config.getString(SECRET_ACCESS_KEY).isEmpty()) {
credentialsProvider = DefaultCredentialsProvider.create();
}
else {
AwsCredentials credentials = AwsBasicCredentials.create(config.getString(ACCESS_KEY_ID), config.getString(SECRET_ACCESS_KEY));
credentialsProvider = StaticCredentialsProvider.create(credentials);
}
}
@Override
public synchronized void start() {
if (client == null) {
S3ClientBuilder clientBuilder = S3Client.builder().credentialsProvider(credentialsProvider)
.region(region);
if (endpoint != null) {
clientBuilder.endpointOverride(endpoint);
}
client = clientBuilder.build();
}
lock.write(() -> {
if (running.compareAndSet(false, true)) {
if (!storageExists()) {
initializeStorage();
}
InputStream objectInputStream = null;
try {
GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(objectName).responseContentType(OBJECT_CONTENT_TYPE).build();
objectInputStream = client.getObject(request, ResponseTransformer.toInputStream());
}
catch (NoSuchKeyException e) {
// do nothing
}
catch (S3Exception e) {
throw new SchemaHistoryException("Can not retrieve history object from S3", e);
}
if (objectInputStream != null) {
try (BufferedReader historyReader = new BufferedReader(new InputStreamReader(objectInputStream, StandardCharsets.UTF_8))) {
while (true) {
String line = historyReader.readLine();
if (line == null) {
break;
}
if (!line.isEmpty()) {
records.add(new HistoryRecord(reader.read(line)));
}
}
}
catch (IOException e) {
throw new SchemaHistoryException("Unable to read object content", e);
}
}
}
});
super.start();
}
@Override
public synchronized void stop() {
if (running.compareAndSet(true, false)) {
if (client != null) {
client.close();
}
}
super.stop();
}
@Override
protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
if (client == null) {
throw new IllegalStateException("No S3 client is available. Ensure that 'start()' is called before storing database history records.");
}
if (record == null) {
return;
}
LOGGER.trace("Storing record into database history: {}", record);
lock.write(() -> {
if (!running.get()) {
throw new IllegalStateException("The history has been stopped and will not accept more records");
}
records.add(record);
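// S3 objects cannot be appended to, so serialize the entire accumulated history and overwrite the object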
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (BufferedWriter historyWriter = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))) {
records.forEach(r -> {
String line = null;
try {
line = writer.write(r.document());
}
catch (IOException e) {
LOGGER.error("Failed to convert record to string: {}", r, e);
}
if (line != null) {
try {
historyWriter.newLine();
historyWriter.append(line);
}
catch (IOException e) {
LOGGER.error("Failed to add record {} to history", r, e);
return;
}
}
});
}
catch (IOException e) {
LOGGER.error("Failed to convert record to string: {}", record, e);
}
try {
PutObjectRequest request = PutObjectRequest.builder()
.bucket(bucket)
.key(objectName)
.contentType(OBJECT_CONTENT_TYPE)
.build();
client.putObject(request, RequestBody.fromBytes(outputStream.toByteArray()));
}
catch (S3Exception e) {
throw new SchemaHistoryException("can not store record to S3", e);
}
});
}
@Override
protected void recoverRecords(Consumer<HistoryRecord> records) {
lock.write(() -> this.records.forEach(records));
}
@Override
public boolean exists() {
return !records.isEmpty();
}
@Override
public boolean storageExists() {
return client.listBuckets().buckets().stream().map(Bucket::name).collect(Collectors.toList()).contains(bucket);
}
@Override
public void initializeStorage() {
client.createBucket(CreateBucketRequest.builder().bucket(bucket).build());
}
@Override
public String toString() {
return "S3";
}
}
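For orientation, a minimal sketch of configuring and running the history implementation above on its own, modeled on the integration test further below. The access key, secret key, bucket name, and endpoint URL are placeholder assumptions; any S3-compatible store should work:

import io.debezium.config.Configuration;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.storage.s3.history.S3SchemaHistory;
import software.amazon.awssdk.regions.Region;

public class S3SchemaHistoryExample {
    public static void main(String[] args) {
        // Placeholder credentials, bucket, and endpoint
        Configuration config = Configuration.create()
                .with(S3SchemaHistory.ACCESS_KEY_ID, "my-access-key")
                .with(S3SchemaHistory.SECRET_ACCESS_KEY, "my-secret-key")
                .with(S3SchemaHistory.REGION_CONFIG, Region.US_EAST_1.id())
                .with(S3SchemaHistory.BUCKET_CONFIG, "schema-history")
                .with(S3SchemaHistory.ENDPOINT_CONFIG, "http://localhost:9090")
                .build();

        SchemaHistory history = new S3SchemaHistory();
        history.configure(config, null, SchemaHistoryListener.NOOP, true);
        history.start(); // creates the bucket if needed and loads any existing db-history-<thread>.log object
        // ... hand the history to a connector; every stored record rewrites the S3 object ...
        history.stop(); // closes the underlying S3 client
    }
}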

debezium-storage/debezium-storage-tests/pom.xml

@@ -0,0 +1,140 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage</artifactId>
<version>2.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-storage-tests</artifactId>
<name>Debezium Storage Tests Module</name>
<packaging>jar</packaging>
<properties>
<!-- S3 -->
<version.s3mock>2.4.14</version.s3mock>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.adobe.testing</groupId>
<artifactId>s3mock-testcontainers</artifactId>
<version>${version.s3mock}</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-ddl-parser</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<scope>test</scope>
</dependency>
<!-- Test libs -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easytesting</groupId>
<artifactId>fest-assert</artifactId>
<scope>test</scope>
</dependency>
<!-- Storage modules -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-s3</artifactId>
<scope>test</scope>
</dependency>
<!-- Source system -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<scope>test</scope>
</dependency>
<!-- Target systems -->
<!-- S3 storage -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.adobe.testing</groupId>
<artifactId>s3mock-testcontainers</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<id>integration-test</id>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
<execution>
<id>verify</id>
<goals>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<skipTests>${skipITs}</skipTests>
<enableAssertions>true</enableAssertions>
<systemPropertyVariables>
<!-- Make these available to the tests via system properties -->
<tag.s3mock>${version.s3mock}</tag.s3mock>
</systemPropertyVariables>
<runOrder>alphabetical</runOrder>
<argLine>${debug.argline} ${modules.argline} ${test.argline}</argLine>
<useSystemClassLoader>${useSystemClassLoader}</useSystemClassLoader>
</configuration>
</plugin>
</plugins>
</build>
</project>

debezium-storage/debezium-storage-tests/src/test/java/io/debezium/storage/s3/history/S3SchemaHistoryIT.java

@@ -0,0 +1,133 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.s3.history;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.http.entity.ContentType;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import com.adobe.testing.s3mock.testcontainers.S3MockContainer;
import io.debezium.config.Configuration;
import io.debezium.document.DocumentReader;
import io.debezium.relational.history.AbstractSchemaHistoryTest;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryListener;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Bucket;
import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.S3Object;
public class S3SchemaHistoryIT extends AbstractSchemaHistoryTest {
public static final String IMAGE_TAG = System.getProperty("tag.s3mock");
public static final String BUCKET = "debezium";
public static final String OBJECT_NAME = String.format("db-history-%s.log", Thread.currentThread().getName());
private static final S3MockContainer container = new S3MockContainer(IMAGE_TAG);
private static S3Client client;
@BeforeClass
public static void startS3() {
container.start();
client = S3Client.builder()
.credentialsProvider(AnonymousCredentialsProvider.create())
.region(Region.AWS_GLOBAL)
.endpointOverride(URI.create(container.getHttpEndpoint())).build();
}
@AfterClass
public static void stopS3() {
container.stop();
}
@Override
public void afterEach() {
if (client.listBuckets().buckets().stream().map(Bucket::name).collect(Collectors.toList()).contains(BUCKET)) {
client.deleteObject(DeleteObjectRequest.builder().bucket(BUCKET).key(OBJECT_NAME).build());
client.deleteBucket(DeleteBucketRequest.builder().bucket(BUCKET).build());
}
super.afterEach();
}
@Override
protected SchemaHistory createHistory() {
SchemaHistory history = new S3SchemaHistory();
Configuration config = Configuration.create()
.with(S3SchemaHistory.ACCESS_KEY_ID, "")
.with(S3SchemaHistory.SECRET_ACCESS_KEY, "")
.with(S3SchemaHistory.BUCKET_CONFIG, BUCKET)
.with(S3SchemaHistory.REGION_CONFIG, Region.AWS_GLOBAL)
.with(S3SchemaHistory.ENDPOINT_CONFIG, container.getHttpEndpoint())
.build();
history.configure(config, null, SchemaHistoryListener.NOOP, true);
history.start();
return history;
}
@Test
public void initializeStorageShouldCreateBucket() {
if (client.listBuckets().buckets().stream().map(Bucket::name).collect(Collectors.toList()).contains(BUCKET)) {
client.deleteBucket(DeleteBucketRequest.builder().bucket(BUCKET).build());
}
history.initializeStorage();
Assert.assertTrue(client.listBuckets().buckets().stream().map(Bucket::name).collect(Collectors.toList()).contains(BUCKET));
}
@Test
public void storeRecordShouldSaveRecordsInS3() throws IOException {
record(1, 0, "CREATE TABLE foo ( first VARCHAR(22) NOT NULL );", all, t3, t2, t1, t0);
List<S3Object> s3ObjectList = client.listObjects(ListObjectsRequest.builder().bucket(BUCKET).build()).contents();
Assert.assertEquals(1, s3ObjectList.size());
S3Object s3Object = s3ObjectList.get(0);
Assert.assertEquals(OBJECT_NAME, s3Object.key());
InputStream objectInputStream = client.getObject(
GetObjectRequest.builder().bucket(BUCKET)
.key(OBJECT_NAME)
.responseContentType(ContentType.TEXT_PLAIN.getMimeType())
.build(),
ResponseTransformer.toInputStream());
DocumentReader reader = DocumentReader.defaultReader();
List<HistoryRecord> historyRecords = new ArrayList<>();
try (BufferedReader historyReader = new BufferedReader(new InputStreamReader(objectInputStream, StandardCharsets.UTF_8))) {
while (true) {
String line = historyReader.readLine();
if (line == null) {
break;
}
// storeRecord() writes a newline before each record, so skip the resulting empty lines
if (!line.isEmpty()) {
historyRecords.add(new HistoryRecord(reader.read(line)));
}
}
}
Assert.assertEquals(1, historyRecords.size());
Assert.assertEquals("CREATE TABLE foo ( first VARCHAR(22) NOT NULL );", historyRecords.get(0).document().getString("ddl"));
Assert.assertEquals(1, historyRecords.get(0).document().getDocument("position").getInteger("position").intValue());
Assert.assertEquals(0, historyRecords.get(0).document().getDocument("position").getInteger("entry").intValue());
}
}

debezium-storage/pom.xml

@@ -20,6 +20,7 @@
<module>debezium-storage-kafka</module>
<module>debezium-storage-file</module>
<module>debezium-storage-redis</module>
<module>debezium-storage-s3</module>
<module>debezium-storage-tests</module>
</modules>
</project>