DBZ-7693 Additional refactor (binlog/mariadb)

This commit is contained in:
Chris Cranford 2024-03-29 23:17:16 -04:00 committed by Jiri Pechanec
parent c20c91111d
commit a3007cc6ab
15 changed files with 194 additions and 293 deletions

View File

@ -0,0 +1,120 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.binlog;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.annotation.Immutable;
import io.debezium.config.Configuration;
import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection;
import io.debezium.connector.common.RelationalBaseSourceConnector;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.TableId;
/**
* Abstract base class for binlog-based connectors.
*
* @author Chris Cranford
*/
public abstract class BinlogConnector<T extends BinlogConnectorConfig> extends RelationalBaseSourceConnector {
private static final Logger LOGGER = LoggerFactory.getLogger(BinlogConnector.class);
@Immutable
private Map<String, String> properties;
@Override
public void start(Map<String, String> props) {
this.properties = Collections.unmodifiableMap(new HashMap<>(props));
}
@Override
public void stop() {
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
if (maxTasks > 1) {
throw new IllegalArgumentException("Only a single connector task may be started");
}
return Collections.singletonList(properties);
}
@Override
protected void validateConnection(Map<String, ConfigValue> configValues, Configuration config) {
ConfigValue hostnameValue = configValues.get(RelationalDatabaseConnectorConfig.HOSTNAME.name());
final T connectorConfig = createConnectorConfig(config);
try (BinlogConnectorConnection connection = createConnection(config, connectorConfig)) {
try {
connection.connect();
connection.execute("SELECT version()");
LOGGER.info("Successfully tested connection for {} with user '{}'",
connection.connectionString(), connection.connectionConfig().username());
}
catch (SQLException e) {
LOGGER.error("Failed testing connection for {} with user '{}'",
connection.connectionString(), connection.connectionConfig().username(), e);
hostnameValue.addErrorMessage("Unable to connect: " + e.getMessage());
}
}
catch (SQLException e) {
LOGGER.error("Unexpected error shutting down the database connection", e);
}
}
@Override
@SuppressWarnings("unchecked")
public List<TableId> getMatchingCollections(Configuration config) {
final T connectorConfig = createConnectorConfig(config);
try (BinlogConnectorConnection connection = createConnection(config, connectorConfig)) {
final List<TableId> tables = new ArrayList<>();
final List<String> databaseNames = connection.availableDatabases();
final RelationalTableFilters tableFilter = connectorConfig.getTableFilters();
for (String databaseName : databaseNames) {
if (!tableFilter.databaseFilter().test(databaseName)) {
continue;
}
tables.addAll(
connection.readTableNames(databaseName, null, null, new String[]{ "TABLE" }).stream()
.filter(tableId -> tableFilter.dataCollectionFilter().isIncluded(tableId))
.collect(Collectors.toList()));
}
return tables;
}
catch (SQLException e) {
throw new DebeziumException(e);
}
}
/**
* Create the connection.
*
* @param config the connector configuration; never null
* @param connectorConfig the connector configuration; never null
* @return the connector connection; never null
*/
protected abstract BinlogConnectorConnection createConnection(Configuration config, T connectorConfig);
/**
* Create the connector configuration.
*
* @param config the configuration; never null
* @return the connector-specific configuration
*/
protected abstract T createConnectorConfig(Configuration config);
}

View File

