Skip to content

Commit

Permalink
Fix mapping to Detection Finding OCSF class (#220)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexRuiz7 committed Jun 28, 2024
1 parent a737e06 commit c5ea300
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 14 deletions.
4 changes: 2 additions & 2 deletions integrations/amazon-security-lake/src/models/ocsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class AttackInfo(pydantic.BaseModel):

class FindingInfo(pydantic.BaseModel):
analytic: AnalyticInfo
attacks: AttackInfo
attacks: typing.List[AttackInfo]
title: str
types: typing.List[str]
uid: str
Expand Down Expand Up @@ -61,6 +61,6 @@ class DetectionFinding(pydantic.BaseModel):
risk_score: int
severity_id: int
status_id: int = 99
time: str
time: int
type_uid: int = 200401
unmapped: typing.Dict[str, typing.List[str]] = pydantic.Field()
31 changes: 19 additions & 12 deletions integrations/amazon-security-lake/src/wazuh_ocsf_converter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import pydantic
import models
import logging
from datetime import datetime

timestamp_pattern = "%Y-%m-%dT%H:%M:%S.%f%z"

def normalize(level: int) -> int:
"""
Expand Down Expand Up @@ -40,17 +42,19 @@ def to_detection_finding(event: models.wazuh.Event) -> models.ocsf.DetectionFind
type_id=1,
uid=event.rule.id
),
attacks=models.ocsf.AttackInfo(
tactic=models.ocsf.TechniqueInfo(
name=", ".join(event.rule.mitre.tactic),
uid=", ".join(event.rule.mitre.id)
),
technique=models.ocsf.TechniqueInfo(
name=", ".join(event.rule.mitre.technique),
uid=", ".join(event.rule.mitre.id)
),
version="v13.1"
),
attacks=[
models.ocsf.AttackInfo(
tactic=models.ocsf.TechniqueInfo(
name=", ".join(event.rule.mitre.tactic),
uid=", ".join(event.rule.mitre.id)
),
technique=models.ocsf.TechniqueInfo(
name=", ".join(event.rule.mitre.technique),
uid=", ".join(event.rule.mitre.id)
),
version="v13.1"
)
],
title=event.rule.description,
types=[event.input.type],
uid=event.id
Expand Down Expand Up @@ -89,13 +93,16 @@ def to_detection_finding(event: models.wazuh.Event) -> models.ocsf.DetectionFind
resources=resources,
risk_score=event.rule.level,
severity_id=severity_id,
time=event.timestamp,
time=to_epoch(event.timestamp),
unmapped=unmapped
)
except AttributeError as e:
logging.error(f"Error transforming event: {e}")
return {}

def to_epoch(timestamp: str) -> int:
return int(datetime.strptime(timestamp, timestamp_pattern).timestamp())


def from_json(json_line: str) -> models.wazuh.Event:
"""
Expand Down

0 comments on commit c5ea300

Please sign in to comment.