Change config from boolean to enum

This commit is contained in:
Eero Koplimets 2018-11-01 12:06:08 +02:00 committed by Gunnar Morling
parent 8844442459
commit 268101acd5
3 changed files with 67 additions and 8 deletions

View File

@ -329,6 +329,58 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
}
}
/**
* The set of predefined Gtid New Channel Position options.
*/
public static enum GtidNewChannelPosition implements EnumeratedValue {
/**
* This mode will start reading new gtid channel from mysql servers last_executed position
*/
LATEST("latest"),
/**
* This mode will start reading new gtid channel from earliest available position in server.
* This is needed when during active-passive failover the new gtid channel becomes active and receiving writes. #DBZ-923
*/
EARLIEST("earliest");
private final String value;
private GtidNewChannelPosition(String value) { this.value = value; }
@Override
public String getValue() { return value; }
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static GtidNewChannelPosition parse(String value) {
if (value == null) return null;
value = value.trim();
for (GtidNewChannelPosition option : GtidNewChannelPosition.values()) {
if (option.getValue().equalsIgnoreCase(value)) return option;
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static GtidNewChannelPosition parse(String value, String defaultValue) {
GtidNewChannelPosition mode = parse(value);
if (mode == null && defaultValue != null) mode = parse(defaultValue);
return mode;
}
}
/**
* The set of predefined modes for dealing with failures during binlog event processing.
*/
@ -696,13 +748,12 @@ public static DdlParsingMode parse(String value, String defaultValue) {
*
* When true, either {@link #GTID_SOURCE_INCLUDES} or {@link #GTID_SOURCE_EXCLUDES} must be set.
*/
public static final Field GTID_SOURCE_START_FROM_LATEST = Field.create("gtid.source.start.from.latest")
public static final Field GTID_NEW_CHANNEL_POSITION = Field.create("gtid.new.channel.position")
.withDisplayName("GTID start position")
.withType(Type.BOOLEAN)
.withEnum(GtidNewChannelPosition.class, GtidNewChannelPosition.LATEST)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(true)
.withDescription("If set to true, when connector sees new GTID set, it will start consuming from server latest executed gtid position. If false, starts reading from earliest available (not purged) gtid position on server.");
.withDescription("If set to 'latest', when connector sees new GTID, it will start consuming gtid channel from the server latest executed gtid position. If 'earliest' connector starts reading channel from first available (not purged) gtid position on the server.");
public static final Field CONNECTION_TIMEOUT_MS = Field.create("connect.timeout.ms")

View File

@ -16,6 +16,7 @@
import io.debezium.config.Configuration;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
import io.debezium.function.Predicates;
import io.debezium.relational.TableId;
import io.debezium.relational.history.DatabaseHistory;
@ -240,6 +241,12 @@ protected SnapshotMode snapshotMode() {
return SnapshotMode.parse(value, MySqlConnectorConfig.SNAPSHOT_MODE.defaultValueAsString());
}
protected GtidNewChannelPosition gtidNewChannelPosition() {
String value = config.getString(MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION);
return GtidNewChannelPosition.parse(value, MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION.defaultValueAsString());
}
public String getSnapshotSelectOverrides() {
return config.getString(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE);
}
@ -314,14 +321,14 @@ public GtidSet filterGtidSet(GtidSet availableServerGtidSet, GtidSet purgedServe
GtidSet mergedGtidSet;
if (this.config.getBoolean(MySqlConnectorConfig.GTID_SOURCE_START_FROM_LATEST)) {
mergedGtidSet = availableServerGtidSet.with(filteredGtidSet);
} else {
if (this.gtidNewChannelPosition() == GtidNewChannelPosition.EARLIEST) {
LOGGER.info("Using first available positions for new GTID channels");
mergedGtidSet = availableServerGtidSet
.getGTIDSetBeginning()
.with(purgedServerGtid)
.with(filteredGtidSet);
} else {
mergedGtidSet = availableServerGtidSet.with(filteredGtidSet);
}
LOGGER.info("Final merged GTID set to use when connecting to MySQL: {}", mergedGtidSet);

View File

@ -18,6 +18,7 @@
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
import io.debezium.document.Document;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.HistoryRecord;
@ -247,7 +248,7 @@ public void shouldMergeToFirstAvailableGtidSetPositions() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES,
"036d85a9-64e5-11e6-9b48-42010af0000c")
.with(MySqlConnectorConfig.GTID_SOURCE_START_FROM_LATEST, false)
.with(MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION, GtidNewChannelPosition.EARLIEST)
.build();
context = new MySqlTaskContext(config, false);