From b62610a3124b76471acd30f43a36d24bb581b895 Mon Sep 17 00:00:00 2001 From: Joe Webster <31218426+jwebster7@users.noreply.github.com> Date: Thu, 21 Sep 2023 14:56:53 -0500 Subject: [PATCH] Add trident-acp api client This commit adds a new API client to enable proprietary workflows in Trident. --- acp/client.go | 144 ++++++++++++++++++++++ acp/client_test.go | 291 +++++++++++++++++++++++++++++++++++++++++++++ acp/config.go | 37 ++++++ acp/plugin.go | 75 ++++++++++++ acp/plugin_test.go | 90 ++++++++++++++ acp/types.go | 20 ++++ main.go | 24 +++- 7 files changed, 680 insertions(+), 1 deletion(-) create mode 100644 acp/client.go create mode 100644 acp/client_test.go create mode 100644 acp/config.go create mode 100644 acp/plugin.go create mode 100644 acp/plugin_test.go create mode 100644 acp/types.go diff --git a/acp/client.go b/acp/client.go new file mode 100644 index 000000000..1b3b50a90 --- /dev/null +++ b/acp/client.go @@ -0,0 +1,144 @@ +// Copyright 2023 NetApp, Inc. All Rights Reserved. + +package acp + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "github.com/netapp/trident/utils/version" +) + +// Client is a concrete reference for interfacing directly with ACP REST APIs. +type Client struct { + baseURL string + httpClient http.Client +} + +// NewAPI accepts a base URL and timeout and returns a reference for interfacing directly with trident-acp REST APIs. +func NewAPI(baseURL string, timeout time.Duration) (*Client, error) { + if _, err := url.Parse(baseURL); baseURL == "" || err != nil { + return nil, fmt.Errorf("invalid URL [%s] specified for trident-acp API client; %v", baseURL, err) + } else if timeout < 1 { + return nil, fmt.Errorf("invalid timeout [%v] specified for trident-acp API client", timeout) + } + + return &Client{ + baseURL: baseURL, + httpClient: http.Client{Timeout: timeout}, + }, nil +} + +type getVersionResponse struct { + Version string `json:"version"` + Error string `json:"error,omitempty"` +} + +// GetVersion gets the installed ACP version. +// Example: http://:/trident-acp/v1/version +func (c *Client) GetVersion(ctx context.Context) (*version.Version, error) { + // Create a new HTTP request. + url := c.baseURL + versionEndpoint + req, err := c.newRequest(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + + // Assign headers. + req.Header.Set("User-Agent", userAgent()) + + // Make the request. + res, data, err := c.invokeAPI(req) + if err != nil { + return nil, err + } else if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("non-ok status code: [%v]; status %v", res.StatusCode, res.Status) + } + + // Parse the response data into a struct. + var versionResponse getVersionResponse + if err := json.Unmarshal(data, &versionResponse); err != nil { + return nil, fmt.Errorf("failed to unmarshal response body; %v", err) + } else if versionResponse.Error != "" { + return nil, fmt.Errorf("failed to get trident-acp version; %v", err) + } + + return version.ParseDate(versionResponse.Version) +} + +// Entitled accepts a feature and makes a request to ACP APIs to check if the supplied feature in Trident is allowed. +// Example: http://:/trident-acp/v1/entitled?feature= +func (c *Client) Entitled(ctx context.Context, feature string) (bool, error) { + // Create a new HTTP request. + url := c.baseURL + entitledEndpoint + req, err := c.newRequest(ctx, http.MethodGet, url, nil) + if err != nil { + return false, nil + } + + // Assign headers. + req.Header.Set("User-Agent", userAgent()) + + // Add or Set query parameters. + params := req.URL.Query() + params.Set("feature", feature) + req.URL.RawQuery = params.Encode() + + // Make the request. + // TODO: Inspect the response body when the Entitled API changes. + res, _, err := c.invokeAPI(req) + if err != nil { + return false, err + } else if res.StatusCode != http.StatusOK { + return false, fmt.Errorf("non-OK status code: [%v]; status %v", res.StatusCode, res.Status) + } + + return true, nil +} + +// newRequest accepts necessary fields to construct a new http request. +// It returns a new http request or an error if one occurs. +func (c *Client) newRequest(ctx context.Context, method, url string, data []byte) (*http.Request, error) { + // Ideally, the context should never be empty. If it is, set it to the default value used in new requests. + if ctx == nil { + ctx = context.Background() + } + + // Construct a new http request. + var body io.Reader + if data != nil { + body = bytes.NewBuffer(data) + } + + req, err := http.NewRequestWithContext(ctx, method, url, body) + if err != nil { + return nil, fmt.Errorf("failed to create new request: [%s, %s]; %v", method, url, err) + } + + return req, nil +} + +// invokeAPI accepts a http request, makes the request, and parses the response. +// It returns the http response, the response body or an error if one occurs. +func (c *Client) invokeAPI(req *http.Request) (*http.Response, []byte, error) { + // Make the request. + res, err := c.httpClient.Do(req) + if err != nil { + return nil, nil, fmt.Errorf("error making request: [%s]; %v", req.URL.String(), err) + } + defer func() { _ = res.Body.Close() }() + + // Read the response body. + data, err := io.ReadAll(res.Body) + if err != nil { + return res, nil, fmt.Errorf("failed to parse response body; %v", err) + } + + return res, data, nil +} diff --git a/acp/client_test.go b/acp/client_test.go new file mode 100644 index 000000000..f790b6b84 --- /dev/null +++ b/acp/client_test.go @@ -0,0 +1,291 @@ +// Copyright 2023 NetApp, Inc. All Rights Reserved. + +package acp + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/netapp/trident/logging" +) + +func TestMain(m *testing.M) { + // Disable any standard log output + logging.InitLogOutput(io.Discard) + os.Exit(m.Run()) +} + +// newHttpTestServer sets up a httptest server with a supplied handler and pattern. +func newHttpTestServer(pattern string, handler http.HandlerFunc) *httptest.Server { + router := http.NewServeMux() + router.HandleFunc(pattern, handler) + return httptest.NewServer(router) +} + +func TestNewAPI(t *testing.T) { + tests := map[string]struct { + baseURL string + timeout time.Duration + }{ + "WithMisconfiguredBaseURL": { + baseURL: "://127.0.0.1:8100", + timeout: httpClientTimeout, + }, + // ΗΤΤΡ here is defined using unicode: Η = capital eta; Τ = capital tau; Ρ=capital rho + "WithInvalidCharactersInBaseURL": { + baseURL: "ΗΤΤΡ://127.0.0.1:8100", + timeout: httpClientTimeout, + }, + "WithNegativeTimeoutSpecified": { + baseURL: "http://127.0.0.1.8100", + timeout: time.Duration(-1), + }, + "WithZeroTimeoutSpecified": { + baseURL: "http://127.0.0.1.8100", + timeout: time.Duration(0), + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + c, err := NewAPI(test.baseURL, test.timeout) + assert.Nil(t, c, "expected nil client") + assert.Error(t, err, "expected error") + }) + } +} + +func TestClient_GetVersion(t *testing.T) { + ctx := context.Background() + t.Run("WithNoServerRunning", func(t *testing.T) { + server := newHttpTestServer(versionEndpoint, func(w http.ResponseWriter, request *http.Request) {}) + // Close the server immediately. + server.Close() + + client := &Client{baseURL: server.URL, httpClient: *server.Client()} + + version, err := client.GetVersion(ctx) + assert.Nil(t, version, "expected version to be nil") + assert.Error(t, err, "expected error") + }) + + t.Run("WithInternalServerError", func(t *testing.T) { + handler := func(w http.ResponseWriter, request *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte{}) + } + server := newHttpTestServer(versionEndpoint, handler) + defer server.Close() + + client := &Client{baseURL: server.URL, httpClient: *server.Client()} + + version, err := client.GetVersion(ctx) + assert.Nil(t, version, "expected version to be nil") + assert.Error(t, err, "expected error") + }) + + t.Run("WithIncorrectResponseType", func(t *testing.T) { + handler := func(w http.ResponseWriter, request *http.Request) { + if _, err := w.Write(nil); err != nil { + t.Fatalf("failed to write response for GetVersion") + } + } + server := newHttpTestServer(versionEndpoint, handler) + defer server.Close() + + client := &Client{baseURL: server.URL, httpClient: *server.Client()} + + version, err := client.GetVersion(ctx) + assert.Nil(t, version, "expected version to be nil") + assert.Error(t, err, "expected error") + }) + + t.Run("WithErrorInResponse", func(t *testing.T) { + expectedVersion := "23.10.0-custom+unknown" + handler := func(w http.ResponseWriter, request *http.Request) { + v := &getVersionResponse{Version: expectedVersion, Error: "non-empty error"} + bytes, err := json.Marshal(v) + if err != nil { + t.Fatalf("failed to create fake response for GetVersion") + } + + if _, err = w.Write(bytes); err != nil { + t.Fatalf("failed to write response for GetVersion") + } + } + server := newHttpTestServer(versionEndpoint, handler) + defer server.Close() + + client := &Client{baseURL: server.URL, httpClient: *server.Client()} + + version, err := client.GetVersion(ctx) + assert.Nil(t, version, "expected version to be nil") + assert.Error(t, err, "expected error") + }) + + t.Run("WithCorrectResponseType", func(t *testing.T) { + expectedVersion := "23.10.0-custom+unknown" + handler := func(w http.ResponseWriter, request *http.Request) { + bytes, err := json.Marshal(&getVersionResponse{Version: expectedVersion}) + if err != nil { + t.Fatalf("failed to create fake response for GetVersion") + } + + if _, err = w.Write(bytes); err != nil { + t.Fatalf("failed to write response for GetVersion") + } + } + server := newHttpTestServer(versionEndpoint, handler) + defer server.Close() + + client := &Client{baseURL: server.URL, httpClient: http.Client{Timeout: httpClientTimeout}} + + version, err := client.GetVersion(ctx) + assert.NotNil(t, version, "expected version to exist") + assert.Equal(t, version.String(), expectedVersion, "expected equivalent versions") + assert.NoError(t, err, "unexpected error") + }) +} + +func TestClient_Entitled(t *testing.T) { + ctx := context.Background() + feature := FeatureSnapshotMirrorUpdate + t.Run("WithNoServerRunning", func(t *testing.T) { + server := newHttpTestServer("/any", func(w http.ResponseWriter, request *http.Request) {}) + // Close the server immediately. + server.Close() + + client := &Client{baseURL: server.URL, httpClient: *server.Client()} + + allowed, err := client.Entitled(ctx, feature) + assert.False(t, allowed, "expected allowed to be false") + assert.Error(t, err, "expected error") + }) + + t.Run("WithInternalServerError", func(t *testing.T) { + handler := func(w http.ResponseWriter, request *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte{}) + } + server := newHttpTestServer(entitledEndpoint, handler) + defer server.Close() + + client := &Client{baseURL: server.URL, httpClient: *server.Client()} + + allowed, err := client.Entitled(ctx, feature) + assert.False(t, allowed, "expected allowed to be false") + assert.Error(t, err, "expected error") + }) + + t.Run("WithAllowedResponse", func(t *testing.T) { + handler := func(w http.ResponseWriter, request *http.Request) { + w.WriteHeader(200) + } + server := newHttpTestServer(entitledEndpoint, handler) + defer server.Close() + + client := &Client{baseURL: server.URL, httpClient: http.Client{Timeout: httpClientTimeout}} + + allowed, err := client.Entitled(ctx, feature) + assert.True(t, allowed, "expected allowed to be true") + assert.NoError(t, err, "unexpected error") + }) + // TODO: Add unit tests for inspecting the response body when the Entitled API changes. +} + +func TestClient_newRequest(t *testing.T) { + tests := map[string]struct { + ctx context.Context + method, url string + data []byte + }{ + "WithNilContext": { + method: "GET", + url: versionEndpoint, + }, + "WithNonNilBody": { + method: "GET", + url: versionEndpoint, + data: []byte{'t', 'r', 'i', 'd', 'e', 'n', 't'}, + }, + "WithEmptyMethod": { + method: "", + url: versionEndpoint, + }, + "WithEmptyURL": { + method: "GET", + url: "", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + client, err := NewAPI("http://127.0.0.1:8100", httpClientTimeout) + if err != nil { + t.Fatalf("failed to create ACP API client for tests") + } + + req, err := client.newRequest(test.ctx, test.method, test.url, test.data) + assert.NotNil(t, req) + assert.NoError(t, err) + + var body []byte + if test.data != nil { + body, err = io.ReadAll(req.Body) + if err != nil { + t.Fatalf("failed to parse fake request body; %v", err) + } + } + assert.Equal(t, test.data, body) + }) + } +} + +func TestClient_invokeAPI(t *testing.T) { + ctx := context.Background() + t.Run("WithNoServerRunning", func(t *testing.T) { + server := newHttpTestServer(versionEndpoint, func(w http.ResponseWriter, request *http.Request) {}) + // Close the server immediately. + server.Close() + + client := &Client{baseURL: server.URL, httpClient: *server.Client()} + req, err := client.newRequest(ctx, "GET", server.URL+versionEndpoint, nil) + if err != nil { + t.Fatalf("failed to create fake test request; %v", err) + } + + res, body, err := client.invokeAPI(req) + assert.Nil(t, res, "expected res to be nil") + assert.Nil(t, body, "expected body to be nil") + assert.Error(t, err, "expected error") + }) + + t.Run("WithInternalServerError", func(t *testing.T) { + handler := func(w http.ResponseWriter, request *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte{}) + } + server := newHttpTestServer(versionEndpoint, handler) + defer server.Close() + + client := &Client{baseURL: server.URL, httpClient: *server.Client()} + req, err := client.newRequest(ctx, "GET", server.URL+versionEndpoint, nil) + if err != nil { + t.Fatalf("failed to create fake test request; %v", err) + } + + res, body, err := client.invokeAPI(req) + assert.NotNil(t, res, "expected res to not be nil") + assert.Equal(t, http.StatusInternalServerError, res.StatusCode, "expected internal server error") + assert.NotNil(t, body, "expected body to not be nil") + assert.NoError(t, err, "expected no error") + }) +} diff --git a/acp/config.go b/acp/config.go new file mode 100644 index 000000000..7eaeae458 --- /dev/null +++ b/acp/config.go @@ -0,0 +1,37 @@ +// Copyright 2023 NetApp, Inc. All Rights Reserved. + +package acp + +import ( + "fmt" + "time" + + "github.com/netapp/trident/config" +) + +const ( + // ACP REST API constants. + + appName = "trident-acp" + apiVersion = "1" + + DefaultBaseURL = "http://127.0.0.1:8100" + + entitledEndpoint = "/" + appName + "/v" + apiVersion + "/entitled" + versionEndpoint = "/" + appName + "/v" + apiVersion + "/version" + + httpClientTimeout = time.Second * 30 + + // Feature constants. + + FeatureSnapshotRestore = "SnapshotRestore" + FeatureSnapshotMirrorUpdate = "SnapshotMirrorUpdate" + FeatureInFlightEncryption = "InFlightEncryption" + FeatureReadOnlyClone = "ReadOnlyClone" +) + +// userAgent is a helper for adding a User-Agent header for Trident to ACP API calls. +// Example: trident/v23.07.0 +func userAgent() string { + return fmt.Sprintf("%s/v%s", config.OrchestratorName, config.OrchestratorVersion.String()) +} diff --git a/acp/plugin.go b/acp/plugin.go new file mode 100644 index 000000000..73db0f252 --- /dev/null +++ b/acp/plugin.go @@ -0,0 +1,75 @@ +// Copyright 2023 NetApp, Inc. All Rights Reserved. + +package acp + +import ( + "context" + "time" + + "github.com/cenkalti/backoff/v4" + + . "github.com/netapp/trident/logging" + "github.com/netapp/trident/utils/version" +) + +var ( + initialInterval = 1 * time.Second + multiplier = 1.414 // approx sqrt(2) + maxInterval = 120 * time.Second + randomFactor = 0.1 + maxElapsedTime = 300 * time.Second +) + +// Activate reaches out to ACP REST API and attempts to establish a connection. It will retry for 5 minutes. +// It should be called by each member of Trident that must consume ACP APIs to complete premium workflows. +// It is up to the caller to decide whether this executes as a synchronous or asynchronous routine. +func (c *Client) Activate() error { + var err error + ctx := GenerateRequestContext( + context.Background(), "", ContextSourceInternal, WorkflowPluginActivate, LogLayerUtils, + ) + + var v *version.Version + activate := func() error { + v, err = c.GetVersion(ctx) + return err + } + + activateBackoff := backoff.NewExponentialBackOff() + activateBackoff.InitialInterval = initialInterval + activateBackoff.Multiplier = multiplier + activateBackoff.MaxInterval = maxInterval + activateBackoff.RandomizationFactor = randomFactor + activateBackoff.MaxElapsedTime = maxElapsedTime + + activateNotify := func(err error, duration time.Duration) { + Logc(ctx).WithFields(LogFields{ + "increment": duration, + "error": err, + }).Debug("Could not get trident-acp version; retrying...") + } + + err = backoff.RetryNotify(activate, activateBackoff, activateNotify) + if err != nil { + Logc(ctx).WithError(err).Error("Unable to communicate with trident-acp REST API.") + return err + } + + Logc(ctx).WithField("version", v.String()).Info("The trident-acp REST API is responsive.") + return nil +} + +func (c *Client) Deactivate() error { + // Close any idle connections if they exist. + c.httpClient.CloseIdleConnections() + return nil +} + +func (c *Client) GetName() string { + return appName +} + +// Version returns the API client version. +func (c *Client) Version() string { + return apiVersion +} diff --git a/acp/plugin_test.go b/acp/plugin_test.go new file mode 100644 index 000000000..536ef51b4 --- /dev/null +++ b/acp/plugin_test.go @@ -0,0 +1,90 @@ +// Copyright 2023 NetApp, Inc. All Rights Reserved. + +package acp + +import ( + "encoding/json" + "net/http" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// setupBackoff is a helper to set the backoff values used in the API client's Activate method. +func setupBackoff(interval, intervalCeiling, timeCeiling time.Duration, timeMultiplier, randomization float64) { + initialInterval = interval + maxInterval = intervalCeiling + maxElapsedTime = timeCeiling + multiplier = timeMultiplier + randomFactor = randomization +} + +func TestClient_Activate(t *testing.T) { + t.Run("WithNoServerRunning", func(t *testing.T) { + // Reset the backoff to the initial values after the test exits. + defer setupBackoff(initialInterval, maxInterval, maxElapsedTime, multiplier, randomFactor) + setupBackoff(50*time.Millisecond, 100*time.Millisecond, 250*time.Millisecond, 1.414, 1.0) + + server := newHttpTestServer(versionEndpoint, func(w http.ResponseWriter, request *http.Request) {}) + // Close the server immediately. + server.Close() + + client := &Client{baseURL: server.URL, httpClient: *server.Client()} + err := client.Activate() + // For now expect no error even though one occurs. + assert.Error(t, err, "expected error") + }) + + t.Run("WithCorrectResponseTypeAsync", func(t *testing.T) { + // Reset the backoff to the initial values after the test exits. + defer setupBackoff(initialInterval, maxInterval, maxElapsedTime, multiplier, randomFactor) + setupBackoff(50*time.Millisecond, 100*time.Millisecond, 250*time.Millisecond, 1.414, 1.0) + + handler := func(w http.ResponseWriter, request *http.Request) { + bytes, err := json.Marshal(&getVersionResponse{Version: "23.10.0-custom+unknown"}) + if err != nil { + t.Fatalf("failed to create fake response for GetVersion") + } + if _, err = w.Write(bytes); err != nil { + t.Fatalf("failed to write response for GetVersion") + } + } + server := newHttpTestServer(versionEndpoint, handler) + defer server.Close() + + client := &Client{baseURL: server.URL, httpClient: http.Client{Timeout: httpClientTimeout}} + + var wg sync.WaitGroup + var err error + func() { + wg.Add(1) + defer wg.Done() + err = client.Activate() + // Give the backoff-retry time to make the API calls. + time.Sleep(200 * time.Millisecond) + }() + + wg.Wait() + assert.NoError(t, err, "unexpected error") + }) +} + +func TestClient_Deactivate(t *testing.T) { + client := &Client{baseURL: "", httpClient: http.Client{}} + err := client.Deactivate() + assert.NoError(t, err) +} + +func TestClient_GetName(t *testing.T) { + client := &Client{baseURL: "", httpClient: http.Client{}} + name := client.GetName() + assert.Equal(t, appName, name) +} + +func TestClient_Version(t *testing.T) { + client := &Client{baseURL: "", httpClient: http.Client{}} + name := client.Version() + assert.Equal(t, apiVersion, name) +} diff --git a/acp/types.go b/acp/types.go new file mode 100644 index 000000000..5fdbb6fbd --- /dev/null +++ b/acp/types.go @@ -0,0 +1,20 @@ +// Copyright 2023 NetApp, Inc. All Rights Reserved. + +package acp + +import ( + "context" + + "github.com/netapp/trident/utils/version" +) + +// API represents a set of methods for talking with Trident-ACP REST APIs. +type API interface { + Activate() error + Deactivate() error + GetName() string + Version() string + + GetVersion(context.Context) (*version.Version, error) + Entitled(context.Context, string) (bool, error) +} diff --git a/main.go b/main.go index a5afaa4f8..717e3ab9f 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ import ( "syscall" "time" + "github.com/netapp/trident/acp" "github.com/netapp/trident/config" "github.com/netapp/trident/core" "github.com/netapp/trident/frontend" @@ -74,7 +75,8 @@ var ( nodePrep = flag.Bool("node_prep", true, "Attempt to install required packages on nodes.") // Trident-ACP - enableACP = flag.Bool("enable_acp", false, "Enable the trident-acp premium features.") + enableACP = flag.Bool("enable_acp", false, "Enable the trident-acp premium features.") + acpAddress = flag.String("acp_address", acp.DefaultBaseURL, "Specify the trident-acp REST API address.") // Persistence useInMemory = flag.Bool("no_persistence", false, "Does not persist "+ @@ -510,6 +512,26 @@ func main() { } } + var acpAPI acp.API + if *enableACP { + // See if Trident-ACP is responsive at the supplied address. + acpAPI, err = acp.NewAPI(*acpAddress, config.HTTPTimeout) + if err != nil { + Log().WithField("address", *acpAddress).WithError(err).Warning( + "Failed to initialize trident-acp API client; premium workflows may not be available.", + ) + } else { + Log().WithField("name", acpAPI.GetName()).Info("Created trident-acp API client.") + + // Check if ACP is there initially; this check should not be blocking. + go func() { + if err := acpAPI.Activate(); err != nil { + Log().Error(err) + } + }() + } + } + // Bootstrap the orchestrator and start its frontends. Some frontends, notably REST and Docker, must // start before the core so that the external interfaces are minimally responding while the core is // still initializing. Other frontends such as legacy Kubernetes and CSI benefit from starting after