diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContentTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContentTest.java index 33a641524..996a9fb27 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContentTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContentTest.java @@ -14,10 +14,8 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import java.sql.SQLException; -import java.util.LinkedHashMap; +import java.util.Collections; import java.util.List; -import java.util.Map; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; @@ -25,12 +23,9 @@ import org.apache.kafka.connect.source.SourceRecord; import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestRule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import io.debezium.config.Configuration; import io.debezium.connector.postgresql.PostgresConnector; @@ -50,19 +45,27 @@ */ public class DecodeLogicalDecodingMessageContentTest extends AbstractConnectorTest { - private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLogicalDecodingMessageContentTest.class); - @Rule public final TestRule skipName = new SkipTestDependingOnDecoderPluginNameRule(); - @BeforeClass - public static void beforeClass() throws SQLException { - TestHelper.dropAllSchemas(); - } + private DecodeLogicalDecodingMessageContent decodeLogicalDecodingMessageContent; @Before - public void before() { - initializeConnectorTestFramework(); + public void beforeEach() throws Exception { + Configuration.Builder configBuilder = TestHelper.defaultConfig(); + start(PostgresConnector.class, configBuilder.build()); + assertConnectorIsRunning(); + waitForStreamingRunning("postgres", TestHelper.TEST_SERVER); + + decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent<>(); + decodeLogicalDecodingMessageContent.configure(Collections.emptyMap()); + } + + @After + public void afterEach() throws Exception { + stopConnector(); + assertNoRecordsToConsume(); + decodeLogicalDecodingMessageContent.close(); } @After @@ -77,15 +80,6 @@ public void after() { @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput") @SkipWhenDatabaseVersion(check = LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14") public void shouldFailWhenLogicalDecodingMessageContentIsEmptyString() throws Exception { - DecodeLogicalDecodingMessageContent decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent<>(); - Map smtConfig = new LinkedHashMap<>(); - decodeLogicalDecodingMessageContent.configure(smtConfig); - - Configuration.Builder configBuilder = TestHelper.defaultConfig(); - start(PostgresConnector.class, configBuilder.build()); - assertConnectorIsRunning(); - waitForStreamingRunning("postgres", TestHelper.TEST_SERVER); - // emit non transactional logical decoding message TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', '');"); @@ -96,9 +90,6 @@ public void shouldFailWhenLogicalDecodingMessageContentIsEmptyString() throws Ex Exception exception = assertThrows(DataException.class, () -> decodeLogicalDecodingMessageContent.apply(recordsForTopic.get(0))); assertThat(exception.getMessage()).isEqualTo("Conversion of logical decoding message content failed"); - - decodeLogicalDecodingMessageContent.close(); - stopConnector(); } @Test @@ -106,15 +97,6 @@ public void shouldFailWhenLogicalDecodingMessageContentIsEmptyString() throws Ex @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput") @SkipWhenDatabaseVersion(check = LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14") public void shouldConvertRecordWithNonTransactionalLogicalDecodingMessageWithEmptyContent() throws Exception { - DecodeLogicalDecodingMessageContent decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent<>(); - Map smtConfig = new LinkedHashMap<>(); - decodeLogicalDecodingMessageContent.configure(smtConfig); - - Configuration.Builder configBuilder = TestHelper.defaultConfig(); - start(PostgresConnector.class, configBuilder.build()); - assertConnectorIsRunning(); - waitForStreamingRunning("postgres", TestHelper.TEST_SERVER); - // emit non transactional logical decoding message TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', '{}');"); @@ -140,9 +122,6 @@ public void shouldConvertRecordWithNonTransactionalLogicalDecodingMessageWithEmp assertEquals(0, after.schema().fields().size()); List recordValueSchemaFields = value.schema().fields(); assertTrue(recordValueSchemaFields.stream().noneMatch(f -> f.name().equals("message"))); - - decodeLogicalDecodingMessageContent.close(); - stopConnector(); } @Test @@ -150,15 +129,6 @@ public void shouldConvertRecordWithNonTransactionalLogicalDecodingMessageWithEmp @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput") @SkipWhenDatabaseVersion(check = LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14") public void shouldConvertRecordWithTransactionalLogicalDecodingMessageWithContent() throws Exception { - DecodeLogicalDecodingMessageContent decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent<>(); - Map smtConfig = new LinkedHashMap<>(); - decodeLogicalDecodingMessageContent.configure(smtConfig); - - Configuration.Builder configBuilder = TestHelper.defaultConfig(); - start(PostgresConnector.class, configBuilder.build()); - assertConnectorIsRunning(); - waitForStreamingRunning("postgres", TestHelper.TEST_SERVER); - // emit transactional logical decoding message TestHelper.execute("SELECT pg_logical_emit_message(true, 'foo', '{\"bar\": \"baz\", \"qux\": 9703}');"); @@ -186,8 +156,5 @@ public void shouldConvertRecordWithTransactionalLogicalDecodingMessageWithConten assertEquals(9703, after.get("qux")); List recordValueSchemaFields = value.schema().fields(); assertTrue(recordValueSchemaFields.stream().noneMatch(f -> f.name().equals("message"))); - - decodeLogicalDecodingMessageContent.close(); - stopConnector(); } }