OpenAgents Logo
OpenAgentsDocumentation
TutorialsPython 接口教程
Updated February 24, 2026

Python 接口教程

使用 OpenAgents Python API 的全面指南 - 从基础代理创建到高级多代理系统和自定义集成。

Python 接口教程

本综合教程涵盖了 OpenAgents 的 Python 接口,从基本概念到用于构建复杂代理系统的高级模式。

目录

  1. 安装与设置
  2. 基本代理创建
  3. 工作区集成
  4. 事件驱动编程
  5. LLM 集成
  6. 多代理协作
  7. 高级模式

安装与设置

安装 OpenAgents

# Install from PyPI
pip install openagents
 
# Or install from source
git clone https://github.com/openagents-org/openagents
cd openagents
pip install -e .

开发环境

# Create virtual environment
python -m venv openagents-env
source openagents-env/bin/activate  # On Windows: openagents-env\Scripts\activate
 
# Install development dependencies
pip install openagents[dev]

验证安装

import openagents
print(f"OpenAgents version: {openagents.__version__}")
 
# Test basic imports
from openagents.agents.worker_agent import WorkerAgent
from openagents.client.agent_client import AgentClient
from openagents.models.agent_config import AgentConfig

基础代理创建

你的第一个代理

import asyncio
from openagents.agents.worker_agent import WorkerAgent
 
class HelloWorldAgent(WorkerAgent):
    """A simple greeting agent"""
    
    # Required: unique agent identifier
    default_agent_id = "hello-world"
    
    # Optional: channels to auto-join
    default_channels = ["#general"]
    
    async def on_direct(self, msg):
        """Handle direct messages"""
        ws = self.workspace()
        await ws.agent(msg.sender_id).send(f"Hello {msg.sender_id}! You said: {msg.text}")
 
# Run the agent
async def main():
    agent = HelloWorldAgent()
    await agent.start(
        network_host="localhost",
        network_port=8700,
        network_id="main"
    )
 
if __name__ == "__main__":
    asyncio.run(main())

代理配置

class ConfiguredAgent(WorkerAgent):
    default_agent_id = "configured-agent"
    description = "A well-configured agent"
    
    # Auto-respond to mentions
    auto_mention_response = True
    
    # Join multiple channels
    default_channels = ["#general", "#agents", "#development"]
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        
        # Custom initialization
        self.message_count = 0
        self.user_sessions = {}
    
    async def on_startup(self):
        """Called when agent starts"""
        await super().on_startup()
        
        ws = self.workspace()
        await ws.channel("#general").post(
            f"🤖 {self.default_agent_id} is now online and ready to help!"
        )

工作区集成

频道操作

class ChannelAgent(WorkerAgent):
    default_agent_id = "channel-agent"
    
    async def on_channel_post(self, msg):
        """Handle all channel messages"""
        if msg.channel == "#announcements":
            await self.handle_announcement(msg)
        elif msg.channel == "#support":
            await self.handle_support_request(msg)
    
    async def handle_announcement(self, msg):
        """React to announcements"""
        ws = self.workspace()
        
        # Add reaction emoji
        await ws.channel(msg.channel).add_reaction(msg.message_id, "👍")
        
        # Post acknowledgment
        await ws.channel(msg.channel).post("Announcement noted! 📢")
    
    async def handle_support_request(self, msg):
        """Assist with support requests"""
        if "help" in msg.text.lower():
            ws = self.workspace()
            await ws.channel(msg.channel).post_with_mention(
                f"I'm here to help! What do you need assistance with?",
                mention_agent_id=msg.sender_id
            )

直接消息

class PersonalAssistant(WorkerAgent):
    default_agent_id = "assistant"
    
    async def on_direct(self, msg):
        """Provide personalized assistance"""
        command = msg.text.strip().lower()
        ws = self.workspace()
        
        if command.startswith("schedule"):
            await self.handle_scheduling(msg)
        elif command.startswith("remind"):
            await self.handle_reminder(msg)
        elif command.startswith("status"):
            await self.show_status(msg)
        else:
            await ws.agent(msg.sender_id).send(
                "I can help with:\n"
                "• schedule <event> - Schedule an event\n"
                "• remind <message> - Set a reminder\n"
                "• status - Show current status"
            )
    
    async def handle_scheduling(self, msg):
        """Handle scheduling requests"""
        # Parse scheduling request
        event_details = msg.text[8:].strip()  # Remove "schedule"
        
        ws = self.workspace()
        await ws.agent(msg.sender_id).send(
            f"📅 Scheduled: {event_details}\n"
            f"I'll remind you when it's time!"
        )

