Skip to content

Commit

Permalink
Merge branch 'master' of ssh://github.com/buildbuddy-io/buildbuddy
Browse files Browse the repository at this point in the history
  • Loading branch information
tylerwilliams committed Oct 15, 2021
2 parents 1452589 + 95eff13 commit 25e8eac
Show file tree
Hide file tree
Showing 20 changed files with 389 additions and 77 deletions.
2 changes: 1 addition & 1 deletion enterprise/server/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func lookupUserFromSubID(env environment.Env, ctx context.Context, subID string)
g.use_group_owned_executors,
g.saml_idp_metadata_url,
ug.role
FROM Groups AS g, UserGroups AS ug
FROM `+"`Groups`"+` AS g, UserGroups AS ug
WHERE g.group_id = ug.group_group_id
AND ug.membership_status = ?
AND ug.user_user_id = ?
Expand Down
11 changes: 9 additions & 2 deletions enterprise/server/backends/userdb/userdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,12 @@ func (d *UserDB) GetAPIKeys(ctx context.Context, groupID string) ([]*tables.APIK
return nil, status.InvalidArgumentError("Group ID cannot be empty.")
}

query := d.h.Raw(`SELECT api_key_id, value, label, perms, capabilities FROM APIKeys WHERE group_id = ?`, groupID)
query := d.h.Raw(`
SELECT api_key_id, value, label, perms, capabilities
FROM APIKeys
WHERE group_id = ?
ORDER BY label ASC
`, groupID)
rows, err := query.Rows()
if err != nil {
return nil, err
Expand Down Expand Up @@ -331,7 +336,7 @@ func (d *UserDB) InsertOrUpdateGroup(ctx context.Context, g *tables.Group) (stri

groupID = g.GroupID
res := tx.Exec(`
UPDATE Groups SET name = ?, url_identifier = ?, owned_domain = ?, sharing_enabled = ?,
UPDATE `+"`Groups`"+` SET name = ?, url_identifier = ?, owned_domain = ?, sharing_enabled = ?,
use_group_owned_executors = ?
WHERE group_id = ?`,
g.Name, g.URLIdentifier, g.OwnedDomain, g.SharingEnabled, g.UseGroupOwnedExecutors,
Expand Down Expand Up @@ -408,6 +413,8 @@ func (d *UserDB) GetGroupUsers(ctx context.Context, groupID string, statuses []g
orQuery, orArgs := o.Build()
q = q.AddWhereClause("("+orQuery+")", orArgs...)

q.SetOrderBy(`u.email`, true /*=ascending*/)

qString, qArgs := q.Build()
rows, err := d.h.Raw(qString, qArgs...).Rows()
if err != nil {
Expand Down
50 changes: 34 additions & 16 deletions enterprise/server/scheduling/scheduler_server/scheduler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ const (
maxUnclaimedTasksTracked = 10_000
// TTL for sets used to track unclaimed tasks in Redis. TTL is extended when new tasks are added.
unclaimedTaskSetTTL = 1 * time.Hour
// Unclaimed tasks older than this are removed from the unclaimed tasks list.
unclaimedTaskMaxAge = 2 * time.Hour

unusedSchedulerClientExpiration = 5 * time.Minute
unusedSchedulerClientCheckInterval = 1 * time.Minute
Expand Down Expand Up @@ -551,8 +553,17 @@ func (np *nodePool) AddUnclaimedTask(ctx context.Context, taskID string) error {
if n > maxUnclaimedTasksTracked {
// Trim the oldest tasks. We use the task insertion timestamp as the score so the oldest task is at rank 0, next
// oldest is at rank 1 and so on. We subtract 1 because the indexes are inclusive.
return np.rdb.ZRemRangeByRank(ctx, key, 0, n-maxUnclaimedTasksTracked-1).Err()
if err := np.rdb.ZRemRangeByRank(ctx, key, 0, n-maxUnclaimedTasksTracked-1).Err(); err != nil {
log.Warningf("Error trimming unclaimed tasks: %s", err)
}
}

// Also trim any stale tasks from the set. The data is stored in score order so this is a cheap operation.
cutoff := time.Now().Add(-unclaimedTaskMaxAge).Unix()
if err := np.rdb.ZRemRangeByScore(ctx, key, "0", strconv.FormatInt(cutoff, 10)).Err(); err != nil {
log.Warningf("Error deleting old unclaimed tasks: %s", err)
}

return nil
}

Expand Down Expand Up @@ -1207,9 +1218,12 @@ func minInt(i, j int) int {
}

type enqueueTaskReservationOpts struct {
numReplicas int
maxAttempts int
alwaysScheduleLocally bool
numReplicas int
maxAttempts int
// This option determines whether tasks should be scheduled only on executors connected to this scheduler.
// If false, this scheduler will make RPCs to other schedulers to have them enqueue tasks on their connected
// executors.
scheduleOnConnectedExecutors bool
}

func (s *SchedulerServer) enqueueTaskReservations(ctx context.Context, enqueueRequest *scpb.EnqueueTaskReservationRequest, serializedTask []byte, opts enqueueTaskReservationOpts) error {
Expand All @@ -1228,9 +1242,13 @@ func (s *SchedulerServer) enqueueTaskReservations(ctx context.Context, enqueueRe
return err
}

err = nodeBalancer.AddUnclaimedTask(ctx, enqueueRequest.GetTaskId())
if err != nil {
log.Warningf("Could not add task to unclaimed task list: %s", err)
// We only want to add the unclaimed task once on the "master" scheduler.
// scheduleOnConnectedExecutors implies that we are enqueuing task reservations on behalf of another scheduler.
if !opts.scheduleOnConnectedExecutors {
err = nodeBalancer.AddUnclaimedTask(ctx, enqueueRequest.GetTaskId())
if err != nil {
log.Warningf("Could not add task to unclaimed task list: %s", err)
}
}

probeCount := minInt(opts.numReplicas, nodeCount)
Expand Down Expand Up @@ -1277,7 +1295,7 @@ func (s *SchedulerServer) enqueueTaskReservations(ctx context.Context, enqueueRe
preferredNode = nil
} else {
nodes = nodeBalancer.nodes
if opts.alwaysScheduleLocally {
if opts.scheduleOnConnectedExecutors {
nodes = nodeBalancer.connectedExecutors
}
if len(nodes) == 0 {
Expand All @@ -1300,7 +1318,7 @@ func (s *SchedulerServer) enqueueTaskReservations(ctx context.Context, enqueueRe
enqueueRequest.ExecutorId = node.GetExecutorID()

enqueueStart := time.Now()
if opts.alwaysScheduleLocally {
if opts.scheduleOnConnectedExecutors {
if node.handle == nil {
log.Errorf("nil handle for a local executor %q", node.GetExecutorID())
continue
Expand Down Expand Up @@ -1360,8 +1378,8 @@ func (s *SchedulerServer) ScheduleTask(ctx context.Context, req *scpb.ScheduleTa
}

opts := enqueueTaskReservationOpts{
numReplicas: probesPerTask,
alwaysScheduleLocally: false,
numReplicas: probesPerTask,
scheduleOnConnectedExecutors: false,
}
if err := s.enqueueTaskReservations(ctx, enqueueRequest, req.GetSerializedTask(), opts); err != nil {
return nil, err
Expand All @@ -1373,9 +1391,9 @@ func (s *SchedulerServer) EnqueueTaskReservation(ctx context.Context, req *scpb.
// TODO(vadim): verify user is authorized to use executor pool

opts := enqueueTaskReservationOpts{
numReplicas: 1,
maxAttempts: 10,
alwaysScheduleLocally: true,
numReplicas: 1,
maxAttempts: 10,
scheduleOnConnectedExecutors: true,
}
if err := s.enqueueTaskReservations(ctx, req, nil /*=serializedTask*/, opts); err != nil {
return nil, err
Expand Down Expand Up @@ -1405,8 +1423,8 @@ func (s *SchedulerServer) reEnqueueTask(ctx context.Context, taskID string, numR
SchedulingMetadata: task.metadata,
}
opts := enqueueTaskReservationOpts{
numReplicas: numReplicas,
alwaysScheduleLocally: false,
numReplicas: numReplicas,
scheduleOnConnectedExecutors: false,
}
if err := s.enqueueTaskReservations(ctx, enqueueRequest, task.serializedTask, opts); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions enterprise/server/util/cacheproxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//server/environment",
"//server/interfaces",
"//server/util/alert",
"//server/util/bytebufferpool",
"//server/util/devnull",
"//server/util/grpc_client",
"//server/util/grpc_server",
Expand Down
8 changes: 6 additions & 2 deletions enterprise/server/util/cacheproxy/cacheproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/environment"
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
"github.com/buildbuddy-io/buildbuddy/server/util/alert"
"github.com/buildbuddy-io/buildbuddy/server/util/bytebufferpool"
"github.com/buildbuddy-io/buildbuddy/server/util/devnull"
"github.com/buildbuddy-io/buildbuddy/server/util/grpc_client"
"github.com/buildbuddy-io/buildbuddy/server/util/grpc_server"
Expand All @@ -36,6 +37,7 @@ type CacheProxy struct {
env environment.Env
cache interfaces.Cache
log log.Logger
bufferPool *bytebufferpool.Pool
mu *sync.Mutex
server *grpc.Server
clients map[string]*dcClient
Expand All @@ -49,6 +51,7 @@ func NewCacheProxy(env environment.Env, c interfaces.Cache, listenAddr string) *
env: env,
cache: c,
log: log.NamedSubLogger(fmt.Sprintf("CacheProxy(%s)", listenAddr)),
bufferPool: bytebufferpool.New(readBufSizeBytes),
listenAddr: listenAddr,
mu: &sync.Mutex{},
// server goes here
Expand Down Expand Up @@ -255,8 +258,9 @@ func (c *CacheProxy) Read(req *dcpb.ReadRequest, stream dcpb.DistributedCache_Re
if d.GetSizeBytes() > 0 && d.GetSizeBytes() < bufSize {
bufSize = d.GetSizeBytes()
}
copyBuf := make([]byte, bufSize)
_, err = io.CopyBuffer(&streamWriter{stream}, reader, copyBuf)
copyBuf := c.bufferPool.Get(bufSize)
_, err = io.CopyBuffer(&streamWriter{stream}, reader, copyBuf[:bufSize])
c.bufferPool.Put(copyBuf)
c.log.Debugf("Read(%q) succeeded (user prefix: %s)", IsolationToString(req.GetIsolation())+d.GetHash(), up)
return err
}
Expand Down
2 changes: 2 additions & 0 deletions server/backends/github/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ go_library(
deps = [
"//server/environment",
"//server/tables",
"//server/util/authutil",
"//server/util/log",
"//server/util/perms",
"//server/util/random",
"//server/util/role",
"//server/util/status",
],
)
13 changes: 10 additions & 3 deletions server/backends/github/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (

"github.com/buildbuddy-io/buildbuddy/server/environment"
"github.com/buildbuddy-io/buildbuddy/server/tables"
"github.com/buildbuddy-io/buildbuddy/server/util/authutil"
"github.com/buildbuddy-io/buildbuddy/server/util/log"
"github.com/buildbuddy-io/buildbuddy/server/util/perms"
"github.com/buildbuddy-io/buildbuddy/server/util/random"
"github.com/buildbuddy-io/buildbuddy/server/util/role"
"github.com/buildbuddy-io/buildbuddy/server/util/status"
)

Expand Down Expand Up @@ -138,8 +140,13 @@ func (c *GithubClient) Link(w http.ResponseWriter, r *http.Request) {

// Restore group ID from cookie.
groupID := getCookie(r, groupIDCookieName)
if err := perms.AuthorizeGroupAccess(r.Context(), c.env, groupID); err != nil {
redirectWithError(w, r, status.PermissionDeniedErrorf("Group auth failed; not linking GitHub account: %s", err.Error()))
u, err := perms.AuthenticatedUser(r.Context(), c.env)
if err != nil {
redirectWithError(w, r, status.WrapError(err, "Failed to link GitHub account"))
return
}
if err := authutil.AuthorizeGroupRole(u, groupID, role.Admin); err != nil {
redirectWithError(w, r, status.WrapError(err, "Failed to link GitHub account"))
return
}

Expand All @@ -150,7 +157,7 @@ func (c *GithubClient) Link(w http.ResponseWriter, r *http.Request) {
}

err = dbHandle.Exec(
"UPDATE Groups SET github_token = ? WHERE group_id = ?",
`UPDATE `+"`Groups`"+` SET github_token = ? WHERE group_id = ?`,
accessTokenResponse.AccessToken, groupID).Error
if err != nil {
redirectWithError(w, r, status.PermissionDeniedErrorf("Error linking github account to user: %v", err))
Expand Down
Loading

0 comments on commit 25e8eac

Please sign in to comment.