DBZ-1730 Stream from replica slot position not end of tx log
This commit is contained in:
parent
a3f4c82908
commit
ee91eac418
@ -60,6 +60,7 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS
|
||||
private final AtomicReference<ReplicationStream> replicationStream = new AtomicReference<>();
|
||||
private final Snapshotter snapshotter;
|
||||
private final Metronome pauseNoMessage;
|
||||
private final boolean hasStartLsnStoredInContext;
|
||||
|
||||
/**
|
||||
* The minimum of (number of event received since the last event sent to Kafka,
|
||||
@ -78,6 +79,9 @@ public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfi
|
||||
this.clock = clock;
|
||||
this.schema = schema;
|
||||
this.offsetContext = (offsetContext != null) ? offsetContext : PostgresOffsetContext.initialContext(connectorConfig, connection, clock);
|
||||
// replication slot could exist at the time of starting Debezium so we will stream from the position in the slot
|
||||
// instead of the last position in the database
|
||||
this.hasStartLsnStoredInContext = (offsetContext != null);
|
||||
pauseNoMessage = Metronome.sleeper(taskContext.getConfig().getPollInterval(), Clock.SYSTEM);
|
||||
this.taskContext = taskContext;
|
||||
this.snapshotter = snapshotter;
|
||||
@ -92,7 +96,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
|
||||
}
|
||||
|
||||
try {
|
||||
if (offsetContext.hasLastKnownPosition()) {
|
||||
if (hasStartLsnStoredInContext) {
|
||||
// start streaming from the last recorded position in the offset
|
||||
final Long lsn = offsetContext.lastCompletelyProcessedLsn() != null ? offsetContext.lastCompletelyProcessedLsn() : offsetContext.lsn();
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
@ -113,7 +117,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
|
||||
// refresh the schema so we have a latest view of the DB tables
|
||||
taskContext.refreshSchema(connection, true);
|
||||
|
||||
this.lastCompletelyProcessedLsn = offsetContext.lsn();
|
||||
this.lastCompletelyProcessedLsn = replicationStream.get().startLsn();
|
||||
|
||||
while (context.isRunning()) {
|
||||
int noMessageIterations = 0;
|
||||
|
@ -152,7 +152,6 @@ protected void initPublication() {
|
||||
}
|
||||
|
||||
protected void initReplicationSlot() throws SQLException, InterruptedException {
|
||||
final String postgresPluginName = plugin.getPostgresPluginName();
|
||||
ServerInfo.ReplicationSlot slotInfo = getSlotInfo();
|
||||
|
||||
boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID == slotInfo;
|
||||
@ -480,6 +479,11 @@ private void processWarnings(final boolean forced) throws SQLException {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long startLsn() {
|
||||
return startingLsn;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -74,6 +74,13 @@ public interface ReplicationMessageProcessor {
|
||||
*/
|
||||
Long lastReceivedLsn();
|
||||
|
||||
/**
|
||||
* Returns the value for the LSN form which the streaming is executed.
|
||||
*
|
||||
* @return a {@link Long} value, possibly null if starting LSN is undefined
|
||||
*/
|
||||
Long startLsn();
|
||||
|
||||
/**
|
||||
* Starts a background thread to ensure the slot is kept alive, useful for when temporarily
|
||||
* stopping reads from the stream such as querying metadata, etc
|
||||
|
@ -39,6 +39,7 @@
|
||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
|
||||
import org.awaitility.Awaitility;
|
||||
import org.fest.assertions.Assertions;
|
||||
import org.junit.Before;
|
||||
@ -65,6 +66,7 @@
|
||||
import io.debezium.data.VerifyRecord;
|
||||
import io.debezium.data.geometry.Point;
|
||||
import io.debezium.doc.FixFor;
|
||||
import io.debezium.embedded.EmbeddedEngine;
|
||||
import io.debezium.heartbeat.Heartbeat;
|
||||
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
|
||||
import io.debezium.jdbc.TemporalPrecisionMode;
|
||||
@ -74,6 +76,7 @@
|
||||
import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode;
|
||||
import io.debezium.relational.Table;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.util.Collect;
|
||||
import io.debezium.util.Stopwatch;
|
||||
import io.debezium.util.Testing;
|
||||
|
||||
@ -1613,6 +1616,39 @@ public void stopInTheMiddleOfTxAndResume() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-1730")
|
||||
public void shouldStartConsumingFromSlotLocation() throws Exception {
|
||||
Testing.Print.enable();
|
||||
|
||||
startConnector(config -> config
|
||||
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false)
|
||||
.with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class), true);
|
||||
waitForStreamingToStart();
|
||||
|
||||
consumer = testConsumer(1);
|
||||
executeAndWait("INSERT INTO test_table (text) VALUES ('insert2')");
|
||||
consumer.remove();
|
||||
|
||||
stopConnector();
|
||||
TestHelper.execute(
|
||||
"INSERT INTO test_table (text) VALUES ('insert3');",
|
||||
"INSERT INTO test_table (text) VALUES ('insert4')");
|
||||
startConnector(config -> config
|
||||
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, true)
|
||||
.with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER)
|
||||
.with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class), false);
|
||||
|
||||
consumer.expects(2);
|
||||
consumer.await(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS);
|
||||
|
||||
// We cannot guarantee the flush timing so it is possible that insert2 record will be redelivered
|
||||
Assertions.assertThat(((Struct) consumer.remove().value()).getStruct("after").getString("text")).isIn(Collect.unmodifiableSet("insert2", "insert3"));
|
||||
Assertions.assertThat(((Struct) consumer.remove().value()).getStruct("after").getString("text")).isIn(Collect.unmodifiableSet("insert3", "insert4"));
|
||||
|
||||
stopConnector();
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-1824")
|
||||
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.WAL2JSON, reason = "wal2json cannot resume transaction in the middle of processing")
|
||||
|
Loading…
Reference in New Issue
Block a user