DBZ-5386 Missing "previousId" property with parsing the rename statement in kafka history topic
This commit is contained in:
parent
366c95707a
commit
7ba7929d7e
@ -291,15 +291,30 @@ else if (event instanceof SetVariableEvent) {
|
||||
private void emitChangeEvent(MySqlPartition partition, MySqlOffsetContext offset, List<SchemaChangeEvent> schemaChangeEvents,
|
||||
final String sanitizedDbName, Event event, TableId tableId, SchemaChangeEventType type,
|
||||
boolean snapshot) {
|
||||
schemaChangeEvents.add(SchemaChangeEvent.of(
|
||||
type,
|
||||
partition,
|
||||
offset,
|
||||
sanitizedDbName,
|
||||
null,
|
||||
event.statement(),
|
||||
tableId != null ? tableFor(tableId) : null,
|
||||
snapshot));
|
||||
SchemaChangeEvent schemaChangeEvent;
|
||||
if (type.equals(SchemaChangeEventType.ALTER) && event instanceof TableAlteredEvent
|
||||
&& ((TableAlteredEvent) event).previousTableId() != null) {
|
||||
schemaChangeEvent = SchemaChangeEvent.ofRename(
|
||||
partition,
|
||||
offset,
|
||||
sanitizedDbName,
|
||||
null,
|
||||
event.statement(),
|
||||
tableId != null ? tableFor(tableId) : null,
|
||||
((TableAlteredEvent) event).previousTableId());
|
||||
}
|
||||
else {
|
||||
schemaChangeEvent = SchemaChangeEvent.of(
|
||||
type,
|
||||
partition,
|
||||
offset,
|
||||
sanitizedDbName,
|
||||
null,
|
||||
event.statement(),
|
||||
tableId != null ? tableFor(tableId) : null,
|
||||
snapshot);
|
||||
}
|
||||
schemaChangeEvents.add(schemaChangeEvent);
|
||||
}
|
||||
|
||||
private boolean acceptableDatabase(final String databaseName) {
|
||||
|
Loading…
Reference in New Issue
Block a user