DBZ-4206 Parse Oracle DDL using SDO_GEOMETRY data types

This commit is contained in:
Chris Cranford 2021-10-27 15:05:15 -04:00 committed by Gunnar Morling
parent d3731f1d7b
commit c25963fbd7
7 changed files with 70 additions and 3 deletions

View File

@ -267,6 +267,14 @@ else if (ctx.native_datatype_element().RAW() != null) {
setPrecision(precisionPart, columnEditor);
}
else if (ctx.native_datatype_element().SDO_GEOMETRY() != null) {
// Allows the registration of new SDO_GEOMETRY columns via an CREATE/ALTER TABLE
// This is the same registration of the column that is resolved during JDBC metadata inspection.
columnEditor
.jdbcType(OracleTypes.OTHER)
.type("SDO_GEOMETRY")
.length(1);
}
else {
throw new IllegalArgumentException("Unsupported column type: " + ctx.native_datatype_element().getText());
}

View File

@ -131,6 +131,12 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest
" primary key (id)" +
")";
private static final String DDL_GEOMETRY = "create table debezium.type_geometry (" +
" id numeric(9,0) not null, " +
" location sdo_geometry, " +
" primary key (id)" +
")";
private static final List<SchemaAndValueField> EXPECTED_STRING = Arrays.asList(
new SchemaAndValueField("VAL_VARCHAR", Schema.OPTIONAL_STRING_SCHEMA, "v\u010d2"),
new SchemaAndValueField("VAL_VARCHAR2", Schema.OPTIONAL_STRING_SCHEMA, "v\u010d2"),
@ -251,12 +257,15 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest
new SchemaAndValueField("VAL_CLOB_LONG", Schema.OPTIONAL_STRING_SCHEMA, part(CLOB_JSON, 1, 5000)),
new SchemaAndValueField("VAL_NCLOB_LONG", Schema.OPTIONAL_STRING_SCHEMA, part(NCLOB_JSON, 1, 5000)));
private static final List<SchemaAndValueField> EXPECTED_GEOMETRY = Arrays.asList();
private static final String[] ALL_TABLES = {
"debezium.type_string",
"debezium.type_fp",
"debezium.type_int",
"debezium.type_time",
"debezium.type_clob"
"debezium.type_clob",
"debezium.type_geometry"
};
private static final String[] ALL_DDLS = {
@ -264,7 +273,8 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest
DDL_FP,
DDL_INT,
DDL_TIME,
DDL_CLOB
DDL_CLOB,
DDL_GEOMETRY
};
@Rule
@ -638,6 +648,40 @@ record = testTableRecords.get(0);
}
}
@Test
@FixFor("DBZ-4206")
public void geometryTypes() throws Exception {
int expectedRecordCount = 0;
if (insertRecordsDuringTest()) {
insertGeometryTypes();
}
Testing.debug("Inserted");
expectedRecordCount++;
SourceRecords records = consumeRecordsByTopic(expectedRecordCount);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TYPE_GEOMETRY");
assertThat(testTableRecords).hasSize(expectedRecordCount);
SourceRecord record = testTableRecords.get(0);
VerifyRecord.isValid(record);
// insert
if (insertRecordsDuringTest()) {
VerifyRecord.isValidInsert(record, "ID", 1);
}
else {
VerifyRecord.isValidRead(record, "ID", 1);
}
Struct after = (Struct) ((Struct) record.value()).get("after");
// Verify that the SDO_GEOMETRY field is not being emitted as its current unsupported
assertThat(after.schema().field("LOCATION")).isNull();
assertRecord(after, EXPECTED_GEOMETRY);
}
protected static void insertStringTypes() throws SQLException {
connection.execute("INSERT INTO debezium.type_string VALUES (1, 'v\u010d2', 'v\u010d2', 'nv\u010d2', 'c', 'n\u010d')");
connection.execute("COMMIT");
@ -719,6 +763,13 @@ protected static void updateClobTypes() throws Exception {
connection.commit();
}
protected static void insertGeometryTypes() throws SQLException {
connection.execute("INSERT INTO debezium.type_geometry VALUES ("
+ "1"
+ ", SDO_GEOMETRY(2003, NULL, NULL, SDO_ELEM_INFO_ARRAY(1, 1003, 3), SDO_ORDINATE_ARRAY(1, 1, 5, 7))"
+ ")");
}
private static String part(String text, int start, int length) {
return text == null ? "" : text.substring(start, Math.min(length, text.length()));
}

View File

@ -39,6 +39,7 @@ public static void beforeClass() throws SQLException {
insertIntTypes();
insertTimeTypes();
insertClobTypes();
insertGeometryTypes();
}
@Before
@ -85,6 +86,8 @@ private String getTableIncludeList() {
return "debezium.type_time";
case "clobTypes":
return "debezium.type_clob";
case "geometryTypes":
return "debezium.type_geometry";
default:
throw new IllegalArgumentException("Unexpected test method: " + name.getMethodName());
}

View File

@ -1513,6 +1513,7 @@ SCRUB: 'SCRUB';
SD_ALL: 'SD_ALL';
SD_INHIBIT: 'SD_INHIBIT';
SDO_GEOM_MBR: 'SDO_GEOM_MBR';
SDO_GEOMETRY: 'SDO_GEOMETRY';
SD_SHOW: 'SD_SHOW';
SEARCH: 'SEARCH';
SECOND: 'SECOND';

View File

@ -2817,7 +2817,7 @@ varray_col_properties
varray_storage_clause
: STORE AS (SECUREFILE|BASICFILE)? LOB ( lob_segname? '(' lob_storage_parameters ')'
| lob_segname
)
)?
;
lob_segname
@ -2904,6 +2904,7 @@ column_properties
| nested_table_col_properties
| (varray_col_properties | lob_storage_clause) //TODO '(' ( ','? lob_partition_storage)+ ')'
| xmltype_column_properties
| column_properties column_properties+
;
period_definition
@ -4553,6 +4554,7 @@ native_datatype_element
| HOUR
| MINUTE
| SECOND
| SDO_GEOMETRY
| TIMEZONE_HOUR
| TIMEZONE_MINUTE
| TIMEZONE_REGION

