diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/OpenLogReplicatorStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/OpenLogReplicatorStreamingChangeEventSource.java index 6d6548d64..b7d126920 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/OpenLogReplicatorStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/OpenLogReplicatorStreamingChangeEventSource.java @@ -223,7 +223,7 @@ private void onCheckpointEvent(StreamingEvent event) { private void onMutationEvent(StreamingEvent event, AbstractMutationEvent mutationEvent) throws Exception { final Type eventType = mutationEvent.getType(); - final TableId tableId = mutationEvent.getSchema().getTableId(connectorConfig.getCatalogName()); + final TableId tableId = mutationEvent.getSchema().getTableId(event.getDatabaseName()); if (!connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) { return; } @@ -295,14 +295,14 @@ private void onSchemaChangeEvent(StreamingEvent event, SchemaChangeEvent schemaE final TableId tableId; if (payloadSchema == null) { - tableId = getTableIdFromDdlEvent(schemaEvent.getSql()); + tableId = getTableIdFromDdlEvent(event.getDatabaseName(), schemaEvent.getSql()); if (tableId == null) { LOGGER.trace("Cannot process DDL due to missing schema: {}", schemaEvent.getSql()); return; } } else { - tableId = payloadSchema.getTableId(connectorConfig.getCatalogName()); + tableId = payloadSchema.getTableId(event.getDatabaseName()); } final Instant timestamp = Instant.ofEpochMilli(Long.parseLong(event.getTimestamp())); @@ -478,7 +478,7 @@ private void processTruncateEvent(StreamingEvent event, SchemaChangeEvent ddlEve return; } - final TableId tableId = ddlEvent.getSchema().getTableId(connectorConfig.getCatalogName()); + final TableId tableId = ddlEvent.getSchema().getTableId(event.getDatabaseName()); if (!connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) { LOGGER.warn("Truncate event ignored, table is no included."); return; @@ -525,13 +525,13 @@ private void processTruncateEvent(StreamingEvent event, SchemaChangeEvent ddlEve // todo: this is a hack to get around the fact OLR does not provide schema details in DDL events // ideally this needs to be changed because we're also needing to hardcode values here :/ - private TableId getTableIdFromDdlEvent(String ddl) { + private TableId getTableIdFromDdlEvent(String catalogName, String ddl) { final OracleDdlParser parser = schema.getDdlParser(); final DdlChanges ddlChanges = parser.getDdlChanges(); try { Tables tables = new Tables(); ddlChanges.reset(); - parser.setCurrentDatabase(connectorConfig.getCatalogName()); + parser.setCurrentDatabase(catalogName); parser.setCurrentSchema("DEBEZIUM"); parser.parse(ddl, tables); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/client/OlrNetworkClient.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/client/OlrNetworkClient.java index ea8d18d8a..fc83f7072 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/client/OlrNetworkClient.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/client/OlrNetworkClient.java @@ -130,6 +130,8 @@ public StreamingEvent readEvent() throws OlrNetworkClientException { event = readNextEvent(); } + LOGGER.trace("Received Event: {}", event); + confirm(Long.parseLong(event.getScn())); return event; } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/client/StreamingEvent.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/client/StreamingEvent.java index 3d9d5ebb4..9047f5d8a 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/client/StreamingEvent.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/client/StreamingEvent.java @@ -25,6 +25,8 @@ public class StreamingEvent { @JsonProperty("tm") private String timestamp; private String xid; + @JsonProperty("db") + private String databaseName; private List payload; public String getScn() { @@ -39,6 +41,10 @@ public String getXid() { return xid; } + public String getDatabaseName() { + return databaseName; + } + public List getPayload() { return payload; } @@ -49,6 +55,7 @@ public String toString() { "scn='" + scn + '\'' + ", timestamp='" + timestamp + '\'' + ", xid='" + xid + '\'' + + ", databaseName='" + databaseName + '\'' + ", payload=" + payload + '}'; } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleSchemaMigrationIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleSchemaMigrationIT.java index d64043b7d..e820a8299 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleSchemaMigrationIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleSchemaMigrationIT.java @@ -39,6 +39,8 @@ import io.debezium.relational.history.SchemaHistory; import io.debezium.util.Testing; +import ch.qos.logback.classic.Level; + /** * Integration tests for the Oracle DDL and schema migration. * @@ -1058,6 +1060,7 @@ public void shouldNotEmitDdlEventsForNonTableObjects() throws Exception { final LogInterceptor errorLogInterceptor = new LogInterceptor(ErrorHandler.class); final LogInterceptor xstreamLogInterceptor = new LogInterceptor("io.debezium.connector.oracle.xstream.LcrEventHandler"); final LogInterceptor olrLogInterceptor = new LogInterceptor(OpenLogReplicatorStreamingChangeEventSource.class); + olrLogInterceptor.setLoggerLevel(OpenLogReplicatorStreamingChangeEventSource.class, Level.TRACE); // These roles are needed in order to perform certain DDL operations below. // Any roles granted here should be revoked in the finally block.