From 16d50bd4dc65b6a3a27143e6c38cd2713d6eedaf Mon Sep 17 00:00:00 2001 From: root Date: Fri, 15 Nov 2024 16:33:42 -0500 Subject: [PATCH 1/3] Initial autogen-core seeds --- .../autogen-core/samples/common/adas/adas.py | 479 ++++++++ .../samples/common/adas/adas_prompt.py | 1048 +++++++++++++++++ .../autogen-core/samples/common/adas/utils.py | 296 +++++ 3 files changed, 1823 insertions(+) create mode 100644 python/packages/autogen-core/samples/common/adas/adas.py create mode 100644 python/packages/autogen-core/samples/common/adas/adas_prompt.py create mode 100644 python/packages/autogen-core/samples/common/adas/utils.py diff --git a/python/packages/autogen-core/samples/common/adas/adas.py b/python/packages/autogen-core/samples/common/adas/adas.py new file mode 100644 index 00000000000..3a1c6bad78c --- /dev/null +++ b/python/packages/autogen-core/samples/common/adas/adas.py @@ -0,0 +1,479 @@ + + + + + +import argparse +import asyncio +import os +import logging +import json +import re +import uuid +import pickle +from dataclasses import dataclass +from typing import Dict, List, Union +from collections import namedtuple +from concurrent.futures import ThreadPoolExecutor +from tqdm import tqdm +import threading +import random + +from autogen_agentchat.agents import CodeExecutorAgent, CodingAssistantAgent +from autogen_core.base import AgentId, AgentType, AgentRuntime, CancellationToken, MessageContext, TopicId +from autogen_core.components import RoutedAgent, default_subscription, message_handler +from autogen_core.components.models import ( + AssistantMessage, + ChatCompletionClient, + LLMMessage, + SystemMessage, + UserMessage, +) + +from autogen_core.application import SingleThreadedAgentRuntime +from autogen_core.components import DefaultTopicId +from autogen_core.components.models import OpenAIChatCompletionClient +from autogen_core.components.tools import FunctionTool, PythonCodeExecutionTool, ToolSchema +from autogen_core.components.tool_agent import ToolAgent, tool_agent_caller_loop +from autogen_ext.code_executors import DockerCommandLineCodeExecutor #, extract_markdown_code_blocks +from autogen_core.components.code_executor import CodeBlock, CodeExecutor, extract_markdown_code_blocks +from autogen_magentic_one.utils import LogHandler +from autogen_core.application.logging import EVENT_LOGGER_NAME + +# TODO fix imports +import sys +sys.path.append("/home/andyye/autogen/python/packages/autogen-core/samples/") +from common.utils import get_chat_completion_client_from_envs + +from adas_prompt import get_init_archive, get_prompt, get_reflexion_prompt +from utils import random_id, bootstrap_confidence_interval, load_drop, drop_metric + + +logging.basicConfig(level=logging.WARNING) +logging.getLogger("autogen_core").setLevel(logging.DEBUG) + +Info = namedtuple('Info', ['name', 'author', 'content', 'iteration_idx']) + +SEARCHING_MODE = True + + + +@dataclass +class CodeWritingTask: + task: str + + +@dataclass +class CodeWritingResult: + task: str + code: str + review: str + + +@dataclass +class CodeReviewTask: + session_id: str + code_writing_task: str + code_writing_scratchpad: str + code: str + + +@dataclass +class CodeReviewResult: + review: str + session_id: str + approved: bool + + +@dataclass +class ADASTask: + task: str + +@dataclass +class ADASResult: + result: str + +@dataclass +class ReflectTask: + session_id: str + task: str + thought: str + + +@dataclass +class LLMMessageList: + llm_message_list: List[LLMMessage] + + +@dataclass +class SimpleReflectAgentResponse: + json_content: 
Dict[str, str]
+    # content: str
+
+
+@dataclass
+class LLMAgentBaseTask:
+    system_message: LLMMessage
+    instruction: LLMMessage
+    input_infos: List[Info]
+    iteration_idx: int
+    output_fields: List[str]
+    role: str
+
+
+@dataclass
+class LLMAgentBaseResponse:
+    output: str
+
+
+# An agent that makes a direct call to the model and returns its response parsed as JSON.
+class SimpleReflectAgent(RoutedAgent):
+    def __init__(self, description: str, model_client: ChatCompletionClient, system_prompt: str) -> None:
+        super().__init__(description)
+        self._system_messages: List[LLMMessage] = [
+            SystemMessage(
+                content=system_prompt,
+            )
+        ]
+        self._chat_history: List[LLMMessage] = []
+        self._model_client = model_client
+        self._cnt = 0
+
+    @message_handler
+    async def handle_task(self, message: LLMMessageList, ctx: MessageContext) -> SimpleReflectAgentResponse:
+        # Query the model with the system prompt, the accumulated chat history,
+        # and the newly received messages, then parse the JSON reply.
+        model_result = await self._model_client.create(
+            self._system_messages + self._chat_history + message.llm_message_list
+        )
+        print(f"llm_message_list {len(message.llm_message_list)}")
+        self._chat_history.extend(message.llm_message_list)
+
+        print(f"-----cnt {self._cnt}")
+        print(f"chat history {len(self._chat_history)}")
+        self._cnt += 1
+        assert isinstance(model_result.content, str)
+        json_content = json.loads(model_result.content)
+        return SimpleReflectAgentResponse(json_content=json_content)
+
+
+@dataclass
+class Message:
+    content: str
+
+
+@default_subscription
+class Assistant(RoutedAgent):
+    def __init__(self, model_client: ChatCompletionClient) -> None:
+        super().__init__("An assistant agent.")
+        self._model_client = model_client
+        self._chat_history: List[LLMMessage] = [
+            SystemMessage(
+                content="""Write Python script in markdown block, and it will be executed.
+Always save figures to file in the current directory.
Do not use plt.show()""", + ) + ] + + @message_handler + async def handle_message(self, message: Message, ctx: MessageContext) -> None: + self._chat_history.append(UserMessage(content=message.content, source="user")) + result = await self._model_client.create(self._chat_history) + print(f"\n{'-'*80}\nAssistant:\n{result.content}") + self._chat_history.append(AssistantMessage(content=result.content, source="assistant")) # type: ignore + await self.publish_message(Message(content=result.content), DefaultTopicId()) # type: ignore + + +@default_subscription +class Executor(RoutedAgent): + def __init__(self, code_executor: CodeExecutor) -> None: + super().__init__("An executor agent.") + self._code_executor = code_executor + + @message_handler + async def handle_message(self, message: Message, ctx: MessageContext) -> None: + code_blocks = extract_markdown_code_blocks(message.content) + if code_blocks: + result = await self._code_executor.execute_code_blocks( + code_blocks, cancellation_token=ctx.cancellation_token + ) + print(f"\n{'-'*80}\nExecutor:\n{result.output}") + await self.publish_message(Message(content=result.output), DefaultTopicId()) + + +class AgentSystem(): + def __init__(self) -> None: + pass + +def generate_task(input_infos) -> str: + + # construct input infos text + input_infos_text = '' + for input_info in input_infos: + if isinstance(input_info, Info): + (field_name, author, content, iteration_idx) = input_info + else: + continue + + if field_name == 'task': + input_infos_text += f'# Your Task:\n{content}\n\n' + elif iteration_idx != -1: + # input_infos_text += f'### {field_name} #{iteration_idx + 1} by {author}:\n{content}\n\n' + input_infos_text += f'### {field_name} #{iteration_idx + 1}:\n{content}\n\n' + else: + # input_infos_text += f'### {field_name} by {author}:\n{content}\n\n' + input_infos_text += f'### {field_name}:\n{content}\n\n' + + prompt = input_infos_text + "# Instruction: \n" + return prompt + +def evaluate_forward_fn(args, forward_str): + # dynamically define forward() + # modified from https://github.com/luchris429/DiscoPOP/blob/main/scripts/launch_evo.py + namespace = {} + print(f"forward str {forward_str}") + exec(forward_str, globals(), namespace) + names = list(namespace.keys()) + if len(names) != 1: + raise AssertionError(f"{len(names)} things in namespace. 
Please only provide 1") + func = namespace[names[0]] + if not callable(func): + raise AssertionError(f"{func} is not callable") + setattr(AgentSystem, "forward", func) + + # set seed 0 for valid set + examples = load_drop(args.data_filename)[1:-1] # first one and the last one is for few-shot examples + random.seed(args.shuffle_seed) + random.shuffle(examples) + + if SEARCHING_MODE: + examples = examples[:args.valid_size] * args.n_repeat + else: + examples = examples[args.valid_size:args.valid_size + args.test_size] * args.n_repeat + + questions = [example['inputs'] for example in examples] + answers = [example['targets'] for example in examples] + + print(f"problem length: {len(examples)}") + max_workers = min(len(examples), args.max_workers) if args.multiprocessing else 1 + + task_queue = [] + for q in questions: + taskInfo = Info('task', 'User', q, -1) + task_queue.append((taskInfo, AgentSystem())) + + # agentSystem = AgentSystem() + + def call_forward(agent_task_queue): + taskInfo, agent = agent_task_queue + print(f"taskInfo {taskInfo}") + task = generate_task([taskInfo]) + + # For magentic one using the create_completion_client_from_env() helper + # export CHAT_COMPLETION_PROVIDER='azure' + + + agent_model_kwargs = {} + + result = agent.forward(task, agent_model_kwargs) + return result + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + results = list(tqdm(executor.map(call_forward, task_queue), total=len(task_queue))) + + acc_list = [] + for q_idx, res in enumerate(results): + try: + if isinstance(res, Info): + extracted_answer = res.content + else: + extracted_answer = res + correct_answers = answers[q_idx] + print(f"extracted_answer {extracted_answer}, correct_answers {correct_answers}") + em_score, f1_score = drop_metric(extracted_answer, correct_answers) + except Exception as e: + acc_list.append(0) + continue + + acc_list.append(f1_score) + + print(f"f1: {bootstrap_confidence_interval(acc_list)}") + import pdb; pdb.set_trace() + return acc_list + + + +@default_subscription +class ADASAgent(RoutedAgent): + """An agent that performs ADAS.""" + + def __init__(self, + model_client: ChatCompletionClient, + # system_prompt: str, + # evaluate_agent_type: str, + reflect_agent_type: str, + executor_agent_type: str, + args, + archive + ) -> None: + super().__init__("An agent searching agent.") + # self._system_messages: List[LLMMessage] = [ + # SystemMessage( + # content=system_prompt, + # ) + # ] + # self._evaluate_agent_id = AgentId(evaluate_agent_type, self.id.key) + self._reflect_agent_id = AgentId(reflect_agent_type, self.id.key) + self._executor_agent_id = AgentId(executor_agent_type, self.id.key) + self._args = args + self._archive = archive + self._model_client = model_client + self._session_memory: Dict[str, List[ADASTask | ADASResult]] = {} + + @message_handler + async def handle_adas_task(self, message: ADASTask, ctx: MessageContext) -> None: + # Store the messages in a temporary memory for this request only. 
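+        #
+        # At a high level, this handler: (1) loads the saved archive of agent
+        # designs from disk, or bootstraps it from the initial seeds, (2) scores
+        # any seed design that does not yet have a fitness value, and (3) for each
+        # generation asks the reflect agent for a new design, refines it with two
+        # reflexion passes, and evaluates the proposed forward() code.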
+ session_id = str(uuid.uuid4()) + self._session_memory.setdefault(session_id, []).append(message) + + # Process archive + file_path = os.path.join(args.save_dir, f"{args.expr_name}_run_archive.json") + if os.path.exists(file_path): + with open(file_path, 'r') as json_file: + archive = json.load(json_file) + if "generation" in archive[-1] and isinstance(archive[-1]['generation'], int): + start = archive[-1]['generation'] + else: + start = 0 + else: + archive = get_init_archive() + start = 0 + + for solution in archive: + if 'fitness' in solution: + continue + + solution['generation'] = "initial" + print(f"============Initial Archive: {solution['name']}=================") + try: + acc_list = evaluate_forward_fn(args, solution["code"]) + except Exception as e: + print("During evaluating initial archive:") + print(e) + continue + + fitness_str = bootstrap_confidence_interval(acc_list) + solution['fitness'] = fitness_str + + # save results + os.makedirs(os.path.dirname(file_path), exist_ok=True) + with open(file_path, 'w') as json_file: + json.dump(archive, json_file, indent=4) + + import pdb; pdb.set_trace() + # Initial prompt + for n in range(start, args.n_generation): + print(f"============Generation {n + 1}=================") + msg_list = [UserMessage(content=message.task, source=self.metadata["type"])] + import pdb; pdb.set_trace() + try: + response = await self.send_message(LLMMessageList(msg_list), self._reflect_agent_id) + Reflexion_prompt_1, Reflexion_prompt_2 = get_reflexion_prompt(self._archive[-1] if n > 0 else None) + + # Reflexion 1 + next_solution = response.json_content + new_messages = [ + AssistantMessage(content=str(next_solution), source=self.metadata["type"]), + UserMessage(content=Reflexion_prompt_1, source=self.metadata["type"]), + ] + response = await self.send_message(LLMMessageList(new_messages), AgentId('simple_reflect_agent', self.id.key)) + + # Reflexion 2 + next_solution = response.json_content + new_messages = [ + AssistantMessage(content=str(next_solution), source=self.metadata["type"]), + UserMessage(content=Reflexion_prompt_2, source=self.metadata["type"]), + ] + response = await self.send_message(LLMMessageList(new_messages), AgentId('simple_reflect_agent', self.id.key)) + except Exception as e: + # import pdb; pdb.set_trace() + print("During LLM generate new solution:") + print(e) + continue + + # TODO: Evaluate code + next_solution = response.json_content + print(f"final {str(next_solution)}") + import pdb; pdb.set_trace() + acc_list = evaluate_forward_fn(args, next_solution["code"]) + import pdb; pdb.set_trace() + + print("asdf") + # TODO: Maybe not... instantiate many agents to run eval. + # acc_list = await self.send_message(EvaluateTask(), self._evaluate_agent_id) + + +async def main(args) -> None: + runtime = SingleThreadedAgentRuntime() + client = get_chat_completion_client_from_envs(model="gpt-4o-mini") + archive = get_init_archive() + system_prompt, prompt = get_prompt(archive) + + # Create the reflect agent + await SimpleReflectAgent.register( + runtime, "simple_reflect_agent", lambda: SimpleReflectAgent( + description='Simple Reflect Agent', + model_client=client, + system_prompt=system_prompt, + ) + ) + + await ADASAgent.register( + runtime, "adas_agent", lambda: ADASAgent( + model_client=client, + args=args, + archive=archive, + reflect_agent_type='simple_reflect_agent', + executor_agent_type='executor', + ) + ) + + runtime.start() + + # Publish an initial message to trigger the ADAS search to start. 
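+    # ADASAgent is registered with @default_subscription, so publishing an
+    # ADASTask to the default topic is what kicks off handle_adas_task above.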
+ await runtime.publish_message( + message=ADASTask(task=prompt), + topic_id=DefaultTopicId(), + ) + + # Keep processing messages until idle. + await runtime.stop_when_idle() + + +# python packages/autogen-core/samples/common/adas/adas.py --data_filename=/home/andyye/ADAS/dataset/drop_v0_dev.jsonl.gz --valid_size=1 +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run ADAS") + parser.add_argument("--verbose", action="store_true", help="Enable verbose logging.") + parser.add_argument('--data_filename', type=str, default="dataset/drop_v0_dev.jsonl.gz") + parser.add_argument('--valid_size', type=int, default=128) + parser.add_argument('--test_size', type=int, default=800) + parser.add_argument('--shuffle_seed', type=int, default=0) + parser.add_argument('--n_repeat', type=int, default=1) + parser.add_argument('--multiprocessing', action='store_true', default=True) + parser.add_argument('--max_workers', type=int, default=48) + parser.add_argument('--debug', action='store_true', default=True) + parser.add_argument('--save_dir', type=str, default='results/') + parser.add_argument('--expr_name', type=str, default="drop_gpt3.5_results") + parser.add_argument('--n_generation', type=int, default=30) + parser.add_argument('--debug_max', type=int, default=3) + parser.add_argument('--model', + type=str, + default='gpt-4o-2024-05-13', + choices=['gpt-4-turbo-2024-04-09', 'gpt-3.5-turbo-0125', 'gpt-4o-2024-05-13']) + args = parser.parse_args() + if args.verbose: + logging.basicConfig(level=logging.WARNING) + logging.getLogger("autogen_core").setLevel(logging.DEBUG) + handler = logging.FileHandler("adas.log") + logging.getLogger("autogen_core").addHandler(handler) + + asyncio.run(main(args)) diff --git a/python/packages/autogen-core/samples/common/adas/adas_prompt.py b/python/packages/autogen-core/samples/common/adas/adas_prompt.py new file mode 100644 index 00000000000..79a1342895b --- /dev/null +++ b/python/packages/autogen-core/samples/common/adas/adas_prompt.py @@ -0,0 +1,1048 @@ +import json + +EXAMPLE = { + "thought": "**Insights:**\nYour insights on what should be the next interesting agent.\n**Overall Idea:**\nyour reasoning and the overall concept behind the agent design.\n**Implementation:**\ndescribe the implementation step by step.", + "name": "Name of your proposed agent", + "code": """def forward(self, taskInfo): + # Your code here + return answer +""" +} + +# COT = { +# "thought": "By encouraging the LLM to think step by step rather than directly outputting an answer, chain-of-thought reasoning enables complex problem-solving through intermediate steps. This practice improves the model's ability to handle tasks that require deeper reasoning and provides insight into its decision-making process.", +# "name": "Chain-of-Thought", +# "code": """def forward(self, taskInfo): +# # Instruction for the Chain-of-Thought (CoT) approach +# # It is an important practice that allows the LLM to think step by step before solving the task. +# cot_instruction = "Please think step by step and then solve the task." + +# # Instantiate a new LLM agent specifically for CoT +# # To allow LLM thinking before answering, we need to set an additional output field 'thinking'. 
+# cot_agent = LLMAgentBase(['thinking', 'answer'], 'Chain-of-Thought Agent') + +# # Prepare the inputs for the CoT agent +# # The input should be a list of Info, and the first one is often the taskInfo +# cot_agent_inputs = [taskInfo] + +# # Get the response from the CoT agent +# thinking, answer = cot_agent(cot_agent_inputs, cot_instruction) + +# # Return only the final answer +# return answer +# """ +# } + + +COT = { + "thought": "By encouraging the LLM to think step by step rather than directly outputting an answer, chain-of-thought reasoning enables complex problem-solving through intermediate steps. This practice improves the model's ability to handle tasks that require deeper reasoning and provides insight into its decision-making process.", + "name": "Chain-of-Thought", + "code": """def forward(self, task, agent_model_kwargs): + import asyncio + import logging + import json + from dataclasses import dataclass + import sys + from autogen_core.application import SingleThreadedAgentRuntime + from autogen_core.base import AgentId, AgentRuntime, MessageContext + from autogen_core.components import DefaultTopicId, RoutedAgent, message_handler, ClosureAgent, DefaultSubscription + from autogen_core.components.models import ( + ChatCompletionClient, + LLMMessage, + SystemMessage, + UserMessage, + ) + from autogen_ext.models import AzureOpenAIChatCompletionClient + from typing import List + from azure.identity import DefaultAzureCredential, get_bearer_token_provider + + token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default") + + # Create an AzureOpenAI model client. + model_client = AzureOpenAIChatCompletionClient( + model=agent_model_kwargs['model'], + api_version=agent_model_kwargs['api_version'], + azure_endpoint=agent_model_kwargs['azure_endpoint'], + azure_ad_token_provider=token_provider, + model_capabilities={ + "vision": True, + "function_calling": True, + "json_output": True, + }, + ) + + # Define message types as data classes + @dataclass + class ChainOfThoughtTask: + task: str + + + @dataclass + class FinalResult: + result: str + + + # Define the Chain-of-Thought Agent + class ChainOfThoughtAgent(RoutedAgent): + def __init__(self, description: str, + model_client: ChatCompletionClient, + system_prompt: str, + instruction: str, + ) -> None: + super().__init__(description) + self._system_messages: List[LLMMessage] = [ + SystemMessage( + content=system_prompt, + ) + ] + self._model_client = model_client + self._instruction = instruction + + @message_handler + async def handle_task(self, message: ChainOfThoughtTask, ctx: MessageContext) -> None: + + logging.info(f"{self._description} received message: {message.task}") + user_prompt = message.task + "\\n" + self._instruction + msgs = self._system_messages + [UserMessage(content=user_prompt, source=self.metadata["type"])] + model_result = await self._model_client.create(msgs) + assert isinstance(model_result.content, str) + + await self.publish_message( + message=FinalResult(model_result.content), + topic_id=DefaultTopicId(), + ) + + + # Define the main function to set up and run the agent system + async def main(): + + queue = asyncio.Queue[FinalResult]() + async def output_result(_runtime: AgentRuntime, id: AgentId, message: FinalResult, ctx: MessageContext) -> None: + await queue.put(message) + + # Initialize the agent runtime + runtime = SingleThreadedAgentRuntime() + + # Create the chain-of-thought agent + agent_id = AgentId("COTAgent", "default") + cot_instruction = 
"Please think step by step and then solve the task." + await ChainOfThoughtAgent.register( + runtime, "COTAgent", lambda: ChainOfThoughtAgent( + description='Chain-of-Thought Agent', + model_client=model_client, + system_prompt="You are a helpful assistant. Directly answer the question. Keep it very concise.", + instruction=cot_instruction, + ) + ) + await ClosureAgent.register(runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()]) + + runtime.start() + initial_message = ChainOfThoughtTask(task=task) + await runtime.send_message(initial_message, agent_id) # publish_message + await runtime.stop_when_idle() + + # Return the first answer from the queue + return (await queue.get()).result + + return asyncio.run(main()) +""" +} + +COT_SC = { + "thought": "While an LLM can arrive at the correct answer, its reasoning may vary. By repeatedly asking the same question with high temperature settings, we can generate different reasoning paths. We then combine multiple answers from these Chain-of-Thought (CoT) agents to produce a more accurate final answer through ensembling.", + "name": "Self-Consistency with Chain-of-Thought", + "code": """def forward(self, task, agent_model_kwargs): + import asyncio + import logging + import json + from dataclasses import dataclass + import sys + from autogen_core.application import SingleThreadedAgentRuntime + from autogen_core.base import AgentId, AgentRuntime, MessageContext + from autogen_core.components import DefaultTopicId, RoutedAgent, message_handler, ClosureAgent, DefaultSubscription + from autogen_core.components.models import ( + ChatCompletionClient, + LLMMessage, + SystemMessage, + UserMessage, + ) + from typing import List + from autogen_ext.models import AzureOpenAIChatCompletionClient + from azure.identity import DefaultAzureCredential, get_bearer_token_provider + + token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default") + + # Create an AzureOpenAI model client. + model_client = AzureOpenAIChatCompletionClient( + model=agent_model_kwargs['model'], + api_version=agent_model_kwargs['api_version'], + azure_endpoint=agent_model_kwargs['azure_endpoint'], + azure_ad_token_provider=token_provider, + model_capabilities={ + "vision": True, + "function_calling": True, + "json_output": True, + }, + ) + + @dataclass + class WorkerTask: + task: str + previous_results: List[str] + + + @dataclass + class WorkerTaskResult: + result: str + + + @dataclass + class UserTask: + task: str + + + @dataclass + class FinalResult: + result: str + + + class WorkerAgent(RoutedAgent): + def __init__( + self, + model_client: ChatCompletionClient, + instruction: str, + ) -> None: + super().__init__(description="Worker Agent") + self._model_client = model_client + self._instruction = instruction + + @message_handler + async def handle_task(self, message: WorkerTask, ctx: MessageContext) -> WorkerTaskResult: + user_prompt = message.task + "\\n" + self._instruction + + if message.previous_results: + # If previous results are provided, we need to synthesize them to create a single prompt. + # system_prompt = "You have been provided with a set of responses from various open-source models to the latest user query. Your task is to synthesize these responses into a single, high-quality response. It is crucial to critically evaluate the information provided in these responses, recognizing that some of it may be biased or incorrect. 
Your response should not simply replicate the given answers but should offer a refined, accurate, and comprehensive reply to the instruction. Ensure your response is well-structured, coherent, and adheres to the highest standards of accuracy and reliability.\\n\\nResponses from models:" + system_prompt = "Given all the solutions, reason over them carefully and provide a final answer." + system_prompt += "\\n" + "\\n\\n".join([f"{i+1}. {r}" for i, r in enumerate(message.previous_results)]) + model_result = await self._model_client.create( + [SystemMessage(system_prompt), UserMessage(content=user_prompt, source="user")] + ) + else: + # If no previous results are provided, we can simply pass the user query to the model. + model_result = await self._model_client.create([UserMessage(content=user_prompt, source="user")]) + assert isinstance(model_result.content, str) + print(f"{'-'*80}\\nWorker-{self.id}:\\n{model_result.content}") + return WorkerTaskResult(result=model_result.content) + + + class OrchestratorAgent(RoutedAgent): + def __init__( + self, + model_client: ChatCompletionClient, + worker_agent_types: List[str], + num_layers: int, + ) -> None: + super().__init__(description="Aggregator Agent") + self._model_client = model_client + self._worker_agent_types = worker_agent_types + self._num_layers = num_layers + + + @message_handler + async def handle_task(self, message: UserTask, ctx: MessageContext) -> FinalResult: + print(f"{'-'*80}\\nOrchestrator-{self.id}:\\nReceived task: {message.task}") + # Create task for the first layer. + worker_task = WorkerTask(task=message.task, previous_results=[]) + # Iterate over layers. + for i in range(self._num_layers): + # Assign workers for this layer. + worker_ids = [ + AgentId(worker_type, f"{self.id.key}/layer_{i}/worker_{j}") + for j, worker_type in enumerate(self._worker_agent_types) + ] + # Dispatch tasks to workers. + print(f"{'-'*80}\\nOrchestrator-{self.id}:\\nDispatch to workers at layer {i}") + results = await asyncio.gather(*[self.send_message(worker_task, worker_id) for worker_id in worker_ids]) + print(f"{'-'*80}\\nOrchestrator-{self.id}:\\nReceived results from workers at layer {i}") + # Prepare task for the next layer. + worker_task = WorkerTask(task=message.task, previous_results=[r.result for r in results]) + # Perform final aggregation. + print(f"{'-'*80}\\nOrchestrator-{self.id}:\\nPerforming final aggregation") + # system_prompt = "You have been provided with a set of responses from various open-source models to the latest user query. Your task is to synthesize these responses into a single, high-quality response. It is crucial to critically evaluate the information provided in these responses, recognizing that some of it may be biased or incorrect. Your response should not simply replicate the given answers but should offer a refined, accurate, and comprehensive reply to the instruction. Ensure your response is well-structured, coherent, and adheres to the highest standards of accuracy and reliability.\\n\\nResponses from models:" + system_prompt = "Given all the above solutions, reason over them carefully and provide a final answer." + system_prompt += "\\n" + "\\n\\n".join([f"{i+1}. 
{r}" for i, r in enumerate(worker_task.previous_results)]) + model_result = await self._model_client.create( + [SystemMessage(system_prompt), UserMessage(content=message.task, source="user")] + ) + assert isinstance(model_result.content, str) + return FinalResult(result=model_result.content) + + # Define the main function to set up and run the agent system + async def main(): + + # Initialize the agent runtime + runtime = SingleThreadedAgentRuntime() + + cot_instruction = "Please think step by step and then solve the task." + await WorkerAgent.register( + runtime, "worker", lambda: WorkerAgent(model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini"), instruction=cot_instruction) + ) + await OrchestratorAgent.register( + runtime, + "orchestrator", + lambda: OrchestratorAgent( + model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini"), worker_agent_types=["worker"] * 5, num_layers=1 + ), + ) + + runtime.start() + result = await runtime.send_message(UserTask(task=task), AgentId("orchestrator", "default")) + return result.result + + return asyncio.run(main()) +""" +} + +Reflexion = { + "thought": "To enhance its performance, an LLM can iteratively improve its answer based on feedback. By reflecting on its previous attempts and incorporating feedback, the model can refine its reasoning and provide a more accurate solution.", + "name": "Self-Refine (Reflexion)", + "code": '''def forward(self, task, agent_model_kwargs): + import asyncio + import json + import logging + import re + import sys + import uuid + from dataclasses import dataclass + from typing import Dict, List, Union + from autogen_core.base import MessageContext, TopicId, AgentId, AgentRuntime + from autogen_core.components import RoutedAgent, default_subscription, message_handler, TypeSubscription + from autogen_core.components.models import ( + AssistantMessage, + ChatCompletionClient, + LLMMessage, + SystemMessage, + UserMessage, + ) + from autogen_core.application import SingleThreadedAgentRuntime + from autogen_core.components import DefaultTopicId, RoutedAgent, message_handler, ClosureAgent + from autogen_ext.models import AzureOpenAIChatCompletionClient + from azure.identity import DefaultAzureCredential, get_bearer_token_provider + + token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default") + + # Create an AzureOpenAI model client. + model_client = AzureOpenAIChatCompletionClient( + model=agent_model_kwargs['model'], + api_version=agent_model_kwargs['api_version'], + azure_endpoint=agent_model_kwargs['azure_endpoint'], + azure_ad_token_provider=token_provider, + model_capabilities={ + "vision": True, + "function_calling": True, + "json_output": True, + }, + ) + + @dataclass + class WritingTask: + task: str + + + @dataclass + class WritingResult: + task: str + answer: str + review: str + + + @dataclass + class ReviewTask: + session_id: str + writing_task: str + answer_scratchpad: str + answer: str + + + @dataclass + class ReviewResult: + review: str + session_id: str + approved: bool + + + @default_subscription + class WorkerAgent(RoutedAgent): + "An agent that performs writing tasks." + + def __init__(self, + model_client: ChatCompletionClient, + instruction: str, + ) -> None: + super().__init__("A helpful assistant") + self._system_messages: List[LLMMessage] = [ + SystemMessage( + content="""You are a helpful assistant. Work with the critic to improve your answer. + Make sure to directly answer the question. Keep it very concise. 
+ Respond using the following format: + + Thoughts: + Answer: + """, + ) + ] + self._model_client = model_client + self._session_memory: Dict[str, List[WritingTask | ReviewTask | ReviewResult]] = {} + self._instruction = instruction + + @message_handler + async def handle_writing_task(self, message: WritingTask, ctx: MessageContext) -> None: + # Store the messages in a temporary memory for this request only. + session_id = str(uuid.uuid4()) + self._session_memory.setdefault(session_id, []).append(message) + # Generate a response using the chat completion API. + response = await self._model_client.create( + self._system_messages + [UserMessage(content=message.task + self._instruction, source=self.metadata["type"])], + cancellation_token=ctx.cancellation_token, + ) + assert isinstance(response.content, str) + # Extract the answer from the response. + answer = self._extract_answer(response.content) + if answer is None: + raise ValueError("Answer not found.") + # Create a review task. + review_task = ReviewTask( + session_id=session_id, + writing_task=message.task, + answer_scratchpad=response.content, + answer=answer, + ) + # Store the review task in the session memory. + self._session_memory[session_id].append(review_task) + # Publish a review task. + await self.publish_message(review_task, topic_id=TopicId("default", self.id.key)) + + @message_handler + async def handle_review_result(self, message: ReviewResult, ctx: MessageContext) -> None: + # Store the review result in the session memory. + self._session_memory[message.session_id].append(message) + # Obtain the request from previous messages. + review_request = next( + m for m in reversed(self._session_memory[message.session_id]) if isinstance(m, ReviewTask) + ) + assert review_request is not None + # Check if the is approved. + if message.approved: + # Publish the writing result. + await self.publish_message( + WritingResult( + answer=review_request.answer, + task=review_request.writing_task, + review=message.review, + ), + topic_id=TopicId("result", self.id.key), + ) + print("Writing Result:") + print("-" * 80) + print(f"Task:\\n{review_request.writing_task}") + print("-" * 80) + print(f"Answer:\\n{review_request.answer}") + print("-" * 80) + print(f"Review:\\n{message.review}") + print("-" * 80) + else: + # Create a list of LLM messages to send to the model. + messages: List[LLMMessage] = [*self._system_messages] + for m in self._session_memory[message.session_id]: + if isinstance(m, ReviewResult): + messages.append(UserMessage(content=m.review, source="Reviewer")) + elif isinstance(m, ReviewTask): + messages.append(AssistantMessage(content=m.answer_scratchpad, source="Worker")) + elif isinstance(m, WritingTask): + messages.append(UserMessage(content=m.task, source="User")) + else: + raise ValueError(f"Unexpected message type: {m}") + # Generate a revision using the chat completion API. + response = await self._model_client.create(messages, cancellation_token=ctx.cancellation_token) + assert isinstance(response.content, str) + # Extract the answer from the response. + answer = self._extract_answer(response.content) + if answer is None: + raise ValueError("Answer not found.") + # Create a new review task. + review_task = ReviewTask( + session_id=message.session_id, + writing_task=review_request.writing_task, + answer_scratchpad=response.content, + answer=answer, + ) + # Store the review task in the session memory. + self._session_memory[message.session_id].append(review_task) + # Publish a new review task. 
+ await self.publish_message(review_task, topic_id=TopicId("default", self.id.key)) + + + def _extract_answer(self, text: str) -> Union[str, None]: + pattern = "(?<=Answer: ).*" + # Search for the pattern in the markdown text + match = re.search(pattern, text, re.DOTALL) + # Extract the language and code block if a match is found + if match: + return match.group(0) + return None + + @default_subscription + class ReviewerAgent(RoutedAgent): + """An agent that critiques tasks.""" + + def __init__(self, model_client: ChatCompletionClient) -> None: + super().__init__("A critic agent.") + self._system_messages: List[LLMMessage] = [ + SystemMessage( + content="""You are a critic. Review answers and criticize on where it might be wrong. + Respond using the following JSON format: + { + "correctness": "", + "approval": "", + "suggested_changes": "" + } + """, + ) + ] + self._session_memory: Dict[str, List[ReviewTask | ReviewResult]] = {} + self._model_client = model_client + + @message_handler + async def handle_review_task(self, message: ReviewTask, ctx: MessageContext) -> None: + # Format the prompt for the review. + # Gather the previous feedback if available. + previous_feedback = "" + if message.session_id in self._session_memory: + previous_review = next( + (m for m in reversed(self._session_memory[message.session_id]) if isinstance(m, ReviewResult)), + None, + ) + if previous_review is not None: + previous_feedback = previous_review.review + # Store the messages in a temporary memory for this request only. + self._session_memory.setdefault(message.session_id, []).append(message) + prompt = f"""The problem statement is: {message.writing_task} + The answer is: + ``` + {message.answer} + ``` + + Previous feedback: + {previous_feedback} + + Please review the answer. If previous feedback was provided, see if it was addressed. + """ + # Generate a response using the chat completion API. + response = await self._model_client.create( + self._system_messages + [UserMessage(content=prompt, source=self.metadata["type"])], + cancellation_token=ctx.cancellation_token, + json_output=True, + ) + assert isinstance(response.content, str) + # TODO: use structured generation library e.g. guidance to ensure the response is in the expected format. + # Parse the response JSON. + review = json.loads(response.content) + # Construct the review text. + review_text = "Review:\\n" + "\\n".join([f"{k}: {v}" for k, v in review.items()]) + approved = review["approval"].lower().strip() == "approve" + result = ReviewResult( + review=review_text, + session_id=message.session_id, + approved=approved, + ) + # Store the review result in the session memory. + self._session_memory[message.session_id].append(result) + # Publish the review result. + await self.publish_message(result, topic_id=TopicId("default", self.id.key)) + + + # Define the main function to set up and run the agent system + async def main(): + queue = asyncio.Queue[WritingResult]() + async def output_result(_runtime: AgentRuntime, id: AgentId, message: WritingResult, ctx: MessageContext) -> None: + await queue.put(message) + + runtime = SingleThreadedAgentRuntime() + await ReviewerAgent.register( + runtime, "ReviewerAgent", lambda: ReviewerAgent(model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini")) + ) + cot_instruction = "Please think step by step and then solve the task." 
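+        # Register the worker (writer) agent; it drafts an answer and keeps
+        # revising it until the reviewer registered above approves.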
+ await WorkerAgent.register( + runtime, "WorkerAgent", lambda: WorkerAgent(model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini"), instruction=cot_instruction) + ) + result_topic = TypeSubscription(topic_type="result", agent_type="output_result") + await ClosureAgent.register(runtime, "output_result", output_result, subscriptions=lambda: [result_topic]) + + runtime.start() + await runtime.publish_message( + message=WritingTask(task=task), + topic_id=DefaultTopicId(), + ) + + # Keep processing messages until idle. + await runtime.stop_when_idle() + + # Return the answer from the queue + return (await queue.get()).answer + + return asyncio.run(main()) +''' +} + +LLM_debate = { + "thought": "By letting different LLMs debate with each other, we can leverage their diverse perspectives to find better solutions for tasks.", + "name": "LLM Debate", + "code": """def forward(self, taskInfo): + # Instruction for initial reasoning + debate_initial_instruction = "Please think step by step and then solve the task." + + # Instruction for debating and updating the solution based on other agents' solutions + debate_instruction = "Given solutions to the problem from other agents, consider their opinions as additional advice. Please think carefully and provide an updated answer." + + # Initialize debate agents with different roles and a moderate temperature for varied reasoning + debate_agents = [LLMAgentBase(['thinking', 'answer'], 'Debate Agent', temperature=0.8, role=role) for role in ['Reading Comprehension Specialist', 'Logical Reasoning Strategist', 'Multidisciplinary Knowledge Integrator']] + + # Instruction for final decision-making based on all debates and solutions + final_decision_instruction = "Given all the above thinking and answers, reason over them carefully and provide a final answer." + final_decision_agent = LLMAgentBase(['thinking', 'answer'], 'Final Decision Agent', temperature=0.1) + + max_round = 2 # Maximum number of debate rounds + all_thinking = [[] for _ in range(max_round)] + all_answer = [[] for _ in range(max_round)] + + # Perform debate rounds + for r in range(max_round): + for i in range(len(debate_agents)): + if r == 0: + thinking, answer = debate_agents[i]([taskInfo], debate_initial_instruction) + else: + input_infos = [taskInfo] + [all_thinking[r-1][i]] + all_thinking[r-1][:i] + all_thinking[r-1][i+1:] + thinking, answer = debate_agents[i](input_infos, debate_instruction) + all_thinking[r].append(thinking) + all_answer[r].append(answer) + + # Make the final decision based on all debate results and solutions + thinking, answer = final_decision_agent([taskInfo] + all_thinking[max_round-1] + all_answer[max_round-1], final_decision_instruction) + return answer +""" +} + +Take_a_step_back = {"thought": "Let LLM first think about the principles involved in solving this task which could be helpful. By understanding the underlying principles, the model can better reason through the problem and provide a more accurate solution.", + "name": "Step-back Abstraction", + "code": """def forward(self, taskInfo): + # Instruction for understanding the principles involved in the task + principle_instruction = "What are the physics, chemistry or biology principles and concepts involved in solving this task? First think step by step. Then list all involved principles and explain them." + + # Instruction for solving the task based on the principles + cot_instruction = "Given the question and the involved principle behind the question, think step by step and then solve the task." 
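+
+    # Flow: the principle agent runs first; its 'thinking' and 'principle'
+    # outputs are then passed, together with the task, to the CoT agent below.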
+ + # Instantiate LLM agents + principle_agent = LLMAgentBase(['thinking', 'principle'], 'Principle Agent') + cot_agent = LLMAgentBase(['thinking', 'answer'], 'Chain-of-Thought Agent') + + # Get the principles involved in the task + thinking, principle = principle_agent([taskInfo], principle_instruction) + + # Use the principles to solve the task + thinking, answer = cot_agent([taskInfo, thinking, principle], cot_instruction) + return answer +""" + } + +QD = {"thought": "Similar to Quality-Diversity methods, let LLM generate multiple diverse interesting solutions could help. By encouraging the model to explore different reasoning paths, we can increase the chances of finding the best solution.", + "name": "Quality-Diversity", + "code": """def forward(self, taskInfo): + # Instruction for initial reasoning + cot_initial_instruction = "Please think step by step and then solve the task." + + # Instruction for giving diverse answers + qd_instruction = "Given previous attempts, try to come up with another interesting way to solve the task." + cot_agent = LLMAgentBase(['thinking', 'answer'], 'Chain-of-Thought Agent') + + # Instruction for final decision-making based on collected reasoning and answers + final_decision_instruction = "Given all the above solutions, reason over them carefully and provide a final answer." + final_decision_agent = LLMAgentBase(['thinking', 'answer'], 'Final Decision Agent', temperature=0.1) + + N_max = 3 # Maximum number of attempts + + # Initial attempt + cot_inputs = [taskInfo] + possible_answers = [] + thinking, answer = cot_agent(cot_inputs, cot_initial_instruction, 0) + + # Add the answer to the list of possible answers + possible_answers.extend([thinking, answer]) + + for i in range(N_max): + # Reflect on previous attempts and generate another interesting answer + cot_inputs.extend([thinking, answer]) + + # Generate another interesting answer + thinking, answer = cot_agent(cot_inputs, qd_instruction, i + 1) + possible_answers.extend([thinking, answer]) + + # Make the final decision based on all generated answers + thinking, answer = final_decision_agent([taskInfo] + possible_answers, final_decision_instruction) + return answer +""" + } + +Role_Assignment = {"thought": "Similar to Auto-GPT and expert prompting, we can use dynamic control flow in the design to let the agent decide what expert we should use.", + "name": "Dynamic Assignment of Roles", + "code": """def forward(self, taskInfo): + # Instruction for step-by-step reasoning + cot_instruction = "Please think step by step and then solve the task." + expert_agents = [LLMAgentBase(['thinking', 'answer'], 'Expert Agent', role=role) for role in ['Reading Comprehension Specialist', 'Logical Reasoning Strategist', 'Multidisciplinary Knowledge Integrator', 'Helpful Assistant']] + + # Instruction for routing the task to the appropriate expert + routing_instruction = "Given the task, please choose an Expert to answer the question. Choose from: Math Professor, Grade School Teacher, Math Enthusiast." 
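+    # A lightweight router: a single-output agent whose 'choice' field decides
+    # which of the experts above answers; unrecognized choices fall back to the
+    # generalist assistant.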
+ routing_agent = LLMAgentBase(['choice'], 'Routing agent') + + # Get the choice of expert to route the task + choice = routing_agent([taskInfo], routing_instruction)[0] + + if 'professor' in choice.content.lower(): + expert_id = 0 + elif 'teacher' in choice.content.lower(): + expert_id = 1 + elif 'enthusiast' in choice.content.lower(): + expert_id = 2 + else: + expert_id = 3 # Default to helpful assistant + + thinking, answer = expert_agents[expert_id]([taskInfo], cot_instruction) + return answer +""" + } + +system_prompt = """You are a helpful assistant. Make sure to return in a WELL-FORMED JSON object.""" + +base = """# Overview +You are an expert machine learning researcher testing various agentic systems. Your objective is to design building blocks such as prompts and control flows within these systems to solve complex tasks. Your aim is to design an optimal agent performing well on the Reading Comprehension Benchmark Requiring Discrete Reasoning Over Paragraphs (DROP), which assesses the ability to perform discrete reasoning and comprehend detailed information across multiple paragraphs. + +## An example question from DROP: + +You will be asked to read a passage and answer a question. +Passage: +Non-nationals make up more than half of the population of Bahrain, with immigrants making up about 55% of the overall population. Of those, the vast majority come from South and Southeast Asia: according to various media reports and government statistics dated between 2005-2009 roughly 290,000 Indians, 125,000 Bangladeshis, 45,000 Pakistanis, 45,000 Filipinos, and 8,000 Indonesians.\nQuestion: What two nationalities had the same number of people living in Bahrain between 2005-2009? +Answer [Not Given]: +Pakistanis and Filipinos + + +# The utility code: + +```python +from collections import namedtuple +from typing import Union +import numpy as np +import json + +import openai +import backoff +from utils import random_id + +# Initialize the OpenAI client +client = openai.OpenAI() + +# Named tuple for holding task information +Info = namedtuple('Info', ['name', 'author', 'content', 'iteration_idx']) + +# Format instructions for LLM response +FORMAT_INST = lambda request_keys: f"Reply EXACTLY with the following JSON format.\n{str(request_keys)}\nDO NOT MISS ANY FIELDS AND MAKE SURE THE JSON FORMAT IS CORRECT!\n" + +# Description of the role for the LLM +ROLE_DESC = lambda role: f"You are a {role}." + +@backoff.on_exception(backoff.expo, openai.RateLimitError) +def get_json_response_from_gpt(msg, model, system_message, temperature=0.5): + \""" + Function to get JSON response from GPT model. + + Args: + - msg (str): The user message. + - model (str): The model to use. + - system_message (str): The system message. + - temperature (float): Sampling temperature. + + Returns: + - dict: The JSON response. + \""" + response = client.chat.completions.create( + model=model, + messages=[ + {"role": "system", "content": system_message}, + {"role": "user", "content": msg}, + ], + temperature=temperature, + max_tokens=1024, + stop=None, + response_format={"type": "json_object"} + ) + content = response.choices[0].message.content + json_dict = json.loads(content) + return json_dict + +class LLMAgentBase: + \""" + Base class for an LLM agent. + + Attributes: + - output_fields (list): Fields expected in the output. + - agent_name (str): Name of the agent. + - role (str): Role description for the agent. + - model (str): Model to be used. (option. Keep it default.) + - temperature (float): Sampling temperature. 
+ - id (str): Unique identifier for the agent instance. + \""" + + def __init__(self, output_fields: list, agent_name: str, role='helpful assistant', model='gpt-3.5-turbo-0125', temperature=0.5) -> None: + self.output_fields = output_fields + self.agent_name = agent_name + self.role = role + self.model = model + self.temperature = temperature + self.id = random_id() + + def generate_prompt(self, input_infos, instruction) -> str: + \""" + Generates a prompt for the LLM. + + Args: + - input_infos (list): List of input information. + - instruction (str): Instruction for the task. + + Returns: + - tuple: System prompt and user prompt. + + An example of a generated prompt: + "" + You are a helpful assistant. + + # Output Format: + Reply EXACTLY with the following JSON format. + ... + + # Your Task: + You will be given some number of paired example inputs and outputs. The outputs ... + + ### thinking #1 by Chain-of-Thought Agent hkFo (yourself): + ... + + ### code #1 by Chain-of-Thought Agent hkFo (yourself): + ... + + ### answer by Chain-of-Thought Agent hkFo's code evaluator:... + + + # Instruction: + Please think step by step and then solve the task by writing the code. + "" + \""" + output_fields_and_description = {key: f"Your {key}." if not 'answer' in key else f"Your {key}. Return ONLY the alphabet choice, i.e. A or B or C or D." for key in self.output_fields} + system_prompt = ROLE_DESC(self.role) + "\n\n" + FORMAT_INST(output_fields_and_description) + + input_infos_text = '' + for input_info in input_infos: + if isinstance(input_info, Info): + (field_name, author, content, iteration_idx) = input_info + else: + continue + if author == self.__repr__(): + author += ' (yourself)' + if field_name == 'task': + input_infos_text += f'# Your Task:\n{content}\n\n' + elif iteration_idx != -1: + input_infos_text += f'### {field_name} #{iteration_idx+1} by {author}:\n{content}\n\n' + else: + input_infos_text += f'### {field_name} by {author}:\n{content}\n\n' + + prompt = input_infos_text + instruction + return system_prompt, prompt + + def query(self, input_infos: list, instruction, iteration_idx=-1) -> list[Info]: + \""" + Queries the LLM with provided input information and instruction. + + Args: + - input_infos (list): List of input information. + - instruction (str): Instruction for the task. + - iteration_idx (int): Iteration index for the task. + + Returns: + - output_infos (list[Info]): Output information. + \""" + system_prompt, prompt = self.generate_prompt(input_infos, instruction) + response_json = get_json_response_from_gpt(prompt, self.model, system_prompt, self.temperature) + + output_infos = [] + for key, value in response_json.items(): + info = Info(key, self.__repr__(), value, iteration_idx) + output_infos.append(info) + return output_infos + + def __repr__(self): + return f"{self.agent_name} {self.id}" + + def __call__(self, input_infos: list, instruction, iteration_idx=-1): + # Note: + # The output of the LLM is a list of Info. If you are only querying one output, you should access it with [0]. + # It is a good practice to always include 'thinking' in the output. + return self.query(input_infos, instruction, iteration_idx=iteration_idx) + +class AgentArchitecture: + \""" + Fill in your code here. + \""" + def forward(self, taskInfo) -> Union[Info, str]: + \""" + Placeholder method for processing task information. + + Args: + - taskInfo (Info): Task information. + + Returns: + - Answer (Union[Info, str]): Your FINAL Answer. Return either a namedtuple Info or a string of answers. 
+ \""" + pass +``` +# Discovered architecture archive +Here is the archive of the discovered architectures: + +[ARCHIVE] + +The fitness value is the median and 95% Bootstrap Confidence Interval of the correct rate on a validation question set. Your GOAL is to maximize the "fitness". + +# Output Instruction and Example: +The first key should be ("thought"), and it should capture your thought process for designing the next function. In the "thought" section, first reason about what should be the next interesting agent to try, then describe your reasoning and the overall concept behind the agent design, and finally detail the implementation steps. +The second key ("name") corresponds to the name of your next agent architecture. +Finally, the last key ("code") corresponds to the exact “forward()” function in Python code that you would like to try. You must write a COMPLETE CODE in "code": Your code will be part of the entire project, so please implement complete, reliable, reusable code snippets. + +Here is an example of the output format for the next agent architecture: + +[EXAMPLE] + +You must use the exact function interface used above. You need to specify the instruction, input information, and the required output fields for various LLM agents to do their specific part of the architecture. +Also, it could be helpful to set the LLM’s role and temperature to further control the LLM’s response. Note that the LLMAgentBase() will automatically parse the output and return a list of “Infos”. You can get the content by Infos.content. +DO NOT FORGET the taskInfo input to LLM if you think it is needed, otherwise LLM will not know about the task. + +## WRONG Implementation examples: +Here are some mistakes you may make: + +1. This is WRONG: ``` +feedback, correct = critic_agent([taskInfo, thinking, answer], critic_instruction, i) +feedback_info = verifier_agent([taskInfo, Info('feedback', 'Critic Agent', thinking, 0)], verification_instruction) +``` +It is wrong to use "Info('feedback', 'Critic Agent', thinking, 0)". The returned "feedback" from LLMAgentBase is already Info. + +2. This is WRONG: ``` +# Debugging: Log the generated answer +print('Generated Answer:', ...) +feedback_info = verifier_agent([taskInfo, Info('feedback', 'Critic Agent', thinking, 0)], verification_instruction) +if len(feedback_info) < 3: # Check if feedback_info has enough elements + return 'Error: Feedback info incomplete' +``` +First, the len(feedback_info) will not work. +Second, you should never return an error message. You should always return the best answer you can get. +Third, you should never print anything in the code. +Lastly, again, DO NOT CREATE Info object by yourself. + +3. This is WRONG: ``` +all_thinking = [] +all_answers = [] +for agent, role in zip(agents, roles): + outputs = agent([taskInfo], independent_reasoning_instruction.format(role=role)) + all_thinking.append(outputs[0].content) + all_answers.append(outputs[1].content) + +# Aggregate the reasoning paths and answers +aggregated_thinking = '\n'.join(all_thinking) +aggregated_answers = '\n'.join(all_answers) +``` +You SHOULD NOT extract the content from the Info object by yourself. You should use the Info object directly. If you want to aggregate the content, you should just put those Info objects into a list and then use the list as input to the next LLM agent. + +4. 
This is WRONG: ``` +reasoning_agent = LLMAgentBase(['thinking', 'answer'], 'Reasoning Agent') +response_infos = reasoning_agent([taskInfo] + ..., reasoning_instruction) + +# Extract the final answer from the response_infos +for info in response_infos: + if info.name == 'final_answer': + return info +# Fallback if no answer is found +return Info('answer', 'Final Decision Agent', 'No answer generated.', 0) +``` +You should not extract the final answer by yourself. You SHOULD directly return the answer Info. Also, you should always return the best answer you can get. +CORRECT example: ``` +reasoning_agent = LLMAgentBase(['thinking', 'answer'], 'Reasoning Agent') +thinking, answer = reasoning_agent([taskInfo] + ..., reasoning_instruction) +return answer +``` + +# Your task +You are deeply familiar with prompting techniques and the agent works from the literature. Your goal is to maximize the specified performance metrics by proposing interestingly new agents. +Observe the discovered agents carefully and think about what insights, lessons, or stepping stones can be learned from them. +Be creative when thinking about the next interesting agent to try. You are encouraged to draw inspiration from related agent papers or academic papers from other research areas. +Use the knowledge from the archive and inspiration from academic literature to propose the next interesting agentic system design. +THINK OUTSIDE THE BOX. +""" + +Reflexion_prompt_1 = f""""[EXAMPLE]Carefully review the proposed new architecture and reflect on the following points: + +1. **Interestingness**: Assess whether your proposed architecture is interesting or innovative compared to existing methods in the archive. If you determine that the proposed architecture is not interesting, suggest a new architecture that addresses these shortcomings. +- Make sure to check the difference between the proposed architecture and previous attempts. +- Compare the proposal and the architectures in the archive CAREFULLY, including their actual differences in the implementation. +- Decide whether the current architecture is innovative. +- USE CRITICAL THINKING! + +2. **Implementation Mistakes**: Identify any mistakes you may have made in the implementation. Review the code carefully, debug any issues you find, and provide a corrected version. REMEMBER checking "## WRONG Implementation examples" in the prompt. + +3. **Improvement**: Based on the proposed architecture, suggest improvements in the detailed implementation that could increase its performance or effectiveness. In this step, focus on refining and optimizing the existing implementation without altering the overall design framework, except if you want to propose a different architecture if the current is not interesting. +- Observe carefully about whether the implementation is actually doing what it is supposed to do. +- Check if there is redundant code or unnecessary steps in the implementation. Replace them with effective implementation. +- Try to avoid the implementation being too similar to the previous agent. + +And then, you need to improve or revise the implementation, or implement the new proposed architecture based on the reflection. + +Your response should be organized as follows: + +"reflection": Provide your thoughts on the interestingness of the architecture, identify any mistakes in the implementation, and suggest improvements. + +"thought": Revise your previous proposal or propose a new architecture if necessary, using the same format as the example response. 
+ +"name": Provide a name for the revised or new architecture. (Don't put words like "new" or "improved" in the name.) + +"code": Provide the corrected code or an improved implementation. Make sure you actually implement your fix and improvement in this code. +""" + +Reflexion_prompt_2 = """Using the tips in "## WRONG Implementation examples" section, revise the code further. +Your response should be organized as follows: +Put your new reflection thinking in "reflection". Repeat the previous "thought" and "name", and update the corrected version of the code in "code". +""" + + +def get_init_archive(): + # return [COT]#, COT_SC, Reflexion, LLM_debate, Take_a_step_back, QD, Role_Assignment] + # return [COT_SC]#, COT_SC, Reflexion, LLM_debate, Take_a_step_back, QD, Role_Assignment] + return [Reflexion]#, COT_SC, Reflexion, LLM_debate, Take_a_step_back, QD, Role_Assignment] + + + +def get_prompt(current_archive, adaptive=False): + archive_str = ",\n".join([json.dumps(sol) for sol in current_archive]) + archive_str = f"[{archive_str}]" + prompt = base.replace("[ARCHIVE]", archive_str) + prompt = prompt.replace("[EXAMPLE]", json.dumps(EXAMPLE)) + + return system_prompt, prompt + + +def get_reflexion_prompt(prev_example): + prev_example_str = "Here is the previous agent you tried:\n" + json.dumps(prev_example) + "\n\n" + r1 = Reflexion_prompt_1.replace("[EXAMPLE]", prev_example_str) if prev_example else Reflexion_prompt_1.replace("[EXAMPLE]", "") + return r1, Reflexion_prompt_2 diff --git a/python/packages/autogen-core/samples/common/adas/utils.py b/python/packages/autogen-core/samples/common/adas/utils.py new file mode 100644 index 00000000000..5876c612790 --- /dev/null +++ b/python/packages/autogen-core/samples/common/adas/utils.py @@ -0,0 +1,296 @@ +# https://github.com/openai/simple-evals/blob/main/drop_eval.py +""" +DROP: A Reading Comprehension Benchmark Requiring Discrete Reasoning Over Paragraphs +Dheeru Dua, Yizhong Wang, Pradeep Dasigi, Gabriel Stanovsky, Sameer Singh, Matt Gardner +https://arxiv.org/abs/1903.00161 +""" + +import gzip +import json +import random +import re +import string +from typing import Any, Dict, List, Set, Tuple, Union + +import numpy as np +from scipy.optimize import linear_sum_assignment + + +def _remove_articles(text: str) -> str: + regex = re.compile(r"\b(a|an|the)\b", re.UNICODE) + return re.sub(regex, " ", text) + + +def _white_space_fix(text: str) -> str: + return " ".join(text.split()) + + +EXCLUDE = set(string.punctuation) + + +def _remove_punc(text: str) -> str: + if not _is_number(text): + return "".join(ch for ch in text if ch not in EXCLUDE) + else: + return text + + +def _lower(text: str) -> str: + return text.lower() + + +def _tokenize(text: str) -> List[str]: + return re.split(" |-", text) + + +def _normalize_answer(text: str) -> str: + """Lower text and remove punctuation, articles and extra whitespace.""" + + parts = [ + _white_space_fix(_remove_articles(_normalize_number(_remove_punc(_lower(token))))) + for token in _tokenize(text) + ] + parts = [part for part in parts if part.strip()] + normalized = " ".join(parts).strip() + return normalized + + +def _is_number(text: str) -> bool: + try: + float(text) + return True + except ValueError: + return False + + +def _normalize_number(text: str) -> str: + if _is_number(text): + return str(float(text)) + else: + return text + + +def _answer_to_bags( + answer: Union[str, List[str], Tuple[str, ...]] +) -> Tuple[List[str], List[Set[str]]]: + if isinstance(answer, (list, tuple)): + raw_spans = answer + 
else: + raw_spans = [answer] + normalized_spans: List[str] = [] + token_bags = [] + for raw_span in raw_spans: + normalized_span = _normalize_answer(raw_span) + normalized_spans.append(normalized_span) + token_bags.append(set(normalized_span.split())) + return normalized_spans, token_bags + + +def _align_bags(predicted: List[Set[str]], gold: List[Set[str]]) -> List[float]: + """ + Takes gold and predicted answer sets and first finds the optimal 1-1 alignment + between them and gets maximum metric values over all the answers. + """ + scores = np.zeros([len(gold), len(predicted)]) + for gold_index, gold_item in enumerate(gold): + for pred_index, pred_item in enumerate(predicted): + if _match_numbers_if_present(gold_item, pred_item): + scores[gold_index, pred_index] = _compute_f1(pred_item, gold_item) + row_ind, col_ind = linear_sum_assignment(-scores) + + max_scores = np.zeros([max(len(gold), len(predicted))]) + for row, column in zip(row_ind, col_ind): + max_scores[row] = max(max_scores[row], scores[row, column]) + return max_scores + + +def _compute_f1(predicted_bag: Set[str], gold_bag: Set[str]) -> float: + intersection = len(gold_bag.intersection(predicted_bag)) + if not predicted_bag: + precision = 1.0 + else: + precision = intersection / float(len(predicted_bag)) + if not gold_bag: + recall = 1.0 + else: + recall = intersection / float(len(gold_bag)) + f1 = ( + (2 * precision * recall) / (precision + recall) + if not (precision == 0.0 and recall == 0.0) + else 0.0 + ) * 100 + return f1 + + +def _match_numbers_if_present(gold_bag: Set[str], predicted_bag: Set[str]) -> bool: + gold_numbers = set() + predicted_numbers = set() + for word in gold_bag: + if _is_number(word): + gold_numbers.add(word) + for word in predicted_bag: + if _is_number(word): + predicted_numbers.add(word) + if (not gold_numbers) or gold_numbers.intersection(predicted_numbers): + return True + return False + + +def get_drop_metrics( + predicted: Union[str, List[str], Tuple[str, ...]], gold: Union[str, List[str], Tuple[str, ...]] +) -> Tuple[float, float]: + """ + Takes a predicted answer and a gold answer (that are both either a string or a list of + strings), and returns exact match and the DROP F1 metric for the prediction. If you are + writing a script for evaluating objects in memory (say, the output of predictions during + validation, or while training), this is the function you want to call, after using + :func:`answer_json_to_strings` when reading the gold answer from the released data file. + """ + predicted_bags = _answer_to_bags(predicted) + gold_bags = _answer_to_bags(gold) + + if set(predicted_bags[0]) == set(gold_bags[0]) and len(predicted_bags[0]) == len(gold_bags[0]): + exact_match = 1.0 + else: + exact_match = 0.0 + + f1_per_bag = _align_bags(predicted_bags[1], gold_bags[1]) + f1 = np.mean(f1_per_bag) + f1 = round(f1, 2) + return exact_match, f1 + + +def answer_json_to_strings(answer: Dict[str, Any]) -> Tuple[Tuple[str, ...], str]: + """ + Takes an answer JSON blob from the DROP data release and converts it into strings used for + evaluation. 
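+
+    Returns a tuple of (answer strings, answer type), where the answer type is one of
+    "number", "span", "spans", or "date".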
+ """ + if "number" in answer and answer["number"]: + return tuple([str(answer["number"])]), "number" + elif "spans" in answer and answer["spans"]: + return tuple(answer["spans"]), "span" if len(answer["spans"]) == 1 else "spans" + elif "date" in answer: + return ( + tuple( + [ + "{0} {1} {2}".format( + answer["date"]["day"], answer["date"]["month"], answer["date"]["year"] + ).strip() + ] + ), + "date", + ) + else: + raise ValueError( + f"Answer type not found, should be one of number, spans or date at: {json.dumps(answer)}" + ) + + +def answer_json_to_string(answer_json): + return json.dumps(answer_json_to_strings(answer_json)) + + +def normalize(s: str) -> str: + """Lower text and remove punctuation, articles and extra whitespace.""" + s = s.lower() + exclude = set(string.punctuation) + s = "".join(char for char in s if char not in exclude) + s = re.sub(r"\b(a|an|the)\b", " ", s) + s = " ".join(s.split()) + return s + + +def fuzzy_match(s1: str, s2: str) -> bool: + s1 = normalize(s1) + s2 = normalize(s2) + + if s1 == "" or s2 == "": + return s1 == s2 + + return s1 in s2 or s2 in s1 + + +def drop_metric(sample: str, reference: list[str]) -> Tuple[float, float]: + em_scores = [] + f1_scores = [] + for answer in reference: + if answer.strip() != "": + em, f1 = get_drop_metrics(sample, answer) + em_scores.append(em) + f1_scores.append(f1) + return (max(em_scores), max(f1_scores)) + + +def load_drop(file_path): + with gzip.open(file_path, mode="rb") as f: + test_samples = [json.loads(line) for line in f] + prompt = """You will be asked to read a passage and answer a question.\n""" + few_shot_prompt = """You will be asked to read a passage and answer a question. + +# Examples: +Passage: As of the census of 2000, there were 952 people, 392 households, and 241 families residing in the village. The population density was 952.9 people per square mile (367.6/km²). There were 449 housing units at an average density of 449.4 per square mile (173.4/km²). The racial makeup of the village was 96.11% White (U.S. Census), 0.95% African American (U.S. Census) or Race (United States Census), 0.11% Native American (U.S. Census), 0.11% Asian (U.S. Census), 0.21% from Race (United States Census), and 2.52% from two or more races. 1.05% of the population were Hispanics in the United States or Latino (U.S. Census) of any race.\nQuestion: How many more people, in terms of percentage, were from two or more races compared to being solely Native American or solely Asian?\nAnswer: 2.3 + +# Your Task +--- + +""" + examples = [] + for sample in test_samples: + sample['inputs'] = few_shot_prompt + sample['context'] + sample['targets'] = sample["ref_text"].split("|") + examples.append(sample) + return examples + + +def random_id(length=4): + characters = string.ascii_letters + string.digits # includes both upper/lower case letters and numbers + random_id = ''.join(random.choices(characters, k=length)) + return random_id + + +def bootstrap_confidence_interval(data, num_bootstrap_samples=100000, confidence_level=0.95): + """ + Calculate the bootstrap confidence interval for the mean of 1D accuracy data. + Also returns the median of the bootstrap means. + + Args: + - data (list or array of float): 1D list or array of data points. + - num_bootstrap_samples (int): Number of bootstrap samples. + - confidence_level (float): The desired confidence level (e.g., 0.95 for 95%). + + Returns: + - str: Formatted string with 95% confidence interval and median as percentages with one decimal place. 
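+
+    Example (illustrative; the exact numbers depend on the random resampling):
+        bootstrap_confidence_interval([71.2, 68.4, 75.0, 69.9])
+        # -> "95% Bootstrap Confidence Interval: (<lower>%, <upper>%), Median: <median>%"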
+    """
+    # Convert data to a numpy array for easier manipulation
+    data = np.array(data)
+
+    # List to store the means of bootstrap samples
+    bootstrap_means = []
+
+    # Generate bootstrap samples and compute the mean for each sample
+    for _ in range(num_bootstrap_samples):
+        # Resample with replacement
+        bootstrap_sample = np.random.choice(data, size=len(data), replace=True)
+        # Compute the mean of the bootstrap sample
+        bootstrap_mean = np.mean(bootstrap_sample)
+        bootstrap_means.append(bootstrap_mean)
+
+    # Convert bootstrap_means to a numpy array for percentile calculation
+    bootstrap_means = np.array(bootstrap_means)
+
+    # Compute the lower and upper percentiles for the confidence interval
+    # (np.percentile expects percentiles in the 0-100 range, not fractions)
+    lower_percentile = (1.0 - confidence_level) / 2.0
+    upper_percentile = 1.0 - lower_percentile
+    ci_lower = np.percentile(bootstrap_means, lower_percentile * 100)
+    ci_upper = np.percentile(bootstrap_means, upper_percentile * 100)
+
+    # Compute the median of the bootstrap means
+    median = np.median(bootstrap_means)
+
+    # Format to one decimal place (the scores are assumed to already be on a percentage scale)
+    ci_lower_percent = ci_lower
+    ci_upper_percent = ci_upper
+    median_percent = median
+
+    # Return the formatted string with confidence interval and median
+    return f"95% Bootstrap Confidence Interval: ({ci_lower_percent:.1f}%, {ci_upper_percent:.1f}%), Median: {median_percent:.1f}%"

From f4f9dce45c34c598e623f5376b8cdd7a2bccbae3 Mon Sep 17 00:00:00 2001
From: root
Date: Wed, 20 Nov 2024 16:42:45 -0500
Subject: [PATCH 2/3] wip

---
 .../autogen-core/samples/common/adas/adas.py  | 287 ++++-----
 .../samples/common/adas/adas_prompt.py        | 544 +++++++++++-------
 2 files changed, 474 insertions(+), 357 deletions(-)

diff --git a/python/packages/autogen-core/samples/common/adas/adas.py b/python/packages/autogen-core/samples/common/adas/adas.py
index 3a1c6bad78c..7bd4cc2b779 100644
--- a/python/packages/autogen-core/samples/common/adas/adas.py
+++ b/python/packages/autogen-core/samples/common/adas/adas.py
@@ -18,10 +18,15 @@
 from tqdm import tqdm
 import threading
 import random
+import numpy as np
+import requests
+from github import Github
 
-from autogen_agentchat.agents import CodeExecutorAgent, CodingAssistantAgent
-from autogen_core.base import AgentId, AgentType, AgentRuntime, CancellationToken, MessageContext, TopicId
 from autogen_core.components import RoutedAgent, default_subscription, message_handler
+from autogen_core.application import SingleThreadedAgentRuntime
+from autogen_core.base import AgentId, AgentType, AgentRuntime, CancellationToken, MessageContext, TopicId
+from autogen_core.components import DefaultTopicId
+from autogen_core.components.code_executor import CodeBlock, CodeExecutor, extract_markdown_code_blocks
 from autogen_core.components.models import (
     AssistantMessage,
     ChatCompletionClient,
@@ -29,16 +34,10 @@
     SystemMessage,
     UserMessage,
 )
-
-from autogen_core.application import SingleThreadedAgentRuntime
-from autogen_core.components import DefaultTopicId
-from autogen_core.components.models import OpenAIChatCompletionClient
-from autogen_core.components.tools import FunctionTool, PythonCodeExecutionTool, ToolSchema
 from autogen_core.components.tool_agent import ToolAgent, tool_agent_caller_loop
+from autogen_core.components.tools import FunctionTool, PythonCodeExecutionTool, ToolSchema
 from autogen_ext.code_executors import DockerCommandLineCodeExecutor #, extract_markdown_code_blocks
-from autogen_core.components.code_executor import CodeBlock, CodeExecutor, extract_markdown_code_blocks
 from autogen_magentic_one.utils import 
LogHandler -from autogen_core.application.logging import EVENT_LOGGER_NAME # TODO fix imports import sys @@ -57,32 +56,43 @@ SEARCHING_MODE = True +def read_github_file(url): + response = requests.get(url) + if response.status_code == 200: + return response.text + else: + return None -@dataclass -class CodeWritingTask: - task: str - - -@dataclass -class CodeWritingResult: - task: str - code: str - review: str +def print_repo_contents(repo, path="", indent=""): + contents = repo.get_contents(path) + documentation = [] + for content_file in contents: + if content_file.type == "dir": + documentation.extend(print_repo_contents(repo, content_file.path, indent + "│ ")) + else: + if content_file.download_url.endswith('.md'): + print(f"Reading file from {content_file.download_url}") + f = read_github_file(content_file.download_url) + documentation.append("Title: " + content_file.name + "\nContents:\n" + f) + return documentation -@dataclass -class CodeReviewTask: - session_id: str - code_writing_task: str - code_writing_scratchpad: str - code: str +def get_autogen_documentation(): + repo_name = "microsoft/autogen" + directory_name = "python/packages/autogen-core/docs/src/user-guide/core-user-guide" + g = Github() -@dataclass -class CodeReviewResult: - review: str - session_id: str - approved: bool + subdirectories = ['core-concepts', 'framework'] + documentation = [] + for subdir in subdirectories: + try: + repo = g.get_repo(repo_name) + documentation.extend(print_repo_contents(repo, directory_name + '/'+ subdir)) + except Exception as e: + print(f"Error: {e}") + print(f"Found {len(documentation)} pages of documentation") + return documentation @dataclass @@ -126,80 +136,11 @@ class LLMAgentBaseResponse: output: str -# An agent that makes a direct call to the model, and returns json -class SimpleReflectAgent(RoutedAgent): - def __init__(self, description: str, model_client: ChatCompletionClient, system_prompt: str) -> None: - super().__init__(description) - self._system_messages: List[LLMMessage] = [ - SystemMessage( - content=system_prompt, - ) - ] - self._chat_history: List[LLMMessage] = [] - self._model_client = model_client - self._cnt = 0 - - @message_handler - async def handle_task(self, message: LLMMessageList, ctx: MessageContext) -> SimpleReflectAgentResponse: - # logging.info(f"{self._description} received message: {message}") - # import pdb; pdb.set_trace() - # model_result = await self._model_client.create( - # self._system_messages + self._chat_history + message.llm_message_list - # ) - print(f"llm_message_list {len(message.llm_message_list)}") - self._chat_history.extend(message.llm_message_list) - - print(f"-----cnt {self._cnt}") - print(f"chat history {len(self._chat_history)}") - self._cnt += 1 - assert isinstance(model_result.content, str) - json_content = json.loads(model_result.content) - return SimpleReflectAgentResponse(json_content=json_content) - - @dataclass class Message: content: str -@default_subscription -class Assistant(RoutedAgent): - def __init__(self, model_client: ChatCompletionClient) -> None: - super().__init__("An assistant agent.") - self._model_client = model_client - self._chat_history: List[LLMMessage] = [ - SystemMessage( - content="""Write Python script in markdown block, and it will be executed. -Always save figures to file in the current directory. 
Do not use plt.show()""", - ) - ] - - @message_handler - async def handle_message(self, message: Message, ctx: MessageContext) -> None: - self._chat_history.append(UserMessage(content=message.content, source="user")) - result = await self._model_client.create(self._chat_history) - print(f"\n{'-'*80}\nAssistant:\n{result.content}") - self._chat_history.append(AssistantMessage(content=result.content, source="assistant")) # type: ignore - await self.publish_message(Message(content=result.content), DefaultTopicId()) # type: ignore - - -@default_subscription -class Executor(RoutedAgent): - def __init__(self, code_executor: CodeExecutor) -> None: - super().__init__("An executor agent.") - self._code_executor = code_executor - - @message_handler - async def handle_message(self, message: Message, ctx: MessageContext) -> None: - code_blocks = extract_markdown_code_blocks(message.content) - if code_blocks: - result = await self._code_executor.execute_code_blocks( - code_blocks, cancellation_token=ctx.cancellation_token - ) - print(f"\n{'-'*80}\nExecutor:\n{result.output}") - await self.publish_message(Message(content=result.output), DefaultTopicId()) - - class AgentSystem(): def __init__(self) -> None: pass @@ -269,9 +210,6 @@ def call_forward(agent_task_queue): task = generate_task([taskInfo]) # For magentic one using the create_completion_client_from_env() helper - # export CHAT_COMPLETION_PROVIDER='azure' - - agent_model_kwargs = {} result = agent.forward(task, agent_model_kwargs) @@ -297,7 +235,6 @@ def call_forward(agent_task_queue): acc_list.append(f1_score) print(f"f1: {bootstrap_confidence_interval(acc_list)}") - import pdb; pdb.set_trace() return acc_list @@ -308,10 +245,7 @@ class ADASAgent(RoutedAgent): def __init__(self, model_client: ChatCompletionClient, - # system_prompt: str, - # evaluate_agent_type: str, - reflect_agent_type: str, - executor_agent_type: str, + system_prompt: str, args, archive ) -> None: @@ -321,14 +255,45 @@ def __init__(self, # content=system_prompt, # ) # ] - # self._evaluate_agent_id = AgentId(evaluate_agent_type, self.id.key) - self._reflect_agent_id = AgentId(reflect_agent_type, self.id.key) - self._executor_agent_id = AgentId(executor_agent_type, self.id.key) + self._args = args self._archive = archive self._model_client = model_client self._session_memory: Dict[str, List[ADASTask | ADASResult]] = {} + # TODO(yeandy): Add this as a proper Tool https://microsoft.github.io/autogen/dev/user-guide/core-user-guide/framework/tools.html + # pip install pygithub + self._documentation = get_autogen_documentation() + + self._system_messages: List[LLMMessage] = [ + SystemMessage( + content=system_prompt('\n'.join(self._documentation)), + ) + ] + self._chat_history: List[LLMMessage] = [] + self._model_client = model_client + self._cnt = 0 + + @message_handler + async def handle_task(self, message: LLMMessageList, ctx: MessageContext) -> SimpleReflectAgentResponse: + logging.info(f"{self._description} received message: {message}") + model_result = await self._model_client.create( + # self._system_messages + self._chat_history + message.llm_message_list + self._system_messages + message.llm_message_list + + ) + print(f"llm_message_list {len(message.llm_message_list)}") + # self._chat_history.extend(message.llm_message_list) + + print(f"-----cnt {self._cnt}") + # print(f"chat history {len(self._chat_history)}") + self._cnt += 1 + assert isinstance(model_result.content, str) + print(f"model_result.content {model_result.content}") + json_content = 
json.loads(model_result.content) + print(f"finish converting to json") + return SimpleReflectAgentResponse(json_content=json_content) + @message_handler async def handle_adas_task(self, message: ADASTask, ctx: MessageContext) -> None: # Store the messages in a temporary memory for this request only. @@ -369,48 +334,102 @@ async def handle_adas_task(self, message: ADASTask, ctx: MessageContext) -> None with open(file_path, 'w') as json_file: json.dump(archive, json_file, indent=4) - import pdb; pdb.set_trace() # Initial prompt for n in range(start, args.n_generation): print(f"============Generation {n + 1}=================") msg_list = [UserMessage(content=message.task, source=self.metadata["type"])] import pdb; pdb.set_trace() try: - response = await self.send_message(LLMMessageList(msg_list), self._reflect_agent_id) + response = await self.send_message(LLMMessageList(msg_list), self.id) + next_solution = response.json_content Reflexion_prompt_1, Reflexion_prompt_2 = get_reflexion_prompt(self._archive[-1] if n > 0 else None) + print(f"Reflexion_prompt_1 {Reflexion_prompt_1}") + print(f"Reflexion_prompt_2 {Reflexion_prompt_2}") + print(f"@@After initial prompt {response}") # Reflexion 1 - next_solution = response.json_content - new_messages = [ + # new_messages = [ + # AssistantMessage(content=str(next_solution), source=self.metadata["type"]), + # UserMessage(content=Reflexion_prompt_1, source=self.metadata["type"]), + # ] + new_messages = msg_list + [ AssistantMessage(content=str(next_solution), source=self.metadata["type"]), UserMessage(content=Reflexion_prompt_1, source=self.metadata["type"]), ] - response = await self.send_message(LLMMessageList(new_messages), AgentId('simple_reflect_agent', self.id.key)) + response = await self.send_message(LLMMessageList(new_messages), self.id) + next_solution = response.json_content + print(f"@@After Reflexion_prompt_1 {response}") # Reflexion 2 - next_solution = response.json_content - new_messages = [ + # new_messages = [ + # AssistantMessage(content=str(next_solution), source=self.metadata["type"]), + # UserMessage(content=Reflexion_prompt_2, source=self.metadata["type"]), + # ] + new_messages = new_messages + [ AssistantMessage(content=str(next_solution), source=self.metadata["type"]), UserMessage(content=Reflexion_prompt_2, source=self.metadata["type"]), ] - response = await self.send_message(LLMMessageList(new_messages), AgentId('simple_reflect_agent', self.id.key)) + response = await self.send_message(LLMMessageList(new_messages), self.id) + next_solution = response.json_content + # next_solution = {'reflection': 'The previous code attempted to implement an ensemble approach with additional confidence estimation, but there were errors that needed addressing. Specifically:\n1. **Incorrect Use of `publish_message`:** The previously provided code misuses `self.publish_message()` in a context where the function signature might be misleading, as it requires `None` as its return.\n2. **Improper Handling of Topics and Message Types:** The correct usage for publishing and handling message types is essential, utilizing the proper `TopicId` syntax.\n3. **Incorrect Method for Calculating Confidence:** The confidence estimation implementation was overly simplistic, which could lead to skewed results. 
\n\nThe revised implementation corrects these issues and ensures compliance with best practices.', 'thought': '**Insights:**\nThe next iteration of the agent should refine on the concept of diversified reasoning by incorporating evaluative mechanisms within Worker Agents to self-assess their response confidence and determine when consensus should be approached collaboratively.\n\n**Overall Idea:**\nThe architecture can further benefit from introducing adaptive learning patterns, where Worker Agents adjust their reasoning strategies dynamically based on prior task ratings or other metadata. This enables a feedback loop that improves over time.\n\n**Implementation:**\n- Modify Worker Agents to give confidence ratings in their output.\n- Integrate an orchestrator that places more weight on outputs with higher confidence when synthesizing results.\n- Ensure message handling aligns with idiomatic usage of message types and topics, using `TopicId` properly.', 'name': 'Adaptive Diverse Ensemble', 'code': 'def forward(self, task, model_client_kwargs):\n import asyncio\n from dataclasses import dataclass\n from typing import List\n from collections import Counter\n from autogen_core.base import MessageContext, AgentId, AgentRuntime, TopicId\n from autogen_core.components import RoutedAgent, message_handler, ClosureAgent, TypeSubscription\n from autogen_core.components.models import ChatCompletionClient, LLMMessage, SystemMessage, UserMessage\n from autogen_core.application import SingleThreadedAgentRuntime\n from autogen_ext.models import AzureOpenAIChatCompletionClient\n from azure.identity import DefaultAzureCredential, get_bearer_token_provider\n\n token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default")\n\n # Create an AzureOpenAI model client.\n model_client = AzureOpenAIChatCompletionClient(\n model=model_client_kwargs[\'model\'],\n api_version=model_client_kwargs[\'api_version\'],\n azure_endpoint=model_client_kwargs[\'azure_endpoint\'],\n azure_ad_token_provider=token_provider,\n model_capabilities={\n "vision": True,\n "function_calling": True,\n "json_output": True,\n },\n )\n\n @dataclass\n class DiverseThoughtTask:\n task: str\n\n @dataclass\n class DiverseThoughtResult:\n result: str\n confidence: float\n\n # Define Diverse Worker Agent\n class DiverseWorkerAgent(RoutedAgent):\n def __init__(self, description: str, model_client: ChatCompletionClient, instruction: str) -> None:\n super().__init__(description)\n self._model_client = model_client\n self._instruction = instruction\n\n @message_handler\n async def handle_task(self, message: DiverseThoughtTask, ctx: MessageContext) -> None:\n user_prompt = message.task + "\\n" + self._instruction\n model_result = await self._model_client.create([UserMessage(content=user_prompt, source="worker_agent")])\n confidence = self.estimate_confidence(model_result.content)\n assert isinstance(model_result.content, str)\n await self.publish_message(DiverseThoughtResult(result=model_result.content, confidence=confidence), \n topic_id=TopicId("worker_results", self.id.key))\n\n def estimate_confidence(self, text: str) -> float:\n # Improved placeholder for actual confidence estimation method\n # Here, we can use sentiment analysis or other processing as an example\n return min(1.0, max(0.0, len(text) / 100.0))\n\n # Orchestrator Agent for Consensus\n class OrchestratorAgent(RoutedAgent):\n def __init__(self) -> None:\n super().__init__("Orchestrator for Diverse Thoughts")\n\n @message_handler\n 
async def handle_task(self, message: DiverseThoughtTask, ctx: MessageContext) -> None:\n worker_ids = [AgentId("worker_1", ctx.id.key), AgentId("worker_2", ctx.id.key), AgentId("worker_3", ctx.id.key)]\n results = await asyncio.gather(*[self.send_message(message, worker_id) for worker_id in worker_ids])\n combined_result = self.evaluate_results(results)\n await self.publish_message(DiverseThoughtResult(result=combined_result, confidence=1.0), \n topic_id=TopicId("diverse_result", "orchestrator"))\n\n def evaluate_results(self, results: List[DiverseThoughtResult]) -> str:\n # Implement advanced evaluation, here just demonstrating a weighted result selection based on confidence\n confidences = Counter()\n for res in results:\n confidences[res.result] += res.confidence\n return max(confidences, key=confidences.get)\n\n async def main():\n # Create a queue to collect final answers\n queue = asyncio.Queue[DiverseThoughtResult]()\n async def output_result(_runtime: AgentRuntime, id: AgentId, message: DiverseThoughtResult, ctx: MessageContext) -> None:\n await queue.put(message)\n\n # Initialize the agent runtime\n runtime = SingleThreadedAgentRuntime()\n\n # Register workers with various strategies\n strategies = ["utilize strict logical reasoning", "incorporate probabilistic reasoning", "focus on evidence-based reasoning"]\n for i, strat in enumerate(strategies, start=1):\n await DiverseWorkerAgent.register(\n runtime, f"worker_{i}", lambda strat=strat: DiverseWorkerAgent(\n description=f"Diverse Worker {i}", model_client=model_client, instruction=strat\n )\n )\n\n # Register Orchestrator\n await OrchestratorAgent.register(runtime, "orchestrator")\n\n # Create closure agent to collect final output result\n result_topic = TypeSubscription(topic_type="diverse_result", agent_type="output_result")\n await ClosureAgent.register(runtime, "output_result", output_result, subscriptions=lambda: [result_topic])\n\n # Start the runtime, and publish the first message\n runtime.start()\n await runtime.publish_message(\n message=DiverseThoughtTask(task=task),\n topic_id=TopicId("diverse", "orchestrator")\n )\n\n # Keep processing messages until idle.\n await runtime.stop_when_idle()\n\n # Return the first answer from the queue\n return (await queue.get()).result\n\n return asyncio.run(main())\n'} + print(f"@@After Reflexion_prompt_2 {next_solution}") except Exception as e: # import pdb; pdb.set_trace() print("During LLM generate new solution:") print(e) + import pdb; pdb.set_trace() + n -= 1 continue - # TODO: Evaluate code - next_solution = response.json_content - print(f"final {str(next_solution)}") - import pdb; pdb.set_trace() - acc_list = evaluate_forward_fn(args, next_solution["code"]) - import pdb; pdb.set_trace() - - print("asdf") - # TODO: Maybe not... instantiate many agents to run eval. - # acc_list = await self.send_message(EvaluateTask(), self._evaluate_agent_id) + import pdb; pdb.set_trace() + acc_list = [] + for _ in range(args.debug_max): + print(f"DEBUGGING") + try: + acc_list = evaluate_forward_fn(args, next_solution["code"]) + if np.mean(acc_list) < 0.01 and SEARCHING_MODE: + raise Exception("All 0 accuracy") + break + except Exception as e: + print("During evaluation:") + print(e) + next_solution = response.json_content + # new_messages = [ + # AssistantMessage(content=str(next_solution), source=self.metadata["type"]), + # UserMessage(content=f"Error during evaluation:\n{e}\nCarefully consider where you went wrong in your latest implementation. 
Using insights from previous attempts, try to debug the current code to implement the same thought. Repeat your previous thought in 'thought', and put your thinking for debugging in 'debug_thought'", source=self.metadata["type"]), + # ] + new_messages = new_messages + [ + AssistantMessage(content=str(next_solution), source=self.metadata["type"]), + UserMessage(content=f"Error during evaluation:\n{e}\nCarefully consider where you went wrong in your latest implementation. Using insights from previous attempts, try to debug the current code to implement the same thought. Repeat your previous thought in 'thought', and put your thinking for debugging in 'debug_thought'", source=self.metadata["type"]), + ] + try: + response = await self.send_message(LLMMessageList(new_messages), self.id) + next_solution = response.json_content + except Exception as e: + print("During LLM generate new solution:") + print(e) + import pdb; pdb.set_trace() + continue + continue + if not acc_list: + n -= 1 + continue + + import pdb; pdb.set_trace() + fitness_str = bootstrap_confidence_interval(acc_list) + next_solution['fitness'] = fitness_str + next_solution['generation'] = n + 1 + if 'debug_thought' in next_solution: + del next_solution['debug_thought'] + if 'reflection' in next_solution: + del next_solution['reflection'] + archive.append(next_solution) + + # save results + os.makedirs(os.path.dirname(file_path), exist_ok=True) + with open(file_path, 'w') as json_file: + json.dump(archive, json_file, indent=4) async def main(args) -> None: runtime = SingleThreadedAgentRuntime() @@ -418,22 +437,12 @@ async def main(args) -> None: archive = get_init_archive() system_prompt, prompt = get_prompt(archive) - # Create the reflect agent - await SimpleReflectAgent.register( - runtime, "simple_reflect_agent", lambda: SimpleReflectAgent( - description='Simple Reflect Agent', - model_client=client, - system_prompt=system_prompt, - ) - ) - await ADASAgent.register( runtime, "adas_agent", lambda: ADASAgent( model_client=client, + system_prompt=system_prompt, args=args, archive=archive, - reflect_agent_type='simple_reflect_agent', - executor_agent_type='executor', ) ) diff --git a/python/packages/autogen-core/samples/common/adas/adas_prompt.py b/python/packages/autogen-core/samples/common/adas/adas_prompt.py index 79a1342895b..0b577fbf240 100644 --- a/python/packages/autogen-core/samples/common/adas/adas_prompt.py +++ b/python/packages/autogen-core/samples/common/adas/adas_prompt.py @@ -37,7 +37,7 @@ COT = { "thought": "By encouraging the LLM to think step by step rather than directly outputting an answer, chain-of-thought reasoning enables complex problem-solving through intermediate steps. This practice improves the model's ability to handle tasks that require deeper reasoning and provides insight into its decision-making process.", "name": "Chain-of-Thought", - "code": """def forward(self, task, agent_model_kwargs): + "code": """def forward(self, task, model_client_kwargs): import asyncio import logging import json @@ -60,9 +60,9 @@ # Create an AzureOpenAI model client. 
model_client = AzureOpenAIChatCompletionClient( - model=agent_model_kwargs['model'], - api_version=agent_model_kwargs['api_version'], - azure_endpoint=agent_model_kwargs['azure_endpoint'], + model=model_client_kwargs['model'], + api_version=model_client_kwargs['api_version'], + azure_endpoint=model_client_kwargs['azure_endpoint'], azure_ad_token_provider=token_provider, model_capabilities={ "vision": True, @@ -116,6 +116,7 @@ async def handle_task(self, message: ChainOfThoughtTask, ctx: MessageContext) -> # Define the main function to set up and run the agent system async def main(): + # Create a queue to collect final answer queue = asyncio.Queue[FinalResult]() async def output_result(_runtime: AgentRuntime, id: AgentId, message: FinalResult, ctx: MessageContext) -> None: await queue.put(message) @@ -134,11 +135,15 @@ async def output_result(_runtime: AgentRuntime, id: AgentId, message: FinalResul instruction=cot_instruction, ) ) + # Create closure agent to collect final output result await ClosureAgent.register(runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()]) + # Start the runtime, and publish the first message runtime.start() initial_message = ChainOfThoughtTask(task=task) await runtime.send_message(initial_message, agent_id) # publish_message + + # Keep processing messages until idle. await runtime.stop_when_idle() # Return the first answer from the queue @@ -151,7 +156,7 @@ async def output_result(_runtime: AgentRuntime, id: AgentId, message: FinalResul COT_SC = { "thought": "While an LLM can arrive at the correct answer, its reasoning may vary. By repeatedly asking the same question with high temperature settings, we can generate different reasoning paths. We then combine multiple answers from these Chain-of-Thought (CoT) agents to produce a more accurate final answer through ensembling.", "name": "Self-Consistency with Chain-of-Thought", - "code": """def forward(self, task, agent_model_kwargs): + "code": """def forward(self, task, model_client_kwargs): import asyncio import logging import json @@ -174,9 +179,9 @@ async def output_result(_runtime: AgentRuntime, id: AgentId, message: FinalResul # Create an AzureOpenAI model client. model_client = AzureOpenAIChatCompletionClient( - model=agent_model_kwargs['model'], - api_version=agent_model_kwargs['api_version'], - azure_endpoint=agent_model_kwargs['azure_endpoint'], + model=model_client_kwargs['model'], + api_version=model_client_kwargs['api_version'], + azure_endpoint=model_client_kwargs['azure_endpoint'], azure_ad_token_provider=token_provider, model_capabilities={ "vision": True, @@ -284,20 +289,24 @@ async def main(): # Initialize the agent runtime runtime = SingleThreadedAgentRuntime() + # Create the agents cot_instruction = "Please think step by step and then solve the task." 
await WorkerAgent.register( - runtime, "worker", lambda: WorkerAgent(model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini"), instruction=cot_instruction) + runtime, "worker", lambda: WorkerAgent(model_client=model_client, instruction=cot_instruction) ) await OrchestratorAgent.register( runtime, "orchestrator", lambda: OrchestratorAgent( - model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini"), worker_agent_types=["worker"] * 5, num_layers=1 + model_client=model_client, worker_agent_types=["worker"] * 5, num_layers=1 ), ) + # Start the runtime, and publish the first message runtime.start() result = await runtime.send_message(UserTask(task=task), AgentId("orchestrator", "default")) + + # Return the result return result.result return asyncio.run(main()) @@ -307,7 +316,7 @@ async def main(): Reflexion = { "thought": "To enhance its performance, an LLM can iteratively improve its answer based on feedback. By reflecting on its previous attempts and incorporating feedback, the model can refine its reasoning and provide a more accurate solution.", "name": "Self-Refine (Reflexion)", - "code": '''def forward(self, task, agent_model_kwargs): + "code": '''def forward(self, task, model_client_kwargs): import asyncio import json import logging @@ -334,9 +343,9 @@ async def main(): # Create an AzureOpenAI model client. model_client = AzureOpenAIChatCompletionClient( - model=agent_model_kwargs['model'], - api_version=agent_model_kwargs['api_version'], - azure_endpoint=agent_model_kwargs['azure_endpoint'], + model=model_client_kwargs['model'], + api_version=model_client_kwargs['api_version'], + azure_endpoint=model_client_kwargs['azure_endpoint'], azure_ad_token_provider=token_provider, model_capabilities={ "vision": True, @@ -409,8 +418,6 @@ async def handle_writing_task(self, message: WritingTask, ctx: MessageContext) - assert isinstance(response.content, str) # Extract the answer from the response. answer = self._extract_answer(response.content) - if answer is None: - raise ValueError("Answer not found.") # Create a review task. review_task = ReviewTask( session_id=session_id, @@ -468,8 +475,6 @@ async def handle_review_result(self, message: ReviewResult, ctx: MessageContext) assert isinstance(response.content, str) # Extract the answer from the response. answer = self._extract_answer(response.content) - if answer is None: - raise ValueError("Answer not found.") # Create a new review task. review_task = ReviewTask( session_id=message.session_id, @@ -564,21 +569,27 @@ async def handle_review_task(self, message: ReviewTask, ctx: MessageContext) -> # Define the main function to set up and run the agent system async def main(): + # Create a queue to collect final answer queue = asyncio.Queue[WritingResult]() async def output_result(_runtime: AgentRuntime, id: AgentId, message: WritingResult, ctx: MessageContext) -> None: await queue.put(message) + # Initialize the agent runtime runtime = SingleThreadedAgentRuntime() + + # Create agents await ReviewerAgent.register( - runtime, "ReviewerAgent", lambda: ReviewerAgent(model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini")) + runtime, "ReviewerAgent", lambda: ReviewerAgent(model_client=model_client) ) cot_instruction = "Please think step by step and then solve the task." 
await WorkerAgent.register( - runtime, "WorkerAgent", lambda: WorkerAgent(model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini"), instruction=cot_instruction) + runtime, "WorkerAgent", lambda: WorkerAgent(model_client=model_client, instruction=cot_instruction) ) + # Create closure agent to collect final output result result_topic = TypeSubscription(topic_type="result", agent_type="output_result") await ClosureAgent.register(runtime, "output_result", output_result, subscriptions=lambda: [result_topic]) + # Start the runtime, and publish the first message runtime.start() await runtime.publish_message( message=WritingTask(task=task), @@ -588,7 +599,8 @@ async def output_result(_runtime: AgentRuntime, id: AgentId, message: WritingRes # Keep processing messages until idle. await runtime.stop_when_idle() - # Return the answer from the queue + # Return the first answer from the queue + print(f"queue {queue}") return (await queue.get()).answer return asyncio.run(main()) @@ -721,7 +733,13 @@ async def output_result(_runtime: AgentRuntime, id: AgentId, message: WritingRes """ } -system_prompt = """You are a helpful assistant. Make sure to return in a WELL-FORMED JSON object.""" +system_prompt = lambda formatted_documentation: f"""You are a helpful assistant. You have an expert understanding of the AutoGen framework, and how to use the Python API. The API documentation are as follows: + +{formatted_documentation} + +This is the end of the documentation. + +Make sure to return in a WELL-FORMED JSON object. Do not add any code blocks around the JSON object.""" base = """# Overview You are an expert machine learning researcher testing various agentic systems. Your objective is to design building blocks such as prompts and control flows within these systems to solve complex tasks. Your aim is to design an optimal agent performing well on the Reading Comprehension Benchmark Requiring Discrete Reasoning Over Paragraphs (DROP), which assesses the ability to perform discrete reasoning and comprehend detailed information across multiple paragraphs. @@ -738,176 +756,23 @@ async def output_result(_runtime: AgentRuntime, id: AgentId, message: WritingRes # The utility code: ```python -from collections import namedtuple -from typing import Union -import numpy as np -import json - -import openai -import backoff -from utils import random_id - -# Initialize the OpenAI client -client = openai.OpenAI() -# Named tuple for holding task information Info = namedtuple('Info', ['name', 'author', 'content', 'iteration_idx']) -# Format instructions for LLM response -FORMAT_INST = lambda request_keys: f"Reply EXACTLY with the following JSON format.\n{str(request_keys)}\nDO NOT MISS ANY FIELDS AND MAKE SURE THE JSON FORMAT IS CORRECT!\n" - -# Description of the role for the LLM -ROLE_DESC = lambda role: f"You are a {role}." - -@backoff.on_exception(backoff.expo, openai.RateLimitError) -def get_json_response_from_gpt(msg, model, system_message, temperature=0.5): - \""" - Function to get JSON response from GPT model. - - Args: - - msg (str): The user message. - - model (str): The model to use. - - system_message (str): The system message. - - temperature (float): Sampling temperature. - - Returns: - - dict: The JSON response. 
- \""" - response = client.chat.completions.create( - model=model, - messages=[ - {"role": "system", "content": system_message}, - {"role": "user", "content": msg}, - ], - temperature=temperature, - max_tokens=1024, - stop=None, - response_format={"type": "json_object"} - ) - content = response.choices[0].message.content - json_dict = json.loads(content) - return json_dict - -class LLMAgentBase: - \""" - Base class for an LLM agent. - - Attributes: - - output_fields (list): Fields expected in the output. - - agent_name (str): Name of the agent. - - role (str): Role description for the agent. - - model (str): Model to be used. (option. Keep it default.) - - temperature (float): Sampling temperature. - - id (str): Unique identifier for the agent instance. - \""" - - def __init__(self, output_fields: list, agent_name: str, role='helpful assistant', model='gpt-3.5-turbo-0125', temperature=0.5) -> None: - self.output_fields = output_fields - self.agent_name = agent_name - self.role = role - self.model = model - self.temperature = temperature - self.id = random_id() - - def generate_prompt(self, input_infos, instruction) -> str: - \""" - Generates a prompt for the LLM. - - Args: - - input_infos (list): List of input information. - - instruction (str): Instruction for the task. - - Returns: - - tuple: System prompt and user prompt. - - An example of a generated prompt: - "" - You are a helpful assistant. - - # Output Format: - Reply EXACTLY with the following JSON format. - ... - - # Your Task: - You will be given some number of paired example inputs and outputs. The outputs ... - - ### thinking #1 by Chain-of-Thought Agent hkFo (yourself): - ... - - ### code #1 by Chain-of-Thought Agent hkFo (yourself): - ... - - ### answer by Chain-of-Thought Agent hkFo's code evaluator:... - - - # Instruction: - Please think step by step and then solve the task by writing the code. - "" - \""" - output_fields_and_description = {key: f"Your {key}." if not 'answer' in key else f"Your {key}. Return ONLY the alphabet choice, i.e. A or B or C or D." for key in self.output_fields} - system_prompt = ROLE_DESC(self.role) + "\n\n" + FORMAT_INST(output_fields_and_description) - - input_infos_text = '' - for input_info in input_infos: - if isinstance(input_info, Info): - (field_name, author, content, iteration_idx) = input_info - else: - continue - if author == self.__repr__(): - author += ' (yourself)' - if field_name == 'task': - input_infos_text += f'# Your Task:\n{content}\n\n' - elif iteration_idx != -1: - input_infos_text += f'### {field_name} #{iteration_idx+1} by {author}:\n{content}\n\n' - else: - input_infos_text += f'### {field_name} by {author}:\n{content}\n\n' - - prompt = input_infos_text + instruction - return system_prompt, prompt - - def query(self, input_infos: list, instruction, iteration_idx=-1) -> list[Info]: - \""" - Queries the LLM with provided input information and instruction. - - Args: - - input_infos (list): List of input information. - - instruction (str): Instruction for the task. - - iteration_idx (int): Iteration index for the task. - - Returns: - - output_infos (list[Info]): Output information. 
- \""" - system_prompt, prompt = self.generate_prompt(input_infos, instruction) - response_json = get_json_response_from_gpt(prompt, self.model, system_prompt, self.temperature) - - output_infos = [] - for key, value in response_json.items(): - info = Info(key, self.__repr__(), value, iteration_idx) - output_infos.append(info) - return output_infos - - def __repr__(self): - return f"{self.agent_name} {self.id}" - - def __call__(self, input_infos: list, instruction, iteration_idx=-1): - # Note: - # The output of the LLM is a list of Info. If you are only querying one output, you should access it with [0]. - # It is a good practice to always include 'thinking' in the output. - return self.query(input_infos, instruction, iteration_idx=iteration_idx) - class AgentArchitecture: \""" Fill in your code here. \""" - def forward(self, taskInfo) -> Union[Info, str]: + def forward(self, task, model_client_kwargs) -> str: \""" Placeholder method for processing task information. Args: - - taskInfo (Info): Task information. + - task (Info): Task information. + - model_client_kwargs (Dict): Information for the AzureOpenAIChatCompletionClient Returns: - - Answer (Union[Info, str]): Your FINAL Answer. Return either a namedtuple Info or a string of answers. + - Answer (str): Your FINAL Answer. Return a string of answers. \""" pass ``` @@ -928,61 +793,300 @@ def forward(self, taskInfo) -> Union[Info, str]: [EXAMPLE] You must use the exact function interface used above. You need to specify the instruction, input information, and the required output fields for various LLM agents to do their specific part of the architecture. -Also, it could be helpful to set the LLM’s role and temperature to further control the LLM’s response. Note that the LLMAgentBase() will automatically parse the output and return a list of “Infos”. You can get the content by Infos.content. -DO NOT FORGET the taskInfo input to LLM if you think it is needed, otherwise LLM will not know about the task. +Also, it could be helpful to set the LLM’s role to further control the LLM’s response. +DO NOT FORGET the `task` input to LLM if you think it is needed, otherwise LLM will not know about the task. ## WRONG Implementation examples: Here are some mistakes you may make: 1. This is WRONG: ``` -feedback, correct = critic_agent([taskInfo, thinking, answer], critic_instruction, i) -feedback_info = verifier_agent([taskInfo, Info('feedback', 'Critic Agent', thinking, 0)], verification_instruction) + +@default_subscription +class WorkerAgent(RoutedAgent): + def __init__(self): + pass + + @message_handler + async def handle_writing_task(self, message: WritingTask, ctx: MessageContext) -> None: + pass + +async def main(): + # Create a queue to collect final answer + queue = asyncio.Queue[FinalResult]() + async def output_result(_runtime: AgentRuntime, id: AgentId, message: FinalResult, ctx: MessageContext) -> None: + await queue.put(message) + + runtime = SingleThreadedAgentRuntime() + await ReviewerAgent.register( + runtime, "ReviewerAgent", lambda: ReviewerAgent(model_client=model_client) + ) + cot_instruction = "Please think step by step and then solve the task." 
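+    # Note: WorkerAgent below is decorated with @default_subscription, which is what
+    # clashes with the ClosureAgent's DefaultSubscription registered further down.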
+ await WorkerAgent.register( + runtime, "WorkerAgent", lambda: WorkerAgent(model_client=model_client, instruction=cot_instruction) + ) + # Create closure agent to collect final output result + await ClosureAgent.register(runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()]) + + runtime.start() + await runtime.publish_message( + message=WritingTask(task=task), + topic_id=DefaultTopicId(), + ) + + # Keep processing messages until idle. + await runtime.stop_when_idle() + + # Return the answer from the queue + return (await queue.get()).answer + +return asyncio.run(main()) ``` -It is wrong to use "Info('feedback', 'Critic Agent', thinking, 0)". The returned "feedback" from LLMAgentBase is already Info. +Because the WorkerAgent is subscribed to the `@default_subscription` topic, then there will be conflicts for the ClosureAgent to collect the WritingResult from the same default subscription. Create a new topic using TypeSubscription(topic_type="result", agent_type="output_result") to make this work. 2. This is WRONG: ``` -# Debugging: Log the generated answer -print('Generated Answer:', ...) -feedback_info = verifier_agent([taskInfo, Info('feedback', 'Critic Agent', thinking, 0)], verification_instruction) -if len(feedback_info) < 3: # Check if feedback_info has enough elements - return 'Error: Feedback info incomplete' +async def main(): + + # Initialize the agent runtime + runtime = SingleThreadedAgentRuntime() + + # Create the agents + cot_instruction = "Please think step by step and then solve the task." + await WorkerAgent.register( + runtime, "worker", lambda: WorkerAgent(model_client=model_client, instruction=cot_instruction) + ) + await OrchestratorAgent.register( + runtime, + "orchestrator", + lambda: OrchestratorAgent( + model_client=model_client, worker_agent_types=["worker"] * 5, num_layers=1 + ), + ) + + # Start the runtime, and publish the first message + runtime.start() + result = await runtime.send_message(UserTask(task=task), AgentId("orchestrator", "default")) + + # Return the result + return result.result + +return main() ``` -First, the len(feedback_info) will not work. -Second, you should never return an error message. You should always return the best answer you can get. -Third, you should never print anything in the code. -Lastly, again, DO NOT CREATE Info object by yourself. +The `main()` function needs to be called with `asyncio.run(main())` 3. 
This is WRONG: ``` -all_thinking = [] -all_answers = [] -for agent, role in zip(agents, roles): - outputs = agent([taskInfo], independent_reasoning_instruction.format(role=role)) - all_thinking.append(outputs[0].content) - all_answers.append(outputs[1].content) - -# Aggregate the reasoning paths and answers -aggregated_thinking = '\n'.join(all_thinking) -aggregated_answers = '\n'.join(all_answers) +# Define the Chain-of-Thought Agent +class ChainOfThoughtAgent(RoutedAgent): + def __init__(self, description: str, + model_client: ChatCompletionClient, + system_prompt: str, + instruction: str, + ) -> None: + super().__init__(description) + self._system_messages: List[LLMMessage] = [ + SystemMessage( + content=system_prompt, + ) + ] + self._model_client = model_client + self._instruction = instruction + + @message_handler + async def handle_task(self, message: ChainOfThoughtTask, ctx: MessageContext) -> FinalResult: + + logging.info(f"{self._description} received message: {message.task}") + user_prompt = message.task + "\\n" + self._instruction + msgs = self._system_messages + [UserMessage(content=user_prompt, source=self.metadata["type"])] + model_result = await self._model_client.create(msgs) + assert isinstance(model_result.content, str) + + await self.publish_message( + message=FinalResult(model_result.content), + topic_id=DefaultTopicId(), + ) ``` -You SHOULD NOT extract the content from the Info object by yourself. You should use the Info object directly. If you want to aggregate the content, you should just put those Info objects into a list and then use the list as input to the next LLM agent. +Any call with `self.publish_message()` will always return None, so make sure to set the output type of the `handle_task` function as `None`. Example: `async def handle_task(self, message: ChainOfThoughtTask, ctx: MessageContext) -> None:`. 4. This is WRONG: ``` -reasoning_agent = LLMAgentBase(['thinking', 'answer'], 'Reasoning Agent') -response_infos = reasoning_agent([taskInfo] + ..., reasoning_instruction) +class OrchestratorAgent(RoutedAgent): + def __init__( + self, + model_client: ChatCompletionClient, + worker_agent_types: List[str], + num_layers: int, + ) -> None: + super().__init__(description="Aggregator Agent") + self._model_client = model_client + self._worker_agent_types = worker_agent_types + self._num_layers = num_layers + + + @message_handler + async def handle_task(self, message: UserTask, ctx: MessageContext) -> None: + print(f"{'-'*80}\\nOrchestrator-{self.id}:\\nReceived task: {message.task}") + # Create task for the first layer. + worker_task = WorkerTask(task=message.task, previous_results=[]) + # Iterate over layers. + for i in range(self._num_layers): + # Assign workers for this layer. + worker_ids = [ + AgentId(worker_type, f"{self.id.key}/layer_{i}/worker_{j}") + for j, worker_type in enumerate(self._worker_agent_types) + ] + # Dispatch tasks to workers. + print(f"{'-'*80}\\nOrchestrator-{self.id}:\\nDispatch to workers at layer {i}") + results = await asyncio.gather(*[self.send_message(worker_task, worker_id) for worker_id in worker_ids]) + print(f"{'-'*80}\\nOrchestrator-{self.id}:\\nReceived results from workers at layer {i}") + # Prepare task for the next layer. + worker_task = WorkerTask(task=message.task, previous_results=[r.result for r in results]) + # Perform final aggregation. 
+            print(f"{'-'*80}\\nOrchestrator-{self.id}:\\nPerforming final aggregation")
+            # system_prompt = "You have been provided with a set of responses from various open-source models to the latest user query. Your task is to synthesize these responses into a single, high-quality response. It is crucial to critically evaluate the information provided in these responses, recognizing that some of it may be biased or incorrect. Your response should not simply replicate the given answers but should offer a refined, accurate, and comprehensive reply to the instruction. Ensure your response is well-structured, coherent, and adheres to the highest standards of accuracy and reliability.\\n\\nResponses from models:"
+            system_prompt = "Given all the above solutions, reason over them carefully and provide a final answer."
+            system_prompt += "\\n" + "\\n\\n".join([f"{i+1}. {r}" for i, r in enumerate(worker_task.previous_results)])
+            model_result = await self._model_client.create(
+                [SystemMessage(system_prompt), UserMessage(content=message.task, source="user")]
+            )
+            assert isinstance(model_result.content, str)
+            return FinalResult(result=model_result.content)
+```
+Directly returning a message dataclass `FinalResult` requires setting the return type of the `handle_task` function to `FinalResult`. Example: `async def handle_task(self, message: UserTask, ctx: MessageContext) -> FinalResult:`.
+
+5. This is WRONG: ```
+    # Main orchestration
+    async def main():
+        runtime = SingleThreadedAgentRuntime()
+
+        # Register agents
+        await RetrieverAgent.register(runtime, "retriever_agent")
+        await ValidatorAgent.register(runtime, "validator_agent")
+        await ReasoningAgent.register(runtime, "reasoning_agent", lambda: ReasoningAgent(model_client=model_client))
+
+        # Start runtime
+        runtime.start()
+        task_data = task.content if isinstance(task, Info) else task  # Assuming task contains raw question
+        await runtime.publish_message(task_data, AgentId("retriever_agent", "default"))
+
+        # Stop when idle
+        await runtime.stop_when_idle()
+
+    return asyncio.run(main())
+```
+The first argument to `publish_message` or `send_message` should not be an `Info` object or any other arbitrary object. It must be a Message dataclass, which has a format similar to: ```
+@dataclass
+class Message:
+    content: str
+```
+
+6. This is WRONG: ```
+await ctx.publish(AdaptiveResult(result=response.content), topic_id=ctx.default_topic_id())
+```
+Publishing should be called with `self.publish_message()`.
+
+7. This is WRONG: ```
+await ClosureAgent.register(runtime, "final_collection", collect_final_result, subscriptions=[TypeSubscription("consensus_result", "consensus_agent")])
+```
+The argument passed to `subscriptions` should not be a list. It should be a lambda function that returns a list. For example: ```
+await ClosureAgent.register(runtime, "final_collection", collect_final_result, subscriptions=lambda: [TypeSubscription("consensus_result", "consensus_agent")])
+```
+
+8. This is WRONG: ```
+await runtime.publish_message(Task(content='What is the highest mountain in the world?'), topic_id=TypeSubscription("initial_task", "worker_agent").topic_id())
+```
+The `topic_id` needs to be a `TopicId` or a `DefaultTopicId` object. For example: ```
+await runtime.publish_message(Task(content='What is the highest mountain in the world?'), topic_id=TopicId(topic_type, source=self.id.key))
+```
+or ```
+await runtime.publish_message(Task(content='What is the highest mountain in the world?'), topic_id=TopicId(user_topic_type, source=session_id))
+```
+or ```
+await runtime.publish_message(Task(content='What is the highest mountain in the world?'), topic_id=DefaultTopicId())
+```
+
+8. This is WRONG: ```
+await OrchestratorAgent.register(runtime, "orchestrator")
+```
+You will encounter this error "TypeError: BaseAgent.register() missing 1 required positional argument: 'factory'". The correct solution is: ```
+await OrchestratorAgent.register(runtime, "orchestrator", lambda: OrchestratorAgent())
+```
+
+9. This is WRONG: ```
+class OrchestratorAgent(RoutedAgent):
+    pass
-# Extract the final answer from the response_infos
-for info in response_infos:
-    if info.name == 'final_answer':
-        return info
-# Fallback if no answer is found
-return Info('answer', 'Final Decision Agent', 'No answer generated.', 0)
+async def main():
+    await OrchestratorAgent.register(runtime, "orchestrator", lambda: OrchestratorAgent())
+
+    await runtime.publish_message(
+        message=DiverseThoughtTask(task='What is the most creative art medium?'),
+        topic_id=TopicId("diverse", "orchestrator")
+    )
+```
+You must register subscriptions with the agent runtime through the `add_subscription` method.
+```
+async def main():
+    await OrchestratorAgent.register(runtime, "orchestrator", lambda: OrchestratorAgent())
+    await runtime.add_subscription(TypeSubscription("orchestrator_type", "orchestrator"))
+
+    await runtime.publish_message(
+        message=DiverseThoughtTask(task='What is the most creative art medium?'),
+        topic_id=TopicId(type="orchestrator_type")
+    )
```
-You should not extract the final answer by yourself. You SHOULD directly return the answer Info. Also, you should always return the best answer you can get.
-CORRECT example: ```
-reasoning_agent = LLMAgentBase(['thinking', 'answer'], 'Reasoning Agent')
-thinking, answer = reasoning_agent([taskInfo] + ..., reasoning_instruction)
-return answer
+Now, you can publish directly to a specific topic through the runtime.
+
+## CORRECT Implementation examples:
+Here are some correct patterns you should follow:
+
+1. This is CORRECT: ```
+from azure.identity import DefaultAzureCredential, get_bearer_token_provider
+token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default")
+
+# Create an AzureOpenAI model client.
+model_client = AzureOpenAIChatCompletionClient(
+    model=model_client_kwargs['model'],
+    api_version=model_client_kwargs['api_version'],
+    azure_endpoint=model_client_kwargs['azure_endpoint'],
+    azure_ad_token_provider=token_provider,
+    model_capabilities={
+        "vision": True,
+        "function_calling": True,
+        "json_output": True,
+    },
+)
+```
+This creates the model client using the `model_client_kwargs` dictionary.
+
+2. This is CORRECT: ```
+    async def main():
+        # Create a queue to collect final answer
+        queue = asyncio.Queue[WritingResult]()
+        async def output_result(_runtime: AgentRuntime, id: AgentId, message: WritingResult, ctx: MessageContext) -> None:
+            await queue.put(message)
+
+        # Initialize the agent runtime
+        runtime = SingleThreadedAgentRuntime()
+
+        # Create agents
+
+        # Create closure agent to collect final output result
+        result_topic = TypeSubscription(topic_type="result", agent_type="output_result")
+        await ClosureAgent.register(runtime, "output_result", output_result, subscriptions=lambda: [result_topic])
+
+        # Start the runtime, and publish the first message
+        runtime.start()
+        await runtime.publish_message()
+
+        # Keep processing messages until idle.
+        await runtime.stop_when_idle()
+
+        # Return the first answer from the queue
+        print(f"queue {queue}")
+        return (await queue.get()).answer
+
+    return asyncio.run(main())
+```
+This is the format for the `main` function. Make sure that when creating a `ClosureAgent`, you have created a `queue` from which you can call `return (await queue.get()).answer` at the very end of the `main` function. The datatype of the Queue should be the final message type that the agent system publishes to indicate that it is terminating.
+The `result_topic` should have a unique `topic_type`, which can be called "result".

# Your task
You are deeply familiar with prompting techniques and agent designs from the literature. Your goal is to maximize the specified performance metrics by proposing interesting new agents.
@@ -992,6 +1096,9 @@ def forward(self, taskInfo) -> Union[Info, str]:
 THINK OUTSIDE THE BOX.
 """
+# Documentation: https://github.com/microsoft/autogen/tree/main/python/packages/autogen-core/docs/src/user-guide/core-user-guide
+
+
 Reflexion_prompt_1 = f""""[EXAMPLE]Carefully review the proposed new architecture and reflect on the following points:
 1. **Interestingness**: Assess whether your proposed architecture is interesting or innovative compared to existing methods in the archive. If you determine that the proposed architecture is not interesting, suggest a new architecture that addresses these shortcomings.
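For quick reference, here is a minimal, self-contained sketch of the ClosureAgent-plus-queue pattern that the CORRECT example above describes (the example intentionally leaves its `publish_message()` call without arguments). The `Task`, `Answer`, and `EchoAgent` names are hypothetical placeholders; only the registration, subscription, and publish calls already shown in this prompt are assumed.
```
# Hypothetical end-to-end sketch of the ClosureAgent + queue collection pattern.
# Task, Answer, and EchoAgent are illustrative stand-ins, not part of the patch.
import asyncio
from dataclasses import dataclass

from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import AgentId, AgentRuntime, MessageContext, TopicId
from autogen_core.components import (
    ClosureAgent,
    DefaultTopicId,
    RoutedAgent,
    TypeSubscription,
    default_subscription,
    message_handler,
)


@dataclass
class Task:
    content: str


@dataclass
class Answer:
    content: str


@default_subscription
class EchoAgent(RoutedAgent):
    """Toy agent that answers a Task by publishing to the 'result' topic."""

    def __init__(self) -> None:
        super().__init__("An agent that echoes the task back as the answer.")

    @message_handler
    async def handle_task(self, message: Task, ctx: MessageContext) -> None:
        await self.publish_message(
            Answer(content=f"Echo: {message.content}"),
            topic_id=TopicId("result", self.id.key),
        )


async def main() -> str:
    # Queue that the closure agent fills with the terminating message.
    queue = asyncio.Queue[Answer]()

    async def output_result(_runtime: AgentRuntime, id: AgentId, message: Answer, ctx: MessageContext) -> None:
        await queue.put(message)

    runtime = SingleThreadedAgentRuntime()
    await EchoAgent.register(runtime, "echo_agent", lambda: EchoAgent())

    # Closure agent collects the final Answer published to the 'result' topic.
    result_topic = TypeSubscription(topic_type="result", agent_type="output_result")
    await ClosureAgent.register(runtime, "output_result", output_result, subscriptions=lambda: [result_topic])

    runtime.start()
    await runtime.publish_message(Task(content="What is the highest mountain in the world?"), DefaultTopicId())
    await runtime.stop_when_idle()
    return (await queue.get()).content
```
The queue is the only channel out of the runtime: the closure agent pushes the terminating `Answer` into it, and `main` returns whatever it finds there once `stop_when_idle()` completes.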
@@ -1029,7 +1136,8 @@ def forward(self, taskInfo) -> Union[Info, str]: def get_init_archive(): # return [COT]#, COT_SC, Reflexion, LLM_debate, Take_a_step_back, QD, Role_Assignment] # return [COT_SC]#, COT_SC, Reflexion, LLM_debate, Take_a_step_back, QD, Role_Assignment] - return [Reflexion]#, COT_SC, Reflexion, LLM_debate, Take_a_step_back, QD, Role_Assignment] + # return [Reflexion]#, COT_SC, Reflexion, LLM_debate, Take_a_step_back, QD, Role_Assignment] + return [COT, COT_SC, Reflexion] # LLM_debate, Take_a_step_back, QD, Role_Assignment] From f2ed2381c5d35bd4c22c0bfbdeddcddbaf03dae1 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 22 Nov 2024 15:31:58 -0500 Subject: [PATCH 3/3] update --- .../autogen-core/samples/common/adas/adas.py | 2 +- .../samples/common/adas/adas_prompt.py | 330 ++++++++++++++++-- 2 files changed, 302 insertions(+), 30 deletions(-) diff --git a/python/packages/autogen-core/samples/common/adas/adas.py b/python/packages/autogen-core/samples/common/adas/adas.py index 7bd4cc2b779..ff7c21666c3 100644 --- a/python/packages/autogen-core/samples/common/adas/adas.py +++ b/python/packages/autogen-core/samples/common/adas/adas.py @@ -71,7 +71,7 @@ def print_repo_contents(repo, path="", indent=""): if content_file.type == "dir": documentation.extend(print_repo_contents(repo, content_file.path, indent + "│ ")) else: - if content_file.download_url.endswith('.md'): + if content_file.download_url.endswith('.md') or content_file.download_url.endswith('.ipynb'): print(f"Reading file from {content_file.download_url}") f = read_github_file(content_file.download_url) documentation.append("Title: " + content_file.name + "\nContents:\n" + f) diff --git a/python/packages/autogen-core/samples/common/adas/adas_prompt.py b/python/packages/autogen-core/samples/common/adas/adas_prompt.py index 0b577fbf240..11889c08888 100644 --- a/python/packages/autogen-core/samples/common/adas/adas_prompt.py +++ b/python/packages/autogen-core/samples/common/adas/adas_prompt.py @@ -610,39 +610,262 @@ async def output_result(_runtime: AgentRuntime, id: AgentId, message: WritingRes LLM_debate = { "thought": "By letting different LLMs debate with each other, we can leverage their diverse perspectives to find better solutions for tasks.", "name": "LLM Debate", - "code": """def forward(self, taskInfo): - # Instruction for initial reasoning - debate_initial_instruction = "Please think step by step and then solve the task." 
+ "code": '''def forward(self, task, model_client_kwargs): + import asyncio + import json + import logging + import re + import sys + import uuid + from dataclasses import dataclass + from typing import Dict, List, Union + from autogen_core.base import MessageContext, TopicId, AgentId, AgentRuntime + from autogen_core.components import RoutedAgent, default_subscription, message_handler, TypeSubscription + from autogen_core.components.models import ( + AssistantMessage, + ChatCompletionClient, + LLMMessage, + SystemMessage, + UserMessage, + ) + from autogen_core.application import SingleThreadedAgentRuntime + from autogen_core.components import DefaultTopicId, RoutedAgent, message_handler, ClosureAgent + from autogen_ext.models import AzureOpenAIChatCompletionClient + from azure.identity import DefaultAzureCredential, get_bearer_token_provider + + token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default") - # Instruction for debating and updating the solution based on other agents' solutions - debate_instruction = "Given solutions to the problem from other agents, consider their opinions as additional advice. Please think carefully and provide an updated answer." + # Create an AzureOpenAI model client. + model_client = AzureOpenAIChatCompletionClient( + model=model_client_kwargs['model'], + api_version=model_client_kwargs['api_version'], + azure_endpoint=model_client_kwargs['azure_endpoint'], + azure_ad_token_provider=token_provider, + model_capabilities={ + "vision": True, + "function_calling": True, + "json_output": True, + }, + ) - # Initialize debate agents with different roles and a moderate temperature for varied reasoning - debate_agents = [LLMAgentBase(['thinking', 'answer'], 'Debate Agent', temperature=0.8, role=role) for role in ['Reading Comprehension Specialist', 'Logical Reasoning Strategist', 'Multidisciplinary Knowledge Integrator']] + @dataclass + class Question: + content: str - # Instruction for final decision-making based on all debates and solutions - final_decision_instruction = "Given all the above thinking and answers, reason over them carefully and provide a final answer." - final_decision_agent = LLMAgentBase(['thinking', 'answer'], 'Final Decision Agent', temperature=0.1) - max_round = 2 # Maximum number of debate rounds - all_thinking = [[] for _ in range(max_round)] - all_answer = [[] for _ in range(max_round)] + @dataclass + class Answer: + content: str + + + @dataclass + class SolverRequest: + content: str + question: str + + + @dataclass + class IntermediateSolverResponse: + content: str + question: str + answer: str + round: int + + + @dataclass + class FinalSolverResponse: + answer: str + + @default_subscription + class Solver(RoutedAgent): + def __init__(self, model_client: ChatCompletionClient, topic_type: str, num_neighbors: int, max_round: int) -> None: + super().__init__("A debator.") + self._topic_type = topic_type + self._model_client = model_client + self._num_neighbors = num_neighbors + self._history: List[LLMMessage] = [] + self._buffer: Dict[int, List[IntermediateSolverResponse]] = {} + self._system_messages = [ + SystemMessage( + ( + "You are a helpful assistant with expertise in reasoning. " + "Your task is to assist in solving a reasoning problem by providing " + "a clear and detailed solution. Limit your output within 100 words, " + "and your final answer should be a single string." 
+ ) + ) + ] + self._round = 0 + self._max_round = max_round - # Perform debate rounds - for r in range(max_round): - for i in range(len(debate_agents)): - if r == 0: - thinking, answer = debate_agents[i]([taskInfo], debate_initial_instruction) + @message_handler + async def handle_request(self, message: SolverRequest, ctx: MessageContext) -> None: + # Add the question to the memory. + self._history.append(UserMessage(content=message.content, source="user")) + # Make an inference using the model. + model_result = await self._model_client.create(self._system_messages + self._history) + assert isinstance(model_result.content, str) + # Add the response to the memory. + self._history.append(AssistantMessage(content=model_result.content, source=self.metadata["type"])) + print(f"{'-'*80}\\nSolver {self.id} round {self._round}:\\n{model_result.content}") + # Increment the counter. + self._round += 1 + if self._round == self._max_round: + # If the counter reaches the maximum round, publishes a final response. + await self.publish_message(FinalSolverResponse(answer=model_result.content), topic_id=DefaultTopicId()) else: - input_infos = [taskInfo] + [all_thinking[r-1][i]] + all_thinking[r-1][:i] + all_thinking[r-1][i+1:] - thinking, answer = debate_agents[i](input_infos, debate_instruction) - all_thinking[r].append(thinking) - all_answer[r].append(answer) - - # Make the final decision based on all debate results and solutions - thinking, answer = final_decision_agent([taskInfo] + all_thinking[max_round-1] + all_answer[max_round-1], final_decision_instruction) - return answer -""" + # Publish intermediate response to the topic associated with this solver. + print("publish IntermediateSolverResponse") + await self.publish_message( + IntermediateSolverResponse( + content=model_result.content, + question=message.question, + answer=model_result.content, + round=self._round, + ), + topic_id=DefaultTopicId(type=self._topic_type), + ) + + @message_handler + async def handle_response(self, message: IntermediateSolverResponse, ctx: MessageContext) -> None: + # Add neighbor's response to the buffer. + self._buffer.setdefault(message.round, []).append(message) + # Check if all neighbors have responded. + if len(self._buffer[message.round]) == self._num_neighbors: + print( + f"{'-'*80}\\nSolver {self.id} round {message.round}:\\nReceived all responses from {self._num_neighbors} neighbors." + ) + # Prepare the prompt for the next question. + prompt = "These are the solutions to the problem from other agents:\\n" + for resp in self._buffer[message.round]: + prompt += f"One agent solution: {resp.content}\\n" + prompt += ( + "Using the solutions from other agents as additional information, " + "can you provide your answer to the problem? " + f"The original problem is {message.question}. " + "Your final answer should be a single string." + ) + # Send the question to the agent itself to solve. + await self.send_message(SolverRequest(content=prompt, question=message.question), self.id) + # Clear the buffer. 
+ self._buffer.pop(message.round) + + + @default_subscription + class Aggregator(RoutedAgent): + def __init__(self, num_solvers: int) -> None: + super().__init__("Aggregator") + self._num_solvers = num_solvers + self._buffer: List[FinalSolverResponse] = [] + + @message_handler + async def handle_question(self, message: Question, ctx: MessageContext) -> None: + print(f"{'-'*80}\\nAggregator {self.id} received question:\\n{message.content}") + prompt = ( + f"Can you solve the following problem?\\n{message.content}\\n" + "Explain your reasoning. Your final answer should be a single string." + ) + print(f"{'-'*80}\\nAggregator {self.id} publishes initial solver request.") + await self.publish_message(SolverRequest(content=prompt, question=message.content), topic_id=DefaultTopicId()) + + @message_handler + async def handle_final_solver_response(self, message: FinalSolverResponse, ctx: MessageContext) -> None: + self._buffer.append(message) + if len(self._buffer) == self._num_solvers: + print(f"{'-'*80}\\nAggregator {self.id} received all final answers from {self._num_solvers} solvers.") + # Find the majority answer. + answers = [resp.answer for resp in self._buffer] + majority_answer = max(set(answers), key=answers.count) + # Publish the aggregated response. + await self.publish_message(Answer(content=majority_answer), topic_id=TopicId("result", self.id.key)) + # Clear the responses. + self._buffer.clear() + print(f"{'-'*80}\\nAggregator {self.id} publishes final answer:\\n{majority_answer}") + + + # Define the main function to set up and run the agent system + async def main(): + queue = asyncio.Queue[Answer]() + async def output_result(_runtime: AgentRuntime, id: AgentId, message: Answer, ctx: MessageContext) -> None: + await queue.put(message) + + runtime = SingleThreadedAgentRuntime() + await Solver.register( + runtime, + "SolverA", + lambda: Solver( + model_client=model_client, + topic_type="SolverA", + num_neighbors=2, + max_round=3, + ), + ) + await Solver.register( + runtime, + "SolverB", + lambda: Solver( + model_client=model_client, + topic_type="SolverB", + num_neighbors=2, + max_round=3, + ), + ) + await Solver.register( + runtime, + "SolverC", + lambda: Solver( + model_client=model_client, + topic_type="SolverC", + num_neighbors=2, + max_round=3, + ), + ) + await Solver.register( + runtime, + "SolverD", + lambda: Solver( + model_client=model_client, + topic_type="SolverD", + num_neighbors=2, + max_round=3, + ), + ) + await Aggregator.register(runtime, "Aggregator", lambda: Aggregator(num_solvers=4)) + + # Subscriptions for topic published to by SolverA. + await runtime.add_subscription(TypeSubscription("SolverA", "SolverD")) + await runtime.add_subscription(TypeSubscription("SolverA", "SolverB")) + + # Subscriptions for topic published to by SolverB. + await runtime.add_subscription(TypeSubscription("SolverB", "SolverA")) + await runtime.add_subscription(TypeSubscription("SolverB", "SolverC")) + + # Subscriptions for topic published to by SolverC. + await runtime.add_subscription(TypeSubscription("SolverC", "SolverB")) + await runtime.add_subscription(TypeSubscription("SolverC", "SolverD")) + + # Subscriptions for topic published to by SolverD. + await runtime.add_subscription(TypeSubscription("SolverD", "SolverC")) + await runtime.add_subscription(TypeSubscription("SolverD", "SolverA")) + + # All solvers and the aggregator subscribe to the default topic. 
+ + result_topic = TypeSubscription(topic_type="result", agent_type="output_result") + await ClosureAgent.register(runtime, "output_result", output_result, subscriptions=lambda: [result_topic]) + + runtime.start() + await runtime.publish_message(Question(content=task), DefaultTopicId()) + + # Keep processing messages until idle. + await runtime.stop_when_idle() + + # Return the answer from the queue + res = (await queue.get()).content + print(f"res {res}") + return res + + return asyncio.run(main()) +''' } Take_a_step_back = {"thought": "Let LLM first think about the principles involved in solving this task which could be helpful. By understanding the underlying principles, the model can better reason through the problem and provide a more accurate solution.", @@ -1032,8 +1255,57 @@ async def main(): topic_id=TopicId(type="orchestrator_type") ) ``` +Or use the `type_subscription()` class decorator on the agent. +``` +@type_subscription(topic_type="orchestrator_type") +class OrchestratorAgent(RoutedAgent): + pass + +async def main(): + await OrchestratorAgent.register(runtime, "orchestrator", lambda: OrchestratorAgent()) + + await runtime.publish_message( + message=DiverseThoughtTask(task='What is the most creative art medium?'), + topic_id=TopicId(type="orchestrator_type") + ) +``` Now, you can publish directly to a specific topic through the runtime. +10. This is WRONG: ``` +class OrchestratorAgent(RoutedAgent): + pass + +async def main(): + await OrchestratorAgent.register(runtime, "orchestrator", lambda: OrchestratorAgent()) + + await runtime.publish_message( + message=DiverseThoughtTask(task='What is the most creative art medium?'), + topic_id=DefaultTopicId() + ) +``` +When there is a single scope of publishing, that is, all agents publish and subscribe to all broadcasted messages, we can use the convenience classes `DefaultTopicId` and `default_subscription()` to simplify our code. +Use the `default_subscription` class decorator on the agent. +``` +@default_subscription +class OrchestratorAgent(RoutedAgent): + pass + +async def main(): + await OrchestratorAgent.register(runtime, "orchestrator", lambda: OrchestratorAgent()) + + await runtime.publish_message( + message=DiverseThoughtTask(task='What is the most creative art medium?'), + topic_id=DefaultTopicId() + ) +``` + +11. This is WRONG: ``` +await runtime.publish_message(DiverseThoughtTask(task='Who is the most creative composer?'), AgentId("consensus_agent", "default")) +``` +The `publish_message` should publish to a topic. Use `TopicId` or `DefaultTopicId`. For example: ``` +await runtime.publish_message(DiverseThoughtTask(task='Who is the most creative composer?'), TopicId("consensus_agent", "default")) +``` + ## CORRECT Implementation examples: Here are some correct patterns you should follow: @@ -1137,8 +1409,8 @@ def get_init_archive(): # return [COT]#, COT_SC, Reflexion, LLM_debate, Take_a_step_back, QD, Role_Assignment] # return [COT_SC]#, COT_SC, Reflexion, LLM_debate, Take_a_step_back, QD, Role_Assignment] # return [Reflexion]#, COT_SC, Reflexion, LLM_debate, Take_a_step_back, QD, Role_Assignment] - return [COT, COT_SC, Reflexion] # LLM_debate, Take_a_step_back, QD, Role_Assignment] - + # return [COT, COT_SC, Reflexion] # LLM_debate, Take_a_step_back, QD, Role_Assignment] + return [LLM_debate] def get_prompt(current_archive, adaptive=False):
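For orientation, here is a minimal sketch of how a seed entry such as `LLM_debate` (a dict with "thought", "name", and "code" keys, where "code" defines `forward(self, task, model_client_kwargs)`) could be exercised end to end. The actual evaluation harness lives in adas.py, which is not shown here, so the `run_archive_entry` helper and the `model_client_kwargs` values below are assumptions for illustration only.
```
# Hypothetical helper; the real harness in adas.py may differ.
from types import SimpleNamespace

def run_archive_entry(entry: dict, task: str, model_client_kwargs: dict) -> str:
    namespace: dict = {}
    exec(entry["code"], namespace)          # defines forward(self, task, model_client_kwargs)
    placeholder_self = SimpleNamespace()    # the seed code does not rely on `self`
    return namespace["forward"](placeholder_self, task, model_client_kwargs)

# Example usage (model name, API version, and endpoint are placeholders):
# answer = run_archive_entry(
#     get_init_archive()[0],  # LLM_debate
#     task="What is the highest mountain in the world?",
#     model_client_kwargs={
#         "model": "gpt-4o",
#         "api_version": "<api-version>",
#         "azure_endpoint": "https://<your-endpoint>.openai.azure.com/",
#     },
# )
```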