Skip to content

Commit

Permalink
Merge branch 'master' into dw/async_exec_via_search_apis
Browse files Browse the repository at this point in the history
  • Loading branch information
luk-kaminski authored Dec 13, 2024
2 parents 7ac06e6 + 5b1c7d1 commit 1081cce
Show file tree
Hide file tree
Showing 58 changed files with 750 additions and 741 deletions.
6 changes: 3 additions & 3 deletions UPGRADING.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ Alternatively the `SaveOrCancelButtons` component can be used in the edit compon

The following Java Code API changes have been made.

| File/method | Description |
|-------------------------------------|------------------------|
| `tbd` | tbd |
| File/method | Description |
|------------------------------------------------|-------------|
| `org.graylog.scheduler.JobSchedule#toDBUpdate` | removed |

## REST API Endpoint Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.AbstractCodec;
import org.graylog2.plugin.inputs.codecs.Codec;
import org.graylog2.plugin.inputs.failure.InputProcessingException;
import org.graylog2.plugin.journal.RawMessage;
import org.joda.time.DateTime;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Optional;

public class CloudTrailCodec extends AbstractCodec {
public static final String NAME = "AWSCloudTrail";
Expand All @@ -49,9 +50,8 @@ public CloudTrailCodec(@Assisted Configuration configuration, @AWSObjectMapper O
this.messageFactory = messageFactory;
}

@Nullable
@Override
public Message decode(@Nonnull RawMessage rawMessage) {
public Optional<Message> decodeSafe(@Nonnull RawMessage rawMessage) {
try {
final CloudTrailRecord record = objectMapper.readValue(rawMessage.getPayload(), CloudTrailRecord.class);
final String source = configuration.getString(Config.CK_OVERRIDE_SOURCE, "aws-cloudtrail");
Expand All @@ -61,9 +61,10 @@ public Message decode(@Nonnull RawMessage rawMessage) {
message.addField("full_message", record.getFullMessage());
message.addField(AWS.SOURCE_GROUP_IDENTIFIER, true);

return message;
return Optional.of(message);
} catch (Exception e) {
throw new RuntimeException("Could not deserialize CloudTrail record.", e);
throw InputProcessingException.create("Could not deserialize CloudTrail record.",
e, rawMessage, new String(rawMessage.getPayload(), charset));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMap;
import com.mongodb.client.model.Filters;
import jakarta.inject.Inject;
import org.bson.conversions.Bson;
import org.graylog.events.notifications.EventNotificationExecutionJob;
import org.graylog.events.processor.EventDefinitionDto;
import org.graylog.events.processor.EventProcessorExecutionJob;
Expand All @@ -29,9 +32,6 @@
import org.graylog.scheduler.JobTriggerDto;
import org.graylog.scheduler.JobTriggerStatus;
import org.joda.time.DateTime;
import org.mongojack.DBQuery;

import jakarta.inject.Inject;

import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -80,10 +80,10 @@ private Map<String, List<JobTriggerDto>> getJobTriggers(Map<String, List<JobDefi
}

private long getQueuedNotifications(EventDefinitionDto eventDefinition) {
final DBQuery.Query query = DBQuery.and(
DBQuery.is("status", JobTriggerStatus.RUNNABLE),
DBQuery.is("data.type", EventNotificationExecutionJob.TYPE_NAME),
DBQuery.is("data.event_dto.event_definition_id", eventDefinition.id()));
final Bson query = Filters.and(
Filters.eq("status", JobTriggerStatus.RUNNABLE),
Filters.eq("data.type", EventNotificationExecutionJob.TYPE_NAME),
Filters.eq("data.event_dto.event_definition_id", eventDefinition.id()));

return jobTriggerService.countByQuery(query);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.mongodb.BasicDBObject;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.model.Updates;
import jakarta.inject.Inject;
import org.bson.types.ObjectId;
import org.graylog2.bindings.providers.MongoJackObjectMapperProvider;
import org.graylog2.database.MongoConnection;
import org.bson.conversions.Bson;
import org.graylog2.database.MongoCollections;
import org.joda.time.DateTime;
import org.mongojack.DBQuery;
import org.mongojack.DBUpdate;
import org.mongojack.JacksonDBCollection;
import org.mongojack.UpdateOperationValue;
import org.mongojack.internal.update.SingleUpdateOperationValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,34 +37,27 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.requireNonNull;
import static org.graylog.events.processor.EventProcessorStateDto.FIELD_EVENT_DEFINITION_ID;
import static org.graylog.events.processor.EventProcessorStateDto.FIELD_MAX_PROCESSED_TIMESTAMP;
import static org.graylog.events.processor.EventProcessorStateDto.FIELD_MIN_PROCESSED_TIMESTAMP;

/**
* Manages database state for {@link EventProcessor}s.
*/
// This class does NOT use PaginatedDbService because we don't want to allow overwriting records via "save()"
// and other methods.
public class DBEventProcessorStateService {
private static final Logger LOG = LoggerFactory.getLogger(DBEventProcessorStateService.class);
private static final String COLLECTION_NAME = "event_processor_state";

private final JacksonDBCollection<EventProcessorStateDto, ObjectId> db;
private final MongoCollection<EventProcessorStateDto> collection;

@Inject
public DBEventProcessorStateService(MongoConnection mongoConnection,
MongoJackObjectMapperProvider mapper) {
this.db = JacksonDBCollection.wrap(mongoConnection.getDatabase().getCollection(COLLECTION_NAME),
EventProcessorStateDto.class,
ObjectId.class,
mapper.get());
public DBEventProcessorStateService(MongoCollections mongoCollections) {
collection = mongoCollections.collection(COLLECTION_NAME, EventProcessorStateDto.class);

// There should only be one state document for each event processor.
db.createIndex(new BasicDBObject(FIELD_EVENT_DEFINITION_ID, 1), new BasicDBObject("unique", true));
db.createIndex(new BasicDBObject(FIELD_MIN_PROCESSED_TIMESTAMP, 1));
db.createIndex(new BasicDBObject(FIELD_MAX_PROCESSED_TIMESTAMP, 1));
collection.createIndex(Indexes.ascending(FIELD_EVENT_DEFINITION_ID), new IndexOptions().unique(true));
collection.createIndex(Indexes.ascending(FIELD_MIN_PROCESSED_TIMESTAMP));
collection.createIndex(Indexes.ascending(FIELD_MAX_PROCESSED_TIMESTAMP));
}

/**
Expand All @@ -77,7 +70,7 @@ public DBEventProcessorStateService(MongoConnection mongoConnection,
Optional<EventProcessorStateDto> findByEventDefinitionId(String eventDefinitionId) {
checkArgument(!isNullOrEmpty(eventDefinitionId), "eventDefinitionId cannot be null or empty");

return Optional.ofNullable(db.findOne(DBQuery.is(FIELD_EVENT_DEFINITION_ID, eventDefinitionId)));
return Optional.ofNullable(collection.find(Filters.eq(FIELD_EVENT_DEFINITION_ID, eventDefinitionId)).first());
}

/**
Expand All @@ -92,12 +85,12 @@ public ImmutableSet<EventProcessorStateDto> findByEventDefinitionsAndMaxTimestam
checkArgument(eventDefinitionIds != null && !eventDefinitionIds.isEmpty(), "eventDefinitionIds cannot be null or empty");
checkArgument(maxTimestamp != null, "maxTimestamp cannot be null");

final DBQuery.Query query = DBQuery.and(
DBQuery.in(FIELD_EVENT_DEFINITION_ID, eventDefinitionIds),
DBQuery.greaterThanEquals(FIELD_MAX_PROCESSED_TIMESTAMP, maxTimestamp)
final Bson query = Filters.and(
Filters.in(FIELD_EVENT_DEFINITION_ID, eventDefinitionIds),
Filters.gte(FIELD_MAX_PROCESSED_TIMESTAMP, maxTimestamp)
);

return ImmutableSet.copyOf(db.find(query).iterator());
return ImmutableSet.copyOf(collection.find(query));
}

/**
Expand Down Expand Up @@ -133,31 +126,13 @@ public Optional<EventProcessorStateDto> setState(String eventDefinitionId,
// Example: If the minProcessedTimestamp argument is newer than the value in the existing record, we don't
// want to change it. The other way around for the maxProcessedTimestamp.
// That's why we are using the $min and $max operations for the update query.
final DBUpdate.Builder update = DBUpdate.set(FIELD_EVENT_DEFINITION_ID, eventDefinitionId)
// Our current mongojack implementation doesn't offer $min/$max helper
.addOperation("$min", FIELD_MIN_PROCESSED_TIMESTAMP, updateValue(minProcessedTimestamp))
.addOperation("$max", FIELD_MAX_PROCESSED_TIMESTAMP, updateValue(maxProcessedTimestamp));

return Optional.ofNullable(db.findAndModify(
// We have a unique index on the eventDefinitionId so this query is enough
DBQuery.is(FIELD_EVENT_DEFINITION_ID, eventDefinitionId),
null,
null,
false,
update,
true, // We want to return the updated document to the caller
true));
}
final Bson update = Updates.combine(
Updates.set(FIELD_EVENT_DEFINITION_ID, eventDefinitionId),
Updates.min(FIELD_MIN_PROCESSED_TIMESTAMP, minProcessedTimestamp),
Updates.max(FIELD_MAX_PROCESSED_TIMESTAMP, maxProcessedTimestamp));

/**
* Only used to create an {@link UpdateOperationValue} for
* {@link DBUpdate.Builder#addOperation(String, String, UpdateOperationValue)}.
*
* @param value the object value
* @return the update operation value
*/
private SingleUpdateOperationValue updateValue(Object value) {
return new SingleUpdateOperationValue(false, true, value);
return Optional.ofNullable(collection.findOneAndUpdate(Filters.eq(FIELD_EVENT_DEFINITION_ID, eventDefinitionId),
update, new FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER)));
}

/**
Expand All @@ -167,8 +142,6 @@ private SingleUpdateOperationValue updateValue(Object value) {
* @return the number of objects that have been deleted
*/
public int deleteByEventDefinitionId(String id) {
return findByEventDefinitionId(id)
.map(dto -> db.removeById(new ObjectId(requireNonNull(dto.id()))).getN())
.orElse(0);
return (int) collection.deleteOne(Filters.eq(FIELD_EVENT_DEFINITION_ID, id)).getDeletedCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.graylog.events.processor;

import com.mongodb.client.model.Filters;
import jakarta.inject.Inject;
import org.bson.conversions.Bson;
import org.graylog.events.event.Event;
import org.graylog.events.event.EventWithContext;
import org.graylog.events.notifications.EventNotificationExecutionJob;
Expand All @@ -28,12 +31,9 @@
import org.graylog2.database.entities.DefaultEntityScope;
import org.graylog2.plugin.database.users.User;
import org.joda.time.DateTime;
import org.mongojack.DBQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -206,7 +206,7 @@ public boolean deleteImmutable(String eventDefinitionId) {

private boolean doDelete(String eventDefinitionId, Supplier<Boolean> deleteSupplier) {
final Optional<EventDefinitionDto> optionalEventDefinition = eventDefinitionService.get(eventDefinitionId);
if (!optionalEventDefinition.isPresent()) {
if (optionalEventDefinition.isEmpty()) {
return false;
}

Expand Down Expand Up @@ -387,9 +387,9 @@ private void deleteJobDefinitionAndTrigger(JobDefinitionDto jobDefinition, Event
}

private void deleteNotificationJobTriggers(EventDefinitionDto eventDefinition) {
final DBQuery.Query query = DBQuery.and(
DBQuery.is("data.type", EventNotificationExecutionJob.TYPE_NAME),
DBQuery.is("data.event_dto.event_definition_id", eventDefinition.id()));
final Bson query = Filters.and(
Filters.eq("data.type", EventNotificationExecutionJob.TYPE_NAME),
Filters.eq("data.event_dto.event_definition_id", eventDefinition.id()));

final int numberOfDeletedTriggers = jobTriggerService.deleteByQuery(query);

Expand Down Expand Up @@ -427,7 +427,7 @@ private void updateJobTrigger(EventDefinitionDto eventDefinition,
JobDefinitionDto oldJobDefinition,
EventProcessorSchedulerConfig schedulerConfig) {
final Optional<JobTriggerDto> optionalOldJobTrigger = getJobTrigger(jobDefinition);
if (!optionalOldJobTrigger.isPresent()) {
if (optionalOldJobTrigger.isEmpty()) {
// Nothing to do if there are no job triggers to update
return;
}
Expand Down Expand Up @@ -460,7 +460,7 @@ private void updateJobTrigger(EventDefinitionDto eventDefinition,

private void deleteJobTrigger(JobDefinitionDto jobDefinition, EventDefinitionDto eventDefinition) {
final Optional<JobTriggerDto> optionalJobTrigger = getJobTrigger(jobDefinition);
if (!optionalJobTrigger.isPresent()) {
if (optionalJobTrigger.isEmpty()) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.auto.value.AutoValue;
import org.graylog2.database.MongoEntity;
import org.joda.time.DateTime;
import org.mongojack.Id;
import org.mongojack.ObjectId;
Expand All @@ -28,7 +29,7 @@

@AutoValue
@JsonDeserialize(builder = EventProcessorStateDto.Builder.class)
public abstract class EventProcessorStateDto {
public abstract class EventProcessorStateDto implements MongoEntity {
private static final String FIELD_ID = "id";
static final String FIELD_EVENT_DEFINITION_ID = "event_definition_id";
static final String FIELD_MIN_PROCESSED_TIMESTAMP = "min_processed_timestamp";
Expand Down Expand Up @@ -78,4 +79,4 @@ public static Builder create() {

public abstract EventProcessorStateDto build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

public class FlowLogMessage {
private static final Logger LOG = LoggerFactory.getLogger(FlowLogMessage.class);

Expand Down Expand Up @@ -74,13 +72,11 @@ private FlowLogMessage(DateTime timestamp,
this.logStatus = logStatus;
}

@Nullable
public static FlowLogMessage fromLogEvent(final KinesisLogEntry logEvent) {
final String[] parts = logEvent.message().split(" ");

if (parts.length != 14) {
LOG.warn("Received FlowLog message with not exactly 14 fields. Skipping. Message was: [{}]", logEvent.message());
return null;
throw new RuntimeException("Received FlowLog message with not exactly 14 fields. Skipping. Message was: [%s]".formatted(logEvent.message()));
}

return new FlowLogMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.graylog.integrations.aws.codecs;

import com.google.inject.assistedinject.Assisted;
import jakarta.inject.Inject;
import org.graylog.integrations.aws.AWSMessageType;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.configuration.Configuration;
Expand All @@ -28,17 +29,15 @@
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.AbstractCodec;
import org.graylog2.plugin.inputs.codecs.Codec;
import org.graylog2.plugin.inputs.failure.InputProcessingException;
import org.graylog2.plugin.journal.RawMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import jakarta.inject.Inject;

import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class AWSCodec extends AbstractCodec {
Expand All @@ -64,28 +63,21 @@ public AWSCodec(@Assisted Configuration configuration,
this.availableCodecs = availableCodecs;
}

@Nullable
@Override
public Message decode(@Nonnull RawMessage rawMessage) {
public Optional<Message> decodeSafe(@Nonnull RawMessage rawMessage) {

// Load the codec by message type.
final AWSMessageType awsMessageType = AWSMessageType.valueOf(configuration.getString(CK_AWS_MESSAGE_TYPE));
final Codec.Factory<? extends Codec> codecFactory = this.availableCodecs.get(awsMessageType.getCodecName());
if (codecFactory == null) {
LOG.error("A codec with name [{}] could not be found.", awsMessageType.getCodecName());
return null;
throw InputProcessingException.create("A codec with name [%s] could not be found.".formatted(awsMessageType.getCodecName()),
rawMessage, new String(rawMessage.getPayload(), charset));
}

final Codec codec = codecFactory.create(configuration);

// Parse the message with the specified codec.
final Message message = codec.decode(new RawMessage(rawMessage.getPayload()));
if (message == null) {
LOG.error("Failed to decode message for codec [{}].", codec.getName());
return null;
}

return message;
return codec.decodeSafe(new RawMessage(rawMessage.getPayload()));
}

@Override
Expand Down
Loading

0 comments on commit 1081cce

Please sign in to comment.