# kafka-producer-v0.py
# Tweepy - Python library for accessing the Twitter API.
import tweepy
# Pandas - Data manipulation and analysis library
import pandas as pd
# NumPy - mathematical functions on multi-dimensional arrays and matrices
import numpy as np
# Regular Expression Python module
import re
import twitter_credentials
import json
import requests
import datetime as DT
import RAKE
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
from kafka import KafkaProducer
import time
# import elasticsearch
# from elasticsearch import Elasticsearch,helpers
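# Note: the Elasticsearch imports and the `es.index(...)` calls further down are
# commented out throughout this script; documents are published to Kafka topics instead.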
# Twitter API config
twitterApiKey = twitter_credentials.CONSUMER_KEY
twitterApiSecret = twitter_credentials.CONSUMER_SECRET
twitterApiAccessToken = twitter_credentials.ACCESS_TOKEN
twitterApiAccessTokenSecret = twitter_credentials.ACCESS_TOKEN_SECRET
# Authenticate with the Twitter API
auth = tweepy.OAuthHandler(twitterApiKey, twitterApiSecret)
auth.set_access_token(twitterApiAccessToken, twitterApiAccessTokenSecret)
twitterApi = tweepy.API(auth, wait_on_rate_limit=True)
# Use a distinct name so the imported `stopwords` corpus module isn't shadowed
stop_words = stopwords.words('english')
rake_object = RAKE.Rake(stop_words)
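# RAKE.Rake(...).run(text) returns a list of (phrase, score) tuples, typically
# with the highest-scoring phrases first; only the phrases are used below.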
# Seed keywords (overwritten below by the RAKE-extracted phrases)
keywords = ['officials issued citations', 'coronavirus violations overnight', 'ohio investigative unit']
# NewsAPI config (APIKEY must be filled in before running)
APIKEY = ""
url = ('https://newsapi.org/v2/top-headlines?country=us&category=business&'
       'apiKey=' + APIKEY)
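# NewsAPI's top-headlines endpoint returns JSON of the form
# {"status": ..., "articles": [{"title": ..., "description": ..., ...}, ...]};
# only the `articles` list is consumed below.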
# Serialize dicts to UTF-8 JSON bytes for Kafka
def json_serializer(data):
    return json.dumps(data).encode("utf-8")

producer = KafkaProducer(bootstrap_servers=['10.128.0.57:9092'],
                         value_serializer=json_serializer)
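# kafka-python applies `value_serializer` to every value handed to
# producer.send(), so plain dicts can be sent directly below.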
response = requests.get(url)
# issue: need to remove duplicates
search_keys = []
search_indices = []
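# Possible fix for the duplicate issue noted above (a sketch, assuming the
# joined `index` string identifies an article): keep `seen = set()` here and
# `continue` when `index in seen` before appending and sending.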
for k in response.json()['articles']:
    if k['description'] is not None:
        # Pull up to three key phrases from each article's description + title
        keywords_pre = rake_object.run(k['description'][:-1] + k['title'], maxWords=5, minFrequency=1)
        keywords = list(map(lambda x: x[0], keywords_pre))[:3]
        index = '-'.join(keywords).replace(' ', '-')
        search_indices.append(index)
        # Build a quoted OR query for Twitter search: "a" OR "b" OR "c"
        keywords_or = '"' + '" OR "'.join(keywords) + '"'
        search_keys.append(keywords_or)
        # res1 = es.index(index='news_v2', id=index, body=k)
        producer.send("news_v2", k)
# Print the generated queries (skipping the first seven) for inspection
for i in search_keys[7:]:
    print(i)
# Return the full tweet text, handling extended tweets and retweets
def get_text(data):
    if data.get("extended_tweet") is not None:
        return data.get("extended_tweet").get("full_text")
    elif data.get("retweeted_status") is not None:
        if data.get("retweeted_status").get("extended_tweet") is not None:
            return data.get("retweeted_status").get("extended_tweet").get("full_text")
        else:
            return data.get("retweeted_status").get("text")
    else:
        return data.get("text")
if __name__ == "__main__":
while 1 == 1:
for idx,keywords in enumerate(search_keys):
test=keywords
date_since = str(DT.date.today()-DT.timedelta(days=14))
print(keywords)
tweets = tweepy.Cursor(twetterApi.search,
q=keywords,
lang="en",
since=date_since).items(2000)
index = search_indices[idx]
for t in tweets:
try:
data=t._json
text = get_text(data)
data_clean = {
'news_id':index,
"created_at":data.get("created_at"),
'text':text,
'sentiment': model(text)
}
# res = es.index(index='tweets_v2',id=data["id"],body=data_clean)
producer.send("tweets_v2", data_clean)
except BaseException as e:
print(data)
print("Error on_data %s" % str(e))
time.sleep(910)
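# Running this producer end to end requires: a reachable Kafka broker at
# 10.128.0.57:9092, valid keys in twitter_credentials.py, and a NewsAPI key
# set in APIKEY above.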