DBZ-1895 Implement SKIPPED_OPERATIONS for mysql

This commit is contained in:
Hossein Torabi 2020-03-24 02:38:01 +04:30 committed by Jiri Pechanec
parent dae655c78b
commit 9ddd893074
5 changed files with 88 additions and 17 deletions

View File

@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
@ -57,6 +58,7 @@
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.NullEventDataDeserializer;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory;
@ -66,6 +68,7 @@
import io.debezium.config.CommonConnectorConfig.EventProcessingFailureHandlingMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.RecordMakers.RecordsForTable;
import io.debezium.data.Envelope.Operation;
import io.debezium.function.BlockingConsumer;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.relational.TableId;
@ -267,21 +270,44 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
// Add our custom deserializers ...
eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
Set<Operation> skippedOperations = context.getConnectorConfig().getSkippedOps();
if (skippedOperations.contains(Operation.CREATE.code())) {
eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS, new NullEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS, new NullEventDataDeserializer());
}
else {
eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS,
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS,
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS,
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS,
new RowDeserializers.WriteRowsDeserializer(
tableMapEventByTableId).setMayContainExtraInformation(true));
}
if (skippedOperations.contains(Operation.UPDATE.code())) {
eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS, new NullEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS, new NullEventDataDeserializer());
}
else {
eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS,
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS,
new RowDeserializers.UpdateRowsDeserializer(
tableMapEventByTableId).setMayContainExtraInformation(true));
}
if (skippedOperations.contains(Operation.DELETE.code())) {
eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS, new NullEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS, new NullEventDataDeserializer());
}
else {
eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS,
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS,
new RowDeserializers.DeleteRowsDeserializer(
tableMapEventByTableId).setMayContainExtraInformation(true));
}
client.setEventDeserializer(eventDeserializer);
// Set up for JMX ...

View File

@ -985,7 +985,8 @@ public static final Field MASK_COLUMN(int length) {
CommonConnectorConfig.SNAPSHOT_DELAY_MS,
CommonConnectorConfig.SNAPSHOT_FETCH_SIZE,
CommonConnectorConfig.TOMBSTONES_ON_DELETE, ENABLE_TIME_ADJUSTER,
CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION);
CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION,
CommonConnectorConfig.SKIPPED_OPERATIONS);
/**
* The set of {@link Field}s that are included in the {@link #configDef() configuration definition}. This includes
@ -1044,7 +1045,7 @@ public SnapshotNewTables getSnapshotNewTables() {
protected static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
Field.group(config, "MySQL", HOSTNAME, PORT, USER, PASSWORD, ON_CONNECT_STATEMENTS, SERVER_NAME, SERVER_ID, SERVER_ID_OFFSET,
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD, SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD, JDBC_DRIVER);
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD, SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD, JDBC_DRIVER, CommonConnectorConfig.SKIPPED_OPERATIONS);
Field.group(config, "History Storage", KafkaDatabaseHistory.BOOTSTRAP_SERVERS,
KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, DATABASE_HISTORY,

View File

@ -440,6 +440,45 @@ public void shouldHandleMySQLTimeCorrectly() throws Exception {
assertThat(c5Time).isEqualTo(Duration.ofHours(-838).minusMinutes(59).minusSeconds(58).minusNanos(999999000));
}
@Test
public void shouldNotConsumeAllEventsFromDatabaseWithSkippedOperations() throws Exception {
// Define configuration that will ignore all events from MySQL source.
config = simpleConfig()
.with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "c")
.with(MySqlConnectorConfig.SNAPSHOT_MODE, "never")
.build();
Filters filters = new Filters.Builder(config).build();
context = new MySqlTaskContext(config, filters);
context.start();
context.initializeHistory();
reader = new BinlogReader("binlog", context, new AcceptAllPredicate());
// Start reading the binlog ...
reader.start();
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) {
try (JdbcConnection connection = db.connect()) {
final Connection jdbc = connection.connection();
connection.setAutoCommit(false);
final Statement statement = jdbc.createStatement();
statement.executeUpdate("INSERT INTO customers VALUES(default, 'first', 'first', 'first')");
jdbc.setSavepoint();
statement.executeUpdate("INSERT INTO customers VALUES(default, 'second', 'second', 'second')");
jdbc.commit();
connection.query("SELECT * FROM customers", rs -> {
if (Testing.Print.isEnabled()) {
connection.print(rs);
}
});
connection.setAutoCommit(true);
}
}
Collection customers = store.collection(DATABASE.getDatabaseName(), "customers");
assertThat(customers.numberOfTombstones()).isEqualTo(2);
}
@Test(expected = ConnectException.class)
public void shouldFailOnSchemaInconsistency() throws Exception {
inconsistentSchema(null);

View File

@ -342,4 +342,9 @@ endif::cdc-product[]
|`true` when connector configuration explicitly specifies the `key.converter` or `value.converter` parameters to use Avro, otherwise defaults to `false`.
|Whether field names will be sanitized to adhere to Avro naming requirements.
|`skipped.operations`
|
| comma-separated list of oplog operations that will be skipped during streaming.
The operations include: `c` for inserts, `u` for updates, and `d` for deletes.
By default, no operations are skipped.
|===