DBZ-3093 Dispatch schema change events always

Jiri Pechanec 2021-02-11 19:01:20 +01:00 committed by Gunnar Morling
parent 6607310f13
commit 7bfbd5c7c5
3 changed files with 37 additions and 30 deletions
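Streamed DDL is now always parsed into schema change events: the recordSchemaChangesInSourceRecords guard no longer short-circuits the parse-and-dispatch path, and parseStreamingDdl() additionally receives the source timestamp of the binlog event, so the offset context records the affected database, tables, and time for every event it emits.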


@@ -5,6 +5,7 @@
  */
 package io.debezium.connector.mysql;
 
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -218,7 +219,7 @@ public Optional<Table> parseSnapshotDdl(String ddlStatements, String databaseNam
         return (table != null) ? Optional.of(table) : Optional.of(Table.editor().tableId(tableId).create());
     }
 
-    public List<SchemaChangeEvent> parseStreamingDdl(String ddlStatements, String databaseName, MySqlOffsetContext offset) {
+    public List<SchemaChangeEvent> parseStreamingDdl(String ddlStatements, String databaseName, MySqlOffsetContext offset, Instant sourceTime) {
         final List<SchemaChangeEvent> schemaChangeEvents = new ArrayList<>();
         LOGGER.debug("Processing streaming DDL '{}' for database '{}'", ddlStatements, databaseName);
@@ -255,14 +256,18 @@ public List<SchemaChangeEvent> parseStreamingDdl(String ddlStatements, String da
             // the same order they were read for each _affected_ database, grouped together if multiple apply
             // to the same _affected_ database...
             ddlChanges.getEventsByDatabase((String dbName, List<Event> events) -> {
-                if (acceptableDatabase(dbName)) {
                 final String sanitizedDbName = (dbName == null) ? "" : dbName;
+                if (acceptableDatabase(dbName)) {
+                    final Set<TableId> tableIds = new HashSet<>();
+                    events.forEach(event -> {
+                        final TableId tableId = getTableId(event);
+                        if (tableId != null) {
+                            tableIds.add(tableId);
+                        }
+                    });
                     events.forEach(event -> {
                         final TableId tableId = getTableId(event);
+                        offset.tableEvent(dbName, tableIds, sourceTime);
                         if (event instanceof TableCreatedEvent) {
                             schemaChangeEvents
                                     .add(new SchemaChangeEvent(offset.getPartition(), offset.getOffset(), offset.getSourceInfo(),
@@ -284,21 +289,11 @@ else if (event instanceof TableDroppedEvent) {
                                             sanitizedDbName, null, event.statement(), (Table) null, SchemaChangeEventType.DATABASE, false));
                         }
                     });
-                    // final Struct source = schemaChange.getSource();
-                    // source.put(AbstractSourceInfo.DATABASE_NAME_KEY, sanitizedDbName);
-                    // final String tableNamesStr = tableIds.stream().map(TableId::table).collect(Collectors.joining(","));
-                    // if (!tableNamesStr.isEmpty()) {
-                    //     source.put(AbstractSourceInfo.TABLE_NAME_KEY, tableNamesStr);
-                    // }
-                    // schemaEventConsumer.consume(new SchemaChangeEvent(schemaChange.getPartition(),
-                    //         schemaChange.getOffset(), schemaChange.getSource(), sanitizedDbName,
-                    //         schemaChange.getSchema(), schemaChange.getDdl(), Collections.emptySet(), SchemaChangeEventType.DATABASE,
-                    //         schemaChange.isFromSnapshot()),
-                    //         tableIds);
                 }
             });
         }
         else if (acceptableDatabase(databaseName)) {
+            offset.databaseEvent(databaseName, sourceTime);
             schemaChangeEvents
                     .add(new SchemaChangeEvent(offset.getPartition(), offset.getOffset(), offset.getSourceInfo(),
                             databaseName, null, ddlStatements, (Table) null, SchemaChangeEventType.DATABASE, false));
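The hunks above introduce a two-pass walk over each database's DDL batch: the first forEach collects every affected TableId, and the second stamps the offset with that complete set (offset.tableEvent(dbName, tableIds, sourceTime)) before each SchemaChangeEvent is built. A minimal, self-contained sketch of the same pattern, using made-up stand-ins rather than the connector's types:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Not Debezium code: a stand-alone illustration of the two-pass pattern above.
public class TwoPassDdlSketch {
    public static void main(String[] args) {
        List<String> ddlBatch = Arrays.asList(
                "ALTER TABLE customers ADD COLUMN email VARCHAR(255)",
                "DROP TABLE orders");

        // Pass 1: collect every table the batch touches
        // (the real code derives this with getTableId(event)).
        Set<String> tableIds = new HashSet<>();
        for (String ddl : ddlBatch) {
            tableIds.add(ddl.replaceAll("^\\w+ TABLE (\\w+).*$", "$1"));
        }

        // Pass 2: emit one event per statement; each one can now carry the
        // complete table set, mirroring what offset.tableEvent(...) records
        // before each SchemaChangeEvent is created.
        for (String ddl : ddlBatch) {
            System.out.printf("tables=%s ddl=%s%n", tableIds, ddl);
        }
    }
}
```

The next two hunks are from the offset context class that gains the databaseEvent() and tableEvent() mutators used above.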


@@ -9,6 +9,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
@@ -221,6 +222,18 @@ public void event(DataCollectionId tableId, Instant timestamp) {
         sourceInfo.tableEvent((TableId) tableId);
     }
 
+    public void databaseEvent(String database, Instant timestamp) {
+        sourceInfo.setSourceTime(timestamp);
+        sourceInfo.databaseEvent(database);
+        sourceInfo.tableEvent((TableId) null);
+    }
+
+    public void tableEvent(String database, Set<TableId> tableIds, Instant timestamp) {
+        sourceInfo.setSourceTime(timestamp);
+        sourceInfo.databaseEvent(database);
+        sourceInfo.tableEvent(tableIds);
+    }
+
     @Override
     public TransactionContext getTransactionContext() {
         return transactionContext;
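The two new mutators differ only in how they position the table part of the offset: table-level DDL records the full set of affected tables, while database-level DDL deliberately clears it via tableEvent((TableId) null). A usage sketch; the helper class and the inventory/customers names are invented for illustration:

```java
import java.time.Instant;
import java.util.Collections;
import java.util.Set;

import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.relational.TableId;

// Illustration only: how a caller might drive the two new mutators.
public class OffsetEventSketch {

    // Table-level DDL (e.g. ALTER TABLE): record the database plus every affected table.
    static void recordTableDdl(MySqlOffsetContext offset, Instant sourceTime) {
        Set<TableId> affected = Collections.singleton(new TableId("inventory", null, "customers"));
        offset.tableEvent("inventory", affected, sourceTime);
    }

    // Database-level DDL (e.g. CREATE DATABASE): record the database with no table.
    static void recordDatabaseDdl(MySqlOffsetContext offset, Instant sourceTime) {
        offset.databaseEvent("inventory", sourceTime);
    }
}
```

The final two hunks are from the binlog streaming handler that feeds these calls.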


@@ -570,8 +570,8 @@ else if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandli
                             MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER.name());
         }
 
-        final List<SchemaChangeEvent> schemaChangeEvents = taskContext.getSchema().parseStreamingDdl(sql, command.getDatabase(), offsetContext);
-        if (recordSchemaChangesInSourceRecords) {
+        final List<SchemaChangeEvent> schemaChangeEvents = taskContext.getSchema().parseStreamingDdl(sql, command.getDatabase(), offsetContext,
+                clock.currentTimeAsInstant());
         try {
             for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
                 final TableId tableId = schemaChangeEvent.getTables().isEmpty() ? null : schemaChangeEvent.getTables().iterator().next().id();
@@ -589,7 +589,6 @@ else if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandli
                 LOGGER.info("Processing interrupted");
             }
-            }
         }
 
     private void handleTransactionCompletion(Event event) {
         // We are completing the transaction ...
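Before this commit, the parse-and-dispatch block above was wrapped in if (recordSchemaChangesInSourceRecords) { ... } (the lone closing brace removed in the second hunk), so with the flag disabled the DDL was never turned into SchemaChangeEvents and the offset never recorded it. Now the loop always runs; a minimal stand-alone sketch of that control-flow change, with made-up types:

```java
import java.util.Arrays;
import java.util.List;

// Not Debezium code: just the shape of the change in this file.
public class AlwaysDispatchSketch {

    static void process(List<String> schemaChangeEvents) {
        // Before: this loop was guarded by if (recordSchemaChangesInSourceRecords),
        // so a disabled flag skipped the events (and their offset updates) entirely.
        // After: every parsed event is handled, keeping offsets and the in-memory
        // schema current regardless of how the connector is configured to record
        // schema changes.
        for (String event : schemaChangeEvents) {
            System.out.println("dispatch " + event);
        }
    }

    public static void main(String[] args) {
        process(Arrays.asList("CREATE TABLE a (id INT)", "ALTER TABLE a ADD COLUMN b INT"));
    }
}
```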