From 640b2d704114df4f6d7019d10716e26844715254 Mon Sep 17 00:00:00 2001 From: Muhamad Azamy Date: Fri, 20 Oct 2023 16:56:55 +0200 Subject: [PATCH] WIP: rework update process --- cmds/identityd/main.go | 186 ++------------ cmds/identityd/monitor.go | 15 +- pkg/environment/config.go | 2 +- pkg/environment/environment.go | 18 +- pkg/monitord/host.go | 15 -- pkg/upgrade/boot.go | 116 ++++----- pkg/upgrade/hub.go | 283 ---------------------- pkg/upgrade/hub/hub.go | 347 ++++++++++++++++++++++++++ pkg/upgrade/hub/hub_test.go | 39 +++ pkg/upgrade/hub_test.go | 56 ----- pkg/upgrade/upgrade.go | 429 ++++++++++++--------------------- pkg/upgrade/upgrade_test.go | 5 +- pkg/upgrade/watcher.go | 65 ----- 13 files changed, 629 insertions(+), 947 deletions(-) delete mode 100644 pkg/upgrade/hub.go create mode 100644 pkg/upgrade/hub/hub.go create mode 100644 pkg/upgrade/hub/hub_test.go delete mode 100644 pkg/upgrade/hub_test.go delete mode 100644 pkg/upgrade/watcher.go diff --git a/cmds/identityd/main.go b/cmds/identityd/main.go index 1ab345f68..282d8dce8 100644 --- a/cmds/identityd/main.go +++ b/cmds/identityd/main.go @@ -3,7 +3,6 @@ package main import ( "context" "fmt" - "math/rand" "os" "os/signal" "syscall" @@ -14,7 +13,6 @@ import ( "github.com/threefoldtech/zos/pkg/stubs" "github.com/threefoldtech/zos/pkg/upgrade" - "github.com/cenkalti/backoff/v3" "github.com/threefoldtech/zos/pkg" "github.com/threefoldtech/zos/pkg/environment" "github.com/threefoldtech/zos/pkg/identity" @@ -118,10 +116,6 @@ func main() { log.Fatal().Err(err).Str("root", root).Msg("failed to create root directory") } - var boot upgrade.Boot - - bootMethod := boot.DetectBootMethod() - // 2. Register the node to BCDB // at this point we are running latest version idMgr, err := getIdentityMgr(root, debug) @@ -129,7 +123,12 @@ func main() { log.Fatal().Err(err).Msg("failed to create identity manager") } - monitor := newVersionMonitor(10 * time.Second) + upgrader, err := upgrade.NewUpgrader(root, upgrade.NoSelfUpgrade(debug)) + if err != nil { + log.Fatal().Err(err).Msg("failed to initialize upgrader") + } + + monitor := newVersionMonitor(10*time.Second, upgrader.Version()) // 3. start zbus server to serve identity interface server, err := zbus.NewRedisServer(module, broker, 1) if err != nil { @@ -143,18 +142,6 @@ func main() { // register the cancel function with defer if the process stops because of a update defer cancel() - upgrader, err := upgrade.NewUpgrader(root, upgrade.NoSelfUpgrade(debug)) - if err != nil { - log.Fatal().Err(err).Msg("failed to initialize upgrader") - } - - err = installBinaries(&boot, upgrader) - if err == upgrade.ErrRestartNeeded { - return - } else if err != nil { - log.Error().Err(err).Msg("failed to install binaries") - } - go func() { if err := server.Run(ctx); err != nil && err != context.Canceled { log.Error().Err(err).Msg("unexpected error") @@ -165,39 +152,25 @@ func main() { log.Info().Msg("received a termination signal") }) - if bootMethod != upgrade.BootMethodFList { - log.Info().Msg("node is not booted from an flist. upgrade is not supported") - <-ctx.Done() + err = upgrader.Run(ctx) + if errors.Is(err, upgrade.ErrRestartNeeded) { return + } else if err != nil { + log.Error().Err(err).Msg("error during update") + os.Exit(1) } - //NOTE: code after this commit will only - //run if the system is booted from an flist - - // 4. Start watcher for new version - log.Info().Msg("start upgrade daemon") - - // TODO: do we need to update farmer on node upgrade? - upgradeLoop(ctx, &boot, upgrader, debug, monitor, func(string) error { return nil }) } // allow reinstall if receive signal USR1 // only allowed in debug mode -func debugReinstall(boot *upgrade.Boot, up *upgrade.Upgrader) { - c := make(chan os.Signal) +func debugReinstall(up *upgrade.Upgrader) { + c := make(chan os.Signal, 10) signal.Notify(c, syscall.SIGUSR1) go func() { for range c { - current, err := boot.Current() - if err != nil { - log.Error().Err(err).Msg("couldn't get current flist info") - continue - } - - if err := Safe(func() error { - return up.Upgrade(current, current) - }); err != nil { + if err := up.Reinstall(); err != nil { log.Error().Err(err).Msg("reinstall failed") } else { log.Info().Msg("reinstall completed successfully") @@ -206,137 +179,6 @@ func debugReinstall(boot *upgrade.Boot, up *upgrade.Upgrader) { }() } -func installBinaries(boot *upgrade.Boot, upgrader *upgrade.Upgrader) error { - bins, _ := boot.CurrentBins() - env, _ := environment.Get() - - repoWatcher := upgrade.FListRepo{ - Repo: env.BinRepo, - Current: bins, - } - - current, toAdd, toDel, err := repoWatcher.Diff() - if err != nil { - return errors.Wrap(err, "failed to list latest binaries to install") - } - - if len(toAdd) == 0 && len(toDel) == 0 { - return nil - } - - for _, pkg := range toDel { - log.Debug().Str("package", pkg.Target).Msg("uninstall package") - if err := upgrader.UninstallBinary(pkg); err != nil { - log.Error().Err(err).Str("flist", pkg.Fqdn()).Msg("failed to uninstall flist") - } - } - - for _, pkg := range toAdd { - log.Debug().Str("package", pkg.Target).Msg("install package") - if err := upgrader.InstallBinary(pkg); err != nil { - log.Error().Err(err).Str("package", pkg.Fqdn()).Msg("failed to install package") - } - } - - if err := boot.SetBins(current); err != nil { - return errors.Wrap(err, "failed to commit pkg status") - } - - return upgrade.ErrRestartNeeded -} - -func upgradeLoop( - ctx context.Context, - boot *upgrade.Boot, - upgrader *upgrade.Upgrader, - debug bool, - monitor *monitorStream, - register func(string) error) { - - if debug { - debugReinstall(boot, upgrader) - } - - monitor.C <- boot.MustVersion() - var hub upgrade.HubClient - - flist := boot.Name() - //current := boot.MustVersion() - for { - // delay in case of error - select { - case <-time.After(5 * time.Second): - case <-ctx.Done(): - return - } - - current, err := boot.Current() - if err != nil { - log.Fatal().Err(err).Msg("cannot get current boot flist information") - } - - latest, err := hub.Info(flist) - if err != nil { - log.Error().Err(err).Msg("failed to get flist info") - continue - } - log.Info(). - Str("current", current.TryVersion().String()). - Str("latest", current.TryVersion().String()). - Msg("checking if update is required") - - if !latest.TryVersion().GT(current.TryVersion()) { - // We wanted to use the node id to actually calculate the delay to wait but it's not - // possible to get the numeric node id from the identityd - next := time.Duration(60+rand.Intn(60)) * time.Minute - log.Info().Dur("wait", next).Msg("checking for update after milliseconds") - select { - case <-time.After(next): - case <-ctx.Done(): - return - } - - continue - } - - // next check for update - exp := backoff.NewExponentialBackOff() - exp.MaxInterval = 3 * time.Minute - exp.MaxElapsedTime = 60 * time.Minute - err = backoff.Retry(func() error { - log.Debug().Str("version", latest.TryVersion().String()).Msg("trying to update") - - err := Safe(func() error { - return upgrader.Upgrade(current, latest) - }) - - if err == upgrade.ErrRestartNeeded { - return backoff.Permanent(err) - } else if err != nil { - log.Error().Err(err).Msg("update failure. retrying") - } - - return nil - }, exp) - - if err == upgrade.ErrRestartNeeded { - log.Info().Msg("restarting upgraded") - return - } else if err != nil { - //TODO: crash or continue! - log.Error().Err(err).Msg("upgrade failed") - continue - } else { - log.Info().Str("version", latest.TryVersion().String()).Msg("update completed") - } - if err := boot.Set(latest); err != nil { - log.Fatal().Err(err).Msg("failed to update boot information") - } - - monitor.C <- latest.TryVersion() - } -} - func getIdentityMgr(root string, debug bool) (pkg.IdentityManager, error) { manager, err := identity.NewManager(root, debug) if err != nil { diff --git a/cmds/identityd/monitor.go b/cmds/identityd/monitor.go index 2a9c81521..8c68a6d26 100644 --- a/cmds/identityd/monitor.go +++ b/cmds/identityd/monitor.go @@ -9,7 +9,6 @@ import ( ) type monitorStream struct { - C chan semver.Version duration time.Duration version semver.Version } @@ -17,9 +16,8 @@ type monitorStream struct { var _ pkg.VersionMonitor = (*monitorStream)(nil) // newVersionMonitor creates a new instance of version monitor -func newVersionMonitor(d time.Duration) *monitorStream { +func newVersionMonitor(d time.Duration, version semver.Version) *monitorStream { return &monitorStream{ - C: make(chan semver.Version), duration: d, } } @@ -32,14 +30,11 @@ func (m *monitorStream) Version(ctx context.Context) <-chan semver.Version { ch := make(chan semver.Version) go func() { defer close(ch) + ch <- m.version + for { - select { - case <-time.After(m.duration): - ch <- m.version - case v := <-m.C: - m.version = v - ch <- m.version - } + ch <- m.version + time.Sleep(m.duration) } }() diff --git a/pkg/environment/config.go b/pkg/environment/config.go index 6a3ba9c97..0482f9bcb 100644 --- a/pkg/environment/config.go +++ b/pkg/environment/config.go @@ -60,7 +60,7 @@ func uniqueStr(slice []string) []string { return list } -func getConfig(run RunningMode, url string) (ext Config, err error) { +func getConfig(run RunMode, url string) (ext Config, err error) { if !strings.HasSuffix(url, "/") { url += "/" } diff --git a/pkg/environment/environment.go b/pkg/environment/environment.go index f18bf2e0f..0d7a8b665 100644 --- a/pkg/environment/environment.go +++ b/pkg/environment/environment.go @@ -19,7 +19,7 @@ const ( // Environment holds information about running environment of a node // it defines the different constant based on the running mode (dev, test, prod) type Environment struct { - RunningMode RunningMode + RunningMode RunMode FlistURL string BinRepo string @@ -39,10 +39,10 @@ type Environment struct { ExtendedConfigURL string } -// RunningMode type -type RunningMode string +// RunMode type +type RunMode string -func (r RunningMode) String() string { +func (r RunMode) String() string { switch r { case RunningDev: return "development" @@ -60,13 +60,13 @@ func (r RunningMode) String() string { // Possible running mode of a node const ( // RunningDev mode - RunningDev RunningMode = "dev" + RunningDev RunMode = "dev" // RunningQA mode - RunningQA RunningMode = "qa" + RunningQA RunMode = "qa" // RunningTest mode - RunningTest RunningMode = "test" + RunningTest RunMode = "test" // RunningMain mode - RunningMain RunningMode = "prod" + RunningMain RunMode = "prod" // Orphanage is the default farmid where nodes are registered // if no farmid were specified on the kernel command line @@ -185,7 +185,7 @@ func getEnvironmentFromParams(params kernel.Params) (Environment, error) { runmode = string(RunningMain) } - switch RunningMode(runmode) { + switch RunMode(runmode) { case RunningDev: env = envDev case RunningQA: diff --git a/pkg/monitord/host.go b/pkg/monitord/host.go index 23496e472..0775e3ae8 100644 --- a/pkg/monitord/host.go +++ b/pkg/monitord/host.go @@ -6,11 +6,8 @@ import ( "os" "time" - "github.com/fsnotify/fsnotify" - "github.com/pkg/errors" "github.com/rs/zerolog/log" "github.com/threefoldtech/zos/pkg" - "github.com/threefoldtech/zos/pkg/upgrade" ) // HostMonitor monitor host information @@ -23,18 +20,6 @@ func NewHostMonitor(duration time.Duration) (pkg.HostMonitor, error) { if duration == 0 { duration = 2 * time.Second } - watcher, err := fsnotify.NewWatcher() - if err != nil { - return nil, errors.Wrap(err, "failed to initialize fs watcher") - } - - // the file can not exist if the system was booted from overlay - if _, err := os.Stat(upgrade.FlistInfoFile); err == nil { - if err := watcher.Add(upgrade.FlistInfoFile); err != nil { - return nil, errors.Wrapf(err, "failed to watch '%s'", upgrade.FlistInfoFile) - } - } - return &hostMonitor{ duration: duration, }, nil diff --git a/pkg/upgrade/boot.go b/pkg/upgrade/boot.go index a67142a9d..3983847a2 100644 --- a/pkg/upgrade/boot.go +++ b/pkg/upgrade/boot.go @@ -8,33 +8,39 @@ import ( "strings" "github.com/blang/semver" - "github.com/pkg/errors" "github.com/rs/zerolog/log" + "github.com/threefoldtech/zos/pkg/environment" + "github.com/threefoldtech/zos/pkg/upgrade/hub" ) const ( // those values must match the values // in the bootstrap process. (bootstrap.sh) - // FlistNameFile file contains boot flist repo/name - FlistNameFile = "/tmp/flist.name" - // FlistInfoFile file container boot flist infor - FlistInfoFile = "/tmp/flist.info" - // BinariesFile file contains binaries database - BinariesFile = "/tmp/bins.info" + TagFile = "/tmp/tag.info" +) + +var ( + ErrNotBootstrapped = fmt.Errorf("node was not bootstrapped") ) // BootMethod defines the node boot method type BootMethod string const ( - // BootMethodFList booted from an flist - BootMethodFList BootMethod = "flist" + // BootMethodBootstrap booted with bootstrapping. + // this means that all packages are installed from flist + BootMethodBootstrap BootMethod = "bootstrap" // BootMethodOther booted with other methods + // only happen during development (VM + overlay) BootMethodOther BootMethod = "other" ) +func (b BootMethod) IsBootstrapped() bool { + return b == BootMethodBootstrap +} + // Boot struct type Boot struct{} @@ -42,7 +48,7 @@ type Boot struct{} // of the node func (b Boot) DetectBootMethod() BootMethod { log.Info().Msg("detecting boot method") - _, err := os.Stat(FlistNameFile) + _, err := os.Stat(TagFile) if err != nil { log.Warn().Err(err).Msg("no flist file found") return BootMethodOther @@ -53,85 +59,69 @@ func (b Boot) DetectBootMethod() BootMethod { // to do a call to the hub, hence the detection // can be affected by the network state, or the // hub state. So we return immediately - return BootMethodFList + return BootMethodBootstrap } // Name always return name of the boot flist. If name file // does not exist, an empty string is returned -func (b *Boot) Name() string { - data, _ := os.ReadFile(FlistNameFile) - return strings.TrimSpace(string(data)) +func (b *Boot) RunMode() environment.RunMode { + env := environment.MustGet() + return env.RunningMode } -// CurrentBins returns a list of current binaries installed -func (b *Boot) CurrentBins() (map[string]FListInfo, error) { - f, err := os.Open(BinariesFile) +func (b *Boot) Version() semver.Version { + current, err := b.Current() if err != nil { - return nil, err + return semver.MustParse("0.0.0") } - dec := json.NewDecoder(f) - - var result map[string]FListInfo - err = dec.Decode(&result) - return result, err -} -// SetBins sets the current list of binaries in boot files -func (b *Boot) SetBins(current map[string]FListInfo) error { - f, err := os.Create(BinariesFile) - if err != nil { - return err + last := filepath.Base(current.Target) + var ver semver.Version + if strings.HasPrefix(last, "v") { + ver, err = semver.Parse(strings.TrimPrefix(last, "v")) + if err == nil { + return ver + } } - enc := json.NewEncoder(f) - return enc.Encode(current) + + ver.Pre = append(ver.Pre, semver.PRVersion{VersionStr: last}) + + return ver } // Current returns current flist information -func (b *Boot) Current() (FullFListInfo, error) { - name := b.Name() - if len(name) == 0 { - return FullFListInfo{}, fmt.Errorf("flist name is not known") +func (b *Boot) Current() (flist hub.TagLink, err error) { + f, err := os.Open(TagFile) + if os.IsNotExist(err) { + return flist, ErrNotBootstrapped + } else if err != nil { + return flist, err } - return loadInfo(name, FlistInfoFile) -} + defer f.Close() -// Set updates the stored flist info -func (b *Boot) Set(c FullFListInfo) error { - return c.Commit(FlistInfoFile) -} + dec := json.NewDecoder(f) -// Version always returns curent version of flist -func (b *Boot) Version() (semver.Version, error) { - info, err := b.Current() - if err != nil { - return semver.Version{}, errors.Wrap(err, "failed to load flist info") + if err = dec.Decode(&flist); err != nil { + return flist, err } - return info.Version() -} - -// MustVersion must returns the current version or panic -func (b *Boot) MustVersion() semver.Version { - ver, err := b.Version() - if err != nil { - panic(err) + if flist.Type != hub.TypeTagLink { + return flist, fmt.Errorf("expected current installation info to be a taglink, found '%s'", flist.Type) } - return ver + return } -// loadInfo get boot info set by bootstrap process -func loadInfo(fqn string, path string) (info FullFListInfo, err error) { - info.Repository = filepath.Dir(fqn) - f, err := os.Open(path) +// Set updates the stored flist info +func (b *Boot) Set(c hub.TagLink) error { + f, err := os.Create(TagFile) if err != nil { - return info, err + return err } defer f.Close() - dec := json.NewDecoder(f) - err = dec.Decode(&info) - return info, err + enc := json.NewEncoder(f) + return enc.Encode(c) } diff --git a/pkg/upgrade/hub.go b/pkg/upgrade/hub.go deleted file mode 100644 index 5f5a03063..000000000 --- a/pkg/upgrade/hub.go +++ /dev/null @@ -1,283 +0,0 @@ -package upgrade - -import ( - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" - "os" - "path" - "path/filepath" - "strings" - - "github.com/blang/semver" - "github.com/pkg/errors" - "github.com/rs/zerolog/log" - "github.com/threefoldtech/0-fs/meta" -) - -const ( - // hubBaseURL base hub url - hubBaseURL = "https://hub.grid.tf/" - - // hubStorage default hub db - hubStorage = "zdb://hub.grid.tf:9900" -) - -// HubClient API for f-list -type HubClient struct{} - -// MountURL returns the full url of given flist. -func (h *HubClient) MountURL(flist string) string { - url, err := url.Parse(hubBaseURL) - if err != nil { - panic("invalid base url") - } - url.Path = flist - return url.String() -} - -// StorageURL return hub storage url -func (h *HubClient) StorageURL() string { - return hubStorage -} - -// Info gets flist info from hub -func (h *HubClient) Info(flist string) (info FullFListInfo, err error) { - u, err := url.Parse(hubBaseURL) - if err != nil { - panic("invalid base url") - } - - info.Repository = filepath.Dir(flist) - - u.Path = filepath.Join("api", "flist", flist, "light") - response, err := http.Get(u.String()) - if err != nil { - return info, err - } - - defer response.Body.Close() - defer func() { - _, _ = io.ReadAll(response.Body) - }() - - if response.StatusCode != http.StatusOK { - return info, fmt.Errorf("failed to get flist (%s) info: %s", flist, response.Status) - } - defer response.Body.Close() - - dec := json.NewDecoder(response.Body) - - err = dec.Decode(&info) - return info, err -} - -// List list repo flists -func (h *HubClient) List(repo string) ([]FListInfo, error) { - u, err := url.Parse(hubBaseURL) - if err != nil { - panic("invalid base url") - } - - u.Path = filepath.Join("api", "flist", repo) - response, err := http.Get(u.String()) - if err != nil { - return nil, err - } - - defer response.Body.Close() - defer func() { - _, _ = io.ReadAll(response.Body) - }() - - if response.StatusCode != http.StatusOK { - return nil, fmt.Errorf("failed to get repository listing: %s", response.Status) - } - - dec := json.NewDecoder(response.Body) - - var result []FListInfo - err = dec.Decode(&result) - - for i := range result { - result[i].Repository = repo - } - - return result, err -} - -// Download downloads an flist (fqn: repo/name) to cache and return the full -// path to the extraced meta data directory. the returned path is in format -// {cache}/{hash}/ -func (h *HubClient) Download(cache, flist string) (string, error) { - log.Info().Str("cache", cache).Str("flist", flist).Msg("attempt downloading flist") - var info FullFListInfo - for { - var err error - info, err = h.Info(flist) - if err != nil { - return "", err - } - if info.Type == "symlink" { - flist = filepath.Join(filepath.Dir(flist), info.Target) - } else if info.Type == "regular" { - break - } else { - return "", fmt.Errorf("unknown flist type: %s", info.Type) - } - } - - if info.Hash == "" { - return "", fmt.Errorf("invalid flist info returned") - } - - const ( - dbFileName = "flistdb.sqlite3" - ) - - // check if already downloaded - downloaded := filepath.Join(cache, info.Hash) - extracted := fmt.Sprintf("%s.d", downloaded) - - if stat, err := os.Stat(filepath.Join(extracted, dbFileName)); err == nil { - // already exists. - if stat.Size() > 0 { - log.Info().Str("flist", flist).Msg("already cached") - return extracted, nil - } - } - - u, err := url.Parse(hubBaseURL) - if err != nil { - panic("invalid base url") - } - - u.Path = flist - log.Debug().Str("url", u.String()).Msg("downloading flist") - response, err := http.Get(u.String()) - if err != nil { - return "", errors.Wrap(err, "failed to download flist") - } - - defer response.Body.Close() - if response.StatusCode != http.StatusOK { - return "", fmt.Errorf("failed to download flist: %s", response.Status) - } - - return extracted, meta.Unpack(response.Body, extracted) -} - -// FListInfo is information of flist as returned by repo list operation -type FListInfo struct { - Name string `json:"name"` - Target string `json:"target"` - Type string `json:"type"` - Updated uint64 `json:"updated"` - Repository string `json:"-"` -} - -// FullFListInfo reflects node boot information (flist + version) -type FullFListInfo struct { - FListInfo - Hash string `json:"md5"` - Size uint64 `json:"size"` -} - -// FileInfo is the file of an flist -type FileInfo struct { - Path string `json:"path"` - Size uint64 `json:"size"` -} - -// Commit write version to version file -func (b *FullFListInfo) Commit(path string) error { - f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0400) - if err != nil { - return err - } - defer f.Close() - enc := json.NewEncoder(f) - - return enc.Encode(b) -} - -// Fqdn return the full flist name -func (b *FListInfo) Fqdn() string { - return path.Join(b.Repository, b.Name) -} - -func (b *FListInfo) extractVersion(name string) (ver semver.Version, err error) { - // name is suppose to be as follows - // :.flist - parts := strings.Split(name, ":") - last := parts[len(parts)-1] - last = strings.TrimPrefix(last, "v") - last = strings.TrimSuffix(last, ".flist") - return semver.Parse(last) -} - -// Version returns the version of the flist -func (b *FListInfo) Version() (semver.Version, error) { - // computing the version is tricky because it's part of the name - // of the flist (or the target) depends on the type of the flist - - if b.Type == "symlink" { - return b.extractVersion(b.Target) - } - - return b.extractVersion(b.Name) -} - -func (b *FListInfo) TryVersion() semver.Version { - ver, _ := b.Version() - return ver -} - -// Absolute returns the actual flist name -func (b *FListInfo) Absolute() string { - name := b.Name - if b.Type == "symlink" { - name = b.Target - } - - return filepath.Join(b.Repository, name) -} - -// Files gets the list of the files of an flist -func (b *FListInfo) Files() ([]FileInfo, error) { - flist := b.Absolute() - if len(flist) == 0 { - return nil, fmt.Errorf("invalid flist info") - } - - var content struct { - Content []FileInfo `json:"content"` - } - - u, err := url.Parse(hubBaseURL) - if err != nil { - panic("invalid base url") - } - - u.Path = filepath.Join("api", "flist", flist) - response, err := http.Get(u.String()) - if err != nil { - return nil, err - } - - defer response.Body.Close() - defer func() { - _, _ = io.ReadAll(response.Body) - }() - - if response.StatusCode != http.StatusOK { - return nil, fmt.Errorf("failed to get flist info: %s", response.Status) - } - - dec := json.NewDecoder(response.Body) - - err = dec.Decode(&content) - return content.Content, err -} diff --git a/pkg/upgrade/hub/hub.go b/pkg/upgrade/hub/hub.go new file mode 100644 index 000000000..36eda0e62 --- /dev/null +++ b/pkg/upgrade/hub/hub.go @@ -0,0 +1,347 @@ +package hub + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path/filepath" + "strings" + + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + "github.com/threefoldtech/0-fs/meta" +) + +const ( + // hubBaseURL base hub url + hubBaseURL = "https://hub.grid.tf/" + + // hubStorage default hub db + hubStorage = "zdb://hub.grid.tf:9900" +) + +type FListType string + +const ( + TypeRegular FListType = "regular" + TypeSymlink FListType = "symlink" + TypeTag FListType = "tag" + TypeTagLink FListType = "taglink" +) + +type FListFilter interface { + matches(*FList) bool +} + +type matchName struct { + name string +} + +func (m matchName) matches(f *FList) bool { + return f.Name == m.name +} + +type matchType struct { + typ FListType +} + +func (m matchType) matches(f *FList) bool { + return f.Type == m.typ +} + +func MatchName(name string) FListFilter { + return matchName{name} +} + +func MatchType(typ FListType) FListFilter { + return matchType{typ} +} + +// HubClient API for f-list +type HubClient struct{} + +// MountURL returns the full url of given flist. +func (h *HubClient) MountURL(flist string) string { + url, err := url.Parse(hubBaseURL) + if err != nil { + panic("invalid base url") + } + url.Path = flist + return url.String() +} + +// StorageURL return hub storage url +func (h *HubClient) StorageURL() string { + return hubStorage +} + +// Info gets flist info from hub +func (h *HubClient) Info(repo, name string) (info FList, err error) { + u, err := url.Parse(hubBaseURL) + if err != nil { + panic("invalid base url") + } + + u.Path = filepath.Join("api", "flist", repo, name, "light") + response, err := http.Get(u.String()) + if err != nil { + return info, err + } + + defer response.Body.Close() + defer func() { + _, _ = io.ReadAll(response.Body) + }() + + if response.StatusCode != http.StatusOK { + return info, fmt.Errorf("failed to get flist (%s/%s) info: %s", repo, name, response.Status) + } + defer response.Body.Close() + + dec := json.NewDecoder(response.Body) + + err = dec.Decode(&info) + return info, err +} + +func (h *HubClient) Find(repo string, filter ...FListFilter) ([]FList, error) { + result, err := h.List(repo) + if err != nil { + return nil, err + } + + filtered := result[:0] +next: + for _, flist := range result { + for _, m := range filter { + if !m.matches(&flist) { + continue next + } + } + filtered = append(filtered, flist) + } + + return filtered, nil +} + +// List list repo flists +func (h *HubClient) List(repo string) ([]FList, error) { + u, err := url.Parse(hubBaseURL) + if err != nil { + panic("invalid base url") + } + + u.Path = filepath.Join("api", "flist", repo) + response, err := http.Get(u.String()) + if err != nil { + return nil, err + } + + defer response.Body.Close() + defer func() { + _, _ = io.ReadAll(response.Body) + }() + + if response.StatusCode != http.StatusOK { + return nil, fmt.Errorf("failed to get repository listing: %s", response.Status) + } + + dec := json.NewDecoder(response.Body) + + var result []FList + err = dec.Decode(&result) + + return result, err +} + +func (h *HubClient) ListTag(repo, tag string) ([]Symlink, error) { + u, err := url.Parse(hubBaseURL) + if err != nil { + panic("invalid base url") + } + + u.Path = filepath.Join("api", "flist", repo, "tags", tag) + response, err := http.Get(u.String()) + if err != nil { + return nil, err + } + + defer response.Body.Close() + defer func() { + _, _ = io.ReadAll(response.Body) + }() + + if response.StatusCode != http.StatusOK { + return nil, fmt.Errorf("failed to get repository listing: %s", response.Status) + } + + dec := json.NewDecoder(response.Body) + + var result []Symlink + err = dec.Decode(&result) + + return result, err +} + +// Download downloads an flist to cache and return the full +// path to the extraced meta data directory. the returned path is in format +// {cache}/{hash}/ +func (h *HubClient) Download(cache, repo, name string) (string, error) { + log := log.With().Str("cache", cache).Str("repo", repo).Str("name", name).Logger() + + log.Info().Msg("attempt downloading flist") + + info, err := h.Info(repo, name) + if err != nil { + return "", err + } + + if info.Hash == "" { + return "", fmt.Errorf("invalid flist info returned") + } + + const ( + dbFileName = "flistdb.sqlite3" + ) + + // check if already downloaded + downloaded := filepath.Join(cache, info.Hash) + extracted := fmt.Sprintf("%s.d", downloaded) + + if stat, err := os.Stat(filepath.Join(extracted, dbFileName)); err == nil { + // already exists. + if stat.Size() > 0 { + log.Info().Msg("already cached") + return extracted, nil + } + } + + u, err := url.Parse(hubBaseURL) + if err != nil { + panic("invalid base url") + } + + u.Path = filepath.Join(repo, name) + log.Debug().Str("url", u.String()).Msg("downloading flist") + response, err := http.Get(u.String()) + if err != nil { + return "", errors.Wrap(err, "failed to download flist") + } + + defer response.Body.Close() + if response.StatusCode != http.StatusOK { + return "", fmt.Errorf("failed to download flist: %s", response.Status) + } + + return extracted, meta.Unpack(response.Body, extracted) +} + +// FList is information of flist as returned by repo list operation +type FList struct { + Name string `json:"name"` + Target string `json:"target"` + Type FListType `json:"type"` + Updated uint64 `json:"updated"` + Hash string `json:"md5"` +} + +// FileInfo is the file of an flist +type FileInfo struct { + Path string `json:"path"` + Size uint64 `json:"size"` +} + +type Regular struct { + FList +} + +func NewRegular(flist FList) Regular { + if flist.Type != TypeRegular { + panic("invalid flist type") + } + + return Regular{flist} +} + +// Files gets the list of the files of an flist +func (b *Regular) Files(repo string) ([]FileInfo, error) { + var content struct { + Content []FileInfo `json:"content"` + } + + u, err := url.Parse(hubBaseURL) + if err != nil { + panic("invalid base url") + } + + u.Path = filepath.Join("api", "flist", repo, b.Name) + response, err := http.Get(u.String()) + if err != nil { + return nil, err + } + + defer response.Body.Close() + defer func() { + _, _ = io.ReadAll(response.Body) + }() + + if response.StatusCode != http.StatusOK { + return nil, fmt.Errorf("failed to get flist info: %s", response.Status) + } + + dec := json.NewDecoder(response.Body) + + err = dec.Decode(&content) + return content.Content, err +} + +// TagLink is an flist of type taglink +type TagLink struct { + FList +} + +func NewTagLink(flist FList) TagLink { + if flist.Type != TypeTagLink { + panic("invalid flist type") + } + + return TagLink{flist} +} + +func (t *TagLink) Destination() (repo string, tag string, err error) { + parts := strings.Split(t.Target, "/") + if len(parts) != 3 || parts[1] != "tags" { + return repo, tag, fmt.Errorf("invalid target '%s' for taglink", t.Target) + } + + return parts[0], parts[2], nil +} + +type Symlink struct { + FList +} + +func NewSymlink(flist FList) Symlink { + if flist.Type != TypeSymlink { + panic("invalid flist type") + } + + return Symlink{flist} +} + +// Destination gets destination flist for a symlink flist +// source repo is the repo where this symlink lives, since the symlink +// can either be an absolute or relative target. +func (t *Symlink) Destination(source string) (repo string, name string, err error) { + parts := strings.Split(t.Target, "/") + if len(parts) == 1 { + return source, t.Target, nil + } else if len(parts) == 2 { + return parts[0], parts[1], nil + } else { + return repo, name, fmt.Errorf("invalid target '%s' for symlink", t.Target) + } +} diff --git a/pkg/upgrade/hub/hub_test.go b/pkg/upgrade/hub/hub_test.go new file mode 100644 index 000000000..1217b97d2 --- /dev/null +++ b/pkg/upgrade/hub/hub_test.go @@ -0,0 +1,39 @@ +package hub + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestHub(t *testing.T) { + const tag = "3b51aa5" + const repo = "tf-autobuilder" + + var hub HubClient + results, err := hub.ListTag(repo, tag) + require.NoError(t, err) + require.Len(t, results, 20) + + var zos Symlink + for _, f := range results { + if f.Name == "zos.flist" { + zos = f + break + } + } + + // symlink starts from repo, and can point us to + // another repo destRepo + destRepo, name, err := zos.Destination(repo) + require.NoError(t, err) + + info, err := hub.Info(destRepo, name) + require.NoError(t, err) + + regular := NewRegular(info) + + files, err := regular.Files(repo) + require.NoError(t, err) + require.NotEmpty(t, files) +} diff --git a/pkg/upgrade/hub_test.go b/pkg/upgrade/hub_test.go deleted file mode 100644 index bfe47d2a0..000000000 --- a/pkg/upgrade/hub_test.go +++ /dev/null @@ -1,56 +0,0 @@ -package upgrade - -import ( - "encoding/json" - "testing" - - "github.com/blang/semver" - "github.com/stretchr/testify/require" -) - -func TestInfo(t *testing.T) { - tt := []struct { - name string - input string - version semver.Version - }{ - { - name: "0.2.0", - input: `{"size": 0, "name": "zos:development:latest.flist", "target": "zos:development:0.2.0.flist", "type": "symlink", "updated": 1569924782, "md5": "9798ef9b930b49ab18c45953cf1d2369"}`, - version: semver.MustParse("0.2.0"), - }, - { - name: "v0.2.0", - input: `{"size": 0, "name": "zos:development:latest.flist", "target": "zos:development:v0.2.0.flist", "type": "symlink", "updated": 1569924782, "md5": "9798ef9b930b49ab18c45953cf1d2369"}`, - version: semver.MustParse("0.2.0"), - }, - } - - for _, tc := range tt { - t.Run(tc.name, func(t *testing.T) { - - var info FullFListInfo - err := json.Unmarshal([]byte(tc.input), &info) - require.NoError(t, err) - - ver, err := info.Version() - require.NoError(t, err) - - require.Equal(t, tc.version, ver) - }) - } -} - -func TestInfoGet(t *testing.T) { - const flist = "thabet/redis.flist" - - var hub HubClient - info, err := hub.Info(flist) - require.NoError(t, err) - require.Equal(t, flist, info.Absolute()) - - files, err := info.Files() - require.NoError(t, err) - - require.NotEmpty(t, files) -} diff --git a/pkg/upgrade/upgrade.go b/pkg/upgrade/upgrade.go index 131492dc1..64adb735d 100644 --- a/pkg/upgrade/upgrade.go +++ b/pkg/upgrade/upgrade.go @@ -1,20 +1,22 @@ package upgrade import ( + "context" "fmt" "io" + "math/rand" "os" "path/filepath" "strings" - "syscall" "time" + "github.com/blang/semver" "github.com/pkg/errors" - "github.com/shirou/gopsutil/process" "github.com/threefoldtech/0-fs/meta" "github.com/threefoldtech/0-fs/rofs" "github.com/threefoldtech/0-fs/storage" + "github.com/threefoldtech/zos/pkg/upgrade/hub" "github.com/threefoldtech/zos/pkg/zinit" "github.com/rs/zerolog/log" @@ -35,15 +37,22 @@ var ( const ( defaultHubStorage = "zdb://hub.grid.tf:9900" defaultZinitSocket = "/var/run/zinit.sock" + + checkForUpdateEvery = 60 * time.Minute + checkJitter = 10 // minutes + + ZosRepo = "tf-zos" + ZosPackage = "zos.flist" ) // Upgrader is the component that is responsible // to keep 0-OS up to date type Upgrader struct { + boot Boot zinit *zinit.Client root string noSelfUpdate bool - hub HubClient + hub hub.HubClient storage storage.Storage } @@ -114,34 +123,159 @@ func NewUpgrader(root string, opts ...UpgraderOption) (*Upgrader, error) { return u, nil } -func (u *Upgrader) flistCache() string { - return filepath.Join(u.root, "cache", "flist") +func (u *Upgrader) Run(ctx context.Context) error { + method := u.boot.DetectBootMethod() + if method == BootMethodOther { + // we need to do an update one time to fetch all + // binaries required by the system except for the zos + // binaries + // then we should block forever + remote, err := u.remote() + if err != nil { + return errors.Wrap(err, "failed to get remote tag") + } + + if err := u.updateTo(remote); err != nil { + return errors.Wrap(err, "failed to run update") + } + + <-ctx.Done() + return nil + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(u.nextUpdate()): + } + + if err := u.update(); err != nil { + return err + } + } } -func (u *Upgrader) fileCache() string { - return filepath.Join(u.root, "cache", "files") +func (u *Upgrader) Reinstall() error { + current, err := u.boot.Current() + if err != nil { + return err + } + + return u.updateTo(current) +} + +func (u *Upgrader) Version() semver.Version { + return u.boot.Version() } -// Upgrade is the method that does a full upgrade flow -// first check if a new version is available -// if yes, applies the upgrade -// on a successfully update, upgrade WILL NOT RETURN -// instead the upgraded daemon will be completely stopped -func (u *Upgrader) Upgrade(from, to FullFListInfo) error { - if err := u.applyUpgrade(from, to); err != nil { +func (u *Upgrader) nextUpdate() time.Duration { + jitter := rand.Intn(checkJitter) + return checkForUpdateEvery + (time.Duration(jitter) * time.Minute) +} + +func (u *Upgrader) remote() (remote hub.TagLink, err error) { + mode := u.boot.RunMode() + // find all taglinks that matches the same run mode (ex: development) + matches, err := u.hub.Find( + ZosRepo, + hub.MatchName(mode.String()), + hub.MatchType(hub.TypeTagLink), + ) + + if err != nil { + return remote, err + } + + if len(matches) != 1 { + return remote, fmt.Errorf("can't find taglink that matches '%s'", mode.String()) + } + + return hub.NewTagLink(matches[0]), nil +} + +func (u *Upgrader) update() error { + // here we need to do a normal full update cycle + current, err := u.boot.Current() + if err != nil { + return errors.Wrap(err, "failed to get info about current version") + } + + remote, err := u.remote() + if err != nil { + return errors.Wrap(err, "failed to get remote tag") + } + + // obviously a remote tag need to match the current tag. + // if the remote is different, we actually run the update and exit. + if remote.Target == current.Target { + // nothing to do! + return nil + } + + if err := u.updateTo(remote); err != nil { + return errors.Wrapf(err, "failed to update to new tag '%s'", remote.Target) + } + + if err := u.boot.Set(remote); err != nil { return err } - return u.cleanup() + return ErrRestartNeeded } -func (u *Upgrader) cleanup() error { - return u.zinit.Destroy(30*time.Second, deleteMe...) +func (u *Upgrader) updateTo(link hub.TagLink) error { + repo, tag, err := link.Destination() + if err != nil { + return errors.Wrap(err, "failed to get destination tag") + } + + packages, err := u.hub.ListTag(repo, tag) + if err != nil { + return errors.Wrapf(err, "failed to list tag '%s' packages", tag) + } + + var later [][]string + for _, pkg := range packages { + pkgRepo, name, err := pkg.Destination(repo) + if name == ZosPackage { + // this is the last to do + later = append(later, []string{pkgRepo, name}) + continue + } + + if err != nil { + return errors.Wrapf(err, "failed to find target for package '%s'", pkg.Target) + } + + // install package + if err := u.InstallBinary(pkgRepo, name); err != nil { + return errors.Wrapf(err, "failed to install package %s/%s", pkgRepo, name) + } + } + + // probably check flag for zos installation + for _, pkg := range later { + repo, name := pkg[0], pkg[1] + if err := u.InstallBinary(repo, name); err != nil { + return errors.Wrapf(err, "failed to install package %s/%s", repo, name) + } + } + + return nil +} + +func (u *Upgrader) flistCache() string { + return filepath.Join(u.root, "cache", "flist") +} + +func (u *Upgrader) fileCache() string { + return filepath.Join(u.root, "cache", "files") } // getFlist accepts fqdn of flist as `/.flist` -func (u *Upgrader) getFlist(flist string) (meta.Walker, error) { - db, err := u.hub.Download(u.flistCache(), flist) +func (u *Upgrader) getFlist(repo, name string) (meta.Walker, error) { + db, err := u.hub.Download(u.flistCache(), repo, name) if err != nil { return nil, errors.Wrap(err, "failed to download flist") } @@ -161,17 +295,17 @@ func (u *Upgrader) getFlist(flist string) (meta.Walker, error) { } // InstallBinary from a single flist. -func (u *Upgrader) InstallBinary(flist FListInfo) error { - log.Info().Str("flist", flist.Fqdn()).Msg("start applying upgrade") +func (u *Upgrader) InstallBinary(repo, name string) error { + log.Info().Str("repo", repo).Str("name", name).Msg("start installing package") - store, err := u.getFlist(flist.Fqdn()) + store, err := u.getFlist(repo, name) if err != nil { - return errors.Wrapf(err, "failed to process flist: %s", flist.Fqdn()) + return errors.Wrapf(err, "failed to process flist: %s/%s", repo, name) } defer store.Close() if err := u.copyRecursive(store, "/"); err != nil { - return errors.Wrapf(err, "failed to install flist: %s", flist.Fqdn()) + return errors.Wrapf(err, "failed to install flist: %s/%s", repo, name) } services, err := u.servicesFromStore(store) @@ -235,253 +369,6 @@ func (u *Upgrader) ensureRestarted(service ...string) error { return nil } -// UninstallBinary from a single flist. -func (u *Upgrader) UninstallBinary(flist FListInfo) error { - // we never delete those files from the system - // since there is no `package manager` for zos (yet) - // deleting the files from the flist blindly can cause - // issues if some deleted files were shared between - // multiple packages. - return u.uninstall(flist, false) -} - -// upgradeSelf will try to check if the flist has -// an upgraded binary with different revision. If yes -// it will copy the new binary and ask for a restart. -// next time this method is called, it will match the flist -// revision, and hence will continue updating all the other daemons -func (u *Upgrader) upgradeSelf(store meta.Walker) error { - log.Debug().Msg("starting self upgrade") - if u.noSelfUpdate { - log.Debug().Msg("skipping self upgrade") - return nil - } - - current := currentRevision() - log.Debug().Str("revision", current).Msg("current revision") - - bin := currentBinPath() - info, exists := store.Get(bin) - - if !exists { - // no bin for update daemon in the flist. - log.Debug().Str("bin", bin).Msg("binary file does not exist") - return nil - } - - newBin := fmt.Sprintf("%s.new", bin) - if err := u.copyFile(newBin, info); err != nil { - return err - } - - // the timeout here is set to 1 min because - // this most probably will trigger a download - // of the binary over 0-fs, hence we need to - // give it enough time to download the file - // on slow network (i am looking at u Egypt) - new, err := revisionOf(newBin, 2*time.Minute) - if err != nil { - return errors.Wrap(err, "failed to check new update daemon revision number") - } - - log.Debug().Str("revision", new).Msg("new revision") - - // nothing to be done here. - if current == new { - log.Debug().Msg("skipping self upgrade because same revision") - return nil - } - - if err := os.Rename(newBin, bin); err != nil { - return errors.Wrap(err, "failed to update self binary") - } - - log.Debug().Msg("revisions are differnet, self upgrade is needed") - return ErrRestartNeeded -} - -// uninstall a package, if del is true, also delete the files in that package -// from the system filesystem -func (u *Upgrader) uninstall(flist FListInfo, del bool) error { - files, err := flist.Files() - if err != nil { - return errors.Wrapf(err, "failed to get list of current installed files for '%s'", flist.Absolute()) - } - - //stop all services names - var names []string - for _, file := range files { - dir := filepath.Dir(file.Path) - if dir != "/etc/zinit" { - continue - } - - name := filepath.Base(file.Path) - if !strings.HasSuffix(name, ".yaml") { - continue - } - - name = strings.TrimSuffix(name, ".yaml") - // skip self and redis - if isIn(name, protected) { - continue - } - - names = append(names, name) - } - - log.Debug().Strs("services", names).Msg("stopping services") - - if err = u.zinit.StopMultiple(20*time.Second, names...); err != nil { - return errors.Wrapf(err, "failed to stop services") - } - - // we do a forget so any changes of the zinit config - // themselves get reflected once monitored again - for _, name := range names { - if err := u.zinit.Forget(name); err != nil { - log.Error().Err(err).Str("service", name).Msg("error on zinit forget") - } - } - - if !del { - return nil - } - - // now delete ALL files, ignore what doesn't delete - for _, file := range files { - stat, err := os.Stat(file.Path) - if err != nil { - log.Debug().Err(err).Str("file", file.Path).Msg("failed to check file") - continue - } - - if stat.IsDir() { - continue - } - - log.Debug().Str("file", file.Path).Msg("deleting file") - - if file.Path == flistIdentityPath { - log.Debug().Str("file", file.Path).Msg("skip deleting file") - continue - } - - if err := os.Remove(file.Path); err != nil { - log.Error().Err(err).Str("file", file.Path).Msg("failed to remove file") - } - } - - return nil -} - -func (u *Upgrader) applyUpgrade(from, to FullFListInfo) error { - log.Info().Str("flist", to.Fqdn()).Str("version", to.TryVersion().String()).Msg("start applying upgrade") - - store, err := u.getFlist(to.Fqdn()) - if err != nil { - return errors.Wrap(err, "failed to get flist store") - } - - defer store.Close() - - if err := u.upgradeSelf(store); err != nil { - return err - } - - if err := u.uninstall(from.FListInfo, true); err != nil { - log.Error().Err(err).Msg("failed to uninstall current flist. Upgraded anyway") - } - - log.Info().Msg("clean up complete, copying new files") - services, err := u.servicesFromStore(store) - if err != nil { - return err - } - if err := u.copyRecursive(store, "/", flistIdentityPath); err != nil { - return err - } - - log.Debug().Msg("copying files complete") - log.Debug().Msg("make sure all services are monitored") - if err := u.zinit.StartMultiple(20*time.Minute, services...); err != nil { - return errors.Wrap(err, "failed to monitor services") - } - - return u.removeDuplicates(services) -} - -func (u *Upgrader) removeDuplicates(services []string) error { - /* - this is a hack to clean up duplicate services duo to a bug in zinit version 0.2.6 (and earlier) - the issue has been fixed in next release of zinit (starting 0.2.7) - - the issue is duo to this bug we can end up with multiple running instances of the same service. - - to make sure this is cleaned up as expected we need to do the following: - - get the pid of each service from zinit status. Those are the processes that are known and - managed by zinit - - for each service, find all processes that is running using the same command - - for the found service(s), kill the ones that their PIDs does not match the one managed by zinit - - We will always assume that the binary name is the same as the service name - */ - - names := make(map[string]struct{}) - for _, name := range services { - names[name] = struct{}{} - } - - all := make(map[string][]int) - processes, err := process.Processes() - if err != nil { - return err - } - - for _, ps := range processes { - cmdline, err := ps.CmdlineSlice() - if err != nil { - log.Debug().Err(err).Msgf("failed to parse process '%d' commandline", ps.Pid) - continue - } - if len(cmdline) == 0 { - continue - } - name := cmdline[0] - if _, ok := names[name]; !ok { - continue - } - - all[name] = append(all[name], int(ps.Pid)) - } - - cl := zinit.Default() - - for _, service := range services { - ps, err := cl.Status(service) - if errors.Is(err, zinit.ErrUnknownService) { - continue - } else if err != nil { - log.Error().Err(err).Msg("failed to get service status") - } - - pids, ok := all[service] - if !ok { - // probably a short lived service, or has exited - continue - } - - for _, pid := range pids { - if ps.Pid != pid { - log.Warn().Str("service", service).Int("pid", pid).Msg("found unmanaged process for service. terminating.") - _ = syscall.Kill(pid, syscall.SIGKILL) - } - } - } - - return nil -} - func (u *Upgrader) copyRecursive(store meta.Walker, destination string, skip ...string) error { return store.Walk("", func(path string, info meta.Meta) error { diff --git a/pkg/upgrade/upgrade_test.go b/pkg/upgrade/upgrade_test.go index 1b0ddb34f..7d2ce9f14 100644 --- a/pkg/upgrade/upgrade_test.go +++ b/pkg/upgrade/upgrade_test.go @@ -19,9 +19,10 @@ func TestUpgraderDownload(t *testing.T) { err := Storage(defaultHubStorage)(up) require.NoError(err) - const flist = "azmy.3bot/test-flist.flist" + const repo = "azmy.3bot" + const flist = "test-flist.flist" - store, err := up.getFlist(flist) + store, err := up.getFlist(repo, flist) require.NoError(err) tmp := t.TempDir() diff --git a/pkg/upgrade/watcher.go b/pkg/upgrade/watcher.go deleted file mode 100644 index 9ca76fbb9..000000000 --- a/pkg/upgrade/watcher.go +++ /dev/null @@ -1,65 +0,0 @@ -package upgrade - -import ( - "github.com/pkg/errors" -) - -/** -Watcher unifies the upgrade pipeline by making sure we can watch -different types of required updates, while always make sure only -one kind of update is applied at a time. - -This to prevent updates to step on each other toes. -*/ - -// FListRepo type -type FListRepo struct { - Repo string - Current map[string]FListInfo - - client HubClient -} - -func (w *FListRepo) list() (map[string]FListInfo, error) { - packages, err := w.client.List(w.Repo) - if err != nil { - return nil, err - } - - result := make(map[string]FListInfo) - for _, pkg := range packages { - //flist := FListInfo{pkg} - result[pkg.Fqdn()] = pkg - } - - return result, nil -} - -func (w *FListRepo) diff(packages map[string]FListInfo) (toAdd, toDel []FListInfo) { - for name, pkg := range packages { - current, ok := w.Current[name] - if !ok || pkg.Updated != current.Updated { - toAdd = append(toAdd, pkg) - } - } - - for name, pkg := range w.Current { - _, ok := packages[name] - if !ok { - toDel = append(toDel, pkg) - } - } - - return -} - -// Diff return the remote changes related to current list of packages -func (w *FListRepo) Diff() (all map[string]FListInfo, toAdd, toDell []FListInfo, err error) { - all, err = w.list() - if err != nil { - return all, nil, nil, errors.Wrap(err, "failed to get available packages") - } - - toAdd, toDell = w.diff(all) - return -}