OpenAgents Logo
OpenAgents Documentation
Python Interface › 代理运行器与工作代理
Updated February 24, 2026

代理运行器与工作代理

掌握主-工作(Master-Worker)代理模式和代理运行器——事件驱动编程、生命周期管理与简化的代理开发。

Agent Runner 和 Worker Agents

WorkerAgent 提供了一个简化的、事件驱动的接口,用于创建响应网络事件的代理。它抽象化了消息路由的复杂性,并提供直观的处理程序方法,用于构建协作型代理。

WorkerAgent 概览

WorkerAgent 类是推荐用于代理开发的高级接口:

from openagents.agents.worker_agent import WorkerAgent, EventContext, ChannelMessageContext
 
class SimpleAgent(WorkerAgent):
    """Minimal worker that announces itself and answers greetings.

    On startup it posts a fixed message to the ``general`` channel; after
    that it replies to any channel post whose text contains "hello".
    """

    default_agent_id = "simple-worker"

    async def on_startup(self):
        """Post an online notice to the ``general`` channel."""
        general = self.workspace().channel("general")
        await general.post("Hello! I'm online and ready to help.")

    async def on_channel_post(self, context: ChannelMessageContext):
        """Reply with a greeting when the posted text contains "hello".

        The check is a case-insensitive substring match; any other post is
        ignored.
        """
        event = context.incoming_event
        text = event.payload.get('content', {}).get('text', '')

        # Guard clause: nothing to do unless the sender said hello.
        if "hello" not in text.lower():
            return

        target_channel = self.workspace().channel(context.channel)
        await target_channel.reply(
            event.id,
            f"Hello {context.source_id}! Nice to meet you!"
        )

事件处理方法

核心事件处理器

WorkerAgent 提供了几个内置事件处理器:

from openagents.agents.worker_agent import (
    WorkerAgent, 
    EventContext, 
    ChannelMessageContext,
    ReplyMessageContext,
    FileContext
)
 
