Skip to content

Commit

Permalink
add handling for Reserved IPv6 assignment and unassignment
Browse files Browse the repository at this point in the history
  • Loading branch information
anitgandhi committed Dec 12, 2024
1 parent ba1d425 commit 377c300
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 8 deletions.
41 changes: 35 additions & 6 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@ import (
"syscall"
"time"

"github.com/jsimonetti/rtnetlink/v2"

"github.com/digitalocean/droplet-agent/internal/config"
"github.com/digitalocean/droplet-agent/internal/log"
"github.com/digitalocean/droplet-agent/internal/metadata"
"github.com/digitalocean/droplet-agent/internal/metadata/actioner"
"github.com/digitalocean/droplet-agent/internal/metadata/updater"
"github.com/digitalocean/droplet-agent/internal/metadata/watcher"
"github.com/digitalocean/droplet-agent/internal/reservedipv6"
"github.com/digitalocean/droplet-agent/internal/sysaccess"
)

Expand Down Expand Up @@ -51,19 +54,45 @@ func main() {
log.Fatal("failed to initialize SSHManager: %v", err)
}

doManagedKeysActioner := actioner.NewDOManagedKeysActioner(sshMgr)
// create the watcher
metadataWatcher := newMetadataWatcher(&watcher.Conf{SSHPort: sshMgr.SSHDPort()})
metadataWatcher.RegisterActioner(doManagedKeysActioner)
infoUpdater := updater.NewAgentInfoUpdater()

// monitor sshd_config
// ssh managed keys
doManagedKeysActioner := actioner.NewDOManagedKeysActioner(sshMgr)
metadataWatcher.RegisterActioner(doManagedKeysActioner)
go mustMonitorSSHDConfig(sshMgr)

// Launch background jobs
bgJobsCtx, bgJobsCancel := context.WithCancel(context.Background())
go bgJobsRemoveExpiredDOTTYKeys(bgJobsCtx, sshMgr, cfg.AuthorizedKeysCheckInterval)

// reserved ipv6
if cfg.ManageReservedIPv6 {
log.Info("Reserved IPv6 management enabled")
conn, err := rtnetlink.Dial(nil)
if err != nil {
log.Fatal("failed to create netlink client: %v", err)
}
defer conn.Close()

rip6Manager, err := reservedipv6.NewManager(conn)
if err != nil {
log.Fatal("Failed to create Reserved IPv6 manager: %v", err)
}

rip6Actioner := actioner.NewReservedIPv6Actioner(rip6Manager)
metadataWatcher.RegisterActioner(rip6Actioner)

// fetch first at the beginning
// TODO: remove this debugging
if md, err := watcher.FetchMetadata(); err != nil {
log.Info("failed to fetch metadata at startup: %v", err)
} else {
log.Info("running Reserved IPv6 action")
rip6Actioner.Do(md)
}
}

// handle shutdown
infoUpdater := updater.NewAgentInfoUpdater()
go handleShutdown(bgJobsCancel, metadataWatcher, infoUpdater, sshMgr)

// report agent status and ssh info
Expand Down
5 changes: 5 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type Conf struct {
CustomSSHDPort int
CustomSSHDCfgFile string
AuthorizedKeysCheckInterval time.Duration

ManageReservedIPv6 bool
}

