From 92b9558218494c682bc139ec7cf146ae8357f959 Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Thu, 11 Feb 2021 11:26:45 -0500 Subject: [PATCH] DBZ-3078 Initial implementation of FastDmlParser --- .../oracle/logminer/FastDmlParser.java | 472 ++++++++++++++++++ .../oracle/logminer/FastDmlParserTest.java | 340 +++++++++++++ 2 files changed, 812 insertions(+) create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/FastDmlParser.java create mode 100644 debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/FastDmlParserTest.java diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/FastDmlParser.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/FastDmlParser.java new file mode 100644 index 000000000..04b861a53 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/FastDmlParser.java @@ -0,0 +1,472 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import io.debezium.DebeziumException; +import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValue; +import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueImpl; +import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry; +import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl; +import io.debezium.data.Envelope; + +/** + * @author Chris Cranford + */ +public class FastDmlParser { + + private static final String SINGLE_QUOTE = "'"; + private static final String NULL = "NULL"; + private static final String INSERT_INTO = "insert into "; + private static final String UPDATE = "update "; + private static final String DELETE = "delete "; + private static final String AND = "and "; + private static final String OR = "or "; + private static final String SET = " set "; + private static final String WHERE = " where "; + private static final String VALUES = " values "; + + /** + * Parse a DML SQL statement. + * + * @param sql the sql statement + * @return the parsed DML entry record or {@code null} if the SQL was not parsed + */ + public LogMinerDmlEntry parse(String sql) { + if (sql.startsWith(INSERT_INTO)) { + return parseInsert(sql); + } + else if (sql.startsWith(UPDATE)) { + return parseUpdate(sql); + } + else if (sql.startsWith(DELETE)) { + return parseDelete(sql); + } + return null; + } + + /** + * Parse an {@code INSERT} SQL statement. + * + * @param sql the sql statement + * @return the parsed DML entry record or {@code null} if the SQL was not parsed + */ + private LogMinerDmlEntry parseInsert(String sql) { + // advance beyond "insert into " + int index = 12; + + // parse table + index = parseTableName(sql, index); + + // capture column names + List columnNames = new ArrayList<>(); + index = parseColumnListClause(sql, index, columnNames); + + // capture values + List columnValues = new ArrayList<>(); + index = parseColumnValuesClause(sql, index, columnValues); + + List newValues = new ArrayList<>(); + for (int i = 0; i < columnNames.size(); ++i) { + newValues.add(createColumnValue(columnNames.get(i), columnValues.get(i))); + } + + return new LogMinerDmlEntryImpl(Envelope.Operation.CREATE, newValues, Collections.emptyList()); + } + + /** + * Parse an {@code UPDATE} SQL statement. + * + * @param sql the sql statement + * @return the parsed DML entry record or {@code null} if the SQL was not parsed + */ + private LogMinerDmlEntry parseUpdate(String sql) { + // advance beyond "update " + int index = 7; + + // parse table + index = parseTableName(sql, index); + + // parse set + List newColumnNames = new ArrayList<>(); + List newColumnValues = new ArrayList<>(); + index = parseSetClause(sql, index, newColumnNames, newColumnValues); + + // parse where + List oldColumnNames = new ArrayList<>(); + List oldColumnValues = new ArrayList<>(); + parseWhereClause(sql, index, oldColumnNames, oldColumnValues); + + List oldValues = new ArrayList<>(); + for (int i = 0; i < oldColumnNames.size(); ++i) { + LogMinerColumnValue value = new LogMinerColumnValueImpl(oldColumnNames.get(i), 0); + value.setColumnData(oldColumnValues.get(i)); + oldValues.add(value); + } + + List newValues = new ArrayList<>(); + for (LogMinerColumnValue oldValue : oldValues) { + boolean found = false; + for (int j = 0; j < newColumnNames.size(); ++j) { + if (newColumnNames.get(j).equals(oldValue.getColumnName())) { + newValues.add(createColumnValue(newColumnNames.get(j), newColumnValues.get(j))); + found = true; + break; + } + } + if (!found) { + newValues.add(oldValue); + } + } + + return new LogMinerDmlEntryImpl(Envelope.Operation.UPDATE, newValues, oldValues); + } + + /** + * Parses a SQL {@code DELETE} statement. + * + * @param sql the sql statement + * @return the parsed DML entry record or {@code null} if the SQL was not parsed + */ + private LogMinerDmlEntry parseDelete(String sql) { + // advance beyond "delete from " + int index = 12; + + // parse table + index = parseTableName(sql, index); + + // parse where + List columnNames = new ArrayList<>(); + List columnValues = new ArrayList<>(); + parseWhereClause(sql, index, columnNames, columnValues); + + List oldValues = new ArrayList<>(); + for (int i = 0; i < columnNames.size(); ++i) { + oldValues.add(createColumnValue(columnNames.get(i), columnValues.get(i))); + } + + return new LogMinerDmlEntryImpl(Envelope.Operation.DELETE, Collections.emptyList(), oldValues); + } + + /** + * Parses a table-name in the SQL clause + * + * @param sql the sql statement + * @param index the index into the sql statement to begin parsing + * @return the index into the sql string where the table name ended + */ + private int parseTableName(String sql, int index) { + boolean inQuote = false; + + for (; index < sql.length(); ++index) { + char c = sql.charAt(index); + if (c == '"') { + if (inQuote) { + inQuote = false; + continue; + } + inQuote = true; + } + else if (c == ' ' || c == '(' && !inQuote) { + break; + } + } + + return index; + } + + /** + * Parse an {@code INSERT} statement's column-list clause. + * + * @param sql the sql statement + * @param start the index into the sql statement to begin parsing + * @param columnNames the list that will be populated with the column names + * @return the index into the sql string where the column-list clause ended + */ + private int parseColumnListClause(String sql, int start, List columnNames) { + int index = start; + boolean inQuote = false; + for (; index < sql.length(); ++index) { + char c = sql.charAt(index); + if (c == '(' && !inQuote) { + start = index + 1; + } + else if (c == ')' && !inQuote) { + index++; + break; + } + else if (c == '"') { + if (inQuote) { + inQuote = false; + columnNames.add(sql.substring(start + 1, index)); + start = index + 2; + continue; + } + inQuote = true; + } + } + return index; + } + + /** + * Parse an {@code INSERT} statement's column-values clause. + * + * @param sql the sql statement + * @param start the index into the sql statement to begin parsing + * @param columnValues the list of that will populated with the column values + * @return the index into the sql string where the column-values clause ended + */ + private int parseColumnValuesClause(String sql, int start, List columnValues) { + int index = start; + int nested = 0; + boolean inQuote = false; + boolean inValues = false; + + // verify entering values-clause + if (!sql.substring(index, index + 8).equals(VALUES)) { + throw new DebeziumException("Failed to parse DML: " + sql); + } + index += VALUES.length(); + + for (; index < sql.length(); ++index) { + char c = sql.charAt(index); + if (c == '(' && !inQuote && !inValues) { + inValues = true; + start = index + 1; + } + else if (c == '(' && !inQuote) { + nested++; + } + else if (c == '\'') { + if (inQuote) { + inQuote = false; + continue; + } + inQuote = true; + } + else if (!inQuote && (c == ',' || c == ')')) { + if (c == ')' && nested != 0) { + nested--; + continue; + } + if (c == ',' && nested != 0) { + continue; + } + String s = sql.substring(start, index); + if (s.startsWith("'") && s.endsWith("'")) { + // if the value is single-quoted at the start/end, clear the quotes. + s = s.substring(1, s.length() - 1); + } + columnValues.add(s); + start = index + 1; + } + } + + return index; + } + + /** + * Parse an {@code UPDATE} statement's {@code SET} clause. + * + * @param sql the sql statement + * @param start the index into the sql statement to begin parsing + * @param columnNames the list of the changed column names that will be populated + * @param columnValues the list of the changed column values that will be populated + * @return the index into the sql string where the set-clause ended + */ + private int parseSetClause(String sql, int start, List columnNames, List columnValues) { + boolean inQuote = false; + boolean inSingleQuote = false; + boolean inColumnName = true; + boolean inColumnValue = false; + int nested = 0; + + // verify entering set-clause + if (!sql.substring(start, start + 5).equals(SET)) { + throw new DebeziumException("Failed to parse DML: " + sql); + } + start += SET.length(); + + int index = start; + for (; index < sql.length(); ++index) { + char c = sql.charAt(index); + if (c == '"') { + // where clause column names are double-quoted + if (inQuote) { + inQuote = false; + continue; + } + inQuote = true; + } + else if (c == '\'') { + if (inSingleQuote) { + inSingleQuote = false; + continue; + } + inSingleQuote = true; + } + else if (c == '=' && !inQuote && inColumnName) { + String s = sql.substring(start + 1, index - 2); + start = index + 2; + columnNames.add(s); + inColumnValue = true; + inColumnName = false; + index++; + } + else if (c == '(' && !inQuote && !inSingleQuote && inColumnValue) { + nested++; + } + else if (c == ')' && !inQuote && !inSingleQuote && inColumnValue && nested > 0) { + nested--; + } + else if ((c == ',' || c == ' ') && !inQuote && !inSingleQuote && inColumnValue) { + if (nested > 0) { + continue; + } + String s = sql.substring(start, index); + if (s.startsWith("'") && s.endsWith("'")) { + s = s.substring(1, s.length() - 1); + } + inColumnValue = false; + columnValues.add(s); + if (c == ',') { + start = index + 2; + inColumnName = true; + } + else { + start = index; + } + } + else if (!inQuote && !inColumnValue && sql.substring(index).startsWith("where ")) { + index--; + break; + } + else if (!inQuote && !inColumnName && sql.substring(index).startsWith("and ")) { + index += 3; + start = index + 1; + inColumnName = true; + } + else if (!inQuote && !inColumnName && sql.substring(index).startsWith("or ")) { + index += 2; + start = index + 1; + inColumnName = true; + } + } + + return index; + } + + /** + * Parses a {@code WHERE} clause populates the provided column names and values arrays. + * + * @param sql the sql statement + * @param start the index into the sql statement to begin parsing + * @param columnNames the column names parsed from the clause + * @param columnValues the column values parsed from the clause + * @return the index into the sql string to continue parsing + */ + private int parseWhereClause(String sql, int start, List columnNames, List columnValues) { + int nested = 0; + boolean inColumnName = true; + boolean inColumnValue = false; + boolean inQuote = false; + boolean inSingleQuote = false; + + // verify entering where-clause + if (!sql.substring(start, start + 7).equals(WHERE)) { + throw new DebeziumException("Failed to parse DML: " + sql); + } + start += WHERE.length(); + + int index = start; + for (; index < sql.length(); ++index) { + char c = sql.charAt(index); + if (c == '"') { + // where clause column names are double-quoted + if (inQuote) { + inQuote = false; + continue; + } + inQuote = true; + } + else if (c == '=' && !inQuote && inColumnName) { + String s = sql.substring(start + 1, index - 2); + start = index + 2; + columnNames.add(s); + inColumnValue = true; + inColumnName = false; + index++; + } + else if (c == '\'') { + if (inSingleQuote) { + inSingleQuote = false; + continue; + } + inSingleQuote = true; + } + else if (c == '(' && !inQuote && !inSingleQuote && inColumnValue) { + nested++; + } + else if (c == ')' && !inQuote && !inSingleQuote && inColumnValue && nested > 0) { + nested--; + } + else if (c == ' ' && !inQuote && !inSingleQuote && inColumnValue) { + if (nested > 0) { + continue; + } + columnValues.add(removeSingleQuotes(sql.substring(start, index))); + inColumnValue = false; + start = index; + } + else if (!inQuote && !inColumnName && sql.substring(index).startsWith(AND)) { + index += 3; + start = index + 1; + inColumnName = true; + } + else if (!inQuote && !inColumnName && sql.substring(index).startsWith(OR)) { + index += 2; + start = index + 1; + inColumnName = true; + } + else if (c == ';' && !inQuote && !inSingleQuote && inColumnValue) { + columnValues.add(removeSingleQuotes(sql.substring(start, index))); + } + } + + return index; + } + + /** + * Remove {@code '} quotes from around the provided text if they exist; otherwise the value is returned as-is. + * + * @param text the text to remove single quotes + * @return the text with single quotes removed + */ + private static String removeSingleQuotes(String text) { + if (text.startsWith(SINGLE_QUOTE) && text.endsWith(SINGLE_QUOTE)) { + return text.substring(1, text.length() - 1); + } + return text; + } + + /** + * Helper method to create a {@link LogMinerColumnValue} from a column name/value pair. + * + * @param columnName the column name + * @param columnValue the column value + * @return the LogMiner column value object + */ + private static LogMinerColumnValue createColumnValue(String columnName, String columnValue) { + LogMinerColumnValue value = new LogMinerColumnValueImpl(columnName, 0); + if (columnValue != null && !columnValue.equals(NULL)) { + value.setColumnData(columnValue); + } + return value; + } +} diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/FastDmlParserTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/FastDmlParserTest.java new file mode 100644 index 000000000..d6042fa1c --- /dev/null +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/FastDmlParserTest.java @@ -0,0 +1,340 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer; + +import static org.fest.assertions.Assertions.assertThat; + +import java.sql.Types; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +import org.junit.Before; +import org.junit.Test; + +import io.debezium.connector.oracle.OracleConnection; +import io.debezium.connector.oracle.OracleConnectorConfig; +import io.debezium.connector.oracle.jsqlparser.SimpleDmlParser; +import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry; +import io.debezium.connector.oracle.util.TestHelper; +import io.debezium.data.Envelope.Operation; +import io.debezium.doc.FixFor; +import io.debezium.relational.Column; +import io.debezium.relational.ColumnEditor; +import io.debezium.relational.Table; +import io.debezium.relational.TableEditor; +import io.debezium.relational.TableId; +import io.debezium.relational.Tables; +import io.debezium.util.Testing; + +/** + * @author Chris Cranford + */ +public class FastDmlParserTest { + + private static final String CATALOG_NAME = "ORCLCDB1"; + private static final String SCHEMA_NAME = "DEBEZIUM"; + private final List iterations = Arrays.asList(1000, 5000, 10000, 20000, 50000, 100000, 500000, 1000000); + + private SimpleDmlParser simpleDmlParser; + private FastDmlParser fastDmlParser; + + @Before + public void beforeEach() throws Exception { + // Create SimpleDmlParser + OracleConnection jdbcConnection = TestHelper.testConnection(); + OracleConnectorConfig connectorConfig = new OracleConnectorConfig(TestHelper.defaultConfig().build()); + OracleChangeRecordValueConverter converters = new OracleChangeRecordValueConverter(connectorConfig, jdbcConnection); + simpleDmlParser = new SimpleDmlParser(CATALOG_NAME, SCHEMA_NAME, converters); + + // Create FastDmlParser + fastDmlParser = new FastDmlParser(); + } + + // Oracle's generated SQL avoids common spacing patterns such as spaces between column values or values + // in an insert statement and is explicit about spacing and commas with SET and WHERE clauses. As of + // now the parser expects this explicit spacing usage. + + @Test + @FixFor("DBZ-3078") + public void testParsingInsert() throws Exception { + String sql = "insert into \"DEBEZIUM\".\"TEST\"(\"ID\",\"NAME\",\"TS\",\"DATE\") values " + + "('1','Acme',TO_TIMESTAMP('2020-02-01 00:00:00.'),TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS'));"; + + LogMinerDmlEntry entry = fastDmlParser.parse(sql); + assertThat(entry.getCommandType()).isEqualTo(Operation.CREATE); + assertThat(entry.getOldValues()).isEmpty(); + assertThat(entry.getNewValues()).hasSize(4); + assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("ID"); + assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("NAME"); + assertThat(entry.getNewValues().get(2).getColumnName()).isEqualTo("TS"); + assertThat(entry.getNewValues().get(3).getColumnName()).isEqualTo("DATE"); + assertThat(entry.getNewValues().get(0).getColumnData()).isEqualTo("1"); + assertThat(entry.getNewValues().get(1).getColumnData()).isEqualTo("Acme"); + assertThat(entry.getNewValues().get(2).getColumnData()).isEqualTo("TO_TIMESTAMP('2020-02-01 00:00:00.')"); + assertThat(entry.getNewValues().get(3).getColumnData()).isEqualTo("TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS')"); + } + + @Test + @FixFor("DBZ-3078") + public void testParsingUpdate() throws Exception { + String sql = "update \"DEBEZIUM\".\"TEST\" " + + "set \"NAME\" = 'Bob', \"TS\" = TO_TIMESTAMP('2020-02-02 00:00:00.'), \"DATE\" = TO_DATE('2020-02-02 00:00:00', 'YYYY-MM-DD HH24:MI:SS') " + + "where \"ID\" = '1' and \"NAME\" = 'Acme' and \"TS\" = TO_TIMESTAMP('2020-02-01 00:00:00.') and \"DATE\" = TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS');"; + + LogMinerDmlEntry entry = fastDmlParser.parse(sql); + assertThat(entry.getCommandType()).isEqualTo(Operation.UPDATE); + assertThat(entry.getOldValues()).hasSize(4); + assertThat(entry.getOldValues().get(0).getColumnName()).isEqualTo("ID"); + assertThat(entry.getOldValues().get(1).getColumnName()).isEqualTo("NAME"); + assertThat(entry.getOldValues().get(2).getColumnName()).isEqualTo("TS"); + assertThat(entry.getOldValues().get(3).getColumnName()).isEqualTo("DATE"); + assertThat(entry.getOldValues().get(0).getColumnData()).isEqualTo("1"); + assertThat(entry.getOldValues().get(1).getColumnData()).isEqualTo("Acme"); + assertThat(entry.getOldValues().get(2).getColumnData()).isEqualTo("TO_TIMESTAMP('2020-02-01 00:00:00.')"); + assertThat(entry.getOldValues().get(3).getColumnData()).isEqualTo("TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS')"); + assertThat(entry.getNewValues()).hasSize(4); + assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("ID"); + assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("NAME"); + assertThat(entry.getNewValues().get(2).getColumnName()).isEqualTo("TS"); + assertThat(entry.getNewValues().get(3).getColumnName()).isEqualTo("DATE"); + assertThat(entry.getNewValues().get(0).getColumnData()).isEqualTo("1"); + assertThat(entry.getNewValues().get(1).getColumnData()).isEqualTo("Bob"); + assertThat(entry.getNewValues().get(2).getColumnData()).isEqualTo("TO_TIMESTAMP('2020-02-02 00:00:00.')"); + assertThat(entry.getNewValues().get(3).getColumnData()).isEqualTo("TO_DATE('2020-02-02 00:00:00', 'YYYY-MM-DD HH24:MI:SS')"); + } + + @Test + @FixFor("DBZ-3078") + public void testParsingDelete() throws Exception { + String sql = "delete from \"DEBEZIUM\".\"TEST\" " + + "where \"ID\" = '1' and \"NAME\" = 'Acme' and \"TS\" = TO_TIMESTAMP('2020-02-01 00:00:00.') and \"DATE\" = TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS');"; + + LogMinerDmlEntry entry = fastDmlParser.parse(sql); + assertThat(entry.getCommandType()).isEqualTo(Operation.DELETE); + assertThat(entry.getOldValues()).hasSize(4); + assertThat(entry.getOldValues().get(0).getColumnName()).isEqualTo("ID"); + assertThat(entry.getOldValues().get(1).getColumnName()).isEqualTo("NAME"); + assertThat(entry.getOldValues().get(2).getColumnName()).isEqualTo("TS"); + assertThat(entry.getOldValues().get(3).getColumnName()).isEqualTo("DATE"); + assertThat(entry.getOldValues().get(0).getColumnData()).isEqualTo("1"); + assertThat(entry.getOldValues().get(1).getColumnData()).isEqualTo("Acme"); + assertThat(entry.getOldValues().get(2).getColumnData()).isEqualTo("TO_TIMESTAMP('2020-02-01 00:00:00.')"); + assertThat(entry.getOldValues().get(3).getColumnData()).isEqualTo("TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS')"); + assertThat(entry.getNewValues()).isEmpty(); + } + + @Test + @FixFor("DBZ-3078") + public void benchmarkInserts() throws Exception { + Testing.Print.enable(); + + final String STATEMENT = "insert into \"DEBEZIUM\".\"TEST\"(\"ID\",\"NAME\",\"CREATED\") values ('0','Test0',TO_TIMESTAMP('2020-02-01 00:00:00'));"; + + TableEditor editor = Table.editor(); + editor.tableId(new TableId(CATALOG_NAME, SCHEMA_NAME, "TEST")); + + ColumnEditor columnEditor = Column.editor(); + Column c1 = columnEditor.name("ID").type("NUMERIC").jdbcType(Types.NUMERIC).create(); + Column c2 = columnEditor.name("NAME").type("VARCHAR2").jdbcType(Types.VARCHAR).create(); + Column c3 = columnEditor.name("CREATED").type("TIMESTAMP").jdbcType(Types.TIMESTAMP).create(); + Table table = editor.addColumns(c1, c2, c3).create(); + + Tables tables = new Tables(); + tables.overwriteTable(table); + + benchmark(iterations, + tables, + entry -> { + assertThat(entry).isNotNull(); + assertThat(entry.getCommandType()).isEqualTo(Operation.CREATE); + assertThat(entry.getOldValues()).isEmpty(); + assertThat(entry.getNewValues()).isNotEmpty(); + assertThat(entry.getNewValues()).hasSize(3); + assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("ID"); + assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("NAME"); + assertThat(entry.getNewValues().get(2).getColumnName()).isEqualTo("CREATED"); + assertThat(entry.getNewValues().get(1).getColumnData()).isEqualTo("Test0"); + assertThat(entry.getNewValues().get(2).getColumnData()).isEqualTo(1580515200000L); + }, + entry -> { + assertThat(entry).isNotNull(); + assertThat(entry.getCommandType()).isEqualTo(Operation.CREATE); + assertThat(entry.getOldValues()).isEmpty(); + assertThat(entry.getNewValues()).isNotEmpty(); + assertThat(entry.getNewValues()).hasSize(3); + assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("ID"); + assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("NAME"); + assertThat(entry.getNewValues().get(2).getColumnName()).isEqualTo("CREATED"); + assertThat(entry.getNewValues().get(0).getColumnData()).isEqualTo("0"); + assertThat(entry.getNewValues().get(1).getColumnData()).isEqualTo("Test0"); + assertThat(entry.getNewValues().get(2).getColumnData()).isEqualTo("TO_TIMESTAMP('2020-02-01 00:00:00')"); + }, + STATEMENT); + } + + @Test + @FixFor("DBZ-3078") + public void benchmarkUpdates() throws Exception { + Testing.Print.enable(); + + final String STATEMENT = "update \"DEBEZIUM\".\"TEST\" set \"NAME\" = 'TEST0', \"SCORE\" = '1234.56' " + + "where \"ID\" = '0' and \"NAME\" = 'Test0' and \"SCORE\" = '2345.67' and \"CREATED\" = TO_TIMESTAMP('2020-02-01 00:00:00.');"; + + TableEditor editor = Table.editor(); + editor.tableId(new TableId(CATALOG_NAME, SCHEMA_NAME, "TEST")); + + ColumnEditor columnEditor = Column.editor(); + Column c1 = columnEditor.name("ID").type("NUMERIC").jdbcType(Types.NUMERIC).create(); + Column c2 = columnEditor.name("NAME").type("VARCHAR2").jdbcType(Types.VARCHAR).create(); + Column c3 = columnEditor.name("SCORE").type("FLOAT").jdbcType(Types.FLOAT).create(); + Column c4 = columnEditor.name("CREATED").type("TIMESTAMP").jdbcType(Types.TIMESTAMP).create(); + Table table = editor.addColumns(c1, c2, c3, c4).create(); + + Tables tables = new Tables(); + tables.overwriteTable(table); + + benchmark(iterations, + tables, + entry -> { + assertThat(entry).isNotNull(); + assertThat(entry.getCommandType()).isEqualTo(Operation.UPDATE); + assertThat(entry.getOldValues()).hasSize(4); + assertThat(entry.getNewValues()).hasSize(4); + + assertThat(entry.getOldValues().get(0).getColumnName()).isEqualTo("ID"); + assertThat(entry.getOldValues().get(1).getColumnName()).isEqualTo("NAME"); + assertThat(entry.getOldValues().get(2).getColumnName()).isEqualTo("SCORE"); + assertThat(entry.getOldValues().get(3).getColumnName()).isEqualTo("CREATED"); + assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("ID"); + assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("NAME"); + assertThat(entry.getNewValues().get(2).getColumnName()).isEqualTo("SCORE"); + assertThat(entry.getNewValues().get(3).getColumnName()).isEqualTo("CREATED"); + + assertThat(entry.getOldValues().get(1).getColumnData()).isEqualTo("Test0"); + assertThat(entry.getOldValues().get(2).getColumnData()).isEqualTo(2345.67f); + assertThat(entry.getNewValues().get(3).getColumnData()).isEqualTo(1580515200000L); + assertThat(entry.getNewValues().get(1).getColumnData()).isEqualTo("TEST0"); + assertThat(entry.getNewValues().get(2).getColumnData()).isEqualTo(1234.56f); + assertThat(entry.getNewValues().get(3).getColumnData()).isEqualTo(1580515200000L); + }, + entry -> { + assertThat(entry).isNotNull(); + assertThat(entry.getCommandType()).isEqualTo(Operation.UPDATE); + assertThat(entry.getOldValues()).hasSize(4); + assertThat(entry.getNewValues()).hasSize(4); + + assertThat(entry.getOldValues().get(0).getColumnName()).isEqualTo("ID"); + assertThat(entry.getOldValues().get(1).getColumnName()).isEqualTo("NAME"); + assertThat(entry.getOldValues().get(2).getColumnName()).isEqualTo("SCORE"); + assertThat(entry.getOldValues().get(3).getColumnName()).isEqualTo("CREATED"); + assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("ID"); + assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("NAME"); + assertThat(entry.getNewValues().get(2).getColumnName()).isEqualTo("SCORE"); + assertThat(entry.getNewValues().get(3).getColumnName()).isEqualTo("CREATED"); + + assertThat(entry.getOldValues().get(0).getColumnData()).isEqualTo("0"); + assertThat(entry.getOldValues().get(1).getColumnData()).isEqualTo("Test0"); + assertThat(entry.getOldValues().get(2).getColumnData()).isEqualTo("2345.67"); + assertThat(entry.getNewValues().get(3).getColumnData()).isEqualTo("TO_TIMESTAMP('2020-02-01 00:00:00.')"); + assertThat(entry.getNewValues().get(0).getColumnData()).isEqualTo("0"); + assertThat(entry.getNewValues().get(1).getColumnData()).isEqualTo("TEST0"); + assertThat(entry.getNewValues().get(2).getColumnData()).isEqualTo("1234.56"); + assertThat(entry.getNewValues().get(3).getColumnData()).isEqualTo("TO_TIMESTAMP('2020-02-01 00:00:00.')"); + }, + STATEMENT); + } + + @Test + @FixFor("DBZ-3078") + public void benchmarkDeletes() throws Exception { + Testing.Print.enable(); + + final String STATEMENT = "delete from \"DEBEZIUM\".\"TEST\" where \"ID\" = '2' and \"NAME\" = 'TEST2' and \"CREATED\" = TO_TIMESTAMP('2020-02-01 00:00:00.');"; + + TableEditor editor = Table.editor(); + editor.tableId(new TableId(CATALOG_NAME, SCHEMA_NAME, "TEST")); + + ColumnEditor columnEditor = Column.editor(); + Column c1 = columnEditor.name("ID").type("NUMERIC").jdbcType(Types.NUMERIC).create(); + Column c2 = columnEditor.name("NAME").type("VARCHAR2").jdbcType(Types.VARCHAR).create(); + Column c3 = columnEditor.name("CREATED").type("TIMESTAMP").jdbcType(Types.TIMESTAMP).create(); + Table table = editor.addColumns(c1, c2, c3).create(); + + Tables tables = new Tables(); + tables.overwriteTable(table); + + benchmark(iterations, + tables, + entry -> { + assertThat(entry).isNotNull(); + assertThat(entry.getCommandType()).isEqualTo(Operation.DELETE); + assertThat(entry.getOldValues()).hasSize(3); + assertThat(entry.getNewValues()).isEmpty(); + + assertThat(entry.getOldValues().get(0).getColumnName()).isEqualTo("ID"); + assertThat(entry.getOldValues().get(1).getColumnName()).isEqualTo("NAME"); + assertThat(entry.getOldValues().get(2).getColumnName()).isEqualTo("CREATED"); + + assertThat(entry.getOldValues().get(1).getColumnData()).isEqualTo("TEST2"); + assertThat(entry.getOldValues().get(2).getColumnData()).isEqualTo(1580515200000L); + }, + entry -> { + assertThat(entry).isNotNull(); + assertThat(entry.getCommandType()).isEqualTo(Operation.DELETE); + assertThat(entry.getOldValues()).hasSize(3); + assertThat(entry.getNewValues()).isEmpty(); + assertThat(entry.getOldValues().get(0).getColumnName()).isEqualTo("ID"); + assertThat(entry.getOldValues().get(1).getColumnName()).isEqualTo("NAME"); + assertThat(entry.getOldValues().get(2).getColumnName()).isEqualTo("CREATED"); + + assertThat(entry.getOldValues().get(0).getColumnData()).isEqualTo("2"); + assertThat(entry.getOldValues().get(1).getColumnData()).isEqualTo("TEST2"); + assertThat(entry.getOldValues().get(2).getColumnData()).isEqualTo("TO_TIMESTAMP('2020-02-01 00:00:00.')"); + }, + STATEMENT); + } + + private void benchmark(List iterationList, Tables tables, Consumer oldValidator, Consumer newValidator, String... sql) { + Map> metrics = new LinkedHashMap<>(); + for (Integer iterations : iterationList) { + List parses = metrics.computeIfAbsent(iterations.toString(), e -> new ArrayList<>()); + Duration time = Duration.ZERO; + for (int i = 0; i < iterations; ++i) { + for (int j = 0; j < sql.length; ++j) { + Instant s = Instant.now(); + LogMinerDmlEntry entry = simpleDmlParser.parse(sql[j], tables, "1234567890"); + time = time.plus(Duration.between(s, Instant.now())); + oldValidator.accept(entry); + } + } + double pps = (iterations / (time.toMillis() / 1000.f)); + parses.add(Double.isInfinite(pps) ? "Infinite" : String.format("%.0f", pps)); + + time = Duration.ZERO; + for (int i = 0; i < iterations; ++i) { + for (int j = 0; j < sql.length; ++j) { + Instant s = Instant.now(); + LogMinerDmlEntry entry = fastDmlParser.parse(sql[j]); + time = time.plus(Duration.between(s, Instant.now())); + newValidator.accept(entry); + } + } + pps = (iterations / (time.toMillis() / 1000.f)); + parses.add(Double.isInfinite(pps) ? "Infinity" : String.format("%.0f", pps)); + } + + Testing.print("||Iterations||Old Parses/Sec||New Parses/Sec||"); + for (Map.Entry> entry : metrics.entrySet()) { + Testing.print(entry.getKey() + "|" + entry.getValue().get(0) + "|" + entry.getValue().get(1)); + } + } +}