Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #515 from zalando/ARUHA-473-check-size-of-events
Browse files Browse the repository at this point in the history
Aruha 473 check size of events
lmontrieux authored Jan 23, 2017
2 parents 02a6fa1 + 57e67d3 commit 30c961b
Showing 17 changed files with 590 additions and 88 deletions.
5 changes: 4 additions & 1 deletion api/nakadi-event-bus-api.yaml
Original file line number Diff line number Diff line change
@@ -316,7 +316,10 @@ paths:
incoming Events. Validation rules are evaluated in the order they are defined and the Event
is **rejected** in the first case of failure. If the offending validation rule provides
information about the violation it will be included in the `BatchItemResponse`. If the
`EventType` defines schema validation it will be performed at this moment.
`EventType` defines schema validation it will be performed at this moment. The size of each
Event will also be validated. The maximum size per Event is 999,000 bytes. We use the batch
input to measure the size of events, so unnecessary spaces, tabs, and carriage returns will
count towards the event size.
1. Once the validation has succeeded, the content of the Event is updated according to the
enrichment rules in the order the rules are defined in the `EventType`. No preexisting
Original file line number Diff line number Diff line change
@@ -8,7 +8,6 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy;
import org.json.JSONObject;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -54,6 +53,7 @@ public class KafkaRepositoryAT extends BaseAT {
private static final int KAFKA_REQUEST_TIMEOUT = 30000;
private static final int KAFKA_BATCH_SIZE = 1048576;
private static final long KAFKA_LINGER_MS = 0;
private static final long NAKADI_EVENT_MAX_BYTES = 1000000L;

private NakadiSettings nakadiSettings;
private KafkaSettings kafkaSettings;
@@ -73,7 +73,8 @@ public void setup() {
DEFAULT_TOPIC_ROTATION,
DEFAULT_COMMIT_TIMEOUT,
NAKADI_POLL_TIMEOUT,
NAKADI_SEND_TIMEOUT);
NAKADI_SEND_TIMEOUT,
NAKADI_EVENT_MAX_BYTES);
kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_LINGER_MS);
zookeeperSettings = new ZookeeperSettings(ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
kafkaHelper = new KafkaTestHelper(KAFKA_URL);
@@ -133,7 +134,7 @@ public void whenDeleteTopicThenTopicIsDeleted() throws Exception {
@Test(timeout = 10000)
public void whenBulkSendSuccessfullyThenUpdateBatchItemStatus() throws Exception {
final List<BatchItem> items = new ArrayList<>();
final JSONObject event = new JSONObject();
final String event = "{}";
final String topicId = TestUtils.randomValidEventTypeName();
kafkaHelper.createTopic(topicId, ZOOKEEPER_URL);

9 changes: 8 additions & 1 deletion src/main/java/org/zalando/nakadi/config/NakadiSettings.java
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ public class NakadiSettings {
private final long defaultCommitTimeoutSeconds;
private final long kafkaPollTimeoutMs;
private final long kafkaSendTimeoutMs;
private final long eventMaxBytes;

@Autowired
public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTopicPartitionCount,
@@ -24,7 +25,8 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo
@Value("${nakadi.topic.default.rotationMs}") final long defaultTopicRotationMs,
@Value("${nakadi.stream.default.commitTimeout}") final long defaultCommitTimeoutSeconds,
@Value("${nakadi.kafka.poll.timeoutMs}") final long kafkaPollTimeoutMs,
@Value("${nakadi.kafka.send.timeoutMs}") final long kafkaSendTimeoutMs) {
@Value("${nakadi.kafka.send.timeoutMs}") final long kafkaSendTimeoutMs,
@Value("${nakadi.event.max.bytes}") final long eventMaxBytes) {
this.maxTopicPartitionCount = maxTopicPartitionCount;
this.defaultTopicPartitionCount = defaultTopicPartitionCount;
this.defaultTopicReplicaFactor = defaultTopicReplicaFactor;
@@ -33,6 +35,7 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo
this.defaultCommitTimeoutSeconds = defaultCommitTimeoutSeconds;
this.kafkaPollTimeoutMs = kafkaPollTimeoutMs;
this.kafkaSendTimeoutMs = kafkaSendTimeoutMs;
this.eventMaxBytes = eventMaxBytes;
}

public int getDefaultTopicPartitionCount() {
@@ -67,4 +70,8 @@ public long getKafkaSendTimeoutMs() {
return kafkaSendTimeoutMs;
}

/**
 * Returns the maximum event size, in bytes, that was injected into this
 * settings object from the {@code nakadi.event.max.bytes} property.
 *
 * @return the configured per-event size limit in bytes
 */
public long getEventMaxBytes() {
    return this.eventMaxBytes;
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.zalando.nakadi.controller;

import org.json.JSONArray;
import org.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,11 +81,8 @@ private ResponseEntity postEventInternal(final String eventTypeName,
final Client client) {
final long startingNanos = System.nanoTime();
try {
final JSONArray eventsAsJsonObjects = new JSONArray(eventsAsString);

final int eventCount = eventsAsJsonObjects.length();
final EventPublishResult result = publisher.publish(eventsAsJsonObjects, eventTypeName, client);
reportMetrics(eventTypeMetrics, result, eventsAsString, eventCount);
final EventPublishResult result = publisher.publish(eventsAsString, eventTypeName, client);
reportMetrics(eventTypeMetrics, result, eventsAsString, result.getResponses().size());

final ResponseEntity response = response(result);
return response;
73 changes: 68 additions & 5 deletions src/main/java/org/zalando/nakadi/domain/BatchFactory.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,82 @@
package org.zalando.nakadi.domain;

import org.json.JSONArray;
import org.json.JSONException;

import java.util.ArrayList;
import java.util.List;

public class BatchFactory {

public static List<BatchItem> from(final JSONArray events) {
final List<BatchItem> batch = new ArrayList<>(events.length());
for (int i = 0; i < events.length(); i++) {
batch.add(new BatchItem(events.getJSONObject(i)));
/**
 * Splits the raw JSON text of a batch (a JSON array) into one {@link BatchItem} per
 * top-level element, without re-serializing the events. Each item is built from the
 * exact characters found in the input, so whitespace inside an event is preserved and
 * counts towards the size later computed by {@code BatchItem}.
 *
 * <p>Only whitespace and commas found between top-level elements are dropped. Curly
 * brackets are counted to find element boundaries, and characters inside string
 * literals (including escaped quotes) are copied verbatim and never affect the
 * bracket count.
 *
 * @param events the raw batch, expected to be a JSON array optionally surrounded by
 *               whitespace
 * @return the batch items in input order; empty if the array has no elements
 * @throws JSONException if the input is empty, does not start with '[' or does not end
 *                       with ']' (ignoring surrounding whitespace); constructing a
 *                       {@code BatchItem} from a malformed element may throw as well
 */
public static List<BatchItem> from(final String events) {
    final List<BatchItem> batch = new ArrayList<>();
    final int length = events.length();
    if (length == 0) {
        // Previously charAt(0) was evaluated on the empty string and escaped as a
        // StringIndexOutOfBoundsException; report it like any other malformed batch.
        throw new JSONException("Unexpected end of input, expected '['");
    }

    StringBuilder sb = new StringBuilder();
    int brackets = 0;
    boolean insideQuote = false;
    boolean escaped = false;
    int start = 0;
    int end = length - 1;

    // Skip surrounding whitespace; the bounds are checked before touching the string.
    while (start < end && isEmptyCharacter(events.charAt(start))) {
        start++;
    }
    while (end > start && isEmptyCharacter(events.charAt(end))) {
        end--;
    }
    if (events.charAt(start) != '[') {
        throw new JSONException(String.format("Unexpected character %s in position %d, expected '['",
                events.charAt(start), start));
    }
    start++;
    if (events.charAt(end) != ']') {
        throw new JSONException(String.format("Unexpected character %s in position %d, expected ']'",
                events.charAt(end), end));
    }

    for (int i = start; i < end; i++) {
        final char c = events.charAt(i);
        // An unescaped double quote toggles string-literal mode.
        if (!escaped && c == '"') {
            insideQuote = !insideQuote;
        }
        if (escaped) {
            // The character following a backslash is copied as-is.
            sb.append(c);
            escaped = false;
        } else if (c == '\\') {
            sb.append(c);
            escaped = true;
        } else if (insideQuote) {
            sb.append(c);
        } else {
            if (c == '{') {
                brackets++;
            }
            if (c == '}') {
                brackets--;
            }
            // Separators and whitespace between top-level elements are not part of any event.
            if (!((brackets == 0) && ((c == ',') || isEmptyCharacter(c)))) {
                sb.append(c);
            }
            // Back at depth zero on a non-whitespace character: the current element is complete.
            if (brackets == 0 && !isEmptyCharacter(c)) {
                if (sb.length() > 0) {
                    batch.add(new BatchItem(sb.toString()));
                }
                sb = new StringBuilder();
            }
        }
    }

    // Whatever is left (e.g. an element with unbalanced brackets) is handed to BatchItem as-is.
    if (sb.length() != 0) {
        batch.add(new BatchItem(sb.toString()));
    }

    return batch;
}

/**
 * Tells whether the given character is one of the four whitespace characters
 * recognised by this class: space, tab, line feed or carriage return.
 *
 * @param c the character to classify
 * @return {@code true} for ' ', '\t', '\n' and '\r'; {@code false} otherwise
 */
private static boolean isEmptyCharacter(final char c) {
    switch (c) {
        case ' ':
        case '\t':
        case '\n':
        case '\r':
            return true;
        default:
            return false;
    }
}
}
13 changes: 10 additions & 3 deletions src/main/java/org/zalando/nakadi/domain/BatchItem.java
Original file line number Diff line number Diff line change
@@ -3,19 +3,22 @@
import org.json.JSONObject;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class BatchItem {
private final BatchItemResponse response;
private final JSONObject event;
private String partition;
private String brokerId;
private int eventSize;

public BatchItem(final JSONObject event) {
public BatchItem(final String event) {
this.event = new JSONObject(event);
this.eventSize = event.getBytes(StandardCharsets.UTF_8).length;
this.response = new BatchItemResponse();
this.event = event;

Optional.ofNullable(event.optJSONObject("metadata"))
Optional.ofNullable(this.event.optJSONObject("metadata"))
.map(e -> e.optString("eid", null))
.ifPresent(this.response::setEid);
}
@@ -58,4 +61,8 @@ public void updateStatusAndDetail(final EventPublishingStatus publishingStatus,
response.setPublishingStatus(publishingStatus);
response.setDetail(detail);
}

/**
 * Returns the size of this item's event as recorded at construction time,
 * i.e. the number of bytes of the original event string encoded as UTF-8.
 *
 * @return the event size in bytes
 */
public int getEventSize() {
    return this.eventSize;
}
}
18 changes: 15 additions & 3 deletions src/main/java/org/zalando/nakadi/service/EventPublisher.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package org.zalando.nakadi.service;

import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.zalando.nakadi.config.NakadiSettings;
import org.zalando.nakadi.domain.BatchFactory;
import org.zalando.nakadi.domain.BatchItem;
import org.zalando.nakadi.domain.BatchItemResponse;
@@ -36,6 +36,8 @@ public class EventPublisher {

private static final Logger LOG = LoggerFactory.getLogger(EventPublisher.class);

private final NakadiSettings nakadiSettings;

private final TopicRepository topicRepository;
private final EventTypeCache eventTypeCache;
private final PartitionResolver partitionResolver;
@@ -45,14 +47,16 @@ public class EventPublisher {
public EventPublisher(final TopicRepository topicRepository,
final EventTypeCache eventTypeCache,
final PartitionResolver partitionResolver,
final Enrichment enrichment) {
final Enrichment enrichment,
final NakadiSettings nakadiSettings) {
this.topicRepository = topicRepository;
this.eventTypeCache = eventTypeCache;
this.partitionResolver = partitionResolver;
this.enrichment = enrichment;
this.nakadiSettings = nakadiSettings;
}

public EventPublishResult publish(final JSONArray events, final String eventTypeName, final Client client)
public EventPublishResult publish(final String events, final String eventTypeName, final Client client)
throws NoSuchEventTypeException, InternalNakadiException {
final EventType eventType = eventTypeCache.getEventType(eventTypeName);
final List<BatchItem> batch = BatchFactory.from(events);
@@ -118,6 +122,7 @@ private void validate(final List<BatchItem> batch, final EventType eventType) th
item.setStep(EventPublishingStep.VALIDATING);
try {
validateSchema(item.getEvent(), eventType);
validateEventSize(item);
} catch (final EventValidationException e) {
item.updateStatusAndDetail(EventPublishingStatus.FAILED, e.getMessage());
throw e;
@@ -140,6 +145,13 @@ private void validateSchema(final JSONObject event, final EventType eventType) t
}
}

/**
 * Rejects a batch item whose event is larger than the configured limit.
 *
 * @param item the batch item whose recorded event size is checked
 * @throws EventValidationException if the item's size in bytes is strictly greater
 *                                  than {@code nakadiSettings.getEventMaxBytes()}
 */
private void validateEventSize(final BatchItem item) throws EventValidationException {
    final int actualBytes = item.getEventSize();
    final long allowedBytes = nakadiSettings.getEventMaxBytes();
    if (actualBytes <= allowedBytes) {
        return;
    }
    final String message = "Event too large: " + actualBytes
            + " bytes, max size is " + allowedBytes + " bytes";
    throw new EventValidationException(message);
}

/**
 * Builds the publish result for a batch that failed: status {@code FAILED},
 * step {@code PUBLISHING}, carrying the responses obtained from
 * {@code responses(batch)}.
 *
 * @param batch the batch items whose responses are included in the result
 * @return the failed publish result
 */
private EventPublishResult failed(final List<BatchItem> batch) {
return new EventPublishResult(EventPublishingStatus.FAILED, EventPublishingStep.PUBLISHING, responses(batch));
}
1 change: 1 addition & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -78,6 +78,7 @@ nakadi:
auth:
plugin:
factory: org.zalando.nakadi.plugin.auth.DefaultApplicationServiceFactory
event.max.bytes: 999000
---

spring:
Loading

0 comments on commit 30c961b

Please sign in to comment.