Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #924 from zalando/ARUHA-1828
Browse files Browse the repository at this point in the history
Reduce size of logs of nakadi
  • Loading branch information
antban authored Aug 9, 2018
2 parents 53df492 + 82741a6 commit 650d107
Show file tree
Hide file tree
Showing 16 changed files with 59 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.zalando.nakadi.service.CursorTokenService;
import org.zalando.nakadi.service.CursorsService;
import org.zalando.nakadi.service.FeatureToggleService;
import org.zalando.nakadi.util.TimeLogger;
import org.zalando.nakadi.view.CursorCommitResult;
import org.zalando.nakadi.view.SubscriptionCursor;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;
Expand Down Expand Up @@ -93,19 +92,13 @@ public ResponseEntity<?> commitCursors(@PathVariable("subscriptionId") final Str
@NotNull @RequestHeader("X-Nakadi-StreamId") final String streamId,
final NativeWebRequest request) {

TimeLogger.startMeasure(
"COMMIT_CURSORS sid:" + subscriptionId + ", size=" + cursorsIn.getItems().size(),
"isFeatureEnabled");
try {
TimeLogger.addMeasure("convertToNakadiCursors");
final List<NakadiCursor> cursors = convertToNakadiCursors(cursorsIn);
if (cursors.isEmpty()) {
throw new CursorsAreEmptyException();
}
TimeLogger.addMeasure("callService");
final List<Boolean> items = cursorsService.commitCursors(streamId, subscriptionId, cursors);

TimeLogger.addMeasure("prepareResponse");
final boolean allCommited = items.stream().allMatch(item -> item);
if (allCommited) {
return noContent().build();
Expand All @@ -123,8 +116,6 @@ public ResponseEntity<?> commitCursors(@PathVariable("subscriptionId") final Str
} catch (final NakadiException e) {
LOG.error("Failed to commit cursors", e);
return create(e.asProblem(), request);
} finally {
LOG.info(TimeLogger.finishMeasure());
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/zalando/nakadi/filters/LoggingFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
@Component
public class LoggingFilter extends OncePerRequestFilter {

private static final Logger LOG = LoggerFactory.getLogger(LoggingFilter.class);
// We are using empty log name, cause it is used only for access log and we do not care about class name
private static final Logger ACCESS_LOGGER = LoggerFactory.getLogger("ACCESS_LOG");

private final NakadiKpiPublisher nakadiKpiPublisher;
private final String accessLogEventType;
Expand Down Expand Up @@ -55,7 +56,7 @@ protected void doFilterInternal(final HttpServletRequest request,
.orElse("-");
final Long contentLength = request.getContentLengthLong() == -1 ? 0 : request.getContentLengthLong();

LOG.info("[ACCESS_LOG] {} \"{}{}\" \"{}\" \"{}\" statusCode: {} {} ms \"{}\" \"{}\" {} bytes",
ACCESS_LOGGER.info("{} \"{}{}\" \"{}\" \"{}\" {} {}ms \"{}\" \"{}\" {}B",
method,
path,
query,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void listenForConnectionClose(
if (!featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.CONNECTION_CLOSE_CRUTCH)) {
return;
}
LOG.debug("Listening for connection to close using crutch (" + address + ":" + port + ")");
LOG.debug("Listening for connection to close using crutch ({}:{})", address, port);
synchronized (toAdd) {
toAdd.computeIfAbsent(new ConnectionInfo(address, port), tmp -> new ArrayList<>()).add(onCloseListener);
}
Expand All @@ -169,7 +169,7 @@ public void refresh() throws IOException {
.filter(key -> CLOSED_STATES.contains(currentConnections.getOrDefault(key,
ConnectionState.TCP_CLOSE)))
.mapToLong(key -> {
LOG.info("Notifying about connection close via crutch: " + key);
LOG.debug("Notifying about connection close via crutch: {}", key);
return listeners.remove(key).stream().filter(BooleanSupplier::getAsBoolean).count();
}).sum();
if (closedCount > 0) {
Expand Down
16 changes: 5 additions & 11 deletions src/main/java/org/zalando/nakadi/service/CursorsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@
import org.zalando.nakadi.exceptions.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.NoSuchSubscriptionException;
import org.zalando.nakadi.exceptions.runtime.UnableProcessException;
import org.zalando.nakadi.exceptions.runtime.OperationTimeoutException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.exceptions.runtime.UnableProcessException;
import org.zalando.nakadi.exceptions.runtime.ZookeeperException;
import org.zalando.nakadi.repository.TopicRepository;
import org.zalando.nakadi.repository.db.EventTypeCache;
import org.zalando.nakadi.repository.db.SubscriptionDbRepository;
import org.zalando.nakadi.service.subscription.LogPathBuilder;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.state.StartingState;
import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory;
import org.zalando.nakadi.service.subscription.zk.SubscriptionNotInitializedException;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
import org.zalando.nakadi.service.timeline.TimelineService;
import org.zalando.nakadi.util.TimeLogger;
import org.zalando.nakadi.util.UUIDGenerator;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;

Expand Down Expand Up @@ -80,23 +80,17 @@ public List<Boolean> commitCursors(final String streamId, final String subscript
throws ServiceTemporarilyUnavailableException, InvalidCursorException, InvalidStreamIdException,
NoSuchEventTypeException, InternalNakadiException, NoSuchSubscriptionException, UnableProcessException,
AccessDeniedException {
TimeLogger.addMeasure("getSubscription");
final Subscription subscription = subscriptionRepository.getSubscription(subscriptionId);

TimeLogger.addMeasure("authorize");
authorizationValidator.authorizeSubscriptionCommit(subscription);

TimeLogger.addMeasure("validateSubscriptionCursors");
validateSubscriptionCommitCursors(subscription, cursors);

TimeLogger.addMeasure("createSubscriptionClient");
final ZkSubscriptionClient zkClient = zkSubscriptionFactory.createClient(
subscription, "subscription." + subscriptionId + "." + streamId + ".offsets");
subscription, LogPathBuilder.build(subscriptionId, streamId, "offsets"));

TimeLogger.addMeasure("validateStreamId");
validateStreamId(cursors, streamId, zkClient);

TimeLogger.addMeasure("writeToZK");
return zkClient.commitOffsets(
cursors.stream().map(cursorConverter::convertToNoToken).collect(Collectors.toList()),
new SubscriptionCursorComparator(new NakadiCursorComparator(eventTypeCache)));
Expand Down Expand Up @@ -137,7 +131,7 @@ public List<SubscriptionCursorWithoutToken> getSubscriptionCursors(final String
throws NakadiException, ServiceTemporarilyUnavailableException {
final Subscription subscription = subscriptionRepository.getSubscription(subscriptionId);
final ZkSubscriptionClient zkSubscriptionClient = zkSubscriptionFactory.createClient(
subscription, "subscription." + subscriptionId + ".get_cursors");
subscription, LogPathBuilder.build(subscriptionId, "get_cursors"));
final ImmutableList.Builder<SubscriptionCursorWithoutToken> cursorsListBuilder = ImmutableList.builder();

Partition[] partitions;
Expand Down Expand Up @@ -176,7 +170,7 @@ public void resetCursors(final String subscriptionId, final List<NakadiCursor> c
}

final ZkSubscriptionClient zkClient = zkSubscriptionFactory.createClient(
subscription, "subscription." + subscriptionId + ".reset_cursors");
subscription, LogPathBuilder.build(subscriptionId, "reset_cursors"));

// In case if subscription was never initialized - initialize it
zkClient.runLocked(() -> StartingState.initializeSubscriptionLocked(
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/org/zalando/nakadi/service/EventPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ EventPublishResult publishInternal(final String events,

return ok(batch);
} catch (final EventValidationException e) {
LOG.debug("Event validation error: {}", e.getMessage());
LOG.debug(
"Event validation error: {}",
Optional.ofNullable(e.getMessage()).map(s -> s.replaceAll("\n", "; ")).orElse(null)
);
return aborted(EventPublishingStep.VALIDATING, batch);
} catch (final PartitioningException e) {
LOG.debug("Event partition error: {}", e.getMessage());
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/zalando/nakadi/service/EventStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void streamEvents(final AtomicBoolean connectionReady, final Runnable che
.get();
sendBatch(latestOffsets.get(heaviestPartition.getKey()), heaviestPartition.getValue());
final long freed = heaviestPartition.getValue().stream().mapToLong(v -> v.length).sum();
LOG.warn("Memory limit reached for event type {}: {} bytes. Freed: {} bytes, {} messages",
LOG.info("Memory limit reached for event type {}: {} bytes. Freed: {} bytes, {} messages",
config.getEtName(), bytesInMemory, freed, heaviestPartition.getValue().size());
bytesInMemory -= freed;
// Init new batch for subscription
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.zalando.nakadi.service.subscription;

public class LogPathBuilder {
public static String build(final String subscriptionId, final String addition) {
return "s." + subscriptionId + "." + addition;
}

public static String build(final String subscriptionId, final String streamId, final String addition) {
return build(subscriptionId, streamId + "." + addition);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class StreamingContext implements SubscriptionStreamer {
private final ScheduledExecutorService timer;
private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
private final BiFunction<Collection<Session>, Partition[], Partition[]> rebalancer;
private final String loggingPath;
private final CursorConverter cursorConverter;
private final Subscription subscription;
private final MetricRegistry metricRegistry;
Expand Down Expand Up @@ -86,8 +85,7 @@ private StreamingContext(final Builder builder) {
this.timer = builder.timer;
this.zkClient = builder.zkClient;
this.kafkaPollTimeout = builder.kafkaPollTimeout;
this.loggingPath = builder.loggingPath + ".stream";
this.log = LoggerFactory.getLogger(builder.loggingPath);
this.log = LoggerFactory.getLogger(LogPathBuilder.build(builder.subscription.getId(), builder.session.getId()));
this.connectionReady = builder.connectionReady;
this.timelineService = builder.timelineService;
this.cursorTokenService = builder.cursorTokenService;
Expand Down Expand Up @@ -198,7 +196,9 @@ void streamInternal(final State firstState) throws InterruptedException {

public void switchState(final State newState) {
this.addTask(() -> {
log.info("Switching state from " + currentState.getClass().getSimpleName());
log.info("Switching state from {} to {}",
currentState.getClass().getSimpleName(),
newState.getClass().getSimpleName());
// There is a problem with onExit call - it can not throw exceptions, otherwise it won't be possible
// to finish state correctly. In order to avoid it in future state will be switched even in case of
// exception.
Expand All @@ -207,8 +207,7 @@ public void switchState(final State newState) {
} finally {
currentState = newState;

log.info("Switching state to " + currentState.getClass().getSimpleName());
currentState.setContext(this, loggingPath);
currentState.setContext(this);
currentState.onEnter();
}
});
Expand Down Expand Up @@ -331,7 +330,6 @@ public static final class Builder {
private ZkSubscriptionClient zkClient;
private BiFunction<Collection<Session>, Partition[], Partition[]> rebalancer;
private long kafkaPollTimeout;
private String loggingPath;
private AtomicBoolean connectionReady;
private CursorTokenService cursorTokenService;
private ObjectMapper objectMapper;
Expand Down Expand Up @@ -394,11 +392,6 @@ public Builder setKafkaPollTimeout(final long kafkaPollTimeout) {
return this;
}

public Builder setLoggingPath(final String loggingPath) {
this.loggingPath = loggingPath;
return this;
}

public Builder setConnectionReady(final AtomicBoolean connectionReady) {
this.connectionReady = connectionReady;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public Result<Void> deleteSubscription(final String subscriptionId) throws DbWri

subscriptionRepository.deleteSubscription(subscriptionId);
final ZkSubscriptionClient zkSubscriptionClient = subscriptionClientFactory.createClient(
subscription, "subscription." + subscriptionId + ".delete_subscription");
subscription, LogPathBuilder.build(subscriptionId, "delete_subscription"));
zkSubscriptionClient.deleteSubscription();

nakadiKpiPublisher.publish(subLogEventType, () -> new JSONObject()
Expand Down Expand Up @@ -279,7 +279,7 @@ private ZkSubscriptionClient createZkSubscriptionClient(final Subscription subsc
throws ServiceTemporarilyUnavailableException {
try {
return subscriptionClientFactory.createClient(subscription,
"subscription." + subscription.getId() + ".stats");
LogPathBuilder.build(subscription.getId(), "stats"));
} catch (final InternalNakadiException | NoSuchEventTypeException e) {
throw new ServiceTemporarilyUnavailableException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,17 @@ public SubscriptionStreamer build(
final BlacklistService blacklistService)
throws InternalNakadiException, NoSuchEventTypeException {
final Session session = Session.generate(1, streamParameters.getPartitions());
final String loggingPath = "subscription." + subscription.getId() + "." + session.getId();
// Create streaming context
return new StreamingContext.Builder()
.setOut(output)
.setStreamMemoryLimitBytes(streamMemoryLimitBytes)
.setParameters(streamParameters)
.setSession(session)
.setTimer(executorService)
.setZkClient(zkClientFactory.createClient(subscription, loggingPath))
.setZkClient(zkClientFactory.createClient(
subscription, LogPathBuilder.build(subscription.getId(), session.getId())))
.setRebalancer(new SubscriptionRebalancer())
.setKafkaPollTimeout(kafkaPollTimeout)
.setLoggingPath(loggingPath)
.setConnectionReady(connectionReady)
.setCursorTokenService(cursorTokenService)
.setObjectMapper(objectMapper)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.service.subscription.LogPathBuilder;
import org.zalando.nakadi.service.subscription.StreamParameters;
import org.zalando.nakadi.service.subscription.StreamingContext;
import org.zalando.nakadi.service.subscription.SubscriptionOutput;
Expand All @@ -15,9 +16,12 @@ public abstract class State {
private StreamingContext context;
private Logger log;

public void setContext(final StreamingContext context, final String loggingPath) {
public void setContext(final StreamingContext context) {
this.context = context;
this.log = LoggerFactory.getLogger(loggingPath + "." + this.getClass().getSimpleName());
this.log = LoggerFactory.getLogger(LogPathBuilder.build(
context.getSubscription().getId(),
context.getSessionId(),
"state." + this.getClass().getSimpleName()));
}

public Logger getLog() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.zalando.nakadi.security.Client;
import org.zalando.nakadi.service.NakadiKpiPublisher;
import org.zalando.nakadi.service.subscription.IdleStreamWatcher;
import org.zalando.nakadi.service.subscription.LogPathBuilder;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.zk.ZkSubscription;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
Expand Down Expand Up @@ -359,7 +360,7 @@ public void logExtendedCommitInformation() {
final SubscriptionCursorWithoutToken remembered =
getContext().getCursorConverter().convertToNoToken(v.getValue().getCommitOffset());
final SubscriptionCursorWithoutToken real = realCommitted.get(v.getKey());
return real.getOffset().compareTo(remembered.getOffset())> 0;
return real.getOffset().compareTo(remembered.getOffset()) > 0;
})
.map(Map.Entry::getKey)
.collect(Collectors.toList());
Expand Down Expand Up @@ -627,7 +628,8 @@ private void addToStreaming(final Partition partition,
getComparator(),
subscription,
cursor,
LoggerFactory.getLogger("subscription." + getSessionId() + "." + partition.getKey()),
LoggerFactory.getLogger(LogPathBuilder.build(
getContext().getSubscription().getId(), getSessionId(), String.valueOf(partition.getKey()))),
System.currentTimeMillis());

offsets.put(partition.getKey(), pd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,11 @@ public List<Boolean> commitOffsets(
}
}
if (!currentMaxCursor.getOffset().equals(currentMaxOffset)) {
getLog().info("Committing {} to {}", currentMaxCursor.getOffset(), offsetPath);
getLog().info("Committing {} to {}/{}",
currentMaxCursor.getOffset(),
entry.getKey().getEventType(),
entry.getKey().getPartition());

getCurator()
.setData()
.withVersion(stat.getVersion())
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ log4j.category.org.thymeleaf=WARN
log4j.category.org.zalando.nakadi=DEBUG
log4j.category.org.zalando.nakadi.config=INFO
log4j.category.org.apache.kafka=WARN
lgo4j.category.org.zalando.nakadi.service.ClosedConnectionsCrutch=INFO
Loading

0 comments on commit 650d107

Please sign in to comment.