diff --git a/backend/src/v2/metadata/client.go b/backend/src/v2/metadata/client.go index cf745d1f3ae..cfd1c2fc0e1 100644 --- a/backend/src/v2/metadata/client.go +++ b/backend/src/v2/metadata/client.go @@ -1129,47 +1129,85 @@ func (c *Client) getContextTypeID(ctx context.Context, contextType *pb.ContextTy } func (c *Client) getOrInsertContext(ctx context.Context, name string, contextType *pb.ContextType, customProps map[string]*pb.Value) (*pb.Context, error) { - // The most common case -- the context is already created by upstream tasks. - // So we try to get the context first. - getCtxRes, err := c.svc.GetContextByTypeAndName(ctx, &pb.GetContextByTypeAndNameRequest{TypeName: contextType.Name, ContextName: proto.String(name)}) + fmt.Printf("getOrInsertContext: name=%q, type=%q\n", name, contextType.GetName()) - if err != nil { - return nil, fmt.Errorf("Failed GetContextByTypeAndName(type=%q, name=%q)", contextType.GetName(), name) - } - // Bug in MLMD GetContextsByTypeAndName? It doesn't return error even when no - // context was found. - if getCtxRes.Context != nil { - return getCtxRes.Context, nil - } + // This function is known to be flaky and racy right after a server is initially created + // and pipeline runs are just starting, so we retry up to 3 times. The actual cause of the + // race isn't fully understood, but it's probably caused by deadlocks in MLMD SQL code. + // General MySQL advice is to retry on deadlock errors, so here we are. - // Get the ContextType ID. - typeID, err := c.getContextTypeID(ctx, contextType) - if err != nil { - return nil, err - } - // Next, create the Context. - putReq := &pb.PutContextsRequest{ - Contexts: []*pb.Context{ - { - Name: proto.String(name), - TypeId: proto.Int64(typeID), - CustomProperties: customProps, + for i := 1; i <= 3; i++ { + + fmt.Printf("getOrInsertContext: attempt %v\n", i) + // The most common case -- the context is already created by previous tasks. + // So we try to get the context first. + + fmt.Printf("getOrInsertContext: calling GetContextByTypeAndName\n") + + getCtxRes, err := c.svc.GetContextByTypeAndName(ctx, &pb.GetContextByTypeAndNameRequest{TypeName: contextType.Name, ContextName: proto.String(name)}) + fmt.Printf("getOrInsertContext: response=%v, err=%v\n", getCtxRes, err) + + // grpc returns a nil response if there was an error + if err != nil { + fmt.Printf("getOrInsertContext: error=%v\n", err) + fmt.Println(fmt.Errorf("failed GetContextByTypeAndName(type=%q, name=%q), error=%w", contextType.GetName(), name, err)) + time.Sleep(1 * time.Second) + continue + } + + if getCtxRes.Context != nil { + fmt.Printf("getOrInsertContext: context found: %v\n", getCtxRes.Context) + return getCtxRes.Context, nil + } + + fmt.Printf("getOrInsertContext: context not found, creating\n") + + // Get the ContextType ID. + typeID, err := c.getContextTypeID(ctx, contextType) + if err != nil { + fmt.Printf("getOrInsertContext: error getting getContextTypeID for creation: %v\n", err) + time.Sleep(1 * time.Second) + continue + } + // Next, create the Context. + putReq := &pb.PutContextsRequest{ + Contexts: []*pb.Context{ + { + Name: proto.String(name), + TypeId: proto.Int64(typeID), + CustomProperties: customProps, + }, }, - }, - } - _, err = c.svc.PutContexts(ctx, putReq) - // It's expected other tasks may try to create the context at the same time, - // so ignore AlreadyExists error. - if err != nil && status.Convert(err).Code() != codes.AlreadyExists { - return nil, fmt.Errorf("Failed PutContext(name=%q, type=%q, typeid=%v): %w", name, contextType.GetName(), typeID, err) - } + } + fmt.Printf("getOrInsertContext: sending create request\n") + _, err = c.svc.PutContexts(ctx, putReq) + // It's expected other tasks may try to create the context at the same time, + // so ignore AlreadyExists error. + if err != nil { + if status.Convert(err).Code() != codes.AlreadyExists { + fmt.Println(fmt.Errorf("failed PutContext(name=%q, type=%q, typeid=%v): %w", name, contextType.GetName(), typeID, err)) + time.Sleep(1 * time.Second) + continue + } else { + fmt.Printf("getOrInsertContext: context now already exists, someone beat us to it\n") + } + } else { + fmt.Printf("getOrInsertContext: context created successfully\n") + } - // Get the created context. - getCtxRes, err = c.svc.GetContextByTypeAndName(ctx, &pb.GetContextByTypeAndNameRequest{TypeName: contextType.Name, ContextName: proto.String(name)}) - if err != nil { - return nil, fmt.Errorf("Failed GetContext(name=%q, type=%q): %w", name, contextType.GetName(), err) + // Get the created context. + fmt.Printf("getOrInsertContext: now getting the context we just created\n") + getCtxRes, err = c.svc.GetContextByTypeAndName(ctx, &pb.GetContextByTypeAndNameRequest{TypeName: contextType.Name, ContextName: proto.String(name)}) + if err != nil { + fmt.Println(fmt.Errorf("failed GetContext(name=%q, type=%q): %w", name, contextType.GetName(), err)) + time.Sleep(1 * time.Second) + continue + } + fmt.Printf("getOrInsertContext: successfully got context, returning it :)\n") + return getCtxRes.GetContext(), nil } - return getCtxRes.GetContext(), nil + + return nil, fmt.Errorf("failed to get or insert context %q after 3 attempts", name) } func GenerateExecutionConfig(executorInput *pipelinespec.ExecutorInput) (*ExecutionConfig, error) {