From 8844442459f3b18c3de7771f7447d492405d4fac Mon Sep 17 00:00:00 2001 From: Eero Koplimets Date: Tue, 2 Oct 2018 13:37:58 +0300 Subject: [PATCH] DBZ-923 config to use earliest available gtid set from mysql server when connecting --- .../connector/mysql/BinlogReader.java | 8 ++++- .../io/debezium/connector/mysql/GtidSet.java | 24 +++++++++++++ .../connector/mysql/MySqlConnectorConfig.java | 18 ++++++++++ .../connector/mysql/MySqlJdbcContext.java | 21 +++++++++++ .../connector/mysql/MySqlTaskContext.java | 17 +++++++-- .../connector/mysql/MySqlTaskContextTest.java | 35 ++++++++++++++++++- 6 files changed, 119 insertions(+), 4 deletions(-) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java index cd77ec53e..e8167e0d8 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java @@ -279,7 +279,13 @@ protected void doStart() { // Now look at the GTID set from the server and what we've previously seen ... GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr); - GtidSet filteredGtidSet = context.filterGtidSet(availableServerGtidSet); + + // also take into account purged GTID logs + String purgedServerGtidStr = connectionContext.purgedGtidSet(); + GtidSet purgedServerGtidSet = new GtidSet(purgedServerGtidStr); + logger.info("GTID set purged on server: {}", purgedServerGtidSet); + + GtidSet filteredGtidSet = context.filterGtidSet(availableServerGtidSet, purgedServerGtidSet); if (filteredGtidSet != null) { // We've seen at least some GTIDs, so start reading from the filtered GTID set ... logger.info("Registering binlog reader with GTID set: {}", filteredGtidSet); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/GtidSet.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/GtidSet.java index 7f6b6c5ba..7a7723eaf 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/GtidSet.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/GtidSet.java @@ -114,6 +114,21 @@ public GtidSet with(GtidSet other) { return new GtidSet(newSet); } + /** + * Returns a copy with all intervals set to beginning + * @return + */ + public GtidSet getGTIDSetBeginning() { + Map newSet = new HashMap<>(); + + for (UUIDSet uuidSet : uuidSetsByServerId.values()) { + newSet.put(uuidSet.getUUID(), uuidSet.asIntervalBeginning()); + } + + + return new GtidSet(newSet); + } + @Override public int hashCode() { return uuidSetsByServerId.keySet().hashCode(); @@ -165,6 +180,15 @@ protected UUIDSet(com.github.shyiko.mysql.binlog.GtidSet.UUIDSet uuidSet) { } } } + protected UUIDSet(String uuid, Interval interval) { + this.uuid = uuid; + this.intervals.add(interval); + } + + public UUIDSet asIntervalBeginning() { + Interval start = new Interval(intervals.get(0).getStart(), intervals.get(0).getStart()); + return new UUIDSet(this.uuid, start); + } /** * Get the Uuid for the server that generated the GTIDs. diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index 5b46c04ef..f41ff28ce 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -687,6 +687,24 @@ public static DdlParsingMode parse(String value, String defaultValue) { .withDefault(true) .withDescription("If set to true, we will only produce DML events into Kafka for transactions that were written on mysql servers with UUIDs matching the filters defined by the gtid.source.includes or gtid.source.excludes configuration options, if they are specified."); + /** + * If set to true, connector when encountering new GTID channel after job restart will start reading it from the + * latest executed position (default). When set to false the connector will try to read this GTID channel from the first available offset. + * This is useful when in active-passive mysql setup during failover new GTID channel gets used, see #DBZ-923 + * + * Defaults to true. + * + * When true, either {@link #GTID_SOURCE_INCLUDES} or {@link #GTID_SOURCE_EXCLUDES} must be set. + */ + public static final Field GTID_SOURCE_START_FROM_LATEST = Field.create("gtid.source.start.from.latest") + .withDisplayName("GTID start position") + .withType(Type.BOOLEAN) + .withWidth(Width.SHORT) + .withImportance(Importance.MEDIUM) + .withDefault(true) + .withDescription("If set to true, when connector sees new GTID set, it will start consuming from server latest executed gtid position. If false, starts reading from earliest available (not purged) gtid position on server."); + + public static final Field CONNECTION_TIMEOUT_MS = Field.create("connect.timeout.ms") .withDisplayName("Connection Timeout (ms)") .withType(Type.INT) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java index c90d1b34f..bb4f189be 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java @@ -177,6 +177,27 @@ public String knownGtidSet() { return result != null ? result : ""; } + /** + * Get the purged gtid values from MySQL (gtid_purged value) + * + * @return string representation of GTID set or empty string + */ + public String purgedGtidSet() { + AtomicReference gtidSetStr = new AtomicReference(); + try { + jdbc.query("SHOW GLOBAL VARIABLES LIKE \"gtid_purged\"", rs -> { + if (rs.next() && rs.getMetaData().getColumnCount() > 1) { + gtidSetStr.set(rs.getString(2));// GTID set, may be null, blank, or contain a GTID set + } + }); + } catch (SQLException e) { + throw new ConnectException("Unexpected error while connecting to MySQL and looking at gtid_purged variable: ", e); + } + + String result = gtidSetStr.get(); + return result != null ? result : ""; + } + /** * Determine if the current user has the named privilege. Note that if the user has the "ALL" privilege this method * returns {@code true}. diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java index 3f84dda37..3e3866059 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java @@ -293,10 +293,11 @@ public void temporaryLoggingContext(String contextName, Runnable operation) { * This method does not mutate any state in the context. * * @param availableServerGtidSet the GTID set currently available in the MySQL server + * @param purgedServerGtid the GTID set already purged by the MySQL server * @return A GTID set meant for consuming from a MySQL binlog; may return null if the SourceInfo has no GTIDs and therefore * none were filtered */ - public GtidSet filterGtidSet(GtidSet availableServerGtidSet) { + public GtidSet filterGtidSet(GtidSet availableServerGtidSet, GtidSet purgedServerGtid) { String gtidStr = source.gtidSet(); if (gtidStr == null) { return null; @@ -310,7 +311,19 @@ public GtidSet filterGtidSet(GtidSet availableServerGtidSet) { LOGGER.info("GTID set after applying GTID source includes/excludes to previous recorded offset: {}", filteredGtidSet); } LOGGER.info("GTID set available on server: {}", availableServerGtidSet); - GtidSet mergedGtidSet = availableServerGtidSet.with(filteredGtidSet); + + GtidSet mergedGtidSet; + + if (this.config.getBoolean(MySqlConnectorConfig.GTID_SOURCE_START_FROM_LATEST)) { + mergedGtidSet = availableServerGtidSet.with(filteredGtidSet); + } else { + LOGGER.info("Using first available positions for new GTID channels"); + mergedGtidSet = availableServerGtidSet + .getGTIDSetBeginning() + .with(purgedServerGtid) + .with(filteredGtidSet); + } + LOGGER.info("Final merged GTID set to use when connecting to MySQL: {}", mergedGtidSet); return mergedGtidSet; } diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextTest.java index 75cb4c23a..369c85a61 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextTest.java @@ -214,6 +214,8 @@ public void shouldFilterAndMergeGtidSet() throws Exception { String availableServerGtidStr = "036d85a9-64e5-11e6-9b48-42010af0000c:1-20," + "7145bf69-d1ca-11e5-a588-0242ac110004:1-3200," + "123e4567-e89b-12d3-a456-426655440000:1-41"; + String purgedServerGtidStr = ""; + config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES, "036d85a9-64e5-11e6-9b48-42010af0000c") .build(); @@ -221,7 +223,7 @@ public void shouldFilterAndMergeGtidSet() throws Exception { context.start(); context.source().setCompletedGtidSet(gtidStr); - GtidSet mergedGtidSet = context.filterGtidSet(new GtidSet(availableServerGtidStr)); + GtidSet mergedGtidSet = context.filterGtidSet(new GtidSet(availableServerGtidStr), new GtidSet(purgedServerGtidStr)); assertThat(mergedGtidSet).isNotNull(); GtidSet.UUIDSet uuidSet1 = mergedGtidSet.forServerWithId("036d85a9-64e5-11e6-9b48-42010af0000c"); GtidSet.UUIDSet uuidSet2 = mergedGtidSet.forServerWithId("7145bf69-d1ca-11e5-a588-0242ac110004"); @@ -234,6 +236,37 @@ public void shouldFilterAndMergeGtidSet() throws Exception { assertThat(uuidSet4).isNull(); } + @Test + public void shouldMergeToFirstAvailableGtidSetPositions() throws Exception { + String gtidStr = "036d85a9-64e5-11e6-9b48-42010af0000c:1-2," + + "7c1de3f2-3fd2-11e6-9cdc-42010af000bc:5-41"; + String availableServerGtidStr = "036d85a9-64e5-11e6-9b48-42010af0000c:1-20," + + "7145bf69-d1ca-11e5-a588-0242ac110004:1-3200," + + "123e4567-e89b-12d3-a456-426655440000:1-41"; + String purgedServerGtidStr = "7145bf69-d1ca-11e5-a588-0242ac110004:1-1234"; + + config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES, + "036d85a9-64e5-11e6-9b48-42010af0000c") + .with(MySqlConnectorConfig.GTID_SOURCE_START_FROM_LATEST, false) + .build(); + + context = new MySqlTaskContext(config, false); + context.start(); + context.source().setCompletedGtidSet(gtidStr); + + GtidSet mergedGtidSet = context.filterGtidSet(new GtidSet(availableServerGtidStr), new GtidSet(purgedServerGtidStr)); + assertThat(mergedGtidSet).isNotNull(); + GtidSet.UUIDSet uuidSet1 = mergedGtidSet.forServerWithId("036d85a9-64e5-11e6-9b48-42010af0000c"); + GtidSet.UUIDSet uuidSet2 = mergedGtidSet.forServerWithId("7145bf69-d1ca-11e5-a588-0242ac110004"); + GtidSet.UUIDSet uuidSet3 = mergedGtidSet.forServerWithId("123e4567-e89b-12d3-a456-426655440000"); + GtidSet.UUIDSet uuidSet4 = mergedGtidSet.forServerWithId("7c1de3f2-3fd2-11e6-9cdc-42010af000bc"); + + assertThat(uuidSet1.getIntervals()).isEqualTo(Arrays.asList(new GtidSet.Interval(1, 2))); + assertThat(uuidSet2.getIntervals()).isEqualTo(Arrays.asList(new GtidSet.Interval(1, 1234))); + assertThat(uuidSet3.getIntervals()).isEqualTo(Arrays.asList(new GtidSet.Interval(1, 1))); + assertThat(uuidSet4).isNull(); + } + @Test public void shouldComparePositionsWithDifferentFields() { String lastGtidStr = "01261278-6ade-11e6-b36a-42010af00790:1-400944168,"