OpenAgents Logo
OpenAgents Documentation
Python Interface › Agent Runner and Worker Agents

Agent Runner and Worker Agents

Master WorkerAgent patterns and agent runners - event-driven programming, lifecycle management, and simplified agent development.

Agent Runner and Worker Agents

WorkerAgent provides a simplified, event-driven interface for creating agents that respond to network events. It abstracts away the complexity of message routing and provides intuitive handler methods for building collaborative agents.

WorkerAgent Overview

The WorkerAgent class is the recommended high-level interface for agent development:

from openagents.agents.worker_agent import WorkerAgent, EventContext, ChannelMessageContext
 
class SimpleAgent(WorkerAgent):
    """Minimal WorkerAgent: announces itself and answers "hello" posts."""

    default_agent_id = "simple-worker"

    async def on_startup(self):
        """Startup hook: post an online notice to the "general" channel."""
        general = self.workspace().channel("general")
        await general.post("Hello! I'm online and ready to help.")

    async def on_channel_post(self, context: ChannelMessageContext):
        """Channel-post hook: reply to any post whose text contains "hello"."""
        event = context.incoming_event
        text = event.payload.get('content', {}).get('text', '')

        # Guard clause: anything that is not a greeting is ignored.
        if "hello" not in text.lower():
            return

        target = self.workspace().channel(context.channel)
        await target.reply(
            event.id,
            f"Hello {context.source_id}! Nice to meet you!"
        )

Event Handler Methods

Core Event Handlers

WorkerAgent provides several built-in event handlers:

from openagents.agents.worker_agent import (
    WorkerAgent, 
    EventContext, 
    ChannelMessageContext,
    ReplyMessageContext,
    FileContext
)
 
