Skip to content
This repository has been archived by the owner on Sep 7, 2023. It is now read-only.

Handle updates in batches #33

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 35 additions & 27 deletions Processor/src/main/java/org/ulyssis/ipp/processor/Processor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.ulyssis.ipp.processor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ulyssis.ipp.config.Config;
Expand Down Expand Up @@ -79,7 +80,7 @@
public final class Processor implements Runnable {
private static final Logger LOG = LogManager.getLogger(Processor.class);

private final BlockingQueue<Event> eventQueue;
private final BlockingQueue<List<Event>> eventQueue;
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

private final ConcurrentMap<Event, Consumer<Boolean> > eventCallbacks;
Expand Down Expand Up @@ -255,8 +256,8 @@ public void run() {
notifyStarted();
try {
while (!Thread.currentThread().isInterrupted()) {
Event event = eventQueue.take();
processEvent(event);
List<Event> events = eventQueue.take();
processEvents(events);
// TODO(Roel): Do deferred event processing later!
}
} catch (InterruptedException ignored) {
Expand Down Expand Up @@ -315,7 +316,7 @@ private void trySpawnReaderListener(int readerId) {
}
}
Jedis subJedis = JedisHelper.get(uri);
ReaderListener listener = new ReaderListener(readerId, this::queueEvent, lastUpdate);
ReaderListener listener = new ReaderListener(readerId, this::queueEvents, lastUpdate);
readerListeners.add(listener);
Thread thread = new Thread(() -> {
try {
Expand All @@ -342,40 +343,42 @@ private void spawnReaderListener(int readerId) {
executorService.submit(() -> trySpawnReaderListener(readerId));
}

private void processEvent(Event event) {
logProcessEvent(event);
private void processEvents(List<Event> events) {
Connection connection = null;
Snapshot oldSnapshot = this.snapshot;
try {
connection = Database.createConnection(EnumSet.of(READ_WRITE));
Event firstEvent = event;
if (event.isUnique()) {
Optional<Event> other = Event.loadUnique(connection, event.getClass());
if (other.isPresent()) {
other.get().setRemoved(connection, true);
if (!other.get().getTime().isAfter(event.getTime())) {
firstEvent = other.get();
Event firstEvent = events.get(0);
for (Event event : events) {
logProcessEvent(event);
if (event.isUnique()) {
Optional<Event> other = Event.loadUnique(connection, event.getClass());
if (other.isPresent()) {
other.get().setRemoved(connection, true);
if (!other.get().getTime().isAfter(firstEvent.getTime())) {
firstEvent = other.get();
}
}
}
event.save(connection);
}
event.save(connection);
Snapshot snapshotToUpdateFrom = this.snapshot;
if (!firstEvent.getTime().isAfter(this.snapshot.getSnapshotTime())) {
LOG.debug("Event before current snapshot, loading snapshot before");
Optional<Snapshot> s = Snapshot.loadBefore(connection, firstEvent.getTime());
if (s.isPresent()) snapshotToUpdateFrom = s.get();
else snapshotToUpdateFrom = new Snapshot(Instant.EPOCH);
}
List<Event> events;
List<Event> eventsSinceFirstEvent;
Snapshot.deleteAfter(connection, snapshotToUpdateFrom);
LOG.debug("Updating from snapshot: {}", snapshotToUpdateFrom.getId());
if (snapshotToUpdateFrom.getId().isPresent()) {
assert snapshotToUpdateFrom.getEventId().isPresent();
events = Event.loadAfter(connection, snapshotToUpdateFrom.getSnapshotTime(), snapshotToUpdateFrom.getEventId().get());
eventsSinceFirstEvent = Event.loadAfter(connection, snapshotToUpdateFrom.getSnapshotTime(), snapshotToUpdateFrom.getEventId().get());
} else {
events = Event.loadAll(connection);
eventsSinceFirstEvent = Event.loadAll(connection);
}
for (Event e : events) {
for (Event e : eventsSinceFirstEvent) {
if (!e.isRemoved()) {
snapshotToUpdateFrom = e.apply(snapshotToUpdateFrom);
snapshotToUpdateFrom.save(connection);
Expand All @@ -386,9 +389,11 @@ private void processEvent(Event event) {
connection.commit();
// TODO: Provide a sensible message for NEW_SNAPSHOT?
statusReporter.broadcast(new StatusMessage(StatusMessage.MessageType.NEW_SNAPSHOT, "New snapshot!"));
if (eventCallbacks.containsKey(event)) {
eventCallbacks.get(event).accept(true);
eventCallbacks.remove(event);
for (Event event : events) {
if (eventCallbacks.containsKey(event)) {
eventCallbacks.get(event).accept(true);
eventCallbacks.remove(event);
}
}
} catch (SQLException | IOException e) {
LOG.error("Error when handling event!", e);
Expand Down Expand Up @@ -421,19 +426,22 @@ private void logProcessEvent(Event event) {
}

/**
* Queue a tag update for processing.
* Queue a single event for processing, with a callback
*/
private void queueEvent(Event event) {
private void queueEvent(Event event, Consumer<Boolean> callback) {
try {
eventQueue.put(event);
eventCallbacks.put(event, callback);
eventQueue.put(ImmutableList.of(event));
} catch (InterruptedException ignored) {
}
}

private void queueEvent(Event event, Consumer<Boolean> callback) {
/**
* Queue a batch of events for processing
*/
private void queueEvents(List<Event> events) {
try {
eventCallbacks.put(event, callback);
eventQueue.put(event);
eventQueue.put(events);
} catch (InterruptedException ignored) {
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.ulyssis.ipp.processor;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ulyssis.ipp.config.Config;
Expand All @@ -28,6 +29,8 @@
import redis.clients.jedis.Jedis;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
Expand All @@ -45,9 +48,9 @@ public final class ReaderListener extends JedisHelper.CallBackPubSub {
/** The Redis connection to the reader */
private final Jedis remoteJedis;
/** The processor that this ReaderListener belongs to (and will push updates to) */
private final Consumer<Event> updateConsumer;
private final Consumer<List<Event>> updateConsumer;

public ReaderListener(int id, final Consumer<Event> updateConsumer, Optional<Long> lastUpdate) {
public ReaderListener(int id, final Consumer<List<Event>> updateConsumer, Optional<Long> lastUpdate) {
this.updateConsumer = updateConsumer;
this.remoteJedis = JedisHelper.get(Config.getCurrentConfig().getReader(id).getURI());
this.lastUpdate = lastUpdate.orElse(-1L);
Expand All @@ -66,32 +69,36 @@ private void onMessageListener(String channel, String message) {
long updateId = Long.parseLong(message, 10);
List<byte[]> updates = remoteJedis.lrange(Config.getCurrentConfig().getUpdatesList().getBytes(),
lastUpdate + 1L, updateId);
for (byte[] update : updates) {
processMessage(update);
if (!updates.isEmpty()) {
processMessages(updates);
lastUpdate = updateId;
}
lastUpdate = updateId;
}

/**
* Batch process all updates on startup
*/
private void syncUpdates() {
List<byte[]> updates = remoteJedis.lrange(Config.getCurrentConfig().getUpdatesList().getBytes(), lastUpdate + 1L, -1L);
for (byte[] update : updates) {
processMessage(update);
if (!updates.isEmpty()) {
processMessages(updates);
lastUpdate += updates.size();
}
lastUpdate += updates.size();
}

/**
* Turn a JSON-represented message into a TagUpdate object, and queue it for processing.
* Turn a list of JSON-represented messages into TagUpdate objects, and queue them for processing.
*/
private void processMessage(byte[] message) {
try {
TagUpdate update = Serialization.getJsonMapper().readValue(message, TagUpdate.class);
updateConsumer.accept(new TagSeenEvent(update));
} catch (IOException e) {
LOG.error("Couldn't process update {}!", new String(message), e);
private void processMessages(List<byte[]> messages) {
List<TagSeenEvent> updates = new ArrayList<>();
for (byte[] message : messages) {
try {
TagUpdate update = Serialization.getJsonMapper().readValue(message, TagUpdate.class);
updates.add(new TagSeenEvent(update));
} catch (IOException e) {
LOG.error("Couldn't process update {}!", new String(message), e);
}
}
updateConsumer.accept(ImmutableList.copyOf(updates));
}
}