diff --git a/pkg/resourcemanager/delegate.go b/pkg/resourcemanager/delegate.go index 6665e4da..e391d44b 100644 --- a/pkg/resourcemanager/delegate.go +++ b/pkg/resourcemanager/delegate.go @@ -31,7 +31,7 @@ import ( "sort" "strconv" "strings" - + "sync" "time" "github.com/grycap/oscar/v3/pkg/types" @@ -46,7 +46,7 @@ const ( // tokenCache map to store tokens from services and endpoints -> [CLUSTER_ENDPOINT][SERVICE_NAME] var tokenCache = map[string]map[string]string{} -//var mutex sync.Mutex +var mutex sync.Mutex // DelegatedEvent wraps the original input event by adding the storage provider ID type DelegatedEvent struct { @@ -258,8 +258,8 @@ func reorganizeIfNearby(alternatives []Alternative, distances []float64, thresho func DelegateJob(service *types.Service, event string, logger *log.Logger) error { //Block access before executing the function - //mutex.Lock() - //defer mutex.Unlock() + mutex.Lock() + defer mutex.Unlock() //Determine priority level of each replica to delegate if service.Delegation == "topsis" { @@ -445,144 +445,144 @@ func DelegateJob(service *types.Service, event string, logger *log.Logger) error fmt.Println("Storage_provider : ", provider) fmt.Println(string(eventJSON)) - if provider != "" { //storatage _provider not empty in the delegation proccess. - - for _, replica := range service.Replicas { - // Manage if replica.Type is "oscar" and have the capacity to receive a service - fmt.Println("Delegation job in ClusterID: ", replica.ClusterID, "with Priority ", replica.Priority) - if strings.ToLower(replica.Type) == oscarReplicaType && replica.Priority < noDelegateCode { - // Check ClusterID is defined in 'Clusters' - fmt.Println("Delegating ...") - cluster, ok := service.Clusters[replica.ClusterID] - if !ok { - logger.Printf("Error delegating service \"%s\" to ClusterID \"%s\": Cluster not defined\n", service.Name, replica.ClusterID) - continue - } - - // Get token - token, err := getServiceToken(replica, cluster) - if err != nil { - logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err) - continue - } + //if provider != "" { //storatage _provider not empty in the delegation proccess. - // Parse the cluster's endpoint URL and add the service's path - postJobURL, err := url.Parse(cluster.Endpoint) - if err != nil { - logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to parse cluster endpoint \"%s\": %v\n", service.Name, replica.ClusterID, cluster.Endpoint, err) - continue - } - postJobURL.Path = path.Join(postJobURL.Path, "job", replica.ServiceName) + for _, replica := range service.Replicas { + // Manage if replica.Type is "oscar" and have the capacity to receive a service + fmt.Println("Delegation job in ClusterID: ", replica.ClusterID, "with Priority ", replica.Priority) + if strings.ToLower(replica.Type) == oscarReplicaType && replica.Priority < noDelegateCode { + // Check ClusterID is defined in 'Clusters' + fmt.Println("Delegating ...") + cluster, ok := service.Clusters[replica.ClusterID] + if !ok { + logger.Printf("Error delegating service \"%s\" to ClusterID \"%s\": Cluster not defined\n", service.Name, replica.ClusterID) + continue + } - // Make request to get service's definition (including token) from cluster - //fmt.Println(string(eventJSON)) - req, err := http.NewRequest(http.MethodPost, postJobURL.String(), bytes.NewBuffer(eventJSON)) + // Get token + token, err := getServiceToken(replica, cluster) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err) + continue + } - if err != nil { - logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to make request: %v\n", service.Name, replica.ClusterID, err) - continue - } + // Parse the cluster's endpoint URL and add the service's path + postJobURL, err := url.Parse(cluster.Endpoint) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to parse cluster endpoint \"%s\": %v\n", service.Name, replica.ClusterID, cluster.Endpoint, err) + continue + } + postJobURL.Path = path.Join(postJobURL.Path, "job", replica.ServiceName) - // Add Headers - for k, v := range replica.Headers { - req.Header.Add(k, v) - } + // Make request to get service's definition (including token) from cluster + //fmt.Println(string(eventJSON)) + req, err := http.NewRequest(http.MethodPost, postJobURL.String(), bytes.NewBuffer(eventJSON)) - // Add service token to the request - req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token)) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to make request: %v\n", service.Name, replica.ClusterID, err) + continue + } - // Make HTTP client + // Add Headers + for k, v := range replica.Headers { + req.Header.Add(k, v) + } - var transport http.RoundTripper = &http.Transport{ - // Enable/disable SSL verification - TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify}, - } + // Add service token to the request + req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token)) - client := &http.Client{ - Transport: transport, - Timeout: time.Second * 20, - } + // Make HTTP client - // Send the request - res, err := client.Do(req) - if err != nil { - logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err) - continue - } + var transport http.RoundTripper = &http.Transport{ + // Enable/disable SSL verification + TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify}, + } - // Check status code - if res.StatusCode == http.StatusCreated { - logger.Printf("Job successfully delegated to cluster \"%s\"\n", replica.ClusterID) - return nil - } else if res.StatusCode == http.StatusUnauthorized { - // Retry updating the token - token, err := updateServiceToken(replica, cluster) - if err != nil { - logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err) - continue - } - // Add service token to the request - req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token)) + client := &http.Client{ + Transport: transport, + Timeout: time.Second * 20, + } - // Send the request - res, err = client.Do(req) - if err != nil { - logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err) - continue - } - } - log.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": Status code %d\n", service.Name, replica.ClusterID, res.StatusCode) + // Send the request + res, err := client.Do(req) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err) + continue } - // Manage if replica.Type is "endpoint" - if strings.ToLower(replica.Type) == endpointReplicaType { - // Parse the replica URL to check if it's valid - replicaURL, err := url.Parse(replica.URL) + // Check status code + if res.StatusCode == http.StatusCreated { + logger.Printf("Job successfully delegated to cluster \"%s\"\n", replica.ClusterID) + return nil + } else if res.StatusCode == http.StatusUnauthorized { + // Retry updating the token + token, err := updateServiceToken(replica, cluster) if err != nil { - logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to parse URL: %v\n", service.Name, replica.URL, err) + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err) continue } + // Add service token to the request + req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token)) - // Make request to get service's definition (including token) from cluster - req, err := http.NewRequest(http.MethodPost, replicaURL.String(), bytes.NewBuffer(eventJSON)) + // Send the request + res, err = client.Do(req) if err != nil { - logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to make request: %v\n", service.Name, replica.URL, err) + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err) continue } + } + log.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": Status code %d\n", service.Name, replica.ClusterID, res.StatusCode) + } - // Add Headers - for k, v := range replica.Headers { - req.Header.Add(k, v) - } + // Manage if replica.Type is "endpoint" + if strings.ToLower(replica.Type) == endpointReplicaType { + // Parse the replica URL to check if it's valid + replicaURL, err := url.Parse(replica.URL) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to parse URL: %v\n", service.Name, replica.URL, err) + continue + } - // Make HTTP client - var transport http.RoundTripper = &http.Transport{ - // Enable/disable SSL verification - TLSClientConfig: &tls.Config{InsecureSkipVerify: !replica.SSLVerify}, - } - client := &http.Client{ - Transport: transport, - Timeout: time.Second * 20, - } + // Make request to get service's definition (including token) from cluster + req, err := http.NewRequest(http.MethodPost, replicaURL.String(), bytes.NewBuffer(eventJSON)) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to make request: %v\n", service.Name, replica.URL, err) + continue + } - // Send the request - res, err := client.Do(req) - if err != nil { - logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to send request: %v\n", service.Name, replica.URL, err) - continue - } + // Add Headers + for k, v := range replica.Headers { + req.Header.Add(k, v) + } - // Check status code - if res.StatusCode == http.StatusOK { - logger.Printf("Job successfully delegated to endpoint \"%s\"\n", replica.URL) - return nil - } - logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": Status code %d\n", service.Name, replica.URL, res.StatusCode) + // Make HTTP client + var transport http.RoundTripper = &http.Transport{ + // Enable/disable SSL verification + TLSClientConfig: &tls.Config{InsecureSkipVerify: !replica.SSLVerify}, } + client := &http.Client{ + Transport: transport, + Timeout: time.Second * 20, + } + + // Send the request + res, err := client.Do(req) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to send request: %v\n", service.Name, replica.URL, err) + continue + } + + // Check status code + if res.StatusCode == http.StatusOK { + logger.Printf("Job successfully delegated to endpoint \"%s\"\n", replica.URL) + return nil + } + logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": Status code %d\n", service.Name, replica.URL, res.StatusCode) } - } else { - fmt.Println("Error by Storage_Provider empty.") } + //} else { + // fmt.Println("Error by Storage_Provider empty.") + //} return fmt.Errorf("unable to delegate job from service \"%s\" to any replica, scheduling in the current cluster", service.Name) }