DBZ-1235 Timestamp in ms for MySQL connector

Jiri Pechanec 2019-05-23 14:32:07 +02:00 committed by Gunnar Morling
parent ea63dbf291
commit 0ec2d651fe
13 changed files with 979 additions and 63 deletions

View File

@@ -12,11 +12,9 @@
import java.io.Serializable;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.UnrecoverableKeyException;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.KeyManager;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.BitSet;
@@ -30,6 +28,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

View File

@@ -0,0 +1,70 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.AbstractSourceInfoStructMaker;
public class LegacyV1MySqlSourceInfoStructMaker extends AbstractSourceInfoStructMaker<SourceInfo> {
private final Schema schema;
public LegacyV1MySqlSourceInfoStructMaker(String connector, String version, CommonConnectorConfig connectorConfig) {
super(connector, version, connectorConfig);
schema = commonSchemaBuilder()
.name("io.debezium.connector.mysql.Source")
.field(SourceInfo.SERVER_ID_KEY, Schema.INT64_SCHEMA)
.field(SourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA)
.field(SourceInfo.GTID_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.BINLOG_POSITION_OFFSET_KEY, Schema.INT64_SCHEMA)
.field(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, Schema.INT32_SCHEMA)
.field(SourceInfo.SNAPSHOT_KEY, SchemaBuilder.bool().optional().defaultValue(false).build())
.field(SourceInfo.THREAD_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(SourceInfo.DB_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.TABLE_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.QUERY_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.build();
}
@Override
public Schema schema() {
return schema;
}
@Override
public Struct struct(SourceInfo sourceInfo) {
Struct result = commonStruct();
result.put(SourceInfo.SERVER_ID_KEY, sourceInfo.getServerId());
if (sourceInfo.getCurrentGtid() != null) {
// Don't put the GTID Set into the struct; only the current GTID is fine ...
result.put(SourceInfo.GTID_KEY, sourceInfo.getCurrentGtid());
}
result.put(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, sourceInfo.getCurrentBinlogFilename());
result.put(SourceInfo.BINLOG_POSITION_OFFSET_KEY, sourceInfo.getCurrentBinlogPosition());
result.put(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, sourceInfo.getCurrentRowNumber());
result.put(SourceInfo.TIMESTAMP_KEY, sourceInfo.getBinlogTimestampSeconds());
if (sourceInfo.isLastSnapshot()) {
// if the snapshot is COMPLETED, then this will not happen.
result.put(SourceInfo.SNAPSHOT_KEY, true);
}
if (sourceInfo.getThreadId() >= 0) {
result.put(SourceInfo.THREAD_KEY, sourceInfo.getThreadId());
}
if (sourceInfo.getTableId() != null) {
result.put(SourceInfo.DB_NAME_KEY, sourceInfo.getTableId().catalog());
result.put(SourceInfo.TABLE_NAME_KEY, sourceInfo.getTableId().table());
}
if (sourceInfo.getQuery() != null) {
result.put(SourceInfo.QUERY_KEY, sourceInfo.getQuery());
}
return result;
}
}

View File

@@ -19,6 +19,8 @@
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.config.Field.ValidationOutput;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
import io.debezium.jdbc.TemporalPrecisionMode;
@@ -1175,4 +1177,14 @@ private static int randomServerId() {
int highestServerId = 6400;
return lowestServerId + new Random().nextInt(highestServerId - lowestServerId);
}
@Override
protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStructMaker(Version version) {
switch (version) {
case V1:
return new LegacyV1MySqlSourceInfoStructMaker(Module.name(), Module.version(), this);
default:
return new MySqlSourceInfoStructMaker(Module.name(), Module.version(), this);
}
}
}
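
The override above is where the struct-maker version is resolved: V1 keeps the legacy source block, anything else uses the new MySqlSourceInfoStructMaker. A minimal sketch of that selection, assuming a class placed in the io.debezium.connector.mysql package (SourceInfo is package-private) and built against this commit; the class name is hypothetical:

package io.debezium.connector.mysql;

import org.apache.kafka.connect.data.Schema;

import io.debezium.config.CommonConnectorConfig.Version;
import io.debezium.config.Configuration;

// Hypothetical, for illustration only: shows which timestamp field each struct-maker version exposes.
public class SchemaSelectionSketch {
    public static void main(String[] args) {
        // With SOURCE_STRUCT_MAKER_VERSION=V1 the legacy maker is resolved (timestamp field "ts_sec", seconds) ...
        MySqlConnectorConfig legacy = new MySqlConnectorConfig(Configuration.create()
                .with(MySqlConnectorConfig.SERVER_NAME, "server")
                .with(MySqlConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V1)
                .build());

        // ... and without the option the current maker is used (timestamp field "ts_ms", milliseconds).
        MySqlConnectorConfig current = new MySqlConnectorConfig(Configuration.create()
                .with(MySqlConnectorConfig.SERVER_NAME, "server")
                .build());

        // Same call MySqlSchema makes below; both schemas keep the name "io.debezium.connector.mysql.Source".
        Schema legacySchema = legacy.getSourceInfoStructMaker(SourceInfo.class).schema();
        Schema currentSchema = current.getSourceInfoStructMaker(SourceInfo.class).schema();
        System.out.println(legacySchema.field("ts_sec") != null);  // expected: true
        System.out.println(currentSchema.field("ts_ms") != null);  // expected: true
    }
}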

View File

@@ -94,7 +94,7 @@ public MySqlSchema(MySqlConnectorConfig configuration,
TableFilter.fromPredicate(tableFilters.tableFilter()),
tableFilters.columnFilter(),
new TableSchemaBuilder(
getValueConverters(configuration), SchemaNameAdjuster.create(logger), SourceInfo.SCHEMA)
getValueConverters(configuration), SchemaNameAdjuster.create(logger), configuration.getSourceInfoStructMaker(SourceInfo.class).schema())
,
tableIdCaseInsensitive
);

View File

@@ -0,0 +1,71 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.AbstractSourceInfoStructMaker;
public class MySqlSourceInfoStructMaker extends AbstractSourceInfoStructMaker<SourceInfo> {
public static final String TIMESTAMP_KEY = "ts_ms";
private final Schema schema;
public MySqlSourceInfoStructMaker(String connector, String version, CommonConnectorConfig connectorConfig) {
super(connector, version, connectorConfig);
schema = commonSchemaBuilder()
.name("io.debezium.connector.mysql.Source")
.field(SourceInfo.SERVER_ID_KEY, Schema.INT64_SCHEMA)
.field(TIMESTAMP_KEY, Schema.INT64_SCHEMA)
.field(SourceInfo.GTID_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.BINLOG_POSITION_OFFSET_KEY, Schema.INT64_SCHEMA)
.field(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, Schema.INT32_SCHEMA)
.field(SourceInfo.SNAPSHOT_KEY, SchemaBuilder.bool().optional().defaultValue(false).build())
.field(SourceInfo.THREAD_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(SourceInfo.DB_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.TABLE_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.QUERY_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.build();
}
@Override
public Schema schema() {
return schema;
}
@Override
public Struct struct(SourceInfo sourceInfo) {
Struct result = commonStruct();
result.put(SourceInfo.SERVER_ID_KEY, sourceInfo.getServerId());
if (sourceInfo.getCurrentGtid() != null) {
// Don't put the GTID Set into the struct; only the current GTID is fine ...
result.put(SourceInfo.GTID_KEY, sourceInfo.getCurrentGtid());
}
result.put(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, sourceInfo.getCurrentBinlogFilename());
result.put(SourceInfo.BINLOG_POSITION_OFFSET_KEY, sourceInfo.getCurrentBinlogPosition());
result.put(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, sourceInfo.getCurrentRowNumber());
result.put(TIMESTAMP_KEY, sourceInfo.getBinlogTimestampSeconds() * 1_000);
if (sourceInfo.isLastSnapshot()) {
// if the snapshot is COMPLETED, then this will not happen.
result.put(SourceInfo.SNAPSHOT_KEY, true);
}
if (sourceInfo.getThreadId() >= 0) {
result.put(SourceInfo.THREAD_KEY, sourceInfo.getThreadId());
}
if (sourceInfo.getTableId() != null) {
result.put(SourceInfo.DB_NAME_KEY, sourceInfo.getTableId().catalog());
result.put(SourceInfo.TABLE_NAME_KEY, sourceInfo.getTableId().table());
}
if (sourceInfo.getQuery() != null) {
result.put(SourceInfo.QUERY_KEY, sourceInfo.getQuery());
}
return result;
}
}
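
The only behavioural change relative to the legacy maker is the line above that multiplies the binlog timestamp, which MySQL records with one-second precision, by 1 000 and stores it under "ts_ms". A minimal sketch of the resulting values, mirroring the shouldHaveTimestamp tests added later in this commit; it assumes a class in the same io.debezium.connector.mysql package (SourceInfo is package-private) and a hypothetical class name:

package io.debezium.connector.mysql;

import io.debezium.config.CommonConnectorConfig.Version;
import io.debezium.config.Configuration;

// Hypothetical, for illustration only: contrasts the new "ts_ms" value with the legacy V1 "ts_sec" value.
public class TimestampUnitsSketch {
    public static void main(String[] args) {
        // Default struct maker: the source block now carries milliseconds.
        SourceInfo current = new SourceInfo(new MySqlConnectorConfig(Configuration.create()
                .with(MySqlConnectorConfig.SERVER_NAME, "server")
                .build()));
        current.setBinlogStartPoint("mysql-bin.000003", 0);
        current.setBinlogTimestampSeconds(1_024);
        System.out.println(current.struct().get("ts_ms"));   // expected: 1024000

        // Legacy V1 struct maker: the source block keeps seconds under "ts_sec".
        SourceInfo legacy = new SourceInfo(new MySqlConnectorConfig(Configuration.create()
                .with(MySqlConnectorConfig.SERVER_NAME, "server")
                .with(MySqlConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V1)
                .build()));
        legacy.setBinlogStartPoint("mysql-bin.000003", 0);
        legacy.setBinlogTimestampSeconds(1_024);
        System.out.println(legacy.struct().get("ts_sec"));   // expected: 1024
    }
}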

View File

@@ -6,11 +6,11 @@
package io.debezium.connector.mysql;
import io.debezium.relational.SystemVariables;
import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import io.debezium.relational.SystemVariables;
/**
* Custom class for MySQL {@link SystemVariables}, which defines MySQL scopes and constants of used variable names.
*

View File

@@ -68,7 +68,7 @@ public MySqlTaskContext(Configuration config, Filters filters, Boolean tableIdCa
this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig.getLogicalName(), connectorConfig.getHeartbeatTopicsPrefix());
// Set up the source information ...
this.source = new SourceInfo(connectorConfig.getLogicalName());
this.source = new SourceInfo(connectorConfig);
// Set up the GTID filter ...
String gtidSetIncludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_INCLUDES);

View File

@@ -76,7 +76,7 @@ public RecordMakers(MySqlSchema schema, SourceInfo source, TopicSelector<TableId
.build();
this.schemaChangeValueSchema = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector.mysql.SchemaChangeValue"))
.field(Fields.SOURCE, SourceInfo.SCHEMA)
.field(Fields.SOURCE, source.schema())
.field(Fields.DATABASE_NAME, Schema.STRING_SCHEMA)
.field(Fields.DDL_STATEMENTS, Schema.STRING_SCHEMA)
.build();

View File

@@ -10,13 +10,13 @@
import java.util.function.Predicate;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.data.Envelope;
import io.debezium.document.Document;
import io.debezium.relational.TableId;
@@ -117,24 +117,6 @@ final class SourceInfo extends AbstractSourceInfo {
public static final String TABLE_BLACKLIST_KEY = "table_blacklist";
public static final String RESTART_PREFIX = "RESTART_";
/**
* A {@link Schema} definition for a {@link Struct} used to store the {@link #partition()} and {@link #offset()} information.
*/
public static final Schema SCHEMA = schemaBuilder()
.name("io.debezium.connector.mysql.Source")
.field(SERVER_ID_KEY, Schema.INT64_SCHEMA)
.field(TIMESTAMP_KEY, Schema.INT64_SCHEMA)
.field(GTID_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(BINLOG_FILENAME_OFFSET_KEY, Schema.STRING_SCHEMA)
.field(BINLOG_POSITION_OFFSET_KEY, Schema.INT64_SCHEMA)
.field(BINLOG_ROW_IN_EVENT_OFFSET_KEY, Schema.INT32_SCHEMA)
.field(SNAPSHOT_KEY, SchemaBuilder.bool().optional().defaultValue(false).build())
.field(THREAD_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(DB_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(TABLE_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(QUERY_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.build();
private String currentGtidSet;
private String currentGtid;
private String currentBinlogFilename;
@@ -158,10 +140,14 @@ final class SourceInfo extends AbstractSourceInfo {
private String databaseBlacklist;
private String tableWhitelist;
private String tableBlacklist;
private SourceInfoStructMaker<SourceInfo> structMaker;
private TableId tableId;
public SourceInfo(String logicalId) {
super(Module.version(), logicalId);
sourcePartition = Collect.hashMapOf(SERVER_PARTITION_KEY, logicalId);
public SourceInfo(MySqlConnectorConfig connectorConfig) {
super(Module.version(), connectorConfig.getLogicalName());
this.structMaker = connectorConfig.getSourceInfoStructMaker(SourceInfo.class);
sourcePartition = Collect.hashMapOf(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
}
/**
@@ -305,7 +291,7 @@ private Map<String, Object> offsetUsingPosition(long rowsToSkip) {
*/
@Override
public Schema schema() {
return SCHEMA;
return structMaker.schema();
}
@Override
@@ -315,7 +301,7 @@ protected String connector() {
/**
* Get a {@link Struct} representation of the source {@link #partition()} and {@link #offset()} information. The Struct
* complies with the {@link #SCHEMA} for the MySQL connector.
* complies with the versioned source schema for the MySQL connector.
* <p>
* This method should always be called after {@link #offsetForRow(int, int)}.
*
@@ -329,7 +315,7 @@ public Struct struct() {
/**
* Get a {@link Struct} representation of the source {@link #partition()} and {@link #offset()} information. The Struct
* complies with the {@link #SCHEMA} for the MySQL connector.
* complies with the versioned source schema for the MySQL connector.
* <p>
* This method should always be called after {@link #offsetForRow(int, int)}.
*
@@ -338,31 +324,8 @@ public Struct struct() {
* @see #schema()
*/
public Struct struct(TableId tableId) {
Struct result = super.struct();
result.put(SERVER_ID_KEY, serverId);
if (currentGtid != null) {
// Don't put the GTID Set into the struct; only the current GTID is fine ...
result.put(GTID_KEY, currentGtid);
}
result.put(BINLOG_FILENAME_OFFSET_KEY, currentBinlogFilename);
result.put(BINLOG_POSITION_OFFSET_KEY, currentBinlogPosition);
result.put(BINLOG_ROW_IN_EVENT_OFFSET_KEY, currentRowNumber);
result.put(TIMESTAMP_KEY, binlogTimestampSeconds);
if (lastSnapshot) {
// if the snapshot is COMPLETED, then this will not happen.
result.put(SNAPSHOT_KEY, true);
}
if (threadId >= 0) {
result.put(THREAD_KEY, threadId);
}
if (tableId != null) {
result.put(DB_NAME_KEY, tableId.catalog());
result.put(TABLE_NAME_KEY, tableId.table());
}
if (currentQuery != null) {
result.put(QUERY_KEY, currentQuery);
}
return result;
this.tableId = tableId;
return structMaker.struct(this);
}
/**
@@ -658,6 +621,42 @@ public int rowsToSkipUponRestart() {
return restartRowsToSkip;
}
long getServerId() {
return serverId;
}
long getThreadId() {
return threadId;
}
TableId getTableId() {
return tableId;
}
String getCurrentGtid() {
return currentGtid;
}
boolean isLastSnapshot() {
return lastSnapshot;
}
String getCurrentBinlogFilename() {
return currentBinlogFilename;
}
long getCurrentBinlogPosition() {
return currentBinlogPosition;
}
long getBinlogTimestampSeconds() {
return binlogTimestampSeconds;
}
int getCurrentRowNumber() {
return currentRowNumber;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();

View File

@@ -0,0 +1,748 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.avro.Schema;
import org.apache.kafka.connect.data.Struct;
import org.fest.assertions.GenericAssert;
import org.junit.Before;
import org.junit.Test;
import io.confluent.connect.avro.AvroData;
import io.debezium.config.CommonConnectorConfig.Version;
import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.document.Document;
public class LegacyV1SourceInfoTest {
private static int avroSchemaCacheSize = 1000;
private static final AvroData avroData = new AvroData(avroSchemaCacheSize);
private static final String FILENAME = "mysql-bin.00001";
private static final String GTID_SET = "gtid-set"; // can technically be any string
private static final String SERVER_NAME = "my-server"; // can technically be any string
private SourceInfo source;
private boolean inTxn = false;
private long positionOfBeginEvent = 0L;
private int eventNumberInTxn = 0;
@Before
public void beforeEach() {
source = new SourceInfo(new MySqlConnectorConfig(Configuration.create()
.with(MySqlConnectorConfig.SERVER_NAME, "server")
.with(MySqlConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V1)
.build()));
inTxn = false;
positionOfBeginEvent = 0L;
eventNumberInTxn = 0;
}
@Test
public void shouldStartSourceInfoFromZeroBinlogCoordinates() {
source.setBinlogStartPoint(FILENAME, 0);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.eventsToSkipUponRestart()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@Test
public void shouldStartSourceInfoFromNonZeroBinlogCoordinates() {
source.setBinlogStartPoint(FILENAME, 100);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isFalse();
}
// -------------------------------------------------------------------------------------
// Test reading the offset map and recovering the proper SourceInfo state
// -------------------------------------------------------------------------------------
@Test
public void shouldRecoverSourceInfoFromOffsetWithZeroBinlogCoordinates() {
sourceWith(offset(0, 0));
assertThat(source.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@Test
public void shouldRecoverSourceInfoFromOffsetWithNonZeroBinlogCoordinates() {
sourceWith(offset(100, 0));
assertThat(source.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@Test
public void shouldRecoverSourceInfoFromOffsetWithZeroBinlogCoordinatesAndNonZeroRow() {
sourceWith(offset(0, 5));
assertThat(source.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@Test
public void shouldRecoverSourceInfoFromOffsetWithNonZeroBinlogCoordinatesAndNonZeroRow() {
sourceWith(offset(100, 5));
assertThat(source.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@Test
public void shouldRecoverSourceInfoFromOffsetWithZeroBinlogCoordinatesAndSnapshot() {
sourceWith(offset(0, 0, true));
assertThat(source.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isTrue();
}
@Test
public void shouldRecoverSourceInfoFromOffsetWithNonZeroBinlogCoordinatesAndSnapshot() {
sourceWith(offset(100, 0, true));
assertThat(source.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isTrue();
}
@Test
public void shouldRecoverSourceInfoFromOffsetWithZeroBinlogCoordinatesAndNonZeroRowAndSnapshot() {
sourceWith(offset(0, 5, true));
assertThat(source.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(source.isSnapshotInEffect()).isTrue();
}
@Test
public void shouldRecoverSourceInfoFromOffsetWithNonZeroBinlogCoordinatesAndNonZeroRowAndSnapshot() {
sourceWith(offset(100, 5, true));
assertThat(source.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(source.isSnapshotInEffect()).isTrue();
}
@Test
public void shouldRecoverSourceInfoFromOffsetWithFilterData() {
final String databaseWhitelist = "a,b";
final String tableWhitelist = "c.foo,d.bar,d.baz";
Map<String, String> offset = offset(10, 10);
offset.put(SourceInfo.DATABASE_WHITELIST_KEY, databaseWhitelist);
offset.put(SourceInfo.TABLE_WHITELIST_KEY, tableWhitelist);
sourceWith(offset);
assertThat(source.hasFilterInfo()).isTrue();
assertEquals(databaseWhitelist, source.getDatabaseWhitelist());
assertEquals(tableWhitelist, source.getTableWhitelist());
// confirm other filter info is null
assertThat(source.getDatabaseBlacklist()).isNull();
assertThat(source.getTableBlacklist()).isNull();
}
@Test
public void setOffsetFilterFromFilter() {
final String databaseBlacklist = "a,b";
final String tableBlacklist = "c.foo, d.bar, d.baz";
Map<String, String> offset = offset(10, 10);
sourceWith(offset);
assertThat(source.hasFilterInfo()).isFalse();
final Configuration configuration = Configuration.create()
.with(MySqlConnectorConfig.DATABASE_BLACKLIST, databaseBlacklist)
.with(MySqlConnectorConfig.TABLE_BLACKLIST, tableBlacklist)
.build();
source.setFilterDataFromConfig(configuration);
assertThat(source.hasFilterInfo()).isTrue();
assertEquals(databaseBlacklist, source.getDatabaseBlacklist());
assertEquals(tableBlacklist, source.getTableBlacklist());
}
@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoordinates() {
sourceWith(offset(GTID_SET, 0, 0, false));
assertThat(source.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoordinatesAndNonZeroRow() {
sourceWith(offset(GTID_SET, 0, 5, false));
assertThat(source.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndNonZeroBinlogCoordinates() {
sourceWith(offset(GTID_SET, 100, 0, false));
assertThat(source.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndNonZeroBinlogCoordinatesAndNonZeroRow() {
sourceWith(offset(GTID_SET, 100, 5, false));
assertThat(source.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoordinatesAndSnapshot() {
sourceWith(offset(GTID_SET, 0, 0, true));
assertThat(source.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isTrue();
}
@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoordinatesAndNonZeroRowAndSnapshot() {
sourceWith(offset(GTID_SET, 0, 5, true));
assertThat(source.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(source.isSnapshotInEffect()).isTrue();
}
@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndNonZeroBinlogCoordinatesAndSnapshot() {
sourceWith(offset(GTID_SET, 100, 0, true));
assertThat(source.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isTrue();
}
@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndNonZeroBinlogCoordinatesAndNonZeroRowAndSnapshot() {
sourceWith(offset(GTID_SET, 100, 5, true));
assertThat(source.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(source.isSnapshotInEffect()).isTrue();
}
// -------------------------------------------------------------------------------------
// Test advancing SourceInfo state (similar to how the BinlogReader uses it)
// -------------------------------------------------------------------------------------
@Test
public void shouldAdvanceSourceInfoFromNonZeroPositionAndRowZeroForEventsWithOneRow() {
sourceWith(offset(100, 0));
// Try a transaction with just one event ...
handleTransactionBegin(150, 2);
handleNextEvent(200, 10, withRowCount(1));
handleTransactionCommit(210, 2);
handleTransactionBegin(210, 2);
handleNextEvent(220, 10, withRowCount(1));
handleTransactionCommit(230, 3);
handleTransactionBegin(240, 2);
handleNextEvent(250, 50, withRowCount(1));
handleTransactionCommit(300, 4);
// Try a transaction with multiple events ...
handleTransactionBegin(340, 2);
handleNextEvent(350, 20, withRowCount(1));
handleNextEvent(370, 30, withRowCount(1));
handleNextEvent(400, 40, withRowCount(1));
handleTransactionCommit(440, 4);
handleTransactionBegin(500, 2);
handleNextEvent(510, 20, withRowCount(1));
handleNextEvent(540, 15, withRowCount(1));
handleNextEvent(560, 10, withRowCount(1));
handleTransactionCommit(580, 4);
// Try another single event transaction ...
handleTransactionBegin(600, 2);
handleNextEvent(610, 50, withRowCount(1));
handleTransactionCommit(660, 4);
// Try event outside of a transaction ...
handleNextEvent(670, 10, withRowCount(1));
// Try another single event transaction ...
handleTransactionBegin(700, 2);
handleNextEvent(710, 50, withRowCount(1));
handleTransactionCommit(760, 4);
}
@Test
public void shouldAdvanceSourceInfoFromNonZeroPositionAndRowZeroForEventsWithMultipleRow() {
sourceWith(offset(100, 0));
// Try a transaction with just one event ...
handleTransactionBegin(150, 2);
handleNextEvent(200, 10, withRowCount(3));
handleTransactionCommit(210, 2);
handleTransactionBegin(210, 2);
handleNextEvent(220, 10, withRowCount(4));
handleTransactionCommit(230, 3);
handleTransactionBegin(240, 2);
handleNextEvent(250, 50, withRowCount(5));
handleTransactionCommit(300, 4);
// Try a transaction with multiple events ...
handleTransactionBegin(340, 2);
handleNextEvent(350, 20, withRowCount(6));
handleNextEvent(370, 30, withRowCount(1));
handleNextEvent(400, 40, withRowCount(3));
handleTransactionCommit(440, 4);
handleTransactionBegin(500, 2);
handleNextEvent(510, 20, withRowCount(8));
handleNextEvent(540, 15, withRowCount(9));
handleNextEvent(560, 10, withRowCount(1));
handleTransactionCommit(580, 4);
// Try another single event transaction ...
handleTransactionBegin(600, 2);
handleNextEvent(610, 50, withRowCount(1));
handleTransactionCommit(660, 4);
// Try event outside of a transaction ...
handleNextEvent(670, 10, withRowCount(5));
// Try another single event transaction ...
handleTransactionBegin(700, 2);
handleNextEvent(710, 50, withRowCount(3));
handleTransactionCommit(760, 4);
}
// -------------------------------------------------------------------------------------
// Utility methods
// -------------------------------------------------------------------------------------
protected int withRowCount(int rowCount) {
return rowCount;
}
protected void handleTransactionBegin(long positionOfEvent, int eventSize) {
source.setEventPosition(positionOfEvent, eventSize);
positionOfBeginEvent = positionOfEvent;
source.startNextTransaction();
inTxn = true;
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
}
protected void handleTransactionCommit(long positionOfEvent, int eventSize) {
source.setEventPosition(positionOfEvent, eventSize);
source.commitTransaction();
eventNumberInTxn = 0;
inTxn = false;
// Verify the offset ...
Map<String, ?> offset = source.offset();
// The offset position should be the position of the next event
long position = (Long) offset.get(SourceInfo.BINLOG_POSITION_OFFSET_KEY);
assertThat(position).isEqualTo(positionOfEvent + eventSize);
Long rowsToSkip = (Long) offset.get(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY);
if (rowsToSkip == null) {
rowsToSkip = 0L;
}
assertThat(rowsToSkip).isEqualTo(0);
assertThat(offset.get(SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY)).isNull();
if (source.gtidSet() != null) {
assertThat(offset.get(SourceInfo.GTID_SET_KEY)).isEqualTo(source.gtidSet());
}
}
protected void handleNextEvent(long positionOfEvent, long eventSize, int rowCount) {
if (inTxn) {
++eventNumberInTxn;
}
source.setEventPosition(positionOfEvent, eventSize);
for (int row = 0; row != rowCount; ++row) {
// Get the offset for this row (always first!) ...
Map<String, ?> offset = source.offsetForRow(row, rowCount);
assertThat(offset.get(SourceInfo.BINLOG_FILENAME_OFFSET_KEY)).isEqualTo(FILENAME);
if (source.gtidSet() != null) {
assertThat(offset.get(SourceInfo.GTID_SET_KEY)).isEqualTo(source.gtidSet());
}
long position = (Long) offset.get(SourceInfo.BINLOG_POSITION_OFFSET_KEY);
if (inTxn) {
// regardless of the row count, the position is always the txn begin position ...
assertThat(position).isEqualTo(positionOfBeginEvent);
// and the number of the last completed event (the previous one) ...
Long eventsToSkip = (Long) offset.get(SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY);
if (eventsToSkip == null) {
eventsToSkip = 0L;
}
assertThat(eventsToSkip).isEqualTo(eventNumberInTxn - 1);
} else {
// Matches the next event ...
assertThat(position).isEqualTo(positionOfEvent + eventSize);
assertThat(offset.get(SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY)).isNull();
}
Long rowsToSkip = (Long) offset.get(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY);
if (rowsToSkip == null) {
rowsToSkip = 0L;
}
if ((row + 1) == rowCount) {
// This is the last row, so the next binlog position should be the number of rows in the event ...
assertThat(rowsToSkip).isEqualTo(rowCount);
} else {
// This is not the last row, so the next binlog position should be the row number ...
assertThat(rowsToSkip).isEqualTo(row + 1);
}
// Get the source struct for this row (always second), which should always reflect this row in this event ...
Struct recordSource = source.struct();
assertThat(recordSource.getInt64(SourceInfo.BINLOG_POSITION_OFFSET_KEY)).isEqualTo(positionOfEvent);
assertThat(recordSource.getInt32(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY)).isEqualTo(row);
assertThat(recordSource.getString(SourceInfo.BINLOG_FILENAME_OFFSET_KEY)).isEqualTo(FILENAME);
if (source.gtidSet() != null) {
assertThat(recordSource.getString(SourceInfo.GTID_SET_KEY)).isEqualTo(source.gtidSet());
}
}
source.completeEvent();
}
protected Map<String, String> offset(long position, int row) {
return offset(null, position, row, false);
}
protected Map<String, String> offset(long position, int row, boolean snapshot) {
return offset(null, position, row, snapshot);
}
protected Map<String, String> offset(String gtidSet, long position, int row, boolean snapshot) {
Map<String, String> offset = new HashMap<>();
offset.put(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, FILENAME);
offset.put(SourceInfo.BINLOG_POSITION_OFFSET_KEY, Long.toString(position));
offset.put(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, Integer.toString(row));
if (gtidSet != null) {
offset.put(SourceInfo.GTID_SET_KEY, gtidSet);
}
if (snapshot) {
offset.put(SourceInfo.SNAPSHOT_KEY, Boolean.TRUE.toString());
}
return offset;
}
protected SourceInfo sourceWith(Map<String, String> offset) {
source = new SourceInfo(new MySqlConnectorConfig(Configuration.create()
.with(MySqlConnectorConfig.SERVER_NAME, SERVER_NAME)
.with(MySqlConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V1)
.build()));
source.setOffset(offset);
return source;
}
/**
* When we consume a SinkRecord generated by debezium-connector-mysql, it should not
* throw the error "org.apache.avro.SchemaParseException: Illegal character in: server-id".
*/
@Test
public void shouldValidateSourceInfoSchema() {
org.apache.kafka.connect.data.Schema kafkaSchema = source.schema();
Schema avroSchema = avroData.fromConnectSchema(kafkaSchema);
assertTrue(avroSchema != null);
}
@Test
public void shouldConsiderPositionsWithSameGtidSetsAsSame() {
assertPositionWithGtids("IdA:1-5").isAtOrBefore(positionWithGtids("IdA:1-5")); // same, single
assertPositionWithGtids("IdA:1-5,IdB:1-20").isAtOrBefore(positionWithGtids("IdA:1-5,IdB:1-20")); // same, multiple
assertPositionWithGtids("IdA:1-5,IdB:1-20").isAtOrBefore(positionWithGtids("IdB:1-20,IdA:1-5")); // equivalent
}
@Test
public void shouldConsiderPositionsWithSameGtidSetsAndSnapshotAsSame() {
assertPositionWithGtids("IdA:1-5", true).isAtOrBefore(positionWithGtids("IdA:1-5", true)); // same, single
assertPositionWithGtids("IdA:1-5,IdB:1-20", true).isAtOrBefore(positionWithGtids("IdA:1-5,IdB:1-20", true)); // same,
// multiple
assertPositionWithGtids("IdA:1-5,IdB:1-20", true).isAtOrBefore(positionWithGtids("IdB:1-20,IdA:1-5", true)); // equivalent
}
@Test
public void shouldOrderPositionWithGtidAndSnapshotBeforePositionWithSameGtidButNoSnapshot() {
assertPositionWithGtids("IdA:1-5", true).isAtOrBefore(positionWithGtids("IdA:1-5")); // same, single
assertPositionWithGtids("IdA:1-5,IdB:1-20", true).isAtOrBefore(positionWithGtids("IdA:1-5,IdB:1-20")); // same, multiple
assertPositionWithGtids("IdA:1-5,IdB:1-20", true).isAtOrBefore(positionWithGtids("IdB:1-20,IdA:1-5")); // equivalent
}
@Test
public void shouldOrderPositionWithoutGtidAndSnapshotAfterPositionWithSameGtidAndSnapshot() {
assertPositionWithGtids("IdA:1-5", false).isAfter(positionWithGtids("IdA:1-5", true)); // same, single
assertPositionWithGtids("IdA:1-5,IdB:1-20", false).isAfter(positionWithGtids("IdA:1-5,IdB:1-20", true)); // same, multiple
assertPositionWithGtids("IdA:1-5,IdB:1-20", false).isAfter(positionWithGtids("IdB:1-20,IdA:1-5", true)); // equivalent
}
@Test
public void shouldOrderPositionWithGtidsAsBeforePositionWithExtraServerUuidInGtids() {
assertPositionWithGtids("IdA:1-5").isBefore(positionWithGtids("IdA:1-5,IdB:1-20"));
}
@Test
public void shouldOrderPositionsWithSameServerButLowerUpperLimitAsBeforePositionWithSameServerUuidInGtids() {
assertPositionWithGtids("IdA:1-5").isBefore(positionWithGtids("IdA:1-6"));
assertPositionWithGtids("IdA:1-5:7-9").isBefore(positionWithGtids("IdA:1-10"));
assertPositionWithGtids("IdA:2-5:8-9").isBefore(positionWithGtids("IdA:1-10"));
}
@Test
public void shouldOrderPositionWithoutGtidAsBeforePositionWithGtid() {
assertPositionWithoutGtids("filename.01", Integer.MAX_VALUE, 0, 0).isBefore(positionWithGtids("IdA:1-5"));
}
@Test
public void shouldOrderPositionWithGtidAsAfterPositionWithoutGtid() {
assertPositionWithGtids("IdA:1-5").isAfter(positionWithoutGtids("filename.01", 0, 0, 0));
}
@Test
public void shouldComparePositionsWithoutGtids() {
// Same position ...
assertPositionWithoutGtids("fn.01", 1, 0, 0).isAt(positionWithoutGtids("fn.01", 1, 0, 0));
assertPositionWithoutGtids("fn.01", 1, 0, 1).isAt(positionWithoutGtids("fn.01", 1, 0, 1));
assertPositionWithoutGtids("fn.03", 1, 0, 1).isAt(positionWithoutGtids("fn.03", 1, 0, 1));
assertPositionWithoutGtids("fn.01", 1, 1, 0).isAt(positionWithoutGtids("fn.01", 1, 1, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isAt(positionWithoutGtids("fn.01", 1, 1, 1));
assertPositionWithoutGtids("fn.03", 1, 1, 1).isAt(positionWithoutGtids("fn.03", 1, 1, 1));
// Before position ...
assertPositionWithoutGtids("fn.01", 1, 0, 0).isBefore(positionWithoutGtids("fn.01", 1, 0, 1));
assertPositionWithoutGtids("fn.01", 1, 0, 0).isBefore(positionWithoutGtids("fn.01", 2, 0, 0));
assertPositionWithoutGtids("fn.01", 1, 0, 1).isBefore(positionWithoutGtids("fn.01", 1, 0, 2));
assertPositionWithoutGtids("fn.01", 1, 0, 1).isBefore(positionWithoutGtids("fn.01", 2, 0, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 0).isBefore(positionWithoutGtids("fn.01", 1, 1, 1));
assertPositionWithoutGtids("fn.01", 1, 1, 0).isBefore(positionWithoutGtids("fn.01", 1, 2, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isBefore(positionWithoutGtids("fn.01", 1, 2, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isBefore(positionWithoutGtids("fn.01", 2, 0, 0));
// After position ...
assertPositionWithoutGtids("fn.01", 1, 0, 1).isAfter(positionWithoutGtids("fn.01", 0, 0, 99));
assertPositionWithoutGtids("fn.01", 1, 0, 1).isAfter(positionWithoutGtids("fn.01", 1, 0, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isAfter(positionWithoutGtids("fn.01", 0, 0, 99));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isAfter(positionWithoutGtids("fn.01", 1, 0, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isAfter(positionWithoutGtids("fn.01", 1, 1, 0));
}
@Test
public void shouldComparePositionsWithDifferentFields() {
Document history = positionWith("mysql-bin.000008", 380941551, "01261278-6ade-11e6-b36a-42010af00790:1-378422946,"
+ "4d1a4918-44ba-11e6-bf12-42010af0040b:1-11002284,"
+ "716ec46f-d522-11e5-bb56-0242ac110004:1-34673215,"
+ "96c2072e-e428-11e6-9590-42010a28002d:1-3,"
+ "c627b2bc-9647-11e6-a886-42010af0044a:1-9541144", 0, 0, true);
Document current = positionWith("mysql-bin.000016", 645115324, "01261278-6ade-11e6-b36a-42010af00790:1-400944168,"
+ "30efb117-e42a-11e6-ba9e-42010a28002e:1-9,"
+ "4d1a4918-44ba-11e6-bf12-42010af0040b:1-11604379,"
+ "621dc2f6-803b-11e6-acc1-42010af000a4:1-7963838,"
+ "716ec46f-d522-11e5-bb56-0242ac110004:1-35850702,"
+ "c627b2bc-9647-11e6-a886-42010af0044a:1-10426868,"
+ "d079cbb3-750f-11e6-954e-42010af00c28:1-11544291:11544293-11885648", 2, 1, false);
assertThatDocument(current).isAfter(history);
Set<String> excludes = Collections.singleton("96c2072e-e428-11e6-9590-42010a28002d");
assertThatDocument(history).isAtOrBefore(current, (uuid) -> !excludes.contains(uuid));
}
@FixFor("DBZ-107")
@Test
public void shouldRemoveNewlinesFromGtidSet() {
String gtidExecuted = "036d85a9-64e5-11e6-9b48-42010af0000c:1-2,\n" +
"7145bf69-d1ca-11e5-a588-0242ac110004:1-3149,\n" +
"7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-39";
String gtidCleaned = "036d85a9-64e5-11e6-9b48-42010af0000c:1-2," +
"7145bf69-d1ca-11e5-a588-0242ac110004:1-3149," +
"7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-39";
source.setCompletedGtidSet(gtidExecuted);
assertThat(source.gtidSet()).isEqualTo(gtidCleaned);
}
@FixFor("DBZ-107")
@Test
public void shouldNotSetBlankGtidSet() {
source.setCompletedGtidSet("");
assertThat(source.gtidSet()).isNull();
}
@FixFor("DBZ-107")
@Test
public void shouldNotSetNullGtidSet() {
source.setCompletedGtidSet(null);
assertThat(source.gtidSet()).isNull();
}
@Test
public void shouldHaveTimestamp() {
sourceWith(offset(100, 5, true));
source.setBinlogTimestampSeconds(1_024);
assertThat(source.struct().get("ts_sec")).isEqualTo(1_024L);
}
@Test
public void versionIsPresent() {
sourceWith(offset(100, 5, true));
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_VERSION_KEY)).isEqualTo(Module.version());
}
@Test
public void connectorIsPresent() {
sourceWith(offset(100, 5, true));
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name());
}
protected Document positionWithGtids(String gtids) {
return positionWithGtids(gtids, false);
}
protected Document positionWithGtids(String gtids, boolean snapshot) {
if (snapshot) {
return Document.create(SourceInfo.GTID_SET_KEY, gtids, SourceInfo.SNAPSHOT_KEY, true);
}
return Document.create(SourceInfo.GTID_SET_KEY, gtids);
}
protected Document positionWithoutGtids(String filename, int position, int event, int row) {
return positionWithoutGtids(filename, position, event, row, false);
}
protected Document positionWithoutGtids(String filename, int position, int event, int row, boolean snapshot) {
return positionWith(filename, position, null, event, row, snapshot);
}
protected Document positionWith(String filename, int position, String gtids, int event, int row, boolean snapshot) {
Document pos = Document.create(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, filename,
SourceInfo.BINLOG_POSITION_OFFSET_KEY, position);
if (row >= 0) {
pos = pos.set(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, row);
}
if (event >= 0) {
pos = pos.set(SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY, event);
}
if (gtids != null && gtids.trim().length() != 0) {
pos = pos.set(SourceInfo.GTID_SET_KEY, gtids);
}
if (snapshot) {
pos = pos.set(SourceInfo.SNAPSHOT_KEY, true);
}
return pos;
}
protected PositionAssert assertThatDocument(Document position) {
return new PositionAssert(position);
}
protected PositionAssert assertPositionWithGtids(String gtids) {
return assertThatDocument(positionWithGtids(gtids));
}
protected PositionAssert assertPositionWithGtids(String gtids, boolean snapshot) {
return assertThatDocument(positionWithGtids(gtids, snapshot));
}
protected PositionAssert assertPositionWithoutGtids(String filename, int position, int event, int row) {
return assertPositionWithoutGtids(filename, position, event, row, false);
}
protected PositionAssert assertPositionWithoutGtids(String filename, int position, int event, int row, boolean snapshot) {
return assertThatDocument(positionWithoutGtids(filename, position, event, row, snapshot));
}
protected static class PositionAssert extends GenericAssert<PositionAssert, Document> {
public PositionAssert(Document position) {
super(PositionAssert.class, position);
}
public PositionAssert isAt(Document otherPosition) {
return isAt(otherPosition, null);
}
public PositionAssert isAt(Document otherPosition, Predicate<String> gtidFilter) {
if (SourceInfo.isPositionAtOrBefore(actual, otherPosition, gtidFilter)) {
return this;
}
failIfCustomMessageIsSet();
throw failure(actual + " should be consider same position as " + otherPosition);
}
public PositionAssert isBefore(Document otherPosition) {
return isBefore(otherPosition, null);
}
public PositionAssert isBefore(Document otherPosition, Predicate<String> gtidFilter) {
return isAtOrBefore(otherPosition, gtidFilter);
}
public PositionAssert isAtOrBefore(Document otherPosition) {
return isAtOrBefore(otherPosition, null);
}
public PositionAssert isAtOrBefore(Document otherPosition, Predicate<String> gtidFilter) {
if (SourceInfo.isPositionAtOrBefore(actual, otherPosition, gtidFilter)) {
return this;
}
failIfCustomMessageIsSet();
throw failure(actual + " should be consider same position as or before " + otherPosition);
}
public PositionAssert isAfter(Document otherPosition) {
return isAfter(otherPosition, null);
}
public PositionAssert isAfter(Document otherPosition, Predicate<String> gtidFilter) {
if (!SourceInfo.isPositionAtOrBefore(actual, otherPosition, gtidFilter)) {
return this;
}
failIfCustomMessageIsSet();
throw failure(actual + " should be consider after " + otherPosition);
}
}
}
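
Because the source timestamp is renamed as well as rescaled, downstream consumers that read the source block need to switch from "ts_sec" to "ts_ms" unless they pin the V1 struct version as this test does. A hedged sketch of a consumer-side helper that tolerates both shapes; it assumes a Kafka Connect SinkRecord carrying Debezium's usual envelope with a "source" struct, and the helper name is hypothetical:

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical helper: extracts the event timestamp in milliseconds from a Debezium MySQL
// change event, accepting both the new "ts_ms" field and the legacy V1 "ts_sec" field.
public final class SourceTimestamps {
    private SourceTimestamps() {
    }

    public static long sourceTimestampMillis(SinkRecord record) {
        Struct value = (Struct) record.value();
        Struct source = value.getStruct("source");
        if (source.schema().field("ts_ms") != null) {
            return source.getInt64("ts_ms");        // already milliseconds
        }
        return source.getInt64("ts_sec") * 1_000L;  // legacy V1: seconds
    }
}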

View File

@@ -609,7 +609,9 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
stopConnector();
// Read the last committed offsets, and verify the binlog coordinates ...
SourceInfo persistedOffsetSource = new SourceInfo(config.getString(RelationalDatabaseConnectorConfig.SERVER_NAME));
SourceInfo persistedOffsetSource = new SourceInfo(new MySqlConnectorConfig(Configuration.create()
.with(MySqlConnectorConfig.SERVER_NAME, config.getString(RelationalDatabaseConnectorConfig.SERVER_NAME))
.build()));
Map<String, ?> lastCommittedOffset = readLastCommittedOffset(config, persistedOffsetSource.partition());
persistedOffsetSource.setOffset(lastCommittedOffset);
Testing.print("Position before inserts: " + positionBeforeInserts);

View File

@@ -16,6 +16,7 @@
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
@@ -42,7 +43,9 @@ public void beforeEach() {
Testing.Files.delete(TEST_FILE_PATH);
build = new Configurator();
mysql = null;
source = new SourceInfo("server");
source = new SourceInfo(new MySqlConnectorConfig(Configuration.create()
.with(MySqlConnectorConfig.SERVER_NAME, "server")
.build()));
}
@After

View File

@@ -42,7 +42,9 @@ public class SourceInfoTest {
@Before
public void beforeEach() {
source = new SourceInfo("server");
source = new SourceInfo(new MySqlConnectorConfig(Configuration.create()
.with(MySqlConnectorConfig.SERVER_NAME, "server")
.build()));
inTxn = false;
positionOfBeginEvent = 0L;
eventNumberInTxn = 0;
@@ -473,7 +475,9 @@ protected Map<String, String> offset(String gtidSet, long position, int row, boo
}
protected SourceInfo sourceWith(Map<String, String> offset) {
source = new SourceInfo(SERVER_NAME);
source = new SourceInfo(new MySqlConnectorConfig(Configuration.create()
.with(MySqlConnectorConfig.SERVER_NAME, SERVER_NAME)
.build()));
source.setOffset(offset);
return source;
}
@@ -484,7 +488,7 @@ protected SourceInfo sourceWith(Map<String, String> offset) {
*/
@Test
public void shouldValidateSourceInfoSchema() {
org.apache.kafka.connect.data.Schema kafkaSchema = SourceInfo.SCHEMA;
org.apache.kafka.connect.data.Schema kafkaSchema = source.schema();
Schema avroSchema = avroData.fromConnectSchema(kafkaSchema);
assertTrue(avroSchema != null);
}
@@ -614,6 +618,13 @@ public void shouldNotSetNullGtidSet() {
assertThat(source.gtidSet()).isNull();
}
@Test
public void shouldHaveTimestamp() {
sourceWith(offset(100, 5, true));
source.setBinlogTimestampSeconds(1_024);
assertThat(source.struct().get("ts_ms")).isEqualTo(1_024_000L);
}
@Test
public void versionIsPresent() {
sourceWith(offset(100, 5, true));