From f50c3e82220f909990d2363840127d239d7073fa Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Mon, 29 Mar 2021 14:41:06 +0200 Subject: [PATCH] DBZ-3361 Prefer DDL over logical schema for MySQL --- .../connector/mysql/MySqlConnectorTask.java | 6 +- .../connector/mysql/MysqlDefaultValueIT.java | 72 +++++++++++++++++++ .../history/AbstractDatabaseHistory.java | 19 ++++- 3 files changed, 95 insertions(+), 2 deletions(-) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java index 553a40d2b..b7efc4a44 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java @@ -29,6 +29,7 @@ import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; import io.debezium.relational.TableId; +import io.debezium.relational.history.AbstractDatabaseHistory; import io.debezium.schema.TopicSelector; import io.debezium.util.Clock; import io.debezium.util.SchemaNameAdjuster; @@ -59,7 +60,10 @@ public String version() { @Override public ChangeEventSourceCoordinator start(Configuration config) { final Clock clock = Clock.system(); - final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(config); + final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig( + config.edit() + .with(AbstractDatabaseHistory.INTERNAL_PREFER_DDL, true) + .build()); final TopicSelector topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig); final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(); final MySqlValueConverters valueConverters = getValueConverters(connectorConfig); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java index 540a5ea2a..42c9f69fa 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java @@ -11,6 +11,8 @@ import java.math.BigDecimal; import java.nio.file.Path; import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; import java.sql.Time; import java.time.Duration; import java.time.Instant; @@ -327,6 +329,76 @@ public void stringTest() throws InterruptedException { assertEmptyFieldValue(record, "I"); } + @Test + @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x") + public void databaseHistorySaveDefaultValuesTest() throws InterruptedException, SQLException { + config = DATABASE.defaultConfig() + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL) + .build(); + start(MySqlConnector.class, config); + + // Testing.Print.enable(); + + SourceRecords records = consumeRecordsByTopic(EVENT_COUNT); + SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("STRING_TABLE")).get(0); + validate(record); + + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); + Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); + Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema(); + Schema schemaD = record.valueSchema().fields().get(1).schema().fields().get(3).schema(); + Schema schemaE = record.valueSchema().fields().get(1).schema().fields().get(4).schema(); + Schema schemaF = record.valueSchema().fields().get(1).schema().fields().get(5).schema(); + Schema schemaG = record.valueSchema().fields().get(1).schema().fields().get(6).schema(); + Schema schemaH = record.valueSchema().fields().get(1).schema().fields().get(7).schema(); + assertThat(schemaA.defaultValue()).isEqualTo("A"); + assertThat(schemaB.defaultValue()).isEqualTo("b"); + assertThat(schemaC.defaultValue()).isEqualTo("CC"); + assertThat(schemaD.defaultValue()).isEqualTo("10"); + assertThat(schemaE.defaultValue()).isEqualTo("0"); + assertThat(schemaF.defaultValue()).isEqualTo(null); + assertThat(schemaG.defaultValue()).isEqualTo(null); + assertThat(schemaH.defaultValue()).isEqualTo(null); + assertEmptyFieldValue(record, "I"); + + stopConnector(); + final String insert = "INSERT INTO STRING_TABLE\n" + + "VALUES (DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT, NULL)"; + try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());) { + try (JdbcConnection connection = db.connect()) { + final Connection jdbc = connection.connection(); + final Statement statement = jdbc.createStatement(); + statement.executeUpdate(insert); + } + } + start(MySqlConnector.class, config); + + Testing.Print.enable(); + + records = consumeRecordsByTopic(1); + record = records.recordsForTopic(DATABASE.topicForTable("STRING_TABLE")).get(0); + validate(record); + + schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); + schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); + schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema(); + schemaD = record.valueSchema().fields().get(1).schema().fields().get(3).schema(); + schemaE = record.valueSchema().fields().get(1).schema().fields().get(4).schema(); + schemaF = record.valueSchema().fields().get(1).schema().fields().get(5).schema(); + schemaG = record.valueSchema().fields().get(1).schema().fields().get(6).schema(); + schemaH = record.valueSchema().fields().get(1).schema().fields().get(7).schema(); + assertThat(schemaA.defaultValue()).isEqualTo("A"); + assertThat(schemaB.defaultValue()).isEqualTo("b"); + assertThat(schemaC.defaultValue()).isEqualTo("CC"); + assertThat(schemaD.defaultValue()).isEqualTo("10"); + assertThat(schemaE.defaultValue()).isEqualTo("0"); + assertThat(schemaF.defaultValue()).isEqualTo(null); + assertThat(schemaG.defaultValue()).isEqualTo(null); + assertThat(schemaH.defaultValue()).isEqualTo(null); + assertEmptyFieldValue(record, "I"); + + } + @Test @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x") public void unsignedBitTest() throws InterruptedException { diff --git a/debezium-core/src/main/java/io/debezium/relational/history/AbstractDatabaseHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/AbstractDatabaseHistory.java index 4858bc893..f7c717b69 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/AbstractDatabaseHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/AbstractDatabaseHistory.java @@ -11,10 +11,14 @@ import java.util.function.Function; import java.util.regex.Pattern; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Width; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.debezium.config.Configuration; +import io.debezium.config.Field; import io.debezium.document.Array; import io.debezium.function.Predicates; import io.debezium.relational.Tables; @@ -31,6 +35,17 @@ public abstract class AbstractDatabaseHistory implements DatabaseHistory { protected final Logger logger = LoggerFactory.getLogger(getClass()); + // Temporary preference for DDL over logical schema due to DBZ-32 + public static final Field INTERNAL_PREFER_DDL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "prefer.ddl") + .withDisplayName("Prefer DDL for schema recovery") + .withType(Type.BOOLEAN) + .withDefault(false) + .withWidth(Width.SHORT) + .withImportance(Importance.LOW) + .withDescription("Prefer DDL for schema reovery in case logica schema is present") + .withInvisibleRecommender() + .withNoValidation(); + protected Configuration config; private HistoryRecordComparator comparator = HistoryRecordComparator.INSTANCE; private boolean skipUnparseableDDL; @@ -38,6 +53,7 @@ public abstract class AbstractDatabaseHistory implements DatabaseHistory { private Function> ddlFilter = (x -> Optional.empty()); private DatabaseHistoryListener listener = DatabaseHistoryListener.NOOP; private boolean useCatalogBeforeSchema; + private boolean preferDdl = false; private TableChanges.TableChangesSerializer tableChangesSerializer = new JsonTableChangeSerializer(); protected AbstractDatabaseHistory() { @@ -54,6 +70,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator, this.ddlFilter = (ddlFilter != null) ? Predicates.matchedBy(ddlFilter) : this.ddlFilter; this.listener = listener; this.useCatalogBeforeSchema = useCatalogBeforeSchema; + this.preferDdl = config.getBoolean(INTERNAL_PREFER_DDL); } @Override @@ -87,7 +104,7 @@ public final void recover(Map source, Map position, Tables Array tableChanges = recovered.tableChanges(); String ddl = recovered.ddl(); - if (tableChanges != null && !tableChanges.isEmpty()) { + if (!preferDdl && tableChanges != null && !tableChanges.isEmpty()) { TableChanges changes = tableChangesSerializer.deserialize(tableChanges, useCatalogBeforeSchema); for (TableChange entry : changes) { if (entry.getType() == TableChangeType.CREATE || entry.getType() == TableChangeType.ALTER) {