DBZ-6209 Extract additional common code

Jiri Pechanec 2023-04-26 15:55:02 +02:00
parent 15026cc444
commit 22e19c0ff3
4 changed files with 149 additions and 168 deletions
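
The commit extracts the start/stop/store lifecycle that was duplicated across the file, Azure Blob, and S3 storage backends into AbstractFileBasedSchemaHistory, leaving the backends to override narrow template-method hooks (doPreStart, doStart, doStop, doPreStoreRecord, doStoreRecord). A minimal, self-contained sketch of that pattern follows; the lock and running fields are declared locally here only so the example compiles on its own (in Debezium they come from the AbstractSchemaHistory superclass, which wraps writes in a functional lock.write(...) helper):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Sketch of the extracted template-method lifecycle.
    abstract class LifecycleSketch {
        protected final AtomicBoolean running = new AtomicBoolean(false);
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        public synchronized void start() {
            doPreStart();                    // backend builds its client first
            lock.writeLock().lock();
            try {
                if (running.compareAndSet(false, true)) {
                    if (!storageExists()) {
                        initializeStorage(); // first run: create the file/blob/object
                    }
                    doStart();               // backend loads previously stored records
                }
            }
            finally {
                lock.writeLock().unlock();
            }
        }

        public synchronized void stop() {
            if (running.compareAndSet(true, false)) {
                doStop();                    // backend releases its client
            }
        }

        protected abstract boolean storageExists();

        protected abstract void initializeStorage();

        // No-op hooks: a backend overrides only the phases it needs.
        protected void doPreStart() {
        }

        protected void doStart() {
        }

        protected void doStop() {
        }
    }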

AbstractFileBasedSchemaHistory.java

@@ -16,6 +16,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,7 +34,7 @@ public abstract class AbstractFileBasedSchemaHistory extends AbstractSchemaHisto
     protected final DocumentWriter documentWriter = DocumentWriter.defaultWriter();
     protected final DocumentReader documentReader = DocumentReader.defaultReader();
 
-    private volatile List<HistoryRecord> records = new ArrayList<>();
+    protected volatile List<HistoryRecord> records = new ArrayList<>();
 
     public AbstractFileBasedSchemaHistory() {
     }
@@ -79,4 +80,70 @@ protected byte[] fromHistoryRecord(HistoryRecord record) {
     protected List<HistoryRecord> getRecords() {
         return records;
     }
+
+    @Override
+    public synchronized void start() {
+        doPreStart();
+
+        lock.write(() -> {
+            if (running.compareAndSet(false, true)) {
+                if (!storageExists()) {
+                    initializeStorage();
+                }
+
+                doStart();
+            }
+        });
+
+        super.start();
+    }
+
+    @Override
+    public synchronized void stop() {
+        if (running.compareAndSet(true, false)) {
+            doStop();
+        }
+        super.stop();
+    }
+
+    @Override
+    protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
+        doPreStoreRecord(record);
+        if (record == null) {
+            return;
+        }
+
+        lock.write(() -> {
+            if (!running.get()) {
+                throw new SchemaHistoryException("The history has been stopped and will not accept more records");
+            }
+
+            doStoreRecord(record);
+        });
+    }
+
+    @Override
+    protected void recoverRecords(Consumer<HistoryRecord> records) {
+        lock.write(() -> getRecords().forEach(records));
+    }
+
+    @Override
+    public boolean exists() {
+        return !getRecords().isEmpty();
+    }
+
+    protected void doPreStart() {
+    }
+
+    protected void doStart() {
+    }
+
+    protected void doStop() {
+    }
+
+    protected void doPreStoreRecord(HistoryRecord record) {
+    }
+
+    protected void doStoreRecord(HistoryRecord record) {
+    }
 }
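
With the guards above in the base class, a concrete backend's obligations shrink to the hooks. A hypothetical in-memory backend (invented for illustration; not part of this commit) shows the division of labor:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical subclass of the sketch above: the base class owns locking and
    // the running flag, the backend owns validation and the actual I/O.
    class InMemorySketch extends LifecycleSketch {
        private final List<String> records = new ArrayList<>();

        @Override
        protected boolean storageExists() {
            return true;                     // nothing to provision in memory
        }

        @Override
        protected void initializeStorage() {
        }

        // Mirrors doPreStoreRecord: validate preconditions before the base class
        // takes the write lock (the real backends check that their client exists).
        protected void doPreStoreRecord(String record) {
        }

        // Mirrors doStoreRecord: the write itself, already under the base lock.
        protected void doStoreRecord(String record) {
            records.add(record);
        }
    }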

AzureBlobSchemaHistory.java

@@ -7,7 +7,6 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.util.function.Consumer;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -87,7 +86,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
     }
 
     @Override
-    public synchronized void start() {
+    protected void doPreStart() {
         if (blobServiceClient == null) {
             blobServiceClient = new BlobServiceClientBuilder()
                     .connectionString(config.getString(ACCOUNT_CONNECTION_STRING))
@@ -97,51 +96,28 @@ public synchronized void start() {
             blobClient = blobServiceClient.getBlobContainerClient(container)
                     .getBlobClient(blobName);
         }
-
-        lock.write(() -> {
-            if (running.compareAndSet(false, true)) {
-                if (!storageExists()) {
-                    initializeStorage();
-                }
-
-                if (blobClient.exists()) {
-                    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-                    blobClient.downloadStream(outputStream);
-                    ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
-                    toHistoryRecord(inputStream);
-                }
-            }
-        });
-
-        super.start();
     }
 
     @Override
-    protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
+    protected void doStart() {
+        if (blobClient.exists()) {
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+            blobClient.downloadStream(outputStream);
+            ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+            toHistoryRecord(inputStream);
+        }
+    }
+
+    @Override
+    protected void doPreStoreRecord(HistoryRecord record) {
         if (blobClient == null) {
             throw new SchemaHistoryException("No Blob client is available. Ensure that 'start()' is called before storing database history records.");
         }
-        if (record == null) {
-            return;
-        }
-
-        lock.write(() -> {
-            if (!running.get()) {
-                throw new SchemaHistoryException("The history has been stopped and will not accept more records");
-            }
-
-            ByteArrayInputStream inputStream = new ByteArrayInputStream(fromHistoryRecord(record));
-            blobClient.upload(inputStream, true);
-        });
     }
 
     @Override
-    protected void recoverRecords(Consumer<HistoryRecord> records) {
-        lock.write(() -> getRecords().forEach(records));
-    }
-
-    @Override
-    public boolean exists() {
-        return !getRecords().isEmpty();
+    protected void doStoreRecord(HistoryRecord record) {
+        blobClient.upload(new ByteArrayInputStream(fromHistoryRecord(record)), true);
     }
 
     @Override

FileSchemaHistory.java

@@ -5,7 +5,6 @@
  */
 package io.debezium.storage.file.history;
 
-import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.nio.file.FileAlreadyExistsException;
@@ -14,7 +13,9 @@
 import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
 import java.util.Collection;
-import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import io.debezium.DebeziumException;
 import io.debezium.annotation.ThreadSafe;
@@ -36,6 +37,7 @@
  */
 @ThreadSafe
 public final class FileSchemaHistory extends AbstractFileBasedSchemaHistory {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FileSchemaHistory.class);
 
     public static final Field FILE_PATH = Field.create(SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "file.filename")
             .withDescription("The path to the file that will be used to record the database schema history")
@@ -58,75 +60,38 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
     }
 
     @Override
-    public void start() {
-        super.start();
-        lock.write(() -> {
-            if (running.compareAndSet(false, true)) {
-                if (!storageExists()) {
-                    initializeStorage();
-                }
-            }
-        });
-    }
-
-    @Override
-    protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
-        if (record == null) {
-            return;
-        }
-        lock.write(() -> {
-            if (!running.get()) {
-                throw new IllegalStateException("The history has been stopped and will not accept more records");
-            }
-            try {
-                String line = documentWriter.write(record.document());
-                // Create a buffered writer to write all of the records, closing the file when there is an error or when
-                // the thread is no longer supposed to run
-                try (BufferedWriter historyWriter = Files.newBufferedWriter(path, StandardOpenOption.APPEND)) {
-                    try {
-                        historyWriter.append(line);
-                        historyWriter.newLine();
-                    }
-                    catch (IOException e) {
-                        Loggings.logErrorAndTraceRecord(logger, record, "Failed to add record to history at {}", path, e);
-                    }
-                }
-                catch (IOException e) {
-                    throw new SchemaHistoryException("Unable to create writer for history file " + path + ": " + e.getMessage(), e);
-                }
-            }
-            catch (IOException e) {
-                Loggings.logErrorAndTraceRecord(logger, record, "Failed to convert record to string", e);
-            }
-        });
-    }
-
-    @Override
-    public void stop() {
-        running.set(false);
-        super.stop();
-    }
-
-    @Override
-    protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
-        lock.write(() -> {
-            if (exists()) {
-                try (BufferedReader historyReader = Files.newBufferedReader(path)) {
-                    while (true) {
-                        String line = historyReader.readLine();
-                        if (line == null) {
-                            break;
-                        }
-                        if (!line.isEmpty()) {
-                            records.accept(new HistoryRecord(documentReader.read(line)));
-                        }
-                    }
-                }
-                catch (IOException e) {
-                    logger.error("Failed to add recover records from history at {}", path, e);
-                }
-            }
-        });
+    protected void doStoreRecord(HistoryRecord record) {
+        try {
+            LOGGER.trace("Storing record into database history: {}", record);
+            records.add(record);
+            String line = documentWriter.write(record.document());
+            // Create a buffered writer to write all of the records, closing the file when there is an error or when
+            // the thread is no longer supposed to run
+            try (BufferedWriter historyWriter = Files.newBufferedWriter(path, StandardOpenOption.APPEND)) {
+                try {
+                    historyWriter.append(line);
+                    historyWriter.newLine();
+                }
+                catch (IOException e) {
+                    Loggings.logErrorAndTraceRecord(logger, record, "Failed to add record to history at {}", path, e);
+                }
+            }
+            catch (IOException e) {
+                throw new SchemaHistoryException("Unable to create writer for history file " + path + ": " + e.getMessage(), e);
+            }
+        }
+        catch (IOException e) {
+            Loggings.logErrorAndTraceRecord(logger, record, "Failed to convert record to string", e);
+        }
+    }
+
+    @Override
+    protected void doStart() {
+        try {
+            toHistoryRecord(Files.newInputStream(path));
+        }
+        catch (IOException e) {
+            throw new SchemaHistoryException("Can't retrieve file with schema history", e);
+        }
     }
 
     @Override
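
Note that the removed line-by-line recovery loop is superseded by toHistoryRecord(Files.newInputStream(path)) in doStart. The body of toHistoryRecord sits outside this diff; judging by the loop it replaces, it plausibly reads the stream one line at a time and parses each non-empty line into a HistoryRecord via documentReader. A sketch of that assumed shape, keeping lines as plain strings to stay dependency-free:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    // Assumed shape of toHistoryRecord(InputStream); not shown in this diff.
    final class StreamRecoverySketch {
        static List<String> readRecords(InputStream stream) throws IOException {
            List<String> records = new ArrayList<>();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(stream, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (!line.isEmpty()) {   // skip blanks, as the removed loop did
                        records.add(line);
                    }
                }
            }
            return records;
        }
    }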

S3SchemaHistory.java

@@ -7,7 +7,6 @@
 import java.io.InputStream;
 import java.net.URI;
-import java.util.function.Consumer;
 
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -146,7 +145,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
     }
 
     @Override
-    public synchronized void start() {
+    protected void doPreStart() {
         if (client == null) {
             S3ClientBuilder clientBuilder = S3Client.builder().credentialsProvider(credentialsProvider).region(region);
             if (endpoint != null) {
@@ -155,84 +154,58 @@ public synchronized void start() {
             client = clientBuilder.build();
         }
-
-        lock.write(() -> {
-            if (running.compareAndSet(false, true)) {
-                if (!storageExists()) {
-                    initializeStorage();
-                }
-
-                InputStream objectInputStream = null;
-                try {
-                    GetObjectRequest request = GetObjectRequest.builder()
-                            .bucket(bucket)
-                            .key(objectName)
-                            .responseContentType(OBJECT_CONTENT_TYPE)
-                            .build();
-                    objectInputStream = client.getObject(request, ResponseTransformer.toInputStream());
-                }
-                catch (NoSuchKeyException e) {
-                    // do nothing
-                }
-                catch (S3Exception e) {
-                    throw new SchemaHistoryException("Can't retrieve history object from S3", e);
-                }
-
-                if (objectInputStream != null) {
-                    toHistoryRecord(objectInputStream);
-                }
-            }
-        });
-
-        super.start();
     }
 
     @Override
-    public synchronized void stop() {
-        if (running.compareAndSet(true, false)) {
-            if (client != null) {
-                client.close();
-            }
-        }
-        super.stop();
-    }
-
-    @Override
-    protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
-        if (client == null) {
-            throw new SchemaHistoryException("No S3 client is available. Ensure that 'start()' is called before storing database history records.");
-        }
-        if (record == null) {
-            return;
-        }
-
-        lock.write(() -> {
-            if (!running.get()) {
-                throw new SchemaHistoryException("The history has been stopped and will not accept more records");
-            }
-            try {
-                PutObjectRequest request = PutObjectRequest.builder()
-                        .bucket(bucket)
-                        .key(objectName)
-                        .contentType(OBJECT_CONTENT_TYPE)
-                        .build();
-                client.putObject(request, RequestBody.fromBytes(fromHistoryRecord(record)));
-            }
-            catch (S3Exception e) {
-                throw new SchemaHistoryException("Can not store record to S3", e);
-            }
-        });
-    }
-
-    @Override
-    protected void recoverRecords(Consumer<HistoryRecord> records) {
-        lock.write(() -> getRecords().forEach(records));
-    }
-
-    @Override
-    public boolean exists() {
-        return !getRecords().isEmpty();
-    }
+    protected void doStart() {
+        InputStream objectInputStream = null;
+        try {
+            GetObjectRequest request = GetObjectRequest.builder()
+                    .bucket(bucket)
+                    .key(objectName)
+                    .responseContentType(OBJECT_CONTENT_TYPE)
+                    .build();
+            objectInputStream = client.getObject(request, ResponseTransformer.toInputStream());
+        }
+        catch (NoSuchKeyException e) {
+            // do nothing
+        }
+        catch (S3Exception e) {
+            throw new SchemaHistoryException("Can't retrieve history object from S3", e);
+        }
+
+        if (objectInputStream != null) {
+            toHistoryRecord(objectInputStream);
+        }
+    }
+
+    @Override
+    public void doStop() {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    @Override
+    protected void doPreStoreRecord(HistoryRecord record) {
+        if (client == null) {
+            throw new SchemaHistoryException("No S3 client is available. Ensure that 'start()' is called before storing database history records.");
+        }
+    }
+
+    @Override
+    protected void doStoreRecord(HistoryRecord record) {
+        try {
+            PutObjectRequest request = PutObjectRequest.builder()
+                    .bucket(bucket)
+                    .key(objectName)
+                    .contentType(OBJECT_CONTENT_TYPE)
+                    .build();
+            client.putObject(request, RequestBody.fromBytes(fromHistoryRecord(record)));
+        }
+        catch (S3Exception e) {
+            throw new SchemaHistoryException("Can not store record to S3", e);
+        }
+    }
 
     @Override