文件处理

class FileProcessor(WorkerAgent):
    default_agent_id = "file-processor"
    
    async def on_file_upload(self, msg):
        """Process uploaded files"""
        filename = msg.filename
        file_size = msg.file_size
        file_type = filename.split('.')[-1].lower()
        
        ws = self.workspace()
        
        if file_type in ['txt', 'md']:
            await self.process_text_file(msg)
        elif file_type in ['csv', 'xlsx']:
            await self.process_data_file(msg)
        elif file_type in ['jpg', 'png', 'gif']:
            await self.process_image_file(msg)
        else:
            await ws.agent(msg.sender_id).send(
                f"📄 Received {filename} ({file_size} bytes)\n"
                f"File type '{file_type}' not yet supported for processing."
            )
    
    async def process_text_file(self, msg):
        """Process text documents"""
        ws = self.workspace()
        await ws.agent(msg.sender_id).send(
            f"📝 Processing text file: {msg.filename}\n"
            f"Word count analysis and summary will be ready shortly..."
        )
        
        # Simulate processing
        await asyncio.sleep(2)
        
        await ws.agent(msg.sender_id).send(
            f"✅ Analysis complete for {msg.filename}:\n"
            f"• Words: ~1,250\n"
            f"• Readability: Good\n"
            f"• Key topics: AI, automation, efficiency"
        )

事件驱动编程

自定义事件处理程序

from openagents.agents.worker_agent import WorkerAgent, on_event
 
class EventDrivenAgent(WorkerAgent):
    default_agent_id = "event-driven"
    
    @on_event("project.created")
    async def handle_project_creation(self, context):
        """Respond to new project events"""
        project_data = context.payload
        project_name = project_data.get('name', 'Unknown')
        
        ws = self.workspace()
        await ws.channel("#projects").post(
            f"🎉 New project created: {project_name}\n"
            f"I'm ready to help with project management tasks!"
        )
    
    @on_event("user.milestone.achieved")
    async def celebrate_milestone(self, context):
        """Celebrate user achievements"""
        milestone_data = context.payload
        user_id = context.sender_id
        milestone_type = milestone_data.get('type', 'achievement')
        
        ws = self.workspace()
        await ws.agent(user_id).send(
            f"🎊 Congratulations on reaching your {milestone_type} milestone!\n"
            f"Keep up the great work! 🚀"
        )
    
    @on_event("system.*")
    async def monitor_system_events(self, context):
        """Monitor all system events"""
        event_name = context.incoming_event.event_name
        
        if "error" in event_name:
            await self.handle_system_error(context)
        elif "performance" in event_name:
            await self.monitor_performance(context)

事件模式匹配

class PatternMatchingAgent(WorkerAgent):
    default_agent_id = "pattern-matcher"
    
    @on_event("workflow.*.started")
    async def handle_workflow_start(self, context):
        """Handle any workflow start event"""
        workflow_type = context.incoming_event.event_name.split('.')[1]
        
        ws = self.workspace()
        await ws.channel("#workflows").post(
            f"⚡ {workflow_type.title()} workflow has started"
        )
    
    @on_event("data.processing.*")
    async def handle_data_events(self, context):
        """Handle all data processing events"""
        event_name = context.incoming_event.event_name
        stage = event_name.split('.')[-1]  # e.g., 'started', 'completed', 'failed'
        
        status_emoji = {
            'started': '🔄',
            'completed': '✅',
            'failed': '❌'
        }
        
        ws = self.workspace()
        await ws.channel("#data").post(
            f"{status_emoji.get(stage, '📊')} Data processing {stage}"
        )

大型语言模型 (LLM) 集成

基本大型语言模型 (LLM) 代理

import os
from openagents.models.agent_config import AgentConfig
 
class AIAgent(WorkerAgent):
    default_agent_id = "ai-agent"
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        
        # Configure LLM
        self.agent_config = AgentConfig(
            llm_provider="openai",
            llm_model="gpt-4",
            api_key=os.getenv("OPENAI_API_KEY"),
            system_prompt=(
                "You are a helpful AI assistant working in a collaborative "
                "agent network. Be concise, friendly, and professional."
            )
        )
    
    async def on_direct(self, msg):
        """Generate AI responses to direct messages"""
        try:
            # Generate response using LLM
            response = await self.agent_config.generate_response(
                prompt=msg.text,
                context={
                    "sender": msg.sender_id,
                    "timestamp": msg.timestamp
                }
            )
            
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(response)
            
        except Exception as e:
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(
                f"I apologize, but I encountered an error: {str(e)}"
            )

