From 403fee1375df7874728cdab1b7e11816b8feb979 Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Tue, 7 Feb 2017 11:10:49 -0600 Subject: [PATCH] =?UTF-8?q?DBZ-185=20MySQL=E2=80=99s=20database=20history?= =?UTF-8?q?=20now=20filters=20GTID=20sources?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Corrects how the MySQL connector reloads database history to take into account the included and excluded GTID sources. This only affects a connector configured to capture changes from _multiple_ MySQL database servers when GTID sources are explicitly excluded or included. --- .../debezium/connector/mysql/MySqlSchema.java | 26 +++++-- .../connector/mysql/MySqlTaskContext.java | 13 ++-- .../debezium/connector/mysql/SourceInfo.java | 10 ++- .../connector/mysql/Configurator.java | 2 +- .../connector/mysql/MySqlTaskContextTest.java | 67 +++++++++++++++- .../connector/mysql/SourceInfoTest.java | 78 +++++++++++++++---- 6 files changed, 163 insertions(+), 33 deletions(-) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java index 0c81712f9..990bf1e5a 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java @@ -10,6 +10,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.function.Predicate; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.errors.ConnectException; @@ -21,6 +22,7 @@ import io.debezium.connector.mysql.MySqlConnectorConfig.DecimalHandlingMode; import io.debezium.connector.mysql.MySqlConnectorConfig.TemporalPrecisionMode; import io.debezium.connector.mysql.MySqlSystemVariables.Scope; +import io.debezium.document.Document; import io.debezium.jdbc.JdbcConnection; import io.debezium.jdbc.JdbcValueConverters.DecimalMode; import io.debezium.relational.Table; @@ -57,8 +59,6 @@ @NotThreadSafe public class MySqlSchema { - private static final HistoryRecordComparator HISTORY_COMPARATOR = HistoryRecordComparator.usingPositions(SourceInfo::isPositionAtOrBefore); - private final Logger logger = LoggerFactory.getLogger(getClass()); private final AvroValidator schemaNameValidator = AvroValidator.create(logger); private final Set ignoredQueryStatements = Collect.unmodifiableSet("BEGIN", "END", "FLUSH PRIVILEGES"); @@ -70,6 +70,7 @@ public class MySqlSchema { private final DdlChanges ddlChanges; private final String serverName; private final String schemaPrefix; + private final HistoryRecordComparator historyComparator; private Tables tables; /** @@ -77,8 +78,11 @@ public class MySqlSchema { * * @param config the connector configuration, which is presumed to be valid * @param serverName the name of the server + * @param gtidFilter the predicate function that should be applied to GTID sets in database history, and which + * returns {@code true} if a GTID source is to be included, or {@code false} if a GTID source is to be excluded; + * may be null if not needed */ - public MySqlSchema(Configuration config, String serverName) { + public MySqlSchema(Configuration config, String serverName, Predicate gtidFilter) { this.filters = new Filters(config); this.ddlParser = new MySqlDdlParser(false); this.tables = new Tables(); @@ -116,9 +120,21 @@ public MySqlSchema(Configuration config, String serverName) { .edit() .withDefault(DatabaseHistory.NAME, connectorName + "-dbhistory") .build(); - this.dbHistory.configure(dbHistoryConfig, HISTORY_COMPARATOR); // validates + + // Set up a history record comparator that uses the GTID filter ... + this.historyComparator = new HistoryRecordComparator() { + @Override + protected boolean isPositionAtOrBefore(Document recorded, Document desired) { + return SourceInfo.isPositionAtOrBefore(recorded, desired, gtidFilter); + } + }; + this.dbHistory.configure(dbHistoryConfig, historyComparator); // validates } - + + protected HistoryRecordComparator historyComparator() { + return this.historyComparator; + } + /** * Start by acquiring resources needed to persist the database history */ diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java index 3a8419da5..4f4c4b7dc 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java @@ -44,16 +44,17 @@ public MySqlTaskContext(Configuration config) { this.source = new SourceInfo(); this.source.setServerName(serverName()); - // Set up the MySQL schema ... - this.dbSchema = new MySqlSchema(config, serverName()); - - // Set up the record processor ... - this.recordProcessor = new RecordMakers(dbSchema, source, topicSelector); - + // Set up the GTID filter ... String gtidSetIncludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_INCLUDES); String gtidSetExcludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES); this.gtidSourceFilter = gtidSetIncludes != null ? Predicates.includes(gtidSetIncludes) : (gtidSetExcludes != null ? Predicates.excludes(gtidSetExcludes) : null); + + // Set up the MySQL schema ... + this.dbSchema = new MySqlSchema(config, serverName(), this.gtidSourceFilter); + + // Set up the record processor ... + this.recordProcessor = new RecordMakers(dbSchema, source, topicSelector); } public String connectorName() { diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java index 6bcdc0da0..90cb00e07 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java @@ -7,6 +7,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.function.Predicate; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -594,9 +595,11 @@ public String toString() { * @param recorded the position obtained from recorded history; never null * @param desired the desired position that we want to obtain, which should be after some recorded positions, * at some recorded positions, and before other recorded positions; never null + * @param gtidFilter the predicate function that will return {@code true} if a GTID source is to be included, or + * {@code false} if a GTID source is to be excluded; may be null if no filtering is to be done * @return {@code true} if the recorded position is at or before the desired position; or {@code false} otherwise */ - public static boolean isPositionAtOrBefore(Document recorded, Document desired) { + public static boolean isPositionAtOrBefore(Document recorded, Document desired, Predicate gtidFilter) { String recordedGtidSetStr = recorded.getString(GTID_SET_KEY); String desiredGtidSetStr = desired.getString(GTID_SET_KEY); if (desiredGtidSetStr != null) { @@ -605,6 +608,11 @@ public static boolean isPositionAtOrBefore(Document recorded, Document desired) // Both have GTIDs, so base the comparison entirely on the GTID sets. GtidSet recordedGtidSet = new GtidSet(recordedGtidSetStr); GtidSet desiredGtidSet = new GtidSet(desiredGtidSetStr); + if (gtidFilter != null) { + // Apply the GTID source filter before we do any comparisons ... + recordedGtidSet = recordedGtidSet.retainAll(gtidFilter); + desiredGtidSet = desiredGtidSet.retainAll(gtidFilter); + } if (recordedGtidSet.equals(desiredGtidSet)) { // They are exactly the same, which means the recorded position exactly matches the desired ... if (!recorded.has(SNAPSHOT_KEY) && desired.has(SNAPSHOT_KEY)) { diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/Configurator.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/Configurator.java index 69d3b0bba..3bdb98289 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/Configurator.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/Configurator.java @@ -82,7 +82,7 @@ public Filters createFilters() { public MySqlSchema createSchemas() { Configuration config = configBuilder.build(); - return new MySqlSchema(config,config.getString(MySqlConnectorConfig.SERVER_NAME)); + return new MySqlSchema(config,config.getString(MySqlConnectorConfig.SERVER_NAME), null); } } \ No newline at end of file diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextTest.java index 5bd868d56..af2835413 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextTest.java @@ -18,7 +18,10 @@ import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode; import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode; +import io.debezium.document.Document; import io.debezium.relational.history.FileDatabaseHistory; +import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.HistoryRecordComparator; import io.debezium.util.Testing; /** @@ -199,17 +202,17 @@ public void shouldNotAllowBothGtidSetIncludesAndExcludes() throws Exception { "7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-41") .build(); context = new MySqlTaskContext(config); - boolean valid = config.validateAndRecord(MySqlConnectorConfig.ALL_FIELDS,msg->{}); + boolean valid = config.validateAndRecord(MySqlConnectorConfig.ALL_FIELDS, msg -> {}); assertThat(valid).isFalse(); } @Test public void shouldFilterAndMergeGtidSet() throws Exception { String gtidStr = "036d85a9-64e5-11e6-9b48-42010af0000c:1-2," - + "7c1de3f2-3fd2-11e6-9cdc-42010af000bc:5-41"; + + "7c1de3f2-3fd2-11e6-9cdc-42010af000bc:5-41"; String availableServerGtidStr = "036d85a9-64e5-11e6-9b48-42010af0000c:1-20," - + "7145bf69-d1ca-11e5-a588-0242ac110004:1-3200," - + "123e4567-e89b-12d3-a456-426655440000:1-41"; + + "7145bf69-d1ca-11e5-a588-0242ac110004:1-3200," + + "123e4567-e89b-12d3-a456-426655440000:1-41"; config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES, "036d85a9-64e5-11e6-9b48-42010af0000c") .build(); @@ -229,4 +232,60 @@ public void shouldFilterAndMergeGtidSet() throws Exception { assertThat(uuidSet3.getIntervals()).isEqualTo(Arrays.asList(new GtidSet.Interval(1, 41))); assertThat(uuidSet4).isNull(); } + + @Test + public void shouldComparePositionsWithDifferentFields() { + String lastGtidStr = "01261278-6ade-11e6-b36a-42010af00790:1-400944168," + + "30efb117-e42a-11e6-ba9e-42010a28002e:1-9," + + "4d1a4918-44ba-11e6-bf12-42010af0040b:1-11604379," + + "621dc2f6-803b-11e6-acc1-42010af000a4:1-7963838," + + "716ec46f-d522-11e5-bb56-0242ac110004:1-35850702," + + "c627b2bc-9647-11e6-a886-42010af0044a:1-10426868," + + "d079cbb3-750f-11e6-954e-42010af00c28:1-11544291:11544293-11885648"; + config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES, "96c2072e-e428-11e6-9590-42010a28002d") + .build(); + context = new MySqlTaskContext(config); + context.start(); + context.source().setCompletedGtidSet(lastGtidStr); + HistoryRecordComparator comparator = context.dbSchema().historyComparator(); + + String server = "mysql-server-1"; + HistoryRecord rec1 = historyRecord(server, "mysql-bin.000008", 380941551, "01261278-6ade-11e6-b36a-42010af00790:1-378422946," + + "4d1a4918-44ba-11e6-bf12-42010af0040b:1-11002284," + + "716ec46f-d522-11e5-bb56-0242ac110004:1-34673215," + + "96c2072e-e428-11e6-9590-42010a28002d:1-3," + + "c627b2bc-9647-11e6-a886-42010af0044a:1-9541144", 0, 0, true); + HistoryRecord rec2 = historyRecord(server, "mysql-bin.000016", 645115324, "01261278-6ade-11e6-b36a-42010af00790:1-400944168," + + "30efb117-e42a-11e6-ba9e-42010a28002e:1-9," + + "4d1a4918-44ba-11e6-bf12-42010af0040b:1-11604379," + + "621dc2f6-803b-11e6-acc1-42010af000a4:1-7963838," + + "716ec46f-d522-11e5-bb56-0242ac110004:1-35850702," + + "c627b2bc-9647-11e6-a886-42010af0044a:1-10426868," + + "d079cbb3-750f-11e6-954e-42010af00c28:1-11544291:11544293-11885648", 2, 1, false); + + assertThat(comparator.isAtOrBefore(rec1, rec2)).isTrue(); + assertThat(comparator.isAtOrBefore(rec2, rec1)).isFalse(); + } + + protected HistoryRecord historyRecord(String serverName, String binlogFilename, int position, String gtids, + int event, int row, boolean snapshot) { + Document source = Document.create(SourceInfo.SERVER_NAME_KEY, serverName); + Document pos = Document.create(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, binlogFilename, + SourceInfo.BINLOG_POSITION_OFFSET_KEY, position); + if (row >= 0) { + pos = pos.set(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, row); + } + if (event >= 0) { + pos = pos.set(SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY, event); + } + if (gtids != null && gtids.trim().length() != 0) { + pos = pos.set(SourceInfo.GTID_SET_KEY, gtids); + } + if (snapshot) { + pos = pos.set(SourceInfo.SNAPSHOT_KEY, true); + } + return new HistoryRecord(Document.create(HistoryRecord.Fields.SOURCE, source, + HistoryRecord.Fields.POSITION, pos)); + } + } diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SourceInfoTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SourceInfoTest.java index 509c91146..b1842ef68 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SourceInfoTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SourceInfoTest.java @@ -7,8 +7,11 @@ import static org.junit.Assert.assertTrue; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; import org.apache.avro.Schema; import org.apache.kafka.connect.data.Struct; @@ -381,12 +384,12 @@ protected void handleNextEvent(long positionOfEvent, long eventSize, int rowCoun } Long rowsToSkip = (Long) offset.get(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY); if (rowsToSkip == null) rowsToSkip = 0L; - if( (row+1) == rowCount) { + if ((row + 1) == rowCount) { // This is the last row, so the next binlog position should be the number of rows in the event ... assertThat(rowsToSkip).isEqualTo(rowCount); } else { // This is not the last row, so the next binlog position should be the row number ... - assertThat(rowsToSkip).isEqualTo(row+1); + assertThat(rowsToSkip).isEqualTo(row + 1); } // Get the source struct for this row (always second), which should always reflect this row in this event ... Struct recordSource = source.struct(); @@ -515,6 +518,25 @@ public void shouldComparePositionsWithoutGtids() { assertPositionWithoutGtids("fn.01", 1, 1, 1).isAfter(positionWithoutGtids("fn.01", 1, 1, 0)); } + @Test + public void shouldComparePositionsWithDifferentFields() { + Document history = positionWith("mysql-bin.000008", 380941551, "01261278-6ade-11e6-b36a-42010af00790:1-378422946," + + "4d1a4918-44ba-11e6-bf12-42010af0040b:1-11002284," + + "716ec46f-d522-11e5-bb56-0242ac110004:1-34673215," + + "96c2072e-e428-11e6-9590-42010a28002d:1-3," + + "c627b2bc-9647-11e6-a886-42010af0044a:1-9541144", 0, 0, true); + Document current = positionWith("mysql-bin.000016", 645115324, "01261278-6ade-11e6-b36a-42010af00790:1-400944168," + + "30efb117-e42a-11e6-ba9e-42010a28002e:1-9," + + "4d1a4918-44ba-11e6-bf12-42010af0040b:1-11604379," + + "621dc2f6-803b-11e6-acc1-42010af000a4:1-7963838," + + "716ec46f-d522-11e5-bb56-0242ac110004:1-35850702," + + "c627b2bc-9647-11e6-a886-42010af0044a:1-10426868," + + "d079cbb3-750f-11e6-954e-42010af00c28:1-11544291:11544293-11885648", 2, 1, false); + assertThatDocument(current).isAfter(history); + Set excludes = Collections.singleton("96c2072e-e428-11e6-9590-42010a28002d"); + assertThatDocument(history).isAtOrBefore(current, (uuid) -> !excludes.contains(uuid)); + } + @FixFor("DBZ-107") @Test public void shouldRemoveNewlinesFromGtidSet() { @@ -558,17 +580,25 @@ protected Document positionWithoutGtids(String filename, int position, int event } protected Document positionWithoutGtids(String filename, int position, int event, int row, boolean snapshot) { - if (snapshot) { - return Document.create(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, filename, - SourceInfo.BINLOG_POSITION_OFFSET_KEY, position, - SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, row, - SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY, event, - SourceInfo.SNAPSHOT_KEY, true); + return positionWith(filename, position, null, event, row, snapshot); + } + + protected Document positionWith(String filename, int position, String gtids, int event, int row, boolean snapshot) { + Document pos = Document.create(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, filename, + SourceInfo.BINLOG_POSITION_OFFSET_KEY, position); + if (row >= 0) { + pos = pos.set(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, row); } - return Document.create(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, filename, - SourceInfo.BINLOG_POSITION_OFFSET_KEY, position, - SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, row, - SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY, event); + if (event >= 0) { + pos = pos.set(SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY, event); + } + if (gtids != null && gtids.trim().length() != 0) { + pos = pos.set(SourceInfo.GTID_SET_KEY, gtids); + } + if (snapshot) { + pos = pos.set(SourceInfo.SNAPSHOT_KEY, true); + } + return pos; } protected PositionAssert assertThatDocument(Document position) { @@ -597,23 +627,39 @@ public PositionAssert(Document position) { } public PositionAssert isAt(Document otherPosition) { - if (SourceInfo.isPositionAtOrBefore(actual, otherPosition)) return this; + return isAt(otherPosition, null); + } + + public PositionAssert isAt(Document otherPosition, Predicate gtidFilter) { + if (SourceInfo.isPositionAtOrBefore(actual, otherPosition, gtidFilter)) return this; failIfCustomMessageIsSet(); throw failure(actual + " should be consider same position as " + otherPosition); } public PositionAssert isBefore(Document otherPosition) { - return isAtOrBefore(otherPosition); + return isBefore(otherPosition, null); + } + + public PositionAssert isBefore(Document otherPosition, Predicate gtidFilter) { + return isAtOrBefore(otherPosition, gtidFilter); } public PositionAssert isAtOrBefore(Document otherPosition) { - if (SourceInfo.isPositionAtOrBefore(actual, otherPosition)) return this; + return isAtOrBefore(otherPosition, null); + } + + public PositionAssert isAtOrBefore(Document otherPosition, Predicate gtidFilter) { + if (SourceInfo.isPositionAtOrBefore(actual, otherPosition, gtidFilter)) return this; failIfCustomMessageIsSet(); throw failure(actual + " should be consider same position as or before " + otherPosition); } public PositionAssert isAfter(Document otherPosition) { - if (!SourceInfo.isPositionAtOrBefore(actual, otherPosition)) return this; + return isAfter(otherPosition, null); + } + + public PositionAssert isAfter(Document otherPosition, Predicate gtidFilter) { + if (!SourceInfo.isPositionAtOrBefore(actual, otherPosition, gtidFilter)) return this; failIfCustomMessageIsSet(); throw failure(actual + " should be consider after " + otherPosition); }