From d1ee10af0528d168cce4f51cc740148532b77aae Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Tue, 2 May 2023 14:08:53 +0200 Subject: [PATCH] DBZ-3621 Make SELECT to check table presence configurable --- .../jdbc/history/JdbcSchemaHistoryConfig.java | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistoryConfig.java b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistoryConfig.java index 1da3dfbc2..459f5ccb3 100644 --- a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistoryConfig.java +++ b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistoryConfig.java @@ -23,7 +23,7 @@ public class JdbcSchemaHistoryConfig extends JdbcCommonConfig { private static final String DEFAULT_TABLE_NAME = "debezium_database_history"; private static final Field PROP_TABLE_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "schema_history_table_name") - .withDescription("The Redis key -that will be used to store the database schema history") + .withDescription("The database key that will be used to store the database schema history") .withDefault(DEFAULT_TABLE_NAME); /** @@ -43,20 +43,31 @@ public class JdbcSchemaHistoryConfig extends JdbcCommonConfig { "record_insert_seq INTEGER NOT NULL" + ")"; - // Field that will store the Create Table DDL for schema history. + /** + * Field that will store the CREATE TABLE DDL for schema history. + */ public static final Field PROP_TABLE_DDL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "schema_history_table_ddl") - .withDescription("Create table syntax for schema history table") + .withDescription("CREATE TABLE statement for schema history table") .withDefault(DEFAULT_TABLE_DDL); private static final String DEFAULT_TABLE_SELECT = "SELECT id, history_data, history_data_seq FROM %s" + " ORDER BY record_insert_ts, record_insert_seq, id, history_data_seq"; - // Field that will store the Schema history Select query. + /** + * Field that will store the Schema history SELECT query. + */ public static final Field PROP_TABLE_SELECT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "schema_history_table_select") - .withDescription("Select syntax to get schema history from jdbc table") + .withDescription("SELECT statement to get the schema history from a database table") .withDefault(DEFAULT_TABLE_SELECT); - private static final String TABLE_DATA_EXISTS_SELECT = "SELECT * FROM %s LIMIT 1"; + private static final String DEFAULT_TABLE_DATA_EXISTS_SELECT = "SELECT * FROM %s LIMIT 1"; + + /** + * Field that will store the Schema history SELECT query to check existence of the table. + */ + public static final Field PROP_TABLE_DATA_EXISTS_SELECT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "schema_history_table_select") + .withDescription("SELECT statement to check existence of the storage table") + .withDefault(DEFAULT_TABLE_DATA_EXISTS_SELECT); private static final String TABLE_INSERT = "INSERT INTO %s VALUES ( ?, ?, ?, ?, ? )"; @@ -76,13 +87,13 @@ protected void init(Configuration config) { this.tableName = config.getString(PROP_TABLE_NAME); this.tableCreate = String.format(config.getString(PROP_TABLE_DDL), tableName); this.tableSelect = String.format(config.getString(PROP_TABLE_SELECT), tableName); - this.tableDataExistsSelect = String.format(TABLE_DATA_EXISTS_SELECT, tableName); + this.tableDataExistsSelect = String.format(config.getString(PROP_TABLE_DATA_EXISTS_SELECT), tableName); this.tableInsert = String.format(TABLE_INSERT, tableName); } @Override protected List getAllConfigurationFields() { - List fields = Collect.arrayListOf(PROP_TABLE_NAME, PROP_TABLE_DDL, PROP_TABLE_SELECT); + List fields = Collect.arrayListOf(PROP_TABLE_NAME, PROP_TABLE_DDL, PROP_TABLE_SELECT, PROP_TABLE_DATA_EXISTS_SELECT); fields.addAll(super.getAllConfigurationFields()); return fields; }