From 6f6ce339580855cd87d2d613d9b2c0218dd49445 Mon Sep 17 00:00:00 2001
From: Gunnar Morling
Date: Thu, 18 Jan 2018 11:07:00 +0100
Subject: [PATCH] DBZ-521 Misc. improvements and fixes;

* Actually assigning last processed LSN to commit()
* Passing LSN to commit() instead of entire SourceRecord
* Avoiding exception in case of wal2json batch with exactly one element
* Using "Lsn" instead of "LSN" in method names
* JavaDoc clarifications
---
 .../postgresql/PostgresConnectorTask.java     | 10 +--
 .../connector/postgresql/RecordsProducer.java | 17 +++--
 .../postgresql/RecordsSnapshotProducer.java   |  4 +-
 .../postgresql/RecordsStreamProducer.java     | 12 +--
 .../PostgresReplicationConnection.java        | 26 ++++---
 .../connection/ReplicationStream.java         | 32 +++++---
 .../wal2json/Wal2JsonMessageDecoder.java      | 14 ++--
 .../connection/ReplicationConnectionIT.java   | 75 ++++++++++---------
 8 files changed, 99 insertions(+), 91 deletions(-)

diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java
index 6328d9258..09a25106c 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java
@@ -49,10 +49,7 @@ public class PostgresConnectorTask extends SourceTask {
     private RecordsProducer producer;
     private Metronome metronome;
     private Duration pollInterval;
-    private volatile SourceRecord lastRecordForRecovery = null;
-
-    public PostgresConnectorTask() {
-    }
+    private volatile long lastProcessedLsn;
 
     @Override
     public void start(Map<String, String> props) {
@@ -160,7 +157,7 @@ private void createSnapshotProducer(PostgresTaskContext taskContext, SourceInfo
     @Override
     public void commit() throws InterruptedException {
         if (running.get()) {
-            producer.commit(lastRecordForRecovery);
+            producer.commit(lastProcessedLsn);
         }
     }
 
@@ -194,7 +191,8 @@ public List<SourceRecord> poll() throws InterruptedException {
             for (int i = records.size() - 1; i >= 0; i--) {
                 SourceRecord r = records.get(i);
                 if (((Map<String, Boolean>) r.sourceOffset()).getOrDefault(SourceInfo.LAST_EVENT_FOR_LSN, Boolean.TRUE)) {
-                    lastRecordForRecovery = r;
+                    Map<String, ?> offset = r.sourceOffset();
+                    lastProcessedLsn = (Long) offset.get(SourceInfo.LSN_KEY);
                     break;
                 }
             }
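The two hunks above are easier to follow outside the surrounding class. A minimal, self-contained sketch of the pattern (class, method, and offset-key names are illustrative stand-ins, not the connector's actual API): poll() remembers the LSN of the newest record that completes an event, and commit(), which Kafka Connect only invokes after the offsets of the polled records have been flushed, forwards that LSN downstream.

    import java.util.List;

    import org.apache.kafka.connect.source.SourceRecord;

    abstract class LsnTrackingTaskSketch {

        private volatile long lastProcessedLsn;

        // called from poll() with the batch about to be returned
        protected void trackLastProcessedLsn(List<SourceRecord> records) {
            for (int i = records.size() - 1; i >= 0; i--) {
                Object lsn = records.get(i).sourceOffset().get("lsn"); // stand-in for SourceInfo.LSN_KEY
                if (lsn != null) {
                    lastProcessedLsn = (Long) lsn;
                    break;
                }
            }
        }

        // SourceTask.commit(): the offsets are safely in Kafka at this point
        public void commit() throws InterruptedException {
            acknowledge(lastProcessedLsn);
        }

        protected abstract void acknowledge(long lsn) throws InterruptedException;
    }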
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsProducer.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsProducer.java
index ef64cf56f..d9c7af2bd 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsProducer.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsProducer.java
@@ -7,6 +7,7 @@
 package io.debezium.connector.postgresql;
 
 import java.util.function.Consumer;
+
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -17,7 +18,7 @@
 
 /**
  * Class which generates Kafka Connect {@link org.apache.kafka.connect.source.SourceRecord} records.
- *
+ *
  * @author Horia Chiorean (hchiorea@redhat.com)
  */
 public abstract class RecordsProducer {
@@ -29,29 +30,29 @@ public abstract class RecordsProducer {
     protected RecordsProducer(PostgresTaskContext taskContext, SourceInfo sourceInfo) {
         assert taskContext != null;
         assert sourceInfo != null;
-
+
         this.sourceInfo = sourceInfo;
         this.taskContext = taskContext;
     }
 
     /**
-     * Starts up this producer. This is normally done by a {@link PostgresConnectorTask} instance. Subclasses should start
+     * Starts up this producer. This is normally done by a {@link PostgresConnectorTask} instance. Subclasses should start
      * enqueuing records via a separate thread at the end of this method.
-     *
+     *
      * @param recordsConsumer a consumer of {@link SourceRecord} instances, may not be null
      */
     protected abstract void start(Consumer<SourceRecord> recordsConsumer);
 
     /**
-     * Notification that offsets have been committed to Kafka and LSN recorded up to the record
+     * Notification that offsets have been committed to Kafka up to the given LSN.
      */
-    protected abstract void commit(SourceRecord lastRecordForRecovery);
+    protected abstract void commit(long lsn);
 
     /**
      * Requests that this producer be stopped. This is normally a request coming from a {@link PostgresConnectorTask} instance
     */
     protected abstract void stop();
-
+
     protected PostgresSchema schema() {
         return taskContext.schema();
     }
@@ -59,7 +60,7 @@ protected PostgresSchema schema() {
     protected TopicSelector topicSelector() {
         return taskContext.topicSelector();
     }
-
+
     protected Clock clock() {
         return taskContext.clock();
     }
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java
index d98247551..b21e40712 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java
@@ -108,8 +108,8 @@ private void startStreaming(Consumer<SourceRecord> consumer) {
     }
 
     @Override
-    protected void commit(final SourceRecord lastRecordForRecovery) {
-        streamProducer.ifPresent(x -> x.commit(lastRecordForRecovery));
+    protected void commit(long lsn) {
+        streamProducer.ifPresent(x -> x.commit(lsn));
     }
 
     @Override
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java
index 26db43ff3..d6689a62c 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java
@@ -116,7 +116,7 @@ private void streamChanges(Consumer<SourceRecord> consumer) {
         while (!Thread.currentThread().isInterrupted()) {
             try {
                 // this will block until a message is available
-                stream.read(x -> process(x, stream.lastReceivedLSN(), consumer));
+                stream.read(x -> process(x, stream.lastReceivedLsn(), consumer));
             } catch (SQLException e) {
                 Throwable cause = e.getCause();
                 if (cause != null && (cause instanceof IOException)) {
@@ -136,20 +136,14 @@ private void streamChanges(Consumer<SourceRecord> consumer) {
     }
 
     @Override
-    protected synchronized void commit(final SourceRecord lastRecordForRecovery) {
+    protected synchronized void commit(long lsn) {
         LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
         try {
             ReplicationStream replicationStream = this.replicationStream.get();
             if (replicationStream != null) {
                 // tell the server the point up to which we've processed data, so it can be free to recycle WAL segments
                 logger.debug("flushing offsets to server...");
-                if (lastRecordForRecovery != null) {
-                    Map<String, ?> offset = lastRecordForRecovery.sourceOffset();
-                    Long lsn = (Long) offset.get(SourceInfo.LSN_KEY);
-                    if (lsn != null) {
-                        replicationStream.flushLSN();
-                    }
-                }
+                replicationStream.flushLsn(lsn);
             } else {
                 logger.debug("streaming has already stopped, ignoring commit callback...");
             }
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
index 1e334a6b9..f47d44b88 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
@@ -207,25 +207,29 @@ public void close() throws SQLException {
     }
 
     @Override
-    public void flushLSN() throws SQLException {
+    public void flushLastReceivedLsn() throws SQLException {
         if (lastReceivedLSN == null) {
             // nothing to flush yet, since we haven't read anything...
             return;
         }
-        stream.setFlushedLSN(lastReceivedLSN);
-        stream.setAppliedLSN(lastReceivedLSN);
+
+        doFlushLsn(lastReceivedLSN);
+    }
+
+    @Override
+    public void flushLsn(long lsn) throws SQLException {
+        doFlushLsn(LogSequenceNumber.valueOf(lsn));
+    }
+
+    private void doFlushLsn(LogSequenceNumber lsn) throws SQLException {
+        stream.setFlushedLSN(lsn);
+        stream.setAppliedLSN(lsn);
+
         stream.forceUpdateStatus();
     }
 
     @Override
-    public void flushLSN(Long lsn) throws SQLException {
-        stream.setFlushedLSN(LogSequenceNumber.valueOf(lsn));
-        stream.setAppliedLSN(LogSequenceNumber.valueOf(lsn));
-        stream.forceUpdateStatus();
-    }
-
-    @Override
-    public Long lastReceivedLSN() {
+    public Long lastReceivedLsn() {
         return lastReceivedLSN != null ? lastReceivedLSN.asLong() : null;
     }
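Under the hood, the doFlushLsn() helper introduced above is a thin wrapper over the PostgreSQL JDBC driver's replication API. A rough standalone sketch of just that interaction (assuming an already-open stream; the wrapper class is hypothetical):

    import java.sql.SQLException;

    import org.postgresql.replication.LogSequenceNumber;
    import org.postgresql.replication.PGReplicationStream;

    final class LsnFlushSketch {

        // Report the LSN back to the server as both flushed and applied,
        // allowing it to recycle WAL segments up to that position.
        static void flush(PGReplicationStream stream, long lsn) throws SQLException {
            LogSequenceNumber confirmed = LogSequenceNumber.valueOf(lsn);
            stream.setFlushedLSN(confirmed);
            stream.setAppliedLSN(confirmed);
            stream.forceUpdateStatus(); // push the standby status update now, not on the next keep-alive
        }

        private LsnFlushSketch() {
        }
    }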
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationStream.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationStream.java
index c56cfad8c..eb839575c 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationStream.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationStream.java
@@ -12,7 +12,7 @@
 
 /**
  * A stream from which messages sent by a logical decoding plugin can be consumed over a replication connection.
- *
+ *
  * @author Horia Chiorean (hchiorea@redhat.com)
 */
 public interface ReplicationStream extends AutoCloseable {
@@ -22,23 +22,23 @@ public interface ReplicationMessageProcessor {
     }
 
     /**
-     * Blocks and waits for a replication message to be sent over a replication connection. Once a message has been received,
-     * the value of the {@link #lastReceivedLSN() last received LSN} will also be updated accordingly.
+     * Blocks and waits for a replication message to be sent over a replication connection. Once a message has been received,
+     * the value of the {@link #lastReceivedLsn() last received LSN} will also be updated accordingly.
      *
      * @param processor - a callback to which the arrived message is passed
      * @throws SQLException if anything unexpected fails
-     * @see PGReplicationStream#read()
+     * @see PGReplicationStream#read()
      */
     void read(ReplicationMessageProcessor processor) throws SQLException;
 
     /**
      * Attempts to read a replication message from a replication connection, returning that message if it's available or returning
-     * {@code null} if nothing is available. Once a message has been received, the value of the {@link #lastReceivedLSN() last received LSN}
+     * {@code null} if nothing is available. Once a message has been received, the value of the {@link #lastReceivedLsn() last received LSN}
      * will also be updated accordingly.
      *
      * @param processor - a callback to which the arrived message is passed
     * @throws SQLException if anything unexpected fails
-     * @see PGReplicationStream#readPending()
+     * @see PGReplicationStream#readPending()
      */
     void readPending(ReplicationMessageProcessor processor) throws SQLException;
@@ -46,6 +46,10 @@ public interface ReplicationMessageProcessor {
      * Sends a message to the server informing it about that latest position in the WAL that this stream has read via
      * {@link ReplicationConnection#startStreaming()} or {@link ReplicationConnection#startStreaming(Long)}.
      * <p>
+     * Due to the internal buffering, the messages sent to Kafka (and thus the committed offsets) will usually lag behind the
+     * latest received LSN, which is why {@link #flushLsn(long)} should typically be called instead, so as to avoid acknowledging
+     * an LSN which hasn't been committed to Kafka yet.
+     * <p>
      * This essentially tells the server that this stream has successfully processed messages up to the current read cursor
      * and so the server is free to discard older segments with earlier LSNs. It also affects the catch-up behavior once a slot
      * is restarted and the server attempts to bring it up-to-date.
@@ -53,23 +57,29 @@ public interface ReplicationMessageProcessor {
      *
      * @throws SQLException if anything goes wrong
      */
-    void flushLSN() throws SQLException;
+    void flushLastReceivedLsn() throws SQLException;
 
-    void flushLSN(Long lsn) throws SQLException;
+    /**
+     * Sends a message to the server informing it about the latest processed LSN.
+     *
+     * @param lsn the LSN up to which all messages have been processed
+     * @throws SQLException if anything goes wrong
+     */
+    void flushLsn(long lsn) throws SQLException;
 
     /**
      * Returns the value for the latest server received LSN during a read operation. The value is always updated once messages
-     * are read via the {@link ReplicationConnection#startStreaming()} or {@link ReplicationConnection#startStreaming(Long)}
+     * are read via the {@link ReplicationConnection#startStreaming()} or {@link ReplicationConnection#startStreaming(Long)}
      * methods.
      *
      * @return a {@link Long} value, possibly null if this is called before anything has been read
      */
-    Long lastReceivedLSN();
+    Long lastReceivedLsn();
 
     /**
      * //TODO author=Horia Chiorean date=13/10/2016 description=Don't use this for now, because of the bug from the PG server
      * This stream is closed atm. once the replication connection which created it is closed.
-     * @see PGReplicationStream#close()
+     * @see PGReplicationStream#close()
      */
     @Override
     void close() throws Exception;
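To make the buffering caveat in the javadoc above concrete, consider a hypothetical moment where the stream has already received WAL up to LSN 5000 while Kafka has only committed records up to LSN 3000 (the numbers and the calling class are invented for illustration):

    import java.sql.SQLException;

    import io.debezium.connector.postgresql.connection.ReplicationStream;

    class FlushChoiceSketch {

        // Called once Kafka Connect reports that offsets have been committed.
        void onOffsetsCommitted(ReplicationStream stream, long lastCommittedLsn) throws SQLException {
            // flushLastReceivedLsn() would acknowledge 5000 here; after a crash
            // the server could then discard the WAL holding the events between
            // 3000 and 5000 that never reached Kafka. Acknowledging only what
            // Kafka has durably stored keeps those events replayable:
            stream.flushLsn(lastCommittedLsn); // e.g. 3000, not 5000
        }
    }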
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonMessageDecoder.java
index 1775ae053..5546082cd 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonMessageDecoder.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonMessageDecoder.java
@@ -9,6 +9,7 @@
 import java.nio.ByteBuffer;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.Iterator;
 
 import org.apache.kafka.connect.errors.ConnectException;
 import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
@@ -18,6 +19,7 @@
 import io.debezium.connector.postgresql.connection.MessageDecoder;
 import io.debezium.connector.postgresql.connection.ReplicationStream.ReplicationMessageProcessor;
 import io.debezium.document.Array;
+import io.debezium.document.Array.Entry;
 import io.debezium.document.Document;
 import io.debezium.document.DocumentReader;
 import io.debezium.document.Value;
@@ -51,13 +53,11 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
             final String timestamp = message.getString("timestamp");
             final long commitTime = dateTime.systemTimestamp(timestamp);
             final Array changes = message.getArray("change");
-            if (changes.size() > 0) {
-                for (int i = 0; i < changes.size() - 2; i++) {
-                    final Value v = changes.get(i);
-                    processor.process(new Wal2JsonReplicationMessage(txId, commitTime, v.asDocument(), containsMetadata, false));
-                }
-                final Value v = changes.get(changes.size() - 1);
-                processor.process(new Wal2JsonReplicationMessage(txId, commitTime, v.asDocument(), containsMetadata, true));
+
+            Iterator<Entry> it = changes.iterator();
+            while (it.hasNext()) {
+                Value value = it.next().getValue();
+                processor.process(new Wal2JsonReplicationMessage(txId, commitTime, value.asDocument(), containsMetadata, !it.hasNext()));
             }
         } catch (final IOException e) {
             throw new ConnectException(e);
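The removed loop iterated only to changes.size() - 2 and emitted the final element separately; per the commit message this broke down for wal2json batches with exactly one element. The iterator-based replacement marks the last element uniformly for any batch size. The same pattern in isolation, as a self-contained sketch:

    import java.util.Iterator;
    import java.util.List;
    import java.util.function.BiConsumer;

    final class LastElementSketch {

        // Pass each element to the processor, with the flag true exactly once:
        // for the final element. Correct for any size, including a single element.
        static <T> void forEachMarkingLast(List<T> batch, BiConsumer<T, Boolean> processor) {
            Iterator<T> it = batch.iterator();
            while (it.hasNext()) {
                T element = it.next();
                processor.accept(element, !it.hasNext());
            }
        }

        private LastElementSketch() {
        }
    }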
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/ReplicationConnectionIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/ReplicationConnectionIT.java
index 0ee127e1a..38cc522e2 100644
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/ReplicationConnectionIT.java
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/ReplicationConnectionIT.java
@@ -20,6 +20,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
+
 import org.junit.Before;
 import org.junit.Test;
 
@@ -30,36 +31,36 @@
 /**
  * Integration test for {@link ReplicationConnection}
- *
+ *
  * @author Horia Chiorean (hchiorea@redhat.com)
 */
 public class ReplicationConnectionIT {
-
+
     @Before
     public void before() throws Exception {
         TestHelper.dropAllSchemas();
-        String statement = "CREATE SCHEMA public;" +
+        String statement = "CREATE SCHEMA public;" +
                            "CREATE TABLE table_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));" +
                            "CREATE TABLE table_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);";
         TestHelper.execute(statement);
-    }
-
+    }
+
     @Test
     public void shouldCreateAndDropReplicationSlots() throws Exception {
         // create a replication connection which should be dropped once it's closed
         try (ReplicationConnection connection = TestHelper.createForReplication("test1", true)) {
-            ReplicationStream stream = connection.startStreaming();
-            assertNull(stream.lastReceivedLSN());
+            ReplicationStream stream = connection.startStreaming();
+            assertNull(stream.lastReceivedLsn());
             stream.close();
-        }
+        }
         // create a replication connection which should be dropped once it's closed
         try (ReplicationConnection connection = TestHelper.createForReplication("test2", true)) {
             ReplicationStream stream = connection.startStreaming();
-            assertNull(stream.lastReceivedLSN());
+            assertNull(stream.lastReceivedLsn());
             stream.close();
         }
     }
-
+
     @Test(expected = IllegalStateException.class)
     public void shouldNotAllowMultipleReplicationSlotsOnTheSameDBSlotAndPlugin() throws Exception {
         // create a replication connection which should be dropped once it's closed
@@ -71,7 +72,7 @@ public void shouldNotAllowMultipleReplicationSlotsOnTheSameDBSlotAndPlugin() thr
             }
         }
     }
-
+
     @Test
     public void shouldReceiveAndDecodeIndividualChanges() throws Exception {
         // create a replication connection which should be dropped once it's closed
@@ -81,13 +82,13 @@ public void shouldReceiveAndDecodeIndividualChanges() throws Exception {
             expectedMessagesFromStream(stream, expectedMessages);
         }
     }
-
+
     @Test
     public void shouldReceiveSameChangesIfNotFlushed() throws Exception {
         // don't drop the replication slot once this is finished
         String slotName = "test";
         int receivedMessagesCount = startInsertStop(slotName, null);
-
+
         // create a new replication connection with the same slot and check that without the LSN having been flushed,
         // we'll get back the same message again from before
         try (ReplicationConnection connection = TestHelper.createForReplication(slotName, true)) {
@@ -95,15 +96,15 @@ public void shouldReceiveSameChangesIfNotFlushed() throws Exception {
             expectedMessagesFromStream(stream, receivedMessagesCount);
         }
     }
-
-
+
+
     @Test
     public void shouldNotReceiveSameChangesIfFlushed() throws Exception {
         // don't drop the replication slot once this is finished
         String slotName = "test";
         startInsertStop(slotName, this::flushLSN);
 
-        // create a new replication connection with the same slot and check that we don't get back the same changes that we've
+        // create a new replication connection with the same slot and check that we don't get back the same changes that we've
         // flushed
         try (ReplicationConnection connection = TestHelper.createForReplication(slotName, true)) {
             ReplicationStream stream = connection.startStreaming();
@@ -111,43 +112,43 @@ public void shouldNotReceiveSameChangesIfFlushed() throws Exception {
             expectedMessagesFromStream(stream, 0);
         }
     }
-
+
     @Test
     public void shouldReceiveMissedChangesWhileDown() throws Exception {
         String slotName = "test";
         startInsertStop(slotName, this::flushLSN);
 
-        // run some more SQL while the slot is stopped
+        // run some more SQL while the slot is stopped
         // this deletes 2 entries so each of them will have a message
         TestHelper.execute("DELETE FROM table_with_pk WHERE a < 3;");
         int additionalMessages = 2;
-
+
         // create a new replication connection with the same slot and check that we get the additional messages
         try (ReplicationConnection connection = TestHelper.createForReplication(slotName, true)) {
             ReplicationStream stream = connection.startStreaming();
             expectedMessagesFromStream(stream, additionalMessages);
         }
     }
-
+
     @Test
     public void shouldResumeFromLastReceivedLSN() throws Exception {
         String slotName = "test";
         AtomicLong lastReceivedLSN = new AtomicLong(0);
-        startInsertStop(slotName, stream -> lastReceivedLSN.compareAndSet(0, stream.lastReceivedLSN()));
+        startInsertStop(slotName, stream -> lastReceivedLSN.compareAndSet(0, stream.lastReceivedLsn()));
         assertTrue(lastReceivedLSN.get() > 0);
-
+
         // resume replication from the last received LSN and don't expect anything else
         try (ReplicationConnection connection = TestHelper.createForReplication(slotName, true)) {
             ReplicationStream stream = connection.startStreaming(lastReceivedLSN.get());
             expectedMessagesFromStream(stream, 0);
         }
-    }
-
+    }
+
     @Test
     public void shouldTolerateInvalidLSNValues() throws Exception {
         String slotName = "test";
         startInsertStop(slotName, null);
-
+
         // resume replication from the last received LSN and don't expect anything else
         try (ReplicationConnection connection = TestHelper.createForReplication(slotName, true)) {
             ReplicationStream stream = connection.startStreaming(Long.MAX_VALUE);
@@ -158,7 +159,7 @@ public void shouldTolerateInvalidLSNValues() throws Exception {
             expectedMessagesFromStream(stream, 0);
         }
     }
-
+
     @Test
     public void shouldReceiveOneMessagePerDMLOnTransactionCommit() throws Exception {
         try (ReplicationConnection connection = TestHelper.createForReplication("test", true)) {
@@ -173,7 +174,7 @@ public void shouldReceiveOneMessagePerDMLOnTransactionCommit() throws Exception
             expectedMessagesFromStream(stream, 2);
         }
     }
-
+
     @Test
     public void shouldNotReceiveMessagesOnTransactionRollback() throws Exception {
         try (ReplicationConnection connection = TestHelper.createForReplication("test", true)) {
@@ -186,7 +187,7 @@ public void shouldNotReceiveMessagesOnTransactionRollback() throws Exception {
             expectedMessagesFromStream(stream, 0);
         }
     }
-
+
     @Test
     public void shouldGeneratesEventsForMultipleSchemas() throws Exception {
         try (ReplicationConnection connection = TestHelper.createForReplication("test", true)) {
@@ -201,12 +202,12 @@
             "INSERT INTO schema2.table (b, c) VALUES('Value for schema2', now());";
             TestHelper.execute(statements);
             expectedMessagesFromStream(stream, 2);
-        }
+        }
     }
-
+
     private void flushLSN(ReplicationStream stream) {
         try {
-            stream.flushLSN();
+            stream.flushLastReceivedLsn();
         } catch (SQLException e) {
             throw new RuntimeException(e);
         }
@@ -224,7 +225,7 @@ private int startInsertStop(String slotName, Consumer<ReplicationStream> streamP
                 streamProcessor.accept(stream);
             }
         } catch (Throwable t) {
-            // make sure we always drop the slot if something fails - note the connection was created with the drop on close
+            // make sure we always drop the slot if something fails - note the connection was created with the drop on close
             // set to false
             try (PostgresConnection conn = TestHelper.create()) {
                 conn.dropReplicationSlot(slotName);
@@ -236,7 +237,7 @@ private int startInsertStop(String slotName, Consumer<ReplicationStream> streamP
         Thread.sleep(100);
         return expectedMessageCount;
     }
-
+
     private List expectedMessagesFromStream(ReplicationStream stream, int expectedMessages) throws Exception {
         List actualMessages = new ArrayList<>();
@@ -270,7 +271,7 @@ private List expectedMessagesFromStream(ReplicationStream st
         }
         return actualMessages;
     }
-
+
     private int insertSmallTestData() throws Exception {
         String statement = "INSERT INTO table_with_pk (b, c) VALUES('Backup and Restore', now());" +
                            "INSERT INTO table_with_pk (b, c) VALUES('Tuning', now());";
         TestHelper.execute(statement);
         // we expect 2 messages from the above
         return 2;
     }
-
+
     private int insertLargeTestData() throws Exception {
         String statement = "INSERT INTO table_with_pk (b, c) VALUES('Backup and Restore', now());" +
                            "INSERT INTO table_with_pk (b, c) VALUES('Tuning', now());" +
                            "ALTER TABLE table_without_pk REPLICA IDENTITY FULL;" +
                            "UPDATE table_without_pk SET c = 'Baz' WHERE c = 'Bar';" +
                            "DELETE FROM table_without_pk WHERE c = 'Baz';";
-
+
         // Postgres WILL NOT fire any tuple changes (UPDATES or DELETES) for tables which don't have a PK by default EXCEPT
-        // if that table has a REPLICA IDENTITY of FULL or INDEX.
+        // if that table has a REPLICA IDENTITY of FULL or INDEX.
         // See http://michael.otacoo.com/postgresql-2/postgres-9-4-feature-highlight-replica-identity-logical-replication/
         // ...so we expect 8 messages for the above DML
         TestHelper.execute(statement);