DBZ-6960 Include REDO SQL in Oracle source info block

This commit is contained in:
Chris Cranford 2024-02-18 01:24:24 -05:00 committed by Jiri Pechanec
parent 340234c2ac
commit aab87365d9
11 changed files with 308 additions and 3 deletions

View File

@ -592,6 +592,15 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
.withDescription("A comma-separated list of usernames that schema changes will be skipped for. Defaults to 'SYS,SYSTEM'.")
.withDefault("SYS,SYSTEM");
public static final Field LOG_MINING_INCLUDE_REDO_SQL = Field.create("log.mining.include.redo.sql")
.withDisplayName("Include the transaction log SQL")
.withType(Type.BOOLEAN)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("When enabled, the transaction log REDO SQL will be included in the source information block.")
.withDefault(false)
.withValidation(OracleConnectorConfig::validateLogMiningIncludeRedoSql);
private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("Oracle")
.excluding(
@ -657,6 +666,7 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
LOG_MINING_RESTART_CONNECTION,
LOG_MINING_MAX_SCN_DEVIATION_MS,
LOG_MINING_SCHEMA_CHANGES_USERNAME_EXCLUDE_LIST,
LOG_MINING_INCLUDE_REDO_SQL,
OLR_SOURCE,
OLR_HOST,
OLR_PORT)
@ -726,6 +736,8 @@ public static ConfigDef configDef() {
private final Duration logMiningMaxScnDeviation;
private final String logMiningInifispanGlobalConfiguration;
private final Set<String> logMiningSchemaChangesUsernameExcludes;
private final Boolean logMiningIncludeRedoSql;
private final String openLogReplicatorSource;
private final String openLogReplicatorHostname;
private final Integer openLogReplicatorPort;
@ -792,6 +804,7 @@ public OracleConnectorConfig(Configuration config) {
this.logMiningMaxScnDeviation = Duration.ofMillis(config.getLong(LOG_MINING_MAX_SCN_DEVIATION_MS));
this.logMiningInifispanGlobalConfiguration = config.getString(LOG_MINING_BUFFER_INFINISPAN_CACHE_GLOBAL);
this.logMiningSchemaChangesUsernameExcludes = Strings.setOf(config.getString(LOG_MINING_SCHEMA_CHANGES_USERNAME_EXCLUDE_LIST), String::new);
this.logMiningIncludeRedoSql = config.getBoolean(LOG_MINING_INCLUDE_REDO_SQL);
// OpenLogReplicator
this.openLogReplicatorSource = config.getString(OLR_SOURCE);
@ -1794,6 +1807,15 @@ public Set<String> getLogMiningSchemaChangesUsernameExcludes() {
return logMiningSchemaChangesUsernameExcludes;
}
/**
* Returns whether to include the redo SQL in the source information block.
*
* @return if redo SQL is included in change events
*/
public boolean isLogMiningIncludeRedoSql() {
return logMiningIncludeRedoSql;
}
/**
* Returns the logical source to stream changes from when connecting to OpenLogReplicator.
*
@ -1967,4 +1989,20 @@ public static int validateRequiredWhenUsingOpenLogReplicator(Configuration confi
}
return 0;
}
public static int validateLogMiningIncludeRedoSql(Configuration config, Field field, ValidationOutput problems) {
if (ConnectorAdapter.LOG_MINER.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) {
boolean lobEnabled = config.getBoolean(LOB_ENABLED);
if (lobEnabled && config.getBoolean(field)) {
problems.accept(field, config.getBoolean(field), String.format(
"The configuration property '%s' cannot be enabled when '%s' is set to true.",
field.name(), LOB_ENABLED.name()));
return 1;
}
}
else {
LOGGER.warn("The configuration property '{}' only applies to LogMiner and will be ignored.", field.name());
}
return 0;
}
}

View File

@ -297,6 +297,14 @@ public void setSsn(long ssn) {
sourceInfo.setSsn(ssn);
}
public String getRedoSql() {
return sourceInfo.getRedoSql();
}
public void setRedoSql(String redoSql) {
sourceInfo.setRedoSql(redoSql);
}
@Override
public boolean isSnapshotRunning() {
return sourceInfo.isSnapshot() && !snapshotCompleted;

View File

@ -10,6 +10,7 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.AbstractSourceInfoStructMaker;
import io.debezium.util.Strings;
public class OracleSourceInfoStructMaker extends AbstractSourceInfoStructMaker<SourceInfo> {
@ -28,7 +29,8 @@ public void init(String connector, String version, CommonConnectorConfig connect
.field(SourceInfo.LCR_POSITION_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(CommitScn.ROLLBACK_SEGMENT_ID_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(CommitScn.SQL_SEQUENCE_NUMBER_KEY, Schema.OPTIONAL_INT64_SCHEMA))
.field(SourceInfo.USERNAME_KEY, Schema.OPTIONAL_STRING_SCHEMA).build();
.field(SourceInfo.USERNAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.REDO_SQL, Schema.OPTIONAL_STRING_SCHEMA).build();
}
@Override
@ -55,6 +57,9 @@ public Struct struct(SourceInfo sourceInfo) {
if (sourceInfo.getRsId() != null) {
ret.put(CommitScn.ROLLBACK_SEGMENT_ID_KEY, sourceInfo.getRsId());
}
if (!Strings.isNullOrBlank(sourceInfo.getRedoSql())) {
ret.put(SourceInfo.REDO_SQL, sourceInfo.getRedoSql());
}
ret.put(CommitScn.SQL_SEQUENCE_NUMBER_KEY, sourceInfo.getSsn());

View File

@ -26,6 +26,7 @@ public class SourceInfo extends BaseSourceInfo {
public static final String SNAPSHOT_KEY = "snapshot";
public static final String USERNAME_KEY = "user_name";
public static final String SCN_INDEX_KEY = "scn_idx";
public static final String REDO_SQL = "redo_sql";
private Scn scn;
private CommitScn commitScn;
@ -39,6 +40,7 @@ public class SourceInfo extends BaseSourceInfo {
private String rsId;
private long ssn;
private Long scnIndex;
private String redoSql;
protected SourceInfo(OracleConnectorConfig connectorConfig) {
super(connectorConfig);
@ -116,6 +118,14 @@ public void setSourceTime(Instant sourceTime) {
this.sourceTime = sourceTime;
}
public String getRedoSql() {
return redoSql;
}
public void setRedoSql(String redoSql) {
this.redoSql = redoSql;
}
public String tableSchema() {
return (tableIds == null || tableIds.isEmpty()) ? null
: tableIds.stream()

View File

@ -0,0 +1,38 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.events;
import java.time.Instant;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntry;
import io.debezium.relational.TableId;
/**
* A specialization of {@link DmlEvent} that also stores the LogMiner REDO SQL statements.
*
* @author Chris Cranford
*/
public class RedoSqlDmlEvent extends DmlEvent {
private String redoSql;
public RedoSqlDmlEvent(LogMinerEventRow row, LogMinerDmlEntry dmlEntry, String redoSql) {
super(row, dmlEntry);
this.redoSql = redoSql;
}
public RedoSqlDmlEvent(EventType eventType, Scn scn, TableId tableId, String rowId, String rsId, Instant changeTime,
LogMinerDmlEntry dmlEntry, String redoSql) {
super(eventType, scn, tableId, rowId, rsId, changeTime, dmlEntry);
this.redoSql = redoSql;
}
public String getRedoSql() {
return redoSql;
}
}

View File

@ -49,6 +49,7 @@
import io.debezium.connector.oracle.logminer.events.LobWriteEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.events.RedoSqlDmlEvent;
import io.debezium.connector.oracle.logminer.events.SelectLobLocatorEvent;
import io.debezium.connector.oracle.logminer.events.TruncateEvent;
import io.debezium.connector.oracle.logminer.events.XmlBeginEvent;
@ -501,6 +502,10 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted
offsetContext.setRedoThread(row.getThread());
offsetContext.setRsId(event.getRsId());
if (event instanceof RedoSqlDmlEvent) {
offsetContext.setRedoSql(((RedoSqlDmlEvent) event).getRedoSql());
}
final DmlEvent dmlEvent = (DmlEvent) event;
if (!skipExcludedUserName) {
LogMinerChangeRecordEmitter logMinerChangeRecordEmitter;
@ -531,8 +536,11 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted
Clock.system());
}
dispatcher.dispatchDataChangeEvent(partition, event.getTableId(), logMinerChangeRecordEmitter);
}
// Clear redo SQL
offsetContext.setRedoSql(null);
}
};
@ -1089,6 +1097,9 @@ protected void handleDataEvent(LogMinerEventRow row) throws SQLException, Interr
final LogMinerDmlEntry dmlEntry = parseDmlStatement(row.getRedoSql(), table);
dmlEntry.setObjectName(row.getTableName());
dmlEntry.setObjectOwner(row.getTablespaceName());
if (connectorConfig.isLogMiningIncludeRedoSql()) {
return new RedoSqlDmlEvent(row, dmlEntry, row.getRedoSql());
}
return new DmlEvent(row, dmlEntry);
});

