DBZ-5075: Remove unused legacy MySQL code

This commit is contained in:
Sergei Morozov 2022-04-28 12:27:34 -07:00 committed by Jiri Pechanec
parent 1e8181e5c4
commit f6546e4d87
8 changed files with 4 additions and 282 deletions

View File

@ -1,23 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import org.apache.kafka.connect.source.SourceRecord;
/**
* A predicate invoked by {@link Reader} implementations in order to determine whether they should continue with
* processing records or not.
*
* @author Gunnar Morling
*/
@FunctionalInterface
public interface HaltingPredicate {
/**
* Whether this record should be processed by the calling reader or not.
*/
boolean accepts(SourceRecord record);
}

View File

@ -30,7 +30,6 @@
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlChanges;
import io.debezium.relational.ddl.DdlChanges.DatabaseStatementStringConsumer;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.ddl.DdlParserListener.Event;
import io.debezium.relational.ddl.DdlParserListener.SetVariableEvent;
@ -41,7 +40,6 @@
import io.debezium.relational.ddl.DdlParserListener.TableIndexCreatedEvent;
import io.debezium.relational.ddl.DdlParserListener.TableIndexDroppedEvent;
import io.debezium.relational.ddl.DdlParserListener.TableIndexEvent;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.schema.TopicSelector;
@ -56,16 +54,6 @@
* the {@link Tables table definitions} and the Kafka Connect {@link #schemaFor(TableId) Schema}s for each table, where the
* {@link Schema} excludes any columns that have been {@link MySqlConnectorConfig#COLUMN_EXCLUDE_LIST specified} in the
* configuration.
* <p>
* The history is changed by {@link #applyDdl(SourceInfo, String, String, DatabaseStatementStringConsumer) applying DDL
* statements}, and every change is {@link DatabaseHistory persisted} as defined in the supplied {@link MySqlConnectorConfig MySQL
* connector configuration}. This component can be reconstructed (e.g., on connector restart) and the history
* {@link #loadHistory(SourceInfo) loaded} from persisted storage.
* <p>
* Note that when {@link #applyDdl(SourceInfo, String, String, DatabaseStatementStringConsumer) applying DDL statements}, the
* caller is able to supply a {@link DatabaseStatementStringConsumer consumer function} that will be called with the DDL
* statements and the database to which they apply, grouped by database names. However, these will only be called based when the
* databases are included by the database filters defined in the {@link MySqlConnectorConfig MySQL connector configuration}.
*
* @author Randall Hauch
*/

View File

@ -1,97 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.ddl;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParserListener.EventType;
public class DdlChangesTest {
private DdlChanges changes;
private DdlParser parser;
private Tables tables;
@Before
public void beforeEach() {
parser = new MySqlAntlrDdlParser();
changes = parser.getDdlChanges();
tables = new Tables();
}
@Test
public void shouldParseMultipleStatementsWithDefaultDatabase() {
parser.setCurrentSchema("mydb");
String ddl = "CREATE TABLE foo ( " + System.lineSeparator()
+ " c1 INTEGER NOT NULL, " + System.lineSeparator()
+ " c2 VARCHAR(22) " + System.lineSeparator()
+ "); " + System.lineSeparator()
+ "-- This is a comment" + System.lineSeparator()
+ "DROP TABLE foo;" + System.lineSeparator();
parser.parse(ddl, tables);
assertThat(tables.size()).isEqualTo(0); // table created and dropped
changes.groupEventsByDatabase((dbName, list) -> {
assertThat(dbName).isEqualTo("mydb");
assertThat(list.size()).isEqualTo(2);
assertThat(list.get(0).type()).isEqualTo(EventType.CREATE_TABLE);
assertThat(list.get(1).type()).isEqualTo(EventType.DROP_TABLE);
});
}
@Test
public void shouldParseMultipleStatementsWithFullyQualifiedDatabase() {
parser.setCurrentSchema("mydb");
String ddl = "CREATE TABLE other.foo ( " + System.lineSeparator()
+ " c1 INTEGER NOT NULL, " + System.lineSeparator()
+ " c2 VARCHAR(22) " + System.lineSeparator()
+ "); " + System.lineSeparator()
+ "-- This is a comment" + System.lineSeparator()
+ "DROP TABLE other.foo;" + System.lineSeparator();
parser.parse(ddl, tables);
assertThat(tables.size()).isEqualTo(0); // table created and dropped
changes.groupEventsByDatabase((dbName, list) -> {
assertThat(dbName).isEqualTo("other");
assertThat(list.size()).isEqualTo(2);
assertThat(list.get(0).type()).isEqualTo(EventType.CREATE_TABLE);
assertThat(list.get(1).type()).isEqualTo(EventType.DROP_TABLE);
});
}
@Test
public void shouldParseMultipleStatementsWithNoCurrentSchemaAndFullyQualifiedDatabase() {
String ddl = "CREATE TABLE other.foo ( " + System.lineSeparator()
+ " c1 INTEGER NOT NULL, " + System.lineSeparator()
+ " c2 VARCHAR(22) " + System.lineSeparator()
+ "); " + System.lineSeparator()
+ "-- This is a comment" + System.lineSeparator()
+ "DROP TABLE other.foo;" + System.lineSeparator();
parser.parse(ddl, tables);
assertThat(tables.size()).isEqualTo(0); // table created and dropped
for (int i = 0; i != 5; ++i) {
changes.groupEventsByDatabase((dbName, list) -> {
assertThat(dbName).isEqualTo("other");
assertThat(list.size()).isEqualTo(2);
assertThat(list.get(0).type()).isEqualTo(EventType.CREATE_TABLE);
assertThat(list.get(1).type()).isEqualTo(EventType.DROP_TABLE);
});
}
changes.reset();
changes.groupEventsByDatabase((dbName, list) -> {
fail("Should not have any changes");
});
}
}

View File

@ -26,7 +26,6 @@
*/
public abstract class AbstractDdlParser implements DdlParser {
private final String terminator;
protected final boolean skipViews;
protected final boolean skipComments;
protected DdlChanges ddlChanges;
@ -36,26 +35,15 @@ public abstract class AbstractDdlParser implements DdlParser {
private String currentSchema = null;
/**
* Create a new parser that uses the supplied {@link DataTypeParser}, but that does not include view definitions.
* Create a new parser.
*
* @param terminator the terminator character sequence; may be null if the default terminator ({@code ;}) should be used
*/
public AbstractDdlParser(String terminator) {
this(terminator, false, false);
}
/**
* Create a new parser that uses the supplied {@link DataTypeParser}.
*
* @param terminator the terminator character sequence; may be null if the default terminator ({@code ;}) should be used
* @param includeViews {@code true} if view definitions should be included, or {@code false} if they should be skipped
* @param includeComments {@code true} if table and column's comment definitions should be included, or {@code false} if they should be skipped
*/
public AbstractDdlParser(String terminator, boolean includeViews, boolean includeComments) {
this.terminator = terminator != null ? terminator : ";";
public AbstractDdlParser(boolean includeViews, boolean includeComments) {
this.skipViews = !includeViews;
this.skipComments = !includeComments;
this.ddlChanges = new DdlChanges(terminator);
this.ddlChanges = new DdlChanges();
this.systemVariables = createNewSystemVariablesInstance();
}
@ -71,11 +59,6 @@ public void setCurrentDatabase(String databaseName) {
this.currentSchema = databaseName;
}
@Override
public final String terminator() {
return terminator;
}
@Override
public DdlChanges getDdlChanges() {
return ddlChanges;

View File

@ -24,26 +24,9 @@
@NotThreadSafe
public class DdlChanges implements DdlParserListener {
private final String terminator;
private final List<Event> events = new ArrayList<>();
private final Set<String> databaseNames = new HashSet<>();
/**
* Create a new changes object with ';' as the terminator token.
*/
public DdlChanges() {
this(null);
}
/**
* Create a new changes object with the designated terminator token.
*
* @param terminator the token used to terminate each statement; may be null
*/
public DdlChanges(String terminator) {
this.terminator = terminator != null ? terminator : ";";
}
/**
* Clear all accumulated changes.
*
@ -61,76 +44,6 @@ public void handle(Event event) {
databaseNames.add(getDatabase(event));
}
/**
* Consume the events in the same order they were {@link #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded},
* but grouped by database name. Multiple sequential statements that were applied to the same database are grouped together.
* @param consumer the consumer
*/
public void groupStatementStringsByDatabase(DatabaseStatementStringConsumer consumer) {
groupEventsByDatabase((DatabaseEventConsumer) (dbName, eventList) -> {
final StringBuilder statements = new StringBuilder();
final Set<TableId> tables = new HashSet<>();
eventList.forEach(event -> {
statements.append(event.statement());
statements.append(terminator);
addTable(tables, event);
});
consumer.consume(dbName, tables, statements.toString());
});
}
private void addTable(final Set<TableId> tables, Event event) {
if (event instanceof TableEvent) {
tables.add(((TableEvent) event).tableId());
}
}
/**
* Consume the events in the same order they were {@link #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded},
* but grouped by database name. Multiple sequential statements that were applied to the same database are grouped together.
* @param consumer the consumer
*/
public void groupStatementsByDatabase(DatabaseStatementConsumer consumer) {
groupEventsByDatabase((DatabaseEventConsumer) (dbName, eventList) -> {
List<String> statements = new ArrayList<>();
final Set<TableId> tables = new HashSet<>();
eventList.forEach(event -> {
statements.add(event.statement());
addTable(tables, event);
});
consumer.consume(dbName, tables, statements);
});
}
/**
* Consume the events in the same order they were {@link #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded},
* but grouped by database name. Multiple sequential statements that were applied to the same database are grouped together.
* @param consumer the consumer
*/
public void groupEventsByDatabase(DatabaseEventConsumer consumer) {
if (isEmpty()) {
return;
}
if (databaseNames.size() <= 1) {
consumer.consume(databaseNames.iterator().next(), events);
return;
}
List<Event> dbEvents = new ArrayList<>();
String currentDatabase = null;
for (Event event : events) {
String dbName = getDatabase(event);
if (currentDatabase == null || dbName.equals(currentDatabase)) {
currentDatabase = dbName;
// Accumulate the statement ...
dbEvents.add(event);
}
else {
// Submit the statements ...
consumer.consume(currentDatabase, dbEvents);
}
}
}
/**
* Consume the events in the same order they were {@link #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded},
* but grouped by database name. Multiple sequential statements that were applied to the same database are grouped together.
@ -197,10 +110,6 @@ public boolean isEmpty() {
return events.isEmpty();
}
public boolean applyToMoreDatabasesThan(String name) {
return databaseNames.contains(name) ? databaseNames.size() > 1 : databaseNames.size() > 0;
}
@Override
public String toString() {
return events.toString();
@ -210,14 +119,6 @@ public static interface DatabaseEventConsumer {
void consume(String databaseName, List<Event> events);
}
public static interface DatabaseStatementConsumer {
void consume(String databaseName, Set<TableId> tableList, List<String> ddlStatements);
}
public static interface DatabaseStatementStringConsumer {
void consume(String databaseName, Set<TableId> tableList, String ddlStatements);
}
/**
* @return true if any event stored is one of
* <ul>

View File

@ -39,12 +39,5 @@ public interface DdlParser {
DdlChanges getDdlChanges();
/**
* The token used to terminate a DDL statement.
*
* @return the terminating token; never null
*/
String terminator();
SystemVariables systemVariables();
}

View File

@ -5,8 +5,6 @@
*/
package io.debezium.relational.history;
import java.util.function.BiFunction;
import io.debezium.document.Document;
/**
@ -23,23 +21,6 @@ public class HistoryRecordComparator {
*/
public static final HistoryRecordComparator INSTANCE = new HistoryRecordComparator();
/**
* Create a {@link HistoryRecordComparator} that requires identical sources but will use the supplied function to compare
* positions.
*
* @param positionComparator the non-null function that returns {@code true} if the first position is at or before
* the second position or {@code false} otherwise
* @return the comparator instance; never null
*/
public static HistoryRecordComparator usingPositions(BiFunction<Document, Document, Boolean> positionComparator) {
return new HistoryRecordComparator() {
@Override
protected boolean isPositionAtOrBefore(Document position1, Document position2) {
return positionComparator.apply(position1, position2);
}
};
}
/**
* Determine if the first {@link HistoryRecord} is at the same or earlier point in time than the second {@link HistoryRecord}.
*

View File

@ -54,12 +54,8 @@ public abstract class AntlrDdlParser<L extends Lexer, P extends Parser> extends
protected Tables databaseTables;
protected DataTypeResolver dataTypeResolver;
public AntlrDdlParser(boolean throwErrorsFromTreeWalk) {
this(throwErrorsFromTreeWalk, false, false);
}
public AntlrDdlParser(boolean throwErrorsFromTreeWalk, boolean includeViews, boolean includeComments) {
super(";", includeViews, includeComments);
super(includeViews, includeComments);
this.throwErrorsFromTreeWalk = throwErrorsFromTreeWalk;
}