DBZ-2439 Remove redundant modifiers

Mark Lambert 2022-11-09 14:43:30 -05:00 committed by Jiri Pechanec
parent 215211f5fb
commit e5d16b14a6
142 changed files with 517 additions and 517 deletions
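The diffs below mechanically apply a handful of Java language rules. The central one: every member of an interface is implicitly public, fields are additionally implicitly static final, methods implicitly abstract, and nested types implicitly static. A hypothetical before/after sketch (an Example interface, not taken from this commit):

// Every modifier in ExampleBefore is already implied by the interface context,
// so ExampleAfter declares the identical API.
interface ExampleBefore {
    public static final String NAME = "example"; // fields: implicitly public static final
    public abstract String value();              // methods: implicitly public abstract
    public static interface Callback {           // member types: implicitly public static
        void done();
    }
}

interface ExampleAfter {
    String NAME = "example";
    String value();
    interface Callback {
        void done();
    }
}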


@ -16,12 +16,12 @@
@Incubating
public interface ChangeEvent<K, V> {
public K key();
K key();
public V value();
V value();
/**
* @return A name of the logical destination for which the event is intended
*/
public String destination();
String destination();
}


@ -45,12 +45,12 @@
@Incubating
public interface DebeziumEngine<R> extends Runnable, Closeable {
public static final String OFFSET_FLUSH_INTERVAL_MS_PROP = "offset.flush.interval.ms";
String OFFSET_FLUSH_INTERVAL_MS_PROP = "offset.flush.interval.ms";
/**
* A callback function to be notified when the connector completes.
*/
public interface CompletionCallback {
interface CompletionCallback {
/**
* Handle the completion of the embedded connector engine.
*
@ -65,7 +65,7 @@ public interface CompletionCallback {
/**
* Callback function which informs users about the various stages a connector goes through during startup
*/
public interface ConnectorCallback {
interface ConnectorCallback {
/**
* Called after a connector has been successfully started by the engine; i.e. {@link SourceConnector#start(Map)} has
@ -104,7 +104,7 @@ default void taskStopped() {
* Contract passed to {@link ChangeConsumer}s, allowing them to commit single records as they have been processed
* and to signal that offsets may be flushed eventually.
*/
public static interface RecordCommitter<R> {
interface RecordCommitter<R> {
/**
* Marks a single record as processed, must be called for each
@ -142,7 +142,7 @@ public static interface RecordCommitter<R> {
* Contract that should be passed to {@link RecordCommitter#markProcessed(Object, Offsets)} for marking a record
* as processed with updated offsets.
*/
public interface Offsets {
interface Offsets {
/**
* Associates a key with a specific value, overwrites the value if the key is already present.
@ -157,7 +157,7 @@ public interface Offsets {
* A contract invoked by the embedded engine when it has received a batch of change records to be processed. Allows
* processing multiple records in one go, acknowledging their processing once that's done.
*/
public static interface ChangeConsumer<R> {
interface ChangeConsumer<R> {
/**
* Handles a batch of records, calling the {@link RecordCommitter#markProcessed(Object)}
@ -180,7 +180,7 @@ default boolean supportsTombstoneEvents() {
/**
* A builder to set up and create {@link DebeziumEngine} instances.
*/
public static interface Builder<R> {
interface Builder<R> {
/**
* Call the specified function for every {@link SourceRecord data change event} read from the source database.
@ -269,7 +269,7 @@ public static interface Builder<R> {
*
* @return the new builder; never null
*/
public static <T> Builder<ChangeEvent<T, T>> create(Class<? extends SerializationFormat<T>> format) {
static <T> Builder<ChangeEvent<T, T>> create(Class<? extends SerializationFormat<T>> format) {
return create(format, format);
}
@ -281,13 +281,13 @@ public static <T> Builder<ChangeEvent<T, T>> create(Class<? extends Serializatio
*
* @return the new builder; never null
*/
public static <K, V> Builder<ChangeEvent<K, V>> create(Class<? extends SerializationFormat<K>> keyFormat,
Class<? extends SerializationFormat<V>> valueFormat) {
static <K, V> Builder<ChangeEvent<K, V>> create(Class<? extends SerializationFormat<K>> keyFormat,
Class<? extends SerializationFormat<V>> valueFormat) {
return create(KeyValueChangeEventFormat.of(keyFormat, valueFormat));
}
public static <S, T, K extends SerializationFormat<S>, V extends SerializationFormat<T>> Builder<ChangeEvent<S, T>> create(KeyValueChangeEventFormat<K, V> format) {
static <S, T, K extends SerializationFormat<S>, V extends SerializationFormat<T>> Builder<ChangeEvent<S, T>> create(KeyValueChangeEventFormat<K, V> format) {
final ServiceLoader<BuilderFactory> loader = ServiceLoader.load(BuilderFactory.class);
final Iterator<BuilderFactory> iterator = loader.iterator();
if (!iterator.hasNext()) {
@ -306,7 +306,7 @@ public static <S, T, K extends SerializationFormat<S>, V extends SerializationFo
*
* @return the new builder; never null
*/
public static <T, V extends SerializationFormat<T>> Builder<RecordChangeEvent<T>> create(ChangeEventFormat<V> format) {
static <T, V extends SerializationFormat<T>> Builder<RecordChangeEvent<T>> create(ChangeEventFormat<V> format) {
final ServiceLoader<BuilderFactory> loader = ServiceLoader.load(BuilderFactory.class);
final Iterator<BuilderFactory> iterator = loader.iterator();
if (!iterator.hasNext()) {
@ -323,7 +323,7 @@ public static <T, V extends SerializationFormat<T>> Builder<RecordChangeEvent<T>
* Internal contract between the API and implementation, for bootstrapping the latter.
* Not intended for direct usage by application code.
*/
public static interface BuilderFactory {
interface BuilderFactory {
/**
* Prescribe the output format used by the {@link DebeziumEngine}.


@ -15,5 +15,5 @@
@Incubating
public interface RecordChangeEvent<V> {
public V record();
V record();
}


@ -24,7 +24,7 @@ public interface OffsetCommitPolicy {
* An {@link OffsetCommitPolicy} that will commit offsets as frequently as possible. This may result in reduced
* performance, but it has the least potential for seeing source records more than once upon restart.
*/
public static class AlwaysCommitOffsetPolicy implements OffsetCommitPolicy {
class AlwaysCommitOffsetPolicy implements OffsetCommitPolicy {
@Override
public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) {
@ -37,7 +37,7 @@ public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration time
* time is less than {@code 0} then the policy will behave as {@link AlwaysCommitOffsetPolicy}.
* @see io.debezium.engine.DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS
*/
public static class PeriodicCommitOffsetPolicy implements OffsetCommitPolicy {
class PeriodicCommitOffsetPolicy implements OffsetCommitPolicy {
private final Duration minimumTime;


@ -28,7 +28,7 @@ interface Converter {
/**
* Callback for registering a converter.
*/
public interface ConverterRegistration<S> {
interface ConverterRegistration<S> {
/**
* Registers the given schema and converter for the current field. Should not be


@ -296,7 +296,7 @@ protected Supplier<MongoClient> preferredClientFor(ReplicaSet replicaSet, ReadPr
}
@FunctionalInterface
public static interface PreferredConnectFailed {
public interface PreferredConnectFailed {
void failed(int attemptNumber, int attemptsRemaining, Throwable error);
}
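Note that public survives here: inside a class, unlike inside an interface, member visibility defaults to package-private, so public is meaningful, while the member interface is still implicitly static. A minimal sketch with a hypothetical Owner class:

public class Owner {
    // A member interface of a class is implicitly static, so "static" is redundant;
    // "public" stays, because class members default to package-private visibility.
    @FunctionalInterface
    public interface Listener { // was: public static interface Listener
        void onEvent(String name);
    }
}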


@ -36,7 +36,7 @@ public final class FieldSelector {
* This filter is designed to exclude or rename fields in a document.
*/
@ThreadSafe
public static interface FieldFilter {
public interface FieldFilter {
/**
* Applies this filter to the given document to exclude or rename fields.


@ -21,7 +21,7 @@
class JsonSerialization {
@FunctionalInterface
public static interface Transformer extends Function<BsonDocument, String> {
public interface Transformer extends Function<BsonDocument, String> {
}
private static final String ID_FIELD_NAME = "_id";
@ -48,7 +48,7 @@ public static interface Transformer extends Function<BsonDocument, String> {
private final Transformer transformer;
public JsonSerialization() {
JsonSerialization() {
transformer = (doc) -> doc.toJson(COMPACT_JSON_SETTINGS);
}
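This case differs from the interface ones: the compiler does not treat public as redundant on a constructor. It is redundant in practice because JsonSerialization is a package-private class, so code outside the package cannot name it, and the constructor's effective visibility is capped at package level anyway. A hypothetical sketch:

class Helper { // package-private, like JsonSerialization
    private final String name;

    // was: public Helper() -- "public" granted nothing, since Helper itself
    // is invisible outside its package.
    Helper() {
        this.name = "helper";
    }

    String name() {
        return name;
    }
}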


@ -57,7 +57,7 @@ public class MongoDbConnectorConfig extends CommonConnectorConfig {
/**
* The set of predefined SnapshotMode options or aliases.
*/
public static enum SnapshotMode implements EnumeratedValue {
public enum SnapshotMode implements EnumeratedValue {
/**
* Always perform an initial snapshot when starting.
@ -72,7 +72,7 @@ public static enum SnapshotMode implements EnumeratedValue {
private final String value;
private final boolean includeData;
private SnapshotMode(String value, boolean includeData) {
SnapshotMode(String value, boolean includeData) {
this.value = value;
this.includeData = includeData;
}
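Two rules drive the enum edits in this file: a nested enum is implicitly static, and enum constructors are implicitly private (public or protected would not even compile). A hypothetical sketch:

public class Config {
    public enum Mode { // was: public static enum Mode -- nested enums are implicitly static
        INITIAL("initial"),
        NEVER("never");

        private final String value;

        Mode(String value) { // was: private Mode(...) -- enum constructors are implicitly private
            this.value = value;
        }

        public String getValue() {
            return value;
        }
    }
}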
@ -124,7 +124,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
/**
* The set of different ways the connector can capture changes.
*/
public static enum CaptureMode implements EnumeratedValue {
public enum CaptureMode implements EnumeratedValue {
/**
* Change capture based on MongoDB Change Streams support.
@ -155,7 +155,7 @@ public static enum CaptureMode implements EnumeratedValue {
private final boolean fullUpdate;
private final boolean includePreImage;
private CaptureMode(String value, boolean changeStreams, boolean fullUpdate, boolean includePreImage) {
CaptureMode(String value, boolean changeStreams, boolean fullUpdate, boolean includePreImage) {
this.value = value;
this.changeStreams = changeStreams;
this.fullUpdate = fullUpdate;


@ -151,7 +151,7 @@ public boolean deduplicationNeeded() {
}
private String arrayToSerializedString(Object[] array) {
try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(array);
return HexConverter.convertToHexString(bos.toByteArray());
@ -162,7 +162,7 @@ private String arrayToSerializedString(Object[] array) {
}
private Object[] serializedStringToArray(String field, String serialized) {
try (final ByteArrayInputStream bis = new ByteArrayInputStream(HexConverter.convertFromHex(serialized));
try (ByteArrayInputStream bis = new ByteArrayInputStream(HexConverter.convertFromHex(serialized));
ObjectInputStream ois = new ObjectInputStream(bis)) {
return (Object[]) ois.readObject();
}
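The final dropped from these try-with-resources declarations is likewise redundant: a resource variable is implicitly final (JLS 14.20.3). A self-contained sketch of the same pattern, with hypothetical names:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

class ArraySerializer {
    static byte[] serialize(Object[] array) throws IOException {
        // Resource variables are implicitly final; spelling out "final" changes nothing.
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(array);
            return bos.toByteArray();
        }
    }
}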


@ -152,7 +152,7 @@ static final class SessionTransactionId {
public final String lsid;
public final Long txnNumber;
public SessionTransactionId(String lsid, Long txnNumber) {
SessionTransactionId(String lsid, Long txnNumber) {
super();
this.txnNumber = txnNumber;
this.lsid = lsid;


@ -63,7 +63,7 @@ class EventBuffer {
*/
private BinlogPosition forwardTillPosition;
public EventBuffer(int capacity, MySqlStreamingChangeEventSource streamingChangeEventSource, ChangeEventSourceContext changeEventSourceContext) {
EventBuffer(int capacity, MySqlStreamingChangeEventSource streamingChangeEventSource, ChangeEventSourceContext changeEventSourceContext) {
this.capacity = capacity;
this.buffer = new ArrayBlockingQueue<>(capacity);
this.streamingChangeEventSource = streamingChangeEventSource;


@ -55,7 +55,7 @@ public class MySqlConnectorConfig extends HistorizedRelationalDatabaseConnectorC
/**
* The set of predefined BigIntUnsignedHandlingMode options or aliases.
*/
public static enum BigIntUnsignedHandlingMode implements EnumeratedValue {
public enum BigIntUnsignedHandlingMode implements EnumeratedValue {
/**
* Represent {@code BIGINT UNSIGNED} values as precise {@link BigDecimal} values, which are
* represented in change events in a binary form. This is precise but difficult to use.
@ -70,7 +70,7 @@ public static enum BigIntUnsignedHandlingMode implements EnumeratedValue {
private final String value;
private BigIntUnsignedHandlingMode(String value) {
BigIntUnsignedHandlingMode(String value) {
this.value = value;
}
@ -127,7 +127,7 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
/**
* The set of predefined SnapshotMode options or aliases.
*/
public static enum SnapshotMode implements EnumeratedValue {
public enum SnapshotMode implements EnumeratedValue {
/**
* Perform a snapshot when it is needed.
@ -172,8 +172,8 @@ public static enum SnapshotMode implements EnumeratedValue {
private final boolean shouldSnapshotOnSchemaError;
private final boolean shouldSnapshotOnDataError;
private SnapshotMode(String value, boolean includeSchema, boolean includeData, boolean shouldStream, boolean shouldSnapshotOnSchemaError,
boolean shouldSnapshotOnDataError) {
SnapshotMode(String value, boolean includeSchema, boolean includeData, boolean shouldStream, boolean shouldSnapshotOnSchemaError,
boolean shouldSnapshotOnDataError) {
this.value = value;
this.includeSchema = includeSchema;
this.includeData = includeData;
@ -265,7 +265,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
}
}
public static enum SnapshotNewTables implements EnumeratedValue {
public enum SnapshotNewTables implements EnumeratedValue {
/**
* Do not snapshot new tables
*/
@ -278,7 +278,7 @@ public static enum SnapshotNewTables implements EnumeratedValue {
private final String value;
private SnapshotNewTables(String value) {
SnapshotNewTables(String value) {
this.value = value;
}
@ -325,7 +325,7 @@ public static SnapshotNewTables parse(String value, String defaultValue) {
/**
* The set of predefined Snapshot Locking Mode options.
*/
public static enum SnapshotLockingMode implements EnumeratedValue {
public enum SnapshotLockingMode implements EnumeratedValue {
/**
* This mode will block all writes for the entire duration of the snapshot.
@ -361,7 +361,7 @@ public static enum SnapshotLockingMode implements EnumeratedValue {
private final String value;
private SnapshotLockingMode(String value) {
SnapshotLockingMode(String value) {
this.value = value;
}
@ -434,7 +434,7 @@ public static SnapshotLockingMode parse(String value, String defaultValue) {
/**
* The set of predefined SecureConnectionMode options or aliases.
*/
public static enum SecureConnectionMode implements EnumeratedValue {
public enum SecureConnectionMode implements EnumeratedValue {
/**
* Establish an unencrypted connection.
*/
@ -463,7 +463,7 @@ public static enum SecureConnectionMode implements EnumeratedValue {
private final String value;
private SecureConnectionMode(String value) {
SecureConnectionMode(String value) {
this.value = value;
}


@ -14,7 +14,7 @@ final class MySqlHistoryRecordComparator extends HistoryRecordComparator {
private final Predicate<String> gtidSourceFilter;
public MySqlHistoryRecordComparator(Predicate<String> gtidSourceFilter) {
MySqlHistoryRecordComparator(Predicate<String> gtidSourceFilter) {
super();
this.gtidSourceFilter = gtidSourceFilter;
}


@ -509,7 +509,7 @@ private Statement createStatementWithLargeResultSet() throws SQLException {
*/
private static class MySqlSnapshotContext extends RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> {
public MySqlSnapshotContext(MySqlPartition partition) throws SQLException {
MySqlSnapshotContext(MySqlPartition partition) throws SQLException {
super(partition, "");
}
}


@ -19,8 +19,8 @@ class MySqlSnapshotChangeEventSourceMetrics extends DefaultSnapshotChangeEventSo
private final AtomicBoolean holdingGlobalLock = new AtomicBoolean();
public MySqlSnapshotChangeEventSourceMetrics(MySqlTaskContext taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider eventMetadataProvider) {
MySqlSnapshotChangeEventSourceMetrics(MySqlTaskContext taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider eventMetadataProvider) {
super(taskContext, changeEventQueueMetrics, eventMetadataProvider);
}


@ -177,7 +177,7 @@ public boolean equals(Object obj) {
}
@FunctionalInterface
private static interface BinlogChangeEmitter<T> {
private interface BinlogChangeEmitter<T> {
void emit(TableId tableId, T data) throws InterruptedException;
}


@ -69,7 +69,7 @@
public class MySqlValueConverters extends JdbcValueConverters {
@FunctionalInterface
public static interface ParsingErrorHandler {
public interface ParsingErrorHandler {
void error(String message, Exception exception);
}


@ -254,7 +254,7 @@ public void tableWithDatetime() throws Exception {
Testing.Print.enable();
final int ROWS = 10;
try (final JdbcConnection connection = databaseConnection()) {
try (JdbcConnection connection = databaseConnection()) {
connection.setAutoCommit(false);
for (int i = 0; i < ROWS; i++) {
connection.executeWithoutCommitting(String.format(
@ -307,7 +307,7 @@ public void tableWithZeroDate() throws Exception {
Testing.Print.enable();
final LogInterceptor logInterceptor = new LogInterceptor(MySqlBinaryProtocolFieldReader.class);
try (final JdbcConnection connection = databaseConnection()) {
try (JdbcConnection connection = databaseConnection()) {
connection.setAutoCommit(false);
connection.executeWithoutCommitting("INSERT INTO a_date (pk) VALUES (1)");
connection.commit();


@ -3435,19 +3435,19 @@ private void assertColumn(Table table, String name, String typeName, int jdbcTyp
class MysqlDdlParserWithSimpleTestListener extends MySqlAntlrDdlParser {
public MysqlDdlParserWithSimpleTestListener(DdlChanges changesListener) {
MysqlDdlParserWithSimpleTestListener(DdlChanges changesListener) {
this(changesListener, false);
}
public MysqlDdlParserWithSimpleTestListener(DdlChanges changesListener, TableFilter tableFilter) {
MysqlDdlParserWithSimpleTestListener(DdlChanges changesListener, TableFilter tableFilter) {
this(changesListener, false, false, tableFilter);
}
public MysqlDdlParserWithSimpleTestListener(DdlChanges changesListener, boolean includeViews) {
MysqlDdlParserWithSimpleTestListener(DdlChanges changesListener, boolean includeViews) {
this(changesListener, includeViews, false, TableFilter.includeAll());
}
public MysqlDdlParserWithSimpleTestListener(DdlChanges changesListener, boolean includeViews, boolean includeComments) {
MysqlDdlParserWithSimpleTestListener(DdlChanges changesListener, boolean includeViews, boolean includeComments) {
this(changesListener, includeViews, includeComments, TableFilter.includeAll());
}


@ -2541,7 +2541,7 @@ public void shouldEmitTruncateOperation() throws Exception {
private static class NoTombStonesHandler implements DebeziumEngine.ChangeConsumer<SourceRecord> {
protected BlockingQueue<SourceRecord> recordQueue;
public NoTombStonesHandler(BlockingQueue<SourceRecord> recordQueue) {
NoTombStonesHandler(BlockingQueue<SourceRecord> recordQueue) {
this.recordQueue = recordQueue;
}


@ -76,7 +76,7 @@ public void shouldAcceptAllZeroDatetimeInPrimaryKey() throws SQLException, Inter
assertKey(changes);
try (final Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
try (Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
conn.createStatement().execute("SET sql_mode='';");
conn.createStatement().execute("INSERT INTO dbz_1194_datetime_key_test VALUES (default, '0000-00-00 00:00:00', '0000-00-00', '00:00:00')");
}


@ -134,7 +134,7 @@ public void shouldParseComment() throws SQLException, InterruptedException {
assertThat(valueSchemaParameters).contains(entry(COLUMN_COMMENT_PARAMETER_KEY, "the value is bigint type"));
// Add a column with comment
try (final Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
try (Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
conn.createStatement().execute("ALTER TABLE dbz_4000_comment_test ADD COLUMN remark TEXT COMMENT 'description'");
}
records = consumeRecordsByTopic(1);


@ -73,7 +73,7 @@ public void shouldHandleTinyIntAsNumber() throws SQLException, InterruptedExcept
assertIntChangeRecord();
try (final Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
try (Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
conn.createStatement().execute("INSERT INTO DBZ1773 VALUES (DEFAULT, 100, 5, 50, true)");
}
assertIntChangeRecord();
@ -100,7 +100,7 @@ public void shouldHandleTinyIntOneAsBoolean() throws SQLException, InterruptedEx
assertBooleanChangeRecord();
try (final Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
try (Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
conn.createStatement().execute("INSERT INTO DBZ1773 VALUES (DEFAULT, 100, 5, 50, true)");
}
assertBooleanChangeRecord();
@ -127,7 +127,7 @@ public void shouldDefaultValueForTinyIntOneAsBoolean() throws SQLException, Inte
assertDefaultValueBooleanChangeRecord();
try (final Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
try (Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
conn.createStatement().execute("INSERT INTO DBZ2085 VALUES (DEFAULT, true)");
}
assertDefaultValueBooleanChangeRecord();
@ -154,7 +154,7 @@ public void shouldHandleUnsignedTinyIntOneAsBoolean() throws SQLException, Inter
assertUnsignedBooleanChangeRecord();
try (final Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
try (Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
conn.createStatement().execute("INSERT INTO DBZ5236 VALUES (DEFAULT, 1, 1, 0)");
}
assertUnsignedBooleanChangeRecord();
@ -183,7 +183,7 @@ public void shouldHandleMySQL8TinyIntAsBoolean() throws SQLException, Interrupte
assertUnsignedBooleanChangeRecord();
try (final Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
try (Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
conn.createStatement().execute("INSERT INTO DBZ5236 VALUES (DEFAULT, 1, 1, 0)");
}
assertUnsignedBooleanChangeRecord();


@ -81,7 +81,7 @@ public void testSpecifyDelimiterAndPrefixStrategy() throws SQLException, Interru
assertThat(schemaChangeEvents.size()).isEqualTo(10);
// insert data
try (final Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
try (Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
conn.createStatement().execute("INSERT INTO dbz4180(a, b, c, d) VALUE (10.1, 10.2, 'strategy 1', 1290)");
}


@ -77,7 +77,7 @@ public void shouldProcessTwoAndForDigitYearsInDatabase() throws SQLException, In
assertChangeRecordByDatabase();
try (final Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
try (Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
conn.createStatement().execute("INSERT INTO dbz_1143_year_test VALUES (\n" +
" default,\n" +
" '18',\n" +
@ -127,7 +127,7 @@ public void shouldProcessTwoAndForDigitYearsInConnector() throws SQLException, I
assertChangeRecordByConnector();
try (final Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
try (Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
conn.createStatement().execute("INSERT INTO dbz_1143_year_test VALUES (\n" +
" default,\n" +
" '18',\n" +


@ -521,7 +521,7 @@ public void tinyIntBooleanTest() throws Exception {
// Testing.Print.enable();
consumeRecordsByTopic(EVENT_COUNT);
try (final Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
try (Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
conn.createStatement().execute("CREATE TABLE ti_boolean_table (" +
" A TINYINT(1) NOT NULL DEFAULT TRUE," +
" B TINYINT(2) NOT NULL DEFAULT FALSE" +
@ -551,7 +551,7 @@ public void intBooleanTest() throws Exception {
Testing.Print.enable();
waitForSnapshotToBeCompleted("mysql", DATABASE.getServerName());
consumeRecordsByTopic(EVENT_COUNT);
try (final Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
try (Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
conn.createStatement().execute("CREATE TABLE int_boolean_table (" +
" A INT(1) NOT NULL DEFAULT TRUE," +
" B INT(2) NOT NULL DEFAULT FALSE" +


@ -385,7 +385,7 @@ public static class Row {
}
public static interface UpdateBuilder {
public interface UpdateBuilder {
RowBuilder to(Serializable... values);
}


@ -136,7 +136,7 @@ protected void sendKafkaSignal(String signalValue) throws ExecutionException, In
.withDefault(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.withDefault(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.build();
try (final KafkaProducer<String, String> producer = new KafkaProducer<>(signalProducerConfig.asProperties())) {
try (KafkaProducer<String, String> producer = new KafkaProducer<>(signalProducerConfig.asProperties())) {
producer.send(executeSnapshotSignal).get();
}
}
@ -326,13 +326,13 @@ public void testPauseDuringSnapshotKafkaSignal() throws Exception {
}
protected void populate4PkTable() throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
try (JdbcConnection connection = databaseConnection()) {
populate4PkTable(connection, "a4");
}
}
protected void populate4WithoutPkTable() throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
try (JdbcConnection connection = databaseConnection()) {
populate4PkTable(connection, "a42");
}
}


@ -381,10 +381,10 @@ public void shouldNotFailStreamingOnNonSnapshottedTable() throws Exception {
assertThat(orders.numberOfReads()).isEqualTo(5);
try (
final MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());
final JdbcConnection connection = db.connect();
final Connection jdbc = connection.connection();
final Statement statement = jdbc.createStatement()) {
MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());
JdbcConnection connection = db.connect();
Connection jdbc = connection.connection();
Statement statement = jdbc.createStatement()) {
statement.executeUpdate("INSERT INTO customers VALUES (default,'John','Lazy','john.lazy@acme.com')");
}
@ -522,7 +522,7 @@ public void shouldCreateSnapshotOfSingleDatabaseUsingInsertEvents() throws Excep
}
private String productsTableName() throws SQLException {
try (final MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
return db.isTableIdCaseSensitive() ? "products" : "Products";
}
}
@ -644,10 +644,10 @@ public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception {
start(MySqlConnector.class, config);
try (
final MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());
final JdbcConnection connection = db.connect();
final Connection jdbc = connection.connection();
final Statement statement = jdbc.createStatement()) {
MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());
JdbcConnection connection = db.connect();
Connection jdbc = connection.connection();
Statement statement = jdbc.createStatement()) {
statement.executeUpdate("INSERT INTO customers VALUES (default,'John','Lazy','john.lazy@acme.com')");
}
recordCount = 1;


@ -578,10 +578,10 @@ private void inconsistentSchema(EventProcessingFailureHandlingMode mode) throws
start(MySqlConnector.class, config, (success, message, error) -> exception.set(error));
try (
final MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());
final JdbcConnection connection = db.connect();
final Connection jdbc = connection.connection();
final Statement statement = jdbc.createStatement()) {
MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());
JdbcConnection connection = db.connect();
Connection jdbc = connection.connection();
Statement statement = jdbc.createStatement()) {
if (mode == null) {
waitForStreamingRunning("mysql", DATABASE.getServerName(), "streaming");
}
@ -608,7 +608,7 @@ private Duration toDuration(String duration) {
}
private String productsTableName() throws SQLException {
try (final MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
return db.isTableIdCaseSensitive() ? "products" : "Products";
}
}


@ -258,7 +258,7 @@ public void shouldIgnoreUnparseableMessages() throws Exception {
.withDefault(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.withDefault(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.build();
try (final KafkaProducer<String, String> producer = new KafkaProducer<>(intruderConfig.asProperties())) {
try (KafkaProducer<String, String> producer = new KafkaProducer<>(intruderConfig.asProperties())) {
producer.send(nullRecord).get();
producer.send(emptyRecord).get();
producer.send(noSourceRecord).get();
@ -289,7 +289,7 @@ public void shouldStopOnUnparseableSQL() throws Exception {
.withDefault(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.withDefault(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.build();
try (final KafkaProducer<String, String> producer = new KafkaProducer<>(intruderConfig.asProperties())) {
try (KafkaProducer<String, String> producer = new KafkaProducer<>(intruderConfig.asProperties())) {
producer.send(invalidSQL).get();
}


@ -773,7 +773,7 @@ public enum SnapshotMode implements EnumeratedValue {
private final boolean shouldStream;
private final boolean shouldSnapshotOnSchemaError;
private SnapshotMode(String value, boolean includeData, boolean shouldStream, boolean shouldSnapshotOnSchemaError) {
SnapshotMode(String value, boolean includeData, boolean shouldStream, boolean shouldSnapshotOnSchemaError) {
this.value = value;
this.includeData = includeData;
this.shouldStream = shouldStream;
@ -861,7 +861,7 @@ public enum SnapshotLockingMode implements EnumeratedValue {
private final String value;
private SnapshotLockingMode(String value) {
SnapshotLockingMode(String value) {
this.value = value;
}
@ -939,7 +939,7 @@ public enum TransactionSnapshotBoundaryMode implements EnumeratedValue {
private final String value;
private TransactionSnapshotBoundaryMode(String value) {
TransactionSnapshotBoundaryMode(String value) {
this.value = value;
}


@ -293,7 +293,7 @@ private static class OracleSnapshotContext extends RelationalSnapshotContext<Ora
private Savepoint preSchemaSnapshotSavepoint;
public OracleSnapshotContext(OraclePartition partition, String catalogName) throws SQLException {
OracleSnapshotContext(OraclePartition partition, String catalogName) throws SQLException {
super(partition, catalogName);
}
}


@ -64,11 +64,11 @@ class LcrEventHandler implements XStreamLCRCallbackHandler {
private final Map<String, ChunkColumnValues> columnChunks;
private RowLCR currentRow;
public LcrEventHandler(OracleConnectorConfig connectorConfig, ErrorHandler errorHandler,
EventDispatcher<OraclePartition, TableId> dispatcher, Clock clock,
OracleDatabaseSchema schema, OraclePartition partition, OracleOffsetContext offsetContext,
boolean tablenameCaseInsensitive, XstreamStreamingChangeEventSource eventSource,
OracleStreamingChangeEventSourceMetrics streamingMetrics) {
LcrEventHandler(OracleConnectorConfig connectorConfig, ErrorHandler errorHandler,
EventDispatcher<OraclePartition, TableId> dispatcher, Clock clock,
OracleDatabaseSchema schema, OraclePartition partition, OracleOffsetContext offsetContext,
boolean tablenameCaseInsensitive, XstreamStreamingChangeEventSource eventSource,
OracleStreamingChangeEventSourceMetrics streamingMetrics) {
this.connectorConfig = connectorConfig;
this.errorHandler = errorHandler;
this.dispatcher = dispatcher;


@ -818,8 +818,8 @@ private static class ColumnDefinition {
public final AssertionType assertionType;
public final boolean temporalType;
public ColumnDefinition(String name, String definition, String addDefaultValue, String modifyDefaultValue,
Object expectedAddDefaultValue, Object expectedModifyDefaultValue, AssertionType assertionType) {
ColumnDefinition(String name, String definition, String addDefaultValue, String modifyDefaultValue,
Object expectedAddDefaultValue, Object expectedModifyDefaultValue, AssertionType assertionType) {
this.name = name;
this.definition = definition;
this.addDefaultValue = addDefaultValue;


@ -236,7 +236,7 @@ protected void setSnapshotTransactionIsolationLevel() throws SQLException {
*/
private static class PostgresSnapshotContext extends RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> {
public PostgresSnapshotContext(PostgresPartition partition, String catalogName) throws SQLException {
PostgresSnapshotContext(PostgresPartition partition, String catalogName) throws SQLException {
super(partition, catalogName);
}
}


@ -453,7 +453,7 @@ private String toString(OptionalLong l) {
}
@FunctionalInterface
public static interface PgConnectionSupplier {
public interface PgConnectionSupplier {
BaseConnection get() throws SQLException;
}
}


@ -314,8 +314,8 @@ public static String normalizeTypeName(String typeName) {
* Prime the {@link TypeRegistry} with all existing database types
*/
private void prime() throws SQLException {
try (final Statement statement = connection.connection().createStatement();
final ResultSet rs = statement.executeQuery(SQL_TYPES)) {
try (Statement statement = connection.connection().createStatement();
ResultSet rs = statement.executeQuery(SQL_TYPES)) {
final List<PostgresType.Builder> delayResolvedBuilders = new ArrayList<>();
while (rs.next()) {
PostgresType.Builder builder = createTypeBuilderFromResultSet(rs);
@ -368,7 +368,7 @@ private PostgresType resolveUnknownType(String name) {
try {
LOGGER.trace("Type '{}' not cached, attempting to lookup from database.", name);
try (final PreparedStatement statement = connection.connection().prepareStatement(SQL_NAME_LOOKUP)) {
try (PreparedStatement statement = connection.connection().prepareStatement(SQL_NAME_LOOKUP)) {
statement.setString(1, name);
return loadType(statement);
}
@ -382,7 +382,7 @@ private PostgresType resolveUnknownType(int lookupOid) {
try {
LOGGER.trace("Type OID '{}' not cached, attempting to lookup from database.", lookupOid);
try (final PreparedStatement statement = connection.connection().prepareStatement(SQL_OID_LOOKUP)) {
try (PreparedStatement statement = connection.connection().prepareStatement(SQL_OID_LOOKUP)) {
statement.setInt(1, lookupOid);
return loadType(statement);
}
@ -393,7 +393,7 @@ private PostgresType resolveUnknownType(int lookupOid) {
}
private PostgresType loadType(PreparedStatement statement) throws SQLException {
try (final ResultSet rs = statement.executeQuery()) {
try (ResultSet rs = statement.executeQuery()) {
while (rs.next()) {
PostgresType result = createTypeBuilderFromResultSet(rs).build();
addType(result);
@ -479,8 +479,8 @@ public int getSqlType(String typeName) throws SQLException {
private static Map<String, Integer> getSqlTypes(PostgresConnection connection) throws SQLException {
Map<String, Integer> sqlTypesByPgTypeNames = new HashMap<>();
try (final Statement statement = connection.connection().createStatement()) {
try (final ResultSet rs = statement.executeQuery(SQL_TYPE_DETAILS)) {
try (Statement statement = connection.connection().createStatement()) {
try (ResultSet rs = statement.executeQuery(SQL_TYPE_DETAILS)) {
while (rs.next()) {
int type;
boolean isArray = rs.getBoolean(2);


@ -34,23 +34,23 @@
*
*/
public interface DateTimeFormat {
public Instant timestampToInstant(final String s);
Instant timestampToInstant(String s);
public OffsetDateTime timestampWithTimeZoneToOffsetDateTime(final String s);
OffsetDateTime timestampWithTimeZoneToOffsetDateTime(String s);
public Instant systemTimestampToInstant(final String s);
Instant systemTimestampToInstant(String s);
public LocalDate date(final String s);
LocalDate date(String s);
public LocalTime time(final String s);
LocalTime time(String s);
public OffsetTime timeWithTimeZone(final String s);
OffsetTime timeWithTimeZone(String s);
public static DateTimeFormat get() {
static DateTimeFormat get() {
return new ISODateTimeFormat();
}
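This file combines three removals: public on interface methods (implicit), final on parameters of body-less methods (it can have no effect there), and public on the static factory. Note that static itself must remain: an interface method with a body has to be static, default, or private. A sketch with a hypothetical Format interface:

import java.time.Instant;

interface Format {
    Instant parse(String s); // was: public Instant parse(final String s);

    // "public" is implicit even on a static factory, but "static" must stay.
    static Format get() {
        return new IsoFormat();
    }

    // Member classes of an interface are implicitly public static.
    class IsoFormat implements Format {
        @Override
        public Instant parse(String s) {
            return Instant.parse(s);
        }
    }
}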
public static class ISODateTimeFormat implements DateTimeFormat {
class ISODateTimeFormat implements DateTimeFormat {
private static final Logger LOGGER = LoggerFactory.getLogger(ISODateTimeFormat.class);
// This formatter is similar to standard Java's ISO_LOCAL_DATE. But this one is


@ -112,7 +112,7 @@ interface Builder {
* @return this instance
* @see #DEFAULT_SLOT_NAME
*/
Builder withSlot(final String slotName);
Builder withSlot(String slotName);
/**
* Sets the publication name for the PG logical publication
@ -121,7 +121,7 @@ interface Builder {
* @return this instance
* @see #DEFAULT_PUBLICATION_NAME
*/
Builder withPublication(final String publicationName);
Builder withPublication(String publicationName);
/**
* Sets the publication tables to watch for the PG logical publication
@ -130,7 +130,7 @@ interface Builder {
* @return this instance
* @see #config.getTableFilters()
*/
Builder withTableFilter(final RelationalTableFilters tableFilter);
Builder withTableFilter(RelationalTableFilters tableFilter);
/**
* Sets the publication autocreate mode for the PG logical publication
@ -139,7 +139,7 @@ interface Builder {
* @return this instance
* @see #PostgresConnectorConfig.PublicationAutocreateMode.ALL_TABLES
*/
Builder withPublicationAutocreateMode(final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode);
Builder withPublicationAutocreateMode(PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode);
/**
* Sets the instance for the PG logical decoding plugin
@ -148,7 +148,7 @@ interface Builder {
* @return this instance
* @see #PROTOBUF_PLUGIN_NAME
*/
Builder withPlugin(final PostgresConnectorConfig.LogicalDecoder plugin);
Builder withPlugin(PostgresConnectorConfig.LogicalDecoder plugin);
/**
* Whether or not to drop the replication slot once the replication connection closes
@ -157,7 +157,7 @@ interface Builder {
* @return this instance
* @see #DEFAULT_DROP_SLOT_ON_CLOSE
*/
Builder dropSlotOnClose(final boolean dropSlotOnClose);
Builder dropSlotOnClose(boolean dropSlotOnClose);
/**
* The interval, in milliseconds, at which the replication connection should send periodic status updates to the server.
@ -165,7 +165,7 @@ interface Builder {
* @param statusUpdateInterval a duration; null or non-positive value causes Postgres' default to be applied
* @return this instance
*/
Builder statusUpdateInterval(final Duration statusUpdateInterval);
Builder statusUpdateInterval(Duration statusUpdateInterval);
Builder withTypeRegistry(TypeRegistry typeRegistry);
@ -185,7 +185,7 @@ interface Builder {
* @return this instance
* @see #STREAM_PARAMS
*/
Builder streamParams(final String streamParams);
Builder streamParams(String streamParams);
/**
* Provides a JDBC connection used to query metadata, database information, ...


@ -40,7 +40,7 @@ public interface ReplicationMessage {
* Data modification operation executed
*
*/
public enum Operation {
enum Operation {
INSERT,
UPDATE,
DELETE,
@ -54,7 +54,7 @@ public enum Operation {
/**
* A representation of column value delivered as a part of replication message
*/
public interface Column {
interface Column {
String getName();
PostgresType getType();
@ -65,7 +65,7 @@ public interface Column {
*/
ColumnTypeMetadata getTypeMetadata();
Object getValue(final PgConnectionSupplier connection, boolean includeUnknownDatatypes);
Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes);
boolean isOptional();
@ -74,13 +74,13 @@ default boolean isToastedColumn() {
}
}
public interface ColumnTypeMetadata {
interface ColumnTypeMetadata {
int getLength();
int getScale();
}
public interface ColumnValue<T> {
interface ColumnValue<T> {
T getRawValue();
boolean isNull();
@ -141,33 +141,33 @@ public interface ColumnValue<T> {
/**
* @return A data operation executed
*/
public Operation getOperation();
Operation getOperation();
/**
* @return Transaction commit time for this change
*/
public Instant getCommitTime();
Instant getCommitTime();
/**
* @return An id of transaction to which this change belongs; will not be
* present for non-transactional logical decoding messages for instance
*/
public OptionalLong getTransactionId();
OptionalLong getTransactionId();
/**
* @return Table changed
*/
public String getTable();
String getTable();
/**
* @return Set of original values of table columns, null for INSERT
*/
public List<Column> getOldTupleList();
List<Column> getOldTupleList();
/**
* @return Set of new values of table columns, null for DELETE
*/
public List<Column> getNewTupleList();
List<Column> getNewTupleList();
/**
* @return true if this is the last message in the batch of messages with same LSN
@ -192,7 +192,7 @@ default boolean isTransactionalMessage() {
* A special message type that is used to replace event filtered already at {@link MessageDecoder}.
* Enables {@link PostgresStreamingChangeEventSource} to advance LSN forward even in case of such messages.
*/
public class NoopMessage implements ReplicationMessage {
class NoopMessage implements ReplicationMessage {
private final Long transactionId;
private final Instant commitTime;


@ -19,7 +19,7 @@
public interface ReplicationStream extends AutoCloseable {
@FunctionalInterface
public interface ReplicationMessageProcessor {
interface ReplicationMessageProcessor {
/**
* Processes the given replication message.


@ -35,7 +35,7 @@ class PgProtoReplicationMessage implements ReplicationMessage {
private final PgProto.RowMessage rawMessage;
private final TypeRegistry typeRegistry;
public PgProtoReplicationMessage(PgProto.RowMessage rawMessage, TypeRegistry typeRegistry) {
PgProtoReplicationMessage(PgProto.RowMessage rawMessage, TypeRegistry typeRegistry) {
this.rawMessage = rawMessage;
this.typeRegistry = typeRegistry;


@ -1151,7 +1151,7 @@ protected static TableId tableIdFromDeleteStmt(String statement) {
protected static class SchemaAndValueField {
@FunctionalInterface
protected static interface Condition {
protected interface Condition {
void assertField(String fieldName, Object expectedValue, Object actualValue);
}


@ -90,7 +90,7 @@ public void shouldSerializeToJson() throws Exception {
CountDownLatch allLatch = new CountDownLatch(1);
final ExecutorService executor = Executors.newFixedThreadPool(1);
try (final DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class).using(props)
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class).using(props)
.notifying((records, committer) -> {
for (ChangeEvent<String, String> r : records) {
@ -138,7 +138,7 @@ public void shouldSerializeToAvro() throws Exception {
CountDownLatch allLatch = new CountDownLatch(1);
final ExecutorService executor = Executors.newFixedThreadPool(1);
try (final DebeziumEngine<ChangeEvent<byte[], byte[]>> engine = DebeziumEngine.create(Avro.class).using(props)
try (DebeziumEngine<ChangeEvent<byte[], byte[]>> engine = DebeziumEngine.create(Avro.class).using(props)
.notifying((records, committer) -> {
Assert.fail("Should not be invoked due to serialization error");
})
@ -177,7 +177,7 @@ public void shouldSerializeToCloudEvents() throws Exception {
CountDownLatch allLatch = new CountDownLatch(1);
final ExecutorService executor = Executors.newFixedThreadPool(1);
try (final DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class, CloudEvents.class).using(props)
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class, CloudEvents.class).using(props)
.notifying((records, committer) -> {
for (ChangeEvent<String, String> r : records) {


@ -200,7 +200,7 @@ record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()
public void insertsNumericPk() throws Exception {
// Testing.Print.enable();
try (final JdbcConnection connection = databaseConnection()) {
try (JdbcConnection connection = databaseConnection()) {
populateTable(connection, "s1.anumeric");
}
startConnector();
@ -232,7 +232,7 @@ public void snapshotPartitionedTable() throws Exception {
TestHelper.execute(SETUP_TABLES);
// insert records
try (final JdbcConnection connection = databaseConnection()) {
try (JdbcConnection connection = databaseConnection()) {
populateTable(connection, "s1.part");
}
@ -302,13 +302,13 @@ record -> ((Struct) record.value()).getStruct("source"),
}
protected void populate4PkTable() throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
try (JdbcConnection connection = databaseConnection()) {
populate4PkTable(connection, "s1.a4");
}
}
protected void populate4WithoutPkTable() throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
try (JdbcConnection connection = databaseConnection()) {
populate4PkTable(connection, "s1.a42");
}
}


@ -1344,7 +1344,7 @@ public void shouldRegularlyFlushLsn() throws InterruptedException, SQLException
assertNoRecordsToConsume();
final Set<String> flushLsn = new HashSet<>();
try (final PostgresConnection connection = TestHelper.create()) {
try (PostgresConnection connection = TestHelper.create()) {
flushLsn.add(getConfirmedFlushLsn(connection));
for (int i = 2; i <= recordCount + 2; i++) {
TestHelper.execute(INSERT_STMT);
@ -1400,7 +1400,7 @@ public void shouldRegularlyFlushLsnWithTxMonitoring() throws InterruptedExceptio
assertOnlyTransactionRecordsToConsume();
final Set<String> flushLsn = new HashSet<>();
try (final PostgresConnection connection = TestHelper.create()) {
try (PostgresConnection connection = TestHelper.create()) {
flushLsn.add(getConfirmedFlushLsn(connection));
for (int i = 2; i <= recordCount + 2; i++) {
TestHelper.execute(INSERT_STMT);


@ -32,7 +32,7 @@ public void shouldNotThrowNullPointerExceptionDuringCommit() throws Exception {
}
class FakeContext extends PostgresTaskContext {
public FakeContext(PostgresConnectorConfig postgresConnectorConfig, PostgresSchema postgresSchema) {
FakeContext(PostgresConnectorConfig postgresConnectorConfig, PostgresSchema postgresSchema) {
super(postgresConnectorConfig, postgresSchema, null);
}


@ -219,21 +219,21 @@ public static void dropAllSchemas() throws SQLException {
public static TypeRegistry getTypeRegistry() {
final PostgresConnectorConfig config = new PostgresConnectorConfig(defaultConfig().build());
try (final PostgresConnection connection = new PostgresConnection(config.getJdbcConfig(), getPostgresValueConverterBuilder(config), CONNECTION_TEST)) {
try (PostgresConnection connection = new PostgresConnection(config.getJdbcConfig(), getPostgresValueConverterBuilder(config), CONNECTION_TEST)) {
return connection.getTypeRegistry();
}
}
public static PostgresDefaultValueConverter getDefaultValueConverter() {
final PostgresConnectorConfig config = new PostgresConnectorConfig(defaultConfig().build());
try (final PostgresConnection connection = new PostgresConnection(config.getJdbcConfig(), getPostgresValueConverterBuilder(config), CONNECTION_TEST)) {
try (PostgresConnection connection = new PostgresConnection(config.getJdbcConfig(), getPostgresValueConverterBuilder(config), CONNECTION_TEST)) {
return connection.getDefaultValueConverter();
}
}
public static Charset getDatabaseCharset() {
final PostgresConnectorConfig config = new PostgresConnectorConfig(defaultConfig().build());
try (final PostgresConnection connection = new PostgresConnection(config.getJdbcConfig(), getPostgresValueConverterBuilder(config), CONNECTION_TEST)) {
try (PostgresConnection connection = new PostgresConnection(config.getJdbcConfig(), getPostgresValueConverterBuilder(config), CONNECTION_TEST)) {
return connection.getDatabaseCharset();
}
}


@ -21,7 +21,7 @@ public class PostgresDatabaseVersionResolver implements DatabaseVersionResolver
@Override
public DatabaseVersion getVersion() {
try {
try (final PostgresConnection postgresConnection = TestHelper.create()) {
try (PostgresConnection postgresConnection = TestHelper.create()) {
final DatabaseMetaData metadata = postgresConnection.connection().getMetaData();
return new DatabaseVersion(metadata.getDatabaseMajorVersion(), metadata.getDatabaseMinorVersion(), 0);
}


@ -51,7 +51,7 @@ public class SqlServerConnectorConfig extends HistorizedRelationalDatabaseConnec
/**
* The set of predefined SnapshotMode options or aliases.
*/
public static enum SnapshotMode implements EnumeratedValue {
public enum SnapshotMode implements EnumeratedValue {
/**
* Perform a snapshot of data and schema upon initial startup of a connector.
@ -71,7 +71,7 @@ public static enum SnapshotMode implements EnumeratedValue {
private final String value;
private final boolean includeData;
private SnapshotMode(String value, boolean includeData) {
SnapshotMode(String value, boolean includeData) {
this.value = value;
this.includeData = includeData;
}
@ -131,7 +131,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
/**
* The set of predefined snapshot isolation mode options.
*/
public static enum SnapshotIsolationMode implements EnumeratedValue {
public enum SnapshotIsolationMode implements EnumeratedValue {
/**
* This mode will block all reads and writes for the entire duration of the snapshot.
@ -170,7 +170,7 @@ public static enum SnapshotIsolationMode implements EnumeratedValue {
private final String value;
private SnapshotIsolationMode(String value) {
SnapshotIsolationMode(String value) {
this.value = value;
}


@ -26,7 +26,7 @@ public interface SqlServerJdbcConfiguration extends JdbcConfiguration {
/**
* A field for the named instance of the database server. This field has no default value.
*/
public static final Field INSTANCE = Field.create("instance",
Field INSTANCE = Field.create("instance",
"Named instance of the database server");
/**
@ -35,7 +35,7 @@ public interface SqlServerJdbcConfiguration extends JdbcConfiguration {
* @param config the configuration; may not be null
* @return the SqlServerJdbcConfiguration; never null
*/
public static SqlServerJdbcConfiguration adapt(Configuration config) {
static SqlServerJdbcConfiguration adapt(Configuration config) {
if (config instanceof SqlServerJdbcConfiguration) {
return (SqlServerJdbcConfiguration) config;
}
@ -63,7 +63,7 @@ public String toString() {
* @see SqlServerJdbcConfiguration#copy(Configuration)
* @see SqlServerJdbcConfiguration#create()
*/
public static interface Builder extends Configuration.ConfigBuilder<SqlServerJdbcConfiguration, Builder> {
interface Builder extends Configuration.ConfigBuilder<SqlServerJdbcConfiguration, Builder> {
/**
* Use the given named instance in the resulting configuration.
*
@ -81,7 +81,7 @@ default Builder withInstance(String instance) {
* @param config the configuration to copy
* @return the configuration builder
*/
public static Builder copy(Configuration config) {
static Builder copy(Configuration config) {
return new Builder() {
private Configuration.Builder builder = Configuration.copy(config);
@ -132,7 +132,7 @@ public String toString() {
*
* @return the configuration builder
*/
public static Builder create() {
static Builder create() {
return new Builder() {
private Configuration.Builder builder = Configuration.create();


@ -289,7 +289,7 @@ private static class SqlServerSnapshotContext extends RelationalSnapshotContext<
private int isolationLevelBeforeStart;
private Savepoint preSchemaSnapshotSavepoint;
public SqlServerSnapshotContext(SqlServerPartition partition) throws SQLException {
SqlServerSnapshotContext(SqlServerPartition partition) throws SQLException {
super(partition, partition.getDatabaseName());
}
}


@ -239,7 +239,7 @@ public void readOnlyApplicationIntent() throws Exception {
assertThat(logInterceptor.containsMessage("Schema locking was disabled in connector configuration")).isTrue();
// Verify that multiple subsequent transactions are used in streaming phase with read-only intent
try (final SqlServerConnection admin = TestHelper.adminConnection()) {
try (SqlServerConnection admin = TestHelper.adminConnection()) {
final Set<Long> txIds = new HashSet<>();
Awaitility.await().atMost(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until(() -> {
admin.query(


@ -654,7 +654,7 @@ static class CdcRecordFoundBlockingMultiResultSetConsumer implements JdbcConnect
private final CdcRecordHandler handler;
private boolean found;
public CdcRecordFoundBlockingMultiResultSetConsumer(CdcRecordHandler handler) {
CdcRecordFoundBlockingMultiResultSetConsumer(CdcRecordHandler handler) {
this.handler = handler;
}


@ -73,7 +73,7 @@ public interface Configuration {
* @param <C> the type of configuration
* @param <B> the type of builder
*/
public static interface ConfigBuilder<C extends Configuration, B extends ConfigBuilder<C, B>> {
interface ConfigBuilder<C extends Configuration, B extends ConfigBuilder<C, B>> {
/**
* Associate the given value with the specified key.
*
@ -667,7 +667,7 @@ default B changeInteger(Field field, Function<Integer, Integer> function) {
/**
* A builder of Configuration objects.
*/
public static class Builder implements ConfigBuilder<Configuration, Builder> {
class Builder implements ConfigBuilder<Configuration, Builder> {
private final Properties props = new Properties();
protected Builder() {
@ -727,7 +727,7 @@ public Configuration build() {
*
* @return the configuration builder
*/
public static Builder create() {
static Builder create() {
return new Builder();
}
@ -737,7 +737,7 @@ public static Builder create() {
* @param config the configuration to copy; may be null
* @return the configuration builder
*/
public static Builder copy(Configuration config) {
static Builder copy(Configuration config) {
return config != null ? new Builder(config.asProperties()) : new Builder();
}
@ -747,7 +747,7 @@ public static Builder copy(Configuration config) {
* @param prefix the required prefix for the system properties; may not be null but may be empty
* @return the configuration
*/
public static Configuration fromSystemProperties(String prefix) {
static Configuration fromSystemProperties(String prefix) {
return empty().withSystemProperties(prefix);
}
@ -756,7 +756,7 @@ public static Configuration fromSystemProperties(String prefix) {
*
* @return an empty configuration; never null
*/
public static Configuration empty() {
static Configuration empty() {
return new Configuration() {
@Override
public Set<String> keys() {
@ -782,7 +782,7 @@ public String toString() {
* @param properties the properties; may be null or empty
* @return the configuration; never null
*/
public static Configuration from(Properties properties) {
static Configuration from(Properties properties) {
Properties props = new Properties();
if (properties != null) {
props.putAll(properties);
@ -812,7 +812,7 @@ public String toString() {
* @param properties the properties; may be null or empty
* @return the configuration; never null
*/
public static Configuration from(Map<String, ?> properties) {
static Configuration from(Map<String, ?> properties) {
return from(properties, value -> {
if (value == null) {
return null;
@ -833,7 +833,7 @@ public static Configuration from(Map<String, ?> properties) {
* is to be excluded
* @return the configuration; never null
*/
public static <T> Configuration from(Map<String, T> properties, Function<T, String> conversion) {
static <T> Configuration from(Map<String, T> properties, Function<T, String> conversion) {
Map<String, T> props = new HashMap<>();
if (properties != null) {
props.putAll(properties);
@ -863,7 +863,7 @@ public String toString() {
* @return the configuration; never null
* @throws IOException if there is an error reading the stream
*/
public static Configuration load(URL url) throws IOException {
static Configuration load(URL url) throws IOException {
try (InputStream stream = url.openStream()) {
return load(stream);
}
@ -876,7 +876,7 @@ public static Configuration load(URL url) throws IOException {
* @return the configuration; never null
* @throws IOException if there is an error reading the stream
*/
public static Configuration load(File file) throws IOException {
static Configuration load(File file) throws IOException {
try (InputStream stream = new FileInputStream(file)) {
return load(stream);
}
@ -889,7 +889,7 @@ public static Configuration load(File file) throws IOException {
* @return the configuration; never null
* @throws IOException if there is an error reading the stream
*/
public static Configuration load(InputStream stream) throws IOException {
static Configuration load(InputStream stream) throws IOException {
try {
Properties properties = new Properties();
properties.load(stream);
@ -907,7 +907,7 @@ public static Configuration load(InputStream stream) throws IOException {
* @return the configuration; never null
* @throws IOException if there is an error reading the stream
*/
public static Configuration load(Reader reader) throws IOException {
static Configuration load(Reader reader) throws IOException {
try {
Properties properties = new Properties();
properties.load(reader);
@ -927,7 +927,7 @@ public static Configuration load(Reader reader) throws IOException {
* @return the configuration; never null but possibly empty
* @throws IOException if there is an error reading the stream
*/
public static Configuration load(String path, Class<?> clazz) throws IOException {
static Configuration load(String path, Class<?> clazz) throws IOException {
return load(path, clazz.getClassLoader());
}
@ -940,7 +940,7 @@ public static Configuration load(String path, Class<?> clazz) throws IOException
* @return the configuration; never null but possibly empty
* @throws IOException if there is an error reading the stream
*/
public static Configuration load(String path, ClassLoader classLoader) throws IOException {
static Configuration load(String path, ClassLoader classLoader) throws IOException {
Logger logger = LoggerFactory.getLogger(Configuration.class);
return load(path, classLoader, logger::debug);
}
@ -955,7 +955,7 @@ public static Configuration load(String path, ClassLoader classLoader) throws IO
* @return the configuration; never null but possibly empty
* @throws IOException if there is an error reading the stream
*/
public static Configuration load(String path, ClassLoader classLoader, Consumer<String> logger) throws IOException {
static Configuration load(String path, ClassLoader classLoader, Consumer<String> logger) throws IOException {
try (InputStream stream = IoUtil.getResourceAsStream(path, classLoader, null, null, logger)) {
Properties props = new Properties();
if (stream != null) {
@ -1000,7 +1000,7 @@ default boolean hasKey(Field field) {
*
* @return the set of keys; never null but possibly empty
*/
public Set<String> keys();
Set<String> keys();
/**
* Get the string value associated with the given key.
@ -1008,7 +1008,7 @@ default boolean hasKey(Field field) {
* @param key the key for the configuration property
* @return the value, or null if the key is null or there is no such key-value pair in the configuration
*/
public String getString(String key);
String getString(String key);
/**
* Get the string value associated with the given key, returning the default value if there is no such key-value pair.
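Since these factories live on an interface, dropping public static changes nothing for callers; interface members carry those modifiers implicitly. A minimal sketch of the factory-and-accessor flow, using only methods visible in this file (the io.debezium.config package is assumed):

import java.util.Properties;

import io.debezium.config.Configuration;

public class ConfigurationExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("offset.flush.interval.ms", "60000");

        // from(Properties) copies the entries into an immutable Configuration.
        Configuration config = Configuration.from(props);

        System.out.println(config.keys());                                // [offset.flush.interval.ms]
        System.out.println(config.getString("offset.flush.interval.ms")); // 60000
    }
}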
View File
@ -246,7 +246,7 @@ public interface Recommender {
* @param config the configuration; may not be null
* @return the list of valid values
*/
public List<Object> validValues(Field field, Configuration config);
List<Object> validValues(Field field, Configuration config);
/**
* Set the visibility of the field given the current configuration values.
@ -254,7 +254,7 @@ public interface Recommender {
* @param config the configuration; may not be null
* @return {@code true} if the field is to be visible, or {@code false} otherwise
*/
public boolean visible(Field field, Configuration config);
boolean visible(Field field, Configuration config);
}
public enum Group {
View File
@ -22,7 +22,7 @@ public interface SourceInfoStructMaker<T extends AbstractSourceInfo> {
/**
* Returns the schema of the source info.
*/
public Schema schema();
Schema schema();
/**
* Converts the connector's source info into the struct to be included in the message as the source field.
@ -30,5 +30,5 @@ public interface SourceInfoStructMaker<T extends AbstractSourceInfo> {
* @param sourceInfo the connector's source info to convert
* @return the converted struct
*/
public Struct struct(T sourceInfo);
Struct struct(T sourceInfo);
}
View File
@ -51,7 +51,7 @@ public abstract class BaseSourceTask<P extends Partition, O extends OffsetContex
private static final Duration INITIAL_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.SECONDS.toMillis(5));
private static final Duration MAX_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.HOURS.toMillis(1));
protected static enum State {
protected enum State {
RUNNING,
STOPPED;
}
View File
@ -31,7 +31,7 @@ public final class Envelope {
/**
* The constants for the values for the {@link FieldName#OPERATION operation} field in the message envelope.
*/
public static enum Operation {
public enum Operation {
/**
* The operation that read the current state of a record, most typically during snapshots.
*/
@ -59,7 +59,7 @@ public static enum Operation {
private final String code;
private Operation(String code) {
Operation(String code) {
this.code = code;
}
@ -141,7 +141,7 @@ public static final class FieldName {
/**
* A builder of an envelope schema.
*/
public static interface Builder {
public interface Builder {
/**
* Define the {@link Schema} used in the {@link FieldName#BEFORE} and {@link FieldName#AFTER} fields.
*
View File
@ -38,7 +38,7 @@ public class SpecialValueDecimal implements Serializable, ValueWrapper<BigDecima
/**
* Special values for floating-point and numeric types
*/
private static enum SpecialValue {
private enum SpecialValue {
NAN,
POSITIVE_INFINITY,
NEGATIVE_INFINITY;
View File
@ -24,7 +24,7 @@
*/
public interface Array extends Iterable<Array.Entry>, Comparable<Array> {
static interface Entry extends Comparable<Entry> {
interface Entry extends Comparable<Entry> {
/**
* Get the index of the entry
View File
@ -53,11 +53,11 @@ public Entry apply(Integer index, Value value) {
}
}
protected final int indexFrom(CharSequence name) {
protected int indexFrom(CharSequence name) {
return Integer.parseInt(name.toString());
}
protected final boolean isValidIndex(int index) {
protected boolean isValidIndex(int index) {
return index >= 0 && index < size();
}
View File
@ -28,7 +28,7 @@
@NotThreadSafe
public interface Document extends Iterable<Document.Field>, Comparable<Document> {
static interface Field extends Comparable<Field> {
interface Field extends Comparable<Field> {
/**
* Get the name of the field
View File
@ -18,20 +18,20 @@
@Immutable
public interface Path extends Iterable<String> {
public static interface Segments {
public static boolean isAfterLastIndex(String segment) {
interface Segments {
static boolean isAfterLastIndex(String segment) {
return "-".equals(segment);
}
public static boolean isArrayIndex(String segment) {
static boolean isArrayIndex(String segment) {
return isAfterLastIndex(segment) || asInteger(segment).isPresent();
}
public static boolean isFieldName(String segment) {
static boolean isFieldName(String segment) {
return !isArrayIndex(segment);
}
public static Optional<Integer> asInteger(String segment) {
static Optional<Integer> asInteger(String segment) {
try {
return Optional.of(Integer.valueOf(segment));
}
@ -40,7 +40,7 @@ public static Optional<Integer> asInteger(String segment) {
}
}
public static Optional<Integer> asInteger(Optional<String> segment) {
static Optional<Integer> asInteger(Optional<String> segment) {
return segment.isPresent() ? asInteger(segment.get()) : Optional.empty();
}
}
@ -50,7 +50,7 @@ public static Optional<Integer> asInteger(Optional<String> segment) {
*
* @return the shared root path; never null
*/
public static Path root() {
static Path root() {
return Paths.RootPath.INSTANCE;
}
View File
@ -52,7 +52,7 @@ private static String parseSegment(String segment, boolean resolveJsonPointerEsc
return segment;
}
static interface InnerPath {
interface InnerPath {
int copyInto(String[] segments, int start);
}
View File
@ -23,7 +23,7 @@
@Immutable
public interface Value extends Comparable<Value> {
static enum Type {
enum Type {
NULL,
STRING,
BOOLEAN,
@ -407,7 +407,7 @@ default boolean ifNull(NullHandler consumer) {
}
@FunctionalInterface
static interface NullHandler {
interface NullHandler {
void call();
}
View File
@ -28,7 +28,7 @@ public interface BufferedBlockingConsumer<T> extends BlockingConsumer<T> {
* @param function the function to apply to the values that are flushed
* @throws InterruptedException if the thread is interrupted while this consumer is blocked
*/
public void close(Function<T, T> function) throws InterruptedException;
void close(Function<T, T> function) throws InterruptedException;
/**
* Get a {@link BufferedBlockingConsumer} that buffers just the last value seen by the consumer.
@ -40,7 +40,7 @@ public interface BufferedBlockingConsumer<T> extends BlockingConsumer<T> {
* @param delegate the delegate to which values should be flushed; may not be null
* @return the blocking consumer that buffers a single value at a time; never null
*/
public static <T> BufferedBlockingConsumer<T> bufferLast(BlockingConsumer<T> delegate) {
static <T> BufferedBlockingConsumer<T> bufferLast(BlockingConsumer<T> delegate) {
return new BufferedBlockingConsumer<T>() {
private final AtomicReference<T> last = new AtomicReference<>();
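A hedged sketch of the close(Function) contract together with the bufferLast factory; the io.debezium.function package and the accept method inherited from BlockingConsumer are assumptions beyond what this hunk shows:

import java.util.function.Function;

import io.debezium.function.BlockingConsumer;
import io.debezium.function.BufferedBlockingConsumer;

public class BufferLastExample {
    public static void main(String[] args) throws InterruptedException {
        BlockingConsumer<String> delegate = value -> System.out.println("flushed: " + value);

        // Buffers a single value at a time, flushing through the delegate.
        BufferedBlockingConsumer<String> buffered = BufferedBlockingConsumer.bufferLast(delegate);
        buffered.accept("a");
        buffered.accept("b");
        // close(Function) flushes whatever is still buffered, after applying the function.
        buffered.close(Function.identity());
    }
}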
View File
@ -30,45 +30,45 @@ public interface JdbcConfiguration extends Configuration {
/**
* A field for the name of the database. This field has no default value.
*/
public static final Field DATABASE = Field.create("dbname",
Field DATABASE = Field.create("dbname",
"Name of the database");
/**
* A field for the user of the database. This field has no default value.
*/
public static final Field USER = Field.create("user",
Field USER = Field.create("user",
"Name of the database user to be used when connecting to the database");
/**
* A field for the password of the database. This field has no default value.
*/
public static final Field PASSWORD = Field.create("password",
Field PASSWORD = Field.create("password",
"Password to be used when connecting to the database");
/**
* A field for the hostname of the database server. This field has no default value.
*/
public static final Field HOSTNAME = Field.create("hostname", "IP address of the database");
Field HOSTNAME = Field.create("hostname", "IP address of the database");
/**
* A field for the port of the database server. There is no default value.
*/
public static final Field PORT = Field.create("port", "Port of the database");
Field PORT = Field.create("port", "Port of the database");
/**
* A semicolon-separated list of SQL statements to be executed when the connection to the database is established.
* A typical use case is setting session parameters. There is no default value.
*/
public static final Field ON_CONNECT_STATEMENTS = Field.create("initial.statements", "A semicolon separated list of statements to be executed on connection");
Field ON_CONNECT_STATEMENTS = Field.create("initial.statements", "A semicolon separated list of statements to be executed on connection");
/**
* An optional field for datasource factory class that will be used to build the datasource connection pool.
*/
public static final Field CONNECTION_FACTORY_CLASS = Field.create("connection.factory.class")
Field CONNECTION_FACTORY_CLASS = Field.create("connection.factory.class")
.withDisplayName("Connection factory class")
.withDescription(
"(Incubating) The factory class for creation of datasource connection pool; the FQN of an implementation of io.debezium.jdbc.JdbcConnection.ConnectionFactory must be given.")
.withType(Type.CLASS)
.withValidation(Field::isOptional);
public static final Field CONNECTION_TIMEOUT_MS = Field.create("connection.timeout.ms")
Field CONNECTION_TIMEOUT_MS = Field.create("connection.timeout.ms")
.withDisplayName("Time to wait for a connection from the pool, given in milliseconds. Defaults to 600 seconds (600,000 ms).")
.withType(Type.INT)
.withDefault(600000)
@ -78,7 +78,7 @@ public interface JdbcConfiguration extends Configuration {
* The set of names of the pre-defined JDBC configuration fields, including {@link #DATABASE}, {@link #USER},
* {@link #PASSWORD}, {@link #HOSTNAME}, and {@link #PORT}.
*/
public static Set<String> ALL_KNOWN_FIELDS = Collect.unmodifiableSet(Field::name, DATABASE, USER, PASSWORD, HOSTNAME, PORT, ON_CONNECT_STATEMENTS,
Set<String> ALL_KNOWN_FIELDS = Collect.unmodifiableSet(Field::name, DATABASE, USER, PASSWORD, HOSTNAME, PORT, ON_CONNECT_STATEMENTS,
CONNECTION_FACTORY_CLASS, CONNECTION_TIMEOUT_MS);
/**
@ -87,7 +87,7 @@ public interface JdbcConfiguration extends Configuration {
* @param config the configuration; may not be null
* @return the ClientConfiguration; never null
*/
public static JdbcConfiguration adapt(Configuration config) {
static JdbcConfiguration adapt(Configuration config) {
if (config instanceof JdbcConfiguration) {
return (JdbcConfiguration) config;
}
@ -109,7 +109,7 @@ public String toString() {
};
}
public static JdbcConfiguration empty() {
static JdbcConfiguration empty() {
return JdbcConfiguration.adapt(Configuration.empty());
}
@ -119,7 +119,7 @@ public static JdbcConfiguration empty() {
* @see JdbcConfiguration#copy(Configuration)
* @see JdbcConfiguration#create()
*/
public static interface Builder extends Configuration.ConfigBuilder<JdbcConfiguration, Builder> {
interface Builder extends Configuration.ConfigBuilder<JdbcConfiguration, Builder> {
/**
* Use the given user in the resulting configuration.
*
@ -197,7 +197,7 @@ default Builder withConnectionTimeoutMs(int connectionTimeoutMs) {
* @param config the configuration to copy
* @return the configuration builder
*/
public static Builder copy(Configuration config) {
static Builder copy(Configuration config) {
return new Builder() {
private Configuration.Builder builder = Configuration.copy(config);
@ -248,7 +248,7 @@ public String toString() {
*
* @return the configuration builder
*/
public static Builder create() {
static Builder create() {
return new Builder() {
private Configuration.Builder builder = Configuration.create();
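The Field constants remain implicitly public static final, so connector code keeps referencing them the same way. A minimal sketch, assuming the io.debezium.jdbc package; Field.name() is inferred from the Field::name method reference above:

import java.util.Properties;

import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConfiguration;

public class JdbcConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(JdbcConfiguration.HOSTNAME.name(), "localhost");
        props.setProperty(JdbcConfiguration.PORT.name(), "5432");
        props.setProperty(JdbcConfiguration.DATABASE.name(), "inventory");
        props.setProperty(JdbcConfiguration.USER.name(), "debezium");

        // adapt(Configuration) wraps a generic Configuration in the JDBC-specific view.
        JdbcConfiguration jdbc = JdbcConfiguration.adapt(Configuration.from(props));
        System.out.println(jdbc.getString("hostname")); // localhost
    }
}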
View File
@ -101,7 +101,7 @@ public void onEntryChosenForEviction(PreparedStatement statement) {
*/
@FunctionalInterface
@ThreadSafe
public static interface ConnectionFactory {
public interface ConnectionFactory {
/**
* Establish a connection to the database denoted by the given configuration.
*
@ -136,7 +136,7 @@ public Connection connect(JdbcConfiguration config) throws SQLException {
* Defines multiple JDBC operations.
*/
@FunctionalInterface
public static interface Operations {
public interface Operations {
/**
* Apply a series of operations against the given JDBC statement.
*
@ -150,7 +150,7 @@ public static interface Operations {
* Extracts data from a result set.
*/
@FunctionalInterface
public static interface ResultSetExtractor<T> {
public interface ResultSetExtractor<T> {
T apply(ResultSet rs) throws SQLException;
}
@ -434,36 +434,36 @@ public JdbcConnection execute(Operations operations) throws SQLException {
return this;
}
public static interface ResultSetConsumer {
public interface ResultSetConsumer {
void accept(ResultSet rs) throws SQLException;
}
public static interface ResultSetMapper<T> {
public interface ResultSetMapper<T> {
T apply(ResultSet rs) throws SQLException;
}
public static interface BlockingResultSetConsumer {
public interface BlockingResultSetConsumer {
void accept(ResultSet rs) throws SQLException, InterruptedException;
}
public static interface ParameterResultSetConsumer {
public interface ParameterResultSetConsumer {
void accept(List<?> parameters, ResultSet rs) throws SQLException;
}
public static interface MultiResultSetConsumer {
public interface MultiResultSetConsumer {
void accept(ResultSet[] rs) throws SQLException;
}
public static interface BlockingMultiResultSetConsumer {
public interface BlockingMultiResultSetConsumer {
void accept(ResultSet[] rs) throws SQLException, InterruptedException;
}
public static interface StatementPreparer {
public interface StatementPreparer {
void accept(PreparedStatement statement) throws SQLException;
}
@FunctionalInterface
public static interface CallPreparer {
public interface CallPreparer {
void accept(CallableStatement statement) throws SQLException;
}
@ -1153,7 +1153,7 @@ public void readSchema(Tables tables, String databaseCatalog, String schemaNameP
Map<TableId, List<Attribute>> attributesByTable = new HashMap<>();
int totalTables = 0;
try (final ResultSet rs = metadata.getTables(databaseCatalog, schemaNamePattern, null, supportedTableTypes())) {
try (ResultSet rs = metadata.getTables(databaseCatalog, schemaNamePattern, null, supportedTableTypes())) {
while (rs.next()) {
final String catalogName = resolveCatalogName(rs.getString(1));
final String schemaName = rs.getString(2);
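Each of the nested interfaces above has a single abstract method, so they are normally supplied as lambdas to JdbcConnection's query and execute methods. A sketch of the two most common shapes, using only the signatures shown in this file:

import io.debezium.jdbc.JdbcConnection.ResultSetConsumer;
import io.debezium.jdbc.JdbcConnection.ResultSetMapper;

public class JdbcLambdasExample {
    public static void main(String[] args) {
        // ResultSetMapper<T>: T apply(ResultSet rs) throws SQLException
        ResultSetMapper<Integer> firstColumn = rs -> rs.next() ? rs.getInt(1) : -1;

        // ResultSetConsumer: void accept(ResultSet rs) throws SQLException
        ResultSetConsumer printer = rs -> {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        };
        // Both would be handed to a JdbcConnection rather than invoked directly here.
    }
}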
View File
@ -16,22 +16,22 @@ public interface ResultReceiver {
* Send the object to the receiver.
* @param o - object to be delivered
*/
public void deliver(Object o);
void deliver(Object o);
/**
* @return true if a value has been sent to the receiver
*/
public boolean hasReceived();
boolean hasReceived();
/**
* @return the object sent to the receiver
*/
public Object get();
Object get();
/**
* @return default, not thread-safe implementation of the receiver
*/
public static ResultReceiver create() {
static ResultReceiver create() {
return new ResultReceiver() {
private boolean received = false;
private Object object = null;
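ResultReceiver's entire surface appears in this hunk, so the default implementation can be exercised directly. A minimal sketch (the io.debezium.jdbc import path is an assumption):

import io.debezium.jdbc.ResultReceiver;

public class ResultReceiverExample {
    public static void main(String[] args) {
        ResultReceiver receiver = ResultReceiver.create(); // package assumed
        receiver.deliver("first result");
        if (receiver.hasReceived()) {
            System.out.println(receiver.get()); // first result
        }
    }
}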
View File
@ -496,7 +496,7 @@ private final class IncrementalSnapshotChangeRecordReceiver implements SnapshotR
public final DataChangeEventListener<P> dataListener;
public IncrementalSnapshotChangeRecordReceiver(DataChangeEventListener<P> dataListener) {
IncrementalSnapshotChangeRecordReceiver(DataChangeEventListener<P> dataListener) {
this.dataListener = dataListener;
}
View File
@ -153,7 +153,7 @@ public boolean deduplicationNeeded() {
}
private String arrayToSerializedString(Object[] array) {
try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(array);
return HexConverter.convertToHexString(bos.toByteArray());
@ -164,7 +164,7 @@ private String arrayToSerializedString(Object[] array) {
}
private Object[] serializedStringToArray(String field, String serialized) {
try (final ByteArrayInputStream bis = new ByteArrayInputStream(HexConverter.convertFromHex(serialized));
try (ByteArrayInputStream bis = new ByteArrayInputStream(HexConverter.convertFromHex(serialized));
ObjectInputStream ois = new ObjectInputStream(bis)) {
return (Object[]) ois.readObject();
}
View File
@ -7,7 +7,7 @@
public interface ChangeEventSource {
public interface ChangeEventSourceContext {
interface ChangeEventSourceContext {
/**
* Whether this source is running or has been requested to stop.
View File
@ -17,7 +17,7 @@ public interface SchemaChangeEventEmitter {
void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException;
public interface Receiver {
interface Receiver {
void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedException;
}
}
View File
@ -39,7 +39,7 @@ public O getOffset() {
return offset;
}
public static enum SnapshotResultStatus {
public enum SnapshotResultStatus {
COMPLETED,
ABORTED,
SKIPPED
View File
@ -19,7 +19,7 @@ final class AttributeImpl implements Attribute {
private final String name;
private final String value;
public AttributeImpl(String name, String value) {
AttributeImpl(String name, String value) {
this.name = name;
this.value = value;
}
View File
@ -27,7 +27,7 @@ public interface Column extends Comparable<Column> {
*
* @return the editor; never null
*/
public static ColumnEditor editor() {
static ColumnEditor editor() {
return new ColumnEditorImpl();
}
View File
@ -40,7 +40,7 @@ static DefaultValueConverter passthrough() {
* Converts the raw JDBC default value expression for a column into an object.
*/
@FunctionalInterface
public interface DefaultValueMapper {
interface DefaultValueMapper {
/**
* Parses the string-representation of the default value to an object.
*
View File
@ -67,7 +67,7 @@ public Key build() {
* Provides the column(s) that should be used within the message key for a given table.
*/
@FunctionalInterface
public static interface KeyMapper {
public interface KeyMapper {
/**
* @param table {@code Table}
View File
@ -136,7 +136,7 @@ private static class SchemasByTableId {
private final boolean tableIdCaseInsensitive;
private final ConcurrentMap<TableId, TableSchema> values;
public SchemasByTableId(boolean tableIdCaseInsensitive) {
SchemasByTableId(boolean tableIdCaseInsensitive) {
this.tableIdCaseInsensitive = tableIdCaseInsensitive;
this.values = new ConcurrentHashMap<>();
}
View File
@ -46,7 +46,7 @@ private static boolean isEmpty(String value) {
* purpose of table filtering.
*/
@FunctionalInterface
public static interface TableIdToStringMapper {
public interface TableIdToStringMapper {
String toString(TableId tableId);
}
View File
@ -53,7 +53,7 @@ private static class TableIdTokenizer implements Tokenizer {
private final String identifier;
private final TableIdPredicates predicates;
public TableIdTokenizer(String identifier, TableIdPredicates predicates) {
TableIdTokenizer(String identifier, TableIdPredicates predicates) {
this.identifier = identifier;
this.predicates = predicates;
}
@ -269,7 +269,7 @@ private static class ParsingContext {
boolean escaped;
char quotingChar;
public ParsingContext(CharacterStream input, Tokens tokens, TableIdPredicates predicates) {
ParsingContext(CharacterStream input, Tokens tokens, TableIdPredicates predicates) {
this.input = input;
this.tokens = tokens;
this.predicates = predicates;
View File
@ -48,14 +48,14 @@ public interface TableFilter extends DataCollectionFilter<TableId> {
/**
* Creates a {@link TableFilter} from the given predicate.
*/
public static TableFilter fromPredicate(Predicate<TableId> predicate) {
static TableFilter fromPredicate(Predicate<TableId> predicate) {
return t -> predicate.test(t);
}
/**
* Creates a {@link TableFilter} that includes all tables.
*/
public static TableFilter includeAll() {
static TableFilter includeAll() {
return t -> true;
}
}
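A sketch of the two factories in use, filtering by schema name; it assumes TableFilter is the nested interface of io.debezium.relational.Tables and that TableId offers a (catalog, schema, table) constructor and a schema() accessor:

import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;

public class TableFilterExample {
    public static void main(String[] args) {
        // Include only tables in the "inventory" schema.
        TableFilter filter = TableFilter.fromPredicate(id -> "inventory".equals(id.schema()));

        System.out.println(filter.isIncluded(new TableId("db", "inventory", "customers"))); // true
        System.out.println(filter.isIncluded(new TableId("db", "audit", "log")));           // false
        System.out.println(TableFilter.includeAll().isIncluded(new TableId("db", "audit", "log"))); // true
    }
}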
@ -420,7 +420,7 @@ private static class TablesById {
private final boolean tableIdCaseInsensitive;
private final ConcurrentMap<TableId, Table> values;
public TablesById(boolean tableIdCaseInsensitive) {
TablesById(boolean tableIdCaseInsensitive) {
this.tableIdCaseInsensitive = tableIdCaseInsensitive;
this.values = new ConcurrentHashMap<>();
}
@ -505,7 +505,7 @@ private static class TableIds {
private final boolean tableIdCaseInsensitive;
private final Set<TableId> values;
public TableIds(boolean tableIdCaseInsensitive) {
TableIds(boolean tableIdCaseInsensitive) {
this.tableIdCaseInsensitive = tableIdCaseInsensitive;
this.values = new HashSet<>();
}
View File
@ -70,7 +70,7 @@ default ValueConverter nullOr() {
*
* @return the pass-through {@link ValueConverter}; never null
*/
public static ValueConverter passthrough() {
static ValueConverter passthrough() {
return (data) -> data;
}
}
View File
@ -22,7 +22,7 @@ public interface ValueConverterProvider {
* @param columnDefinition the column definition; never null
* @return the schema builder; null if the column's type information is unknown
*/
public SchemaBuilder schemaBuilder(Column columnDefinition);
SchemaBuilder schemaBuilder(Column columnDefinition);
/**
* Returns a {@link ValueConverter} that can be used to convert the JDBC values corresponding to the given JDBC temporal type
@ -36,5 +36,5 @@ public interface ValueConverterProvider {
* never null
* @return the value converter; never null
*/
public ValueConverter converter(Column columnDefinition, Field fieldDefn);
ValueConverter converter(Column columnDefinition, Field fieldDefn);
}
View File
@ -115,7 +115,7 @@ public String toString() {
return events.toString();
}
public static interface DatabaseEventConsumer {
public interface DatabaseEventConsumer {
void consume(String databaseName, List<Event> events);
}
View File
@ -32,7 +32,7 @@ public interface DdlParserListener {
/**
* The type of concrete {@link Event}s.
*/
public static enum EventType {
enum EventType {
CREATE_TABLE,
ALTER_TABLE,
DROP_TABLE,
@ -50,7 +50,7 @@ public static enum EventType {
* The base class for all concrete events.
*/
@Immutable
public static abstract class Event {
abstract class Event {
private final String statement;
private final EventType type;
@ -80,7 +80,7 @@ public String statement() {
* The base class for all table-related events.
*/
@Immutable
public static abstract class TableEvent extends Event {
abstract class TableEvent extends Event {
private final TableId tableId;
private final boolean isView;
@ -116,7 +116,7 @@ public String toString() {
* An event describing the creation (or replacement) of a table.
*/
@Immutable
public static class TableCreatedEvent extends TableEvent {
class TableCreatedEvent extends TableEvent {
public TableCreatedEvent(TableId tableId, String ddlStatement, boolean isView) {
super(EventType.CREATE_TABLE, tableId, ddlStatement, isView);
}
@ -126,7 +126,7 @@ public TableCreatedEvent(TableId tableId, String ddlStatement, boolean isView) {
* An event describing the altering of a table.
*/
@Immutable
public static class TableAlteredEvent extends TableEvent {
class TableAlteredEvent extends TableEvent {
private final TableId previousTableId;
public TableAlteredEvent(TableId tableId, TableId previousTableId, String ddlStatement, boolean isView) {
@ -155,7 +155,7 @@ public String toString() {
* An event describing the dropping of a table.
*/
@Immutable
public static class TableDroppedEvent extends TableEvent {
class TableDroppedEvent extends TableEvent {
public TableDroppedEvent(TableId tableId, String ddlStatement, boolean isView) {
super(EventType.DROP_TABLE, tableId, ddlStatement, isView);
}
@ -165,7 +165,7 @@ public TableDroppedEvent(TableId tableId, String ddlStatement, boolean isView) {
* An event describing the truncating of a table.
*/
@Immutable
public static class TableTruncatedEvent extends TableEvent {
class TableTruncatedEvent extends TableEvent {
public TableTruncatedEvent(TableId tableId, String ddlStatement, boolean isView) {
super(EventType.TRUNCATE_TABLE, tableId, ddlStatement, isView);
}
@ -175,7 +175,7 @@ public TableTruncatedEvent(TableId tableId, String ddlStatement, boolean isView)
* The abstract base class for all index-related events.
*/
@Immutable
public static abstract class TableIndexEvent extends Event {
abstract class TableIndexEvent extends Event {
private final TableId tableId;
private final String indexName;
@ -214,7 +214,7 @@ public String toString() {
* An event describing the creation of an index on a table.
*/
@Immutable
public static class TableIndexCreatedEvent extends TableIndexEvent {
class TableIndexCreatedEvent extends TableIndexEvent {
public TableIndexCreatedEvent(String indexName, TableId tableId, String ddlStatement) {
super(EventType.CREATE_INDEX, indexName, tableId, ddlStatement);
}
@ -224,7 +224,7 @@ public TableIndexCreatedEvent(String indexName, TableId tableId, String ddlState
* An event describing the dropping of an index on a table.
*/
@Immutable
public static class TableIndexDroppedEvent extends TableIndexEvent {
class TableIndexDroppedEvent extends TableIndexEvent {
public TableIndexDroppedEvent(String indexName, TableId tableId, String ddlStatement) {
super(EventType.DROP_INDEX, indexName, tableId, ddlStatement);
}
@ -234,7 +234,7 @@ public TableIndexDroppedEvent(String indexName, TableId tableId, String ddlState
* The base class for all database-related events.
*/
@Immutable
public static abstract class DatabaseEvent extends Event {
abstract class DatabaseEvent extends Event {
private final String databaseName;
public DatabaseEvent(EventType type, String databaseName, String ddlStatement) {
@ -260,7 +260,7 @@ public String toString() {
* An event describing the creation of a database.
*/
@Immutable
public static class DatabaseCreatedEvent extends DatabaseEvent {
class DatabaseCreatedEvent extends DatabaseEvent {
public DatabaseCreatedEvent(String databaseName, String ddlStatement) {
super(EventType.CREATE_DATABASE, databaseName, ddlStatement);
}
@ -270,7 +270,7 @@ public DatabaseCreatedEvent(String databaseName, String ddlStatement) {
* An event describing the altering of a database.
*/
@Immutable
public static class DatabaseAlteredEvent extends DatabaseEvent {
class DatabaseAlteredEvent extends DatabaseEvent {
private final String previousDatabaseName;
public DatabaseAlteredEvent(String databaseName, String previousDatabaseName, String ddlStatement) {
@ -299,7 +299,7 @@ public String toString() {
* An event describing the dropping of a database.
*/
@Immutable
public static class DatabaseDroppedEvent extends DatabaseEvent {
class DatabaseDroppedEvent extends DatabaseEvent {
public DatabaseDroppedEvent(String databaseName, String ddlStatement) {
super(EventType.DROP_DATABASE, databaseName, ddlStatement);
}
@ -309,7 +309,7 @@ public DatabaseDroppedEvent(String databaseName, String ddlStatement) {
* An event describing the switching of a database.
*/
@Immutable
public static class DatabaseSwitchedEvent extends DatabaseEvent {
class DatabaseSwitchedEvent extends DatabaseEvent {
public DatabaseSwitchedEvent(String databaseName, String ddlStatement) {
super(EventType.USE_DATABASE, databaseName, ddlStatement);
}
@ -319,7 +319,7 @@ public DatabaseSwitchedEvent(String databaseName, String ddlStatement) {
* An event describing the setting of a variable.
*/
@Immutable
public static class SetVariableEvent extends Event {
class SetVariableEvent extends Event {
private final String variableName;
private final String value;
View File
@ -33,9 +33,9 @@
*/
public interface SchemaHistory {
public static final String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history.internal.";
String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history.internal.";
public static final Field NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "name")
Field NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "name")
.withDisplayName("Logical name for the database schema history")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
@ -43,7 +43,7 @@ public interface SchemaHistory {
.withDescription("The name used for the database schema history, perhaps differently by each implementation.")
.withValidation(Field::isOptional);
public static final Field SKIP_UNPARSEABLE_DDL_STATEMENTS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "skip.unparseable.ddl")
Field SKIP_UNPARSEABLE_DDL_STATEMENTS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "skip.unparseable.ddl")
.withDisplayName("Skip DDL statements that cannot be parsed")
.withType(Type.BOOLEAN)
.withWidth(Width.SHORT)
@ -53,7 +53,7 @@ public interface SchemaHistory {
+ "which it cannot parse. If skipping is enabled then Debezium can miss metadata changes.")
.withDefault(false);
public static final Field STORE_ONLY_CAPTURED_TABLES_DDL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "store.only.captured.tables.ddl")
Field STORE_ONLY_CAPTURED_TABLES_DDL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "store.only.captured.tables.ddl")
.withDisplayName("Store only DDL that modifies tables that are captured based on include/exclude lists")
.withType(Type.BOOLEAN)
.withWidth(Width.SHORT)
@ -63,7 +63,7 @@ public interface SchemaHistory {
+ "then only DDL that manipulates a captured table will be stored.")
.withDefault(false);
public static final Field DDL_FILTER = Field.createInternal(CONFIGURATION_FIELD_PREFIX_STRING + "ddl.filter")
Field DDL_FILTER = Field.createInternal(CONFIGURATION_FIELD_PREFIX_STRING + "ddl.filter")
.withDisplayName("DDL filter")
.withType(Type.STRING)
.withDefault(
View File
@ -12,29 +12,29 @@
*
*/
public interface SchemaHistoryListener {
public void started();
void started();
public void stopped();
void stopped();
public void recoveryStarted();
void recoveryStarted();
public void recoveryStopped();
void recoveryStopped();
/**
* Invoked for every change read from the history during recovery.
*
* @param record
*/
public void onChangeFromHistory(HistoryRecord record);
void onChangeFromHistory(HistoryRecord record);
/**
* Invoked for every change applied and not filtered.
*
* @param record
*/
public void onChangeApplied(HistoryRecord record);
void onChangeApplied(HistoryRecord record);
static SchemaHistoryListener NOOP = new SchemaHistoryListener() {
SchemaHistoryListener NOOP = new SchemaHistoryListener() {
@Override
public void stopped() {
}
View File
@ -32,7 +32,7 @@ public class SchemaHistoryMetrics extends Metrics implements SchemaHistoryListen
private static final Logger LOGGER = LoggerFactory.getLogger(SchemaHistoryMetrics.class);
public static enum SchemaHistoryStatus {
public enum SchemaHistoryStatus {
STOPPED,
RECOVERING,
RUNNING
View File
@ -186,7 +186,7 @@ public String toString() {
*
* @param <T> target type
*/
public static interface TableChangesSerializer<T> {
public interface TableChangesSerializer<T> {
T serialize(TableChanges tableChanges);
View File
@ -14,10 +14,10 @@
*/
public interface DataCollectionFilters {
public DataCollectionFilter<?> dataCollectionFilter();
DataCollectionFilter<?> dataCollectionFilter();
@FunctionalInterface
public interface DataCollectionFilter<T extends DataCollectionId> {
interface DataCollectionFilter<T extends DataCollectionId> {
boolean isIncluded(T id);
}
}
View File
@ -46,7 +46,7 @@ private static class FieldNameSanitizer<T> implements FieldNamer<T> {
private final FieldNamer<T> delegate;
public FieldNameSanitizer(FieldNamer<T> delegate) {
FieldNameSanitizer(FieldNamer<T> delegate) {
this.delegate = delegate;
}
@ -101,7 +101,7 @@ private static class FieldNameCache<T> implements FieldNamer<T> {
private final BoundedConcurrentHashMap<T, String> fieldNames;
private final FieldNamer<T> delegate;
public FieldNameCache(FieldNamer<T> delegate) {
FieldNameCache(FieldNamer<T> delegate) {
this.fieldNames = new BoundedConcurrentHashMap<>(10_000, 10, Eviction.LRU);
this.delegate = delegate;
}
View File
@ -25,11 +25,11 @@
public interface HistorizedDatabaseSchema<I extends DataCollectionId> extends DatabaseSchema<I> {
@FunctionalInterface
public static interface SchemaChangeEventConsumer {
interface SchemaChangeEventConsumer {
void consume(SchemaChangeEvent event, Collection<TableId> tableIds);
static SchemaChangeEventConsumer NOOP = (x, y) -> {
SchemaChangeEventConsumer NOOP = (x, y) -> {
};
}
View File
@ -136,7 +136,7 @@ public String toString() {
* CREATE, ALTER, DROP, TRUNCATE - correspond to table operations
* DATABASE - an event common to the database, like CREATE/DROP DATABASE or SET...
*/
public static enum SchemaChangeEventType {
public enum SchemaChangeEventType {
CREATE,
ALTER,
DROP,
View File
@ -106,7 +106,7 @@ private static class TopicNameSanitizer<I extends DataCollectionId> implements D
private final DataCollectionTopicNamer<I> delegate;
public TopicNameSanitizer(DataCollectionTopicNamer<I> delegate) {
TopicNameSanitizer(DataCollectionTopicNamer<I> delegate) {
this.delegate = delegate;
}
@ -159,7 +159,7 @@ private static class TopicNameCache<I extends DataCollectionId> implements DataC
private final BoundedConcurrentHashMap<I, String> topicNames;
private final DataCollectionTopicNamer<I> delegate;
public TopicNameCache(DataCollectionTopicNamer<I> delegate) {
TopicNameCache(DataCollectionTopicNamer<I> delegate) {
this.topicNames = new BoundedConcurrentHashMap<>(10_000, 10, Eviction.LRU);
this.delegate = delegate;
}
View File
@ -2038,7 +2038,7 @@ static String generateFragment(String content,
* Interface for a Tokenizer component responsible for processing the characters in a {@link CharacterStream} and constructing
* the appropriate {@link Token} objects.
*/
public static interface Tokenizer {
public interface Tokenizer {
/**
* Process the supplied characters and construct the appropriate {@link Token} objects.
*
@ -2054,7 +2054,7 @@ void tokenize(CharacterStream input,
/**
* Interface used by a {@link Tokenizer} to iterate through the characters in the content input to the {@link TokenStream}.
*/
public static interface CharacterStream {
public interface CharacterStream {
/**
* Determine if there is another character available in this stream.
@ -2194,7 +2194,7 @@ boolean isNext(char nextChar,
/**
* A factory for Token objects, used by a {@link Tokenizer} to create tokens in the correct order.
*/
public static interface Tokens {
public interface Tokens {
/**
* Create a single-character token at the supplied index in the character stream. The token type is set to 0, meaning this
* is equivalent to calling <code>addToken(index,index+1)</code> or <code>addToken(index,index+1,0)</code>.
View File
@ -16,14 +16,14 @@ public class ExtractNewRecordStateConfigDefinition {
public static final String DELETED_FIELD = "__deleted";
public static final String METADATA_FIELD_PREFIX = "__";
public static enum DeleteHandling implements EnumeratedValue {
public enum DeleteHandling implements EnumeratedValue {
DROP("drop"),
REWRITE("rewrite"),
NONE("none");
private final String value;
private DeleteHandling(String value) {
DeleteHandling(String value) {
this.value = value;
}
View File
@ -48,7 +48,7 @@
public class EventRouterDelegate<R extends ConnectRecord<R>> {
@FunctionalInterface
public static interface RecordConverter<R> {
public interface RecordConverter<R> {
R convert(R record);
}
View File
@ -180,7 +180,7 @@ private R traceRecord(R record, Struct envelope, Struct source, Struct after, St
final Span txLogSpan = txLogSpanBuilder.start();
debeziumSpanBuilder.asChildOf(txLogSpan);
final Span debeziumSpan = debeziumSpanBuilder.start();
try (final Scope debeziumScope = tracer.scopeManager().activate(debeziumSpan)) {
try (Scope debeziumScope = tracer.scopeManager().activate(debeziumSpan)) {
Tags.COMPONENT.set(txLogSpan, TRACING_COMPONENT);
Tags.COMPONENT.set(debeziumSpan, TRACING_COMPONENT);
if (eventTimestamp != null) {
View File
@ -41,7 +41,7 @@ public void put(String key, String value) {
}
public String export() {
try (final Writer sw = new StringWriter()) {
try (Writer sw = new StringWriter()) {
props.store(sw, null);
return sw.toString();
}
@ -51,7 +51,7 @@ public String export() {
}
public void load(String span) {
try (final Reader sr = new StringReader(span)) {
try (Reader sr = new StringReader(span)) {
props.load(sr);
}
catch (IOException e) {
View File
@ -304,7 +304,7 @@ public void onEntryChosenForEviction(V internalCacheEntry) {
public interface EvictionPolicy<K, V> {
public final static int MAX_BATCH_SIZE = 64;
int MAX_BATCH_SIZE = 64;
HashEntry<K, V> createNewEntry(K key, int hash, HashEntry<K, V> next, V value);
View File
@ -18,7 +18,7 @@ public interface Clock {
/**
* The {@link Clock} instance that uses the {@link System} methods.
*/
static final Clock SYSTEM = new Clock() {
Clock SYSTEM = new Clock() {
@Override
public long currentTimeInMillis() {
return System.currentTimeMillis();
@ -76,6 +76,6 @@ default Instant currentTimeAsInstant() {
* Get the current time in milliseconds.
* @return the current time in milliseconds.
*/
public long currentTimeInMillis();
long currentTimeInMillis();
}
View File
@ -39,7 +39,7 @@ default boolean sleepWhen(BooleanSupplier criteria) {
*
* @return the strategy; never null
*/
public static DelayStrategy none() {
static DelayStrategy none() {
return (criteria) -> false;
}
@ -50,7 +50,7 @@ public static DelayStrategy none() {
* @param delay the initial delay; must be positive
* @return the strategy; never null
*/
public static DelayStrategy constant(Duration delay) {
static DelayStrategy constant(Duration delay) {
long delayInMilliseconds = delay.toMillis();
return (criteria) -> {
@ -74,7 +74,7 @@ public static DelayStrategy constant(Duration delay) {
* @param delay the initial delay; must be positive
* @return the strategy; never null
*/
public static DelayStrategy linear(Duration delay) {
static DelayStrategy linear(Duration delay) {
long delayInMilliseconds = delay.toMillis();
if (delayInMilliseconds <= 0) {
throw new IllegalArgumentException("Initial delay must be positive");
@ -110,7 +110,7 @@ public boolean sleepWhen(boolean criteria) {
* @param maxDelay the maximum delay; must be greater than the initial delay
* @return the strategy; never null
*/
public static DelayStrategy exponential(Duration initialDelay, Duration maxDelay) {
static DelayStrategy exponential(Duration initialDelay, Duration maxDelay) {
return exponential(initialDelay, maxDelay, 2.0);
}
@ -123,7 +123,7 @@ public static DelayStrategy exponential(Duration initialDelay, Duration maxDelay
* @param backOffMultiplier the factor by which the delay increases each pass
* @return the strategy
*/
public static DelayStrategy exponential(Duration initialDelay, Duration maxDelay, double backOffMultiplier) {
static DelayStrategy exponential(Duration initialDelay, Duration maxDelay, double backOffMultiplier) {
final long initialDelayInMilliseconds = initialDelay.toMillis();
final long maxDelayInMilliseconds = maxDelay.toMillis();
if (backOffMultiplier <= 1.0) {
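A sketch of the exponential variant driving a retry loop; sleepWhen(boolean) is taken from the implementation shown above, and the io.debezium.util package is assumed:

import java.time.Duration;

import io.debezium.util.DelayStrategy;

public class DelayStrategyExample {
    public static void main(String[] args) {
        // Delays grow 100 ms, 200 ms, 400 ms, ... capped at 2 s.
        DelayStrategy retryDelay = DelayStrategy.exponential(Duration.ofMillis(100), Duration.ofSeconds(2));

        int attempts = 0;
        boolean done = false;
        while (!done) {
            done = ++attempts >= 3;      // stand-in for real work that eventually succeeds
            retryDelay.sleepWhen(!done); // sleeps, and grows the delay, only while not done
        }
    }
}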
View File
@ -28,7 +28,7 @@ public interface ElapsedTimeStrategy {
*
* @return the strategy; never null
*/
public static ElapsedTimeStrategy none() {
static ElapsedTimeStrategy none() {
return () -> true;
}
@ -39,7 +39,7 @@ public static ElapsedTimeStrategy none() {
* @param delayInMilliseconds the time period; must be positive
* @return the strategy; never null
*/
public static ElapsedTimeStrategy constant(Clock clock, long delayInMilliseconds) {
static ElapsedTimeStrategy constant(Clock clock, long delayInMilliseconds) {
if (delayInMilliseconds <= 0) {
throw new IllegalArgumentException("Initial delay must be positive");
}
@ -83,10 +83,10 @@ static ElapsedTimeStrategy constant(Clock clock, Duration delay) {
* @param postStepDelay the time period before the step has occurred; must be positive
* @return the strategy; never null
*/
public static ElapsedTimeStrategy step(Clock clock,
Duration preStepDelay,
BooleanSupplier stepFunction,
Duration postStepDelay) {
static ElapsedTimeStrategy step(Clock clock,
Duration preStepDelay,
BooleanSupplier stepFunction,
Duration postStepDelay) {
long preStepDelayinMillis = preStepDelay.toMillis();
long postStepDelayinMillis = postStepDelay.toMillis();
if (preStepDelayinMillis <= 0) {
@ -136,7 +136,7 @@ public boolean hasElapsed() {
* @param delay the initial delay; must be positive
* @return the strategy; never null
*/
public static ElapsedTimeStrategy linear(Clock clock, Duration delay) {
static ElapsedTimeStrategy linear(Clock clock, Duration delay) {
long delayInMilliseconds = delay.toMillis();
if (delayInMilliseconds <= 0) {
throw new IllegalArgumentException("Initial delay must be positive");
@ -176,9 +176,9 @@ public boolean hasElapsed() {
* @param maxDelay the maximum delay; must be greater than the initial delay
* @return the strategy; never null
*/
public static ElapsedTimeStrategy exponential(Clock clock,
Duration initialDelay,
Duration maxDelay) {
static ElapsedTimeStrategy exponential(Clock clock,
Duration initialDelay,
Duration maxDelay) {
return exponential(clock, initialDelay.toMillis(), maxDelay.toMillis(), 2.0);
}
@ -191,10 +191,10 @@ public static ElapsedTimeStrategy exponential(Clock clock,
* @param multiplier the factor by which the delay increases each pass
* @return the strategy
*/
public static ElapsedTimeStrategy exponential(Clock clock,
long initialDelayInMilliseconds,
long maxDelayInMilliseconds,
double multiplier) {
static ElapsedTimeStrategy exponential(Clock clock,
long initialDelayInMilliseconds,
long maxDelayInMilliseconds,
double multiplier) {
if (multiplier <= 1.0) {
throw new IllegalArgumentException("Multiplier must be greater than 1");
}
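Unlike DelayStrategy, an ElapsedTimeStrategy never blocks; hasElapsed() is polled. A minimal sketch with the constant variant and the Clock.SYSTEM instance shown earlier (the io.debezium.util package is assumed):

import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;

public class ElapsedTimeExample {
    public static void main(String[] args) throws InterruptedException {
        // Signals at most once per second.
        ElapsedTimeStrategy oncePerSecond = ElapsedTimeStrategy.constant(Clock.SYSTEM, 1000);

        for (int i = 0; i < 5; i++) {
            if (oncePerSecond.hasElapsed()) {
                System.out.println("tick at iteration " + i);
            }
            Thread.sleep(300);
        }
    }
}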
View File
@ -353,7 +353,7 @@ public Iterator<T> iterator() {
* @param <F> the source transform type
* @param <T> the destination transform type
*/
public static interface TransformedIterator<F, T> extends Iterator<T> {
public interface TransformedIterator<F, T> extends Iterator<T> {
T transform(F from);
}
@ -398,7 +398,7 @@ public T transform(F from) {
*
* @param <T> the type of value
*/
public static interface PreviewIterator<T> extends Iterator<T> {
public interface PreviewIterator<T> extends Iterator<T> {
/**
* Peek at the next value without consuming or using it. This method returns the same value if called multiple times
* between calls to {@link Iterator#next}.
View File
@ -23,7 +23,7 @@ public interface Metronome {
*
* @throws InterruptedException if the thread was interrupted while pausing
*/
public void pause() throws InterruptedException;
void pause() throws InterruptedException;
/**
* Create a new metronome that starts ticking immediately and that uses {@link Thread#sleep(long)} to wait.
@ -42,7 +42,7 @@ public interface Metronome {
* @param timeSystem the time system that will provide the current time; may not be null
* @return the new metronome; never null
*/
public static Metronome sleeper(Duration period, Clock timeSystem) {
static Metronome sleeper(Duration period, Clock timeSystem) {
long periodInMillis = period.toMillis();
return new Metronome() {
private long next = timeSystem.currentTimeInMillis() + periodInMillis;
@ -81,7 +81,7 @@ public String toString() {
* @param timeSystem the time system that will provide the current time; may not be null
* @return the new metronome; never null
*/
public static Metronome parker(Duration period, Clock timeSystem) {
static Metronome parker(Duration period, Clock timeSystem) {
long periodInNanos = period.toNanos();
return new Metronome() {
private long next = timeSystem.currentTimeInNanos() + periodInNanos;
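Both factories return a metronome whose pause() waits out the remainder of the current period, keeping the tick rate steady regardless of how long the work between ticks takes. A minimal sketch using the sleeper variant (the io.debezium.util package is assumed):

import java.time.Duration;

import io.debezium.util.Clock;
import io.debezium.util.Metronome;

public class MetronomeExample {
    public static void main(String[] args) throws InterruptedException {
        Metronome metronome = Metronome.sleeper(Duration.ofMillis(100), Clock.SYSTEM);
        for (int i = 0; i < 3; i++) {
            metronome.pause(); // sleeps until the current 100 ms period ends
            System.out.println("tick " + i);
        }
    }
}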
View File
@ -38,7 +38,7 @@
@ThreadSafe
public interface SchemaNameAdjuster {
public static final Logger LOGGER = LoggerFactory.getLogger(SchemaNameAdjuster.class);
Logger LOGGER = LoggerFactory.getLogger(SchemaNameAdjuster.class);
/**
* Convert the proposed string to a valid Avro fullname, replacing all invalid characters with the underscore ('_')
@ -54,7 +54,7 @@ public interface SchemaNameAdjuster {
*/
@FunctionalInterface
@ThreadSafe
public static interface ReplacementFunction {
interface ReplacementFunction {
/**
* Determine the replacement string for the invalid character.
*
@ -69,7 +69,7 @@ public static interface ReplacementFunction {
*/
@FunctionalInterface
@ThreadSafe
public static interface ReplacementOccurred {
interface ReplacementOccurred {
/**
* Accept that the original value was not Avro-compatible and was replaced.
*
@ -134,7 +134,7 @@ default ReplacementOccurred andThen(ReplacementOccurred next) {
*
* @return the validator; never null
*/
public static SchemaNameAdjuster avroAdjuster() {
static SchemaNameAdjuster avroAdjuster() {
return AVRO;
}
@ -143,7 +143,7 @@ public static SchemaNameAdjuster avroAdjuster() {
* with a valid fullname, and throws an error if the replacement conflicts with that of a different original. This method
* replaces all invalid characters with the underscore character ('_').
*/
public static SchemaNameAdjuster create() {
static SchemaNameAdjuster create() {
return create((original, replacement, conflict) -> {
String msg = "The Kafka Connect schema name '" + original +
"' is not a valid Avro schema name and its replacement '" + replacement +
@ -159,7 +159,7 @@ public static SchemaNameAdjuster create() {
* @param uponConflict the function to be called when there is a conflict and after that conflict is logged; may be null
* @return the validator; never null
*/
public static SchemaNameAdjuster create(ReplacementOccurred uponConflict) {
static SchemaNameAdjuster create(ReplacementOccurred uponConflict) {
ReplacementOccurred handler = (original, replacement, conflictsWith) -> {
if (conflictsWith != null) {
LOGGER.error("The Kafka Connect schema name '{}' is not a valid Avro schema name and its replacement '{}' conflicts with another different schema '{}'",
@ -184,7 +184,7 @@ public static SchemaNameAdjuster create(ReplacementOccurred uponConflict) {
* @param uponReplacement the function called each time the original fullname is replaced; may be null
* @return the adjuster; never null
*/
public static SchemaNameAdjuster create(char replacement, ReplacementOccurred uponReplacement) {
static SchemaNameAdjuster create(char replacement, ReplacementOccurred uponReplacement) {
String replacementStr = "" + replacement;
return (original) -> validFullname(original, c -> replacementStr, uponReplacement);
}
@ -197,7 +197,7 @@ public static SchemaNameAdjuster create(char replacement, ReplacementOccurred up
* @param uponReplacement the function called each time the original fullname is replaced; may be null
* @return the adjuster; never null
*/
public static SchemaNameAdjuster create(String replacement, ReplacementOccurred uponReplacement) {
static SchemaNameAdjuster create(String replacement, ReplacementOccurred uponReplacement) {
return (original) -> validFullname(original, c -> replacement, uponReplacement);
}
@ -209,7 +209,7 @@ public static SchemaNameAdjuster create(String replacement, ReplacementOccurred
* @param uponReplacement the function called each time the original fullname is replaced; may be null
* @return the adjuster; never null
*/
public static SchemaNameAdjuster create(ReplacementFunction function, ReplacementOccurred uponReplacement) {
static SchemaNameAdjuster create(ReplacementFunction function, ReplacementOccurred uponReplacement) {
return (original) -> validFullname(original, function, uponReplacement);
}
@ -219,7 +219,7 @@ public static SchemaNameAdjuster create(ReplacementFunction function, Replacemen
* @param fullname the name to be used as an Avro fullname; may not be null
* @return {@code true} if the fullname satisfies Avro rules, or {@code false} otherwise
*/
public static boolean isValidFullname(String fullname) {
static boolean isValidFullname(String fullname) {
if (fullname.length() == 0) {
return true;
}
@ -243,7 +243,7 @@ public static boolean isValidFullname(String fullname) {
* @return {@code true} if the character is a valid first character of an Avro fullname, or {@code false} otherwise
* @see #isValidFullname(String)
*/
public static boolean isValidFullnameFirstCharacter(char c) {
static boolean isValidFullnameFirstCharacter(char c) {
return c == '_' || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z');
}
@ -254,7 +254,7 @@ public static boolean isValidFullnameFirstCharacter(char c) {
* @return {@code true} if the character is a valid non-first character of an Avro fullname, or {@code false} otherwise
* @see #isValidFullname(String)
*/
public static boolean isValidFullnameNonFirstCharacter(char c) {
static boolean isValidFullnameNonFirstCharacter(char c) {
return c == '.' || isValidFullnameFirstCharacter(c) || (c >= '0' && c <= '9');
}
@ -264,7 +264,7 @@ public static boolean isValidFullnameNonFirstCharacter(char c) {
* @param proposedName the proposed fullname; may not be null
* @return the valid fullname for Avro; never null
*/
public static String validFullname(String proposedName) {
static String validFullname(String proposedName) {
return validFullname(proposedName, "_");
}
@ -275,7 +275,7 @@ public static String validFullname(String proposedName) {
* @param replacement the character sequence that should be used to replace all invalid characters
* @return the valid fullname for Avro; never null
*/
public static String validFullname(String proposedName, String replacement) {
static String validFullname(String proposedName, String replacement) {
return validFullname(proposedName, c -> replacement);
}
@ -286,7 +286,7 @@ public static String validFullname(String proposedName, String replacement) {
* @param replacement the character sequence that should be used to replace all invalid characters
* @return the valid fullname for Avro; never null
*/
public static String validFullname(String proposedName, ReplacementFunction replacement) {
static String validFullname(String proposedName, ReplacementFunction replacement) {
return validFullname(proposedName, replacement, null);
}
@ -298,7 +298,7 @@ public static String validFullname(String proposedName, ReplacementFunction repl
* @param uponReplacement the function to be called every time the proposed name is invalid and replaced; may be null
* @return the valid fullname for Avro; never null
*/
public static String validFullname(String proposedName, ReplacementFunction replacement, ReplacementOccurred uponReplacement) {
static String validFullname(String proposedName, ReplacementFunction replacement, ReplacementOccurred uponReplacement) {
if (proposedName.length() == 0) {
return proposedName;
}
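The character rules above fully determine what validFullname produces, so its behavior can be shown directly. A small sketch (the io.debezium.util package is assumed):

import io.debezium.util.SchemaNameAdjuster;

public class SchemaNameExample {
    public static void main(String[] args) {
        // '1' is not a valid first character and '-' is invalid anywhere,
        // so both are replaced with the default '_'.
        System.out.println(SchemaNameAdjuster.validFullname("1server-name.topic")); // _server_name.topic
        System.out.println(SchemaNameAdjuster.isValidFullname("my.valid_Name"));    // true
    }
}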
View File
@ -53,7 +53,7 @@ public abstract class Stopwatch {
* The average and total durations as measured by one or more stopwatches.
*/
@ThreadSafe
public static interface Durations {
public interface Durations {
/**
* Get the statistics for the durations in nanoseconds.
@ -66,48 +66,48 @@ public static interface Durations {
/**
* The timing statistics for a recorded set of samples.
*/
public static interface Statistics {
public interface Statistics {
/**
* Returns the count of durations recorded.
*
* @return the count of durations
*/
public long getCount();
long getCount();
/**
* Returns the total of all recorded durations.
*
* @return The total duration; never null but possibly {@link Duration#ZERO}.
*/
public Duration getTotal();
Duration getTotal();
/**
* Returns the minimum of all recorded durations.
*
* @return The minimum duration; never null but possibly {@link Duration#ZERO}.
*/
public Duration getMinimum();
Duration getMinimum();
/**
* Returns the maximum of all recorded durations.
*
* @return The maximum duration; never null but possibly {@link Duration#ZERO}.
*/
public Duration getMaximum();
Duration getMaximum();
/**
* Returns the arithmetic mean of all recorded durations.
*
* @return The average duration; never null but possibly {@link Duration#ZERO}.
*/
public Duration getAverage();
Duration getAverage();
/**
* Returns a string representation of the total of all recorded durations.
*
* @return the string representation of the total duration; never null but possibly {@link Duration#ZERO}.
*/
default public String getTotalAsString() {
default String getTotalAsString() {
return asString(getTotal());
}
@ -116,7 +116,7 @@ default public String getTotalAsString() {
*
* @return the string representation of the minimum duration; never null but possibly {@link Duration#ZERO}.
*/
default public String getMinimumAsString() {
default String getMinimumAsString() {
return asString(getMinimum());
}
@ -125,7 +125,7 @@ default public String getMinimumAsString() {
*
* @return the string representation of the maximum duration; never null but possibly {@link Duration#ZERO}.
*/
default public String getMaximumAsString() {
default String getMaximumAsString() {
return asString(getMaximum());
}
@ -134,7 +134,7 @@ default public String getMaximumAsString() {
*
* @return the string representation of the average duration; never null but possibly {@link Duration#ZERO}.
*/
default public String getAverageAsString() {
default String getAverageAsString() {
return asString(getAverage());
}
}
@ -264,7 +264,7 @@ public static Stopwatch accumulating() {
* </p>
*/
@ThreadSafe
public static interface StopwatchSet extends Durations {
public interface StopwatchSet extends Durations {
/**
* Create a new stopwatch that records durations with this set.
*
@ -277,7 +277,7 @@ public static interface StopwatchSet extends Durations {
*
* @param runnable the function to call
*/
default public void time(Runnable runnable) {
default void time(Runnable runnable) {
time(1, runnable);
}
@ -287,7 +287,7 @@ default public void time(Runnable runnable) {
* @param runnable the function that is to be executed; may not be null
* @return the result of the operation
*/
default public <T> T time(Callable<T> runnable) {
default <T> T time(Callable<T> runnable) {
Stopwatch sw = create().start();
try {
return runnable.call();
@ -309,7 +309,7 @@ default public <T> T time(Callable<T> runnable) {
* @param repeat the number of times to repeat the function call; must be positive
* @param runnable the function to call; may not be null
*/
default public void time(int repeat, Runnable runnable) {
default void time(int repeat, Runnable runnable) {
for (int i = 0; i != repeat; ++i) {
Stopwatch sw = create().start();
try {
@ -330,7 +330,7 @@ default public void time(int repeat, Runnable runnable) {
* in the time measurements; may be null
* @throws Exception the exception thrown by the runnable function
*/
default public <T> void time(int repeat, Callable<T> runnable, Consumer<T> cleanup) throws Exception {
default <T> void time(int repeat, Callable<T> runnable, Consumer<T> cleanup) throws Exception {
for (int i = 0; i != repeat; ++i) {
T result = null;
Stopwatch sw = create().start();
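A hedged sketch of the accumulating stopwatch named in the hunk header above; beyond what this diff shows, the stop(), durations(), and statistics() method names are assumptions about the surrounding API:

import io.debezium.util.Stopwatch;

public class StopwatchExample {
    public static void main(String[] args) throws Exception {
        Stopwatch sw = Stopwatch.accumulating().start();
        Thread.sleep(50); // stand-in for timed work
        sw.stop();        // assumed counterpart to start()
        // durations() and statistics() are assumed accessors leading to the
        // Durations and Statistics interfaces shown above.
        System.out.println(sw.durations().statistics().getTotalAsString());
    }
}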
View File
@ -154,7 +154,7 @@ public static List<Pattern> listOfRegex(String input, int regexFlags) {
* Represents a predicate (boolean-valued function) of one character argument.
*/
@FunctionalInterface
public static interface CharacterPredicate {
public interface CharacterPredicate {
/**
* Evaluates this predicate on the given character argument.
*
@ -394,7 +394,7 @@ public static String setLength(String original,
return justifyLeft(original, length, padChar, false);
}
public static enum Justify {
public enum Justify {
LEFT,
RIGHT,
CENTER;
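
The pattern in this file is the whole point of the commit: on an interface, every member is implicitly public, fields are implicitly static and final, and nested types are implicitly static; enums and interfaces nested in a class are implicitly static as well. An illustrative pair (not from the Debezium sources) that declares identical interfaces either way:

// Verbose and Concise compile to the same thing (JLS 9.3, 9.4, 9.5).
interface Verbose {
    public static final int LIMIT = 10;    // public static final are implicit
    public abstract String name();         // public abstract are implicit
    public static interface Nested { }     // public static are implicit
    public static enum Mode { ON, OFF }    // public static are implicit
}

interface Concise {
    int LIMIT = 10;
    String name();
    interface Nested { }
    enum Mode { ON, OFF }
}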

View File

@ -32,7 +32,7 @@ public class Threads {
/**
* Measures the amount of time that has elapsed since the last {@link #reset() reset}.
*/
public static interface TimeSince {
public interface TimeSince {
/**
* Reset the elapsed time to 0.
*/
@ -50,7 +50,7 @@ public static interface TimeSince {
* Expires after a defined time period.
*
*/
public static interface Timer {
public interface Timer {
/**
* @return true if the current time is greater than the start time plus the requested time period

View File

@ -58,7 +58,7 @@
public class VerifyRecord {
@FunctionalInterface
public static interface RecordValueComparator {
public interface RecordValueComparator {
/**
* Assert that the actual and expected values are equal. By the time this method is called, the actual value
* and expected values are both determined to be non-null.

View File

@ -17,7 +17,7 @@ public interface DatabaseVersionResolver {
*/
DatabaseVersion getVersion();
public class DatabaseVersion {
class DatabaseVersion {
private final int dbVersionMajor;
private final int dbVersionMinor;
private final int dbVersionPatch;

View File

@ -28,7 +28,7 @@
String description() default "";
public enum Connector {
enum Connector {
SQL_SERVER {
@Override

View File

@ -26,7 +26,7 @@
String description() default "";
public enum KafkaVersion {
enum KafkaVersion {
KAFKA_1XX {
@Override
boolean isLessThan(int major, int minor, int patch) {

View File

@ -423,7 +423,7 @@ public Usage useTo() {
* @param <K> the type of key
* @param <V> the type of value
*/
public static interface InteractiveProducer<K, V> extends Closeable {
public interface InteractiveProducer<K, V> extends Closeable {
/**
* Write a record with the specified key and value to the topic with the given name. The message is flushed immediately.
*
@ -448,7 +448,7 @@ default InteractiveProducer<K, V> write(String topic, K key, V value) {
* Close this producer's connection to Kafka and clean up all resources.
*/
@Override
public void close();
void close();
}
/**
@ -457,7 +457,7 @@ default InteractiveProducer<K, V> write(String topic, K key, V value) {
* @param <K> the type of key
* @param <V> the type of value
*/
public static interface InteractiveConsumer<K, V> extends Closeable {
public interface InteractiveConsumer<K, V> extends Closeable {
/**
* Block until a record can be read from this consumer's topic, and return the value in that record.
*
@ -523,7 +523,7 @@ default V nextValue(long timeout, TimeUnit unit) throws InterruptedException {
* Asynchronously close this consumer's connection to Kafka and begin to clean up all resources.
*/
@Override
public void close();
void close();
}
/**

View File

@ -28,10 +28,10 @@ private static final class CompositeKey {
public int a;
public int b;
public CompositeKey() {
CompositeKey() {
}
public CompositeKey(int a, int b) {
CompositeKey(int a, int b) {
super();
this.a = a;
this.b = b;
@ -69,10 +69,10 @@ private static final class Customer {
public String email;
public Customer() {
Customer() {
}
public Customer(int id, String firstName, String lastName, String email) {
Customer(int id, String firstName, String lastName, String email) {
super();
this.id = id;
this.firstName = firstName;
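
CompositeKey and Customer are private nested classes, so a public constructor cannot widen access beyond the enclosing class; the modifier carries no information and can be dropped. A sketch of the same situation:

public class Outer {
    private static final class Key {
        final int a;

        // No modifier needed: Key itself is private, so this constructor
        // is reachable only from inside Outer regardless of what we write.
        Key(int a) {
            this.a = a;
        }
    }

    Key makeKey() {
        return new Key(1);
    }
}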

View File

@ -54,7 +54,7 @@ public class ExtractNewRecordStateTest {
@Test
public void testTombstoneDroppedByDefault() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
transform.configure(props);
@ -65,7 +65,7 @@ public void testTombstoneDroppedByDefault() {
@Test
public void testTombstoneDroppedConfigured() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(DROP_TOMBSTONES, "true");
transform.configure(props);
@ -77,7 +77,7 @@ public void testTombstoneDroppedConfigured() {
@Test
public void testTombstoneForwardConfigured() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(DROP_TOMBSTONES, "false");
transform.configure(props);
@ -89,7 +89,7 @@ public void testTombstoneForwardConfigured() {
@Test
public void testTruncateDroppedByDefault() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
transform.configure(props);
@ -239,7 +239,7 @@ private String getSourceRecordHeaderByKey(SourceRecord record, String headerKey)
@Test
public void testDeleteDroppedByDefault() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
transform.configure(props);
@ -250,7 +250,7 @@ public void testDeleteDroppedByDefault() {
@Test
public void testHandleDeleteDrop() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_DELETES, "drop");
transform.configure(props);
@ -262,7 +262,7 @@ public void testHandleDeleteDrop() {
@Test
public void testHandleDeleteNone() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_DELETES, "none");
transform.configure(props);
@ -275,7 +275,7 @@ public void testHandleDeleteNone() {
@Test
public void testHandleDeleteRewrite() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_DELETES, "rewrite");
transform.configure(props);
@ -288,7 +288,7 @@ public void testHandleDeleteRewrite() {
@Test
public void testHandleCreateRewrite() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_DELETES, "rewrite");
props.put(ADD_HEADERS, "op");
@ -305,7 +305,7 @@ public void testHandleCreateRewrite() {
@Test
public void testUnwrapCreateRecord() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
transform.configure(props);
@ -319,7 +319,7 @@ public void testUnwrapCreateRecord() {
@Test
@FixFor("DBZ-5166")
public void testUnwrapCreateRecordWithOptionalDefaultValue() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
transform.configure(props);
@ -332,7 +332,7 @@ public void testUnwrapCreateRecordWithOptionalDefaultValue() {
@Test
public void testIgnoreUnknownRecord() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
transform.configure(props);
@ -347,7 +347,7 @@ public void testIgnoreUnknownRecord() {
@Test
@FixFor("DBZ-971")
public void testUnwrapPropagatesRecordHeaders() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
transform.configure(props);
@ -367,7 +367,7 @@ public void testUnwrapPropagatesRecordHeaders() {
@Test
@FixFor("DBZ-1452")
public void testAddField() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "op");
transform.configure(props);
@ -381,7 +381,7 @@ public void testAddField() {
@Test
@FixFor("DBZ-2984")
public void testAddTimestamp() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props1 = new HashMap<>();
props1.put(ADD_FIELDS, "ts_ms");
transform.configure(props1);
@ -403,7 +403,7 @@ public void testAddTimestamp() {
@Test
@FixFor({ "DBZ-1452", "DBZ-2504" })
public void testAddFields() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "op , lsn,id");
props.put(ADD_FIELDS_PREFIX, "prefix.");
@ -420,7 +420,7 @@ public void testAddFields() {
@Test
@FixFor({ "DBZ-2606" })
public void testNewFieldAndHeaderMapping() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
String fieldPrefix = "";
String headerPrefix = "prefix.";
@ -453,7 +453,7 @@ public void testNewFieldAndHeaderMapping() {
@Test
@FixFor("DBZ-1452")
public void testAddFieldsForMissingOptionalField() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "op,lsn,id");
transform.configure(props);
@ -469,7 +469,7 @@ public void testAddFieldsForMissingOptionalField() {
@Test
@FixFor("DBZ-1452")
public void testAddFieldsSpecifyStruct() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "op,source.lsn,transaction.id,transaction.total_order");
transform.configure(props);
@ -486,7 +486,7 @@ public void testAddFieldsSpecifyStruct() {
@Test
@FixFor("DBZ-1452")
public void testAddHeader() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "op");
transform.configure(props);
@ -502,7 +502,7 @@ public void testAddHeader() {
@Test
@FixFor("DBZ-1452")
public void testAddHeaders() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "op , lsn,id");
transform.configure(props);
@ -522,7 +522,7 @@ public void testAddHeaders() {
@Test
@FixFor("DBZ-1452")
public void testAddHeadersForMissingOptionalField() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "op,lsn,id");
transform.configure(props);
@ -542,7 +542,7 @@ public void testAddHeadersForMissingOptionalField() {
@Test
@FixFor({ "DBZ-1452", "DBZ-2504" })
public void testAddHeadersSpecifyStruct() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "op,source.lsn,transaction.id,transaction.total_order");
props.put(ADD_HEADERS_PREFIX, "prefix.");
@ -564,7 +564,7 @@ public void testAddHeadersSpecifyStruct() {
@Test
public void testAddTopicRoutingField() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ROUTE_BY_FIELD, "name");
transform.configure(props);
@ -577,7 +577,7 @@ public void testAddTopicRoutingField() {
@Test
public void testUpdateTopicRoutingField() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ROUTE_BY_FIELD, "name");
transform.configure(props);
@ -590,7 +590,7 @@ public void testUpdateTopicRoutingField() {
@Test
public void testDeleteTopicRoutingField() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ROUTE_BY_FIELD, "name");
props.put(HANDLE_DELETES, "none");
@ -605,7 +605,7 @@ public void testDeleteTopicRoutingField() {
@Test
@FixFor("DBZ-1876")
public void testAddHeadersHandleDeleteRewriteAndTombstone() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_DELETES, "rewrite");
props.put(ADD_HEADERS, "op,source.lsn");
@ -628,7 +628,7 @@ public void testAddHeadersHandleDeleteRewriteAndTombstone() {
@Test(expected = IllegalArgumentException.class)
public void testAddFieldNonExistantField() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "nope");
transform.configure(props);
@ -643,7 +643,7 @@ public void testAddFieldNonExistantField() {
@Test
@FixFor("DBZ-1452")
public void testAddFieldHandleDeleteRewrite() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_DELETES, "rewrite");
props.put(ADD_FIELDS, "op");
@ -659,7 +659,7 @@ public void testAddFieldHandleDeleteRewrite() {
@Test
@FixFor("DBZ-1452")
public void testAddFieldsHandleDeleteRewrite() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_DELETES, "rewrite");
props.put(ADD_FIELDS, "op,lsn");
@ -676,7 +676,7 @@ public void testAddFieldsHandleDeleteRewrite() {
@Test
@FixFor("DBZ-1876")
public void testAddFieldsHandleDeleteRewriteAndTombstone() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_DELETES, "rewrite");
props.put(ADD_FIELDS, "op,lsn");
@ -697,7 +697,7 @@ public void testAddFieldsHandleDeleteRewriteAndTombstone() {
@Test
@FixFor("DBZ-1452")
public void testAddFieldsSpecifyStructHandleDeleteRewrite() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_DELETES, "rewrite");
props.put(ADD_FIELDS, "op,source.lsn");
@ -714,7 +714,7 @@ public void testAddFieldsSpecifyStructHandleDeleteRewrite() {
@Test
@FixFor("DBZ-1517")
public void testSchemaChangeEventWithOperationHeader() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
try (ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "op");
transform.configure(props);
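
Every test in this file drops final from the try-with-resources header. That is safe because a resource variable is implicitly final (JLS 14.20.3); the compiler rejects reassignment either way. A minimal sketch:

import java.io.StringWriter;
import java.io.Writer;

public class TryWithResourcesSketch {
    public static void main(String[] args) throws Exception {
        // The resource is implicitly final; the commented line below
        // would fail to compile even without the final keyword.
        try (Writer w = new StringWriter()) {
            w.write("hello");
            // w = new StringWriter(); // error: cannot assign to resource w
        }
    }
}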

View File

@ -38,7 +38,7 @@ default void resetBeforeEachTest() {
Timer.reset();
}
public static final class Print {
final class Print {
private static boolean enabled = false;
public static void enable() {
@ -54,13 +54,13 @@ public static boolean isEnabled() {
}
}
public static void print(Object message) {
static void print(Object message) {
if (message != null && Print.enabled) {
System.out.println(message);
}
}
public static void print(int length, String leader, Object message) {
static void print(int length, String leader, Object message) {
if (message != null && Print.enabled) {
int len = leader.length();
System.out.print(leader);
@ -73,7 +73,7 @@ public static void print(int length, String leader, Object message) {
}
}
public static final class Debug {
final class Debug {
private static boolean enabled = false;
public static void enable() {
@ -89,25 +89,25 @@ public static boolean isEnabled() {
}
}
public static void debug(Object message) {
static void debug(Object message) {
if (message != null && Debug.enabled) {
System.out.println(message);
}
}
public static void printError(Object message) {
static void printError(Object message) {
if (message != null) {
System.err.println(message);
}
}
public static void printError(Throwable throwable) {
static void printError(Throwable throwable) {
if (throwable != null) {
throwable.printStackTrace();
}
}
public static void printError(String message, Throwable throwable) {
static void printError(String message, Throwable throwable) {
printError(message);
printError(throwable);
}
@ -115,14 +115,14 @@ public static void printError(String message, Throwable throwable) {
/**
* Network-related utility methods.
*/
public static interface Network {
interface Network {
/**
* Find a port that is available. This method starts a {@link ServerSocket} and obtains the port on which the socket is
* listening, and then shuts down the socket so the port becomes available.
*
* @return the number of the now-available port
*/
public static int getAvailablePort() {
static int getAvailablePort() {
return IoUtil.getAvailablePort();
}
@ -131,12 +131,12 @@ public static int getAvailablePort() {
/**
* File system utility methods.
*/
public static interface Files {
interface Files {
public static final String DBZ_TEST_DATA_DIR_ENV_VAR_NAME = "DBZ_TEST_DATA_DIR";
public static final String DBZ_TEST_DATA_DIR_SYSTEM_PROPERTY_KEY = "dbz.test.data.dir";
String DBZ_TEST_DATA_DIR_ENV_VAR_NAME = "DBZ_TEST_DATA_DIR";
String DBZ_TEST_DATA_DIR_SYSTEM_PROPERTY_KEY = "dbz.test.data.dir";
static final String DATA_DIR = determineTestDataDir();
String DATA_DIR = determineTestDataDir();
static String determineTestDataDir() {
String value = System.getProperty(DBZ_TEST_DATA_DIR_SYSTEM_PROPERTY_KEY);
@ -159,7 +159,7 @@ static String determineTestDataDir() {
* @param testClass the test class, used for accessing the class loader
* @return the string representation
*/
public static InputStream readResourceAsStream(String pathOnClasspath, Class<?> testClass) {
static InputStream readResourceAsStream(String pathOnClasspath, Class<?> testClass) {
InputStream stream = testClass.getClassLoader().getResourceAsStream(pathOnClasspath);
assertThat(stream).isNotNull();
return stream;
@ -171,7 +171,7 @@ public static InputStream readResourceAsStream(String pathOnClasspath, Class<?>
* @param pathOnClasspath the path of the resource on the classpath
* @return the string representation
*/
public static InputStream readResourceAsStream(String pathOnClasspath) {
static InputStream readResourceAsStream(String pathOnClasspath) {
return readResourceAsStream(pathOnClasspath, Testing.class);
}
@ -181,7 +181,7 @@ public static InputStream readResourceAsStream(String pathOnClasspath) {
* @param pathOnClasspath the path of the resource on the classpath
* @return the string representation
*/
public static String readResourceAsString(String pathOnClasspath) {
static String readResourceAsString(String pathOnClasspath) {
try (InputStream stream = readResourceAsStream(pathOnClasspath)) {
return IoUtil.read(stream);
}
@ -197,7 +197,7 @@ public static String readResourceAsString(String pathOnClasspath) {
* @param relativePath the path of the directory within the test data directory; may not be null
* @return the reference to the existing readable and writable directory
*/
public static File createTestingDirectory(String relativePath) {
static File createTestingDirectory(String relativePath) {
Path dirPath = createTestingPath(relativePath);
return IoUtil.createDirectory(dirPath);
}
@ -215,7 +215,7 @@ static String dataDir() {
*
* @return the reference to the existing readable and writable file
*/
public static File createTestingFile() {
static File createTestingFile() {
return createTestingFile(UUID.randomUUID().toString());
}
@ -225,7 +225,7 @@ public static File createTestingFile() {
* @param relativePath the path of the file within the test data directory; may not be null
* @return the reference to the existing readable and writable file
*/
public static File createTestingFile(String relativePath) {
static File createTestingFile(String relativePath) {
Path path = createTestingPath(relativePath);
return IoUtil.createFile(path);
}
@ -236,7 +236,7 @@ public static File createTestingFile(String relativePath) {
* @param relativePath the path of the file within the test data directory; may not be null
* @return the reference to the existing readable and writable file
*/
public static File createTestingFile(Path relativePath) {
static File createTestingFile(Path relativePath) {
Path path = relativePath.toAbsolutePath();
if (!inTestDataDir(path)) {
throw new IllegalStateException("Expecting '" + relativePath + "' to be within the testing directory");
@ -250,7 +250,7 @@ public static File createTestingFile(Path relativePath) {
* @param relativePath the path of the file within the test data directory; may not be null
* @return the reference to the existing readable and writable file
*/
public static Path createTestingPath(String relativePath) {
static Path createTestingPath(String relativePath) {
return Paths.get(dataDir(), relativePath).toAbsolutePath();
}
@ -262,7 +262,7 @@ public static Path createTestingPath(String relativePath) {
* @return the reference to the existing readable and writable directory
* @throws IOException if there is a problem deleting the files at this path
*/
public static File createTestingDirectory(String relativePath, boolean removeExistingContent) throws IOException {
static File createTestingDirectory(String relativePath, boolean removeExistingContent) throws IOException {
Path dirPath = createTestingPath(relativePath);
return IoUtil.createDirectory(dirPath, removeExistingContent);
}
@ -273,7 +273,7 @@ public static File createTestingDirectory(String relativePath, boolean removeExi
*
* @param path the path to the file or folder in the target directory
*/
public static void delete(String path) {
static void delete(String path) {
if (path != null) {
delete(Paths.get(path));
}
@ -285,7 +285,7 @@ public static void delete(String path) {
*
* @param fileOrFolder the file or folder in the target directory
*/
public static void delete(File fileOrFolder) {
static void delete(File fileOrFolder) {
if (fileOrFolder != null) {
delete(fileOrFolder.toPath());
}
@ -297,7 +297,7 @@ public static void delete(File fileOrFolder) {
*
* @param path the path to the file or folder in the target directory
*/
public static void delete(Path path) {
static void delete(Path path) {
if (path != null) {
path = path.toAbsolutePath();
if (inTestDataDir(path)) {
@ -320,7 +320,7 @@ public static void delete(Path path) {
* @param file the file or directory; may not be null
* @return true if inside the test data directory, or false otherwise
*/
public static boolean inTestDataDir(File file) {
static boolean inTestDataDir(File file) {
return inTestDataDir(file.toPath());
}
@ -330,29 +330,29 @@ public static boolean inTestDataDir(File file) {
* @param path the path to the file or directory; may not be null
* @return true if inside the test data directory, or false otherwise
*/
public static boolean inTestDataDir(Path path) {
static boolean inTestDataDir(Path path) {
Path target = FileSystems.getDefault().getPath(dataDir()).toAbsolutePath();
return path.toAbsolutePath().startsWith(target);
}
}
default public Statistics once(InterruptableFunction runnable) throws InterruptedException {
default Statistics once(InterruptableFunction runnable) throws InterruptedException {
return Timer.time(null, 1, runnable, null);
}
default public <T> Statistics once(Callable<T> runnable, Consumer<T> cleanup) throws InterruptedException {
default <T> Statistics once(Callable<T> runnable, Consumer<T> cleanup) throws InterruptedException {
return Timer.time(null, 1, runnable, cleanup);
}
default public Statistics time(String desc, int repeat, InterruptableFunction runnable) throws InterruptedException {
default Statistics time(String desc, int repeat, InterruptableFunction runnable) throws InterruptedException {
return Timer.time(desc, repeat, runnable, null);
}
default public <T> Statistics time(String desc, int repeat, Callable<T> runnable, Consumer<T> cleanup) throws InterruptedException {
default <T> Statistics time(String desc, int repeat, Callable<T> runnable, Consumer<T> cleanup) throws InterruptedException {
return Timer.time(desc, repeat, runnable, cleanup);
}
public static final class Timer {
final class Timer {
private static Stopwatch sw = Stopwatch.accumulating();
private static StopwatchSet sws = Stopwatch.multiple();
@ -393,8 +393,8 @@ protected static <T> Statistics time(String desc, int repeat, Callable<T> runnab
}
@FunctionalInterface
public static interface InterruptableFunction extends Callable<Void> {
interface InterruptableFunction extends Callable<Void> {
@Override
public Void call() throws InterruptedException;
Void call() throws InterruptedException;
}
}
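
Since InterruptableFunction is a @FunctionalInterface whose single method returns Void and may throw InterruptedException, the once(...) helper above accepts a lambda directly. A hypothetical test snippet (the Statistics type is taken from the once signature shown in this diff):

public class TimingSketch implements Testing {
    void measure() throws InterruptedException {
        Statistics stats = once(() -> {
            Thread.sleep(5); // allowed: call() declares InterruptedException
            return null;     // Callable<Void> still requires a return value
        });
        // stats now holds the recorded timing for the single run
    }
}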

View File

@ -486,7 +486,7 @@ public boolean hasError() {
*/
@ThreadSafe
@Deprecated
public static interface RecordCommitter extends DebeziumEngine.RecordCommitter<SourceRecord> {
public interface RecordCommitter extends DebeziumEngine.RecordCommitter<SourceRecord> {
}
/**
@ -494,7 +494,7 @@ public static interface RecordCommitter extends DebeziumEngine.RecordCommitter<S
* to process multiple records in one go, acknowledging their processing once that's done.
*/
@Deprecated
public static interface ChangeConsumer extends DebeziumEngine.ChangeConsumer<SourceRecord> {
public interface ChangeConsumer extends DebeziumEngine.ChangeConsumer<SourceRecord> {
}
private static ChangeConsumer buildDefaultChangeConsumer(Consumer<SourceRecord> consumer) {
@ -534,7 +534,7 @@ public void handleBatch(List<SourceRecord> records, DebeziumEngine.RecordCommitt
* A builder to set up and create {@link EmbeddedEngine} instances.
*/
@Deprecated
public static interface Builder extends DebeziumEngine.Builder<SourceRecord> {
public interface Builder extends DebeziumEngine.Builder<SourceRecord> {
/**
* Use the specified configuration for the connector. The configuration is assumed to already be valid.

View File

@ -16,7 +16,7 @@ class EmbeddedEngineChangeEvent<K, V> implements ChangeEvent<K, V>, RecordChange
private final V value;
private final SourceRecord sourceRecord;
public EmbeddedEngineChangeEvent(K key, V value, SourceRecord sourceRecord) {
EmbeddedEngineChangeEvent(K key, V value, SourceRecord sourceRecord) {
this.key = key;
this.value = value;
this.sourceRecord = sourceRecord;

View File

@ -26,7 +26,7 @@ public interface OffsetCommitPolicy extends io.debezium.engine.spi.OffsetCommitP
* An {@link OffsetCommitPolicy} that will commit offsets as frequently as possible. This may result in reduced
* performance, but it has the least potential for seeing source records more than once upon restart.
*/
public static class AlwaysCommitOffsetPolicy implements OffsetCommitPolicy {
class AlwaysCommitOffsetPolicy implements OffsetCommitPolicy {
@Override
public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) {
@ -39,7 +39,7 @@ public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration time
* time is less than {@code 0} then the policy will behave as {@link AlwaysCommitOffsetPolicy}.
* @see io.debezium.embedded.EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS
*/
public static class PeriodicCommitOffsetPolicy implements OffsetCommitPolicy {
class PeriodicCommitOffsetPolicy implements OffsetCommitPolicy {
private final Duration minimumTime;

View File

@ -23,7 +23,7 @@ public interface AvailableVariables {
* @return the available variables function that returns the configuration property value for the given property name; never
* null
*/
public static AvailableVariables configVariables(Configuration config) {
static AvailableVariables configVariables(Configuration config) {
return config::getString;
}
@ -36,7 +36,7 @@ public static AvailableVariables configVariables(Configuration config) {
* @param config the configuration
* @return the available variables function that returns System properties; never null
*/
public static AvailableVariables systemVariables(Configuration config) {
static AvailableVariables systemVariables(Configuration config) {
return System::getProperty;
}
@ -49,7 +49,7 @@ public static AvailableVariables systemVariables(Configuration config) {
* @param config the configuration
* @return the available variables function that returns environment variables; never null
*/
public static AvailableVariables environmentVariables(Configuration config) {
static AvailableVariables environmentVariables(Configuration config) {
return System::getenv;
}
@ -58,7 +58,7 @@ public static AvailableVariables environmentVariables(Configuration config) {
*
* @return the empty available variables function that always returns null.
*/
public static AvailableVariables empty() {
static AvailableVariables empty() {
return (varName) -> null;
}
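
The factory methods here are static interface methods, which are implicitly public, so dropping the modifier changes no call site. A sketch that chains only the factories shown in this diff (the surrounding method is hypothetical, and the Configuration value is assumed to come from the caller):

static AvailableVariables pickSource(Configuration config, String mode) {
    switch (mode) {
        case "config":
            return AvailableVariables.configVariables(config);
        case "system":
            return AvailableVariables.systemVariables(config);
        case "env":
            return AvailableVariables.environmentVariables(config);
        default:
            return AvailableVariables.empty();
    }
}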

View File

@ -170,7 +170,7 @@ public abstract class ConnectorOutputTest {
public static final String CONTROL_STOP = "stop";
public static final String CONTROL_END = "end";
private static enum ExecutionResult {
private enum ExecutionResult {
/**
* The connector stopped after actual records did not match expected records.
*/
@ -190,7 +190,7 @@ private static enum ExecutionResult {
}
@FunctionalInterface
public static interface TestData extends AutoCloseable {
public interface TestData extends AutoCloseable {
/**
* Read the records that are expected by the test.
*
@ -574,17 +574,17 @@ public TestSpecification withVariables(VariableSupplier variableSupplier) {
}
@FunctionalInterface
protected static interface VariableSupplier {
protected interface VariableSupplier {
Map<String, String> get(Configuration config) throws Exception;
}
@FunctionalInterface
protected static interface InputStreamSupplier {
protected interface InputStreamSupplier {
InputStream get() throws IOException;
}
@FunctionalInterface
protected static interface OutputStreamSupplier {
protected interface OutputStreamSupplier {
OutputStream get() throws IOException;
}
@ -1169,7 +1169,7 @@ private static class SchemaAndValueConverter implements AutoCloseable {
private final ObjectMapper mapper = new ObjectMapper();
private final DocumentReader jsonReader = DocumentReader.defaultReader();
public SchemaAndValueConverter(Configuration config, boolean isKey) {
SchemaAndValueConverter(Configuration config, boolean isKey) {
jsonConverter.configure(config.asMap(), isKey);
jsonSerializer.configure(config.asMap(), isKey);
jsonDeserializer.configure(config.asMap(), isKey);

View File

@ -112,13 +112,13 @@ protected void populateTables(JdbcConnection connection) throws SQLException {
}
protected void populateTable() throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
try (JdbcConnection connection = databaseConnection()) {
populateTable(connection);
}
}
protected void populateTableWithSpecificValue(int startRow, int count, int value) throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
try (JdbcConnection connection = databaseConnection()) {
populateTableWithSpecificValue(connection, tableName(), startRow, count, value);
}
}
@ -135,7 +135,7 @@ private void populateTableWithSpecificValue(JdbcConnection connection, String ta
}
protected void populateTables() throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
try (JdbcConnection connection = databaseConnection()) {
populateTables(connection);
}
}
@ -254,7 +254,7 @@ protected void sendAdHocSnapshotSignalWithAdditionalCondition(Optional<String> a
final String dataCollectionIdsList = Arrays.stream(dataCollectionIds)
.map(x -> '"' + x + '"')
.collect(Collectors.joining(", "));
try (final JdbcConnection connection = databaseConnection()) {
try (JdbcConnection connection = databaseConnection()) {
String query;
if (additionalCondition.isPresent()) {
query = String.format(
@ -283,7 +283,7 @@ protected void sendAdHocSnapshotStopSignal(String... dataCollectionIds) throws S
collections = ",\"data-collections\": [" + dataCollectionIdsList + "]";
}
try (final JdbcConnection connection = databaseConnection()) {
try (JdbcConnection connection = databaseConnection()) {
String query = String.format(
"INSERT INTO %s VALUES('ad-hoc', 'stop-snapshot', '{\"type\": \"INCREMENTAL\"" + collections + "}')",
signalTableName());
@ -300,7 +300,7 @@ protected void sendAdHocSnapshotSignal() throws SQLException {
}
protected void sendPauseSignal() {
try (final JdbcConnection connection = databaseConnection()) {
try (JdbcConnection connection = databaseConnection()) {
String query = String.format("INSERT INTO %s VALUES('test-pause', 'pause-snapshot', '')", signalTableName());
logger.info("Sending pause signal with query {}", query);
connection.execute(query);
@ -311,7 +311,7 @@ protected void sendPauseSignal() {
}
protected void sendResumeSignal() {
try (final JdbcConnection connection = databaseConnection()) {
try (JdbcConnection connection = databaseConnection()) {
String query = String.format("INSERT INTO %s VALUES('test-resume', 'resume-snapshot', '')", signalTableName());
logger.info("Sending resume signal with query {}", query);
connection.execute(query);

View File

@ -30,7 +30,7 @@ public void put(String key, String value) {
}
public String export() {
try (final Writer sw = new StringWriter()) {
try (Writer sw = new StringWriter()) {
props.store(sw, null);
return sw.toString();
}

View File

@ -57,7 +57,7 @@ public void onExportedEvent(@Observes ExportedEvent<?, ?> event) {
.withTag(TIMESTAMP, event.getTimestamp().toString());
final Span activeSpan = spanBuilder.start();
try (final Scope outboxSpanScope = tracer.scopeManager().activate(activeSpan)) {
try (Scope outboxSpanScope = tracer.scopeManager().activate(activeSpan)) {
Tags.COMPONENT.set(activeSpan, TRACING_COMPONENT);
tracer.inject(activeSpan.context(), Format.Builtin.TEXT_MAP, exportedSpanData);

View File

@ -26,7 +26,7 @@ default FieldFilter getFieldFilter() {
return f -> true;
}
public interface FieldFilter {
interface FieldFilter {
boolean include(Field field);
}

View File

@ -44,14 +44,14 @@ public abstract class ScriptingTransformation<R extends ConnectRecord<R>> implem
private static final String JAVAX_SCRIPT_ENGINE_PREFIX = "jsr223.";
private static final String GRAAL_JS_ENGINE = "graal.js";
public static enum NullHandling implements EnumeratedValue {
public enum NullHandling implements EnumeratedValue {
DROP("drop"),
KEEP("keep"),
EVALUATE("evaluate");
private final String value;
private NullHandling(String value) {
NullHandling(String value) {
this.value = value;
}
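
Enum constructors are implicitly private (JLS 8.9.2), and no other access modifier is even legal on them, so the private on NullHandling's constructor was pure noise. The same shape without it (illustrative only; mirrors the NullHandling pattern above):

enum NullHandlingSketch {
    DROP("drop"),
    KEEP("keep"),
    EVALUATE("evaluate");

    private final String value;

    NullHandlingSketch(String value) { // implicitly private
        this.value = value;
    }

    String getValue() {
        return value;
    }
}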

View File

@ -49,7 +49,7 @@ public class FilterTest {
@Test(expected = DebeziumException.class)
public void testLanguageRequired() {
try (final Filter<SourceRecord> transform = new Filter<>()) {
try (Filter<SourceRecord> transform = new Filter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(EXPRESSION, "operation != 'd'");
transform.configure(props);
@ -58,7 +58,7 @@ public void testLanguageRequired() {
@Test(expected = DebeziumException.class)
public void testExpressionRequired() {
try (final Filter<SourceRecord> transform = new Filter<>()) {
try (Filter<SourceRecord> transform = new Filter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(LANGUAGE, "jsr223.groovy");
transform.configure(props);
@ -67,7 +67,7 @@ public void testExpressionRequired() {
@Test(expected = DebeziumException.class)
public void shouldFailOnUnkownLanguage() {
try (final Filter<SourceRecord> transform = new Filter<>()) {
try (Filter<SourceRecord> transform = new Filter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(EXPRESSION, "operation != 'd'");
props.put(LANGUAGE, "jsr223.jython");
@ -77,7 +77,7 @@ public void shouldFailOnUnkownLanguage() {
@Test(expected = DebeziumException.class)
public void shouldFailToParseCondition() {
try (final Filter<SourceRecord> transform = new Filter<>()) {
try (Filter<SourceRecord> transform = new Filter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(EXPRESSION, "operation != 'd");
props.put(LANGUAGE, "jsr223.groovy");
@ -87,7 +87,7 @@ public void shouldFailToParseCondition() {
@Test
public void shouldProcessCondition() {
try (final Filter<SourceRecord> transform = new Filter<>()) {
try (Filter<SourceRecord> transform = new Filter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(EXPRESSION, "value.op != 'd' || value.before.id != 2");
props.put(LANGUAGE, "jsr223.groovy");
@ -101,7 +101,7 @@ public void shouldProcessCondition() {
@Test
@FixFor("DBZ-2074")
public void shouldProcessTopic() {
try (final Filter<SourceRecord> transform = new Filter<>()) {
try (Filter<SourceRecord> transform = new Filter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(EXPRESSION, "topic == 'dummy1'");
props.put(LANGUAGE, "jsr223.groovy");
@ -115,7 +115,7 @@ public void shouldProcessTopic() {
@Test
@FixFor("DBZ-2074")
public void shouldProcessHeader() {
try (final Filter<SourceRecord> transform = new Filter<>()) {
try (Filter<SourceRecord> transform = new Filter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(EXPRESSION, "header.idh.value == 1");
props.put(LANGUAGE, "jsr223.groovy");
@ -129,7 +129,7 @@ public void shouldProcessHeader() {
@Test
@FixFor("DBZ-2024")
public void shouldApplyTopicRegex() {
try (final Filter<SourceRecord> transform = new Filter<>()) {
try (Filter<SourceRecord> transform = new Filter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(TOPIC_REGEX, "dum.*");
props.put(EXPRESSION, "value.op != 'd' || value.before.id != 2");
@ -143,7 +143,7 @@ public void shouldApplyTopicRegex() {
@Test
public void shouldKeepNulls() {
try (final Filter<SourceRecord> transform = new Filter<>()) {
try (Filter<SourceRecord> transform = new Filter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(EXPRESSION, "value.op != 'd' || value.before.id != 2");
props.put(LANGUAGE, "jsr223.groovy");
@ -155,7 +155,7 @@ public void shouldKeepNulls() {
@Test
public void shouldDropNulls() {
try (final Filter<SourceRecord> transform = new Filter<>()) {
try (Filter<SourceRecord> transform = new Filter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(EXPRESSION, "value.op != 'd' || value.before.id != 2");
props.put(LANGUAGE, "jsr223.groovy");
@ -168,7 +168,7 @@ public void shouldDropNulls() {
@Test(expected = DebeziumException.class)
public void shouldEvaluateNulls() {
try (final Filter<SourceRecord> transform = new Filter<>()) {
try (Filter<SourceRecord> transform = new Filter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(EXPRESSION, "value.op != 'd' || value.before.id != 2");
props.put(LANGUAGE, "jsr223.groovy");
@ -237,7 +237,7 @@ private SourceRecord createNullRecord() {
@Test
public void shouldRunJavaScript() {
try (final Filter<SourceRecord> transform = new Filter<>()) {
try (Filter<SourceRecord> transform = new Filter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(EXPRESSION, "value.op != 'd' || value.before.id != 2");
props.put(LANGUAGE, "jsr223.graal.js");
@ -251,7 +251,7 @@ public void shouldRunJavaScript() {
@Test
@FixFor("DBZ-2074")
public void shouldRunJavaScriptWithHeaderAndTopic() {
try (final Filter<SourceRecord> transform = new Filter<>()) {
try (Filter<SourceRecord> transform = new Filter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(EXPRESSION, "header.idh.value == 1 && topic.startsWith('dummy')");
props.put(LANGUAGE, "jsr223.graal.js");

View File

@ -48,7 +48,7 @@ public class RouterTest {
@Test(expected = DebeziumException.class)
public void testExpressionRequired() {
try (final ContentBasedRouter<SourceRecord> transform = new ContentBasedRouter<>()) {
try (ContentBasedRouter<SourceRecord> transform = new ContentBasedRouter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(LANGUAGE, "jsr223.groovy");
transform.configure(props);
@ -57,7 +57,7 @@ public void testExpressionRequired() {
@Test(expected = DebeziumException.class)
public void shouldFailOnInvalidReturnValue() {
try (final ContentBasedRouter<SourceRecord> transform = new ContentBasedRouter<>()) {
try (ContentBasedRouter<SourceRecord> transform = new ContentBasedRouter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(EXPRESSION, "1");
props.put(LANGUAGE, "jsr223.groovy");
@ -68,7 +68,7 @@ public void shouldFailOnInvalidReturnValue() {
@Test
public void shouldRoute() {
try (final ContentBasedRouter<SourceRecord> transform = new ContentBasedRouter<>()) {
try (ContentBasedRouter<SourceRecord> transform = new ContentBasedRouter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(EXPRESSION, "value == null ? 'nulls' : (value.before.id == 1 ? 'ones' : null)");
props.put(LANGUAGE, "jsr223.groovy");
@ -80,7 +80,7 @@ public void shouldRoute() {
@Test
public void shouldRouteMongoDbFormat() {
try (final ContentBasedRouter<SourceRecord> transform = new ContentBasedRouter<>()) {
try (ContentBasedRouter<SourceRecord> transform = new ContentBasedRouter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(EXPRESSION, "value == null ? 'nulls' : ((new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar' ? 'kretchmar' : null)");
props.put(LANGUAGE, "jsr223.groovy");
@ -92,7 +92,7 @@ public void shouldRouteMongoDbFormat() {
@Test
@FixFor("DBZ-2024")
public void shouldApplyTopicRegex() {
try (final ContentBasedRouter<SourceRecord> transform = new ContentBasedRouter<>()) {
try (ContentBasedRouter<SourceRecord> transform = new ContentBasedRouter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(TOPIC_REGEX, "orig.*");
props.put(EXPRESSION, "value == null ? 'nulls' : (value.before.id == 1 ? 'ones' : null)");
@ -105,7 +105,7 @@ public void shouldApplyTopicRegex() {
@Test
public void shouldKeepNulls() {
try (final ContentBasedRouter<SourceRecord> transform = new ContentBasedRouter<>()) {
try (ContentBasedRouter<SourceRecord> transform = new ContentBasedRouter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(EXPRESSION, "value == null ? 'nulls' : (value.before.id == 1 ? 'ones' : null)");
props.put(LANGUAGE, "jsr223.groovy");
@ -117,7 +117,7 @@ public void shouldKeepNulls() {
@Test
public void shouldDropNulls() {
try (final ContentBasedRouter<SourceRecord> transform = new ContentBasedRouter<>()) {
try (ContentBasedRouter<SourceRecord> transform = new ContentBasedRouter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(EXPRESSION, "value == null ? 'nulls' : (value.before.id == 1 ? 'ones' : null)");
props.put(LANGUAGE, "jsr223.groovy");
@ -130,7 +130,7 @@ public void shouldDropNulls() {
@Test
public void shouldEvaluateNulls() {
try (final ContentBasedRouter<SourceRecord> transform = new ContentBasedRouter<>()) {
try (ContentBasedRouter<SourceRecord> transform = new ContentBasedRouter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(EXPRESSION, "value == null ? 'nulls' : (value.before.id == 1 ? 'ones' : null)");
props.put(LANGUAGE, "jsr223.groovy");

View File

@ -12,5 +12,5 @@
*
*/
public interface StreamNameMapper {
public String map(String topic);
String map(String topic);
}

View File

@ -71,7 +71,7 @@ public void testPravega() {
.disableAutomaticCheckpoints()
.build();
try (final ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(STREAM_NAME, clientConfig)) {
try (ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(STREAM_NAME, clientConfig)) {
readerGroupManager.createReaderGroup(STREAM_NAME, readerGroupConfig);
}

View File

@ -46,7 +46,7 @@ public Map<String, String> start() {
container.start();
String scope = ConfigProvider.getConfig().getValue("debezium.sink.pravega.scope", String.class);
try (final StreamManager streamManager = StreamManager.create(URI.create(getControllerUri()))) {
try (StreamManager streamManager = StreamManager.create(URI.create(getControllerUri()))) {
streamManager.createScope(scope);
StreamConfiguration streamConfig = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(1))

View File

@ -69,7 +69,7 @@ public class PubSubChangeConsumer extends BaseChangeConsumer implements Debezium
private static final String PROP_PREFIX = "debezium.sink.pubsub.";
private static final String PROP_PROJECT_ID = PROP_PREFIX + "project.id";
public static interface PublisherBuilder {
public interface PublisherBuilder {
Publisher get(ProjectTopicName topicName);
}

View File

@ -56,7 +56,7 @@ public class PubSubLiteChangeConsumer extends BaseChangeConsumer implements Debe
private static final String PROP_PROJECT_ID = PROP_PREFIX + "project.id";
private static final String PROP_REGION = PROP_PREFIX + "region";
public static interface PublisherBuilder {
public interface PublisherBuilder {
Publisher get(String topicName);
}

View File

@ -48,7 +48,7 @@ public class PulsarChangeConsumer extends BaseChangeConsumer implements Debezium
private static final String PROP_CLIENT_PREFIX = PROP_PREFIX + "client.";
private static final String PROP_PRODUCER_PREFIX = PROP_PREFIX + "producer.";
public static interface ProducerBuilder {
public interface ProducerBuilder {
Producer<Object> get(String topicName, Object value);
}

View File

@ -103,7 +103,7 @@ public void testPulsar() throws Exception {
.with("password", dbPassword)
.with("dbname", dbName)
.build();
try (final PostgresConnection connection = new PostgresConnection(config, "Debezium Pulsar Test")) {
try (PostgresConnection connection = new PostgresConnection(config, "Debezium Pulsar Test")) {
connection.execute(
"CREATE TABLE inventory.nokey (val INT);",
"INSERT INTO inventory.nokey VALUES (1)",

View File

@ -24,7 +24,7 @@
public interface KafkaAssertions<K, V> {
static final Logger LOGGER = LoggerFactory.getLogger(KafkaAssertions.class);
Logger LOGGER = LoggerFactory.getLogger(KafkaAssertions.class);
static void awaitAssert(long timeout, TimeUnit unit, ThrowingRunnable assertion) {
await()

View File

@ -173,7 +173,7 @@ public void updateOrCreateConnector(String name, ConnectorConfiguration newConfi
private static void handleFailedResponse(Response response) {
String responseBodyContent = "{empty response body}";
try (final ResponseBody responseBody = response.body()) {
try (ResponseBody responseBody = response.body()) {
if (null != responseBody) {
responseBodyContent = responseBody.string();
}
@ -188,7 +188,7 @@ private void executePOSTRequestSuccessfully(final String payload, final String f
final RequestBody body = RequestBody.create(payload, JSON);
final Request request = new Request.Builder().url(fullUrl).post(body).build();
try (final Response response = CLIENT.newCall(request).execute()) {
try (Response response = CLIENT.newCall(request).execute()) {
if (!response.isSuccessful()) {
handleFailedResponse(response);
}
@ -202,7 +202,7 @@ private void executePUTRequestSuccessfully(final String payload, final String fu
final RequestBody body = RequestBody.create(payload, JSON);
final Request request = new Request.Builder().url(fullUrl).put(body).build();
try (final Response response = CLIENT.newCall(request).execute()) {
try (Response response = CLIENT.newCall(request).execute()) {
if (!response.isSuccessful()) {
handleFailedResponse(response);
}
@ -231,7 +231,7 @@ protected static Response executeGETRequestSuccessfully(Request request) {
public boolean connectorIsNotRegistered(String connectorName) {
final Request request = new Request.Builder().url(getConnectorUri(connectorName)).build();
try (final Response response = executeGETRequest(request)) {
try (Response response = executeGETRequest(request)) {
return response.code() == 404;
}
}
@ -251,7 +251,7 @@ public void deleteConnector(String connectorName) {
public List<String> getRegisteredConnectors() {
final Request request = new Request.Builder().url(getConnectorsUri()).build();
try (final ResponseBody responseBody = executeGETRequestSuccessfully(request).body()) {
try (ResponseBody responseBody = executeGETRequestSuccessfully(request).body()) {
if (null != responseBody) {
return MAPPER.readValue(responseBody.string(), new TypeReference<List<String>>() {
});
@ -265,7 +265,7 @@ public List<String> getRegisteredConnectors() {
public boolean isConnectorConfigured(String connectorName) {
final Request request = new Request.Builder().url(getConnectorUri(connectorName)).build();
try (final Response response = executeGETRequest(request)) {
try (Response response = executeGETRequest(request)) {
return response.isSuccessful();
}
}
@ -290,7 +290,7 @@ public void deleteAllConnectors() {
public Connector.State getConnectorState(String connectorName) {
final Request request = new Request.Builder().url(getConnectorStatusUri(connectorName)).build();
try (final ResponseBody responseBody = executeGETRequestSuccessfully(request).body()) {
try (ResponseBody responseBody = executeGETRequestSuccessfully(request).body()) {
if (null != responseBody) {
final ObjectNode parsedObject = (ObjectNode) MAPPER.readTree(responseBody.string());
return Connector.State.valueOf(parsedObject.get("connector").get("state").asText());
@ -304,7 +304,7 @@ public Connector.State getConnectorState(String connectorName) {
public Connector.State getConnectorTaskState(String connectorName, int taskNumber) {
final Request request = new Request.Builder().url(getConnectorStatusUri(connectorName)).get().build();
try (final ResponseBody responseBody = executeGETRequestSuccessfully(request).body()) {
try (ResponseBody responseBody = executeGETRequestSuccessfully(request).body()) {
if (null != responseBody) {
final ObjectNode parsedObject = (ObjectNode) MAPPER.readTree(responseBody.string());
final JsonNode tasksNode = parsedObject.get("tasks").get(taskNumber);
@ -325,7 +325,7 @@ public Connector.State getConnectorTaskState(String connectorName, int taskNumbe
public String getConnectorConfigProperty(String connectorName, String configPropertyName) {
final Request request = new Request.Builder().url(getConnectorConfigUri(connectorName)).get().build();
try (final ResponseBody responseBody = executeGETRequestSuccessfully(request).body()) {
try (ResponseBody responseBody = executeGETRequestSuccessfully(request).body()) {
if (null != responseBody) {
final ObjectNode parsedObject = (ObjectNode) MAPPER.readTree(responseBody.string());
return parsedObject.get(configPropertyName).asText();