DBZ-4715 Fix stream event's source timestamp tz resolution

This commit is contained in:
Chris Cranford 2022-02-11 11:32:55 -05:00 committed by Gunnar Morling
parent 1e0f22e52e
commit 76e4420d30
2 changed files with 7 additions and 1 deletions

View File

@ -694,6 +694,10 @@ public void calculateTimeDifference(OffsetDateTime databaseSystemTime) {
LOGGER.trace("Current time {} ms, database difference {} ms", now.toEpochMilli(), timeDiffMillis); LOGGER.trace("Current time {} ms, database difference {} ms", now.toEpochMilli(), timeDiffMillis);
} }
public long getDatabaseOffsetSeconds() {
return offsetSeconds.get();
}
public void calculateLagMetrics(Instant changeTime) { public void calculateLagMetrics(Instant changeTime) {
if (changeTime != null) { if (changeTime != null) {
final Instant correctedChangeTime = changeTime.plusMillis(timeDifference.longValue()).minusSeconds(offsetSeconds.longValue()); final Instant correctedChangeTime = changeTime.plusMillis(timeDifference.longValue()).minusSeconds(offsetSeconds.longValue());

View File

@ -359,6 +359,8 @@ protected void handleCommit(LogMinerEventRow row) throws InterruptedException {
LOGGER.trace("Commit (smallest SCN {}) {}", smallestScn, row); LOGGER.trace("Commit (smallest SCN {}) {}", smallestScn, row);
LOGGER.trace("Transaction {} has {} events", transactionId, numEvents); LOGGER.trace("Transaction {} has {} events", transactionId, numEvents);
final long databaseOffsetSeconds = metrics.getDatabaseOffsetSeconds();
final boolean skipExcludedUserName = isTransactionUserExcluded(transaction); final boolean skipExcludedUserName = isTransactionUserExcluded(transaction);
BlockingConsumer<LogMinerEvent> delegate = new BlockingConsumer<LogMinerEvent>() { BlockingConsumer<LogMinerEvent> delegate = new BlockingConsumer<LogMinerEvent>() {
private int numEvents = getTransactionEventCount(transaction); private int numEvents = getTransactionEventCount(transaction);
@ -372,7 +374,7 @@ public void accept(LogMinerEvent event) throws InterruptedException {
} }
offsetContext.setTransactionId(transactionId); offsetContext.setTransactionId(transactionId);
offsetContext.setSourceTime(event.getChangeTime()); offsetContext.setSourceTime(event.getChangeTime().minusSeconds(databaseOffsetSeconds));
offsetContext.setTableId(event.getTableId()); offsetContext.setTableId(event.getTableId());
if (--numEvents == 0) { if (--numEvents == 0) {
// reached the last event update the commit scn in the offsets // reached the last event update the commit scn in the offsets