DBZ-8103 Fix tests

This commit is contained in:
Roman Kudryashov 2024-08-06 12:55:09 +03:00 committed by Jiri Pechanec
parent 851b2c7479
commit a2de933a1d

View File

@ -14,10 +14,8 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
@ -25,12 +23,9 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnector;
@ -50,19 +45,27 @@
*/
public class DecodeLogicalDecodingMessageContentTest extends AbstractConnectorTest {
private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLogicalDecodingMessageContentTest.class);
@Rule
public final TestRule skipName = new SkipTestDependingOnDecoderPluginNameRule();
@BeforeClass
public static void beforeClass() throws SQLException {
TestHelper.dropAllSchemas();
}
private DecodeLogicalDecodingMessageContent<SourceRecord> decodeLogicalDecodingMessageContent;
@Before
public void before() {
initializeConnectorTestFramework();
public void beforeEach() throws Exception {
Configuration.Builder configBuilder = TestHelper.defaultConfig();
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent<>();
decodeLogicalDecodingMessageContent.configure(Collections.emptyMap());
}
@After
public void afterEach() throws Exception {
stopConnector();
assertNoRecordsToConsume();
decodeLogicalDecodingMessageContent.close();
}
@After
@ -77,15 +80,6 @@ public void after() {
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14")
public void shouldFailWhenLogicalDecodingMessageContentIsEmptyString() throws Exception {
DecodeLogicalDecodingMessageContent<SourceRecord> decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent<>();
Map<String, String> smtConfig = new LinkedHashMap<>();
decodeLogicalDecodingMessageContent.configure(smtConfig);
Configuration.Builder configBuilder = TestHelper.defaultConfig();
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
// emit non transactional logical decoding message
TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', '');");
@ -96,9 +90,6 @@ public void shouldFailWhenLogicalDecodingMessageContentIsEmptyString() throws Ex
Exception exception = assertThrows(DataException.class, () -> decodeLogicalDecodingMessageContent.apply(recordsForTopic.get(0)));
assertThat(exception.getMessage()).isEqualTo("Conversion of logical decoding message content failed");
decodeLogicalDecodingMessageContent.close();
stopConnector();
}
@Test
@ -106,15 +97,6 @@ public void shouldFailWhenLogicalDecodingMessageContentIsEmptyString() throws Ex
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14")
public void shouldConvertRecordWithNonTransactionalLogicalDecodingMessageWithEmptyContent() throws Exception {
DecodeLogicalDecodingMessageContent<SourceRecord> decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent<>();
Map<String, String> smtConfig = new LinkedHashMap<>();
decodeLogicalDecodingMessageContent.configure(smtConfig);
Configuration.Builder configBuilder = TestHelper.defaultConfig();
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
// emit non transactional logical decoding message
TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', '{}');");
@ -140,9 +122,6 @@ public void shouldConvertRecordWithNonTransactionalLogicalDecodingMessageWithEmp
assertEquals(0, after.schema().fields().size());
List<Field> recordValueSchemaFields = value.schema().fields();
assertTrue(recordValueSchemaFields.stream().noneMatch(f -> f.name().equals("message")));
decodeLogicalDecodingMessageContent.close();
stopConnector();
}
@Test
@ -150,15 +129,6 @@ public void shouldConvertRecordWithNonTransactionalLogicalDecodingMessageWithEmp
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14")
public void shouldConvertRecordWithTransactionalLogicalDecodingMessageWithContent() throws Exception {
DecodeLogicalDecodingMessageContent<SourceRecord> decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent<>();
Map<String, String> smtConfig = new LinkedHashMap<>();
decodeLogicalDecodingMessageContent.configure(smtConfig);
Configuration.Builder configBuilder = TestHelper.defaultConfig();
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
// emit transactional logical decoding message
TestHelper.execute("SELECT pg_logical_emit_message(true, 'foo', '{\"bar\": \"baz\", \"qux\": 9703}');");
@ -186,8 +156,5 @@ public void shouldConvertRecordWithTransactionalLogicalDecodingMessageWithConten
assertEquals(9703, after.get("qux"));
List<Field> recordValueSchemaFields = value.schema().fields();
assertTrue(recordValueSchemaFields.stream().noneMatch(f -> f.name().equals("message")));
decodeLogicalDecodingMessageContent.close();
stopConnector();
}
}