Skip to content

Commit

Permalink
[Terrarium - Providers]: Implementation of Upload Feature - 3 (#77)
Browse files Browse the repository at this point in the history
* implementation of upload feature in storage service + unit tests

* go fmt

* fix GetName for req

* updated storage client

* enable upload in gateway

* go fmt

* update endpoint for provider

* removed span.addEvents

---------

Co-authored-by: Anjani Kumar Srivastava <[email protected]>
  • Loading branch information
anjanikshree12 and Anjani Kumar Srivastava authored Jun 21, 2024
1 parent b6cf43e commit 03131a6
Show file tree
Hide file tree
Showing 4 changed files with 785 additions and 1 deletion.
154 changes: 154 additions & 0 deletions internal/common/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
moduleServices "github.com/terrariumcloud/terrarium/internal/module/services"
"github.com/terrariumcloud/terrarium/internal/module/services/storage"
providerServices "github.com/terrariumcloud/terrarium/internal/provider/services"
providerStorage "github.com/terrariumcloud/terrarium/internal/provider/services/storage"
release "github.com/terrariumcloud/terrarium/internal/release/services"
terrariumModule "github.com/terrariumcloud/terrarium/pkg/terrarium/module"
terrariumProvider "github.com/terrariumcloud/terrarium/pkg/terrarium/provider"
Expand Down Expand Up @@ -536,3 +537,156 @@ func (gw *TerrariumGrpcGateway) EndProviderWithClient(ctx context.Context, reque
return nil, UnknownVersionManagerActionError
}
}

// UploadProviderBinaryZip uploads provider binary zip to Storage service
func (gw *TerrariumGrpcGateway) UploadProviderBinaryZip(server terrariumProvider.ProviderPublisher_UploadProviderBinaryZipServer) error {
return gw.UploadProviderBinaryZipWithClient(server, gw.providerStorageClient)
}

// UploadSourceZipWithClient calls UploadProviderBinaryZip on Storage client
func (gw *TerrariumGrpcGateway) UploadProviderBinaryZipWithClient(server terrariumProvider.ProviderPublisher_UploadProviderBinaryZipServer, client providerServices.StorageClient) error {
upstream, upErr := client.UploadProviderBinaryZip(server.Context())
ctx := server.Context()
span := trace.SpanFromContext(ctx)
if upErr != nil {
log.Println(upErr)
span.RecordError(upErr)
return upErr
}

for {
req, err := server.Recv()

if err == io.EOF {
res, upErr := upstream.CloseAndRecv()

if upErr != nil {
return upErr
}
log.Println("Done <= Store")
return server.SendAndClose(res)
}

if err != nil {
log.Printf("Failed to receive: %v", err)
span.RecordError(err)
return providerStorage.ReceiveBinaryZipError
}

upErr = upstream.Send(req)

if upErr == io.EOF {
log.Println("Done <= Store")
upstream.CloseSend()
return server.SendAndClose(providerStorage.BinaryZipUploaded)
}

if upErr != nil {
log.Printf("Failed to send: %v", upErr)
span.RecordError(upErr)
return upErr
}
}
}

// UploadShasum uploads shasum file to Storage service
func (gw *TerrariumGrpcGateway) UploadShasum(server terrariumProvider.ProviderPublisher_UploadShasumServer) error {
return gw.UploadShasumWithClient(server, gw.providerStorageClient)
}

// UploadShasumWithClient calls UploadShasum on Storage client
func (gw *TerrariumGrpcGateway) UploadShasumWithClient(server terrariumProvider.ProviderPublisher_UploadShasumServer, client providerServices.StorageClient) error {
upstream, upErr := client.UploadShasum(server.Context())
ctx := server.Context()
span := trace.SpanFromContext(ctx)
if upErr != nil {
log.Println(upErr)
span.RecordError(upErr)
return upErr
}

for {
req, err := server.Recv()

if err == io.EOF {
res, upErr := upstream.CloseAndRecv()

if upErr != nil {
return upErr
}
log.Println("Done <= Store")
return server.SendAndClose(res)
}

if err != nil {
log.Printf("Failed to receive: %v", err)
span.RecordError(err)
return providerStorage.ReceiveShasumError
}

upErr = upstream.Send(req)

if upErr == io.EOF {
log.Println("Done <= Store")
upstream.CloseSend()
return server.SendAndClose(providerStorage.ShasumUploaded)
}

if upErr != nil {
log.Printf("Failed to send: %v", upErr)
span.RecordError(upErr)
return upErr
}
}
}

// UploadShasumSignature uploads shasum signature file to Storage service
func (gw *TerrariumGrpcGateway) UploadShasumSignature(server terrariumProvider.ProviderPublisher_UploadShasumSignatureServer) error {
return gw.UploadShasumSignatureWithClient(server, gw.providerStorageClient)
}

// UploadShasumSignatureWithClient calls UploadShasumSignature on Storage client
func (gw *TerrariumGrpcGateway) UploadShasumSignatureWithClient(server terrariumProvider.ProviderPublisher_UploadShasumSignatureServer, client providerServices.StorageClient) error {
upstream, upErr := client.UploadShasumSignature(server.Context())
ctx := server.Context()
span := trace.SpanFromContext(ctx)
if upErr != nil {
log.Println(upErr)
span.RecordError(upErr)
return upErr
}

for {
req, err := server.Recv()

if err == io.EOF {
res, upErr := upstream.CloseAndRecv()

if upErr != nil {
return upErr
}
log.Println("Done <= Store")
return server.SendAndClose(res)
}

if err != nil {
log.Printf("Failed to receive: %v", err)
span.RecordError(err)
return providerStorage.ReceiveShasumSigError
}

upErr = upstream.Send(req)

if upErr == io.EOF {
log.Println("Done <= Store")
upstream.CloseSend()
return server.SendAndClose(providerStorage.ShasumSigUploaded)
}

if upErr != nil {
log.Printf("Failed to send: %v", upErr)
span.RecordError(upErr)
return upErr
}
}
}
Loading

0 comments on commit 03131a6

Please sign in to comment.