DBZ-521 Record LSN for the last message committed to Connect

Jiri Pechanec 2018-01-12 13:21:38 +01:00 committed by Gunnar Morling
parent c3edf8ee9c
commit 0b5348339a
11 changed files with 85 additions and 22 deletions

View File

@@ -49,6 +49,7 @@ public class PostgresConnectorTask extends SourceTask {
     private RecordsProducer producer;
     private Metronome metronome;
     private Duration pollInterval;
+    private volatile SourceRecord lastRecordForRecovery = null;

     public PostgresConnectorTask() {
     }
@@ -159,10 +160,11 @@ private void createSnapshotProducer(PostgresTaskContext taskContext, SourceInfo
     @Override
     public void commit() throws InterruptedException {
         if (running.get()) {
-            producer.commit();
+            producer.commit(lastRecordForRecovery);
         }
     }

+    @SuppressWarnings("unchecked")
     @Override
     public List<SourceRecord> poll() throws InterruptedException {
         LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
@@ -188,6 +190,15 @@ public List<SourceRecord> poll() throws InterruptedException {
                     break;
                 }
             }
+            if (records.size() > 0) {
+                for (int i = records.size() - 1; i >= 0; i--) {
+                    SourceRecord r = records.get(i);
+                    if (((Map<String, Boolean>) r.sourceOffset()).getOrDefault(SourceInfo.LAST_EVENT_FOR_LSN, Boolean.TRUE)) {
+                        lastRecordForRecovery = r;
+                        break;
+                    }
+                }
+            }
             return records;
         } finally {
             previousContext.restore();
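
The selection rule added to poll() can be exercised on its own. A minimal, self-contained sketch, assuming only the offset key names this commit introduces ("lsn", "last_event_for_lsn"); the offsets themselves are invented:

import java.util.List;
import java.util.Map;

public class LastRecordForRecoveryDemo {
    public static void main(String[] args) {
        // Three offsets from one poll; the two LSN-200 entries model a wal2json
        // transaction that decodes into several records sharing one LSN.
        List<Map<String, Object>> offsets = List.of(
                Map.of("lsn", 100L, "last_event_for_lsn", true),
                Map.of("lsn", 200L, "last_event_for_lsn", false),
                Map.of("lsn", 200L, "last_event_for_lsn", true));

        // Walk backwards, as poll() now does, and stop at the first offset that
        // closes an LSN batch; that LSN is safe to acknowledge to the server.
        for (int i = offsets.size() - 1; i >= 0; i--) {
            Map<String, Object> offset = offsets.get(i);
            if ((Boolean) offset.getOrDefault("last_event_for_lsn", Boolean.TRUE)) {
                System.out.println("safe to flush LSN " + offset.get("lsn"));   // prints 200
                break;
            }
        }
    }
}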

View File

@@ -43,9 +43,9 @@ protected RecordsProducer(PostgresTaskContext taskContext, SourceInfo sourceInfo
     protected abstract void start(Consumer<SourceRecord> recordsConsumer);

     /**
-     * Notification that offsets have been committed to Kafka.
+     * Notification that offsets have been committed to Kafka, so the LSN of the given record can safely be flushed to the server.
      */
-    protected abstract void commit();
+    protected abstract void commit(SourceRecord lastRecordForRecovery);

     /**
      * Requests that this producer be stopped. This is normally a request coming from a {@link PostgresConnectorTask} instance

View File

@@ -108,8 +108,8 @@ private void startStreaming(Consumer<SourceRecord> consumer) {
     }

     @Override
-    protected void commit() {
-        streamProducer.ifPresent(RecordsStreamProducer::commit);
+    protected void commit(final SourceRecord lastRecordForRecovery) {
+        streamProducer.ifPresent(x -> x.commit(lastRecordForRecovery));
     }

     @Override
@@ -172,7 +172,7 @@ private void takeSnapshot(Consumer<SourceRecord> consumer) {
         // and mark the start of the snapshot
         sourceInfo.startSnapshot();
-        sourceInfo.update(xlogStart, clock().currentTimeInMicros(), txId);
+        sourceInfo.update(xlogStart, clock().currentTimeInMicros(), txId, true);

         logger.info("Step 3: reading and exporting the contents of each table");
         AtomicInteger rowsCounter = new AtomicInteger(0);

View File

@@ -136,14 +136,20 @@ private void streamChanges(Consumer<SourceRecord> consumer) {
     }

     @Override
-    protected synchronized void commit() {
+    protected synchronized void commit(final SourceRecord lastRecordForRecovery) {
         LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
         try {
             ReplicationStream replicationStream = this.replicationStream.get();
             if (replicationStream != null) {
                 // tell the server the point up to which we've processed data, so it can be free to recycle WAL segments
                 logger.debug("flushing offsets to server...");
-                replicationStream.flushLSN();
+                if (lastRecordForRecovery != null) {
+                    Map<String, ?> offset = lastRecordForRecovery.sourceOffset();
+                    Long lsn = (Long) offset.get(SourceInfo.LSN_KEY);
+                    if (lsn != null) {
+                        replicationStream.flushLSN(lsn);
+                    }
+                }
             } else {
                 logger.debug("streaming has already stopped, ignoring commit callback...");
             }
@@ -220,7 +226,7 @@ private void process(ReplicationMessage message, Long lsn, Consumer<SourceRecord
         // update the source info with the coordinates for this message
         long commitTimeNs = message.getCommitTime();
         int txId = message.getTransactionId();
-        sourceInfo.update(lsn, commitTimeNs, txId);
+        sourceInfo.update(lsn, commitTimeNs, txId, message.isLastEventForLsn());
         if (logger.isDebugEnabled()) {
             logger.debug("received new message at position {}\n{}", ReplicationConnection.format(lsn), message);
         }
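
End to end, the new commit path reads: Kafka Connect durably stores the source offsets, then calls SourceTask#commit(); the task hands the remembered lastRecordForRecovery to this producer, which acknowledges that record's LSN to the server. A condensed sketch of the hand-off (wiring simplified, names as in the diffs above):

// PostgresConnectorTask: offsets are already flushed by Connect at this point.
public void commit() throws InterruptedException {
    if (running.get()) {
        producer.commit(lastRecordForRecovery);   // newest batch-closing record
    }
}

// RecordsStreamProducer: acknowledge only an LSN whose batch is complete, so
// the server never recycles WAL that still holds undelivered records.
Long lsn = (Long) lastRecordForRecovery.sourceOffset().get(SourceInfo.LSN_KEY);
if (lsn != null) {
    replicationStream.flushLSN(lsn);
}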

View File

@@ -80,6 +80,7 @@ final class SourceInfo {
     public static final String LSN_KEY = "lsn";
     public static final String SNAPSHOT_KEY = "snapshot";
     public static final String LAST_SNAPSHOT_RECORD_KEY = "last_snapshot_record";
+    public static final String LAST_EVENT_FOR_LSN = "last_event_for_lsn";

     /**
      * A {@link Schema} definition for a {@link Struct} used to store the {@link #partition()} and {@link #offset()} information.
@@ -92,6 +93,7 @@ final class SourceInfo {
             .field(LSN_KEY, Schema.OPTIONAL_INT64_SCHEMA)
             .field(SNAPSHOT_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA)
             .field(LAST_SNAPSHOT_RECORD_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA)
+            .field(LAST_EVENT_FOR_LSN, Schema.OPTIONAL_BOOLEAN_SCHEMA)
             .build();

     private final String serverName;
@@ -102,6 +104,7 @@ final class SourceInfo {
     private Long useconds;
     private boolean snapshot = false;
     private Boolean lastSnapshotRecord;
+    private Boolean lastEventForLsn;

     protected SourceInfo(String serverName) {
         this.serverName = serverName;
@@ -116,6 +119,7 @@ protected void load(Map<String, Object> lastStoredOffset) {
         if (this.snapshot) {
             this.lastSnapshotRecord = (Boolean) lastStoredOffset.get(LAST_SNAPSHOT_RECORD_KEY);
         }
+        this.lastEventForLsn = (Boolean) lastStoredOffset.get(LAST_EVENT_FOR_LSN);
     }

     /**
@@ -151,6 +155,9 @@ public Map<String, String> partition() {
             result.put(SNAPSHOT_KEY, true);
             result.put(LAST_SNAPSHOT_RECORD_KEY, lastSnapshotRecord);
         }
+        if (lastEventForLsn != null) {
+            result.put(LAST_EVENT_FOR_LSN, lastEventForLsn);
+        }
         return result;
     }
@@ -161,13 +168,15 @@ public Map<String, String> partition() {
      * available
      * @param useconds the commit time (in microseconds since epoch) of the transaction that generated the event;
      * may be null indicating that this information is not available
-     * @param txId the ID of the transaction that generated the transaction; may be null if this information nis not available
+     * @param txId the ID of the transaction that generated the event; may be null if this information is not available
+     * @param lastEventForLsn a flag indicating that the record associated with this offset was the last one in a batch sharing the same LSN
      * @return this instance
      */
-    protected SourceInfo update(Long lsn, Long useconds, Integer txId) {
+    protected SourceInfo update(Long lsn, Long useconds, Integer txId, boolean lastEventForLsn) {
         this.lsn = lsn;
         this.useconds = useconds;
         this.txId = txId;
+        this.lastEventForLsn = lastEventForLsn;
         return this;
     }
@@ -262,4 +271,8 @@ public String toString() {
         sb.append(']');
         return sb.toString();
     }
+
+    protected Boolean getLastEventForLsn() {
+        return lastEventForLsn;
+    }
 }
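
With these additions, a streamed record's stored offset carries the new flag next to the existing fields. An illustrative offset map (values invented; "ts_usec" and "txId" are assumed to be the pre-existing keys of this class):

Map<String, Object> offset = Map.of(
        "ts_usec", 1515759698000000L,   // commit time in microseconds
        "txId", 563,                    // transaction that produced the event
        "lsn", 24023064L,               // log sequence number as a long
        "last_event_for_lsn", true);    // new: this record closes its LSN batch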

View File

@@ -217,6 +217,13 @@ public void flushLSN() throws SQLException {
         stream.forceUpdateStatus();
     }

+    @Override
+    public void flushLSN(Long lsn) throws SQLException {
+        stream.setFlushedLSN(LogSequenceNumber.valueOf(lsn));
+        stream.setAppliedLSN(LogSequenceNumber.valueOf(lsn));
+        stream.forceUpdateStatus();
+    }
+
     @Override
     public Long lastReceivedLSN() {
         return lastReceivedLSN != null ? lastReceivedLSN.asLong() : null;
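
The new overload leans on pgjdbc's replication API: LogSequenceNumber.valueOf(long) wraps the stored long, setFlushedLSN/setAppliedLSN record the acknowledgement, and forceUpdateStatus() sends the standby status update immediately instead of waiting for the status interval. A usage sketch (acknowledge is a hypothetical helper name):

import java.sql.SQLException;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;

static void acknowledge(PGReplicationStream stream, long lsn) throws SQLException {
    LogSequenceNumber ack = LogSequenceNumber.valueOf(lsn);
    stream.setFlushedLSN(ack);    // durable up to ack on the connector side
    stream.setAppliedLSN(ack);    // ...and applied
    stream.forceUpdateStatus();   // report now; the server may recycle WAL up to ack
}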

View File

@@ -92,4 +92,9 @@ public interface ColumnTypeMetadata {
      * @return true if type metadata are passed as a part of message
      */
     boolean hasMetadata();
+
+    /**
+     * @return true if this is the last message in the batch of messages with the same LSN
+     */
+    boolean isLastEventForLsn();
 }

View File

@@ -55,6 +55,8 @@ public interface ReplicationMessageProcessor {
      */
     void flushLSN() throws SQLException;
+
+    void flushLSN(Long lsn) throws SQLException;

     /**
      * Returns the value for the latest server received LSN during a read operation. The value is always updated once messages
      * are read via the {@link ReplicationConnection#startStreaming()} or {@link ReplicationConnection#startStreaming(Long)}

View File

@@ -111,6 +111,12 @@ public int doGetOidType() {
                 .collect(Collectors.toList());
     }

+    @Override
+    public boolean isLastEventForLsn() {
+        return true;
+    }
+
     /**
      * Converts the Protobuf value for a {@link io.debezium.connector.postgresql.proto.PgProto.DatumMessage plugin message} to
      * a Java value based on the type of the column from the message. This value will be converted later on if necessary by the

View File

@@ -20,6 +20,7 @@
 import io.debezium.document.Array;
 import io.debezium.document.Document;
 import io.debezium.document.DocumentReader;
+import io.debezium.document.Value;

 /**
  * JSON deserialization of a message sent by
@@ -50,8 +51,13 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
             final String timestamp = message.getString("timestamp");
             final long commitTime = dateTime.systemTimestamp(timestamp);
             final Array changes = message.getArray("change");
-            for (Array.Entry e: changes) {
-                processor.process(new Wal2JsonReplicationMessage(txId, commitTime, e.getValue().asDocument(), containsMetadata));
+            if (changes.size() > 0) {
+                for (int i = 0; i < changes.size() - 1; i++) {
+                    final Value v = changes.get(i);
+                    processor.process(new Wal2JsonReplicationMessage(txId, commitTime, v.asDocument(), containsMetadata, false));
+                }
+                final Value v = changes.get(changes.size() - 1);
+                processor.process(new Wal2JsonReplicationMessage(txId, commitTime, v.asDocument(), containsMetadata, true));
             }
         } catch (final IOException e) {
             throw new ConnectException(e);
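
For context: wal2json emits one JSON document per transaction, so every element of its "change" array arrives under a single LSN, and only the final element may be flagged as the last event for that LSN. An illustrative (not verbatim) payload:

{
  "xid": 563,
  "timestamp": "2018-01-12 13:21:38.0+01",
  "change": [
    { "kind": "insert", "schema": "public", "table": "orders", ... },
    { "kind": "update", "schema": "public", "table": "orders", ... }
  ]
}

Only the trailing "change" entry yields lastEventForLsn = true, so a commit() that lands between the two records cannot flush the shared LSN and silently skip the second record after a restart.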

View File

@@ -50,12 +50,14 @@ class Wal2JsonReplicationMessage implements ReplicationMessage {
     private final long commitTime;
     private final Document rawMessage;
     private final boolean hasMetadata;
+    private final boolean lastEventForLsn;

-    public Wal2JsonReplicationMessage(final int txId, final long commitTime, final Document rawMessage, final boolean hasMetadata) {
+    public Wal2JsonReplicationMessage(final int txId, final long commitTime, final Document rawMessage, final boolean hasMetadata, final boolean lastEventForLsn) {
         this.txId = txId;
         this.commitTime = commitTime;
         this.rawMessage = rawMessage;
         this.hasMetadata = hasMetadata;
+        this.lastEventForLsn = lastEventForLsn;
     }
@Override
@@ -350,4 +352,9 @@ private String toInternalTypeName(TypeMetadataImpl typeMetadata) {
         final String fullTypeName = typeMetadata.getFullType();
         return fullTypeName.startsWith("character") ? "bpchar" : fullTypeName;
     }
+
+    @Override
+    public boolean isLastEventForLsn() {
+        return lastEventForLsn;
+    }
 }
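
Taken together, the changes close the recovery gap DBZ-521 describes; a hypothetical replay (LSNs invented):

// A wal2json transaction at LSN 200 decodes into records r1, r2, r3; only r3
// carries last_event_for_lsn = true. Suppose Connect commits offsets right
// after a poll() batch that ends at r2.
//   Before this commit: the producer acknowledged the last received LSN
//   regardless of whether every record decoded from it had reached Connect,
//   so a crash before r3 was delivered could lose it once the server
//   recycled that WAL.
//   After this commit: the backward scan in poll() skips r2 (flag false) and
//   keeps the last batch-closing record, say at LSN 100; commit() flushes
//   100, and after a restart streaming resumes there, re-delivering the
//   whole LSN-200 transaction.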