DBZ-5756 Add Compression Advisor tables as system-excluded tables

This commit is contained in:
Chris Cranford 2022-10-25 08:46:21 -04:00 committed by Jiri Pechanec
parent 5bf3dad9fd
commit 993a547c2f
2 changed files with 84 additions and 1 deletions

View File

@ -12,6 +12,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;
@ -1208,6 +1209,12 @@ private static LogMiningBufferType parse(String value, String defaultValue) {
*/
private static class SystemTablesPredicate implements TableFilter {
/**
* Pattern that matches temporary analysis tables created by the Compression Advisor subsystem.
* These tables will be ignored by the connector.
*/
private final Pattern COMPRESSION_ADVISOR = Pattern.compile("^CMP[3|4]\\$[0-9]+$");
private final Configuration config;
SystemTablesPredicate(Configuration config) {
@ -1216,7 +1223,7 @@ private static class SystemTablesPredicate implements TableFilter {
@Override
public boolean isIncluded(TableId t) {
return !isExcludedSchema(t) && !isFlushTable(t);
return !isExcludedSchema(t) && !isFlushTable(t) && !isCompressionAdvisorTable(t);
}
private boolean isExcludedSchema(TableId id) {
@ -1226,6 +1233,10 @@ private boolean isExcludedSchema(TableId id) {
private boolean isFlushTable(TableId id) {
return LogWriterFlushStrategy.isFlushTable(id, config.getString(USER));
}
private boolean isCompressionAdvisorTable(TableId id) {
return COMPRESSION_ADVISOR.matcher(id.table()).matches();
}
}
@Override

View File

@ -2777,6 +2777,78 @@ public void shouldFilterUser() throws Exception {
}
@Test
@FixFor("DBZ-5756")
public void testShouldIgnoreCompressionAdvisorTablesDuringSnapshotAndStreaming() throws Exception {
// This test creates a dummy table to mimic the creation of a compression advisor table.
TestHelper.dropTable(connection, "CMP3$12345");
try {
// Create the advisor table prior to the connector starting
connection.execute("CREATE TABLE CMP3$12345 (id numeric(9,0), id2 numeric(9,0), data varchar2(50), primary key(id, id2))");
TestHelper.streamTable(connection, "CMP3$12345");
// insert some data
connection.execute("INSERT INTO CMP3$12345 (id,id2,data) values (1, 1, 'data')");
Configuration config = TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CMP.*").build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// insert some data
connection.execute("INSERT INTO CMP3$12345 (id,id2,data) values (2, 2, 'data')");
try {
Awaitility.await().atMost(Duration.ofSeconds(10)).until(() -> {
assertNoRecordsToConsume();
return false;
});
}
catch (ConditionTimeoutException e) {
// expected
}
}
finally {
TestHelper.dropTable(connection, "CMP3$12345");
}
}
@Test
@FixFor("DBZ-5756")
public void testShouldIgnoreCompressionAdvisorTablesDuringStreaming() throws Exception {
// This test creates a dummy table to mimic the creation of a compression advisor table.
TestHelper.dropTable(connection, "CMP3$12345");
try {
Configuration config = TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CMP.*").build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Create the advisor table while the connector is running
connection.execute("CREATE TABLE CMP3$12345 (id numeric(9,0), id2 numeric(9,0), data varchar2(50), primary key(id, id2))");
TestHelper.streamTable(connection, "CMP3$12345");
// insert some data
connection.execute("INSERT INTO CMP3$12345 (id,id2,data) values (1, 1, 'data')");
try {
Awaitility.await().atMost(Duration.ofSeconds(10)).until(() -> {
assertNoRecordsToConsume();
return false;
});
}
catch (ConditionTimeoutException e) {
// expected
}
}
finally {
TestHelper.dropTable(connection, "CMP3$12345");
}
}
@SuppressWarnings("unchecked")
private <T> T getStreamingMetric(String metricName) throws JMException {
final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();