DBZ-392 Making table map and set dedicated classes just with the needed API

This commit is contained in:
Gunnar Morling 2018-01-15 12:26:36 +01:00
parent 9f0f0fe5d4
commit 4d1d016acb
4 changed files with 153 additions and 209 deletions

View File

@ -5,9 +5,10 @@
*/ */
package io.debezium.connector.mysql; package io.debezium.connector.mysql;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate; import java.util.function.Predicate;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
@ -26,7 +27,6 @@
import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Table; import io.debezium.relational.Table;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.relational.TableIdCaseInsensitiveMap;
import io.debezium.relational.TableSchema; import io.debezium.relational.TableSchema;
import io.debezium.relational.TableSchemaBuilder; import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables; import io.debezium.relational.Tables;
@ -63,7 +63,7 @@ public class MySqlSchema {
private final AvroValidator schemaNameValidator = AvroValidator.create(logger); private final AvroValidator schemaNameValidator = AvroValidator.create(logger);
private final Set<String> ignoredQueryStatements = Collect.unmodifiableSet("BEGIN", "END", "FLUSH PRIVILEGES"); private final Set<String> ignoredQueryStatements = Collect.unmodifiableSet("BEGIN", "END", "FLUSH PRIVILEGES");
private final MySqlDdlParser ddlParser; private final MySqlDdlParser ddlParser;
private final Map<TableId, TableSchema> tableSchemaByTableId; private final SchemasByTableId tableSchemaByTableId;
private final Filters filters; private final Filters filters;
private final DatabaseHistory dbHistory; private final DatabaseHistory dbHistory;
private final TableSchemaBuilder schemaBuilder; private final TableSchemaBuilder schemaBuilder;
@ -137,7 +137,7 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
this.skipUnparseableDDL = dbHistoryConfig.getBoolean(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS); this.skipUnparseableDDL = dbHistoryConfig.getBoolean(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS);
tableSchemaByTableId = tableIdCaseInsensitive ? new TableIdCaseInsensitiveMap<>(new HashMap<>()) : new HashMap<>(); tableSchemaByTableId = new SchemasByTableId(tableIdCaseInsensitive);
} }
protected HistoryRecordComparator historyComparator() { protected HistoryRecordComparator historyComparator() {
@ -348,4 +348,38 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
}); });
return true; return true;
} }
/**
* A map of schemas by table id. Table names are stored lower-case if required as per the config.
*/
private static class SchemasByTableId {
private final boolean tableIdCaseInsensitive;
private final ConcurrentMap<TableId, TableSchema> values;
public SchemasByTableId(boolean tableIdCaseInsensitive) {
this.tableIdCaseInsensitive = tableIdCaseInsensitive;
this.values = new ConcurrentHashMap<>();
}
public void clear() {
values.clear();
}
public TableSchema remove(TableId tableId) {
return values.remove(toLowerCaseIfNeeded(tableId));
}
public TableSchema get(TableId tableId) {
return values.get(toLowerCaseIfNeeded(tableId));
}
public TableSchema put(TableId tableId, TableSchema updated) {
return values.put(toLowerCaseIfNeeded(tableId), updated);
}
private TableId toLowerCaseIfNeeded(TableId tableId) {
return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;
}
}
} }

View File

@ -1,94 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
/**
*
* @author Jiri Pechanec
*
* A facade for Map implementation that accepts TableId as key and stores the key with
* table name converted to lower case.
*/
public class TableIdCaseInsensitiveMap<V> implements Map<TableId, V> {
private Map<TableId, V> delegate;
public TableIdCaseInsensitiveMap(Map<TableId, V> map) {
delegate = map;
}
@Override
public int size() {
return delegate.size();
}
@Override
public boolean isEmpty() {
return delegate.isEmpty();
}
@Override
public boolean containsKey(Object key) {
return delegate.containsKey(((TableId)key).toLowercase());
}
@Override
public boolean containsValue(Object value) {
return delegate.containsValue(value);
}
@Override
public V get(Object key) {
return delegate.get(((TableId)key).toLowercase());
}
@Override
public V put(TableId key, V value) {
return delegate.put(((TableId)key).toLowercase(), value);
}
@Override
public V remove(Object key) {
return delegate.remove(((TableId)key).toLowercase());
}
@Override
public void clear() {
delegate.clear();
}
@Override
public Set<TableId> keySet() {
return delegate.keySet();
}
@Override
public Collection<V> values() {
return delegate.values();
}
@Override
public Set<Entry<TableId, V>> entrySet() {
return delegate.entrySet();
}
@Override
public void putAll(Map<? extends TableId, ? extends V> m) {
m.entrySet().stream().forEach(x -> put(((TableId)x.getKey()).toLowercase(), x.getValue()));
}
public int hashCode() {
return delegate.hashCode();
}
public boolean equals(Object o) {
return delegate.equals(o);
}
}

View File

