diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java index b732d2992..af22c20eb 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java @@ -74,6 +74,7 @@ public class MySqlSchema extends RelationalDatabaseSchema { private final HistoryRecordComparator historyComparator; private final boolean skipUnparseableDDL; private final boolean storeOnlyMonitoredTablesDdl; + private boolean recoveredTables; /** * Create a schema component given the supplied {@link MySqlConnectorConfig MySQL connector configuration}. @@ -131,7 +132,6 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) { } }; this.dbHistory.configure(dbHistoryConfig, historyComparator, new DatabaseHistoryMetrics(configuration), true); // validates - } private static MySqlValueConverters getValueConverters(MySqlConnectorConfig configuration) { @@ -248,6 +248,7 @@ protected void appendCreateTableStatement(StringBuilder sb, Table table) { public void loadHistory(SourceInfo startingPoint) { tables().clear(); dbHistory.recover(startingPoint.partition(), startingPoint.offset(), tables(), ddlParser); + recoveredTables = !tableIds().isEmpty(); refreshSchemas(); } @@ -376,4 +377,9 @@ else if (filters.databaseFilter().test(databaseName) || databaseName == null || public boolean isStoreOnlyMonitoredTablesDdl() { return storeOnlyMonitoredTablesDdl; } + + @Override + public boolean tableInformationComplete() { + return recoveredTables; + } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java index 02e41684c..d8700b4ad 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java @@ -285,4 +285,10 @@ public Table tableFor(int relationId) { LOGGER.debug("Relation '{}' resolved to table '{}'", relationId, tableId); return tableFor(tableId); } + + @Override + public boolean tableInformationComplete() { + // PostgreSQL does not support HistorizedDatabaseSchema - so no tables are recovered + return false; + } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index b1905fee4..baf07c8b5 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -1285,6 +1285,30 @@ public void shouldRewriteIdentityKey() throws InterruptedException { } + @Test + @FixFor("DBZ-1519") + public void shouldNotIssueWarningForNoMonitoredTablesAfterApplyingFilters() throws Exception { + final LogInterceptor logInterceptor = new LogInterceptor(); + + TestHelper.execute(SETUP_TABLES_STMT); + TestHelper.execute(INSERT_STMT); + Configuration config = TestHelper.defaultConfig().with(PostgresConnectorConfig.SCHEMA_WHITELIST, "s2").build(); + + // Start connector, verify that it does not log no monitored tables warning + start(PostgresConnector.class, config); + waitForSnapshotToBeCompleted(); + SourceRecords records = consumeRecordsByTopic(1); + assertThat(logInterceptor.containsMessage(NO_MONITORED_TABLES_WARNING)).isFalse(); + stopConnector(); + + // Restart connector, verify it does not log no monitored tables warning + start(PostgresConnector.class, config); + waitForStreamingRunning(); + assertThat(logInterceptor.containsMessage(NO_MONITORED_TABLES_WARNING)).isFalse(); + + stopConnector(); + } + private CompletableFuture batchInsertRecords(long recordsCount, int batchSize) { String insertStmt = "INSERT INTO text_table(j, jb, x, u) " + "VALUES ('{\"bar\": \"baz\"}'::json, '{\"bar\": \"baz\"}'::jsonb, " + diff --git a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java index aa5d1f941..4c0d10fb1 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java @@ -87,9 +87,11 @@ public synchronized void start(T taskContext, C SnapshotResult snapshotResult = snapshotSource.execute(context); LOGGER.info("Snapshot ended with {}", snapshotResult); - schema.assureNonEmptySchema(); + if (snapshotResult.getStatus() == SnapshotResultStatus.COMPLETED || schema.tableInformationComplete()) { + schema.assureNonEmptySchema(); + } - if (running && snapshotResult.getStatus() == SnapshotResultStatus.COMPLETED) { + if (running && snapshotResult.isCompletedOrSkipped()) { streamingSource = changeEventSourceFactory.getStreamingChangeEventSource(snapshotResult.getOffset()); eventDispatcher.setEventListener(streamingMetrics); streamingMetrics.connected(true); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/spi/SnapshotResult.java b/debezium-core/src/main/java/io/debezium/pipeline/spi/SnapshotResult.java index 685a70b78..e173404f6 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/spi/SnapshotResult.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/spi/SnapshotResult.java @@ -23,6 +23,14 @@ public static SnapshotResult aborted() { return new SnapshotResult(SnapshotResultStatus.ABORTED, null); } + public static SnapshotResult skipped(OffsetContext offset) { + return new SnapshotResult(SnapshotResultStatus.SKIPPED, offset); + } + + public boolean isCompletedOrSkipped() { + return this.status == SnapshotResultStatus.SKIPPED || this.status == SnapshotResultStatus.COMPLETED; + } + public SnapshotResultStatus getStatus() { return status; } @@ -33,7 +41,8 @@ public OffsetContext getOffset() { public static enum SnapshotResultStatus { COMPLETED, - ABORTED; + ABORTED, + SKIPPED } @Override diff --git a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseSchema.java b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseSchema.java index e0017736a..46196284e 100644 --- a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseSchema.java +++ b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseSchema.java @@ -28,6 +28,7 @@ public abstract class HistorizedRelationalDatabaseSchema extends RelationalDatab implements HistorizedDatabaseSchema { private final DatabaseHistory databaseHistory; + private boolean recoveredTables; protected HistorizedRelationalDatabaseSchema(HistorizedRelationalDatabaseConnectorConfig config, TopicSelector topicSelector, TableFilter tableFilter, ColumnNameFilter columnFilter, TableSchemaBuilder schemaBuilder, @@ -41,6 +42,7 @@ protected HistorizedRelationalDatabaseSchema(HistorizedRelationalDatabaseConnect @Override public void recover(OffsetContext offset) { databaseHistory.recover(offset.getPartition(), offset.getOffset(), tables(), getDdlParser()); + recoveredTables = !tableIds().isEmpty(); for (TableId tableId : tableIds()) { buildAndRegisterSchema(tableFor(tableId)); } @@ -81,4 +83,9 @@ protected void record(SchemaChangeEvent schemaChange, TableChanges tableChanges) databaseHistory.record(schemaChange.getPartition(), schemaChange.getOffset(), schemaChange.getDatabase(), schemaChange.getSchema(), schemaChange.getDdl(), tableChanges); } + + @Override + public boolean tableInformationComplete() { + return recoveredTables; + } } diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseSchema.java b/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseSchema.java index 040eac278..137123d7b 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseSchema.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseSchema.java @@ -176,4 +176,9 @@ private TableId toLowerCaseIfNeeded(TableId tableId) { protected TableFilter getTableFilter() { return tableFilter; } + + @Override + public boolean tableInformationComplete() { + return false; + } } diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java index ac7b606c3..bcb127f0f 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java @@ -92,7 +92,7 @@ public SnapshotResult execute(ChangeEventSourceContext context) throws Interrupt // Neither schema nor data require snapshotting if (!snapshottingTask.snapshotSchema() && !snapshottingTask.snapshotData()) { LOGGER.debug("Skipping snapshotting"); - return SnapshotResult.completed(previousOffset); + return SnapshotResult.skipped(previousOffset); } delaySnapshotIfNeeded(context); diff --git a/debezium-core/src/main/java/io/debezium/schema/DatabaseSchema.java b/debezium-core/src/main/java/io/debezium/schema/DatabaseSchema.java index 34b3bd569..65f125ddc 100644 --- a/debezium-core/src/main/java/io/debezium/schema/DatabaseSchema.java +++ b/debezium-core/src/main/java/io/debezium/schema/DatabaseSchema.java @@ -18,4 +18,12 @@ public interface DatabaseSchema { void close(); DataCollectionSchema schemaFor(I id); + + /** + * Indicates whether or not table names are guaranteed to be fully present, regardless of whether or not a + * snapshot has been performed. + * + * @return boolean indicating if table names are present + */ + boolean tableInformationComplete(); }