DBZ-4669 Reformat after plugin version consolidation
This commit is contained in:
parent
f0a1fa8a67
commit
8ca5e0a9b5
@ -173,7 +173,7 @@ public String hosts() {
|
||||
|
||||
/**
|
||||
* Initial connection seed which is either a host specification or connection string
|
||||
*
|
||||
*
|
||||
* @return hosts or connection string
|
||||
*/
|
||||
public String connectionSeed() {
|
||||
|
@ -550,7 +550,7 @@ String checkFieldExists(Map<String, Object> doc, String field) {
|
||||
|
||||
/**
|
||||
* Verifies whether a parameter representing path is the same or belongs under this path.
|
||||
*
|
||||
*
|
||||
* @param other - the string representing the other path
|
||||
* @return - true if this path is the same or parent of the path passed
|
||||
*/
|
||||
|
@ -1653,7 +1653,7 @@ public void shouldParseEngineNameWithApostrophes() {
|
||||
.stream()
|
||||
.map(TableId::table)
|
||||
.collect(Collectors.toSet()))
|
||||
.containsOnly("t1", "t2", "t3", "t4");
|
||||
.containsOnly("t1", "t2", "t3", "t4");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -199,18 +199,18 @@ else if (Objects.equals(previousSnapshotSourceField, "last_in_data_collection"))
|
||||
if (storeOnlyCapturedTables) {
|
||||
assertThat(schemaChanges.ddlRecordsForDatabaseOrEmpty("").size()
|
||||
+ schemaChanges.ddlRecordsForDatabaseOrEmpty(DATABASE.getDatabaseName()).size())
|
||||
.isEqualTo(schemaEventsCount);
|
||||
.isEqualTo(schemaEventsCount);
|
||||
assertThat(schemaChanges.ddlRecordsForDatabaseOrEmpty("").size()
|
||||
+ schemaChanges.ddlRecordsForDatabaseOrEmpty(OTHER_DATABASE.getDatabaseName()).size())
|
||||
.isEqualTo(1);
|
||||
.isEqualTo(1);
|
||||
}
|
||||
else {
|
||||
assertThat(schemaChanges.ddlRecordsForDatabaseOrEmpty("").size()
|
||||
+ schemaChanges.ddlRecordsForDatabaseOrEmpty(DATABASE.getDatabaseName()).size())
|
||||
.isEqualTo(schemaEventsCount);
|
||||
.isEqualTo(schemaEventsCount);
|
||||
assertThat(schemaChanges.ddlRecordsForDatabaseOrEmpty("").size()
|
||||
+ schemaChanges.ddlRecordsForDatabaseOrEmpty(OTHER_DATABASE.getDatabaseName()).size())
|
||||
.isEqualTo(useGlobalLock ? 1 : 5);
|
||||
.isEqualTo(useGlobalLock ? 1 : 5);
|
||||
}
|
||||
|
||||
if (!useGlobalLock) {
|
||||
|
@ -91,7 +91,7 @@ public UniqueDatabase(final String serverName, final String databaseName, final
|
||||
* Creates an instance with given Debezium logical name and database name and id suffix same
|
||||
* as another database. This is handy for tests that need multpli databases and can use regex
|
||||
* based whitelisting.
|
||||
|
||||
|
||||
* @param serverName - logical Debezium server name
|
||||
* @param databaseName - the name of the database (prix)
|
||||
* @param sibling - a database whose unique suffix will be used
|
||||
|
@ -36,7 +36,7 @@ public class OracleOffsetContext extends CommonOffsetContext<SourceInfo> {
|
||||
* SCN that was used for the initial consistent snapshot.
|
||||
*
|
||||
* We keep track of this field because it's a cutoff for emitting DDL statements,
|
||||
* in case we start mining _before_ the snapshot SCN to cover transactions that were
|
||||
* in case we start mining _before_ the snapshot SCN to cover transactions that were
|
||||
* ongoing at the time the snapshot was taken.
|
||||
*/
|
||||
private final Scn snapshotScn;
|
||||
|
@ -257,7 +257,7 @@ private void logOnlineRedoLogSizes(OracleConnectorConfig config) throws SQLExcep
|
||||
/**
|
||||
* Computes the start SCN for the first mining session.
|
||||
*
|
||||
* Normally, this would be the snapshot SCN, but if there were pending transactions at the time
|
||||
* Normally, this would be the snapshot SCN, but if there were pending transactions at the time
|
||||
* the snapshot was taken, we'd miss the events in those transactions that have an SCN smaller
|
||||
* than the snapshot SCN.
|
||||
*
|
||||
|
@ -3116,7 +3116,7 @@ public void shouldSafelySnapshotAndStreamWithDatabaseIncludeList() throws Except
|
||||
}
|
||||
|
||||
/**
|
||||
* database include/exclude list are not support (yet) for the Oracle connector; this test is just there to make
|
||||
* database include/exclude list are not support (yet) for the Oracle connector; this test is just there to make
|
||||
* sure that the presence of these (functionally ignored) properties doesn't cause any problems.
|
||||
*/
|
||||
@Test
|
||||
|
@ -30,7 +30,7 @@
|
||||
|
||||
/**
|
||||
* Unit tests for Oracle's database schema history.
|
||||
*
|
||||
*
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
public class OracleSchemaHistoryTest extends AbstractSchemaHistoryTest {
|
||||
|
@ -25,7 +25,7 @@
|
||||
/**
|
||||
* Custom PostgreSQL implementation of the {@link SignalBasedIncrementalSnapshotChangeEventSource} implementation
|
||||
* which performs an explicit schema refresh of a table prior to the incremental snapshot starting.
|
||||
*
|
||||
*
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
public class PostgresSignalBasedIncrementalSnapshotChangeEventSource
|
||||
|
@ -144,7 +144,7 @@ protected void assertEndTransaction(SourceRecord record, String beginTxId, long
|
||||
|
||||
assertThat(end.getArray("data_collections").stream().map(x -> (Struct) x)
|
||||
.collect(Collectors.toMap(x -> x.getString("data_collection"), x -> x.getInt64("event_count"))))
|
||||
.isEqualTo(expectedPerTableCount.entrySet().stream().collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue().longValue())));
|
||||
.isEqualTo(expectedPerTableCount.entrySet().stream().collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue().longValue())));
|
||||
assertThat(offset.get("transaction_id")).isEqualTo(expectedId);
|
||||
}
|
||||
|
||||
|
@ -2083,7 +2083,7 @@ public void shouldDetectPurgedHistory() throws Exception {
|
||||
assertConnectorNotRunning();
|
||||
assertThat(logInterceptor.containsStacktraceElement(
|
||||
"The db history topic or its content is fully or partially missing. Please check database schema history topic configuration and re-execute the snapshot."))
|
||||
.isTrue();
|
||||
.isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -764,7 +764,7 @@ public <T extends Enum<T>> Field withEnum(Class<T> enumType, T defaultOption) {
|
||||
public Field required() {
|
||||
return new Field(name(), displayName(), type(), width(), description(), importance, dependents,
|
||||
defaultValueGenerator, validator, recommender, true, group, allowedValues)
|
||||
.withValidation(Field::isRequired);
|
||||
.withValidation(Field::isRequired);
|
||||
}
|
||||
|
||||
public Field optional() {
|
||||
|
@ -228,7 +228,7 @@ protected static <T, U> BiPredicate<T, U> includedInPatterns(Collection<Pattern>
|
||||
* in the supplied comma-separated list that matches the predicate parameter in a case-insensitive manner.
|
||||
*
|
||||
* @param regexPatterns the comma-separated regular expression pattern (or literal) strings; may not be null
|
||||
|
||||
|
||||
* @return the function that performs the matching
|
||||
* @throws PatternSyntaxException if the string includes an invalid regular expression
|
||||
*/
|
||||
|
@ -150,14 +150,14 @@ public void checkPreconditions() {
|
||||
public void shouldFailToBuildTableSchemaFromNullTable() {
|
||||
new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
|
||||
SchemaBuilder.struct().build(), false, false)
|
||||
.create(topicNamingStrategy, null, null, null, null);
|
||||
.create(topicNamingStrategy, null, null, null, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldBuildTableSchemaFromTable() {
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
|
||||
SchemaBuilder.struct().build(), false, false)
|
||||
.create(topicNamingStrategy, table, null, null, null);
|
||||
.create(topicNamingStrategy, table, null, null, null);
|
||||
assertThat(schema).isNotNull();
|
||||
}
|
||||
|
||||
@ -167,7 +167,7 @@ public void shouldBuildCorrectSchemaNames() {
|
||||
// table id with catalog and schema
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
|
||||
SchemaBuilder.struct().build(), false, false)
|
||||
.create(topicNamingStrategy, table, null, null, null);
|
||||
.create(topicNamingStrategy, table, null, null, null);
|
||||
assertThat(schema).isNotNull();
|
||||
assertThat(schema.keySchema().name()).isEqualTo("test.schema.table.Key");
|
||||
assertThat(schema.valueSchema().name()).isEqualTo("test.schema.table.Value");
|
||||
@ -179,7 +179,7 @@ public void shouldBuildCorrectSchemaNames() {
|
||||
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
|
||||
SchemaBuilder.struct().build(), false, false)
|
||||
.create(new DefaultTopicNamingStrategy(topicProperties), table, null, null, null);
|
||||
.create(new DefaultTopicNamingStrategy(topicProperties), table, null, null, null);
|
||||
|
||||
assertThat(schema).isNotNull();
|
||||
assertThat(schema.keySchema().name()).isEqualTo("test.testDb.testTable.Key");
|
||||
@ -192,7 +192,7 @@ public void shouldBuildCorrectSchemaNames() {
|
||||
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
|
||||
SchemaBuilder.struct().build(), false, false)
|
||||
.create(topicNamingStrategy, table, null, null, null);
|
||||
.create(topicNamingStrategy, table, null, null, null);
|
||||
|
||||
assertThat(schema).isNotNull();
|
||||
assertThat(schema.keySchema().name()).isEqualTo("test.testSchema.testTable.Key");
|
||||
@ -205,7 +205,7 @@ public void shouldBuildCorrectSchemaNames() {
|
||||
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
|
||||
SchemaBuilder.struct().build(), false, false)
|
||||
.create(topicNamingStrategy, table, null, null, null);
|
||||
.create(topicNamingStrategy, table, null, null, null);
|
||||
|
||||
assertThat(schema).isNotNull();
|
||||
assertThat(schema.keySchema().name()).isEqualTo("test.testTable.Key");
|
||||
@ -218,7 +218,7 @@ public void shouldBuildCorrectSchemaNamesInMultiPartitionMode() {
|
||||
// table id with catalog and schema
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
|
||||
SchemaBuilder.struct().build(), false, true)
|
||||
.create(new SchemaTopicNamingStrategy(topicProperties, true), table, null, null, null);
|
||||
.create(new SchemaTopicNamingStrategy(topicProperties, true), table, null, null, null);
|
||||
assertThat(schema).isNotNull();
|
||||
assertThat(schema.keySchema().name()).isEqualTo("test.catalog.schema.table.Key");
|
||||
assertThat(schema.valueSchema().name()).isEqualTo("test.catalog.schema.table.Value");
|
||||
@ -229,7 +229,7 @@ public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() {
|
||||
table = table.edit().setPrimaryKeyNames().create();
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
|
||||
SchemaBuilder.struct().build(), false, false)
|
||||
.create(topicNamingStrategy, table, null, null, null);
|
||||
.create(topicNamingStrategy, table, null, null, null);
|
||||
assertThat(schema).isNotNull();
|
||||
// Check the keys ...
|
||||
assertThat(schema.keySchema()).isNull();
|
||||
@ -290,7 +290,7 @@ public void shouldSanitizeFieldNamesAndBuildTableSchemaFromTableWithoutPrimaryKe
|
||||
table = table.edit().setPrimaryKeyNames().create();
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
|
||||
SchemaBuilder.struct().build(), true, false)
|
||||
.create(topicNamingStrategy, table, null, null, null);
|
||||
.create(topicNamingStrategy, table, null, null, null);
|
||||
assertThat(schema).isNotNull();
|
||||
// Check the keys ...
|
||||
assertThat(schema.keySchema()).isNull();
|
||||
@ -358,7 +358,7 @@ public void shouldSanitizeFieldNamesAndValidateSerialization() {
|
||||
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
|
||||
SchemaBuilder.struct().build(), true, false)
|
||||
.create(topicNamingStrategy, table, null, null, null);
|
||||
.create(topicNamingStrategy, table, null, null, null);
|
||||
|
||||
Struct key = (Struct) schema.keyFromColumnData(keyData);
|
||||
Struct value = schema.valueFromColumnData(data);
|
||||
@ -377,7 +377,7 @@ public void shouldBuildTableSchemaFromTableWithCustomKey() {
|
||||
table = table.edit().setPrimaryKeyNames().create();
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
|
||||
SchemaBuilder.struct().build(), false, false)
|
||||
.create(topicNamingStrategy, table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3", null));
|
||||
.create(topicNamingStrategy, table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3", null));
|
||||
assertThat(schema).isNotNull();
|
||||
Schema keys = schema.keySchema();
|
||||
assertThat(keys).isNotNull();
|
||||
@ -391,7 +391,7 @@ public void shouldBuildTableSchemaFromTableWithCustomKey() {
|
||||
public void shouldOverrideIdentityKey() {
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
|
||||
SchemaBuilder.struct().build(), false, false)
|
||||
.create(topicNamingStrategy, table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3", null));
|
||||
.create(topicNamingStrategy, table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3", null));
|
||||
assertThat(schema).isNotNull();
|
||||
Schema keys = schema.keySchema();
|
||||
assertThat(keys).isNotNull();
|
||||
@ -406,7 +406,7 @@ public void shouldOverrideIdentityKey() {
|
||||
public void shouldFallbackToIdentyKeyWhenCustomMapperIsNull() {
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
|
||||
SchemaBuilder.struct().build(), false, false)
|
||||
.create(topicNamingStrategy, table, null, null, null);
|
||||
.create(topicNamingStrategy, table, null, null, null);
|
||||
assertThat(schema).isNotNull();
|
||||
Schema keys = schema.keySchema();
|
||||
assertThat(keys).isNotNull();
|
||||
@ -438,7 +438,7 @@ public void customKeyMapperShouldMapMultipleTables() {
|
||||
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
|
||||
SchemaBuilder.struct().build(), false, false)
|
||||
.create(topicNamingStrategy, table, null, null, keyMapper);
|
||||
.create(topicNamingStrategy, table, null, null, keyMapper);
|
||||
|
||||
assertThat(schema).isNotNull();
|
||||
Schema keys = schema.keySchema();
|
||||
@ -450,7 +450,7 @@ public void customKeyMapperShouldMapMultipleTables() {
|
||||
|
||||
TableSchema schema2 = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
|
||||
SchemaBuilder.struct().build(), false, false)
|
||||
.create(topicNamingStrategy, table2, null, null, keyMapper);
|
||||
.create(topicNamingStrategy, table2, null, null, keyMapper);
|
||||
|
||||
assertThat(schema2).isNotNull();
|
||||
Schema key2 = schema2.keySchema();
|
||||
@ -478,7 +478,7 @@ public void defaultKeyMapperShouldOrderKeyColumnsBasedOnPrimaryKeyColumnNamesOrd
|
||||
|
||||
TableSchema schema2 = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
|
||||
SchemaBuilder.struct().build(), false, false)
|
||||
.create(topicNamingStrategy, table2, null, null, null);
|
||||
.create(topicNamingStrategy, table2, null, null, null);
|
||||
|
||||
Schema key2 = schema2.keySchema();
|
||||
assertThat(key2).isNotNull();
|
||||
@ -510,7 +510,7 @@ public void mapperConvertersShouldLeaveEmptyDatesAsZero() {
|
||||
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
|
||||
SchemaBuilder.struct().build(), false, false)
|
||||
.create(topicNamingStrategy, table2, null, mappers, null);
|
||||
.create(topicNamingStrategy, table2, null, mappers, null);
|
||||
|
||||
Struct value = schema.valueFromColumnData(data);
|
||||
assertThat(value.get("C1")).isEqualTo(0);
|
||||
|
@ -129,9 +129,9 @@ public void notConsistentConfigurationWillThrowConnectionException() {
|
||||
|
||||
assertThatThrownBy(
|
||||
() -> configureTransformation("inventory.orders,inventory.products", "inventory.orders:purchaser,inventory.products:product", "prod:2,purchaser:2"))
|
||||
.isInstanceOf(ComputePartitionException.class)
|
||||
.hasMessageContaining(
|
||||
"Unable to validate config. partition.data-collections.partition.num.mappings and partition.data-collections.field.mappings has different tables defined");
|
||||
.isInstanceOf(ComputePartitionException.class)
|
||||
.hasMessageContaining(
|
||||
"Unable to validate config. partition.data-collections.partition.num.mappings and partition.data-collections.field.mappings has different tables defined");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -151,9 +151,9 @@ public void zeroAsPartitionNumberWillThrowConnectionException() {
|
||||
assertThatThrownBy(
|
||||
() -> configureTransformation("inventory.orders,inventory.products", "inventory.orders:purchaser,inventory.products:products",
|
||||
"inventory.products:0,inventory.orders:2"))
|
||||
.isInstanceOf(ComputePartitionException.class)
|
||||
.hasMessageContaining(
|
||||
"Unable to validate config. partition.data-collections.partition.num.mappings: partition number for 'inventory.products' must be positive");
|
||||
.isInstanceOf(ComputePartitionException.class)
|
||||
.hasMessageContaining(
|
||||
"Unable to validate config. partition.data-collections.partition.num.mappings: partition number for 'inventory.products' must be positive");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -161,9 +161,9 @@ public void negativeAsPartitionNumberWillThrowConnectionException() {
|
||||
assertThatThrownBy(
|
||||
() -> configureTransformation("inventory.orders,inventory.products", "inventory.orders:purchaser,inventory.products:products",
|
||||
"inventory.products:-3,inventory.orders:2"))
|
||||
.isInstanceOf(ComputePartitionException.class)
|
||||
.hasMessageContaining(
|
||||
"Unable to validate config. partition.data-collections.partition.num.mappings: partition number for 'inventory.products' must be positive");
|
||||
.isInstanceOf(ComputePartitionException.class)
|
||||
.hasMessageContaining(
|
||||
"Unable to validate config. partition.data-collections.partition.num.mappings: partition number for 'inventory.products' must be positive");
|
||||
}
|
||||
|
||||
private SourceRecord buildSourceRecord(String connector, String db, String schema, String tableName, Struct row, Envelope.Operation operation) {
|
||||
|
@ -1131,7 +1131,7 @@ protected void assertEndTransaction(SourceRecord record, String expectedTxId, lo
|
||||
|
||||
assertThat(end.getArray("data_collections").stream().map(x -> (Struct) x)
|
||||
.collect(Collectors.toMap(x -> x.getString("data_collection"), x -> x.getInt64("event_count"))))
|
||||
.isEqualTo(expectedPerTableCount.entrySet().stream().collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue().longValue())));
|
||||
.isEqualTo(expectedPerTableCount.entrySet().stream().collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue().longValue())));
|
||||
assertThat(offset.get("transaction_id")).isEqualTo(expectedTxId);
|
||||
}
|
||||
|
||||
|
@ -136,7 +136,7 @@ public void produceOutboxBuildItem(CombinedIndexBuildItem index,
|
||||
integrationConfiguredProducer.produce(
|
||||
new HibernateOrmIntegrationStaticConfiguredBuildItem(DEBEZIUM_OUTBOX,
|
||||
puDescriptor.getPersistenceUnitName())
|
||||
.setXmlMappingRequired(true));
|
||||
.setXmlMappingRequired(true));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -43,10 +43,10 @@ void init() {
|
||||
|
||||
/**
|
||||
* Get a subset of the configuration properties that matches the given prefix.
|
||||
*
|
||||
*
|
||||
* @param config The global configuration object to extract the subset from.
|
||||
* @param prefix The prefix to filter property names.
|
||||
*
|
||||
*
|
||||
* @return A subset of the original configuration properties containing property names
|
||||
* without the prefix.
|
||||
*/
|
||||
|
@ -27,7 +27,7 @@ public ConnectorCompletedEvent(boolean success, String message, Throwable error)
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @return true if the connector was completed successfully
|
||||
*/
|
||||
public boolean isSuccess() {
|
||||
@ -35,7 +35,7 @@ public boolean isSuccess() {
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @return message associated with connection completion
|
||||
*/
|
||||
public String getMessage() {
|
||||
@ -43,7 +43,7 @@ public String getMessage() {
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @return optional error in case the connector has not started successfully or was terminated with an error
|
||||
*/
|
||||
public Optional<Throwable> getError() {
|
||||
|
@ -35,7 +35,7 @@
|
||||
/**
|
||||
* Integration test for verifying that the Kafka sink adapter can stream change events from a PostgreSQL database
|
||||
* to a configured Apache Kafka broker.
|
||||
*
|
||||
*
|
||||
* @author Alfusainey Jallow
|
||||
*/
|
||||
@QuarkusTest
|
||||
|
@ -52,7 +52,7 @@
|
||||
import io.quarkus.test.junit.QuarkusTest;
|
||||
|
||||
/**
|
||||
* Integration test that verifies basic reading from PostgreSQL database and writing to a Google Cloud PubSub stream running on a Google PubSub Emulator
|
||||
* Integration test that verifies basic reading from PostgreSQL database and writing to a Google Cloud PubSub stream running on a Google PubSub Emulator
|
||||
*
|
||||
* @author Jiri Pechanec
|
||||
*/
|
||||
|
@ -12,19 +12,19 @@
|
||||
public interface RedisClient {
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @throws RedisClientConnectionException
|
||||
*/
|
||||
void disconnect();
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @throws RedisClientConnectionException
|
||||
*/
|
||||
void close();
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @param key
|
||||
* @param hash
|
||||
* @return
|
||||
@ -33,7 +33,7 @@ public interface RedisClient {
|
||||
String xadd(String key, Map<String, String> hash);
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @param hashes
|
||||
* @return
|
||||
* @throws RedisClientConnectionException
|
||||
@ -41,7 +41,7 @@ public interface RedisClient {
|
||||
List<String> xadd(List<SimpleEntry<String, Map<String, String>>> hashes);
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @param key
|
||||
* @return
|
||||
* @throws RedisClientConnectionException
|
||||
@ -49,7 +49,7 @@ public interface RedisClient {
|
||||
List<Map<String, String>> xrange(String key);
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @param key
|
||||
* @return
|
||||
* @throws RedisClientConnectionException
|
||||
@ -57,7 +57,7 @@ public interface RedisClient {
|
||||
long xlen(String key);
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @param key
|
||||
* @return
|
||||
* @throws RedisClientConnectionException
|
||||
@ -65,7 +65,7 @@ public interface RedisClient {
|
||||
Map<String, String> hgetAll(String key);
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @param key
|
||||
* @param field
|
||||
* @param value
|
||||
@ -75,7 +75,7 @@ public interface RedisClient {
|
||||
long hset(byte[] key, byte[] field, byte[] value);
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @param replicas
|
||||
* @param timeout
|
||||
* @return
|
||||
@ -84,7 +84,7 @@ public interface RedisClient {
|
||||
long waitReplicas(int replicas, long timeout);
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @return
|
||||
* @throws RedisClientConnectionException
|
||||
*/
|
||||
|
@ -34,7 +34,7 @@ public class RedisConnection {
|
||||
private boolean sslEnabled;
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @param address
|
||||
* @param user
|
||||
* @param password
|
||||
@ -52,7 +52,7 @@ public RedisConnection(String address, String user, String password, int connect
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @param clientName
|
||||
* @param waitEnabled
|
||||
* @param waitTimeout
|
||||
|
@ -63,7 +63,7 @@ default void assertMinimalRecordsCount(String topic, int count) {
|
||||
consumer.seekToBeginning(consumer.assignment());
|
||||
assertThat(
|
||||
records.count()).withFailMessage("Expecting topic '%s' to have at least <%d> messages but it had <%d>.", topic, count, records.count())
|
||||
.isGreaterThanOrEqualTo(count);
|
||||
.isGreaterThanOrEqualTo(count);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -307,8 +307,8 @@ private void addShard(MongoDbReplicaSet shard) {
|
||||
.pollInterval(1, SECONDS)
|
||||
.until(() -> stream(arbitraryRouter.eval("db.adminCommand({listShards: 1})")
|
||||
.path("shards"))
|
||||
.anyMatch(s -> s.get("_id").asText().equals(shard.getName()) &&
|
||||
s.get("state").asInt() == 1));
|
||||
.anyMatch(s -> s.get("_id").asText().equals(shard.getName()) &&
|
||||
s.get("state").asInt() == 1));
|
||||
}
|
||||
|
||||
private Stream<Startable> stream() {
|
||||
|
Loading…
Reference in New Issue
Block a user