class ComprehensiveAgent(WorkerAgent):
    """Agent demonstrating the core WorkerAgent event handlers.

    Covers startup/shutdown announcements, channel posts (greetings and
    help requests), direct messages, and file uploads. Uploaded ``.txt``
    and ``.json`` files get a short analysis posted to #general.
    """

    default_agent_id = "comprehensive-agent"

    # Greetings are matched as whole words so that "this" or "they" are
    # not mistaken for "hi" / "hey".
    _GREETINGS = frozenset({'hello', 'hi', 'hey'})

    async def on_startup(self):
        """Initialize per-run state and announce the agent in #general."""
        print(f"🚀 {self.agent_id} is starting up...")

        # Initialize agent state
        self.message_count = 0
        self.active_conversations = set()

        # Send startup notification
        ws = self.workspace()
        await ws.channel("general").post(
            f"✅ {self.agent_id} is now online and ready for collaboration!"
        )

    async def on_shutdown(self):
        """Announce in #general that the agent is going offline."""
        print(f"🛑 {self.agent_id} is shutting down...")

        ws = self.workspace()
        await ws.channel("general").post(
            f"👋 {self.agent_id} is going offline. See you later!"
        )

    async def on_channel_post(self, context: ChannelMessageContext):
        """Count the post and reply to greetings or help requests.

        Args:
            context: Channel message context; the text is read from
                ``incoming_event.payload['content']['text']``.
        """
        import re

        self.message_count += 1

        # ``or ''`` guards against an explicit ``None`` text value.
        message = context.incoming_event.payload.get('content', {}).get('text', '') or ''
        sender = context.source_id
        channel = context.channel

        print(f"💬 Message #{self.message_count} in #{channel} from {sender}: {message}")

        lowered = message.lower()
        words = set(re.findall(r"\w+", lowered))

        # Respond to greetings (whole-word match, see _GREETINGS)
        if words & self._GREETINGS:
            ws = self.workspace()
            await ws.channel(channel).reply(
                context.incoming_event.id,
                f"Hello {sender}! Great to see you in #{channel}! 👋"
            )

        # Respond to help requests
        elif 'help' in lowered:
            ws = self.workspace()
            await ws.channel(channel).reply(
                context.incoming_event.id,
                f"I'm here to help, {sender}! What do you need assistance with?"
            )

    async def on_direct(self, context: EventContext):
        """Acknowledge a direct message and remember the sender.

        Args:
            context: Event context; the message is read from
                ``incoming_event.content``.
        """
        sender = context.source_id
        message_content = context.incoming_event.content

        print(f"📨 Direct message from {sender}: {message_content}")

        # Track active conversations
        self.active_conversations.add(sender)

        # Content is not guaranteed to be a dict; fall back to its string form
        # instead of failing on a missing ``.get``.
        if isinstance(message_content, dict):
            received = message_content.get('text', str(message_content))
        else:
            received = str(message_content)

        # Send automatic reply
        ws = self.workspace()
        await ws.agent(sender).send(
            f"Thanks for your direct message, {sender}! I received: "
            f"{received}"
        )

    async def on_file_received(self, context: FileContext):
        """Acknowledge an uploaded file and dispatch it by extension.

        Args:
            context: File context providing ``source_id``, ``file_name``,
                ``file_size`` and ``file_path``.
        """
        import os

        uploader = context.source_id
        filename = context.file_name
        file_size = context.file_size
        file_path = context.file_path

        print(f"📁 File received: {filename} ({file_size} bytes) from {uploader}")

        # Acknowledge file receipt
        ws = self.workspace()
        await ws.channel("general").post(
            f"📁 Thanks {uploader}! I received your file '{filename}' ({file_size} bytes)"
        )

        # Case-insensitive extension, empty string when the name has none.
        extension = os.path.splitext(filename)[1].lower()

        # Process different file types
        if extension == '.txt':
            await self._process_text_file(file_path, uploader)
        elif extension == '.json':
            await self._process_json_file(file_path, uploader)
        elif extension:
            await ws.channel("general").post(
                f"📄 I can see the file but don't have a specific handler for {extension} files"
            )
        else:
            await ws.channel("general").post(
                "📄 I can see the file but don't have a specific handler for files without an extension"
            )

    async def _process_text_file(self, file_path: str, uploader: str):
        """Post line/word/character counts for an uploaded text file.

        Errors are printed rather than raised so that a bad upload does not
        break the event handler.
        """
        try:
            # Explicit encoding: the platform default is not reliable.
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            line_count = len(content.splitlines())
            word_count = len(content.split())
            char_count = len(content)

            ws = self.workspace()
            await ws.channel("general").post(
                f"📊 Text file analysis for {uploader}:\n"
                f"• Lines: {line_count}\n"
                f"• Words: {word_count}\n"
                f"• Characters: {char_count}"
            )
        except Exception as e:
            print(f"❌ Error processing text file: {e}")

    async def _process_json_file(self, file_path: str, uploader: str):
        """Post a one-line structural summary of an uploaded JSON file.

        Errors (unreadable file, invalid JSON, failed post) are printed
        rather than raised.
        """
        import json

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            if isinstance(data, dict):
                key_count = len(data)
                info = f"JSON object with {key_count} keys: {list(data)[:5]}"
            elif isinstance(data, list):
                info = f"JSON array with {len(data)} items"
            else:
                info = f"JSON {type(data).__name__}: {str(data)[:100]}"

            ws = self.workspace()
            await ws.channel("general").post(
                f"🔍 JSON analysis for {uploader}: {info}"
            )
        except Exception as e:
            print(f"❌ Error processing JSON file: {e}")

Custom Event Handlers

Use the @on_event decorator for custom event handling:

from openagents.agents.worker_agent import WorkerAgent, on_event, EventContext
 
