Skip to content

Commit

Permalink
Merge pull request #59 from boanlab/ml-engine-rework
Browse files Browse the repository at this point in the history
Ml engine rework
  • Loading branch information
isu-kim authored Feb 20, 2024
2 parents 6623343 + 8a5f486 commit dbdd054
Show file tree
Hide file tree
Showing 13 changed files with 250 additions and 34 deletions.
45 changes: 45 additions & 0 deletions .github/workflows/ci-test-py.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# CI workflow for the Python AI engine: pip install check + Ruff lint.
name: ci-test-py
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  # Verify that the AI engine's dependencies install cleanly with pip.
  py-pip-ai-sentryflow:
    runs-on: ubuntu-20.04
    steps:
      - uses: actions/checkout@v3

      - uses: actions/setup-python@v4
        with:
          python-version: '3.11'
          cache: 'pip'

      # Fixed: original had a bare `- name:` step with detached `- run:` items,
      # and `pip test` is not a valid pip subcommand.
      - name: check Python pip3
        run: pip install -r requirements.txt
        working-directory: ai-engine

  # Lint the AI engine sources with Ruff (non-blocking).
  py-lint-ai-sentryflow:
    runs-on: ubuntu-20.04
    steps:
      - uses: actions/checkout@v3

      - uses: actions/setup-python@v4
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
        working-directory: ai-engine

      - name: Lint with Ruff
        run: |
          pip install ruff
          ruff --output-format=github .
        continue-on-error: true
        working-directory: ai-engine
6 changes: 6 additions & 0 deletions ai-engine/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
.idea
.git
.gitignore
protobuf
Dockerfile
__pycache__/
3 changes: 3 additions & 0 deletions ai-engine/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.idea/
__pycache__/
protobuf/
22 changes: 17 additions & 5 deletions ai-engine/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,26 @@
# Dockerfile
FROM ubuntu:latest

# Base tooling needed to build the AI engine image.
RUN apt-get update && apt-get -y install python3 python3-pip wget git

# Stringlifier fork used for API path classification.
RUN git clone https://github.com/isu-kim/stringlifier.git
WORKDIR ./stringlifier
RUN pip install .

RUN mkdir /app
WORKDIR /app
COPY /ai-engine .

# Build protobuf for Python
RUN pip install grpcio grpcio-tools
RUN mkdir protobuf/
COPY /protobuf ./protobuf

# Due to python import bugs, we have to compile protoc using this command
# Refer to https://github.com/protocolbuffers/protobuf/issues/1491#issuecomment-261621112 for more information on this
RUN python3 -m grpc_tools.protoc --python_out=. --pyi_out=. --grpc_python_out=. -I=. protobuf/sentryflow_metrics.proto

WORKDIR /app
RUN pip install -r requirements.txt

CMD ["python3", "ai-engine.py"]
6 changes: 3 additions & 3 deletions ai-engine/Makefile
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# SPDX-License-Identifier: Apache-2.0

IMAGE_NAME = 5gsec/sentryflow-ai-engine
TAG = v0.0.1

.PHONY: build

# Build from the repository root so both ai-engine/ and protobuf/ are in the
# Docker build context (the Dockerfile copies /ai-engine and /protobuf).
build:
	docker build -t $(IMAGE_NAME):$(TAG) -f ./Dockerfile ../
105 changes: 81 additions & 24 deletions ai-engine/ai-engine.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,94 @@
from pymongo import MongoClient
import os
import grpc

from stringlifier.api import Stringlifier
from flask import Flask
from concurrent import futures

from protobuf import sentryflow_metrics_pb2_grpc
from protobuf import sentryflow_metrics_pb2


class HandlerServer:
    """
    Owns the single grpc.server instance and the list of GRPCServer
    services that get registered on it.
    """

    def __init__(self):
        # Listen address comes from the environment when set, otherwise
        # fall back to the all-interfaces default.
        self.listen_addr = os.environ.get("AI_ENGINE_ADDRESS", "0.0.0.0:5000")
        self.server = None
        self.grpc_servers = []

    def init_grpc_servers(self):
        """
        Create the underlying gRPC server and register every known
        service implementation on it.
        :return: None
        """
        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        self.grpc_servers.append(APIClassificationServer())  # @todo: make this configurable

        service: GRPCServer
        for service in self.grpc_servers:
            service.register(self.server)

    def serve(self):
        """
        Bind the listen address and serve requests; blocks until the
        server terminates.
        :return: None
        """
        self.server.add_insecure_port(self.listen_addr)

        print("[INFO] Starting to serve on {}".format(self.listen_addr))
        self.server.start()
        self.server.wait_for_termination()

