diff --git a/src/cmd/cmd_aggregate.go b/src/cmd/cmd_aggregate.go
index 63b858f1..dddfc9e2 100644
--- a/src/cmd/cmd_aggregate.go
+++ b/src/cmd/cmd_aggregate.go
@@ -2,6 +2,8 @@ package cmd
 
 import (
 	"flag"
+	"fmt"
+	"os"
 
 	"github.com/logv/sybil/src/sybil"
 )
@@ -17,7 +19,10 @@ func RunAggregateCmdLine() {
 	sybil.FLAGS.DEBUG = true
 	sybil.Debug("AGGREGATING")
 
-	sybil.DecodeFlags()
+	if err := sybil.DecodeFlags(); err != nil {
+		fmt.Fprintln(os.Stderr, "aggregate: failed to decode flags:", err)
+		os.Exit(1)
+	}
 	sybil.FLAGS.PRINT = true
 	sybil.FLAGS.ENCODE_RESULTS = false
 	sybil.Debug("AGGREGATING DIRS", dirs)
diff --git a/src/cmd/cmd_digest.go b/src/cmd/cmd_digest.go
index 9609dd2e..eed9262d 100644
--- a/src/cmd/cmd_digest.go
+++ b/src/cmd/cmd_digest.go
@@ -2,26 +2,35 @@ package cmd
 
 import (
 	"flag"
+	"fmt"
+	"os"
 
 	"github.com/logv/sybil/src/sybil"
+	"github.com/pkg/errors"
 )
 
 func RunDigestCmdLine() {
 	flag.Parse()
 
-	if sybil.FLAGS.TABLE == "" {
+	if err := runDigestCmdLine(&sybil.FLAGS); err != nil {
+		fmt.Fprintln(os.Stderr, errors.Wrap(err, "digest"))
+		os.Exit(1)
+	}
+}
+
+func runDigestCmdLine(flags *sybil.FlagDefs) error {
+	if flags.TABLE == "" {
 		flag.PrintDefaults()
-		return
+		return sybil.ErrMissingTable
 	}
 
-	if sybil.FLAGS.PROFILE {
+	if flags.PROFILE {
 		profile := sybil.RUN_PROFILER()
 		defer profile.Start().Stop()
 	}
 
-	t := sybil.GetTable(sybil.FLAGS.TABLE)
-	if !t.LoadTableInfo() {
-		sybil.Warn("Couldn't read table info, exiting early")
-		return
+	t := sybil.GetTable(flags.TABLE)
+	if err := t.LoadTableInfo(); err != nil {
+		return err
 	}
-	t.DigestRecords()
+	return t.DigestRecords()
 }
diff --git a/src/cmd/cmd_index.go b/src/cmd/cmd_index.go
index f547f951..9889d4e5 100644
--- a/src/cmd/cmd_index.go
+++ b/src/cmd/cmd_index.go
@@ -2,36 +2,51 @@ package cmd
 
 import (
 	"flag"
+	"fmt"
+	"os"
 	"strings"
 
 	"github.com/logv/sybil/src/sybil"
+	"github.com/pkg/errors"
 )
 
 func RunIndexCmdLine() {
 	var fInts = flag.String("int", "", "Integer values to index")
 	flag.Parse()
-	if sybil.FLAGS.TABLE == "" {
-		flag.PrintDefaults()
-		return
+	var ints []string
+	if *fInts != "" {
+		ints = strings.Split(*fInts, sybil.FLAGS.FIELD_SEPARATOR)
 	}
+	if err := runIndexCmdLine(&sybil.FLAGS, ints); err != nil {
+		fmt.Fprintln(os.Stderr, errors.Wrap(err, "index"))
+		os.Exit(1)
+	}
+}
 
-	var ints []string
-	if *fInts != "" {
-		ints = strings.Split(*fInts, sybil.FLAGS.FIELD_SEPARATOR)
+func runIndexCmdLine(flags *sybil.FlagDefs, ints []string) error {
+	if flags.TABLE == "" {
+		flag.PrintDefaults()
+		return sybil.ErrMissingTable
 	}
 
 	sybil.FLAGS.UPDATE_TABLE_INFO = true
 
 	t := sybil.GetTable(sybil.FLAGS.TABLE)
 
-	t.LoadRecords(nil)
-	t.SaveTableInfo("info")
+	if _, err := t.LoadRecords(nil); err != nil {
+		return err
+	}
+	if err := t.SaveTableInfo("info"); err != nil {
+		return err
+	}
 
 	sybil.FLAGS.WRITE_BLOCK_INFO = true
 
 	loadSpec := t.NewLoadSpec()
 	for _, v := range ints {
-		loadSpec.Int(v)
+		err := loadSpec.Int(v)
+		if err != nil {
+			return err
+		}
+	}
+	if _, err := t.LoadRecords(&loadSpec); err != nil {
+		return err
 	}
-	t.LoadRecords(&loadSpec)
-	t.SaveTableInfo("info")
+	return t.SaveTableInfo("info")
 }
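The three commands above all adopt the same shape: the exported RunXCmdLine entry point keeps flag parsing and the process exit, while an unexported runXCmdLine(flags *sybil.FlagDefs, ...) error carries the logic and propagates failures. A minimal sketch of the pattern (the "foo" names are hypothetical):

	func RunFooCmdLine() {
		flag.Parse()
		if err := runFooCmdLine(&sybil.FLAGS); err != nil {
			fmt.Fprintln(os.Stderr, errors.Wrap(err, "foo"))
			os.Exit(1)
		}
	}

	func runFooCmdLine(flags *sybil.FlagDefs) error {
		if flags.TABLE == "" {
			flag.PrintDefaults()
			return sybil.ErrMissingTable
		}
		// ... testable body: no os.Exit and no log.Fatalln here ...
		return nil
	}

Keeping os.Exit out of the unexported half is what makes the body unit-testable.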
diff --git a/src/cmd/cmd_ingest.go b/src/cmd/cmd_ingest.go
index 5ebc9c02..a2854cb7 100644
--- a/src/cmd/cmd_ingest.go
+++ b/src/cmd/cmd_ingest.go
@@ -13,16 +13,15 @@ import (
 	"time"
 
 	"github.com/logv/sybil/src/sybil"
+	"github.com/pkg/errors"
 )
 
 type Dictionary map[string]interface{}
 
-var JSON_PATH string
-
 // how many times we try to grab table info when ingesting
 var TABLE_INFO_GRABS = 10
 
-func ingestDictionary(r *sybil.Record, recordmap *Dictionary, prefix string) {
+func ingestDictionary(r *sybil.Record, recordmap *Dictionary, prefix string) error {
 	for k, v := range *recordmap {
 		keyName := fmt.Sprint(prefix, k)
 		_, ok := EXCLUDES[keyName]
@@ -31,25 +30,26 @@ func ingestDictionary(r *sybil.Record, recordmap *Dictionary, prefix string) error {
 		}
 
 		prefixName := fmt.Sprint(keyName, "_")
+		var err error
 		switch iv := v.(type) {
 		case string:
 			if INT_CAST[keyName] {
-				val, err := strconv.ParseInt(iv, 10, 64)
-				if err == nil {
-					r.AddIntField(keyName, int64(val))
+				val, cerr := strconv.ParseInt(iv, 10, 64)
+				if cerr == nil {
+					err = r.AddIntField(keyName, int64(val))
 				}
 			} else {
-				r.AddStrField(keyName, iv)
+				err = r.AddStrField(keyName, iv)
 			}
 		case int64:
-			r.AddIntField(keyName, int64(iv))
+			err = r.AddIntField(keyName, int64(iv))
 		case float64:
-			r.AddIntField(keyName, int64(iv))
+			err = r.AddIntField(keyName, int64(iv))
 		// nested fields
 		case map[string]interface{}:
 			d := Dictionary(iv)
-			ingestDictionary(r, &d, prefixName)
+			err = ingestDictionary(r, &d, prefixName)
 		// This is a set field
 		case []interface{}:
 			keyStrs := make([]string, 0)
@@ -64,17 +64,22 @@ func ingestDictionary(r *sybil.Record, recordmap *Dictionary, prefix string) error {
 				}
 			}
 
-			r.AddSetField(keyName, keyStrs)
+			err = r.AddSetField(keyName, keyStrs)
 		case nil:
 		default:
 			sybil.Debug(fmt.Sprintf("TYPE %T IS UNKNOWN FOR FIELD", iv), keyName)
 		}
+		if err != nil {
+			// TODO: collect error counters?
+			sybil.Debug("INGEST RECORD ISSUE:", errors.Wrap(err, fmt.Sprintf("issue with field %v", keyName)))
+		}
 	}
+	return nil
 }
 
 var IMPORTED_COUNT = 0
 
-func importCsvRecords() {
+func importCsvRecords() error {
 	// For importing CSV records, we need to validate the headers, then we just
 	// read in and fill out record fields!
 	scanner := csv.NewReader(os.Stdin)
@@ -82,7 +87,7 @@ func importCsvRecords() error {
 	headerFields, err := scanner.Read()
 	if err == nil {
 		sybil.Debug("HEADER FIELDS FOR CSV ARE", headerFields)
 	} else {
-		sybil.Error("ERROR READING CSV HEADER", err)
+		return errors.Wrap(err, "error reading csv header")
 	}
 
 	t := sybil.GetTable(sybil.FLAGS.TABLE)
@@ -110,18 +115,24 @@ func importCsvRecords() error {
 				continue
 			}
 
-			val, err := strconv.ParseFloat(v, 64)
-			if err == nil {
-				r.AddIntField(fieldName, int64(val))
+			var err error
+			val, cerr := strconv.ParseFloat(v, 64)
+			if cerr == nil {
+				err = r.AddIntField(fieldName, int64(val))
 			} else {
-				r.AddStrField(fieldName, v)
+				err = r.AddStrField(fieldName, v)
 			}
+			if err != nil {
+				sybil.Debug("INGEST RECORD ISSUE:", errors.Wrap(err, fmt.Sprintf("issue loading %v", fieldName)))
+			}
 		}
 
-		t.ChunkAndSave()
+		if err := t.ChunkAndSave(); err != nil {
+			// TODO: collect error counters?
+			sybil.Debug("INGEST RECORD ISSUE:", err)
+		}
 	}
-
+	return nil
 }
 
@@ -165,10 +176,10 @@ func jsonQuery(obj *interface{}, path []string) []interface{} {
 	return nil
 }
 
-func importJSONRecords() {
+func importJSONRecords(jsonPath string) error {
 	t := sybil.GetTable(sybil.FLAGS.TABLE)
 
-	path := strings.Split(JSON_PATH, ".")
+	path := strings.Split(jsonPath, ".")
 	sybil.Debug("PATH IS", path)
 
 	dec := json.NewDecoder(os.Stdin)
@@ -188,21 +199,29 @@ func importJSONRecords(jsonPath string) error {
 		records := jsonQuery(&decoded, path)
 		decoded = nil
 
-		for _, ing := range records {
+		for i, ing := range records {
 			r := t.NewRecord()
+			var err error
 			switch dict := ing.(type) {
 			case map[string]interface{}:
 				ndict := Dictionary(dict)
-				ingestDictionary(r, &ndict, "")
+				err = ingestDictionary(r, &ndict, "")
 			case Dictionary:
-				ingestDictionary(r, &dict, "")
-
+				err = ingestDictionary(r, &dict, "")
+			}
+			if err != nil {
+				// TODO: collect error counters?
+				sybil.Debug("INGEST RECORD ISSUE:", errors.Wrap(err, fmt.Sprintf("issue with record %v", i)))
+			}
+			if err := t.ChunkAndSave(); err != nil {
+				// TODO: collect error counters?
+				sybil.Debug("INGEST RECORD ISSUE:", err)
 			}
-			t.ChunkAndSave()
 		}
 	}
+	return nil
 }
 
 var INT_CAST = make(map[string]bool)
@@ -218,36 +237,39 @@ func RunIngestCmdLine() {
 	flag.BoolVar(&sybil.FLAGS.SKIP_COMPACT, "skip-compact", false, "skip auto compaction during ingest")
 
 	flag.Parse()
+	if err := runIngestCmdLine(&sybil.FLAGS, *ingestfile, *fInts, *fCsv, *fExcludes, *fJSONPath, *fReopen); err != nil {
+		fmt.Fprintln(os.Stderr, errors.Wrap(err, "ingest"))
+		os.Exit(1)
+	}
+}
 
-	digestfile := *ingestfile
+func runIngestCmdLine(flags *sybil.FlagDefs, digestFile string, ints string, useCsv bool, excludes string, jsonPath string, filePath string) error {
-	if sybil.FLAGS.TABLE == "" {
+	if flags.TABLE == "" {
 		flag.PrintDefaults()
-		return
+		return sybil.ErrMissingTable
 	}
 
-	JSON_PATH = *fJSONPath
+	if filePath != "" {
 
-	if *fReopen != "" {
-
-		infile, err := os.OpenFile(*fReopen, syscall.O_RDONLY|syscall.O_CREAT, 0666)
+		infile, err := os.OpenFile(filePath, syscall.O_RDONLY|syscall.O_CREAT, 0666)
 		if err != nil {
-			sybil.Error("ERROR OPENING INFILE", err)
+			return errors.Wrap(err, "error opening infile")
 		}
 
 		os.Stdin = infile
 	}
 
-	if sybil.FLAGS.PROFILE {
+	if flags.PROFILE {
 		profile := sybil.RUN_PROFILER()
 		defer profile.Start().Stop()
 	}
 
-	for _, v := range strings.Split(*fInts, ",") {
+	for _, v := range strings.Split(ints, ",") {
 		INT_CAST[v] = true
 	}
-	for _, v := range strings.Split(*fExcludes, ",") {
+	for _, v := range strings.Split(excludes, ",") {
 		EXCLUDES[v] = true
 	}
@@ -255,14 +277,15 @@ func runIngestCmdLine(...) error {
 	for k := range EXCLUDES {
 		sybil.Debug("EXCLUDING COLUMN", k)
 	}
 
-	t := sybil.GetTable(sybil.FLAGS.TABLE)
+	t := sybil.GetTable(flags.TABLE)
 
 	// We have 5 tries to load table info, just in case the lock is held by
 	// someone else
 	var loadedTable = false
+	var loadErr error
 	for i := 0; i < TABLE_INFO_GRABS; i++ {
-		loaded := t.LoadTableInfo()
-		if loaded || !t.HasFlagFile() {
+		loadErr = t.LoadTableInfo()
+		if loadErr == nil || !t.HasFlagFile() {
 			loadedTable = true
 			break
 		}
@@ -272,15 +295,21 @@ func runIngestCmdLine(...) error {
 	if !loadedTable {
 		if t.HasFlagFile() {
 			sybil.Warn("INGESTOR COULDNT READ TABLE INFO, LOSING SAMPLES")
-			return
+			if loadErr == nil {
+				loadErr = fmt.Errorf("unknown (nil) error")
+			}
+			return errors.Wrap(loadErr, "issue loading existing table")
 		}
 	}
 
-	if !*fCsv {
-		importJSONRecords()
+	var err error
+	if !useCsv {
+		err = importJSONRecords(jsonPath)
 	} else {
-		importCsvRecords()
+		err = importCsvRecords()
 	}
-
-	t.IngestRecords(digestfile)
+	if err != nil {
+		return err
+	}
+	return t.IngestRecords(digestFile)
 }
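ingestDictionary flattens nested JSON objects by joining key segments with "_", coerces float64 values into int64 columns, and turns arrays into set fields. A hedged illustration of the mapping, with an invented input and assuming a table handle t:

	r := t.NewRecord()
	d := Dictionary{"user": map[string]interface{}{"age": 21.5}, "name": "x"}
	// yields int column "user_age" = 21 (float64 truncated via int64(iv))
	// and str column "name" = "x"
	_ = ingestDictionary(r, &d, "")

Per-field failures are only logged via sybil.Debug for now (see the TODO about error counters), so one bad field no longer aborts the whole ingest.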
diff --git a/src/cmd/cmd_inspect.go b/src/cmd/cmd_inspect.go
index c74f22c5..9e4d3df7 100644
--- a/src/cmd/cmd_inspect.go
+++ b/src/cmd/cmd_inspect.go
@@ -2,82 +2,86 @@ package cmd
 
 import (
 	"flag"
+	"fmt"
+	"os"
 	"strconv"
 
 	"github.com/logv/sybil/src/sybil"
+	"github.com/pkg/errors"
 )
 
-func decodeTableInfo(digestFile *string) bool {
-	dec := sybil.GetFileDecoder(*digestFile)
+func decodeTableInfo(digestFile *string) error {
+	dec, err := sybil.GetFileDecoder(*digestFile)
+	if err != nil {
+		return err
+	}
 
 	savedTable := sybil.Table{}
-	err := dec.Decode(&savedTable)
+	if err = dec.Decode(&savedTable); err != nil {
+		return err
+	}
 
-	if err != nil || len(savedTable.KeyTable) == 0 {
-		return false
+	if len(savedTable.KeyTable) == 0 {
+		return fmt.Errorf("empty keytable")
 	}
 
 	sybil.Print("TABLE INFO", savedTable)
 
-	return true
-
+	return nil
 }
 
-func decodeInfoCol(digestFile *string) bool {
-	dec := sybil.GetFileDecoder(*digestFile)
+func decodeInfoCol(digestFile *string) error {
+	dec, err := sybil.GetFileDecoder(*digestFile)
+	if err != nil {
+		return err
+	}
 
 	info := sybil.SavedColumnInfo{}
-	err := dec.Decode(&info)
-
-	if err != nil {
-		sybil.Print("ERROR", err)
-		return false
+	if err := dec.Decode(&info); err != nil {
+		return err
 	}
 
 	sybil.Print("INFO COL", info)
 
-	return true
-
+	return nil
 }
 
-func decodeIntCol(digestFile *string) bool {
-	dec := sybil.GetFileDecoder(*digestFile)
+func decodeIntCol(digestFile *string) error {
+	dec, err := sybil.GetFileDecoder(*digestFile)
+	if err != nil {
+		return err
+	}
 
 	info := sybil.SavedIntColumn{}
-	err := dec.Decode(&info)
-
-	if err != nil {
-		sybil.Print("ERROR", err)
-		return false
+	if err := dec.Decode(&info); err != nil {
+		return err
 	}
 
 	sybil.Print("INT COL", info)
 
-	return true
-
+	return nil
 }
 
-func decodeStrCol(digestFile *string) bool {
-	dec := sybil.GetFileDecoder(*digestFile)
+func decodeStrCol(digestFile *string) error {
+	dec, err := sybil.GetFileDecoder(*digestFile)
+	if err != nil {
+		return err
+	}
 
 	info := sybil.SavedStrColumn{}
-	err := dec.Decode(&info)
+	if err := dec.Decode(&info); err != nil {
+		return err
+	}
 
 	bins := make([]string, 0)
 	for _, bin := range info.Bins {
 		bins = append(bins, strconv.FormatInt(int64(len(bin.Records)), 10))
 	}
 
-	if err != nil {
-		sybil.Print("ERROR", err)
-		return false
-	}
-
 	sybil.Print("STR COL", info)
 	sybil.Print("BINS ARE", bins)
 
-	return true
-
+	return nil
 }
 
 // TODO: make a list of potential types that can be decoded into
@@ -85,23 +89,44 @@ func RunInspectCmdLine() {
 	digestFile := flag.String("file", "", "Name of file to inspect")
 	flag.Parse()
 
-	if *digestFile == "" || digestFile == nil {
+	if *digestFile == "" {
 		sybil.Print("Please specify a file to inspect with the -file flag")
 		return
 	}
+	if err := runInspectCmdLine(digestFile); err != nil {
+		fmt.Fprintln(os.Stderr, errors.Wrap(err, "inspect"))
+		os.Exit(1)
+	}
+}
 
-	if decodeTableInfo(digestFile) {
-		return
+func runInspectCmdLine(digestFile *string) error {
+	err := decodeTableInfo(digestFile)
+	if err == nil {
+		return nil
 	}
+	sybil.Debug("inspect encountered:", err)
 
-	if decodeInfoCol(digestFile) {
-		return
+	err = decodeInfoCol(digestFile)
+	if err == nil {
+		return nil
 	}
+	sybil.Debug("inspect encountered:", err)
 
-	if decodeStrCol(digestFile) {
-		return
+	err = decodeStrCol(digestFile)
+	if err == nil {
+		return nil
 	}
+	sybil.Debug("inspect encountered:", err)
 
-	if decodeIntCol(digestFile) {
-		return
+	err = decodeIntCol(digestFile)
+	if err == nil {
+		return nil
 	}
+	sybil.Debug("inspect encountered:", err)
+
+	return fmt.Errorf("no decoding succeeded")
 }
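runInspectCmdLine works by elimination: it tries the decoders from most to least specific and returns on the first clean decode. The chain could equally be table-driven; a sketch under the same names:

	for _, decode := range []func(*string) error{
		decodeTableInfo, decodeInfoCol, decodeStrCol, decodeIntCol,
	} {
		if err := decode(digestFile); err == nil {
			return nil
		} else {
			sybil.Debug("inspect encountered:", err)
		}
	}
	return fmt.Errorf("no decoding succeeded")

Note that gob can decode one struct's stream into another with overlapping field shapes without erroring, which is why the empty-KeyTable check inside decodeTableInfo matters.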
diff --git a/src/cmd/cmd_query.go b/src/cmd/cmd_query.go
index e92d8efa..90eff390 100644
--- a/src/cmd/cmd_query.go
+++ b/src/cmd/cmd_query.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/logv/sybil/src/sybil"
+	"github.com/pkg/errors"
 )
 
 var MAX_RECORDS_NO_GC = 4 * 1000 * 1000 // 4 million
@@ -74,7 +75,7 @@ func RunQueryCmdLine() {
 	addPrintFlags()
 	flag.Parse()
 	if err := runQueryCmdLine(&sybil.FLAGS); err != nil {
-		fmt.Fprintln(os.Stderr, err)
+		fmt.Fprintln(os.Stderr, errors.Wrap(err, "query"))
 		os.Exit(1)
 	}
 }
@@ -113,7 +114,7 @@ func runQueryCmdLine(flags *sybil.FlagDefs) error {
 	t := sybil.GetTable(table)
 	if t.IsNotExist() {
-		sybil.Error(t.Name, "table can not be loaded or does not exist in", flags.DIR)
+		return fmt.Errorf("table %v does not exist in %v", flags.TABLE, flags.DIR)
 	}
 
 	ints := make([]string, 0)
@@ -147,8 +148,12 @@ func runQueryCmdLine(flags *sybil.FlagDefs) error {
 	// LOAD TABLE INFOS BEFORE WE CREATE OUR FILTERS, SO WE CAN CREATE FILTERS ON
 	// THE RIGHT COLUMN ID
 
-	t.LoadTableInfo()
-	t.LoadRecords(nil)
+	if err := t.LoadTableInfo(); err != nil {
+		return errors.Wrap(err, "load table info failed")
+	}
+	if _, err := t.LoadRecords(nil); err != nil {
+		return errors.Wrap(err, "loading records failed")
+	}
 
 	count := 0
 	for _, block := range t.BlockList {
@@ -194,14 +199,16 @@ func runQueryCmdLine(flags *sybil.FlagDefs) error {
 	for _, v := range t.KeyTable {
 		used[v]++
 		if used[v] > 1 {
-			sybil.Error("THERE IS A SERIOUS KEY TABLE INCONSISTENCY")
-			return nil
+			return sybil.ErrKeyTableInconsistent
 		}
 	}
 
 	loadSpec := t.NewLoadSpec()
 	filterSpec := sybil.FilterSpec{Int: flags.INT_FILTERS, Str: flags.STR_FILTERS, Set: flags.SET_FILTERS}
-	filters := sybil.BuildFilters(t, &loadSpec, filterSpec)
+	filters, err := sybil.BuildFilters(t, &loadSpec, filterSpec)
+	if err != nil {
+		return err
+	}
 
 	replacements := sybil.BuildReplacements(flags.FIELD_SEPARATOR, flags.STR_REPLACE)
 	queryParams := sybil.QueryParams{
@@ -229,28 +236,37 @@ func runQueryCmdLine(flags *sybil.FlagDefs) error {
 	allGroups := append(groups, distinct...)
 	for _, v := range allGroups {
+		var err error
 		switch t.GetColumnType(v) {
 		case sybil.STR_VAL:
-			loadSpec.Str(v)
+			err = loadSpec.Str(v)
 		case sybil.INT_VAL:
-			loadSpec.Int(v)
+			err = loadSpec.Int(v)
 		default:
 			t.PrintColInfo(printSpec)
 			fmt.Println("")
-			sybil.Error("Unknown column type for column: ", v, t.GetColumnType(v))
+			err = fmt.Errorf("unknown column type for column: %v %v", v, t.GetColumnType(v))
+		}
+		if err != nil {
+			return err
 		}
-
 	}
 
 	for _, v := range strs {
-		loadSpec.Str(v)
+		if err := loadSpec.Str(v); err != nil {
+			return err
+		}
 	}
 	for _, v := range ints {
-		loadSpec.Int(v)
+		if err := loadSpec.Int(v); err != nil {
+			return err
+		}
 	}
 
 	if flags.SORT != "" {
 		if flags.SORT != sybil.SORT_COUNT {
-			loadSpec.Int(flags.SORT)
+			if err := loadSpec.Int(flags.SORT); err != nil {
+				return err
+			}
 		}
 		querySpec.OrderBy = flags.SORT
 	} else {
@@ -259,7 +275,9 @@ func runQueryCmdLine(flags *sybil.FlagDefs) error {
 
 	if flags.PRUNE_BY != "" {
 		if flags.PRUNE_BY != sybil.SORT_COUNT {
-			loadSpec.Int(flags.PRUNE_BY)
+			if err := loadSpec.Int(flags.PRUNE_BY); err != nil {
+				return err
+			}
 		}
 		querySpec.PruneBy = flags.PRUNE_BY
 	} else {
@@ -270,11 +288,15 @@ func runQueryCmdLine(flags *sybil.FlagDefs) error {
 		// TODO: infer the TimeBucket size
 		querySpec.TimeBucket = flags.TIME_BUCKET
 		sybil.Debug("USING TIME BUCKET", querySpec.TimeBucket, "SECONDS")
-		loadSpec.Int(flags.TIME_COL)
+		if err := loadSpec.Int(flags.TIME_COL); err != nil {
+			return err
+		}
 	}
 
 	if flags.WEIGHT_COL != "" {
-		loadSpec.Int(flags.WEIGHT_COL)
+		if err := loadSpec.Int(flags.WEIGHT_COL); err != nil {
+			return err
+		}
 	}
 
 	querySpec.Limit = flags.LIMIT
@@ -287,7 +309,9 @@ func runQueryCmdLine(flags *sybil.FlagDefs) error {
 		loadSpec.SkipDeleteBlocksAfterQuery = true
 		querySpec.Samples = true
 
-		t.LoadAndQueryRecords(&loadSpec, &querySpec)
+		if _, err := t.LoadAndQueryRecords(&loadSpec, &querySpec); err != nil {
+			return err
+		}
 
 		t.PrintSamples(printSpec)
 
@@ -310,7 +334,9 @@ func runQueryCmdLine(flags *sybil.FlagDefs) error {
 	start := time.Now()
 	if flags.LOAD_AND_QUERY {
-		t.LoadAndQueryRecords(&loadSpec, &querySpec)
+		if _, err := t.LoadAndQueryRecords(&loadSpec, &querySpec); err != nil {
+			return err
+		}
 
 		end := time.Now()
 		sybil.Debug("LOAD AND QUERY RECORDS TOOK", end.Sub(start))
@@ -327,7 +353,9 @@ func runQueryCmdLine(flags *sybil.FlagDefs) error {
 		t := sybil.GetTable(table)
 
 		flags.LOAD_AND_QUERY = false
-		t.LoadRecords(nil)
+		if _, err := t.LoadRecords(nil); err != nil {
+			return err
+		}
 
 		t.PrintColInfo(printSpec)
 	}
diff --git a/src/cmd/cmd_rebuild.go b/src/cmd/cmd_rebuild.go
index 3e8ceda6..0394e067 100644
--- a/src/cmd/cmd_rebuild.go
+++ b/src/cmd/cmd_rebuild.go
@@ -2,44 +2,56 @@ package cmd
 
 import (
 	"flag"
+	"fmt"
+	"os"
 
 	"github.com/logv/sybil/src/sybil"
+	"github.com/pkg/errors"
 )
 
 func RunRebuildCmdLine() {
 	REPLACE_INFO := flag.Bool("replace", false, "Replace broken info.db if it exists")
 	FORCE_UPDATE := flag.Bool("force", false, "Force re-calculation of info.db, even if it exists")
 	flag.Parse()
+	if err := runRebuildCmdLine(&sybil.FLAGS, *REPLACE_INFO, *FORCE_UPDATE); err != nil {
+		fmt.Fprintln(os.Stderr, errors.Wrap(err, "rebuild"))
+		os.Exit(1)
+	}
+}
 
-	if sybil.FLAGS.TABLE == "" {
+func runRebuildCmdLine(flags *sybil.FlagDefs, replaceInfo bool, forceUpdate bool) error {
+	if flags.TABLE == "" {
 		flag.PrintDefaults()
-		return
+		return sybil.ErrMissingTable
 	}
 
-	if sybil.FLAGS.PROFILE {
+	if flags.PROFILE {
 		profile := sybil.RUN_PROFILER()
 		defer profile.Start().Stop()
 	}
 
-	t := sybil.GetTable(sybil.FLAGS.TABLE)
+	t := sybil.GetTable(flags.TABLE)
 
-	loaded := t.LoadTableInfo() && !*FORCE_UPDATE
+	// a LoadTableInfo failure is expected here: a broken info.db is the very
+	// reason to rebuild, so we fall through instead of returning the error
+	loaded := t.LoadTableInfo() == nil && !forceUpdate
 	if loaded {
 		sybil.Print("TABLE INFO ALREADY EXISTS, NOTHING TO REBUILD!")
-		return
+		return nil
 	}
 
 	t.DeduceTableInfoFromBlocks()
 
 	// TODO: prompt to see if this table info looks good and then write it to
 	// original info.db
-	if *REPLACE_INFO {
+	if replaceInfo {
 		sybil.Print("REPLACING info.db WITH DATA COMPUTED ABOVE")
 		lock := sybil.Lock{Table: t, Name: "info"}
-		lock.ForceDeleteFile()
-		t.SaveTableInfo("info")
+		if err := lock.ForceDeleteFile(); err != nil {
+			return err
+		}
+		return t.SaveTableInfo("info")
 	} else {
 		sybil.Print("SAVING TO temp_info.db")
-		t.SaveTableInfo("temp_info")
+		return t.SaveTableInfo("temp_info")
 	}
 }
diff --git a/src/cmd/cmd_trim.go b/src/cmd/cmd_trim.go
index c14b3d43..01ce4dc5 100644
--- a/src/cmd/cmd_trim.go
+++ b/src/cmd/cmd_trim.go
@@ -6,22 +6,23 @@ import (
 	"os"
 
 	"github.com/logv/sybil/src/sybil"
+	"github.com/pkg/errors"
 )
 
-func askConfirmation() bool {
+func askConfirmation() (bool, error) {
 
 	var response string
 	_, err := fmt.Scanln(&response)
 	if err != nil {
-		sybil.Error(err)
+		return false, err
 	}
 
 	if response == "Y" {
-		return true
+		return true, nil
 	}
 
 	if response == "N" {
-		return false
+		return false, nil
 	}
 
 	fmt.Println("Y or N only")
@@ -37,32 +38,43 @@ func RunTrimCmdLine() {
 	flag.StringVar(&sybil.FLAGS.TIME_COL, "time-col", "", "which column to treat as a timestamp [REQUIRED]")
 	flag.Parse()
+	err := runTrimCmdLine(&sybil.FLAGS, *MB_LIMIT, *DELETE_BEFORE, *REALLY, *DELETE)
+	if err != nil {
+		fmt.Fprintln(os.Stderr, errors.Wrap(err, "trim"))
+		os.Exit(1)
+	}
+}
 
-	if sybil.FLAGS.TABLE == "" || sybil.FLAGS.TIME_COL == "" {
+// skipPrompt mirrors the -really flag: when set, blocks are deleted without
+// the interactive Y/N confirmation
+func runTrimCmdLine(flags *sybil.FlagDefs, mbLimit int, deleteBefore int, skipPrompt bool, doDelete bool) error {
+	if flags.TABLE == "" || flags.TIME_COL == "" {
 		flag.PrintDefaults()
-		return
+		return sybil.ErrMissingTable
 	}
 
-	if sybil.FLAGS.PROFILE {
+	if flags.PROFILE {
 		profile := sybil.RUN_PROFILER()
 		defer profile.Start().Stop()
 	}
 
-	t := sybil.GetTable(sybil.FLAGS.TABLE)
-	if !t.LoadTableInfo() {
-		sybil.Warn("Couldn't read table info, exiting early")
-		return
+	t := sybil.GetTable(flags.TABLE)
+	if err := t.LoadTableInfo(); err != nil {
+		return err
 	}
 
 	loadSpec := t.NewLoadSpec()
 	loadSpec.SkipDeleteBlocksAfterQuery = true
 
-	loadSpec.Int(sybil.FLAGS.TIME_COL)
+	if err := loadSpec.Int(flags.TIME_COL); err != nil {
+		return err
+	}
 
 	trimSpec := sybil.TrimSpec{}
-	trimSpec.DeleteBefore = int64(*DELETE_BEFORE)
-	trimSpec.MBLimit = int64(*MB_LIMIT)
+	trimSpec.DeleteBefore = int64(deleteBefore)
+	trimSpec.MBLimit = int64(mbLimit)
 
-	toTrim := t.TrimTable(&trimSpec)
+	toTrim, err := t.TrimTable(&trimSpec)
+	if err != nil {
+		return err
+	}
 
 	sybil.Debug("FOUND", len(toTrim), "CANDIDATE BLOCKS FOR TRIMMING")
 	if len(toTrim) > 0 {
@@ -71,13 +83,13 @@ func runTrimCmdLine(...) error {
 		}
 	}
 
-	if *DELETE {
-		if !*REALLY {
+	if doDelete {
+		if !skipPrompt {
 			// TODO: prompt for deletion
 			fmt.Println("DELETE THE ABOVE BLOCKS? (Y/N)")
-			if !askConfirmation() {
+			if ok, err := askConfirmation(); !ok {
 				sybil.Debug("ABORTING")
-				return
+				return err
 			}
 		}
 
@@ -86,11 +98,14 @@ func runTrimCmdLine(...) error {
 		for _, b := range toTrim {
 			sybil.Debug("DELETING", b.Name)
 			if len(b.Name) > 5 {
-				os.RemoveAll(b.Name)
+				if err := os.RemoveAll(b.Name); err != nil {
+					return errors.Wrap(err, fmt.Sprintf("removing '%v' failed", b.Name))
+				}
 			} else {
 				sybil.Debug("REFUSING TO DELETE", b.Name)
 			}
 		}
 	}
-}
+
+	return nil
 }
errors.Wrap(err, "encode") } action := "SERIALIZED" @@ -294,15 +305,18 @@ func (tb *TableBlock) SaveStrsToColumns(dirname string, sameStrs map[int16]Value Debug(action, "COLUMN BLOCK", colFname, network.Len(), "BYTES", "( PER RECORD", network.Len()/len(tb.RecordList), ")") w, _ := os.Create(colFname) - network.WriteTo(w) + if _, err := network.WriteTo(w); err != nil { + return err + } } + return nil } type SavedIntInfo map[string]*IntInfo type SavedStrInfo map[string]*StrInfo -func (tb *TableBlock) SaveInfoToColumns(dirname string) { +func (tb *TableBlock) SaveInfoToColumns(dirname string) error { records := tb.RecordList // Now to save block info... @@ -338,7 +352,7 @@ func (tb *TableBlock) SaveInfoToColumns(dirname string) { err := enc.Encode(colInfo) if err != nil { - Error("encode:", err) + return errors.Wrap(err, "encode") } length := len(records) @@ -351,7 +365,8 @@ func (tb *TableBlock) SaveInfoToColumns(dirname string) { } w, _ := os.Create(colFname) - network.WriteTo(w) + _, err = network.WriteTo(w) + return err } type SeparatedColumns struct { @@ -412,17 +427,21 @@ func (tb *TableBlock) SeparateRecordsIntoColumns() SeparatedColumns { } -func (tb *TableBlock) SaveToColumns(filename string) bool { +func (tb *TableBlock) SaveToColumns(filename string) error { dirname := filename // Important to set the BLOCK's dirName so we can keep track // of the various block infos tb.Name = dirname - defer tb.table.ReleaseBlockLock(filename) - if !tb.table.GrabBlockLock(filename) { + defer func() { + if err := tb.table.ReleaseBlockLock(filename); err != nil { + Warn("failed to release block lock:", err) + } + }() + if err := tb.table.GrabBlockLock(filename); err != nil { Debug("Can't grab lock to save block", filename) - return false + return err } partialname := fmt.Sprintf("%s.partial", dirname) @@ -434,10 +453,18 @@ func (tb *TableBlock) SaveToColumns(filename string) bool { end := time.Now() Debug("COLLATING BLOCKS TOOK", end.Sub(start)) - tb.SaveIntsToColumns(partialname, separatedColumns.ints) - tb.SaveStrsToColumns(partialname, separatedColumns.strs) - tb.SaveSetsToColumns(partialname, separatedColumns.sets) - tb.SaveInfoToColumns(partialname) + if err := tb.SaveIntsToColumns(partialname, separatedColumns.ints); err != nil { + return err + } + if err := tb.SaveStrsToColumns(partialname, separatedColumns.strs); err != nil { + return err + } + if err := tb.SaveSetsToColumns(partialname, separatedColumns.sets); err != nil { + return err + } + if err := tb.SaveInfoToColumns(partialname); err != nil { + return err + } end = time.Now() Debug("FINISHED BLOCK", partialname, "RELINKING TO", dirname, "TOOK", end.Sub(start)) @@ -448,52 +475,61 @@ func (tb *TableBlock) SaveToColumns(filename string) bool { // For now, we load info.db and check NumRecords inside it to prevent // catastrophics, but we could load everything potentially start = time.Now() - nb := tb.table.LoadBlockFromDir(partialname, nil, false, nil) + nb, err := tb.table.LoadBlockFromDir(partialname, nil, false, nil) + if err != nil { + return err + } end = time.Now() // TODO: if nb == nil || nb.Info.NumRecords != int32(len(tb.RecordList)) { - Error("COULDNT VALIDATE CONSISTENCY FOR RECENTLY SAVED BLOCK!", filename) + return fmt.Errorf("could not validate consistency for recently saved block %v", filename) } if DEBUG_RECORD_CONSISTENCY { - nb = tb.table.LoadBlockFromDir(partialname, nil, true, nil) + nb, err = tb.table.LoadBlockFromDir(partialname, nil, true, nil) + if err != nil { + return err + } if nb == nil || len(nb.RecordList) != 
len(tb.RecordList) { - Error("DEEP VALIDATION OF BLOCK FAILED CONSISTENCY CHECK!", filename) + return fmt.Errorf("deep validation of block failed consistency check %v", filename) } } Debug("VALIDATED NEW BLOCK HAS", nb.Info.NumRecords, "RECORDS, TOOK", end.Sub(start)) - os.RemoveAll(oldblock) - err := RenameAndMod(dirname, oldblock) + if err := os.RemoveAll(oldblock); err != nil { + return err + } + err = RenameAndMod(dirname, oldblock) if err != nil { - Error("ERROR RENAMING BLOCK", dirname, oldblock, err) + return errors.Wrap(err, fmt.Sprintf("error renaming block %v to %v", dirname, oldblock)) } err = RenameAndMod(partialname, dirname) if err != nil { - Error("ERROR RENAMING PARTIAL", partialname, dirname, err) + return errors.Wrap(err, fmt.Sprintf("error renaming partial %v to %v", partialname, dirname)) } if err == nil { - os.RemoveAll(oldblock) + if err := os.RemoveAll(oldblock); err != nil { + return err + } } else { - Error("ERROR SAVING BLOCK", partialname, dirname, err) + return errors.Wrap(err, fmt.Sprintf("error saving block %v to %v", partialname, dirname)) } Debug("RELEASING BLOCK", tb.Name) - return true + return nil } -func (tb *TableBlock) unpackStrCol(dec FileDecoder, info SavedColumnInfo, replacements map[string]StrReplace) { +func (tb *TableBlock) unpackStrCol(dec FileDecoder, info SavedColumnInfo, replacements map[string]StrReplace) error { records := tb.RecordList[:] into := &SavedStrColumn{} err := dec.Decode(into) if err != nil { - Debug("DECODE COL ERR:", err) - return + return err } stringLookup := make([]string, info.NumRecords) @@ -502,7 +538,7 @@ func (tb *TableBlock) unpackStrCol(dec FileDecoder, info SavedColumnInfo, replac if int(colID) >= keyTableLen { Debug("IGNORING STR COLUMN", into.Name, "SINCE ITS NOT IN KEY TABLE IN BLOCK", tb.Name) - return + return nil } col := tb.GetColumnInfo(colID) @@ -565,7 +601,7 @@ func (tb *TableBlock) unpackStrCol(dec FileDecoder, info SavedColumnInfo, replac if DEBUG_RECORD_CONSISTENCY { if record.Populated[colID] != _NO_VAL { - Error("OVERWRITING RECORD VALUE", record, into.Name, colID, bucket.Value) + return fmt.Errorf("overwriting record value: %v %v %v %v", record, into.Name, colID, bucket.Value) } } @@ -587,16 +623,17 @@ func (tb *TableBlock) unpackStrCol(dec FileDecoder, info SavedColumnInfo, replac } } + return nil } -func (tb *TableBlock) unpackSetCol(dec FileDecoder, info SavedColumnInfo) { +func (tb *TableBlock) unpackSetCol(dec FileDecoder, info SavedColumnInfo) error { records := tb.RecordList savedCol := NewSavedSetColumn() into := &savedCol err := dec.Decode(into) if err != nil { - Debug("DECODE COL ERR:", err) + return err } keyTableLen := len(tb.table.KeyTable) @@ -605,7 +642,7 @@ func (tb *TableBlock) unpackSetCol(dec FileDecoder, info SavedColumnInfo) { if int(colID) >= keyTableLen { Debug("IGNORING SET COLUMN", into.Name, "SINCE ITS NOT IN KEY TABLE IN BLOCK", tb.Name) - return + return nil } col := tb.GetColumnInfo(colID) @@ -650,9 +687,10 @@ func (tb *TableBlock) unpackSetCol(dec FileDecoder, info SavedColumnInfo) { records[r].Populated[colID] = SET_VAL } } + return nil } -func (tb *TableBlock) unpackIntCol(dec FileDecoder, info SavedColumnInfo) { +func (tb *TableBlock) unpackIntCol(dec FileDecoder, info SavedColumnInfo) error { records := tb.RecordList[:] into := &SavedIntColumn{} @@ -665,7 +703,7 @@ func (tb *TableBlock) unpackIntCol(dec FileDecoder, info SavedColumnInfo) { colID := tb.table.getKeyID(into.Name) if int(colID) >= keyTableLen { Debug("IGNORING INT COLUMN", into.Name, "SINCE ITS NOT IN 
KEY TABLE IN BLOCK", tb.Name) - return + return nil } isTimeCol := into.Name == FLAGS.TIME_COL @@ -686,7 +724,7 @@ func (tb *TableBlock) unpackIntCol(dec FileDecoder, info SavedColumnInfo) { if DEBUG_RECORD_CONSISTENCY { if records[r].Populated[colID] != _NO_VAL { - Error("OVERWRITING RECORD VALUE", records[r], into.Name, colID, bucket.Value) + return fmt.Errorf("overwriting record value: %v %v %v %v", records[r], into.Name, colID, bucket.Value) } } @@ -727,4 +765,5 @@ func (tb *TableBlock) unpackIntCol(dec FileDecoder, info SavedColumnInfo) { } } + return nil } diff --git a/src/sybil/column_store_test.go b/src/sybil/column_store_test.go index 6bd7935f..635bc75e 100644 --- a/src/sybil/column_store_test.go +++ b/src/sybil/column_store_test.go @@ -28,10 +28,15 @@ func TestTableDigestRowRecords(t *testing.T) { FLAGS.TABLE = tableName // TODO: eliminate global use FLAGS.READ_INGESTION_LOG = true - nt.LoadTableInfo() - nt.LoadRecords(&LoadSpec{ + if err := nt.LoadTableInfo(); err != nil { + t.Error(err) + } + _, err := nt.LoadRecords(&LoadSpec{ SkipDeleteBlocksAfterQuery: true, }) + if err != nil { + t.Fatal(err) + } if len(nt.RowBlock.RecordList) != CHUNK_SIZE*blockCount { t.Error("Row Store didn't read back right number of records", len(nt.RowBlock.RecordList)) @@ -41,12 +46,16 @@ func TestTableDigestRowRecords(t *testing.T) { t.Error("Found other records than rowblock") } - nt.DigestRecords() + if err := nt.DigestRecords(); err != nil { + t.Error(err) + } unloadTestTable(tableName) nt = GetTable(tableName) - nt.LoadRecords(nil) + if _, err := nt.LoadRecords(nil); err != nil { + t.Error(err) + } count := int32(0) for _, b := range nt.BlockList { @@ -83,10 +92,15 @@ func TestColumnStoreFileNames(t *testing.T) { FLAGS.TABLE = tableName // TODO: eliminate global use FLAGS.READ_INGESTION_LOG = true - nt.LoadTableInfo() - nt.LoadRecords(&LoadSpec{ + if err := nt.LoadTableInfo(); err != nil { + t.Error(err) + } + _, err := nt.LoadRecords(&LoadSpec{ SkipDeleteBlocksAfterQuery: true, }) + if err != nil { + t.Fatal(err) + } if len(nt.RowBlock.RecordList) != CHUNK_SIZE*blockCount { t.Error("Row Store didn't read back right number of records", len(nt.RowBlock.RecordList)) @@ -96,12 +110,16 @@ func TestColumnStoreFileNames(t *testing.T) { t.Error("Found other records than rowblock") } - nt.DigestRecords() + if err := nt.DigestRecords(); err != nil { + t.Error(err) + } unloadTestTable(tableName) nt = GetTable(tableName) - nt.LoadRecords(nil) + if _, err := nt.LoadRecords(nil); err != nil { + t.Error(err) + } count := int32(0) @@ -161,10 +179,15 @@ func TestBigIntColumns(t *testing.T) { FLAGS.TABLE = tableName // TODO: eliminate global use FLAGS.READ_INGESTION_LOG = true - nt.LoadTableInfo() - nt.LoadRecords(&LoadSpec{ + if err := nt.LoadTableInfo(); err != nil { + t.Error(err) + } + _, err := nt.LoadRecords(&LoadSpec{ SkipDeleteBlocksAfterQuery: true, }) + if err != nil { + t.Fatal(err) + } if len(nt.RowBlock.RecordList) != CHUNK_SIZE*blockCount { t.Error("Row Store didn't read back right number of records", len(nt.RowBlock.RecordList)) @@ -174,7 +197,9 @@ func TestBigIntColumns(t *testing.T) { t.Error("Found other records than rowblock") } - nt.DigestRecords() + if err := nt.DigestRecords(); err != nil { + t.Error(err) + } unloadTestTable(tableName) @@ -185,7 +210,9 @@ func TestBigIntColumns(t *testing.T) { querySpec := newQuerySpec() querySpec.Samples = true querySpec.Limit = 1000 - nt.LoadAndQueryRecords(&loadSpec, querySpec) + if _, err := nt.LoadAndQueryRecords(&loadSpec, querySpec); err != nil { + 
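All three Save*ToColumns writers share the same gob-to-file tail. One gap the conversion leaves open: os.Create's error is still discarded at each call site (the `w, _ :=` context lines), so a failed create would only surface later as a confusing WriteTo error. A sketch of the fully checked tail, using the names from SaveIntsToColumns and with the surrounding loop elided:

	var network bytes.Buffer
	enc := gob.NewEncoder(&network)
	if err := enc.Encode(intCol); err != nil {
		return errors.Wrap(err, "encode")
	}
	w, err := os.Create(colFname)
	if err != nil {
		return err
	}
	if _, err := network.WriteTo(w); err != nil {
		return err
	}
	if err := w.Close(); err != nil { // Close errors matter on networked filesystems
		return err
	}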
diff --git a/src/sybil/column_store_test.go b/src/sybil/column_store_test.go
index 6bd7935f..635bc75e 100644
--- a/src/sybil/column_store_test.go
+++ b/src/sybil/column_store_test.go
@@ -28,10 +28,15 @@ func TestTableDigestRowRecords(t *testing.T) {
 	FLAGS.TABLE = tableName // TODO: eliminate global use
 	FLAGS.READ_INGESTION_LOG = true
 
-	nt.LoadTableInfo()
-	nt.LoadRecords(&LoadSpec{
+	if err := nt.LoadTableInfo(); err != nil {
+		t.Error(err)
+	}
+	_, err := nt.LoadRecords(&LoadSpec{
 		SkipDeleteBlocksAfterQuery: true,
 	})
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	if len(nt.RowBlock.RecordList) != CHUNK_SIZE*blockCount {
 		t.Error("Row Store didn't read back right number of records", len(nt.RowBlock.RecordList))
@@ -41,12 +46,16 @@ func TestTableDigestRowRecords(t *testing.T) {
 		t.Error("Found other records than rowblock")
 	}
 
-	nt.DigestRecords()
+	if err := nt.DigestRecords(); err != nil {
+		t.Error(err)
+	}
 
 	unloadTestTable(tableName)
 
 	nt = GetTable(tableName)
-	nt.LoadRecords(nil)
+	if _, err := nt.LoadRecords(nil); err != nil {
+		t.Error(err)
+	}
 
 	count := int32(0)
 	for _, b := range nt.BlockList {
@@ -83,10 +92,15 @@ func TestColumnStoreFileNames(t *testing.T) {
 	FLAGS.TABLE = tableName // TODO: eliminate global use
 	FLAGS.READ_INGESTION_LOG = true
 
-	nt.LoadTableInfo()
-	nt.LoadRecords(&LoadSpec{
+	if err := nt.LoadTableInfo(); err != nil {
+		t.Error(err)
+	}
+	_, err := nt.LoadRecords(&LoadSpec{
 		SkipDeleteBlocksAfterQuery: true,
 	})
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	if len(nt.RowBlock.RecordList) != CHUNK_SIZE*blockCount {
 		t.Error("Row Store didn't read back right number of records", len(nt.RowBlock.RecordList))
@@ -96,12 +110,16 @@ func TestColumnStoreFileNames(t *testing.T) {
 		t.Error("Found other records than rowblock")
 	}
 
-	nt.DigestRecords()
+	if err := nt.DigestRecords(); err != nil {
+		t.Error(err)
+	}
 
 	unloadTestTable(tableName)
 
 	nt = GetTable(tableName)
-	nt.LoadRecords(nil)
+	if _, err := nt.LoadRecords(nil); err != nil {
+		t.Error(err)
+	}
 
 	count := int32(0)
 
@@ -161,10 +179,15 @@ func TestBigIntColumns(t *testing.T) {
 	FLAGS.TABLE = tableName // TODO: eliminate global use
 	FLAGS.READ_INGESTION_LOG = true
 
-	nt.LoadTableInfo()
-	nt.LoadRecords(&LoadSpec{
+	if err := nt.LoadTableInfo(); err != nil {
+		t.Error(err)
+	}
+	_, err := nt.LoadRecords(&LoadSpec{
 		SkipDeleteBlocksAfterQuery: true,
 	})
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	if len(nt.RowBlock.RecordList) != CHUNK_SIZE*blockCount {
 		t.Error("Row Store didn't read back right number of records", len(nt.RowBlock.RecordList))
@@ -174,7 +197,9 @@ func TestBigIntColumns(t *testing.T) {
 		t.Error("Found other records than rowblock")
 	}
 
-	nt.DigestRecords()
+	if err := nt.DigestRecords(); err != nil {
+		t.Error(err)
+	}
 
 	unloadTestTable(tableName)
 
@@ -185,7 +210,9 @@ func TestBigIntColumns(t *testing.T) {
 	querySpec := newQuerySpec()
 	querySpec.Samples = true
 	querySpec.Limit = 1000
-	nt.LoadAndQueryRecords(&loadSpec, querySpec)
+	if _, err := nt.LoadAndQueryRecords(&loadSpec, querySpec); err != nil {
+		t.Error(err)
+	}
 
 	count := int32(0)
 	Debug("MIN VALUE BEING CHECKED FOR IS", minVal, "2^32 is", 1<<32)
diff --git a/src/sybil/config.go b/src/sybil/config.go
index 29846a12..bf143bd7 100644
--- a/src/sybil/config.go
+++ b/src/sybil/config.go
@@ -115,11 +115,8 @@ func EncodeFlags() {
 	FLAGS.ENCODE_FLAGS = oldEncode
 }
 
-func DecodeFlags() {
+func DecodeFlags() error {
 	Debug("READING ENCODED FLAGS FROM STDIN")
 	dec := gob.NewDecoder(os.Stdin)
-	err := dec.Decode(&FLAGS)
-	if err != nil {
-		Error("ERROR DECODING FLAGS", err)
-	}
+	return dec.Decode(&FLAGS)
 }
diff --git a/src/sybil/debug.go b/src/sybil/debug.go
index 259c3d2d..16b045eb 100644
--- a/src/sybil/debug.go
+++ b/src/sybil/debug.go
@@ -22,7 +22,3 @@ func Debug(args ...interface{}) {
 		log.Println(args...)
 	}
 }
-
-func Error(args ...interface{}) {
-	log.Fatalln(append([]interface{}{"ERROR"}, args...)...)
-}
diff --git a/src/sybil/errors.go b/src/sybil/errors.go
new file mode 100644
index 00000000..eda2c2f4
--- /dev/null
+++ b/src/sybil/errors.go
@@ -0,0 +1,38 @@
+package sybil
+
+import (
+	"errors"
+	"fmt"
+)
+
+var (
+	ErrMissingTable         = errors.New("missing table")
+	ErrLockTimeout          = errors.New("lock timeout")
+	ErrLockBroken           = errors.New("lock broken")
+	ErrKeyTableInconsistent = errors.New("key table is inconsistent")
+)
+
+type ErrMissingColumn struct {
+	column string
+}
+
+func (e ErrMissingColumn) Error() string {
+	return fmt.Sprintf("column '%s' is missing", e.column)
+}
+
+type ErrColumnTypeMismatch struct {
+	column   string
+	expected string
+}
+
+func (e ErrColumnTypeMismatch) Error() string {
+	return fmt.Sprintf("column '%s' is not of type %s", e.column, e.expected)
+}
+
+type ErrUnrecoverableLock struct {
+	lockfile string
+}
+
+func (e ErrUnrecoverableLock) Error() string {
+	return fmt.Sprintf("recovery failed for broken lock file: '%s'", e.lockfile)
+}
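Since call sites wrap these with github.com/pkg/errors, tests and callers should unwrap before comparing against the sentinels; a minimal sketch:

	import "github.com/pkg/errors"

	if errors.Cause(err) == sybil.ErrMissingTable {
		// usage error: the table flag was absent
	}

pkg/errors' Cause walks the Wrap chain back to the root error, so the comparison still works after the per-command Wrap calls above.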
diff --git a/src/sybil/file_decoder.go b/src/sybil/file_decoder.go
index f3d8741e..b98e848c 100644
--- a/src/sybil/file_decoder.go
+++ b/src/sybil/file_decoder.go
@@ -1,11 +1,12 @@
 package sybil
 
-import "fmt"
-
-import "os"
-import "strings"
-import "encoding/gob"
-import "compress/gzip"
+import (
+	"compress/gzip"
+	"encoding/gob"
+	"fmt"
+	"os"
+	"strings"
+)
 
 var GOB_GZIP_EXT = ".db.gz"
 
@@ -16,66 +17,69 @@ type GobFileDecoder struct {
 
 type FileDecoder interface {
 	Decode(interface{}) error
-	CloseFile() bool
+	CloseFile() error
 }
 
-func (gfd GobFileDecoder) CloseFile() bool {
-	gfd.File.Close()
-	return true
+func (gfd GobFileDecoder) CloseFile() error {
+	return gfd.File.Close()
 }
 
 func decodeInto(filename string, obj interface{}) error {
-	dec := GetFileDecoder(filename)
-	defer dec.CloseFile()
+	dec, err := GetFileDecoder(filename)
+	if err != nil {
+		return err
+	}
 
-	err := dec.Decode(obj)
-	return err
+	if err := dec.Decode(obj); err != nil {
+		dec.CloseFile()
+		return err
+	}
+	return dec.CloseFile()
 }
 
-func getGobGzipDecoder(filename string) FileDecoder {
+func getGobGzipDecoder(filename string) (FileDecoder, error) {
 	var dec *gob.Decoder
 
 	file, err := os.Open(filename)
 	if err != nil {
 		Debug("COULDNT OPEN GZ", filename)
-		return GobFileDecoder{gob.NewDecoder(file), file}
+		return nil, err
 	}
 
 	reader, err := gzip.NewReader(file)
 	if err != nil {
 		Debug("COULDNT DECOMPRESS GZ", filename)
-		return GobFileDecoder{gob.NewDecoder(reader), file}
+		file.Close()
+		return nil, err
 	}
 
 	dec = gob.NewDecoder(reader)
-	return GobFileDecoder{dec, file}
+	return GobFileDecoder{dec, file}, nil
 }
 
-func GetFileDecoder(filename string) FileDecoder {
+func GetFileDecoder(filename string) (FileDecoder, error) {
 	// if the file ends with GZ ext, we use compressed decoder
 	if strings.HasSuffix(filename, GOB_GZIP_EXT) {
-		dec := getGobGzipDecoder(filename)
-		return dec
+		return getGobGzipDecoder(filename)
 	}
 
 	file, err := os.Open(filename)
 	// if we try to open the file and its missing, maybe there is a .gz version of it
 	if err != nil {
 		zfilename := fmt.Sprintf("%s%s", filename, GZIP_EXT)
-		_, err = os.Open(zfilename)
-		// if we can open this file, we return compressed file decoder
-		if err == nil {
+		// Stat instead of Open: this is only an existence check, and an
+		// opened handle here would never be closed
+		if _, zerr := os.Stat(zfilename); zerr == nil {
+			// the gz variant exists, so return a compressed file decoder
 			if strings.HasSuffix(zfilename, GOB_GZIP_EXT) {
-				dec := getGobGzipDecoder(zfilename)
-				return dec
+				return getGobGzipDecoder(zfilename)
 			}
 		}
 	}
-
+	if err != nil {
+		return nil, err
+	}
 	// otherwise, we just return vanilla decoder for this file
 	dec := GobFileDecoder{gob.NewDecoder(file), file}
-	return dec
+	return dec, nil
 }
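GetFileDecoder now reports open/decompress failures instead of handing back a decoder wrapping a nil file. The decodeInto helper above shows the intended call shape; for direct callers, roughly:

	dec, err := sybil.GetFileDecoder(filename)
	if err != nil {
		return err
	}
	if err := dec.Decode(&obj); err != nil { // obj is whatever saved struct you expect
		dec.CloseFile()
		return err
	}
	return dec.CloseFile() // CloseFile now propagates the underlying Close error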
diff --git a/src/sybil/file_decoder_test.go b/src/sybil/file_decoder_test.go
index f9d2d638..8348763e 100644
--- a/src/sybil/file_decoder_test.go
+++ b/src/sybil/file_decoder_test.go
@@ -49,21 +49,28 @@ func TestOpenCompressedInfoDB(t *testing.T) {
 	}
 
 	zinfo := gzip.NewWriter(file)
-	zinfo.Write(dat)
-	zinfo.Close()
+	if _, err := zinfo.Write(dat); err != nil {
+		t.Error(err)
+	}
+	if err := zinfo.Close(); err != nil {
+		t.Error(err)
+	}
 
-	os.RemoveAll(filename)
+	if err := os.RemoveAll(filename); err != nil {
+		t.Error(err)
+	}
 	// END ZIPPING INFO.DB.GZ
 
 	loadSpec := nt.NewLoadSpec()
 	loadSpec.LoadAllColumns = true
 
-	loaded := nt.LoadTableInfo()
-	if !loaded {
-		t.Error("COULDNT LOAD ZIPPED TABLE INFO!")
+	if err := nt.LoadTableInfo(); err != nil {
+		t.Error("COULDNT LOAD ZIPPED TABLE INFO!", err)
 	}
 
-	nt.LoadRecords(&loadSpec)
+	if _, err := nt.LoadRecords(&loadSpec); err != nil {
+		t.Error(err)
+	}
 
 	var records = make([]*Record, 0)
 	for _, b := range nt.BlockList {
@@ -92,8 +99,12 @@ func TestOpenCompressedColumn(t *testing.T) {
 	}, blockCount)
 
 	nt := saveAndReloadTable(t, tableName, blockCount)
-	nt.DigestRecords()
-	nt.LoadRecords(nil)
+	if err := nt.DigestRecords(); err != nil {
+		t.Error(err)
+	}
+	if _, err := nt.LoadRecords(nil); err != nil {
+		t.Error(err)
+	}
 
 	blocks := nt.BlockList
 
@@ -120,8 +131,12 @@ func TestOpenCompressedColumn(t *testing.T) {
 	}
 
 	zinfo := gzip.NewWriter(file)
-	zinfo.Write(dat)
-	zinfo.Close()
+	if _, err := zinfo.Write(dat); err != nil {
+		t.Error(err)
+	}
+	if err := zinfo.Close(); err != nil {
+		t.Error(err)
+	}
 
 	Debug("CREATED GZIP FILE", zfilename)
 	err = os.RemoveAll(filename)
@@ -137,12 +152,13 @@ func TestOpenCompressedColumn(t *testing.T) {
 	loadSpec := bt.NewLoadSpec()
 	loadSpec.LoadAllColumns = true
 
-	loaded := bt.LoadTableInfo()
-	if !loaded {
-		t.Error("COULDNT LOAD ZIPPED TABLE INFO!")
+	if err := bt.LoadTableInfo(); err != nil {
+		t.Error("COULDNT LOAD ZIPPED TABLE INFO!", err)
 	}
 
-	bt.LoadRecords(&loadSpec)
+	if _, err := bt.LoadRecords(&loadSpec); err != nil {
+		t.Error(err)
+	}
 
 	var records = make([]*Record, 0)
 	for _, b := range bt.BlockList {
diff --git a/src/sybil/filter.go b/src/sybil/filter.go
index f4e21bc9..6a1459f0 100644
--- a/src/sybil/filter.go
+++ b/src/sybil/filter.go
@@ -1,9 +1,12 @@
 package sybil
 
-import "regexp"
+import (
+	"regexp"
+	"strconv"
+	"strings"
 
-import "strings"
-import "strconv"
+	"github.com/pkg/errors"
+)
 
 // This is the passed in flags
 type FilterSpec struct {
@@ -19,7 +22,7 @@ func checkTable(tokens []string, t *Table) bool {
 	return true
 }
 
-func BuildFilters(t *Table, loadSpec *LoadSpec, filterSpec FilterSpec) []Filter {
+func BuildFilters(t *Table, loadSpec *LoadSpec, filterSpec FilterSpec) ([]Filter, error) {
 	strfilters := make([]string, 0)
 	intfilters := make([]string, 0)
 	setfilters := make([]string, 0)
@@ -58,7 +61,9 @@ func BuildFilters(t *Table, loadSpec *LoadSpec, filterSpec FilterSpec) ([]Filter, error) {
 		}
 
 		filters = append(filters, t.IntFilter(col, op, int(val)))
-		loadSpec.Int(col)
+		if err := loadSpec.Int(col); err != nil {
+			return nil, errors.Wrap(err, "building int filters")
+		}
 	}
 
 	for _, filter := range setfilters {
@@ -70,7 +75,9 @@ func BuildFilters(t *Table, loadSpec *LoadSpec, filterSpec FilterSpec) ([]Filter, error) {
 		if !checkTable(tokens, t) {
 			continue
 		}
-		loadSpec.Set(col)
+		if err := loadSpec.Set(col); err != nil {
+			return nil, errors.Wrap(err, "building set filters")
+		}
 
 		filters = append(filters, t.SetFilter(col, op, val))
 
@@ -86,14 +93,15 @@ func BuildFilters(t *Table, loadSpec *LoadSpec, filterSpec FilterSpec) ([]Filter, error) {
 			continue
 		}
 
-		loadSpec.Str(col)
+		if err := loadSpec.Str(col); err != nil {
+			return nil, errors.Wrap(err, "building string filters")
+		}
 
 		filters = append(filters, t.StrFilter(col, op, val))
 	}
 
-	return filters
-
+	return filters, nil
 }
 
 // FILTERS RETURN TRUE ON MATCH SUCCESS
diff --git a/src/sybil/filter_test.go b/src/sybil/filter_test.go
index c5804b30..1a70f606 100644
--- a/src/sybil/filter_test.go
+++ b/src/sybil/filter_test.go
@@ -1,10 +1,12 @@
 package sybil
 
-import "testing"
-import "math/rand"
-import "strconv"
-import "math"
-import "strings"
+import (
+	"math"
+	"math/rand"
+	"strconv"
+	"strings"
+	"testing"
+)
 
 func TestFilters(t *testing.T) {
 	tableName := getTestTableName(t)
@@ -39,8 +41,11 @@ func TestFilters(t *testing.T) {
 func testIntLt(t *testing.T, tableName string) {
 	nt := GetTable(tableName)
 
-	filters := []Filter{}
-	filters = append(filters, nt.IntFilter("age", "lt", 20))
+	loadSpec := nt.NewLoadSpec()
+	filters, err := BuildFilters(nt, &loadSpec, FilterSpec{Int: "age:lt:20"})
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	aggs := []Aggregation{}
 	aggs = append(aggs, nt.Aggregation("age", "avg"))
@@ -65,8 +70,11 @@ func testIntGt(t *testing.T, tableName string) {
 	nt := GetTable(tableName)
 
-	filters := []Filter{}
-	filters = append(filters, nt.IntFilter("age", "gt", 20))
+	loadSpec := nt.NewLoadSpec()
+	filters, err := BuildFilters(nt, &loadSpec, FilterSpec{Int: "age:gt:20"})
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	aggs := []Aggregation{}
 	aggs = append(aggs, nt.Aggregation("age", "avg"))
@@ -92,8 +100,11 @@ func testIntNeq(t *testing.T, tableName string) {
 	nt := GetTable(tableName)
 
-	filters := []Filter{}
-	filters = append(filters, nt.IntFilter("age", "neq", 20))
+	loadSpec := nt.NewLoadSpec()
+	filters, err := BuildFilters(nt, &loadSpec, FilterSpec{Int: "age:neq:20"})
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	aggs := []Aggregation{}
 	aggs = append(aggs, nt.Aggregation("age", "avg"))
@@ -126,8 +137,11 @@ func testIntEq(t *testing.T, tableName string) {
 	nt := GetTable(tableName)
 
-	filters := []Filter{}
-	filters = append(filters, nt.IntFilter("age", "eq", 20))
+	loadSpec := nt.NewLoadSpec()
+	filters, err := BuildFilters(nt, &loadSpec, FilterSpec{Int: "age:eq:20"})
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	aggs := []Aggregation{}
 	aggs = append(aggs, nt.Aggregation("age", "avg"))
@@ -153,8 +167,11 @@ func testStrEq(t *testing.T, tableName string) {
 	nt := GetTable(tableName)
 
-	filters := []Filter{}
-	filters = append(filters, nt.StrFilter("age_str", "re", "20"))
+	loadSpec := nt.NewLoadSpec()
+	filters, err := BuildFilters(nt, &loadSpec, FilterSpec{Str: "age_str:re:20"})
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	aggs := []Aggregation{}
 	aggs = append(aggs, nt.Aggregation("age", "avg"))
@@ -184,8 +201,11 @@ func testStrNeq(t *testing.T, tableName string) {
 	nt := GetTable(tableName)
 
-	filters := []Filter{}
-	filters = append(filters, nt.StrFilter("age_str", "nre", "20"))
+	loadSpec := nt.NewLoadSpec()
+	filters, err := BuildFilters(nt, &loadSpec, FilterSpec{Str: "age_str:nre:20"})
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	aggs := []Aggregation{}
 	aggs = append(aggs, nt.Aggregation("age", "avg"))
@@ -210,8 +230,11 @@ func testStrRe(t *testing.T, tableName string) {
 	nt := GetTable(tableName)
 
-	filters := []Filter{}
-	filters = append(filters, nt.StrFilter("age_str", "re", "^2"))
+	loadSpec := nt.NewLoadSpec()
+	filters, err := BuildFilters(nt, &loadSpec, FilterSpec{Str: "age_str:re:^2"})
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	aggs := []Aggregation{}
 	aggs = append(aggs, nt.Aggregation("age", "avg"))
@@ -241,8 +264,11 @@ func testSetIn(t *testing.T, tableName string) {
 	nt := GetTable(tableName)
 
-	filters := []Filter{}
-	filters = append(filters, nt.SetFilter("age_set", "in", "20"))
+	loadSpec := nt.NewLoadSpec()
+	filters, err := BuildFilters(nt, &loadSpec, FilterSpec{Set: "age_set:in:20"})
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	aggs := []Aggregation{}
 	aggs = append(aggs, nt.Aggregation("age", "avg"))
@@ -283,8 +309,11 @@ func testSetNin(t *testing.T, tableName string) {
 	nt := GetTable(tableName)
 
-	filters := []Filter{}
-	filters = append(filters, nt.SetFilter("age_set", "nin", "20"))
+	loadSpec := nt.NewLoadSpec()
+	filters, err := BuildFilters(nt, &loadSpec, FilterSpec{Set: "age_set:nin:20"})
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	aggs := []Aggregation{}
 	aggs = append(aggs, nt.Aggregation("age", "avg"))
diff --git a/src/sybil/helpers_test.go b/src/sybil/helpers_test.go
index 50ee08e9..2d2217c3 100644
--- a/src/sybil/helpers_test.go
+++ b/src/sybil/helpers_test.go
@@ -65,11 +65,16 @@ func saveAndReloadTable(t *testing.T, tableName string, expectedBlocks int) *Tab
 	unloadTestTable(tableName)
 
 	nt := GetTable(tableName)
-	nt.LoadTableInfo()
+	if err := nt.LoadTableInfo(); err != nil {
+		t.Error(err)
+	}
 
 	loadSpec := NewLoadSpec()
 	loadSpec.LoadAllColumns = true
-	count := nt.LoadRecords(&loadSpec)
+	count, err := nt.LoadRecords(&loadSpec)
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	if count != expectedCount {
 		t.Error("Wrote", expectedCount, "records, but read back", count)
diff --git a/src/sybil/ingest_test.go b/src/sybil/ingest_test.go
new file mode 100644
index 00000000..43cf3f08
--- /dev/null
+++ b/src/sybil/ingest_test.go
@@ -0,0 +1,10 @@
+package sybil
+
+import "testing"
+
+func TestIngestion(t *testing.T) {
+	tableName := getTestTableName(t)
+	deleteTestDb(tableName)
+	defer deleteTestDb(tableName)
+
+}
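The filter tests now route through BuildFilters, which also exercises the flag-level syntax: each filter is a field:op:value triple, with ops like lt/gt/eq/neq for ints, re/nre for strings, and in/nin for sets. For example, inside the sybil package:

	loadSpec := nt.NewLoadSpec()
	spec := FilterSpec{Int: "age:lt:20", Str: "age_str:re:^2", Set: "age_set:in:20"}
	filters, err := BuildFilters(nt, &loadSpec, spec)
	if err != nil {
		t.Fatal(err)
	}

(Multiple filters per kind are presumably separated by FLAGS.FIELD_SEPARATOR; only single filters appear in these tests.)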
diff --git a/src/sybil/node_aggregator.go b/src/sybil/node_aggregator.go
index 2d8a9666..6211c39e 100644
--- a/src/sybil/node_aggregator.go
+++ b/src/sybil/node_aggregator.go
@@ -56,7 +56,7 @@ func (vt *VTable) findResultsInDirs(dirs []string) map[string]*NodeResults {
 
 }
 
-func (vt *VTable) AggregateSamples(printSpec *PrintSpec, dirs []string) {
+func (vt *VTable) AggregateSamples(printSpec *PrintSpec, dirs []string) error {
 	Debug("AGGREGATING TABLE LIST")
 
 	allResults := vt.findResultsInDirs(dirs)
@@ -72,11 +72,10 @@ func (vt *VTable) AggregateSamples(printSpec *PrintSpec, dirs []string) error {
 
 	// TODO: call into vt.PrintSamples later after adjusting how we store the samples
 	// on a per table basis
-	printJSON(samples)
-
+	return printJSON(samples)
 }
 
-func (vt *VTable) AggregateTables(printSpec *PrintSpec, dirs []string) {
+func (vt *VTable) AggregateTables(printSpec *PrintSpec, dirs []string) error {
 	Debug("AGGREGATING TABLE LIST")
 	allResults := vt.findResultsInDirs(dirs)
 	Debug("FOUND", len(allResults), "SPECS TO AGG")
@@ -98,7 +97,7 @@ func (vt *VTable) AggregateTables(printSpec *PrintSpec, dirs []string) error {
 		tableArr = append(tableArr, table)
 	}
 
-	printTablesToOutput(printSpec, tableArr)
+	return printTablesToOutput(printSpec, tableArr)
 }
 
 func (vt *VTable) AggregateInfo(printSpec *PrintSpec, dirs []string) {
diff --git a/src/sybil/printer.go b/src/sybil/printer.go
index 09df9a3b..9fdeddb8 100644
--- a/src/sybil/printer.go
+++ b/src/sybil/printer.go
@@ -12,16 +12,17 @@ import "io/ioutil"
 import "text/tabwriter"
 import "time"
 
-func printJSON(data interface{}) {
+func printJSON(data interface{}) error {
 	b, err := json.Marshal(data)
-	if err == nil {
-		os.Stdout.Write(b)
-	} else {
-		Error("JSON encoding error", err)
+	if err != nil {
+		return err
 	}
+	os.Stdout.Write(b)
+	return nil
 }
 
-func printTimeResults(printSpec *PrintSpec, querySpec *QuerySpec) {
+func printTimeResults(printSpec *PrintSpec, querySpec *QuerySpec) error {
 	Debug("PRINTING TIME RESULTS")
 
 	Debug("CHECKING SORT ORDER", len(querySpec.Sorted))
@@ -59,8 +60,7 @@ func printTimeResults(printSpec *PrintSpec, querySpec *QuerySpec) error {
 			}
 		}
 
-		printJSON(marshalledResults)
-		return
+		return printJSON(marshalledResults)
 	}
 
 	w := new(tabwriter.Writer)
@@ -86,8 +86,7 @@ func printTimeResults(printSpec *PrintSpec, querySpec *QuerySpec) error {
 		}
 	}
 
-	w.Flush()
-
+	return w.Flush()
 }
 
 func getSparseBuckets(buckets map[string]int64) map[string]int64 {
@@ -262,16 +261,16 @@ func printResults(printSpec *PrintSpec, querySpec *QuerySpec) {
 	}
 }
 
-func PrintBytes(obj interface{}) {
+func PrintBytes(obj interface{}) error {
 	var buf bytes.Buffer
 	enc := gob.NewEncoder(&buf)
 
 	err := enc.Encode(obj)
 	if err != nil {
-		Warn("COULDNT ENCODE BYTES", err)
+		return err
 	}
 
 	Print(buf.String())
-
+	return nil
 }
 
 func encodeResults(qs *QuerySpec) {
@@ -432,7 +431,6 @@ func (t *Table) PrintSamples(printSpec *PrintSpec) {
 func ListTables() []string {
 	files, err := ioutil.ReadDir(FLAGS.DIR)
 	if err != nil {
-		Error("No tables found!")
 		return []string{}
 	}
@@ -453,10 +451,9 @@ func PrintTables(printSpec *PrintSpec) {
 
 }
 
-func printTablesToOutput(printSpec *PrintSpec, tables []string) {
+func printTablesToOutput(printSpec *PrintSpec, tables []string) error {
 	if printSpec.EncodeResults {
-		PrintBytes(NodeResults{Tables: tables})
-		return
+		return PrintBytes(NodeResults{Tables: tables})
 	}
 
 	if printSpec.JSON {
@@ -464,10 +461,10 @@ func printTablesToOutput(printSpec *PrintSpec, tables []string) error {
 		if err == nil {
 			os.Stdout.Write(b)
 		} else {
-			Error("JSON encoding error", err)
+			return err
 		}
 
-		return
+		return nil
 	}
 
 	for _, name := range tables {
@@ -475,6 +472,7 @@ func printTablesToOutput(printSpec *PrintSpec, tables []string) error {
 	}
 
 	fmt.Println("")
+	return nil
 }
 
 func (t *Table) getColsOfType(wantedType int8) []string {
diff --git a/src/sybil/record.go b/src/sybil/record.go
index 41443675..1ce83717 100644
--- a/src/sybil/record.go
+++ b/src/sybil/record.go
@@ -1,5 +1,7 @@
 package sybil
 
+import "fmt"
+
 type Record struct {
 	Strs []StrField
 	Ints []IntField
@@ -85,7 +91,7 @@ func (r *Record) ResizeFields(length int16) {
 
 }
 
-func (r *Record) AddStrField(name string, val string) {
+func (r *Record) AddStrField(name string, val string) error {
 	nameID := r.block.getKeyID(name)
 
 	col := r.block.GetColumnInfo(nameID)
@@ -96,11 +102,12 @@ func (r *Record) AddStrField(name string, val string) error {
 	r.Populated[nameID] = STR_VAL
 
 	if !r.block.table.setKeyType(nameID, STR_VAL) {
-		Error("COULDNT SET STR VAL", name, val, nameID)
+		return fmt.Errorf("couldnt set str val %v %v %v", name, val, nameID)
 	}
+	return nil
 }
 
-func (r *Record) AddIntField(name string, val int64) {
+func (r *Record) AddIntField(name string, val int64) error {
 	nameID := r.block.getKeyID(name)
 	r.block.table.updateIntInfo(nameID, val)
 
@@ -108,11 +115,12 @@ func (r *Record) AddIntField(name string, val int64) error {
 	r.Ints[nameID] = IntField(val)
 	r.Populated[nameID] = INT_VAL
 	if !r.block.table.setKeyType(nameID, INT_VAL) {
-		Error("COULDNT SET INT VAL", name, val, nameID)
+		return fmt.Errorf("couldnt set int val %v %v %v", name, val, nameID)
 	}
+	return nil
 }
 
-func (r *Record) AddSetField(name string, val []string) {
+func (r *Record) AddSetField(name string, val []string) error {
 	nameID := r.block.getKeyID(name)
 	vals := make([]int32, len(val))
 	for i, v := range val {
@@ -128,8 +136,9 @@ func (r *Record) AddSetField(name string, val []string) error {
 	r.SetMap[nameID] = SetField(vals)
 	r.Populated[nameID] = SET_VAL
 	if !r.block.table.setKeyType(nameID, SET_VAL) {
-		Error("COULDNT SET SET VAL", name, val, nameID)
+		return fmt.Errorf("couldnt set set val %v %v %v", name, val, nameID)
 	}
+	return nil
 }
 
 var COPY_RECORD_INTERNS = false
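The Add*Field methods now report key-type conflicts (for example, writing a string into a column previously typed as int) instead of killing the process. A hedged sketch of a call site, assuming a table handle t:

	r := t.NewRecord()
	if err := r.AddIntField("age", 20); err != nil {
		// the column already exists with a non-int type; skip or surface it
		sybil.Debug("INGEST RECORD ISSUE:", err)
	}

The ingest path above deliberately logs and continues: one bad field should not drop the rest of the record.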
"os" +import ( + "bytes" + "encoding/gob" + "fmt" + "io/ioutil" + "os" + "path" + "time" + + "github.com/pkg/errors" +) type RowSavedInt struct { Name int16 @@ -29,7 +33,7 @@ type SavedRecord struct { Sets []RowSavedSet } -func (s SavedRecord) toRecord(t *Table) *Record { +func (s SavedRecord) toRecord(t *Table) (*Record, error) { r := Record{} r.Ints = IntArr{} r.Strs = StrArr{} @@ -57,15 +61,21 @@ func (s SavedRecord) toRecord(t *Table) *Record { } for _, v := range s.Strs { - r.AddStrField(t.getStringForKey(int(v.Name)), v.Value) + err := r.AddStrField(t.getStringForKey(int(v.Name)), v.Value) + if err != nil { + return nil, err + } } for _, v := range s.Sets { - r.AddSetField(t.getStringForKey(int(v.Name)), v.Value) + err := r.AddSetField(t.getStringForKey(int(v.Name)), v.Value) + if err != nil { + return nil, err + } r.Populated[v.Name] = SET_VAL } - return &r + return &r, nil } func (r Record) toSavedRecord() *SavedRecord { @@ -117,7 +127,7 @@ func (t *Table) LoadSavedRecordsFromLog(filename string) []*SavedRecord { return marshalledRecords } -func (t *Table) LoadRecordsFromLog(filename string) RecordList { +func (t *Table) LoadRecordsFromLog(filename string) (RecordList, error) { var marshalledRecords []*SavedRecord // Create an encoder and send a value. @@ -129,15 +139,17 @@ func (t *Table) LoadRecordsFromLog(filename string) RecordList { ret := make(RecordList, len(marshalledRecords)) for i, r := range marshalledRecords { - ret[i] = r.toRecord(t) + ret[i], err = r.toRecord(t) + if err != nil { + return RecordList{}, err + } } - return ret - + return ret, nil } -func (t *Table) AppendRecordsToLog(records RecordList, blockname string) { +func (t *Table) AppendRecordsToLog(records RecordList, blockname string) error { if len(records) == 0 { - return + return nil } // TODO: fix this up, so that we don't @@ -163,7 +175,7 @@ func (t *Table) AppendRecordsToLog(records RecordList, blockname string) { err := enc.Encode(marshalledRecords) if err != nil { - Error("encode:", err) + return errors.Wrap(err, "encoding issue") } filename := fmt.Sprintf("%s.db", w.Name()) @@ -171,7 +183,9 @@ func (t *Table) AppendRecordsToLog(records RecordList, blockname string) { Debug("SERIALIZED INTO LOG", filename, network.Len(), "BYTES", "( PER RECORD", network.Len()/len(marshalledRecords), ")") - network.WriteTo(w) + if _, err := network.WriteTo(w); err != nil { + return err + } for i := 0; i < 3; i++ { fullname := path.Join(ingestdir, basename) @@ -179,7 +193,7 @@ func (t *Table) AppendRecordsToLog(records RecordList, blockname string) { err = RenameAndMod(w.Name(), fullname) if err == nil { // we are done writing, time to exit - return + return nil } if err != nil { @@ -188,4 +202,5 @@ func (t *Table) AppendRecordsToLog(records RecordList, blockname string) { } Warn("COULDNT INGEST INTO ROW STORE") + return fmt.Errorf("issue ingesting") } diff --git a/src/sybil/table_block_io.go b/src/sybil/table_block_io.go index 8502e4b5..ecd35601 100644 --- a/src/sybil/table_block_io.go +++ b/src/sybil/table_block_io.go @@ -1,19 +1,23 @@ package sybil -import "bytes" -import "fmt" -import "time" -import "os" -import "path" -import "strings" -import "sync" -import "compress/gzip" +import ( + "bytes" + "compress/gzip" + "fmt" + "os" + "path" + "strings" + "sync" + "time" + + "github.com/pkg/errors" +) var GZIP_EXT = ".gz" -func (t *Table) SaveRecordsToBlock(records RecordList, filename string) bool { +func (t *Table) SaveRecordsToBlock(records RecordList, filename string) error { if len(records) == 0 { - return true + 
return nil } tempBlock := newTableBlock() @@ -23,8 +27,10 @@ func (t *Table) SaveRecordsToBlock(records RecordList, filename string) bool { return tempBlock.SaveToColumns(filename) } -func (t *Table) FindPartialBlocks() []*TableBlock { - t.LoadRecords(nil) +func (t *Table) FindPartialBlocks() ([]*TableBlock, error) { + if _, err := t.LoadRecords(nil); err != nil { + return nil, err + } ret := make([]*TableBlock, 0) @@ -40,22 +46,25 @@ func (t *Table) FindPartialBlocks() []*TableBlock { } t.blockMu.Unlock() - return ret + return ret, nil } // TODO: find any open blocks and then fill them... -func (t *Table) FillPartialBlock() bool { +func (t *Table) FillPartialBlock() error { if len(t.newRecords) == 0 { - return false + return nil } - openBlocks := t.FindPartialBlocks() + openBlocks, err := t.FindPartialBlocks() + if err != nil { + return err + } Debug("OPEN BLOCKS", openBlocks) var filename string if len(openBlocks) == 0 { - return true + return nil } for _, b := range openBlocks { @@ -64,9 +73,9 @@ func (t *Table) FillPartialBlock() bool { Debug("OPENING PARTIAL BLOCK", filename) - if !t.GrabBlockLock(filename) { + if err := t.GrabBlockLock(filename); err != nil { Debug("CANT FILL PARTIAL BLOCK DUE TO LOCK", filename) - return true + return errors.Wrap(err, "failed to grab block lock") } defer t.ReleaseBlockLock(filename) @@ -74,9 +83,13 @@ func (t *Table) FillPartialBlock() bool { // open up our last record block, see how full it is delete(t.BlockInfoCache, filename) - block := t.LoadBlockFromDir(filename, nil, true /* LOAD ALL RECORDS */, nil) + block, err := t.LoadBlockFromDir(filename, nil, true /* LOAD ALL RECORDS */, nil) + if err != nil { + return err + } + // TODO add error handling if block == nil { - return true + return nil } partialRecords := block.RecordList @@ -90,9 +103,9 @@ func (t *Table) FillPartialBlock() bool { Debug("SAVING PARTIAL RECORDS", delta, "TO", filename) partialRecords = append(partialRecords, t.newRecords[0:delta]...) 
-	if !t.SaveRecordsToBlock(partialRecords, filename) {
+	if err := t.SaveRecordsToBlock(partialRecords, filename); err != nil {
 		Debug("COULDNT SAVE PARTIAL RECORDS TO", filename)
-		return false
+		return errors.Wrap(err, "save records to block")
 	}
 
 	if delta < len(t.newRecords) {
@@ -102,7 +115,7 @@ func (t *Table) FillPartialBlock() bool {
 		}
 	}
 
-	return true
+	return nil
 }
 
 // optimizing for integer pre-cached info
@@ -196,7 +209,7 @@ func (t *Table) LoadBlockInfo(dirname string) *SavedColumnInfo {
 
 // TODO: have this only pull the blocks into column format and not materialize
 // the columns immediately
-func (t *Table) LoadBlockFromDir(dirname string, loadSpec *LoadSpec, loadRecords bool, replacements map[string]StrReplace) *TableBlock {
+func (t *Table) LoadBlockFromDir(dirname string, loadSpec *LoadSpec, loadRecords bool, replacements map[string]StrReplace) (*TableBlock, error) {
 
 	tb := newTableBlock()
 	tb.Name = dirname
@@ -206,11 +219,11 @@ func (t *Table) LoadBlockFromDir(dirname string, loadSpec *LoadSpec, loadRecords
 
 	info := t.LoadBlockInfo(dirname)
 	if info == nil {
-		return nil
+		return nil, nil
 	}
 
 	if info.NumRecords <= 0 {
-		return nil
+		return nil, nil
 	}
 
 	t.blockMu.Lock()
@@ -244,25 +257,31 @@ func (t *Table) LoadBlockFromDir(dirname string, loadSpec *LoadSpec, loadRecords
 
 		filename := fmt.Sprintf("%s/%s", dirname, fname)
 
-		dec := GetFileDecoder(filename)
+		dec, err := GetFileDecoder(filename)
+		if err != nil {
+			return nil, err
+		}
 
 		switch {
 		case strings.HasPrefix(fname, "str"):
-			tb.unpackStrCol(dec, *info, replacements)
+			err = tb.unpackStrCol(dec, *info, replacements)
 		case strings.HasPrefix(fname, "set"):
-			tb.unpackSetCol(dec, *info)
+			err = tb.unpackSetCol(dec, *info)
 		case strings.HasPrefix(fname, "int"):
-			tb.unpackIntCol(dec, *info)
+			err = tb.unpackIntCol(dec, *info)
 		}
 
 		dec.CloseFile()
+		if err != nil {
+			return nil, errors.Wrap(err, fmt.Sprintf("issue unpacking %v", fname))
+		}
 	}
 
 	tb.Size = size
 
 	file.Close()
-	return &tb
+	return &tb, nil
 }
 
 type AfterLoadQueryCB struct {
diff --git a/src/sybil/table_ingest.go b/src/sybil/table_ingest.go
index 60b4bdb2..6cd81169 100644
--- a/src/sybil/table_ingest.go
+++ b/src/sybil/table_ingest.go
@@ -1,11 +1,14 @@
 package sybil
 
-import "time"
-import "path"
-import "io/ioutil"
+import (
+	"io/ioutil"
+	"os"
+	"path"
+	"strings"
+	"time"
 
-import "os"
-import "strings"
+	"github.com/pkg/errors"
+)
 
 // to ingest, make a new tmp file inside ingest/ (or append to an existing one)
 // to digest, make a new STOMACHE_DIR tempdir and move all files from ingest/ into it
@@ -29,16 +32,21 @@ func (t *Table) getNewCacheBlockFile() (*os.File, error) {
 }
 
 // Go through newRecords list and save all the new records out to a row store
-func (t *Table) IngestRecords(blockname string) {
+func (t *Table) IngestRecords(blockname string) error {
 	Debug("KEY TABLE", t.KeyTable)
 	Debug("KEY TYPES", t.KeyTypes)
 
-	t.AppendRecordsToLog(t.newRecords[:], blockname)
+	if err := t.AppendRecordsToLog(t.newRecords[:], blockname); err != nil {
+		return err
+	}
 	t.newRecords = make(RecordList, 0)
-	t.SaveTableInfo("info")
+	if err := t.SaveTableInfo("info"); err != nil {
+		Warn(errors.Wrap(err, "t.SaveTableInfo"))
+	}
 	t.ReleaseRecords()
 
 	t.MaybeCompactRecords()
+	return nil
 }
 
 // TODO: figure out how often we actually do a collation check by storing last
@@ -116,7 +124,7 @@ func (t *Table) ShouldCompactRowStore(digest string) bool {
 
 }
 
-func (t *Table) LoadRowStoreRecords(digest string, afterBlockLoadCb AfterRowBlockLoad) {
+func (t *Table) LoadRowStoreRecords(digest string, afterBlockLoadCb AfterRowBlockLoad) error {
 	dirname := path.Join(FLAGS.DIR, t.Name, digest)
 
 	var err error
@@ -127,7 +135,7 @@ func (t *Table) LoadRowStoreRecords(digest string, afterBlockLoadCb AfterRowBloc
 			afterBlockLoadCb(NO_MORE_BLOCKS, nil)
 		}
 
-		return
+		return err
 	}
 
 	var file *os.File
@@ -137,7 +145,7 @@ func (t *Table) LoadRowStoreRecords(digest string, afterBlockLoadCb AfterRowBloc
 			Debug("Can't open the ingestion dir", dirname)
 			time.Sleep(LOCK_US)
 			if i > MAX_ROW_STORE_TRIES {
-				return
+				return ErrLockTimeout
 			}
 			continue
 		}
@@ -167,7 +175,10 @@ func (t *Table) LoadRowStoreRecords(digest string, afterBlockLoadCb AfterRowBloc
 
 		filename = path.Join(dirname, file.Name())
 
-		records := t.LoadRecordsFromLog(filename)
+		records, err := t.LoadRecordsFromLog(filename)
+		if err != nil {
+			return err
+		}
 		if afterBlockLoadCb != nil {
 			afterBlockLoadCb(filename, records)
 		}
@@ -177,6 +188,7 @@ func (t *Table) LoadRowStoreRecords(digest string, afterBlockLoadCb AfterRowBloc
 		afterBlockLoadCb(NO_MORE_BLOCKS, nil)
 	}
 
+	return nil
 }
 
 func LoadRowBlockCB(digestname string, records RecordList) {
@@ -196,16 +208,17 @@ func LoadRowBlockCB(digestname string, records RecordList) {
 
 var DELETE_BLOCKS = make([]string, 0)
 
-func (t *Table) RestoreUningestedFiles() {
-	if !t.GrabDigestLock() {
+func (t *Table) RestoreUningestedFiles() error {
+	if err := t.GrabDigestLock(); err != nil {
 		Debug("CANT RESTORE UNINGESTED RECORDS WITHOUT DIGEST LOCK")
-		return
+		return err
 	}
 
 	ingestdir := path.Join(FLAGS.DIR, t.Name, INGEST_DIR)
 	os.MkdirAll(ingestdir, 0777)
 
 	digesting := path.Join(FLAGS.DIR, t.Name)
+	// TODO add error handling
 	file, _ := os.Open(digesting)
 	dirs, _ := file.Readdir(0)
 
@@ -221,17 +234,20 @@ func (t *Table) RestoreUningestedFiles() {
 				err := RenameAndMod(from, to)
 				if err != nil {
 					Debug("COULDNT RESTORE UNINGESTED FILE", from, to, err)
+					return err
 				}
 			}
 
 			err := os.Remove(path.Join(digesting, dir.Name()))
 			if err != nil {
 				Debug("REMOVING STOMACHE FAILED!", err)
+				return err
 			}
 		}
 	}
 
+	return nil
 }
 
 type SaveBlockChunkCB struct {
@@ -275,13 +291,11 @@ func (cb *SaveBlockChunkCB) CB(digestname string, records RecordList) {
 
 var STOMACHE_DIR = "stomache"
 
 // Go through rowstore and save records out to column store
-func (t *Table) DigestRecords() {
-	canDigest := t.GrabDigestLock()
-
-	if !canDigest {
+func (t *Table) DigestRecords() error {
+	if err := t.GrabDigestLock(); err != nil {
 		t.ReleaseInfoLock()
 		Debug("CANT GRAB LOCK FOR DIGEST RECORDS")
-		return
+		return errors.Wrap(err, "grabbing digest lock failed")
 	}
 
 	dirname := path.Join(FLAGS.DIR, t.Name)
@@ -294,16 +308,17 @@ func (t *Table) DigestRecords() {
 		t.ReleaseDigestLock()
 		Debug("ERROR CREATING DIGESTION DIR", err)
 		time.Sleep(time.Millisecond * 50)
-		return
+		return err
 	}
 
 	file, _ := os.Open(digestfile)
 
 	files, err := file.Readdir(0)
+	// TODO add error handling
 	if len(files) < MIN_FILES_TO_DIGEST {
 		Debug("SKIPPING DIGESTION, NOT AS MANY FILES AS WE THOUGHT", len(files), "VS", MIN_FILES_TO_DIGEST)
 		t.ReleaseDigestLock()
-		return
+		return nil
 	}
 
 	if err == nil {
@@ -319,4 +334,5 @@ func (t *Table) DigestRecords() {
 	} else {
 		t.ReleaseDigestLock()
 	}
+	return nil
 }
diff --git a/src/sybil/table_io.go b/src/sybil/table_io.go
index 84e2abff..f4d4fc32 100644
--- a/src/sybil/table_io.go
+++ b/src/sybil/table_io.go
@@ -1,17 +1,20 @@
 package sybil
 
-import "fmt"
-
-import "os"
-import "path"
-import "sort"
-import "strings"
-import "sync"
-import "time"
-import "bytes"
-import "io/ioutil"
-import "encoding/gob"
-import "runtime/debug"
+import (
+	"bytes"
+	"encoding/gob"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path"
+	"runtime/debug"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/pkg/errors"
+)
 
 var DEBUG_TIMING = false
 var CHUNKS_BEFORE_GC = 16
@@ -22,9 +25,9 @@ var CACHE_DIR = "cache"
 var HOLD_MATCHES = false
 var BLOCKS_PER_CACHE_FILE = 64
 
-func (t *Table) saveTableInfo(fname string) {
-	if !t.GrabInfoLock() {
-		return
+func (t *Table) saveTableInfo(fname string) error {
+	if err := t.GrabInfoLock(); err != nil {
+		return errors.Wrap(err, "t.GrabInfoLock")
 	}
 
 	defer t.ReleaseInfoLock()
@@ -43,29 +46,28 @@ func (t *Table) saveTableInfo(fname string) {
 
 	err := enc.Encode(t)
 
 	if err != nil {
-		Error("encode:", err)
+		return err
 	}
 
 	Debug("SERIALIZED TABLE INFO", fname, "INTO ", network.Len(), "BYTES")
 
 	tempfile, err := ioutil.TempFile(dirname, "info.db")
 	if err != nil {
-		Error("ERROR CREATING TEMP FILE FOR TABLE INFO", err)
+		return errors.Wrap(err, "error creating temp file for table info")
 	}
 
 	_, err = network.WriteTo(tempfile)
 	if err != nil {
-		Error("ERROR SAVING TABLE INFO INTO TEMPFILE", err)
+		return errors.Wrap(err, "error saving table info into tempfile")
 	}
 
 	RenameAndMod(tempfile.Name(), filename)
-	os.Create(flagfile)
+	_, err = os.Create(flagfile)
+	return err
 }
 
-func (t *Table) SaveTableInfo(fname string) {
-	saveTable := getSaveTable(t)
-	saveTable.saveTableInfo(fname)
-
+func (t *Table) SaveTableInfo(fname string) error {
+	return getSaveTable(t).saveTableInfo(fname)
 }
 
 func getSaveTable(t *Table) *Table {
@@ -76,9 +78,9 @@ func getSaveTable(t *Table) *Table {
 		StrInfo: t.StrInfo}
 }
 
-func (t *Table) saveRecordList(records RecordList) bool {
+func (t *Table) saveRecordList(records RecordList) error {
 	if len(records) == 0 {
-		return false
+		return nil
 	}
 
 	Debug("SAVING RECORD LIST", len(records), t.Name)
@@ -89,59 +91,65 @@ func (t *Table) saveRecordList(records RecordList) bool {
 	if chunks == 0 {
 		filename, err := t.getNewIngestBlockName()
 		if err != nil {
-			Error("ERR SAVING BLOCK", filename, err)
+			return errors.Wrap(err, fmt.Sprintf("error saving block %v", filename))
+		}
+		if err := t.SaveRecordsToBlock(records, filename); err != nil {
+			return err
 		}
-		t.SaveRecordsToBlock(records, filename)
 	} else {
 		for j := 0; j < chunks; j++ {
 			filename, err := t.getNewIngestBlockName()
 			if err != nil {
-				Error("ERR SAVING BLOCK", filename, err)
+				return errors.Wrap(err, fmt.Sprintf("error saving block %v", filename))
+			}
+			if err := t.SaveRecordsToBlock(records[j*chunkSize:(j+1)*chunkSize], filename); err != nil {
+				return err
 			}
-			t.SaveRecordsToBlock(records[j*chunkSize:(j+1)*chunkSize], filename)
 		}
 
 		// SAVE THE REMAINDER
 		if len(records) > chunks*chunkSize {
 			filename, err := t.getNewIngestBlockName()
 			if err != nil {
-				Error("Error creating new ingestion block", err)
+				return errors.Wrap(err, "error creating new ingestion block")
 			}
 
-			t.SaveRecordsToBlock(records[chunks*chunkSize:], filename)
+			if err := t.SaveRecordsToBlock(records[chunks*chunkSize:], filename); err != nil {
+				return err
+			}
 		}
 	}
 
-	return true
+	return nil
 }
 
-func (t *Table) SaveRecordsToColumns() bool {
+func (t *Table) SaveRecordsToColumns() error {
 	os.MkdirAll(path.Join(FLAGS.DIR, t.Name), 0777)
 	sort.Sort(SortRecordsByTime{t.newRecords})
 
-	t.FillPartialBlock()
+	if err := t.FillPartialBlock(); err != nil {
+		return err
+	}
 	ret := t.saveRecordList(t.newRecords)
 	t.newRecords = make(RecordList, 0)
 	t.SaveTableInfo("info")
 	return ret
-
 }
 
-func (t *Table) LoadTableInfo() bool {
+func (t *Table) LoadTableInfo() error {
 	tablename := t.Name
 	filename := path.Join(FLAGS.DIR, tablename, "info.db")
-	if t.GrabInfoLock() {
+	if err := t.GrabInfoLock(); err == nil {
 		defer t.ReleaseInfoLock()
 	} else {
 		Debug("LOAD TABLE INFO LOCK TAKEN")
-		return false
+		return err
 	}
 
 	return t.LoadTableInfoFrom(filename)
 }
 
-func (t *Table) LoadTableInfoFrom(filename string) bool {
+func (t *Table) LoadTableInfoFrom(filename string) error {
 	savedTable := Table{Name: t.Name}
 	savedTable.initDataStructures()
@@ -152,7 +160,7 @@ func (t *Table) LoadTableInfoFrom(filename string) bool {
 	end := time.Now()
 	if err != nil {
 		Debug("TABLE INFO DECODE:", err)
-		return false
+		return err
 	}
 
 	if DEBUG_TIMING {
@@ -180,7 +188,7 @@ func (t *Table) LoadTableInfoFrom(filename string) bool {
 		t.populateStringIDLookup()
 	}
 
-	return true
+	return nil
 }
 
 // Remove our pointer to the blocklist so a GC is triggered and
@@ -234,24 +242,21 @@ func fileLooksLikeBlock(v os.FileInfo) bool {
 
 }
 
-func (t *Table) LoadBlockCache() {
-	if !t.GrabCacheLock() {
-		return
+func (t *Table) LoadBlockCache() error {
+	if err := t.GrabCacheLock(); err != nil {
+		return nil
 	}
 
 	defer t.ReleaseCacheLock()
 	files, err := ioutil.ReadDir(path.Join(FLAGS.DIR, t.Name, CACHE_DIR))
 
 	if err != nil {
-		return
+		return err
 	}
 
 	for _, blockFile := range files {
 		filename := path.Join(FLAGS.DIR, t.Name, CACHE_DIR, blockFile.Name())
 		blockCache := SavedBlockCache{}
-		if err != nil {
-			continue
-		}
 
 		err = decodeInto(filename, &blockCache)
 		if err != nil {
@@ -264,6 +269,7 @@ func (t *Table) LoadBlockCache() {
 	}
 
 	Debug("FILLED BLOCK CACHE WITH", len(t.BlockInfoCache), "ITEMS")
+	return nil
 }
 
 func (t *Table) ResetBlockCache() {
@@ -314,13 +320,13 @@ func (t *Table) WriteQueryCache(toCacheSpecs map[string]*QuerySpec) {
 
 }
 
-func (t *Table) WriteBlockCache() {
+func (t *Table) WriteBlockCache() error {
 	if len(t.NewBlockInfos) == 0 {
-		return
+		return nil
 	}
 
-	if !t.GrabCacheLock() {
-		return
+	if err := t.GrabCacheLock(); err != nil {
+		return err
 	}
 
 	defer t.ReleaseCacheLock()
@@ -358,32 +364,39 @@ func (t *Table) WriteBlockCache() {
 
 	t.NewBlockInfos = t.NewBlockInfos[:0]
 
+	return nil
 }
 
-func (t *Table) LoadRecords(loadSpec *LoadSpec) int {
+func (t *Table) LoadRecords(loadSpec *LoadSpec) (int, error) {
 	t.LoadBlockCache()
 
 	return t.LoadAndQueryRecords(loadSpec, nil)
 }
 
-func (t *Table) ChunkAndSave() {
+func (t *Table) ChunkAndSave() error {
 	if len(t.newRecords) >= CHUNK_SIZE {
 		os.MkdirAll(path.Join(FLAGS.DIR, t.Name), 0777)
 		name, err := t.getNewIngestBlockName()
 		if err == nil {
-			t.SaveRecordsToBlock(t.newRecords, name)
-			t.SaveTableInfo("info")
+			if err := t.SaveRecordsToBlock(t.newRecords, name); err != nil {
+				return err
+			}
+			if err := t.SaveTableInfo("info"); err != nil {
+				return err
+			}
 			t.newRecords = make(RecordList, 0)
 			t.ReleaseRecords()
 		} else {
-			Error("ERROR SAVING BLOCK", err)
+			return errors.Wrap(err, "error saving block")
 		}
 	}
+	return nil
 }
 
 func (t *Table) IsNotExist() bool {
+	// TODO: consider using os.Stat and os.IsNotExist
 	tableDir := path.Join(FLAGS.DIR, t.Name)
 	_, err := ioutil.ReadDir(tableDir)
 	return err != nil
diff --git a/src/sybil/table_load_spec.go b/src/sybil/table_load_spec.go
index 2a94a65e..573bfd1d 100644
--- a/src/sybil/table_load_spec.go
+++ b/src/sybil/table_load_spec.go
@@ -1,6 +1,10 @@
 package sybil
 
-import "sync"
+import (
+	"sync"
+
+	"github.com/pkg/errors"
+)
 
 type LoadSpec struct {
 	columns map[string]bool
@@ -34,14 +38,14 @@ func (t *Table) NewLoadSpec() LoadSpec {
 	return l
 }
 
-func (l *LoadSpec) assertColType(name string, colType int8) {
+func (l *LoadSpec) checkColType(name string, colType int8) error {
 	if l.table == nil {
-		return
+		return nil
 	}
 	nameID := l.table.getKeyID(name)
 	if l.table.KeyTypes[nameID] == 0 {
-		Error("Query Error! Column ", name, " does not exist")
+		return ErrMissingColumn{name}
 	}
 
 	if l.table.KeyTypes[nameID] != colType {
@@ -54,23 +58,32 @@ func (l *LoadSpec) assertColType(name string, colType int8) {
 		case SET_VAL:
 			colTypeName = "Set"
 		}
-
-		Error("Query Error! Key ", name, " exists, but is not of type ", colTypeName)
+		return ErrColumnTypeMismatch{name, colTypeName}
 	}
+	return nil
 }
 
-func (l *LoadSpec) Str(name string) {
-	l.assertColType(name, STR_VAL)
+func (l *LoadSpec) Str(name string) error {
+	if err := l.checkColType(name, STR_VAL); err != nil {
+		return errors.Wrap(err, "Str")
+	}
 	l.columns[name] = true
 	l.files["str_"+name+".db"] = true
+	return nil
 }
 
-func (l *LoadSpec) Int(name string) {
-	l.assertColType(name, INT_VAL)
+func (l *LoadSpec) Int(name string) error {
+	if err := l.checkColType(name, INT_VAL); err != nil {
+		return errors.Wrap(err, "Int")
+	}
 	l.columns[name] = true
 	l.files["int_"+name+".db"] = true
+	return nil
 }
 
-func (l *LoadSpec) Set(name string) {
-	l.assertColType(name, SET_VAL)
+func (l *LoadSpec) Set(name string) error {
+	if err := l.checkColType(name, SET_VAL); err != nil {
+		return errors.Wrap(err, "Set")
+	}
 	l.columns[name] = true
 	l.files["set_"+name+".db"] = true
+	return nil
 }
diff --git a/src/sybil/table_lock.go b/src/sybil/table_lock.go
index 034479e4..937ce612 100644
--- a/src/sybil/table_lock.go
+++ b/src/sybil/table_lock.go
@@ -1,12 +1,16 @@
 package sybil
 
-import "path"
-import "os"
-import "syscall"
-import "fmt"
-import "strconv"
-import "io/ioutil"
-import "time"
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path"
+	"strconv"
+	"syscall"
+	"time"
+
+	"github.com/pkg/errors"
+)
 
 var LOCK_US = time.Millisecond * 3
 var LOCK_TRIES = 50
@@ -14,9 +18,9 @@ var MAX_LOCK_BREAKS = 5
 
 // Every LockFile should have a recovery plan
 type RecoverableLock interface {
-	Grab() bool
-	Release() bool
-	Recover() bool
+	Grab() error
+	Release() error
+	Recover() error
 }
 
 var BREAK_MAP = make(map[string]int)
@@ -43,26 +47,29 @@ type DigestLock struct {
 	Lock
 }
 
-func RecoverLock(lock RecoverableLock) bool {
+func RecoverLock(lock RecoverableLock) error {
 	// TODO: log the auto recovery into a recovery file
 	return lock.Recover()
 }
 
-func (l *InfoLock) Recover() bool {
+func (l *InfoLock) Recover() error {
 	t := l.Lock.Table
 	Debug("INFO LOCK RECOVERY")
 	dirname := path.Join(FLAGS.DIR, t.Name)
 	backup := path.Join(dirname, "info.bak")
 	infodb := path.Join(dirname, "info.db")
-	if t.LoadTableInfoFrom(infodb) {
+	if err := t.LoadTableInfoFrom(infodb); err == nil {
 		Debug("LOADED REASONABLE TABLE INFO, DELETING LOCK")
 		l.ForceDeleteFile()
-		return true
+		return nil
 	}
 
-	if t.LoadTableInfoFrom(backup) {
+	err := t.LoadTableInfoFrom(backup)
+
+	if err == nil {
 		Debug("LOADED TABLE INFO FROM BACKUP, RESTORING BACKUP")
+		// TODO add error handling
 		os.Remove(infodb)
 		RenameAndMod(backup, infodb)
 		l.ForceDeleteFile()
@@ -72,10 +79,10 @@ func (l *InfoLock) Recover() bool {
 
 	Debug("CANT READ info.db OR RECOVER info.bak")
 	Debug("TRY DELETING LOCK BY HAND FOR", l.Name)
 
-	return false
+	return errors.Wrap(err, "loading from backup failed")
 }
 
-func (l *DigestLock) Recover() bool {
+func (l *DigestLock) Recover() error {
 	Debug("RECOVERING DIGEST LOCK", l.Name)
 	t := l.Table
 	ingestdir := path.Join(FLAGS.DIR, t.Name, INGEST_DIR)
@@ -83,17 +90,21 @@ func (l *DigestLock) Recover() bool {
 	os.MkdirAll(ingestdir, 0777)
 
 	// TODO: understand if any file in particular is messing things up...
 	pid := int64(os.Getpid())
+	// TODO add error handling to these methods
 	l.ForceMakeFile(pid)
 	t.RestoreUningestedFiles()
 	l.ForceDeleteFile()
 
-	return true
+	return nil
 }
 
-func (l *BlockLock) Recover() bool {
+func (l *BlockLock) Recover() error {
 	Debug("RECOVERING BLOCK LOCK", l.Name)
 	t := l.Table
-	tb := t.LoadBlockFromDir(l.Name, nil, true, nil)
+	tb, err := t.LoadBlockFromDir(l.Name, nil, true, nil)
+	if err != nil {
+		return err
+	}
 	if tb == nil || tb.Info == nil || tb.Info.NumRecords <= 0 {
 		Debug("BLOCK IS NO GOOD, TURNING IT INTO A BROKEN BLOCK")
 		// This block is not good! need to put it into remediation...
@@ -105,17 +116,17 @@ func (l *BlockLock) Recover() bool {
 		l.ForceDeleteFile()
 	}
 
-	return true
+	return nil
 }
 
-func (l *CacheLock) Recover() bool {
+func (l *CacheLock) Recover() error {
 	Debug("RECOVERING BLOCK LOCK", l.Name)
 	t := l.Table
 	files, err := ioutil.ReadDir(path.Join(FLAGS.DIR, t.Name, CACHE_DIR))
 	if err != nil {
 		l.ForceDeleteFile()
-		return true
+		return nil
 	}
 
 	for _, blockFile := range files {
@@ -136,18 +147,17 @@ func (l *CacheLock) Recover() bool {
 
 	}
 
+	// TODO: handle errors
 	l.ForceDeleteFile()
-
-	return true
-
+	return nil
 }
 
-func (l *Lock) Recover() bool {
+func (l *Lock) Recover() error {
 	Debug("UNIMPLEMENTED RECOVERY FOR LOCK", l.Table.Name, l.Name)
-	return false
+	return errors.New("unimplemented Recover()")
 }
 
-func (l *Lock) ForceDeleteFile() {
+func (l *Lock) ForceDeleteFile() error {
 	t := l.Table
 	digest := l.Name
 
@@ -156,7 +166,7 @@ func (l *Lock) ForceDeleteFile() {
 	lockfile := path.Join(FLAGS.DIR, t.Name, fmt.Sprintf("%s.lock", digest))
 
 	Debug("FORCE DELETING", lockfile)
-	os.RemoveAll(lockfile)
+	return os.RemoveAll(lockfile)
 }
 
 func (l *Lock) ForceMakeFile(pid int64) {
@@ -280,7 +290,7 @@ func checkPid(lockfile string, l *Lock) bool {
 
 	return cangrab
 }
 
-func (l *Lock) Grab() bool {
+func (l *Lock) Grab() error {
 	t := l.Table
 	digest := l.Name
 
@@ -294,7 +304,7 @@ func (l *Lock) Grab() bool {
 		if !checkPid(lockfile, l) {
 			if l.broken {
 				Debug("MARKING BROKEN LOCKFILE", lockfile)
-				return false
+				return ErrLockBroken
 			}
 
 			continue
 		}
@@ -318,16 +328,15 @@ func (l *Lock) Grab() bool {
 			}
 
 			Debug("LOCKING", lockfile)
-			return true
+			return nil
 		}
 
 		Debug("CANT CREATE LOCK FILE:", err)
 	}
 
 	Debug("LOCK FAIL!", lockfile)
-	return false
-
+	return ErrLockTimeout
 }
 
-func (l *Lock) Release() bool {
+func (l *Lock) Release() error {
 	t := l.Table
 	digest := l.Name
 
@@ -349,75 +358,74 @@ func (l *Lock) Release() bool {
 
 	}
 
-	return true
+	return nil
 }
 
-func (t *Table) GrabInfoLock() bool {
+func (t *Table) GrabInfoLock() error {
 	lock := Lock{Table: t, Name: "info"}
 	info := &InfoLock{lock}
 
 	ret := info.Grab()
-	if !ret && info.broken {
-		ret = RecoverLock(info)
+	if ret == ErrLockBroken {
+		return RecoverLock(info)
 	}
 
 	return ret
 }
 
-func (t *Table) ReleaseInfoLock() bool {
+func (t *Table) ReleaseInfoLock() error {
 	lock := Lock{Table: t, Name: "info"}
 	info := &InfoLock{lock}
 
 	ret := info.Release()
 	return ret
 }
 
-func (t *Table) GrabDigestLock() bool {
+func (t *Table) GrabDigestLock() error {
 	lock := Lock{Table: t, Name: STOMACHE_DIR}
 	info := &DigestLock{lock}
 
 	ret := info.Grab()
-	if !ret && info.broken {
-		ret = RecoverLock(info)
+	if ret == ErrLockBroken {
+		return RecoverLock(info)
 	}
 
 	return ret
 }
 
-func (t *Table) ReleaseDigestLock() bool {
+func (t *Table) ReleaseDigestLock() error {
 	lock := Lock{Table: t, Name: STOMACHE_DIR}
 	info := &DigestLock{lock}
 
 	ret := info.Release()
 	return ret
 }
 
-func (t *Table) GrabBlockLock(name string) bool {
+func (t *Table) GrabBlockLock(name string) error {
 	lock := Lock{Table: t, Name: name}
 	info := &BlockLock{lock}
 
 	ret := info.Grab()
 	// INFO RECOVER IS GOING TO HAVE TIMING ISSUES... WHEN MULTIPLE THREADS ARE
 	// AT PLAY
-	if !ret && info.broken {
-		ret = RecoverLock(info)
+	if ret == ErrLockBroken {
+		return RecoverLock(info)
 	}
 
 	return ret
-
 }
 
-func (t *Table) ReleaseBlockLock(name string) bool {
+func (t *Table) ReleaseBlockLock(name string) error {
 	lock := Lock{Table: t, Name: name}
 	info := &BlockLock{lock}
 
 	ret := info.Release()
 	return ret
 }
 
-func (t *Table) GrabCacheLock() bool {
+func (t *Table) GrabCacheLock() error {
 	lock := Lock{Table: t, Name: CACHE_DIR}
 	info := &CacheLock{lock}
 
 	ret := info.Grab()
-	if !ret && info.broken {
-		ret = RecoverLock(info)
+	if ret == ErrLockBroken {
+		return RecoverLock(info)
 	}
 
 	return ret
 }
 
-func (t *Table) ReleaseCacheLock() bool {
+func (t *Table) ReleaseCacheLock() error {
 	lock := Lock{Table: t, Name: CACHE_DIR}
 	info := &CacheLock{lock}
 
 	ret := info.Release()
diff --git a/src/sybil/table_lock_test.go b/src/sybil/table_lock_test.go
index da6b0852..12a3b410 100644
--- a/src/sybil/table_lock_test.go
+++ b/src/sybil/table_lock_test.go
@@ -13,9 +13,8 @@ func TestGrabInfoLock(t *testing.T) {
 
 	tbl.MakeDir()
 
-	grabbed := tbl.GrabInfoLock()
-	if !grabbed {
-		t.Errorf("COULD NOT GRAB INFO LOCK, tried %v", tableName)
+	if err := tbl.GrabInfoLock(); err != nil {
+		t.Errorf("COULD NOT GRAB INFO LOCK, tried %v - %v", tableName, err)
 	}
 }
 
@@ -32,7 +31,7 @@ func TestRecoverInfoLock(t *testing.T) {
 
 	tbl.MakeDir()
 
-	grabbed := tbl.GrabInfoLock()
+	grabbed := tbl.GrabInfoLock() == nil
 	if grabbed {
 		t.Error("GRABBED INFO LOCK WHEN IT ALREADY EXISTS AND BELONGS ELSEWHERE")
 	}
@@ -49,9 +48,8 @@ func TestGrabDigestLock(t *testing.T) {
 	tbl := GetTable(tableName)
 	tbl.MakeDir()
 
-	grabbed := tbl.GrabDigestLock()
-	if !grabbed {
-		t.Error("COULD NOT GRAB DIGEST LOCK")
+	if err := tbl.GrabDigestLock(); err != nil {
+		t.Error("COULD NOT GRAB DIGEST LOCK", err)
 	}
 }
 
@@ -64,15 +62,15 @@ func TestRecoverDigestLock(t *testing.T) {
 	tbl.MakeDir()
 
 	// first grab digest lock
-	if grabbed := tbl.GrabDigestLock(); !grabbed {
-		t.Error("COULD NOT GRAB DIGEST LOCK")
+	if err := tbl.GrabDigestLock(); err != nil {
+		t.Error("COULD NOT GRAB DIGEST LOCK", err)
 	}
 
 	lock := Lock{Table: tbl, Name: STOMACHE_DIR}
 	lock.ForceMakeFile(int64(0))
 
 	tbl.MakeDir()
-	grabbed := tbl.GrabDigestLock()
+	grabbed := tbl.GrabDigestLock() == nil
 	if grabbed {
 		t.Error("COULD GRAB DIGEST LOCK WHEN IT ARLEADY EXISTS")
 	}
diff --git a/src/sybil/table_query.go b/src/sybil/table_query.go
index 281594ac..061c2297 100644
--- a/src/sybil/table_query.go
+++ b/src/sybil/table_query.go
@@ -9,9 +9,11 @@ import (
 	"runtime/debug"
 	"sync"
 	"time"
+
+	"github.com/pkg/errors"
 )
 
-func (t *Table) LoadAndQueryRecords(loadSpec *LoadSpec, querySpec *QuerySpec) int {
+func (t *Table) LoadAndQueryRecords(loadSpec *LoadSpec, querySpec *QuerySpec) (int, error) {
 	waystart := time.Now()
 	Debug("LOADING", FLAGS.DIR, t.Name)
 
@@ -33,10 +35,9 @@ func (t *Table) LoadAndQueryRecords(loadSpec *LoadSpec, querySpec *QuerySpec) in
 	blockSpecs := make(map[string]*QuerySpec)
 	toCacheSpecs := make(map[string]*QuerySpec)
 
-	loadedInfo := t.LoadTableInfo()
-	if !loadedInfo {
+	if err := t.LoadTableInfo(); err != nil {
 		if t.HasFlagFile() {
-			return 0
+			return 0, errors.Wrap(err, "issue loading existing table")
 		}
 	}
 
@@ -65,7 +66,11 @@ func (t *Table) LoadAndQueryRecords(loadSpec *LoadSpec, querySpec *QuerySpec) in
 	allResults := make([]*QuerySpec, 0)
 
 	brokenMu := sync.Mutex{}
-	brokenBlocks := make([]string, 0)
+	type brokenBlock struct {
+		name string
+		err  error
+	}
+	brokenBlocks := make([]brokenBlock, 0)
 
 	var memstats runtime.MemStats
 	var maxAlloc = uint64(0)
@@ -111,10 +116,14 @@ func (t *Table) LoadAndQueryRecords(loadSpec *LoadSpec, querySpec *QuerySpec) in
 			if querySpec != nil {
 				replacements = querySpec.StrReplace
 			}
-			block = t.LoadBlockFromDir(filename, loadSpec, loadAll, replacements)
-			if block == nil {
+			var err error
+			block, err = t.LoadBlockFromDir(filename, loadSpec, loadAll, replacements)
+			if block == nil || err != nil {
 				brokenMu.Lock()
-				brokenBlocks = append(brokenBlocks, filename)
+				brokenBlocks = append(brokenBlocks, brokenBlock{
+					name: filename,
+					err:  err,
+				})
 				brokenMu.Unlock()
 				return
 			}
@@ -352,6 +361,5 @@ func (t *Table) LoadAndQueryRecords(loadSpec *LoadSpec, querySpec *QuerySpec) in
 
 	t.WriteBlockCache()
 
-	return count
-
+	return count, nil
 }
diff --git a/src/sybil/table_trim.go b/src/sybil/table_trim.go
index 6f616cd5..a1bba655 100644
--- a/src/sybil/table_trim.go
+++ b/src/sybil/table_trim.go
@@ -9,7 +9,7 @@ type TrimSpec struct {
 
 // List all the blocks that should be trimmed to keep the table within it's
 // memory limits
-func (t *Table) TrimTable(trimSpec *TrimSpec) []*TableBlock {
+func (t *Table) TrimTable(trimSpec *TrimSpec) ([]*TableBlock, error) {
 	t.LoadRecords(nil)
 
 	Debug("TRIMMING TABLE, MEMORY LIMIT", trimSpec.MBLimit, "TIME LIMIT", trimSpec.DeleteBefore)
@@ -21,7 +21,10 @@ func (t *Table) TrimTable(trimSpec *TrimSpec) []*TableBlock {
 			continue
 		}
 
-		block := t.LoadBlockFromDir(b.Name, nil, false, nil)
+		block, err := t.LoadBlockFromDir(b.Name, nil, false, nil)
+		if err != nil {
+			return nil, err
+		}
 		if block != nil {
 			if block.Info.IntInfoMap[FLAGS.TIME_COL] != nil {
 				block.table = t
@@ -54,5 +57,5 @@ func (t *Table) TrimTable(trimSpec *TrimSpec) []*TableBlock {
 		size += b.Size
 	}
 
-	return toTrim
+	return toTrim, nil
 }
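
With this change, lock handling and load-spec validation report failures as values rather than halting inside Error(): Grab() yields ErrLockBroken or ErrLockTimeout, and checkColType yields ErrMissingColumn or ErrColumnTypeMismatch. A minimal caller-side sketch of that contract, assuming these identifiers are exported from the sybil package with the shapes used in the hunks above (the table name "example" is hypothetical):

package main

import (
	"fmt"

	"github.com/logv/sybil/src/sybil"
	"github.com/pkg/errors"
)

// inspectErr branches on the error values introduced by this change.
// errors.Cause unwraps the errors.Wrap annotations added at call sites,
// so sentinel comparisons and type matches still work on wrapped errors.
func inspectErr(err error) {
	if err == nil {
		return
	}
	switch cause := errors.Cause(err); cause {
	case sybil.ErrLockBroken:
		fmt.Println("lock broken; the Grab helpers already attempted recovery")
	case sybil.ErrLockTimeout:
		fmt.Println("gave up waiting for the lock file")
	default:
		// The column errors are struct types, so match them by type.
		switch cause.(type) {
		case sybil.ErrMissingColumn:
			fmt.Println("column does not exist:", cause)
		case sybil.ErrColumnTypeMismatch:
			fmt.Println("column exists with a different type:", cause)
		default:
			fmt.Println("unhandled:", err)
		}
	}
}

func main() {
	t := sybil.GetTable("example") // hypothetical table
	inspectErr(t.GrabInfoLock())
	inspectErr(t.ReleaseInfoLock())
}

Since each errors.Wrap call prefixes its own message, a failure deep in the digest path surfaces with its context attached (for example, "grabbing digest lock failed: " followed by the underlying lock error), while errors.Cause still recovers the original value for comparisons like the ones above.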