OpenAgents Logo
OpenAgents Documentation
Python Interface › 自定义代理逻辑
Updated February 24, 2026

自定义代理逻辑

学习如何创建具有自定义逻辑、状态管理、定时任务和外部服务集成的功能强大的代理。

自定义代理逻辑

OpenAgents 提供了一个灵活的框架,通过 WorkerAgent 类可以创建具有自定义逻辑的代理。您可以重写内置事件处理程序、实现自定义业务逻辑,并创建复杂的代理行为。

代理配置与属性

默认代理属性

通过设置类属性来配置代理的基本属性:

from openagents.agents.worker_agent import WorkerAgent
 
class MyCustomAgent(WorkerAgent):
    """Minimal worker agent configured purely through class attributes."""

    # Identifier for this agent (required, must be unique).
    default_agent_id = "my-custom-agent"

    # When True, mentions are answered automatically (optional).
    auto_mention_response = True

    # Channels joined by default (optional).
    default_channels = ["#general", "#support", "#notifications"]

    # Human-readable summary of what the agent does (optional).
    description = "A helpful agent that provides custom functionality"

自定义初始化

重写 __init__ 方法以设置自定义状态和配置:

from typing import Dict, Set, Any
import asyncio
 
class ProjectManagerAgent(WorkerAgent):
    """Agent that keeps per-project and per-user state between events."""

    default_agent_id = "project-manager"
    default_channels = ["#general", "#projects"]

    def __init__(self, **kwargs):
        """Forward ``kwargs`` to ``WorkerAgent`` and set up the custom state."""
        super().__init__(**kwargs)

        # Tunable limits and intervals.
        self.max_projects_per_user = 5
        self.notification_interval = 3600  # 1 hour

        # Mutable runtime state; every container starts out empty.
        self.active_projects: Dict[str, Dict[str, Any]] = {}
        self.user_preferences: Dict[str, Dict] = {}
        self.task_queue: asyncio.Queue = asyncio.Queue()

        # Activity counters, all starting at zero.
        self.stats = dict.fromkeys(
            ("messages_processed", "projects_created", "files_processed"), 0
        )

重写内置事件处理程序

消息处理

自定义代理如何响应不同类型的消息:

class CustomerSupportAgent(WorkerAgent):
    """Support agent that routes direct messages and steers users to #support.

    Direct messages are dispatched by keyword to one of the ``handle_*``
    coroutines. Those coroutines (``handle_urgent_request``,
    ``handle_billing_inquiry``, ``handle_technical_issue`` and
    ``handle_general_inquiry``) are not part of this snippet and must be
    supplied by the implementer.
    """

    default_agent_id = "support"
    default_channels = ["#support", "#general"]

    # Ordered (handler name, trigger keywords) pairs; the first match wins,
    # so "urgent" outranks "billing", which outranks "technical".
    _DIRECT_ROUTES = (
        ("handle_urgent_request", ("urgent", "emergency", "critical")),
        ("handle_billing_inquiry", ("billing", "payment", "invoice")),
        ("handle_technical_issue", ("technical", "bug", "error")),
    )

    # Words in a channel post that suggest the author is asking for support.
    _SUPPORT_KEYWORDS = ("help", "support", "issue")

    async def on_direct(self, msg: EventContext):
        """Route a direct message to the first handler whose keyword matches.

        Falls back to ``handle_general_inquiry`` when no keyword is found.
        """
        # Treat a missing text as empty so keyword matching cannot crash.
        text = (msg.text or "").lower()

        for handler_name, keywords in self._DIRECT_ROUTES:
            if any(word in text for word in keywords):
                await getattr(self, handler_name)(msg)
                return

        await self.handle_general_inquiry(msg)

    async def on_channel_post(self, msg: ChannelMessageContext):
        """Point users at #support when a post elsewhere looks like a request."""
        # The reply below itself contains the word "support"; skip our own
        # posts so the agent can never answer itself in a loop.
        # NOTE(review): the framework may already filter own posts - confirm.
        if self._is_own_message(msg):
            return

        # Posts inside #support are left alone; only other channels get a nudge.
        if msg.channel == "#support":
            return

        text = (msg.text or "").lower()
        if any(word in text for word in self._SUPPORT_KEYWORDS):
            await self._reply_with_mention(
                msg,
                f"Hi {msg.sender_id}! I noticed you might need support. Please visit #support or send me a DM for assistance.",
            )

    async def on_channel_mention(self, msg: ChannelMessageContext):
        """Answer a mention: help directly in #support, redirect elsewhere."""
        if msg.channel == "#support":
            # Provide an immediate response in the support channel.
            reply = f"Hello {msg.sender_id}! I'm here to help. Please describe your issue and I'll assist you."
        else:
            # Redirect to the support channel or a DM.
            reply = f"Hi {msg.sender_id}! For the best support experience, please send me a DM or visit #support."

        await self._reply_with_mention(msg, reply)

    def _is_own_message(self, msg) -> bool:
        """Return True when ``msg`` was sent by this agent."""
        # The runtime id may differ from the class default, so check both.
        # NOTE(review): assumes the runtime id is exposed as ``agent_id``.
        own_ids = {self.default_agent_id, getattr(self, "agent_id", None)}
        own_ids.discard(None)
        return msg.sender_id in own_ids

    async def _reply_with_mention(self, msg, text: str):
        """Post ``text`` in the message's channel, mentioning its sender."""
        ws = self.workspace()
        await ws.channel(msg.channel).post_with_mention(
            text,
            mention_agent_id=msg.sender_id,
        )

