From 8d1f01b32d84b679f4f2601fc82fbae4f2da47c9 Mon Sep 17 00:00:00 2001 From: Vanja Pejovic Date: Thu, 19 Dec 2024 10:47:54 -0500 Subject: [PATCH 1/4] Use action.platform instead of command.platform in one more place (#8080) I guess I missed this one as well. --- .../remote_execution/execution_server/execution_server.go | 2 +- enterprise/server/tasksize/tasksize.go | 4 ++-- enterprise/server/tasksize/tasksize_test.go | 8 ++++---- server/interfaces/interfaces.go | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/enterprise/server/remote_execution/execution_server/execution_server.go b/enterprise/server/remote_execution/execution_server/execution_server.go index 829c5b5ad9d..0d8fe6d1eb7 100644 --- a/enterprise/server/remote_execution/execution_server/execution_server.go +++ b/enterprise/server/remote_execution/execution_server/execution_server.go @@ -1175,7 +1175,7 @@ func (s *ExecutionServer) markTaskComplete(ctx context.Context, actionResourceNa if sizer := s.env.GetTaskSizer(); sizer != nil && execErr == nil && executeResponse.GetResult().GetExitCode() == 0 { // TODO(vanja) should this be done when the executor got a cache hit? 
md := executeResponse.GetResult().GetExecutionMetadata() - if err := sizer.Update(ctx, cmd, md); err != nil { + if err := sizer.Update(ctx, action, cmd, md); err != nil { log.CtxWarningf(ctx, "Failed to update task size: %s", err) } } diff --git a/enterprise/server/tasksize/tasksize.go b/enterprise/server/tasksize/tasksize.go index 3e28472720a..9b4ccac3c93 100644 --- a/enterprise/server/tasksize/tasksize.go +++ b/enterprise/server/tasksize/tasksize.go @@ -201,13 +201,13 @@ func (s *taskSizer) Predict(ctx context.Context, task *repb.ExecutionTask) *scpb return ApplyLimits(task, s.model.Predict(ctx, task)) } -func (s *taskSizer) Update(ctx context.Context, cmd *repb.Command, md *repb.ExecutedActionMetadata) error { +func (s *taskSizer) Update(ctx context.Context, action *repb.Action, cmd *repb.Command, md *repb.ExecutedActionMetadata) error { if !*useMeasuredSizes { return nil } statusLabel := "ok" defer func() { - props, err := platform.ParseProperties(&repb.ExecutionTask{Command: cmd}) + props, err := platform.ParseProperties(&repb.ExecutionTask{Action: action, Command: cmd}) if err != nil { log.CtxInfof(ctx, "Failed to parse task properties: %s", err) } diff --git a/enterprise/server/tasksize/tasksize_test.go b/enterprise/server/tasksize/tasksize_test.go index 400b6c391bb..6aa82b56d76 100644 --- a/enterprise/server/tasksize/tasksize_test.go +++ b/enterprise/server/tasksize/tasksize_test.go @@ -252,7 +252,7 @@ func TestSizer_Get_ShouldReturnRecordedUsageStats(t *testing.T) { // Set the completed timestamp so that the exec duration is 2 seconds. 
ExecutionCompletedTimestamp: timestamppb.New(execStart.Add(2 * time.Second)), } - err = sizer.Update(ctx, task.GetCommand(), md) + err = sizer.Update(ctx, task.GetAction(), task.GetCommand(), md) require.NoError(t, err) @@ -294,7 +294,7 @@ func TestSizer_RespectsMilliCPULimit(t *testing.T) { ExecutionStartTimestamp: timestamppb.New(execStart), ExecutionCompletedTimestamp: timestamppb.New(execStart.Add(1 * time.Second)), } - err = sizer.Update(ctx, task.GetCommand(), md) + err = sizer.Update(ctx, task.GetAction(), task.GetCommand(), md) require.NoError(t, err) ts := sizer.Get(ctx, task) @@ -335,7 +335,7 @@ func TestSizer_RespectsMinimumSize(t *testing.T) { ExecutionCompletedTimestamp: timestamppb.New(execStart.Add(1 * time.Second)), } - err = sizer.Update(ctx, task.GetCommand(), md) + err = sizer.Update(ctx, task.GetAction(), task.GetCommand(), md) require.NoError(t, err) ts := sizer.Get(ctx, task) @@ -351,7 +351,7 @@ func TestSizer_RespectsMinimumSize(t *testing.T) { }, }, } - err = sizer.Update(ctx, task.GetCommand(), md) + err = sizer.Update(ctx, task.GetAction(), task.GetCommand(), md) require.NoError(t, err) ts = sizer.Get(ctx, task) diff --git a/server/interfaces/interfaces.go b/server/interfaces/interfaces.go index 1ee8e64ec74..807037a3c82 100644 --- a/server/interfaces/interfaces.go +++ b/server/interfaces/interfaces.go @@ -932,7 +932,7 @@ type TaskSizer interface { Predict(ctx context.Context, task *repb.ExecutionTask) *scpb.TaskSize // Update records a measured task size. 
- Update(ctx context.Context, cmd *repb.Command, md *repb.ExecutedActionMetadata) error + Update(ctx context.Context, action *repb.Action, cmd *repb.Command, md *repb.ExecutedActionMetadata) error } // ScheduledTask represents an execution task along with its scheduling metadata From 0823d82a2b39e29c843e1ecd315985f66153c741 Mon Sep 17 00:00:00 2001 From: Vanja Pejovic Date: Thu, 19 Dec 2024 11:55:52 -0500 Subject: [PATCH 2/4] Allow empty string in platform.CoerceContainerType (#8089) Currently, we can't tell the difference between the user not requesting an isolation type and requesting an invalid one. This change fixes that. --- enterprise/server/remote_execution/platform/platform.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/enterprise/server/remote_execution/platform/platform.go b/enterprise/server/remote_execution/platform/platform.go index c6e4f9daf01..3d98f17215a 100644 --- a/enterprise/server/remote_execution/platform/platform.go +++ b/enterprise/server/remote_execution/platform/platform.go @@ -147,9 +147,12 @@ const ( // previously supported. var KnownContainerTypes []ContainerType = []ContainerType{BareContainerType, PodmanContainerType, DockerContainerType, FirecrackerContainerType, OCIContainerType, SandboxContainerType} -// CoerceContainerType returns t if it is in KnownContainerTypes. Otherwise it -// returns "Unknown". +// CoerceContainerType returns t if it's empty or in KnownContainerTypes. +// Otherwise it returns "Unknown". 
func CoerceContainerType(t string) string { + if t == "" { + return "" + } if slices.Contains(KnownContainerTypes, ContainerType(t)) { return t } From 0f1ef9a596d75beaeeb40dec9d483e335ec543bc Mon Sep 17 00:00:00 2001 From: Brandon Duffany Date: Thu, 19 Dec 2024 12:55:05 -0500 Subject: [PATCH 3/4] Remove an unused function (#8088) --- server/buildbuddy_server/buildbuddy_server.go | 38 ------------------- 1 file changed, 38 deletions(-) diff --git a/server/buildbuddy_server/buildbuddy_server.go b/server/buildbuddy_server/buildbuddy_server.go index da7a3a73a6f..9eaa253ac32 100644 --- a/server/buildbuddy_server/buildbuddy_server.go +++ b/server/buildbuddy_server/buildbuddy_server.go @@ -1787,44 +1787,6 @@ func parseByteStreamURL(bsURL, filename string) (*bsLookup, error) { return nil, fmt.Errorf("unparsable bytestream URL: '%s'", bsURL) } -func (s *BuildBuddyServer) getAnyAPIKeyForInvocation(ctx context.Context, invocationID string) (*tables.APIKey, error) { - // LookupInvocation implicitly checks the logged-in user's access to invocationID. - in, err := s.env.GetInvocationDB().LookupInvocation(ctx, invocationID) - if err != nil { - return nil, err - } - authDB := s.env.GetAuthDB() - if authDB == nil { - return nil, status.UnimplementedError("Not Implemented") - } - // We can use any API key because LookupInvocation above already confirmed authorization. - groupKey, err := authDB.GetAPIKeyForInternalUseOnly(ctx, in.GroupID) - if err != nil && !status.IsNotFoundError(err) { - return nil, err - } - if groupKey != nil { - return groupKey, nil - } - // If we couldn't find any group-level keys, look up user-level keys for - // the authenticated user. This handles the edge case where an org - // *only* has user-level keys. 
- if !authDB.GetUserOwnedKeysEnabled() { - return nil, status.NotFoundErrorf("the organization does not have any API keys configured") - } - u, err := s.env.GetAuthenticator().AuthenticatedUser(ctx) - if err != nil { - return nil, err - } - apiKeys, err := authDB.GetUserAPIKeys(ctx, u.GetUserID(), in.GroupID) - if err != nil { - return nil, err - } - if len(apiKeys) == 0 { - return nil, status.NotFoundError("The group that owns this invocation doesn't have any API keys configured.") - } - return apiKeys[0], nil -} - // ServeHTTP handles requests for build logs and artifacts either by looking // them up from our cache servers using the bytestream API or pulling them // from blobstore. From 1cad61dd53e4a22e3b86fa81bfe42626d58d08ba Mon Sep 17 00:00:00 2001 From: Vanja Pejovic Date: Thu, 19 Dec 2024 12:58:47 -0500 Subject: [PATCH 4/4] Save requested and effective timeouts to clickhouse (#8081) I expect each of these to add about 1% to the write throughput. --- .../remote_execution/execution_server/BUILD | 1 + .../execution_server/execution_server.go | 18 +++++++++++++++--- .../execution_server/execution_server_test.go | 7 ++++++- .../server/remote_execution/executor/BUILD | 1 + .../remote_execution/executor/executor.go | 2 ++ proto/BUILD | 2 ++ proto/execution_stats.proto | 8 ++++++++ proto/remote_execution.proto | 3 +++ server/util/clickhouse/clickhouse.go | 2 ++ server/util/clickhouse/schema/schema.go | 5 +++++ 10 files changed, 45 insertions(+), 4 deletions(-) diff --git a/enterprise/server/remote_execution/execution_server/BUILD b/enterprise/server/remote_execution/execution_server/BUILD index 8230d7df426..8e2477ca92f 100644 --- a/enterprise/server/remote_execution/execution_server/BUILD +++ b/enterprise/server/remote_execution/execution_server/BUILD @@ -94,6 +94,7 @@ go_test( "@org_golang_google_grpc//status", "@org_golang_google_protobuf//testing/protocmp", "@org_golang_google_protobuf//types/known/anypb", + "@org_golang_google_protobuf//types/known/durationpb", 
"@org_golang_google_protobuf//types/known/timestamppb", ], ) diff --git a/enterprise/server/remote_execution/execution_server/execution_server.go b/enterprise/server/remote_execution/execution_server/execution_server.go index 0d8fe6d1eb7..cbaa90df8a1 100644 --- a/enterprise/server/remote_execution/execution_server/execution_server.go +++ b/enterprise/server/remote_execution/execution_server/execution_server.go @@ -331,7 +331,14 @@ func (s *ExecutionServer) updateExecution(ctx context.Context, executionID strin return dbErr } -func (s *ExecutionServer) recordExecution(ctx context.Context, executionID string, md *repb.ExecutedActionMetadata, auxMeta *espb.ExecutionAuxiliaryMetadata, properties *platform.Properties) error { +func (s *ExecutionServer) recordExecution( + ctx context.Context, + executionID string, + action *repb.Action, + md *repb.ExecutedActionMetadata, + auxMeta *espb.ExecutionAuxiliaryMetadata, + properties *platform.Properties) error { + if s.env.GetExecutionCollector() == nil || !olapdbconfig.WriteExecutionsToOLAPDBEnabled() { return nil } @@ -369,6 +376,9 @@ func (s *ExecutionServer) recordExecution(ctx context.Context, executionID strin executionProto.EffectiveIsolationType = auxMeta.GetIsolationType() executionProto.RequestedIsolationType = platform.CoerceContainerType(properties.WorkloadIsolationType) + executionProto.EffectiveTimeoutUsec = auxMeta.GetTimeout().AsDuration().Microseconds() + executionProto.RequestedTimeoutUsec = action.GetTimeout().AsDuration().Microseconds() + executionProto.RequestedComputeUnits = properties.EstimatedComputeUnits executionProto.RequestedMemoryBytes = properties.EstimatedMemoryBytes executionProto.RequestedMilliCpu = properties.EstimatedMilliCPU @@ -1053,6 +1063,7 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio var auxMeta *espb.ExecutionAuxiliaryMetadata var properties *platform.Properties + var action *repb.Action if stage == repb.ExecutionStage_COMPLETED && response != nil 
{ auxMeta = new(espb.ExecutionAuxiliaryMetadata) ok, err := rexec.AuxiliaryMetadata(response.GetResult().GetExecutionMetadata(), auxMeta) @@ -1066,7 +1077,8 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio return status.WrapErrorf(err, "Failed to parse taskID") } actionRN = digest.NewResourceName(actionRN.GetDigest(), actionRN.GetInstanceName(), rspb.CacheType_AC, actionRN.GetDigestFunction()) - action, cmd, err := s.fetchActionAndCommand(ctx, actionRN) + var cmd *repb.Command + action, cmd, err = s.fetchActionAndCommand(ctx, actionRN) if err != nil { return status.UnavailableErrorf("Failed to fetch action and command: %s", err) } @@ -1101,7 +1113,7 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio return status.WrapErrorf(err, "failed to update execution %q", taskID) } lastWrite = time.Now() - if err := s.recordExecution(ctx, taskID, response.GetResult().GetExecutionMetadata(), auxMeta, properties); err != nil { + if err := s.recordExecution(ctx, taskID, action, response.GetResult().GetExecutionMetadata(), auxMeta, properties); err != nil { log.CtxErrorf(ctx, "failed to record execution %q: %s", taskID, err) } return nil diff --git a/enterprise/server/remote_execution/execution_server/execution_server_test.go b/enterprise/server/remote_execution/execution_server/execution_server_test.go index 4f4cd2d040a..9873b5979c7 100644 --- a/enterprise/server/remote_execution/execution_server/execution_server_test.go +++ b/enterprise/server/remote_execution/execution_server/execution_server_test.go @@ -45,6 +45,7 @@ import ( sipb "github.com/buildbuddy-io/buildbuddy/proto/stored_invocation" gstatus "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/durationpb" tspb "google.golang.org/protobuf/types/known/timestamppb" ) @@ -378,6 +379,7 @@ func testExecuteAndPublishOperation(t *testing.T, test publishTest) { clientCtx = 
metadata.AppendToOutgoingContext(clientCtx, "x-buildbuddy-platform."+k, v) } arn := uploadAction(clientCtx, t, env, instanceName, digestFunction, &repb.Action{ + Timeout: &durationpb.Duration{Seconds: 10}, DoNotCache: test.doNotCache, Platform: &repb.Platform{Properties: []*repb.Platform_Property{ {Name: "EstimatedComputeUnits", Value: "2.5"}, @@ -419,6 +421,7 @@ func testExecuteAndPublishOperation(t *testing.T, test publishTest) { } if test.publishMoreMetadata { aux.IsolationType = "firecracker" + aux.Timeout = &durationpb.Duration{Seconds: 11} aux.ExecuteRequest = &repb.ExecuteRequest{ SkipCacheLookup: true, // This is only used for writing to clickhouse ExecutionPolicy: &repb.ExecutionPolicy{Priority: 999}, @@ -534,11 +537,11 @@ func testExecuteAndPublishOperation(t *testing.T, test publishTest) { RequestedMemoryBytes: 2000, RequestedMilliCpu: 1500, RequestedIsolationType: "oci", + RequestedTimeoutUsec: 10000000, } if test.publishMoreMetadata { expectedExecution.ExecutionPriority = 999 expectedExecution.SkipCacheLookup = true - expectedExecution.EffectiveIsolationType = "firecracker" expectedExecution.EstimatedFreeDiskBytes = 1001 expectedExecution.PreviousMeasuredMemoryBytes = 2001 expectedExecution.PreviousMeasuredMilliCpu = 2002 @@ -546,6 +549,8 @@ func testExecuteAndPublishOperation(t *testing.T, test publishTest) { expectedExecution.PredictedMemoryBytes = 3001 expectedExecution.PredictedMilliCpu = 3002 expectedExecution.PredictedFreeDiskBytes = 3003 + expectedExecution.EffectiveIsolationType = "firecracker" + expectedExecution.EffectiveTimeoutUsec = 11000000 } diff := cmp.Diff( expectedExecution, diff --git a/enterprise/server/remote_execution/executor/BUILD b/enterprise/server/remote_execution/executor/BUILD index 322efe5cb3d..4b0ce6c28d3 100644 --- a/enterprise/server/remote_execution/executor/BUILD +++ b/enterprise/server/remote_execution/executor/BUILD @@ -33,6 +33,7 @@ go_library( "//server/util/tracing", 
"@com_github_prometheus_client_golang//prometheus", "@org_golang_google_protobuf//types/known/anypb", + "@org_golang_google_protobuf//types/known/durationpb", "@org_golang_google_protobuf//types/known/timestamppb", ], ) diff --git a/enterprise/server/remote_execution/executor/executor.go b/enterprise/server/remote_execution/executor/executor.go index 51ae69f9b3b..fcd39c5559e 100644 --- a/enterprise/server/remote_execution/executor/executor.go +++ b/enterprise/server/remote_execution/executor/executor.go @@ -32,6 +32,7 @@ import ( "github.com/buildbuddy-io/buildbuddy/server/util/tracing" "github.com/prometheus/client_golang/prometheus" "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" espb "github.com/buildbuddy-io/buildbuddy/proto/execution_stats" @@ -313,6 +314,7 @@ func (s *Executor) ExecuteTaskAndStreamResults(ctx context.Context, st *repb.Sch // These errors are failure-specific. Pass through unchanged. 
return finishWithErrFn(err) } + auxMetadata.Timeout = durationpb.New(execTimeouts.TerminateAfter) now := time.Now() terminateAt := now.Add(execTimeouts.TerminateAfter) diff --git a/proto/BUILD b/proto/BUILD index 84e0e93cd46..7459481a369 100644 --- a/proto/BUILD +++ b/proto/BUILD @@ -176,6 +176,7 @@ proto_library( ":remote_execution_proto", ":scheduler_proto", ":stat_filter_proto", + "@com_google_protobuf//:duration_proto", "@com_google_protobuf//:timestamp_proto", "@googleapis//google/longrunning:operations_proto", "@googleapis//google/rpc:status_proto", @@ -1961,6 +1962,7 @@ ts_proto_library( deps = [ ":acl_ts_proto", ":context_ts_proto", + ":duration_ts_proto", ":google_longrunning_ts_proto", ":grpc_status_ts_proto", ":invocation_status_ts_proto", diff --git a/proto/execution_stats.proto b/proto/execution_stats.proto index 9ee0fb9ea88..03c05eb916b 100644 --- a/proto/execution_stats.proto +++ b/proto/execution_stats.proto @@ -2,6 +2,7 @@ syntax = "proto3"; import "google/longrunning/operations.proto"; import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; import "google/rpc/status.proto"; import "proto/acl.proto"; import "proto/context.proto"; @@ -135,9 +136,16 @@ message Execution { message ExecutionAuxiliaryMetadata { // Platform overrides set via remote header. build.bazel.remote.execution.v2.Platform platform_overrides = 1; + + // The effective isolation type. Usually either user requested or the + // default. string isolation_type = 2; + build.bazel.remote.execution.v2.ExecuteRequest execute_request = 3; scheduler.SchedulingMetadata scheduling_metadata = 4; + + // The effective action timeout. Either user requested or the default. 
+ google.protobuf.Duration timeout = 6; } message ExecutionLookup { diff --git a/proto/remote_execution.proto b/proto/remote_execution.proto index e272d5d9e75..2b17ef7a482 100644 --- a/proto/remote_execution.proto +++ b/proto/remote_execution.proto @@ -2444,6 +2444,9 @@ message StoredExecution { int64 output_upload_start_timestamp_usec = 26; int64 output_upload_completed_timestamp_usec = 27; + int64 requested_timeout_usec = 55; + int64 effective_timeout_usec = 56; + int32 invocation_link_type = 28; int32 status_code = 29; diff --git a/server/util/clickhouse/clickhouse.go b/server/util/clickhouse/clickhouse.go index 867863265ac..114dc3850c4 100644 --- a/server/util/clickhouse/clickhouse.go +++ b/server/util/clickhouse/clickhouse.go @@ -246,6 +246,8 @@ func buildExecution(in *repb.StoredExecution, inv *sipb.StoredInvocation) *schem SkipCacheLookup: in.GetSkipCacheLookup(), RequestedIsolationType: in.GetRequestedIsolationType(), EffectiveIsolationType: in.GetEffectiveIsolationType(), + RequestedTimeoutUsec: in.GetRequestedTimeoutUsec(), + EffectiveTimeoutUsec: in.GetEffectiveTimeoutUsec(), InvocationLinkType: int8(in.GetInvocationLinkType()), User: inv.GetUser(), Host: inv.GetHost(), diff --git a/server/util/clickhouse/schema/schema.go b/server/util/clickhouse/schema/schema.go index 43d34e89145..e9002fd7c2e 100644 --- a/server/util/clickhouse/schema/schema.go +++ b/server/util/clickhouse/schema/schema.go @@ -215,6 +215,9 @@ type Execution struct { RequestedIsolationType string EffectiveIsolationType string `gorm:"type:LowCardinality(String)"` // This values comes from the executor + RequestedTimeoutUsec int64 + EffectiveTimeoutUsec int64 + // Long string fields OutputPath string StatusMessage string @@ -287,6 +290,8 @@ func (e *Execution) AdditionalFields() []string { "ExecutionPriority", "RequestedIsolationType", "EffectiveIsolationType", + "RequestedTimeoutUsec", + "EffectiveTimeoutUsec", } }