Skip to content

Commit

Permalink
fix(backend): add retries to getOrInsertContext in MLMD client
Browse files Browse the repository at this point in the history
This function is known to be flaky and racy right after a server is initially created
and pipeline runs are just starting, so we retry up to 3 times. The actual cause of the
race isn't fully understood, but it's probably caused by deadlocks in MLMD SQL code.
General MySQL advice is to retry on deadlock errors.

Signed-off-by: Greg Sheremeta <[email protected]>
  • Loading branch information
gregsheremeta committed Oct 30, 2024
1 parent 57a2823 commit 927dae7
Showing 1 changed file with 74 additions and 36 deletions.
110 changes: 74 additions & 36 deletions backend/src/v2/metadata/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1129,47 +1129,85 @@ func (c *Client) getContextTypeID(ctx context.Context, contextType *pb.ContextTy
}

func (c *Client) getOrInsertContext(ctx context.Context, name string, contextType *pb.ContextType, customProps map[string]*pb.Value) (*pb.Context, error) {
// The most common case -- the context is already created by upstream tasks.
// So we try to get the context first.
getCtxRes, err := c.svc.GetContextByTypeAndName(ctx, &pb.GetContextByTypeAndNameRequest{TypeName: contextType.Name, ContextName: proto.String(name)})
fmt.Printf("getOrInsertContext: name=%q, type=%q\n", name, contextType.GetName())

if err != nil {
return nil, fmt.Errorf("Failed GetContextByTypeAndName(type=%q, name=%q)", contextType.GetName(), name)
}
// Bug in MLMD GetContextByTypeAndName? It doesn't return an error even when no
// context was found.
if getCtxRes.Context != nil {
return getCtxRes.Context, nil
}
// This function is known to be flaky and racy right after a server is initially created
// and pipeline runs are just starting, so we retry up to 3 times. The actual cause of the
// race isn't fully understood, but it's probably caused by deadlocks in MLMD SQL code.
// General MySQL advice is to retry on deadlock errors; the loop below does not detect
// deadlocks specifically, so it retries after any unexpected error.

// Get the ContextType ID.
typeID, err := c.getContextTypeID(ctx, contextType)
if err != nil {
return nil, err
}
// Next, create the Context.
putReq := &pb.PutContextsRequest{
Contexts: []*pb.Context{
{
Name: proto.String(name),
TypeId: proto.Int64(typeID),
CustomProperties: customProps,
for i := 1; i <= 3; i++ {

fmt.Printf("getOrInsertContext: attempt %v\n", i)
// The most common case -- the context is already created by previous tasks.
// So we try to get the context first.

fmt.Printf("getOrInsertContext: calling GetContextByTypeAndName\n")

getCtxRes, err := c.svc.GetContextByTypeAndName(ctx, &pb.GetContextByTypeAndNameRequest{TypeName: contextType.Name, ContextName: proto.String(name)})
fmt.Printf("getOrInsertContext: response=%v, err=%v\n", getCtxRes, err)

// gRPC returns a nil response if there was an error, so check err before using getCtxRes.
if err != nil {
fmt.Printf("getOrInsertContext: error=%v\n", err)
fmt.Println(fmt.Errorf("failed GetContextByTypeAndName(type=%q, name=%q), error=%w", contextType.GetName(), name, err))
time.Sleep(1 * time.Second)
continue
}

if getCtxRes.Context != nil {
fmt.Printf("getOrInsertContext: context found: %v\n", getCtxRes.Context)
return getCtxRes.Context, nil
}

fmt.Printf("getOrInsertContext: context not found, creating\n")

// Get the ContextType ID.
typeID, err := c.getContextTypeID(ctx, contextType)
if err != nil {
fmt.Printf("getOrInsertContext: error getting getContextTypeID for creation: %v\n", err)
time.Sleep(1 * time.Second)
continue
}
// Next, create the Context.
putReq := &pb.PutContextsRequest{
Contexts: []*pb.Context{
{
Name: proto.String(name),
TypeId: proto.Int64(typeID),
CustomProperties: customProps,
},
},
},
}
_, err = c.svc.PutContexts(ctx, putReq)
// It's expected other tasks may try to create the context at the same time,
// so ignore AlreadyExists error.
if err != nil && status.Convert(err).Code() != codes.AlreadyExists {
return nil, fmt.Errorf("Failed PutContext(name=%q, type=%q, typeid=%v): %w", name, contextType.GetName(), typeID, err)
}
}
fmt.Printf("getOrInsertContext: sending create request\n")
_, err = c.svc.PutContexts(ctx, putReq)
// It's expected other tasks may try to create the context at the same time,
// so ignore AlreadyExists error.
if err != nil {
if status.Convert(err).Code() != codes.AlreadyExists {
fmt.Println(fmt.Errorf("failed PutContext(name=%q, type=%q, typeid=%v): %w", name, contextType.GetName(), typeID, err))
time.Sleep(1 * time.Second)
continue
} else {
fmt.Printf("getOrInsertContext: context now already exists, someone beat us to it\n")
}
} else {
fmt.Printf("getOrInsertContext: context created successfully\n")
}

// Get the created context.
getCtxRes, err = c.svc.GetContextByTypeAndName(ctx, &pb.GetContextByTypeAndNameRequest{TypeName: contextType.Name, ContextName: proto.String(name)})
if err != nil {
return nil, fmt.Errorf("Failed GetContext(name=%q, type=%q): %w", name, contextType.GetName(), err)
// Get the created context.
fmt.Printf("getOrInsertContext: now getting the context we just created\n")
getCtxRes, err = c.svc.GetContextByTypeAndName(ctx, &pb.GetContextByTypeAndNameRequest{TypeName: contextType.Name, ContextName: proto.String(name)})
if err != nil {
fmt.Println(fmt.Errorf("failed GetContext(name=%q, type=%q): %w", name, contextType.GetName(), err))
time.Sleep(1 * time.Second)
continue
}
fmt.Printf("getOrInsertContext: successfully got context, returning it :)\n")
return getCtxRes.GetContext(), nil
}
return getCtxRes.GetContext(), nil

return nil, fmt.Errorf("failed to get or insert context %q after 3 attempts", name)
}

func GenerateExecutionConfig(executorInput *pipelinespec.ExecutorInput) (*ExecutionConfig, error) {
Expand Down

0 comments on commit 927dae7

Please sign in to comment.