diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java index 95d1e7b51..185c47633 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java @@ -1707,6 +1707,95 @@ record = records.recordsForTopic("server1.DEBEZIUM.DBZ832").get(0); } } + @Test + @FixFor("DBZ-1211") + public void shouldSnapshotAndStreamTablesWithUniqueIndexPrimaryKey() throws Exception { + TestHelper.dropTables(connection, "dbz1211_child", "dbz1211"); + try { + connection.execute("create table dbz1211 (id numeric(9,0), data varchar2(50), constraint pkdbz1211 primary key (id) using index)"); + connection.execute("alter table dbz1211 add constraint xdbz1211 unique (id,data) using index"); + connection + .execute("create table dbz1211_child (id numeric(9,0), data varchar2(50), constraint fk1211 foreign key (id) references dbz1211 on delete cascade)"); + connection.execute("alter table dbz1211_child add constraint ydbz1211 unique (id,data) using index"); + TestHelper.streamTable(connection, "dbz1211"); + TestHelper.streamTable(connection, "dbz1211_child"); + + connection.executeWithoutCommitting("INSERT INTO dbz1211 values (1, 'Test')"); + connection.executeWithoutCommitting("INSERT INTO dbz1211_child values (1, 'Child')"); + connection.commit(); + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ1211,DEBEZIUM\\.DBZ1211\\_CHILD") + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + SourceRecords records = consumeRecordsByTopic(2); + assertThat(records.recordsForTopic("server1.DEBEZIUM.DBZ1211")).hasSize(1); + assertThat(records.recordsForTopic("server1.DEBEZIUM.DBZ1211_CHILD")).hasSize(1); + + SourceRecord record = records.recordsForTopic("server1.DEBEZIUM.DBZ1211").get(0); + Struct key = (Struct) record.key(); + assertThat(key).isNotNull(); + assertThat(key.get("ID")).isEqualTo(1); + Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER); + assertThat(after.get("ID")).isEqualTo(1); + assertThat(after.get("DATA")).isEqualTo("Test"); + + record = records.recordsForTopic("server1.DEBEZIUM.DBZ1211_CHILD").get(0); + key = (Struct) record.key(); + assertThat(key).isNotNull(); + assertThat(key.get("ID")).isEqualTo(1); + assertThat(key.get("DATA")).isEqualTo("Child"); + after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER); + assertThat(after.get("ID")).isEqualTo(1); + assertThat(after.get("DATA")).isEqualTo("Child"); + + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + connection.execute("INSERT INTO dbz1211 values (2, 'Test2')"); + connection.executeWithoutCommitting("INSERT INTO dbz1211_child values (1, 'Child1-2')"); + connection.executeWithoutCommitting("INSERT INTO dbz1211_child values (2, 'Child2-1')"); + connection.commit(); + + records = consumeRecordsByTopic(3); + assertThat(records.recordsForTopic("server1.DEBEZIUM.DBZ1211")).hasSize(1); + assertThat(records.recordsForTopic("server1.DEBEZIUM.DBZ1211_CHILD")).hasSize(2); + + record = records.recordsForTopic("server1.DEBEZIUM.DBZ1211").get(0); + key = (Struct) record.key(); + assertThat(key).isNotNull(); + assertThat(key.get("ID")).isEqualTo(2); + after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER); + assertThat(after.get("ID")).isEqualTo(2); + assertThat(after.get("DATA")).isEqualTo("Test2"); + + record = records.recordsForTopic("server1.DEBEZIUM.DBZ1211_CHILD").get(0); + key = (Struct) record.key(); + assertThat(key).isNotNull(); + assertThat(key.get("ID")).isEqualTo(1); + assertThat(key.get("DATA")).isEqualTo("Child1-2"); + after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER); + assertThat(after.get("ID")).isEqualTo(1); + assertThat(after.get("DATA")).isEqualTo("Child1-2"); + + record = records.recordsForTopic("server1.DEBEZIUM.DBZ1211_CHILD").get(1); + key = (Struct) record.key(); + assertThat(key).isNotNull(); + assertThat(key.get("ID")).isEqualTo(2); + assertThat(key.get("DATA")).isEqualTo("Child2-1"); + after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER); + assertThat(after.get("ID")).isEqualTo(2); + assertThat(after.get("DATA")).isEqualTo("Child2-1"); + } + finally { + TestHelper.dropTables(connection, "dbz1211_child", "dbz1211"); + } + } + @Test @FixFor("DBZ-3322") public void shouldNotEmitEventsOnConstraintViolations() throws Exception { diff --git a/debezium-ddl-parser/src/test/resources/oracle/examples/ddl_alter.sql b/debezium-ddl-parser/src/test/resources/oracle/examples/ddl_alter.sql index d0a9c0f01..fbab69120 100644 --- a/debezium-ddl-parser/src/test/resources/oracle/examples/ddl_alter.sql +++ b/debezium-ddl-parser/src/test/resources/oracle/examples/ddl_alter.sql @@ -11,3 +11,6 @@ ALTER TABLE customers MODIFY PARTITION south_india ADD VALUES ('KOCHI', 'MANGALO ALTER TABLE customers MODIFY PARTITION south_india DROP VALUES ('KOCHI','MANGALORE'); ALTER TABLE sales split partition p5 into (Partition p6 values less than (1996), Partition p7 values less than (MAXVALUE)); ALTER TABLE sales truncate partition p5; +-- Alter table add unique index +alter table dbz1211 add constraint name unique (id,data) using index tablespace ts; +alter table dbz1211_child add constraint name unique (id) using index tablespace ts; \ No newline at end of file diff --git a/debezium-ddl-parser/src/test/resources/oracle/examples/ddl_create.sql b/debezium-ddl-parser/src/test/resources/oracle/examples/ddl_create.sql index 34628c6e6..145764935 100644 --- a/debezium-ddl-parser/src/test/resources/oracle/examples/ddl_create.sql +++ b/debezium-ddl-parser/src/test/resources/oracle/examples/ddl_create.sql @@ -3,4 +3,7 @@ create table debezium.products (id NUMBER(4) GENERATED BY DEFAULT ON NULL AS IDE create table debezium.products (id NUMBER(4) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 101 INCREMENT BY 1 CYCLE CACHE 200) NOT NULL PRIMARY KEY, name VARCHAR2(255) NOT NULL, description VARCHAR2(512), weight FLOAT); create global temporary table sys.ora_temp_1_ds_1550399 sharing=none on commit preserve rows cache noparallel as select /*+ no_parallel(t) no_parallel_index(t) dbms_stats cursor_sharing_exact use_weak_name_resl dynamic_sampling(0) no_monitoring xmlindex_sel_idx_tbl opt_param('optimizer_inmemory_aware' 'false') no_substrb_pad */"ENTRYUUID", rowid SYS_DS_ALIAS_0 from "IDENTITYDB"."OAUTH2_CLIENT_CHANGE_LOGS" sample ( 10.0000000000) t WHERE 1 = 2; CREATE TABLE "ABCD_SHARD_1_3"."D_PLAN_SCHEDULE_LOT_ENTRY"("ID" NUMBER(38,0) NOT NULL ENABLE, "LOT_ID" NUMBER(38,0) NOT NULL ENABLE, "PLAN_SCHEDULE_ID" NUMBER(38,0) NOT NULL ENABLE, "IS_ACTUAL" NUMBER(1,0) NOT NULL ENABLE, "OOS_POSITION_NUMBER" NVARCHAR2(50) DEFAULT 0, CONSTRAINT "PK_PSLE_ENTRY_ID" PRIMARY KEY ("ID") USING INDEX ENABLE, SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS, SUPPLEMENTAL LOG DATA (UNIQUE INDEX) COLUMNS, SUPPLEMENTAL LOG DATA (FOREIGN KEY) COLUMNS, CONSTRAINT "FK_PSLE_LOT_VERSION" FOREIGN KEY ("LOT_ID") REFERENCES "ABCD_SHARD_1_3"."D_LOT_VERSION" ("ID") DISABLE, CONSTRAINT "FK_PSLE_PLAN_SCHEDULE_VERSION" FOREIGN KEY ("PLAN_SCHEDULE_ID") REFERENCES "ABCD_SHARD_1_3"."D_PLAN_SCHEDULE_VERSION" ("ID") ENABLE ); -CREATE TABLE IDATA_BAIIR.CUST_INFO (ID NUMBER(20) NOT NULL ENABLE, CUST_NAME VARCHAR2(200 CHAR) NOT NULL ENABLE, CUST_NAME_PY VARCHAR2(500 CHAR), CUST_NAME_PY_SZ VARCHAR2(50 CHAR), CUST_NAME_ABBR VARCHAR2(100 CHAR), CUST_NAME_ABBR_PY VARCHAR2(300 CHAR), CUST_NAME_ABBR_PY_SZ VARCHAR2(50 CHAR), CUST_NAME_EN VARCHAR2(100 CHAR), CUST_TYPE NUMBER(10), CUST_STATUS NUMBER(10), CUST_LEVEL NUMBER(10), MEMO VARCHAR2(500 CHAR), TEL_NUM VARCHAR2(100 CHAR), FAX_NUM VARCHAR2(100 CHAR), INTERNET_ADDRESS VARCHAR2(200 CHAR), COMPANY_ADDRESS VARCHAR2(200 CHAR), POST_CODE VARCHAR2(50 CHAR), EMAIL_ADDRESS VARCHAR2(200 CHAR), CUST_VALID_FLAG NUMBER(1), EXP_DATE DATE, CERT_TYPE NUMBER(10), CERT_NUM VARCHAR2(100 CHAR), CERT_CUST_NAME VARCHAR2(200 CHAR), RISK_LEVEL NUMBER(10), FXPP_CONFIRM_PROCESS NUMBER(10), FXPP_CONFIRM_PROCESS_INFO NUMBER(10), SDX_INFO_AUDITOR VARCHAR2(100 CHAR), SDX_NEW_COM NUMBER(1), ECIF_OTC_FLAG NUMBER(1), AUDIT_STATUS NUMBER(1), IS_VALID NUMBER(1) DEFAULT 1 NOT NULL ENABLE, CREATE_TIME DATE, CREATOR NUMBER(20), CREATOR_NAME VARCHAR2(30 CHAR), MODIFY_TIME DATE, MODIFIER NUMBER(20), MODIFIER_NAME VARCHAR2(30 CHAR)) tablespace BIGDATADBT pctfree 10 initrans 1 maxtrans 255 storage ( initial 192K next 1M minextents 1 maxextents unlimited ); \ No newline at end of file +CREATE TABLE IDATA_BAIIR.CUST_INFO (ID NUMBER(20) NOT NULL ENABLE, CUST_NAME VARCHAR2(200 CHAR) NOT NULL ENABLE, CUST_NAME_PY VARCHAR2(500 CHAR), CUST_NAME_PY_SZ VARCHAR2(50 CHAR), CUST_NAME_ABBR VARCHAR2(100 CHAR), CUST_NAME_ABBR_PY VARCHAR2(300 CHAR), CUST_NAME_ABBR_PY_SZ VARCHAR2(50 CHAR), CUST_NAME_EN VARCHAR2(100 CHAR), CUST_TYPE NUMBER(10), CUST_STATUS NUMBER(10), CUST_LEVEL NUMBER(10), MEMO VARCHAR2(500 CHAR), TEL_NUM VARCHAR2(100 CHAR), FAX_NUM VARCHAR2(100 CHAR), INTERNET_ADDRESS VARCHAR2(200 CHAR), COMPANY_ADDRESS VARCHAR2(200 CHAR), POST_CODE VARCHAR2(50 CHAR), EMAIL_ADDRESS VARCHAR2(200 CHAR), CUST_VALID_FLAG NUMBER(1), EXP_DATE DATE, CERT_TYPE NUMBER(10), CERT_NUM VARCHAR2(100 CHAR), CERT_CUST_NAME VARCHAR2(200 CHAR), RISK_LEVEL NUMBER(10), FXPP_CONFIRM_PROCESS NUMBER(10), FXPP_CONFIRM_PROCESS_INFO NUMBER(10), SDX_INFO_AUDITOR VARCHAR2(100 CHAR), SDX_NEW_COM NUMBER(1), ECIF_OTC_FLAG NUMBER(1), AUDIT_STATUS NUMBER(1), IS_VALID NUMBER(1) DEFAULT 1 NOT NULL ENABLE, CREATE_TIME DATE, CREATOR NUMBER(20), CREATOR_NAME VARCHAR2(30 CHAR), MODIFY_TIME DATE, MODIFIER NUMBER(20), MODIFIER_NAME VARCHAR2(30 CHAR)) tablespace BIGDATADBT pctfree 10 initrans 1 maxtrans 255 storage ( initial 192K next 1M minextents 1 maxextents unlimited ); +create table dbz1211 (id number(38) not null, data varchar2(50), constraint name primary key (id) using index tablespace ts1) tablespace ts2; +-- Create index +create index hr.name on hr.table (id,data) tablespace ts; \ No newline at end of file