DBZ-137 contribution to the core, Oracle related

AndreyIg 2020-07-09 11:57:47 -07:00 committed by Gunnar Morling
parent 2410c47626
commit 77662a0dde
12 changed files with 431 additions and 163 deletions

View File

@ -500,7 +500,7 @@ public static class MongoDbSnapshottingTask extends SnapshottingTask {
private final List<ReplicaSet> replicaSetsToSnapshot;
public MongoDbSnapshottingTask(List<ReplicaSet> replicaSetsToSnapshot) {
super(false, !replicaSetsToSnapshot.isEmpty());
super(false, !replicaSetsToSnapshot.isEmpty(), false);
this.replicaSetsToSnapshot = replicaSetsToSnapshot;
}

View File

@ -71,7 +71,7 @@ protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
snapshotSchema = false;
}
return new SnapshottingTask(snapshotSchema, snapshotData);
return new SnapshottingTask(snapshotSchema, snapshotData, false);
}
@Override

View File

@ -19,6 +19,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@ -86,14 +87,36 @@ public class SqlServerConnection extends JdbcConnection {
* Creates a new connection using the supplied configuration.
*
* @param config {@link Configuration} instance, may not be null.
* @param clock the clock
* @param sourceTimestampMode strategy for populating {@code source.ts_ms}.
* @param config
* {@link Configuration} instance, may not be null.
* @param valueConverters
* {@link SqlServerValueConverters} instance
*/
public SqlServerConnection(Configuration config, Clock clock, SourceTimestampMode sourceTimestampMode) {
this(config, clock, sourceTimestampMode, null);
}
/**
* Creates a new connection using the supplied configuration.
*
* @param config {@link Configuration} instance, may not be null.
* @param clock the clock
* @param sourceTimestampMode strategy for populating {@code source.ts_ms}.
* @param valueConverters {@link SqlServerValueConverters} instance
*/
public SqlServerConnection(Configuration config, Clock clock, SourceTimestampMode sourceTimestampMode, SqlServerValueConverters valueConverters) {
super(config, FACTORY);
this(config, clock, sourceTimestampMode, null, null);
}
/**
* Creates a new connection using the supplied configuration.
*
* @param config {@link Configuration} instance, may not be null.
* @param clock the clock
* @param sourceTimestampMode strategy for populating {@code source.ts_ms}.
* @param valueConverters {@link SqlServerValueConverters} instance
* @param classLoaderSupplier class loader supplier
*/
public SqlServerConnection(Configuration config, Clock clock, SourceTimestampMode sourceTimestampMode, SqlServerValueConverters valueConverters, Supplier<ClassLoader> classLoaderSupplier) {
super(config, FACTORY, classLoaderSupplier);
lsnToInstantCache = new BoundedConcurrentHashMap<>(100);
realDatabaseName = retrieveRealDatabaseName();
boolean supportsAtTimeZone = supportsAtTimeZone();
@ -185,6 +208,10 @@ public void getChangesForTables(SqlServerChangeTable[] changeTables, Lsn interva
final Lsn fromLsn = getFromLsn(changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn);
preparers[idx] = statement -> {
String fetchSizeStr = config().asProperties().getProperty("incremental.fetch.size");
if (fetchSizeStr != null && fetchSizeStr.trim().length() > 0) {
statement.setFetchSize(Integer.parseInt(fetchSizeStr));
}
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, intervalToLsn.getBinary());
};
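The fetch size above is read straight from the JDBC connection configuration. A minimal sketch of how it could be supplied, assuming it is passed through the connector configuration and reaches this connection via the "database."-prefixed subset built in SqlServerConnectorTask (shown later in this commit); host and values are placeholders:

Configuration connectorConfig = Configuration.create()
        .with("database.hostname", "localhost")
        .with("database.port", "1433")
        // surfaces here as "incremental.fetch.size" after .subset("database.", true)
        .with("database.incremental.fetch.size", "10000")
        .build();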

View File

@ -318,6 +318,16 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
+ "In '" + SnapshotIsolationMode.READ_UNCOMMITTED.getValue()
+ "' mode neither table nor row-level locks are acquired, but connector does not guarantee snapshot consistency.");
public static final Field SNAPSHOT_SKIP_LOCKS = Field.create("snapshot.skip.locks")
.withDisplayName("Should schema be locked as we build the schema snapshot?")
.withType(Type.BOOLEAN)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDefault(false)
.withValidation(Field::isBoolean)
.withDescription(
"If true, locks all tables to be captured as we build the schema snapshot. This will prevent from any concurrent schema changes being applied to them.");
private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("SQL Server")
.type(
@ -330,7 +340,8 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
.connector(
SNAPSHOT_MODE,
SNAPSHOT_ISOLATION_MODE,
SOURCE_TIMESTAMP_MODE)
SOURCE_TIMESTAMP_MODE,
SNAPSHOT_SKIP_LOCKS)
.excluding(
SCHEMA_WHITELIST,
SCHEMA_INCLUDE_LIST,
@ -451,4 +462,11 @@ public String getContextName() {
public String getConnectorName() {
return Module.name();
}
/**
* @return true if table locking should be skipped while building the schema snapshot
*/
public boolean skipSnapshotLock() {
return getConfig().getBoolean(SNAPSHOT_SKIP_LOCKS);
}
}
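For illustration, a minimal sketch of enabling the new option programmatically, assuming the io.debezium.config.Configuration builder; every other setting is a placeholder:

// Hypothetical connector configuration; only "snapshot.skip.locks" comes from the field added above.
Configuration config = Configuration.create()
        .with("database.hostname", "localhost")
        .with("database.user", "sa")
        .with("snapshot.skip.locks", "true")
        .build();

SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(config);
// skipSnapshotLock() now returns true, so the snapshot source passes skipSnapshotLocking=true
// to SnapshottingTask and table locking is skipped while the schema snapshot is built.
boolean skipLocks = connectorConfig.skipSnapshotLock();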

View File

@ -72,8 +72,11 @@ public ChangeEventSourceCoordinator start(Configuration config) {
final Configuration jdbcConfig = config.filter(
x -> !(x.startsWith(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING) || x.equals(HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name())))
.subset("database.", true);
dataConnection = new SqlServerConnection(jdbcConfig, clock, connectorConfig.getSourceTimestampMode(), valueConverters);
metadataConnection = new SqlServerConnection(jdbcConfig, clock, connectorConfig.getSourceTimestampMode(), valueConverters);
Configuration dataConfig = jdbcConfig.edit()
.with("database.connection.autocommit", false)
.build();
dataConnection = new SqlServerConnection(dataConfig, clock, connectorConfig.getSourceTimestampMode(), valueConverters, () -> getClass().getClassLoader());
metadataConnection = new SqlServerConnection(jdbcConfig, clock, connectorConfig.getSourceTimestampMode(), valueConverters, () -> getClass().getClassLoader());
try {
dataConnection.setAutoCommit(false);
}

View File

@ -42,6 +42,7 @@ public class SqlServerSnapshotChangeEventSource extends RelationalSnapshotChange
private final SqlServerConnectorConfig connectorConfig;
private final SqlServerConnection jdbcConnection;
private final SqlServerDatabaseSchema sqlServerDatabaseSchema;
public SqlServerSnapshotChangeEventSource(SqlServerConnectorConfig connectorConfig, SqlServerOffsetContext previousOffset, SqlServerConnection jdbcConnection,
SqlServerDatabaseSchema schema, EventDispatcher<TableId> dispatcher, Clock clock,
@ -49,12 +50,14 @@ public SqlServerSnapshotChangeEventSource(SqlServerConnectorConfig connectorConf
super(connectorConfig, previousOffset, jdbcConnection, schema, dispatcher, clock, snapshotProgressListener);
this.connectorConfig = connectorConfig;
this.jdbcConnection = jdbcConnection;
this.sqlServerDatabaseSchema = schema;
}
@Override
protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
boolean snapshotSchema = true;
boolean snapshotData = true;
boolean skipSnapshotLock = false;
// found a previous offset and the earlier snapshot has completed
if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
@ -71,9 +74,10 @@ protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
LOGGER.info("According to the connector configuration only schema will be snapshotted");
}
snapshotData = connectorConfig.getSnapshotMode().includeData();
skipSnapshotLock = connectorConfig.skipSnapshotLock();
}
return new SnapshottingTask(snapshotSchema, snapshotData);
return new SnapshottingTask(snapshotSchema, snapshotData, skipSnapshotLock);
}
@Override

View File

@ -10,6 +10,8 @@
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.common.config.ConfigDef.Type;
import io.debezium.annotation.Immutable;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
@ -44,6 +46,11 @@ public interface JdbcConfiguration extends Configuration {
*/
public static final Field HOSTNAME = Field.create("hostname", "IP address of the database");
/**
* A field for the instance name if any. This field has no default value.
*/
public static final Field INSTANCE = Field.create("instance", "Instance name").withValidation(Field::isOptional);
/**
* A field for the port of the database server. There is no default value.
*/
@ -55,11 +62,25 @@ public interface JdbcConfiguration extends Configuration {
*/
public static final Field ON_CONNECT_STATEMENTS = Field.create("initial.statements", "A semicolon separated list of statements to be executed on connection");
/**
* An optional field for the datasource factory class that will be used to build the datasource connection pool.
*/
public static final Field CONNECTION_FACTORY_CLASS = Field.create("connection.factory.class")
.withDescription("The factory class used to create the datasource connection pool")
.withValidation(Field::isOptional);
public static final Field CONNECTION_TIMEOUT_MS = Field.create("connection.timeout.ms")
.withDisplayName("The maximum time (ms) to wait for a connection from the pool")
.withType(Type.INT)
.withDefault(600000)
.withValidation(Field::isOptional);
/**
* The set of names of the pre-defined JDBC configuration fields, including {@link #DATABASE}, {@link #USER},
* {@link #PASSWORD}, {@link #HOSTNAME}, and {@link #PORT}.
*/
public static Set<String> ALL_KNOWN_FIELDS = Collect.unmodifiableSet(Field::name, DATABASE, USER, PASSWORD, HOSTNAME, PORT, ON_CONNECT_STATEMENTS);
public static Set<String> ALL_KNOWN_FIELDS = Collect.unmodifiableSet(Field::name, DATABASE, USER, PASSWORD, HOSTNAME, INSTANCE, PORT, ON_CONNECT_STATEMENTS,
CONNECTION_FACTORY_CLASS, CONNECTION_TIMEOUT_MS);
/**
* Obtain a {@link JdbcConfiguration} adapter for the given {@link Configuration}.
@ -136,6 +157,16 @@ default Builder withDatabase(String databaseName) {
return with(DATABASE, databaseName);
}
/**
* Use the given instance name in the resulting configuration.
*
* @param instanceName the name of the instance
* @return this builder object so methods can be chained together; never null
*/
default Builder withInstance(String instanceName) {
return with(INSTANCE, instanceName);
}
/**
* Use the given port in the resulting configuration.
*
@ -145,6 +176,26 @@ default Builder withDatabase(String databaseName) {
default Builder withPort(int port) {
return with(PORT, port);
}
/**
* Use the given datasource factory class in the resulting configuration.
*
* @param datasourceFactoryClassName the datasource factory class name
* @return this builder object so methods can be chained together; never null
*/
default Builder withDatasourceClass(String datasourceFactoryClassName) {
return with(CONNECTION_FACTORY_CLASS, datasourceFactoryClassName);
}
/**
* Use the given connection timeout in the resulting configuration.
*
* @param connectionTimeoutMs connection timeout in ms
* @return this builder object so methods can be chained together; never null
*/
default Builder withConnectionTimeoutMs(int connectionTimeoutMs) {
return with(CONNECTION_TIMEOUT_MS, connectionTimeoutMs);
}
}
/**
@ -321,4 +372,31 @@ default String getUser() {
default String getPassword() {
return getString(PASSWORD);
}
/**
* Get the datasource factory property from the configuration.
*
* @return the specified value, or null if there is none.
*/
default String getConnectionFactoryClassName() {
return getString(CONNECTION_FACTORY_CLASS);
}
/**
* Get the connection timeout from the configuration.
*
* @return the specified value, or the default of 600000 ms if there is none.
*/
default int getConnectionTimeoutMs() {
return getInteger(CONNECTION_TIMEOUT_MS);
}
/**
* Get the instance from the configuration.
*
* @return the specified value, or null if there is none.
*/
default String getInstance() {
return getString(INSTANCE);
}
}
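A short usage sketch of the new builder methods and accessors, assuming JdbcConfiguration.create() as the entry point; all values are hypothetical:

JdbcConfiguration jdbcConfig = JdbcConfiguration.create()
        .withHostname("dbhost")
        .withInstance("SQLEXPRESS")                                  // new "instance" field
        .withPort(1433)
        .withUser("dbuser")
        .withPassword("secret")
        .withDatasourceClass("com.example.PooledConnectionFactory")  // new "connection.factory.class" field
        .withConnectionTimeoutMs(30000)                              // new "connection.timeout.ms" field
        .build();

String factoryClass = jdbcConfig.getConnectionFactoryClassName();
int timeoutMs = jdbcConfig.getConnectionTimeoutMs();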

View File

@ -29,6 +29,7 @@
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
@ -90,6 +91,29 @@ public static interface ConnectionFactory {
Connection connect(JdbcConfiguration config) throws SQLException;
}
private class ConnectionFactoryDecorator implements ConnectionFactory {
private ConnectionFactory defaultConnectionFactory;
private ConnectionFactory customConnectionFactory;
private Supplier<ClassLoader> classLoaderSupplier;
private ConnectionFactoryDecorator(ConnectionFactory connectionFactory, Supplier<ClassLoader> classLoaderSupplier) {
this.defaultConnectionFactory = connectionFactory;
this.classLoaderSupplier = classLoaderSupplier;
}
@Override
public Connection connect(JdbcConfiguration config) throws SQLException {
if (Strings.isNullOrEmpty(config.getConnectionFactoryClassName())) {
return defaultConnectionFactory.connect(config);
}
if (customConnectionFactory == null) {
customConnectionFactory = config.getInstance(JdbcConfiguration.CONNECTION_FACTORY_CLASS,
ConnectionFactory.class, classLoaderSupplier);
}
return customConnectionFactory.connect(config);
}
}
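For context, a minimal sketch of a factory that could be plugged in through connection.factory.class; the class name and the DriverManager-based body are illustrative only and not part of this change. ConnectionFactoryDecorator instantiates such a class lazily on first use via the supplied class loader.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;

// Hypothetical implementation; a real one would typically hand out pooled connections
// and honor config.getConnectionTimeoutMs().
public class PooledConnectionFactory implements JdbcConnection.ConnectionFactory {
    @Override
    public Connection connect(JdbcConfiguration config) throws SQLException {
        String url = "jdbc:sqlserver://" + config.getHostname() + ":" + config.getPort();
        return DriverManager.getConnection(url, config.getUser(), config.getPassword());
    }
}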
/**
* Defines multiple JDBC operations.
*/
@ -280,7 +304,17 @@ private static String findAndReplace(String url, String name, Properties props,
* @param connectionFactory the connection factory; may not be null
*/
public JdbcConnection(Configuration config, ConnectionFactory connectionFactory) {
this(config, connectionFactory, null);
this(config, connectionFactory, (Operations) null);
}
/**
* Create a new instance with the given configuration and connection factory.
*
* @param config the configuration; may not be null
* @param connectionFactory the connection factory; may not be null
* @param classLoaderSupplier class loader supplier; may be null
*/
public JdbcConnection(Configuration config, ConnectionFactory connectionFactory, Supplier<ClassLoader> classLoaderSupplier) {
this(config, connectionFactory, null, null, classLoaderSupplier);
}
/**
@ -306,8 +340,23 @@ public JdbcConnection(Configuration config, ConnectionFactory connectionFactory,
*/
protected JdbcConnection(Configuration config, ConnectionFactory connectionFactory, Operations initialOperations,
Consumer<Configuration.Builder> adapter) {
this(config, connectionFactory, initialOperations, adapter, null);
}
/**
* Create a new instance with the given configuration and connection factory, and specify the operations that should be
* run against each newly-established connection.
*
* @param config the configuration; may not be null
* @param connectionFactory the connection factory; may not be null
* @param initialOperations the initial operations that should be run on each new connection; may be null
* @param adapter the function that can be called to update the configuration with defaults
* @param classLoaderSupplier class loader supplier
*/
protected JdbcConnection(Configuration config, ConnectionFactory connectionFactory, Operations initialOperations,
Consumer<Configuration.Builder> adapter, Supplier<ClassLoader> classLoaderSupplier) {
this.config = adapter == null ? config : config.edit().apply(adapter).build();
this.factory = connectionFactory;
this.factory = classLoaderSupplier == null ? connectionFactory : new ConnectionFactoryDecorator(connectionFactory, classLoaderSupplier);
this.initialOps = initialOperations;
this.conn = null;
}
@ -375,14 +424,45 @@ public JdbcConnection execute(String... sqlStatements) throws SQLException {
* @throws SQLException if there is an error connecting to the database or executing the statements
*/
public JdbcConnection execute(Operations operations) throws SQLException {
return closeOnExecute((conn) -> {
try (Statement statement = conn.createStatement();) {
operations.apply(statement);
if (!conn.getAutoCommit()) {
conn.commit();
}
}
});
}
private JdbcConnection closeOnExecute(IExecutable executable) throws SQLException {
Connection conn = connection();
try (Statement statement = conn.createStatement();) {
operations.apply(statement);
commit();
}
executable.executeOn(conn);
return this;
}
private <T> T closeOnCallable(ICallable<T> callable) throws SQLException {
Connection conn = connection();
return callable.callOn(conn);
}
private JdbcConnection closeOnBlockableExecute(IBlockingExecutable executable) throws SQLException, InterruptedException {
Connection conn = connection();
executable.executeOn(conn);
return this;
}
private interface IExecutable {
void executeOn(Connection conn) throws SQLException;
}
private interface IBlockingExecutable {
void executeOn(Connection conn) throws SQLException, InterruptedException;
}
private interface ICallable<T> {
T callOn(Connection conn) throws SQLException;
}
public static interface ResultSetConsumer {
void accept(ResultSet rs) throws SQLException;
}
@ -453,17 +533,18 @@ public <T> T queryAndMap(String query, ResultSetMapper<T> mapper) throws SQLExce
* @throws SQLException if anything unexpected fails
*/
public JdbcConnection call(String sql, CallPreparer callPreparer, ResultSetConsumer resultSetConsumer) throws SQLException {
Connection conn = connection();
try (CallableStatement callableStatement = conn.prepareCall(sql)) {
if (callPreparer != null) {
callPreparer.accept(callableStatement);
}
try (ResultSet rs = callableStatement.executeQuery()) {
if (resultSetConsumer != null) {
resultSetConsumer.accept(rs);
closeOnExecute((conn) -> {
try (CallableStatement callableStatement = conn.prepareCall(sql)) {
if (callPreparer != null) {
callPreparer.accept(callableStatement);
}
try (ResultSet rs = callableStatement.executeQuery()) {
if (resultSetConsumer != null) {
resultSetConsumer.accept(rs);
}
}
}
}
});
return this;
}
@ -478,17 +559,18 @@ public JdbcConnection call(String sql, CallPreparer callPreparer, ResultSetConsu
* @see #execute(Operations)
*/
public JdbcConnection query(String query, StatementFactory statementFactory, ResultSetConsumer resultConsumer) throws SQLException {
Connection conn = connection();
try (Statement statement = statementFactory.createStatement(conn);) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("running '{}'", query);
}
try (ResultSet resultSet = statement.executeQuery(query);) {
if (resultConsumer != null) {
resultConsumer.accept(resultSet);
closeOnExecute((conn) -> {
try (Statement statement = statementFactory.createStatement(conn);) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("running '{}'", query);
}
try (ResultSet resultSet = statement.executeQuery(query);) {
if (resultConsumer != null) {
resultConsumer.accept(resultSet);
}
}
}
}
});
return this;
}
@ -566,30 +648,32 @@ public JdbcConnection prepareQuery(String[] multiQuery, StatementPreparer[] prep
*/
public <T> T queryAndMap(String query, StatementFactory statementFactory, ResultSetMapper<T> mapper) throws SQLException {
Objects.requireNonNull(mapper, "Mapper must be provided");
Connection conn = connection();
try (Statement statement = statementFactory.createStatement(conn);) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("running '{}'", query);
return closeOnCallable((conn) -> {
try (Statement statement = statementFactory.createStatement(conn);) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("running '{}'", query);
}
try (ResultSet resultSet = statement.executeQuery(query);) {
return mapper.apply(resultSet);
}
}
try (ResultSet resultSet = statement.executeQuery(query);) {
return mapper.apply(resultSet);
}
}
});
}
public JdbcConnection queryWithBlockingConsumer(String query, StatementFactory statementFactory, BlockingResultSetConsumer resultConsumer)
throws SQLException, InterruptedException {
Connection conn = connection();
try (Statement statement = statementFactory.createStatement(conn);) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("running '{}'", query);
}
try (ResultSet resultSet = statement.executeQuery(query);) {
if (resultConsumer != null) {
resultConsumer.accept(resultSet);
closeOnBlockableExecute((conn) -> {
try (Statement statement = statementFactory.createStatement(conn);) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("running '{}'", query);
}
try (ResultSet resultSet = statement.executeQuery(query);) {
if (resultConsumer != null) {
resultConsumer.accept(resultSet);
}
}
}
}
});
return this;
}
@ -876,15 +960,17 @@ public synchronized void close() throws SQLException {
*/
public Set<String> readAllCatalogNames()
throws SQLException {
Set<String> catalogs = new HashSet<>();
DatabaseMetaData metadata = connection().getMetaData();
try (ResultSet rs = metadata.getCatalogs()) {
while (rs.next()) {
String catalogName = rs.getString(1);
catalogs.add(catalogName);
return closeOnCallable((conn) -> {
Set<String> catalogs = new HashSet<>();
DatabaseMetaData metadata = conn.getMetaData();
try (ResultSet rs = metadata.getCatalogs()) {
while (rs.next()) {
String catalogName = rs.getString(1);
catalogs.add(catalogName);
}
}
}
return catalogs;
return catalogs;
});
}
/**
@ -896,31 +982,35 @@ public Set<String> readAllCatalogNames()
*/
public Set<String> readAllSchemaNames(Predicate<String> filter)
throws SQLException {
Set<String> schemas = new HashSet<>();
DatabaseMetaData metadata = connection().getMetaData();
try (ResultSet rs = metadata.getSchemas()) {
while (rs.next()) {
String schema = rs.getString(1);
if (filter != null && filter.test(schema)) {
schemas.add(schema);
return closeOnCallable((conn) -> {
Set<String> schemas = new HashSet<>();
DatabaseMetaData metadata = conn.getMetaData();
try (ResultSet rs = metadata.getSchemas()) {
while (rs.next()) {
String schema = rs.getString(1);
if (filter != null && filter.test(schema)) {
schemas.add(schema);
}
}
}
}
return schemas;
return schemas;
});
}
public String[] tableTypes() throws SQLException {
List<String> types = new ArrayList<>();
DatabaseMetaData metadata = connection().getMetaData();
try (ResultSet rs = metadata.getTableTypes()) {
while (rs.next()) {
String tableType = rs.getString(1);
if (tableType != null) {
types.add(tableType);
return closeOnCallable((conn) -> {
List<String> types = new ArrayList<>();
DatabaseMetaData metadata = conn.getMetaData();
try (ResultSet rs = metadata.getTableTypes()) {
while (rs.next()) {
String tableType = rs.getString(1);
if (tableType != null) {
types.add(tableType);
}
}
}
}
return types.toArray(new String[types.size()]);
return types.toArray(new String[types.size()]);
});
}
/**
@ -951,21 +1041,22 @@ public Set<TableId> readAllTableNames(String[] tableTypes) throws SQLException {
public Set<TableId> readTableNames(String databaseCatalog, String schemaNamePattern, String tableNamePattern,
String[] tableTypes)
throws SQLException {
if (tableNamePattern == null) {
tableNamePattern = "%";
}
Set<TableId> tableIds = new HashSet<>();
DatabaseMetaData metadata = connection().getMetaData();
try (ResultSet rs = metadata.getTables(databaseCatalog, schemaNamePattern, tableNamePattern, tableTypes)) {
while (rs.next()) {
String catalogName = rs.getString(1);
String schemaName = rs.getString(2);
String tableName = rs.getString(3);
TableId tableId = new TableId(catalogName, schemaName, tableName);
tableIds.add(tableId);
String tabNamePattern = (tableNamePattern == null) ? "%" : tableNamePattern;
return closeOnCallable((conn) -> {
Set<TableId> tableIds = new HashSet<>();
DatabaseMetaData metadata = conn.getMetaData();
try (ResultSet rs = metadata.getTables(databaseCatalog, schemaNamePattern, tabNamePattern, tableTypes)) {
while (rs.next()) {
String catalogName = rs.getString(1);
String schemaName = rs.getString(2);
String tableName = rs.getString(3);
TableId tableId = new TableId(catalogName, schemaName, tableName);
tableIds.add(tableId);
}
}
}
return tableIds;
return tableIds;
});
}
/**
@ -1042,62 +1133,64 @@ protected int resolveJdbcType(int metadataJdbcType, int nativeType) {
public void readSchema(Tables tables, String databaseCatalog, String schemaNamePattern,
TableFilter tableFilter, ColumnNameFilter columnFilter, boolean removeTablesNotFoundInJdbc)
throws SQLException {
// Before we make any changes, get the copy of the set of table IDs ...
Set<TableId> tableIdsBefore = new HashSet<>(tables.tableIds());
closeOnExecute((conn) -> {
// Before we make any changes, get the copy of the set of table IDs ...
Set<TableId> tableIdsBefore = new HashSet<>(tables.tableIds());
// Read the metadata for the table columns ...
DatabaseMetaData metadata = connection().getMetaData();
// Read the metadata for the table columns ...
DatabaseMetaData metadata = conn.getMetaData();
// Find regular and materialized views as they cannot be snapshotted
final Set<TableId> viewIds = new HashSet<>();
try (final ResultSet rs = metadata.getTables(databaseCatalog, schemaNamePattern, null, new String[]{ "VIEW", "MATERIALIZED VIEW" })) {
while (rs.next()) {
final String catalogName = rs.getString(1);
final String schemaName = rs.getString(2);
final String tableName = rs.getString(3);
viewIds.add(new TableId(catalogName, schemaName, tableName));
}
}
Map<TableId, List<Column>> columnsByTable = new HashMap<>();
try (ResultSet columnMetadata = metadata.getColumns(databaseCatalog, schemaNamePattern, null, null)) {
while (columnMetadata.next()) {
String catalogName = columnMetadata.getString(1);
String schemaName = columnMetadata.getString(2);
String tableName = columnMetadata.getString(3);
TableId tableId = new TableId(catalogName, schemaName, tableName);
// exclude views and non-captured tables
if (viewIds.contains(tableId) ||
(tableFilter != null && !tableFilter.isIncluded(tableId))) {
continue;
// Find regular and materialized views as they cannot be snapshotted
final Set<TableId> viewIds = new HashSet<>();
try (final ResultSet rs = metadata.getTables(databaseCatalog, schemaNamePattern, null, new String[]{ "VIEW", "MATERIALIZED VIEW" })) {
while (rs.next()) {
final String catalogName = rs.getString(1);
final String schemaName = rs.getString(2);
final String tableName = rs.getString(3);
viewIds.add(new TableId(catalogName, schemaName, tableName));
}
// add all included columns
readTableColumn(columnMetadata, tableId, columnFilter).ifPresent(column -> {
columnsByTable.computeIfAbsent(tableId, t -> new ArrayList<>())
.add(column.create());
});
}
}
// Read the metadata for the primary keys ...
for (Entry<TableId, List<Column>> tableEntry : columnsByTable.entrySet()) {
// First get the primary key information, which must be done for *each* table ...
List<String> pkColumnNames = readPrimaryKeyOrUniqueIndexNames(metadata, tableEntry.getKey());
Map<TableId, List<Column>> columnsByTable = new HashMap<>();
try (ResultSet columnMetadata = metadata.getColumns(databaseCatalog, schemaNamePattern, null, null)) {
while (columnMetadata.next()) {
String catalogName = columnMetadata.getString(1);
String schemaName = columnMetadata.getString(2);
String tableName = columnMetadata.getString(3);
TableId tableId = new TableId(catalogName, schemaName, tableName);
// Then define the table ...
List<Column> columns = tableEntry.getValue();
Collections.sort(columns);
String defaultCharsetName = null; // JDBC does not expose character sets
tables.overwriteTable(tableEntry.getKey(), columns, pkColumnNames, defaultCharsetName);
}
// exclude views and non-captured tables
if (viewIds.contains(tableId) ||
(tableFilter != null && !tableFilter.isIncluded(tableId))) {
continue;
}
if (removeTablesNotFoundInJdbc) {
// Remove any definitions for tables that were not found in the database metadata ...
tableIdsBefore.removeAll(columnsByTable.keySet());
tableIdsBefore.forEach(tables::removeTable);
}
// add all included columns
readTableColumn(columnMetadata, tableId, columnFilter).ifPresent(column -> {
columnsByTable.computeIfAbsent(tableId, t -> new ArrayList<>())
.add(column.create());
});
}
}
// Read the metadata for the primary keys ...
for (Entry<TableId, List<Column>> tableEntry : columnsByTable.entrySet()) {
// First get the primary key information, which must be done for *each* table ...
List<String> pkColumnNames = readPrimaryKeyOrUniqueIndexNames(metadata, tableEntry.getKey());
// Then define the table ...
List<Column> columns = tableEntry.getValue();
Collections.sort(columns);
String defaultCharsetName = null; // JDBC does not expose character sets
tables.overwriteTable(tableEntry.getKey(), columns, pkColumnNames, defaultCharsetName);
}
if (removeTablesNotFoundInJdbc) {
// Remove any definitions for tables that were not found in the database metadata ...
tableIdsBefore.removeAll(columnsByTable.keySet());
tableIdsBefore.forEach(tables::removeTable);
}
});
}
/**
@ -1108,8 +1201,7 @@ protected Optional<ColumnEditor> readTableColumn(ResultSet columnMetadata, Table
final String columnName = columnMetadata.getString(4);
if (columnFilter == null || columnFilter.matches(tableId.catalog(), tableId.schema(), tableId.table(), columnName)) {
final ColumnEditor column = Column.editor().name(columnName);
String columnType = columnMetadata.getString(6);
column.type(columnType);
column.type(columnMetadata.getString(6));
column.length(columnMetadata.getInt(7));
if (columnMetadata.getObject(9) != null) {
column.scale(columnMetadata.getInt(9));
@ -1216,15 +1308,16 @@ private PreparedStatement createPreparedStatement(String preparedQueryString) {
* @throws SQLException if anything fails
*/
public JdbcConnection executeWithoutCommitting(String... statements) throws SQLException {
Connection conn = connection();
try (Statement statement = conn.createStatement()) {
for (String stmt : statements) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Executing statement {}", stmt);
closeOnExecute((conn) -> {
try (Statement statement = conn.createStatement()) {
for (String stmt : statements) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Executing statement {}", stmt);
}
statement.execute(stmt);
}
statement.execute(stmt);
}
}
});
return this;
}

View File

@ -150,10 +150,12 @@ public static class SnapshottingTask {
private final boolean snapshotSchema;
private final boolean snapshotData;
private final boolean skipSnapshotLocking;
public SnapshottingTask(boolean snapshotSchema, boolean snapshotData) {
public SnapshottingTask(boolean snapshotSchema, boolean snapshotData, boolean skipSnapshotLocking) {
this.snapshotSchema = snapshotSchema;
this.snapshotData = snapshotData;
this.skipSnapshotLocking = skipSnapshotLocking;
}
/**
@ -180,9 +182,16 @@ public boolean shouldSkipSnapshot() {
return !snapshotSchema() && !snapshotData();
}
/**
* @return true if locking should be skipped while building the schema snapshot
*/
public boolean skipSnapshotLocking() {
return skipSnapshotLocking;
}
@Override
public String toString() {
return "SnapshottingTask [snapshotSchema=" + snapshotSchema + ", snapshotData=" + snapshotData + "]";
return "SnapshottingTask [snapshotSchema=" + snapshotSchema + ", snapshotData=" + snapshotData + ", skipSnapshotLocking=" + skipSnapshotLocking + "]";
}
}
}

View File

@ -5,6 +5,7 @@
*/
package io.debezium.relational;
import java.time.Instant;
import java.util.Objects;
import org.apache.kafka.connect.data.Struct;
@ -12,6 +13,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.data.Envelope;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.AbstractChangeRecordEmitter;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
@ -64,7 +66,8 @@ protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema)
Object[] newColumnValues = getNewColumnValues();
Object newKey = tableSchema.keyFromColumnData(newColumnValues);
Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
Instant instant = getInstantFrom(getOffset());
Struct envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), instant);
if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
// This case can be hit on UPDATE / DELETE when there's no primary key defined while using certain decoders
@ -80,7 +83,8 @@ protected void emitReadRecord(Receiver receiver, TableSchema tableSchema)
Object[] newColumnValues = getNewColumnValues();
Object newKey = tableSchema.keyFromColumnData(newColumnValues);
Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct envelope = tableSchema.getEnvelopeSchema().read(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
Instant instant = getInstantFrom(getOffset());
Struct envelope = tableSchema.getEnvelopeSchema().read(newValue, getOffset().getSourceInfo(), instant);
receiver.changeRecord(tableSchema, Operation.READ, newKey, envelope, getOffset(), null);
}
@ -96,6 +100,7 @@ protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema)
Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);
Instant instant = getInstantFrom(getOffset());
if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
logger.warn("no new values found for table '{}' from update message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo());
@ -104,7 +109,7 @@ protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema)
// some configurations do not provide old values in case of updates
// in this case we handle all updates as regular ones
if (oldKey == null || Objects.equals(oldKey, newKey)) {
Struct envelope = tableSchema.getEnvelopeSchema().update(oldValue, newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
Struct envelope = tableSchema.getEnvelopeSchema().update(oldValue, newValue, getOffset().getSourceInfo(), instant);
receiver.changeRecord(tableSchema, Operation.UPDATE, newKey, envelope, getOffset(), null);
}
// PK update -> emit as delete and re-insert with new key
@ -112,13 +117,13 @@ protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema)
ConnectHeaders headers = new ConnectHeaders();
headers.add(PK_UPDATE_NEWKEY_FIELD, newKey, tableSchema.keySchema());
Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), instant);
receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, getOffset(), headers);
headers = new ConnectHeaders();
headers.add(PK_UPDATE_OLDKEY_FIELD, oldKey, tableSchema.keySchema());
envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), instant);
receiver.changeRecord(tableSchema, Operation.CREATE, newKey, envelope, getOffset(), headers);
}
}
@ -134,7 +139,8 @@ protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) thro
return;
}
Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
Instant instant = getInstantFrom(getOffset());
Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), instant);
receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, getOffset(), null);
}
@ -162,4 +168,16 @@ protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) thro
protected boolean skipEmptyMessages() {
return false;
}
protected Instant getInstantFrom(OffsetContext offsetContext) {
Instant result = getClock().currentTimeAsInstant();
Object timeObj = offsetContext.getSourceInfo().get(Envelope.FieldName.TIMESTAMP);
if (timeObj != null && timeObj instanceof Long) {
result = Instant.ofEpochMilli((Long) timeObj);
}
else if (timeObj != null && timeObj instanceof Instant) {
result = (Instant) timeObj;
}
return result;
}
}

View File

@ -112,7 +112,12 @@ public SnapshotResult doExecute(ChangeEventSourceContext context, SnapshotContex
LOGGER.info("Snapshot step 3 - Locking captured tables");
if (snapshottingTask.snapshotSchema()) {
lockTablesForSchemaSnapshot(context, ctx);
if (snapshottingTask.skipSnapshotLocking()) {
LOGGER.info("Schema locking was disabled in connector configuration as snapshot.schema.lock is set to false");
}
else {
lockTablesForSchemaSnapshot(context, ctx);
}
}
LOGGER.info("Snapshot step 4 - Determining snapshot offset");
@ -127,7 +132,9 @@ public SnapshotResult doExecute(ChangeEventSourceContext context, SnapshotContex
createSchemaChangeEventsForTables(context, ctx, snapshottingTask);
// if we've been interrupted before, the TX rollback will cause any locks to be released
releaseSchemaSnapshotLocks(ctx);
if (!snapshottingTask.skipSnapshotLocking()) {
releaseSchemaSnapshotLocks(ctx);
}
}
else {
LOGGER.info("Snapshot step 6 - Skipping persisting of schema history");
@ -392,7 +399,17 @@ private Optional<String> determineSnapshotSelect(SnapshotContext snapshotContext
overriddenSelect = connectorConfig.getSnapshotSelectOverridesByTable().get(new TableId(null, tableId.schema(), tableId.table()));
}
return overriddenSelect != null ? Optional.of(overriddenSelect) : getSnapshotSelect(snapshotContext, tableId);
return overriddenSelect != null ? Optional.of(enhanceOverriddenSelect(snapshotContext, overriddenSelect, tableId)) : getSnapshotSelect(snapshotContext, tableId);
}
/**
* This method is overridden for Oracle to implement the "as of SCN" predicate.
* @param snapshotContext snapshot context, used for getting the offset SCN
* @param overriddenSelect conditional snapshot select
* @param tableId the table for which the select is overridden
* @return the enhanced select statement; by default the original select is returned unchanged
*/
protected String enhanceOverriddenSelect(SnapshotContext snapshotContext, String overriddenSelect, TableId tableId) {
return overriddenSelect;
}
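To make the hook concrete, a rough sketch of what an Oracle-side override might look like; the SCN lookup and the naive string manipulation are illustrative only and not taken from this commit:

// Hypothetical Oracle override; getSnapshotScn(...) stands in for however the snapshot SCN
// is read from the snapshot context/offset.
@Override
protected String enhanceOverriddenSelect(SnapshotContext snapshotContext, String overriddenSelect, TableId tableId) {
    long scn = getSnapshotScn(snapshotContext); // hypothetical helper
    String table = tableId.table();
    // Oracle flashback query: read the table contents as of the snapshot SCN
    return overriddenSelect.replace(" FROM " + table, " FROM " + table + " AS OF SCN " + scn);
}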
/**

View File

@ -74,7 +74,8 @@
<version.mongo.server>3.6</version.mongo.server>
<version.mongo.driver>3.12.3</version.mongo.driver>
<version.sqlserver.driver>7.2.2.jre8</version.sqlserver.driver>
<!-- TODO (DBZ-137): this should likely be moved to the incubator repository -->
<version.oracle.driver>12.2.0.1</version.oracle.driver>
<!-- Connectors -->
<version.com.google.protobuf>3.8.0</version.com.google.protobuf>