Skip to content

Commit

Permalink
feat: add nextgen jobspec (#10)
Browse files Browse the repository at this point in the history
* feat: add nextgen jobspec

Signed-off-by: vsoch <[email protected]>
  • Loading branch information
vsoch authored May 2, 2024
1 parent 886aab9 commit 90dd999
Show file tree
Hide file tree
Showing 9 changed files with 515 additions and 3 deletions.
11 changes: 9 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ COMMONENVVAR=GOOS=$(shell uname -s | tr A-Z a-z)
RELEASE_VERSION?=v$(shell date +%Y%m%d)-$(shell git describe --tags --match "v*")

.PHONY: all
all: example1 example2 example3 example4 example5 example6 createnew exp1 exp2
all: example1 example2 example3 example4 example5 example6 createnew exp1 exp2 ng1

.PHONY: build
build:
go mod tidy
mkdir -p ./examples/v1/bin
mkdir -p ./examples/experimental/bin
mkdir -p ./examples/nextgen/v1/bin

# Build examples
.PHONY: createnew
Expand Down Expand Up @@ -48,8 +49,13 @@ exp1: build
exp2: build
$(COMMONENVVAR) $(BUILDENVVAR) go build -ldflags '-w' -o ./examples/experimental/bin/example2 examples/experimental/example2/example.go

.PHONY: ng1
ng1: build
$(COMMONENVVAR) $(BUILDENVVAR) go build -ldflags '-w' -o ./examples/nextgen/v1/bin/example1 examples/nextgen/v1/example1/example.go


.PHONY: test
test: all
test: build all
./examples/v1/bin/example1
./examples/v1/bin/example2
./examples/v1/bin/example3
Expand All @@ -59,6 +65,7 @@ test: all
./examples/v1/bin/createnew
./examples/experimental/bin/example1
./examples/experimental/bin/example2
./examples/nextgen/v1/bin/example1

.PHONY: clean
clean:
Expand Down
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
# Flux Jobspec (Go)

This is a simple library that provides go structures for the Flux Framework [Jobspec](https://flux-framework.readthedocs.io/projects/flux-rfc/en/latest/spec_25.html) for use in other projects.
This is a simple library that provides go structures for:

- the Flux Framework [Jobspec](https://flux-framework.readthedocs.io/projects/flux-rfc/en/latest/spec_25.html) (package [jobspec](pkg/jobspec))
- the [Next Generation Jobspec](https://compspec.github.io/jobspec) (package [nextgen](pkg/nextgen/))

Note for nextgen: since Go is stricter about typing, we only accept a parsed JobSpec, meaning that all resources have been defined in the top-level named section
and are referenced by name in tasks. For now, we assume that a request for the resource groups should be satisfied within the same cluster, and that each is a separate
match request.

## Usage

Expand Down
52 changes: 52 additions & 0 deletions examples/nextgen/v1/example1/example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package main

import (
"flag"
"fmt"
"os"

v1 "github.com/compspec/jobspec-go/pkg/nextgen/v1"
)

// main loads a nextgen Jobspec from a YAML file, validates it against the
// schema, and prints it back out first as YAML and then as JSON. Any failure
// is reported on stdout and the process exits with status 1.
func main() {
	fmt.Println("This example reads, parses, and validates a Jobspec")

	// Assumes running from the root.
	// The flag is named "json" but the file it points at is YAML.
	fileName := flag.String("json", "examples/nextgen/v1/example1/jobspec.yaml", "yaml file")
	flag.Parse()

	yamlFile := *fileName
	if yamlFile == "" {
		flag.Usage()
		os.Exit(0)
	}
	js, err := v1.LoadJobspecYaml(yamlFile)
	if err != nil {
		fmt.Printf("error reading %s:%s\n", yamlFile, err)
		os.Exit(1)
	}

	// Validate the jobspec. An error and a plain "not valid" verdict are
	// reported separately so a nil error is never formatted into the message.
	valid, err := js.Validate()
	if err != nil {
		fmt.Printf("schema is not valid:%s\n", err)
		os.Exit(1)
	}
	if !valid {
		fmt.Println("schema is not valid")
		os.Exit(1)
	}
	fmt.Println("schema is valid")

	out, err := js.JobspecToYaml()
	if err != nil {
		fmt.Printf("error marshalling %s:%s\n", yamlFile, err)
		os.Exit(1)
	}
	fmt.Println(out)

	// One example of json
	out, err = js.JobspecToJson()
	if err != nil {
		fmt.Printf("error marshalling %s:%s\n", yamlFile, err)
		os.Exit(1)
	}
	fmt.Println(out)
}
70 changes: 70 additions & 0 deletions examples/nextgen/v1/example1/jobspec.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
version: 1

# Resources can be used with tasks or task groups
# They are named so can be referenced in multiple places,
# and used with AND and OR
resources:
mini-mummi:
count: 8
type: node
with:
- type: gpu
count: 2
- type: cores
count: 4

mummi-gpu:
count: 2
type: node
with:
- type: gpu
count: 1

mummi-cpu:
count: 2
type: node
with:
- type: cores
count: 4


tasks:
- name: task-1
resources: common
command:
- bash
- -c
- "echo Starting task 1; sleep 3; echo Finishing task 1"

# flux batch...
- group: group-1

# A group is a "flux batch"
groups:
- name: mini-mummi
resources: mini-mummi
tasks:

# flux batch
- group: train

# flux submit to train job
- name: test
replicas: 20
resources: mummi-cpu
command:
- bash
- -c
- echo Running machine learning test

# flux batch from mini-mummi group
- name: train
resources: "mummi-gpu|mummi-cpu"
tasks:

# If a task doesn't have resources, it inherits parent group (uses all)
- name: train
command:
- bash
- -c
- echo running machine learning train
68 changes: 68 additions & 0 deletions pkg/nextgen/v1/convert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package v1

import (
"fmt"
"strings"
)

// NewSimpleJobspec generates a simple jobspec for nodes, command, tasks, and
// (optionally) a name.
//
// The command is split on whitespace into its arguments. If name is empty, the
// lower-cased first word of the command is used as the slot label. An error is
// returned when nodes is less than 1 or when command contains no words.
// When tasks is non-zero, a "core" resource with that count is nested under
// the slot.
func NewSimpleJobspec(name, command string, nodes, tasks int32) (*Jobspec, error) {
	if nodes < 1 {
		return nil, fmt.Errorf("nodes for the job must be >= 1")
	}

	// strings.Fields (rather than Split on a single space) drops leading,
	// trailing, and repeated whitespace so no empty arguments are produced.
	// Note: if we need better split can use "github.com/google/shlex"
	cmd := strings.Fields(command)
	if len(cmd) == 0 {
		return nil, fmt.Errorf("a command must be provided")
	}

	// If no name provided for the slot, use the first
	// word of the command
	if name == "" {
		name = strings.ToLower(cmd[0])
	}

	// The node resource is what we are asking for
	nodeResource := Resource{
		Type:  "node",
		Count: nodes,
	}

	// The slot is where we are doing an assessment for scheduling
	slot := Resource{
		Type:  "slot",
		Count: int32(1),
		Label: name,
	}

	// If tasks are defined, this is total tasks across the nodes
	// We add to the slot
	if tasks != 0 {
		slot.With = []Resource{{
			Type:  "core",
			Count: tasks,
		}}
	}

	// And then the entire resource spec is added to the top level node resource
	nodeResource.With = []Resource{slot}

	// Resource name matches resources to named set
	resourceName := "task-resources"

	// The single task references the named resource set and carries the command
	task := Task{
		Command:   cmd,
		Replicas:  1,
		Resources: resourceName,
	}

	return &Jobspec{
		Version:   jobspecVersion,
		Tasks:     []Task{task},
		Resources: Resources{resourceName: nodeResource},
	}, nil
}
114 changes: 114 additions & 0 deletions pkg/nextgen/v1/jobspec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package v1

import (
"encoding/json"
"fmt"
"os"

"sigs.k8s.io/yaml"

"github.com/compspec/jobspec-go/pkg/schema"
)

// LoadJobspecYaml reads the file at yamlFile and unmarshals its YAML content
// into a Jobspec. The returned pointer is never nil; when reading or parsing
// fails the error is returned alongside it.
func LoadJobspecYaml(yamlFile string) (*Jobspec, error) {
	spec := &Jobspec{}

	content, err := os.ReadFile(yamlFile)
	if err == nil {
		// Only attempt to parse when the read succeeded
		err = yaml.Unmarshal(content, spec)
	}
	return spec, err
}

// JobspecToYaml converts the jobspec back to YAML, returned as a string.
// On a marshalling error the string is empty.
func (js *Jobspec) JobspecToYaml() (string, error) {
	encoded, err := yaml.Marshal(js)
	if err == nil {
		return string(encoded), nil
	}
	return "", err
}

// GetResources collects the resource groups referenced across the jobspec,
// first for every top-level task and then for every top-level group, in order.
// This is intended for graph scheduling.
//
// The data parameter is currently unused. If any task or group resources
// cannot be resolved, the resources gathered so far are returned together
// with the error.
func (js *Jobspec) GetResources(data []byte) ([]Resource, error) {

	// We assume every discovered resource is a unique satisfy
	resources := []Resource{}

	// Make sure all task and group resources are known. The Resources field
	// (not the whole task/group) is what getResources knows how to unwrap.
	for _, task := range js.Tasks {
		r, err := js.getResources(task.Resources)
		if err != nil {
			return resources, err
		}
		resources = append(resources, r)
	}
	for _, group := range js.Groups {
		r, err := js.getResources(group.Resources)
		if err != nil {
			return resources, err
		}
		resources = append(resources, r)
	}
	return resources, nil
}

// getResources unwraps resources. A string is treated as the name of a
// resource group and looked up in js.Resources; a Resource is returned as is.
// Any other type is an error that reports the offending Go type. We will need
// a strategy to combine these intelligently when we ask for a match - right
// now just assuming separate groups.
func (js *Jobspec) getResources(resources interface{}) (Resource, error) {
	switch v := resources.(type) {
	case string:
		spec, ok := js.Resources[v]
		if !ok {
			return Resource{}, fmt.Errorf("task is missing resource")
		}
		return spec, nil
	case Resource:
		return v, nil
	default:
		// %T prints the dynamic type; %s would try to format the value itself
		return Resource{}, fmt.Errorf("type %T is unknown", v)
	}
}

// JobspecToJson converts the jobspec to an indented JSON string.
// On a marshalling error the string is empty.
func (js *Jobspec) JobspecToJson() (string, error) {
	encoded, err := json.MarshalIndent(js, "", " ")
	if err == nil {
		return string(encoded), nil
	}
	return "", err
}

// Validate serializes the jobspec to YAML bytes and checks them with
// schema.Validate against the package Schema. A marshalling failure is
// reported as (false, err).
func (js *Jobspec) Validate() (bool, error) {
	serialized, err := yaml.Marshal(js)
	if err == nil {
		// Hand the bytes form to the jsonschema validation
		return schema.Validate(serialized, schema.SchemaUrl, Schema)
	}
	return false, err
}

// GetJobName returns the jobspec Name, or the generic fallback "app" when
// the Name is empty.
func (js *Jobspec) GetJobName() string {
	if js.Name == "" {
		return "app"
	}
	return js.Name
}
8 changes: 8 additions & 0 deletions pkg/nextgen/v1/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package v1

import (
_ "embed"
)

// Schema holds the contents of schema.json, embedded into the binary at
// build time via the go:embed directive below.
//
//go:embed schema.json
var Schema string
Loading

0 comments on commit 90dd999

Please sign in to comment.