Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task State / UI wip #281

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16,714 changes: 16,714 additions & 0 deletions cloud/package-lock.json

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions cloud/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"version": "0.1.0",
"private": true,
"dependencies": {
"@babel/eslint-parser": "^7.12.1",
"@fortawesome/fontawesome-svg-core": "^1.2.28",
"@fortawesome/free-brands-svg-icons": "^5.13.0",
"@fortawesome/free-regular-svg-icons": "^5.13.0",
Expand All @@ -18,8 +19,8 @@
"@types/react-router-dom": "^5.1.5",
"@types/react-syntax-highlighter": "^11.0.4",
"@types/redux-logger": "^3.0.7",
"eslint-webpack-plugin": "^2.4.3",
"lodash": "^4.17.19",
"pg-promise": "^9.3.6",
"polished": "^3.6.3",
"query-string": "^6.11.1",
"react": "^16.13.0",
Expand All @@ -41,7 +42,7 @@
"utility-types": "^3.10.0"
},
"scripts": {
"start": "./scripts/start.sh",
"start": "REACT_APP_AGENT_URL=http://localhost:1339/ws PORT=1339 react-scripts start",
"build": "react-scripts build",
"test": "react-scripts test",
"eject": "react-scripts eject"
Expand Down
25 changes: 0 additions & 25 deletions cloud/scripts/start.sh

This file was deleted.

2 changes: 1 addition & 1 deletion cloud/src/PipeClientStore.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import tasks from './store/tasks'
import { Store } from 'redux'

const defaultStoreConfig: StoreConfig = {
logging: false // redux-logger
logging: true // redux-logger
}

export class PipeClientStore extends PipeClient {
Expand Down
6 changes: 4 additions & 2 deletions cloud/src/components/task/Task.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import TaskError from './TaskError'
import TaskResult from './TaskResult'
import TaskInputs from './TaskInputs'
import TaskSubTasks from './TaskSubTasks'
import TaskState from './TaskState'
import { TaskHeader, TaskHeaderLink, TaskImage, TaskWrapper, TaskTitleWrapper, TaskCreatedAt, TaskTitle } from './styled/Task'
import { Task as TaskInterface } from '../../store/tasks/types'
import { formatDate } from '../../utils'
Expand All @@ -19,7 +20,7 @@ type TaskProps = TaskInterface & {
export type TaskComponent = React.FC<TaskProps>

export const Task: TaskComponent = (props: TaskProps) => {
const { id, status, image, parent, result, error, sub_tasks, inputs, maxLogHeight, created_at } = props
const { id, status, image, parent, result, error, state, sub_tasks, inputs, maxLogHeight, created_at } = props

return <TaskWrapper>
<TaskHeader>
Expand All @@ -37,10 +38,11 @@ export const Task: TaskComponent = (props: TaskProps) => {
<TaskParent parent={parent} />
<TaskError error={error} />
<TaskInputs inputs={inputs} />
<TaskState state={state} />
<TaskSubTasks sub_tasks={sub_tasks} />
<TaskLog id={id} maxHeight={maxLogHeight} />
<TaskResult result={result} />
</TaskWrapper>
}

export default Task
export default Task
18 changes: 18 additions & 0 deletions cloud/src/components/task/TaskState.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import React from 'react'
import { ContentBlock, Code } from '../ui'

type Props = {
state?: object
}

export const TaskState: React.FC<Props> = ({ state }) => {
if (!state) {
return null
}
return <ContentBlock>
<h4>State</h4>
<Code language="json">{JSON.stringify(state, null, 4)}</Code>
</ContentBlock>
}

export default TaskState
14 changes: 14 additions & 0 deletions cloud/src/store/tasks/reducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,20 @@ const reducer: Reducer<TaskState> = (state = initialState, action) => {
}
}

case TaskActionTypes.STATE: {
const { id, state: taskState } = action.payload
return {
...state,
items: {
...state.items,
[id]: {
...state.items[id],
state: taskState,
},
},
}
}

case TaskActionTypes.CLEAR: {
return initialState
}
Expand Down
4 changes: 3 additions & 1 deletion cloud/src/store/tasks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export enum TaskActionTypes {
STATUS = '@@task/STATUS',
RETURN = '@@task/RETURN',
FAIL = '@@task/FAIL',
STATE = '@@task/STATE',
LOG = '@@task/LOG',
CLEAR = '@@task/CLEAR',
STOP_REQUEST = '@@task/STOP_REQUEST',
Expand All @@ -28,7 +29,8 @@ export interface Task {
routes: object,
status: string,
upstream: string,
sub_tasks: string[]
sub_tasks: string[],
state: object,
}

export interface TaskState {
Expand Down
10 changes: 9 additions & 1 deletion cowait/cli/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from cowait.tasks import TaskDefinition
from cowait.engine.errors import TaskCreationError, ProviderError
from cowait.utils import parse_task_image_name
from cowait.tasks.messages import TASK_INIT, TASK_STATUS, TASK_FAIL, TASK_RETURN, TASK_LOG
from cowait.tasks.messages import TASK_INIT, TASK_STATUS, TASK_FAIL, TASK_RETURN, TASK_LOG, TASK_STATE
from ..config import Config
from ..context import Context
from ..utils import ExitTrap
Expand Down Expand Up @@ -158,6 +158,8 @@ def handle(self, msg):
self.on_fail(**msg)
elif type == TASK_STATUS:
pass
elif type == TASK_STATE:
self.on_state(**msg)
elif type == TASK_LOG:
self.on_log(**msg)

Expand Down Expand Up @@ -229,3 +231,9 @@ def on_log(self, id: str, file: str, data: str, ts: str = None, **msg):
self.print_time(ts)
self.print_id(id)
self.println(' ', data)

def on_state(self, id: str, state: dict):
self.print_time(ts)
self.print_id(id)
self.json(state)

1 change: 1 addition & 0 deletions cowait/notebook/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async def init_node(self):

# instantiate kernel task
self.task = KernelTask(node=self.node, cluster=cluster, taskdef=taskdef)
Task.set_current(self.task)

# write globals
self.shell.push({
Expand Down
7 changes: 6 additions & 1 deletion cowait/tasks/agent/tasklist.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from cowait.network import Conn
from ..instance import TaskInstance
from ..messages import TASK_INIT, TASK_STATUS, TASK_RETURN, TASK_FAIL, TASK_LOG
from ..messages import TASK_INIT, TASK_STATUS, TASK_RETURN, TASK_STATE, TASK_FAIL, TASK_LOG


class TaskList(dict):
Expand All @@ -10,6 +10,7 @@ def __init__(self, task):
task.node.server.on(TASK_INIT, self.on_init)
task.node.server.on(TASK_STATUS, self.on_status)
task.node.server.on(TASK_RETURN, self.on_return)
task.node.server.on(TASK_STATE, self.on_state)
task.node.server.on(TASK_FAIL, self.on_fail)
task.node.server.on(TASK_LOG, self.on_log)

Expand All @@ -28,6 +29,10 @@ async def on_return(self, conn: Conn, id, result, **msg):
if id in self:
self[id].result = result

async def on_state(self, conn: Conn, id, state, **msg):
if id in self:
self[id].state = state

async def on_log(self, conn: Conn, id, file, data, **msg):
if id in self:
if not hasattr(self[id], 'log') or self[id].log is None:
Expand Down
5 changes: 4 additions & 1 deletion cowait/tasks/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ def __init__(
status: str = WAIT,
error: str = None,
result: any = None,
state: dict = {},
log: str = '',
**data,
):
super().__init__(**data)
self.status = status
self.error = error
self.result = result
self.state = state
self.log = log

def serialize(self) -> dict:
Expand All @@ -26,6 +28,7 @@ def serialize(self) -> dict:
'status': self.status,
'error': self.error,
'result': self.result,
'state': self.state,
'log': self.log,
}

Expand All @@ -38,6 +41,6 @@ def deserialize(instance: dict) -> TaskInstance:
'status': instance.get('status', WAIT),
'error': instance.get('error', None),
'result': instance.get('result', None),
'state': instance.get('state', {}),
'log': instance.get('log', None),
})

1 change: 1 addition & 0 deletions cowait/tasks/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
TASK_STATUS = 'task/status'
TASK_RETURN = 'task/return'
TASK_STATS = 'task/stats'
TASK_STATE = 'task/state'
18 changes: 12 additions & 6 deletions cowait/tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(self, **inputs):
# (at least less confusing) when invoking subtasks using constructor syntax.
# However, subtasks will actually never be instantiated. The constructor call is
# diverted by the runtime in Task.__new__().
# Tasks should only be constructed by the executor, and it will these 3 arguments:
# Tasks should only be constructed by the executor, and it will use these 3 arguments:
if 'taskdef' not in inputs or 'node' not in inputs or \
'cluster' not in inputs or len(inputs) != 3:
raise RuntimeError('Invalid task class instantiation')
Expand All @@ -32,9 +32,8 @@ def __init__(self, **inputs):
self.parent = ParentTask(self.node)
self.subtasks = TaskManager(self)
self.rpc = RpcComponent(self)

# Set this task as the current active task
Task.set_current(self)
self.state = {}
self.ui = []

def __new__(cls, *args, **inputs):
current = Task.get_current()
Expand Down Expand Up @@ -65,8 +64,8 @@ def meta(self) -> dict:
def __str__(self) -> str:
return f'Task({self.id}, {self.name})'

def init(self):
pass
def init(self) -> dict:
return {}

async def before(self, inputs: dict) -> dict:
return inputs
Expand Down Expand Up @@ -183,6 +182,13 @@ def spawn(
def exit(self, result):
raise StoppedError(result)

async def set_state(self, state: dict) -> None:
self.state = {
**self.state,
**state,
}
await self.node.parent.send_state(self.state)

@staticmethod
def get_current() -> 'Task':
return Task.__current__
Expand Down
4 changes: 4 additions & 0 deletions cowait/tasks/ui/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# flake8: noqa: 401
from .ui_component import UIComponent
from .progress_bar import ProgressBar

25 changes: 25 additions & 0 deletions cowait/tasks/ui/progress_bar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from .ui_component import UIComponent


class ProgressBar(UIComponent):
def __init__(self, task, id, value: float, min: float, max: float, label: str = ''):
super().__init__(task, id, '@cowait/ProgressBar')
self.state = {
'value': value,
'min': min,
'max': max,
'label': label,
}

async def set_value(self, value):
await self.set('value', value)

async def set_min(self, value):
await self.set('min', value)

async def set_max(self, value):
await self.set('max', value)

async def set_label(self, value):
await self.set('label', value)

34 changes: 34 additions & 0 deletions cowait/tasks/ui/ui_component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@


class UIComponent(object):
def __init__(self, task, id: str, component: str):
self.id = id
self.task = task
self.component = component
self.state = {}

async def set_state(self, state: dict) -> None:
self.state = {
**self.state,
**state,
}
await self.task.set_state({self.id: self.state})

def __getattr__(self, key):
return self.get(key)

def get(self, key):
if key not in self.state:
raise AttributeError(f'No such state variable {key}')
return self.state[key]

async def set(self, key, value):
if key not in self.state:
raise AttributeError(f'No such state variable {key}')
await self.set_state({key: value})

def to_json(self):
return {
'id': self.id,
'component': self.component,
}
6 changes: 6 additions & 0 deletions cowait/tasks/ui/view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .ui_component import UIComponent

class View(UIComponent):

pass

8 changes: 6 additions & 2 deletions cowait/worker/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,15 @@ async def execute(cluster: ClusterProvider, taskdef: TaskDefinition) -> None:
node=node,
)

Task.set_current(task)

# monitor system resources
node.monitor_system(interval=2)

# initialize task
task.init()
# initialize task & send initial state
state = task.init()
if state is not None:
await task.set_state(state)

# start http server
# this must happen after task.init() so that tasks have a chance
Expand Down
Loading