DBZ-6416 Improve tests stability

This commit is contained in:
mfvitale 2023-06-20 12:25:02 +02:00 committed by Jiri Pechanec
parent 34e28ac52d
commit 9ed928a1c9
11 changed files with 124 additions and 21 deletions

View File

@ -102,6 +102,16 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s
.with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, SchemaNameAdjustmentMode.AVRO);
}
@Override
protected String connector() {
return "mysql";
}
@Override
protected String server() {
return DATABASE.getServerName();
}
@Override
protected Class<MySqlConnector> connectorClass() {
return MySqlConnector.class;

View File

@ -185,6 +185,16 @@ protected int defaultIncrementalSnapshotChunkSize() {
return 250;
}
@Override
protected String connector() {
return "oracle";
}
@Override
protected String server() {
return TestHelper.SERVER_NAME;
}
@Test
public void snapshotPreceededBySchemaChange() throws Exception {
// TODO: remove once https://github.com/Apicurio/apicurio-registry/issues/2980 is fixed

View File

@ -180,6 +180,16 @@ protected int defaultIncrementalSnapshotChunkSize() {
return 250;
}
@Override
protected String connector() {
return "oracle";
}
@Override
protected String server() {
return TestHelper.SERVER_NAME;
}
@Test
public void snapshotPreceededBySchemaChange() throws Exception {
// TODO: remove once https://github.com/Apicurio/apicurio-registry/issues/2980 is fixed

View File

@ -167,6 +167,16 @@ protected void waitForConnectorToStart() {
TestHelper.waitForDefaultReplicationSlotBeActive();
}
@Override
protected String connector() {
return "postgres";
}
@Override
protected String server() {
return TestHelper.TEST_SERVER;
}
@Test
public void inserts4Pks() throws Exception {
// Testing.Print.enable();

View File

@ -178,4 +178,24 @@ protected Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCaptur
protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception {
TestHelper.waitForCdcTransactionPropagation(connection, TestHelper.TEST_DATABASE_1, expectedTransactions);
}
@Override
protected String connector() {
return "sql_server";
}
@Override
protected String server() {
return TestHelper.TEST_SERVER_NAME;
}
@Override
protected String task() {
return "0";
}
@Override
protected String database() {
return TestHelper.TEST_DATABASE_1;
}
}

View File

@ -122,6 +122,26 @@ protected Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCaptur
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl);
}
@Override
protected String connector() {
return "sql_server";
}
@Override
protected String server() {
return TestHelper.TEST_SERVER_NAME;
}
@Override
protected String task() {
return "0";
}
@Override
protected String database() {
return TestHelper.TEST_DATABASE_1;
}
@Override
protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception {
TestHelper.waitForCdcTransactionPropagation(connection, TestHelper.TEST_DATABASE_1, expectedTransactions);

View File

@ -11,9 +11,7 @@
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.DataException;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.pipeline.source.snapshot.incremental.DataCollection;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
@ -36,6 +34,8 @@ public class IncrementalSnapshotNotificationService<P extends Partition, O exten
private final NotificationService<P, O> notificationService;
private final CommonConnectorConfig connectorConfig;
public enum TableScanCompletionStatus {
EMPTY,
NO_PRIMARY_KEY,
@ -45,8 +45,9 @@ public enum TableScanCompletionStatus {
UNKNOWN_SCHEMA
}
public IncrementalSnapshotNotificationService(NotificationService<P, O> notificationService) {
public IncrementalSnapshotNotificationService(NotificationService<P, O> notificationService, CommonConnectorConfig config) {
this.notificationService = notificationService;
connectorConfig = config;
}
public <T extends DataCollectionId> void notifyStarted(IncrementalSnapshotContext<T> incrementalSnapshotContext, P partition, OffsetContext offsetContext) {
@ -139,14 +140,7 @@ private <T extends DataCollectionId> Notification buildNotificationWith(Incremen
Map<String, String> fullMap = new HashMap<>(additionalData);
String connectorName;
try {
connectorName = offsetContext.getSourceInfo().getString(BaseSourceInfo.SERVER_NAME_KEY);
}
catch (DataException e) {
connectorName = NONE;
}
fullMap.put(CONNECTOR_NAME, connectorName);
fullMap.put(CONNECTOR_NAME, connectorConfig.getLogicalName());
String id = incrementalSnapshotContext.getCorrelationId() != null ? incrementalSnapshotContext.getCorrelationId() : UUID.randomUUID().toString();
return Notification.Builder.builder()

View File

@ -48,7 +48,7 @@ public NotificationService(List<NotificationChannel> notificationChannels,
.filter(isConnectChannel())
.forEach(channel -> ((ConnectChannel) channel).initConnectChannel(schemaFactory, consumer));
incrementalSnapshotNotificationService = new IncrementalSnapshotNotificationService<>(this);
incrementalSnapshotNotificationService = new IncrementalSnapshotNotificationService<>(this, config);
initialSnapshotNotificationService = new InitialSnapshotNotificationService<>(this, config);
}

View File

@ -8,7 +8,6 @@
import static io.debezium.pipeline.notification.IncrementalSnapshotNotificationService.TableScanCompletionStatus.SUCCEEDED;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -16,7 +15,6 @@
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.data.Struct;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -24,6 +22,7 @@
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.pipeline.source.snapshot.incremental.DataCollection;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
@ -42,14 +41,15 @@ public class IncrementalSnapshotNotificationServiceTest {
private NotificationService<Partition, OffsetContext> notificationService;
@Mock
private IncrementalSnapshotContext<TableId> incrementalSnapshotContext;
@Mock
private CommonConnectorConfig connectorConfig;
@InjectMocks
private IncrementalSnapshotNotificationService<Partition, OffsetContext> incrementalSnapshotNotificationService;
@Before
public void setUp() {
Struct mockedSourceStruct = mock(Struct.class);
when(mockedSourceStruct.getString("name")).thenReturn("connector-test");
when(offsetContext.getSourceInfo()).thenReturn(mockedSourceStruct);
when(connectorConfig.getLogicalName()).thenReturn("connector-test");
when(incrementalSnapshotContext.getCorrelationId()).thenReturn("12345");
when(incrementalSnapshotContext.getDataCollections()).thenReturn(List.of(
new DataCollection<>(new TableId("db", "inventory", "product")),

View File

@ -1242,12 +1242,16 @@ public static void waitForStreamingRunning(String connector, String server) thro
}
public static void waitForStreamingRunning(String connector, String server, String contextName) {
waitForStreamingRunning(connector, server, contextName, null);
}
public static void waitForStreamingRunning(String connector, String server, String contextName, String task) {
Awaitility.await()
.alias("Streaming was not started on time")
.pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(waitTimeForRecords() * 30, TimeUnit.SECONDS)
.ignoreException(InstanceNotFoundException.class)
.until(() -> isStreamingRunning(connector, server, contextName));
.until(() -> isStreamingRunning(connector, server, contextName, task));
}
public static void waitForConnectorShutdown(String connector, String server) {
@ -1258,14 +1262,20 @@ public static void waitForConnectorShutdown(String connector, String server) {
}
public static boolean isStreamingRunning(String connector, String server) {
return isStreamingRunning(connector, server, getStreamingNamespace());
return isStreamingRunning(connector, server, getStreamingNamespace(), null);
}
public static boolean isStreamingRunning(String connector, String server, String contextName) {
return isStreamingRunning(connector, server, contextName, null);
}
public static boolean isStreamingRunning(String connector, String server, String contextName, String task) {
final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
try {
return (boolean) mbeanServer.getAttribute(getStreamingMetricsObjectName(connector, server, contextName), "Connected");
ObjectName streamingMetricsObjectName = task != null ? getStreamingMetricsObjectName(connector, server, contextName, task)
: getStreamingMetricsObjectName(connector, server, contextName);
return (boolean) mbeanServer.getAttribute(streamingMetricsObjectName, "Connected");
}
catch (JMException ignored) {
}
@ -1301,6 +1311,10 @@ public static ObjectName getStreamingMetricsObjectName(String connector, String
return new ObjectName("debezium." + connector + ":type=connector-metrics,context=" + context + ",server=" + server);
}
public static ObjectName getStreamingMetricsObjectName(String connector, String server, String context, String task) throws MalformedObjectNameException {
return new ObjectName("debezium." + connector + ":type=connector-metrics,context=" + context + ",server=" + server + ",task=" + task);
}
protected static String getStreamingNamespace() {
return System.getProperty("test.streaming.metrics.namespace", "streaming");
}

View File

@ -88,6 +88,18 @@ protected String signalTableNameSanitized() {
protected abstract Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl);
protected abstract String connector();
protected abstract String server();
protected String task() {
return null;
}
protected String database() {
return null;
}
protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception {
}
@ -1094,6 +1106,7 @@ public void snapshotWithAdditionalConditionWithSurrogateKey() throws Exception {
@Test
public void testNotification() throws Exception {
populateTable();
startConnector(x -> x.with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink")
.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, defaultIncrementalSnapshotChunkSize())
@ -1103,6 +1116,8 @@ public void testNotification() throws Exception {
waitForAvailableRecords(1, TimeUnit.SECONDS);
waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
sendAdHocSnapshotSignal();
List<SourceRecord> records = new ArrayList<>();