A Python framework for building AI agent systems with robust task management in the form of a graph execution engine, inference capabilities, and caching.
We support advanced features such as state snapshotting, middleware, agent-directed graph execution, OpenTelemetry integrations, and more.
🔮 Asimov is the foundation of bismuth.sh, an in-terminal coding agent that can handle many tasks autonomously. Check us out! 🔮
pip install asimov_agents
Check out these docs, which show two basic examples that should be enough to get you experimenting!
Further documentation is greatly appreciated in PRs!
Asimov Agents is composed of three main components:
- Task Graph System
  - Manages task execution flow and dependencies
  - Supports different task states (WAITING, EXECUTING, COMPLETE, FAILED, PARTIAL)
  - Uses Pydantic models for robust data validation
  - Unique task identification via UUIDs
- Inference Clients
  - Supports multiple LLM providers:
    - Anthropic Claude (via API)
    - AWS Bedrock
    - OpenAI (including local models)
    - Vertex
  - Features:
    - Streaming responses
    - Tool/function calling capabilities
    - Token usage tracking
    - OpenTelemetry instrumentation
    - Prompt caching support
- Caching System
  - Abstract Cache interface with Redis implementation
  - Features:
    - Key-value storage with JSON serialization
    - Prefix/suffix namespacing
    - Pub/sub messaging via mailboxes
    - Bulk operations (get_all, clear)
    - Async interface
- Tasks are created and tracked using the Task class
- Each task has:
  - Unique ID
  - Type and objective
  - Parameters dictionary
  - Status tracking
  - Result/error storage
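To make the task fields listed above concrete, here is a minimal sketch of what such a record could look like. It uses standard-library dataclasses rather than Pydantic so that it runs without dependencies; `SketchTask`, `TaskStatus`, and the exact field names are illustrative assumptions, not the library's actual definitions.

```python
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class TaskStatus(Enum):
    """The five task states named in this README."""
    WAITING = "waiting"
    EXECUTING = "executing"
    COMPLETE = "complete"
    FAILED = "failed"
    PARTIAL = "partial"


@dataclass
class SketchTask:
    """Illustrative stand-in for Task (hypothetical, not the real class)."""
    type: str
    objective: str
    params: Dict[str, Any] = field(default_factory=dict)
    id: uuid.UUID = field(default_factory=uuid.uuid4)  # unique per task
    status: TaskStatus = TaskStatus.WAITING
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


task = SketchTask(type="processing", objective="Process data", params={"input": "data"})
task.status = TaskStatus.EXECUTING
```

Generating the ID with a default factory means callers never have to supply one, and two tasks created with identical arguments still get distinct IDs.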
- Module Types
  - SUBGRAPH: Nodes composed of other nodes
  - EXECUTOR: Task execution modules
  - FLOW_CONTROL: Execution flow control modules
- Node Configuration
  node_config = NodeConfig(
      parallel=True,             # Enable parallel execution
      condition="task.ready",    # Conditional execution
      retry_on_failure=True,     # Enable retry mechanism
      max_retries=3,             # Maximum retry attempts
      max_visits=5,              # Maximum node visits
      inputs=["data"],           # Required inputs
      outputs=["result"]         # Expected outputs
  )
- Flow Control Features
  - Conditional branching based on task state
  - Dynamic node addition during execution
  - Dependency chain management
  - Automatic cleanup of completed nodes
  - Execution state tracking and recovery
  - LLM-directed flow for complex decisions
- Snapshot System
  - State preservation modes:
    - NEVER: No snapshots
    - ONCE: Single snapshot
    - ALWAYS: Continuous snapshots
  - Captures:
    - Agent state
    - Cache contents
    - Task status
    - Execution history
  - Configurable storage location via ASIMOV_SNAPSHOT
- Error Handling
  - Automatic retry mechanisms
  - Partial completion states
  - Failed chain tracking
  - Detailed error reporting
  - Timeout management
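As an illustration of the dependency chain management mentioned above, the sketch below orders nodes so that each one runs only after the nodes it depends on. It uses the standard-library `graphlib` module; the node names are hypothetical, and this is not necessarily how Asimov compiles its own execution plans.

```python
from graphlib import TopologicalSorter

# Hypothetical node names, each mapped to the set of nodes it depends on.
dependencies = {
    "planner": set(),
    "executor": {"planner"},
    "reviewer": {"executor"},
}

# static_order() yields every node after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

A cycle in the dependency map would raise `graphlib.CycleError`, which is a convenient place to reject an invalid graph before anything executes.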
- Messages are formatted with appropriate roles (SYSTEM, USER, ASSISTANT, TOOL_RESULT)
- Inference clients handle:
  - Message formatting
  - API communication
  - Response streaming
  - Token accounting
  - Error handling
- Redis cache provides:
  - Fast key-value storage
  - Message queuing
  - Namespace management
  - Atomic operations
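The combination of JSON serialization and namespace management can be sketched with a toy in-memory cache. Everything here is an assumption made for illustration: `InMemoryCache`, the synchronous methods, and the `prefix:key:suffix` key layout are not taken from the real `RedisCache`, which is asynchronous and backed by Redis.

```python
import json
from typing import Any, Dict


class InMemoryCache:
    """Toy dict-backed cache showing namespaced keys and JSON values."""

    def __init__(self, store: Dict[str, str], prefix: str = "", suffix: str = "") -> None:
        self._store = store  # shared backing store, standing in for Redis
        self.prefix = prefix
        self.suffix = suffix

    def _key(self, key: str) -> str:
        # Assumed layout: prefix:key:suffix, skipping empty parts.
        return ":".join(part for part in (self.prefix, key, self.suffix) if part)

    def set(self, key: str, value: Any) -> None:
        self._store[self._key(key)] = json.dumps(value)

    def get(self, key: str, default: Any = None) -> Any:
        raw = self._store.get(self._key(key))
        return default if raw is None else json.loads(raw)


shared: Dict[str, str] = {}
agent_a = InMemoryCache(shared, prefix="agent_a")
agent_b = InMemoryCache(shared, prefix="agent_b")
agent_a.set("plan", {"step": 1})
agent_b.set("plan", {"step": 2})
```

Both caches write the logical key `plan`, yet neither overwrites the other because the prefix keeps their physical keys apart.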
The Asimov Agents framework is built around several core primitives that enable flexible and powerful agent architectures:
The framework supports different types of modules through the ModuleType enum:
- SUBGRAPH: Nodes composed of other nodes
- EXECUTOR: Task execution and action implementation
- FLOW_CONTROL: Execution flow and routing control
The AgentModule is the base class for all agent components:
class AgentModule:
    name: str                    # Unique module identifier
    type: ModuleType             # Module type classification
    config: ModuleConfig         # Module configuration
    dependencies: List[str]      # Module dependencies
    input_mailboxes: List[str]   # Input communication channels
    output_mailbox: str          # Output communication channel
    trace: bool                  # OpenTelemetry tracing flag
Nodes can be configured with various parameters through NodeConfig:
class NodeConfig:
    parallel: bool = False            # Enable parallel execution
    condition: Optional[str] = None   # Execution condition
    retry_on_failure: bool = True     # Auto-retry on failures
    max_retries: int = 3              # Maximum retry attempts
    max_visits: int = 5               # Maximum node visits
    inputs: List[str] = []            # Required inputs
    outputs: List[str] = []           # Expected outputs
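To show how `retry_on_failure` and `max_retries` might interact, here is a small sketch of a retry loop. `run_with_retries` is a hypothetical helper, and it assumes that `max_retries` counts additional attempts after the first one; the framework's own accounting may differ.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def run_with_retries(
    action: Callable[[], Awaitable[T]],
    retry_on_failure: bool = True,
    max_retries: int = 3,
) -> T:
    """Call action, retrying on any exception up to max_retries extra times."""
    attempts = 1 + (max_retries if retry_on_failure else 0)
    for attempt in range(1, attempts + 1):
        try:
            return await action()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
    raise RuntimeError("unreachable")  # loop always returns or raises


calls = {"count": 0}


async def flaky() -> str:
    """Fails twice, then succeeds (simulated transient failure)."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ValueError("transient failure")
    return "ok"


result = asyncio.run(run_with_retries(flaky))
```

With `retry_on_failure=False` the action runs exactly once and the first exception propagates.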
Flow control enables dynamic execution paths:
class FlowDecision:
    next_node: str                      # Target node
    condition: Optional[str] = None     # Jump condition
    cleanup_on_jump: bool = False       # Cleanup on transition

class FlowControlConfig:
    decisions: List[FlowDecision]       # Decision rules
    default: Optional[str] = None       # Default node
    cleanup_on_default: bool = True     # Cleanup on default
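One plausible reading of these two classes is "take the first decision whose condition holds, otherwise fall back to the default". The sketch below illustrates that reading only. In Asimov the conditions are Lua scripts evaluated against the current state; here plain Python callables stand in for them, and `SketchDecision` and `choose_next` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional

State = Dict[str, Any]


@dataclass
class SketchDecision:
    next_node: str
    # Stand-in for the Lua condition string; None means "always matches".
    condition: Optional[Callable[[State], bool]] = None


def choose_next(
    decisions: List[SketchDecision], default: Optional[str], state: State
) -> Optional[str]:
    """Return the first matching decision's target, else the default node."""
    for decision in decisions:
        if decision.condition is None or decision.condition(state):
            return decision.next_node
    return default


decisions = [SketchDecision("executor", lambda s: s.get("ready") is True)]
ready_target = choose_next(decisions, "planner", {"ready": True})
waiting_target = choose_next(decisions, "planner", {"ready": False})
```

Evaluating decisions in list order keeps routing predictable: more specific rules go first and the default catches everything else.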
Agent Directed Flow Control is a powerful feature that enables intelligent routing of tasks based on LLM decision making. It allows the system to:
- Dynamically route tasks to specialized modules based on content analysis
- Use example-based learning for routing decisions
- Support multiple voters for consensus-based routing
- Handle fallback cases with error handlers
Example configuration:
flow_control = Node(
    name="flow_control",
    type=ModuleType.FLOW_CONTROL,
    modules=[
        AgentDirectedFlowControl(
            name="ContentFlowControl",
            type=ModuleType.FLOW_CONTROL,
            voters=3,  # Number of voters for consensus
            inference_client=inference_client,
            system_description="A system that handles various content creation tasks",
            flow_config=AgentDrivenFlowControlConfig(
                decisions=[
                    AgentDrivenFlowDecision(
                        next_node="blog_writer",
                        metadata={"description": "Writes blog posts on technical topics"},
                        examples=[
                            Example(
                                message="Write a blog post about AI agents",
                                choices=[
                                    {"choice": "blog_writer", "description": "Writes blog posts"},
                                    {"choice": "code_writer", "description": "Writes code"}
                                ],
                                choice="blog_writer",
                                reasoning="The request is specifically for blog content"
                            )
                        ]
                    ),
                    AgentDrivenFlowDecision(
                        next_node="code_writer",
                        metadata={"description": "Writes code examples and tutorials"},
                        examples=[
                            Example(
                                message="Create a Python script for data processing",
                                choices=[
                                    {"choice": "blog_writer", "description": "Writes blog posts"},
                                    {"choice": "code_writer", "description": "Writes code"}
                                ],
                                choice="code_writer",
                                reasoning="The request is for code creation"
                            )
                        ]
                    )
                ],
                default="error_handler"  # Fallback node for unmatched requests
            )
        )
    ]
)
Key features:
- Example-based routing decisions with clear reasoning
- Multiple voter support (configurable number of voters) for robust decision making
- Specialized executor modules for different content types (e.g., blog posts, code)
- Metadata-enriched routing configuration for better decision context
- Fallback error handling for unmatched requests
- Cached message passing between nodes using Redis
- Asynchronous execution with semaphore control
- Comprehensive error handling and reporting
For a complete working example of Agent Directed Flow Control, check out the examples/agent_directed_flow.py file, which demonstrates a content creation system that intelligently routes tasks between blog writing and code generation modules.
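The consensus step behind multi-voter routing can be sketched as a simple majority vote with a fallback. This is an assumption about how votes could be combined, not the library's actual algorithm; `pick_route` is a hypothetical helper, and sending ties and unknown choices to the default node is a design choice made for this example.

```python
from collections import Counter
from typing import List, Optional


def pick_route(votes: List[str], allowed: List[str], default: Optional[str]) -> Optional[str]:
    """Return the node with a strict plurality of valid votes, else the default."""
    valid = [vote for vote in votes if vote in allowed]
    if not valid:
        return default  # no voter named a known node
    (winner, count), *rest = Counter(valid).most_common()
    if rest and rest[0][1] == count:
        return default  # tie between the top two choices
    return winner


nodes = ["blog_writer", "code_writer"]
route = pick_route(["blog_writer", "code_writer", "blog_writer"], nodes, "error_handler")
```

Using an odd number of voters, as in `voters=3` above, makes ties between two candidate nodes impossible when every vote is valid.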
Middleware lets you intercept and transform data as it is processed:
class Middleware:
    async def process(self, data: Dict[str, Any], cache: Cache) -> Dict[str, Any]:
        return data  # Process or transform data
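When several middlewares are configured, a natural way to apply them is in sequence, each receiving the previous one's output. The sketch below assumes that ordering; `SketchMiddleware`, `RedactSecrets`, `TagProcessed`, and `apply_middlewares` are illustrative names, and the cache argument is left as `None` because nothing here uses it.

```python
import asyncio
from typing import Any, Dict, List


class SketchMiddleware:
    """Stand-in with the same process() shape as the Middleware class."""

    async def process(self, data: Dict[str, Any], cache: Any) -> Dict[str, Any]:
        return data


class RedactSecrets(SketchMiddleware):
    async def process(self, data: Dict[str, Any], cache: Any) -> Dict[str, Any]:
        # Drop a sensitive field before it reaches later stages.
        return {key: value for key, value in data.items() if key != "api_key"}


class TagProcessed(SketchMiddleware):
    async def process(self, data: Dict[str, Any], cache: Any) -> Dict[str, Any]:
        return {**data, "tagged": True}


async def apply_middlewares(
    middlewares: List[SketchMiddleware], data: Dict[str, Any], cache: Any
) -> Dict[str, Any]:
    """Feed data through each middleware in list order."""
    for middleware in middlewares:
        data = await middleware.process(data, cache)
    return data


processed = asyncio.run(
    apply_middlewares([RedactSecrets(), TagProcessed()], {"api_key": "x", "input": "data"}, None)
)
```

Each middleware returns a new dictionary instead of mutating its input, so a stage cannot accidentally change data that an earlier stage still holds.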
The framework maintains execution state through:
class ExecutionState:
    execution_index: int                     # Current execution position
    current_plan: ExecutionPlan              # Active execution plan
    execution_history: List[ExecutionPlan]   # Historical plans
    total_iterations: int                    # Total execution iterations
State persistence is managed through SnapshotControl:
- NEVER: No snapshots taken
- ONCE: Single snapshot capture
- ALWAYS: Continuous state capture
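The three modes can be read as a small decision rule. The sketch below is one interpretation, assuming that ONCE means "capture only if nothing has been captured yet"; `SketchSnapshotControl` and `should_snapshot` are hypothetical names and not the library's API.

```python
from enum import Enum


class SketchSnapshotControl(Enum):
    NEVER = "never"
    ONCE = "once"
    ALWAYS = "always"


def should_snapshot(mode: SketchSnapshotControl, snapshots_taken: int) -> bool:
    """Decide whether to capture state, given how many snapshots already exist."""
    if mode is SketchSnapshotControl.NEVER:
        return False
    if mode is SketchSnapshotControl.ONCE:
        return snapshots_taken == 0  # assumed meaning of a "single snapshot"
    return True  # ALWAYS


first = should_snapshot(SketchSnapshotControl.ONCE, snapshots_taken=0)
second = should_snapshot(SketchSnapshotControl.ONCE, snapshots_taken=1)
```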
cache = RedisCache(
    host="localhost",    # Redis host
    port=6379,           # Redis port
    db=0,                # Database number
    password=None,       # Optional password
    default_prefix=""    # Optional key prefix
)
# Anthropic Client
client = AnthropicInferenceClient(
    model="claude-3-5-sonnet-20241022",
    api_key="your-api-key",
    api_url="https://api.anthropic.com/v1/messages"
)
# AWS Bedrock Client
client = BedrockInferenceClient(
    model="anthropic.claude-3-5-sonnet-20241022-v2:0",
    region_name="us-east-1"
)
Setup is similar for the Vertex AI and OpenAI clients.
# Create a task
task = Task(
    type="processing",
    objective="Process data",
    params={"input": "data"}
)
# Create nodes with different module types
executor_node = Node(
    name="executor",
    type=ModuleType.EXECUTOR,
    modules=[ExecutorModule()],
    dependencies=["planner"]
)
flow_control = Node(
    name="flow_control",
    type=ModuleType.FLOW_CONTROL,
    modules=[FlowControlModule(
        flow_config=FlowControlConfig(
            decisions=[
                FlowDecision(
                    next_node="executor",
                    # Conditions are small Lua scripts that run against the current state.
                    condition="task.ready == true"
                )
            ],
            default="planner"
        )
    )]
)
# Set up the agent
agent = Agent(
    cache=RedisCache(),
    max_concurrent_tasks=5,
    max_total_iterations=100
)
# Add nodes to the agent
agent.add_multiple_nodes([executor_node, flow_control])
# Run the task (await must be used inside a coroutine, e.g. one started with asyncio.run)
await agent.run_task(task)
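The `max_concurrent_tasks` setting suggests a cap on how many tasks run at once. A common way to enforce such a cap in asyncio is a semaphore, sketched below. `run_bounded` is a hypothetical helper that uses short sleeps in place of real work and reports the peak concurrency it observed; it does not reproduce the agent's internals.

```python
import asyncio
from typing import List


async def run_bounded(durations: List[float], max_concurrent_tasks: int) -> int:
    """Run one sleeping worker per duration and return the peak concurrency."""
    semaphore = asyncio.Semaphore(max_concurrent_tasks)
    running = 0
    peak = 0

    async def worker(duration: float) -> None:
        nonlocal running, peak
        async with semaphore:  # blocks while the cap is reached
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(duration)  # simulated work
            running -= 1

    await asyncio.gather(*(worker(duration) for duration in durations))
    return peak


peak = asyncio.run(run_bounded([0.01] * 10, max_concurrent_tasks=3))
```

All ten workers are scheduled immediately, but the semaphore lets only three of them into the working section at any moment.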
class LoggingMiddleware(Middleware):
    async def process(self, data: Dict[str, Any], cache: Cache) -> Dict[str, Any]:
        print(f"Processing data: {data}")
        return data

node = Node(
    name="executor",
    type=ModuleType.EXECUTOR,
    modules=[ExecutorModule()],
    config=ModuleConfig(
        middlewares=[LoggingMiddleware()],
        timeout=30.0
    )
)
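The `timeout` value in `ModuleConfig` implies that a module's work is abandoned after a deadline. The sketch below shows one way such a deadline could be enforced with `asyncio.wait_for`; `run_with_timeout` and the status strings it returns are assumptions for illustration, not the framework's behavior.

```python
import asyncio
from typing import Any, Awaitable, Tuple


async def run_with_timeout(work: Awaitable[Any], timeout: float) -> Tuple[str, Any]:
    """Await work, returning a (status, payload) pair instead of raising on timeout."""
    try:
        return ("complete", await asyncio.wait_for(work, timeout=timeout))
    except asyncio.TimeoutError:
        return ("failed", f"timed out after {timeout} seconds")


# asyncio.sleep(delay, result=...) stands in for a module's real work.
fast = asyncio.run(run_with_timeout(asyncio.sleep(0, result="done"), timeout=1.0))
slow = asyncio.run(run_with_timeout(asyncio.sleep(1, result="late"), timeout=0.01))
```

`asyncio.wait_for` cancels the underlying work when the deadline passes, so a timed-out module does not keep running in the background.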
- Tracks execution history
- Supports execution plan compilation
- Enables dynamic plan modification
- Provides state restoration capabilities
# Access execution state
current_plan = agent.execution_state.current_plan
execution_history = agent.execution_state.execution_history
total_iterations = agent.execution_state.total_iterations
# Compile execution plans
full_plan = agent.compile_execution_plan()
partial_plan = agent.compile_execution_plan_from("specific_node")
# Restore from snapshot
await agent.run_from_snapshot(snapshot_dir)
- Automatic span creation for nodes
- Execution tracking
- Performance monitoring
- Error tracing
node = Node(
    name="traced_node",
    type=ModuleType.EXECUTOR,
    modules=[ExecutorModule()],
    trace=True  # Enable OpenTelemetry tracing
)
- Use appropriate key prefixes/suffixes for namespace isolation
- Consider timeout settings for blocking operations
- Monitor Redis memory usage
- Use raw mode when bypassing JSON serialization
- Token usage is tracked automatically
- Streaming reduces time-to-first-token
- Tool calls support iteration limits
- Prompt caching can improve response times
- Tasks support partial failure states
- Use UUIDs for guaranteed uniqueness
- Status transitions are atomic
pytest tests/
- Redis server (if using caching)
- Python 3.12+
- See requirements.txt for Python packages
Apache License 2.0