forked from maxbbraun/trump2cash
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathanalysis.py
295 lines (240 loc) · 10.7 KB
/
analysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
from backoff import expo
from backoff import on_exception
from google.cloud import language
from re import compile
from re import IGNORECASE
from requests import get
from requests.exceptions import HTTPError
from urllib.parse import quote_plus
from logs import Logs
from twitter import Twitter
# The URL for a GET request to the Wikidata API. The string parameter is the
# SPARQL query.
WIKIDATA_QUERY_URL = "https://query.wikidata.org/sparql?query=%s&format=JSON"
# The HTTP headers with a User-Agent used for Wikidata requests.
WIKIDATA_QUERY_HEADERS = {
"User-Agent": "Trump2Cash/1.0 (https://trump2cash.biz)"}
# A Wikidata SPARQL query to find stock ticker symbols and other information
# for a company. The string parameter is the Freebase ID of the company.
MID_TO_TICKER_QUERY = (
'SELECT ?companyLabel ?rootLabel ?tickerLabel ?exchangeNameLabel'
' WHERE {'
' ?entity wdt:P646 "%s" .' # Entity with specified Freebase ID.
' ?entity wdt:P176* ?manufacturer .' # Entity may be product.
' ?manufacturer wdt:P1366* ?company .' # Company may have restructured.
' { ?company p:P414 ?exchange } UNION' # Company traded on exchange ...
' { ?company wdt:P127+ / wdt:P1366* ?root .' # ... or company has owner.
' ?root p:P414 ?exchange } UNION' # Owner traded on exchange or ...
' { ?company wdt:P749+ / wdt:P1366* ?root .' # ... company has parent.
' ?root p:P414 ?exchange } .' # Parent traded on exchange.
' VALUES ?exchanges { wd:Q13677 wd:Q82059 } .' # Whitelist NYSE, NASDAQ.
' ?exchange ps:P414 ?exchanges .' # Stock exchange is whitelisted.
' ?exchange pq:P249 ?ticker .' # Get ticker symbol.
' ?exchange ps:P414 ?exchangeName .' # Get name of exchange.
' FILTER NOT EXISTS { ?company wdt:P31 /' # Exclude newspapers.
' wdt:P279* wd:Q11032 } .'
' FILTER NOT EXISTS { ?company wdt:P31 /' # Exclude news agencies.
' wdt:P279* wd:Q192283 } .'
' FILTER NOT EXISTS { ?company wdt:P31 /' # Exclude news magazines.
' wdt:P279* wd:Q1684600 } .'
' FILTER NOT EXISTS { ?company wdt:P31 /' # Exclude radio stations.
' wdt:P279* wd:Q14350 } .'
' FILTER NOT EXISTS { ?company wdt:P31 /' # Exclude TV stations.
' wdt:P279* wd:Q1616075 } .'
' FILTER NOT EXISTS { ?company wdt:P31 /' # Exclude TV channels.
' wdt:P279* wd:Q2001305 } .'
' SERVICE wikibase:label {'
' bd:serviceParam wikibase:language "en" .' # Use English labels.
' }'
' } GROUP BY ?companyLabel ?rootLabel ?tickerLabel ?exchangeNameLabel'
' ORDER BY ?companyLabel ?rootLabel ?tickerLabel ?exchangeNameLabel')
class Analysis:
"""A helper for analyzing company data in text."""
def __init__(self, logs_to_cloud):
self.logs = Logs(name="analysis", to_cloud=logs_to_cloud)
self.language_client = language.LanguageServiceClient()
self.twitter = Twitter(logs_to_cloud=logs_to_cloud)
def get_company_data(self, mid):
"""Looks up stock ticker information for a company via its Freebase ID.
"""
query = MID_TO_TICKER_QUERY % mid
try:
bindings = self.make_wikidata_request(query)
except HTTPError as e:
self.logs.error("Wikidata request failed: %s" % e)
bindings = None
if not bindings:
self.logs.debug("No company data found for MID: %s" % mid)
return None
# Collect the data from the response.
datas = []
for binding in bindings:
try:
name = binding["companyLabel"]["value"]
except KeyError:
name = None
try:
root = binding["rootLabel"]["value"]
except KeyError:
root = None
try:
ticker = binding["tickerLabel"]["value"]
except KeyError:
ticker = None
try:
exchange = binding["exchangeNameLabel"]["value"]
except KeyError:
exchange = None
data = {"name": name,
"ticker": ticker,
"exchange": exchange}
# Add the root if there is one.
if root and root != name:
data["root"] = root
# Add to the list unless we already have the same entry.
if data not in datas:
self.logs.debug("Adding company data: %s" % data)
datas.append(data)
else:
self.logs.warn("Skipping duplicate company data: %s" % data)
return datas
def find_companies(self, tweet):
"""Finds mentions of companies in a tweet."""
if not tweet:
self.logs.warn("No tweet to find companies.")
return None
# Use the text of the tweet with any mentions expanded to improve
# entity detection.
text = self.get_expanded_text(tweet)
if not text:
self.logs.error("Failed to get text from tweet: %s" % tweet)
return None
# Run entity detection.
document = language.types.Document(
content=text,
type=language.enums.Document.Type.PLAIN_TEXT,
language="en")
entities = self.language_client.analyze_entities(document).entities
self.logs.debug("Found entities: %s" %
self.entities_tostring(entities))
# Collect all entities which are publicly traded companies, i.e.
# entities which have a known stock ticker symbol.
companies = []
for entity in entities:
# Use the Freebase ID of the entity to find company data. Skip any
# entity which doesn't have a Freebase ID (unless we find one via
# the Twitter handle).
name = entity.name
metadata = entity.metadata
try:
mid = metadata["mid"]
except KeyError:
self.logs.debug("No MID found for entity: %s" % name)
continue
company_data = self.get_company_data(mid)
# Skip any entity for which we can't find any company data.
if not company_data:
self.logs.debug("No company data found for entity: %s (%s)" %
(name, mid))
continue
self.logs.debug("Found company data: %s" % company_data)
for company in company_data:
# Extract and add a sentiment score.
sentiment = self.get_sentiment(text)
self.logs.debug("Using sentiment for company: %f %s" %
(sentiment, company))
company["sentiment"] = sentiment
# Add the company to the list unless we already have the same
# ticker.
tickers = [existing["ticker"] for existing in companies]
if not company["ticker"] in tickers:
companies.append(company)
else:
self.logs.warn(
"Skipping company with duplicate ticker: %s" % company)
return companies
def get_expanded_text(self, tweet):
"""Retrieves the text from a tweet with any @mentions expanded to
their full names.
"""
if not tweet:
self.logs.warn("No tweet to expand text.")
return None
try:
text = self.twitter.get_tweet_text(tweet)
mentions = tweet["entities"]["user_mentions"]
except KeyError:
self.logs.error("Malformed tweet: %s" % tweet)
return None
if not text:
self.logs.warn("Empty text.")
return None
if not mentions:
self.logs.debug("No mentions.")
return text
self.logs.debug("Using mentions: %s" % mentions)
for mention in mentions:
try:
screen_name = "@%s" % mention["screen_name"]
name = mention["name"]
except KeyError:
self.logs.warn("Malformed mention: %s" % mention)
continue
self.logs.debug("Expanding mention: %s %s" % (screen_name, name))
pattern = compile(screen_name, IGNORECASE)
text = pattern.sub(name, text)
return text
@on_exception(expo, HTTPError, max_tries=8)
def make_wikidata_request(self, query):
"""Makes a request to the Wikidata SPARQL API."""
query_url = WIKIDATA_QUERY_URL % quote_plus(query)
self.logs.debug("Wikidata query: %s" % query_url)
response = get(query_url, headers=WIKIDATA_QUERY_HEADERS)
response.raise_for_status()
try:
response_json = response.json()
except ValueError:
self.logs.error("Failed to decode JSON response: %s" % response)
return None
self.logs.debug("Wikidata response: %s" % response_json)
try:
results = response_json["results"]
bindings = results["bindings"]
except KeyError:
self.logs.error("Malformed Wikidata response: %s" % response_json)
return None
return bindings
def entities_tostring(self, entities):
"""Converts a list of entities to a readable string."""
tostrings = [self.entity_tostring(entity) for entity in entities]
return "[%s]" % ", ".join(tostrings)
def entity_tostring(self, entity):
"""Converts one entity to a readable string."""
metadata = ", ".join(['"%s": "%s"' % (key, value) for
key, value in entity.metadata.items()])
mentions = ", ".join(['"%s"' % mention for mention in entity.mentions])
return ('{name: "%s",'
' type: "%s",'
' metadata: {%s},'
' salience: %s,'
' mentions: [%s]}') % (
entity.name,
entity.type,
metadata,
entity.salience,
mentions)
def get_sentiment(self, text):
"""Extracts a sentiment score [-1, 1] from text."""
if not text:
self.logs.warn("No sentiment for empty text.")
return 0
document = language.types.Document(
content=text,
type=language.enums.Document.Type.PLAIN_TEXT,
language="en")
sentiment = self.language_client.analyze_sentiment(
document).document_sentiment
self.logs.debug(
"Sentiment score and magnitude for text: %f %f \"%s\"" %
(sentiment.score, sentiment.magnitude, text))
return sentiment.score