Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Docs][flyteagent] Added description of exception deletion cases. #6039

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 39 additions & 28 deletions docs/user_guide/flyte_agents/developing_agents.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ jupytext:
---

(developing_agents)=

# Developing agents

The Flyte agent framework enables rapid agent development, since agents are decoupled from the core FlytePropeller engine. Rather than building a complete gRPC service from scratch, you can implement an agent as a Python class, easing development. Agents can be tested independently and deployed privately, making maintenance easier and giving you more flexibility and control over development.
Expand All @@ -20,8 +21,9 @@ We strongly encourage you to contribute your agent to the Flyte community. To do
```

There are two types of agents: **async** and **sync**.
* **Async agents** enable long-running jobs that execute on an external platform over time. They communicate with external services that have asynchronous APIs that support `create`, `get`, and `delete` operations. The vast majority of agents are async agents.
* **Sync agents** enable request/response services that return immediate outputs (e.g. calling an internal API to fetch data or communicating with the OpenAI API).

- **Async agents** enable long-running jobs that execute on an external platform over time. They communicate with external services that have asynchronous APIs that support `create`, `get`, and `delete` operations. The vast majority of agents are async agents.
- **Sync agents** enable request/response services that return immediate outputs (e.g. calling an internal API to fetch data or communicating with the OpenAI API).

```{note}
Expand All @@ -41,6 +43,15 @@ To create a new async agent, extend the [`AsyncAgentBase`](https://github.com/fl
- `get`: This method retrieves the job resource (jobID or output literal) associated with the task, such as a BigQuery job ID or Databricks task ID.
- `delete`: Invoking this method will send a request to delete the corresponding job.

```{note}
When users use create method to create a new job, with its job ID, they can use get method and job ID to check the execution state, if it is succeed or not.
Exceptional delete case:
If users interrupt task during it's running, FlytePropeller will invoke delete method to corresponding job.
```

```python
from typing import Optional
from dataclasses import dataclass
Expand Down Expand Up @@ -113,6 +124,7 @@ AgentRegistry.register(OpenAIAgent())
```

#### Sensor interface specification

With the agent framework, you can easily build a custom sensor in Flyte to watch certain events or monitor the bucket in your workflow.

To create a new sensor, extend the `[BaseSensor](https://github.com/flyteorg/flytekit/blob/master/flytekit/sensor/base_sensor.py#L43)` class and implement the `poke` method, which checks whether a specific condition is met.
Expand All @@ -130,7 +142,6 @@ class FileSensor(BaseSensor):
return fs.exists(path)
```


### 2. Test the agent

You can test your agent in a {ref}`local Python environment <testing_agents_locally>` or in a {ref}`local development cluster <testing_agents_in_a_local_development_cluster>`.
Expand Down Expand Up @@ -181,29 +192,29 @@ By default, all agent requests will be sent to the default agent service. Howeve
you can route particular task requests to designated agent services by adjusting the FlytePropeller configuration.

```yaml
plugins:
agent-service:
# By default, all requests will be sent to the default agent.
defaultAgent:
endpoint: "dns:///flyteagent.flyte.svc.cluster.local:8000"
insecure: true
timeouts:
# CreateTask, GetTask and DeleteTask are for async agents.
# ExecuteTaskSync is for sync agents.
CreateTask: 5s
GetTask: 5s
DeleteTask: 5s
ExecuteTaskSync: 10s
defaultTimeout: 10s
agents:
custom_agent:
endpoint: "dns:///custom-flyteagent.flyte.svc.cluster.local:8000"
insecure: false
defaultServiceConfig: '{"loadBalancingConfig": [{"round_robin":{}}]}'
timeouts:
GetTask: 5s
defaultTimeout: 10s
agentForTaskTypes:
# It will override the default agent for custom_task, which means propeller will send the request to this agent.
- custom_task: custom_agent
plugins:
agent-service:
# By default, all requests will be sent to the default agent.
defaultAgent:
endpoint: "dns:///flyteagent.flyte.svc.cluster.local:8000"
insecure: true
timeouts:
# CreateTask, GetTask and DeleteTask are for async agents.
# ExecuteTaskSync is for sync agents.
CreateTask: 5s
GetTask: 5s
DeleteTask: 5s
ExecuteTaskSync: 10s
defaultTimeout: 10s
agents:
custom_agent:
endpoint: "dns:///custom-flyteagent.flyte.svc.cluster.local:8000"
insecure: false
defaultServiceConfig: '{"loadBalancingConfig": [{"round_robin":{}}]}'
timeouts:
GetTask: 5s
defaultTimeout: 10s
agentForTaskTypes:
# It will override the default agent for custom_task, which means propeller will send the request to this agent.
- custom_task: custom_agent
```
34 changes: 26 additions & 8 deletions flytepropeller/pkg/compiler/validators/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,40 @@
// For custom types, we expect the JSON schemas in the metadata to come from the same JSON schema package,
// specifically draft 2020-12 from Mashumaro.

srcSchemaBytes, _ := json.Marshal(sourceMetaData.GetFields())
tgtSchemaBytes, _ := json.Marshal(targetMetaData.GetFields())
srcSchemaBytes, err := json.Marshal(sourceMetaData.GetFields())
if err != nil {
logger.Infof(context.Background(), "Failed to marshal source metadata: [%v]", err)
return false
}

Check warning on line 35 in flytepropeller/pkg/compiler/validators/typing.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/compiler/validators/typing.go#L33-L35

Added lines #L33 - L35 were not covered by tests
tgtSchemaBytes, err := json.Marshal(targetMetaData.GetFields())
if err != nil {
logger.Infof(context.Background(), "Failed to marshal target metadata: [%v]", err)
return false
}

Check warning on line 40 in flytepropeller/pkg/compiler/validators/typing.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/compiler/validators/typing.go#L38-L40

Added lines #L38 - L40 were not covered by tests

compiler := jsonschema.NewCompiler()

err := compiler.AddResource("src", bytes.NewReader(srcSchemaBytes))
err = compiler.AddResource("src", bytes.NewReader(srcSchemaBytes))
if err != nil {
logger.Infof(context.Background(), "Failed to add resource to compiler: [%v]", err)

Check warning on line 46 in flytepropeller/pkg/compiler/validators/typing.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/compiler/validators/typing.go#L46

Added line #L46 was not covered by tests
return false
}
err = compiler.AddResource("tgt", bytes.NewReader(tgtSchemaBytes))
if err != nil {
logger.Infof(context.Background(), "Failed to add resource to compiler: [%v]", err)

Check warning on line 51 in flytepropeller/pkg/compiler/validators/typing.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/compiler/validators/typing.go#L51

Added line #L51 was not covered by tests
return false
}

srcSchema, _ := compiler.Compile("src")
tgtSchema, _ := compiler.Compile("tgt")
srcSchema, err := compiler.Compile("src")
if err != nil {
logger.Infof(context.Background(), "Failed to compile source schema: [%v]", err)
return false
}

Check warning on line 59 in flytepropeller/pkg/compiler/validators/typing.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/compiler/validators/typing.go#L57-L59

Added lines #L57 - L59 were not covered by tests
tgtSchema, err := compiler.Compile("tgt")
if err != nil {
logger.Infof(context.Background(), "Failed to compile target schema: [%v]", err)
return false
}

Check warning on line 64 in flytepropeller/pkg/compiler/validators/typing.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/compiler/validators/typing.go#L62-L64

Added lines #L62 - L64 were not covered by tests

// Compare the two schemas
errs := jscmp.Compare(tgtSchema, srcSchema)
Expand All @@ -63,20 +81,20 @@
func isSameTypeInJSON(sourceMetaData, targetMetaData *structpb.Struct) bool {
srcSchemaBytes, err := json.Marshal(sourceMetaData.GetFields())
if err != nil {
logger.Infof(context.Background(), "Failed to marshal source metadata: %v", err)
logger.Infof(context.Background(), "Failed to marshal source metadata: [%v]", err)

Check warning on line 84 in flytepropeller/pkg/compiler/validators/typing.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/compiler/validators/typing.go#L84

Added line #L84 was not covered by tests
return false
}

tgtSchemaBytes, err := json.Marshal(targetMetaData.GetFields())
if err != nil {
logger.Infof(context.Background(), "Failed to marshal target metadata: %v", err)
logger.Infof(context.Background(), "Failed to marshal target metadata: [%v]", err)

Check warning on line 90 in flytepropeller/pkg/compiler/validators/typing.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/compiler/validators/typing.go#L90

Added line #L90 was not covered by tests
return false
}

// Use jsondiff to compare the two schemas
patch, err := jsondiff.CompareJSON(srcSchemaBytes, tgtSchemaBytes)
if err != nil {
logger.Infof(context.Background(), "Failed to compare JSON schemas: %v", err)
logger.Infof(context.Background(), "Failed to compare JSON schemas: [%v]", err)

Check warning on line 97 in flytepropeller/pkg/compiler/validators/typing.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/compiler/validators/typing.go#L97

Added line #L97 was not covered by tests
return false
}

Expand Down
Loading