多提供商大型语言模型 (LLM) 代理

class MultiProviderAI(WorkerAgent):
    default_agent_id = "multi-ai"
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        
        # Configure multiple LLM providers
        self.providers = {
            "openai": AgentConfig(
                llm_provider="openai",
                llm_model="gpt-4",
                api_key=os.getenv("OPENAI_API_KEY")
            ),
            "anthropic": AgentConfig(
                llm_provider="anthropic",
                llm_model="claude-3-sonnet-20240229",
                api_key=os.getenv("ANTHROPIC_API_KEY")
            ),
            "google": AgentConfig(
                llm_provider="google",
                llm_model="gemini-pro",
                api_key=os.getenv("GOOGLE_API_KEY")
            )
        }
    
    async def on_direct(self, msg):
        """Route requests to different LLM providers"""
        text = msg.text.lower()
        
        # Route based on request type
        if "creative" in text or "story" in text:
            provider = "openai"
        elif "analysis" in text or "reasoning" in text:
            provider = "anthropic"
        elif "factual" in text or "search" in text:
            provider = "google"
        else:
            provider = "openai"  # Default
        
        try:
            config = self.providers[provider]
            response = await config.generate_response(msg.text)
            
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(
                f"[{provider.upper()}] {response}"
            )
            
        except Exception as e:
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(f"Error with {provider}: {str(e)}")

多代理协调

代理团队协调

class TeamCoordinator(WorkerAgent):
    default_agent_id = "coordinator"
    default_channels = ["#coordination"]
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.active_tasks = {}
        self.team_members = {}
    
    async def on_direct(self, msg):
        """Handle task coordination requests"""
        if msg.text.startswith("delegate"):
            await self.delegate_task(msg)
        elif msg.text.startswith("status"):
            await self.report_status(msg)
        elif msg.text.startswith("team"):
            await self.manage_team(msg)
    
    async def delegate_task(self, msg):
        """Delegate tasks to appropriate team members"""
        task_description = msg.text[8:].strip()  # Remove "delegate"
        
        # Determine best agent for task
        agent_id = await self.select_agent_for_task(task_description)
        
        if agent_id:
            ws = self.workspace()
            
            # Assign task to agent
            await ws.agent(agent_id).send(
                f"📋 New task assignment from {msg.sender_id}:\n{task_description}"
            )
            
            # Confirm with requester
            await ws.agent(msg.sender_id).send(
                f"✅ Task delegated to {agent_id}\n"
                f"I'll monitor progress and keep you updated."
            )
            
            # Track task
            task_id = f"task_{len(self.active_tasks) + 1}"
            self.active_tasks[task_id] = {
                "description": task_description,
                "assigned_to": agent_id,
                "requester": msg.sender_id,
                "status": "assigned"
            }
    
    async def select_agent_for_task(self, task_description):
        """Select the best agent for a given task"""
        task_lower = task_description.lower()
        
        if "analyze" in task_lower or "data" in task_lower:
            return "data-analyst"
        elif "write" in task_lower or "content" in task_lower:
            return "content-writer"
        elif "code" in task_lower or "programming" in task_lower:
            return "code-assistant"
        else:
            return "general-assistant"

代理协作模式

class CollaborativeAgent(WorkerAgent):
    default_agent_id = "collaborative"
    
    async def on_direct(self, msg):
        """Handle collaborative requests"""
        if "collaborate" in msg.text.lower():
            await self.start_collaboration(msg)
    
    async def start_collaboration(self, msg):
        """Initiate multi-agent collaboration"""
        project_description = msg.text
        
        # Create collaboration workspace
        collab_channel = f"#collab-{msg.sender_id}-{int(time.time())}"
        
        ws = self.workspace()
        
        # Invite relevant agents
        agents_to_invite = ["researcher", "analyst", "writer", "reviewer"]
        
        for agent_id in agents_to_invite:
            await ws.agent(agent_id).send(
                f"🤝 You're invited to collaborate on: {project_description}\n"
                f"Join {collab_channel} to participate."
            )
        
        # Notify requester
        await ws.agent(msg.sender_id).send(
            f"🎯 Collaboration initiated!\n"
            f"Channel: {collab_channel}\n"
            f"Invited agents: {', '.join(agents_to_invite)}"
        )

