-
Notifications
You must be signed in to change notification settings - Fork 0
/
poll_watcher.py
51 lines (47 loc) · 1.8 KB
/
poll_watcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import os
import json
import httpx
import logging
import kafka_pub
from prometheus_client import Counter
import asyncio
from gauges import quote_gauge_update
# Endpoint queried by prep_request() for the latest quotes.
QUOTES_URL = 'https://data.alpaca.markets/v2/stocks/quotes/latest'

# Most recent quote stored per symbol; read and updated by is_dupe().
last_quotes = {}
def is_dupe(symbol, quote):
    """Report whether *quote* repeats the last quote stored for *symbol*.

    Two quotes are treated as the same when their ``"t"`` fields compare
    equal. A quote that is not a duplicate is stored in the module-level
    ``last_quotes`` cache as the new reference for *symbol*.

    Args:
        symbol: Key under which the quote is cached.
        quote: Mapping that must contain a ``"t"`` entry.

    Returns:
        True if the quote is a duplicate, False otherwise (in which case
        the cache has been updated).
    """
    already_seen = symbol in last_quotes
    if already_seen and quote["t"] == last_quotes[symbol]["t"]:
        logging.debug("Duplicate quote for " + symbol)
        return True

    # First quote for this symbol, or a different "t": remember it.
    last_quotes[symbol] = quote
    return False
def prep_request(symbols, auth_headers):
    """Build the GET request that asks for the latest quotes of *symbols*.

    Args:
        symbols: Iterable of symbol strings; they are comma-joined into the
            ``symbols`` query parameter.
        auth_headers: Passed through unchanged as the request headers.

    Returns:
        An ``httpx.Request`` for ``QUOTES_URL``; nothing is sent here.
    """
    query = {'symbols': ','.join(symbols)}
    return httpx.Request('GET', QUOTES_URL, params=query, headers=auth_headers)
async def _publish_new_quotes(quotes, quote_ctr):
    """Publish every non-duplicate quote in *quotes*, then flush once."""
    for symbol, quote in quotes.items():
        logging.info(f"Quote: {quote}")
        if not is_dupe(symbol, quote):
            await kafka_pub.publish(symbol, 'quote', quote)
            quote_ctr.labels(symbol).inc()
            quote_gauge_update(symbol, quote)
    await kafka_pub.flush()


async def poller(symbols, interval, auth_headers):
    """Poll the quotes endpoint forever and publish each new quote.

    Every cycle sends the same prepared request, publishes the quotes that
    ``is_dupe`` does not reject, and then sleeps for *interval* seconds.
    A non-OK response or a transport-level failure is logged and counted
    in ``poll_errors``; polling then resumes on the next cycle.

    Args:
        symbols: Iterable of symbol strings to request quotes for.
        interval: Delay between polls, passed to ``asyncio.sleep``.
        auth_headers: Headers attached to every request.

    Returns:
        Never returns normally; the loop runs until the task is cancelled
        or an unhandled exception propagates.
    """
    # NOTE(review): the counters are registered on every call, so this
    # coroutine is presumably started once per process - confirm.
    request_ctr = Counter('poll_requests', 'Number of HTTP requests')
    quote_ctr = Counter('poll_quotes', 'Number of unique quotes', ['symbol'])
    error_ctr = Counter('poll_errors', 'Number of polling errors')
    prepped = prep_request(symbols, auth_headers)
    async with httpx.AsyncClient(http2=True) as client:
        while True:
            try:
                response = await client.send(prepped)
            except httpx.RequestError as exc:
                # A timeout or dropped connection must not end the polling
                # loop; record it and try again after the usual delay.
                logging.error('Error retrieving quotes: %r', exc)
                error_ctr.inc()
            else:
                request_ctr.inc()
                if response.status_code == httpx.codes.ok:
                    await _publish_new_quotes(
                        response.json()['quotes'], quote_ctr
                    )
                else:
                    logging.error('Error retrieving quotes: ' + response.text)
                    error_ctr.inc()
            await asyncio.sleep(interval)