Skip to content

Commit

Permalink
Merge pull request #882 from joereuss12/shadow-hooks-with-pelican-sta…
Browse files Browse the repository at this point in the history
…ging-branch

Fixes to pelican plugin staging
  • Loading branch information
joereuss12 authored Apr 4, 2024
2 parents 89258ab + 9d0aa8f commit d77c491
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 83 deletions.
69 changes: 69 additions & 0 deletions classads/classads.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,72 @@ func attributeSplitFunc(data []byte, atEOF bool) (advance int, token []byte, err
}
return 0, nil, nil
}

func ParseShadowClassAd(line string) (ClassAd, error) {
var ad ClassAd
ad.attributes = make(map[string]interface{})

// Trim the spaces and "[" "]"
line = strings.TrimSpace(line)
line = strings.TrimPrefix(line, "[")
line = strings.TrimSuffix(line, "]")

attributeScanner := bufio.NewScanner(strings.NewReader(line))
attributeScanner.Split(attributeShadowSplitFunc)
for attributeScanner.Scan() {
attrStr := attributeScanner.Text()
attrStr = strings.TrimSpace(attrStr)
if attrStr == "" {
continue
}

// Split on the first "="
attrSplit := strings.SplitN(attrStr, "=", 2)
name := strings.TrimSpace(attrSplit[0])

// Check for quoted attribute and remove it
value := strings.TrimSpace(attrSplit[1])

// If the value is quoted, remove the quotes
if strings.HasPrefix(value, "\"") && strings.HasSuffix(value, "\"") {
value = strings.Trim(value, "\"")
}

// Convert the value based on its type
if intValue, err := strconv.Atoi(value); err == nil {
// If the value is a number, we know it's an integer
ad.Set(name, intValue)
} else if floatValue, err := strconv.ParseFloat(value, 64); err == nil {
// If the value is a float, we know it's a float
ad.Set(name, floatValue)
} else if value == "true" || value == "false" {
// If the value is a boolean, we know it's a boolean
ad.Set(name, value == "true")
} else {
// Otherwise, we assume it's a string
ad.Set(name, value)
}
}
return ad, nil
}

// Split the classad by attribute, at the first semi-colon not in quotes
func attributeShadowSplitFunc(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}

// Look for the next newline character in the input data
if i := strings.Index(string(data), "\n"); i >= 0 {
// Found a newline character, return the split point
return i + 1, data[0:i], nil
}

// If at end of file and no newline character is found, return the entire remaining data
if atEOF {
return len(data), data, nil
}

// Need more data to find a newline character
return 0, nil, nil
}
3 changes: 3 additions & 0 deletions cmd/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ func stashPluginMain(args []string) {
isConfigErr = true
}

// Want to try to force logging to stderr because that is how we can see logging in condor starter log
log.SetOutput(os.Stderr)

