DBZ-521 Misc. improvements and fixes;

* Actually assigning last processed LSN to commit()
* Passing LSN to commit() instead of the entire SourceRecord
* Avoiding an exception in case of a wal2json batch with exactly one element
* Using "Lsn" instead of "LSN" in method names
* JavaDoc clarifications
Gunnar Morling 2018-01-18 11:07:00 +01:00
parent 0b5348339a
commit 6f6ce33958
8 changed files with 99 additions and 91 deletions
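
In short, the task now tracks only the LSN of the last fully processed event and hands that value down the chain when offsets are committed. Below is a minimal, self-contained sketch of that flow, condensed from the hunks that follow; the ReplicationStream and StreamProducer types here are reduced stand-ins for illustration, not the actual Debezium classes.

import java.util.concurrent.atomic.AtomicBoolean;

class CommitFlowSketch {

    // reduced stand-in for the replication stream, illustration only
    interface ReplicationStream {
        void flushLsn(long lsn); // tell the server it may recycle WAL up to this LSN
    }

    // reduced stand-in for the records producer: it now receives just an LSN,
    // not an entire SourceRecord
    static class StreamProducer {
        private final ReplicationStream stream;

        StreamProducer(ReplicationStream stream) {
            this.stream = stream;
        }

        void commit(long lsn) {
            stream.flushLsn(lsn);
        }
    }

    private final AtomicBoolean running = new AtomicBoolean(true);
    private volatile long lastProcessedLsn; // updated from the offsets of polled records
    private final StreamProducer producer =
            new StreamProducer(lsn -> System.out.println("flushing LSN " + lsn + " to the server"));

    // invoked by Kafka Connect once offsets have been committed
    public void commit() {
        if (running.get()) {
            producer.commit(lastProcessedLsn);
        }
    }

    public static void main(String[] args) {
        CommitFlowSketch task = new CommitFlowSketch();
        task.lastProcessedLsn = 42L; // as if poll() had just handed out a record with LSN 42
        task.commit();               // prints: flushing LSN 42 to the server
    }
}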

View File

@ -49,10 +49,7 @@ public class PostgresConnectorTask extends SourceTask {
private RecordsProducer producer;
private Metronome metronome;
private Duration pollInterval;
private volatile SourceRecord lastRecordForRecovery = null;
public PostgresConnectorTask() {
}
private volatile long lastProcessedLsn;
@Override
public void start(Map<String, String> props) {
@ -160,7 +157,7 @@ private void createSnapshotProducer(PostgresTaskContext taskContext, SourceInfo
@Override
public void commit() throws InterruptedException {
if (running.get()) {
producer.commit(lastRecordForRecovery);
producer.commit(lastProcessedLsn);
}
}
@ -194,7 +191,8 @@ public List<SourceRecord> poll() throws InterruptedException {
for (int i = records.size() - 1; i >= 0; i--) {
SourceRecord r = records.get(i);
if (((Map<String, Boolean>)r.sourceOffset()).getOrDefault(SourceInfo.LAST_EVENT_FOR_LSN, Boolean.TRUE)) {
lastRecordForRecovery = r;
Map<String, ?> offset = r.sourceOffset();
lastProcessedLsn = (Long)offset.get(SourceInfo.LSN_KEY);
break;
}
}

View File

@ -7,6 +7,7 @@
package io.debezium.connector.postgresql;
import java.util.function.Consumer;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -17,7 +18,7 @@
/**
* Class which generates Kafka Connect {@link org.apache.kafka.connect.source.SourceRecord} records.
*
*
* @author Horia Chiorean (hchiorea@redhat.com)
*/
public abstract class RecordsProducer {
@ -29,29 +30,29 @@ public abstract class RecordsProducer {
protected RecordsProducer(PostgresTaskContext taskContext, SourceInfo sourceInfo) {
assert taskContext != null;
assert sourceInfo != null;
this.sourceInfo = sourceInfo;
this.taskContext = taskContext;
}
/**
* Starts up this producer. This is normally done by a {@link PostgresConnectorTask} instance. Subclasses should start
* Starts up this producer. This is normally done by a {@link PostgresConnectorTask} instance. Subclasses should start
* enqueuing records via a separate thread at the end of this method.
*
*
* @param recordsConsumer a consumer of {@link SourceRecord} instances, may not be null
*/
protected abstract void start(Consumer<SourceRecord> recordsConsumer);
/**
* Notification that offsets have been committed to Kafka and LSN recorded up to the record
* Notification that offsets have been committed to Kafka up to the given LSN.
*/
protected abstract void commit(SourceRecord lastRecordForRecovery);
protected abstract void commit(long lsn);
/**
* Requests that this producer be stopped. This is normally a request coming from a {@link PostgresConnectorTask} instance
*/
protected abstract void stop();
protected PostgresSchema schema() {
return taskContext.schema();
}
@ -59,7 +60,7 @@ protected PostgresSchema schema() {
protected TopicSelector topicSelector() {
return taskContext.topicSelector();
}
protected Clock clock() {
return taskContext.clock();
}

View File

@ -108,8 +108,8 @@ private void startStreaming(Consumer<SourceRecord> consumer) {
}
@Override
protected void commit(final SourceRecord lastRecordForRecovery) {
streamProducer.ifPresent(x -> x.commit(lastRecordForRecovery));
protected void commit(long lsn) {
streamProducer.ifPresent(x -> x.commit(lsn));
}
@Override

View File

@ -116,7 +116,7 @@ private void streamChanges(Consumer<SourceRecord> consumer) {
while (!Thread.currentThread().isInterrupted()) {
try {
// this will block until a message is available
stream.read(x -> process(x, stream.lastReceivedLSN(), consumer));
stream.read(x -> process(x, stream.lastReceivedLsn(), consumer));
} catch (SQLException e) {
Throwable cause = e.getCause();
if (cause != null && (cause instanceof IOException)) {
@ -136,20 +136,14 @@ private void streamChanges(Consumer<SourceRecord> consumer) {
}
@Override
protected synchronized void commit(final SourceRecord lastRecordForRecovery) {
protected synchronized void commit(long lsn) {
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
try {
ReplicationStream replicationStream = this.replicationStream.get();
if (replicationStream != null) {
// tell the server the point up to which we've processed data, so it can be free to recycle WAL segments
logger.debug("flushing offsets to server...");
if (lastRecordForRecovery != null) {
Map<String, ?> offset = lastRecordForRecovery.sourceOffset();
Long lsn = (Long)offset.get(SourceInfo.LSN_KEY);
if (lsn != null) {
replicationStream.flushLSN();
}
}
replicationStream.flushLsn(lsn);
} else {
logger.debug("streaming has already stopped, ignoring commit callback...");
}

View File

@ -207,25 +207,29 @@ public void close() throws SQLException {
}
@Override
public void flushLSN() throws SQLException {
public void flushLastReceivedLsn() throws SQLException {
if (lastReceivedLSN == null) {
// nothing to flush yet, since we haven't read anything...
return;
}
stream.setFlushedLSN(lastReceivedLSN);
stream.setAppliedLSN(lastReceivedLSN);
doFlushLsn(lastReceivedLSN);
}
@Override
public void flushLsn(long lsn) throws SQLException {
doFlushLsn(LogSequenceNumber.valueOf(lsn));
}
private void doFlushLsn(LogSequenceNumber lsn) throws SQLException {
stream.setFlushedLSN(lsn);
stream.setAppliedLSN(lsn);
stream.forceUpdateStatus();
}
@Override
public void flushLSN(Long lsn) throws SQLException {
stream.setFlushedLSN(LogSequenceNumber.valueOf(lsn));
stream.setAppliedLSN(LogSequenceNumber.valueOf(lsn));
stream.forceUpdateStatus();
}
@Override
public Long lastReceivedLSN() {
public Long lastReceivedLsn() {
return lastReceivedLSN != null ? lastReceivedLSN.asLong() : null;
}

View File

@ -12,7 +12,7 @@
/**
* A stream from which messages sent by a logical decoding plugin can be consumed over a replication connection.
*
*
* @author Horia Chiorean (hchiorea@redhat.com)
*/
public interface ReplicationStream extends AutoCloseable {
@ -22,23 +22,23 @@ public interface ReplicationMessageProcessor {
}
/**
* Blocks and waits for a replication message to be sent over a replication connection. Once a message has been received,
* the value of the {@link #lastReceivedLSN() last received LSN} will also be updated accordingly.
* Blocks and waits for a replication message to be sent over a replication connection. Once a message has been received,
* the value of the {@link #lastReceivedLsn() last received LSN} will also be updated accordingly.
*
* @param processor - a callback to which the arrived message is passed
* @throws SQLException if anything unexpected fails
* @see PGReplicationStream#read()
* @see PGReplicationStream#read()
*/
void read(ReplicationMessageProcessor processor) throws SQLException;
/**
* Attempts to read a replication message from a replication connection, returning that message if it's available or returning
* {@code null} if nothing is available. Once a message has been received, the value of the {@link #lastReceivedLSN() last received LSN}
* {@code null} if nothing is available. Once a message has been received, the value of the {@link #lastReceivedLsn() last received LSN}
* will also be updated accordingly.
*
* @param processor - a callback to which the arrived message is passed
* @throws SQLException if anything unexpected fails
* @see PGReplicationStream#readPending()
* @see PGReplicationStream#readPending()
*/
void readPending(ReplicationMessageProcessor processor) throws SQLException;
@ -46,6 +46,10 @@ public interface ReplicationMessageProcessor {
* Sends a message to the server informing it about the latest position in the WAL that this stream has read via
* {@link ReplicationConnection#startStreaming()} or {@link ReplicationConnection#startStreaming(Long)}.
* <p>
* Due to the internal buffering, the messages sent to Kafka (and thus the committed offsets) will usually lag behind the
* latest received LSN, which is why {@link #flushLsn(long)} should typically be called instead, so that no LSN is flushed
* whose corresponding records haven't been committed to Kafka yet.
* <p>
* This essentially tells the server that this stream has successfully processed messages up to the current read cursor
* and so the server is free to discard older segments with earlier LSNs. It also affects the catch-up behavior once a slot
* is restarted and the server attempts to bring it up-to-date.
@ -53,23 +57,29 @@ public interface ReplicationMessageProcessor {
*
* @throws SQLException if anything goes wrong
*/
void flushLSN() throws SQLException;
void flushLastReceivedLsn() throws SQLException;
void flushLSN(Long lsn) throws SQLException;
/**
* Sends a message to the server informing it about the latest processed LSN.
*
* @throws SQLException if anything goes wrong
*/
void flushLsn(long lsn) throws SQLException;
/**
* Returns the value for the latest server received LSN during a read operation. The value is always updated once messages
* are read via the {@link ReplicationConnection#startStreaming()} or {@link ReplicationConnection#startStreaming(Long)}
* are read via the {@link ReplicationConnection#startStreaming()} or {@link ReplicationConnection#startStreaming(Long)}
* methods.
*
* @return a {@link Long} value, possibly null if this is called before anything has been read
*/
Long lastReceivedLSN();
Long lastReceivedLsn();
/**
* //TODO author=Horia Chiorean date=13/10/2016 description=Don't use this for now, because of the bug from the PG server
* At the moment this stream is closed once the replication connection which created it is closed.
* @see PGReplicationStream#close()
* @see PGReplicationStream#close()
*/
@Override
void close() throws Exception;
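
To make the distinction drawn in the JavaDoc above concrete, here is a hypothetical in-memory illustration of the two flush variants; the InMemoryStream class exists only for this sketch and is not the JDBC-backed implementation.

class FlushContractSketch {

    static class InMemoryStream {
        private Long lastReceivedLsn;  // advanced by every read
        private long flushedLsn;       // what the server would be told it may discard

        void read(long lsn) { lastReceivedLsn = lsn; }

        // acknowledges everything read so far - may run ahead of the Kafka offsets
        void flushLastReceivedLsn() {
            if (lastReceivedLsn == null) {
                return; // nothing read yet, nothing to flush
            }
            flushedLsn = lastReceivedLsn;
        }

        // acknowledges only up to an explicitly processed LSN
        void flushLsn(long lsn) { flushedLsn = lsn; }
    }

    public static void main(String[] args) {
        InMemoryStream stream = new InMemoryStream();
        stream.read(10); // received and already committed to Kafka
        stream.read(20); // received, but still buffered on the Kafka side

        stream.flushLsn(10);
        System.out.println("after flushLsn(10): " + stream.flushedLsn);           // 10

        stream.flushLastReceivedLsn();
        System.out.println("after flushLastReceivedLsn(): " + stream.flushedLsn); // 20
    }
}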

View File

@ -9,6 +9,7 @@
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
@ -18,6 +19,7 @@
import io.debezium.connector.postgresql.connection.MessageDecoder;
import io.debezium.connector.postgresql.connection.ReplicationStream.ReplicationMessageProcessor;
import io.debezium.document.Array;
import io.debezium.document.Array.Entry;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.document.Value;
@ -51,13 +53,11 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
final String timestamp = message.getString("timestamp");
final long commitTime = dateTime.systemTimestamp(timestamp);
final Array changes = message.getArray("change");
if (changes.size() > 0) {
for (int i = 0; i < changes.size() - 2; i++) {
final Value v = changes.get(i);
processor.process(new Wal2JsonReplicationMessage(txId, commitTime, v.asDocument(), containsMetadata, false));
}
final Value v = changes.get(changes.size() - 1);
processor.process(new Wal2JsonReplicationMessage(txId, commitTime, v.asDocument(), containsMetadata, true));
Iterator<Entry> it = changes.iterator();
while (it.hasNext()) {
Value value = it.next().getValue();
processor.process(new Wal2JsonReplicationMessage(txId, commitTime, value.asDocument(), containsMetadata, !it.hasNext()));
}
} catch (final IOException e) {
throw new ConnectException(e);
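
The rewritten loop above flags the final change of a batch via the iterator's hasNext(), which also behaves correctly for a batch of exactly one element. A standalone sketch of that pattern follows; the string lists stand in for the decoded "change" array and this is not the decoder itself.

import java.util.Iterator;
import java.util.List;

class LastChangeFlagSketch {
    public static void main(String[] args) {
        // a single-element batch and a multi-element batch
        List<List<String>> batches = List.of(List.of("only-change"), List.of("c1", "c2", "c3"));
        for (List<String> batch : batches) {
            Iterator<String> it = batch.iterator();
            while (it.hasNext()) {
                String change = it.next();
                boolean lastEventForLsn = !it.hasNext(); // also true for the single-element batch
                System.out.println(change + " lastEventForLsn=" + lastEventForLsn);
            }
        }
    }
}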

View File

@ -20,6 +20,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
@ -30,36 +31,36 @@
/**
* Integration test for {@link ReplicationConnection}
*
*
* @author Horia Chiorean (hchiorea@redhat.com)
*/
public class ReplicationConnectionIT {
@Before
public void before() throws Exception {
TestHelper.dropAllSchemas();
String statement = "CREATE SCHEMA public;" +
String statement = "CREATE SCHEMA public;" +
"CREATE TABLE table_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));" +
"CREATE TABLE table_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);";
TestHelper.execute(statement);
}
}
@Test
public void shouldCreateAndDropReplicationSlots() throws Exception {
// create a replication connection which should be dropped once it's closed
try (ReplicationConnection connection = TestHelper.createForReplication("test1", true)) {
ReplicationStream stream = connection.startStreaming();
assertNull(stream.lastReceivedLSN());
ReplicationStream stream = connection.startStreaming();
assertNull(stream.lastReceivedLsn());
stream.close();
}
}
// create a replication connection which should be dropped once it's closed
try (ReplicationConnection connection = TestHelper.createForReplication("test2", true)) {
ReplicationStream stream = connection.startStreaming();
assertNull(stream.lastReceivedLSN());
assertNull(stream.lastReceivedLsn());
stream.close();
}
}
@Test(expected = IllegalStateException.class)
public void shouldNotAllowMultipleReplicationSlotsOnTheSameDBSlotAndPlugin() throws Exception {
// create a replication connection which should be dropped once it's closed
@ -71,7 +72,7 @@ public void shouldNotAllowMultipleReplicationSlotsOnTheSameDBSlotAndPlugin() thr
}
}
}
@Test
public void shouldReceiveAndDecodeIndividualChanges() throws Exception {
// create a replication connection which should be dropped once it's closed
@ -81,13 +82,13 @@ public void shouldReceiveAndDecodeIndividualChanges() throws Exception {
expectedMessagesFromStream(stream, expectedMessages);
}
}
@Test
public void shouldReceiveSameChangesIfNotFlushed() throws Exception {
// don't drop the replication slot once this is finished
String slotName = "test";
int receivedMessagesCount = startInsertStop(slotName, null);
// create a new replication connection with the same slot and check that without the LSN having been flushed,
// we'll get back the same message again from before
try (ReplicationConnection connection = TestHelper.createForReplication(slotName, true)) {
@ -95,15 +96,15 @@ public void shouldReceiveSameChangesIfNotFlushed() throws Exception {
expectedMessagesFromStream(stream, receivedMessagesCount);
}
}
@Test
public void shouldNotReceiveSameChangesIfFlushed() throws Exception {
// don't drop the replication slot once this is finished
String slotName = "test";
startInsertStop(slotName, this::flushLSN);
// create a new replication connection with the same slot and check that we don't get back the same changes that we've
// create a new replication connection with the same slot and check that we don't get back the same changes that we've
// flushed
try (ReplicationConnection connection = TestHelper.createForReplication(slotName, true)) {
ReplicationStream stream = connection.startStreaming();
@ -111,43 +112,43 @@ public void shouldNotReceiveSameChangesIfFlushed() throws Exception {
expectedMessagesFromStream(stream, 0);
}
}
@Test
public void shouldReceiveMissedChangesWhileDown() throws Exception {
String slotName = "test";
startInsertStop(slotName, this::flushLSN);
// run some more SQL while the slot is stopped
// run some more SQL while the slot is stopped
// this deletes 2 entries so each of them will have a message
TestHelper.execute("DELETE FROM table_with_pk WHERE a < 3;");
int additionalMessages = 2;
// create a new replication connection with the same slot and check that we get the additional messages
try (ReplicationConnection connection = TestHelper.createForReplication(slotName, true)) {
ReplicationStream stream = connection.startStreaming();
expectedMessagesFromStream(stream, additionalMessages);
}
}
@Test
public void shouldResumeFromLastReceivedLSN() throws Exception {
String slotName = "test";
AtomicLong lastReceivedLSN = new AtomicLong(0);
startInsertStop(slotName, stream -> lastReceivedLSN.compareAndSet(0, stream.lastReceivedLSN()));
startInsertStop(slotName, stream -> lastReceivedLSN.compareAndSet(0, stream.lastReceivedLsn()));
assertTrue(lastReceivedLSN.get() > 0);
// resume replication from the last received LSN and don't expect anything else
try (ReplicationConnection connection = TestHelper.createForReplication(slotName, true)) {
ReplicationStream stream = connection.startStreaming(lastReceivedLSN.get());
expectedMessagesFromStream(stream, 0);
}
}
}
@Test
public void shouldTolerateInvalidLSNValues() throws Exception {
String slotName = "test";
startInsertStop(slotName, null);
// resume replication from the last received LSN and don't expect anything else
try (ReplicationConnection connection = TestHelper.createForReplication(slotName, true)) {
ReplicationStream stream = connection.startStreaming(Long.MAX_VALUE);
@ -158,7 +159,7 @@ public void shouldTolerateInvalidLSNValues() throws Exception {
expectedMessagesFromStream(stream, 0);
}
}
@Test
public void shouldReceiveOneMessagePerDMLOnTransactionCommit() throws Exception {
try (ReplicationConnection connection = TestHelper.createForReplication("test", true)) {
@ -173,7 +174,7 @@ public void shouldReceiveOneMessagePerDMLOnTransactionCommit() throws Exception
expectedMessagesFromStream(stream, 2);
}
}
@Test
public void shouldNotReceiveMessagesOnTransactionRollback() throws Exception {
try (ReplicationConnection connection = TestHelper.createForReplication("test", true)) {
@ -186,7 +187,7 @@ public void shouldNotReceiveMessagesOnTransactionRollback() throws Exception {
expectedMessagesFromStream(stream, 0);
}
}
@Test
public void shouldGeneratesEventsForMultipleSchemas() throws Exception {
try (ReplicationConnection connection = TestHelper.createForReplication("test", true)) {
@ -201,12 +202,12 @@ public void shouldGeneratesEventsForMultipleSchemas() throws Exception {
"INSERT INTO schema2.table (b, c) VALUES('Value for schema2', now());";
TestHelper.execute(statements);
expectedMessagesFromStream(stream, 2);
}
}
}
private void flushLSN(ReplicationStream stream) {
try {
stream.flushLSN();
stream.flushLastReceivedLsn();
} catch (SQLException e) {
throw new RuntimeException(e);
}
@ -224,7 +225,7 @@ private int startInsertStop(String slotName, Consumer<ReplicationStream> streamP
streamProcessor.accept(stream);
}
} catch (Throwable t) {
// make sure we always drop the slot if something fails - note the connection was created with the drop on close
// make sure we always drop the slot if something fails - note the connection was created with the drop on close
// set to false
try (PostgresConnection conn = TestHelper.create()) {
conn.dropReplicationSlot(slotName);
@ -236,7 +237,7 @@ private int startInsertStop(String slotName, Consumer<ReplicationStream> streamP
Thread.sleep(100);
return expectedMessageCount;
}
private List<ReplicationMessage> expectedMessagesFromStream(ReplicationStream stream,
int expectedMessages) throws Exception {
List<ReplicationMessage> actualMessages = new ArrayList<>();
@ -270,7 +271,7 @@ private List<ReplicationMessage> expectedMessagesFromStream(ReplicationStream st
}
return actualMessages;
}
private int insertSmallTestData() throws Exception {
String statement = "INSERT INTO table_with_pk (b, c) VALUES('Backup and Restore', now());" +
"INSERT INTO table_with_pk (b, c) VALUES('Tuning', now());";
@ -278,7 +279,7 @@ private int insertSmallTestData() throws Exception {
// we expect 2 messages from the above
return 2;
}
private int insertLargeTestData() throws Exception {
String statement = "INSERT INTO table_with_pk (b, c) VALUES('Backup and Restore', now());" +
"INSERT INTO table_with_pk (b, c) VALUES('Tuning', now());" +
@ -288,9 +289,9 @@ private int insertLargeTestData() throws Exception {
"ALTER TABLE table_without_pk REPLICA IDENTITY FULL;" +
"UPDATE table_without_pk SET c = 'Baz' WHERE c = 'Bar';" +
"DELETE FROM table_without_pk WHERE c = 'Baz';";
// Postgres WILL NOT fire any tuple changes (UPDATES or DELETES) for tables which don't have a PK by default EXCEPT
// if that table has a REPLICA IDENTITY of FULL or INDEX.
// if that table has a REPLICA IDENTITY of FULL or INDEX.
// See http://michael.otacoo.com/postgresql-2/postgres-9-4-feature-highlight-replica-identity-logical-replication/
// ...so we expect 8 messages for the above DML
TestHelper.execute(statement);