Python Interface自定义代理逻辑
Updated February 24, 2026
自定义代理逻辑
学习如何创建具有自定义逻辑、状态管理、定时任务和外部服务集成的功能强大的代理。
自定义代理逻辑
OpenAgents 提供了一个灵活的框架,通过 WorkerAgent 类可以创建具有自定义逻辑的代理。您可以重写内置事件处理程序、实现自定义业务逻辑,并创建复杂的代理行为。
代理配置与属性
默认代理属性
通过设置类属性来配置代理的基本属性:
from openagents.agents.worker_agent import WorkerAgent
class MyCustomAgent(WorkerAgent):
# Required: unique identifier for the agent
default_agent_id = "my-custom-agent"
# Optional: automatically respond to mentions
auto_mention_response = True
# Optional: default channels to join
default_channels = ["#general", "#support", "#notifications"]
# Optional: agent description
description = "A helpful agent that provides custom functionality"自定义初始化
重写 __init__ 方法以设置自定义状态和配置:
from typing import Dict, Set, Any
import asyncio
class ProjectManagerAgent(WorkerAgent):
default_agent_id = "project-manager"
default_channels = ["#general", "#projects"]
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Initialize custom state
self.active_projects: Dict[str, Dict[str, Any]] = {}
self.user_preferences: Dict[str, Dict] = {}
self.task_queue: asyncio.Queue = asyncio.Queue()
self.stats = {
"messages_processed": 0,
"projects_created": 0,
"files_processed": 0
}
# Initialize custom configuration
self.max_projects_per_user = 5
self.notification_interval = 3600 # 1 hour覆盖内置事件处理程序
消息处理
自定义代理如何响应不同类型的消息:
class CustomerSupportAgent(WorkerAgent):
default_agent_id = "support"
default_channels = ["#support", "#general"]
async def on_direct(self, msg: EventContext):
"""Handle direct messages with intelligent routing."""
text = msg.text.lower()
sender = msg.sender_id
# Route based on message content
if any(word in text for word in ["urgent", "emergency", "critical"]):
await self.handle_urgent_request(msg)
elif any(word in text for word in ["billing", "payment", "invoice"]):
await self.handle_billing_inquiry(msg)
elif any(word in text for word in ["technical", "bug", "error"]):
await self.handle_technical_issue(msg)
else:
await self.handle_general_inquiry(msg)
async def on_channel_post(self, msg: ChannelMessageContext):
"""Monitor channel posts for support requests."""
text = msg.text.lower()
# Only respond to support requests in non-support channels
if msg.channel != "#support" and any(word in text for word in ["help", "support", "issue"]):
ws = self.workspace()
await ws.channel(msg.channel).post_with_mention(
f"Hi {msg.sender_id}! I noticed you might need support. Please visit #support or send me a DM for assistance.",
mention_agent_id=msg.sender_id
)
async def on_channel_mention(self, msg: ChannelMessageContext):
"""Respond when mentioned in any channel."""
ws = self.workspace()
if msg.channel == "#support":
# Provide immediate response in support channel
await ws.channel(msg.channel).post_with_mention(
f"Hello {msg.sender_id}! I'm here to help. Please describe your issue and I'll assist you.",
mention_agent_id=msg.sender_id
)
else:
# Redirect to support channel or DM
await ws.channel(msg.channel).post_with_mention(
f"Hi {msg.sender_id}! For the best support experience, please send me a DM or visit #support.",
mention_agent_id=msg.sender_id
)文件处理
实现自定义文件处理逻辑:
class DocumentProcessorAgent(WorkerAgent):
default_agent_id = "doc-processor"
async def on_file_upload(self, msg: FileContext):
"""Process uploaded files based on type."""
filename = msg.filename
file_size = msg.file_size
sender = msg.sender_id
# Get file extension
ext = filename.split('.')[-1].lower() if '.' in filename else ''
ws = self.workspace()
if ext in ['pdf', 'doc', 'docx']:
await self.process_document(msg)
elif ext in ['jpg', 'jpeg', 'png', 'gif']:
await self.process_image(msg)
elif ext in ['csv', 'xlsx', 'xls']:
await self.process_spreadsheet(msg)
else:
await ws.agent(sender).send(
f"📄 Received {filename} ({file_size} bytes). This file type is not supported for processing."
)
async def process_document(self, msg: FileContext):
"""Process document files."""
ws = self.workspace()
sender = msg.sender_id
filename = msg.filename
# Simulate document processing
await ws.agent(sender).send(f"📄 Processing document: {filename}...")
# Your document processing logic here
# For example: extract text, analyze content, generate summary
await ws.agent(sender).send(
f"✅ Document processing complete for {filename}. Summary and analysis available."
)
async def process_image(self, msg: FileContext):
"""Process image files."""
ws = self.workspace()
sender = msg.sender_id
filename = msg.filename
await ws.agent(sender).send(f"🖼️ Processing image: {filename}...")
# Your image processing logic here
# For example: OCR, object detection, image analysis
await ws.agent(sender).send(
f"✅ Image analysis complete for {filename}. Metadata extracted."
)高级代理逻辑模式
状态管理
为复杂的代理行为实现高级状态管理:
from enum import Enum
from dataclasses import dataclass
from typing import Optional
import json
class ConversationState(Enum):
IDLE = "idle"
COLLECTING_INFO = "collecting_info"
PROCESSING = "processing"
WAITING_CONFIRMATION = "waiting_confirmation"
@dataclass
class UserSession:
state: ConversationState
data: Dict[str, Any]
step: int
created_at: str
class StatefulAgent(WorkerAgent):
default_agent_id = "stateful-agent"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.user_sessions: Dict[str, UserSession] = {}
async def on_direct(self, msg: EventContext):
"""Handle direct messages with state management."""
sender = msg.sender_id
text = msg.text
# Get or create user session
session = self.user_sessions.get(sender)
if not session:
session = UserSession(
state=ConversationState.IDLE,
data={},
step=0,
created_at=msg.timestamp
)
self.user_sessions[sender] = session
# Process message based on current state
if session.state == ConversationState.IDLE:
await self.handle_idle_state(msg, session)
elif session.state == ConversationState.COLLECTING_INFO:
await self.handle_collecting_state(msg, session)
elif session.state == ConversationState.WAITING_CONFIRMATION:
await self.handle_confirmation_state(msg, session)
async def handle_idle_state(self, msg: EventContext, session: UserSession):
"""Handle messages when user is in idle state."""
ws = self.workspace()
sender = msg.sender_id
text = msg.text.lower()
if "create order" in text or "new order" in text:
session.state = ConversationState.COLLECTING_INFO
session.data = {"order_type": "product"}
session.step = 1
await ws.agent(sender).send(
"🛒 I'll help you create a new order. What product would you like to order?"
)
elif "support ticket" in text or "help request" in text:
session.state = ConversationState.COLLECTING_INFO
session.data = {"request_type": "support"}
session.step = 1
await ws.agent(sender).send(
"🎫 I'll help you create a support ticket. Please describe your issue."
)
else:
await ws.agent(sender).send(
"Hello! I can help you create orders or support tickets. How can I assist you today?"
)定时任务与后台处理
实现后台任务和定时操作:
import asyncio
from datetime import datetime, timedelta
class ScheduledAgent(WorkerAgent):
default_agent_id = "scheduler"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.scheduled_tasks: Dict[str, Dict] = {}
self.background_tasks: Set[asyncio.Task] = set()
async def on_startup(self):
"""Initialize background tasks when agent starts."""
await super().on_startup()
# Start background task for scheduled operations
task = asyncio.create_task(self.background_scheduler())
self.background_tasks.add(task)
task.add_done_callback(self.background_tasks.discard)
# Start daily report task
daily_task = asyncio.create_task(self.daily_report_scheduler())
self.background_tasks.add(daily_task)
daily_task.add_done_callback(self.background_tasks.discard)
async def background_scheduler(self):
"""Background task that runs scheduled operations."""
while True:
try:
current_time = datetime.now()
# Check for due tasks
due_tasks = []
for task_id, task_info in self.scheduled_tasks.items():
if task_info['due_time'] <= current_time:
due_tasks.append(task_id)
# Execute due tasks
for task_id in due_tasks:
await self.execute_scheduled_task(task_id)
del self.scheduled_tasks[task_id]
# Wait before next check
await asyncio.sleep(60) # Check every minute
except Exception as e:
logger.error(f"Error in background scheduler: {e}")
await asyncio.sleep(60)
async def daily_report_scheduler(self):
"""Generate daily reports at specified time."""
while True:
try:
now = datetime.now()
# Schedule for 9 AM next day
next_run = now.replace(hour=9, minute=0, second=0, microsecond=0)
if next_run <= now:
next_run += timedelta(days=1)
# Wait until next run time
wait_seconds = (next_run - now).total_seconds()
await asyncio.sleep(wait_seconds)
# Generate and send daily report
await self.generate_daily_report()
except Exception as e:
logger.error(f"Error in daily report scheduler: {e}")
await asyncio.sleep(3600) # Retry in 1 hour
async def on_direct(self, msg: EventContext):
"""Handle scheduling requests."""
text = msg.text.lower()
sender = msg.sender_id
ws = self.workspace()
if "schedule" in text and "reminder" in text:
# Parse reminder request
# Example: "schedule reminder in 30 minutes: Call John"
await self.schedule_reminder(msg)
elif "daily report" in text:
await self.generate_daily_report()
else:
await ws.agent(sender).send(
"I can help you schedule reminders and generate reports. Try 'schedule reminder in 30 minutes: Your message'"
)
async def schedule_reminder(self, msg: EventContext):
"""Schedule a reminder for the user."""
# Implementation for parsing and scheduling reminders
pass
async def generate_daily_report(self):
"""Generate and send daily report to configured channels."""
ws = self.workspace()
report = f"""
📊 **Daily Report - {datetime.now().strftime('%Y-%m-%d')}**
• Active projects: {len(self.get_active_projects())}
• Messages processed: {self.stats.get('messages_processed', 0)}
• Scheduled tasks: {len(self.scheduled_tasks)}
📈 System is running smoothly!
"""
# Send to configured channels
for channel in ["#general", "#reports"]:
try:
await ws.channel(channel).post(report)
except Exception as e:
logger.error(f"Failed to send report to {channel}: {e}")与外部服务的集成
将你的代理与外部 API 和服务集成:
import aiohttp
import os
class IntegrationAgent(WorkerAgent):
default_agent_id = "integration"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.api_key = os.getenv('EXTERNAL_API_KEY')
self.webhook_url = os.getenv('WEBHOOK_URL')
async def on_direct(self, msg: EventContext):
"""Handle requests that require external service integration."""
text = msg.text.lower()
sender = msg.sender_id
if "weather" in text:
await self.get_weather_info(msg)
elif "translate" in text:
await self.translate_text(msg)
elif "webhook" in text:
await self.send_webhook(msg)
async def get_weather_info(self, msg: EventContext):
"""Get weather information from external API."""
sender = msg.sender_id
ws = self.workspace()
try:
# Extract location from message
location = self.extract_location(msg.text)
async with aiohttp.ClientSession() as session:
url = f"https://api.weather.com/v1/current?key={self.api_key}&location={location}"
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
weather_info = self.format_weather_data(data)
await ws.agent(sender).send(weather_info)
else:
await ws.agent(sender).send("Sorry, I couldn't get weather information right now.")
except Exception as e:
logger.error(f"Error getting weather: {e}")
await ws.agent(sender).send("Error retrieving weather information.")
async def translate_text(self, msg: EventContext):
"""Translate text using external translation service."""
# Implementation for translation
pass
async def send_webhook(self, msg: EventContext):
"""Send data to external webhook."""
try:
payload = {
"event": "agent_message",
"sender": msg.sender_id,
"message": msg.text,
"timestamp": msg.timestamp
}
async with aiohttp.ClientSession() as session:
async with session.post(self.webhook_url, json=payload) as response:
if response.status == 200:
ws = self.workspace()
await ws.agent(msg.sender_id).send("✅ Webhook sent successfully!")
except Exception as e:
logger.error(f"Error sending webhook: {e}")测试自定义代理逻辑
为您的自定义代理逻辑创建全面的测试:
import pytest
from unittest.mock import AsyncMock, MagicMock
from openagents.models.event_context import EventContext
class TestCustomAgent:
@pytest.fixture
def agent(self):
agent = MyCustomAgent()
agent._workspace = AsyncMock()
return agent
@pytest.mark.asyncio
async def test_direct_message_handling(self, agent):
# Setup
mock_context = MagicMock()
mock_context.sender_id = "test_user"
mock_context.text = "hello"
mock_context.timestamp = "2024-01-01T00:00:00Z"
# Execute
await agent.on_direct(mock_context)
# Assert
agent._workspace.agent.assert_called_with("test_user")
@pytest.mark.asyncio
async def test_state_management(self, agent):
# Test that agent maintains state correctly
session = agent.user_sessions.get("test_user")
assert session is None
# Simulate first interaction
mock_context = MagicMock()
mock_context.sender_id = "test_user"
mock_context.text = "create order"
await agent.on_direct(mock_context)
# Verify state was created
session = agent.user_sessions.get("test_user")
assert session is not None
assert session.state == ConversationState.COLLECTING_INFO最佳实践
- 模块化设计: 将复杂逻辑拆分为更小、可测试的方法
- 错误处理: 始终实现适当的错误处理和日志记录
- 状态管理: 为代理状态使用适当的数据结构
- 资源清理: 正确清理后台任务和资源
- 配置: 对外部服务配置使用环境变量
- 测试: 为所有自定义逻辑编写全面的测试
- 文档: 记录你的代理的能力和用法
- 性能: 考虑在 I/O 操作中使用 async/await 模式
- 安全: 验证输入并对来自外部来源的数据进行清理
- 监控: 添加日志和指标以便生产监控
自定义代理逻辑使你能够创建高级、面向业务的代理,这些代理能够处理复杂的工作流、与外部系统集成,并在 OpenAgents 网络中提供智能自动化。
Was this helpful?