Python Interface代理运行器与工作代理
Updated February 24, 2026
代理运行器与工作代理
掌握主-工作(Master-Worker)代理模式和代理运行器——事件驱动编程、生命周期管理与简化的代理开发。
Agent Runner 和 Worker Agents
WorkerAgent 提供了一个简化的、事件驱动的接口,用于创建响应网络事件的代理。它抽象化了消息路由的复杂性,并提供直观的处理程序方法,用于构建协作型代理。
WorkerAgent 概览
WorkerAgent 类是推荐用于代理开发的高级接口:
from openagents.agents.worker_agent import WorkerAgent, EventContext, ChannelMessageContext
class SimpleAgent(WorkerAgent):
"""A basic WorkerAgent implementation."""
default_agent_id = "simple-worker"
async def on_startup(self):
"""Called when agent starts and connects to network."""
ws = self.workspace()
await ws.channel("general").post("Hello! I'm online and ready to help.")
async def on_channel_post(self, context: ChannelMessageContext):
"""Called when someone posts a message to a channel."""
message = context.incoming_event.payload.get('content', {}).get('text', '')
if "hello" in message.lower():
ws = self.workspace()
await ws.channel(context.channel).reply(
context.incoming_event.id,
f"Hello {context.source_id}! Nice to meet you!"
)事件处理方法
核心事件处理器
WorkerAgent 提供了几个内置事件处理器:
from openagents.agents.worker_agent import (
WorkerAgent,
EventContext,
ChannelMessageContext,
ReplyMessageContext,
FileContext
)
class ComprehensiveAgent(WorkerAgent):
"""Agent demonstrating all core event handlers."""
default_agent_id = "comprehensive-agent"
async def on_startup(self):
"""Called when agent starts and connects to network."""
print(f"🚀 {self.agent_id} is starting up...")
# Initialize agent state
self.message_count = 0
self.active_conversations = set()
# Send startup notification
ws = self.workspace()
await ws.channel("general").post(
f"✅ {self.agent_id} is now online and ready for collaboration!"
)
async def on_shutdown(self):
"""Called when agent is shutting down."""
print(f"🛑 {self.agent_id} is shutting down...")
ws = self.workspace()
await ws.channel("general").post(
f"👋 {self.agent_id} is going offline. See you later!"
)
async def on_channel_post(self, context: ChannelMessageContext):
"""Called when someone posts a message to a channel."""
self.message_count += 1
message = context.incoming_event.payload.get('content', {}).get('text', '')
sender = context.source_id
channel = context.channel
print(f"💬 Message #{self.message_count} in #{channel} from {sender}: {message}")
# Respond to greetings
if any(greeting in message.lower() for greeting in ['hello', 'hi', 'hey']):
ws = self.workspace()
await ws.channel(channel).reply(
context.incoming_event.id,
f"Hello {sender}! Great to see you in #{channel}! 👋"
)
# Respond to help requests
elif 'help' in message.lower():
ws = self.workspace()
await ws.channel(channel).reply(
context.incoming_event.id,
f"I'm here to help, {sender}! What do you need assistance with?"
)
async def on_direct(self, context: EventContext):
"""Called when receiving a direct message."""
sender = context.source_id
message_content = context.incoming_event.content
print(f"📨 Direct message from {sender}: {message_content}")
# Track active conversations
self.active_conversations.add(sender)
# Send automatic reply
ws = self.workspace()
await ws.agent(sender).send(
f"Thanks for your direct message, {sender}! I received: "
f"{message_content.get('text', str(message_content))}"
)
async def on_file_received(self, context: FileContext):
"""Called when a file is uploaded to the workspace."""
uploader = context.source_id
filename = context.file_name
file_size = context.file_size
file_path = context.file_path
print(f"📁 File received: {filename} ({file_size} bytes) from {uploader}")
# Acknowledge file receipt
ws = self.workspace()
await ws.channel("general").post(
f"📁 Thanks {uploader}! I received your file '{filename}' ({file_size} bytes)"
)
# Process different file types
if filename.endswith('.txt'):
await self._process_text_file(file_path, uploader)
elif filename.endswith('.json'):
await self._process_json_file(file_path, uploader)
else:
await ws.channel("general").post(
f"📄 I can see the file but don't have a specific handler for .{filename.split('.')[-1]} files"
)
async def _process_text_file(self, file_path: str, uploader: str):
"""Process uploaded text files."""
try:
with open(file_path, 'r') as f:
content = f.read()
line_count = len(content.splitlines())
word_count = len(content.split())
char_count = len(content)
ws = self.workspace()
await ws.channel("general").post(
f"📊 Text file analysis for {uploader}:\n"
f"• Lines: {line_count}\n"
f"• Words: {word_count}\n"
f"• Characters: {char_count}"
)
except Exception as e:
print(f"❌ Error processing text file: {e}")
async def _process_json_file(self, file_path: str, uploader: str):
"""Process uploaded JSON files."""
try:
import json
with open(file_path, 'r') as f:
data = json.load(f)
if isinstance(data, dict):
key_count = len(data.keys())
info = f"JSON object with {key_count} keys: {list(data.keys())[:5]}"
elif isinstance(data, list):
info = f"JSON array with {len(data)} items"
else:
info = f"JSON {type(data).__name__}: {str(data)[:100]}"
ws = self.workspace()
await ws.channel("general").post(
f"🔍 JSON analysis for {uploader}: {info}"
)
except Exception as e:
print(f"❌ Error processing JSON file: {e}")自定义事件处理器
使用 @on_event 装饰器进行自定义事件处理:
from openagents.agents.worker_agent import WorkerAgent, on_event, EventContext
class CustomEventAgent(WorkerAgent):
"""Agent with custom event handlers."""
default_agent_id = "custom-event-agent"
def __init__(self):
super().__init__()
self.custom_event_count = 0
self.network_events = []
@on_event("network.*")
async def handle_network_events(self, context: EventContext):
"""Handle all network-level events."""
event_name = context.incoming_event.event_name
source = context.source_id
self.network_events.append(event_name)
print(f"🌐 Network event: {event_name} from {source}")
# Keep only last 50 events
if len(self.network_events) > 50:
self.network_events = self.network_events[-50:]
@on_event("agent.*")
async def handle_agent_events(self, context: EventContext):
"""Handle agent lifecycle events."""
event_name = context.incoming_event.event_name
agent_id = context.source_id
if "connected" in event_name:
ws = self.workspace()
await ws.channel("general").post(f"👋 Welcome {agent_id} to the network!")
elif "disconnected" in event_name:
ws = self.workspace()
await ws.channel("general").post(f"👋 Goodbye {agent_id}!")
@on_event("workspace.reaction.*")
async def handle_reactions(self, context: EventContext):
"""Handle message reactions."""
reactor = context.source_id
reaction = context.incoming_event.payload.get('reaction', '❓')
message_id = context.incoming_event.payload.get('message_id', 'unknown')
print(f"😊 {reactor} reacted with {reaction} to message {message_id}")
@on_event("custom.task.*")
async def handle_custom_tasks(self, context: EventContext):
"""Handle custom task events."""
self.custom_event_count += 1
task_type = context.incoming_event.payload.get('task_type', 'unknown')
requester = context.source_id
print(f"🎯 Custom task #{self.custom_event_count}: {task_type} from {requester}")
# Process different task types
if task_type == "analyze_data":
await self._handle_data_analysis_task(context)
elif task_type == "generate_report":
await self._handle_report_generation_task(context)
else:
ws = self.workspace()
await ws.agent(requester).send(f"❓ Unknown task type: {task_type}")
async def _handle_data_analysis_task(self, context: EventContext):
"""Handle data analysis tasks."""
requester = context.source_id
dataset = context.incoming_event.payload.get('dataset', 'unknown')
# Simulate data analysis
import asyncio
await asyncio.sleep(2) # Simulate processing time
results = {
"dataset": dataset,
"rows_processed": 1000,
"anomalies_found": 3,
"completion_time": "2 seconds"
}
ws = self.workspace()
await ws.agent(requester).send(
f"📊 Data analysis complete for {dataset}:\n"
f"• Processed {results['rows_processed']} rows\n"
f"• Found {results['anomalies_found']} anomalies\n"
f"• Completed in {results['completion_time']}"
)
async def _handle_report_generation_task(self, context: EventContext):
"""Handle report generation tasks."""
requester = context.source_id
report_type = context.incoming_event.payload.get('report_type', 'summary')
# Generate report
report_content = f"""
📋 **{report_type.title()} Report**
Generated by: {self.agent_id}
Requested by: {requester}
Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**Summary:**
• Network events tracked: {len(self.network_events)}
• Custom tasks processed: {self.custom_event_count}
• Recent network activity: {self.network_events[-5:] if self.network_events else 'None'}
**Status:** All systems operational ✅
"""
ws = self.workspace()
await ws.channel("general").post(report_content)
await ws.agent(requester).send("📋 Report generated and posted to #general")代理启动与配置
基本代理启动
使用各种配置选项启动代理:
class ConfigurableAgent(WorkerAgent):
"""Agent with configurable startup options."""
default_agent_id = "configurable-agent"
def __init__(self, config: dict = None):
super().__init__()
self.config = config or {}
self.features_enabled = self.config.get('features', {})
self.default_channels = self.config.get('channels', ['general'])
async def on_startup(self):
"""Startup with configuration-based initialization."""
print(f"🚀 Starting {self.agent_id} with config: {self.config}")
# Join configured channels
ws = self.workspace()
for channel_name in self.default_channels:
await ws.channel(channel_name).post(
f"🤖 {self.agent_id} has joined #{channel_name}"
)
# Enable optional features
if self.features_enabled.get('auto_greet', True):
await self._enable_auto_greeting()
if self.features_enabled.get('file_monitoring', False):
await self._enable_file_monitoring()
if self.features_enabled.get('analytics', False):
await self._enable_analytics()
async def _enable_auto_greeting(self):
"""Enable automatic greeting feature."""
print("✅ Auto-greeting feature enabled")
self.auto_greet_enabled = True
async def _enable_file_monitoring(self):
"""Enable file monitoring feature."""
print("✅ File monitoring feature enabled")
self.file_monitoring_enabled = True
async def _enable_analytics(self):
"""Enable analytics feature."""
print("✅ Analytics feature enabled")
self.analytics_enabled = True
self.analytics_data = {
'messages_processed': 0,
'files_received': 0,
'interactions': 0
}
# Agent startup examples
async def start_basic_agent():
"""Start a basic agent."""
agent = ConfigurableAgent()
agent.start(network_host="localhost", network_port=8700)
async def start_configured_agent():
"""Start an agent with custom configuration."""
config = {
'features': {
'auto_greet': True,
'file_monitoring': True,
'analytics': True
},
'channels': ['general', 'development', 'testing']
}
agent = ConfigurableAgent(config)
agent.start(
network_host="localhost",
network_port=8700,
metadata={
'name': 'Configured Collaboration Agent',
'version': '2.0',
'capabilities': ['messaging', 'file_processing', 'analytics']
}
)
async def start_with_custom_transport():
"""Start agent with specific transport preference."""
agent = ConfigurableAgent()
agent.start(
network_host="localhost",
network_port=8600, # gRPC port
transport="grpc",
metadata={'transport_preference': 'grpc'}
)代理生命周期管理
处理代理生命周期事件和状态管理:
import asyncio
import signal
from datetime import datetime
class ManagedAgent(WorkerAgent):
"""Agent with comprehensive lifecycle management."""
default_agent_id = "managed-agent"
def __init__(self):
super().__init__()
self.start_time = None
self.is_running = False
self.shutdown_requested = False
self.stats = {
'uptime': 0,
'messages_handled': 0,
'errors_encountered': 0,
'last_activity': None
}
async def on_startup(self):
"""Enhanced startup with monitoring."""
self.start_time = datetime.now()
self.is_running = True
print(f"🚀 {self.agent_id} starting at {self.start_time}")
# Set up signal handlers for graceful shutdown
self._setup_signal_handlers()
# Start background tasks
asyncio.create_task(self._update_stats_loop())
asyncio.create_task(self._health_check_loop())
# Announce startup
ws = self.workspace()
await ws.channel("general").post(
f"✅ {self.agent_id} is online and monitoring network activity"
)
async def on_shutdown(self):
"""Enhanced shutdown with cleanup."""
self.is_running = False
shutdown_time = datetime.now()
if self.start_time:
uptime = shutdown_time - self.start_time
print(f"🛑 {self.agent_id} shutting down after {uptime}")
# Send shutdown notification
try:
ws = self.workspace()
await ws.channel("general").post(
f"👋 {self.agent_id} is shutting down. "
f"Uptime: {self.stats['uptime']} seconds, "
f"Messages handled: {self.stats['messages_handled']}"
)
except:
pass # Network might be unavailable during shutdown
# Cleanup tasks
await self._cleanup_resources()
def _setup_signal_handlers(self):
"""Set up signal handlers for graceful shutdown."""
try:
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
print("✅ Signal handlers configured")
except:
print("⚠️ Could not set up signal handlers")
def _signal_handler(self, signum, frame):
"""Handle shutdown signals."""
print(f"\n📡 Received signal {signum}, initiating graceful shutdown...")
self.shutdown_requested = True
async def _update_stats_loop(self):
"""Update agent statistics periodically."""
while self.is_running and not self.shutdown_requested:
if self.start_time:
self.stats['uptime'] = (datetime.now() - self.start_time).total_seconds()
self.stats['last_activity'] = datetime.now().isoformat()
await asyncio.sleep(10) # Update every 10 seconds
async def _health_check_loop(self):
"""Perform periodic health checks."""
while self.is_running and not self.shutdown_requested:
try:
# Perform health check
health_ok = await self._perform_health_check()
if not health_ok:
print("⚠️ Health check failed")
self.stats['errors_encountered'] += 1
except Exception as e:
print(f"❌ Health check error: {e}")
self.stats['errors_encountered'] += 1
await asyncio.sleep(60) # Check every minute
async def _perform_health_check(self) -> bool:
"""Perform basic health check."""
try:
# Test workspace connectivity
ws = self.workspace()
channels = await ws.channels()
return len(channels) >= 0 # Basic connectivity test
except:
return False
async def _cleanup_resources(self):
"""Clean up resources before shutdown."""
print("🧹 Cleaning up resources...")
# Cancel background tasks
tasks = [task for task in asyncio.all_tasks() if not task.done()]
for task in tasks:
if task != asyncio.current_task():
task.cancel()
# Wait briefly for tasks to cancel
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
print("✅ Resource cleanup completed")
async def on_channel_post(self, context: ChannelMessageContext):
"""Handle channel messages with stats tracking."""
self.stats['messages_handled'] += 1
self.stats['last_activity'] = datetime.now().isoformat()
message = context.incoming_event.payload.get('content', {}).get('text', '')
# Handle status requests
if 'status' in message.lower() and self.agent_id in message:
await self._send_status_report(context)
async def _send_status_report(self, context: ChannelMessageContext):
"""Send agent status report."""
ws = self.workspace()
status_report = f"""
📊 **{self.agent_id} Status Report**
🕐 **Uptime:** {self.stats['uptime']:.1f} seconds
💬 **Messages handled:** {self.stats['messages_handled']}
❌ **Errors encountered:** {self.stats['errors_encountered']}
🕒 **Last activity:** {self.stats['last_activity']}
✅ **Status:** {'Running' if self.is_running else 'Shutting down'}
"""
await ws.channel(context.channel).reply(
context.incoming_event.id,
status_report
)
# Usage example
async def run_managed_agent():
"""Run a managed agent with full lifecycle support."""
agent = ManagedAgent()
try:
# Start the agent
agent.start(network_host="localhost", network_port=8700)
# Wait for shutdown signal
while agent.is_running and not agent.shutdown_requested:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\n🛑 Keyboard interrupt received")
except Exception as e:
print(f"❌ Agent error: {e}")
finally:
# Ensure cleanup
if hasattr(agent, 'on_shutdown'):
await agent.on_shutdown()
if __name__ == "__main__":
asyncio.run(run_managed_agent())专业化代理模式
面向任务的代理
创建专注于特定任务的代理:
from datetime import datetime
import json
class TaskExecutorAgent(WorkerAgent):
"""Agent specialized in executing various tasks."""
default_agent_id = "task-executor"
def __init__(self):
super().__init__()
self.task_queue = []
self.completed_tasks = []
self.task_handlers = {
'analyze': self._analyze_task,
'report': self._report_task,
'calculate': self._calculate_task,
'summarize': self._summarize_task
}
async def on_startup(self):
"""Startup with task processing capabilities."""
ws = self.workspace()
await ws.channel("general").post(
f"🎯 {self.agent_id} is ready for task execution!\n\n"
f"**Available tasks:**\n"
f"• `analyze` - Data analysis tasks\n"
f"• `report` - Generate reports\n"
f"• `calculate` - Mathematical calculations\n"
f"• `summarize` - Text summarization\n\n"
f"**Usage:** Mention me with task type and details"
)
# Start task processing loop
asyncio.create_task(self._process_task_queue())
async def on_channel_post(self, context: ChannelMessageContext):
"""Handle task requests from channels."""
message = context.incoming_event.payload.get('content', {}).get('text', '')
# Check if this is a task request mentioning us
if f"@{self.agent_id}" in message:
await self._parse_and_queue_task(message, context)
async def on_direct(self, context: EventContext):
"""Handle direct task requests."""
message_content = context.incoming_event.content
message = message_content.get('text', str(message_content))
# Direct messages are treated as task requests
await self._parse_and_queue_task(message, context, is_direct=True)
async def _parse_and_queue_task(self, message: str, context, is_direct: bool = False):
"""Parse message and queue task if valid."""
# Simple task parsing
task = None
for task_type in self.task_handlers.keys():
if task_type in message.lower():
task = {
'id': len(self.task_queue) + len(self.completed_tasks) + 1,
'type': task_type,
'message': message,
'requester': context.source_id,
'channel': getattr(context, 'channel', None) if not is_direct else None,
'is_direct': is_direct,
'timestamp': datetime.now().isoformat(),
'status': 'queued'
}
break
if task:
self.task_queue.append(task)
# Acknowledge task receipt
if is_direct:
ws = self.workspace()
await ws.agent(context.source_id).send(
f"✅ Task #{task['id']} queued: {task['type']}"
)
else:
ws = self.workspace()
await ws.channel(context.channel).reply(
context.incoming_event.id,
f"✅ Task #{task['id']} queued for processing"
)
else:
# Unknown task type
response = f"❓ Unknown task type. Available: {', '.join(self.task_handlers.keys())}"
if is_direct:
ws = self.workspace()
await ws.agent(context.source_id).send(response)
else:
ws = self.workspace()
await ws.channel(context.channel).reply(
context.incoming_event.id,
response
)
async def _process_task_queue(self):
"""Process tasks from the queue."""
while True:
if self.task_queue:
task = self.task_queue.pop(0)
await self._execute_task(task)
await asyncio.sleep(1) # Check queue every second
async def _execute_task(self, task: dict):
"""Execute a specific task."""
task['status'] = 'executing'
task['started_at'] = datetime.now().isoformat()
print(f"🎯 Executing task #{task['id']}: {task['type']}")
try:
# Execute task using appropriate handler
handler = self.task_handlers[task['type']]
result = await handler(task)
task['status'] = 'completed'
task['result'] = result
task['completed_at'] = datetime.now().isoformat()
# Send result
await self._send_task_result(task)
except Exception as e:
task['status'] = 'failed'
task['error'] = str(e)
task['failed_at'] = datetime.now().isoformat()
print(f"❌ Task #{task['id']} failed: {e}")
await self._send_task_error(task)
finally:
self.completed_tasks.append(task)
# Keep only last 100 completed tasks
if len(self.completed_tasks) > 100:
self.completed_tasks = self.completed_tasks[-100:]
async def _analyze_task(self, task: dict) -> dict:
"""Execute analysis task."""
# Simulate analysis work
await asyncio.sleep(2)
return {
'type': 'analysis',
'summary': 'Data analysis completed successfully',
'metrics': {
'data_points': 1000,
'anomalies': 5,
'confidence': 0.95
}
}
async def _report_task(self, task: dict) -> dict:
"""Execute report generation task."""
# Simulate report generation
await asyncio.sleep(3)
return {
'type': 'report',
'title': 'Automated Report',
'sections': ['Executive Summary', 'Data Analysis', 'Recommendations'],
'page_count': 15
}
async def _calculate_task(self, task: dict) -> dict:
"""Execute calculation task."""
# Simple calculation simulation
import random
result = random.randint(100, 1000)
return {
'type': 'calculation',
'result': result,
'formula': 'complex_algorithm(input_data)',
'confidence': 0.98
}
async def _summarize_task(self, task: dict) -> dict:
"""Execute summarization task."""
# Simulate text summarization
await asyncio.sleep(1)
return {
'type': 'summary',
'original_length': 1500,
'summary_length': 300,
'compression_ratio': 0.2,
'key_points': ['Point 1', 'Point 2', 'Point 3']
}
async def _send_task_result(self, task: dict):
"""Send task completion result."""
result = task['result']
result_message = f"""
✅ **Task #{task['id']} Completed**
**Type:** {task['type']}
**Duration:** {self._calculate_duration(task)}
**Result:** {json.dumps(result, indent=2)}
"""
ws = self.workspace()
if task['is_direct']:
await ws.agent(task['requester']).send(result_message)
else:
await ws.channel(task['channel']).post(result_message)
async def _send_task_error(self, task: dict):
"""Send task error notification."""
error_message = f"""
❌ **Task #{task['id']} Failed**
**Type:** {task['type']}
**Error:** {task['error']}
**Duration:** {self._calculate_duration(task)}
"""
ws = self.workspace()
if task['is_direct']:
await ws.agent(task['requester']).send(error_message)
else:
await ws.channel(task['channel']).post(error_message)
def _calculate_duration(self, task: dict) -> str:
"""Calculate task execution duration."""
if 'completed_at' in task or 'failed_at' in task:
end_time = task.get('completed_at') or task.get('failed_at')
start_time = task['started_at']
from datetime import datetime
start = datetime.fromisoformat(start_time)
end = datetime.fromisoformat(end_time)
duration = end - start
return f"{duration.total_seconds():.1f} seconds"
return "Unknown"
# Usage
if __name__ == "__main__":
agent = TaskExecutorAgent()
agent.start(network_host="localhost", network_port=8700)
agent.wait_for_stop()最佳实践
WorkerAgent 开发最佳实践
- 事件处理器: 为不同的消息类型使用特定的事件处理器
- 错误处理: 始终在 try-catch 块中包装事件处理器
- 状态管理: 保持代理状态精简且结构良好
- 资源清理: 在关闭处理程序中正确清理资源
- 文档: 记录代理的能力和使用模式
性能注意事项
- 异步操作: 正确使用 async/await 以实现非阻塞操作
- 事件处理: 保持事件处理器轻量且快速
- 内存管理: 避免在代理内存中存储过多数据
- 后台任务: 对于后台操作使用 asyncio.create_task
- 连接管理: 高效重用工作区连接
协作指南
- 清晰的沟通: 发送有帮助且信息丰富的消息
- 礼貌互动: 对其他代理和人类保持礼貌
- 错误报告: 在出现故障时提供清晰的错误信息
- 状态更新: 在长时间运行的操作中,让用户了解进展
- 资源共享: 适当地共享文件和信息
下一步
- 与基于 LLM 的代理协作 - 由 AI 驱动的代理集成
- 自定义事件处理 - 高级事件处理模式
- 自定义代理逻辑 - 复杂的代理行为
- 连接代理教程 - 动手实践 WorkerAgent 开发
Was this helpful?