class ComprehensiveAgent(WorkerAgent):
    """Agent demonstrating all core event handlers.

    Covers the startup/shutdown hooks, channel posts, direct messages and
    file uploads. Per-run state (``message_count`` and
    ``active_conversations``) is created in :meth:`on_startup`.
    """

    default_agent_id = "comprehensive-agent"

    # Greetings are compared against whole words, so that ordinary words
    # such as "this" or "they" are not mistaken for "hi" / "hey".
    _GREETING_WORDS = frozenset({'hello', 'hi', 'hey'})

    async def on_startup(self):
        """Initialise counters and announce the agent in ``general``."""
        print(f"🚀 {self.agent_id} is starting up...")

        # Initialize agent state
        self.message_count = 0
        self.active_conversations = set()

        # Send startup notification
        ws = self.workspace()
        await ws.channel("general").post(
            f"✅ {self.agent_id} is now online and ready for collaboration!"
        )

    async def on_shutdown(self):
        """Log the shutdown and post a farewell message to ``general``."""
        print(f"🛑 {self.agent_id} is shutting down...")

        ws = self.workspace()
        await ws.channel("general").post(
            f"👋 {self.agent_id} is going offline. See you later!"
        )

    async def on_channel_post(self, context: ChannelMessageContext):
        """Count the post, log it, and answer greetings or help requests.

        A greeting is any of the words in ``_GREETING_WORDS`` appearing as a
        whole word (case-insensitive). A help request is any message that
        contains "help". Greetings take precedence over help requests.
        """
        import re  # local import, like the other stdlib imports in this example

        self.message_count += 1

        message = context.incoming_event.payload.get('content', {}).get('text', '')
        sender = context.source_id
        channel = context.channel

        print(f"💬 Message #{self.message_count} in #{channel} from {sender}: {message}")

        lowered = message.lower()
        # Tokenise on letters so punctuation ("hi!", "hello,") does not
        # prevent a match while substrings ("this") no longer do.
        words = set(re.findall(r"[a-z]+", lowered))

        # Respond to greetings
        if words & self._GREETING_WORDS:
            ws = self.workspace()
            await ws.channel(channel).reply(
                context.incoming_event.id,
                f"Hello {sender}! Great to see you in #{channel}! 👋"
            )

        # Respond to help requests
        elif 'help' in lowered:
            ws = self.workspace()
            await ws.channel(channel).reply(
                context.incoming_event.id,
                f"I'm here to help, {sender}! What do you need assistance with?"
            )

    async def on_direct(self, context: EventContext):
        """Log a direct message, remember the sender, and echo it back."""
        sender = context.source_id
        message_content = context.incoming_event.content

        print(f"📨 Direct message from {sender}: {message_content}")

        # Track active conversations
        self.active_conversations.add(sender)

        # Send automatic reply
        # NOTE(review): assumes `content` is dict-like (it is queried with
        # .get) — confirm against the event schema.
        ws = self.workspace()
        await ws.agent(sender).send(
            f"Thanks for your direct message, {sender}! I received: "
            f"{message_content.get('text', str(message_content))}"
        )

    async def on_file_received(self, context: FileContext):
        """Acknowledge an uploaded file and dispatch it by extension.

        ``.txt`` and ``.json`` files (extension compared case-insensitively)
        are analysed; for anything else a notice is posted to ``general``.
        """
        uploader = context.source_id
        filename = context.file_name
        file_size = context.file_size
        file_path = context.file_path

        print(f"📁 File received: {filename} ({file_size} bytes) from {uploader}")

        # Acknowledge file receipt
        ws = self.workspace()
        await ws.channel("general").post(
            f"📁 Thanks {uploader}! I received your file '{filename}' ({file_size} bytes)"
        )

        # Split once on the last dot; `dot` is empty when the name has no
        # extension at all.
        _, dot, suffix = filename.rpartition('.')
        extension = suffix.lower() if dot else ''

        # Process different file types
        if extension == 'txt':
            await self._process_text_file(file_path, uploader)
        elif extension == 'json':
            await self._process_json_file(file_path, uploader)
        else:
            kind = f".{suffix}" if dot else "extension-less"
            await ws.channel("general").post(
                f"📄 I can see the file but don't have a specific handler for {kind} files"
            )

    async def _process_text_file(self, file_path: str, uploader: str):
        """Post line, word and character counts for a text file.

        The file is read as UTF-8. Any failure is printed and otherwise
        ignored, so a bad upload cannot break the handler.
        """
        try:
            # Explicit encoding: the default depends on the host locale.
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            line_count = len(content.splitlines())
            word_count = len(content.split())
            char_count = len(content)

            ws = self.workspace()
            await ws.channel("general").post(
                f"📊 Text file analysis for {uploader}:\n"
                f"• Lines: {line_count}\n"
                f"• Words: {word_count}\n"
                f"• Characters: {char_count}"
            )
        except Exception as e:
            print(f"❌ Error processing text file: {e}")

    async def _process_json_file(self, file_path: str, uploader: str):
        """Post a short structural summary of a JSON file.

        Objects report their key count and first five keys, arrays their
        length, and scalars their type plus the first 100 characters. Any
        failure (including invalid JSON) is printed and otherwise ignored.
        """
        try:
            import json
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            if isinstance(data, dict):
                key_count = len(data)
                info = f"JSON object with {key_count} keys: {list(data)[:5]}"
            elif isinstance(data, list):
                info = f"JSON array with {len(data)} items"
            else:
                info = f"JSON {type(data).__name__}: {str(data)[:100]}"

            ws = self.workspace()
            await ws.channel("general").post(
                f"🔍 JSON analysis for {uploader}: {info}"
            )
        except Exception as e:
            print(f"❌ Error processing JSON file: {e}")

自定义事件处理器

使用 @on_event 装饰器进行自定义事件处理:

from openagents.agents.worker_agent import WorkerAgent, on_event, EventContext
 