app = Flask(__name__)
s = Stringlifier()

@app.route('/api_metrics')
def api_metrics():
# Connect to MongoDB
client = MongoClient('mongodb://mongo:27017')
class GRPCServer:
    """
    Abstract base for an individual gRPC service; concrete services
    override register() to attach themselves to a server.
    """

    def register(self, server):
        """
        Attach this service implementation to the target gRPC server.
        :param server: the grpc.server instance to register on
        :return: None
        """
        pass

# Access the numbat database
db = client.numbat

# Access the access-logs collection
collection = db['access-logs']
class APIClassificationServer(sentryflow_metrics_pb2_grpc.SentryFlowMetricsServicer, GRPCServer):
    """
    gRPC service that classifies batches of API paths using Stringlifier.
    """

    def __init__(self):
        # The Stringlifier model is loaded once here and reused for
        # every incoming request.
        self.stringlifier = Stringlifier()
        print("[Init] Successfully initialized APIClassificationServer")

    def register(self, server):
        """
        Attach this servicer to the target gRPC server.
        :param server: the grpc.server instance to register on
        :return: None
        """
        sentryflow_metrics_pb2_grpc.add_SentryFlowMetricsServicer_to_server(self, server)

    def GetAPIClassification(self, request_iterator, context):
        """
        GetAPIClassification method that runs multiple API ML Classification at once
        :param request_iterator: The requests
        :param context: The context
        :return: The results
        """
        for req in request_iterator:
            paths = req.paths
            ml_results = self.stringlifier(paths)
            print("{} -> {}".format(paths, ml_results))

            # One single-response entry per classified path.
            results = [sentryflow_metrics_pb2.APIClassificationSingleResponse(merged=ml_result, fields=[])
                       for ml_result in ml_results]
            yield sentryflow_metrics_pb2.APIClassificationResponse(response=results)


if __name__ == '__main__':
    hs = HandlerServer()
    hs.init_grpc_servers()
    hs.serve()
24 changes: 24 additions & 0 deletions ai-engine/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import os
import uuid

import grpc

from protobuf import sentryflow_metrics_pb2_grpc
from protobuf import sentryflow_metrics_pb2

if __name__ == "__main__":
    # Mirror the server's address resolution: env var first, then default.
    try:
        listen_addr = os.environ["AI_ENGINE_ADDRESS"]
    except KeyError:
        listen_addr = "0.0.0.0:5000"

    with grpc.insecure_channel(listen_addr) as channel:
        stub = sentryflow_metrics_pb2_grpc.SentryFlowMetricsStub(channel)
        req = sentryflow_metrics_pb2.APIClassificationRequest(paths=["/api/test", "/api/test/" + str(uuid.uuid4())])

        try:
            # GetAPIClassification is a (bidirectional) streaming RPC, so the
            # stub must be given an iterator of request messages — passing the
            # message itself raises a TypeError inside grpc.
            response_stream = stub.GetAPIClassification(iter([req]))
            for response in response_stream:
                print("Response: ", str(response))
        except grpc.RpcError as e:
            print("Error occurred during RPC:", e)
Binary file added ai-engine/requirements.txt
Binary file not shown.
37 changes: 37 additions & 0 deletions deployments/sentryflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,43 @@ metadata:
pod-security.kubernetes.io/enforce: privileged
pod-security.kubernetes.io/warn: privileged
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ai-engine
  namespace: sentryflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: ai-engine
  template:
    metadata:
      labels:
        app: ai-engine
    spec:
      containers:
        - name: sentryflow
          image: 5gsec/sentryflow-ai-engine:v0.0.1
          ports:
            - containerPort: 5000
              protocol: TCP
              name: grpc-sentryflow
---
apiVersion: v1
kind: Service
metadata:
  name: ai-engine
  namespace: sentryflow
spec:
  selector:
    app: ai-engine
  ports:
    - protocol: TCP
      port: 5000
      targetPort: 5000
      name: grpc-sentryflow
---
apiVersion: v1
kind: ServiceAccount
metadata:
Expand Down
2 changes: 1 addition & 1 deletion protobuf/Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
PROTO:=sentryflow.proto
PROTO:=sentryflow.proto sentryflow_metrics.proto
PBGO:=$(PROTO:.proto=.pb.go)

