Skip to content

Commit

Permalink
error with storage_provider empty
Browse files Browse the repository at this point in the history
  • Loading branch information
vicente87 committed Dec 16, 2024
1 parent 86c17ac commit c397dad
Showing 1 changed file with 115 additions and 115 deletions.
230 changes: 115 additions & 115 deletions pkg/resourcemanager/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"sort"
"strconv"
"strings"

"sync"
"time"

"github.com/grycap/oscar/v3/pkg/types"
Expand All @@ -46,7 +46,7 @@ const (
// tokenCache map to store tokens from services and endpoints -> [CLUSTER_ENDPOINT][SERVICE_NAME]
var tokenCache = map[string]map[string]string{}

//var mutex sync.Mutex
var mutex sync.Mutex

// DelegatedEvent wraps the original input event by adding the storage provider ID
type DelegatedEvent struct {
Expand Down Expand Up @@ -258,8 +258,8 @@ func reorganizeIfNearby(alternatives []Alternative, distances []float64, thresho
func DelegateJob(service *types.Service, event string, logger *log.Logger) error {

//Block access before executing the function
//mutex.Lock()
//defer mutex.Unlock()
mutex.Lock()
defer mutex.Unlock()

//Determine priority level of each replica to delegate
if service.Delegation == "topsis" {
Expand Down Expand Up @@ -445,144 +445,144 @@ func DelegateJob(service *types.Service, event string, logger *log.Logger) error
fmt.Println("Storage_provider : ", provider)
fmt.Println(string(eventJSON))

if provider != "" { //storatage _provider not empty in the delegation proccess.

for _, replica := range service.Replicas {
// Manage if replica.Type is "oscar" and have the capacity to receive a service
fmt.Println("Delegation job in ClusterID: ", replica.ClusterID, "with Priority ", replica.Priority)
if strings.ToLower(replica.Type) == oscarReplicaType && replica.Priority < noDelegateCode {
// Check ClusterID is defined in 'Clusters'
fmt.Println("Delegating ...")
cluster, ok := service.Clusters[replica.ClusterID]
if !ok {
logger.Printf("Error delegating service \"%s\" to ClusterID \"%s\": Cluster not defined\n", service.Name, replica.ClusterID)
continue
}

// Get token
token, err := getServiceToken(replica, cluster)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err)
continue
}
//if provider != "" { //storatage _provider not empty in the delegation proccess.

// Parse the cluster's endpoint URL and add the service's path
postJobURL, err := url.Parse(cluster.Endpoint)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to parse cluster endpoint \"%s\": %v\n", service.Name, replica.ClusterID, cluster.Endpoint, err)
continue
}
postJobURL.Path = path.Join(postJobURL.Path, "job", replica.ServiceName)
for _, replica := range service.Replicas {
// Manage if replica.Type is "oscar" and have the capacity to receive a service
fmt.Println("Delegation job in ClusterID: ", replica.ClusterID, "with Priority ", replica.Priority)
if strings.ToLower(replica.Type) == oscarReplicaType && replica.Priority < noDelegateCode {
// Check ClusterID is defined in 'Clusters'
fmt.Println("Delegating ...")
cluster, ok := service.Clusters[replica.ClusterID]
if !ok {
logger.Printf("Error delegating service \"%s\" to ClusterID \"%s\": Cluster not defined\n", service.Name, replica.ClusterID)
continue
}

// Make request to get service's definition (including token) from cluster
//fmt.Println(string(eventJSON))
req, err := http.NewRequest(http.MethodPost, postJobURL.String(), bytes.NewBuffer(eventJSON))
// Get token
token, err := getServiceToken(replica, cluster)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err)
continue
}

if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to make request: %v\n", service.Name, replica.ClusterID, err)
continue
}
// Parse the cluster's endpoint URL and add the service's path
postJobURL, err := url.Parse(cluster.Endpoint)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to parse cluster endpoint \"%s\": %v\n", service.Name, replica.ClusterID, cluster.Endpoint, err)
continue
}
postJobURL.Path = path.Join(postJobURL.Path, "job", replica.ServiceName)

// Add Headers
for k, v := range replica.Headers {
req.Header.Add(k, v)
}
// Make request to get service's definition (including token) from cluster
//fmt.Println(string(eventJSON))
req, err := http.NewRequest(http.MethodPost, postJobURL.String(), bytes.NewBuffer(eventJSON))

// Add service token to the request
req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token))
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to make request: %v\n", service.Name, replica.ClusterID, err)
continue
}

