Skip to content

Commit

Permalink
Fix handling of missing pipeline connection. (#21115)
Browse files Browse the repository at this point in the history
* Fix handling of missing pipeline connection.

* Adding test case.
  • Loading branch information
dennisoelkers authored Dec 5, 2024
1 parent 465cc2d commit be29310
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

import java.util.Collection;
import java.util.Map;
import java.util.Set;

import static org.assertj.core.api.Assertions.assertThat;
import static org.graylog2.plugin.streams.Stream.DEFAULT_EVENTS_STREAM_ID;
import static org.graylog2.plugin.streams.Stream.DEFAULT_STREAM_ID;
import static org.graylog2.plugin.streams.Stream.DEFAULT_SYSTEM_EVENTS_STREAM_ID;

@ContainerMatrixTestsConfiguration
public class StreamsPipelinesIT {
Expand Down Expand Up @@ -64,26 +68,55 @@ void afterAll() {
api.indices().deleteIndexSet(this.indexSetId, true);
}

private record BulkPipelinesRequest(Collection<String> streamIds) {}

@ContainerMatrixTest
void bulkRetrievalOfPipelineConnections() throws Exception {
final var result = api.post("/streams/pipelines", Map.of("stream_ids", Set.of(stream1Id, stream2Id, stream3Id)), 200)
final var result = api.post("/streams/pipelines", new BulkPipelinesRequest(Set.of(stream1Id, stream2Id, stream3Id)), 200)
.extract().body().jsonPath();
final var pipeline1 = Map.of("id", pipeline1Id, "title", pipeline1Title);
final var pipeline2 = Map.of("id", pipeline2Id, "title", pipeline2Title);
final var pipeline1 = pipelineSummary(pipeline1Id, pipeline1Title);
final var pipeline2 = pipelineSummary(pipeline2Id, pipeline2Title);

assertThat(result.getList(stream1Id)).containsExactlyInAnyOrder(pipeline1, pipeline2);
assertThat(result.getList(stream2Id)).containsExactlyInAnyOrder(pipeline1);
assertThat(result.getList(stream3Id)).containsExactlyInAnyOrder(pipeline2);
}

@ContainerMatrixTest
void bulkRetrievalOfPipelineConnectionsForBuiltinStreams() throws Exception {
final var result = api.post("/streams/pipelines", new BulkPipelinesRequest(Set.of(DEFAULT_STREAM_ID, DEFAULT_EVENTS_STREAM_ID, DEFAULT_SYSTEM_EVENTS_STREAM_ID)), 200)
.extract().body().jsonPath();

assertThat(result.getList(DEFAULT_STREAM_ID)).isEmpty();
assertThat(result.getList(DEFAULT_EVENTS_STREAM_ID)).isEmpty();
assertThat(result.getList(DEFAULT_SYSTEM_EVENTS_STREAM_ID)).isEmpty();
}

@ContainerMatrixTest
void bulkRetrievalOfPipelineConnectionsForDanglingReferences() throws Exception {
final var defaultIndexSet = api.indices().defaultIndexSetId();
final var streamId = api.streams().createStream("Stream with dangling pipeline reference", defaultIndexSet);
final var pipelineId = api.pipelines().create("A pipeline which is about to get deleted", Set.of(streamId));
api.pipelines().delete(pipelineId);

final var result = api.post("/streams/pipelines", new BulkPipelinesRequest(Set.of(streamId)), 200)
.extract().body().jsonPath();

assertThat(result.getList(streamId)).isEmpty();
}

@ContainerMatrixTest
void retrievePipelineConnectionsForASingleStream() {
var result = api.get("/streams/" + stream1Id + "/pipelines", 200)
.extract().body().jsonPath();

final var pipeline1 = Map.of("id", pipeline1Id, "title", pipeline1Title);
final var pipeline2 = Map.of("id", pipeline2Id, "title", pipeline2Title);
final var pipeline1 = pipelineSummary(pipeline1Id, pipeline1Title);
final var pipeline2 = pipelineSummary(pipeline2Id, pipeline2Title);

assertThat(result.getList("")).containsExactlyInAnyOrder(pipeline1, pipeline2);
}

private Map<String, String> pipelineSummary(String id, String title) {
return Map.of("id", id, "title", title);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ public Map<String, List<PipelineCompactSource>> getConnectedPipelinesForStreams(
.map(PipelineConnections::pipelineIds)
.orElse(Set.of());
return pipelinesForStream.stream()
.map(pipelines::get)
.flatMap(pipeline -> Optional.ofNullable(pipelines.get(pipeline)).stream())
.map(pipeline -> PipelineCompactSource.create(pipeline.id(), pipeline.title()))
.toList();
}));
Expand Down

0 comments on commit be29310

Please sign in to comment.