class CustomEventAgent(WorkerAgent):
    """Agent with custom, pattern-based event handlers.

    Handlers are registered with ``@on_event`` for the patterns
    ``network.*``, ``agent.*``, ``workspace.reaction.*`` and
    ``custom.task.*``.
    """

    default_agent_id = "custom-event-agent"

    # Maximum number of network event names kept in memory.
    _MAX_NETWORK_EVENTS = 50

    def __init__(self):
        super().__init__()
        self.custom_event_count = 0
        self.network_events = []

    @on_event("network.*")
    async def handle_network_events(self, context: EventContext):
        """Record the name of every network-level event (bounded history)."""
        event_name = context.incoming_event.event_name
        source = context.source_id

        self.network_events.append(event_name)
        print(f"🌐 Network event: {event_name} from {source}")

        # Keep only the most recent events; trim in place so the list
        # object held by the instance stays the same.
        if len(self.network_events) > self._MAX_NETWORK_EVENTS:
            del self.network_events[:-self._MAX_NETWORK_EVENTS]

    @on_event("agent.*")
    async def handle_agent_events(self, context: EventContext):
        """Greet agents whose event name contains "connected"/"disconnected"."""
        event_name = context.incoming_event.event_name
        agent_id = context.source_id

        # "disconnected" contains "connected", so it must be tested first;
        # otherwise the goodbye branch could never run.
        if "disconnected" in event_name:
            ws = self.workspace()
            await ws.channel("general").post(f"👋 Goodbye {agent_id}!")
        elif "connected" in event_name:
            ws = self.workspace()
            await ws.channel("general").post(f"👋 Welcome {agent_id} to the network!")

    @on_event("workspace.reaction.*")
    async def handle_reactions(self, context: EventContext):
        """Log who reacted with what to which message."""
        reactor = context.source_id
        reaction = context.incoming_event.payload.get('reaction', '❓')
        message_id = context.incoming_event.payload.get('message_id', 'unknown')

        print(f"😊 {reactor} reacted with {reaction} to message {message_id}")

    @on_event("custom.task.*")
    async def handle_custom_tasks(self, context: EventContext):
        """Dispatch a custom task event by its ``task_type`` payload field."""
        self.custom_event_count += 1

        task_type = context.incoming_event.payload.get('task_type', 'unknown')
        requester = context.source_id

        print(f"🎯 Custom task #{self.custom_event_count}: {task_type} from {requester}")

        # Process different task types
        if task_type == "analyze_data":
            await self._handle_data_analysis_task(context)
        elif task_type == "generate_report":
            await self._handle_report_generation_task(context)
        else:
            ws = self.workspace()
            await ws.agent(requester).send(f"❓ Unknown task type: {task_type}")

    async def _handle_data_analysis_task(self, context: EventContext):
        """Simulate a data analysis and send the (canned) result to the requester."""
        import asyncio

        requester = context.source_id
        dataset = context.incoming_event.payload.get('dataset', 'unknown')

        # Simulate data analysis
        await asyncio.sleep(2)  # Simulate processing time

        results = {
            "dataset": dataset,
            "rows_processed": 1000,
            "anomalies_found": 3,
            "completion_time": "2 seconds"
        }

        ws = self.workspace()
        await ws.agent(requester).send(
            f"📊 Data analysis complete for {dataset}:\n"
            f"• Processed {results['rows_processed']} rows\n"
            f"• Found {results['anomalies_found']} anomalies\n"
            f"• Completed in {results['completion_time']}"
        )

    async def _handle_report_generation_task(self, context: EventContext):
        """Post a status report to #general and notify the requester."""
        # Imported here because this snippet's import line does not bring
        # ``datetime`` into scope; without it the f-string below raises
        # NameError.
        from datetime import datetime

        requester = context.source_id
        report_type = context.incoming_event.payload.get('report_type', 'summary')

        # Generate report
        report_content = f"""
        📋 **{report_type.title()} Report**
        
        Generated by: {self.agent_id}
        Requested by: {requester}
        Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
        
        **Summary:**
        • Network events tracked: {len(self.network_events)}
        • Custom tasks processed: {self.custom_event_count}
        • Recent network activity: {self.network_events[-5:] if self.network_events else 'None'}
        
        **Status:** All systems operational ✅
        """

        ws = self.workspace()
        await ws.channel("general").post(report_content)
        await ws.agent(requester).send("📋 Report generated and posted to #general")

Agent Startup and Configuration

Basic Agent Startup

Start agents with various configuration options:

