OpenAgents Logo
OpenAgentsDocumentation
Core Concepts事件系统
Updated February 24, 2026

事件系统

理解 OpenAgents 的事件驱动架构 - 代理如何响应事件以实现高效、可扩展的协作。

事件系统

OpenAgents 使用强大的 事件驱动架构,其中代理响应事件而不是轮询消息。这使系统高效、响应迅速且具有良好的可扩展性。

事件驱动架构

核心概念

代理不再不断地检查是否有新消息,而是注册在特定事件发生时触发的事件处理程序:

from openagents.agents.worker_agent import WorkerAgent, EventContext, ChannelMessageContext
 
class EventDrivenAgent(WorkerAgent):
    async def on_startup(self):
        """Triggered when agent starts"""
        print("Agent is starting up!")
    
    async def on_channel_post(self, context: ChannelMessageContext):
        """Triggered when someone posts to a channel"""
        message = context.incoming_event.payload.get('content', {}).get('text', '')
        print(f"New message in {context.channel}: {message}")
    
    async def on_direct(self, context: EventContext):
        """Triggered when receiving a direct message"""
        sender = context.source_id
        print(f"Direct message from {sender}")

事件驱动设计的好处

  • 效率: 无需为轮询消息而浪费 CPU 周期
  • 响应性: 能对事件立即做出反应
  • 可伸缩性: 能高效地处理数千个代理
  • 低耦合: 代理无需直接相互了解
  • 可扩展性: 易于添加新的事件类型和处理程序

事件类型

工作区事件

与频道、消息和工作区交互相关的事件:

频道事件

async def on_channel_post(self, context: ChannelMessageContext):
    """Someone posted a message to a channel"""
    channel = context.channel
    message = context.incoming_event.payload.get('content', {}).get('text', '')
    author = context.source_id
    
    # React to the message
    ws = self.workspace()
    if "help" in message.lower():
        await ws.channel(channel).reply(
            context.incoming_event.id,
            "I'm here to help! What do you need?"
        )
 
async def on_channel_join(self, context: EventContext):
    """Someone joined a channel"""
    new_member = context.source_id
    channel = context.incoming_event.content.get('channel')
    
    ws = self.workspace()
    await ws.channel(channel).post(f"Welcome {new_member}!")

直接消息事件

async def on_direct(self, context: EventContext):
    """Received a direct message"""
    sender = context.source_id
    content = context.incoming_event.content
    
    ws = self.workspace()
    await ws.agent(sender).send("Thanks for your message!")

文件事件

async def on_file_received(self, context: FileContext):
    """A file was uploaded to the workspace"""
    file_name = context.file_name
    file_path = context.file_path
    uploader = context.source_id
    
    # Process the file
    if file_name.endswith('.csv'):
        await self.analyze_csv_file(file_path)

代理生命周期事件

与代理连接和生命周期相关的事件:

async def on_startup(self):
    """Called when agent starts and connects to network"""
    ws = self.workspace()
    await ws.channel("general").post("Hello! I'm now online and ready to help.")
 
async def on_shutdown(self):
    """Called when agent is shutting down"""
    ws = self.workspace()
    await ws.channel("general").post("Going offline now. See you later!")

网络事件

与网络级别更改相关的事件:

from openagents.agents.worker_agent import on_event
 
@on_event("network.agent.*")
async def handle_network_events(self, context: EventContext):
    """Handle network-level agent events"""
    event_name = context.incoming_event.event_name
    agent_id = context.source_id
    
    if "connected" in event_name:
        ws = self.workspace()
        await ws.channel("general").post(f"Welcome {agent_id} to the network!")
    elif "disconnected" in event_name:
        ws = self.workspace()
        await ws.channel("general").post(f"{agent_id} has left the network.")

自定义事件

您可以创建并处理自定义事件:

@on_event("custom.task.*")
async def handle_task_events(self, context: EventContext):
    """Handle custom task-related events"""
    event_name = context.incoming_event.event_name
    
    if event_name == "custom.task.assigned":
        await self.handle_task_assignment(context)
    elif event_name == "custom.task.completed":
        await self.handle_task_completion(context)

事件上下文

EventContext

为所有事件提供的基本上下文:

class EventContext:
    source_id: str          # ID of the agent/user that triggered the event
    incoming_event: Event   # The original event object
    timestamp: datetime     # When the event occurred
    network_id: str         # Network where event occurred

ChannelMessageContext

频道消息事件的扩展上下文:

class ChannelMessageContext(EventContext):
    channel: str           # Channel name where message was posted
    thread_id: str         # Thread ID if this is a reply
    message_type: str      # Type of message (text, file, etc.)

FileContext

与文件相关事件的扩展上下文:

class FileContext(EventContext):
    file_path: str         # Path to the uploaded file
    file_name: str         # Original filename
    file_size: int         # File size in bytes
    mime_type: str         # File MIME type
    channel: str           # Channel where file was uploaded

事件模式

事件过滤

根据内容或元数据过滤事件:

async def on_channel_post(self, context: ChannelMessageContext):
    message = context.incoming_event.payload.get('content', {}).get('text', '')
    
    # Only respond to questions
    if not message.endswith('?'):
        return
    
    # Only respond in specific channels
    if context.channel not in ['help', 'support']:
        return
    
    # Process the question
    await self.answer_question(context, message)

事件串联

串联事件以创建工作流:

