diff --git a/debezium-connector-jdbc/pom.xml b/debezium-connector-jdbc/pom.xml
index 2ee16d59b..d0f93a09a 100644
--- a/debezium-connector-jdbc/pom.xml
+++ b/debezium-connector-jdbc/pom.xml
@@ -292,6 +292,11 @@
+
+ org.mariadb.jdbc
+ mariadb-java-client
+ 3.4.0
+
org.postgresql
postgresql
diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MariaDbDatabaseDialect.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MariaDbDatabaseDialect.java
new file mode 100644
index 000000000..d214406f3
--- /dev/null
+++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MariaDbDatabaseDialect.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.connector.jdbc.dialect.mysql;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.TemporalAccessor;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import org.hibernate.SessionFactory;
+import org.hibernate.StatelessSession;
+import org.hibernate.dialect.Dialect;
+import org.hibernate.dialect.MariaDBDialect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
+import io.debezium.connector.jdbc.SinkRecordDescriptor;
+import io.debezium.connector.jdbc.dialect.DatabaseDialect;
+import io.debezium.connector.jdbc.dialect.DatabaseDialectProvider;
+import io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect;
+import io.debezium.connector.jdbc.dialect.SqlStatementBuilder;
+import io.debezium.connector.jdbc.relational.TableDescriptor;
+import io.debezium.time.ZonedTimestamp;
+import io.debezium.util.Strings;
+
+/**
+ * A {@link DatabaseDialect} implementation for MariaDB.
+ *
+ */
+public class MariaDbDatabaseDialect extends GeneralDatabaseDialect {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MariaDbDatabaseDialect.class);
+
+ private static final List NO_DEFAULT_VALUE_TYPES = Arrays.asList(
+ "tinytext", "mediumtext", "longtext", "text", "tinyblob", "mediumblob", "longblob");
+
+ private static final DateTimeFormatter ISO_LOCAL_DATE_TIME_WITH_SPACE = new DateTimeFormatterBuilder()
+ .parseCaseInsensitive()
+ .append(DateTimeFormatter.ISO_LOCAL_DATE)
+ .appendLiteral(' ')
+ .append(DateTimeFormatter.ISO_LOCAL_TIME)
+ .toFormatter();
+
+ public static class MariaDbDatabaseDialectProvider implements DatabaseDialectProvider {
+ @Override
+ public boolean supports(Dialect dialect) {
+ return dialect instanceof MariaDBDialect;
+ }
+
+ @Override
+ public Class> name() {
+ return MariaDbDatabaseDialect.class;
+ }
+
+ @Override
+ public DatabaseDialect instantiate(JdbcSinkConnectorConfig config, SessionFactory sessionFactory) {
+ LOGGER.info("MariaDB Dialect instantiated.");
+ return new MariaDbDatabaseDialect(config, sessionFactory);
+ }
+ }
+
+ private final boolean connectionTimeZoneSet;
+
+ private MariaDbDatabaseDialect(JdbcSinkConnectorConfig config, SessionFactory sessionFactory) {
+ super(config, sessionFactory);
+ try (StatelessSession session = sessionFactory.openStatelessSession()) {
+ this.connectionTimeZoneSet = session.doReturningWork(connection -> connection.getMetaData().getURL().contains("connectionTimeZone="));
+ }
+ }
+
+ @Override
+ protected Optional getDatabaseTimeZoneQuery() {
+ return Optional.of("SELECT @@global.time_zone, @@session.time_zone");
+ }
+
+ @Override
+ protected String getDatabaseTimeZoneQueryResult(ResultSet rs) throws SQLException {
+ return rs.getString(1) + " (global), " + rs.getString(2) + " (system)";
+ }
+
+ @Override
+ public boolean isTimeZoneSet() {
+ return connectionTimeZoneSet || super.isTimeZoneSet();
+ }
+
+ @Override
+ public boolean shouldBindTimeWithTimeZoneAsDatabaseTimeZone() {
+ return true;
+ }
+
+ @Override
+ protected void registerTypes() {
+ super.registerTypes();
+
+ registerType(BooleanType.INSTANCE);
+ registerType(BitType.INSTANCE);
+ registerType(BytesType.INSTANCE);
+ registerType(EnumType.INSTANCE);
+ registerType(SetType.INSTANCE);
+ registerType(MediumIntType.INSTANCE);
+ registerType(IntegerType.INSTANCE);
+ registerType(TinyIntType.INSTANCE);
+ registerType(YearType.INSTANCE);
+ registerType(JsonType.INSTANCE);
+ registerType(MapToJsonType.INSTANCE);
+ registerType(GeometryType.INSTANCE);
+ registerType(PointType.INSTANCE);
+ registerType(ZonedTimestampType.INSTANCE);
+ registerType(ZonedTimeType.INSTANCE);
+ }
+
+ @Override
+ public int getMaxVarcharLengthInKey() {
+ return 255;
+ }
+
+ @Override
+ public String getFormattedTime(TemporalAccessor value) {
+ return String.format("'%s'", DateTimeFormatter.ISO_LOCAL_TIME.format(value));
+ }
+
+ @Override
+ public String getFormattedDateTime(TemporalAccessor value) {
+ return String.format("'%s'", ISO_LOCAL_DATE_TIME_WITH_SPACE.format(value));
+ }
+
+ @Override
+ public String getFormattedTimestamp(TemporalAccessor value) {
+ return String.format("'%s'", ISO_LOCAL_DATE_TIME_WITH_SPACE.format(value));
+ }
+
+ @Override
+ public String getFormattedTimestampWithTimeZone(String value) {
+ final ZonedDateTime zonedDateTime = ZonedDateTime.parse(value, ZonedTimestamp.FORMATTER);
+ return String.format("'%s'", DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(zonedDateTime));
+ }
+
+ @Override
+ public String getAlterTablePrefix() {
+ return "ADD COLUMN (";
+ }
+
+ /*
+ * The use of VALUES() to refer to the new row and columns is deprecated in recent versions of MySQL,
+ * but not followed by MariaDB yet.
+ */
+ @Override
+ public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor record) {
+ final SqlStatementBuilder builder = new SqlStatementBuilder();
+ builder.append("INSERT INTO ");
+ builder.append(getQualifiedTableName(table.getId()));
+ builder.append(" (");
+ builder.appendLists(", ", record.getKeyFieldNames(), record.getNonKeyFieldNames(), name -> columnNameFromField(name, record));
+ builder.append(") VALUES (");
+ builder.appendLists(", ", record.getKeyFieldNames(), record.getNonKeyFieldNames(), name -> columnQueryBindingFromField(name, table, record));
+ builder.append(") ");
+
+ final List updateColumnNames = record.getNonKeyFieldNames().isEmpty()
+ ? record.getKeyFieldNames()
+ : record.getNonKeyFieldNames();
+
+ builder.append("ON DUPLICATE KEY UPDATE ");
+ builder.appendList(",", updateColumnNames, name -> {
+ final String columnName = columnNameFromField(name, record);
+ return columnName + "=VALUES(" + columnName + ")";
+ });
+
+ return builder.build();
+ }
+
+ @Override
+ protected void addColumnDefaultValue(SinkRecordDescriptor.FieldDescriptor field, StringBuilder columnSpec) {
+ final String fieldType = field.getTypeName();
+ if (!Strings.isNullOrBlank(fieldType)) {
+ if (NO_DEFAULT_VALUE_TYPES.contains(fieldType.toLowerCase())) {
+ return;
+ }
+ }
+ super.addColumnDefaultValue(field, columnSpec);
+ }
+}
diff --git a/debezium-connector-jdbc/src/main/resources/META-INF/services/io.debezium.connector.jdbc.dialect.DatabaseDialectProvider b/debezium-connector-jdbc/src/main/resources/META-INF/services/io.debezium.connector.jdbc.dialect.DatabaseDialectProvider
index 5c9ce281c..c8c5b7175 100644
--- a/debezium-connector-jdbc/src/main/resources/META-INF/services/io.debezium.connector.jdbc.dialect.DatabaseDialectProvider
+++ b/debezium-connector-jdbc/src/main/resources/META-INF/services/io.debezium.connector.jdbc.dialect.DatabaseDialectProvider
@@ -1,4 +1,5 @@
io.debezium.connector.jdbc.dialect.db2.Db2DatabaseDialect$Db2DatabaseProvider
+io.debezium.connector.jdbc.dialect.mysql.MariaDbDatabaseDialect$MariaDbDatabaseDialectProvider
io.debezium.connector.jdbc.dialect.mysql.MySqlDatabaseDialect$MySqlDatabaseDialectProvider
io.debezium.connector.jdbc.dialect.oracle.OracleDatabaseDialect$OracleDatabaseDialectProvider
io.debezium.connector.jdbc.dialect.postgres.PostgresDatabaseDialect$PostgresDatabaseDialectProvider