diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java index 603a21421..ae507ac33 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java @@ -5,9 +5,10 @@ */ package io.debezium.connector.mysql; -import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.function.Predicate; import org.apache.kafka.connect.data.Schema; @@ -26,7 +27,6 @@ import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.relational.Table; import io.debezium.relational.TableId; -import io.debezium.relational.TableIdCaseInsensitiveMap; import io.debezium.relational.TableSchema; import io.debezium.relational.TableSchemaBuilder; import io.debezium.relational.Tables; @@ -63,7 +63,7 @@ public class MySqlSchema { private final AvroValidator schemaNameValidator = AvroValidator.create(logger); private final Set ignoredQueryStatements = Collect.unmodifiableSet("BEGIN", "END", "FLUSH PRIVILEGES"); private final MySqlDdlParser ddlParser; - private final Map tableSchemaByTableId; + private final SchemasByTableId tableSchemaByTableId; private final Filters filters; private final DatabaseHistory dbHistory; private final TableSchemaBuilder schemaBuilder; @@ -137,7 +137,7 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) { this.skipUnparseableDDL = dbHistoryConfig.getBoolean(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS); - tableSchemaByTableId = tableIdCaseInsensitive ? new TableIdCaseInsensitiveMap<>(new HashMap<>()) : new HashMap<>(); + tableSchemaByTableId = new SchemasByTableId(tableIdCaseInsensitive); } protected HistoryRecordComparator historyComparator() { @@ -348,4 +348,38 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem }); return true; } + + /** + * A map of schemas by table id. Table names are stored lower-case if required as per the config. + */ + private static class SchemasByTableId { + + private final boolean tableIdCaseInsensitive; + private final ConcurrentMap values; + + public SchemasByTableId(boolean tableIdCaseInsensitive) { + this.tableIdCaseInsensitive = tableIdCaseInsensitive; + this.values = new ConcurrentHashMap<>(); + } + + public void clear() { + values.clear(); + } + + public TableSchema remove(TableId tableId) { + return values.remove(toLowerCaseIfNeeded(tableId)); + } + + public TableSchema get(TableId tableId) { + return values.get(toLowerCaseIfNeeded(tableId)); + } + + public TableSchema put(TableId tableId, TableSchema updated) { + return values.put(toLowerCaseIfNeeded(tableId), updated); + } + + private TableId toLowerCaseIfNeeded(TableId tableId) { + return tableIdCaseInsensitive ? tableId.toLowercase() : tableId; + } + } } diff --git a/debezium-core/src/main/java/io/debezium/relational/TableIdCaseInsensitiveMap.java b/debezium-core/src/main/java/io/debezium/relational/TableIdCaseInsensitiveMap.java deleted file mode 100644 index 13e44339e..000000000 --- a/debezium-core/src/main/java/io/debezium/relational/TableIdCaseInsensitiveMap.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.relational; - -import java.util.Collection; -import java.util.Map; -import java.util.Set; - -/** - * - * @author Jiri Pechanec - * - * A facade for Map implementation that accepts TableId as key and stores the key with - * table name converted to lower case. - */ -public class TableIdCaseInsensitiveMap implements Map { - - private Map delegate; - - public TableIdCaseInsensitiveMap(Map map) { - delegate = map; - } - - @Override - public int size() { - return delegate.size(); - } - - @Override - public boolean isEmpty() { - return delegate.isEmpty(); - } - - @Override - public boolean containsKey(Object key) { - return delegate.containsKey(((TableId)key).toLowercase()); - } - - @Override - public boolean containsValue(Object value) { - return delegate.containsValue(value); - } - - @Override - public V get(Object key) { - return delegate.get(((TableId)key).toLowercase()); - } - - @Override - public V put(TableId key, V value) { - return delegate.put(((TableId)key).toLowercase(), value); - } - - @Override - public V remove(Object key) { - return delegate.remove(((TableId)key).toLowercase()); - } - - @Override - public void clear() { - delegate.clear(); - } - - @Override - public Set keySet() { - return delegate.keySet(); - } - - @Override - public Collection values() { - return delegate.values(); - } - - @Override - public Set> entrySet() { - return delegate.entrySet(); - } - - @Override - public void putAll(Map m) { - m.entrySet().stream().forEach(x -> put(((TableId)x.getKey()).toLowercase(), x.getValue())); - } - - public int hashCode() { - return delegate.hashCode(); - } - - public boolean equals(Object o) { - return delegate.equals(o); - } -} diff --git a/debezium-core/src/main/java/io/debezium/relational/TableIdCaseInsensitiveSet.java b/debezium-core/src/main/java/io/debezium/relational/TableIdCaseInsensitiveSet.java deleted file mode 100644 index d97d2c548..000000000 --- a/debezium-core/src/main/java/io/debezium/relational/TableIdCaseInsensitiveSet.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.relational; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Set; - -/** - * - * @author Jiri Pechanec - * - * A facade for Set implementation that accepts TableId and stores it with - * table name converted to lower case. - */ -public class TableIdCaseInsensitiveSet implements Set { - - public TableIdCaseInsensitiveSet(Set delegate) { - this.delegate = delegate; - } - - private Set delegate; - - @Override - public int size() { - return delegate.size(); - } - - @Override - public boolean isEmpty() { - return delegate.isEmpty(); - } - - @Override - public boolean contains(Object o) { - return delegate.contains(((TableId)o).toLowercase()); - } - - @Override - public Iterator iterator() { - return delegate.iterator(); - } - - @Override - public Object[] toArray() { - return delegate.toArray(); - } - - @Override - public T[] toArray(T[] a) { - return delegate.toArray(a); - } - - @Override - public boolean add(TableId e) { - return delegate.add(((TableId)e).toLowercase()); - } - - @Override - public boolean remove(Object o) { - return delegate.remove(((TableId)o).toLowercase()); - } - - @Override - public boolean containsAll(Collection c) { - return delegate.containsAll(c); - } - - @Override - public boolean addAll(Collection c) { - return delegate.addAll(c); - } - - @Override - public boolean retainAll(Collection c) { - return delegate.retainAll(c); - } - - @Override - public boolean removeAll(Collection c) { - return delegate.removeAll(c); - } - - @Override - public void clear() { - delegate.clear(); - } - - public int hashCode() { - return delegate.hashCode(); - } - - public boolean equals(Object o) { - return delegate.equals(o); - } -} diff --git a/debezium-core/src/main/java/io/debezium/relational/Tables.java b/debezium-core/src/main/java/io/debezium/relational/Tables.java index c68495157..4e8b5dacf 100644 --- a/debezium-core/src/main/java/io/debezium/relational/Tables.java +++ b/debezium-core/src/main/java/io/debezium/relational/Tables.java @@ -7,9 +7,10 @@ import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Predicate; @@ -78,8 +79,8 @@ public static interface ColumnNameFilter { } private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant(); - private final Map tablesByTableId; - private final Set changes; + private final TablesById tablesByTableId; + private final TableIds changes; private final boolean tableIdCaseInsensitive; /** @@ -89,13 +90,8 @@ public static interface ColumnNameFilter { */ public Tables(boolean tableIdCaseInsensitive) { this.tableIdCaseInsensitive = tableIdCaseInsensitive; - if (tableIdCaseInsensitive) { - tablesByTableId = new TableIdCaseInsensitiveMap<>(new ConcurrentHashMap<>()); - changes = new TableIdCaseInsensitiveSet(new HashSet<>()); - } else { - tablesByTableId = new ConcurrentHashMap<>(); - changes = new HashSet<>(); - } + this.tablesByTableId = new TablesById(tableIdCaseInsensitive); + this.changes = new TableIds(tableIdCaseInsensitive); } /** @@ -126,7 +122,7 @@ public int size() { public Set drainChanges() { return lock.write(() -> { - Set result = new HashSet<>(changes); + Set result = changes.toSet(); changes.clear(); return result; }); @@ -259,7 +255,7 @@ public Table forTable(String catalogName, String schemaName, String tableName) { * @return the immutable set of table identifiers; never null */ public Set tableIds() { - return lock.read(() -> Collect.unmodifiableSet(tablesByTableId.keySet())); + return lock.read(() -> Collect.unmodifiableSet(tablesByTableId.ids())); } /** @@ -333,4 +329,111 @@ public String toString() { return sb.toString(); }); } + + /** + * A map of tables by id. Table names are stored lower-case if required as per the config. + */ + private static class TablesById { + + private final boolean tableIdCaseInsensitive; + private final ConcurrentMap values; + + public TablesById(boolean tableIdCaseInsensitive) { + this.tableIdCaseInsensitive = tableIdCaseInsensitive; + this.values = new ConcurrentHashMap<>(); + } + + public Set ids() { + return values.keySet(); + } + + boolean isEmpty() { + return values.isEmpty(); + } + + public void putAll(TablesById tablesByTableId) { + if(tableIdCaseInsensitive) { + tablesByTableId.values.entrySet() + .forEach(e -> put(e.getKey().toLowercase(), e.getValue())); + } + else { + values.putAll(tablesByTableId.values); + } + } + + public TableImpl remove(TableId tableId) { + return values.remove(toLowerCaseIfNeeded(tableId)); + } + + public TableImpl get(TableId tableId) { + return values.get(toLowerCaseIfNeeded(tableId)); + } + + public Table put(TableId tableId, TableImpl updated) { + return values.put(toLowerCaseIfNeeded(tableId), updated); + } + + int size() { + return values.size(); + } + + void forEach(BiConsumer action) { + values.forEach(action); + } + + private TableId toLowerCaseIfNeeded(TableId tableId) { + return tableIdCaseInsensitive ? tableId.toLowercase() : tableId; + } + + @Override + public int hashCode() { + return values.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + TablesById other = (TablesById) obj; + + return values.equals(other.values); + } + } + + /** + * A set of table ids. Table names are stored lower-case if required as per the config. + */ + private static class TableIds { + + private final boolean tableIdCaseInsensitive; + private final Set values; + + public TableIds(boolean tableIdCaseInsensitive) { + this.tableIdCaseInsensitive = tableIdCaseInsensitive; + this.values = new HashSet<>(); + } + + public void add(TableId tableId) { + values.add(toLowerCaseIfNeeded(tableId)); + } + + public Set toSet() { + return new HashSet<>(values); + } + + public void clear() { + values.clear(); + } + + private TableId toLowerCaseIfNeeded(TableId tableId) { + return tableIdCaseInsensitive ? tableId.toLowercase() : tableId; + } + } }