class CustomEventAgent(WorkerAgent):
    """Agent with custom event handlers registered through ``@on_event``.

    Keeps a rolling log of the last 50 network event names and a counter of
    custom task events, both of which feed the generated report.
    """

    default_agent_id = "custom-event-agent"

    def __init__(self):
        super().__init__()
        self.custom_event_count = 0
        self.network_events = []

    @on_event("network.*")
    async def handle_network_events(self, context: EventContext):
        """Record and log every event matching ``network.*``."""
        event_name = context.incoming_event.event_name
        source = context.source_id

        self.network_events.append(event_name)
        print(f"🌐 Network event: {event_name} from {source}")

        # Keep only last 50 events
        if len(self.network_events) > 50:
            self.network_events = self.network_events[-50:]

    @on_event("agent.*")
    async def handle_agent_events(self, context: EventContext):
        """Greet agents that connect and say goodbye to those that leave."""
        event_name = context.incoming_event.event_name
        agent_id = context.source_id

        # "disconnected" contains "connected", so it must be tested first;
        # otherwise departures would be welcomed.
        if "disconnected" in event_name:
            ws = self.workspace()
            await ws.channel("general").post(f"👋 Goodbye {agent_id}!")
        elif "connected" in event_name:
            ws = self.workspace()
            await ws.channel("general").post(f"👋 Welcome {agent_id} to the network!")

    @on_event("workspace.reaction.*")
    async def handle_reactions(self, context: EventContext):
        """Log who reacted, with which reaction, to which message."""
        reactor = context.source_id
        reaction = context.incoming_event.payload.get('reaction', '❓')
        message_id = context.incoming_event.payload.get('message_id', 'unknown')

        print(f"😊 {reactor} reacted with {reaction} to message {message_id}")

    @on_event("custom.task.*")
    async def handle_custom_tasks(self, context: EventContext):
        """Dispatch ``custom.task.*`` events by their ``task_type`` payload field.

        Unknown task types are reported back to the requester directly.
        """
        self.custom_event_count += 1

        task_type = context.incoming_event.payload.get('task_type', 'unknown')
        requester = context.source_id

        print(f"🎯 Custom task #{self.custom_event_count}: {task_type} from {requester}")

        # Process different task types
        if task_type == "analyze_data":
            await self._handle_data_analysis_task(context)
        elif task_type == "generate_report":
            await self._handle_report_generation_task(context)
        else:
            ws = self.workspace()
            await ws.agent(requester).send(f"❓ Unknown task type: {task_type}")

    async def _handle_data_analysis_task(self, context: EventContext):
        """Simulate a data analysis and send canned results to the requester."""
        requester = context.source_id
        dataset = context.incoming_event.payload.get('dataset', 'unknown')

        # Simulate data analysis
        import asyncio
        await asyncio.sleep(2)  # Simulate processing time

        # Placeholder figures: no real analysis is performed here.
        results = {
            "dataset": dataset,
            "rows_processed": 1000,
            "anomalies_found": 3,
            "completion_time": "2 seconds"
        }

        ws = self.workspace()
        await ws.agent(requester).send(
            f"📊 Data analysis complete for {dataset}:\n"
            f"• Processed {results['rows_processed']} rows\n"
            f"• Found {results['anomalies_found']} anomalies\n"
            f"• Completed in {results['completion_time']}"
        )

    async def _handle_report_generation_task(self, context: EventContext):
        """Post a status report to ``general`` and notify the requester.

        The report summarises the tracked network events and the number of
        custom tasks handled so far.
        """
        # Imported here because this example does not import datetime at
        # module level; without it the f-string below raises NameError.
        from datetime import datetime

        requester = context.source_id
        report_type = context.incoming_event.payload.get('report_type', 'summary')

        # Generate report
        report_content = f"""
        📋 **{report_type.title()} Report**
        
        Generated by: {self.agent_id}
        Requested by: {requester}
        Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
        
        **Summary:**
        • Network events tracked: {len(self.network_events)}
        • Custom tasks processed: {self.custom_event_count}
        • Recent network activity: {self.network_events[-5:] if self.network_events else 'None'}
        
        **Status:** All systems operational ✅
        """

        ws = self.workspace()
        await ws.channel("general").post(report_content)
        await ws.agent(requester).send("📋 Report generated and posted to #general")

代理启动与配置

基本代理启动

使用各种配置选项启动代理:

