DBZ-898 Misc. clean-up:

* Using database encoding for string conversion
* Not making hstore schemas optional by default
* Using Jackson instead of GSon for JSON serialization
* Removing superfluous method and log messages
* Adjusting to naming and style conventions
This commit is contained in:
Gunnar Morling 2018-09-24 13:12:34 +02:00
parent d88fa1f47f
commit 7150e387c7
17 changed files with 222 additions and 158 deletions

View File

@ -45,11 +45,6 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>

View File

@ -134,18 +134,6 @@ public enum HStoreHandlingMode implements EnumeratedValue {
this.value=value;
}
public HStoreHandlingMode asHStoreMode(){
switch(this){
case JSON:
return HStoreHandlingMode.JSON;
case MAP:
return HStoreHandlingMode.MAP;
default:
return HStoreHandlingMode.MAP;
}
}
@Override
public String getValue() {
return value;
@ -741,7 +729,7 @@ public String getPostgresPluginName() {
.withImportance(Importance.LOW)
.withDescription("Specify how HSTORE columns should be represented in change events, including:"
+ "'json' represents values as json string"
+ "'map' (default) represents values using java.util.map");
+ "'map' (default) represents values using java.util.Map");
public static final Field STATUS_UPDATE_INTERVAL_MS = Field.create("status.update.interval.ms")
.withDisplayName("Status update interval (ms)")
@ -815,7 +803,7 @@ protected PostgresConnectorConfig(Configuration config) {
DecimalHandlingMode decimalHandlingMode = DecimalHandlingMode.parse(decimalHandlingModeStr);
HStoreHandlingMode hStoreHandlingMode = HStoreHandlingMode.parse(hstoreHandlingModeStr);
this.decimalHandlingMode = decimalHandlingMode.asDecimalMode();
this.hStoreHandlingMode = hStoreHandlingMode.asHStoreMode();
this.hStoreHandlingMode = hStoreHandlingMode;
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE));
}

View File

@ -6,6 +6,7 @@
package io.debezium.connector.postgresql;
import java.nio.charset.Charset;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
@ -56,15 +57,17 @@ public void start(Configuration config) {
PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(config);
// Create type registry
TypeRegistry typeRegistry;
Charset databaseCharset;
try (final PostgresConnection connection = new PostgresConnection(connectorConfig.jdbcConfig())) {
typeRegistry = connection.getTypeRegistry();
databaseCharset = connection.getDatabaseCharset();
}
// create the task context and schema...
TopicSelector<TableId> topicSelector = PostgresTopicSelector.create(connectorConfig);
PostgresSchema schema = new PostgresSchema(connectorConfig, typeRegistry, topicSelector);
PostgresSchema schema = new PostgresSchema(connectorConfig, typeRegistry, databaseCharset, topicSelector);
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicSelector);
SourceInfo sourceInfo = new SourceInfo(connectorConfig.getLogicalName(), connectorConfig.databaseName());

View File

