DBZ-1224 Completing implementation;

* Further unification of override handling;
* Undoing change related to SnapshotStatementFactory as it seems not needed
* Adding test
This commit is contained in:
Gunnar Morling 2019-05-24 12:07:37 +02:00 committed by Jiri Pechanec
parent 034739ab06
commit 4ed8b4636c
12 changed files with 202 additions and 210 deletions

View File

@ -77,6 +77,7 @@ Pradeep Mamillapalli
Prannoy Mittal
Preethi Sadagopan
Raf Liwoch
Ram Satish
Ramesh Reddy
Randall Hauch
Renato Mefi

View File

@ -14,7 +14,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import io.debezium.connector.base.SnapshotStatementFactory;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
@ -54,7 +53,6 @@ public abstract class AbstractReader implements Reader {
protected final ChangeEventQueueMetrics changeEventQueueMetrics;
private final HaltingPredicate acceptAndContinue;
private final SnapshotStatementFactory snapshotStatementFactory;
/**
* Create a snapshot reader.
@ -87,7 +85,6 @@ public int remainingCapacity() {
return records.remainingCapacity();
}
};
this.snapshotStatementFactory = new SnapshotStatementFactory(context.getConnectorConfig(), connectionContext.jdbc());
}
@Override
@ -360,12 +357,4 @@ public boolean accepts(SourceRecord sourceRecord) {
return true;
}
}
/**
*
* @return Snapshot statement factory
*/
public SnapshotStatementFactory getSnapshotStatementFactory() {
return snapshotStatementFactory;
}
}

View File

@ -947,16 +947,6 @@ public static EventProcessingFailureHandlingMode parse(String value) {
+ "'warn' the problematic event and its binlog position will be logged and the event will be skipped;"
+ "'ignore' the problematic event will be skipped.");
public static final Field SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE = Field.create("snapshot.select.statement.overrides")
.withDisplayName("List of tables where the default select statement used during snapshotting should be overridden.")
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDescription(" This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME). Select statements for the individual tables are " +
"specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]'. " +
"The value of those properties is the select statement to use when retrieving data from the specific table during snapshotting. " +
"A possible use case for large append-only tables is setting a specific point where to start (resume) snapshotting, in case a previous snapshotting was interrupted.");
/**
* Method that generates a Field for specifying that string columns whose names match a set of regular expressions should
* have their values truncated to be no longer than the specified number of characters.
@ -1017,6 +1007,7 @@ public static final Field MASK_COLUMN(int length) {
DATABASE_WHITELIST, DATABASE_BLACKLIST,
COLUMN_BLACKLIST,
SNAPSHOT_MODE, SNAPSHOT_NEW_TABLES, SNAPSHOT_LOCKING_MODE,
RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE,
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES,
GTID_SOURCE_FILTER_DML_EVENTS,
GTID_NEW_CHANNEL_POSITION,
@ -1089,6 +1080,7 @@ protected static ConfigDef configDef() {
DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL);
Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, INCLUDE_SQL_QUERY, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST,
COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST,
RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE,
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, GTID_SOURCE_FILTER_DML_EVENTS, GTID_NEW_CHANNEL_POSITION, BUFFER_SIZE_FOR_BINLOG_READER,
Heartbeat.HEARTBEAT_INTERVAL, Heartbeat.HEARTBEAT_TOPICS_PREFIX, EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE, INCONSISTENT_SCHEMA_HANDLING_MODE,
CommonConnectorConfig.TOMBSTONES_ON_DELETE);

View File

@ -245,10 +245,6 @@ protected SnapshotMode snapshotMode() {
return SnapshotMode.parse(value, MySqlConnectorConfig.SNAPSHOT_MODE.defaultValueAsString());
}
public String getSnapshotSelectOverrides() {
return config.getString(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE);
}
public void start() {
connectionContext.start();
// Start the MySQL database history, which simply starts up resources but does not recover the history to a specific point

View File

@ -14,7 +14,6 @@
import java.sql.Types;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@ -303,7 +302,7 @@ protected void execute() {
while (rs.next() && isRunning()) {
TableId id = new TableId(dbName, null, rs.getString(1));
final boolean shouldRecordTableSchema = shouldRecordTableSchema(schema, filters, id);
// Apply only when the whitelist table list is not dynamically reconfigured
// Apply only when the whitelist table list is not dynamically reconfigured
if ((createTableFilters == filters && shouldRecordTableSchema) || createTableFilters.tableFilter().test(id)) {
createTablesMap.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id);
}
@ -515,7 +514,7 @@ protected void execute() {
}
});
if (numRows.get() <= largeTableCount) {
statementFactory = getSnapshotStatementFactory();
statementFactory = this::createStatement;
}
rowCountStr.set(numRows.toString());
} catch (SQLException e) {
@ -528,7 +527,7 @@ protected void execute() {
long start = clock.currentTimeInMillis();
logger.info("Step {}: - scanning table '{}' ({} of {} tables)", step, tableId, ++counter, tableIds.size());
Map<TableId, String> selectOverrides = getSnapshotSelectOverridesByTable();
Map<TableId, String> selectOverrides = context.getConnectorConfig().getSnapshotSelectOverridesByTable();
String selectStatement = selectOverrides.getOrDefault(tableId, "SELECT * FROM " + quote(tableId));
logger.info("For table '{}' using select statement: '{}'", tableId, selectStatement);
@ -906,28 +905,6 @@ protected void recordRowAsInsert(RecordsForTable recordMaker, Object[] row, long
recordMaker.create(row, ts);
}
/**
* Returns any SELECT overrides, if present.
*/
private Map<TableId, String> getSnapshotSelectOverridesByTable() {
String tableList = context.getSnapshotSelectOverrides();
if (tableList == null) {
return Collections.emptyMap();
}
Map<TableId, String> snapshotSelectOverridesByTable = new HashMap<>();
for (String table : tableList.split(",")) {
snapshotSelectOverridesByTable.put(
TableId.parse(table),
context.config().getString(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table)
);
}
return snapshotSelectOverridesByTable;
}
protected static interface RecordRecorder {
void recordRow(RecordsForTable recordMaker, Object[] row, long ts) throws InterruptedException;
}

