DBZ-2911 Add additional LSN information to all sources

Adds a new "sequence" field to all sources via the AbstractSourceInfo
class. Specifically, this field is required to deduplicate records
from PostgreSQL sources in O(1) time. The sequence field is a stringified
list of metadata. For PostgreSQL sources, this will include the last
committed LSN and the current LSN. For other sources, this will be NULL
until implemented. A new integration test was added for PostgreSQL.
This commit is contained in:
JLDLaughlin 2021-03-01 09:04:59 -05:00 committed by Jiri Pechanec
parent 98299ca111
commit 06b0475f17
11 changed files with 133 additions and 4 deletions

View File

@ -259,6 +259,7 @@ public void schemaIsCorrect() {
.field("ts_ms", Schema.INT64_SCHEMA) .field("ts_ms", Schema.INT64_SCHEMA)
.field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA) .field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA)
.field("db", Schema.STRING_SCHEMA) .field("db", Schema.STRING_SCHEMA)
.field("sequence", Schema.OPTIONAL_STRING_SCHEMA)
.field("rs", Schema.STRING_SCHEMA) .field("rs", Schema.STRING_SCHEMA)
.field("collection", Schema.STRING_SCHEMA) .field("collection", Schema.STRING_SCHEMA)
.field("ord", Schema.INT32_SCHEMA) .field("ord", Schema.INT32_SCHEMA)

View File

@ -620,6 +620,7 @@ public void schemaIsCorrect() {
.field("ts_ms", Schema.INT64_SCHEMA) .field("ts_ms", Schema.INT64_SCHEMA)
.field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA) .field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA)
.field("db", Schema.STRING_SCHEMA) .field("db", Schema.STRING_SCHEMA)
.field("sequence", Schema.OPTIONAL_STRING_SCHEMA)
.field("table", Schema.OPTIONAL_STRING_SCHEMA) .field("table", Schema.OPTIONAL_STRING_SCHEMA)
.field("server_id", Schema.INT64_SCHEMA) .field("server_id", Schema.INT64_SCHEMA)
.field("gtid", Schema.OPTIONAL_STRING_SCHEMA) .field("gtid", Schema.OPTIONAL_STRING_SCHEMA)

View File

@ -720,6 +720,7 @@ public void schemaIsCorrect() {
.field("ts_ms", Schema.INT64_SCHEMA) .field("ts_ms", Schema.INT64_SCHEMA)
.field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA) .field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA)
.field("db", Schema.STRING_SCHEMA) .field("db", Schema.STRING_SCHEMA)
.field("sequence", Schema.OPTIONAL_STRING_SCHEMA)
.field("table", Schema.OPTIONAL_STRING_SCHEMA) .field("table", Schema.OPTIONAL_STRING_SCHEMA)
.field("server_id", Schema.INT64_SCHEMA) .field("server_id", Schema.INT64_SCHEMA)
.field("gtid", Schema.OPTIONAL_STRING_SCHEMA) .field("gtid", Schema.OPTIONAL_STRING_SCHEMA)

View File