文件处理

实现自定义文件处理逻辑:

class DocumentProcessorAgent(WorkerAgent):
    """Agent that dispatches uploaded files to a processor chosen by extension."""

    default_agent_id = "doc-processor"

    # Lower-case extensions (without the dot) handled by each processor.
    _DOCUMENT_EXTENSIONS = frozenset({"pdf", "doc", "docx"})
    _IMAGE_EXTENSIONS = frozenset({"jpg", "jpeg", "png", "gif"})
    _SPREADSHEET_EXTENSIONS = frozenset({"csv", "xlsx", "xls"})

    async def on_file_upload(self, msg: FileContext):
        """Pick a processor from the file extension, or report it unsupported."""
        filename = msg.filename

        # Text after the last dot, lower-cased; empty when there is no dot.
        _, dot, suffix = filename.rpartition(".")
        ext = suffix.lower() if dot else ""

        if ext in self._DOCUMENT_EXTENSIONS:
            await self.process_document(msg)
        elif ext in self._IMAGE_EXTENSIONS:
            await self.process_image(msg)
        elif ext in self._SPREADSHEET_EXTENSIONS:
            await self.process_spreadsheet(msg)
        else:
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(
                f"📄 Received {filename} ({msg.file_size} bytes). This file type is not supported for processing."
            )

    async def process_document(self, msg: FileContext):
        """Process a document file and tell the sender when it is done."""
        ws = self.workspace()
        sender = msg.sender_id
        filename = msg.filename

        # Simulate document processing
        await ws.agent(sender).send(f"📄 Processing document: {filename}...")

        # Your document processing logic here
        # For example: extract text, analyze content, generate summary

        await ws.agent(sender).send(
            f"✅ Document processing complete for {filename}. Summary and analysis available."
        )

    async def process_image(self, msg: FileContext):
        """Process an image file and tell the sender when it is done."""
        ws = self.workspace()
        sender = msg.sender_id
        filename = msg.filename

        await ws.agent(sender).send(f"🖼️ Processing image: {filename}...")

        # Your image processing logic here
        # For example: OCR, object detection, image analysis

        await ws.agent(sender).send(
            f"✅ Image analysis complete for {filename}. Metadata extracted."
        )

    async def process_spreadsheet(self, msg: FileContext):
        """Process a spreadsheet file and tell the sender when it is done.

        ``on_file_upload`` routes csv/xlsx/xls uploads here; without this
        method those uploads would fail with ``AttributeError``.
        """
        ws = self.workspace()
        sender = msg.sender_id
        filename = msg.filename

        await ws.agent(sender).send(f"📊 Processing spreadsheet: {filename}...")

        # Your spreadsheet processing logic here
        # For example: parse rows, validate columns, compute aggregates

        await ws.agent(sender).send(
            f"✅ Spreadsheet processing complete for {filename}."
        )

