From 4ade54351f0f1d158b668be59bcb1177772f9316 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Fri, 16 Feb 2024 14:59:00 +0100 Subject: [PATCH] DBZ-7461 Rename SCHEMA_ONLY_RECOVERY to RECOVERY and SCHEMA_ONLY to NO_DATA --- .../mongodb/MongoDbConnectorConfig.java | 8 ++- .../MongoDbSnapshotChangeEventSource.java | 3 +- .../mongodb/IncrementalSnapshotIT.java | 2 +- .../connector/mongodb/MongoDbConnectorIT.java | 4 +- .../connector/mongodb/MongoMetricsIT.java | 4 +- .../mongodb/ShardedIncrementalSnapshotIT.java | 2 +- .../connector/mysql/MySqlConnectorConfig.java | 21 +++++- .../connector/mysql/BlockingSnapshotIT.java | 2 +- .../mysql/IncrementalSnapshotIT.java | 2 +- .../MySqlConnectorConvertingFailureIT.java | 14 ++-- .../connector/mysql/MySqlConnectorIT.java | 22 +++---- .../mysql/MySqlConnectorSchemaValidateIT.java | 16 ++--- .../connector/mysql/MySqlSchemaHistoryIT.java | 8 +-- .../mysql/MySqlTopicNamingStrategyIT.java | 4 +- .../connector/mysql/MysqlDefaultValueIT.java | 4 +- .../debezium/connector/mysql/SignalsIT.java | 2 +- .../mysql/SnapshotParallelSourceIT.java | 2 +- .../connector/mysql/SnapshotSourceIT.java | 8 +-- .../mysql/TransactionMetadataIT.java | 6 +- .../oracle/OracleConnectorConfig.java | 19 +++++- .../IncrementalSnapshotCaseSensitiveIT.java | 2 +- .../oracle/IncrementalSnapshotIT.java | 2 +- .../oracle/OracleConnectorFilterIT.java | 8 +-- .../connector/oracle/OracleConnectorIT.java | 30 ++++----- ...acleSkipMessagesWithoutChangeConfigIT.java | 6 +- .../connector/oracle/OutboxEventRouterIT.java | 2 +- .../debezium/connector/oracle/SignalsIT.java | 2 +- .../oracle/StreamingDatatypesIT.java | 2 +- .../oracle/TransactionMetadataIT.java | 4 +- .../postgresql/PostgresConnectorConfig.java | 6 ++ .../postgresql/BlockingSnapshotIT.java | 2 +- .../postgresql/CloudEventsConverterIT.java | 2 +- .../connector/postgresql/DomainTypesIT.java | 4 +- .../postgresql/IncrementalSnapshotIT.java | 4 +- .../postgresql/OutboxEventRouterIT.java | 2 +- .../postgresql/PostgresConnectorIT.java | 38 +++++------ .../postgresql/PostgresMetricsIT.java | 4 +- .../connector/postgresql/PostgresMoneyIT.java | 8 +-- ...gresSkipMessagesWithoutChangeConfigIT.java | 8 +-- .../postgresql/PublicGeometryIT.java | 2 +- .../postgresql/RecordsStreamProducerIT.java | 30 ++++----- .../connector/postgresql/SignalsIT.java | 10 +-- .../postgresql/TablesWithoutPrimaryKeyIT.java | 4 +- .../postgresql/TransactionMetadataIT.java | 2 +- .../timescaledb/TimescaleDbDatabaseTest.java | 2 +- .../sqlserver/SqlServerConnectorConfig.java | 8 ++- .../EventProcessingFailureHandlingIT.java | 6 +- .../sqlserver/IncrementalSnapshotIT.java | 2 +- .../IncrementalSnapshotWithRecompileIT.java | 2 +- .../connector/sqlserver/NotificationsIT.java | 2 +- .../sqlserver/SchemaHistoryTopicIT.java | 2 +- .../connector/sqlserver/SnapshotIT.java | 6 +- .../sqlserver/SqlServerChangeTableSetIT.java | 28 ++++---- .../sqlserver/SqlServerConnectorIT.java | 30 ++++----- ...erverSkipMessagesWithNoUpdateConfigIT.java | 4 +- .../snapshot/mode/NeverSnapshotter.java | 11 ++-- .../snapshot/mode/NoDataSnapshotter.java | 64 +++++++++++++++++++ .../snapshot/mode/RecoverySnapshotter.java | 20 ++++++ .../mode/SchemaOnlyRecoverySnapshotter.java | 10 ++- .../snapshot/mode/SchemaOnlySnapshotter.java | 50 ++------------- .../io.debezium.spi.snapshot.Snapshotter | 7 +- .../connector/oracle/EndToEndPerf.java | 2 +- 62 files changed, 347 insertions(+), 246 deletions(-) create mode 100644 debezium-core/src/main/java/io/debezium/snapshot/mode/NoDataSnapshotter.java create mode 100644 debezium-core/src/main/java/io/debezium/snapshot/mode/RecoverySnapshotter.java diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java index 6b6e5d1f0..ee2d9828f 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java @@ -74,8 +74,14 @@ public enum SnapshotMode implements EnumeratedValue { /** * Never perform a snapshot and only receive new data changes. + * @deprecated to be removed in Debezium 3.0, replaced by {{@link #NO_DATA}} */ - NEVER("never", false); + NEVER("never", false), + + /** + * Never perform a snapshot and only receive new data changes. + */ + NO_DATA("no_data", false); private final String value; private final boolean includeData; diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java index 47af02d51..7a3f68350 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java @@ -132,7 +132,8 @@ public SnapshottingTask getSnapshottingTask(MongoDbPartition partition, MongoDbO List dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted(); // If no snapshot should occur, return task with no replica sets - if (this.connectorConfig.getSnapshotMode().equals(MongoDbConnectorConfig.SnapshotMode.NEVER)) { + if (this.connectorConfig.getSnapshotMode().equals(MongoDbConnectorConfig.SnapshotMode.NEVER) || + this.connectorConfig.getSnapshotMode().equals(MongoDbConnectorConfig.SnapshotMode.NO_DATA)) { LOGGER.info("According to the connector configuration, no snapshot will occur."); return new SnapshottingTask(false, false, dataCollectionsToBeSnapshotted, Map.of(), false); } diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/IncrementalSnapshotIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/IncrementalSnapshotIT.java index a0b321c43..682635301 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/IncrementalSnapshotIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/IncrementalSnapshotIT.java @@ -97,7 +97,7 @@ protected Configuration.Builder config() { .with(MongoDbConnectorConfig.SIGNAL_DATA_COLLECTION, SIGNAL_COLLECTION_NAME) .with(MongoDbConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5) .with(MongoDbConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10) - .with(MongoDbConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER); + .with(MongoDbConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA); } protected String dataCollectionName() { diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java index 6011bdb37..8974fc51b 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java @@ -576,7 +576,7 @@ public void shouldConsumeLargeEvents() throws InterruptedException { final var dbName = "dbit"; config = TestHelper.getConfiguration(mongo).edit() - .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.NEVER) + .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.NO_DATA) .with(MongoDbConnectorConfig.CAPTURE_MODE, MongoDbConnectorConfig.CaptureMode.CHANGE_STREAMS_UPDATE_FULL_WITH_PRE_IMAGE) .with(MongoDbConnectorConfig.CURSOR_OVERSIZE_HANDLING_MODE, MongoDbConnectorConfig.OversizeHandlingMode.SPLIT) .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) @@ -2488,7 +2488,7 @@ public void shouldNotReplicateSnapshot() throws Exception { config = TestHelper.getConfiguration(mongo).edit() .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.contacts") .with(CommonConnectorConfig.TOPIC_PREFIX, "mongo") - .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.NEVER) + .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.NO_DATA) .build(); context = new MongoDbTaskContext(config); diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoMetricsIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoMetricsIT.java index 6cad93294..820b617e3 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoMetricsIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoMetricsIT.java @@ -114,7 +114,7 @@ public void testStreamingOnlyMetrics() throws Exception { // Setup this.config = TestHelper.getConfiguration(mongo) .edit() - .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.NEVER) + .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.NO_DATA) .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .build(); this.context = new MongoDbTaskContext(config); @@ -156,7 +156,7 @@ public void testPauseResumeSnapshotMetrics() throws Exception { this.config = TestHelper.getConfiguration(mongo) .edit() - .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.NEVER) + .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.NO_DATA) .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(MongoDbConnectorConfig.SIGNAL_DATA_COLLECTION, "dbit.debezium_signal") .with(MongoDbConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1) diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ShardedIncrementalSnapshotIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ShardedIncrementalSnapshotIT.java index 5c1b8a822..82569bfe2 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ShardedIncrementalSnapshotIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ShardedIncrementalSnapshotIT.java @@ -65,7 +65,7 @@ protected Configuration.Builder config() { .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, fullDataCollectionName() + ",dbA.c1,dbA.c2") .with(MongoDbConnectorConfig.SIGNAL_DATA_COLLECTION, SIGNAL_COLLECTION_NAME) .with(MongoDbConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, CHUNK_SIZE) - .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.NEVER); + .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.NO_DATA); } @Test diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index 1782f2f61..3d7b413e1 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -152,16 +152,35 @@ public enum SnapshotMode implements EnumeratedValue { * Perform a snapshot of only the database schemas (without data) and then begin reading the binlog. * This should be used with care, but it is very useful when the change event consumers need only the changes * from the point in time the snapshot is made (and doesn't care about any state or changes prior to this point). + * + * @deprecated to be removed in Debezium 3.0, replaced by {{@link #NO_DATA}} */ SCHEMA_ONLY("schema_only"), + /** + * Perform a snapshot of only the database schemas (without data) and then begin reading the binlog. + * This should be used with care, but it is very useful when the change event consumers need only the changes + * from the point in time the snapshot is made (and doesn't care about any state or changes prior to this point). + */ + NO_DATA("no_data"), + + /** + * Perform a snapshot of only the database schemas (without data) and then begin reading the binlog at the current binlog position. + * This can be used for recovery only if the connector has existing offsets and the schema.history.internal.kafka.topic does not exist (deleted). + * This recovery option should be used with care as it assumes there have been no schema changes since the connector last stopped, + * otherwise some events during the gap may be processed with an incorrect schema and corrupted. + * + * @deprecated to be removed in Debezium 3.0, replaced by {{@link #RECOVERY}} + */ + SCHEMA_ONLY_RECOVERY("schema_only_recovery"), + /** * Perform a snapshot of only the database schemas (without data) and then begin reading the binlog at the current binlog position. * This can be used for recovery only if the connector has existing offsets and the schema.history.internal.kafka.topic does not exist (deleted). * This recovery option should be used with care as it assumes there have been no schema changes since the connector last stopped, * otherwise some events during the gap may be processed with an incorrect schema and corrupted. */ - SCHEMA_ONLY_RECOVERY("schema_only_recovery"), + RECOVERY("recovery"), /** * Never perform a snapshot and only read the binlog. This assumes the binlog contains all the history of those diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BlockingSnapshotIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BlockingSnapshotIT.java index d03a66f17..b4a1694aa 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BlockingSnapshotIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BlockingSnapshotIT.java @@ -61,7 +61,7 @@ protected Configuration.Builder config() { .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true) .with(MySqlConnectorConfig.USER, "mysqluser") .with(MySqlConnectorConfig.PASSWORD, "mysqlpw") - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY.getValue()) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA.getValue()) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) .with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, DATABASE.qualifiedTableName("debezium_signal")) .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 1); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/IncrementalSnapshotIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/IncrementalSnapshotIT.java index db9ec8ef4..d8b719b63 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/IncrementalSnapshotIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/IncrementalSnapshotIT.java @@ -70,7 +70,7 @@ protected Configuration.Builder config() { .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true) .with(MySqlConnectorConfig.USER, "mysqluser") .with(MySqlConnectorConfig.PASSWORD, "mysqlpw") - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY.getValue()) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) .with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, DATABASE.qualifiedTableName("debezium_signal")) .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 1) diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorConvertingFailureIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorConvertingFailureIT.java index 893da5d75..2099cadbb 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorConvertingFailureIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorConvertingFailureIT.java @@ -71,7 +71,7 @@ public void shouldRecoverToSyncSchemaWhenFailedValueConvertByDdlWithSqlLogBinIsO .with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost")) .with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306")) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("dbz7143")) .with(MySqlConnectorConfig.EVENT_CONVERTING_FAILURE_HANDLING_MODE, EventConvertingFailureHandlingMode.FAIL) .build(); @@ -113,7 +113,7 @@ public void shouldRecoverToSyncSchemaWhenFailedValueConvertByDdlWithSqlLogBinIsO .with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost")) .with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306")) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY_RECOVERY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.RECOVERY) .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("dbz7143")) .build(); @@ -150,7 +150,7 @@ public void shouldRecoverToSyncSchemaWhenFailedValueConvertByDdlWithSqlLogBinIsO public void shouldFailConversionNullableTimeTypeWithConnectModeWhenWarnMode() throws Exception { config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("time_table")) .with(MySqlConnectorConfig.EVENT_CONVERTING_FAILURE_HANDLING_MODE, EventConvertingFailureHandlingMode.WARN) .with(MySqlConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.CONNECT) @@ -187,7 +187,7 @@ public void shouldFailConversionNullableTimeTypeWithConnectModeWhenWarnMode() th public void shouldFailedConvertedValueIsNullWithSkipMode() throws Exception { config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("dbz7143")) .with(MySqlConnectorConfig.EVENT_CONVERTING_FAILURE_HANDLING_MODE, EventConvertingFailureHandlingMode.SKIP) .build(); @@ -246,7 +246,7 @@ public void shouldFailedConvertedValueIsNullWithSkipMode() throws Exception { public void shouldFailConversionNotNullTimeTypeWithConnectModeWhenWarnMode() throws Exception { config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("time_table")) .with(MySqlConnectorConfig.EVENT_CONVERTING_FAILURE_HANDLING_MODE, EventConvertingFailureHandlingMode.WARN) .with(MySqlConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.CONNECT) @@ -280,7 +280,7 @@ public void shouldFailConversionNotNullTimeTypeWithConnectModeWhenWarnMode() thr public void shouldFailConversionTimeTypeWithConnectModeWhenFailMode() throws Exception { config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("time_table")) .with(MySqlConnectorConfig.EVENT_CONVERTING_FAILURE_HANDLING_MODE, EventConvertingFailureHandlingMode.FAIL) .with(MySqlConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.CONNECT) @@ -313,7 +313,7 @@ public void shouldFailConversionTimeTypeWithConnectModeWhenFailMode() throws Exc public void shouldFailConversionDefaultTimeTypeWithConnectModeWhenWarnMode() throws Exception { config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("default_time_table")) .with(MySqlConnectorConfig.EVENT_CONVERTING_FAILURE_HANDLING_MODE, EventConvertingFailureHandlingMode.WARN) .with(MySqlConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.CONNECT) diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java index b4470d23d..3149e9a22 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java @@ -844,7 +844,7 @@ public void shouldIgnoreAlterTableForNonCapturedTablesNotStoredInHistory() throw final String tables = String.format("%s.customers", DATABASE.getDatabaseName(), DATABASE.getDatabaseName()); config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tables) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true) .build(); @@ -880,7 +880,7 @@ public void shouldSaveSetCharacterSetWhenStoringOnlyCapturededTables() throws SQ config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, "no_" + DATABASE.getDatabaseName()) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true) .build(); @@ -904,7 +904,7 @@ public void shouldProcessCreateUniqueIndex() throws SQLException, InterruptedExc final String tables = String.format("%s.migration_test", DATABASE.getDatabaseName(), DATABASE.getDatabaseName()); config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tables) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .build(); @@ -972,7 +972,7 @@ public void shouldIgnoreAlterTableForNonCapturedTablesStoredInHistory() throws S final String tables = String.format("%s.customers", DATABASE.getDatabaseName(), DATABASE.getDatabaseName()); config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tables) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .build(); @@ -1010,7 +1010,7 @@ public void shouldIgnoreCreateIndexForNonCapturedTablesNotStoredInHistory() thro final String tables = String.format("%s.customers", DATABASE.getDatabaseName(), DATABASE.getDatabaseName()); config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tables) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true) .build(); @@ -1049,7 +1049,7 @@ public void shouldReceiveSchemaForNonWhitelistedTablesAndDatabases() throws SQLE final String tables = String.format("%s.customers,%s.orders", DATABASE.getDatabaseName(), DATABASE.getDatabaseName()); config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tables) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, ".*") .build(); @@ -2412,7 +2412,7 @@ public void shouldEmitHeadersOnPrimaryKeyUpdate() throws Exception { public void shouldEmitNoEventsForSkippedCreateOperations() throws Exception { config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "c") .build(); @@ -2452,7 +2452,7 @@ public void shouldEmitNoEventsForSkippedCreateOperations() throws Exception { public void shouldEmitNoEventsForSkippedUpdateAndDeleteOperations() throws Exception { config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.TOMBSTONES_ON_DELETE, false) .with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "u,d") .build(); @@ -2575,7 +2575,7 @@ public void testDmlInChangeEvents() throws Exception { config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("products")) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE, CommonConnectorConfig.EventProcessingFailureHandlingMode.FAIL) .build(); @@ -2631,7 +2631,7 @@ public void testDmlInChangeEvents() throws Exception { public void shouldNotSendTombstonesWhenNotSupportedByHandler() throws Exception { config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "c") .build(); @@ -2663,7 +2663,7 @@ public void shouldNotSendTombstonesWhenNotSupportedByHandler() throws Exception public void shouldEmitTruncateOperation() throws Exception { config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE, SnapshotLockingMode.NONE) .with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "none") .build(); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorSchemaValidateIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorSchemaValidateIT.java index c099f3c91..c89e122b9 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorSchemaValidateIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorSchemaValidateIT.java @@ -68,7 +68,7 @@ public void afterEach() { public void shouldRecoverToSyncSchemaWhenAddColumnToEndWithSqlLogBinIsOff() throws Exception { config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); AtomicReference exception = new AtomicReference<>(); @@ -106,7 +106,7 @@ public void shouldRecoverToSyncSchemaWhenAddColumnToEndWithSqlLogBinIsOff() thro config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY_RECOVERY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.RECOVERY) .build(); start(MySqlConnector.class, config, (success, message, error) -> exception.set(error)); @@ -152,7 +152,7 @@ public void shouldRecoverToSyncSchemaWhenAddColumnToEndWithSqlLogBinIsOff() thro public void shouldRecoverToSyncSchemaWhenAddColumnInMiddleWithSqlLogBinIsOff() throws Exception { config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); AtomicReference exception = new AtomicReference<>(); @@ -190,7 +190,7 @@ public void shouldRecoverToSyncSchemaWhenAddColumnInMiddleWithSqlLogBinIsOff() t config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY_RECOVERY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.RECOVERY) .build(); start(MySqlConnector.class, config, (success, message, error) -> exception.set(error)); @@ -236,7 +236,7 @@ public void shouldRecoverToSyncSchemaWhenAddColumnInMiddleWithSqlLogBinIsOff() t public void shouldRecoverToSyncSchemaWhenDropColumnWithSqlLogBinIsOff() throws Exception { config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); AtomicReference exception = new AtomicReference<>(); @@ -274,7 +274,7 @@ public void shouldRecoverToSyncSchemaWhenDropColumnWithSqlLogBinIsOff() throws E config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY_RECOVERY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.RECOVERY) .build(); start(MySqlConnector.class, config, (success, message, error) -> exception.set(error)); @@ -313,7 +313,7 @@ public void shouldRecoverToSyncSchemaWhenAddColumnToEndWithSqlLogBinIsOffAndColu config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .with(MySqlConnectorConfig.COLUMN_INCLUDE_LIST, "dbz7093.id" + "," + "dbz7093.newcol") - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); AtomicReference exception = new AtomicReference<>(); @@ -351,7 +351,7 @@ public void shouldRecoverToSyncSchemaWhenAddColumnToEndWithSqlLogBinIsOffAndColu config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) - .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY_RECOVERY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.RECOVERY) .build(); start(MySqlConnector.class, config, (success, message, error) -> exception.set(error)); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlSchemaHistoryIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlSchemaHistoryIT.java index 1bcda4ff5..326d7a0fe 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlSchemaHistoryIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlSchemaHistoryIT.java @@ -64,7 +64,7 @@ public void afterEach() { @FixFor("DBZ-3485") public void shouldUseQuotedNameInDrop() throws SQLException, InterruptedException { config = DATABASE.defaultConfig() - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA) .build(); // Start the connector ... @@ -85,7 +85,7 @@ public void shouldUseQuotedNameInDrop() throws SQLException, InterruptedExceptio @FixFor("DBZ-3399") public void shouldStoreSingleRename() throws SQLException, InterruptedException { config = DATABASE.defaultConfig() - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA) .build(); // Start the connector ... @@ -113,7 +113,7 @@ public void shouldStoreSingleRename() throws SQLException, InterruptedException @FixFor("DBZ-3399") public void shouldStoreMultipleRenames() throws SQLException, InterruptedException { config = DATABASE.defaultConfig() - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA) .build(); // Start the connector ... @@ -141,7 +141,7 @@ public void shouldStoreMultipleRenames() throws SQLException, InterruptedExcepti @FixFor("DBZ-3399") public void shouldStoreAlterRename() throws SQLException, InterruptedException { config = DATABASE.defaultConfig() - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA) .build(); // Start the connector ... diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTopicNamingStrategyIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTopicNamingStrategyIT.java index dbaea9ccb..63796715c 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTopicNamingStrategyIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTopicNamingStrategyIT.java @@ -101,7 +101,7 @@ public void testSpecifyDelimiterAndPrefixStrategy() throws SQLException, Interru public void testSpecifyByLogicalTableStrategy() throws SQLException, InterruptedException { String tables = DATABASE.qualifiedTableName("dbz_4180_00") + "," + DATABASE.qualifiedTableName("dbz_4180_01"); config = DATABASE.defaultConfig() - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tables) .with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, "false") .with(DefaultRegexTopicNamingStrategy.TOPIC_REGEX, "(.*)(dbz_4180)(.*)") @@ -142,7 +142,7 @@ public void testSpecifyByLogicalTableStrategy() throws SQLException, Interrupted @FixFor("DBZ-4180") public void testSpecifyTransactionStrategy() throws SQLException, InterruptedException { config = DATABASE.defaultConfig() - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName(TABLE_NAME)) .with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, "false") .with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, "true") diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java index aaa870bd9..faf490acf 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java @@ -944,7 +944,7 @@ public void alterDateAndTimeTest() throws Exception { @FixFor("DBZ-4822") public void shouldConvertDefaultBoolean2Number() throws Exception { config = DATABASE.defaultConfig() - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("DBZ_4822_DEFAULT_BOOLEAN")) .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) @@ -990,7 +990,7 @@ public void shouldConvertDefaultBoolean2Number() throws Exception { @FixFor("DBZ-5241") public void shouldConvertDefaultWithCharacterSetIntroducer() throws Exception { config = DATABASE.defaultConfig() - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("DBZ_5241_DEFAULT_CS_INTRO")) .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SignalsIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SignalsIT.java index f5a7951ee..a3bd3d6d0 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SignalsIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SignalsIT.java @@ -88,7 +88,7 @@ protected Configuration.Builder config() { .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true) .with(MySqlConnectorConfig.USER, "mysqluser") .with(MySqlConnectorConfig.PASSWORD, "mysqlpw") - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY.getValue()) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA.getValue()) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) .with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, DATABASE.qualifiedTableName("debezium_signal")) .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 1) diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotParallelSourceIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotParallelSourceIT.java index 117d94d12..a408c2f35 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotParallelSourceIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotParallelSourceIT.java @@ -83,7 +83,7 @@ public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLock() throws Excep public void shouldParallelCreateSnapshotSchema() throws Exception { List includeDatabases = Collect.arrayListOf(DATABASE.getDatabaseName(), OTHER_DATABASE.getDatabaseName()); config = simpleConfig() - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .with(MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE, MySqlConnectorConfig.SnapshotLockingMode.NONE) .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, String.join(",", includeDatabases)) diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotSourceIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotSourceIT.java index 977eb7fae..a48d01ddf 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotSourceIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotSourceIT.java @@ -145,7 +145,7 @@ private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyCa .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedTables); } if (!data) { - builder.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY); + builder.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA); } config = builder.build(); @@ -618,7 +618,7 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep @Test(expected = DebeziumException.class) public void shouldCreateSnapshotSchemaOnlyRecovery_exception() throws Exception { - config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY).build(); + config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.RECOVERY).build(); // Start the connector ... AtomicReference exception = new AtomicReference<>(); @@ -649,7 +649,7 @@ public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception { assertThat(sourceRecords.allRecordsInOrder()).hasSize(recordCount); stopConnector(); - builder.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY); + builder.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.RECOVERY); config = builder.build(); start(MySqlConnector.class, config); @@ -823,7 +823,7 @@ private LinkedHashSet getTableNamesInSpecifiedOrder(String... tables) { @Test public void shouldCreateSnapshotSchemaOnly() throws Exception { config = simpleConfig() - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .with(Heartbeat.HEARTBEAT_INTERVAL, 300_000) .build(); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/TransactionMetadataIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/TransactionMetadataIT.java index 2db7de70e..8e6b80fdb 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/TransactionMetadataIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/TransactionMetadataIT.java @@ -66,7 +66,7 @@ public void afterEach() { @Test public void transactionMetadataEnabled() throws InterruptedException, SQLException { config = DATABASE.defaultConfig() - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) .with(MySqlConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) .build(); @@ -113,7 +113,7 @@ public void transactionMetadataEnabled() throws InterruptedException, SQLExcepti @FixFor("DBZ-4077") public void shouldUseConfiguredTransactionTopicName() throws InterruptedException, SQLException { config = DATABASE.defaultConfig() - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) .with(MySqlConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) .with(AbstractTopicNamingStrategy.TOPIC_TRANSACTION, "tx.of.server") @@ -143,7 +143,7 @@ public void shouldUseConfiguredTransactionTopicName() throws InterruptedExceptio @FixFor("DBZ-4077") public void shouldUseConfiguredTransactionTopicNameWithoutServerName() throws InterruptedException, SQLException { config = DATABASE.defaultConfig() - .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) .with(MySqlConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) .with(AbstractTopicNamingStrategy.TOPIC_TRANSACTION, "mytransactions") diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index c750ff671..e6b8cb936 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -953,16 +953,33 @@ public enum SnapshotMode implements EnumeratedValue { /** * Perform a snapshot of the schema but no data upon initial startup of a connector. + * + * @deprecated to be removed in Debezium 3.0, replaced by {{@link #NO_DATA}} */ SCHEMA_ONLY("schema_only"), + /** + * Perform a snapshot of the schema but no data upon initial startup of a connector. + */ + NO_DATA("no_data"), + + /** + * Perform a snapshot of only the database schemas (without data) and then begin reading the redo log at the current redo log position. + * This can be used for recovery only if the connector has existing offsets and the schema.history.internal.kafka.topic does not exist (deleted). + * This recovery option should be used with care as it assumes there have been no schema changes since the connector last stopped, + * otherwise some events during the gap may be processed with an incorrect schema and corrupted. + * + * @deprecated to be removed in Debezium 3.0, replaced by {{@link #RECOVERY}} + */ + SCHEMA_ONLY_RECOVERY("schema_only_recovery"), + /** * Perform a snapshot of only the database schemas (without data) and then begin reading the redo log at the current redo log position. * This can be used for recovery only if the connector has existing offsets and the schema.history.internal.kafka.topic does not exist (deleted). * This recovery option should be used with care as it assumes there have been no schema changes since the connector last stopped, * otherwise some events during the gap may be processed with an incorrect schema and corrupted. */ - SCHEMA_ONLY_RECOVERY("schema_only_recovery"), + RECOVERY("recovery"), /** * Perform a snapshot when it is needed. diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotCaseSensitiveIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotCaseSensitiveIT.java index c1614fb7d..28cfa03d6 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotCaseSensitiveIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotCaseSensitiveIT.java @@ -163,7 +163,7 @@ protected String signalTableName() { @Override protected Configuration.Builder config() { return TestHelper.defaultConfig() - .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA) .with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, TestHelper.getDatabaseName() + ".DEBEZIUM.DEBEZIUM_SIGNAL") .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.A,DEBEZIUM\\.B,DEBEZIUM\\.A42") .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "DEBEZIUM\\.A42:pk1,pk2,pk3,pk4") diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotIT.java index 5878a38f0..37bb67599 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotIT.java @@ -155,7 +155,7 @@ protected String signalTableName() { @Override protected Configuration.Builder config() { return TestHelper.defaultConfig() - .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA) .with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, TestHelper.getDatabaseName() + ".DEBEZIUM.DEBEZIUM_SIGNAL") .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.A,DEBEZIUM\\.B,DEBEZIUM\\.A42") .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "DEBEZIUM.A42:pk1,pk2,pk3,pk4") diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java index cab870117..3b9e87e62 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java @@ -286,7 +286,7 @@ private void shouldApplyTableInclusionConfiguration() throws Exception { Configuration config = TestHelper.defaultConfig() .with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM") .with(option, "DEBEZIUM2\\.TABLE2,DEBEZIUM\\.TABLE1,DEBEZIUM\\.TABLE3") - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(OracleConnector.class, config); @@ -348,7 +348,7 @@ private void shouldApplySchemaAndTableInclusionConfiguration() throws Exception Configuration config = TestHelper.defaultConfig() .with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM,DEBEZIUM2") .with(option, "DEBEZIUM2\\.TABLE2,DEBEZIUM\\.TABLE1,DEBEZIUM\\.TABLE3") - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(OracleConnector.class, config); @@ -428,7 +428,7 @@ private void shouldApplyTableExclusionsConfiguration() throws Exception { Configuration config = TestHelper.defaultConfig() .with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM") .with(option, "DEBEZIUM\\.TABLE2,DEBEZIUM\\.CUSTOMER.*") - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(OracleConnector.class, config); @@ -496,7 +496,7 @@ else if (TestHelper.adapter().equals(OracleConnectorConfig.ConnectorAdapter.OLR) Configuration config = TestHelper.defaultConfig() .with(OracleConnectorConfig.SCHEMA_EXCLUDE_LIST, "DEBEZIUM,SYS") .with(option, "DEBEZIUM\\.TABLE2,DEBEZIUM\\.CUSTOMER.*,DEBEZIUM2\\.NOPK") - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(OracleConnector.class, config); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java index ddb3eb13f..7610212d9 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java @@ -617,7 +617,7 @@ public void shouldStreamAfterRestartAfterSnapshot() throws Exception { public void shouldReadChangeStreamForExistingTable() throws Exception { Configuration config = TestHelper.defaultConfig() .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER") - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(OracleConnector.class, config); @@ -707,7 +707,7 @@ public void shouldReadChangeStreamForExistingTable() throws Exception { public void deleteWithoutTombstone() throws Exception { Configuration config = TestHelper.defaultConfig() .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER") - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(OracleConnectorConfig.TOMBSTONES_ON_DELETE, false) .build(); @@ -858,7 +858,7 @@ public void shouldConsumeEventsWithMaskedAndTruncatedColumns(boolean useDatabase if (useDatabaseName) { final String dbName = TestHelper.getDatabaseName(); config = TestHelper.defaultConfig() - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with("column.mask.with.12.chars", dbName + ".DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME") .with("column.mask.hash.SHA-256.with.salt.CzQMA0cB5K", dbName + ".DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME2," + dbName + ".DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME3") @@ -867,7 +867,7 @@ public void shouldConsumeEventsWithMaskedAndTruncatedColumns(boolean useDatabase } else { config = TestHelper.defaultConfig() - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with("column.mask.with.12.chars", "DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME") .with("column.mask.hash.SHA-256.with.salt.CzQMA0cB5K", "DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME2,DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME3") .with("column.truncate.to.4.chars", "DEBEZIUM.TRUNCATED_COLUMN_TABLE.NAME") @@ -927,13 +927,13 @@ private void shouldRewriteIdentityKey(boolean useDatabaseName) throws Exception final Configuration config; if (useDatabaseName) { config = TestHelper.defaultConfig() - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(OracleConnectorConfig.MSG_KEY_COLUMNS, "(.*).debezium.customer:id,name") .build(); } else { config = TestHelper.defaultConfig() - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(OracleConnectorConfig.MSG_KEY_COLUMNS, "debezium.customer:id,name") .build(); } @@ -960,7 +960,7 @@ private void shouldRewriteIdentityKey(boolean useDatabaseName) throws Exception @FixFor({ "DBZ-1916", "DBZ-1830" }) public void shouldPropagateSourceTypeByDatatype() throws Exception { final Configuration config = TestHelper.defaultConfig() - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with("datatype.propagate.source.type", ".+\\.NUMBER,.+\\.VARCHAR2,.+\\.FLOAT") .build(); @@ -1923,7 +1923,7 @@ public void shouldResumeStreamingAtCorrectScnOffset() throws Exception { connection.execute("ALTER TABLE debezium.offset_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); final Configuration config = TestHelper.defaultConfig() - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.OFFSET_TEST") .build(); @@ -2817,7 +2817,7 @@ public void shouldFilterUser() throws Exception { Configuration config = TestHelper.defaultConfig() .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3978") - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(OracleConnectorConfig.LOG_MINING_USERNAME_EXCLUDE_LIST, "DEBEZIUM") // This test expects the filtering to occur in the connector, not the query .with(OracleConnectorConfig.LOG_MINING_QUERY_FILTER_MODE, "none") @@ -2952,7 +2952,7 @@ private long toMicroSecondsSinceEpoch(LocalDateTime localDateTime) { public void shouldCreateSnapshotSchemaOnlyRecoveryExceptionWithoutOffset() { final Path path = Testing.Files.createTestingPath("missing-history.txt").toAbsolutePath(); Configuration config = defaultConfig() - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY_RECOVERY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA_RECOVERY) .with(FileSchemaHistory.FILE_PATH, path) .build(); @@ -2968,7 +2968,7 @@ public void shouldCreateSnapshotSchemaOnlyRecoveryExceptionWithoutOffset() { public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception { try { Configuration.Builder builder = defaultConfig() - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3986") .with(OracleConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class.getName()) .with(EmbeddedEngineConfig.OFFSET_STORAGE, FileOffsetBackingStore.class.getName()); @@ -2977,7 +2977,7 @@ public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception { // Insert a row of data in advance connection.execute("INSERT INTO DBZ3986 (ID, DATA) values (3, 'asuka')"); - builder.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY_RECOVERY); + builder.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA_RECOVERY); config = builder.build(); start(OracleConnector.class, config); @@ -3001,7 +3001,7 @@ public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception { public void shouldCreateSnapshotSchemaOnlyExceptionWithoutHistory() throws Exception { try { Configuration.Builder builder = defaultConfig() - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3986") .with(OracleConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class.getName()) .with(EmbeddedEngineConfig.OFFSET_STORAGE, FileOffsetBackingStore.class.getName()); @@ -3022,7 +3022,7 @@ public void shouldCreateSnapshotSchemaOnlyExceptionWithoutHistory() throws Excep public void shouldSkipDataOnSnapshotSchemaOnly() throws Exception { try { Configuration.Builder builder = defaultConfig() - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3986") .with(OracleConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class.getName()) .with(EmbeddedEngineConfig.OFFSET_STORAGE, MemoryOffsetBackingStore.class.getName()); @@ -5371,7 +5371,7 @@ public void shouldNotFailToStartWhenSignalDataCollectionNotDefinedWithinTableInc .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6528") .with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, TestHelper.getDatabaseName() + ".C##DBZUSER.SIGNALS") .with(OracleConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, "true") - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY.getValue()) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .build(); start(OracleConnector.class, config); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleSkipMessagesWithoutChangeConfigIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleSkipMessagesWithoutChangeConfigIT.java index cef8d3853..5b64d27b9 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleSkipMessagesWithoutChangeConfigIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleSkipMessagesWithoutChangeConfigIT.java @@ -69,7 +69,7 @@ public void shouldSkipEventsWithNoChangeInIncludedColumnsWhenSkipEnabled() throw .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.TEST") .with(OracleConnectorConfig.COLUMN_INCLUDE_LIST, "DEBEZIUM\\.TEST\\.ID, DEBEZIUM\\.TEST\\.WHITE") .with(OracleConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, true) - .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA) .build(); start(OracleConnector.class, config); @@ -111,7 +111,7 @@ public void shouldSkipEventsWithNoChangeInIncludedColumnsWhenSkipEnabledWithExcl .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.TEST") .with(OracleConnectorConfig.COLUMN_EXCLUDE_LIST, "DEBEZIUM\\.TEST\\.BLACK") .with(OracleConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, true) - .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA) .build(); start(OracleConnector.class, config); @@ -153,7 +153,7 @@ public void shouldNotSkipEventsWithNoChangeInIncludedColumnsWhenSkipDisabled() t .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.TEST") .with(OracleConnectorConfig.COLUMN_INCLUDE_LIST, "DEBEZIUM\\.TEST\\.ID, DEBEZIUM\\.TEST\\.WHITE") .with(OracleConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, false) - .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA) .build(); start(OracleConnector.class, config); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OutboxEventRouterIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OutboxEventRouterIT.java index d120181b4..f96d4c069 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OutboxEventRouterIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OutboxEventRouterIT.java @@ -70,7 +70,7 @@ protected JdbcConnection databaseConnection() { @Override protected Configuration.Builder getConfigurationBuilder(boolean initialSnapshot) { - final SnapshotMode snapshotMode = initialSnapshot ? SnapshotMode.INITIAL : SnapshotMode.SCHEMA_ONLY; + final SnapshotMode snapshotMode = initialSnapshot ? SnapshotMode.INITIAL : SnapshotMode.NO_DATA; return TestHelper.defaultConfig() .with(OracleConnectorConfig.SNAPSHOT_MODE, snapshotMode.getValue()) // this allows numeric(1) to be simulated as boolean types like other databases diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SignalsIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SignalsIT.java index c91eae40d..d5cf660e9 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SignalsIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SignalsIT.java @@ -97,7 +97,7 @@ public void signalSchemaChange() throws Exception { Configuration config = TestHelper.defaultConfig() .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER") .with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, TestHelper.getDatabaseName() + ".DEBEZIUM.DEBEZIUM_SIGNAL") - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .with(OracleConnectorConfig.LOG_MINING_STRATEGY, OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG) .build(); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/StreamingDatatypesIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/StreamingDatatypesIT.java index 01428d918..0983cd970 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/StreamingDatatypesIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/StreamingDatatypesIT.java @@ -62,7 +62,7 @@ protected Builder connectorConfig() { return TestHelper.defaultConfig() .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList) - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY); + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA); } @Override diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/TransactionMetadataIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/TransactionMetadataIT.java index 9e911baae..be9db3853 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/TransactionMetadataIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/TransactionMetadataIT.java @@ -92,7 +92,7 @@ public void before() throws SQLException { public void transactionMetadata() throws Exception { Configuration config = TestHelper.defaultConfig() .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER,DEBEZIUM\\.ORDERS") - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(OracleConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) .with(OracleConnectorConfig.LOG_MINING_STRATEGY, OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG) .build(); @@ -151,7 +151,7 @@ public void transactionMetadataMultipleTransactions() throws Exception { Configuration config = TestHelper.defaultConfig() .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER,DEBEZIUM\\.ORDERS") - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(OracleConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) .with(OracleConnectorConfig.LOG_MINING_STRATEGY, OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG) .build(); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index 5009e3260..bd504733c 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -188,9 +188,15 @@ public enum SnapshotMode implements EnumeratedValue { /** * Never perform a snapshot and only receive logical changes. + * @deprecated to be removed in Debezium 3.0, replaced by {{@link #NO_DATA}} */ NEVER("never"), + /** + * Never perform a snapshot and only receive logical changes. + */ + NO_DATA("no_data"), + /** * Perform a snapshot and then stop before attempting to receive any logical changes. */ diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/BlockingSnapshotIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/BlockingSnapshotIT.java index 7f5d129a0..a657b87fd 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/BlockingSnapshotIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/BlockingSnapshotIT.java @@ -54,7 +54,7 @@ public void after() { protected Configuration.Builder config() { return TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal") .with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CloudEventsConverterIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CloudEventsConverterIT.java index 59d59bc2e..d7084f272 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CloudEventsConverterIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CloudEventsConverterIT.java @@ -73,7 +73,7 @@ protected JdbcConnection databaseConnection() { @Override protected Configuration.Builder getConfigurationBuilder() { return TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "outboxsmtit,s1") .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "outboxsmtit.outbox,s1.a"); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/DomainTypesIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/DomainTypesIT.java index fc78d6e9f..4e02c7f29 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/DomainTypesIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/DomainTypesIT.java @@ -44,7 +44,7 @@ public void before() throws SQLException { @FixFor("DBZ-3657") public void shouldNotChokeOnDomainTypeInArray() throws Exception { start(PostgresConnector.class, TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "domaintypes") .build()); assertConnectorIsRunning(); @@ -64,7 +64,7 @@ public void shouldNotChokeOnDomainTypeInArray() throws Exception { @FixFor("DBZ-3657") public void shouldExportDomainTypeInArrayAsUnknown() throws Exception { start(PostgresConnector.class, TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "domaintypes") .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) .build()); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java index 8b53bb7db..e7102e57b 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java @@ -95,7 +95,7 @@ public void after() { protected Configuration.Builder config() { return TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal") .with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10) @@ -117,7 +117,7 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s tableIncludeList = "s1.a,s1.b"; } return TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal") .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java index 25a9f0241..66458595a 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java @@ -58,7 +58,7 @@ protected JdbcConnection databaseConnection() { @Override protected Configuration.Builder getConfigurationBuilder(boolean initialSnapshot) { - SnapshotMode snapshotMode = initialSnapshot ? SnapshotMode.INITIAL : SnapshotMode.NEVER; + SnapshotMode snapshotMode = initialSnapshot ? SnapshotMode.INITIAL : SnapshotMode.NO_DATA; return TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, snapshotMode.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 40a355b80..7040c7e39 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -186,7 +186,7 @@ public void shouldNotStartWithInvalidSlotConfigAndUserRoles() throws Exception { TestHelper.execute("CREATE USER badboy WITH PASSWORD 'failing';", "GRANT ALL PRIVILEGES ON DATABASE postgres TO badboy;"); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.SLOT_NAME, ReplicationConnection.Builder.DEFAULT_SLOT_NAME) .build(); @@ -197,7 +197,7 @@ public void shouldNotStartWithInvalidSlotConfigAndUserRoles() throws Exception { .with("name", "failingPGConnector") .with(PostgresConnectorConfig.DATABASE_CONFIG_PREFIX + JdbcConfiguration.USER, "badboy") .with(PostgresConnectorConfig.DATABASE_CONFIG_PREFIX + JdbcConfiguration.PASSWORD, "failing") - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.SLOT_NAME, ReplicationConnection.Builder.DEFAULT_SLOT_NAME) .build(); List validatedConfig = new PostgresConnector().validate(failingConfig.asMap()).configValues(); @@ -943,7 +943,7 @@ public void shouldProduceEventsWhenSnapshotsAreNeverAllowed() throws Interrupted TestHelper.dropDefaultReplicationSlot(); TestHelper.execute(SETUP_TABLES_STMT); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .build(); start(PostgresConnector.class, config); @@ -1089,7 +1089,7 @@ public void shouldUpdateReplicaIdentity() throws Exception { String setupStmt = SETUP_TABLES_STMT; TestHelper.execute(setupStmt); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "s1.a:FULL,s2.a:DEFAULT") .build(); @@ -1118,7 +1118,7 @@ public void shouldUpdateReplicaIdentityWithRegExp() throws Exception { TestHelper.executeDDL("postgres_create_multiple_tables.ddl"); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "(.*).a:FULL,s2.*:NOTHING") .build(); @@ -1152,7 +1152,7 @@ public void shouldNotUpdateReplicaIdentityWithRegExpDuplicated() throws Exceptio TestHelper.executeDDL("postgres_create_multiple_tables.ddl"); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "s.*:FULL,s2.*:NOTHING") .build(); @@ -1180,7 +1180,7 @@ public void shouldUpdateReplicaIdentityWithOneTable() throws Exception { String setupStmt = SETUP_TABLES_STMT; TestHelper.execute(setupStmt); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "s1.a:FULL") .build(); @@ -1209,7 +1209,7 @@ public void shouldUpdateReplicaIdentityUsingIndex() throws Exception { String setupStmt = SETUP_TABLES_STMT; TestHelper.execute(setupStmt); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "s1.a:FULL,s2.a:INDEX a_pkey") .build(); @@ -1246,7 +1246,7 @@ public void shouldLogOwnershipErrorForReplicaIdentityUpdate() throws Exception { TestHelper.executeDDL("postgres_create_role_specific_tables.ddl"); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "s1.a:FULL,s2.a:DEFAULT") .with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, "DISABLED") @@ -1274,7 +1274,7 @@ public void shouldCheckTablesToUpdateReplicaIdentityAreCaptured() throws Excepti String setupStmt = SETUP_TABLES_STMT; TestHelper.execute(setupStmt); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "s1.a:FULL,s2.b:DEFAULT") .build(); @@ -1534,7 +1534,7 @@ public void shouldNotSendEmptyOffset() throws InterruptedException, SQLException "CREATE SCHEMA s1; " + "CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));"; Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.a") .with(Heartbeat.HEARTBEAT_INTERVAL, 10) @@ -1556,7 +1556,7 @@ public void shouldRegularlyFlushLsn() throws InterruptedException, SQLException final int recordCount = 10; TestHelper.execute(SETUP_TABLES_STMT); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.a") .build(); @@ -1596,7 +1596,7 @@ public void shouldRegularlyFlushLsnWithTxMonitoring() throws InterruptedExceptio final int recordCount = 10; TestHelper.execute(SETUP_TABLES_STMT); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.a") .with(PostgresConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) @@ -2180,7 +2180,7 @@ public void testStreamingPerformance() throws Exception { TestHelper.dropAllSchemas(); TestHelper.executeDDL("postgres_create_tables.ddl"); Configuration.Builder configBuilder = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE); start(PostgresConnector.class, configBuilder.build()); assertConnectorIsRunning(); @@ -2409,7 +2409,7 @@ public void shouldCreatePublicationWhenReplicationSlotExists() throws Exception TestHelper.executeDDL("postgres_create_tables.ddl"); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false) .build(); @@ -3020,7 +3020,7 @@ public void shouldEmitNoEventsForSkippedCreateOperations() throws Exception { TestHelper.dropDefaultReplicationSlot(); TestHelper.execute(SETUP_TABLES_STMT); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .with(PostgresConnectorConfig.SKIPPED_OPERATIONS, Envelope.Operation.UPDATE.code()) .build(); @@ -3096,7 +3096,7 @@ public void shouldHaveLastCommitLsn() throws InterruptedException { TestHelper.execute(SETUP_TABLES_STMT); start(PostgresConnector.class, TestHelper.defaultConfig() .with(PostgresConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .build()); assertConnectorIsRunning(); @@ -3299,7 +3299,7 @@ public void shouldStopConnectorOnSlotRecreation() throws InterruptedException { TestHelper.execute(INSERT_STMT); configBuilder = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.name()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.name()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .with(PostgresConnectorConfig.SLOT_SEEK_TO_KNOWN_OFFSET, Boolean.TRUE); @@ -3332,7 +3332,7 @@ public void shouldSeekToCorrectOffset() throws InterruptedException { TestHelper.execute(INSERT_STMT); configBuilder = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.name()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.name()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .with(PostgresConnectorConfig.SLOT_SEEK_TO_KNOWN_OFFSET, Boolean.TRUE); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java index 7ff4d50f2..72e2f92d8 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java @@ -152,7 +152,7 @@ public void testStreamingOnlyMetrics() throws Exception { // start connector start(PostgresConnector.class, TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .build()); @@ -269,7 +269,7 @@ public void oneRecordInQueue() throws Exception { final CountDownLatch step2 = new CountDownLatch(1); Configuration.Builder configBuilder = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .with(PostgresConnectorConfig.MAX_QUEUE_SIZE, 10) .with(PostgresConnectorConfig.MAX_BATCH_SIZE, 1) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMoneyIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMoneyIT.java index 2d7192e41..f01afc03a 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMoneyIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMoneyIT.java @@ -49,7 +49,7 @@ public void shouldReceiveChangesForInsertsWithPreciseMode() throws Exception { createTable(); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA) .build(); start(PostgresConnector.class, config); waitForStreamingRunning("postgres", TestHelper.TEST_SERVER); @@ -74,7 +74,7 @@ public void shouldReceiveChangesForInsertsWithStringMode() throws Exception { createTable(); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, "string") .build(); start(PostgresConnector.class, config); @@ -100,7 +100,7 @@ public void shouldReceiveChangesForInsertsWithDoubleMode() throws Exception { createTable(); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, "double") .build(); start(PostgresConnector.class, config); @@ -126,7 +126,7 @@ public void shouldReceiveChangesForInsertNullAndZeroMoney() throws Exception { createTable(); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA) .build(); start(PostgresConnector.class, config); waitForStreamingRunning("postgres", TestHelper.TEST_SERVER); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSkipMessagesWithoutChangeConfigIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSkipMessagesWithoutChangeConfigIT.java index 1b4fef818..5804d7d46 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSkipMessagesWithoutChangeConfigIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSkipMessagesWithoutChangeConfigIT.java @@ -55,7 +55,7 @@ public void shouldSkipEventsWithNoChangeInIncludedColumnsWhenSkipEnabled() throw Configuration config = TestHelper.defaultConfig() .with(PostgresConnectorConfig.COLUMN_INCLUDE_LIST, "updates_test.debezium_test.id, updates_test.debezium_test.white") .with(PostgresConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, true) - .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA) .build(); start(PostgresConnector.class, config); @@ -95,7 +95,7 @@ public void shouldSkipEventsWithNoChangeInIncludedColumnsWhenSkipEnabledWithExcl Configuration config = TestHelper.defaultConfig() .with(PostgresConnectorConfig.COLUMN_EXCLUDE_LIST, "updates_test.debezium_test.black") .with(PostgresConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, true) - .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA) .build(); start(PostgresConnector.class, config); @@ -133,7 +133,7 @@ public void shouldNotSkipEventsWithNoChangeInIncludedColumnsWhenSkipEnabledButTa Configuration config = TestHelper.defaultConfig() .with(PostgresConnectorConfig.COLUMN_INCLUDE_LIST, "updates_test.debezium_test.id, updates_test.debezium_test.white") .with(PostgresConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, true) - .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA) .build(); start(PostgresConnector.class, config); @@ -174,7 +174,7 @@ public void shouldNotSkipEventsWithNoChangeInIncludedColumnsWhenSkipDisabled() t Configuration config = TestHelper.defaultConfig() .with(PostgresConnectorConfig.COLUMN_INCLUDE_LIST, "updates_test.debezium_test.id, updates_test.debezium_test.white") .with(PostgresConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, false) - .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA) .build(); start(PostgresConnector.class, config); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PublicGeometryIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PublicGeometryIT.java index 624fbcd46..03db2a616 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PublicGeometryIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PublicGeometryIT.java @@ -91,7 +91,7 @@ public void shouldReceiveChangesForInsertsWithPostgisTypes() throws Exception { private void setupRecordsProducer(Configuration.Builder config) { start(PostgresConnector.class, config - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build()); assertConnectorIsRunning(); } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index 27474e25a..6898528fc 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -153,7 +153,7 @@ private void startConnector(Function config .with(PostgresConnectorConfig.POLL_INTERVAL_MS, "50") .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1\\.b") - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER), + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA), false); waitForStreamingToStart(); @@ -2714,7 +2714,7 @@ public void shouldStartConsumingFromSlotLocation() throws Exception { "INSERT INTO test_table (text) VALUES ('insert4')"); startConnector(config -> config .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, true) - .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA) .with(EmbeddedEngineConfig.OFFSET_STORAGE, MemoryOffsetBackingStore.class), false); consumer.expects(3); @@ -2886,7 +2886,7 @@ public void shouldStreamChangesForDomainAliasAlterTable() throws Exception { startConnector(config -> config .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.alias_table") .with("column.propagate.source.type", "public.alias_table.salary3"), false); @@ -2928,7 +2928,7 @@ public void shouldStreamDomainAliasWithProperModifiers() throws Exception { startConnector(config -> config .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.alias_table"), false); @@ -2960,7 +2960,7 @@ public void shouldStreamValuesForDomainTypeOfDomainType() throws Exception { startConnector(config -> config .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.alias_table") .with("column.propagate.source.type", "public.alias_table.value"), false); @@ -2993,7 +2993,7 @@ public void shouldStreamValuesForAliasLikeBaseTypes() throws Exception { startConnector(config -> config .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.alias_table"), false); @@ -3151,7 +3151,7 @@ public void shouldStreamEnumAsKnownType() throws Exception { TestHelper.execute("CREATE TABLE enum_table (pk SERIAL, PRIMARY KEY (pk));"); startConnector(config -> config .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with("column.propagate.source.type", "public.enum_table.value") .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.enum_table"), false); @@ -3188,7 +3188,7 @@ public void shouldEmitEnumColumnDefaultValuesInSchema() throws Exception { TestHelper.execute("CREATE TABLE enum_table (pk SERIAL, PRIMARY KEY (pk));"); startConnector(config -> config .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with("column.propagate.source.type", "public.enum_table.value") .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.enum_table"), false); @@ -3227,7 +3227,7 @@ public void shouldStreamEnumArrayAsKnownType() throws Exception { TestHelper.execute("CREATE TABLE enum_array_table (pk SERIAL, PRIMARY KEY (pk));"); startConnector(config -> config .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, false) - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with("column.propagate.source.type", "public.enum_array_table.value") .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.enum_array_table"), false); @@ -3290,7 +3290,7 @@ public void shouldStreamTimeArrayTypesAsKnownTypes() throws Exception { + "timestamptza timestamptz[] NOT NULL, primary key(pk));"); startConnector(config -> config .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, false) - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.time_array_table"), false); waitForStreamingToStart(); @@ -3355,7 +3355,7 @@ public void shouldStreamEnumsWhenIncludeUnknownDataTypesDisabled() throws Except TestHelper.execute("CREATE TABLE enum_table (pk SERIAL, data varchar(25) NOT NULL, value test_type NOT NULL DEFAULT 'V1', PRIMARY KEY (pk));"); startConnector(config -> config .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, false) - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with("column.propagate.source.type", "public.enum_table.value") .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.enum_table"), false); @@ -3547,7 +3547,7 @@ public void shouldPropagateSourceTypeByDatatype() throws Exception { TestHelper.execute("CREATE TABLE test_table (id SERIAL, c1 INT, c2 INT, c3a NUMERIC(5,2), c3b VARCHAR(128), f1 float(10), f2 decimal(8,4), primary key (id));"); startConnector(config -> config - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with("datatype.propagate.source.type", ".+\\.NUMERIC,.+\\.VARCHAR,.+\\.FLOAT4"), false); waitForStreamingToStart(); @@ -3761,7 +3761,7 @@ record = consumer.remove(); @FixFor({ "DBZ-6635", "DBZ-7316" }) public void testSendingHeartbeatsWithoutWalUpdates() throws Exception { Function configMapper = config -> config - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(Heartbeat.HEARTBEAT_INTERVAL, "100") .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SignalsIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SignalsIT.java index 567471420..d5fd95395 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SignalsIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SignalsIT.java @@ -77,7 +77,7 @@ private void signalLog(boolean includingEscapedCharacter) throws InterruptedExce TestHelper.dropDefaultReplicationSlot(); TestHelper.execute(includingEscapedCharacter ? SETUP_TABLES_STMT.replace(signalTable, signalTableWithEscapedCharacter) : SETUP_TABLES_STMT); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, includingEscapedCharacter ? signalTableWithEscapedCharacter : signalTable) .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, "500") @@ -113,7 +113,7 @@ public void signalingDisabled() throws InterruptedException { TestHelper.dropDefaultReplicationSlot(); TestHelper.execute(SETUP_TABLES_STMT); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal") .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, "500") @@ -147,7 +147,7 @@ public void signalSchemaChange() throws InterruptedException { TestHelper.dropDefaultReplicationSlot(); TestHelper.execute(SETUP_TABLES_STMT); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal") .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, "500") @@ -227,7 +227,7 @@ public void jmxSignals() throws Exception { TestHelper.dropDefaultReplicationSlot(); TestHelper.execute(SETUP_TABLES_STMT); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, "500") .with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "jmx") @@ -253,7 +253,7 @@ public void customAction() throws Exception { TestHelper.dropDefaultReplicationSlot(); TestHelper.execute(SETUP_TABLES_STMT); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, "500") .with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "jmx") diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TablesWithoutPrimaryKeyIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TablesWithoutPrimaryKeyIT.java index 345a989d8..b501749e9 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TablesWithoutPrimaryKeyIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TablesWithoutPrimaryKeyIT.java @@ -87,7 +87,7 @@ public void shouldProcessFromSnapshotOld() throws Exception { @Test public void shouldProcessFromStreaming() throws Exception { start(PostgresConnector.class, TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "nopk") .build()); assertConnectorIsRunning(); @@ -126,7 +126,7 @@ public void shouldProcessFromStreaming() throws Exception { @Test public void shouldProcessFromStreamingOld() throws Exception { start(PostgresConnector.class, TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "nopk") .build()); assertConnectorIsRunning(); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TransactionMetadataIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TransactionMetadataIT.java index bd65c93c5..bedbe6c16 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TransactionMetadataIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TransactionMetadataIT.java @@ -76,7 +76,7 @@ public void transactionMetadata() throws InterruptedException { TestHelper.dropDefaultReplicationSlot(); TestHelper.execute(SETUP_TABLES_STMT); Configuration config = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .with(PostgresConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) .build(); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/timescaledb/TimescaleDbDatabaseTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/timescaledb/TimescaleDbDatabaseTest.java index 79707c15a..cedb965f3 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/timescaledb/TimescaleDbDatabaseTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/timescaledb/TimescaleDbDatabaseTest.java @@ -70,7 +70,7 @@ public void prepareDatabase() throws Exception { config = TestHelper.defaultConfig() .with(PostgresConnectorConfig.HOSTNAME, timescaleDbContainer.getHost()) .with(PostgresConnectorConfig.PORT, timescaleDbContainer.getMappedPort(5432)) - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(PostgresConnectorConfig.PLUGIN_NAME, PostgresConnectorConfig.LogicalDecoder.PGOUTPUT) .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "_timescaledb_internal") .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java index edee0e429..b11dba228 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -68,8 +68,14 @@ public enum SnapshotMode implements EnumeratedValue { /** * Perform a snapshot of the schema but no data upon initial startup of a connector. + * @deprecated to be removed in Debezium 3.0, replaced by {{@link #NO_DATA}} */ - SCHEMA_ONLY("schema_only", false); + SCHEMA_ONLY("schema_only", false), + + /** + * Perform a snapshot of the schema but no data upon initial startup of a connector. + */ + NO_DATA("no_data", false); private final String value; private final boolean includeData; diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/EventProcessingFailureHandlingIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/EventProcessingFailureHandlingIT.java index 63e9a6227..9b2dd6df9 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/EventProcessingFailureHandlingIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/EventProcessingFailureHandlingIT.java @@ -63,7 +63,7 @@ public void warn() throws Exception { final int RECORDS_PER_TABLE = 5; final int ID_START_1 = 10; final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.EVENT_PROCESSING_FAILURE_HANDLING_MODE, EventProcessingFailureHandlingMode.WARN) .build(); final LogInterceptor logInterceptor = new LogInterceptor(EventDispatcher.class); @@ -106,7 +106,7 @@ public void ignore() throws Exception { final int RECORDS_PER_TABLE = 5; final int ID_START_1 = 10; final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.EVENT_PROCESSING_FAILURE_HANDLING_MODE, EventProcessingFailureHandlingMode.SKIP) .build(); @@ -142,7 +142,7 @@ public void fail() throws Exception { final int RECORDS_PER_TABLE = 5; final int ID_START_1 = 10; final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); final LogInterceptor logInterceptor = new LogInterceptor(ErrorHandler.class); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotIT.java index 30f6c0b64..a1179ad4f 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotIT.java @@ -166,7 +166,7 @@ protected String createTableStatement(String newTable, String copyTable) { @Override protected Builder config() { return TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.SIGNAL_DATA_COLLECTION, "testDB1.dbo.debezium_signal") .with(SqlServerConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250) .with(SqlServerConnectorConfig.INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, true) diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotWithRecompileIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotWithRecompileIT.java index 04d0b8dfa..9324badce 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotWithRecompileIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotWithRecompileIT.java @@ -116,7 +116,7 @@ protected String signalTableName() { @Override protected Builder config() { return TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.SIGNAL_DATA_COLLECTION, "testDB1.dbo.debezium_signal") .with(SqlServerConnectorConfig.INCREMENTAL_SNAPSHOT_OPTION_RECOMPILE, true) .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "dbo.a42:pk1,pk2,pk3,pk4"); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/NotificationsIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/NotificationsIT.java index 9ecca4bba..0308ad8ea 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/NotificationsIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/NotificationsIT.java @@ -97,7 +97,7 @@ protected String snapshotStatusResult() { @Test public void completeReadingFromACaptureInstanceNotificationEmitted() throws SQLException { startConnector(config -> config - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA) .with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification") .with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink")); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SchemaHistoryTopicIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SchemaHistoryTopicIT.java index 68148e79f..8a75c7c12 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SchemaHistoryTopicIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SchemaHistoryTopicIT.java @@ -71,7 +71,7 @@ public void streamingSchemaChanges() throws Exception { final int ID_START_2 = 100; final int ID_START_3 = 1000; final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .build(); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java index a58d47fc0..801f45aa8 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java @@ -232,7 +232,7 @@ private void testStreaming() throws SQLException, InterruptedException { @Test public void takeSchemaOnlySnapshotAndStartStreaming() throws Exception { final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(SqlServerConnector.class, config); @@ -291,7 +291,7 @@ public void takeSnapshotFromTableWithReservedName() throws Exception { @Test public void takeSchemaOnlySnapshotAndSendHeartbeat() throws Exception { final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(Heartbeat.HEARTBEAT_INTERVAL, 300_000) .build(); @@ -595,7 +595,7 @@ public void shouldHandleBracketsInSnapshotSelect() throws InterruptedException, @FixFor("DBZ-6811") public void shouldSendHeartbeatsWhenNoRecordsAreSent() throws Exception { final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(Heartbeat.HEARTBEAT_INTERVAL, 100) .build(); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java index 26eb926f8..6e7ac6bc2 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java @@ -72,7 +72,7 @@ public void addTable() throws Exception { final int TABLES = 2; final int ID_START = 10; final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(SqlServerConnector.class, config); @@ -138,7 +138,7 @@ public void removeTable() throws Exception { final int ID_START_1 = 10; final int ID_START_2 = 100; final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(SqlServerConnector.class, config); @@ -175,7 +175,7 @@ public void removeTable() throws Exception { @Test public void addColumnToTableEndOfBatchWithoutLsnLimit() throws Exception { final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); addColumnToTable(config, true); } @@ -184,7 +184,7 @@ public void addColumnToTableEndOfBatchWithoutLsnLimit() throws Exception { @FixFor("DBZ-3992") public void addColumnToTableEndOfBatchWithLsnLimit() throws Exception { final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.MAX_TRANSACTIONS_PER_ITERATION, 1) .build(); addColumnToTable(config, true); @@ -193,7 +193,7 @@ public void addColumnToTableEndOfBatchWithLsnLimit() throws Exception { @Test public void addColumnToTableMiddleOfBatchWithoutLsnLimit() throws Exception { final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); addColumnToTable(config, false); } @@ -202,7 +202,7 @@ public void addColumnToTableMiddleOfBatchWithoutLsnLimit() throws Exception { @FixFor("DBZ-3992") public void addColumnToTableMiddleOfBatchWithLsnLimit() throws Exception { final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.MAX_TRANSACTIONS_PER_ITERATION, 1) .build(); addColumnToTable(config, true); @@ -326,7 +326,7 @@ public void removeColumnFromTable() throws Exception { final int ID_START_2 = 100; final int ID_START_3 = 1000; final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(SqlServerConnector.class, config); @@ -413,7 +413,7 @@ public void removeColumnFromTableWithoutChangingCapture() throws Exception { final int ID_START_1 = 10; final Configuration config = TestHelper.defaultConfig() .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo.tableb2") - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.COLUMN_INCLUDE_LIST, ".*id") .build(); @@ -448,7 +448,7 @@ public void addColumnToTableWithParallelWrites() throws Exception { final int ID_START_2 = 100; final int ID_START_3 = 1000; final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(SqlServerConnector.class, config); @@ -547,7 +547,7 @@ public void readHistoryAfterRestart() throws Exception { final int ID_START_2 = 100; final int ID_START_3 = 1000; final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(SqlServerConnector.class, config); @@ -640,7 +640,7 @@ public void renameColumn() throws Exception { final int ID_START_2 = 100; final int ID_START_3 = 1000; final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(SqlServerConnector.class, config); @@ -727,7 +727,7 @@ public void changeColumn() throws Exception { final int ID_START_2 = 100; final int ID_START_3 = 1000; final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(SqlServerConnector.class, config); @@ -820,7 +820,7 @@ public void changeColumn() throws Exception { @FixFor("DBZ-1491") public void addDefaultValue() throws Exception { final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(SqlServerConnector.class, config); @@ -859,7 +859,7 @@ public void alterDefaultValue() throws Exception { TestHelper.enableTableCdc(connection, "table_dv"); final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(SqlServerConnector.class, config); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 7ef0a366e..3aa1bfcba 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -963,7 +963,7 @@ public void testIncludeTable() throws Exception { final int TABLES = 1; final int ID_START = 10; final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo.tableb") .build(); connection.execute( @@ -1120,7 +1120,7 @@ public void blacklistColumnWhenCdcColumnsDoNotMatchWithOriginalSnapshot() throws connection.execute("ALTER TABLE table_a ADD blacklisted_column varchar(30)"); final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.COLUMN_EXCLUDE_LIST, "dbo.table_a.blacklisted_column") .build(); @@ -1165,7 +1165,7 @@ public void testColumnExcludeList() throws Exception { TestHelper.enableTableCdc(connection, "blacklist_column_table_b"); final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.COLUMN_EXCLUDE_LIST, "dbo.blacklist_column_table_a.amount") .build(); @@ -1227,7 +1227,7 @@ public void testColumnIncludeList() throws Exception { TestHelper.enableTableCdc(connection, "include_list_column_table_b"); final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.COLUMN_INCLUDE_LIST, ".*id,.*name,dbo.include_list_column_table_b.amount") .build(); @@ -1311,7 +1311,7 @@ public void shouldConsumeEventsWithMaskedHashedColumns() throws Exception { TestHelper.enableTableCdc(connection, "masked_hashed_column_table_b"); final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with("column.mask.hash.SHA-256.with.salt.CzQMA0cB5K", "testDB1.dbo.masked_hashed_column_table_a.name, testDB1.dbo.masked_hashed_column_table_b.name") .build(); @@ -1359,7 +1359,7 @@ public void shouldConsumeEventsWithMaskedAndTruncatedColumns() throws Exception TestHelper.enableTableCdc(connection, "truncated_column_table"); final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with("column.mask.with.12.chars", "testDB1.dbo.masked_hashed_column_table.name") .with("column.truncate.to.4.chars", "testDB1.dbo.truncated_column_table.name") .build(); @@ -1567,7 +1567,7 @@ public void excludeColumnWhenCaptureInstanceExcludesColumns() throws Exception { Arrays.asList("id", "name")); final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(SqlServerConnector.class, config); @@ -1654,7 +1654,7 @@ public void includeColumnsWhenCaptureInstanceExcludesColumnInMiddleOfTable() thr Arrays.asList("id", "name")); final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.COLUMN_INCLUDE_LIST, "dbo.include_list_column_table_a.id,dbo.include_list_column_table_a.name") .build(); @@ -1696,7 +1696,7 @@ public void excludeMultipleColumnsWhenCaptureInstanceExcludesSingleColumn() thro // Exclude the note column on top of the already excluded amount column final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.COLUMN_EXCLUDE_LIST, "dbo.exclude_list_column_table_a.amount,dbo.exclude_list_column_table_a.note") .build(); @@ -1738,7 +1738,7 @@ public void includeMultipleColumnsWhenCaptureInstanceExcludesSingleColumn() thro // Exclude the note column on top of the already excluded amount column final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.COLUMN_INCLUDE_LIST, "dbo.include_list_column_table_a.id,dbo.include_list_column_table_a.name") .build(); @@ -1777,7 +1777,7 @@ public void includeMultipleColumnsWhenCaptureInstanceExcludesSingleColumn() thro @FixFor("DBZ-964") public void shouldPropagateDatabaseDriverProperties() throws Exception { final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with("database.applicationName", "Debezium App DBZ-964") .build(); @@ -2083,7 +2083,7 @@ public void shouldCaptureTableSchema() throws SQLException, InterruptedException TestHelper.enableTableCdc(connection, "table_schema_test"); final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(SqlServerConnector.class, config); @@ -2372,7 +2372,7 @@ public void shouldPropagateSourceTypeByDatatype() throws Exception { TestHelper.enableTableCdc(connection, "dt_table"); final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo.dt_table") .with("datatype.propagate.source.type", ".+\\.NUMERIC,.+\\.VARCHAR,.+\\.REAL,.+\\.DECIMAL") .build(); @@ -2442,7 +2442,7 @@ public void testMaxLsnSelectStatementWithoutLimit() throws Exception { final int ID_START = 10; final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .build(); start(SqlServerConnector.class, config); @@ -2476,7 +2476,7 @@ public void testMaxLsnSelectStatementWithLimit() throws Exception { final int ID_START = 10; final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.MAX_TRANSACTIONS_PER_ITERATION, 1) .build(); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerSkipMessagesWithNoUpdateConfigIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerSkipMessagesWithNoUpdateConfigIT.java index ac63c3624..8cc769323 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerSkipMessagesWithNoUpdateConfigIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerSkipMessagesWithNoUpdateConfigIT.java @@ -32,7 +32,7 @@ public class SqlServerSkipMessagesWithNoUpdateConfigIT extends AbstractConnector private SqlServerConnection connection; private final Configuration.Builder configBuilder = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo.skip_messages_test") .with(SqlServerConnectorConfig.COLUMN_INCLUDE_LIST, "dbo.skip_messages_test.id, dbo.skip_messages_test.white"); @@ -89,7 +89,7 @@ public void shouldSkipEventsWithNoChangeInIncludedColumnsWhenSkipEnabled() throw @FixFor("DBZ-2979") public void shouldSkipEventsWithNoChangeInIncludedColumnsWhenSkipEnabledWithExcludeConfig() throws Exception { Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo.skip_messages_test") .with(SqlServerConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, true) .with(SqlServerConnectorConfig.COLUMN_EXCLUDE_LIST, "dbo.skip_messages_test.black") diff --git a/debezium-core/src/main/java/io/debezium/snapshot/mode/NeverSnapshotter.java b/debezium-core/src/main/java/io/debezium/snapshot/mode/NeverSnapshotter.java index ce9cd8560..7102fe22b 100644 --- a/debezium-core/src/main/java/io/debezium/snapshot/mode/NeverSnapshotter.java +++ b/debezium-core/src/main/java/io/debezium/snapshot/mode/NeverSnapshotter.java @@ -9,6 +9,9 @@ import io.debezium.spi.snapshot.Snapshotter; +/** + * Currently only valid for MySQL. Deprecation is in evaluation for Debezium 3.0 + */ public class NeverSnapshotter implements Snapshotter { @Override @@ -27,13 +30,13 @@ public boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgre } @Override - public boolean shouldStream() { - return true; + public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) { + return false; } @Override - public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) { - return false; + public boolean shouldStream() { + return true; } @Override diff --git a/debezium-core/src/main/java/io/debezium/snapshot/mode/NoDataSnapshotter.java b/debezium-core/src/main/java/io/debezium/snapshot/mode/NoDataSnapshotter.java new file mode 100644 index 000000000..66745e17c --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/snapshot/mode/NoDataSnapshotter.java @@ -0,0 +1,64 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.snapshot.mode; + +import java.util.Map; + +import io.debezium.bean.StandardBeanNames; +import io.debezium.relational.HistorizedRelationalDatabaseSchema; +import io.debezium.schema.DatabaseSchema; +import io.debezium.spi.snapshot.Snapshotter; + +public class NoDataSnapshotter extends BeanAwareSnapshotter implements Snapshotter { + + @Override + public String name() { + return "no_data"; + } + + @Override + public void configure(Map properties) { + + } + + @Override + public boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress) { + return false; + } + + @Override + public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) { + + final DatabaseSchema databaseSchema = beanRegistry.lookupByName(StandardBeanNames.DATABASE_SCHEMA, DatabaseSchema.class); + + if (!databaseSchema.isHistorized()) { + return false; + } + + final HistorizedRelationalDatabaseSchema historizedRelationalDatabaseSchema = (HistorizedRelationalDatabaseSchema) databaseSchema; + if (offsetExists && !snapshotInProgress) { + return historizedRelationalDatabaseSchema.isStorageInitializationExecuted(); + } + + return true; + } + + @Override + public boolean shouldStream() { + return true; + } + + @Override + public boolean shouldSnapshotOnSchemaError() { + return false; + } + + @Override + public boolean shouldSnapshotOnDataError() { + return false; + } + +} diff --git a/debezium-core/src/main/java/io/debezium/snapshot/mode/RecoverySnapshotter.java b/debezium-core/src/main/java/io/debezium/snapshot/mode/RecoverySnapshotter.java new file mode 100644 index 000000000..161a92ce5 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/snapshot/mode/RecoverySnapshotter.java @@ -0,0 +1,20 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.snapshot.mode; + +public class RecoverySnapshotter extends NoDataSnapshotter { + + @Override + public String name() { + return "recovery"; + } + + @Override + public boolean shouldSnapshotOnSchemaError() { + return true; + } + +} diff --git a/debezium-core/src/main/java/io/debezium/snapshot/mode/SchemaOnlyRecoverySnapshotter.java b/debezium-core/src/main/java/io/debezium/snapshot/mode/SchemaOnlyRecoverySnapshotter.java index 162b14650..f486756be 100644 --- a/debezium-core/src/main/java/io/debezium/snapshot/mode/SchemaOnlyRecoverySnapshotter.java +++ b/debezium-core/src/main/java/io/debezium/snapshot/mode/SchemaOnlyRecoverySnapshotter.java @@ -5,16 +5,14 @@ */ package io.debezium.snapshot.mode; -public class SchemaOnlyRecoverySnapshotter extends SchemaOnlySnapshotter { +/** + * @deprecated to be removed in Debezium 3.0, replaced by {{@link RecoverySnapshotter}} + */ +public class SchemaOnlyRecoverySnapshotter extends NoDataSnapshotter { @Override public String name() { return "schema_only_recovery"; } - @Override - public boolean shouldSnapshotOnSchemaError() { - return true; - } - } diff --git a/debezium-core/src/main/java/io/debezium/snapshot/mode/SchemaOnlySnapshotter.java b/debezium-core/src/main/java/io/debezium/snapshot/mode/SchemaOnlySnapshotter.java index 0d521b7fe..543757fe7 100644 --- a/debezium-core/src/main/java/io/debezium/snapshot/mode/SchemaOnlySnapshotter.java +++ b/debezium-core/src/main/java/io/debezium/snapshot/mode/SchemaOnlySnapshotter.java @@ -5,54 +5,14 @@ */ package io.debezium.snapshot.mode; -import java.util.Map; - -import io.debezium.bean.StandardBeanNames; -import io.debezium.relational.HistorizedRelationalDatabaseSchema; -import io.debezium.spi.snapshot.Snapshotter; - -public class SchemaOnlySnapshotter extends BeanAwareSnapshotter implements Snapshotter { +/** + * @deprecated to be removed in Debezium 3.0, replaced by {{@link NoDataSnapshotter}} + */ +@Deprecated +public class SchemaOnlySnapshotter extends NoDataSnapshotter { @Override public String name() { return "schema_only"; } - - @Override - public void configure(Map properties) { - - } - - @Override - public boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress) { - - return false; - } - - @Override - public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) { - - final HistorizedRelationalDatabaseSchema databaseSchema = beanRegistry.lookupByName(StandardBeanNames.DATABASE_SCHEMA, HistorizedRelationalDatabaseSchema.class); - - if (offsetExists && !snapshotInProgress) { - return databaseSchema.isStorageInitializationExecuted(); - } - - return true; - } - - @Override - public boolean shouldStream() { - return true; - } - - @Override - public boolean shouldSnapshotOnSchemaError() { - return false; - } - - @Override - public boolean shouldSnapshotOnDataError() { - return false; - } } diff --git a/debezium-core/src/main/resources/META-INF/services/io.debezium.spi.snapshot.Snapshotter b/debezium-core/src/main/resources/META-INF/services/io.debezium.spi.snapshot.Snapshotter index 75b222a21..3a92ad77d 100644 --- a/debezium-core/src/main/resources/META-INF/services/io.debezium.spi.snapshot.Snapshotter +++ b/debezium-core/src/main/resources/META-INF/services/io.debezium.spi.snapshot.Snapshotter @@ -1,7 +1,8 @@ io.debezium.snapshot.mode.AlwaysSnapshotter io.debezium.snapshot.mode.InitialSnapshotter io.debezium.snapshot.mode.InitialOnlySnapshotter -io.debezium.snapshot.mode.SchemaOnlySnapshotter -io.debezium.snapshot.mode.SchemaOnlyRecoverySnapshotter +io.debezium.snapshot.mode.NoDataSnapshotter +io.debezium.snapshot.mode.RecoverySnapshotter io.debezium.snapshot.mode.WhenNeededSnapshotter -io.debezium.snapshot.mode.NeverSnapshotter \ No newline at end of file +io.debezium.snapshot.mode.NeverSnapshotter +io.debezium.snapshot.mode.SchemaOnlySnapshotter \ No newline at end of file diff --git a/debezium-microbenchmark-oracle/src/main/java/io/debezium/performance/connector/oracle/EndToEndPerf.java b/debezium-microbenchmark-oracle/src/main/java/io/debezium/performance/connector/oracle/EndToEndPerf.java index 84d64014f..58bb556ab 100644 --- a/debezium-microbenchmark-oracle/src/main/java/io/debezium/performance/connector/oracle/EndToEndPerf.java +++ b/debezium-microbenchmark-oracle/src/main/java/io/debezium/performance/connector/oracle/EndToEndPerf.java @@ -113,7 +113,7 @@ public void doSetup() { delete("history.txt"); Configuration connectorConfig = defaultConnectorConfig() - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.TEST") .with(OracleConnectorConfig.LOG_MINING_STRATEGY, LogMiningStrategy.parse(miningStrategy)) .build();