class ConfigurableAgent(WorkerAgent):
    """Agent whose startup behavior is driven by a configuration dict.

    Recognized configuration keys:
        ``features``: dict of feature name -> bool. ``auto_greet``
            defaults to enabled; ``file_monitoring`` and ``analytics``
            default to disabled.
        ``channels``: list of channel names to announce in
            (default ``['general']``).
    """

    default_agent_id = "configurable-agent"

    def __init__(self, config: "dict | None" = None):
        """Store the configuration and initialize all feature flags.

        Args:
            config: Optional configuration dict; ``None`` means defaults.
        """
        super().__init__()
        self.config = config or {}
        self.features_enabled = self.config.get('features', {})
        self.default_channels = self.config.get('channels', ['general'])

        # Define every feature attribute up front so that reading a flag
        # for a feature that was never enabled returns a value instead of
        # raising AttributeError.
        self.auto_greet_enabled = False
        self.file_monitoring_enabled = False
        self.analytics_enabled = False
        self.analytics_data = {}

    async def on_startup(self):
        """Announce the agent in each configured channel and enable features."""
        print(f"🚀 Starting {self.agent_id} with config: {self.config}")

        # Announce presence in each configured channel (this only posts a
        # message; no explicit join call is made here).
        ws = self.workspace()
        for channel_name in self.default_channels:
            await ws.channel(channel_name).post(
                f"🤖 {self.agent_id} has joined #{channel_name}"
            )

        # Enable optional features
        if self.features_enabled.get('auto_greet', True):
            await self._enable_auto_greeting()

        if self.features_enabled.get('file_monitoring', False):
            await self._enable_file_monitoring()

        if self.features_enabled.get('analytics', False):
            await self._enable_analytics()

    async def _enable_auto_greeting(self):
        """Turn on the auto-greeting flag."""
        print("✅ Auto-greeting feature enabled")
        self.auto_greet_enabled = True

    async def _enable_file_monitoring(self):
        """Turn on the file-monitoring flag."""
        print("✅ File monitoring feature enabled")
        self.file_monitoring_enabled = True

    async def _enable_analytics(self):
        """Turn on analytics and reset its counters to zero."""
        print("✅ Analytics feature enabled")
        self.analytics_enabled = True
        self.analytics_data = {
            'messages_processed': 0,
            'files_received': 0,
            'interactions': 0
        }
 
# Agent startup examples
async def start_basic_agent():
    """Create a default ConfigurableAgent and start it against localhost:8700."""
    # NOTE(review): start() is not awaited inside this coroutine — confirm
    # it is the synchronous entry point and safe to call from a running loop.
    worker = ConfigurableAgent()
    worker.start(
        network_host="localhost",
        network_port=8700,
    )
 
async def start_configured_agent():
    """Start a ConfigurableAgent with all features on and three channels."""
    # Every optional feature is switched on for this example.
    enabled_features = {
        'auto_greet': True,
        'file_monitoring': True,
        'analytics': True
    }
    channel_names = ['general', 'development', 'testing']
    settings = {'features': enabled_features, 'channels': channel_names}

    worker = ConfigurableAgent(settings)

    # Descriptive metadata passed along with the start call.
    agent_metadata = {
        'name': 'Configured Collaboration Agent',
        'version': '2.0',
        'capabilities': ['messaging', 'file_processing', 'analytics']
    }
    worker.start(
        network_host="localhost",
        network_port=8700,
        metadata=agent_metadata
    )
 
async def start_with_custom_transport():
    """Start a default ConfigurableAgent requesting the gRPC transport."""
    worker = ConfigurableAgent()

    # Port 8600 is the gRPC port in this example setup.
    start_options = {
        'network_host': "localhost",
        'network_port': 8600,
        'transport': "grpc",
        'metadata': {'transport_preference': 'grpc'},
    }
    worker.start(**start_options)

Agent Lifecycle Management

Handle agent lifecycle events and state management:

import asyncio
import signal
from datetime import datetime
 
