DBZ-3978 add userName as a constructor argument.Add integration test.

This commit is contained in:
Willie Zhu 2021-09-23 18:42:07 +08:00 committed by Chris Cranford
parent 0fa6f9f464
commit 0a9fced8bb
5 changed files with 41 additions and 17 deletions

View File

@ -35,16 +35,19 @@ public class Transaction {
private String userName;
@VisibleForMarshalling
public Transaction(String transactionId, Scn startScn, Instant changeTime, List<LogMinerEvent> events, Set<Long> hashes) {
public Transaction(String transactionId, Scn startScn, Instant changeTime, List<LogMinerEvent> events, Set<Long> hashes, String userName) {
this.transactionId = transactionId;
this.startScn = startScn;
this.changeTime = changeTime;
this.events = events;
this.hashes = hashes;
if (!"UNKNOWN".equalsIgnoreCase(userName)) {
this.userName = userName;
}
}
public Transaction(String transactionId, Scn startScn, Instant changeTime) {
this(transactionId, startScn, changeTime, new ArrayList<>(), new HashSet<>());
public Transaction(String transactionId, Scn startScn, Instant changeTime, String userName) {
this(transactionId, startScn, changeTime, new ArrayList<>(), new HashSet<>(), userName);
}
public String getTransactionId() {
@ -86,12 +89,6 @@ public String getUserName() {
return userName;
}
public void setUserName(String userName) {
if (userName != null && !"UNKNOWN".equalsIgnoreCase(userName)) {
this.userName = userName;
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -113,7 +110,8 @@ public int hashCode() {
public String toString() {
return "Transaction{" +
"transactionId='" + transactionId + '\'' +
", startScn=" + startScn + ", userName='" + userName +
", startScn=" + startScn +
", userName='" + userName +
"'}";
}
}

View File

@ -223,8 +223,7 @@ protected void handleStart(LogMinerEventRow row) {
final String transactionId = row.getTransactionId();
final Transaction transaction = getTransactionCache().get(transactionId);
if (transaction == null && !isRecentlyCommitted(transactionId)) {
Transaction newTransaction = new Transaction(transactionId, row.getScn(), row.getChangeTime());
newTransaction.setUserName(row.getUserName());
Transaction newTransaction = new Transaction(transactionId, row.getScn(), row.getChangeTime(), row.getUserName());
getTransactionCache().put(transactionId, newTransaction);
metrics.setActiveTransactions(getTransactionCache().size());
}
@ -477,8 +476,7 @@ protected void addToTransaction(String transactionId, LogMinerEventRow row, Supp
Transaction transaction = getTransactionCache().get(transactionId);
if (transaction == null) {
LOGGER.debug("Transaction {} not in cache for DML, creating.", transactionId);
transaction = new Transaction(transactionId, row.getScn(), row.getChangeTime());
transaction.setUserName(row.getUserName());
transaction = new Transaction(transactionId, row.getScn(), row.getChangeTime(), row.getUserName());
getTransactionCache().put(transactionId, transaction);
}

View File

@ -353,7 +353,7 @@ protected void addToTransaction(String transactionId, LogMinerEventRow row, Supp
Transaction transaction = getTransactionCache().get(transactionId);
if (transaction == null) {
LOGGER.trace("Transaction {} is not in cache, creating.", transactionId);
transaction = new Transaction(transactionId, row.getScn(), row.getChangeTime());
transaction = new Transaction(transactionId, row.getScn(), row.getChangeTime(), row.getUserName());
}
if (row.getHash() == 0L || !transaction.getHashes().contains(row.getHash())) {
LOGGER.trace("Adding {} to transaction {} for table '{}'", row.getOperation(), transactionId, row.getTableId());

View File

@ -40,8 +40,8 @@ public class TransactionAdapter {
* @return the constructed Transaction instance
*/
@ProtoFactory
public Transaction factory(String transactionId, String scn, String changeTime, List<LogMinerEvent> events, Set<Long> hashes) {
return new Transaction(transactionId, Scn.valueOf(scn), Instant.parse(changeTime), events, hashes);
public Transaction factory(String transactionId, String scn, String changeTime, List<LogMinerEvent> events, Set<Long> hashes, String userName) {
return new Transaction(transactionId, Scn.valueOf(scn), Instant.parse(changeTime), events, hashes, userName);
}
/**
@ -101,4 +101,9 @@ public List<LogMinerEvent> getEvents(Transaction transaction) {
public Set<Long> getHashes(Transaction transaction) {
return transaction.getHashes();
}
@ProtoField(number = 6)
public String getUserName(Transaction transaction) {
return transaction.getUserName();
}
}

View File

@ -221,4 +221,27 @@ record = records.get(6);
assertEndTransaction(record, expectedTxId, 2, Collect.hashMapOf(dbName + ".DEBEZIUM.CUSTOMER", 1, dbName + ".DEBEZIUM.ORDERS", 1));
}
}
@Test
public void filterUser() throws Exception {
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER,DEBEZIUM\\.ORDERS")
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(OracleConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
.with(OracleConnectorConfig.LOG_MINING_STRATEGY, OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG)
.with(OracleConnectorConfig.LOG_MINING_USERNAME_EXCLUDE_LIST, "DEBEZIUM")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.executeWithoutCommitting("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
connection.executeWithoutCommitting("INSERT INTO debezium.orders VALUES (1, TO_DATE('2021-02-01', 'yyyy-mm-dd'), 1001, 1, 102)");
connection.execute("COMMIT");
// all messages are filtered out
assertThat(waitForAvailableRecords(10, TimeUnit.SECONDS)).isFalse();
}
}