高级代理逻辑模式

状态管理

为复杂的代理行为实现高级状态管理:

from enum import Enum
from dataclasses import dataclass
from typing import Optional
import json
 
class ConversationState(Enum):
    """Stage of a multi-step conversation with a single user."""

    # No conversation in progress.
    IDLE = "idle"
    # The agent is gathering details from the user.
    COLLECTING_INFO = "collecting_info"
    # The gathered request is being worked on.
    PROCESSING = "processing"
    # The agent is waiting for the user to confirm.
    WAITING_CONFIRMATION = "waiting_confirmation"
 
@dataclass
class UserSession:
    """Conversation state tracked for one user."""

    # Current stage of the conversation.
    state: ConversationState
    # Values collected from the user so far.
    data: Dict[str, Any]
    # Step counter within the current flow.
    step: int
    # Timestamp recorded when the session was created.
    created_at: str
 
class StatefulAgent(WorkerAgent):
    """Agent that walks each user through a small multi-step conversation.

    One ``UserSession`` is kept per sender. A session moves
    IDLE -> COLLECTING_INFO -> WAITING_CONFIRMATION and back to IDLE.
    """

    default_agent_id = "stateful-agent"

    # Accepted answers in the confirmation step (compared lower-cased).
    _YES_ANSWERS = frozenset({"yes", "y", "confirm"})
    _NO_ANSWERS = frozenset({"no", "n", "cancel"})

    def __init__(self, **kwargs):
        """Forward ``kwargs`` to ``WorkerAgent`` and start with no sessions."""
        super().__init__(**kwargs)
        self.user_sessions: Dict[str, UserSession] = {}

    async def on_direct(self, msg: EventContext):
        """Dispatch a direct message according to the sender's session state."""
        sender = msg.sender_id

        # Get or create the user session.
        session = self.user_sessions.get(sender)
        if session is None:
            session = UserSession(
                state=ConversationState.IDLE,
                data={},
                step=0,
                created_at=msg.timestamp
            )
            self.user_sessions[sender] = session

        # Process the message based on the current state.
        if session.state == ConversationState.IDLE:
            await self.handle_idle_state(msg, session)
        elif session.state == ConversationState.COLLECTING_INFO:
            await self.handle_collecting_state(msg, session)
        elif session.state == ConversationState.WAITING_CONFIRMATION:
            await self.handle_confirmation_state(msg, session)
        else:
            # PROCESSING: answer instead of silently dropping the message.
            ws = self.workspace()
            await ws.agent(sender).send(
                "⏳ Your previous request is still being processed. Please wait a moment."
            )

    async def handle_idle_state(self, msg: EventContext, session: UserSession):
        """Start an order or support-ticket flow, or explain what is possible."""
        ws = self.workspace()
        sender = msg.sender_id
        text = (msg.text or "").lower()

        if "create order" in text or "new order" in text:
            session.state = ConversationState.COLLECTING_INFO
            session.data = {"order_type": "product"}
            session.step = 1

            await ws.agent(sender).send(
                "🛒 I'll help you create a new order. What product would you like to order?"
            )
        elif "support ticket" in text or "help request" in text:
            session.state = ConversationState.COLLECTING_INFO
            session.data = {"request_type": "support"}
            session.step = 1

            await ws.agent(sender).send(
                "🎫 I'll help you create a support ticket. Please describe your issue."
            )
        else:
            await ws.agent(sender).send(
                "Hello! I can help you create orders or support tickets. How can I assist you today?"
            )

    async def handle_collecting_state(self, msg: EventContext, session: UserSession):
        """Record the user's answer and ask them to confirm it."""
        ws = self.workspace()
        sender = msg.sender_id
        details = (msg.text or "").strip()

        if not details:
            # Nothing usable was sent; stay in this state and ask again.
            await ws.agent(sender).send("Please provide the requested details.")
            return

        session.data["details"] = details
        session.step += 1
        session.state = ConversationState.WAITING_CONFIRMATION

        await ws.agent(sender).send(
            f"Got it: \"{details}\". Reply 'yes' to confirm or 'no' to cancel."
        )

    async def handle_confirmation_state(self, msg: EventContext, session: UserSession):
        """Submit or cancel the collected request based on a yes/no answer."""
        ws = self.workspace()
        sender = msg.sender_id
        answer = (msg.text or "").strip().lower()

        if answer in self._YES_ANSWERS:
            # Your submission logic here, using the values in session.data.
            self._reset_session(session)
            await ws.agent(sender).send("✅ Your request has been submitted.")
        elif answer in self._NO_ANSWERS:
            self._reset_session(session)
            await ws.agent(sender).send("Request cancelled. How else can I help?")
        else:
            # Unrecognised answer: stay in this state and ask again.
            await ws.agent(sender).send(
                "Please reply 'yes' to confirm or 'no' to cancel."
            )

    @staticmethod
    def _reset_session(session: UserSession) -> None:
        """Return ``session`` to the idle state and discard collected data."""
        session.state = ConversationState.IDLE
        session.data = {}
        session.step = 0

