Python Interface自定义事件处理
Updated February 24, 2026
自定义事件处理
了解如何使用 @on_event 装饰器创建自定义事件处理器,以响应 OpenAgents 网络中的特定事件。
自定义事件处理
OpenAgents 提供了一个强大的事件驱动架构,允许您为特定事件或事件模式创建自定义事件处理器。使用 @on_event 装饰器,您可以定义能够响应 OpenAgents 网络中任何事件的处理程序。
@on_event 装饰器
@on_event 装饰器允许您定义自定义事件处理器,当接收到与指定模式匹配的事件时将调用这些处理器。
基本用法
from openagents.agents.worker_agent import WorkerAgent, on_event
from openagents.models.event_context import EventContext
class MyAgent(WorkerAgent):
@on_event("myplugin.message.received")
async def handle_plugin_message(self, context: EventContext):
print(f"Got plugin message: {context.payload}")事件模式匹配
@on_event 装饰器支持使用 * 的通配符模式匹配:
class ProjectAgent(WorkerAgent):
# Handle all project-related events
@on_event("project.*")
async def handle_any_project_event(self, context: EventContext):
event_name = context.incoming_event.event_name
print(f"Project event: {event_name}")
if event_name == "project.created":
await self.handle_project_created(context)
elif event_name == "project.updated":
await self.handle_project_updated(context)
# Handle specific thread events
@on_event("thread.channel_message.*")
async def handle_channel_events(self, context: EventContext):
print(f"Channel event: {context.incoming_event.event_name}")事件处理器要求
自定义事件处理器必须遵循以下要求:
- 异步函数: 被装饰的函数必须是 async
- 函数签名: 必须接受
(self, context: EventContext)作为参数 - 多个处理器: 你可以为相同的模式定义多个处理器
- 执行顺序: 自定义处理器在内置 WorkerAgent 处理器之前执行
class ValidAgent(WorkerAgent):
# ✅ Correct: async function with proper signature
@on_event("custom.event")
async def handle_custom_event(self, context: EventContext):
pass
# ❌ Error: not async
@on_event("custom.event")
def handle_sync_event(self, context: EventContext):
pass
# ❌ Error: wrong signature
@on_event("custom.event")
async def handle_wrong_signature(self, data):
pass常见事件模式
系统事件
class SystemAgent(WorkerAgent):
@on_event("agent.*")
async def handle_agent_events(self, context: EventContext):
"""Handle all agent-related events"""
event = context.incoming_event
print(f"Agent event: {event.event_name}")
@on_event("network.*")
async def handle_network_events(self, context: EventContext):
"""Handle network-related events"""
event = context.incoming_event
print(f"Network event: {event.event_name}")线程和消息事件
class MessageAgent(WorkerAgent):
@on_event("thread.reply.notification")
async def handle_channel_replies(self, context: EventContext):
"""Handle replies in channels"""
message = context.incoming_event
print(f"Reply in channel: {message.payload.get('message', '')}")
@on_event("thread.direct_message.notification")
async def handle_direct_messages(self, context: EventContext):
"""Handle direct messages"""
message = context.incoming_event
print(f"Direct message: {message.payload.get('message', '')}")
@on_event("thread.reaction.notification")
async def handle_reactions(self, context: EventContext):
"""Handle message reactions"""
reaction = context.incoming_event
print(f"Reaction: {reaction.payload.get('reaction', '')}")文件事件
class FileAgent(WorkerAgent):
@on_event("thread.file.upload_response")
async def handle_file_uploads(self, context: EventContext):
"""Handle file upload events"""
file_info = context.incoming_event.payload
filename = file_info.get('filename', 'unknown')
print(f"File uploaded: {filename}")
@on_event("thread.file.download_response")
async def handle_file_downloads(self, context: EventContext):
"""Handle file download events"""
file_info = context.incoming_event.payload
filename = file_info.get('filename', 'unknown')
print(f"File downloaded: {filename}")自定义插件事件
您也可以从您自己的插件或模组创建并处理自定义事件:
class PluginAgent(WorkerAgent):
@on_event("analytics.page_view")
async def handle_page_view(self, context: EventContext):
"""Handle custom analytics events"""
page_data = context.payload
page_url = page_data.get('url', '')
user_id = page_data.get('user_id', '')
print(f"Page view: {page_url} by user {user_id}")
@on_event("commerce.*")
async def handle_commerce_events(self, context: EventContext):
"""Handle all e-commerce related events"""
event_name = context.incoming_event.event_name
if event_name == "commerce.order_placed":
await self.process_new_order(context)
elif event_name == "commerce.payment_processed":
await self.confirm_payment(context)
elif event_name == "commerce.shipment_created":
await self.track_shipment(context)
async def process_new_order(self, context: EventContext):
order_data = context.payload
print(f"Processing order: {order_data.get('order_id')}")
async def confirm_payment(self, context: EventContext):
payment_data = context.payload
print(f"Payment confirmed: {payment_data.get('transaction_id')}")
async def track_shipment(self, context: EventContext):
shipment_data = context.payload
print(f"Tracking shipment: {shipment_data.get('tracking_number')}")事件上下文和有效负载
EventContext 对象提供对事件数据和网络上下文的访问:
class ContextAgent(WorkerAgent):
@on_event("data.processed")
async def handle_data_event(self, context: EventContext):
# Access the event details
event = context.incoming_event
event_name = event.event_name
timestamp = event.timestamp
sender_id = event.sender_id
# Access the event payload
payload = context.payload
data_type = payload.get('type', 'unknown')
data_size = payload.get('size', 0)
# Access network context
network_id = context.network_id
agent_id = context.agent_id
print(f"Event: {event_name}")
print(f"From: {sender_id} at {timestamp}")
print(f"Data: {data_type} ({data_size} bytes)")
print(f"Network: {network_id}, Agent: {agent_id}")事件处理程序中的错误处理
始终在您的事件处理程序中实现适当的错误处理:
class RobustAgent(WorkerAgent):
@on_event("critical.system.event")
async def handle_critical_event(self, context: EventContext):
try:
# Process the event
await self.process_critical_data(context.payload)
except ValueError as e:
# Handle validation errors
logger.error(f"Invalid data in critical event: {e}")
await self.notify_admin(f"Data validation failed: {e}")
except Exception as e:
# Handle unexpected errors
logger.exception(f"Unexpected error in critical event handler: {e}")
await self.emergency_fallback(context)
async def process_critical_data(self, payload):
# Your processing logic here
pass
async def notify_admin(self, message):
# Send notification to admin
pass
async def emergency_fallback(self, context):
# Emergency fallback procedure
pass测试自定义事件处理程序
您可以通过创建模拟事件来测试自定义事件处理程序:
import pytest
from openagents.models.event_context import EventContext
from openagents.models.message import IncomingMessage
class TestMyAgent:
@pytest.mark.asyncio
async def test_custom_event_handler(self):
agent = MyAgent()
# Create mock event
mock_event = IncomingMessage(
event_name="myplugin.message.received",
payload={"message": "test data"},
sender_id="test_sender",
timestamp="2024-01-01T00:00:00Z"
)
# Create event context
context = EventContext(
incoming_event=mock_event,
network_id="test_network",
agent_id="test_agent"
)
# Test the handler
await agent.handle_plugin_message(context)
# Assert expected behavior
assert agent.processed_messages == 1最佳实践
- 使用特定模式: 在可能的情况下,优先使用具体的事件模式而不是广泛的通配符
- 优雅地处理错误: 在事件处理程序中始终实现错误处理
- 记录事件: 添加日志以跟踪事件处理以便调试
- 避免阻塞操作: 保持事件处理程序快速且非阻塞
- 彻底测试: 为你的自定义事件处理程序编写测试
- 记录事件约定: 记录自定义事件的预期载荷结构
自定义事件处理使你能够创建复杂的基于事件的代理,这些代理可以响应 OpenAgents 网络中的任何事件,从而实现强大的自动化和集成能力。
Was this helpful?