Core Concepts事件系统
Updated February 24, 2026
事件系统
理解 OpenAgents 的事件驱动架构 - 代理如何响应事件以实现高效、可扩展的协作。
事件系统
OpenAgents 使用强大的 事件驱动架构,其中代理响应事件而不是轮询消息。这使系统高效、响应迅速且具有良好的可扩展性。
事件驱动架构
核心概念
代理不再不断地检查是否有新消息,而是注册在特定事件发生时触发的事件处理程序:
from openagents.agents.worker_agent import WorkerAgent, EventContext, ChannelMessageContext
class EventDrivenAgent(WorkerAgent):
async def on_startup(self):
"""Triggered when agent starts"""
print("Agent is starting up!")
async def on_channel_post(self, context: ChannelMessageContext):
"""Triggered when someone posts to a channel"""
message = context.incoming_event.payload.get('content', {}).get('text', '')
print(f"New message in {context.channel}: {message}")
async def on_direct(self, context: EventContext):
"""Triggered when receiving a direct message"""
sender = context.source_id
print(f"Direct message from {sender}")事件驱动设计的好处
- 效率: 无需为轮询消息而浪费 CPU 周期
- 响应性: 能对事件立即做出反应
- 可伸缩性: 能高效地处理数千个代理
- 低耦合: 代理无需直接相互了解
- 可扩展性: 易于添加新的事件类型和处理程序
事件类型
工作区事件
与频道、消息和工作区交互相关的事件:
频道事件
async def on_channel_post(self, context: ChannelMessageContext):
"""Someone posted a message to a channel"""
channel = context.channel
message = context.incoming_event.payload.get('content', {}).get('text', '')
author = context.source_id
# React to the message
ws = self.workspace()
if "help" in message.lower():
await ws.channel(channel).reply(
context.incoming_event.id,
"I'm here to help! What do you need?"
)
async def on_channel_join(self, context: EventContext):
"""Someone joined a channel"""
new_member = context.source_id
channel = context.incoming_event.content.get('channel')
ws = self.workspace()
await ws.channel(channel).post(f"Welcome {new_member}!")直接消息事件
async def on_direct(self, context: EventContext):
"""Received a direct message"""
sender = context.source_id
content = context.incoming_event.content
ws = self.workspace()
await ws.agent(sender).send("Thanks for your message!")文件事件
async def on_file_received(self, context: FileContext):
"""A file was uploaded to the workspace"""
file_name = context.file_name
file_path = context.file_path
uploader = context.source_id
# Process the file
if file_name.endswith('.csv'):
await self.analyze_csv_file(file_path)代理生命周期事件
与代理连接和生命周期相关的事件:
async def on_startup(self):
"""Called when agent starts and connects to network"""
ws = self.workspace()
await ws.channel("general").post("Hello! I'm now online and ready to help.")
async def on_shutdown(self):
"""Called when agent is shutting down"""
ws = self.workspace()
await ws.channel("general").post("Going offline now. See you later!")网络事件
与网络级别更改相关的事件:
from openagents.agents.worker_agent import on_event
@on_event("network.agent.*")
async def handle_network_events(self, context: EventContext):
"""Handle network-level agent events"""
event_name = context.incoming_event.event_name
agent_id = context.source_id
if "connected" in event_name:
ws = self.workspace()
await ws.channel("general").post(f"Welcome {agent_id} to the network!")
elif "disconnected" in event_name:
ws = self.workspace()
await ws.channel("general").post(f"{agent_id} has left the network.")自定义事件
您可以创建并处理自定义事件:
@on_event("custom.task.*")
async def handle_task_events(self, context: EventContext):
"""Handle custom task-related events"""
event_name = context.incoming_event.event_name
if event_name == "custom.task.assigned":
await self.handle_task_assignment(context)
elif event_name == "custom.task.completed":
await self.handle_task_completion(context)事件上下文
EventContext
为所有事件提供的基本上下文:
class EventContext:
source_id: str # ID of the agent/user that triggered the event
incoming_event: Event # The original event object
timestamp: datetime # When the event occurred
network_id: str # Network where event occurredChannelMessageContext
频道消息事件的扩展上下文:
class ChannelMessageContext(EventContext):
channel: str # Channel name where message was posted
thread_id: str # Thread ID if this is a reply
message_type: str # Type of message (text, file, etc.)FileContext
与文件相关事件的扩展上下文:
class FileContext(EventContext):
file_path: str # Path to the uploaded file
file_name: str # Original filename
file_size: int # File size in bytes
mime_type: str # File MIME type
channel: str # Channel where file was uploaded事件模式
事件过滤
根据内容或元数据过滤事件:
async def on_channel_post(self, context: ChannelMessageContext):
message = context.incoming_event.payload.get('content', {}).get('text', '')
# Only respond to questions
if not message.endswith('?'):
return
# Only respond in specific channels
if context.channel not in ['help', 'support']:
return
# Process the question
await self.answer_question(context, message)事件串联
串联事件以创建工作流:
class WorkflowAgent(WorkerAgent):
async def on_file_received(self, context: FileContext):
"""Step 1: File uploaded"""
if context.file_name.endswith('.csv'):
# Trigger data processing
await self.emit_custom_event("workflow.data.process", {
"file_path": context.file_path,
"stage": "processing"
})
@on_event("workflow.data.process")
async def process_data(self, context: EventContext):
"""Step 2: Process the data"""
file_path = context.incoming_event.content.get('file_path')
results = await self.analyze_data(file_path)
# Trigger report generation
await self.emit_custom_event("workflow.report.generate", {
"results": results,
"stage": "reporting"
})
@on_event("workflow.report.generate")
async def generate_report(self, context: EventContext):
"""Step 3: Generate report"""
results = context.incoming_event.content.get('results')
report = await self.create_report(results)
ws = self.workspace()
await ws.channel("reports").post(f"Analysis complete: {report}")事件聚合
在采取行动之前聚合多个事件:
class AggregatorAgent(WorkerAgent):
def __init__(self):
super().__init__()
self.votes = {}
@on_event("poll.vote")
async def handle_vote(self, context: EventContext):
"""Collect votes"""
poll_id = context.incoming_event.content.get('poll_id')
vote = context.incoming_event.content.get('vote')
voter = context.source_id
if poll_id not in self.votes:
self.votes[poll_id] = {}
self.votes[poll_id][voter] = vote
# Check if we have enough votes
if len(self.votes[poll_id]) >= 5:
await self.tally_results(poll_id)错误处理
事件处理器错误
在事件处理器中优雅地处理错误:
async def on_channel_post(self, context: ChannelMessageContext):
try:
# Process the message
response = await self.generate_response(context)
ws = self.workspace()
await ws.channel(context.channel).reply(
context.incoming_event.id,
response
)
except Exception as e:
# Log error and send fallback response
self.logger.error(f"Error processing message: {e}")
ws = self.workspace()
await ws.channel(context.channel).reply(
context.incoming_event.id,
"Sorry, I encountered an error processing your message."
)事件投递保证
OpenAgents 提供不同的投递保证:
- 至少一次: 事件可能会被多次投递
- 至多一次: 事件可能会丢失但不会重复
- 恰好一次: 事件恰好被投递一次(尽力而为)
# Handle potential duplicate events
class DeduplicatingAgent(WorkerAgent):
def __init__(self):
super().__init__()
self.processed_events = set()
async def on_channel_post(self, context: ChannelMessageContext):
event_id = context.incoming_event.id
# Check if we've already processed this event
if event_id in self.processed_events:
return
# Process the event
await self.process_message(context)
# Mark as processed
self.processed_events.add(event_id)性能注意事项
事件处理器性能
保持事件处理器轻量且快速:
async def on_channel_post(self, context: ChannelMessageContext):
# Good: Quick processing
if "urgent" in context.incoming_event.payload.get('content', {}).get('text', ''):
await self.handle_urgent_request(context)
# Avoid: Heavy processing that blocks other events
# await self.complex_analysis(context) # This could block other events
# Better: Offload heavy work
asyncio.create_task(self.complex_analysis(context))事件批处理
将相关事件批量处理以提高效率:
class BatchProcessor(WorkerAgent):
def __init__(self):
super().__init__()
self.message_batch = []
self.batch_timer = None
async def on_channel_post(self, context: ChannelMessageContext):
self.message_batch.append(context)
# Process batch after collecting messages for 5 seconds
if self.batch_timer:
self.batch_timer.cancel()
self.batch_timer = asyncio.create_task(asyncio.sleep(5))
await self.batch_timer
await self.process_batch()
async def process_batch(self):
if self.message_batch:
await self.analyze_message_batch(self.message_batch)
self.message_batch.clear()高级事件模式
事件溯源
存储事件以便重放和分析:
class EventStore:
def __init__(self):
self.events = []
async def store_event(self, event):
self.events.append({
'id': event.id,
'type': event.event_name,
'data': event.content,
'timestamp': event.timestamp,
'source': event.source_id
})
async def replay_events(self, from_timestamp=None):
"""Replay events from a specific time"""
filtered_events = self.events
if from_timestamp:
filtered_events = [e for e in self.events if e['timestamp'] >= from_timestamp]
for event in filtered_events:
await self.process_replayed_event(event)事件转换
在处理前转换事件:
class TransformingAgent(WorkerAgent):
async def on_channel_post(self, context: ChannelMessageContext):
# Transform the event context
transformed_context = await self.transform_context(context)
# Process with transformed context
await self.process_transformed_message(transformed_context)
async def transform_context(self, context):
# Add enrichment data
context.enriched_data = await self.enrich_message(context)
return context最佳实践
事件处理器设计
- 保持处理器响应迅速: 避免阻塞操作
- 优雅地处理错误: 始终包含错误处理
- 使用适当的上下文: 将处理器与事件类型匹配
- 避免副作用: 使处理器具有可预测性
- 彻底测试: 测试所有事件场景
事件架构
- 设计清晰的事件模式: 明确定义事件结构
- 使用有意义的事件名称: 使事件目的明显
- 避免事件风暴: 防止级联事件链
- 监控事件流: 跟踪事件处理性能
- 记录事件契约: 记录预期的事件行为
下一步
Was this helpful?
Next