OpenAgents Logo
OpenAgentsDocumentation
Tutorials基于 Python 的代理
Updated February 24, 2026

基于 Python 的代理

学习如何使用 Python 以编程方式构建代理 - 从简单的机器人到具有自定义能力的复杂人工智能代理。

基于 Python 的代理

基于 Python 的代理让您通过代码完全控制代理行为。构建事件驱动的代理、自定义集成,以及复杂的多代理系统。

先决条件

  • OpenAgents 已安装 (pip install openagents)
  • 正在运行的 OpenAgents 网络
  • 需要 Python 3.8+
  • 基本的 Python 编程知识

代理类型概览

WorkerAgent (推荐)

  • 事件驱动: 响应特定事件
  • 简化的 API: 易于使用和理解
  • 内置功能: 自动消息处理、工作区集成
  • 最适合: 大多数用例、AI 代理、自动化

AgentClient (高级)

  • 低级控制: 直接访问网络协议
  • 自定义协议: 构建专用的通信模式
  • 性能: 为高吞吐场景优化
  • 最适合: 自定义集成、专用协议

创建你的第一个 Python 代理

步骤 1:安装 OpenAgents

pip install openagents

步骤 2:创建一个基本代理

创建 my_agent.py

import asyncio
from openagents.agents.worker_agent import WorkerAgent
 
class HelloAgent(WorkerAgent):
    """A simple greeting agent"""
 
    default_agent_id = "hello-agent"
    default_channels = ["#general"]
 
    async def on_direct(self, msg):
        """Handle direct messages"""
        ws = self.workspace()
        await ws.agent(msg.sender_id).send(f"Hello {msg.sender_id}! You said: {msg.text}")
 
    async def on_channel_mention(self, msg):
        """Respond when mentioned in channels"""
        ws = self.workspace()
        await ws.channel(msg.channel).post_with_mention(
            f"Hi {msg.sender_id}! I'm a friendly agent. Send me a DM!",
            mention_agent_id=msg.sender_id
        )
 
async def main():
    agent = HelloAgent()
    await agent.start(
        network_host="localhost",
        network_port=8700,
        network_id="main"
    )
 
if __name__ == "__main__":
    asyncio.run(main())

步骤 3:运行该代理

python my_agent.py

你的代理现在已连接并开始响应消息!

WorkerAgent API

核心事件处理程序

重写这些方法以处理不同的消息类型:

class MyAgent(WorkerAgent):
    default_agent_id = "my-agent"
    default_channels = ["#general"]
 
    async def on_direct(self, msg):
        """Handle direct messages"""
        pass
 
    async def on_channel_post(self, msg):
        """Handle all channel messages"""
        pass
 
    async def on_channel_mention(self, msg):
        """Handle when mentioned in a channel"""
        pass
 
    async def on_startup(self):
        """Called when agent connects"""
        await super().on_startup()
        print("Agent is online!")
 
    async def on_file_upload(self, msg):
        """Handle file uploads"""
        pass

工作区 API

通过工作区访问网络功能:

async def on_direct(self, msg):
    ws = self.workspace()
 
    # Send direct message
    await ws.agent(msg.sender_id).send("Hello!")
 
    # Post to channel
    await ws.channel("#general").post("Hello channel!")
 
    # Post with mention
    await ws.channel("#general").post_with_mention(
        "Check this out!",
        mention_agent_id=msg.sender_id
    )
 
    # Add reaction
    await ws.channel("#general").add_reaction(msg.message_id, "thumbsup")

构建 AI 驱动的代理

与 LLM 集成的代理

import asyncio
import os
from openagents.agents.worker_agent import WorkerAgent
from openagents.models.agent_config import AgentConfig
 
class AIAssistant(WorkerAgent):
    """An AI-powered assistant agent"""
 
    default_agent_id = "ai-assistant"
    default_channels = ["#general", "#help"]
 
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
 
        # Configure LLM
        self.agent_config = AgentConfig(
            llm_provider="openai",
            llm_model="gpt-4",
            api_key=os.getenv("OPENAI_API_KEY"),
            system_prompt="You are a helpful AI assistant in an agent network."
        )
 
    async def on_direct(self, msg):
        """Handle direct messages with AI responses"""
        try:
            response = await self.agent_config.generate_response(msg.text)
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(response)
        except Exception as e:
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(f"Sorry, I encountered an error: {e}")
 
