From aab87365d960194ed829500bffcb9fe1b3fc7e47 Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Sun, 18 Feb 2024 01:24:24 -0500 Subject: [PATCH] DBZ-6960 Include REDO SQL in Oracle source info block --- .../oracle/OracleConnectorConfig.java | 38 ++++++ .../connector/oracle/OracleOffsetContext.java | 8 ++ .../oracle/OracleSourceInfoStructMaker.java | 7 +- .../debezium/connector/oracle/SourceInfo.java | 10 ++ .../logminer/events/RedoSqlDmlEvent.java | 38 ++++++ .../AbstractLogMinerEventProcessor.java | 13 +- .../marshalling/LogMinerEventMarshaller.java | 2 +- .../marshalling/RedoSqlDmlEventAdapter.java | 67 ++++++++++ .../connector/oracle/SourceInfoTest.java | 1 + .../logminer/LogMinerSourceInfoRedoSqlIT.java | 120 ++++++++++++++++++ .../modules/ROOT/pages/connectors/oracle.adoc | 7 + 11 files changed, 308 insertions(+), 3 deletions(-) create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/events/RedoSqlDmlEvent.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/RedoSqlDmlEventAdapter.java create mode 100644 debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerSourceInfoRedoSqlIT.java diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index e57e347d0..67038d244 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -592,6 +592,15 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector .withDescription("A comma-separated list of usernames that schema changes will be skipped for. Defaults to 'SYS,SYSTEM'.") .withDefault("SYS,SYSTEM"); + public static final Field LOG_MINING_INCLUDE_REDO_SQL = Field.create("log.mining.include.redo.sql") + .withDisplayName("Include the transaction log SQL") + .withType(Type.BOOLEAN) + .withWidth(Width.SHORT) + .withImportance(Importance.LOW) + .withDescription("When enabled, the transaction log REDO SQL will be included in the source information block.") + .withDefault(false) + .withValidation(OracleConnectorConfig::validateLogMiningIncludeRedoSql); + private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit() .name("Oracle") .excluding( @@ -657,6 +666,7 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector LOG_MINING_RESTART_CONNECTION, LOG_MINING_MAX_SCN_DEVIATION_MS, LOG_MINING_SCHEMA_CHANGES_USERNAME_EXCLUDE_LIST, + LOG_MINING_INCLUDE_REDO_SQL, OLR_SOURCE, OLR_HOST, OLR_PORT) @@ -726,6 +736,8 @@ public static ConfigDef configDef() { private final Duration logMiningMaxScnDeviation; private final String logMiningInifispanGlobalConfiguration; private final Set logMiningSchemaChangesUsernameExcludes; + private final Boolean logMiningIncludeRedoSql; + private final String openLogReplicatorSource; private final String openLogReplicatorHostname; private final Integer openLogReplicatorPort; @@ -792,6 +804,7 @@ public OracleConnectorConfig(Configuration config) { this.logMiningMaxScnDeviation = Duration.ofMillis(config.getLong(LOG_MINING_MAX_SCN_DEVIATION_MS)); this.logMiningInifispanGlobalConfiguration = config.getString(LOG_MINING_BUFFER_INFINISPAN_CACHE_GLOBAL); this.logMiningSchemaChangesUsernameExcludes = Strings.setOf(config.getString(LOG_MINING_SCHEMA_CHANGES_USERNAME_EXCLUDE_LIST), String::new); + this.logMiningIncludeRedoSql = config.getBoolean(LOG_MINING_INCLUDE_REDO_SQL); // OpenLogReplicator this.openLogReplicatorSource = config.getString(OLR_SOURCE); @@ -1794,6 +1807,15 @@ public Set getLogMiningSchemaChangesUsernameExcludes() { return logMiningSchemaChangesUsernameExcludes; } + /** + * Returns whether to include the redo SQL in the source information block. + * + * @return if redo SQL is included in change events + */ + public boolean isLogMiningIncludeRedoSql() { + return logMiningIncludeRedoSql; + } + /** * Returns the logical source to stream changes from when connecting to OpenLogReplicator. * @@ -1967,4 +1989,20 @@ public static int validateRequiredWhenUsingOpenLogReplicator(Configuration confi } return 0; } + + public static int validateLogMiningIncludeRedoSql(Configuration config, Field field, ValidationOutput problems) { + if (ConnectorAdapter.LOG_MINER.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) { + boolean lobEnabled = config.getBoolean(LOB_ENABLED); + if (lobEnabled && config.getBoolean(field)) { + problems.accept(field, config.getBoolean(field), String.format( + "The configuration property '%s' cannot be enabled when '%s' is set to true.", + field.name(), LOB_ENABLED.name())); + return 1; + } + } + else { + LOGGER.warn("The configuration property '{}' only applies to LogMiner and will be ignored.", field.name()); + } + return 0; + } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java index 48d71b2e1..4de483c58 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java @@ -297,6 +297,14 @@ public void setSsn(long ssn) { sourceInfo.setSsn(ssn); } + public String getRedoSql() { + return sourceInfo.getRedoSql(); + } + + public void setRedoSql(String redoSql) { + sourceInfo.setRedoSql(redoSql); + } + @Override public boolean isSnapshotRunning() { return sourceInfo.isSnapshot() && !snapshotCompleted; diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java index c7631c0ba..5f159c9fb 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java @@ -10,6 +10,7 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.connector.AbstractSourceInfoStructMaker; +import io.debezium.util.Strings; public class OracleSourceInfoStructMaker extends AbstractSourceInfoStructMaker { @@ -28,7 +29,8 @@ public void init(String connector, String version, CommonConnectorConfig connect .field(SourceInfo.LCR_POSITION_KEY, Schema.OPTIONAL_STRING_SCHEMA) .field(CommitScn.ROLLBACK_SEGMENT_ID_KEY, Schema.OPTIONAL_STRING_SCHEMA) .field(CommitScn.SQL_SEQUENCE_NUMBER_KEY, Schema.OPTIONAL_INT64_SCHEMA)) - .field(SourceInfo.USERNAME_KEY, Schema.OPTIONAL_STRING_SCHEMA).build(); + .field(SourceInfo.USERNAME_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(SourceInfo.REDO_SQL, Schema.OPTIONAL_STRING_SCHEMA).build(); } @Override @@ -55,6 +57,9 @@ public Struct struct(SourceInfo sourceInfo) { if (sourceInfo.getRsId() != null) { ret.put(CommitScn.ROLLBACK_SEGMENT_ID_KEY, sourceInfo.getRsId()); } + if (!Strings.isNullOrBlank(sourceInfo.getRedoSql())) { + ret.put(SourceInfo.REDO_SQL, sourceInfo.getRedoSql()); + } ret.put(CommitScn.SQL_SEQUENCE_NUMBER_KEY, sourceInfo.getSsn()); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java index 9b7c9c25b..ff9c825bc 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java @@ -26,6 +26,7 @@ public class SourceInfo extends BaseSourceInfo { public static final String SNAPSHOT_KEY = "snapshot"; public static final String USERNAME_KEY = "user_name"; public static final String SCN_INDEX_KEY = "scn_idx"; + public static final String REDO_SQL = "redo_sql"; private Scn scn; private CommitScn commitScn; @@ -39,6 +40,7 @@ public class SourceInfo extends BaseSourceInfo { private String rsId; private long ssn; private Long scnIndex; + private String redoSql; protected SourceInfo(OracleConnectorConfig connectorConfig) { super(connectorConfig); @@ -116,6 +118,14 @@ public void setSourceTime(Instant sourceTime) { this.sourceTime = sourceTime; } + public String getRedoSql() { + return redoSql; + } + + public void setRedoSql(String redoSql) { + this.redoSql = redoSql; + } + public String tableSchema() { return (tableIds == null || tableIds.isEmpty()) ? null : tableIds.stream() diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/events/RedoSqlDmlEvent.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/events/RedoSqlDmlEvent.java new file mode 100644 index 000000000..ecd53b7e9 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/events/RedoSqlDmlEvent.java @@ -0,0 +1,38 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.events; + +import java.time.Instant; + +import io.debezium.connector.oracle.Scn; +import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntry; +import io.debezium.relational.TableId; + +/** + * A specialization of {@link DmlEvent} that also stores the LogMiner REDO SQL statements. + * + * @author Chris Cranford + */ +public class RedoSqlDmlEvent extends DmlEvent { + + private String redoSql; + + public RedoSqlDmlEvent(LogMinerEventRow row, LogMinerDmlEntry dmlEntry, String redoSql) { + super(row, dmlEntry); + this.redoSql = redoSql; + } + + public RedoSqlDmlEvent(EventType eventType, Scn scn, TableId tableId, String rowId, String rsId, Instant changeTime, + LogMinerDmlEntry dmlEntry, String redoSql) { + super(eventType, scn, tableId, rowId, rsId, changeTime, dmlEntry); + this.redoSql = redoSql; + } + + public String getRedoSql() { + return redoSql; + } + +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java index 7112528f4..e112407b6 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java @@ -49,6 +49,7 @@ import io.debezium.connector.oracle.logminer.events.LobWriteEvent; import io.debezium.connector.oracle.logminer.events.LogMinerEvent; import io.debezium.connector.oracle.logminer.events.LogMinerEventRow; +import io.debezium.connector.oracle.logminer.events.RedoSqlDmlEvent; import io.debezium.connector.oracle.logminer.events.SelectLobLocatorEvent; import io.debezium.connector.oracle.logminer.events.TruncateEvent; import io.debezium.connector.oracle.logminer.events.XmlBeginEvent; @@ -501,6 +502,10 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted offsetContext.setRedoThread(row.getThread()); offsetContext.setRsId(event.getRsId()); + if (event instanceof RedoSqlDmlEvent) { + offsetContext.setRedoSql(((RedoSqlDmlEvent) event).getRedoSql()); + } + final DmlEvent dmlEvent = (DmlEvent) event; if (!skipExcludedUserName) { LogMinerChangeRecordEmitter logMinerChangeRecordEmitter; @@ -531,8 +536,11 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted Clock.system()); } dispatcher.dispatchDataChangeEvent(partition, event.getTableId(), logMinerChangeRecordEmitter); - } + + // Clear redo SQL + offsetContext.setRedoSql(null); + } }; @@ -1089,6 +1097,9 @@ protected void handleDataEvent(LogMinerEventRow row) throws SQLException, Interr final LogMinerDmlEntry dmlEntry = parseDmlStatement(row.getRedoSql(), table); dmlEntry.setObjectName(row.getTableName()); dmlEntry.setObjectOwner(row.getTablespaceName()); + if (connectorConfig.isLogMiningIncludeRedoSql()) { + return new RedoSqlDmlEvent(row, dmlEntry, row.getRedoSql()); + } return new DmlEvent(row, dmlEntry); }); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/LogMinerEventMarshaller.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/LogMinerEventMarshaller.java index 01118ef8f..1690ff540 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/LogMinerEventMarshaller.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/LogMinerEventMarshaller.java @@ -17,6 +17,6 @@ */ @AutoProtoSchemaBuilder(includeClasses = { LogMinerEventAdapter.class, DmlEventAdapter.class, SelectLobLocatorEventAdapter.class, LobWriteEventAdapter.class, LobEraseEventAdapter.class, LogMinerDmlEntryImplAdapter.class, TruncateEventAdapter.class, XmlBeginEventAdapter.class, XmlWriteEventAdapter.class, - XmlEndEventAdapter.class }, schemaFilePath = "/") + XmlEndEventAdapter.class, RedoSqlDmlEventAdapter.class }, schemaFilePath = "/") public interface LogMinerEventMarshaller extends SerializationContextInitializer { } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/RedoSqlDmlEventAdapter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/RedoSqlDmlEventAdapter.java new file mode 100644 index 000000000..3e1d15df6 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/RedoSqlDmlEventAdapter.java @@ -0,0 +1,67 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.infinispan.marshalling; + +import java.time.Instant; + +import org.infinispan.protostream.annotations.ProtoAdapter; +import org.infinispan.protostream.annotations.ProtoFactory; +import org.infinispan.protostream.annotations.ProtoField; + +import io.debezium.connector.oracle.Scn; +import io.debezium.connector.oracle.logminer.events.EventType; +import io.debezium.connector.oracle.logminer.events.RedoSqlDmlEvent; +import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntryImpl; +import io.debezium.relational.TableId; + +/** + * An Infinispan ProtoStream adapter to marshall {@link RedoSqlDmlEvent} instances. + * + * This class defines a factory for creating {@link RedoSqlDmlEvent} instances when hydrating + * records from the persisted datastore as well as field handlers to extract values + * to be marshalled to the protocol buffer stream. + * + * The underlying protocol buffer record consists of the following structure: + *
+ *     message RedoSqlDmlEvent {
+ *         // structure of the super type, DmlEventAdapter
+ *         required string entry = 8;
+ *     }
+ * 
+ * @author Chris Cranford + */ +@ProtoAdapter(RedoSqlDmlEvent.class) +public class RedoSqlDmlEventAdapter extends DmlEventAdapter { + + /** + * A ProtoStream factory that creates {@link RedoSqlDmlEvent} instances. + * + * @param eventType the event type + * @param scn the system change number, must not be {@code null} + * @param tableId the fully-qualified table name + * @param rowId the Oracle row-id the change is associated with + * @param rsId the Oracle rollback segment identifier + * @param changeTime the time the change occurred + * @param entry the parsed SQL statement entry + * @param redoSql the redo sql + * @return the constructed RedoSqlDmlEvent + */ + @ProtoFactory + public RedoSqlDmlEvent factory(int eventType, String scn, String tableId, String rowId, String rsId, String changeTime, LogMinerDmlEntryImpl entry, String redoSql) { + return new RedoSqlDmlEvent(EventType.from(eventType), Scn.valueOf(scn), TableId.parse(tableId), rowId, rsId, Instant.parse(changeTime), entry, redoSql); + } + + /** + * A ProtoStream handler to extract the {@code redoSql} field from the {@link RedoSqlDmlEvent}. + * + * @param event the event instance, must not be {@code null} + * @return the redo SQL statement + */ + @ProtoField(number = 8, required = true) + public String getRedoSql(RedoSqlDmlEvent event) { + return event.getRedoSql(); + } +} diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java index 59cf5097b..25635d77f 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java @@ -69,6 +69,7 @@ public void schemaIsCorrect() { .field("ssn", Schema.OPTIONAL_INT64_SCHEMA) .field("redo_thread", Schema.OPTIONAL_INT32_SCHEMA) .field("user_name", Schema.OPTIONAL_STRING_SCHEMA) + .field("redo_sql", Schema.OPTIONAL_STRING_SCHEMA) .build(); VerifyRecord.assertConnectSchemasAreEqual(null, source.schema(), schema); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerSourceInfoRedoSqlIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerSourceInfoRedoSqlIT.java new file mode 100644 index 000000000..3395aa90e --- /dev/null +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerSourceInfoRedoSqlIT.java @@ -0,0 +1,120 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; + +import io.debezium.config.Configuration; +import io.debezium.connector.oracle.OracleConnection; +import io.debezium.connector.oracle.OracleConnector; +import io.debezium.connector.oracle.OracleConnectorConfig; +import io.debezium.connector.oracle.SourceInfo; +import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule; +import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot; +import io.debezium.connector.oracle.util.TestHelper; +import io.debezium.data.Envelope; +import io.debezium.data.VerifyRecord; +import io.debezium.doc.FixFor; +import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.util.Testing; + +/** + * Tests for LogMiner's source info block REDO SQL + * + * @author Chris Cranford + */ +@SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Requires LogMiner") +public class LogMinerSourceInfoRedoSqlIT extends AbstractConnectorTest { + + @Rule + public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule(); + + private static OracleConnection connection; + + @BeforeClass + public static void beforeClass() throws SQLException { + connection = TestHelper.testConnection(); + } + + @AfterClass + public static void closeConnection() throws SQLException { + if (connection != null) { + connection.close(); + } + } + + @Before + public void before() throws SQLException { + setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS); + initializeConnectorTestFramework(); + Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH); + } + + @Test + @FixFor("DBZ-6960") + public void shouldStreamRedoSqlInSourceInfoBlockWhenEnabled() throws Exception { + testRedoSqlInSourceInfoBlock(true); + } + + @Test + @FixFor("DBZ-6960") + public void shouldNotIncludeRedoSqlInSourceInfoBlockWhenNotEnabled() throws Exception { + testRedoSqlInSourceInfoBlock(false); + } + + private void testRedoSqlInSourceInfoBlock(boolean includeRedoSql) throws Exception { + TestHelper.dropTable(connection, "dbz6960"); + try { + connection.execute("CREATE TABLE dbz6960 (id number(9,0) primary key, data varchar2(50))"); + TestHelper.streamTable(connection, "dbz6960"); + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.LOG_MINING_INCLUDE_REDO_SQL, Boolean.toString(includeRedoSql)) + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6960") + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + connection.execute("INSERT INTO dbz6960 (id,data) values (1, 'test')"); + + SourceRecords records = consumeRecordsByTopic(1); + List tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ6960"); + assertThat(tableRecords).hasSize(1); + + VerifyRecord.isValidInsert(tableRecords.get(0), "ID", 1); + + Struct source = ((Struct) tableRecords.get(0).value()).getStruct(Envelope.FieldName.SOURCE); + assertThat(source.schema().field(SourceInfo.REDO_SQL)).isNotNull(); + + String redoSql = source.getString(SourceInfo.REDO_SQL); + if (includeRedoSql) { + assertThat(redoSql).isEqualTo("insert into \"DEBEZIUM\".\"DBZ6960\"(\"ID\",\"DATA\") values ('1','test');"); + } + else { + assertThat(redoSql).isNull(); + } + } + finally { + TestHelper.dropTable(connection, "dbz6960"); + } + } +} diff --git a/documentation/modules/ROOT/pages/connectors/oracle.adoc b/documentation/modules/ROOT/pages/connectors/oracle.adoc index c0b92b271..9e1a6c314 100644 --- a/documentation/modules/ROOT/pages/connectors/oracle.adoc +++ b/documentation/modules/ROOT/pages/connectors/oracle.adoc @@ -3718,6 +3718,13 @@ If the difference between the timestamps is less than the specified value, and t Typically, multiple connectors can use the same flush table. However, if connectors encounter table lock contention errors, use this property to specify a dedicated table for each connector deployment. +|[[oracle-property-log-mining-include-redo-sql]]<> +|`false` +|Specifies whether the redo log constructed SQL statement is included in `source.redo_sql` field. +ifdef::community[] +This configuration is ignored when using XStream or OpenLogReplicator adapters. +endif::community[] + |[[oracle-property-lob-enabled]]<> |`false` |Controls whether or not large object (CLOB or BLOB) column values are emitted in change events. +