DBZ-3966 Remove defaultValue and keep defaultValueExpression

This commit is contained in:
jiabao.sun 2021-10-29 19:39:20 +08:00 committed by Gunnar Morling
parent 6aa9b2ea96
commit 24ead16c24
43 changed files with 576 additions and 596 deletions

View File

@ -21,7 +21,6 @@
import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.mysql.MySqlSystemVariables.MySqlScope;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.relational.DefaultValueConverter;
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.SystemVariables;
@ -77,7 +76,6 @@ public class MySqlDatabaseSchema extends HistorizedRelationalDatabaseSchema {
private final Set<String> ignoredQueryStatements = Collect.unmodifiableSet("BEGIN", "END", "FLUSH PRIVILEGES");
private final DdlParser ddlParser;
private final DefaultValueConverter defaultValueConverter;
private final RelationalTableFilters filters;
private final DdlChanges ddlChanges;
private final Map<Long, TableId> tableIdsByTableNumber = new ConcurrentHashMap<>();
@ -94,6 +92,7 @@ public MySqlDatabaseSchema(MySqlConnectorConfig connectorConfig, MySqlValueConve
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), connectorConfig.getColumnFilter(),
new TableSchemaBuilder(
valueConverter,
new MySqlDefaultValueConverter(valueConverter),
schemaNameAdjuster,
connectorConfig.customConverterRegistry(),
connectorConfig.getSourceInfoStructMaker().schema(),
@ -101,7 +100,6 @@ public MySqlDatabaseSchema(MySqlConnectorConfig connectorConfig, MySqlValueConve
false),
tableIdCaseInsensitive, connectorConfig.getKeyMapper());
this.defaultValueConverter = new MySqlDefaultValueConverter(valueConverter);
this.ddlParser = new MySqlAntlrDdlParser(
true,
false,
@ -332,11 +330,6 @@ protected DdlParser getDdlParser() {
return ddlParser;
}
@Override
public DefaultValueConverter getDefaultValueConverter() {
return defaultValueConverter;
}
/**
* Return true if the database history entity exists
*/

View File

@ -27,7 +27,6 @@
import io.debezium.annotation.Immutable;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.DefaultValueConverter;
import io.debezium.relational.ValueConverter;
import io.debezium.util.Collect;
@ -75,12 +74,29 @@ public MySqlDefaultValueConverter(MySqlValueConverters converters) {
* recognized by value converters for a subset of types.
*
* @param column the column definition describing the {@code data} value; never null
* @param defaultValue the default value; may be null
* @param defaultValueExpression the default value literal; may be null
* @return value converted to a Java type; optional
*/
@Override
public Optional<Object> parseDefaultValue(Column column, String defaultValue) {
return Optional.ofNullable(convert(column, defaultValue));
public Optional<Object> parseDefaultValue(Column column, String defaultValueExpression) {
Object logicalDefaultValue = convert(column, defaultValueExpression);
if (logicalDefaultValue == null) {
return Optional.empty();
}
final SchemaBuilder schemaBuilder = converters.schemaBuilder(column);
if (schemaBuilder == null) {
return Optional.of(logicalDefaultValue);
}
final Schema schema = schemaBuilder.build();
// In order to get the valueConverter for this column, we have to create a field;
// The index value -1 in the field will never used when converting default value;
// So we can set any number here;
final Field field = new Field(column.name(), -1, schema);
final ValueConverter valueConverter = converters.converter(column, field);
return Optional.ofNullable(valueConverter.convert(logicalDefaultValue));
}
/**
@ -419,29 +435,4 @@ private String replaceFirstNonNumericSubstring(String s, int startIndex, char c)
return sb.toString();
}
@Override
public ColumnEditor setColumnDefaultValue(ColumnEditor columnEditor) {
final Column column = columnEditor.create();
// if converters is not null and the default value is not null, we need to convert default value
if (converters != null && columnEditor.defaultValueExpression() != null) {
String defaultValueExpression = columnEditor.defaultValueExpression();
final SchemaBuilder schemaBuilder = converters.schemaBuilder(column);
if (schemaBuilder == null) {
return columnEditor;
}
final Schema schema = schemaBuilder.build();
// In order to get the valueConverter for this column, we have to create a field;
// The index value -1 in the field will never used when converting default value;
// So we can set any number here;
final Field field = new Field(columnEditor.name(), -1, schema);
final ValueConverter valueConverter = converters.converter(columnEditor.create(), field);
Object defaultValue = convert(column, defaultValueExpression);
defaultValue = valueConverter.convert(defaultValue);
columnEditor.defaultValue(defaultValue);
}
return columnEditor;
}
}

View File

@ -308,8 +308,8 @@ public void parsePrimaryIndexColumnNames(MySqlParser.IndexColumnNamesContext ind
Column column = tableEditor.columnWithName(columnName);
if (column != null && column.isOptional()) {
final ColumnEditor ce = column.edit().optional(false);
if (ce.hasDefaultValue() && ce.defaultValue() == null) {
ce.unsetDefaultValue();
if (ce.hasDefaultValue() && ce.defaultValueExpression() == null) {
ce.unsetDefaultValueExpression();
}
tableEditor.addColumn(ce.create());
}

View File

@ -166,7 +166,6 @@ public void enterAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
// I'm going to leave this as is for now, to be prepared for the ability of updating column definitions in 8.0
ColumnEditor columnEditor = existingColumn.edit();
columnEditor.unsetDefaultValueExpression();
columnEditor.unsetDefaultValue();
columnDefinitionListener = new ColumnDefinitionParserListener(tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
@ -207,7 +206,7 @@ public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx)
Column existingColumn = tableEditor.columnWithName(columnName);
if (existingColumn != null) {
ColumnEditor columnEditor = existingColumn.edit();
columnEditor.unsetDefaultValue();
columnEditor.unsetDefaultValueExpression();
columnDefinitionListener = new ColumnDefinitionParserListener(tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
@ -267,13 +266,12 @@ public void enterAlterByChangeDefault(MySqlParser.AlterByChangeDefaultContext ct
if (column != null) {
defaultValueColumnEditor = column.edit();
if (ctx.SET() != null) {
defaultValueListener = new DefaultValueParserListener(defaultValueColumnEditor, parser.getConverters(),
new AtomicReference<Boolean>(column.isOptional()), true);
defaultValueListener = new DefaultValueParserListener(defaultValueColumnEditor,
new AtomicReference<Boolean>(column.isOptional()));
listeners.add(defaultValueListener);
}
else if (ctx.DROP() != null) {
defaultValueColumnEditor.unsetDefaultValueExpression();
defaultValueColumnEditor.unsetDefaultValue();
}
}
}, tableEditor);

View File

@ -43,27 +43,13 @@ public class ColumnDefinitionParserListener extends MySqlParserBaseListener {
private final List<ParseTreeListener> listeners;
/**
* Whether to convert the column's default value into the corresponding schema type or not. This is done for column
* definitions of ALTER TABLE statements but not for CREATE TABLE. In case of the latter, the default value
* conversion is handled by the CREATE TABLE statement listener itself, as a default character set given at the
* table level might have to be applied.
*/
private final boolean convertDefault;
public ColumnDefinitionParserListener(TableEditor tableEditor, ColumnEditor columnEditor, MySqlAntlrDdlParser parser,
List<ParseTreeListener> listeners, boolean convertDefault) {
List<ParseTreeListener> listeners) {
this.tableEditor = tableEditor;
this.columnEditor = columnEditor;
this.parser = parser;
this.dataTypeResolver = parser.dataTypeResolver();
this.listeners = listeners;
this.convertDefault = convertDefault;
}
public ColumnDefinitionParserListener(TableEditor tableEditor, ColumnEditor columnEditor, MySqlAntlrDdlParser parser,
List<ParseTreeListener> listeners) {
this(tableEditor, columnEditor, parser, listeners, true);
}
public void setColumnEditor(ColumnEditor columnEditor) {
@ -84,7 +70,7 @@ public void enterColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) {
optionalColumn = new AtomicReference<>();
resolveColumnDataType(ctx.dataType());
parser.runIfNotNull(() -> {
defaultValueListener = new DefaultValueParserListener(columnEditor, parser.getConverters(), optionalColumn, convertDefault);
defaultValueListener = new DefaultValueParserListener(columnEditor, optionalColumn);
listeners.add(defaultValueListener);
}, tableEditor);
super.enterColumnDefinition(ctx);
@ -100,7 +86,7 @@ public void exitColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) {
tableEditor.addColumn(columnEditor.create());
tableEditor.setPrimaryKeyNames(columnEditor.name());
}
defaultValueListener.convertDefaultValue(false);
defaultValueListener.exitDefaultValue(false);
parser.runIfNotNull(() -> {
listeners.remove(defaultValueListener);
}, tableEditor);

View File

@ -70,7 +70,6 @@ public void exitColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
}
return columnEditor;
})
.map(this::convertDefaultValueToSchemaType)
.map(ColumnEditor::create)
.collect(Collectors.toList()));
parser.databaseTables().overwriteTable(tableEditor.create());
@ -97,7 +96,7 @@ public void enterColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) {
String columnName = parser.parseName(ctx.uid());
ColumnEditor columnEditor = Column.editor().name(columnName);
if (columnDefinitionListener == null) {
columnDefinitionListener = new ColumnDefinitionParserListener(tableEditor, columnEditor, parser, listeners, false);
columnDefinitionListener = new ColumnDefinitionParserListener(tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
}
else {
@ -155,7 +154,4 @@ public void enterTableOptionComment(MySqlParser.TableOptionCommentContext ctx) {
super.enterTableOptionComment(ctx);
}
private ColumnEditor convertDefaultValueToSchemaType(ColumnEditor columnEditor) {
return defaultValueConverter.setColumnDefaultValue(columnEditor);
}
}

View File

@ -8,8 +8,6 @@
import java.util.concurrent.atomic.AtomicReference;
import io.debezium.connector.mysql.MySqlDefaultValueConverter;
import io.debezium.connector.mysql.MySqlValueConverters;
import io.debezium.ddl.parser.mysql.generated.MySqlParser.CurrentTimestampContext;
import io.debezium.ddl.parser.mysql.generated.MySqlParser.DefaultValueContext;
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
@ -25,23 +23,11 @@ public class DefaultValueParserListener extends MySqlParserBaseListener {
private final ColumnEditor columnEditor;
private final AtomicReference<Boolean> optionalColumn;
private final MySqlDefaultValueConverter defaultValueConverter;
/**
* Whether to convert the column's default value into the corresponding schema type or not. This is done for column
* definitions of ALTER TABLE statements but not for CREATE TABLE. In case of the latter, the default value
* conversion is handled by the CREATE TABLE statement listener itself, as a default character set given at the
* table level might have to be applied.
*/
private final boolean convertDefault;
private boolean converted;
public DefaultValueParserListener(ColumnEditor columnEditor, MySqlValueConverters converters,
AtomicReference<Boolean> optionalColumn, boolean convertDefault) {
public DefaultValueParserListener(ColumnEditor columnEditor, AtomicReference<Boolean> optionalColumn) {
this.columnEditor = columnEditor;
this.defaultValueConverter = new MySqlDefaultValueConverter(converters);
this.optionalColumn = optionalColumn;
this.convertDefault = convertDefault;
this.converted = false;
}
@ -82,28 +68,20 @@ else if (ctx.currentTimestamp() != null && !ctx.currentTimestamp().isEmpty()) {
}
}
}
convertDefaultValue(true);
exitDefaultValue(true);
super.enterDefaultValue(ctx);
}
public void convertDefaultValue(boolean skipIfUnknownOptional) {
// For CREATE TABLE are all column default values converted only after charset is known.
if (convertDefault) {
if (!converted && (optionalColumn.get() != null || !skipIfUnknownOptional)) {
convertDefaultValueToSchemaType(columnEditor);
converted = true;
public void exitDefaultValue(boolean skipIfUnknownOptional) {
boolean isOptionalColumn = optionalColumn.get() != null;
if (!converted && (isOptionalColumn || !skipIfUnknownOptional)) {
if (isOptionalColumn) {
columnEditor.optional(optionalColumn.get().booleanValue());
}
converted = true;
}
}
private void convertDefaultValueToSchemaType(ColumnEditor columnEditor) {
if (optionalColumn.get() != null) {
columnEditor.optional(optionalColumn.get().booleanValue());
}
defaultValueConverter.setColumnDefaultValue(columnEditor);
}
private String unquote(String stringLiteral) {
return stringLiteral.substring(1, stringLiteral.length() - 1);
}

View File

@ -30,7 +30,6 @@
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.DefaultValueConverter;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.SystemVariables;
import io.debezium.relational.Table;
@ -76,7 +75,6 @@ public class MySqlSchema extends RelationalDatabaseSchema {
private final Set<String> ignoredQueryStatements = Collect.unmodifiableSet("BEGIN", "END", "FLUSH PRIVILEGES");
private final DdlParser ddlParser;
private final DefaultValueConverter defaultValueConverter;
private final Filters filters;
private final DatabaseHistory dbHistory;
private final DdlChanges ddlChanges;
@ -105,7 +103,9 @@ public MySqlSchema(MySqlConnectorConfig configuration,
TableFilter.fromPredicate(tableFilters.tableFilter()),
tableFilters.columnFilter(),
new TableSchemaBuilder(
getValueConverters(configuration), SchemaNameAdjuster.create(),
getValueConverters(configuration),
new MySqlDefaultValueConverter(getValueConverters(configuration)),
SchemaNameAdjuster.create(),
configuration.customConverterRegistry(),
configuration.getSourceInfoStructMaker().schema(),
configuration.getSanitizeFieldNames(),
@ -129,8 +129,6 @@ public MySqlSchema(MySqlConnectorConfig configuration,
this.storeOnlyCapturedTablesDdl = Boolean.valueOf(
dbHistoryConfig.getFallbackStringPropertyWithWarning(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL));
MySqlValueConverters valueConverters = getValueConverters(configuration);
this.defaultValueConverter = new MySqlDefaultValueConverter(valueConverters);
this.ddlParser = new MySqlAntlrDdlParser(
true,
false,
@ -280,7 +278,7 @@ protected void appendCreateTableStatement(StringBuilder sb, Table table) {
*/
public void loadHistory(SourceInfo startingPoint) {
tables().clear();
dbHistory.recover(startingPoint.partition(), startingPoint.offset(), tables(), ddlParser, defaultValueConverter);
dbHistory.recover(startingPoint.partition(), startingPoint.offset(), tables(), ddlParser);
recoveredTables = !tableIds().isEmpty();
refreshSchemas();
}

View File

@ -25,6 +25,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.fest.assertions.Assertions;
import org.junit.Before;
import org.junit.Test;
@ -35,9 +37,12 @@
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.relational.CustomConverterRegistry;
import io.debezium.relational.SystemVariables;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.ddl.DdlChanges;
@ -46,6 +51,7 @@
import io.debezium.relational.ddl.SimpleDdlParserListener;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.IoUtil;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Testing;
/**
@ -56,12 +62,23 @@ public class MySqlAntlrDdlParserTest {
private DdlParser parser;
private Tables tables;
private SimpleDdlParserListener listener;
private MySqlValueConverters converters;
private TableSchemaBuilder tableSchemaBuilder;
@Before
public void beforeEach() {
listener = new SimpleDdlParserListener();
parser = new MysqlDdlParserWithSimpleTestListener(listener);
tables = new Tables();
converters = new MySqlValueConverters(
JdbcValueConverters.DecimalMode.DOUBLE,
TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS,
JdbcValueConverters.BigIntUnsignedMode.PRECISE,
BinaryHandlingMode.BYTES);
tableSchemaBuilder = new TableSchemaBuilder(
converters,
new MySqlDefaultValueConverter(converters),
SchemaNameAdjuster.create(), new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
}
@Test
@ -231,7 +248,7 @@ public void shouldProcessExpressionWithDefault() {
assertThat(table.columns()).hasSize(2);
// The default value is computed for column dynamically so we set default to null
assertThat(table.columnWithName("bin_volume").hasDefaultValue()).isTrue();
assertThat(table.columnWithName("bin_volume").defaultValue()).isNull();
assertThat(getColumnSchema(table, "bin_volume").defaultValue()).isNull();
}
@Test
@ -384,7 +401,7 @@ public void shouldUpdateSchemaForRemovedDefaultValue() {
assertThat(table.columnWithName("id")).isNotNull();
assertThat(table.columnWithName("val1")).isNotNull();
assertThat(table.columnWithName("last_val")).isNotNull();
assertThat(table.columnWithName("last_val").defaultValue()).isNull();
assertThat(getColumnSchema(table, "last_val").defaultValue()).isNull();
parser.parse("ALTER TABLE mytable CHANGE COLUMN last_val last_val INT NOT NULL;", tables);
assertThat(((MySqlAntlrDdlParser) parser).getParsingExceptionsFromWalker().size()).isEqualTo(0);
@ -413,7 +430,7 @@ public void shouldUpdateSchemaForChangedDefaultValue() {
assertThat(table.columnWithName("id")).isNotNull();
assertThat(table.columnWithName("val1")).isNotNull();
assertThat(table.columnWithName("last_val")).isNotNull();
assertThat(table.columnWithName("last_val").defaultValue()).isEqualTo(10);
assertThat(getColumnSchema(table, "last_val").defaultValue()).isEqualTo(10);
}
@Test
@ -2755,13 +2772,13 @@ public void parseDefaultValue() {
parser.parse(ddl, tables);
Table table = tables.forTable(new TableId(null, null, "tmp"));
assertThat(table.columnWithName("id").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("columnA").defaultValue()).isEqualTo("A");
assertThat(table.columnWithName("columnB").defaultValue()).isEqualTo(1);
assertThat(table.columnWithName("columnC").defaultValue()).isEqualTo("C");
assertThat(table.columnWithName("columnD").defaultValue()).isEqualTo(null);
assertThat(table.columnWithName("columnE").defaultValue()).isEqualTo(null);
assertThat(table.columnWithName("my_dateA").defaultValue()).isEqualTo(LocalDateTime.of(2018, 4, 27, 13, 28, 43).toEpochSecond(ZoneOffset.UTC) * 1_000);
assertThat(table.columnWithName("my_dateB").defaultValue()).isEqualTo(LocalDateTime.of(9999, 12, 31, 0, 0, 0).toEpochSecond(ZoneOffset.UTC) * 1_000);
assertThat(getColumnSchema(table, "columnA").defaultValue()).isEqualTo("A");
assertThat(getColumnSchema(table, "columnB").defaultValue()).isEqualTo(1);
assertThat(getColumnSchema(table, "columnC").defaultValue()).isEqualTo("C");
assertThat(getColumnSchema(table, "columnD").defaultValue()).isEqualTo(null);
assertThat(getColumnSchema(table, "columnE").defaultValue()).isEqualTo(null);
assertThat(getColumnSchema(table, "my_dateA").defaultValue()).isEqualTo(LocalDateTime.of(2018, 4, 27, 13, 28, 43).toEpochSecond(ZoneOffset.UTC) * 1_000);
assertThat(getColumnSchema(table, "my_dateB").defaultValue()).isEqualTo(LocalDateTime.of(9999, 12, 31, 0, 0, 0).toEpochSecond(ZoneOffset.UTC) * 1_000);
}
@Test
@ -2778,7 +2795,7 @@ public void shouldTreatPrimaryKeyColumnsImplicitlyAsNonNull() {
Table tableDef = tables.forTable(new TableId(null, null, "datadef"));
assertThat(tableDef.columnWithName("id").isOptional()).isEqualTo(false);
assertThat(tableDef.columnWithName("id").hasDefaultValue()).isEqualTo(true);
assertThat(tableDef.columnWithName("id").defaultValue()).isEqualTo(0);
assertThat(getColumnSchema(tableDef, "id").defaultValue()).isEqualTo(0);
ddl = "DROP TABLE IF EXISTS data; " +
"CREATE TABLE data(id INT DEFAULT 1, PRIMARY KEY (id))";
@ -2787,7 +2804,7 @@ public void shouldTreatPrimaryKeyColumnsImplicitlyAsNonNull() {
table = tables.forTable(new TableId(null, null, "data"));
assertThat(table.columnWithName("id").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("id").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("id").defaultValue()).isEqualTo(1);
assertThat(getColumnSchema(table, "id").defaultValue()).isEqualTo(1);
}
@Test
@ -2805,15 +2822,15 @@ public void shouldNotNullPositionBeforeOrAfterDefaultValue() {
assertThat(table.columnWithName("ts_col").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("ts_col").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col").defaultValue()).isEqualTo(isoEpoch);
assertThat(getColumnSchema(table, "ts_col").defaultValue()).isEqualTo(isoEpoch);
assertThat(table.columnWithName("ts_col2").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("ts_col2").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col2").defaultValue()).isEqualTo(isoEpoch);
assertThat(getColumnSchema(table, "ts_col2").defaultValue()).isEqualTo(isoEpoch);
assertThat(table.columnWithName("ts_col3").isOptional()).isEqualTo(true);
assertThat(table.columnWithName("ts_col3").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col3").defaultValue()).isNull();
assertThat(getColumnSchema(table, "ts_col3").defaultValue()).isNull();
final String alter1 = "ALTER TABLE my_table " +
" ADD ts_col4 TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL;";
@ -2824,7 +2841,7 @@ public void shouldNotNullPositionBeforeOrAfterDefaultValue() {
assertThat(table.columns().size()).isEqualTo(4);
assertThat(table.columnWithName("ts_col4").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("ts_col4").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col4").defaultValue()).isEqualTo(isoEpoch);
assertThat(getColumnSchema(table, "ts_col4").defaultValue()).isEqualTo(isoEpoch);
final String alter2 = "ALTER TABLE my_table " +
" ADD ts_col5 TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP";
@ -2835,7 +2852,7 @@ public void shouldNotNullPositionBeforeOrAfterDefaultValue() {
assertThat(table.columns().size()).isEqualTo(5);
assertThat(table.columnWithName("ts_col5").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("ts_col5").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col5").defaultValue()).isEqualTo(isoEpoch);
assertThat(getColumnSchema(table, "ts_col5").defaultValue()).isEqualTo(isoEpoch);
}
@Test
@ -2873,55 +2890,55 @@ public void shouldParseTimestampDefaultValue() {
Table table = tables.forTable(new TableId(null, null, "my_table"));
assertThat(table.columnWithName("ts_col01").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col01").defaultValue()).isEqualTo(toIsoString("2020-01-02 00:00:00"));
assertThat(getColumnSchema(table, "ts_col01").defaultValue()).isEqualTo(toIsoString("2020-01-02 00:00:00"));
assertThat(table.columnWithName("ts_col02").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col02").defaultValue()).isEqualTo(toIsoString("2020-01-02 00:00:00"));
assertThat(getColumnSchema(table, "ts_col02").defaultValue()).isEqualTo(toIsoString("2020-01-02 00:00:00"));
assertThat(table.columnWithName("ts_col03").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col03").defaultValue()).isEqualTo(toIsoString("2020-01-02 00:00:00"));
assertThat(getColumnSchema(table, "ts_col03").defaultValue()).isEqualTo(toIsoString("2020-01-02 00:00:00"));
assertThat(table.columnWithName("ts_col04").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col04").defaultValue()).isEqualTo(toIsoString("2020-01-02 00:00:00"));
assertThat(getColumnSchema(table, "ts_col04").defaultValue()).isEqualTo(toIsoString("2020-01-02 00:00:00"));
assertThat(table.columnWithName("ts_col05").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col05").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:00:00"));
assertThat(getColumnSchema(table, "ts_col05").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:00:00"));
assertThat(table.columnWithName("ts_col06").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col06").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:00:00"));
assertThat(getColumnSchema(table, "ts_col06").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:00:00"));
assertThat(table.columnWithName("ts_col07").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col07").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:00:00"));
assertThat(getColumnSchema(table, "ts_col07").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:00:00"));
assertThat(table.columnWithName("ts_col08").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col08").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:00"));
assertThat(getColumnSchema(table, "ts_col08").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:00"));
assertThat(table.columnWithName("ts_col09").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col09").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:00"));
assertThat(getColumnSchema(table, "ts_col09").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:00"));
assertThat(table.columnWithName("ts_col10").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col10").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
assertThat(getColumnSchema(table, "ts_col10").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
assertThat(table.columnWithName("ts_col11").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col11").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05.123456"));
assertThat(getColumnSchema(table, "ts_col11").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05.123456"));
assertThat(table.columnWithName("ts_col12").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col12").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
assertThat(getColumnSchema(table, "ts_col12").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
assertThat(table.columnWithName("ts_col13").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col13").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
assertThat(getColumnSchema(table, "ts_col13").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
assertThat(table.columnWithName("ts_col14").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col14").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
assertThat(getColumnSchema(table, "ts_col14").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
assertThat(table.columnWithName("ts_col15").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col15").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
assertThat(getColumnSchema(table, "ts_col15").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
assertThat(table.columnWithName("ts_col16").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col16").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
assertThat(getColumnSchema(table, "ts_col16").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
assertThat(table.columnWithName("ts_col17").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col17").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
assertThat(getColumnSchema(table, "ts_col17").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
assertThat(table.columnWithName("ts_col18").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col18").defaultValue()).isEqualTo(toIsoString("1970-01-01 00:00:01"));
assertThat(getColumnSchema(table, "ts_col18").defaultValue()).isEqualTo(toIsoString("1970-01-01 00:00:01"));
assertThat(table.columnWithName("ts_col19").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col19").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
assertThat(getColumnSchema(table, "ts_col19").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
assertThat(table.columnWithName("ts_col20").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col20").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:00"));
assertThat(getColumnSchema(table, "ts_col20").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:00"));
assertThat(table.columnWithName("ts_col21").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col21").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:00"));
assertThat(getColumnSchema(table, "ts_col21").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:00"));
assertThat(table.columnWithName("ts_col22").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col22").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:00"));
assertThat(getColumnSchema(table, "ts_col22").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:00"));
assertThat(table.columnWithName("ts_col23").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col23").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:00"));
assertThat(getColumnSchema(table, "ts_col23").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:00"));
assertThat(table.columnWithName("ts_col24").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col24").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:00"));
assertThat(getColumnSchema(table, "ts_col24").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:00"));
assertThat(table.columnWithName("ts_col25").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col25").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:00"));
assertThat(getColumnSchema(table, "ts_col25").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:00"));
final String alter1 = "ALTER TABLE my_table ADD ts_col TIMESTAMP DEFAULT '2020-01-02';";
@ -2929,7 +2946,7 @@ public void shouldParseTimestampDefaultValue() {
table = tables.forTable(new TableId(null, null, "my_table"));
assertThat(table.columnWithName("ts_col").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col").defaultValue()).isEqualTo(toIsoString("2020-01-02 00:00:00"));
assertThat(getColumnSchema(table, "ts_col").defaultValue()).isEqualTo(toIsoString("2020-01-02 00:00:00"));
final String alter2 = "ALTER TABLE my_table MODIFY ts_col TIMESTAMP DEFAULT '2020-01-02:03:04:05';";
@ -2937,7 +2954,7 @@ public void shouldParseTimestampDefaultValue() {
table = tables.forTable(new TableId(null, null, "my_table"));
assertThat(table.columnWithName("ts_col").hasDefaultValue()).isEqualTo(true);
assertThat(table.columnWithName("ts_col").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
assertThat(getColumnSchema(table, "ts_col").defaultValue()).isEqualTo(toIsoString("2020-01-02 03:04:05"));
}
private String toIsoString(String timestamp) {
@ -3094,7 +3111,7 @@ private void assertColumn(Table table, String name, String typeName, int jdbcTyp
assertThat(column.isGenerated()).isEqualTo(generated);
assertThat(column.isAutoIncremented()).isEqualTo(autoIncremented);
assertThat(column.hasDefaultValue()).isEqualTo(hasDefaultValue);
assertThat(column.defaultValue()).isEqualTo(defaultValue);
assertThat(getColumnSchema(table, name).defaultValue()).isEqualTo(defaultValue);
}
class MysqlDdlParserWithSimpleTestListener extends MySqlAntlrDdlParser {
@ -3119,13 +3136,14 @@ private MysqlDdlParserWithSimpleTestListener(DdlChanges changesListener, boolean
super(false,
includeViews,
includeComments,
new MySqlValueConverters(
JdbcValueConverters.DecimalMode.DOUBLE,
TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS,
JdbcValueConverters.BigIntUnsignedMode.PRECISE,
BinaryHandlingMode.BYTES),
converters,
tableFilter);
this.ddlChanges = changesListener;
}
}
private Schema getColumnSchema(Table table, String column) {
TableSchema schema = tableSchemaBuilder.create("test-1", "dummy", table, null, null, null);
return schema.getEnvelopeSchema().schema().field("after").schema().field(column).schema();
}
}

View File

@ -56,6 +56,7 @@ public void beforeEach() {
tables = new Tables();
tableSchemaBuilder = new TableSchemaBuilder(
converters,
new MySqlDefaultValueConverter(converters),
SchemaNameAdjuster.create(), new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
}
@ -73,17 +74,17 @@ public void parseUnsignedTinyintDefaultValue() {
");";
parser.parse(sql, tables);
Table table = tables.forTable(new TableId(null, null, "UNSIGNED_TINYINT_TABLE"));
assertThat(table.columnWithName("A").defaultValue()).isEqualTo((short) 0);
assertThat(table.columnWithName("B").defaultValue()).isEqualTo((short) 10);
assertThat(getColumnSchema(table, "A").defaultValue()).isEqualTo((short) 0);
assertThat(getColumnSchema(table, "B").defaultValue()).isEqualTo((short) 10);
assertThat(table.columnWithName("C").isOptional()).isEqualTo(true);
assertThat(table.columnWithName("C").hasDefaultValue()).isTrue();
assertThat(table.columnWithName("C").defaultValue()).isNull();
assertThat(getColumnSchema(table, "C").defaultValue()).isNull();
assertThat(table.columnWithName("D").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("D").hasDefaultValue()).isFalse();
assertThat(table.columnWithName("E").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("E").defaultValue()).isEqualTo((short) 0);
assertThat(table.columnWithName("F").defaultValue()).isEqualTo((short) 0);
assertThat(table.columnWithName("G").defaultValue()).isEqualTo((short) 255);
assertThat(getColumnSchema(table, "E").defaultValue()).isEqualTo((short) 0);
assertThat(getColumnSchema(table, "F").defaultValue()).isEqualTo((short) 0);
assertThat(getColumnSchema(table, "G").defaultValue()).isEqualTo((short) 255);
}
@Test
@ -99,16 +100,16 @@ public void parseUnsignedSmallintDefaultValue() {
");";
parser.parse(sql, tables);
Table table = tables.forTable(new TableId(null, null, "UNSIGNED_SMALLINT_TABLE"));
assertThat(table.columnWithName("A").defaultValue()).isEqualTo(0);
assertThat(table.columnWithName("B").defaultValue()).isEqualTo(10);
assertThat(getColumnSchema(table, "A").defaultValue()).isEqualTo(0);
assertThat(getColumnSchema(table, "B").defaultValue()).isEqualTo(10);
assertThat(table.columnWithName("C").isOptional()).isEqualTo(true);
assertThat(table.columnWithName("C").hasDefaultValue()).isTrue();
assertThat(table.columnWithName("D").isOptional()).isEqualTo(false);
assertThat(getColumnSchema(table, "D").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("D").hasDefaultValue()).isFalse();
assertThat(table.columnWithName("E").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("E").defaultValue()).isEqualTo(0);
assertThat(table.columnWithName("F").defaultValue()).isEqualTo(0);
assertThat(table.columnWithName("G").defaultValue()).isEqualTo(65535);
assertThat(getColumnSchema(table, "E").isOptional()).isEqualTo(false);
assertThat(getColumnSchema(table, "E").defaultValue()).isEqualTo(0);
assertThat(getColumnSchema(table, "F").defaultValue()).isEqualTo(0);
assertThat(getColumnSchema(table, "G").defaultValue()).isEqualTo(65535);
}
@Test
@ -124,16 +125,16 @@ public void parseUnsignedMediumintDefaultValue() {
");";
parser.parse(sql, tables);
Table table = tables.forTable(new TableId(null, null, "UNSIGNED_MEDIUMINT_TABLE"));
assertThat(table.columnWithName("A").defaultValue()).isEqualTo(0);
assertThat(table.columnWithName("B").defaultValue()).isEqualTo(10);
assertThat(getColumnSchema(table, "A").defaultValue()).isEqualTo(0);
assertThat(getColumnSchema(table, "B").defaultValue()).isEqualTo(10);
assertThat(table.columnWithName("C").isOptional()).isEqualTo(true);
assertThat(table.columnWithName("C").hasDefaultValue()).isTrue();
assertThat(table.columnWithName("D").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("D").hasDefaultValue()).isFalse();
assertThat(table.columnWithName("E").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("E").defaultValue()).isEqualTo(0);
assertThat(table.columnWithName("F").defaultValue()).isEqualTo(0);
assertThat(table.columnWithName("G").defaultValue()).isEqualTo(16777215);
assertThat(getColumnSchema(table, "E").defaultValue()).isEqualTo(0);
assertThat(getColumnSchema(table, "F").defaultValue()).isEqualTo(0);
assertThat(getColumnSchema(table, "G").defaultValue()).isEqualTo(16777215);
}
@Test
@ -149,16 +150,16 @@ public void parseUnsignedIntDefaultValue() {
");";
parser.parse(sql, tables);
Table table = tables.forTable(new TableId(null, null, "UNSIGNED_INT_TABLE"));
assertThat(table.columnWithName("A").defaultValue()).isEqualTo(0L);
assertThat(table.columnWithName("B").defaultValue()).isEqualTo(10L);
assertThat(getColumnSchema(table, "A").defaultValue()).isEqualTo(0L);
assertThat(getColumnSchema(table, "B").defaultValue()).isEqualTo(10L);
assertThat(table.columnWithName("C").isOptional()).isEqualTo(true);
assertThat(table.columnWithName("C").hasDefaultValue()).isTrue();
assertThat(table.columnWithName("D").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("D").hasDefaultValue()).isFalse();
assertThat(table.columnWithName("E").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("E").defaultValue()).isEqualTo(0L);
assertThat(table.columnWithName("F").defaultValue()).isEqualTo(0L);
assertThat(table.columnWithName("G").defaultValue()).isEqualTo(4294967295L);
assertThat(getColumnSchema(table, "E").defaultValue()).isEqualTo(0L);
assertThat(getColumnSchema(table, "F").defaultValue()).isEqualTo(0L);
assertThat(getColumnSchema(table, "G").defaultValue()).isEqualTo(4294967295L);
}
@Test
@ -173,15 +174,15 @@ public void parseUnsignedBigIntDefaultValueToLong() {
");";
parser.parse(sql, tables);
Table table = tables.forTable(new TableId(null, null, "UNSIGNED_BIGINT_TABLE"));
assertThat(table.columnWithName("A").defaultValue()).isEqualTo(0L);
assertThat(table.columnWithName("B").defaultValue()).isEqualTo(10L);
assertThat(getColumnSchema(table, "A").defaultValue()).isEqualTo(0L);
assertThat(getColumnSchema(table, "B").defaultValue()).isEqualTo(10L);
assertThat(table.columnWithName("C").isOptional()).isEqualTo(true);
assertThat(table.columnWithName("C").hasDefaultValue()).isTrue();
assertThat(table.columnWithName("D").isOptional()).isEqualTo(false);
assertThat(getColumnSchema(table, "D").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("D").hasDefaultValue()).isFalse();
assertThat(table.columnWithName("E").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("E").defaultValue()).isEqualTo(0L);
assertThat(table.columnWithName("F").defaultValue()).isEqualTo(0L);
assertThat(getColumnSchema(table, "E").isOptional()).isEqualTo(false);
assertThat(getColumnSchema(table, "E").defaultValue()).isEqualTo(0L);
assertThat(getColumnSchema(table, "F").defaultValue()).isEqualTo(0L);
}
@Test
@ -191,6 +192,11 @@ public void parseUnsignedBigIntDefaultValueToBigDecimal() {
JdbcValueConverters.BigIntUnsignedMode.PRECISE,
BinaryHandlingMode.BYTES);
final AbstractDdlParser parser = new MySqlAntlrDdlParser(converters);
final TableSchemaBuilder tableSchemaBuilder = new TableSchemaBuilder(
converters,
new MySqlDefaultValueConverter(converters),
SchemaNameAdjuster.create(), new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
String sql = "CREATE TABLE UNSIGNED_BIGINT_TABLE (\n" +
" A BIGINT UNSIGNED NULL DEFAULT 0,\n" +
" B BIGINT UNSIGNED NULL DEFAULT '10',\n" +
@ -202,16 +208,16 @@ public void parseUnsignedBigIntDefaultValueToBigDecimal() {
");";
parser.parse(sql, tables);
Table table = tables.forTable(new TableId(null, null, "UNSIGNED_BIGINT_TABLE"));
assertThat(table.columnWithName("A").defaultValue()).isEqualTo(BigDecimal.ZERO);
assertThat(table.columnWithName("B").defaultValue()).isEqualTo(new BigDecimal(10));
assertThat(getColumnSchema(table, "A", tableSchemaBuilder).defaultValue()).isEqualTo(BigDecimal.ZERO);
assertThat(getColumnSchema(table, "B", tableSchemaBuilder).defaultValue()).isEqualTo(new BigDecimal(10));
assertThat(table.columnWithName("C").isOptional()).isEqualTo(true);
assertThat(table.columnWithName("C").hasDefaultValue()).isTrue();
assertThat(table.columnWithName("D").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("D").hasDefaultValue()).isFalse();
assertThat(table.columnWithName("E").isOptional()).isEqualTo(false);
assertThat(table.columnWithName("E").defaultValue()).isEqualTo(BigDecimal.ZERO);
assertThat(table.columnWithName("F").defaultValue()).isEqualTo(BigDecimal.ZERO);
assertThat(table.columnWithName("G").defaultValue()).isEqualTo(new BigDecimal("18446744073709551615"));
assertThat(getColumnSchema(table, "E", tableSchemaBuilder).defaultValue()).isEqualTo(BigDecimal.ZERO);
assertThat(getColumnSchema(table, "F", tableSchemaBuilder).defaultValue()).isEqualTo(BigDecimal.ZERO);
assertThat(getColumnSchema(table, "G", tableSchemaBuilder).defaultValue()).isEqualTo(new BigDecimal("18446744073709551615"));
}
@Test
@ -228,16 +234,16 @@ public void parseStringDefaultValue() {
") CHARACTER SET 'latin2';";
parser.parse(sql, tables);
Table table = tables.forTable(new TableId(null, null, "UNSIGNED_STRING_TABLE"));
assertThat(table.columnWithName("A").defaultValue()).isEqualTo("A");
assertThat(getColumnSchema(table, "A").defaultValue()).isEqualTo("A");
assertThat(table.columnWithName("A").charsetName()).isEqualTo("latin2");
assertThat(table.columnWithName("B").defaultValue()).isEqualTo("b");
assertThat(getColumnSchema(table, "B").defaultValue()).isEqualTo("b");
assertThat(table.columnWithName("B").charsetName()).isEqualTo("utf8");
assertThat(table.columnWithName("C").defaultValue()).isEqualTo("CC");
assertThat(table.columnWithName("D").defaultValue()).isEqualTo("10");
assertThat(table.columnWithName("E").defaultValue()).isEqualTo("0");
assertThat(table.columnWithName("F").defaultValue()).isEqualTo(null);
assertThat(table.columnWithName("G").defaultValue()).isEqualTo(null);
assertThat(table.columnWithName("H").defaultValue()).isEqualTo(null);
assertThat(getColumnSchema(table, "C").defaultValue()).isEqualTo("CC");
assertThat(getColumnSchema(table, "D").defaultValue()).isEqualTo("10");
assertThat(getColumnSchema(table, "E").defaultValue()).isEqualTo("0");
assertThat(getColumnSchema(table, "F").defaultValue()).isEqualTo(null);
assertThat(getColumnSchema(table, "G").defaultValue()).isEqualTo(null);
assertThat(getColumnSchema(table, "H").defaultValue()).isEqualTo(null);
}
@Test
@ -256,16 +262,16 @@ public void parseBitDefaultValue() {
");";
parser.parse(sql, tables);
Table table = tables.forTable(new TableId(null, null, "BIT_TABLE"));
assertThat(table.columnWithName("A").defaultValue()).isEqualTo(null);
assertThat(table.columnWithName("B").defaultValue()).isEqualTo(false);
assertThat(table.columnWithName("C").defaultValue()).isEqualTo(true);
assertThat(table.columnWithName("D").defaultValue()).isEqualTo(false);
assertThat(table.columnWithName("E").defaultValue()).isEqualTo(true);
assertThat(table.columnWithName("F").defaultValue()).isEqualTo(true);
assertThat(table.columnWithName("G").defaultValue()).isEqualTo(false);
assertThat(table.columnWithName("H").defaultValue()).isEqualTo(new byte[]{ 66, 1 });
assertThat(table.columnWithName("I").defaultValue()).isEqualTo(null);
assertThat(table.columnWithName("J").defaultValue()).isEqualTo(new byte[]{ 15, 97, 1, 0 });
assertThat(getColumnSchema(table, "A").defaultValue()).isEqualTo(null);
assertThat(getColumnSchema(table, "B").defaultValue()).isEqualTo(false);
assertThat(getColumnSchema(table, "C").defaultValue()).isEqualTo(true);
assertThat(getColumnSchema(table, "D").defaultValue()).isEqualTo(false);
assertThat(getColumnSchema(table, "E").defaultValue()).isEqualTo(true);
assertThat(getColumnSchema(table, "F").defaultValue()).isEqualTo(true);
assertThat(getColumnSchema(table, "G").defaultValue()).isEqualTo(false);
assertThat(getColumnSchema(table, "H").defaultValue()).isEqualTo(new byte[]{ 66, 1 });
assertThat(getColumnSchema(table, "I").defaultValue()).isEqualTo(null);
assertThat(getColumnSchema(table, "J").defaultValue()).isEqualTo(new byte[]{ 15, 97, 1, 0 });
}
@Test
@ -279,11 +285,11 @@ public void parseBooleanDefaultValue() {
");";
parser.parse(sql, tables);
Table table = tables.forTable(new TableId(null, null, "BOOLEAN_TABLE"));
assertThat(table.columnWithName("A").defaultValue()).isEqualTo(false);
assertThat(table.columnWithName("B").defaultValue()).isEqualTo(true);
assertThat(table.columnWithName("C").defaultValue()).isEqualTo(true);
assertThat(table.columnWithName("D").defaultValue()).isEqualTo(true);
assertThat(table.columnWithName("E").defaultValue()).isEqualTo(null);
assertThat(getColumnSchema(table, "A").defaultValue()).isEqualTo(false);
assertThat(getColumnSchema(table, "B").defaultValue()).isEqualTo(true);
assertThat(getColumnSchema(table, "C").defaultValue()).isEqualTo(true);
assertThat(getColumnSchema(table, "D").defaultValue()).isEqualTo(true);
assertThat(getColumnSchema(table, "E").defaultValue()).isEqualTo(null);
}
@Test
@ -316,8 +322,8 @@ public void parseRealDefaultValue() {
");";
parser.parse(sql, tables);
Table table = tables.forTable(new TableId(null, null, "REAL_TABLE"));
assertThat(table.columnWithName("A").defaultValue()).isEqualTo(1f);
assertThat(table.columnWithName("B").defaultValue()).isEqualTo(null);
assertThat(getColumnSchema(table, "A").defaultValue()).isEqualTo(1f);
assertThat(getColumnSchema(table, "B").defaultValue()).isEqualTo(null);
}
@Test
@ -330,10 +336,10 @@ public void parseNumericAndDecimalToDoubleDefaultValue() {
");";
parser.parse(sql, tables);
Table table = tables.forTable(new TableId(null, null, "NUMERIC_DECIMAL_TABLE"));
assertThat(table.columnWithName("A").defaultValue()).isEqualTo(1.0d);
assertThat(table.columnWithName("B").defaultValue()).isEqualTo(2.321d);
assertThat(table.columnWithName("C").defaultValue()).isEqualTo(13d);
assertThat(table.columnWithName("D").defaultValue()).isEqualTo(12.68d);
assertThat(getColumnSchema(table, "A").defaultValue()).isEqualTo(1.0d);
assertThat(getColumnSchema(table, "B").defaultValue()).isEqualTo(2.321d);
assertThat(getColumnSchema(table, "C").defaultValue()).isEqualTo(13d);
assertThat(getColumnSchema(table, "D").defaultValue()).isEqualTo(12.68d);
}
@Test
@ -343,6 +349,10 @@ public void parseNumericAndDecimalToDecimalDefaultValue() {
JdbcValueConverters.BigIntUnsignedMode.LONG,
BinaryHandlingMode.BYTES);
final AbstractDdlParser parser = new MySqlAntlrDdlParser(converters);
final TableSchemaBuilder tableSchemaBuilder = new TableSchemaBuilder(
converters,
new MySqlDefaultValueConverter(converters),
SchemaNameAdjuster.create(), new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
String sql = "CREATE TABLE NUMERIC_DECIMAL_TABLE (\n" +
" A NUMERIC NOT NULL DEFAULT 1.23,\n" +
" B DECIMAL(5,3) NOT NULL DEFAULT 2.321,\n" +
@ -350,9 +360,9 @@ public void parseNumericAndDecimalToDecimalDefaultValue() {
");";
parser.parse(sql, tables);
Table table = tables.forTable(new TableId(null, null, "NUMERIC_DECIMAL_TABLE"));
assertThat(table.columnWithName("A").defaultValue()).isEqualTo(BigDecimal.valueOf(1));
assertThat(table.columnWithName("B").defaultValue()).isEqualTo(BigDecimal.valueOf(2.321));
assertThat(table.columnWithName("C").defaultValue()).isEqualTo(BigDecimal.valueOf(13));
assertThat(getColumnSchema(table, "A", tableSchemaBuilder).defaultValue()).isEqualTo(BigDecimal.valueOf(1));
assertThat(getColumnSchema(table, "B", tableSchemaBuilder).defaultValue()).isEqualTo(BigDecimal.valueOf(2.321));
assertThat(getColumnSchema(table, "C", tableSchemaBuilder).defaultValue()).isEqualTo(BigDecimal.valueOf(13));
}
@Test
@ -376,24 +386,24 @@ public void parseTimeDefaultValue() {
");";
parser.parse(sql, tables);
Table table = tables.forTable(new TableId(null, null, "TIME_TABLE"));
assertThat(table.columnWithName("A").defaultValue()).isEqualTo("1970-01-01T00:00:00Z");
assertThat(table.columnWithName("B").defaultValue()).isEqualTo("1970-01-01T00:00:00Z");
assertThat(table.columnWithName("C").defaultValue()).isEqualTo("1970-01-01T00:00:00Z");
assertThat(table.columnWithName("D").defaultValue())
assertThat(getColumnSchema(table, "A").defaultValue()).isEqualTo("1970-01-01T00:00:00Z");
assertThat(getColumnSchema(table, "B").defaultValue()).isEqualTo("1970-01-01T00:00:00Z");
assertThat(getColumnSchema(table, "C").defaultValue()).isEqualTo("1970-01-01T00:00:00Z");
assertThat(getColumnSchema(table, "D").defaultValue())
.isEqualTo(ZonedTimestamp.toIsoString(LocalDateTime.of(2018, 6, 26, 12, 34, 56, 0).atZone(ZoneId.systemDefault()), null));
assertThat(table.columnWithName("E").defaultValue())
assertThat(getColumnSchema(table, "E").defaultValue())
.isEqualTo(ZonedTimestamp.toIsoString(LocalDateTime.of(2018, 6, 26, 12, 34, 56, 0).atZone(ZoneId.systemDefault()), null));
assertThat(table.columnWithName("F").defaultValue())
assertThat(getColumnSchema(table, "F").defaultValue())
.isEqualTo(ZonedTimestamp.toIsoString(LocalDateTime.of(2018, 6, 26, 12, 34, 56, 780_000_000).atZone(ZoneId.systemDefault()), null));
assertThat(table.columnWithName("G").defaultValue()).isEqualTo(Date.from(Instant.ofEpochMilli(0)));
assertThat(table.columnWithName("H").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(table.columnWithName("I").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(table.columnWithName("J").defaultValue()).isEqualTo(Date.from(ZonedDateTime.of(2018, 6, 26, 12, 34, 56, 0, ZoneOffset.UTC).toInstant()));
assertThat(table.columnWithName("K").defaultValue()).isEqualTo(Date.from(ZonedDateTime.of(2018, 6, 26, 12, 34, 56, 0, ZoneOffset.UTC).toInstant()));
assertThat(table.columnWithName("L").defaultValue()).isEqualTo(Date.from(ZonedDateTime.of(2018, 6, 26, 12, 34, 56, 780_000_000, ZoneOffset.UTC).toInstant()));
assertThat(table.columnWithName("M").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(table.columnWithName("N").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(table.columnWithName("O").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(getColumnSchema(table, "G").defaultValue()).isEqualTo(Date.from(Instant.ofEpochMilli(0)));
assertThat(getColumnSchema(table, "H").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(getColumnSchema(table, "I").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(getColumnSchema(table, "J").defaultValue()).isEqualTo(Date.from(ZonedDateTime.of(2018, 6, 26, 12, 34, 56, 0, ZoneOffset.UTC).toInstant()));
assertThat(getColumnSchema(table, "K").defaultValue()).isEqualTo(Date.from(ZonedDateTime.of(2018, 6, 26, 12, 34, 56, 0, ZoneOffset.UTC).toInstant()));
assertThat(getColumnSchema(table, "L").defaultValue()).isEqualTo(Date.from(ZonedDateTime.of(2018, 6, 26, 12, 34, 56, 780_000_000, ZoneOffset.UTC).toInstant()));
assertThat(getColumnSchema(table, "M").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(getColumnSchema(table, "N").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(getColumnSchema(table, "O").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
}
@Test
@ -411,15 +421,15 @@ public void parseDateDefaultValue() {
");";
parser.parse(sql, tables);
Table table = tables.forTable(new TableId(null, null, "DATE_TABLE"));
assertThat(table.columnWithName("A").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(table.columnWithName("B").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(table.columnWithName("C").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(table.columnWithName("D").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(table.columnWithName("E").defaultValue()).isEqualTo(Date.from(ZonedDateTime.of(9999, 9, 9, 0, 0, 0, 0, ZoneOffset.UTC).toInstant()));
assertThat(table.columnWithName("F").defaultValue()).isEqualTo(Date.from(ZonedDateTime.of(1111, 11, 11, 0, 0, 0, 0, ZoneOffset.UTC).toInstant()));
assertThat(table.columnWithName("G").defaultValue()).isEqualTo(Date.from(ZonedDateTime.of(2018, 8, 31, 0, 0, 0, 0, ZoneOffset.UTC).toInstant()));
assertThat(table.columnWithName("H").defaultValue()).isEqualTo(Date.from(ZonedDateTime.of(2050, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC).toInstant()));
assertThat(table.columnWithName("I").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(getColumnSchema(table, "A").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(getColumnSchema(table, "B").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(getColumnSchema(table, "C").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(getColumnSchema(table, "D").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(getColumnSchema(table, "E").defaultValue()).isEqualTo(Date.from(ZonedDateTime.of(9999, 9, 9, 0, 0, 0, 0, ZoneOffset.UTC).toInstant()));
assertThat(getColumnSchema(table, "F").defaultValue()).isEqualTo(Date.from(ZonedDateTime.of(1111, 11, 11, 0, 0, 0, 0, ZoneOffset.UTC).toInstant()));
assertThat(getColumnSchema(table, "G").defaultValue()).isEqualTo(Date.from(ZonedDateTime.of(2018, 8, 31, 0, 0, 0, 0, ZoneOffset.UTC).toInstant()));
assertThat(getColumnSchema(table, "H").defaultValue()).isEqualTo(Date.from(ZonedDateTime.of(2050, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC).toInstant()));
assertThat(getColumnSchema(table, "I").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
}
@Test
@ -432,8 +442,8 @@ public void parseAlterTableTruncatedDefaulDateTime() {
parser.parse(sql, tables);
parser.parse(alterSql, tables);
Table table = tables.forTable(new TableId(null, null, "TIME_TABLE"));
assertThat(table.columnWithName("A").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(table.columnWithName("B").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(getColumnSchema(table, "A").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
assertThat(getColumnSchema(table, "B").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
}
@Test
@ -447,13 +457,13 @@ public void shouldAcceptZeroAsDefaultValueForDateColumn() {
assertThat(table.columnWithName("nullable_date").hasDefaultValue()).isTrue();
// zero date should be mapped to null for nullable column
assertThat(table.columnWithName("nullable_date").defaultValue()).isNull();
assertThat(getColumnSchema(table, "nullable_date").defaultValue()).isNull();
assertThat(table.columnWithName("not_nullable_date").hasDefaultValue()).isTrue();
// zero date should be mapped to epoch for non-nullable column (expecting Date, as this test is using "connect"
// mode)
assertThat(table.columnWithName("not_nullable_date").defaultValue()).isEqualTo(getEpochDate());
assertThat(getColumnSchema(table, "not_nullable_date").defaultValue()).isEqualTo(getEpochDate());
}
private Date getEpochDate() {
@ -475,11 +485,11 @@ public void shouldAcceptBooleanAsTinyIntDefaultValue() {
Table table = tables.forTable(new TableId(null, null, "data"));
assertThat((Boolean) table.columnWithName("bval").defaultValue()).isTrue();
assertThat((Short) table.columnWithName("tival1").defaultValue()).isZero();
assertThat((Short) table.columnWithName("tival2").defaultValue()).isEqualTo((short) 3);
assertThat((Short) table.columnWithName("tival3").defaultValue()).isEqualTo((short) 1);
assertThat((Short) table.columnWithName("tival4").defaultValue()).isEqualTo((short) 18);
assertThat((Boolean) getColumnSchema(table, "bval").defaultValue()).isTrue();
assertThat((Short) getColumnSchema(table, "tival1").defaultValue()).isZero();
assertThat((Short) getColumnSchema(table, "tival2").defaultValue()).isEqualTo((short) 3);
assertThat((Short) getColumnSchema(table, "tival3").defaultValue()).isEqualTo((short) 1);
assertThat((Short) getColumnSchema(table, "tival4").defaultValue()).isEqualTo((short) 18);
}
@Test
@ -497,11 +507,11 @@ public void shouldAcceptBooleanAsIntDefaultValue() {
Table table = tables.forTable(new TableId(null, null, "data"));
assertThat((Boolean) table.columnWithName("bval").defaultValue()).isTrue();
assertThat((Integer) table.columnWithName("ival1").defaultValue()).isZero();
assertThat((Integer) table.columnWithName("ival2").defaultValue()).isEqualTo(3);
assertThat((Integer) table.columnWithName("ival3").defaultValue()).isEqualTo(1);
assertThat((Integer) table.columnWithName("ival4").defaultValue()).isEqualTo(18);
assertThat((Boolean) getColumnSchema(table, "bval").defaultValue()).isTrue();
assertThat((Integer) getColumnSchema(table, "ival1").defaultValue()).isZero();
assertThat((Integer) getColumnSchema(table, "ival2").defaultValue()).isEqualTo(3);
assertThat((Integer) getColumnSchema(table, "ival3").defaultValue()).isEqualTo(1);
assertThat((Integer) getColumnSchema(table, "ival4").defaultValue()).isEqualTo(18);
}
@Test
@ -513,7 +523,7 @@ public void shouldAcceptBitSetDefaultValue() {
Table table = tables.forTable(new TableId(null, null, "user_subscribe"));
final byte[] defVal = (byte[]) table.columnWithName("content").defaultValue();
final byte[] defVal = (byte[]) getColumnSchema(table, "content").defaultValue();
assertThat(Byte.toUnsignedInt((defVal[0]))).isEqualTo(0b00001110);
assertThat(Byte.toUnsignedInt((defVal[1]))).isEqualTo(0b11111011);
assertThat(Byte.toUnsignedInt((defVal[2]))).isEqualTo(0b11111111);
@ -526,11 +536,15 @@ public void shouldTrimNumericalDefaultValueAndShouldNotTrimNonNumericalDefaultVa
parser.parse(ddl, tables);
Table table = tables.forTable(new TableId(null, null, "data"));
assertThat((Integer) table.columnWithName("id").defaultValue()).isEqualTo(1);
assertThat((String) table.columnWithName("data").defaultValue()).isEqualTo(" 3 ");
assertThat((Integer) getColumnSchema(table, "id").defaultValue()).isEqualTo(1);
assertThat((String) getColumnSchema(table, "data").defaultValue()).isEqualTo(" 3 ");
}
private Schema getColumnSchema(Table table, String column) {
return getColumnSchema(table, column, tableSchemaBuilder);
}
private Schema getColumnSchema(Table table, String column, TableSchemaBuilder tableSchemaBuilder) {
TableSchema schema = tableSchemaBuilder.create("test", "dummy", table, null, null, null);
return schema.getEnvelopeSchema().schema().field("after").schema().field(column).schema();
}

View File

@ -15,7 +15,6 @@
import org.junit.Test;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.relational.DefaultValueConverter;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.util.Collect;
@ -38,7 +37,6 @@ public abstract class AbstractDatabaseHistoryTest {
protected Tables t4;
protected Tables all;
protected DdlParser parser;
protected DefaultValueConverter defaultValueConverter;
@Before
public void beforeEach() {
@ -89,7 +87,7 @@ protected void record(long pos, int entry, String ddl, Tables... update) {
protected Tables recover(long pos, int entry) {
Tables result = new Tables();
history.recover(source1, position("a.log", pos, entry), result, parser, defaultValueConverter);
history.recover(source1, position("a.log", pos, entry), result, parser);
return result;
}

View File

@ -86,7 +86,6 @@ private void resolveColumnDataType(PlSqlParser.Column_definitionContext ctx) {
if (ctx.DEFAULT() != null) {
String defaultValue = ctx.expression().getText();
columnEditor.defaultValueExpression(defaultValue);
columnEditor.defaultValue(defaultValue);
}
}

View File

@ -369,8 +369,8 @@ private void testColumn(@NotNull Table table, @NotNull String name, boolean isOp
assertThat(oScale.get()).isEqualTo(scale);
}
assertThat(column.hasDefaultValue()).isEqualTo(hasDefault);
if (column.hasDefaultValue() && column.defaultValue() != null) {
assertThat(defaultValue.equals(column.defaultValue()));
if (column.hasDefaultValue() && column.defaultValueExpression().isPresent()) {
assertThat(defaultValue.equals(column.defaultValueExpression().get()));
}
}
}

View File

@ -373,13 +373,9 @@ private Table tableFromFromMessage(List<ReplicationMessage.Column> columns, Tabl
// as long as default value is not added to the decoded message metadata, we must apply
// the current default read from the database
Optional.ofNullable(table.columnWithName(column.getName()))
.map(Column::defaultValueExpression)
.flatMap(Column::defaultValueExpression)
.ifPresent(columnEditor::defaultValueExpression);
Optional.ofNullable(table.columnWithName(column.getName()))
.map(Column::defaultValue)
.ifPresent(columnEditor::defaultValue);
return columnEditor.create();
})
.collect(Collectors.toList()));

View File

@ -26,6 +26,7 @@
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresConnection.PostgresValueConverterBuilder;
import io.debezium.connector.postgresql.connection.PostgresDefaultValueConverter;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.connector.postgresql.spi.SlotState;
@ -92,8 +93,9 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
}
final TypeRegistry typeRegistry = jdbcConnection.getTypeRegistry();
final PostgresDefaultValueConverter defaultValueConverter = jdbcConnection.getDefaultValueConverter();
schema = new PostgresSchema(connectorConfig, typeRegistry, topicSelector, valueConverterBuilder.build(typeRegistry));
schema = new PostgresSchema(connectorConfig, typeRegistry, defaultValueConverter, topicSelector, valueConverterBuilder.build(typeRegistry));
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicSelector);
final Offsets<PostgresPartition, PostgresOffsetContext> previousOffsets = getPreviousOffsets(
new PostgresPartition.Provider(connectorConfig), new PostgresOffsetContext.Loader(connectorConfig));

View File

@ -20,6 +20,7 @@
import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresDefaultValueConverter;
import io.debezium.connector.postgresql.connection.ServerInfo;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.RelationalDatabaseSchema;
@ -55,11 +56,11 @@ public class PostgresSchema extends RelationalDatabaseSchema {
*
* @param config the connector configuration, which is presumed to be valid
*/
protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegistry,
protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegistry, PostgresDefaultValueConverter defaultValueConverter,
TopicSelector<TableId> topicSelector, PostgresValueConverter valueConverter) {
super(config, topicSelector, new Filters(config).tableFilter(),
config.getColumnFilter(), getTableSchemaBuilder(config, valueConverter), false,
config.getKeyMapper());
config.getColumnFilter(), getTableSchemaBuilder(config, valueConverter, defaultValueConverter),
false, config.getKeyMapper());
this.typeRegistry = typeRegistry;
this.tableIdToToastableColumns = new HashMap<>();
@ -67,8 +68,10 @@ protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegist
this.readToastableColumns = config.skipRefreshSchemaOnMissingToastableData();
}
private static TableSchemaBuilder getTableSchemaBuilder(PostgresConnectorConfig config, PostgresValueConverter valueConverter) {
return new TableSchemaBuilder(valueConverter, SchemaNameAdjuster.create(), config.customConverterRegistry(), config.getSourceInfoStructMaker().schema(),
private static TableSchemaBuilder getTableSchemaBuilder(PostgresConnectorConfig config, PostgresValueConverter valueConverter,
PostgresDefaultValueConverter defaultValueConverter) {
return new TableSchemaBuilder(valueConverter, defaultValueConverter, SchemaNameAdjuster.create(),
config.customConverterRegistry(), config.getSourceInfoStructMaker().schema(),
config.getSanitizeFieldNames(), false);
}

View File

@ -567,10 +567,9 @@ private Optional<ColumnEditor> doReadTableColumn(ResultSet columnMetadata, Table
column.scale(nativeType.getDefaultScale());
}
final String defaultValue = columnMetadata.getString(13);
if (defaultValue != null) {
column.defaultValueExpression(defaultValue);
getDefaultValue(column.create(), defaultValue).ifPresent(column::defaultValue);
final String defaultValueExpression = columnMetadata.getString(13);
if (defaultValueExpression != null && getDefaultValueConverter().supportConversion(column.typeName())) {
column.defaultValueExpression(defaultValueExpression);
}
return Optional.of(column);
@ -579,9 +578,9 @@ private Optional<ColumnEditor> doReadTableColumn(ResultSet columnMetadata, Table
return Optional.empty();
}
@Override
protected Optional<Object> getDefaultValue(Column column, String defaultValue) {
return defaultValueConverter.parseDefaultValue(column, defaultValue);
public PostgresDefaultValueConverter getDefaultValueConverter() {
Objects.requireNonNull(defaultValueConverter, "Connection does not provide default value converter");
return defaultValueConverter;
}
public TypeRegistry getTypeRegistry() {

View File

@ -29,6 +29,7 @@
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.postgresql.PostgresValueConverter;
import io.debezium.relational.Column;
import io.debezium.relational.DefaultValueConverter;
import io.debezium.relational.ValueConverter;
import io.debezium.util.Collect;
@ -36,7 +37,7 @@
* Parses and converts column default values.
*/
@ThreadSafe
class PostgresDefaultValueConverter {
public class PostgresDefaultValueConverter implements DefaultValueConverter {
private static Logger LOGGER = LoggerFactory.getLogger(PostgresDefaultValueConverter.class);
@ -65,12 +66,17 @@ private interface DefaultValueMapper {
private final PostgresValueConverter valueConverters;
private final Map<String, DefaultValueMapper> defaultValueMappers;
PostgresDefaultValueConverter(PostgresValueConverter valueConverters, TimestampUtils timestampUtils) {
public PostgresDefaultValueConverter(PostgresValueConverter valueConverters, TimestampUtils timestampUtils) {
this.valueConverters = valueConverters;
this.defaultValueMappers = Collections.unmodifiableMap(createDefaultValueMappers(timestampUtils));
}
Optional<Object> parseDefaultValue(Column column, String defaultValue) {
@Override
public Optional<Object> parseDefaultValue(Column column, String defaultValueExpression) {
if (defaultValueExpression == null) {
return Optional.empty();
}
final String dataType = column.typeName();
final DefaultValueMapper mapper = defaultValueMappers.get(dataType);
@ -80,11 +86,11 @@ Optional<Object> parseDefaultValue(Column column, String defaultValue) {
}
if (TRIM_DATA_TYPES.contains(dataType)) {
defaultValue = defaultValue.trim();
defaultValueExpression = defaultValueExpression.trim();
}
try {
Object rawDefaultValue = mapper.parse(defaultValue);
Object rawDefaultValue = mapper.parse(defaultValueExpression);
Object convertedDefaultValue = convertDefaultValue(rawDefaultValue, column);
if (convertedDefaultValue == null) {
return Optional.empty();
@ -97,7 +103,7 @@ Optional<Object> parseDefaultValue(Column column, String defaultValue) {
return Optional.of(convertedDefaultValue);
}
catch (Exception e) {
LOGGER.warn("Cannot parse column default value '{}' to type '{}'. Expression evaluation is not supported.", defaultValue, dataType);
LOGGER.warn("Cannot parse column default value '{}' to type '{}'. Expression evaluation is not supported.", defaultValueExpression, dataType);
LOGGER.debug("Parsing failed due to error", e);
return Optional.empty();
}
@ -205,4 +211,8 @@ private static String extractDefault(String defaultValue, String generatedValueP
return extractDefault(defaultValue);
}
public boolean supportConversion(String typeName) {
return defaultValueMappers.containsKey(typeName);
}
}

View File

@ -22,7 +22,7 @@ public class ColumnMetaData {
private final boolean key;
private final boolean optional;
private final boolean hasDefaultValue;
private final Object defaultValue;
private final String defaultValueExpression;
private final int length;
private final int scale;
private final String typeName;
@ -35,16 +35,17 @@ public class ColumnMetaData {
* @param key {@code true} if column is part of the primary key, {@code false} otherwise
* @param optional {@code true} if the column is considered optional, {@code false} otherwise
* @param hasDefaultValue {@code true} if the column has a default value specified, {@code false} otherwise
* @param defaultValue the parsed default value for the column
* @param defaultValueExpression the parsed default value literal for the column
* @param typeModifier the attribute type modifier
*/
ColumnMetaData(String columnName, PostgresType postgresType, boolean key, boolean optional, boolean hasDefaultValue, Object defaultValue, int typeModifier) {
ColumnMetaData(String columnName, PostgresType postgresType, boolean key, boolean optional, boolean hasDefaultValue, String defaultValueExpression,
int typeModifier) {
this.columnName = columnName;
this.postgresType = postgresType;
this.key = key;
this.optional = optional;
this.hasDefaultValue = hasDefaultValue;
this.defaultValue = defaultValue;
this.defaultValueExpression = defaultValueExpression;
// todo: investigate whether this can be removed and PostgresType updated to always delegate
// Currently PostgresType only delegates calls to length and scale with an attribute modifier
@ -88,8 +89,8 @@ public boolean hasDefaultValue() {
return hasDefaultValue;
}
public Object getDefaultValue() {
return defaultValue;
public String getDefaultValueExpression() {
return defaultValueExpression;
}
public int getLength() {

View File

@ -262,7 +262,7 @@ private void handleRelationMessage(ByteBuffer buffer, TypeRegistry typeRegistry)
LOGGER.trace("Schema: '{}', Table: '{}'", schemaName, tableName);
// Perform several out-of-bands database metadata queries
Map<String, Optional<Object>> columnDefaults;
Map<String, Optional<String>> columnDefaults;
Map<String, Boolean> columnOptionality;
List<String> primaryKeyColumns;
@ -272,7 +272,7 @@ private void handleRelationMessage(ByteBuffer buffer, TypeRegistry typeRegistry)
final List<io.debezium.relational.Column> readColumns = getTableColumnsFromDatabase(connection, databaseMetadata, tableId);
columnDefaults = readColumns.stream()
.filter(io.debezium.relational.Column::hasDefaultValue)
.collect(toMap(io.debezium.relational.Column::name, column -> Optional.ofNullable(column.defaultValue())));
.collect(toMap(io.debezium.relational.Column::name, io.debezium.relational.Column::defaultValueExpression));
columnOptionality = readColumns.stream().collect(toMap(io.debezium.relational.Column::name, io.debezium.relational.Column::isOptional));
primaryKeyColumns = connection.readPrimaryKeyNames(databaseMetadata, tableId);
@ -299,9 +299,9 @@ private void handleRelationMessage(ByteBuffer buffer, TypeRegistry typeRegistry)
}
final boolean hasDefault = columnDefaults.containsKey(columnName);
final Object defaultValue = columnDefaults.getOrDefault(columnName, Optional.empty()).orElse(null);
final String defaultValueExpression = columnDefaults.getOrDefault(columnName, Optional.empty()).orElse(null);
columns.add(new ColumnMetaData(columnName, postgresType, key, optional, hasDefault, defaultValue, attypmod));
columns.add(new ColumnMetaData(columnName, postgresType, key, optional, hasDefault, defaultValueExpression, attypmod));
columnNames.add(columnName);
}
@ -583,7 +583,7 @@ private Table resolveRelationFromMetadata(PgOutputRelationMetaData metadata) {
.scale(columnMetadata.getScale());
if (columnMetadata.hasDefaultValue()) {
editor.defaultValue(columnMetadata.getDefaultValue());
editor.defaultValueExpression(columnMetadata.getDefaultValueExpression());
}
columns.add(editor.create());

View File

@ -48,6 +48,7 @@ public void retryOnFailureToCreateConnection() throws Exception {
postgresConnectorTask.createReplicationConnection(new FakeContext(config, new PostgresSchema(
config,
null,
null,
PostgresTopicSelector.create(config), null)), true, 3, Duration.ofSeconds(2));
// Verify retry happened for 10 seconds

View File

@ -34,6 +34,7 @@
import org.junit.Test;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresDefaultValueConverter;
import io.debezium.connector.postgresql.data.Ltree;
import io.debezium.data.Bits;
import io.debezium.data.Json;
@ -445,79 +446,91 @@ public void shouldProperlyGetDefaultColumnValues() throws Exception {
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().build());
schema = TestHelper.getSchema(config);
final PostgresConnection.PostgresValueConverterBuilder valueConverterBuilder = (typeRegistry) -> PostgresValueConverter.of(
config,
TestHelper.getDatabaseCharset(),
typeRegistry);
try (PostgresConnection connection = TestHelper.createWithTypeRegistry()) {
PostgresDefaultValueConverter defaultValueConverter = connection.getDefaultValueConverter();
connection.execute(ddl);
schema.refresh(connection, false);
List<Column> columns = tableFor("public.default_column_test").columns();
assertColumnDefault("pk", 0, columns);
assertColumnDefault("ss", (short) 0, columns);
assertColumnDefault("bs", 0L, columns);
assertColumnDefault("bigint", 9223372036854775807L, columns);
assertColumnDefault("bit_as_boolean", true, columns);
assertColumnDefault("bit", new byte[]{ 3 }, columns);
assertColumnDefault("varbit", new byte[]{ 6 }, columns);
assertColumnDefault("boolean", true, columns);
assertColumnDefault("char", "abcd", columns);
assertColumnDefault("varchar", "abcde", columns);
assertColumnDefault("pk", 0, columns, defaultValueConverter);
assertColumnDefault("ss", (short) 0, columns, defaultValueConverter);
assertColumnDefault("bs", 0L, columns, defaultValueConverter);
assertColumnDefault("bigint", 9223372036854775807L, columns, defaultValueConverter);
assertColumnDefault("bit_as_boolean", true, columns, defaultValueConverter);
assertColumnDefault("bit", new byte[]{ 3 }, columns, defaultValueConverter);
assertColumnDefault("varbit", new byte[]{ 6 }, columns, defaultValueConverter);
assertColumnDefault("boolean", true, columns, defaultValueConverter);
assertColumnDefault("char", "abcd", columns, defaultValueConverter);
assertColumnDefault("varchar", "abcde", columns, defaultValueConverter);
assertColumnDefault("date", (int) LocalDate.of(2021, 3, 19).toEpochDay(), columns);
assertColumnDefault("date_func", 0, columns);
assertColumnDefault("date", (int) LocalDate.of(2021, 3, 19).toEpochDay(), columns, defaultValueConverter);
assertColumnDefault("date_func", 0, columns, defaultValueConverter);
assertColumnDefault("double", 123456789.1234567890123, columns);
assertColumnDefault("integer", 2147483647, columns);
assertColumnDefault("integer_func1", 0, columns);
assertColumnDefault("integer_func2", 0, columns);
assertColumnDefault("integer_opt", null, columns);
assertColumnDefault("double", 123456789.1234567890123, columns, defaultValueConverter);
assertColumnDefault("integer", 2147483647, columns, defaultValueConverter);
assertColumnDefault("integer_func1", 0, columns, defaultValueConverter);
assertColumnDefault("integer_func2", 0, columns, defaultValueConverter);
assertColumnDefault("integer_opt", null, columns, defaultValueConverter);
assertColumnDefault("interval", TimeUnit.HOURS.toMicros(1), columns);
assertColumnDefault("interval_func1", 0L, columns);
assertColumnDefault("interval", TimeUnit.HOURS.toMicros(1), columns, defaultValueConverter);
assertColumnDefault("interval_func1", 0L, columns, defaultValueConverter);
assertColumnDefault("json", "{}", columns);
assertColumnDefault("json_opt", null, columns);
assertColumnDefault("jsonb", "{}", columns);
assertColumnDefault("json", "{}", columns, defaultValueConverter);
assertColumnDefault("json_opt", null, columns, defaultValueConverter);
assertColumnDefault("jsonb", "{}", columns, defaultValueConverter);
assertColumnDefault("numeric", new BigDecimal("12345.67891"), columns);
assertColumnDefault("numeric", new BigDecimal("12345.67891"), columns, defaultValueConverter);
// KAFKA-12694: default value for Struct currently exported as null
assertColumnDefault("numeric_var", null, columns);
assertColumnDefault("real", 1234567890.5f, columns);
assertColumnDefault("smallint", (short) 32767, columns);
assertColumnDefault("numeric_var", null, columns, defaultValueConverter);
assertColumnDefault("real", 1234567890.5f, columns, defaultValueConverter);
assertColumnDefault("smallint", (short) 32767, columns, defaultValueConverter);
assertColumnDefault("text", "asdf", columns);
assertColumnDefault("text_parens", "text(parens)", columns);
assertColumnDefault("text_func3", "", columns);
assertColumnDefault("text", "asdf", columns, defaultValueConverter);
assertColumnDefault("text_parens", "text(parens)", columns, defaultValueConverter);
assertColumnDefault("text_func3", "", columns, defaultValueConverter);
assertColumnDefault("time_hm", TimeUnit.SECONDS.toMicros(LocalTime.of(12, 34).toSecondOfDay()), columns);
assertColumnDefault("time_hms", TimeUnit.SECONDS.toMicros(LocalTime.of(12, 34, 56).toSecondOfDay()), columns);
assertColumnDefault("time_func", 0L, columns);
assertColumnDefault("timestamp", TimeUnit.SECONDS.toMicros(1616247868), columns);
assertColumnDefault("timestamp_func", 0L, columns);
assertColumnDefault("timestamp_opt", null, columns);
assertColumnDefault("timestamptz", Instant.ofEpochSecond(1616247868).toString(), columns);
assertColumnDefault("timestamptz_func", Instant.ofEpochSecond(0).toString(), columns);
assertColumnDefault("timestamptz_opt", null, columns);
assertColumnDefault("time_hm", TimeUnit.SECONDS.toMicros(LocalTime.of(12, 34).toSecondOfDay()), columns, defaultValueConverter);
assertColumnDefault("time_hms", TimeUnit.SECONDS.toMicros(LocalTime.of(12, 34, 56).toSecondOfDay()), columns, defaultValueConverter);
assertColumnDefault("time_func", 0L, columns, defaultValueConverter);
assertColumnDefault("timestamp", TimeUnit.SECONDS.toMicros(1616247868), columns, defaultValueConverter);
assertColumnDefault("timestamp_func", 0L, columns, defaultValueConverter);
assertColumnDefault("timestamp_opt", null, columns, defaultValueConverter);
assertColumnDefault("timestamptz", Instant.ofEpochSecond(1616247868).toString(), columns, defaultValueConverter);
assertColumnDefault("timestamptz_func", Instant.ofEpochSecond(0).toString(), columns, defaultValueConverter);
assertColumnDefault("timestamptz_opt", null, columns, defaultValueConverter);
assertColumnDefault("uuid", "76019d1a-ad2e-4b22-96e9-1a6d6543c818", columns);
assertColumnDefault("uuid_func", "00000000-0000-0000-0000-000000000000", columns);
assertColumnDefault("uuid_opt", null, columns);
assertColumnDefault("xml", "<foo>bar</foo>", columns);
assertColumnDefault("uuid", "76019d1a-ad2e-4b22-96e9-1a6d6543c818", columns, defaultValueConverter);
assertColumnDefault("uuid_func", "00000000-0000-0000-0000-000000000000", columns, defaultValueConverter);
assertColumnDefault("uuid_opt", null, columns, defaultValueConverter);
assertColumnDefault("xml", "<foo>bar</foo>", columns, defaultValueConverter);
}
}
private void assertColumnDefault(String columnName, Object expectedDefault, List<Column> columns) {
private void assertColumnDefault(String columnName, Object expectedDefault, List<Column> columns, PostgresDefaultValueConverter defaultValueConverter) {
Column column = columns.stream().filter(c -> c.name().equals(columnName)).findFirst().get();
Object defaultValue = defaultValueConverter
.parseDefaultValue(column, column.defaultValueExpression().orElse(null))
.orElse(null);
if (expectedDefault instanceof byte[]) {
byte[] expectedBytes = (byte[]) expectedDefault;
byte[] defaultBytes = (byte[]) column.defaultValue();
byte[] defaultBytes = (byte[]) defaultValue;
assertArrayEquals(expectedBytes, defaultBytes);
}
else {
if (Objects.isNull(column.defaultValue())) {
if (Objects.isNull(defaultValue)) {
assertTrue(Objects.isNull(expectedDefault));
}
else {
assertTrue(column.defaultValue().equals(expectedDefault));
assertTrue(defaultValue.equals(expectedDefault));
}
}
}

View File

@ -34,6 +34,7 @@
import io.debezium.connector.postgresql.PostgresConnectorConfig.SecureConnectionMode;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresConnection.PostgresValueConverterBuilder;
import io.debezium.connector.postgresql.connection.PostgresDefaultValueConverter;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
@ -201,6 +202,20 @@ public static TypeRegistry getTypeRegistry() {
}
}
public static PostgresDefaultValueConverter getDefaultValueConverter() {
final PostgresConnectorConfig config = new PostgresConnectorConfig(defaultConfig().build());
try (final PostgresConnection connection = new PostgresConnection(config.getJdbcConfig(), getPostgresValueConverterBuilder(config))) {
return connection.getDefaultValueConverter();
}
}
public static Charset getDatabaseCharset() {
final PostgresConnectorConfig config = new PostgresConnectorConfig(defaultConfig().build());
try (final PostgresConnection connection = new PostgresConnection(config.getJdbcConfig(), getPostgresValueConverterBuilder(config))) {
return connection.getDatabaseCharset();
}
}
public static PostgresSchema getSchema(PostgresConnectorConfig config) {
return getSchema(config, TestHelper.getTypeRegistry());
}
@ -209,6 +224,7 @@ public static PostgresSchema getSchema(PostgresConnectorConfig config, TypeRegis
return new PostgresSchema(
config,
typeRegistry,
TestHelper.getDefaultValueConverter(),
PostgresTopicSelector.create(config),
getPostgresValueConverter(typeRegistry, config));
}

View File

@ -29,6 +29,7 @@
public class PostgresDefaultValueConverterIT {
private PostgresConnection postgresConnection;
private PostgresValueConverter postgresValueConverter;
private PostgresDefaultValueConverter postgresDefaultValueConverter;
@Before
@ -38,7 +39,7 @@ public void before() throws SQLException {
postgresConnection = TestHelper.create();
PostgresConnectorConfig postgresConnectorConfig = new PostgresConnectorConfig(defaultJdbcConfig());
PostgresValueConverter postgresValueConverter = PostgresValueConverter.of(
postgresValueConverter = PostgresValueConverter.of(
postgresConnectorConfig,
Charset.defaultCharset(),
new TypeRegistry(postgresConnection));
@ -58,10 +59,10 @@ public void closeConnection() {
@FixFor("DBZ-4137")
public void shouldReturnNullForNumericDefaultValue() {
final Column NumericalColumn = Column.editor().type("numeric", "numeric(19, 4)")
.jdbcType(Types.NUMERIC).defaultValue("NULL::numeric").optional(true).create();
.jdbcType(Types.NUMERIC).defaultValueExpression("NULL::numeric").optional(true).create();
final Optional<Object> numericalConvertedValue = postgresDefaultValueConverter.parseDefaultValue(
NumericalColumn,
(String) NumericalColumn.defaultValue());
NumericalColumn.defaultValueExpression().orElse(null));
Assert.assertEquals(numericalConvertedValue, Optional.empty());
}
@ -84,10 +85,10 @@ public void shouldReturnNullForNumericDefaultValueUsingDecimalHandlingModePrecis
postgresValueConverter, postgresConnection.getTimestampUtils());
final Column NumericalColumn = Column.editor().type("numeric", "numeric(19, 4)")
.jdbcType(Types.NUMERIC).defaultValue("NULL::numeric").optional(true).create();
.jdbcType(Types.NUMERIC).defaultValueExpression("NULL::numeric").optional(true).create();
final Optional<Object> numericalConvertedValue = postgresDefaultValueConverter.parseDefaultValue(
NumericalColumn,
(String) NumericalColumn.defaultValue());
NumericalColumn.defaultValueExpression().orElse(null));
Assert.assertEquals(numericalConvertedValue, Optional.empty());
}
@ -95,17 +96,17 @@ public void shouldReturnNullForNumericDefaultValueUsingDecimalHandlingModePrecis
@Test
@FixFor("DBZ-3989")
public void shouldTrimNumericalDefaultValueAndShouldNotTrimNonNumericalDefaultValue() {
final Column NumericalColumn = Column.editor().type("int8").jdbcType(Types.INTEGER).defaultValue(" 1 ").create();
final Column NumericalColumn = Column.editor().type("int8").jdbcType(Types.INTEGER).defaultValueExpression(" 1 ").create();
final Optional<Object> numericalConvertedValue = postgresDefaultValueConverter.parseDefaultValue(
NumericalColumn,
(String) NumericalColumn.defaultValue());
NumericalColumn.defaultValueExpression().orElse(null));
Assert.assertEquals(numericalConvertedValue, Optional.of(1));
final Column nonNumericalColumn = Column.editor().type("text").jdbcType(Types.VARCHAR).defaultValue(" 1 ").create();
final Column nonNumericalColumn = Column.editor().type("text").jdbcType(Types.VARCHAR).defaultValueExpression(" 1 ").create();
final Optional<Object> nonNumericalConvertedValue = postgresDefaultValueConverter.parseDefaultValue(
nonNumericalColumn,
(String) nonNumericalColumn.defaultValue());
NumericalColumn.defaultValueExpression().orElse(null));
Assert.assertEquals(nonNumericalConvertedValue, Optional.of(" 1 "));
}

View File

@ -433,12 +433,6 @@ public String retrieveRealDatabaseName(String databaseName) {
}
}
@Override
protected Optional<Object> getDefaultValue(Column column, String defaultValue) {
return defaultValueConverter
.parseDefaultValue(column, defaultValue);
}
@Override
protected boolean isTableUniqueIndexIncluded(String indexName, String columnName) {
// SQL Server provides indices also without index name
@ -490,4 +484,8 @@ public String quotedTableIdString(TableId tableId) {
private String replaceDatabaseNamePlaceholder(String sql, String databaseName) {
return sql.replace(DATABASE_NAME_PLACEHOLDER, databaseName);
}
public SqlServerDefaultValueConverter getDefaultValueConverter() {
return defaultValueConverter;
}
}

View File

@ -77,7 +77,7 @@ public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext>
metadataConnection = new SqlServerConnection(jdbcConfig, connectorConfig.getSourceTimestampMode(), valueConverters, () -> getClass().getClassLoader(),
connectorConfig.getSkippedOperations(), connectorConfig.isMultiPartitionModeEnabled());
this.schema = new SqlServerDatabaseSchema(connectorConfig, metadataConnection::getDefaultValue, valueConverters, topicSelector, schemaNameAdjuster);
this.schema = new SqlServerDatabaseSchema(connectorConfig, metadataConnection.getDefaultValueConverter(), valueConverters, topicSelector, schemaNameAdjuster);
this.schema.initializeStorage();
Offsets<SqlServerPartition, SqlServerOffsetContext> offsets = getPreviousOffsets(

View File

@ -8,7 +8,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.relational.DefaultValueConverter;
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
@ -30,21 +29,19 @@ public class SqlServerDatabaseSchema extends HistorizedRelationalDatabaseSchema
private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerDatabaseSchema.class);
private final DefaultValueConverter defaultValueConverter;
public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, DefaultValueConverter defaultValueConverter,
public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SqlServerDefaultValueConverter defaultValueConverter,
ValueConverterProvider valueConverter, TopicSelector<TableId> topicSelector,
SchemaNameAdjuster schemaNameAdjuster) {
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), connectorConfig.getColumnFilter(),
new TableSchemaBuilder(
valueConverter,
defaultValueConverter,
schemaNameAdjuster,
connectorConfig.customConverterRegistry(),
connectorConfig.getSourceInfoStructMaker().schema(),
connectorConfig.getSanitizeFieldNames(),
connectorConfig.isMultiPartitionModeEnabled()),
false, connectorConfig.getKeyMapper());
this.defaultValueConverter = defaultValueConverter;
}
@Override
@ -74,9 +71,4 @@ protected DdlParser getDdlParser() {
return null;
}
@Override
public DefaultValueConverter getDefaultValueConverter() {
return defaultValueConverter;
}
}

View File

@ -24,6 +24,7 @@
import io.debezium.annotation.ThreadSafe;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.DefaultValueConverter;
import io.debezium.relational.ValueConverter;
import io.debezium.util.HexConverter;
@ -33,7 +34,7 @@
* Parses and converts column default values.
*/
@ThreadSafe
class SqlServerDefaultValueConverter {
class SqlServerDefaultValueConverter implements DefaultValueConverter {
private static Logger LOGGER = LoggerFactory.getLogger(SqlServerDefaultValueConverter.class);
@ -71,7 +72,12 @@ private interface DefaultValueMapper {
this.defaultValueMappers = Collections.unmodifiableMap(createDefaultValueMappers());
}
Optional<Object> parseDefaultValue(Column column, String defaultValue) {
@Override
public Optional<Object> parseDefaultValue(Column column, String defaultValueExpression) {
if (defaultValueExpression == null) {
return Optional.empty();
}
final String dataType = column.typeName();
final DefaultValueMapper mapper = defaultValueMappers.get(dataType);
if (mapper == null) {
@ -80,12 +86,12 @@ Optional<Object> parseDefaultValue(Column column, String defaultValue) {
}
try {
Object rawDefaultValue = mapper.parse(defaultValue);
Object rawDefaultValue = mapper.parse(defaultValueExpression);
Object convertedDefaultValue = convertDefaultValue(rawDefaultValue, column);
return Optional.ofNullable(convertedDefaultValue);
}
catch (Exception e) {
LOGGER.warn("Cannot parse column default value '{}' to type '{}'. Expression evaluation is not supported.", defaultValue, dataType);
LOGGER.warn("Cannot parse column default value '{}' to type '{}'. Expression evaluation is not supported.", defaultValueExpression, dataType);
LOGGER.debug("Parsing failed due to error", e);
return Optional.empty();
}

View File

@ -17,6 +17,8 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.junit.Before;
@ -24,9 +26,15 @@
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.relational.CustomConverterRegistry;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Testing;
/**
@ -198,57 +206,78 @@ public void shouldProperlyGetDefaultColumnValues() throws Exception {
null, 0, null, null, capturedColumns);
Table table = connection.getTableSchemaFromTable(TestHelper.TEST_DATABASE, changeTable);
assertColumnHasNotDefaultValue(table, "int_no_default_not_null");
assertColumnHasDefaultValue(table, "int_no_default", null);
TableSchemaBuilder tableSchemaBuilder = new TableSchemaBuilder(
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null),
connection.getDefaultValueConverter(),
SchemaNameAdjuster.create(), new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
assertColumnHasDefaultValue(table, "bigint_column", 3147483648L);
assertColumnHasDefaultValue(table, "int_column", 2147483647);
assertColumnHasDefaultValue(table, "smallint_column", (short) 32767);
assertColumnHasDefaultValue(table, "tinyint_column", (short) 255);
assertColumnHasDefaultValue(table, "bit_column", true);
assertColumnHasNotDefaultValue(table, "int_no_default_not_null");
assertColumnHasDefaultValue(table, "int_no_default", null, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "bigint_column", 3147483648L, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "int_column", 2147483647, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "smallint_column", (short) 32767, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "tinyint_column", (short) 255, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "bit_column", true, tableSchemaBuilder);
// The expected BugDecimal must have the correct scale.
assertColumnHasDefaultValue(table, "decimal_column", new BigDecimal("100.12345"));
assertColumnHasDefaultValue(table, "decimal_mismatch_default", new BigDecimal("200.10000"));
assertColumnHasDefaultValue(table, "numeric_column", new BigDecimal("200.123"));
assertColumnHasDefaultValue(table, "numeric_mismatch_default", new BigDecimal("200.100"));
assertColumnHasDefaultValue(table, "money_column", new BigDecimal("922337203685477.5800"));
assertColumnHasDefaultValue(table, "money_mismatch_default", new BigDecimal("922337203685477.0000"));
assertColumnHasDefaultValue(table, "smallmoney_column", new BigDecimal("214748.3647"));
assertColumnHasDefaultValue(table, "smallmoney_mismatch_default", new BigDecimal("922337203685477.0000"));
assertColumnHasDefaultValue(table, "float_column", 123.45);
assertColumnHasDefaultValue(table, "real_column", 1234.5f);
assertColumnHasDefaultValue(table, "date_column", 17930);
assertColumnHasDefaultValue(table, "datetime_column", toMillis(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 790_000_000, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "datetime2_column", toNanos(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 123_456_700, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "datetime2_0_column", toMillis(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 0, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "datetime2_1_column", toMillis(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 100_000_000, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "datetime2_2_column", toMillis(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 120_000_000, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "datetime2_3_column", toMillis(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 123_000_000, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "datetime2_4_column", toMicros(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 123_400_000, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "datetime2_5_column", toMicros(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 123_450_000, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "datetime2_6_column", toMicros(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 123_456_000, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "datetime2_7_column", toNanos(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 123_456_700, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "datetimeoffset_column", "2019-01-01T00:00:00.1234567+02:00");
assertColumnHasDefaultValue(table, "smalldatetime_column", toMillis(OffsetDateTime.of(2019, 1, 1, 12, 34, 0, 0, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "time_column", toNanos(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 123_000_000, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "time_0_column", (int) toMillis(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 0, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "time_1_column", (int) toMillis(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 100_000_000, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "time_2_column", (int) toMillis(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 120_000_000, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "time_3_column", (int) toMillis(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 123_000_000, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "decimal_column", new BigDecimal("100.12345"), tableSchemaBuilder);
assertColumnHasDefaultValue(table, "decimal_mismatch_default", new BigDecimal("200.10000"), tableSchemaBuilder);
assertColumnHasDefaultValue(table, "numeric_column", new BigDecimal("200.123"), tableSchemaBuilder);
assertColumnHasDefaultValue(table, "numeric_mismatch_default", new BigDecimal("200.100"), tableSchemaBuilder);
assertColumnHasDefaultValue(table, "money_column", new BigDecimal("922337203685477.5800"), tableSchemaBuilder);
assertColumnHasDefaultValue(table, "money_mismatch_default", new BigDecimal("922337203685477.0000"), tableSchemaBuilder);
assertColumnHasDefaultValue(table, "smallmoney_column", new BigDecimal("214748.3647"), tableSchemaBuilder);
assertColumnHasDefaultValue(table, "smallmoney_mismatch_default", new BigDecimal("922337203685477.0000"), tableSchemaBuilder);
assertColumnHasDefaultValue(table, "float_column", 123.45, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "real_column", 1234.5f, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "date_column", 17930, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "datetime_column", toMillis(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 790_000_000, databaseZoneOffset)),
tableSchemaBuilder);
assertColumnHasDefaultValue(table, "datetime2_column", toNanos(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 123_456_700, databaseZoneOffset)),
tableSchemaBuilder);
assertColumnHasDefaultValue(table, "datetime2_0_column", toMillis(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 0, databaseZoneOffset)), tableSchemaBuilder);
assertColumnHasDefaultValue(table, "datetime2_1_column", toMillis(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 100_000_000, databaseZoneOffset)),
tableSchemaBuilder);
assertColumnHasDefaultValue(table, "datetime2_2_column", toMillis(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 120_000_000, databaseZoneOffset)),
tableSchemaBuilder);
assertColumnHasDefaultValue(table, "datetime2_3_column", toMillis(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 123_000_000, databaseZoneOffset)),
tableSchemaBuilder);
assertColumnHasDefaultValue(table, "datetime2_4_column", toMicros(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 123_400_000, databaseZoneOffset)),
tableSchemaBuilder);
assertColumnHasDefaultValue(table, "datetime2_5_column", toMicros(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 123_450_000, databaseZoneOffset)),
tableSchemaBuilder);
assertColumnHasDefaultValue(table, "datetime2_6_column", toMicros(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 123_456_000, databaseZoneOffset)),
tableSchemaBuilder);
assertColumnHasDefaultValue(table, "datetime2_7_column", toNanos(OffsetDateTime.of(2019, 1, 1, 12, 34, 56, 123_456_700, databaseZoneOffset)),
tableSchemaBuilder);
assertColumnHasDefaultValue(table, "datetimeoffset_column", "2019-01-01T00:00:00.1234567+02:00", tableSchemaBuilder);
assertColumnHasDefaultValue(table, "smalldatetime_column", toMillis(OffsetDateTime.of(2019, 1, 1, 12, 34, 0, 0, databaseZoneOffset)), tableSchemaBuilder);
assertColumnHasDefaultValue(table, "time_column", toNanos(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 123_000_000, databaseZoneOffset)), tableSchemaBuilder);
assertColumnHasDefaultValue(table, "time_0_column", (int) toMillis(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 0, databaseZoneOffset)), tableSchemaBuilder);
assertColumnHasDefaultValue(table, "time_1_column", (int) toMillis(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 100_000_000, databaseZoneOffset)),
tableSchemaBuilder);
assertColumnHasDefaultValue(table, "time_2_column", (int) toMillis(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 120_000_000, databaseZoneOffset)),
tableSchemaBuilder);
assertColumnHasDefaultValue(table, "time_3_column", (int) toMillis(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 123_000_000, databaseZoneOffset)),
tableSchemaBuilder);
// JDBC connector does not support full precision for type time(n), n = 4, 5, 6, 7
assertColumnHasDefaultValue(table, "time_4_column", toMicros(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 123_000_000, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "time_5_column", toMicros(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 123_000_000, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "time_6_column", toMicros(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 123_000_000, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "time_7_column", toNanos(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 123_000_000, databaseZoneOffset)));
assertColumnHasDefaultValue(table, "char_column", "aaa");
assertColumnHasDefaultValue(table, "varchar_column", "bbb");
assertColumnHasDefaultValue(table, "text_column", "ccc");
assertColumnHasDefaultValue(table, "nchar_column", "ddd");
assertColumnHasDefaultValue(table, "nvarchar_column", "eee");
assertColumnHasDefaultValue(table, "ntext_column", "fff");
assertColumnHasDefaultValue(table, "binary_column", ByteBuffer.wrap(new byte[]{ 1, 2, 3, 4, 5 }));
assertColumnHasDefaultValue(table, "varbinary_column", ByteBuffer.wrap(new byte[]{ 1, 2, 3, 4, 5, 6 }));
assertColumnHasDefaultValue(table, "image_column", ByteBuffer.wrap(new byte[]{ 1, 2, 3, 4, 5, 6, 7 }));
assertColumnHasDefaultValue(table, "time_4_column", toMicros(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 123_000_000, databaseZoneOffset)),
tableSchemaBuilder);
assertColumnHasDefaultValue(table, "time_5_column", toMicros(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 123_000_000, databaseZoneOffset)),
tableSchemaBuilder);
assertColumnHasDefaultValue(table, "time_6_column", toMicros(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 123_000_000, databaseZoneOffset)),
tableSchemaBuilder);
assertColumnHasDefaultValue(table, "time_7_column", toNanos(OffsetDateTime.of(1970, 1, 1, 12, 34, 56, 123_000_000, databaseZoneOffset)),
tableSchemaBuilder);
assertColumnHasDefaultValue(table, "char_column", "aaa", tableSchemaBuilder);
assertColumnHasDefaultValue(table, "varchar_column", "bbb", tableSchemaBuilder);
assertColumnHasDefaultValue(table, "text_column", "ccc", tableSchemaBuilder);
assertColumnHasDefaultValue(table, "nchar_column", "ddd", tableSchemaBuilder);
assertColumnHasDefaultValue(table, "nvarchar_column", "eee", tableSchemaBuilder);
assertColumnHasDefaultValue(table, "ntext_column", "fff", tableSchemaBuilder);
assertColumnHasDefaultValue(table, "binary_column", ByteBuffer.wrap(new byte[]{ 1, 2, 3, 4, 5 }), tableSchemaBuilder);
assertColumnHasDefaultValue(table, "varbinary_column", ByteBuffer.wrap(new byte[]{ 1, 2, 3, 4, 5, 6 }), tableSchemaBuilder);
assertColumnHasDefaultValue(table, "image_column", ByteBuffer.wrap(new byte[]{ 1, 2, 3, 4, 5, 6, 7 }), tableSchemaBuilder);
}
}
@ -347,35 +376,40 @@ public void shouldProperlyGetDefaultColumnNullValues() throws Exception {
null, 0, null, null, capturedColumns);
Table table = connection.getTableSchemaFromTable(TestHelper.TEST_DATABASE, changeTable);
TableSchemaBuilder tableSchemaBuilder = new TableSchemaBuilder(
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null),
connection.getDefaultValueConverter(),
SchemaNameAdjuster.create(), new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
assertColumnHasNotDefaultValue(table, "int_no_default_not_null");
assertColumnHasDefaultValue(table, "int_no_default", null);
assertColumnHasDefaultValue(table, "int_default_null", null);
assertColumnHasDefaultValue(table, "int_column", 2147483647);
assertColumnHasDefaultValue(table, "int_no_default", null, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "int_default_null", null, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "int_column", 2147483647, tableSchemaBuilder);
assertColumnHasNotDefaultValue(table, "bigint_no_default_not_null");
assertColumnHasDefaultValue(table, "bigint_no_default", null);
assertColumnHasDefaultValue(table, "bigint_default_null", null);
assertColumnHasDefaultValue(table, "bigint_column", 3147483648L);
assertColumnHasDefaultValue(table, "bigint_no_default", null, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "bigint_default_null", null, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "bigint_column", 3147483648L, tableSchemaBuilder);
assertColumnHasNotDefaultValue(table, "smallint_no_default_not_null");
assertColumnHasDefaultValue(table, "smallint_no_default", null);
assertColumnHasDefaultValue(table, "smallint_default_null", null);
assertColumnHasDefaultValue(table, "smallint_column", (short) 32767);
assertColumnHasDefaultValue(table, "smallint_no_default", null, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "smallint_default_null", null, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "smallint_column", (short) 32767, tableSchemaBuilder);
assertColumnHasNotDefaultValue(table, "tinyint_no_default_not_null");
assertColumnHasDefaultValue(table, "tinyint_no_default", null);
assertColumnHasDefaultValue(table, "tinyint_default_null", null);
assertColumnHasDefaultValue(table, "tinyint_column", (short) 255);
assertColumnHasDefaultValue(table, "tinyint_no_default", null, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "tinyint_default_null", null, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "tinyint_column", (short) 255, tableSchemaBuilder);
assertColumnHasNotDefaultValue(table, "float_no_default_not_null");
assertColumnHasDefaultValue(table, "float_no_default", null);
assertColumnHasDefaultValue(table, "float_default_null", null);
assertColumnHasDefaultValue(table, "float_column", 123.45);
assertColumnHasDefaultValue(table, "float_no_default", null, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "float_default_null", null, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "float_column", 123.45, tableSchemaBuilder);
assertColumnHasNotDefaultValue(table, "real_no_default_not_null");
assertColumnHasDefaultValue(table, "real_no_default", null);
assertColumnHasDefaultValue(table, "real_default_null", null);
assertColumnHasDefaultValue(table, "real_column", 1234.5f);
assertColumnHasDefaultValue(table, "real_no_default", null, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "real_default_null", null, tableSchemaBuilder);
assertColumnHasDefaultValue(table, "real_column", 1234.5f, tableSchemaBuilder);
}
}
@ -403,13 +437,16 @@ private void assertColumnHasNotDefaultValue(Table table, String columnName) {
Assertions.assertThat(column.hasDefaultValue()).isFalse();
}
private void assertColumnHasDefaultValue(Table table, String columnName, Object expectedValue) {
private void assertColumnHasDefaultValue(Table table, String columnName, Object expectedValue, TableSchemaBuilder tableSchemaBuilder) {
TableSchema schema = tableSchemaBuilder.create("test", "dummy", table, null, null, null);
Schema columnSchema = schema.getEnvelopeSchema().schema().field("after").schema().field(columnName).schema();
Column column = table.columnWithName(columnName);
Assertions.assertThat(column.hasDefaultValue()).isTrue();
Assertions.assertThat(column.defaultValue()).isEqualTo(expectedValue);
Assertions.assertThat(columnSchema.defaultValue()).isEqualTo(expectedValue);
if (expectedValue instanceof BigDecimal) {
// safe cast as we know the expectedValue and column.defaultValue are equal
BigDecimal columnValue = (BigDecimal) column.defaultValue();
BigDecimal columnValue = (BigDecimal) columnSchema.defaultValue();
BigDecimal expectedBigDecimal = (BigDecimal) expectedValue;
Assertions.assertThat(column.scale().isPresent()).isTrue();
int columnScale = column.scale().get();

View File

@ -1283,7 +1283,6 @@ protected Optional<ColumnEditor> readTableColumn(ResultSet columnMetadata, Table
column.jdbcType(resolveJdbcType(columnMetadata.getInt(5), column.nativeType()));
if (defaultValue != null) {
column.defaultValueExpression(defaultValue);
getDefaultValue(column.create(), defaultValue).ifPresent(column::defaultValue);
}
return Optional.of(column);
}
@ -1291,11 +1290,6 @@ protected Optional<ColumnEditor> readTableColumn(ResultSet columnMetadata, Table
return Optional.empty();
}
protected Optional<Object> getDefaultValue(Column column, String defaultValue) {
// nothing to do by default; overwrite in database specific implementation
return Optional.empty();
}
public List<String> readPrimaryKeyNames(DatabaseMetaData metadata, TableId id) throws SQLException {
final List<String> pkColumnNames = new ArrayList<>();
try (ResultSet rs = metadata.getPrimaryKeys(id.catalog(), id.schema(), id.table())) {

View File

@ -12,8 +12,6 @@
import io.debezium.document.Array;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.signal.Signal.Payload;
import io.debezium.relational.DefaultValueConverter;
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.TableId;
import io.debezium.relational.history.JsonTableChangeSerializer;
@ -58,13 +56,7 @@ public boolean arrived(Payload signalPayload) throws InterruptedException {
LOGGER.warn("Table changes signal '{}' has arrived but the requested field '{}' is missing from data", signalPayload, FIELD_DATABASE);
return false;
}
DefaultValueConverter defaultValueConverter = null;
if (dispatcher.getHistorizedSchema() instanceof HistorizedRelationalDatabaseSchema) {
defaultValueConverter = ((HistorizedRelationalDatabaseSchema) dispatcher.getHistorizedSchema()).getDefaultValueConverter();
}
for (TableChanges.TableChange tableChange : serializer.deserialize(changes, useCatalogBeforeSchema, defaultValueConverter)) {
for (TableChanges.TableChange tableChange : serializer.deserialize(changes, useCatalogBeforeSchema)) {
if (dispatcher.getHistorizedSchema() != null) {
LOGGER.info("Executing schema change for table '{}' requested by signal '{}'", tableChange.getId(), signalPayload.id);
dispatcher.dispatchSchemaChangeEvent(tableChange.getId(), emitter -> {

View File

@ -133,14 +133,7 @@ default boolean isRequired() {
*
* @return the complete type expression
*/
String defaultValueExpression();
/**
* Get the default value of the column
*
* @return the default value
*/
Object defaultValue();
Optional<String> defaultValueExpression();
/**
* Determine whether this column's has a default value

View File

@ -121,13 +121,6 @@ public interface ColumnEditor {
*/
String defaultValueExpression();
/**
* Get the default value of the column.
*
* @return the default value
*/
Object defaultValue();
/**
* Determine whether this column's has a default value set
*
@ -255,14 +248,6 @@ public interface ColumnEditor {
*/
ColumnEditor position(int position);
/**
* Set the default value of the column;
*
* @param defaultValue the default value
* @return this editor so callers can chain methods together
*/
ColumnEditor defaultValue(Object defaultValue);
/**
* Set the default value expression of the column;
*
@ -287,15 +272,7 @@ public interface ColumnEditor {
ColumnEditor comment(String comment);
/**
* Unsets the default value of the column, reverting the editor to the state where {@link #defaultValue(Object))}
* has never been called
*
* @return this editor so callers can chain methods together
*/
ColumnEditor unsetDefaultValue();
/**
* Unsets the default value expression of the column, reverting the editor to the state where {@link #defaultValueExpression(Object))}
* Unsets the default value expression of the column, reverting the editor to the state where {@link #defaultValueExpression(String))}
* has never been called
*
* @return this editor so callers can chain methods together

View File

@ -24,7 +24,6 @@ final class ColumnEditorImpl implements ColumnEditor {
private boolean optional = true;
private boolean autoIncremented = false;
private boolean generated = false;
private Object defaultValue = null;
private String defaultValueExpression = null;
private boolean hasDefaultValue = false;
private List<String> enumValues;
@ -103,11 +102,6 @@ public String defaultValueExpression() {
return defaultValueExpression;
}
@Override
public Object defaultValue() {
return defaultValue;
}
@Override
public boolean hasDefaultValue() {
return hasDefaultValue;
@ -185,7 +179,7 @@ public ColumnEditorImpl optional(boolean optional) {
this.optional = optional;
if (optional && !hasDefaultValue()) {
// Optional columns have implicit NULL default value
defaultValue(null);
defaultValueExpression(null);
}
return this;
}
@ -208,28 +202,16 @@ public ColumnEditorImpl position(int position) {
return this;
}
@Override
public ColumnEditor defaultValue(final Object defaultValue) {
this.hasDefaultValue = true;
this.defaultValue = defaultValue;
return this;
}
@Override
public ColumnEditor defaultValueExpression(String defaultValueExpression) {
this.hasDefaultValue = true;
this.defaultValueExpression = defaultValueExpression;
return this;
}
@Override
public ColumnEditor unsetDefaultValue() {
this.hasDefaultValue = false;
this.defaultValue = null;
return this;
}
@Override
public ColumnEditor unsetDefaultValueExpression() {
this.hasDefaultValue = false;
this.defaultValueExpression = null;
return this;
}
@ -249,7 +231,7 @@ public ColumnEditor comment(String comment) {
@Override
public Column create() {
return new ColumnImpl(name, position, jdbcType, nativeType, typeName, typeExpression, charsetName, tableCharsetName,
length, scale, enumValues, optional, autoIncremented, generated, defaultValue, defaultValueExpression, hasDefaultValue, comment);
length, scale, enumValues, optional, autoIncremented, generated, defaultValueExpression, hasDefaultValue, comment);
}
@Override

View File

@ -25,7 +25,6 @@ final class ColumnImpl implements Column, Comparable<Column> {
private final boolean optional;
private final boolean autoIncremented;
private final boolean generated;
private final Object defaultValue;
private final String defaultValueExpression;
private final boolean hasDefaultValue;
private final List<String> enumValues;
@ -35,20 +34,20 @@ protected ColumnImpl(String columnName, int position, int jdbcType, int componen
String charsetName, String defaultCharsetName, int columnLength, Integer columnScale,
boolean optional, boolean autoIncremented, boolean generated) {
this(columnName, position, jdbcType, componentType, typeName, typeExpression, charsetName,
defaultCharsetName, columnLength, columnScale, null, optional, autoIncremented, generated, null, null, false, null);
defaultCharsetName, columnLength, columnScale, null, optional, autoIncremented, generated, null, false, null);
}
protected ColumnImpl(String columnName, int position, int jdbcType, int nativeType, String typeName, String typeExpression,
String charsetName, String defaultCharsetName, int columnLength, Integer columnScale,
boolean optional, boolean autoIncremented, boolean generated, Object defaultValue, String defaultValueExpression, boolean hasDefaultValue) {
boolean optional, boolean autoIncremented, boolean generated, String defaultValueExpression, boolean hasDefaultValue) {
this(columnName, position, jdbcType, nativeType, typeName, typeExpression, charsetName,
defaultCharsetName, columnLength, columnScale, null, optional, autoIncremented, generated, defaultValue, defaultValueExpression, hasDefaultValue, null);
defaultCharsetName, columnLength, columnScale, null, optional, autoIncremented, generated, defaultValueExpression, hasDefaultValue, null);
}
protected ColumnImpl(String columnName, int position, int jdbcType, int nativeType, String typeName, String typeExpression,
String charsetName, String defaultCharsetName, int columnLength, Integer columnScale,
List<String> enumValues, boolean optional, boolean autoIncremented, boolean generated,
Object defaultValue, String defaultValueExpression, boolean hasDefaultValue, String comment) {
String defaultValueExpression, boolean hasDefaultValue, String comment) {
this.name = columnName;
this.position = position;
this.jdbcType = jdbcType;
@ -66,7 +65,6 @@ protected ColumnImpl(String columnName, int position, int jdbcType, int nativeTy
this.optional = optional;
this.autoIncremented = autoIncremented;
this.generated = generated;
this.defaultValue = defaultValue;
this.defaultValueExpression = defaultValueExpression;
this.hasDefaultValue = hasDefaultValue;
this.enumValues = enumValues == null ? new ArrayList<>() : enumValues;
@ -135,13 +133,8 @@ public boolean isGenerated() {
}
@Override
public String defaultValueExpression() {
return defaultValueExpression;
}
@Override
public Object defaultValue() {
return defaultValue;
public Optional<String> defaultValueExpression() {
return Optional.ofNullable(defaultValueExpression);
}
@Override
@ -182,7 +175,6 @@ public boolean equals(Object obj) {
this.isOptional() == that.isOptional() &&
this.isAutoIncremented() == that.isAutoIncremented() &&
this.isGenerated() == that.isGenerated() &&
Objects.equals(this.defaultValue(), that.defaultValue()) &&
Objects.equals(this.defaultValueExpression(), that.defaultValueExpression()) &&
this.hasDefaultValue() == that.hasDefaultValue() &&
this.enumValues().equals(that.enumValues());
@ -239,11 +231,10 @@ public ColumnEditor edit() {
.optional(isOptional())
.autoIncremented(isAutoIncremented())
.generated(isGenerated())
.defaultValueExpression(defaultValueExpression())
.enumValues(enumValues)
.comment(comment);
if (hasDefaultValue()) {
editor.defaultValue(defaultValue());
editor.defaultValueExpression(defaultValueExpression().orElse(null));
}
return editor;
}

View File

@ -54,7 +54,7 @@ public CustomConverterRegistry(List<CustomConverter<SchemaBuilder, ConvertedFiel
* @param column the column metadata
* @return the schema of the value generated by the converter or empty if converter does not support the column
*/
public synchronized Optional<SchemaBuilder> registerConverterFor(TableId table, Column column) {
public synchronized Optional<SchemaBuilder> registerConverterFor(TableId table, Column column, Object defaultValue) {
final String fullColumnName = fullColumnName(table, column);
for (CustomConverter<SchemaBuilder, ConvertedField> converter : converters) {
@ -113,7 +113,7 @@ public boolean hasDefaultValue() {
@Override
public Object defaultValue() {
return column.defaultValue();
return defaultValue;
}
},
new CustomConverter.ConverterRegistration<SchemaBuilder>() {

View File

@ -18,25 +18,22 @@
public interface DefaultValueConverter {
/**
* This interface is used to convert the string default value to a Java type
* This interface is used to convert the default value literal to a Java type
* recognized by value converters for a subset of types.
*
* @param column the column definition describing the {@code data} value; never null
* @param defaultValue the default value; may be null
* @param defaultValueExpression the default value literal; may be null
* @return value converted to a Java type; optional
*/
Optional<Object> parseDefaultValue(Column column, String defaultValue);
Optional<Object> parseDefaultValue(Column column, String defaultValueExpression);
/**
* Parse default value expression and set column's default value.
* @param columnEditor
* @return
* Obtain a DefaultValueConverter that passes through values.
*
* @return the pass-through DefaultValueConverter; never null
*/
default ColumnEditor setColumnDefaultValue(ColumnEditor columnEditor) {
Column column = columnEditor.create();
return parseDefaultValue(column, column.defaultValueExpression())
.map(defaultValue -> column.edit().defaultValue(defaultValue))
.orElse(column.edit());
static DefaultValueConverter passthrough() {
return (column, defaultValueExpression) -> Optional.ofNullable(defaultValueExpression);
}
}

View File

@ -7,6 +7,7 @@
import java.sql.Types;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -54,6 +55,7 @@ public class TableSchemaBuilder {
private final SchemaNameAdjuster schemaNameAdjuster;
private final ValueConverterProvider valueConverterProvider;
private final DefaultValueConverter defaultValueConverter;
private final Schema sourceInfoSchema;
private final FieldNamer<Column> fieldNamer;
private final CustomConverterRegistry customConverterRegistry;
@ -66,11 +68,34 @@ public class TableSchemaBuilder {
* null
* @param schemaNameAdjuster the adjuster for schema names; may not be null
*/
public TableSchemaBuilder(ValueConverterProvider valueConverterProvider, SchemaNameAdjuster schemaNameAdjuster,
CustomConverterRegistry customConverterRegistry, Schema sourceInfoSchema,
public TableSchemaBuilder(ValueConverterProvider valueConverterProvider,
SchemaNameAdjuster schemaNameAdjuster,
CustomConverterRegistry customConverterRegistry,
Schema sourceInfoSchema,
boolean sanitizeFieldNames, boolean multiPartitionMode) {
this(valueConverterProvider, null, schemaNameAdjuster,
customConverterRegistry, sourceInfoSchema, sanitizeFieldNames, multiPartitionMode);
}
/**
* Create a new instance of the builder.
*
* @param valueConverterProvider the provider for obtaining {@link ValueConverter}s and {@link SchemaBuilder}s; may not be
* null
* @param defaultValueConverter is used to convert the default value literal to a Java type
* recognized by value converters for a subset of types. may be null.
* @param schemaNameAdjuster the adjuster for schema names; may not be null
*/
public TableSchemaBuilder(ValueConverterProvider valueConverterProvider,
DefaultValueConverter defaultValueConverter,
SchemaNameAdjuster schemaNameAdjuster,
CustomConverterRegistry customConverterRegistry,
Schema sourceInfoSchema,
boolean sanitizeFieldNames, boolean multiPartitionMode) {
this.schemaNameAdjuster = schemaNameAdjuster;
this.valueConverterProvider = valueConverterProvider;
this.defaultValueConverter = Optional.ofNullable(defaultValueConverter)
.orElse(DefaultValueConverter.passthrough());
this.sourceInfoSchema = sourceInfoSchema;
this.fieldNamer = FieldNameSelector.defaultSelector(sanitizeFieldNames);
this.customConverterRegistry = customConverterRegistry;
@ -362,7 +387,11 @@ private ValueConverter wrapInMappingConverterIfNeeded(ColumnMappers mappers, Tab
* @param mapper the mapping function for the column; may be null if the columns is not to be mapped to different values
*/
protected void addField(SchemaBuilder builder, Table table, Column column, ColumnMapper mapper) {
final SchemaBuilder fieldBuilder = customConverterRegistry.registerConverterFor(table.id(), column)
final Object defaultValue = column.defaultValueExpression()
.flatMap(e -> defaultValueConverter.parseDefaultValue(column, e))
.orElse(null);
final SchemaBuilder fieldBuilder = customConverterRegistry.registerConverterFor(table.id(), column, defaultValue)
.orElse(valueConverterProvider.schemaBuilder(column));
if (fieldBuilder != null) {
@ -377,7 +406,8 @@ protected void addField(SchemaBuilder builder, Table table, Column column, Colum
// if the default value is provided
if (column.hasDefaultValue()) {
fieldBuilder
.defaultValue(customConverterRegistry.getValueConverter(table.id(), column).orElse(ValueConverter.passthrough()).convert(column.defaultValue()));
.defaultValue(customConverterRegistry.getValueConverter(table.id(), column)
.orElse(ValueConverter.passthrough()).convert(defaultValue));
}
builder.field(fieldNamer.fieldNameFor(column), fieldBuilder.build());

View File

@ -16,7 +16,6 @@
import org.slf4j.LoggerFactory;
import io.debezium.relational.Column;
import io.debezium.relational.DefaultValueConverter;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges.TableChange;
import io.debezium.util.SchemaNameAdjuster;
@ -145,7 +144,7 @@ private Struct toStruct(Column column) {
}
@Override
public TableChanges deserialize(List<Struct> data, boolean useCatalogBeforeSchema, DefaultValueConverter defaultValueConverter) {
public TableChanges deserialize(List<Struct> data, boolean useCatalogBeforeSchema) {
throw new UnsupportedOperationException("Deserialization from Connect Struct is not supported");
}
}

View File

@ -16,7 +16,6 @@
import io.debezium.document.Value;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.DefaultValueConverter;
import io.debezium.relational.Table;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
@ -93,8 +92,7 @@ private Document toDocument(Column column) {
document.setBoolean("generated", column.isGenerated());
document.setString("comment", column.comment());
document.setBoolean("hasDefaultValue", column.hasDefaultValue());
document.setString("defaultValueExpression", column.defaultValueExpression());
column.defaultValueExpression().ifPresent(d -> document.setString("defaultValueExpression", d));
Optional.ofNullable(column.enumValues())
.map(List::toArray)
@ -104,11 +102,11 @@ private Document toDocument(Column column) {
}
@Override
public TableChanges deserialize(Array array, boolean useCatalogBeforeSchema, DefaultValueConverter defaultValueConverter) {
public TableChanges deserialize(Array array, boolean useCatalogBeforeSchema) {
TableChanges tableChanges = new TableChanges();
for (Entry entry : array) {
TableChange change = fromDocument(entry.getValue().asDocument(), useCatalogBeforeSchema, defaultValueConverter);
TableChange change = fromDocument(entry.getValue().asDocument(), useCatalogBeforeSchema);
if (change.getType() == TableChangeType.CREATE) {
tableChanges.create(change.getTable());
@ -124,7 +122,7 @@ else if (change.getType() == TableChangeType.DROP) {
return tableChanges;
}
private static Table fromDocument(TableId id, Document document, DefaultValueConverter defaultValueConverter) {
private static Table fromDocument(TableId id, Document document) {
TableEditor editor = Table.editor()
.tableId(id)
.setDefaultCharsetName(document.getString("defaultCharsetName"));
@ -164,16 +162,8 @@ private static Table fromDocument(TableId id, Document document, DefaultValueCon
}
String defaultValueExpression = v.getString("defaultValueExpression");
columnEditor.defaultValueExpression(defaultValueExpression);
Boolean hasDefaultValue = v.getBoolean("hasDefaultValue");
if (hasDefaultValue != null && hasDefaultValue) {
if (defaultValueExpression != null && defaultValueConverter != null) {
columnEditor = defaultValueConverter.setColumnDefaultValue(columnEditor);
}
else {
columnEditor = columnEditor.defaultValue(null);
}
if (defaultValueExpression != null) {
columnEditor.defaultValueExpression(defaultValueExpression);
}
Array enumValues = v.getArray("enumValues");
@ -201,13 +191,13 @@ private static Table fromDocument(TableId id, Document document, DefaultValueCon
return editor.create();
}
public static TableChange fromDocument(Document document, boolean useCatalogBeforeSchema, DefaultValueConverter defaultValueConverter) {
public static TableChange fromDocument(Document document, boolean useCatalogBeforeSchema) {
TableChangeType type = TableChangeType.valueOf(document.getString("type"));
TableId id = TableId.parse(document.getString("id"), useCatalogBeforeSchema);
Table table = null;
if (type == TableChangeType.CREATE || type == TableChangeType.ALTER) {
table = fromDocument(id, document.getDocument("table"), defaultValueConverter);
table = fromDocument(id, document.getDocument("table"));
}
else {
table = Table.editor().tableId(id).create();

View File

@ -9,7 +9,6 @@
import java.util.Iterator;
import java.util.List;
import io.debezium.relational.DefaultValueConverter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
@ -164,7 +163,7 @@ public static interface TableChangesSerializer<T> {
* the catalog and the second as the table name, or false if the first should be used as the schema and the
* second as the table name
*/
TableChanges deserialize(T data, boolean useCatalogBeforeSchema, DefaultValueConverter defaultValueConverter);
TableChanges deserialize(T data, boolean useCatalogBeforeSchema);
}
public enum TableChangeType {

View File

@ -140,14 +140,14 @@ public void checkPreconditions() {
@Test(expected = NullPointerException.class)
public void shouldFailToBuildTableSchemaFromNullTable() {
new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", null, null, null, null);
}
@Test
public void shouldBuildTableSchemaFromTable() {
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null);
assertThat(schema).isNotNull();
@ -157,7 +157,7 @@ public void shouldBuildTableSchemaFromTable() {
@FixFor("DBZ-1089")
public void shouldBuildCorrectSchemaNames() {
// table id with catalog and schema
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null);
assertThat(schema).isNotNull();
@ -169,7 +169,7 @@ public void shouldBuildCorrectSchemaNames() {
.tableId(new TableId("testDb", null, "testTable"))
.create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null);
@ -182,7 +182,7 @@ public void shouldBuildCorrectSchemaNames() {
.tableId(new TableId(null, "testSchema", "testTable"))
.create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null);
@ -195,7 +195,7 @@ public void shouldBuildCorrectSchemaNames() {
.tableId(new TableId(null, null, "testTable"))
.create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null);
@ -208,7 +208,7 @@ public void shouldBuildCorrectSchemaNames() {
@FixFor("DBZ-2975")
public void shouldBuildCorrectSchemaNamesInMultiPartitionMode() {
// table id with catalog and schema
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, true)
.create(prefix, "sometopic", table, null, null, null);
assertThat(schema).isNotNull();
@ -219,7 +219,7 @@ public void shouldBuildCorrectSchemaNamesInMultiPartitionMode() {
@Test
public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() {
table = table.edit().setPrimaryKeyNames().create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null);
assertThat(schema).isNotNull();
@ -280,7 +280,7 @@ public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() {
@FixFor("DBZ-1044")
public void shouldSanitizeFieldNamesAndBuildTableSchemaFromTableWithoutPrimaryKey() {
table = table.edit().setPrimaryKeyNames().create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), true, false)
.create(prefix, "sometopic", table, null, null, null);
assertThat(schema).isNotNull();
@ -348,7 +348,7 @@ public void shouldSanitizeFieldNamesAndBuildTableSchemaFromTableWithoutPrimaryKe
public void shouldSanitizeFieldNamesAndValidateSerialization() {
LogInterceptor logInterceptor = new LogInterceptor(TableSchemaBuilder.class);
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), true, false)
.create(prefix, "sometopic", table, null, null, null);
@ -367,7 +367,7 @@ public void shouldSanitizeFieldNamesAndValidateSerialization() {
@FixFor("DBZ-1015")
public void shouldBuildTableSchemaFromTableWithCustomKey() {
table = table.edit().setPrimaryKeyNames().create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3", null));
assertThat(schema).isNotNull();
@ -381,7 +381,7 @@ public void shouldBuildTableSchemaFromTableWithCustomKey() {
@Test
@FixFor("DBZ-1015")
public void shouldOverrideIdentityKey() {
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3", null));
assertThat(schema).isNotNull();
@ -396,7 +396,7 @@ public void shouldOverrideIdentityKey() {
@Test
@FixFor("DBZ-1015")
public void shouldFallbackToIdentyKeyWhenCustomMapperIsNull() {
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null);
assertThat(schema).isNotNull();
@ -428,7 +428,7 @@ public void customKeyMapperShouldMapMultipleTables() {
KeyMapper keyMapper = CustomKeyMapper.getInstance("(.*).table:C2,C3;(.*).table2:C1", null);
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, keyMapper);
@ -440,7 +440,7 @@ public void customKeyMapperShouldMapMultipleTables() {
assertThat(keys.field("C2").name()).isEqualTo("C2");
assertThat(keys.field("C3").name()).isEqualTo("C3");
TableSchema schema2 = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
TableSchema schema2 = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table2, null, null, keyMapper);
@ -468,7 +468,7 @@ public void defaultKeyMapperShouldOrderKeyColumnsBasedOnPrimaryKeyColumnNamesOrd
.setPrimaryKeyNames("t2ID", "t1ID")
.create();
TableSchema schema2 = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
TableSchema schema2 = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table2, null, null, null);
@ -500,11 +500,12 @@ public void mapperConvertersShouldLeaveEmptyDatesAsZero() {
ColumnMappers mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, "test", null, null, 0));
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table2, null, mappers, null);
Struct value = schema.valueFromColumnData(data);
assertThat(value.get("C1")).isEqualTo(0);
}
}

View File

@ -10,7 +10,6 @@
import java.sql.Types;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.data.Struct;
import org.junit.Test;
@ -18,7 +17,6 @@
import io.debezium.document.Array;
import io.debezium.document.DocumentReader;
import io.debezium.relational.Column;
import io.debezium.relational.DefaultValueConverter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChangesSerializer;
@ -54,7 +52,6 @@ public void canSerializeAndDeserializeHistoryRecord() throws Exception {
.type("ENUM", "ENUM")
.optional(false)
.defaultValueExpression("1")
.defaultValue("1")
.enumValues(Collect.arrayListOf("1", "2"))
.create())
.setPrimaryKeyNames("first")
@ -83,15 +80,9 @@ public void canSerializeAndDeserializeHistoryRecord() throws Exception {
assertThat(deserialized.ddl()).isEqualTo(ddl);
System.out.println(record);
final DefaultValueConverter mockDefaultValueConverter = new DefaultValueConverter() {
@Override
public Optional<Object> parseDefaultValue(Column column, String defaultValue) {
return Optional.ofNullable(defaultValue);
}
};
final TableChangesSerializer<Array> tableChangesSerializer = new JsonTableChangeSerializer();
assertThat((Object) tableChangesSerializer.deserialize(deserialized.tableChanges(), true, mockDefaultValueConverter)).isEqualTo(tableChanges);
assertThat((Object) tableChangesSerializer.deserialize(deserialized.tableChanges(), true)).isEqualTo(tableChanges);
final TableChangesSerializer<List<Struct>> connectTableChangeSerializer = new ConnectTableChangeSerializer();
Struct struct = connectTableChangeSerializer.serialize(tableChanges).get(0);