DBZ-1515 Add poll pause only after multiple no message attempts
This commit is contained in:
parent
3634d680f4
commit
22e8397e6d
@ -35,6 +35,10 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS
|
|||||||
|
|
||||||
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresStreamingChangeEventSource.class);
|
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresStreamingChangeEventSource.class);
|
||||||
|
|
||||||
|
// PGOUTPUT decoder sends the messages with larger time gaps than other decoders
|
||||||
|
// We thus try to read the message multiple times before we make poll pause
|
||||||
|
private static final int THROTTLE_NO_MESSAGE_BEFORE_PAUSE = 5;
|
||||||
|
|
||||||
private final PostgresConnection connection;
|
private final PostgresConnection connection;
|
||||||
private final EventDispatcher<TableId> dispatcher;
|
private final EventDispatcher<TableId> dispatcher;
|
||||||
private final ErrorHandler errorHandler;
|
private final ErrorHandler errorHandler;
|
||||||
@ -91,6 +95,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
|
|||||||
|
|
||||||
final ReplicationStream stream = this.replicationStream.get();
|
final ReplicationStream stream = this.replicationStream.get();
|
||||||
while (context.isRunning()) {
|
while (context.isRunning()) {
|
||||||
|
int noMessageIterations = 0;
|
||||||
if (!stream.readPending(message -> {
|
if (!stream.readPending(message -> {
|
||||||
final Long lsn = stream.lastReceivedLsn();
|
final Long lsn = stream.lastReceivedLsn();
|
||||||
if (message == null) {
|
if (message == null) {
|
||||||
@ -123,7 +128,14 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
|
|||||||
if (offsetContext.hasCompletelyProcessedPosition()) {
|
if (offsetContext.hasCompletelyProcessedPosition()) {
|
||||||
dispatcher.dispatchHeartbeatEvent(offsetContext);
|
dispatcher.dispatchHeartbeatEvent(offsetContext);
|
||||||
}
|
}
|
||||||
pauseNoMessage.pause();
|
noMessageIterations++;
|
||||||
|
if (noMessageIterations >= THROTTLE_NO_MESSAGE_BEFORE_PAUSE) {
|
||||||
|
noMessageIterations = 0;
|
||||||
|
pauseNoMessage.pause();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
noMessageIterations = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,8 @@
|
|||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
import org.apache.commons.lang3.RandomStringUtils;
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
import org.apache.kafka.connect.data.Decimal;
|
import org.apache.kafka.connect.data.Decimal;
|
||||||
@ -52,6 +54,7 @@
|
|||||||
import io.debezium.junit.ShouldFailWhen;
|
import io.debezium.junit.ShouldFailWhen;
|
||||||
import io.debezium.relational.Table;
|
import io.debezium.relational.Table;
|
||||||
import io.debezium.relational.TableId;
|
import io.debezium.relational.TableId;
|
||||||
|
import io.debezium.util.Stopwatch;
|
||||||
import io.debezium.util.Testing;
|
import io.debezium.util.Testing;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1366,6 +1369,52 @@ public void shouldHaveXminWhenEnabled() throws Exception {
|
|||||||
assertThat(consumer.isEmpty()).isTrue();
|
assertThat(consumer.isEmpty()).isTrue();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldProcessLargerTx() throws Exception {
|
||||||
|
Testing.Print.disable();
|
||||||
|
final int numberOfEvents = 1000;
|
||||||
|
|
||||||
|
startConnector();
|
||||||
|
waitForStreamingToStart();
|
||||||
|
|
||||||
|
final String topicPrefix = "public.test_table";
|
||||||
|
final String topicName = topicName(topicPrefix);
|
||||||
|
|
||||||
|
final Stopwatch stopwatch = Stopwatch.reusable();
|
||||||
|
consumer = testConsumer(numberOfEvents);
|
||||||
|
// This is not accurate as we measure also including the data but
|
||||||
|
// it is sufficient to confirm there is no large difference
|
||||||
|
// in runtime between the cases
|
||||||
|
stopwatch.start();
|
||||||
|
executeAndWait(IntStream.rangeClosed(2, numberOfEvents + 1)
|
||||||
|
.boxed()
|
||||||
|
.map(x -> "INSERT INTO test_table (text) VALUES ('insert" + x + "')")
|
||||||
|
.collect(Collectors.joining(";")));
|
||||||
|
stopwatch.stop();
|
||||||
|
final long firstRun = stopwatch.durations().statistics().getTotal().toMillis();
|
||||||
|
logger.info("Single tx duration = {} ms", firstRun);
|
||||||
|
for (int i = 0; i < numberOfEvents; i++) {
|
||||||
|
SourceRecord record = consumer.remove();
|
||||||
|
assertEquals(topicName, record.topic());
|
||||||
|
VerifyRecord.isValidInsert(record, PK_FIELD, i + 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
consumer.expects(numberOfEvents);
|
||||||
|
IntStream.rangeClosed(2, numberOfEvents + 1).forEach(x -> TestHelper.execute("INSERT INTO test_table (text) VALUES ('insert" + x + "')"));
|
||||||
|
stopwatch.start();
|
||||||
|
// There should be no significant difference between many TX runtime and single large TX
|
||||||
|
// We still add generous limits as the runtime is in seconds and we cannot provide
|
||||||
|
// a stable scheduling environment
|
||||||
|
consumer.await(3 * firstRun, TimeUnit.MILLISECONDS);
|
||||||
|
stopwatch.stop();
|
||||||
|
for (int i = 0; i < numberOfEvents; i++) {
|
||||||
|
SourceRecord record = consumer.remove();
|
||||||
|
assertEquals(topicName, record.topic());
|
||||||
|
VerifyRecord.isValidInsert(record, PK_FIELD, i + 1002);
|
||||||
|
}
|
||||||
|
logger.info("Many tx duration = {} ms", stopwatch.durations().statistics().getTotal().toMillis());
|
||||||
|
}
|
||||||
|
|
||||||
private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(PostgresConnectorConfig.SchemaRefreshMode mode, boolean tablesBeforeStart) throws Exception{
|
private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(PostgresConnectorConfig.SchemaRefreshMode mode, boolean tablesBeforeStart) throws Exception{
|
||||||
if (tablesBeforeStart) {
|
if (tablesBeforeStart) {
|
||||||
TestHelper.execute(
|
TestHelper.execute(
|
||||||
|
Loading…
Reference in New Issue
Block a user