DBZ-1648 outbox EventRouter remove topic toLowerCase
- kafka topics names are case sensitive. We should not perform a toLowerCase on the topic name. It can cause some issue if topic name have a upper case
This commit is contained in:
parent
d0e3c8bf90
commit
17e44d4c0c
@ -129,7 +129,7 @@ public void shouldConsumeRecordsFromInsert() throws Exception {
|
|||||||
SourceRecord routedEvent = outboxEventRouter.apply(newEventRecord);
|
SourceRecord routedEvent = outboxEventRouter.apply(newEventRecord);
|
||||||
|
|
||||||
assertThat(routedEvent).isNotNull();
|
assertThat(routedEvent).isNotNull();
|
||||||
assertThat(routedEvent.topic()).isEqualTo("outbox.event.user");
|
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
|
||||||
|
|
||||||
Struct valueStruct = requireStruct(routedEvent.value(), "test payload");
|
Struct valueStruct = requireStruct(routedEvent.value(), "test payload");
|
||||||
assertThat(valueStruct.getString("eventType")).isEqualTo("UserCreated");
|
assertThat(valueStruct.getString("eventType")).isEqualTo("UserCreated");
|
||||||
@ -163,7 +163,7 @@ public void shouldSendEventTypeAsHeader() throws Exception {
|
|||||||
SourceRecord routedEvent = outboxEventRouter.apply(newEventRecord);
|
SourceRecord routedEvent = outboxEventRouter.apply(newEventRecord);
|
||||||
|
|
||||||
assertThat(routedEvent).isNotNull();
|
assertThat(routedEvent).isNotNull();
|
||||||
assertThat(routedEvent.topic()).isEqualTo("outbox.event.user");
|
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
|
||||||
|
|
||||||
Object value = routedEvent.value();
|
Object value = routedEvent.value();
|
||||||
assertThat(routedEvent.headers().lastWithName("eventType").value()).isEqualTo("UserCreated");
|
assertThat(routedEvent.headers().lastWithName("eventType").value()).isEqualTo("UserCreated");
|
||||||
@ -246,7 +246,7 @@ public void shouldSupportAllFeatures() throws Exception {
|
|||||||
assertConnectSchemasAreEqual(null, eventRouted.valueSchema(), expectedSchema);
|
assertConnectSchemasAreEqual(null, eventRouted.valueSchema(), expectedSchema);
|
||||||
|
|
||||||
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000000L);
|
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000000L);
|
||||||
assertThat(eventRouted.topic()).isEqualTo("outbox.event.useremail");
|
assertThat(eventRouted.topic()).isEqualTo("outbox.event.UserEmail");
|
||||||
|
|
||||||
// Validate headers
|
// Validate headers
|
||||||
Headers headers = eventRouted.headers();
|
Headers headers = eventRouted.headers();
|
||||||
@ -311,7 +311,7 @@ public void shouldNotProduceTombstoneEventForNullPayload() throws Exception {
|
|||||||
// Validate metadata
|
// Validate metadata
|
||||||
assertThat(eventRouted.valueSchema()).isNotNull();
|
assertThat(eventRouted.valueSchema()).isNotNull();
|
||||||
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000000L);
|
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000000L);
|
||||||
assertThat(eventRouted.topic()).isEqualTo("outbox.event.useremail");
|
assertThat(eventRouted.topic()).isEqualTo("outbox.event.UserEmail");
|
||||||
|
|
||||||
// Validate headers
|
// Validate headers
|
||||||
Headers headers = eventRouted.headers();
|
Headers headers = eventRouted.headers();
|
||||||
@ -374,7 +374,7 @@ public void shouldProduceTombstoneEventForNullPayload() throws Exception {
|
|||||||
// Validate metadata
|
// Validate metadata
|
||||||
assertThat(eventRouted.valueSchema()).isNull();
|
assertThat(eventRouted.valueSchema()).isNull();
|
||||||
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000000L);
|
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000000L);
|
||||||
assertThat(eventRouted.topic()).isEqualTo("outbox.event.useremail");
|
assertThat(eventRouted.topic()).isEqualTo("outbox.event.UserEmail");
|
||||||
|
|
||||||
// Validate headers
|
// Validate headers
|
||||||
Headers headers = eventRouted.headers();
|
Headers headers = eventRouted.headers();
|
||||||
@ -422,7 +422,7 @@ public void shouldProduceTombstoneEventForEmptyPayload() throws Exception {
|
|||||||
|
|
||||||
// Validate metadata
|
// Validate metadata
|
||||||
assertThat(eventRouted.valueSchema()).isNull();
|
assertThat(eventRouted.valueSchema()).isNull();
|
||||||
assertThat(eventRouted.topic()).isEqualTo("outbox.event.useremail");
|
assertThat(eventRouted.topic()).isEqualTo("outbox.event.UserEmail");
|
||||||
|
|
||||||
// Validate headers
|
// Validate headers
|
||||||
Headers headers = eventRouted.headers();
|
Headers headers = eventRouted.headers();
|
||||||
|
@ -165,7 +165,7 @@ else if (onlyHeadersInOutputMessage) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
R newRecord = r.newRecord(
|
R newRecord = r.newRecord(
|
||||||
eventStruct.getString(routeByField).toLowerCase(),
|
eventStruct.getString(routeByField),
|
||||||
null,
|
null,
|
||||||
Schema.STRING_SCHEMA,
|
Schema.STRING_SCHEMA,
|
||||||
defineRecordKey(eventStruct, payloadId),
|
defineRecordKey(eventStruct, payloadId),
|
||||||
|
@ -376,7 +376,7 @@ public void canRouteBasedOnField() {
|
|||||||
final SourceRecord userEventRouted = router.apply(userEventRecord);
|
final SourceRecord userEventRouted = router.apply(userEventRecord);
|
||||||
|
|
||||||
assertThat(userEventRouted).isNotNull();
|
assertThat(userEventRouted).isNotNull();
|
||||||
assertThat(userEventRouted.topic()).isEqualTo("outbox.event.user");
|
assertThat(userEventRouted.topic()).isEqualTo("outbox.event.User");
|
||||||
|
|
||||||
final SourceRecord userUpdatedEventRecord = createEventRecord(
|
final SourceRecord userUpdatedEventRecord = createEventRecord(
|
||||||
"ab720dd3-176d-40a6-96f3-6cf961d7df6a",
|
"ab720dd3-176d-40a6-96f3-6cf961d7df6a",
|
||||||
@ -387,7 +387,7 @@ public void canRouteBasedOnField() {
|
|||||||
final SourceRecord userUpdatedEventRouted = router.apply(userUpdatedEventRecord);
|
final SourceRecord userUpdatedEventRouted = router.apply(userUpdatedEventRecord);
|
||||||
|
|
||||||
assertThat(userUpdatedEventRouted).isNotNull();
|
assertThat(userUpdatedEventRouted).isNotNull();
|
||||||
assertThat(userUpdatedEventRouted.topic()).isEqualTo("outbox.event.user");
|
assertThat(userUpdatedEventRouted.topic()).isEqualTo("outbox.event.User");
|
||||||
|
|
||||||
final SourceRecord addressCreatedEventRecord = createEventRecord(
|
final SourceRecord addressCreatedEventRecord = createEventRecord(
|
||||||
"ab720dd3-176d-40a6-96f3-6cf961d7df6a",
|
"ab720dd3-176d-40a6-96f3-6cf961d7df6a",
|
||||||
@ -398,7 +398,7 @@ public void canRouteBasedOnField() {
|
|||||||
final SourceRecord addressCreatedEventRouted = router.apply(addressCreatedEventRecord);
|
final SourceRecord addressCreatedEventRouted = router.apply(addressCreatedEventRecord);
|
||||||
|
|
||||||
assertThat(addressCreatedEventRouted).isNotNull();
|
assertThat(addressCreatedEventRouted).isNotNull();
|
||||||
assertThat(addressCreatedEventRouted.topic()).isEqualTo("outbox.event.address");
|
assertThat(addressCreatedEventRouted.topic()).isEqualTo("outbox.event.Address");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
Loading…
Reference in New Issue
Block a user