Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/master' into aruha-1528-audience
Browse files Browse the repository at this point in the history
  • Loading branch information
rcillo committed Jun 26, 2018
2 parents d8bb1cc + ca53711 commit 734da2d
Show file tree
Hide file tree
Showing 16 changed files with 136 additions and 82 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Added
- Extended event type's definition to support ordering_key_field attribute

### Removed
- Removed high-level API feature flag

### Changed
- Added feature toggle to make it possible to remove event-types together with subscriptions

Expand Down
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ services:
- NAKADI_FEATURES_DEFAULT_FEATURES_DISABLE_EVENT_TYPE_CREATION
- NAKADI_FEATURES_DEFAULT_FEATURES_DISABLE_EVENT_TYPE_DELETION
- NAKADI_FEATURES_DEFAULT_FEATURES_DISABLE_SUBSCRIPTION_CREATION
- NAKADI_FEATURES_DEFAULT_FEATURES_HIGH_LEVEL_API
- NAKADI_FEATURES_DEFAULT_FEATURES_CHECK_PARTITIONS_KEYS
- NAKADI_FEATURES_DEFAULT_FEATURES_CHECK_OWNING_APPLICATION
- NAKADI_FEATURES_DEFAULT_FEATURES_LIMIT_CONSUMERS_NUMBER
Expand Down
29 changes: 29 additions & 0 deletions docs/_data/nakadi-event-bus-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2468,6 +2468,35 @@ definitions:
authorization:
$ref: '#/definitions/EventTypeAuthorization'

ordering_key_fields:
type: array
description: |
This is only an informational field. The events are delivered to consumers in the order they were published.
No reordering is done by Nakadi.
This field is useful in case the producer wants to communicate the complete order accross all the events
published to all partitions. This is the case when there is an incremental generator on the producer side,
for example.
It differs from `partition_key_fields` in the sense that it's not used for partitioning (known as sharding in
some systems). The order indicated by `ordering_key_fields` can also differ from the order the events are in
each partition, in case of out-of-order submission.
In most cases, this would have just a single item (the path of the field
by which this is to be ordered), but can have multiple items, in which case
those are considered as a compound key, with lexicographic ordering (first
item is most significant).
items:
type: string
description: |
Indicates a single ordering field. This is a dot separated string, which is applied
onto the whole event object, including the contained metadata and data (in
case of a data change event) objects.
The field must be present in the schema. This field can be modified at any moment, but event type owners are
expected to notify consumer in advance about the change.
example: "data.incremental_counter"

