Skip to content

Commit

Permalink
Fix minors
Browse files Browse the repository at this point in the history
Signed-off-by: Jaehyun Nam <[email protected]>
  • Loading branch information
nam-jaehyun committed May 10, 2024
1 parent a3ef3d1 commit 62564be
Show file tree
Hide file tree
Showing 28 changed files with 153 additions and 194 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: sentryflow-pr-checks
name: pr-checks

on:
pull_request:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: sentryflow-release-image
name: release

on:
push:
Expand Down
11 changes: 7 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
# SentryFlow

[![SentryFlow Docker Build](https://github.com/boanlab/numbat/actions/workflows/sentryflow-release-image.yml/badge.svg)](https://github.com/boanlab/numbat/actions/workflows/sentryflow-release-image.yml) [![CI for SentryFlow](https://github.com/boanlab/numbat/actions/workflows/ci-test-go.yml/badge.svg)](https://github.com/boanlab/numbat/actions/workflows/ci-test-go.yml) [![CI for AI](https://github.com/boanlab/sentryflow/actions/workflows/ci-test-py.yml/badge.svg)](https://github.com/boanlab/sentryflow/actions/workflows/ci-test-py.yml)
[![SentryFlow Docker Build](https://github.com/boanlab/sentryflow/actions/workflows/release.yml/badge.svg)](https://github.com/boanlab/sentryflow/actions/workflows/release.yml) [![CI for SentryFlow](https://github.com/boanlab/sentryflow/actions/workflows/ci-test-go.yml/badge.svg)](https://github.com/boanlab/sentryflow/actions/workflows/ci-test-go.yml) [![CI for AI Engine](https://github.com/boanlab/sentryflow/actions/workflows/ci-test-py.yml/badge.svg)](https://github.com/boanlab/sentryflow/actions/workflows/ci-test-py.yml)

SentryFlow is a cloud-native system for API observability and security, specializing in log collection, metric production, and data exportation.

## Architecture Overview

![Numbat_Overview](docs/sentryflow_overview.png)
![SentryFlow_Overview](docs/sentryflow_overview.png)

### Features

- Generation of API Access Logs
- Proudction of API Metrics and Statistics
- Inference of API Specifications
- Production of API Metrics
- AI-driven API Classification (Inference)

## Documentation

### Basic Information

- [Getting Started](docs/getting_started.md)
- [Use Cases](examples/README.md)

### Contribution

- [Contribution Guide](contribution/README.md)
1 change: 0 additions & 1 deletion ai-engine/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# SPDX-License-Identifier: Apache-2.0

# Dockerfile
FROM ubuntu:22.04

RUN apt-get update
Expand Down
25 changes: 10 additions & 15 deletions ai-engine/ai_engine.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,31 @@
# SPDX-License-Identifier: Apache-2.0

"""SentryFlow AI API Classification Engine"""
"""SentryFlow AI Engine for API Classification"""

from concurrent import futures
from collections import Counter

import os
import grpc

from protobuf import sentryflow_metrics_pb2_grpc
from protobuf import sentryflow_metrics_pb2
from stringlifier.api import Stringlifier
from protobuf import sentryflow_metrics_pb2_grpc

from stringlifier.api import Stringlifier

class HandlerServer:
"""
Class for gRPC Servers
"""
def __init__(self):
self.server = None
self.grpc_servers = []

try:
self.listen_addr = os.environ["AI_ENGINE_ADDRESS"]
self.listen_addr = os.environ["AI_ENGINE"]
except KeyError:
self.listen_addr = "0.0.0.0:5000"

self.server = None
self.grpc_servers = []

def init_grpc_servers(self):
"""
init_grpc_servers method that initializes and registers gRPC servers
Expand All @@ -40,16 +40,16 @@ def init_grpc_servers(self):

def serve(self):
"""
serve method that starts serving gRPC servers, this is blocking function.
serve method that starts serving the gRPC servers (this is blocking function)
:return: None
"""
self.server.add_insecure_port(self.listen_addr)

print(f"[INFO] Starting to serve on {self.listen_addr}")

self.server.start()
self.server.wait_for_termination()


class GRPCServer:
"""
Abstract class for an individual gRPC Server
Expand All @@ -68,12 +68,10 @@ def unregister(self, server):
:return: None
"""


class APIClassificationServer(sentryflow_metrics_pb2_grpc.APIClassificationServicer, GRPCServer):
"""
Class for API Classification Server using Stringlifier
"""

def __init__(self):
self.stringlifier = Stringlifier()
print("[Init] Successfully initialized APIClassificationServer")
Expand All @@ -83,23 +81,20 @@ def register(self, server):

def ClassifyAPIs(self, request_iterator, _): # pylint: disable=C0103
"""
GetAPIClassification method that runs multiple API ML Classification at once
ClassifyAPIs method that runs multiple MLs for API Classification at once
:param request_iterator: The requests
:param context: The context
:return: The results
"""

for req in request_iterator:
all_paths = req.API
# for paths in all_paths:
ml_results = self.stringlifier(all_paths)

ml_counts = Counter(ml_results)
print(f"{all_paths} -> {ml_counts}")

yield sentryflow_metrics_pb2.APIClassificationResponse(APIs=ml_counts)


if __name__ == '__main__':
hs = HandlerServer()
hs.init_grpc_servers()
Expand Down
27 changes: 14 additions & 13 deletions clients/log-client/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewClient(client pb.SentryFlowClient, clientInfo *pb.ClientInfo, logCfg str
// Contact the server and print out its response
logStream, err := client.GetAPILog(context.Background(), clientInfo)
if err != nil {
log.Fatalf("[Client] Could not get log: %v", err)
log.Fatalf("[Client] Could not get API log: %v", err)
}

fd.logStream = logStream
Expand All @@ -77,7 +77,7 @@ func NewClient(client pb.SentryFlowClient, clientInfo *pb.ClientInfo, logCfg str
if metricCfg != "none" && (metricFilter == "all" || metricFilter == "api") {
amStream, err := client.GetAPIMetrics(context.Background(), clientInfo)
if err != nil {
log.Fatalf("[Client] Could not get log: %v", err)
log.Fatalf("[Client] Could not get API metrics: %v", err)
}

fd.apiMetricStream = amStream
Expand All @@ -86,7 +86,7 @@ func NewClient(client pb.SentryFlowClient, clientInfo *pb.ClientInfo, logCfg str
if metricCfg != "none" && (metricFilter == "all" || metricFilter == "envoy") {
emStream, err := client.GetEnvoyMetrics(context.Background(), clientInfo)
if err != nil {
log.Fatalf("[Client] Could not get log: %v", err)
log.Fatalf("[Client] Could not get Enovy metrics: %v", err)
}

fd.envoyMetricStream = emStream
Expand All @@ -95,18 +95,19 @@ func NewClient(client pb.SentryFlowClient, clientInfo *pb.ClientInfo, logCfg str
return fd
}

// LogRoutine Function
func (fd *Feeder) LogRoutine(logCfg string) {
// APILogRoutine Function
func (fd *Feeder) APILogRoutine(logCfg string) {
for fd.Running {
select {
default:
data, err := fd.logStream.Recv()
if err != nil {
log.Fatalf("[Client] Failed to receive a log: %v", err)
log.Fatalf("[Client] Failed to receive an API log: %v", err)
break
}

str := ""
str = str + "== Access Log ==\n"
str = str + "== API Log ==\n"
str = str + fmt.Sprintf("%v\n", data)

if logCfg == "stdout" {
Expand All @@ -120,14 +121,14 @@ func (fd *Feeder) LogRoutine(logCfg string) {
}
}

// APIMetricRoutine Function
func (fd *Feeder) APIMetricRoutine(metricCfg string) {
// APIMetricsRoutine Function
func (fd *Feeder) APIMetricsRoutine(metricCfg string) {
for fd.Running {
select {
default:
data, err := fd.apiMetricStream.Recv()
if err != nil {
log.Fatalf("[Client] Failed to receive metrics: %v", err)
log.Fatalf("[Client] Failed to receive API metrics: %v", err)
break
}

Expand All @@ -146,15 +147,15 @@ func (fd *Feeder) APIMetricRoutine(metricCfg string) {
}
}

// EnvoyMetricRoutine Function
func (fd *Feeder) EnvoyMetricRoutine(metricCfg string) {
// EnvoyMetricsRoutine Function
func (fd *Feeder) EnvoyMetricsRoutine(metricCfg string) {
metricKeys := []string{"GAUGE", "COUNTER", "HISTOGRAM", "SUMMARY"}
for fd.Running {
select {
default:
data, err := fd.envoyMetricStream.Recv()
if err != nil {
log.Fatalf("[Client] Failed to receive metrics: %v", err)
log.Fatalf("[Client] Failed to receive Envoy metrics: %v", err)
break
}

Expand Down
14 changes: 7 additions & 7 deletions clients/log-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ func main() {
}

// Get arguments
logCfgPtr := flag.String("logCfg", "stdout", "Output location for logs, {stdout|file|none}")
metricCfgPtr := flag.String("metricCfg", "stdout", "Output location for envoy metrics and api metrics, {stdout|file|none}")
metricFilterPtr := flag.String("metricFilter", "envoy", "Filter for what kinds of envoy and api metric to receive, {api|policy|envoy}")
logCfgPtr := flag.String("logCfg", "stdout", "Output location for API logs, {stdout|file|none}")
metricCfgPtr := flag.String("metricCfg", "stdout", "Output location for API and Envoy metrics, {stdout|file|none}")
metricFilterPtr := flag.String("metricFilter", "envoy", "Filter to select specific API or Envoy metrics to receive, {api|envoy}")
flag.Parse()

if *logCfgPtr == "none" && *metricCfgPtr == "none" {
Expand Down Expand Up @@ -67,7 +67,7 @@ func main() {
defer conn.Close()

// Connected to the gRPC server
log.Printf("[gRPC] Started to collect Access Logs from %s", addr)
log.Printf("[gRPC] Started to collect Logs from %s", addr)

// Define clientInfo
clientInfo := &protobuf.ClientInfo{
Expand All @@ -81,18 +81,18 @@ func main() {
logClient := client.NewClient(sfClient, clientInfo, *logCfgPtr, *metricCfgPtr, *metricFilterPtr)

if *logCfgPtr != "none" {
go logClient.LogRoutine(*logCfgPtr)
go logClient.APILogRoutine(*logCfgPtr)
fmt.Printf("[APILog] Started to watch API logs\n")
}

if *metricCfgPtr != "none" {
if *metricFilterPtr == "all" || *metricFilterPtr == "api" {
go logClient.APIMetricRoutine(*metricCfgPtr)
go logClient.APIMetricsRoutine(*metricCfgPtr)
fmt.Printf("[Metric] Started to watch API Metrics\n")
}

if *metricFilterPtr == "all" || *metricFilterPtr == "envoy" {
go logClient.EnvoyMetricRoutine(*metricCfgPtr)
go logClient.EnvoyMetricsRoutine(*metricCfgPtr)
fmt.Printf("[Metric] Started to watch Envoy Metrics\n")
}
}
Expand Down
32 changes: 15 additions & 17 deletions clients/mongo-client/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,14 @@ func NewClient(client pb.SentryFlowClient, clientInfo *pb.ClientInfo, logCfg str
fd := &Feeder{}

fd.Running = true

fd.client = client

fd.Done = make(chan struct{})

if logCfg != "none" {
// Contact the server and print out its response
logStream, err := client.GetAPILog(context.Background(), clientInfo)
if err != nil {
log.Fatalf("[Client] Could not get log: %v", err)
log.Fatalf("[Client] Could not get API log: %v", err)
}

fd.logStream = logStream
Expand All @@ -46,7 +44,7 @@ func NewClient(client pb.SentryFlowClient, clientInfo *pb.ClientInfo, logCfg str
if metricCfg != "none" && (metricFilter == "all" || metricFilter == "api") {
amStream, err := client.GetAPIMetrics(context.Background(), clientInfo)
if err != nil {
log.Fatalf("[Client] Could not get log: %v", err)
log.Fatalf("[Client] Could not get API metrics: %v", err)
}

fd.apiMetricStream = amStream
Expand All @@ -55,7 +53,7 @@ func NewClient(client pb.SentryFlowClient, clientInfo *pb.ClientInfo, logCfg str
if metricCfg != "none" && (metricFilter == "all" || metricFilter == "envoy") {
emStream, err := client.GetEnvoyMetrics(context.Background(), clientInfo)
if err != nil {
log.Fatalf("[Client] Could not get log: %v", err)
log.Fatalf("[Client] Could not get Envoy metrics: %v", err)
}

fd.envoyMetricStream = emStream
Expand All @@ -71,59 +69,59 @@ func NewClient(client pb.SentryFlowClient, clientInfo *pb.ClientInfo, logCfg str
return fd
}

// LogRoutine Function
func (fd *Feeder) LogRoutine(logCfg string) {
// APILogRoutine Function
func (fd *Feeder) APILogRoutine(logCfg string) {
for fd.Running {
select {
default:
data, err := fd.logStream.Recv()
if err != nil {
log.Fatalf("[Client] Failed to receive a log: %v", err)
log.Fatalf("[Client] Failed to receive an API log: %v", err)
break
}
err = fd.dbHandler.InsertAPILog(data)
if err != nil {
log.Fatalf("[MongoDB] Failed to insert API Log: %v", err)
log.Fatalf("[MongoDB] Failed to insert an API log: %v", err)
}
case <-fd.Done:
return
}
}
}

// APIMetricRoutine Function
func (fd *Feeder) APIMetricRoutine(metricCfg string) {
// APIMetricsRoutine Function
func (fd *Feeder) APIMetricsRoutine(metricCfg string) {
for fd.Running {
select {
default:
data, err := fd.apiMetricStream.Recv()
if err != nil {
log.Fatalf("[Client] Failed to receive metrics: %v", err)
log.Fatalf("[Client] Failed to receive API metrics: %v", err)
break
}
err = fd.dbHandler.InsertAPIMetrics(data)
if err != nil {
log.Fatalf("[MongoDB] Failed to insert API Metrics: %v", err)
log.Fatalf("[MongoDB] Failed to insert API metrics: %v", err)
}
case <-fd.Done:
return
}
}
}

// EnvoyMetricRoutine Function
func (fd *Feeder) EnvoyMetricRoutine(metricCfg string) {
// EnvoyMetricsRoutine Function
func (fd *Feeder) EnvoyMetricsRoutine(metricCfg string) {
for fd.Running {
select {
default:
data, err := fd.envoyMetricStream.Recv()
if err != nil {
log.Fatalf("[Client] Failed to receive metrics: %v", err)
log.Fatalf("[Client] Failed to receive Envoy metrics: %v", err)
break
}
err = fd.dbHandler.InsertEnvoyMetrics(data)
if err != nil {
log.Fatalf("[MongoDB] Failed to insert Envoy Metrics: %v", err)
log.Fatalf("[MongoDB] Failed to insert Envoy metrics: %v", err)
}
case <-fd.Done:
return
Expand Down
Loading

0 comments on commit 62564be

Please sign in to comment.