Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #871 from zalando/ARUHA-1636
Browse files Browse the repository at this point in the history
aruha-1636: unlimited retention time by Nakadi admin
  • Loading branch information
adyach authored May 8, 2018
2 parents ad65520 + 1282961 commit 6d4849e
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.springframework.context.annotation.Configuration;
import org.zalando.nakadi.domain.SchemaChange;
import org.zalando.nakadi.domain.Version;
import org.zalando.nakadi.validation.EventTypeOptionsValidator;
import org.zalando.nakadi.service.validation.EventTypeOptionsValidator;
import org.zalando.nakadi.validation.SchemaEvolutionService;
import org.zalando.nakadi.validation.schema.CategoryChangeConstraint;
import org.zalando.nakadi.validation.schema.CompatibilityModeChangeConstraint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.Errors;
import org.springframework.validation.ValidationUtils;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
Expand All @@ -34,14 +33,14 @@
import org.zalando.nakadi.exceptions.runtime.NoEventTypeException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.exceptions.runtime.TopicConfigException;
import org.zalando.nakadi.exceptions.runtime.EventTypeOptionsValidationException;
import org.zalando.nakadi.plugin.api.ApplicationService;
import org.zalando.nakadi.plugin.api.authz.AuthorizationService;
import org.zalando.nakadi.problem.ValidationProblem;
import org.zalando.nakadi.service.AdminService;
import org.zalando.nakadi.service.EventTypeService;
import org.zalando.nakadi.service.FeatureToggleService;
import org.zalando.nakadi.service.Result;
import org.zalando.nakadi.validation.EventTypeOptionsValidator;
import org.zalando.problem.MoreStatus;
import org.zalando.problem.Problem;
import org.zalando.problem.spring.web.advice.Responses;
Expand All @@ -63,21 +62,18 @@ public class EventTypeController {

private final EventTypeService eventTypeService;
private final FeatureToggleService featureToggleService;
private final EventTypeOptionsValidator eventTypeOptionsValidator;
private final ApplicationService applicationService;
private final AdminService adminService;
private final NakadiSettings nakadiSettings;

@Autowired
public EventTypeController(final EventTypeService eventTypeService,
final FeatureToggleService featureToggleService,
final EventTypeOptionsValidator eventTypeOptionsValidator,
final ApplicationService applicationService,
final AdminService adminService,
final NakadiSettings nakadiSettings) {
this.eventTypeService = eventTypeService;
this.featureToggleService = featureToggleService;
this.eventTypeOptionsValidator = eventTypeOptionsValidator;
this.applicationService = applicationService;
this.adminService = adminService;
this.nakadiSettings = nakadiSettings;
Expand All @@ -100,7 +96,6 @@ public ResponseEntity<?> create(@Valid @RequestBody final EventTypeBase eventTyp
return new ResponseEntity<>(HttpStatus.NOT_IMPLEMENTED);
}

ValidationUtils.invokeValidator(eventTypeOptionsValidator, eventType.getOptions(), errors);
if (featureToggleService.isFeatureEnabled(CHECK_OWNING_APPLICATION)
&& !applicationService.exists(eventType.getOwningApplication())) {
return Responses.create(Problem.valueOf(MoreStatus.UNPROCESSABLE_ENTITY,
Expand Down Expand Up @@ -144,7 +139,6 @@ public ResponseEntity<?> update(
NakadiRuntimeException,
ServiceTemporarilyUnavailableException,
UnableProcessException {
ValidationUtils.invokeValidator(eventTypeOptionsValidator, eventType.getOptions(), errors);
if (errors.hasErrors()) {
return Responses.create(new ValidationProblem(errors), request);
}
Expand Down Expand Up @@ -234,4 +228,10 @@ public ResponseEntity<Problem> invalidEventTypeException(final InvalidEventTypeE
return Responses.create(exception.asProblem(), request);
}

@ExceptionHandler(EventTypeOptionsValidationException.class)
public ResponseEntity<Problem> unableProcess(final EventTypeOptionsValidationException exception,
final NativeWebRequest request) {
LOG.debug(exception.getMessage(), exception);
return Responses.create(MoreStatus.UNPROCESSABLE_ENTITY, exception.getMessage(), request);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.zalando.nakadi.exceptions.runtime;

public class EventTypeOptionsValidationException extends MyNakadiRuntimeException1 {

public EventTypeOptionsValidationException(final String message) {
super(message);
}
}
7 changes: 4 additions & 3 deletions src/main/java/org/zalando/nakadi/service/AdminService.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.zalando.nakadi.domain.ResourceAuthorization;
import org.zalando.nakadi.exceptions.UnableProcessException;
import org.zalando.nakadi.exceptions.runtime.DbWriteOperationsBlockedException;
import org.zalando.nakadi.plugin.api.PluginException;
import org.zalando.nakadi.plugin.api.authz.AuthorizationService;
import org.zalando.nakadi.plugin.api.authz.Resource;
import org.zalando.nakadi.repository.db.AuthorizationDbRepository;
Expand Down Expand Up @@ -67,14 +68,14 @@ public void updateAdmins(final List<Permission> newAdmins) throws DbWriteOperati
authorizationDbRepository.update(add, delete);
}

public boolean isAdmin(final AuthorizationService.Operation operation) {
public boolean isAdmin(final AuthorizationService.Operation operation) throws PluginException {
final List<Permission> permissions = getAdmins();
final Resource resource = new AdminResource(ADMIN_RESOURCE,
ResourceAuthorization.fromPermissionsList(permissions));
return authorizationService.isAuthorized(operation, resource);
}

public boolean hasAllDataAccess(final AuthorizationService.Operation operation) {
public boolean hasAllDataAccess(final AuthorizationService.Operation operation) throws PluginException {
try {
final List<Permission> permissions = resourceCache.get(ALL_DATA_ACCESS_RESOURCE,
() -> authorizationDbRepository.listAllDataAccess());
Expand All @@ -100,7 +101,7 @@ private List<Permission> removeDefaultAdmin(final List<Permission> permissions)
.collect(Collectors.toList());
}

private void validateAllAdmins(final List<Permission> admins) throws UnableProcessException {
private void validateAllAdmins(final List<Permission> admins) throws UnableProcessException, PluginException {
final List<Permission> invalid = admins.stream().filter(permission ->
!authorizationService.isAuthorizationAttributeValid(permission.getAuthorizationAttribute()))
.collect(Collectors.toList());
Expand Down
32 changes: 25 additions & 7 deletions src/main/java/org/zalando/nakadi/service/EventTypeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,21 @@
import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
import org.zalando.nakadi.exceptions.runtime.DbWriteOperationsBlockedException;
import org.zalando.nakadi.exceptions.runtime.EventTypeDeletionException;
import org.zalando.nakadi.exceptions.runtime.EventTypeOptionsValidationException;
import org.zalando.nakadi.exceptions.runtime.EventTypeUnavailableException;
import org.zalando.nakadi.exceptions.runtime.InconsistentStateException;
import org.zalando.nakadi.exceptions.runtime.NoEventTypeException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.exceptions.runtime.TopicConfigException;
import org.zalando.nakadi.partitioning.PartitionResolver;
import org.zalando.nakadi.plugin.api.authz.AuthorizationService;
import org.zalando.nakadi.repository.EventTypeRepository;
import org.zalando.nakadi.repository.TopicRepository;
import org.zalando.nakadi.repository.db.SubscriptionDbRepository;
import org.zalando.nakadi.repository.kafka.PartitionsCalculator;
import org.zalando.nakadi.service.timeline.TimelineService;
import org.zalando.nakadi.service.timeline.TimelineSync;
import org.zalando.nakadi.service.validation.EventTypeOptionsValidator;
import org.zalando.nakadi.util.JsonUtils;
import org.zalando.nakadi.validation.SchemaEvolutionService;
import org.zalando.nakadi.validation.SchemaIncompatibility;
Expand Down Expand Up @@ -85,6 +88,8 @@ public class EventTypeService {
private final TransactionTemplate transactionTemplate;
private final NakadiKpiPublisher nakadiKpiPublisher;
private final String etLogEventType;
private final EventTypeOptionsValidator eventTypeOptionsValidator;
private final AdminService adminService;

@Autowired
public EventTypeService(final EventTypeRepository eventTypeRepository,
Expand All @@ -100,7 +105,9 @@ public EventTypeService(final EventTypeRepository eventTypeRepository,
final TransactionTemplate transactionTemplate,
final NakadiSettings nakadiSettings,
final NakadiKpiPublisher nakadiKpiPublisher,
@Value("${nakadi.kpi.event-types.nakadiEventTypeLog}") final String etLogEventType) {
@Value("${nakadi.kpi.event-types.nakadiEventTypeLog}") final String etLogEventType,
final EventTypeOptionsValidator eventTypeOptionsValidator,
final AdminService adminService) {
this.eventTypeRepository = eventTypeRepository;
this.timelineService = timelineService;
this.partitionResolver = partitionResolver;
Expand All @@ -115,19 +122,27 @@ public EventTypeService(final EventTypeRepository eventTypeRepository,
this.nakadiSettings = nakadiSettings;
this.nakadiKpiPublisher = nakadiKpiPublisher;
this.etLogEventType = etLogEventType;
this.eventTypeOptionsValidator = eventTypeOptionsValidator;
this.adminService = adminService;
}

public List<EventType> list() {
return eventTypeRepository.list();
}

public void create(final EventTypeBase eventType) throws TopicCreationException, InternalNakadiException,
NoSuchPartitionStrategyException, DuplicatedEventTypeNameException, InvalidEventTypeException,
DbWriteOperationsBlockedException {
public void create(final EventTypeBase eventType)
throws TopicCreationException,
InternalNakadiException,
NoSuchPartitionStrategyException,
DuplicatedEventTypeNameException,
InvalidEventTypeException,
DbWriteOperationsBlockedException,
EventTypeOptionsValidationException {
if (featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.DISABLE_DB_WRITE_OPERATIONS)) {
throw new DbWriteOperationsBlockedException("Cannot create event type: write operations on DB " +
"are blocked by feature flag.");
}
eventTypeOptionsValidator.checkRetentionTime(eventType.getOptions());
setDefaultEventTypeOptions(eventType);
validateSchema(eventType);
enrichment.validate(eventType);
Expand Down Expand Up @@ -250,18 +265,21 @@ public void update(final String eventTypeName,
NakadiRuntimeException,
ServiceTemporarilyUnavailableException,
UnableProcessException,
DbWriteOperationsBlockedException {
DbWriteOperationsBlockedException,
EventTypeOptionsValidationException {
if (featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.DISABLE_DB_WRITE_OPERATIONS)) {
throw new DbWriteOperationsBlockedException("Cannot update event type: write operations on DB " +
"are blocked by feature flag.");
}
Closeable updatingCloser = null;
try {
updatingCloser = timelineSync.workWithEventType(eventTypeName, nakadiSettings.getTimelineWaitTimeoutMs());

final EventType original = eventTypeRepository.findByName(eventTypeName);

authorizationValidator.authorizeEventTypeAdmin(original);
if (!adminService.isAdmin(AuthorizationService.Operation.WRITE)) {
eventTypeOptionsValidator.checkRetentionTime(eventTypeBase.getOptions());
authorizationValidator.authorizeEventTypeAdmin(original);
}
authorizationValidator.validateAuthorization(original, eventTypeBase);
validateName(eventTypeName, eventTypeBase);
validateSchema(eventTypeBase);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.zalando.nakadi.service.validation;

import org.zalando.nakadi.domain.EventTypeOptions;
import org.zalando.nakadi.exceptions.runtime.EventTypeOptionsValidationException;

public final class EventTypeOptionsValidator {

private final long minTopicRetentionMs;
private final long maxTopicRetentionMs;

public EventTypeOptionsValidator(final long minTopicRetentionMs,
final long maxTopicRetentionMs) {
this.minTopicRetentionMs = minTopicRetentionMs;
this.maxTopicRetentionMs = maxTopicRetentionMs;
}

public void checkRetentionTime(final EventTypeOptions options) throws EventTypeOptionsValidationException {
if (options == null) {
return;
}

final Long retentionTime = options.getRetentionTime();
if (retentionTime != null) {
if (retentionTime > maxTopicRetentionMs) {
throw new EventTypeOptionsValidationException(
"Field \"options.retention_time\" can not be more than " + maxTopicRetentionMs);
} else if (retentionTime < minTopicRetentionMs) {
throw new EventTypeOptionsValidationException(
"Field \"options.retention_time\" can not be less than " + minTopicRetentionMs);
}
}
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.junit.Test;
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.domain.EventTypeOptions;
import org.zalando.nakadi.domain.EventTypeResource;
import org.zalando.nakadi.exceptions.UnableProcessException;
import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
Expand All @@ -18,6 +19,7 @@
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

Expand Down Expand Up @@ -51,6 +53,34 @@ public void whenPUTNotAuthorizedThen403() throws Exception {
"Access on ADMIN event-type:"+ eventType.getName() + " denied"))));
}

@Test
public void whenPUTUnlimitedRetentionTimeByAdminThen200() throws Exception {
final EventTypeOptions eto = new EventTypeOptions();
eto.setRetentionTime(Long.MAX_VALUE);
final EventType eventType = EventTypeTestBuilder.builder().options(eto).build();

doReturn(eventType).when(eventTypeRepository).findByName(any());
when(adminService.isAdmin(AuthorizationService.Operation.WRITE)).thenReturn(true);

putEventType(eventType, eventType.getName())
.andExpect(status().isOk());
}

@Test
public void whenPUTUnlimitedRetentionTimeByUserThen422() throws Exception {
final EventTypeOptions eto = new EventTypeOptions();
eto.setRetentionTime(Long.MAX_VALUE);
final EventType eventType = EventTypeTestBuilder.builder().options(eto).build();

doReturn(eventType).when(eventTypeRepository).findByName(any());
when(adminService.isAdmin(AuthorizationService.Operation.WRITE)).thenReturn(false);

putEventType(eventType, eventType.getName())
.andExpect(status().isUnprocessableEntity())
.andExpect(content().string(matchesProblem(Problem.valueOf(MoreStatus.UNPROCESSABLE_ENTITY,
"Field \"options.retention_time\" can not be more than 345600000"))));
}

@Test
public void whenPUTNullAuthorizationForExistingAuthorization() throws Exception {
final EventType newEventType = EventTypeTestBuilder.builder().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.zalando.nakadi.service.timeline.TimelineSync;
import org.zalando.nakadi.util.UUIDGenerator;
import org.zalando.nakadi.utils.TestUtils;
import org.zalando.nakadi.validation.EventTypeOptionsValidator;
import org.zalando.nakadi.service.validation.EventTypeOptionsValidator;
import org.zalando.nakadi.validation.SchemaEvolutionService;
import org.zalando.problem.Problem;
import uk.co.datumedge.hamcrest.json.SameJSONAs;
Expand Down Expand Up @@ -103,15 +103,14 @@ public void init() throws Exception {
return callback.doInTransaction(null);
});

final EventTypeOptionsValidator eventTypeOptionsValidator =
new EventTypeOptionsValidator(TOPIC_RETENTION_MIN_MS, TOPIC_RETENTION_MAX_MS);
final EventTypeService eventTypeService = new EventTypeService(eventTypeRepository, timelineService,
partitionResolver, enrichment, subscriptionRepository, schemaEvolutionService, partitionsCalculator,
featureToggleService, authorizationValidator, timelineSync, transactionTemplate, nakadiSettings,
nakadiKpiPublisher, "et-log-event-type");

final EventTypeOptionsValidator eventTypeOptionsValidator =
new EventTypeOptionsValidator(TOPIC_RETENTION_MIN_MS, TOPIC_RETENTION_MAX_MS);
final EventTypeController controller = new EventTypeController(eventTypeService,
featureToggleService, eventTypeOptionsValidator, applicationService, adminService, nakadiSettings);
nakadiKpiPublisher, "et-log-event-type", eventTypeOptionsValidator, adminService);
final EventTypeController controller = new EventTypeController(eventTypeService,featureToggleService,
applicationService, adminService, nakadiSettings);
doReturn(randomUUID).when(uuid).randomUUID();

doReturn(true).when(applicationService).exists(any());
Expand Down
Loading

0 comments on commit 6d4849e

Please sign in to comment.