diff --git a/CHANGELOG.md b/CHANGELOG.md index 191b35e269..e951120028 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +### Added +- Extended event type's definition to support ordering_key_field attribute + +### Removed +- Removed high-level API feature flag + ### Changed - Added feature toggle to make it possible to remove event-types together with subscriptions diff --git a/docker-compose.yml b/docker-compose.yml index 6457154cac..d8efb8335e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,7 +16,6 @@ services: - NAKADI_FEATURES_DEFAULT_FEATURES_DISABLE_EVENT_TYPE_CREATION - NAKADI_FEATURES_DEFAULT_FEATURES_DISABLE_EVENT_TYPE_DELETION - NAKADI_FEATURES_DEFAULT_FEATURES_DISABLE_SUBSCRIPTION_CREATION - - NAKADI_FEATURES_DEFAULT_FEATURES_HIGH_LEVEL_API - NAKADI_FEATURES_DEFAULT_FEATURES_CHECK_PARTITIONS_KEYS - NAKADI_FEATURES_DEFAULT_FEATURES_CHECK_OWNING_APPLICATION - NAKADI_FEATURES_DEFAULT_FEATURES_LIMIT_CONSUMERS_NUMBER diff --git a/docs/_data/nakadi-event-bus-api.yaml b/docs/_data/nakadi-event-bus-api.yaml index 7a016b4328..fe77194d83 100644 --- a/docs/_data/nakadi-event-bus-api.yaml +++ b/docs/_data/nakadi-event-bus-api.yaml @@ -2468,6 +2468,35 @@ definitions: authorization: $ref: '#/definitions/EventTypeAuthorization' + ordering_key_fields: + type: array + description: | + This is only an informational field. The events are delivered to consumers in the order they were published. + No reordering is done by Nakadi. + + This field is useful in case the producer wants to communicate the complete order accross all the events + published to all partitions. This is the case when there is an incremental generator on the producer side, + for example. + + It differs from `partition_key_fields` in the sense that it's not used for partitioning (known as sharding in + some systems). The order indicated by `ordering_key_fields` can also differ from the order the events are in + each partition, in case of out-of-order submission. + + In most cases, this would have just a single item (the path of the field + by which this is to be ordered), but can have multiple items, in which case + those are considered as a compound key, with lexicographic ordering (first + item is most significant). + items: + type: string + description: | + Indicates a single ordering field. This is a dot separated string, which is applied + onto the whole event object, including the contained metadata and data (in + case of a data change event) objects. + + The field must be present in the schema. This field can be modified at any moment, but event type owners are + expected to notify consumer in advance about the change. + example: "data.incremental_counter" + audience: type: string x-extensible-enum: diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java index b5bde85225..ca7cbf3caf 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java @@ -4,6 +4,7 @@ import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.io.Resources; import com.jayway.restassured.RestAssured; import com.jayway.restassured.response.Header; @@ -90,9 +91,11 @@ public void userJourneyM1() throws InterruptedException, IOException { .body("owning_application", equalTo(owningApp)) .body("category", equalTo("undefined")) .body("audience", equalTo("external-public")) + .body("ordering_key_fields", equalTo(Lists.newArrayList("foo", "bar.baz"))) .body("schema.type", equalTo("json_schema")) - .body("schema.schema", equalTo("{\"type\": \"object\", \"properties\": " + - "{\"foo\": {\"type\": \"string\"}}, \"required\": [\"foo\"]}")); + .body("schema.schema", equalTo("{\"type\": \"object\", \"properties\": {\"foo\": " + + "{\"type\": \"string\"}, \"bar\": {\"type\": \"object\", \"properties\": " + + "{\"baz\": {\"type\": \"string\"}}}}, \"required\": [\"foo\"]}")); // list event types jsonRequestSpec() diff --git a/src/main/java/org/zalando/nakadi/controller/CursorsController.java b/src/main/java/org/zalando/nakadi/controller/CursorsController.java index 60392c5d45..2b2ddf9487 100644 --- a/src/main/java/org/zalando/nakadi/controller/CursorsController.java +++ b/src/main/java/org/zalando/nakadi/controller/CursorsController.java @@ -50,7 +50,6 @@ import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE; import static org.springframework.http.ResponseEntity.noContent; import static org.springframework.http.ResponseEntity.ok; -import static org.zalando.nakadi.service.FeatureToggleService.Feature.HIGH_LEVEL_API; import static org.zalando.problem.MoreStatus.UNPROCESSABLE_ENTITY; import static org.zalando.problem.spring.web.advice.Responses.create; @@ -77,7 +76,6 @@ public CursorsController(final CursorsService cursorsService, @RequestMapping(path = "/subscriptions/{subscriptionId}/cursors", method = RequestMethod.GET) public ItemsWrapper getCursors(@PathVariable("subscriptionId") final String subscriptionId) { - featureToggleService.checkFeatureOn(HIGH_LEVEL_API); try { final List cursors = cursorsService.getSubscriptionCursors(subscriptionId) .stream() @@ -99,37 +97,34 @@ public ResponseEntity commitCursors(@PathVariable("subscriptionId") final Str "COMMIT_CURSORS sid:" + subscriptionId + ", size=" + cursorsIn.getItems().size(), "isFeatureEnabled"); try { - featureToggleService.checkFeatureOn(HIGH_LEVEL_API); - - try { - TimeLogger.addMeasure("convertToNakadiCursors"); - final List cursors = convertToNakadiCursors(cursorsIn); - if (cursors.isEmpty()) { - throw new CursorsAreEmptyException(); - } - TimeLogger.addMeasure("callService"); - final List items = cursorsService.commitCursors(streamId, subscriptionId, cursors); - - TimeLogger.addMeasure("prepareResponse"); - final boolean allCommited = items.stream().allMatch(item -> item); - if (allCommited) { - return noContent().build(); - } else { - final List body = IntStream.range(0, cursorsIn.getItems().size()) - .mapToObj(idx -> new CursorCommitResult(cursorsIn.getItems().get(idx), items.get(idx))) - .collect(Collectors.toList()); - return ok(new ItemsWrapper<>(body)); - } - } catch (final NoSuchEventTypeException | InvalidCursorException e) { - return create(Problem.valueOf(UNPROCESSABLE_ENTITY, e.getMessage()), request); - } catch (final ServiceTemporarilyUnavailableException e) { - LOG.error("Failed to commit cursors", e); - return create(Problem.valueOf(SERVICE_UNAVAILABLE, e.getMessage()), request); - } catch (final NakadiException e) { - LOG.error("Failed to commit cursors", e); - return create(e.asProblem(), request); + TimeLogger.addMeasure("convertToNakadiCursors"); + final List cursors = convertToNakadiCursors(cursorsIn); + if (cursors.isEmpty()) { + throw new CursorsAreEmptyException(); } - } finally { + TimeLogger.addMeasure("callService"); + final List items = cursorsService.commitCursors(streamId, subscriptionId, cursors); + + TimeLogger.addMeasure("prepareResponse"); + final boolean allCommited = items.stream().allMatch(item -> item); + if (allCommited) { + return noContent().build(); + } else { + final List body = IntStream.range(0, cursorsIn.getItems().size()) + .mapToObj(idx -> new CursorCommitResult(cursorsIn.getItems().get(idx), items.get(idx))) + .collect(Collectors.toList()); + return ok(new ItemsWrapper<>(body)); + } + } catch (final NoSuchEventTypeException | InvalidCursorException e) { + return create(Problem.valueOf(UNPROCESSABLE_ENTITY, e.getMessage()), request); + } catch (final ServiceTemporarilyUnavailableException e) { + LOG.error("Failed to commit cursors", e); + return create(Problem.valueOf(SERVICE_UNAVAILABLE, e.getMessage()), request); + } catch (final NakadiException e) { + LOG.error("Failed to commit cursors", e); + return create(e.asProblem(), request); + } + finally { LOG.info(TimeLogger.finishMeasure()); } } @@ -139,7 +134,6 @@ public ResponseEntity resetCursors( @PathVariable("subscriptionId") final String subscriptionId, @Valid @RequestBody final ItemsWrapper cursors, final NativeWebRequest request) { - featureToggleService.checkFeatureOn(HIGH_LEVEL_API); try { cursorsService.resetCursors(subscriptionId, convertToNakadiCursors(cursors)); return noContent().build(); diff --git a/src/main/java/org/zalando/nakadi/controller/PartitionsController.java b/src/main/java/org/zalando/nakadi/controller/PartitionsController.java index b600fa4e1b..709ce97aed 100644 --- a/src/main/java/org/zalando/nakadi/controller/PartitionsController.java +++ b/src/main/java/org/zalando/nakadi/controller/PartitionsController.java @@ -15,15 +15,16 @@ import org.zalando.nakadi.domain.EventType; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.NakadiCursorLag; +import org.zalando.nakadi.domain.PartitionEndStatistics; import org.zalando.nakadi.domain.PartitionStatistics; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.InternalNakadiException; import org.zalando.nakadi.exceptions.InvalidCursorException; import org.zalando.nakadi.exceptions.NakadiException; import org.zalando.nakadi.exceptions.NoSuchEventTypeException; -import org.zalando.nakadi.exceptions.runtime.NotFoundException; import org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation; import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; +import org.zalando.nakadi.exceptions.runtime.NotFoundException; import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; import org.zalando.nakadi.repository.EventTypeRepository; import org.zalando.nakadi.service.AuthorizationValidator; @@ -99,7 +100,7 @@ public ResponseEntity listPartitions(@PathVariable("name") final String event eventTypeName, first.getPartition(), cursorConverter.convert(first.getFirst()).getOffset(), - cursorConverter.convert(last.getLast()).getOffset()); + cursorConverter.convert(selectLast(timelines, last, first)).getOffset()); }).collect(Collectors.toList()); return ok().body(result); } catch (final NoSuchEventTypeException e) { @@ -111,10 +112,11 @@ public ResponseEntity listPartitions(@PathVariable("name") final String event } @RequestMapping(value = "/event-types/{name}/partitions/{partition}", method = RequestMethod.GET) - public ResponseEntity getPartition(@PathVariable("name") final String eventTypeName, - @PathVariable("partition") final String partition, - @Nullable @RequestParam(value = "consumed_offset", required = false) - final String consumedOffset, final NativeWebRequest request) { + public ResponseEntity getPartition( + @PathVariable("name") final String eventTypeName, + @PathVariable("partition") final String partition, + @Nullable @RequestParam(value = "consumed_offset", required = false) final String consumedOffset, + final NativeWebRequest request) { LOG.trace("Get partition endpoint for event-type '{}', partition '{}' is called", eventTypeName, partition); try { final EventType eventType = eventTypeRepository.findByName(eventTypeName); @@ -172,19 +174,22 @@ private EventTypePartitionView getTopicPartition(final String eventTypeName, fin if (!firstStats.isPresent()) { throw new NotFoundException("partition not found"); } - final PartitionStatistics lastStats; + final NakadiCursor newest; if (timelines.size() == 1) { - lastStats = firstStats.get(); - } else { - lastStats = timelineService.getTopicRepository(timelines.get(timelines.size() - 1)) - .loadPartitionStatistics(timelines.get(timelines.size() - 1), partition).get(); + newest = firstStats.get().getLast(); + } else { + final PartitionStatistics lastStats = timelineService + .getTopicRepository(timelines.get(timelines.size() - 1)) + .loadPartitionStatistics(timelines.get(timelines.size() - 1), partition) + .get(); + newest = selectLast(timelines, lastStats, firstStats.get()); } return new EventTypePartitionView( eventTypeName, - lastStats.getPartition(), + partition, cursorConverter.convert(firstStats.get().getFirst()).getOffset(), - cursorConverter.convert(lastStats.getLast()).getOffset()); + cursorConverter.convert(newest).getOffset()); } private CursorLag toCursorLag(final NakadiCursorLag nakadiCursorLag) { @@ -195,4 +200,26 @@ private CursorLag toCursorLag(final NakadiCursorLag nakadiCursorLag) { nakadiCursorLag.getLag() ); } + + + private static NakadiCursor selectLast(final List activeTimelines, final PartitionEndStatistics last, + final PartitionStatistics first) { + final NakadiCursor lastInLastTimeline = last.getLast(); + if (!lastInLastTimeline.isInitial()) { + return lastInLastTimeline; + } + // There may be a situation, when there is no data in all the timelines after first, but any cursor from the + // next after first timelines is greater then the cursor in first timeline. Therefore we need to roll pointer + // to the end back till the very beginning or to the end of first timeline with data. + for (int idx = activeTimelines.size() - 2; idx > 0; --idx) { + final Timeline timeline = activeTimelines.get(idx); + final NakadiCursor lastInTimeline = timeline.getLatestPosition() + .toNakadiCursor(timeline, first.getPartition()); + if (!lastInTimeline.isInitial()) { + return lastInLastTimeline; + } + } + return first.getLast(); + } + } diff --git a/src/main/java/org/zalando/nakadi/controller/PostSubscriptionController.java b/src/main/java/org/zalando/nakadi/controller/PostSubscriptionController.java index 26571006b7..cf9ff44ccb 100644 --- a/src/main/java/org/zalando/nakadi/controller/PostSubscriptionController.java +++ b/src/main/java/org/zalando/nakadi/controller/PostSubscriptionController.java @@ -40,7 +40,6 @@ import static org.springframework.http.HttpStatus.OK; import static org.zalando.nakadi.service.FeatureToggleService.Feature.CHECK_OWNING_APPLICATION; import static org.zalando.nakadi.service.FeatureToggleService.Feature.DISABLE_SUBSCRIPTION_CREATION; -import static org.zalando.nakadi.service.FeatureToggleService.Feature.HIGH_LEVEL_API; @RestController @@ -66,8 +65,6 @@ public ResponseEntity createOrGetSubscription(@Valid @RequestBody final Subsc final Errors errors, final NativeWebRequest request, final Client client) { - featureToggleService.checkFeatureOn(HIGH_LEVEL_API); - if (errors.hasErrors()) { return Responses.create(new ValidationProblem(errors), request); } diff --git a/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java b/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java index c53c0e2320..3116a2029a 100644 --- a/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java +++ b/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java @@ -33,7 +33,6 @@ import static javax.ws.rs.core.Response.Status.NOT_IMPLEMENTED; import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE; -import static org.zalando.nakadi.service.FeatureToggleService.Feature.HIGH_LEVEL_API; import static org.zalando.problem.MoreStatus.UNPROCESSABLE_ENTITY; @@ -61,7 +60,6 @@ public ResponseEntity listSubscriptions( @RequestParam(value = "limit", required = false, defaultValue = "20") final int limit, @RequestParam(value = "offset", required = false, defaultValue = "0") final int offset, final NativeWebRequest request) { - featureToggleService.checkFeatureOn(HIGH_LEVEL_API); return WebResult.wrap( () -> subscriptionService.listSubscriptions(owningApplication, eventTypes, showStatus, limit, offset), @@ -71,16 +69,12 @@ public ResponseEntity listSubscriptions( @RequestMapping(value = "/{id}", method = RequestMethod.GET) public ResponseEntity getSubscription(@PathVariable("id") final String subscriptionId, final NativeWebRequest request) { - featureToggleService.checkFeatureOn(HIGH_LEVEL_API); - return WebResult.wrap(() -> subscriptionService.getSubscription(subscriptionId), request); } @RequestMapping(value = "/{id}", method = RequestMethod.DELETE) public ResponseEntity deleteSubscription(@PathVariable("id") final String subscriptionId, final NativeWebRequest request) { - featureToggleService.checkFeatureOn(HIGH_LEVEL_API); - return WebResult.wrap(() -> subscriptionService.deleteSubscription(subscriptionId), request, HttpStatus.NO_CONTENT); } @@ -90,8 +84,6 @@ public ItemsWrapper getSubscriptionStats( @PathVariable("id") final String subscriptionId, @RequestParam(value = "show_time_lag", required = false, defaultValue = "false") final boolean showTimeLag) throws NakadiException, InconsistentStateException, ServiceTemporarilyUnavailableException { - featureToggleService.checkFeatureOn(HIGH_LEVEL_API); - final StatsMode statsMode = showTimeLag ? StatsMode.TIMELAG : StatsMode.NORMAL; return subscriptionService.getSubscriptionStat(subscriptionId, statsMode); } diff --git a/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java b/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java index 9a13c81848..7abe9f8c62 100644 --- a/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java +++ b/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java @@ -52,7 +52,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.zalando.nakadi.metrics.MetricUtils.metricNameForSubscription; -import static org.zalando.nakadi.service.FeatureToggleService.Feature.HIGH_LEVEL_API; @RestController public class SubscriptionStreamController { @@ -197,11 +196,6 @@ private StreamingResponseBody stream(final String subscriptionId, return outputStream -> { FlowIdUtils.push(flowId); - if (!featureToggleService.isFeatureEnabled(HIGH_LEVEL_API)) { - response.setStatus(HttpServletResponse.SC_NOT_IMPLEMENTED); - return; - } - final String metricName = metricNameForSubscription(subscriptionId, CONSUMERS_COUNT_METRIC_NAME); final Counter consumerCounter = metricRegistry.counter(metricName); consumerCounter.inc(); diff --git a/src/main/java/org/zalando/nakadi/domain/EventTypeBase.java b/src/main/java/org/zalando/nakadi/domain/EventTypeBase.java index d2dd705585..df4862707f 100644 --- a/src/main/java/org/zalando/nakadi/domain/EventTypeBase.java +++ b/src/main/java/org/zalando/nakadi/domain/EventTypeBase.java @@ -18,6 +18,7 @@ public class EventTypeBase { private static final List EMPTY_PARTITION_KEY_FIELDS = ImmutableList.of(); + private static final List EMPTY_ORDERING_KEY_FIELDS = ImmutableList.of(); @NotNull @Pattern(regexp = "[a-zA-Z][-0-9a-zA-Z_]*(\\.[0-9a-zA-Z][-0-9a-zA-Z_]*)*", message = "format not allowed") @@ -41,6 +42,9 @@ public class EventTypeBase { @Nullable private List partitionKeyFields; + @Nullable + private List orderingKeyFields; + @Valid @NotNull private EventTypeSchemaBase schema; @@ -106,6 +110,7 @@ public EventTypeBase(final EventTypeBase eventType) { this.setCompatibilityMode(eventType.getCompatibilityMode()); this.setAuthorization(eventType.getAuthorization()); this.setAudience(eventType.getAudience()); + this.setOrderingKeyFields(eventType.getOrderingKeyFields()); } public String getName() { @@ -168,6 +173,14 @@ public void setPartitionKeyFields(final List partitionKeyFields) { this.partitionKeyFields = partitionKeyFields; } + public List getOrderingKeyFields() { + return unmodifiableList(orderingKeyFields != null ? orderingKeyFields : EMPTY_ORDERING_KEY_FIELDS); + } + + public void setOrderingKeyFields(@Nullable final List orderingKeyFields) { + this.orderingKeyFields = orderingKeyFields; + } + public List getEnrichmentStrategies() { return enrichmentStrategies; } diff --git a/src/main/java/org/zalando/nakadi/service/EventTypeService.java b/src/main/java/org/zalando/nakadi/service/EventTypeService.java index a6905f2f67..f113235929 100644 --- a/src/main/java/org/zalando/nakadi/service/EventTypeService.java +++ b/src/main/java/org/zalando/nakadi/service/EventTypeService.java @@ -486,6 +486,8 @@ private void validateSchema(final EventTypeBase eventType) throws InvalidEventTy validatePartitionKeys(schema, eventType); } + validateOrderingKeys(schema, eventType); + if (eventType.getCompatibilityMode() == CompatibilityMode.COMPATIBLE) { validateJsonSchemaConstraints(schemaAsJson); } @@ -516,6 +518,16 @@ private void validatePartitionKeys(final Schema schema, final EventTypeBase even } } + private void validateOrderingKeys(final Schema schema, final EventTypeBase eventType) + throws InvalidEventTypeException, JSONException, SchemaException { + final List absentFields = eventType.getOrderingKeyFields().stream() + .filter(field -> !schema.definesProperty(convertToJSONPointer(field))) + .collect(Collectors.toList()); + if (!absentFields.isEmpty()) { + throw new InvalidEventTypeException("ordering_key_fields " + absentFields + " absent in schema"); + } + } + private String convertToJSONPointer(final String value) { return value.replaceAll("\\.", "/"); } diff --git a/src/main/java/org/zalando/nakadi/service/FeatureToggleService.java b/src/main/java/org/zalando/nakadi/service/FeatureToggleService.java index 20039376e6..b6fb4afc59 100644 --- a/src/main/java/org/zalando/nakadi/service/FeatureToggleService.java +++ b/src/main/java/org/zalando/nakadi/service/FeatureToggleService.java @@ -33,7 +33,6 @@ enum Feature { DISABLE_EVENT_TYPE_DELETION("disable_event_type_deletion"), DELETE_EVENT_TYPE_WITH_SUBSCRIPTIONS("delete_event_type_with_subscriptions"), DISABLE_SUBSCRIPTION_CREATION("disable_subscription_creation"), - HIGH_LEVEL_API("high_level_api"), CHECK_PARTITIONS_KEYS("check_partitions_keys"), CHECK_OWNING_APPLICATION("check_owning_application"), LIMIT_CONSUMERS_NUMBER("limit_consumers_number"), diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7c4fb7dc86..0039ccf508 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -138,7 +138,6 @@ nakadi: DISABLE_EVENT_TYPE_CREATION: false DISABLE_EVENT_TYPE_DELETION: false DISABLE_SUBSCRIPTION_CREATION: false - HIGH_LEVEL_API: true CHECK_PARTITIONS_KEYS: true CHECK_OWNING_APPLICATION: false LIMIT_CONSUMERS_NUMBER: true @@ -167,7 +166,6 @@ nakadi.features.defaultFeatures: DISABLE_EVENT_TYPE_CREATION: false DISABLE_EVENT_TYPE_DELETION: false DISABLE_SUBSCRIPTION_CREATION: false - HIGH_LEVEL_API: true CHECK_PARTITIONS_KEYS: true CHECK_OWNING_APPLICATION: true LIMIT_CONSUMERS_NUMBER: true diff --git a/src/test/java/org/zalando/nakadi/controller/CursorsControllerTest.java b/src/test/java/org/zalando/nakadi/controller/CursorsControllerTest.java index aeb0600356..b489d245e9 100644 --- a/src/test/java/org/zalando/nakadi/controller/CursorsControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/CursorsControllerTest.java @@ -2,7 +2,6 @@ import com.google.common.collect.ImmutableList; import org.junit.Test; -import org.mockito.Mockito; import org.springframework.http.HttpStatus; import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.test.web.servlet.MockMvc; @@ -16,7 +15,6 @@ import org.zalando.nakadi.exceptions.InvalidCursorException; import org.zalando.nakadi.exceptions.NoSuchEventTypeException; import org.zalando.nakadi.exceptions.NoSuchSubscriptionException; -import org.zalando.nakadi.exceptions.runtime.FeatureNotAvailableException; import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; import org.zalando.nakadi.repository.EventTypeRepository; import org.zalando.nakadi.repository.db.SubscriptionDbRepository; @@ -48,7 +46,6 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import static org.springframework.test.web.servlet.setup.MockMvcBuilders.standaloneSetup; -import static org.zalando.nakadi.service.FeatureToggleService.Feature.HIGH_LEVEL_API; import static org.zalando.nakadi.utils.TestUtils.buildDefaultEventType; import static org.zalando.nakadi.utils.TestUtils.buildTimelineWithTopic; import static org.zalando.nakadi.utils.TestUtils.invalidProblem; @@ -175,13 +172,6 @@ public void whenBodyIsNotJsonThenBadRequest() throws Exception { .andExpect(status().is(HttpStatus.BAD_REQUEST.value())); } - @Test - public void whenGetAndNoFeatureThenNotImplemented() throws Exception { - Mockito.doThrow(new FeatureNotAvailableException("Not available", HIGH_LEVEL_API)) - .when(featureToggleService).checkFeatureOn(eq(HIGH_LEVEL_API)); - getCursors().andExpect(status().is(HttpStatus.NOT_IMPLEMENTED.value())); - } - @Test public void whenCommitCursorWithoutEventTypeThenUnprocessableEntity() throws Exception { checkForProblem( diff --git a/src/test/resources/sample-event-type-update.json b/src/test/resources/sample-event-type-update.json index db100120f0..8fc9cd636d 100644 --- a/src/test/resources/sample-event-type-update.json +++ b/src/test/resources/sample-event-type-update.json @@ -6,7 +6,7 @@ "schema": { "version": "1.0.0", "type": "json_schema", - "schema": "{\"type\": \"object\", \"properties\": {\"foo\": {\"type\": \"string\"}}, \"required\": [\"foo\"]}" + "schema": "{\"type\": \"object\", \"properties\": {\"foo\": {\"type\": \"string\"}, \"bar\": {\"type\": \"object\", \"properties\": {\"baz\": {\"type\": \"string\"}}}}, \"required\": [\"foo\"]}" }, "options" : { "retention_time": 86400000 diff --git a/src/test/resources/sample-event-type.json b/src/test/resources/sample-event-type.json index 30e38181d7..7c130817f6 100644 --- a/src/test/resources/sample-event-type.json +++ b/src/test/resources/sample-event-type.json @@ -5,7 +5,8 @@ "compatibility_mode": "compatible", "schema": { "type": "json_schema", - "schema": "{\"type\": \"object\", \"properties\": {\"foo\": {\"type\": \"string\"}}, \"required\": [\"foo\"]}" + "schema": "{\"type\": \"object\", \"properties\": {\"foo\": {\"type\": \"string\"}, \"bar\": {\"type\": \"object\", \"properties\": {\"baz\": {\"type\": \"string\"}}}}, \"required\": [\"foo\"]}" }, - "audience": "external-public" + "audience": "external-public", + "ordering_key_fields": ["foo", "bar.baz"] } \ No newline at end of file