This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Aruha 473 check size of events #515

Merged
41 commits merged on Jan 23, 2017
Changes from 5 commits

Commits (41)
9dabfa2
ARUHA-473 Reject batches with at least one event that's too large
Dec 15, 2016
cc50d0c
ARUHA-473 Move maximum event size to nakadi settings
Dec 15, 2016
761d455
ARUHA-473 Unit tests for max event size
Dec 15, 2016
146d6a0
ARUHA-473 Acceptance test for max event size
Dec 16, 2016
8849782
ARUHA-473 Fix checkstyle
Dec 16, 2016
a061392
ARUHA-473 Move size validation after validation, remove VALIDATING_SI…
Dec 16, 2016
0a791da
ARUHA-473 Move size validation inside validation
Dec 16, 2016
df5b675
ARUHA-473 Fix bug in test wrt event publishing step
Dec 16, 2016
e261cb8
ARUHA-473 Refactor string building
Dec 16, 2016
217be7f
ARUHA-473 Fix log string
Dec 16, 2016
ddd7a4f
ARUHA-473 Set capacity for StringBuilder
Dec 16, 2016
957938b
ARUHA-473 If an event is too large, that event is marked FAILED. Othe…
Dec 16, 2016
49c1846
ARUHA-473 Remove redundant Exception
Dec 16, 2016
cc15d1a
ARUHA-473 Explicit charset in getBytes()
Dec 20, 2016
3f9bf87
ARUHA-473 Refactor event size computation
Dec 23, 2016
6f7a450
ARUHA-473 Explicitely set Kafka consumers' fetch.message.max.bytes pr…
Dec 27, 2016
f315836
ARUHA-473 Add curly braces
Dec 28, 2016
8871dea
ARUHA-473 Explicit event size limit in swagger file
Dec 28, 2016
f5e6367
ARUHA-473 BatchItem constructor for String event, adding event size
Dec 30, 2016
d5e6f3d
ARUHA-473 Test BatchItem size with multi-byte characters
Dec 30, 2016
12253ed
ARUHA-473 Fix test style
Dec 30, 2016
30da84a
ARUHA-473 BatchFactory method to process batches as Strings
Dec 30, 2016
7287c40
ARUHA-473 Use String when publishing events to improve event size che…
Jan 3, 2017
d8af912
Merge branch 'master' into ARUHA-473-check-size-of-events
Jan 3, 2017
0fbaa46
ARUHA-473 Pass batch as String to publisher and fix checkstyle violat…
Jan 3, 2017
20aed22
ARUHA-473 Don't parse input to JSON in EventPublishingController
Jan 4, 2017
b9c36f8
ARUHA-473 Remove acceptance test
Jan 5, 2017
33bbc03
ARUHA-473 Remove unused private method
Jan 5, 2017
f6fcfe9
ARUHA-473 Error message for events too large includes the size of the…
Jan 5, 2017
f90e477
ARUHA-473 Fix tests' expected error messages for events too large
Jan 5, 2017
43a337a
Merge branch 'master' into ARUHA-473-check-size-of-events
Jan 5, 2017
bf90d6e
ARUHA-473 Fix test
Jan 5, 2017
0330145
ARUHA-473 Change max event size in swagger file
Jan 6, 2017
4cc0bcd
ARUHA-473 Fix bug when a batch is an array with only spaces.
Jan 6, 2017
30db7f5
ARUHA-473 Set max event size to 999,000 bytes by default
Jan 6, 2017
781db1e
ARUHA-473 Accept valid batches surrounded with tabs, carriage returns…
Jan 10, 2017
ffd729a
ARUHA-473 Refactoring
Jan 10, 2017
e3a4562
Merge branch 'master' into ARUHA-473-check-size-of-events
Jan 18, 2017
58cb450
ARUHA-473 Remove kafka max bytes property
Jan 18, 2017
3f2a2ac
ARUHA-473 Finish removing kafka max bytes property
Jan 19, 2017
57e67d3
Merge branch 'master' into ARUHA-473-check-size-of-events
Jan 23, 2017
@@ -54,6 +54,7 @@ public class KafkaRepositoryAT extends BaseAT {
private static final int KAFKA_REQUEST_TIMEOUT = 30000;
private static final int KAFKA_BATCH_SIZE = 1048576;
private static final long KAFKA_LINGER_MS = 0;
private static final long NAKADI_EVENT_MAX_BYTES = 1000000L;

private NakadiSettings nakadiSettings;
private KafkaSettings kafkaSettings;
@@ -73,7 +74,8 @@ public void setup() {
DEFAULT_TOPIC_ROTATION,
DEFAULT_COMMIT_TIMEOUT,
NAKADI_POLL_TIMEOUT,
NAKADI_SEND_TIMEOUT);
NAKADI_SEND_TIMEOUT,
NAKADI_EVENT_MAX_BYTES);
kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_LINGER_MS);
zookeeperSettings = new ZookeeperSettings(ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
kafkaHelper = new KafkaTestHelper(KAFKA_URL);

@@ -0,0 +1,37 @@
package org.zalando.nakadi.webservice;

import com.jayway.restassured.http.ContentType;
import com.jayway.restassured.response.Response;
import org.apache.http.HttpStatus;
import org.junit.Test;
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.webservice.utils.NakadiTestUtils;

import java.text.MessageFormat;

import static com.jayway.restassured.RestAssured.given;

public class EventPublishingAT extends BaseAT {
Contributor:

Have you considered writing a unit test instead of an acceptance test? We try to write as few acceptance tests as possible because they are too slow.

Contributor Author:

Removed in b9c36f8


@Test
public void whenPublishingEventTooLargeThen422() throws Exception {
final EventType eventType = NakadiTestUtils.createEventType();

publishLargeEvent(eventType)
.then()
.statusCode(HttpStatus.SC_UNPROCESSABLE_ENTITY);
}

private Response publishLargeEvent(final EventType eventType) {
final StringBuilder sb = new StringBuilder();
sb.append("[{\"blah\":\"");
for (int i = 0; i < 1000010; i++) {
sb.append("a");
}
sb.append("\"}]");
Contributor:

You can simply do: org.apache.commons.lang3.StringUtils.repeat("a", 1000010)

return given()
.body(sb.toString())
.contentType(ContentType.JSON)
.post(MessageFormat.format("/event-types/{0}/events", eventType.getName()));
}
}
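
For illustration, the helper rewritten with the suggested commons-lang3 call might look like this (a sketch only, assuming commons-lang3 is on the test classpath):

```java
import org.apache.commons.lang3.StringUtils;

private Response publishLargeEvent(final EventType eventType) {
    // One repeat() call builds the ~1MB padding instead of a manual append loop.
    final String body = "[{\"blah\":\"" + StringUtils.repeat("a", 1000010) + "\"}]";
    return given()
            .body(body)
            .contentType(ContentType.JSON)
            .post(MessageFormat.format("/event-types/{0}/events", eventType.getName()));
}
```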
9 changes: 8 additions & 1 deletion src/main/java/org/zalando/nakadi/config/NakadiSettings.java
@@ -15,6 +15,7 @@ public class NakadiSettings {
private final long defaultCommitTimeoutSeconds;
private final long kafkaPollTimeoutMs;
private final long kafkaSendTimeoutMs;
private final long eventMaxBytes;

@Autowired
public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTopicPartitionCount,
@@ -24,7 +25,8 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo
@Value("${nakadi.topic.default.rotationMs}") final long defaultTopicRotationMs,
@Value("${nakadi.stream.default.commitTimeout}") final long defaultCommitTimeoutSeconds,
@Value("${nakadi.kafka.poll.timeoutMs}") final long kafkaPollTimeoutMs,
@Value("${nakadi.kafka.send.timeoutMs}") final long kafkaSendTimeoutMs) {
@Value("${nakadi.kafka.send.timeoutMs}") final long kafkaSendTimeoutMs,
@Value("${nakadi.event.max.bytes}") final long eventMaxBytes) {
this.maxTopicPartitionCount = maxTopicPartitionCount;
this.defaultTopicPartitionCount = defaultTopicPartitionCount;
this.defaultTopicReplicaFactor = defaultTopicReplicaFactor;
@@ -33,6 +35,7 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo
this.defaultCommitTimeoutSeconds = defaultCommitTimeoutSeconds;
this.kafkaPollTimeoutMs = kafkaPollTimeoutMs;
this.kafkaSendTimeoutMs = kafkaSendTimeoutMs;
this.eventMaxBytes = eventMaxBytes;
}

public int getDefaultTopicPartitionCount() {
@@ -67,4 +70,8 @@ public long getKafkaSendTimeoutMs() {
return kafkaSendTimeoutMs;
}

public long getEventMaxBytes() {
return eventMaxBytes;
}

}
@@ -4,6 +4,7 @@ public enum EventPublishingStep {
NONE,
VALIDATING,
ENRICHING,
VALIDATING_SIZE,
Contributor (@v-stepanov, Dec 16, 2016):

Unfortunately you can't change this, because this enum is defined in our API; adding values here would be a backwards-incompatible change.
https://github.com/zalando/nakadi/blob/master/api/nakadi-event-bus-api.yaml#L1817

Why do you want a separate step for that? I thought it could be part of the VALIDATING step.

Contributor Author:

If we make it part of VALIDATING, then we will check the size of the event before enriching. I think it makes more sense to check the size after enriching, since the enrichment strategy is chosen by the user who created the event type.

Contributor Author:

I guess I could either check the size during validation, in which case we ignore whatever is added by the enrichment step; or go back to validation after enriching, but that may be very confusing to users.

Contributor:

I think the simplest way to go would be to assume that enrichment and the metadata added by Kafka will not add more data than the difference between the Nakadi limit (1kk bytes) and the Kafka limit (I think we wanted to set 2kk bytes). In that case you can simply validate the size in the validation step.

PARTITIONING,
PUBLISHING,
}
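
For reference, folding the size check into the existing step, as suggested above, would look roughly like this (a sketch; the validate() signature is assumed from the surrounding diff, and validateEventSize() is the method introduced later in this PR):

```java
// Size validation as part of the existing VALIDATING step, so no new
// (backwards-incompatible) EventPublishingStep value is needed.
private void validate(final List<BatchItem> batch, final EventType eventType)
        throws EventValidationException, EventSizeValidationException {
    for (final BatchItem item : batch) {
        item.setStep(EventPublishingStep.VALIDATING);
        validateSchema(item.getEvent(), eventType);
        validateEventSize(item); // size counts as part of validation
    }
}
```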
@@ -0,0 +1,21 @@
package org.zalando.nakadi.exceptions;

import org.zalando.nakadi.validation.ValidationError;
import org.zalando.problem.MoreStatus;

import javax.ws.rs.core.Response;

public class EventSizeValidationException extends NakadiException {
public EventSizeValidationException(final String message) {
super(message);
}

public EventSizeValidationException(final ValidationError validationError) {
super(validationError.getMessage());
}

@Override
protected Response.StatusType getStatus() {
return MoreStatus.UNPROCESSABLE_ENTITY;
}
}
29 changes: 28 additions & 1 deletion src/main/java/org/zalando/nakadi/service/EventPublisher.java
@@ -6,6 +6,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.zalando.nakadi.config.NakadiSettings;
import org.zalando.nakadi.domain.BatchFactory;
import org.zalando.nakadi.domain.BatchItem;
import org.zalando.nakadi.domain.BatchItemResponse;
@@ -16,6 +17,7 @@
import org.zalando.nakadi.enrichment.Enrichment;
import org.zalando.nakadi.exceptions.EnrichmentException;
import org.zalando.nakadi.exceptions.EventPublishingException;
import org.zalando.nakadi.exceptions.EventSizeValidationException;
import org.zalando.nakadi.exceptions.EventValidationException;
import org.zalando.nakadi.exceptions.InternalNakadiException;
import org.zalando.nakadi.exceptions.NoSuchEventTypeException;
@@ -37,6 +39,8 @@ public class EventPublisher {

private static final Logger LOG = LoggerFactory.getLogger(EventPublisher.class);

private final NakadiSettings nakadiSettings;

private final TopicRepository topicRepository;
private final EventTypeCache eventTypeCache;
private final PartitionResolver partitionResolver;
@@ -46,11 +50,13 @@ public class EventPublisher {
public EventPublisher(final TopicRepository topicRepository,
final EventTypeCache eventTypeCache,
final PartitionResolver partitionResolver,
final Enrichment enrichment) {
final Enrichment enrichment,
final NakadiSettings nakadiSettings) {
this.topicRepository = topicRepository;
this.eventTypeCache = eventTypeCache;
this.partitionResolver = partitionResolver;
this.enrichment = enrichment;
this.nakadiSettings = nakadiSettings;
}

public EventPublishResult publish(final JSONArray events, final String eventTypeName, final Client client)
@@ -64,6 +70,7 @@ public EventPublishResult publish(final JSONArray events, final String eventType
validate(batch, eventType);
partition(batch, eventType);
enrich(batch, eventType);
validateSize(batch);
submit(batch, eventType);

return ok(batch);
@@ -76,6 +83,9 @@ public EventPublishResult publish(final JSONArray events, final String eventType
} catch (final EnrichmentException e) {
LOG.debug("Event enrichment error: {}", e.getMessage());
return aborted(EventPublishingStep.ENRICHING, batch);
} catch (final EventSizeValidationException e) {
LOG.debug("Event size validation error: ){", e.getMessage());
Member:

){ -> {} ?

return aborted(EventPublishingStep.VALIDATING_SIZE, batch);
} catch (final EventPublishingException e) {
LOG.error("error publishing event", e);
return failed(batch);
@@ -145,6 +155,23 @@ private void validateSchema(final JSONObject event, final EventType eventType) t
}
}

private void validateSize(final List<BatchItem> batch) throws EventSizeValidationException {
for (final BatchItem item: batch) {
item.setStep(EventPublishingStep.VALIDATING_SIZE);
try {
validateEventSize(item);
} catch (final EventSizeValidationException e) {
item.updateStatusAndDetail(EventPublishingStatus.ABORTED, e.getMessage());
throw e;
}
}
}

private void validateEventSize(final BatchItem item) throws EventSizeValidationException {
if (item.getEvent().toString().getBytes().length > nakadiSettings.getEventMaxBytes())
Member:

Never use getBytes() without an explicit charset. Please always pass https://docs.oracle.com/javase/7/docs/api/java/nio/charset/StandardCharsets.html#UTF_8 into that function.

Actually we should also check that we don't have other places where this bare getBytes() is used.

Member:

Another problem is actually doing that conversion. There is a byte-array representation you can use, or you could store the size in the item to avoid the conversion.

Contributor Author:

I'll fix this. There are a few other places in the code where we use getBytes() without an explicit charset; I'll create a ticket for that.

throw new EventSizeValidationException("Event too large");
}

private EventPublishResult failed(final List<BatchItem> batch) {
return new EventPublishResult(EventPublishingStatus.FAILED, EventPublishingStep.PUBLISHING, responses(batch));
}
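To see why the reviewers insist on an explicit charset, here is a minimal standalone illustration (not part of the PR): the byte length of a string depends on the encoding, and bare getBytes() uses the platform default.

```java
import java.nio.charset.StandardCharsets;

public class EventSizeCheckExample {
    public static void main(final String[] args) {
        final String ascii = "aaaa";    // 4 chars -> 4 bytes in UTF-8
        final String umlauts = "ääää";  // 4 chars -> 8 bytes in UTF-8

        // Explicit charset: deterministic on every platform.
        System.out.println(ascii.getBytes(StandardCharsets.UTF_8).length);   // 4
        System.out.println(umlauts.getBytes(StandardCharsets.UTF_8).length); // 8

        // Bare getBytes() uses the platform-default charset, so the same event
        // could pass the size check on one machine and fail on another.
        System.out.println(umlauts.getBytes().length);
    }
}
```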
1 change: 1 addition & 0 deletions src/main/resources/application.yml
@@ -78,6 +78,7 @@ nakadi:
auth:
plugin:
factory: org.zalando.nakadi.plugin.auth.DefaultApplicationServiceFactory
event.max.bytes: 1000000
Member:

I thought we agreed on 1MB, which is a bit bigger :)
1048576

Contributor Author:

I went for the default value in Kafka, which is 1,000,000, but I'm happy to go for a real 1MB if you think it is better.

Member:

Also there is a problem with that: if you later want to change the producer's message.max.bytes property (and the related ones), let's say to 3MB, it won't work, because you restrict it to 1MB here and you always have to remember about event.max.bytes.

Member:

https://kafka.apache.org/090/documentation.html#brokerconfigs
It is different according to the docs :)
message.max.bytes = 1000012

Contributor:

> Also there is a problem with that: if you later want to change the producer's message.max.bytes property (and the related ones), let's say to 3MB, it won't work, because you restrict it to 1MB here and you always have to remember about event.max.bytes.

Why is it a problem?

Member:

Because you increased the Kafka broker message size to 3MB and expect that it is going to be 3MB, but actually it is only 1MB.

Contributor:

@adyach Why would you expect that if you know that Nakadi validates event size as well?

Member:

Today I expect that because I remember :) It introduces another property to set when you try to change the message size, and Kafka already has a lot of them. If we could make it relative to the Kafka producer setting, it would be simpler not to forget.

Contributor:

@adyach I think the property in Nakadi should be primary, and the property in Kafka should just satisfy the property in Nakadi. So if somebody changes it in Kafka without looking at the value in Nakadi, they are doing something really wrong.

But I still don't really understand what the problem is. One can say: what if somebody forgets how to operate AWS and deletes the whole Kafka stack and we lose all the data :) For me these are problems of the same sort.

---

spring:
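One hypothetical way to address the "second limit to remember" concern from this thread would be a startup sanity check; nothing like this exists in the PR, and the Kafka default below is the broker-side message.max.bytes value quoted above:

```java
// Hypothetical guard (not part of this PR): fail fast if the Nakadi limit
// exceeds Kafka's broker-side message.max.bytes (1000012 by default per the
// Kafka 0.9 docs linked above), so operators cannot raise one limit and
// silently break against the other.
public static void assertEventSizeLimitFitsKafka(final long nakadiEventMaxBytes,
                                                 final long kafkaMessageMaxBytes) {
    if (nakadiEventMaxBytes > kafkaMessageMaxBytes) {
        throw new IllegalStateException("nakadi.event.max.bytes ("
                + nakadiEventMaxBytes + ") exceeds Kafka's message.max.bytes ("
                + kafkaMessageMaxBytes + "); events accepted by Nakadi could be"
                + " rejected by the broker");
    }
}
```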
@@ -88,6 +88,7 @@ public class EventTypeControllerTest {
private static final long TOPIC_RETENTION_TIME_MS = 150;
private static final int NAKADI_SEND_TIMEOUT = 10000;
private static final int NAKADI_POLL_TIMEOUT = 10000;
private static final long NAKADI_EVENT_MAX_BYTES = 1000000;
private final EventTypeRepository eventTypeRepository = mock(EventTypeRepository.class);
private final TopicRepository topicRepository = mock(TopicRepository.class);
private final PartitionResolver partitionResolver = mock(PartitionResolver.class);
@@ -111,7 +112,7 @@ public void init() throws Exception {
final EventTypeOptionsValidator eventTypeOptionsValidator =
new EventTypeOptionsValidator(TOPIC_RETENTION_MIN_MS, TOPIC_RETENTION_MAX_MS);
final NakadiSettings nakadiSettings = new NakadiSettings(0, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60,
NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT);
NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, NAKADI_EVENT_MAX_BYTES);
final EventTypeController controller = new EventTypeController(eventTypeService,
featureToggleService, eventTypeOptionsValidator, applicationService, nakadiSettings);
