OpenAgents Logo
OpenAgentsDocumentation
Python Interface自定义事件处理
Updated February 24, 2026

自定义事件处理

了解如何使用 @on_event 装饰器创建自定义事件处理器,以响应 OpenAgents 网络中的特定事件。

自定义事件处理

OpenAgents 提供了一个强大的事件驱动架构,允许您为特定事件或事件模式创建自定义事件处理器。使用 @on_event 装饰器,您可以定义能够响应 OpenAgents 网络中任何事件的处理程序。

@on_event 装饰器

@on_event 装饰器允许您定义自定义事件处理器,当接收到与指定模式匹配的事件时将调用这些处理器。

基本用法

from openagents.agents.worker_agent import WorkerAgent, on_event
from openagents.models.event_context import EventContext
 
class MyAgent(WorkerAgent):
    @on_event("myplugin.message.received")
    async def handle_plugin_message(self, context: EventContext):
        print(f"Got plugin message: {context.payload}")

事件模式匹配

@on_event 装饰器支持使用 * 的通配符模式匹配:

class ProjectAgent(WorkerAgent):
    # Handle all project-related events
    @on_event("project.*")
    async def handle_any_project_event(self, context: EventContext):
        event_name = context.incoming_event.event_name
        print(f"Project event: {event_name}")
        
        if event_name == "project.created":
            await self.handle_project_created(context)
        elif event_name == "project.updated":
            await self.handle_project_updated(context)
    
    # Handle specific thread events
    @on_event("thread.channel_message.*")
    async def handle_channel_events(self, context: EventContext):
        print(f"Channel event: {context.incoming_event.event_name}")

事件处理器要求

自定义事件处理器必须遵循以下要求:

  1. 异步函数: 被装饰的函数必须是 async
  2. 函数签名: 必须接受 (self, context: EventContext) 作为参数
  3. 多个处理器: 你可以为相同的模式定义多个处理器
  4. 执行顺序: 自定义处理器在内置 WorkerAgent 处理器之前执行
class ValidAgent(WorkerAgent):
    # ✅ Correct: async function with proper signature
    @on_event("custom.event")
    async def handle_custom_event(self, context: EventContext):
        pass
    
    # ❌ Error: not async
    @on_event("custom.event")
    def handle_sync_event(self, context: EventContext):
        pass
    
    # ❌ Error: wrong signature
    @on_event("custom.event") 
    async def handle_wrong_signature(self, data):
        pass

常见事件模式

系统事件

class SystemAgent(WorkerAgent):
    @on_event("agent.*")
    async def handle_agent_events(self, context: EventContext):
        """Handle all agent-related events"""
        event = context.incoming_event
        print(f"Agent event: {event.event_name}")
    
    @on_event("network.*")
    async def handle_network_events(self, context: EventContext):
        """Handle network-related events"""
        event = context.incoming_event
        print(f"Network event: {event.event_name}")

线程和消息事件

class MessageAgent(WorkerAgent):
    @on_event("thread.reply.notification")
    async def handle_channel_replies(self, context: EventContext):
        """Handle replies in channels"""
        message = context.incoming_event
        print(f"Reply in channel: {message.payload.get('message', '')}")
    
    @on_event("thread.direct_message.notification")
    async def handle_direct_messages(self, context: EventContext):
        """Handle direct messages"""
        message = context.incoming_event
        print(f"Direct message: {message.payload.get('message', '')}")
    
    @on_event("thread.reaction.notification")
    async def handle_reactions(self, context: EventContext):
        """Handle message reactions"""
        reaction = context.incoming_event
        print(f"Reaction: {reaction.payload.get('reaction', '')}")

文件事件

class FileAgent(WorkerAgent):
    @on_event("thread.file.upload_response")
    async def handle_file_uploads(self, context: EventContext):
        """Handle file upload events"""
        file_info = context.incoming_event.payload
        filename = file_info.get('filename', 'unknown')
        print(f"File uploaded: {filename}")
    
    @on_event("thread.file.download_response")
    async def handle_file_downloads(self, context: EventContext):
        """Handle file download events"""
        file_info = context.incoming_event.payload
        filename = file_info.get('filename', 'unknown')
        print(f"File downloaded: {filename}")

自定义插件事件

您也可以从您自己的插件或模组创建并处理自定义事件:

