DBZ-1021 Ignore events for dropped table

This commit is contained in:
Jiri Pechanec 2018-12-06 12:06:29 +01:00 committed by Gunnar Morling
parent 284ae98adf
commit 9c225c63eb
2 changed files with 38 additions and 2 deletions

View File

@ -115,8 +115,11 @@ protected void refresh(PostgresConnection connection, TableId tableId, boolean r
Tables temp = new Tables();
connection.readSchema(temp, null, null, tableId::equals, null, true);
// we expect the refreshed table to be there
assert temp.size() == 1;
// the table could be deleted before the event was processed
if (temp.size() == 0) {
LOGGER.warn("Refresh of {} was requested but the table no longer exists", tableId);
return;
}
// overwrite (add or update) or views of the tables
tables().overwriteTable(temp.forTable(tableId));
// refresh the schema

View File

@ -212,6 +212,39 @@ public void shouldProduceEventsWithInitialSnapshot() throws Exception {
assertRecordsAfterInsert(2, 3, 3);
}
@Test
@FixFor("DBZ-1021")
public void shouldIgnoreEventsForDeletedTable() throws Exception {
TestHelper.execute(SETUP_TABLES_STMT);
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
//check the records from the snapshot
assertRecordsFromSnapshot(2, 1, 1);
// insert 2 new records
TestHelper.execute(INSERT_STMT);
assertRecordsAfterInsert(2, 2, 2);
//now stop the connector
stopConnector();
assertNoRecordsToConsume();
//insert some more records and deleted the table
TestHelper.execute(INSERT_STMT);
TestHelper.execute("DROP TABLE s1.a");
start(PostgresConnector.class, configBuilder.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).build());
assertConnectorIsRunning();
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics()).hasSize(1);
assertThat(actualRecords.recordsForTopic(topicName("s2.a"))).hasSize(1);
}
@Test
public void shouldIgnoreViews() throws Exception {
TestHelper.execute(