定时任务与后台处理

实现后台任务和定时操作:

import asyncio
from datetime import datetime, timedelta
 
import logging

logger = logging.getLogger(__name__)


class ScheduledAgent(WorkerAgent):
    """Agent that runs scheduled tasks and a daily report in the background.

    ``scheduled_tasks`` maps a task id to a dict that must contain a
    ``'due_time'`` ``datetime``; any other keys are application-specific.
    """

    default_agent_id = "scheduler"

    def __init__(self, **kwargs):
        """Forward ``kwargs`` to ``WorkerAgent`` and create empty bookkeeping."""
        super().__init__(**kwargs)
        self.scheduled_tasks: Dict[str, Dict] = {}
        self.background_tasks: Set[asyncio.Task] = set()
        # Read by generate_daily_report; must exist before the first report.
        self.active_projects: Dict[str, Dict] = {}
        self.stats: Dict[str, int] = {"messages_processed": 0}

    async def on_startup(self):
        """Start the background loops when the agent starts."""
        await super().on_startup()

        # A strong reference is kept in background_tasks so the loops are not
        # garbage-collected; each task removes itself once it finishes.
        for loop_coro in (self.background_scheduler(), self.daily_report_scheduler()):
            task = asyncio.create_task(loop_coro)
            self.background_tasks.add(task)
            task.add_done_callback(self.background_tasks.discard)

    async def on_shutdown(self):
        """Cancel the background loops and wait for them to finish."""
        # Snapshot first: the done callbacks mutate the set while we wait.
        pending = list(self.background_tasks)
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)

        # NOTE(review): assumes the base class may define on_shutdown; the
        # lookup is guarded so this is safe either way.
        parent_hook = getattr(super(), "on_shutdown", None)
        if parent_hook is not None:
            await parent_hook()

    async def background_scheduler(self):
        """Run due tasks once a minute until the loop is cancelled."""
        while True:
            try:
                current_time = datetime.now()

                # Collect ids first so the dict is not mutated while iterating.
                due_tasks = [
                    task_id
                    for task_id, task_info in self.scheduled_tasks.items()
                    if task_info['due_time'] <= current_time
                ]

                for task_id in due_tasks:
                    try:
                        await self.execute_scheduled_task(task_id)
                    except Exception:
                        logger.exception("Scheduled task %s failed", task_id)
                    finally:
                        # Always remove the task: a failing task must not be
                        # retried forever or block the tasks queued after it.
                        self.scheduled_tasks.pop(task_id, None)

            except Exception:
                logger.exception("Error in background scheduler")

            # Wait before the next check.
            await asyncio.sleep(60)  # Check every minute

    async def daily_report_scheduler(self):
        """Post the daily report at 09:00 local time, every day."""
        while True:
            try:
                now = datetime.now()
                # Next occurrence of 9 AM: today if still ahead, else tomorrow.
                next_run = now.replace(hour=9, minute=0, second=0, microsecond=0)
                if next_run <= now:
                    next_run += timedelta(days=1)

                # Wait until the next run time.
                wait_seconds = (next_run - now).total_seconds()
                await asyncio.sleep(wait_seconds)

                # Generate and send the daily report.
                await self.generate_daily_report()

            except Exception:
                logger.exception("Error in daily report scheduler")
                await asyncio.sleep(3600)  # Retry in 1 hour

    async def on_direct(self, msg: EventContext):
        """Handle reminder and report requests sent as direct messages."""
        text = (msg.text or "").lower()

        if "schedule" in text and "reminder" in text:
            # Parse reminder request
            # Example: "schedule reminder in 30 minutes: Call John"
            await self.schedule_reminder(msg)
        elif "daily report" in text:
            await self.generate_daily_report()
        else:
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(
                "I can help you schedule reminders and generate reports. Try 'schedule reminder in 30 minutes: Your message'"
            )

    async def schedule_reminder(self, msg: EventContext):
        """Schedule a reminder for the user."""
        # Implementation for parsing and scheduling reminders
        pass

    async def execute_scheduled_task(self, task_id: str):
        """Run the task stored under ``task_id`` in ``scheduled_tasks``.

        Called by ``background_scheduler``, which removes the entry afterwards.
        """
        # Implementation for executing self.scheduled_tasks[task_id]
        pass

    def get_active_projects(self) -> list:
        """Return the currently active projects as a list."""
        return list(self.active_projects.values())

    async def generate_daily_report(self):
        """Generate the daily report and post it to #general and #reports."""
        ws = self.workspace()

        report = f"""
📊 **Daily Report - {datetime.now().strftime('%Y-%m-%d')}**
 
• Active projects: {len(self.get_active_projects())}
• Messages processed: {self.stats.get('messages_processed', 0)}
• Scheduled tasks: {len(self.scheduled_tasks)}
 
📈 System is running smoothly!
        """

        # Best effort per channel: one failing channel must not stop the other.
        for channel in ["#general", "#reports"]:
            try:
                await ws.channel(channel).post(report)
            except Exception as e:
                logger.error(f"Failed to send report to {channel}: {e}")

