diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java index 5e3554e8a..0a83e016d 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java @@ -12,11 +12,9 @@ import java.io.Serializable; import java.security.GeneralSecurityException; import java.security.KeyStore; -import java.security.UnrecoverableKeyException; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.KeyManager; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.BitSet; @@ -30,6 +28,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/LegacyV1MySqlSourceInfoStructMaker.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/LegacyV1MySqlSourceInfoStructMaker.java new file mode 100644 index 000000000..23faf25ec --- /dev/null +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/LegacyV1MySqlSourceInfoStructMaker.java @@ -0,0 +1,70 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mysql; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.connector.AbstractSourceInfoStructMaker; + +public class LegacyV1MySqlSourceInfoStructMaker extends AbstractSourceInfoStructMaker { + + private final Schema schema; + + public LegacyV1MySqlSourceInfoStructMaker(String connector, String version, CommonConnectorConfig connectorConfig) { + super(connector, version, connectorConfig); + schema = commonSchemaBuilder() + .name("io.debezium.connector.mysql.Source") + .field(SourceInfo.SERVER_ID_KEY, Schema.INT64_SCHEMA) + .field(SourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA) + .field(SourceInfo.GTID_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, Schema.STRING_SCHEMA) + .field(SourceInfo.BINLOG_POSITION_OFFSET_KEY, Schema.INT64_SCHEMA) + .field(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, Schema.INT32_SCHEMA) + .field(SourceInfo.SNAPSHOT_KEY, SchemaBuilder.bool().optional().defaultValue(false).build()) + .field(SourceInfo.THREAD_KEY, Schema.OPTIONAL_INT64_SCHEMA) + .field(SourceInfo.DB_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(SourceInfo.TABLE_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(SourceInfo.QUERY_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .build(); + } + + @Override + public Schema schema() { + return schema; + } + + @Override + public Struct struct(SourceInfo sourceInfo) { + Struct result = commonStruct(); + result.put(SourceInfo.SERVER_ID_KEY, sourceInfo.getServerId()); + if (sourceInfo.getCurrentGtid() != null) { + // Don't put the GTID Set into the struct; only the current GTID is fine ... + result.put(SourceInfo.GTID_KEY, sourceInfo.getCurrentGtid()); + } + result.put(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, sourceInfo.getCurrentBinlogFilename()); + result.put(SourceInfo.BINLOG_POSITION_OFFSET_KEY, sourceInfo.getCurrentBinlogPosition()); + result.put(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, sourceInfo.getCurrentRowNumber()); + result.put(SourceInfo.TIMESTAMP_KEY, sourceInfo.getBinlogTimestampSeconds()); + if (sourceInfo.isLastSnapshot()) { + // if the snapshot is COMPLETED, then this will not happen. + result.put(SourceInfo.SNAPSHOT_KEY, true); + } + if (sourceInfo.getThreadId() >= 0) { + result.put(SourceInfo.THREAD_KEY, sourceInfo.getThreadId()); + } + if (sourceInfo.getTableId() != null) { + result.put(SourceInfo.DB_NAME_KEY, sourceInfo.getTableId().catalog()); + result.put(SourceInfo.TABLE_NAME_KEY, sourceInfo.getTableId().table()); + } + if (sourceInfo.getQuery() != null) { + result.put(SourceInfo.QUERY_KEY, sourceInfo.getQuery()); + } + return result; + } +} diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index 4b36741d7..c0292da1e 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -19,6 +19,8 @@ import io.debezium.config.EnumeratedValue; import io.debezium.config.Field; import io.debezium.config.Field.ValidationOutput; +import io.debezium.connector.AbstractSourceInfo; +import io.debezium.connector.SourceInfoStructMaker; import io.debezium.heartbeat.Heartbeat; import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode; import io.debezium.jdbc.TemporalPrecisionMode; @@ -1175,4 +1177,14 @@ private static int randomServerId() { int highestServerId = 6400; return lowestServerId + new Random().nextInt(highestServerId - lowestServerId); } + + @Override + protected SourceInfoStructMaker getSourceInfoStructMaker(Version version) { + switch (version) { + case V1: + return new LegacyV1MySqlSourceInfoStructMaker(Module.name(), Module.version(), this); + default: + return new MySqlSourceInfoStructMaker(Module.name(), Module.version(), this); + } + } } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java index 6ff2ffe80..9d2fc448c 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java @@ -94,7 +94,7 @@ public MySqlSchema(MySqlConnectorConfig configuration, TableFilter.fromPredicate(tableFilters.tableFilter()), tableFilters.columnFilter(), new TableSchemaBuilder( - getValueConverters(configuration), SchemaNameAdjuster.create(logger), SourceInfo.SCHEMA) + getValueConverters(configuration), SchemaNameAdjuster.create(logger), configuration.getSourceInfoStructMaker(SourceInfo.class).schema()) , tableIdCaseInsensitive ); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSourceInfoStructMaker.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSourceInfoStructMaker.java new file mode 100644 index 000000000..a74555a4f --- /dev/null +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSourceInfoStructMaker.java @@ -0,0 +1,71 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mysql; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.connector.AbstractSourceInfoStructMaker; + +public class MySqlSourceInfoStructMaker extends AbstractSourceInfoStructMaker { + public static final String TIMESTAMP_KEY = "ts_ms"; + + private final Schema schema; + + public MySqlSourceInfoStructMaker(String connector, String version, CommonConnectorConfig connectorConfig) { + super(connector, version, connectorConfig); + schema = commonSchemaBuilder() + .name("io.debezium.connector.mysql.Source") + .field(SourceInfo.SERVER_ID_KEY, Schema.INT64_SCHEMA) + .field(TIMESTAMP_KEY, Schema.INT64_SCHEMA) + .field(SourceInfo.GTID_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, Schema.STRING_SCHEMA) + .field(SourceInfo.BINLOG_POSITION_OFFSET_KEY, Schema.INT64_SCHEMA) + .field(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, Schema.INT32_SCHEMA) + .field(SourceInfo.SNAPSHOT_KEY, SchemaBuilder.bool().optional().defaultValue(false).build()) + .field(SourceInfo.THREAD_KEY, Schema.OPTIONAL_INT64_SCHEMA) + .field(SourceInfo.DB_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(SourceInfo.TABLE_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(SourceInfo.QUERY_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .build(); + } + + @Override + public Schema schema() { + return schema; + } + + @Override + public Struct struct(SourceInfo sourceInfo) { + Struct result = commonStruct(); + result.put(SourceInfo.SERVER_ID_KEY, sourceInfo.getServerId()); + if (sourceInfo.getCurrentGtid() != null) { + // Don't put the GTID Set into the struct; only the current GTID is fine ... + result.put(SourceInfo.GTID_KEY, sourceInfo.getCurrentGtid()); + } + result.put(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, sourceInfo.getCurrentBinlogFilename()); + result.put(SourceInfo.BINLOG_POSITION_OFFSET_KEY, sourceInfo.getCurrentBinlogPosition()); + result.put(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, sourceInfo.getCurrentRowNumber()); + result.put(TIMESTAMP_KEY, sourceInfo.getBinlogTimestampSeconds() * 1_000); + if (sourceInfo.isLastSnapshot()) { + // if the snapshot is COMPLETED, then this will not happen. + result.put(SourceInfo.SNAPSHOT_KEY, true); + } + if (sourceInfo.getThreadId() >= 0) { + result.put(SourceInfo.THREAD_KEY, sourceInfo.getThreadId()); + } + if (sourceInfo.getTableId() != null) { + result.put(SourceInfo.DB_NAME_KEY, sourceInfo.getTableId().catalog()); + result.put(SourceInfo.TABLE_NAME_KEY, sourceInfo.getTableId().table()); + } + if (sourceInfo.getQuery() != null) { + result.put(SourceInfo.QUERY_KEY, sourceInfo.getQuery()); + } + return result; + } +} diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSystemVariables.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSystemVariables.java index 28b5b4237..94a32b6f3 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSystemVariables.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSystemVariables.java @@ -6,11 +6,11 @@ package io.debezium.connector.mysql; -import io.debezium.relational.SystemVariables; - import java.util.Arrays; import java.util.concurrent.ConcurrentMap; +import io.debezium.relational.SystemVariables; + /** * Custom class for MySQL {@link SystemVariables}, which defines MySQL scopes and constants of used variable names. * diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java index a82f7ae9d..af904ea15 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java @@ -68,7 +68,7 @@ public MySqlTaskContext(Configuration config, Filters filters, Boolean tableIdCa this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig.getLogicalName(), connectorConfig.getHeartbeatTopicsPrefix()); // Set up the source information ... - this.source = new SourceInfo(connectorConfig.getLogicalName()); + this.source = new SourceInfo(connectorConfig); // Set up the GTID filter ... String gtidSetIncludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_INCLUDES); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java index 8b6fb42f0..62ec91d08 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java @@ -76,7 +76,7 @@ public RecordMakers(MySqlSchema schema, SourceInfo source, TopicSelector structMaker; + private TableId tableId; - public SourceInfo(String logicalId) { - super(Module.version(), logicalId); - sourcePartition = Collect.hashMapOf(SERVER_PARTITION_KEY, logicalId); + public SourceInfo(MySqlConnectorConfig connectorConfig) { + super(Module.version(), connectorConfig.getLogicalName()); + this.structMaker = connectorConfig.getSourceInfoStructMaker(SourceInfo.class); + + sourcePartition = Collect.hashMapOf(SERVER_PARTITION_KEY, connectorConfig.getLogicalName()); } /** @@ -305,7 +291,7 @@ private Map offsetUsingPosition(long rowsToSkip) { */ @Override public Schema schema() { - return SCHEMA; + return structMaker.schema(); } @Override @@ -315,7 +301,7 @@ protected String connector() { /** * Get a {@link Struct} representation of the source {@link #partition()} and {@link #offset()} information. The Struct - * complies with the {@link #SCHEMA} for the MySQL connector. + * complies with the versioned source schema for the MySQL connector. *

* This method should always be called after {@link #offsetForRow(int, int)}. * @@ -329,7 +315,7 @@ public Struct struct() { /** * Get a {@link Struct} representation of the source {@link #partition()} and {@link #offset()} information. The Struct - * complies with the {@link #SCHEMA} for the MySQL connector. + * complies with the versioned source schema for the MySQL connector. *

* This method should always be called after {@link #offsetForRow(int, int)}. * @@ -338,31 +324,8 @@ public Struct struct() { * @see #schema() */ public Struct struct(TableId tableId) { - Struct result = super.struct(); - result.put(SERVER_ID_KEY, serverId); - if (currentGtid != null) { - // Don't put the GTID Set into the struct; only the current GTID is fine ... - result.put(GTID_KEY, currentGtid); - } - result.put(BINLOG_FILENAME_OFFSET_KEY, currentBinlogFilename); - result.put(BINLOG_POSITION_OFFSET_KEY, currentBinlogPosition); - result.put(BINLOG_ROW_IN_EVENT_OFFSET_KEY, currentRowNumber); - result.put(TIMESTAMP_KEY, binlogTimestampSeconds); - if (lastSnapshot) { - // if the snapshot is COMPLETED, then this will not happen. - result.put(SNAPSHOT_KEY, true); - } - if (threadId >= 0) { - result.put(THREAD_KEY, threadId); - } - if (tableId != null) { - result.put(DB_NAME_KEY, tableId.catalog()); - result.put(TABLE_NAME_KEY, tableId.table()); - } - if (currentQuery != null) { - result.put(QUERY_KEY, currentQuery); - } - return result; + this.tableId = tableId; + return structMaker.struct(this); } /** @@ -658,6 +621,42 @@ public int rowsToSkipUponRestart() { return restartRowsToSkip; } + long getServerId() { + return serverId; + } + + long getThreadId() { + return threadId; + } + + TableId getTableId() { + return tableId; + } + + String getCurrentGtid() { + return currentGtid; + } + + boolean isLastSnapshot() { + return lastSnapshot; + } + + String getCurrentBinlogFilename() { + return currentBinlogFilename; + } + + long getCurrentBinlogPosition() { + return currentBinlogPosition; + } + + long getBinlogTimestampSeconds() { + return binlogTimestampSeconds; + } + + int getCurrentRowNumber() { + return currentRowNumber; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/LegacyV1SourceInfoTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/LegacyV1SourceInfoTest.java new file mode 100644 index 000000000..8054d9f32 --- /dev/null +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/LegacyV1SourceInfoTest.java @@ -0,0 +1,748 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mysql; + +import static org.fest.assertions.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; + +import org.apache.avro.Schema; +import org.apache.kafka.connect.data.Struct; +import org.fest.assertions.GenericAssert; +import org.junit.Before; +import org.junit.Test; + +import io.confluent.connect.avro.AvroData; +import io.debezium.config.CommonConnectorConfig.Version; +import io.debezium.config.Configuration; +import io.debezium.doc.FixFor; +import io.debezium.document.Document; + +public class LegacyV1SourceInfoTest { + + private static int avroSchemaCacheSize = 1000; + private static final AvroData avroData = new AvroData(avroSchemaCacheSize); + private static final String FILENAME = "mysql-bin.00001"; + private static final String GTID_SET = "gtid-set"; // can technically be any string + private static final String SERVER_NAME = "my-server"; // can technically be any string + + private SourceInfo source; + private boolean inTxn = false; + private long positionOfBeginEvent = 0L; + private int eventNumberInTxn = 0; + + @Before + public void beforeEach() { + source = new SourceInfo(new MySqlConnectorConfig(Configuration.create() + .with(MySqlConnectorConfig.SERVER_NAME, "server") + .with(MySqlConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V1) + .build())); + inTxn = false; + positionOfBeginEvent = 0L; + eventNumberInTxn = 0; + } + + @Test + public void shouldStartSourceInfoFromZeroBinlogCoordinates() { + source.setBinlogStartPoint(FILENAME, 0); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(0); + assertThat(source.eventsToSkipUponRestart()).isEqualTo(0); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(0); + assertThat(source.isSnapshotInEffect()).isFalse(); + } + + @Test + public void shouldStartSourceInfoFromNonZeroBinlogCoordinates() { + source.setBinlogStartPoint(FILENAME, 100); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(100); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(0); + assertThat(source.isSnapshotInEffect()).isFalse(); + } + + // ------------------------------------------------------------------------------------- + // Test reading the offset map and recovering the proper SourceInfo state + // ------------------------------------------------------------------------------------- + + @Test + public void shouldRecoverSourceInfoFromOffsetWithZeroBinlogCoordinates() { + sourceWith(offset(0, 0)); + assertThat(source.gtidSet()).isNull(); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(0); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(0); + assertThat(source.isSnapshotInEffect()).isFalse(); + } + + @Test + public void shouldRecoverSourceInfoFromOffsetWithNonZeroBinlogCoordinates() { + sourceWith(offset(100, 0)); + assertThat(source.gtidSet()).isNull(); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(100); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(0); + assertThat(source.isSnapshotInEffect()).isFalse(); + } + + @Test + public void shouldRecoverSourceInfoFromOffsetWithZeroBinlogCoordinatesAndNonZeroRow() { + sourceWith(offset(0, 5)); + assertThat(source.gtidSet()).isNull(); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(0); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(5); + assertThat(source.isSnapshotInEffect()).isFalse(); + } + + @Test + public void shouldRecoverSourceInfoFromOffsetWithNonZeroBinlogCoordinatesAndNonZeroRow() { + sourceWith(offset(100, 5)); + assertThat(source.gtidSet()).isNull(); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(100); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(5); + assertThat(source.isSnapshotInEffect()).isFalse(); + } + + @Test + public void shouldRecoverSourceInfoFromOffsetWithZeroBinlogCoordinatesAndSnapshot() { + sourceWith(offset(0, 0, true)); + assertThat(source.gtidSet()).isNull(); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(0); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(0); + assertThat(source.isSnapshotInEffect()).isTrue(); + } + + @Test + public void shouldRecoverSourceInfoFromOffsetWithNonZeroBinlogCoordinatesAndSnapshot() { + sourceWith(offset(100, 0, true)); + assertThat(source.gtidSet()).isNull(); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(100); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(0); + assertThat(source.isSnapshotInEffect()).isTrue(); + } + + @Test + public void shouldRecoverSourceInfoFromOffsetWithZeroBinlogCoordinatesAndNonZeroRowAndSnapshot() { + sourceWith(offset(0, 5, true)); + assertThat(source.gtidSet()).isNull(); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(0); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(5); + assertThat(source.isSnapshotInEffect()).isTrue(); + } + + @Test + public void shouldRecoverSourceInfoFromOffsetWithNonZeroBinlogCoordinatesAndNonZeroRowAndSnapshot() { + sourceWith(offset(100, 5, true)); + assertThat(source.gtidSet()).isNull(); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(100); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(5); + assertThat(source.isSnapshotInEffect()).isTrue(); + } + + @Test + public void shouldRecoverSourceInfoFromOffsetWithFilterData() { + final String databaseWhitelist = "a,b"; + final String tableWhitelist = "c.foo,d.bar,d.baz"; + Map offset = offset(10, 10); + offset.put(SourceInfo.DATABASE_WHITELIST_KEY, databaseWhitelist); + offset.put(SourceInfo.TABLE_WHITELIST_KEY, tableWhitelist); + + sourceWith(offset); + assertThat(source.hasFilterInfo()).isTrue(); + assertEquals(databaseWhitelist, source.getDatabaseWhitelist()); + assertEquals(tableWhitelist, source.getTableWhitelist()); + // confirm other filter info is null + assertThat(source.getDatabaseBlacklist()).isNull(); + assertThat(source.getTableBlacklist()).isNull(); + } + + @Test + public void setOffsetFilterFromFilter() { + final String databaseBlacklist = "a,b"; + final String tableBlacklist = "c.foo, d.bar, d.baz"; + Map offset = offset(10, 10); + + sourceWith(offset); + assertThat(!source.hasFilterInfo()); + + final Configuration configuration = Configuration.create() + .with(MySqlConnectorConfig.DATABASE_BLACKLIST, databaseBlacklist) + .with(MySqlConnectorConfig.TABLE_BLACKLIST, tableBlacklist) + .build(); + source.setFilterDataFromConfig(configuration); + + assertThat(source.hasFilterInfo()).isTrue(); + assertEquals(databaseBlacklist, source.getDatabaseBlacklist()); + assertEquals(tableBlacklist, source.getTableBlacklist()); + } + + @Test + public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoordinates() { + sourceWith(offset(GTID_SET, 0, 0, false)); + assertThat(source.gtidSet()).isEqualTo(GTID_SET); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(0); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(0); + assertThat(source.isSnapshotInEffect()).isFalse(); + } + + @Test + public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoordinatesAndNonZeroRow() { + sourceWith(offset(GTID_SET, 0, 5, false)); + assertThat(source.gtidSet()).isEqualTo(GTID_SET); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(0); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(5); + assertThat(source.isSnapshotInEffect()).isFalse(); + } + + @Test + public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndNonZeroBinlogCoordinates() { + sourceWith(offset(GTID_SET, 100, 0, false)); + assertThat(source.gtidSet()).isEqualTo(GTID_SET); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(100); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(0); + assertThat(source.isSnapshotInEffect()).isFalse(); + } + + @Test + public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndNonZeroBinlogCoordinatesAndNonZeroRow() { + sourceWith(offset(GTID_SET, 100, 5, false)); + assertThat(source.gtidSet()).isEqualTo(GTID_SET); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(100); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(5); + assertThat(source.isSnapshotInEffect()).isFalse(); + } + + @Test + public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoordinatesAndSnapshot() { + sourceWith(offset(GTID_SET, 0, 0, true)); + assertThat(source.gtidSet()).isEqualTo(GTID_SET); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(0); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(0); + assertThat(source.isSnapshotInEffect()).isTrue(); + } + + @Test + public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoordinatesAndNonZeroRowAndSnapshot() { + sourceWith(offset(GTID_SET, 0, 5, true)); + assertThat(source.gtidSet()).isEqualTo(GTID_SET); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(0); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(5); + assertThat(source.isSnapshotInEffect()).isTrue(); + } + + @Test + public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndNonZeroBinlogCoordinatesAndSnapshot() { + sourceWith(offset(GTID_SET, 100, 0, true)); + assertThat(source.gtidSet()).isEqualTo(GTID_SET); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(100); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(0); + assertThat(source.isSnapshotInEffect()).isTrue(); + } + + @Test + public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndNonZeroBinlogCoordinatesAndNonZeroRowAndSnapshot() { + sourceWith(offset(GTID_SET, 100, 5, true)); + assertThat(source.gtidSet()).isEqualTo(GTID_SET); + assertThat(source.binlogFilename()).isEqualTo(FILENAME); + assertThat(source.binlogPosition()).isEqualTo(100); + assertThat(source.rowsToSkipUponRestart()).isEqualTo(5); + assertThat(source.isSnapshotInEffect()).isTrue(); + } + + // ------------------------------------------------------------------------------------- + // Test advancing SourceInfo state (similar to how the BinlogReader uses it) + // ------------------------------------------------------------------------------------- + + @Test + public void shouldAdvanceSourceInfoFromNonZeroPositionAndRowZeroForEventsWithOneRow() { + sourceWith(offset(100, 0)); + + // Try a transactions with just one event ... + handleTransactionBegin(150, 2); + handleNextEvent(200, 10, withRowCount(1)); + handleTransactionCommit(210, 2); + + handleTransactionBegin(210, 2); + handleNextEvent(220, 10, withRowCount(1)); + handleTransactionCommit(230, 3); + + handleTransactionBegin(240, 2); + handleNextEvent(250, 50, withRowCount(1)); + handleTransactionCommit(300, 4); + + // Try a transactions with multiple events ... + handleTransactionBegin(340, 2); + handleNextEvent(350, 20, withRowCount(1)); + handleNextEvent(370, 30, withRowCount(1)); + handleNextEvent(400, 40, withRowCount(1)); + handleTransactionCommit(440, 4); + + handleTransactionBegin(500, 2); + handleNextEvent(510, 20, withRowCount(1)); + handleNextEvent(540, 15, withRowCount(1)); + handleNextEvent(560, 10, withRowCount(1)); + handleTransactionCommit(580, 4); + + // Try another single event transaction ... + handleTransactionBegin(600, 2); + handleNextEvent(610, 50, withRowCount(1)); + handleTransactionCommit(660, 4); + + // Try event outside of a transaction ... + handleNextEvent(670, 10, withRowCount(1)); + + // Try another single event transaction ... + handleTransactionBegin(700, 2); + handleNextEvent(710, 50, withRowCount(1)); + handleTransactionCommit(760, 4); + } + + @Test + public void shouldAdvanceSourceInfoFromNonZeroPositionAndRowZeroForEventsWithMultipleRow() { + sourceWith(offset(100, 0)); + + // Try a transactions with just one event ... + handleTransactionBegin(150, 2); + handleNextEvent(200, 10, withRowCount(3)); + handleTransactionCommit(210, 2); + + handleTransactionBegin(210, 2); + handleNextEvent(220, 10, withRowCount(4)); + handleTransactionCommit(230, 3); + + handleTransactionBegin(240, 2); + handleNextEvent(250, 50, withRowCount(5)); + handleTransactionCommit(300, 4); + + // Try a transactions with multiple events ... + handleTransactionBegin(340, 2); + handleNextEvent(350, 20, withRowCount(6)); + handleNextEvent(370, 30, withRowCount(1)); + handleNextEvent(400, 40, withRowCount(3)); + handleTransactionCommit(440, 4); + + handleTransactionBegin(500, 2); + handleNextEvent(510, 20, withRowCount(8)); + handleNextEvent(540, 15, withRowCount(9)); + handleNextEvent(560, 10, withRowCount(1)); + handleTransactionCommit(580, 4); + + // Try another single event transaction ... + handleTransactionBegin(600, 2); + handleNextEvent(610, 50, withRowCount(1)); + handleTransactionCommit(660, 4); + + // Try event outside of a transaction ... + handleNextEvent(670, 10, withRowCount(5)); + + // Try another single event transaction ... + handleTransactionBegin(700, 2); + handleNextEvent(710, 50, withRowCount(3)); + handleTransactionCommit(760, 4); + } + + // ------------------------------------------------------------------------------------- + // Utility methods + // ------------------------------------------------------------------------------------- + + protected int withRowCount(int rowCount) { + return rowCount; + } + + protected void handleTransactionBegin(long positionOfEvent, int eventSize) { + source.setEventPosition(positionOfEvent, eventSize); + positionOfBeginEvent = positionOfEvent; + source.startNextTransaction(); + inTxn = true; + + assertThat(source.rowsToSkipUponRestart()).isEqualTo(0); + } + + protected void handleTransactionCommit(long positionOfEvent, int eventSize) { + source.setEventPosition(positionOfEvent, eventSize); + source.commitTransaction(); + eventNumberInTxn = 0; + inTxn = false; + + // Verify the offset ... + Map offset = source.offset(); + + // The offset position should be the position of the next event + long position = (Long) offset.get(SourceInfo.BINLOG_POSITION_OFFSET_KEY); + assertThat(position).isEqualTo(positionOfEvent + eventSize); + Long rowsToSkip = (Long) offset.get(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY); + if (rowsToSkip == null) { + rowsToSkip = 0L; + } + assertThat(rowsToSkip).isEqualTo(0); + assertThat(offset.get(SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY)).isNull(); + if (source.gtidSet() != null) { + assertThat(offset.get(SourceInfo.GTID_SET_KEY)).isEqualTo(source.gtidSet()); + } + } + + protected void handleNextEvent(long positionOfEvent, long eventSize, int rowCount) { + if (inTxn) { + ++eventNumberInTxn; + } + source.setEventPosition(positionOfEvent, eventSize); + for (int row = 0; row != rowCount; ++row) { + // Get the offset for this row (always first!) ... + Map offset = source.offsetForRow(row, rowCount); + assertThat(offset.get(SourceInfo.BINLOG_FILENAME_OFFSET_KEY)).isEqualTo(FILENAME); + if (source.gtidSet() != null) { + assertThat(offset.get(SourceInfo.GTID_SET_KEY)).isEqualTo(source.gtidSet()); + } + long position = (Long) offset.get(SourceInfo.BINLOG_POSITION_OFFSET_KEY); + if (inTxn) { + // regardless of the row count, the position is always the txn begin position ... + assertThat(position).isEqualTo(positionOfBeginEvent); + // and the number of the last completed event (the previous one) ... + Long eventsToSkip = (Long) offset.get(SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY); + if (eventsToSkip == null) { + eventsToSkip = 0L; + } + assertThat(eventsToSkip).isEqualTo(eventNumberInTxn - 1); + } else { + // Matches the next event ... + assertThat(position).isEqualTo(positionOfEvent + eventSize); + assertThat(offset.get(SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY)).isNull(); + } + Long rowsToSkip = (Long) offset.get(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY); + if (rowsToSkip == null) { + rowsToSkip = 0L; + } + if ((row + 1) == rowCount) { + // This is the last row, so the next binlog position should be the number of rows in the event ... + assertThat(rowsToSkip).isEqualTo(rowCount); + } else { + // This is not the last row, so the next binlog position should be the row number ... + assertThat(rowsToSkip).isEqualTo(row + 1); + } + // Get the source struct for this row (always second), which should always reflect this row in this event ... + Struct recordSource = source.struct(); + assertThat(recordSource.getInt64(SourceInfo.BINLOG_POSITION_OFFSET_KEY)).isEqualTo(positionOfEvent); + assertThat(recordSource.getInt32(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY)).isEqualTo(row); + assertThat(recordSource.getString(SourceInfo.BINLOG_FILENAME_OFFSET_KEY)).isEqualTo(FILENAME); + if (source.gtidSet() != null) { + assertThat(recordSource.getString(SourceInfo.GTID_SET_KEY)).isEqualTo(source.gtidSet()); + } + } + source.completeEvent(); + } + + protected Map offset(long position, int row) { + return offset(null, position, row, false); + } + + protected Map offset(long position, int row, boolean snapshot) { + return offset(null, position, row, snapshot); + } + + protected Map offset(String gtidSet, long position, int row, boolean snapshot) { + Map offset = new HashMap<>(); + offset.put(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, FILENAME); + offset.put(SourceInfo.BINLOG_POSITION_OFFSET_KEY, Long.toString(position)); + offset.put(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, Integer.toString(row)); + if (gtidSet != null) { + offset.put(SourceInfo.GTID_SET_KEY, gtidSet); + } + if (snapshot) { + offset.put(SourceInfo.SNAPSHOT_KEY, Boolean.TRUE.toString()); + } + return offset; + } + + protected SourceInfo sourceWith(Map offset) { + source = new SourceInfo(new MySqlConnectorConfig(Configuration.create() + .with(MySqlConnectorConfig.SERVER_NAME, SERVER_NAME) + .with(MySqlConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V1) + .build())); + source.setOffset(offset); + return source; + } + + /** + * When we want to consume SinkRecord which generated by debezium-connector-mysql, it should not + * throw error "org.apache.avro.SchemaParseException: Illegal character in: server-id" + */ + @Test + public void shouldValidateSourceInfoSchema() { + org.apache.kafka.connect.data.Schema kafkaSchema = source.schema(); + Schema avroSchema = avroData.fromConnectSchema(kafkaSchema); + assertTrue(avroSchema != null); + } + + @Test + public void shouldConsiderPositionsWithSameGtidSetsAsSame() { + assertPositionWithGtids("IdA:1-5").isAtOrBefore(positionWithGtids("IdA:1-5")); // same, single + assertPositionWithGtids("IdA:1-5,IdB:1-20").isAtOrBefore(positionWithGtids("IdA:1-5,IdB:1-20")); // same, multiple + assertPositionWithGtids("IdA:1-5,IdB:1-20").isAtOrBefore(positionWithGtids("IdB:1-20,IdA:1-5")); // equivalent + } + + @Test + public void shouldConsiderPositionsWithSameGtidSetsAndSnapshotAsSame() { + assertPositionWithGtids("IdA:1-5", true).isAtOrBefore(positionWithGtids("IdA:1-5", true)); // same, single + assertPositionWithGtids("IdA:1-5,IdB:1-20", true).isAtOrBefore(positionWithGtids("IdA:1-5,IdB:1-20", true)); // same, + // multiple + assertPositionWithGtids("IdA:1-5,IdB:1-20", true).isAtOrBefore(positionWithGtids("IdB:1-20,IdA:1-5", true)); // equivalent + } + + @Test + public void shouldOrderPositionWithGtidAndSnapshotBeforePositionWithSameGtidButNoSnapshot() { + assertPositionWithGtids("IdA:1-5", true).isAtOrBefore(positionWithGtids("IdA:1-5")); // same, single + assertPositionWithGtids("IdA:1-5,IdB:1-20", true).isAtOrBefore(positionWithGtids("IdA:1-5,IdB:1-20")); // same, multiple + assertPositionWithGtids("IdA:1-5,IdB:1-20", true).isAtOrBefore(positionWithGtids("IdB:1-20,IdA:1-5")); // equivalent + } + + @Test + public void shouldOrderPositionWithoutGtidAndSnapshotAfterPositionWithSameGtidAndSnapshot() { + assertPositionWithGtids("IdA:1-5", false).isAfter(positionWithGtids("IdA:1-5", true)); // same, single + assertPositionWithGtids("IdA:1-5,IdB:1-20", false).isAfter(positionWithGtids("IdA:1-5,IdB:1-20", true)); // same, multiple + assertPositionWithGtids("IdA:1-5,IdB:1-20", false).isAfter(positionWithGtids("IdB:1-20,IdA:1-5", true)); // equivalent + } + + @Test + public void shouldOrderPositionWithGtidsAsBeforePositionWithExtraServerUuidInGtids() { + assertPositionWithGtids("IdA:1-5").isBefore(positionWithGtids("IdA:1-5,IdB:1-20")); + } + + @Test + public void shouldOrderPositionsWithSameServerButLowerUpperLimitAsBeforePositionWithSameServerUuidInGtids() { + assertPositionWithGtids("IdA:1-5").isBefore(positionWithGtids("IdA:1-6")); + assertPositionWithGtids("IdA:1-5:7-9").isBefore(positionWithGtids("IdA:1-10")); + assertPositionWithGtids("IdA:2-5:8-9").isBefore(positionWithGtids("IdA:1-10")); + } + + @Test + public void shouldOrderPositionWithoutGtidAsBeforePositionWithGtid() { + assertPositionWithoutGtids("filename.01", Integer.MAX_VALUE, 0, 0).isBefore(positionWithGtids("IdA:1-5")); + } + + @Test + public void shouldOrderPositionWithGtidAsAfterPositionWithoutGtid() { + assertPositionWithGtids("IdA:1-5").isAfter(positionWithoutGtids("filename.01", 0, 0, 0)); + } + + @Test + public void shouldComparePositionsWithoutGtids() { + // Same position ... + assertPositionWithoutGtids("fn.01", 1, 0, 0).isAt(positionWithoutGtids("fn.01", 1, 0, 0)); + assertPositionWithoutGtids("fn.01", 1, 0, 1).isAt(positionWithoutGtids("fn.01", 1, 0, 1)); + assertPositionWithoutGtids("fn.03", 1, 0, 1).isAt(positionWithoutGtids("fn.03", 1, 0, 1)); + assertPositionWithoutGtids("fn.01", 1, 1, 0).isAt(positionWithoutGtids("fn.01", 1, 1, 0)); + assertPositionWithoutGtids("fn.01", 1, 1, 1).isAt(positionWithoutGtids("fn.01", 1, 1, 1)); + assertPositionWithoutGtids("fn.03", 1, 1, 1).isAt(positionWithoutGtids("fn.03", 1, 1, 1)); + + // Before position ... + assertPositionWithoutGtids("fn.01", 1, 0, 0).isBefore(positionWithoutGtids("fn.01", 1, 0, 1)); + assertPositionWithoutGtids("fn.01", 1, 0, 0).isBefore(positionWithoutGtids("fn.01", 2, 0, 0)); + assertPositionWithoutGtids("fn.01", 1, 0, 1).isBefore(positionWithoutGtids("fn.01", 1, 0, 2)); + assertPositionWithoutGtids("fn.01", 1, 0, 1).isBefore(positionWithoutGtids("fn.01", 2, 0, 0)); + assertPositionWithoutGtids("fn.01", 1, 1, 0).isBefore(positionWithoutGtids("fn.01", 1, 1, 1)); + assertPositionWithoutGtids("fn.01", 1, 1, 0).isBefore(positionWithoutGtids("fn.01", 1, 2, 0)); + assertPositionWithoutGtids("fn.01", 1, 1, 1).isBefore(positionWithoutGtids("fn.01", 1, 2, 0)); + assertPositionWithoutGtids("fn.01", 1, 1, 1).isBefore(positionWithoutGtids("fn.01", 2, 0, 0)); + + // After position ... + assertPositionWithoutGtids("fn.01", 1, 0, 1).isAfter(positionWithoutGtids("fn.01", 0, 0, 99)); + assertPositionWithoutGtids("fn.01", 1, 0, 1).isAfter(positionWithoutGtids("fn.01", 1, 0, 0)); + assertPositionWithoutGtids("fn.01", 1, 1, 1).isAfter(positionWithoutGtids("fn.01", 0, 0, 99)); + assertPositionWithoutGtids("fn.01", 1, 1, 1).isAfter(positionWithoutGtids("fn.01", 1, 0, 0)); + assertPositionWithoutGtids("fn.01", 1, 1, 1).isAfter(positionWithoutGtids("fn.01", 1, 1, 0)); + } + + @Test + public void shouldComparePositionsWithDifferentFields() { + Document history = positionWith("mysql-bin.000008", 380941551, "01261278-6ade-11e6-b36a-42010af00790:1-378422946," + + "4d1a4918-44ba-11e6-bf12-42010af0040b:1-11002284," + + "716ec46f-d522-11e5-bb56-0242ac110004:1-34673215," + + "96c2072e-e428-11e6-9590-42010a28002d:1-3," + + "c627b2bc-9647-11e6-a886-42010af0044a:1-9541144", 0, 0, true); + Document current = positionWith("mysql-bin.000016", 645115324, "01261278-6ade-11e6-b36a-42010af00790:1-400944168," + + "30efb117-e42a-11e6-ba9e-42010a28002e:1-9," + + "4d1a4918-44ba-11e6-bf12-42010af0040b:1-11604379," + + "621dc2f6-803b-11e6-acc1-42010af000a4:1-7963838," + + "716ec46f-d522-11e5-bb56-0242ac110004:1-35850702," + + "c627b2bc-9647-11e6-a886-42010af0044a:1-10426868," + + "d079cbb3-750f-11e6-954e-42010af00c28:1-11544291:11544293-11885648", 2, 1, false); + assertThatDocument(current).isAfter(history); + Set excludes = Collections.singleton("96c2072e-e428-11e6-9590-42010a28002d"); + assertThatDocument(history).isAtOrBefore(current, (uuid) -> !excludes.contains(uuid)); + } + + @FixFor("DBZ-107") + @Test + public void shouldRemoveNewlinesFromGtidSet() { + String gtidExecuted = "036d85a9-64e5-11e6-9b48-42010af0000c:1-2,\n" + + "7145bf69-d1ca-11e5-a588-0242ac110004:1-3149,\n" + + "7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-39"; + String gtidCleaned = "036d85a9-64e5-11e6-9b48-42010af0000c:1-2," + + "7145bf69-d1ca-11e5-a588-0242ac110004:1-3149," + + "7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-39"; + source.setCompletedGtidSet(gtidExecuted); + assertThat(source.gtidSet()).isEqualTo(gtidCleaned); + } + + @FixFor("DBZ-107") + @Test + public void shouldNotSetBlankGtidSet() { + source.setCompletedGtidSet(""); + assertThat(source.gtidSet()).isNull(); + } + + @FixFor("DBZ-107") + @Test + public void shouldNotSetNullGtidSet() { + source.setCompletedGtidSet(null); + assertThat(source.gtidSet()).isNull(); + } + + @Test + public void shouldHaveTimestamp() { + sourceWith(offset(100, 5, true)); + source.setBinlogTimestampSeconds(1_024); + assertThat(source.struct().get("ts_sec")).isEqualTo(1_024L); + } + + @Test + public void versionIsPresent() { + sourceWith(offset(100, 5, true)); + assertThat(source.struct().getString(SourceInfo.DEBEZIUM_VERSION_KEY)).isEqualTo(Module.version()); + } + + @Test + public void connectorIsPresent() { + sourceWith(offset(100, 5, true)); + assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name()); + } + + protected Document positionWithGtids(String gtids) { + return positionWithGtids(gtids, false); + } + + protected Document positionWithGtids(String gtids, boolean snapshot) { + if (snapshot) { + return Document.create(SourceInfo.GTID_SET_KEY, gtids, SourceInfo.SNAPSHOT_KEY, true); + } + return Document.create(SourceInfo.GTID_SET_KEY, gtids); + } + + protected Document positionWithoutGtids(String filename, int position, int event, int row) { + return positionWithoutGtids(filename, position, event, row, false); + } + + protected Document positionWithoutGtids(String filename, int position, int event, int row, boolean snapshot) { + return positionWith(filename, position, null, event, row, snapshot); + } + + protected Document positionWith(String filename, int position, String gtids, int event, int row, boolean snapshot) { + Document pos = Document.create(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, filename, + SourceInfo.BINLOG_POSITION_OFFSET_KEY, position); + if (row >= 0) { + pos = pos.set(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, row); + } + if (event >= 0) { + pos = pos.set(SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY, event); + } + if (gtids != null && gtids.trim().length() != 0) { + pos = pos.set(SourceInfo.GTID_SET_KEY, gtids); + } + if (snapshot) { + pos = pos.set(SourceInfo.SNAPSHOT_KEY, true); + } + return pos; + } + + protected PositionAssert assertThatDocument(Document position) { + return new PositionAssert(position); + } + + protected PositionAssert assertPositionWithGtids(String gtids) { + return assertThatDocument(positionWithGtids(gtids)); + } + + protected PositionAssert assertPositionWithGtids(String gtids, boolean snapshot) { + return assertThatDocument(positionWithGtids(gtids, snapshot)); + } + + protected PositionAssert assertPositionWithoutGtids(String filename, int position, int event, int row) { + return assertPositionWithoutGtids(filename, position, event, row, false); + } + + protected PositionAssert assertPositionWithoutGtids(String filename, int position, int event, int row, boolean snapshot) { + return assertThatDocument(positionWithoutGtids(filename, position, event, row, snapshot)); + } + + protected static class PositionAssert extends GenericAssert { + public PositionAssert(Document position) { + super(PositionAssert.class, position); + } + + public PositionAssert isAt(Document otherPosition) { + return isAt(otherPosition, null); + } + + public PositionAssert isAt(Document otherPosition, Predicate gtidFilter) { + if (SourceInfo.isPositionAtOrBefore(actual, otherPosition, gtidFilter)) { + return this; + } + failIfCustomMessageIsSet(); + throw failure(actual + " should be consider same position as " + otherPosition); + } + + public PositionAssert isBefore(Document otherPosition) { + return isBefore(otherPosition, null); + } + + public PositionAssert isBefore(Document otherPosition, Predicate gtidFilter) { + return isAtOrBefore(otherPosition, gtidFilter); + } + + public PositionAssert isAtOrBefore(Document otherPosition) { + return isAtOrBefore(otherPosition, null); + } + + public PositionAssert isAtOrBefore(Document otherPosition, Predicate gtidFilter) { + if (SourceInfo.isPositionAtOrBefore(actual, otherPosition, gtidFilter)) { + return this; + } + failIfCustomMessageIsSet(); + throw failure(actual + " should be consider same position as or before " + otherPosition); + } + + public PositionAssert isAfter(Document otherPosition) { + return isAfter(otherPosition, null); + } + + public PositionAssert isAfter(Document otherPosition, Predicate gtidFilter) { + if (!SourceInfo.isPositionAtOrBefore(actual, otherPosition, gtidFilter)) { + return this; + } + failIfCustomMessageIsSet(); + throw failure(actual + " should be consider after " + otherPosition); + } + } +} diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java index d96185ca8..0231d7066 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java @@ -609,7 +609,9 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio stopConnector(); // Read the last committed offsets, and verify the binlog coordinates ... - SourceInfo persistedOffsetSource = new SourceInfo(config.getString(RelationalDatabaseConnectorConfig.SERVER_NAME)); + SourceInfo persistedOffsetSource = new SourceInfo(new MySqlConnectorConfig(Configuration.create() + .with(MySqlConnectorConfig.SERVER_NAME, config.getString(RelationalDatabaseConnectorConfig.SERVER_NAME)) + .build())); Map lastCommittedOffset = readLastCommittedOffset(config, persistedOffsetSource.partition()); persistedOffsetSource.setOffset(lastCommittedOffset); Testing.print("Position before inserts: " + positionBeforeInserts); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlSchemaTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlSchemaTest.java index ee09ccddb..c3cfe9ba3 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlSchemaTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlSchemaTest.java @@ -16,6 +16,7 @@ import org.junit.Before; import org.junit.Test; +import io.debezium.config.Configuration; import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.relational.TableSchema; @@ -42,7 +43,9 @@ public void beforeEach() { Testing.Files.delete(TEST_FILE_PATH); build = new Configurator(); mysql = null; - source = new SourceInfo("server"); + source = new SourceInfo(new MySqlConnectorConfig(Configuration.create() + .with(MySqlConnectorConfig.SERVER_NAME, "server") + .build())); } @After diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SourceInfoTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SourceInfoTest.java index d4a3cdd03..3b9b27b36 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SourceInfoTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SourceInfoTest.java @@ -42,7 +42,9 @@ public class SourceInfoTest { @Before public void beforeEach() { - source = new SourceInfo("server"); + source = new SourceInfo(new MySqlConnectorConfig(Configuration.create() + .with(MySqlConnectorConfig.SERVER_NAME, "server") + .build())); inTxn = false; positionOfBeginEvent = 0L; eventNumberInTxn = 0; @@ -473,7 +475,9 @@ protected Map offset(String gtidSet, long position, int row, boo } protected SourceInfo sourceWith(Map offset) { - source = new SourceInfo(SERVER_NAME); + source = new SourceInfo(new MySqlConnectorConfig(Configuration.create() + .with(MySqlConnectorConfig.SERVER_NAME, SERVER_NAME) + .build())); source.setOffset(offset); return source; } @@ -484,7 +488,7 @@ protected SourceInfo sourceWith(Map offset) { */ @Test public void shouldValidateSourceInfoSchema() { - org.apache.kafka.connect.data.Schema kafkaSchema = SourceInfo.SCHEMA; + org.apache.kafka.connect.data.Schema kafkaSchema = source.schema(); Schema avroSchema = avroData.fromConnectSchema(kafkaSchema); assertTrue(avroSchema != null); } @@ -614,6 +618,13 @@ public void shouldNotSetNullGtidSet() { assertThat(source.gtidSet()).isNull(); } + @Test + public void shouldHaveTimestamp() { + sourceWith(offset(100, 5, true)); + source.setBinlogTimestampSeconds(1_024); + assertThat(source.struct().get("ts_ms")).isEqualTo(1_024_000L); + } + @Test public void versionIsPresent() { sourceWith(offset(100, 5, true));