DBZ-1174 Convert commitTime from nanos to micros for wal2json

- wal2json sends the txn commitTime using a function from PostgreSQL's C
  library. The value that Debezium recevies is in nanoseconds.
- decoderbufs sends the txn commitTime in microseconds.
- RecordsSnapshotProducer updates SourceInfo.ts_usec by converting
  System.currentTimeMillis() to microseconds.
- RecordsSnapshotProducer updates the SourceInfo's ts_usec field using
  message.getCommitTime().

This means that when using wal2json, the value of SourceInfo.ts_usec
is in microseconds since epoch during snapshot but is in nanoseconds
during streaming. To fix this, we changed
Wal2JsonReplicationMessage.getCommitTime() to return in microseconds.
This commit is contained in:
Ashhar Hasan 2019-03-08 04:29:02 +05:30 committed by Jiri Pechanec
parent 71765a2ab2
commit 0b07afd23e
3 changed files with 41 additions and 3 deletions

View File

@ -241,9 +241,9 @@ private void process(ReplicationMessage message, Long lsn, BlockingConsumer<Chan
assert tableId != null;
// update the source info with the coordinates for this message
long commitTimeNs = message.getCommitTime();
long commitTimeMicros = message.getCommitTime();
long txId = message.getTransactionId();
sourceInfo.update(lsn, commitTimeNs, txId, tableId);
sourceInfo.update(lsn, commitTimeMicros, txId, tableId);
if (logger.isDebugEnabled()) {
logger.debug("received new message at position {}\n{}", ReplicationConnection.format(lsn), message);
}

View File

@ -13,6 +13,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import org.apache.kafka.connect.data.Field;
@ -85,7 +86,7 @@ public Operation getOperation() {
@Override
public long getCommitTime() {
return commitTime;
return TimeUnit.NANOSECONDS.toMicros(commitTime);
}
@Override

View File

@ -18,6 +18,7 @@
import static org.junit.Assert.fail;
import java.sql.SQLException;
import java.time.Instant;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
@ -214,6 +215,34 @@ public void shouldProduceEventsWithInitialSnapshot() throws Exception {
assertRecordsAfterInsert(2, 3, 3);
}
@Test
@FixFor("DBZ-1174")
public void shouldUseMicrosecondsForTransactionCommitTime() throws InterruptedException {
TestHelper.execute(SETUP_TABLES_STMT);
start(PostgresConnector.class, TestHelper.defaultConfig().build());
assertConnectorIsRunning();
// check records from snapshot
Instant inst = Instant.now();
// Microseconds since epoch, may overflow
final long microsSnapshot = TimeUnit.SECONDS.toMicros(inst.getEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(inst.getNano());
SourceRecords actualRecords = consumeRecordsByTopic(2);
actualRecords.forEach(sourceRecord -> assertSourceInfoTransactionTimestamp(sourceRecord, microsSnapshot, TimeUnit.MINUTES.toMicros(1L)));
// insert 2 new records
TestHelper.execute(INSERT_STMT);
// check records from streaming
inst = Instant.now();
// Microseconds since epoch, may overflow
final long microsStream = TimeUnit.SECONDS.toMicros(inst.getEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(inst.getNano());
actualRecords = consumeRecordsByTopic(2);
actualRecords.forEach(sourceRecord -> assertSourceInfoTransactionTimestamp(sourceRecord, microsStream, TimeUnit.MINUTES.toMicros(1L)));
//now stop the connector
stopConnector();
assertNoRecordsToConsume();
}
@Test
@FixFor("DBZ-997")
public void shouldReceiveChangesForChangePKColumnDefinition() throws Exception {
@ -749,6 +778,14 @@ private void assertRecordsAfterInsert(int expectedCount, int...pks) throws Inter
IntStream.range(0, expectedCountPerSchema).forEach(i -> VerifyRecord.isValidInsert(recordsForTopicS2.remove(0), PK_FIELD, pks[i]));
}
protected void assertSourceInfoTransactionTimestamp(SourceRecord record, Long ts_usec, Long tolerance_usec) {
assertTrue(record.value() instanceof Struct);
Struct source = ((Struct) record.value()).getStruct("source");
// 1 minute difference is okay
System.out.println("TS_USEC\t" + source.getInt64("ts_usec"));
assertTrue(Math.abs(ts_usec - source.getInt64("ts_usec")) < tolerance_usec);
}
private <T> void validateField(Config config, Field field, T expectedValue) {
assertNoConfigurationErrors(config, field);
Object actualValue = configValue(config, field.name()).value();