Merge pull request #178 from rhauch/dbz-185

DBZ-185 MySQL’s database history now filters GTID sources
This commit is contained in:
Randall Hauch 2017-02-07 13:25:32 -06:00 committed by GitHub
commit 6da2a8a1b2
6 changed files with 163 additions and 33 deletions

View File

@ -10,6 +10,7 @@
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.function.Predicate;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.ConnectException;
@ -21,6 +22,7 @@
import io.debezium.connector.mysql.MySqlConnectorConfig.DecimalHandlingMode; import io.debezium.connector.mysql.MySqlConnectorConfig.DecimalHandlingMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.TemporalPrecisionMode; import io.debezium.connector.mysql.MySqlConnectorConfig.TemporalPrecisionMode;
import io.debezium.connector.mysql.MySqlSystemVariables.Scope; import io.debezium.connector.mysql.MySqlSystemVariables.Scope;
import io.debezium.document.Document;
import io.debezium.jdbc.JdbcConnection; import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode; import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.relational.Table; import io.debezium.relational.Table;
@ -57,8 +59,6 @@
@NotThreadSafe @NotThreadSafe
public class MySqlSchema { public class MySqlSchema {
private static final HistoryRecordComparator HISTORY_COMPARATOR = HistoryRecordComparator.usingPositions(SourceInfo::isPositionAtOrBefore);
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
private final AvroValidator schemaNameValidator = AvroValidator.create(logger); private final AvroValidator schemaNameValidator = AvroValidator.create(logger);
private final Set<String> ignoredQueryStatements = Collect.unmodifiableSet("BEGIN", "END", "FLUSH PRIVILEGES"); private final Set<String> ignoredQueryStatements = Collect.unmodifiableSet("BEGIN", "END", "FLUSH PRIVILEGES");
@ -70,6 +70,7 @@ public class MySqlSchema {
private final DdlChanges ddlChanges; private final DdlChanges ddlChanges;
private final String serverName; private final String serverName;
private final String schemaPrefix; private final String schemaPrefix;
private final HistoryRecordComparator historyComparator;
private Tables tables; private Tables tables;
/** /**
@ -77,8 +78,11 @@ public class MySqlSchema {
* *
* @param config the connector configuration, which is presumed to be valid * @param config the connector configuration, which is presumed to be valid
* @param serverName the name of the server * @param serverName the name of the server
* @param gtidFilter the predicate function that should be applied to GTID sets in database history, and which
* returns {@code true} if a GTID source is to be included, or {@code false} if a GTID source is to be excluded;
* may be null if not needed
*/ */
public MySqlSchema(Configuration config, String serverName) { public MySqlSchema(Configuration config, String serverName, Predicate<String> gtidFilter) {
this.filters = new Filters(config); this.filters = new Filters(config);
this.ddlParser = new MySqlDdlParser(false); this.ddlParser = new MySqlDdlParser(false);
this.tables = new Tables(); this.tables = new Tables();
@ -116,9 +120,21 @@ public MySqlSchema(Configuration config, String serverName) {
.edit() .edit()
.withDefault(DatabaseHistory.NAME, connectorName + "-dbhistory") .withDefault(DatabaseHistory.NAME, connectorName + "-dbhistory")
.build(); .build();
this.dbHistory.configure(dbHistoryConfig, HISTORY_COMPARATOR); // validates
// Set up a history record comparator that uses the GTID filter ...
this.historyComparator = new HistoryRecordComparator() {
@Override
protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
return SourceInfo.isPositionAtOrBefore(recorded, desired, gtidFilter);
}
};
this.dbHistory.configure(dbHistoryConfig, historyComparator); // validates
} }
protected HistoryRecordComparator historyComparator() {
return this.historyComparator;
}
/** /**
* Start by acquiring resources needed to persist the database history * Start by acquiring resources needed to persist the database history
*/ */

View File

@ -44,16 +44,17 @@ public MySqlTaskContext(Configuration config) {
this.source = new SourceInfo(); this.source = new SourceInfo();
this.source.setServerName(serverName()); this.source.setServerName(serverName());
// Set up the MySQL schema ... // Set up the GTID filter ...
this.dbSchema = new MySqlSchema(config, serverName());
// Set up the record processor ...
this.recordProcessor = new RecordMakers(dbSchema, source, topicSelector);
String gtidSetIncludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_INCLUDES); String gtidSetIncludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_INCLUDES);
String gtidSetExcludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES); String gtidSetExcludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES);
this.gtidSourceFilter = gtidSetIncludes != null ? Predicates.includes(gtidSetIncludes) this.gtidSourceFilter = gtidSetIncludes != null ? Predicates.includes(gtidSetIncludes)
: (gtidSetExcludes != null ? Predicates.excludes(gtidSetExcludes) : null); : (gtidSetExcludes != null ? Predicates.excludes(gtidSetExcludes) : null);
// Set up the MySQL schema ...
this.dbSchema = new MySqlSchema(config, serverName(), this.gtidSourceFilter);
// Set up the record processor ...
this.recordProcessor = new RecordMakers(dbSchema, source, topicSelector);
} }
public String connectorName() { public String connectorName() {

View File

@ -7,6 +7,7 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.function.Predicate;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
@ -594,9 +595,11 @@ public String toString() {
* @param recorded the position obtained from recorded history; never null * @param recorded the position obtained from recorded history; never null
* @param desired the desired position that we want to obtain, which should be after some recorded positions, * @param desired the desired position that we want to obtain, which should be after some recorded positions,
* at some recorded positions, and before other recorded positions; never null * at some recorded positions, and before other recorded positions; never null
* @param gtidFilter the predicate function that will return {@code true} if a GTID source is to be included, or
* {@code false} if a GTID source is to be excluded; may be null if no filtering is to be done
* @return {@code true} if the recorded position is at or before the desired position; or {@code false} otherwise * @return {@code true} if the recorded position is at or before the desired position; or {@code false} otherwise
*/ */
public static boolean isPositionAtOrBefore(Document recorded, Document desired) { public static boolean isPositionAtOrBefore(Document recorded, Document desired, Predicate<String> gtidFilter) {
String recordedGtidSetStr = recorded.getString(GTID_SET_KEY); String recordedGtidSetStr = recorded.getString(GTID_SET_KEY);
String desiredGtidSetStr = desired.getString(GTID_SET_KEY); String desiredGtidSetStr = desired.getString(GTID_SET_KEY);
if (desiredGtidSetStr != null) { if (desiredGtidSetStr != null) {
@ -605,6 +608,11 @@ public static boolean isPositionAtOrBefore(Document recorded, Document desired)
// Both have GTIDs, so base the comparison entirely on the GTID sets. // Both have GTIDs, so base the comparison entirely on the GTID sets.
GtidSet recordedGtidSet = new GtidSet(recordedGtidSetStr); GtidSet recordedGtidSet = new GtidSet(recordedGtidSetStr);
GtidSet desiredGtidSet = new GtidSet(desiredGtidSetStr); GtidSet desiredGtidSet = new GtidSet(desiredGtidSetStr);
if (gtidFilter != null) {
// Apply the GTID source filter before we do any comparisons ...
recordedGtidSet = recordedGtidSet.retainAll(gtidFilter);
desiredGtidSet = desiredGtidSet.retainAll(gtidFilter);
}
if (recordedGtidSet.equals(desiredGtidSet)) { if (recordedGtidSet.equals(desiredGtidSet)) {
// They are exactly the same, which means the recorded position exactly matches the desired ... // They are exactly the same, which means the recorded position exactly matches the desired ...
if (!recorded.has(SNAPSHOT_KEY) && desired.has(SNAPSHOT_KEY)) { if (!recorded.has(SNAPSHOT_KEY) && desired.has(SNAPSHOT_KEY)) {

View File

@ -82,7 +82,7 @@ public Filters createFilters() {
public MySqlSchema createSchemas() { public MySqlSchema createSchemas() {
Configuration config = configBuilder.build(); Configuration config = configBuilder.build();
return new MySqlSchema(config,config.getString(MySqlConnectorConfig.SERVER_NAME)); return new MySqlSchema(config,config.getString(MySqlConnectorConfig.SERVER_NAME), null);
} }
} }

View File

@ -18,7 +18,10 @@
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode; import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode; import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.document.Document;
import io.debezium.relational.history.FileDatabaseHistory; import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.util.Testing; import io.debezium.util.Testing;
/** /**
@ -199,17 +202,17 @@ public void shouldNotAllowBothGtidSetIncludesAndExcludes() throws Exception {
"7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-41") "7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-41")
.build(); .build();
context = new MySqlTaskContext(config); context = new MySqlTaskContext(config);
boolean valid = config.validateAndRecord(MySqlConnectorConfig.ALL_FIELDS,msg->{}); boolean valid = config.validateAndRecord(MySqlConnectorConfig.ALL_FIELDS, msg -> {});
assertThat(valid).isFalse(); assertThat(valid).isFalse();
} }
@Test @Test
public void shouldFilterAndMergeGtidSet() throws Exception { public void shouldFilterAndMergeGtidSet() throws Exception {
String gtidStr = "036d85a9-64e5-11e6-9b48-42010af0000c:1-2," String gtidStr = "036d85a9-64e5-11e6-9b48-42010af0000c:1-2,"
+ "7c1de3f2-3fd2-11e6-9cdc-42010af000bc:5-41"; + "7c1de3f2-3fd2-11e6-9cdc-42010af000bc:5-41";
String availableServerGtidStr = "036d85a9-64e5-11e6-9b48-42010af0000c:1-20," String availableServerGtidStr = "036d85a9-64e5-11e6-9b48-42010af0000c:1-20,"
+ "7145bf69-d1ca-11e5-a588-0242ac110004:1-3200," + "7145bf69-d1ca-11e5-a588-0242ac110004:1-3200,"
+ "123e4567-e89b-12d3-a456-426655440000:1-41"; + "123e4567-e89b-12d3-a456-426655440000:1-41";
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES, config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES,
"036d85a9-64e5-11e6-9b48-42010af0000c") "036d85a9-64e5-11e6-9b48-42010af0000c")
.build(); .build();
@ -229,4 +232,60 @@ public void shouldFilterAndMergeGtidSet() throws Exception {
assertThat(uuidSet3.getIntervals()).isEqualTo(Arrays.asList(new GtidSet.Interval(1, 41))); assertThat(uuidSet3.getIntervals()).isEqualTo(Arrays.asList(new GtidSet.Interval(1, 41)));
assertThat(uuidSet4).isNull(); assertThat(uuidSet4).isNull();
} }
@Test
public void shouldComparePositionsWithDifferentFields() {
String lastGtidStr = "01261278-6ade-11e6-b36a-42010af00790:1-400944168,"
+ "30efb117-e42a-11e6-ba9e-42010a28002e:1-9,"
+ "4d1a4918-44ba-11e6-bf12-42010af0040b:1-11604379,"
+ "621dc2f6-803b-11e6-acc1-42010af000a4:1-7963838,"
+ "716ec46f-d522-11e5-bb56-0242ac110004:1-35850702,"
+ "c627b2bc-9647-11e6-a886-42010af0044a:1-10426868,"
+ "d079cbb3-750f-11e6-954e-42010af00c28:1-11544291:11544293-11885648";
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES, "96c2072e-e428-11e6-9590-42010a28002d")
.build();
context = new MySqlTaskContext(config);
context.start();
context.source().setCompletedGtidSet(lastGtidStr);
HistoryRecordComparator comparator = context.dbSchema().historyComparator();
String server = "mysql-server-1";
HistoryRecord rec1 = historyRecord(server, "mysql-bin.000008", 380941551, "01261278-6ade-11e6-b36a-42010af00790:1-378422946,"
+ "4d1a4918-44ba-11e6-bf12-42010af0040b:1-11002284,"
+ "716ec46f-d522-11e5-bb56-0242ac110004:1-34673215,"
+ "96c2072e-e428-11e6-9590-42010a28002d:1-3,"
+ "c627b2bc-9647-11e6-a886-42010af0044a:1-9541144", 0, 0, true);
HistoryRecord rec2 = historyRecord(server, "mysql-bin.000016", 645115324, "01261278-6ade-11e6-b36a-42010af00790:1-400944168,"
+ "30efb117-e42a-11e6-ba9e-42010a28002e:1-9,"
+ "4d1a4918-44ba-11e6-bf12-42010af0040b:1-11604379,"
+ "621dc2f6-803b-11e6-acc1-42010af000a4:1-7963838,"
+ "716ec46f-d522-11e5-bb56-0242ac110004:1-35850702,"
+ "c627b2bc-9647-11e6-a886-42010af0044a:1-10426868,"
+ "d079cbb3-750f-11e6-954e-42010af00c28:1-11544291:11544293-11885648", 2, 1, false);
assertThat(comparator.isAtOrBefore(rec1, rec2)).isTrue();
assertThat(comparator.isAtOrBefore(rec2, rec1)).isFalse();
}
protected HistoryRecord historyRecord(String serverName, String binlogFilename, int position, String gtids,
int event, int row, boolean snapshot) {
Document source = Document.create(SourceInfo.SERVER_NAME_KEY, serverName);
Document pos = Document.create(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, binlogFilename,
SourceInfo.BINLOG_POSITION_OFFSET_KEY, position);
if (row >= 0) {
pos = pos.set(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, row);
}
if (event >= 0) {
pos = pos.set(SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY, event);
}
if (gtids != null && gtids.trim().length() != 0) {
pos = pos.set(SourceInfo.GTID_SET_KEY, gtids);
}
if (snapshot) {
pos = pos.set(SourceInfo.SNAPSHOT_KEY, true);
}
return new HistoryRecord(Document.create(HistoryRecord.Fields.SOURCE, source,
HistoryRecord.Fields.POSITION, pos));
}
} }

View File

@ -7,8 +7,11 @@
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
@ -381,12 +384,12 @@ protected void handleNextEvent(long positionOfEvent, long eventSize, int rowCoun
} }
Long rowsToSkip = (Long) offset.get(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY); Long rowsToSkip = (Long) offset.get(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY);
if (rowsToSkip == null) rowsToSkip = 0L; if (rowsToSkip == null) rowsToSkip = 0L;
if( (row+1) == rowCount) { if ((row + 1) == rowCount) {
// This is the last row, so the next binlog position should be the number of rows in the event ... // This is the last row, so the next binlog position should be the number of rows in the event ...
assertThat(rowsToSkip).isEqualTo(rowCount); assertThat(rowsToSkip).isEqualTo(rowCount);
} else { } else {
// This is not the last row, so the next binlog position should be the row number ... // This is not the last row, so the next binlog position should be the row number ...
assertThat(rowsToSkip).isEqualTo(row+1); assertThat(rowsToSkip).isEqualTo(row + 1);
} }
// Get the source struct for this row (always second), which should always reflect this row in this event ... // Get the source struct for this row (always second), which should always reflect this row in this event ...
Struct recordSource = source.struct(); Struct recordSource = source.struct();
@ -515,6 +518,25 @@ public void shouldComparePositionsWithoutGtids() {
assertPositionWithoutGtids("fn.01", 1, 1, 1).isAfter(positionWithoutGtids("fn.01", 1, 1, 0)); assertPositionWithoutGtids("fn.01", 1, 1, 1).isAfter(positionWithoutGtids("fn.01", 1, 1, 0));
} }
@Test
public void shouldComparePositionsWithDifferentFields() {
Document history = positionWith("mysql-bin.000008", 380941551, "01261278-6ade-11e6-b36a-42010af00790:1-378422946,"
+ "4d1a4918-44ba-11e6-bf12-42010af0040b:1-11002284,"
+ "716ec46f-d522-11e5-bb56-0242ac110004:1-34673215,"
+ "96c2072e-e428-11e6-9590-42010a28002d:1-3,"
+ "c627b2bc-9647-11e6-a886-42010af0044a:1-9541144", 0, 0, true);
Document current = positionWith("mysql-bin.000016", 645115324, "01261278-6ade-11e6-b36a-42010af00790:1-400944168,"
+ "30efb117-e42a-11e6-ba9e-42010a28002e:1-9,"
+ "4d1a4918-44ba-11e6-bf12-42010af0040b:1-11604379,"
+ "621dc2f6-803b-11e6-acc1-42010af000a4:1-7963838,"
+ "716ec46f-d522-11e5-bb56-0242ac110004:1-35850702,"
+ "c627b2bc-9647-11e6-a886-42010af0044a:1-10426868,"
+ "d079cbb3-750f-11e6-954e-42010af00c28:1-11544291:11544293-11885648", 2, 1, false);
assertThatDocument(current).isAfter(history);
Set<String> excludes = Collections.singleton("96c2072e-e428-11e6-9590-42010a28002d");
assertThatDocument(history).isAtOrBefore(current, (uuid) -> !excludes.contains(uuid));
}
@FixFor("DBZ-107") @FixFor("DBZ-107")
@Test @Test
public void shouldRemoveNewlinesFromGtidSet() { public void shouldRemoveNewlinesFromGtidSet() {
@ -558,17 +580,25 @@ protected Document positionWithoutGtids(String filename, int position, int event
} }
protected Document positionWithoutGtids(String filename, int position, int event, int row, boolean snapshot) { protected Document positionWithoutGtids(String filename, int position, int event, int row, boolean snapshot) {
if (snapshot) { return positionWith(filename, position, null, event, row, snapshot);
return Document.create(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, filename, }
SourceInfo.BINLOG_POSITION_OFFSET_KEY, position,
SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, row, protected Document positionWith(String filename, int position, String gtids, int event, int row, boolean snapshot) {
SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY, event, Document pos = Document.create(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, filename,
SourceInfo.SNAPSHOT_KEY, true); SourceInfo.BINLOG_POSITION_OFFSET_KEY, position);
if (row >= 0) {
pos = pos.set(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, row);
} }
return Document.create(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, filename, if (event >= 0) {
SourceInfo.BINLOG_POSITION_OFFSET_KEY, position, pos = pos.set(SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY, event);
SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, row, }
SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY, event); if (gtids != null && gtids.trim().length() != 0) {
pos = pos.set(SourceInfo.GTID_SET_KEY, gtids);
}
if (snapshot) {
pos = pos.set(SourceInfo.SNAPSHOT_KEY, true);
}
return pos;
} }
protected PositionAssert assertThatDocument(Document position) { protected PositionAssert assertThatDocument(Document position) {
@ -597,23 +627,39 @@ public PositionAssert(Document position) {
} }
public PositionAssert isAt(Document otherPosition) { public PositionAssert isAt(Document otherPosition) {
if (SourceInfo.isPositionAtOrBefore(actual, otherPosition)) return this; return isAt(otherPosition, null);
}
public PositionAssert isAt(Document otherPosition, Predicate<String> gtidFilter) {
if (SourceInfo.isPositionAtOrBefore(actual, otherPosition, gtidFilter)) return this;
failIfCustomMessageIsSet(); failIfCustomMessageIsSet();
throw failure(actual + " should be consider same position as " + otherPosition); throw failure(actual + " should be consider same position as " + otherPosition);
} }
public PositionAssert isBefore(Document otherPosition) { public PositionAssert isBefore(Document otherPosition) {
return isAtOrBefore(otherPosition); return isBefore(otherPosition, null);
}
public PositionAssert isBefore(Document otherPosition, Predicate<String> gtidFilter) {
return isAtOrBefore(otherPosition, gtidFilter);
} }
public PositionAssert isAtOrBefore(Document otherPosition) { public PositionAssert isAtOrBefore(Document otherPosition) {
if (SourceInfo.isPositionAtOrBefore(actual, otherPosition)) return this; return isAtOrBefore(otherPosition, null);
}
public PositionAssert isAtOrBefore(Document otherPosition, Predicate<String> gtidFilter) {
if (SourceInfo.isPositionAtOrBefore(actual, otherPosition, gtidFilter)) return this;
failIfCustomMessageIsSet(); failIfCustomMessageIsSet();
throw failure(actual + " should be consider same position as or before " + otherPosition); throw failure(actual + " should be consider same position as or before " + otherPosition);
} }
public PositionAssert isAfter(Document otherPosition) { public PositionAssert isAfter(Document otherPosition) {
if (!SourceInfo.isPositionAtOrBefore(actual, otherPosition)) return this; return isAfter(otherPosition, null);
}
public PositionAssert isAfter(Document otherPosition, Predicate<String> gtidFilter) {
if (!SourceInfo.isPositionAtOrBefore(actual, otherPosition, gtidFilter)) return this;
failIfCustomMessageIsSet(); failIfCustomMessageIsSet();
throw failure(actual + " should be consider after " + otherPosition); throw failure(actual + " should be consider after " + otherPosition);
} }