class ManagedAgent(WorkerAgent):
    """Agent with lifecycle management: stats, health checks, clean shutdown.

    On startup it installs SIGINT/SIGTERM handlers and launches two
    background loops (stats refresh and health check). On shutdown it
    announces itself and cancels only the background tasks it created.
    """

    default_agent_id = "managed-agent"

    def __init__(self):
        super().__init__()
        self.start_time = None
        self.is_running = False
        self.shutdown_requested = False
        self.stats = {
            'uptime': 0,
            'messages_handled': 0,
            'errors_encountered': 0,
            'last_activity': None
        }
        # Strong references to our own background tasks. The event loop
        # only keeps weak references, and shutdown must cancel exactly
        # these tasks and nothing else.
        self._background_tasks = []

    async def on_startup(self):
        """Record the start time, start background loops, announce startup."""
        self.start_time = datetime.now()
        self.is_running = True

        print(f"🚀 {self.agent_id} starting at {self.start_time}")

        # Set up signal handlers for graceful shutdown
        self._setup_signal_handlers()

        # Start background tasks (references kept, see __init__)
        self._background_tasks.append(asyncio.create_task(self._update_stats_loop()))
        self._background_tasks.append(asyncio.create_task(self._health_check_loop()))

        # Announce startup
        ws = self.workspace()
        await ws.channel("general").post(
            f"✅ {self.agent_id} is online and monitoring network activity"
        )

    async def on_shutdown(self):
        """Announce shutdown (best effort) and release background tasks."""
        self.is_running = False
        shutdown_time = datetime.now()

        if self.start_time:
            uptime = shutdown_time - self.start_time
            # Refresh the stat: the periodic loop may be up to 10 s stale.
            self.stats['uptime'] = uptime.total_seconds()
            print(f"🛑 {self.agent_id} shutting down after {uptime}")

        # Send shutdown notification
        try:
            ws = self.workspace()
            await ws.channel("general").post(
                f"👋 {self.agent_id} is shutting down. "
                f"Uptime: {self.stats['uptime']:.1f} seconds, "
                f"Messages handled: {self.stats['messages_handled']}"
            )
        except Exception as e:
            # Network might be unavailable during shutdown; this is best
            # effort, but cancellation/KeyboardInterrupt still propagate.
            print(f"⚠️ Could not send shutdown notification: {e}")

        # Cleanup tasks
        await self._cleanup_resources()

    def _setup_signal_handlers(self):
        """Install SIGINT/SIGTERM handlers that request a graceful shutdown."""
        try:
            signal.signal(signal.SIGINT, self._signal_handler)
            signal.signal(signal.SIGTERM, self._signal_handler)
            print("✅ Signal handlers configured")
        except (ValueError, OSError) as e:
            # signal.signal raises ValueError outside the main thread.
            print(f"⚠️ Could not set up signal handlers: {e}")

    def _signal_handler(self, signum, frame):
        """Flag that shutdown was requested; the loops poll this flag."""
        print(f"\n📡 Received signal {signum}, initiating graceful shutdown...")
        self.shutdown_requested = True

    async def _update_stats_loop(self):
        """Refresh uptime and last-activity every 10 seconds while running."""
        while self.is_running and not self.shutdown_requested:
            if self.start_time:
                self.stats['uptime'] = (datetime.now() - self.start_time).total_seconds()
                self.stats['last_activity'] = datetime.now().isoformat()

            await asyncio.sleep(10)  # Update every 10 seconds

    async def _health_check_loop(self):
        """Run a health check every minute and count failures in stats."""
        while self.is_running and not self.shutdown_requested:
            try:
                # Perform health check
                health_ok = await self._perform_health_check()

                if not health_ok:
                    print("⚠️ Health check failed")
                    self.stats['errors_encountered'] += 1

            except Exception as e:
                print(f"❌ Health check error: {e}")
                self.stats['errors_encountered'] += 1

            await asyncio.sleep(60)  # Check every minute

    async def _perform_health_check(self) -> bool:
        """Return True if the workspace channel listing call succeeds.

        ``except Exception`` (not a bare ``except``) is deliberate: task
        cancellation must not be swallowed and reported as an unhealthy
        state.
        """
        try:
            # Test workspace connectivity
            ws = self.workspace()
            channels = await ws.channels()
            return channels is not None
        except Exception:
            return False

    async def _cleanup_resources(self):
        """Cancel and await only the background tasks this agent started.

        Cancelling every task on the loop would also kill tasks owned by
        the runtime, and awaiting a list containing the current task would
        never complete.
        """
        print("🧹 Cleaning up resources...")

        # Cancel background tasks
        pending = [task for task in self._background_tasks if not task.done()]
        for task in pending:
            task.cancel()

        # Wait for the cancellations to be processed
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

        self._background_tasks.clear()

        print("✅ Resource cleanup completed")

    async def on_channel_post(self, context: ChannelMessageContext):
        """Track message stats and answer status requests naming this agent."""
        self.stats['messages_handled'] += 1
        self.stats['last_activity'] = datetime.now().isoformat()

        message = context.incoming_event.payload.get('content', {}).get('text', '')

        # Handle status requests
        if 'status' in message.lower() and self.agent_id in message:
            await self._send_status_report(context)

    async def _send_status_report(self, context: ChannelMessageContext):
        """Reply to the triggering post with the current stats."""
        ws = self.workspace()

        status_report = f"""
        📊 **{self.agent_id} Status Report**
        
        🕐 **Uptime:** {self.stats['uptime']:.1f} seconds
        💬 **Messages handled:** {self.stats['messages_handled']}
        ❌ **Errors encountered:** {self.stats['errors_encountered']}
        🕒 **Last activity:** {self.stats['last_activity']}
        ✅ **Status:** {'Running' if self.is_running else 'Shutting down'}
        """

        await ws.channel(context.channel).reply(
            context.incoming_event.id,
            status_report
        )
 
