DBZ-2916 Add support for Oracle DDL statement parsing

Authored by Chris Cranford on 2021-04-21 22:31:59 -04:00; committed by Gunnar Morling
parent 9f66d3e5ff
commit b13cbb1077
35 changed files with 1881 additions and 354 deletions

View File

@@ -1,97 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.connector.oracle;
-
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.debezium.connector.oracle.antlr.OracleDdlParser;
-import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
-import io.debezium.relational.Table;
-import io.debezium.relational.TableId;
-import io.debezium.relational.Tables;
-import io.debezium.schema.SchemaChangeEvent;
-import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
-
-/**
- * {@link SchemaChangeEventEmitter} implementation based on Oracle.
- *
- * @author Gunnar Morling
- */
-public class BaseOracleSchemaChangeEventEmitter implements SchemaChangeEventEmitter {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(BaseOracleSchemaChangeEventEmitter.class);
-
-    private final OracleOffsetContext offsetContext;
-    private final TableId tableId;
-    private String sourceDatabaseName;
-    private String objectOwner;
-    private String ddlText;
-    private String commandType;
-
-    public BaseOracleSchemaChangeEventEmitter(OracleOffsetContext offsetContext, TableId tableId,
-                                              String sourceDatabaseName, String objectOwner, String ddlText,
-                                              String commandType) {
-        this.offsetContext = offsetContext;
-        this.tableId = tableId;
-        this.sourceDatabaseName = sourceDatabaseName;
-        this.objectOwner = objectOwner;
-        this.ddlText = ddlText;
-        this.commandType = commandType;
-    }
-
-    @Override
-    public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException {
-        SchemaChangeEventType eventType = getSchemaChangeEventType();
-        if (eventType == null) {
-            return;
-        }
-
-        Tables tables = new Tables();
-        OracleDdlParser parser = new OracleDdlParser();
-        parser.setCurrentDatabase(sourceDatabaseName);
-        parser.setCurrentSchema(objectOwner);
-        parser.parse(ddlText, tables);
-
-        Set<TableId> changedTableIds = tables.drainChanges();
-        if (changedTableIds.isEmpty()) {
-            throw new IllegalArgumentException("Couldn't parse DDL statement " + ddlText);
-        }
-
-        Table table = tables.forTable(tableId);
-
-        receiver.schemaChangeEvent(new SchemaChangeEvent(
-                offsetContext.getPartition(),
-                offsetContext.getOffset(),
-                offsetContext.getSourceInfo(),
-                sourceDatabaseName,
-                objectOwner,
-                ddlText,
-                table,
-                eventType,
-                false));
-    }
-
-    private SchemaChangeEventType getSchemaChangeEventType() {
-        switch (commandType) {
-            case "CREATE TABLE":
-                return SchemaChangeEventType.CREATE;
-            case "ALTER TABLE":
-                LOGGER.warn("ALTER TABLE not yet implemented");
-                break;
-            case "DROP TABLE":
-                LOGGER.warn("DROP TABLE not yet implemented");
-                break;
-            default:
-                LOGGER.debug("Ignoring DDL event of type {}", commandType);
-        }
-        return null;
-    }
-}

View File

@@ -51,7 +51,9 @@ public ChangeEventSourceCoordinator start(Configuration config) {
         Configuration jdbcConfig = connectorConfig.jdbcConfig();
         jdbcConnection = new OracleConnection(jdbcConfig, () -> getClass().getClassLoader());
-        this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection);
+        OracleValueConverters valueConverters = new OracleValueConverters(connectorConfig, jdbcConnection);
+        this.schema = new OracleDatabaseSchema(connectorConfig, valueConverters, schemaNameAdjuster, topicSelector, jdbcConnection);
         this.schema.initializeStorage();
         OffsetContext previousOffset = getPreviousOffset(connectorConfig.getAdapter().getOffsetContextLoader(connectorConfig));

View File

@@ -14,10 +14,7 @@
 import io.debezium.relational.TableId;
 import io.debezium.relational.TableSchemaBuilder;
 import io.debezium.relational.Tables;
-import io.debezium.relational.ddl.DdlParser;
-import io.debezium.relational.history.TableChanges;
 import io.debezium.schema.SchemaChangeEvent;
-import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.SchemaNameAdjuster;
@@ -30,51 +27,61 @@ public class OracleDatabaseSchema extends HistorizedRelationalDatabaseSchema {
     private static final Logger LOGGER = LoggerFactory.getLogger(OracleDatabaseSchema.class);
 
-    public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector<TableId> topicSelector,
+    private final OracleDdlParser ddlParser;
+    private final OracleValueConverters valueConverters;
+
+    public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, OracleValueConverters valueConverters,
+                                SchemaNameAdjuster schemaNameAdjuster, TopicSelector<TableId> topicSelector,
                                 OracleConnection connection) {
-        super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), connectorConfig.getColumnFilter(),
+        super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(),
+                connectorConfig.getColumnFilter(),
                 new TableSchemaBuilder(
-                        new OracleValueConverters(connectorConfig, connection),
+                        valueConverters,
                         schemaNameAdjuster,
                         connectorConfig.customConverterRegistry(),
                         connectorConfig.getSourceInfoStructMaker().schema(),
                         connectorConfig.getSanitizeFieldNames()),
                 connection.getTablenameCaseInsensitivity(connectorConfig),
                 connectorConfig.getKeyMapper());
+        this.ddlParser = new OracleDdlParser(valueConverters, connectorConfig.getTableFilters().dataCollectionFilter());
+        this.valueConverters = valueConverters;
     }
 
     public Tables getTables() {
         return tables();
     }
 
+    public OracleValueConverters getValueConverters() {
+        return valueConverters;
+    }
+
     @Override
-    protected DdlParser getDdlParser() {
-        return new OracleDdlParser();
+    public OracleDdlParser getDdlParser() {
+        return ddlParser;
     }
 
     @Override
     public void applySchemaChange(SchemaChangeEvent schemaChange) {
         LOGGER.debug("Applying schema change event {}", schemaChange);
 
-        // just a single table per DDL event for Oracle
-        Table table = schemaChange.getTables().iterator().next();
-        buildAndRegisterSchema(table);
-        tables().overwriteTable(table);
-
-        TableChanges tableChanges = null;
-        if (schemaChange.getType() == SchemaChangeEventType.CREATE) {
-            tableChanges = new TableChanges();
-            tableChanges.create(table);
-        }
-        else if (schemaChange.getType() == SchemaChangeEventType.ALTER) {
-            tableChanges = new TableChanges();
-            tableChanges.alter(table);
-        }
-        else if (schemaChange.getType() == SchemaChangeEventType.DROP) {
-            tableChanges = new TableChanges();
-            tableChanges.drop(table);
-        }
-
-        record(schemaChange, tableChanges);
+        switch (schemaChange.getType()) {
+            case CREATE:
+            case ALTER:
+                schemaChange.getTableChanges().forEach(x -> {
+                    buildAndRegisterSchema(x.getTable());
+                    tables().overwriteTable(x.getTable());
+                });
+                break;
+            case DROP:
+                schemaChange.getTableChanges().forEach(x -> removeSchema(x.getId()));
+                break;
+            default:
+        }
+
+        if (schemaChange.getTables().stream().map(Table::id).anyMatch(getTableFilter()::isIncluded)) {
+            LOGGER.debug("Recorded DDL statements for database '{}': {}", schemaChange.getDatabase(), schemaChange.getDdl());
+            record(schemaChange, schemaChange.getTableChanges());
+        }
     }
 }

View File

@@ -9,6 +9,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
@@ -184,7 +185,7 @@ public void setSourceTime(Instant instant) {
     }
 
     public void setTableId(TableId tableId) {
-        sourceInfo.setTableId(tableId);
+        sourceInfo.tableEvent(tableId);
     }
 
     @Override
@@ -229,10 +230,20 @@ public void markLastSnapshotRecord() {
 
     @Override
     public void event(DataCollectionId tableId, Instant timestamp) {
-        sourceInfo.setTableId((TableId) tableId);
+        sourceInfo.tableEvent((TableId) tableId);
         sourceInfo.setSourceTime(timestamp);
     }
 
+    public void tableEvent(TableId tableId, Instant timestamp) {
+        sourceInfo.setSourceTime(timestamp);
+        sourceInfo.tableEvent(tableId);
+    }
+
+    public void tableEvent(Set<TableId> tableIds, Instant timestamp) {
+        sourceInfo.setSourceTime(timestamp);
+        sourceInfo.tableEvent(tableIds);
+    }
+
     @Override
     public TransactionContext getTransactionContext() {
         return transactionContext;
View File

@@ -0,0 +1,166 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.connector.oracle;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.connector.oracle.antlr.OracleDdlParser;
+import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables.TableFilter;
+import io.debezium.relational.ddl.DdlChanges;
+import io.debezium.relational.ddl.DdlParserListener;
+import io.debezium.relational.ddl.DdlParserListener.TableAlteredEvent;
+import io.debezium.relational.ddl.DdlParserListener.TableCreatedEvent;
+import io.debezium.relational.ddl.DdlParserListener.TableDroppedEvent;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.schema.SchemaChangeEvent;
+import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
+import io.debezium.text.MultipleParsingExceptions;
+import io.debezium.text.ParsingException;
+
+/**
+ * {@link SchemaChangeEventEmitter} implementation based on Oracle.
+ *
+ * @author Gunnar Morling
+ */
+public class OracleSchemaChangeEventEmitter implements SchemaChangeEventEmitter {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(OracleSchemaChangeEventEmitter.class);
+
+    private final OracleOffsetContext offsetContext;
+    private final TableId tableId;
+    private final OracleDatabaseSchema schema;
+    private final DatabaseHistory databaseHistory;
+    private final Instant changeTime;
+    private final String sourceDatabaseName;
+    private final String objectOwner;
+    private final String ddlText;
+    private final TableFilter filters;
+    private final OracleStreamingChangeEventSourceMetrics streamingMetrics;
+
+    public OracleSchemaChangeEventEmitter(OracleConnectorConfig connectorConfig, OracleOffsetContext offsetContext, TableId tableId,
+                                          String sourceDatabaseName, String objectOwner, String ddlText,
+                                          OracleDatabaseSchema schema, Instant changeTime, OracleStreamingChangeEventSourceMetrics streamingMetrics) {
+        this.offsetContext = offsetContext;
+        this.tableId = tableId;
+        this.sourceDatabaseName = sourceDatabaseName;
+        this.objectOwner = objectOwner;
+        this.ddlText = ddlText;
+        this.schema = schema;
+        this.databaseHistory = connectorConfig.getDatabaseHistory();
+        this.changeTime = changeTime;
+        this.streamingMetrics = streamingMetrics;
+        this.filters = connectorConfig.getTableFilters().dataCollectionFilter();
+    }
+
+    @Override
+    public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException {
+        // Cache a copy of the table's schema prior to parsing the DDL.
+        // This is needed in the event that the parsed DDL is a drop table
+        // todo: verify whether this is actually necessary in the emitted SchemaChangeEvent
+        final Table tableBefore = schema.tableFor(tableId);
+
+        final OracleDdlParser parser = schema.getDdlParser();
+        final DdlChanges ddlChanges = parser.getDdlChanges();
+        try {
+            ddlChanges.reset();
+            parser.setCurrentDatabase(sourceDatabaseName);
+            parser.setCurrentSchema(objectOwner);
+            parser.parse(ddlText, schema.getTables());
+        }
+        catch (ParsingException | MultipleParsingExceptions e) {
+            if (databaseHistory.skipUnparseableDdlStatements()) {
+                LOGGER.warn("Ignoring unparsable DDL statement '{}': {}", ddlText, e);
+                streamingMetrics.incrementWarningCount();
+                streamingMetrics.incrementUnparsableDdlCount();
+            }
+            else {
+                throw e;
+            }
+        }
+
+        if (!ddlChanges.isEmpty() && filters.isIncluded(tableId)) {
+            List<SchemaChangeEvent> changeEvents = new ArrayList<>();
+            ddlChanges.getEventsByDatabase((String dbName, List<DdlParserListener.Event> events) -> {
+                events.forEach(event -> {
+                    switch (event.type()) {
+                        case CREATE_TABLE:
+                            changeEvents.add(createTableEvent((TableCreatedEvent) event));
+                            break;
+                        case ALTER_TABLE:
+                            changeEvents.add(alterTableEvent((TableAlteredEvent) event));
+                            break;
+                        case DROP_TABLE:
+                            changeEvents.add(dropTableEvent(tableBefore, (TableDroppedEvent) event));
+                            break;
+                        default:
+                            LOGGER.info("Skipped DDL event type {}: {}", event.type(), ddlText);
+                            break;
+                    }
+                });
+            });
+
+            for (SchemaChangeEvent event : changeEvents) {
+                receiver.schemaChangeEvent(event);
+            }
+        }
+    }
+
+    private SchemaChangeEvent createTableEvent(TableCreatedEvent event) {
+        offsetContext.tableEvent(tableId, changeTime);
+        return new SchemaChangeEvent(
+                offsetContext.getPartition(),
+                offsetContext.getOffset(),
+                offsetContext.getSourceInfo(),
+                tableId.catalog(),
+                tableId.schema(),
+                event.statement(),
+                schema.tableFor(event.tableId()),
+                SchemaChangeEventType.CREATE,
+                false);
+    }
+
+    private SchemaChangeEvent alterTableEvent(TableAlteredEvent event) {
+        final Set<TableId> tableIds = new HashSet<>();
+        tableIds.add(tableId);
+        tableIds.add(event.tableId());
+
+        offsetContext.tableEvent(tableIds, changeTime);
+        return new SchemaChangeEvent(
+                offsetContext.getPartition(),
+                offsetContext.getOffset(),
+                offsetContext.getSourceInfo(),
+                tableId.catalog(),
+                tableId.schema(),
+                event.statement(),
+                schema.tableFor(event.tableId()),
+                SchemaChangeEventType.ALTER,
+                false);
+    }
+
+    private SchemaChangeEvent dropTableEvent(Table tableSchemaBeforeDrop, TableDroppedEvent event) {
+        offsetContext.tableEvent(tableId, changeTime);
+        return new SchemaChangeEvent(
+                offsetContext.getPartition(),
+                offsetContext.getOffset(),
+                offsetContext.getSourceInfo(),
+                tableId.catalog(),
+                tableId.schema(),
+                event.statement(),
+                tableSchemaBeforeDrop,
+                SchemaChangeEventType.DROP,
+                false);
+    }
+}

View File

@@ -39,8 +39,8 @@ public Struct struct(SourceInfo sourceInfo) {
         final String commitScn = sourceInfo.getCommitScn() == null ? null : sourceInfo.getCommitScn().toString();
 
         final Struct ret = super.commonStruct(sourceInfo)
-                .put(SourceInfo.SCHEMA_NAME_KEY, sourceInfo.getTableId().schema())
-                .put(SourceInfo.TABLE_NAME_KEY, sourceInfo.getTableId().table())
+                .put(SourceInfo.SCHEMA_NAME_KEY, sourceInfo.tableSchema())
+                .put(SourceInfo.TABLE_NAME_KEY, sourceInfo.table())
                 .put(SourceInfo.TXID_KEY, sourceInfo.getTransactionId())
                 .put(SourceInfo.SCN_KEY, scn);

View File

@@ -87,6 +87,7 @@ public class OracleStreamingChangeEventSourceMetrics extends StreamingChangeEven
     private final AtomicReference<Scn> oldestScn = new AtomicReference<>();
     private final AtomicReference<Scn> committedScn = new AtomicReference<>();
     private final AtomicReference<Scn> offsetScn = new AtomicReference<>();
+    private final AtomicInteger unparsableDdlCount = new AtomicInteger();
 
     // Constants for sliding window algorithm
     private final int batchSizeMin;
@@ -552,6 +553,11 @@ public int getScnFreezeCount() {
         return scnFreezeCount.get();
     }
 
+    @Override
+    public int getUnparsableDdlCount() {
+        return unparsableDdlCount.get();
+    }
+
     public void setOldestScn(Scn scn) {
         oldestScn.set(scn);
     }
@@ -634,6 +640,10 @@ public void calculateLagMetrics(Instant changeTime) {
         }
     }
 
+    public void incrementUnparsableDdlCount() {
+        unparsableDdlCount.incrementAndGet();
+    }
+
     @Override
     public String toString() {
         return "OracleStreamingChangeEventSourceMetrics{" +
@@ -693,6 +703,7 @@ public String toString() {
                 ", errorCount=" + errorCount.get() +
                 ", warningCount=" + warningCount.get() +
                 ", scnFreezeCount=" + scnFreezeCount.get() +
                ", unparsableDdlCount=" + unparsableDdlCount.get() +
                 '}';
     }
 }

View File

@@ -292,6 +292,11 @@ public interface OracleStreamingChangeEventSourceMetricsMXBean extends Streaming
      */
     int getScnFreezeCount();
 
+    /**
+     * @return the number of unparsable ddl statements
+     */
+    int getUnparsableDdlCount();
+
     /**
      * Resets metrics.
      */
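
Once the connector is running, the new counter can be read like any other streaming metric over JMX. A minimal sketch; the ObjectName below is an assumption following Debezium's usual connector-metrics naming, and "server=oracle1" is a placeholder that must match the connector's logical server name:

import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public class UnparsableDdlCheck {

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Assumed ObjectName pattern; adjust the server key to the connector's logical name
        ObjectName name = new ObjectName("debezium.oracle:type=connector-metrics,context=streaming,server=oracle1");
        // JMX derives the attribute name from the MXBean getter getUnparsableDdlCount()
        Integer count = (Integer) server.getAttribute(name, "UnparsableDdlCount");
        System.out.println("Unparsable DDL statements so far: " + count);
    }
}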

View File

@@ -6,6 +6,10 @@
 package io.debezium.connector.oracle;
 
 import java.time.Instant;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import io.debezium.annotation.NotThreadSafe;
 import io.debezium.connector.common.BaseSourceInfo;
@@ -25,7 +29,7 @@ public class SourceInfo extends BaseSourceInfo {
     private String lcrPosition;
     private String transactionId;
     private Instant sourceTime;
-    private TableId tableId;
+    private Set<TableId> tableIds;
 
     protected SourceInfo(OracleConnectorConfig connectorConfig) {
         super(connectorConfig);
@@ -71,12 +75,29 @@ public void setSourceTime(Instant sourceTime) {
         this.sourceTime = sourceTime;
     }
 
-    public TableId getTableId() {
-        return tableId;
+    public String tableSchema() {
+        return tableIds.isEmpty() ? null
+                : tableIds.stream()
+                        .filter(x -> x != null)
+                        .map(TableId::schema)
+                        .distinct()
+                        .collect(Collectors.joining(","));
     }
 
-    public void setTableId(TableId tableId) {
-        this.tableId = tableId;
+    public String table() {
+        return tableIds.isEmpty() ? null
+                : tableIds.stream()
+                        .filter(x -> x != null)
+                        .map(TableId::table)
+                        .collect(Collectors.joining(","));
+    }
+
+    public void tableEvent(Set<TableId> tableIds) {
+        this.tableIds = new HashSet<>(tableIds);
+    }
+
+    public void tableEvent(TableId tableId) {
+        this.tableIds = Collections.singleton(tableId);
     }
 
     @Override
@@ -86,6 +107,6 @@ protected Instant timestamp() {
 
     @Override
     protected String database() {
-        return tableIds.iterator().next().catalog();
+        return tableIds.iterator().next().catalog();
     }
 }
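
When a DDL statement touches more than one table (the rename case handled by the ALTER listener later in this commit), table() and tableSchema() join the names with commas. A stand-alone sketch of the same joining logic, with invented table names:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import io.debezium.relational.TableId;

public class TableNameJoinDemo {

    public static void main(String[] args) {
        // Hypothetical ids for a table renamed from CUSTOMERS to CLIENTS
        Set<TableId> tableIds = new HashSet<>(Arrays.asList(
                new TableId("ORCLPDB1", "DEBEZIUM", "CUSTOMERS"),
                new TableId("ORCLPDB1", "DEBEZIUM", "CLIENTS")));

        // Same stream pipeline as SourceInfo.table()
        String table = tableIds.stream()
                .filter(Objects::nonNull)
                .map(TableId::table)
                .collect(Collectors.joining(","));

        System.out.println(table); // e.g. CLIENTS,CUSTOMERS (set iteration order is unspecified)
    }
}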

View File

@@ -7,7 +7,6 @@
 import java.sql.Types;
 import java.util.Arrays;
-import java.util.Locale;
 
 import org.antlr.v4.runtime.CharStream;
 import org.antlr.v4.runtime.CommonTokenStream;
@@ -17,11 +16,13 @@
 import io.debezium.antlr.AntlrDdlParserListener;
 import io.debezium.antlr.DataTypeResolver;
 import io.debezium.antlr.DataTypeResolver.DataTypeEntry;
+import io.debezium.connector.oracle.OracleValueConverters;
 import io.debezium.connector.oracle.antlr.listener.OracleDdlParserListener;
 import io.debezium.ddl.parser.oracle.generated.PlSqlLexer;
 import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
 import io.debezium.relational.SystemVariables;
 import io.debezium.relational.Tables;
+import io.debezium.relational.Tables.TableFilter;
 
 import oracle.jdbc.OracleTypes;
@@ -30,17 +31,28 @@
  */
 public class OracleDdlParser extends AntlrDdlParser<PlSqlLexer, PlSqlParser> {
 
+    private final TableFilter tableFilter;
+    private final OracleValueConverters converters;
     private String catalogName;
     private String schemaName;
 
     public OracleDdlParser() {
-        super(true);
+        this(null, TableFilter.includeAll());
     }
 
-    public OracleDdlParser(boolean throwErrorsFromTreeWalk, final String catalogName, final String schemaName) {
+    public OracleDdlParser(OracleValueConverters valueConverters) {
+        this(true, valueConverters, TableFilter.includeAll());
+    }
+
+    public OracleDdlParser(OracleValueConverters valueConverters, TableFilter tableFilter) {
+        this(true, valueConverters, tableFilter);
+    }
+
+    public OracleDdlParser(boolean throwErrorsFromTreeWalk, OracleValueConverters converters, TableFilter tableFilter) {
         super(throwErrorsFromTreeWalk);
-        this.catalogName = catalogName;
-        this.schemaName = schemaName;
+        this.converters = converters;
+        this.tableFilter = tableFilter;
     }
 
     @Override
@@ -48,7 +60,7 @@ public void parse(String ddlContent, Tables databaseTables) {
         if (!ddlContent.endsWith(";")) {
             ddlContent = ddlContent + ";";
         }
-        super.parse(toUpperCase(ddlContent), databaseTables);
+        super.parse(ddlContent, databaseTables);
     }
 
     @Override
@@ -146,8 +158,11 @@ public void runIfNotNull(Runnable function, Object... nullableObjects) {
         function.run();
     }
 
-    // TODO excluded quoted identifiers
-    private String toUpperCase(String ddl) {
-        return ddl.toUpperCase(Locale.ENGLISH);
+    public OracleValueConverters getConverters() {
+        return converters;
+    }
+
+    public TableFilter getTableFilter() {
+        return tableFilter;
     }
 }
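
The parser can also be driven directly, following the same pattern the schema change emitters use. A minimal sketch with the no-argument constructor (no value converters, include-all filter); database, schema and table names are invented, and keywords are kept upper-case:

import io.debezium.connector.oracle.antlr.OracleDdlParser;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;

public class DdlParseDemo {

    public static void main(String[] args) {
        OracleDdlParser parser = new OracleDdlParser();
        parser.setCurrentDatabase("ORCLPDB1");
        parser.setCurrentSchema("DEBEZIUM");

        Tables tables = new Tables();
        parser.parse("CREATE TABLE CUSTOMERS (ID NUMBER(9,0) NOT NULL PRIMARY KEY, NAME VARCHAR2(50));", tables);

        // Unquoted identifiers resolve to upper case (see BaseParserListener below)
        Table table = tables.forTable(new TableId("ORCLPDB1", "DEBEZIUM", "CUSTOMERS"));
        System.out.println(table.columns());
        System.out.println(table.primaryKeyColumnNames());
    }
}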

View File

@@ -6,13 +6,13 @@
 package io.debezium.connector.oracle.antlr.listener;
 
 import static io.debezium.antlr.AntlrDdlParser.getText;
-import static io.debezium.connector.oracle.antlr.listener.ParserUtils.getColumnName;
-import static io.debezium.connector.oracle.antlr.listener.ParserUtils.getTableName;
 
 import java.util.ArrayList;
 import java.util.List;
 
 import org.antlr.v4.runtime.tree.ParseTreeListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import io.debezium.connector.oracle.antlr.OracleDdlParser;
 import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
@@ -27,10 +27,13 @@
  */
 public class AlterTableParserListener extends BaseParserListener {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(AlterTableParserListener.class);
+
     private static final int STARTING_INDEX = 1;
 
     private TableEditor tableEditor;
     private String catalogName;
     private String schemaName;
+    private TableId previousTableId;
     private OracleDdlParser parser;
     private final List<ParseTreeListener> listeners;
     private ColumnDefinitionParserListener columnDefinitionParserListener;
@@ -55,8 +58,8 @@ public class AlterTableParserListener extends BaseParserListener {
 
     @Override
     public void enterAlter_table(PlSqlParser.Alter_tableContext ctx) {
+        previousTableId = null;
         TableId tableId = new TableId(catalogName, schemaName, getTableName(ctx.tableview_name()));
-        // todo filter tables not in table.include.list
         tableEditor = parser.databaseTables().editTable(tableId);
         if (tableEditor == null) {
             throw new ParsingException(null, "Trying to alter table " + tableId.toString()
@@ -70,11 +73,32 @@ public void exitAlter_table(PlSqlParser.Alter_tableContext ctx) {
         parser.runIfNotNull(() -> {
             listeners.remove(columnDefinitionParserListener);
             parser.databaseTables().overwriteTable(tableEditor.create());
-            // parser.signalAlterTable(tableEditor.tableId(), null, ctx.getParent());// todo?
+            parser.signalAlterTable(tableEditor.tableId(), previousTableId, ctx.getParent());
         }, tableEditor);
         super.exitAlter_table(ctx);
     }
 
+    @Override
+    public void enterAlter_table_properties(PlSqlParser.Alter_table_propertiesContext ctx) {
+        parser.runIfNotNull(() -> {
+            if (ctx.RENAME() != null && ctx.TO() != null) {
+                previousTableId = tableEditor.tableId();
+                String tableName = getTableName(ctx.tableview_name());
+                final TableId newTableId = new TableId(tableEditor.tableId().catalog(), tableEditor.tableId().schema(), tableName);
+                if (parser.getTableFilter().isIncluded(previousTableId) && !parser.getTableFilter().isIncluded(newTableId)) {
+                    LOGGER.warn("Renaming included table {} to non-included table {}, this can lead to schema inconsistency", previousTableId, newTableId);
+                }
+                else if (!parser.getTableFilter().isIncluded(previousTableId) && parser.getTableFilter().isIncluded(newTableId)) {
+                    LOGGER.warn("Renaming non-included table {} to included table {}, this can lead to schema inconsistency", previousTableId, newTableId);
+                }
+                parser.databaseTables().overwriteTable(tableEditor.create());
+                parser.databaseTables().renameTable(tableEditor.tableId(), newTableId);
+                tableEditor = parser.databaseTables().editTable(newTableId);
+            }
+        }, tableEditor);
+        super.exitAlter_table_properties(ctx);
+    }
+
     @Override
     public void enterAdd_column_clause(PlSqlParser.Add_column_clauseContext ctx) {
         parser.runIfNotNull(() -> {
@@ -85,12 +109,35 @@ public void enterAdd_column_clause(PlSqlParser.Add_column_clauseContext ctx) {
                 ColumnEditor editor = Column.editor().name(columnName);
                 columnEditors.add(editor);
             }
-            columnDefinitionParserListener = new ColumnDefinitionParserListener(tableEditor, columnEditors.get(0), parser.dataTypeResolver());
+            columnDefinitionParserListener = new ColumnDefinitionParserListener(tableEditor, columnEditors.get(0), parser, listeners);
             listeners.add(columnDefinitionParserListener);
         }, tableEditor);
         super.enterAdd_column_clause(ctx);
     }
 
+    @Override
+    public void enterModify_column_clauses(PlSqlParser.Modify_column_clausesContext ctx) {
+        parser.runIfNotNull(() -> {
+            List<PlSqlParser.Modify_col_propertiesContext> columns = ctx.modify_col_properties();
+            columnEditors = new ArrayList<>(columns.size());
+            for (PlSqlParser.Modify_col_propertiesContext column : columns) {
+                String columnName = getColumnName(column.column_name());
+                Column existingColumn = tableEditor.columnWithName(columnName);
+                if (existingColumn != null) {
+                    ColumnEditor columnEditor = existingColumn.edit();
+                    columnEditors.add(columnEditor);
+                }
+                else {
+                    throw new ParsingException(null, "trying to change column " + columnName + " in " +
+                            tableEditor.tableId().toString() + " table, which does not exist. Query: " + getText(ctx));
+                }
+            }
+            columnDefinitionParserListener = new ColumnDefinitionParserListener(tableEditor, columnEditors.get(0), parser, listeners);
+            listeners.add(columnDefinitionParserListener);
+        }, tableEditor);
+        super.enterModify_column_clauses(ctx);
+    }
+
     @Override
     public void exitAdd_column_clause(PlSqlParser.Add_column_clauseContext ctx) {
         parser.runIfNotNull(() -> {
@@ -101,6 +148,16 @@ public void exitAdd_column_clause(PlSqlParser.Add_column_clauseContext ctx) {
         super.exitAdd_column_clause(ctx);
     }
 
+    @Override
+    public void exitModify_column_clauses(PlSqlParser.Modify_column_clausesContext ctx) {
+        parser.runIfNotNull(() -> {
+            columnEditors.forEach(columnEditor -> tableEditor.addColumn(columnEditor.create()));
+            listeners.remove(columnDefinitionParserListener);
+            columnDefinitionParserListener = null;
+        }, tableEditor, columnEditors);
+        super.exitModify_column_clauses(ctx);
+    }
+
     @Override
     public void exitColumn_definition(PlSqlParser.Column_definitionContext ctx) {
         parser.runIfNotNull(() -> {
@@ -122,6 +179,27 @@ public void exitColumn_definition(PlSqlParser.Column_definitionContext ctx) {
         super.exitColumn_definition(ctx);
     }
 
+    @Override
+    public void exitModify_col_properties(PlSqlParser.Modify_col_propertiesContext ctx) {
+        parser.runIfNotNull(() -> {
+            if (columnEditors != null) {
+                // column editor list is not null when multiple columns are parsed in one statement
+                if (columnEditors.size() > parsingColumnIndex) {
+                    // assign next column editor to parse another column definition
+                    columnDefinitionParserListener.setColumnEditor(columnEditors.get(parsingColumnIndex++));
+                }
+                else {
+                    // all columns parsed
+                    // reset global variables for next parsed statement
+                    columnEditors.forEach(columnEditor -> tableEditor.addColumn(columnEditor.create()));
+                    columnEditors = null;
+                    parsingColumnIndex = STARTING_INDEX;
+                }
+            }
+        }, tableEditor, columnEditors);
+        super.exitModify_col_properties(ctx);
+    }
+
     @Override
     public void enterDrop_column_clause(PlSqlParser.Drop_column_clauseContext ctx) {
         parser.runIfNotNull(() -> {
@@ -134,4 +212,12 @@ public void enterDrop_column_clause(PlSqlParser.Drop_column_clauseContext ctx) {
         }, tableEditor);
         super.enterDrop_column_clause(ctx);
     }
+
+    @Override
+    public void exitRename_column_clause(PlSqlParser.Rename_column_clauseContext ctx) {
+        parser.runIfNotNull(() -> {
+            tableEditor.renameColumn(getColumnName(ctx.old_column_name()), getColumnName(ctx.new_column_name()));
+        }, tableEditor);
+        super.exitRename_column_clause(ctx);
+    }
 }
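
A hypothetical sequence exercising the new MODIFY, RENAME COLUMN and RENAME TO paths; it assumes this commit's grammar accepts the statements as written, and all names are invented:

import io.debezium.connector.oracle.antlr.OracleDdlParser;
import io.debezium.relational.Tables;

public class AlterTableDemo {

    public static void main(String[] args) {
        OracleDdlParser parser = new OracleDdlParser();
        parser.setCurrentDatabase("ORCLPDB1");
        parser.setCurrentSchema("DEBEZIUM");

        Tables tables = new Tables();
        parser.parse("CREATE TABLE CUSTOMERS (ID NUMBER(9,0) NOT NULL PRIMARY KEY, NAME VARCHAR2(50));", tables);

        // New listener paths added by this commit
        parser.parse("ALTER TABLE CUSTOMERS MODIFY (NAME VARCHAR2(100) NOT NULL);", tables);
        parser.parse("ALTER TABLE CUSTOMERS RENAME COLUMN NAME TO FULL_NAME;", tables);
        parser.parse("ALTER TABLE CUSTOMERS RENAME TO CLIENTS;", tables);

        System.out.println(tables.tableIds()); // expect the CLIENTS id after the rename
    }
}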

View File

@@ -14,15 +14,56 @@
 class BaseParserListener extends PlSqlParserBaseListener {
 
     String getTableName(final PlSqlParser.Tableview_nameContext tableview_name) {
+        final String tableName;
         if (tableview_name.id_expression() != null) {
-            return tableview_name.id_expression().getText();
+            tableName = tableview_name.id_expression().getText();
         }
         else {
-            return tableview_name.identifier().id_expression().getText();
+            tableName = tableview_name.identifier().id_expression().getText();
         }
+        return getTableOrColumnName(tableName);
     }
 
     String getColumnName(final PlSqlParser.Column_nameContext ctx) {
-        return ctx.identifier().id_expression().getText();
+        return getTableOrColumnName(ctx.identifier().id_expression().getText());
+    }
+
+    String getColumnName(final PlSqlParser.Old_column_nameContext ctx) {
+        return getTableOrColumnName(ctx.getText());
+    }
+
+    String getColumnName(final PlSqlParser.New_column_nameContext ctx) {
+        return getTableOrColumnName(ctx.getText());
+    }
+
+    /**
+     * Resolves a table or column name from the provided string.
+     *
+     * Oracle table and column names are inherently stored in upper-case; however, if the objects
+     * are created using double-quotes, the case of the object name is retained. Therefore when
+     * needing to parse a table or column name, this method will adhere to those rules and will
+     * always return the name in upper-case unless the provided name is double-quoted, in which
+     * case the returned value will have the double-quotes removed and its case retained.
+     *
+     * @param name table or column name
+     * @return parsed table or column name from the supplied name argument
+     */
+    private static String getTableOrColumnName(String name) {
+        return removeQuotes(name, true);
+    }
+
+    /**
+     * Removes leading and trailing double quote characters from the provided string.
+     *
+     * @param text value to have double quotes removed
+     * @param upperCaseIfNotQuoted control if returned string is upper-cased if not quoted
+     * @return string that has had quotes removed
+     */
+    @SuppressWarnings("SameParameterValue")
+    private static String removeQuotes(String text, boolean upperCaseIfNotQuoted) {
+        if (text != null && text.length() > 2 && text.startsWith("\"") && text.endsWith("\"")) {
+            return text.substring(1, text.length() - 1);
+        }
+        return upperCaseIfNotQuoted ? text.toUpperCase() : text;
     }
 }
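
The effect of these rules, shown with a stand-alone copy of the quote handling (the listener's methods are package-private, so this demo duplicates the logic rather than calling it):

public class IdentifierCaseDemo {

    // Copy of removeQuotes(text, true) above, for illustration only
    private static String getTableOrColumnName(String text) {
        if (text != null && text.length() > 2 && text.startsWith("\"") && text.endsWith("\"")) {
            return text.substring(1, text.length() - 1);
        }
        return text.toUpperCase();
    }

    public static void main(String[] args) {
        System.out.println(getTableOrColumnName("customers"));     // CUSTOMERS
        System.out.println(getTableOrColumnName("\"Customers\"")); // Customers (case retained, quotes removed)
    }
}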

View File

@@ -6,8 +6,12 @@
 package io.debezium.connector.oracle.antlr.listener;
 
 import java.sql.Types;
+import java.util.List;
+
+import org.antlr.v4.runtime.tree.ParseTreeListener;
 
 import io.debezium.antlr.DataTypeResolver;
+import io.debezium.connector.oracle.antlr.OracleDdlParser;
 import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
 import io.debezium.relational.Column;
 import io.debezium.relational.ColumnEditor;
@@ -16,19 +20,24 @@
 import oracle.jdbc.OracleTypes;
 
 /**
- * This class parses column definitions of Oracle statements.
+ * Parser listener that parses column definitions of Oracle DDL statements.
  */
 public class ColumnDefinitionParserListener extends BaseParserListener {
 
+    private final OracleDdlParser parser;
     private final DataTypeResolver dataTypeResolver;
     private final TableEditor tableEditor;
+    private final List<ParseTreeListener> listeners;
     private ColumnEditor columnEditor;
 
-    ColumnDefinitionParserListener(final TableEditor tableEditor, final ColumnEditor columnEditor,
-                                   final DataTypeResolver dataTypeResolver) {
-        this.dataTypeResolver = dataTypeResolver;
+    ColumnDefinitionParserListener(final TableEditor tableEditor, final ColumnEditor columnEditor, OracleDdlParser parser,
+                                   List<ParseTreeListener> listeners) {
         this.tableEditor = tableEditor;
         this.columnEditor = columnEditor;
+        this.parser = parser;
+        this.dataTypeResolver = parser.dataTypeResolver();
+        this.listeners = listeners;
     }
 
     void setColumnEditor(ColumnEditor columnEditor) {
@@ -55,26 +64,63 @@ public void enterPrimary_key_clause(PlSqlParser.Primary_key_clauseContext ctx) {
         super.enterPrimary_key_clause(ctx);
     }
 
+    @Override
+    public void enterModify_col_properties(PlSqlParser.Modify_col_propertiesContext ctx) {
+        resolveColumnDataType(ctx);
+        super.enterModify_col_properties(ctx);
+    }
+
     // todo use dataTypeResolver instead
     private void resolveColumnDataType(PlSqlParser.Column_definitionContext ctx) {
         columnEditor.name(getColumnName(ctx.column_name()));
 
-        PlSqlParser.Precision_partContext precisionPart = null;
-        if (ctx.datatype() != null) {
-            precisionPart = ctx.datatype().precision_part();
-        }
-
         if (ctx.datatype() == null) {
             if (ctx.type_name() != null && "\"MDSYS\".\"SDO_GEOMETRY\"".equalsIgnoreCase(ctx.type_name().getText())) {
                 columnEditor.jdbcType(Types.STRUCT).type("MDSYS.SDO_GEOMETRY");
             }
         }
-        else if (ctx.datatype().native_datatype_element() != null) {
-            if (ctx.datatype().native_datatype_element().INT() != null
-                    || ctx.datatype().native_datatype_element().INTEGER() != null
-                    || ctx.datatype().native_datatype_element().SMALLINT() != null
-                    || ctx.datatype().native_datatype_element().NUMERIC() != null
-                    || ctx.datatype().native_datatype_element().DECIMAL() != null) {
+        else {
+            resolveColumnDataType(ctx.datatype());
+
+            // todo move to enterExpression and apply type conversion
+            if (ctx.DEFAULT() != null) {
+                String defaultValue = ctx.expression().getText();
+                columnEditor.defaultValue(defaultValue);
+            }
+        }
+
+        boolean hasNotNullConstraint = ctx.inline_constraint().stream().anyMatch(c -> c.NOT() != null);
+        // todo move to nonNull
+        columnEditor.optional(!hasNotNullConstraint);
+    }
+
+    private void resolveColumnDataType(PlSqlParser.Modify_col_propertiesContext ctx) {
+        columnEditor.name(getColumnName(ctx.column_name()));
+
+        resolveColumnDataType(ctx.datatype());
+
+        boolean hasNullConstraint = ctx.inline_constraint().stream().anyMatch(c -> c.NULL_() != null);
+        boolean hasNotNullConstraint = ctx.inline_constraint().stream().anyMatch(c -> c.NOT() != null);
+        if (hasNotNullConstraint && columnEditor.isOptional()) {
+            columnEditor.optional(false);
+        }
+        else if (hasNullConstraint && !columnEditor.isOptional()) {
+            columnEditor.optional(true);
+        }
+    }
+
+    private void resolveColumnDataType(PlSqlParser.DatatypeContext ctx) {
+        PlSqlParser.Precision_partContext precisionPart = null;
+        if (ctx != null) {
+            precisionPart = ctx.precision_part();
+        }
+
+        if (ctx != null && ctx.native_datatype_element() != null) {
+            if (ctx.native_datatype_element().INT() != null
+                    || ctx.native_datatype_element().INTEGER() != null
+                    || ctx.native_datatype_element().SMALLINT() != null
+                    || ctx.native_datatype_element().NUMERIC() != null
+                    || ctx.native_datatype_element().DECIMAL() != null) {
                 // NUMERIC and DECIMAL types have by default zero scale
                 columnEditor
                         .jdbcType(Types.NUMERIC)
@@ -89,17 +135,17 @@ else if (ctx.datatype().native_datatype_element() != null) {
                     setScale(precisionPart, columnEditor);
                 }
             }
-            else if (ctx.datatype().native_datatype_element().DATE() != null) {
+            else if (ctx.native_datatype_element().DATE() != null) {
                 // JDBC driver reports type as timestamp but name DATE
                 columnEditor
                         .jdbcType(Types.TIMESTAMP)
                         .type("DATE");
             }
-            else if (ctx.datatype().native_datatype_element().TIMESTAMP() != null) {
-                if (ctx.datatype().WITH() != null
-                        && ctx.datatype().TIME() != null
-                        && ctx.datatype().ZONE() != null) {
-                    if (ctx.datatype().LOCAL() != null) {
+            else if (ctx.native_datatype_element().TIMESTAMP() != null) {
+                if (ctx.WITH() != null
+                        && ctx.TIME() != null
+                        && ctx.ZONE() != null) {
+                    if (ctx.LOCAL() != null) {
                         columnEditor
                                 .jdbcType(OracleTypes.TIMESTAMPLTZ)
                                 .type("TIMESTAMP WITH LOCAL TIME ZONE");
@@ -124,8 +170,8 @@ else if (ctx.datatype().native_datatype_element().TIMESTAMP() != null) {
                 }
             }
             // VARCHAR is the same as VARCHAR2 in Oracle
-            else if (ctx.datatype().native_datatype_element().VARCHAR2() != null ||
-                    ctx.datatype().native_datatype_element().VARCHAR() != null) {
+            else if (ctx.native_datatype_element().VARCHAR2() != null ||
+                    ctx.native_datatype_element().VARCHAR() != null) {
                 columnEditor
                         .jdbcType(Types.VARCHAR)
                         .type("VARCHAR2");
@@ -137,7 +183,7 @@ else if (ctx.datatype().native_datatype_element().VARCHAR2() != null ||
                     setPrecision(precisionPart, columnEditor);
                 }
             }
-            else if (ctx.datatype().native_datatype_element().NVARCHAR2() != null) {
+            else if (ctx.native_datatype_element().NVARCHAR2() != null) {
                 columnEditor
                         .jdbcType(Types.NVARCHAR)
                         .type("NVARCHAR2");
@@ -149,31 +195,31 @@ else if (ctx.datatype().native_datatype_element().NVARCHAR2() != null) {
                     setPrecision(precisionPart, columnEditor);
                 }
             }
-            else if (ctx.datatype().native_datatype_element().CHAR() != null) {
+            else if (ctx.native_datatype_element().CHAR() != null) {
                 columnEditor
                         .jdbcType(Types.CHAR)
                        .type("CHAR")
                        .length(1);
            }
-            else if (ctx.datatype().native_datatype_element().NCHAR() != null) {
+            else if (ctx.native_datatype_element().NCHAR() != null) {
                columnEditor
                        .jdbcType(Types.NCHAR)
                        .type("NCHAR")
                        .length(1);
            }
-            else if (ctx.datatype().native_datatype_element().BINARY_FLOAT() != null) {
+            else if (ctx.native_datatype_element().BINARY_FLOAT() != null) {
                columnEditor
                        .jdbcType(OracleTypes.BINARY_FLOAT)
                        .type("BINARY_FLOAT");
            }
-            else if (ctx.datatype().native_datatype_element().BINARY_DOUBLE() != null) {
+            else if (ctx.native_datatype_element().BINARY_DOUBLE() != null) {
                columnEditor
                        .jdbcType(OracleTypes.BINARY_DOUBLE)
                        .type("BINARY_DOUBLE");
            }
            // PRECISION keyword is mandatory
-            else if (ctx.datatype().native_datatype_element().FLOAT() != null ||
-                    (ctx.datatype().native_datatype_element().DOUBLE() != null && ctx.datatype().native_datatype_element().PRECISION() != null)) {
+            else if (ctx.native_datatype_element().FLOAT() != null ||
+                    (ctx.native_datatype_element().DOUBLE() != null && ctx.native_datatype_element().PRECISION() != null)) {
                columnEditor
                        .jdbcType(Types.FLOAT)
                        .type("FLOAT")
@@ -184,14 +230,14 @@ else if (ctx.datatype().native_datatype_element().FLOAT() != null ||
                     setPrecision(precisionPart, columnEditor);
                 }
             }
-            else if (ctx.datatype().native_datatype_element().REAL() != null) {
+            else if (ctx.native_datatype_element().REAL() != null) {
                 columnEditor
                         .jdbcType(Types.FLOAT)
                         .type("FLOAT")
                         // TODO float's precision is about bits not decimal digits; should be ok for now to over-size
                         .length(63);
             }
-            else if (ctx.datatype().native_datatype_element().NUMBER() != null) {
+            else if (ctx.native_datatype_element().NUMBER() != null) {
                 columnEditor
                         .jdbcType(Types.NUMERIC)
                         .type("NUMBER");
@@ -204,66 +250,56 @@ else if (ctx.datatype().native_datatype_element().NUMBER() != null) {
                     setScale(precisionPart, columnEditor);
                 }
             }
-            else if (ctx.datatype().native_datatype_element().BLOB() != null) {
+            else if (ctx.native_datatype_element().BLOB() != null) {
                 columnEditor
                         .jdbcType(Types.BLOB)
                         .type("BLOB");
             }
-            else if (ctx.datatype().native_datatype_element().CLOB() != null) {
+            else if (ctx.native_datatype_element().CLOB() != null) {
                 columnEditor
                         .jdbcType(Types.CLOB)
                         .type("CLOB");
             }
             else {
-                throw new IllegalArgumentException("Unsupported column type: " + ctx.datatype().native_datatype_element().getText());
+                throw new IllegalArgumentException("Unsupported column type: " + ctx.native_datatype_element().getText());
             }
         }
-        else if (ctx.datatype().INTERVAL() != null
-                && ctx.datatype().YEAR() != null
-                && ctx.datatype().TO() != null
-                && ctx.datatype().MONTH() != null) {
+        else if (ctx.INTERVAL() != null
+                && ctx.YEAR() != null
+                && ctx.TO() != null
+                && ctx.MONTH() != null) {
             columnEditor
                     .jdbcType(OracleTypes.INTERVALYM)
                     .type("INTERVAL YEAR TO MONTH")
                     .length(2);
-            if (!ctx.datatype().expression().isEmpty()) {
-                columnEditor.length(Integer.valueOf((ctx.datatype().expression(0).getText())));
+            if (!ctx.expression().isEmpty()) {
+                columnEditor.length(Integer.valueOf((ctx.expression(0).getText())));
             }
         }
-        else if (ctx.datatype().INTERVAL() != null
-                && ctx.datatype().DAY() != null
-                && ctx.datatype().TO() != null
-                && ctx.datatype().SECOND() != null) {
+        else if (ctx.INTERVAL() != null
+                && ctx.DAY() != null
+                && ctx.TO() != null
+                && ctx.SECOND() != null) {
             columnEditor
                     .jdbcType(OracleTypes.INTERVALDS)
                     .type("INTERVAL DAY TO SECOND")
                     .length(2)
                     .scale(6);
-            for (final PlSqlParser.ExpressionContext e : ctx.datatype().expression()) {
-                if (e.getSourceInterval().startsAfter(ctx.datatype().TO().getSourceInterval())) {
+            for (final PlSqlParser.ExpressionContext e : ctx.expression()) {
+                if (e.getSourceInterval().startsAfter(ctx.TO().getSourceInterval())) {
                     columnEditor.scale(Integer.valueOf(e.getText()));
                 }
                 else {
                     columnEditor.length(Integer.valueOf(e.getText()));
                 }
             }
-            if (!ctx.datatype().expression().isEmpty()) {
-                columnEditor.length(Integer.valueOf((ctx.datatype().expression(0).getText())));
+            if (!ctx.expression().isEmpty()) {
+                columnEditor.length(Integer.valueOf((ctx.expression(0).getText())));
             }
         }
         else {
-            throw new IllegalArgumentException("Unsupported column type: " + ctx.datatype().getText());
+            throw new IllegalArgumentException("Unsupported column type: " + ctx.getText());
         }
-
-        boolean hasNotNullConstraint = ctx.inline_constraint().stream().anyMatch(c -> c.NOT() != null);
-
-        // todo move to enterExpression and apply type conversion
-        if (ctx.DEFAULT() != null) {
-            String defaultValue = ctx.expression().getText();
-            columnEditor.defaultValue(defaultValue);
-        }
-        // todo move to nonNull
-        columnEditor.optional(!hasNotNullConstraint);
     }
 
     private int getVarCharDefaultLength() {
View File

@@ -5,9 +5,6 @@
  */
 package io.debezium.connector.oracle.antlr.listener;
 
-import static io.debezium.connector.oracle.antlr.listener.ParserUtils.getColumnName;
-import static io.debezium.connector.oracle.antlr.listener.ParserUtils.getTableName;
-
 import java.util.List;
 import java.util.stream.Collectors;
@@ -20,6 +17,7 @@
 import io.debezium.relational.Table;
 import io.debezium.relational.TableEditor;
 import io.debezium.relational.TableId;
+import io.debezium.text.ParsingException;
 
 public class CreateTableParserListener extends BaseParserListener {
@@ -29,6 +27,7 @@ public class CreateTableParserListener extends BaseParserListener {
     private String schemaName;
     private OracleDdlParser parser;
     private ColumnDefinitionParserListener columnDefinitionParserListener;
+    private String inlinePrimaryKey;
 
     CreateTableParserListener(final String catalogName, final String schemaName, final OracleDdlParser parser,
                               final List<ParseTreeListener> listeners) {
@@ -44,12 +43,21 @@ public void enterCreate_table(PlSqlParser.Create_tableContext ctx) {
             throw new IllegalArgumentException("Only relational tables are supported");
         }
         TableId tableId = new TableId(catalogName, schemaName, getTableName(ctx.tableview_name()));
-        tableEditor = parser.databaseTables().editOrCreateTable(tableId);
-        super.enterCreate_table(ctx);
+        if (parser.databaseTables().forTable(tableId) == null) {
+            tableEditor = parser.databaseTables().editOrCreateTable(tableId);
+            super.enterCreate_table(ctx);
+        }
     }
 
     @Override
     public void exitCreate_table(PlSqlParser.Create_tableContext ctx) {
+        if (inlinePrimaryKey != null) {
+            if (!tableEditor.primaryKeyColumnNames().isEmpty()) {
+                throw new ParsingException(null, "Can only specify in-line or out-of-line primary keys but not both");
+            }
+            tableEditor.setPrimaryKeyNames(inlinePrimaryKey);
+        }
+
         Table table = getTable();
         assert table != null;
@@ -57,7 +65,7 @@ public void exitCreate_table(PlSqlParser.Create_tableContext ctx) {
             listeners.remove(columnDefinitionParserListener);
             columnDefinitionParserListener = null;
             parser.databaseTables().overwriteTable(table);
-            // parser.signalCreateTable(tableEditor.tableId(), ctx); todo ?
+            parser.signalCreateTable(tableEditor.tableId(), ctx);
         }, tableEditor, table);
 
         super.exitCreate_table(ctx);
@@ -66,11 +74,10 @@ public void exitCreate_table(PlSqlParser.Create_tableContext ctx) {
     @Override
     public void enterColumn_definition(PlSqlParser.Column_definitionContext ctx) {
         parser.runIfNotNull(() -> {
-            String columnName = ParserUtils.stripeQuotes(getColumnName(ctx.column_name()));
+            String columnName = getColumnName(ctx.column_name());
             ColumnEditor columnEditor = Column.editor().name(columnName);
             if (columnDefinitionParserListener == null) {
-                columnDefinitionParserListener = new ColumnDefinitionParserListener(tableEditor, columnEditor, parser.dataTypeResolver());
-                // todo: this explicit call is for the first column, should it be fixed?
+                columnDefinitionParserListener = new ColumnDefinitionParserListener(tableEditor, columnEditor, parser, listeners);
                 columnDefinitionParserListener.enterColumn_definition(ctx);
                 listeners.add(columnDefinitionParserListener);
             }
@@ -89,14 +96,30 @@ public void exitColumn_definition(PlSqlParser.Column_definitionContext ctx) {
     }
 
     @Override
+    public void exitInline_constraint(PlSqlParser.Inline_constraintContext ctx) {
+        if (ctx.PRIMARY() != null) {
+            if (ctx.getParent() instanceof PlSqlParser.Column_definitionContext) {
+                PlSqlParser.Column_definitionContext columnCtx = (PlSqlParser.Column_definitionContext) ctx.getParent();
+                inlinePrimaryKey = getColumnName(columnCtx.column_name());
+            }
+        }
+        super.exitInline_constraint(ctx);
+    }
+
+    @Override
     public void exitOut_of_line_constraint(PlSqlParser.Out_of_line_constraintContext ctx) {
-        if (ctx.PRIMARY() != null) {
+        parser.runIfNotNull(() -> {
+            if (ctx.PRIMARY() != null) {
+                if (inlinePrimaryKey != null) {
+                    throw new ParsingException(null, "Cannot specify inline and out of line primary keys");
+                }
                 List<String> pkColumnNames = ctx.column_name().stream()
-                        .map(ParserUtils::getColumnName)
+                        .map(this::getColumnName)
                         .collect(Collectors.toList());
                 tableEditor.setPrimaryKeyNames(pkColumnNames);
             }
+        }, tableEditor);
         super.exitOut_of_line_constraint(ctx);
     }
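
The two primary-key forms the listener now distinguishes, in a short sketch (table names invented); mixing both forms in a single statement raises a ParsingException:

import io.debezium.connector.oracle.antlr.OracleDdlParser;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;

public class PrimaryKeyFormsDemo {

    public static void main(String[] args) {
        OracleDdlParser parser = new OracleDdlParser();
        parser.setCurrentDatabase("ORCLPDB1");
        parser.setCurrentSchema("DEBEZIUM");

        Tables tables = new Tables();
        // In-line form: the constraint sits on the column definition
        parser.parse("CREATE TABLE ORDERS_A (ID NUMBER(9,0) PRIMARY KEY, TOTAL NUMBER(10,2));", tables);
        // Out-of-line form: a separate clause names the key column(s)
        parser.parse("CREATE TABLE ORDERS_B (ID NUMBER(9,0), TOTAL NUMBER(10,2), PRIMARY KEY (ID));", tables);

        System.out.println(tables.forTable(new TableId("ORCLPDB1", "DEBEZIUM", "ORDERS_A")).primaryKeyColumnNames());
        System.out.println(tables.forTable(new TableId("ORCLPDB1", "DEBEZIUM", "ORDERS_B")).primaryKeyColumnNames());
    }
}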

View File

@@ -5,17 +5,14 @@
  */
 package io.debezium.connector.oracle.antlr.listener;
 
-import static io.debezium.connector.oracle.antlr.listener.ParserUtils.getTableName;
-
 import io.debezium.connector.oracle.antlr.OracleDdlParser;
 import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
-import io.debezium.ddl.parser.oracle.generated.PlSqlParserBaseListener;
 import io.debezium.relational.TableId;
 
 /**
  * This class is parsing Oracle drop table statements.
  */
-public class DropTableParserListener extends PlSqlParserBaseListener {
+public class DropTableParserListener extends BaseParserListener {
 
     private String catalogName;
     private String schemaName;
@@ -29,8 +26,9 @@ public class DropTableParserListener extends PlSqlParserBaseListener {
     @Override
     public void enterDrop_table(final PlSqlParser.Drop_tableContext ctx) {
-        TableId tableId = new TableId(catalogName, schemaName, getTableName(ctx.tableview_name()));
+        TableId tableId = new TableId(catalogName, schemaName, getTableName(ctx.tableview_name().get(0)));
         parser.databaseTables().removeTable(tableId);
+        parser.signalDropTable(tableId, ctx);
         super.enterDrop_table(ctx);
     }
 }

View File

@@ -26,15 +26,6 @@ public class ParserUtils {
     private ParserUtils() {
     }
 
-    /**
-     * This method returns pure column name without quotes
-     * @param ctx column name context
-     * @return column name
-     */
-    public static String getColumnName(final PlSqlParser.Column_nameContext ctx) {
-        return stripeQuotes(ctx.identifier().id_expression().getText());
-    }
-
     /**
      * stripes double quotes that surrounds a string
      * @param text text

View File

@@ -14,11 +14,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.debezium.DebeziumException;
 import io.debezium.connector.oracle.OracleConnection;
 import io.debezium.connector.oracle.OracleConnectorConfig;
 import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningDmlParser;
 import io.debezium.connector.oracle.OracleDatabaseSchema;
 import io.debezium.connector.oracle.OracleOffsetContext;
+import io.debezium.connector.oracle.OracleSchemaChangeEventEmitter;
 import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
 import io.debezium.connector.oracle.OracleValueConverters;
 import io.debezium.connector.oracle.Scn;
@@ -76,13 +78,12 @@ class LogMinerQueryResultProcessor {
         this.clock = clock;
         this.historyRecorder = historyRecorder;
         this.connectorConfig = connectorConfig;
-        this.dmlParser = resolveParser(connectorConfig, jdbcConnection);
+        this.dmlParser = resolveParser(connectorConfig, schema.getValueConverters());
     }
 
-    private static DmlParser resolveParser(OracleConnectorConfig connectorConfig, OracleConnection connection) {
+    private static DmlParser resolveParser(OracleConnectorConfig connectorConfig, OracleValueConverters valueConverters) {
         if (connectorConfig.getLogMiningDmlParser().equals(LogMiningDmlParser.LEGACY)) {
-            OracleValueConverters converter = new OracleValueConverters(connectorConfig, connection);
-            return new SimpleDmlParser(connectorConfig.getCatalogName(), converter);
+            return new SimpleDmlParser(connectorConfig.getCatalogName(), valueConverters);
         }
         return new LogMinerDmlParser();
     }
@@ -163,9 +164,26 @@ int processResult(ResultSet resultSet) {
             // DDL
             if (operationCode == RowMapper.DDL) {
-                // todo: DDL operations are not yet supported during streaming while using LogMiner.
                 historyRecorder.record(scn, tableName, segOwner, operationCode, changeTime, txId, 0, redoSql);
                 LOGGER.info("DDL: {}, REDO_SQL: {}", logMessage, redoSql);
+                try {
+                    assert tableName != null;
+                    final TableId tableId = RowMapper.getTableId(connectorConfig.getCatalogName(), resultSet);
+                    dispatcher.dispatchSchemaChangeEvent(tableId,
+                            new OracleSchemaChangeEventEmitter(
+                                    connectorConfig,
+                                    offsetContext,
+                                    tableId,
+                                    tableId.catalog(),
+                                    tableId.schema(),
+                                    redoSql,
+                                    schema,
+                                    changeTime.toInstant(),
+                                    streamingMetrics));
+                }
+                catch (InterruptedException e) {
+                    throw new DebeziumException("Failed to dispatch DDL event", e);
+                }
                 continue;
             }
@@ -220,8 +238,7 @@ int processResult(ResultSet resultSet) {
                     streamingMetrics.setOldestScn(scn);
                 }
                 offsetContext.setTransactionId(txId);
-                offsetContext.setSourceTime(timestamp.toInstant());
-                offsetContext.setTableId(tableId);
+                offsetContext.tableEvent(tableId, timestamp.toInstant());
                 if (counter == 0) {
                     offsetContext.setCommitScn(commitScn);
                 }


@@ -1,27 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer;
import io.debezium.connector.oracle.BaseOracleSchemaChangeEventEmitter;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDdlEntry;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.TableId;
/**
* {@link SchemaChangeEventEmitter} implementation based on Oracle LogMiner utility.
*/
public class LogMinerSchemaChangeEventEmitter extends BaseOracleSchemaChangeEventEmitter {
public LogMinerSchemaChangeEventEmitter(OracleOffsetContext offsetContext, TableId tableId, LogMinerDdlEntry ddlLcr) {
super(offsetContext,
tableId,
tableId.catalog(), // todo tableId should be enough
tableId.schema(), // todo same here
ddlLcr.getDdlText(),
ddlLcr.getCommandType());
}
}
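With the LogMiner- and XStream-specific emitters both removed, each adapter now constructs the shared OracleSchemaChangeEventEmitter directly. As a recap of the two call sites in this commit (not an additional API), the construction takes this shape; the comments paraphrase what each argument carries and are inferred from the surrounding code:

    new OracleSchemaChangeEventEmitter(
            connectorConfig,    // connector configuration
            offsetContext,      // offsets and source info for the event
            tableId,            // table the DDL applies to
            catalogName,        // source database name (tableId.catalog() at the LogMiner call site)
            schemaName,         // object owner (tableId.schema() at the LogMiner call site)
            ddlText,            // raw DDL / redo SQL to parse
            schema,             // OracleDatabaseSchema to apply the change to
            changeTime,         // Instant the DDL was executed
            streamingMetrics);  // streaming metrics (purpose assumed from the parameter name)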


@@ -241,7 +241,9 @@ static String logMinerContentsQuery(OracleConnectorConfig connectorConfig, Strin
    query.append("SCN > ? AND SCN <= ? ");
    query.append("AND (");
    // MISSING_SCN/DDL only when not performed by excluded users
-   query.append("(OPERATION_CODE IN (5,34) AND USERNAME NOT IN (").append(getExcludedUsers(logMinerUser)).append(")) ");
+   // For DDL, the `INTERNAL DDL%` info rows should be excluded as these are commands executed by the database that
+   // typically perform operations such as renaming a deleted object when dropped if the drop doesn't specify PURGE
+   query.append("(OPERATION_CODE IN (5,34) AND USERNAME NOT IN (").append(getExcludedUsers(logMinerUser)).append(") AND INFO NOT LIKE 'INTERNAL DDL%') ");
    // COMMIT/ROLLBACK
    query.append("OR (OPERATION_CODE IN (7,36)) ");
    // INSERT/UPDATE/DELETE
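Assembled, the DDL portion of the generated WHERE clause now reads roughly as below; this is a sketch pieced together from the appends above, with <excluded users> standing in for the getExcludedUsers(logMinerUser) expansion:

    SCN > ? AND SCN <= ?
    AND (
        (OPERATION_CODE IN (5,34)                  -- MISSING_SCN / DDL
            AND USERNAME NOT IN (<excluded users>)
            AND INFO NOT LIKE 'INTERNAL DDL%')
        OR (OPERATION_CODE IN (7,36))              -- COMMIT / ROLLBACK
        ...                                        -- INSERT/UPDATE/DELETE terms follow
    )

Without the INFO predicate, a DROP TABLE issued without PURGE would surface the database's own recycle-bin rename (logged with an INFO value matching 'INTERNAL DDL%') as a spurious schema change.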


@@ -9,11 +9,14 @@
import org.slf4j.LoggerFactory;

import io.debezium.DebeziumException;
+import io.debezium.connector.oracle.OracleConnectorConfig;
+import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
+import io.debezium.connector.oracle.OracleSchemaChangeEventEmitter;
+import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.xstream.XstreamStreamingChangeEventSource.PositionAndScn;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
-import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;

@@ -34,16 +37,20 @@ class LcrEventHandler implements XStreamLCRCallbackHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(LcrEventHandler.class);

+   private final OracleConnectorConfig connectorConfig;
    private final ErrorHandler errorHandler;
    private final EventDispatcher<TableId> dispatcher;
    private final Clock clock;
-   private final RelationalDatabaseSchema schema;
+   private final OracleDatabaseSchema schema;
    private final OracleOffsetContext offsetContext;
    private final boolean tablenameCaseInsensitive;
    private final XstreamStreamingChangeEventSource eventSource;
+   private final OracleStreamingChangeEventSourceMetrics streamingMetrics;

-   public LcrEventHandler(ErrorHandler errorHandler, EventDispatcher<TableId> dispatcher, Clock clock, RelationalDatabaseSchema schema,
-                          OracleOffsetContext offsetContext, boolean tablenameCaseInsensitive, XstreamStreamingChangeEventSource eventSource) {
+   public LcrEventHandler(OracleConnectorConfig connectorConfig, ErrorHandler errorHandler, EventDispatcher<TableId> dispatcher, Clock clock,
+                          OracleDatabaseSchema schema, OracleOffsetContext offsetContext, boolean tablenameCaseInsensitive,
+                          XstreamStreamingChangeEventSource eventSource, OracleStreamingChangeEventSourceMetrics streamingMetrics) {
+       this.connectorConfig = connectorConfig;
        this.errorHandler = errorHandler;
        this.dispatcher = dispatcher;
        this.clock = clock;

@@ -51,6 +58,7 @@ public LcrEventHandler(ErrorHandler errorHandler, EventDispatcher<TableId> dispa
        this.offsetContext = offsetContext;
        this.tablenameCaseInsensitive = tablenameCaseInsensitive;
        this.eventSource = eventSource;
+       this.streamingMetrics = streamingMetrics;
    }

    @Override

@@ -79,8 +87,7 @@ public void processLCR(LCR lcr) throws StreamsException {
        offsetContext.setScn(lcrPosition.getScn());
        offsetContext.setLcrPosition(lcrPosition.toString());
        offsetContext.setTransactionId(lcr.getTransactionId());
-       offsetContext.setSourceTime(lcr.getSourceTime().timestampValue().toInstant());
-       offsetContext.setTableId(new TableId(lcr.getSourceDatabaseName(), lcr.getObjectOwner(), lcr.getObjectName()));
+       offsetContext.tableEvent(new TableId(lcr.getSourceDatabaseName(), lcr.getObjectOwner(), lcr.getObjectName()), lcr.getSourceTime().timestampValue().toInstant());

        try {
            if (lcr instanceof RowLCR) {

@@ -125,7 +132,16 @@ private void dispatchSchemaChangeEvent(DDLLCR ddlLcr) throws InterruptedExceptio
        dispatcher.dispatchSchemaChangeEvent(
                tableId,
-               new XStreamSchemaChangeEventEmitter(offsetContext, tableId, ddlLcr));
+               new OracleSchemaChangeEventEmitter(
+                       connectorConfig,
+                       offsetContext,
+                       tableId,
+                       ddlLcr.getSourceDatabaseName(),
+                       ddlLcr.getObjectOwner(),
+                       ddlLcr.getDDLText(),
+                       schema,
+                       ddlLcr.getSourceTime().timestampValue().toInstant(),
+                       streamingMetrics));
    }

    private TableId getTableId(LCR lcr) {


@@ -78,6 +78,7 @@ public StreamingChangeEventSource getSource(OracleConnectorConfig connectorConfi
            dispatcher,
            errorHandler,
            clock,
-           schema);
+           schema,
+           streamingMetrics);
    }
}


@@ -1,30 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.xstream;
import io.debezium.connector.oracle.BaseOracleSchemaChangeEventEmitter;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.TableId;
import oracle.streams.DDLLCR;
/**
* {@link SchemaChangeEventEmitter} implementation based on Oracle.
*
* @author Gunnar Morling
*/
public class XStreamSchemaChangeEventEmitter extends BaseOracleSchemaChangeEventEmitter {
public XStreamSchemaChangeEventEmitter(OracleOffsetContext offsetContext, TableId tableId, DDLLCR ddlLcr) {
super(offsetContext,
tableId,
ddlLcr.getSourceDatabaseName(),
ddlLcr.getObjectOwner(),
ddlLcr.getDDLText(),
ddlLcr.getCommandType());
}
}


@@ -17,6 +17,7 @@
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleDatabaseVersion;
import io.debezium.connector.oracle.OracleOffsetContext;
+import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.SourceInfo;
import io.debezium.pipeline.ErrorHandler;

@@ -42,15 +43,12 @@ public class XstreamStreamingChangeEventSource implements StreamingChangeEventSo
    private static final Logger LOGGER = LoggerFactory.getLogger(XstreamStreamingChangeEventSource.class);

    private final OracleConnection jdbcConnection;
-   private final EventDispatcher<TableId> dispatcher;
    private final ErrorHandler errorHandler;
-   private final Clock clock;
-   private final OracleDatabaseSchema schema;
    private final OracleOffsetContext offsetContext;
    private final String xStreamServerName;
    private volatile XStreamOut xsOut;
-   private final boolean tablenameCaseInsensitive;
    private final int posVersion;
+   private final LcrEventHandler eventHandler;

    /**
     * A message box between thread that is informed about committed offsets and the XStream thread.
     * When the last offset is committed its value is passed to the XStream thread and a watermark is

@@ -60,17 +58,19 @@
     */
    private final AtomicReference<PositionAndScn> lcrMessage = new AtomicReference<>();

-   public XstreamStreamingChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext offsetContext, OracleConnection jdbcConnection,
-                                            EventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema schema) {
+   public XstreamStreamingChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext offsetContext,
+                                            OracleConnection jdbcConnection, EventDispatcher<TableId> dispatcher,
+                                            ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema schema,
+                                            OracleStreamingChangeEventSourceMetrics streamingMetrics) {
        this.jdbcConnection = jdbcConnection;
-       this.dispatcher = dispatcher;
        this.errorHandler = errorHandler;
-       this.clock = clock;
-       this.schema = schema;
        this.offsetContext = offsetContext;
        this.xStreamServerName = connectorConfig.getXoutServerName();
-       this.tablenameCaseInsensitive = jdbcConnection.getTablenameCaseInsensitivity(connectorConfig);
        this.posVersion = resolvePosVersion(jdbcConnection, connectorConfig);
+       boolean tableNameCaseInsensitive = jdbcConnection.getTablenameCaseInsensitivity(connectorConfig);
+       this.eventHandler = new LcrEventHandler(connectorConfig, errorHandler, dispatcher, clock, schema, offsetContext, tableNameCaseInsensitive, this,
+               streamingMetrics);
    }

    @Override

@@ -90,13 +90,10 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
        xsOut = XStreamOut.attach((oracle.jdbc.OracleConnection) xsConnection.connection(), xStreamServerName,
                startPosition, 1, 1, XStreamOut.DEFAULT_MODE);

-       LcrEventHandler handler = new LcrEventHandler(errorHandler, dispatcher, clock, schema, offsetContext,
-               this.tablenameCaseInsensitive, this);

        // 2. receive events while running
        while (context.isRunning()) {
            LOGGER.trace("Receiving LCR");
-           xsOut.receiveLCRCallback(handler, XStreamOut.DEFAULT_MODE);
+           xsOut.receiveLCRCallback(eventHandler, XStreamOut.DEFAULT_MODE);
        }
    }
    finally {


@@ -63,6 +63,7 @@ public static void closeConnection() throws SQLException {
    if (connection != null) {
        TestHelper.dropTable(connection, "debezium.table1");
        TestHelper.dropTable(connection, "debezium.table2");
+       TestHelper.dropTable(connection, "debezium.table3");
        connection.close();
    }
}


@@ -41,7 +41,6 @@
import io.debezium.connector.oracle.junit.RequireDatabaseOption;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipTestDependingOnDatabaseOptionRule;
-import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIs;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.Envelope;

@@ -134,6 +133,7 @@ public static void beforeClass() throws SQLException {
    @AfterClass
    public static void closeConnection() throws SQLException {
        if (connection != null) {
+           TestHelper.dropTable(connection, "debezium.customer2");
            TestHelper.dropTable(connection, "customer");
            TestHelper.dropTable(connection, "masked_hashed_column_table");
            TestHelper.dropTable(connection, "truncated_column_table");

@@ -144,6 +144,8 @@ public static void closeConnection() throws SQLException {
    @Before
    public void before() throws SQLException {
+       TestHelper.dropTable(connection, "debezium.dbz800a");
+       TestHelper.dropTable(connection, "debezium.dbz800b");
        connection.execute("delete from debezium.customer");
        connection.execute("delete from debezium.masked_hashed_column_table");
        connection.execute("delete from debezium.truncated_column_table");

@@ -334,7 +336,6 @@ private void continueStreamingAfterSnapshot(Configuration config) throws Excepti
    @Test
    @FixFor("DBZ-1223")
-   @SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "sendTxBatch randomly fails")
    public void shouldStreamTransaction() throws Exception {
        Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")

@@ -441,7 +442,6 @@ private void assertTxBatch(Configuration config, int expectedRecordCount, int of
    }

    @Test
-   @SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "Test randomly fails in sendTxBatch")
    public void shouldStreamAfterRestart() throws Exception {
        Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")

@@ -657,7 +657,6 @@ public void deleteWithoutTombstone() throws Exception {
    }

    @Test
-   @SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "LogMiner does not yet support DDL during streaming")
    public void shouldReadChangeStreamForTableCreatedWhileStreaming() throws Exception {
        TestHelper.dropTable(connection, "debezium.customer2");

@@ -679,7 +678,7 @@ public void shouldReadChangeStreamForTableCreatedWhileStreaming() throws Excepti
                ")";
        connection.execute(ddl);
-       connection.execute("GRANT SELECT ON debezium.customer2 to " + TestHelper.getConnectorUserName());
+       TestHelper.streamTable(connection, "debezium.customer2");

        connection.execute("INSERT INTO debezium.customer2 VALUES (2, 'Billie-Bob', 1234.56, TO_DATE('2018/02/22', 'yyyy-mm-dd'))");
        connection.execute("COMMIT");

@@ -699,7 +698,6 @@ public void shouldReadChangeStreamForTableCreatedWhileStreaming() throws Excepti
    @Test
    @FixFor("DBZ-800")
-   @SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "LogMiner does not yet support DDL during streaming")
    public void shouldReceiveHeartbeatAlsoWhenChangingTableIncludeListTables() throws Exception {
        TestHelper.dropTable(connection, "debezium.dbz800a");
        TestHelper.dropTable(connection, "debezium.dbz800b");

@@ -725,6 +723,7 @@ public void shouldReceiveHeartbeatAlsoWhenChangingTableIncludeListTables() throw
        // expecting two heartbeat records and one actual change record
        List<SourceRecord> records = consumeRecordsByTopic(3).allRecordsInOrder();

+       if (TestHelper.adapter().equals(OracleConnectorConfig.ConnectorAdapter.XSTREAM)) {
            // expecting no change record for s1.a but a heartbeat
            verifyHeartbeatRecord(records.get(0));

@@ -732,6 +731,14 @@ public void shouldReceiveHeartbeatAlsoWhenChangingTableIncludeListTables() throw
            verifyHeartbeatRecord(records.get(1));
            VerifyRecord.isValidInsert(records.get(2), "ID", 2);
        }
+       else {
+           // Unlike Xstream, LogMiner's query will exclude the insert for dbz800a and
+           // so we won't actually emit a Heartbeat for that record at all but we will
+           // instead emit one when we detect dbz800b only.
+           verifyHeartbeatRecord(records.get(0));
+           VerifyRecord.isValidInsert(records.get(1), "ID", 2);
+       }
+   }

    @Test
    @FixFor("DBZ-775")


@@ -0,0 +1,1152 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import static org.fest.assertions.Assertions.assertThat;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.TableId;
import io.debezium.util.Testing;
/**
* Integration tests for the Oracle DDL and schema migration.
*
* @author Chris Cranford
*/
public class OracleSchemaMigrationIT extends AbstractConnectorTest {
private OracleConnection connection;
@Before
public void beforeEach() throws Exception {
connection = TestHelper.testConnection();
setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
TestHelper.dropTables(connection, "debezium_signal", "tablea", "tableb", "\"tableC\"");
}
@After
public void afterEach() throws Exception {
if (connection != null) {
TestHelper.dropTables(connection, "tablea", "tableb", "\"tableC\"");
connection.close();
}
}
@Test
@FixFor("DBZ-2916")
public void shouldStreamNewlyCreatedNotFilteredTable() throws Exception {
createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Snapshot should have generated 1 schema change event for tablea
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
SourceRecord record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertSnapshotSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
List<Struct> tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
createTable("debezium.tableb", "CREATE TABLE debezium.tableb (ID numeric(9,0) primary key, data varchar2(50))");
// Streaming should have generated 2 schema change events for tableb: create & alter
// Connector does not apply any filters, so tableb should be included
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(2);
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertStreamingSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEB");
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEB");
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(1);
assertStreamingSchemaChange(record);
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEB");
// Insert record into tableb and verify it is streamed
connection.execute("INSERT INTO debezium.tableb (ID,DATA) values (1, 'Test')");
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEB"))).hasSize(1);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEB")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEB");
}
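// Why the CREATE above yields two streaming schema changes: the CREATE TABLE itself
// accounts for the first, and the second (ALTER) presumably comes from the
// supplemental-logging statement the createTable helper issues for newly captured
// tables (compare TestHelper.streamTable for customer2 elsewhere in this commit),
// e.g. (illustrative syntax, not the test's literal statement):
//
//     ALTER TABLE debezium.tableb ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;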
@Test
@FixFor("DBZ-2916")
public void shouldNotStreamNewlyCreatedTableDueToFilters() throws Exception {
createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.TABLEA")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Snapshot should have generated 1 schema change event for tablea
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
SourceRecord record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertSnapshotSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
List<Struct> tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
createTable("debezium.tableb", "CREATE TABLE debezium.tableb (ID numeric(9,0) primary key, data varchar2(50))");
// While tableb is created during streaming, it should not generate any schema changes
// because the filters explicitly only capture changes for tablea, not tableb.
assertNoRecordsToConsume();
// Insert a record into tableb
// This record won't be captured due to the configuration filters
connection.execute("INSERT INTO debezium.tableb (ID,DATA) values (1, 'B')");
assertNoRecordsToConsume();
// Insert a record into tablea
// This record should be captured as this table is included in the filters
connection.execute("INSERT INTO debezium.tablea (ID,DATA) values (1, 'A')");
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertThat(((Struct) record.value()).getStruct("after").get("DATA")).isEqualTo("A");
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
}
@Test
@FixFor("DBZ-2916")
public void shouldStreamAlterTableAddColumnSchemaChange() throws Exception {
createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Snapshot should have generated 1 schema change event for tablea
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
SourceRecord record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertSnapshotSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
List<Struct> tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Insert a record into tablea
connection.execute("INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')");
// The record should be emitted based on snapshot schema
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
// Alter tablea and add a single column using single column syntax
// Then insert a new record immediately afterward into tablea
connection.executeWithoutCommitting("ALTER TABLE debezium.tablea ADD data2 numeric");
connection.execute("INSERT INTO debezium.tablea (ID,DATA,DATA2) values (2, 'Test2', 100)");
// There should be 2 records generated, 1 schema change and 1 insert
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
// Verify schema change
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertStreamingSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEA");
// Verify insert
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 2);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(3);
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get("DATA")).isEqualTo("Test2");
assertThat(after.get("DATA2")).isEqualTo(BigDecimal.valueOf(100));
}
@Test
@FixFor("DBZ-2916")
public void shouldStreamAlterTableAddMultipleColumnsSchemaChange() throws Exception {
createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Snapshot should have generated 1 schema change event for tablea
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
SourceRecord record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertSnapshotSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
List<Struct> tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Insert a record into tablea
connection.execute("INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')");
// The record should be emitted based on snapshot schema
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
// Alter tablea and add multiple columns using multi-column syntax
// Then insert a new record immediately afterward into tablea
connection.executeWithoutCommitting("ALTER TABLE debezium.tablea ADD (data2 numeric, data3 varchar2(25))");
connection.execute("INSERT INTO debezium.tablea (ID,DATA,DATA2,DATA3) values (2, 'Test2', 100, 'a')");
// There should be 2 records generated, 1 schema change and 1 insert
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
// Verify schema change
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertStreamingSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEA");
// Verify insert
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 2);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(4);
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get("DATA")).isEqualTo("Test2");
assertThat(after.get("DATA2")).isEqualTo(BigDecimal.valueOf(100));
assertThat(after.get("DATA3")).isEqualTo("a");
}
@Test
@FixFor("DBZ-2916")
public void shouldStreamAlterTableRenameColumnSchemaChange() throws Exception {
createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Snapshot should have generated 1 schema change event for tablea
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
SourceRecord record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertSnapshotSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
List<Struct> tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Insert a record into tablea
connection.execute("INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')");
// The record should be emitted based on snapshot schema
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
// Alter tablea and rename column
// Then insert a new record immediately afterward into tablea
connection.executeWithoutCommitting("ALTER TABLE debezium.tablea RENAME COLUMN data TO data1");
connection.execute("INSERT INTO debezium.tablea (ID,DATA1) values (2, 'Test2')");
// There should be 2 records generated, 1 schema change and 1 insert
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
// Verify schema change
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertStreamingSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEA");
// Verify insert
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 2);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.schema().field("DATA")).isNull();
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get("DATA1")).isEqualTo("Test2");
}
@Test
@FixFor("DBZ-2916")
public void shouldStreamAlterTableDropColumnSchemaChange() throws Exception {
createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Snapshot should have generated 1 schema change event for tablea
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
SourceRecord record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertSnapshotSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
List<Struct> tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Insert a record into tablea
connection.execute("INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')");
// The record should be emitted based on snapshot schema
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
// Alter tablea and drop column
// Then insert a new record immediately afterward into tablea
connection.executeWithoutCommitting("ALTER TABLE debezium.tablea DROP COLUMN data");
connection.execute("INSERT INTO debezium.tablea (ID) values (2)");
// There should be 2 records generated, 1 schema change and 1 insert
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
// Verify schema change
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertStreamingSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEA");
// Verify insert
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 2);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(1);
assertThat(after.schema().field("DATA")).isNull();
assertThat(after.get("ID")).isEqualTo(2);
}
@Test
@FixFor("DBZ-2916")
public void shouldStreamAlterTableDropMultipleColumnsSchemaChange() throws Exception {
createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data1 varchar2(50), data2 numeric)");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Snapshot should have generated 1 schema change event for tablea
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
SourceRecord record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertSnapshotSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
List<Struct> tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Insert a record into tablea
connection.execute("INSERT INTO debezium.tablea (ID,DATA1,DATA2) values (1, 'Test', 100)");
// The record should be emitted based on snapshot schema
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(3);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA1")).isEqualTo("Test");
assertThat(after.get("DATA2")).isEqualTo(BigDecimal.valueOf(100));
// Alter tablea and drop multiple columns
// Then insert a new record immediately afterward into tablea
connection.executeWithoutCommitting("ALTER TABLE debezium.tablea DROP (data1, data2)");
connection.execute("INSERT INTO debezium.tablea (ID) values (2)");
// There should be 2 records generated, 1 schema change and 1 insert
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
// Verify schema change
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertStreamingSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEA");
// Verify insert
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 2);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(1);
assertThat(after.schema().field("DATA1")).isNull();
assertThat(after.schema().field("DATA2")).isNull();
assertThat(after.get("ID")).isEqualTo(2);
}
@Test
@FixFor("DBZ-2916")
public void shouldStreamAlterTableRenameTableSchemaChange() throws Exception {
createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Snapshot should have generated 1 schema change event for tablea
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
SourceRecord record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertSnapshotSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
List<Struct> tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Insert a record into tablea
connection.execute("INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')");
// The record should be emitted based on snapshot schema
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
// Alter tablea and rename to tableb
// Then insert a new record immediately afterward into the renamed tableb
connection.executeWithoutCommitting("ALTER TABLE debezium.tablea RENAME TO tableb");
connection.execute("INSERT INTO debezium.tableb (ID,DATA) values (2, 'Test2')");
// There should be 2 records generated, 1 schema change and 1 insert
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEB"))).hasSize(1);
// Verify schema change
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertStreamingSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEB,TABLEA");
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEB");
// Verify insert
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEB")).get(0);
VerifyRecord.isValidInsert(record, "ID", 2);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEB");
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get("DATA")).isEqualTo("Test2");
}
@Test
@FixFor("DBZ-2916")
public void shouldNotStreamAfterTableRenameToExcludedName() throws Exception {
createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.TABLEA")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Snapshot should have generated 1 schema change event for tablea
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
SourceRecord record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertSnapshotSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
List<Struct> tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Insert a record into tablea
connection.execute("INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')");
// The record should be emitted based on snapshot schema
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
// Alter tablea and rename to tableb
// Then insert a new record immediately afterward into the renamed tableb
connection.executeWithoutCommitting("ALTER TABLE debezium.tablea RENAME TO tableb");
connection.execute("INSERT INTO debezium.tableb (ID,DATA) values (2, 'Test2')");
// There should be 1 record generated: a schema change with an empty tableChanges list
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
// Verify schema change contains no changes
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertStreamingSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEB,TABLEA");
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).isEmpty();
// There should be no other events to consume
assertNoRecordsToConsume();
}
@Test
@FixFor("DBZ-2916")
public void shouldStreamAlterTableChangeColumnDataType() throws Exception {
createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data numeric)");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Snapshot should have generated 1 schema change event for tablea
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
SourceRecord record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertSnapshotSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
List<Struct> tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Alter table, change data from numeric to varchar2(50) then insert new record
connection.execute("ALTER TABLE debezium.tablea modify (data varchar2(50))");
connection.execute("INSERT INTO debezium.tablea (id, data) values (1, 'Test')");
// There should be 2 records generated, 1 schema change and 1 insert
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
// Verify schema change
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertStreamingSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEA");
// Verify insert
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
}
@Test
@FixFor("DBZ-2916")
public void shouldStreamAlterTableChangeColumnNullability() throws Exception {
createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50) not null)");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Snapshot should have generated 1 schema change event for tablea
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
SourceRecord record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertSnapshotSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
List<Struct> tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Insert a record into tablea
connection.execute("INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')");
// The record should be emitted based on snapshot schema
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
Schema dataSchema = after.schema().field("DATA").schema();
assertThat(dataSchema.isOptional()).isFalse();
// Alter table, change data from not null to null
connection.execute("ALTER TABLE debezium.tablea modify (data varchar2(50) null)");
connection.execute("INSERT INTO debezium.tablea (id) values (2)");
// There should be 2 records generated, 1 schema change and 1 insert
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
// Verify schema change
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertStreamingSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEA");
// Verify insert
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 2);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get("DATA")).isNull();
dataSchema = after.schema().field("DATA").schema();
assertThat(dataSchema.isOptional()).isTrue();
}
@Test
@FixFor("DBZ-2916")
public void shouldStreamAlterTableChangeColumnPrecisionAndScale() throws Exception {
createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data numeric(8,2) not null)");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Snapshot should have generated 1 schema change event for tablea
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
SourceRecord record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertSnapshotSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
List<Struct> tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Insert a record into tablea
connection.execute("INSERT INTO debezium.tablea (ID, DATA) values (1, 12345.67)");
// The record should be emitted based on the snapshot schema
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo(BigDecimal.valueOf(12345.67));
// Verify column schema definition
Schema dataSchema = after.schema().field("DATA").schema();
assertThat(dataSchema.parameters().get("scale")).isEqualTo("2");
assertThat(dataSchema.parameters().get("connect.decimal.precision")).isEqualTo("8");
// Alter table, change the DATA column's precision and scale
connection.execute("ALTER TABLE debezium.tablea modify (data numeric(10,3))");
connection.execute("INSERT INTO debezium.tablea (id, data) values (2, 234567.891)");
// There should be 2 records generated, 1 schema change and 1 insert
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
// Verify schema change
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertStreamingSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEA");
// Verify insert
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 2);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get("DATA")).isEqualTo(BigDecimal.valueOf(234567.891));
// Verify DATA column schema definition
dataSchema = after.schema().field("DATA").schema();
assertThat(dataSchema.parameters().get("scale")).isEqualTo("3");
assertThat(dataSchema.parameters().get("connect.decimal.precision")).isEqualTo("10");
}
@Test
@FixFor("DBZ-2916")
public void shouldStreamDropTable() throws Exception {
createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Snapshot should have generated 1 schema change event for tablea
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
SourceRecord record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertSnapshotSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
List<Struct> tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Insert a record into tablea
connection.execute("INSERT INTO debezium.tablea (id, data) values (1, 'Test')");
// The record should be emitted based on the snapshot schema
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
// Drop the table
connection.execute("DROP TABLE debezium.tablea");
// Should emit a single schema change event
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertStreamingSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "DROP", "DEBEZIUM", "TABLEA");
// Should be no more events
assertNoRecordsToConsume();
}
@Test
@FixFor("DBZ-2916")
public void shouldSnapshotAndStreamSchemaChangesUsingExplicitCasedNames() throws Exception {
createTable("debezium.\"tableC\"", "CREATE TABLE debezium.\"tableC\" (\"id\" numeric(9,0) primary key, \"data\" varchar2(50))");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Snapshot should have generated 1 schema change event for "tableC"
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
SourceRecord record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertSnapshotSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "tableC");
List<Struct> tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "tableC");
assertTableChangePrimaryKeyNames(tableChanges.get(0), "id");
assertTableChangeColumn(tableChanges.get(0), 0, "id");
assertTableChangeColumn(tableChanges.get(0), 1, "data");
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("ALTER TABLE debezium.\"tableC\" add \"data2\" number(9,0)");
// Should generate 1 schema change for tableC
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertStreamingSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "tableC");
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "tableC");
assertTableChangePrimaryKeyNames(tableChanges.get(0), "id");
assertTableChangeColumn(tableChanges.get(0), 0, "id");
assertTableChangeColumn(tableChanges.get(0), 1, "data");
assertTableChangeColumn(tableChanges.get(0), 2, "data2");
connection.execute("ALTER TABLE debezium.\"tableC\" add (\"data3\" number(9,0), \"data4\" varchar2(25))");
// Should generate 1 schema change for tableC
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertStreamingSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "tableC");
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "tableC");
assertTableChangePrimaryKeyNames(tableChanges.get(0), "id");
assertTableChangeColumn(tableChanges.get(0), 0, "id");
assertTableChangeColumn(tableChanges.get(0), 1, "data");
assertTableChangeColumn(tableChanges.get(0), 2, "data2");
assertTableChangeColumn(tableChanges.get(0), 3, "data3");
assertTableChangeColumn(tableChanges.get(0), 4, "data4");
connection.execute("ALTER TABLE debezium.\"tableC\" drop column \"data3\"");
// Should generate 1 schema change for tableC
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertStreamingSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "tableC");
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "tableC");
assertTableChangePrimaryKeyNames(tableChanges.get(0), "id");
assertTableChangeColumn(tableChanges.get(0), 0, "id");
assertTableChangeColumn(tableChanges.get(0), 1, "data");
assertTableChangeColumn(tableChanges.get(0), 2, "data2");
assertTableChangeColumn(tableChanges.get(0), 3, "data4");
connection.execute("ALTER TABLE debezium.\"tableC\" rename column \"data4\" to \"Data3\"");
// Should generate 1 schema change for tableC
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertStreamingSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "tableC");
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "tableC");
assertTableChangePrimaryKeyNames(tableChanges.get(0), "id");
assertTableChangeColumn(tableChanges.get(0), 0, "id");
assertTableChangeColumn(tableChanges.get(0), 1, "data");
assertTableChangeColumn(tableChanges.get(0), 2, "data2");
assertTableChangeColumn(tableChanges.get(0), 3, "Data3");
connection.execute("DROP TABLE debezium.\"tableC\"");
// Should generate 1 schema change for tableC
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
assertStreamingSchemaChange(record);
assertSourceTableInfo(record, "DEBEZIUM", "tableC");
tableChanges = ((Struct) record.value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "DROP", "DEBEZIUM", "tableC");
assertTableChangePrimaryKeyNames(tableChanges.get(0), "id");
assertTableChangeColumn(tableChanges.get(0), 0, "id");
assertTableChangeColumn(tableChanges.get(0), 1, "data");
assertTableChangeColumn(tableChanges.get(0), 2, "data2");
assertTableChangeColumn(tableChanges.get(0), 3, "Data3");
}
@Test
@FixFor("DBZ-2916")
public void shouldNotEmitDdlEventsForNonTableObjects() throws Exception {
try {
final LogInterceptor logInterceptor = new LogInterceptor();
// These roles are needed in order to perform certain DDL operations below.
// Any roles granted here should be revoked in the finally block.
TestHelper.grantRole("CREATE PROCEDURE");
TestHelper.grantRole("ALTER ANY PROCEDURE");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// todo: do we need to add more here?
final int expected = 7;
connection.execute("CREATE OR REPLACE FUNCTION mytestf() return number is x number(11,2); begin return x; END;");
connection.execute("DROP FUNCTION mytestf");
connection.execute("CREATE OR REPLACE PROCEDURE mytest() BEGIN select * from dual; END;");
connection.execute("DROP PROCEDURE mytest");
connection.execute("CREATE OR REPLACE PACKAGE pkgtest as function hire return number; END;");
connection.execute("CREATE OR REPLACE PACKAGE BODY pkgtest as function hire return number; begin return 0; end; END;");
connection.execute("DROP PACKAGE pkgtest");
try {
Awaitility.await()
        .atMost(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS)
        .until(() -> logInterceptor.countOccurrences("Processing DDL event ") == expected);
stopConnector();
waitForConnectorShutdown(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
}
catch (ConditionTimeoutException e) {
assertThat(logInterceptor.countOccurrences("Processing DDL event ")).as("Did not get all expected DDL events").isEqualTo(expected);
}
// Make sure there are no events to process and that no DDL exceptions were logged
assertThat(logInterceptor.containsMessage("Producer failure")).as("Connector failure").isFalse();
assertNoRecordsToConsume();
}
finally {
TestHelper.revokeRole("ALTER ANY PROCEDURE");
TestHelper.revokeRole("CREATE PROCEDURE");
}
}
private static String getTableIdString(String schemaName, String tableName) {
return new TableId(TestHelper.getDatabaseName(), schemaName, tableName).toDoubleQuotedString();
}
private void createTable(String tableName, String sql) throws SQLException {
connection.execute(sql);
TestHelper.streamTable(connection, tableName);
}
private static void assertSnapshotSchemaChange(SourceRecord record) {
assertThat(record.topic()).isEqualTo(TestHelper.SERVER_NAME);
assertThat(((Struct) record.key()).getString("databaseName")).isEqualTo(TestHelper.getDatabaseName());
assertThat(record.sourceOffset().get("snapshot")).isEqualTo(true);
assertThat(((Struct) record.value()).getStruct("source").getString("snapshot")).isEqualTo("true");
}
private static void assertStreamingSchemaChange(SourceRecord record) {
assertThat(record.topic()).isEqualTo(TestHelper.SERVER_NAME);
assertThat(((Struct) record.key()).getString("databaseName")).isEqualTo(TestHelper.getDatabaseName());
assertThat(record.sourceOffset().get("snapshot")).isNull();
assertThat(((Struct) record.value()).getStruct("source").getString("snapshot")).isNull();
}
private static void assertTableChange(Struct change, String type, String schema, String table) {
assertThat(change.get("type")).isEqualTo(type);
assertThat(change.get("id")).isEqualTo(getTableIdString(schema, table));
}
private static void assertTableChangePrimaryKeyNames(Struct change, String... names) {
assertThat(change.getStruct("table").getArray("primaryKeyColumnNames")).isEqualTo(Arrays.asList(names));
}
private static void assertTableChangeColumn(Struct change, int index, String columnName) {
List<Struct> columns = change.getStruct("table").getArray("columns");
assertThat(columns.size()).isGreaterThan(index);
Struct column = columns.get(index);
assertThat(column.get("name")).isEqualTo(columnName);
}
private static void assertSourceTableInfo(SourceRecord record, String schema, String table) {
final Struct source = ((Struct) record.value()).getStruct("source");
assertThat(source.get("db")).isEqualTo(TestHelper.DATABASE);
assertThat(source.get("schema")).isEqualTo(schema);
assertThat(source.get("table")).isEqualTo(table);
}
private static String topicName(String schema, String table) {
return TestHelper.SERVER_NAME + "." + schema + "." + table;
}
}

View File

@@ -108,13 +108,13 @@ public void signalSchemaChange() throws Exception {
 connection.execute("INSERT INTO debezium.customer VALUES (2, 'Battle-Bug', 1234.56, TO_DATE('2018/02/22', 'yyyy-mm-dd'))");
-// two schema changes, one data record, one signal record, one schema change, one data record
+// two schema changes, one data record, two schema changes (alters), one signal record, one schema change, one data record
-final int expected = 2 + 1 + 1 + 1 + 1;
+final int expected = 2 + 1 + 2 + 1 + 1 + 1;
 List<SourceRecord> records = consumeRecordsByTopic(expected).allRecordsInOrder();
 assertThat(records).hasSize(expected);
 final SourceRecord pre = records.get(0);
-final SourceRecord post = records.get(5);
+final SourceRecord post = records.get(7);
 Assertions.assertThat(((Struct) pre.key()).schema().fields()).hasSize(1);

View File

@@ -32,7 +32,7 @@ public void beforeEach() {
 .build());
 source = new SourceInfo(connectorConfig);
 source.setSourceTime(Instant.now());
-source.setTableId(new TableId("c", "s", "t"));
+source.tableEvent(new TableId("c", "s", "t"));
 }
 @Test

View File

@@ -71,7 +71,9 @@ public class OracleDmlParserTest {
 public void setUp() {
 OracleValueConverters converters = new OracleValueConverters(new OracleConnectorConfig(TestHelper.defaultConfig().build()), null);
-ddlParser = new OracleDdlParser(true, CATALOG_NAME, SCHEMA_NAME);
+ddlParser = new OracleDdlParser();
+ddlParser.setCurrentSchema(SCHEMA_NAME);
+ddlParser.setCurrentDatabase(CATALOG_NAME);
 sqlDmlParser = new SimpleDmlParser(CATALOG_NAME, converters);
 tables = new Tables();

View File

@@ -35,7 +35,7 @@ public class SqlUtilsTest {
 private static final String LOG_MINER_CONTENT_QUERY_TEMPLATE = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, " +
 "XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME " +
 "FROM V$LOGMNR_CONTENTS WHERE SCN > ? AND SCN <= ? AND ((" +
-"OPERATION_CODE IN (5,34) AND USERNAME NOT IN ('SYS','SYSTEM','${user}')) " +
+"OPERATION_CODE IN (5,34) AND USERNAME NOT IN ('SYS','SYSTEM','${user}') AND INFO NOT LIKE 'INTERNAL DDL%') " +
 "OR (OPERATION_CODE IN (7,36)) " +
 "OR (OPERATION_CODE IN (1,2,3) " +
 "AND TABLE_NAME != '" + SqlUtils.LOGMNR_FLUSH_TABLE + "' " +

View File

@@ -57,7 +57,9 @@ public class ValueHolderTest {
 @Before
 public void setUp() {
 OracleValueConverters converters = new OracleValueConverters(new OracleConnectorConfig(TestHelper.defaultConfig().build()), null);
-ddlParser = new OracleDdlParser(true, CATALOG_NAME, SCHEMA_NAME);
+ddlParser = new OracleDdlParser();
+ddlParser.setCurrentSchema(SCHEMA_NAME);
+ddlParser.setCurrentDatabase(CATALOG_NAME);
 sqlDmlParser = new SimpleDmlParser(CATALOG_NAME, converters);
 tables = new Tables();
 }

View File

@@ -301,6 +301,12 @@ public static void dropTable(OracleConnection connection, String table) {
 }
 }
+public static void dropTables(OracleConnection connection, String... tables) {
+    for (String table : tables) {
+        dropTable(connection, table);
+    }
+}
 /**
  * Enables a given table to be streamed by Oracle.
  *
@@ -329,6 +335,50 @@ public static void purgeRecycleBin(OracleConnection connection) {
 }
 }
+/**
+ * Grants the specified role to the {@link TestHelper#SCHEMA_USER} or the user configured using the
+ * configuration option {@code database.user}, whichever has precedence. If the configuration uses
+ * a PDB, the grant will be performed in the PDB rather than in the CDB.
+ *
+ * @param roleName role to be granted
+ * @throws RuntimeException if the role cannot be granted
+ */
+public static void grantRole(String roleName) {
+    final String pdbName = defaultConfig().build().getString(OracleConnectorConfig.PDB_NAME);
+    final String userName = testJdbcConfig().getString(JdbcConfiguration.USER);
+    try (OracleConnection connection = adminConnection()) {
+        if (pdbName != null) {
+            connection.setSessionToPdb(pdbName);
+        }
+        connection.execute("GRANT " + roleName + " TO " + userName);
+    }
+    catch (SQLException e) {
+        throw new RuntimeException("Failed to grant role '" + roleName + "' for user " + userName, e);
+    }
+}
+/**
+ * Revokes the specified role from the {@link TestHelper#SCHEMA_USER} or the user configured using
+ * the configuration option {@code database.user}, whichever has precedence. If the configuration
+ * uses a PDB, the revoke will be performed in the PDB rather than in the CDB.
+ *
+ * @param roleName role to be revoked
+ * @throws RuntimeException if the role cannot be revoked
+ */
+public static void revokeRole(String roleName) {
+    final String pdbName = defaultConfig().build().getString(OracleConnectorConfig.PDB_NAME);
+    final String userName = testJdbcConfig().getString(JdbcConfiguration.USER);
+    try (OracleConnection connection = adminConnection()) {
+        if (pdbName != null) {
+            connection.setSessionToPdb(pdbName);
+        }
+        connection.execute("REVOKE " + roleName + " FROM " + userName);
+    }
+    catch (SQLException e) {
+        throw new RuntimeException("Failed to revoke role '" + roleName + "' for user " + userName, e);
+    }
+}
 public static int defaultMessageConsumerPollTimeout() {
     return 120;
 }

View File

@@ -65,6 +65,10 @@ public boolean requiresLayout() {
 return false;
 }
+public long countOccurrences(String text) {
+    return events.stream().filter(e -> e.getMessage().toString().contains(text)).count();
+}
 public boolean containsMessage(String text) {
 for (LoggingEvent event : events) {
 if (event.getMessage().toString().contains(text)) {

View File

@@ -119,7 +119,7 @@ alter_function
 ;
 create_function_body
-    : CREATE (OR REPLACE)? FUNCTION function_name ('(' parameter (',' parameter)* ')')?
+    : CREATE (OR REPLACE)? FUNCTION function_name parameters_clause
      RETURN type_spec (invoker_rights_clause | parallel_enable_clause | result_cache_clause | DETERMINISTIC)*
      ((PIPELINED? (IS | AS) (DECLARE? seq_of_declare_specs? body | call_spec)) | (PIPELINED | AGGREGATE) USING implementation_type_name) ';'
 ;
@@ -178,11 +178,11 @@ package_obj_spec
 ;
 procedure_spec
-    : PROCEDURE identifier ('(' parameter ( ',' parameter )* ')')? ';'
+    : PROCEDURE identifier parameters_clause ';'
 ;
 function_spec
-    : FUNCTION identifier ('(' parameter ( ',' parameter)* ')')?
+    : FUNCTION identifier parameters_clause
      RETURN type_spec PIPELINED? DETERMINISTIC? (RESULT_CACHE)? ';'
 ;
@@ -209,19 +209,19 @@ alter_procedure
 ;
 function_body
-    : FUNCTION identifier ('(' parameter (',' parameter)* ')')?
+    : FUNCTION identifier parameters_clause
      RETURN type_spec (invoker_rights_clause | parallel_enable_clause | result_cache_clause | DETERMINISTIC)*
      ((PIPELINED? DETERMINISTIC? (IS | AS) (DECLARE? seq_of_declare_specs? body | call_spec)) | (PIPELINED | AGGREGATE) USING implementation_type_name) ';'
 ;
 procedure_body
-    : PROCEDURE identifier ('(' parameter (',' parameter)* ')')? (IS | AS)
+    : PROCEDURE identifier parameters_clause (IS | AS)?
      (DECLARE? seq_of_declare_specs? body | call_spec | EXTERNAL) ';'
 ;
 create_procedure_body
-    : CREATE (OR REPLACE)? PROCEDURE procedure_name ('(' parameter (',' parameter)* ')')?
-      invoker_rights_clause? (IS | AS)
+    : CREATE (OR REPLACE)? PROCEDURE procedure_name parameters_clause
+      invoker_rights_clause? (IS | AS)?
      (DECLARE? seq_of_declare_specs? body | call_spec | EXTERNAL) ';'
 ;
@@ -2140,7 +2140,7 @@ truncate_table
 ;
 drop_table
-    : DROP TABLE tableview_name PURGE? SEMICOLON
+    : DROP TABLE tableview_name (AS tableview_name)? PURGE? SEMICOLON
 ;
 drop_view
@@ -2659,7 +2659,8 @@ modify_col_substitutable
 ;
 add_column_clause
-    : ADD ('(' (column_definition | virtual_column_definition) (',' (column_definition
+    : ADD column_definition | virtual_column_definition
+    | ADD ('(' (column_definition | virtual_column_definition) (',' (column_definition
                | virtual_column_definition)
           )*
      ')'
@@ -2943,6 +2944,11 @@ c_parameters_clause
     : PARAMETERS '(' (expressions | '.' '.' '.') ')'
 ;
+parameters_clause
+    : ('(' parameter (',' parameter)* ')')?
+    | ('(' ')')?
+    ;
 parameter
     : parameter_name (IN | OUT | INOUT | NOCOPY)* type_spec? default_value_part?
 ;
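The new `parameters_clause` rule is what lets PL/SQL objects declared with an empty parameter list, such as the `mytest()` procedure created in `shouldNotEmitDdlEventsForNonTableObjects` above, pass through the parser. A minimal sketch of driving the revised parser directly, assuming the catalog and schema names below as placeholders:

    import io.debezium.connector.oracle.antlr.OracleDdlParser;
    import io.debezium.relational.Tables;

    public class ParametersClauseSketch {
        public static void main(String[] args) {
            OracleDdlParser parser = new OracleDdlParser();
            parser.setCurrentDatabase("ORCLPDB1"); // placeholder catalog name
            parser.setCurrentSchema("DEBEZIUM");   // placeholder schema name

            Tables tables = new Tables();
            // Empty parentheses are now matched by parameters_clause rather than
            // failing against the old '(' parameter (',' parameter)* ')' form.
            parser.parse("CREATE OR REPLACE PROCEDURE mytest() BEGIN select * from dual; END;", tables);

            // Non-table DDL yields no table changes, so nothing is drained here.
            System.out.println("Changed tables: " + tables.drainChanges());
        }
    }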

View File

@@ -1284,6 +1284,12 @@ Only alphanumeric characters and underscores must be used.
 |
 |A list of host/port pairs that the connector will use for establishing an initial connection to the Kafka cluster. This connection will be used for retrieving database schema history previously stored by the connector, and for writing each DDL statement read from the source database. This should point to the same Kafka cluster used by the Kafka Connect process.
+|[[oracle-property-database-history-skip-unparseable-ddl]]<<oracle-property-database-history-skip-unparseable-ddl, `+database.history.skip.unparseable.ddl+`>>
+|`false`
+|A Boolean value that specifies whether the connector should ignore malformed or unknown DDL statements, or stop processing so a human can fix the issue.
+The safe default is `false`.
+Skipping should be used only with care, as it can lead to data loss or mangling when the redo logs are being processed.
 |[[oracle-property-snapshot-mode]]<<oracle-property-snapshot-mode, `+snapshot.mode+`>>
 |_initial_
 |A mode for taking an initial snapshot of the structure and optionally data of captured tables. Supported values are _initial_ (will take a snapshot of structure and data of captured tables; useful if topics should be populated with a complete representation of the data from the captured tables) and _schema_only_ (will take a snapshot of the structure of captured tables only; useful if only changes happening from now onwards should be propagated to topics). Once the snapshot is complete, the connector will continue reading change events from the database's redo logs.
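As a sketch of how the new option might be enabled in a test, following the `Configuration` builder pattern used throughout the tests above (the raw property name is passed as a string here; enabling it trades parser failures for the data-loss risk noted in the description):

    Configuration config = TestHelper.defaultConfig()
            .with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
            .with("database.history.skip.unparseable.ddl", "true") // documented default: false
            .build();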
@@ -1760,6 +1766,11 @@ See <<oracle-property-log-mining-transaction-retention-hours, `log.mining.transaction.retention.hours`>>
 This is an indicator that long-running transaction(s) are ongoing and preventing the connector from flushing the latest processed system change number to the connector's offsets.
 Under optimal operations, this should always be or remain close to `0`.
+|[[oracle-streaming-metrics-unparsable-ddl-count]]<<oracle-streaming-metrics-unparsable-ddl-count, `+UnparsableDdlCount+`>>
+|`int`
+|The number of DDL records that have been detected but could not be parsed by the DDL parser.
+This should always be `0`; however, when skipping unparsable DDL is enabled, this metric can be used to determine whether any warnings have been written to the connector logs.
 |===
 [[oracle-monitoring-schema-history]]
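A sketch of polling the new counter over JMX. The object-name pattern below (`debezium.oracle:type=connector-metrics,context=streaming,server=<database.server.name>`) is assumed from the connector's other streaming metrics, and `server1` is a placeholder server name:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class UnparsableDdlCountSketch {
        public static void main(String[] args) throws Exception {
            MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
            // Assumed object-name pattern; "server1" is a placeholder.
            ObjectName name = new ObjectName(
                    "debezium.oracle:type=connector-metrics,context=streaming,server=server1");
            Number count = (Number) mbeanServer.getAttribute(name, "UnparsableDdlCount");
            // A non-zero value means DDL was skipped; check the connector logs for warnings.
            System.out.println("UnparsableDdlCount = " + count);
        }
    }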