DBZ-7902: Addressed review comments

This commit is contained in:
rajdangwal 2024-05-31 14:25:38 +05:30 committed by Jiri Pechanec
parent e390275d91
commit 889a18f9dc
2 changed files with 74 additions and 4 deletions

View File

@ -19,9 +19,6 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import io.debezium.config.ConfigurationDefaults;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -29,6 +26,7 @@
import io.debezium.DebeziumException;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigurationDefaults;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
@ -54,7 +52,9 @@
import io.debezium.schema.DatabaseSchema;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
import io.debezium.util.Metronome;
import io.debezium.util.Threads;
/**
@ -208,8 +208,11 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
* Delays streaming execution as per the {@link CommonConnectorConfig#STREAMING_DELAY_MS} parameter.
*/
protected void delayStreamingIfNeeded(ChangeEventSourceContext context) throws InterruptedException {
Duration streamingDelay = connectorConfig.getStreamingDelay();
if (snapshotterService != null && !snapshotterService.getSnapshotter().shouldStream()) {
return;
}
Duration streamingDelay = connectorConfig.getStreamingDelay();
if (streamingDelay.isZero() || streamingDelay.isNegative()) {
return;
}

View File

@ -0,0 +1,67 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import org.apache.kafka.connect.source.SourceConnector;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.snapshot.Snapshotter;
public class ChangeEventSourceCoordinatorTest {
SnapshotterService snapshotterService;
Snapshotter snapshotter;
CommonConnectorConfig connectorConfig;
ChangeEventSourceCoordinator coordinator;
ChangeEventSource.ChangeEventSourceContext context;
@Before
public void before() {
snapshotterService = mock(SnapshotterService.class);
snapshotter = mock(Snapshotter.class);
connectorConfig = mock(CommonConnectorConfig.class);
when(connectorConfig.getLogicalName()).thenReturn("DummyConnector");
coordinator = new ChangeEventSourceCoordinator(null, null, SourceConnector.class, connectorConfig, null,
null, null, null, null, null, snapshotterService);
context = mock(ChangeEventSource.ChangeEventSourceContext.class);
}
@Test
public void testNotDelayStreamingIfSnapshotShouldNotStream() throws Exception {
when(snapshotterService.getSnapshotter()).thenReturn(snapshotter);
when(snapshotter.shouldStream()).thenReturn(false);
coordinator.delayStreamingIfNeeded(context);
verify(connectorConfig, never()).getStreamingDelay();
}
@Test
public void testDelayStreamingIfSnapshotShouldStream() throws Exception {
when(snapshotterService.getSnapshotter()).thenReturn(snapshotter);
when(snapshotter.shouldStream()).thenReturn(true);
when(connectorConfig.getStreamingDelay()).thenReturn(Duration.of(1, ChronoUnit.SECONDS));
when(context.isRunning()).thenReturn(true);
coordinator.delayStreamingIfNeeded(context);
verify(connectorConfig, times(1)).getStreamingDelay();
}
}