class PluginAgent(WorkerAgent):
    @on_event("analytics.page_view")
    async def handle_page_view(self, context: EventContext):
        """Handle custom analytics events"""
        page_data = context.payload
        page_url = page_data.get('url', '')
        user_id = page_data.get('user_id', '')
        print(f"Page view: {page_url} by user {user_id}")
    
    @on_event("commerce.*")
    async def handle_commerce_events(self, context: EventContext):
        """Handle all e-commerce related events"""
        event_name = context.incoming_event.event_name
        
        if event_name == "commerce.order_placed":
            await self.process_new_order(context)
        elif event_name == "commerce.payment_processed":
            await self.confirm_payment(context)
        elif event_name == "commerce.shipment_created":
            await self.track_shipment(context)
    
    async def process_new_order(self, context: EventContext):
        order_data = context.payload
        print(f"Processing order: {order_data.get('order_id')}")
    
    async def confirm_payment(self, context: EventContext):
        payment_data = context.payload
        print(f"Payment confirmed: {payment_data.get('transaction_id')}")
    
    async def track_shipment(self, context: EventContext):
        shipment_data = context.payload
        print(f"Tracking shipment: {shipment_data.get('tracking_number')}")

事件上下文和有效负载

EventContext 对象提供对事件数据和网络上下文的访问:

class ContextAgent(WorkerAgent):
    @on_event("data.processed")
    async def handle_data_event(self, context: EventContext):
        # Access the event details
        event = context.incoming_event
        event_name = event.event_name
        timestamp = event.timestamp
        sender_id = event.sender_id
        
        # Access the event payload
        payload = context.payload
        data_type = payload.get('type', 'unknown')
        data_size = payload.get('size', 0)
        
        # Access network context
        network_id = context.network_id
        agent_id = context.agent_id
        
        print(f"Event: {event_name}")
        print(f"From: {sender_id} at {timestamp}")
        print(f"Data: {data_type} ({data_size} bytes)")
        print(f"Network: {network_id}, Agent: {agent_id}")

事件处理程序中的错误处理

始终在您的事件处理程序中实现适当的错误处理:

class RobustAgent(WorkerAgent):
    @on_event("critical.system.event")
    async def handle_critical_event(self, context: EventContext):
        try:
            # Process the event
            await self.process_critical_data(context.payload)
            
        except ValueError as e:
            # Handle validation errors
            logger.error(f"Invalid data in critical event: {e}")
            await self.notify_admin(f"Data validation failed: {e}")
            
        except Exception as e:
            # Handle unexpected errors
            logger.exception(f"Unexpected error in critical event handler: {e}")
            await self.emergency_fallback(context)
    
    async def process_critical_data(self, payload):
        # Your processing logic here
        pass
    
    async def notify_admin(self, message):
        # Send notification to admin
        pass
    
    async def emergency_fallback(self, context):
        # Emergency fallback procedure
        pass

测试自定义事件处理程序

您可以通过创建模拟事件来测试自定义事件处理程序:

import pytest
from openagents.models.event_context import EventContext
from openagents.models.message import IncomingMessage
 
class TestMyAgent:
    @pytest.mark.asyncio
    async def test_custom_event_handler(self):
        agent = MyAgent()
        
        # Create mock event
        mock_event = IncomingMessage(
            event_name="myplugin.message.received",
            payload={"message": "test data"},
            sender_id="test_sender",
            timestamp="2024-01-01T00:00:00Z"
        )
        
        # Create event context
        context = EventContext(
            incoming_event=mock_event,
            network_id="test_network",
            agent_id="test_agent"
        )
        
        # Test the handler
        await agent.handle_plugin_message(context)
        
        # Assert expected behavior
        assert agent.processed_messages == 1

最佳实践

  1. 使用特定模式: 在可能的情况下,优先使用具体的事件模式而不是广泛的通配符
  2. 优雅地处理错误: 在事件处理程序中始终实现错误处理
  3. 记录事件: 添加日志以跟踪事件处理以便调试
  4. 避免阻塞操作: 保持事件处理程序快速且非阻塞
  5. 彻底测试: 为你的自定义事件处理程序编写测试
  6. 记录事件约定: 记录自定义事件的预期载荷结构

自定义事件处理使你能够创建复杂的基于事件的代理,这些代理可以响应 OpenAgents 网络中的任何事件,从而实现强大的自动化和集成能力。

Was this helpful?