View File

@ -932,16 +932,6 @@ protected Snapshotter getSnapshotter() {
return this.snapshotMode.getSnapshotter(getConfig());
}
@Override
public String snapshotSelectOverrides() {
return getConfig().getString(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE);
}
@Override
public String snapshotSelectOverrideForTable(String table) {
return getConfig().getString(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table);
}
protected boolean skipRefreshSchemaOnMissingToastableData() {
return SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST == this.schemaRefreshMode;
}

View File

@ -5,28 +5,25 @@
*/
package io.debezium.connector.postgresql.snapshot;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.relational.TableId;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.relational.TableId;
public abstract class QueryingSnapshotter implements Snapshotter {
private PostgresConnectorConfig config;
private Map<TableId, String> snapshotOverrides;
@Override
public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
this.config = config;
this.snapshotOverrides = getSnapshotSelectOverridesByTable();
this.snapshotOverrides = config.getSnapshotSelectOverridesByTable();
}
@Override
public Optional<String> buildSnapshotQuery(TableId tableId) {
if (snapshotOverrides.containsKey(tableId)) {
return Optional.of(snapshotOverrides.get(tableId));
@ -40,25 +37,5 @@ public Optional<String> buildSnapshotQuery(TableId tableId) {
}
}
/**
* Returns any SELECT overrides, if present.
*/
private Map<TableId, String> getSnapshotSelectOverridesByTable() {
String tableList = config.snapshotSelectOverrides();
if (tableList == null) {
return Collections.emptyMap();
}
Map<TableId, String> snapshotSelectOverridesByTable = new HashMap<>();
for (String table : tableList.split(",")) {
snapshotSelectOverridesByTable.put(
TableId.parse(table),
config.snapshotSelectOverrideForTable(table)
);
}
return snapshotSelectOverridesByTable;
}
}