.PHONY: build
Expand Down
18 changes: 18 additions & 0 deletions protobuf/sentryflow_metrics.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
syntax = "proto3";

package protobuf;

option go_package = "sentryflow/protobuf";

// A batch of API paths to classify in one streamed request.
// NOTE: the Python server and client read `req.paths` (a repeated field),
// so a scalar `string path` cannot generate the API they use.
message APIClassificationRequest {
    repeated string paths = 1;
}

// Classification result for a single API path.
message APIClassificationSingleResponse {
    string merged = 1;
    repeated string fields = 2;
}

// Results for one request batch; one entry per classified path.
message APIClassificationResponse {
    repeated APIClassificationSingleResponse response = 1;
}

service SentryFlowMetrics {
    rpc GetAPIClassification(stream APIClassificationRequest) returns (stream APIClassificationResponse);
}
13 changes: 12 additions & 1 deletion sentryflow/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type NumbatConfig struct {
PatchNamespace bool // Enable/Disable patching namespace for Istio injection
PatchRestartDeployments bool // Enable/Disable restarting deployments after patching

AIEngineService string
AIEngineBatchSize int

Debug bool // Enable/Disable SentryFlow debug mode
}

Expand All @@ -37,6 +40,8 @@ const (
CustomExportListenPort string = "customExportListenPort"
PatchNamespace string = "patchNamespace"
PatchRestartDeployments string = "patchRestartDeployments"
AIEngineService string = "AIEngineService"
AIEngineBatchSize string = "AIEngineBatchSize"
Debug string = "debug"
)

Expand All @@ -47,6 +52,8 @@ func readCmdLineParams() {
customExportListenPortStr := flag.String(CustomExportListenPort, "8080", "Custom export gRPC server listen port")
patchNamespaceB := flag.Bool(PatchNamespace, false, "Enable/Disable patching Istio injection to all namespaces")
patchRestartDeploymentsB := flag.Bool(PatchRestartDeployments, false, "Enable/Disable restarting deployments in all namespaces")
AIEngineServiceStr := flag.String(AIEngineService, "ai-engine.sentryflow.svc.cluster.local", "Service address for SentryFlow AI Engine")
AIEngineBatchSizeInt := flag.Int(AIEngineBatchSize, 5, "Batch size fo SentryFlow AI Engine")
configDebugB := flag.Bool(Debug, false, "Enable/Disable debugging mode using logs")

var flags []string
Expand All @@ -64,6 +71,8 @@ func readCmdLineParams() {
viper.SetDefault(CustomExportListenPort, *customExportListenPortStr)
viper.SetDefault(PatchNamespace, *patchNamespaceB)
viper.SetDefault(PatchRestartDeployments, *patchRestartDeploymentsB)
viper.SetDefault(AIEngineService, *AIEngineServiceStr)
viper.SetDefault(AIEngineBatchSize, *AIEngineBatchSizeInt)
viper.SetDefault(Debug, *configDebugB)
}

Expand All @@ -76,14 +85,16 @@ func LoadConfig() error {
viper.AutomaticEnv()

// todo: read configuration from config file
_ = os.Getenv("NUMBAT_CFG")
_ = os.Getenv("SENTRYFLOW_CFG")

GlobalCfg.OtelGRPCListenAddr = viper.GetString(OtelGRPCListenAddr)
GlobalCfg.OtelGRPCListenPort = viper.GetString(OtelGRPCListenPort)
GlobalCfg.CustomExportListenAddr = viper.GetString(CustomExportListenAddr)
GlobalCfg.CustomExportListenPort = viper.GetString(CustomExportListenPort)
GlobalCfg.PatchNamespace = viper.GetBool(PatchNamespace)
GlobalCfg.PatchRestartDeployments = viper.GetBool(PatchRestartDeployments)
GlobalCfg.AIEngineService = viper.GetString(AIEngineService)
GlobalCfg.AIEngineBatchSize = viper.GetInt(AIEngineBatchSize)
GlobalCfg.Debug = viper.GetBool(Debug)

log.Printf("Configuration [%+v]", GlobalCfg)
Expand Down
3 changes: 3 additions & 0 deletions sentryflow/metrics/api/apiAnalyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Analyzer struct {
perAPICount map[string]uint64
perAPICountLock sync.Mutex // @todo perhaps combine those two?

curBatchCount int
batchCountLock sync.Mutex

stopChan chan struct{}
apiJob chan string
}
Expand Down

0 comments on commit dbdd054

Please sign in to comment.