DBZ-7938: support for multiple array columns of different types
This commit is contained in:
parent
808c7070e3
commit
e89b2ca3ae
@ -25,8 +25,6 @@ public class ArrayType extends AbstractType {
|
|||||||
|
|
||||||
public static final ArrayType INSTANCE = new ArrayType();
|
public static final ArrayType INSTANCE = new ArrayType();
|
||||||
|
|
||||||
private String typeName;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String[] getRegistrationKeys() {
|
public String[] getRegistrationKeys() {
|
||||||
return new String[]{ "ARRAY" };
|
return new String[]{ "ARRAY" };
|
||||||
@ -34,9 +32,12 @@ public String[] getRegistrationKeys() {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getTypeName(DatabaseDialect dialect, Schema schema, boolean key) {
|
public String getTypeName(DatabaseDialect dialect, Schema schema, boolean key) {
|
||||||
|
return getElementTypeName(dialect, schema, key) + "[]";
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getElementTypeName(DatabaseDialect dialect, Schema schema, boolean key) {
|
||||||
Type elementType = dialect.getSchemaType(schema.valueSchema());
|
Type elementType = dialect.getSchemaType(schema.valueSchema());
|
||||||
typeName = elementType.getTypeName(dialect, schema.valueSchema(), key);
|
return elementType.getTypeName(dialect, schema.valueSchema(), key);
|
||||||
return typeName + "[]";
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -44,6 +45,6 @@ public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
|
|||||||
if (value == null) {
|
if (value == null) {
|
||||||
return Arrays.asList(new ValueBindDescriptor(index, null));
|
return Arrays.asList(new ValueBindDescriptor(index, null));
|
||||||
}
|
}
|
||||||
return List.of(new ValueBindDescriptor(index, value, java.sql.Types.ARRAY, typeName));
|
return List.of(new ValueBindDescriptor(index, value, java.sql.Types.ARRAY, getElementTypeName(this.getDialect(), schema, false)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -9,7 +9,10 @@
|
|||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.kafka.connect.data.Schema;
|
import org.apache.kafka.connect.data.Schema;
|
||||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||||
@ -25,6 +28,7 @@
|
|||||||
import io.debezium.connector.jdbc.junit.jupiter.Sink;
|
import io.debezium.connector.jdbc.junit.jupiter.Sink;
|
||||||
import io.debezium.connector.jdbc.junit.jupiter.SinkRecordFactoryArgumentsProvider;
|
import io.debezium.connector.jdbc.junit.jupiter.SinkRecordFactoryArgumentsProvider;
|
||||||
import io.debezium.connector.jdbc.util.SinkRecordFactory;
|
import io.debezium.connector.jdbc.util.SinkRecordFactory;
|
||||||
|
import io.debezium.data.Uuid;
|
||||||
import io.debezium.doc.FixFor;
|
import io.debezium.doc.FixFor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -414,4 +418,40 @@ public void testShouldWorkWithBoolArray(SinkRecordFactory factory) throws Except
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
|
||||||
|
@FixFor("DBZ-7938")
|
||||||
|
public void testShouldWorkWithMultipleArraysWithDifferentTypes(SinkRecordFactory factory) throws Exception {
|
||||||
|
final Map<String, String> properties = getDefaultSinkConfig();
|
||||||
|
properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE.getValue());
|
||||||
|
properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
|
||||||
|
properties.put(JdbcSinkConnectorConfig.INSERT_MODE, JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
|
||||||
|
startSinkConnector(properties);
|
||||||
|
assertSinkConnectorIsRunning();
|
||||||
|
|
||||||
|
final String tableName = randomTableName();
|
||||||
|
final String topicName = topicName("server2", "schema", tableName);
|
||||||
|
final List<UUID> uuids = List.of(UUID.randomUUID(), UUID.randomUUID());
|
||||||
|
|
||||||
|
final SinkRecord createRecord = factory.createRecordWithSchemaValue(
|
||||||
|
topicName,
|
||||||
|
(byte) 1,
|
||||||
|
List.of("text_data", "uuid_data"),
|
||||||
|
List.of(SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), SchemaBuilder.array(Uuid.schema()).optional().build()),
|
||||||
|
Arrays.asList(List.of("a", "b"), uuids.stream().map(UUID::toString).collect(Collectors.toList())));
|
||||||
|
|
||||||
|
final String destinationTable = destinationTableName(createRecord);
|
||||||
|
final String sql = "CREATE TABLE %s (id int not null, text_data text[], uuid_data uuid[], primary key(id))";
|
||||||
|
getSink().execute(String.format(sql, destinationTable));
|
||||||
|
|
||||||
|
consume(createRecord);
|
||||||
|
|
||||||
|
getSink().assertRows(destinationTable, rs -> {
|
||||||
|
assertThat(rs.getInt(1)).isEqualTo(1);
|
||||||
|
assertThat(rs.getArray(2).getArray()).isEqualTo(new String[]{ "a", "b" });
|
||||||
|
assertThat(rs.getArray(3).getArray()).isEqualTo(uuids.toArray());
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user