Skip to content

Commit

Permalink
Add new All in one command to make development easier (#59)
Browse files Browse the repository at this point in the history
* Introduce GRPC client helpers and pass *Client interfaces around

Make unit testing easier by removing the Grpc code from the majority of places and instead pass around *Client interfaces.

Add GRPC Client helper functions which do the same thing we had been doing before, ie on call to an interface function create a GRPC connection, do the request and return.

* Fix some namings

* Fix issues with closing grpc conn too soon

* Update client.go

* Add remaining client implementations

* Add all-in-one command

* Move gateway to accepting clients
  • Loading branch information
adcharre authored Dec 6, 2023
1 parent 8e0263c commit 3972462
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 492 deletions.
159 changes: 159 additions & 0 deletions cmd/allInOne.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
Copyright © 2023 NAME HERE <EMAIL ADDRESS>
*/
package cmd

import (
"github.com/gorilla/mux"
"github.com/spf13/cobra"
services2 "github.com/terrariumcloud/terrarium/internal/module/services"
"github.com/terrariumcloud/terrarium/internal/module/services/dependency_manager"
"github.com/terrariumcloud/terrarium/internal/module/services/gateway"
"github.com/terrariumcloud/terrarium/internal/module/services/registrar"
storage2 "github.com/terrariumcloud/terrarium/internal/module/services/storage"
"github.com/terrariumcloud/terrarium/internal/module/services/tag_manager"
"github.com/terrariumcloud/terrarium/internal/module/services/version_manager"
"github.com/terrariumcloud/terrarium/internal/release/services/release"
"github.com/terrariumcloud/terrarium/internal/restapi/browse"
modulesv1 "github.com/terrariumcloud/terrarium/internal/restapi/modules/v1"
"github.com/terrariumcloud/terrarium/internal/storage"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"log"
"net"
"net/http"
)

const (
allInOneInternalEndpoint = "localhost:30001"
allInOneGrpcGatewayEndpoint = "0.0.0.0:3001"
allInOneHTTPEndpoint = "0.0.0.0:8080"
)

type allInOneRestHandler struct {
router *mux.Router
}

func (a allInOneRestHandler) GetHttpHandler(mountPath string) http.Handler {
return a.router
}

// allInOneCmd represents the allInOne command
var allInOneCmd = &cobra.Command{
Use: "all-in-one",
Short: "Runs all the services in a single command.",
Long: `This runs all the micro-services as part of a single process, useful for developing and for trying out Terrarium.`,
Run: func(cmd *cobra.Command, args []string) {
dependencyServiceServer := &dependency_manager.DependencyManagerService{
Db: storage.NewDynamoDbClient(awsAccessKey, awsSecretKey, awsRegion),
ModuleTable: dependency_manager.ModuleDependenciesTableName,
ModuleSchema: dependency_manager.GetDependenciesSchema(dependency_manager.ModuleDependenciesTableName),
ContainerTable: dependency_manager.ContainerDependenciesTableName,
ContainerSchema: dependency_manager.GetDependenciesSchema(dependency_manager.ContainerDependenciesTableName),
}

registrarServiceServer := &registrar.RegistrarService{
Db: storage.NewDynamoDbClient(awsAccessKey, awsSecretKey, awsRegion),
Table: registrar.RegistrarTableName,
Schema: registrar.GetModulesSchema(registrar.RegistrarTableName),
}

storageServiceServer := &storage2.StorageService{
Client: storage.NewS3Client(awsAccessKey, awsSecretKey, awsRegion),
BucketName: storage2.BucketName,
Region: awsRegion,
}

tagManagerServer := &tag_manager.TagManagerService{
Db: storage.NewDynamoDbClient(awsAccessKey, awsSecretKey, awsRegion),
Table: tag_manager.TagTableName,
Schema: tag_manager.GetTagsSchema(tag_manager.TagTableName),
}

releaseServiceServer := &release.ReleaseService{
Db: storage.NewDynamoDbClient(awsAccessKey, awsSecretKey, awsRegion),
Table: release.ReleaseTableName,
Schema: release.GetReleaseSchema(release.ReleaseTableName),
}

versionManagerServer := &version_manager.VersionManagerService{
Db: storage.NewDynamoDbClient(awsAccessKey, awsSecretKey, awsRegion),
Table: version_manager.VersionsTableName,
Schema: version_manager.GetModuleVersionsSchema(version_manager.VersionsTableName),
ReleaseService: release.NewPublisherGrpcClient(allInOneInternalEndpoint),
}

services := []services2.Service{
dependencyServiceServer,
registrarServiceServer,
storageServiceServer,
tagManagerServer,
releaseServiceServer,
versionManagerServer,
}

otelShutdown := initOpenTelemetry("all-in-one")
defer otelShutdown()

startAllInOneGrpcServices(services, allInOneInternalEndpoint)

gatewayServer := gateway.New(registrar.NewRegistrarGrpcClient(allInOneInternalEndpoint),
tag_manager.NewTagManagerGrpcClient(allInOneInternalEndpoint),
version_manager.NewVersionManagerGrpcClient(allInOneInternalEndpoint),
storage2.NewStorageGrpcClient(allInOneInternalEndpoint),
dependency_manager.NewDependencyManagerGrpcClient(allInOneInternalEndpoint),
release.NewPublisherGrpcClient(allInOneInternalEndpoint),
)
startAllInOneGrpcServices([]services2.Service{gatewayServer}, allInOneGrpcGatewayEndpoint)

restAPIServer := browse.New(registrar.NewRegistrarGrpcClient(allInOneInternalEndpoint),
version_manager.NewVersionManagerGrpcClient(allInOneInternalEndpoint),
release.NewBrowseGrpcClient(allInOneInternalEndpoint))

modulesAPIServer := modulesv1.New(version_manager.NewVersionManagerGrpcClient(allInOneInternalEndpoint), storage2.NewStorageGrpcClient(allInOneInternalEndpoint))

router := mux.NewRouter()
router.PathPrefix("/modules").Handler(modulesAPIServer.GetHttpHandler("/modules"))
router.PathPrefix("/").Handler(restAPIServer.GetHttpHandler(""))

endpoint = allInOneHTTPEndpoint
startRESTAPIService("browse", "", allInOneRestHandler{router: router})

},
}

func init() {
rootCmd.AddCommand(allInOneCmd)
allInOneCmd.Flags().StringVar(&storage2.BucketName, "storage-bucket", storage2.DefaultBucketName, "Module bucket name")
allInOneCmd.Flags().StringVar(&version_manager.VersionsTableName, "version-table", version_manager.DefaultVersionsTableName, "Module versions table name")
allInOneCmd.Flags().StringVar(&tag_manager.TagTableName, "tag-table", tag_manager.DefaultTagTableName, "Module tags table name")
allInOneCmd.Flags().StringVar(&release.ReleaseTableName, "release-table", release.DefaultReleaseTableName, "Releases table name")
allInOneCmd.Flags().StringVar(&registrar.RegistrarTableName, "registrar-table", registrar.DefaultRegistrarTableName, "Module Registrar table name")
allInOneCmd.Flags().StringVar(&dependency_manager.ModuleDependenciesTableName, "module-dependencies-table", dependency_manager.DefaultModuleDependenciesTableName, "Module dependencies table name")
allInOneCmd.Flags().StringVar(&dependency_manager.ContainerDependenciesTableName, "container-dependencies-table", dependency_manager.DefaultContainerDependenciesTableName, "Module container dependencies table name")
}

func startAllInOneGrpcServices(services []services2.Service, endpoint string) {
listener, err := net.Listen("tcp4", endpoint)
if err != nil {
log.Fatalf("Failed to start: %v", err)
}

grpcServer := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
)

for _, service := range services {
if err := service.RegisterWithServer(grpcServer); err != nil {
log.Fatalf("Failed to start: %v", err)
}
}

go func() {
log.Printf("Listening at %s", endpoint)
if err := grpcServer.Serve(listener); err != nil {
log.Fatalf("Failed: %v", err)
}
}()
}
8 changes: 7 additions & 1 deletion cmd/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ func init() {

func runGateway(cmd *cobra.Command, args []string) {

gatewayServer := &gateway.TerrariumGrpcGateway{}
gatewayServer := gateway.New(registrar.NewRegistrarGrpcClient(registrar.RegistrarServiceEndpoint),
tag_manager.NewTagManagerGrpcClient(tag_manager.TagManagerEndpoint),
version_manager.NewVersionManagerGrpcClient(version_manager.VersionManagerEndpoint),
storage.NewStorageGrpcClient(storage.StorageServiceEndpoint),
dependency_manager.NewDependencyManagerGrpcClient(dependency_manager.DependencyManagerEndpoint),
release.NewPublisherGrpcClient(release.ReleaseServiceEndpoint),
)

startGRPCService("api-gateway", gatewayServer)
}
53 changes: 24 additions & 29 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var endpoint = defaultEndpoint
var awsAccessKey string
var awsSecretKey string
var awsRegion string
var opentelemetryInited = false

var rootCmd = &cobra.Command{
Use: "terrarium",
Expand Down Expand Up @@ -89,9 +90,8 @@ func newServiceResource(name string) *resource.Resource {
return resources
}

func startGRPCService(name string, service services.Service) {

log.Printf("Starting %s", name)
func initOpenTelemetry(name string) func() {
opentelemetryInited = true

ctx := context.Background()

Expand All @@ -106,16 +106,28 @@ func startGRPCService(name string, service services.Service) {
trace.WithBatcher(traceExporter),
trace.WithResource(newServiceResource(name)),
)
defer func() {
otel.SetTracerProvider(tracerProvider)
return func() {
if err := tracerProvider.Shutdown(context.Background()); err != nil {
log.Fatal(err)
}
}()
otel.SetTracerProvider(tracerProvider)
}
} else {
otel.SetTracerProvider(noop.NewNoopTracerProvider())
}
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return func() {
// No-op
}
}

func startGRPCService(name string, service services.Service) {
log.Printf("Starting %s", name)
if !opentelemetryInited {
otelShutdown := initOpenTelemetry(name)
defer otelShutdown()
}

listener, err := net.Listen("tcp4", endpoint)
if err != nil {
log.Fatalf("Failed to start: %v", err)
Expand All @@ -138,33 +150,16 @@ func startGRPCService(name string, service services.Service) {

func startRESTAPIService(name, mountPath string, rootHandler restapi.RESTAPIHandler) {
log.Printf("Starting %s", name)

ctx := context.Background()

traceExporter, err := newTraceExporter(ctx)
if err != nil {
log.Fatal(err)
if !opentelemetryInited {
otelShutdown := initOpenTelemetry(name)
defer otelShutdown()
}

if traceExporter != nil {
tracerProvider := trace.NewTracerProvider(
trace.WithSampler(trace.AlwaysSample()),
trace.WithBatcher(traceExporter),
trace.WithResource(newServiceResource(name)),
)
defer func() {
if err := tracerProvider.Shutdown(context.Background()); err != nil {
log.Fatal(err)
}
}()
otel.SetTracerProvider(tracerProvider)
} else {
otel.SetTracerProvider(noop.NewNoopTracerProvider())
log.Printf("Listening on %s", endpoint)
if err := http.ListenAndServe(endpoint, rootHandler.GetHttpHandler(mountPath)); err != nil {
log.Fatalf("Failed: %v", err)
}
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))

log.Println(fmt.Printf("Listening on %s", endpoint))
log.Fatal(http.ListenAndServe(endpoint, rootHandler.GetHttpHandler(mountPath)))
}

// Execute root command
Expand Down
60 changes: 60 additions & 0 deletions internal/module/services/dependency_manager/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package dependency_manager

import (
"context"
"github.com/terrariumcloud/terrarium/internal/module/services"
"github.com/terrariumcloud/terrarium/pkg/terrarium/module"
"google.golang.org/grpc"
)

type dependencyManagerGrpcClient struct {
endpoint string
}

func NewDependencyManagerGrpcClient(endpoint string) services.DependencyManagerClient {
return &dependencyManagerGrpcClient{endpoint: endpoint}
}

func (d dependencyManagerGrpcClient) RegisterModuleDependencies(ctx context.Context, in *module.RegisterModuleDependenciesRequest, opts ...grpc.CallOption) (*module.Response, error) {
if conn, err := services.CreateGRPCConnection(d.endpoint); err != nil {
return nil, err
} else {
defer func() { _ = conn.Close() }()

client := services.NewDependencyManagerClient(conn)
return client.RegisterModuleDependencies(ctx, in, opts...)
}
}

func (d dependencyManagerGrpcClient) RegisterContainerDependencies(ctx context.Context, in *module.RegisterContainerDependenciesRequest, opts ...grpc.CallOption) (*module.Response, error) {
if conn, err := services.CreateGRPCConnection(d.endpoint); err != nil {
return nil, err
} else {
defer func() { _ = conn.Close() }()

client := services.NewDependencyManagerClient(conn)
return client.RegisterContainerDependencies(ctx, in, opts...)
}
}

func (d dependencyManagerGrpcClient) RetrieveContainerDependencies(ctx context.Context, in *module.RetrieveContainerDependenciesRequestV2, opts ...grpc.CallOption) (services.DependencyManager_RetrieveContainerDependenciesClient, error) {
if conn, err := services.CreateGRPCConnection(d.endpoint); err != nil {
return nil, err
} else {
defer func() { _ = conn.Close() }()

client := services.NewDependencyManagerClient(conn)
return client.RetrieveContainerDependencies(ctx, in, opts...)
}
}

func (d dependencyManagerGrpcClient) RetrieveModuleDependencies(ctx context.Context, in *module.RetrieveModuleDependenciesRequest, opts ...grpc.CallOption) (services.DependencyManager_RetrieveModuleDependenciesClient, error) {
if conn, err := services.CreateGRPCConnection(d.endpoint); err != nil {
return nil, err
} else {
defer func() { _ = conn.Close() }()

client := services.NewDependencyManagerClient(conn)
return client.RetrieveModuleDependencies(ctx, in, opts...)
}
}
Loading

0 comments on commit 3972462

Please sign in to comment.