// Parse command line arguments
var upload bool = false
// Set the options
Expand Down
202 changes: 119 additions & 83 deletions cmd/plugin_stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package main
import (
"context"
"fmt"
"io"
"net/url"
"os"
"path"
Expand Down Expand Up @@ -89,34 +90,18 @@ func stagePluginMain(cmd *cobra.Command, args []string) {
ctx := cmd.Context()

originPrefixStr := param.StagePlugin_OriginPrefix.GetString()
if len(originPrefixStr) == 0 {
log.Errorln("Origin prefix not specified; must be a URL (osdf://...)")
os.Exit(1)
}
originPrefixUri, err := url.Parse(originPrefixStr)
mountPrefixStr := param.StagePlugin_MountPrefix.GetString()
shadowOriginPrefixStr := param.StagePlugin_ShadowOriginPrefix.GetString()

originPrefixUri, err := validatePrefixes(originPrefixStr, mountPrefixStr, shadowOriginPrefixStr)
if err != nil {
log.Errorln("Origin prefix must be a URL (osdf://...):", err)
os.Exit(1)
}
if originPrefixUri.Scheme != "osdf" {
log.Errorln("Origin prefix scheme must be osdf://:", originPrefixUri.Scheme)
log.Errorln("Problem validating provided prefixes:", err)
os.Exit(1)
}

originPrefixPath := path.Clean("/" + originPrefixUri.Host + "/" + originPrefixUri.Path)
log.Debugln("Local origin prefix:", originPrefixPath)

mountPrefixStr := param.StagePlugin_MountPrefix.GetString()
if len(mountPrefixStr) == 0 {
log.Errorln("Mount prefix is required; must be a local path (/mnt/foo/...)")
os.Exit(1)
}

shadowOriginPrefixStr := param.StagePlugin_ShadowOriginPrefix.GetString()
if len(shadowOriginPrefixStr) == 0 {
log.Errorln("Shadow origin prefix is required; must be a URL (osdf://....)")
os.Exit(1)
}

tokenLocation := param.Plugin_Token.GetString()

pb := newProgressBar()
Expand All @@ -128,51 +113,12 @@ func stagePluginMain(cmd *cobra.Command, args []string) {
pb.launchDisplay(ctx)
}

var sources []string
var extraSources []string
isHook := param.StagePlugin_Hook.GetBool()
if isHook {
buffer := make([]byte, 100*1024)
bytesread, err := os.Stdin.Read(buffer)
if err != nil {
log.Errorln("Failed to read ClassAd from stdin:", err)
os.Exit(1)
}
classad, err := classads.ParseClassAd(string(buffer[:bytesread]))
if err != nil {
log.Errorln("Failed to parse ClassAd from stdin: ", err)
os.Exit(1)
}
inputList, err := classad.Get("TransferInput")
if err != nil || inputList == nil {
// No TransferInput, no need to transform...
os.Exit(0)
}
inputListStr, ok := inputList.(string)
if !ok {
log.Errorln("TransferInput is not a string")
os.Exit(1)
}
re := regexp.MustCompile(`[,\s]+`)
for _, source := range re.Split(inputListStr, -1) {
log.Debugln("Examining transfer input file", source)
if strings.HasPrefix(source, mountPrefixStr) {
sources = append(sources, source)
} else {
// Replace the osdf:// prefix with the local mount path
source_uri, err := url.Parse(source)
source_uri_scheme := strings.SplitN(source_uri.Scheme, "+", 2)[0]
if err == nil && source_uri_scheme == "osdf" {
source_path := path.Clean("/" + source_uri.Host + "/" + source_uri.Path)
if strings.HasPrefix(source_path, originPrefixPath) {
sources = append(sources, mountPrefixStr+source_path[len(originPrefixPath):])
continue
}
}
extraSources = append(extraSources, source)
}
}
} else {
var sources, extraSources []string
var exitCode int

// If not a condor hook, our souces come from our args
if !isHook {
log.Debugln("Len of source:", len(args))
if len(args) < 1 {
log.Errorln("No ingest sources")
Expand All @@ -182,11 +128,39 @@ func stagePluginMain(cmd *cobra.Command, args []string) {
os.Exit(1)
}
sources = args
log.Debugln("Sources:", sources)
} else { // Otherwise, parse the classad for our sources
// We pass in stdin here because that is how we get the classad
sources, extraSources, err, exitCode = processTransferInput(os.Stdin, mountPrefixStr, originPrefixPath)
if err != nil {
log.Errorln("Failure to get sources from job's classad:", err)
os.Exit(exitCode)
}
}
log.Debugln("Sources:", sources)

var result error
var xformSources []string

xformSources, result = doPluginStaging(sources, extraSources, mountPrefixStr, shadowOriginPrefixStr, tokenLocation)
// Exit with failure
if result != nil {
// Print the list of errors
log.Errorln("Failure in staging files:", result)
if client.ShouldRetry(result) {
log.Errorln("Errors are retryable")
os.Exit(11)
}
os.Exit(1)
}
// If we are a condor hook, we need to print the classad change out. Condor will notice it and handle the rest
if isHook {
printOutput(xformSources, extraSources)
}
}

// This function performs the actual "staging" on the specified shadow origin
func doPluginStaging(sources []string, extraSources []string, mountPrefixStr, shadowOriginPrefixStr, tokenLocation string) (xformSources []string, result error) {

for _, src := range sources {
newSource := ""
_, newSource, result = client.DoShadowIngest(context.Background(), src, mountPrefixStr, shadowOriginPrefixStr, client.WithTokenLocation(tokenLocation), client.WithAcquireToken(false))
Expand All @@ -203,23 +177,85 @@ func stagePluginMain(cmd *cobra.Command, args []string) {
xformSources = append(xformSources, newSource)
}

// Exit with failure
if result != nil {
// Print the list of errors
log.Errorln("Failure in staging files:", result)
if client.ShouldRetry(result) {
log.Errorln("Errors are retryable")
os.Exit(11)
}
os.Exit(1)
return xformSources, result
}

// This function is used to print our changes out in the case we are a condor hook
func printOutput(xformSources []string, extraSources []string) {
inputsStr := strings.Join(extraSources, ", ")
if len(extraSources) > 0 && len(xformSources) > 0 {
inputsStr = inputsStr + ", " + strings.Join(xformSources, ", ")
} else if len(xformSources) > 0 {
inputsStr = strings.Join(xformSources, ", ")
}
if isHook {
inputsStr := strings.Join(extraSources, ", ")
if len(extraSources) > 0 && len(xformSources) > 0 {
inputsStr = inputsStr + ", " + strings.Join(xformSources, ", ")
} else if len(xformSources) > 0 {
inputsStr = strings.Join(xformSources, ", ")
fmt.Printf("TransferInput = \"%s\"", inputsStr)
}

// This function is utilized to validate the arguments passed in to ensure they exist and are in the correct format
func validatePrefixes(originPrefixStr string, mountPrefixStr string, shadowOriginPrefixStr string) (originPrefixUri *url.URL, err error) {
if len(originPrefixStr) == 0 {
return nil, fmt.Errorf("Origin prefix not specified; must be a URL (osdf://...)")
}

originPrefixUri, err = url.Parse(originPrefixStr)
if err != nil {
return nil, fmt.Errorf("Origin prefix must be a URL (osdf://...): %v", err)
}
if originPrefixUri.Scheme != "osdf" {
return nil, fmt.Errorf("Origin prefix scheme must be osdf://: %s", originPrefixUri.Scheme)
}

if len(mountPrefixStr) == 0 {
return nil, fmt.Errorf("Mount prefix is required; must be a local path (/mnt/foo/...)")
}
if len(shadowOriginPrefixStr) == 0 {
return nil, fmt.Errorf("Shadow origin prefix is required; must be a URL (osdf://....)")
}

return originPrefixUri, nil
}

// This function is used when we are using a condor hook and need to get our sources from the "TransferInput" classad
// We return our sources, any extra sources, an err, and the exit code (since we have a case to exit 0)
// Note: we pass in a reader for testability but the main function will always pass stdin to get the classad
func processTransferInput(reader io.Reader, mountPrefixStr string, originPrefixPath string) (sources []string, extraSources []string, err error, exitCode int) {
buffer := make([]byte, 100*1024)
bytesread, err := reader.Read(buffer)
if err != nil {
return nil, nil, fmt.Errorf("Failed to read ClassAd from stdin: %v", err), 1
}
classad, err := classads.ParseShadowClassAd(string(buffer[:bytesread]))
if err != nil {
return nil, nil, fmt.Errorf("Failed to parse ClassAd from stdin: %v", err), 1
}
inputList, err := classad.Get("TransferInput")
if err != nil || inputList == nil {
// No TransferInput, no need to transform therefore we exit(0)
return nil, nil, fmt.Errorf("No transfer input found in classad, no need to transform."), 0
}
inputListStr, ok := inputList.(string)
if !ok {
return nil, nil, fmt.Errorf("TransferInput is not a string"), 1
}
re := regexp.MustCompile(`[,\s]+`)
for _, source := range re.Split(inputListStr, -1) {
log.Debugln("Examining transfer input file", source)
if strings.HasPrefix(source, mountPrefixStr) {
sources = append(sources, source)
} else {
// Replace the osdf:// prefix with the local mount path
source_uri, err := url.Parse(source)
source_uri_scheme := strings.SplitN(source_uri.Scheme, "+", 2)[0]
if err == nil && source_uri_scheme == "osdf" {
source_path := path.Clean("/" + source_uri.Host + "/" + source_uri.Path)
if strings.HasPrefix(source_path, originPrefixPath) {
sources = append(sources, mountPrefixStr+source_path[len(originPrefixPath):])
continue
}
}
extraSources = append(extraSources, source)
}
fmt.Printf("TransferInput = \"%s\"", inputsStr)
}
log.Debugln("Sources:", sources)
return sources, extraSources, nil, 0
}
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,7 @@ func InitConfig() {
log.Errorf("Failed to access specified log file. Error: %v", err)
os.Exit(1)
}

fmt.Fprintf(os.Stderr, "Logging.LogLocation is set to %s. All logs are redirected to the log file.\n", logLocation)
log.SetOutput(f)
}
Expand Down

0 comments on commit d77c491

Please sign in to comment.