DBZ-40 Snapshotting and connector restart

Jiri Pechanec 2018-07-17 13:12:09 +02:00 committed by Gunnar Morling
parent a5b5e7d9eb
commit 0e35439019
4 changed files with 23 additions and 14 deletions

View File: MySqlConnectorConfig.java

@@ -7,7 +7,6 @@
import java.math.BigDecimal;
import java.time.Duration;
import java.util.Objects;
import java.util.Random;
import org.apache.kafka.common.config.ConfigDef;
@@ -1128,18 +1127,6 @@ private static int validateGtidSetExcludes(Configuration config, Field field, Va
        return 0;
    }

    private static int validateServerNameIsDifferentFromHistoryTopicName(Configuration config, Field field, ValidationOutput problems) {
        String serverName = config.getString(MySqlConnectorConfig.SERVER_NAME);
        String historyTopicName = config.getString(KafkaDatabaseHistory.TOPIC);
        if (Objects.equals(serverName, historyTopicName)) {
            problems.accept(SERVER_NAME, serverName, "Must not have the same value as " + KafkaDatabaseHistory.TOPIC.name());
            return 1;
        }
        return 0;
    }

    /**
     * Validate the new snapshot.locking.mode configuration, which replaces snapshot.minimal.locking.
     *

View File: CommonConnectorConfig.java

@@ -7,12 +7,15 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.config.Field.ValidationOutput;
import io.debezium.relational.history.KafkaDatabaseHistory;
/**
* Configuration options common to all Debezium connectors.
@@ -24,6 +27,7 @@ public class CommonConnectorConfig {
    public static final int DEFAULT_MAX_QUEUE_SIZE = 8192;
    public static final int DEFAULT_MAX_BATCH_SIZE = 2048;
    public static final long DEFAULT_POLL_INTERVAL_MILLIS = 500;
    public static final String DATABASE_CONFIG_PREFIX = "database.";

    public static final Field TOMBSTONES_ON_DELETE = Field.create("tombstones.on.delete")
            .withDisplayName("Change the behaviour of Debezium with regards to delete operations")
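A minimal sketch (not part of this diff) of how the DATABASE_CONFIG_PREFIX constant could be used: Configuration.subset() is an existing Debezium API that extracts all "database."-prefixed properties, for example to build JDBC connection settings; the variable names here are illustrative only.

    // Hypothetical usage: pull the "database.*" keys out of the connector configuration,
    // dropping the prefix, so they can be handed to a JDBC connection factory.
    Configuration jdbcConfig = config.subset(CommonConnectorConfig.DATABASE_CONFIG_PREFIX, true);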
@@ -129,4 +133,17 @@ private static int validateMaxQueueSize(Configuration config, Field field, Field
        }
        return count;
    }

    protected static int validateServerNameIsDifferentFromHistoryTopicName(Configuration config, Field field, ValidationOutput problems) {
        String serverName = config.getString(field);
        String historyTopicName = config.getString(KafkaDatabaseHistory.TOPIC);
        if (Objects.equals(serverName, historyTopicName)) {
            problems.accept(field, serverName, "Must not have the same value as " + KafkaDatabaseHistory.TOPIC.name());
            return 1;
        }
        return 0;
    }
}
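A minimal sketch (not part of this diff) of how a connector-specific configuration class could wire in the now-shared validator: it is registered on the server-name Field via withValidation(), whose validator signature (Configuration, Field, ValidationOutput) matches the protected method above. The field name, display name, and description below are illustrative assumptions, not the actual connector definition.

    // Hypothetical field definition in a subclass of CommonConnectorConfig.
    public static final Field SERVER_NAME = Field.create("database.server.name")
            .withDisplayName("Namespace")
            .withType(Type.STRING)
            .withWidth(Width.MEDIUM)
            .withImportance(Importance.HIGH)
            .withValidation(CommonConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName)
            .withDescription("Logical name of the connector; must differ from the database history topic name.");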

View File

@@ -399,4 +399,8 @@ public boolean snapshotSchema() {
            return snapshotSchema;
        }
    }

    protected Clock getClock() {
        return clock;
    }
}
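A minimal sketch (not part of this diff) of how a subclass might use the newly exposed getClock() accessor, assuming the clock field is an io.debezium.util.Clock; the helper name below is hypothetical.

    // Hypothetical helper: time a snapshot step using the change event source's clock.
    protected long measureSnapshotStep(Runnable snapshotStep) {
        final long start = getClock().currentTimeInMillis();
        snapshotStep.run();
        return getClock().currentTimeInMillis() - start;   // elapsed milliseconds
    }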

View File: SchemaChangeEvent.java

@@ -37,7 +37,8 @@ public SchemaChangeEvent(Map<String, ?> partition, Map<String, ?> offset, String
        this.offset = Objects.requireNonNull(offset, "offset must not be null");
        this.database = Objects.requireNonNull(database, "database must not be null");
        this.schema = Objects.requireNonNull(schema, "schema must not be null");
        this.ddl = Objects.requireNonNull(ddl, "ddl must not be null");
        // DDL is not mandatory
        this.ddl = ddl;
        this.tables = Objects.requireNonNull(tables, "tables must not be null");
        this.type = Objects.requireNonNull(type, "type must not be null");
        this.isFromSnapshot = isFromSnapshot;
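With this change the ddl argument may be null, so a consumer of SchemaChangeEvent should guard before using it. A minimal sketch (not part of this diff), assuming the class exposes a getDdl() accessor for the field:

    // Hypothetical consumer-side handling of a schema change event without a DDL string.
    String ddl = event.getDdl();
    if (ddl != null && !ddl.isEmpty()) {
        // parse or emit the DDL statement as before
    } else {
        // schema change captured without DDL, e.g. while snapshotting or after a connector restart
    }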