Skip to content

Commit

Permalink
WIP: rework update process
Browse files Browse the repository at this point in the history
  • Loading branch information
muhamadazmy committed Oct 20, 2023
1 parent ada147e commit 640b2d7
Show file tree
Hide file tree
Showing 13 changed files with 629 additions and 947 deletions.
186 changes: 14 additions & 172 deletions cmds/identityd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"fmt"
"math/rand"
"os"
"os/signal"
"syscall"
Expand All @@ -14,7 +13,6 @@ import (
"github.com/threefoldtech/zos/pkg/stubs"
"github.com/threefoldtech/zos/pkg/upgrade"

"github.com/cenkalti/backoff/v3"
"github.com/threefoldtech/zos/pkg"
"github.com/threefoldtech/zos/pkg/environment"
"github.com/threefoldtech/zos/pkg/identity"
Expand Down Expand Up @@ -118,18 +116,19 @@ func main() {
log.Fatal().Err(err).Str("root", root).Msg("failed to create root directory")
}

var boot upgrade.Boot

bootMethod := boot.DetectBootMethod()

// 2. Register the node to BCDB
// at this point we are running latest version
idMgr, err := getIdentityMgr(root, debug)
if err != nil {
log.Fatal().Err(err).Msg("failed to create identity manager")
}

monitor := newVersionMonitor(10 * time.Second)
upgrader, err := upgrade.NewUpgrader(root, upgrade.NoSelfUpgrade(debug))
if err != nil {
log.Fatal().Err(err).Msg("failed to initialize upgrader")
}

monitor := newVersionMonitor(10*time.Second, upgrader.Version())
// 3. start zbus server to serve identity interface
server, err := zbus.NewRedisServer(module, broker, 1)
if err != nil {
Expand All @@ -143,18 +142,6 @@ func main() {
// register the cancel function with defer in case the process stops because of an update
defer cancel()

upgrader, err := upgrade.NewUpgrader(root, upgrade.NoSelfUpgrade(debug))
if err != nil {
log.Fatal().Err(err).Msg("failed to initialize upgrader")
}

err = installBinaries(&boot, upgrader)
if err == upgrade.ErrRestartNeeded {
return
} else if err != nil {
log.Error().Err(err).Msg("failed to install binaries")
}

go func() {
if err := server.Run(ctx); err != nil && err != context.Canceled {
log.Error().Err(err).Msg("unexpected error")
Expand All @@ -165,39 +152,25 @@ func main() {
log.Info().Msg("received a termination signal")
})

if bootMethod != upgrade.BootMethodFList {
log.Info().Msg("node is not booted from an flist. upgrade is not supported")
<-ctx.Done()
err = upgrader.Run(ctx)
if errors.Is(err, upgrade.ErrRestartNeeded) {
return
} else if err != nil {
log.Error().Err(err).Msg("error during update")
os.Exit(1)
}

//NOTE: code after this point will only
//run if the system is booted from an flist

// 4. Start watcher for new version
log.Info().Msg("start upgrade daemon")

// TODO: do we need to update farmer on node upgrade?
upgradeLoop(ctx, &boot, upgrader, debug, monitor, func(string) error { return nil })
}

// allow reinstall when signal USR1 is received
// only allowed in debug mode
func debugReinstall(boot *upgrade.Boot, up *upgrade.Upgrader) {
c := make(chan os.Signal)
func debugReinstall(up *upgrade.Upgrader) {
c := make(chan os.Signal, 10)
signal.Notify(c, syscall.SIGUSR1)

go func() {
for range c {
current, err := boot.Current()
if err != nil {
log.Error().Err(err).Msg("couldn't get current flist info")
continue
}

if err := Safe(func() error {
return up.Upgrade(current, current)
}); err != nil {
if err := up.Reinstall(); err != nil {
log.Error().Err(err).Msg("reinstall failed")
} else {
log.Info().Msg("reinstall completed successfully")
Expand All @@ -206,137 +179,6 @@ func debugReinstall(boot *upgrade.Boot, up *upgrade.Upgrader) {
}()
}

// installBinaries brings the installed binary packages in line with the
// binary repository configured for the current environment.
//
// It diffs the packages recorded by boot against the repository, uninstalls
// the obsolete ones, installs the missing ones and then records the new
// package list through boot.SetBins. Failures on individual packages are only
// logged; they do not abort the run.
//
// It returns nil when there is nothing to add or remove, a wrapped error when
// the diff or the final commit fails, and upgrade.ErrRestartNeeded once
// changes have been applied and committed.
func installBinaries(boot *upgrade.Boot, upgrader *upgrade.Upgrader) error {
	// NOTE(review): both errors below are discarded, so the diff runs with
	// whatever values come back — presumably a deliberate best-effort; confirm.
	installed, _ := boot.CurrentBins()
	env, _ := environment.Get()

	repo := upgrade.FListRepo{
		Repo:    env.BinRepo,
		Current: installed,
	}

	wanted, additions, removals, err := repo.Diff()
	if err != nil {
		return errors.Wrap(err, "failed to list latest binaries to install")
	}

	// nothing changed: no restart required
	if len(additions)+len(removals) == 0 {
		return nil
	}

	for _, old := range removals {
		log.Debug().Str("package", old.Target).Msg("uninstall package")
		err := upgrader.UninstallBinary(old)
		if err != nil {
			log.Error().Err(err).Str("flist", old.Fqdn()).Msg("failed to uninstall flist")
		}
	}

	for _, fresh := range additions {
		log.Debug().Str("package", fresh.Target).Msg("install package")
		err := upgrader.InstallBinary(fresh)
		if err != nil {
			log.Error().Err(err).Str("package", fresh.Fqdn()).Msg("failed to install package")
		}
	}

	err = boot.SetBins(wanted)
	if err != nil {
		return errors.Wrap(err, "failed to commit pkg status")
	}

	return upgrade.ErrRestartNeeded
}

// upgradeLoop periodically compares the booted flist against the version
// reported by the hub and applies the upgrade through upgrader.
//
// When debug is set, the SIGUSR1 reinstall handler is installed first. The
// current boot version is sent on monitor.C before the loop starts and the
// new version is sent again after every successful upgrade.
//
// The loop ends when ctx is cancelled or when the upgrader reports
// upgrade.ErrRestartNeeded. Failing to read or to persist the boot
// information is fatal.
//
// NOTE(review): register is accepted but never called here.
func upgradeLoop(
	ctx context.Context,
	boot *upgrade.Boot,
	upgrader *upgrade.Upgrader,
	debug bool,
	monitor *monitorStream,
	register func(string) error) {

	if debug {
		debugReinstall(boot, upgrader)
	}

	monitor.C <- boot.MustVersion()
	var hub upgrade.HubClient

	flist := boot.Name()
	for {
		// delay in case of error
		select {
		case <-time.After(5 * time.Second):
		case <-ctx.Done():
			return
		}

		current, err := boot.Current()
		if err != nil {
			log.Fatal().Err(err).Msg("cannot get current boot flist information")
		}

		latest, err := hub.Info(flist)
		if err != nil {
			log.Error().Err(err).Msg("failed to get flist info")
			continue
		}

		// log the version actually offered by the hub, not the current one twice
		log.Info().
			Str("current", current.TryVersion().String()).
			Str("latest", latest.TryVersion().String()).
			Msg("checking if update is required")

		if !latest.TryVersion().GT(current.TryVersion()) {
			// We wanted to use the node id to actually calculate the delay to wait but it's not
			// possible to get the numeric node id from the identityd
			next := time.Duration(60+rand.Intn(60)) * time.Minute
			log.Info().Dur("wait", next).Msg("checking for update after milliseconds")
			select {
			case <-time.After(next):
			case <-ctx.Done():
				return
			}

			continue
		}

		// a newer version is available: try to apply it, backing off between attempts
		exp := backoff.NewExponentialBackOff()
		exp.MaxInterval = 3 * time.Minute
		exp.MaxElapsedTime = 60 * time.Minute
		// bind the backoff to ctx so a termination signal interrupts the retries
		err = backoff.Retry(func() error {
			log.Debug().Str("version", latest.TryVersion().String()).Msg("trying to update")

			err := Safe(func() error {
				return upgrader.Upgrade(current, latest)
			})

			if err == upgrade.ErrRestartNeeded {
				// not a failure: stop retrying and let the caller restart
				return backoff.Permanent(err)
			}

			if err != nil {
				log.Error().Err(err).Msg("update failure. retrying")
			}

			// the error must be returned (nil on success), otherwise backoff
			// treats the attempt as successful and never retries
			return err
		}, backoff.WithContext(exp, ctx))

		if err == upgrade.ErrRestartNeeded {
			log.Info().Msg("restarting upgraded")
			return
		}

		if err != nil {
			//TODO: crash or continue!
			log.Error().Err(err).Msg("upgrade failed")
			continue
		}

		log.Info().Str("version", latest.TryVersion().String()).Msg("update completed")

		if err := boot.Set(latest); err != nil {
			log.Fatal().Err(err).Msg("failed to update boot information")
		}

		monitor.C <- latest.TryVersion()
	}
}

func getIdentityMgr(root string, debug bool) (pkg.IdentityManager, error) {
manager, err := identity.NewManager(root, debug)
if err != nil {
Expand Down
15 changes: 5 additions & 10 deletions cmds/identityd/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,15 @@ import (
)

type monitorStream struct {
C chan semver.Version
duration time.Duration
version semver.Version
}

var _ pkg.VersionMonitor = (*monitorStream)(nil)

// newVersionMonitor creates a new instance of version monitor
func newVersionMonitor(d time.Duration) *monitorStream {
func newVersionMonitor(d time.Duration, version semver.Version) *monitorStream {
return &monitorStream{
C: make(chan semver.Version),
duration: d,
}
}
Expand All @@ -32,14 +30,11 @@ func (m *monitorStream) Version(ctx context.Context) <-chan semver.Version {
ch := make(chan semver.Version)
go func() {
defer close(ch)
ch <- m.version

for {
select {
case <-time.After(m.duration):
ch <- m.version
case v := <-m.C:
m.version = v
ch <- m.version
}
ch <- m.version
time.Sleep(m.duration)
}
}()

Expand Down
2 changes: 1 addition & 1 deletion pkg/environment/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func uniqueStr(slice []string) []string {
return list
}

func getConfig(run RunningMode, url string) (ext Config, err error) {
func getConfig(run RunMode, url string) (ext Config, err error) {
if !strings.HasSuffix(url, "/") {
url += "/"
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/environment/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
// Environment holds information about the running environment of a node.
// It defines the different constants based on the running mode (dev, qa, test, prod)
type Environment struct {
RunningMode RunningMode
RunningMode RunMode

FlistURL string
BinRepo string
Expand All @@ -39,10 +39,10 @@ type Environment struct {
ExtendedConfigURL string
}

// RunningMode type
type RunningMode string
// RunMode type
type RunMode string

func (r RunningMode) String() string {
func (r RunMode) String() string {
switch r {
case RunningDev:
return "development"
Expand All @@ -60,13 +60,13 @@ func (r RunningMode) String() string {
// Possible running mode of a node
const (
// RunningDev mode
RunningDev RunningMode = "dev"
RunningDev RunMode = "dev"
// RunningQA mode
RunningQA RunningMode = "qa"
RunningQA RunMode = "qa"
// RunningTest mode
RunningTest RunningMode = "test"
RunningTest RunMode = "test"
// RunningMain mode
RunningMain RunningMode = "prod"
RunningMain RunMode = "prod"

// Orphanage is the default farmid where nodes are registered
// if no farmid was specified on the kernel command line
Expand Down Expand Up @@ -185,7 +185,7 @@ func getEnvironmentFromParams(params kernel.Params) (Environment, error) {
runmode = string(RunningMain)
}

switch RunningMode(runmode) {
switch RunMode(runmode) {
case RunningDev:
env = envDev
case RunningQA:
Expand Down
15 changes: 0 additions & 15 deletions pkg/monitord/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@ import (
"os"
"time"

"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/threefoldtech/zos/pkg"
"github.com/threefoldtech/zos/pkg/upgrade"
)

// HostMonitor monitor host information
Expand All @@ -23,18 +20,6 @@ func NewHostMonitor(duration time.Duration) (pkg.HostMonitor, error) {
if duration == 0 {
duration = 2 * time.Second
}
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, errors.Wrap(err, "failed to initialize fs watcher")
}

// the file may not exist if the system was booted from overlay
if _, err := os.Stat(upgrade.FlistInfoFile); err == nil {
if err := watcher.Add(upgrade.FlistInfoFile); err != nil {
return nil, errors.Wrapf(err, "failed to watch '%s'", upgrade.FlistInfoFile)
}
}

return &hostMonitor{
duration: duration,
}, nil
Expand Down
Loading

0 comments on commit 640b2d7

Please sign in to comment.