From edfa58a1b646c213db472fde45dbfff7c6256d30 Mon Sep 17 00:00:00 2001 From: mohitjha-elastic Date: Tue, 19 Nov 2024 17:46:11 +0530 Subject: [PATCH 1/7] Add support of crowdstrike event stream via streaming. --- .../crowdstrike/_dev/build/docs/README.md | 27 ++++++-- packages/crowdstrike/changelog.yml | 8 +++ .../falcon/agent/stream/streaming.yml.hbs | 29 ++++++++ .../data_stream/falcon/manifest.yml | 67 +++++++++++++++++++ packages/crowdstrike/docs/README.md | 27 ++++++-- packages/crowdstrike/manifest.yml | 7 +- 6 files changed, 153 insertions(+), 12 deletions(-) create mode 100644 packages/crowdstrike/data_stream/falcon/agent/stream/streaming.yml.hbs diff --git a/packages/crowdstrike/_dev/build/docs/README.md b/packages/crowdstrike/_dev/build/docs/README.md index 93c1161a60b..764ae6cbc7a 100644 --- a/packages/crowdstrike/_dev/build/docs/README.md +++ b/packages/crowdstrike/_dev/build/docs/README.md @@ -1,24 +1,28 @@ # CrowdStrike Integration -The [CrowdStrike](https://www.crowdstrike.com/) Falcon integration allows you to easily connect your CrowdStrike Falcon platform to Elastic for seamless onboarding of alerts and telemetry from CrowdStrike Falcon and Falcon Data Replicator. Elastic Security can leverage this data for security analytics including correlation, visualization and incident response. It provides support using three different modes for integrating CrowdStrike to the Elastic: +The [CrowdStrike](https://www.crowdstrike.com/) integration allows you to easily connect your CrowdStrike Falcon platform to Elastic for seamless onboarding of alerts and telemetry from CrowdStrike Falcon and Falcon Data Replicator. Elastic Security can leverage this data for security analytics including correlation, visualization and incident response. It provides support using four different modes for integrating CrowdStrike to the Elastic: -1. Falcon SIEM Connector: This is a pre-built integration designed to connect CrowdStrike Falcon with Security Information and Event Management (SIEM) systems. It streamlines the flow of security data from CrowdStrike Falcon to the SIEM, providing a standardized and structured way of feeding information into the SIEM platform. It includes the following datasets for receiving logs: +1. **Falcon SIEM Connector**: This is a pre-built integration designed to connect CrowdStrike Falcon with Security Information and Event Management (SIEM) systems. It streamlines the flow of security data from CrowdStrike Falcon to the SIEM, providing a standardized and structured way of feeding information into the SIEM platform. It includes the following datasets for receiving logs: - `falcon` dataset: consists of endpoint data and Falcon platform audit data forwarded from [Falcon SIEM Connector](https://www.crowdstrike.com/blog/tech-center/integrate-with-your-siem/). -2. CrowdStrike REST API: This provides a programmatic interface to interact with the CrowdStrike Falcon platform. It allows users to perform various operations such as querying information about unified alerts and hosts/devices. It includes the following datasets for receiving logs: +2. **CrowdStrike REST API**: This provides a programmatic interface to interact with the CrowdStrike Falcon platform. It allows users to perform various operations such as querying information about unified alerts and hosts/devices. It includes the following datasets for receiving logs: - `alert` dataset: It is typically used to retrieve detailed information about unified alerts generated by the CrowdStrike Falcon platform, via Falcon Intelligence Alert API - `/alerts/entities/alerts/v2`. - `host` dataset: It retrieves all the hosts/devices in your environment providing information such as device metadata, configuration, and status generated by the CrowdStrike Falcon platform, via Falcon Intelligence Host/Device API - `/devices/entities/devices/v2`. It is more focused to provide the management and monitoring information of devices such as login details, status, policies, configuration etc. -3. Falcon Data Replicator: This Collect events in near real time from your endpoints and cloud workloads, identities and data. CrowdStrike Falcon Data Replicator (FDR) enables you with actionable insights to improve SOC performance. FDR contains near real-time data collected by the Falcon platform's single, lightweight agent. It includes the following datasets for receiving logs: +3. **Falcon Data Replicator**: This Collect events in near real time from your endpoints and cloud workloads, identities and data. CrowdStrike Falcon Data Replicator (FDR) enables you with actionable insights to improve SOC performance. FDR contains near real-time data collected by the Falcon platform's single, lightweight agent. It includes the following datasets for receiving logs: - `fdr` dataset: consists of logs forwarded using the [Falcon Data Replicator](https://github.com/CrowdStrike/FDR). +4. **CrowdStrike Event Stream**: This streams security logs from CrowdStrike Event Stream, including authentication activity, cloud security posture management (CSPM), firewall logs, user activity, and XDR data. It captures real-time security events like user logins, cloud environment changes, network traffic, and advanced threat detections. The streaming integration provides continuous monitoring and analysis for proactive threat detection. It enhances visibility into user behavior, network security, and overall system health. This setup enables faster response capabilities to emerging security incidents. It includes the following datasets for receiving logs: + +- `falcon` dataset: consists of streaming data forwarded from CrowdStrike Event Stream. + ## Compatibility -This integration is compatible with both CrowdStrike Falcon SIEM-Connector-v2.0 and REST API. +This integration is compatible with CrowdStrike Falcon SIEM-Connector-v2.0, REST API, and CrowdStrike Event Streaming. For Rest API support, this module has been tested against the **CrowdStrike API Version v1/v2**. ## Setup @@ -35,6 +39,19 @@ For Rest API support, this module has been tested against the **CrowdStrike API | Alert | read:alert | | Host | read:host | +### To collect data from CrowdStrike Event Stream, the following parameters from your CrowdStrike instance are required: + +1. Client ID +2. Client Secret +3. Token URL +4. API Endpoint URL +5. CrowdStrike App ID +6. Required scopes for event stream: + + | Data Stream | Scope | + | ------------- | ------------------- | + | Event Stream | read: Event streams | + ## Logs ### Alert diff --git a/packages/crowdstrike/changelog.yml b/packages/crowdstrike/changelog.yml index 7fc934ed11c..c9a3dc6d3a1 100644 --- a/packages/crowdstrike/changelog.yml +++ b/packages/crowdstrike/changelog.yml @@ -1,4 +1,12 @@ # newer versions go on top +- version: "1.46.0" + changes: + - description: Add Support of CrowdStrike Event Stream. + type: enhancement + link: https://github.com/elastic/integrations/pull/1 + - description: Update the minimum kibana version to 8.16.0. + type: enhancement + link: https://github.com/elastic/integrations/pull/1 - version: "1.45.0" changes: - description: Add support for FQL queries in `alert` and `host` data streams. diff --git a/packages/crowdstrike/data_stream/falcon/agent/stream/streaming.yml.hbs b/packages/crowdstrike/data_stream/falcon/agent/stream/streaming.yml.hbs new file mode 100644 index 00000000000..932c268eec4 --- /dev/null +++ b/packages/crowdstrike/data_stream/falcon/agent/stream/streaming.yml.hbs @@ -0,0 +1,29 @@ +stream_type: crowdstrike +url: {{url}}/sensors/entities/datafeed/v2 +auth: + client_id: {{client_id}} + client_secret: {{client_secret}} + token_url: {{token_url}} +crowdstrike_app_id: {{app_id}} +redact: + fields: ~ +program: | + bytes(state.response).decode_json().as(body,{ + "events": { + "message": body.encode_json(), + } + }) +tags: +{{#if preserve_original_event}} + - preserve_original_event +{{/if}} +{{#each tags as |tag|}} + - {{tag}} +{{/each}} +{{#contains "forwarded" tags}} +publisher_pipeline.disable_host: true +{{/contains}} +processors: +{{#if processors}} +{{processors}} +{{/if}} \ No newline at end of file diff --git a/packages/crowdstrike/data_stream/falcon/manifest.yml b/packages/crowdstrike/data_stream/falcon/manifest.yml index 6f1a96ca600..e8b5dc56d52 100644 --- a/packages/crowdstrike/data_stream/falcon/manifest.yml +++ b/packages/crowdstrike/data_stream/falcon/manifest.yml @@ -41,3 +41,70 @@ streams: template_path: log.yml.hbs title: Crowdstrike falcon logs (log) description: Collect Crowdstrike falcon logs using log input + - input: streaming + template_path: streaming.yml.hbs + title: CrowdStrike Falcon Logs + description: Collect Falcon logs from CrowdStrike Event Stream. + enabled: false + vars: + - name: url + type: text + title: URL + description: Base URL of the CrowdStrike API. Defaults to https://api.crowdstrike.com. + default: https://api.crowdstrike.com + required: true + show_user: true + - name: token_url + type: text + title: Token URL + description: Token URL of CrowdStrike. + default: https://api.crowdstrike.com/oauth2/token + required: true + show_user: false + - name: client_id + type: text + title: Client ID + description: Client ID for the CrowdStrike. + multi: false + required: true + show_user: true + - name: client_secret + type: password + title: Client Secret + description: Client Secret for the CrowdStrike. + multi: false + required: true + show_user: true + secret: true + - name: app_id + type: text + title: App ID + description: App ID for the CrowdStrike. + multi: false + required: true + show_user: true + - name: tags + type: text + title: Tags + multi: true + required: true + show_user: false + default: + - forwarded + - crowdstrike-falcon + - name: preserve_original_event + required: true + show_user: true + title: Preserve original event + description: Preserves a raw copy of the original event, added to the field `event.original`. + type: bool + multi: false + default: false + - name: processors + type: yaml + title: Processors + multi: false + required: false + show_user: false + description: > + Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the logs are parsed. See [Processors](https://www.elastic.co/guide/en/beats/filebeat/current/filtering-and-enhancing-data.html) for details. diff --git a/packages/crowdstrike/docs/README.md b/packages/crowdstrike/docs/README.md index 60e36a421f9..ac5fab19177 100644 --- a/packages/crowdstrike/docs/README.md +++ b/packages/crowdstrike/docs/README.md @@ -1,24 +1,28 @@ # CrowdStrike Integration -The [CrowdStrike](https://www.crowdstrike.com/) Falcon integration allows you to easily connect your CrowdStrike Falcon platform to Elastic for seamless onboarding of alerts and telemetry from CrowdStrike Falcon and Falcon Data Replicator. Elastic Security can leverage this data for security analytics including correlation, visualization and incident response. It provides support using three different modes for integrating CrowdStrike to the Elastic: +The [CrowdStrike](https://www.crowdstrike.com/) integration allows you to easily connect your CrowdStrike Falcon platform to Elastic for seamless onboarding of alerts and telemetry from CrowdStrike Falcon and Falcon Data Replicator. Elastic Security can leverage this data for security analytics including correlation, visualization and incident response. It provides support using four different modes for integrating CrowdStrike to the Elastic: -1. Falcon SIEM Connector: This is a pre-built integration designed to connect CrowdStrike Falcon with Security Information and Event Management (SIEM) systems. It streamlines the flow of security data from CrowdStrike Falcon to the SIEM, providing a standardized and structured way of feeding information into the SIEM platform. It includes the following datasets for receiving logs: +1. **Falcon SIEM Connector**: This is a pre-built integration designed to connect CrowdStrike Falcon with Security Information and Event Management (SIEM) systems. It streamlines the flow of security data from CrowdStrike Falcon to the SIEM, providing a standardized and structured way of feeding information into the SIEM platform. It includes the following datasets for receiving logs: - `falcon` dataset: consists of endpoint data and Falcon platform audit data forwarded from [Falcon SIEM Connector](https://www.crowdstrike.com/blog/tech-center/integrate-with-your-siem/). -2. CrowdStrike REST API: This provides a programmatic interface to interact with the CrowdStrike Falcon platform. It allows users to perform various operations such as querying information about unified alerts and hosts/devices. It includes the following datasets for receiving logs: +2. **CrowdStrike REST API**: This provides a programmatic interface to interact with the CrowdStrike Falcon platform. It allows users to perform various operations such as querying information about unified alerts and hosts/devices. It includes the following datasets for receiving logs: - `alert` dataset: It is typically used to retrieve detailed information about unified alerts generated by the CrowdStrike Falcon platform, via Falcon Intelligence Alert API - `/alerts/entities/alerts/v2`. - `host` dataset: It retrieves all the hosts/devices in your environment providing information such as device metadata, configuration, and status generated by the CrowdStrike Falcon platform, via Falcon Intelligence Host/Device API - `/devices/entities/devices/v2`. It is more focused to provide the management and monitoring information of devices such as login details, status, policies, configuration etc. -3. Falcon Data Replicator: This Collect events in near real time from your endpoints and cloud workloads, identities and data. CrowdStrike Falcon Data Replicator (FDR) enables you with actionable insights to improve SOC performance. FDR contains near real-time data collected by the Falcon platform's single, lightweight agent. It includes the following datasets for receiving logs: +3. **Falcon Data Replicator**: This Collect events in near real time from your endpoints and cloud workloads, identities and data. CrowdStrike Falcon Data Replicator (FDR) enables you with actionable insights to improve SOC performance. FDR contains near real-time data collected by the Falcon platform's single, lightweight agent. It includes the following datasets for receiving logs: - `fdr` dataset: consists of logs forwarded using the [Falcon Data Replicator](https://github.com/CrowdStrike/FDR). +4. **CrowdStrike Event Stream**: This streams security logs from CrowdStrike Event Stream, including authentication activity, cloud security posture management (CSPM), firewall logs, user activity, and XDR data. It captures real-time security events like user logins, cloud environment changes, network traffic, and advanced threat detections. The streaming integration provides continuous monitoring and analysis for proactive threat detection. It enhances visibility into user behavior, network security, and overall system health. This setup enables faster response capabilities to emerging security incidents. It includes the following datasets for receiving logs: + +- `falcon` dataset: consists of streaming data forwarded from CrowdStrike Event Stream. + ## Compatibility -This integration is compatible with both CrowdStrike Falcon SIEM-Connector-v2.0 and REST API. +This integration is compatible with CrowdStrike Falcon SIEM-Connector-v2.0, REST API, and CrowdStrike Event Streaming. For Rest API support, this module has been tested against the **CrowdStrike API Version v1/v2**. ## Setup @@ -35,6 +39,19 @@ For Rest API support, this module has been tested against the **CrowdStrike API | Alert | read:alert | | Host | read:host | +### To collect data from CrowdStrike Event Stream, the following parameters from your CrowdStrike instance are required: + +1. Client ID +2. Client Secret +3. Token URL +4. API Endpoint URL +5. CrowdStrike App ID +6. Required scopes for event stream: + + | Data Stream | Scope | + | ------------- | ------------------- | + | Event Stream | read: Event streams | + ## Logs ### Alert diff --git a/packages/crowdstrike/manifest.yml b/packages/crowdstrike/manifest.yml index d1c5ebeca5b..ceb93405aa2 100644 --- a/packages/crowdstrike/manifest.yml +++ b/packages/crowdstrike/manifest.yml @@ -1,13 +1,13 @@ name: crowdstrike title: CrowdStrike -version: "1.45.0" +version: "1.46.0" description: Collect logs from Crowdstrike with Elastic Agent. type: integration format_version: "3.0.3" categories: [security, edr_xdr] conditions: kibana: - version: "^8.13.0" + version: ^8.16.0 icons: - src: /img/logo-integrations-crowdstrike.svg title: CrowdStrike @@ -53,6 +53,9 @@ policy_templates: - type: aws-s3 title: "Collect CrowdStrike Falcon Data Replicator logs (input: aws-s3)" description: "Collecting logs from CrowdStrike Falcon Data Replicator (input: aws-s3)" + - type: streaming + title: Collect CrowdStrike Falcon Logs via Event Stream + description: Collecting CrowdStrike Falcon Logs via Event Stream. - type: cel title: Collect CrowdStrike logs via API description: Collecting CrowdStrike logs via API. From 2679eec3473c3fa7193f70a72f88cd2e992ebab1 Mon Sep 17 00:00:00 2001 From: mohitjha-elastic Date: Tue, 19 Nov 2024 18:00:08 +0530 Subject: [PATCH 2/7] Update the changelog entry. --- packages/crowdstrike/changelog.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/crowdstrike/changelog.yml b/packages/crowdstrike/changelog.yml index c9a3dc6d3a1..4735bc6f3d1 100644 --- a/packages/crowdstrike/changelog.yml +++ b/packages/crowdstrike/changelog.yml @@ -3,10 +3,10 @@ changes: - description: Add Support of CrowdStrike Event Stream. type: enhancement - link: https://github.com/elastic/integrations/pull/1 + link: https://github.com/elastic/integrations/pull/11773 - description: Update the minimum kibana version to 8.16.0. type: enhancement - link: https://github.com/elastic/integrations/pull/1 + link: https://github.com/elastic/integrations/pull/11773 - version: "1.45.0" changes: - description: Add support for FQL queries in `alert` and `host` data streams. From e44c3fd24af9626e8798a749763d686be2c03c4b Mon Sep 17 00:00:00 2001 From: mohitjha-elastic Date: Wed, 20 Nov 2024 16:21:31 +0530 Subject: [PATCH 3/7] Resolve review comments by @efd6. Add new line at last in input file. Remove the byte conversion. --- .../data_stream/falcon/agent/stream/streaming.yml.hbs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/crowdstrike/data_stream/falcon/agent/stream/streaming.yml.hbs b/packages/crowdstrike/data_stream/falcon/agent/stream/streaming.yml.hbs index 932c268eec4..79eab67dcaf 100644 --- a/packages/crowdstrike/data_stream/falcon/agent/stream/streaming.yml.hbs +++ b/packages/crowdstrike/data_stream/falcon/agent/stream/streaming.yml.hbs @@ -8,7 +8,7 @@ crowdstrike_app_id: {{app_id}} redact: fields: ~ program: | - bytes(state.response).decode_json().as(body,{ + state.response.decode_json().as(body,{ "events": { "message": body.encode_json(), } @@ -26,4 +26,4 @@ publisher_pipeline.disable_host: true processors: {{#if processors}} {{processors}} -{{/if}} \ No newline at end of file +{{/if}} From 4d5507c0f2eea0f436b8a8ff67581f2bde0a41e1 Mon Sep 17 00:00:00 2001 From: mohitjha-elastic Date: Mon, 25 Nov 2024 16:05:31 +0530 Subject: [PATCH 4/7] Resolve review comments by @efd6 Add system test. --- .../_dev/deploy/docker/docker-compose.yml | 11 ++ .../docker/streaming-mock-service/go.mod | 3 + .../docker/streaming-mock-service/main.go | 169 ++++++++++++++++++ .../test/system/test-streaming-config.yml | 12 ++ .../data_stream/falcon/sample_event.json | 90 ++++------ packages/crowdstrike/docs/README.md | 90 ++++------ 6 files changed, 259 insertions(+), 116 deletions(-) create mode 100644 packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/go.mod create mode 100644 packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/main.go create mode 100644 packages/crowdstrike/data_stream/falcon/_dev/test/system/test-streaming-config.yml diff --git a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/docker-compose.yml b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/docker-compose.yml index 0f16cbc2242..0895426779c 100644 --- a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/docker-compose.yml +++ b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/docker-compose.yml @@ -6,3 +6,14 @@ services: - ./sample_logs:/sample_logs:ro - ${SERVICE_LOGS_DIR}:/var/log command: /bin/sh -c "cp /sample_logs/* /var/log/" + crowdstrike-streaming: + image: golang:1.21-alpine + hostname: crowdstrike + working_dir: /app + volumes: + - ./streaming-mock-service:/app + ports: + - "8090:8090" + environment: + PORT: '8090' + command: ["go", "run", "main.go"] diff --git a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/go.mod b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/go.mod new file mode 100644 index 00000000000..9cd62400e59 --- /dev/null +++ b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/go.mod @@ -0,0 +1,3 @@ +module streaming-mock-service + +go 1.21.3 diff --git a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/main.go b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/main.go new file mode 100644 index 00000000000..767546fe511 --- /dev/null +++ b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/main.go @@ -0,0 +1,169 @@ +package main + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "strings" +) + +// Mock OAuth2 client credentials +var mockClientID = "xxxx" +var mockClientSecret = "xxxx" +var datafeedCalled bool = false + +// Mock OAuth2 token +var mockAccessToken = "abcd" +var mockSessionToken = "xyz" + +// Mock dataFeedURL (simulating a URL to fetch events from) +var mockDataFeedURL = "http://svc-crowdstrike-streaming:8090/events" + +// Mock OAuth2 Token endpoint +func mockTokenHandler(w http.ResponseWriter, r *http.Request) { + log.Printf("Received request: Method=%s, URL=%s", r.Method, r.URL.String()) + log.Printf("Headers: %+v", r.Header) + // Only allow POST method + if r.Method != http.MethodPost { + http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) + return + } + + // Parse form data (client credentials) + err := r.ParseForm() + if err != nil { + http.Error(w, "Error parsing form data", http.StatusBadRequest) + return + } + + // Get client credentials from the form + clientID := r.FormValue("client_id") + clientSecret := r.FormValue("client_secret") + + // Validate client credentials + if clientID == mockClientID && clientSecret == mockClientSecret { + // Mock success, return an access token + tokenResponse := map[string]string{ + "access_token": mockAccessToken, + "token_type": "bearer", + } + log.Printf("Returning auth token") + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(tokenResponse) + } else { + // Invalid credentials + http.Error(w, "Invalid client credentials", http.StatusUnauthorized) + } +} + +// Middleware to verify OAuth2 Bearer token +func verifyOAuth2Token(r *http.Request) error { + // Extract OAuth2 token from the Authorization header + authHeader := r.Header.Get("Authorization") + if !strings.HasPrefix(authHeader, "Bearer ") { + return fmt.Errorf("missing Bearer token") + } + tokenString := strings.TrimPrefix(authHeader, "Bearer ") + + // Validate the token + if tokenString != mockAccessToken { + return fmt.Errorf("invalid token") + } + return nil +} + +// Returns a resource with a dataFeedURL and sessionToken +func resourceHandler(w http.ResponseWriter, r *http.Request) { + log.Printf("Received request: Method=%s, URL=%s", r.Method, r.URL.String()) + log.Printf("Headers: %+v", r.Header) + if datafeedCalled { + fmt.Println("Datafeed has already been called. Exiting...") + os.Exit(0) // Exit the program, or use return if you don't want to terminate + } + datafeedCalled = true + err := verifyOAuth2Token(r) + if err != nil { + http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) + return + } + + resource := map[string]interface{}{ + "resources": []map[string]interface{}{ + { + "dataFeedURL": mockDataFeedURL, + "sessionToken": map[string]interface{}{ + "token": mockSessionToken, + "expiration": "2025-11-18T09:21:31.767185156Z", + }, + "refreshActiveSessionURL": mockDataFeedURL, + "refreshActiveSessionInterval": 1800, + }, + }, + } + log.Printf("Returning datafeed url.") + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(resource) +} + +type streamReader struct { + data []string + index int +} + +func (s *streamReader) Read(p []byte) (n int, err error) { + if s.index >= len(s.data) { + return 0, io.EOF // No more data to stream + } + chunk := s.data[s.index] + s.index++ + copy(p, chunk) + return len(chunk), nil +} + +func streamData(w http.ResponseWriter, r *http.Request) { + // Set the response header for streaming + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Transfer-Encoding", "chunked") // This will stream data in chunks + + // Prepare data to stream + data := []string{ + `{"metadata":{"customerIDString":"abcabcabc22221","offset":8695284,"eventType":"RemoteResponseSessionStartEvent","eventCreationTime":1698932494000,"version":"1.0"},"event":{"SessionId":"1111-fffff-4bb4-99c1-74c13cfc3e5a","HostnameField":"UKCHUDL00206","UserName":"admin.rose@example.com","StartTimestamp":1698932494,"AgentIdString":"fffffffff33333"}}`, + `{"metadata":{"customerIDString":"abcabcabc22222","offset":8695285,"eventType":"RemoteResponseSessionStartEvent","eventCreationTime":1698932494000,"version":"1.0"},"event":{"SessionId":"1111-fffff-4bb4-99c1-74c13cfc3e5a","HostnameField":"UKCHUDL00206","UserName":"admin.rose@example.com","StartTimestamp":1698932494,"AgentIdString":"fffffffff33333"}}`, + `{"metadata":{"customerIDString":"abcabcabc22223","offset":8695286,"eventType":"RemoteResponseSessionStartEvent","eventCreationTime":1698932494000,"version":"1.0"},"event":{"SessionId":"1111-fffff-4bb4-99c1-74c13cfc3e5a","HostnameField":"UKCHUDL00206","UserName":"admin.rose@example.com","StartTimestamp":1698932494,"AgentIdString":"fffffffff33333"}}`, + } + + // Create a custom reader for streaming data + reader := &streamReader{ + data: data, + index: 0, + } + + // Stream the data using the custom reader + _, err := io.Copy(w, reader) + if err != nil { + http.Error(w, "Error while streaming data", http.StatusInternalServerError) + return + } + + log.Println("Data streaming completed, closing connection.") +} + +func main() { + // Setup routes + http.HandleFunc("/oauth2/token", mockTokenHandler) // Token endpoint + http.HandleFunc("/sensors/entities/datafeed/v2", resourceHandler) // Resource endpoint + http.HandleFunc("/", streamData) // Event stream endpoint + + // Start the server + port := ":8090" + log.Printf("Starting mock OAuth2 server on %s...", port) + err := http.ListenAndServe(port, nil) + if err != nil { + log.Fatal("Server failed to start:", err) + } +} diff --git a/packages/crowdstrike/data_stream/falcon/_dev/test/system/test-streaming-config.yml b/packages/crowdstrike/data_stream/falcon/_dev/test/system/test-streaming-config.yml new file mode 100644 index 00000000000..58d014c8541 --- /dev/null +++ b/packages/crowdstrike/data_stream/falcon/_dev/test/system/test-streaming-config.yml @@ -0,0 +1,12 @@ +input: streaming +service: crowdstrike-streaming +data_stream: + vars: + url: http://{{Hostname}}:{{Port}} + client_id: xxxx + client_secret: xxxx + token_url: http://{{Hostname}}:{{Port}}/oauth2/token + app_id: xxxx + preserve_original_event: true +assert: + hit_count: 3 diff --git a/packages/crowdstrike/data_stream/falcon/sample_event.json b/packages/crowdstrike/data_stream/falcon/sample_event.json index ffc1cb8d9c0..ca0a5e8c5a3 100644 --- a/packages/crowdstrike/data_stream/falcon/sample_event.json +++ b/packages/crowdstrike/data_stream/falcon/sample_event.json @@ -1,108 +1,82 @@ { - "@timestamp": "2020-02-12T21:29:10.000Z", + "@timestamp": "2023-11-02T13:41:34.000Z", "agent": { - "ephemeral_id": "fe495f50-2dbf-43ee-9c49-b35ef8bf9235", - "id": "df7cb44a-7978-449c-992e-c6b22e788ae9", + "ephemeral_id": "15e08fe3-d195-449b-b73c-b8b2a9dee02a", + "id": "60ae74ae-652f-4e24-9a9e-e5a00ceb1c1c", "name": "docker-fleet-agent", "type": "filebeat", - "version": "8.11.0" + "version": "8.16.0" }, "crowdstrike": { "event": { - "AuditKeyValues": [ - { - "Key": "APIClientID", - "ValueString": "1234567890abcdefghijklmnopqr" - }, - { - "Key": "partition", - "ValueString": "0" - }, - { - "Key": "offset", - "ValueString": "-1" - }, - { - "Key": "appId", - "ValueString": "siem-connector-v2.0.0" - }, - { - "Key": "eventType", - "ValueString": "[UserActivityAuditEvent HashSpreadingEvent RemoteResponseSessionStartEvent RemoteResponseSessionEndEvent DetectionSummaryEvent AuthActivityAuditEvent]" - } - ], - "OperationName": "streamStarted", - "Success": true + "AgentIdString": "fffffffff33333", + "SessionId": "1111-fffff-4bb4-99c1-74c13cfc3e5a" }, "metadata": { - "customerIDString": "8f69fe9e-b995-4204-95ad-44f9bcf75b6b", - "eventType": "AuthActivityAuditEvent", - "offset": 0, + "customerIDString": "abcabcabc22221", + "eventType": "RemoteResponseSessionStartEvent", + "offset": 8695284, "version": "1.0" } }, "data_stream": { "dataset": "crowdstrike.falcon", - "namespace": "ep", + "namespace": "59294", "type": "logs" }, "ecs": { "version": "8.11.0" }, "elastic_agent": { - "id": "df7cb44a-7978-449c-992e-c6b22e788ae9", + "id": "60ae74ae-652f-4e24-9a9e-e5a00ceb1c1c", "snapshot": false, - "version": "8.11.0" + "version": "8.16.0" }, "event": { "action": [ - "streamStarted" + "remote_response_session_start_event" ], "agent_id_status": "verified", "category": [ - "iam" + "network", + "session" ], - "created": "2020-02-12T21:29:10.710Z", + "created": "2023-11-02T13:41:34.000Z", "dataset": "crowdstrike.falcon", - "ingested": "2024-01-29T08:59:16Z", + "ingested": "2024-11-25T08:23:15Z", "kind": "event", - "original": "{\n \"metadata\": {\n \"customerIDString\": \"8f69fe9e-b995-4204-95ad-44f9bcf75b6b\",\n \"offset\": 0,\n \"eventType\": \"AuthActivityAuditEvent\",\n \"eventCreationTime\": 1581542950710,\n \"version\": \"1.0\"\n },\n \"event\": {\n \"UserId\": \"api-client-id:1234567890abcdefghijklmnopqrstuvwxyz\",\n \"UserIp\": \"10.10.0.8\",\n \"OperationName\": \"streamStarted\",\n \"ServiceName\": \"Crowdstrike Streaming API\",\n \"Success\": true,\n \"UTCTimestamp\": 1581542950,\n \"AuditKeyValues\": [\n {\n \"Key\": \"APIClientID\",\n \"ValueString\": \"1234567890abcdefghijklmnopqr\"\n },\n {\n \"Key\": \"partition\",\n \"ValueString\": \"0\"\n },\n {\n \"Key\": \"offset\",\n \"ValueString\": \"-1\"\n },\n {\n \"Key\": \"appId\",\n \"ValueString\": \"siem-connector-v2.0.0\"\n },\n {\n \"Key\": \"eventType\",\n \"ValueString\": \"[UserActivityAuditEvent HashSpreadingEvent RemoteResponseSessionStartEvent RemoteResponseSessionEndEvent DetectionSummaryEvent AuthActivityAuditEvent]\"\n }\n ]\n }\n}", - "outcome": "success" + "original": "{\"event\":{\"AgentIdString\":\"fffffffff33333\",\"HostnameField\":\"UKCHUDL00206\",\"SessionId\":\"1111-fffff-4bb4-99c1-74c13cfc3e5a\",\"StartTimestamp\":1698932494,\"UserName\":\"admin.rose@example.com\"},\"metadata\":{\"customerIDString\":\"abcabcabc22221\",\"eventCreationTime\":1698932494000,\"eventType\":\"RemoteResponseSessionStartEvent\",\"offset\":8695284,\"version\":\"1.0\"}}", + "start": "2023-11-02T13:41:34.000Z", + "type": [ + "start" + ] }, - "input": { - "type": "log" + "host": { + "name": "UKCHUDL00206" }, - "log": { - "file": { - "path": "/tmp/service_logs/falcon-audit-events.log" - }, - "flags": [ - "multiline" - ], - "offset": 910 + "input": { + "type": "streaming" }, - "message": "Crowdstrike Streaming API", + "message": "Remote response session started.", "observer": { "product": "Falcon", "vendor": "Crowdstrike" }, "related": { - "ip": [ - "10.10.0.8" + "hosts": [ + "UKCHUDL00206" ], "user": [ - "api-client-id:1234567890abcdefghijklmnopqrstuvwxyz" + "admin.rose@example.com" ] }, - "source": { - "ip": "10.10.0.8" - }, "tags": [ "preserve_original_event", "forwarded", "crowdstrike-falcon" ], "user": { - "name": "api-client-id:1234567890abcdefghijklmnopqrstuvwxyz" + "email": "admin.rose@example.com", + "name": "admin.rose@example.com" } } \ No newline at end of file diff --git a/packages/crowdstrike/docs/README.md b/packages/crowdstrike/docs/README.md index ac5fab19177..42e18fb8140 100644 --- a/packages/crowdstrike/docs/README.md +++ b/packages/crowdstrike/docs/README.md @@ -1058,111 +1058,85 @@ An example event for `falcon` looks as following: ```json { - "@timestamp": "2020-02-12T21:29:10.000Z", + "@timestamp": "2023-11-02T13:41:34.000Z", "agent": { - "ephemeral_id": "fe495f50-2dbf-43ee-9c49-b35ef8bf9235", - "id": "df7cb44a-7978-449c-992e-c6b22e788ae9", + "ephemeral_id": "15e08fe3-d195-449b-b73c-b8b2a9dee02a", + "id": "60ae74ae-652f-4e24-9a9e-e5a00ceb1c1c", "name": "docker-fleet-agent", "type": "filebeat", - "version": "8.11.0" + "version": "8.16.0" }, "crowdstrike": { "event": { - "AuditKeyValues": [ - { - "Key": "APIClientID", - "ValueString": "1234567890abcdefghijklmnopqr" - }, - { - "Key": "partition", - "ValueString": "0" - }, - { - "Key": "offset", - "ValueString": "-1" - }, - { - "Key": "appId", - "ValueString": "siem-connector-v2.0.0" - }, - { - "Key": "eventType", - "ValueString": "[UserActivityAuditEvent HashSpreadingEvent RemoteResponseSessionStartEvent RemoteResponseSessionEndEvent DetectionSummaryEvent AuthActivityAuditEvent]" - } - ], - "OperationName": "streamStarted", - "Success": true + "AgentIdString": "fffffffff33333", + "SessionId": "1111-fffff-4bb4-99c1-74c13cfc3e5a" }, "metadata": { - "customerIDString": "8f69fe9e-b995-4204-95ad-44f9bcf75b6b", - "eventType": "AuthActivityAuditEvent", - "offset": 0, + "customerIDString": "abcabcabc22221", + "eventType": "RemoteResponseSessionStartEvent", + "offset": 8695284, "version": "1.0" } }, "data_stream": { "dataset": "crowdstrike.falcon", - "namespace": "ep", + "namespace": "59294", "type": "logs" }, "ecs": { "version": "8.11.0" }, "elastic_agent": { - "id": "df7cb44a-7978-449c-992e-c6b22e788ae9", + "id": "60ae74ae-652f-4e24-9a9e-e5a00ceb1c1c", "snapshot": false, - "version": "8.11.0" + "version": "8.16.0" }, "event": { "action": [ - "streamStarted" + "remote_response_session_start_event" ], "agent_id_status": "verified", "category": [ - "iam" + "network", + "session" ], - "created": "2020-02-12T21:29:10.710Z", + "created": "2023-11-02T13:41:34.000Z", "dataset": "crowdstrike.falcon", - "ingested": "2024-01-29T08:59:16Z", + "ingested": "2024-11-25T08:23:15Z", "kind": "event", - "original": "{\n \"metadata\": {\n \"customerIDString\": \"8f69fe9e-b995-4204-95ad-44f9bcf75b6b\",\n \"offset\": 0,\n \"eventType\": \"AuthActivityAuditEvent\",\n \"eventCreationTime\": 1581542950710,\n \"version\": \"1.0\"\n },\n \"event\": {\n \"UserId\": \"api-client-id:1234567890abcdefghijklmnopqrstuvwxyz\",\n \"UserIp\": \"10.10.0.8\",\n \"OperationName\": \"streamStarted\",\n \"ServiceName\": \"Crowdstrike Streaming API\",\n \"Success\": true,\n \"UTCTimestamp\": 1581542950,\n \"AuditKeyValues\": [\n {\n \"Key\": \"APIClientID\",\n \"ValueString\": \"1234567890abcdefghijklmnopqr\"\n },\n {\n \"Key\": \"partition\",\n \"ValueString\": \"0\"\n },\n {\n \"Key\": \"offset\",\n \"ValueString\": \"-1\"\n },\n {\n \"Key\": \"appId\",\n \"ValueString\": \"siem-connector-v2.0.0\"\n },\n {\n \"Key\": \"eventType\",\n \"ValueString\": \"[UserActivityAuditEvent HashSpreadingEvent RemoteResponseSessionStartEvent RemoteResponseSessionEndEvent DetectionSummaryEvent AuthActivityAuditEvent]\"\n }\n ]\n }\n}", - "outcome": "success" + "original": "{\"event\":{\"AgentIdString\":\"fffffffff33333\",\"HostnameField\":\"UKCHUDL00206\",\"SessionId\":\"1111-fffff-4bb4-99c1-74c13cfc3e5a\",\"StartTimestamp\":1698932494,\"UserName\":\"admin.rose@example.com\"},\"metadata\":{\"customerIDString\":\"abcabcabc22221\",\"eventCreationTime\":1698932494000,\"eventType\":\"RemoteResponseSessionStartEvent\",\"offset\":8695284,\"version\":\"1.0\"}}", + "start": "2023-11-02T13:41:34.000Z", + "type": [ + "start" + ] }, - "input": { - "type": "log" + "host": { + "name": "UKCHUDL00206" }, - "log": { - "file": { - "path": "/tmp/service_logs/falcon-audit-events.log" - }, - "flags": [ - "multiline" - ], - "offset": 910 + "input": { + "type": "streaming" }, - "message": "Crowdstrike Streaming API", + "message": "Remote response session started.", "observer": { "product": "Falcon", "vendor": "Crowdstrike" }, "related": { - "ip": [ - "10.10.0.8" + "hosts": [ + "UKCHUDL00206" ], "user": [ - "api-client-id:1234567890abcdefghijklmnopqrstuvwxyz" + "admin.rose@example.com" ] }, - "source": { - "ip": "10.10.0.8" - }, "tags": [ "preserve_original_event", "forwarded", "crowdstrike-falcon" ], "user": { - "name": "api-client-id:1234567890abcdefghijklmnopqrstuvwxyz" + "email": "admin.rose@example.com", + "name": "admin.rose@example.com" } } ``` From 018d08136814b99f3b11a6f5e993ce92ce4d0015 Mon Sep 17 00:00:00 2001 From: mohitjha-elastic Date: Tue, 26 Nov 2024 20:05:50 +0530 Subject: [PATCH 5/7] Resolve review comments by @efd6 1. Update golang version. 2. Use caller before calee. 3. Replace logs with log files. --- .../_dev/deploy/docker/docker-compose.yml | 5 +- .../sample_logs/falcon-event-stream.log | 3 + .../docker/streaming-mock-service/go.mod | 2 +- .../docker/streaming-mock-service/main.go | 138 ++++++++++-------- .../data_stream/falcon/sample_event.json | 6 +- packages/crowdstrike/docs/README.md | 6 +- 6 files changed, 91 insertions(+), 69 deletions(-) create mode 100644 packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/sample_logs/falcon-event-stream.log diff --git a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/docker-compose.yml b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/docker-compose.yml index 0895426779c..9f6e320183c 100644 --- a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/docker-compose.yml +++ b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/docker-compose.yml @@ -7,13 +7,14 @@ services: - ${SERVICE_LOGS_DIR}:/var/log command: /bin/sh -c "cp /sample_logs/* /var/log/" crowdstrike-streaming: - image: golang:1.21-alpine + image: golang:1.23-alpine hostname: crowdstrike working_dir: /app volumes: + - ./sample_logs:/sample_logs:ro - ./streaming-mock-service:/app ports: - "8090:8090" environment: PORT: '8090' - command: ["go", "run", "main.go"] + command: ["go", "run", "main.go", "--client_id", "xxxx", "--client_secret", "xxxx", "--access_token", "abcd", "--session_token", "xyz", "--file", "/sample_logs/falcon-event-stream.log", "--mock_datafeed_url", "http://svc-crowdstrike-streaming:8090/events"] diff --git a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/sample_logs/falcon-event-stream.log b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/sample_logs/falcon-event-stream.log new file mode 100644 index 00000000000..c6163f1e7ee --- /dev/null +++ b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/sample_logs/falcon-event-stream.log @@ -0,0 +1,3 @@ +{"metadata":{"customerIDString":"abcabcabc22221","offset":8695284,"eventType":"RemoteResponseSessionStartEvent","eventCreationTime":1698932494000,"version":"1.0"},"event":{"SessionId":"1111-fffff-4bb4-99c1-74c13cfc3e5a","HostnameField":"UKCHUDL00206","UserName":"admin.rose@example.com","StartTimestamp":1698932494,"AgentIdString":"fffffffff33333"}} +{"metadata":{"customerIDString":"abcabcabc22222","offset":8695285,"eventType":"RemoteResponseSessionStartEvent","eventCreationTime":1698932494000,"version":"1.0"},"event":{"SessionId":"1111-fffff-4bb4-99c1-74c13cfc3e5a","HostnameField":"UKCHUDL00206","UserName":"admin.rose@example.com","StartTimestamp":1698932494,"AgentIdString":"fffffffff33333"}} +{"metadata":{"customerIDString":"abcabcabc22223","offset":8695286,"eventType":"RemoteResponseSessionStartEvent","eventCreationTime":1698932494000,"version":"1.0"},"event":{"SessionId":"1111-fffff-4bb4-99c1-74c13cfc3e5a","HostnameField":"UKCHUDL00206","UserName":"admin.rose@example.com","StartTimestamp":1698932494,"AgentIdString":"fffffffff33333"}} diff --git a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/go.mod b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/go.mod index 9cd62400e59..1dc854f0492 100644 --- a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/go.mod +++ b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/go.mod @@ -1,3 +1,3 @@ module streaming-mock-service -go 1.21.3 +go 1.23.0 diff --git a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/main.go b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/main.go index 767546fe511..eca1d6ffc53 100644 --- a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/main.go +++ b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/main.go @@ -1,7 +1,9 @@ package main import ( + "bufio" "encoding/json" + "flag" "fmt" "io" "log" @@ -10,17 +12,31 @@ import ( "strings" ) -// Mock OAuth2 client credentials -var mockClientID = "xxxx" -var mockClientSecret = "xxxx" -var datafeedCalled bool = false +var ( + mockClientID = flag.String("client_id", "", "Mock Client ID") + mockClientSecret = flag.String("client_secret", "", "Mock Client Secret") + mockAccessToken = flag.String("access_token", "", "Mock Access Token") + mockSessionToken = flag.String("session_token", "", "Mock Session Token") + filePath = flag.String("file", "", "Path to the input file") + mockDataFeedURL = flag.String("mock_datafeed_url", "", "Mock DataFeed URL") + datafeedCalled = flag.Bool("datafeed_called", false, "Mock Datafeed Called") +) -// Mock OAuth2 token -var mockAccessToken = "abcd" -var mockSessionToken = "xyz" +func main() { + flag.Parse() + // Setup routes + http.HandleFunc("/oauth2/token", mockTokenHandler) // Token endpoint + http.HandleFunc("/sensors/entities/datafeed/v2", resourceHandler) // Resource endpoint + http.HandleFunc("/", streamData) // Event stream endpoint -// Mock dataFeedURL (simulating a URL to fetch events from) -var mockDataFeedURL = "http://svc-crowdstrike-streaming:8090/events" + // Start the server + port := ":8090" + log.Printf("Starting mock OAuth2 server on %s...", port) + err := http.ListenAndServe(port, nil) + if err != nil { + log.Fatal("Server failed to start:", err) + } +} // Mock OAuth2 Token endpoint func mockTokenHandler(w http.ResponseWriter, r *http.Request) { @@ -44,10 +60,10 @@ func mockTokenHandler(w http.ResponseWriter, r *http.Request) { clientSecret := r.FormValue("client_secret") // Validate client credentials - if clientID == mockClientID && clientSecret == mockClientSecret { + if clientID == *mockClientID && clientSecret == *mockClientSecret { // Mock success, return an access token tokenResponse := map[string]string{ - "access_token": mockAccessToken, + "access_token": *mockAccessToken, "token_type": "bearer", } log.Printf("Returning auth token") @@ -60,31 +76,15 @@ func mockTokenHandler(w http.ResponseWriter, r *http.Request) { } } -// Middleware to verify OAuth2 Bearer token -func verifyOAuth2Token(r *http.Request) error { - // Extract OAuth2 token from the Authorization header - authHeader := r.Header.Get("Authorization") - if !strings.HasPrefix(authHeader, "Bearer ") { - return fmt.Errorf("missing Bearer token") - } - tokenString := strings.TrimPrefix(authHeader, "Bearer ") - - // Validate the token - if tokenString != mockAccessToken { - return fmt.Errorf("invalid token") - } - return nil -} - // Returns a resource with a dataFeedURL and sessionToken func resourceHandler(w http.ResponseWriter, r *http.Request) { log.Printf("Received request: Method=%s, URL=%s", r.Method, r.URL.String()) log.Printf("Headers: %+v", r.Header) - if datafeedCalled { + if *datafeedCalled { fmt.Println("Datafeed has already been called. Exiting...") os.Exit(0) // Exit the program, or use return if you don't want to terminate } - datafeedCalled = true + *datafeedCalled = true err := verifyOAuth2Token(r) if err != nil { http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) @@ -94,12 +94,12 @@ func resourceHandler(w http.ResponseWriter, r *http.Request) { resource := map[string]interface{}{ "resources": []map[string]interface{}{ { - "dataFeedURL": mockDataFeedURL, + "dataFeedURL": *mockDataFeedURL, "sessionToken": map[string]interface{}{ - "token": mockSessionToken, + "token": *mockSessionToken, "expiration": "2025-11-18T09:21:31.767185156Z", }, - "refreshActiveSessionURL": mockDataFeedURL, + "refreshActiveSessionURL": *mockDataFeedURL, "refreshActiveSessionInterval": 1800, }, }, @@ -110,31 +110,54 @@ func resourceHandler(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(resource) } +// Middleware to verify OAuth2 Bearer token +func verifyOAuth2Token(r *http.Request) error { + // Extract OAuth2 token from the Authorization header + authHeader := r.Header.Get("Authorization") + if !strings.HasPrefix(authHeader, "Bearer ") { + return fmt.Errorf("missing Bearer token") + } + tokenString := strings.TrimPrefix(authHeader, "Bearer ") + + // Validate the token + if tokenString != *mockAccessToken { + return fmt.Errorf("invalid token") + } + return nil +} + type streamReader struct { data []string index int } -func (s *streamReader) Read(p []byte) (n int, err error) { - if s.index >= len(s.data) { - return 0, io.EOF // No more data to stream - } - chunk := s.data[s.index] - s.index++ - copy(p, chunk) - return len(chunk), nil -} - func streamData(w http.ResponseWriter, r *http.Request) { // Set the response header for streaming w.Header().Set("Content-Type", "application/json") w.Header().Set("Transfer-Encoding", "chunked") // This will stream data in chunks + file, err := os.Open(*filePath) + if err != nil { + log.Fatalf("Error opening file: %v", err) + } + defer file.Close() // Ensure the file is closed when done + + // Initialize a slice to hold the JSON strings + var data []string + + // Create a scanner to read the file line by line + scanner := bufio.NewScanner(file) + + // Read the file line by line + for scanner.Scan() { + line := scanner.Text() // Get the current line + + // Skip empty lines + if len(line) == 0 { + continue + } - // Prepare data to stream - data := []string{ - `{"metadata":{"customerIDString":"abcabcabc22221","offset":8695284,"eventType":"RemoteResponseSessionStartEvent","eventCreationTime":1698932494000,"version":"1.0"},"event":{"SessionId":"1111-fffff-4bb4-99c1-74c13cfc3e5a","HostnameField":"UKCHUDL00206","UserName":"admin.rose@example.com","StartTimestamp":1698932494,"AgentIdString":"fffffffff33333"}}`, - `{"metadata":{"customerIDString":"abcabcabc22222","offset":8695285,"eventType":"RemoteResponseSessionStartEvent","eventCreationTime":1698932494000,"version":"1.0"},"event":{"SessionId":"1111-fffff-4bb4-99c1-74c13cfc3e5a","HostnameField":"UKCHUDL00206","UserName":"admin.rose@example.com","StartTimestamp":1698932494,"AgentIdString":"fffffffff33333"}}`, - `{"metadata":{"customerIDString":"abcabcabc22223","offset":8695286,"eventType":"RemoteResponseSessionStartEvent","eventCreationTime":1698932494000,"version":"1.0"},"event":{"SessionId":"1111-fffff-4bb4-99c1-74c13cfc3e5a","HostnameField":"UKCHUDL00206","UserName":"admin.rose@example.com","StartTimestamp":1698932494,"AgentIdString":"fffffffff33333"}}`, + // Append the non-empty line (JSON string) to the data slice + data = append(data, line) } // Create a custom reader for streaming data @@ -144,7 +167,7 @@ func streamData(w http.ResponseWriter, r *http.Request) { } // Stream the data using the custom reader - _, err := io.Copy(w, reader) + _, err = io.Copy(w, reader) if err != nil { http.Error(w, "Error while streaming data", http.StatusInternalServerError) return @@ -153,17 +176,12 @@ func streamData(w http.ResponseWriter, r *http.Request) { log.Println("Data streaming completed, closing connection.") } -func main() { - // Setup routes - http.HandleFunc("/oauth2/token", mockTokenHandler) // Token endpoint - http.HandleFunc("/sensors/entities/datafeed/v2", resourceHandler) // Resource endpoint - http.HandleFunc("/", streamData) // Event stream endpoint - - // Start the server - port := ":8090" - log.Printf("Starting mock OAuth2 server on %s...", port) - err := http.ListenAndServe(port, nil) - if err != nil { - log.Fatal("Server failed to start:", err) +func (s *streamReader) Read(p []byte) (n int, err error) { + if s.index >= len(s.data) { + return 0, io.EOF // No more data to stream } + chunk := s.data[s.index] + s.index++ + copy(p, chunk) + return len(chunk), nil } diff --git a/packages/crowdstrike/data_stream/falcon/sample_event.json b/packages/crowdstrike/data_stream/falcon/sample_event.json index ca0a5e8c5a3..423de43576f 100644 --- a/packages/crowdstrike/data_stream/falcon/sample_event.json +++ b/packages/crowdstrike/data_stream/falcon/sample_event.json @@ -1,7 +1,7 @@ { "@timestamp": "2023-11-02T13:41:34.000Z", "agent": { - "ephemeral_id": "15e08fe3-d195-449b-b73c-b8b2a9dee02a", + "ephemeral_id": "4e0b57dd-e769-443c-abe6-91b91aa2cf94", "id": "60ae74ae-652f-4e24-9a9e-e5a00ceb1c1c", "name": "docker-fleet-agent", "type": "filebeat", @@ -21,7 +21,7 @@ }, "data_stream": { "dataset": "crowdstrike.falcon", - "namespace": "59294", + "namespace": "24027", "type": "logs" }, "ecs": { @@ -43,7 +43,7 @@ ], "created": "2023-11-02T13:41:34.000Z", "dataset": "crowdstrike.falcon", - "ingested": "2024-11-25T08:23:15Z", + "ingested": "2024-11-26T14:32:01Z", "kind": "event", "original": "{\"event\":{\"AgentIdString\":\"fffffffff33333\",\"HostnameField\":\"UKCHUDL00206\",\"SessionId\":\"1111-fffff-4bb4-99c1-74c13cfc3e5a\",\"StartTimestamp\":1698932494,\"UserName\":\"admin.rose@example.com\"},\"metadata\":{\"customerIDString\":\"abcabcabc22221\",\"eventCreationTime\":1698932494000,\"eventType\":\"RemoteResponseSessionStartEvent\",\"offset\":8695284,\"version\":\"1.0\"}}", "start": "2023-11-02T13:41:34.000Z", diff --git a/packages/crowdstrike/docs/README.md b/packages/crowdstrike/docs/README.md index 42e18fb8140..e3c9f38e0de 100644 --- a/packages/crowdstrike/docs/README.md +++ b/packages/crowdstrike/docs/README.md @@ -1060,7 +1060,7 @@ An example event for `falcon` looks as following: { "@timestamp": "2023-11-02T13:41:34.000Z", "agent": { - "ephemeral_id": "15e08fe3-d195-449b-b73c-b8b2a9dee02a", + "ephemeral_id": "4e0b57dd-e769-443c-abe6-91b91aa2cf94", "id": "60ae74ae-652f-4e24-9a9e-e5a00ceb1c1c", "name": "docker-fleet-agent", "type": "filebeat", @@ -1080,7 +1080,7 @@ An example event for `falcon` looks as following: }, "data_stream": { "dataset": "crowdstrike.falcon", - "namespace": "59294", + "namespace": "24027", "type": "logs" }, "ecs": { @@ -1102,7 +1102,7 @@ An example event for `falcon` looks as following: ], "created": "2023-11-02T13:41:34.000Z", "dataset": "crowdstrike.falcon", - "ingested": "2024-11-25T08:23:15Z", + "ingested": "2024-11-26T14:32:01Z", "kind": "event", "original": "{\"event\":{\"AgentIdString\":\"fffffffff33333\",\"HostnameField\":\"UKCHUDL00206\",\"SessionId\":\"1111-fffff-4bb4-99c1-74c13cfc3e5a\",\"StartTimestamp\":1698932494,\"UserName\":\"admin.rose@example.com\"},\"metadata\":{\"customerIDString\":\"abcabcabc22221\",\"eventCreationTime\":1698932494000,\"eventType\":\"RemoteResponseSessionStartEvent\",\"offset\":8695284,\"version\":\"1.0\"}}", "start": "2023-11-02T13:41:34.000Z", From f42a1c450f75ff78048fb7b63897b8126b506346 Mon Sep 17 00:00:00 2001 From: mohitjha-elastic Date: Wed, 27 Nov 2024 11:22:10 +0530 Subject: [PATCH 6/7] Resolve comments by @efd6 1. Remove kibana version from changelog. 2. Utilize pattern feature of the MUX. 3. Add len check of chunk safety check. --- packages/crowdstrike/changelog.yml | 3 --- .../docker/streaming-mock-service/main.go | 24 +++++++++---------- .../data_stream/falcon/sample_event.json | 6 ++--- packages/crowdstrike/docs/README.md | 6 ++--- 4 files changed, 17 insertions(+), 22 deletions(-) diff --git a/packages/crowdstrike/changelog.yml b/packages/crowdstrike/changelog.yml index 37314c50b0b..9a57f9d4e6e 100644 --- a/packages/crowdstrike/changelog.yml +++ b/packages/crowdstrike/changelog.yml @@ -4,9 +4,6 @@ - description: Add Support of CrowdStrike Event Stream. type: enhancement link: https://github.com/elastic/integrations/pull/11773 - - description: Update the minimum kibana version to 8.16.0. - type: enhancement - link: https://github.com/elastic/integrations/pull/11773 - version: "1.46.0" changes: - description: Extract user and host names from the name field. diff --git a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/main.go b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/main.go index eca1d6ffc53..a97fb414b4f 100644 --- a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/main.go +++ b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/main.go @@ -13,11 +13,11 @@ import ( ) var ( - mockClientID = flag.String("client_id", "", "Mock Client ID") - mockClientSecret = flag.String("client_secret", "", "Mock Client Secret") - mockAccessToken = flag.String("access_token", "", "Mock Access Token") - mockSessionToken = flag.String("session_token", "", "Mock Session Token") - filePath = flag.String("file", "", "Path to the input file") + mockClientID = flag.String("client_id", "", "Mock Client ID") + mockClientSecret = flag.String("client_secret", "", "Mock Client Secret") + mockAccessToken = flag.String("access_token", "", "Mock Access Token") + mockSessionToken = flag.String("session_token", "", "Mock Session Token") + filePath = flag.String("file", "", "Path to the input file") mockDataFeedURL = flag.String("mock_datafeed_url", "", "Mock DataFeed URL") datafeedCalled = flag.Bool("datafeed_called", false, "Mock Datafeed Called") ) @@ -25,9 +25,9 @@ var ( func main() { flag.Parse() // Setup routes - http.HandleFunc("/oauth2/token", mockTokenHandler) // Token endpoint - http.HandleFunc("/sensors/entities/datafeed/v2", resourceHandler) // Resource endpoint - http.HandleFunc("/", streamData) // Event stream endpoint + http.HandleFunc("POST /oauth2/token", mockTokenHandler) + http.HandleFunc("GET /sensors/entities/datafeed/v2", resourceHandler) // Resource endpoint + http.HandleFunc("GET /", streamData) // Event stream endpoint // Start the server port := ":8090" @@ -42,11 +42,6 @@ func main() { func mockTokenHandler(w http.ResponseWriter, r *http.Request) { log.Printf("Received request: Method=%s, URL=%s", r.Method, r.URL.String()) log.Printf("Headers: %+v", r.Header) - // Only allow POST method - if r.Method != http.MethodPost { - http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) - return - } // Parse form data (client credentials) err := r.ParseForm() @@ -182,6 +177,9 @@ func (s *streamReader) Read(p []byte) (n int, err error) { } chunk := s.data[s.index] s.index++ + if len(chunk) > len(p) { + p = append(p, make([]byte, len(chunk)-len(p))...) + } copy(p, chunk) return len(chunk), nil } diff --git a/packages/crowdstrike/data_stream/falcon/sample_event.json b/packages/crowdstrike/data_stream/falcon/sample_event.json index 423de43576f..a6243bd9169 100644 --- a/packages/crowdstrike/data_stream/falcon/sample_event.json +++ b/packages/crowdstrike/data_stream/falcon/sample_event.json @@ -1,7 +1,7 @@ { "@timestamp": "2023-11-02T13:41:34.000Z", "agent": { - "ephemeral_id": "4e0b57dd-e769-443c-abe6-91b91aa2cf94", + "ephemeral_id": "5b283a3a-afb4-4477-87e2-32f63990f868", "id": "60ae74ae-652f-4e24-9a9e-e5a00ceb1c1c", "name": "docker-fleet-agent", "type": "filebeat", @@ -21,7 +21,7 @@ }, "data_stream": { "dataset": "crowdstrike.falcon", - "namespace": "24027", + "namespace": "27837", "type": "logs" }, "ecs": { @@ -43,7 +43,7 @@ ], "created": "2023-11-02T13:41:34.000Z", "dataset": "crowdstrike.falcon", - "ingested": "2024-11-26T14:32:01Z", + "ingested": "2024-11-27T05:38:00Z", "kind": "event", "original": "{\"event\":{\"AgentIdString\":\"fffffffff33333\",\"HostnameField\":\"UKCHUDL00206\",\"SessionId\":\"1111-fffff-4bb4-99c1-74c13cfc3e5a\",\"StartTimestamp\":1698932494,\"UserName\":\"admin.rose@example.com\"},\"metadata\":{\"customerIDString\":\"abcabcabc22221\",\"eventCreationTime\":1698932494000,\"eventType\":\"RemoteResponseSessionStartEvent\",\"offset\":8695284,\"version\":\"1.0\"}}", "start": "2023-11-02T13:41:34.000Z", diff --git a/packages/crowdstrike/docs/README.md b/packages/crowdstrike/docs/README.md index e3c9f38e0de..04c0cd13949 100644 --- a/packages/crowdstrike/docs/README.md +++ b/packages/crowdstrike/docs/README.md @@ -1060,7 +1060,7 @@ An example event for `falcon` looks as following: { "@timestamp": "2023-11-02T13:41:34.000Z", "agent": { - "ephemeral_id": "4e0b57dd-e769-443c-abe6-91b91aa2cf94", + "ephemeral_id": "5b283a3a-afb4-4477-87e2-32f63990f868", "id": "60ae74ae-652f-4e24-9a9e-e5a00ceb1c1c", "name": "docker-fleet-agent", "type": "filebeat", @@ -1080,7 +1080,7 @@ An example event for `falcon` looks as following: }, "data_stream": { "dataset": "crowdstrike.falcon", - "namespace": "24027", + "namespace": "27837", "type": "logs" }, "ecs": { @@ -1102,7 +1102,7 @@ An example event for `falcon` looks as following: ], "created": "2023-11-02T13:41:34.000Z", "dataset": "crowdstrike.falcon", - "ingested": "2024-11-26T14:32:01Z", + "ingested": "2024-11-27T05:38:00Z", "kind": "event", "original": "{\"event\":{\"AgentIdString\":\"fffffffff33333\",\"HostnameField\":\"UKCHUDL00206\",\"SessionId\":\"1111-fffff-4bb4-99c1-74c13cfc3e5a\",\"StartTimestamp\":1698932494,\"UserName\":\"admin.rose@example.com\"},\"metadata\":{\"customerIDString\":\"abcabcabc22221\",\"eventCreationTime\":1698932494000,\"eventType\":\"RemoteResponseSessionStartEvent\",\"offset\":8695284,\"version\":\"1.0\"}}", "start": "2023-11-02T13:41:34.000Z", From a3d39f864ee58baa4cd6003ecec1b9a231dcb67d Mon Sep 17 00:00:00 2001 From: mohitjha-elastic Date: Fri, 29 Nov 2024 12:04:11 +0530 Subject: [PATCH 7/7] Resolve review comments by @efd6. Apply the patch file sent by Dan. 1. Update the system test, use stream instead of custom mock. 2. Add offset in curson in data collection. --- .../_dev/deploy/docker/docker-compose.yml | 18 +- .../_dev/deploy/docker/files/config.yml | 74 +++++++ .../sample_logs/falcon-event-stream.log | 3 - .../docker/streaming-mock-service/go.mod | 3 - .../docker/streaming-mock-service/main.go | 185 ------------------ .../test/system/test-streaming-config.yml | 6 +- .../falcon/agent/stream/streaming.yml.hbs | 5 +- .../data_stream/falcon/sample_event.json | 10 +- packages/crowdstrike/docs/README.md | 10 +- 9 files changed, 99 insertions(+), 215 deletions(-) create mode 100644 packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/files/config.yml delete mode 100644 packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/sample_logs/falcon-event-stream.log delete mode 100644 packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/go.mod delete mode 100644 packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/main.go diff --git a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/docker-compose.yml b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/docker-compose.yml index 9f6e320183c..58bf52984e4 100644 --- a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/docker-compose.yml +++ b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/docker-compose.yml @@ -7,14 +7,14 @@ services: - ${SERVICE_LOGS_DIR}:/var/log command: /bin/sh -c "cp /sample_logs/* /var/log/" crowdstrike-streaming: - image: golang:1.23-alpine - hostname: crowdstrike - working_dir: /app - volumes: - - ./sample_logs:/sample_logs:ro - - ./streaming-mock-service:/app + image: docker.elastic.co/observability/stream:v0.17.1 ports: - - "8090:8090" + - 8080 + volumes: + - ./files:/files:ro environment: - PORT: '8090' - command: ["go", "run", "main.go", "--client_id", "xxxx", "--client_secret", "xxxx", "--access_token", "abcd", "--session_token", "xyz", "--file", "/sample_logs/falcon-event-stream.log", "--mock_datafeed_url", "http://svc-crowdstrike-streaming:8090/events"] + PORT: 8080 + command: + - http-server + - --addr=:8080 + - --config=/files/config.yml diff --git a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/files/config.yml b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/files/config.yml new file mode 100644 index 00000000000..67b2d5cc77d --- /dev/null +++ b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/files/config.yml @@ -0,0 +1,74 @@ +rules: + - path: /oauth2/token + methods: ["POST"] + query_params: + client_id: slightlysecretclientid + client_secret: totallysecretlientsecret + grant_type: client_credentials + request_headers: + Content-Type: + - "application/x-www-form-urlencoded" + responses: + - status_code: 200 + headers: + Content-Type: + - "application/json" + body: |- + {{ minify_json ` + { + "access_token": "topsecretaccesstokenthatshouldnotbeleakedforabit", + "token_type": "Bearer" + } + `}} + + - path: /sensors/entities/datafeed/v2 + methods: ["GET"] + request_headers: + authorization: ["Bearer topsecretaccesstokenthatshouldnotbeleakedforabit"] + query_params: + appId: nonsecretappid + responses: + - status_code: 200 + headers: + Content-Type: + - "application/json" + body: |- + {{ minify_json ` + { + "resources": [ + { + "dataFeedURL": "http://svc-crowdstrike-streaming:8080/events", + "sessionToken": { + "token": "secretsessiontoken" + }, + "refreshActiveSessionURL": "http://svc-crowdstrike-streaming:8080/refresh", + "refreshActiveSessionInterval": 1800 + } + ] + } + `}} + + - path: /events + methods: ["GET"] + request_headers: + authorization: ["Token secretsessiontoken"] + query_params: + offset: null + responses: + - status_code: 200 + headers: + Content-Type: + - "application/json" + body: |- + {"metadata":{"customerIDString":"abcabcabc22221","offset":1,"eventType":"RemoteResponseSessionStartEvent","eventCreationTime":1698932494000,"version":"1.0"},"event":{"SessionId":"1111-fffff-4bb4-99c1-74c13cfc3e5a","HostnameField":"UKCHUDL00206","UserName":"admin.rose@example.com","StartTimestamp":1698932494,"AgentIdString":"fffffffff33333"}} + {"metadata":{"customerIDString":"abcabcabc22222","offset":2,"eventType":"RemoteResponseSessionStartEvent","eventCreationTime":1698932494000,"version":"1.0"},"event":{"SessionId":"1111-fffff-4bb4-99c1-74c13cfc3e5a","HostnameField":"UKCHUDL00206","UserName":"admin.rose@example.com","StartTimestamp":1698932494,"AgentIdString":"fffffffff33333"}} + {"metadata":{"customerIDString":"abcabcabc22223","offset":3,"eventType":"RemoteResponseSessionStartEvent","eventCreationTime":1698932494000,"version":"1.0"},"event":{"SessionId":"1111-fffff-4bb4-99c1-74c13cfc3e5a","HostnameField":"UKCHUDL00206","UserName":"admin.rose@example.com","StartTimestamp":1698932494,"AgentIdString":"fffffffff33333"}} + + - path: /refresh + methods: ["POST"] + responses: + - status_code: 200 + headers: + Content-Type: + - "application/json" + body: '' diff --git a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/sample_logs/falcon-event-stream.log b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/sample_logs/falcon-event-stream.log deleted file mode 100644 index c6163f1e7ee..00000000000 --- a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/sample_logs/falcon-event-stream.log +++ /dev/null @@ -1,3 +0,0 @@ -{"metadata":{"customerIDString":"abcabcabc22221","offset":8695284,"eventType":"RemoteResponseSessionStartEvent","eventCreationTime":1698932494000,"version":"1.0"},"event":{"SessionId":"1111-fffff-4bb4-99c1-74c13cfc3e5a","HostnameField":"UKCHUDL00206","UserName":"admin.rose@example.com","StartTimestamp":1698932494,"AgentIdString":"fffffffff33333"}} -{"metadata":{"customerIDString":"abcabcabc22222","offset":8695285,"eventType":"RemoteResponseSessionStartEvent","eventCreationTime":1698932494000,"version":"1.0"},"event":{"SessionId":"1111-fffff-4bb4-99c1-74c13cfc3e5a","HostnameField":"UKCHUDL00206","UserName":"admin.rose@example.com","StartTimestamp":1698932494,"AgentIdString":"fffffffff33333"}} -{"metadata":{"customerIDString":"abcabcabc22223","offset":8695286,"eventType":"RemoteResponseSessionStartEvent","eventCreationTime":1698932494000,"version":"1.0"},"event":{"SessionId":"1111-fffff-4bb4-99c1-74c13cfc3e5a","HostnameField":"UKCHUDL00206","UserName":"admin.rose@example.com","StartTimestamp":1698932494,"AgentIdString":"fffffffff33333"}} diff --git a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/go.mod b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/go.mod deleted file mode 100644 index 1dc854f0492..00000000000 --- a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/go.mod +++ /dev/null @@ -1,3 +0,0 @@ -module streaming-mock-service - -go 1.23.0 diff --git a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/main.go b/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/main.go deleted file mode 100644 index a97fb414b4f..00000000000 --- a/packages/crowdstrike/data_stream/falcon/_dev/deploy/docker/streaming-mock-service/main.go +++ /dev/null @@ -1,185 +0,0 @@ -package main - -import ( - "bufio" - "encoding/json" - "flag" - "fmt" - "io" - "log" - "net/http" - "os" - "strings" -) - -var ( - mockClientID = flag.String("client_id", "", "Mock Client ID") - mockClientSecret = flag.String("client_secret", "", "Mock Client Secret") - mockAccessToken = flag.String("access_token", "", "Mock Access Token") - mockSessionToken = flag.String("session_token", "", "Mock Session Token") - filePath = flag.String("file", "", "Path to the input file") - mockDataFeedURL = flag.String("mock_datafeed_url", "", "Mock DataFeed URL") - datafeedCalled = flag.Bool("datafeed_called", false, "Mock Datafeed Called") -) - -func main() { - flag.Parse() - // Setup routes - http.HandleFunc("POST /oauth2/token", mockTokenHandler) - http.HandleFunc("GET /sensors/entities/datafeed/v2", resourceHandler) // Resource endpoint - http.HandleFunc("GET /", streamData) // Event stream endpoint - - // Start the server - port := ":8090" - log.Printf("Starting mock OAuth2 server on %s...", port) - err := http.ListenAndServe(port, nil) - if err != nil { - log.Fatal("Server failed to start:", err) - } -} - -// Mock OAuth2 Token endpoint -func mockTokenHandler(w http.ResponseWriter, r *http.Request) { - log.Printf("Received request: Method=%s, URL=%s", r.Method, r.URL.String()) - log.Printf("Headers: %+v", r.Header) - - // Parse form data (client credentials) - err := r.ParseForm() - if err != nil { - http.Error(w, "Error parsing form data", http.StatusBadRequest) - return - } - - // Get client credentials from the form - clientID := r.FormValue("client_id") - clientSecret := r.FormValue("client_secret") - - // Validate client credentials - if clientID == *mockClientID && clientSecret == *mockClientSecret { - // Mock success, return an access token - tokenResponse := map[string]string{ - "access_token": *mockAccessToken, - "token_type": "bearer", - } - log.Printf("Returning auth token") - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(tokenResponse) - } else { - // Invalid credentials - http.Error(w, "Invalid client credentials", http.StatusUnauthorized) - } -} - -// Returns a resource with a dataFeedURL and sessionToken -func resourceHandler(w http.ResponseWriter, r *http.Request) { - log.Printf("Received request: Method=%s, URL=%s", r.Method, r.URL.String()) - log.Printf("Headers: %+v", r.Header) - if *datafeedCalled { - fmt.Println("Datafeed has already been called. Exiting...") - os.Exit(0) // Exit the program, or use return if you don't want to terminate - } - *datafeedCalled = true - err := verifyOAuth2Token(r) - if err != nil { - http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) - return - } - - resource := map[string]interface{}{ - "resources": []map[string]interface{}{ - { - "dataFeedURL": *mockDataFeedURL, - "sessionToken": map[string]interface{}{ - "token": *mockSessionToken, - "expiration": "2025-11-18T09:21:31.767185156Z", - }, - "refreshActiveSessionURL": *mockDataFeedURL, - "refreshActiveSessionInterval": 1800, - }, - }, - } - log.Printf("Returning datafeed url.") - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(resource) -} - -// Middleware to verify OAuth2 Bearer token -func verifyOAuth2Token(r *http.Request) error { - // Extract OAuth2 token from the Authorization header - authHeader := r.Header.Get("Authorization") - if !strings.HasPrefix(authHeader, "Bearer ") { - return fmt.Errorf("missing Bearer token") - } - tokenString := strings.TrimPrefix(authHeader, "Bearer ") - - // Validate the token - if tokenString != *mockAccessToken { - return fmt.Errorf("invalid token") - } - return nil -} - -type streamReader struct { - data []string - index int -} - -func streamData(w http.ResponseWriter, r *http.Request) { - // Set the response header for streaming - w.Header().Set("Content-Type", "application/json") - w.Header().Set("Transfer-Encoding", "chunked") // This will stream data in chunks - file, err := os.Open(*filePath) - if err != nil { - log.Fatalf("Error opening file: %v", err) - } - defer file.Close() // Ensure the file is closed when done - - // Initialize a slice to hold the JSON strings - var data []string - - // Create a scanner to read the file line by line - scanner := bufio.NewScanner(file) - - // Read the file line by line - for scanner.Scan() { - line := scanner.Text() // Get the current line - - // Skip empty lines - if len(line) == 0 { - continue - } - - // Append the non-empty line (JSON string) to the data slice - data = append(data, line) - } - - // Create a custom reader for streaming data - reader := &streamReader{ - data: data, - index: 0, - } - - // Stream the data using the custom reader - _, err = io.Copy(w, reader) - if err != nil { - http.Error(w, "Error while streaming data", http.StatusInternalServerError) - return - } - - log.Println("Data streaming completed, closing connection.") -} - -func (s *streamReader) Read(p []byte) (n int, err error) { - if s.index >= len(s.data) { - return 0, io.EOF // No more data to stream - } - chunk := s.data[s.index] - s.index++ - if len(chunk) > len(p) { - p = append(p, make([]byte, len(chunk)-len(p))...) - } - copy(p, chunk) - return len(chunk), nil -} diff --git a/packages/crowdstrike/data_stream/falcon/_dev/test/system/test-streaming-config.yml b/packages/crowdstrike/data_stream/falcon/_dev/test/system/test-streaming-config.yml index 58d014c8541..0cc9daccf8b 100644 --- a/packages/crowdstrike/data_stream/falcon/_dev/test/system/test-streaming-config.yml +++ b/packages/crowdstrike/data_stream/falcon/_dev/test/system/test-streaming-config.yml @@ -3,10 +3,10 @@ service: crowdstrike-streaming data_stream: vars: url: http://{{Hostname}}:{{Port}} - client_id: xxxx - client_secret: xxxx + client_id: slightlysecretclientid + client_secret: totallysecretlientsecret token_url: http://{{Hostname}}:{{Port}}/oauth2/token - app_id: xxxx + app_id: nonsecretappid preserve_original_event: true assert: hit_count: 3 diff --git a/packages/crowdstrike/data_stream/falcon/agent/stream/streaming.yml.hbs b/packages/crowdstrike/data_stream/falcon/agent/stream/streaming.yml.hbs index 79eab67dcaf..510db7554fb 100644 --- a/packages/crowdstrike/data_stream/falcon/agent/stream/streaming.yml.hbs +++ b/packages/crowdstrike/data_stream/falcon/agent/stream/streaming.yml.hbs @@ -9,9 +9,10 @@ redact: fields: ~ program: | state.response.decode_json().as(body,{ - "events": { + ?"cursor": body.?metadata.optMap(m, {"offset": m.offset}), + "events": [{ "message": body.encode_json(), - } + }], }) tags: {{#if preserve_original_event}} diff --git a/packages/crowdstrike/data_stream/falcon/sample_event.json b/packages/crowdstrike/data_stream/falcon/sample_event.json index a6243bd9169..560ab81c2ca 100644 --- a/packages/crowdstrike/data_stream/falcon/sample_event.json +++ b/packages/crowdstrike/data_stream/falcon/sample_event.json @@ -1,7 +1,7 @@ { "@timestamp": "2023-11-02T13:41:34.000Z", "agent": { - "ephemeral_id": "5b283a3a-afb4-4477-87e2-32f63990f868", + "ephemeral_id": "72e0f5e3-f55d-4983-a209-651b6b071a1c", "id": "60ae74ae-652f-4e24-9a9e-e5a00ceb1c1c", "name": "docker-fleet-agent", "type": "filebeat", @@ -15,13 +15,13 @@ "metadata": { "customerIDString": "abcabcabc22221", "eventType": "RemoteResponseSessionStartEvent", - "offset": 8695284, + "offset": 1, "version": "1.0" } }, "data_stream": { "dataset": "crowdstrike.falcon", - "namespace": "27837", + "namespace": "34236", "type": "logs" }, "ecs": { @@ -43,9 +43,9 @@ ], "created": "2023-11-02T13:41:34.000Z", "dataset": "crowdstrike.falcon", - "ingested": "2024-11-27T05:38:00Z", + "ingested": "2024-11-29T06:04:44Z", "kind": "event", - "original": "{\"event\":{\"AgentIdString\":\"fffffffff33333\",\"HostnameField\":\"UKCHUDL00206\",\"SessionId\":\"1111-fffff-4bb4-99c1-74c13cfc3e5a\",\"StartTimestamp\":1698932494,\"UserName\":\"admin.rose@example.com\"},\"metadata\":{\"customerIDString\":\"abcabcabc22221\",\"eventCreationTime\":1698932494000,\"eventType\":\"RemoteResponseSessionStartEvent\",\"offset\":8695284,\"version\":\"1.0\"}}", + "original": "{\"event\":{\"AgentIdString\":\"fffffffff33333\",\"HostnameField\":\"UKCHUDL00206\",\"SessionId\":\"1111-fffff-4bb4-99c1-74c13cfc3e5a\",\"StartTimestamp\":1698932494,\"UserName\":\"admin.rose@example.com\"},\"metadata\":{\"customerIDString\":\"abcabcabc22221\",\"eventCreationTime\":1698932494000,\"eventType\":\"RemoteResponseSessionStartEvent\",\"offset\":1,\"version\":\"1.0\"}}", "start": "2023-11-02T13:41:34.000Z", "type": [ "start" diff --git a/packages/crowdstrike/docs/README.md b/packages/crowdstrike/docs/README.md index 04c0cd13949..3972576da20 100644 --- a/packages/crowdstrike/docs/README.md +++ b/packages/crowdstrike/docs/README.md @@ -1060,7 +1060,7 @@ An example event for `falcon` looks as following: { "@timestamp": "2023-11-02T13:41:34.000Z", "agent": { - "ephemeral_id": "5b283a3a-afb4-4477-87e2-32f63990f868", + "ephemeral_id": "72e0f5e3-f55d-4983-a209-651b6b071a1c", "id": "60ae74ae-652f-4e24-9a9e-e5a00ceb1c1c", "name": "docker-fleet-agent", "type": "filebeat", @@ -1074,13 +1074,13 @@ An example event for `falcon` looks as following: "metadata": { "customerIDString": "abcabcabc22221", "eventType": "RemoteResponseSessionStartEvent", - "offset": 8695284, + "offset": 1, "version": "1.0" } }, "data_stream": { "dataset": "crowdstrike.falcon", - "namespace": "27837", + "namespace": "34236", "type": "logs" }, "ecs": { @@ -1102,9 +1102,9 @@ An example event for `falcon` looks as following: ], "created": "2023-11-02T13:41:34.000Z", "dataset": "crowdstrike.falcon", - "ingested": "2024-11-27T05:38:00Z", + "ingested": "2024-11-29T06:04:44Z", "kind": "event", - "original": "{\"event\":{\"AgentIdString\":\"fffffffff33333\",\"HostnameField\":\"UKCHUDL00206\",\"SessionId\":\"1111-fffff-4bb4-99c1-74c13cfc3e5a\",\"StartTimestamp\":1698932494,\"UserName\":\"admin.rose@example.com\"},\"metadata\":{\"customerIDString\":\"abcabcabc22221\",\"eventCreationTime\":1698932494000,\"eventType\":\"RemoteResponseSessionStartEvent\",\"offset\":8695284,\"version\":\"1.0\"}}", + "original": "{\"event\":{\"AgentIdString\":\"fffffffff33333\",\"HostnameField\":\"UKCHUDL00206\",\"SessionId\":\"1111-fffff-4bb4-99c1-74c13cfc3e5a\",\"StartTimestamp\":1698932494,\"UserName\":\"admin.rose@example.com\"},\"metadata\":{\"customerIDString\":\"abcabcabc22221\",\"eventCreationTime\":1698932494000,\"eventType\":\"RemoteResponseSessionStartEvent\",\"offset\":1,\"version\":\"1.0\"}}", "start": "2023-11-02T13:41:34.000Z", "type": [ "start"