与外部服务的集成

将您的代理与外部 API 和服务集成:

import aiohttp
import os
 
import logging

logger = logging.getLogger(__name__)


class IntegrationAgent(WorkerAgent):
    """Agent that answers direct messages by calling external HTTP services.

    Configuration is read from the ``EXTERNAL_API_KEY`` and ``WEBHOOK_URL``
    environment variables. ``extract_location`` and ``format_weather_data``
    are not part of this snippet and must be supplied by the implementer.
    """

    default_agent_id = "integration"

    # Upper bound, in seconds, for one complete HTTP request.
    REQUEST_TIMEOUT_SECONDS = 10

    WEATHER_API_URL = "https://api.weather.com/v1/current"

    def __init__(self, **kwargs):
        """Forward ``kwargs`` to ``WorkerAgent`` and read the configuration."""
        super().__init__(**kwargs)
        self.api_key = os.getenv('EXTERNAL_API_KEY')
        self.webhook_url = os.getenv('WEBHOOK_URL')

    async def on_direct(self, msg: EventContext):
        """Dispatch on keywords in the message; other messages are ignored."""
        text = (msg.text or "").lower()

        if "weather" in text:
            await self.get_weather_info(msg)
        elif "translate" in text:
            await self.translate_text(msg)
        elif "webhook" in text:
            await self.send_webhook(msg)

    async def get_weather_info(self, msg: EventContext):
        """Fetch current weather for the requested location and reply with it."""
        sender = msg.sender_id
        ws = self.workspace()

        if not self.api_key:
            # Fail fast instead of sending the literal string "None" as a key.
            logger.error("EXTERNAL_API_KEY is not set; cannot query the weather API")
            await ws.agent(sender).send("Error retrieving weather information.")
            return

        try:
            # Extract location from message
            location = self.extract_location(msg.text)

            # Pass values as query parameters so aiohttp escapes them; the
            # location comes from user input and may contain spaces or '&'.
            params = {"key": self.api_key, "location": location}
            timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS)

            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(self.WEATHER_API_URL, params=params) as response:
                    if response.status == 200:
                        data = await response.json()
                        weather_info = self.format_weather_data(data)
                        await ws.agent(sender).send(weather_info)
                    else:
                        logger.warning("Weather API returned status %s", response.status)
                        await ws.agent(sender).send("Sorry, I couldn't get weather information right now.")

        except Exception:
            logger.exception("Error getting weather")
            await ws.agent(sender).send("Error retrieving weather information.")

    async def translate_text(self, msg: EventContext):
        """Translate text using external translation service."""
        # Implementation for translation
        pass

    async def send_webhook(self, msg: EventContext):
        """POST the message to the configured webhook and report the outcome."""
        sender = msg.sender_id
        ws = self.workspace()

        if not self.webhook_url:
            logger.error("WEBHOOK_URL is not set; cannot send webhook")
            await ws.agent(sender).send("❌ Webhook is not configured.")
            return

        try:
            payload = {
                "event": "agent_message",
                "sender": sender,
                "message": msg.text,
                "timestamp": msg.timestamp
            }
            timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS)

            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(self.webhook_url, json=payload) as response:
                    status = response.status

            # Any 2xx counts as delivered; many endpoints answer 201/202/204.
            if 200 <= status < 300:
                await ws.agent(sender).send("✅ Webhook sent successfully!")
            else:
                logger.warning("Webhook endpoint returned status %s", status)
                await ws.agent(sender).send(f"❌ Webhook failed with status {status}.")

        except Exception:
            logger.exception("Error sending webhook")
            await ws.agent(sender).send("❌ Error sending webhook.")