// Init initializes the agent's configuration
Expand All @@ -40,9 +42,12 @@ func Init() *Conf {

fs.BoolVar(&cfg.UseSyslog, "syslog", false, "Use syslog service for logging")
fs.BoolVar(&cfg.DebugMode, "debug", false, "Turn on debug mode")

fs.IntVar(&cfg.CustomSSHDPort, "sshd_port", 0, "The port sshd is binding to")
fs.StringVar(&cfg.CustomSSHDCfgFile, "sshd_config", "", "The location of sshd_config")

fs.BoolVar(&cfg.ManageReservedIPv6, "reserved_ipv6", false, "enable reserved IPv6 assignment/unassignment feature")

ff.Parse(fs, os.Args[1:],
ff.WithEnvVarPrefix("DROPLET_AGENT"),
)
Expand Down
67 changes: 67 additions & 0 deletions internal/metadata/actioner/reserved_ipv6_actioner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// SPDX-License-Identifier: Apache-2.0

package actioner

import (
"sync/atomic"

"github.com/digitalocean/droplet-agent/internal/log"
"github.com/digitalocean/droplet-agent/internal/metadata"
"github.com/digitalocean/droplet-agent/internal/reservedipv6"
)

const (
rip6LogPrefix string = "[Reserved IPv6 Actioner]"
)

// NewReservedIPv6Actioner returns a new DigitalOcean Reserved IPv6 actioner
func NewReservedIPv6Actioner(mgr reservedipv6.Manager) MetadataActioner {
return &reservedIPv6Actioner{
mgr: mgr,
allDone: make(chan struct{}, 1),
}
}

type reservedIPv6Actioner struct {
mgr reservedipv6.Manager
activeActions int32
closing uint32
allDone chan struct{}
}

func (da *reservedIPv6Actioner) Do(md *metadata.Metadata) {
atomic.AddInt32(&da.activeActions, 1)
defer func() {
ret := atomic.AddInt32(&da.activeActions, -1)
if ret == 0 && atomic.LoadUint32(&da.closing) == 1 {
close(da.allDone)
}
}()

ipv6 := md.ReservedIP.IPv6

if ipv6.Active {
log.Info("%s Attempting to assign Reserved IPv6 address '%s'", rip6LogPrefix, ipv6.IPAddress)
if err := da.mgr.Assign(ipv6.IPAddress); err != nil {
log.Error("%s failed to assign Reserved IPv6 address '%s': %v", rip6LogPrefix, ipv6.IPAddress, err)
}
log.Info("%s Assigned Reserved IPv6 address '%s'", rip6LogPrefix, ipv6.IPAddress)
} else {
log.Info("%s Attempting to unassign all Reserved IPv6 addresses", rip6LogPrefix)
if err := da.mgr.Unassign(); err != nil {
log.Error("%s failed to unassign all Reserved IPv6 addresses '%s': %v", rip6LogPrefix, err)

Check failure on line 52 in internal/metadata/actioner/reserved_ipv6_actioner.go

View workflow job for this annotation

GitHub Actions / test (1.22.3, ubuntu-latest)

github.com/digitalocean/droplet-agent/internal/log.Error format %v reads arg #3, but call has 2 args
}
log.Info("%s Unassigned all Reserved IPv6 addresses", rip6LogPrefix)
}
}

func (da *reservedIPv6Actioner) Shutdown() {
log.Info("%s Shutting down", rip6LogPrefix)
atomic.StoreUint32(&da.closing, 1)
if atomic.LoadInt32(&da.activeActions) != 0 {
// if there are still jobs in progress, wait for them to finish
log.Debug("%s Waiting for jobs in progress", rip6LogPrefix)
<-da.allDone
}
log.Info("%s Bye-bye", rip6LogPrefix)
}
7 changes: 7 additions & 0 deletions internal/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ type Metadata struct {
DOTTYStatus AgentStatus `json:"dotty_status,omitempty"`
SSHInfo *SSHInfo `json:"ssh_info,omitempty"`
ManagedKeysEnabled *bool `json:"managed_keys_enabled,omitempty"`

ReservedIP struct {
IPv6 struct {
IPAddress string `json:"ip_address,omitempty"`
Active bool `json:"active,omitempty"`
} `json:"ipv6,omitempty"`
} `json:"reserved_ip,omitempty"`
}

// SSHInfo contains the information of the sshd service running on the droplet
Expand Down
6 changes: 6 additions & 0 deletions internal/metadata/watcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,9 @@ func (m *metadataFetcherImpl) fetchMetadata() (*metadata.Metadata, error) {
}
return ret, nil
}

// FetchMetadata should only be used for testing
// TODO: remove this
func FetchMetadata() (*metadata.Metadata, error) {
return newMetadataFetcher().fetchMetadata()
}
4 changes: 2 additions & 2 deletions internal/metadata/watcher/web_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"sync"
"time"

"github.com/digitalocean/droplet-agent/internal/log"
"golang.org/x/time/rate"

"github.com/digitalocean/droplet-agent/internal/log"
"github.com/digitalocean/droplet-agent/internal/metadata/actioner"
"golang.org/x/time/rate"
)

