Skip to content

Commit

Permalink
feat(gateway): allow tenant to recover manifest of active lease (#251)
Browse files Browse the repository at this point in the history
refs akash-network/support#227

---------

Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian committed Dec 2, 2024
1 parent 1b957e8 commit 3b1cf60
Show file tree
Hide file tree
Showing 35 changed files with 129 additions and 131 deletions.
2 changes: 1 addition & 1 deletion .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ linters:
- staticcheck
- revive
- gosec
- exportloopref
- copyloopvar
- prealloc
linters-settings:
gocritic:
Expand Down
18 changes: 9 additions & 9 deletions bidengine/pricing.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ func (fp scalePricing) CalculatePrice(_ context.Context, req Request) (sdk.DecCo

endpointTotal := decimal.NewFromInt(0)
ipTotal := decimal.NewFromInt(0).Add(fp.ipScale)
ipTotal = ipTotal.Mul(decimal.NewFromInt(int64(util.GetEndpointQuantityOfResourceGroup(req.GSpec, atypes.Endpoint_LEASED_IP))))
ipTotal = ipTotal.Mul(decimal.NewFromInt(int64(util.GetEndpointQuantityOfResourceGroup(req.GSpec, atypes.Endpoint_LEASED_IP)))) // nolint: gosec

// iterate over everything & sum it up
for _, group := range req.GSpec.Resources {
groupCount := decimal.NewFromInt(int64(group.Count)) // Expand uint32 to int64
groupCount := decimal.NewFromInt(int64(group.Count)) // // nolint: gosec

cpuQuantity := decimal.NewFromBigInt(group.Resources.CPU.Units.Val.BigInt(), 0)
cpuQuantity = cpuQuantity.Mul(groupCount)
Expand All @@ -174,7 +174,7 @@ func (fp scalePricing) CalculatePrice(_ context.Context, req Request) (sdk.DecCo
total, exists := storageTotal[storageClass]

if !exists {
return sdk.DecCoin{}, errors.Wrapf(errNoPriceScaleForStorageClass, storageClass)
return sdk.DecCoin{}, errors.Wrapf(errNoPriceScaleForStorageClass, "%s", storageClass)
}

total = total.Add(storageQuantity)
Expand Down Expand Up @@ -250,14 +250,14 @@ func MakeRandomRangePricing() (BidPricingStrategy, error) {
}

func (randomRangePricing) CalculatePrice(_ context.Context, req Request) (sdk.DecCoin, error) {
min, max := calculatePriceRange(req.GSpec)
if min.IsEqual(max) {
return max, nil
minPrice, maxPrice := calculatePriceRange(req.GSpec)
if minPrice.IsEqual(maxPrice) {
return maxPrice, nil
}

const scale = 10000

delta := max.Amount.Sub(min.Amount).Mul(sdk.NewDec(scale))
delta := maxPrice.Amount.Sub(minPrice.Amount).Mul(sdk.NewDec(scale))

minbid := delta.TruncateInt64()
if minbid < 1 {
Expand All @@ -269,8 +269,8 @@ func (randomRangePricing) CalculatePrice(_ context.Context, req Request) (sdk.De
}

scaledValue := sdk.NewDecFromBigInt(val).QuoInt64(scale).QuoInt64(100)
amount := min.Amount.Add(scaledValue)
return sdk.NewDecCoinFromDec(min.Denom, amount), nil
amount := minPrice.Amount.Add(scaledValue)
return sdk.NewDecCoinFromDec(minPrice.Denom, amount), nil
}

func calculatePriceRange(gspec *dtypes.GroupSpec) (sdk.DecCoin, sdk.DecCoin) {
Expand Down
10 changes: 4 additions & 6 deletions bidengine/pricing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func Test_ScalePricingOnMemory(t *testing.T) {
price, err := pricing.CalculatePrice(context.Background(), req)
require.NoError(t, err)

expectedPrice := testutil.AkashDecCoin(t, int64(memoryScale*memoryQuantity))
expectedPrice := testutil.AkashDecCoin(t, int64(memoryScale*memoryQuantity)) // nolint: gosec
require.Equal(t, expectedPrice, price)
}

Expand Down Expand Up @@ -258,7 +258,7 @@ func Test_ScalePricingOnStorage(t *testing.T) {
price, err := pricing.CalculatePrice(context.Background(), req)
require.NoError(t, err)

decNearly(t, price.Amount, int64(storageScale*storageQuantity))
decNearly(t, price.Amount, int64(storageScale*storageQuantity)) // nolint: gosec
}

func Test_ScalePricingByCountOfResources(t *testing.T) {
Expand All @@ -282,12 +282,12 @@ func Test_ScalePricingByCountOfResources(t *testing.T) {
require.NoError(t, err)

require.NoError(t, err)
decNearly(t, firstPrice.Amount, int64(storageScale*storageQuantity))
decNearly(t, firstPrice.Amount, int64(storageScale*storageQuantity)) // nolint: gosec

gspec.Resources[0].Count = 2
secondPrice, err := pricing.CalculatePrice(context.Background(), req)
require.NoError(t, err)
decNearly(t, secondPrice.Amount, 2*int64(storageScale*storageQuantity))
decNearly(t, secondPrice.Amount, 2*int64(storageScale*storageQuantity)) // nolint: gosec
}

func Test_ScalePricingForIPs(t *testing.T) {
Expand Down Expand Up @@ -956,8 +956,6 @@ func Test_newDataForScript_GPUWildcard(t *testing.T) {
}

for _, c := range cases {
c := c

t.Run(c.desc, func(t *testing.T) {
d := newDataForScript(c.r)
assert.NotEmpty(t, d)
Expand Down
4 changes: 2 additions & 2 deletions bidengine/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ loop:
}
case ch := <-s.statusch:
ch <- &Status{
Orders: uint32(len(s.orders)),
Orders: uint32(len(s.orders)), // nolint: gosec
}
case order := <-s.drainch:
// child done
Expand All @@ -237,7 +237,7 @@ loop:
ordersCounter.WithLabelValues("stop").Inc()
trySignal()
case <-signalch:
bus.Pub(provider.BidEngineStatus{Orders: uint32(len(s.orders))}, []string{ptypes.PubSubTopicBidengineStatus}, tpubsub.WithRetain())
bus.Pub(provider.BidEngineStatus{Orders: uint32(len(s.orders))}, []string{ptypes.PubSubTopicBidengineStatus}, tpubsub.WithRetain()) // nolint: gosec
}
s.updateOrderManagerGauge()
}
Expand Down
2 changes: 1 addition & 1 deletion bidengine/shellscript.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (ssp shellScriptPricing) CalculatePrice(ctx context.Context, r Request) (sd

processCtx, cancel := context.WithTimeout(ctx, ssp.runtimeLimit)
defer cancel()
cmd := exec.CommandContext(processCtx, ssp.path) //nolint:gosec
cmd := exec.CommandContext(processCtx, ssp.path) // nolint: gosec
cmd.Stdin = buf
outputBuf := &bytes.Buffer{}
cmd.Stdout = outputBuf
Expand Down
4 changes: 2 additions & 2 deletions cluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ func (c *nullClient) LeaseStatus(_ context.Context, lid mtypes.LeaseID) (map[str
for _, svc := range lease.group.Services {
resp[svc.Name] = &ctypes.ServiceStatus{
Name: svc.Name,
Available: int32(svc.Count),
Total: int32(svc.Count),
Available: int32(svc.Count), // nolint: gosec
Total: int32(svc.Count), // nolint: gosec
}
}

Expand Down
2 changes: 1 addition & 1 deletion cluster/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (is *inventoryService) reserve(order mtypes.OrderID, resources dtypes.Resou
}
}

func (is *inventoryService) unreserve(order mtypes.OrderID) error { // nolint:golint,unparam
func (is *inventoryService) unreserve(order mtypes.OrderID) error { // nolint: golint,unparam
ch := make(chan inventoryResponse, 1)
req := inventoryRequest{
order: order,
Expand Down
12 changes: 6 additions & 6 deletions cluster/inventory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func TestInventory_ClusterDeploymentDeployed(t *testing.T) {
}

// availableExternalEndpoints should be consumed because of the deployed reservation
require.Equal(t, uint(1000-countOfRandomPortService), inv.availableExternalPorts)
require.Equal(t, uint(1000-countOfRandomPortService), inv.availableExternalPorts) // nolint: gosec

// Unreserving the allocated reservation should reclaim the availableExternalEndpoints
err = inv.unreserve(lid.OrderID())
Expand Down Expand Up @@ -464,8 +464,8 @@ func TestInventory_ReserveIPUnavailableWithIPOperator(t *testing.T) {

ipQty := testutil.RandRangeInt(1, 100)
mockIP.On("GetIPAddressUsage", mock.Anything).Return(cip.AddressUsage{
Available: uint(ipQty),
InUse: uint(ipQty),
Available: uint(ipQty), // nolint: gosec
InUse: uint(ipQty), // nolint: gosec
}, nil)
mockIP.On("Stop")

Expand Down Expand Up @@ -520,8 +520,8 @@ func TestInventory_ReserveIPAvailableWithIPOperator(t *testing.T) {

ipQty := testutil.RandRangeInt(5, 10)
mockIP.On("GetIPAddressUsage", mock.Anything).Return(cip.AddressUsage{
Available: uint(ipQty),
InUse: uint(ipQty - 1), // not all in use
Available: uint(ipQty), // nolint: gosec
InUse: uint(ipQty - 1), // nolint: gosec
}, nil)

ipAddrStatusCalled := make(chan struct{}, 2)
Expand Down Expand Up @@ -717,5 +717,5 @@ func TestInventory_OverReservations(t *testing.T) {
<-inv.lc.Done()

// No ports used yet
require.Equal(t, uint(1000-countOfRandomPortService), inv.availableExternalPorts)
require.Equal(t, uint(1000-countOfRandomPortService), inv.availableExternalPorts) // nolint: gosec
}
22 changes: 11 additions & 11 deletions cluster/kube/builder/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func (b *Workload) container() corev1.Container {

if cpu := service.Resources.CPU; cpu != nil {
requestedCPU := sdlutil.ComputeCommittedResources(b.settings.CPUCommitLevel, cpu.Units)
kcontainer.Resources.Requests[corev1.ResourceCPU] = resource.NewScaledQuantity(int64(requestedCPU.Value()), resource.Milli).DeepCopy()
kcontainer.Resources.Limits[corev1.ResourceCPU] = resource.NewScaledQuantity(int64(cpu.Units.Value()), resource.Milli).DeepCopy()
kcontainer.Resources.Requests[corev1.ResourceCPU] = resource.NewScaledQuantity(int64(requestedCPU.Value()), resource.Milli).DeepCopy() // nolint: gosec
kcontainer.Resources.Limits[corev1.ResourceCPU] = resource.NewScaledQuantity(int64(cpu.Units.Value()), resource.Milli).DeepCopy() // nolint: gosec
}

if gpu := service.Resources.GPU; gpu != nil && gpu.Units.Value() > 0 {
Expand All @@ -105,8 +105,8 @@ func (b *Workload) container() corev1.Container {
// - can specify GPU in both limits and requests but these two values must be equal.
// - cannot specify GPU requests without specifying limits.
requestedGPU := sdlutil.ComputeCommittedResources(b.settings.GPUCommitLevel, gpu.Units)
kcontainer.Resources.Requests[resourceName] = resource.NewQuantity(int64(requestedGPU.Value()), resource.DecimalSI).DeepCopy()
kcontainer.Resources.Limits[resourceName] = resource.NewQuantity(int64(gpu.Units.Value()), resource.DecimalSI).DeepCopy()
kcontainer.Resources.Requests[resourceName] = resource.NewQuantity(int64(requestedGPU.Value()), resource.DecimalSI).DeepCopy() // nolint: gosec
kcontainer.Resources.Limits[resourceName] = resource.NewQuantity(int64(gpu.Units.Value()), resource.DecimalSI).DeepCopy() // nolint: gosec
}

var requestedMem uint64
Expand All @@ -120,8 +120,8 @@ func (b *Workload) container() corev1.Container {
if !persistent {
if class == "" {
requestedStorage := sdlutil.ComputeCommittedResources(b.settings.StorageCommitLevel, ephemeral.Quantity)
kcontainer.Resources.Requests[corev1.ResourceEphemeralStorage] = resource.NewQuantity(int64(requestedStorage.Value()), resource.DecimalSI).DeepCopy()
kcontainer.Resources.Limits[corev1.ResourceEphemeralStorage] = resource.NewQuantity(int64(ephemeral.Quantity.Value()), resource.DecimalSI).DeepCopy()
kcontainer.Resources.Requests[corev1.ResourceEphemeralStorage] = resource.NewQuantity(int64(requestedStorage.Value()), resource.DecimalSI).DeepCopy() // nolint: gosec
kcontainer.Resources.Limits[corev1.ResourceEphemeralStorage] = resource.NewQuantity(int64(ephemeral.Quantity.Value()), resource.DecimalSI).DeepCopy() // nolint: gosec
} else if class == "ram" {
requestedMem += ephemeral.Quantity.Value()
}
Expand All @@ -131,8 +131,8 @@ func (b *Workload) container() corev1.Container {
// fixme: ram is never expected to be nil
if mem := service.Resources.Memory; mem != nil {
requestedRAM := sdlutil.ComputeCommittedResources(b.settings.MemoryCommitLevel, mem.Quantity)
kcontainer.Resources.Requests[corev1.ResourceMemory] = resource.NewQuantity(int64(requestedRAM.Value()), resource.DecimalSI).DeepCopy()
kcontainer.Resources.Limits[corev1.ResourceMemory] = resource.NewQuantity(int64(mem.Quantity.Value()+requestedMem), resource.DecimalSI).DeepCopy()
kcontainer.Resources.Requests[corev1.ResourceMemory] = resource.NewQuantity(int64(requestedRAM.Value()), resource.DecimalSI).DeepCopy() // nolint: gosec
kcontainer.Resources.Limits[corev1.ResourceMemory] = resource.NewQuantity(int64(mem.Quantity.Value()+requestedMem), resource.DecimalSI).DeepCopy() // nolint: gosec
}

if service.Params != nil {
Expand Down Expand Up @@ -161,7 +161,7 @@ func (b *Workload) container() corev1.Container {

for _, expose := range service.Expose {
kcontainer.Ports = append(kcontainer.Ports, corev1.ContainerPort{
ContainerPort: int32(expose.Port),
ContainerPort: int32(expose.Port), // nolint: gosec
})
}

Expand Down Expand Up @@ -232,7 +232,7 @@ func (b *Workload) persistentVolumeClaims() []corev1.PersistentVolumeClaim {
},
}

pvc.Spec.Resources.Requests[corev1.ResourceStorage] = resource.NewQuantity(int64(storage.Quantity.Value()), resource.DecimalSI).DeepCopy()
pvc.Spec.Resources.Requests[corev1.ResourceStorage] = resource.NewQuantity(int64(storage.Quantity.Value()), resource.DecimalSI).DeepCopy() // nolint: gosec

attr = storage.Attributes.Find(sdl.StorageAttributeClass)
if class, valid := attr.AsString(); valid && class != sdl.StorageClassDefault {
Expand Down Expand Up @@ -262,7 +262,7 @@ func (b *Workload) runtimeClass() *string {

func (b *Workload) replicas() *int32 {
replicas := new(int32)
*replicas = int32(b.deployment.ManifestGroup().Services[b.serviceIdx].Count)
*replicas = int32(b.deployment.ManifestGroup().Services[b.serviceIdx].Count) // nolint: gosec

return replicas
}
Expand Down
5 changes: 2 additions & 3 deletions cluster/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,6 @@ func newEventsFeedList(ctx context.Context, events []eventsv1.Event) ctypes.Even

done:
for _, evt := range events {
evt := evt
if !wtch.SendEvent(&evt) {
break done
}
Expand Down Expand Up @@ -552,8 +551,8 @@ func (c *client) ForwardedPortStatus(ctx context.Context, leaseID mtypes.LeaseID
// Record the actual port inside the container that is exposed
v := ctypes.ForwardedPortStatus{
Host: settings.ClusterPublicHostname,
Port: uint16(port.TargetPort.IntVal),
ExternalPort: uint16(nodePort),
Port: uint16(port.TargetPort.IntVal), // nolint: gosec
ExternalPort: uint16(nodePort), // nolint: gosec
Name: deploymentName,
}

Expand Down
2 changes: 1 addition & 1 deletion cluster/kube/operators/clients/inventory/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func newInventoryConnector(ctx context.Context, svc *corev1.Service, invch chan<
return nil, err
}
} else {
ports = append(ports, portforward.ForwardedPort{Local: uint16(svcPort)})
ports = append(ports, portforward.ForwardedPort{Local: uint16(svcPort)}) // nolint: gosec
}

endpoint = fmt.Sprintf("localhost:%d", ports[0].Local)
Expand Down
6 changes: 3 additions & 3 deletions cluster/kube/operators/clients/inventory/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (sf *inventoryScaffold) createFakeOperator(t *testing.T) {
{
Name: "grpc",
Protocol: "tcp",
Port: int32(sf.ports[0]),
Port: int32(sf.ports[0]), // nolint: gosec
TargetPort: intstr.FromString("grpc"),
},
},
Expand All @@ -144,7 +144,7 @@ func (sf *inventoryScaffold) createFakeOperator(t *testing.T) {
Ports: []corev1.ContainerPort{
{
Name: "grpc",
ContainerPort: int32(sf.ports[0]),
ContainerPort: int32(sf.ports[0]), // nolint: gosec
},
},
},
Expand All @@ -168,7 +168,7 @@ func (sf *inventoryScaffold) createFakeOperator(t *testing.T) {
Ports: []corev1.ContainerPort{
{
Name: "grpc",
ContainerPort: int32(sf.ports[0]),
ContainerPort: int32(sf.ports[0]), // nolint: gosec
},
},
},
Expand Down
24 changes: 12 additions & 12 deletions cluster/kube/operators/clients/inventory/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,32 +335,32 @@ func (inv *inventory) Metrics() inventoryV1.Metrics {
invNode := inventoryV1.NodeMetrics{
Name: nd.Name,
Allocatable: inventoryV1.ResourcesMetric{
CPU: uint64(nd.Resources.CPU.Quantity.Allocatable.MilliValue()),
GPU: uint64(nd.Resources.GPU.Quantity.Allocatable.Value()),
Memory: uint64(nd.Resources.Memory.Quantity.Allocatable.Value()),
StorageEphemeral: uint64(nd.Resources.EphemeralStorage.Allocatable.Value()),
CPU: uint64(nd.Resources.CPU.Quantity.Allocatable.MilliValue()), // nolint: gosec
GPU: uint64(nd.Resources.GPU.Quantity.Allocatable.Value()), // nolint: gosec
Memory: uint64(nd.Resources.Memory.Quantity.Allocatable.Value()), // nolint: gosec
StorageEphemeral: uint64(nd.Resources.EphemeralStorage.Allocatable.Value()), // nolint: gosec
},
}

cpuTotal += uint64(nd.Resources.CPU.Quantity.Allocatable.MilliValue())
gpuTotal += uint64(nd.Resources.GPU.Quantity.Allocatable.Value())
memoryTotal += uint64(nd.Resources.Memory.Quantity.Allocatable.Value())
storageEphemeralTotal += uint64(nd.Resources.EphemeralStorage.Allocatable.Value())
cpuTotal += uint64(nd.Resources.CPU.Quantity.Allocatable.MilliValue()) // nolint: gosec
gpuTotal += uint64(nd.Resources.GPU.Quantity.Allocatable.Value()) // nolint: gosec
memoryTotal += uint64(nd.Resources.Memory.Quantity.Allocatable.Value()) // nolint: gosec
storageEphemeralTotal += uint64(nd.Resources.EphemeralStorage.Allocatable.Value()) // nolint: gosec

avail := nd.Resources.CPU.Quantity.Available()
invNode.Available.CPU = uint64(avail.MilliValue())
invNode.Available.CPU = uint64(avail.MilliValue()) // nolint: gosec
cpuAvailable += invNode.Available.CPU

avail = nd.Resources.GPU.Quantity.Available()
invNode.Available.GPU = uint64(avail.Value())
invNode.Available.GPU = uint64(avail.Value()) // nolint: gosec
gpuAvailable += invNode.Available.GPU

avail = nd.Resources.Memory.Quantity.Available()
invNode.Available.Memory = uint64(avail.Value())
invNode.Available.Memory = uint64(avail.Value()) // nolint: gosec
memoryAvailable += invNode.Available.Memory

avail = nd.Resources.EphemeralStorage.Available()
invNode.Available.StorageEphemeral = uint64(avail.Value())
invNode.Available.StorageEphemeral = uint64(avail.Value()) // nolint: gosec
storageEphemeralAvailable += invNode.Available.StorageEphemeral

ret.Nodes = append(ret.Nodes, invNode)
Expand Down
8 changes: 4 additions & 4 deletions cluster/kube/operators/clients/ip/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ type fakeOperator struct {
}

func (fio *fakeOperator) setHealthStatus(status int) {
atomic.StoreUint32(&fio.healthStatus, uint32(status))
atomic.StoreUint32(&fio.healthStatus, uint32(status)) // nolint: gosec
}

func (fio *fakeOperator) setIPLeaseStatusResponse(status int, body []byte) {
atomic.StoreUint32(&fio.ipLeaseStatusStatus, uint32(status))
atomic.StoreUint32(&fio.ipLeaseStatusStatus, uint32(status)) // nolint: gosec
fio.ipLeaseStatusResponse.Store(body)
}

func (fio *fakeOperator) setIPUsageResponse(status int, body []byte) {
atomic.StoreUint32(&fio.ipUsageStatus, uint32(status))
atomic.StoreUint32(&fio.ipUsageStatus, uint32(status)) // nolint: gosec
fio.ipUsageResponse.Store(body)
}

Expand Down Expand Up @@ -113,7 +113,7 @@ func TestIPOperatorClient(t *testing.T) {

srv := net.SRV{
Target: "localhost",
Port: uint16(portNumber),
Port: uint16(portNumber), // nolint: gosec
Priority: 0,
Weight: 0,
}
Expand Down
Loading

0 comments on commit 3b1cf60

Please sign in to comment.