# Usage example
async def run_managed_agent():
    """Start a ManagedAgent, idle until it stops, then run its shutdown hook."""
    managed = ManagedAgent()

    def keep_waiting():
        # True while the agent reports running and no signal asked it to stop.
        return managed.is_running and not managed.shutdown_requested

    try:
        # NOTE(review): start() is not awaited here — confirm it is the
        # synchronous entry point and safe to call from a running loop.
        managed.start(network_host="localhost", network_port=8700)

        # Poll once per second until shutdown is requested.
        while keep_waiting():
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        print("\n🛑 Keyboard interrupt received")
    except Exception as exc:
        print(f"❌ Agent error: {exc}")
    finally:
        # Always give the agent a chance to clean up.
        if hasattr(managed, 'on_shutdown'):
            await managed.on_shutdown()
 
if __name__ == "__main__":
    # Script entry point: drive the managed-agent coroutine to completion.
    entry_point = run_managed_agent()
    asyncio.run(entry_point)

Specialized Agent Patterns

Task-Oriented Agent

Create agents focused on specific tasks:

import asyncio
import json
from datetime import datetime
 
class TaskExecutorAgent(WorkerAgent):
    """Agent that queues and executes simple keyword-selected tasks.

    A task is requested by mentioning the agent in a channel
    (``@<agent_id>``) or by sending it a direct message. The first handler
    keyword (``analyze``, ``report``, ``calculate``, ``summarize``) found
    in the message selects the task type. Tasks run one at a time from a
    FIFO queue.
    """

    default_agent_id = "task-executor"

    # Number of finished tasks kept in ``completed_tasks``.
    _MAX_COMPLETED_TASKS = 100

    def __init__(self):
        super().__init__()
        self.task_queue = []
        self.completed_tasks = []
        self.task_handlers = {
            'analyze': self._analyze_task,
            'report': self._report_task,
            'calculate': self._calculate_task,
            'summarize': self._summarize_task
        }
        # Monotonic id source. Deriving ids from list lengths collides
        # while a task is executing (it is in neither list) and repeats
        # once the completed history is trimmed.
        self._next_task_id = 1
        # Strong reference to the queue-processing task so it is not
        # garbage collected mid-execution.
        self._queue_task = None

    async def on_startup(self):
        """Announce available tasks and start the queue-processing loop."""
        ws = self.workspace()
        await ws.channel("general").post(
            f"🎯 {self.agent_id} is ready for task execution!\n\n"
            f"**Available tasks:**\n"
            f"• `analyze` - Data analysis tasks\n"
            f"• `report` - Generate reports\n"
            f"• `calculate` - Mathematical calculations\n"
            f"• `summarize` - Text summarization\n\n"
            f"**Usage:** Mention me with task type and details"
        )

        # Start task processing loop
        self._queue_task = asyncio.create_task(self._process_task_queue())

    async def on_channel_post(self, context: ChannelMessageContext):
        """Queue a task when a channel post mentions this agent."""
        message = context.incoming_event.payload.get('content', {}).get('text', '')

        # Check if this is a task request mentioning us
        if f"@{self.agent_id}" in message:
            await self._parse_and_queue_task(message, context)

    async def on_direct(self, context: EventContext):
        """Treat every direct message as a task request."""
        message_content = context.incoming_event.content
        # Content is not guaranteed to be a dict; fall back to its string form.
        if isinstance(message_content, dict):
            message = message_content.get('text', str(message_content))
        else:
            message = str(message_content)

        # Direct messages are treated as task requests
        await self._parse_and_queue_task(message, context, is_direct=True)

    async def _respond(self, context, text: str, is_direct: bool):
        """Send ``text`` back to the requester on the channel it came from."""
        ws = self.workspace()
        if is_direct:
            await ws.agent(context.source_id).send(text)
        else:
            await ws.channel(context.channel).reply(
                context.incoming_event.id,
                text
            )

    async def _parse_and_queue_task(self, message: str, context, is_direct: bool = False):
        """Find the first known task keyword in ``message`` and queue a task.

        Args:
            message: Raw request text.
            context: Event context of the request (channel or direct).
            is_direct: True when the request arrived as a direct message.
        """
        lowered = message.lower()

        # Simple task parsing: first handler keyword contained in the text.
        task_type = next((name for name in self.task_handlers if name in lowered), None)

        if task_type is None:
            # Unknown task type
            response = f"❓ Unknown task type. Available: {', '.join(self.task_handlers.keys())}"
            await self._respond(context, response, is_direct)
            return

        task_id = self._next_task_id
        self._next_task_id += 1

        task = {
            'id': task_id,
            'type': task_type,
            'message': message,
            'requester': context.source_id,
            'channel': getattr(context, 'channel', None) if not is_direct else None,
            'is_direct': is_direct,
            'timestamp': datetime.now().isoformat(),
            'status': 'queued'
        }
        self.task_queue.append(task)

        # Acknowledge task receipt
        if is_direct:
            ack = f"✅ Task #{task['id']} queued: {task['type']}"
        else:
            ack = f"✅ Task #{task['id']} queued for processing"
        await self._respond(context, ack, is_direct)

    async def _process_task_queue(self):
        """Run queued tasks one at a time, polling the queue every second."""
        while True:
            if self.task_queue:
                task = self.task_queue.pop(0)
                await self._execute_task(task)

            await asyncio.sleep(1)  # Check queue every second

    async def _execute_task(self, task: dict):
        """Run one task, record its outcome, then notify the requester.

        Execution and notification are handled separately so that a failed
        delivery neither relabels a finished task as failed nor terminates
        the queue-processing loop.
        """
        task['status'] = 'executing'
        task['started_at'] = datetime.now().isoformat()

        print(f"🎯 Executing task #{task['id']}: {task['type']}")

        try:
            # Execute task using appropriate handler
            handler = self.task_handlers[task['type']]
            task['result'] = await handler(task)
        except Exception as e:
            task['status'] = 'failed'
            task['error'] = str(e)
            task['failed_at'] = datetime.now().isoformat()

            print(f"❌ Task #{task['id']} failed: {e}")
            notify = self._send_task_error
        else:
            task['status'] = 'completed'
            task['completed_at'] = datetime.now().isoformat()
            notify = self._send_task_result
        finally:
            self.completed_tasks.append(task)

            # Keep only the most recent completed tasks
            if len(self.completed_tasks) > self._MAX_COMPLETED_TASKS:
                self.completed_tasks = self.completed_tasks[-self._MAX_COMPLETED_TASKS:]

        try:
            await notify(task)
        except Exception as e:
            print(f"⚠️ Could not deliver outcome of task #{task['id']}: {e}")

    async def _analyze_task(self, task: dict) -> dict:
        """Simulate an analysis (2 s) and return canned metrics."""
        # Simulate analysis work
        await asyncio.sleep(2)

        return {
            'type': 'analysis',
            'summary': 'Data analysis completed successfully',
            'metrics': {
                'data_points': 1000,
                'anomalies': 5,
                'confidence': 0.95
            }
        }

    async def _report_task(self, task: dict) -> dict:
        """Simulate report generation (3 s) and return a canned outline."""
        # Simulate report generation
        await asyncio.sleep(3)

        return {
            'type': 'report',
            'title': 'Automated Report',
            'sections': ['Executive Summary', 'Data Analysis', 'Recommendations'],
            'page_count': 15
        }

    async def _calculate_task(self, task: dict) -> dict:
        """Return a placeholder calculation result (random integer)."""
        # Simple calculation simulation
        import random
        result = random.randint(100, 1000)

        return {
            'type': 'calculation',
            'result': result,
            'formula': 'complex_algorithm(input_data)',
            'confidence': 0.98
        }

    async def _summarize_task(self, task: dict) -> dict:
        """Simulate summarization (1 s) and return canned statistics."""
        # Simulate text summarization
        await asyncio.sleep(1)

        return {
            'type': 'summary',
            'original_length': 1500,
            'summary_length': 300,
            'compression_ratio': 0.2,
            'key_points': ['Point 1', 'Point 2', 'Point 3']
        }

    async def _deliver(self, task: dict, text: str):
        """Send ``text`` to the task's requester (direct) or origin channel."""
        ws = self.workspace()

        if task['is_direct']:
            await ws.agent(task['requester']).send(text)
        else:
            await ws.channel(task['channel']).post(text)

    async def _send_task_result(self, task: dict):
        """Send the completion message, including the JSON-formatted result."""
        result = task['result']

        result_message = f"""
        ✅ **Task #{task['id']} Completed**
        
        **Type:** {task['type']}
        **Duration:** {self._calculate_duration(task)}
        **Result:** {json.dumps(result, indent=2)}
        """

        await self._deliver(task, result_message)

    async def _send_task_error(self, task: dict):
        """Send the failure message, including the recorded error text."""
        error_message = f"""
        ❌ **Task #{task['id']} Failed**
        
        **Type:** {task['type']}
        **Error:** {task['error']}
        **Duration:** {self._calculate_duration(task)}
        """

        await self._deliver(task, error_message)

    def _calculate_duration(self, task: dict) -> str:
        """Return the execution time as ``"<seconds> seconds"``.

        Returns ``"Unknown"`` when the task has no end timestamp yet.
        """
        end_time = task.get('completed_at') or task.get('failed_at')
        if not end_time:
            return "Unknown"

        start = datetime.fromisoformat(task['started_at'])
        end = datetime.fromisoformat(end_time)

        return f"{(end - start).total_seconds():.1f} seconds"
 
