DBZ-20 Recording schema history

This records the DDL statements of DDL events captured during streaming. For the
initial schema snapshot, a JSON-style representation of the captured
Table objects is stored in a new field of HistoryRecord, since the DDL
returned by dbms_metadata.get_ddl() isn't fully parseable by our
grammar.
Gunnar Morling 2018-03-02 22:08:26 +01:00 committed by Jiri Pechanec
parent b1d91f6f87
commit 0733110696
20 changed files with 367 additions and 69 deletions
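
To make the new record shape concrete, here is a minimal sketch of building a history record carrying the new tableChanges payload defined further below. This is assumed usage, not part of this commit; the table definition and the partition/offset keys ("server", "scn") are hypothetical.

import java.sql.Types;
import java.util.Collections;

import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;

public class HistoryRecordSketch {

    public static void main(String[] args) {
        // A captured table, as the initial schema snapshot would produce it
        Table table = Table.editor()
                .tableId(new TableId("ORCLPDB1", "DEBEZIUM", "CUSTOMER")) // hypothetical table
                .addColumn(Column.editor()
                        .name("ID")
                        .type("NUMBER")
                        .jdbcType(Types.NUMERIC)
                        .optional(false)
                        .create())
                .setPrimaryKeyNames("ID")
                .create();

        // JSON-style table representation, used instead of the unparseable get_ddl() output
        TableChanges changes = new TableChanges().create(table);

        HistoryRecord record = new HistoryRecord(
                Collections.singletonMap("server", "server1"), // source partition (hypothetical key)
                Collections.singletonMap("scn", 12345L),       // offset position (hypothetical key)
                "ORCLPDB1",
                "CREATE TABLE DEBEZIUM.CUSTOMER (ID NUMBER NOT NULL)",
                changes);

        // Serializes to a JSON document that includes a "tableChanges" array
        System.out.println(record);
    }
}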


@@ -14,10 +14,7 @@
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.util.Clock;
import oracle.streams.ChunkColumnValue;
import oracle.streams.DDLLCR;
@@ -90,9 +87,8 @@ private void dispatchDataChangeEvent(RowLCR lcr) throws InterruptedException {
TableId tableId = getTableId(lcr);
dispatcher.dispatchDataChangeEvent(
offsetContext,
tableId,
() -> new OracleChangeRecordEmitter(lcr, schema.getTable(tableId), clock),
() -> new OracleChangeRecordEmitter(offsetContext, lcr, schema.getTable(tableId), clock),
DataChangeEvent::new
);
}
@@ -106,25 +102,10 @@ private void dispatchSchemaChangeEvent(DDLLCR ddlLcr) throws InterruptedException {
dispatcher.dispatchSchemaChangeEvent(
tableId,
(tid, r) -> {
SchemaChangeEventType eventType = getSchemaChangeEventType(ddlLcr);
if (eventType != null) {
Table table = new OracleDdlParser().parseCreateTable(tid, ddlLcr.getDDLText());
r.schemaChangeEvent(new SchemaChangeEvent(ddlLcr.getDDLText(), table, eventType));
}
}
() -> new OracleSchemaChangeEventEmitter(offsetContext, tableId, ddlLcr)
);
}
private SchemaChangeEventType getSchemaChangeEventType(DDLLCR ddlLcr) {
switch(ddlLcr.getCommandType()) {
case "CREATE TABLE": return SchemaChangeEventType.CREATE;
case "ALTER TABLE": throw new UnsupportedOperationException("ALTER TABLE not yet implemented");
case "DROP TABLE": throw new UnsupportedOperationException("DROP TABLE not yet implemented");
default: return null;
}
}
private TableId getTableId(LCR lcr) {
return new TableId(lcr.getSourceDatabaseName(), lcr.getObjectOwner(), lcr.getObjectName());
}


@@ -6,6 +6,7 @@
package io.debezium.connector.oracle;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.util.Clock;
@@ -22,8 +23,8 @@ public class OracleChangeRecordEmitter extends RelationalChangeRecordEmitter {
private final RowLCR lcr;
private final Table table;
public OracleChangeRecordEmitter(RowLCR lcr, Table table, Clock clock) {
super(clock);
public OracleChangeRecordEmitter(OffsetContext offset, RowLCR lcr, Table table, Clock clock) {
super(offset, clock);
this.lcr = lcr;
this.table = table;


@@ -9,11 +9,16 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.errors.ConnectException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.Document;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.KafkaDatabaseHistory;
public class OracleConnectorConfig extends CommonConnectorConfig {
@@ -42,6 +47,21 @@ public class OracleConnectorConfig extends CommonConnectorConfig {
.withDescription("The name of the database the connector should be monitoring. When working with a "
+ "multi-tenant set-up, must be set to the CDB name.");
/**
* The database history class is hidden in the {@link #configDef()} since that is designed to work with a user interface,
* and in these situations using Kafka is the only way to go.
*/
public static final Field DATABASE_HISTORY = Field.create("database.history")
.withDisplayName("Database history class")
.withType(Type.CLASS)
.withWidth(Width.LONG)
.withImportance(Importance.LOW)
.withInvisibleRecommender()
.withDescription("The name of the DatabaseHistory class that should be used to store and recover database schema changes. "
+ "The configuration properties for the history are prefixed with the '"
+ DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "' string.")
.withDefault(KafkaDatabaseHistory.class.getName());
public static final Field PDB_NAME = Field.create(DATABASE_CONFIG_PREFIX + "pdb.name")
.withDisplayName("PDB name")
.withType(Type.STRING)
@@ -103,4 +123,35 @@ public String getPdbName() {
public String getXoutServerName() {
return xoutServerName;
}
/**
* Returns a configured (but not yet started) instance of the database history.
*/
public DatabaseHistory getDatabaseHistory() {
Configuration config = getConfig();
DatabaseHistory databaseHistory = config.getInstance(OracleConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
if (databaseHistory == null) {
throw new ConnectException("Unable to instantiate the database history class " +
config.getString(OracleConnectorConfig.DATABASE_HISTORY));
}
// Do not remove the prefix from the subset of config properties ...
Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false)
.edit()
.withDefault(DatabaseHistory.NAME, getLogicalName() + "-dbhistory")
.build();
// Set up a history record comparator that uses the GTID filter ...
HistoryRecordComparator historyComparator = new HistoryRecordComparator() {
@Override
protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
throw new UnsupportedOperationException();
// return SourceInfo.isPositionAtOrBefore(recorded, desired, gtidFilter);
}
};
databaseHistory.configure(dbHistoryConfig, historyComparator); // validates
return databaseHistory;
}
}
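
For illustration, a minimal sketch of wiring a concrete history implementation through the new option, mirroring what the test helper below does. This is assumed usage; the file path is hypothetical.

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.FileDatabaseHistory;

public class HistoryConfigSketch {

    public static void main(String[] args) {
        Configuration config = Configuration.create()
                .with(OracleConnectorConfig.LOGICAL_NAME, "server1")
                .with(OracleConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class.getName())
                .with(FileDatabaseHistory.FILE_PATH, "/tmp/dbhistory.dat") // hypothetical path
                .build();

        // Instantiates and configures (but does not start) the history implementation
        DatabaseHistory history = new OracleConnectorConfig(config).getDatabaseHistory();
        history.start();
    }
}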


@@ -74,9 +74,9 @@ public void start(Configuration config) {
jdbcConnection = new OracleConnection(jdbcConfig, new OracleConnectionFactory());
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
OracleDatabaseSchema schema = new OracleDatabaseSchema(schemaNameAdjuster, topicSelector);
OracleDatabaseSchema schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector);
EventDispatcher dispatcher = new EventDispatcher(errorHandler, topicSelector, schema, queue);
EventDispatcher dispatcher = new EventDispatcher(topicSelector, schema, queue);
coordinator = new ChangeEventSourceCoordinator(
errorHandler,


@@ -17,9 +17,12 @@
import io.debezium.relational.TableSchema;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
@@ -34,23 +37,36 @@ public class OracleDatabaseSchema implements RelationalDatabaseSchema {
private final Tables tables;
private final Map<TableId, TableSchema> schemas;
private final TableSchemaBuilder tableSchemaBuilder;
private final DatabaseHistory databaseHistory;
public OracleDatabaseSchema(SchemaNameAdjuster schemaNameAdjuster, TopicSelector topicSelector) {
public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector topicSelector) {
this.topicSelector = topicSelector;
this.tables = new Tables();
this.schemas = new HashMap<>();
this.tableSchemaBuilder = new TableSchemaBuilder(new OracleValueConverters(), schemaNameAdjuster, SourceInfo.SCHEMA);
this.databaseHistory = connectorConfig.getDatabaseHistory();
this.databaseHistory.start();
}
@Override
public void applySchemaChange(SchemaChangeEvent schemaChange) {
LOGGER.debug("Applying schema change event {}", schemaChange);
Table table = schemaChange.getTable();
// just a single table per DDL event for Oracle
Table table = schemaChange.getTables().iterator().next();
tables.overwriteTable(table);
schemas.put(table.id(), tableSchemaBuilder.create(null, getEnvelopeSchemaName(table), table, null, null));
TableChanges tableChanges = null;
if (schemaChange.getType() == SchemaChangeEventType.CREATE && !schemaChange.isFromSnapshot()) {
tableChanges = new TableChanges();
tableChanges.create(table);
}
databaseHistory.record(schemaChange.getPartition(), schemaChange.getOffset(), schemaChange.getDatabase(),
schemaChange.getDdl(), tableChanges);
}
private String getEnvelopeSchemaName(Table table) {


@@ -0,0 +1,46 @@
package io.debezium.connector.oracle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import oracle.streams.DDLLCR;
public class OracleSchemaChangeEventEmitter implements SchemaChangeEventEmitter {
private static final Logger LOGGER = LoggerFactory.getLogger(OracleSchemaChangeEventEmitter.class);
private final OracleOffsetContext offsetContext;
private final TableId tableId;
private final DDLLCR ddlLcr;
public OracleSchemaChangeEventEmitter(OracleOffsetContext offsetContext, TableId tableId, DDLLCR ddlLcr) {
this.offsetContext = offsetContext;
this.tableId = tableId;
this.ddlLcr = ddlLcr;
}
@Override
public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException {
SchemaChangeEventType eventType = getSchemaChangeEventType();
if (eventType != null) {
Table table = new OracleDdlParser().parseCreateTable(tableId, ddlLcr.getDDLText());
receiver.schemaChangeEvent(new SchemaChangeEvent(offsetContext.getPartition(), offsetContext.getOffset(), ddlLcr.getSourceDatabaseName(), ddlLcr.getDDLText(), table, eventType, false));
}
}
private SchemaChangeEventType getSchemaChangeEventType() {
switch(ddlLcr.getCommandType()) {
case "CREATE TABLE": return SchemaChangeEventType.CREATE;
case "ALTER TABLE": throw new UnsupportedOperationException("ALTER TABLE not yet implemented");
case "DROP TABLE": throw new UnsupportedOperationException("DROP TABLE not yet implemented");
default:
LOGGER.debug("Ignoring DDL event of type {}", ddlLcr.getCommandType());
return null;
}
}
}
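
As a usage sketch (a hypothetical helper, not part of this commit; offsetContext, tableId and ddlLcr come from the XStream callback), the single-method Receiver can be satisfied with a method reference that hands the parsed event straight to the schema:

import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OracleSchemaChangeEventEmitter;
import io.debezium.relational.TableId;

import oracle.streams.DDLLCR;

public class DdlDispatchSketch {

    // Parses the DDL into a SchemaChangeEvent and lets the schema apply and record it
    static void handleDdl(OracleOffsetContext offsetContext, TableId tableId, DDLLCR ddlLcr,
            OracleDatabaseSchema schema) throws InterruptedException {
        new OracleSchemaChangeEventEmitter(offsetContext, tableId, ddlLcr)
                .emitSchemaChangeEvent(schema::applySchemaChange);
    }
}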


@@ -73,6 +73,9 @@ public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {
Long scn = rs.getLong(1);
rs.close();
OracleOffsetContext offset = new OracleOffsetContext(connectorConfig.getLogicalName());
offset.setScn(scn);
Tables tables = new Tables();
jdbcConnection.readSchema(tables, catalogName, "%DEBEZIUM%", null, null, false);
@@ -93,12 +96,9 @@ public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {
String ddl = ((Clob)res).getSubString(1, (int) ((Clob)res).length());
rs.close();
schema.applySchemaChange(new SchemaChangeEvent(ddl, table, SchemaChangeEventType.CREATE));
schema.applySchemaChange(new SchemaChangeEvent(offset.getPartition(), offset.getOffset(), catalogName, ddl, table, SchemaChangeEventType.CREATE, true));
}
OracleOffsetContext offset = new OracleOffsetContext(connectorConfig.getLogicalName());
offset.setScn(scn);
return SnapshotResult.completed(offset);
}
catch(SQLException e) {


@@ -25,6 +25,7 @@
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
/**
* Integration test for the Debezium Oracle connector.
@@ -65,6 +66,7 @@ public static void closeConnection() throws SQLException {
@Before
public void before() {
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
}
@Test
@@ -79,6 +81,8 @@ public void shouldReadChangeStreamForExistingTable() throws Exception {
int expectedRecordCount = 0;
connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018/02/22', 'yyyy-mm-dd'))");
connection.execute("COMMIT");
System.out.println("Inserted");
expectedRecordCount += 1;
connection.execute("UPDATE debezium.customer SET name = 'Bruce', score = 2345.67, registered = TO_DATE('2018/03/23', 'yyyy-mm-dd') WHERE id = 1");
@@ -157,7 +161,7 @@ public void shouldReadChangeStreamForTableCreatedWhileStreaming() throws Exception {
start(OracleConnector.class, config);
assertConnectorIsRunning();
Thread.sleep(4000);
Thread.sleep(1000);
String ddl = "create table debezium.customer2 (" +
" id int not null, " +


@@ -5,6 +5,7 @@
*/
package io.debezium.connector.oracle.util;
import java.nio.file.Path;
import java.sql.SQLException;
import org.slf4j.Logger;
@@ -15,9 +16,13 @@
import io.debezium.connector.oracle.OracleConnectionFactory;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.util.Testing;
public class TestHelper {
public static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath();
private static final Logger LOGGER = LoggerFactory.getLogger(TestHelper.class);
private static JdbcConfiguration defaultJdbcConfig() {
@@ -30,6 +35,10 @@ private static JdbcConfiguration defaultJdbcConfig() {
.build();
}
/**
* Returns a default configuration suitable for most test cases. Can be amended/overridden in individual tests as
* needed.
*/
public static Configuration.Builder defaultConfig() {
JdbcConfiguration jdbcConfiguration = defaultJdbcConfig();
Configuration.Builder builder = Configuration.create();
@@ -38,11 +47,11 @@ public static Configuration.Builder defaultConfig() {
(field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value)
);
builder.with(OracleConnectorConfig.LOGICAL_NAME, "server1")
return builder.with(OracleConnectorConfig.LOGICAL_NAME, "server1")
.with(OracleConnectorConfig.PDB_NAME, "ORCLPDB1")
.with(OracleConnectorConfig.XSTREAM_SERVER_NAME, "dbzxout");
return builder;
.with(OracleConnectorConfig.XSTREAM_SERVER_NAME, "dbzxout")
.with(OracleConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH);
}
public static OracleConnection defaultConnection() {
@@ -89,6 +98,12 @@ public static OracleConnection testConnection() {
Configuration jdbcConfig = config.subset("database.", true);
OracleConnection jdbcConnection = new OracleConnection(jdbcConfig, new OracleConnectionFactory());
try {
jdbcConnection.setAutoCommit(false);
}
catch (SQLException e) {
throw new RuntimeException(e);
}
String pdbName = new OracleConnectorConfig(config).getPdbName();


@@ -62,6 +62,7 @@ public class CommonConnectorConfig {
.withDefault(DEFAULT_POLL_INTERVAL_MILLIS)
.withValidation(Field::isPositiveInteger);
private final Configuration config;
private final boolean emitTombstoneOnDelete;
private final int maxQueueSize;
private final int maxBatchSize;
@@ -69,6 +70,7 @@
private final String logicalName;
protected CommonConnectorConfig(Configuration config, Field logicalNameField) {
this.config = config;
this.emitTombstoneOnDelete = config.getBoolean(CommonConnectorConfig.TOMBSTONES_ON_DELETE);
this.maxQueueSize = config.getInteger(MAX_QUEUE_SIZE);
this.maxBatchSize = config.getInteger(MAX_BATCH_SIZE);
@@ -76,6 +78,14 @@ protected CommonConnectorConfig(Configuration config, Field logicalNameField) {
this.logicalName = config.getString(logicalNameField);
}
/**
* Provides access to the "raw" config instance. In most cases, access via typed getters for individual properties
* on the connector config class should be preferred.
*/
public Configuration getConfig() {
return config;
}
public boolean isEmitTombstoneOnDelete() {
return emitTombstoneOnDelete;
}


@@ -40,14 +40,12 @@ public class EventDispatcher {
private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);
private final ErrorHandler errorHandler;
private final TopicSelector topicSelector;
private final DatabaseSchema schema;
private final ChangeEventQueue<Object> queue;
public EventDispatcher(ErrorHandler errorHandler, TopicSelector topicSelector, DatabaseSchema schema,
public EventDispatcher(TopicSelector topicSelector, DatabaseSchema schema,
ChangeEventQueue<Object> queue) {
this.errorHandler = errorHandler;
this.topicSelector = topicSelector;
this.schema = schema;
this.queue = queue;
@@ -60,7 +58,7 @@ public EventDispatcher(ErrorHandler errorHandler, TopicSelector topicSelector, DatabaseSchema schema,
* receiving coordinator creates {@link SourceRecord}s for all emitted events and passes them to the given
* {@link ChangeEventCreator} for converting them into data change events.
*/
public void dispatchDataChangeEvent(OffsetContext offsetContext, DataCollectionId dataCollectionId, Supplier<ChangeRecordEmitter> changeRecordEmitter, ChangeEventCreator changeEventCreator) throws InterruptedException {
public void dispatchDataChangeEvent(DataCollectionId dataCollectionId, Supplier<ChangeRecordEmitter> changeRecordEmitter, ChangeEventCreator changeEventCreator) throws InterruptedException {
// TODO Handle Heartbeat
// TODO Handle JMX
@@ -76,23 +74,21 @@ public void dispatchDataChangeEvent(OffsetContext offsetContext, DataCollectionId dataCollectionId, Supplier<ChangeRecordEmitter> changeRecordEmitter, ChangeEventCreator changeEventCreator) throws InterruptedException {
// TODO handle as per inconsistent schema info option
if(dataCollectionSchema == null) {
errorHandler.setProducerThrowable(new IllegalArgumentException("No metadata registered for captured table " + dataCollectionId));
return;
throw new IllegalArgumentException("No metadata registered for captured table " + dataCollectionId);
}
changeRecordEmitter.get().emitChangeRecords(
offsetContext,
dataCollectionSchema,
new ChangeRecordReceiver(dataCollectionId, changeEventCreator, dataCollectionSchema)
);
}
public void dispatchSchemaChangeEvent(TableId tableId, SchemaChangeEventEmitter schemaChangeEventEmitter) throws InterruptedException {
public void dispatchSchemaChangeEvent(TableId tableId, Supplier<SchemaChangeEventEmitter> schemaChangeEventEmitter) throws InterruptedException {
// TODO
boolean tableIncluded = true;
if(tableIncluded) {
schemaChangeEventEmitter.emitSchemaChangeEvent(tableId, new SchemaChangeEventReceiver());
schemaChangeEventEmitter.get().emitSchemaChangeEvent(new SchemaChangeEventReceiver());
}
}


@@ -17,7 +17,7 @@
*/
public interface ChangeRecordEmitter {
void emitChangeRecords(OffsetContext offsetContext, DataCollectionSchema schema, Receiver receiver) throws InterruptedException;
void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException;
public interface Receiver {
void changeRecord(Operation operation, Object key, Struct value, OffsetContext offsetManager) throws InterruptedException;


@@ -5,7 +5,6 @@
*/
package io.debezium.pipeline.spi;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.schema.SchemaChangeEvent;
@@ -16,7 +15,7 @@
*/
public interface SchemaChangeEventEmitter {
void emitSchemaChangeEvent(TableId tableId, Receiver receiver) throws InterruptedException;
void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException;
public interface Receiver {
void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedException;


@@ -22,33 +22,35 @@
*/
public abstract class RelationalChangeRecordEmitter implements ChangeRecordEmitter {
private final OffsetContext offsetContext;
private final Clock clock;
public RelationalChangeRecordEmitter(Clock clock) {
public RelationalChangeRecordEmitter(OffsetContext offsetContext, Clock clock) {
this.offsetContext = offsetContext;
this.clock = clock;
}
@Override
public void emitChangeRecords(OffsetContext offsetContext, DataCollectionSchema schema, Receiver receiver) throws InterruptedException {
public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException {
TableSchema tableSchema = (TableSchema) schema;
Operation operation = getOperation();
switch(operation) {
case CREATE:
emitCreateRecord(offsetContext, receiver, tableSchema, operation);
emitCreateRecord(receiver, tableSchema, operation);
break;
case UPDATE:
emitUpdateRecord(offsetContext, receiver, tableSchema, operation);
emitUpdateRecord(receiver, tableSchema, operation);
break;
case DELETE:
emitDeleteRecord(offsetContext, receiver, tableSchema, operation);
emitDeleteRecord(receiver, tableSchema, operation);
break;
default:
throw new IllegalArgumentException("Unsupported operation: " + operation);
}
}
private void emitCreateRecord(OffsetContext offsetContext, Receiver receiver, TableSchema tableSchema, Operation operation)
private void emitCreateRecord(Receiver receiver, TableSchema tableSchema, Operation operation)
throws InterruptedException {
Object[] newColumnValues = getNewColumnValues();
Object newKey = tableSchema.keyFromColumnData(newColumnValues);
@@ -58,7 +60,7 @@ private void emitCreateRecord(OffsetContext offsetContext, Receiver receiver, TableSchema tableSchema, Operation operation)
receiver.changeRecord(operation, newKey, envelope, offsetContext);
}
private void emitUpdateRecord(OffsetContext offsetContext, Receiver receiver, TableSchema tableSchema, Operation operation)
private void emitUpdateRecord(Receiver receiver, TableSchema tableSchema, Operation operation)
throws InterruptedException {
Object[] oldColumnValues = getOldColumnValues();
Object[] newColumnValues = getNewColumnValues();
@@ -84,7 +86,7 @@ private void emitUpdateRecord(OffsetContext offsetContext, Receiver receiver, TableSchema tableSchema, Operation operation)
}
}
private void emitDeleteRecord(OffsetContext offsetContext, Receiver receiver, TableSchema tableSchema, Operation operation)
private void emitDeleteRecord(Receiver receiver, TableSchema tableSchema, Operation operation)
throws InterruptedException {
Object[] oldColumnValues = getOldColumnValues();
Object oldKey = tableSchema.keyFromColumnData(oldColumnValues);


@@ -53,13 +53,20 @@ public void start() {
@Override
public final void record(Map<String, ?> source, Map<String, ?> position, String databaseName, String ddl)
throws DatabaseHistoryException {
storeRecord(new HistoryRecord(source, position, databaseName, ddl));
record(source, position, databaseName, ddl, null);
}
@Override
public final void record(Map<String, ?> source, Map<String, ?> position, String databaseName, String ddl, TableChanges changes)
throws DatabaseHistoryException {
storeRecord(new HistoryRecord(source, position, databaseName, ddl, changes));
}
@Override
public final void recover(Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser) {
logger.debug("Recovering DDL history for source partition {} and offset {}", source, position);
HistoryRecord stopPoint = new HistoryRecord(source, position, null, null);
HistoryRecord stopPoint = new HistoryRecord(source, position, null, null, null);
recoverRecords(recovered -> {
if (comparator.isAtOrBefore(recovered, stopPoint)) {
String ddl = recovered.ddl();


@@ -99,6 +99,8 @@ public interface DatabaseHistory {
*/
void record(Map<String, ?> source, Map<String, ?> position, String databaseName, String ddl) throws DatabaseHistoryException;
void record(Map<String, ?> source, Map<String, ?> position, String databaseName, String ddl, TableChanges changes) throws DatabaseHistoryException;
/**
* Recover the {@link Tables database schema} to a known point in its history. Note that it is possible to recover the
* database schema to a point in history that is earlier than what has been {@link #record(Map, Map, String, Tables, String)


@@ -6,6 +6,7 @@
package io.debezium.relational.history;
import java.util.Map;
import java.util.Map.Entry;
import io.debezium.document.Document;
@@ -16,6 +17,7 @@ public static final class Fields {
public static final String POSITION = "position";
public static final String DATABASE_NAME = "databaseName";
public static final String DDL_STATEMENTS = "ddl";
public static final String TABLE_CHANGES = "tableChanges";
}
private final Document doc;
@@ -24,14 +26,36 @@ public HistoryRecord(Document document) {
this.doc = document;
}
public HistoryRecord(Map<String, ?> source, Map<String, ?> position, String databaseName, String ddl) {
public HistoryRecord(Map<String, ?> source, Map<String, ?> position, String databaseName, String ddl, TableChanges changes) {
this.doc = Document.create();
Document src = doc.setDocument(Fields.SOURCE);
if (source != null) source.forEach(src::set);
Document pos = doc.setDocument(Fields.POSITION);
if (position != null) position.forEach(pos::set);
if (databaseName != null) doc.setString(Fields.DATABASE_NAME, databaseName);
if (ddl != null) doc.setString(Fields.DDL_STATEMENTS, ddl);
if (position != null) {
for (Entry<String, ?> positionElement : position.entrySet()) {
if (positionElement.getValue() instanceof byte[]) {
pos.setBinary(positionElement.getKey(), (byte[]) positionElement.getValue());
}
else {
pos.set(positionElement.getKey(), positionElement.getValue());
}
}
}
if (databaseName != null) {
doc.setString(Fields.DATABASE_NAME, databaseName);
}
if (ddl != null) {
doc.setString(Fields.DDL_STATEMENTS, ddl);
}
if (changes != null) {
doc.setArray(Fields.TABLE_CHANGES, changes.toArray());
}
}
public Document document() {


@@ -0,0 +1,113 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.history;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import io.debezium.document.Array;
import io.debezium.document.Document;
import io.debezium.document.Value;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
public class TableChanges {
private final List<TableChange> changes;
public TableChanges() {
this.changes = new ArrayList<>();
}
public TableChanges create(Table table) {
changes.add(new TableChange(TableChangeType.CREATE, table));
return this;
}
public Array toArray() {
List<Value> values = changes.stream()
.map(TableChange::toDocument)
.map(Value::create)
.collect(Collectors.toList());
return Array.create(values);
}
public static class TableChange {
private final TableChangeType type;
private final TableId id;
private final Table table;
public TableChange(TableChangeType type, Table table) {
this.type = type;
this.table = table;
this.id = table.id();
}
public Document toDocument() {
Document document = Document.create();
document.setString("type", type.name());
document.setString("id", id.toString());
document.setDocument("table", toDocument(table));
return document;
}
private Document toDocument(Table table) {
Document document = Document.create();
document.set("defaultCharsetName", table.defaultCharsetName());
document.set("primaryKeyColumnNames", Array.create(table.primaryKeyColumnNames()));
List<Document> columns = table.columns()
.stream()
.map(this::toDocument)
.collect(Collectors.toList());
document.setArray("columns", Array.create(columns));
return document;
}
private Document toDocument(Column column) {
Document document = Document.create();
document.setString("name", column.name());
document.setNumber("jdbcType", column.jdbcType());
if (column.nativeType() != Column.UNSET_INT_VALUE) {
document.setNumber("nativeType", column.nativeType());
}
document.setString("typeName", column.typeName());
document.setString("typeExpression", column.typeExpression());
document.setString("charsetName", column.charsetName());
if (column.length() != Column.UNSET_INT_VALUE) {
document.setNumber("length", column.length());
}
if (column.scale() != Column.UNSET_INT_VALUE) {
document.setNumber("scale", column.scale());
}
document.setNumber("position", column.position());
document.setBoolean("optional", column.isOptional());
document.setBoolean("autoIncremented", column.isAutoIncremented());
document.setBoolean("generated", column.isGenerated());
return document;
}
}
public enum TableChangeType {
CREATE,
ALTER,
DROP;
}
}
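
Given the document building above, a single CREATE change serializes to roughly the following (illustrative values; TableId formatting and null handling follow the respective TableId and Document implementations):

[ {
    "type": "CREATE",
    "id": "ORCLPDB1.DEBEZIUM.CUSTOMER",
    "table": {
      "defaultCharsetName": null,
      "primaryKeyColumnNames": [ "ID" ],
      "columns": [ {
        "name": "ID",
        "jdbcType": 2,
        "typeName": "NUMBER",
        "typeExpression": "NUMBER",
        "charsetName": null,
        "length": 10,
        "position": 1,
        "optional": false,
        "autoIncremented": false,
        "generated": false
      } ]
    }
} ]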


@@ -5,37 +5,68 @@
*/
package io.debezium.schema;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import io.debezium.relational.Table;
public class SchemaChangeEvent {
private final String database;
private final String ddl;
private final Table table;
private final Set<Table> tables;
private final SchemaChangeEventType type;
private final Map<String, ?> partition;
private final Map<String, ?> offset;
private final boolean isFromSnapshot;
public SchemaChangeEvent(String ddl, Table table, SchemaChangeEventType type) {
public SchemaChangeEvent(Map<String, ?> partition, Map<String, ?> offset, String database, String ddl, Table table, SchemaChangeEventType type, boolean isFromSnapshot) {
this(partition, offset, database, ddl, table != null ? Collections.singleton(table) : null, type, isFromSnapshot);
}
public SchemaChangeEvent(Map<String, ?> partition, Map<String, ?> offset, String database, String ddl, Set<Table> tables, SchemaChangeEventType type, boolean isFromSnapshot) {
this.partition = Objects.requireNonNull(partition, "partition must not be null");
this.offset = Objects.requireNonNull(offset, "offset must not be null");
this.database = Objects.requireNonNull(database, "database must not be null");
this.ddl = Objects.requireNonNull(ddl, "ddl must not be null");
this.table = Objects.requireNonNull(table, "table must not be null");
this.tables = Objects.requireNonNull(tables, "tables must not be null");
this.type = Objects.requireNonNull(type, "type must not be null");
this.isFromSnapshot = isFromSnapshot;
}
public Map<String, ?> getPartition() {
return partition;
}
public Map<String, ?> getOffset() {
return offset;
}
public String getDatabase() {
return database;
}
public String getDdl() {
return ddl;
}
public Table getTable() {
return table;
public Set<Table> getTables() {
return tables;
}
public SchemaChangeEventType getType() {
return type;
}
public boolean isFromSnapshot() {
return isFromSnapshot;
}
@Override
public String toString() {
return "SchemaChangeEvent [ddl=" + ddl + ", table=" + table + ", type=" + type + "]";
return "SchemaChangeEvent [ddl=" + ddl + ", tables=" + tables + ", type=" + type + "]";
}
public static enum SchemaChangeEventType {


@@ -27,7 +27,7 @@ public void canSerializeAndDeserializeHistoryRecord() throws Exception {
String databaseName = "db";
String ddl = "CREATE TABLE foo ( first VARCHAR(22) NOT NULL );";
HistoryRecord record = new HistoryRecord(source, position, databaseName, ddl);
HistoryRecord record = new HistoryRecord(source, position, databaseName, ddl, null);
String serialized = record.toString();
DocumentReader reader = DocumentReader.defaultReader();