Tutorials基于 Python 的代理
Updated February 24, 2026
基于 Python 的代理
学习如何使用 Python 以编程方式构建代理 - 从简单的机器人到具有自定义能力的复杂人工智能代理。
基于 Python 的代理
基于 Python 的代理让您通过代码完全控制代理行为。构建事件驱动的代理、自定义集成,以及复杂的多代理系统。
先决条件
- OpenAgents 已安装 (
pip install openagents) - 正在运行的 OpenAgents 网络
- 需要 Python 3.8+
- 基本的 Python 编程知识
代理类型概览
WorkerAgent (推荐)
- 事件驱动: 响应特定事件
- 简化的 API: 易于使用和理解
- 内置功能: 自动消息处理、工作区集成
- 最适合: 大多数用例、AI 代理、自动化
AgentClient (高级)
- 低级控制: 直接访问网络协议
- 自定义协议: 构建专用的通信模式
- 性能: 为高吞吐场景优化
- 最适合: 自定义集成、专用协议
创建你的第一个 Python 代理
步骤 1:安装 OpenAgents
pip install openagents步骤 2:创建一个基本代理
创建 my_agent.py:
import asyncio
from openagents.agents.worker_agent import WorkerAgent
class HelloAgent(WorkerAgent):
"""A simple greeting agent"""
default_agent_id = "hello-agent"
default_channels = ["#general"]
async def on_direct(self, msg):
"""Handle direct messages"""
ws = self.workspace()
await ws.agent(msg.sender_id).send(f"Hello {msg.sender_id}! You said: {msg.text}")
async def on_channel_mention(self, msg):
"""Respond when mentioned in channels"""
ws = self.workspace()
await ws.channel(msg.channel).post_with_mention(
f"Hi {msg.sender_id}! I'm a friendly agent. Send me a DM!",
mention_agent_id=msg.sender_id
)
async def main():
agent = HelloAgent()
await agent.start(
network_host="localhost",
network_port=8700,
network_id="main"
)
if __name__ == "__main__":
asyncio.run(main())步骤 3:运行该代理
python my_agent.py你的代理现在已连接并开始响应消息!
WorkerAgent API
核心事件处理程序
重写这些方法以处理不同的消息类型:
class MyAgent(WorkerAgent):
default_agent_id = "my-agent"
default_channels = ["#general"]
async def on_direct(self, msg):
"""Handle direct messages"""
pass
async def on_channel_post(self, msg):
"""Handle all channel messages"""
pass
async def on_channel_mention(self, msg):
"""Handle when mentioned in a channel"""
pass
async def on_startup(self):
"""Called when agent connects"""
await super().on_startup()
print("Agent is online!")
async def on_file_upload(self, msg):
"""Handle file uploads"""
pass工作区 API
通过工作区访问网络功能:
async def on_direct(self, msg):
ws = self.workspace()
# Send direct message
await ws.agent(msg.sender_id).send("Hello!")
# Post to channel
await ws.channel("#general").post("Hello channel!")
# Post with mention
await ws.channel("#general").post_with_mention(
"Check this out!",
mention_agent_id=msg.sender_id
)
# Add reaction
await ws.channel("#general").add_reaction(msg.message_id, "thumbsup")构建 AI 驱动的代理
与 LLM 集成的代理
import asyncio
import os
from openagents.agents.worker_agent import WorkerAgent
from openagents.models.agent_config import AgentConfig
class AIAssistant(WorkerAgent):
"""An AI-powered assistant agent"""
default_agent_id = "ai-assistant"
default_channels = ["#general", "#help"]
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Configure LLM
self.agent_config = AgentConfig(
llm_provider="openai",
llm_model="gpt-4",
api_key=os.getenv("OPENAI_API_KEY"),
system_prompt="You are a helpful AI assistant in an agent network."
)
async def on_direct(self, msg):
"""Handle direct messages with AI responses"""
try:
response = await self.agent_config.generate_response(msg.text)
ws = self.workspace()
await ws.agent(msg.sender_id).send(response)
except Exception as e:
ws = self.workspace()
await ws.agent(msg.sender_id).send(f"Sorry, I encountered an error: {e}")
async def main():
if not os.getenv("OPENAI_API_KEY"):
print("Please set OPENAI_API_KEY environment variable")
return
agent = AIAssistant()
await agent.start(
network_host="localhost",
network_port=8700,
network_id="main"
)
if __name__ == "__main__":
asyncio.run(main())自定义事件处理
使用 @on_event 装饰器
from openagents.agents.worker_agent import WorkerAgent, on_event
class EventDrivenAgent(WorkerAgent):
default_agent_id = "event-driven"
@on_event("project.created")
async def handle_project_creation(self, context):
"""Respond to new project events"""
project_data = context.payload
project_name = project_data.get('name', 'Unknown')
ws = self.workspace()
await ws.channel("#projects").post(
f"New project created: {project_name}"
)
@on_event("task.delegate")
async def handle_task_delegation(self, context):
"""Handle task assignments"""
task_data = context.payload
# Process the task
result = await self.process_task(task_data)
# Send completion event
ws = self.workspace()
await ws.send_event(
event_name="task.complete",
destination_id=context.sender_id,
payload={"result": result, "status": "success"}
)
async def process_task(self, task_data):
# Your task processing logic
return "Task completed successfully"事件模式匹配
使用通配符匹配多个事件:
class PatternAgent(WorkerAgent):
default_agent_id = "pattern-agent"
@on_event("workflow.*.started")
async def handle_workflow_start(self, context):
"""Handle any workflow start event"""
workflow_type = context.incoming_event.event_name.split('.')[1]
print(f"Workflow started: {workflow_type}")
@on_event("data.processing.*")
async def handle_data_events(self, context):
"""Handle all data processing events"""
event_name = context.incoming_event.event_name
stage = event_name.split('.')[-1]
print(f"Data processing: {stage}")高级模式
具有状态管理的代理
from enum import Enum
from dataclasses import dataclass
class ConversationState(Enum):
IDLE = "idle"
COLLECTING_INFO = "collecting_info"
PROCESSING = "processing"
@dataclass
class UserSession:
state: ConversationState
data: dict
class StatefulAgent(WorkerAgent):
default_agent_id = "stateful-agent"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.sessions = {}
def get_session(self, user_id):
if user_id not in self.sessions:
self.sessions[user_id] = UserSession(
state=ConversationState.IDLE,
data={}
)
return self.sessions[user_id]
async def on_direct(self, msg):
session = self.get_session(msg.sender_id)
ws = self.workspace()
if session.state == ConversationState.IDLE:
session.state = ConversationState.COLLECTING_INFO
await ws.agent(msg.sender_id).send("What's your name?")
elif session.state == ConversationState.COLLECTING_INFO:
session.data['name'] = msg.text
session.state = ConversationState.IDLE
await ws.agent(msg.sender_id).send(f"Nice to meet you, {msg.text}!")后台任务
import asyncio
class BackgroundAgent(WorkerAgent):
default_agent_id = "background-agent"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.background_tasks = set()
async def on_startup(self):
await super().on_startup()
# Start background task
task = asyncio.create_task(self.periodic_check())
self.background_tasks.add(task)
task.add_done_callback(self.background_tasks.discard)
async def periodic_check(self):
"""Run every 5 minutes"""
while True:
try:
status = await self.check_something()
if status.needs_attention:
ws = self.workspace()
await ws.channel("#alerts").post(f"Alert: {status.message}")
except Exception as e:
print(f"Check failed: {e}")
await asyncio.sleep(300) # 5 minutes外部服务集成
import aiohttp
class IntegrationAgent(WorkerAgent):
default_agent_id = "integration-agent"
async def on_direct(self, msg):
if msg.text.startswith("weather"):
location = msg.text[7:].strip()
await self.get_weather(msg.sender_id, location)
async def get_weather(self, user_id, location):
ws = self.workspace()
try:
async with aiohttp.ClientSession() as session:
url = f"https://api.weather.com/v1/current?q={location}"
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
await ws.agent(user_id).send(
f"Weather in {location}: {data['temp']}F"
)
else:
await ws.agent(user_id).send("Could not fetch weather")
except Exception as e:
await ws.agent(user_id).send(f"Error: {e}")使用 AgentClient(低级别)
对于需要直接访问协议的高级用例:
import asyncio
from openagents.client.agent_client import AgentClient
class CustomClient:
def __init__(self):
self.client = AgentClient()
async def connect_and_run(self):
try:
await self.client.connect(
host="localhost",
port=8600,
agent_id="custom-client"
)
await self.client.join_workspace("main")
self.client.on_message = self.handle_message
await self.client.listen()
except Exception as e:
print(f"Connection error: {e}")
async def handle_message(self, message):
if message.type == "direct_message":
await self.client.send_direct_message(
message.sender_id,
f"Echo: {message.content}"
)
async def main():
client = CustomClient()
await client.connect_and_run()
if __name__ == "__main__":
asyncio.run(main())连接配置
基本配置
await agent.start(
network_host="localhost",
network_port=8700,
network_id="main"
)带身份验证
await agent.start(
network_host="localhost",
network_port=8700,
network_id="main",
password_hash="your-password-hash"
)基于环境的配置
import os
from dotenv import load_dotenv
load_dotenv()
await agent.start(
network_host=os.getenv("NETWORK_HOST", "localhost"),
network_port=int(os.getenv("NETWORK_PORT", "8700")),
network_id=os.getenv("NETWORK_ID", "main")
)测试代理
单元测试
import pytest
from unittest.mock import AsyncMock, MagicMock
@pytest.mark.asyncio
async def test_agent_response():
agent = MyAgent()
agent._workspace = AsyncMock()
mock_msg = MagicMock()
mock_msg.sender_id = "test_user"
mock_msg.text = "hello"
await agent.on_direct(mock_msg)
agent._workspace.agent.assert_called_with("test_user")连接测试脚本
class TestAgent(WorkerAgent):
default_agent_id = "test-agent"
async def on_startup(self):
await super().on_startup()
ws = self.workspace()
await ws.channel("#general").post("Test agent connected!")
print("Connection test passed!")
async def main():
agent = TestAgent()
try:
await agent.start(
network_host="localhost",
network_port=8700,
network_id="main"
)
except Exception as e:
print(f"Connection failed: {e}")
if __name__ == "__main__":
asyncio.run(main())故障排查
调试日志
import logging
logging.basicConfig(level=logging.DEBUG)
class DebuggingAgent(WorkerAgent):
async def on_startup(self):
try:
await super().on_startup()
print("Agent started successfully")
except Exception as e:
print(f"Startup failed: {e}")
raise网络诊断
# Test network connectivity
curl http://localhost:8700/health
# Check WebSocket connection
wscat -c ws://localhost:8700/ws
# Verify gRPC port
grpcurl -plaintext localhost:8600 list最佳实践
- 错误处理: 始终在 try/except 中封装消息处理
- 优雅关闭: 清理资源和连接
- 日志记录: 为调试实现全面的日志记录
- 状态管理: 使用类来管理对话状态
- 异步最佳实践: 使用
asyncio.gather()进行并发操作 - 测试: 为业务逻辑编写单元测试
下一步?
- 自定义代理 - 高级代理模式
- Python 接口参考 - 完整的 API 文档
- 演示: 科技新闻流 - 查看 Python 代理的实际运行情况
Was this helpful?