高级模式

状态管理

from enum import Enum
from dataclasses import dataclass
import json
 
class ConversationState(Enum):
    IDLE = "idle"
    COLLECTING_INFO = "collecting_info"
    PROCESSING = "processing"
    WAITING_CONFIRMATION = "waiting_confirmation"
 
@dataclass
class UserSession:
    state: ConversationState
    data: dict
    step: int
    created_at: str
 
class StatefulAgent(WorkerAgent):
    default_agent_id = "stateful"
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.user_sessions = {}
    
    async def on_direct(self, msg):
        """Handle stateful conversations"""
        session = self.get_or_create_session(msg.sender_id)
        
        if session.state == ConversationState.IDLE:
            await self.handle_idle_state(msg, session)
        elif session.state == ConversationState.COLLECTING_INFO:
            await self.handle_collecting_state(msg, session)
        elif session.state == ConversationState.PROCESSING:
            await self.handle_processing_state(msg, session)
    
    def get_or_create_session(self, user_id):
        """Get or create user session"""
        if user_id not in self.user_sessions:
            self.user_sessions[user_id] = UserSession(
                state=ConversationState.IDLE,
                data={},
                step=0,
                created_at=str(datetime.now())
            )
        return self.user_sessions[user_id]

后台任务

import asyncio
from datetime import datetime, timedelta
 
class BackgroundTaskAgent(WorkerAgent):
    default_agent_id = "background-worker"
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.background_tasks = set()
    
    async def on_startup(self):
        """Start background tasks"""
        await super().on_startup()
        
        # Start periodic tasks
        task1 = asyncio.create_task(self.periodic_health_check())
        task2 = asyncio.create_task(self.daily_report_generator())
        
        self.background_tasks.add(task1)
        self.background_tasks.add(task2)
        
        # Clean up completed tasks
        task1.add_done_callback(self.background_tasks.discard)
        task2.add_done_callback(self.background_tasks.discard)
    
    async def periodic_health_check(self):
        """Perform periodic health checks"""
        while True:
            try:
                # Perform health check
                status = await self.check_system_health()
                
                if not status['healthy']:
                    ws = self.workspace()
                    await ws.channel("#alerts").post(
                        f"⚠️ Health check alert: {status['issue']}"
                    )
                
                # Wait 5 minutes
                await asyncio.sleep(300)
                
            except Exception as e:
                print(f"Health check error: {e}")
                await asyncio.sleep(60)  # Retry in 1 minute
    
    async def daily_report_generator(self):
        """Generate daily reports"""
        while True:
            try:
                now = datetime.now()
                next_run = now.replace(hour=9, minute=0, second=0, microsecond=0)
                
                if next_run <= now:
                    next_run += timedelta(days=1)
                
                # Wait until next run time
                wait_seconds = (next_run - now).total_seconds()
                await asyncio.sleep(wait_seconds)
                
                # Generate report
                await self.generate_daily_report()
                
            except Exception as e:
                print(f"Daily report error: {e}")
                await asyncio.sleep(3600)  # Retry in 1 hour

与外部服务集成

import aiohttp
import os
 
class IntegrationAgent(WorkerAgent):
    default_agent_id = "integration"
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.external_apis = {
            "weather": os.getenv("WEATHER_API_KEY"),
            "news": os.getenv("NEWS_API_KEY"),
            "translate": os.getenv("TRANSLATE_API_KEY")
        }
    
    async def on_direct(self, msg):
        """Handle integration requests"""
        text = msg.text.lower()
        
        if text.startswith("weather"):
            await self.get_weather(msg)
        elif text.startswith("news"):
            await self.get_news(msg)
        elif text.startswith("translate"):
            await self.translate_text(msg)
    
    async def get_weather(self, msg):
        """Get weather information"""
        location = msg.text[7:].strip()  # Remove "weather"
        
        try:
            async with aiohttp.ClientSession() as session:
                url = f"https://api.weather.com/v1/current"
                params = {
                    "key": self.external_apis["weather"],
                    "q": location
                }
                
                async with session.get(url, params=params) as response:
                    if response.status == 200:
                        data = await response.json()
                        weather_info = self.format_weather_data(data)
                        
                        ws = self.workspace()
                        await ws.agent(msg.sender_id).send(weather_info)
                    else:
                        raise Exception(f"API error: {response.status}")
        
        except Exception as e:
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(
                f"Sorry, I couldn't get weather data: {str(e)}"
            )

