From 377c300f4f06bf7089728c6bd640f60e483f9405 Mon Sep 17 00:00:00 2001 From: Anit Gandhi Date: Thu, 12 Dec 2024 17:48:35 -0600 Subject: [PATCH] add handling for Reserved IPv6 assignment and unassignment --- cmd/agent/main.go | 41 +++++- internal/config/config.go | 5 + .../actioner/reserved_ipv6_actioner.go | 67 +++++++++ internal/metadata/metadata.go | 7 + internal/metadata/watcher/fetcher.go | 6 + internal/metadata/watcher/web_watcher.go | 4 +- internal/reservedipv6/reserved_ipv6.go | 127 ++++++++++++++++++ internal/reservedipv6/reserved_ipv6_test.go | 3 + 8 files changed, 252 insertions(+), 8 deletions(-) create mode 100644 internal/metadata/actioner/reserved_ipv6_actioner.go create mode 100644 internal/reservedipv6/reserved_ipv6.go create mode 100644 internal/reservedipv6/reserved_ipv6_test.go diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 2d4f95a..4c81aaf 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -12,12 +12,15 @@ import ( "syscall" "time" + "github.com/jsimonetti/rtnetlink/v2" + "github.com/digitalocean/droplet-agent/internal/config" "github.com/digitalocean/droplet-agent/internal/log" "github.com/digitalocean/droplet-agent/internal/metadata" "github.com/digitalocean/droplet-agent/internal/metadata/actioner" "github.com/digitalocean/droplet-agent/internal/metadata/updater" "github.com/digitalocean/droplet-agent/internal/metadata/watcher" + "github.com/digitalocean/droplet-agent/internal/reservedipv6" "github.com/digitalocean/droplet-agent/internal/sysaccess" ) @@ -51,19 +54,45 @@ func main() { log.Fatal("failed to initialize SSHManager: %v", err) } - doManagedKeysActioner := actioner.NewDOManagedKeysActioner(sshMgr) + // create the watcher metadataWatcher := newMetadataWatcher(&watcher.Conf{SSHPort: sshMgr.SSHDPort()}) - metadataWatcher.RegisterActioner(doManagedKeysActioner) - infoUpdater := updater.NewAgentInfoUpdater() - // monitor sshd_config + // ssh managed keys + doManagedKeysActioner := actioner.NewDOManagedKeysActioner(sshMgr) + metadataWatcher.RegisterActioner(doManagedKeysActioner) go mustMonitorSSHDConfig(sshMgr) - - // Launch background jobs bgJobsCtx, bgJobsCancel := context.WithCancel(context.Background()) go bgJobsRemoveExpiredDOTTYKeys(bgJobsCtx, sshMgr, cfg.AuthorizedKeysCheckInterval) + // reserved ipv6 + if cfg.ManageReservedIPv6 { + log.Info("Reserved IPv6 management enabled") + conn, err := rtnetlink.Dial(nil) + if err != nil { + log.Fatal("failed to create netlink client: %v", err) + } + defer conn.Close() + + rip6Manager, err := reservedipv6.NewManager(conn) + if err != nil { + log.Fatal("Failed to create Reserved IPv6 manager: %v", err) + } + + rip6Actioner := actioner.NewReservedIPv6Actioner(rip6Manager) + metadataWatcher.RegisterActioner(rip6Actioner) + + // fetch first at the beginning + // TODO: remove this debugging + if md, err := watcher.FetchMetadata(); err != nil { + log.Info("failed to fetch metadata at startup: %v", err) + } else { + log.Info("running Reserved IPv6 action") + rip6Actioner.Do(md) + } + } + // handle shutdown + infoUpdater := updater.NewAgentInfoUpdater() go handleShutdown(bgJobsCancel, metadataWatcher, infoUpdater, sshMgr) // report agent status and ssh info diff --git a/internal/config/config.go b/internal/config/config.go index c1114a7..e6974cb 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -28,6 +28,8 @@ type Conf struct { CustomSSHDPort int CustomSSHDCfgFile string AuthorizedKeysCheckInterval time.Duration + + ManageReservedIPv6 bool } // Init initializes the agent's configuration @@ -40,9 +42,12 @@ func Init() *Conf { fs.BoolVar(&cfg.UseSyslog, "syslog", false, "Use syslog service for logging") fs.BoolVar(&cfg.DebugMode, "debug", false, "Turn on debug mode") + fs.IntVar(&cfg.CustomSSHDPort, "sshd_port", 0, "The port sshd is binding to") fs.StringVar(&cfg.CustomSSHDCfgFile, "sshd_config", "", "The location of sshd_config") + fs.BoolVar(&cfg.ManageReservedIPv6, "reserved_ipv6", false, "enable reserved IPv6 assignment/unassignment feature") + ff.Parse(fs, os.Args[1:], ff.WithEnvVarPrefix("DROPLET_AGENT"), ) diff --git a/internal/metadata/actioner/reserved_ipv6_actioner.go b/internal/metadata/actioner/reserved_ipv6_actioner.go new file mode 100644 index 0000000..363c94b --- /dev/null +++ b/internal/metadata/actioner/reserved_ipv6_actioner.go @@ -0,0 +1,67 @@ +// SPDX-License-Identifier: Apache-2.0 + +package actioner + +import ( + "sync/atomic" + + "github.com/digitalocean/droplet-agent/internal/log" + "github.com/digitalocean/droplet-agent/internal/metadata" + "github.com/digitalocean/droplet-agent/internal/reservedipv6" +) + +const ( + rip6LogPrefix string = "[Reserved IPv6 Actioner]" +) + +// NewReservedIPv6Actioner returns a new DigitalOcean Reserved IPv6 actioner +func NewReservedIPv6Actioner(mgr reservedipv6.Manager) MetadataActioner { + return &reservedIPv6Actioner{ + mgr: mgr, + allDone: make(chan struct{}, 1), + } +} + +type reservedIPv6Actioner struct { + mgr reservedipv6.Manager + activeActions int32 + closing uint32 + allDone chan struct{} +} + +func (da *reservedIPv6Actioner) Do(md *metadata.Metadata) { + atomic.AddInt32(&da.activeActions, 1) + defer func() { + ret := atomic.AddInt32(&da.activeActions, -1) + if ret == 0 && atomic.LoadUint32(&da.closing) == 1 { + close(da.allDone) + } + }() + + ipv6 := md.ReservedIP.IPv6 + + if ipv6.Active { + log.Info("%s Attempting to assign Reserved IPv6 address '%s'", rip6LogPrefix, ipv6.IPAddress) + if err := da.mgr.Assign(ipv6.IPAddress); err != nil { + log.Error("%s failed to assign Reserved IPv6 address '%s': %v", rip6LogPrefix, ipv6.IPAddress, err) + } + log.Info("%s Assigned Reserved IPv6 address '%s'", rip6LogPrefix, ipv6.IPAddress) + } else { + log.Info("%s Attempting to unassign all Reserved IPv6 addresses", rip6LogPrefix) + if err := da.mgr.Unassign(); err != nil { + log.Error("%s failed to unassign all Reserved IPv6 addresses '%s': %v", rip6LogPrefix, err) + } + log.Info("%s Unassigned all Reserved IPv6 addresses", rip6LogPrefix) + } +} + +func (da *reservedIPv6Actioner) Shutdown() { + log.Info("%s Shutting down", rip6LogPrefix) + atomic.StoreUint32(&da.closing, 1) + if atomic.LoadInt32(&da.activeActions) != 0 { + // if there are still jobs in progress, wait for them to finish + log.Debug("%s Waiting for jobs in progress", rip6LogPrefix) + <-da.allDone + } + log.Info("%s Bye-bye", rip6LogPrefix) +} diff --git a/internal/metadata/metadata.go b/internal/metadata/metadata.go index e682a3f..0b84ede 100644 --- a/internal/metadata/metadata.go +++ b/internal/metadata/metadata.go @@ -30,6 +30,13 @@ type Metadata struct { DOTTYStatus AgentStatus `json:"dotty_status,omitempty"` SSHInfo *SSHInfo `json:"ssh_info,omitempty"` ManagedKeysEnabled *bool `json:"managed_keys_enabled,omitempty"` + + ReservedIP struct { + IPv6 struct { + IPAddress string `json:"ip_address,omitempty"` + Active bool `json:"active,omitempty"` + } `json:"ipv6,omitempty"` + } `json:"reserved_ip,omitempty"` } // SSHInfo contains the information of the sshd service running on the droplet diff --git a/internal/metadata/watcher/fetcher.go b/internal/metadata/watcher/fetcher.go index 21101d8..2b5095a 100644 --- a/internal/metadata/watcher/fetcher.go +++ b/internal/metadata/watcher/fetcher.go @@ -42,3 +42,9 @@ func (m *metadataFetcherImpl) fetchMetadata() (*metadata.Metadata, error) { } return ret, nil } + +// FetchMetadata should only be used for testing +// TODO: remove this +func FetchMetadata() (*metadata.Metadata, error) { + return newMetadataFetcher().fetchMetadata() +} diff --git a/internal/metadata/watcher/web_watcher.go b/internal/metadata/watcher/web_watcher.go index 8d5857f..721843f 100644 --- a/internal/metadata/watcher/web_watcher.go +++ b/internal/metadata/watcher/web_watcher.go @@ -9,10 +9,10 @@ import ( "sync" "time" - "github.com/digitalocean/droplet-agent/internal/log" + "golang.org/x/time/rate" + "github.com/digitalocean/droplet-agent/internal/log" "github.com/digitalocean/droplet-agent/internal/metadata/actioner" - "golang.org/x/time/rate" ) type webBasedWatcher struct { diff --git a/internal/reservedipv6/reserved_ipv6.go b/internal/reservedipv6/reserved_ipv6.go new file mode 100644 index 0000000..7b53ab5 --- /dev/null +++ b/internal/reservedipv6/reserved_ipv6.go @@ -0,0 +1,127 @@ +package reservedipv6 + +import ( + "fmt" + "net" + "net/netip" + + "github.com/jsimonetti/rtnetlink/v2" + "github.com/mdlayher/netlink" + "golang.org/x/sys/unix" +) + +const ( + logPrefix string = "[Reserved IPv6 Manager]" + + loIface string = "lo" + eth0Iface string = "eth0" + prefixLen uint8 = 124 +) + +type Manager interface { + Assign(ip string) error + Unassign() error +} + +type mgr struct { + nlConn *rtnetlink.Conn + loIdx uint32 + eth0Idx uint32 +} + +func NewManager(netlink *rtnetlink.Conn) (Manager, error) { + lo, err := net.InterfaceByName(loIface) + if err != nil { + return nil, fmt.Errorf("failed to determine index for interface '%s': %w", loIface, err) + } + + eth0, err := net.InterfaceByName(eth0Iface) + if err != nil { + return nil, fmt.Errorf("failed to determine index for interface '%s': %w", eth0Iface, err) + } + + return &mgr{ + nlConn: netlink, + loIdx: uint32(lo.Index), + eth0Idx: uint32(eth0.Index), + }, nil +} + +// Assign creates a new global-scoped IPv6 address on +func (m *mgr) Assign(ip string) error { + addr, err := netip.ParseAddr(ip) + if err != nil { + return fmt.Errorf("invalid IP: %w", err) + } + if addr.Is4() || addr.Is4In6() { + return fmt.Errorf("IP must be an IPv6 address") + } + + // Equivalent to `ip -6 addr replace "${ip}/124" dev lo scope global` + req := reservedIPv6Addr(m.loIdx, addr) + flags := netlink.Request | netlink.Replace | netlink.Acknowledge + if _, err := m.nlConn.Execute(req, unix.RTM_NEWADDR, flags); err != nil { + return fmt.Errorf("failed to assign address: %w", err) + } + + if err := m.nlConn.Route.Replace(defaultIPv6Route(m.eth0Idx)); err != nil { + return fmt.Errorf("failed to replace default IPv6 route on eth0: %w", err) + } + + return nil +} + +// Unassign removes all global-scoped IPv6 addresses on localhost +func (m *mgr) Unassign() error { + addrs, err := m.nlConn.Address.List() + if err != nil { + fmt.Errorf("failed to list addreses: %w", err) + } + + for _, a := range addrs { + if a.Index == m.loIdx && a.Family == unix.AF_INET6 && a.Scope == unix.RT_SCOPE_UNIVERSE { + if err := m.nlConn.Address.Delete(&a); err != nil { + return fmt.Errorf("failed to delete address '%s' from interface '%s': %w", a.Attributes.Address, loIface, err) + } + } + } + + route := defaultIPv6Route(m.eth0Idx) + if _, err := m.nlConn.Route.Get(route); err != nil { + fmt.Errorf("failed to check if default IPv6 route already exists: %w", err) + } + if err := m.nlConn.Route.Delete(defaultIPv6Route(m.eth0Idx)); err != nil { + return fmt.Errorf("failed to remove default IPv6 route on %s: %w", eth0Iface, err) + } + + return nil +} + +func reservedIPv6Addr(intfIdx uint32, addr netip.Addr) *rtnetlink.AddressMessage { + return &rtnetlink.AddressMessage{ + Family: unix.AF_INET6, + PrefixLength: prefixLen, + Scope: unix.RT_SCOPE_UNIVERSE, // global + Index: intfIdx, + Attributes: &rtnetlink.AddressAttributes{ + Address: net.IP(addr.AsSlice()), + }, + } +} + +// defaultIPv6Route returns a route equivalent to `ip -6 route replace default dev eth0` +func defaultIPv6Route(intfIdx uint32) *rtnetlink.RouteMessage { + return &rtnetlink.RouteMessage{ + Family: unix.AF_INET6, + Table: unix.RT_TABLE_MAIN, + Protocol: unix.RTPROT_BOOT, + Type: unix.RTN_UNICAST, + Scope: unix.RT_SCOPE_UNIVERSE, + DstLength: 0, // default route + Attributes: rtnetlink.RouteAttributes{ + Dst: nil, // default route + OutIface: intfIdx, + Priority: 1024, + }, + } +} diff --git a/internal/reservedipv6/reserved_ipv6_test.go b/internal/reservedipv6/reserved_ipv6_test.go new file mode 100644 index 0000000..df1795e --- /dev/null +++ b/internal/reservedipv6/reserved_ipv6_test.go @@ -0,0 +1,3 @@ +package reservedipv6 + +var _ Manager = (*mgr)(nil)