From 3fb5bf133947e3a447a1e9ba5822333ca561d3ec Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Mon, 13 Jun 2022 14:28:52 -0400 Subject: [PATCH] DBZ-4451 Introduce SchemaChangeEvent factory methods --- .../connector/mysql/MySqlDatabaseSchema.java | 15 +- .../mysql/MySqlSnapshotChangeEventSource.java | 13 +- .../OracleSchemaChangeEventEmitter.java | 54 ++--- .../OracleSnapshotChangeEventSource.java | 12 +- .../oracle/OracleDatabaseHistoryTest.java | 2 +- .../PostgresSnapshotChangeEventSource.java | 13 +- .../SqlServerSchemaChangeEventEmitter.java | 12 +- .../SqlServerSnapshotChangeEventSource.java | 13 +- .../pipeline/signal/SchemaChanges.java | 24 +- .../history/JsonTableChangeSerializer.java | 2 +- .../relational/history/TableChanges.java | 13 +- .../io/debezium/schema/SchemaChangeEvent.java | 223 +++++++++++++++++- 12 files changed, 286 insertions(+), 110 deletions(-) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java index 32bf11798..ae73d6c5b 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java @@ -279,9 +279,7 @@ else if (event instanceof SetVariableEvent) { else { offset.databaseEvent(databaseName, sourceTime); schemaChangeEvents - .add(new SchemaChangeEvent(partition.getSourcePartition(), offset.getOffset(), - offset.getSourceInfo(), databaseName, null, ddlStatements, (Table) null, - SchemaChangeEventType.DATABASE, snapshot, null)); + .add(SchemaChangeEvent.ofDatabase(partition, offset, databaseName, ddlStatements, snapshot)); } } else { @@ -293,8 +291,15 @@ else if (event instanceof SetVariableEvent) { private void emitChangeEvent(MySqlPartition partition, MySqlOffsetContext offset, List schemaChangeEvents, final String sanitizedDbName, Event event, TableId tableId, SchemaChangeEventType type, boolean snapshot) { - schemaChangeEvents.add(new SchemaChangeEvent(partition.getSourcePartition(), offset.getOffset(), offset.getSourceInfo(), - sanitizedDbName, null, event.statement(), tableId != null ? tableFor(tableId) : null, type, snapshot, null)); + schemaChangeEvents.add(SchemaChangeEvent.of( + type, + partition, + offset, + sanitizedDbName, + null, + event.statement(), + tableId != null ? tableFor(tableId) : null, + snapshot)); } private boolean acceptableDatabase(final String databaseName) { diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java index e64e58566..b6c3b8a78 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java @@ -39,7 +39,6 @@ import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.schema.SchemaChangeEvent; -import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; import io.debezium.util.Clock; import io.debezium.util.Collect; import io.debezium.util.Strings; @@ -384,17 +383,7 @@ private boolean twoPhaseSchemaSnapshot() { protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext snapshotContext, Table table) throws SQLException { - return new SchemaChangeEvent( - snapshotContext.partition.getSourcePartition(), - snapshotContext.offset.getOffset(), - snapshotContext.offset.getSourceInfo(), - snapshotContext.catalogName, - table.id().schema(), - null, - table, - SchemaChangeEventType.CREATE, - true, - null); + return SchemaChangeEvent.ofSnapshotCreate(snapshotContext.partition, snapshotContext.offset, snapshotContext.catalogName, table); } @Override diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSchemaChangeEventEmitter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSchemaChangeEventEmitter.java index 2c944811d..c1262f136 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSchemaChangeEventEmitter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSchemaChangeEventEmitter.java @@ -26,7 +26,6 @@ import io.debezium.relational.ddl.DdlParserListener.TableCreatedEvent; import io.debezium.relational.ddl.DdlParserListener.TableDroppedEvent; import io.debezium.schema.SchemaChangeEvent; -import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; import io.debezium.text.MultipleParsingExceptions; import io.debezium.text.ParsingException; @@ -127,17 +126,14 @@ public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException private SchemaChangeEvent createTableEvent(OraclePartition partition, TableCreatedEvent event) { offsetContext.tableEvent(tableId, changeTime); - return new SchemaChangeEvent( - partition.getSourcePartition(), - offsetContext.getOffset(), - offsetContext.getSourceInfo(), + return SchemaChangeEvent.ofCreate( + partition, + offsetContext, tableId.catalog(), tableId.schema(), event.statement(), schema.tableFor(event.tableId()), - SchemaChangeEventType.CREATE, - false, - null); + false); } private SchemaChangeEvent alterTableEvent(OraclePartition partition, TableAlteredEvent event) { @@ -146,31 +142,35 @@ private SchemaChangeEvent alterTableEvent(OraclePartition partition, TableAltere tableIds.add(event.tableId()); offsetContext.tableEvent(tableIds, changeTime); - return new SchemaChangeEvent( - partition.getSourcePartition(), - offsetContext.getOffset(), - offsetContext.getSourceInfo(), - tableId.catalog(), - tableId.schema(), - event.statement(), - schema.tableFor(event.tableId()), - SchemaChangeEventType.ALTER, - false, - tableId); + if (tableId == null) { + return SchemaChangeEvent.ofAlter( + partition, + offsetContext, + tableId.catalog(), + tableId.schema(), + event.statement(), + schema.tableFor(event.tableId())); + } + else { + return SchemaChangeEvent.ofRename( + partition, + offsetContext, + tableId.catalog(), + tableId.schema(), + event.statement(), + schema.tableFor(event.tableId()), + tableId); + } } private SchemaChangeEvent dropTableEvent(OraclePartition partition, Table tableSchemaBeforeDrop, TableDroppedEvent event) { offsetContext.tableEvent(tableId, changeTime); - return new SchemaChangeEvent( - partition.getSourcePartition(), - offsetContext.getOffset(), - offsetContext.getSourceInfo(), + return SchemaChangeEvent.ofDrop( + partition, + offsetContext, tableId.catalog(), tableId.schema(), event.statement(), - tableSchemaBeforeDrop, - SchemaChangeEventType.DROP, - false, - null); + tableSchemaBeforeDrop); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java index a0a4bcfb9..c9a30f554 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java @@ -25,7 +25,6 @@ import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.schema.SchemaChangeEvent; -import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; import io.debezium.util.Clock; /** @@ -226,17 +225,14 @@ protected void createSchemaChangeEventsForTables(ChangeEventSourceContext source protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext snapshotContext, Table table) throws SQLException { - return new SchemaChangeEvent( - snapshotContext.partition.getSourcePartition(), - snapshotContext.offset.getOffset(), - snapshotContext.offset.getSourceInfo(), + return SchemaChangeEvent.ofCreate( + snapshotContext.partition, + snapshotContext.offset, snapshotContext.catalogName, table.id().schema(), jdbcConnection.getTableMetadataDdl(table.id()), table, - SchemaChangeEventType.CREATE, - true, - null); + true); } /** diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDatabaseHistoryTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDatabaseHistoryTest.java index 4c9b4f024..81cd13ad4 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDatabaseHistoryTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDatabaseHistoryTest.java @@ -59,7 +59,7 @@ protected HistoryRecord getRenameAlterHistoryRecord() { TestHelper.getDatabaseName(), "DEBEZIUM", "ALTER TABLE DBZ4451A RENAME TO DBZ4451B;", - new TableChanges().alter(table, oldTableId), + new TableChanges().rename(table, oldTableId), Instant.now()); } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java index 4d24308cd..2bedf2fbd 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java @@ -26,7 +26,6 @@ import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.schema.SchemaChangeEvent; -import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; import io.debezium.util.Clock; public class PostgresSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { @@ -205,17 +204,7 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext, protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext snapshotContext, Table table) throws SQLException { - return new SchemaChangeEvent( - snapshotContext.partition.getSourcePartition(), - snapshotContext.offset.getOffset(), - snapshotContext.offset.getSourceInfo(), - snapshotContext.catalogName, - table.id().schema(), - null, - table, - SchemaChangeEventType.CREATE, - true, - null); + return SchemaChangeEvent.ofSnapshotCreate(snapshotContext.partition, snapshotContext.offset, snapshotContext.catalogName, table); } @Override diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSchemaChangeEventEmitter.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSchemaChangeEventEmitter.java index 32b339f50..7c3cd1f4b 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSchemaChangeEventEmitter.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSchemaChangeEventEmitter.java @@ -34,17 +34,15 @@ public SqlServerSchemaChangeEventEmitter(SqlServerPartition partition, SqlServer @Override public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException { - final SchemaChangeEvent event = new SchemaChangeEvent( - partition.getSourcePartition(), - offsetContext.getOffset(), - offsetContext.getSourceInfo(), + final SchemaChangeEvent event = SchemaChangeEvent.of( + eventType, + partition, + offsetContext, changeTable.getSourceTableId().catalog(), changeTable.getSourceTableId().schema(), "N/A", tableSchema, - eventType, - false, - null); + false); receiver.schemaChangeEvent(event); } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java index e4756069b..56e162fe3 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java @@ -28,7 +28,6 @@ import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.schema.SchemaChangeEvent; -import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; import io.debezium.util.Clock; public class SqlServerSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { @@ -230,17 +229,7 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext, protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext snapshotContext, Table table) throws SQLException { - return new SchemaChangeEvent( - snapshotContext.partition.getSourcePartition(), - snapshotContext.offset.getOffset(), - snapshotContext.offset.getSourceInfo(), - snapshotContext.catalogName, - table.id().schema(), - null, - table, - SchemaChangeEventType.CREATE, - true, - null); + return SchemaChangeEvent.ofSnapshotCreate(snapshotContext.partition, snapshotContext.offset, snapshotContext.catalogName, table); } @Override diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/SchemaChanges.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/SchemaChanges.java index 828f9994a..96d24fccd 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/signal/SchemaChanges.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/SchemaChanges.java @@ -8,7 +8,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.debezium.DebeziumException; import io.debezium.document.Array; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.signal.Signal.Payload; @@ -20,7 +19,6 @@ import io.debezium.relational.history.TableChanges.TableChangeType; import io.debezium.schema.DataCollectionId; import io.debezium.schema.SchemaChangeEvent; -import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; public class SchemaChanges

implements Signal.Action

{ @@ -61,9 +59,13 @@ public boolean arrived(Payload

signalPayload) throws InterruptedException { if (dispatcher.getHistorizedSchema() != null) { LOGGER.info("Executing schema change for table '{}' requested by signal '{}'", tableChange.getId(), signalPayload.id); dispatcher.dispatchSchemaChangeEvent(signalPayload.partition, tableChange.getId(), emitter -> { - emitter.schemaChangeEvent(new SchemaChangeEvent(signalPayload.partition.getSourcePartition(), - signalPayload.offsetContext.getOffset(), signalPayload.source, database, schema, null, - tableChange.getTable(), toSchemaChangeEventType(tableChange.getType()), false, tableChange.getPreviousId())); + emitter.schemaChangeEvent(SchemaChangeEvent.ofTableChange( + tableChange, + signalPayload.partition.getSourcePartition(), + signalPayload.offsetContext.getOffset(), + signalPayload.source, + database, + schema)); }); } else if (dispatcher.getSchema() instanceof RelationalDatabaseSchema) { @@ -76,16 +78,4 @@ else if (dispatcher.getSchema() instanceof RelationalDatabaseSchema) { } return true; } - - private SchemaChangeEvent.SchemaChangeEventType toSchemaChangeEventType(TableChanges.TableChangeType type) { - switch (type) { - case CREATE: - return SchemaChangeEventType.CREATE; - case ALTER: - return SchemaChangeEventType.ALTER; - case DROP: - return SchemaChangeEventType.DROP; - } - throw new DebeziumException("Unknown table change event type " + type); - } } diff --git a/debezium-core/src/main/java/io/debezium/relational/history/JsonTableChangeSerializer.java b/debezium-core/src/main/java/io/debezium/relational/history/JsonTableChangeSerializer.java index 1fec8ae52..87c63ac27 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/JsonTableChangeSerializer.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/JsonTableChangeSerializer.java @@ -116,7 +116,7 @@ public TableChanges deserialize(Array array, boolean useCatalogBeforeSchema) { tableChanges.create(change.getTable()); } else if (change.getType() == TableChangeType.ALTER) { - tableChanges.alter(change.getTable(), change.getPreviousId()); + tableChanges.alter(change); } else if (change.getType() == TableChangeType.DROP) { tableChanges.drop(change.getTable()); diff --git a/debezium-core/src/main/java/io/debezium/relational/history/TableChanges.java b/debezium-core/src/main/java/io/debezium/relational/history/TableChanges.java index bd29d519f..0a5889d29 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/TableChanges.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/TableChanges.java @@ -34,10 +34,18 @@ public TableChanges create(Table table) { } public TableChanges alter(Table table) { - return alter(table, null); + changes.add(new TableChange(TableChangeType.ALTER, table)); + return this; } - public TableChanges alter(Table table, TableId previousId) { + public TableChanges alter(TableChange change) { + if (change.getPreviousId() == null) { + return alter(change.getTable()); + } + return rename(change.getTable(), change.getPreviousId()); + } + + public TableChanges rename(Table table, TableId previousId) { changes.add(new TableChange(TableChangeType.ALTER, table, previousId)); return this; } @@ -167,6 +175,7 @@ else if (!table.equals(other.table)) { public String toString() { return "TableChange [type=" + type + ", id=" + id + ", previousId=" + previousId + ", table=" + table + "]"; } + } /** diff --git a/debezium-core/src/main/java/io/debezium/schema/SchemaChangeEvent.java b/debezium-core/src/main/java/io/debezium/schema/SchemaChangeEvent.java index 004941579..a71237746 100644 --- a/debezium-core/src/main/java/io/debezium/schema/SchemaChangeEvent.java +++ b/debezium-core/src/main/java/io/debezium/schema/SchemaChangeEvent.java @@ -13,6 +13,9 @@ import org.apache.kafka.connect.data.Struct; +import io.debezium.DebeziumException; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.pipeline.spi.Partition; import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges; @@ -37,14 +40,14 @@ public class SchemaChangeEvent { private final Instant timestamp; private TableChanges tableChanges = new TableChanges(); - public SchemaChangeEvent(Map partition, Map offset, Struct source, String database, String schema, String ddl, Table table, - SchemaChangeEventType type, boolean isFromSnapshot, TableId previousTableId) { + private SchemaChangeEvent(Map partition, Map offset, Struct source, String database, String schema, String ddl, Table table, + SchemaChangeEventType type, boolean isFromSnapshot, TableId previousTableId) { this(partition, offset, source, database, schema, ddl, table != null ? Collections.singleton(table) : Collections.emptySet(), type, isFromSnapshot, Clock.SYSTEM.currentTimeAsInstant(), previousTableId); } - public SchemaChangeEvent(Map partition, Map offset, Struct source, String database, String schema, String ddl, Set tables, - SchemaChangeEventType type, boolean isFromSnapshot, Instant timestamp, TableId previousTableId) { + private SchemaChangeEvent(Map partition, Map offset, Struct source, String database, String schema, String ddl, Set
tables, + SchemaChangeEventType type, boolean isFromSnapshot, Instant timestamp, TableId previousTableId) { this.partition = Objects.requireNonNull(partition, "partition must not be null"); this.offset = Objects.requireNonNull(offset, "offset must not be null"); this.source = Objects.requireNonNull(source, "source must not be null"); @@ -62,8 +65,13 @@ public SchemaChangeEvent(Map partition, Map offset, Struct tables.forEach(tableChanges::create); break; case ALTER: - // there is only ever 1 table within the set, so it's safe to apply the previousTableId like this - tables.forEach(t -> tableChanges.alter(t, previousTableId)); + if (previousTableId == null) { + tables.forEach(tableChanges::alter); + } + else { + // there is only ever 1 table within the set, so it's safe to apply the previousTableId like this + tables.forEach(t -> tableChanges.rename(t, previousTableId)); + } break; case DROP: tables.forEach(tableChanges::drop); @@ -134,4 +142,207 @@ public static enum SchemaChangeEventType { DROP, DATABASE; } + + /** + * Create a schema change event for any event type that does not involve a table rename. + * + * @param type the schema change event type + * @param partition the partition + * @param offsetContext the offset context + * @param databaseName the database name + * @param schemaName the schema name + * @param ddl the schema change DDL statement + * @param table the affected relational table + * @param isFromSnapshot flag indicating whether the change is from snapshot + * @return the schema change event + */ + public static SchemaChangeEvent of(SchemaChangeEventType type, Partition partition, OffsetContext offsetContext, + String databaseName, String schemaName, String ddl, Table table, boolean isFromSnapshot) { + return new SchemaChangeEvent( + partition.getSourcePartition(), + offsetContext.getOffset(), + offsetContext.getSourceInfo(), + databaseName, + schemaName, + ddl, + table, + type, + isFromSnapshot, + null); + } + + /** + * Create a schema change event for a {@link io.debezium.relational.history.TableChanges.TableChange}. + * + * @param change the table change event + * @param partition the partition + * @param offset the offsets + * @param source the source + * @param databaseName the database name + * @param schemaName the schema name + * @return the schema change event + */ + public static SchemaChangeEvent ofTableChange(TableChanges.TableChange change, Map partition, Map offset, + Struct source, String databaseName, String schemaName) { + return new SchemaChangeEvent( + partition, + offset, + source, + databaseName, + schemaName, + null, + change.getTable(), + toSchemaChangeEventType(change.getType()), + false, + change.getPreviousId()); + } + + /** + * Create a schema change event for a database-specific DDL operation. + * + * @param partition the partition + * @param offsetContext the offset context + * @param databaseName the database name + * @param ddl the schema change DDL statement + * @param isFromSnapshot flag indicating whether the change is from snapshot + * @return the schema change event + */ + public static SchemaChangeEvent ofDatabase(Partition partition, OffsetContext offsetContext, String databaseName, + String ddl, boolean isFromSnapshot) { + return of( + SchemaChangeEventType.DATABASE, + partition, + offsetContext, + databaseName, + null, + ddl, + (Table) null, + isFromSnapshot); + } + + /** + * Create a schema change event for a {@code CREATE TABLE} statement without DDL from snapshot phase. + * + * @param partition the partition + * @param offsetContext the offset context + * @param databaseName the database name + * @param table the affected relational table + * @return the schema change event + */ + public static SchemaChangeEvent ofSnapshotCreate(Partition partition, OffsetContext offsetContext, String databaseName, + Table table) { + return ofCreate(partition, offsetContext, databaseName, table.id().schema(), null, table, true); + } + + /** + * Create a schema change event for a {@code CREATE TABLE} statement with DDL. + * + * @param partition the partition + * @param offsetContext the offset context + * @param databaseName the database name + * @param schemaName the schema name + * @param ddl the schema change DDL statement + * @param table the affected relational table + * @param isFromSnapshot flag indicating whether the change is from snapshot + * @return the schema change event + */ + public static SchemaChangeEvent ofCreate(Partition partition, OffsetContext offsetContext, String databaseName, + String schemaName, String ddl, Table table, boolean isFromSnapshot) { + return of( + SchemaChangeEventType.CREATE, + partition, + offsetContext, + databaseName, + schemaName, + ddl, + table, + isFromSnapshot); + } + + /** + * Create a schema change event for a {@code ALTER TABLE} event. + * + * @param partition the partition + * @param offsetContext the offset context + * @param databaseName the database name + * @param schemaName the schema name + * @param ddl the schema change DDL statement + * @param table the affected relational table + * @return the schema change event + */ + public static SchemaChangeEvent ofAlter(Partition partition, OffsetContext offsetContext, String databaseName, + String schemaName, String ddl, Table table) { + return of( + SchemaChangeEventType.ALTER, + partition, + offsetContext, + databaseName, + schemaName, + ddl, + table, + false); + } + + /** + * Create a schema change event for a {@code ALTER TABLE RENAME} event. + * + * @param partition the partition + * @param offsetContext the offset context + * @param databaseName the database name + * @param schemaName the schema name + * @param ddl the schema change DDL statement + * @param table the affected relational table + * @param previousTableId the old, previous relational table identifier + * @return the schema change event + */ + public static SchemaChangeEvent ofRename(Partition partition, OffsetContext offsetContext, String databaseName, + String schemaName, String ddl, Table table, TableId previousTableId) { + return new SchemaChangeEvent( + partition.getSourcePartition(), + offsetContext.getOffset(), + offsetContext.getSourceInfo(), + databaseName, + schemaName, + ddl, + table, + SchemaChangeEventType.ALTER, + false, + previousTableId); + } + + /** + * Create a schema change event for a {@code DROP TABLE} event. + * + * @param partition the partition + * @param offsetContext the offset context + * @param databaseName the database name + * @param schemaName the schema name + * @param ddl the schema change DDL statement + * @param table the affected relational table + * @return the schema change event + */ + public static SchemaChangeEvent ofDrop(Partition partition, OffsetContext offsetContext, String databaseName, + String schemaName, String ddl, Table table) { + return of( + SchemaChangeEventType.DROP, + partition, + offsetContext, + databaseName, + schemaName, + ddl, + table, + false); + } + + private static SchemaChangeEvent.SchemaChangeEventType toSchemaChangeEventType(TableChanges.TableChangeType type) { + switch (type) { + case CREATE: + return SchemaChangeEventType.CREATE; + case ALTER: + return SchemaChangeEventType.ALTER; + case DROP: + return SchemaChangeEventType.DROP; + } + throw new DebeziumException("Unknown table change event type " + type); + } }