DBZ-1295 3state snapshot for Oracle

This commit is contained in:
Jiri Pechanec 2019-05-31 10:46:31 +02:00 committed by Gunnar Morling
parent da44d93a92
commit dc76aa4ea2
4 changed files with 19 additions and 26 deletions

View File

@ -13,6 +13,7 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import io.debezium.connector.SnapshotRecord;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
@ -44,7 +45,7 @@ private OracleOffsetContext(OracleConnectorConfig connectorConfig, long scn, Lcr
postSnapshotCompletion();
}
else {
sourceInfo.setSnapshot(snapshot);
sourceInfo.setSnapshot(snapshot ? SnapshotRecord.TRUE : SnapshotRecord.FALSE);
}
}
@ -159,7 +160,7 @@ public boolean isSnapshotRunning() {
@Override
public void preSnapshotStart() {
sourceInfo.setSnapshot(true);
sourceInfo.setSnapshot(SnapshotRecord.TRUE);
snapshotCompleted = false;
}
@ -170,7 +171,7 @@ public void preSnapshotCompletion() {
@Override
public void postSnapshotCompletion() {
sourceInfo.setSnapshot(false);
sourceInfo.setSnapshot(SnapshotRecord.FALSE);
}
@Override
@ -187,6 +188,11 @@ public String toString() {
return sb.toString();
}
@Override
public void markLastSnapshotRecord() {
sourceInfo.setSnapshot(SnapshotRecord.LAST);
}
public static class Loader implements OffsetContext.Loader {
private final OracleConnectorConfig connectorConfig;

View File

@ -10,11 +10,11 @@
import org.apache.kafka.connect.data.Struct;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.relational.TableId;
@NotThreadSafe
public class SourceInfo extends AbstractSourceInfo {
public class SourceInfo extends BaseSourceInfo {
public static final String TXID_KEY = "txId";
public static final String SCN_KEY = "scn";
@ -25,7 +25,6 @@ public class SourceInfo extends AbstractSourceInfo {
private LcrPosition lcrPosition;
private String transactionId;
private Instant sourceTime;
private boolean snapshot;
private TableId tableId;
protected SourceInfo(OracleConnectorConfig connectorConfig) {
@ -71,14 +70,6 @@ public void setSourceTime(Instant sourceTime) {
this.sourceTime = sourceTime;
}
public void setSnapshot(boolean snapshot) {
this.snapshot = snapshot;
}
public boolean isSnapshot() {
return snapshot;
}
public TableId getTableId() {
return tableId;
}
@ -92,11 +83,6 @@ protected Instant timestamp() {
return sourceTime;
}
@Override
protected boolean snapshot() {
return isSnapshot();
}
@Override
protected String database() {
return tableId.catalog();

View File

@ -111,7 +111,7 @@ public void shouldTakeSnapshot() throws Exception {
assertThat(record1.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(false);
Struct source = (Struct) ((Struct) record1.value()).get("source");
assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true);
assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo("true");
SourceRecord record2 = testTableRecords.get(1);
VerifyRecord.isValidRead(record2, "ID", 2);
@ -125,7 +125,7 @@ public void shouldTakeSnapshot() throws Exception {
assertThat(record2.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true);
source = (Struct) ((Struct) record2.value()).get("source");
assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true);
assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo("last");
}
@Test
@ -154,7 +154,7 @@ public void shouldContinueWithStreamingAfterSnapshot() throws Exception {
assertThat(after.get("ID")).isEqualTo(1);
Struct source = (Struct) ((Struct) record1.value()).get("source");
assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true);
assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo("true");
assertThat(source.get(SourceInfo.SCN_KEY)).isNotNull();
assertThat(source.get(SourceInfo.SERVER_NAME_KEY)).isEqualTo("server1");
assertThat(source.get(SourceInfo.DEBEZIUM_VERSION_KEY)).isNotNull();
@ -190,7 +190,7 @@ public void shouldContinueWithStreamingAfterSnapshot() throws Exception {
assertThat(record3.sourceOffset().containsKey(SNAPSHOT_COMPLETED_KEY)).isFalse();
source = (Struct) ((Struct) record3.value()).get("source");
assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(false);
assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo("false");
assertThat(source.get(SourceInfo.SCN_KEY)).isNotNull();
assertThat(source.get(SourceInfo.SERVER_NAME_KEY)).isEqualTo("server1");
assertThat(source.get(SourceInfo.DEBEZIUM_VERSION_KEY)).isNotNull();
@ -226,7 +226,7 @@ public void shouldStreamTransaction() throws Exception {
assertThat(after.get("ID")).isEqualTo(1);
Struct source = (Struct) ((Struct) record1.value()).get("source");
assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true);
assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo("true");
assertThat(source.get(SourceInfo.SCN_KEY)).isNotNull();
assertThat(source.get(SourceInfo.SERVER_NAME_KEY)).isEqualTo("server1");
assertThat(source.get(SourceInfo.DEBEZIUM_VERSION_KEY)).isNotNull();
@ -280,7 +280,7 @@ private void assertTxBatch(int expectedRecordCount, int offset) throws Interrupt
assertThat(record3.sourceOffset().containsKey(SourceInfo.SCN_KEY)).isFalse();
source = (Struct) ((Struct) record3.value()).get("source");
assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(false);
assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo("false");
assertThat(source.get(SourceInfo.SCN_KEY)).isNotNull();
assertThat(source.get(SourceInfo.LCR_POSITION_KEY)).isNotNull();
assertThat(source.get(SourceInfo.SERVER_NAME_KEY)).isEqualTo("server1");

View File

@ -11,6 +11,7 @@
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.AbstractSourceInfoStructMaker;
import io.debezium.data.VerifyRecord;
import io.debezium.relational.TableId;
@ -53,7 +54,7 @@ public void schemaIsCorrect() {
.field("connector", Schema.STRING_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("ts_ms", Schema.INT64_SCHEMA)
.field("snapshot", SchemaBuilder.bool().optional().defaultValue(false).build())
.field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA)
.field("db", Schema.STRING_SCHEMA)
.field("schema", Schema.STRING_SCHEMA)
.field("table", Schema.STRING_SCHEMA)