@ -6,6 +6,7 @@
package io.debezium.connector.postgresql;
import java.nio.charset.Charset;
import java.sql.SQLException;
import java.time.ZoneOffset;
@ -48,18 +49,18 @@ public class PostgresSchema extends RelationalDatabaseSchema {
*
* @param config the connector configuration, which is presumed to be valid
*/
protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegistry,
protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegistry, Charset databaseCharset,
TopicSelector<TableId> topicSelector) {
super(config, topicSelector, new Filters(config).tableFilter(),
new Filters(config).columnFilter(), getTableSchemaBuilder(config, typeRegistry), false);
new Filters(config).columnFilter(), getTableSchemaBuilder(config, typeRegistry, databaseCharset), false);
this.filters = new Filters(config);
this.typeRegistry = typeRegistry;
}
private static TableSchemaBuilder getTableSchemaBuilder(PostgresConnectorConfig config, TypeRegistry typeRegistry) {
PostgresValueConverter valueConverter = new PostgresValueConverter(config.decimalHandlingMode(), config.temporalPrecisionMode(),
ZoneOffset.UTC, null, config.includeUnknownDatatypes(), typeRegistry,config.hStoreHandlingMode());
private static TableSchemaBuilder getTableSchemaBuilder(PostgresConnectorConfig config, TypeRegistry typeRegistry, Charset databaseCharset) {
PostgresValueConverter valueConverter = new PostgresValueConverter(databaseCharset, config.decimalHandlingMode(), config.temporalPrecisionMode(),
ZoneOffset.UTC, null, config.includeUnknownDatatypes(), typeRegistry, config.hStoreHandlingMode());
return new TableSchemaBuilder(valueConverter, SchemaNameAdjuster.create(LOGGER), SourceInfo.SCHEMA);
}

View File

@ -6,9 +6,11 @@
package io.debezium.connector.postgresql;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
@ -17,35 +19,35 @@
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.HashMap;
import java.util.Optional;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.postgresql.geometric.PGpoint;
import org.postgresql.util.HStoreConverter;
import org.postgresql.util.PGInterval;
import org.postgresql.util.PGobject;
import io.debezium.connector.postgresql.proto.PgProto;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import io.debezium.data.Json;
import io.debezium.connector.postgresql.PostgresConnectorConfig.HStoreHandlingMode;
import io.debezium.connector.postgresql.proto.PgProto;
import io.debezium.data.Bits;
import io.debezium.data.Json;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.Uuid;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
@ -57,7 +59,6 @@
import io.debezium.time.ZonedTime;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.NumberConversions;
import io.debezium.connector.postgresql.PostgresConnectorConfig.HStoreHandlingMode;
/**
* A provider of {@link ValueConverter}s and {@link SchemaBuilder}s for various Postgres specific column types.
@ -100,12 +101,23 @@ public class PostgresValueConverter extends JdbcValueConverters {
private final TypeRegistry typeRegistry;
private final HStoreHandlingMode hStoreMode;
protected PostgresValueConverter(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, ZoneOffset defaultOffset, BigIntUnsignedMode bigIntUnsignedMode, boolean includeUnknownDatatypes, TypeRegistry typeRegistry,
HStoreHandlingMode hStoreMode) {
/**
* The current database's character encoding.
*/
private final Charset databaseCharset;
private final JsonFactory jsonFactory;
protected PostgresValueConverter(Charset databaseCharset, DecimalMode decimalMode,
TemporalPrecisionMode temporalPrecisionMode, ZoneOffset defaultOffset,
BigIntUnsignedMode bigIntUnsignedMode, boolean includeUnknownDatatypes, TypeRegistry typeRegistry,
HStoreHandlingMode hStoreMode) {
super(decimalMode, temporalPrecisionMode, defaultOffset,null, bigIntUnsignedMode);
this.databaseCharset = databaseCharset;
this.jsonFactory = new JsonFactory();
this.includeUnknownDatatypes = includeUnknownDatatypes;
this.typeRegistry = typeRegistry;
this.hStoreMode=hStoreMode;
this.hStoreMode = hStoreMode;
}
@Override
@ -227,9 +239,15 @@ private SchemaBuilder numericSchema(Column column) {
private SchemaBuilder hstoreSchema(){
if (hStoreMode == PostgresConnectorConfig.HStoreHandlingMode.JSON) {
return SchemaBuilder.string().optional();
return SchemaBuilder.string();
}
else {
// keys are not nullable, but values are
return SchemaBuilder.map(
SchemaBuilder.STRING_SCHEMA,
SchemaBuilder.string().optional().build()
);
}
return SchemaBuilder.map(SchemaBuilder.STRING_SCHEMA, SchemaBuilder.string().optional().build()).optional();
}
@Override
@ -308,7 +326,7 @@ else if (oidValue == typeRegistry.citextOid()) {
return data -> convertCitext(column, fieldDefn, data);
}
else if (oidValue == typeRegistry.hstoreOid()) {
return data -> convertHStore(column,fieldDefn,data,hStoreMode);
return data -> convertHStore(column, fieldDefn, data, hStoreMode);
}
else if (oidValue == typeRegistry.geometryArrayOid() || oidValue == typeRegistry.geographyArrayOid() || oidValue == typeRegistry.citextArrayOid()) {
return createArrayConverter(column, fieldDefn);
@ -387,55 +405,85 @@ protected Object convertDecimal(Column column, Field fieldDefn, Object data, Dec
protected Object convertHStore(Column column,Field fieldDefn,Object data,HStoreHandlingMode mode) {
if (mode == HStoreHandlingMode.JSON) {
return convertJSON(column, fieldDefn, data);
return convertHstoreToJsonString(column, fieldDefn, data);
}
return convertMap(column,fieldDefn,data);
return convertHstoreToMap(column, fieldDefn, data);
}
private Object convertMap(Column column,Field fieldDefn,Object data){
if(data == null){
data = fieldDefn.schema().defaultValue(); }
if(data == null) {
if (column.isOptional()) {
return null;
private Object convertHstoreToMap(Column column, Field fieldDefn, Object data){
if(data == null) {
data = fieldDefn.schema().defaultValue();
}
if(data == null) {
if (column.isOptional()) {
return null;
}
return new HashMap<>();
}
if(data instanceof String){
String temp = ((String) data);
return HStoreConverter.fromString(temp);
if(data instanceof String) {
return HStoreConverter.fromString((String) data);
}
if(data instanceof byte[]){
byte[] temp = ((byte[]) data);
return HStoreConverter.fromString(new String(temp));
if(data instanceof byte[]) {
return HStoreConverter.fromString(asHstoreString((byte[]) data));
}
return handleUnknownData(column,fieldDefn,data) ;
}
private Object convertJSON(Column column,Field fieldDefn, Object data){
if(data == null){
/**
* Returns an Hstore field as string in the form of {@code "key 1"=>"value1", "key_2"=>"val 1"}; i.e. the given byte
* array is NOT the byte representation returned by {@link HStoreConverter#toBytes(Map,
* org.postgresql.core.Encoding))}, but the String based representation
*/
private String asHstoreString(byte[] data) {
return new String(data, databaseCharset);
}
private Object convertHstoreToJsonString(Column column,Field fieldDefn, Object data){
if(data == null) {
data = fieldDefn.schema().defaultValue();
}
if(data == null){
if(column.isOptional()){
if(data == null) {
if(column.isOptional()) {
return null;
}
return new GsonBuilder().create().toJson(null);
else {
return "{}";
}
}
if(data instanceof String){
String plainText =( (String) data);
return changePlainStringRepresentationToJSONStringRepresentation(plainText);
}
if(data instanceof byte[]){
byte[] temp = ((byte[]) data);
return changePlainStringRepresentationToJSONStringRepresentation(new String(temp));
if(data instanceof String) {
return changePlainStringRepresentationToJsonStringRepresentation(( (String) data));
}
if(data instanceof byte[]) {
return changePlainStringRepresentationToJsonStringRepresentation(asHstoreString((byte[]) data));
}
return handleUnknownData(column,fieldDefn,data);
}
private String changePlainStringRepresentationToJSONStringRepresentation(String text){
Gson gson = new GsonBuilder().serializeNulls().create();
return gson.toJson(HStoreConverter.fromString(text));
private String changePlainStringRepresentationToJsonStringRepresentation(String text){
Map<String, String> map = HStoreConverter.fromString(text);
StringWriter writer = new StringWriter();
try (JsonGenerator jsonGenerator = jsonFactory.createGenerator(writer)) {
jsonGenerator.writeStartObject();
for (Entry<String, String> hstoreEntry : map.entrySet()) {
jsonGenerator.writeStringField(hstoreEntry.getKey(), hstoreEntry.getValue());
}
jsonGenerator.writeEndObject();
jsonGenerator.flush();
return writer.getBuffer().toString();
}
catch(Exception e) {
throw new RuntimeException("Couldn't serialize hstore value into JSON: " + text, e);
}
}
@Override

View File

@ -6,6 +6,7 @@
package io.debezium.connector.postgresql.connection;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
@ -21,6 +22,7 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.core.BaseConnection;
import org.postgresql.core.TypeInfo;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.jdbc.PgDatabaseMetaData;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.util.PSQLState;
@ -69,6 +71,8 @@ public class PostgresConnection extends JdbcConnection {
private final TypeRegistry typeRegistry;
private final Charset databaseCharset;
/**
* Creates a Postgres connection using the supplied configuration.
*
@ -83,6 +87,8 @@ public PostgresConnection(Configuration config) {
catch (SQLException e) {
throw new ConnectException("Could not intialize type registry", e);
}
databaseCharset = determineDatabaseCharset();
}
/**
@ -307,6 +313,19 @@ public ServerInfo serverInfo() throws SQLException {
return serverInfo;
}
public Charset getDatabaseCharset() {
return databaseCharset;
}
private Charset determineDatabaseCharset() {
try {
return Charset.forName(((PgConnection) connection()).getEncoding().name());
}
catch (SQLException e) {
throw new RuntimeException("Couldn't obtain encoding for database " + database(), e);
}
}
protected static void defaultSettings(Configuration.Builder builder) {
// we require Postgres 9.4 as the minimum server version since that's where logical replication was first introduced
builder.with("assumeMinServerVersion", "9.4");

View File

@ -172,9 +172,8 @@ else if (datumMessage.hasDatumString()) {
case PgOid.XML:
case PgOid.UUID:
case PgOid.BIT:
case PgOid.VARBIT:{
LOGGER.warn("Identified as 123456:" +datumMessage.getDatumString());
return datumMessage.hasDatumString() ? datumMessage.getDatumString() : null;}
case PgOid.VARBIT:
return datumMessage.hasDatumString() ? datumMessage.getDatumString() : null;
case PgOid.DATE:
return datumMessage.hasDatumInt32() ? (long) datumMessage.getDatumInt32() : null;
case PgOid.TIMESTAMP:
@ -249,8 +248,7 @@ else if (datumMessage.hasDatumString()) {
if (type.getOid() == typeRegistry.geometryOid() || type.getOid() == typeRegistry.geographyOid() || type.getOid() == typeRegistry.citextOid() ) {
return datumMessage.getDatumBytes().toByteArray();
}
if(type.getOid() == typeRegistry.hstoreOid()){
LOGGER.warn("Testing03:" +datumMessage.getDatumBytes().toByteArray());
if(type.getOid() == typeRegistry.hstoreOid()) {
return datumMessage.getDatumBytes().toByteArray();
}
if (type.getOid() == typeRegistry.geometryArrayOid() || type.getOid() == typeRegistry.geographyArrayOid() || type.getOid() == typeRegistry.citextArrayOid() ) {

View File

@ -193,7 +193,6 @@ public Object getValue(String columnName, PostgresType type, String fullType, Va
}
return null;
}
LOGGER.warn("TYPE:" +type.getName() +"RAW VALUE" +rawValue + "type" +fullType);
switch (type.getName()) {
// include all types from https://www.postgresql.org/docs/current/static/datatype.html#DATATYPE-TABLE
@ -205,9 +204,6 @@ public Object getValue(String columnName, PostgresType type, String fullType, Va
case "hstore":
return rawValue.asString();
case "string":
case "struct":
LOGGER.debug("Identified as String:" +rawValue.asString());
case "integer":
case "int":
case "int4":

View File

@ -22,14 +22,14 @@
import java.time.Month;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -258,53 +258,61 @@ protected List<SchemaAndValueField> schemasAndValuesForDoubleEncodedNumericTypes
protected List<SchemaAndValueField> schemaAndValueFieldForMapEncodedHStoreType(){
final Map<String,String> expected = new HashMap<>();
expected.put("key","val");
return Arrays.asList(new SchemaAndValueField("hs",SchemaBuilder.map(Schema.STRING_SCHEMA,SchemaBuilder.string().optional().build()).optional().build(),expected));
expected.put("key", "val");
return Arrays.asList(new SchemaAndValueField("hs", hstoreMapSchema(), expected));
}
protected List<SchemaAndValueField> schemaAndValueFieldForMapEncodedHStoreTypeWithMultipleValues(){
final Map<String,String> expected = new HashMap<>();
expected.put("key1","val1");
expected.put("key2","val2");
expected.put("key3","val3");
return Arrays.asList(new SchemaAndValueField("hs",SchemaBuilder.map(Schema.STRING_SCHEMA,SchemaBuilder.string().optional().build()).optional().build(),expected));
expected.put("key1", "val1");
expected.put("key2", "val2");
expected.put("key3", "val3");
return Arrays.asList(new SchemaAndValueField("hs", hstoreMapSchema(), expected));
}
protected List<SchemaAndValueField> schemaAndValueFieldForMapEncodedHStoreTypeWithNullValues(){
final Map<String,String> expected = new HashMap<>();
expected.put("key1","val1");
expected.put("key2",null);
return Arrays.asList(new SchemaAndValueField("hs",SchemaBuilder.map(Schema.STRING_SCHEMA,SchemaBuilder.string().optional().build()).optional().build(),expected));
expected.put("key1", "val1");
expected.put("key2", null);
return Arrays.asList(new SchemaAndValueField("hs", hstoreMapSchema(), expected));
}
protected List<SchemaAndValueField> schemaAndValueFieldForMapEncodedHStoreTypeWithSpecialCharacters(){
final Map<String,String> expected = new HashMap<>();
expected.put("key_#1","val 1");
expected.put("key 2"," ##123 78");
return Arrays.asList(new SchemaAndValueField("hs",SchemaBuilder.map(Schema.STRING_SCHEMA,SchemaBuilder.string().optional().build()).optional().build(),expected));
expected.put("key_#1", "val 1");
expected.put("key 2", " ##123 78");
return Arrays.asList(new SchemaAndValueField("hs", hstoreMapSchema(), expected));
}
protected List<SchemaAndValueField> schemaAndValueFieldForJSONEncodedHStoreType(){
private Schema hstoreMapSchema() {
return SchemaBuilder.map(
Schema.STRING_SCHEMA,
SchemaBuilder.string().optional().build()
)
.optional()
.build();
}
protected List<SchemaAndValueField> schemaAndValueFieldForJsonEncodedHStoreType(){
final String expected = "{\"key\":\"val\"}";
return Arrays.asList(new SchemaAndValueField("hs",SchemaBuilder.string().optional().build(),expected));
return Arrays.asList(new SchemaAndValueField("hs", SchemaBuilder.OPTIONAL_STRING_SCHEMA, expected));
}
protected List<SchemaAndValueField> schemaAndValueFieldForJSONEncodedHStoreTypeWithMultipleValues(){
protected List<SchemaAndValueField> schemaAndValueFieldForJsonEncodedHStoreTypeWithMultipleValues(){
final String expected = "{\"key1\":\"val1\",\"key2\":\"val2\",\"key3\":\"val3\"}";
return Arrays.asList(new SchemaAndValueField("hs",SchemaBuilder.string().optional().build(),expected));
return Arrays.asList(new SchemaAndValueField("hs", SchemaBuilder.OPTIONAL_STRING_SCHEMA, expected));
}
protected List<SchemaAndValueField> schemaAndValueFieldForJSONEncodedHStoreTypeWithNullValues(){
protected List<SchemaAndValueField> schemaAndValueFieldForJsonEncodedHStoreTypeWithNullValues(){
final String expected = "{\"key1\":\"val1\",\"key2\":null}";
return Arrays.asList(new SchemaAndValueField("hs",SchemaBuilder.string().optional().build(),expected));
return Arrays.asList(new SchemaAndValueField("hs", SchemaBuilder.OPTIONAL_STRING_SCHEMA, expected));
}
protected List<SchemaAndValueField> schemaAndValueFieldForJSONEncodedHStoreTypeWithSpcialCharacters(){
protected List<SchemaAndValueField> schemaAndValueFieldForJsonEncodedHStoreTypeWithSpcialCharacters(){
final String expected = "{\"key_#1\":\"val 1\",\"key 2\":\" ##123 78\"}";
return Arrays.asList(new SchemaAndValueField("hs",SchemaBuilder.string().optional().build(),expected));
return Arrays.asList(new SchemaAndValueField("hs", SchemaBuilder.OPTIONAL_STRING_SCHEMA, expected));
}
protected List<SchemaAndValueField> schemasAndValuesForStringTypes() {
return Arrays.asList(new SchemaAndValueField("vc", Schema.OPTIONAL_STRING_SCHEMA, "\u017E\u0161"),
new SchemaAndValueField("vcv", Schema.OPTIONAL_STRING_SCHEMA, "bb"),

View File

@ -70,7 +70,7 @@ public void shouldLoadSchemaForBuiltinPostgresTypes() throws Exception {
TestHelper.executeDDL("postgres_create_tables.ddl");
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().build());
schema = new PostgresSchema(config, TestHelper.getTypeRegistry(), PostgresTopicSelector.create(config));
schema = TestHelper.getSchema(config);
try (PostgresConnection connection = TestHelper.create()) {
schema.refresh(connection, false);
@ -122,7 +122,7 @@ public void shouldLoadSchemaForExtensionPostgresTypes() throws Exception {
TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true).build()
);
schema = new PostgresSchema(config, TestHelper.getTypeRegistry(), PostgresTopicSelector.create(config));
schema = TestHelper.getSchema(config);
try (PostgresConnection connection = TestHelper.create()) {
schema.refresh(connection, false);
@ -137,7 +137,7 @@ public void shouldLoadSchemaForPostgisTypes() throws Exception {
TestHelper.executeDDL("init_postgis.ddl");
TestHelper.executeDDL("postgis_create_tables.ddl");
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().build());
schema = new PostgresSchema(config, TestHelper.getTypeRegistry(), PostgresTopicSelector.create(config));
schema = TestHelper.getSchema(config);
try (PostgresConnection connection = TestHelper.create()) {
schema.refresh(connection, false);
@ -165,7 +165,7 @@ public void shouldApplyFilters() throws Exception {
TestHelper.execute(statements);
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().with(SCHEMA_BLACKLIST, "s1").build());
final TypeRegistry typeRegistry = TestHelper.getTypeRegistry();
schema = new PostgresSchema(config, typeRegistry, PostgresTopicSelector.create(config));
schema = TestHelper.getSchema(config, typeRegistry);
try (PostgresConnection connection = TestHelper.create()) {
schema.refresh(connection, false);
assertTablesIncluded("s2.a", "s2.b");
@ -173,14 +173,14 @@ public void shouldApplyFilters() throws Exception {
}
config = new PostgresConnectorConfig(TestHelper.defaultConfig().with(SCHEMA_BLACKLIST, "s.*").build());
schema = new PostgresSchema(config, typeRegistry, PostgresTopicSelector.create(config));
schema = TestHelper.getSchema(config, typeRegistry);
try (PostgresConnection connection = TestHelper.create()) {
schema.refresh(connection, false);
assertTablesExcluded("s1.a", "s2.a", "s1.b", "s2.b");
}
config = new PostgresConnectorConfig(TestHelper.defaultConfig().with(PostgresConnectorConfig.TABLE_BLACKLIST, "s1.A,s2.A").build());
schema = new PostgresSchema(config, typeRegistry, PostgresTopicSelector.create(config));
schema = TestHelper.getSchema(config, typeRegistry);
try (PostgresConnection connection = TestHelper.create()) {
schema.refresh(connection, false);
assertTablesIncluded("s1.b", "s2.b");
@ -191,7 +191,7 @@ public void shouldApplyFilters() throws Exception {
.with(SCHEMA_BLACKLIST, "s2")
.with(PostgresConnectorConfig.TABLE_BLACKLIST, "s1.A")
.build());
schema = new PostgresSchema(config, typeRegistry, PostgresTopicSelector.create(config));
schema = TestHelper.getSchema(config, typeRegistry);
try (PostgresConnection connection = TestHelper.create()) {
schema.refresh(connection, false);
assertTablesIncluded("s1.b");
@ -199,7 +199,7 @@ public void shouldApplyFilters() throws Exception {
}
config = new PostgresConnectorConfig(TestHelper.defaultConfig().with(PostgresConnectorConfig.COLUMN_BLACKLIST, ".*aa")
.build());
schema = new PostgresSchema(config, typeRegistry, PostgresTopicSelector.create(config));
schema = TestHelper.getSchema(config, typeRegistry);
try (PostgresConnection connection = TestHelper.create()) {
schema.refresh(connection, false);
assertColumnsExcluded("s1.a.aa", "s2.a.aa");
@ -213,8 +213,7 @@ public void shouldDetectNewChangesAfterRefreshing() throws Exception {
"CREATE TABLE table1 (pk SERIAL, PRIMARY KEY(pk));";
TestHelper.execute(statements);
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().build());
schema = new PostgresSchema(config, TestHelper.getTypeRegistry(), PostgresTopicSelector.create(config));
schema = TestHelper.getSchema(config);
try (PostgresConnection connection = TestHelper.create()) {
schema.refresh(connection, false);
assertTablesIncluded("public.table1");

View File

@ -54,7 +54,7 @@ public void before() throws Exception {
TopicSelector<TableId> selector = PostgresTopicSelector.create(config);
context = new PostgresTaskContext(
config,
new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),
TestHelper.getSchema(config),
selector
);
}
@ -101,7 +101,7 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
TopicSelector<TableId> selector = PostgresTopicSelector.create(config);
context = new PostgresTaskContext(
config,
new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),
TestHelper.getSchema(config),
selector
);
@ -194,7 +194,7 @@ public void shouldGenerateSnapshotAndSendHeartBeat() throws Exception {
TopicSelector<TableId> selector = PostgresTopicSelector.create(config);
context = new PostgresTaskContext(
config,
new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),
TestHelper.getSchema(config),
selector
);
@ -236,7 +236,7 @@ public void shouldGenerateSnapshotsForDefaultDatatypesAdaptiveMicroseconds() thr
TopicSelector<TableId> selector = PostgresTopicSelector.create(config);
context = new PostgresTaskContext(
config,
new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),
TestHelper.getSchema(config),
selector
);
@ -278,7 +278,7 @@ public void shouldGenerateSnapshotsForDecimalDatatypesUsingStringEncoding() thro
TopicSelector<TableId> selector = PostgresTopicSelector.create(config);
context = new PostgresTaskContext(
config,
new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),
TestHelper.getSchema(config),
selector
);

View File

@ -602,33 +602,36 @@ public void shouldReceiveNumericTypeAsString() throws Exception {
}
@Test
public void shouldRecieveHStoreTypeWithSingleValueAsMap() throws Exception {
@FixFor("DBZ-898")
public void shouldReceiveHStoreTypeWithSingleValueAsMap() throws Exception {
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE,PostgresConnectorConfig.HStoreHandlingMode.MAP)
.build());
setupRecordsProducer(config);
TestHelper.executeDDL("postgres_create_tables.ddl");
consumer = testConsumer(1);
recordsProducer.start(consumer,blackHole);
recordsProducer.start(consumer, blackHole);
assertInsert(INSERT_HSTORE_TYPE_STMT, schemaAndValueFieldForMapEncodedHStoreType());
}
@Test
public void shouldRecieveHStoreTypeWithMultipleValuesAsMap() throws Exception {
@FixFor("DBZ-898")
public void shouldReceiveHStoreTypeWithMultipleValuesAsMap() throws Exception {
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE,PostgresConnectorConfig.HStoreHandlingMode.MAP)
.build());
setupRecordsProducer(config);
TestHelper.executeDDL("postgres_create_tables.ddl");
consumer = testConsumer(1);
recordsProducer.start(consumer,blackHole);
recordsProducer.start(consumer, blackHole);
assertInsert(INSERT_HSTORE_TYPE_WITH_MULTIPLE_VALUES_STMT, schemaAndValueFieldForMapEncodedHStoreTypeWithMultipleValues());
}
@Test
public void shouldRecieveHStoreTypeWithNullValuesAsMap() throws Exception {
@FixFor("DBZ-898")
public void shouldReceiveHStoreTypeWithNullValuesAsMap() throws Exception {
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE,PostgresConnectorConfig.HStoreHandlingMode.MAP)
.build());
@ -641,46 +644,50 @@ public void shouldRecieveHStoreTypeWithNullValuesAsMap() throws Exception {
}
@Test
public void shouldRecieveHStoreTypeWithSpecialCharactersInValuesAsMap() throws Exception {
@FixFor("DBZ-898")
public void shouldReceiveHStoreTypeWithSpecialCharactersInValuesAsMap() throws Exception {
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE,PostgresConnectorConfig.HStoreHandlingMode.MAP)
.build());
setupRecordsProducer(config);
TestHelper.executeDDL("postgres_create_tables.ddl");
consumer = testConsumer(1);
recordsProducer.start(consumer,blackHole);
recordsProducer.start(consumer, blackHole);
assertInsert(INSERT_HSTORE_TYPE_WITH_SPECIAL_CHAR_STMT, schemaAndValueFieldForMapEncodedHStoreTypeWithSpecialCharacters());
}
@Test
public void shouldRecieveHStoreTypeAsJSONString() throws Exception {
@FixFor("DBZ-898")
public void shouldReceiveHStoreTypeAsJsonString() throws Exception {
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE,PostgresConnectorConfig.HStoreHandlingMode.JSON)
.build());
setupRecordsProducer(config);
TestHelper.executeDDL("postgres_create_tables.ddl");
consumer = testConsumer(1);
recordsProducer.start(consumer,blackHole);
recordsProducer.start(consumer, blackHole);
assertInsert(INSERT_HSTORE_TYPE_STMT, schemaAndValueFieldForJSONEncodedHStoreType());
assertInsert(INSERT_HSTORE_TYPE_STMT, schemaAndValueFieldForJsonEncodedHStoreType());
}
@Test
public void shouldRecieveHStoreTypeWithMultipleValuesAsJSONString() throws Exception {
@FixFor("DBZ-898")
public void shouldReceiveHStoreTypeWithMultipleValuesAsJsonString() throws Exception {
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE,PostgresConnectorConfig.HStoreHandlingMode.JSON)
.build());
setupRecordsProducer(config);
TestHelper.executeDDL("postgres_create_tables.ddl");
consumer = testConsumer(1);
recordsProducer.start(consumer,blackHole);
recordsProducer.start(consumer, blackHole);
assertInsert(INSERT_HSTORE_TYPE_WITH_MULTIPLE_VALUES_STMT, schemaAndValueFieldForJSONEncodedHStoreTypeWithMultipleValues());
assertInsert(INSERT_HSTORE_TYPE_WITH_MULTIPLE_VALUES_STMT, schemaAndValueFieldForJsonEncodedHStoreTypeWithMultipleValues());
}
@Test
public void shouldRecieveHStoreTypeWithSpecialValuesInJSONString() throws Exception {
@FixFor("DBZ-898")
public void shouldReceiveHStoreTypeWithSpecialValuesInJsonString() throws Exception {
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE,PostgresConnectorConfig.HStoreHandlingMode.JSON)
.build());
@ -689,11 +696,12 @@ public void shouldRecieveHStoreTypeWithSpecialValuesInJSONString() throws Except
consumer = testConsumer(1);
recordsProducer.start(consumer,blackHole);
assertInsert(INSERT_HSTORE_TYPE_WITH_SPECIAL_CHAR_STMT, schemaAndValueFieldForJSONEncodedHStoreTypeWithSpcialCharacters());
assertInsert(INSERT_HSTORE_TYPE_WITH_SPECIAL_CHAR_STMT, schemaAndValueFieldForJsonEncodedHStoreTypeWithSpcialCharacters());
}
@Test
public void shouldRecieveHStoreTypeWithNullValuesAsJSONString() throws Exception {
@FixFor("DBZ-898")
public void shouldReceiveHStoreTypeWithNullValuesAsJsonString() throws Exception {
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE,PostgresConnectorConfig.HStoreHandlingMode.JSON)
.build());
@ -702,7 +710,7 @@ public void shouldRecieveHStoreTypeWithNullValuesAsJSONString() throws Exception
consumer = testConsumer(1);
recordsProducer.start(consumer,blackHole);
assertInsert(INSERT_HSTORE_TYPE_WITH_NULL_VALUES_STMT, schemaAndValueFieldForJSONEncodedHStoreTypeWithNullValues());
assertInsert(INSERT_HSTORE_TYPE_WITH_NULL_VALUES_STMT, schemaAndValueFieldForJsonEncodedHStoreTypeWithNullValues());
}
@Test
@ -817,7 +825,7 @@ private void setupRecordsProducer(PostgresConnectorConfig config) {
PostgresTaskContext context = new PostgresTaskContext(
config,
new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),
TestHelper.getSchema(config),
selector
);
recordsProducer = new RecordsStreamProducer(context, new SourceInfo(config.getLogicalName(), config.databaseName()));

View File

@ -56,7 +56,7 @@ public void before(Configuration overrides) throws SQLException {
TopicSelector<TableId> selector = PostgresTopicSelector.create(config);
context = new PostgresTaskContext(
config,
new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),
TestHelper.getSchema(config),
selector
);
}

View File

@ -9,6 +9,7 @@
import static org.junit.Assert.assertNotNull;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
@ -149,6 +150,19 @@ public static TypeRegistry getTypeRegistry() {
}
}
public static PostgresSchema getSchema(PostgresConnectorConfig config) {
return getSchema(config, TestHelper.getTypeRegistry());
}
public static PostgresSchema getSchema(PostgresConnectorConfig config, TypeRegistry typeRegistry) {
return new PostgresSchema(
config,
typeRegistry,
Charset.forName("UTF-8"),
PostgresTopicSelector.create(config)
);
}
protected static Set<String> schemaNames() throws SQLException {
try (PostgresConnection connection = create()) {
return connection.readAllSchemaNames(Filters.IS_SYSTEM_SCHEMA.negate());

View File

@ -8,4 +8,4 @@ CREATE SCHEMA public;
CREATE EXTENSION IF NOT EXISTS ltree SCHEMA public;
CREATE EXTENSION IF NOT EXISTS isn SCHEMA public;
CREATE EXTENSION IF NOT EXISTS citext SCHEMA public;
CREATE EXTENSION IF NOT EXISTS hstore SCHEMA public;
CREATE EXTENSION IF NOT EXISTS hstore SCHEMA public;

View File

@ -9,17 +9,8 @@ log4j.rootLogger=INFO, stdout
# Set up the default logging to be INFO level, then override specific units
log4j.logger.io.debezium=INFO
#log4j.logger.io.debezium.embedded.EmbeddedEngine$EmbeddedConfig=WARN
log4j.logger.io.debezium.embedded.EmbeddedEngine$EmbeddedConfig=WARN
#log4j.logger.io.debezium.embedded.EmbeddedEngine=DEBUG
#log4j.logger.io.debezium.connector.postgresql.RecordsStreamProducer=TRACE
#log4j.logger.io.debezium.connector.postgresql.connection.PostgresReplicationConnection=TRACE
#log4j.logger.io.debezium.connector.postgresql.PostgresConnectorTask=TRACE
#log4j.logger.io.debezium.connector.postgresql.PostgresValueConvertor=TRACE
#log4j.logger.io.debezium.connector.postgresql.RecordsSnapshotProducer=TRACE
#log4j.logger.io.debezium.connector.postgresql.RecordsStreamProducer=TRACE
#log4j.logger.io.debezium.connector.postgresql.TypeRegistry=TRACE
#log4j.logger.io.debezium.connector.postgresql.PostgresType=TRACE
#log4j.logger.io.debezium.connector.postgresql.StreamingWal2JsonMessageDecoder=TRACE
#log4j.logger.io.debezium.connector.postgresql.connection.wal2json.Wal2JsonReplicationMessage=TRACE
#log4j.logger.io.debezium.connector.postgresql.connection.pgproto.PgProtoReplicationMessage=TRACE
#log4j.logger.io.debezium.connector.postgresql.connection.pgproto.PgProtoMessageDecoder=TRACE
#log4j.logger.io.debezium.connector.postgresql.RecordsStreamProducer=DEBUG
#log4j.logger.io.debezium.connector.postgresql.connection.PostgresReplicationConnection=DEBUG
#log4j.logger.io.debezium.connector.postgresql.PostgresConnectorTask=DEBUG

View File

@ -571,10 +571,6 @@
<name>skipLongRunningTests</name>
<value>${skipLongRunningTests}</value>
</property>
<property>
<name>decoder.plugin.name</name>
<value>wal2json</value>
</property>
</systemProperties>
<argLine>-Djava.awt.headless=true ${debug.argline} ${modules.argline}</argLine>
<!--runOrder>alphabetical</runOrder-->