This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Aruha 473 check size of events #515

Merged

lmontrieux merged 41 commits into master from ARUHA-473-check-size-of-events on Jan 23, 2017

Conversation

lmontrieux
Contributor

This PR checks the size of all events in a batch. If there is at least one event that is over the maximum size after enrichment, then the entire batch is rejected with a 422.
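A minimal sketch of that check, assuming a serialized-string view of each event and the 999,000-byte limit settled on later in this thread; the class and method names here are hypothetical, not the PR's actual code:

import java.nio.charset.StandardCharsets;
import java.util.List;

public class BatchSizeCheck {

    private static final int EVENT_MAX_BYTES = 999_000; // value agreed later in this thread

    // Returns true only if every event in the batch fits under the limit;
    // the caller would reject the whole batch with HTTP 422 otherwise.
    public static boolean batchFits(final List<String> serializedEvents) {
        for (final String event : serializedEvents) {
            if (event.getBytes(StandardCharsets.UTF_8).length > EVENT_MAX_BYTES) {
                return false;
            }
        }
        return true;
    }
}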

@@ -4,6 +4,7 @@
NONE,
VALIDATING,
ENRICHING,
VALIDATING_SIZE,
Contributor

@v-stepanov v-stepanov Dec 16, 2016

Unfortunately you can't change this, because this enum is defined in our API, and adding values here would mean introducing backwards-incompatible changes.
https://github.com/zalando/nakadi/blob/master/api/nakadi-event-bus-api.yaml#L1817

Why do you want a separate step for that? I thought it could be part of the VALIDATING step.

Contributor Author

If we make it part of VALIDATING, then we will check the size of the event before enriching. I think it makes more sense to check the size after enriching, since the enrichment strategy is chosen by the user who created the event type.

Contributor Author

I guess I could either check the size during validation, in which case we ignore whatever is added by the enrichment step, or go back to validation after enriching, but that may be very confusing to users.

Contributor

I think the simplest way to go would be to assume that the enrichment and the metadata added by Kafka will not add more data than the difference between the Nakadi limit (1,000,000 bytes) and the Kafka limit (I think we wanted to set 2,000,000 bytes).
In that case you can simply validate the size in the validation step.


@@ -76,6 +83,9 @@ public EventPublishResult publish(final JSONArray events, final String eventType
} catch (final EnrichmentException e) {
LOG.debug("Event enrichment error: {}", e.getMessage());
return aborted(EventPublishingStep.ENRICHING, batch);
} catch (final EventSizeValidationException e) {
LOG.debug("Event size validation error: ){", e.getMessage());
Member

){ -> {} ?


for (int i = 0; i < 1000010; i++) {
sb.append("a");
}
sb.append("\"}]");
Contributor

you can simply do: org.apache.commons.lang3.StringUtils.repeat("a", 1000010)
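For reference, the suggested one-liner in place of the loop (the JSON prefix built before this point is omitted, as in the quoted test):

// replaces the 1,000,010-iteration loop above; sb is the builder from the quoted test
sb.append(org.apache.commons.lang3.StringUtils.repeat("a", 1000010));
sb.append("\"}]");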


@@ -78,6 +78,7 @@ nakadi:
auth:
plugin:
factory: org.zalando.nakadi.plugin.auth.DefaultApplicationServiceFactory
event.max.bytes: 1000000
Member

I thought we agreed on 1MB, which is a bit bigger :)
1048576

Contributor Author

I went for the default value in Kafka, which is 1,000,000, but I'm happy to go for a real 1MB if you think it is better

Member

Also, there is a problem with that: if at some point you want to change the producer's property message.max.bytes (and the related ones), let's say to 3MB, it won't work, because you restrict it to 1MB here and you always have to remember about event.max.bytes.

Member

https://kafka.apache.org/090/documentation.html#brokerconfigs
it is different according to docs :)
message.max.bytes= 1000012

Contributor

Also, there is a problem with that: if at some point you want to change the producer's property message.max.bytes (and the related ones), let's say to 3MB, it won't work, because you restrict it to 1MB here and you always have to remember about event.max.bytes.

Why is it a problem?

Member

because you increased the Kafka broker message size to 3MB and expect that it is going to be 3MB, but actually it is only 1MB

Contributor

@adyach Why would you expect that if you know that Nakadi validates the event size as well?

Member

Today I expect that because I remember :) It introduces another property to set when you try to change the message size, and Kafka already has a lot of them as well. If we could make it relative to the Kafka producer setting, it would be simpler not to forget.

Contributor

@adyach I think the property in Nakadi should be primary, and the property in Kafka should just satisfy the property in Nakadi. So if somebody changes it in Kafka without looking at the value in Nakadi, they are doing something really wrong.

But I still don't really understand what the problem is.
One could say: what if somebody forgets how to operate AWS, deletes the whole Kafka stack, and we lose all the data :) For me these are problems of the same sort.

} catch (final EventValidationException e) {
item.updateStatusAndDetail(EventPublishingStatus.FAILED, e.getMessage());
throw e;
} catch (final EventSizeValidationException e) {
item.updateStatusAndDetail(EventPublishingStatus.ABORTED, e.getMessage());
Contributor

I think here it should be FAILED, like in the case where there was a schema validation problem, because this is exactly the item that failed.
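A sketch of the status handling being asked for, using the names from the quoted diff (the surrounding method structure is assumed): the offending item is marked FAILED, and rethrowing lets the caller abort the remaining items.

try {
    validateEventSize(item);
} catch (final EventSizeValidationException e) {
    // the item that broke the limit is FAILED, like a schema validation failure
    item.updateStatusAndDetail(EventPublishingStatus.FAILED, e.getMessage());
    // rethrow so the publisher marks the rest of the batch as ABORTED
    throw e;
}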


valgog
valgog previously requested changes Dec 16, 2016
@@ -145,6 +151,11 @@ private void validateSchema(final JSONObject event, final EventType eventType) t
}
}

private void validateEventSize(final BatchItem item) throws EventValidationException {
if (item.getEvent().toString().getBytes().length > nakadiSettings.getEventMaxBytes())
Member

Never use getBytes() without specifying the charset. Please always pass https://docs.oracle.com/javase/7/docs/api/java/nio/charset/StandardCharsets.html#UTF_8 into that function.

Actually we have to check that we do not have other places where this bare getBytes() is used.
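A charset-explicit variant of the quoted check (sketch only; BatchItem, nakadiSettings, and the exception type come from the quoted diff, and the message already follows the later request to include size and limit):

private void validateEventSize(final BatchItem item) throws EventValidationException {
    // java.nio.charset.StandardCharsets.UTF_8 makes the byte count deterministic
    final int sizeInBytes = item.getEvent().toString()
            .getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
    if (sizeInBytes > nakadiSettings.getEventMaxBytes()) {
        throw new EventValidationException("Event too large: " + sizeInBytes
                + " bytes, limit is " + nakadiSettings.getEventMaxBytes() + " bytes");
    }
}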

Member

Another problem is actually doing that conversion at all. There is a byte array representation that you can use, or you could store the size in the item to avoid the conversion.

Contributor Author

I'll fix this. There are a few other places in the code where we use getBytes() without specifying the charset; I'll create a ticket for that.

@@ -145,6 +151,11 @@ private void validateSchema(final JSONObject event, final EventType eventType) t
}
}

private void validateEventSize(final BatchItem item) throws EventValidationException {
if (item.getEvent().toString().getBytes().length > nakadiSettings.getEventMaxBytes())
throw new EventValidationException("Event too large");
Member

Please provide a proper error message that includes the size and the limit.

Contributor

@lmontrieux it seems that this message wasn't changed.

Contributor Author

@rcillo Pretty sure I removed the whole method :). The size computation is now in the BatchItem constructor
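A sketch of what computing the size in the BatchItem constructor could look like; the field and accessor names here are assumptions, not the PR's actual class:

import java.nio.charset.StandardCharsets;
import org.json.JSONObject;

public class BatchItem {

    private final JSONObject event;
    private final int eventSizeBytes;

    public BatchItem(final String rawEvent) {
        this.event = new JSONObject(rawEvent);
        // computed once, so validation can compare against the limit without re-serializing
        this.eventSizeBytes = rawEvent.getBytes(StandardCharsets.UTF_8).length;
    }

    public JSONObject getEvent() {
        return event;
    }

    public int getEventSize() {
        return eventSizeBytes;
    }
}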

Contributor Author

@rcillo my mistake, I was wrong. Fixing it now

@@ -128,6 +139,147 @@ public void whenValidationFailsThenSubsequentItemsAreAborted() throws Exception
}

@Test
public void whenEventIsTooLargeThenResultIsAborted() throws Exception {
Member

Please provide a test with multibyte chars in the message body.

Contributor Author

There are two tests with multibyte characters already, lines 252-280. The first one is an event that is just 1 byte over the limit; the second one is exactly at the limit. Multibyte events are created on lines 502-512
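A standalone illustration of why the multibyte cases matter (not the PR's tests): character count and UTF-8 byte count diverge, so an event that looks under the limit in characters can still exceed it in bytes.

import java.nio.charset.StandardCharsets;

public class MultibyteSizeExample {
    public static void main(final String[] args) {
        final String ascii = "a";          // 1 byte in UTF-8
        final String multibyte = "\u20AC"; // euro sign, 3 bytes in UTF-8

        System.out.println(ascii.getBytes(StandardCharsets.UTF_8).length);     // prints 1
        System.out.println(multibyte.getBytes(StandardCharsets.UTF_8).length); // prints 3
    }
}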

@valgog
Member

valgog commented Dec 18, 2016

@lmontrieux

After thinking about this more, it looks like we do not have a well-defined idea of how to limit the size of the messages.

The message itself has a size (in bytes).

The message is enriched with information that also increases the size of the message written to Kafka.

This means that the actual limit on the customer message size is influenced by the Kafka message limits and the enrichments that we provide.

To be able to correctly show the reason for applying limits to the customer message size, we also have to be able to extract and show the size added by every provided enrichment. Such numbers would also be nice to have for monitoring anyway.

My suggestion currently would be:

  1. extend the Item object to keep both the byte[] and the String representation of the original message (check that conversions to and from String are done a minimal number of times)
  2. extend the Item object to keep statistics per enrichment (currently only the added size in bytes per enrichment)
  3. include information about the original size and the size added per enrichment in the error message

What do you think?

@v-stepanov
Contributor

@valgog
I just wanted to mention that this was supposed to be a simple bugfix, not a spaceship-size feature.
The current situation is that the message size is not checked anywhere, and when somebody pushes a message bigger than 1MB (the Kafka limit), we respond with 207 (which is an error situation for us) and have a hole in the published batch.

The idea was very simple: limit the message size to 1MB in Nakadi and 2MB in Kafka (so that 1MB + enrichment + Kafka metadata will always fit into 2MB).

Also, I have doubts about point 1 you suggested. That would in fact double Nakadi's memory usage.

@v-stepanov v-stepanov closed this Dec 23, 2016
@v-stepanov
Contributor

sorry, I accidentally closed PR :)

@v-stepanov v-stepanov reopened this Dec 23, 2016
@lmontrieux
Contributor Author

@valgog @v-stepanov I've had a short discussion offline with @valgog about this, but with Hack week I hadn't updated this PR yet. The way things work is:

  • producer sends an event of size x;
  • nakadi enriches the event, whose size is now x + e;
  • the kafka client serialises the event somehow, and adds a bit of metadata. Size is now x + e + k.
  • kafka has 4 different configuration options for the max size of a message: the max size that a broker will accept (default 1MB); the max size of a log segment (must be higher than the max size of a message, default 1GB); the max size of data that a broker can replicate (default 1MB); the max size a consumer can read (default 1MB). Currently we use the defaults.

The idea behind this fix was to limit the size of an event to 1MB before enrichment (limit x only), but bump up all max values in Kafka to 2MB, to be safe (e + k are just a few bytes long). This way we will accept slightly larger messages than we do now. This only works assuming that increasing the max values in Kafka will not increase its memory usage. I haven't looked into it, but I'd be surprised if Kafka were to always allocate the max message size for each message.

As I recall, the outcome of this discussion was:
1/ limit size in validation, as originally planned;
2/ next week, as a different ticket, work on a more accurate way to limit the size of events, and keep statistics about the size of enrichments.
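A back-of-the-envelope version of the x / x + e / x + e + k budget above; the enrichment and Kafka overhead figures are illustrative assumptions, only the limits quoted in this thread come from the discussion.

public class SizeBudgetExample {
    public static void main(final String[] args) {
        final int nakadiEventMaxBytes = 999_000;    // x: limit checked before enrichment
        final int assumedEnrichmentBytes = 300;     // e: metadata added by Nakadi (assumption)
        final int assumedKafkaOverheadBytes = 100;  // k: client/broker framing (assumption)
        final int kafkaMessageMaxBytes = 2_000_000; // broker limit if bumped to 2MB as proposed

        final int worstCase = nakadiEventMaxBytes + assumedEnrichmentBytes + assumedKafkaOverheadBytes;
        System.out.println("worst case: " + worstCase + " bytes, broker limit: "
                + kafkaMessageMaxBytes + ", fits: " + (worstCase <= kafkaMessageMaxBytes));
    }
}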

@codecov-io

codecov-io commented Dec 23, 2016

Current coverage is 59.57% (diff: 78.26%)

Merging #515 into master will increase coverage by 0.18%

@@             master       #515   diff @@
==========================================
  Files           198        198          
  Lines          5415       5470    +55   
  Methods           0          0          
  Messages          0          0          
  Branches        630        648    +18   
==========================================
+ Hits           3216       3259    +43   
- Misses         2017       2023     +6   
- Partials        182        188     +6   

Powered by Codecov. Last update e03f6c8...e3a4562

@lmontrieux
Contributor Author

A quick look at the kafka code doesn't seem to indicate any allocation of max message size for each message.

@rcillo
Contributor

rcillo commented Jan 5, 2017

:shipit:

@@ -316,7 +316,9 @@ paths:
incoming Events. Validation rules are evaluated in the order they are defined and the Event
is **rejected** in the first case of failure. If the offending validation rule provides
information about the violation it will be included in the `BatchItemResponse`. If the
`EventType` defines schema validation it will be performed at this moment.
`EventType` defines schema validation it will be performed at this moment. The size of each
Event will also be validated. The maximum size per Event is 1,000,000 bytes, before
Contributor

@lmontrieux As far as I know, max.message.bytes for the topic is 1,000,000 bytes. That means that if we enrich something, it is guaranteed that this message won't be written

Contributor Author

@antban we had agreed to set it to something a bit lower, but I was waiting for today's standup to agree on a value. I've suggested 999,000 bytes on hipchat.


}

if (sb.length() != 0) {
batch.add(new BatchItem(sb.toString()));
Contributor

@antban antban Jan 6, 2017

Well, I completely disagree with this (I mean the fact that we are measuring user input), but there is a bug here:
It seems that the message [ ] will create one batch item (but there is nothing in it).

Contributor Author

@antban You were right, there was a bug with [ ]. No batch would be created, but a JSONException would be thrown. I fixed it in 0330145
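A hypothetical regression test for that case (the BatchFactory helper and its from method are assumptions about the PR's code, sketched from the quoted snippet): a whitespace-only batch should yield no items rather than throw.

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;

import java.util.List;
import org.junit.Test;

public class EmptyBatchTest {

    @Test
    public void whenBatchContainsOnlyWhitespaceThenNoItemsAreCreated() {
        // "[ ]" is the case reported above: brackets with nothing but whitespace inside
        final List<BatchItem> batch = BatchFactory.from("[ ]");
        assertThat(batch, is(empty()));
    }
}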

Contributor Author

@antban As for measuring user input, I thought it was what we had agreed on earlier this week (or was it last week?). Happy to discuss it offline if I'm wrong.

@rcillo
Contributor

rcillo commented Jan 6, 2017

@lmontrieux is enrichment always using less than 100 bytes?

@lmontrieux
Contributor Author

@rcillo Well, we have (just under) 1,000 bytes of wiggle room for enrichment. Currently we only have one enrichment strategy, which sets: received_at, event_type, flow_id, partition, and version. Looks fairly stable in its size.

Of course, if we implement more enrichment strategies, we'll have to revise this.
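Rough illustration of that overhead (values are made up; only the field names come from the comment above): the enrichment metadata serializes to far less than the ~1,000-byte headroom.

import java.nio.charset.StandardCharsets;
import org.json.JSONObject;

public class EnrichmentOverheadExample {
    public static void main(final String[] args) {
        // placeholder values for the fields set by the single enrichment strategy
        final JSONObject metadata = new JSONObject()
                .put("received_at", "2017-01-06T12:00:00.000Z")
                .put("event_type", "example-event-type")
                .put("flow_id", "AcmeFlowIdExample")
                .put("partition", "0")
                .put("version", "1.0.0");

        System.out.println(metadata.toString().getBytes(StandardCharsets.UTF_8).length + " bytes");
    }
}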

@rcillo
Contributor

rcillo commented Jan 6, 2017

:shipit:

Contributor

@rcillo rcillo left a comment

I remember you mentioning something about a wrong status code. I could not find any code related to it.

final int length = events.length();
int end = length - 1;

while ((events.charAt(start) == ' '
Contributor

I think this condition could be extracted to its own method, something like isBlankCharacter.

Contributor

or, as per the test name, isEmptyCharacter

Contributor Author

Good point, will extract


&& start < end) {
start++;
}
while ((events.charAt(end) == ' '
Contributor

If this condition is extracted to its own method, then this should be refactored to reuse it.
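A sketch of the extraction being discussed; the exact character set is an assumption based on the carriage return, tab, and space cases mentioned later in this thread.

private static boolean isEmptyCharacter(final char c) {
    return c == ' ' || c == '\t' || c == '\n' || c == '\r';
}

// both trimming loops can then share it:
// while (isEmptyCharacter(events.charAt(start)) && start < end) { start++; }
// while (isEmptyCharacter(events.charAt(end)) && end > start) { end--; }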


@lmontrieux
Contributor Author

If a batch had "empty" (carriage return, tab, space,...) characters around it, it was rejected with a 400 for malformed data. When I made the comment I thought the issue was with large events, but investigating further, I found that it wasn't a size issue, but a trailing carriage return after the batch.

@rcillo
Contributor

rcillo commented Jan 10, 2017

:shipit:

@@ -59,7 +59,6 @@ nakadi:
send.timeoutMs: 5000
batch.size: 5242880
linger.ms: 0
fetch.message.max.bytes: 2000000
Contributor

Why is this needed?

Contributor Author

It is not needed anymore, since we decided to stick to the default value for Kafka. I forgot to remove it, and now I see in Scalyr that it was never correct in the first place, so this line is just ignored.

@rcillo
Contributor

rcillo commented Jan 18, 2017

:shipit:

@lmontrieux
Contributor Author

@rcillo I broke the build with my previous commit. This removes all code related to the update in the consumer's max fetch size (does not affect producer's max event size btw)

@@ -99,9 +99,7 @@ private void updateBrokers() {
}

public Properties getKafkaConsumerProperties() {
final Properties consumerProps = (Properties) kafkaProperties.clone();
consumerProps.put("fetch.message.max.bytes", kafkaSettings.getFetchMessageMaxBytes());
Member

@lmontrieux I think we need to migrate to a bigger message size

Contributor Author

@adyach yes, but this is not working. I get this in the logs: [org.apache.kafka.clients.consumer.ConsumerConfig] --- The configuration fetch.message.max.bytes = 2000000 was supplied but isn't a known config.

I think we could do this as a separate ticket, where we increase the max event size from 999,000 bytes to 1,000,000 bytes
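For what it's worth, the warning above is what the new (0.9+) Java consumer prints for unknown keys; its per-partition fetch limit is max.partition.fetch.bytes rather than the old consumer's fetch.message.max.bytes. A minimal sketch of setting it, assuming kafka-clients is on the classpath and reusing the kafkaProperties from the quoted diff:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// new-consumer equivalent of the removed fetch.message.max.bytes override
final Properties consumerProps = (Properties) kafkaProperties.clone();
consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 2_000_000);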

@rcillo
Contributor

rcillo commented Jan 19, 2017

:shipit:

@v-stepanov
Contributor

:shipit:

@v-stepanov v-stepanov dismissed valgog’s stale review January 23, 2017 14:09

dismissed for not showing up in PR again :)

@lmontrieux lmontrieux merged commit 30c961b into master Jan 23, 2017
@lmontrieux lmontrieux deleted the ARUHA-473-check-size-of-events branch January 24, 2017 09:57