View File

@ -11,9 +11,6 @@
import java.sql.Savepoint;
import java.sql.Statement;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@ -201,6 +198,7 @@ protected void complete(SnapshotContext snapshotContext) {
* @param tableId the table to generate a query for
* @return a valid query string
*/
@Override
protected String getSnapshotSelect(SnapshotContext snapshotContext, TableId tableId) {
return String.format("SELECT * FROM [%s].[%s]", tableId.schema(), tableId.table());
}
@ -211,29 +209,6 @@ protected ChangeRecordEmitter getChangeRecordEmitter(SnapshotContext snapshotCon
return new SnapshotChangeRecordEmitter(snapshotContext.offset, row, getClock());
}
/**
* Returns any SELECT overrides, if present.
*/
@Override
protected Map<TableId, String> getSnapshotSelectOverridesByTable() {
String tableList = connectorConfig.snapshotSelectOverrides();
if (tableList == null) {
return Collections.emptyMap();
}
Map<TableId, String> snapshotSelectOverridesByTable = new HashMap<>();
for (String table : tableList.split(",")) {
snapshotSelectOverridesByTable.put(
TableId.parse(table),
connectorConfig.snapshotSelectOverrideForTable(table)
);
}
return snapshotSelectOverridesByTable;
}
/**
* Mutable context which is populated in the course of snapshotting.
*/

View File

@ -0,0 +1,150 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import static org.fest.assertions.Assertions.assertThat;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Testing;
/**
* Integration test for using snapshot SELECT overrides with the Debezium SQL Server connector.
*
* @author Gunnar Morling
*/
public class SnapshotWithSelectOverridesIT extends AbstractConnectorTest {
private static final int INITIAL_RECORDS_PER_TABLE = 10;
private SqlServerConnection connection;
@Before
public void before() throws SQLException {
TestHelper.createTestDatabase();
connection = TestHelper.testConnection();
connection.execute(
"CREATE TABLE table1 (id int, name varchar(30), price decimal(8,2), ts datetime2(0), soft_deleted bit, primary key(id))"
);
connection.execute(
"CREATE TABLE table2 (id int, name varchar(30), price decimal(8,2), ts datetime2(0), soft_deleted bit, primary key(id))"
);
connection.execute(
"CREATE TABLE table3 (id int, name varchar(30), price decimal(8,2), ts datetime2(0), soft_deleted bit, primary key(id))"
);
// Populate database
for (int i = 0; i < INITIAL_RECORDS_PER_TABLE; i++) {
connection.execute(
String.format(
"INSERT INTO table1 VALUES(%s, '%s', %s, '%s', %s)",
i,
"name" + i,
new BigDecimal(i + ".23"),
"2018-07-18 13:28:56",
i % 2
)
);
connection.execute(
String.format(
"INSERT INTO table2 VALUES(%s, '%s', %s, '%s', %s)",
i,
"name" + i,
new BigDecimal(i + ".23"),
"2018-07-18 13:28:56",
i % 2
)
);
connection.execute(
String.format(
"INSERT INTO table3 VALUES(%s, '%s', %s, '%s', %s)",
i,
"name" + i,
new BigDecimal(i + ".23"),
"2018-07-18 13:28:56",
i % 2
)
);
}
TestHelper.enableTableCdc(connection, "table1");
TestHelper.enableTableCdc(connection, "table2");
TestHelper.enableTableCdc(connection, "table3");
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
}
@After
public void after() throws SQLException {
if (connection != null) {
connection.close();
}
// TestHelper.dropTestDatabase();
}
@Test
@FixFor("DBZ-1224")
public void takeSnapshotWithOverrides() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(
RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE,
"dbo.table1,dbo.table3"
)
.with(
RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + ".dbo.table1",
"SELECT * FROM [dbo].[table1] where soft_deleted = 0 order by id desc"
)
.with(
RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + ".dbo.table3",
"SELECT * FROM [dbo].[table3] where soft_deleted = 0"
)
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
SourceRecords records = consumeRecordsByTopic(INITIAL_RECORDS_PER_TABLE + INITIAL_RECORDS_PER_TABLE + INITIAL_RECORDS_PER_TABLE / 2);
List<SourceRecord> table1 = records.recordsForTopic("server1.dbo.table1");
List<SourceRecord> table2 = records.recordsForTopic("server1.dbo.table2");
List<SourceRecord> table3 = records.recordsForTopic("server1.dbo.table3");
// soft_deleted records should be excluded for table1 and table3
assertThat(table1).hasSize(INITIAL_RECORDS_PER_TABLE / 2);
assertThat(table2).hasSize(INITIAL_RECORDS_PER_TABLE);
assertThat(table3).hasSize(INITIAL_RECORDS_PER_TABLE / 2);
String expectedIdsForTable1 = "86420";
StringBuilder actualIdsForTable1 = new StringBuilder();
for (int i = 0; i < INITIAL_RECORDS_PER_TABLE / 2; i++) {
SourceRecord record = table1.get(i);
Struct key = (Struct) record.key();
actualIdsForTable1.append(key.get("id"));
// soft_deleted records should be excluded
Struct value = (Struct) record.value();
assertThat(((Struct) value.get("after")).get("soft_deleted")).isEqualTo(false);
}
// the ORDER BY clause should be applied, too
assertThat(actualIdsForTable1.toString()).isEqualTo(expectedIdsForTable1);
}
}

