Skip to content

Commit

Permalink
feat(alpacabkfeeder): retrieve tradable stocks from a remote json file (
Browse files Browse the repository at this point in the history
  • Loading branch information
dakimura authored Mar 29, 2022
1 parent 3d0a0d4 commit 19a890f
Show file tree
Hide file tree
Showing 23 changed files with 319 additions and 66 deletions.
6 changes: 4 additions & 2 deletions contrib/alpacabkfeeder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ bgworkers:
- NASDAQ
# - NYSEARCA
# - OTC
# time when target symbols in the exchanges are updated every day.
# this time is also used for the historical data back-fill.
# time when the list of target symbols (tradable stocks) are updated every day.
# This config can be manually overridden by "ALPACA_BROKER_FEEDER_SYMBOLS_UPDATE_TIME" environmental variable.
symbols_update_time: "13:00:00" # (UTC). = every day at 08:00:00 (EST)
# time when the historical data back-fill is run.
# This config can be manually overridden by "ALPACA_BROKER_FEEDER_UPDATE_TIME" environmental variable.
update_time: "13:30:00" # (UTC). = every day at 08:30:00 (EST)
# Alpava Broker API Feeder writes data to "{symbol}/{timeframe}/TICK" TimeBucketKey
Expand Down
46 changes: 27 additions & 19 deletions contrib/alpacabkfeeder/alpacav2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"net/http"
"time"

"github.com/pkg/errors"
Expand All @@ -18,6 +19,8 @@ import (
"github.com/alpacahq/marketstore/v4/utils/log"
)

const getJSONFileTimeout = 10 * time.Second

// NewBgWorker returns the new instance of Alpaca Broker API Feeder.
// See configs.Config for the details of available configurations.
// nolint:deadcode // used as a plugin
Expand All @@ -28,18 +31,7 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
}
log.Info("loaded Alpaca Broker Feeder config...")

// init Alpaca API client
cred := &api.APIKey{
ID: config.APIKeyID,
PolygonKeyID: config.APIKeyID,
Secret: config.APISecretKey,
// OAuth: os.Getenv(EnvApiOAuth),
}
if config.APIKeyID == "" || config.APISecretKey == "" {
// if empty, get from env vars
cred = api.Credentials()
}
apiClient := api.NewClient(cred)
apiCli := apiClient(config)

// init Market Time Checker
var timeChecker feed.MarketTimeChecker
Expand Down Expand Up @@ -73,12 +65,13 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
}

ctx := context.Background()
// init Symbols Manager to update symbols in the target exchanges

sm := symbols.NewManager(apiClient, config.Exchanges)
// init symbols Manager to update symbols in the target exchanges
sm := symbols.NewJSONFileManager(&http.Client{Timeout: getJSONFileTimeout},
config.StocksJSONURL, config.StocksJSONBasicAuth,
)
sm.UpdateSymbols()
timer.RunEveryDayAt(ctx, config.UpdateTime, sm.UpdateSymbols)
log.Info("updated symbols in the target exchanges")
timer.RunEveryDayAt(ctx, config.SymbolsUpdateTime, sm.UpdateSymbols)
log.Info("updated symbols using a remote json file.")

