Skip to content

Commit

Permalink
Merge pull request #2457 from threefoldtech/main_multipletimeservers
Browse files Browse the repository at this point in the history
support multiple timeservers for the ntpcheck
  • Loading branch information
xmonader authored Nov 10, 2024
2 parents f99d64a + 1fca8f5 commit 41fe8f7
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 27 deletions.
6 changes: 5 additions & 1 deletion cmds/modules/noded/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,11 @@ func action(cli *cli.Context) error {
if err != nil {
return errors.Wrap(err, "failed to create a new perfMon")
}

zcl, err := zbus.NewRedisClient(msgBrokerCon)
if err != nil {
return errors.Wrap(err, "failed to create a zbus client to the msgBroker")
}
ctx = perf.WithZbusClient(ctx, zcl)
healthcheck.RunNTPCheck(ctx)
perfMon.AddTask(iperf.NewTask())
perfMon.AddTask(cpubench.NewTask())
Expand Down
1 change: 1 addition & 0 deletions pkg/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ all: getdeps test

getdeps:
@echo "Installing golangci-lint" && go get github.com/golangci/golangci-lint/cmd/golangci-lint && go install github.com/golangci/golangci-lint/cmd/golangci-lint
@echo "Installing zbusc" && go install github.com/threefoldtech/zbus/zbusc
go mod tidy

lint:
Expand Down
3 changes: 3 additions & 0 deletions pkg/api_gateway.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package pkg

import (
"time"

"github.com/centrifuge/go-substrate-rpc-client/v4/types"
substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
)
Expand All @@ -27,6 +29,7 @@ type SubstrateGateway interface {
SetNodePowerState(up bool) (hash types.Hash, err error)
UpdateNode(node substrate.Node) (uint32, error)
UpdateNodeUptimeV2(uptime uint64, timestampHint uint64) (hash types.Hash, err error)
GetTime() (time.Time, error)
}

type SubstrateError struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/perf/cpubench/cpubench_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (c *CPUBenchmarkTask) Run(ctx context.Context) (interface{}, error) {
if err != nil {
return nil, fmt.Errorf("failed to parse cpubench output: %w", err)
}
client := perf.GetZbusClient(ctx)
client := perf.MustGetZbusClient(ctx)
statistics := stubs.NewStatisticsStub(client)

workloads, err := statistics.Workloads(ctx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/perf/healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (h *healthcheckTask) Run(ctx context.Context) (interface{}, error) {
log.Debug().Msg("starting health check task")
errs := make(map[string][]string)

cl := perf.GetZbusClient(ctx)
cl := perf.MustGetZbusClient(ctx)
zui := stubs.NewZUIStub(cl)

var wg sync.WaitGroup
Expand Down
111 changes: 101 additions & 10 deletions pkg/perf/healthcheck/ntp.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,34 @@ package healthcheck
import (
"context"
"encoding/json"
"fmt"
"math"
"net/http"
"time"

"github.com/cenkalti/backoff/v3"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/threefoldtech/zbus"
"github.com/threefoldtech/zos/pkg/perf"
"github.com/threefoldtech/zos/pkg/stubs"
"github.com/threefoldtech/zos/pkg/zinit"
)

const acceptableSkew = 10 * time.Minute

func RunNTPCheck(ctx context.Context) {
operation := func() error {
return ntpCheck(ctx)
}
go func() {
for {
exp := backoff.NewExponentialBackOff()
retryNotify := func(err error, d time.Duration) {
log.Error().Err(err).Msg("failed to run ntp check")
}

if err := backoff.RetryNotify(ntpCheck, backoff.WithContext(exp, ctx), retryNotify); err != nil {
if err := backoff.RetryNotify(operation, backoff.WithContext(exp, ctx), retryNotify); err != nil {
log.Error().Err(err).Send()
continue
}
Expand All @@ -40,10 +47,13 @@ func RunNTPCheck(ctx context.Context) {
}()
}

func ntpCheck() error {
func ntpCheck(ctx context.Context) error {
z := zinit.Default()

utcTime, err := getCurrentUTCTime()
zcl, err := perf.TryGetZbusClient(ctx)
if err != nil {
return fmt.Errorf("ntpCheck expects zbus client in the context and found none %w", err)
}
utcTime, err := getCurrentUTCTime(zcl)
if err != nil {
return err
}
Expand All @@ -57,20 +67,101 @@ func ntpCheck() error {
return nil
}

func getCurrentUTCTime() (time.Time, error) {
// getCurrentUTCTime queries a fixed, ordered list of time sources and returns
// the time reported by the first one that answers without an error. A source
// that fails is logged and skipped. An error is returned only when every
// source has failed.
func getCurrentUTCTime(zcl zbus.Client) (time.Time, error) {
	// timeSource couples a display name with the function that queries it.
	type timeSource struct {
		name  string
		fetch func() (time.Time, error)
	}

	// The list is built per call instead of living in a package-level variable
	// so that the tfchain entry can close over zcl.
	sources := []timeSource{
		{
			name: "tfchain",
			fetch: func() (time.Time, error) {
				return getTimeChainWithZCL(zcl)
			},
		},
		{name: "worldtimeapi", fetch: getWorldTimeAPI},
		{name: "worldclockapi", fetch: getWorldClockAPI},
		{name: "timeapi.io", fetch: getTimeAPI},
	}

	for _, src := range sources {
		log.Info().Msg(fmt.Sprint("running NTP check against ", src.name))

		now, err := src.fetch()
		if err != nil {
			// Not fatal: move on to the next source in the list.
			log.Error().Err(err).Str("server", src.name).Msg("failed to get time from server")
			continue
		}

		log.Info().Msg(fmt.Sprint("utc time from ", src.name, ": ", now))
		return now, nil
	}

	return time.Time{}, errors.New("failed to get time from all servers")
}

func getWorldTimeAPI() (time.Time, error) {
timeRes, err := http.Get("https://worldtimeapi.org/api/timezone/UTC")
if err != nil {
return time.Time{}, errors.Wrapf(err, "failed to get date")
return time.Time{}, errors.Wrapf(err, "failed to get date from worldtimeapi")
}
defer timeRes.Body.Close()

var utcTime struct {
DateTime time.Time `json:"datetime"`
}
err = json.NewDecoder(timeRes.Body).Decode(&utcTime)
timeRes.Body.Close()
if err != nil {
return time.Time{}, errors.Wrapf(err, "failed to decode date response")
if err := json.NewDecoder(timeRes.Body).Decode(&utcTime); err != nil {
return time.Time{}, errors.Wrapf(err, "failed to decode date response from worldtimeapi")
}

return utcTime.DateTime, nil
}

// getWorldClockAPI fetches the current UTC time from worldclockapi.com.
//
// The request is bounded by a timeout so an unresponsive server cannot stall
// the caller indefinitely, and a non-200 response is reported as an error
// instead of being handed to the JSON decoder. Every failure, including a
// timestamp that does not parse, is returned with a message naming
// worldclockapi.
//
// NOTE(review): the endpoint is plain HTTP, so the answer is unauthenticated;
// confirm whether the service can be reached over HTTPS.
func getWorldClockAPI() (time.Time, error) {
	// requestTimeout caps the whole exchange: connecting, headers and body.
	const requestTimeout = 30 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://worldclockapi.com/api/json/utc/now", nil)
	if err != nil {
		return time.Time{}, errors.Wrapf(err, "failed to get date from worldclockapi")
	}

	timeRes, err := http.DefaultClient.Do(req)
	if err != nil {
		return time.Time{}, errors.Wrapf(err, "failed to get date from worldclockapi")
	}
	defer timeRes.Body.Close()

	if timeRes.StatusCode != http.StatusOK {
		return time.Time{}, fmt.Errorf("failed to get date from worldclockapi: unexpected status %s", timeRes.Status)
	}

	var utcTime struct {
		// Decoded as a string and parsed manually below with an explicit layout.
		CurrentDateTime string `json:"currentDateTime"`
	}
	if err := json.NewDecoder(timeRes.Body).Decode(&utcTime); err != nil {
		return time.Time{}, errors.Wrapf(err, "failed to decode date response from worldclockapi")
	}

	// The layout has no seconds field and ends in a literal "Z".
	parsed, err := time.Parse("2006-01-02T15:04Z", utcTime.CurrentDateTime)
	if err != nil {
		return time.Time{}, errors.Wrapf(err, "failed to parse date response from worldclockapi")
	}
	return parsed, nil
}

// getTimeAPI fetches the current UTC time from timeapi.io.
//
// The request is bounded by a timeout so an unresponsive server cannot stall
// the caller indefinitely, and a non-200 response is reported as an error
// instead of being handed to the JSON decoder. Every failure, including a
// timestamp that does not parse, is returned with a message naming timeapi.io.
func getTimeAPI() (time.Time, error) {
	// requestTimeout caps the whole exchange: connecting, headers and body.
	const requestTimeout = 30 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://timeapi.io/api/Time/current/zone?timeZone=UTC", nil)
	if err != nil {
		return time.Time{}, errors.Wrapf(err, "failed to get date from timeapi.io")
	}

	timeRes, err := http.DefaultClient.Do(req)
	if err != nil {
		return time.Time{}, errors.Wrapf(err, "failed to get date from timeapi.io")
	}
	defer timeRes.Body.Close()

	if timeRes.StatusCode != http.StatusOK {
		return time.Time{}, fmt.Errorf("failed to get date from timeapi.io: unexpected status %s", timeRes.Status)
	}

	var utcTime struct {
		// Decoded as a string and parsed manually below with an explicit layout.
		DateTime string `json:"dateTime"`
	}
	if err := json.NewDecoder(timeRes.Body).Decode(&utcTime); err != nil {
		return time.Time{}, errors.Wrapf(err, "failed to decode date response from timeapi.io")
	}

	// The layout carries no zone and allows fractional seconds.
	parsed, err := time.Parse("2006-01-02T15:04:05.999999", utcTime.DateTime)
	if err != nil {
		return time.Time{}, errors.Wrapf(err, "failed to parse date response from timeapi.io")
	}
	return parsed, nil
}

// getTimeChainWithZCL asks the substrate gateway, reached through zcl, for the
// current chain time.
//
// The gateway stub's GetTime panics on request and decoding failures instead
// of returning them. Such a panic is recovered here and reported as an
// ordinary error, so a caller iterating over several time sources can fall
// back to the next one rather than crashing its goroutine.
func getTimeChainWithZCL(zcl zbus.Client) (chainTime time.Time, err error) {
	defer func() {
		if r := recover(); r != nil {
			// Never hand back a partially populated time alongside an error.
			chainTime = time.Time{}
			err = fmt.Errorf("failed to get date from tfchain: %v", r)
		}
	}()

	gw := stubs.NewSubstrateGatewayStub(zcl)
	return gw.GetTime(context.Background())
}
13 changes: 1 addition & 12 deletions pkg/perf/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (pm *PerformanceMonitor) runTask(ctx context.Context, task Task) error {

// Run adds the tasks to the cron queue and start the scheduler
func (pm *PerformanceMonitor) Run(ctx context.Context) error {
ctx = withZbusClient(ctx, pm.zbusClient)
ctx = WithZbusClient(ctx, pm.zbusClient)
for _, task := range pm.tasks {
task := task
if _, err := pm.scheduler.CronWithSeconds(task.Cron()).Do(func() error {
Expand All @@ -105,14 +105,3 @@ func (pm *PerformanceMonitor) Run(ctx context.Context) error {
pm.scheduler.StartAsync()
return nil
}

type zbusClient struct{}

func withZbusClient(ctx context.Context, client zbus.Client) context.Context {
return context.WithValue(ctx, zbusClient{}, client)
}

// GetZbusClient gets zbus client from the given context
func GetZbusClient(ctx context.Context) zbus.Client {
return ctx.Value(zbusClient{}).(zbus.Client)
}
4 changes: 2 additions & 2 deletions pkg/perf/publicip/publicip_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (p *publicIPValidationTask) Run(ctx context.Context) (interface{}, error) {
if err != nil {
return nil, fmt.Errorf("failed to get namespace %s: %w", testNamespace, err)
}
cl := perf.GetZbusClient(ctx)
cl := perf.MustGetZbusClient(ctx)
substrateGateway := stubs.NewSubstrateGatewayStub(cl)
farmID := environment.MustGet().FarmID

Expand Down Expand Up @@ -191,7 +191,7 @@ func isLeastValidNode(ctx context.Context, farmID uint32, substrateGateway *stub
if err != nil {
return false, fmt.Errorf("failed to get farm %d nodes: %w", farmID, err)
}
cl := perf.GetZbusClient(ctx)
cl := perf.MustGetZbusClient(ctx)
registrar := stubs.NewRegistrarStub(cl)
var nodeID uint32
err = backoff.Retry(func() error {
Expand Down
30 changes: 30 additions & 0 deletions pkg/perf/zbusctx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package perf

import (
"context"
"fmt"

"github.com/threefoldtech/zbus"
)

// zbusClientKey is the context key under which the zbus client is stored.
// It is an unexported empty struct type, so the key cannot be constructed
// outside this package.
type zbusClientKey struct{}

// WithZbusClient adds a zbus.Client to the provided context, returning a new context.
// This allows for the retrieval of the client from the context at a later time.
func WithZbusClient(ctx context.Context, client zbus.Client) context.Context {
return context.WithValue(ctx, zbusClientKey{}, client)
}

// MustGetZbusClient gets zbus client from the given context.
//
// The type assertion is unchecked, so this panics when ctx holds no
// zbus.Client under zbusClientKey. Use TryGetZbusClient to get an error
// instead of a panic.
func MustGetZbusClient(ctx context.Context) zbus.Client {
return ctx.Value(zbusClientKey{}).(zbus.Client)
}

// TryGetZbusClient looks up the zbus client stored in ctx under zbusClientKey.
// It returns the client and a nil error when one is present; otherwise it
// returns the zero zbus.Client together with an error instead of panicking.
func TryGetZbusClient(ctx context.Context) (zbus.Client, error) {
	if client, found := ctx.Value(zbusClientKey{}).(zbus.Client); found {
		return client, nil
	}

	var none zbus.Client
	return none, fmt.Errorf("context does not have zbus client")
}
18 changes: 18 additions & 0 deletions pkg/stubs/api_gateway_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
tfchainclientgo "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
zbus "github.com/threefoldtech/zbus"
pkg "github.com/threefoldtech/zos/pkg"
"time"
)

type SubstrateGatewayStub struct {
Expand Down Expand Up @@ -233,6 +234,23 @@ func (s *SubstrateGatewayStub) GetPowerTarget(ctx context.Context, arg0 uint32)
return
}

// GetTime issues a "GetTime" request for s.module / s.object through the zbus
// client and decodes the reply into ret0. ret1 is whatever result.CallError()
// reports for the call.
//
// Failures of the request itself and of decoding the reply are not returned:
// they panic.
//
// NOTE(review): this has the shape of machine-generated stub code; confirm
// before editing it by hand.
func (s *SubstrateGatewayStub) GetTime(ctx context.Context) (ret0 time.Time, ret1 error) {
// The call takes no arguments besides ctx.
args := []interface{}{}
result, err := s.client.RequestContext(ctx, s.module, s.object, "GetTime", args...)
if err != nil {
panic(err)
}
// NOTE(review): presumably panics when the reply carries a protocol-level error; confirm in zbus.
result.PanicOnError()
ret1 = result.CallError()
// The loader names the destination for the value unmarshalled from the reply.
loader := zbus.Loader{
&ret0,
}
if err := result.Unmarshal(&loader); err != nil {
panic(err)
}
return
}

func (s *SubstrateGatewayStub) GetTwin(ctx context.Context, arg0 uint32) (ret0 tfchainclientgo.Twin, ret1 error) {
args := []interface{}{arg0}
result, err := s.client.RequestContext(ctx, s.module, s.object, "GetTwin", args...)
Expand Down
6 changes: 6 additions & 0 deletions pkg/substrate_gateway/substrate_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/hex"
"errors"
"sync"
"time"

"github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -200,6 +201,11 @@ func (g *substrateGateway) UpdateNodeUptimeV2(uptime uint64, timestampHint uint6
defer g.mu.Unlock()
return g.sub.UpdateNodeUptimeV2(g.identity, uptime, timestampHint)
}
// GetTime returns the result of g.sub.Time() unchanged, after emitting a
// trace-level log entry.
//
// NOTE(review): unlike UpdateNodeUptimeV2 above, this does not take g.mu;
// presumably because it only reads. Confirm. The trace entry records the
// method as "Time" rather than "GetTime"; confirm that is intended.
func (g *substrateGateway) GetTime() (time.Time, error) {
log.Trace().Str("method", "Time").Msg("method called")

return g.sub.Time()
}

func buildSubstrateError(err error) (serr pkg.SubstrateError) {
if err == nil {
Expand Down

0 comments on commit 41fe8f7

Please sign in to comment.