View File

@ -17,6 +17,6 @@
*/
@AutoProtoSchemaBuilder(includeClasses = { LogMinerEventAdapter.class, DmlEventAdapter.class, SelectLobLocatorEventAdapter.class, LobWriteEventAdapter.class,
LobEraseEventAdapter.class, LogMinerDmlEntryImplAdapter.class, TruncateEventAdapter.class, XmlBeginEventAdapter.class, XmlWriteEventAdapter.class,
XmlEndEventAdapter.class }, schemaFilePath = "/")
XmlEndEventAdapter.class, RedoSqlDmlEventAdapter.class }, schemaFilePath = "/")
public interface LogMinerEventMarshaller extends SerializationContextInitializer {
}

View File

@ -0,0 +1,67 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan.marshalling;
import java.time.Instant;
import org.infinispan.protostream.annotations.ProtoAdapter;
import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.connector.oracle.logminer.events.RedoSqlDmlEvent;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntryImpl;
import io.debezium.relational.TableId;
/**
* An Infinispan ProtoStream adapter to marshall {@link RedoSqlDmlEvent} instances.
*
* This class defines a factory for creating {@link RedoSqlDmlEvent} instances when hydrating
* records from the persisted datastore as well as field handlers to extract values
* to be marshalled to the protocol buffer stream.
*
* The underlying protocol buffer record consists of the following structure:
* <pre>
* message RedoSqlDmlEvent {
* // structure of the super type, DmlEventAdapter
* required string entry = 8;
* }
* </pre>
* @author Chris Cranford
*/
@ProtoAdapter(RedoSqlDmlEvent.class)
public class RedoSqlDmlEventAdapter extends DmlEventAdapter {
/**
* A ProtoStream factory that creates {@link RedoSqlDmlEvent} instances.
*
* @param eventType the event type
* @param scn the system change number, must not be {@code null}
* @param tableId the fully-qualified table name
* @param rowId the Oracle row-id the change is associated with
* @param rsId the Oracle rollback segment identifier
* @param changeTime the time the change occurred
* @param entry the parsed SQL statement entry
* @param redoSql the redo sql
* @return the constructed RedoSqlDmlEvent
*/
@ProtoFactory
public RedoSqlDmlEvent factory(int eventType, String scn, String tableId, String rowId, String rsId, String changeTime, LogMinerDmlEntryImpl entry, String redoSql) {
return new RedoSqlDmlEvent(EventType.from(eventType), Scn.valueOf(scn), TableId.parse(tableId), rowId, rsId, Instant.parse(changeTime), entry, redoSql);
}
/**
* A ProtoStream handler to extract the {@code redoSql} field from the {@link RedoSqlDmlEvent}.
*
* @param event the event instance, must not be {@code null}
* @return the redo SQL statement
*/
@ProtoField(number = 8, required = true)
public String getRedoSql(RedoSqlDmlEvent event) {
return event.getRedoSql();
}
}