View File

@ -1,56 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.base;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
/**
* Builds the snapshot statement required for reading tables
*/
public class SnapshotStatementFactory implements JdbcConnection.StatementFactory {
private final JdbcConnection jdbcConnection;
private final RelationalDatabaseConnectorConfig connectorConfig;
/**
* Create a snapshot producer
*
* @param connectorConfig relational connector configuration
* @param jdbcConnection JDBC connection
*/
public SnapshotStatementFactory(RelationalDatabaseConnectorConfig connectorConfig,
JdbcConnection jdbcConnection) {
this.connectorConfig = connectorConfig;
this.jdbcConnection = jdbcConnection;
}
/**
* Create 'Read' table statement for snapshot
* @return statement
* @throws SQLException any SQL exception thrown
*/
public Statement readTableStatement() throws SQLException {
return createStatement(jdbcConnection.connection());
}
/**
* Create statement for snapshot reading
* @param connection the JDBC connection; never null
* @return statement for snapshot reading
* @throws SQLException any SQL exception thrown
*/
@Override
public Statement createStatement(Connection connection) throws SQLException {
int rowsFetchSize = connectorConfig.getSnapshotFetchSize();
Statement statement = connection.createStatement(); // the default cursor is FORWARD_ONLY
statement.setFetchSize(rowsFetchSize);
return statement;
}
}

View File

