DBZ-4451 Introduce SchemaChangeEvent factory methods

This commit is contained in:
Chris Cranford 2022-06-13 14:28:52 -04:00 committed by Jiri Pechanec
parent b6549670c8
commit 3fb5bf1339
12 changed files with 286 additions and 110 deletions

View File

@ -279,9 +279,7 @@ else if (event instanceof SetVariableEvent) {
else {
offset.databaseEvent(databaseName, sourceTime);
schemaChangeEvents
.add(new SchemaChangeEvent(partition.getSourcePartition(), offset.getOffset(),
offset.getSourceInfo(), databaseName, null, ddlStatements, (Table) null,
SchemaChangeEventType.DATABASE, snapshot, null));
.add(SchemaChangeEvent.ofDatabase(partition, offset, databaseName, ddlStatements, snapshot));
}
}
else {
@ -293,8 +291,15 @@ else if (event instanceof SetVariableEvent) {
private void emitChangeEvent(MySqlPartition partition, MySqlOffsetContext offset, List<SchemaChangeEvent> schemaChangeEvents,
final String sanitizedDbName, Event event, TableId tableId, SchemaChangeEventType type,
boolean snapshot) {
schemaChangeEvents.add(new SchemaChangeEvent(partition.getSourcePartition(), offset.getOffset(), offset.getSourceInfo(),
sanitizedDbName, null, event.statement(), tableId != null ? tableFor(tableId) : null, type, snapshot, null));
schemaChangeEvents.add(SchemaChangeEvent.of(
type,
partition,
offset,
sanitizedDbName,
null,
event.statement(),
tableId != null ? tableFor(tableId) : null,
snapshot));
}
private boolean acceptableDatabase(final String databaseName) {

View File

@ -39,7 +39,6 @@
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.util.Clock;
import io.debezium.util.Collect;
import io.debezium.util.Strings;
@ -384,17 +383,7 @@ private boolean twoPhaseSchemaSnapshot() {
protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext,
Table table)
throws SQLException {
return new SchemaChangeEvent(
snapshotContext.partition.getSourcePartition(),
snapshotContext.offset.getOffset(),
snapshotContext.offset.getSourceInfo(),
snapshotContext.catalogName,
table.id().schema(),
null,
table,
SchemaChangeEventType.CREATE,
true,
null);
return SchemaChangeEvent.ofSnapshotCreate(snapshotContext.partition, snapshotContext.offset, snapshotContext.catalogName, table);
}
@Override

View File

@ -26,7 +26,6 @@
import io.debezium.relational.ddl.DdlParserListener.TableCreatedEvent;
import io.debezium.relational.ddl.DdlParserListener.TableDroppedEvent;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.text.MultipleParsingExceptions;
import io.debezium.text.ParsingException;
@ -127,17 +126,14 @@ public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException
private SchemaChangeEvent createTableEvent(OraclePartition partition, TableCreatedEvent event) {
offsetContext.tableEvent(tableId, changeTime);
return new SchemaChangeEvent(
partition.getSourcePartition(),
offsetContext.getOffset(),
offsetContext.getSourceInfo(),
return SchemaChangeEvent.ofCreate(
partition,
offsetContext,
tableId.catalog(),
tableId.schema(),
event.statement(),
schema.tableFor(event.tableId()),
SchemaChangeEventType.CREATE,
false,
null);
false);
}
private SchemaChangeEvent alterTableEvent(OraclePartition partition, TableAlteredEvent event) {
@ -146,31 +142,35 @@ private SchemaChangeEvent alterTableEvent(OraclePartition partition, TableAltere
tableIds.add(event.tableId());
offsetContext.tableEvent(tableIds, changeTime);
return new SchemaChangeEvent(
partition.getSourcePartition(),
offsetContext.getOffset(),
offsetContext.getSourceInfo(),
tableId.catalog(),
tableId.schema(),
event.statement(),
schema.tableFor(event.tableId()),
SchemaChangeEventType.ALTER,
false,
tableId);
if (tableId == null) {
return SchemaChangeEvent.ofAlter(
partition,
offsetContext,
tableId.catalog(),
tableId.schema(),
event.statement(),
schema.tableFor(event.tableId()));
}
else {
return SchemaChangeEvent.ofRename(
partition,
offsetContext,
tableId.catalog(),
tableId.schema(),
event.statement(),
schema.tableFor(event.tableId()),
tableId);
}
}
private SchemaChangeEvent dropTableEvent(OraclePartition partition, Table tableSchemaBeforeDrop, TableDroppedEvent event) {
offsetContext.tableEvent(tableId, changeTime);
return new SchemaChangeEvent(
partition.getSourcePartition(),
offsetContext.getOffset(),
offsetContext.getSourceInfo(),
return SchemaChangeEvent.ofDrop(
partition,
offsetContext,
tableId.catalog(),
tableId.schema(),
event.statement(),
tableSchemaBeforeDrop,
SchemaChangeEventType.DROP,
false,
null);
tableSchemaBeforeDrop);
}
}

View File

@ -25,7 +25,6 @@
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.util.Clock;
/**
@ -226,17 +225,14 @@ protected void createSchemaChangeEventsForTables(ChangeEventSourceContext source
protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext,
Table table)
throws SQLException {
return new SchemaChangeEvent(
snapshotContext.partition.getSourcePartition(),
snapshotContext.offset.getOffset(),
snapshotContext.offset.getSourceInfo(),
return SchemaChangeEvent.ofCreate(
snapshotContext.partition,
snapshotContext.offset,
snapshotContext.catalogName,
table.id().schema(),
jdbcConnection.getTableMetadataDdl(table.id()),
table,
SchemaChangeEventType.CREATE,
true,
null);
true);
}
/**

View File

@ -59,7 +59,7 @@ protected HistoryRecord getRenameAlterHistoryRecord() {
TestHelper.getDatabaseName(),
"DEBEZIUM",
"ALTER TABLE DBZ4451A RENAME TO DBZ4451B;",
new TableChanges().alter(table, oldTableId),
new TableChanges().rename(table, oldTableId),
Instant.now());
}

View File

@ -26,7 +26,6 @@
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.util.Clock;
public class PostgresSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> {
@ -205,17 +204,7 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext,
Table table)
throws SQLException {
return new SchemaChangeEvent(
snapshotContext.partition.getSourcePartition(),
snapshotContext.offset.getOffset(),
snapshotContext.offset.getSourceInfo(),
snapshotContext.catalogName,
table.id().schema(),
null,
table,
SchemaChangeEventType.CREATE,
true,
null);
return SchemaChangeEvent.ofSnapshotCreate(snapshotContext.partition, snapshotContext.offset, snapshotContext.catalogName, table);
}
@Override

View File

@ -34,17 +34,15 @@ public SqlServerSchemaChangeEventEmitter(SqlServerPartition partition, SqlServer
@Override
public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException {
final SchemaChangeEvent event = new SchemaChangeEvent(
partition.getSourcePartition(),
offsetContext.getOffset(),
offsetContext.getSourceInfo(),
final SchemaChangeEvent event = SchemaChangeEvent.of(
eventType,
partition,
offsetContext,
changeTable.getSourceTableId().catalog(),
changeTable.getSourceTableId().schema(),
"N/A",
tableSchema,
eventType,
false,
null);
false);
receiver.schemaChangeEvent(event);
}

View File

@ -28,7 +28,6 @@
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.util.Clock;
public class SqlServerSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource<SqlServerPartition, SqlServerOffsetContext> {
@ -230,17 +229,7 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext<SqlServerPartition, SqlServerOffsetContext> snapshotContext,
Table table)
throws SQLException {
return new SchemaChangeEvent(
snapshotContext.partition.getSourcePartition(),
snapshotContext.offset.getOffset(),
snapshotContext.offset.getSourceInfo(),
snapshotContext.catalogName,
table.id().schema(),
null,
table,
SchemaChangeEventType.CREATE,
true,
null);
return SchemaChangeEvent.ofSnapshotCreate(snapshotContext.partition, snapshotContext.offset, snapshotContext.catalogName, table);
}
@Override

View File

@ -8,7 +8,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.document.Array;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.signal.Signal.Payload;
@ -20,7 +19,6 @@
import io.debezium.relational.history.TableChanges.TableChangeType;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
public class SchemaChanges<P extends Partition> implements Signal.Action<P> {
@ -61,9 +59,13 @@ public boolean arrived(Payload<P> signalPayload) throws InterruptedException {
if (dispatcher.getHistorizedSchema() != null) {
LOGGER.info("Executing schema change for table '{}' requested by signal '{}'", tableChange.getId(), signalPayload.id);
dispatcher.dispatchSchemaChangeEvent(signalPayload.partition, tableChange.getId(), emitter -> {
emitter.schemaChangeEvent(new SchemaChangeEvent(signalPayload.partition.getSourcePartition(),
signalPayload.offsetContext.getOffset(), signalPayload.source, database, schema, null,
tableChange.getTable(), toSchemaChangeEventType(tableChange.getType()), false, tableChange.getPreviousId()));
emitter.schemaChangeEvent(SchemaChangeEvent.ofTableChange(
tableChange,
signalPayload.partition.getSourcePartition(),
signalPayload.offsetContext.getOffset(),
signalPayload.source,
database,
schema));
});
}
else if (dispatcher.getSchema() instanceof RelationalDatabaseSchema) {
@ -76,16 +78,4 @@ else if (dispatcher.getSchema() instanceof RelationalDatabaseSchema) {
}
return true;
}
private SchemaChangeEvent.SchemaChangeEventType toSchemaChangeEventType(TableChanges.TableChangeType type) {
switch (type) {
case CREATE:
return SchemaChangeEventType.CREATE;
case ALTER:
return SchemaChangeEventType.ALTER;
case DROP:
return SchemaChangeEventType.DROP;
}
throw new DebeziumException("Unknown table change event type " + type);
}
}

View File

@ -116,7 +116,7 @@ public TableChanges deserialize(Array array, boolean useCatalogBeforeSchema) {
tableChanges.create(change.getTable());
}
else if (change.getType() == TableChangeType.ALTER) {
tableChanges.alter(change.getTable(), change.getPreviousId());
tableChanges.alter(change);
}
else if (change.getType() == TableChangeType.DROP) {
tableChanges.drop(change.getTable());

View File

@ -34,10 +34,18 @@ public TableChanges create(Table table) {
}
public TableChanges alter(Table table) {
return alter(table, null);
changes.add(new TableChange(TableChangeType.ALTER, table));
return this;
}
public TableChanges alter(Table table, TableId previousId) {
public TableChanges alter(TableChange change) {
if (change.getPreviousId() == null) {
return alter(change.getTable());
}
return rename(change.getTable(), change.getPreviousId());
}
public TableChanges rename(Table table, TableId previousId) {
changes.add(new TableChange(TableChangeType.ALTER, table, previousId));
return this;
}
@ -167,6 +175,7 @@ else if (!table.equals(other.table)) {
public String toString() {
return "TableChange [type=" + type + ", id=" + id + ", previousId=" + previousId + ", table=" + table + "]";
}
}
/**

View File

@ -13,6 +13,9 @@
import org.apache.kafka.connect.data.Struct;
import io.debezium.DebeziumException;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
@ -37,14 +40,14 @@ public class SchemaChangeEvent {
private final Instant timestamp;
private TableChanges tableChanges = new TableChanges();
public SchemaChangeEvent(Map<String, ?> partition, Map<String, ?> offset, Struct source, String database, String schema, String ddl, Table table,
SchemaChangeEventType type, boolean isFromSnapshot, TableId previousTableId) {
private SchemaChangeEvent(Map<String, ?> partition, Map<String, ?> offset, Struct source, String database, String schema, String ddl, Table table,
SchemaChangeEventType type, boolean isFromSnapshot, TableId previousTableId) {
this(partition, offset, source, database, schema, ddl, table != null ? Collections.singleton(table) : Collections.emptySet(),
type, isFromSnapshot, Clock.SYSTEM.currentTimeAsInstant(), previousTableId);
}
public SchemaChangeEvent(Map<String, ?> partition, Map<String, ?> offset, Struct source, String database, String schema, String ddl, Set<Table> tables,
SchemaChangeEventType type, boolean isFromSnapshot, Instant timestamp, TableId previousTableId) {
private SchemaChangeEvent(Map<String, ?> partition, Map<String, ?> offset, Struct source, String database, String schema, String ddl, Set<Table> tables,
SchemaChangeEventType type, boolean isFromSnapshot, Instant timestamp, TableId previousTableId) {
this.partition = Objects.requireNonNull(partition, "partition must not be null");
this.offset = Objects.requireNonNull(offset, "offset must not be null");
this.source = Objects.requireNonNull(source, "source must not be null");
@ -62,8 +65,13 @@ public SchemaChangeEvent(Map<String, ?> partition, Map<String, ?> offset, Struct
tables.forEach(tableChanges::create);
break;
case ALTER:
// there is only ever 1 table within the set, so it's safe to apply the previousTableId like this
tables.forEach(t -> tableChanges.alter(t, previousTableId));
if (previousTableId == null) {
tables.forEach(tableChanges::alter);
}
else {
// there is only ever 1 table within the set, so it's safe to apply the previousTableId like this
tables.forEach(t -> tableChanges.rename(t, previousTableId));
}
break;
case DROP:
tables.forEach(tableChanges::drop);
@ -134,4 +142,207 @@ public static enum SchemaChangeEventType {
DROP,
DATABASE;
}
/**
* Create a schema change event for any event type that does not involve a table rename.
*
* @param type the schema change event type
* @param partition the partition
* @param offsetContext the offset context
* @param databaseName the database name
* @param schemaName the schema name
* @param ddl the schema change DDL statement
* @param table the affected relational table
* @param isFromSnapshot flag indicating whether the change is from snapshot
* @return the schema change event
*/
public static SchemaChangeEvent of(SchemaChangeEventType type, Partition partition, OffsetContext offsetContext,
String databaseName, String schemaName, String ddl, Table table, boolean isFromSnapshot) {
return new SchemaChangeEvent(
partition.getSourcePartition(),
offsetContext.getOffset(),
offsetContext.getSourceInfo(),
databaseName,
schemaName,
ddl,
table,
type,
isFromSnapshot,
null);
}
/**
* Create a schema change event for a {@link io.debezium.relational.history.TableChanges.TableChange}.
*
* @param change the table change event
* @param partition the partition
* @param offset the offsets
* @param source the source
* @param databaseName the database name
* @param schemaName the schema name
* @return the schema change event
*/
public static SchemaChangeEvent ofTableChange(TableChanges.TableChange change, Map<String, ?> partition, Map<String, ?> offset,
Struct source, String databaseName, String schemaName) {
return new SchemaChangeEvent(
partition,
offset,
source,
databaseName,
schemaName,
null,
change.getTable(),
toSchemaChangeEventType(change.getType()),
false,
change.getPreviousId());
}
/**
* Create a schema change event for a database-specific DDL operation.
*
* @param partition the partition
* @param offsetContext the offset context
* @param databaseName the database name
* @param ddl the schema change DDL statement
* @param isFromSnapshot flag indicating whether the change is from snapshot
* @return the schema change event
*/
public static SchemaChangeEvent ofDatabase(Partition partition, OffsetContext offsetContext, String databaseName,
String ddl, boolean isFromSnapshot) {
return of(
SchemaChangeEventType.DATABASE,
partition,
offsetContext,
databaseName,
null,
ddl,
(Table) null,
isFromSnapshot);
}
/**
* Create a schema change event for a {@code CREATE TABLE} statement without DDL from snapshot phase.
*
* @param partition the partition
* @param offsetContext the offset context
* @param databaseName the database name
* @param table the affected relational table
* @return the schema change event
*/
public static SchemaChangeEvent ofSnapshotCreate(Partition partition, OffsetContext offsetContext, String databaseName,
Table table) {
return ofCreate(partition, offsetContext, databaseName, table.id().schema(), null, table, true);
}
/**
* Create a schema change event for a {@code CREATE TABLE} statement with DDL.
*
* @param partition the partition
* @param offsetContext the offset context
* @param databaseName the database name
* @param schemaName the schema name
* @param ddl the schema change DDL statement
* @param table the affected relational table
* @param isFromSnapshot flag indicating whether the change is from snapshot
* @return the schema change event
*/
public static SchemaChangeEvent ofCreate(Partition partition, OffsetContext offsetContext, String databaseName,
String schemaName, String ddl, Table table, boolean isFromSnapshot) {
return of(
SchemaChangeEventType.CREATE,
partition,
offsetContext,
databaseName,
schemaName,
ddl,
table,
isFromSnapshot);
}
/**
* Create a schema change event for a {@code ALTER TABLE} event.
*
* @param partition the partition
* @param offsetContext the offset context
* @param databaseName the database name
* @param schemaName the schema name
* @param ddl the schema change DDL statement
* @param table the affected relational table
* @return the schema change event
*/
public static SchemaChangeEvent ofAlter(Partition partition, OffsetContext offsetContext, String databaseName,
String schemaName, String ddl, Table table) {
return of(
SchemaChangeEventType.ALTER,
partition,
offsetContext,
databaseName,
schemaName,
ddl,
table,
false);
}
/**
* Create a schema change event for a {@code ALTER TABLE RENAME} event.
*
* @param partition the partition
* @param offsetContext the offset context
* @param databaseName the database name
* @param schemaName the schema name
* @param ddl the schema change DDL statement
* @param table the affected relational table
* @param previousTableId the old, previous relational table identifier
* @return the schema change event
*/
public static SchemaChangeEvent ofRename(Partition partition, OffsetContext offsetContext, String databaseName,
String schemaName, String ddl, Table table, TableId previousTableId) {
return new SchemaChangeEvent(
partition.getSourcePartition(),
offsetContext.getOffset(),
offsetContext.getSourceInfo(),
databaseName,
schemaName,
ddl,
table,
SchemaChangeEventType.ALTER,
false,
previousTableId);
}
/**
* Create a schema change event for a {@code DROP TABLE} event.
*
* @param partition the partition
* @param offsetContext the offset context
* @param databaseName the database name
* @param schemaName the schema name
* @param ddl the schema change DDL statement
* @param table the affected relational table
* @return the schema change event
*/
public static SchemaChangeEvent ofDrop(Partition partition, OffsetContext offsetContext, String databaseName,
String schemaName, String ddl, Table table) {
return of(
SchemaChangeEventType.DROP,
partition,
offsetContext,
databaseName,
schemaName,
ddl,
table,
false);
}
private static SchemaChangeEvent.SchemaChangeEventType toSchemaChangeEventType(TableChanges.TableChangeType type) {
switch (type) {
case CREATE:
return SchemaChangeEventType.CREATE;
case ALTER:
return SchemaChangeEventType.ALTER;
case DROP:
return SchemaChangeEventType.DROP;
}
throw new DebeziumException("Unknown table change event type " + type);
}
}