DBZ-1613 Fix schema snapshotting of unlocked tables
This commit is contained in:
parent
f7de97b6d7
commit
eb27cc5c54
@ -15,6 +15,7 @@
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
@ -59,6 +60,7 @@ public class SnapshotReader extends AbstractReader {
|
||||
private RecordRecorder recorder;
|
||||
private final SnapshotReaderMetrics metrics;
|
||||
private ExecutorService executorService;
|
||||
private final boolean useGlobalLock;
|
||||
|
||||
private final MySqlConnectorConfig.SnapshotLockingMode snapshotLockingMode;
|
||||
|
||||
@ -69,11 +71,25 @@ public class SnapshotReader extends AbstractReader {
|
||||
* @param context the task context in which this reader is running; may not be null
|
||||
*/
|
||||
public SnapshotReader(String name, MySqlTaskContext context) {
|
||||
this(name, context, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a snapshot reader that can use global locking onyl optionally.
|
||||
* Used mostly for testing.
|
||||
*
|
||||
* @param name the name of this reader; may not be null
|
||||
* @param context the task context in which this reader is running; may not be null
|
||||
* @param useGlobalLock {@code false} to simulate cloud (Amazon RDS) restrictions
|
||||
*/
|
||||
SnapshotReader(String name, MySqlTaskContext context, boolean useGlobalLock) {
|
||||
super(name, context, null);
|
||||
|
||||
this.includeData = context.snapshotMode().includeData();
|
||||
this.snapshotLockingMode = context.getConnectorConfig().getSnapshotLockingMode();
|
||||
recorder = this::recordRowAsRead;
|
||||
metrics = new SnapshotReaderMetrics(context, context.dbSchema(), changeEventQueueMetrics);
|
||||
this.useGlobalLock = useGlobalLock;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -227,6 +243,9 @@ protected void execute() {
|
||||
boolean isLocked = false;
|
||||
boolean isTxnStarted = false;
|
||||
boolean tableLocks = false;
|
||||
final List<TableId> tablesToSnapshotSchemaAfterUnlock = new ArrayList<>();
|
||||
Set<TableId> lockedTables = Collections.emptySet();
|
||||
|
||||
try {
|
||||
metrics.snapshotStarted();
|
||||
|
||||
@ -268,7 +287,7 @@ protected void execute() {
|
||||
if (!isRunning()) {
|
||||
return;
|
||||
}
|
||||
if (!snapshotLockingMode.equals(MySqlConnectorConfig.SnapshotLockingMode.NONE)) {
|
||||
if (!snapshotLockingMode.equals(MySqlConnectorConfig.SnapshotLockingMode.NONE) && useGlobalLock) {
|
||||
try {
|
||||
logger.info("Step 1: flush and obtain global read lock to prevent writes to database");
|
||||
sql.set("FLUSH TABLES WITH READ LOCK");
|
||||
@ -416,6 +435,7 @@ protected void execute() {
|
||||
}
|
||||
// We have the required privileges, so try to lock all of the tables we're interested in ...
|
||||
logger.info("Step {}: flush and obtain read lock for {} tables (preventing writes)", step++, knownTableIds.size());
|
||||
lockedTables = new HashSet<>(capturedTableIds);
|
||||
String tableList = capturedTableIds.stream()
|
||||
.map(tid -> quote(tid))
|
||||
.reduce((r, element) -> r + "," + element)
|
||||
@ -476,12 +496,16 @@ protected void execute() {
|
||||
if (!isRunning()) {
|
||||
break;
|
||||
}
|
||||
sql.set("SHOW CREATE TABLE " + quote(tableId));
|
||||
mysql.query(sql.get(), rs -> {
|
||||
if (rs.next()) {
|
||||
schema.applyDdl(source, dbName, rs.getString(2), this::enqueueSchemaChanges);
|
||||
}
|
||||
});
|
||||
// This is to handle situation when global read lock is unavailable and tables are locked instead of it.
|
||||
// MySQL forbids access to an unlocked table when there is at least one lock held on another table.
|
||||
// Thus when we need to obtain schema even for non-monitored tables (which are not locked as we might not have access privileges)
|
||||
// we need to do it after the tables are unlocked
|
||||
if (lockedTables.isEmpty() || lockedTables.contains(tableId)) {
|
||||
readTableSchema(sql, mysql, schema, source, dbName, tableId);
|
||||
}
|
||||
else {
|
||||
tablesToSnapshotSchemaAfterUnlock.add(tableId);
|
||||
}
|
||||
}
|
||||
}
|
||||
context.makeRecord().regenerate();
|
||||
@ -734,6 +758,15 @@ protected void execute() {
|
||||
logger.info("Writes to MySQL tables prevented for a total of {}", Strings.duration(lockReleased - lockAcquired));
|
||||
}
|
||||
}
|
||||
if (!tablesToSnapshotSchemaAfterUnlock.isEmpty()) {
|
||||
logger.info("Step {}: reading table schema for non-whitelisted tables", step++);
|
||||
for (TableId tableId : tablesToSnapshotSchemaAfterUnlock) {
|
||||
if (!isRunning()) {
|
||||
break;
|
||||
}
|
||||
readTableSchema(sql, mysql, schema, source, tableId.schema(), tableId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -803,6 +836,17 @@ protected void execute() {
|
||||
}
|
||||
}
|
||||
|
||||
private void readTableSchema(final AtomicReference<String> sql, final JdbcConnection mysql,
|
||||
final MySqlSchema schema, final SourceInfo source, String dbName, TableId tableId)
|
||||
throws SQLException {
|
||||
sql.set("SHOW CREATE TABLE " + quote(tableId));
|
||||
mysql.query(sql.get(), rs -> {
|
||||
if (rs.next()) {
|
||||
schema.applyDdl(source, dbName, rs.getString(2), this::enqueueSchemaChanges);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private boolean shouldRecordTableSchema(final MySqlSchema schema, final Filters filters, TableId id) {
|
||||
return !schema.isStoreOnlyMonitoredTablesDdl() || filters.tableFilter().test(id);
|
||||
}
|
||||
|
@ -10,6 +10,8 @@ CREATE USER 'replicator' IDENTIFIED BY 'replpass';
|
||||
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator';
|
||||
CREATE USER 'snapper' IDENTIFIED BY 'snapperpass';
|
||||
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'snapper'@'%';
|
||||
CREATE USER 'cloud' IDENTIFIED BY 'cloudpass';
|
||||
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'cloud'@'%';
|
||||
GRANT ALL PRIVILEGES ON *.* TO 'mysqlreplica'@'%';
|
||||
|
||||
-- Start the GTID-based replication ...
|
||||
|
@ -10,6 +10,8 @@ CREATE USER 'replicator' IDENTIFIED BY 'replpass';
|
||||
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator';
|
||||
CREATE USER 'snapper' IDENTIFIED BY 'snapperpass';
|
||||
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'snapper'@'%';
|
||||
CREATE USER 'cloud' IDENTIFIED BY 'cloudpass';
|
||||
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'cloud'@'%';
|
||||
GRANT ALL PRIVILEGES ON *.* TO 'mysqlreplica'@'%';
|
||||
|
||||
-- Start the GTID-based replication ...
|
||||
|
@ -9,7 +9,9 @@
|
||||
CREATE USER 'replicator' IDENTIFIED BY 'replpass';
|
||||
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator';
|
||||
CREATE USER 'snapper' IDENTIFIED BY 'snapperpass';
|
||||
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'snapper'@'%';
|
||||
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'snapper'@'%';
|
||||
CREATE USER 'cloud' IDENTIFIED BY 'cloudpass';
|
||||
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'cloud'@'%';
|
||||
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
|
||||
|
||||
-- ----------------------------------------------------------------------------------------------------------------
|
||||
|
@ -26,11 +26,13 @@
|
||||
import org.junit.Test;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.config.Configuration.Builder;
|
||||
import io.debezium.data.KeyValueStore;
|
||||
import io.debezium.data.KeyValueStore.Collection;
|
||||
import io.debezium.data.SchemaChangeHistory;
|
||||
import io.debezium.data.VerifyRecord;
|
||||
import io.debezium.heartbeat.Heartbeat;
|
||||
import io.debezium.relational.history.DatabaseHistory;
|
||||
import io.debezium.util.Testing;
|
||||
|
||||
/**
|
||||
@ -89,11 +91,31 @@ protected Configuration.Builder simpleConfig() {
|
||||
|
||||
@Test
|
||||
public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
|
||||
config = simpleConfig()
|
||||
.build();
|
||||
snapshotOfSingleDatabase(true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLock() throws Exception {
|
||||
snapshotOfSingleDatabase(false, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockAndStoreOnlyMonitoredTables() throws Exception {
|
||||
snapshotOfSingleDatabase(false, true);
|
||||
}
|
||||
|
||||
private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyMonitoredTables) throws Exception {
|
||||
final Builder builder = simpleConfig();
|
||||
if (!useGlobalLock) {
|
||||
builder
|
||||
.with(MySqlConnectorConfig.USER, "cloud")
|
||||
.with(MySqlConnectorConfig.PASSWORD, "cloudpass")
|
||||
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, storeOnlyMonitoredTables);
|
||||
}
|
||||
config = builder.build();
|
||||
context = new MySqlTaskContext(config, new Filters.Builder(config).build());
|
||||
context.start();
|
||||
reader = new SnapshotReader("snapshot", context);
|
||||
reader = new SnapshotReader("snapshot", context, useGlobalLock);
|
||||
reader.uponCompletion(completed::countDown);
|
||||
reader.generateInsertEvents();
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user