From 1004f803a51ee6e603dbc11f552b66215a3a0043 Mon Sep 17 00:00:00 2001
From: Hossein Torabi
Date: Tue, 19 Jul 2022 17:26:28 +0200
Subject: [PATCH] DBZ-5402 debezium-storage-s3 module

---
 .github/workflows/debezium-workflow.yml        |  42 ++-
 debezium-bom/pom.xml                           |  33 +-
 debezium-storage/debezium-storage-s3/pom.xml   |  87 ++++++
 .../storage/s3/history/S3SchemaHistory.java    | 291 ++++++++++++++++++
 .../debezium-storage-tests/pom.xml             | 140 +++++++++
 .../storage/s3/history/S3SchemaHistoryIT.java  | 133 ++++++++
 debezium-storage/pom.xml                       |   3 +-
 7 files changed, 722 insertions(+), 7 deletions(-)
 create mode 100644 debezium-storage/debezium-storage-s3/pom.xml
 create mode 100644 debezium-storage/debezium-storage-s3/src/main/java/io/debezium/storage/s3/history/S3SchemaHistory.java
 create mode 100644 debezium-storage/debezium-storage-tests/pom.xml
 create mode 100644 debezium-storage/debezium-storage-tests/src/test/java/io/debezium/storage/s3/history/S3SchemaHistoryIT.java

diff --git a/.github/workflows/debezium-workflow.yml b/.github/workflows/debezium-workflow.yml
index 7a20966e5..c862551be 100644
--- a/.github/workflows/debezium-workflow.yml
+++ b/.github/workflows/debezium-workflow.yml
@@ -37,6 +37,7 @@ jobs:
       mysql-ddl-parser-changed: ${{ steps.changed-files-mysql-ddl-parser.outputs.any_changed }}
       oracle-ddl-parser-changed: ${{ steps.changed-files-oracle-ddl-parser.outputs.any_changed }}
       documentation-only-changed: ${{ steps.changed-files-documentation.outputs.only_changed}}
+      storage-changed: ${{ steps.changed-files-storage.outputs.any_changed }}
     steps:
       - name: Checkout Action
         uses: actions/checkout@v3
@@ -187,6 +188,13 @@
           files: |
             documentation/**

+      - name: Get modified files (Storage)
+        id: changed-files-storage
+        uses: tj-actions/changed-files@v14.3
+        with:
+          files: |
+            debezium-storage/**
+
   build_cache:
     name: "Dependency Cache"
     needs: [file_changes]
@@ -620,6 +628,38 @@
           -Dmaven.wagon.http.pool=false
           -Dmaven.wagon.httpconnectionManager.ttlSeconds=120

+  build_storage:
+    needs: [check_style, file_changes]
+    if: ${{ needs.file_changes.outputs.common-changed == 'true' || needs.file_changes.outputs.mysql-changed == 'true' || needs.file_changes.outputs.mysql-ddl-parser-changed == 'true' || needs.file_changes.outputs.storage-changed == 'true' }}
+    name: "Storage"
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout Action
+        uses: actions/checkout@v2
+
+      - name: Set up Java 17
+        uses: actions/setup-java@v2
+        with:
+          distribution: 'temurin'
+          java-version: 17
+
+      - name: Cache Maven Repository
+        uses: actions/cache@v2
+        with:
+          path: ~/.m2/repository
+          key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
+          restore-keys: |
+            maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
+
+      - name: Build Debezium Storage
+        run: >
+          ./mvnw clean install -B -f debezium-storage/pom.xml -Passembly
+          -Dcheckstyle.skip=true
+          -Dformat.skip=true
+          -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn
+          -Dmaven.wagon.http.pool=false
+          -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
+
   build_cassandra:
     needs: [check_style, file_changes]
     if: ${{ needs.file_changes.outputs.common-changed == 'true' }}
@@ -714,7 +754,7 @@
       - name: Build Debezium (Core)
         run: >
           ./core/mvnw clean install -f core/pom.xml
-          -pl debezium-assembly-descriptors,debezium-bom,debezium-core,debezium-embedded,:debezium-storage-file,:debezium-storage-kafka,:debezium-storage-redis,:debezium-ide-configs,:debezium-checkstyle,:debezium-revapi
+          -pl
debezium-assembly-descriptors,debezium-bom,debezium-core,debezium-embedded,:debezium-ide-configs,:debezium-checkstyle,:debezium-revapi -am -DskipTests=true -DskipITs=true diff --git a/debezium-bom/pom.xml b/debezium-bom/pom.xml index f334e7e6e..80bce464e 100644 --- a/debezium-bom/pom.xml +++ b/debezium-bom/pom.xml @@ -37,8 +37,12 @@ 20.0.0 + 4.1.1 + + 2.17.241 + 4.13.1 1.4 @@ -424,12 +428,21 @@ + redis.clients jedis ${version.jedis} + + + software.amazon.awssdk + s3 + ${version.s3} + + + org.testcontainers @@ -537,11 +550,26 @@ debezium-storage-kafka ${project.version} + + io.debezium + debezium-storage-s3 + ${project.version} + io.debezium debezium-storage-file ${project.version} + + io.debezium + debezium-storage-redis + ${project.version} + + + io.debezium + debezium-storage-tests + ${project.version} + io.debezium debezium-scripting @@ -633,11 +661,6 @@ debezium-quarkus-outbox-deployment ${project.version} - - io.debezium - debezium-storage-redis - ${project.version} - diff --git a/debezium-storage/debezium-storage-s3/pom.xml b/debezium-storage/debezium-storage-s3/pom.xml new file mode 100644 index 000000000..8ef5aefe6 --- /dev/null +++ b/debezium-storage/debezium-storage-s3/pom.xml @@ -0,0 +1,87 @@ + + + + io.debezium + debezium-storage + 2.2.0-SNAPSHOT + ../pom.xml + + 4.0.0 + debezium-storage-s3 + Debezium Storage S3 Module + jar + + + + io.debezium + debezium-api + provided + + + io.debezium + debezium-core + provided + + + org.slf4j + slf4j-api + provided + + + org.apache.kafka + connect-api + provided + + + + software.amazon.awssdk + s3 + + + + + + + assembly + + false + + + + + org.apache.maven.plugins + maven-assembly-plugin + ${version.assembly.plugin} + + + io.debezium + debezium-assembly-descriptors + ${project.version} + + + + + default + package + + single + + + ${project.artifactId}-${project.version} + true + + ${assembly.descriptor} + + posix + + + + + + + + + + diff --git a/debezium-storage/debezium-storage-s3/src/main/java/io/debezium/storage/s3/history/S3SchemaHistory.java b/debezium-storage/debezium-storage-s3/src/main/java/io/debezium/storage/s3/history/S3SchemaHistory.java new file mode 100644 index 000000000..f1830942f --- /dev/null +++ b/debezium-storage/debezium-storage-s3/src/main/java/io/debezium/storage/s3/history/S3SchemaHistory.java @@ -0,0 +1,291 @@ +/* + * Copyright Debezium Authors. 
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.storage.s3.history;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Width;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.annotation.NotThreadSafe;
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.relational.history.AbstractSchemaHistory;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
+import io.debezium.relational.history.SchemaHistory;
+import io.debezium.relational.history.SchemaHistoryException;
+import io.debezium.relational.history.SchemaHistoryListener;
+import io.debezium.util.FunctionalReadWriteLock;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.core.sync.ResponseTransformer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.Bucket;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+/**
+ * A {@link SchemaHistory} implementation that stores the schema history in a single object on S3,
+ * or on any S3-compatible store reachable through a custom endpoint.
+ *
+ * Because S3 does not support appending to an object, {@link S3SchemaHistory#start()} fetches the
+ * existing history object once and keeps the recovered {@link HistoryRecord}s in main memory.
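+ * That same in-memory list backs {@link S3SchemaHistory#recoverRecords}, so no further S3 reads
+ * are needed after startup.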
+ * {@link S3SchemaHistory#storeRecord(HistoryRecord)} in turn rewrites the complete history object
+ * on S3 every time it is invoked.
+ *
+ * @author hossein.torabi
+ */
+@NotThreadSafe
+public class S3SchemaHistory extends AbstractSchemaHistory {
+    private static final Logger LOGGER = LoggerFactory.getLogger(S3SchemaHistory.class);
+
+    public static final String ACCESS_KEY_ID_CONFIG = "s3.access.key.id";
+    public static final String SECRET_ACCESS_KEY_CONFIG = "s3.secret.access.key";
+    public static final String REGION_CONFIG = CONFIGURATION_FIELD_PREFIX_STRING + "s3.region.name";
+    public static final String BUCKET_CONFIG = CONFIGURATION_FIELD_PREFIX_STRING + "s3.bucket.name";
+    public static final String ENDPOINT_CONFIG = CONFIGURATION_FIELD_PREFIX_STRING + "s3.endpoint";
+    public static final String OBJECT_CONTENT_TYPE = "text/plain";
+
+    public static final Field ACCESS_KEY_ID = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + ACCESS_KEY_ID_CONFIG)
+            .withDisplayName("S3 access key id")
+            .withType(Type.STRING)
+            .withWidth(Width.LONG)
+            .withDefault("")
+            .withImportance(Importance.HIGH);
+
+    public static final Field SECRET_ACCESS_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + SECRET_ACCESS_KEY_CONFIG)
+            .withDisplayName("S3 secret access key")
+            .withType(Type.PASSWORD)
+            .withWidth(Width.LONG)
+            .withDefault("")
+            .withImportance(Importance.HIGH);
+
+    public static final Field REGION = Field.create(REGION_CONFIG)
+            .withDisplayName("S3 region")
+            .withWidth(Width.LONG)
+            .withType(Type.STRING)
+            .withImportance(Importance.MEDIUM);
+
+    public static final Field BUCKET = Field.create(BUCKET_CONFIG)
+            .withDisplayName("S3 bucket")
+            .withType(Type.STRING)
+            .withImportance(Importance.HIGH);
+
+    public static final Field ENDPOINT = Field.create(ENDPOINT_CONFIG)
+            .withDisplayName("S3 endpoint")
+            .withType(Type.STRING)
+            .withImportance(Importance.LOW);
+
+    public static final Field.Set ALL_FIELDS = Field.setOf(ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION, BUCKET, ENDPOINT);
+
+    private final AtomicBoolean running = new AtomicBoolean();
+    private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant();
+    private final DocumentWriter writer = DocumentWriter.defaultWriter();
+    private final DocumentReader reader = DocumentReader.defaultReader();
+    private final String objectName = String.format("db-history-%s.log", Thread.currentThread().getName());
+
+    private String bucket = null;
+    private Region region = null;
+    private URI endpoint = null;
+    private AwsCredentialsProvider credentialsProvider = null;
+
+    private volatile S3Client client = null;
+    private volatile List<HistoryRecord> records = new ArrayList<>();
+
+    @Override
+    public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
+        super.configure(config, comparator, listener, useCatalogBeforeSchema);
+        if (!config.validateAndRecord(ALL_FIELDS, LOGGER::error)) {
+            throw new SchemaHistoryException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
+        }
+
+        bucket = config.getString(BUCKET);
+        region = Region.of(config.getString(REGION));
+        String endpointString = config.getString(ENDPOINT);
+        if (endpointString != null) {
+            endpoint = URI.create(endpointString);
+        }
+
+        if (config.getString(ACCESS_KEY_ID).isEmpty() && config.getString(SECRET_ACCESS_KEY).isEmpty()) {
+            credentialsProvider = DefaultCredentialsProvider.create();
+        }
+        else {
+            AwsCredentials credentials = AwsBasicCredentials.create(config.getString(ACCESS_KEY_ID), config.getString(SECRET_ACCESS_KEY));
+            credentialsProvider = StaticCredentialsProvider.create(credentials);
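+            // Explicit credentials from the connector configuration take precedence over the
+            // default AWS credential provider chain.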
+        }
+    }
+
+    @Override
+    public synchronized void start() {
+        if (client == null) {
+            S3ClientBuilder clientBuilder = S3Client.builder()
+                    .credentialsProvider(credentialsProvider)
+                    .region(region);
+            if (endpoint != null) {
+                clientBuilder.endpointOverride(endpoint);
+            }
+
+            client = clientBuilder.build();
+        }
+
+        lock.write(() -> {
+            if (running.compareAndSet(false, true)) {
+                if (!storageExists()) {
+                    initializeStorage();
+                }
+
+                InputStream objectInputStream = null;
+                try {
+                    GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(objectName).responseContentType(OBJECT_CONTENT_TYPE).build();
+                    objectInputStream = client.getObject(request, ResponseTransformer.toInputStream());
+                }
+                catch (NoSuchKeyException e) {
+                    // no history object exists yet; there is nothing to recover
+                }
+                catch (S3Exception e) {
+                    throw new SchemaHistoryException("Cannot retrieve history object from S3", e);
+                }
+
+                if (objectInputStream != null) {
+                    try (BufferedReader historyReader = new BufferedReader(new InputStreamReader(objectInputStream, StandardCharsets.UTF_8))) {
+                        while (true) {
+                            String line = historyReader.readLine();
+                            if (line == null) {
+                                break;
+                            }
+                            if (!line.isEmpty()) {
+                                records.add(new HistoryRecord(reader.read(line)));
+                            }
+                        }
+                    }
+                    catch (IOException e) {
+                        throw new SchemaHistoryException("Unable to read object content", e);
+                    }
+                }
+            }
+        });
+        super.start();
+    }
+
+    @Override
+    public synchronized void stop() {
+        if (running.compareAndSet(true, false)) {
+            if (client != null) {
+                client.close();
+            }
+        }
+
+        super.stop();
+    }
+
+    @Override
+    protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
+        if (client == null) {
+            throw new IllegalStateException("No S3 client is available. Ensure that 'start()' is called before storing database history records.");
+        }
+        if (record == null) {
+            return;
+        }
+
+        LOGGER.trace("Storing record into database history: {}", record);
+        lock.write(() -> {
+            if (!running.get()) {
+                throw new IllegalStateException("The history has been stopped and will not accept more records");
+            }
+
+            records.add(record);
+            // S3 objects cannot be appended to, so the complete history is re-serialized and
+            // uploaded on every store.
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+            try (BufferedWriter historyWriter = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))) {
+                records.forEach(r -> {
+                    String line = null;
+                    try {
+                        line = writer.write(r.document());
+                    }
+                    catch (IOException e) {
+                        LOGGER.error("Failed to convert record to string: {}", r, e);
+                    }
+
+                    if (line != null) {
+                        try {
+                            historyWriter.append(line);
+                            historyWriter.newLine();
+                        }
+                        catch (IOException e) {
+                            LOGGER.error("Failed to add record {} to history", r, e);
+                        }
+                    }
+                });
+            }
+            catch (IOException e) {
+                LOGGER.error("Failed to convert record to string: {}", record, e);
+            }
+
+            try {
+                PutObjectRequest request = PutObjectRequest.builder()
+                        .bucket(bucket)
+                        .key(objectName)
+                        .contentType(OBJECT_CONTENT_TYPE)
+                        .build();
+                client.putObject(request, RequestBody.fromBytes(outputStream.toByteArray()));
+            }
+            catch (S3Exception e) {
+                throw new SchemaHistoryException("Cannot store history record to S3", e);
+            }
+        });
+    }
+
+    @Override
+    protected void recoverRecords(Consumer<HistoryRecord> records) {
+        lock.write(() -> this.records.forEach(records));
+    }
+
+    @Override
+    public boolean exists() {
+        return !records.isEmpty();
+    }
+
+    @Override
+    public boolean storageExists() {
+        return client.listBuckets().buckets().stream().map(Bucket::name).collect(Collectors.toList()).contains(bucket);
+    }
+
+    @Override
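+    // Invoked from start() when storageExists() reports that the configured bucket is missing.
+    public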
void initializeStorage() { + client.createBucket(CreateBucketRequest.builder().bucket(bucket).build()); + } + + @Override + public String toString() { + return "S3"; + } +} diff --git a/debezium-storage/debezium-storage-tests/pom.xml b/debezium-storage/debezium-storage-tests/pom.xml new file mode 100644 index 000000000..4b96a8606 --- /dev/null +++ b/debezium-storage/debezium-storage-tests/pom.xml @@ -0,0 +1,140 @@ + + + + io.debezium + debezium-storage + 2.2.0-SNAPSHOT + ../pom.xml + + 4.0.0 + debezium-storage-tests + Debezium Storage Tests Module + jar + + + + 2.4.14 + + + + + + com.adobe.testing + s3mock-testcontainers + ${version.s3mock} + test + + + + + + + io.debezium + debezium-api + test + + + io.debezium + debezium-core + test-jar + + + io.debezium + debezium-ddl-parser + test + + + org.slf4j + slf4j-api + test + + + org.apache.kafka + connect-api + test + + + + + junit + junit + test + + + org.assertj + assertj-core + test + + + org.easytesting + fest-assert + + + + + io.debezium + debezium-storage-s3 + test + + + + + io.debezium + debezium-connector-mysql + test-jar + + + io.debezium + debezium-connector-mysql + test + + + + + + software.amazon.awssdk + s3 + test + + + com.adobe.testing + s3mock-testcontainers + test + + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + + + integration-test + + integration-test + + + + verify + + verify + + + + + ${skipITs} + true + + + ${version.s3mock} + + alphabetical + ${debug.argline} ${modules.argline} ${test.argline} + ${useSystemClassLoader} + + + + + + diff --git a/debezium-storage/debezium-storage-tests/src/test/java/io/debezium/storage/s3/history/S3SchemaHistoryIT.java b/debezium-storage/debezium-storage-tests/src/test/java/io/debezium/storage/s3/history/S3SchemaHistoryIT.java new file mode 100644 index 000000000..e36c4d80a --- /dev/null +++ b/debezium-storage/debezium-storage-tests/src/test/java/io/debezium/storage/s3/history/S3SchemaHistoryIT.java @@ -0,0 +1,133 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.storage.s3.history; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.http.entity.ContentType; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.adobe.testing.s3mock.testcontainers.S3MockContainer; + +import io.debezium.config.Configuration; +import io.debezium.document.DocumentReader; +import io.debezium.relational.history.AbstractSchemaHistoryTest; +import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.SchemaHistory; +import io.debezium.relational.history.SchemaHistoryListener; + +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; +import software.amazon.awssdk.core.sync.ResponseTransformer; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.Bucket; +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.S3Object; + +public class S3SchemaHistoryIT extends AbstractSchemaHistoryTest { + final public static String IMAGE_TAG = System.getProperty("tag.smock"); + final public static String BUCKET = "debezium"; + + final public static String OBJECT_NAME = String.format("db-history-%s.log", Thread.currentThread().getName()); + + final private static S3MockContainer container = new S3MockContainer(IMAGE_TAG); + private static S3Client client; + + @BeforeClass + public static void startS3() { + container.start(); + client = S3Client.builder() + .credentialsProvider(AnonymousCredentialsProvider.create()) + .region(Region.AWS_GLOBAL) + .endpointOverride(URI.create(container.getHttpEndpoint())).build(); + } + + @AfterClass + public static void stopS3() { + container.stop(); + } + + @Override + public void afterEach() { + if (client.listBuckets().buckets().stream().map(Bucket::name).collect(Collectors.toList()).contains(BUCKET)) { + client.deleteObject(DeleteObjectRequest.builder().bucket(BUCKET).key(OBJECT_NAME).build()); + client.deleteBucket(DeleteBucketRequest.builder().bucket(BUCKET).build()); + } + super.afterEach(); + } + + @Override + protected SchemaHistory createHistory() { + SchemaHistory history = new S3SchemaHistory(); + Configuration config = Configuration.create() + .with(S3SchemaHistory.ACCESS_KEY_ID, "") + .with(S3SchemaHistory.SECRET_ACCESS_KEY, "") + .with(S3SchemaHistory.BUCKET_CONFIG, BUCKET) + .with(S3SchemaHistory.REGION_CONFIG, Region.AWS_GLOBAL) + .with(S3SchemaHistory.ENDPOINT_CONFIG, container.getHttpEndpoint()) + .build(); + history.configure(config, null, SchemaHistoryListener.NOOP, true); + history.start(); + return history; + } + + @Test + public void InitializeStorageShouldCreateBucket() { + if (client.listBuckets().buckets().stream().map(Bucket::name).collect(Collectors.toList()).contains(BUCKET)) { + client.deleteBucket(DeleteBucketRequest.builder().bucket(BUCKET).build()); + } + 
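+        // The bucket was deleted above if it existed, so initializeStorage() must create it from scratch.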
+        history.initializeStorage();
+
+        Assert.assertTrue(client.listBuckets().buckets().stream().map(Bucket::name).collect(Collectors.toList()).contains(BUCKET));
+    }
+
+    @Test
+    public void StoreRecordShouldSaveRecordsInS3() throws IOException {
+        record(1, 0, "CREATE TABLE foo ( first VARCHAR(22) NOT NULL );", all, t3, t2, t1, t0);
+        List<S3Object> s3ObjectList = client.listObjects(ListObjectsRequest.builder().bucket(BUCKET).build()).contents();
+        Assert.assertEquals(1, s3ObjectList.size());
+
+        S3Object s3Object = s3ObjectList.get(0);
+        Assert.assertEquals(OBJECT_NAME, s3Object.key());
+
+        InputStream objectInputStream = client.getObject(
+                GetObjectRequest.builder().bucket(BUCKET)
+                        .key(OBJECT_NAME)
+                        .responseContentType(ContentType.TEXT_PLAIN.getMimeType())
+                        .build(),
+                ResponseTransformer.toInputStream());
+        BufferedReader historyReader = new BufferedReader(new InputStreamReader(objectInputStream, StandardCharsets.UTF_8));
+        DocumentReader reader = DocumentReader.defaultReader();
+        List<HistoryRecord> historyRecords = new ArrayList<>();
+        while (true) {
+            String line = historyReader.readLine();
+            if (line == null) {
+                break;
+            }
+            if (!line.isEmpty()) {
+                historyRecords.add(new HistoryRecord(reader.read(line)));
+            }
+        }
+
+        Assert.assertEquals(1, historyRecords.size());
+
+        Assert.assertEquals("CREATE TABLE foo ( first VARCHAR(22) NOT NULL );", historyRecords.get(0).document().getString("ddl"));
+        Assert.assertEquals(1, historyRecords.get(0).document().getDocument("position").getInteger("position").intValue());
+        Assert.assertEquals(0, historyRecords.get(0).document().getDocument("position").getInteger("entry").intValue());
+    }
+}
diff --git a/debezium-storage/pom.xml b/debezium-storage/pom.xml
index 2b1297779..93b0feb40 100644
--- a/debezium-storage/pom.xml
+++ b/debezium-storage/pom.xml
@@ -20,6 +20,7 @@
         <module>debezium-storage-kafka</module>
         <module>debezium-storage-file</module>
         <module>debezium-storage-redis</module>
+        <module>debezium-storage-s3</module>
+        <module>debezium-storage-tests</module>
-
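
Reviewer note: the connector-facing property names are AbstractSchemaHistory's CONFIGURATION_FIELD_PREFIX_STRING plus the suffixes defined in S3SchemaHistory. A minimal sketch of wiring the module up through Debezium's Configuration API, mirroring what S3SchemaHistoryIT.createHistory() does — the "schema.history.internal" prefix value, bucket, region, and endpoint below are illustrative assumptions, not part of this patch:

    // Sketch only: values are placeholders; the storage class is selected via the connector's
    // schema history class property, assumed here to be "schema.history.internal".
    Configuration config = Configuration.create()
            .with("schema.history.internal", "io.debezium.storage.s3.history.S3SchemaHistory")
            .with(S3SchemaHistory.ACCESS_KEY_ID, "<access-key-id>") // leave both key fields empty to
            .with(S3SchemaHistory.SECRET_ACCESS_KEY, "<secret-access-key>") // use DefaultCredentialsProvider
            .with(S3SchemaHistory.REGION_CONFIG, "eu-west-1")
            .with(S3SchemaHistory.BUCKET_CONFIG, "my-schema-history-bucket")
            .with(S3SchemaHistory.ENDPOINT_CONFIG, "http://localhost:9090") // optional, e.g. MinIO/S3Mock
            .build();
    SchemaHistory history = new S3SchemaHistory();
    history.configure(config, null, SchemaHistoryListener.NOOP, true);
    history.start();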