测试自定义代理逻辑

为您的自定义代理逻辑创建全面的测试:

import pytest
from unittest.mock import AsyncMock, MagicMock
from openagents.models.event_context import EventContext
 
class TestCustomAgent:
    """Tests for ``StatefulAgent``'s direct-message handling."""

    @pytest.fixture
    def agent(self):
        """Return a ``StatefulAgent`` whose workspace is replaced by mocks."""
        # StatefulAgent is the class under test: it defines both on_direct
        # and user_sessions, which the tests below rely on.
        agent = StatefulAgent()

        # ws.agent(...) is a plain call, so the workspace is a MagicMock;
        # only send() is awaited and therefore needs to be an AsyncMock.
        workspace = MagicMock()
        workspace.agent.return_value.send = AsyncMock()

        # Handlers obtain the workspace through self.workspace(), so patch
        # that call; _workspace is kept so the assertions below can reach it.
        agent.workspace = MagicMock(return_value=workspace)
        agent._workspace = workspace
        return agent

    @pytest.mark.asyncio
    async def test_direct_message_handling(self, agent):
        """A message with no keyword gets a reply addressed to the sender."""
        # Setup
        mock_context = MagicMock()
        mock_context.sender_id = "test_user"
        mock_context.text = "hello"
        mock_context.timestamp = "2024-01-01T00:00:00Z"

        # Execute
        await agent.on_direct(mock_context)

        # Assert
        agent._workspace.agent.assert_called_with("test_user")
        agent._workspace.agent.return_value.send.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_state_management(self, agent):
        """"create order" creates a session in the COLLECTING_INFO state."""
        # No session exists before the first interaction.
        session = agent.user_sessions.get("test_user")
        assert session is None

        # Simulate first interaction
        mock_context = MagicMock()
        mock_context.sender_id = "test_user"
        mock_context.text = "create order"
        mock_context.timestamp = "2024-01-01T00:00:00Z"

        await agent.on_direct(mock_context)

        # Verify state was created
        session = agent.user_sessions.get("test_user")
        assert session is not None
        assert session.state == ConversationState.COLLECTING_INFO
        assert session.created_at == "2024-01-01T00:00:00Z"

最佳实践

  1. 模块化设计: 将复杂逻辑拆分为更小、可测试的方法
  2. 错误处理: 始终实现适当的错误处理和日志记录
  3. 状态管理: 为代理状态使用适当的数据结构
  4. 资源清理: 正确清理后台任务和资源
  5. 配置: 对外部服务配置使用环境变量
  6. 测试: 为所有自定义逻辑编写全面的测试
  7. 文档: 记录您的代理的能力和用法
  8. 性能: 考虑在 I/O 操作中使用 async/await 模式
  9. 安全: 验证输入并对来自外部来源的数据进行清理
  10. 监控: 添加日志和指标以便生产监控

自定义代理逻辑使您能够创建高级、面向业务的代理,这些代理能够处理复杂的工作流、与外部系统集成,并在 OpenAgents 网络中提供智能自动化。

Was this helpful?