@ -13,7 +13,6 @@
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
@ -218,13 +217,12 @@ protected void connectionCreated(SnapshotContext snapshotContext) throws Excepti
}
private void determineCapturedTables(SnapshotContext ctx) throws Exception {
ctx.snapshotOverrides = getSnapshotSelectOverridesByTable();
Set<TableId> allTableIds = getAllTableIds(ctx);
Set<TableId> capturedTables = new HashSet<>();
for (TableId tableId : allTableIds) {
if (connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId) && !ctx.skipSnapshot(tableId)) {
if (connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
LOGGER.trace("Adding table {} to the list of captured tables", tableId);
capturedTables.add(tableId);
}
@ -365,14 +363,21 @@ private Timer getTableScanLogTimer() {
protected abstract ChangeRecordEmitter getChangeRecordEmitter(SnapshotContext snapshotContext, Object[] row);
/**
* Generate a valid query string for the specified table
* Returns a valid query string for the specified table, either given by the user via snapshot select overrides or
* defaulting to a statement provided by the DB-specific change event source.
*
* @param tableId the table to generate a query for
* @return a valid query string
*/
private String determineSnapshotSelect(SnapshotContext snapshotContext, TableId tableId) {
return snapshotContext.isSnapshotSelectOveridden(tableId) ? snapshotContext.getSnapshotSelectOveridden(tableId) :
getSnapshotSelect(snapshotContext, tableId);
String overriddenSelect = connectorConfig.getSnapshotSelectOverridesByTable().get(tableId);
// try without catalog id, as this might or might not be populated based on the given connector
if (overriddenSelect == null) {
overriddenSelect = connectorConfig.getSnapshotSelectOverridesByTable().get(new TableId(null, tableId.schema(), tableId.table()));
}
return overriddenSelect != null ? overriddenSelect : getSnapshotSelect(snapshotContext, tableId);
}
/**
@ -383,11 +388,6 @@ private String determineSnapshotSelect(SnapshotContext snapshotContext, TableId
// scn xyz")
protected abstract String getSnapshotSelect(SnapshotContext snapshotContext, TableId tableId);
/**
* Returns any SELECT overrides, if present.
*/
protected abstract Map<TableId, String> getSnapshotSelectOverridesByTable();
private Column[] getColumnsForResultSet(Table table, ResultSet rs) throws SQLException {
ResultSetMetaData metaData = rs.getMetaData();
Column[] columns = new Column[metaData.getColumnCount()];
@ -431,7 +431,6 @@ private void rollbackTransaction(Connection connection) {
* Mutable context which is populated in the course of snapshotting.
*/
public static class SnapshotContext implements AutoCloseable {
public Map<TableId, String> snapshotOverrides;
public final String catalogName;
public final Tables tables;
@ -446,22 +445,6 @@ public SnapshotContext(String catalogName) throws SQLException {
@Override
public void close() throws Exception {
}
public boolean skipSnapshot(TableId inTableId) {
if (snapshotOverrides.containsKey(inTableId) && snapshotOverrides.get(inTableId) == null) {
LOGGER.warn("For table '{}' the select statement was not provided, skipping table", inTableId);
return true;
}
return false;
}
public boolean isSnapshotSelectOveridden(TableId tableId) {
return snapshotOverrides.containsKey(tableId);
}
public String getSnapshotSelectOveridden(TableId tableId) {
return snapshotOverrides.get(tableId);
}
}
/**

View File

@ -6,6 +6,9 @@
package io.debezium.relational;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
@ -182,8 +185,10 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDescription(" This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME). Select statements for the individual tables are " +
"specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]'. " +
.withDescription(" This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the" +
"specific connectors . Select statements for the individual tables are " +
"specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]' or " +
"'snapshot.select.statement.overrides.[SCHEMA_NAME].[TABLE_NAME]', respectively. " +
"The value of those properties is the select statement to use when retrieving data from the specific table during snapshotting. " +
"A possible use case for large append-only tables is setting a specific point where to start (resume) snapshotting, in case a previous snapshotting was interrupted.");
@ -228,12 +233,25 @@ private static int validateTableBlacklist(Configuration config, Field field, Val
return 0;
}
public String snapshotSelectOverrides() {
return getConfig().getString(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE);
}
/**
* Returns any SELECT overrides, if present.
*/
public Map<TableId, String> getSnapshotSelectOverridesByTable() {
String tableList = getConfig().getString(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE);
public String snapshotSelectOverrideForTable(String table) {
return getConfig().getString(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table);
}
if (tableList == null) {
return Collections.emptyMap();
}
Map<TableId, String> snapshotSelectOverridesByTable = new HashMap<>();
for (String table : tableList.split(",")) {
snapshotSelectOverridesByTable.put(
TableId.parse(table),
getConfig().getString(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table)
);
}
return Collections.unmodifiableMap(snapshotSelectOverridesByTable);
}
}