Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Aruha 473 check size of events #515

Merged
merged 41 commits into from
Jan 23, 2017
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
9dabfa2
ARUHA-473 Reject batches with at least one event that's too large
Dec 15, 2016
cc50d0c
ARUHA-473 Move maximum event size to nakadi settings
Dec 15, 2016
761d455
ARUHA-473 Unit tests for max event size
Dec 15, 2016
146d6a0
ARUHA-473 Acceptance test for max event size
Dec 16, 2016
8849782
ARUHA-473 Fix checkstyle
Dec 16, 2016
a061392
ARUHA-473 Move size validation after validation, remove VALIDATING_SI…
Dec 16, 2016
0a791da
ARUHA-473 Move size validation inside validation
Dec 16, 2016
df5b675
ARUHA-473 Fix bug in test wrt event publishing step
Dec 16, 2016
e261cb8
ARUHA-473 Refactor string building
Dec 16, 2016
217be7f
ARUHA-473 Fix log string
Dec 16, 2016
ddd7a4f
ARUHA-473 Set capacity for StringBuilder
Dec 16, 2016
957938b
ARUHA-473 If an event is too large, that event is marked FAILED. Othe…
Dec 16, 2016
49c1846
ARUHA-473 Remove redundant Exception
Dec 16, 2016
cc15d1a
ARUHA-473 Explicit charset in getBytes()
Dec 20, 2016
3f9bf87
ARUHA-473 Refactor event size computation
Dec 23, 2016
6f7a450
ARUHA-473 Explicitely set Kafka consumers' fetch.message.max.bytes pr…
Dec 27, 2016
f315836
ARUHA-473 Add curly braces
Dec 28, 2016
8871dea
ARUHA-473 Explicit event size limit in swagger file
Dec 28, 2016
f5e6367
ARUHA-473 BatchItem constructor for String event, adding event size
Dec 30, 2016
d5e6f3d
ARUHA-473 Test BatchItem size with multi-byte characters
Dec 30, 2016
12253ed
ARUHA-473 Fix test style
Dec 30, 2016
30da84a
ARUHA-473 BatchFactory method to process batches as Strings
Dec 30, 2016
7287c40
ARUHA-473 Use String when publishing events to improve event size che…
Jan 3, 2017
d8af912
Merge branch 'master' into ARUHA-473-check-size-of-events
Jan 3, 2017
0fbaa46
ARUHA-473 Pass batch as String to publisher and fix checkstyle violat…
Jan 3, 2017
20aed22
ARUHA-473 Don't parse input to JSON in EventPublishingController
Jan 4, 2017
b9c36f8
ARUHA-473 Remove acceptance test
Jan 5, 2017
33bbc03
ARUHA-473 Remove unused private method
Jan 5, 2017
f6fcfe9
ARUHA-473 Error message for events too large includes the size of the…
Jan 5, 2017
f90e477
ARUHA-473 Fix tests' expected error messages for events too large
Jan 5, 2017
43a337a
Merge branch 'master' into ARUHA-473-check-size-of-events
Jan 5, 2017
bf90d6e
ARUHA-473 Fix test
Jan 5, 2017
0330145
ARUHA-473 Change max event size in swagger file
Jan 6, 2017
4cc0bcd
ARUHA-473 Fix bug when a batch is an array with only spaces.
Jan 6, 2017
30db7f5
ARUHA-473 Set max event size to 999,000 bytes by default
Jan 6, 2017
781db1e
ARUHA-473 Accept valid batches surrounded with tabs, carriage returns…
Jan 10, 2017
ffd729a
ARUHA-473 Refactoring
Jan 10, 2017
e3a4562
Merge branch 'master' into ARUHA-473-check-size-of-events
Jan 18, 2017
58cb450
ARUHA-473 Remove kafka max bytes property
Jan 18, 2017
3f2a2ac
ARUHA-473 Finish removing kafka max bytes property
Jan 19, 2017
57e67d3
Merge branch 'master' into ARUHA-473-check-size-of-events
Jan 23, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion api/nakadi-event-bus-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,10 @@ paths:
incoming Events. Validation rules are evaluated in the order they are defined and the Event
is **rejected** in the first case of failure. If the offending validation rule provides
information about the violation it will be included in the `BatchItemResponse`. If the
`EventType` defines schema validation it will be performed at this moment.
`EventType` defines schema validation it will be performed at this moment. The size of each
Event will also be validated. The maximum size per Event is 999,000 bytes. We use the batch
input to measure the size of events, so unnecessary spaces, tabs, and carriage returns will
count towards the event size.

