Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add list acls endpoint for the new programmatic API #896

Merged
merged 8 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ require (
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bitly/go-simplejson v0.5.0 // indirect
github.com/carlmjohnson/requests v0.23.5 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudflare/circl v1.3.3 // indirect
Expand All @@ -88,6 +89,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/cel-go v0.18.0 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions backend/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dR
github.com/bufbuild/protovalidate-go v0.3.1 h1:+jbgQXo+7SzttLbGwVClpHowXKEgwK1QG/bK4xrmUy8=
github.com/bufbuild/protovalidate-go v0.3.1/go.mod h1:oD/fAR3ojBAunOmY3SGFJ4jhILpUtnuIalI4Id9rluY=
github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
github.com/carlmjohnson/requests v0.23.5 h1:NPANcAofwwSuC6SIMwlgmHry2V3pLrSqRiSBKYbNHHA=
github.com/carlmjohnson/requests v0.23.5/go.mod h1:zG9P28thdRnN61aD7iECFhH5iGGKX2jIjKQD9kqYH+o=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down Expand Up @@ -272,6 +274,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
Expand Down
11 changes: 11 additions & 0 deletions backend/pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package api

import (
"context"
"fmt"
"io/fs"
"math"

Expand Down Expand Up @@ -136,3 +138,12 @@ func (api *API) Start() {
api.Logger.Fatal("REST Server returned an error", zap.Error(err))
}
}

// Stop gracefully stops the API.
func (api *API) Stop(ctx context.Context) error {
err := api.server.Server.Shutdown(ctx)
if err != nil {
return fmt.Errorf("failed to shutdown HTTP server: %w", err)
}
return nil
}
30 changes: 30 additions & 0 deletions backend/pkg/api/connect/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
package errors

import (
"errors"
"strconv"

"connectrpc.com/connect"
"github.com/twmb/franz-go/pkg/kerr"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -65,3 +69,29 @@ func NewErrorInfo(reason string, metadata ...KeyVal) *errdetails.ErrorInfo {
Metadata: md,
}
}

// KeyValsFromKafkaError tries to check if a given error is a Kafka error.
// If this is the case, this function extracts the Kafka error code (int16)
// as well as the string enum of this error code and returns a Key-Value
// pair for each. These Key-Value pairs can be attached to the connect errors.
func KeyValsFromKafkaError(err error) []KeyVal {
if err == nil {
return []KeyVal{}
}

var kafkaErr *kerr.Error
if errors.As(err, &kafkaErr) {
return []KeyVal{
{
Key: "kafka_error_code",
Value: strconv.Itoa(int(kafkaErr.Code)),
},
{
Key: "kafka_error_message",
Value: kafkaErr.Message,
},
}
}

return []KeyVal{}
}
229 changes: 229 additions & 0 deletions backend/pkg/api/connect/integration/acl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

//go:build integration

package integration

import (
"context"
"fmt"
"net/http"
"testing"
"time"

"connectrpc.com/connect"
"github.com/carlmjohnson/requests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"

"github.com/redpanda-data/console/backend/pkg/protocmp"
v1alpha1 "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1alpha1"
v1alpha1connect "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1alpha1/dataplanev1alpha1connect"
)

func (s *APISuite) DeleteAllACLs(ctx context.Context) error {
acls := kadm.NewACLs().
Allow().
ResourcePatternType(kadm.ACLPatternAny).
Groups().
Topics().
TransactionalIDs().
Clusters().
Operations(kadm.OpAny)
if err := acls.ValidateDelete(); err != nil {
return fmt.Errorf("failed to validate delete acl request: %w", err)
}

_, err := s.kafkaAdminClient.DeleteACLs(ctx, acls)
return err
}

func (s *APISuite) newStdACLResource(resourceType v1alpha1.ACL_ResourceType, resourceName string, principal string) *v1alpha1.ListACLsResponse_Resource {
return &v1alpha1.ListACLsResponse_Resource{
ResourceType: resourceType,
ResourceName: resourceName,
ResourcePatternType: v1alpha1.ACL_RESOURCE_PATTERN_TYPE_LITERAL,
Acls: []*v1alpha1.ListACLsResponse_Policy{
{
Principal: principal,
Host: "*",
Operation: v1alpha1.ACL_OPERATION_ALL,
PermissionType: v1alpha1.ACL_PERMISSION_TYPE_ALLOW,
},
},
}
}

func (s *APISuite) TestListACLs() {
t := s.T()
require := require.New(t)
assert := assert.New(t)

t.Run("list ACLs with default request (connect-go)", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// 1. Seed some ACLs
principal := "User:test"
resourceNamePrefix := "console-test-"

resourceNames := map[v1alpha1.ACL_ResourceType]string{
v1alpha1.ACL_RESOURCE_TYPE_GROUP: fmt.Sprintf("%vgroup", resourceNamePrefix),
v1alpha1.ACL_RESOURCE_TYPE_TOPIC: fmt.Sprintf("%vtopic", resourceNamePrefix),
v1alpha1.ACL_RESOURCE_TYPE_TRANSACTIONAL_ID: fmt.Sprintf("%vtransactional-id", resourceNamePrefix),
v1alpha1.ACL_RESOURCE_TYPE_CLUSTER: "kafka-cluster",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit or check, but this key doesn't seem to be used? Should it be used somewhere? or removed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nvm we iterate over the map.

}

acls := kadm.NewACLs().
Allow(principal).
ResourcePatternType(kadm.ACLPatternLiteral).
Groups(resourceNames[v1alpha1.ACL_RESOURCE_TYPE_GROUP]).
Topics(resourceNames[v1alpha1.ACL_RESOURCE_TYPE_TOPIC]).
TransactionalIDs(resourceNames[v1alpha1.ACL_RESOURCE_TYPE_TRANSACTIONAL_ID]).
Clusters().
Operations(kadm.OpAll)

err := acls.ValidateCreate()
require.NoError(err)

_, err = s.kafkaAdminClient.CreateACLs(ctx, acls)
require.NoError(err)

defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
err := s.DeleteAllACLs(ctx)
assert.NoError(err, "failed to delete all ACLs")
}()

// 2. List ACLs
client := v1alpha1connect.NewACLServiceClient(http.DefaultClient, s.httpAddress())
res, err := client.ListACLs(ctx, connect.NewRequest(&v1alpha1.ListACLsRequest{}))
require.NoError(err)
require.NotNil(res.Msg, "response message must not be nil")
require.GreaterOrEqual(len(res.Msg.Resources), 4)

// 3. From all ACLs returned by the Kafka API, filter only those that match
// the patterns of the seeded ACLs. There may be existing system-internal ACLs,
// see: https://github.com/redpanda-data/redpanda/issues/14373 .
filteredResources := make([]*v1alpha1.ListACLsResponse_Resource, 0)
for _, res := range res.Msg.Resources {
// Check if resource name matches a seeded ACL resource name
name, isMatch := resourceNames[res.ResourceType]
if !isMatch || name != res.ResourceName {
continue
}

// There should be no such case, but we want to fail loudly if it happens regardless.
// Otherwise, it may be unnecessarily hard to debug why this test failed.
require.Greater(len(res.Acls), 0, fmt.Sprintf("The ACLs list for a given resource group (name: %q, type: %q) is empty", res.ResourceName, res.ResourceType))

if res.Acls[0].Principal != principal {
continue
}
filteredResources = append(filteredResources, res)
}

// 4. Compare all filtered resources against expected ACLs
expectedResources := map[v1alpha1.ACL_ResourceType]*v1alpha1.ListACLsResponse_Resource{
v1alpha1.ACL_RESOURCE_TYPE_TOPIC: s.newStdACLResource(v1alpha1.ACL_RESOURCE_TYPE_TOPIC, resourceNames[v1alpha1.ACL_RESOURCE_TYPE_TOPIC], principal),
v1alpha1.ACL_RESOURCE_TYPE_TRANSACTIONAL_ID: s.newStdACLResource(v1alpha1.ACL_RESOURCE_TYPE_TRANSACTIONAL_ID, resourceNames[v1alpha1.ACL_RESOURCE_TYPE_TRANSACTIONAL_ID], principal),
v1alpha1.ACL_RESOURCE_TYPE_GROUP: s.newStdACLResource(v1alpha1.ACL_RESOURCE_TYPE_GROUP, resourceNames[v1alpha1.ACL_RESOURCE_TYPE_GROUP], principal),
v1alpha1.ACL_RESOURCE_TYPE_CLUSTER: s.newStdACLResource(v1alpha1.ACL_RESOURCE_TYPE_CLUSTER, resourceNames[v1alpha1.ACL_RESOURCE_TYPE_CLUSTER], principal),
}
assert.Len(filteredResources, len(expectedResources))

for _, res := range filteredResources {
protocmp.AssertProtoEqual(t, expectedResources[res.ResourceType], res)
delete(expectedResources, res.ResourceType)
}

// Check if all expected resources have been matched
assert.Len(expectedResources, 0)
})

