DBZ-7819 Allow log.mining.flush.table.name to optionally include schema

This commit is contained in:
Chris Cranford 2024-04-25 14:05:47 -04:00 committed by Chris Cranford
parent 47b02585fd
commit f23734c5d6
4 changed files with 28 additions and 18 deletions

View File

@ -353,16 +353,15 @@ public Long getSessionStatisticByName(String name) throws SQLException {
/**
* Returns whether the given table exists or not.
*
* @param tableName table name, should not be {@code null}
* @param tableId table id, should not be {@code null}
* @return true if the table exists, false if it does not
* @throws SQLException if a database exception occurred
*/
public boolean isTableExists(String tableName) throws SQLException {
return queryAndMap("SELECT COUNT(1) FROM USER_TABLES WHERE TABLE_NAME = '" + tableName + "'",
rs -> rs.next() && rs.getLong(1) > 0);
}
public boolean isTableExists(TableId tableId) throws SQLException {
if (Strings.isNullOrBlank(tableId.schema())) {
return queryAndMap("SELECT COUNT(1) FROM USER_TABLES WHERE TABLE_NAME = '" + tableId.table() + "'",
rs -> rs.next() && rs.getLong(1) > 0);
}
return queryAndMap("SELECT COUNT(1) FROM ALL_TABLES WHERE OWNER = '" + tableId.schema() + "' AND TABLE_NAME = '" + tableId.table() + "'",
rs -> rs.next() && rs.getLong(1) > 0);
}
@ -370,22 +369,23 @@ public boolean isTableExists(TableId tableId) throws SQLException {
/**
* Returns whether the given table is empty or not.
*
* @param tableName table name, should not be {@code null}
* @param tableId table id, should not be {@code null}
* @return true if the table has no records, false otherwise
* @throws SQLException if a database exception occurred
*/
public boolean isTableEmpty(String tableName) throws SQLException {
return getRowCount(tableName) == 0L;
public boolean isTableEmpty(TableId tableId) throws SQLException {
return getRowCount(tableId) == 0L;
}
/**
* Returns the number of rows in a given table.
*
* @param tableName table name, should not be {@code null}
* @param tableId table id, should not be {@code null}
* @return the number of rows
* @throws SQLException if a database exception occurred
*/
public long getRowCount(String tableName) throws SQLException {
public long getRowCount(TableId tableId) throws SQLException {
final String tableName = new TableId(null, tableId.schema(), tableId.table()).toDoubleQuotedString();
return queryAndMap("SELECT COUNT(1) FROM " + tableName, rs -> {
if (rs.next()) {
return rs.getLong(1);

View File

@ -15,6 +15,7 @@
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.Scn;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.TableId;
import io.debezium.util.Strings;
/**
@ -33,6 +34,7 @@ public class CommitLogWriterFlushStrategy implements LogWriterFlushStrategy {
private static final String DELETE_FLUSH_TABLE = "DELETE FROM %s";
private final String flushTableName;
private final TableId flushTableId;
private final String databasePdbName;
private final OracleConnection connection;
private final boolean closeConnectionOnClose;
@ -47,7 +49,8 @@ public class CommitLogWriterFlushStrategy implements LogWriterFlushStrategy {
* @param connection the connection to be used to force the flush, must not be {@code null}
*/
public CommitLogWriterFlushStrategy(OracleConnectorConfig connectorConfig, OracleConnection connection) {
this.flushTableName = connectorConfig.getLogMiningFlushTableName();
this.flushTableId = TableId.parse(connectorConfig.getLogMiningFlushTableName());
this.flushTableName = flushTableId.toDoubleQuotedString();
this.databasePdbName = connectorConfig.getPdbName();
this.connection = connection;
this.closeConnectionOnClose = false;
@ -65,7 +68,8 @@ public CommitLogWriterFlushStrategy(OracleConnectorConfig connectorConfig, Oracl
* @throws SQLException if there was a database problem
*/
public CommitLogWriterFlushStrategy(OracleConnectorConfig connectorConfig, JdbcConfiguration jdbcConfig) throws SQLException {
this.flushTableName = connectorConfig.getLogMiningFlushTableName();
this.flushTableId = TableId.parse(connectorConfig.getLogMiningFlushTableName());
this.flushTableName = flushTableId.toDoubleQuotedString();
this.databasePdbName = connectorConfig.getPdbName();
this.connection = new OracleConnection(jdbcConfig);
this.connection.setAutoCommit(false);
@ -118,13 +122,13 @@ private void createFlushTableIfNotExists() {
connection.setSessionToPdb(databasePdbName);
}
if (!connection.isTableExists(flushTableName)) {
if (!connection.isTableExists(flushTableId)) {
connection.executeWithoutCommitting(String.format(CREATE_FLUSH_TABLE, flushTableName));
}
fixMultiRowDataBug();
if (connection.isTableEmpty(flushTableName)) {
if (connection.isTableEmpty(flushTableId)) {
connection.executeWithoutCommitting(String.format(INSERT_FLUSH_TABLE, flushTableName));
connection.commit();
}
@ -148,8 +152,8 @@ private void createFlushTableIfNotExists() {
* @throws SQLException if a database exception occurs
*/
private void fixMultiRowDataBug() throws SQLException {
if (connection.getRowCount(flushTableName) > 1L) {
LOGGER.warn("DBZ-4118: The flush table, {}, has multiple rows and has been corrected.", flushTableName);
if (connection.getRowCount(flushTableId) > 1L) {
LOGGER.warn("DBZ-4118: The flush table, {}, has multiple rows and has been corrected.", flushTableId);
connection.executeWithoutCommitting(String.format(DELETE_FLUSH_TABLE, flushTableName));
connection.executeWithoutCommitting(String.format(INSERT_FLUSH_TABLE, flushTableName));
connection.commit();

View File

@ -32,6 +32,7 @@
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.TableId;
import io.debezium.util.Strings;
import io.debezium.util.Testing;
@ -141,7 +142,7 @@ private void assertFlushTableHasExactlyOneRow(Configuration config) throws SQLEx
if (!Strings.isNullOrEmpty(databasePdbName)) {
conn.setSessionToPdb(databasePdbName);
}
assertThat(conn.getRowCount(getFlushTableName())).isEqualTo(1L);
assertThat(conn.getRowCount(getFlushTableId())).isEqualTo(1L);
}
}
@ -168,4 +169,8 @@ private void insertFlushTable(Configuration config, String scnValue) throws SQLE
private static String getFlushTableName() {
return TestHelper.getConnectorUserName() + "." + flushTableName;
}
private static TableId getFlushTableId() {
return TableId.parse(getFlushTableName());
}
}

View File

@ -3985,6 +3985,7 @@ If the difference between the timestamps is less than the specified value, and t
|[[oracle-property-log-mining-flush-table-name]]<<oracle-property-log-mining-flush-table-name, `+log.mining.flush.table.name+`>>
|`LOG_MINING_FLUSH`
|Specifies the name of the flush table that coordinates flushing the Oracle LogWriter Buffer (LGWR) to the redo logs.
This name can be specified using the format `_<schemaName>_._<tableName>_` or `_<tableName>_`.
Typically, multiple connectors can use the same flush table.
However, if connectors encounter table lock contention errors, use this property to specify a dedicated table for each connector deployment.