-
Notifications
You must be signed in to change notification settings - Fork 293
Aruha 473 check size of events #515
Changes from 11 commits
9dabfa2
cc50d0c
761d455
146d6a0
8849782
a061392
0a791da
df5b675
e261cb8
217be7f
ddd7a4f
957938b
49c1846
cc15d1a
3f9bf87
6f7a450
f315836
8871dea
f5e6367
d5e6f3d
12253ed
30da84a
7287c40
d8af912
0fbaa46
20aed22
b9c36f8
33bbc03
f6fcfe9
f90e477
43a337a
bf90d6e
0330145
4cc0bcd
30db7f5
781db1e
ffd729a
e3a4562
58cb450
3f2a2ac
57e67d3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package org.zalando.nakadi.webservice; | ||
|
||
import com.jayway.restassured.http.ContentType; | ||
import com.jayway.restassured.response.Response; | ||
import org.apache.commons.lang3.StringUtils; | ||
import org.apache.http.HttpStatus; | ||
import org.junit.Test; | ||
import org.zalando.nakadi.domain.EventType; | ||
import org.zalando.nakadi.webservice.utils.NakadiTestUtils; | ||
|
||
import java.text.MessageFormat; | ||
|
||
import static com.jayway.restassured.RestAssured.given; | ||
|
||
public class EventPublishingAT extends BaseAT { | ||
|
||
@Test | ||
public void whenPublishingEventTooLargeThen422() throws Exception { | ||
final EventType eventType = NakadiTestUtils.createEventType(); | ||
|
||
publishLargeEvent(eventType) | ||
.then() | ||
.statusCode(HttpStatus.SC_UNPROCESSABLE_ENTITY); | ||
} | ||
|
||
private Response publishLargeEvent(final EventType eventType) { | ||
final StringBuilder sb = new StringBuilder(1000023); | ||
sb.append("[{\"blah\":\""); | ||
sb.append(StringUtils.repeat("a", 1000010)); | ||
sb.append("\"}]"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can simply do: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
return given() | ||
.body(sb.toString()) | ||
.contentType(ContentType.JSON) | ||
.post(MessageFormat.format("/event-types/{0}/events", eventType.getName())); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package org.zalando.nakadi.exceptions; | ||
|
||
import org.zalando.nakadi.validation.ValidationError; | ||
import org.zalando.problem.MoreStatus; | ||
|
||
import javax.ws.rs.core.Response; | ||
|
||
public class EventSizeValidationException extends NakadiException { | ||
public EventSizeValidationException(final String message) { | ||
super(message); | ||
} | ||
|
||
public EventSizeValidationException (final ValidationError validationError) { | ||
super(validationError.getMessage()); | ||
} | ||
|
||
@Override | ||
protected Response.StatusType getStatus() { | ||
return MoreStatus.UNPROCESSABLE_ENTITY; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ | |
import org.slf4j.LoggerFactory; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.stereotype.Component; | ||
import org.zalando.nakadi.config.NakadiSettings; | ||
import org.zalando.nakadi.domain.BatchFactory; | ||
import org.zalando.nakadi.domain.BatchItem; | ||
import org.zalando.nakadi.domain.BatchItemResponse; | ||
|
@@ -16,6 +17,7 @@ | |
import org.zalando.nakadi.enrichment.Enrichment; | ||
import org.zalando.nakadi.exceptions.EnrichmentException; | ||
import org.zalando.nakadi.exceptions.EventPublishingException; | ||
import org.zalando.nakadi.exceptions.EventSizeValidationException; | ||
import org.zalando.nakadi.exceptions.EventValidationException; | ||
import org.zalando.nakadi.exceptions.InternalNakadiException; | ||
import org.zalando.nakadi.exceptions.NoSuchEventTypeException; | ||
|
@@ -37,6 +39,8 @@ public class EventPublisher { | |
|
||
private static final Logger LOG = LoggerFactory.getLogger(EventPublisher.class); | ||
|
||
private final NakadiSettings nakadiSettings; | ||
|
||
private final TopicRepository topicRepository; | ||
private final EventTypeCache eventTypeCache; | ||
private final PartitionResolver partitionResolver; | ||
|
@@ -46,11 +50,13 @@ public class EventPublisher { | |
public EventPublisher(final TopicRepository topicRepository, | ||
final EventTypeCache eventTypeCache, | ||
final PartitionResolver partitionResolver, | ||
final Enrichment enrichment) { | ||
final Enrichment enrichment, | ||
final NakadiSettings nakadiSettings) { | ||
this.topicRepository = topicRepository; | ||
this.eventTypeCache = eventTypeCache; | ||
this.partitionResolver = partitionResolver; | ||
this.enrichment = enrichment; | ||
this.nakadiSettings = nakadiSettings; | ||
} | ||
|
||
public EventPublishResult publish(final JSONArray events, final String eventTypeName, final Client client) | ||
|
@@ -70,6 +76,9 @@ public EventPublishResult publish(final JSONArray events, final String eventType | |
} catch (final EventValidationException e) { | ||
LOG.debug("Event validation error: {}", e.getMessage()); | ||
return aborted(EventPublishingStep.VALIDATING, batch); | ||
} catch (final EventSizeValidationException e) { | ||
LOG.debug("Event size validation error: {}", e.getMessage()); | ||
return aborted(EventPublishingStep.VALIDATING, batch); | ||
} catch (final PartitioningException e) { | ||
LOG.debug("Event partition error: {}", e.getMessage()); | ||
return aborted(EventPublishingStep.PARTITIONING, batch); | ||
|
@@ -114,14 +123,18 @@ private void partition(final List<BatchItem> batch, final EventType eventType) t | |
} | ||
|
||
private void validate(final List<BatchItem> batch, final EventType eventType) throws EventValidationException, | ||
InternalNakadiException { | ||
EventSizeValidationException, InternalNakadiException { | ||
for (final BatchItem item : batch) { | ||
item.setStep(EventPublishingStep.VALIDATING); | ||
try { | ||
validateSchema(item.getEvent(), eventType); | ||
validateEventSize(item); | ||
} catch (final EventValidationException e) { | ||
item.updateStatusAndDetail(EventPublishingStatus.FAILED, e.getMessage()); | ||
throw e; | ||
} catch (final EventSizeValidationException e) { | ||
item.updateStatusAndDetail(EventPublishingStatus.ABORTED, e.getMessage()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think here it should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
throw e; | ||
} | ||
} | ||
} | ||
|
@@ -145,6 +158,11 @@ private void validateSchema(final JSONObject event, final EventType eventType) t | |
} | ||
} | ||
|
||
private void validateEventSize(final BatchItem item) throws EventSizeValidationException { | ||
if (item.getEvent().toString().getBytes().length > nakadiSettings.getEventMaxBytes()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Never use Actually we have to check that we do not use cases where this stupid There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another problem is to actually to do that conversion. There is the byte array representation that you can use. Or you should store the size into the item to avoid that conversion. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll fix this. There are a few other places in the code where we use getByte() without setting the locale, I'll create a ticket for that. |
||
throw new EventSizeValidationException("Event too large"); | ||
} | ||
|
||
private EventPublishResult failed(final List<BatchItem> batch) { | ||
return new EventPublishResult(EventPublishingStatus.FAILED, EventPublishingStep.PUBLISHING, responses(batch)); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,6 +78,7 @@ nakadi: | |
auth: | ||
plugin: | ||
factory: org.zalando.nakadi.plugin.auth.DefaultApplicationServiceFactory | ||
event.max.bytes: 1000000 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I though we agreed on 1MB which is a bit bigger :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I went for the default value in Kafka, which is 1,000,000, but I'm happy to go for a real 1MB if you think it is better There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also there is a problem with that, if you occasionally want to change producer's property message.max.bytes(and others related), let say to 3MB, it won't work because you restrict it to 1MB and you always have to remember about event.max.bytes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. https://kafka.apache.org/090/documentation.html#brokerconfigs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Why is it a problem? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. because you increased kafka broker message size to 3MB and expect that it is gonna be 3MB, but actually it only 1MB There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @adyach Why you expect that if you know that Nakadi validates event size as well? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. today I expect that, because I remember :) it introduces another property to set when you try to change the size of the message and Kafka has a lot of them as well. if we could make it relative to Kafka producer it would be simpler to not forget. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @adyach I think the property in Nakadi should be primary and the property in Kafka should just satisfy the property in Nakadi. So if somebody will change in Kafka without looking to the value in Nakadi - he does something really wrong. But I still don't really understand what the problem is. |
||
--- | ||
|
||
spring: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you considered writing a unit test instead of an acceptance test? We try to write as few acceptance tests as possible because they are too slow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed in b9c36f8