// init SnapshotWriter
var ssw writer.SnapshotWriter = writer.SnapshotWriterImpl{
Expand All @@ -97,20 +90,35 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
if config.Backfill.Enabled {
const maxBarsPerRequest = 1000
const maxSymbolsPerRequest = 100
bf := feed.NewBackfill(sm, apiClient, bw, time.Time(config.Backfill.Since),
bf := feed.NewBackfill(sm, apiCli, bw, time.Time(config.Backfill.Since),
maxBarsPerRequest, maxSymbolsPerRequest,
)
timer.RunEveryDayAt(ctx, config.UpdateTime, bf.UpdateSymbols)
}

return &feed.Worker{
MarketTimeChecker: timeChecker,
APIClient: apiClient,
APIClient: apiCli,
SymbolManager: sm,
SnapshotWriter: ssw,
BarWriter: bw,
Interval: config.Interval,
}, nil
}

func apiClient(config *configs.DefaultConfig) *api.Client {
// init Alpaca API client
cred := &api.APIKey{
ID: config.APIKeyID,
PolygonKeyID: config.APIKeyID,
Secret: config.APISecretKey,
// OAuth: os.Getenv(EnvApiOAuth),
}
if config.APIKeyID == "" || config.APISecretKey == "" {
// if empty, get from env vars
cred = api.Credentials()
}
return api.NewClient(cred)
}

func main() {}
18 changes: 9 additions & 9 deletions contrib/alpacabkfeeder/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (

var (
// DefaultClient is the default Alpaca client using the
// environment variable set credentials
// environment variable set credentials.
DefaultClient = NewClient(Credentials())
base = "https://api.alpaca.markets"
dataURL = "https://data.alpaca.markets"
Expand All @@ -35,10 +35,10 @@ func defaultDo(c *Client, req *http.Request) (*http.Response, error) {
if c.credentials.OAuth != "" {
req.Header.Set("Authorization", "Bearer "+c.credentials.OAuth)
} else {
if strings.Contains(req.URL.String(), "sandbox"){
if strings.Contains(req.URL.String(), "sandbox") {
// Add Basic Auth
req.SetBasicAuth(c.credentials.ID, c.credentials.Secret)
}else {
} else {
req.Header.Set("APCA-API-KEY-ID", c.credentials.ID)
req.Header.Set("APCA-API-SECRET-KEY", c.credentials.Secret)
}
Expand Down Expand Up @@ -71,7 +71,7 @@ func defaultDo(c *Client, req *http.Request) (*http.Response, error) {
}

const (
// v2MaxLimit is the maximum allowed limit parameter for all v2 endpoints
// v2MaxLimit is the maximum allowed limit parameter for all v2 endpoints.
v2MaxLimit = 10000
)

Expand Down Expand Up @@ -102,7 +102,7 @@ func init() {
}

// APIError wraps the detailed code and message supplied
// by Alpaca's API for debugging purposes
// by Alpaca's API for debugging purposes.
type APIError struct {
Code int `json:"code"`
Message string `json:"message"`
Expand All @@ -112,7 +112,7 @@ func (e *APIError) Error() string {
return e.Message
}

// Client is an Alpaca REST API client
// Client is an Alpaca REST API client.
type Client struct {
credentials *APIKey
}
Expand All @@ -122,12 +122,12 @@ func SetBaseUrl(baseUrl string) {
}

// NewClient creates a new Alpaca client with specified
// credentials
// credentials.
func NewClient(credentials *APIKey) *Client {
return &Client{credentials: credentials}
}

// GetSnapshots returns the snapshots for multiple symbol
// GetSnapshots returns the snapshots for multiple symbol.
func (c *Client) GetSnapshots(symbols []string) (map[string]*Snapshot, error) {
u, err := url.Parse(fmt.Sprintf("%s/v2/stocks/snapshots?symbols=%s",
dataURL, strings.Join(symbols, ",")))
Expand Down Expand Up @@ -194,7 +194,7 @@ func (c *Client) ListBars(symbols []string, opts v1.ListBarParams) (map[string][
func (c *Client) ListAssets(status *string) ([]v1.Asset, error) {
// TODO: add tests
apiVer := apiVersion
if strings.Contains(base, "broker"){
if strings.Contains(base, "broker") {
apiVer = "v1"
}

Expand Down
8 changes: 4 additions & 4 deletions contrib/alpacabkfeeder/api/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package api

import "time"

// Trade is a stock trade that happened on the market
// Trade is a stock trade that happened on the market.
type Trade struct {
ID int64 `json:"i"`
Exchange string `json:"x"`
Expand All @@ -13,7 +13,7 @@ type Trade struct {
Tape string `json:"z"`
}

// Quote is a stock quote from the market
// Quote is a stock quote from the market.
type Quote struct {
BidExchange string `json:"bx"`
BidPrice float64 `json:"bp"`
Expand All @@ -26,7 +26,7 @@ type Quote struct {
Tape string `json:"z"`
}

// Bar is an aggregate of trades
// Bar is an aggregate of trades.
type Bar struct {
Open float64 `json:"o"`
High float64 `json:"h"`
Expand All @@ -36,7 +36,7 @@ type Bar struct {
Timestamp time.Time `json:"t"`
}

// Snapshot is a snapshot of a symbol
// Snapshot is a snapshot of a symbol.
type Snapshot struct {
LatestTrade *Trade `json:"latestTrade"`
LatestQuote *Quote `json:"latestQuote"`
Expand Down
2 changes: 1 addition & 1 deletion contrib/alpacabkfeeder/api/v1/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type Asset struct {
Symbol string `json:"symbol"`
Status string `json:"status"`
Tradable bool `json:"tradable"`
Marginable bool `json:"marginable"`
Marginal bool `json:"marginal"`
Shortable bool `json:"shortable"`
EasyToBorrow bool `json:"easy_to_borrow"`
}
Expand Down
9 changes: 7 additions & 2 deletions contrib/alpacabkfeeder/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ var json = jsoniter.ConfigCompatibleWithStandardLibrary
// marketstore's config file through bgworker extension.
type DefaultConfig struct {
Exchanges []Exchange `json:"exchanges"`
SymbolsUpdateTime time.Time `json:"symbols_update_time"`
UpdateTime time.Time `json:"update_time"`
StocksJSONURL string `json:"stocks_json_url"`
StocksJSONBasicAuth string `json:"stocks_json_basic_auth"`
Timeframe string `json:"timeframe"`
APIKeyID string `json:"api_key_id"`
APISecretKey string `json:"api_secret_key"`
Expand Down Expand Up @@ -84,6 +87,7 @@ func (c *DefaultConfig) UnmarshalJSON(input []byte) error {
type Alias DefaultConfig

aux := &struct {
SymbolsUpdateTime CustomTime `json:"symbols_update_time"`
UpdateTime CustomTime `json:"update_time"`
OpenTime CustomTime `json:"openTime"`
CloseTime CustomTime `json:"closeTime"`
Expand All @@ -95,6 +99,7 @@ func (c *DefaultConfig) UnmarshalJSON(input []byte) error {
if err := json.Unmarshal(input, &aux); err != nil {
return err
}
c.SymbolsUpdateTime = time.Time(aux.SymbolsUpdateTime)
c.UpdateTime = time.Time(aux.UpdateTime)
c.OpenTime = time.Time(aux.OpenTime)
c.CloseTime = time.Time(aux.CloseTime)
Expand All @@ -108,15 +113,15 @@ func (c *DefaultConfig) UnmarshalJSON(input []byte) error {
func convertTime(w []weekday) []time.Weekday {
d := make([]time.Weekday, len(w))
for i := range w {
d = append(d, time.Weekday(w[i]))
d[i] = time.Weekday(w[i])
}
return d
}

func convertDate(cd []CustomDay) []time.Time {
d := make([]time.Time, len(cd))
for i := range cd {
d = append(d, time.Time(cd[i]))
d[i] = time.Time(cd[i])
}
return d
}
Expand Down
26 changes: 17 additions & 9 deletions contrib/alpacabkfeeder/configs/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
)

var testConfig = map[string]interface{}{
"api_key_id": "hello",
"api_secret_key": "world",
"update_time": "12:34:56",
"exchanges": []string{"AMEX", "ARCA", "BATS", "NYSE", "NASDAQ", "NYSEARCA", "OTC"},
"index_groups": []string{"bar"},
"api_key_id": "hello",
"api_secret_key": "world",
"symbols_update_time": "01:23:45",
"update_time": "12:34:56",
"stocks_json_basic_auth": "foo:bar",
"exchanges": []string{"AMEX", "ARCA", "BATS", "NYSE", "NASDAQ", "NYSEARCA", "OTC"},
"index_groups": []string{"bar"},
}

var configWithInvalidExchange = map[string]interface{}{
Expand All @@ -30,12 +32,14 @@ func TestNewConfig(t *testing.T) {
want *configs.DefaultConfig
wantErr bool
}{
"ok/ API key ID, API secret key and UpdateTime can be overridden by env vars": {
"ok/ API key ID, API secret key, UpdateTime, basicAuth can be overridden by env vars": {
config: testConfig,
envVars: map[string]string{
"ALPACA_BROKER_FEEDER_API_KEY_ID": "yo",
"ALPACA_BROKER_FEEDER_API_SECRET_KEY": "yoyo",
"ALPACA_BROKER_FEEDER_UPDATE_TIME": "20:00:00",
"ALPACA_BROKER_FEEDER_API_KEY_ID": "yo",
"ALPACA_BROKER_FEEDER_API_SECRET_KEY": "yoyo",
"ALPACA_BROKER_FEEDER_SYMBOLS_UPDATE_TIME": "10:00:00",
"ALPACA_BROKER_FEEDER_UPDATE_TIME": "20:00:00",
"ALPACA_BROKER_FEEDER_STOCKS_JSON_BASIC_AUTH": "akkie:mypassword",
},
want: &configs.DefaultConfig{
Exchanges: []configs.Exchange{
Expand All @@ -44,9 +48,11 @@ func TestNewConfig(t *testing.T) {
},
ClosedDaysOfTheWeek: []time.Weekday{},
ClosedDays: []time.Time{},
SymbolsUpdateTime: time.Date(0, 1, 1, 10, 0, 0, 0, time.UTC),
UpdateTime: time.Date(0, 1, 1, 20, 0, 0, 0, time.UTC),
APIKeyID: "yo",
APISecretKey: "yoyo",
StocksJSONBasicAuth: "akkie:mypassword",
},
wantErr: false,
},
Expand All @@ -60,9 +66,11 @@ func TestNewConfig(t *testing.T) {
},
ClosedDaysOfTheWeek: []time.Weekday{},
ClosedDays: []time.Time{},
SymbolsUpdateTime: time.Date(0, 1, 1, 1, 23, 45, 0, time.UTC),
UpdateTime: time.Date(0, 1, 1, 12, 34, 56, 0, time.UTC),
APIKeyID: "hello",
APISecretKey: "world",
StocksJSONBasicAuth: "foo:bar",
},
wantErr: false,
},
Expand Down
18 changes: 17 additions & 1 deletion contrib/alpacabkfeeder/configs/envconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,22 @@ import (
"time"
)

// APItoken and UpdateTime settings can be overridden by environment variables
// APItoken, UpdateTime, and Basic Auth of stocksJsonURL settings can be overridden by environment variables
// to flexibly re-run processes that are performed only at marketstore start-up/certain times of the day
// and not to write security-related configs directly in the configuration file.

// envOverride updates some configs by environment variables.
func envOverride(config *DefaultConfig) (*DefaultConfig, error) {
// override SymbolsUpdateTime
symbolsUpdateTime := os.Getenv("ALPACA_BROKER_FEEDER_SYMBOLS_UPDATE_TIME")
if symbolsUpdateTime != "" {
t, err := time.Parse(ctLayout, symbolsUpdateTime)
if err != nil {
return nil, err
}
config.SymbolsUpdateTime = t
}

// override UpdateTime
updateTime := os.Getenv("ALPACA_BROKER_FEEDER_UPDATE_TIME")
if updateTime != "" {
Expand All @@ -32,5 +42,11 @@ func envOverride(config *DefaultConfig) (*DefaultConfig, error) {
config.APISecretKey = apiSecretKey
}

// override the basic Auth of Stocks Json URL
stocksJSONBasicAuth := os.Getenv("ALPACA_BROKER_FEEDER_STOCKS_JSON_BASIC_AUTH")
if stocksJSONBasicAuth != "" {
config.StocksJSONBasicAuth = stocksJSONBasicAuth
}

return config, nil
}
4 changes: 2 additions & 2 deletions contrib/alpacabkfeeder/feed/backfill.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package feed

import (
v1 "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/api/v1"
"time"

v1 "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/api/v1"
"github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/symbols"
"github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/writer"
"github.com/alpacahq/marketstore/v4/utils/log"
Expand Down Expand Up @@ -61,7 +61,7 @@ func (b *Backfill) UpdateSymbols() {
log.Error("Alpaca Broker ListBars API call error. Err=%v", err)
return
}
log.Info("Alpaca ListBars API call: From=%v, To=%v, Symbols=%v",
log.Info("Alpaca ListBars API call: From=%v, To=%v, symbols=%v",
dateRange.From, dateRange.To, allSymbols[idx.From:idx.To],
)

Expand Down
Loading

0 comments on commit 19a890f

Please sign in to comment.