@ -16,8 +16,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.DefaultValueConverter;
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
import io.debezium.relational.RelationalTableFilters;
@ -48,7 +46,7 @@
*
* @author Chris Cranford
*/
public abstract class BinlogDatabaseSchema<P extends Partition, O extends OffsetContext, V extends ValueConverterProvider, D extends DefaultValueConverter>
public abstract class BinlogDatabaseSchema<P extends BinlogPartition, O extends BinlogOffsetContext<?>, V extends ValueConverterProvider, D extends DefaultValueConverter>
extends HistorizedRelationalDatabaseSchema {
private final static Logger LOGGER = LoggerFactory.getLogger(BinlogDatabaseSchema.class);
@ -277,7 +275,9 @@ public List<SchemaChangeEvent> parseStreamingDdl(P partition, String ddlStatemen
* @param tableIds set of table identifiers
* @param changeTime the time the event happened
*/
protected abstract void handleTableEvent(O offset, String databaseName, Set<TableId> tableIds, Instant changeTime);
protected void handleTableEvent(O offset, String databaseName, Set<TableId> tableIds, Instant changeTime) {
offset.tableEvent(databaseName, tableIds, changeTime);
}
/**
* Update offsets based on a database-specific event.
@ -286,7 +286,9 @@ public List<SchemaChangeEvent> parseStreamingDdl(P partition, String ddlStatemen
* @param databaseName the database name
* @param changeTime the time the event happened
*/
protected abstract void handleDatabaseEvent(O offset, String databaseName, Instant changeTime);
protected void handleDatabaseEvent(O offset, String databaseName, Instant changeTime) {
offset.databaseEvent(databaseName, changeTime);
}
/**
* Discards any currently-cached schemas and rebuild using filters

View File

@ -63,7 +63,7 @@
*
* @author Chris Cranford
*/
public abstract class BinlogSnapshotChangeEventSource<P extends BinlogPartition, O extends BinlogOffsetContext>
public abstract class BinlogSnapshotChangeEventSource<P extends BinlogPartition, O extends BinlogOffsetContext<?>>
extends RelationalSnapshotChangeEventSource<P, O> {
private static final Logger LOGGER = LoggerFactory.getLogger(BinlogSnapshotChangeEventSource.class);

View File

@ -18,7 +18,6 @@
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
@ -50,7 +49,6 @@
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.GtidEventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.RowsQueryEventData;
@ -357,6 +355,10 @@ protected void setGtidChanged(String gtid) {
metrics.onGtidChange(gtid);
}
protected boolean isGtidModeEnabled() {
return isGtidModeEnabled;
}
// todo: perhaps refactor back out to a binary log configurator instance?
protected BinaryLogClient createBinaryLogClient(BinlogTaskContext<?> taskContext,
BinlogConnectorConfig connectorConfig,
@ -516,26 +518,7 @@ protected void onEvent(O offsetContext, Event event) {
metrics.setMilliSecondsBehindSource(ts);
}
// todo: make abstract for MySQL
protected void setEventTimestamp(Event event, long eventTs) {
if (eventTimestamp == null || !isGtidModeEnabled) {
// Fallback to second resolution event timestamps
eventTimestamp = Instant.ofEpochMilli(eventTs);
}
else if (event.getHeader().getEventType() == EventType.GTID) {
// Prefer higher resolution replication timestamps from MySQL 8 GTID events, if possible
GtidEventData gtidEvent = unwrapData(event);
final long gtidEventTs = gtidEvent.getOriginalCommitTimestamp();
if (gtidEventTs != 0) {
// >= MySQL 8.0.1, prefer the higher resolution replication timestamp
eventTimestamp = Instant.EPOCH.plus(gtidEventTs, ChronoUnit.MICROS);
}
else {
// Fallback to second resolution event timestamps
eventTimestamp = Instant.ofEpochMilli(eventTs);
}
}
}
protected abstract void setEventTimestamp(Event event, long eventTs);
protected void ignoreEvent(O offsetContext, Event event) {
LOGGER.trace("Ignoring event due to missing handler: {}", event);

View File

@ -9,6 +9,7 @@
import io.debezium.annotation.VisibleForTesting;
import io.debezium.connector.binlog.BinlogOffsetContext;
import io.debezium.connector.binlog.BinlogSourceInfo;
import io.debezium.connector.binlog.gtid.GtidSet;
import io.debezium.connector.binlog.gtid.GtidSetFactory;
import io.debezium.document.Document;
@ -142,15 +143,19 @@ else if (recordedGtid != null) {
* @param document the document to inspect, should not be null
* @return the global transaction identifier set as a string
*/
protected abstract String getGtidSet(Document document);
protected String getGtidSet(Document document) {
return document.getString(BinlogOffsetContext.GTID_SET_KEY);
}
/**
* Get the server unique identifer.
* Get the server unique identifier.
*
* @param document the document to inspect, should not be null
* @return the unique server identifer
* @return the unique server identifier
*/
protected abstract int getServerId(Document document);
protected int getServerId(Document document) {
return document.getInteger(BinlogSourceInfo.SERVER_ID_KEY, 0);
}
/**
* Get whether the event is part of the connector's snapshot phase.
@ -158,7 +163,9 @@ else if (recordedGtid != null) {
* @param document the document to inspect, should not be null
* @return true if its part of the snapshot, false otherwise
*/
protected abstract boolean isSnapshot(Document document);
protected boolean isSnapshot(Document document) {
return document.has(BinlogSourceInfo.SNAPSHOT_KEY);
}
/**
* Get the timestamp.
@ -166,7 +173,9 @@ else if (recordedGtid != null) {
* @param document the document to inspect, should not be null
* @return the timestamp value
*/
protected abstract long getTimestamp(Document document);
protected long getTimestamp(Document document) {
return document.getLong(BinlogSourceInfo.TIMESTAMP_KEY, 0);
}
/**
* Get the binlog file name.
@ -174,7 +183,9 @@ else if (recordedGtid != null) {
* @param document the document to inspect, should not be null
* @return the binlog file name value
*/
protected abstract BinlogFileName getBinlogFileName(Document document);
protected BinlogFileName getBinlogFileName(Document document) {
return BinlogFileName.of(document.getString(BinlogSourceInfo.BINLOG_FILENAME_OFFSET_KEY));
}
/**
* Get the binlog position.
@ -182,7 +193,9 @@ else if (recordedGtid != null) {
* @param document the document to inspect, should not be null
* @return the binlog position value
*/
protected abstract int getBinlogPosition(Document document);
protected int getBinlogPosition(Document document) {
return document.getInteger(BinlogSourceInfo.BINLOG_POSITION_OFFSET_KEY, -1);
}
/**
* Get the number of events to skip.
@ -190,7 +203,9 @@ else if (recordedGtid != null) {
* @param document the document to inspect, should not be null
* @return the binlog number of events to skip value
*/
protected abstract int getEventsToSkip(Document document);
protected int getEventsToSkip(Document document) {
return document.getInteger(BinlogOffsetContext.EVENTS_TO_SKIP_OFFSET_KEY, 0);
}
/**
* Get the binlog row in event value.
@ -198,7 +213,9 @@ else if (recordedGtid != null) {
* @param document the document to inspect, should not be null
* @return the binlog row in event value
*/
protected abstract int getBinlogRowInEvent(Document document);
protected int getBinlogRowInEvent(Document document) {
return document.getInteger(BinlogSourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, -1);
}
protected static class BinlogFileName implements Comparable<BinlogFileName> {
private final String baseName;

View File

@ -105,6 +105,10 @@ public void afterEach() {
}
}
protected UniqueDatabase getDatabase() {
return DATABASE;
}
/**
* Verifies that the connector doesn't run with an invalid configuration. This does not actually connect to the MySQL server.
*/
@ -126,44 +130,6 @@ public void shouldNotStartWithInvalidConfiguration() {
assertConnectorNotRunning();
}
// todo: move these to MySQL only
// @Test
// public void shouldNotStartWithUnknownJdbcDriver() {
// config = DATABASE.defaultConfig()
// .with(MySqlConnectorConfig.JDBC_DRIVER, "foo.bar")
// .build();
//
// final AtomicBoolean successResult = new AtomicBoolean();
// final AtomicReference<String> message = new AtomicReference<>();
// start(getConnectorClass(), config, (success, msg, error) -> {
// successResult.set(success);
// message.set(msg);
// });
//
// assertThat(successResult.get()).isEqualTo(false);
// assertThat(message.get()).contains("java.lang.ClassNotFoundException: foo.bar");
// assertConnectorNotRunning();
// }
//
// @Test
// public void shouldNotStartWithWrongProtocol() {
// config = DATABASE.defaultConfig()
// .with(MySqlConnectorConfig.JDBC_PROTOCOL, "foo:bar")
// .build();
//
// final AtomicBoolean successResult = new AtomicBoolean();
// final AtomicReference<String> message = new AtomicReference<>();
// start(getConnectorClass(), config, (success, msg, error) -> {
// successResult.set(success);
// message.set(msg);
// });
//
// assertThat(successResult.get()).isEqualTo(false);
// assertThat(message.get()).contains("Unable to obtain a JDBC connection");
// assertConnectorNotRunning();
// }
@Test
public void shouldFailToValidateInvalidConfiguration() {
final Configuration config = Configuration.create()
@ -203,9 +169,6 @@ protected void assertInvalidConfiguration(Config result) {
assertNoConfigurationErrors(result, BinlogConnectorConfig.SCHEMA_HISTORY);
assertNoConfigurationErrors(result, BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES);
assertNoConfigurationErrors(result, BinlogConnectorConfig.SNAPSHOT_MODE);
// todo: push these to MySQL override
// assertNoConfigurationErrors(result, BinlogConnectorConfig.SNAPSHOT_LOCKING_MODE);
// assertNoConfigurationErrors(result, BinlogConnectorConfig.SNAPSHOT_NEW_TABLES);
assertNoConfigurationErrors(result, BinlogConnectorConfig.SSL_MODE);
assertNoConfigurationErrors(result, BinlogConnectorConfig.SSL_KEYSTORE);
assertNoConfigurationErrors(result, BinlogConnectorConfig.SSL_KEYSTORE_PASSWORD);
@ -243,9 +206,6 @@ protected void assertValidConfiguration(Config result) {
validateConfigField(result, BinlogConnectorConfig.SCHEMA_HISTORY, "io.debezium.storage.kafka.history.KafkaSchemaHistory");
validateConfigField(result, BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, Boolean.TRUE);
validateConfigField(result, BinlogConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL);
// todo: push to MySQL variant
// validateConfigField(result, MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE, SnapshotLockingMode.MINIMAL);
// validateConfigField(result, MySqlConnectorConfig.SNAPSHOT_NEW_TABLES, SnapshotNewTables.OFF);
validateConfigField(result, BinlogConnectorConfig.SSL_MODE, SecureConnectionMode.PREFERRED);
validateConfigField(result, BinlogConnectorConfig.SSL_KEYSTORE, null);
validateConfigField(result, BinlogConnectorConfig.SSL_KEYSTORE_PASSWORD, null);

View File

@ -35,11 +35,9 @@
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.binlog.junit.SkipTestDependingOnGtidModeRule;
import io.debezium.connector.binlog.junit.SkipWhenGtidModeIs;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConnection;
@ -54,7 +52,6 @@
import io.debezium.util.Collect;
import io.debezium.util.Testing;
@SkipWhenGtidModeIs(value = SkipWhenGtidModeIs.GtidMode.OFF, reason = "Read only connection requires GTID_MODE to be ON")
public abstract class BinlogReadOnlyIncrementalSnapshotIT<C extends SourceConnector> extends BinlogIncrementalSnapshotIT<C> {
private static KafkaCluster kafka;
@ -62,9 +59,6 @@ public abstract class BinlogReadOnlyIncrementalSnapshotIT<C extends SourceConnec
public static final String EXCLUDED_TABLE = "b";
private final Path signalsFile = Paths.get("src", "test", "resources").resolve("debezium_signaling_file.txt");
@Rule
public TestRule skipTest = new SkipTestDependingOnGtidModeRule();
@Rule
public ConditionalFail conditionalFail = new ConditionalFail();

View File

@ -274,47 +274,6 @@ else if (Objects.equals(previousSnapshotSourceField, "last_in_data_collection"))
assertThat(customer.get("email")).isEqualTo("sally.thomas@acme.com");
}
// todo: move this to MySQL variant only?
// @Test
// @SkipWhenDatabaseIs(value = SkipWhenDatabaseIs.Type.MYSQL, reason = "Only applies to Percona")
// @SkipWhenDatabaseIs(value = SkipWhenDatabaseIs.Type.MARIADB, reason = "Only applies to Percona")
// public void snapshotWithBackupLocksShouldNotWaitForReads() throws Exception {
// config = simpleConfig()
// .with(BinlogConnectorConfig.USER, "cloud")
// .with(BinlogConnectorConfig.PASSWORD, "cloudpass")
// .with(BinlogConnectorConfig.SNAPSHOT_LOCKING_MODE, BinlogConnectorConfig.SnapshotLockingMode.MINIMAL_PERCONA)
// .build();
//
// final BinlogTestConnection db = getTestDatabaseConnection(DATABASE.getDatabaseName());
// final JdbcConnection connection = db.connect();
// final CountDownLatch latch = new CountDownLatch(1);
// Thread t = new Thread() {
// @Override
// public void run() {
// try {
// connection.executeWithoutCommitting("SELECT *, SLEEP(20) FROM products_on_hand WHERE product_id=101");
// latch.countDown();
// }
// catch (Exception e) {
// // Do nothing.
// }
// }
// };
// t.start();
//
// latch.await(10, TimeUnit.SECONDS);
// // Start the connector ...
// start(getConnectorClass(), config);
// waitForSnapshotToBeCompleted(getConnectorName(), DATABASE.getServerName());
//
// // Poll for records ...
// // Testing.Print.enable();
// final int recordCount = 9 + 9 + 4 + 5 + 1;
// SourceRecords sourceRecords = consumeRecordsByTopic(recordCount);
// assertThat(sourceRecords.allRecordsInOrder()).hasSize(recordCount);
// connection.connection().close();
// }
@Test
@FixFor("DBZ-2456")
public void shouldCreateSnapshotSelectively() throws Exception {
@ -543,7 +502,7 @@ public void shouldCreateSnapshotOfSingleDatabaseUsingInsertEvents() throws Excep
assertThat(after.get("c11")).isEqualTo(toMicroSeconds("-PT00H00M00.000000S"));
}
private String productsTableName() throws SQLException {
protected String productsTableName() throws SQLException {
try (BinlogTestConnection db = getTestDatabaseConnection(DATABASE.getDatabaseName())) {
return db.isTableIdCaseSensitive() ? "products" : "Products";
}
@ -897,7 +856,7 @@ public void shouldCreateSnapshotSchemaOnly() throws Exception {
assertThat(heartbeatRecord.sourceOffset().get("snapshot")).isNull();
}
private long toMicroSeconds(String duration) {
protected long toMicroSeconds(String duration) {
return Duration.parse(duration).toNanos() / 1_000;
}
}

View File

@ -237,12 +237,6 @@ public Configuration.Builder defaultJdbcConfigBuilder() {
.with(BinlogConnectorConfig.PASSWORD, "snapperpass");
builder = applyConnectorDefaultJdbcConfiguration(builder);
/*
* .with(MySqlConnectorConfig.JDBC_PROTOCOL, System.getProperty("database.protocol",
* MySqlConnectorConfig.JDBC_PROTOCOL.defaultValueAsString()))
* .with(MySqlConnectorConfig.JDBC_DRIVER, System.getProperty("database.jdbc.driver",
* MySqlConnectorConfig.JDBC_DRIVER.defaultValueAsString()))
*/
String sslMode = System.getProperty("database.ssl.mode", "preferred");

View File

@ -5,29 +5,17 @@
*/
package io.debezium.connector.mariadb;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.annotation.Immutable;
import io.debezium.config.Configuration;
import io.debezium.connector.common.RelationalBaseSourceConnector;
import io.debezium.connector.binlog.BinlogConnector;
import io.debezium.connector.mariadb.jdbc.MariaDbConnection;
import io.debezium.connector.mariadb.jdbc.MariaDbConnectionConfiguration;
import io.debezium.connector.mariadb.jdbc.MariaDbFieldReader;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
/**
* A Debezium source connector that creates tasks and reads changes from MariaDB's binary transaction logs,
@ -35,12 +23,7 @@
*
* @author Chris Cranford
*/
public class MariaDbConnector extends RelationalBaseSourceConnector {
private static final Logger LOGGER = LoggerFactory.getLogger(MariaDbConnector.class);
@Immutable
private Map<String, String> properties;
public class MariaDbConnector extends BinlogConnector<MariaDbConnectorConfig> {
public MariaDbConnector() {
}
@ -50,90 +33,28 @@ public String version() {
return Module.version();
}
@Override
public void start(Map<String, String> properties) {
this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
}
@Override
public Class<? extends Task> taskClass() {
return MariaDbConnectorTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
if (maxTasks > 1) {
throw new IllegalArgumentException("Only a single connector task may be started");
}
return Collections.singletonList(properties);
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return MariaDbConnectorConfig.configDef();
}
@Override
protected void validateConnection(Map<String, ConfigValue> configValues, Configuration config) {
ConfigValue hostnameValue = configValues.get(RelationalDatabaseConnectorConfig.HOSTNAME.name());
try (MariaDbConnection connection = createConnection(config)) {
try {
connection.connect();
connection.execute("SELECT version()");
LOGGER.info("Successfully tested connection for {} with user '{}'",
connection.connectionString(), connection.connectionConfig().username());
}
catch (SQLException e) {
LOGGER.error("Failed testing connection for {} with user '{}'",
connection.connectionString(), connection.connectionConfig().username(), e);
hostnameValue.addErrorMessage("Unable to connect: " + e.getMessage());
}
}
catch (SQLException e) {
LOGGER.error("Unexpected error shutting down the database connection", e);
}
}
@Override
protected Map<String, ConfigValue> validateAllFields(Configuration config) {
return config.validate(MariaDbConnectorConfig.ALL_FIELDS);
}
@Override
@SuppressWarnings("unchecked")
public List<TableId> getMatchingCollections(Configuration config) {
final MariaDbConnectorConfig connectorConfig = createConnectorConfig(config);
try (MariaDbConnection connection = createConnection(config)) {
final List<TableId> tables = new ArrayList<>();
final List<String> databaseNames = connection.availableDatabases();
for (String databaseName : databaseNames) {
if (!connectorConfig.getTableFilters().databaseFilter().test(databaseName)) {
continue;
}
tables.addAll(
connection.readTableNames(databaseName, null, null, new String[]{ "TABLE" }).stream()
.filter(tableId -> connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId))
.collect(Collectors.toList()));
}
return tables;
}
catch (SQLException e) {
throw new DebeziumException(e);
}
protected MariaDbConnection createConnection(Configuration config, MariaDbConnectorConfig connectorConfig) {
return new MariaDbConnection(new MariaDbConnectionConfiguration(config), new MariaDbFieldReader(connectorConfig));
}
private MariaDbConnectorConfig createConnectorConfig(Configuration config) {
@Override
protected MariaDbConnectorConfig createConnectorConfig(Configuration config) {
return new MariaDbConnectorConfig(config);
}
private MariaDbConnection createConnection(Configuration config) {
final MariaDbConnectionConfiguration connectionConfig = new MariaDbConnectionConfiguration(config);
return new MariaDbConnection(connectionConfig, new MariaDbFieldReader(createConnectorConfig(config)));
}
}

View File

@ -176,10 +176,11 @@ public static SnapshotLockingMode parse(String value, String defaultValue) {
.excluding(
BinlogConnectorConfig.GTID_SOURCE_INCLUDES,
BinlogConnectorConfig.GTID_SOURCE_EXCLUDES)
.connector(SNAPSHOT_LOCKING_MODE)
.events(
GTID_SOURCE_INCLUDES,
GTID_SOURCE_EXCLUDES,
SNAPSHOT_LOCKING_MODE)
SOURCE_INFO_STRUCT_MAKER)
.create();
protected static ConfigDef configDef() {

View File

@ -40,7 +40,6 @@
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.SignalProcessor;
import io.debezium.pipeline.signal.channels.KafkaSignalChannel;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaFactory;
@ -274,6 +273,7 @@ protected List<SourceRecord> doPoll() throws InterruptedException {
}
@Override
@SuppressWarnings("unchecked")
protected Long getReadOnlyIncrementalSnapshotSignalOffset(MariaDbOffsetContext previousOffset) {
return ((MariaDbReadOnlyIncrementalSnapshotContext<TableId>) previousOffset.getIncrementalSnapshotContext()).getSignalOffset();
}
@ -292,16 +292,4 @@ private BinlogFieldReader getFieldReader(MariaDbConnectorConfig connectorConfig)
return new MariaDbFieldReader(connectorConfig);
}
private void resetOffset(MariaDbConnectorConfig connectorConfig, MariaDbOffsetContext previousOffset,
SignalProcessor<MariaDbPartition, MariaDbOffsetContext> signalProcessor) {
boolean isKafkaChannelEnabled = connectorConfig.getEnabledChannels().contains(KafkaSignalChannel.CHANNEL_NAME);
if (previousOffset != null && isKafkaChannelEnabled && connectorConfig.isReadOnlyConnection()) {
KafkaSignalChannel kafkaSignal = signalProcessor.getSignalChannel(KafkaSignalChannel.class);
Long signalOffset = getReadOnlyIncrementalSnapshotSignalOffset(previousOffset);
if (signalOffset != null) {
LOGGER.info("Resetting Kafka Signal offset to {}", signalOffset);
kafkaSignal.reset(signalOffset);
}
}
}
}

View File

@ -5,9 +5,6 @@
*/
package io.debezium.connector.mariadb;
import java.time.Instant;
import java.util.Set;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.BinlogDatabaseSchema;
import io.debezium.connector.mariadb.antlr.MariaDbAntlrDdlParser;
@ -46,13 +43,4 @@ protected DdlParser createDdlParser(BinlogConnectorConfig connectorConfig, Maria
getTableFilter());
}
@Override
protected void handleTableEvent(MariaDbOffsetContext offset, String databaseName, Set<TableId> tableIds, Instant changeTime) {
offset.tableEvent(databaseName, tableIds, changeTime);
}
@Override
protected void handleDatabaseEvent(MariaDbOffsetContext offset, String databaseName, Instant changeTime) {
offset.databaseEvent(databaseName, changeTime);
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mariadb;
import io.debezium.schema.SchemaFactory;
/**
* @author Chris Cranford
*/
public class MariaDbSchemaFactory extends SchemaFactory {
private static final MariaDbSchemaFactory INSTANCE = new MariaDbSchemaFactory();
public static MariaDbSchemaFactory get() {
return INSTANCE;
}
}

View File

@ -9,9 +9,6 @@
import io.debezium.connector.binlog.gtid.GtidSetFactory;
import io.debezium.connector.binlog.history.BinlogHistoryRecordComparator;
import io.debezium.connector.mariadb.MariaDbOffsetContext;
import io.debezium.connector.mariadb.SourceInfo;
import io.debezium.document.Document;
/**
* Schema history record comparator implementation for MariaDB.
@ -19,49 +16,7 @@
* @author Chris Cranford
*/
public class MariaDbHistoryRecordComparator extends BinlogHistoryRecordComparator {
public MariaDbHistoryRecordComparator(Predicate<String> gtidSourceFilter, GtidSetFactory gtidSetFactory) {
super(gtidSourceFilter, gtidSetFactory);
}
@Override
protected String getGtidSet(Document document) {
return document.getString(MariaDbOffsetContext.GTID_SET_KEY);
}
@Override
protected int getServerId(Document document) {
return document.getInteger(SourceInfo.SERVER_ID_KEY, 0);
}
@Override
protected boolean isSnapshot(Document document) {
return document.has(SourceInfo.SNAPSHOT_KEY);
}
@Override
protected long getTimestamp(Document document) {
return document.getLong(SourceInfo.TIMESTAMP_KEY, 0);
}
@Override
protected BinlogFileName getBinlogFileName(Document document) {
return BinlogFileName.of(document.getString(SourceInfo.BINLOG_FILENAME_OFFSET_KEY));
}
@Override
protected int getBinlogPosition(Document document) {
return document.getInteger(SourceInfo.BINLOG_POSITION_OFFSET_KEY, -1);
}
@Override
protected int getEventsToSkip(Document document) {
return document.getInteger(MariaDbOffsetContext.EVENTS_TO_SKIP_OFFSET_KEY, 0);
}
@Override
protected int getBinlogRowInEvent(Document document) {
return document.getInteger(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, -1);
}
}