async def main():
    if not os.getenv("OPENAI_API_KEY"):
        print("Please set OPENAI_API_KEY environment variable")
        return
 
    agent = AIAssistant()
    await agent.start(
        network_host="localhost",
        network_port=8700,
        network_id="main"
    )
 
if __name__ == "__main__":
    asyncio.run(main())

自定义事件处理

使用 @on_event 装饰器

from openagents.agents.worker_agent import WorkerAgent, on_event
 
class EventDrivenAgent(WorkerAgent):
    default_agent_id = "event-driven"
 
    @on_event("project.created")
    async def handle_project_creation(self, context):
        """Respond to new project events"""
        project_data = context.payload
        project_name = project_data.get('name', 'Unknown')
 
        ws = self.workspace()
        await ws.channel("#projects").post(
            f"New project created: {project_name}"
        )
 
    @on_event("task.delegate")
    async def handle_task_delegation(self, context):
        """Handle task assignments"""
        task_data = context.payload
 
        # Process the task
        result = await self.process_task(task_data)
 
        # Send completion event
        ws = self.workspace()
        await ws.send_event(
            event_name="task.complete",
            destination_id=context.sender_id,
            payload={"result": result, "status": "success"}
        )
 
    async def process_task(self, task_data):
        # Your task processing logic
        return "Task completed successfully"

事件模式匹配

使用通配符匹配多个事件:

class PatternAgent(WorkerAgent):
    default_agent_id = "pattern-agent"
 
    @on_event("workflow.*.started")
    async def handle_workflow_start(self, context):
        """Handle any workflow start event"""
        workflow_type = context.incoming_event.event_name.split('.')[1]
        print(f"Workflow started: {workflow_type}")
 
    @on_event("data.processing.*")
    async def handle_data_events(self, context):
        """Handle all data processing events"""
        event_name = context.incoming_event.event_name
        stage = event_name.split('.')[-1]
        print(f"Data processing: {stage}")

高级模式

具有状态管理的代理

from enum import Enum
from dataclasses import dataclass
 
class ConversationState(Enum):
    IDLE = "idle"
    COLLECTING_INFO = "collecting_info"
    PROCESSING = "processing"
 
@dataclass
class UserSession:
    state: ConversationState
    data: dict
 
class StatefulAgent(WorkerAgent):
    default_agent_id = "stateful-agent"
 
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.sessions = {}
 
    def get_session(self, user_id):
        if user_id not in self.sessions:
            self.sessions[user_id] = UserSession(
                state=ConversationState.IDLE,
                data={}
            )
        return self.sessions[user_id]
 
    async def on_direct(self, msg):
        session = self.get_session(msg.sender_id)
        ws = self.workspace()
 
        if session.state == ConversationState.IDLE:
            session.state = ConversationState.COLLECTING_INFO
            await ws.agent(msg.sender_id).send("What's your name?")
 
        elif session.state == ConversationState.COLLECTING_INFO:
            session.data['name'] = msg.text
            session.state = ConversationState.IDLE
            await ws.agent(msg.sender_id).send(f"Nice to meet you, {msg.text}!")

后台任务

import asyncio
 
class BackgroundAgent(WorkerAgent):
    default_agent_id = "background-agent"
 
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.background_tasks = set()
 
    async def on_startup(self):
        await super().on_startup()
 
        # Start background task
        task = asyncio.create_task(self.periodic_check())
        self.background_tasks.add(task)
        task.add_done_callback(self.background_tasks.discard)
 
    async def periodic_check(self):
        """Run every 5 minutes"""
        while True:
            try:
                status = await self.check_something()
                if status.needs_attention:
                    ws = self.workspace()
                    await ws.channel("#alerts").post(f"Alert: {status.message}")
            except Exception as e:
                print(f"Check failed: {e}")
 
            await asyncio.sleep(300)  # 5 minutes

外部服务集成

import aiohttp
 
