DBZ-3942 Load correct last commit LSN from offsets

This commit is contained in:
Jiri Pechanec 2021-08-31 12:47:52 +02:00
parent e0efd68852
commit 38cd6907ae

View File

@ -190,7 +190,10 @@ private Long readOptionalLong(Map<String, ?> offset, String key) {
public PostgresOffsetContext load(Map<String, ?> offset) { public PostgresOffsetContext load(Map<String, ?> offset) {
final Lsn lsn = Lsn.valueOf(readOptionalLong(offset, SourceInfo.LSN_KEY)); final Lsn lsn = Lsn.valueOf(readOptionalLong(offset, SourceInfo.LSN_KEY));
final Lsn lastCompletelyProcessedLsn = Lsn.valueOf(readOptionalLong(offset, LAST_COMPLETELY_PROCESSED_LSN_KEY)); final Lsn lastCompletelyProcessedLsn = Lsn.valueOf(readOptionalLong(offset, LAST_COMPLETELY_PROCESSED_LSN_KEY));
final Lsn lastCommitLsn = Lsn.valueOf(readOptionalLong(offset, LAST_COMPLETELY_PROCESSED_LSN_KEY)); Lsn lastCommitLsn = Lsn.valueOf(readOptionalLong(offset, LAST_COMMIT_LSN_KEY));
if (lastCommitLsn == null) {
lastCommitLsn = lastCompletelyProcessedLsn;
}
final Long txId = readOptionalLong(offset, SourceInfo.TXID_KEY); final Long txId = readOptionalLong(offset, SourceInfo.TXID_KEY);
final Instant useconds = Conversions.toInstantFromMicros((Long) offset.get(SourceInfo.TIMESTAMP_USEC_KEY)); final Instant useconds = Conversions.toInstantFromMicros((Long) offset.get(SourceInfo.TIMESTAMP_USEC_KEY));