1. Once the validation succeeded, the content of the Event is updated according to the
enrichment rules in the order the rules are defined in the `EventType`. No preexisting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy;
import org.json.JSONObject;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -54,6 +53,8 @@ public class KafkaRepositoryAT extends BaseAT {
private static final int KAFKA_REQUEST_TIMEOUT = 30000;
private static final int KAFKA_BATCH_SIZE = 1048576;
private static final long KAFKA_LINGER_MS = 0;
private static final long NAKADI_EVENT_MAX_BYTES = 1000000L;
private static final int KAFKA_FETCH_MESSAGE_MAX_BYTES = 2000000;

private NakadiSettings nakadiSettings;
private KafkaSettings kafkaSettings;
Expand All @@ -73,8 +74,10 @@ public void setup() {
DEFAULT_TOPIC_ROTATION,
DEFAULT_COMMIT_TIMEOUT,
NAKADI_POLL_TIMEOUT,
NAKADI_SEND_TIMEOUT);
kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_LINGER_MS);
NAKADI_SEND_TIMEOUT,
NAKADI_EVENT_MAX_BYTES);
kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_LINGER_MS,
KAFKA_FETCH_MESSAGE_MAX_BYTES);
zookeeperSettings = new ZookeeperSettings(ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
kafkaHelper = new KafkaTestHelper(KAFKA_URL);
kafkaTopicRepository = createKafkaTopicRepository();
Expand Down Expand Up @@ -133,7 +136,7 @@ public void whenDeleteTopicThenTopicIsDeleted() throws Exception {
@Test(timeout = 10000)
public void whenBulkSendSuccessfullyThenUpdateBatchItemStatus() throws Exception {
final List<BatchItem> items = new ArrayList<>();
final JSONObject event = new JSONObject();
final String event = "{}";
final String topicId = TestUtils.randomValidEventTypeName();
kafkaHelper.createTopic(topicId, ZOOKEEPER_URL);

Expand Down
9 changes: 8 additions & 1 deletion src/main/java/org/zalando/nakadi/config/NakadiSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class NakadiSettings {
private final long defaultCommitTimeoutSeconds;
private final long kafkaPollTimeoutMs;
private final long kafkaSendTimeoutMs;
private final long eventMaxBytes;

@Autowired
public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTopicPartitionCount,
Expand All @@ -24,7 +25,8 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo
@Value("${nakadi.topic.default.rotationMs}") final long defaultTopicRotationMs,
@Value("${nakadi.stream.default.commitTimeout}") final long defaultCommitTimeoutSeconds,
@Value("${nakadi.kafka.poll.timeoutMs}") final long kafkaPollTimeoutMs,
@Value("${nakadi.kafka.send.timeoutMs}") final long kafkaSendTimeoutMs) {
@Value("${nakadi.kafka.send.timeoutMs}") final long kafkaSendTimeoutMs,
@Value("${nakadi.event.max.bytes}") final long eventMaxBytes) {
this.maxTopicPartitionCount = maxTopicPartitionCount;
this.defaultTopicPartitionCount = defaultTopicPartitionCount;
this.defaultTopicReplicaFactor = defaultTopicReplicaFactor;
Expand All @@ -33,6 +35,7 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo
this.defaultCommitTimeoutSeconds = defaultCommitTimeoutSeconds;
this.kafkaPollTimeoutMs = kafkaPollTimeoutMs;
this.kafkaSendTimeoutMs = kafkaSendTimeoutMs;
this.eventMaxBytes = eventMaxBytes;
}

public int getDefaultTopicPartitionCount() {
Expand Down Expand Up @@ -67,4 +70,8 @@ public long getKafkaSendTimeoutMs() {
return kafkaSendTimeoutMs;
}

public long getEventMaxBytes() {
return eventMaxBytes;
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.zalando.nakadi.controller;

import org.json.JSONArray;
import org.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -82,11 +81,8 @@ private ResponseEntity postEventInternal(final String eventTypeName,
final Client client) {
final long startingNanos = System.nanoTime();
try {
final JSONArray eventsAsJsonObjects = new JSONArray(eventsAsString);

final int eventCount = eventsAsJsonObjects.length();
final EventPublishResult result = publisher.publish(eventsAsJsonObjects, eventTypeName, client);
reportMetrics(eventTypeMetrics, result, eventsAsString, eventCount);
final EventPublishResult result = publisher.publish(eventsAsString, eventTypeName, client);
reportMetrics(eventTypeMetrics, result, eventsAsString, result.getResponses().size());

final ResponseEntity response = response(result);
return response;
Expand Down
85 changes: 79 additions & 6 deletions src/main/java/org/zalando/nakadi/domain/BatchFactory.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,92 @@
package org.zalando.nakadi.domain;

import org.json.JSONArray;
import org.json.JSONException;

import java.util.ArrayList;
import java.util.List;

public class BatchFactory {

public static List<BatchItem> from(final JSONArray events) {
final List<BatchItem> batch = new ArrayList<>(events.length());
for (int i = 0; i < events.length(); i++) {
batch.add(new BatchItem(events.getJSONObject(i)));
public static List<BatchItem> from(final String events) {
final List<BatchItem> batch = new ArrayList<>();
StringBuilder sb = new StringBuilder();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if isn't there a way to build this substring with zero copy. I couldn't find, because since Java 7 String.substring creates a copy of the original char array.

I think it would be valuable to look at JSONTokener and try to figure out if isn't there a way to avoid extra copies of this string. This is the most executed code path in the project. I would expect that this change has an impact on garbage collection and overall performance.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe if it wasn't a string but an array of bytes, then we could just create strings using String(bytes[] bytes, int offset, int length). I'm not sure, I think we should investigate more and reduce memory footprint.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I wonder if we could use CharBuffer.wrap(..).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into this, and short of using another JSON parser, or doing some ugly size validation inside the controller, I don't see how we can improve this. We should discuss this in more detail.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. So let's move forward with this solution and we can test the performance impact on staging.

int brackets = 0;
boolean insideQuote = false;
boolean escaped = false;
int start = 0;
final int length = events.length();
int end = length - 1;

while ((events.charAt(start) == ' '
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this condition could be extracted to it's own method, something like isBlankCharacter.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or as per the test name isEmptyCharacter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, will extract

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

|| events.charAt(start) == '\t'
|| events.charAt(start) == '\n'
|| events.charAt(start) == '\r')
&& start < end) {
start++;
}
while ((events.charAt(end) == ' '
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this condition is extracted to its own method, then this should be refactored to reuse it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

|| events.charAt(end) == '\t'
|| events.charAt(end) == '\n'
|| events.charAt(end) == '\r')
&& end > start) {
end--;
}
if (!(events.charAt(start) == '[')) {
throw new JSONException(String.format("Unexpected character %s in position %d, expected '['",
events.charAt(start), start));
}
start++;
if (!(events.charAt(end) == ']')) {
throw new JSONException(String.format("Unexpected character %s in position %d, expected ']'",
events.charAt(end), end));
}

for (int i = start; i < end; i++) {
if (!escaped && events.charAt(i) == '"') {
if (insideQuote) {
insideQuote = false;
} else {
insideQuote = true;
}
}
if (escaped) {
sb.append(events.charAt(i));
escaped = false;
} else if (!escaped && events.charAt(i) == '\\') {
sb.append(events.charAt(i));
escaped = true;
} else if (insideQuote) {
sb.append(events.charAt(i));
} else {
if (events.charAt(i) == '{') {
brackets++;
}
if (events.charAt(i) == '}') {
brackets--;
}
if (!((brackets == 0) && ((events.charAt(i) == ',')
|| (events.charAt(i) == ' ')
|| (events.charAt(i) == '\t')
|| (events.charAt(i) == '\n')
|| (events.charAt(i) == '\r')))) {
sb.append(events.charAt(i));
}
if (brackets == 0 && (events.charAt(i) != ' ')
&& (events.charAt(i) != '\t'
&& (events.charAt(i) != '\n')
&& (events.charAt(i) != '\r'))) {
if (sb.length() > 0) {
batch.add(new BatchItem(sb.toString()));
}
sb = new StringBuilder();
}
}
}

if (sb.length() != 0) {
batch.add(new BatchItem(sb.toString()));
Copy link
Contributor

@antban antban Jan 6, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I completely disagree with this (I mean the fact that we are measuring user input), but there is a bug here:
It seems that message [ ] will create one batch item (but there is nothing there).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@antban You were right, there was a bug with [ ]. No batch would be created, but a JSONException would be thrown. I fixed it in 0330145

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@antban As for measuring user input, I thought it was what we had agreed on earlier this week (or was it last week?). Happy to discuss it offline if I'm wrong.

}

return batch;
}

}
13 changes: 10 additions & 3 deletions src/main/java/org/zalando/nakadi/domain/BatchItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@
import org.json.JSONObject;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class BatchItem {
private final BatchItemResponse response;
private final JSONObject event;
private String partition;
private String brokerId;
private int eventSize;

public BatchItem(final JSONObject event) {
public BatchItem(final String event) {
this.event = new JSONObject(event);
this.eventSize = event.getBytes(StandardCharsets.UTF_8).length;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method also generates a copy of the original array.

this.response = new BatchItemResponse();
this.event = event;

Optional.ofNullable(event.optJSONObject("metadata"))
Optional.ofNullable(this.event.optJSONObject("metadata"))
.map(e -> e.optString("eid", null))
.ifPresent(this.response::setEid);
}
Expand Down Expand Up @@ -58,4 +61,8 @@ public void updateStatusAndDetail(final EventPublishingStatus publishingStatus,
response.setPublishingStatus(publishingStatus);
response.setDetail(detail);
}

public int getEventSize() {
return eventSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ private void updateBrokers() {
}

public Properties getKafkaConsumerProperties() {
return (Properties) kafkaProperties.clone();
final Properties consumerProps = (Properties) kafkaProperties.clone();
consumerProps.put("fetch.message.max.bytes", kafkaSettings.getFetchMessageMaxBytes());
return consumerProps;
}

public Properties getKafkaProducerProperties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@ public class KafkaSettings {
// /kafka/clients/producer/ProducerConfig.java#L232
private final int batchSize;
private final long lingerMs;
private final int fetchMessageMaxBytes;

@Autowired
public KafkaSettings(@Value("${nakadi.kafka.request.timeout.ms}") final int requestTimeoutMs,
@Value("${nakadi.kafka.batch.size}") final int batchSize,
@Value("${nakadi.kafka.linger.ms}") final long lingerMs) {
@Value("${nakadi.kafka.linger.ms}") final long lingerMs,
@Value("${nakadi.kafka.fetch.message.max.bytes}") final int fetchMessageMaxBytes) {
this.requestTimeoutMs = requestTimeoutMs;
this.batchSize = batchSize;
this.lingerMs = lingerMs;
this.fetchMessageMaxBytes = fetchMessageMaxBytes;
}

public int getRequestTimeoutMs() {
Expand All @@ -37,4 +40,8 @@ public int getBatchSize() {
public long getLingerMs() {
return lingerMs;
}

public int getFetchMessageMaxBytes() {
return fetchMessageMaxBytes;
}
}
18 changes: 15 additions & 3 deletions src/main/java/org/zalando/nakadi/service/EventPublisher.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package org.zalando.nakadi.service;

import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.zalando.nakadi.config.NakadiSettings;
import org.zalando.nakadi.domain.BatchFactory;
import org.zalando.nakadi.domain.BatchItem;
import org.zalando.nakadi.domain.BatchItemResponse;
Expand Down Expand Up @@ -37,6 +37,8 @@ public class EventPublisher {

private static final Logger LOG = LoggerFactory.getLogger(EventPublisher.class);

private final NakadiSettings nakadiSettings;

private final TopicRepository topicRepository;
private final EventTypeCache eventTypeCache;
private final PartitionResolver partitionResolver;
Expand All @@ -46,14 +48,16 @@ public class EventPublisher {
public EventPublisher(final TopicRepository topicRepository,
final EventTypeCache eventTypeCache,
final PartitionResolver partitionResolver,
final Enrichment enrichment) {
final Enrichment enrichment,
final NakadiSettings nakadiSettings) {
this.topicRepository = topicRepository;
this.eventTypeCache = eventTypeCache;
this.partitionResolver = partitionResolver;
this.enrichment = enrichment;
this.nakadiSettings = nakadiSettings;
}

public EventPublishResult publish(final JSONArray events, final String eventTypeName, final Client client)
public EventPublishResult publish(final String events, final String eventTypeName, final Client client)
throws NoSuchEventTypeException, InternalNakadiException {
final EventType eventType = eventTypeCache.getEventType(eventTypeName);
final List<BatchItem> batch = BatchFactory.from(events);
Expand Down Expand Up @@ -119,6 +123,7 @@ private void validate(final List<BatchItem> batch, final EventType eventType) th
item.setStep(EventPublishingStep.VALIDATING);
try {
validateSchema(item.getEvent(), eventType);
validateEventSize(item);
} catch (final EventValidationException e) {
item.updateStatusAndDetail(EventPublishingStatus.FAILED, e.getMessage());
throw e;
Expand All @@ -145,6 +150,13 @@ private void validateSchema(final JSONObject event, final EventType eventType) t
}
}

private void validateEventSize(final BatchItem item) throws EventValidationException {
if (item.getEventSize() > nakadiSettings.getEventMaxBytes()) {
throw new EventValidationException("Event too large: " + item.getEventSize()
+ " bytes, max size is " + nakadiSettings.getEventMaxBytes() + " bytes");
}
}

private EventPublishResult failed(final List<BatchItem> batch) {
return new EventPublishResult(EventPublishingStatus.FAILED, EventPublishingStep.PUBLISHING, responses(batch));
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ nakadi:
send.timeoutMs: 5000
batch.size: 5242880
linger.ms: 0
fetch.message.max.bytes: 2000000
zookeeper:
kafkaNamespace:
brokers: 127.0.0.1:2181
Expand All @@ -78,6 +79,7 @@ nakadi:
auth:
plugin:
factory: org.zalando.nakadi.plugin.auth.DefaultApplicationServiceFactory
event.max.bytes: 999000
---

spring:
Expand Down
Loading