DBZ-1949 Return BitSet instead of short/int/long;

When converting Postgres bit varying column, convert to BitSet instead of short/int/long to handle cases where the value is larger than Long.MAX_VALUE
This commit is contained in:
dajerome 2020-04-21 05:26:49 -04:00 committed by GitHub
parent 1f6d53d13d
commit ad8beb76d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 23 additions and 21 deletions

View File

@ -21,6 +21,7 @@
import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.Date;
import java.util.List;
@ -632,17 +633,15 @@ protected Object convertBits(Column column, Field fieldDefn, Object data, int nu
data = ((PGobject) data).getValue();
}
if (data instanceof String) {
long longValue = Long.parseLong((String) data, 2);
// return the smallest possible value
if (Short.MIN_VALUE <= longValue && longValue <= Short.MAX_VALUE) {
data = (short) longValue;
}
else if (Integer.MIN_VALUE <= longValue && longValue <= Integer.MAX_VALUE) {
data = (int) longValue;
}
else {
data = longValue;
String dataStr = (String) data;
BitSet bitset = new BitSet(dataStr.length());
int len = dataStr.length();
for (int i = len - 1; i >= 0; i--) {
if (dataStr.charAt(i) == '1') {
bitset.set(len - i - 1);
}
}
data = bitset;
}
return super.convertBits(column, fieldDefn, data, numBytes);
}

View File

@ -112,8 +112,9 @@ public abstract class AbstractRecordsProducerTest extends AbstractConnectorTest
"'P1Y2M3DT4H5M6.78S'::INTERVAL," +
"'21016-11-04T13:51:30.123456'::TIMESTAMP, '21016-11-04T13:51:30.123457'::TIMESTAMP, '21016-11-04T13:51:30.124'::TIMESTAMP," +
"'21016-11-04T13:51:30.123456+07:00'::TIMESTAMPTZ)";
protected static final String INSERT_BIN_TYPES_STMT = "INSERT INTO bitbin_table (ba, bol, bs, bv) " +
"VALUES (E'\\\\001\\\\002\\\\003'::bytea, '0'::bit(1), '11'::bit(2), '00'::bit(2))";
protected static final String INSERT_BIN_TYPES_STMT = "INSERT INTO bitbin_table (ba, bol, bs, bv, bv2, bvl) " +
"VALUES (E'\\\\001\\\\002\\\\003'::bytea, '0'::bit(1), '11'::bit(2), '00'::bit(2), '000000110000001000000001'::bit(24)," +
"'1000000000000000000000000000000000000000000000000000000000000000'::bit(64))";
protected static final String INSERT_GEOM_TYPES_STMT = "INSERT INTO geom_table(p) VALUES ('(1,1)'::point)";
protected static final String INSERT_TEXT_TYPES_STMT = "INSERT INTO text_table(j, jb, x, u) " +
"VALUES ('{\"bar\": \"baz\"}'::json, '{\"bar\": \"baz\"}'::jsonb, " +
@ -525,8 +526,10 @@ protected List<SchemaAndValueField> schemaAndValuesForRangeTypes() {
protected List<SchemaAndValueField> schemaAndValuesForBinTypes() {
return Arrays.asList(new SchemaAndValueField("ba", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap(new byte[]{ 1, 2, 3 })),
new SchemaAndValueField("bol", Schema.OPTIONAL_BOOLEAN_SCHEMA, false),
new SchemaAndValueField("bs", Bits.builder(2).optional().build(), new byte[]{ 3, 0 }), // bitsets get converted from two's complement
new SchemaAndValueField("bv", Bits.builder(2).optional().build(), new byte[]{ 0, 0 }));
new SchemaAndValueField("bs", Bits.builder(2).optional().build(), new byte[]{ 3 }),
new SchemaAndValueField("bv", Bits.builder(2).optional().build(), new byte[]{ 0 }),
new SchemaAndValueField("bv2", Bits.builder(24).optional().build(), new byte[]{ 1, 2, 3 }),
new SchemaAndValueField("bvl", Bits.builder(64).optional().build(), new byte[]{ 0, 0, 0, 0, 0, 0, 0, -128 })); // Long.MAX_VALUE + 1
}
private long asEpochMillis(String timestamp) {
@ -811,8 +814,8 @@ protected List<SchemaAndValueField> schemasAndValuesForDomainAliasTypes(boolean
return Arrays.asList(
new SchemaAndValueField(PK_FIELD, SchemaBuilder.INT32_SCHEMA, 1),
new SchemaAndValueField("bit_base", Bits.builder(3).build(), new byte[]{ 5, 0 }),
new SchemaAndValueField("bit_alias", Bits.builder(3).build(), new byte[]{ 5, 0 }),
new SchemaAndValueField("bit_base", Bits.builder(3).build(), new byte[]{ 5 }),
new SchemaAndValueField("bit_alias", Bits.builder(3).build(), new byte[]{ 5 }),
new SchemaAndValueField("smallint_base", SchemaBuilder.INT16_SCHEMA, (short) 1),
new SchemaAndValueField("smallint_alias", SchemaBuilder.INT16_SCHEMA, (short) 1),
new SchemaAndValueField("integer_base", SchemaBuilder.INT32_SCHEMA, 1),
@ -869,8 +872,8 @@ protected List<SchemaAndValueField> schemasAndValuesForDomainAliasTypes(boolean
new SchemaAndValueField("xml_alias", Xml.builder().build(), "<foo>Hello</foo>"),
new SchemaAndValueField("uuid_base", Uuid.builder().build(), "40e6215d-b5c6-4896-987c-f30f3678f608"),
new SchemaAndValueField("uuid_alias", Uuid.builder().build(), "40e6215d-b5c6-4896-987c-f30f3678f608"),
new SchemaAndValueField("varbit_base", Bits.builder(3).build(), new byte[]{ 5, 0 }),
new SchemaAndValueField("varbit_alias", Bits.builder(3).build(), new byte[]{ 5, 0 }),
new SchemaAndValueField("varbit_base", Bits.builder(3).build(), new byte[]{ 5 }),
new SchemaAndValueField("varbit_alias", Bits.builder(3).build(), new byte[]{ 5 }),
new SchemaAndValueField("inet_base", SchemaBuilder.STRING_SCHEMA, "192.168.0.1"),
new SchemaAndValueField("inet_alias", SchemaBuilder.STRING_SCHEMA, "192.168.0.1"),
new SchemaAndValueField("cidr_base", SchemaBuilder.STRING_SCHEMA, "192.168.0.0/24"),

View File

@ -668,7 +668,7 @@ public void shouldSnapshotDomainAliasWithProperModifiers() throws Exception {
.parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "VARBIT2")
.parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "3")
.parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0")
.build(), new byte[]{ 5, 0 }));
.build(), new byte[]{ 5 }));
consumer.process(record -> assertReadRecord(record, Collect.hashMapOf("public.alias_table", expected)));
}

