DBZ-2975: Recover database schema from multiple partitions

This commit is contained in:
Mike Kamornikov 2021-10-13 17:07:21 +03:00 committed by Gunnar Morling
parent 62f00343b7
commit 16c07e915b
6 changed files with 67 additions and 12 deletions

View File

@@ -2579,8 +2579,8 @@ public void record(Map<String, ?> source, Map<String, ?> position, String databa
} }
@Override @Override
public void recover(Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser) { public void recover(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser) {
delegate.recover(source, position, schema, ddlParser); delegate.recover(offsets, schema, ddlParser);
} }
@Override @Override

View File

@@ -23,6 +23,10 @@ public void resetOffset(P partition) {
offsets.put(partition, null); offsets.put(partition, null);
} }
public Map<P, O> getOffsets() {
return offsets;
}
/** /**
* Returns the offset of the only partition that the task is configured to use. * Returns the offset of the only partition that the task is configured to use.
* *

View File

@@ -5,9 +5,12 @@
*/ */
package io.debezium.relational; package io.debezium.relational;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import io.debezium.DebeziumException; import io.debezium.DebeziumException;
import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Key.KeyMapper; import io.debezium.relational.Key.KeyMapper;
import io.debezium.relational.Tables.ColumnNameFilter; import io.debezium.relational.Tables.ColumnNameFilter;
import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.Tables.TableFilter;
@@ -42,12 +45,27 @@ protected HistorizedRelationalDatabaseSchema(HistorizedRelationalDatabaseConnect
} }
@Override @Override
public void recover(Partition partition, OffsetContext offset) { public void recover(Offsets<?, ?> offsets) {
final boolean hasNonNullOffsets = offsets.getOffsets()
.values()
.stream()
.anyMatch(Objects::nonNull);
if (!hasNonNullOffsets) {
// there is nothing to recover
return;
}
if (!databaseHistory.exists()) { if (!databaseHistory.exists()) {
String msg = "The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot."; String msg = "The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.";
throw new DebeziumException(msg); throw new DebeziumException(msg);
} }
databaseHistory.recover(partition.getSourcePartition(), offset.getOffset(), tables(), getDdlParser()); Map<Map<String, ?>, Map<String, ?>> sourceOffsets = new HashMap<>();
offsets.getOffsets().forEach((partition, offsetContext) -> {
Map<String, ?> offset = offsetContext != null ? offsetContext.getOffset() : null;
sourceOffsets.put(partition.getSourcePartition(), offset);
});
databaseHistory.recover(sourceOffsets, tables(), getDdlParser());
recoveredTables = !tableIds().isEmpty(); recoveredTables = !tableIds().isEmpty();
for (TableId tableId : tableIds()) { for (TableId tableId : tableIds()) {
buildAndRegisterSchema(tableFor(tableId)); buildAndRegisterSchema(tableFor(tableId));

View File

@@ -5,6 +5,7 @@
*/ */
package io.debezium.relational.history; package io.debezium.relational.history;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.function.Consumer; import java.util.function.Consumer;
@@ -20,6 +21,7 @@
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.Field; import io.debezium.config.Field;
import io.debezium.document.Array; import io.debezium.document.Array;
import io.debezium.document.Document;
import io.debezium.function.Predicates; import io.debezium.function.Predicates;
import io.debezium.relational.Tables; import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser; import io.debezium.relational.ddl.DdlParser;
@@ -96,13 +98,21 @@ public final void record(Map<String, ?> source, Map<String, ?> position, String
} }
@Override @Override
public final void recover(Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser) { public void recover(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser) {
logger.debug("Recovering DDL history for source partition {} and offset {}", source, position);
listener.recoveryStarted(); listener.recoveryStarted();
HistoryRecord stopPoint = new HistoryRecord(source, position, null, null, null, null); Map<Document, HistoryRecord> stopPoints = new HashMap<>();
offsets.forEach((Map<String, ?> source, Map<String, ?> position) -> {
Document srcDocument = Document.create();
if (source != null) {
source.forEach(srcDocument::set);
}
stopPoints.put(srcDocument, new HistoryRecord(source, position, null, null, null, null));
});
recoverRecords(recovered -> { recoverRecords(recovered -> {
listener.onChangeFromHistory(recovered); listener.onChangeFromHistory(recovered);
if (comparator.isAtOrBefore(recovered, stopPoint)) { Document srcDocument = recovered.document().getDocument(HistoryRecord.Fields.SOURCE);
if (stopPoints.containsKey(srcDocument) && comparator.isAtOrBefore(recovered, stopPoints.get(srcDocument))) {
Array tableChanges = recovered.tableChanges(); Array tableChanges = recovered.tableChanges();
String ddl = recovered.ddl(); String ddl = recovered.ddl();

View File

@@ -5,6 +5,7 @@
*/ */
package io.debezium.relational.history; package io.debezium.relational.history;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -135,7 +136,23 @@ public interface DatabaseHistory {
* may not be null * may not be null
* @param ddlParser the DDL parser that can be used to apply DDL statements to the given {@code schema}; may not be null * @param ddlParser the DDL parser that can be used to apply DDL statements to the given {@code schema}; may not be null
*/ */
void recover(Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser); default void recover(Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser) {
recover(Collections.singletonMap(source, position), schema, ddlParser);
}
/**
* Recover the {@link Tables database schema} to a known point in its history. Note that it is possible to recover the
* database schema to a point in history that is earlier than what has been {@link #record(Map, Map, String, Tables, String)
* recorded}. Likewise, when recovering to a point in history <em>later</em> than what was recorded, the database schema will
* reflect the latest state known to the history.
*
* @param offsets the map of information about the source database to corresponding point in history at which database
* schema should be recovered
* @param schema the table definitions that should be changed to reflect the database schema at the desired point in history;
* may not be null
* @param ddlParser the DDL parser that can be used to apply DDL statements to the given {@code schema}; may not be null
*/
void recover(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser);
/** /**
* Stop recording history and release any resources acquired since {@link #configure(Configuration, HistoryRecordComparator, DatabaseHistoryListener)}. * Stop recording history and release any resources acquired since {@link #configure(Configuration, HistoryRecordComparator, DatabaseHistoryListener)}.

View File

@@ -6,8 +6,10 @@
package io.debezium.schema; package io.debezium.schema;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition; import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
@@ -33,7 +35,11 @@ public static interface SchemaChangeEventConsumer {
void applySchemaChange(SchemaChangeEvent schemaChange); void applySchemaChange(SchemaChangeEvent schemaChange);
void recover(Partition partition, OffsetContext offset); default void recover(Partition partition, OffsetContext offset) {
recover(new Offsets<>(Collections.singletonMap(partition, offset)));
}
void recover(Offsets<?, ?> offsets);
void initializeStorage(); void initializeStorage();