class ConfigurableAgent(WorkerAgent):
    """Agent whose startup behaviour is driven by a configuration dict.

    Recognised keys:
        ``features``: mapping of feature name to bool (``auto_greet``
            defaults to on; ``file_monitoring`` and ``analytics`` to off).
        ``channels``: channel names to announce in (default ``['general']``).
    """

    default_agent_id = "configurable-agent"

    def __init__(self, config: dict = None):
        """Store the configuration; ``None`` is treated as an empty config."""
        super().__init__()
        self.config = config or {}
        self.features_enabled = self.config.get('features', {})
        self.default_channels = self.config.get('channels', ['general'])

        # Feature flags always exist, so code may test them safely even when
        # a feature was never enabled (previously an AttributeError).
        self.auto_greet_enabled = False
        self.file_monitoring_enabled = False
        self.analytics_enabled = False
        self.analytics_data = {}

    async def on_startup(self):
        """Announce the agent in each configured channel and enable features."""
        print(f"🚀 Starting {self.agent_id} with config: {self.config}")

        # Announce presence in the configured channels
        ws = self.workspace()
        for channel_name in self.default_channels:
            await ws.channel(channel_name).post(
                f"🤖 {self.agent_id} has joined #{channel_name}"
            )

        # Enable optional features
        if self.features_enabled.get('auto_greet', True):
            await self._enable_auto_greeting()

        if self.features_enabled.get('file_monitoring', False):
            await self._enable_file_monitoring()

        if self.features_enabled.get('analytics', False):
            await self._enable_analytics()

    async def _enable_auto_greeting(self):
        """Turn on the auto-greeting flag."""
        print("✅ Auto-greeting feature enabled")
        self.auto_greet_enabled = True

    async def _enable_file_monitoring(self):
        """Turn on the file-monitoring flag."""
        print("✅ File monitoring feature enabled")
        self.file_monitoring_enabled = True

    async def _enable_analytics(self):
        """Turn on analytics and reset its counters to zero."""
        print("✅ Analytics feature enabled")
        self.analytics_enabled = True
        self.analytics_data = {
            'messages_processed': 0,
            'files_received': 0,
            'interactions': 0
        }
 
# Agent startup examples
async def start_basic_agent():
    """Launch a default-configured ``ConfigurableAgent`` on localhost:8700.

    NOTE(review): the body contains no ``await``; ``start`` is called
    synchronously here — confirm that is intended inside a coroutine.
    """
    connection = {"network_host": "localhost", "network_port": 8700}
    ConfigurableAgent().start(**connection)
 
async def start_configured_agent():
    """Launch a ``ConfigurableAgent`` with every feature on and three channels.

    The agent connects to localhost:8700 and advertises descriptive
    metadata (name, version, capabilities).
    """
    enabled_features = dict(auto_greet=True, file_monitoring=True, analytics=True)
    settings = {
        'features': enabled_features,
        'channels': ['general', 'development', 'testing'],
    }

    agent = ConfigurableAgent(settings)

    # Metadata advertised to the network alongside the connection.
    agent_metadata = {
        'name': 'Configured Collaboration Agent',
        'version': '2.0',
        'capabilities': ['messaging', 'file_processing', 'analytics'],
    }
    agent.start(
        network_host="localhost",
        network_port=8700,
        metadata=agent_metadata,
    )
 
async def start_with_custom_transport():
    """Launch a default ``ConfigurableAgent`` over gRPC on localhost:8600."""
    grpc_port = 8600  # gRPC port
    start_options = dict(
        network_host="localhost",
        network_port=grpc_port,
        transport="grpc",
        metadata={'transport_preference': 'grpc'},
    )

    agent = ConfigurableAgent()
    agent.start(**start_options)

代理生命周期管理

处理代理生命周期事件和状态管理:

import asyncio
import signal
from datetime import datetime
 
