diff --git a/CHANGELOG.md b/CHANGELOG.md index d35a25dde8..b757100797 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +## [2.8.2] - 2018-07-26 + +### Removed +- Removed Legacy Feature Toggles + ## [2.8.1] ### Added diff --git a/docker-compose.yml b/docker-compose.yml index e9bc82d7c6..d5bfc3d793 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,10 +16,6 @@ services: - NAKADI_FEATURES_DEFAULT_FEATURES_DISABLE_EVENT_TYPE_CREATION - NAKADI_FEATURES_DEFAULT_FEATURES_DISABLE_EVENT_TYPE_DELETION - NAKADI_FEATURES_DEFAULT_FEATURES_DISABLE_SUBSCRIPTION_CREATION - - NAKADI_FEATURES_DEFAULT_FEATURES_CHECK_PARTITIONS_KEYS - - NAKADI_FEATURES_DEFAULT_FEATURES_CHECK_OWNING_APPLICATION - - NAKADI_FEATURES_DEFAULT_FEATURES_LIMIT_CONSUMERS_NUMBER - - NAKADI_FEATURES_DEFAULT_FEATURES_SEND_BATCH_VIA_OUTPUT_STREAM - NAKADI_FEATURES_DEFAULT_FEATURES_REMOTE_TOKENINFO - NAKADI_FEATURES_DEFAULT_FEATURES_KPI_COLLECTION - NAKADI_FEATURES_DEFAULT_FEATURES_DISABLE_DB_WRITE_OPERATIONS diff --git a/src/acceptance-test/java/org/zalando/nakadi/service/job/ConsumerLimitingCleaningJobAT.java b/src/acceptance-test/java/org/zalando/nakadi/service/job/ConsumerLimitingCleaningJobAT.java deleted file mode 100644 index 2adf8f1398..0000000000 --- a/src/acceptance-test/java/org/zalando/nakadi/service/job/ConsumerLimitingCleaningJobAT.java +++ /dev/null @@ -1,73 +0,0 @@ -package org.zalando.nakadi.service.job; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.zookeeper.KeeperException; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; -import org.zalando.nakadi.repository.zookeeper.ZooKeeperLockFactory; -import org.zalando.nakadi.service.ConsumerLimitingService; -import org.zalando.nakadi.webservice.BaseAT; -import org.zalando.nakadi.webservice.utils.ZookeeperTestUtils; - -import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class ConsumerLimitingCleaningJobAT extends BaseAT { - - private static final CuratorFramework CURATOR = ZookeeperTestUtils.createCurator(ZOOKEEPER_URL); - - private ConsumerLimitingCleaningJob cleaningService; - - public ConsumerLimitingCleaningJobAT() { - final ZooKeeperHolder zkHolder = mock(ZooKeeperHolder.class); - when(zkHolder.get()).thenReturn(CURATOR); - final ZooKeeperLockFactory zkLockFactory = new ZooKeeperLockFactory(zkHolder); - final ConsumerLimitingService limitingService = new ConsumerLimitingService(zkHolder, zkLockFactory, 5); - - final JobWrapperFactory jobWrapperFactory = mock(JobWrapperFactory.class); - final ExclusiveJobWrapper jobWrapper = DummyJobWrapper.create(); - when(jobWrapperFactory.createExclusiveJobWrapper(any(), anyLong())).thenReturn(jobWrapper); - - cleaningService = new ConsumerLimitingCleaningJob(zkHolder, jobWrapperFactory, limitingService, 0); - } - - @Before - public void before() throws Exception { - deleteConsumersData(); - - // "hanging" node - CURATOR.create().creatingParentsIfNeeded().forPath("/nakadi/consumers/connections/hanging"); - - // "normal" node with child - CURATOR.create().creatingParentsIfNeeded().forPath("/nakadi/consumers/connections/normal/some_child"); - } - - @After - public void deleteConsumersData() throws Exception { - try { - CURATOR.delete().deletingChildrenIfNeeded().forPath("/nakadi/consumers"); - } catch (final KeeperException.NoNodeException e) { - // this is fine - } - } - - @Test - public void whenCleanThenOk() throws Exception { - cleaningService.cleanHangingNodes(); - - assertThat("the 'hanging' node should be deleted", - CURATOR.checkExists().forPath("/nakadi/consumers/connections/hanging"), - nullValue()); - assertThat("node with children nodes should not be deleted", - CURATOR.checkExists().forPath("/nakadi/consumers/connections/normal"), - not(nullValue())); - } - -} diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/ConsumerLimitingServiceAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/ConsumerLimitingServiceAT.java deleted file mode 100644 index 57f18fdb84..0000000000 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/ConsumerLimitingServiceAT.java +++ /dev/null @@ -1,112 +0,0 @@ -package org.zalando.nakadi.webservice; - -import com.google.common.collect.ImmutableList; -import org.apache.curator.framework.CuratorFramework; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; -import org.zalando.nakadi.exceptions.runtime.NoConnectionSlotsException; -import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; -import org.zalando.nakadi.repository.zookeeper.ZooKeeperLockFactory; -import org.zalando.nakadi.service.ConnectionSlot; -import org.zalando.nakadi.service.ConsumerLimitingService; -import org.zalando.nakadi.utils.TestUtils; -import org.zalando.nakadi.webservice.utils.ZookeeperTestUtils; - -import java.util.List; - -import static java.text.MessageFormat.format; -import static java.util.stream.IntStream.range; -import static org.hamcrest.CoreMatchers.hasItem; -import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.hasSize; -import static org.mockito.Mockito.when; -import static org.zalando.nakadi.utils.TestUtils.randomUUID; - -public class ConsumerLimitingServiceAT extends BaseAT { - - private static final CuratorFramework CURATOR = ZookeeperTestUtils.createCurator(ZOOKEEPER_URL); - - private ConsumerLimitingService limitingService; - private String eventType; - private String client; - - @Before - public void before() { - eventType = TestUtils.randomValidEventTypeName(); - client = TestUtils.randomTextString(); - - final ZooKeeperHolder zkHolder = Mockito.mock(ZooKeeperHolder.class); - when(zkHolder.get()).thenReturn(CURATOR); - final ZooKeeperLockFactory zkLockFactory = new ZooKeeperLockFactory(zkHolder); - limitingService = new ConsumerLimitingService(zkHolder, zkLockFactory, 5); - } - - @Test - public void whenAcquireConnectionSlotsThenDataInZk() throws Exception { - final ImmutableList partitions = ImmutableList.of("0", "1", "2", "3"); - - final List connectionSlots = limitingService.acquireConnectionSlots(client, eventType, - partitions); - - assertThat("4 connection slots were created in ZK", connectionSlots, hasSize(4)); - - for (final String partition : partitions) { - final String path = zkPathForConsumer(partition); - - assertThat("Node for partition should be created", - CURATOR.checkExists().forPath(path), - not(nullValue())); - - final List children = CURATOR.getChildren().forPath(path); - assertThat("Node for connection should be created", - children, - hasSize(1)); - - final ConnectionSlot expectedSlot = new ConnectionSlot(client, eventType, partition, children.get(0)); - assertThat(connectionSlots, hasItem(expectedSlot)); - } - } - - @Test(expected = NoConnectionSlotsException.class) - public void whenNoFreeSlotsThenException() throws Exception { - final String partition = "0"; - - range(0, 5).forEach(x -> { - try { - final String path = zkPathForConsumer(partition) + "/" + randomUUID(); - CURATOR.create().creatingParentsIfNeeded().forPath(path); - } catch (Exception e) { - throw new AssertionError("Error occurred when accessing Zookeeper"); - } - }); - - limitingService.acquireConnectionSlots(client, eventType, ImmutableList.of(partition)); - } - - @Test - public void whenReleaseSlotThatNodeDeletedInZk() throws Exception { - final String connectionId = randomUUID(); - final String partition = "0"; - - final String partitionPath = zkPathForConsumer(partition); - final String connectionPath = partitionPath + "/" + connectionId; - CURATOR.create().creatingParentsIfNeeded().forPath(connectionPath); - - final ImmutableList connectionSlots = - ImmutableList.of(new ConnectionSlot(client, eventType, partition, connectionId)); - - limitingService.releaseConnectionSlots(connectionSlots); - - assertThat("partition and connection Zk nodes should be deleted", - CURATOR.checkExists().forPath(partitionPath), - nullValue()); - } - - private String zkPathForConsumer(final String partition) { - return format("/nakadi/consumers/connections/{0}|{1}|{2}", client, eventType, partition); - } - -} diff --git a/src/main/java/org/zalando/nakadi/controller/EventStreamController.java b/src/main/java/org/zalando/nakadi/controller/EventStreamController.java index c6e15b47a1..159b13cb09 100644 --- a/src/main/java/org/zalando/nakadi/controller/EventStreamController.java +++ b/src/main/java/org/zalando/nakadi/controller/EventStreamController.java @@ -6,7 +6,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -42,14 +41,11 @@ import org.zalando.nakadi.service.AuthorizationValidator; import org.zalando.nakadi.service.BlacklistService; import org.zalando.nakadi.service.ClosedConnectionsCrutch; -import org.zalando.nakadi.service.ConnectionSlot; -import org.zalando.nakadi.service.ConsumerLimitingService; import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.EventStream; import org.zalando.nakadi.service.EventStreamConfig; import org.zalando.nakadi.service.EventStreamFactory; import org.zalando.nakadi.service.EventTypeChangeListener; -import org.zalando.nakadi.service.FeatureToggleService; import org.zalando.nakadi.service.timeline.TimelineService; import org.zalando.nakadi.util.FlowIdUtils; import org.zalando.nakadi.view.Cursor; @@ -78,7 +74,6 @@ import static javax.ws.rs.core.Response.Status.PRECONDITION_FAILED; import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE; import static org.zalando.nakadi.metrics.MetricUtils.metricNameFor; -import static org.zalando.nakadi.service.FeatureToggleService.Feature.LIMIT_CONSUMERS_NUMBER; @RestController public class EventStreamController { @@ -93,8 +88,6 @@ public class EventStreamController { private final MetricRegistry metricRegistry; private final ClosedConnectionsCrutch closedConnectionsCrutch; private final BlacklistService blacklistService; - private final ConsumerLimitingService consumerLimitingService; - private final FeatureToggleService featureToggleService; private final CursorConverter cursorConverter; private final MetricRegistry streamMetrics; private final AuthorizationValidator authorizationValidator; @@ -110,8 +103,6 @@ public EventStreamController(final EventTypeRepository eventTypeRepository, @Qualifier("streamMetricsRegistry") final MetricRegistry streamMetrics, final ClosedConnectionsCrutch closedConnectionsCrutch, final BlacklistService blacklistService, - final ConsumerLimitingService consumerLimitingService, - final FeatureToggleService featureToggleService, final CursorConverter cursorConverter, final AuthorizationValidator authorizationValidator, final EventTypeChangeListener eventTypeChangeListener, @@ -124,8 +115,6 @@ public EventStreamController(final EventTypeRepository eventTypeRepository, this.streamMetrics = streamMetrics; this.closedConnectionsCrutch = closedConnectionsCrutch; this.blacklistService = blacklistService; - this.consumerLimitingService = consumerLimitingService; - this.featureToggleService = featureToggleService; this.cursorConverter = cursorConverter; this.authorizationValidator = authorizationValidator; this.eventTypeChangeListener = eventTypeChangeListener; @@ -219,7 +208,6 @@ public StreamingResponseBody streamEvents( final AtomicBoolean connectionReady = closedConnectionsCrutch.listenForConnectionClose(request); Counter consumerCounter = null; EventStream eventStream = null; - List connectionSlots = ImmutableList.of(); final AtomicBoolean needCheckAuthorization = new AtomicBoolean(false); LOG.info("[X-NAKADI-CURSORS] \"{}\" {}", eventTypeName, Optional.ofNullable(cursorsStr).orElse("-")); @@ -243,15 +231,6 @@ public StreamingResponseBody streamEvents( .withMaxMemoryUsageBytes(maxMemoryUsageBytes) .build(); - // acquire connection slots to limit the number of simultaneous connections from one client - if (featureToggleService.isFeatureEnabled(LIMIT_CONSUMERS_NUMBER)) { - final List partitions = streamConfig.getCursors().stream() - .map(NakadiCursor::getPartition) - .collect(Collectors.toList()); - connectionSlots = consumerLimitingService.acquireConnectionSlots( - client.getClientId(), eventTypeName, partitions); - } - consumerCounter = metricRegistry.counter(metricNameFor(eventTypeName, CONSUMERS_COUNT_METRIC_NAME)); consumerCounter.inc(); @@ -305,7 +284,6 @@ public StreamingResponseBody streamEvents( writeProblemResponse(response, outputStream, INTERNAL_SERVER_ERROR, e.getMessage()); } finally { connectionReady.set(false); - consumerLimitingService.releaseConnectionSlots(connectionSlots); if (consumerCounter != null) { consumerCounter.dec(); } diff --git a/src/main/java/org/zalando/nakadi/controller/EventTypeController.java b/src/main/java/org/zalando/nakadi/controller/EventTypeController.java index 7768e808f9..e0b825fa60 100644 --- a/src/main/java/org/zalando/nakadi/controller/EventTypeController.java +++ b/src/main/java/org/zalando/nakadi/controller/EventTypeController.java @@ -50,7 +50,6 @@ import java.util.List; import static org.springframework.http.ResponseEntity.status; -import static org.zalando.nakadi.service.FeatureToggleService.Feature.CHECK_OWNING_APPLICATION; import static org.zalando.nakadi.service.FeatureToggleService.Feature.DISABLE_EVENT_TYPE_CREATION; import static org.zalando.nakadi.service.FeatureToggleService.Feature.DISABLE_EVENT_TYPE_DELETION; @@ -96,12 +95,6 @@ public ResponseEntity create(@Valid @RequestBody final EventTypeBase eventTyp return new ResponseEntity<>(HttpStatus.NOT_IMPLEMENTED); } - if (featureToggleService.isFeatureEnabled(CHECK_OWNING_APPLICATION) - && !applicationService.exists(eventType.getOwningApplication())) { - return Responses.create(Problem.valueOf(MoreStatus.UNPROCESSABLE_ENTITY, - "owning_application doesn't exist"), request); - } - if (errors.hasErrors()) { return Responses.create(new ValidationProblem(errors), request); } diff --git a/src/main/java/org/zalando/nakadi/controller/PostSubscriptionController.java b/src/main/java/org/zalando/nakadi/controller/PostSubscriptionController.java index c106062948..b1e9ceff1d 100644 --- a/src/main/java/org/zalando/nakadi/controller/PostSubscriptionController.java +++ b/src/main/java/org/zalando/nakadi/controller/PostSubscriptionController.java @@ -36,7 +36,6 @@ import javax.ws.rs.core.Response; import static org.springframework.http.HttpStatus.OK; -import static org.zalando.nakadi.service.FeatureToggleService.Feature.CHECK_OWNING_APPLICATION; import static org.zalando.nakadi.service.FeatureToggleService.Feature.DISABLE_SUBSCRIPTION_CREATION; @@ -67,12 +66,6 @@ public ResponseEntity createOrGetSubscription(@Valid @RequestBody final Subsc return Responses.create(new ValidationProblem(errors), request); } - if (featureToggleService.isFeatureEnabled(CHECK_OWNING_APPLICATION) - && !applicationService.exists(subscriptionBase.getOwningApplication())) { - return Responses.create(Problem.valueOf(MoreStatus.UNPROCESSABLE_ENTITY, - "owning_application doesn't exist"), request); - } - try { return ok(subscriptionService.getExistingSubscription(subscriptionBase)); } catch (final NoSubscriptionException e) { diff --git a/src/main/java/org/zalando/nakadi/service/ConsumerLimitingService.java b/src/main/java/org/zalando/nakadi/service/ConsumerLimitingService.java deleted file mode 100644 index 0cdfd56be2..0000000000 --- a/src/main/java/org/zalando/nakadi/service/ConsumerLimitingService.java +++ /dev/null @@ -1,209 +0,0 @@ -package org.zalando.nakadi.service; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.commons.lang3.StringUtils; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.utils.ZKPaths; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; -import org.zalando.nakadi.exceptions.NakadiRuntimeException; -import org.zalando.nakadi.exceptions.runtime.NoConnectionSlotsException; -import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; -import org.zalando.nakadi.repository.zookeeper.ZkChildrenCache; -import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; -import org.zalando.nakadi.repository.zookeeper.ZooKeeperLockFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.ConcurrentMap; - -import static java.text.MessageFormat.format; -import static java.util.stream.Collectors.toList; -import static org.zalando.nakadi.repository.zookeeper.ZookeeperUtils.runLocked; - -@Component -public class ConsumerLimitingService { - - public static final String CONNECTIONS_ZK_PATH = "/nakadi/consumers/connections"; - public static final String LOCKS_ZK_PATH = "/nakadi/consumers/locks"; - - private static final Logger LOG = LoggerFactory.getLogger(ConsumerLimitingService.class); - - private static final String ERROR_MSG = "You exceeded the maximum number of simultaneous connections to a single " + - "partition for event type: {0}, partition(s): {1}; max limit is {2} connections per client"; - - private final ZooKeeperHolder zkHolder; - private final ZooKeeperLockFactory zkLockFactory; - private final int maxConnections; - - private static final ConcurrentMap SLOTS_CACHES = Maps.newConcurrentMap(); - private static final List ACQUIRED_SLOTS = Collections.synchronizedList(Lists.newArrayList()); - - @Autowired - public ConsumerLimitingService(final ZooKeeperHolder zkHolder, - final ZooKeeperLockFactory zkLockFactory, - @Value("${nakadi.stream.maxConnections}") final int maxConnections) { - this.zkHolder = zkHolder; - this.zkLockFactory = zkLockFactory; - this.maxConnections = maxConnections; - } - - @SuppressWarnings("unchecked") - public List acquireConnectionSlots(final String client, final String eventType, - final List partitions) - throws NoConnectionSlotsException, ServiceTemporarilyUnavailableException { - - final List partitionsWithNoFreeSlots = getPartitionsWithNoFreeSlots(client, eventType, partitions); - if (partitionsWithNoFreeSlots.size() == 0) { - - final List slots = new ArrayList<>(); - final String lockZkPath = ZKPaths.makePath(LOCKS_ZK_PATH, client + "|" + eventType); - try { - return runLocked(() -> { - // we need to check it again when we are under lock - final List occupiedPartitions = getPartitionsWithNoFreeSlots(client, eventType, partitions); - if (occupiedPartitions.size() > 0) { - throw generateNoConnectionSlotsException(eventType, occupiedPartitions, client); - } - - for (final String partition : partitions) { - final ConnectionSlot connectionSlot = acquireConnectionSlot(client, eventType, partition); - slots.add(connectionSlot); - } - return slots; - }, zkLockFactory.createLock(lockZkPath)); - } catch (final NoConnectionSlotsException e) { - throw e; - } catch (final Exception e) { - // in a case of failure release slots for partitions that already acquired slots - slots.forEach(this::releaseConnectionSlot); - throw new ServiceTemporarilyUnavailableException("Error communicating with zookeeper", e); - } - } else { - throw generateNoConnectionSlotsException(eventType, partitionsWithNoFreeSlots, client); - } - } - - private List getPartitionsWithNoFreeSlots(final String client, final String eventType, - final List partitions) { - return partitions.stream() - .filter(partition -> { - final String zkPath = zkPathForConsumer(client, eventType, partition); - final List slotsOccupied = getChildrenCached(zkPath); - return slotsOccupied.size() >= maxConnections; - }) - .collect(toList()); - } - - private NoConnectionSlotsException generateNoConnectionSlotsException(final String eventType, - final List overBookedPartitions, - final String client) { - final String partitionsStr = StringUtils.join(overBookedPartitions, ","); - final String msg = format(ERROR_MSG, eventType, partitionsStr, maxConnections); - LOG.debug("Limit exceeded for connection count for client: {}, event type: {}, partition(s): {}", - client, eventType, partitionsStr); - return new NoConnectionSlotsException(msg); - } - - public void releaseConnectionSlots(final List connectionSlots) { - connectionSlots.forEach(this::releaseConnectionSlot); - } - - private void releaseConnectionSlot(final ConnectionSlot slot) { - final String consumerNode = zkNodeNameForConsumer(slot.getClient(), slot.getEventType(), slot.getPartition()); - final String connectionNodePath = ZKPaths.makePath(CONNECTIONS_ZK_PATH, consumerNode, slot.getConnectionId()); - try { - zkHolder.get() - .delete() - .guaranteed() - .forPath(connectionNodePath); - deletePartitionNodeIfPossible(consumerNode); - } catch (final Exception e) { - LOG.error("Zookeeper error when deleting consumer connection node", e); - } - - ACQUIRED_SLOTS.remove(slot); - try { - deleteCacheIfPossible(slot); - } catch (final Exception e) { - LOG.error("Zookeeper error when deleting consumer connections cache", e); - } - } - - private void deleteCacheIfPossible(final ConnectionSlot slot) throws IOException { - final boolean hasMoreConnectionsToPartition = ACQUIRED_SLOTS.stream() - .anyMatch(s -> s.getPartition().equals(slot.getPartition()) - && s.getClient().equals(slot.getClient()) - && s.getEventType().equals(slot.getEventType())); - if (!hasMoreConnectionsToPartition) { - final String consumerPath = zkPathForConsumer(slot.getClient(), slot.getEventType(), slot.getPartition()); - final PathChildrenCache cache = SLOTS_CACHES.remove(consumerPath); - if (cache != null) { - cache.close(); - } - } - } - - public void deletePartitionNodeIfPossible(final String nodeName) { - try { - zkHolder.get() - .delete() - .forPath(ZKPaths.makePath(CONNECTIONS_ZK_PATH, nodeName)); - } catch (final KeeperException.NotEmptyException | KeeperException.NoNodeException e) { - // if the node has children - we should not delete it - // if the node doesn't exist - good, other thread/instance already deleted it - } catch (final Exception e) { - LOG.error("Zookeeper error when trying delete consumer node", e); - } - } - - private ConnectionSlot acquireConnectionSlot(final String client, final String eventType, - final String partition) { - - final String parent = zkPathForConsumer(client, eventType, partition); - final String slotId = UUID.randomUUID().toString(); - final String zkPath = ZKPaths.makePath(parent, slotId); - try { - zkHolder.get() - .create() - .creatingParentsIfNeeded() - .withMode(CreateMode.EPHEMERAL) - .forPath(zkPath); - } catch (Exception e) { - LOG.error("Zookeeper error when creating consumer node", e); - throw new NakadiRuntimeException(e); - } - final ConnectionSlot acquiredSlot = new ConnectionSlot(client, eventType, partition, slotId); - ACQUIRED_SLOTS.add(acquiredSlot); - return acquiredSlot; - } - - private List getChildrenCached(final String zkPath) { - final PathChildrenCache cache = SLOTS_CACHES.computeIfAbsent(zkPath, - key -> ZkChildrenCache.createCache(zkHolder.get(), key)); - return cache.getCurrentData().stream() - .map(childData -> { - final String[] pathParts = childData.getPath().split("/"); - return pathParts[pathParts.length - 1]; - }) - .collect(toList()); - } - - private String zkPathForConsumer(final String client, final String eventType, final String partition) { - return ZKPaths.makePath(CONNECTIONS_ZK_PATH, zkNodeNameForConsumer(client, eventType, partition)); - } - - private String zkNodeNameForConsumer(final String client, final String eventType, final String partition) { - return format("{0}|{1}|{2}", client, eventType, partition); - } - -} diff --git a/src/main/java/org/zalando/nakadi/service/EventStream.java b/src/main/java/org/zalando/nakadi/service/EventStream.java index b3edc30164..5819c11603 100644 --- a/src/main/java/org/zalando/nakadi/service/EventStream.java +++ b/src/main/java/org/zalando/nakadi/service/EventStream.java @@ -36,7 +36,7 @@ public class EventStream { private final BlacklistService blacklistService; private final CursorConverter cursorConverter; private final Meter bytesFlushedMeter; - private final EventStreamWriterProvider writer; + private final EventStreamWriter eventStreamWriter; private final StreamKpiData kpiData; private final String kpiDataStreamedEventType; private final long kpiFrequencyMs; @@ -47,7 +47,7 @@ public EventStream(final EventConsumer eventConsumer, final EventStreamConfig config, final BlacklistService blacklistService, final CursorConverter cursorConverter, final Meter bytesFlushedMeter, - final EventStreamWriterProvider writer, + final EventStreamWriter eventStreamWriter, final NakadiKpiPublisher kpiPublisher, final String kpiDataStreamedEventType, final long kpiFrequencyMs) { this.eventConsumer = eventConsumer; @@ -56,7 +56,7 @@ public EventStream(final EventConsumer eventConsumer, this.blacklistService = blacklistService; this.cursorConverter = cursorConverter; this.bytesFlushedMeter = bytesFlushedMeter; - this.writer = writer; + this.eventStreamWriter = eventStreamWriter; this.kpiPublisher = kpiPublisher; this.kpiData = new StreamKpiData(); this.kpiDataStreamedEventType = kpiDataStreamedEventType; @@ -224,7 +224,7 @@ private Map createMapWithPartitionKeys(final Function private void sendBatch(final NakadiCursor topicPosition, final List currentBatch) throws IOException { - final int bytesWritten = writer.getWriter() + final int bytesWritten = eventStreamWriter .writeBatch(outputStream, cursorConverter.convert(topicPosition), currentBatch); bytesFlushedMeter.mark(bytesWritten); kpiData.addBytesSent(bytesWritten); diff --git a/src/main/java/org/zalando/nakadi/service/EventStreamFactory.java b/src/main/java/org/zalando/nakadi/service/EventStreamFactory.java index aa65ea0190..9dee599d65 100644 --- a/src/main/java/org/zalando/nakadi/service/EventStreamFactory.java +++ b/src/main/java/org/zalando/nakadi/service/EventStreamFactory.java @@ -14,7 +14,7 @@ public class EventStreamFactory { private final CursorConverter cursorConverter; - private final EventStreamWriterProvider writerProvider; + private final EventStreamWriter eventStreamWriter; private final BlacklistService blacklistService; private final NakadiKpiPublisher nakadiKpiPublisher; private final String kpiDataStreamedEventType; @@ -23,13 +23,13 @@ public class EventStreamFactory { @Autowired public EventStreamFactory( final CursorConverter cursorConverter, - final EventStreamWriterProvider writerProvider, + final EventStreamWriter eventStreamWriter, final BlacklistService blacklistService, final NakadiKpiPublisher nakadiKpiPublisher, @Value("${nakadi.kpi.event-types.nakadiDataStreamed}") final String kpiDataStreamedEventType, @Value("${nakadi.kpi.config.stream-data-collection-frequency-ms}") final long kpiFrequencyMs) { this.cursorConverter = cursorConverter; - this.writerProvider = writerProvider; + this.eventStreamWriter = eventStreamWriter; this.blacklistService = blacklistService; this.nakadiKpiPublisher = nakadiKpiPublisher; this.kpiDataStreamedEventType = kpiDataStreamedEventType; @@ -46,7 +46,7 @@ public EventStream createEventStream(final OutputStream outputStream, final Even blacklistService, cursorConverter, bytesFlushedMeter, - writerProvider, + eventStreamWriter, nakadiKpiPublisher, kpiDataStreamedEventType, kpiFrequencyMs); diff --git a/src/main/java/org/zalando/nakadi/service/EventStreamWriterBinary.java b/src/main/java/org/zalando/nakadi/service/EventStreamWriterBinary.java index adce181e9e..1046a2c3e9 100644 --- a/src/main/java/org/zalando/nakadi/service/EventStreamWriterBinary.java +++ b/src/main/java/org/zalando/nakadi/service/EventStreamWriterBinary.java @@ -1,6 +1,5 @@ package org.zalando.nakadi.service; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import org.zalando.nakadi.domain.ConsumedEvent; import org.zalando.nakadi.view.Cursor; @@ -14,7 +13,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; @Component -@Qualifier("binary") class EventStreamWriterBinary implements EventStreamWriter { private static final byte[] B_BATCH_SEPARATOR = BATCH_SEPARATOR.getBytes(UTF_8); diff --git a/src/main/java/org/zalando/nakadi/service/EventStreamWriterProvider.java b/src/main/java/org/zalando/nakadi/service/EventStreamWriterProvider.java deleted file mode 100644 index c18df9f966..0000000000 --- a/src/main/java/org/zalando/nakadi/service/EventStreamWriterProvider.java +++ /dev/null @@ -1,30 +0,0 @@ -package org.zalando.nakadi.service; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.stereotype.Component; - -@Component -public class EventStreamWriterProvider { - private final FeatureToggleService featureToggleService; - private final EventStreamWriter binaryWriter; - private final EventStreamWriter stringWriter; - - @Autowired - public EventStreamWriterProvider( - final FeatureToggleService featureToggleService, - @Qualifier("binary") final EventStreamWriter binaryWriter, - @Qualifier("string") final EventStreamWriter stringWriter) { - this.featureToggleService = featureToggleService; - this.binaryWriter = binaryWriter; - this.stringWriter = stringWriter; - } - - public EventStreamWriter getWriter() { - if (featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.SEND_BATCH_VIA_OUTPUT_STREAM)) { - return binaryWriter; - } else { - return stringWriter; - } - } -} diff --git a/src/main/java/org/zalando/nakadi/service/EventStreamWriterString.java b/src/main/java/org/zalando/nakadi/service/EventStreamWriterString.java deleted file mode 100644 index 87e710f859..0000000000 --- a/src/main/java/org/zalando/nakadi/service/EventStreamWriterString.java +++ /dev/null @@ -1,70 +0,0 @@ -package org.zalando.nakadi.service; - -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.stereotype.Component; -import org.zalando.nakadi.domain.ConsumedEvent; -import org.zalando.nakadi.view.Cursor; -import org.zalando.nakadi.view.SubscriptionCursor; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.List; -import java.util.Optional; - -import static com.google.common.base.Charsets.UTF_8; - -@Component -@Qualifier("string") -public class EventStreamWriterString implements EventStreamWriter { - - @Override - public int writeSubscriptionBatch(final OutputStream os, final SubscriptionCursor cursor, - final List events, - final Optional metadata) throws IOException { - final StringBuilder builder = new StringBuilder() - .append("{\"cursor\":{\"partition\":\"").append(cursor.getPartition()) - .append("\",\"offset\":\"").append(cursor.getOffset()) - .append("\",\"event_type\":\"").append(cursor.getEventType()) - .append("\",\"cursor_token\":\"").append(cursor.getCursorToken()) - .append("\"}"); - if (!events.isEmpty()) { - builder.append(",\"events\":["); - events.forEach(event -> builder.append(new String(event.getEvent())).append(",")); - builder.deleteCharAt(builder.length() - 1).append("]"); - } - metadata.ifPresent(s -> builder.append(",\"info\":{\"debug\":\"").append(s).append("\"}")); - builder.append("}").append(BATCH_SEPARATOR); - - final String eventsString = builder.toString(); - - final byte[] batchBytes = eventsString.getBytes(UTF_8); - os.write(batchBytes); - - os.flush(); - - return batchBytes.length; - } - - @Override - public int writeBatch(final OutputStream os, final Cursor cursor, final List events) throws IOException { - final StringBuilder builder = new StringBuilder() - .append("{\"cursor\":{\"partition\":\"").append(cursor.getPartition()) - .append("\",\"offset\":\"").append(cursor.getOffset()).append("\"}"); - if (!events.isEmpty()) { - builder.append(",\"events\":["); - events.forEach(event -> builder.append(new String(event)).append(",")); - builder.deleteCharAt(builder.length() - 1).append("]"); - } - - builder.append("}").append(BATCH_SEPARATOR); - - final String eventsString = builder.toString(); - - final byte[] batchBytes = eventsString.getBytes(UTF_8); - os.write(batchBytes); - - os.flush(); - - return batchBytes.length; - } -} diff --git a/src/main/java/org/zalando/nakadi/service/EventTypeService.java b/src/main/java/org/zalando/nakadi/service/EventTypeService.java index dda30c2c48..0b3ab246d1 100644 --- a/src/main/java/org/zalando/nakadi/service/EventTypeService.java +++ b/src/main/java/org/zalando/nakadi/service/EventTypeService.java @@ -71,7 +71,6 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import static org.zalando.nakadi.service.FeatureToggleService.Feature.CHECK_PARTITIONS_KEYS; import static org.zalando.nakadi.service.FeatureToggleService.Feature.DELETE_EVENT_TYPE_WITH_SUBSCRIPTIONS; @Component @@ -506,10 +505,7 @@ private void validateSchema(final EventTypeBase eventType) throws InvalidEventTy throw new InvalidEventTypeException("\"metadata\" property is reserved"); } - if (featureToggleService.isFeatureEnabled(CHECK_PARTITIONS_KEYS)) { - validatePartitionKeys(schema, eventType); - } - + validatePartitionKeys(schema, eventType); validateOrderingKeys(schema, eventType); if (eventType.getCompatibilityMode() == CompatibilityMode.COMPATIBLE) { diff --git a/src/main/java/org/zalando/nakadi/service/FeatureToggleService.java b/src/main/java/org/zalando/nakadi/service/FeatureToggleService.java index 22a85d2a08..dd124c1e43 100644 --- a/src/main/java/org/zalando/nakadi/service/FeatureToggleService.java +++ b/src/main/java/org/zalando/nakadi/service/FeatureToggleService.java @@ -33,10 +33,6 @@ enum Feature { DISABLE_EVENT_TYPE_DELETION("disable_event_type_deletion"), DELETE_EVENT_TYPE_WITH_SUBSCRIPTIONS("delete_event_type_with_subscriptions"), DISABLE_SUBSCRIPTION_CREATION("disable_subscription_creation"), - CHECK_PARTITIONS_KEYS("check_partitions_keys"), - CHECK_OWNING_APPLICATION("check_owning_application"), - LIMIT_CONSUMERS_NUMBER("limit_consumers_number"), - SEND_BATCH_VIA_OUTPUT_STREAM("send_batch_via_output_stream"), REMOTE_TOKENINFO("remote_tokeninfo"), KPI_COLLECTION("kpi_collection"), DISABLE_DB_WRITE_OPERATIONS("disable_db_write_operations"), diff --git a/src/main/java/org/zalando/nakadi/service/job/ConsumerLimitingCleaningJob.java b/src/main/java/org/zalando/nakadi/service/job/ConsumerLimitingCleaningJob.java deleted file mode 100644 index 2fbe837b08..0000000000 --- a/src/main/java/org/zalando/nakadi/service/job/ConsumerLimitingCleaningJob.java +++ /dev/null @@ -1,58 +0,0 @@ -package org.zalando.nakadi.service.job; - -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Service; -import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; -import org.zalando.nakadi.service.ConsumerLimitingService; - -import static org.zalando.nakadi.service.ConsumerLimitingService.CONNECTIONS_ZK_PATH; - -@Service -public class ConsumerLimitingCleaningJob { - - public static final String JOB_NAME = "consumer-nodes-cleanup"; - - private static final Logger LOG = LoggerFactory.getLogger(ConsumerLimitingCleaningJob.class); - - private final ZooKeeperHolder zkHolder; - private final ConsumerLimitingService limitingService; - private final ExclusiveJobWrapper jobWrapper; - - @Autowired - public ConsumerLimitingCleaningJob(final ZooKeeperHolder zkHolder, - final JobWrapperFactory jobWrapperFactory, - final ConsumerLimitingService limitingService, - @Value("${nakadi.jobs.consumerNodesCleanup.runPeriodMs}") final int periodMs) { - this.zkHolder = zkHolder; - this.limitingService = limitingService; - jobWrapper = jobWrapperFactory.createExclusiveJobWrapper(JOB_NAME, periodMs); - } - - @Scheduled( - fixedDelayString = "${nakadi.jobs.checkRunMs}", - initialDelayString = "${random.int(${nakadi.jobs.checkRunMs})}") - public void cleanHangingNodes() { - jobWrapper.runJobLocked(this::cleanHangingConsumersNodes); - } - - private void cleanHangingConsumersNodes() { - // try to remove every consumer node; - // the nodes that have children will fail to be removed - try { - zkHolder.get() - .getChildren() - .forPath(CONNECTIONS_ZK_PATH) - .forEach(limitingService::deletePartitionNodeIfPossible); - } catch (final KeeperException.NoNodeException e) { - LOG.debug("ZK node for connections doesn't exist"); - } catch (final Exception e) { - LOG.error("ZK error when cleaning consumer nodes"); - } - } - -} diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java index e6249fcea3..cafd3d2790 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java @@ -14,7 +14,7 @@ import org.zalando.nakadi.service.BlacklistService; import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.CursorTokenService; -import org.zalando.nakadi.service.EventStreamWriterProvider; +import org.zalando.nakadi.service.EventStreamWriter; import org.zalando.nakadi.service.EventTypeChangeListener; import org.zalando.nakadi.service.NakadiCursorComparator; import org.zalando.nakadi.service.NakadiKpiPublisher; @@ -37,7 +37,7 @@ public class SubscriptionStreamerFactory { private final CursorConverter cursorConverter; private final MetricRegistry metricRegistry; private final SubscriptionClientFactory zkClientFactory; - private final EventStreamWriterProvider eventStreamWriterProvider; + private final EventStreamWriter eventStreamWriter; private final AuthorizationValidator authorizationValidator; private final EventTypeChangeListener eventTypeChangeListener; private final EventTypeCache eventTypeCache; @@ -54,7 +54,7 @@ public SubscriptionStreamerFactory( final CursorConverter cursorConverter, @Qualifier("streamMetricsRegistry") final MetricRegistry metricRegistry, final SubscriptionClientFactory zkClientFactory, - final EventStreamWriterProvider eventStreamWriterProvider, + final EventStreamWriter eventStreamWriter, final AuthorizationValidator authorizationValidator, final EventTypeChangeListener eventTypeChangeListener, final EventTypeCache eventTypeCache, @@ -68,7 +68,7 @@ public SubscriptionStreamerFactory( this.cursorConverter = cursorConverter; this.metricRegistry = metricRegistry; this.zkClientFactory = zkClientFactory; - this.eventStreamWriterProvider = eventStreamWriterProvider; + this.eventStreamWriter = eventStreamWriter; this.authorizationValidator = authorizationValidator; this.eventTypeChangeListener = eventTypeChangeListener; this.eventTypeCache = eventTypeCache; @@ -106,7 +106,7 @@ public SubscriptionStreamer build( .setSubscription(subscription) .setMetricRegistry(metricRegistry) .setTimelineService(timelineService) - .setWriter(eventStreamWriterProvider.getWriter()) + .setWriter(eventStreamWriter) .setAuthorizationValidator(authorizationValidator) .setEventTypeChangeListener(eventTypeChangeListener) .setCursorComparator(new NakadiCursorComparator(eventTypeCache)) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 3ad5cb4d00..ec2a97ced3 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -144,10 +144,6 @@ nakadi: DISABLE_EVENT_TYPE_CREATION: false DISABLE_EVENT_TYPE_DELETION: false DISABLE_SUBSCRIPTION_CREATION: false - CHECK_PARTITIONS_KEYS: true - CHECK_OWNING_APPLICATION: false - LIMIT_CONSUMERS_NUMBER: true - SEND_BATCH_VIA_OUTPUT_STREAM: true REMOTE_TOKENINFO: true KPI_COLLECTION: false DISABLE_DB_WRITE_OPERATIONS: false @@ -172,10 +168,6 @@ nakadi.features.defaultFeatures: DISABLE_EVENT_TYPE_CREATION: false DISABLE_EVENT_TYPE_DELETION: false DISABLE_SUBSCRIPTION_CREATION: false - CHECK_PARTITIONS_KEYS: true - CHECK_OWNING_APPLICATION: true - LIMIT_CONSUMERS_NUMBER: true - SEND_BATCH_VIA_OUTPUT_STREAM: true REMOTE_TOKENINFO: true KPI_COLLECTION: false DISABLE_DB_WRITE_OPERATIONS: false diff --git a/src/test/java/org/zalando/nakadi/controller/EventStreamControllerTest.java b/src/test/java/org/zalando/nakadi/controller/EventStreamControllerTest.java index 0130d22eaa..58eed109ee 100644 --- a/src/test/java/org/zalando/nakadi/controller/EventStreamControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/EventStreamControllerTest.java @@ -39,7 +39,6 @@ import org.zalando.nakadi.service.AuthorizationValidator; import org.zalando.nakadi.service.BlacklistService; import org.zalando.nakadi.service.ClosedConnectionsCrutch; -import org.zalando.nakadi.service.ConsumerLimitingService; import org.zalando.nakadi.service.EventStream; import org.zalando.nakadi.service.EventStreamConfig; import org.zalando.nakadi.service.EventStreamFactory; @@ -151,9 +150,6 @@ public void setup() throws NakadiException, UnknownHostException, InvalidCursorE blacklistService = Mockito.mock(BlacklistService.class); Mockito.when(blacklistService.isConsumptionBlocked(any(), any())).thenReturn(false); - final ConsumerLimitingService consumerLimitingService = Mockito.mock(ConsumerLimitingService.class); - when(consumerLimitingService.acquireConnectionSlots(any(), any(), any())).thenReturn(ImmutableList.of()); - featureToggleService = mock(FeatureToggleService.class); timelineService = mock(TimelineService.class); when(timelineService.getTopicRepository((Timeline) any())).thenReturn(topicRepositoryMock); @@ -167,7 +163,7 @@ public void setup() throws NakadiException, UnknownHostException, InvalidCursorE when(eventTypeChangeListener.registerListener(any(), any())).thenReturn(mock(Closeable.class)); controller = new EventStreamController( eventTypeRepository, timelineService, TestUtils.OBJECT_MAPPER, eventStreamFactoryMock, metricRegistry, - streamMetrics, crutch, blacklistService, consumerLimitingService, featureToggleService, + streamMetrics, crutch, blacklistService, new CursorConverterImpl(eventTypeCache, timelineService), authorizationValidator, eventTypeChangeListener, null); diff --git a/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTest.java b/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTest.java index 7888ec792f..8d78acb26c 100644 --- a/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTest.java @@ -583,20 +583,6 @@ public void whenCreateEventTypeWithWrongPartitionKeyFieldsThen422() throws Excep .andExpect(content().contentType("application/problem+json")); } - @Test - public void whenCreateEventTypeWithUnknownApplicationThen422() throws Exception { - - doReturn(false).when(applicationService).exists(any()); - - final EventType eventType = EventTypeTestBuilder.builder() - .partitionKeyFields(Collections.singletonList("blabla")).build(); - - doReturn(eventType).when(eventTypeRepository).findByName(eventType.getName()); - - postEventType(eventType).andExpect(status().isUnprocessableEntity()) - .andExpect(content().contentType("application/problem+json")); - } - @Test public void whenPUTEventTypeWithWrongPartitionKeyFieldsThen422() throws Exception { diff --git a/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java b/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java index cc09983bba..91ead44928 100644 --- a/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java +++ b/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java @@ -50,7 +50,6 @@ import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put; import static org.springframework.test.web.servlet.setup.MockMvcBuilders.standaloneSetup; -import static org.zalando.nakadi.service.FeatureToggleService.Feature.CHECK_PARTITIONS_KEYS; import static org.zalando.nakadi.service.FeatureToggleService.Feature.DISABLE_EVENT_TYPE_DELETION; import static org.zalando.nakadi.util.PrincipalMockFactory.mockPrincipal; import static uk.co.datumedge.hamcrest.json.SameJSONAs.sameJSONAs; @@ -114,7 +113,6 @@ public void init() throws Exception { doReturn(randomUUID).when(uuid).randomUUID(); doReturn(true).when(applicationService).exists(any()); - doReturn(true).when(featureToggleService).isFeatureEnabled(CHECK_PARTITIONS_KEYS); mockMvc = standaloneSetup(controller) .setMessageConverters(new StringHttpMessageConverter(), TestUtils.JACKSON_2_HTTP_MESSAGE_CONVERTER) diff --git a/src/test/java/org/zalando/nakadi/controller/PostSubscriptionControllerTest.java b/src/test/java/org/zalando/nakadi/controller/PostSubscriptionControllerTest.java index 3c91da81d0..14a6ee80a9 100644 --- a/src/test/java/org/zalando/nakadi/controller/PostSubscriptionControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/PostSubscriptionControllerTest.java @@ -114,16 +114,6 @@ public void whenPostValidSubscriptionThenOk() throws Exception { .andExpect(header().string("Content-Location", "/subscriptions/123")); } - @Test - public void whenCreateSubscriptionWithUnknownApplicationThenUnprocessableEntity() throws Exception { - final SubscriptionBase subscriptionBase = builder().buildSubscriptionBase(); - when(applicationService.exists(any())).thenReturn(false); - - postSubscription(subscriptionBase) - .andExpect(status().isUnprocessableEntity()) - .andExpect(content().contentTypeCompatibleWith("application/problem+json")); - } - @Test public void whenCreateSubscriptionWithEmptyConsumerGroupThenUnprocessableEntity() throws Exception { final SubscriptionBase subscriptionBase = builder() diff --git a/src/test/java/org/zalando/nakadi/service/EventStreamTest.java b/src/test/java/org/zalando/nakadi/service/EventStreamTest.java index 3cb06e8390..d269c5fe3e 100644 --- a/src/test/java/org/zalando/nakadi/service/EventStreamTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventStreamTest.java @@ -8,7 +8,6 @@ import org.json.JSONArray; import org.json.JSONObject; import org.junit.Assert; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.zalando.nakadi.domain.ConsumedEvent; @@ -69,7 +68,7 @@ public class EventStreamTest { private static final Timeline TIMELINE = buildTimelineWithTopic(TOPIC); private static CursorConverter cursorConverter; - private static EventStreamWriterProvider writerProvider; + private static EventStreamWriter eventStreamWriter = new EventStreamWriterBinary(); private final NakadiKpiPublisher kpiPublisher = mock(NakadiKpiPublisher.class); private final String kpiEventType = "nakadi.data.streamed"; @@ -80,16 +79,6 @@ public static void setupMocks() { final TimelineService timelineService = mock(TimelineService.class); final EventTypeCache eventTypeCache = mock(EventTypeCache.class); cursorConverter = new CursorConverterImpl(eventTypeCache, timelineService); - writerProvider = mock(EventStreamWriterProvider.class); - } - - @Before - public void setupWriter() { - when(writerProvider.getWriter()).thenReturn(createWriter()); - } - - protected EventStreamWriter createWriter() { - return new EventStreamWriterBinary(); } @Test(timeout = 15000) @@ -104,7 +93,7 @@ public void whenIOExceptionThenStreamIsClosed() throws NakadiException, Interrup final OutputStream outputStreamMock = mock(OutputStream.class); final EventStream eventStream = new EventStream( emptyConsumer(), outputStreamMock, config, mock(BlacklistService.class), cursorConverter, - BYTES_FLUSHED_METER, writerProvider, kpiPublisher, kpiEventType, kpiFrequencyMs); + BYTES_FLUSHED_METER, eventStreamWriter, kpiPublisher, kpiEventType, kpiFrequencyMs); final Thread thread = new Thread(() -> eventStream.streamEvents(new AtomicBoolean(true), () -> { })); @@ -132,7 +121,7 @@ public void whenCrutchWorkedThenStreamIsClosed() throws NakadiException, Interru .build(); final EventStream eventStream = new EventStream( emptyConsumer(), mock(OutputStream.class), config, mock(BlacklistService.class), cursorConverter, - BYTES_FLUSHED_METER, writerProvider, kpiPublisher, kpiEventType, kpiFrequencyMs); + BYTES_FLUSHED_METER, eventStreamWriter, kpiPublisher, kpiEventType, kpiFrequencyMs); final AtomicBoolean streamOpen = new AtomicBoolean(true); final Thread thread = new Thread(() -> eventStream.streamEvents(streamOpen, () -> { })); @@ -160,7 +149,7 @@ public void whenAuthorizationChangedStreamClosed() throws NakadiException, Inter .build(); final EventStream eventStream = new EventStream( emptyConsumer(), mock(OutputStream.class), config, mock(BlacklistService.class), cursorConverter, - BYTES_FLUSHED_METER, writerProvider, kpiPublisher, kpiEventType, kpiFrequencyMs); + BYTES_FLUSHED_METER, eventStreamWriter, kpiPublisher, kpiEventType, kpiFrequencyMs); final AtomicBoolean triggerAuthChange = new AtomicBoolean(false); final AtomicBoolean accessDeniedTriggered = new AtomicBoolean(false); final Thread thread = new Thread(() -> { @@ -203,7 +192,7 @@ public void whenStreamTimeoutIsSetThenStreamIsClosed() throws NakadiException, I .build(); final EventStream eventStream = new EventStream( emptyConsumer(), mock(OutputStream.class), config, mock(BlacklistService.class), cursorConverter, - BYTES_FLUSHED_METER, writerProvider, kpiPublisher, kpiEventType, kpiFrequencyMs); + BYTES_FLUSHED_METER, eventStreamWriter, kpiPublisher, kpiEventType, kpiFrequencyMs); eventStream.streamEvents(new AtomicBoolean(true), () -> { }); // if something goes wrong - the test should fail with a timeout @@ -219,7 +208,7 @@ public void whenStreamLimitIsSetThenStreamIsClosed() throws NakadiException, IOE .withConsumingClient(mock(Client.class)) .build(); final EventStream eventStream = new EventStream(endlessDummyConsumer(), mock(OutputStream.class), config, - mock(BlacklistService.class), cursorConverter, BYTES_FLUSHED_METER, writerProvider, kpiPublisher, + mock(BlacklistService.class), cursorConverter, BYTES_FLUSHED_METER, eventStreamWriter, kpiPublisher, kpiEventType, kpiFrequencyMs); eventStream.streamEvents(new AtomicBoolean(true), () -> { }); @@ -238,7 +227,7 @@ public void whenKeepAliveLimitIsSetThenStreamIsClosed() throws NakadiException, .build(); final EventStream eventStream = new EventStream( emptyConsumer(), mock(OutputStream.class), config, mock(BlacklistService.class), cursorConverter, - BYTES_FLUSHED_METER, writerProvider, kpiPublisher, kpiEventType, kpiFrequencyMs); + BYTES_FLUSHED_METER, eventStreamWriter, kpiPublisher, kpiEventType, kpiFrequencyMs); eventStream.streamEvents(new AtomicBoolean(true), () -> { }); // if something goes wrong - the test should fail with a timeout @@ -259,7 +248,7 @@ public void whenNoEventsToReadThenKeepAliveIsSent() throws NakadiException, IOEx final EventStream eventStream = new EventStream( emptyConsumer(), out, config, mock(BlacklistService.class), cursorConverter, BYTES_FLUSHED_METER, - writerProvider, kpiPublisher, kpiEventType, kpiFrequencyMs); + eventStreamWriter, kpiPublisher, kpiEventType, kpiFrequencyMs); eventStream.streamEvents(new AtomicBoolean(true), () -> { }); @@ -286,7 +275,7 @@ public void whenBatchSizeIsSetThenGetEventsInBatches() throws NakadiException, I final EventStream eventStream = new EventStream( nCountDummyConsumerForPartition(12, "0"), out, config, mock(BlacklistService.class), - cursorConverter, BYTES_FLUSHED_METER, writerProvider, kpiPublisher, kpiEventType, kpiFrequencyMs); + cursorConverter, BYTES_FLUSHED_METER, eventStreamWriter, kpiPublisher, kpiEventType, kpiFrequencyMs); eventStream.streamEvents(new AtomicBoolean(true), () -> { }); @@ -324,7 +313,7 @@ public void whenReadingEventsTheOrderIsCorrect() throws NakadiException, IOExcep final EventStream eventStream = new EventStream(predefinedConsumer(events), out, config, mock(BlacklistService.class), cursorConverter, - BYTES_FLUSHED_METER, writerProvider, kpiPublisher, kpiEventType, kpiFrequencyMs); + BYTES_FLUSHED_METER, eventStreamWriter, kpiPublisher, kpiEventType, kpiFrequencyMs); eventStream.streamEvents(new AtomicBoolean(true), () -> { }); @@ -369,7 +358,7 @@ public void whenReadFromMultiplePartitionsThenGroupedInBatchesAccordingToPartiti final EventStream eventStream = new EventStream(predefinedConsumer(events), out, config, mock(BlacklistService.class), cursorConverter, - BYTES_FLUSHED_METER, writerProvider, kpiPublisher, kpiEventType, kpiFrequencyMs); + BYTES_FLUSHED_METER, eventStreamWriter, kpiPublisher, kpiEventType, kpiFrequencyMs); eventStream.streamEvents(new AtomicBoolean(true), () -> { }); @@ -466,7 +455,7 @@ public void testWriteStreamEvent() { "{\"e\":\"f\"}".getBytes()); try { - writerProvider.getWriter().writeBatch(baos, cursor, events); + eventStreamWriter.writeBatch(baos, cursor, events); final Map batch = TestUtils.OBJECT_MAPPER.readValue(baos.toString(), new TypeReference>() { }); @@ -497,7 +486,7 @@ public void testWriteStreamEventEmptyBatchProducesNoEventArray() { final ArrayList events = Lists.newArrayList(); try { - writerProvider.getWriter().writeBatch(baos, cursor, events); + eventStreamWriter.writeBatch(baos, cursor, events); final String json = baos.toString(); assertEquals("{\"cursor\":{\"partition\":\"11\",\"offset\":\"000000000000000012\"}}\n", json); @@ -527,7 +516,7 @@ public void testWriteStreamInfoWhenPresent() { new ConsumedEvent("{\"a\":\"b\"}".getBytes(), mock(NakadiCursor.class), 0)); try { - writerProvider.getWriter().writeSubscriptionBatch(baos, cursor, events, Optional.of("something")); + eventStreamWriter.writeSubscriptionBatch(baos, cursor, events, Optional.of("something")); final JSONObject batch = new JSONObject(baos.toString()); final JSONObject cursorM = batch.getJSONObject("cursor");