DBZ-1295 3state snapshot for MySQL
This commit is contained in:
parent
7f35d70d0e
commit
4b64c8e447
@ -28,10 +28,13 @@
|
|||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.kafka.connect.data.Struct;
|
||||||
import org.apache.kafka.connect.errors.ConnectException;
|
import org.apache.kafka.connect.errors.ConnectException;
|
||||||
import org.apache.kafka.connect.source.SourceRecord;
|
import org.apache.kafka.connect.source.SourceRecord;
|
||||||
|
|
||||||
|
import io.debezium.connector.SnapshotRecord;
|
||||||
import io.debezium.connector.mysql.RecordMakers.RecordsForTable;
|
import io.debezium.connector.mysql.RecordMakers.RecordsForTable;
|
||||||
|
import io.debezium.data.Envelope;
|
||||||
import io.debezium.function.BufferedBlockingConsumer;
|
import io.debezium.function.BufferedBlockingConsumer;
|
||||||
import io.debezium.function.Predicates;
|
import io.debezium.function.Predicates;
|
||||||
import io.debezium.heartbeat.Heartbeat;
|
import io.debezium.heartbeat.Heartbeat;
|
||||||
@ -601,7 +604,7 @@ protected void execute() {
|
|||||||
source.markLastSnapshot(context.config());
|
source.markLastSnapshot(context.config());
|
||||||
long stop = clock.currentTimeInMillis();
|
long stop = clock.currentTimeInMillis();
|
||||||
try {
|
try {
|
||||||
bufferedRecordQueue.close(this::replaceOffset);
|
bufferedRecordQueue.close(this::replaceOffsetAndSource);
|
||||||
if (logger.isInfoEnabled()) {
|
if (logger.isInfoEnabled()) {
|
||||||
logger.info("Step {}: scanned {} rows in {} tables in {}",
|
logger.info("Step {}: scanned {} rows in {} tables in {}",
|
||||||
step, totalRowCount, tableIds.size(), Strings.duration(stop - startScan));
|
step, totalRowCount, tableIds.size(), Strings.duration(stop - startScan));
|
||||||
@ -867,17 +870,22 @@ private void logRolesForCurrentUser(JdbcConnection mysql) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility method to replace the offset in the given record with the latest. This is used on the last record produced
|
* Utility method to replace the offset and the source in the given record with the latest. This is used on the last record produced
|
||||||
* during the snapshot.
|
* during the snapshot.
|
||||||
*
|
*
|
||||||
* @param record the record
|
* @param record the record
|
||||||
* @return the updated record
|
* @return the updated record
|
||||||
*/
|
*/
|
||||||
protected SourceRecord replaceOffset(SourceRecord record) {
|
protected SourceRecord replaceOffsetAndSource(SourceRecord record) {
|
||||||
if (record == null) {
|
if (record == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
Map<String, ?> newOffset = context.source().offset();
|
Map<String, ?> newOffset = context.source().offset();
|
||||||
|
final Struct envelope = (Struct) record.value();
|
||||||
|
final Struct source = (Struct) envelope.get(Envelope.FieldName.SOURCE);
|
||||||
|
if (SnapshotRecord.fromSource(source) == SnapshotRecord.TRUE) {
|
||||||
|
SnapshotRecord.LAST.toSource(source);
|
||||||
|
}
|
||||||
return new SourceRecord(record.sourcePartition(),
|
return new SourceRecord(record.sourcePartition(),
|
||||||
newOffset,
|
newOffset,
|
||||||
record.topic(),
|
record.topic(),
|
||||||
|
@ -21,6 +21,7 @@
|
|||||||
import io.debezium.annotation.NotThreadSafe;
|
import io.debezium.annotation.NotThreadSafe;
|
||||||
import io.debezium.config.Configuration;
|
import io.debezium.config.Configuration;
|
||||||
import io.debezium.connector.AbstractSourceInfo;
|
import io.debezium.connector.AbstractSourceInfo;
|
||||||
|
import io.debezium.connector.SnapshotRecord;
|
||||||
import io.debezium.connector.SourceInfoStructMaker;
|
import io.debezium.connector.SourceInfoStructMaker;
|
||||||
import io.debezium.data.Envelope;
|
import io.debezium.data.Envelope;
|
||||||
import io.debezium.document.Document;
|
import io.debezium.document.Document;
|
||||||
@ -842,8 +843,11 @@ protected Instant timestamp() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean snapshot() {
|
protected SnapshotRecord snapshot() {
|
||||||
return isLastSnapshot();
|
if (isSnapshotInEffect()) {
|
||||||
|
return SnapshotRecord.TRUE;
|
||||||
|
}
|
||||||
|
return SnapshotRecord.FALSE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -341,8 +341,8 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
|
|||||||
SourceRecord secondToLast = allRecords.get(allRecords.size() - 2);
|
SourceRecord secondToLast = allRecords.get(allRecords.size() - 2);
|
||||||
assertThat(secondToLast.sourceOffset().containsKey(SourceInfo.SNAPSHOT_KEY)).isTrue();
|
assertThat(secondToLast.sourceOffset().containsKey(SourceInfo.SNAPSHOT_KEY)).isTrue();
|
||||||
assertThat(last.sourceOffset().containsKey(SourceInfo.SNAPSHOT_KEY)).isFalse(); // not snapshot
|
assertThat(last.sourceOffset().containsKey(SourceInfo.SNAPSHOT_KEY)).isFalse(); // not snapshot
|
||||||
assertThat(((Struct) secondToLast.value()).getStruct(Envelope.FieldName.SOURCE).getBoolean(SourceInfo.SNAPSHOT_KEY)).isTrue();
|
assertThat(((Struct) secondToLast.value()).getStruct(Envelope.FieldName.SOURCE).getString(SourceInfo.SNAPSHOT_KEY)).isEqualTo("true");
|
||||||
assertThat(((Struct) last.value()).getStruct(Envelope.FieldName.SOURCE).getBoolean(SourceInfo.SNAPSHOT_KEY)).isTrue();
|
assertThat(((Struct) last.value()).getStruct(Envelope.FieldName.SOURCE).getString(SourceInfo.SNAPSHOT_KEY)).isEqualTo("last");
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------------------------------------------
|
||||||
// Stopping the connector does not lose events recorded when connector is not running
|
// Stopping the connector does not lose events recorded when connector is not running
|
||||||
|
@ -206,7 +206,6 @@ public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Excepti
|
|||||||
VerifyRecord.hasNoSourceQuery(record);
|
VerifyRecord.hasNoSourceQuery(record);
|
||||||
store.add(record);
|
store.add(record);
|
||||||
schemaChanges.add(record);
|
schemaChanges.add(record);
|
||||||
System.out.println(record);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
// The last poll should always return null ...
|
// The last poll should always return null ...
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
|
|
||||||
import io.confluent.connect.avro.AvroData;
|
import io.confluent.connect.avro.AvroData;
|
||||||
import io.debezium.config.Configuration;
|
import io.debezium.config.Configuration;
|
||||||
|
import io.debezium.connector.AbstractSourceInfoStructMaker;
|
||||||
import io.debezium.data.VerifyRecord;
|
import io.debezium.data.VerifyRecord;
|
||||||
import io.debezium.doc.FixFor;
|
import io.debezium.doc.FixFor;
|
||||||
import io.debezium.document.Document;
|
import io.debezium.document.Document;
|
||||||
@ -650,7 +651,7 @@ public void schemaIsCorrect() {
|
|||||||
.field("connector", Schema.STRING_SCHEMA)
|
.field("connector", Schema.STRING_SCHEMA)
|
||||||
.field("name", Schema.STRING_SCHEMA)
|
.field("name", Schema.STRING_SCHEMA)
|
||||||
.field("ts_ms", Schema.INT64_SCHEMA)
|
.field("ts_ms", Schema.INT64_SCHEMA)
|
||||||
.field("snapshot", SchemaBuilder.bool().optional().defaultValue(false).build())
|
.field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA)
|
||||||
.field("db", Schema.STRING_SCHEMA)
|
.field("db", Schema.STRING_SCHEMA)
|
||||||
.field("table", Schema.OPTIONAL_STRING_SCHEMA)
|
.field("table", Schema.OPTIONAL_STRING_SCHEMA)
|
||||||
.field("server_id", Schema.INT64_SCHEMA)
|
.field("server_id", Schema.INT64_SCHEMA)
|
||||||
|
@ -300,7 +300,7 @@ private void takeSnapshot(BlockingConsumer<ChangeEvent> consumer) {
|
|||||||
|
|
||||||
private void changeSourceToLastSnapshotRecord(SourceRecord currentRecord) {
|
private void changeSourceToLastSnapshotRecord(SourceRecord currentRecord) {
|
||||||
final Struct envelope = (Struct) currentRecord.value();
|
final Struct envelope = (Struct) currentRecord.value();
|
||||||
final Struct source = (Struct) envelope.get("source");
|
final Struct source = (Struct) envelope.get(Envelope.FieldName.SOURCE);
|
||||||
if (source.schema().field(SourceInfo.LAST_SNAPSHOT_RECORD_KEY) != null && source.getBoolean(SourceInfo.LAST_SNAPSHOT_RECORD_KEY) != null) {
|
if (source.schema().field(SourceInfo.LAST_SNAPSHOT_RECORD_KEY) != null && source.getBoolean(SourceInfo.LAST_SNAPSHOT_RECORD_KEY) != null) {
|
||||||
source.put(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, true);
|
source.put(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, true);
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,7 @@ public enum SnapshotRecord {
|
|||||||
LAST;
|
LAST;
|
||||||
|
|
||||||
public static SnapshotRecord fromSource(Struct source) {
|
public static SnapshotRecord fromSource(Struct source) {
|
||||||
if (source.schema().field(AbstractSourceInfo.SNAPSHOT_KEY) != null && io.debezium.data.Enum.LOGICAL_NAME.equals(source.schema().field(AbstractSourceInfo.SNAPSHOT_KEY).name())) {
|
if (source.schema().field(AbstractSourceInfo.SNAPSHOT_KEY) != null && io.debezium.data.Enum.LOGICAL_NAME.equals(source.schema().field(AbstractSourceInfo.SNAPSHOT_KEY).schema().name())) {
|
||||||
final String snapshotString = source.getString(AbstractSourceInfo.SNAPSHOT_KEY);
|
final String snapshotString = source.getString(AbstractSourceInfo.SNAPSHOT_KEY);
|
||||||
if (snapshotString != null) {
|
if (snapshotString != null) {
|
||||||
return SnapshotRecord.valueOf(snapshotString.toUpperCase());
|
return SnapshotRecord.valueOf(snapshotString.toUpperCase());
|
||||||
|
Loading…
Reference in New Issue
Block a user