Skip to content

Commit

Permalink
Create anomaly_detection.py
Browse files Browse the repository at this point in the history
  • Loading branch information
KOSASIH authored Aug 12, 2024
1 parent 74f706b commit aa0bac9
Showing 1 changed file with 64 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# anomaly_detection.py

import os
import sys
import logging
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from typing import Any, Dict, List, Optional

_LOGGER = logging.getLogger(__name__)

class AnomalyDetector:
"""Anomaly detector class."""

def __init__(self, model_type: str, threshold: float):
self.model_type = model_type
self.threshold = threshold
self.model = None

def train(self, data: pd.DataFrame) -> None:
"""Train the anomaly detector model."""
if self.model_type == 'isolation_forest':
self.model = IsolationForest(contamination=self.threshold)
elif self.model_type == 'one_class_svm':
self.model = OneClassSVM(kernel='rbf', gamma=0.1, nu=self.threshold)
else:
raise ValueError(f'Invalid model type: {self.model_type}')
self.model.fit(data)

def predict(self, data: pd.DataFrame) -> np.ndarray:
"""Predict anomalies in the data."""
return self.model.predict(data)

def evaluate(self, data: pd.DataFrame, labels: np.ndarray) -> Dict[str, float]:
"""Evaluate the anomaly detector model."""
predictions = self.predict(data)
accuracy = accuracy_score(labels, predictions)
precision = precision_score(labels, predictions)
recall = recall_score(labels, predictions)
f1 = f1_score(labels, predictions)
return {'accuracy': accuracy, 'precision': precision, 'recall': recall, 'f1': f1}

def load_data(file_path: str) -> pd.DataFrame:
"""Load data from a CSV file."""
return pd.read_csv(file_path)

def main():
logging.basicConfig(level=logging.INFO)
data_file = 'data.csv'
model_type = 'isolation_forest'
threshold = 0.1
detector = AnomalyDetector(model_type, threshold)
data = load_data(data_file)
detector.train(data)
predictions = detector.predict(data)
labels = np.array([0] * len(data)) # assume all data is normal
evaluation = detector.evaluate(data, labels)
_LOGGER.info(f'Evaluation results: {evaluation}')

if __name__ == '__main__':
main()

0 comments on commit aa0bac9

Please sign in to comment.