// Make HTTP client
// Add Headers
for k, v := range replica.Headers {
req.Header.Add(k, v)
}

var transport http.RoundTripper = &http.Transport{
// Enable/disable SSL verification
TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify},
}
// Add service token to the request
req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token))

client := &http.Client{
Transport: transport,
Timeout: time.Second * 20,
}
// Make HTTP client

// Send the request
res, err := client.Do(req)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err)
continue
}
var transport http.RoundTripper = &http.Transport{
// Enable/disable SSL verification
TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify},
}

// Check status code
if res.StatusCode == http.StatusCreated {
logger.Printf("Job successfully delegated to cluster \"%s\"\n", replica.ClusterID)
return nil
} else if res.StatusCode == http.StatusUnauthorized {
// Retry updating the token
token, err := updateServiceToken(replica, cluster)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err)
continue
}
// Add service token to the request
req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token))
client := &http.Client{
Transport: transport,
Timeout: time.Second * 20,
}

// Send the request
res, err = client.Do(req)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err)
continue
}
}
log.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": Status code %d\n", service.Name, replica.ClusterID, res.StatusCode)
// Send the request
res, err := client.Do(req)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err)
continue
}

// Manage if replica.Type is "endpoint"
if strings.ToLower(replica.Type) == endpointReplicaType {
// Parse the replica URL to check if it's valid
replicaURL, err := url.Parse(replica.URL)
// Check status code
if res.StatusCode == http.StatusCreated {
logger.Printf("Job successfully delegated to cluster \"%s\"\n", replica.ClusterID)
return nil
} else if res.StatusCode == http.StatusUnauthorized {
// Retry updating the token
token, err := updateServiceToken(replica, cluster)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to parse URL: %v\n", service.Name, replica.URL, err)
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err)
continue
}
// Add service token to the request
req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token))

// Make request to get service's definition (including token) from cluster
req, err := http.NewRequest(http.MethodPost, replicaURL.String(), bytes.NewBuffer(eventJSON))
// Send the request
res, err = client.Do(req)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to make request: %v\n", service.Name, replica.URL, err)
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err)
continue
}
}
log.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": Status code %d\n", service.Name, replica.ClusterID, res.StatusCode)
}

// Add Headers
for k, v := range replica.Headers {
req.Header.Add(k, v)
}
// Manage if replica.Type is "endpoint"
if strings.ToLower(replica.Type) == endpointReplicaType {
// Parse the replica URL to check if it's valid
replicaURL, err := url.Parse(replica.URL)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to parse URL: %v\n", service.Name, replica.URL, err)
continue
}

// Make HTTP client
var transport http.RoundTripper = &http.Transport{
// Enable/disable SSL verification
TLSClientConfig: &tls.Config{InsecureSkipVerify: !replica.SSLVerify},
}
client := &http.Client{
Transport: transport,
Timeout: time.Second * 20,
}
// Make request to get service's definition (including token) from cluster
req, err := http.NewRequest(http.MethodPost, replicaURL.String(), bytes.NewBuffer(eventJSON))
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to make request: %v\n", service.Name, replica.URL, err)
continue
}

// Send the request
res, err := client.Do(req)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to send request: %v\n", service.Name, replica.URL, err)
continue
}
// Add Headers
for k, v := range replica.Headers {
req.Header.Add(k, v)
}

// Check status code
if res.StatusCode == http.StatusOK {
logger.Printf("Job successfully delegated to endpoint \"%s\"\n", replica.URL)
return nil
}
logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": Status code %d\n", service.Name, replica.URL, res.StatusCode)
// Make HTTP client
var transport http.RoundTripper = &http.Transport{
// Enable/disable SSL verification
TLSClientConfig: &tls.Config{InsecureSkipVerify: !replica.SSLVerify},
}
client := &http.Client{
Transport: transport,
Timeout: time.Second * 20,
}

// Send the request
res, err := client.Do(req)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to send request: %v\n", service.Name, replica.URL, err)
continue
}

// Check status code
if res.StatusCode == http.StatusOK {
logger.Printf("Job successfully delegated to endpoint \"%s\"\n", replica.URL)
return nil
}
logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": Status code %d\n", service.Name, replica.URL, res.StatusCode)
}
} else {
fmt.Println("Error by Storage_Provider empty.")
}
//} else {
// fmt.Println("Error by Storage_Provider empty.")
//}

return fmt.Errorf("unable to delegate job from service \"%s\" to any replica, scheduling in the current cluster", service.Name)
}
Expand Down

0 comments on commit c397dad

Please sign in to comment.