DBZ-5402 Make object name configurable
This commit is contained in:
parent
5883afdabe
commit
2ecd8422c7
@ -75,6 +75,7 @@ public class S3SchemaHistory extends AbstractSchemaHistory {
|
||||
public static final String SECRET_ACCESS_KEY_CONFIG = "s3.secret.access.key";
|
||||
public static final String REGION_CONFIG = CONFIGURATION_FIELD_PREFIX_STRING + "s3.region.name";
|
||||
public static final String BUCKET_CONFIG = CONFIGURATION_FIELD_PREFIX_STRING + "s3.bucket.name";
|
||||
public static final String OBJECT_NAME_CONFIG = CONFIGURATION_FIELD_PREFIX_STRING + "s3.object.name";
|
||||
public static final String ENDPOINT_CONFIG = CONFIGURATION_FIELD_PREFIX_STRING + "s3.endpoint";
|
||||
public static final String OBJECT_CONTENT_TYPE = "text/plain";
|
||||
|
||||
@ -101,6 +102,12 @@ public class S3SchemaHistory extends AbstractSchemaHistory {
|
||||
.withType(Type.STRING)
|
||||
.withImportance(Importance.HIGH);
|
||||
|
||||
public static final Field OBJECT_NAME = Field.create(OBJECT_NAME_CONFIG)
|
||||
.withDisplayName("S3 Object name")
|
||||
.withType(Type.STRING)
|
||||
.withImportance(Importance.HIGH)
|
||||
.withDescription("The name of the object under which the history is stored.");
|
||||
|
||||
public static final Field ENDPOINT = Field.create(ENDPOINT_CONFIG)
|
||||
.withDisplayName("S3 endpoint")
|
||||
.withType(Type.STRING)
|
||||
@ -112,9 +119,9 @@ public class S3SchemaHistory extends AbstractSchemaHistory {
|
||||
private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant();
|
||||
private final DocumentWriter writer = DocumentWriter.defaultWriter();
|
||||
private final DocumentReader reader = DocumentReader.defaultReader();
|
||||
private final String objectName = String.format("db-history-%s.log", Thread.currentThread().getName());
|
||||
|
||||
private String bucket = null;
|
||||
private String objectName = null;
|
||||
private Region region = null;
|
||||
private URI endpoint = null;
|
||||
private AwsCredentialsProvider credentialsProvider = null;
|
||||
@ -134,13 +141,20 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
|
||||
throw new DebeziumException(BUCKET + " is required to be set");
|
||||
}
|
||||
|
||||
// Unknown value is mapped to aws-global internally
|
||||
objectName = config.getString(OBJECT_NAME);
|
||||
if (objectName == null) {
|
||||
throw new DebeziumException(OBJECT_NAME + " is required to be set");
|
||||
}
|
||||
|
||||
// Unknown value is not detected by Region.of
|
||||
final var regionName = config.getString(REGION);
|
||||
if (regionName == null) {
|
||||
throw new DebeziumException(REGION + " is required to be set");
|
||||
}
|
||||
region = Region.of(regionName);
|
||||
|
||||
LOGGER.info("Database history will be stored in bucket '{}' under key '{}' using region '{}'", bucket, objectName, region);
|
||||
|
||||
final var uriString = config.getString(ENDPOINT);
|
||||
if (uriString != null) {
|
||||
LOGGER.info("Using explicitly configured endpoint " + uriString);
|
||||
@ -186,7 +200,7 @@ public synchronized void start() {
|
||||
// do nothing
|
||||
}
|
||||
catch (S3Exception e) {
|
||||
throw new SchemaHistoryException("Can not retrieve history object from S3", e);
|
||||
throw new SchemaHistoryException("Can't retrieve history object from S3", e);
|
||||
}
|
||||
|
||||
if (objectInputStream != null) {
|
||||
|
@ -55,6 +55,11 @@
|
||||
</dependency>
|
||||
|
||||
<!-- Test libs -->
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
|
@ -23,7 +23,6 @@
|
||||
|
||||
import com.adobe.testing.s3mock.testcontainers.S3MockContainer;
|
||||
|
||||
import io.debezium.DebeziumException;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.document.DocumentReader;
|
||||
import io.debezium.relational.history.AbstractSchemaHistoryTest;
|
||||
@ -81,7 +80,8 @@ protected SchemaHistory createHistory() {
|
||||
.with(S3SchemaHistory.ACCESS_KEY_ID, "aa")
|
||||
.with(S3SchemaHistory.SECRET_ACCESS_KEY, "bb")
|
||||
.with(S3SchemaHistory.BUCKET_CONFIG, BUCKET)
|
||||
.with(S3SchemaHistory.REGION_CONFIG, Region.AWS_GLOBAL)
|
||||
.with(S3SchemaHistory.OBJECT_NAME, OBJECT_NAME)
|
||||
.with(S3SchemaHistory.REGION_CONFIG, Region.AWS_GLOBAL.id())
|
||||
.with(S3SchemaHistory.ENDPOINT_CONFIG, container.getHttpEndpoint())
|
||||
.build();
|
||||
history.configure(config, null, SchemaHistoryListener.NOOP, true);
|
||||
|
Loading…
Reference in New Issue
Block a user