DBZ-5045 Remove legacy V1 source info

This commit is contained in:
Vojtech Juranek 2022-06-07 10:22:46 +02:00 committed by Jiri Pechanec
parent 250e160b81
commit 4a48eb33df
13 changed files with 4 additions and 1214 deletions

View File

@ -1973,7 +1973,6 @@ public void shouldNotReplicateSnapshot() throws Exception {
public void shouldGenerateRecordForUpdateEventUsingLegacyV1SourceInfo() throws Exception {
config = TestHelper.getConfiguration().edit()
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
.with(CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, "v1")
.with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
.build();

View File

@ -1,75 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.LegacyV1AbstractSourceInfoStructMaker;
public class LegacyV1MySqlSourceInfoStructMaker extends LegacyV1AbstractSourceInfoStructMaker<SourceInfo> {
private final Schema schema;
public LegacyV1MySqlSourceInfoStructMaker(String connector, String version, CommonConnectorConfig connectorConfig) {
super(connector, version, connectorConfig);
schema = commonSchemaBuilder()
.name("io.debezium.connector.mysql.Source")
.field(AbstractSourceInfo.SERVER_NAME_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.SERVER_ID_KEY, Schema.INT64_SCHEMA)
.field(MySqlOffsetContext.TIMESTAMP_KEY, Schema.INT64_SCHEMA)
.field(SourceInfo.GTID_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.BINLOG_POSITION_OFFSET_KEY, Schema.INT64_SCHEMA)
.field(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, Schema.INT32_SCHEMA)
.field(SourceInfo.SNAPSHOT_KEY, SchemaBuilder.bool().optional().defaultValue(false).build())
.field(SourceInfo.THREAD_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(SourceInfo.DATABASE_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.TABLE_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.QUERY_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.build();
}
@Override
public Schema schema() {
return schema;
}
@Override
public Struct struct(SourceInfo sourceInfo) {
Struct result = commonStruct();
result.put(SourceInfo.SERVER_NAME_KEY, serverName);
result.put(SourceInfo.SERVER_ID_KEY, sourceInfo.getServerId());
if (sourceInfo.getCurrentGtid() != null) {
// Don't put the GTID Set into the struct; only the current GTID is fine ...
result.put(SourceInfo.GTID_KEY, sourceInfo.getCurrentGtid());
}
result.put(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, sourceInfo.getCurrentBinlogFilename());
result.put(SourceInfo.BINLOG_POSITION_OFFSET_KEY, sourceInfo.getCurrentBinlogPosition());
result.put(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, sourceInfo.getCurrentRowNumber());
result.put(MySqlOffsetContext.TIMESTAMP_KEY, sourceInfo.getBinlogTimestampSeconds());
if (sourceInfo.isLastSnapshot()) {
// if the snapshot is COMPLETED, then this will not happen.
result.put(SourceInfo.SNAPSHOT_KEY, true);
}
if (sourceInfo.getThreadId() >= 0) {
result.put(SourceInfo.THREAD_KEY, sourceInfo.getThreadId());
}
if (sourceInfo.database() != null) {
result.put(SourceInfo.DATABASE_NAME_KEY, sourceInfo.database());
}
if (sourceInfo.table() != null) {
result.put(SourceInfo.TABLE_NAME_KEY, sourceInfo.table());
}
if (sourceInfo.getQuery() != null) {
result.put(SourceInfo.QUERY_KEY, sourceInfo.getQuery());
}
return result;
}
}

View File

@ -952,7 +952,6 @@ protected boolean supportsSchemaChangesDuringIncrementalSnapshot() {
private final Predicate<String> gtidSourceFilter;
private final EventProcessingFailureHandlingMode inconsistentSchemaFailureHandlingMode;
private final Predicate<String> ddlFilter;
private final SourceInfoStructMaker<? extends AbstractSourceInfo> sourceInfoStructMaker;
private final boolean readOnlyConnection;
public MySqlConnectorConfig(Configuration config) {
@ -989,8 +988,6 @@ public MySqlConnectorConfig(Configuration config) {
// Set up the DDL filter
final String ddlFilter = config.getString(DatabaseHistory.DDL_FILTER);
this.ddlFilter = (ddlFilter != null) ? Predicates.includes(ddlFilter) : (x -> false);
this.sourceInfoStructMaker = getSourceInfoStructMaker(Version.parse(config.getString(SOURCE_STRUCT_MAKER_VERSION)));
}
public boolean useCursorFetch() {
@ -1074,12 +1071,7 @@ private static int validateTimePrecisionMode(Configuration config, Field field,
@Override
protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStructMaker(Version version) {
switch (version) {
case V1:
return new LegacyV1MySqlSourceInfoStructMaker(Module.name(), Module.version(), this);
default:
return new MySqlSourceInfoStructMaker(Module.name(), Module.version(), this);
}
return new MySqlSourceInfoStructMaker(Module.name(), Module.version(), this);
}
@Override
@ -1194,10 +1186,4 @@ public boolean isReadOnlyConnection() {
boolean useGlobalLock() {
return !"true".equals(config.getString(TEST_DISABLE_GLOBAL_LOCKING));
}
@SuppressWarnings("unchecked")
@Override
public SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStructMaker() {
return sourceInfoStructMaker;
}
}

View File

@ -1,745 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.fest.assertions.GenericAssert;
import org.junit.Before;
import org.junit.Test;
import io.confluent.connect.avro.AvroData;
import io.debezium.config.CommonConnectorConfig.Version;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.SourceInfoTest.PositionAssert;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.document.Document;
public class LegacyV1SourceInfoTest {
private static int avroSchemaCacheSize = 1000;
private static final AvroData avroData = new AvroData(avroSchemaCacheSize);
private static final String FILENAME = "mysql-bin.00001";
private static final String GTID_SET = "gtid-set"; // can technically be any string
private static final String SERVER_NAME = "my-server"; // can technically be any string
private SourceInfo source;
private MySqlOffsetContext offsetContext;
private boolean inTxn = false;
private long positionOfBeginEvent = 0L;
private int eventNumberInTxn = 0;
@Before
public void beforeEach() {
offsetContext = MySqlOffsetContext.initial(new MySqlConnectorConfig(Configuration.create()
.with(MySqlConnectorConfig.SERVER_NAME, "server")
.with(MySqlConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V1)
.build()));
source = offsetContext.getSource();
positionOfBeginEvent = 0L;
eventNumberInTxn = 0;
}
@Test
public void shouldStartSourceInfoFromZeroBinlogCoordinates() {
offsetContext.setBinlogStartPoint(FILENAME, 0);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(offsetContext.eventsToSkipUponRestart()).isEqualTo(0);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(offsetContext.isSnapshotRunning()).isFalse();
}
@Test
public void shouldStartSourceInfoFromNonZeroBinlogCoordinates() {
offsetContext.setBinlogStartPoint(FILENAME, 100);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(offsetContext.isSnapshotRunning()).isFalse();
}
// -------------------------------------------------------------------------------------
// Test reading the offset map and recovering the proper SourceInfo state
// -------------------------------------------------------------------------------------
@Test
public void shouldRecoverSourceInfoFromOffsetWithZeroBinlogCoordinates() {
sourceWith(offset(0, 0));
assertThat(offsetContext.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(offsetContext.isSnapshotRunning()).isFalse();
}
@Test
public void shouldRecoverSourceInfoFromOffsetWithNonZeroBinlogCoordinates() {
sourceWith(offset(100, 0));
assertThat(offsetContext.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(offsetContext.isSnapshotRunning()).isFalse();
}
@Test
public void shouldRecoverSourceInfoFromOffsetWithZeroBinlogCoordinatesAndNonZeroRow() {
sourceWith(offset(0, 5));
assertThat(offsetContext.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(offsetContext.isSnapshotRunning()).isFalse();
}
@Test
public void shouldRecoverSourceInfoFromOffsetWithNonZeroBinlogCoordinatesAndNonZeroRow() {
sourceWith(offset(100, 5));
assertThat(offsetContext.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(offsetContext.isSnapshotRunning()).isFalse();
}
@Test
public void shouldRecoverSourceInfoFromOffsetWithZeroBinlogCoordinatesAndSnapshot() {
sourceWith(offset(0, 0, true));
assertThat(offsetContext.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(offsetContext.isSnapshotRunning()).isTrue();
}
@Test
public void shouldRecoverSourceInfoFromOffsetWithNonZeroBinlogCoordinatesAndSnapshot() {
sourceWith(offset(100, 0, true));
assertThat(offsetContext.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(offsetContext.isSnapshotRunning()).isTrue();
}
@Test
public void shouldRecoverSourceInfoFromOffsetWithZeroBinlogCoordinatesAndNonZeroRowAndSnapshot() {
sourceWith(offset(0, 5, true));
assertThat(offsetContext.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(offsetContext.isSnapshotRunning()).isTrue();
}
@Test
public void shouldRecoverSourceInfoFromOffsetWithNonZeroBinlogCoordinatesAndNonZeroRowAndSnapshot() {
sourceWith(offset(100, 5, true));
assertThat(offsetContext.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(offsetContext.isSnapshotRunning()).isTrue();
}
@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoordinates() {
sourceWith(offset(GTID_SET, 0, 0, false));
assertThat(offsetContext.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(offsetContext.isSnapshotRunning()).isFalse();
}
@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoordinatesAndNonZeroRow() {
sourceWith(offset(GTID_SET, 0, 5, false));
assertThat(offsetContext.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(offsetContext.isSnapshotRunning()).isFalse();
}
@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndNonZeroBinlogCoordinates() {
sourceWith(offset(GTID_SET, 100, 0, false));
assertThat(offsetContext.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(offsetContext.isSnapshotRunning()).isFalse();
}
@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndNonZeroBinlogCoordinatesAndNonZeroRow() {
sourceWith(offset(GTID_SET, 100, 5, false));
assertThat(offsetContext.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(offsetContext.isSnapshotRunning()).isFalse();
}
@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoordinatesAndSnapshot() {
sourceWith(offset(GTID_SET, 0, 0, true));
assertThat(offsetContext.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(offsetContext.isSnapshotRunning()).isTrue();
}
@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoordinatesAndNonZeroRowAndSnapshot() {
sourceWith(offset(GTID_SET, 0, 5, true));
assertThat(offsetContext.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(offsetContext.isSnapshotRunning()).isTrue();
}
@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndNonZeroBinlogCoordinatesAndSnapshot() {
sourceWith(offset(GTID_SET, 100, 0, true));
assertThat(offsetContext.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(offsetContext.isSnapshotRunning()).isTrue();
}
@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndNonZeroBinlogCoordinatesAndNonZeroRowAndSnapshot() {
sourceWith(offset(GTID_SET, 100, 5, true));
assertThat(offsetContext.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(offsetContext.isSnapshotRunning()).isTrue();
}
// -------------------------------------------------------------------------------------
// Test advancing SourceInfo state (similar to how the BinlogReader uses it)
// -------------------------------------------------------------------------------------
@Test
public void shouldAdvanceSourceInfoFromNonZeroPositionAndRowZeroForEventsWithOneRow() {
sourceWith(offset(100, 0));
// Try a transactions with just one event ...
handleTransactionBegin(150, 2);
handleNextEvent(200, 10, withRowCount(1));
handleTransactionCommit(210, 2);
handleTransactionBegin(210, 2);
handleNextEvent(220, 10, withRowCount(1));
handleTransactionCommit(230, 3);
handleTransactionBegin(240, 2);
handleNextEvent(250, 50, withRowCount(1));
handleTransactionCommit(300, 4);
// Try a transactions with multiple events ...
handleTransactionBegin(340, 2);
handleNextEvent(350, 20, withRowCount(1));
handleNextEvent(370, 30, withRowCount(1));
handleNextEvent(400, 40, withRowCount(1));
handleTransactionCommit(440, 4);
handleTransactionBegin(500, 2);
handleNextEvent(510, 20, withRowCount(1));
handleNextEvent(540, 15, withRowCount(1));
handleNextEvent(560, 10, withRowCount(1));
handleTransactionCommit(580, 4);
// Try another single event transaction ...
handleTransactionBegin(600, 2);
handleNextEvent(610, 50, withRowCount(1));
handleTransactionCommit(660, 4);
// Try event outside of a transaction ...
handleNextEvent(670, 10, withRowCount(1));
// Try another single event transaction ...
handleTransactionBegin(700, 2);
handleNextEvent(710, 50, withRowCount(1));
handleTransactionCommit(760, 4);
}
@Test
public void shouldAdvanceSourceInfoFromNonZeroPositionAndRowZeroForEventsWithMultipleRow() {
sourceWith(offset(100, 0));
// Try a transactions with just one event ...
handleTransactionBegin(150, 2);
handleNextEvent(200, 10, withRowCount(3));
handleTransactionCommit(210, 2);
handleTransactionBegin(210, 2);
handleNextEvent(220, 10, withRowCount(4));
handleTransactionCommit(230, 3);
handleTransactionBegin(240, 2);
handleNextEvent(250, 50, withRowCount(5));
handleTransactionCommit(300, 4);
// Try a transactions with multiple events ...
handleTransactionBegin(340, 2);
handleNextEvent(350, 20, withRowCount(6));
handleNextEvent(370, 30, withRowCount(1));
handleNextEvent(400, 40, withRowCount(3));
handleTransactionCommit(440, 4);
handleTransactionBegin(500, 2);
handleNextEvent(510, 20, withRowCount(8));
handleNextEvent(540, 15, withRowCount(9));
handleNextEvent(560, 10, withRowCount(1));
handleTransactionCommit(580, 4);
// Try another single event transaction ...
handleTransactionBegin(600, 2);
handleNextEvent(610, 50, withRowCount(1));
handleTransactionCommit(660, 4);
// Try event outside of a transaction ...
handleNextEvent(670, 10, withRowCount(5));
// Try another single event transaction ...
handleTransactionBegin(700, 2);
handleNextEvent(710, 50, withRowCount(3));
handleTransactionCommit(760, 4);
}
// -------------------------------------------------------------------------------------
// Utility methods
// -------------------------------------------------------------------------------------
protected int withRowCount(int rowCount) {
return rowCount;
}
protected void handleTransactionBegin(long positionOfEvent, int eventSize) {
offsetContext.setEventPosition(positionOfEvent, eventSize);
positionOfBeginEvent = positionOfEvent;
offsetContext.startNextTransaction();
inTxn = true;
assertThat(offsetContext.rowsToSkipUponRestart()).isEqualTo(0);
}
protected void handleTransactionCommit(long positionOfEvent, int eventSize) {
offsetContext.setEventPosition(positionOfEvent, eventSize);
offsetContext.commitTransaction();
eventNumberInTxn = 0;
inTxn = false;
// Verify the offset ...
Map<String, ?> offset = offsetContext.getOffset();
// The offset position should be the position of the next event
long position = (Long) offset.get(SourceInfo.BINLOG_POSITION_OFFSET_KEY);
assertThat(position).isEqualTo(positionOfEvent + eventSize);
Long rowsToSkip = (Long) offset.get(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY);
if (rowsToSkip == null) {
rowsToSkip = 0L;
}
assertThat(rowsToSkip).isEqualTo(0);
assertThat(offset.get(MySqlOffsetContext.EVENTS_TO_SKIP_OFFSET_KEY)).isNull();
if (offsetContext.gtidSet() != null) {
assertThat(offset.get(MySqlOffsetContext.GTID_SET_KEY)).isEqualTo(offsetContext.gtidSet());
}
}
protected void handleNextEvent(long positionOfEvent, long eventSize, int rowCount) {
if (inTxn) {
++eventNumberInTxn;
}
offsetContext.setEventPosition(positionOfEvent, eventSize);
for (int row = 0; row != rowCount; ++row) {
// Get the offset for this row (always first!) ...
offsetContext.setRowNumber(row, rowCount);
Map<String, ?> offset = offsetContext.getOffset();
assertThat(offset.get(SourceInfo.BINLOG_FILENAME_OFFSET_KEY)).isEqualTo(FILENAME);
if (offsetContext.gtidSet() != null) {
assertThat(offset.get(MySqlOffsetContext.GTID_SET_KEY)).isEqualTo(offsetContext.gtidSet());
}
long position = (Long) offset.get(SourceInfo.BINLOG_POSITION_OFFSET_KEY);
if (inTxn) {
// regardless of the row count, the position is always the txn begin position ...
assertThat(position).isEqualTo(positionOfBeginEvent);
// and the number of the last completed event (the previous one) ...
Long eventsToSkip = (Long) offset.get(MySqlOffsetContext.EVENTS_TO_SKIP_OFFSET_KEY);
if (eventsToSkip == null) {
eventsToSkip = 0L;
}
assertThat(eventsToSkip).isEqualTo(eventNumberInTxn - 1);
}
else {
// Matches the next event ...
assertThat(position).isEqualTo(positionOfEvent + eventSize);
assertThat(offset.get(MySqlOffsetContext.EVENTS_TO_SKIP_OFFSET_KEY)).isNull();
}
Long rowsToSkip = (Long) offset.get(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY);
if (rowsToSkip == null) {
rowsToSkip = 0L;
}
if ((row + 1) == rowCount) {
// This is the last row, so the next binlog position should be the number of rows in the event ...
assertThat(rowsToSkip).isEqualTo(rowCount);
}
else {
// This is not the last row, so the next binlog position should be the row number ...
assertThat(rowsToSkip).isEqualTo(row + 1);
}
// Get the source struct for this row (always second), which should always reflect this row in this event ...
Struct recordSource = source.struct();
assertThat(recordSource.getInt64(SourceInfo.BINLOG_POSITION_OFFSET_KEY)).isEqualTo(positionOfEvent);
assertThat(recordSource.getInt32(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY)).isEqualTo(row);
assertThat(recordSource.getString(SourceInfo.BINLOG_FILENAME_OFFSET_KEY)).isEqualTo(FILENAME);
if (offsetContext.gtidSet() != null) {
assertThat(recordSource.getString(MySqlOffsetContext.GTID_SET_KEY)).isEqualTo(offsetContext.gtidSet());
}
}
offsetContext.completeEvent();
}
protected Map<String, String> offset(long position, int row) {
return offset(null, position, row, false);
}
protected Map<String, String> offset(long position, int row, boolean snapshot) {
return offset(null, position, row, snapshot);
}
protected Map<String, String> offset(String gtidSet, long position, int row, boolean snapshot) {
Map<String, String> offset = new HashMap<>();
offset.put(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, FILENAME);
offset.put(SourceInfo.BINLOG_POSITION_OFFSET_KEY, Long.toString(position));
offset.put(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, Integer.toString(row));
if (gtidSet != null) {
offset.put(MySqlOffsetContext.GTID_SET_KEY, gtidSet);
}
if (snapshot) {
offset.put(SourceInfo.SNAPSHOT_KEY, Boolean.TRUE.toString());
}
return offset;
}
protected SourceInfo sourceWith(Map<String, String> offset) {
offsetContext = (MySqlOffsetContext) new MySqlOffsetContext.Loader(new MySqlConnectorConfig(Configuration.create()
.with(MySqlConnectorConfig.SERVER_NAME, SERVER_NAME)
.with(MySqlConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V1)
.build())).load(offset);
source = offsetContext.getSource();
return source;
}
/**
* When we want to consume SinkRecord which generated by debezium-connector-mysql, it should not
* throw error "org.apache.avro.SchemaParseException: Illegal character in: server-id"
*/
@Test
public void shouldValidateSourceInfoSchema() {
org.apache.kafka.connect.data.Schema kafkaSchema = source.schema();
org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(kafkaSchema);
assertTrue(avroSchema != null);
}
@Test
public void shouldConsiderPositionsWithSameGtidSetsAsSame() {
assertPositionWithGtids("IdA:1-5").isAtOrBefore(positionWithGtids("IdA:1-5")); // same, single
assertPositionWithGtids("IdA:1-5,IdB:1-20").isAtOrBefore(positionWithGtids("IdA:1-5,IdB:1-20")); // same, multiple
assertPositionWithGtids("IdA:1-5,IdB:1-20").isAtOrBefore(positionWithGtids("IdB:1-20,IdA:1-5")); // equivalent
}
@Test
public void shouldConsiderPositionsWithSameGtidSetsAndSnapshotAsSame() {
assertPositionWithGtids("IdA:1-5", true).isAtOrBefore(positionWithGtids("IdA:1-5", true)); // same, single
assertPositionWithGtids("IdA:1-5,IdB:1-20", true).isAtOrBefore(positionWithGtids("IdA:1-5,IdB:1-20", true)); // same,
// multiple
assertPositionWithGtids("IdA:1-5,IdB:1-20", true).isAtOrBefore(positionWithGtids("IdB:1-20,IdA:1-5", true)); // equivalent
}
@Test
public void shouldOrderPositionWithGtidAndSnapshotBeforePositionWithSameGtidButNoSnapshot() {
assertPositionWithGtids("IdA:1-5", true).isAtOrBefore(positionWithGtids("IdA:1-5")); // same, single
assertPositionWithGtids("IdA:1-5,IdB:1-20", true).isAtOrBefore(positionWithGtids("IdA:1-5,IdB:1-20")); // same, multiple
assertPositionWithGtids("IdA:1-5,IdB:1-20", true).isAtOrBefore(positionWithGtids("IdB:1-20,IdA:1-5")); // equivalent
}
@Test
public void shouldOrderPositionWithoutGtidAndSnapshotAfterPositionWithSameGtidAndSnapshot() {
assertPositionWithGtids("IdA:1-5", false).isAfter(positionWithGtids("IdA:1-5", true)); // same, single
assertPositionWithGtids("IdA:1-5,IdB:1-20", false).isAfter(positionWithGtids("IdA:1-5,IdB:1-20", true)); // same, multiple
assertPositionWithGtids("IdA:1-5,IdB:1-20", false).isAfter(positionWithGtids("IdB:1-20,IdA:1-5", true)); // equivalent
}
@Test
public void shouldOrderPositionWithGtidsAsBeforePositionWithExtraServerUuidInGtids() {
assertPositionWithGtids("IdA:1-5").isBefore(positionWithGtids("IdA:1-5,IdB:1-20"));
}
@Test
public void shouldOrderPositionsWithSameServerButLowerUpperLimitAsBeforePositionWithSameServerUuidInGtids() {
assertPositionWithGtids("IdA:1-5").isBefore(positionWithGtids("IdA:1-6"));
assertPositionWithGtids("IdA:1-5:7-9").isBefore(positionWithGtids("IdA:1-10"));
assertPositionWithGtids("IdA:2-5:8-9").isBefore(positionWithGtids("IdA:1-10"));
}
@Test
public void shouldOrderPositionWithoutGtidAsBeforePositionWithGtid() {
assertPositionWithoutGtids("filename.01", Integer.MAX_VALUE, 0, 0).isBefore(positionWithGtids("IdA:1-5"));
}
@Test
public void shouldOrderPositionWithGtidAsAfterPositionWithoutGtid() {
assertPositionWithGtids("IdA:1-5").isAfter(positionWithoutGtids("filename.01", 0, 0, 0));
}
@Test
public void shouldComparePositionsWithoutGtids() {
// Same position ...
assertPositionWithoutGtids("fn.01", 1, 0, 0).isAt(positionWithoutGtids("fn.01", 1, 0, 0));
assertPositionWithoutGtids("fn.01", 1, 0, 1).isAt(positionWithoutGtids("fn.01", 1, 0, 1));
assertPositionWithoutGtids("fn.03", 1, 0, 1).isAt(positionWithoutGtids("fn.03", 1, 0, 1));
assertPositionWithoutGtids("fn.01", 1, 1, 0).isAt(positionWithoutGtids("fn.01", 1, 1, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isAt(positionWithoutGtids("fn.01", 1, 1, 1));
assertPositionWithoutGtids("fn.03", 1, 1, 1).isAt(positionWithoutGtids("fn.03", 1, 1, 1));
// Before position ...
assertPositionWithoutGtids("fn.01", 1, 0, 0).isBefore(positionWithoutGtids("fn.01", 1, 0, 1));
assertPositionWithoutGtids("fn.01", 1, 0, 0).isBefore(positionWithoutGtids("fn.01", 2, 0, 0));
assertPositionWithoutGtids("fn.01", 1, 0, 1).isBefore(positionWithoutGtids("fn.01", 1, 0, 2));
assertPositionWithoutGtids("fn.01", 1, 0, 1).isBefore(positionWithoutGtids("fn.01", 2, 0, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 0).isBefore(positionWithoutGtids("fn.01", 1, 1, 1));
assertPositionWithoutGtids("fn.01", 1, 1, 0).isBefore(positionWithoutGtids("fn.01", 1, 2, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isBefore(positionWithoutGtids("fn.01", 1, 2, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isBefore(positionWithoutGtids("fn.01", 2, 0, 0));
// After position ...
assertPositionWithoutGtids("fn.01", 1, 0, 1).isAfter(positionWithoutGtids("fn.01", 0, 0, 99));
assertPositionWithoutGtids("fn.01", 1, 0, 1).isAfter(positionWithoutGtids("fn.01", 1, 0, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isAfter(positionWithoutGtids("fn.01", 0, 0, 99));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isAfter(positionWithoutGtids("fn.01", 1, 0, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isAfter(positionWithoutGtids("fn.01", 1, 1, 0));
}
@Test
public void shouldComparePositionsWithDifferentFields() {
Document history = positionWith("mysql-bin.000008", 380941551, "01261278-6ade-11e6-b36a-42010af00790:1-378422946,"
+ "4d1a4918-44ba-11e6-bf12-42010af0040b:1-11002284,"
+ "716ec46f-d522-11e5-bb56-0242ac110004:1-34673215,"
+ "96c2072e-e428-11e6-9590-42010a28002d:1-3,"
+ "c627b2bc-9647-11e6-a886-42010af0044a:1-9541144", 0, 0, true);
Document current = positionWith("mysql-bin.000016", 645115324, "01261278-6ade-11e6-b36a-42010af00790:1-400944168,"
+ "30efb117-e42a-11e6-ba9e-42010a28002e:1-9,"
+ "4d1a4918-44ba-11e6-bf12-42010af0040b:1-11604379,"
+ "621dc2f6-803b-11e6-acc1-42010af000a4:1-7963838,"
+ "716ec46f-d522-11e5-bb56-0242ac110004:1-35850702,"
+ "c627b2bc-9647-11e6-a886-42010af0044a:1-10426868,"
+ "d079cbb3-750f-11e6-954e-42010af00c28:1-11544291:11544293-11885648", 2, 1, false);
assertThatDocument(current).isAfter(history);
Set<String> excludes = Collections.singleton("96c2072e-e428-11e6-9590-42010a28002d");
assertThatDocument(history).isAtOrBefore(current, (uuid) -> !excludes.contains(uuid));
}
@FixFor("DBZ-107")
@Test
public void shouldRemoveNewlinesFromGtidSet() {
String gtidExecuted = "036d85a9-64e5-11e6-9b48-42010af0000c:1-2,\n" +
"7145bf69-d1ca-11e5-a588-0242ac110004:1-3149,\n" +
"7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-39";
String gtidCleaned = "036d85a9-64e5-11e6-9b48-42010af0000c:1-2," +
"7145bf69-d1ca-11e5-a588-0242ac110004:1-3149," +
"7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-39";
offsetContext.setCompletedGtidSet(gtidExecuted);
assertThat(offsetContext.gtidSet()).isEqualTo(gtidCleaned);
}
@FixFor("DBZ-107")
@Test
public void shouldNotSetBlankGtidSet() {
offsetContext.setCompletedGtidSet("");
assertThat(offsetContext.gtidSet()).isNull();
}
@FixFor("DBZ-107")
@Test
public void shouldNotSetNullGtidSet() {
offsetContext.setCompletedGtidSet(null);
assertThat(offsetContext.gtidSet()).isNull();
}
@Test
public void shouldHaveTimestamp() {
sourceWith(offset(100, 5, true));
source.setBinlogTimestampSeconds(1_024);
source.databaseEvent(null);
assertThat(source.struct().get("ts_sec")).isEqualTo(1_024L);
}
@Test
public void versionIsPresent() {
sourceWith(offset(100, 5, true));
source.databaseEvent(null);
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_VERSION_KEY)).isEqualTo(Module.version());
}
@Test
public void connectorIsPresent() {
sourceWith(offset(100, 5, true));
source.databaseEvent(null);
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name());
}
@Test
public void schemaIsCorrect() {
final Schema schema = SchemaBuilder.struct()
.name("io.debezium.connector.mysql.Source")
.field("version", Schema.OPTIONAL_STRING_SCHEMA)
.field("connector", Schema.OPTIONAL_STRING_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("server_id", Schema.INT64_SCHEMA)
.field("ts_sec", Schema.INT64_SCHEMA)
.field("gtid", Schema.OPTIONAL_STRING_SCHEMA)
.field("file", Schema.STRING_SCHEMA)
.field("pos", Schema.INT64_SCHEMA)
.field("row", Schema.INT32_SCHEMA)
.field("snapshot", SchemaBuilder.bool().optional().defaultValue(false).build())
.field("thread", Schema.OPTIONAL_INT64_SCHEMA)
.field("db", Schema.OPTIONAL_STRING_SCHEMA)
.field("table", Schema.OPTIONAL_STRING_SCHEMA)
.field("query", Schema.OPTIONAL_STRING_SCHEMA)
.build();
VerifyRecord.assertConnectSchemasAreEqual(null, source.schema(), schema);
}
protected Document positionWithGtids(String gtids) {
return positionWithGtids(gtids, false);
}
protected Document positionWithGtids(String gtids, boolean snapshot) {
if (snapshot) {
return Document.create(MySqlOffsetContext.GTID_SET_KEY, gtids, SourceInfo.SNAPSHOT_KEY, true);
}
return Document.create(MySqlOffsetContext.GTID_SET_KEY, gtids);
}
protected Document positionWithoutGtids(String filename, int position, int event, int row) {
return positionWithoutGtids(filename, position, event, row, false);
}
protected Document positionWithoutGtids(String filename, int position, int event, int row, boolean snapshot) {
return positionWith(filename, position, null, event, row, snapshot);
}
protected Document positionWith(String filename, int position, String gtids, int event, int row, boolean snapshot) {
Document pos = Document.create(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, filename,
SourceInfo.BINLOG_POSITION_OFFSET_KEY, position);
if (row >= 0) {
pos = pos.set(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, row);
}
if (event >= 0) {
pos = pos.set(MySqlOffsetContext.EVENTS_TO_SKIP_OFFSET_KEY, event);
}
if (gtids != null && gtids.trim().length() != 0) {
pos = pos.set(MySqlOffsetContext.GTID_SET_KEY, gtids);
}
if (snapshot) {
pos = pos.set(SourceInfo.SNAPSHOT_KEY, true);
}
return pos;
}
protected PositionAssert assertThatDocument(Document position) {
return new PositionAssert(position);
}
protected PositionAssert assertPositionWithGtids(String gtids) {
return assertThatDocument(positionWithGtids(gtids));
}
protected PositionAssert assertPositionWithGtids(String gtids, boolean snapshot) {
return assertThatDocument(positionWithGtids(gtids, snapshot));
}
protected PositionAssert assertPositionWithoutGtids(String filename, int position, int event, int row) {
return assertPositionWithoutGtids(filename, position, event, row, false);
}
protected PositionAssert assertPositionWithoutGtids(String filename, int position, int event, int row, boolean snapshot) {
return assertThatDocument(positionWithoutGtids(filename, position, event, row, snapshot));
}
protected static class PositionAssert extends GenericAssert<PositionAssert, Document> {
public PositionAssert(Document position) {
super(PositionAssert.class, position);
}
public PositionAssert isAt(Document otherPosition) {
return isAt(otherPosition, null);
}
public PositionAssert isAt(Document otherPosition, Predicate<String> gtidFilter) {
final MySqlHistoryRecordComparator comparator = new MySqlHistoryRecordComparator(gtidFilter);
if (comparator.isPositionAtOrBefore(actual, otherPosition)) {
return this;
}
failIfCustomMessageIsSet();
throw failure(actual + " should be consider same position as " + otherPosition);
}
public PositionAssert isBefore(Document otherPosition) {
return isBefore(otherPosition, null);
}
public PositionAssert isBefore(Document otherPosition, Predicate<String> gtidFilter) {
return isAtOrBefore(otherPosition, gtidFilter);
}
public PositionAssert isAtOrBefore(Document otherPosition) {
return isAtOrBefore(otherPosition, null);
}
public PositionAssert isAtOrBefore(Document otherPosition, Predicate<String> gtidFilter) {
final MySqlHistoryRecordComparator comparator = new MySqlHistoryRecordComparator(gtidFilter);
if (comparator.isPositionAtOrBefore(actual, otherPosition)) {
return this;
}
failIfCustomMessageIsSet();
throw failure(actual + " should be consider same position as or before " + otherPosition);
}
public PositionAssert isAfter(Document otherPosition) {
return isAfter(otherPosition, null);
}
public PositionAssert isAfter(Document otherPosition, Predicate<String> gtidFilter) {
final MySqlHistoryRecordComparator comparator = new MySqlHistoryRecordComparator(gtidFilter);
if (!comparator.isPositionAtOrBefore(actual, otherPosition)) {
return this;
}
failIfCustomMessageIsSet();
throw failure(actual + " should be consider after " + otherPosition);
}
}
}

View File

@ -1,74 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.LegacyV1AbstractSourceInfoStructMaker;
import io.debezium.connector.SnapshotRecord;
import io.debezium.time.Conversions;
public class LegacyV1PostgresSourceInfoStructMaker extends LegacyV1AbstractSourceInfoStructMaker<SourceInfo> {
private final Schema schema;
private final String serverName;
public LegacyV1PostgresSourceInfoStructMaker(String connector, String version, CommonConnectorConfig connectorConfig) {
super(connector, version, connectorConfig);
schema = commonSchemaBuilder()
.name("io.debezium.connector.postgresql.Source")
.field(SourceInfo.SERVER_NAME_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.DATABASE_NAME_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.TIMESTAMP_USEC_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(SourceInfo.TXID_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(SourceInfo.LSN_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(SourceInfo.SCHEMA_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.TABLE_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.SNAPSHOT_KEY, SchemaBuilder.bool().optional().defaultValue(false).build())
.field(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA)
.field(SourceInfo.XMIN_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.build();
this.serverName = connectorConfig.getLogicalName();
}
@Override
public Schema schema() {
return schema;
}
@Override
public Struct struct(SourceInfo sourceInfo) {
assert sourceInfo.database() != null
&& sourceInfo.schemaName() != null
&& sourceInfo.tableName() != null;
Struct result = super.commonStruct();
result.put(SourceInfo.SERVER_NAME_KEY, serverName);
result.put(SourceInfo.DATABASE_NAME_KEY, sourceInfo.database());
result.put(SourceInfo.SCHEMA_NAME_KEY, sourceInfo.schemaName());
result.put(SourceInfo.TABLE_NAME_KEY, sourceInfo.tableName());
if (sourceInfo.timestamp() != null) {
result.put(SourceInfo.TIMESTAMP_USEC_KEY, Conversions.toEpochMicros(sourceInfo.timestamp()));
}
if (sourceInfo.txId() != null) {
result.put(SourceInfo.TXID_KEY, sourceInfo.txId());
}
if (sourceInfo.lsn() != null) {
result.put(SourceInfo.LSN_KEY, sourceInfo.lsn().asLong());
}
if (sourceInfo.xmin() != null) {
result.put(SourceInfo.XMIN_KEY, sourceInfo.xmin());
}
if (sourceInfo.isSnapshot()) {
result.put(SourceInfo.SNAPSHOT_KEY, true);
result.put(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, sourceInfo.snapshot() == SnapshotRecord.LAST);
}
return result;
}
}

View File

@ -972,12 +972,7 @@ protected int moneyFractionDigits() {
@Override
protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStructMaker(Version version) {
switch (version) {
case V1:
return new LegacyV1PostgresSourceInfoStructMaker(Module.name(), Module.version(), this);
default:
return new PostgresSourceInfoStructMaker(Module.name(), Module.version(), this);
}
return new PostgresSourceInfoStructMaker(Module.name(), Module.version(), this);
}
private static final ConfigDefinition CONFIG_DEFINITION = RelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()

View File

@ -1,81 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import static org.fest.assertions.Assertions.assertThat;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig.Version;
import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.relational.TableId;
import io.debezium.time.Conversions;
/**
* @author Jiri Pechanec
*
*/
public class LegacyV1SourceInfoTest {
private SourceInfo source;
@Before
public void beforeEach() {
source = new SourceInfo(new PostgresConnectorConfig(
Configuration.create()
.with(PostgresConnectorConfig.SERVER_NAME, "serverX")
.with(PostgresConnectorConfig.DATABASE_NAME, "serverX")
.with(PostgresConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V1)
.build()));
source.update(Conversions.toInstantFromMicros(123_456_789L), new TableId("catalogNameX", "schemaNameX", "tableNameX"));
}
@Test
public void versionIsPresent() {
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_VERSION_KEY)).isEqualTo(Module.version());
}
@Test
public void connectorIsPresent() {
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name());
}
@Test
@FixFor("DBZ-934")
public void canHandleNullValues() {
source.update(null, null, null, null, null);
}
@Test
public void shouldHaveTimestamp() {
assertThat(source.struct().getInt64(SourceInfo.TIMESTAMP_USEC_KEY)).isEqualTo(123_456_789L);
}
@Test
public void schemaIsCorrect() {
final Schema schema = SchemaBuilder.struct()
.name("io.debezium.connector.postgresql.Source")
.field("version", Schema.OPTIONAL_STRING_SCHEMA)
.field("connector", Schema.OPTIONAL_STRING_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("db", Schema.STRING_SCHEMA)
.field("ts_usec", Schema.OPTIONAL_INT64_SCHEMA)
.field("txId", Schema.OPTIONAL_INT64_SCHEMA)
.field("lsn", Schema.OPTIONAL_INT64_SCHEMA)
.field("schema", Schema.OPTIONAL_STRING_SCHEMA)
.field("table", Schema.OPTIONAL_STRING_SCHEMA)
.field("snapshot", SchemaBuilder.bool().optional().defaultValue(false).build())
.field("last_snapshot_record", Schema.OPTIONAL_BOOLEAN_SCHEMA)
.field("xmin", Schema.OPTIONAL_INT64_SCHEMA)
.build();
assertThat(source.struct().schema()).isEqualTo(schema);
}
}

View File

@ -61,7 +61,6 @@
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.CommonConnectorConfig.Version;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
@ -315,32 +314,6 @@ public void shouldProduceEventsWithInitialSnapshot() throws Exception {
assertRecordsAfterInsert(2, 3, 3);
}
@Test
@FixFor("DBZ-1174")
public void shouldUseMicrosecondsForTransactionCommitTime() throws InterruptedException {
TestHelper.execute(SETUP_TABLES_STMT);
start(PostgresConnector.class, TestHelper.defaultConfig()
.with(CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V1)
.build());
assertConnectorIsRunning();
// check records from snapshot
Instant inst = Instant.now();
// Microseconds since epoch, may overflow
final long microsSnapshot = TimeUnit.SECONDS.toMicros(inst.getEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(inst.getNano());
SourceRecords actualRecords = consumeRecordsByTopic(2);
actualRecords.forEach(sourceRecord -> assertSourceInfoMicrosecondTransactionTimestamp(sourceRecord, microsSnapshot, TimeUnit.MINUTES.toMicros(1L)));
// insert 2 new records
TestHelper.execute(INSERT_STMT);
// check records from streaming
inst = Instant.now();
// Microseconds since epoch, may overflow
final long microsStream = TimeUnit.SECONDS.toMicros(inst.getEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(inst.getNano());
actualRecords = consumeRecordsByTopic(2);
actualRecords.forEach(sourceRecord -> assertSourceInfoMicrosecondTransactionTimestamp(sourceRecord, microsStream, TimeUnit.MINUTES.toMicros(1L)));
}
@Test
@FixFor("DBZ-1235")
public void shouldUseMillisecondsForTransactionCommitTime() throws InterruptedException {
@ -2735,7 +2708,6 @@ private List<Long> getSequence(SourceRecord record) {
public void shouldHaveLastCommitLsn() throws InterruptedException {
TestHelper.execute(SETUP_TABLES_STMT);
start(PostgresConnector.class, TestHelper.defaultConfig()
.with(CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V2)
.with(PostgresConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
.build());
@ -2858,14 +2830,6 @@ private void assertRecordsAfterInsert(int expectedCount, int... pks) throws Inte
IntStream.range(0, expectedCountPerSchema).forEach(i -> VerifyRecord.isValidInsert(recordsForTopicS2.remove(0), PK_FIELD, pks[i]));
}
protected void assertSourceInfoMicrosecondTransactionTimestamp(SourceRecord record, long ts_usec, long tolerance_usec) {
assertTrue(record.value() instanceof Struct);
Struct source = ((Struct) record.value()).getStruct("source");
// 1 minute difference is okay
System.out.println("TS_USEC\t" + source.getInt64("ts_usec"));
assertTrue(Math.abs(ts_usec - source.getInt64("ts_usec")) < tolerance_usec);
}
protected void assertSourceInfoMillisecondTransactionTimestamp(SourceRecord record, long ts_ms, long tolerance_ms) {
assertTrue(record.value() instanceof Struct);
Struct source = ((Struct) record.value()).getStruct("source");

View File

@ -1,56 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.LegacyV1AbstractSourceInfoStructMaker;
public class LegacyV1SqlServerSourceInfoStructMaker extends LegacyV1AbstractSourceInfoStructMaker<SourceInfo> {
public static final String SERVER_NAME_KEY = "name";
public static final String LOG_TIMESTAMP_KEY = "ts_ms";
public static final String CHANGE_LSN_KEY = "change_lsn";
public static final String COMMIT_LSN_KEY = "commit_lsn";
public static final String SNAPSHOT_KEY = "snapshot";
private final Schema schema;
public LegacyV1SqlServerSourceInfoStructMaker(String connector, String version, CommonConnectorConfig connectorConfig) {
super(connector, version, connectorConfig);
schema = commonSchemaBuilder()
.name("io.debezium.connector.sqlserver.Source")
.field(SERVER_NAME_KEY, Schema.STRING_SCHEMA)
.field(LOG_TIMESTAMP_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(CHANGE_LSN_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(COMMIT_LSN_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SNAPSHOT_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA)
.build();
}
@Override
public Schema schema() {
return schema;
}
@Override
public Struct struct(SourceInfo sourceInfo) {
final Struct ret = super.commonStruct()
.put(SERVER_NAME_KEY, serverName)
.put(LOG_TIMESTAMP_KEY, sourceInfo.timestamp() == null ? null : sourceInfo.timestamp().toEpochMilli())
.put(SNAPSHOT_KEY, sourceInfo.isSnapshot());
if (sourceInfo.getChangeLsn() != null && sourceInfo.getChangeLsn().isAvailable()) {
ret.put(CHANGE_LSN_KEY, sourceInfo.getChangeLsn().toString());
}
if (sourceInfo.getCommitLsn() != null && sourceInfo.getCommitLsn().isAvailable()) {
ret.put(COMMIT_LSN_KEY, sourceInfo.getCommitLsn().toString());
}
return ret;
}
}

View File

@ -446,12 +446,7 @@ protected boolean supportsSchemaChangesDuringIncrementalSnapshot() {
@Override
protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStructMaker(Version version) {
switch (version) {
case V1:
return new LegacyV1SqlServerSourceInfoStructMaker(Module.name(), Module.version(), this);
default:
return new SqlServerSourceInfoStructMaker(Module.name(), Module.version(), this);
}
return new SqlServerSourceInfoStructMaker(Module.name(), Module.version(), this);
}
private static class SystemTablesPredicate implements TableFilter {

View File

@ -1,89 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import static org.fest.assertions.Assertions.assertThat;
import java.time.Instant;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig.Version;
import io.debezium.config.Configuration;
import io.debezium.connector.SnapshotRecord;
public class LegacyV1SourceInfoTest {
private SourceInfo source;
@Before
public void beforeEach() {
final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(
Configuration.create()
.with(SqlServerConnectorConfig.SERVER_NAME, "serverX")
.with(SqlServerConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V1)
.build());
source = new SourceInfo(connectorConfig);
source.setChangeLsn(Lsn.valueOf(new byte[]{ 0x01 }));
source.setCommitLsn(Lsn.valueOf(new byte[]{ 0x02 }));
source.setSnapshot(SnapshotRecord.TRUE);
source.setSourceTime(Instant.ofEpochMilli(3000));
}
@Test
public void versionIsPresent() {
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_VERSION_KEY)).isEqualTo(Module.version());
}
@Test
public void connectorIsPresent() {
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name());
}
@Test
public void serverNameIsPresent() {
assertThat(source.struct().getString(SourceInfo.SERVER_NAME_KEY)).isEqualTo("serverX");
}
@Test
public void changeLsnIsPresent() {
assertThat(source.struct().getString(SourceInfo.CHANGE_LSN_KEY)).isEqualTo(Lsn.valueOf(new byte[]{ 0x01 }).toString());
}
@Test
public void commitLsnIsPresent() {
assertThat(source.struct().getString(SourceInfo.COMMIT_LSN_KEY)).isEqualTo(Lsn.valueOf(new byte[]{ 0x02 }).toString());
}
@Test
public void snapshotIsPresent() {
assertThat(source.struct().getBoolean(SourceInfo.SNAPSHOT_KEY)).isEqualTo(Boolean.TRUE);
}
@Test
public void timestampIsPresent() {
assertThat(source.struct().getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo(3000);
}
@Test
public void schemaIsCorrect() {
final Schema schema = SchemaBuilder.struct()
.name("io.debezium.connector.sqlserver.Source")
.field("version", Schema.OPTIONAL_STRING_SCHEMA)
.field("connector", Schema.OPTIONAL_STRING_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("ts_ms", Schema.OPTIONAL_INT64_SCHEMA)
.field("change_lsn", Schema.OPTIONAL_STRING_SCHEMA)
.field("commit_lsn", Schema.OPTIONAL_STRING_SCHEMA)
.field("snapshot", Schema.OPTIONAL_BOOLEAN_SCHEMA)
.build();
assertThat(source.struct().schema()).isEqualTo(schema);
}
}

View File

@ -28,7 +28,6 @@
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.CommonConnectorConfig.Version;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotIsolationMode;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode;
@ -182,24 +181,6 @@ public void testDeadlockDetection() throws Exception {
connection.rollback();
}
@Test
public void takeSnapshotWithOldStructAndStartStreaming() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V1)
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
// Ignore initial records
final SourceRecords records = consumeRecordsByTopic(INITIAL_RECORDS_PER_TABLE);
final List<SourceRecord> table1 = records.recordsForTopic("server1.dbo.table1");
table1.forEach(record -> {
assertThat(((Struct) record.value()).getStruct("source").getBoolean("snapshot")).isTrue();
});
testStreaming();
}
private void testStreaming() throws SQLException, InterruptedException {
for (int i = 0; i < STREAMING_RECORDS_PER_TABLE; i++) {
final int id = i + INITIAL_RECORDS_PER_TABLE;

View File

@ -421,15 +421,6 @@ public static SchemaNameAdjustmentMode parse(String value) {
.withDescription(
"this setting must be set to specify a list of tables/collections whose snapshot must be taken on creating or restarting the connector.");
public static final Field SOURCE_STRUCT_MAKER_VERSION = Field.create("source.struct.version")
.withDisplayName("Source struct maker version")
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 19))
.withEnum(Version.class, Version.V2)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("A version of the format of the publicly visible source part in the message")
.withValidation(Field::isClassName);
public static final Field SANITIZE_FIELD_NAMES = Field.create("sanitize.field.names")
.withDisplayName("Sanitize field names to adhere to Avro naming conventions")
.withType(Type.BOOLEAN)
@ -568,7 +559,6 @@ public static SchemaNameAdjustmentMode parse(String value) {
CUSTOM_CONVERTERS,
SANITIZE_FIELD_NAMES,
TOMBSTONES_ON_DELETE,
SOURCE_STRUCT_MAKER_VERSION,
Heartbeat.HEARTBEAT_INTERVAL,
Heartbeat.HEARTBEAT_TOPICS_PREFIX,
SIGNAL_DATA_COLLECTION,
@ -621,7 +611,7 @@ protected CommonConnectorConfig(Configuration config, String logicalName, int de
this.incrementalSnapshotChunkSize = config.getInteger(INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
this.incrementalSnapshotAllowSchemaChanges = config.getBoolean(INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES);
this.schemaNameAdjustmentMode = SchemaNameAdjustmentMode.parse(config.getString(SCHEMA_NAME_ADJUSTMENT_MODE));
this.sourceInfoStructMaker = getSourceInfoStructMaker(Version.parse(config.getString(SOURCE_STRUCT_MAKER_VERSION)));
this.sourceInfoStructMaker = getSourceInfoStructMaker(Version.V2);
this.sanitizeFieldNames = config.getBoolean(SANITIZE_FIELD_NAMES) || isUsingAvroConverter(config);
this.shouldProvideTransactionMetadata = config.getBoolean(PROVIDE_TRANSACTION_METADATA);
this.eventProcessingFailureHandlingMode = EventProcessingFailureHandlingMode.parse(config.getString(EVENT_PROCESSING_FAILURE_HANDLING_MODE));