- Add support for processing WAL2JSON events that have no changes
DBZ-1181
This commit is contained in:
parent
c740400587
commit
df0e789601
@ -230,8 +230,12 @@ private void closeConnections() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void process(ReplicationMessage message, Long lsn, BlockingConsumer<ChangeEvent> consumer) throws SQLException, InterruptedException {
|
private void process(ReplicationMessage message, Long lsn, BlockingConsumer<ChangeEvent> consumer) throws SQLException, InterruptedException {
|
||||||
|
// in some cases we can get null if PG gives us back a message earlier than the latest reported flushed LSN.
|
||||||
|
// WAL2JSON can also send empty changes for DDL, materialized views, etc. and the heartbeat still needs to fire.
|
||||||
if (message == null) {
|
if (message == null) {
|
||||||
// in some cases we can get null if PG gives us back a message earlier than the latest reported flushed LSN
|
lastCompletelyProcessedLsn = lsn;
|
||||||
|
heartbeat.heartbeat(sourceInfo.partition(), sourceInfo.offset(),
|
||||||
|
r -> consumer.accept(new ChangeEvent(r, lsn)));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (message.isLastEventForLsn()) {
|
if (message.isLastEventForLsn()) {
|
||||||
|
@ -58,10 +58,17 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
|
|||||||
final Instant commitTime = Conversions.toInstant(dateTime.systemTimestamp(timestamp));
|
final Instant commitTime = Conversions.toInstant(dateTime.systemTimestamp(timestamp));
|
||||||
final Array changes = message.getArray("change");
|
final Array changes = message.getArray("change");
|
||||||
|
|
||||||
Iterator<Entry> it = changes.iterator();
|
// WAL2JSON may send empty changes that still have a txid. These events are from things like vacuum,
|
||||||
while (it.hasNext()) {
|
// materialized view, DDL, etc. They still need to be processed for the heartbeat to fire.
|
||||||
Value value = it.next().getValue();
|
if(changes.isEmpty()) {
|
||||||
processor.process(new Wal2JsonReplicationMessage(txId, commitTime, value.asDocument(), containsMetadata, !it.hasNext(), typeRegistry));
|
processor.process(null);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Iterator<Entry> it = changes.iterator();
|
||||||
|
while (it.hasNext()) {
|
||||||
|
Value value = it.next().getValue();
|
||||||
|
processor.process(new Wal2JsonReplicationMessage(txId, commitTime, value.asDocument(), containsMetadata, !it.hasNext(), typeRegistry));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (final IOException e) {
|
} catch (final IOException e) {
|
||||||
throw new ConnectException(e);
|
throw new ConnectException(e);
|
||||||
|
@ -182,9 +182,7 @@ else if (firstChar == COMMA) {
|
|||||||
}
|
}
|
||||||
else if (firstChar == RIGHT_BRACKET) {
|
else if (firstChar == RIGHT_BRACKET) {
|
||||||
// No more changes
|
// No more changes
|
||||||
if (currentChunk != null) {
|
doProcessMessage(processor, typeRegistry, currentChunk, true);
|
||||||
doProcessMessage(processor, typeRegistry, currentChunk, true);
|
|
||||||
}
|
|
||||||
messageInProgress = false;
|
messageInProgress = false;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
@ -238,10 +236,20 @@ private boolean isWhitespace(byte c) {
|
|||||||
|
|
||||||
private void doProcessMessage(ReplicationMessageProcessor processor, TypeRegistry typeRegistry, byte[] content, boolean lastMessage)
|
private void doProcessMessage(ReplicationMessageProcessor processor, TypeRegistry typeRegistry, byte[] content, boolean lastMessage)
|
||||||
throws IOException, SQLException, InterruptedException {
|
throws IOException, SQLException, InterruptedException {
|
||||||
final Document change = DocumentReader.floatNumbersAsTextReader().read(content);
|
if(content != null) {
|
||||||
|
final Document change = DocumentReader.floatNumbersAsTextReader().read(content);
|
||||||
|
LOGGER.trace("Change arrived for decoding {}", change);
|
||||||
|
processor.process(new Wal2JsonReplicationMessage(txId, commitTime, change, containsMetadata, lastMessage, typeRegistry));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// If content is null then this is an empty change event that WAL2JSON can generate for events like DDL,
|
||||||
|
// truncate table, materialized views, etc. The transaction still needs to be processed for the heartbeat
|
||||||
|
// to fire.
|
||||||
|
LOGGER.trace("Empty change arrived");
|
||||||
|
processor.process(null);
|
||||||
|
}
|
||||||
|
|
||||||
LOGGER.trace("Change arrived for decoding {}", change);
|
|
||||||
processor.process(new Wal2JsonReplicationMessage(txId, commitTime, change, containsMetadata, lastMessage, typeRegistry));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -20,8 +20,10 @@
|
|||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
import static io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot.DecoderPluginName.WAL2JSON;
|
import static io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot.DecoderPluginName.WAL2JSON;
|
||||||
|
import static io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs.DecoderPluginName.DECODERBUFS;
|
||||||
|
|
||||||
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule;
|
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule;
|
||||||
|
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs;
|
||||||
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot;
|
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot;
|
||||||
import org.apache.commons.lang3.RandomStringUtils;
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
import org.apache.kafka.connect.data.Decimal;
|
import org.apache.kafka.connect.data.Decimal;
|
||||||
@ -1263,6 +1265,28 @@ public void shouldReceiveChangesForReplicaIdentityFullTableWithToastedValueTable
|
|||||||
testReceiveChangesForReplicaIdentityFullTableWithToastedValue(SchemaRefreshMode.COLUMNS_DIFF, false);
|
testReceiveChangesForReplicaIdentityFullTableWithToastedValue(SchemaRefreshMode.COLUMNS_DIFF, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test()
|
||||||
|
@FixFor("DBZ-1181")
|
||||||
|
@SkipWhenDecoderPluginNameIs(DECODERBUFS)
|
||||||
|
public void testEmptyChangesProducesHeartbeat() throws Exception {
|
||||||
|
// the low heartbeat interval should make sure that a heartbeat message is emitted after each change record
|
||||||
|
// received from Postgres
|
||||||
|
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
|
||||||
|
.with(Heartbeat.HEARTBEAT_INTERVAL, "1")
|
||||||
|
.build());
|
||||||
|
setupRecordsProducer(config);
|
||||||
|
String statement = "CREATE SCHEMA s1;" ;
|
||||||
|
|
||||||
|
// Expecting one empty DDL change
|
||||||
|
consumer = testConsumer(1);
|
||||||
|
recordsProducer.start(consumer, blackHole);
|
||||||
|
executeAndWait(statement);
|
||||||
|
|
||||||
|
// Expecting one heartbeat for the empty DDL change
|
||||||
|
assertHeartBeatRecordInserted();
|
||||||
|
assertThat(consumer.isEmpty()).isTrue();
|
||||||
|
}
|
||||||
|
|
||||||
private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(PostgresConnectorConfig.SchemaRefreshMode mode, boolean tablesBeforeStart) throws Exception{
|
private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(PostgresConnectorConfig.SchemaRefreshMode mode, boolean tablesBeforeStart) throws Exception{
|
||||||
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
|
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
|
||||||
.with(PostgresConnectorConfig.SCHEMA_REFRESH_MODE, mode)
|
.with(PostgresConnectorConfig.SCHEMA_REFRESH_MODE, mode)
|
||||||
|
Loading…
Reference in New Issue
Block a user