Skip to content

Commit

Permalink
feat(alpacabkfeeder): extended_hours config (#613)
Browse files Browse the repository at this point in the history
* feat(alpacabkfeeder): extended_hours config
  • Loading branch information
dakimura authored Aug 7, 2022
1 parent d517934 commit 5d4fe6c
Show file tree
Hide file tree
Showing 7 changed files with 275 additions and 180 deletions.
16 changes: 12 additions & 4 deletions contrib/alpacabkfeeder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,22 @@ bgworkers:
# Numbers separated by commas are allowed. Example: "0,15,30,45" -> execute every 15 minutes.
# Whitespaces are ignored.
off_hours_schedule: "0,15,30,45"
# Alpaca Broker API Feeder runs from openTime ~ closeTime (UTC)
openTime: "14:30:00" # 14:30(UTC) = 09:30 (EST)
closeTime: "21:00:00" # 21:00(UTC) = 16:00 (EST)
# (Deprecated) Alpaca Broker API Feeder runs from openTime ~ closeTime (UTC)
# openTime: "14:30:00" # 14:30(UTC) = 09:30 (EST)
# closeTime: "14:29:00"
# Alpaca Broker API Feeder runs between open_time_NY and close_time_NY
# (in "America/New_York" Location)
open_time_NY: "9:25:00"
close_time_NY: "16:10:00"
# When extended_hours is false, TICK data during the off-hours
# (= time < openTime and time > closeTime) are dropped and not stored in DB,
# even when off_hours_schedule is set.
extended_hours: false
# Alpaca Broker API Feeder doesn't run on the following days of the week
closedDaysOfTheWeek:
- "Saturday"
- "Sunday"
# Alpaca Broker API Feeder doesn't run on the closed dates (in JST)
# Alpaca Broker API Feeder doesn't run on the closed dates (in "America/New_York" location)
# (cf. https://www.jpx.co.jp/corporate/about-jpx/calendar/ )
closedDays:
- "2021-12-24"
Expand Down
42 changes: 29 additions & 13 deletions contrib/alpacabkfeeder/alpacav2.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
apiCli := apiClient(config)

// init Market Time Checker
var timeChecker feed.MarketTimeChecker
timeChecker = feed.NewDefaultMarketTimeChecker(
config.ClosedDaysOfTheWeek,
config.ClosedDays,
config.OpenTime,
config.CloseTime)
var timeChecker feed.MarketTimeChecker = defaultTimeChecker(config)
if config.OffHoursSchedule != "" {
scheduleMin, err := feed.ParseSchedule(config.OffHoursSchedule)
if err != nil {
Expand All @@ -53,6 +48,11 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
timeChecker,
scheduleMin,
)

if !config.ExtendedHours {
log.Warn("[Alpaca Broker Feeder] both off_hours_schedule and extend_hours=false is set! " +
"off-hour records won't be stored.")
}
}

ctx := context.Background()
Expand All @@ -72,12 +72,6 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
timer.RunEveryDayAt(ctx, config.SymbolsUpdateTime, sm.UpdateSymbols)
log.Info("updated symbols using a remote json file.")

// init SnapshotWriter
var ssw writer.SnapshotWriter = writer.SnapshotWriterImpl{
MarketStoreWriter: &writer.MarketStoreWriterImpl{},
Timeframe: config.Timeframe,
Timezone: utils.InstanceConfig.Timezone,
}
// init BarWriter
var bw writer.BarWriter = writer.BarWriterImpl{
MarketStoreWriter: &writer.MarketStoreWriterImpl{},
Expand All @@ -99,7 +93,7 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
MarketTimeChecker: timeChecker,
APIClient: apiCli,
SymbolManager: sm,
SnapshotWriter: ssw,
SnapshotWriter: snapshotWriter(config),
BarWriter: bw,
Interval: config.Interval,
}, nil
Expand All @@ -120,4 +114,26 @@ func apiClient(config *configs.DefaultConfig) *api.Client {
return api.NewClient(cred)
}

func defaultTimeChecker(config *configs.DefaultConfig) *feed.DefaultMarketTimeChecker {
return feed.NewDefaultMarketTimeChecker(
config.ClosedDaysOfTheWeek,
config.ClosedDays,
config.OpenHourNY, config.OpenMinuteNY,
config.CloseHourNY, config.CloseMinuteNY)
}

func snapshotWriter(config *configs.DefaultConfig) writer.SnapshotWriter {
var tc writer.MarketTimeChecker = &writer.NoopMarketTimeChecker{}
if !config.ExtendedHours {
tc = defaultTimeChecker(config)
}

return writer.NewSnapshotWriterImpl(
&writer.MarketStoreWriterImpl{},
config.Timeframe,
utils.InstanceConfig.Timezone,
tc,
)
}

func main() {}
45 changes: 28 additions & 17 deletions contrib/alpacabkfeeder/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"strings"
"time"

"github.com/alpacahq/marketstore/v4/utils/log"

jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
)
Expand All @@ -22,19 +24,20 @@ var json = jsoniter.ConfigCompatibleWithStandardLibrary
// DefaultConfig is the configuration for Alpaca Broker API Feeder you can define in
// marketstore's config file through bgworker extension.
type DefaultConfig struct {
Exchanges []Exchange `json:"exchanges"`
SymbolsUpdateTime time.Time `json:"symbols_update_time"`
UpdateTime time.Time `json:"update_time"`
StocksJSONURL string `json:"stocks_json_url"`
StocksJSONBasicAuth string `json:"stocks_json_basic_auth"`
Timeframe string `json:"timeframe"`
APIKeyID string `json:"api_key_id"`
APISecretKey string `json:"api_secret_key"`
OpenTime time.Time
CloseTime time.Time
ClosedDaysOfTheWeek []time.Weekday
ClosedDays []time.Time
Interval int `json:"interval"`
Exchanges []Exchange `json:"exchanges"`
SymbolsUpdateTime time.Time `json:"symbols_update_time"`
UpdateTime time.Time `json:"update_time"`
StocksJSONURL string `json:"stocks_json_url"`
StocksJSONBasicAuth string `json:"stocks_json_basic_auth"`
Timeframe string `json:"timeframe"`
APIKeyID string `json:"api_key_id"`
APISecretKey string `json:"api_secret_key"`
OpenHourNY, OpenMinuteNY int
CloseHourNY, CloseMinuteNY int
ExtendedHours bool
ClosedDaysOfTheWeek []time.Weekday
ClosedDays []time.Time
Interval int `json:"interval"`
// The data-feeding is executed when 'minute' of the current time matches off_hours_schedule
// even when the market is cloded. Example: "10" -> execute at 00:10, 01:10, 02:10,...,23:10
// Numbers separated by commas are allowed. Example: "0,15,30,45" -> execute every 15 minutes.
Expand Down Expand Up @@ -84,8 +87,10 @@ func (c *DefaultConfig) UnmarshalJSON(input []byte) error {
aux := &struct {
SymbolsUpdateTime CustomTime `json:"symbols_update_time"`
UpdateTime CustomTime `json:"update_time"`
OpenTime CustomTime `json:"openTime"`
CloseTime CustomTime `json:"closeTime"`
OpenTime CustomTime `json:"openTime"` // deprecated
CloseTime CustomTime `json:"closeTime"` // deprecated
OpenTimeNY CustomTime `json:"open_time_NY"`
CloseTimeNY CustomTime `json:"close_time_NY"`
ClosedDaysOfTheWeek []weekday `json:"closedDaysOfTheWeek"`
ClosedDays []CustomDay `json:"closedDays"`
*Alias
Expand All @@ -96,8 +101,14 @@ func (c *DefaultConfig) UnmarshalJSON(input []byte) error {
}
c.SymbolsUpdateTime = time.Time(aux.SymbolsUpdateTime)
c.UpdateTime = time.Time(aux.UpdateTime)
c.OpenTime = time.Time(aux.OpenTime)
c.CloseTime = time.Time(aux.CloseTime)
if !time.Time(aux.OpenTime).IsZero() || !time.Time(aux.CloseTime).IsZero() {
log.Error("!!!!!!!!open_time and close_time config are DEPRECATED!!!!!!!! " +
"Please use open_time_NY and close_time_NY instead.")
return errors.New("!!!!!!!!open_time and close_time config are DEPRECATED!!!!!!!! " +
"Please use open_time_NY and close_time_NY instead.")
}
c.OpenHourNY, c.OpenMinuteNY, _ = time.Time(aux.OpenTimeNY).Clock()
c.CloseHourNY, c.CloseMinuteNY, _ = time.Time(aux.CloseTimeNY).Clock()
c.ClosedDaysOfTheWeek = convertTime(aux.ClosedDaysOfTheWeek)
c.ClosedDays = convertDate(aux.ClosedDays)

Expand Down
45 changes: 18 additions & 27 deletions contrib/alpacabkfeeder/feed/time_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/alpacahq/marketstore/v4/utils/log"
)

var jst = time.FixedZone("Asia/Tokyo", 9*60*60)
var ny, _ = time.LoadLocation("America/New_York")

// MarketTimeChecker is an interface to check if the market is open at the specified time or not.
type MarketTimeChecker interface {
Expand All @@ -26,53 +26,44 @@ type MarketTimeChecker interface {
// all those settings should be defined in this object.
type DefaultMarketTimeChecker struct {
// i.e. []string{"Saturday", "Sunday"}
ClosedDaysOfTheWeek []time.Weekday
ClosedDays []time.Time
OpenTime time.Time
CloseTime time.Time
ClosedDaysOfTheWeek []time.Weekday
ClosedDays []time.Time
OpenHourNY, OpenMinuteNY int
CloseHourNY, CloseMinuteNY int
}

// NewDefaultMarketTimeChecker initializes the DefaultMarketTimeChecker object with the specifier parameters.s.
func NewDefaultMarketTimeChecker(
closedDaysOfTheWeek []time.Weekday,
closedDays []time.Time,
openTime time.Time,
closeTime time.Time,
openHourNY, openMinuteNY int,
closeHourNY, closeMinuteNY int,
) *DefaultMarketTimeChecker {
return &DefaultMarketTimeChecker{
ClosedDaysOfTheWeek: closedDaysOfTheWeek,
ClosedDays: closedDays,
OpenTime: openTime,
CloseTime: closeTime,
OpenHourNY: openHourNY, OpenMinuteNY: openMinuteNY,
CloseHourNY: closeHourNY, CloseMinuteNY: closeMinuteNY,
}
}

// IsOpen returns true on weekdays from 08:55 to 15:10.
// if closedDates are defined, return false on those days.
func (m *DefaultMarketTimeChecker) IsOpen(t time.Time) bool {
timeInJst := t.In(jst)
return m.isOpenDate(timeInJst) && m.isOpenWeekDay(timeInJst) && m.isOpenTime(t)
tNY := t.In(ny)
return m.isOpenDate(tNY) && m.isOpenWeekDay(tNY) && m.isOpenTime(tNY)
}

// isOpenTime returns true if the specified time is between the OpenTime and the CloseTime.
func (m *DefaultMarketTimeChecker) isOpenTime(t time.Time) bool {
minFrom12am := t.Hour()*60 + t.Minute()
func (m *DefaultMarketTimeChecker) isOpenTime(nyT time.Time) bool {
nyTYear, nyTMonth, nyTDay := nyT.Date()
openTimeNY := time.Date(nyTYear, nyTMonth, nyTDay, m.OpenHourNY, m.OpenMinuteNY, 0, 0, ny)
closeTimeNY := time.Date(nyTYear, nyTMonth, nyTDay, m.CloseHourNY, m.CloseMinuteNY, 0, 0, ny)

openMinFrom12am := m.OpenTime.Hour()*60 + m.OpenTime.Minute()
closeMinFrom12am := m.CloseTime.Hour()*60 + m.CloseTime.Minute()

// if the open hour is later than the close hour (i.e. open=23h, close=6h), +1day
if closeMinFrom12am < openMinFrom12am {
closeMinFrom12am += 24 * 60
}
if minFrom12am < openMinFrom12am {
minFrom12am += 24 * 60
}

if minFrom12am < openMinFrom12am || minFrom12am >= closeMinFrom12am {
if nyT.Before(openTimeNY) || nyT.After(closeTimeNY) {
log.Debug(fmt.Sprintf("[Alpaca Broker Feeder] market is not open. "+
"openTime=%02d:%02d, closeTime=%02d:%02d, now=%v",
m.OpenTime.Hour(), m.OpenTime.Minute(), m.CloseTime.Hour(), m.CloseTime.Minute(), t))
"openTime(NewYork)=%02d:%02d, closeTime(NewYork)=%02d:%02d, now=%v",
openTimeNY.Hour(), openTimeNY.Minute(), closeTimeNY.Hour(), closeTimeNY.Minute(), nyT))
return false
}
return true
Expand Down
Loading

0 comments on commit 5d4fe6c

Please sign in to comment.