@ -52,7 +52,7 @@ private PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Lsn lsn,
this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn; this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn;
this.lastCommitLsn = lastCommitLsn; this.lastCommitLsn = lastCommitLsn;
sourceInfo.update(lsn, time, txId, null, sourceInfo.xmin()); sourceInfo.update(lsn, time, txId, null, sourceInfo.xmin(), lastCommitLsn);
sourceInfoSchema = sourceInfo.schema(); sourceInfoSchema = sourceInfo.schema();
this.lastSnapshotRecord = lastSnapshotRecord; this.lastSnapshotRecord = lastSnapshotRecord;
@ -141,7 +141,7 @@ public void updateWalPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant c
public void updateCommitPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, TableId tableId, Long xmin) { public void updateCommitPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, TableId tableId, Long xmin) {
this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn; this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn;
this.lastCommitLsn = lastCompletelyProcessedLsn; this.lastCommitLsn = lastCompletelyProcessedLsn;
sourceInfo.update(lsn, commitTime, txId, tableId, xmin); sourceInfo.update(lsn, commitTime, txId, tableId, xmin, lastCompletelyProcessedLsn);
} }
boolean hasLastKnownPosition() { boolean hasLastKnownPosition() {

View File

@ -7,6 +7,11 @@
package io.debezium.connector.postgresql; package io.debezium.connector.postgresql;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.annotation.NotThreadSafe; import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.SnapshotRecord; import io.debezium.connector.SnapshotRecord;
@ -80,6 +85,7 @@ public final class SourceInfo extends BaseSourceInfo {
private final String dbName; private final String dbName;
private Lsn lsn; private Lsn lsn;
private Lsn[] sequence;
private Long txId; private Long txId;
private Long xmin; private Long xmin;
private Instant timestamp; private Instant timestamp;
@ -103,6 +109,12 @@ protected SourceInfo(PostgresConnectorConfig connectorConfig) {
* @param xmin the xmin of the slot, may be null * @param xmin the xmin of the slot, may be null
* @return this instance * @return this instance
*/ */
protected SourceInfo update(Lsn lsn, Instant commitTime, Long txId, TableId tableId, Long xmin, Lsn lastCommitLsn) {
update(lsn, commitTime, txId, tableId, xmin);
this.sequence = new Lsn[]{ lastCommitLsn, lsn };
return this;
}
protected SourceInfo update(Lsn lsn, Instant commitTime, Long txId, TableId tableId, Long xmin) { protected SourceInfo update(Lsn lsn, Instant commitTime, Long txId, TableId tableId, Long xmin) {
this.lsn = lsn; this.lsn = lsn;
if (commitTime != null) { if (commitTime != null) {
@ -138,6 +150,27 @@ public Long xmin() {
return this.xmin; return this.xmin;
} }
public String sequence() {
List<String> sequence = new ArrayList<String>();
if (this.sequence != null) {
for (Lsn lsn : this.sequence) {
if (lsn == null) {
sequence.add(null);
}
else {
sequence.add(Long.toString(lsn.asLong()));
}
}
}
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsString(sequence);
}
catch (JsonProcessingException e) {
throw new IllegalStateException(e);
}
}
@Override @Override
protected String database() { protected String database() {
return dbName; return dbName;
@ -179,6 +212,9 @@ public String toString() {
if (xmin != null) { if (xmin != null) {
sb.append(", xmin=").append(xmin); sb.append(", xmin=").append(xmin);
} }
if (sequence != null) {
sb.append(", sequence=").append(sequence);
}
if (timestamp != null) { if (timestamp != null) {
sb.append(", timestamp=").append(timestamp); sb.append(", timestamp=").append(timestamp);
} }

View File

@ -15,11 +15,13 @@
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.sql.SQLException; import java.sql.SQLException;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -54,6 +56,8 @@
import org.junit.rules.TestRule; import org.junit.rules.TestRule;
import org.postgresql.util.PSQLState; import org.postgresql.util.PSQLState;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.config.CommonConnectorConfig; import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.CommonConnectorConfig.Version; import io.debezium.config.CommonConnectorConfig.Version;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
@ -2509,6 +2513,70 @@ private CompletableFuture<Void> batchInsertRecords(long recordsCount, int batchS
}); });
} }
private List<Long> getSequence(SourceRecord record) {
assertTrue(record.value() instanceof Struct);
Struct source = ((Struct) record.value()).getStruct("source");
String stringSequence = source.getString("sequence");
ObjectMapper mapper = new ObjectMapper();
try {
// Sequence values are Strings, but they are all Longs for
// Postgres sources.
return Arrays.asList(mapper.readValue(stringSequence, Long[].class));
}
catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
@Test
@FixFor("DBZ-2991")
public void shouldHaveLastCommitLsn() throws InterruptedException {
TestHelper.execute(SETUP_TABLES_STMT);
start(PostgresConnector.class, TestHelper.defaultConfig()
.with(CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V2)
.with(PostgresConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
.build());
assertConnectorIsRunning();
waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
assertNoRecordsToConsume();
final int n_inserts = 3;
for (int i = 0; i < n_inserts; ++i) {
TestHelper.execute(INSERT_STMT);
}
List<SourceRecord> records = new ArrayList<>();
Awaitility.await("Skip empty transactions and find the data").atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords() * 3)).until(() -> {
int n_transactions = 0;
while (n_transactions < n_inserts) {
final List<SourceRecord> candidate = consumeRecordsByTopic(2).allRecordsInOrder();
if (candidate.get(1).topic().contains("transaction")) {
// empty transaction, should be skipped
continue;
}
records.addAll(candidate);
records.addAll(consumeRecordsByTopic(2).allRecordsInOrder());
++n_transactions;
}
return true;
});
assertEquals(4 * n_inserts, records.size());
List<Long> second_transaction_sequence = getSequence(records.get(5));
assertEquals(second_transaction_sequence.size(), 2);
assertEquals(second_transaction_sequence.get(0), getSequence(records.get(6)).get(0));
List<Long> third_transaction_sequence = getSequence(records.get(9));
assertEquals(third_transaction_sequence.size(), 2);
assertEquals(third_transaction_sequence.get(0), getSequence(records.get(10)).get(0));
// Assert the lsn of the second transaction is less than the third.
assertTrue(second_transaction_sequence.get(1) < third_transaction_sequence.get(1));
stopConnector();
}
private Predicate<SourceRecord> stopOnPKPredicate(int pkValue) { private Predicate<SourceRecord> stopOnPKPredicate(int pkValue) {
return record -> { return record -> {
Struct key = (Struct) record.key(); Struct key = (Struct) record.key();

View File

@ -68,6 +68,7 @@ public void schemaIsCorrect() {
.field("ts_ms", Schema.INT64_SCHEMA) .field("ts_ms", Schema.INT64_SCHEMA)
.field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA) .field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA)
.field("db", Schema.STRING_SCHEMA) .field("db", Schema.STRING_SCHEMA)
.field("sequence", Schema.OPTIONAL_STRING_SCHEMA)
.field("schema", Schema.STRING_SCHEMA) .field("schema", Schema.STRING_SCHEMA)
.field("table", Schema.STRING_SCHEMA) .field("table", Schema.STRING_SCHEMA)
.field("txId", Schema.OPTIONAL_INT64_SCHEMA) .field("txId", Schema.OPTIONAL_INT64_SCHEMA)

View File

@ -95,6 +95,7 @@ public void schemaIsCorrect() {
.field("ts_ms", Schema.INT64_SCHEMA) .field("ts_ms", Schema.INT64_SCHEMA)
.field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA) .field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA)
.field("db", Schema.STRING_SCHEMA) .field("db", Schema.STRING_SCHEMA)
.field("sequence", Schema.OPTIONAL_STRING_SCHEMA)
.field("schema", Schema.STRING_SCHEMA) .field("schema", Schema.STRING_SCHEMA)
.field("table", Schema.STRING_SCHEMA) .field("table", Schema.STRING_SCHEMA)
.field("change_lsn", Schema.OPTIONAL_STRING_SCHEMA) .field("change_lsn", Schema.OPTIONAL_STRING_SCHEMA)

View File

@ -28,6 +28,7 @@ public abstract class AbstractSourceInfo {
public static final String SCHEMA_NAME_KEY = "schema"; public static final String SCHEMA_NAME_KEY = "schema";
public static final String TABLE_NAME_KEY = "table"; public static final String TABLE_NAME_KEY = "table";
public static final String COLLECTION_NAME_KEY = "collection"; public static final String COLLECTION_NAME_KEY = "collection";
public static final String SEQUENCE_KEY = "sequence";
private final CommonConnectorConfig config; private final CommonConnectorConfig config;
@ -75,4 +76,18 @@ protected String serverName() {
public Struct struct() { public Struct struct() {
return structMaker().struct(this); return structMaker().struct(this);
} }
/**
* Returns extra sequencing metadata about a change event formatted
* as a stringified JSON array. The metadata contained in a sequence must be
* ordered sequentially in order to be understood and compared.
*
* Note: if a source's sequence metadata contains any string values, those
* strings must be correctly escaped before being included in the stringified
* JSON array.
*/
protected String sequence() {
return null;
};
} }

View File

@ -43,18 +43,21 @@ protected SchemaBuilder commonSchemaBuilder() {
.field(AbstractSourceInfo.SERVER_NAME_KEY, Schema.STRING_SCHEMA) .field(AbstractSourceInfo.SERVER_NAME_KEY, Schema.STRING_SCHEMA)
.field(AbstractSourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA) .field(AbstractSourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA)
.field(AbstractSourceInfo.SNAPSHOT_KEY, SNAPSHOT_RECORD_SCHEMA) .field(AbstractSourceInfo.SNAPSHOT_KEY, SNAPSHOT_RECORD_SCHEMA)
.field(AbstractSourceInfo.DATABASE_NAME_KEY, Schema.STRING_SCHEMA); .field(AbstractSourceInfo.DATABASE_NAME_KEY, Schema.STRING_SCHEMA)
.field(AbstractSourceInfo.SEQUENCE_KEY, Schema.OPTIONAL_STRING_SCHEMA);
} }
protected Struct commonStruct(T sourceInfo) { protected Struct commonStruct(T sourceInfo) {
final Instant timestamp = sourceInfo.timestamp() == null ? Instant.now() : sourceInfo.timestamp(); final Instant timestamp = sourceInfo.timestamp() == null ? Instant.now() : sourceInfo.timestamp();
final String database = sourceInfo.database() == null ? "" : sourceInfo.database(); final String database = sourceInfo.database() == null ? "" : sourceInfo.database();
final String sequence = sourceInfo.sequence() == null ? "" : sourceInfo.sequence();
Struct ret = new Struct(schema()) Struct ret = new Struct(schema())
.put(AbstractSourceInfo.DEBEZIUM_VERSION_KEY, version) .put(AbstractSourceInfo.DEBEZIUM_VERSION_KEY, version)
.put(AbstractSourceInfo.DEBEZIUM_CONNECTOR_KEY, connector) .put(AbstractSourceInfo.DEBEZIUM_CONNECTOR_KEY, connector)
.put(AbstractSourceInfo.SERVER_NAME_KEY, serverName) .put(AbstractSourceInfo.SERVER_NAME_KEY, serverName)
.put(AbstractSourceInfo.TIMESTAMP_KEY, timestamp.toEpochMilli()) .put(AbstractSourceInfo.TIMESTAMP_KEY, timestamp.toEpochMilli())
.put(AbstractSourceInfo.DATABASE_NAME_KEY, database); .put(AbstractSourceInfo.DATABASE_NAME_KEY, database)
.put(AbstractSourceInfo.SEQUENCE_KEY, sequence);
final SnapshotRecord snapshot = sourceInfo.snapshot(); final SnapshotRecord snapshot = sourceInfo.snapshot();
if (snapshot != null) { if (snapshot != null) {
snapshot.toSource(ret); snapshot.toSource(ret);

View File

@ -825,6 +825,7 @@ The following example shows the value portion of a change event that the connect
"ts_ms": 1559033904863, "ts_ms": 1559033904863,
"snapshot": true, "snapshot": true,
"db": "postgres", "db": "postgres",
"sequence": "[24023119,24023128]"
"schema": "public", "schema": "public",
"table": "customers", "table": "customers",
"txId": 555, "txId": 555,
@ -890,6 +891,7 @@ a|Mandatory field that describes the source metadata for the event. This field c
* {prodname} version * {prodname} version
* Connector type and name * Connector type and name
* Database and table that contains the new row * Database and table that contains the new row
* Stringified JSON array of additional offset information. The first value is always the last committed LSN, the second value is always the current LSN. Either value may be `null`.
* Schema name * Schema name
* If the event was part of a snapshot * If the event was part of a snapshot
* ID of the transaction in which the operation was performed * ID of the transaction in which the operation was performed