diff --git a/modelscope_agent/agents/data_science_assistant.py b/modelscope_agent/agents/data_science_assistant.py index db8ad247..718f6210 100644 --- a/modelscope_agent/agents/data_science_assistant.py +++ b/modelscope_agent/agents/data_science_assistant.py @@ -1,6 +1,4 @@ # Implementation inspired by the paper "DATA INTERPRETER: AN LLM AGENT FOR DATA SCIENCE" -import asyncio -import copy import os import time from datetime import datetime @@ -39,8 +37,7 @@ - **other**: Any tasks not in the defined categories # Task: -Based on the context, write a simple plan or modify an existing plan of what you should do to achieve the goal. A plan \ -consists of one to four tasks. +Based on the context, write a simple plan or modify an existing plan of what you should do to achieve the goal. Output a list of jsons following the format: ```json @@ -55,6 +52,44 @@ ] ``` """ + +DECOMPOSE_TASK_TEMPLATE = """ +# Context: +{context} +# Available Task Types: +- **eda**: For performing exploratory data analysis +- **data preprocessing**: For preprocessing dataset in a data analysis or machine learning task ONLY,\ +general data operation doesn't fall into this type +- **feature engineering**: Only for creating new columns fo input data. +- **model train**: Only for training model. +- **model evaluate**: Only for evaluating model. +- **ocr**: Only for OCR tasks. +- **other**: Any tasks not in the defined categories + +# Previous Tasks +We have already generated the following tasks: +{previous_tasks} +# Task: +The current task is: +{current_task} +Currently, the current task is too complex to be executed in one step. Please decompose the task into smaller tasks, \ +and output a list of jsons following the format: +Output a list of jsons following the format: + +```json +[ + {{ + "task_id": str = "unique identifier for a task in plan, can be an ordinal, \ + should be unique and not conflict with previous task ids", + "dependent_task_ids": list[str] = "ids of tasks prerequisite to this task", + "instruction": "what you should do in this task, one short phrase or sentence", + "task_type": "type of this task, should be one of Available Task Types", + }}, + ... +] +``` +""" + CODE_TEMPLATE = """ # Task you are a code generator, you need to generate a code python block in jupyter notebook to achieve the \ @@ -597,8 +632,8 @@ def _judge_code(self, task, previous_code_blocks, code, if 'incorrect' in judge_result.split('\n')[-1]: success = False failed_reason = ( - 'Though the code executes successfully, The code logic is incorrect, here is the reason: ' - + judge_result) + 'Though the code executes successfully, The code logic is \ + incorrect, here is the reason: ' + judge_result) return success, failed_reason else: @@ -634,7 +669,7 @@ def _run(self, user_request, save: bool = True, **kwargs): previous_code_blocks = self._get_previous_code_blocks() success = False code_counter = 0 - max_try = kwargs.get('max_try', 10) + max_try = kwargs.get('max_try', 1) while not success and code_counter < max_try: code_execute_success = False code_logic_success = False @@ -726,9 +761,13 @@ def _run(self, user_request, save: bool = True, **kwargs): encoding='utf-8') as file: nbformat.write(self.code_interpreter.nb, file) else: - self.plan = self._update_plan( - user_request=user_request, curr_plan=self.plan) - self.code_interpreter.reset() + decomposed_tasks = self._decompose_task(task) + if decomposed_tasks: + self.plan.replace_task(task, decomposed_tasks) + else: + self.plan = self._update_plan( + user_request=user_request, curr_plan=self.plan) + self.code_interpreter.reset() # save the plan into json file if save: after_time = time.time() @@ -769,3 +808,36 @@ def _get_total_tokens(self): except Exception as e: logger.error(f'get total token error: {e}') pass + + def _decompose_task(self, task): + try: + print(f'decompose task {task.task_id}') + messages = [{ + 'role': + 'user', + 'content': + DECOMPOSE_TASK_TEMPLATE.format( + context='User Request: ' + task.instruction + '\n', + previous_tasks='\n'.join([ + json.dumps({ + 'task_id': t.task_id, + 'dependent_task_ids': t.dependent_task_ids, + 'instruction': t.instruction, + 'task_type': t.task_type + }) for t in self.plan.tasks + ]), + current_task=json.dumps(task.__dict__)) + }] + resp = self._call_llm(prompt=None, messages=messages, stop=None) + tasks_text = '' + for r in resp: + tasks_text += r + tasks_text = parse_code(text=tasks_text, lang='json') + logger.info(f'decomposed tasks: {tasks_text}') + + tasks = json5.loads(tasks_text) + tasks = [Task(**task) for task in tasks] + return tasks + except Exception as e: + logger.error(f'decompose task error: {e}') + return None