DBZ-3621 Make SELECT to check table presence configurable

This commit is contained in:
Jiri Pechanec 2023-05-02 14:08:53 +02:00
parent 6f72efa5eb
commit d1ee10af05

View File

@ -23,7 +23,7 @@ public class JdbcSchemaHistoryConfig extends JdbcCommonConfig {
private static final String DEFAULT_TABLE_NAME = "debezium_database_history";
private static final Field PROP_TABLE_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "schema_history_table_name")
.withDescription("The Redis key -that will be used to store the database schema history")
.withDescription("The database key that will be used to store the database schema history")
.withDefault(DEFAULT_TABLE_NAME);
/**
@ -43,20 +43,31 @@ public class JdbcSchemaHistoryConfig extends JdbcCommonConfig {
"record_insert_seq INTEGER NOT NULL" +
")";
// Field that will store the Create Table DDL for schema history.
/**
* Field that will store the CREATE TABLE DDL for schema history.
*/
public static final Field PROP_TABLE_DDL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "schema_history_table_ddl")
.withDescription("Create table syntax for schema history table")
.withDescription("CREATE TABLE statement for schema history table")
.withDefault(DEFAULT_TABLE_DDL);
private static final String DEFAULT_TABLE_SELECT = "SELECT id, history_data, history_data_seq FROM %s"
+ " ORDER BY record_insert_ts, record_insert_seq, id, history_data_seq";
// Field that will store the Schema history Select query.
/**
* Field that will store the Schema history SELECT query.
*/
public static final Field PROP_TABLE_SELECT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "schema_history_table_select")
.withDescription("Select syntax to get schema history from jdbc table")
.withDescription("SELECT statement to get the schema history from a database table")
.withDefault(DEFAULT_TABLE_SELECT);
private static final String TABLE_DATA_EXISTS_SELECT = "SELECT * FROM %s LIMIT 1";
private static final String DEFAULT_TABLE_DATA_EXISTS_SELECT = "SELECT * FROM %s LIMIT 1";
/**
* Field that will store the Schema history SELECT query to check existence of the table.
*/
public static final Field PROP_TABLE_DATA_EXISTS_SELECT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "schema_history_table_select")
.withDescription("SELECT statement to check existence of the storage table")
.withDefault(DEFAULT_TABLE_DATA_EXISTS_SELECT);
private static final String TABLE_INSERT = "INSERT INTO %s VALUES ( ?, ?, ?, ?, ? )";
@ -76,13 +87,13 @@ protected void init(Configuration config) {
this.tableName = config.getString(PROP_TABLE_NAME);
this.tableCreate = String.format(config.getString(PROP_TABLE_DDL), tableName);
this.tableSelect = String.format(config.getString(PROP_TABLE_SELECT), tableName);
this.tableDataExistsSelect = String.format(TABLE_DATA_EXISTS_SELECT, tableName);
this.tableDataExistsSelect = String.format(config.getString(PROP_TABLE_DATA_EXISTS_SELECT), tableName);
this.tableInsert = String.format(TABLE_INSERT, tableName);
}
@Override
protected List<Field> getAllConfigurationFields() {
List<Field> fields = Collect.arrayListOf(PROP_TABLE_NAME, PROP_TABLE_DDL, PROP_TABLE_SELECT);
List<Field> fields = Collect.arrayListOf(PROP_TABLE_NAME, PROP_TABLE_DDL, PROP_TABLE_SELECT, PROP_TABLE_DATA_EXISTS_SELECT);
fields.addAll(super.getAllConfigurationFields());
return fields;
}