From 5d4fe6ca0734730a84d17e05c7de7702b1764f8d Mon Sep 17 00:00:00 2001 From: dakimura <34202807+dakimura@users.noreply.github.com> Date: Sun, 7 Aug 2022 10:37:47 +0900 Subject: [PATCH] feat(alpacabkfeeder): extended_hours config (#613) * feat(alpacabkfeeder): extended_hours config --- contrib/alpacabkfeeder/README.md | 16 +- contrib/alpacabkfeeder/alpacav2.go | 42 ++-- contrib/alpacabkfeeder/configs/config.go | 45 +++-- contrib/alpacabkfeeder/feed/time_checker.go | 45 ++--- .../alpacabkfeeder/feed/time_checker_test.go | 182 ++++++++---------- .../alpacabkfeeder/writer/snapshot_writer.go | 29 ++- .../writer/snapshot_writer_test.go | 96 +++++++-- 7 files changed, 275 insertions(+), 180 deletions(-) diff --git a/contrib/alpacabkfeeder/README.md b/contrib/alpacabkfeeder/README.md index f490934b..021d9c04 100644 --- a/contrib/alpacabkfeeder/README.md +++ b/contrib/alpacabkfeeder/README.md @@ -68,14 +68,22 @@ bgworkers: # Numbers separated by commas are allowed. Example: "0,15,30,45" -> execute every 15 minutes. # Whitespaces are ignored. off_hours_schedule: "0,15,30,45" - # Alpaca Broker API Feeder runs from openTime ~ closeTime (UTC) - openTime: "14:30:00" # 14:30(UTC) = 09:30 (EST) - closeTime: "21:00:00" # 21:00(UTC) = 16:00 (EST) + # (Deprecated) Alpaca Broker API Feeder runs from openTime ~ closeTime (UTC) + # openTime: "14:30:00" # 14:30(UTC) = 09:30 (EST) + # closeTime: "14:29:00" + # Alpaca Broker API Feeder runs between open_time_NY and close_time_NY + # (in "America/New_York" Location) + open_time_NY: "9:25:00" + close_time_NY: "16:10:00" + # When extended_hours is false, TICK data during the off-hours + # (= time < openTime and time > closeTime) are dropped and not stored in DB, + # even when off_hours_schedule is set. + extended_hours: false # Alpaca Broker API Feeder doesn't run on the following days of the week closedDaysOfTheWeek: - "Saturday" - "Sunday" - # Alpaca Broker API Feeder doesn't run on the closed dates (in JST) + # Alpaca Broker API Feeder doesn't run on the closed dates (in "America/New_York" location) # (cf. https://www.jpx.co.jp/corporate/about-jpx/calendar/ ) closedDays: - "2021-12-24" diff --git a/contrib/alpacabkfeeder/alpacav2.go b/contrib/alpacabkfeeder/alpacav2.go index 60260fd6..a4d73a68 100644 --- a/contrib/alpacabkfeeder/alpacav2.go +++ b/contrib/alpacabkfeeder/alpacav2.go @@ -34,12 +34,7 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) { apiCli := apiClient(config) // init Market Time Checker - var timeChecker feed.MarketTimeChecker - timeChecker = feed.NewDefaultMarketTimeChecker( - config.ClosedDaysOfTheWeek, - config.ClosedDays, - config.OpenTime, - config.CloseTime) + var timeChecker feed.MarketTimeChecker = defaultTimeChecker(config) if config.OffHoursSchedule != "" { scheduleMin, err := feed.ParseSchedule(config.OffHoursSchedule) if err != nil { @@ -53,6 +48,11 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) { timeChecker, scheduleMin, ) + + if !config.ExtendedHours { + log.Warn("[Alpaca Broker Feeder] both off_hours_schedule and extend_hours=false is set! " + + "off-hour records won't be stored.") + } } ctx := context.Background() @@ -72,12 +72,6 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) { timer.RunEveryDayAt(ctx, config.SymbolsUpdateTime, sm.UpdateSymbols) log.Info("updated symbols using a remote json file.") - // init SnapshotWriter - var ssw writer.SnapshotWriter = writer.SnapshotWriterImpl{ - MarketStoreWriter: &writer.MarketStoreWriterImpl{}, - Timeframe: config.Timeframe, - Timezone: utils.InstanceConfig.Timezone, - } // init BarWriter var bw writer.BarWriter = writer.BarWriterImpl{ MarketStoreWriter: &writer.MarketStoreWriterImpl{}, @@ -99,7 +93,7 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) { MarketTimeChecker: timeChecker, APIClient: apiCli, SymbolManager: sm, - SnapshotWriter: ssw, + SnapshotWriter: snapshotWriter(config), BarWriter: bw, Interval: config.Interval, }, nil @@ -120,4 +114,26 @@ func apiClient(config *configs.DefaultConfig) *api.Client { return api.NewClient(cred) } +func defaultTimeChecker(config *configs.DefaultConfig) *feed.DefaultMarketTimeChecker { + return feed.NewDefaultMarketTimeChecker( + config.ClosedDaysOfTheWeek, + config.ClosedDays, + config.OpenHourNY, config.OpenMinuteNY, + config.CloseHourNY, config.CloseMinuteNY) +} + +func snapshotWriter(config *configs.DefaultConfig) writer.SnapshotWriter { + var tc writer.MarketTimeChecker = &writer.NoopMarketTimeChecker{} + if !config.ExtendedHours { + tc = defaultTimeChecker(config) + } + + return writer.NewSnapshotWriterImpl( + &writer.MarketStoreWriterImpl{}, + config.Timeframe, + utils.InstanceConfig.Timezone, + tc, + ) +} + func main() {} diff --git a/contrib/alpacabkfeeder/configs/config.go b/contrib/alpacabkfeeder/configs/config.go index 61c1c304..095c4964 100644 --- a/contrib/alpacabkfeeder/configs/config.go +++ b/contrib/alpacabkfeeder/configs/config.go @@ -4,6 +4,8 @@ import ( "strings" "time" + "github.com/alpacahq/marketstore/v4/utils/log" + jsoniter "github.com/json-iterator/go" "github.com/pkg/errors" ) @@ -22,19 +24,20 @@ var json = jsoniter.ConfigCompatibleWithStandardLibrary // DefaultConfig is the configuration for Alpaca Broker API Feeder you can define in // marketstore's config file through bgworker extension. type DefaultConfig struct { - Exchanges []Exchange `json:"exchanges"` - SymbolsUpdateTime time.Time `json:"symbols_update_time"` - UpdateTime time.Time `json:"update_time"` - StocksJSONURL string `json:"stocks_json_url"` - StocksJSONBasicAuth string `json:"stocks_json_basic_auth"` - Timeframe string `json:"timeframe"` - APIKeyID string `json:"api_key_id"` - APISecretKey string `json:"api_secret_key"` - OpenTime time.Time - CloseTime time.Time - ClosedDaysOfTheWeek []time.Weekday - ClosedDays []time.Time - Interval int `json:"interval"` + Exchanges []Exchange `json:"exchanges"` + SymbolsUpdateTime time.Time `json:"symbols_update_time"` + UpdateTime time.Time `json:"update_time"` + StocksJSONURL string `json:"stocks_json_url"` + StocksJSONBasicAuth string `json:"stocks_json_basic_auth"` + Timeframe string `json:"timeframe"` + APIKeyID string `json:"api_key_id"` + APISecretKey string `json:"api_secret_key"` + OpenHourNY, OpenMinuteNY int + CloseHourNY, CloseMinuteNY int + ExtendedHours bool + ClosedDaysOfTheWeek []time.Weekday + ClosedDays []time.Time + Interval int `json:"interval"` // The data-feeding is executed when 'minute' of the current time matches off_hours_schedule // even when the market is cloded. Example: "10" -> execute at 00:10, 01:10, 02:10,...,23:10 // Numbers separated by commas are allowed. Example: "0,15,30,45" -> execute every 15 minutes. @@ -84,8 +87,10 @@ func (c *DefaultConfig) UnmarshalJSON(input []byte) error { aux := &struct { SymbolsUpdateTime CustomTime `json:"symbols_update_time"` UpdateTime CustomTime `json:"update_time"` - OpenTime CustomTime `json:"openTime"` - CloseTime CustomTime `json:"closeTime"` + OpenTime CustomTime `json:"openTime"` // deprecated + CloseTime CustomTime `json:"closeTime"` // deprecated + OpenTimeNY CustomTime `json:"open_time_NY"` + CloseTimeNY CustomTime `json:"close_time_NY"` ClosedDaysOfTheWeek []weekday `json:"closedDaysOfTheWeek"` ClosedDays []CustomDay `json:"closedDays"` *Alias @@ -96,8 +101,14 @@ func (c *DefaultConfig) UnmarshalJSON(input []byte) error { } c.SymbolsUpdateTime = time.Time(aux.SymbolsUpdateTime) c.UpdateTime = time.Time(aux.UpdateTime) - c.OpenTime = time.Time(aux.OpenTime) - c.CloseTime = time.Time(aux.CloseTime) + if !time.Time(aux.OpenTime).IsZero() || !time.Time(aux.CloseTime).IsZero() { + log.Error("!!!!!!!!open_time and close_time config are DEPRECATED!!!!!!!! " + + "Please use open_time_NY and close_time_NY instead.") + return errors.New("!!!!!!!!open_time and close_time config are DEPRECATED!!!!!!!! " + + "Please use open_time_NY and close_time_NY instead.") + } + c.OpenHourNY, c.OpenMinuteNY, _ = time.Time(aux.OpenTimeNY).Clock() + c.CloseHourNY, c.CloseMinuteNY, _ = time.Time(aux.CloseTimeNY).Clock() c.ClosedDaysOfTheWeek = convertTime(aux.ClosedDaysOfTheWeek) c.ClosedDays = convertDate(aux.ClosedDays) diff --git a/contrib/alpacabkfeeder/feed/time_checker.go b/contrib/alpacabkfeeder/feed/time_checker.go index e083e777..b0b8778f 100644 --- a/contrib/alpacabkfeeder/feed/time_checker.go +++ b/contrib/alpacabkfeeder/feed/time_checker.go @@ -9,7 +9,7 @@ import ( "github.com/alpacahq/marketstore/v4/utils/log" ) -var jst = time.FixedZone("Asia/Tokyo", 9*60*60) +var ny, _ = time.LoadLocation("America/New_York") // MarketTimeChecker is an interface to check if the market is open at the specified time or not. type MarketTimeChecker interface { @@ -26,53 +26,44 @@ type MarketTimeChecker interface { // all those settings should be defined in this object. type DefaultMarketTimeChecker struct { // i.e. []string{"Saturday", "Sunday"} - ClosedDaysOfTheWeek []time.Weekday - ClosedDays []time.Time - OpenTime time.Time - CloseTime time.Time + ClosedDaysOfTheWeek []time.Weekday + ClosedDays []time.Time + OpenHourNY, OpenMinuteNY int + CloseHourNY, CloseMinuteNY int } // NewDefaultMarketTimeChecker initializes the DefaultMarketTimeChecker object with the specifier parameters.s. func NewDefaultMarketTimeChecker( closedDaysOfTheWeek []time.Weekday, closedDays []time.Time, - openTime time.Time, - closeTime time.Time, + openHourNY, openMinuteNY int, + closeHourNY, closeMinuteNY int, ) *DefaultMarketTimeChecker { return &DefaultMarketTimeChecker{ ClosedDaysOfTheWeek: closedDaysOfTheWeek, ClosedDays: closedDays, - OpenTime: openTime, - CloseTime: closeTime, + OpenHourNY: openHourNY, OpenMinuteNY: openMinuteNY, + CloseHourNY: closeHourNY, CloseMinuteNY: closeMinuteNY, } } // IsOpen returns true on weekdays from 08:55 to 15:10. // if closedDates are defined, return false on those days. func (m *DefaultMarketTimeChecker) IsOpen(t time.Time) bool { - timeInJst := t.In(jst) - return m.isOpenDate(timeInJst) && m.isOpenWeekDay(timeInJst) && m.isOpenTime(t) + tNY := t.In(ny) + return m.isOpenDate(tNY) && m.isOpenWeekDay(tNY) && m.isOpenTime(tNY) } // isOpenTime returns true if the specified time is between the OpenTime and the CloseTime. -func (m *DefaultMarketTimeChecker) isOpenTime(t time.Time) bool { - minFrom12am := t.Hour()*60 + t.Minute() +func (m *DefaultMarketTimeChecker) isOpenTime(nyT time.Time) bool { + nyTYear, nyTMonth, nyTDay := nyT.Date() + openTimeNY := time.Date(nyTYear, nyTMonth, nyTDay, m.OpenHourNY, m.OpenMinuteNY, 0, 0, ny) + closeTimeNY := time.Date(nyTYear, nyTMonth, nyTDay, m.CloseHourNY, m.CloseMinuteNY, 0, 0, ny) - openMinFrom12am := m.OpenTime.Hour()*60 + m.OpenTime.Minute() - closeMinFrom12am := m.CloseTime.Hour()*60 + m.CloseTime.Minute() - - // if the open hour is later than the close hour (i.e. open=23h, close=6h), +1day - if closeMinFrom12am < openMinFrom12am { - closeMinFrom12am += 24 * 60 - } - if minFrom12am < openMinFrom12am { - minFrom12am += 24 * 60 - } - - if minFrom12am < openMinFrom12am || minFrom12am >= closeMinFrom12am { + if nyT.Before(openTimeNY) || nyT.After(closeTimeNY) { log.Debug(fmt.Sprintf("[Alpaca Broker Feeder] market is not open. "+ - "openTime=%02d:%02d, closeTime=%02d:%02d, now=%v", - m.OpenTime.Hour(), m.OpenTime.Minute(), m.CloseTime.Hour(), m.CloseTime.Minute(), t)) + "openTime(NewYork)=%02d:%02d, closeTime(NewYork)=%02d:%02d, now=%v", + openTimeNY.Hour(), openTimeNY.Minute(), closeTimeNY.Hour(), closeTimeNY.Minute(), nyT)) return false } return true diff --git a/contrib/alpacabkfeeder/feed/time_checker_test.go b/contrib/alpacabkfeeder/feed/time_checker_test.go index dd83a8c8..1ff416a7 100644 --- a/contrib/alpacabkfeeder/feed/time_checker_test.go +++ b/contrib/alpacabkfeeder/feed/time_checker_test.go @@ -1,139 +1,104 @@ -package feed +package feed_test import ( "testing" "time" + + "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/feed" ) var ( ClosedDaysOfTheWeek = []time.Weekday{time.Saturday, time.Sunday} ClosedDays = []time.Time{ - // Marine day in Japan - time.Date(2019, 7, 15, 0, 0, 0, 0, time.UTC), - // Health and Sports day in Japan - time.Date(2019, 10, 14, 0, 0, 0, 0, time.UTC), + // Independence day + time.Date(2019, 7, 4, 0, 0, 0, 0, time.UTC), } + ny, _ = time.LoadLocation("America/New_York") + jst = time.FixedZone("Asia/Tokyo", 9*60*60) ) -// 5 minutes before the market opens in Japan (UTC). -var OpenTime = time.Date(0, 0, 0, 23, 55, 0, 0, time.UTC) - -// 10 minutes after the market closes in Japan (UTC). -var CloseTime = time.Date(0, 0, 0, 6, 10, 0, 0, time.UTC) - type testCase struct { - name string - arg time.Time - isOpen bool + name string + arg time.Time + openHour, openMinute int + closeHour, closeMinute int + wantIsOpen bool } func TestDefaultMarketTimeChecker_isOpen(t *testing.T) { t.Parallel() - // --- given --- - SUT := &DefaultMarketTimeChecker{ - ClosedDaysOfTheWeek, - ClosedDays, - OpenTime, - CloseTime, - } + // test cases tests := []testCase{ + // Sunday, 13 March 2022, 02:00:00 clocks were turned forward 1 hour to + // Sunday, 13 March 2022, 03:00:00 local daylight time instead. + // Sunday, 6 November 2022, 02:00:00 clocks are turned backward 1 hour to + // Sunday, 6 November 2022, 01:00:00 local standard time instead. + // 2019-07-14 = Sunday, EDT(UTC-0400) + // 2019-07-16 = Tuesday, EDT(UTC-0400) + // 2019-07-20 = Saturday, EDT(UTC-0400) { - "open", // 09:00, Tuesday in JST - time.Date(2019, 7, 16, 23, 55, 0, 0, time.UTC), true, + name: "open(2019-03-19(Tuesday) 20:00UTC = 2019-03-19 16:00EDT)", + arg: time.Date(2019, 3, 19, 20, 0o0, 0, 0, time.UTC), + openHour: 9, openMinute: 30, + closeHour: 16, closeMinute: 0, + wantIsOpen: true, }, { - "open", // 12:00, Tuesday in JST - time.Date(2019, 7, 16, 3, 0, 0, 0, time.UTC), true, + name: "open(2019-03-19(Tuesday) 13:30UTC = 2019-03-19 9:30EDT)", + arg: time.Date(2019, 3, 19, 13, 30, 0, 0, time.UTC), + openHour: 9, openMinute: 30, + closeHour: 16, closeMinute: 0, + wantIsOpen: true, }, { - "close", // 19:00 in JST - time.Date(2019, 7, 16, 6, 10, 0, 0, time.UTC), false, + name: "close(2019-03-19(Tuesday) 13:29UTC = 2019-03-19 9:29EDT)", + arg: time.Date(2019, 3, 19, 13, 29, 0, 0, time.UTC), + openHour: 9, openMinute: 30, + closeHour: 16, closeMinute: 0, + wantIsOpen: false, }, - { - "weekend", // Sunday - time.Date(2019, 7, 7, 0, 0, 0, 0, time.UTC), false, + name: "close(2019-01-15(Tuesday) 06:00JST = 2019-03-18 16:01EST)", + arg: time.Date(2019, 1, 15, 0o6, 0o1, 0, 0, jst), + openHour: 9, openMinute: 30, + closeHour: 16, closeMinute: 0, + wantIsOpen: false, }, { - "not weekend in JST", - time.Date(2019, 7, 7, 23, 56, 0, 0, time.UTC), true, + name: "open(2019-01-15(Tuesday) 14:30UTC = 2019-03-19 09:30EST)", + arg: time.Date(2019, 1, 15, 14, 30, 0, 0, time.UTC), + openHour: 9, openMinute: 30, + closeHour: 16, closeMinute: 0, + wantIsOpen: true, }, - { - "holiday", - time.Date(2019, 7, 15, 0, 0, 0, 0, time.UTC), false, + name: "open(2019-07-20 06:00 is Saturday in JST but 2019-07-19 16:00 is Friday in EDT", + arg: time.Date(2019, 7, 20, 5, 0, 0, 0, jst), + openHour: 9, openMinute: 30, + closeHour: 16, closeMinute: 0, + wantIsOpen: true, }, { - "not holiday in JST", - time.Date(2019, 7, 15, 23, 56, 0, 0, time.UTC), true, + name: "close(weekend. 2019-07-07 is Sunday)", + arg: time.Date(2019, 7, 7, 0, 0, 0, 0, time.UTC), + openHour: 9, openMinute: 30, + closeHour: 16, closeMinute: 0, + wantIsOpen: false, }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - // --- when --- - got := SUT.IsOpen(tt.arg) - - // --- then --- - if got != tt.isOpen { - t.Errorf("DefaultMarketTimeChecker.IsOpen() = %v, want %v", got, tt.isOpen) - } - }) - } -} - -type subTestCase struct { - name string - currentTime time.Time - businessDays int - expected time.Time - isErr bool -} - -func TestDefaultMarketTimeChecker_Sub(t *testing.T) { - t.Parallel() - // --- given --- - SUT := &DefaultMarketTimeChecker{ - ClosedDaysOfTheWeek, - ClosedDays, - OpenTime, - CloseTime, - } - // test cases - tests := []subTestCase{ { - "3 business days", // 2019-10-18 = Friday - time.Date(2019, 10, 18, 0, 0, 0, 0, time.UTC), 3, - time.Date(2019, 10, 15, 0, 0, 0, 0, time.UTC), false, + name: "close(holiday. 2019-07-04 is Monday but Independence day)", + arg: time.Date(2019, 7, 4, 12, 0, 0, 0, ny), + openHour: 9, openMinute: 30, + closeHour: 16, closeMinute: 0, + wantIsOpen: false, }, { - "Sunday and Saturday are not business days", // 2019-10-21 = Monday - time.Date(2019, 10, 21, 0, 0, 0, 0, time.UTC), 3, - time.Date(2019, 10, 16, 0, 0, 0, 0, time.UTC), false, - }, - { - "10/14(Mon) is a national holiday", - time.Date(2019, 10, 15, 0, 0, 0, 0, time.UTC), 3, - time.Date(2019, 10, 9, 0, 0, 0, 0, time.UTC), false, - }, - { - "We consider Friday is one business-day before Sunday", // 2019-10-18 = Friday - time.Date(2019, 10, 21, 0, 0, 0, 0, time.UTC), 1, - time.Date(2019, 10, 18, 0, 0, 0, 0, time.UTC), false, - }, - { - "businessDays argument should be a positive integer", - time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC), -1, - time.Time{}, - true, - }, - { - "Current date is returned when businessDays = 0", - time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC), 0, - time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC), false, + name: "close(holiday. 2019-07-05 is not a national holiday in JST but Independence day in US)", + arg: time.Date(2019, 7, 5, 2, 0, 0, 0, jst), + openHour: 9, openMinute: 30, + closeHour: 16, closeMinute: 0, + wantIsOpen: false, }, } @@ -141,15 +106,20 @@ func TestDefaultMarketTimeChecker_Sub(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() + // --- given --- + SUT := &feed.DefaultMarketTimeChecker{ + ClosedDaysOfTheWeek: ClosedDaysOfTheWeek, + ClosedDays: ClosedDays, + OpenHourNY: tt.openHour, OpenMinuteNY: tt.openMinute, + CloseHourNY: tt.closeHour, CloseMinuteNY: tt.closeMinute, + } + // --- when --- - got, err := SUT.Sub(tt.currentTime, tt.businessDays) + got := SUT.IsOpen(tt.arg) // --- then --- - if got != tt.expected { - t.Errorf("DefaultMarketTimeChecker.Sub() = %v, want %v", got, tt.expected) - } - if (err != nil) != tt.isErr { - t.Errorf("DefaultMarketTimeChecker.Sub() returned %v error", err) + if got != tt.wantIsOpen { + t.Errorf("DefaultMarketTimeChecker.IsOpen() = %v, want %v", got, tt.wantIsOpen) } }) } diff --git a/contrib/alpacabkfeeder/writer/snapshot_writer.go b/contrib/alpacabkfeeder/writer/snapshot_writer.go index 84ec6c5f..a2b3ca7e 100644 --- a/contrib/alpacabkfeeder/writer/snapshot_writer.go +++ b/contrib/alpacabkfeeder/writer/snapshot_writer.go @@ -14,12 +14,34 @@ type SnapshotWriter interface { Write(snapshots map[string]*api.Snapshot) error } +type MarketTimeChecker interface { + IsOpen(t time.Time) bool +} + +// NoopMarketTimeChecker checks nothing and always returns IsOpen=true. +type NoopMarketTimeChecker struct{} + +func (nc *NoopMarketTimeChecker) IsOpen(t time.Time) bool { + return true +} + +func NewSnapshotWriterImpl(w MarketStoreWriter, tf string, tz *time.Location, tc MarketTimeChecker, +) *SnapshotWriterImpl { + return &SnapshotWriterImpl{ + MarketStoreWriter: w, + Timeframe: tf, + Timezone: tz, + TimeChecker: tc, + } +} + // SnapshotWriterImpl is an implementation of the SnapshotWriter interface. type SnapshotWriterImpl struct { MarketStoreWriter MarketStoreWriter Timeframe string // SnapshotWriterImpl writes data with the timezone - Timezone *time.Location + Timezone *time.Location + TimeChecker MarketTimeChecker } // Write converts the map(key:symbol, value:snapshot) to a ColumnSeriesMap and write it to the local marketstore server. @@ -50,6 +72,11 @@ func (q *SnapshotWriterImpl) convertToCSM(snapshots map[string]*api.Snapshot) io snapshot.LatestTrade.Timestamp, ).In(q.Timezone) + // drop all the off_hours records when extended_hours config is false + if !q.TimeChecker.IsOpen(latestTime) { + continue + } + // These additional fields are not always provided. // fill empty data to keep the number of columns in the CSM if snapshot.DailyBar == nil { diff --git a/contrib/alpacabkfeeder/writer/snapshot_writer_test.go b/contrib/alpacabkfeeder/writer/snapshot_writer_test.go index ca3bd102..231aa9e6 100644 --- a/contrib/alpacabkfeeder/writer/snapshot_writer_test.go +++ b/contrib/alpacabkfeeder/writer/snapshot_writer_test.go @@ -1,9 +1,13 @@ -package writer +package writer_test import ( + "sort" "testing" "time" + "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/feed" + "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/writer" + "github.com/pkg/errors" "github.com/stretchr/testify/require" @@ -46,13 +50,31 @@ var ( Close: 22, Volume: 23, } + + ny, _ = time.LoadLocation("America/New_York") + exampleCloseTimeQuote1 = &api.Quote{ + BidPrice: 1, + Timestamp: time.Date(2019, 7, 19, 9, 29, 0, 0, ny), // close + } + exampleOpenTimeQuote1 = &api.Quote{ + BidPrice: 1, + Timestamp: time.Date(2019, 7, 19, 9, 30, 0, 0, ny), // open + } + exampleOpenTimeQuote2 = &api.Quote{ + BidPrice: 2, + Timestamp: time.Date(2019, 7, 19, 9, 31, 0, 0, ny), // open + } + timeChecker = feed.NewDefaultMarketTimeChecker(nil, nil, + 9, 30, 16, 30, + ) ) func TestSnapshotWriterImpl_Write(t *testing.T) { t.Parallel() type fields struct { - Timeframe string - Timezone *time.Location + Timeframe string + Timezone *time.Location + TimeChecker writer.MarketTimeChecker } tests := []struct { name string @@ -67,8 +89,9 @@ func TestSnapshotWriterImpl_Write(t *testing.T) { { name: "OK/empty snapshot/snapshot with empty trade/quote is ignored", fields: fields{ - Timeframe: "1Sec", - Timezone: time.UTC, + Timeframe: "1Sec", + Timezone: time.UTC, + TimeChecker: &writer.NoopMarketTimeChecker{}, }, snapshots: map[string]*api.Snapshot{ "AAPL": { @@ -121,11 +144,60 @@ func TestSnapshotWriterImpl_Write(t *testing.T) { }, wantCSMLen: 1, }, + { + name: "OK/records in off-hour time must be dropped (extended_hours:false)", + fields: fields{ + Timeframe: "1Sec", + Timezone: time.UTC, + TimeChecker: timeChecker, + }, + snapshots: map[string]*api.Snapshot{ + "AAPL": {LatestTrade: exampleTrade, LatestQuote: exampleCloseTimeQuote1}, + "AMZN": {LatestTrade: exampleTrade, LatestQuote: exampleOpenTimeQuote1}, + "FB": {LatestTrade: exampleTrade, LatestQuote: exampleOpenTimeQuote2}, + }, + wantErr: false, + wantTBKs: []io.TimeBucketKey{ + *io.NewTimeBucketKey("AMZN/1Sec/TICK"), + *io.NewTimeBucketKey("FB/1Sec/TICK"), + }, + wantCSMDataShapes: []io.DataShape{ + {Name: "Epoch", Type: io.INT64}, + {Name: "QuoteTimestamp", Type: io.INT64}, + {Name: "Ask", Type: io.FLOAT32}, + {Name: "AskSize", Type: io.UINT32}, + {Name: "Bid", Type: io.FLOAT32}, + {Name: "BidSize", Type: io.UINT32}, + {Name: "TradeTimestamp", Type: io.INT64}, + {Name: "Price", Type: io.FLOAT32}, + {Name: "Size", Type: io.UINT32}, + {Name: "DailyTimestamp", Type: io.INT64}, + {Name: "Open", Type: io.FLOAT32}, + {Name: "High", Type: io.FLOAT32}, + {Name: "Low", Type: io.FLOAT32}, + {Name: "Close", Type: io.FLOAT32}, + {Name: "Volume", Type: io.UINT64}, + {Name: "MinuteTimestamp", Type: io.INT64}, + {Name: "MinuteOpen", Type: io.FLOAT32}, + {Name: "MinuteHigh", Type: io.FLOAT32}, + {Name: "MinuteLow", Type: io.FLOAT32}, + {Name: "MinuteClose", Type: io.FLOAT32}, + {Name: "MinuteVolume", Type: io.UINT64}, + {Name: "PreviousTimestamp", Type: io.INT64}, + {Name: "PreviousOpen", Type: io.FLOAT32}, + {Name: "PreviousHigh", Type: io.FLOAT32}, + {Name: "PreviousLow", Type: io.FLOAT32}, + {Name: "PreviousClose", Type: io.FLOAT32}, + {Name: "PreviousVolume", Type: io.UINT64}, + }, + wantCSMLen: 2, // AAPL record is dropped because it's off-hours + }, { name: "NG/failed to write to marketstore", fields: fields{ - Timeframe: "1Sec", - Timezone: time.UTC, + Timeframe: "1Sec", + Timezone: time.UTC, + TimeChecker: &writer.NoopMarketTimeChecker{}, }, snapshots: map[string]*api.Snapshot{ "AAPL": { @@ -144,16 +216,16 @@ func TestSnapshotWriterImpl_Write(t *testing.T) { msw := &internal.MockMarketStoreWriter{Err: tt.writeErr} - q := SnapshotWriterImpl{ - MarketStoreWriter: msw, - Timeframe: tt.fields.Timeframe, - Timezone: tt.fields.Timezone, - } + q := writer.NewSnapshotWriterImpl(msw, tt.fields.Timeframe, tt.fields.Timezone, tt.fields.TimeChecker) err := q.Write(tt.snapshots) require.Equal(t, tt.wantErr, err != nil) tbks := msw.WrittenCSM.GetMetadataKeys() if tt.wantTBKs != nil { + // sort tbks to ignore the order of keys + sort.SliceStable(tbks, func(i, j int) bool { + return tbks[i].String() < tbks[j].String() + }) require.Equal(t, tt.wantTBKs, tbks) }