DBZ-965 Some clean-up
This commit is contained in:
parent
675f5b6479
commit
5a9ca6eb31
@ -40,7 +40,12 @@ public class PostgresConnectorTask extends BaseSourceTask {
|
||||
|
||||
private PostgresTaskContext taskContext;
|
||||
private RecordsProducer producer;
|
||||
private volatile Long lastProcessedLsn;
|
||||
|
||||
/**
|
||||
* In case of wal2json, all records of one TX will be sent with the same LSN. This is the last LSN that was
|
||||
* completely processed, i.e. we've seen all events originating from that TX.
|
||||
*/
|
||||
private volatile Long lastCompletelyProcessedLsn;
|
||||
|
||||
/**
|
||||
* A queue with change events filled by the snapshot and streaming producers, consumed
|
||||
@ -140,8 +145,8 @@ private void createSnapshotProducer(PostgresTaskContext taskContext, SourceInfo
|
||||
@Override
|
||||
public void commit() throws InterruptedException {
|
||||
if (running.get()) {
|
||||
if (lastProcessedLsn != null) {
|
||||
producer.commit(lastProcessedLsn);
|
||||
if (lastCompletelyProcessedLsn != null) {
|
||||
producer.commit(lastCompletelyProcessedLsn);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -151,7 +156,7 @@ public List<SourceRecord> poll() throws InterruptedException {
|
||||
List<ChangeEvent> events = changeEventQueue.poll();
|
||||
|
||||
if (events.size() > 0) {
|
||||
lastProcessedLsn = events.get(events.size() - 1).getLastCompletelyProcessedLsn();
|
||||
lastCompletelyProcessedLsn = events.get(events.size() - 1).getLastCompletelyProcessedLsn();
|
||||
}
|
||||
return events.stream().map(ChangeEvent::getRecord).collect(Collectors.toList());
|
||||
}
|
||||
|
@ -58,7 +58,7 @@ public class RecordsStreamProducer extends RecordsProducer {
|
||||
private final AtomicReference<ReplicationStream> replicationStream;
|
||||
private final AtomicBoolean cleanupExecuted = new AtomicBoolean();
|
||||
private PgConnection typeResolverConnection = null;
|
||||
private Long lastProcessedLsn;
|
||||
private Long lastCompletelyProcessedLsn;
|
||||
|
||||
private final Heartbeat heartbeat;
|
||||
|
||||
@ -111,7 +111,7 @@ protected synchronized void start(BlockingConsumer<ChangeEvent> eventConsumer, C
|
||||
// refresh the schema so we have a latest view of the DB tables
|
||||
taskContext.refreshSchema(true);
|
||||
|
||||
this.lastProcessedLsn = sourceInfo.lsn();
|
||||
this.lastCompletelyProcessedLsn = sourceInfo.lsn();
|
||||
|
||||
// the new thread will inherit it's parent MDC
|
||||
executorService.submit(() -> streamChanges(eventConsumer, failureConsumer));
|
||||
@ -232,7 +232,7 @@ private void process(ReplicationMessage message, Long lsn, BlockingConsumer<Chan
|
||||
return;
|
||||
}
|
||||
if (message.isLastEventForLsn()) {
|
||||
lastProcessedLsn = lsn;
|
||||
lastCompletelyProcessedLsn = lsn;
|
||||
}
|
||||
|
||||
TableId tableId = PostgresSchema.parse(message.getTable());
|
||||
@ -305,7 +305,7 @@ protected void generateCreateRecord(TableId tableId, Object[] rowData, BlockingC
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("sending create event '{}' to topic '{}'", record, topicName);
|
||||
}
|
||||
recordConsumer.accept(new ChangeEvent(record, lastProcessedLsn));
|
||||
recordConsumer.accept(new ChangeEvent(record, lastCompletelyProcessedLsn));
|
||||
}
|
||||
|
||||
protected void generateUpdateRecord(TableId tableId, Object[] oldRowData, Object[] newRowData,
|
||||
@ -345,7 +345,7 @@ protected void generateUpdateRecord(TableId tableId, Object[] oldRowData, Object
|
||||
new SourceRecord(
|
||||
partition, offset, topicName, null, oldKeySchema, oldKey, envelope.schema(),
|
||||
envelope.delete(oldValue, source, clock().currentTimeInMillis())),
|
||||
lastProcessedLsn);
|
||||
lastCompletelyProcessedLsn);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("sending delete event '{}' to topic '{}'", changeEvent.getRecord(), topicName);
|
||||
}
|
||||
@ -355,7 +355,7 @@ protected void generateUpdateRecord(TableId tableId, Object[] oldRowData, Object
|
||||
// send a tombstone event (null value) for the old key so it can be removed from the Kafka log eventually...
|
||||
changeEvent = new ChangeEvent(
|
||||
new SourceRecord(partition, offset, topicName, null, oldKeySchema, oldKey, null, null),
|
||||
lastProcessedLsn);
|
||||
lastCompletelyProcessedLsn);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("sending tombstone event '{}' to topic '{}'", changeEvent.getRecord(), topicName);
|
||||
}
|
||||
@ -367,7 +367,7 @@ protected void generateUpdateRecord(TableId tableId, Object[] oldRowData, Object
|
||||
new SourceRecord(
|
||||
partition, offset, topicName, null, newKeySchema, newKey, envelope.schema(),
|
||||
envelope.create(newValue, source, clock().currentTimeInMillis())),
|
||||
lastProcessedLsn);
|
||||
lastCompletelyProcessedLsn);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("sending create event '{}' to topic '{}'", changeEvent.getRecord(), topicName);
|
||||
}
|
||||
@ -376,7 +376,7 @@ protected void generateUpdateRecord(TableId tableId, Object[] oldRowData, Object
|
||||
SourceRecord record = new SourceRecord(partition, offset, topicName, null,
|
||||
newKeySchema, newKey, envelope.schema(),
|
||||
envelope.update(oldValue, newValue, source, clock().currentTimeInMillis()));
|
||||
recordConsumer.accept(new ChangeEvent(record, lastProcessedLsn));
|
||||
recordConsumer.accept(new ChangeEvent(record, lastCompletelyProcessedLsn));
|
||||
}
|
||||
}
|
||||
|
||||
@ -404,7 +404,7 @@ protected void generateDeleteRecord(TableId tableId, Object[] oldRowData, Blocki
|
||||
partition, offset, topicName, null,
|
||||
keySchema, key, envelope.schema(),
|
||||
envelope.delete(value, sourceInfo.source(), clock().currentTimeInMillis())),
|
||||
lastProcessedLsn);
|
||||
lastCompletelyProcessedLsn);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("sending delete event '{}' to topic '{}'", changeEvent.getRecord(), topicName);
|
||||
}
|
||||
@ -414,7 +414,7 @@ protected void generateDeleteRecord(TableId tableId, Object[] oldRowData, Blocki
|
||||
if (taskContext.config().isEmitTombstoneOnDelete()) {
|
||||
changeEvent = new ChangeEvent(
|
||||
new SourceRecord(partition, offset, topicName, null, keySchema, key, null, null),
|
||||
lastProcessedLsn);
|
||||
lastCompletelyProcessedLsn);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("sending tombstone event '{}' to topic '{}'", changeEvent.getRecord(), topicName);
|
||||
}
|
||||
|
@ -30,8 +30,6 @@ public static int updatesWithoutPK(final int expectedCount, final int updatesWit
|
||||
/**
|
||||
* wal2json plugin is not currently able to encode and parse quoted identifiers
|
||||
*
|
||||
* @author Jiri Pechanec
|
||||
*
|
||||
*/
|
||||
public static class AreQuotedIdentifiersUnsupported implements Supplier<Boolean> {
|
||||
@Override
|
||||
@ -43,9 +41,6 @@ public Boolean get() {
|
||||
/**
|
||||
* wal2json plugin sends heartbeat only at the end of transaction as the changes in a single transaction
|
||||
* are under the same LSN.
|
||||
*
|
||||
* @author Jiri Pechanec
|
||||
*
|
||||
*/
|
||||
public static boolean singleHeartbeatPerTransaction() {
|
||||
return wal2Json();
|
||||
|
Loading…
Reference in New Issue
Block a user