@ -1,99 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
/**
*
* @author Jiri Pechanec
*
* A facade for Set implementation that accepts TableId and stores it with
* table name converted to lower case.
*/
public class TableIdCaseInsensitiveSet implements Set<TableId> {
public TableIdCaseInsensitiveSet(Set<TableId> delegate) {
this.delegate = delegate;
}
private Set<TableId> delegate;
@Override
public int size() {
return delegate.size();
}
@Override
public boolean isEmpty() {
return delegate.isEmpty();
}
@Override
public boolean contains(Object o) {
return delegate.contains(((TableId)o).toLowercase());
}
@Override
public Iterator<TableId> iterator() {
return delegate.iterator();
}
@Override
public Object[] toArray() {
return delegate.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return delegate.toArray(a);
}
@Override
public boolean add(TableId e) {
return delegate.add(((TableId)e).toLowercase());
}
@Override
public boolean remove(Object o) {
return delegate.remove(((TableId)o).toLowercase());
}
@Override
public boolean containsAll(Collection<?> c) {
return delegate.containsAll(c);
}
@Override
public boolean addAll(Collection<? extends TableId> c) {
return delegate.addAll(c);
}
@Override
public boolean retainAll(Collection<?> c) {
return delegate.retainAll(c);
}
@Override
public boolean removeAll(Collection<?> c) {
return delegate.removeAll(c);
}
@Override
public void clear() {
delegate.clear();
}
public int hashCode() {
return delegate.hashCode();
}
public boolean equals(Object o) {
return delegate.equals(o);
}
}

View File

@ -7,9 +7,10 @@
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -78,8 +79,8 @@ public static interface ColumnNameFilter {
} }
private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant(); private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant();
private final Map<TableId, TableImpl> tablesByTableId; private final TablesById tablesByTableId;
private final Set<TableId> changes; private final TableIds changes;
private final boolean tableIdCaseInsensitive; private final boolean tableIdCaseInsensitive;
/** /**
@ -89,13 +90,8 @@ public static interface ColumnNameFilter {
*/ */
public Tables(boolean tableIdCaseInsensitive) { public Tables(boolean tableIdCaseInsensitive) {
this.tableIdCaseInsensitive = tableIdCaseInsensitive; this.tableIdCaseInsensitive = tableIdCaseInsensitive;
if (tableIdCaseInsensitive) { this.tablesByTableId = new TablesById(tableIdCaseInsensitive);
tablesByTableId = new TableIdCaseInsensitiveMap<>(new ConcurrentHashMap<>()); this.changes = new TableIds(tableIdCaseInsensitive);
changes = new TableIdCaseInsensitiveSet(new HashSet<>());
} else {
tablesByTableId = new ConcurrentHashMap<>();
changes = new HashSet<>();
}
} }
/** /**
@ -126,7 +122,7 @@ public int size() {
public Set<TableId> drainChanges() { public Set<TableId> drainChanges() {
return lock.write(() -> { return lock.write(() -> {
Set<TableId> result = new HashSet<>(changes); Set<TableId> result = changes.toSet();
changes.clear(); changes.clear();
return result; return result;
}); });
@ -259,7 +255,7 @@ public Table forTable(String catalogName, String schemaName, String tableName) {
* @return the immutable set of table identifiers; never null * @return the immutable set of table identifiers; never null
*/ */
public Set<TableId> tableIds() { public Set<TableId> tableIds() {
return lock.read(() -> Collect.unmodifiableSet(tablesByTableId.keySet())); return lock.read(() -> Collect.unmodifiableSet(tablesByTableId.ids()));
} }
/** /**
@ -333,4 +329,111 @@ public String toString() {
return sb.toString(); return sb.toString();
}); });
} }
/**
* A map of tables by id. Table names are stored lower-case if required as per the config.
*/
private static class TablesById {
private final boolean tableIdCaseInsensitive;
private final ConcurrentMap<TableId, TableImpl> values;
public TablesById(boolean tableIdCaseInsensitive) {
this.tableIdCaseInsensitive = tableIdCaseInsensitive;
this.values = new ConcurrentHashMap<>();
}
public Set<TableId> ids() {
return values.keySet();
}
boolean isEmpty() {
return values.isEmpty();
}
public void putAll(TablesById tablesByTableId) {
if(tableIdCaseInsensitive) {
tablesByTableId.values.entrySet()
.forEach(e -> put(e.getKey().toLowercase(), e.getValue()));
}
else {
values.putAll(tablesByTableId.values);
}
}
public TableImpl remove(TableId tableId) {
return values.remove(toLowerCaseIfNeeded(tableId));
}
public TableImpl get(TableId tableId) {
return values.get(toLowerCaseIfNeeded(tableId));
}
public Table put(TableId tableId, TableImpl updated) {
return values.put(toLowerCaseIfNeeded(tableId), updated);
}
int size() {
return values.size();
}
void forEach(BiConsumer<? super TableId, ? super TableImpl> action) {
values.forEach(action);
}
private TableId toLowerCaseIfNeeded(TableId tableId) {
return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;
}
@Override
public int hashCode() {
return values.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
TablesById other = (TablesById) obj;
return values.equals(other.values);
}
}
/**
* A set of table ids. Table names are stored lower-case if required as per the config.
*/
private static class TableIds {
private final boolean tableIdCaseInsensitive;
private final Set<TableId> values;
public TableIds(boolean tableIdCaseInsensitive) {
this.tableIdCaseInsensitive = tableIdCaseInsensitive;
this.values = new HashSet<>();
}
public void add(TableId tableId) {
values.add(toLowerCaseIfNeeded(tableId));
}
public Set<TableId> toSet() {
return new HashSet<>(values);
}
public void clear() {
values.clear();
}
private TableId toLowerCaseIfNeeded(TableId tableId) {
return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;
}
}
} }