Skip to content

Commit

Permalink
Merge pull request #204 from digitalocean/support-custom-vpc
Browse files Browse the repository at this point in the history
Add support for specifying the vpc that the cluster belongs to
  • Loading branch information
nanzhong authored Apr 3, 2019
2 parents de436ef + 19afa09 commit feab009
Show file tree
Hide file tree
Showing 15 changed files with 946 additions and 79 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

[[constraint]]
name = "github.com/digitalocean/godo"
version = "1.9.0"
version = "1.11.0"

[[constraint]]
branch = "master"
Expand Down
10 changes: 5 additions & 5 deletions cloud-controller-manager/do/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
doAccessTokenEnv string = "DO_ACCESS_TOKEN"
doOverrideAPIURLEnv string = "DO_OVERRIDE_URL"
doClusterIDEnv string = "DO_CLUSTER_ID"
doClusterVPCIDEnv string = "DO_CLUSTER_VPC_ID"
providerName string = "digitalocean"
)

Expand All @@ -49,7 +50,6 @@ func (t *tokenSource) Token() (*oauth2.Token, error) {
}

type cloud struct {
clusterID string
client *godo.Client
instances cloudprovider.Instances
zones cloudprovider.Zones
Expand Down Expand Up @@ -87,14 +87,14 @@ func newCloud() (cloudprovider.Interface, error) {
}

clusterID := os.Getenv(doClusterIDEnv)
resources := newResources()
clusterVPCID := os.Getenv(doClusterVPCIDEnv)
resources := newResources(clusterID, clusterVPCID)

return &cloud{
clusterID: clusterID,
client: doClient,
instances: newInstances(resources, region),
zones: newZones(resources, region),
loadbalancers: newLoadBalancers(resources, doClient, region, clusterID),
loadbalancers: newLoadBalancers(resources, doClient, region),

resources: resources,
}, nil
Expand All @@ -110,7 +110,7 @@ func (c *cloud) Initialize(clientBuilder controller.ControllerClientBuilder) {
clientset := clientBuilder.ClientOrDie("do-shared-informers")
sharedInformer := informers.NewSharedInformerFactory(clientset, 0)

res := NewResourcesController(c.clusterID, c.resources, sharedInformer.Core().V1().Services(), clientset, c.client)
res := NewResourcesController(c.resources, sharedInformer.Core().V1().Services(), clientset, c.client)

sharedInformer.Start(nil)
sharedInformer.WaitForCacheSync(nil)
Expand Down
8 changes: 4 additions & 4 deletions cloud-controller-manager/do/loadbalancers.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,11 @@ type loadBalancers struct {
}

// newLoadbalancers returns a cloudprovider.LoadBalancer whose concrete type is a *loadbalancer.
func newLoadBalancers(resources *resources, client *godo.Client, region, clusterID string) cloudprovider.LoadBalancer {
func newLoadBalancers(resources *resources, client *godo.Client, region string) cloudprovider.LoadBalancer {
return &loadBalancers{
resources: resources,
client: client,
region: region,
clusterID: clusterID,
lbActiveTimeout: defaultActiveTimeout,
lbActiveCheckTick: defaultActiveCheckTick,
}
Expand Down Expand Up @@ -345,8 +344,8 @@ func (l *loadBalancers) buildLoadBalancerRequest(service *v1.Service, nodes []*v
}

var tags []string
if l.clusterID != "" {
tags = []string{buildK8sTag(l.clusterID)}
if l.resources.clusterID != "" {
tags = []string{buildK8sTag(l.resources.clusterID)}
}

return &godo.LoadBalancerRequest{
Expand All @@ -360,6 +359,7 @@ func (l *loadBalancers) buildLoadBalancerRequest(service *v1.Service, nodes []*v
Algorithm: algorithm,
RedirectHttpToHttps: redirectHTTPToHTTPS,
EnableProxyProtocol: enableProxyProtocol,
VPCUUID: l.resources.clusterVPCID,
}, nil
}

Expand Down
130 changes: 86 additions & 44 deletions cloud-controller-manager/do/loadbalancers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1958,7 +1958,7 @@ func Test_buildLoadBalancerRequest(t *testing.T) {
for _, test := range testcases {
t.Run(test.name, func(t *testing.T) {
fakeClient := newFakeLBClient(&fakeLBService{})
fakeResources := newResources()
fakeResources := newResources("", "")
fakeResources.UpdateDroplets(test.droplets)

lb := &loadBalancers{
Expand Down Expand Up @@ -1988,53 +1988,95 @@ func Test_buildLoadBalancerRequest(t *testing.T) {
}

func Test_buildLoadBalancerRequestWithClusterID(t *testing.T) {
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Name: "test",
Protocol: "TCP",
Port: int32(80),
NodePort: int32(30000),
},
},
},
}
nodes := []*v1.Node{
tests := []struct {
name string
clusterID string
vpcID string
err error
}{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
name: "happy path",
clusterID: clusterID,
vpcID: "vpc_uuid",
},
}
fakeClient := newFakeLBClient(&fakeLBService{})
fakeResources := newResources()
fakeResources.UpdateDroplets([]godo.Droplet{
{
ID: 100,
Name: "node-1",
name: "missing cluster id",
clusterID: "",
vpcID: "vpc_uuid",
},
})

clusterID := "fdda2d9d-0856-4ca4-b8ee-27ca8bfecc77"
lb := &loadBalancers{
resources: fakeResources,
client: fakeClient,
region: "nyc3",
clusterID: clusterID,
}

lbr, err := lb.buildLoadBalancerRequest(service, nodes)
if err != nil {
t.Errorf("got error: %s", err)
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()

wantTags := []string{buildK8sTag(clusterID)}
if !reflect.DeepEqual(lbr.Tags, wantTags) {
t.Errorf("got tags %q, want %q", lbr.Tags, wantTags)
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Name: "test",
Protocol: "TCP",
Port: int32(80),
NodePort: int32(30000),
},
},
},
}
nodes := []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
},
}
fakeClient := newFakeLBClient(&fakeLBService{})
fakeResources := newResources(test.clusterID, test.vpcID)
fakeResources.clusterVPCID = test.vpcID
fakeResources.UpdateDroplets([]godo.Droplet{
{
ID: 100,
Name: "node-1",
},
})

lb := &loadBalancers{
resources: fakeResources,
client: fakeClient,
region: "nyc3",
clusterID: clusterID,
}

lbr, err := lb.buildLoadBalancerRequest(service, nodes)
if test.err != nil {
if err == nil {
t.Fatal("expected error but got none")
}

if want, got := test.err, err; !reflect.DeepEqual(want, got) {
t.Errorf("incorrect err\nwant: %#v\n got: %#v", want, got)
}
return
}
if err != nil {
t.Errorf("got error: %s", err)
}

var wantTags []string
if test.clusterID != "" {
wantTags = []string{buildK8sTag(clusterID)}
}
if !reflect.DeepEqual(lbr.Tags, wantTags) {
t.Errorf("got tags %q, want %q", lbr.Tags, wantTags)
}

if want, got := "vpc_uuid", lbr.VPCUUID; want != got {
t.Errorf("incorrect vpc uuid\nwant: %#v\n got: %#v", want, got)
}
})
}
}

