DBZ-3290 Suggested changes

* Use {debezium-version} variable in examples
* Correct typos
* Document non-support for CLOB/BLOB data types
* Fix rendering of <= with unicode character
* Moved XStream documentation into separate section
* Moved database.oracle.version to internal
* Removed reference to database.oracle.version
* Resolve XStream POS version from OracleDatabaseVersion as fallback
  when internal.database.oracle.version is not supplied.
This commit is contained in:
Chris Cranford 2021-03-18 13:08:55 -04:00 committed by Chris Cranford
parent 2516cf9e07
commit 1d9594ed88
5 changed files with 246 additions and 269 deletions

View File

@ -61,7 +61,6 @@ It's important to remember, these are only required for Oracle 11 and should not
```json ```json
"database.tablename.case.insensitive": "true", "database.tablename.case.insensitive": "true",
"database.oracle.version": "11"
``` ```
Additionally, the connector ignores several built-in tables and schemas in Oracle 12+ but those tables differ in Oracle 11. Additionally, the connector ignores several built-in tables and schemas in Oracle 12+ but those tables differ in Oracle 11.

View File

@ -31,7 +31,6 @@
import io.debezium.connector.oracle.logminer.NeverHistoryRecorder; import io.debezium.connector.oracle.logminer.NeverHistoryRecorder;
import io.debezium.connector.oracle.logminer.SqlUtils; import io.debezium.connector.oracle.logminer.SqlUtils;
import io.debezium.connector.oracle.xstream.LcrPosition; import io.debezium.connector.oracle.xstream.LcrPosition;
import io.debezium.connector.oracle.xstream.OracleVersion;
import io.debezium.document.Document; import io.debezium.document.Document;
import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.ColumnFilterMode; import io.debezium.relational.ColumnFilterMode;
@ -105,11 +104,11 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
.withImportance(Importance.LOW) .withImportance(Importance.LOW)
.withDescription("Deprecated: Case insensitive table names; set to 'true' for Oracle 11g, 'false' (default) otherwise."); .withDescription("Deprecated: Case insensitive table names; set to 'true' for Oracle 11g, 'false' (default) otherwise.");
public static final Field ORACLE_VERSION = Field.create("database.oracle.version") public static final Field ORACLE_VERSION = Field.createInternal("database.oracle.version")
.withDisplayName("Oracle version, 11 or 12+") .withDisplayName("Oracle version, 11 or 12+")
.withEnum(OracleVersion.class, OracleVersion.V12Plus) .withType(Type.STRING)
.withImportance(Importance.LOW) .withImportance(Importance.LOW)
.withDescription("For default Oracle 12+, use default pos_version value v2, for Oracle 11, use pos_version value v1."); .withDescription("Deprecated: For default Oracle 12+, use default pos_version value v2, for Oracle 11, use pos_version value v1.");
public static final Field SERVER_NAME = RelationalDatabaseConnectorConfig.SERVER_NAME public static final Field SERVER_NAME = RelationalDatabaseConnectorConfig.SERVER_NAME
.withValidation(CommonConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName); .withValidation(CommonConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName);
@ -344,7 +343,7 @@ public static ConfigDef configDef() {
private final SnapshotMode snapshotMode; private final SnapshotMode snapshotMode;
private final Boolean tablenameCaseInsensitive; private final Boolean tablenameCaseInsensitive;
private final OracleVersion oracleVersion; private final String oracleVersion;
private final HistoryRecorder logMiningHistoryRecorder; private final HistoryRecorder logMiningHistoryRecorder;
private final Configuration jdbcConfig; private final Configuration jdbcConfig;
private final ConnectorAdapter connectorAdapter; private final ConnectorAdapter connectorAdapter;
@ -376,7 +375,7 @@ public OracleConnectorConfig(Configuration config) {
this.xoutServerName = config.getString(XSTREAM_SERVER_NAME); this.xoutServerName = config.getString(XSTREAM_SERVER_NAME);
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE)); this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE));
this.tablenameCaseInsensitive = resolveTableNameCaseInsensitivity(config); this.tablenameCaseInsensitive = resolveTableNameCaseInsensitivity(config);
this.oracleVersion = OracleVersion.parse(config.getString(ORACLE_VERSION)); this.oracleVersion = config.getString(ORACLE_VERSION);
this.logMiningHistoryRecorder = resolveLogMiningHistoryRecorder(config); this.logMiningHistoryRecorder = resolveLogMiningHistoryRecorder(config);
this.jdbcConfig = config.subset(DATABASE_CONFIG_PREFIX, true); this.jdbcConfig = config.subset(DATABASE_CONFIG_PREFIX, true);
this.snapshotEnhancementToken = config.getString(SNAPSHOT_ENHANCEMENT_TOKEN); this.snapshotEnhancementToken = config.getString(SNAPSHOT_ENHANCEMENT_TOKEN);
@ -442,7 +441,7 @@ public Optional<Boolean> getTablenameCaseInsensitive() {
return Optional.ofNullable(tablenameCaseInsensitive); return Optional.ofNullable(tablenameCaseInsensitive);
} }
public OracleVersion getOracleVersion() { public String getOracleVersion() {
return oracleVersion; return oracleVersion;
} }

