Skip to content

Commit

Permalink
Merge branch 'master' into vanja-mnemonic
Browse files Browse the repository at this point in the history
  • Loading branch information
vanja-p committed Dec 19, 2024
2 parents b7cf06d + 1cad61d commit eb74316
Show file tree
Hide file tree
Showing 15 changed files with 58 additions and 52 deletions.
1 change: 1 addition & 0 deletions enterprise/server/remote_execution/execution_server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ go_test(
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//testing/protocmp",
"@org_golang_google_protobuf//types/known/anypb",
"@org_golang_google_protobuf//types/known/durationpb",
"@org_golang_google_protobuf//types/known/timestamppb",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,14 @@ func (s *ExecutionServer) updateExecution(ctx context.Context, executionID strin
return dbErr
}

func (s *ExecutionServer) recordExecution(ctx context.Context, executionID string, md *repb.ExecutedActionMetadata, auxMeta *espb.ExecutionAuxiliaryMetadata, properties *platform.Properties) error {
func (s *ExecutionServer) recordExecution(
ctx context.Context,
executionID string,
action *repb.Action,
md *repb.ExecutedActionMetadata,
auxMeta *espb.ExecutionAuxiliaryMetadata,
properties *platform.Properties) error {

if s.env.GetExecutionCollector() == nil || !olapdbconfig.WriteExecutionsToOLAPDBEnabled() {
return nil
}
Expand Down Expand Up @@ -370,6 +377,9 @@ func (s *ExecutionServer) recordExecution(ctx context.Context, executionID strin
executionProto.EffectiveIsolationType = auxMeta.GetIsolationType()
executionProto.RequestedIsolationType = platform.CoerceContainerType(properties.WorkloadIsolationType)

executionProto.EffectiveTimeoutUsec = auxMeta.GetTimeout().AsDuration().Microseconds()
executionProto.RequestedTimeoutUsec = action.GetTimeout().AsDuration().Microseconds()

executionProto.RequestedComputeUnits = properties.EstimatedComputeUnits
executionProto.RequestedMemoryBytes = properties.EstimatedMemoryBytes
executionProto.RequestedMilliCpu = properties.EstimatedMilliCPU
Expand Down Expand Up @@ -1054,6 +1064,7 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio

var auxMeta *espb.ExecutionAuxiliaryMetadata
var properties *platform.Properties
var action *repb.Action
if stage == repb.ExecutionStage_COMPLETED && response != nil {
auxMeta = new(espb.ExecutionAuxiliaryMetadata)
ok, err := rexec.AuxiliaryMetadata(response.GetResult().GetExecutionMetadata(), auxMeta)
Expand All @@ -1067,7 +1078,8 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio
return status.WrapErrorf(err, "Failed to parse taskID")
}
actionRN = digest.NewResourceName(actionRN.GetDigest(), actionRN.GetInstanceName(), rspb.CacheType_AC, actionRN.GetDigestFunction())
action, cmd, err := s.fetchActionAndCommand(ctx, actionRN)
var cmd *repb.Command
action, cmd, err = s.fetchActionAndCommand(ctx, actionRN)
if err != nil {
return status.UnavailableErrorf("Failed to fetch action and command: %s", err)
}
Expand Down Expand Up @@ -1102,7 +1114,7 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio
return status.WrapErrorf(err, "failed to update execution %q", taskID)
}
lastWrite = time.Now()
if err := s.recordExecution(ctx, taskID, response.GetResult().GetExecutionMetadata(), auxMeta, properties); err != nil {
if err := s.recordExecution(ctx, taskID, action, response.GetResult().GetExecutionMetadata(), auxMeta, properties); err != nil {
log.CtxErrorf(ctx, "failed to record execution %q: %s", taskID, err)
}
return nil
Expand Down Expand Up @@ -1176,7 +1188,7 @@ func (s *ExecutionServer) markTaskComplete(ctx context.Context, actionResourceNa
if sizer := s.env.GetTaskSizer(); sizer != nil && execErr == nil && executeResponse.GetResult().GetExitCode() == 0 {
// TODO(vanja) should this be done when the executor got a cache hit?
md := executeResponse.GetResult().GetExecutionMetadata()
if err := sizer.Update(ctx, cmd, md); err != nil {
if err := sizer.Update(ctx, action, cmd, md); err != nil {
log.CtxWarningf(ctx, "Failed to update task size: %s", err)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
sipb "github.com/buildbuddy-io/buildbuddy/proto/stored_invocation"
gstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
tspb "google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -380,6 +381,7 @@ func testExecuteAndPublishOperation(t *testing.T, test publishTest) {
clientCtx = metadata.AppendToOutgoingContext(clientCtx, "x-buildbuddy-platform."+k, v)
}
arn := uploadAction(clientCtx, t, env, instanceName, digestFunction, &repb.Action{
Timeout: &durationpb.Duration{Seconds: 10},
DoNotCache: test.doNotCache,
Platform: &repb.Platform{Properties: []*repb.Platform_Property{
{Name: "EstimatedComputeUnits", Value: "2.5"},
Expand Down Expand Up @@ -421,6 +423,7 @@ func testExecuteAndPublishOperation(t *testing.T, test publishTest) {
}
if test.publishMoreMetadata {
aux.IsolationType = "firecracker"
aux.Timeout = &durationpb.Duration{Seconds: 11}
aux.ExecuteRequest = &repb.ExecuteRequest{
SkipCacheLookup: true, // This is only used for writing to clickhouse
ExecutionPolicy: &repb.ExecutionPolicy{Priority: 999},
Expand Down Expand Up @@ -536,20 +539,22 @@ func testExecuteAndPublishOperation(t *testing.T, test publishTest) {
RequestedMemoryBytes: 2000,
RequestedMilliCpu: 1500,
RequestedIsolationType: "oci",
RequestedTimeoutUsec: 10000000,
TargetLabel: "//some:test",
ActionMnemonic: "TestRunner",
}
if test.publishMoreMetadata {
expectedExecution.ExecutionPriority = 999
expectedExecution.SkipCacheLookup = true
expectedExecution.EffectiveIsolationType = "firecracker"
expectedExecution.EstimatedFreeDiskBytes = 1001
expectedExecution.PreviousMeasuredMemoryBytes = 2001
expectedExecution.PreviousMeasuredMilliCpu = 2002
expectedExecution.PreviousMeasuredFreeDiskBytes = 2003
expectedExecution.PredictedMemoryBytes = 3001
expectedExecution.PredictedMilliCpu = 3002
expectedExecution.PredictedFreeDiskBytes = 3003
expectedExecution.EffectiveIsolationType = "firecracker"
expectedExecution.EffectiveTimeoutUsec = 11000000
}
diff := cmp.Diff(
expectedExecution,
Expand Down
1 change: 1 addition & 0 deletions enterprise/server/remote_execution/executor/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_library(
"//server/util/tracing",
"@com_github_prometheus_client_golang//prometheus",
"@org_golang_google_protobuf//types/known/anypb",
"@org_golang_google_protobuf//types/known/durationpb",
"@org_golang_google_protobuf//types/known/timestamppb",
],
)
2 changes: 2 additions & 0 deletions enterprise/server/remote_execution/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/util/tracing"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

espb "github.com/buildbuddy-io/buildbuddy/proto/execution_stats"
Expand Down Expand Up @@ -313,6 +314,7 @@ func (s *Executor) ExecuteTaskAndStreamResults(ctx context.Context, st *repb.Sch
// These errors are failure-specific. Pass through unchanged.
return finishWithErrFn(err)
}
auxMetadata.Timeout = durationpb.New(execTimeouts.TerminateAfter)

now := time.Now()
terminateAt := now.Add(execTimeouts.TerminateAfter)
Expand Down
7 changes: 5 additions & 2 deletions enterprise/server/remote_execution/platform/platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,12 @@ const (
// previously supported.
var KnownContainerTypes []ContainerType = []ContainerType{BareContainerType, PodmanContainerType, DockerContainerType, FirecrackerContainerType, OCIContainerType, SandboxContainerType}

// CoerceContainerType returns t if it is in KnownContainerTypes. Otherwise it
// returns "Unknown".
// CoerceContainerType returns t if it's empty or in KnownContainerTypes.
// Otherwise it returns "Unknown".
func CoerceContainerType(t string) string {
if t == "" {
return ""
}
if slices.Contains(KnownContainerTypes, ContainerType(t)) {
return t
}
Expand Down
4 changes: 2 additions & 2 deletions enterprise/server/tasksize/tasksize.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,13 @@ func (s *taskSizer) Predict(ctx context.Context, task *repb.ExecutionTask) *scpb
return ApplyLimits(task, s.model.Predict(ctx, task))
}

func (s *taskSizer) Update(ctx context.Context, cmd *repb.Command, md *repb.ExecutedActionMetadata) error {
func (s *taskSizer) Update(ctx context.Context, action *repb.Action, cmd *repb.Command, md *repb.ExecutedActionMetadata) error {
if !*useMeasuredSizes {
return nil
}
statusLabel := "ok"
defer func() {
props, err := platform.ParseProperties(&repb.ExecutionTask{Command: cmd})
props, err := platform.ParseProperties(&repb.ExecutionTask{Action: action, Command: cmd})
if err != nil {
log.CtxInfof(ctx, "Failed to parse task properties: %s", err)
}
Expand Down
8 changes: 4 additions & 4 deletions enterprise/server/tasksize/tasksize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func TestSizer_Get_ShouldReturnRecordedUsageStats(t *testing.T) {
// Set the completed timestamp so that the exec duration is 2 seconds.
ExecutionCompletedTimestamp: timestamppb.New(execStart.Add(2 * time.Second)),
}
err = sizer.Update(ctx, task.GetCommand(), md)
err = sizer.Update(ctx, task.GetAction(), task.GetCommand(), md)

require.NoError(t, err)

Expand Down Expand Up @@ -294,7 +294,7 @@ func TestSizer_RespectsMilliCPULimit(t *testing.T) {
ExecutionStartTimestamp: timestamppb.New(execStart),
ExecutionCompletedTimestamp: timestamppb.New(execStart.Add(1 * time.Second)),
}
err = sizer.Update(ctx, task.GetCommand(), md)
err = sizer.Update(ctx, task.GetAction(), task.GetCommand(), md)
require.NoError(t, err)

ts := sizer.Get(ctx, task)
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestSizer_RespectsMinimumSize(t *testing.T) {
ExecutionCompletedTimestamp: timestamppb.New(execStart.Add(1 * time.Second)),
}

err = sizer.Update(ctx, task.GetCommand(), md)
err = sizer.Update(ctx, task.GetAction(), task.GetCommand(), md)
require.NoError(t, err)

ts := sizer.Get(ctx, task)
Expand All @@ -351,7 +351,7 @@ func TestSizer_RespectsMinimumSize(t *testing.T) {
},
},
}
err = sizer.Update(ctx, task.GetCommand(), md)
err = sizer.Update(ctx, task.GetAction(), task.GetCommand(), md)
require.NoError(t, err)

ts = sizer.Get(ctx, task)
Expand Down
2 changes: 2 additions & 0 deletions proto/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ proto_library(
":remote_execution_proto",
":scheduler_proto",
":stat_filter_proto",
"@com_google_protobuf//:duration_proto",
"@com_google_protobuf//:timestamp_proto",
"@googleapis//google/longrunning:operations_proto",
"@googleapis//google/rpc:status_proto",
Expand Down Expand Up @@ -1961,6 +1962,7 @@ ts_proto_library(
deps = [
":acl_ts_proto",
":context_ts_proto",
":duration_ts_proto",
":google_longrunning_ts_proto",
":grpc_status_ts_proto",
":invocation_status_ts_proto",
Expand Down
8 changes: 8 additions & 0 deletions proto/execution_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

import "google/longrunning/operations.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/rpc/status.proto";
import "proto/acl.proto";
import "proto/context.proto";
Expand Down Expand Up @@ -135,9 +136,16 @@ message Execution {
message ExecutionAuxiliaryMetadata {
// Platform overrides set via remote header.
build.bazel.remote.execution.v2.Platform platform_overrides = 1;

// The effective isolation type. Usually either user requested or the
// default.
string isolation_type = 2;

build.bazel.remote.execution.v2.ExecuteRequest execute_request = 3;
scheduler.SchedulingMetadata scheduling_metadata = 4;

// The effective action timeout. Either user requested or the default.
google.protobuf.Duration timeout = 6;
}

message ExecutionLookup {
Expand Down
3 changes: 3 additions & 0 deletions proto/remote_execution.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2444,6 +2444,9 @@ message StoredExecution {
int64 output_upload_start_timestamp_usec = 26;
int64 output_upload_completed_timestamp_usec = 27;

int64 requested_timeout_usec = 55;
int64 effective_timeout_usec = 56;

int32 invocation_link_type = 28;

int32 status_code = 29;
Expand Down
38 changes: 0 additions & 38 deletions server/buildbuddy_server/buildbuddy_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1787,44 +1787,6 @@ func parseByteStreamURL(bsURL, filename string) (*bsLookup, error) {
return nil, fmt.Errorf("unparsable bytestream URL: '%s'", bsURL)
}

func (s *BuildBuddyServer) getAnyAPIKeyForInvocation(ctx context.Context, invocationID string) (*tables.APIKey, error) {
// LookupInvocation implicitly checks the logged-in user's access to invocationID.
in, err := s.env.GetInvocationDB().LookupInvocation(ctx, invocationID)
if err != nil {
return nil, err
}
authDB := s.env.GetAuthDB()
if authDB == nil {
return nil, status.UnimplementedError("Not Implemented")
}
// We can use any API key because LookupInvocation above already confirmed authorization.
groupKey, err := authDB.GetAPIKeyForInternalUseOnly(ctx, in.GroupID)
if err != nil && !status.IsNotFoundError(err) {
return nil, err
}
if groupKey != nil {
return groupKey, nil
}
// If we couldn't find any group-level keys, look up user-level keys for
// the authenticated user. This handles the edge case where an org
// *only* has user-level keys.
if !authDB.GetUserOwnedKeysEnabled() {
return nil, status.NotFoundErrorf("the organization does not have any API keys configured")
}
u, err := s.env.GetAuthenticator().AuthenticatedUser(ctx)
if err != nil {
return nil, err
}
apiKeys, err := authDB.GetUserAPIKeys(ctx, u.GetUserID(), in.GroupID)
if err != nil {
return nil, err
}
if len(apiKeys) == 0 {
return nil, status.NotFoundError("The group that owns this invocation doesn't have any API keys configured.")
}
return apiKeys[0], nil
}

// ServeHTTP handles requests for build logs and artifacts either by looking
// them up from our cache servers using the bytestream API or pulling them
// from blobstore.
Expand Down
2 changes: 1 addition & 1 deletion server/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ type TaskSizer interface {
Predict(ctx context.Context, task *repb.ExecutionTask) *scpb.TaskSize

// Update records a measured task size.
Update(ctx context.Context, cmd *repb.Command, md *repb.ExecutedActionMetadata) error
Update(ctx context.Context, action *repb.Action, cmd *repb.Command, md *repb.ExecutedActionMetadata) error
}

// ScheduledTask represents an execution task along with its scheduling metadata
Expand Down
2 changes: 2 additions & 0 deletions server/util/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ func buildExecution(in *repb.StoredExecution, inv *sipb.StoredInvocation) *schem
SkipCacheLookup: in.GetSkipCacheLookup(),
RequestedIsolationType: in.GetRequestedIsolationType(),
EffectiveIsolationType: in.GetEffectiveIsolationType(),
EffectiveTimeoutUsec: in.GetRequestedTimeoutUsec(),
RequestedTimeoutUsec: in.GetEffectiveTimeoutUsec(),
InvocationLinkType: int8(in.GetInvocationLinkType()),
User: inv.GetUser(),
Host: inv.GetHost(),
Expand Down
5 changes: 5 additions & 0 deletions server/util/clickhouse/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ type Execution struct {
RequestedIsolationType string
EffectiveIsolationType string `gorm:"type:LowCardinality(String)"` // This values comes from the executor

RequestedTimeoutUsec int64
EffectiveTimeoutUsec int64

// Long string fields
OutputPath string
StatusMessage string
Expand Down Expand Up @@ -289,6 +292,8 @@ func (e *Execution) AdditionalFields() []string {
"ExecutionPriority",
"RequestedIsolationType",
"EffectiveIsolationType",
"RequestedTimeoutUsec",
"EffectiveTimeoutUsec",
}
}

Expand Down

0 comments on commit eb74316

Please sign in to comment.