Skip to content

Commit

Permalink
From Xmurobi: Add new types and timeframes support (#218)
Browse files Browse the repository at this point in the history
* update go.mod to solve network blocking issue

* person vscode setup

* small change

* small change

* update

* add vscode setup

* update go.mod and go.sum

* remove go.sum and add .test in gitignore

* update

* upgrade slait bgworker and other bugs fix

* fix crash bug after connection lost

* make slait plugin work

* a temporary multiiple instance for WALFile in same rootDir

* export parameters to config file

* BUGFIX: csv import column missing

* BUGFIX: Column type missing cause display crash

* Multiple instanaces on one root to separate readers & writers support.

* remove debug print

* remove debug prints

* Upgrade show command for more features
Add Gap func to find the big hole in data.

* add help and remove clean code

* Use both Z-Score and specific threshold

* support timestamp parse in timeFormat

* WIP: partial merge

* Custom query timeframes not supported yet

* Fix tests, remove new features from /show

* Kick CI
  • Loading branch information
Notargets authored Jun 19, 2019
1 parent 4a06a2e commit a5e0d91
Show file tree
Hide file tree
Showing 34 changed files with 1,892 additions and 144 deletions.
11 changes: 2 additions & 9 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,5 @@
mktsdb/
marketstore
*.exe

# Python Virtual Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
*.test
.vscode
12 changes: 10 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ debug:
$(MAKE) debug -C contrib/bitmexfeeder
$(MAKE) debug -C contrib/binancefeeder
$(MAKE) debug -C contrib/iex
$(MAKE) debug -C contrib/xignitefeeder
go install -gcflags="all=-N -l" -ldflags "-X $(UTIL_PATH).Tag=$(DOCKER_TAG) -X $(UTIL_PATH).BuildStamp=$(shell date -u +%Y-%m-%d-%H-%M-%S) -X $(UTIL_PATH).GitHash=$(shell git rev-parse HEAD)" ./...

install: all
Expand Down Expand Up @@ -42,11 +43,17 @@ plugins:

unittest: install
go fmt ./...
go test ./...
$(MAKE) test
$(MAKE) integration-test

integration-test:
$(MAKE) -C tests/integ test

test:
go test ./...

image:
docker build -t alpacamarkets/marketstore.test .
docker build . -t marketstore:latest -f $(DOCKER_FILE_PATH)

runimage:
make -C tests/integ run IMAGE_NAME=alpacamarkets/marketstore.test
Expand All @@ -59,3 +66,4 @@ push:
docker login -u $(DOCKER_USER) -p $(DOCKER_PASS)
docker push alpacamarkets/marketstore:$(DOCKER_TAG)
docker push alpacamarkets/marketstore:latest

20 changes: 19 additions & 1 deletion cmd/connect/loader/all_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package loader

import (
. "gopkg.in/check.v1"
"testing"
"time"

. "gopkg.in/check.v1"
)

// Hook up gocheck into the "go test" runner.
Expand All @@ -28,3 +29,20 @@ func (s *LoaderTests) TestParseTime(c *C) {
tTest, err = parseTime(timeFormat, dateTime, tzLoc, formatAdj)
c.Assert(tt == tTest, Equals, true)
}

func (s *LoaderTests) TestParseTimestamp(c *C) {
tt := time.Date(2017, 11, 07, 07, 8, 23, 383000000, time.UTC)
var fAdj int
timeFormat := "timestamp"
dateTime := "1510038503.383"
tzLoc := time.UTC
tTest, err := parseTime(timeFormat, dateTime, tzLoc, fAdj)
c.Assert(err == nil, Equals, true)
c.Assert(tt == tTest, Equals, true)

tt1 := time.Date(2017, 11, 07, 07, 8, 23, 0, time.UTC)
dateTime = "1510038503"
tTest, err = parseTime(timeFormat, dateTime, tzLoc, fAdj)
c.Assert(err == nil, Equals, true)
c.Assert(tt1 == tTest, Equals, true)
}
1 change: 1 addition & 0 deletions cmd/connect/loader/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func readTimeColumns(csvData [][]string, columnIndex []int, conf *CSVConfig) (ep
}
if err != nil {
fmt.Printf("Error parsing Epoch column(s) from input data file: %s\n", err.Error())
fmt.Printf("rowTime %v, mustComposeEpoch %v, dateTime %v, format %v, loc %v, fromAdj %v", rowTime, mustComposeEpoch, dateTime, conf.TimeFormat, tzLoc, formatAdj)
return nil, nil
}
epochCol[i] = rowTime.UTC().Unix()
Expand Down
26 changes: 25 additions & 1 deletion cmd/connect/loader/time.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,38 @@
package loader

import (
"math"
"strconv"
"strings"
"time"
)

func parseTime(format, dateTime string, tzLoc *time.Location, formatFixupState int) (parsedTime time.Time, err error) {

dateString := dateTime[:len(dateTime)-formatFixupState]
if tzLoc != nil {
if format == "timestamp" {
parts := strings.Split(dateTime, ".")
sec, err := strconv.ParseInt(parts[0], 10, 64)
if err != nil {
return time.Time{}, err
}

nsec := (int64)(0)
if len(parts) > 1 {
nsec, err = strconv.ParseInt(parts[1], 10, 64)
if err == nil {
nsec = (int64)(math.Pow10(9-len(parts[1]))) * nsec
} else {
return time.Time{}, err
}
}

parsedTime = time.Unix(sec, nsec)
if tzLoc != nil {
parsedTime = parsedTime.In(tzLoc)
}
formatFixupState = 0
} else if tzLoc != nil {
parsedTime, err = time.ParseInLocation(format, dateString, tzLoc)
if err != nil {
return time.Time{}, err
Expand Down
144 changes: 143 additions & 1 deletion cmd/connect/loader/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package loader

import (
"fmt"
"github.com/alpacahq/marketstore/utils/io"
"strconv"

"github.com/alpacahq/marketstore/utils/io"
)

func columnSeriesMapFromCSVData(csmInit io.ColumnSeriesMap, key io.TimeBucketKey, csvRows [][]string, columnIndex []int,
Expand All @@ -21,6 +22,12 @@ func columnSeriesMapFromCSVData(csmInit io.ColumnSeriesMap, key io.TimeBucketKey
We skip the first column, as it's the Epoch and we parse that independently
*/
switch shape.Type {
case io.STRING:
col, err := getStringColumnFromCSVRows(csvRows, index)
if columnError(err, shape.Name) {
return nil
}
csm.AddColumn(key, shape.Name, col)
case io.FLOAT32:
col, err := getFloat32ColumnFromCSVRows(csvRows, index)
if columnError(err, shape.Name) {
Expand All @@ -33,6 +40,18 @@ func columnSeriesMapFromCSVData(csmInit io.ColumnSeriesMap, key io.TimeBucketKey
return nil
}
csm.AddColumn(key, shape.Name, col)
case io.BYTE:
col, err := getInt8ColumnFromCSVRows(csvRows, index)
if columnError(err, shape.Name) {
return nil
}
csm.AddColumn(key, shape.Name, col)
case io.INT16:
col, err := getInt16ColumnFromCSVRows(csvRows, index)
if columnError(err, shape.Name) {
return nil
}
csm.AddColumn(key, shape.Name, col)
case io.INT32:
col, err := getInt32ColumnFromCSVRows(csvRows, index)
if columnError(err, shape.Name) {
Expand All @@ -45,7 +64,39 @@ func columnSeriesMapFromCSVData(csmInit io.ColumnSeriesMap, key io.TimeBucketKey
return nil
}
csm.AddColumn(key, shape.Name, col)
case io.UINT8:
col, err := getUInt8ColumnFromCSVRows(csvRows, index)
if columnError(err, shape.Name) {
return nil
}
csm.AddColumn(key, shape.Name, col)
case io.UINT16:
col, err := getUInt16ColumnFromCSVRows(csvRows, index)
if columnError(err, shape.Name) {
return nil
}
csm.AddColumn(key, shape.Name, col)
case io.UINT32:
col, err := getUInt32ColumnFromCSVRows(csvRows, index)
if columnError(err, shape.Name) {
return nil
}
csm.AddColumn(key, shape.Name, col)
case io.UINT64:
col, err := getUInt64ColumnFromCSVRows(csvRows, index)
if columnError(err, shape.Name) {
return nil
}
csm.AddColumn(key, shape.Name, col)
case io.BOOL:
col, err := getBoolColumnFromCSVRows(csvRows, index)
if columnError(err, shape.Name) {
return nil
}
csm.AddColumn(key, shape.Name, col)

}

}
}
return csm
Expand All @@ -59,6 +110,26 @@ func columnError(err error, name string) bool {
return false
}

func getBoolColumnFromCSVRows(csvRows [][]string, index int) (col []bool, err error) {
col = make([]bool, len(csvRows))
for i, row := range csvRows {
val, err := strconv.ParseBool(row[index])
if err != nil {
return nil, err
}
col[i] = bool(val)
}
return col, nil
}

func getStringColumnFromCSVRows(csvRows [][]string, index int) (col []string, err error) {
col = make([]string, len(csvRows))
for i, row := range csvRows {
col[i] = row[index]
}
return col, nil
}

func getFloat32ColumnFromCSVRows(csvRows [][]string, index int) (col []float32, err error) {
col = make([]float32, len(csvRows))
for i, row := range csvRows {
Expand All @@ -82,6 +153,30 @@ func getFloat64ColumnFromCSVRows(csvRows [][]string, index int) (col []float64,
return col, nil
}

func getInt8ColumnFromCSVRows(csvRows [][]string, index int) (col []int8, err error) {
col = make([]int8, len(csvRows))
for i, row := range csvRows {
val, err := strconv.ParseInt(row[index], 10, 8)
if err != nil {
return nil, err
}
col[i] = int8(val)
}
return col, nil
}

func getInt16ColumnFromCSVRows(csvRows [][]string, index int) (col []int16, err error) {
col = make([]int16, len(csvRows))
for i, row := range csvRows {
val, err := strconv.ParseInt(row[index], 10, 16)
if err != nil {
return nil, err
}
col[i] = int16(val)
}
return col, nil
}

func getInt32ColumnFromCSVRows(csvRows [][]string, index int) (col []int32, err error) {
col = make([]int32, len(csvRows))
for i, row := range csvRows {
Expand All @@ -104,3 +199,50 @@ func getInt64ColumnFromCSVRows(csvRows [][]string, index int) (col []int64, err
}
return col, nil
}

func getUInt8ColumnFromCSVRows(csvRows [][]string, index int) (col []uint8, err error) {
col = make([]uint8, len(csvRows))
for i, row := range csvRows {
val, err := strconv.ParseUint(row[index], 10, 8)
if err != nil {
return nil, err
}
col[i] = uint8(val)
}
return col, nil
}

func getUInt16ColumnFromCSVRows(csvRows [][]string, index int) (col []uint16, err error) {
col = make([]uint16, len(csvRows))
for i, row := range csvRows {
val, err := strconv.ParseUint(row[index], 10, 16)
if err != nil {
return nil, err
}
col[i] = uint16(val)
}
return col, nil
}

func getUInt32ColumnFromCSVRows(csvRows [][]string, index int) (col []uint32, err error) {
col = make([]uint32, len(csvRows))
for i, row := range csvRows {
val, err := strconv.ParseUint(row[index], 10, 32)
if err != nil {
return nil, err
}
col[i] = uint32(val)
}
return col, nil
}

func getUInt64ColumnFromCSVRows(csvRows [][]string, index int) (col []uint64, err error) {
col = make([]uint64, len(csvRows))
for i, row := range csvRows {
col[i], err = strconv.ParseUint(row[index], 10, 64)
if err != nil {
return nil, err
}
}
return col, nil
}
43 changes: 39 additions & 4 deletions cmd/connect/session/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,6 @@ EVAL:
c.show(line)
case strings.HasPrefix(line, "\\trim"):
c.trim(line)
case strings.HasPrefix(line, "\\gaps"):
c.findGaps(line)
case strings.HasPrefix(line, "\\load"):
c.load(line)
case strings.HasPrefix(line, "\\create"):
Expand Down Expand Up @@ -295,15 +293,38 @@ func printResult(queryText string, cs *dbio.ColumnSeries, optionalFile ...string
case reflect.Float64:
val := col.([]float64)[i]
element = strconv.FormatFloat(val, 'f', -1, 32)
case reflect.Int8:
val := col.([]int8)[i]
element = strconv.FormatInt(int64(val), 10)
case reflect.Int16:
val := col.([]int16)[i]
element = strconv.FormatInt(int64(val), 10)
case reflect.Int32:
val := col.([]int32)[i]
element = strconv.FormatInt(int64(val), 10)
case reflect.Int64:
val := col.([]int64)[i]
element = strconv.FormatInt(val, 10)
case reflect.Uint8:
val := col.([]byte)[i]
element = strconv.FormatInt(int64(val), 10)
val := col.([]uint8)[i]
element = strconv.FormatUint(uint64(val), 10)
case reflect.Uint16:
val := col.([]uint16)[i]
element = strconv.FormatUint(uint64(val), 10)
case reflect.Uint32:
val := col.([]uint32)[i]
element = strconv.FormatUint(uint64(val), 10)
case reflect.Uint64:
val := col.([]uint64)[i]
element = strconv.FormatUint(val, 10)
case reflect.Bool:
val := col.([]bool)[i]
if val {
element = "TRUE"
} else {
element = "FALSE"
}

}
element = fmt.Sprintf("%-10s", element)
}
Expand Down Expand Up @@ -349,12 +370,26 @@ func formatHeader(cs *dbio.ColumnSeries, printChar string) string {
appendChars(10)
case reflect.Float64:
appendChars(10)
case reflect.Int8:
appendChars(10)
case reflect.Int16:
appendChars(10)
case reflect.Int32:
appendChars(10)
case reflect.Int64:
appendChars(10)
case reflect.Uint8:
appendChars(10)
case reflect.Uint16:
appendChars(10)
case reflect.Uint32:
appendChars(10)
case reflect.Uint64:
appendChars(10)
case reflect.String:
appendChars(10)
case reflect.Bool:
appendChars(10)
}
}
return buffer.String()
Expand Down
Loading

0 comments on commit a5e0d91

Please sign in to comment.