diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java index 0fb1ddf1f..eaad5f0af 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java @@ -171,6 +171,12 @@ else if (actualColumn.jdbcType() == Types.TINYINT || actualColumn.jdbcType() == // read it again to get correct scale return rs.getObject(fieldNo) == null ? null : rs.getInt(fieldNo); } + // DBZ-2673 + else if (actualColumn.typeName().equals("CHAR") || + actualColumn.typeName().equals("VARCHAR") || + actualColumn.typeName().equals("TEXT")) { + return rs.getBytes(fieldNo); + } else { return rs.getObject(fieldNo); } diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotReaderIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotReaderIT.java index 8f98b386d..ea83f35e3 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotReaderIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotReaderIT.java @@ -190,6 +190,20 @@ private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyMo assertThat(customers.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(customers.numberOfValueSchemaChanges()).isEqualTo(1); + List customerRecrods = new ArrayList<>(); + customers.forEach(val -> { + customerRecrods.add(((Struct) val.value()).getStruct("after")); + }); + + Struct customer = customerRecrods.stream().sorted((a, b) -> a.getInt32("id").compareTo(b.getInt32("id"))).findFirst().get(); + assertThat(customer.get("first_name")).isInstanceOf(String.class); + assertThat(customer.get("last_name")).isInstanceOf(String.class); + assertThat(customer.get("email")).isInstanceOf(String.class); + + assertThat(customer.get("first_name")).isEqualTo("Sally"); + assertThat(customer.get("last_name")).isEqualTo("Thomas"); + assertThat(customer.get("email")).isEqualTo("sally.thomas@acme.com"); + Collection orders = store.collection(DATABASE.getDatabaseName(), "orders"); assertThat(orders.numberOfCreates()).isEqualTo(5); assertThat(orders.numberOfUpdates()).isEqualTo(0); @@ -250,6 +264,7 @@ public void snapshotWithBackupLocksShouldNotWaitForReads() throws Exception { MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName()); Thread t = new Thread() { + @Override public void run() { try { JdbcConnection connection = db.connect();