Commit

hosts_profiling: readme created, host_daily_profile got window duration params

stefanik12 committed Aug 2, 2017
1 parent b7abbbc commit 6277d52

Showing 2 changed files with 67 additions and 17 deletions.

applications/statistics/hosts_profiling/README.md (41 additions, 0 deletions)
@@ -0,0 +1,41 @@
## Host profiling

### Description
An application for collecting aggregated characteristics of hosts over a longer period of time, typically 24 hours.
The application produces a temporal summary of the activity of every host observed during that period.

The aggregation and observation intervals can be set by the `-sw` (short window) and `-lw` (long window) params,
defining the duration of **aggregations** (=**short window**) and the duration of **observation** (=**long window**) in seconds.
The long window must be a **multiple** of the short window.

By default, the durations are set to `-lw 86400 -sw 3600`, i.e. the application observes and delivers in a daily interval, aggregated in hourly intervals.
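
For illustration, the two parameters reduce to a fixed number of aggregation slots kept per host; a minimal standalone sketch of that relationship (variable names mirror the application's code):

```python
# Relationship between the windows: the long window is split into
# long/short slots, one aggregation entry per short window.
long_window = 86400   # -lw: observation/delivery interval in seconds
short_window = 3600   # -sw: aggregation interval in seconds

assert long_window % short_window == 0, "-lw must be a multiple of -sw"
time_dimension = long_window // short_window
print("Time dimension: %d" % time_dimension)  # 24 hourly slots per day
```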

The application outputs host logs keyed by a temporal identification. The logs contain the aggregated data delivered
from the [host_stats](https://github.com/CSIRT-MU/Stream4Flow/blob/master/applications/statistics/hosts_statistics/spark/host_stats.py)
application (an illustrative record sketch follows the list):
- **Packets**: for each host, the sum of packets transferred in each short aggregation window
- **Bytes**: for each host, the sum of bytes transferred in each short aggregation window
- **Flows**: for each host, the sum of flows transferred in each short aggregation window
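
A single output record might therefore look roughly as follows (an illustrative sketch based on the output format documented in the application's docstrings; the IP and all values are invented):

```python
# Hypothetical output record: host IP -> one entry per short-window slot.
example_record = {
    "192.168.1.10": {
        0: {"packets": 1200, "bytes": 845000, "flows": 31},  # most recent hour
        1: {"packets": 980,  "bytes": 700300, "flows": 27},  # one hour earlier
        # ... 24 slots in total with the default -sw 3600 -lw 86400
    }
}
```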

Note that the application requires a running **Kafka** and Spark's **host_stats** application with an output zookeeper
and output topic **matching** the input zookeeper and input topic of **this** application.

In addition, the **host_stats** app is expected to deliver data at a time interval that **divides** ``-sw``, otherwise the results
of **host_daily_profile** will be **biased**.

### Usage:
After setting up the stream data producer with matching ``-oz`` and ``-ot``, start the application as follows:

- General:

```commandline
host_daily_profile.py -iz <input-zookeeper-hostname>:<input-zookeeper-port> -it <input-topic> -oz <output-hostname>:<output-port> -ot <output-topic> -sw <small window in secs> -lw <long window in secs>
```

- Stream4Flow example:

```commandline
/home/spark/applications/run-application.sh /home/spark/applications/hosts_profiling/host_daily_profile.py -iz producer:2181 -it host.stats -oz producer:9092 -ot results.daily -sw 3600 -lw 86400
```

...starts the application with a **24h** delivery interval of **1h** statistics aggregations.

applications/statistics/hosts_profiling/host_daily_profile.py (26 additions, 17 deletions)
@@ -66,11 +66,9 @@
 ZERO_ITEM = StatsItem(0, 0, 0)  # neutral item used if no new data about the IP was collected in recent interval

 # temporal constants
-HOURLY_INTERVAL = 3600  # aggregation interval for one item of temporal array
-DAILY_INTERVAL = 86400  # collection interval of all aggregations as items in temporal array
-
-TIME_DIMENSION = DAILY_INTERVAL / HOURLY_INTERVAL  # number of aggregation entries in temporal array
-print("Time dimension: %s" % TIME_DIMENSION)
+# default values are overridden from input params if available
+hourly_interval = 3600  # aggregation interval for one item of temporal array
+daily_interval = 86400  # collection interval of all aggregations as items in temporal array

 INCREMENT = 0

@@ -91,7 +89,7 @@ def modulate_position(timestamp):
     the particular log in rdd
     timestamp: attached timestamp
     """
-    result = (INCREMENT - timestamp) % TIME_DIMENSION
+    result = (INCREMENT - timestamp) % time_dimension
     return result

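The modular arithmetic above turns the temporal array into a ring buffer; a standalone sketch of the indexing behaviour (hypothetical values, outside the Spark context):

```python
# Ring-buffer indexing as in modulate_position(): INCREMENT counts elapsed
# short windows, so the newest data always maps to position 0.
time_dimension = 24
INCREMENT = 30  # e.g. 30 short windows have elapsed so far

def modulate_position(timestamp):
    return (INCREMENT - timestamp) % time_dimension

print(modulate_position(30))  # 0  -> current slot
print(modulate_position(29))  # 1  -> one window ago
print(modulate_position(7))   # 23 -> oldest slot still retained
```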

@@ -109,12 +107,12 @@ def update_array(array, position, value):


 def initialize_array(value, timestamp):
     """
-    initializes an empty array of default log length (=TIME_DIMENSION) with a _value_
+    initializes an empty array of default log length (=time_dimension) with a _value_
     inserted on a position at a given _timestamp_
     :param value: _value_
     :param timestamp: _timestamp_
     """
-    return update_array(list([ZERO_ITEM] * TIME_DIMENSION), modulate_position(timestamp), value)
+    return update_array(list([ZERO_ITEM] * time_dimension), modulate_position(timestamp), value)


 def merge_init_arrays(a1, a2):
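
The body of merge_init_arrays is collapsed in this diff, but its documented role ("summing-up" nonzero positions) suggests semantics along these lines (an assumed sketch, simplified to plain integers instead of StatsItem tuples):

```python
# Assumed semantics of the sparse temporal arrays (simplified: ints
# instead of StatsItem(packets, bytes, flows) tuples).
time_dimension = 4
ZERO_ITEM = 0

def initialize_array(value, position):
    array = [ZERO_ITEM] * time_dimension
    array[position] = value                  # one filled slot per new RDD
    return array

def merge_init_arrays(a1, a2):
    return [x + y for x, y in zip(a1, a2)]   # element-wise "summing-up"

a = initialize_array(5, 0)                   # [5, 0, 0, 0]
b = initialize_array(3, 2)                   # [0, 0, 3, 0]
print(merge_init_arrays(a, b))               # [5, 0, 3, 0]
```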
@@ -162,12 +160,12 @@ def process_results(json_rdd, producer, topic):
             <t=1>: {"packets": <val>, "bytes": <val>, "flows": <val>},
             <t=2>: {"packets": <val>, "bytes": <val>, "flows": <val>},
             ...
-            <t=TIME_DIMENSION>: {"port":<port #n>, "flows":# of flows}
+            <t=time_dimension>: {"packets": <val>, "bytes": <val>, "flows": <val>}
         }
     }
-    Where <val> is aggregated sum of the specified attribute in an interval of HOURLY_INTERVAL length
-    that has started in time: <current time> - (<entry's t> * HOURLY_INTERVAL) and ended roughly in a time of send
+    Where <val> is the aggregated sum of the specified attribute in an interval of hourly_interval length
+    that started at <current time> - (<entry's t> * hourly_interval) and ended roughly at the time of sending
     :param json_rdd: Map in a format: (src IP , [ IPStats(packets, bytes, flows), ..., IPStats(packets, bytes, flows) ])
     :param producer: producer that sends the data
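
The producer side of process_results is not shown in this excerpt; a minimal sketch of the sending pattern it describes, assuming the kafka-python client (the application's actual producer setup lives outside this diff):

```python
import json
from kafka import KafkaProducer  # assumed client library

producer = KafkaProducer(bootstrap_servers="producer:9092")

def send_host_profile(src_ip, temporal_array, topic):
    # One JSON document per host: slot index -> aggregated counters.
    payload = {src_ip: {t: {"packets": p, "bytes": b, "flows": f}
                        for t, (p, b, f) in enumerate(temporal_array)}}
    producer.send(topic, json.dumps(payload).encode("utf-8"))
```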
@@ -204,7 +202,7 @@ def collect_hourly_stats(stats_json):
     :type stats_json: Initialized spark streaming context, with data in json format as in host_stats application
     """

-    stats_windowed = stats_json.window(HOURLY_INTERVAL, HOURLY_INTERVAL)
+    stats_windowed = stats_json.window(hourly_interval, hourly_interval)

     stats_windowed_keyed = stats_windowed.map(lambda json_rdd: (json_rdd["src_ipv4"],
                                                                 (json_rdd["stats"]["total"]["packets"],
@@ -225,13 +223,13 @@ def collect_daily_stats(hourly_stats):
     """
     Aggregation of the time stats of _small_window_data_ in a tuple format (data, timestamp) into a log vector
     in format [data_t_n, data_t_n-1, ... , data_t_n-k] containing the entries of the k most recent
-    _small_window_data_ rdd-s where k = TIME_DIMENSION (= DAILY_INTERVAL/HOURLY_INTERVAL)
-    :param hourly_stats: _hourly_stats_ aggregated in HOURLY_INTERVAL window
+    _small_window_data_ rdd-s where k = time_dimension (= daily_interval/hourly_interval)
+    :param hourly_stats: _hourly_stats_ aggregated in an hourly_interval window
     """
     global INCREMENT

     # set a window of DAY_WINDOW_INTERVAL on small-windowed RDDs
-    long_window_base = hourly_stats.window(DAILY_INTERVAL, HOURLY_INTERVAL)
+    long_window_base = hourly_stats.window(daily_interval, hourly_interval)

     # Debug print - see how recent incoming RDDs are transformed after each HOUR_WINDOW_INTERVAL
     # long_window_debug = long_window_base.map(lambda rdd: {"ip": rdd[0]
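
The layered windowing above is the core of the application: hourly tumbling windows feed a daily sliding window. A minimal standalone sketch of the same pattern (assumed boilerplate; the real input comes from Kafka, not a socket):

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="windowing-sketch")
ssc = StreamingContext(sc, 10)                    # 10 s micro-batches

stream = ssc.socketTextStream("localhost", 9999)  # hypothetical source

hourly = stream.window(3600, 3600)  # tumbling: length == slide, no overlap
daily = hourly.window(86400, 3600)  # sliding daily view, updated hourly

daily.pprint()
ssc.start()
ssc.awaitTermination()
```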
@@ -242,7 +240,7 @@ def collect_daily_stats(hourly_stats):
     # long_window_debug.pprint()

     # Here, RDDs of small window in format (IP: (data, timestamp)) are mapped into sparse vector=[0, 0, .. , volume, 0]
-    # where vector has a size of TIME_DIMENSION and data inserted on modulated position (see modulate_position())
+    # where vector has a size of time_dimension and data inserted on modulated position (see modulate_position())
     # then sparse vectors are combined/merged: "summing-up" nonzero positions (see merge_init_arrays())
     long_window_data_stream = long_window_base.map(lambda rdd: (rdd[0], initialize_array(rdd[1][0], rdd[1][1]))) \
         .reduceByKey(lambda current, update: merge_init_arrays(current, update))
@@ -254,7 +252,7 @@ def collect_daily_stats(hourly_stats):
     long_window_data_stream.pprint(5)

     # return the temporal arrays windowed in a daily interval
-    return long_window_data_stream.window(HOURLY_INTERVAL, DAILY_INTERVAL)
+    return long_window_data_stream.window(hourly_interval, daily_interval)


 if __name__ == "__main__":
@@ -266,9 +264,20 @@
     parser.add_argument("-oz", "--output_zookeeper", help="output zookeeper hostname:port", type=str, required=True)
     parser.add_argument("-ot", "--output_topic", help="output kafka topic", type=str, required=True)

+    parser.add_argument("-sw", "--short_window", help="small window duration", type=int, required=False)
+    parser.add_argument("-lw", "--long_window", help="long window duration", type=int, required=False)
+
     # Parse arguments.
     args = parser.parse_args()

+    # if input arguments are filled, override the default temporal values
+    if args.short_window and args.long_window:
+        hourly_interval = args.short_window
+        daily_interval = args.long_window
+
+    time_dimension = daily_interval / hourly_interval  # set a number of aggregation entries in temporal array
+    print("Time dimension set to %s" % time_dimension)
+
     # Set variables
     application_name = os.path.basename(sys.argv[0])  # Application name used as identifier
     kafka_partitions = 1  # Number of partitions of the input Kafka topic
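
The committed override block does not enforce the divisibility constraint stated in the README; a defensive variant might look like this (a sketch, not part of the commit):

```python
# Hypothetical hardening of the override block above.
if args.short_window and args.long_window:
    if args.long_window % args.short_window != 0:
        raise ValueError("-lw must be a multiple of -sw")
    hourly_interval = args.short_window
    daily_interval = args.long_window

time_dimension = daily_interval // hourly_interval  # integer slot count
print("Time dimension set to %s" % time_dimension)
```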