type webBasedWatcher struct {
Expand Down
127 changes: 127 additions & 0 deletions internal/reservedipv6/reserved_ipv6.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package reservedipv6

import (
"fmt"
"net"
"net/netip"

"github.com/jsimonetti/rtnetlink/v2"
"github.com/mdlayher/netlink"
"golang.org/x/sys/unix"
)

const (
logPrefix string = "[Reserved IPv6 Manager]"

loIface string = "lo"
eth0Iface string = "eth0"
prefixLen uint8 = 124
)

type Manager interface {
Assign(ip string) error
Unassign() error
}

type mgr struct {
nlConn *rtnetlink.Conn
loIdx uint32
eth0Idx uint32
}

func NewManager(netlink *rtnetlink.Conn) (Manager, error) {
lo, err := net.InterfaceByName(loIface)
if err != nil {
return nil, fmt.Errorf("failed to determine index for interface '%s': %w", loIface, err)
}

eth0, err := net.InterfaceByName(eth0Iface)
if err != nil {
return nil, fmt.Errorf("failed to determine index for interface '%s': %w", eth0Iface, err)
}

return &mgr{
nlConn: netlink,
loIdx: uint32(lo.Index),
eth0Idx: uint32(eth0.Index),
}, nil
}

// Assign creates a new global-scoped IPv6 address on
func (m *mgr) Assign(ip string) error {
addr, err := netip.ParseAddr(ip)
if err != nil {
return fmt.Errorf("invalid IP: %w", err)
}
if addr.Is4() || addr.Is4In6() {
return fmt.Errorf("IP must be an IPv6 address")
}

// Equivalent to `ip -6 addr replace "${ip}/124" dev lo scope global`
req := reservedIPv6Addr(m.loIdx, addr)
flags := netlink.Request | netlink.Replace | netlink.Acknowledge
if _, err := m.nlConn.Execute(req, unix.RTM_NEWADDR, flags); err != nil {
return fmt.Errorf("failed to assign address: %w", err)
}

if err := m.nlConn.Route.Replace(defaultIPv6Route(m.eth0Idx)); err != nil {
return fmt.Errorf("failed to replace default IPv6 route on eth0: %w", err)
}

return nil
}

// Unassign removes all global-scoped IPv6 addresses on localhost
func (m *mgr) Unassign() error {
addrs, err := m.nlConn.Address.List()
if err != nil {
fmt.Errorf("failed to list addreses: %w", err)
}

for _, a := range addrs {
if a.Index == m.loIdx && a.Family == unix.AF_INET6 && a.Scope == unix.RT_SCOPE_UNIVERSE {
if err := m.nlConn.Address.Delete(&a); err != nil {
return fmt.Errorf("failed to delete address '%s' from interface '%s': %w", a.Attributes.Address, loIface, err)
}
}
}

route := defaultIPv6Route(m.eth0Idx)
if _, err := m.nlConn.Route.Get(route); err != nil {
fmt.Errorf("failed to check if default IPv6 route already exists: %w", err)
}
if err := m.nlConn.Route.Delete(defaultIPv6Route(m.eth0Idx)); err != nil {
return fmt.Errorf("failed to remove default IPv6 route on %s: %w", eth0Iface, err)
}

return nil
}

func reservedIPv6Addr(intfIdx uint32, addr netip.Addr) *rtnetlink.AddressMessage {
return &rtnetlink.AddressMessage{
Family: unix.AF_INET6,
PrefixLength: prefixLen,
Scope: unix.RT_SCOPE_UNIVERSE, // global
Index: intfIdx,
Attributes: &rtnetlink.AddressAttributes{
Address: net.IP(addr.AsSlice()),
},
}
}

// defaultIPv6Route returns a route equivalent to `ip -6 route replace default dev eth0`
func defaultIPv6Route(intfIdx uint32) *rtnetlink.RouteMessage {
return &rtnetlink.RouteMessage{
Family: unix.AF_INET6,
Table: unix.RT_TABLE_MAIN,
Protocol: unix.RTPROT_BOOT,
Type: unix.RTN_UNICAST,
Scope: unix.RT_SCOPE_UNIVERSE,
DstLength: 0, // default route
Attributes: rtnetlink.RouteAttributes{
Dst: nil, // default route
OutIface: intfIdx,
Priority: 1024,
},
}
}
3 changes: 3 additions & 0 deletions internal/reservedipv6/reserved_ipv6_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package reservedipv6

var _ Manager = (*mgr)(nil)

0 comments on commit 377c300

Please sign in to comment.