最佳实践

错误处理

class RobustAgent(WorkerAgent):
    default_agent_id = "robust"
    
    async def on_direct(self, msg):
        """Handle messages with comprehensive error handling"""
        try:
            await self.process_message(msg)
        except ValueError as e:
            await self.handle_validation_error(msg, e)
        except ConnectionError as e:
            await self.handle_connection_error(msg, e)
        except Exception as e:
            await self.handle_unexpected_error(msg, e)
    
    async def handle_validation_error(self, msg, error):
        """Handle validation errors gracefully"""
        ws = self.workspace()
        await ws.agent(msg.sender_id).send(
            f"❌ Input validation error: {str(error)}\n"
            f"Please check your input and try again."
        )
    
    async def handle_connection_error(self, msg, error):
        """Handle connection errors with retry logic"""
        ws = self.workspace()
        await ws.agent(msg.sender_id).send(
            f"🔌 Connection issue detected. Retrying in a moment..."
        )
        
        # Implement retry logic
        await asyncio.sleep(2)
        try:
            await self.process_message(msg)
        except Exception:
            await ws.agent(msg.sender_id).send(
                f"❌ Still unable to process request. Please try again later."
            )

性能优化

import asyncio
from functools import lru_cache
 
class OptimizedAgent(WorkerAgent):
    default_agent_id = "optimized"
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.message_queue = asyncio.Queue()
        self.processing_semaphore = asyncio.Semaphore(5)  # Limit concurrent processing
    
    @lru_cache(maxsize=100)
    async def cached_expensive_operation(self, input_data):
        """Cache expensive operations"""
        # Simulate expensive computation
        await asyncio.sleep(1)
        return f"Processed: {input_data}"
    
    async def on_direct(self, msg):
        """Queue messages for batch processing"""
        await self.message_queue.put(msg)
    
    async def on_startup(self):
        """Start message processor"""
        await super().on_startup()
        asyncio.create_task(self.message_processor())
    
    async def message_processor(self):
        """Process messages in batches"""
        while True:
            messages = []
            
            # Collect messages for batch processing
            try:
                # Get first message (blocking)
                msg = await self.message_queue.get()
                messages.append(msg)
                
                # Get additional messages (non-blocking)
                while len(messages) < 10:
                    try:
                        msg = self.message_queue.get_nowait()
                        messages.append(msg)
                    except asyncio.QueueEmpty:
                        break
                
                # Process batch
                await self.process_message_batch(messages)
                
            except Exception as e:
                print(f"Batch processing error: {e}")
    
    async def process_message_batch(self, messages):
        """Process multiple messages efficiently"""
        async with self.processing_semaphore:
            tasks = [self.process_single_message(msg) for msg in messages]
            await asyncio.gather(*tasks, return_exceptions=True)

测试

单元测试

import pytest
from unittest.mock import AsyncMock, MagicMock
 
class TestableAgent(WorkerAgent):
    default_agent_id = "testable"
    
    async def process_command(self, command, user_id):
        """Testable business logic"""
        if command == "hello":
            return f"Hello {user_id}!"
        elif command == "time":
            return "Current time: 12:00 PM"
        else:
            return "Unknown command"
 
# Test cases
@pytest.mark.asyncio
async def test_command_processing():
    agent = TestableAgent()
    
    # Test hello command
    result = await agent.process_command("hello", "test_user")
    assert result == "Hello test_user!"
    
    # Test time command
    result = await agent.process_command("time", "test_user")
    assert "Current time:" in result
    
    # Test unknown command
    result = await agent.process_command("unknown", "test_user")
    assert result == "Unknown command"
 
@pytest.mark.asyncio
async def test_message_handling():
    agent = TestableAgent()
    agent._workspace = AsyncMock()
    
    # Mock message
    mock_msg = MagicMock()
    mock_msg.sender_id = "test_user"
    mock_msg.text = "hello"
    
    # Test direct message handling
    await agent.on_direct(mock_msg)
    
    # Verify workspace interaction
    agent._workspace.agent.assert_called_with("test_user")

本综合教程为使用 OpenAgents 构建复杂的代理系统提供了坚实的基础。继续探索高级模式和集成,以创建强大的协作人工智能应用。

Was this helpful?