t.Run("list ACLs with invalid filter (connect-go)", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

client := v1alpha1connect.NewACLServiceClient(http.DefaultClient, s.httpAddress())
_, err := client.ListACLs(ctx, connect.NewRequest(&v1alpha1.ListACLsRequest{
Filter: &v1alpha1.ACL_Filter{
ResourceType: v1alpha1.ACL_RESOURCE_TYPE_ANY,
PermissionType: v1alpha1.ACL_PermissionType(999),
},
}))
assert.Error(err)
assert.Equal(connect.CodeInvalidArgument, connect.CodeOf(err))
})

t.Run("list ACLs with default request (http)", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// 1. Seed some ACLs
principal := "User:test"
resourceNamePrefix := "console-test-"

resourceNames := map[v1alpha1.ACL_ResourceType]string{
v1alpha1.ACL_RESOURCE_TYPE_GROUP: fmt.Sprintf("%vgroup", resourceNamePrefix),
v1alpha1.ACL_RESOURCE_TYPE_TOPIC: fmt.Sprintf("%vtopic", resourceNamePrefix),
v1alpha1.ACL_RESOURCE_TYPE_TRANSACTIONAL_ID: fmt.Sprintf("%vtransactional-id", resourceNamePrefix),
v1alpha1.ACL_RESOURCE_TYPE_CLUSTER: "kafka-cluster",
}

acls := kadm.NewACLs().
Allow(principal).
ResourcePatternType(kadm.ACLPatternLiteral).
Groups(resourceNames[v1alpha1.ACL_RESOURCE_TYPE_GROUP]).
Topics(resourceNames[v1alpha1.ACL_RESOURCE_TYPE_TOPIC]).
TransactionalIDs(resourceNames[v1alpha1.ACL_RESOURCE_TYPE_TRANSACTIONAL_ID]).
Clusters().
Operations(kadm.OpAll)

err := acls.ValidateCreate()
require.NoError(err)

_, err = s.kafkaAdminClient.CreateACLs(ctx, acls)
require.NoError(err)

defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
err := s.DeleteAllACLs(ctx)
assert.NoError(err, "failed to delete all ACLs")
}()

// 2. List ACLs via HTTP API
type listAclsResponse struct {
Resources []struct {
ResourceType string `json:"resource_type"`
ResourceName string `json:"resource_name"`
ResourcePatternType string `json:"resource_pattern_type"`
ACLs []struct {
Principal string `json:"principal"`
Host string `json:"host"`
Operation string `json:"operation"`
PermissionType string `json:"permission_type"`
} `json:"acls"`
} `json:"resources"`
}
var response listAclsResponse
err = requests.
URL(s.httpAddress() + "/v1alpha1/").
Path("acls").
CheckStatus(http.StatusOK). // Allows 2xx otherwise
ToJSON(&response).
Fetch(ctx)
require.NoError(err)
assert.GreaterOrEqual(len(response.Resources), 4)
})
}
Loading
Loading