DBZ-7596 Improved tests for reselect post processor
This commit is contained in:
parent
0e267d8ef6
commit
200f9ed28e
@ -31,6 +31,7 @@
|
|||||||
import io.debezium.data.VerifyRecord;
|
import io.debezium.data.VerifyRecord;
|
||||||
import io.debezium.doc.FixFor;
|
import io.debezium.doc.FixFor;
|
||||||
import io.debezium.jdbc.JdbcConnection;
|
import io.debezium.jdbc.JdbcConnection;
|
||||||
|
import io.debezium.junit.logging.LogInterceptor;
|
||||||
import io.debezium.processors.AbstractReselectProcessorTest;
|
import io.debezium.processors.AbstractReselectProcessorTest;
|
||||||
import io.debezium.processors.reselect.ReselectColumnsPostProcessor;
|
import io.debezium.processors.reselect.ReselectColumnsPostProcessor;
|
||||||
|
|
||||||
@ -134,6 +135,8 @@ protected String fieldName(String fieldName) {
|
|||||||
public void testColumnReselectionUsesPrimaryKeyColumnAndValuesDespiteMessageKeyColumnConfigs() throws Exception {
|
public void testColumnReselectionUsesPrimaryKeyColumnAndValuesDespiteMessageKeyColumnConfigs() throws Exception {
|
||||||
TestHelper.dropTable(connection, "dbz7729");
|
TestHelper.dropTable(connection, "dbz7729");
|
||||||
try {
|
try {
|
||||||
|
final LogInterceptor logInterceptor = getReselectLogInterceptor();
|
||||||
|
|
||||||
connection.execute("CREATE TABLE dbz7729 (id numeric(9,0) primary key, data clob, data2 numeric(9,0), data3 varchar2(25))");
|
connection.execute("CREATE TABLE dbz7729 (id numeric(9,0) primary key, data clob, data2 numeric(9,0), data3 varchar2(25))");
|
||||||
TestHelper.streamTable(connection, "dbz7729");
|
TestHelper.streamTable(connection, "dbz7729");
|
||||||
|
|
||||||
@ -176,6 +179,8 @@ public void testColumnReselectionUsesPrimaryKeyColumnAndValuesDespiteMessageKeyC
|
|||||||
assertThat(after.get("DATA")).isEqualTo(clobData);
|
assertThat(after.get("DATA")).isEqualTo(clobData);
|
||||||
assertThat(after.get("DATA2")).isEqualTo(10);
|
assertThat(after.get("DATA2")).isEqualTo(10);
|
||||||
assertThat(after.get("DATA3")).isEqualTo("A");
|
assertThat(after.get("DATA3")).isEqualTo("A");
|
||||||
|
|
||||||
|
assertColumnReselectedForUnavailableValue(logInterceptor, TestHelper.getDatabaseName() + ".DEBEZIUM.DBZ7729", "DATA");
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
TestHelper.dropTable(connection, "dbz7729");
|
TestHelper.dropTable(connection, "dbz7729");
|
||||||
@ -188,6 +193,8 @@ public void testColumnReselectionUsesPrimaryKeyColumnAndValuesDespiteMessageKeyC
|
|||||||
public void testClobReselectedWhenValueIsUnavailable() throws Exception {
|
public void testClobReselectedWhenValueIsUnavailable() throws Exception {
|
||||||
TestHelper.dropTable(connection, "dbz4321");
|
TestHelper.dropTable(connection, "dbz4321");
|
||||||
try {
|
try {
|
||||||
|
final LogInterceptor logInterceptor = getReselectLogInterceptor();
|
||||||
|
|
||||||
connection.execute("CREATE TABLE dbz4321 (id numeric(9,0) primary key, data clob, data2 numeric(9,0))");
|
connection.execute("CREATE TABLE dbz4321 (id numeric(9,0) primary key, data clob, data2 numeric(9,0))");
|
||||||
TestHelper.streamTable(connection, "dbz4321");
|
TestHelper.streamTable(connection, "dbz4321");
|
||||||
|
|
||||||
@ -222,6 +229,8 @@ public void testClobReselectedWhenValueIsUnavailable() throws Exception {
|
|||||||
assertThat(after.get("ID")).isEqualTo(1);
|
assertThat(after.get("ID")).isEqualTo(1);
|
||||||
assertThat(after.get("DATA")).isEqualTo(clobData);
|
assertThat(after.get("DATA")).isEqualTo(clobData);
|
||||||
assertThat(after.get("DATA2")).isEqualTo(10);
|
assertThat(after.get("DATA2")).isEqualTo(10);
|
||||||
|
|
||||||
|
assertColumnReselectedForUnavailableValue(logInterceptor, TestHelper.getDatabaseName() + ".DEBEZIUM.DBZ4321", "DATA");
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
TestHelper.dropTable(connection, "dbz4321");
|
TestHelper.dropTable(connection, "dbz4321");
|
||||||
@ -234,6 +243,8 @@ public void testClobReselectedWhenValueIsUnavailable() throws Exception {
|
|||||||
public void testBlobReselectedWhenValueIsUnavailable() throws Exception {
|
public void testBlobReselectedWhenValueIsUnavailable() throws Exception {
|
||||||
TestHelper.dropTable(connection, "dbz4321");
|
TestHelper.dropTable(connection, "dbz4321");
|
||||||
try {
|
try {
|
||||||
|
final LogInterceptor logInterceptor = getReselectLogInterceptor();
|
||||||
|
|
||||||
connection.execute("CREATE TABLE dbz4321 (id numeric(9,0) primary key, data blob, data2 numeric(9,0))");
|
connection.execute("CREATE TABLE dbz4321 (id numeric(9,0) primary key, data blob, data2 numeric(9,0))");
|
||||||
TestHelper.streamTable(connection, "dbz4321");
|
TestHelper.streamTable(connection, "dbz4321");
|
||||||
|
|
||||||
@ -268,6 +279,8 @@ public void testBlobReselectedWhenValueIsUnavailable() throws Exception {
|
|||||||
assertThat(after.get("ID")).isEqualTo(1);
|
assertThat(after.get("ID")).isEqualTo(1);
|
||||||
assertThat(after.get("DATA")).isEqualTo(ByteBuffer.wrap(blobData));
|
assertThat(after.get("DATA")).isEqualTo(ByteBuffer.wrap(blobData));
|
||||||
assertThat(after.get("DATA2")).isEqualTo(10);
|
assertThat(after.get("DATA2")).isEqualTo(10);
|
||||||
|
|
||||||
|
assertColumnReselectedForUnavailableValue(logInterceptor, TestHelper.getDatabaseName() + ".DEBEZIUM.DBZ4321", "DATA");
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
TestHelper.dropTable(connection, "dbz4321");
|
TestHelper.dropTable(connection, "dbz4321");
|
||||||
|
@ -72,8 +72,7 @@ public void afterEach() throws Exception {
|
|||||||
@FixFor("DBZ-4321")
|
@FixFor("DBZ-4321")
|
||||||
@SuppressWarnings("resource")
|
@SuppressWarnings("resource")
|
||||||
public void testNoColumnsReselectedWhenNullAndUnavailableColumnsAreDisabled() throws Exception {
|
public void testNoColumnsReselectedWhenNullAndUnavailableColumnsAreDisabled() throws Exception {
|
||||||
LogInterceptor interceptor = new LogInterceptor(ReselectColumnsPostProcessor.class);
|
LogInterceptor interceptor = getReselectLogInterceptor();
|
||||||
interceptor.setLoggerLevel(ReselectColumnsPostProcessor.class, Level.DEBUG);
|
|
||||||
|
|
||||||
databaseConnection().execute(getInsertWithNullValue());
|
databaseConnection().execute(getInsertWithNullValue());
|
||||||
|
|
||||||
@ -108,8 +107,7 @@ public void testNoColumnsReselectedWhenNullAndUnavailableColumnsAreDisabled() th
|
|||||||
@FixFor("DBZ-4321")
|
@FixFor("DBZ-4321")
|
||||||
@SuppressWarnings("resource")
|
@SuppressWarnings("resource")
|
||||||
public void testNoColumnsReselectedWhenNotNullSnapshot() throws Exception {
|
public void testNoColumnsReselectedWhenNotNullSnapshot() throws Exception {
|
||||||
LogInterceptor interceptor = new LogInterceptor(ReselectColumnsPostProcessor.class);
|
LogInterceptor interceptor = getReselectLogInterceptor();
|
||||||
interceptor.setLoggerLevel(ReselectColumnsPostProcessor.class, Level.DEBUG);
|
|
||||||
|
|
||||||
databaseConnection().execute(getInsertWithValue());
|
databaseConnection().execute(getInsertWithValue());
|
||||||
|
|
||||||
@ -144,8 +142,7 @@ public void testNoColumnsReselectedWhenNotNullSnapshot() throws Exception {
|
|||||||
public void testNoColumnsReselectedWhenNotNullStreaming() throws Exception {
|
public void testNoColumnsReselectedWhenNotNullStreaming() throws Exception {
|
||||||
enableTableForCdc();
|
enableTableForCdc();
|
||||||
|
|
||||||
LogInterceptor interceptor = new LogInterceptor(ReselectColumnsPostProcessor.class);
|
LogInterceptor interceptor = getReselectLogInterceptor();
|
||||||
interceptor.setLoggerLevel(ReselectColumnsPostProcessor.class, Level.DEBUG);
|
|
||||||
|
|
||||||
Configuration config = getConfigurationBuilder()
|
Configuration config = getConfigurationBuilder()
|
||||||
.with("reselector.reselect.columns.include.list", reselectColumnsList())
|
.with("reselector.reselect.columns.include.list", reselectColumnsList())
|
||||||
|
Loading…
Reference in New Issue
Block a user