
[errors] return errors instead of exiting process #79

Merged · 14 commits · Jun 18, 2018
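Every command below gets the same treatment: the exported RunXCmdLine entry point becomes a thin wrapper that parses flags, delegates to an unexported, error-returning runXCmdLine, and is now the only place that prints to stderr and exits. Condensed from the cmd_digest.go hunk below (a sketch, not verbatim diff lines):

	func RunDigestCmdLine() {
		flag.Parse()
		if err := runDigestCmdLine(&sybil.FLAGS); err != nil {
			fmt.Fprintln(os.Stderr, errors.Wrap(err, "digest"))
			os.Exit(1) // exiting the process is confined to the wrapper
		}
	}

	func runDigestCmdLine(flags *sybil.FlagDefs) error {
		// ... does the actual work and returns errors instead of exiting ...
	}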
6 changes: 5 additions & 1 deletion src/cmd/cmd_aggregate.go
@@ -2,6 +2,8 @@ package cmd

import (
"flag"
"fmt"
"os"

"github.com/logv/sybil/src/sybil"
)
@@ -17,7 +19,9 @@ func RunAggregateCmdLine()
sybil.FLAGS.DEBUG = true
sybil.Debug("AGGREGATING")

sybil.DecodeFlags()
if err := sybil.DecodeFlags(); err != nil {
fmt.Fprintln(os.Stderr, "aggregate: failed to decode flags:", err)
}
sybil.FLAGS.PRINT = true
sybil.FLAGS.ENCODE_RESULTS = false
sybil.Debug("AGGREGATING DIRS", dirs)
25 changes: 17 additions & 8 deletions src/cmd/cmd_digest.go
@@ -2,26 +2,35 @@ package cmd

import (
"flag"
"fmt"
"os"

"github.com/logv/sybil/src/sybil"
"github.com/pkg/errors"
)

func RunDigestCmdLine() {
flag.Parse()

if sybil.FLAGS.TABLE == "" {
if err := runDigestCmdLine(&sybil.FLAGS); err != nil {
fmt.Fprintln(os.Stderr, errors.Wrap(err, "digest"))
os.Exit(1)
}
}

func runDigestCmdLine(flags *sybil.FlagDefs) error {
if flags.TABLE == "" {
flag.PrintDefaults()
return
return sybil.ErrMissingTable
}

if sybil.FLAGS.PROFILE {
if flags.PROFILE {
profile := sybil.RUN_PROFILER()
defer profile.Start().Stop()
}
t := sybil.GetTable(sybil.FLAGS.TABLE)
if !t.LoadTableInfo() {
sybil.Warn("Couldn't read table info, exiting early")
return
t := sybil.GetTable(flags.TABLE)
if err := t.LoadTableInfo(); err != nil {
return err
}
t.DigestRecords()
return t.DigestRecords()
}
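Because github.com/pkg/errors prefixes the wrapped message on print, each failure surfaces with its command name attached, e.g. (a sketch; the exact text of sybil.ErrMissingTable isn't shown in this diff):

	err := errors.Wrap(sybil.ErrMissingTable, "digest")
	fmt.Fprintln(os.Stderr, err) // prints "digest: <ErrMissingTable's message>"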
37 changes: 26 additions & 11 deletions src/cmd/cmd_index.go
@@ -2,36 +2,51 @@ package cmd

import (
"flag"
"fmt"
"os"
"strings"

"github.com/logv/sybil/src/sybil"
"github.com/pkg/errors"
)

func RunIndexCmdLine() {
var fInts = flag.String("int", "", "Integer values to index")
flag.Parse()
if sybil.FLAGS.TABLE == "" {
flag.PrintDefaults()
return
ints := strings.Split(*fInts, sybil.FLAGS.FIELD_SEPARATOR)
if err := runIndexCmdLine(&sybil.FLAGS, ints); err != nil {
fmt.Fprintln(os.Stderr, errors.Wrap(err, "index"))
os.Exit(1)
}
}

var ints []string
if *fInts != "" {
ints = strings.Split(*fInts, sybil.FLAGS.FIELD_SEPARATOR)
func runIndexCmdLine(flags *sybil.FlagDefs, ints []string) error {
if sybil.FLAGS.TABLE == "" {
flag.PrintDefaults()
return sybil.ErrMissingTable
}

sybil.FLAGS.UPDATE_TABLE_INFO = true

t := sybil.GetTable(sybil.FLAGS.TABLE)

t.LoadRecords(nil)
t.SaveTableInfo("info")
if _, err := t.LoadRecords(nil); err != nil {
return err
}
if err := t.SaveTableInfo("info"); err != nil {
return err
}
sybil.FLAGS.WRITE_BLOCK_INFO = true

loadSpec := t.NewLoadSpec()
for _, v := range ints {
loadSpec.Int(v)
err := loadSpec.Int(v)
if err != nil {
return err
}
}
if _, err := t.LoadRecords(&loadSpec); err != nil {
return err
}
t.LoadRecords(&loadSpec)
t.SaveTableInfo("info")
return t.SaveTableInfo("info")
}
123 changes: 76 additions & 47 deletions src/cmd/cmd_ingest.go
@@ -13,16 +13,15 @@ import (
"time"

"github.com/logv/sybil/src/sybil"
"github.com/pkg/errors"
)

type Dictionary map[string]interface{}

var JSON_PATH string

// how many times we try to grab table info when ingesting
var TABLE_INFO_GRABS = 10

func ingestDictionary(r *sybil.Record, recordmap *Dictionary, prefix string) {
func ingestDictionary(r *sybil.Record, recordmap *Dictionary, prefix string) error {
for k, v := range *recordmap {
keyName := fmt.Sprint(prefix, k)
_, ok := EXCLUDES[keyName]
@@ -31,25 +30,26 @@ func ingestDictionary(r *sybil.Record, recordmap *Dictionary, prefix string) {
}

prefixName := fmt.Sprint(keyName, "_")
var err error
switch iv := v.(type) {
case string:
if INT_CAST[keyName] {
val, err := strconv.ParseInt(iv, 10, 64)
if err == nil {
r.AddIntField(keyName, int64(val))
val, cerr := strconv.ParseInt(iv, 10, 64)
Collaborator: why is it called cerr? to help with nesting errors?

Contributor Author: if I shadow "err" then I can't populate err from line 33
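In other words, a minimal sketch of the shadowing problem, reusing the names from this hunk:

	var err error // the outer err, logged after the switch
	switch iv := v.(type) {
	case string:
		// "val, err :=" here would declare a new err scoped to this case,
		// shadowing the outer one and leaving it permanently nil.
		val, cerr := strconv.ParseInt(iv, 10, 64)
		if cerr == nil {
			err = r.AddIntField(keyName, val) // plain "=" populates the outer err
		}
	}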

if cerr == nil {
err = r.AddIntField(keyName, int64(val))
}
} else {
r.AddStrField(keyName, iv)
err = r.AddStrField(keyName, iv)

}
case int64:
r.AddIntField(keyName, int64(iv))
err = r.AddIntField(keyName, int64(iv))
case float64:
r.AddIntField(keyName, int64(iv))
err = r.AddIntField(keyName, int64(iv))
// nested fields
case map[string]interface{}:
d := Dictionary(iv)
ingestDictionary(r, &d, prefixName)
err = ingestDictionary(r, &d, prefixName)
// This is a set field
case []interface{}:
keyStrs := make([]string, 0)
@@ -64,25 +64,30 @@ func ingestDictionary(r *sybil.Record, recordmap *Dictionary, prefix string) {
}
}

r.AddSetField(keyName, keyStrs)
err = r.AddSetField(keyName, keyStrs)
case nil:
default:
sybil.Debug(fmt.Sprintf("TYPE %T IS UNKNOWN FOR FIELD", iv), keyName)
}
if err != nil {
// TODO: collect error counters?
sybil.Debug("INGEST RECORD ISSUE:", errors.Wrap(err, fmt.Sprintf("issue with field %v", keyName)))
}
}
return nil
}
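For reference, ingestDictionary flattens nested maps by recursing with keyName plus "_" as the new prefix, so a record like this hypothetical input:

	rec := Dictionary{
		"status": "200",
		"req":    map[string]interface{}{"path": "/x", "ms": 12.5},
	}

comes out as the columns status (a string, or int64 when INT_CAST["status"] is set), req_path (string), and req_ms (int64 12, since float64 values are truncated).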

var IMPORTED_COUNT = 0

func importCsvRecords() {
func importCsvRecords() error {
// For importing CSV records, we need to validate the headers, then we just
// read in and fill out record fields!
scanner := csv.NewReader(os.Stdin)
headerFields, err := scanner.Read()
if err == nil {
sybil.Debug("HEADER FIELDS FOR CSV ARE", headerFields)
} else {
sybil.Error("ERROR READING CSV HEADER", err)
return errors.Wrap(err, "error reading csv header")
}

t := sybil.GetTable(sybil.FLAGS.TABLE)
@@ -110,18 +115,24 @@ func importCsvRecords() {
continue
}

val, err := strconv.ParseFloat(v, 64)
if err == nil {
r.AddIntField(fieldName, int64(val))
var err error
val, cerr := strconv.ParseFloat(v, 64)
if cerr == nil {
err = r.AddIntField(fieldName, int64(val))
} else {
r.AddStrField(fieldName, v)
err = r.AddStrField(fieldName, v)
}
if err != nil {
sybil.Debug("INGEST RECORD ISSUE:", errors.Wrap(err, fmt.Sprintf("issue loading %v", fieldName)))
}

}

t.ChunkAndSave()
if err := t.ChunkAndSave(); err != nil {
// TODO: collect error counters?
sybil.Debug("INGEST RECORD ISSUE:", err)
}
}

return nil
}
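Each CSV value is tried as a number first and falls back to a string, so a hypothetical stdin of

	status,path
	200,/x

ingests as AddIntField("status", 200) and AddStrField("path", "/x").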

func jsonQuery(obj *interface{}, path []string) []interface{} {
@@ -165,10 +176,10 @@ func jsonQuery(obj *interface{}, path []string) []interface{} {
return nil
}

func importJSONRecords() {
func importJSONRecords(jsonPath string) error {
t := sybil.GetTable(sybil.FLAGS.TABLE)

path := strings.Split(JSON_PATH, ".")
path := strings.Split(jsonPath, ".")
sybil.Debug("PATH IS", path)

dec := json.NewDecoder(os.Stdin)
@@ -188,21 +199,29 @@ func importJSONRecords(jsonPath string) error {
records := jsonQuery(&decoded, path)
decoded = nil

for _, ing := range records {
for i, ing := range records {
r := t.NewRecord()
var err error
switch dict := ing.(type) {
case map[string]interface{}:
ndict := Dictionary(dict)
ingestDictionary(r, &ndict, "")
err = ingestDictionary(r, &ndict, "")
case Dictionary:
ingestDictionary(r, &dict, "")

err = ingestDictionary(r, &dict, "")
}
if err != nil {
// TODO: collect error counters?
sybil.Debug("INGEST RECORD ISSUE:", errors.Wrap(err, fmt.Sprintf("issue with record %v", i)))
}
if err := t.ChunkAndSave(); err != nil {
// TODO: collect error counters?
sybil.Debug("INGEST RECORD ISSUE:", err)
}
t.ChunkAndSave()
}

}

return nil
}

var INT_CAST = make(map[string]bool)
@@ -218,51 +237,55 @@ func RunIngestCmdLine() {
flag.BoolVar(&sybil.FLAGS.SKIP_COMPACT, "skip-compact", false, "skip auto compaction during ingest")

flag.Parse()
if err := runIngestCmdLine(&sybil.FLAGS, *ingestfile, *fInts, *fCsv, *fExcludes, *fJSONPath, *fReopen); err != nil {
fmt.Fprintln(os.Stderr, errors.Wrap(err, "ingest"))
os.Exit(1)
}
}

digestfile := *ingestfile
func runIngestCmdLine(flags *sybil.FlagDefs, digestFile string, ints string, csv bool, excludes string, jsonPath string, filePath string) error {

if sybil.FLAGS.TABLE == "" {
if flags.TABLE == "" {
flag.PrintDefaults()
return
return sybil.ErrMissingTable
}

JSON_PATH = *fJSONPath
if filePath != "" {

if *fReopen != "" {

infile, err := os.OpenFile(*fReopen, syscall.O_RDONLY|syscall.O_CREAT, 0666)
infile, err := os.OpenFile(filePath, syscall.O_RDONLY|syscall.O_CREAT, 0666)
if err != nil {
sybil.Error("ERROR OPENING INFILE", err)
return errors.Wrap(err, "error opening infile")
}

os.Stdin = infile

}

if sybil.FLAGS.PROFILE {
if flags.PROFILE {
profile := sybil.RUN_PROFILER()
defer profile.Start().Stop()
}

for _, v := range strings.Split(*fInts, ",") {
for _, v := range strings.Split(ints, ",") {
INT_CAST[v] = true
}
for _, v := range strings.Split(*fExcludes, ",") {
for _, v := range strings.Split(excludes, ",") {
EXCLUDES[v] = true
}

for k := range EXCLUDES {
sybil.Debug("EXCLUDING COLUMN", k)
}

t := sybil.GetTable(sybil.FLAGS.TABLE)
t := sybil.GetTable(flags.TABLE)

// We have TABLE_INFO_GRABS tries to load table info, just in case the lock
// is held by someone else
var loadedTable = false
var loadErr error
for i := 0; i < TABLE_INFO_GRABS; i++ {
loaded := t.LoadTableInfo()
if loaded || !t.HasFlagFile() {
loadErr = t.LoadTableInfo()
if loadErr == nil || !t.HasFlagFile() {
loadedTable = true
break
}
@@ -272,15 +295,21 @@ func RunIngestCmdLine() {
if !loadedTable {
if t.HasFlagFile() {
sybil.Warn("INGESTOR COULDNT READ TABLE INFO, LOSING SAMPLES")
return
if loadErr == nil {
loadErr = fmt.Errorf("unknown (nil) error")
}
return errors.Wrap(loadErr, "issue loading existing table")
}
}

if !*fCsv {
importJSONRecords()
var err error
if !csv {
err = importJSONRecords(jsonPath)
} else {
importCsvRecords()
err = importCsvRecords()
}

t.IngestRecords(digestfile)
if err != nil {
return err
}
return t.IngestRecords(digestFile)
}