DBZ-923 config to use earliest available gtid set from mysql server when connecting

This commit is contained in:
Eero Koplimets 2018-10-02 13:37:58 +03:00 committed by Gunnar Morling
parent db1d3a7fb8
commit 8844442459
6 changed files with 119 additions and 4 deletions

View File

@ -279,7 +279,13 @@ protected void doStart() {
// Now look at the GTID set from the server and what we've previously seen ...
GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr);
GtidSet filteredGtidSet = context.filterGtidSet(availableServerGtidSet);
// also take into account purged GTID logs
String purgedServerGtidStr = connectionContext.purgedGtidSet();
GtidSet purgedServerGtidSet = new GtidSet(purgedServerGtidStr);
logger.info("GTID set purged on server: {}", purgedServerGtidSet);
GtidSet filteredGtidSet = context.filterGtidSet(availableServerGtidSet, purgedServerGtidSet);
if (filteredGtidSet != null) {
// We've seen at least some GTIDs, so start reading from the filtered GTID set ...
logger.info("Registering binlog reader with GTID set: {}", filteredGtidSet);

View File

@ -114,6 +114,21 @@ public GtidSet with(GtidSet other) {
return new GtidSet(newSet);
}
/**
* Returns a copy with all intervals set to beginning
* @return
*/
public GtidSet getGTIDSetBeginning() {
Map<String, UUIDSet> newSet = new HashMap<>();
for (UUIDSet uuidSet : uuidSetsByServerId.values()) {
newSet.put(uuidSet.getUUID(), uuidSet.asIntervalBeginning());
}
return new GtidSet(newSet);
}
@Override
public int hashCode() {
return uuidSetsByServerId.keySet().hashCode();
@ -165,6 +180,15 @@ protected UUIDSet(com.github.shyiko.mysql.binlog.GtidSet.UUIDSet uuidSet) {
}
}
}
protected UUIDSet(String uuid, Interval interval) {
this.uuid = uuid;
this.intervals.add(interval);
}
public UUIDSet asIntervalBeginning() {
Interval start = new Interval(intervals.get(0).getStart(), intervals.get(0).getStart());
return new UUIDSet(this.uuid, start);
}
/**
* Get the Uuid for the server that generated the GTIDs.

View File

@ -687,6 +687,24 @@ public static DdlParsingMode parse(String value, String defaultValue) {
.withDefault(true)
.withDescription("If set to true, we will only produce DML events into Kafka for transactions that were written on mysql servers with UUIDs matching the filters defined by the gtid.source.includes or gtid.source.excludes configuration options, if they are specified.");
/**
* If set to true, connector when encountering new GTID channel after job restart will start reading it from the
* latest executed position (default). When set to false the connector will try to read this GTID channel from the first available offset.
* This is useful when in active-passive mysql setup during failover new GTID channel gets used, see #DBZ-923
*
* Defaults to true.
*
* When true, either {@link #GTID_SOURCE_INCLUDES} or {@link #GTID_SOURCE_EXCLUDES} must be set.
*/
public static final Field GTID_SOURCE_START_FROM_LATEST = Field.create("gtid.source.start.from.latest")
.withDisplayName("GTID start position")
.withType(Type.BOOLEAN)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(true)
.withDescription("If set to true, when connector sees new GTID set, it will start consuming from server latest executed gtid position. If false, starts reading from earliest available (not purged) gtid position on server.");
public static final Field CONNECTION_TIMEOUT_MS = Field.create("connect.timeout.ms")
.withDisplayName("Connection Timeout (ms)")
.withType(Type.INT)

View File

