DBZ-5439 Add transaction id associate with commit scn

This commit is contained in:
thangdc94 2022-08-19 00:22:34 +07:00 committed by Jiri Pechanec
parent 722b1fe187
commit cfa8b780ed

View File

@ -9,10 +9,12 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import io.debezium.util.Strings;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
@ -92,7 +94,9 @@ public Scn getCommitScnForRedoThread(int thread) {
public boolean hasCommitAlreadyBeenHandled(LogMinerEventRow row) { public boolean hasCommitAlreadyBeenHandled(LogMinerEventRow row) {
final RedoThreadCommitScn commitScn = redoThreadCommitScns.get(row.getThread()); final RedoThreadCommitScn commitScn = redoThreadCommitScns.get(row.getThread());
if (commitScn != null) { if (commitScn != null) {
return commitScn.getCommitScn().compareTo(row.getScn()) >= 0; Set<String> txIds = commitScn.getTxIds();
return commitScn.getCommitScn().compareTo(row.getScn()) > 0 ||
(commitScn.getCommitScn().compareTo(row.getScn()) == 0 && txIds.contains(row.getTransactionId()));
} }
return false; return false;
} }
@ -105,6 +109,16 @@ public boolean hasCommitAlreadyBeenHandled(LogMinerEventRow row) {
public void recordCommit(LogMinerEventRow row) { public void recordCommit(LogMinerEventRow row) {
final RedoThreadCommitScn redoCommitScn = redoThreadCommitScns.get(row.getThread()); final RedoThreadCommitScn redoCommitScn = redoThreadCommitScns.get(row.getThread());
if (redoCommitScn != null) { if (redoCommitScn != null) {
Scn prevCommitScn = redoCommitScn.getCommitScn();
if (Objects.equals(prevCommitScn, row.getScn())) {
Set<String> txIds = redoCommitScn.getTxIds();
txIds.add(row.getTransactionId());
}
else {
Set<String> txIds = new HashSet<>();
txIds.add(row.getTransactionId());
redoCommitScn.setTxIds(txIds);
}
redoCommitScn.setCommitScn(row.getScn()); redoCommitScn.setCommitScn(row.getScn());
redoCommitScn.setRsId(row.getRsId()); redoCommitScn.setRsId(row.getRsId());
redoCommitScn.setSsn(row.getSsn()); redoCommitScn.setSsn(row.getSsn());
@ -224,7 +238,7 @@ public static CommitScn valueOf(String value) {
public static CommitScn valueOf(Long value) { public static CommitScn valueOf(Long value) {
final Set<RedoThreadCommitScn> scns = new HashSet<>(); final Set<RedoThreadCommitScn> scns = new HashSet<>();
if (value != null) { if (value != null) {
scns.add(new RedoThreadCommitScn(1, Scn.valueOf(value), null, 0)); scns.add(new RedoThreadCommitScn(1, Scn.valueOf(value), null, 0, new HashSet<>()));
} }
return new CommitScn(scns); return new CommitScn(scns);
} }
@ -277,20 +291,22 @@ public static class RedoThreadCommitScn {
private Scn commitScn; private Scn commitScn;
private String rsId; private String rsId;
private int ssn; private int ssn;
private Set<String> txIds;
public RedoThreadCommitScn(int thread) { public RedoThreadCommitScn(int thread) {
this(thread, Scn.NULL, null, 0); this(thread, Scn.NULL, null, 0, new HashSet<>());
} }
public RedoThreadCommitScn(LogMinerEventRow row) { public RedoThreadCommitScn(LogMinerEventRow row) {
this(row.getThread(), row.getScn(), row.getRsId(), row.getSsn()); this(row.getThread(), row.getScn(), row.getRsId(), row.getSsn(), new HashSet<>());
} }
public RedoThreadCommitScn(int thread, Scn commitScn, String rsId, int ssn) { public RedoThreadCommitScn(int thread, Scn commitScn, String rsId, int ssn, Set<String> txIds) {
this.thread = thread; this.thread = thread;
this.commitScn = commitScn; this.commitScn = commitScn;
this.rsId = rsId; this.rsId = rsId;
this.ssn = ssn; this.ssn = ssn;
this.txIds = txIds;
} }
public int getThread() { public int getThread() {
@ -321,25 +337,36 @@ public void setSsn(int ssn) {
this.ssn = ssn; this.ssn = ssn;
} }
public Set<String> getTxIds() {
return txIds;
}
public void setTxIds(Set<String> txIds) {
this.txIds = txIds;
}
public String getFormattedString() { public String getFormattedString() {
return commitScn.toString() + ":" + (rsId != null ? rsId : "") + ":" + ssn + ":" + thread; return commitScn.toString() + ":" + (rsId != null ? rsId : "") + ":" + ssn + ":" + thread + ":" + Strings.join("-", txIds);
} }
public static RedoThreadCommitScn valueOf(String value) { public static RedoThreadCommitScn valueOf(String value) {
final String[] parts = value.split(":"); final String[] parts = value.split(":", -1);
if (parts.length == 1) { if (parts.length == 1) {
// Reading a legacy commit_scn entry that has only the SCN bit // Reading a legacy commit_scn entry that has only the SCN bit
// Create the redo thread entry with thread 1. // Create the redo thread entry with thread 1.
// There is only ever a single redo thread commit entry in this use case. // There is only ever a single redo thread commit entry in this use case.
return new RedoThreadCommitScn(1, Scn.valueOf(parts[0]), null, 0); return new RedoThreadCommitScn(1, Scn.valueOf(parts[0]), null, 0, new HashSet<>());
} } else if (parts.length == 5) {
else if (parts.length == 4) {
// The new redo-thread based commit scn entry // The new redo-thread based commit scn entry
final Scn scn = Scn.valueOf(parts[0]); final Scn scn = Scn.valueOf(parts[0]);
final String rsId = parts[1]; final String rsId = parts[1];
final int ssn = Integer.parseInt(parts[2]); final int ssn = Integer.parseInt(parts[2]);
final int thread = Integer.parseInt(parts[3]); final int thread = Integer.parseInt(parts[3]);
return new RedoThreadCommitScn(thread, scn, rsId, ssn); Set<String> txIds = new HashSet<>();
if (!parts[4].isEmpty()) {
Collections.addAll(txIds, parts[4].split("-"));
}
return new RedoThreadCommitScn(thread, scn, rsId, ssn, txIds);
} }
throw new DebeziumException("An unexpected redo thread commit scn entry: '" + value + "'"); throw new DebeziumException("An unexpected redo thread commit scn entry: '" + value + "'");
} }
@ -351,6 +378,7 @@ public String toString() {
", commitScn=" + commitScn + ", commitScn=" + commitScn +
", rsId='" + rsId + '\'' + ", rsId='" + rsId + '\'' +
", ssn=" + ssn + ", ssn=" + ssn +
", txIds=" + txIds +
'}'; '}';
} }
} }