-
Notifications
You must be signed in to change notification settings - Fork 293
Aruha 473 check size of events #515
Changes from 5 commits
9dabfa2
cc50d0c
761d455
146d6a0
8849782
a061392
0a791da
df5b675
e261cb8
217be7f
ddd7a4f
957938b
49c1846
cc15d1a
3f9bf87
6f7a450
f315836
8871dea
f5e6367
d5e6f3d
12253ed
30da84a
7287c40
d8af912
0fbaa46
20aed22
b9c36f8
33bbc03
f6fcfe9
f90e477
43a337a
bf90d6e
0330145
4cc0bcd
30db7f5
781db1e
ffd729a
e3a4562
58cb450
3f2a2ac
57e67d3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
package org.zalando.nakadi.webservice; | ||
|
||
import com.jayway.restassured.http.ContentType; | ||
import com.jayway.restassured.response.Response; | ||
import org.apache.http.HttpStatus; | ||
import org.junit.Test; | ||
import org.zalando.nakadi.domain.EventType; | ||
import org.zalando.nakadi.webservice.utils.NakadiTestUtils; | ||
|
||
import java.text.MessageFormat; | ||
|
||
import static com.jayway.restassured.RestAssured.given; | ||
|
||
public class EventPublishingAT extends BaseAT { | ||
|
||
@Test | ||
public void whenPublishingEventTooLargeThen422() throws Exception { | ||
final EventType eventType = NakadiTestUtils.createEventType(); | ||
|
||
publishLargeEvent(eventType) | ||
.then() | ||
.statusCode(HttpStatus.SC_UNPROCESSABLE_ENTITY); | ||
} | ||
|
||
private Response publishLargeEvent(final EventType eventType) { | ||
final StringBuilder sb = new StringBuilder(); | ||
sb.append("[{\"blah\":\""); | ||
for (int i = 0; i < 1000010; i++) { | ||
sb.append("a"); | ||
} | ||
sb.append("\"}]"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can simply do: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
return given() | ||
.body(sb.toString()) | ||
.contentType(ContentType.JSON) | ||
.post(MessageFormat.format("/event-types/{0}/events", eventType.getName())); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
/**
 * The stage of the publishing pipeline a batch item was in when its
 * status was last recorded.
 *
 * NOTE(review): this enum is exposed through the public API; adding the
 * VALIDATING_SIZE constant may be a backwards-incompatible change for
 * existing clients — confirm before release (raised in PR discussion).
 */
public enum EventPublishingStep {
    NONE,
    VALIDATING,
    ENRICHING,
    VALIDATING_SIZE,
    PARTITIONING,
    PUBLISHING,
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package org.zalando.nakadi.exceptions; | ||
|
||
import org.zalando.nakadi.validation.ValidationError; | ||
import org.zalando.problem.MoreStatus; | ||
|
||
import javax.ws.rs.core.Response; | ||
|
||
public class EventSizeValidationException extends NakadiException { | ||
public EventSizeValidationException(final String message) { | ||
super(message); | ||
} | ||
|
||
public EventSizeValidationException (final ValidationError validationError) { | ||
super(validationError.getMessage()); | ||
} | ||
|
||
@Override | ||
protected Response.StatusType getStatus() { | ||
return MoreStatus.UNPROCESSABLE_ENTITY; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ | |
import org.slf4j.LoggerFactory; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.stereotype.Component; | ||
import org.zalando.nakadi.config.NakadiSettings; | ||
import org.zalando.nakadi.domain.BatchFactory; | ||
import org.zalando.nakadi.domain.BatchItem; | ||
import org.zalando.nakadi.domain.BatchItemResponse; | ||
|
@@ -16,6 +17,7 @@ | |
import org.zalando.nakadi.enrichment.Enrichment; | ||
import org.zalando.nakadi.exceptions.EnrichmentException; | ||
import org.zalando.nakadi.exceptions.EventPublishingException; | ||
import org.zalando.nakadi.exceptions.EventSizeValidationException; | ||
import org.zalando.nakadi.exceptions.EventValidationException; | ||
import org.zalando.nakadi.exceptions.InternalNakadiException; | ||
import org.zalando.nakadi.exceptions.NoSuchEventTypeException; | ||
|
@@ -37,6 +39,8 @@ public class EventPublisher { | |
|
||
private static final Logger LOG = LoggerFactory.getLogger(EventPublisher.class); | ||
|
||
private final NakadiSettings nakadiSettings; | ||
|
||
private final TopicRepository topicRepository; | ||
private final EventTypeCache eventTypeCache; | ||
private final PartitionResolver partitionResolver; | ||
|
@@ -46,11 +50,13 @@ public class EventPublisher { | |
/**
 * Wires the collaborators of the publishing pipeline.
 *
 * @param topicRepository   broker-facing repository events are submitted to
 * @param eventTypeCache    cache used to resolve event type definitions
 * @param partitionResolver assigns each event to a partition
 * @param enrichment        applies the event type's enrichment strategies
 * @param nakadiSettings    runtime settings, including the max event size
 */
public EventPublisher(final TopicRepository topicRepository,
                      final EventTypeCache eventTypeCache,
                      final PartitionResolver partitionResolver,
                      final Enrichment enrichment,
                      final NakadiSettings nakadiSettings) {
    this.topicRepository = topicRepository;
    this.eventTypeCache = eventTypeCache;
    this.partitionResolver = partitionResolver;
    this.enrichment = enrichment;
    this.nakadiSettings = nakadiSettings;
}
|
||
public EventPublishResult publish(final JSONArray events, final String eventTypeName, final Client client) | ||
|
@@ -64,6 +70,7 @@ public EventPublishResult publish(final JSONArray events, final String eventType | |
validate(batch, eventType); | ||
partition(batch, eventType); | ||
enrich(batch, eventType); | ||
validateSize(batch); | ||
submit(batch, eventType); | ||
|
||
return ok(batch); | ||
|
@@ -76,6 +83,9 @@ public EventPublishResult publish(final JSONArray events, final String eventType | |
} catch (final EnrichmentException e) { | ||
LOG.debug("Event enrichment error: {}", e.getMessage()); | ||
return aborted(EventPublishingStep.ENRICHING, batch); | ||
} catch (final EventSizeValidationException e) { | ||
LOG.debug("Event size validation error: {}", e.getMessage()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ){ -> {} ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
return aborted(EventPublishingStep.VALIDATING_SIZE, batch); | ||
} catch (final EventPublishingException e) { | ||
LOG.error("error publishing event", e); | ||
return failed(batch); | ||
|
@@ -145,6 +155,23 @@ private void validateSchema(final JSONObject event, final EventType eventType) t | |
} | ||
} | ||
|
||
private void validateSize(final List<BatchItem> batch) throws EventSizeValidationException { | ||
for (final BatchItem item: batch) { | ||
item.setStep(EventPublishingStep.VALIDATING_SIZE); | ||
try { | ||
validateEventSize(item); | ||
} catch (final EventSizeValidationException e) { | ||
item.updateStatusAndDetail(EventPublishingStatus.ABORTED, e.getMessage()); | ||
throw e; | ||
} | ||
} | ||
} | ||
|
||
private void validateEventSize(final BatchItem item) throws EventSizeValidationException { | ||
if (item.getEvent().toString().getBytes().length > nakadiSettings.getEventMaxBytes()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Never use Actually we have to check that we do not use cases where this stupid There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another problem is to actually to do that conversion. There is the byte array representation that you can use. Or you should store the size into the item to avoid that conversion. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll fix this. There are a few other places in the code where we use getByte() without setting the locale, I'll create a ticket for that. |
||
throw new EventSizeValidationException("Event too large"); | ||
} | ||
|
||
private EventPublishResult failed(final List<BatchItem> batch) { | ||
return new EventPublishResult(EventPublishingStatus.FAILED, EventPublishingStep.PUBLISHING, responses(batch)); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,6 +78,7 @@ nakadi: | |
auth: | ||
plugin: | ||
factory: org.zalando.nakadi.plugin.auth.DefaultApplicationServiceFactory | ||
event.max.bytes: 1000000 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I though we agreed on 1MB which is a bit bigger :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I went for the default value in Kafka, which is 1,000,000, but I'm happy to go for a real 1MB if you think it is better There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also there is a problem with that, if you occasionally want to change producer's property message.max.bytes(and others related), let say to 3MB, it won't work because you restrict it to 1MB and you always have to remember about event.max.bytes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. https://kafka.apache.org/090/documentation.html#brokerconfigs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Why is it a problem? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. because you increased kafka broker message size to 3MB and expect that it is gonna be 3MB, but actually it only 1MB There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @adyach Why you expect that if you know that Nakadi validates event size as well? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. today I expect that, because I remember :) it introduces another property to set when you try to change the size of the message and Kafka has a lot of them as well. if we could make it relative to Kafka producer it would be simpler to not forget. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @adyach I think the property in Nakadi should be primary and the property in Kafka should just satisfy the property in Nakadi. So if somebody will change in Kafka without looking to the value in Nakadi - he does something really wrong. But I still don't really understand what the problem is. |
||
--- | ||
|
||
spring: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you considered writing a unit test instead of an acceptance test? We try to write as few acceptance tests as possible because they are too slow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed in b9c36f8