Conversation
@@ -4,6 +4,7 @@
     NONE,
     VALIDATING,
     ENRICHING,
+    VALIDATING_SIZE,
Unfortunately you can't change this, because this enum is defined in our API; adding values here would mean introducing backwards-incompatible changes.
https://github.com/zalando/nakadi/blob/master/api/nakadi-event-bus-api.yaml#L1817
Why do you want a separate step for that? I thought it could be part of the VALIDATING step.
If we make it part of VALIDATING, then we will check the size of the event before enriching. I think it makes more sense to check the size after enriching, since the enrichment strategy is chosen by the user who created the event type.
I guess I could either check the size during validation, in which case we ignore whatever is added by the enrichment step; or go back to validation after enriching, but that may be very confusing to users.
I think the simplest way to go would be to assume that enrichment and the metadata added by Kafka will not add more data than the difference between the Nakadi limit (1,000,000 bytes) and the Kafka limit (I think we wanted to set 2,000,000 bytes).
In that case you can simply validate the size in the validation step.
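To make the headroom reasoning concrete, here is a minimal illustration; the 2,000,000-byte Kafka limit is just the value discussed in this thread, not a confirmed setting:

```java
// Illustration of the headroom argument above; the concrete numbers are assumptions
// taken from this discussion, not deployed configuration.
final int nakadiEventMaxBytes = 1_000_000;   // checked during the VALIDATING step
final int kafkaMessageMaxBytes = 2_000_000;  // proposed broker/producer limit
final int headroom = kafkaMessageMaxBytes - nakadiEventMaxBytes;
// As long as enrichment plus Kafka metadata never add more than `headroom` bytes to an
// event, validating the raw event size before enrichment is sufficient.
```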
@@ -76,6 +83,9 @@ public EventPublishResult publish(final JSONArray events, final String eventType
         } catch (final EnrichmentException e) {
             LOG.debug("Event enrichment error: {}", e.getMessage());
             return aborted(EventPublishingStep.ENRICHING, batch);
+        } catch (final EventSizeValidationException e) {
+            LOG.debug("Event size validation error: ){", e.getMessage());
`){` -> `{}` ?
for (int i = 0; i < 1000010; i++) {
    sb.append("a");
}
sb.append("\"}]");
you can simply do: org.apache.commons.lang3.StringUtils.repeat("a", 1000010)
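For illustration, the loop could then collapse to something like the following; the JSON prefix is an assumption, since the test's earlier lines are not shown here:

```java
import org.apache.commons.lang3.StringUtils;

// Builds the same oversized payload as the loop above, assuming commons-lang3 is on the
// test classpath; the JSON prefix is only a placeholder for whatever the test prepends.
final StringBuilder sb = new StringBuilder("[{\"foo\":\"");
sb.append(StringUtils.repeat("a", 1000010));
sb.append("\"}]");
```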
@@ -78,6 +78,7 @@ nakadi:
   auth:
     plugin:
       factory: org.zalando.nakadi.plugin.auth.DefaultApplicationServiceFactory
+  event.max.bytes: 1000000
I thought we agreed on 1MB, which is a bit bigger :)
1048576
I went for the default value in Kafka, which is 1,000,000, but I'm happy to go for a real 1MB if you think it is better
Also there is a problem with that: if you occasionally want to change the producer's property message.max.bytes (and others related), let's say to 3MB, it won't work, because you restrict it to 1MB and you always have to remember about event.max.bytes.
https://kafka.apache.org/090/documentation.html#brokerconfigs
It is different according to the docs :)
message.max.bytes = 1000012
Also there is a problem with that: if you occasionally want to change the producer's property message.max.bytes (and others related), let's say to 3MB, it won't work, because you restrict it to 1MB and you always have to remember about event.max.bytes.
Why is it a problem?
Because you increased the Kafka broker message size to 3MB and expect that it is going to be 3MB, but actually it is only 1MB.
@adyach Why would you expect that, if you know that Nakadi validates the event size as well?
Today I expect that because I remember :) It introduces another property to set when you try to change the size of the message, and Kafka has a lot of them as well. If we could make it relative to the Kafka producer, it would be simpler not to forget.
@adyach I think the property in Nakadi should be primary and the property in Kafka should just satisfy the property in Nakadi. So if somebody changes it in Kafka without looking at the value in Nakadi, he is doing something really wrong.
But I still don't really understand what the problem is.
One can say: what if somebody forgets how to operate AWS and deletes the whole Kafka stack and we lose all the data :) For me these are problems of the same sort.
        } catch (final EventValidationException e) {
            item.updateStatusAndDetail(EventPublishingStatus.FAILED, e.getMessage());
            throw e;
        } catch (final EventSizeValidationException e) {
            item.updateStatusAndDetail(EventPublishingStatus.ABORTED, e.getMessage());
I think here it should be FAILED, like in the case when there was a schema validation problem, because this is exactly the item that failed.
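A minimal sketch of what that would look like, reusing the names from the diff above (not the final implementation):

```java
// Sketch only: the offending item itself is marked FAILED, like a schema validation
// failure, and rethrowing lets the caller mark the remaining items in the batch ABORTED.
try {
    validateEventSize(item);
} catch (final EventSizeValidationException e) {
    item.updateStatusAndDetail(EventPublishingStatus.FAILED, e.getMessage());
    throw e;
}
```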
…r events in the batch are marked ABORTED
@@ -145,6 +151,11 @@ private void validateSchema(final JSONObject event, final EventType eventType) t
         }
     }

+    private void validateEventSize(final BatchItem item) throws EventValidationException {
+        if (item.getEvent().toString().getBytes().length > nakadiSettings.getEventMaxBytes())
Never use getBytes() without specifying the charset. Please always pass the https://docs.oracle.com/javase/7/docs/api/java/nio/charset/StandardCharsets.html#UTF_8 object into that function.
Actually, we have to check that we do not have other places where this stupid getBytes() is done.
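One possible shape of the fix being asked for here, with method and field names taken from the diff above; the error message wording is illustrative and also covers the later request to include the size and the limit:

```java
import java.nio.charset.StandardCharsets;

// Sketch only: measure the event in UTF-8 bytes and report both the size and the limit.
private void validateEventSize(final BatchItem item) throws EventValidationException {
    final int eventSize = item.getEvent().toString().getBytes(StandardCharsets.UTF_8).length;
    if (eventSize > nakadiSettings.getEventMaxBytes()) {
        throw new EventValidationException("Event size " + eventSize
                + " bytes exceeds the limit of " + nakadiSettings.getEventMaxBytes() + " bytes");
    }
}
```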
Another problem is having to do that conversion at all. There is the byte array representation that you can use, or you could store the size in the item to avoid that conversion.
I'll fix this. There are a few other places in the code where we use getBytes() without specifying the charset; I'll create a ticket for that.
@@ -145,6 +151,11 @@ private void validateSchema(final JSONObject event, final EventType eventType) t
         }
     }

+    private void validateEventSize(final BatchItem item) throws EventValidationException {
+        if (item.getEvent().toString().getBytes().length > nakadiSettings.getEventMaxBytes())
+            throw new EventValidationException("Event too large");
Please provide a proper error message that includes the size and the limit.
@lmontrieux it seems that this message wasn't changed.
@rcillo Pretty sure I removed the whole method :). The size computation is now in the BatchItem constructor.
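For context, a hypothetical sketch of what computing the size in the constructor could look like; the field and accessor names below are assumptions, not the actual Nakadi code:

```java
import java.nio.charset.StandardCharsets;

import org.json.JSONObject;

// Sketch only: measuring the raw string once at construction time avoids repeating the
// String-to-bytes conversion later and measures exactly what the client sent.
public class BatchItem {

    private final JSONObject event;
    private final int eventSizeBytes;

    public BatchItem(final String rawEvent) {
        this.event = new JSONObject(rawEvent);
        this.eventSizeBytes = rawEvent.getBytes(StandardCharsets.UTF_8).length;
    }

    public int getEventSizeBytes() {
        return eventSizeBytes;
    }
}
```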
@rcillo my mistake, I was wrong. Fixing it now
@@ -128,6 +139,147 @@ public void whenValidationFailsThenSubsequentItemsAreAborted() throws Exception
     }

+    @Test
+    public void whenEventIsTooLargeThenResultIsAborted() throws Exception {
Please provide a test with multibyte chars in the message body.
There are two tests with multibyte characters already, lines 252-280. The first one is an event that is just 1 byte over the limit; the second one is exactly at the limit. Multibyte events are created on lines 502-512
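The point of those tests is that the limit is enforced on the UTF-8 byte length rather than the character count. A tiny, self-contained illustration (the class and test names here are made up, not the actual tests):

```java
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import java.nio.charset.StandardCharsets;

import org.junit.Test;

public class MultibyteSizeIllustrationTest {

    // "ä" is a single character but occupies two bytes in UTF-8, so an event made of
    // such characters hits the byte limit with fewer characters.
    @Test
    public void multibyteCharactersAreCountedInBytes() {
        final String multibyte = "ä";
        assertThat(multibyte.length(), equalTo(1));
        assertThat(multibyte.getBytes(StandardCharsets.UTF_8).length, equalTo(2));
    }
}
```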
After thinking about this more, it looks like we do not have a well-defined idea of how to limit the size of the messages. The message itself has a size (in bytes). The message is then enriched with information, which also increases the size of the message written to Kafka. This means that the actual limitation on the customer message size is influenced by the Kafka message limits and the enrichments that we provide. To be able to correctly show the reason for applying some limits on the customer message size, we also have to be able to extract and show information about the size added by every provided enrichment. Such numbers would also be nice to have for monitoring anyway. My suggestion currently would be:
What do you think?
@valgog The idea was very simple: to limit the message size to 1MB in Nakadi and 2MB in Kafka (so that 1MB + enrichment + Kafka metadata will always fit into 2MB). Also, I have doubts about point 1 you suggested. That will in fact double Nakadi's memory usage.
Sorry, I accidentally closed the PR :)
@valgog @v-stepanov I've had a short discussion offline with @valgog about this, but with Hack Week I hadn't updated this PR yet. The way things work is:
The idea behind this fix was to limit the size of an event to 1MB before enrichment (limit x only), but bump up all max values in Kafka to 2MB, to be safe (e+k are just a few bytes long). This way we will accept slightly larger messages than we do now. This only works assuming that increasing the max values in Kafka will not increase its memory usage. I haven't looked into it, but I'd be surprised if Kafka were to always allocate the max message size for each message. As I recall, the outcome of this discussion was:
Current coverage is 59.57% (diff: 78.26%)

@@             master      #515    diff @@
==========================================
  Files           198       198
  Lines          5415      5470     +55
  Methods           0         0
  Messages          0         0
  Branches        630       648     +18
==========================================
+ Hits           3216      3259     +43
- Misses         2017      2023      +6
- Partials        182       188      +6
A quick look at the Kafka code doesn't seem to indicate any allocation of the max message size for each message.
@@ -316,7 +316,9 @@ paths:
         incoming Events. Validation rules are evaluated in the order they are defined and the Event
         is **rejected** in the first case of failure. If the offending validation rule provides
         information about the violation it will be included in the `BatchItemResponse`. If the
-        `EventType` defines schema validation it will be performed at this moment.
+        `EventType` defines schema validation it will be performed at this moment. The size of each
+        Event will also be validated. The maximum size per Event is 1,000,000 bytes, before
@lmontrieux As far as I know, max.message.bytes for a topic is 1,000,000 bytes. That means that if we enrich something, then it is guaranteed that this message won't be written.
@antban we had agreed to set it to something a bit lower, but I was waiting for today's standup to agree on a value. I've suggested 999,000 bytes on hipchat.
        }

        if (sb.length() != 0) {
            batch.add(new BatchItem(sb.toString()));
Well, I completely disagree with this (I mean the fact that we are measuring user input), but there is a bug here: it seems that the message `[ ]` will create one batch item (but there is nothing there).
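One possible guard for that case, as a sketch only (the real splitting logic has more state than shown here):

```java
// Only create a BatchItem when the accumulated chunk contains something other than
// whitespace, so an input like "[ ]" produces no items.
final String chunk = sb.toString();
if (!chunk.trim().isEmpty()) {
    batch.add(new BatchItem(chunk));
}
```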
@antban As for measuring user input, I thought it was what we had agreed on earlier this week (or was it last week?). Happy to discuss it offline if I'm wrong.
@lmontrieux is enrichment always using less than 100 bytes?
@rcillo Well, we have (just under) 1,000 bytes of wiggle room for enrichment. Currently we only have one enrichment strategy, which sets:
Of course, if we implement more enrichment strategies, we'll have to revise this.
I remember you mentioning something about a wrong status code. I could not find any code related to it.
        final int length = events.length();
        int end = length - 1;

        while ((events.charAt(start) == ' '
I think this condition could be extracted to its own method, something like isBlankCharacter.
or as per the test name isEmptyCharacter
Good point, will extract
                && start < end) {
            start++;
        }
        while ((events.charAt(end) == ' '
If this condition is extracted to its own method, then this should be refactored to reuse it.
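A minimal sketch of that extraction; the exact set of characters the original condition handles isn't fully visible in this diff, so it is assumed below:

```java
// Assumed character set: space, tab, newline, carriage return.
private static boolean isEmptyCharacter(final char c) {
    return c == ' ' || c == '\t' || c == '\n' || c == '\r';
}

// Both trimming loops can then share it:
while (isEmptyCharacter(events.charAt(start)) && start < end) {
    start++;
}
while (isEmptyCharacter(events.charAt(end)) && end > start) {
    end--;
}
```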
If a batch had "empty" (carriage return, tab, space, ...) characters around it, it was rejected with a 400 for malformed data. When I made the comment I thought the issue was with large events, but investigating further, I found that it wasn't a size issue, but a trailing carriage return after the batch.
@@ -59,7 +59,6 @@ nakadi:
     send.timeoutMs: 5000
     batch.size: 5242880
     linger.ms: 0
-    fetch.message.max.bytes: 2000000
Why is this needed?
It is not needed anymore, since we decided to stick to the default value for Kafka. I forgot to remove it, and now I see in Scalyr that it was never correct in the first place, so this line is just ignored.
@rcillo I broke the build with my previous commit. This removes all code related to the update of the consumer's max fetch size (it does not affect the producer's max event size, btw).
@@ -99,9 +99,7 @@ private void updateBrokers() {
     }

     public Properties getKafkaConsumerProperties() {
         final Properties consumerProps = (Properties) kafkaProperties.clone();
         consumerProps.put("fetch.message.max.bytes", kafkaSettings.getFetchMessageMaxBytes());
@lmontrieux I think we need to migrate to a bigger message size.
@adyach yes, but this is not working. I get this in the logs: [org.apache.kafka.clients.consumer.ConsumerConfig] --- The configuration fetch.message.max.bytes = 2000000 was supplied but isn't a known config.
I think we could do this as a separate ticket, where we increase the max event size from 999,000 bytes to 1,000,000 bytes
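For reference, a hedged sketch of how that separate ticket might raise the fetch size: with the newer (0.9+) Kafka consumer the per-partition limit is max.partition.fetch.bytes, which would explain why fetch.message.max.bytes (an old-consumer property) shows up as an unknown config in the log above. The accessor name below is hypothetical, mirroring the diff earlier in this thread:

```java
import java.util.Properties;

// Sketch only: property name taken from the Kafka 0.9 consumer configs; kafkaProperties
// and the settings getter are placeholders mirroring the names in the diff above.
final Properties consumerProps = (Properties) kafkaProperties.clone();
consumerProps.put("max.partition.fetch.bytes", kafkaSettings.getFetchMessageMaxBytes());
```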
dismissed for not showing up in PR again :)
This PR checks the size of all events in a batch. If there is at least one event that is over the maximum size after enrichment, then the entire batch is rejected with a 422.