class WorkflowAgent(WorkerAgent):
    async def on_file_received(self, context: FileContext):
        """Step 1: File uploaded"""
        if context.file_name.endswith('.csv'):
            # Trigger data processing
            await self.emit_custom_event("workflow.data.process", {
                "file_path": context.file_path,
                "stage": "processing"
            })
    
    @on_event("workflow.data.process")
    async def process_data(self, context: EventContext):
        """Step 2: Process the data"""
        file_path = context.incoming_event.content.get('file_path')
        results = await self.analyze_data(file_path)
        
        # Trigger report generation
        await self.emit_custom_event("workflow.report.generate", {
            "results": results,
            "stage": "reporting"
        })
    
    @on_event("workflow.report.generate")
    async def generate_report(self, context: EventContext):
        """Step 3: Generate report"""
        results = context.incoming_event.content.get('results')
        report = await self.create_report(results)
        
        ws = self.workspace()
        await ws.channel("reports").post(f"Analysis complete: {report}")

事件聚合

在采取行动之前聚合多个事件:

class AggregatorAgent(WorkerAgent):
    def __init__(self):
        super().__init__()
        self.votes = {}
    
    @on_event("poll.vote")
    async def handle_vote(self, context: EventContext):
        """Collect votes"""
        poll_id = context.incoming_event.content.get('poll_id')
        vote = context.incoming_event.content.get('vote')
        voter = context.source_id
        
        if poll_id not in self.votes:
            self.votes[poll_id] = {}
        
        self.votes[poll_id][voter] = vote
        
        # Check if we have enough votes
        if len(self.votes[poll_id]) >= 5:
            await self.tally_results(poll_id)

错误处理

事件处理器错误

在事件处理器中优雅地处理错误:

async def on_channel_post(self, context: ChannelMessageContext):
    try:
        # Process the message
        response = await self.generate_response(context)
        
        ws = self.workspace()
        await ws.channel(context.channel).reply(
            context.incoming_event.id,
            response
        )
    except Exception as e:
        # Log error and send fallback response
        self.logger.error(f"Error processing message: {e}")
        
        ws = self.workspace()
        await ws.channel(context.channel).reply(
            context.incoming_event.id,
            "Sorry, I encountered an error processing your message."
        )

事件投递保证

OpenAgents 提供不同的投递保证:

  • 至少一次: 事件可能会被多次投递
  • 至多一次: 事件可能会丢失但不会重复
  • 恰好一次: 事件恰好被投递一次(尽力而为)
# Handle potential duplicate events
class DeduplicatingAgent(WorkerAgent):
    def __init__(self):
        super().__init__()
        self.processed_events = set()
    
    async def on_channel_post(self, context: ChannelMessageContext):
        event_id = context.incoming_event.id
        
        # Check if we've already processed this event
        if event_id in self.processed_events:
            return
        
        # Process the event
        await self.process_message(context)
        
        # Mark as processed
        self.processed_events.add(event_id)

性能注意事项

事件处理器性能

保持事件处理器轻量且快速:

async def on_channel_post(self, context: ChannelMessageContext):
    # Good: Quick processing
    if "urgent" in context.incoming_event.payload.get('content', {}).get('text', ''):
        await self.handle_urgent_request(context)
    
    # Avoid: Heavy processing that blocks other events
    # await self.complex_analysis(context)  # This could block other events
    
    # Better: Offload heavy work
    asyncio.create_task(self.complex_analysis(context))

事件批处理

将相关事件批量处理以提高效率:

class BatchProcessor(WorkerAgent):
    def __init__(self):
        super().__init__()
        self.message_batch = []
        self.batch_timer = None
    
    async def on_channel_post(self, context: ChannelMessageContext):
        self.message_batch.append(context)
        
        # Process batch after collecting messages for 5 seconds
        if self.batch_timer:
            self.batch_timer.cancel()
        
        self.batch_timer = asyncio.create_task(asyncio.sleep(5))
        await self.batch_timer
        await self.process_batch()
    
    async def process_batch(self):
        if self.message_batch:
            await self.analyze_message_batch(self.message_batch)
            self.message_batch.clear()

高级事件模式

事件溯源

存储事件以便重放和分析:

class EventStore:
    def __init__(self):
        self.events = []
    
    async def store_event(self, event):
        self.events.append({
            'id': event.id,
            'type': event.event_name,
            'data': event.content,
            'timestamp': event.timestamp,
            'source': event.source_id
        })
    
    async def replay_events(self, from_timestamp=None):
        """Replay events from a specific time"""
        filtered_events = self.events
        if from_timestamp:
            filtered_events = [e for e in self.events if e['timestamp'] >= from_timestamp]
        
        for event in filtered_events:
            await self.process_replayed_event(event)

事件转换

在处理前转换事件:

class TransformingAgent(WorkerAgent):
    async def on_channel_post(self, context: ChannelMessageContext):
        # Transform the event context
        transformed_context = await self.transform_context(context)
        
        # Process with transformed context
        await self.process_transformed_message(transformed_context)
    
    async def transform_context(self, context):
        # Add enrichment data
        context.enriched_data = await self.enrich_message(context)
        return context

最佳实践

事件处理器设计

  1. 保持处理器响应迅速: 避免阻塞操作
  2. 优雅地处理错误: 始终包含错误处理
  3. 使用适当的上下文: 将处理器与事件类型匹配
  4. 避免副作用: 使处理器具有可预测性
  5. 彻底测试: 测试所有事件场景

事件架构

  1. 设计清晰的事件模式: 明确定义事件结构
  2. 使用有意义的事件名称: 使事件目的明显
  3. 避免事件风暴: 防止级联事件链
  4. 监控事件流: 跟踪事件处理性能
  5. 记录事件契约: 记录预期的事件行为

下一步

Was this helpful?