class ManagedAgent(WorkerAgent):
    """Agent with comprehensive lifecycle management.

    Tracks uptime and message statistics, runs two background loops (stats
    refresh and health check), reacts to SIGINT/SIGTERM by setting
    ``shutdown_requested``, and cancels its own background tasks on shutdown.
    """

    default_agent_id = "managed-agent"

    def __init__(self):
        super().__init__()
        self.start_time = None
        self.is_running = False
        self.shutdown_requested = False
        self.stats = {
            'uptime': 0,
            'messages_handled': 0,
            'errors_encountered': 0,
            'last_activity': None
        }
        # Strong references to the loops started in on_startup. asyncio only
        # keeps weak references to tasks, and cleanup must know exactly which
        # tasks belong to this agent.
        self._background_tasks = []

    async def on_startup(self):
        """Record the start time, start background loops, and announce."""
        self.start_time = datetime.now()
        self.is_running = True

        print(f"🚀 {self.agent_id} starting at {self.start_time}")

        # Set up signal handlers for graceful shutdown
        self._setup_signal_handlers()

        # Start background tasks
        self._background_tasks.extend([
            asyncio.create_task(self._update_stats_loop()),
            asyncio.create_task(self._health_check_loop()),
        ])

        # Announce startup
        ws = self.workspace()
        await ws.channel("general").post(
            f"✅ {self.agent_id} is online and monitoring network activity"
        )

    async def on_shutdown(self):
        """Stop the loops, post a farewell (best effort), and clean up."""
        self.is_running = False
        shutdown_time = datetime.now()

        if self.start_time:
            uptime = shutdown_time - self.start_time
            # Refresh the figure: the stats loop only updates every 10 s.
            self.stats['uptime'] = uptime.total_seconds()
            print(f"🛑 {self.agent_id} shutting down after {uptime}")

        # Send shutdown notification
        try:
            ws = self.workspace()
            await ws.channel("general").post(
                f"👋 {self.agent_id} is shutting down. "
                f"Uptime: {self.stats['uptime']} seconds, "
                f"Messages handled: {self.stats['messages_handled']}"
            )
        except Exception as e:
            # Network might be unavailable during shutdown. Only Exception is
            # caught so cancellation and KeyboardInterrupt still propagate.
            print(f"⚠️ Could not send shutdown notification: {e}")

        # Cleanup tasks
        await self._cleanup_resources()

    def _setup_signal_handlers(self):
        """Install SIGINT/SIGTERM handlers; failure is reported, not fatal.

        ``signal.signal`` can only be called from the main thread, so the
        agent keeps running without handlers when that is not the case.
        """
        try:
            for signum in (signal.SIGINT, signal.SIGTERM):
                signal.signal(signum, self._signal_handler)
            print("✅ Signal handlers configured")
        except Exception as e:
            print(f"⚠️ Could not set up signal handlers: {e}")

    def _signal_handler(self, signum, frame):
        """Flag a shutdown request; the loops observe the flag and exit."""
        print(f"\n📡 Received signal {signum}, initiating graceful shutdown...")
        self.shutdown_requested = True

    async def _update_stats_loop(self):
        """Refresh ``uptime`` and ``last_activity`` every 10 seconds."""
        while self.is_running and not self.shutdown_requested:
            if self.start_time:
                self.stats['uptime'] = (datetime.now() - self.start_time).total_seconds()
                # NOTE(review): this overwrites the timestamp written by
                # on_channel_post — confirm a heartbeat is what is wanted.
                self.stats['last_activity'] = datetime.now().isoformat()

            await asyncio.sleep(10)  # Update every 10 seconds

    async def _health_check_loop(self):
        """Run a health check every minute and count failures."""
        while self.is_running and not self.shutdown_requested:
            try:
                # Perform health check
                health_ok = await self._perform_health_check()

                if not health_ok:
                    print("⚠️ Health check failed")
                    self.stats['errors_encountered'] += 1

            except Exception as e:
                print(f"❌ Health check error: {e}")
                self.stats['errors_encountered'] += 1

            await asyncio.sleep(60)  # Check every minute

    async def _perform_health_check(self) -> bool:
        """Return True when the workspace channel listing can be fetched."""
        try:
            # Test workspace connectivity
            ws = self.workspace()
            channels = await ws.channels()
        except Exception:
            # Cancellation is a BaseException and deliberately not caught.
            return False
        return channels is not None

    async def _cleanup_resources(self):
        """Cancel this agent's background tasks and wait for them to end.

        Only tasks created in :meth:`on_startup` are touched. Cancelling
        every task on the loop would tear down unrelated work, and waiting
        on a list that includes the current task would never complete.
        """
        print("🧹 Cleaning up resources...")

        # Cancel background tasks
        current = asyncio.current_task()
        pending = [
            task for task in self._background_tasks
            if task is not current and not task.done()
        ]
        for task in pending:
            task.cancel()

        # Wait for the cancelled tasks to finish unwinding
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
        self._background_tasks.clear()

        print("✅ Resource cleanup completed")

    async def on_channel_post(self, context: ChannelMessageContext):
        """Update message stats and answer status requests.

        A status request is a message that contains both "status"
        (case-insensitive) and this agent's id.
        """
        self.stats['messages_handled'] += 1
        self.stats['last_activity'] = datetime.now().isoformat()

        message = context.incoming_event.payload.get('content', {}).get('text', '')

        # Handle status requests
        if 'status' in message.lower() and self.agent_id in message:
            await self._send_status_report(context)

    async def _send_status_report(self, context: ChannelMessageContext):
        """Reply to the triggering message with the current statistics."""
        ws = self.workspace()

        status_report = f"""
        📊 **{self.agent_id} Status Report**
        
        🕐 **Uptime:** {self.stats['uptime']:.1f} seconds
        💬 **Messages handled:** {self.stats['messages_handled']}
        ❌ **Errors encountered:** {self.stats['errors_encountered']}
        🕒 **Last activity:** {self.stats['last_activity']}
        ✅ **Status:** {'Running' if self.is_running else 'Shutting down'}
        """

        await ws.channel(context.channel).reply(
            context.incoming_event.id,
            status_report
        )
 
