DBZ-6180: debezium-storage-azure-blob

This commit is contained in:
Hossein Torabi 2023-03-06 18:07:40 +01:00 committed by Jiri Pechanec
parent 9e7591641a
commit d4ab16f011
6 changed files with 488 additions and 0 deletions

View File

@ -44,6 +44,9 @@
<!-- S3 -->
<version.s3>2.17.241</version.s3>
<!-- Blob Storage -->
<version.azure.blob>12.21.0</version.azure.blob>
<!-- Testing -->
<version.junit>4.13.1</version.junit>
<version.fest>1.4</version.fest>
@ -454,6 +457,12 @@
<version>${version.s3}</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>${version.azure.blob}</version>
</dependency>
<!-- Testing utilities -->
<dependency>

View File

@ -0,0 +1,113 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven descriptor for the Debezium Azure Blob Storage schema-history module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-storage</artifactId>
        <version>2.2.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>debezium-storage-azure-blob</artifactId>
    <name>Debezium Storage Azure Blob Module</name>
    <packaging>jar</packaging>
    <dependencies>
        <!-- Debezium and Kafka Connect APIs are supplied by the runtime, hence 'provided'. -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-core</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <!-- Azure SDK; version is managed in the parent POM (version.azure.blob). -->
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-storage-blob</artifactId>
        </dependency>
        <!-- Testing -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <!--
    Define several useful profiles
    -->
    <profiles>
        <!-- 'assembly' builds the distributable archive via the shared Debezium descriptors. -->
        <profile>
            <id>assembly</id>
            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-assembly-plugin</artifactId>
                        <version>${version.assembly.plugin}</version>
                        <dependencies>
                            <dependency>
                                <groupId>io.debezium</groupId>
                                <artifactId>debezium-assembly-descriptors</artifactId>
                                <version>${project.version}</version>
                            </dependency>
                        </dependencies>
                        <executions>
                            <execution>
                                <id>default</id>
                                <phase>package</phase>
                                <goals>
                                    <goal>single</goal>
                                </goals>
                                <configuration>
                                    <finalName>${project.artifactId}-${project.version}</finalName>
                                    <attach>true</attach> <!-- we want attach & deploy these to Maven -->
                                    <descriptorRefs>
                                        <descriptorRef>${assembly.descriptor}</descriptorRef>
                                    </descriptorRefs>
                                    <tarLongFileMode>posix</tarLongFileMode>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </profile>
        <!-- 'quick' (-Dquick) skips integration tests for fast local builds. -->
        <profile>
            <id>quick</id>
            <activation>
                <activeByDefault>false</activeByDefault>
                <property>
                    <name>quick</name>
                </property>
            </activation>
            <properties>
                <skipITs>true</skipITs>
            </properties>
        </profile>
    </profiles>
</project>

View File

@ -0,0 +1,227 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.azure.blob.history;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.util.FunctionalReadWriteLock;
import io.debezium.util.Loggings;
/**
 * A {@link SchemaHistory} implementation that stores the complete database schema history in a
 * single Azure Blob Storage blob, one JSON-serialized {@link HistoryRecord} per line.
 *
 * <p>All records are additionally cached in main memory: {@link AzureBlobSchemaHistory#start()}
 * downloads the existing history blob (if present) and loads its records, and every call to
 * {@link AzureBlobSchemaHistory#storeRecord(HistoryRecord)} re-serializes the full in-memory list
 * and overwrites the blob.
 *
 * <p>Not intended for concurrent writers against the same blob; a {@link FunctionalReadWriteLock}
 * guards the in-memory state within one process.
 *
 * @author Hossein Torabi
 */
public class AzureBlobSchemaHistory extends AbstractSchemaHistory {
    private static final Logger LOGGER = LoggerFactory.getLogger(AzureBlobSchemaHistory.class);

    public static final String ACCOUNT_CONNECTION_STRING_CONFIG = "azure.storage.account.connectionstring";
    public static final String CONTAINER_NAME_CONFIG = "azure.storage.account.container.name";
    public static final String BLOB_NAME_CONFIG = "azure.storage.blob.name";

