Dataplane API for mounting/unmounting topics (#1457)
This commit adds a couple of endpoints for mounting and
unmounting topics in Redpanda clusters.

Jira: CONSOLE-26
weeco authored Nov 6, 2024
1 parent e03e5c4 commit 0a68f5c
Showing 17 changed files with 5,210 additions and 46 deletions.
2 changes: 1 addition & 1 deletion backend/go.mod
@@ -38,7 +38,7 @@ require (
github.com/redpanda-data/benthos/v4 v4.35.0
github.com/redpanda-data/common-go/api v0.0.0-20240918135346-6c838a508d64
github.com/redpanda-data/common-go/net v0.1.1-0.20240429123545-4da3d2b371f7
-github.com/redpanda-data/common-go/rpadmin v0.1.9
+github.com/redpanda-data/common-go/rpadmin v0.1.10
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.32.0
2 changes: 2 additions & 0 deletions backend/go.sum
@@ -557,6 +557,8 @@ github.com/redpanda-data/common-go/net v0.1.1-0.20240429123545-4da3d2b371f7 h1:M
github.com/redpanda-data/common-go/net v0.1.1-0.20240429123545-4da3d2b371f7/go.mod h1:UJIi/yUxGOBYXUrfUsOkxfYxcb/ll7mZrwae/i+U2kc=
github.com/redpanda-data/common-go/rpadmin v0.1.9 h1:X5a95P7Dc+7EaidU7dusWJyiG3eJmk4zJtUttfvhmc4=
github.com/redpanda-data/common-go/rpadmin v0.1.9/go.mod h1:I7umqhnMhIOSEnIA3fvLtdQU7QO/SbWGCwFfFDs3De4=
+github.com/redpanda-data/common-go/rpadmin v0.1.10 h1:3j/iRfqVglmmotE5bL0OF/3zwnrBwKZNHnnaTyA1bGU=
+github.com/redpanda-data/common-go/rpadmin v0.1.10/go.mod h1:I7umqhnMhIOSEnIA3fvLtdQU7QO/SbWGCwFfFDs3De4=
github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
github.com/rickb777/period v1.0.6 h1:f4TcHBtL/4qa4D44eqgxs7785/kfLKUjRI7XYI2HCvk=
github.com/rickb777/period v1.0.6/go.mod h1:TKkPHI/WSyjjVdeVCyqwBoQg0Cdb/jRvnc8FFdq2cgw=
24 changes: 12 additions & 12 deletions backend/pkg/api/connect/service/topic/v1alpha2/mapper.go
@@ -18,20 +18,20 @@ import (
v1alpha2 "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1alpha2"
)

-type kafkaClientMapper struct {
+type mapper struct {
commonKafkaClientMapper common.KafkaClientMapper
}

// createTopicRequestToKafka maps the proto request to create a topic into a kmsg.CreateTopicsRequest.
-func (k *kafkaClientMapper) createTopicRequestToKafka(req *v1alpha2.CreateTopicRequest) *kmsg.CreateTopicsRequest {
+func (k *mapper) createTopicRequestToKafka(req *v1alpha2.CreateTopicRequest) *kmsg.CreateTopicsRequest {
kafkaReq := kmsg.NewCreateTopicsRequest()
kafkaReq.ValidateOnly = req.ValidateOnly
kafkaReq.Topics = []kmsg.CreateTopicsRequestTopic{k.createTopicRequestTopicToKafka(req.Topic)}
return &kafkaReq
}

// createTopicRequestTopicToKafka maps the proto message for creating a topic to a kmsg.CreateTopicsRequestTopic.
-func (*kafkaClientMapper) createTopicRequestTopicToKafka(topicReq *v1alpha2.CreateTopicRequest_Topic) kmsg.CreateTopicsRequestTopic {
+func (*mapper) createTopicRequestTopicToKafka(topicReq *v1alpha2.CreateTopicRequest_Topic) kmsg.CreateTopicsRequestTopic {
partitionCount := int32(-1)
if topicReq.PartitionCount != nil {
partitionCount = *topicReq.PartitionCount
@@ -68,15 +68,15 @@ func (*kafkaClientMapper) createTopicRequestTopicToKafka(topicReq *v1alpha2.Crea
return req
}

-func (*kafkaClientMapper) createTopicResponseTopicToProto(topic kmsg.CreateTopicsResponseTopic) *v1alpha2.CreateTopicResponse {
+func (*mapper) createTopicResponseTopicToProto(topic kmsg.CreateTopicsResponseTopic) *v1alpha2.CreateTopicResponse {
return &v1alpha2.CreateTopicResponse{
Name: topic.Topic,
PartitionCount: topic.NumPartitions,
ReplicationFactor: int32(topic.ReplicationFactor),
}
}

-func (*kafkaClientMapper) describeTopicConfigsToKafka(req *v1alpha2.GetTopicConfigurationsRequest) kmsg.DescribeConfigsRequest {
+func (*mapper) describeTopicConfigsToKafka(req *v1alpha2.GetTopicConfigurationsRequest) kmsg.DescribeConfigsRequest {
configResource := kmsg.NewDescribeConfigsRequestResource()
configResource.ResourceType = kmsg.ConfigResourceTypeTopic
configResource.ResourceName = req.TopicName
@@ -90,7 +90,7 @@ func (*kafkaClientMapper) describeTopicConfigsToKafka(req *v1alpha2.GetTopicConf
return kafkaReq
}

-func (k *kafkaClientMapper) describeTopicConfigsToProto(resources []kmsg.DescribeConfigsResponseResourceConfig) ([]*v1alpha2.Topic_Configuration, error) {
+func (k *mapper) describeTopicConfigsToProto(resources []kmsg.DescribeConfigsResponseResourceConfig) ([]*v1alpha2.Topic_Configuration, error) {
mappedResources := make([]*v1alpha2.Topic_Configuration, len(resources))
for i, resource := range resources {
configType, err := k.commonKafkaClientMapper.ConfigTypeToProto(resource.ConfigType)
@@ -127,7 +127,7 @@ func (k *kafkaClientMapper) describeTopicConfigsToProto(resources []kmsg.Describ
return mappedResources, nil
}

-func (*kafkaClientMapper) deleteTopicToKmsg(req *v1alpha2.DeleteTopicRequest) kmsg.DeleteTopicsRequest {
+func (*mapper) deleteTopicToKmsg(req *v1alpha2.DeleteTopicRequest) kmsg.DeleteTopicsRequest {
kafkaReq := kmsg.NewDeleteTopicsRequest()
kafkaReq.TopicNames = []string{req.Name}
kafkaReq.Topics = []kmsg.DeleteTopicsRequestTopic{
@@ -139,7 +139,7 @@ func (*kafkaClientMapper) deleteTopicToKmsg(req *v1alpha2.DeleteTopicRequest) km
return kafkaReq
}

-func (k *kafkaClientMapper) updateTopicConfigsToKafka(req *v1alpha2.UpdateTopicConfigurationsRequest) (*kmsg.IncrementalAlterConfigsRequest, error) {
+func (k *mapper) updateTopicConfigsToKafka(req *v1alpha2.UpdateTopicConfigurationsRequest) (*kmsg.IncrementalAlterConfigsRequest, error) {
// We only have one resource (a single topic) whose configs we want to update incrementally.
// The API allows adding many independent resources of different types. Because we always only
// want to patch configs for a single Kafka topic, we can simplify the mapping here by hardcoding
@@ -170,7 +170,7 @@ func (k *kafkaClientMapper) updateTopicConfigsToKafka(req *v1alpha2.UpdateTopicC
return &kafkaReq, nil
}

-func (k *kafkaClientMapper) kafkaMetadataToProto(metadata *kmsg.MetadataResponse) []*v1alpha2.ListTopicsResponse_Topic {
+func (k *mapper) kafkaMetadataToProto(metadata *kmsg.MetadataResponse) []*v1alpha2.ListTopicsResponse_Topic {
topics := make([]*v1alpha2.ListTopicsResponse_Topic, len(metadata.Topics))
for i, topicMetadata := range metadata.Topics {
topics[i] = k.kafkaTopicMetadataToProto(topicMetadata)
@@ -179,7 +179,7 @@ func (k *kafkaClientMapper) kafkaMetadataToProto(metadata *kmsg.MetadataResponse
return topics
}

-func (*kafkaClientMapper) kafkaTopicMetadataToProto(topicMetadata kmsg.MetadataResponseTopic) *v1alpha2.ListTopicsResponse_Topic {
+func (*mapper) kafkaTopicMetadataToProto(topicMetadata kmsg.MetadataResponseTopic) *v1alpha2.ListTopicsResponse_Topic {
// We iterate through all partitions to figure out the replication factor,
// in case we get an error for the first partitions
replicationFactor := -1
@@ -197,7 +197,7 @@ func (*kafkaClientMapper) kafkaTopicMetadataToProto(topicMetadata kmsg.MetadataR
}
}

-func (k *kafkaClientMapper) setTopicConfigurationsToKafka(req *v1alpha2.SetTopicConfigurationsRequest) *kmsg.AlterConfigsRequest {
+func (k *mapper) setTopicConfigurationsToKafka(req *v1alpha2.SetTopicConfigurationsRequest) *kmsg.AlterConfigsRequest {
alterConfigResource := kmsg.NewAlterConfigsRequestResource()
alterConfigResource.ResourceType = kmsg.ConfigResourceTypeTopic
alterConfigResource.ResourceName = req.TopicName
@@ -212,7 +212,7 @@ func (k *kafkaClientMapper) setTopicConfigurationsToKafka(req *v1alpha2.SetTopic
return &kafkaReq
}

-func (*kafkaClientMapper) setTopicConfigurationsResourceToKafka(req *v1alpha2.SetTopicConfigurationsRequest_SetConfiguration) kmsg.AlterConfigsRequestResourceConfig {
+func (*mapper) setTopicConfigurationsResourceToKafka(req *v1alpha2.SetTopicConfigurationsRequest_SetConfiguration) kmsg.AlterConfigsRequestResourceConfig {
kafkaReq := kmsg.NewAlterConfigsRequestResourceConfig()
kafkaReq.Name = req.Name
kafkaReq.Value = req.Value
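Aside, not part of the commit: the comment in updateTopicConfigsToKafka notes that only a single topic resource is ever patched, even though the Kafka API accepts many resources per request. The following is a minimal, self-contained sketch of that single-resource shape using franz-go's kmsg package; the topic and config names are made up, and it is an illustration rather than the repository's mapper code.

```go
package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kmsg"
)

// buildSingleTopicAlterConfigs builds an incremental alter-configs request
// containing exactly one resource: the topic whose configs should be patched.
func buildSingleTopicAlterConfigs(topicName string, configs map[string]*string) *kmsg.IncrementalAlterConfigsRequest {
	resource := kmsg.NewIncrementalAlterConfigsRequestResource()
	resource.ResourceType = kmsg.ConfigResourceTypeTopic
	resource.ResourceName = topicName

	for name, value := range configs {
		cfg := kmsg.NewIncrementalAlterConfigsRequestResourceConfig()
		cfg.Name = name
		cfg.Op = kmsg.IncrementalAlterConfigOpSet // other ops: delete, append, subtract
		cfg.Value = value
		resource.Configs = append(resource.Configs, cfg)
	}

	req := kmsg.NewIncrementalAlterConfigsRequest()
	req.Resources = []kmsg.IncrementalAlterConfigsRequestResource{resource}
	return &req
}

func main() {
	retention := "86400000"
	req := buildSingleTopicAlterConfigs("orders", map[string]*string{"retention.ms": &retention})
	fmt.Printf("resources=%d topic=%s configs=%d\n",
		len(req.Resources), req.Resources[0].ResourceName, len(req.Resources[0].Configs))
}
```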
4 changes: 2 additions & 2 deletions backend/pkg/api/connect/service/topic/v1alpha2/mapper_test.go
@@ -39,7 +39,7 @@ func TestDeleteTopicRequestToKafka(t *testing.T) {
},
}

-kafkaMapper := kafkaClientMapper{}
+kafkaMapper := mapper{}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@@ -69,7 +69,7 @@ func TestDescribeTopicConfigsToKafka(t *testing.T) {
},
}

-kafkaMapper := kafkaClientMapper{}
+kafkaMapper := mapper{}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
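Aside, not part of the commit: the tests above follow a table-driven pattern around the renamed mapper type. Below is a standalone sketch of that pattern; deleteTopicToKmsg here is a stand-in reimplementation based on the mapper shown earlier, not the repository's unexported method.

```go
package mapper

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/twmb/franz-go/pkg/kmsg"
)

// deleteTopicToKmsg mirrors the delete-topic mapping shown in mapper.go: the
// topic name is carried in both the legacy TopicNames field and the Topics slice.
func deleteTopicToKmsg(name string) kmsg.DeleteTopicsRequest {
	req := kmsg.NewDeleteTopicsRequest()
	req.TopicNames = []string{name}
	topic := kmsg.NewDeleteTopicsRequestTopic()
	topic.Topic = kmsg.StringPtr(name)
	req.Topics = []kmsg.DeleteTopicsRequestTopic{topic}
	return req
}

func TestDeleteTopicToKmsg(t *testing.T) {
	testCases := []struct {
		name      string
		topicName string
	}{
		{name: "plain topic name", topicName: "orders"},
		{name: "dotted topic name", topicName: "payments.v1"},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			req := deleteTopicToKmsg(tc.topicName)
			assert.Equal(t, []string{tc.topicName}, req.TopicNames)
			assert.Len(t, req.Topics, 1)
			assert.Equal(t, tc.topicName, *req.Topics[0].Topic)
		})
	}
}
```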
24 changes: 14 additions & 10 deletions backend/pkg/api/connect/service/topic/v1alpha2/service.go
@@ -29,18 +29,20 @@ import (
"github.com/redpanda-data/console/backend/pkg/console"
v1alpha2 "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1alpha2"
"github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1alpha2/dataplanev1alpha2connect"
"github.com/redpanda-data/console/backend/pkg/redpanda"
)

var _ dataplanev1alpha2connect.TopicServiceHandler = (*Service)(nil)

// Service that implements the TopicServiceHandler interface. This includes all
// RPCs to manage Kafka topics.
type Service struct {
-cfg *config.Config
-logger *zap.Logger
-consoleSvc console.Servicer
-mapper kafkaClientMapper
-defaulter defaulter
+cfg *config.Config
+logger *zap.Logger
+consoleSvc console.Servicer
+redpandaSvc *redpanda.Service
+mapper mapper
+defaulter defaulter
}

// ListTopics lists all Kafka topics with their most important metadata.
@@ -335,13 +337,15 @@ func (s *Service) SetTopicConfigurations(ctx context.Context, req *connect.Reque
func NewService(cfg *config.Config,
logger *zap.Logger,
consoleSvc console.Servicer,
+redpandaSvc *redpanda.Service,
) *Service {
return &Service{
-cfg: cfg,
-logger: logger,
-consoleSvc: consoleSvc,
-mapper: kafkaClientMapper{},
-defaulter: defaulter{},
+cfg: cfg,
+logger: logger,
+consoleSvc: consoleSvc,
+redpandaSvc: redpandaSvc,
+mapper: mapper{},
+defaulter: defaulter{},
}
}

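Aside, not part of the commit: NewService now receives the Redpanda admin-API service alongside its existing dependencies. Below is a minimal sketch of that constructor-injection shape; the adminAPI and topicService types are hypothetical stand-ins, not the real *redpanda.Service or Service.

```go
package main

import (
	"fmt"
	"log"
)

// adminAPI stands in for the admin client that the commit threads through
// NewService; the real type wraps the Redpanda Admin API.
type adminAPI struct{ baseURL string }

// topicService mirrors the shape of the Service struct after this change:
// every dependency is a field set once by the constructor.
type topicService struct {
	logger *log.Logger
	admin  *adminAPI // nil when no admin API endpoint is configured
}

// newTopicService mirrors NewService: dependencies arrive as parameters, so
// tests can pass fakes and routes.go stays the single place doing the wiring.
func newTopicService(logger *log.Logger, admin *adminAPI) *topicService {
	return &topicService{logger: logger, admin: admin}
}

func (s *topicService) adminConfigured() bool { return s.admin != nil }

func main() {
	svc := newTopicService(log.Default(), &adminAPI{baseURL: "http://localhost:9644"})
	fmt.Println("admin API configured:", svc.adminConfigured())
}
```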
11 changes: 10 additions & 1 deletion backend/pkg/api/routes.go
@@ -104,7 +104,7 @@ func (api *API) setupConnectWithGRPCGateway(r chi.Router) {
// v1alpha2

aclSvc := apiaclsvc.NewService(api.Cfg, api.Logger.Named("kafka_service"), api.ConsoleSvc)
-topicSvc := topicsvc.NewService(api.Cfg, api.Logger.Named("topic_service"), api.ConsoleSvc)
+topicSvc := topicsvc.NewService(api.Cfg, api.Logger.Named("topic_service"), api.ConsoleSvc, api.RedpandaSvc)
var userSvc dataplanev1alpha2connect.UserServiceHandler = apiusersvc.NewService(api.Cfg, api.Logger.Named("user_service"), api.RedpandaSvc, api.ConsoleSvc, api.Hooks.Authorization.IsProtectedKafkaUser)
transformSvc := transformsvc.NewService(api.Cfg, api.Logger.Named("transform_service"), api.RedpandaSvc, v)
kafkaConnectSvc := apikafkaconnectsvc.NewService(api.Cfg, api.Logger.Named("kafka_connect_service"), api.ConnectSvc)
@@ -149,6 +149,7 @@ func (api *API) setupConnectWithGRPCGateway(r chi.Router) {
dataplanev1alpha2connect.UserServiceName: userSvc,
dataplanev1alpha2connect.TransformServiceName: transformSvc,
dataplanev1alpha2connect.KafkaConnectServiceName: kafkaConnectSvc,
+dataplanev1alpha2connect.CloudStorageServiceName: dataplanev1alpha2connect.UnimplementedCloudStorageServiceHandler{},
},
})

@@ -224,6 +225,9 @@ func (api *API) setupConnectWithGRPCGateway(r chi.Router) {
kafkaConnectSvcPath, kafkaConnectSvcHandler := dataplanev1alpha2connect.NewKafkaConnectServiceHandler(
hookOutput.Services[dataplanev1alpha2connect.KafkaConnectServiceName].(dataplanev1alpha2connect.KafkaConnectServiceHandler),
connect.WithInterceptors(hookOutput.Interceptors...))
+cloudStorageSvcPath, cloudStorageSvcHandler := dataplanev1alpha2connect.NewCloudStorageServiceHandler(
+hookOutput.Services[dataplanev1alpha2connect.CloudStorageServiceName].(dataplanev1alpha2connect.CloudStorageServiceHandler),
+connect.WithInterceptors(hookOutput.Interceptors...))

ossServices := []ConnectService{
{
@@ -301,6 +305,11 @@ func (api *API) setupConnectWithGRPCGateway(r chi.Router) {
MountPath: kafkaConnectSvcPath,
Handler: kafkaConnectSvcHandler,
},
+{
+ServiceName: dataplanev1alpha2connect.CloudStorageServiceName,
+MountPath: cloudStorageSvcPath,
+Handler: cloudStorageSvcHandler,
+},
}

// Order matters. OSS services first, so Enterprise handlers override OSS.
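Aside, not part of the commit: routes.go collects each Connect handler as a (service name, mount path, handler) entry and mounts the entries on the chi router, OSS services first so that enterprise handlers can override them. Below is a simplified standalone sketch of that wiring; the paths and handlers are illustrative placeholders rather than the generated Connect handlers.

```go
package main

import (
	"fmt"
	"net/http"

	"github.com/go-chi/chi/v5"
)

// connectService mirrors the ConnectService entries assembled in routes.go:
// each generated Connect constructor returns a mount path plus an http.Handler.
type connectService struct {
	ServiceName string
	MountPath   string
	Handler     http.Handler
}

// textHandler is a trivial placeholder for a real Connect handler.
func textHandler(body string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprintln(w, body)
	})
}

func main() {
	r := chi.NewRouter()

	// Placeholder paths and handlers; the real ones come from the generated
	// dataplanev1alpha2connect.New*ServiceHandler functions.
	services := []connectService{
		{ServiceName: "TopicService", MountPath: "/redpanda.api.dataplane.v1alpha2.TopicService/", Handler: textHandler("topic service")},
		{ServiceName: "CloudStorageService", MountPath: "/redpanda.api.dataplane.v1alpha2.CloudStorageService/", Handler: textHandler("unimplemented in OSS")},
	}

	// Mount every service on the router; routes.go does this for OSS services
	// first so that later (enterprise) registrations can take their place.
	for _, svc := range services {
		r.Mount(svc.MountPath, svc.Handler)
	}

	_ = http.ListenAndServe(":8080", r)
}
```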
(Diffs for the remaining 12 changed files are not shown.)
