SparkFlume.py
import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
parts = [
    r'(?P<host>\S+)',          # host %h
    r'\S+',                    # indent %l (unused)
    r'(?P<user>\S+)',          # user %u
    r'\[(?P<time>.+)\]',       # time %t
    r'"(?P<request>.+)"',      # request "%r"
    r'(?P<status>[0-9]+)',     # status %>s
    r'(?P<size>\S+)',          # size %b (careful, can be '-')
    r'"(?P<referer>.*)"',      # referer "%{Referer}i"
    r'"(?P<agent>.*)"',        # user agent "%{User-agent}i"
]
pattern = re.compile(r'\s+'.join(parts) + r'\s*\Z')
def extractURLRequest(line):
    # Return the URL portion (second field) of the "%r" request string,
    # or None if the line does not parse as an access-log entry.
    exp = pattern.match(line)
    if exp:
        request = exp.groupdict()["request"]
        if request:
            requestFields = request.split()
            if len(requestFields) > 1:
                return requestFields[1]
if __name__ == "__main__":
    sc = SparkContext(appName="StreamingFlumeLogAggregator")
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval

    # Push-based receiver: Flume's Avro sink delivers events to localhost:9092
    flumeStream = FlumeUtils.createStream(ssc, "localhost", 9092)

    # Each Flume event is a (headers, body) pair; keep only the body text
    lines = flumeStream.map(lambda x: x[1])
    urls = lines.map(extractURLRequest)

    # Reduce by URL over a 5-minute window sliding every second
    urlCounts = urls.map(lambda x: (x, 1)).reduceByKeyAndWindow(
        lambda x, y: x + y, lambda x, y: x - y, 300, 1)

    # Sort and print the results
    sortedResults = urlCounts.transform(
        lambda rdd: rdd.sortBy(lambda x: x[1], False))
    sortedResults.pprint()

    # Checkpointing is required for the windowed reduce with an inverse function
    ssc.checkpoint("/home/maria_dev/checkpoint")
    ssc.start()
    ssc.awaitTermination()
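
A quick way to sanity-check the parsing logic without a Spark cluster is to call extractURLRequest directly on one log line. The snippet below is a minimal sketch: the sample line is made up for illustration, and the import assumes this file is importable as SparkFlume (the Spark job itself is not started because it sits behind the __main__ guard, though pyspark must still be installed for the module-level imports to resolve).

# Local check of the log parser; no StreamingContext is created.
# The sample access-log line below is fabricated for illustration.
from SparkFlume import extractURLRequest

sample = ('66.249.75.159 - - [29/Nov/2015:03:50:05 +0000] '
          '"GET /robots.txt HTTP/1.1" 200 55 "-" '
          '"Mozilla/5.0 (compatible; Googlebot/2.1)"')

print(extractURLRequest(sample))  # expected output: /robots.txt

When the streaming job itself is submitted, the external Flume connector has to be on the driver and executor classpath; for Spark 2.x it is published as org.apache.spark:spark-streaming-flume_2.11, with the exact version chosen to match the cluster's Spark build.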