    public static final Field ACCOUNT_CONNECTION_STRING = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + ACCOUNT_CONNECTION_STRING_CONFIG)
            .withDisplayName("storage connection string")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.HIGH);

    public static final Field CONTAINER_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + CONTAINER_NAME_CONFIG)
            .withDisplayName("container name")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.HIGH);

    public static final Field BLOB_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + BLOB_NAME_CONFIG)
            .withDisplayName("blob name")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.HIGH);

    public static final Field.Set ALL_FIELDS = Field.setOf(ACCOUNT_CONNECTION_STRING, CONTAINER_NAME, BLOB_NAME);

    private final AtomicBoolean running = new AtomicBoolean();
    private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant();
    private final DocumentWriter documentWriter = DocumentWriter.defaultWriter();
    private final DocumentReader reader = DocumentReader.defaultReader();

    // Lazily created in start(); volatile because start()/storeRecord() may run on different threads.
    private volatile BlobServiceClient blobServiceClient = null;
    private volatile BlobClient blobClient = null;

    private String container = null;
    private String blobName = null;

    // In-memory cache of the full history; guarded by 'lock'.
    private volatile List<HistoryRecord> records = new ArrayList<>();

    /**
     * Validates and captures the container and blob names from the connector configuration.
     *
     * @throws SchemaHistoryException if the configuration fails field validation
     * @throws DebeziumException if the container or blob name is missing
     */
    @Override
    public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
        super.configure(config, comparator, listener, useCatalogBeforeSchema);
        if (!config.validateAndRecord(ALL_FIELDS, LOGGER::error)) {
            throw new SchemaHistoryException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
        }
        container = config.getString(CONTAINER_NAME);
        if (container == null) {
            throw new DebeziumException(CONTAINER_NAME + " is required to be set");
        }
        blobName = config.getString(BLOB_NAME);
        if (blobName == null) {
            throw new DebeziumException(BLOB_NAME + " is required to be set");
        }
    }

    /**
     * Builds the Azure clients (if not yet built), ensures the container exists, and loads any
     * existing history blob into the in-memory record cache. Empty lines in the blob are skipped.
     */
    @Override
    public synchronized void start() {
        if (blobServiceClient == null) {
            blobServiceClient = new BlobServiceClientBuilder()
                    .connectionString(config.getString(ACCOUNT_CONNECTION_STRING))
                    .buildClient();
        }
        if (blobClient == null) {
            blobClient = blobServiceClient.getBlobContainerClient(container)
                    .getBlobClient(blobName);
        }
        lock.write(() -> {
            if (running.compareAndSet(false, true)) {
                if (!storageExists()) {
                    initializeStorage();
                }
            }
            if (blobClient.exists()) {
                // Download the whole blob into memory, then parse it line by line.
                ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                blobClient.downloadStream(outputStream);
                ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
                try (BufferedReader historyReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
                    while (true) {
                        String line = historyReader.readLine();
                        if (line == null) {
                            break;
                        }
                        if (!line.isEmpty()) {
                            records.add(new HistoryRecord(reader.read(line)));
                        }
                    }
                }
                catch (IOException e) {
                    throw new SchemaHistoryException("Unable to read object content", e);
                }
            }
        });
        super.start();
    }

    /**
     * Appends the record to the in-memory cache, re-serializes the complete history (one JSON
     * document per line, each terminated by a newline) and overwrites the blob.
     *
     * @throws IllegalStateException if called before {@link #start()} or after the history stopped
     * @throws SchemaHistoryException if the record cannot be serialized
     */
    @Override
    protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
        if (blobClient == null) {
            throw new IllegalStateException("No Blob client is available. Ensure that 'start()' is called before storing database history records.");
        }
        if (record == null) {
            return;
        }
        LOGGER.trace("Storing record into database history: {}", record);
        lock.write(() -> {
            if (!running.get()) {
                throw new IllegalStateException("The history has been stopped and will not accept more records");
            }
            records.add(record);
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            try (BufferedWriter historyWriter = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))) {
                for (HistoryRecord r : records) {
                    String line = documentWriter.write(r.document());
                    if (line != null) {
                        // Write the record first, then the line terminator, so the blob contains
                        // no leading blank line and each record line is newline-terminated.
                        historyWriter.append(line);
                        historyWriter.newLine();
                    }
                }
            }
            catch (IOException e) {
                Loggings.logErrorAndTraceRecord(LOGGER, record, "Failed to convert record", e);
                throw new SchemaHistoryException("Failed to convert record", e);
            }
            ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
            // Overwrite the existing blob with the refreshed full history.
            blobClient.upload(inputStream, true);
        });
    }

    @Override
    protected void recoverRecords(Consumer<HistoryRecord> records) {
        // Recovery is served entirely from the in-memory cache populated in start().
        lock.write(() -> this.records.forEach(records));
    }

    @Override
    public boolean exists() {
        return !records.isEmpty();
    }

    /**
     * @return {@code true} if the configured container already exists in the storage account
     */
    @Override
    public boolean storageExists() {
        final boolean containerExists = blobServiceClient.getBlobContainerClient(container)
                .exists();
        if (containerExists) {
            LOGGER.info("Container '{}' used to store database history exists", container);
        }
        else {
            LOGGER.info("Container '{}' used to store database history does not exist yet", container);
        }
        return containerExists;
    }

    @Override
    public void initializeStorage() {
        blobServiceClient.createBlobContainer(container);
    }

    @Override
    public String toString() {
        return "Azure Blob Storage";
    }
}

View File

