diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index 7a504fe3d..d5fd94232 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -35,6 +35,10 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS private static final Logger LOGGER = LoggerFactory.getLogger(PostgresStreamingChangeEventSource.class); + // PGOUTPUT decoder sends the messages with larger time gaps than other decoders + // We thus try to read the message multiple times before we make poll pause + private static final int THROTTLE_NO_MESSAGE_BEFORE_PAUSE = 5; + private final PostgresConnection connection; private final EventDispatcher dispatcher; private final ErrorHandler errorHandler; @@ -91,6 +95,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio final ReplicationStream stream = this.replicationStream.get(); while (context.isRunning()) { + int noMessageIterations = 0; if (!stream.readPending(message -> { final Long lsn = stream.lastReceivedLsn(); if (message == null) { @@ -123,7 +128,14 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio if (offsetContext.hasCompletelyProcessedPosition()) { dispatcher.dispatchHeartbeatEvent(offsetContext); } - pauseNoMessage.pause(); + noMessageIterations++; + if (noMessageIterations >= THROTTLE_NO_MESSAGE_BEFORE_PAUSE) { + noMessageIterations = 0; + pauseNoMessage.pause(); + } + } + else { + noMessageIterations = 0; } } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index 75e3cc2c3..386619137 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -23,6 +23,8 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.connect.data.Decimal; @@ -52,6 +54,7 @@ import io.debezium.junit.ShouldFailWhen; import io.debezium.relational.Table; import io.debezium.relational.TableId; +import io.debezium.util.Stopwatch; import io.debezium.util.Testing; /** @@ -1366,6 +1369,52 @@ public void shouldHaveXminWhenEnabled() throws Exception { assertThat(consumer.isEmpty()).isTrue(); } + @Test + public void shouldProcessLargerTx() throws Exception { + Testing.Print.disable(); + final int numberOfEvents = 1000; + + startConnector(); + waitForStreamingToStart(); + + final String topicPrefix = "public.test_table"; + final String topicName = topicName(topicPrefix); + + final Stopwatch stopwatch = Stopwatch.reusable(); + consumer = testConsumer(numberOfEvents); + // This is not accurate as we measure also including the data but + // it is sufficient to confirm there is no large difference + // in runtime between the cases + stopwatch.start(); + executeAndWait(IntStream.rangeClosed(2, numberOfEvents + 1) + .boxed() + .map(x -> "INSERT INTO test_table (text) VALUES ('insert" + x + "')") + .collect(Collectors.joining(";"))); + stopwatch.stop(); + final long firstRun = stopwatch.durations().statistics().getTotal().toMillis(); + logger.info("Single tx duration = {} ms", firstRun); + for (int i = 0; i < numberOfEvents; i++) { + SourceRecord record = consumer.remove(); + assertEquals(topicName, record.topic()); + VerifyRecord.isValidInsert(record, PK_FIELD, i + 2); + } + + consumer.expects(numberOfEvents); + IntStream.rangeClosed(2, numberOfEvents + 1).forEach(x -> TestHelper.execute("INSERT INTO test_table (text) VALUES ('insert" + x + "')")); + stopwatch.start(); + // There should be no significant difference between many TX runtime and single large TX + // We still add generous limits as the runtime is in seconds and we cannot provide + // a stable scheduling environment + consumer.await(3 * firstRun, TimeUnit.MILLISECONDS); + stopwatch.stop(); + for (int i = 0; i < numberOfEvents; i++) { + SourceRecord record = consumer.remove(); + assertEquals(topicName, record.topic()); + VerifyRecord.isValidInsert(record, PK_FIELD, i + 1002); + } + logger.info("Many tx duration = {} ms", stopwatch.durations().statistics().getTotal().toMillis()); + } + private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(PostgresConnectorConfig.SchemaRefreshMode mode, boolean tablesBeforeStart) throws Exception{ if (tablesBeforeStart) { TestHelper.execute(