DBZ-7237 Align LOB behavior in snapshot and streaming

The snapshot phase was not setting the unavailable value placeholder when the
user had configured LOB as off, this aligns that behavior to be consistent
with the behavior from streaming.
This commit is contained in:
Chris Cranford 2023-12-11 12:35:43 -05:00 committed by Jiri Pechanec
parent c5ce1cf945
commit 9a5d04578e
3 changed files with 20 additions and 26 deletions

View File

@ -285,12 +285,7 @@ protected Object convertString(Column column, Field fieldDefn, Object data) {
return ((CHAR) data).stringValue();
}
if (data instanceof Clob) {
if (!lobEnabled) {
if (column.isOptional()) {
return null;
}
return "";
}
if (lobEnabled) {
try {
Clob clob = (Clob) data;
// Note that java.sql.Clob specifies that the first character starts at 1
@ -302,6 +297,10 @@ protected Object convertString(Column column, Field fieldDefn, Object data) {
throw new DebeziumException("Couldn't convert value for column " + column.name(), e);
}
}
else {
data = UNAVAILABLE_VALUE;
}
}
if (data instanceof String) {
String s = (String) data;
if (EMPTY_CLOB_FUNCTION.equals(s)) {
@ -335,18 +334,13 @@ else if (isHexToRawFunctionCall(str)) {
}
}
else if (data instanceof Blob) {
if (!lobEnabled) {
if (column.isOptional()) {
return null;
}
else {
data = NumberConversions.BYTE_ZERO;
}
}
else {
if (lobEnabled) {
Blob blob = (Blob) data;
data = blob.getBytes(1, Long.valueOf(blob.length()).intValue());
}
else {
data = UNAVAILABLE_VALUE;
}
}
else if (data instanceof RAW) {
data = ((RAW) data).getBytes();

View File

@ -1174,12 +1174,12 @@ public void shouldNotEmitBlobFieldValuesWhenLobSupportIsNotEnabled() throws Exce
SourceRecord record = table.get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isNull();
assertThat(after.get("DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
record = table.get(1);
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get("DATA")).isNull();
assertThat(after.get("DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
// Small data and large data
connection.prepareQuery("INSERT INTO dbz3645 (id,data) values (3,?)", ps -> ps.setBlob(1, blob1), null);

View File

@ -1565,12 +1565,12 @@ public void shouldNotEmitClobFieldValuesWhenLobSupportIsNotEnabled() throws Exce
SourceRecord record = table.get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isNull();
assertThat(after.get("DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
record = table.get(1);
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get("DATA")).isNull();
assertThat(after.get("DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
// Small data and large data
connection.executeWithoutCommitting("INSERT INTO dbz3645 (id,data) values (3,'Test3')");