DBZ-3361 Prefer DDL over logical schema for MySQL

This commit is contained in:
Jiri Pechanec 2021-03-29 14:41:06 +02:00 committed by René Kerner
parent 776180a0df
commit f50c3e8222
3 changed files with 95 additions and 2 deletions

View File

@ -29,6 +29,7 @@
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.relational.TableId;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock;
import io.debezium.util.SchemaNameAdjuster;
@ -59,7 +60,10 @@ public String version() {
@Override
public ChangeEventSourceCoordinator start(Configuration config) {
final Clock clock = Clock.system();
final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(config);
final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(
config.edit()
.with(AbstractDatabaseHistory.INTERNAL_PREFER_DDL, true)
.build());
final TopicSelector<TableId> topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);
final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
final MySqlValueConverters valueConverters = getValueConverters(connectorConfig);

View File

@ -11,6 +11,8 @@
import java.math.BigDecimal;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Time;
import java.time.Duration;
import java.time.Instant;
@ -327,6 +329,76 @@ public void stringTest() throws InterruptedException {
assertEmptyFieldValue(record, "I");
}
@Test
@SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
public void databaseHistorySaveDefaultValuesTest() throws InterruptedException, SQLException {
    config = DATABASE.defaultConfig()
            .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
            .build();
    start(MySqlConnector.class, config);
    // Testing.Print.enable();

    // Snapshot phase: the column default values must be captured in the emitted schema.
    SourceRecords records = consumeRecordsByTopic(EVENT_COUNT);
    SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("STRING_TABLE")).get(0);
    validate(record);
    assertStringTableDefaultValues(record);

    stopConnector();

    // Insert a row while the connector is down so that, on restart, the schema must be
    // recovered from database history before the streamed event can be emitted.
    final String insert = "INSERT INTO STRING_TABLE\n"
            + "VALUES (DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT, NULL)";
    // Single try-with-resources chain; the Statement is closed too (it leaked in the
    // previous nested-try form).
    try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());
            JdbcConnection connection = db.connect();
            Statement statement = connection.connection().createStatement()) {
        statement.executeUpdate(insert);
    }

    start(MySqlConnector.class, config);
    // Testing.Print.enable();

    // Streaming phase after restart: the defaults recovered from history must match
    // what the snapshot produced.
    records = consumeRecordsByTopic(1);
    record = records.recordsForTopic(DATABASE.topicForTable("STRING_TABLE")).get(0);
    validate(record);
    assertStringTableDefaultValues(record);
}

/**
 * Asserts that the schemas of the first eight value fields of a STRING_TABLE record
 * carry the expected column default values, and that the excluded column "I" is empty.
 *
 * @param record the change event record for STRING_TABLE
 */
private void assertStringTableDefaultValues(SourceRecord record) {
    final Schema rowSchema = record.valueSchema().fields().get(1).schema();
    assertThat(rowSchema.fields().get(0).schema().defaultValue()).isEqualTo("A");
    assertThat(rowSchema.fields().get(1).schema().defaultValue()).isEqualTo("b");
    assertThat(rowSchema.fields().get(2).schema().defaultValue()).isEqualTo("CC");
    assertThat(rowSchema.fields().get(3).schema().defaultValue()).isEqualTo("10");
    assertThat(rowSchema.fields().get(4).schema().defaultValue()).isEqualTo("0");
    assertThat(rowSchema.fields().get(5).schema().defaultValue()).isNull();
    assertThat(rowSchema.fields().get(6).schema().defaultValue()).isNull();
    assertThat(rowSchema.fields().get(7).schema().defaultValue()).isNull();
    assertEmptyFieldValue(record, "I");
}
@Test
@SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
public void unsignedBitTest() throws InterruptedException {

View File

@ -11,10 +11,14 @@
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.Array;
import io.debezium.function.Predicates;
import io.debezium.relational.Tables;
@ -31,6 +35,17 @@ public abstract class AbstractDatabaseHistory implements DatabaseHistory {
protected final Logger logger = LoggerFactory.getLogger(getClass());
// Temporary preference for DDL over logical schema due to DBZ-32
// Internal-only switch: not surfaced to users (invisible recommender, no validation).
public static final Field INTERNAL_PREFER_DDL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "prefer.ddl")
        .withDisplayName("Prefer DDL for schema recovery")
        .withType(Type.BOOLEAN)
        .withDefault(false)
        .withWidth(Width.SHORT)
        .withImportance(Importance.LOW)
        // Fixed typos in the description ("reovery" -> "recovery", "logica" -> "logical")
        .withDescription("Prefer DDL for schema recovery in case logical schema is present")
        .withInvisibleRecommender()
        .withNoValidation();
protected Configuration config;
private HistoryRecordComparator comparator = HistoryRecordComparator.INSTANCE;
private boolean skipUnparseableDDL;
@ -38,6 +53,7 @@ public abstract class AbstractDatabaseHistory implements DatabaseHistory {
private Function<String, Optional<Pattern>> ddlFilter = (x -> Optional.empty());
private DatabaseHistoryListener listener = DatabaseHistoryListener.NOOP;
private boolean useCatalogBeforeSchema;
private boolean preferDdl = false;
private TableChanges.TableChangesSerializer<Array> tableChangesSerializer = new JsonTableChangeSerializer();
protected AbstractDatabaseHistory() {
@ -54,6 +70,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
this.ddlFilter = (ddlFilter != null) ? Predicates.matchedBy(ddlFilter) : this.ddlFilter;
this.listener = listener;
this.useCatalogBeforeSchema = useCatalogBeforeSchema;
this.preferDdl = config.getBoolean(INTERNAL_PREFER_DDL);
}
@Override
@ -87,7 +104,7 @@ public final void recover(Map<String, ?> source, Map<String, ?> position, Tables
Array tableChanges = recovered.tableChanges();
String ddl = recovered.ddl();
if (tableChanges != null && !tableChanges.isEmpty()) {
if (!preferDdl && tableChanges != null && !tableChanges.isEmpty()) {
TableChanges changes = tableChangesSerializer.deserialize(tableChanges, useCatalogBeforeSchema);
for (TableChange entry : changes) {
if (entry.getType() == TableChangeType.CREATE || entry.getType() == TableChangeType.ALTER) {