From 22e19c0ff3df7f0e4cf47ef95ddf8fe662265a18 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Wed, 26 Apr 2023 15:55:02 +0200 Subject: [PATCH] DBZ-6209 Extract additional common code --- .../AbstractFileBasedSchemaHistory.java | 69 ++++++++++- .../blob/history/AzureBlobSchemaHistory.java | 52 +++------ .../file/history/FileSchemaHistory.java | 87 +++++--------- .../storage/s3/history/S3SchemaHistory.java | 109 +++++++----------- 4 files changed, 149 insertions(+), 168 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/relational/history/AbstractFileBasedSchemaHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/AbstractFileBasedSchemaHistory.java index 9f1678463..2303d6b93 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/AbstractFileBasedSchemaHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/AbstractFileBasedSchemaHistory.java @@ -16,6 +16,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +34,7 @@ public abstract class AbstractFileBasedSchemaHistory extends AbstractSchemaHisto protected final DocumentWriter documentWriter = DocumentWriter.defaultWriter(); protected final DocumentReader documentReader = DocumentReader.defaultReader(); - private volatile List records = new ArrayList<>(); + protected volatile List records = new ArrayList<>(); public AbstractFileBasedSchemaHistory() { } @@ -79,4 +80,70 @@ protected byte[] fromHistoryRecord(HistoryRecord record) { protected List getRecords() { return records; } + + @Override + public synchronized void start() { + doPreStart(); + + lock.write(() -> { + if (running.compareAndSet(false, true)) { + if (!storageExists()) { + initializeStorage(); + } + + doStart(); + } + }); + super.start(); + } + + @Override + public synchronized void stop() { + if (running.compareAndSet(true, false)) { + doStop(); + } + + super.stop(); + } + + @Override + protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { + doPreStoreRecord(record); + if (record == null) { + return; + } + + lock.write(() -> { + if (!running.get()) { + throw new SchemaHistoryException("The history has been stopped and will not accept more records"); + } + + doStoreRecord(record); + }); + } + + @Override + protected void recoverRecords(Consumer records) { + lock.write(() -> getRecords().forEach(records)); + } + + @Override + public boolean exists() { + return !getRecords().isEmpty(); + } + + protected void doPreStart() { + } + + protected void doStart() { + } + + protected void doStop() { + } + + protected void doPreStoreRecord(HistoryRecord record) { + } + + protected void doStoreRecord(HistoryRecord record) { + } } diff --git a/debezium-storage/debezium-storage-azure-blob/src/main/java/io/debezium/storage/azure/blob/history/AzureBlobSchemaHistory.java b/debezium-storage/debezium-storage-azure-blob/src/main/java/io/debezium/storage/azure/blob/history/AzureBlobSchemaHistory.java index e0a983776..78cb185d7 100644 --- a/debezium-storage/debezium-storage-azure-blob/src/main/java/io/debezium/storage/azure/blob/history/AzureBlobSchemaHistory.java +++ b/debezium-storage/debezium-storage-azure-blob/src/main/java/io/debezium/storage/azure/blob/history/AzureBlobSchemaHistory.java @@ -7,7 +7,6 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.util.function.Consumer; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.source.SourceRecord; @@ -87,7 +86,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator, } @Override - public synchronized void start() { + protected void doPreStart() { if (blobServiceClient == null) { blobServiceClient = new BlobServiceClientBuilder() .connectionString(config.getString(ACCOUNT_CONNECTION_STRING)) @@ -97,51 +96,28 @@ public synchronized void start() { blobClient = blobServiceClient.getBlobContainerClient(container) .getBlobClient(blobName); } - - lock.write(() -> { - if (running.compareAndSet(false, true)) { - if (!storageExists()) { - initializeStorage(); - } - } - - if (blobClient.exists()) { - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - blobClient.downloadStream(outputStream); - ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); - toHistoryRecord(inputStream); - } - }); - super.start(); } @Override - protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { + protected void doStart() { + if (blobClient.exists()) { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + blobClient.downloadStream(outputStream); + ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + toHistoryRecord(inputStream); + } + } + + @Override + protected void doPreStoreRecord(HistoryRecord record) { if (blobClient == null) { throw new SchemaHistoryException("No Blob client is available. Ensure that 'start()' is called before storing database history records."); } - if (record == null) { - return; - } - - lock.write(() -> { - if (!running.get()) { - throw new SchemaHistoryException("The history has been stopped and will not accept more records"); - } - - ByteArrayInputStream inputStream = new ByteArrayInputStream(fromHistoryRecord(record)); - blobClient.upload(inputStream, true); - }); } @Override - protected void recoverRecords(Consumer records) { - lock.write(() -> getRecords().forEach(records)); - } - - @Override - public boolean exists() { - return !getRecords().isEmpty(); + protected void doStoreRecord(HistoryRecord record) { + blobClient.upload(new ByteArrayInputStream(fromHistoryRecord(record)), true); } @Override diff --git a/debezium-storage/debezium-storage-file/src/main/java/io/debezium/storage/file/history/FileSchemaHistory.java b/debezium-storage/debezium-storage-file/src/main/java/io/debezium/storage/file/history/FileSchemaHistory.java index ea4f91f09..8ac0f2ad4 100644 --- a/debezium-storage/debezium-storage-file/src/main/java/io/debezium/storage/file/history/FileSchemaHistory.java +++ b/debezium-storage/debezium-storage-file/src/main/java/io/debezium/storage/file/history/FileSchemaHistory.java @@ -5,7 +5,6 @@ */ package io.debezium.storage.file.history; -import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.IOException; import java.nio.file.FileAlreadyExistsException; @@ -14,7 +13,9 @@ import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.Collection; -import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.debezium.DebeziumException; import io.debezium.annotation.ThreadSafe; @@ -36,6 +37,7 @@ */ @ThreadSafe public final class FileSchemaHistory extends AbstractFileBasedSchemaHistory { + private static final Logger LOGGER = LoggerFactory.getLogger(FileSchemaHistory.class); public static final Field FILE_PATH = Field.create(SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "file.filename") .withDescription("The path to the file that will be used to record the database schema history") @@ -58,75 +60,38 @@ public void configure(Configuration config, HistoryRecordComparator comparator, } @Override - public void start() { - super.start(); - lock.write(() -> { - if (running.compareAndSet(false, true)) { - if (!storageExists()) { - initializeStorage(); - } - } - }); - } + protected void doStoreRecord(HistoryRecord record) { + try { + LOGGER.trace("Storing record into database history: {}", record); + records.add(record); + String line = documentWriter.write(record.document()); - @Override - protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { - if (record == null) { - return; - } - lock.write(() -> { - if (!running.get()) { - throw new IllegalStateException("The history has been stopped and will not accept more records"); - } - try { - String line = documentWriter.write(record.document()); - // Create a buffered writer to write all of the records, closing the file when there is an error or when - // the thread is no longer supposed to run - try (BufferedWriter historyWriter = Files.newBufferedWriter(path, StandardOpenOption.APPEND)) { - try { - historyWriter.append(line); - historyWriter.newLine(); - } - catch (IOException e) { - Loggings.logErrorAndTraceRecord(logger, record, "Failed to add record to history at {}", path, e); - } + try (BufferedWriter historyWriter = Files.newBufferedWriter(path, StandardOpenOption.APPEND)) { + try { + historyWriter.append(line); + historyWriter.newLine(); } catch (IOException e) { - throw new SchemaHistoryException("Unable to create writer for history file " + path + ": " + e.getMessage(), e); + Loggings.logErrorAndTraceRecord(logger, record, "Failed to add record to history at {}", path, e); } } catch (IOException e) { - Loggings.logErrorAndTraceRecord(logger, record, "Failed to convert record to string", e); + throw new SchemaHistoryException("Unable to create writer for history file " + path + ": " + e.getMessage(), e); } - }); + } + catch (IOException e) { + Loggings.logErrorAndTraceRecord(logger, record, "Failed to convert record to string", e); + } } @Override - public void stop() { - running.set(false); - super.stop(); - } - - @Override - protected synchronized void recoverRecords(Consumer records) { - lock.write(() -> { - if (exists()) { - try (BufferedReader historyReader = Files.newBufferedReader(path)) { - while (true) { - String line = historyReader.readLine(); - if (line == null) { - break; - } - if (!line.isEmpty()) { - records.accept(new HistoryRecord(documentReader.read(line))); - } - } - } - catch (IOException e) { - logger.error("Failed to add recover records from history at {}", path, e); - } - } - }); + protected void doStart() { + try { + toHistoryRecord(Files.newInputStream(path)); + } + catch (IOException e) { + throw new SchemaHistoryException("Can't retrieve file with schema history", e); + } } @Override diff --git a/debezium-storage/debezium-storage-s3/src/main/java/io/debezium/storage/s3/history/S3SchemaHistory.java b/debezium-storage/debezium-storage-s3/src/main/java/io/debezium/storage/s3/history/S3SchemaHistory.java index 56dc5f507..1e856b136 100644 --- a/debezium-storage/debezium-storage-s3/src/main/java/io/debezium/storage/s3/history/S3SchemaHistory.java +++ b/debezium-storage/debezium-storage-s3/src/main/java/io/debezium/storage/s3/history/S3SchemaHistory.java @@ -7,7 +7,6 @@ import java.io.InputStream; import java.net.URI; -import java.util.function.Consumer; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; @@ -146,7 +145,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator, } @Override - public synchronized void start() { + protected void doPreStart() { if (client == null) { S3ClientBuilder clientBuilder = S3Client.builder().credentialsProvider(credentialsProvider).region(region); if (endpoint != null) { @@ -155,84 +154,58 @@ public synchronized void start() { client = clientBuilder.build(); } - - lock.write(() -> { - if (running.compareAndSet(false, true)) { - if (!storageExists()) { - initializeStorage(); - } - - InputStream objectInputStream = null; - try { - GetObjectRequest request = GetObjectRequest.builder() - .bucket(bucket) - .key(objectName) - .responseContentType(OBJECT_CONTENT_TYPE) - .build(); - objectInputStream = client.getObject(request, ResponseTransformer.toInputStream()); - } - catch (NoSuchKeyException e) { - // do nothing - } - catch (S3Exception e) { - throw new SchemaHistoryException("Can't retrieve history object from S3", e); - } - - if (objectInputStream != null) { - toHistoryRecord(objectInputStream); - } - } - }); - super.start(); } @Override - public synchronized void stop() { - if (running.compareAndSet(true, false)) { - if (client != null) { - client.close(); - } + protected void doStart() { + InputStream objectInputStream = null; + try { + GetObjectRequest request = GetObjectRequest.builder() + .bucket(bucket) + .key(objectName) + .responseContentType(OBJECT_CONTENT_TYPE) + .build(); + objectInputStream = client.getObject(request, ResponseTransformer.toInputStream()); + } + catch (NoSuchKeyException e) { + // do nothing + } + catch (S3Exception e) { + throw new SchemaHistoryException("Can't retrieve history object from S3", e); } - super.stop(); + if (objectInputStream != null) { + toHistoryRecord(objectInputStream); + } } @Override - protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { + public void doStop() { + if (client != null) { + client.close(); + } + } + + @Override + protected void doPreStoreRecord(HistoryRecord record) { if (client == null) { throw new SchemaHistoryException("No S3 client is available. Ensure that 'start()' is called before storing database history records."); } - if (record == null) { - return; + } + + @Override + protected void doStoreRecord(HistoryRecord record) { + try { + PutObjectRequest request = PutObjectRequest.builder() + .bucket(bucket) + .key(objectName) + .contentType(OBJECT_CONTENT_TYPE) + .build(); + client.putObject(request, RequestBody.fromBytes(fromHistoryRecord(record))); + } + catch (S3Exception e) { + throw new SchemaHistoryException("Can not store record to S3", e); } - - lock.write(() -> { - if (!running.get()) { - throw new SchemaHistoryException("The history has been stopped and will not accept more records"); - } - - try { - PutObjectRequest request = PutObjectRequest.builder() - .bucket(bucket) - .key(objectName) - .contentType(OBJECT_CONTENT_TYPE) - .build(); - client.putObject(request, RequestBody.fromBytes(fromHistoryRecord(record))); - } - catch (S3Exception e) { - throw new SchemaHistoryException("Can not store record to S3", e); - } - }); - } - - @Override - protected void recoverRecords(Consumer records) { - lock.write(() -> getRecords().forEach(records)); - } - - @Override - public boolean exists() { - return !getRecords().isEmpty(); } @Override