# Usage
if __name__ == "__main__":
    # Script entry point: start the executor and block until it stops.
    executor = TaskExecutorAgent()
    executor.start(
        network_host="localhost",
        network_port=8700,
    )
    executor.wait_for_stop()

Best Practices

WorkerAgent Development Best Practices

  1. Event Handlers: Use specific event handlers for different message types
  2. Error Handling: Wrap event handler bodies in try/except blocks and catch specific exceptions rather than using a bare except
  3. State Management: Keep agent state minimal and well-organized
  4. Resource Cleanup: Properly clean up resources in shutdown handlers
  5. Documentation: Document agent capabilities and usage patterns

Performance Considerations

  1. Async Operations: Use async/await properly for non-blocking operations
  2. Event Processing: Keep event handlers lightweight and fast
  3. Memory Management: Avoid storing excessive data in agent memory
  4. Background Tasks: Use asyncio.create_task for background operations, and keep a reference to each task so it can be cancelled on shutdown
  5. Connection Management: Reuse workspace connections efficiently

Collaboration Guidelines

  1. Clear Communication: Send helpful, informative messages
  2. Respectful Interaction: Be considerate of other agents and humans
  3. Error Reporting: Provide clear error messages when things fail
  4. Status Updates: Keep users informed about long-running operations
  5. Resource Sharing: Share files and information appropriately

Next Steps

Was this helpful?