@ -14,6 +14,9 @@
<properties>
<!-- S3 -->
<version.s3mock>2.4.14</version.s3mock>
<!-- Azurite -->
<version.azurite>3.22.0</version.azurite>
</properties>
<dependencyManagement>
@ -81,6 +84,12 @@
<artifactId>debezium-storage-s3</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-azure-blob</artifactId>
<version>2.2.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<!-- Source system -->
<dependency>
@ -106,6 +115,12 @@
<artifactId>s3mock-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<!-- Azure Blob Storage -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
</dependency>
</dependencies>
<build>
@ -133,6 +148,7 @@
<systemPropertyVariables>
<!-- Make these available to the tests via system properties -->
<tag.smock>${version.s3mock}</tag.smock>
<tag.azurite>${version.azurite}</tag.azurite>
</systemPropertyVariables>
<runOrder>alphabetical</runOrder>
<argLine>${debug.argline} ${modules.argline} ${test.argline}</argLine>

View File

@ -0,0 +1,122 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.azure.blob.history;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.GenericContainer;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import io.debezium.config.Configuration;
import io.debezium.document.DocumentReader;
import io.debezium.relational.history.AbstractSchemaHistoryTest;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryListener;
/**
 * Integration test for {@link AzureBlobSchemaHistory}, backed by an Azurite emulator container.
 */
public class AzureBlobSchemaHistoryIT extends AbstractSchemaHistoryTest {
    public static final String IMAGE_TAG = System.getProperty("tag.azurite", "latest");
    public static final String CONTAINER_NAME = "debezium";
    public static final String BLOB_NAME = "debezium-history.log";
    // %s placeholder is filled with the mapped Azurite blob port.
    public static final String CONNECTION_STRING = "DefaultEndpointsProtocol=http;" +
            "AccountName=account;" +
            "AccountKey=key;" +
            "BlobEndpoint=http://127.0.0.1:%s/account;";

    private static final GenericContainer<?> container = new GenericContainer<>(String.format("mcr.microsoft.com/azure-storage/azurite:%s", IMAGE_TAG))
            .withCommand("azurite --blobHost 0.0.0.0 --blobPort 10000")
            .withEnv("AZURITE_ACCOUNTS", "account:key")
            .withExposedPorts(10000);

    private static BlobServiceClient blobServiceClient;

    @BeforeClass
    public static void startAzurite() {
        container.start();
        blobServiceClient = new BlobServiceClientBuilder()
                .connectionString(String.format(CONNECTION_STRING, container.getMappedPort(10000)))
                .buildClient();
    }

    @AfterClass
    public static void stopAzurite() {
        container.stop();
    }

    @Override
    protected SchemaHistory createHistory() {
        SchemaHistory history = new AzureBlobSchemaHistory();
        Configuration config = Configuration.create()
                .with(AzureBlobSchemaHistory.ACCOUNT_CONNECTION_STRING, String.format(CONNECTION_STRING, container.getMappedPort(10000)))
                .with(AzureBlobSchemaHistory.CONTAINER_NAME, CONTAINER_NAME)
                .with(AzureBlobSchemaHistory.BLOB_NAME, BLOB_NAME)
                .build();
        history.configure(config, null, SchemaHistoryListener.NOOP, true);
        history.start();
        return history;
    }

    @Test
    public void initializeStorageShouldCreateContainer() {
        blobServiceClient.deleteBlobContainer(CONTAINER_NAME);
        assertFalse(blobServiceClient.getBlobContainerClient(CONTAINER_NAME).exists());
        history.initializeStorage();
        assertTrue(blobServiceClient.getBlobContainerClient(CONTAINER_NAME).exists());
    }

    @Test
    public void storeRecordShouldSaveRecordsInBlobStorage() throws IOException {
        BlobClient blobClient = blobServiceClient
                .getBlobContainerClient(CONTAINER_NAME)
                .getBlobClient(BLOB_NAME);
        assertFalse(blobClient.exists());
        record(1, 0, "CREATE TABLE foo ( first VARCHAR(22) NOT NULL );", all, t3, t2, t1, t0);
        assertTrue(blobClient.exists());

        // Parse the blob content line by line; each non-empty line is one serialized record.
        List<HistoryRecord> historyRecords = new ArrayList<>();
        ByteArrayInputStream inputStream = new ByteArrayInputStream(blobClient.downloadContent().toBytes());
        DocumentReader reader = DocumentReader.defaultReader();
        try (BufferedReader historyReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
            for (String line = historyReader.readLine(); line != null; line = historyReader.readLine()) {
                // Parse the line we just read (the original accidentally consumed a second line here)
                // and skip blank lines so the test is independent of the exact newline layout.
                if (!line.isEmpty()) {
                    historyRecords.add(new HistoryRecord(reader.read(line)));
                }
            }
        }

        assertEquals(1, historyRecords.size());
        assertEquals("CREATE TABLE foo ( first VARCHAR(22) NOT NULL );", historyRecords.get(0).document().getString("ddl"));
        assertEquals(1, historyRecords.get(0).document().getDocument("position").getInteger("position").intValue());
        assertEquals(0, historyRecords.get(0).document().getDocument("position").getInteger("entry").intValue());
    }
}

View File

@ -23,5 +23,6 @@
<module>debezium-storage-s3</module>
<module>debezium-storage-tests</module>
<module>debezium-storage-rocketmq</module>
<module>debezium-storage-azure-blob</module>
</modules>
</project>