DBZ-2580 Fix "The primary key cannot reference a non-existant column" error from MySQL DDL parser when CREATE TABLE statement starts with a primary key definition like "CREATE TABLE Products (PRIMARY KEY (id), ..." where the referenced primary key column is not yet defined

This commit is contained in:
rkerner 2020-11-10 17:39:12 +01:00 committed by Jiri Pechanec
parent dca04130c2
commit 9c5db0cd2f
4 changed files with 28 additions and 38 deletions

View File

@ -4,7 +4,8 @@
-- Create and populate our products using a single insert with many rows
CREATE TABLE Products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
PRIMARY KEY (id),
id INTEGER NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT

View File

@ -6,15 +6,20 @@
package io.debezium.relational;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class TableEditorImpl implements TableEditor {
private static final Logger LOGGER = LoggerFactory.getLogger(TableEditorImpl.class);
private TableId id;
private LinkedHashMap<String, Column> sortedColumns = new LinkedHashMap<>();
private final List<String> pkColumnNames = new ArrayList<>();
@ -83,7 +88,6 @@ protected void add(Column defn) {
public TableEditor setColumns(Column... columns) {
sortedColumns.clear();
addColumns(columns);
updatePrimaryKeys();
assert positionsAreValid();
return this;
}
@ -92,47 +96,30 @@ public TableEditor setColumns(Column... columns) {
public TableEditor setColumns(Iterable<Column> columns) {
sortedColumns.clear();
addColumns(columns);
updatePrimaryKeys();
assert positionsAreValid();
return this;
}
protected void updatePrimaryKeys() {
// table does not have any primary key, no need to update
if (uniqueValues) {
return;
}
Iterator<String> nameIter = this.pkColumnNames.iterator();
while (nameIter.hasNext()) {
String pkColumnName = nameIter.next();
if (!hasColumnWithName(pkColumnName)) {
nameIter.remove();
}
if (!uniqueValues) {
// table does have any primary key --> we need to remove it
this.pkColumnNames.removeIf(pkColumnName -> {
final boolean pkColumnDoesNotExists = !hasColumnWithName(pkColumnName);
if (pkColumnDoesNotExists) {
LOGGER.warn("The column \"" + pkColumnName + "\" is referenced as PRIMARY KEY, but a matching column is not defined in table \"" + tableId() + "\"!");
}
return pkColumnDoesNotExists;
});
}
}
@Override
public TableEditor setPrimaryKeyNames(String... pkColumnNames) {
for (String pkColumnName : pkColumnNames) {
if (!hasColumnWithName(pkColumnName)) {
throw new IllegalArgumentException("The primary key cannot reference a non-existant column'" + pkColumnName + "'");
}
}
uniqueValues = false;
this.pkColumnNames.clear();
for (String pkColumnName : pkColumnNames) {
this.pkColumnNames.add(pkColumnName);
}
return this;
return setPrimaryKeyNames(Arrays.asList(pkColumnNames));
}
@Override
public TableEditor setPrimaryKeyNames(List<String> pkColumnNames) {
for (String pkColumnName : pkColumnNames) {
if (!hasColumnWithName(pkColumnName)) {
throw new IllegalArgumentException("The primary key cannot reference a non-existant column'" + pkColumnName + "' in table '" + tableId() + "'");
}
}
this.pkColumnNames.clear();
this.pkColumnNames.addAll(pkColumnNames);
uniqueValues = false;
@ -188,12 +175,6 @@ public TableEditor reorderColumn(String columnName, String afterColumnName) {
throw new IllegalArgumentException("No column with name '" + columnName + "'");
}
Column afterColumn = afterColumnName == null ? null : columnWithName(afterColumnName);
if (afterColumn != null && (afterColumn.position() + 1) == columnToMove.position()) {
// nothing to do ...
}
else if (afterColumn == null && columnToMove.position() == 1) {
// nothing to do ...
}
if (afterColumn != null && afterColumn.position() == sortedColumns.size()) {
// Just append ...
sortedColumns.remove(columnName);
@ -274,6 +255,7 @@ public Table create() {
column = column.edit().charsetNameOfTable(defaultCharsetName).create();
columns.add(column);
});
updatePrimaryKeys();
return new TableImpl(id, columns, primaryKeyColumnNames(), defaultCharsetName);
}
}

View File

@ -6,13 +6,17 @@
package io.debezium.relational;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import java.sql.Types;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Test;
import io.debezium.doc.FixFor;
public class TableEditorTest {
private final TableId id = new TableId("catalog", "schema", "table");
@ -47,14 +51,16 @@ public void shouldCreateTableWhenEditorHasIdButNoColumns() {
assertThat(table.primaryKeyColumnNames()).isEmpty();
}
@Test(expected = IllegalArgumentException.class)
@Test
@FixFor("DBZ-2580")
public void shouldNotAllowAddingPrimaryKeyColumnWhenNotFound() {
editor.tableId(id);
editor.setPrimaryKeyNames("C1", "WOOPS");
Column c1 = columnEditor.name("C1").type("VARCHAR").jdbcType(Types.VARCHAR).length(10).position(1).create();
Column c2 = columnEditor.name("C2").type("NUMBER").jdbcType(Types.NUMERIC).length(5).position(1).create();
Column c3 = columnEditor.name("C3").type("DATE").jdbcType(Types.DATE).position(1).create();
editor.addColumns(c1, c2, c3);
editor.setPrimaryKeyNames("C1", "WOOPS");
assertEquals(Arrays.asList("C1"), editor.create().primaryKeyColumnNames());
}
@Test

View File

@ -34,6 +34,7 @@ create table add_test(col1 varchar(255), col2 int, col3 int);
create table blob_test(id int, col1 blob(45));
CREATE TABLE `user_account` ( `id1` bigint(20) unsigned NOT NULL DEFAULT nextval(`useraccount`.`user_account_id_seq`));
create table žluťoučký (kůň int);
CREATE TABLE staff (PRIMARY KEY (staff_num), staff_num INT(5) NOT NULL, first_name VARCHAR(100) NOT NULL, pens_in_drawer INT(2) NOT NULL, CONSTRAINT pens_in_drawer_range CHECK(pens_in_drawer BETWEEN 1 AND 99));
#end
#begin
-- Rename table