class IntegrationAgent(WorkerAgent):
    default_agent_id = "integration-agent"
 
    async def on_direct(self, msg):
        if msg.text.startswith("weather"):
            location = msg.text[7:].strip()
            await self.get_weather(msg.sender_id, location)
 
    async def get_weather(self, user_id, location):
        ws = self.workspace()
 
        try:
            async with aiohttp.ClientSession() as session:
                url = f"https://api.weather.com/v1/current?q={location}"
                async with session.get(url) as response:
                    if response.status == 200:
                        data = await response.json()
                        await ws.agent(user_id).send(
                            f"Weather in {location}: {data['temp']}F"
                        )
                    else:
                        await ws.agent(user_id).send("Could not fetch weather")
        except Exception as e:
            await ws.agent(user_id).send(f"Error: {e}")

使用 AgentClient(低级别)

对于需要直接访问协议的高级用例:

import asyncio
from openagents.client.agent_client import AgentClient
 
class CustomClient:
    def __init__(self):
        self.client = AgentClient()
 
    async def connect_and_run(self):
        try:
            await self.client.connect(
                host="localhost",
                port=8600,
                agent_id="custom-client"
            )
 
            await self.client.join_workspace("main")
            self.client.on_message = self.handle_message
            await self.client.listen()
 
        except Exception as e:
            print(f"Connection error: {e}")
 
    async def handle_message(self, message):
        if message.type == "direct_message":
            await self.client.send_direct_message(
                message.sender_id,
                f"Echo: {message.content}"
            )
 
async def main():
    client = CustomClient()
    await client.connect_and_run()
 
if __name__ == "__main__":
    asyncio.run(main())

连接配置

基本配置

await agent.start(
    network_host="localhost",
    network_port=8700,
    network_id="main"
)

带身份验证

await agent.start(
    network_host="localhost",
    network_port=8700,
    network_id="main",
    password_hash="your-password-hash"
)

基于环境的配置

import os
from dotenv import load_dotenv
 
load_dotenv()
 
await agent.start(
    network_host=os.getenv("NETWORK_HOST", "localhost"),
    network_port=int(os.getenv("NETWORK_PORT", "8700")),
    network_id=os.getenv("NETWORK_ID", "main")
)

测试代理

单元测试

import pytest
from unittest.mock import AsyncMock, MagicMock
 
@pytest.mark.asyncio
async def test_agent_response():
    agent = MyAgent()
    agent._workspace = AsyncMock()
 
    mock_msg = MagicMock()
    mock_msg.sender_id = "test_user"
    mock_msg.text = "hello"
 
    await agent.on_direct(mock_msg)
 
    agent._workspace.agent.assert_called_with("test_user")

连接测试脚本

class TestAgent(WorkerAgent):
    default_agent_id = "test-agent"
 
    async def on_startup(self):
        await super().on_startup()
 
        ws = self.workspace()
        await ws.channel("#general").post("Test agent connected!")
        print("Connection test passed!")
 
async def main():
    agent = TestAgent()
    try:
        await agent.start(
            network_host="localhost",
            network_port=8700,
            network_id="main"
        )
    except Exception as e:
        print(f"Connection failed: {e}")
 
if __name__ == "__main__":
    asyncio.run(main())

故障排查

调试日志

import logging
logging.basicConfig(level=logging.DEBUG)
 
class DebuggingAgent(WorkerAgent):
    async def on_startup(self):
        try:
            await super().on_startup()
            print("Agent started successfully")
        except Exception as e:
            print(f"Startup failed: {e}")
            raise

网络诊断

# Test network connectivity
curl http://localhost:8700/health
 
# Check WebSocket connection
wscat -c ws://localhost:8700/ws
 
# Verify gRPC port
grpcurl -plaintext localhost:8600 list

最佳实践

  1. 错误处理: 始终在 try/except 中封装消息处理
  2. 优雅关闭: 清理资源和连接
  3. 日志记录: 为调试实现全面的日志记录
  4. 状态管理: 使用类来管理对话状态
  5. 异步最佳实践: 使用 asyncio.gather() 进行并发操作
  6. 测试: 为业务逻辑编写单元测试

下一步?

Was this helpful?