View File

@ -1855,7 +1855,7 @@ public void shouldStreamDomainAliasWithProperModifiers() throws Exception {
List<SchemaAndValueField> expected = Arrays.asList(
new SchemaAndValueField(PK_FIELD, SchemaBuilder.INT32_SCHEMA, 1),
new SchemaAndValueField("value", Bits.builder(3).build(), new byte[]{ 5, 0 }));
new SchemaAndValueField("value", Bits.builder(3).build(), new byte[]{ 5 }));
assertRecordSchemaAndValues(expected, rec, Envelope.FieldName.AFTER);
assertThat(consumer.isEmpty()).isTrue();

View File

@ -20,7 +20,7 @@ CREATE TABLE network_address_table (pk SERIAL, i INET, PRIMARY KEY(pk));
CREATE TABLE cidr_network_address_table (pk SERIAL, i CIDR, PRIMARY KEY(pk));
CREATE TABLE macaddr_table(pk SERIAL, m MACADDR, PRIMARY KEY(pk));
CREATE TABLE cash_table (pk SERIAL, csh MONEY, PRIMARY KEY(pk));
CREATE TABLE bitbin_table (pk SERIAL, ba BYTEA, bol BIT(1), bs BIT(2), bv BIT VARYING(2) , PRIMARY KEY(pk));
CREATE TABLE bitbin_table (pk SERIAL, ba BYTEA, bol BIT(1), bs BIT(2), bv BIT VARYING(2), bv2 BIT VARYING(24), bvl BIT VARYING(64), PRIMARY KEY(pk));
CREATE TABLE time_table (pk SERIAL, ts TIMESTAMP, tsneg TIMESTAMP(6) WITHOUT TIME ZONE, ts_ms TIMESTAMP(3), ts_us TIMESTAMP(6), tz TIMESTAMPTZ, date DATE,
ti TIME, tip TIME(3), ttf TIME,