View File

@ -25,3 +25,4 @@ alter table tcd_abc_int truncate partition (p1);
ALTER TABLE SCOTT.T_DBZ_TEST1 ADD T_VARCHAR2 VARCHAR2(20);
ALTER TABLE SCOTT.T_DBZ_TEST1 MODIFY T_VARCHAR2 VARCHAR2(20);
ALTER TABLE SCOTT.T_DBZ_TEST1 DROP COLUMN T_VARCHAR2;
ALTER TABLE debezium.test add location sdo_geometry;

View File

@ -12,6 +12,7 @@ CREATE TABLE "IDENTITYDB"."CHANGE_NUMBERS" ( "CHANGE_NO" NUMBER(*,0) NOT NULL EN
CREATE TABLE "ETUDES"."IMPORT_EXMETI" (abort_step NUMBER,access_method VARCHAR2(16),ancestor_object_name VARCHAR2(128),ancestor_object_schema VARCHAR2(128),ancestor_object_type VARCHAR2(128),ancestor_process_order NUMBER,base_object_name VARCHAR2(128),base_object_schema VARCHAR2(128),base_object_type VARCHAR2(128),base_process_order NUMBER,block_size NUMBER,cluster_ok NUMBER,completed_bytes NUMBER,completed_rows NUMBER,completion_time DATE,control_queue VARCHAR2(128),creation_level NUMBER,creation_time DATE,cumulative_time NUMBER,data_buffer_size NUMBER,data_io NUMBER,dataobj_num NUMBER,db_version VARCHAR2(60),degree NUMBER,domain_process_order NUMBER,dump_allocation NUMBER,dump_fileid NUMBER,dump_length NUMBER,dump_orig_length NUMBER,dump_position NUMBER,duplicate NUMBER,elapsed_time NUMBER,error_count NUMBER,extend_size NUMBER,file_max_size NUMBER,file_name VARCHAR2(4000),file_type NUMBER,flags NUMBER,grantor VARCHAR2(128),granules NUMBER,guid RAW(16),in_progress CHAR(1),instance VARCHAR2(60),instance_id NUMBER,is_default NUMBER,job_mode VARCHAR2(21),job_version VARCHAR2(60),last_file NUMBER,last_update DATE,load_method NUMBER,metadata_buffer_size NUMBER,metadata_io NUMBER,name VARCHAR2(128),object_int_oid VARCHAR2(130),object_long_name VARCHAR2(4000),object_name VARCHAR2(200),object_number NUMBER,object_path_seqno NUMBER,object_row NUMBER,object_schema VARCHAR2(128),object_tablespace VARCHAR2(128),object_type VARCHAR2(128),object_type_path VARCHAR2(200),objnum NUMBER,old_value VARCHAR2(4000),operation VARCHAR2(8),option_tag VARCHAR2(128),orig_base_object_name VARCHAR2(128),orig_base_object_schema VARCHAR2(128),original_object_name VARCHAR2(128),original_object_schema VARCHAR2(128),packet_number NUMBER,parallelization NUMBER,parent_object_name VARCHAR2(128),parent_object_schema VARCHAR2(128),parent_process_order NUMBER,partition_name VARCHAR2(128),phase NUMBER,platform VARCHAR2(101),process_name VARCHAR2(128),process_order NUMBER,processing_state CHAR(1),processing_status CHAR(1),property NUMBER,proxy_schema VARCHAR2(128),proxy_view VARCHAR2(128),queue_tabnum NUMBER,remote_link VARCHAR2(128),scn NUMBER,seed NUMBER,service_name VARCHAR2(64),size_estimate NUMBER,src_compat VARCHAR2(60),start_time DATE,state VARCHAR2(12),status_queue VARCHAR2(128),subpartition_name VARCHAR2(128),target_xml_clob CLOB,tde_rewrapped_key RAW(2000),template_table VARCHAR2(128),timezone VARCHAR2(64),total_bytes NUMBER,trigflag NUMBER,unload_method NUMBER,user_directory VARCHAR2(4000),user_file_name VARCHAR2(4000),user_name VARCHAR2(128),value_n NUMBER,value_t VARCHAR2(4000),version NUMBER,work_item VARCHAR2(21),xml_clob CLOB,xml_process_order NUMBER) SEGMENT CREATION IMMEDIATE NO INMEMORY INITRANS 100;
CREATE TABLE "IFSAPP".CMP4$94648 organization heap tablespace "IFSAPP_DATA" compress for all operations nologging as select /*+ DYNAMIC_SAMPLING(0) */ * from "IFSAPP".COMP3$94648 mytab;
CREATE TABLE "ZZJOBRUN_MDE" ("MANDT" VARCHAR2 (000009) DEFAULT '000' NOT NULL, "REPID" VARCHAR2 (000120) DEFAULT ' ' NOT NULL, "VARNA" VARCHAR2 (000042) DEFAULT ' ' NOT NULL, "ZZ_MDE_INFO" VARCHAR2 (000075) DEFAULT ' ' NOT NULL) PCTFREE 10 PCTUSED 00 INITRANS 001 TABLESPACE PSAPSR3TBLS NOCOMPRESS NO INMEMORY STORAGE (INITIAL 0000000064 K NEXT 0000001024 K MINEXTENTS 0000000001 MAXEXTENTS UNLIMITED PCTINCREASE 0000 FREELISTS 001 FREELIST GROUPS 01);
CREATE TABLE "DEBEZIUM"."TYPE_GEOMETRY" ( "ID" NUMBER(9,0) NOT NULL ENABLE, "LOCATION" "SDO_GEOMETRY", PRIMARY KEY ("ID") USING INDEX ENABLE, SUPPLEMENTAL LOG DATA (ALL) COLUMNS ) VARRAY "LOCATION"."SDO_ELEM_INFO" STORE AS SECUREFILE LOB VARRAY "LOCATION"."SDO_ORDINATES" STORE AS SECUREFILE LOB;
-- Create index
create index hr.name on hr.table (id,data) tablespace ts;
create unique index idx_eshp_auction_file_history_id on eshp_auction_file_history(history_id);