DBZ-2543 Add support for new db field in streaming events

This commit is contained in:
Chris Cranford 2023-08-02 03:06:06 -04:00 committed by Chris Cranford
parent 2d4d34428d
commit 748420605e
4 changed files with 18 additions and 6 deletions

View File

@ -223,7 +223,7 @@ private void onCheckpointEvent(StreamingEvent event) {
private void onMutationEvent(StreamingEvent event, AbstractMutationEvent mutationEvent) throws Exception {
final Type eventType = mutationEvent.getType();
final TableId tableId = mutationEvent.getSchema().getTableId(connectorConfig.getCatalogName());
final TableId tableId = mutationEvent.getSchema().getTableId(event.getDatabaseName());
if (!connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
return;
}
@ -295,14 +295,14 @@ private void onSchemaChangeEvent(StreamingEvent event, SchemaChangeEvent schemaE
final TableId tableId;
if (payloadSchema == null) {
tableId = getTableIdFromDdlEvent(schemaEvent.getSql());
tableId = getTableIdFromDdlEvent(event.getDatabaseName(), schemaEvent.getSql());
if (tableId == null) {
LOGGER.trace("Cannot process DDL due to missing schema: {}", schemaEvent.getSql());
return;
}
}
else {
tableId = payloadSchema.getTableId(connectorConfig.getCatalogName());
tableId = payloadSchema.getTableId(event.getDatabaseName());
}
final Instant timestamp = Instant.ofEpochMilli(Long.parseLong(event.getTimestamp()));
@ -478,7 +478,7 @@ private void processTruncateEvent(StreamingEvent event, SchemaChangeEvent ddlEve
return;
}
final TableId tableId = ddlEvent.getSchema().getTableId(connectorConfig.getCatalogName());
final TableId tableId = ddlEvent.getSchema().getTableId(event.getDatabaseName());
if (!connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
LOGGER.warn("Truncate event ignored, table is no included.");
return;
@ -525,13 +525,13 @@ private void processTruncateEvent(StreamingEvent event, SchemaChangeEvent ddlEve
// todo: this is a hack to get around the fact OLR does not provide schema details in DDL events
// ideally this needs to be changed because we're also needing to hardcode values here :/
private TableId getTableIdFromDdlEvent(String ddl) {
private TableId getTableIdFromDdlEvent(String catalogName, String ddl) {
final OracleDdlParser parser = schema.getDdlParser();
final DdlChanges ddlChanges = parser.getDdlChanges();
try {
Tables tables = new Tables();
ddlChanges.reset();
parser.setCurrentDatabase(connectorConfig.getCatalogName());
parser.setCurrentDatabase(catalogName);
parser.setCurrentSchema("DEBEZIUM");
parser.parse(ddl, tables);

View File

@ -130,6 +130,8 @@ public StreamingEvent readEvent() throws OlrNetworkClientException {
event = readNextEvent();
}
LOGGER.trace("Received Event: {}", event);
confirm(Long.parseLong(event.getScn()));
return event;
}

View File

@ -25,6 +25,8 @@ public class StreamingEvent {
@JsonProperty("tm")
private String timestamp;
private String xid;
@JsonProperty("db")
private String databaseName;
private List<PayloadEvent> payload;
public String getScn() {
@ -39,6 +41,10 @@ public String getXid() {
return xid;
}
public String getDatabaseName() {
return databaseName;
}
public List<PayloadEvent> getPayload() {
return payload;
}
@ -49,6 +55,7 @@ public String toString() {
"scn='" + scn + '\'' +
", timestamp='" + timestamp + '\'' +
", xid='" + xid + '\'' +
", databaseName='" + databaseName + '\'' +
", payload=" + payload +
'}';
}

View File

@ -39,6 +39,8 @@
import io.debezium.relational.history.SchemaHistory;
import io.debezium.util.Testing;
import ch.qos.logback.classic.Level;
/**
* Integration tests for the Oracle DDL and schema migration.
*
@ -1058,6 +1060,7 @@ public void shouldNotEmitDdlEventsForNonTableObjects() throws Exception {
final LogInterceptor errorLogInterceptor = new LogInterceptor(ErrorHandler.class);
final LogInterceptor xstreamLogInterceptor = new LogInterceptor("io.debezium.connector.oracle.xstream.LcrEventHandler");
final LogInterceptor olrLogInterceptor = new LogInterceptor(OpenLogReplicatorStreamingChangeEventSource.class);
olrLogInterceptor.setLoggerLevel(OpenLogReplicatorStreamingChangeEventSource.class, Level.TRACE);
// These roles are needed in order to perform certain DDL operations below.
// Any roles granted here should be revoked in the finally block.