DBZ-6782 Properly solve XMLTYPE data types during DDL changes
This commit is contained in:
parent
e5766ae26a
commit
dc23f18456
@ -299,6 +299,11 @@ else if (ctx.native_datatype_element().ROWID() != null) {
|
||||
.jdbcType(Types.VARCHAR)
|
||||
.type("ROWID");
|
||||
}
|
||||
else if (ctx.native_datatype_element().XMLTYPE() != null) {
|
||||
columnEditor
|
||||
.jdbcType(OracleTypes.SQLXML)
|
||||
.type("XMLTYPE");
|
||||
}
|
||||
else {
|
||||
columnEditor
|
||||
.jdbcType(OracleTypes.OTHER)
|
||||
|
@ -36,8 +36,10 @@
|
||||
import io.debezium.data.VerifyRecord;
|
||||
import io.debezium.doc.FixFor;
|
||||
import io.debezium.embedded.AbstractConnectorTest;
|
||||
import io.debezium.relational.history.SchemaHistory;
|
||||
import io.debezium.util.Testing;
|
||||
|
||||
import oracle.jdbc.OracleTypes;
|
||||
import oracle.xdb.XMLType;
|
||||
import oracle.xml.parser.v2.XMLDocument;
|
||||
|
||||
@ -641,6 +643,89 @@ record = topicRecords.get(0);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-6782")
|
||||
public void shouldProperlyResolveAddedXmlColumnTypeAndStreamChanges() throws Exception {
|
||||
TestHelper.dropTable(connection, "dbz6782");
|
||||
try {
|
||||
// Explicitly no key.
|
||||
connection.execute("CREATE TABLE dbz6782 (ID numeric(9,0), DATA xmltype, primary key(ID))");
|
||||
TestHelper.streamTable(connection, "dbz6782");
|
||||
|
||||
final String xml = XML_LONG_DATA;
|
||||
connection.prepareQuery("insert into dbz6782 values (1,?)", ps -> ps.setObject(1, toXmlType(xml)), null);
|
||||
connection.commit();
|
||||
|
||||
Configuration config = getDefaultXmlConfig()
|
||||
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6782")
|
||||
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, "true")
|
||||
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, "true")
|
||||
.build();
|
||||
|
||||
start(OracleConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
|
||||
|
||||
connection.execute("ALTER TABLE dbz6782 add DATA2 xmltype");
|
||||
|
||||
final String xml2 = XML_LONG_DATA2;
|
||||
connection.prepareQuery("insert into dbz6782 values (2,?,?)",
|
||||
ps -> {
|
||||
ps.setObject(1, toXmlType(xml));
|
||||
ps.setObject(2, toXmlType(xml2));
|
||||
}, null);
|
||||
connection.commit();
|
||||
|
||||
// Schema changes + data changes
|
||||
SourceRecords records = consumeRecordsByTopic(4);
|
||||
List<SourceRecord> topicRecords = records.recordsForTopic(topicName("DBZ6782"));
|
||||
assertThat(topicRecords).hasSize(2);
|
||||
|
||||
// Snapshot
|
||||
SourceRecord record = topicRecords.get(0);
|
||||
VerifyRecord.isValidRead(record, "ID", 1);
|
||||
|
||||
Struct after = after(record);
|
||||
assertThat(after.get("ID")).isEqualTo(1);
|
||||
assertXmlFieldIsEqual(after, "DATA", xml);
|
||||
|
||||
// Insert during streaming
|
||||
record = topicRecords.get(1);
|
||||
VerifyRecord.isValidInsert(record, "ID", 2);
|
||||
|
||||
after = after(record);
|
||||
assertThat(after.get("ID")).isEqualTo(2);
|
||||
assertXmlFieldIsEqual(after, "DATA", xml);
|
||||
assertXmlFieldIsEqual(after, "DATA2", xml2);
|
||||
|
||||
// Schema changes
|
||||
List<SourceRecord> schemaChanges = records.recordsForTopic("server1");
|
||||
|
||||
List<Object> tableChanges = ((Struct) schemaChanges.get(1).value()).getArray("tableChanges");
|
||||
assertThat(tableChanges).hasSize(1);
|
||||
|
||||
Struct tableChange = (Struct) tableChanges.get(0);
|
||||
assertThat(tableChange.getString("type")).isEqualTo("ALTER");
|
||||
assertThat(tableChange.getString("id")).contains("\"DBZ6782\"");
|
||||
|
||||
// Verify columns
|
||||
for (Object column : tableChange.getStruct("table").getArray("columns")) {
|
||||
Struct columnStruct = (Struct) column;
|
||||
if (columnStruct.getString("name").startsWith("DATA")) {
|
||||
assertThat(columnStruct.get("jdbcType")).isEqualTo(OracleTypes.SQLXML);
|
||||
assertThat(columnStruct.get("typeName")).isEqualTo("XMLTYPE");
|
||||
assertThat(columnStruct.get("typeExpression")).isEqualTo("XMLTYPE");
|
||||
}
|
||||
}
|
||||
|
||||
assertNoRecordsToConsume();
|
||||
}
|
||||
finally {
|
||||
TestHelper.dropTable(connection, "dbz6782");
|
||||
}
|
||||
}
|
||||
|
||||
private Configuration.Builder getDefaultXmlConfig() {
|
||||
return TestHelper.defaultConfig().with(OracleConnectorConfig.LOB_ENABLED, true);
|
||||
}
|
||||
|
@ -5020,6 +5020,7 @@ native_datatype_element
|
||||
| CLOB
|
||||
| NCLOB
|
||||
| MLSLABEL
|
||||
| XMLTYPE
|
||||
;
|
||||
|
||||
bind_variable
|
||||
|
@ -48,6 +48,7 @@ ADD FOREIGN KEY (TABLE_B_ID) REFERENCES TABLE_B (ID);
|
||||
alter table mesg_perf_stat TRUNCATE PARTITION SYS_P1221396 DROP STORAGE CASCADE UPDATE INDEXES;
|
||||
ALTER TABLE PK_CNT_GIORNI_TIPI_INC SPLIT PARTITION PMAX AT (TO_DATE('1410202215','DDMMYYYYHH24'))
|
||||
INTO (PARTITION P2022101414, PARTITION PMAX);
|
||||
ALTER TABLE tab ADD XMLTYPE;
|
||||
-- Virtual column support
|
||||
ALTER TABLE VIDEO ADD (sql_code_injection_check NUMBER GENERATED ALWAYS AS (sdd_avoid_sql_injection(SKRIPT)) VIRTUAL);
|
||||
-- Shrink Space
|
||||
|
@ -18,6 +18,7 @@ create table products (id NUMBER(4) NOT NULL PRIMARY KEY, name VARCHAR2(255) NOT
|
||||
CREATE TABLE T$BO_PATIENT_LMAP NOLOGGING TABLESPACE CMX_TEMP_TDE PARALLEL (DEGREE 1) AS SELECT DISTINCT XREF.ORIG_ROWID_OBJECT, XREF.ROWID_OBJECT FROM C_S_PATIENT STAGE INNER JOIN C_BO_PATIENT_XREF XREF ON XREF.ORIG_ROWID_OBJECT = STAGE.ROWID_OBJECT WHERE STAGE.ROWID_OBJECT IS NOT NULL ;
|
||||
CREATE TABLE "ORACDC"."DEPARTMENT" ( "EMP_NO" VARCHAR2(20) NOT NULL ENABLE, "REQUEST_ID" VARCHAR2(30) NOT NULL ENABLE, "EMP_RCODE" NUMBER(2,0), "EMP_RFLAG" VARCHAR2(20), "EMP_RMSG" VARCHAR2(1000), "EMP_EMP_AMOUNT" NUMBER(8,2), "EMP_EMP_AVS" VARCHAR2(20), "EMP_EMP_CODE" VARCHAR2(200), "EMP_EMP_TIME" VARCHAR2(20), "LAST_UPD_DATE" DATE, "LAST_UPD_BY" VARCHAR2(20), "SITE_ID" NUMBER(5,0) DEFAULT NULL NOT NULL ENABLE, "OBSOLETE_FLAG" CHAR(1) DEFAULT 'N' NOT NULL ENABLE, "PRODUCT_TYPE_ID" NUMBER DEFAULT 2, "TRX_NUMBER" VARCHAR2(30), "RECONCILIATION_ID" VARCHAR2(60), "REQUEST_TOKEN" VARCHAR2(256), "LOAD_TIMESTAMP" TIMESTAMP (6), "PAYMENT_TYPE" VARCHAR2(50), "TRANSACTION_SOURCE" VARCHAR2(50), "CREDIT_CARD_TYPE" VARCHAR2(50), SUPPLEMENTAL LOG GROUP "GGS_19282" ("EMP_NO", "REQUEST_ID") ALWAYS, SUPPLEMENTAL LOG DATA (ALL) COLUMNS, CONSTRAINT "DEPARTMENT_FK" FOREIGN KEY ("EMP_NO") REFERENCES "ORACDC"."MEM_REG" ("EMP_NO") ENABLE NOVALIDATE ) PARTITION BY HASH ("EMP_NO") (PARTITION "DEPARTMENT_P1" ,PARTITION "DEPARTMENT_P2" , PARTITION "DEPARTMENT_P32" );
|
||||
CREATE TABLE "ODS_XMES_QY"."ORDER" ("Oid" CHAR(36), "CONTRACT" CHAR(36), CONSTRAINT "PK_ORDER" PRIMARY KEY ("Oid") USING INDEX ENABLE, SUPPLEMENTAL LOG GROUP "GGS_87678" ("Oid") ALWAYS) INMEMORY PRIORITY CRITICAL MEMCOMPRESS FOR QUERY LOW DISTRIBUTE AUTO FOR SERVICE ALL NO DUPLICATE;
|
||||
CREATE TABLE tab (ID number(9) primary key, data xmltype, data2 xmltype);
|
||||
CREATE TABLE test9 ( doc mdsys.sdo_geometry );
|
||||
CREATE TABLE "XMES"."ORDER_ITEM"
|
||||
( "CREATE_TIME" DATE,
|
||||
|
Loading…
Reference in New Issue
Block a user