Skip to content

Commit

Permalink
Separate creating namespace form the task and check other nodes befor…
Browse files Browse the repository at this point in the history
…e running
  • Loading branch information
AbdelrahmanElawady committed Oct 17, 2023
1 parent b8d3598 commit 2502823
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 49 deletions.
3 changes: 3 additions & 0 deletions cmds/modules/noded/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ func action(cli *cli.Context) error {
cpuBenchmarkTask := perf.NewCPUBenchmarkTask()
perfMon.AddTask(&cpuBenchmarkTask)

pubIPTask := perf.NewpublicIPTask()
perfMon.AddTask(pubIPTask)

if err = perfMon.Run(ctx); err != nil {
return errors.Wrap(err, "failed to run the scheduler")
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/network/public/public.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
const (
toZosVeth = "tozos" // veth pair from br-pub to zos
publicNsMACDerivationSuffix = "-public"
testMacvlan = "pubtestmacvlan"
testNamespace = "pubtestns"

// PublicBridge public bridge name, exists only after a call to EnsurePublicSetup
PublicBridge = types.PublicBridge
Expand Down Expand Up @@ -319,6 +321,11 @@ func EnsurePublicSetup(nodeID pkg.Identifier, inf *pkg.PublicConfig) (*netlink.B
return nil, errors.Wrap(err, "failed to get current public bridge uplink")
}

err = ensureTestNamespace(br)
if err != nil {
return nil, fmt.Errorf("failed to create test namespace: %w", err)
}

if inf == nil || inf.IsEmpty() {
// we need to check if there is already a public config
// if yes! we need to make sure to delete it and also restart
Expand Down Expand Up @@ -349,6 +356,15 @@ func EnsurePublicSetup(nodeID pkg.Identifier, inf *pkg.PublicConfig) (*netlink.B
return br, netlink.LinkSetUp(br)
}

func ensureTestNamespace(publicBrdige *netlink.Bridge) error {
ns, err := namespace.Create(testNamespace)
if err != nil {
return fmt.Errorf("failed to create namespace %s: %w", ns, err)
}
_, err = macvlan.Create(testMacvlan, publicBrdige.Name, ns)
return err
}

func detectExitNic() (string, error) {
log.Debug().Msg("find possible ipv6 exit interface")
// otherwise we try to find the right one
Expand Down
227 changes: 178 additions & 49 deletions pkg/perf/pubip_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,48 +6,50 @@ import (
"io"
"net"
"net/http"
"os/exec"
"strings"
"time"

"github.com/containernetworking/plugins/pkg/ns"
"github.com/rs/zerolog/log"
substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
"github.com/threefoldtech/zos/pkg/environment"
"github.com/threefoldtech/zos/pkg/network/macvlan"
"github.com/threefoldtech/zos/pkg/network/namespace"
"github.com/threefoldtech/zos/pkg/network/types"
"github.com/threefoldtech/zos/pkg/stubs"
"github.com/vishvananda/netlink"
)

const macvlanName = "pubtestmacvlan"
const namespaceName = "pubtestns"
const testMacvlan = "pubtestmacvlan"
const testNamespace = "pubtestns"

type PubIPTask struct {
type publicIPValidationTask struct {
taskID string
schedule string
}

var _ Task = (*PubIPTask)(nil)
var _ Task = (*publicIPValidationTask)(nil)

func (p *PubIPTask) ID() string {
func NewpublicIPValidationTask() Task {
return &publicIPValidationTask{
taskID: "PublicIPValidation",
schedule: "0 0 */6 * * *",
}
}

func (p *publicIPValidationTask) ID() string {
return p.taskID
}

func (p *PubIPTask) Cron() string {
func (p *publicIPValidationTask) Cron() string {
return p.schedule
}

func (p *PubIPTask) Run(ctx context.Context) (interface{}, error) {
netNS, err := namespace.GetByName(namespaceName)
if err != nil {
netNS, err = namespace.Create(namespaceName)
if err != nil {
return nil, fmt.Errorf("failed to create namespace %s: %w", namespaceName, err)
}
}
mv, err := macvlan.GetByName(macvlanName)
func (p *publicIPValidationTask) Run(ctx context.Context) (interface{}, error) {

netNS, err := namespace.GetByName(testNamespace)
if err != nil {
mv, err = macvlan.Create(macvlanName, types.PublicBridge, netNS)
if err != nil {
return nil, fmt.Errorf("failed to create macvlan %s: %w", namespaceName, err)
}
return nil, fmt.Errorf("failed to get namespace %s: %w", testNamespace, err)
}
manager, err := environment.GetSubstrate()
if err != nil {
Expand All @@ -57,53 +59,180 @@ func (p *PubIPTask) Run(ctx context.Context) (interface{}, error) {
if err != nil {
return nil, fmt.Errorf("failed to get substrate client: %w", err)
}
defer sub.Close()
farmID := environment.MustGet().FarmID

shouldRun, err := isLeastValidNode(ctx, uint32(farmID), sub)
if err != nil {
return nil, fmt.Errorf("failed to check if the node should run public IP verification: %w", err)
}
if !shouldRun {
log.Info().Msg("skipping because there is a node with less ID available")
return nil, nil
}

farm, err := sub.GetFarm(uint32(farmID))
if farm != nil {
if err != nil {
return nil, fmt.Errorf("failed to get farm with id %d: %w", farmID, err)
}
unusedIPs := map[string]bool{}
for _, publicIP := range farm.PublicIPs {
if publicIP.ContractID == 0 {
unusedIPs[publicIP.IP] = false
}
}
for ip := range unusedIPs {
_, ipNet, err := net.ParseCIDR(ip)
unusedIPs := make(map[string]bool)
err = netNS.Do(func(nn ns.NetNS) error {
mv, err := macvlan.GetByName(testMacvlan)
if err != nil {
continue
return fmt.Errorf("failed to get macvlan %s in namespace %s: %w", testMacvlan, testNamespace, err)
}
err = macvlan.Install(mv, nil, []*net.IPNet{ipNet}, nil, netNS)
if err != nil {
return nil, fmt.Errorf("failed to install macvlan %s with ip %s to namespace %s: %w", macvlanName, ipNet, namespaceName, err)
}
err = netNS.Do(func(_ ns.NetNS) error {
req, err := http.Get("https://api.ipify.org/")
for _, publicIP := range farm.PublicIPs {
if publicIP.ContractID != 0 {
continue
}
unusedIPs[publicIP.IP] = false

ip, ipNet, routes, err := getIPWithRoute(publicIP)
if err != nil {
log.Err(err).Send()
continue
}
err = macvlan.Install(mv, nil, ipNet, routes, netNS)
if err != nil {
return err
log.Err(err).Msgf("failed to install macvlan %s with ip %s to namespace %s", testMacvlan, ipNet, testNamespace)
continue
}

body, err := io.ReadAll(req.Body)
realIP, err := getRealPublicIP()
if err != nil {
req.Body.Close()
return err
log.Err(err).Msg("failed to get node real IP")
}
req.Body.Close()

if ip == strings.TrimSpace(string(body)) {
unusedIPs[ip] = true
if ip.String() == strings.TrimSpace(string(realIP)) {
unusedIPs[publicIP.IP] = true
}
return nil
})

err = deleteIPAndRoutes(publicIP, routes, mv)
if err != nil {
log.Err(err).Send()
}
}
err = netlink.LinkSetDown(mv)
if err != nil {
return nil, fmt.Errorf("failed to check if ip is valid: %w", err)
return fmt.Errorf("failed to set link down: %w", err)
}
return nil
})

if err != nil {
return nil, fmt.Errorf("failed to run public IP validation: %w", err)
}
err = netlink.LinkSetDown(mv)
return unusedIPs, nil
}

func isLeastValidNode(ctx context.Context, farmID uint32, sub *substrate.Substrate) (bool, error) {
nodes, err := sub.GetNodes(uint32(farmID))
if err != nil {
return nil, fmt.Errorf("failed to set macvlan %s link down: %w", macvlanName, err)
return false, fmt.Errorf("failed to get farm %d nodes: %w", farmID, err)
}
cl := GetZbusClient(ctx)
registrar := stubs.NewRegistrarStub(cl)
var nodeID uint32
for {
nodeID, err = registrar.NodeID(ctx)
if err == nil {
break
}
log.Err(err).Msg("failed to get node id")
time.Sleep(10 * time.Second)
}
for _, node := range nodes {
if node >= uint32(nodeID) {
continue
}
state, err := sub.GetPowerTarget(node)
if err != nil {
return false, fmt.Errorf("failed to get node %d power target: %w", node, err)
}
if state.Target.IsDown {
continue
}
n, err := sub.GetNode(node)
if err != nil {
return false, fmt.Errorf("failed to get node %d: %w", node, err)
}
ip, err := getValidNodeIP(n)
if err != nil {
return false, err
}
// stop at three and quiet output
err = exec.CommandContext(ctx, "ping", "-c", "3", "-q", ip).Run()
if err != nil {
log.Err(err).Msgf("failed to ping node %d", node)
continue
}
return false, nil
}
return true, nil
}

return unusedIPs, nil
func getValidNodeIP(node *substrate.Node) (string, error) {
for _, inf := range node.Interfaces {
if inf.Name != "zos" {
continue
}
if len(inf.IPs) == 0 {
return "", fmt.Errorf("no private IP available on node %d", node.ID)
}
return inf.IPs[0], nil
}
return "", fmt.Errorf("failed to get private IP for node %d", node.ID)
}

func getIPWithRoute(publicIP substrate.PublicIP) (net.IP, []*net.IPNet, []*netlink.Route, error) {
ip, ipNet, err := net.ParseCIDR(publicIP.IP)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to parse IP %s: %w", publicIP.IP, err)
}
gateway := net.ParseIP(publicIP.Gateway)
if gateway == nil {
return nil, nil, nil, fmt.Errorf("failed to parse gateway %s: %w", publicIP.Gateway, err)
}
route, err := netlink.RouteGet(gateway)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to get route to gateway %s", publicIP.Gateway)
}
routes := make([]*netlink.Route, 0)
for _, r := range route {
routes = append(routes, &r)
}
return ip, []*net.IPNet{ipNet}, routes, nil
}

func getRealPublicIP() (string, error) {
// for testing now, should change to cloudflare
req, err := http.Get("https://api.ipify.org/")
if err != nil {
return "", err
}
defer req.Body.Close()

body, err := io.ReadAll(req.Body)
if err != nil {
return "", err
}
return string(body), nil
}

func deleteIPAndRoutes(publicIP substrate.PublicIP, routes []*netlink.Route, macvlan netlink.Link) error {
addr, err := netlink.ParseAddr(publicIP.IP)
if err != nil {
return fmt.Errorf("failed to parse public IP %s: %w", publicIP.IP, err)
}
for _, r := range routes {
err = netlink.RouteDel(r)
if err != nil {
return fmt.Errorf("failed to delete route %s: %w", r.String(), err)
}
}
err = netlink.AddrDel(macvlan, addr)
if err != nil {
return fmt.Errorf("failed to delete address %s: %w", addr.String(), err)
}
return nil
}

0 comments on commit 2502823

Please sign in to comment.