View File

@ -69,6 +69,7 @@ public void schemaIsCorrect() {
.field("ssn", Schema.OPTIONAL_INT64_SCHEMA)
.field("redo_thread", Schema.OPTIONAL_INT32_SCHEMA)
.field("user_name", Schema.OPTIONAL_STRING_SCHEMA)
.field("redo_sql", Schema.OPTIONAL_STRING_SCHEMA)
.build();
VerifyRecord.assertConnectSchemasAreEqual(null, source.schema(), schema);

View File

@ -0,0 +1,120 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer;
import static org.assertj.core.api.Assertions.assertThat;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.SourceInfo;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
/**
* Tests for LogMiner's source info block REDO SQL
*
* @author Chris Cranford
*/
@SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Requires LogMiner")
public class LogMinerSourceInfoRedoSqlIT extends AbstractConnectorTest {
@Rule
public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule();
private static OracleConnection connection;
@BeforeClass
public static void beforeClass() throws SQLException {
connection = TestHelper.testConnection();
}
@AfterClass
public static void closeConnection() throws SQLException {
if (connection != null) {
connection.close();
}
}
@Before
public void before() throws SQLException {
setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
}
@Test
@FixFor("DBZ-6960")
public void shouldStreamRedoSqlInSourceInfoBlockWhenEnabled() throws Exception {
testRedoSqlInSourceInfoBlock(true);
}
@Test
@FixFor("DBZ-6960")
public void shouldNotIncludeRedoSqlInSourceInfoBlockWhenNotEnabled() throws Exception {
testRedoSqlInSourceInfoBlock(false);
}
private void testRedoSqlInSourceInfoBlock(boolean includeRedoSql) throws Exception {
TestHelper.dropTable(connection, "dbz6960");
try {
connection.execute("CREATE TABLE dbz6960 (id number(9,0) primary key, data varchar2(50))");
TestHelper.streamTable(connection, "dbz6960");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.LOG_MINING_INCLUDE_REDO_SQL, Boolean.toString(includeRedoSql))
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6960")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO dbz6960 (id,data) values (1, 'test')");
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ6960");
assertThat(tableRecords).hasSize(1);
VerifyRecord.isValidInsert(tableRecords.get(0), "ID", 1);
Struct source = ((Struct) tableRecords.get(0).value()).getStruct(Envelope.FieldName.SOURCE);
assertThat(source.schema().field(SourceInfo.REDO_SQL)).isNotNull();
String redoSql = source.getString(SourceInfo.REDO_SQL);
if (includeRedoSql) {
assertThat(redoSql).isEqualTo("insert into \"DEBEZIUM\".\"DBZ6960\"(\"ID\",\"DATA\") values ('1','test');");
}
else {
assertThat(redoSql).isNull();
}
}
finally {
TestHelper.dropTable(connection, "dbz6960");
}
}
}

View File

@ -3718,6 +3718,13 @@ If the difference between the timestamps is less than the specified value, and t
Typically, multiple connectors can use the same flush table.
However, if connectors encounter table lock contention errors, use this property to specify a dedicated table for each connector deployment.
|[[oracle-property-log-mining-include-redo-sql]]<<oracle-property-log-mining-include-redo-sql, `+log.mining.include.redo.sql+`>>
|`false`
|Specifies whether the redo log constructed SQL statement is included in `source.redo_sql` field.
ifdef::community[]
This configuration is ignored when using XStream or OpenLogReplicator adapters.
endif::community[]
|[[oracle-property-lob-enabled]]<<oracle-property-lob-enabled, `+lob.enabled+`>>
|`false`
|Controls whether or not large object (CLOB or BLOB) column values are emitted in change events. +