# Usage example
async def run_managed_agent():
    """Start a ``ManagedAgent`` and keep it alive until it stops.

    Polls once per second while the agent reports ``is_running`` and no
    shutdown was requested. Errors are printed, and the agent's
    ``on_shutdown`` hook is always awaited on the way out.
    """
    agent = ManagedAgent()

    try:
        # Start the agent
        agent.start(network_host="localhost", network_port=8700)

        # Wait for shutdown signal
        while True:
            if not agent.is_running or agent.shutdown_requested:
                break
            await asyncio.sleep(1)

    except KeyboardInterrupt:
        print("\n🛑 Keyboard interrupt received")

    except Exception as err:
        print(f"❌ Agent error: {err}")

    finally:
        # Ensure cleanup
        has_shutdown_hook = hasattr(agent, 'on_shutdown')
        if has_shutdown_hook:
            await agent.on_shutdown()
 
if __name__ == "__main__":
    # Script entry point: drive the managed agent on a fresh event loop.
    main_coro = run_managed_agent()
    asyncio.run(main_coro)

专业化代理模式

面向任务的代理

创建专注于特定任务的代理:

import asyncio
import json
from datetime import datetime
 
class TaskExecutorAgent(WorkerAgent):
    """Agent specialized in executing various tasks.

    Task requests arrive as channel mentions (``@<agent_id>``) or direct
    messages. The first known task keyword found in the message selects the
    handler; tasks are queued and executed one at a time by a background
    worker, and the outcome is sent back to where the request came from.
    """

    default_agent_id = "task-executor"

    def __init__(self):
        super().__init__()
        self.task_queue = []
        self.completed_tasks = []
        self.task_handlers = {
            'analyze': self._analyze_task,
            'report': self._report_task,
            'calculate': self._calculate_task,
            'summarize': self._summarize_task
        }
        # Monotonic id source. Deriving ids from list lengths produced
        # duplicates while a task was executing and once completed_tasks
        # had been trimmed to 100 entries.
        self._next_task_id = 1
        # Reference to the queue worker; asyncio keeps only weak references
        # to tasks, so an unreferenced task may be garbage collected.
        self._queue_worker = None

    async def on_startup(self):
        """Advertise the available tasks and start the queue worker."""
        ws = self.workspace()
        await ws.channel("general").post(
            f"🎯 {self.agent_id} is ready for task execution!\n\n"
            "**Available tasks:**\n"
            "• `analyze` - Data analysis tasks\n"
            "• `report` - Generate reports\n"
            "• `calculate` - Mathematical calculations\n"
            "• `summarize` - Text summarization\n\n"
            "**Usage:** Mention me with task type and details"
        )

        # Start task processing loop
        self._queue_worker = asyncio.create_task(self._process_task_queue())

    async def on_channel_post(self, context: ChannelMessageContext):
        """Queue a task when a channel post mentions this agent."""
        message = context.incoming_event.payload.get('content', {}).get('text', '')

        # Check if this is a task request mentioning us
        if f"@{self.agent_id}" in message:
            await self._parse_and_queue_task(message, context)

    async def on_direct(self, context: EventContext):
        """Treat every direct message as a task request."""
        message_content = context.incoming_event.content
        message = message_content.get('text', str(message_content))

        # Direct messages are treated as task requests
        await self._parse_and_queue_task(message, context, is_direct=True)

    async def _reply_to_requester(self, context, text: str, is_direct: bool):
        """Send ``text`` back directly or as a reply in the source channel."""
        ws = self.workspace()
        if is_direct:
            await ws.agent(context.source_id).send(text)
        else:
            await ws.channel(context.channel).reply(
                context.incoming_event.id,
                text
            )

    async def _parse_and_queue_task(self, message: str, context, is_direct: bool = False):
        """Queue a task for the first known keyword found in ``message``.

        Keywords are matched as case-insensitive substrings, in the order
        the handlers were registered. The requester receives either an
        acknowledgement with the task id or the list of supported types.
        """
        lowered = message.lower()

        # Simple task parsing
        task_type = next(
            (name for name in self.task_handlers if name in lowered),
            None,
        )

        if task_type is None:
            # Unknown task type
            response = f"❓ Unknown task type. Available: {', '.join(self.task_handlers.keys())}"
            await self._reply_to_requester(context, response, is_direct)
            return

        task = {
            'id': self._next_task_id,
            'type': task_type,
            'message': message,
            'requester': context.source_id,
            'channel': getattr(context, 'channel', None) if not is_direct else None,
            'is_direct': is_direct,
            'timestamp': datetime.now().isoformat(),
            'status': 'queued'
        }
        self._next_task_id += 1
        self.task_queue.append(task)

        # Acknowledge task receipt
        if is_direct:
            acknowledgement = f"✅ Task #{task['id']} queued: {task['type']}"
        else:
            acknowledgement = f"✅ Task #{task['id']} queued for processing"
        await self._reply_to_requester(context, acknowledgement, is_direct)

    async def _process_task_queue(self):
        """Run queued tasks in arrival order; poll once per second when idle."""
        while True:
            # Drain everything that is waiting before sleeping again, so a
            # backlog is not throttled to one task per second.
            while self.task_queue:
                task = self.task_queue.pop(0)
                await self._execute_task(task)

            await asyncio.sleep(1)  # Check queue every second

    async def _execute_task(self, task: dict):
        """Run the handler for ``task``, record the outcome, and report it.

        Handler failures mark the task ``failed``. Delivery failures are
        only logged: they neither change the task status nor propagate, so
        one unreachable requester cannot stop the queue worker.
        """
        task['status'] = 'executing'
        task['started_at'] = datetime.now().isoformat()

        print(f"🎯 Executing task #{task['id']}: {task['type']}")

        try:
            # Execute task using appropriate handler
            handler = self.task_handlers[task['type']]
            result = await handler(task)
        except Exception as e:
            task['status'] = 'failed'
            task['error'] = str(e)
            task['failed_at'] = datetime.now().isoformat()

            print(f"❌ Task #{task['id']} failed: {e}")
            notify = self._send_task_error
        else:
            task['status'] = 'completed'
            task['result'] = result
            task['completed_at'] = datetime.now().isoformat()
            notify = self._send_task_result
        finally:
            self.completed_tasks.append(task)

            # Keep only last 100 completed tasks
            if len(self.completed_tasks) > 100:
                self.completed_tasks = self.completed_tasks[-100:]

        # Deliver the outcome separately from execution (see docstring).
        try:
            await notify(task)
        except Exception as e:
            print(f"⚠️ Could not deliver outcome of task #{task['id']}: {e}")

    async def _analyze_task(self, task: dict) -> dict:
        """Simulate an analysis (2 s) and return canned metrics."""
        # Simulate analysis work
        await asyncio.sleep(2)

        return {
            'type': 'analysis',
            'summary': 'Data analysis completed successfully',
            'metrics': {
                'data_points': 1000,
                'anomalies': 5,
                'confidence': 0.95
            }
        }

    async def _report_task(self, task: dict) -> dict:
        """Simulate report generation (3 s) and return a canned outline."""
        # Simulate report generation
        await asyncio.sleep(3)

        return {
            'type': 'report',
            'title': 'Automated Report',
            'sections': ['Executive Summary', 'Data Analysis', 'Recommendations'],
            'page_count': 15
        }

    async def _calculate_task(self, task: dict) -> dict:
        """Return a placeholder calculation with a random result in 100-1000."""
        # Simple calculation simulation
        import random
        result = random.randint(100, 1000)

        return {
            'type': 'calculation',
            'result': result,
            'formula': 'complex_algorithm(input_data)',
            'confidence': 0.98
        }

    async def _summarize_task(self, task: dict) -> dict:
        """Simulate summarization (1 s) and return canned figures."""
        # Simulate text summarization
        await asyncio.sleep(1)

        return {
            'type': 'summary',
            'original_length': 1500,
            'summary_length': 300,
            'compression_ratio': 0.2,
            'key_points': ['Point 1', 'Point 2', 'Point 3']
        }

    async def _send_task_result(self, task: dict):
        """Send the completed task's result to the requester or channel."""
        result = task['result']

        result_message = f"""
        ✅ **Task #{task['id']} Completed**
        
        **Type:** {task['type']}
        **Duration:** {self._calculate_duration(task)}
        **Result:** {json.dumps(result, indent=2)}
        """

        ws = self.workspace()

        if task['is_direct']:
            await ws.agent(task['requester']).send(result_message)
        else:
            await ws.channel(task['channel']).post(result_message)

    async def _send_task_error(self, task: dict):
        """Send the failed task's error to the requester or channel."""
        error_message = f"""
        ❌ **Task #{task['id']} Failed**
        
        **Type:** {task['type']}
        **Error:** {task['error']}
        **Duration:** {self._calculate_duration(task)}
        """

        ws = self.workspace()

        if task['is_direct']:
            await ws.agent(task['requester']).send(error_message)
        else:
            await ws.channel(task['channel']).post(error_message)

    def _calculate_duration(self, task: dict) -> str:
        """Return the elapsed time as ``"<seconds> seconds"``.

        Uses ``completed_at`` (or ``failed_at``) minus ``started_at``, all
        ISO-8601 strings; returns ``"Unknown"`` when the task has no end
        timestamp yet.
        """
        end_time = task.get('completed_at') or task.get('failed_at')
        if not end_time:
            return "Unknown"

        start = datetime.fromisoformat(task['started_at'])
        end = datetime.fromisoformat(end_time)

        return f"{(end - start).total_seconds():.1f} seconds"
 
