From c25963fbd73306f4b1843b55eec991b52fb5bfc7 Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Wed, 27 Oct 2021 15:05:15 -0400 Subject: [PATCH] DBZ-4206 Parse Oracle DDL using SDO_GEOMETRY data types --- .../ColumnDefinitionParserListener.java | 8 +++ .../oracle/AbstractOracleDatatypesTest.java | 55 ++++++++++++++++++- .../connector/oracle/SnapshotDatatypesIT.java | 3 + .../ddl/parser/oracle/generated/PlSqlLexer.g4 | 1 + .../parser/oracle/generated/PlSqlParser.g4 | 4 +- .../resources/oracle/examples/ddl_alter.sql | 1 + .../resources/oracle/examples/ddl_create.sql | 1 + 7 files changed, 70 insertions(+), 3 deletions(-) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/ColumnDefinitionParserListener.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/ColumnDefinitionParserListener.java index 8aa845ab1..4bfaf0f08 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/ColumnDefinitionParserListener.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/ColumnDefinitionParserListener.java @@ -267,6 +267,14 @@ else if (ctx.native_datatype_element().RAW() != null) { setPrecision(precisionPart, columnEditor); } + else if (ctx.native_datatype_element().SDO_GEOMETRY() != null) { + // Allows the registration of new SDO_GEOMETRY columns via an CREATE/ALTER TABLE + // This is the same registration of the column that is resolved during JDBC metadata inspection. + columnEditor + .jdbcType(OracleTypes.OTHER) + .type("SDO_GEOMETRY") + .length(1); + } else { throw new IllegalArgumentException("Unsupported column type: " + ctx.native_datatype_element().getText()); } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java index 6b14fd8af..67f6e324e 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java @@ -131,6 +131,12 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest " primary key (id)" + ")"; + private static final String DDL_GEOMETRY = "create table debezium.type_geometry (" + + " id numeric(9,0) not null, " + + " location sdo_geometry, " + + " primary key (id)" + + ")"; + private static final List EXPECTED_STRING = Arrays.asList( new SchemaAndValueField("VAL_VARCHAR", Schema.OPTIONAL_STRING_SCHEMA, "v\u010d2"), new SchemaAndValueField("VAL_VARCHAR2", Schema.OPTIONAL_STRING_SCHEMA, "v\u010d2"), @@ -251,12 +257,15 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest new SchemaAndValueField("VAL_CLOB_LONG", Schema.OPTIONAL_STRING_SCHEMA, part(CLOB_JSON, 1, 5000)), new SchemaAndValueField("VAL_NCLOB_LONG", Schema.OPTIONAL_STRING_SCHEMA, part(NCLOB_JSON, 1, 5000))); + private static final List EXPECTED_GEOMETRY = Arrays.asList(); + private static final String[] ALL_TABLES = { "debezium.type_string", "debezium.type_fp", "debezium.type_int", "debezium.type_time", - "debezium.type_clob" + "debezium.type_clob", + "debezium.type_geometry" }; private static final String[] ALL_DDLS = { @@ -264,7 +273,8 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest DDL_FP, DDL_INT, DDL_TIME, - DDL_CLOB + DDL_CLOB, + DDL_GEOMETRY }; @Rule @@ -638,6 +648,40 @@ record = testTableRecords.get(0); } } + @Test + @FixFor("DBZ-4206") + public void geometryTypes() throws Exception { + int expectedRecordCount = 0; + + if (insertRecordsDuringTest()) { + insertGeometryTypes(); + } + + Testing.debug("Inserted"); + expectedRecordCount++; + + SourceRecords records = consumeRecordsByTopic(expectedRecordCount); + + List testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TYPE_GEOMETRY"); + assertThat(testTableRecords).hasSize(expectedRecordCount); + SourceRecord record = testTableRecords.get(0); + + VerifyRecord.isValid(record); + + // insert + if (insertRecordsDuringTest()) { + VerifyRecord.isValidInsert(record, "ID", 1); + } + else { + VerifyRecord.isValidRead(record, "ID", 1); + } + + Struct after = (Struct) ((Struct) record.value()).get("after"); + // Verify that the SDO_GEOMETRY field is not being emitted as its current unsupported + assertThat(after.schema().field("LOCATION")).isNull(); + assertRecord(after, EXPECTED_GEOMETRY); + } + protected static void insertStringTypes() throws SQLException { connection.execute("INSERT INTO debezium.type_string VALUES (1, 'v\u010d2', 'v\u010d2', 'nv\u010d2', 'c', 'n\u010d')"); connection.execute("COMMIT"); @@ -719,6 +763,13 @@ protected static void updateClobTypes() throws Exception { connection.commit(); } + protected static void insertGeometryTypes() throws SQLException { + connection.execute("INSERT INTO debezium.type_geometry VALUES (" + + "1" + + ", SDO_GEOMETRY(2003, NULL, NULL, SDO_ELEM_INFO_ARRAY(1, 1003, 3), SDO_ORDINATE_ARRAY(1, 1, 5, 7))" + + ")"); + } + private static String part(String text, int start, int length) { return text == null ? "" : text.substring(start, Math.min(length, text.length())); } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SnapshotDatatypesIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SnapshotDatatypesIT.java index 58ef167e0..0e5491e5c 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SnapshotDatatypesIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SnapshotDatatypesIT.java @@ -39,6 +39,7 @@ public static void beforeClass() throws SQLException { insertIntTypes(); insertTimeTypes(); insertClobTypes(); + insertGeometryTypes(); } @Before @@ -85,6 +86,8 @@ private String getTableIncludeList() { return "debezium.type_time"; case "clobTypes": return "debezium.type_clob"; + case "geometryTypes": + return "debezium.type_geometry"; default: throw new IllegalArgumentException("Unexpected test method: " + name.getMethodName()); } diff --git a/debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/oracle/generated/PlSqlLexer.g4 b/debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/oracle/generated/PlSqlLexer.g4 index e99b0c50b..8f8d65b88 100644 --- a/debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/oracle/generated/PlSqlLexer.g4 +++ b/debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/oracle/generated/PlSqlLexer.g4 @@ -1513,6 +1513,7 @@ SCRUB: 'SCRUB'; SD_ALL: 'SD_ALL'; SD_INHIBIT: 'SD_INHIBIT'; SDO_GEOM_MBR: 'SDO_GEOM_MBR'; +SDO_GEOMETRY: 'SDO_GEOMETRY'; SD_SHOW: 'SD_SHOW'; SEARCH: 'SEARCH'; SECOND: 'SECOND'; diff --git a/debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/oracle/generated/PlSqlParser.g4 b/debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/oracle/generated/PlSqlParser.g4 index 1cd8edddc..02532f315 100644 --- a/debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/oracle/generated/PlSqlParser.g4 +++ b/debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/oracle/generated/PlSqlParser.g4 @@ -2817,7 +2817,7 @@ varray_col_properties varray_storage_clause : STORE AS (SECUREFILE|BASICFILE)? LOB ( lob_segname? '(' lob_storage_parameters ')' | lob_segname - ) + )? ; lob_segname @@ -2904,6 +2904,7 @@ column_properties | nested_table_col_properties | (varray_col_properties | lob_storage_clause) //TODO '(' ( ','? lob_partition_storage)+ ')' | xmltype_column_properties + | column_properties column_properties+ ; period_definition @@ -4553,6 +4554,7 @@ native_datatype_element | HOUR | MINUTE | SECOND + | SDO_GEOMETRY | TIMEZONE_HOUR | TIMEZONE_MINUTE | TIMEZONE_REGION diff --git a/debezium-ddl-parser/src/test/resources/oracle/examples/ddl_alter.sql b/debezium-ddl-parser/src/test/resources/oracle/examples/ddl_alter.sql index 7bf7544f9..0ad14217e 100644 --- a/debezium-ddl-parser/src/test/resources/oracle/examples/ddl_alter.sql +++ b/debezium-ddl-parser/src/test/resources/oracle/examples/ddl_alter.sql @@ -25,3 +25,4 @@ alter table tcd_abc_int truncate partition (p1); ALTER TABLE SCOTT.T_DBZ_TEST1 ADD T_VARCHAR2 VARCHAR2(20); ALTER TABLE SCOTT.T_DBZ_TEST1 MODIFY T_VARCHAR2 VARCHAR2(20); ALTER TABLE SCOTT.T_DBZ_TEST1 DROP COLUMN T_VARCHAR2; +ALTER TABLE debezium.test add location sdo_geometry; \ No newline at end of file diff --git a/debezium-ddl-parser/src/test/resources/oracle/examples/ddl_create.sql b/debezium-ddl-parser/src/test/resources/oracle/examples/ddl_create.sql index 12ff87237..8d2349565 100644 --- a/debezium-ddl-parser/src/test/resources/oracle/examples/ddl_create.sql +++ b/debezium-ddl-parser/src/test/resources/oracle/examples/ddl_create.sql @@ -12,6 +12,7 @@ CREATE TABLE "IDENTITYDB"."CHANGE_NUMBERS" ( "CHANGE_NO" NUMBER(*,0) NOT NULL EN CREATE TABLE "ETUDES"."IMPORT_EXMETI" (abort_step NUMBER,access_method VARCHAR2(16),ancestor_object_name VARCHAR2(128),ancestor_object_schema VARCHAR2(128),ancestor_object_type VARCHAR2(128),ancestor_process_order NUMBER,base_object_name VARCHAR2(128),base_object_schema VARCHAR2(128),base_object_type VARCHAR2(128),base_process_order NUMBER,block_size NUMBER,cluster_ok NUMBER,completed_bytes NUMBER,completed_rows NUMBER,completion_time DATE,control_queue VARCHAR2(128),creation_level NUMBER,creation_time DATE,cumulative_time NUMBER,data_buffer_size NUMBER,data_io NUMBER,dataobj_num NUMBER,db_version VARCHAR2(60),degree NUMBER,domain_process_order NUMBER,dump_allocation NUMBER,dump_fileid NUMBER,dump_length NUMBER,dump_orig_length NUMBER,dump_position NUMBER,duplicate NUMBER,elapsed_time NUMBER,error_count NUMBER,extend_size NUMBER,file_max_size NUMBER,file_name VARCHAR2(4000),file_type NUMBER,flags NUMBER,grantor VARCHAR2(128),granules NUMBER,guid RAW(16),in_progress CHAR(1),instance VARCHAR2(60),instance_id NUMBER,is_default NUMBER,job_mode VARCHAR2(21),job_version VARCHAR2(60),last_file NUMBER,last_update DATE,load_method NUMBER,metadata_buffer_size NUMBER,metadata_io NUMBER,name VARCHAR2(128),object_int_oid VARCHAR2(130),object_long_name VARCHAR2(4000),object_name VARCHAR2(200),object_number NUMBER,object_path_seqno NUMBER,object_row NUMBER,object_schema VARCHAR2(128),object_tablespace VARCHAR2(128),object_type VARCHAR2(128),object_type_path VARCHAR2(200),objnum NUMBER,old_value VARCHAR2(4000),operation VARCHAR2(8),option_tag VARCHAR2(128),orig_base_object_name VARCHAR2(128),orig_base_object_schema VARCHAR2(128),original_object_name VARCHAR2(128),original_object_schema VARCHAR2(128),packet_number NUMBER,parallelization NUMBER,parent_object_name VARCHAR2(128),parent_object_schema VARCHAR2(128),parent_process_order NUMBER,partition_name VARCHAR2(128),phase NUMBER,platform VARCHAR2(101),process_name VARCHAR2(128),process_order NUMBER,processing_state CHAR(1),processing_status CHAR(1),property NUMBER,proxy_schema VARCHAR2(128),proxy_view VARCHAR2(128),queue_tabnum NUMBER,remote_link VARCHAR2(128),scn NUMBER,seed NUMBER,service_name VARCHAR2(64),size_estimate NUMBER,src_compat VARCHAR2(60),start_time DATE,state VARCHAR2(12),status_queue VARCHAR2(128),subpartition_name VARCHAR2(128),target_xml_clob CLOB,tde_rewrapped_key RAW(2000),template_table VARCHAR2(128),timezone VARCHAR2(64),total_bytes NUMBER,trigflag NUMBER,unload_method NUMBER,user_directory VARCHAR2(4000),user_file_name VARCHAR2(4000),user_name VARCHAR2(128),value_n NUMBER,value_t VARCHAR2(4000),version NUMBER,work_item VARCHAR2(21),xml_clob CLOB,xml_process_order NUMBER) SEGMENT CREATION IMMEDIATE NO INMEMORY INITRANS 100; CREATE TABLE "IFSAPP".CMP4$94648 organization heap tablespace "IFSAPP_DATA" compress for all operations nologging as select /*+ DYNAMIC_SAMPLING(0) */ * from "IFSAPP".COMP3$94648 mytab; CREATE TABLE "ZZJOBRUN_MDE" ("MANDT" VARCHAR2 (000009) DEFAULT '000' NOT NULL, "REPID" VARCHAR2 (000120) DEFAULT ' ' NOT NULL, "VARNA" VARCHAR2 (000042) DEFAULT ' ' NOT NULL, "ZZ_MDE_INFO" VARCHAR2 (000075) DEFAULT ' ' NOT NULL) PCTFREE 10 PCTUSED 00 INITRANS 001 TABLESPACE PSAPSR3TBLS NOCOMPRESS NO INMEMORY STORAGE (INITIAL 0000000064 K NEXT 0000001024 K MINEXTENTS 0000000001 MAXEXTENTS UNLIMITED PCTINCREASE 0000 FREELISTS 001 FREELIST GROUPS 01); +CREATE TABLE "DEBEZIUM"."TYPE_GEOMETRY" ( "ID" NUMBER(9,0) NOT NULL ENABLE, "LOCATION" "SDO_GEOMETRY", PRIMARY KEY ("ID") USING INDEX ENABLE, SUPPLEMENTAL LOG DATA (ALL) COLUMNS ) VARRAY "LOCATION"."SDO_ELEM_INFO" STORE AS SECUREFILE LOB VARRAY "LOCATION"."SDO_ORDINATES" STORE AS SECUREFILE LOB; -- Create index create index hr.name on hr.table (id,data) tablespace ts; create unique index idx_eshp_auction_file_history_id on eshp_auction_file_history(history_id);