DBZ-278 Explicit history storage initialization

Jiri Pechanec 2018-02-05 14:01:02 +01:00 committed by Gunnar Morling
parent ad7b92310e
commit 4541fc53c9
7 changed files with 56 additions and 39 deletions

View File

@@ -79,6 +79,7 @@ public synchronized void start(Map<String, String> props) {
        // First check if db history is available
        if (!taskContext.historyExists()) {
            taskContext.initializeHistoryStorage();
            if (taskContext.isSchemaOnlyRecoverySnapshot()) {
                startWithSnapshot = true;
@@ -128,6 +129,7 @@ public synchronized void start(Map<String, String> props) {
        } else {
            // We have no recorded offsets ...
            taskContext.initializeHistoryStorage();
            if (taskContext.isSnapshotNeverAllowed()) {
                // We're not allowed to take a snapshot, so instead we have to assume that the binlog contains the
                // full history of the database.

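For orientation, here is a condensed, self-contained model of the start() logic these two hunks establish: storage is now initialized explicitly on both paths that begin without usable history. TaskContext is a stand-in interface and hasRecordedOffsets() a hypothetical helper; historyExists() and initializeHistoryStorage() mirror the diff.

// Illustrative sketch only, not Debezium code.
public class StartFlowSketch {
    interface TaskContext {
        boolean historyExists();
        boolean hasRecordedOffsets();
        void initializeHistoryStorage();
    }

    static void start(TaskContext taskContext) {
        if (!taskContext.historyExists()) {
            // First hunk: no database history is available yet, so create the
            // storage before deciding how to snapshot.
            taskContext.initializeHistoryStorage();
        }
        if (!taskContext.hasRecordedOffsets()) {
            // Second hunk: starting without recorded offsets also initializes
            // the storage before choosing between snapshot and binlog reading.
            taskContext.initializeHistoryStorage();
        }
    }
}

Calling the method on both paths is safe because implementations are expected to be idempotent, as the test at the end of this commit exercises.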
View File

@@ -274,6 +274,13 @@ public boolean historyExists() {
        return dbHistory.exists();
    }

    /**
     * Initialize permanent storage for database history.
     */
    public void initializeHistoryStorage() {
        dbHistory.initializeStorage();
    }

    /**
     * Discard any currently-cached schemas and rebuild them using the filters.
     */

View File

@@ -163,6 +163,13 @@ public boolean historyExists() {
        return dbSchema.historyExists();
    }

    /**
     * Initialize permanent storage for database history.
     */
    public void initializeHistoryStorage() {
        dbSchema.initializeHistoryStorage();
    }

    public Clock clock() {
        return clock;
    }

View File

@@ -95,4 +95,8 @@ public final void recover(Map<String, ?> source, Map<String, ?> position, Tables
    public void stop() {
        // do nothing
    }

    @Override
    public void initializeStorage() {
    }
}
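The empty implementation added here (presumably in the abstract base class) makes storage initialization opt-in: only histories backed by real storage need to override it. A compact, purely illustrative analogy of that design choice:

// Not Debezium code: histories without external storage inherit the no-op;
// only those with real storage (a file, a Kafka topic) override it.
abstract class BaseHistorySketch {
    public void initializeStorage() {
        // no-op by default
    }
}

class InMemoryHistorySketch extends BaseHistorySketch {
    // nothing to create up front; the inherited no-op is sufficient
}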

View File

@@ -123,4 +123,8 @@ public interface DatabaseHistory {
     */
    boolean exists();

    /**
     * Called to initialize permanent storage of the history.
     */
    void initializeStorage();
}
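As a concrete illustration of the new contract, here is a minimal file-backed sketch; the class name, the hard-coded path, and the use of java.nio are assumptions for illustration, not part of this commit:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FileBasedHistorySketch {
    // Hypothetical location of the history log; a real implementation would
    // take this from configuration.
    private final Path storageFile = Paths.get("/tmp/db-history.log");

    // Counterpart of DatabaseHistory.exists(): storage is present once the file exists.
    public boolean exists() {
        return Files.exists(storageFile);
    }

    // Counterpart of the new initializeStorage(): create the backing file (and
    // its parent directories) so later writes cannot fail on a missing
    // location. Written so that it is safe to call more than once.
    public void initializeStorage() {
        try {
            Files.createDirectories(storageFile.getParent());
            if (!Files.exists(storageFile)) {
                Files.createFile(storageFile);
            }
        } catch (IOException e) {
            throw new IllegalStateException("Could not initialize history storage", e);
        }
    }
}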

View File

@@ -10,7 +10,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
@@ -56,8 +55,6 @@
@NotThreadSafe
public class KafkaDatabaseHistory extends AbstractDatabaseHistory {

    private static final Duration MESSAGE_RETENTION_IN_DAYS = Duration.ofDays(365 * 10);

    public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "kafka.topic")
            .withDisplayName("Database history topic name")
            .withType(Type.STRING)
@@ -163,45 +160,11 @@ public void configure(Configuration config, HistoryRecordComparator comparator)
    @Override
    public synchronized void start() {
        super.start();
        createDatabaseHistoryTopic();
        if (this.producer == null) {
            this.producer = new KafkaProducer<>(this.producerConfig.asProperties());
        }
    }
    private void createDatabaseHistoryTopic() {
        final AdminClient admin = AdminClient.create(this.producerConfig.asProperties());
        try {
            short replicationFactor;
            Optional<String> findTopic = admin.listTopics().names().get(KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS).stream().filter(x -> topicName.equals(x)).findFirst();
            if (!findTopic.isPresent()) {
                // Find default replication factor
                final Collection<Node> nodes = admin.describeCluster().nodes().get(KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
                if (nodes.isEmpty()) {
                    throw new ConnectException("No brokers available to obtain default settings");
                }
                final Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, nodes.iterator().next().idString()))).all().get(KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
                if (configs.isEmpty()) {
                    throw new ConnectException("No configs have been received");
                }
                final Config config = configs.values().iterator().next();
                replicationFactor = Short.parseShort(config.get("default.replication.factor").value());

                // Create topic
                final NewTopic topic = new NewTopic(topicName, (short) 1, replicationFactor);
                topic.configs(Collect.hashMapOf("cleanup.policy", "delete", "retention.ms", Long.toString(MESSAGE_RETENTION_IN_DAYS.toMillis())));
                admin.createTopics(Collections.singleton(topic));
                logger.info("Database history topic created");
            } else {
                logger.info("Database history topic already exists");
            }
        } catch (Exception e) {
            logger.warn("Debezium could not create database history topic, delegating to operator or broker", e);
        }
    }
    @Override
    protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
        if (this.producer == null) {
@@ -358,4 +321,31 @@ public String toString() {
    protected static String consumerConfigPropertyName(String kafkaConsumerPropertyName) {
        return CONSUMER_PREFIX + kafkaConsumerPropertyName;
    }
    @Override
    public void initializeStorage() {
        super.initializeStorage();

        // Create the history topic eagerly so the first schema change record cannot fail on a
        // missing topic; try-with-resources ensures the AdminClient is closed again
        try (AdminClient admin = AdminClient.create(this.producerConfig.asProperties())) {
            // Find default replication factor
            final Collection<Node> nodes = admin.describeCluster().nodes().get(KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
            if (nodes.isEmpty()) {
                throw new ConnectException("No brokers available to obtain default settings");
            }
            final Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, nodes.iterator().next().idString()))).all().get(KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
            if (configs.isEmpty()) {
                throw new ConnectException("No configs have been received");
            }
            final Config config = configs.values().iterator().next();
            final short replicationFactor = Short.parseShort(config.get("default.replication.factor").value());

            // Create a single-partition topic that retains history records indefinitely
            final NewTopic topic = new NewTopic(topicName, 1, replicationFactor);
            topic.configs(Collect.hashMapOf("cleanup.policy", "delete", "retention.ms", Long.toString(Long.MAX_VALUE)));
            admin.createTopics(Collections.singleton(topic));
            logger.info("Database history topic '{}' created", topicName);
        } catch (Exception e) {
            logger.warn("Creation of database history topic failed; please create the topic manually", e);
        }
    }
}
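Putting the Kafka pieces together, a hedged usage sketch of the new method: the TOPIC field appears in this diff and BOOTSTRAP_SERVERS is a field that exists on KafkaDatabaseHistory, but the null comparator and the concrete values are simplifications for illustration.

import io.debezium.config.Configuration;
import io.debezium.relational.history.KafkaDatabaseHistory;

public class HistoryInitUsageSketch {
    public static void main(String[] args) {
        // Hypothetical driver: configure the history, start it, then create the
        // backing topic explicitly. initializeStorage() only logs a warning on
        // failure, so a missing topic may still need to be created by an operator.
        KafkaDatabaseHistory history = new KafkaDatabaseHistory();
        Configuration config = Configuration.create()
                .with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, "localhost:9092")
                .with(KafkaDatabaseHistory.TOPIC, "dbhistory.inventory")
                .build();
        history.configure(config, null); // null comparator: simplification for this sketch
        history.start();
        history.initializeStorage(); // idempotent; the test below calls it twice
        history.stop();
    }
}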

View File

@@ -108,6 +108,9 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL) {
        // Should be able to call start more than once ...
        history.start();

        history.initializeStorage();
        history.initializeStorage();

        DdlParser recoveryParser = new DdlParserSql2003();
        DdlParser ddlParser = new DdlParserSql2003();
        ddlParser.setCurrentSchema("db1"); // recover does this, so we need to as well