Chatgpt Agent #1822

Closed

Conversation

Future-Outlier
Member

@Future-Outlier Future-Outlier commented Sep 7, 2023

TL;DR

  • Create a base class for all external API tasks: a base task for all plugins that use the DoTask function
  • Create a TaskExecutor class: a base agent for all plugins that use the DoTask function
  • Create a ChatGPT plugin by OpenAI API

  • Support local execution for the DoTask function

  • Refactor the base agent

  • Add two variables to base_agent.py: ASYNC_PLUGIN and SYNC_PLUGIN. They are used as the runtime_flavor value in TaskMetadata

  • Use the runtime_flavor in TaskMetadata (it hasn't been used before in flytekit)

  • Change the documented use case of runtime_flavor by adding comments

The new use case is the plugin routing mechanism.

  • Add the runtime_flavor parameter to all agent tasks and web API tasks.
    Here's the list:

plugins/flytekit-airflow
plugins/flytekit-aws-athena
plugins/flytekit-aws-batch
plugins/flytekit-bigquery
plugins/flytekit-mmcloud
plugins/flytekit-snowflake
plugins/flytekit-spark
flytekit/sensor

I got the list from:

  1. https://github.com/flyteorg/flyte/tree/master/flyteplugins/go/tasks/plugins/webapi
  2. Running find . | grep agent.py under the flytekit folder
  3. The sensor is a special case.

Note: ChatGPT doesn't need it because the external API task already sets it.

  • Add tests for the above changes

  • Finish the above without introducing breaking changes

  • Currently, the ChatGPT timeout is only checked when the API call is retried, so if a query takes 40 seconds, you have to wait the full 40 seconds.
    This is not ideal; we will improve it when a new server is integrated into the Flyte backend. For now, the timeout can be set in the config map.
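
The runtime_flavor routing described in the list above could look roughly like the sketch below. It is only an illustration: the constant values, the TaskMetadata stand-in, and the run_sync, run_async, and route helpers are hypothetical names, not the code in this PR or in flytekit.

```python
from dataclasses import dataclass
from typing import Callable, Dict

# Hypothetical values; the PR only names the two constants.
ASYNC_PLUGIN = "async_plugin"
SYNC_PLUGIN = "sync_plugin"


@dataclass
class TaskMetadata:
    """Stand-in for flytekit's TaskMetadata with only the field used here."""
    runtime_flavor: str


def run_sync(task_name: str) -> str:
    # A sync plugin returns its result from a single DoTask-style call.
    return f"{task_name}: handled by one blocking DoTask call"


def run_async(task_name: str) -> str:
    # An async plugin creates a job and polls for its state instead.
    return f"{task_name}: created, then polled until done"


HANDLERS: Dict[str, Callable[[str], str]] = {
    SYNC_PLUGIN: run_sync,
    ASYNC_PLUGIN: run_async,
}


def route(task_name: str, metadata: TaskMetadata) -> str:
    """Pick a handler based on the task's runtime_flavor."""
    try:
        handler = HANDLERS[metadata.runtime_flavor]
    except KeyError:
        raise ValueError(f"unknown runtime_flavor: {metadata.runtime_flavor!r}") from None
    return handler(task_name)
```

With this shape, a task only declares its flavor in its metadata and a single lookup decides which execution path it takes.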

Additional Information

Note: the ChatGPT API timeout is only checked on retry,
so I highly recommend that users set the timeout of the DoTask function to 40 seconds,
which will probably not exceed the limit. We will introduce an api_task_server to fix this situation.

Note: It is possible to use aiohttp and set an explicit timeout,
but in my current experiments, openai.acreate is faster than aiohttp (I haven't figured out why).
Here is the proof:
https://hackmd.io/Fmx3DHGtQBSqYydcrdF_Cg
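
For reference, a client-side timeout that fires during the first call (rather than on retry) can be sketched with the standard library's asyncio.wait_for. This is not what the PR implements; slow_api_call is a hypothetical stand-in for the real request, and the delays are arbitrary.

```python
import asyncio


async def slow_api_call(delay: float) -> str:
    """Hypothetical stand-in for a slow chat-completion request."""
    await asyncio.sleep(delay)
    return "response"


async def call_with_timeout(delay: float, timeout: float) -> str:
    """Give up after `timeout` seconds instead of waiting for the call to finish."""
    try:
        return await asyncio.wait_for(slow_api_call(delay), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the pending call before raising.
        return "timed out"


def demo() -> tuple:
    fast = asyncio.run(call_with_timeout(delay=0.01, timeout=1.0))
    slow = asyncio.run(call_with_timeout(delay=1.0, timeout=0.05))
    return fast, slow
```

The caller regains control as soon as the deadline passes, so a 40-second query would not block for the full 40 seconds.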

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

How to test it?

Configuration Example

tasks:
  task-plugins:
    enabled-plugins:
      - agent-service
    default-for-task-types:
      api_task: agent-service
plugins:
  agent-service:
    supportedTaskTypes:
      - api_task
    defaultAgent:
      endpoint: "dns:///localhost:8000"
      insecure: true
      timeouts:
        DoTask: 100s

Run in Remote Environment

You can test the agent with the following Dockerfile.

FROM python:3.9-slim-buster
USER root
WORKDIR /root
ENV PYTHONPATH /root
RUN apt-get update && apt-get install build-essential -y
RUN apt-get install git -y
RUN pip install -U git+https://github.com/Future-Outlier/flytekit.git@0ecebdfc3a018d888748e4d9adf715c72c761180#subdirectory=plugins/flytekit-openai-chatgpt

RUN pip install -U git+https://github.com/Future-Outlier/flyte.git@b879137d24fe6bfb60d6d1182cd8d9b3f32335cc#subdirectory=flyteidl

RUN pip install -U git+https://github.com/Future-Outlier/flytekit.git@0ecebdfc3a018d888748e4d9adf715c72c761180

Then save the following workflow as chatgpt_example.py:

from flytekit import task, workflow, ImageSpec
from flytekitplugins.chatgpt import ChatGPTTask


chatgpt_job = ChatGPTTask(
    name="chatgpt",
    config={
        "openai_organization": "org-NayNG68kGnVXMJ8Ak4PMgQv7",
        "chatgpt_conf": {
            "model": "gpt-3.5-turbo",
            # 'messages': [{'role': 'user', 'content': 'Say this is a test!'}],
            "temperature": 0.7,
        },
    },
)

@task()
def t1(s: str) -> str:
    s = "Response: " + s
    return s

@workflow
def wf() -> str:
    message = chatgpt_job(message="hi")
    return t1(s=message)

if __name__ == "__main__":
    print(wf())

  1. Start your gRPC agent server:
pyflyte serve
  2. Execute the workflow in the remote environment:
pyflyte run --remote --image futureoutlier/flytekit:chatgpt-v2 chatgpt_example.py wf

Screen Shot

image
Note: In the screenshot, you can see that the ChatGPT task is an api_task.

How it works at high level

image

Tracking Issue

flyteorg/flyte#3936

Future Outlier added 2 commits September 7, 2023 10:38
Signed-off-by: Future Outlier <[email protected]>
@Future-Outlier Future-Outlier marked this pull request as draft September 7, 2023 02:42

codecov bot commented Sep 7, 2023

Codecov Report

Attention: 22 lines in your changes are missing coverage. Please review.

Comparison is base (88818fd) 85.94% compared to head (970bf3b) 18.71%.
Report is 15 commits behind head on master.

❗ Current head 970bf3b differs from pull request most recent head 68fd527. Consider uploading reports for the commit 68fd527 to get more accurate results

Files                                      Patch %   Lines
flytekit/extend/backend/agent_service.py   0.00%     20 Missing ⚠️
flytekit/extend/backend/base_agent.py      50.00%    2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff             @@
##           master    #1822       +/-   ##
===========================================
- Coverage   85.94%   18.71%   -67.24%     
===========================================
  Files         308      332       +24     
  Lines       22946    31410     +8464     
  Branches     3468     3084      -384     
===========================================
- Hits        19721     5878    -13843     
- Misses       2621    25448    +22827     
+ Partials      604       84      -520     


@Future-Outlier Future-Outlier changed the title from "Chatgpt Agent" to "Chatgpt Agent and Agent Do Task Function" Sep 11, 2023
Future Outlier added 4 commits November 22, 2023 13:01
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
"""
raise NotImplementedError

def get_custom(self, settings: SerializationSettings = None) -> Dict[str, Any]:
Contributor

Why do we have to use pickle?
cc @pingsutw, do you know? Is this just to keep the Python conversion simple? This is potentially dangerous, as it may break across Python versions.

Member Author

pickle is faster. We've talked about this before, and Kevin told me that it might be OK.
Will there be use cases with multiple Python versions?

Contributor

We cannot control the backend and flytekit versions.

Member Author

You are right, I'll fix it!
Thank you very much for your advice!
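
To illustrate the concern raised above, here is a minimal sketch of storing the task config as a plain JSON-compatible dict instead of a pickle, so the backend and flytekit do not need matching Python versions to read it. TASK_CONFIG, get_custom, and load_config are hypothetical names used only for this example, not the PR's actual code.

```python
import json
from typing import Any, Dict

TASK_CONFIG = "task_config"  # hypothetical key name


def get_custom(config: Dict[str, Any]) -> Dict[str, Any]:
    """Store the config as a plain JSON-compatible dict rather than a pickle."""
    # Round-tripping through json fails fast if the config holds a value
    # that cannot be represented portably.
    return {TASK_CONFIG: json.loads(json.dumps(config))}


def load_config(custom: Dict[str, Any]) -> Dict[str, Any]:
    """Read the config back on the agent side; no unpickling involved."""
    return custom.get(TASK_CONFIG) or {}
```

The trade-off is that only JSON-representable values can be carried, which is usually enough for settings such as a model name and temperature.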

…nto chatgpt-agent-sync-plugin

Signed-off-by: Future Outlier <[email protected]>
@Future-Outlier Future-Outlier marked this pull request as draft December 13, 2023 08:29
def __init__(self):
super().__init__(task_type=TASK_TYPE, asynchronous=True)

async def async_create(
Contributor

@kumare3 kumare3 Dec 19, 2023

I would change this to SyncAgentBase as follows:

class SyncAgentBase(AgentBase):

    @final
    async def async_create(
        self, context: grpc.ServicerContext, io_ctx: IOContext, task_template: TaskTemplate
    ) -> CreateTaskResponse:
        # Subclasses should not override this; they only implement execute().
        return await self.do(context, io_ctx, task_template)

    async def do(self, context, io_ctx, task_template):
        python_interface_inputs = {
            name: TypeEngine.guess_python_type(lt.type) for name, lt in task_template.interface.inputs.items()
        }
        ctx = FlyteContextManager.current_context()

        # Convert the literal inputs into native Python keyword arguments.
        native_inputs = {}
        if io_ctx.inputs:
            native_inputs = TypeEngine.literal_map_to_kwargs(ctx, io_ctx.inputs, python_interface_inputs)

        meta = task_template.custom

        # Rebuild the task object from the module and name stored in the template.
        task_module = importlib.import_module(name=meta[TASK_MODULE])
        task_def = getattr(task_module, meta[TASK_NAME])
        config = jsonpickle.decode(meta[TASK_CONFIG_PKL]) if meta.get(TASK_CONFIG_PKL) else None
        return task_def(TASK_TYPE, config=config).execute(**native_inputs)

    async def execute(self, **kwargs):
        raise NotImplementedError()

Check out the final decorator: https://docs.python.org/3.8/library/typing.html#typing.final

Independently, let's make the signatures of all the get/create/delete methods simpler. Think: if we have to refactor the signatures in the future, how can we do it easily?

@dataclass
class IOContext():
     inputs: LiteralMap
     output_prefix: str

OR change the signature to have **kwargs so that we maintain ease of refactoring

def __init__(self, *args, **kwargs):
   ...

async def create(ctx, inputs, outputs, task_template, **kwargs):
   ...
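
A self-contained sketch of the two suggestions above (a final entry point plus an IOContext dataclass) might look like this. It uses only the standard library; EchoAgent and the simplified return types are assumptions made for the example and do not reflect flytekit's real classes.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, final


@dataclass
class IOContext:
    """Bundles I/O arguments so method signatures can stay stable."""
    inputs: Dict[str, Any]
    output_prefix: str


class SyncAgentBase:
    @final
    async def async_create(self, io_ctx: IOContext, **kwargs: Any) -> Dict[str, Any]:
        # Marked final: type checkers flag subclasses that override it.
        # Extra keyword arguments are accepted so new parameters can be
        # added later without breaking existing callers.
        return await self.execute(**io_ctx.inputs)

    async def execute(self, **kwargs: Any) -> Dict[str, Any]:
        raise NotImplementedError


class EchoAgent(SyncAgentBase):
    """Hypothetical plugin used only to exercise the base class."""

    async def execute(self, **kwargs: Any) -> Dict[str, Any]:
        return {"o0": f"echo: {kwargs['message']}"}


def run_echo(message: str) -> Dict[str, Any]:
    io_ctx = IOContext(inputs={"message": message}, output_prefix="/tmp/out")
    return asyncio.run(EchoAgent().async_create(io_ctx))
```

Adding a field to IOContext later changes no method signature, which is the refactoring ease the comment asks for. Note that typing.final is enforced by static type checkers, not at runtime.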

Future Outlier added 6 commits December 19, 2023 15:04
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
@kumare3
Contributor

kumare3 commented Dec 20, 2023

no changes here yet?

@Future-Outlier
Member Author

Future-Outlier commented Dec 20, 2023

no changes here yet?

I will update it later.
I originally thought that only one part needed to change (replacing pickle with a dict).
I will update the SyncAgentBase class, xd

@Future-Outlier
Member Author

Future-Outlier commented Dec 20, 2023

Execute with the class SyncAgentBase.
image

Signed-off-by: Future Outlier <[email protected]>
@Future-Outlier
Member Author

Future-Outlier commented Dec 20, 2023

I am considering the following changes:

  1. Rename api_task to sync_agent_task.
  2. Rename task_executor to sync_agent_executor.
  3. Rename external_api_task to sync_agent_task.
  4. Don't use IOContext, in order to avoid a breaking change.

Note:
I think sync_agent_task is better than sync_task.
This is because there are use cases for writing a sync_task that executes in the propeller, and I don't want the two to be confused.

Signed-off-by: Future Outlier <[email protected]>
This was referenced Jan 3, 2024
@Future-Outlier
Member Author

Future-Outlier commented Jan 11, 2024

This PR will be deprecated.
We can refer to this PR when building a generic sync agent executor in the future.

@Future-Outlier Future-Outlier mentioned this pull request Mar 1, 2024
3 tasks