Expand Down Expand Up @@ -2131,7 +2173,7 @@ func Test_nodeToDropletIDs(t *testing.T) {
for _, test := range testcases {
t.Run(test.name, func(t *testing.T) {
fakeClient := newFakeLBClient(&fakeLBService{})
fakeResources := newResources()
fakeResources := newResources("", "")
fakeResources.UpdateDroplets(test.droplets)

lb := &loadBalancers{
Expand Down Expand Up @@ -2236,7 +2278,7 @@ func Test_GetLoadBalancer(t *testing.T) {

for _, test := range testcases {
t.Run(test.name, func(t *testing.T) {
fakeResources := newResources()
fakeResources := newResources("", "")
fakeResources.UpdateLoadBalancers(test.lbs)

lb := &loadBalancers{
Expand Down Expand Up @@ -2440,7 +2482,7 @@ func Test_EnsureLoadBalancer(t *testing.T) {
updateFn: test.updateFn,
}
fakeClient := newFakeLBClient(fakeLB)
fakeResources := newResources()
fakeResources := newResources("", "")
fakeResources.UpdateDroplets(test.droplets)
fakeResources.UpdateLoadBalancers(test.lbs)

Expand Down
19 changes: 11 additions & 8 deletions cloud-controller-manager/do/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type tagMissingError struct {
}

type resources struct {
clusterID string
clusterVPCID string

dropletIDMap map[int]*godo.Droplet
dropletNameMap map[string]*godo.Droplet
loadBalancerIDMap map[string]*godo.LoadBalancer
Expand All @@ -54,8 +57,11 @@ type resources struct {
mutex sync.RWMutex
}

func newResources() *resources {
func newResources(clusterID, clusterVPCID string) *resources {
return &resources{
clusterID: clusterID,
clusterVPCID: clusterVPCID,

dropletIDMap: make(map[int]*godo.Droplet),
dropletNameMap: make(map[string]*godo.Droplet),
loadBalancerIDMap: make(map[string]*godo.LoadBalancer),
Expand Down Expand Up @@ -199,7 +205,6 @@ func (s *tickerSyncer) Sync(name string, period time.Duration, stopCh <-chan str
// resources. It maintains a local state of the resources and
// synchronizes when needed.
type ResourcesController struct {
clusterID string
kclient kubernetes.Interface
gclient *godo.Client
svcLister v1lister.ServiceLister
Expand All @@ -210,14 +215,12 @@ type ResourcesController struct {

// NewResourcesController returns a new resource controller.
func NewResourcesController(
clusterID string,
r *resources,
inf v1informers.ServiceInformer,
k kubernetes.Interface,
g *godo.Client,
) *ResourcesController {
return &ResourcesController{
clusterID: clusterID,
resources: r,
kclient: k,
gclient: g,
Expand All @@ -230,8 +233,8 @@ func NewResourcesController(
func (r *ResourcesController) Run(stopCh <-chan struct{}) {
go r.syncer.Sync("resources syncer", controllerSyncResourcesPeriod, stopCh, r.syncResources)

if r.clusterID == "" {
glog.Info("No cluster ID configured -- skipping tags syncing.")
if r.resources.clusterID == "" {
glog.Info("No cluster ID configured -- skipping cluster dependent syncers.")
return
}
go r.syncer.Sync("tags syncer", controllerSyncTagsPeriod, stopCh, r.syncTags)
Expand Down Expand Up @@ -303,7 +306,7 @@ func (r *ResourcesController) syncTags() error {
return nil
}

tag := buildK8sTag(r.clusterID)
tag := buildK8sTag(r.resources.clusterID)
// Tag collected resources with the cluster ID. If the tag does not exist
// (for reasons outlined below), we will create it and retry tagging again.
err = r.tagResources(res)
Expand Down Expand Up @@ -334,7 +337,7 @@ func (r *ResourcesController) syncTags() error {
func (r *ResourcesController) tagResources(res []godo.Resource) error {
ctx, cancel := context.WithTimeout(context.Background(), syncTagsTimeout)
defer cancel()
tag := buildK8sTag(r.clusterID)
tag := buildK8sTag(r.resources.clusterID)
resp, err := r.gclient.Tags.TagResources(ctx, tag, &godo.TagResourcesRequest{
Resources: res,
})
Expand Down
Loading

0 comments on commit feab009

Please sign in to comment.