@ -177,6 +177,27 @@ public String knownGtidSet() {
return result != null ? result : "";
}
/**
* Get the purged gtid values from MySQL (gtid_purged value)
*
* @return string representation of GTID set or empty string
*/
public String purgedGtidSet() {
AtomicReference<String> gtidSetStr = new AtomicReference<String>();
try {
jdbc.query("SHOW GLOBAL VARIABLES LIKE \"gtid_purged\"", rs -> {
if (rs.next() && rs.getMetaData().getColumnCount() > 1) {
gtidSetStr.set(rs.getString(2));// GTID set, may be null, blank, or contain a GTID set
}
});
} catch (SQLException e) {
throw new ConnectException("Unexpected error while connecting to MySQL and looking at gtid_purged variable: ", e);
}
String result = gtidSetStr.get();
return result != null ? result : "";
}
/**
* Determine if the current user has the named privilege. Note that if the user has the "ALL" privilege this method
* returns {@code true}.

View File

@ -293,10 +293,11 @@ public void temporaryLoggingContext(String contextName, Runnable operation) {
* This method does not mutate any state in the context.
*
* @param availableServerGtidSet the GTID set currently available in the MySQL server
* @param purgedServerGtid the GTID set already purged by the MySQL server
* @return A GTID set meant for consuming from a MySQL binlog; may return null if the SourceInfo has no GTIDs and therefore
* none were filtered
*/
public GtidSet filterGtidSet(GtidSet availableServerGtidSet) {
public GtidSet filterGtidSet(GtidSet availableServerGtidSet, GtidSet purgedServerGtid) {
String gtidStr = source.gtidSet();
if (gtidStr == null) {
return null;
@ -310,7 +311,19 @@ public GtidSet filterGtidSet(GtidSet availableServerGtidSet) {
LOGGER.info("GTID set after applying GTID source includes/excludes to previous recorded offset: {}", filteredGtidSet);
}
LOGGER.info("GTID set available on server: {}", availableServerGtidSet);
GtidSet mergedGtidSet = availableServerGtidSet.with(filteredGtidSet);
GtidSet mergedGtidSet;
if (this.config.getBoolean(MySqlConnectorConfig.GTID_SOURCE_START_FROM_LATEST)) {
mergedGtidSet = availableServerGtidSet.with(filteredGtidSet);
} else {
LOGGER.info("Using first available positions for new GTID channels");
mergedGtidSet = availableServerGtidSet
.getGTIDSetBeginning()
.with(purgedServerGtid)
.with(filteredGtidSet);
}
LOGGER.info("Final merged GTID set to use when connecting to MySQL: {}", mergedGtidSet);
return mergedGtidSet;
}

View File

@ -214,6 +214,8 @@ public void shouldFilterAndMergeGtidSet() throws Exception {
String availableServerGtidStr = "036d85a9-64e5-11e6-9b48-42010af0000c:1-20,"
+ "7145bf69-d1ca-11e5-a588-0242ac110004:1-3200,"
+ "123e4567-e89b-12d3-a456-426655440000:1-41";
String purgedServerGtidStr = "";
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES,
"036d85a9-64e5-11e6-9b48-42010af0000c")
.build();
@ -221,7 +223,7 @@ public void shouldFilterAndMergeGtidSet() throws Exception {
context.start();
context.source().setCompletedGtidSet(gtidStr);
GtidSet mergedGtidSet = context.filterGtidSet(new GtidSet(availableServerGtidStr));
GtidSet mergedGtidSet = context.filterGtidSet(new GtidSet(availableServerGtidStr), new GtidSet(purgedServerGtidStr));
assertThat(mergedGtidSet).isNotNull();
GtidSet.UUIDSet uuidSet1 = mergedGtidSet.forServerWithId("036d85a9-64e5-11e6-9b48-42010af0000c");
GtidSet.UUIDSet uuidSet2 = mergedGtidSet.forServerWithId("7145bf69-d1ca-11e5-a588-0242ac110004");
@ -234,6 +236,37 @@ public void shouldFilterAndMergeGtidSet() throws Exception {
assertThat(uuidSet4).isNull();
}
@Test
public void shouldMergeToFirstAvailableGtidSetPositions() throws Exception {
String gtidStr = "036d85a9-64e5-11e6-9b48-42010af0000c:1-2,"
+ "7c1de3f2-3fd2-11e6-9cdc-42010af000bc:5-41";
String availableServerGtidStr = "036d85a9-64e5-11e6-9b48-42010af0000c:1-20,"
+ "7145bf69-d1ca-11e5-a588-0242ac110004:1-3200,"
+ "123e4567-e89b-12d3-a456-426655440000:1-41";
String purgedServerGtidStr = "7145bf69-d1ca-11e5-a588-0242ac110004:1-1234";
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES,
"036d85a9-64e5-11e6-9b48-42010af0000c")
.with(MySqlConnectorConfig.GTID_SOURCE_START_FROM_LATEST, false)
.build();
context = new MySqlTaskContext(config, false);
context.start();
context.source().setCompletedGtidSet(gtidStr);
GtidSet mergedGtidSet = context.filterGtidSet(new GtidSet(availableServerGtidStr), new GtidSet(purgedServerGtidStr));
assertThat(mergedGtidSet).isNotNull();
GtidSet.UUIDSet uuidSet1 = mergedGtidSet.forServerWithId("036d85a9-64e5-11e6-9b48-42010af0000c");
GtidSet.UUIDSet uuidSet2 = mergedGtidSet.forServerWithId("7145bf69-d1ca-11e5-a588-0242ac110004");
GtidSet.UUIDSet uuidSet3 = mergedGtidSet.forServerWithId("123e4567-e89b-12d3-a456-426655440000");
GtidSet.UUIDSet uuidSet4 = mergedGtidSet.forServerWithId("7c1de3f2-3fd2-11e6-9cdc-42010af000bc");
assertThat(uuidSet1.getIntervals()).isEqualTo(Arrays.asList(new GtidSet.Interval(1, 2)));
assertThat(uuidSet2.getIntervals()).isEqualTo(Arrays.asList(new GtidSet.Interval(1, 1234)));
assertThat(uuidSet3.getIntervals()).isEqualTo(Arrays.asList(new GtidSet.Interval(1, 1)));
assertThat(uuidSet4).isNull();
}
@Test
public void shouldComparePositionsWithDifferentFields() {
String lastGtidStr = "01261278-6ade-11e6-b36a-42010af00790:1-400944168,"