audience:
type: string
x-extensible-enum:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
import com.jayway.restassured.RestAssured;
import com.jayway.restassured.response.Header;
Expand Down Expand Up @@ -90,9 +91,11 @@ public void userJourneyM1() throws InterruptedException, IOException {
.body("owning_application", equalTo(owningApp))
.body("category", equalTo("undefined"))
.body("audience", equalTo("external-public"))
.body("ordering_key_fields", equalTo(Lists.newArrayList("foo", "bar.baz")))
.body("schema.type", equalTo("json_schema"))
.body("schema.schema", equalTo("{\"type\": \"object\", \"properties\": " +
"{\"foo\": {\"type\": \"string\"}}, \"required\": [\"foo\"]}"));
.body("schema.schema", equalTo("{\"type\": \"object\", \"properties\": {\"foo\": " +
"{\"type\": \"string\"}, \"bar\": {\"type\": \"object\", \"properties\": " +
"{\"baz\": {\"type\": \"string\"}}}}, \"required\": [\"foo\"]}"));

// list event types
jsonRequestSpec()
Expand Down
60 changes: 27 additions & 33 deletions src/main/java/org/zalando/nakadi/controller/CursorsController.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
import static org.springframework.http.ResponseEntity.noContent;
import static org.springframework.http.ResponseEntity.ok;
import static org.zalando.nakadi.service.FeatureToggleService.Feature.HIGH_LEVEL_API;
import static org.zalando.problem.MoreStatus.UNPROCESSABLE_ENTITY;
import static org.zalando.problem.spring.web.advice.Responses.create;

Expand All @@ -77,7 +76,6 @@ public CursorsController(final CursorsService cursorsService,

@RequestMapping(path = "/subscriptions/{subscriptionId}/cursors", method = RequestMethod.GET)
public ItemsWrapper<SubscriptionCursor> getCursors(@PathVariable("subscriptionId") final String subscriptionId) {
featureToggleService.checkFeatureOn(HIGH_LEVEL_API);
try {
final List<SubscriptionCursor> cursors = cursorsService.getSubscriptionCursors(subscriptionId)
.stream()
Expand All @@ -99,37 +97,34 @@ public ResponseEntity<?> commitCursors(@PathVariable("subscriptionId") final Str
"COMMIT_CURSORS sid:" + subscriptionId + ", size=" + cursorsIn.getItems().size(),
"isFeatureEnabled");
try {
featureToggleService.checkFeatureOn(HIGH_LEVEL_API);

try {
TimeLogger.addMeasure("convertToNakadiCursors");
final List<NakadiCursor> cursors = convertToNakadiCursors(cursorsIn);
if (cursors.isEmpty()) {
throw new CursorsAreEmptyException();
}
TimeLogger.addMeasure("callService");
final List<Boolean> items = cursorsService.commitCursors(streamId, subscriptionId, cursors);

TimeLogger.addMeasure("prepareResponse");
final boolean allCommited = items.stream().allMatch(item -> item);
if (allCommited) {
return noContent().build();
} else {
final List<CursorCommitResult> body = IntStream.range(0, cursorsIn.getItems().size())
.mapToObj(idx -> new CursorCommitResult(cursorsIn.getItems().get(idx), items.get(idx)))
.collect(Collectors.toList());
return ok(new ItemsWrapper<>(body));
}
} catch (final NoSuchEventTypeException | InvalidCursorException e) {
return create(Problem.valueOf(UNPROCESSABLE_ENTITY, e.getMessage()), request);
} catch (final ServiceTemporarilyUnavailableException e) {
LOG.error("Failed to commit cursors", e);
return create(Problem.valueOf(SERVICE_UNAVAILABLE, e.getMessage()), request);
} catch (final NakadiException e) {
LOG.error("Failed to commit cursors", e);
return create(e.asProblem(), request);
TimeLogger.addMeasure("convertToNakadiCursors");
final List<NakadiCursor> cursors = convertToNakadiCursors(cursorsIn);
if (cursors.isEmpty()) {
throw new CursorsAreEmptyException();
}
} finally {
TimeLogger.addMeasure("callService");
final List<Boolean> items = cursorsService.commitCursors(streamId, subscriptionId, cursors);

TimeLogger.addMeasure("prepareResponse");
final boolean allCommited = items.stream().allMatch(item -> item);
if (allCommited) {
return noContent().build();
} else {
final List<CursorCommitResult> body = IntStream.range(0, cursorsIn.getItems().size())
.mapToObj(idx -> new CursorCommitResult(cursorsIn.getItems().get(idx), items.get(idx)))
.collect(Collectors.toList());
return ok(new ItemsWrapper<>(body));
}
} catch (final NoSuchEventTypeException | InvalidCursorException e) {
return create(Problem.valueOf(UNPROCESSABLE_ENTITY, e.getMessage()), request);
} catch (final ServiceTemporarilyUnavailableException e) {
LOG.error("Failed to commit cursors", e);
return create(Problem.valueOf(SERVICE_UNAVAILABLE, e.getMessage()), request);
} catch (final NakadiException e) {
LOG.error("Failed to commit cursors", e);
return create(e.asProblem(), request);
}
finally {
LOG.info(TimeLogger.finishMeasure());
}
}
Expand All @@ -139,7 +134,6 @@ public ResponseEntity<?> resetCursors(
@PathVariable("subscriptionId") final String subscriptionId,
@Valid @RequestBody final ItemsWrapper<SubscriptionCursorWithoutToken> cursors,
final NativeWebRequest request) {
featureToggleService.checkFeatureOn(HIGH_LEVEL_API);
try {
cursorsService.resetCursors(subscriptionId, convertToNakadiCursors(cursors));
return noContent().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.domain.NakadiCursorLag;
import org.zalando.nakadi.domain.PartitionEndStatistics;
import org.zalando.nakadi.domain.PartitionStatistics;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.exceptions.InternalNakadiException;
import org.zalando.nakadi.exceptions.InvalidCursorException;
import org.zalando.nakadi.exceptions.NakadiException;
import org.zalando.nakadi.exceptions.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.runtime.NotFoundException;
import org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation;
import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1;
import org.zalando.nakadi.exceptions.runtime.NotFoundException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.repository.EventTypeRepository;
import org.zalando.nakadi.service.AuthorizationValidator;
Expand Down Expand Up @@ -99,7 +100,7 @@ public ResponseEntity<?> listPartitions(@PathVariable("name") final String event
eventTypeName,
first.getPartition(),
cursorConverter.convert(first.getFirst()).getOffset(),
cursorConverter.convert(last.getLast()).getOffset());
cursorConverter.convert(selectLast(timelines, last, first)).getOffset());
}).collect(Collectors.toList());
return ok().body(result);
} catch (final NoSuchEventTypeException e) {
Expand All @@ -111,10 +112,11 @@ public ResponseEntity<?> listPartitions(@PathVariable("name") final String event
}

@RequestMapping(value = "/event-types/{name}/partitions/{partition}", method = RequestMethod.GET)
public ResponseEntity<?> getPartition(@PathVariable("name") final String eventTypeName,
@PathVariable("partition") final String partition,
@Nullable @RequestParam(value = "consumed_offset", required = false)
final String consumedOffset, final NativeWebRequest request) {
public ResponseEntity<?> getPartition(
@PathVariable("name") final String eventTypeName,
@PathVariable("partition") final String partition,
@Nullable @RequestParam(value = "consumed_offset", required = false) final String consumedOffset,
final NativeWebRequest request) {
LOG.trace("Get partition endpoint for event-type '{}', partition '{}' is called", eventTypeName, partition);
try {
final EventType eventType = eventTypeRepository.findByName(eventTypeName);
Expand Down Expand Up @@ -172,19 +174,22 @@ private EventTypePartitionView getTopicPartition(final String eventTypeName, fin
if (!firstStats.isPresent()) {
throw new NotFoundException("partition not found");
}
final PartitionStatistics lastStats;
final NakadiCursor newest;
if (timelines.size() == 1) {
lastStats = firstStats.get();
} else {
lastStats = timelineService.getTopicRepository(timelines.get(timelines.size() - 1))
.loadPartitionStatistics(timelines.get(timelines.size() - 1), partition).get();
newest = firstStats.get().getLast();
} else {
final PartitionStatistics lastStats = timelineService
.getTopicRepository(timelines.get(timelines.size() - 1))
.loadPartitionStatistics(timelines.get(timelines.size() - 1), partition)
.get();
newest = selectLast(timelines, lastStats, firstStats.get());
}

return new EventTypePartitionView(
eventTypeName,
lastStats.getPartition(),
partition,
cursorConverter.convert(firstStats.get().getFirst()).getOffset(),
cursorConverter.convert(lastStats.getLast()).getOffset());
cursorConverter.convert(newest).getOffset());
}

private CursorLag toCursorLag(final NakadiCursorLag nakadiCursorLag) {
Expand All @@ -195,4 +200,26 @@ private CursorLag toCursorLag(final NakadiCursorLag nakadiCursorLag) {
nakadiCursorLag.getLag()
);
}


private static NakadiCursor selectLast(final List<Timeline> activeTimelines, final PartitionEndStatistics last,
final PartitionStatistics first) {
final NakadiCursor lastInLastTimeline = last.getLast();
if (!lastInLastTimeline.isInitial()) {
return lastInLastTimeline;
}
// There may be a situation, when there is no data in all the timelines after first, but any cursor from the
// next after first timelines is greater then the cursor in first timeline. Therefore we need to roll pointer
// to the end back till the very beginning or to the end of first timeline with data.
for (int idx = activeTimelines.size() - 2; idx > 0; --idx) {
final Timeline timeline = activeTimelines.get(idx);
final NakadiCursor lastInTimeline = timeline.getLatestPosition()
.toNakadiCursor(timeline, first.getPartition());
if (!lastInTimeline.isInitial()) {
return lastInLastTimeline;
}
}
return first.getLast();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import static org.springframework.http.HttpStatus.OK;
import static org.zalando.nakadi.service.FeatureToggleService.Feature.CHECK_OWNING_APPLICATION;
import static org.zalando.nakadi.service.FeatureToggleService.Feature.DISABLE_SUBSCRIPTION_CREATION;
import static org.zalando.nakadi.service.FeatureToggleService.Feature.HIGH_LEVEL_API;


@RestController
Expand All @@ -66,8 +65,6 @@ public ResponseEntity<?> createOrGetSubscription(@Valid @RequestBody final Subsc
final Errors errors,
final NativeWebRequest request,
final Client client) {
featureToggleService.checkFeatureOn(HIGH_LEVEL_API);

if (errors.hasErrors()) {
return Responses.create(new ValidationProblem(errors), request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

import static javax.ws.rs.core.Response.Status.NOT_IMPLEMENTED;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
import static org.zalando.nakadi.service.FeatureToggleService.Feature.HIGH_LEVEL_API;
import static org.zalando.problem.MoreStatus.UNPROCESSABLE_ENTITY;


Expand Down Expand Up @@ -61,7 +60,6 @@ public ResponseEntity<?> listSubscriptions(
@RequestParam(value = "limit", required = false, defaultValue = "20") final int limit,
@RequestParam(value = "offset", required = false, defaultValue = "0") final int offset,
final NativeWebRequest request) {
featureToggleService.checkFeatureOn(HIGH_LEVEL_API);

return WebResult.wrap(
() -> subscriptionService.listSubscriptions(owningApplication, eventTypes, showStatus, limit, offset),
Expand All @@ -71,16 +69,12 @@ public ResponseEntity<?> listSubscriptions(
@RequestMapping(value = "/{id}", method = RequestMethod.GET)
public ResponseEntity<?> getSubscription(@PathVariable("id") final String subscriptionId,
final NativeWebRequest request) {
featureToggleService.checkFeatureOn(HIGH_LEVEL_API);

return WebResult.wrap(() -> subscriptionService.getSubscription(subscriptionId), request);
}

@RequestMapping(value = "/{id}", method = RequestMethod.DELETE)
public ResponseEntity<?> deleteSubscription(@PathVariable("id") final String subscriptionId,
final NativeWebRequest request) {
featureToggleService.checkFeatureOn(HIGH_LEVEL_API);

return WebResult.wrap(() -> subscriptionService.deleteSubscription(subscriptionId), request,
HttpStatus.NO_CONTENT);
}
Expand All @@ -90,8 +84,6 @@ public ItemsWrapper<SubscriptionEventTypeStats> getSubscriptionStats(
@PathVariable("id") final String subscriptionId,
@RequestParam(value = "show_time_lag", required = false, defaultValue = "false") final boolean showTimeLag)
throws NakadiException, InconsistentStateException, ServiceTemporarilyUnavailableException {
featureToggleService.checkFeatureOn(HIGH_LEVEL_API);

final StatsMode statsMode = showTimeLag ? StatsMode.TIMELAG : StatsMode.NORMAL;
return subscriptionService.getSubscriptionStat(subscriptionId, statsMode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.zalando.nakadi.metrics.MetricUtils.metricNameForSubscription;
import static org.zalando.nakadi.service.FeatureToggleService.Feature.HIGH_LEVEL_API;

@RestController
public class SubscriptionStreamController {
Expand Down Expand Up @@ -197,11 +196,6 @@ private StreamingResponseBody stream(final String subscriptionId,
return outputStream -> {
FlowIdUtils.push(flowId);

if (!featureToggleService.isFeatureEnabled(HIGH_LEVEL_API)) {
response.setStatus(HttpServletResponse.SC_NOT_IMPLEMENTED);
return;
}

final String metricName = metricNameForSubscription(subscriptionId, CONSUMERS_COUNT_METRIC_NAME);
final Counter consumerCounter = metricRegistry.counter(metricName);
consumerCounter.inc();
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/org/zalando/nakadi/domain/EventTypeBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
public class EventTypeBase {

private static final List<String> EMPTY_PARTITION_KEY_FIELDS = ImmutableList.of();
private static final List<String> EMPTY_ORDERING_KEY_FIELDS = ImmutableList.of();

@NotNull
@Pattern(regexp = "[a-zA-Z][-0-9a-zA-Z_]*(\\.[0-9a-zA-Z][-0-9a-zA-Z_]*)*", message = "format not allowed")
Expand All @@ -41,6 +42,9 @@ public class EventTypeBase {
@Nullable
private List<String> partitionKeyFields;

@Nullable
private List<String> orderingKeyFields;

@Valid
@NotNull
private EventTypeSchemaBase schema;
Expand Down Expand Up @@ -106,6 +110,7 @@ public EventTypeBase(final EventTypeBase eventType) {
this.setCompatibilityMode(eventType.getCompatibilityMode());
this.setAuthorization(eventType.getAuthorization());
this.setAudience(eventType.getAudience());
this.setOrderingKeyFields(eventType.getOrderingKeyFields());
}

public String getName() {
Expand Down Expand Up @@ -168,6 +173,14 @@ public void setPartitionKeyFields(final List<String> partitionKeyFields) {
this.partitionKeyFields = partitionKeyFields;
}

public List<String> getOrderingKeyFields() {
return unmodifiableList(orderingKeyFields != null ? orderingKeyFields : EMPTY_ORDERING_KEY_FIELDS);
}

public void setOrderingKeyFields(@Nullable final List<String> orderingKeyFields) {
this.orderingKeyFields = orderingKeyFields;
}

public List<EnrichmentStrategyDescriptor> getEnrichmentStrategies() {
return enrichmentStrategies;
}
Expand Down
Loading

0 comments on commit 734da2d

Please sign in to comment.