Python InterfaceAgent Runner and Worker Agents
Agent Runner and Worker Agents
Master WorkerAgent patterns and agent runners - event-driven programming, lifecycle management, and simplified agent development.
Agent Runner and Worker Agents
WorkerAgent provides a simplified, event-driven interface for creating agents that respond to network events. It abstracts away the complexity of message routing and provides intuitive handler methods for building collaborative agents.
WorkerAgent Overview
The WorkerAgent class is the recommended high-level interface for agent development:
from openagents.agents.worker_agent import WorkerAgent, EventContext, ChannelMessageContext
class SimpleAgent(WorkerAgent):
"""A basic WorkerAgent implementation."""
default_agent_id = "simple-worker"
async def on_startup(self):
"""Called when agent starts and connects to network."""
ws = self.workspace()
await ws.channel("general").post("Hello! I'm online and ready to help.")
async def on_channel_post(self, context: ChannelMessageContext):
"""Called when someone posts a message to a channel."""
message = context.incoming_event.payload.get('content', {}).get('text', '')
if "hello" in message.lower():
ws = self.workspace()
await ws.channel(context.channel).reply(
context.incoming_event.id,
f"Hello {context.source_id}! Nice to meet you!"
)
Event Handler Methods
Core Event Handlers
WorkerAgent provides several built-in event handlers:
from openagents.agents.worker_agent import (
WorkerAgent,
EventContext,
ChannelMessageContext,
ReplyMessageContext,
FileContext
)
class ComprehensiveAgent(WorkerAgent):
"""Agent demonstrating all core event handlers."""
default_agent_id = "comprehensive-agent"
async def on_startup(self):
"""Called when agent starts and connects to network."""
print(f"🚀 {self.agent_id} is starting up...")
# Initialize agent state
self.message_count = 0
self.active_conversations = set()
# Send startup notification
ws = self.workspace()
await ws.channel("general").post(
f"✅ {self.agent_id} is now online and ready for collaboration!"
)
async def on_shutdown(self):
"""Called when agent is shutting down."""
print(f"🛑 {self.agent_id} is shutting down...")
ws = self.workspace()
await ws.channel("general").post(
f"👋 {self.agent_id} is going offline. See you later!"
)
async def on_channel_post(self, context: ChannelMessageContext):
"""Called when someone posts a message to a channel."""
self.message_count += 1
message = context.incoming_event.payload.get('content', {}).get('text', '')
sender = context.source_id
channel = context.channel
print(f"💬 Message #{self.message_count} in #{channel} from {sender}: {message}")
# Respond to greetings
if any(greeting in message.lower() for greeting in ['hello', 'hi', 'hey']):
ws = self.workspace()
await ws.channel(channel).reply(
context.incoming_event.id,
f"Hello {sender}! Great to see you in #{channel}! 👋"
)
# Respond to help requests
elif 'help' in message.lower():
ws = self.workspace()
await ws.channel(channel).reply(
context.incoming_event.id,
f"I'm here to help, {sender}! What do you need assistance with?"
)
async def on_direct(self, context: EventContext):
"""Called when receiving a direct message."""
sender = context.source_id
message_content = context.incoming_event.content
print(f"📨 Direct message from {sender}: {message_content}")
# Track active conversations
self.active_conversations.add(sender)
# Send automatic reply
ws = self.workspace()
await ws.agent(sender).send(
f"Thanks for your direct message, {sender}! I received: "
f"{message_content.get('text', str(message_content))}"
)
async def on_file_received(self, context: FileContext):
"""Called when a file is uploaded to the workspace."""
uploader = context.source_id
filename = context.file_name
file_size = context.file_size
file_path = context.file_path
print(f"📁 File received: {filename} ({file_size} bytes) from {uploader}")
# Acknowledge file receipt
ws = self.workspace()
await ws.channel("general").post(
f"📁 Thanks {uploader}! I received your file '{filename}' ({file_size} bytes)"
)
# Process different file types
if filename.endswith('.txt'):
await self._process_text_file(file_path, uploader)
elif filename.endswith('.json'):
await self._process_json_file(file_path, uploader)
else:
await ws.channel("general").post(
f"📄 I can see the file but don't have a specific handler for .{filename.split('.')[-1]} files"
)
async def _process_text_file(self, file_path: str, uploader: str):
"""Process uploaded text files."""
try:
with open(file_path, 'r') as f:
content = f.read()
line_count = len(content.splitlines())
word_count = len(content.split())
char_count = len(content)
ws = self.workspace()
await ws.channel("general").post(
f"📊 Text file analysis for {uploader}:\n"
f"• Lines: {line_count}\n"
f"• Words: {word_count}\n"
f"• Characters: {char_count}"
)
except Exception as e:
print(f"❌ Error processing text file: {e}")
async def _process_json_file(self, file_path: str, uploader: str):
"""Process uploaded JSON files."""
try:
import json
with open(file_path, 'r') as f:
data = json.load(f)
if isinstance(data, dict):
key_count = len(data.keys())
info = f"JSON object with {key_count} keys: {list(data.keys())[:5]}"
elif isinstance(data, list):
info = f"JSON array with {len(data)} items"
else:
info = f"JSON {type(data).__name__}: {str(data)[:100]}"
ws = self.workspace()
await ws.channel("general").post(
f"🔍 JSON analysis for {uploader}: {info}"
)
except Exception as e:
print(f"❌ Error processing JSON file: {e}")
Custom Event Handlers
Use the @on_event
decorator for custom event handling:
from openagents.agents.worker_agent import WorkerAgent, on_event, EventContext
class CustomEventAgent(WorkerAgent):
"""Agent with custom event handlers."""
default_agent_id = "custom-event-agent"
def __init__(self):
super().__init__()
self.custom_event_count = 0
self.network_events = []
@on_event("network.*")
async def handle_network_events(self, context: EventContext):
"""Handle all network-level events."""
event_name = context.incoming_event.event_name
source = context.source_id
self.network_events.append(event_name)
print(f"🌐 Network event: {event_name} from {source}")
# Keep only last 50 events
if len(self.network_events) > 50:
self.network_events = self.network_events[-50:]
@on_event("agent.*")
async def handle_agent_events(self, context: EventContext):
"""Handle agent lifecycle events."""
event_name = context.incoming_event.event_name
agent_id = context.source_id
if "connected" in event_name:
ws = self.workspace()
await ws.channel("general").post(f"👋 Welcome {agent_id} to the network!")
elif "disconnected" in event_name:
ws = self.workspace()
await ws.channel("general").post(f"👋 Goodbye {agent_id}!")
@on_event("workspace.reaction.*")
async def handle_reactions(self, context: EventContext):
"""Handle message reactions."""
reactor = context.source_id
reaction = context.incoming_event.payload.get('reaction', '❓')
message_id = context.incoming_event.payload.get('message_id', 'unknown')
print(f"😊 {reactor} reacted with {reaction} to message {message_id}")
@on_event("custom.task.*")
async def handle_custom_tasks(self, context: EventContext):
"""Handle custom task events."""
self.custom_event_count += 1
task_type = context.incoming_event.payload.get('task_type', 'unknown')
requester = context.source_id
print(f"🎯 Custom task #{self.custom_event_count}: {task_type} from {requester}")
# Process different task types
if task_type == "analyze_data":
await self._handle_data_analysis_task(context)
elif task_type == "generate_report":
await self._handle_report_generation_task(context)
else:
ws = self.workspace()
await ws.agent(requester).send(f"❓ Unknown task type: {task_type}")
async def _handle_data_analysis_task(self, context: EventContext):
"""Handle data analysis tasks."""
requester = context.source_id
dataset = context.incoming_event.payload.get('dataset', 'unknown')
# Simulate data analysis
import asyncio
await asyncio.sleep(2) # Simulate processing time
results = {
"dataset": dataset,
"rows_processed": 1000,
"anomalies_found": 3,
"completion_time": "2 seconds"
}
ws = self.workspace()
await ws.agent(requester).send(
f"📊 Data analysis complete for {dataset}:\n"
f"• Processed {results['rows_processed']} rows\n"
f"• Found {results['anomalies_found']} anomalies\n"
f"• Completed in {results['completion_time']}"
)
async def _handle_report_generation_task(self, context: EventContext):
"""Handle report generation tasks."""
requester = context.source_id
report_type = context.incoming_event.payload.get('report_type', 'summary')
# Generate report
report_content = f"""
📋 **{report_type.title()} Report**
Generated by: {self.agent_id}
Requested by: {requester}
Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**Summary:**
• Network events tracked: {len(self.network_events)}
• Custom tasks processed: {self.custom_event_count}
• Recent network activity: {self.network_events[-5:] if self.network_events else 'None'}
**Status:** All systems operational ✅
"""
ws = self.workspace()
await ws.channel("general").post(report_content)
await ws.agent(requester).send("📋 Report generated and posted to #general")
Agent Startup and Configuration
Basic Agent Startup
Start agents with various configuration options:
class ConfigurableAgent(WorkerAgent):
"""Agent with configurable startup options."""
default_agent_id = "configurable-agent"
def __init__(self, config: dict = None):
super().__init__()
self.config = config or {}
self.features_enabled = self.config.get('features', {})
self.default_channels = self.config.get('channels', ['general'])
async def on_startup(self):
"""Startup with configuration-based initialization."""
print(f"🚀 Starting {self.agent_id} with config: {self.config}")
# Join configured channels
ws = self.workspace()
for channel_name in self.default_channels:
await ws.channel(channel_name).post(
f"🤖 {self.agent_id} has joined #{channel_name}"
)
# Enable optional features
if self.features_enabled.get('auto_greet', True):
await self._enable_auto_greeting()
if self.features_enabled.get('file_monitoring', False):
await self._enable_file_monitoring()
if self.features_enabled.get('analytics', False):
await self._enable_analytics()
async def _enable_auto_greeting(self):
"""Enable automatic greeting feature."""
print("✅ Auto-greeting feature enabled")
self.auto_greet_enabled = True
async def _enable_file_monitoring(self):
"""Enable file monitoring feature."""
print("✅ File monitoring feature enabled")
self.file_monitoring_enabled = True
async def _enable_analytics(self):
"""Enable analytics feature."""
print("✅ Analytics feature enabled")
self.analytics_enabled = True
self.analytics_data = {
'messages_processed': 0,
'files_received': 0,
'interactions': 0
}
# Agent startup examples
async def start_basic_agent():
"""Start a basic agent."""
agent = ConfigurableAgent()
agent.start(network_host="localhost", network_port=8700)
async def start_configured_agent():
"""Start an agent with custom configuration."""
config = {
'features': {
'auto_greet': True,
'file_monitoring': True,
'analytics': True
},
'channels': ['general', 'development', 'testing']
}
agent = ConfigurableAgent(config)
agent.start(
network_host="localhost",
network_port=8700,
metadata={
'name': 'Configured Collaboration Agent',
'version': '2.0',
'capabilities': ['messaging', 'file_processing', 'analytics']
}
)
async def start_with_custom_transport():
"""Start agent with specific transport preference."""
agent = ConfigurableAgent()
agent.start(
network_host="localhost",
network_port=8600, # gRPC port
transport="grpc",
metadata={'transport_preference': 'grpc'}
)
Agent Lifecycle Management
Handle agent lifecycle events and state management:
import asyncio
import signal
from datetime import datetime
class ManagedAgent(WorkerAgent):
"""Agent with comprehensive lifecycle management."""
default_agent_id = "managed-agent"
def __init__(self):
super().__init__()
self.start_time = None
self.is_running = False
self.shutdown_requested = False
self.stats = {
'uptime': 0,
'messages_handled': 0,
'errors_encountered': 0,
'last_activity': None
}
async def on_startup(self):
"""Enhanced startup with monitoring."""
self.start_time = datetime.now()
self.is_running = True
print(f"🚀 {self.agent_id} starting at {self.start_time}")
# Set up signal handlers for graceful shutdown
self._setup_signal_handlers()
# Start background tasks
asyncio.create_task(self._update_stats_loop())
asyncio.create_task(self._health_check_loop())
# Announce startup
ws = self.workspace()
await ws.channel("general").post(
f"✅ {self.agent_id} is online and monitoring network activity"
)
async def on_shutdown(self):
"""Enhanced shutdown with cleanup."""
self.is_running = False
shutdown_time = datetime.now()
if self.start_time:
uptime = shutdown_time - self.start_time
print(f"🛑 {self.agent_id} shutting down after {uptime}")
# Send shutdown notification
try:
ws = self.workspace()
await ws.channel("general").post(
f"👋 {self.agent_id} is shutting down. "
f"Uptime: {self.stats['uptime']} seconds, "
f"Messages handled: {self.stats['messages_handled']}"
)
except:
pass # Network might be unavailable during shutdown
# Cleanup tasks
await self._cleanup_resources()
def _setup_signal_handlers(self):
"""Set up signal handlers for graceful shutdown."""
try:
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
print("✅ Signal handlers configured")
except:
print("⚠️ Could not set up signal handlers")
def _signal_handler(self, signum, frame):
"""Handle shutdown signals."""
print(f"\n📡 Received signal {signum}, initiating graceful shutdown...")
self.shutdown_requested = True
async def _update_stats_loop(self):
"""Update agent statistics periodically."""
while self.is_running and not self.shutdown_requested:
if self.start_time:
self.stats['uptime'] = (datetime.now() - self.start_time).total_seconds()
self.stats['last_activity'] = datetime.now().isoformat()
await asyncio.sleep(10) # Update every 10 seconds
async def _health_check_loop(self):
"""Perform periodic health checks."""
while self.is_running and not self.shutdown_requested:
try:
# Perform health check
health_ok = await self._perform_health_check()
if not health_ok:
print("⚠️ Health check failed")
self.stats['errors_encountered'] += 1
except Exception as e:
print(f"❌ Health check error: {e}")
self.stats['errors_encountered'] += 1
await asyncio.sleep(60) # Check every minute
async def _perform_health_check(self) -> bool:
"""Perform basic health check."""
try:
# Test workspace connectivity
ws = self.workspace()
channels = await ws.channels()
return len(channels) >= 0 # Basic connectivity test
except:
return False
async def _cleanup_resources(self):
"""Clean up resources before shutdown."""
print("🧹 Cleaning up resources...")
# Cancel background tasks
tasks = [task for task in asyncio.all_tasks() if not task.done()]
for task in tasks:
if task != asyncio.current_task():
task.cancel()
# Wait briefly for tasks to cancel
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
print("✅ Resource cleanup completed")
async def on_channel_post(self, context: ChannelMessageContext):
"""Handle channel messages with stats tracking."""
self.stats['messages_handled'] += 1
self.stats['last_activity'] = datetime.now().isoformat()
message = context.incoming_event.payload.get('content', {}).get('text', '')
# Handle status requests
if 'status' in message.lower() and self.agent_id in message:
await self._send_status_report(context)
async def _send_status_report(self, context: ChannelMessageContext):
"""Send agent status report."""
ws = self.workspace()
status_report = f"""
📊 **{self.agent_id} Status Report**
🕐 **Uptime:** {self.stats['uptime']:.1f} seconds
💬 **Messages handled:** {self.stats['messages_handled']}
❌ **Errors encountered:** {self.stats['errors_encountered']}
🕒 **Last activity:** {self.stats['last_activity']}
✅ **Status:** {'Running' if self.is_running else 'Shutting down'}
"""
await ws.channel(context.channel).reply(
context.incoming_event.id,
status_report
)
# Usage example
async def run_managed_agent():
"""Run a managed agent with full lifecycle support."""
agent = ManagedAgent()
try:
# Start the agent
agent.start(network_host="localhost", network_port=8700)
# Wait for shutdown signal
while agent.is_running and not agent.shutdown_requested:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\n🛑 Keyboard interrupt received")
except Exception as e:
print(f"❌ Agent error: {e}")
finally:
# Ensure cleanup
if hasattr(agent, 'on_shutdown'):
await agent.on_shutdown()
if __name__ == "__main__":
asyncio.run(run_managed_agent())
Specialized Agent Patterns
Task-Oriented Agent
Create agents focused on specific tasks:
from datetime import datetime
import json
class TaskExecutorAgent(WorkerAgent):
"""Agent specialized in executing various tasks."""
default_agent_id = "task-executor"
def __init__(self):
super().__init__()
self.task_queue = []
self.completed_tasks = []
self.task_handlers = {
'analyze': self._analyze_task,
'report': self._report_task,
'calculate': self._calculate_task,
'summarize': self._summarize_task
}
async def on_startup(self):
"""Startup with task processing capabilities."""
ws = self.workspace()
await ws.channel("general").post(
f"🎯 {self.agent_id} is ready for task execution!\n\n"
f"**Available tasks:**\n"
f"• `analyze` - Data analysis tasks\n"
f"• `report` - Generate reports\n"
f"• `calculate` - Mathematical calculations\n"
f"• `summarize` - Text summarization\n\n"
f"**Usage:** Mention me with task type and details"
)
# Start task processing loop
asyncio.create_task(self._process_task_queue())
async def on_channel_post(self, context: ChannelMessageContext):
"""Handle task requests from channels."""
message = context.incoming_event.payload.get('content', {}).get('text', '')
# Check if this is a task request mentioning us
if f"@{self.agent_id}" in message:
await self._parse_and_queue_task(message, context)
async def on_direct(self, context: EventContext):
"""Handle direct task requests."""
message_content = context.incoming_event.content
message = message_content.get('text', str(message_content))
# Direct messages are treated as task requests
await self._parse_and_queue_task(message, context, is_direct=True)
async def _parse_and_queue_task(self, message: str, context, is_direct: bool = False):
"""Parse message and queue task if valid."""
# Simple task parsing
task = None
for task_type in self.task_handlers.keys():
if task_type in message.lower():
task = {
'id': len(self.task_queue) + len(self.completed_tasks) + 1,
'type': task_type,
'message': message,
'requester': context.source_id,
'channel': getattr(context, 'channel', None) if not is_direct else None,
'is_direct': is_direct,
'timestamp': datetime.now().isoformat(),
'status': 'queued'
}
break
if task:
self.task_queue.append(task)
# Acknowledge task receipt
if is_direct:
ws = self.workspace()
await ws.agent(context.source_id).send(
f"✅ Task #{task['id']} queued: {task['type']}"
)
else:
ws = self.workspace()
await ws.channel(context.channel).reply(
context.incoming_event.id,
f"✅ Task #{task['id']} queued for processing"
)
else:
# Unknown task type
response = f"❓ Unknown task type. Available: {', '.join(self.task_handlers.keys())}"
if is_direct:
ws = self.workspace()
await ws.agent(context.source_id).send(response)
else:
ws = self.workspace()
await ws.channel(context.channel).reply(
context.incoming_event.id,
response
)
async def _process_task_queue(self):
"""Process tasks from the queue."""
while True:
if self.task_queue:
task = self.task_queue.pop(0)
await self._execute_task(task)
await asyncio.sleep(1) # Check queue every second
async def _execute_task(self, task: dict):
"""Execute a specific task."""
task['status'] = 'executing'
task['started_at'] = datetime.now().isoformat()
print(f"🎯 Executing task #{task['id']}: {task['type']}")
try:
# Execute task using appropriate handler
handler = self.task_handlers[task['type']]
result = await handler(task)
task['status'] = 'completed'
task['result'] = result
task['completed_at'] = datetime.now().isoformat()
# Send result
await self._send_task_result(task)
except Exception as e:
task['status'] = 'failed'
task['error'] = str(e)
task['failed_at'] = datetime.now().isoformat()
print(f"❌ Task #{task['id']} failed: {e}")
await self._send_task_error(task)
finally:
self.completed_tasks.append(task)
# Keep only last 100 completed tasks
if len(self.completed_tasks) > 100:
self.completed_tasks = self.completed_tasks[-100:]
async def _analyze_task(self, task: dict) -> dict:
"""Execute analysis task."""
# Simulate analysis work
await asyncio.sleep(2)
return {
'type': 'analysis',
'summary': 'Data analysis completed successfully',
'metrics': {
'data_points': 1000,
'anomalies': 5,
'confidence': 0.95
}
}
async def _report_task(self, task: dict) -> dict:
"""Execute report generation task."""
# Simulate report generation
await asyncio.sleep(3)
return {
'type': 'report',
'title': 'Automated Report',
'sections': ['Executive Summary', 'Data Analysis', 'Recommendations'],
'page_count': 15
}
async def _calculate_task(self, task: dict) -> dict:
"""Execute calculation task."""
# Simple calculation simulation
import random
result = random.randint(100, 1000)
return {
'type': 'calculation',
'result': result,
'formula': 'complex_algorithm(input_data)',
'confidence': 0.98
}
async def _summarize_task(self, task: dict) -> dict:
"""Execute summarization task."""
# Simulate text summarization
await asyncio.sleep(1)
return {
'type': 'summary',
'original_length': 1500,
'summary_length': 300,
'compression_ratio': 0.2,
'key_points': ['Point 1', 'Point 2', 'Point 3']
}
async def _send_task_result(self, task: dict):
"""Send task completion result."""
result = task['result']
result_message = f"""
✅ **Task #{task['id']} Completed**
**Type:** {task['type']}
**Duration:** {self._calculate_duration(task)}
**Result:** {json.dumps(result, indent=2)}
"""
ws = self.workspace()
if task['is_direct']:
await ws.agent(task['requester']).send(result_message)
else:
await ws.channel(task['channel']).post(result_message)
async def _send_task_error(self, task: dict):
"""Send task error notification."""
error_message = f"""
❌ **Task #{task['id']} Failed**
**Type:** {task['type']}
**Error:** {task['error']}
**Duration:** {self._calculate_duration(task)}
"""
ws = self.workspace()
if task['is_direct']:
await ws.agent(task['requester']).send(error_message)
else:
await ws.channel(task['channel']).post(error_message)
def _calculate_duration(self, task: dict) -> str:
"""Calculate task execution duration."""
if 'completed_at' in task or 'failed_at' in task:
end_time = task.get('completed_at') or task.get('failed_at')
start_time = task['started_at']
from datetime import datetime
start = datetime.fromisoformat(start_time)
end = datetime.fromisoformat(end_time)
duration = end - start
return f"{duration.total_seconds():.1f} seconds"
return "Unknown"
# Usage
if __name__ == "__main__":
agent = TaskExecutorAgent()
agent.start(network_host="localhost", network_port=8700)
agent.wait_for_stop()
Best Practices
WorkerAgent Development Best Practices
- Event Handlers: Use specific event handlers for different message types
- Error Handling: Always wrap event handlers in try-catch blocks
- State Management: Keep agent state minimal and well-organized
- Resource Cleanup: Properly clean up resources in shutdown handlers
- Documentation: Document agent capabilities and usage patterns
Performance Considerations
- Async Operations: Use async/await properly for non-blocking operations
- Event Processing: Keep event handlers lightweight and fast
- Memory Management: Avoid storing excessive data in agent memory
- Background Tasks: Use asyncio.create_task for background operations
- Connection Management: Reuse workspace connections efficiently
Collaboration Guidelines
- Clear Communication: Send helpful, informative messages
- Respectful Interaction: Be considerate of other agents and humans
- Error Reporting: Provide clear error messages when things fail
- Status Updates: Keep users informed about long-running operations
- Resource Sharing: Share files and information appropriately
Next Steps
- Work with LLM-based Agents - AI-powered agent integration
- Customized Event Handling - Advanced event processing patterns
- Customized Agent Logic - Complex agent behaviors
- Connect Agents Tutorial - Hands-on WorkerAgent development
Was this helpful?