Merge pull request #65 from rhauch/dbz-79
DBZ-79 Changed public methods in GtidSet to reflect the MySQL Binary Log Connector's class
This commit is contained in:
commit
83c44ba046
@ -5,10 +5,12 @@
|
|||||||
*/
|
*/
|
||||||
package io.debezium.connector.mysql;
|
package io.debezium.connector.mysql;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
@ -23,7 +25,6 @@
|
|||||||
@Immutable
|
@Immutable
|
||||||
public final class GtidSet {
|
public final class GtidSet {
|
||||||
|
|
||||||
private final String orderedString;
|
|
||||||
private final Map<String, UUIDSet> uuidSetsByServerId = new TreeMap<>(); // sorts on keys
|
private final Map<String, UUIDSet> uuidSetsByServerId = new TreeMap<>(); // sorts on keys
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -38,7 +39,6 @@ public GtidSet(String gtids) {
|
|||||||
if (sb.length() != 0) sb.append(',');
|
if (sb.length() != 0) sb.append(',');
|
||||||
sb.append(uuidSet.toString());
|
sb.append(uuidSet.toString());
|
||||||
});
|
});
|
||||||
orderedString = sb.toString();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -67,19 +67,19 @@ public UUIDSet forServerWithId(String uuid) {
|
|||||||
* @return {@code true} if all of the GTIDs in this set are completely contained within the supplied set of GTIDs, or
|
* @return {@code true} if all of the GTIDs in this set are completely contained within the supplied set of GTIDs, or
|
||||||
* {@code false} otherwise
|
* {@code false} otherwise
|
||||||
*/
|
*/
|
||||||
public boolean isSubsetOf(GtidSet other) {
|
public boolean isContainedWithin(GtidSet other) {
|
||||||
if (other == null) return false;
|
if (other == null) return false;
|
||||||
if (this.equals(other)) return true;
|
if (this.equals(other)) return true;
|
||||||
for (UUIDSet uuidSet : uuidSetsByServerId.values()) {
|
for (UUIDSet uuidSet : uuidSetsByServerId.values()) {
|
||||||
UUIDSet thatSet = other.forServerWithId(uuidSet.getUUID());
|
UUIDSet thatSet = other.forServerWithId(uuidSet.getUUID());
|
||||||
if (!uuidSet.isSubsetOf(thatSet)) return false;
|
if (!uuidSet.isContainedWithin(thatSet)) return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return orderedString.hashCode();
|
return uuidSetsByServerId.keySet().hashCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -87,14 +87,18 @@ public boolean equals(Object obj) {
|
|||||||
if (obj == this) return true;
|
if (obj == this) return true;
|
||||||
if (obj instanceof GtidSet) {
|
if (obj instanceof GtidSet) {
|
||||||
GtidSet that = (GtidSet) obj;
|
GtidSet that = (GtidSet) obj;
|
||||||
return this.orderedString.equalsIgnoreCase(that.orderedString);
|
return this.uuidSetsByServerId.equals(that.uuidSetsByServerId);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return orderedString;
|
List<String> gtids = new ArrayList<String>();
|
||||||
|
for (UUIDSet uuidSet : uuidSetsByServerId.values()) {
|
||||||
|
gtids.add(uuidSet.toString());
|
||||||
|
}
|
||||||
|
return String.join(",",gtids);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -139,37 +143,8 @@ public String getUUID() {
|
|||||||
*
|
*
|
||||||
* @return the immutable transaction intervals; never null
|
* @return the immutable transaction intervals; never null
|
||||||
*/
|
*/
|
||||||
public Collection<Interval> getIntervals() {
|
public List<Interval> getIntervals() {
|
||||||
return Collections.unmodifiableCollection(intervals);
|
return Collections.unmodifiableList(intervals);
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the first interval of transaction numbers for this server.
|
|
||||||
*
|
|
||||||
* @return the first interval, or {@code null} if there is none
|
|
||||||
*/
|
|
||||||
public Interval getFirstInterval() {
|
|
||||||
return intervals.isEmpty() ? null : intervals.getFirst();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the last interval of transaction numbers for this server.
|
|
||||||
*
|
|
||||||
* @return the last interval, or {@code null} if there is none
|
|
||||||
*/
|
|
||||||
public Interval getLastInterval() {
|
|
||||||
return intervals.isEmpty() ? null : intervals.getLast();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the interval that contains the full range (and possibly more) of all of the individual intervals for this server.
|
|
||||||
*
|
|
||||||
* @return the complete interval comprised of the {@link Interval#getStart() start} of the {@link #getFirstInterval()
|
|
||||||
* first interval} and the {@link Interval#getEnd() end} of the {@link #getLastInterval()}, or {@code null} if
|
|
||||||
* this server has no intervals at all
|
|
||||||
*/
|
|
||||||
public Interval getCompleteInterval() {
|
|
||||||
return intervals.isEmpty() ? null : new Interval(getFirstInterval().getStart(), getLastInterval().getEnd());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -180,7 +155,7 @@ public Interval getCompleteInterval() {
|
|||||||
* @return {@code true} if this server's transaction numbers are a subset of the transaction numbers of the supplied set,
|
* @return {@code true} if this server's transaction numbers are a subset of the transaction numbers of the supplied set,
|
||||||
* or false otherwise
|
* or false otherwise
|
||||||
*/
|
*/
|
||||||
public boolean isSubsetOf(UUIDSet other) {
|
public boolean isContainedWithin(UUIDSet other) {
|
||||||
if (other == null) return false;
|
if (other == null) return false;
|
||||||
if (!this.getUUID().equalsIgnoreCase(other.getUUID())) {
|
if (!this.getUUID().equalsIgnoreCase(other.getUUID())) {
|
||||||
// Not even the same server ...
|
// Not even the same server ...
|
||||||
@ -195,7 +170,7 @@ public boolean isSubsetOf(UUIDSet other) {
|
|||||||
for (Interval thisInterval : this.intervals) {
|
for (Interval thisInterval : this.intervals) {
|
||||||
boolean found = false;
|
boolean found = false;
|
||||||
for (Interval otherInterval : other.intervals) {
|
for (Interval otherInterval : other.intervals) {
|
||||||
if (thisInterval.isSubsetOf(otherInterval)) {
|
if (thisInterval.isContainedWithin(otherInterval)) {
|
||||||
found = true;
|
found = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -270,7 +245,7 @@ public long getEnd() {
|
|||||||
* {@link #getStart() start} and the {@link #getEnd() end} is less than or equal to the supplied interval's
|
* {@link #getStart() start} and the {@link #getEnd() end} is less than or equal to the supplied interval's
|
||||||
* {@link #getEnd() end}, or {@code false} otherwise
|
* {@link #getEnd() end}, or {@code false} otherwise
|
||||||
*/
|
*/
|
||||||
public boolean isSubsetOf(Interval other) {
|
public boolean isContainedWithin(Interval other) {
|
||||||
if (other == this) return true;
|
if (other == this) return true;
|
||||||
if (other == null) return false;
|
if (other == null) return false;
|
||||||
return this.getStart() >= other.getStart() && this.getEnd() <= other.getEnd();
|
return this.getStart() >= other.getStart() && this.getEnd() <= other.getEnd();
|
||||||
@ -293,8 +268,8 @@ public int hashCode() {
|
|||||||
@Override
|
@Override
|
||||||
public boolean equals(Object obj) {
|
public boolean equals(Object obj) {
|
||||||
if (this == obj) return true;
|
if (this == obj) return true;
|
||||||
if (obj instanceof com.github.shyiko.mysql.binlog.GtidSet.Interval) {
|
if (obj instanceof Interval) {
|
||||||
com.github.shyiko.mysql.binlog.GtidSet.Interval that = (com.github.shyiko.mysql.binlog.GtidSet.Interval) obj;
|
Interval that = (Interval) obj;
|
||||||
return this.getStart() == that.getStart() && this.getEnd() == that.getEnd();
|
return this.getStart() == that.getStart() && this.getEnd() == that.getEnd();
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
@ -302,7 +277,7 @@ public boolean equals(Object obj) {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return getStart() == getEnd() ? Long.toString(getStart()) : "" + getStart() + "-" + getEnd();
|
return "" + getStart() + "-" + getEnd();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -194,7 +194,7 @@ protected boolean isBinlogAvailable() {
|
|||||||
// GTIDs are enabled, and we used them previously ...
|
// GTIDs are enabled, and we used them previously ...
|
||||||
GtidSet gtidSet = new GtidSet(gtidStr);
|
GtidSet gtidSet = new GtidSet(gtidStr);
|
||||||
GtidSet availableGtidSet = new GtidSet(knownGtidSet());
|
GtidSet availableGtidSet = new GtidSet(knownGtidSet());
|
||||||
if ( gtidSet.isSubsetOf(availableGtidSet)) {
|
if ( gtidSet.isContainedWithin(availableGtidSet)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
logger.info("Connector last known GTIDs are {}, but MySQL has {}",gtidSet,availableGtidSet);
|
logger.info("Connector last known GTIDs are {}, but MySQL has {}",gtidSet,availableGtidSet);
|
||||||
|
@ -503,7 +503,7 @@ public static boolean isPositionAtOrBefore(Document recorded, Document desired)
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
// The GTIDs are not an exact match, so figure out if recorded is a subset of the desired ...
|
// The GTIDs are not an exact match, so figure out if recorded is a subset of the desired ...
|
||||||
return recordedGtidSet.isSubsetOf(desiredGtidSet);
|
return recordedGtidSet.isContainedWithin(desiredGtidSet);
|
||||||
}
|
}
|
||||||
// The desired position did use GTIDs while the recorded did not use GTIDs. So, we assume that the
|
// The desired position did use GTIDs while the recorded did not use GTIDs. So, we assume that the
|
||||||
// recorded position is older since GTIDs are often enabled but rarely disabled. And if they are disabled,
|
// recorded position is older since GTIDs are often enabled but rarely disabled. And if they are disabled,
|
||||||
|
@ -5,6 +5,8 @@
|
|||||||
*/
|
*/
|
||||||
package io.debezium.connector.mysql;
|
package io.debezium.connector.mysql;
|
||||||
|
|
||||||
|
import java.util.LinkedList;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.fest.assertions.Assertions.assertThat;
|
import static org.fest.assertions.Assertions.assertThat;
|
||||||
@ -84,14 +86,14 @@ protected void asertIntervalExists( String uuid, int start, int end) {
|
|||||||
|
|
||||||
protected void asertFirstInterval( String uuid, int start, int end) {
|
protected void asertFirstInterval( String uuid, int start, int end) {
|
||||||
UUIDSet set = gtids.forServerWithId(uuid);
|
UUIDSet set = gtids.forServerWithId(uuid);
|
||||||
Interval interval = set.getFirstInterval();
|
Interval interval = set.getIntervals().iterator().next();
|
||||||
assertThat(interval.getStart()).isEqualTo(start);
|
assertThat(interval.getStart()).isEqualTo(start);
|
||||||
assertThat(interval.getEnd()).isEqualTo(end);
|
assertThat(interval.getEnd()).isEqualTo(end);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void asertLastInterval( String uuid, int start, int end) {
|
protected void asertLastInterval( String uuid, int start, int end) {
|
||||||
UUIDSet set = gtids.forServerWithId(uuid);
|
UUIDSet set = gtids.forServerWithId(uuid);
|
||||||
Interval interval = set.getLastInterval();
|
Interval interval = new LinkedList<>(set.getIntervals()).getLast();
|
||||||
assertThat(interval.getStart()).isEqualTo(start);
|
assertThat(interval.getStart()).isEqualTo(start);
|
||||||
assertThat(interval.getEnd()).isEqualTo(end);
|
assertThat(interval.getEnd()).isEqualTo(end);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user