DBZ-2495 Misc. fixes
This commit is contained in:
parent
b0a4a8b5ba
commit
b792ac3274
@ -7,8 +7,8 @@
|
|||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
|
import io.debezium.annotation.Immutable;
|
||||||
import io.debezium.relational.ChangeTable;
|
import io.debezium.relational.ChangeTable;
|
||||||
import io.debezium.relational.TableId;
|
import io.debezium.relational.TableId;
|
||||||
|
|
||||||
@ -35,8 +35,9 @@ public class SqlServerChangeTable extends ChangeTable {
|
|||||||
private Lsn stopLsn;
|
private Lsn stopLsn;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* List of columns captured by the cdc table.
|
* List of columns captured by the CDC table.
|
||||||
*/
|
*/
|
||||||
|
@Immutable
|
||||||
private final List<String> capturedColumns;
|
private final List<String> capturedColumns;
|
||||||
|
|
||||||
public SqlServerChangeTable(TableId sourceTableId, String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn,
|
public SqlServerChangeTable(TableId sourceTableId, String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn,
|
||||||
@ -44,7 +45,7 @@ public SqlServerChangeTable(TableId sourceTableId, String captureInstance, int c
|
|||||||
super(captureInstance, sourceTableId, resolveChangeTableId(sourceTableId, captureInstance), changeTableObjectId);
|
super(captureInstance, sourceTableId, resolveChangeTableId(sourceTableId, captureInstance), changeTableObjectId);
|
||||||
this.startLsn = startLsn;
|
this.startLsn = startLsn;
|
||||||
this.stopLsn = stopLsn;
|
this.stopLsn = stopLsn;
|
||||||
this.capturedColumns = Optional.ofNullable(capturedColumns).orElse(Collections.emptyList());
|
this.capturedColumns = capturedColumns != null ? Collections.unmodifiableList(capturedColumns) : Collections.emptyList();
|
||||||
}
|
}
|
||||||
|
|
||||||
public SqlServerChangeTable(String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn) {
|
public SqlServerChangeTable(String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn) {
|
||||||
|
@ -276,8 +276,4 @@ public Table create() {
|
|||||||
});
|
});
|
||||||
return new TableImpl(id, columns, primaryKeyColumnNames(), defaultCharsetName);
|
return new TableImpl(id, columns, primaryKeyColumnNames(), defaultCharsetName);
|
||||||
}
|
}
|
||||||
|
|
||||||
public LinkedHashMap<String, Column> getSortedColumns() {
|
|
||||||
return sortedColumns;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import javax.management.InstanceNotFoundException;
|
||||||
import javax.management.MBeanServer;
|
import javax.management.MBeanServer;
|
||||||
import javax.management.MalformedObjectNameException;
|
import javax.management.MalformedObjectNameException;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
@ -875,6 +876,7 @@ public static void waitForSnapshotToBeCompleted(String connector, String server)
|
|||||||
.alias("Streaming was not started on time")
|
.alias("Streaming was not started on time")
|
||||||
.pollInterval(100, TimeUnit.MILLISECONDS)
|
.pollInterval(100, TimeUnit.MILLISECONDS)
|
||||||
.atMost(60, TimeUnit.SECONDS)
|
.atMost(60, TimeUnit.SECONDS)
|
||||||
|
.ignoreException(InstanceNotFoundException.class)
|
||||||
.until(() -> {
|
.until(() -> {
|
||||||
boolean snapshotCompleted = (boolean) mbeanServer
|
boolean snapshotCompleted = (boolean) mbeanServer
|
||||||
.getAttribute(getSnapshotMetricsObjectName(connector, server), "SnapshotCompleted");
|
.getAttribute(getSnapshotMetricsObjectName(connector, server), "SnapshotCompleted");
|
||||||
@ -894,6 +896,7 @@ public static void waitForStreamingRunning(String connector, String server, Stri
|
|||||||
.alias("Streaming was not started on time")
|
.alias("Streaming was not started on time")
|
||||||
.pollInterval(100, TimeUnit.MILLISECONDS)
|
.pollInterval(100, TimeUnit.MILLISECONDS)
|
||||||
.atMost(60, TimeUnit.SECONDS)
|
.atMost(60, TimeUnit.SECONDS)
|
||||||
|
.ignoreException(InstanceNotFoundException.class)
|
||||||
.until(() -> {
|
.until(() -> {
|
||||||
boolean connected = (boolean) mbeanServer
|
boolean connected = (boolean) mbeanServer
|
||||||
.getAttribute(getStreamingMetricsObjectName(connector, server, contextName), "Connected");
|
.getAttribute(getStreamingMetricsObjectName(connector, server, contextName), "Connected");
|
||||||
|
Loading…
Reference in New Issue
Block a user