DBZ-1278 Rename to non-whitelisted table sends warn
This commit is contained in:
parent
deaa5b7f4d
commit
3012524ea1
@ -52,8 +52,8 @@ public AlterTableParserListener(MySqlAntlrDdlParser parser, List<ParseTreeListen
|
||||
@Override
|
||||
public void enterAlterTable(MySqlParser.AlterTableContext ctx) {
|
||||
final TableId tableId = parser.parseQualifiedTableId(ctx.tableName().fullId());
|
||||
if (!parser.getTableFilter().isIncluded(tableId)) {
|
||||
LOG.debug("Ignoring ALTER TABLE statement for non-whitelisted table {}", tableId);
|
||||
if (parser.databaseTables().forTable(tableId) == null) {
|
||||
LOG.debug("Ignoring ALTER TABLE statement for non-recorded table {}", tableId);
|
||||
return;
|
||||
}
|
||||
tableEditor = parser.databaseTables().editTable(tableId);
|
||||
@ -72,6 +72,7 @@ public void exitAlterTable(MySqlParser.AlterTableContext ctx) {
|
||||
parser.signalAlterTable(tableEditor.tableId(), null, ctx.getParent());
|
||||
}, tableEditor);
|
||||
super.exitAlterTable(ctx);
|
||||
tableEditor = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -6,6 +6,9 @@
|
||||
|
||||
package io.debezium.connector.mysql.antlr.listener;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
|
||||
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
|
||||
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
|
||||
@ -18,6 +21,8 @@
|
||||
*/
|
||||
public class RenameTableParserListener extends MySqlParserBaseListener {
|
||||
|
||||
private final static Logger LOG = LoggerFactory.getLogger(RenameTableParserListener.class);
|
||||
|
||||
private final MySqlAntlrDdlParser parser;
|
||||
|
||||
public RenameTableParserListener(MySqlAntlrDdlParser parser) {
|
||||
@ -28,6 +33,12 @@ public RenameTableParserListener(MySqlAntlrDdlParser parser) {
|
||||
public void enterRenameTableClause(MySqlParser.RenameTableClauseContext ctx) {
|
||||
TableId oldTable = parser.parseQualifiedTableId(ctx.tableName(0).fullId());
|
||||
TableId newTable = parser.parseQualifiedTableId(ctx.tableName(1).fullId());
|
||||
if (parser.getTableFilter().isIncluded(oldTable) && !parser.getTableFilter().isIncluded(newTable)) {
|
||||
LOG.warn("Renaming whitelisted table {} to non-whitelisted table {}, this can lead to schema inconsistency", oldTable, newTable);
|
||||
}
|
||||
else if (!parser.getTableFilter().isIncluded(oldTable) && parser.getTableFilter().isIncluded(newTable)) {
|
||||
LOG.warn("Renaming non-whitelisted table {} to whitelisted table {}, this can lead to schema inconsistency", oldTable, newTable);
|
||||
}
|
||||
parser.databaseTables().renameTable(oldTable, newTable);
|
||||
parser.signalAlterTable(newTable, oldTable, ctx);
|
||||
super.enterRenameTableClause(ctx);
|
||||
|
@ -55,10 +55,10 @@ public void beforeEach() {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldGetExceptionOnParseAlterStatementsWithoutCreate() {
|
||||
public void shouldNotGetExceptionOnParseAlterStatementsWithoutCreate() {
|
||||
String ddl = "ALTER TABLE foo ADD COLUMN c bigint;" + System.lineSeparator();
|
||||
parser.parse(ddl, tables);
|
||||
assertThat(((MySqlAntlrDdlParser) parser).getParsingExceptionsFromWalker().size()).isEqualTo(1);
|
||||
assertThat(((MySqlAntlrDdlParser) parser).getParsingExceptionsFromWalker().size()).isEqualTo(0);
|
||||
assertThat(tables.size()).isEqualTo(0);
|
||||
}
|
||||
|
||||
@ -322,13 +322,14 @@ public void shouldParseTestStatements() {
|
||||
parser.parse(readFile("ddl/mysql-test-statements.ddl"), tables);
|
||||
Testing.print(tables);
|
||||
assertThat(tables.size()).isEqualTo(6);
|
||||
int numberOfAlteredTablesWhichDoesNotExists = ((MySqlAntlrDdlParser) parser).getParsingExceptionsFromWalker().size();
|
||||
int numberOfIndexesOnNonExistingTables = ((MySqlAntlrDdlParser) parser).getParsingExceptionsFromWalker().size();
|
||||
int numberOfAlteredTablesWhichDoesNotExists = 10;
|
||||
// legacy parser was signaling all created index
|
||||
// antlr is parsing only those, which will make any model changes
|
||||
int numberOfCreatedIndexesWhichNotMakeChangeOnTablesModel = 5;
|
||||
int numberOfAlterViewStatements = 6;
|
||||
int numberOfDroppedViews = 7;
|
||||
assertThat(listener.total()).isEqualTo(58 - numberOfAlteredTablesWhichDoesNotExists
|
||||
assertThat(listener.total()).isEqualTo(58 - numberOfAlteredTablesWhichDoesNotExists - numberOfIndexesOnNonExistingTables
|
||||
- numberOfCreatedIndexesWhichNotMakeChangeOnTablesModel + numberOfAlterViewStatements + numberOfDroppedViews);
|
||||
listener.forEach(this::printEvent);
|
||||
}
|
||||
@ -337,10 +338,11 @@ public void shouldParseTestStatements() {
|
||||
public void shouldParseSomeLinesFromCreateStatements() {
|
||||
parser.parse(readLines(189, "ddl/mysql-test-create.ddl"), tables);
|
||||
assertThat(tables.size()).isEqualTo(39);
|
||||
int numberOfAlteredTablesWhichDoesNotExists = ((MySqlAntlrDdlParser) parser).getParsingExceptionsFromWalker().size();
|
||||
int numberOfIndexesOnNonExistingTables = ((MySqlAntlrDdlParser) parser).getParsingExceptionsFromWalker().size();
|
||||
int numberOfAlteredTablesWhichDoesNotExists = 2;
|
||||
int numberOfCreatedIndexesWhichNotMakeChangeOnTablesModel = 43;
|
||||
assertThat(listener.total()).isEqualTo(120 - numberOfAlteredTablesWhichDoesNotExists
|
||||
- numberOfCreatedIndexesWhichNotMakeChangeOnTablesModel);
|
||||
- numberOfCreatedIndexesWhichNotMakeChangeOnTablesModel - numberOfIndexesOnNonExistingTables);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -369,7 +371,7 @@ public void shouldParseButSkipAlterTableWhenTableIsNotKnown() {
|
||||
listener.forEach(this::printEvent);
|
||||
assertThat(tables.size()).isEqualTo(1);
|
||||
|
||||
int numberOfAlteredTablesWhichDoesNotExists = ((MySqlAntlrDdlParser) parser).getParsingExceptionsFromWalker().size();
|
||||
int numberOfAlteredTablesWhichDoesNotExists = 1;
|
||||
assertThat(listener.total()).isEqualTo(2 - numberOfAlteredTablesWhichDoesNotExists);
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,186 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.mysql;
|
||||
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.sql.SQLException;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.embedded.AbstractConnectorTest;
|
||||
import io.debezium.junit.logging.LogInterceptor;
|
||||
import io.debezium.relational.history.DatabaseHistory;
|
||||
import io.debezium.util.Testing;
|
||||
|
||||
/**
|
||||
* @author Jiri Pechanec
|
||||
*/
|
||||
public class MySqlSchemaMigrationIT extends AbstractConnectorTest {
|
||||
|
||||
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-json.txt")
|
||||
.toAbsolutePath();
|
||||
private UniqueDatabase DATABASE = new UniqueDatabase("migration", "empty")
|
||||
.withDbHistoryPath(DB_HISTORY_PATH);
|
||||
|
||||
private Configuration config;
|
||||
|
||||
@Before
|
||||
public void beforeEach() {
|
||||
stopConnector();
|
||||
DATABASE.createAndInitialize();
|
||||
|
||||
initializeConnectorTestFramework();
|
||||
Testing.Files.delete(DB_HISTORY_PATH);
|
||||
}
|
||||
|
||||
@After
|
||||
public void afterEach() {
|
||||
try {
|
||||
stopConnector();
|
||||
} finally {
|
||||
Testing.Files.delete(DB_HISTORY_PATH);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCorrectlyMigrateTable() throws SQLException, InterruptedException {
|
||||
// Use the DB configuration to define the connector's configuration ...
|
||||
// Breaks if table whitelist does not contain both tables
|
||||
config = DATABASE.defaultConfig()
|
||||
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
|
||||
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
|
||||
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName("monitored") + "," + DATABASE.qualifiedTableName("_monitored_new"))
|
||||
.build();
|
||||
|
||||
final MySQLConnection connection = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());
|
||||
connection.execute("create table monitored (id int auto_increment primary key, value1 varchar(100), value2 int)");
|
||||
connection.execute("insert into monitored values(default, 'a1', 1)");
|
||||
|
||||
// Start the connector ...
|
||||
start(MySqlConnector.class, config);
|
||||
SourceRecords records = consumeRecordsByTopic(1);
|
||||
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
// Consume all of the events due to startup and initialization of the database
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
// Testing.Debug.enable();
|
||||
|
||||
connection.execute("insert into monitored values(default, 'a2', 2)");
|
||||
connection.execute("CREATE TABLE `_monitored_new` ( `id` int(11) NOT NULL AUTO_INCREMENT, `value1` varchar(100) DEFAULT NULL, `value2` int(11) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1");
|
||||
connection.execute("ALTER TABLE `_monitored_new` drop value1");
|
||||
connection.execute("insert into _monitored_new values(default, 1)");
|
||||
connection.execute("insert into _monitored_new values(default, 2)");
|
||||
connection.execute("RENAME TABLE `monitored` TO `_monitored_old`, `_monitored_new` TO `monitored`");
|
||||
connection.execute("insert into monitored values(default, 3)");
|
||||
|
||||
records = consumeRecordsByTopic(4);
|
||||
stopConnector();
|
||||
assertThat(records).isNotNull();
|
||||
records.forEach(this::validate);
|
||||
|
||||
assertInsert(records.allRecordsInOrder().get(3), "id", 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldProcessAndWarnOnNonWhitelistedMigrateTable() throws SQLException, InterruptedException {
|
||||
// Use the DB configuration to define the connector's configuration ...
|
||||
final LogInterceptor logInterceptor = new LogInterceptor();
|
||||
|
||||
config = DATABASE.defaultConfig()
|
||||
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
|
||||
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
|
||||
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName("monitored"))
|
||||
.build();
|
||||
|
||||
final MySQLConnection connection = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());
|
||||
connection.execute("create table monitored (id int auto_increment primary key, value1 varchar(100), value2 int)");
|
||||
connection.execute("insert into monitored values(default, 'a1', 1)");
|
||||
|
||||
// Start the connector ...
|
||||
start(MySqlConnector.class, config);
|
||||
SourceRecords records = consumeRecordsByTopic(1);
|
||||
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
// Consume all of the events due to startup and initialization of the database
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
// Testing.Debug.enable();
|
||||
|
||||
connection.execute("insert into monitored values(default, 'a2', 2)");
|
||||
connection.execute("CREATE TABLE `_monitored_new` ( `id` int(11) NOT NULL AUTO_INCREMENT, `value1` varchar(100) DEFAULT NULL, `value2` int(11) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1");
|
||||
connection.execute("ALTER TABLE `_monitored_new` drop value1");
|
||||
connection.execute("insert into _monitored_new values(default, 1)");
|
||||
connection.execute("insert into _monitored_new values(default, 2)");
|
||||
connection.execute("RENAME TABLE `monitored` TO `_monitored_old`, `_monitored_new` TO `monitored`");
|
||||
connection.execute("insert into monitored values(default, 3)");
|
||||
|
||||
final String msg1 = "Renaming whitelisted table " + DATABASE.qualifiedTableName("monitored") + " to non-whitelisted table " + DATABASE.qualifiedTableName("_monitored_old") + ", this can lead to schema inconsistency";
|
||||
final String msg2 = "Renaming non-whitelisted table " + DATABASE.qualifiedTableName("_monitored_new") + " to whitelisted table "+ DATABASE.qualifiedTableName("monitored") + ", this can lead to schema inconsistency";
|
||||
|
||||
records = consumeRecordsByTopic(2);
|
||||
stopConnector(value -> {
|
||||
assertThat(logInterceptor.containsWarnMessage(msg1)).isTrue();
|
||||
assertThat(logInterceptor.containsWarnMessage(msg2)).isTrue();
|
||||
});
|
||||
|
||||
assertThat(records).isNotNull();
|
||||
records.forEach(this::validate);
|
||||
|
||||
assertInsert(records.allRecordsInOrder().get(1), "id", 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldWarnOnInvalidMigrateTable() throws SQLException, InterruptedException {
|
||||
// Use the DB configuration to define the connector's configuration ...
|
||||
final LogInterceptor logInterceptor = new LogInterceptor();
|
||||
|
||||
config = DATABASE.defaultConfig()
|
||||
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
|
||||
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
|
||||
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
|
||||
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName("monitored"))
|
||||
.build();
|
||||
|
||||
final MySQLConnection connection = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());
|
||||
connection.execute("create table monitored (id int auto_increment primary key, value1 varchar(100), value2 int)");
|
||||
connection.execute("insert into monitored values(default, 'a1', 1)");
|
||||
|
||||
// Start the connector ...
|
||||
start(MySqlConnector.class, config);
|
||||
SourceRecords records = consumeRecordsByTopic(1);
|
||||
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
// Consume all of the events due to startup and initialization of the database
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
// Testing.Debug.enable();
|
||||
|
||||
connection.execute("insert into monitored values(default, 'a2', 2)");
|
||||
connection.execute("CREATE TABLE `_monitored_new` ( `id` int(11) NOT NULL AUTO_INCREMENT, `value1` varchar(100) DEFAULT NULL, `value2` int(11) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1");
|
||||
connection.execute("ALTER TABLE `_monitored_new` drop value1");
|
||||
connection.execute("insert into _monitored_new values(default, 1)");
|
||||
connection.execute("insert into _monitored_new values(default, 2)");
|
||||
connection.execute("RENAME TABLE `monitored` TO `_monitored_old`, `_monitored_new` TO `monitored`");
|
||||
connection.execute("insert into monitored values(default, 3)");
|
||||
|
||||
final String msg1 = "Renaming whitelisted table " + DATABASE.qualifiedTableName("monitored") + " to non-whitelisted table " + DATABASE.qualifiedTableName("_monitored_old") + ", this can lead to schema inconsistency";
|
||||
final String msg2 = "Renaming non-whitelisted table " + DATABASE.qualifiedTableName("_monitored_new") + " to whitelisted table "+ DATABASE.qualifiedTableName("monitored") + ", this can lead to schema inconsistency";
|
||||
|
||||
records = consumeRecordsByTopic(2);
|
||||
stopConnector(value -> {
|
||||
assertThat(logInterceptor.containsWarnMessage(msg1)).isTrue();
|
||||
assertThat(logInterceptor.containsWarnMessage(msg2)).isTrue();
|
||||
});
|
||||
|
||||
assertThat(records).isNotNull();
|
||||
records.forEach(this::validate);
|
||||
|
||||
assertInsert(records.allRecordsInOrder().get(1), "id", 5);
|
||||
}
|
||||
}
|
@ -0,0 +1,3 @@
|
||||
-- ----------------------------------------------------------------------------------------------------------------
|
||||
-- DATABASE: empty
|
||||
-- ----------------------------------------------------------------------------------------------------------------
|
@ -129,7 +129,7 @@ public TableEditor setPrimaryKeyNames(String... pkColumnNames) {
|
||||
public TableEditor setPrimaryKeyNames(List<String> pkColumnNames) {
|
||||
for (String pkColumnName : pkColumnNames) {
|
||||
if (!hasColumnWithName(pkColumnName)) {
|
||||
throw new IllegalArgumentException("The primary key cannot reference a non-existant column'" + pkColumnName + "'");
|
||||
throw new IllegalArgumentException("The primary key cannot reference a non-existant column'" + pkColumnName + "' in table '" + tableId() + "'");
|
||||
}
|
||||
}
|
||||
this.pkColumnNames.clear();
|
||||
|
Loading…
Reference in New Issue
Block a user