View File

@ -1,68 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.xstream;
import io.debezium.config.EnumeratedValue;
import oracle.streams.XStreamUtility;
/**
* This enum class includes Oracle 11 and 12 major versions
* It returns position version based on the Oracle release.
* Position version get used in conversion SCN (system change number) into position
*/
public enum OracleVersion implements EnumeratedValue {
// Oracle 11g; requires the legacy XStream position encoding (POS_VERSION_V1).
V11("11"),
// Oracle 12c and later; uses the current position encoding (POS_VERSION_V2).
V12Plus("12+");
// Configuration string value for this version option ("11" or "12+").
private final String version;
OracleVersion(String version) {
this.version = version;
}
@Override
public String getValue() {
return version;
}
/**
 * Returns the XStream position version constant used to convert an SCN
 * (system change number) into an XStream position for this Oracle release.
 * Unknown values fall back to {@code POS_VERSION_V2}, the Oracle 12+ default.
 */
public int getPosVersion() {
switch (version) {
case "11":
return XStreamUtility.POS_VERSION_V1;
case "12+":
return XStreamUtility.POS_VERSION_V2;
default:
return XStreamUtility.POS_VERSION_V2;
}
}
/**
 * Parses a configuration string into an {@link OracleVersion}.
 * Matching is case-insensitive and ignores surrounding whitespace.
 * Returns {@code null} when the value is {@code null} or matches no option.
 */
public static OracleVersion parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (OracleVersion option : OracleVersion.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
}
/**
 * Parses a configuration string, falling back to parsing {@code defaultValue}
 * when the primary value yields no match. May still return {@code null} if
 * neither value parses.
 */
public static OracleVersion parse(String value, String defaultValue) {
OracleVersion option = parse(value);
if (option == null && defaultValue != null) {
option = parse(defaultValue);
}
return option;
}
}

View File

@ -15,6 +15,7 @@
import io.debezium.connector.oracle.OracleConnection; import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleDatabaseVersion;
import io.debezium.connector.oracle.OracleOffsetContext; import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.Scn; import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.SourceInfo; import io.debezium.connector.oracle.SourceInfo;
@ -23,6 +24,7 @@
import io.debezium.pipeline.source.spi.StreamingChangeEventSource; import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.util.Clock; import io.debezium.util.Clock;
import io.debezium.util.Strings;
import oracle.sql.NUMBER; import oracle.sql.NUMBER;
import oracle.streams.StreamsException; import oracle.streams.StreamsException;
@ -68,7 +70,7 @@ public XstreamStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
this.offsetContext = offsetContext; this.offsetContext = offsetContext;
this.xStreamServerName = connectorConfig.getXoutServerName(); this.xStreamServerName = connectorConfig.getXoutServerName();
this.tablenameCaseInsensitive = jdbcConnection.getTablenameCaseInsensitivity(connectorConfig); this.tablenameCaseInsensitive = jdbcConnection.getTablenameCaseInsensitivity(connectorConfig);
this.posVersion = connectorConfig.getOracleVersion().getPosVersion(); this.posVersion = resolvePosVersion(jdbcConnection, connectorConfig);
} }
@Override @Override
@ -153,4 +155,22 @@ private void sendPublishedPosition(final LcrPosition lcrPosition, final Scn scn)
PositionAndScn receivePublishedPosition() { PositionAndScn receivePublishedPosition() {
return lcrMessage.getAndSet(null); return lcrMessage.getAndSet(null);
} }
/**
 * Resolves the XStream position version used when converting SCN values into
 * XStream positions.
 *
 * The deprecated, internal 'database.oracle.version' option takes precedence
 * when supplied ("11" selects the legacy V1 encoding, anything else V2);
 * otherwise the version is derived from the connected database's reported
 * major version as a fallback.
 */
