diff --git a/applications/statistics/hosts_profiling/README.md b/applications/statistics/hosts_profiling/README.md
new file mode 100644
index 0000000..a8d058b
--- /dev/null
+++ b/applications/statistics/hosts_profiling/README.md
@@ -0,0 +1,41 @@
+## Host profiling
+
+### Description
+An application that collects aggregated characteristics of hosts over a longer period of time, typically 24 hours.
+The application produces a temporal summary of the activity of every host observed during that longer period.
+
+The aggregation and observation intervals can be set by the `-lw` (long window) and `-sw` (short window) params,
+defining the duration of an **aggregation** (=**short window**) and the duration of the **observation** (=**long window**) in seconds.
+The long window must be a **multiple** of the short window.
+
+By default, the durations are set to `-lw 86400 -sw 3600`, i.e. the application delivers a daily profile aggregated in hourly intervals.
+
+The application outputs host logs keyed by a temporal identification. The logs contain the aggregated data delivered
+by the [host_stats](https://github.com/CSIRT-MU/Stream4Flow/blob/master/applications/statistics/hosts_statistics/spark/host_stats.py)
+application (an example record is sketched below the list):
+- **Packets**: for each host, the sum of packets transferred in each of the short aggregation windows
+- **Bytes**: for each host, the sum of bytes transferred in each of the short aggregation windows
+- **Flows**: for each host, the sum of flows transferred in each of the short aggregation windows
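+
+For illustration, a single output record (as sent to the output Kafka topic) has roughly the following shape.
+This is a sketch only: the IP address and all values are made up and the `stats` dict is truncated; with the
+default `-sw 3600 -lw 86400` it contains 24 entries keyed by the index of the hourly window:
+
+```python
+example_record = {
+    "@type": "host_stats_temporal_profile",
+    "src_ipv4": "192.168.1.1",
+    "stats": {
+        "0": {"packets": 1200, "bytes": 96000, "flows": 35},
+        "1": {"packets": 800, "bytes": 64000, "flows": 20},
+        # ... one entry per short (aggregation) window
+    }
+}
+```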
+
+Note that the application requires a running **Kafka** instance and the Spark **host_stats** application with its output zookeeper
+and output topic **matching** the input zookeeper and input topic of **this** application.
+
+In addition, the **host_stats** app is expected to deliver data in a **time interval that divides** the **``-sw``** value, otherwise the results
+of **host_daily_profile** will be **biased**.
+
+### Usage:
+After setting up the stream data producer with matching ``-oz`` and ``-ot``, start the application as follows:
+
+- General:
+
+```commandline
+host_daily_profile.py -iz <input-zookeeper-hostname>:<port> -it <input-topic> -oz <output-zookeeper-hostname>:<port> -ot <output-topic> -sw <aggregation-interval-in-seconds> -lw <observation-interval-in-seconds>
+```
+
+- Stream4Flow example:
+
+```commandline
+/home/spark/applications/run-application.sh /home/spark/applications/hosts_profiling/host_daily_profile.py -iz producer:2181 -it host.stats -oz producer:9092 -ot results.daily -sw 3600 -lw 86400
+```
+
+...which starts the application with a **24h** delivery interval of **1h** statistics aggregations.
\ No newline at end of file
diff --git a/applications/statistics/hosts_profiling/spark/host_daily_profile.py b/applications/statistics/hosts_profiling/spark/host_daily_profile.py
new file mode 100644
index 0000000..a9210d7
--- /dev/null
+++ b/applications/statistics/hosts_profiling/spark/host_daily_profile.py
@@ -0,0 +1,311 @@
+# -*- coding: utf-8 -*-
+
+#
+# MIT License
+#
+# Copyright (c) 2017 Michal Stefanik, Tomas Jirsik
+# Institute of Computer Science, Masaryk University
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+"""
+Description: An application that computes a temporal profile of the hosts in the network. For every host,
+it aggregates the sums of packets, bytes and flows in short (by default hourly) windows and delivers the
+collected aggregations once per long (by default daily) window.
+
+Usage:
+  host_daily_profile.py -iz <input-zookeeper-hostname>:<port> -it <input-topic>
+                        -oz <output-zookeeper-hostname>:<port> -ot <output-topic>
+
+  To run this on the Stream4Flow, you need to receive flows by IPFIXCol and make them available via a Kafka topic.
+  Then you can run the example:
+  $ ./run-application.sh ./statistics/hosts_profiling/host_daily_profile.py -iz producer:2181 -it host.stats
+    -oz producer:9092 -ot results.daily
+
+"""
+
+import sys  # Common system functions
+import os  # Common operating system functions
+import argparse  # Arguments parser
+import ujson as json  # Fast JSON parser
+import socket  # Socket interface
+import time  # Time handling
+import ipaddress  # IP address handling
+
+from termcolor import cprint  # Colors in the console output
+
+from pyspark import SparkContext  # Spark API
+from pyspark.streaming import StreamingContext  # Spark streaming API
+from pyspark.streaming.kafka import KafkaUtils  # Spark streaming Kafka receiver
+
+from kafka import KafkaProducer  # Kafka Python client
+
+from collections import namedtuple
+
+
+# casting structures
+IPStats = namedtuple('IPStats', 'ports dst_ips http_hosts')
+StatsItem = namedtuple('StatsItem', 'packets bytes flows')
+
+ZERO_ITEM = StatsItem(0, 0, 0)  # neutral item used if no new data about the IP was collected in the recent interval
+
+# temporal constants
+# default values are overridden from input params if available
+hourly_interval = 3600  # aggregation interval for one item of the temporal array
+daily_interval = 86400  # collection interval of all aggregations as items of the temporal array
+
+INCREMENT = 0
+
+
+# temporal methods resolving the temporal array consistency and construction:
+
+def increment():
+    """
+    Increments the global counter that should stay consistent with the number of short windows
+    elapsed since the application start.
+    """
+    global INCREMENT
+    INCREMENT += 1
+
+
+def modulate_position(timestamp):
+    """
+    Counts the position in the time-sorted log of IP activity based on the timestamp attached to
+    the particular log in the rdd.
+    :param timestamp: attached timestamp
+    """
+    result = (INCREMENT - timestamp) % time_dimension
+    return result
+
+
+def update_array(array, position, value):
+    """
+    Updates an array by inserting a _value_ at a chosen _position_ in an _array_.
+    Works around the restriction that assignments cannot be used in lambda statements.
+    :param array: _array_
+    :param position: _position_
+    :param value: _value_
+    """
+    array[int(position)] = value
+    return array
+
+
+def initialize_array(value, timestamp):
+    """
+    Initializes an empty array of the default log length (=time_dimension) with a _value_
+    inserted at the position given by _timestamp_.
+    :param value: _value_
+    :param timestamp: _timestamp_
+    """
+    return update_array(list([ZERO_ITEM] * time_dimension), modulate_position(timestamp), value)
+
+
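+# Illustration of the helpers above (a sketch only - the concrete numbers are made up
+# and assume time_dimension == 24 and a current INCREMENT of 30):
+#   modulate_position(28)  ->  (30 - 28) % 24  ==  2
+#   initialize_array(StatsItem(10, 800, 2), 28)
+#       ->  [ZERO_ITEM, ZERO_ITEM, StatsItem(10, 800, 2), ZERO_ITEM, ..., ZERO_ITEM]  # 24 items in total
+
+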
+def merge_init_arrays(a1, a2):
+    """
+    Merges the given arrays so that the output array contains, at each position, either the value of a1
+    or of a2 for each nonzero value. The arrays should be disjunct; -1 is appended when both arrays are
+    filled at the same position, so the error is traceable.
+    :param a1: array of the size of a2
+    :param a2: array of the size of a1
+    :return: merged array
+    """
+    merge = []
+    for i in range(len(a1)):
+        if a1[i] != ZERO_ITEM and a2[i] != ZERO_ITEM:
+            # should not happen
+            merge.append(-1)
+        else:
+            merge.append(a1[i] if a1[i] != ZERO_ITEM else a2[i])
+
+    return merge
+
+
+# post-processing methods for the resulting temporal arrays:
+
+def send_to_kafka(data, producer, topic):
+    """
+    Send given data to the specified kafka topic.
+    :param data: data to send
+    :param producer: producer that sends the data
+    :param topic: name of the receiving kafka topic
+    """
+    # Debug print - data to be sent to kafka in the resulting format
+    # print data
+
+    producer.send(topic, str(data))
+
+
+def process_results(json_rdd, producer, topic):
+    """
+    Transform given computation results into the JSON format and send them to the specified kafka instance.
+
+    JSON format:
+    {"src_ipv4": "<host src IPv4 address>",
+     "@type": "host_stats_temporal_profile",
+     "stats": {
+        <interval order>: {"packets": <# of packets>, "bytes": <# of bytes>, "flows": <# of flows>},
+        <interval order>: {"packets": <# of packets>, "bytes": <# of bytes>, "flows": <# of flows>},
+        ...
+     }
+    }
+
+    Where each value is the aggregated sum of the specified attribute in an interval of hourly_interval length
+    that started at <time of send> - (<interval order> * hourly_interval) and ended roughly at the time of send.
+
+    :param json_rdd: Map in the format: (src IP, [StatsItem(packets, bytes, flows), ..., StatsItem(packets, bytes, flows)])
+    :param producer: producer that sends the data
+    :param topic: name of the receiving kafka topic
+    :return:
+    """
+
+    for ip, ip_stats in json_rdd.iteritems():
+        stats_dict = dict()
+        for stat_idx in range(len(ip_stats)):
+            temporal_stats = {"packets": ip_stats[stat_idx].packets,
+                              "bytes": ip_stats[stat_idx].bytes,
+                              "flows": ip_stats[stat_idx].flows}
+            stats_dict[stat_idx] = temporal_stats
+
+        # construct the output object in the predefined format
+        result_dict = {"@type": "host_stats_temporal_profile",
+                       "src_ipv4": ip,
+                       "stats": stats_dict}
+
+        # send the processed data in json form
+        send_to_kafka(json.dumps(result_dict) + "\n", producer, topic)
+
+    # logging terminal output
+    print("%s: Stats of %s IPs parsed and sent" % (time.strftime("%c"), len(json_rdd.keys())))
+
+
+# main computation methods:
+
+def collect_hourly_stats(stats_json):
+    """
+    Performs an hourly aggregation of the input data, whose result is collected as items of the daily aggregation.
+    :param stats_json: DStream of stats in json format matching the output format of the host_stats.py application
+    :type stats_json: Initialized spark streaming context, with data in json format as in the host_stats application
+    """
+
+    stats_windowed = stats_json.window(hourly_interval, hourly_interval)
+
+    stats_windowed_keyed = stats_windowed.map(lambda json_rdd: (json_rdd["src_ipv4"],
+                                                                (json_rdd["stats"]["total"]["packets"],
+                                                                 json_rdd["stats"]["total"]["bytes"],
+                                                                 json_rdd["stats"]["total"]["flow"])
+                                                                ))
+
+    ip_stats_summed = stats_windowed_keyed.reduceByKey(lambda current, update: (current[0] + update[0],
+                                                                                current[1] + update[1],
+                                                                                current[2] + update[2]))
+
+    ip_stats_objected = ip_stats_summed.mapValues(lambda summed_values: (StatsItem(*summed_values), INCREMENT))
+
+    return ip_stats_objected
+
+
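+# Illustration of one element of the stream returned by collect_hourly_stats() above and consumed
+# by collect_daily_stats() below (a sketch only - the IP address and the numbers are made up; the
+# second member of the value tuple is the INCREMENT counter at the time the hourly window closed):
+#   ("192.168.1.1", (StatsItem(packets=1200, bytes=96000, flows=35), 5))
+
+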
+def collect_daily_stats(hourly_stats):
+    """
+    Aggregation of the time stats of _small_window_data_ in a tuple format (data, timestamp) into a log vector
+    in the format [data_t_n, data_t_n-1, ..., data_t_n-k] containing the entries of the k most recent
+    _small_window_data_ rdd-s, where k = time_dimension (= daily_interval/hourly_interval)
+    :param hourly_stats: _hourly_stats_ aggregated in the hourly_interval window
+    """
+    global INCREMENT
+
+    # set a window of daily_interval length on the small-windowed RDDs
+    long_window_base = hourly_stats.window(daily_interval, hourly_interval)
+
+    # Debug print - see how recent incoming RDDs are transformed after each hourly_interval
+    # long_window_debug = long_window_base.map(lambda rdd: {"ip": rdd[0],
+    #                                                       "rdd_timestamp": rdd[1][1],
+    #                                                       "current_inc": INCREMENT,
+    #                                                       "mod_pos": modulate_position(int(rdd[1][1])),
+    #                                                       "value": rdd[1][0]})
+    # long_window_debug.pprint()
+
+    # Here, RDDs of the small window in format (IP: (data, timestamp)) are mapped into a sparse vector = [0, 0, .., volume, 0]
+    # where the vector has a size of time_dimension and the data is inserted at the modulated position (see modulate_position()),
+    # then the sparse vectors are combined/merged: "summing-up" the nonzero positions (see merge_init_arrays())
+    long_window_data_stream = long_window_base.map(lambda rdd: (rdd[0], initialize_array(rdd[1][0], rdd[1][1]))) \
+        .reduceByKey(lambda current, update: merge_init_arrays(current, update))
+
+    # the position counter is consistent with the small window length cycle - it increments on each new data from hourly_stats
+    long_window_data_stream.reduce(lambda current, update: 1).foreachRDD(lambda rdd: increment())
+
+    # Debug print of the target temporal arrays in the interval of a small window
+    long_window_data_stream.pprint(5)
+
+    # return the temporal arrays windowed in a daily interval
+    return long_window_data_stream.window(hourly_interval, daily_interval)
+
+
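+# Illustration of the sparse-array merge performed in collect_daily_stats() above (a sketch only,
+# with time_dimension shortened to 4 and A, B standing for two StatsItem values of the same host
+# recorded in different short windows):
+#   merge_init_arrays([A, ZERO_ITEM, ZERO_ITEM, ZERO_ITEM],
+#                     [ZERO_ITEM, ZERO_ITEM, B, ZERO_ITEM])
+#       ->  [A, ZERO_ITEM, B, ZERO_ITEM]
+
+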
+if __name__ == "__main__":
+    # Prepare arguments parser (automatically creates -h argument).
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-iz", "--input_zookeeper", help="input zookeeper hostname:port", type=str, required=True)
+    parser.add_argument("-it", "--input_topic", help="input kafka topic", type=str, required=True)
+
+    parser.add_argument("-oz", "--output_zookeeper", help="output zookeeper hostname:port", type=str, required=True)
+    parser.add_argument("-ot", "--output_topic", help="output kafka topic", type=str, required=True)
+
+    parser.add_argument("-sw", "--short_window", help="short window duration (seconds)", type=int, required=False)
+    parser.add_argument("-lw", "--long_window", help="long window duration (seconds)", type=int, required=False)
+
+    # Parse arguments.
+    args = parser.parse_args()
+
+    # if the input arguments are filled, override the default temporal values
+    if args.short_window and args.long_window:
+        hourly_interval = args.short_window
+        daily_interval = args.long_window
+
+    time_dimension = daily_interval / hourly_interval  # set the number of aggregation entries in the temporal array
+    print("Time dimension set to %s" % time_dimension)
+
+    # Set variables
+    application_name = os.path.basename(sys.argv[0])  # Application name used as identifier
+    kafka_partitions = 1  # Number of partitions of the input Kafka topic
+
+    # Spark context initialization
+    sc = SparkContext(appName=application_name + " " + " ".join(sys.argv[1:]))  # Application name used as the appName
+    ssc = StreamingContext(sc, 1)  # Spark micro batch is 1 second
+
+    # Initialize the input DStream of flows from the specified Zookeeper server and Kafka topic
+    input_stream = KafkaUtils.createStream(ssc, args.input_zookeeper, "spark-consumer-" + application_name,
+                                           {args.input_topic: kafka_partitions})
+
+    # Parse flows in the JSON format
+    input_stream_json = input_stream.map(lambda x: json.loads(x[1]))
+
+    # Process the data with the defined functions.
+    hourly_host_statistics = collect_hourly_stats(input_stream_json)
+    daily_host_statistics = collect_daily_stats(hourly_host_statistics)
+
+    kafka_producer = KafkaProducer(bootstrap_servers=args.output_zookeeper,
+                                   client_id="spark-producer-" + application_name)
+
+    # Transform the computed statistics into the desired json format and send them to the output Kafka topic given in the -ot input param
+    daily_host_statistics.foreachRDD(lambda rdd: process_results(rdd.collectAsMap(), kafka_producer, args.output_topic))
+
+    # drop the processed RDDs to balance the memory usage
+    daily_host_statistics.foreachRDD(lambda rdd: rdd.unpersist())
+
+    # Start the input data processing
+    ssc.start()
+    ssc.awaitTermination()