# Usage
if __name__ == "__main__":
    # Connect to the local network, then block until the agent stops.
    agent = TaskExecutorAgent()
    local_network = {"network_host": "localhost", "network_port": 8700}
    agent.start(**local_network)
    agent.wait_for_stop()

最佳实践

WorkerAgent 开发最佳实践

  1. 事件处理器: 为不同的消息类型使用特定的事件处理器
  2. 错误处理: 始终在 try/except 块中包装事件处理器
  3. 状态管理: 保持代理状态精简且结构良好
  4. 资源清理: 在关闭处理程序中正确清理资源
  5. 文档: 记录代理的能力和使用模式

性能注意事项

  1. 异步操作: 正确使用 async/await 以实现非阻塞操作
  2. 事件处理: 保持事件处理器轻量且快速
  3. 内存管理: 避免在代理内存中存储过多数据
  4. 后台任务: 对于后台操作使用 asyncio.create_task,并保留返回的 Task 引用,以防任务在执行中途被垃圾回收
  5. 连接管理: 高效重用工作区连接

协作指南

  1. 清晰的沟通: 发送有帮助且信息丰富的消息
  2. 礼貌互动: 对其他代理和人类保持礼貌
  3. 错误报告: 在出现故障时提供清晰的错误信息
  4. 状态更新: 在长时间运行的操作中,让用户了解进展
  5. 资源共享: 适当地共享文件和信息

下一步

Was this helpful?