private static int resolvePosVersion(OracleConnection connection, OracleConnectorConfig connectorConfig) {
// Option 'internal.database.oracle.version' takes precedence
final String oracleVersion = connectorConfig.getOracleVersion();
if (!Strings.isNullOrEmpty(oracleVersion)) {
if ("11".equals(oracleVersion)) {
return XStreamUtility.POS_VERSION_V1;
}
// Any non-"11" value (e.g. "12+") maps to the current position encoding.
return XStreamUtility.POS_VERSION_V2;
}
// As fallback, resolve this based on the OracleDatabaseVersion
final OracleDatabaseVersion databaseVersion = connection.getOracleVersion();
if (databaseVersion.getMajor() == 11) {
return XStreamUtility.POS_VERSION_V1;
}
return XStreamUtility.POS_VERSION_V2;
}
} }

View File

@ -436,7 +436,7 @@ And of course, the _schema_ portion of the event message's value contains a sche
Let's look at what a _create_ event value might look like for our `customers` table: Let's look at what a _create_ event value might look like for our `customers` table:
[source,json,indent=0,subs="attributes"] [source,json,indent=0,subs="+attributes"]
---- ----
{ {
"schema": { "schema": {
@ -564,7 +564,7 @@ Let's look at what a _create_ event value might look like for our `customers` ta
"EMAIL": "annek@noanswer.org" "EMAIL": "annek@noanswer.org"
}, },
"source": { "source": {
"version": "0.9.0.Alpha1", "version": "{debezium-version}",
"name": "server1", "name": "server1",
"ts_ms": 1520085154000, "ts_ms": 1520085154000,
"txId": "6.28.807", "txId": "6.28.807",
@ -600,7 +600,7 @@ It is possible and even recommended to use the link:/docs/faq/#avro-converter[Av
The value of an _update_ change event on this table will actually have the exact same _schema_, and its payload will be structured the same but will hold different values. The value of an _update_ change event on this table will actually have the exact same _schema_, and its payload will be structured the same but will hold different values.
Here's an example: Here's an example:
[source,json,indent=0,subs="attributes"] [source,json,indent=0,subs="+attributes"]
---- ----
{ {
"schema": { ... }, "schema": { ... },
@ -618,7 +618,7 @@ Here's an example:
"EMAIL": "anne@example.com" "EMAIL": "anne@example.com"
}, },
"source": { "source": {
"version": "0.9.0.Alpha1", "version": "{debezium-version}",
"name": "server1", "name": "server1",
"ts_ms": 1520085811000, "ts_ms": 1520085811000,
"txId": "6.9.809", "txId": "6.9.809",
@ -654,7 +654,7 @@ When the columns for a row's primary/unique key are updated, the value of the ro
So far we've seen samples of _create_ and _update_ events. So far we've seen samples of _create_ and _update_ events.
Now, let's look at the value of a _delete_ event for the same table. Once again, the `schema` portion of the value will be exactly the same as with the _create_ and _update_ events: Now, let's look at the value of a _delete_ event for the same table. Once again, the `schema` portion of the value will be exactly the same as with the _create_ and _update_ events:
[source,json,indent=0,subs="attributes"] [source,json,indent=0,subs="+attributes"]
---- ----
{ {
"schema": { ... }, "schema": { ... },
@ -667,7 +667,7 @@ Now, let's look at the value of a _delete_ event for the same table. Once again,
}, },
"after": null, "after": null,
"source": { "source": {
"version": "0.9.0.Alpha1", "version": "{debezium-version}",
"name": "server1", "name": "server1",
"ts_ms": 1520085153000, "ts_ms": 1520085153000,
"txId": "6.28.807", "txId": "6.28.807",
@ -703,9 +703,9 @@ To make this possible, {prodname}'s Oracle connector always follows the _delete_
[[oracle-data-type-mappings]] [[oracle-data-type-mappings]]
== Data Type mappings == Data Type mappings
The Oracle conenctor represents changes to rows with events that are structured like the table in which the rows exists. The event contains a field for each column value. How that value is represented in the event depends on the Oracle data type of the column. The following sections describe how the connector maps oracle data types to a _litearl type_ and a _semantic type_ in event fields. The Oracle connector represents changes to rows with events that are structured like the table in which the rows exist. The event contains a field for each column value. How that value is represented in the event depends on the Oracle data type of the column. The following sections describe how the connector maps Oracle data types to a _literal type_ and a _semantic type_ in event fields.
* _litearl type_ describes how the value is literally represented using Kafka Connect schema types: `INT8`, `INT16`, `INT32`, `INT64`, `FLOAT32`, `FLOAT64`, `BOOLEAN`, `STRING`, `BYTES`, `ARRAY`, `MAP`, and `STRUCT`. * _literal type_ describes how the value is literally represented using Kafka Connect schema types: `INT8`, `INT16`, `INT32`, `INT64`, `FLOAT32`, `FLOAT64`, `BOOLEAN`, `STRING`, `BYTES`, `ARRAY`, `MAP`, and `STRUCT`.
* _semantic type_ describes how the Kafka Connect schema captures the _meaning_ of the field using the name of the Kafka Connect schema for the field. * _semantic type_ describes how the Kafka Connect schema captures the _meaning_ of the field using the name of the Kafka Connect schema for the field.
@ -725,21 +725,29 @@ Please file a {jira-url}/browse/DBZ[JIRA issue] for any specific types that may
endif::[] endif::[]
[id="oracle-character-types"] [id="oracle-character-types"]
=== Character types === Character and BLOB types
The following table describes how the connector maps character types. The following table describes how the connector maps character and blob types.
.Mappings for Oracle character data types .Mappings for Oracle character and blob data types
[cols="20%a,15%a,55%a",options="header"] [cols="20%a,15%a,55%a",options="header"]
|=== |===
|Oracle Data Type |Oracle Data Type
|Literal type (schema type) |Literal type (schema type)
|Semantic type (schema name) and Notes |Semantic type (schema name) and Notes
|`BLOB`
|n/a
|_This data type is not yet supported._
|`CHAR[(M)]` |`CHAR[(M)]`
|`STRING` |`STRING`
|n/a |n/a
|`CLOB`
|n/a
|_This data type is not yet supported._
|`NCHAR[(M)]` |`NCHAR[(M)]`
|`STRING` |`STRING`
|n/a |n/a
@ -812,11 +820,7 @@ Contains a structure with two fields: `scale` of type `INT32` that contains the
+ +
Contains a structure with two fields: `scale` of type `INT32` that contains the scale of the transferred value and `value` of type `BYTES` containing the original value in an unscaled form. Contains a structure with two fields: `scale` of type `INT32` that contains the scale of the transferred value and `value` of type `BYTES` containing the original value in an unscaled form.
|`NUMBER(P, S > 0)` |`NUMBER(P, S \<= 0)`
|`BYTES`
|`org.apache.kafka.connect.data.Decimal`
|`NUMBER(P, S <= 0)`
|`INT8` / `INT16` / `INT32` / `INT64` |`INT8` / `INT16` / `INT32` / `INT64`
|`NUMBER` columns with a scale of 0 represent integer numbers; a negative scale indicates rounding in Oracle, e.g. a scale of -2 will cause rounding to hundreds. + |`NUMBER` columns with a scale of 0 represent integer numbers; a negative scale indicates rounding in Oracle, e.g. a scale of -2 will cause rounding to hundreds. +
+ +
@ -828,6 +832,10 @@ Depending on the precision and scale, a matching Kafka Connect integer type will
* P - S < 19, `INT64` + * P - S < 19, `INT64` +
* P - S >= 19, `BYTES` (`org.apache.kafka.connect.data.Decimal`). * P - S >= 19, `BYTES` (`org.apache.kafka.connect.data.Decimal`).
|`NUMBER(P, S > 0)`
|`BYTES`
|`org.apache.kafka.connect.data.Decimal`
|`NUMERIC[(P, S)]` |`NUMERIC[(P, S)]`
|`BYTES` / `INT8` / `INT16` / `INT32` / `INT64` |`BYTES` / `INT8` / `INT16` / `INT32` / `INT64`
|`org.apache.kafka.connect.data.Decimal` if using `BYTES` + |`org.apache.kafka.connect.data.Decimal` if using `BYTES` +
@ -1026,25 +1034,6 @@ archive log list
exit; exit;
---- ----
.Configuration needed for Oracle XStream
[source,indent=0]
----
ORACLE_SID=ORCLCDB dbz_oracle sqlplus /nolog
CONNECT sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 5G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
alter system set enable_goldengate_replication=true;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should show "Database log mode: Archive Mode"
archive log list
exit;
----
In addition, supplemental logging must be enabled for captured tables or the database in order for data changes to capture the _before_ state of changed database rows. In addition, supplemental logging must be enabled for captured tables or the database in order for data changes to capture the _before_ state of changed database rows.
The following illustrates how to configure this on a specific table, which is the ideal choice to minimize the amount of information captured in the Oracle redo logs. The following illustrates how to configure this on a specific table, which is the ideal choice to minimize the amount of information captured in the Oracle redo logs.
@ -1058,9 +1047,6 @@ ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
The {prodname} Oracle connector requires that user accounts be set up with specific permissions so that the connector can capture change events. The {prodname} Oracle connector requires that user accounts be set up with specific permissions so that the connector can capture change events.
The following briefly describes these user configurations using a multi-tenant database model. The following briefly describes these user configurations using a multi-tenant database model.
* <<oracle-create-users-logminer, `Creating users for Oracle LogMiner`>>
* <<oracle-create-users-xstream, `Creating users for Oracle XStream`>>
[[oracle-create-users-logminer]] [[oracle-create-users-logminer]]
.Creating the connector's LogMiner user .Creating the connector's LogMiner user
[source,indent=0] [source,indent=0]
@ -1112,117 +1098,6 @@ sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
exit; exit;
---- ----
[[oracle-create-users-xstream]]
.Creating an XStream Administrator user
[source,indent=0]
----
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_adm_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/xstream_adm_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE USER c##dbzadmin IDENTIFIED BY dbz
DEFAULT TABLESPACE xstream_adm_tbs
QUOTA UNLIMITED ON xstream_adm_tbs
CONTAINER=ALL;
GRANT CREATE SESSION, SET CONTAINER TO c##dbzadmin CONTAINER=ALL;
BEGIN
DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE(
grantee => 'c##dbzadmin',
privilege_type => 'CAPTURE',
grant_select_privileges => TRUE,
container => 'ALL'
);
END;
/
exit;
----
.Creating the connector's XStream user
[source,indent=0]
----
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/xstream_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE USER c##dbzuser IDENTIFIED BY dbz
DEFAULT TABLESPACE xstream_tbs
QUOTA UNLIMITED ON xstream_tbs
CONTAINER=ALL;
GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
exit;
----
=== Create an XStream Outbound Server
[NOTE]
====
If you're using the default adapter, LogMiner, then this step can be skipped.
====
Create an https://docs.oracle.com/cd/E11882_01/server.112/e16545/xstrm_cncpt.htm#XSTRM1088[XStream Outbound server]
(given the right privileges, this may be done automatically by the connector going forward, see {jira-url}/browse/DBZ-721[DBZ-721]):
.Create an XStream Outbound Server
[source,indent=0]
----
sqlplus c##dbzadmin/dbz@//localhost:1521/ORCLCDB
DECLARE
tables DBMS_UTILITY.UNCL_ARRAY;
schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
tables(1) := NULL;
schemas(1) := 'debezium';
DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
server_name => 'dbzxout',
table_names => tables,
schema_names => schemas);
END;
/
exit;
----
.Configure the XStream user account to connect to the XStream Outbound Server
[source,indent=0]
----
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
BEGIN
DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
server_name => 'dbzxout',
connect_user => 'c##dbzuser');
END;
/
exit;
----
[NOTE]
====
A single XStream Outbound server cannot be shared by multiple {prodname} Oracle connectors.
Each connector requires a unique XStream Outbound connector to be configured.
====
[[oracle-deploying-a-connector]] [[oracle-deploying-a-connector]]
== Deploying a Connector == Deploying a Connector
@ -1330,41 +1205,10 @@ The {prodname} Oracle connector supports both deployment practices of pluggable
[WARNING] [WARNING]
==== ====
When using CDB installations, specify `database.pbd.name`. + When using CDB installations, specify `database.pdb.name`. +
When using a non-CDB installation, do *not* specify the `database.pdb.name`. When using a non-CDB installation, do *not* specify the `database.pdb.name`.
==== ====
[[selecting-the-adapter]]
=== Selecting the adapter
{prodname} provides multiple ways to ingest change events from Oracle.
By default {prodname} uses Oracle LogMiner but you may wish to use the Oracle XStream API for your installation.
The following example configuration illustrates that by adding the `database.connection.adapter` and `database.out.server.name`,
the connector can be toggled to use the XStream API implementation.
[source,json,indent=0]
----
{
"name": "inventory-connector",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"tasks.max" : "1",
"database.server.name" : "server1",
"database.hostname" : "<oracle ip>",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "ORCLCDB",
"database.pdb.name" : "ORCLPDB1",
"database.history.kafka.bootstrap.servers" : "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"database.connection.adapter": "xstream",
"database.out.server.name" : "dbzxout",
}
}
----
[[oracle-connector-properties]] [[oracle-connector-properties]]
=== Connector Properties === Connector Properties
@ -1412,21 +1256,10 @@ The following configuration properties are _required_ unless a default value is
| |
|Raw database jdbc url. This property can be used when more flexibility is needed and can support raw TNS names or RAC connection strings. |Raw database jdbc url. This property can be used when more flexibility is needed and can support raw TNS names or RAC connection strings.
|[[oracle-property-database-oracle-version]]<<oracle-property-database-oracle-version, `+database.oracle.version+`>>
|`12+`
|Specifies how to decode the Oracle SCN values.
`11` should be used when connecting to Oracle 11 databases.
`12+` (the default) should be used when connecting to Oracle 12 or later databases.
This field is only applicable when using the Oracle XStreams connector adapter.
|[[oracle-property-database-pdb-name]]<<oracle-property-database-pdb-name, `+database.pdb.name+`>> |[[oracle-property-database-pdb-name]]<<oracle-property-database-pdb-name, `+database.pdb.name+`>>
| |
|Name of the PDB to connect to, when working with the CDB + PDB model. |Name of the PDB to connect to, when working with the CDB + PDB model.
|[[oracle-property-database-out-server-name]]<<oracle-property-database-out-server-name, `+database.out.server.name+`>>
|
|Name of the XStream outbound server configured in the database.
|[[oracle-property-database-server-name]]<<oracle-property-database-server-name, `+database.server.name+`>> |[[oracle-property-database-server-name]]<<oracle-property-database-server-name, `+database.server.name+`>>
| |
|Logical name that identifies and provides a namespace for the particular Oracle database server being monitored. The logical name should be unique across all other connectors, since it is used as a prefix for all Kafka topic names emanating from this connector. |Logical name that identifies and provides a namespace for the particular Oracle database server being monitored. The logical name should be unique across all other connectors, since it is used as a prefix for all Kafka topic names emanating from this connector.
@ -2004,6 +1837,200 @@ The message does not contain the update to the schema, but the complete new sche
|=== |===
[[oracle-xstreams-support]]
== XStreams support
The {prodname} Oracle connector by default ingests changes using native Oracle LogMiner.
The connector can be toggled to use Oracle XStream instead and to do so specific database and connector configurations must be used that differ from that of LogMiner.
=== Preparing the Database
.Configuration needed for Oracle XStream
[source,indent=0]
----
ORACLE_SID=ORCLCDB dbz_oracle sqlplus /nolog
CONNECT sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 5G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
alter system set enable_goldengate_replication=true;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should show "Database log mode: Archive Mode"
archive log list
exit;
----
In addition, supplemental logging must be enabled for captured tables or the database in order for data changes to capture the _before_ state of changed database rows.
The following illustrates how to configure this on a specific table, which is the ideal choice to minimize the amount of information captured in the Oracle redo logs.
[source,indent=0]
----
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
----
=== Creating Users for the connector
The {prodname} Oracle connector requires that user accounts be set up with specific permissions so that the connector can capture change events.
The following briefly describes these user configurations using a multi-tenant database model.
[[oracle-create-users-xstream]]
.Creating an XStream Administrator user
[source,indent=0]
----
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_adm_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/xstream_adm_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE USER c##dbzadmin IDENTIFIED BY dbz
DEFAULT TABLESPACE xstream_adm_tbs
QUOTA UNLIMITED ON xstream_adm_tbs
CONTAINER=ALL;
GRANT CREATE SESSION, SET CONTAINER TO c##dbzadmin CONTAINER=ALL;
BEGIN
DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE(
grantee => 'c##dbzadmin',
privilege_type => 'CAPTURE',
grant_select_privileges => TRUE,
container => 'ALL'
);
END;
/
exit;
----
.Creating the connector's XStream user
[source,indent=0]
----
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/xstream_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE USER c##dbzuser IDENTIFIED BY dbz
DEFAULT TABLESPACE xstream_tbs
QUOTA UNLIMITED ON xstream_tbs
CONTAINER=ALL;
GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
exit;
----
=== Create an XStream Outbound Server
Create an https://docs.oracle.com/cd/E11882_01/server.112/e16545/xstrm_cncpt.htm#XSTRM1088[XStream Outbound server]
(given the right privileges, this may be done automatically by the connector going forward, see {jira-url}/browse/DBZ-721[DBZ-721]):
.Create an XStream Outbound Server
[source,indent=0]
----
sqlplus c##dbzadmin/dbz@//localhost:1521/ORCLCDB
DECLARE
tables DBMS_UTILITY.UNCL_ARRAY;
schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
tables(1) := NULL;
schemas(1) := 'debezium';
DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
server_name => 'dbzxout',
table_names => tables,
schema_names => schemas);
END;
/
exit;
----
.Configure the XStream user account to connect to the XStream Outbound Server
[source,indent=0]
----
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
BEGIN
DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
server_name => 'dbzxout',
connect_user => 'c##dbzuser');
END;
/
exit;
----
[NOTE]
====
A single XStream Outbound server cannot be shared by multiple {prodname} Oracle connectors.
Each connector requires a unique XStream Outbound server to be configured.
====
[[selecting-the-adapter]]
=== Configuring the XStream adapter
By default {prodname} uses Oracle LogMiner to ingest change events from Oracle.
In order to use Oracle XStreams, the connector configuration must be adjusted to enable this adapter.
The following example configuration illustrates that by adding the `database.connection.adapter` and `database.out.server.name`,
the connector can be toggled to use the XStream API implementation.
[source,json,indent=0]
----
{
"name": "inventory-connector",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"tasks.max" : "1",
"database.server.name" : "server1",
"database.hostname" : "<oracle ip>",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "ORCLCDB",
"database.pdb.name" : "ORCLPDB1",
"database.history.kafka.bootstrap.servers" : "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"database.connection.adapter": "xstream",
"database.out.server.name" : "dbzxout"
}
}
----
[[oracle-xstreams-connector-properties]]
=== Connector properties
The following configuration properties are _required_ when using XStreams unless a default value is available.
[cols="30%a,25%a,45%a"]
|===
|Property
|Default
|Description
|[[oracle-property-database-out-server-name]]<<oracle-property-database-out-server-name, `+database.out.server.name+`>>
|
|Name of the XStream outbound server configured in the database.
|===
[[oracle-when-things-go-wrong]] [[oracle-when-things-go-wrong]]
== Behavior when things go wrong == Behavior when things go wrong