From 93d507b2cb744cf61a25ee2cb47077af77c9f4cc Mon Sep 17 00:00:00 2001 From: stefanik12 Date: Tue, 4 Jul 2017 17:14:13 +0200 Subject: [PATCH 1/7] Init commit of placeholder app (host_profile.py) --- .../hosts_profiling/host_profile.py | 242 ++++++++++++++++++ 1 file changed, 242 insertions(+) create mode 100644 applications/statistics/hosts_profiling/host_profile.py diff --git a/applications/statistics/hosts_profiling/host_profile.py b/applications/statistics/hosts_profiling/host_profile.py new file mode 100644 index 0000000..b9be950 --- /dev/null +++ b/applications/statistics/hosts_profiling/host_profile.py @@ -0,0 +1,242 @@ +# TODO: placeholder application - see functionality suggestions in the main method + +# -*- coding: utf-8 -*- + +# +# MIT License +# +# Copyright (c) 2017 Michal Stefanik , Tomas Jirsik +# Institute of Computer Science, Masaryk University +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +""" +Description: A method for computing statistics for hosts in network. Computed statistics +for each host each window contain: + - a list of top n most active ports as sorted by a number of flows on a given port + +Usage: + top_n_host_stats.py -iz : -it -oh + : -n -net + + To run this on the Stream4Flow, you need to receive flows by IPFIXCol and make them available via Kafka topic. Then + you can run the example + $ ./run-application.sh ./statistics/hosts_statistics/spark/top_n_host_stats.py -iz producer:2181 -it ipfix.entry + -oh consumer:20101 -n 5 -net "10.0.0.0/24" + +""" + +import sys # Common system functions +import os # Common operating system functions +import argparse # Arguments parser +import ujson as json # Fast JSON parser +import socket # Socket interface +import time # Time handling +import ipaddress # IP address handling + +from termcolor import cprint # Colors in the console output + +from pyspark import SparkContext # Spark API +from pyspark.streaming import StreamingContext # Spark streaming API +from pyspark.streaming.kafka import KafkaUtils # Spark streaming Kafka receiver + +from collections import namedtuple + +IPStats = namedtuple('IPStats', 'ports dst_ips http_hosts') +StatsItem = namedtuple('StatsItem', 'key flows type') + + +def send_data(data, output_host): + """ + Send given data to the specified host using standard socket interface. 
+ + :param data: data to send + :param output_host: data receiver in the "hostname:port" format + """ + print data + + # Split outputHost hostname and port + host = output_host.split(':') + + # Prepare a TCP socket. + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + # Connect to the outputHost and send given data + try: + sock.connect((host[0], int(host[1]))) + sock.send(data) + + # Print message of sent + now = time.strftime("%c") + print("Data sent at: %s" % now) + + except socket.error: + cprint("[warning] Unable to connect to host " + output_host, "blue") + finally: + sock.close() + + +def _sort_by_flows(stats_values): + """ + Sorts the list of StatsItem by flows attribute + :param stats_values: list of StatsItem + :return: sorted list + """ + return sorted(stats_values, key=lambda entry: entry.flows, reverse=True) + + +def _parse_stats_items_list(stats_values): + """ + Parses the list of StatsItem into a dict of object for output JSON fromat + :param stats_values: list of StatsItem + :return: dict in output JSON format + """ + item_list = map(lambda stats_item: {stats_item.type: stats_item.key, "flows": stats_item.flows}, stats_values) + + # format the list of parsed StatsItems to a dict with keys of items' ordering + labeled_item_dict = {} + for order, item in map(lambda order: (order, item_list[order]), range(len(item_list))): + labeled_item_dict[int(order)] = item + + return labeled_item_dict + + +def process_results(json_rdd, n=10): + """ + Transform given computation results into the JSON format and send them to the specified host. + + JSON format: + {"src_ipv4":"", + "@type":"host_stats_topn_ports", + "stats":{ + "top_n_dst_ports": + { + "0": {"port":, "flows":# of flows}, + ... + "n": {"port":, "flows":# of flows} + }, + "top_n_dst_hosts": + { + "0": {"dst_host":, "flows":# of flows}, + ... + "n": {"dst_host":, "flows":# of flows} + } + "top_n_http_dst": + { + "0": {"dst_host":, "flows":# of flows}, + ... + "n": {"dst_host":, "flows":# of flows} + } + } + } + + :param json_rrd: Map in a format: (src IP , IPStats([PortStats], [DstIPStats], [HTTPHostStats])) + :return: json with selected TOP n ports by # of flows for each src IP + """ + + # fill in n from input params + if args.top_n is not None: + n = int(args.top_n) + + for ip, ip_stats in json_rdd.iteritems(): + # define output keys for particular stats in X_Stats named tuples + port_data_dict = {"top_n_dst_ports": ip_stats.ports, + "top_n_dst_hosts": ip_stats.dst_ips, + "top_n_http_dst": ip_stats.http_hosts} + + # take top n entries from IP's particular stats sorted by flows param + port_data_dict = {key: _sort_by_flows(val_list)[:n] for (key, val_list) in port_data_dict.iteritems()} + # parse the stats from StatsItem to a desirable form + port_data_dict = {key: _parse_stats_items_list(val_list) for (key, val_list) in port_data_dict.iteritems()} + + # construct the output object in predefined format + result_dict = {"@type": "top_n_host_stats", + "src_ipv4": ip, + "stats": port_data_dict} + + # send the processed data in json form + send_data(json.dumps(result_dict)+"\n", args.output_host) + + +def count_host_stats(flow_json): + """ + TODO + + :type flow_json: Initialized spark streaming context, windowed, json_loaded. 
+ """ + # TODO: placeholder for host profiling app based on windowed info from host_statistics applications + # TODO: in the selected big window (supposedly 24h) is suggested to compute + # * average number of communication partners for given hosts + # * distinct ports for host, sorted by the frequency of activity + # * frequency of communication times for host + # * probability of the host being the server/client machine/network infrastructure based on + # the distance of the selected attributes' values to mean values of the selected categories + # flow_json.pprint(200) + + # Filter flows with required data, in a given address range + flow_with_keys = flow_json.filter(lambda json_rdd: ("ipfix.sourceIPv4Address" in json_rdd.keys()) and + ("ipfix.destinationTransportPort" in json_rdd.keys()) and + (ipaddress.ip_address(json_rdd["ipfix.sourceIPv4Address"]) in network_filter) + ) + # Set window and slide duration for flows analysis + flow_with_keys_windowed = flow_with_keys.window(window_duration, window_slide) + + return None + + +if __name__ == "__main__": + # Prepare arguments parser (automatically creates -h argument). + parser = argparse.ArgumentParser() + parser.add_argument("-iz", "--input_zookeeper", help="input zookeeper hostname:port", type=str, required=True) + parser.add_argument("-it", "--input_topic", help="input kafka topic", type=str, required=True) + parser.add_argument("-oh", "--output_host", help="output hostname:port", type=str, required=True) + parser.add_argument("-net", "--network_range", help="network range to watch", type=str, required=True) + parser.add_argument("-n", "--top_n", help="max. number of ports for a host to retrieve", type=int, required=False) + + # Parse arguments. + args = parser.parse_args() + + # Set variables + application_name = os.path.basename(sys.argv[0]) # Application name used as identifier + kafka_partitions = 1 # Number of partitions of the input Kafka topic + window_duration = 10 # Analysis window duration (10 seconds) + window_slide = 10 # Slide interval of the analysis window (10 seconds) + # Filter for network for detection (regex filtering), e.g. "10\.10\..+" + network_filter = ipaddress.ip_network(unicode(args.network_range, "utf8")) + + # Spark context initialization + sc = SparkContext(appName=application_name + " " + " ".join(sys.argv[1:])) # Application name used as the appName + ssc = StreamingContext(sc, 1) # Spark microbatch is 1 second + + # Initialize input DStream of flows from specified Zookeeper server and Kafka topic + input_stream = KafkaUtils.createStream(ssc, args.input_zookeeper, "spark-consumer-" + application_name, + {args.input_topic: kafka_partitions}) + + # Parse flows in the JSON format + input_stream_json = input_stream.map(lambda x: json.loads(x[1])) + + # Process data to the defined function. 
+ host_statistics = count_host_stats(input_stream_json) + + # Transform computed statistics into desired json format and send it to output_host as given in -oh input param + stats_json = host_statistics.foreachRDD(lambda rdd: process_results(rdd.collectAsMap())) + + # Start input data processing + ssc.start() + ssc.awaitTermination() From 8349e720ca705f78aed9e1dce5c897764c6865d4 Mon Sep 17 00:00:00 2001 From: stefanik12 Date: Wed, 19 Jul 2017 16:27:02 +0200 Subject: [PATCH 2/7] Implemented tested sample of host_daily_profile app to gather temporal statistics over all day window --- ...{host_profile.py => host_daily_profile.py} | 187 +++++++++++++++--- 1 file changed, 159 insertions(+), 28 deletions(-) rename applications/statistics/hosts_profiling/{host_profile.py => host_daily_profile.py} (52%) diff --git a/applications/statistics/hosts_profiling/host_profile.py b/applications/statistics/hosts_profiling/host_daily_profile.py similarity index 52% rename from applications/statistics/hosts_profiling/host_profile.py rename to applications/statistics/hosts_profiling/host_daily_profile.py index b9be950..0b9ca7b 100644 --- a/applications/statistics/hosts_profiling/host_profile.py +++ b/applications/statistics/hosts_profiling/host_daily_profile.py @@ -32,8 +32,9 @@ - a list of top n most active ports as sorted by a number of flows on a given port Usage: +TODO top_n_host_stats.py -iz : -it -oh - : -n -net + : -n To run this on the Stream4Flow, you need to receive flows by IPFIXCol and make them available via Kafka topic. Then you can run the example @@ -58,8 +59,106 @@ from collections import namedtuple +# TODO: check these initial requirements in the end: + +# TODO: in the selected big window (supposedly 24h) is suggested to compute +# * average number of communication partners for given hosts +# * distinct ports for host, sorted by the frequency of activity +# * frequency of communication times for host +# * probability of the host being the server/client machine/network infrastructure based on +# the distance of the selected attributes' values to mean values of the selected categories +# stats_json.pprint(200) + + +# casting structures IPStats = namedtuple('IPStats', 'ports dst_ips http_hosts') -StatsItem = namedtuple('StatsItem', 'key flows type') +StatsItem = namedtuple('StatsItem', 'packets bytes flows') + +ZERO_ITEM = StatsItem(0, 0, 0) # neutral item used if no new data about the IP was collected in recent interval + +# temporal constants +HOURLY_INTERVAL = 10 # aggregation interval for one item of temporal output array +DAILY_INTERVAL = 40 # collection interval of aggregations as items in output array + +TIME_DIMENSION = DAILY_INTERVAL / HOURLY_INTERVAL +print("Time dimension: %s" % TIME_DIMENSION) + +EMPTY_TIME_DIMENSIONS = [0] * TIME_DIMENSION + +INCREMENT = 0 + +# temporal methods reassuring the temporal array consistency + + +def increment(): + """ + increments the global counter that should keep consistent with the duration of the app run in hours + """ + global INCREMENT + INCREMENT += 1 + + +def modulate_position(timestamp): + """ + counts the position in time-sorted log of IP activity as based on the timestamp attached to + the particular log in rdd + timestamp: attached timestamp + """ + result = (INCREMENT - timestamp) % TIME_DIMENSION + return result + + +def update_array(array, position, value): + """ + updates an array inserting a _value_ to a chosen _position_ in an _array_ + overcomes a general restriction disabling to use an assignment in lambda statements + :param array: 
_array_ + :param position: _position_ + :param value: _value_ + """ + array[int(position)] = value + return array + + +def initialize_array(value, timestamp): + """ + initializes an empty array of default log length (=TIME_DIMENSION) with a _value_ + inserted on a position at a given _timestamp_ + :param value: _value_ + :param timestamp: _timestamp_ + """ + return update_array(list([ZERO_ITEM] * TIME_DIMENSION), modulate_position(timestamp), value) + + +def align_to_long_window(rdd_set): + """ + transforms all RDDs from _rdd_set_ of a format (data, timestamp) into an initial log + of the size of the default log length (=TIME_DIMENSION) + the log contains TIME_DIMENSION of zeros except on a modulated timestamp position + filling zeros as missing values + :param rdd_set: _rdd_set_ + """ + return rdd_set.map(lambda rdd: (rdd[0], initialize_array(rdd[1][0], rdd[1][1])) \ + if len(rdd[1]) == 2 + else (rdd[0], rdd[1])) + + +def merge_init_arrays(a1, a2): + """ Merges the given arrays so that the output array contains either value of a1, or a2 for each nonzero value + Arrays should be in disjunction append -1 when both arrays are filled, so the error is traceable + :param a1 array of the size of a2 + :param a2 array of the size of a1 + :return Merges arrays + """ + merge = [] + for i in range(len(a1)): + if a1[i] != ZERO_ITEM and a2[i] != ZERO_ITEM: + # should not happen + merge.append(-1) + else: + merge.append(a1[i] if a1[i] != ZERO_ITEM else a2[i]) + + return merge def send_data(data, output_host): @@ -174,30 +273,62 @@ def process_results(json_rdd, n=10): send_data(json.dumps(result_dict)+"\n", args.output_host) -def count_host_stats(flow_json): +def collect_hourly_stats(stats_json): + """ + Performs a hourly aggregation on input data, whose result is to be collected in items of daily aggregation + :type stats_json: Initialized spark streaming context, with data in json format as in host_stats application """ - TODO - :type flow_json: Initialized spark streaming context, windowed, json_loaded. + stats_windowed = stats_json.window(HOURLY_INTERVAL, HOURLY_INTERVAL) + + stats_windowed_keyed = stats_windowed.map(lambda json_rdd: (json_rdd["src_ipv4"], + (json_rdd["stats"]["total"]["packets"], + json_rdd["stats"]["total"]["bytes"], + json_rdd["stats"]["total"]["flow"]) + )) + ip_stats_sumed = stats_windowed_keyed.reduceByKey(lambda current, update: (current[0] + update[0], + current[1] + update[1], + current[2] + update[2])) + + ip_stats_objected = ip_stats_sumed.mapValues(lambda avg_vals: (StatsItem(*avg_vals), INCREMENT)) + + return ip_stats_objected + + +def collect_daily_stats(hourly_stats): + """ + aggregation of the time stats of _small_window_data_ in a tuple format (data, timestamp) into a default log vector + in format [0, 0, ... 
, 0, 0, 0] containing the newest data at the beginning + appends a result of the new small window at the beginning of the time log for each IP address + :param hourly_stats: _hourly_stats_ """ - # TODO: placeholder for host profiling app based on windowed info from host_statistics applications - # TODO: in the selected big window (supposedly 24h) is suggested to compute - # * average number of communication partners for given hosts - # * distinct ports for host, sorted by the frequency of activity - # * frequency of communication times for host - # * probability of the host being the server/client machine/network infrastructure based on - # the distance of the selected attributes' values to mean values of the selected categories - # flow_json.pprint(200) + global INCREMENT - # Filter flows with required data, in a given address range - flow_with_keys = flow_json.filter(lambda json_rdd: ("ipfix.sourceIPv4Address" in json_rdd.keys()) and - ("ipfix.destinationTransportPort" in json_rdd.keys()) and - (ipaddress.ip_address(json_rdd["ipfix.sourceIPv4Address"]) in network_filter) - ) - # Set window and slide duration for flows analysis - flow_with_keys_windowed = flow_with_keys.window(window_duration, window_slide) + # set a window of DAY_WINDOW_INTERVAL upon small window RDDs + long_window_base = hourly_stats.window(DAILY_INTERVAL, HOURLY_INTERVAL) - return None + # Debug print - see how recent incoming RDDs are transformed after each HOUR_WINDOW_INTERVAL + # long_window_debug = long_window_base.map(lambda rdd: {"ip": rdd[0], + # "rdd_timestamp": rdd[1][1], + # "current_inc": INCREMENT, + # "mod_pos": modulate_position(int(rdd[1][1])), + # "value": rdd[1][0]}) + # long_window_debug.pprint(17) + + # log for each key(IP) is reduced from sparse logs of aggregated volume of activity for each time unit (=hour) + # first logs of small window in format IP: (volume, timestamp) are mapped into sparse vector=[0, 0, .. , volume, 0] + # where vector has a size of TIME_DIMENSION and volume inserted on modulated position (see modulate_position()) + # then sparse vectors are combined by merge - "summing-up": nonzero positions (see merge_init_arrays()) + long_window_data_stream = long_window_base.map(lambda rdd: (rdd[0], initialize_array(rdd[1][0], rdd[1][1]))) \ + .reduceByKey(lambda current, update: merge_init_arrays(current, update)) + + # current position counter update should keep consistent with small window counter - increment on each new data + long_window_data_stream.reduce(lambda current, update: 1).foreachRDD(lambda rdd: increment()) + + long_window_data_stream.pprint(100) + + # gives the current log after every new batch from small window + return long_window_data_stream.window(HOURLY_INTERVAL, DAILY_INTERVAL) if __name__ == "__main__": @@ -205,9 +336,9 @@ def count_host_stats(flow_json): parser = argparse.ArgumentParser() parser.add_argument("-iz", "--input_zookeeper", help="input zookeeper hostname:port", type=str, required=True) parser.add_argument("-it", "--input_topic", help="input kafka topic", type=str, required=True) - parser.add_argument("-oh", "--output_host", help="output hostname:port", type=str, required=True) - parser.add_argument("-net", "--network_range", help="network range to watch", type=str, required=True) - parser.add_argument("-n", "--top_n", help="max. 
number of ports for a host to retrieve", type=int, required=False) + + parser.add_argument("-oz", "--output_zookeeper", help="output zookeeper hostname:port", type=str, required=True) + parser.add_argument("-ot", "--output_topic", help="output kafka topic", type=str, required=True) # Parse arguments. args = parser.parse_args() @@ -217,8 +348,6 @@ def count_host_stats(flow_json): kafka_partitions = 1 # Number of partitions of the input Kafka topic window_duration = 10 # Analysis window duration (10 seconds) window_slide = 10 # Slide interval of the analysis window (10 seconds) - # Filter for network for detection (regex filtering), e.g. "10\.10\..+" - network_filter = ipaddress.ip_network(unicode(args.network_range, "utf8")) # Spark context initialization sc = SparkContext(appName=application_name + " " + " ".join(sys.argv[1:])) # Application name used as the appName @@ -232,10 +361,12 @@ def count_host_stats(flow_json): input_stream_json = input_stream.map(lambda x: json.loads(x[1])) # Process data to the defined function. - host_statistics = count_host_stats(input_stream_json) + hourly_host_statistics = collect_hourly_stats(input_stream_json) + daily_host_statistics = collect_daily_stats(hourly_host_statistics) # Transform computed statistics into desired json format and send it to output_host as given in -oh input param - stats_json = host_statistics.foreachRDD(lambda rdd: process_results(rdd.collectAsMap())) + # hourly_host_statistics.foreachRDD(lambda rdd: rdd.pprint()) + # daily_host_statistics.pprint(100) # Start input data processing ssc.start() From 7dee1e616d84d846a583b4072e5a90513172b856 Mon Sep 17 00:00:00 2001 From: stefanik12 Date: Mon, 24 Jul 2017 10:18:14 +0200 Subject: [PATCH 3/7] host_daily_profile app: updated documentation, tested windowing --- .../hosts_profiling/host_daily_profile.py | 38 ++++++++----------- 1 file changed, 15 insertions(+), 23 deletions(-) diff --git a/applications/statistics/hosts_profiling/host_daily_profile.py b/applications/statistics/hosts_profiling/host_daily_profile.py index 0b9ca7b..6e1951b 100644 --- a/applications/statistics/hosts_profiling/host_daily_profile.py +++ b/applications/statistics/hosts_profiling/host_daily_profile.py @@ -59,16 +59,6 @@ from collections import namedtuple -# TODO: check these initial requirements in the end: - -# TODO: in the selected big window (supposedly 24h) is suggested to compute -# * average number of communication partners for given hosts -# * distinct ports for host, sorted by the frequency of activity -# * frequency of communication times for host -# * probability of the host being the server/client machine/network infrastructure based on -# the distance of the selected attributes' values to mean values of the selected categories -# stats_json.pprint(200) - # casting structures IPStats = namedtuple('IPStats', 'ports dst_ips http_hosts') @@ -287,8 +277,8 @@ def collect_hourly_stats(stats_json): json_rdd["stats"]["total"]["flow"]) )) ip_stats_sumed = stats_windowed_keyed.reduceByKey(lambda current, update: (current[0] + update[0], - current[1] + update[1], - current[2] + update[2])) + current[1] + update[1], + current[2] + update[2])) ip_stats_objected = ip_stats_sumed.mapValues(lambda avg_vals: (StatsItem(*avg_vals), INCREMENT)) @@ -297,10 +287,10 @@ def collect_hourly_stats(stats_json): def collect_daily_stats(hourly_stats): """ - aggregation of the time stats of _small_window_data_ in a tuple format (data, timestamp) into a default log vector - in format [0, 0, ... 
, 0, 0, 0] containing the newest data at the beginning - appends a result of the new small window at the beginning of the time log for each IP address - :param hourly_stats: _hourly_stats_ + Aggregation of the time stats of _small_window_data_ in a tuple format (data, timestamp) into a log vector + in format [data_t_n, data_t_n-1, ... , data_t_n-k] containing the entries of the most k recent + _small_window_data_ rdd-s where k = TIME_DIMENSION (= DAILY_INTERVAL/HOURLY_INTERVAL) + :param hourly_stats: _hourly_stats_ aggregated in HOURLY_INTERVAL window """ global INCREMENT @@ -313,21 +303,21 @@ def collect_daily_stats(hourly_stats): # "current_inc": INCREMENT, # "mod_pos": modulate_position(int(rdd[1][1])), # "value": rdd[1][0]}) - # long_window_debug.pprint(17) + # long_window_debug.pprint() - # log for each key(IP) is reduced from sparse logs of aggregated volume of activity for each time unit (=hour) - # first logs of small window in format IP: (volume, timestamp) are mapped into sparse vector=[0, 0, .. , volume, 0] - # where vector has a size of TIME_DIMENSION and volume inserted on modulated position (see modulate_position()) + # first logs of small window in format IP: (data, timestamp) are mapped into sparse vector=[0, 0, .. , volume, 0] + # where vector has a size of TIME_DIMENSION and data inserted on modulated position (see modulate_position()) # then sparse vectors are combined by merge - "summing-up": nonzero positions (see merge_init_arrays()) long_window_data_stream = long_window_base.map(lambda rdd: (rdd[0], initialize_array(rdd[1][0], rdd[1][1]))) \ .reduceByKey(lambda current, update: merge_init_arrays(current, update)) # current position counter update should keep consistent with small window counter - increment on each new data long_window_data_stream.reduce(lambda current, update: 1).foreachRDD(lambda rdd: increment()) + + # Debug print in interval of a small window + # long_window_data_stream.pprint(5) - long_window_data_stream.pprint(100) - - # gives the current log after every new batch from small window + # return the vector logs windowed in a daily interval return long_window_data_stream.window(HOURLY_INTERVAL, DAILY_INTERVAL) @@ -364,6 +354,8 @@ def collect_daily_stats(hourly_stats): hourly_host_statistics = collect_hourly_stats(input_stream_json) daily_host_statistics = collect_daily_stats(hourly_host_statistics) + daily_host_statistics.pprint(20) + # Transform computed statistics into desired json format and send it to output_host as given in -oh input param # hourly_host_statistics.foreachRDD(lambda rdd: rdd.pprint()) # daily_host_statistics.pprint(100) From 62c8bf1f21215652d5eff52bf31bdc4b63ba3c8c Mon Sep 17 00:00:00 2001 From: stefanik12 Date: Mon, 24 Jul 2017 11:19:19 +0200 Subject: [PATCH 4/7] host_daily_profile app: implemented output parsing and sending results to kafka --- .../hosts_profiling/host_daily_profile.py | 130 +++++------------- 1 file changed, 37 insertions(+), 93 deletions(-) diff --git a/applications/statistics/hosts_profiling/host_daily_profile.py b/applications/statistics/hosts_profiling/host_daily_profile.py index 6e1951b..338d6a1 100644 --- a/applications/statistics/hosts_profiling/host_daily_profile.py +++ b/applications/statistics/hosts_profiling/host_daily_profile.py @@ -57,6 +57,8 @@ from pyspark.streaming import StreamingContext # Spark streaming API from pyspark.streaming.kafka import KafkaUtils # Spark streaming Kafka receiver +from kafka import KafkaProducer # Kafka Python client + from collections import namedtuple @@ -67,8 +69,8 
@@ ZERO_ITEM = StatsItem(0, 0, 0) # neutral item used if no new data about the IP was collected in recent interval # temporal constants -HOURLY_INTERVAL = 10 # aggregation interval for one item of temporal output array -DAILY_INTERVAL = 40 # collection interval of aggregations as items in output array +HOURLY_INTERVAL = 5 # aggregation interval for one item of temporal output array +DAILY_INTERVAL = 120 # collection interval of aggregations as items in output array TIME_DIMENSION = DAILY_INTERVAL / HOURLY_INTERVAL print("Time dimension: %s" % TIME_DIMENSION) @@ -151,116 +153,57 @@ def merge_init_arrays(a1, a2): return merge -def send_data(data, output_host): +def send_to_kafka(data, producer, topic): """ - Send given data to the specified host using standard socket interface. - + Send given data to the specified kafka topic. :param data: data to send - :param output_host: data receiver in the "hostname:port" format - """ - print data - - # Split outputHost hostname and port - host = output_host.split(':') - - # Prepare a TCP socket. - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - # Connect to the outputHost and send given data - try: - sock.connect((host[0], int(host[1]))) - sock.send(data) - - # Print message of sent - now = time.strftime("%c") - print("Data sent at: %s" % now) - - except socket.error: - cprint("[warning] Unable to connect to host " + output_host, "blue") - finally: - sock.close() - - -def _sort_by_flows(stats_values): - """ - Sorts the list of StatsItem by flows attribute - :param stats_values: list of StatsItem - :return: sorted list + :param producer: producer that sends the data + :param topic: name of the receiving kafka topic """ - return sorted(stats_values, key=lambda entry: entry.flows, reverse=True) + producer.send(topic, str(data)) -def _parse_stats_items_list(stats_values): +def process_results(json_rdd, producer, topic): """ - Parses the list of StatsItem into a dict of object for output JSON fromat - :param stats_values: list of StatsItem - :return: dict in output JSON format - """ - item_list = map(lambda stats_item: {stats_item.type: stats_item.key, "flows": stats_item.flows}, stats_values) - - # format the list of parsed StatsItems to a dict with keys of items' ordering - labeled_item_dict = {} - for order, item in map(lambda order: (order, item_list[order]), range(len(item_list))): - labeled_item_dict[int(order)] = item - - return labeled_item_dict - - -def process_results(json_rdd, n=10): - """ - Transform given computation results into the JSON format and send them to the specified host. + Transform given computation results into the JSON format and send them to the specified kafka instance. JSON format: {"src_ipv4":"", - "@type":"host_stats_topn_ports", + "@type":"host_stats_profile_24h", "stats":{ - "top_n_dst_ports": - { - "0": {"port":, "flows":# of flows}, - ... - "n": {"port":, "flows":# of flows} - }, - "top_n_dst_hosts": - { - "0": {"dst_host":, "flows":# of flows}, - ... - "n": {"dst_host":, "flows":# of flows} - } - "top_n_http_dst": - { - "0": {"dst_host":, "flows":# of flows}, - ... - "n": {"dst_host":, "flows":# of flows} - } + { + : {"packets": , "bytes": , "flows": }, + : {"packets": , "bytes": , "flows": }, + ... 
+ : {"port":, "flows":# of flows} } } - :param json_rrd: Map in a format: (src IP , IPStats([PortStats], [DstIPStats], [HTTPHostStats])) - :return: json with selected TOP n ports by # of flows for each src IP - """ + Where is aggregated sum of the specified attribute in an interval of HOURLY_INTERVAL length + that has ended in time: - ( * HOURLY_INTERVAL) - # fill in n from input params - if args.top_n is not None: - n = int(args.top_n) + :param json_rrd: Map in a format: (src IP , [ IPStats(packets, bytes, flows), ..., IPStats(packets, bytes, flows) ]) + :return: + """ for ip, ip_stats in json_rdd.iteritems(): - # define output keys for particular stats in X_Stats named tuples - port_data_dict = {"top_n_dst_ports": ip_stats.ports, - "top_n_dst_hosts": ip_stats.dst_ips, - "top_n_http_dst": ip_stats.http_hosts} - - # take top n entries from IP's particular stats sorted by flows param - port_data_dict = {key: _sort_by_flows(val_list)[:n] for (key, val_list) in port_data_dict.iteritems()} - # parse the stats from StatsItem to a desirable form - port_data_dict = {key: _parse_stats_items_list(val_list) for (key, val_list) in port_data_dict.iteritems()} + stats_dict = dict() + for stat_idx in range(len(ip_stats)): + temporal_stats = {"packets": ip_stats[stat_idx].packets, + "bytes": ip_stats[stat_idx].bytes, + "flows": ip_stats[stat_idx].flows} + stats_dict[stat_idx] = temporal_stats # construct the output object in predefined format result_dict = {"@type": "top_n_host_stats", "src_ipv4": ip, - "stats": port_data_dict} + "stats": stats_dict} # send the processed data in json form - send_data(json.dumps(result_dict)+"\n", args.output_host) + send_to_kafka(json.dumps(result_dict)+"\n", producer, topic) + + # logging terminal output + print("%s: Stats of %s IPs parsed and sent" % (time.strftime("%c"), len(json_rdd.keys()))) def collect_hourly_stats(stats_json): @@ -313,7 +256,7 @@ def collect_daily_stats(hourly_stats): # current position counter update should keep consistent with small window counter - increment on each new data long_window_data_stream.reduce(lambda current, update: 1).foreachRDD(lambda rdd: increment()) - + # Debug print in interval of a small window # long_window_data_stream.pprint(5) @@ -354,10 +297,11 @@ def collect_daily_stats(hourly_stats): hourly_host_statistics = collect_hourly_stats(input_stream_json) daily_host_statistics = collect_daily_stats(hourly_host_statistics) - daily_host_statistics.pprint(20) + kafka_producer = KafkaProducer(bootstrap_servers=args.output_zookeeper, + client_id="spark-producer-" + application_name) # Transform computed statistics into desired json format and send it to output_host as given in -oh input param - # hourly_host_statistics.foreachRDD(lambda rdd: rdd.pprint()) + daily_host_statistics.foreachRDD(lambda rdd: process_results(rdd.collectAsMap(), kafka_producer, args.output_topic)) # daily_host_statistics.pprint(100) # Start input data processing From ba90a3785db1407b1f6389dd9e10ec5b8cd553d4 Mon Sep 17 00:00:00 2001 From: stefanik12 Date: Mon, 24 Jul 2017 11:27:06 +0200 Subject: [PATCH 5/7] host_daily_profile app: updated usage in app header --- .../statistics/hosts_profiling/host_daily_profile.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/applications/statistics/hosts_profiling/host_daily_profile.py b/applications/statistics/hosts_profiling/host_daily_profile.py index 338d6a1..6e632ce 100644 --- a/applications/statistics/hosts_profiling/host_daily_profile.py +++ 
b/applications/statistics/hosts_profiling/host_daily_profile.py @@ -1,5 +1,3 @@ -# TODO: placeholder application - see functionality suggestions in the main method - # -*- coding: utf-8 -*- # @@ -32,14 +30,13 @@ - a list of top n most active ports as sorted by a number of flows on a given port Usage: -TODO - top_n_host_stats.py -iz : -it -oh - : -n + host_daily_profile.py -iz : -it + -oz : -ot To run this on the Stream4Flow, you need to receive flows by IPFIXCol and make them available via Kafka topic. Then you can run the example - $ ./run-application.sh ./statistics/hosts_statistics/spark/top_n_host_stats.py -iz producer:2181 -it ipfix.entry - -oh consumer:20101 -n 5 -net "10.0.0.0/24" + $ ./run-application.sh .statistics/hosts_profiling/host_daily_profile.py -iz producer:2181 -it host.stats + -oz producer:9092 -ot results.daily """ From b7abbbce23c748cd3ad97433ae2f0e4f87a6469b Mon Sep 17 00:00:00 2001 From: stefanik12 Date: Mon, 31 Jul 2017 13:46:06 +0200 Subject: [PATCH 6/7] host_daily_profile app: Updated commenting and output format, slight refactoring --- .../hosts_profiling/host_daily_profile.py | 76 +++++++++---------- 1 file changed, 36 insertions(+), 40 deletions(-) diff --git a/applications/statistics/hosts_profiling/host_daily_profile.py b/applications/statistics/hosts_profiling/host_daily_profile.py index 6e632ce..38163f4 100644 --- a/applications/statistics/hosts_profiling/host_daily_profile.py +++ b/applications/statistics/hosts_profiling/host_daily_profile.py @@ -66,18 +66,16 @@ ZERO_ITEM = StatsItem(0, 0, 0) # neutral item used if no new data about the IP was collected in recent interval # temporal constants -HOURLY_INTERVAL = 5 # aggregation interval for one item of temporal output array -DAILY_INTERVAL = 120 # collection interval of aggregations as items in output array +HOURLY_INTERVAL = 3600 # aggregation interval for one item of temporal array +DAILY_INTERVAL = 86400 # collection interval of all aggregations as items in temporal array -TIME_DIMENSION = DAILY_INTERVAL / HOURLY_INTERVAL +TIME_DIMENSION = DAILY_INTERVAL / HOURLY_INTERVAL # number of aggregation entries in temporal array print("Time dimension: %s" % TIME_DIMENSION) -EMPTY_TIME_DIMENSIONS = [0] * TIME_DIMENSION - INCREMENT = 0 -# temporal methods reassuring the temporal array consistency +# temporal methods resolving the temporal array consistency and construction: def increment(): """ @@ -119,25 +117,12 @@ def initialize_array(value, timestamp): return update_array(list([ZERO_ITEM] * TIME_DIMENSION), modulate_position(timestamp), value) -def align_to_long_window(rdd_set): - """ - transforms all RDDs from _rdd_set_ of a format (data, timestamp) into an initial log - of the size of the default log length (=TIME_DIMENSION) - the log contains TIME_DIMENSION of zeros except on a modulated timestamp position - filling zeros as missing values - :param rdd_set: _rdd_set_ - """ - return rdd_set.map(lambda rdd: (rdd[0], initialize_array(rdd[1][0], rdd[1][1])) \ - if len(rdd[1]) == 2 - else (rdd[0], rdd[1])) - - def merge_init_arrays(a1, a2): """ Merges the given arrays so that the output array contains either value of a1, or a2 for each nonzero value Arrays should be in disjunction append -1 when both arrays are filled, so the error is traceable :param a1 array of the size of a2 :param a2 array of the size of a1 - :return Merges arrays + :return Merged arrays """ merge = [] for i in range(len(a1)): @@ -150,6 +135,8 @@ def merge_init_arrays(a1, a2): return merge +# post-processing methods for resulting temporal 
arrays: + def send_to_kafka(data, producer, topic): """ Send given data to the specified kafka topic. @@ -157,6 +144,9 @@ def send_to_kafka(data, producer, topic): :param producer: producer that sends the data :param topic: name of the receiving kafka topic """ + # Debug print - data to be sent to kafka in resulting format + # print data + producer.send(topic, str(data)) @@ -166,7 +156,7 @@ def process_results(json_rdd, producer, topic): JSON format: {"src_ipv4":"", - "@type":"host_stats_profile_24h", + "@type":"host_stats_temporal_profile", "stats":{ { : {"packets": , "bytes": , "flows": }, @@ -177,9 +167,11 @@ def process_results(json_rdd, producer, topic): } Where is aggregated sum of the specified attribute in an interval of HOURLY_INTERVAL length - that has ended in time: - ( * HOURLY_INTERVAL) + that has started in time: - ( * HOURLY_INTERVAL) and ended roughly in a time of send :param json_rrd: Map in a format: (src IP , [ IPStats(packets, bytes, flows), ..., IPStats(packets, bytes, flows) ]) + :param producer: producer that sends the data + :param topic: name of the receiving kafka topic :return: """ @@ -192,7 +184,7 @@ def process_results(json_rdd, producer, topic): stats_dict[stat_idx] = temporal_stats # construct the output object in predefined format - result_dict = {"@type": "top_n_host_stats", + result_dict = {"@type": "host_stats_temporal_profile", "src_ipv4": ip, "stats": stats_dict} @@ -203,9 +195,12 @@ def process_results(json_rdd, producer, topic): print("%s: Stats of %s IPs parsed and sent" % (time.strftime("%c"), len(json_rdd.keys()))) +# main computation methods: + def collect_hourly_stats(stats_json): """ - Performs a hourly aggregation on input data, whose result is to be collected in items of daily aggregation + Performs a hourly aggregation on input data, which result is to be collected as items of daily aggregation + :param stats_json: RDDs of stats in json format matching the output format of host_stats.py application :type stats_json: Initialized spark streaming context, with data in json format as in host_stats application """ @@ -216,11 +211,12 @@ def collect_hourly_stats(stats_json): json_rdd["stats"]["total"]["bytes"], json_rdd["stats"]["total"]["flow"]) )) - ip_stats_sumed = stats_windowed_keyed.reduceByKey(lambda current, update: (current[0] + update[0], - current[1] + update[1], - current[2] + update[2])) - ip_stats_objected = ip_stats_sumed.mapValues(lambda avg_vals: (StatsItem(*avg_vals), INCREMENT)) + ip_stats_summed = stats_windowed_keyed.reduceByKey(lambda current, update: (current[0] + update[0], + current[1] + update[1], + current[2] + update[2])) + + ip_stats_objected = ip_stats_summed.mapValues(lambda summed_values: (StatsItem(*summed_values), INCREMENT)) return ip_stats_objected @@ -234,30 +230,30 @@ def collect_daily_stats(hourly_stats): """ global INCREMENT - # set a window of DAY_WINDOW_INTERVAL upon small window RDDs + # set a window of DAY_WINDOW_INTERVAL on small-windowed RDDs long_window_base = hourly_stats.window(DAILY_INTERVAL, HOURLY_INTERVAL) # Debug print - see how recent incoming RDDs are transformed after each HOUR_WINDOW_INTERVAL - # long_window_debug = long_window_base.map(lambda rdd: {"ip": rdd[0], + # long_window_debug = long_window_base.map(lambda rdd: {"ip": rdd[0] # "rdd_timestamp": rdd[1][1], # "current_inc": INCREMENT, # "mod_pos": modulate_position(int(rdd[1][1])), # "value": rdd[1][0]}) # long_window_debug.pprint() - # first logs of small window in format IP: (data, timestamp) are mapped into sparse vector=[0, 0, .. 
, volume, 0] + # Here, RDDs of small window in format (IP: (data, timestamp)) are mapped into sparse vector=[0, 0, .. , volume, 0] # where vector has a size of TIME_DIMENSION and data inserted on modulated position (see modulate_position()) - # then sparse vectors are combined by merge - "summing-up": nonzero positions (see merge_init_arrays()) + # then sparse vectors are combined/merged: "summing-up" nonzero positions (see merge_init_arrays()) long_window_data_stream = long_window_base.map(lambda rdd: (rdd[0], initialize_array(rdd[1][0], rdd[1][1]))) \ .reduceByKey(lambda current, update: merge_init_arrays(current, update)) - # current position counter update should keep consistent with small window counter - increment on each new data + # position counter is consistent with small window length cycle - here increments on each new data from hourly_stats long_window_data_stream.reduce(lambda current, update: 1).foreachRDD(lambda rdd: increment()) - # Debug print in interval of a small window - # long_window_data_stream.pprint(5) + # Debug print of target temporal arrays in interval of a small window + long_window_data_stream.pprint(5) - # return the vector logs windowed in a daily interval + # return the temporal arrays windowed in a daily interval return long_window_data_stream.window(HOURLY_INTERVAL, DAILY_INTERVAL) @@ -276,12 +272,10 @@ def collect_daily_stats(hourly_stats): # Set variables application_name = os.path.basename(sys.argv[0]) # Application name used as identifier kafka_partitions = 1 # Number of partitions of the input Kafka topic - window_duration = 10 # Analysis window duration (10 seconds) - window_slide = 10 # Slide interval of the analysis window (10 seconds) # Spark context initialization sc = SparkContext(appName=application_name + " " + " ".join(sys.argv[1:])) # Application name used as the appName - ssc = StreamingContext(sc, 1) # Spark microbatch is 1 second + ssc = StreamingContext(sc, 1) # Spark micro batch is 1 second # Initialize input DStream of flows from specified Zookeeper server and Kafka topic input_stream = KafkaUtils.createStream(ssc, args.input_zookeeper, "spark-consumer-" + application_name, @@ -299,7 +293,9 @@ def collect_daily_stats(hourly_stats): # Transform computed statistics into desired json format and send it to output_host as given in -oh input param daily_host_statistics.foreachRDD(lambda rdd: process_results(rdd.collectAsMap(), kafka_producer, args.output_topic)) - # daily_host_statistics.pprint(100) + + # drop the processed RDDs to balance the memory usage + daily_host_statistics.foreachRDD(lambda rdd: rdd.unpersist()) # Start input data processing ssc.start() From 6277d524558efeee6fe39961661961f169ac99fa Mon Sep 17 00:00:00 2001 From: stefanik12 Date: Wed, 2 Aug 2017 11:11:48 +0200 Subject: [PATCH 7/7] hosts_profiling: readme created, host_daily_profile got window duration params --- .../statistics/hosts_profiling/README.md | 41 ++++++++++++++++++ .../{ => spark}/host_daily_profile.py | 43 +++++++++++-------- 2 files changed, 67 insertions(+), 17 deletions(-) create mode 100644 applications/statistics/hosts_profiling/README.md rename applications/statistics/hosts_profiling/{ => spark}/host_daily_profile.py (87%) diff --git a/applications/statistics/hosts_profiling/README.md b/applications/statistics/hosts_profiling/README.md new file mode 100644 index 0000000..a8d058b --- /dev/null +++ b/applications/statistics/hosts_profiling/README.md @@ -0,0 +1,41 @@ +## Host profiling + +### Description +An application for collecting aggregated 
characteristics of hosts over a longer period of time, typically 24 hours.
+The application produces a temporal summary of the activity of every host observed during that period.
+
+The observation and aggregation intervals can be set by the `-lw` (long window) and `-sw` (short window) params,
+which define the duration of the **aggregations** (=**short window**) and the duration of the **observation** (=**long window**) in seconds.
+The long window must be a **multiple** of the short window.
+
+By default, the durations are set to `-lw 86400 -sw 3600`, i.e. results are delivered once per day and aggregated in hourly intervals.
+
+For each host, the application outputs a temporal log whose entries are keyed by the index of the aggregation interval. The entries contain the aggregated data delivered
+from the [host_stats](https://github.com/CSIRT-MU/Stream4Flow/blob/master/applications/statistics/hosts_statistics/spark/host_stats.py)
+application:
+- **Packets**: for each host, the sum of packets transferred in each of the small aggregation windows
+- **Bytes**: for each host, the sum of bytes transferred in each of the small aggregation windows
+- **Flows**: for each host, the sum of flows transferred in each of the small aggregation windows
+
+Note that the application requires a running **Kafka** instance and the Spark **host_stats** application, whose output zookeeper
+and output topic **match** the input zookeeper and input topic of **this** application.
+
+In addition, the **host_stats** app is supposed to deliver its data in a **time interval that divides** ``-sw``, otherwise the results
+of **host_daily_profile** will be **biased**.
+
+### Usage:
+After setting up the stream data producer with matching ``-oz`` and ``-ot`` parameters, start the application as follows:
+
+- General:
+
+```commandline
+host_daily_profile.py -iz <input-zookeeper>:<port> -it <input-topic> -oz <output-zookeeper>:<port> -ot <output-topic> -sw <short-window-seconds> -lw <long-window-seconds>
+```
+
+- Stream4Flow example:
+
+```commandline
+/home/spark/applications/run-application.sh /home/spark/applications/hosts_profiling/host_daily_profile.py -iz producer:2181 -it host.stats -oz producer:9092 -ot results.daily -sw 3600 -lw 86400
+```
+
+...which starts the application with a **24h** delivery interval of **1h** statistics aggregations.
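+
+To inspect what the application emits, you can attach a simple consumer to the output topic. The snippet below is a
+minimal sketch using the same `kafka-python` client that the application itself imports; the broker address and topic name
+(`producer:9092`, `results.daily`) are taken from the example above, and the helper script name is hypothetical, so adjust both to your deployment.
+
+```python
+# inspect_daily_profiles.py - hypothetical helper, not part of the application
+import json
+from kafka import KafkaConsumer
+
+consumer = KafkaConsumer("results.daily", bootstrap_servers="producer:9092")
+for message in consumer:
+    record = json.loads(message.value)  # {"@type": ..., "src_ipv4": ..., "stats": {...}}
+    # list the per-interval flow counts reported for this host (entry "0" is the most recent interval)
+    flows = [entry["flows"] for entry in record["stats"].values()]
+    print(record["src_ipv4"], flows)
+```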
\ No newline at end of file diff --git a/applications/statistics/hosts_profiling/host_daily_profile.py b/applications/statistics/hosts_profiling/spark/host_daily_profile.py similarity index 87% rename from applications/statistics/hosts_profiling/host_daily_profile.py rename to applications/statistics/hosts_profiling/spark/host_daily_profile.py index 38163f4..a9210d7 100644 --- a/applications/statistics/hosts_profiling/host_daily_profile.py +++ b/applications/statistics/hosts_profiling/spark/host_daily_profile.py @@ -66,11 +66,9 @@ ZERO_ITEM = StatsItem(0, 0, 0) # neutral item used if no new data about the IP was collected in recent interval # temporal constants -HOURLY_INTERVAL = 3600 # aggregation interval for one item of temporal array -DAILY_INTERVAL = 86400 # collection interval of all aggregations as items in temporal array - -TIME_DIMENSION = DAILY_INTERVAL / HOURLY_INTERVAL # number of aggregation entries in temporal array -print("Time dimension: %s" % TIME_DIMENSION) +# default values are overridden from input params if available +hourly_interval = 3600 # aggregation interval for one item of temporal array +daily_interval = 86400 # collection interval of all aggregations as items in temporal array INCREMENT = 0 @@ -91,7 +89,7 @@ def modulate_position(timestamp): the particular log in rdd timestamp: attached timestamp """ - result = (INCREMENT - timestamp) % TIME_DIMENSION + result = (INCREMENT - timestamp) % time_dimension return result @@ -109,12 +107,12 @@ def update_array(array, position, value): def initialize_array(value, timestamp): """ - initializes an empty array of default log length (=TIME_DIMENSION) with a _value_ + initializes an empty array of default log length (=time_dimension) with a _value_ inserted on a position at a given _timestamp_ :param value: _value_ :param timestamp: _timestamp_ """ - return update_array(list([ZERO_ITEM] * TIME_DIMENSION), modulate_position(timestamp), value) + return update_array(list([ZERO_ITEM] * time_dimension), modulate_position(timestamp), value) def merge_init_arrays(a1, a2): @@ -162,12 +160,12 @@ def process_results(json_rdd, producer, topic): : {"packets": , "bytes": , "flows": }, : {"packets": , "bytes": , "flows": }, ... - : {"port":, "flows":# of flows} + : {"port":, "flows":# of flows} } } - Where is aggregated sum of the specified attribute in an interval of HOURLY_INTERVAL length - that has started in time: - ( * HOURLY_INTERVAL) and ended roughly in a time of send + Where is aggregated sum of the specified attribute in an interval of hourly_interval length + that has started in time: - ( * hourly_interval) and ended roughly in a time of send :param json_rrd: Map in a format: (src IP , [ IPStats(packets, bytes, flows), ..., IPStats(packets, bytes, flows) ]) :param producer: producer that sends the data @@ -204,7 +202,7 @@ def collect_hourly_stats(stats_json): :type stats_json: Initialized spark streaming context, with data in json format as in host_stats application """ - stats_windowed = stats_json.window(HOURLY_INTERVAL, HOURLY_INTERVAL) + stats_windowed = stats_json.window(hourly_interval, hourly_interval) stats_windowed_keyed = stats_windowed.map(lambda json_rdd: (json_rdd["src_ipv4"], (json_rdd["stats"]["total"]["packets"], @@ -225,13 +223,13 @@ def collect_daily_stats(hourly_stats): """ Aggregation of the time stats of _small_window_data_ in a tuple format (data, timestamp) into a log vector in format [data_t_n, data_t_n-1, ... 
, data_t_n-k] containing the entries of the most k recent - _small_window_data_ rdd-s where k = TIME_DIMENSION (= DAILY_INTERVAL/HOURLY_INTERVAL) - :param hourly_stats: _hourly_stats_ aggregated in HOURLY_INTERVAL window + _small_window_data_ rdd-s where k = time_dimension (= daily_interval/hourly_interval) + :param hourly_stats: _hourly_stats_ aggregated in hourly_interval window """ global INCREMENT # set a window of DAY_WINDOW_INTERVAL on small-windowed RDDs - long_window_base = hourly_stats.window(DAILY_INTERVAL, HOURLY_INTERVAL) + long_window_base = hourly_stats.window(daily_interval, hourly_interval) # Debug print - see how recent incoming RDDs are transformed after each HOUR_WINDOW_INTERVAL # long_window_debug = long_window_base.map(lambda rdd: {"ip": rdd[0] @@ -242,7 +240,7 @@ def collect_daily_stats(hourly_stats): # long_window_debug.pprint() # Here, RDDs of small window in format (IP: (data, timestamp)) are mapped into sparse vector=[0, 0, .. , volume, 0] - # where vector has a size of TIME_DIMENSION and data inserted on modulated position (see modulate_position()) + # where vector has a size of time_dimension and data inserted on modulated position (see modulate_position()) # then sparse vectors are combined/merged: "summing-up" nonzero positions (see merge_init_arrays()) long_window_data_stream = long_window_base.map(lambda rdd: (rdd[0], initialize_array(rdd[1][0], rdd[1][1]))) \ .reduceByKey(lambda current, update: merge_init_arrays(current, update)) @@ -254,7 +252,7 @@ def collect_daily_stats(hourly_stats): long_window_data_stream.pprint(5) # return the temporal arrays windowed in a daily interval - return long_window_data_stream.window(HOURLY_INTERVAL, DAILY_INTERVAL) + return long_window_data_stream.window(hourly_interval, daily_interval) if __name__ == "__main__": @@ -266,9 +264,20 @@ def collect_daily_stats(hourly_stats): parser.add_argument("-oz", "--output_zookeeper", help="output zookeeper hostname:port", type=str, required=True) parser.add_argument("-ot", "--output_topic", help="output kafka topic", type=str, required=True) + parser.add_argument("-sw", "--short_window", help="small window duration", type=int, required=False) + parser.add_argument("-lw", "--long_window", help="long window duration", type=int, required=False) + # Parse arguments. args = parser.parse_args() + # if input arguments are filled, override the default temporal values + if args.short_window and args.long_window: + hourly_interval = args.short_window + daily_interval = args.long_window + + time_dimension = daily_interval / hourly_interval # set a number of aggregation entries in temporal array + print("Time dimension set to %s" % time_dimension) + # Set variables application_name = os.path.basename(sys.argv[0]) # Application name used as identifier kafka_partitions = 1 # Number of partitions of the input Kafka topic
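
The core bookkeeping in the patches above is the circular temporal array: each hourly aggregation is stamped with the value of the global counter, placed into a fixed-length array at a position derived from that stamp, and the per-host arrays are then merged position-wise. The sketch below replays that logic outside Spark, with plain integers standing in for `StatsItem` tuples; the constants (`TIME_DIMENSION = 4`, `INCREMENT = 5`) and the sample values are toy numbers chosen only for illustration.

```python
# Minimal, Spark-free illustration of modulate_position / initialize_array / merge_init_arrays.
# Integers stand in for StatsItem tuples; all values below are made up for the example.
TIME_DIMENSION = 4   # pretend the "day" consists of 4 hourly slots
ZERO_ITEM = 0        # neutral item, as in the application
INCREMENT = 5        # pretend five hourly batches have been processed so far

def modulate_position(timestamp):
    # a batch stamped with counter value `timestamp` lands this many slots from the front
    return (INCREMENT - timestamp) % TIME_DIMENSION

def initialize_array(value, timestamp):
    # sparse temporal array: zeros everywhere except at the modulated position
    array = [ZERO_ITEM] * TIME_DIMENSION
    array[modulate_position(timestamp)] = value
    return array

def merge_init_arrays(a1, a2):
    # arrays are expected to be disjoint; -1 marks an (unexpected) collision so it stays traceable
    return [-1 if (x != ZERO_ITEM and y != ZERO_ITEM) else (x if x != ZERO_ITEM else y)
            for x, y in zip(a1, a2)]

# two hourly aggregations for the same host, stamped by consecutive counter values
newest = initialize_array(120, timestamp=5)   # -> [120, 0, 0, 0]
older = initialize_array(80, timestamp=4)     # -> [0, 80, 0, 0]
print(merge_init_arrays(newest, older))       # -> [120, 80, 0, 0]
```

In the application the same merge happens inside `reduceByKey`, so every host ends up with one array per daily window whose slots hold the hourly `StatsItem` sums.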