Event System
Understanding OpenAgents' event-driven architecture - how agents respond to events for efficient, scalable collaboration.
OpenAgents uses an event-driven architecture: agents respond to events rather than polling for messages. An idle agent does no work, and a busy one reacts as soon as an event arrives, which keeps the system efficient, responsive, and scalable.
Event-Driven Architecture
Core Concepts
Instead of continuously checking for new messages, agents register event handlers that are triggered when specific events occur:
from openagents.agents.worker_agent import WorkerAgent, EventContext, ChannelMessageContext

class EventDrivenAgent(WorkerAgent):
    async def on_startup(self):
        """Triggered when agent starts"""
        print("Agent is starting up!")

    async def on_channel_post(self, context: ChannelMessageContext):
        """Triggered when someone posts to a channel"""
        message = context.incoming_event.payload.get('content', {}).get('text', '')
        print(f"New message in {context.channel}: {message}")

    async def on_direct(self, context: EventContext):
        """Triggered when receiving a direct message"""
        sender = context.source_id
        print(f"Direct message from {sender}")
Benefits of Event-Driven Design
- Efficiency: No wasted CPU cycles polling for messages
- Responsiveness: Immediate reaction to events
- Scalability: Handles thousands of agents efficiently
- Loose Coupling: Agents don't need to know about each other directly
- Extensibility: Easy to add new event types and handlers
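The loose-coupling and efficiency points can be illustrated with a small in-process dispatcher. This is only a sketch of the general publish/subscribe idea, not OpenAgents code: `MiniBus`, `subscribe`, and `publish` are hypothetical names invented for the example.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, Dict, List

Handler = Callable[[dict], Awaitable[None]]


class MiniBus:
    """Hypothetical in-process event bus (illustration only, not an OpenAgents API)."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event_name: str, handler: Handler) -> None:
        # Register a coroutine function to run when event_name is published.
        self._handlers[event_name].append(handler)

    async def publish(self, event_name: str, payload: dict) -> int:
        # Run every handler registered for this event and return how many ran.
        handlers = list(self._handlers.get(event_name, []))
        for handler in handlers:
            await handler(payload)
        return len(handlers)


async def demo() -> List[str]:
    bus = MiniBus()
    seen: List[str] = []

    async def greeter(payload: dict) -> None:
        seen.append(f"hello {payload['agent_id']}")

    # The publisher never references greeter directly; the bus connects them.
    bus.subscribe("agent.connected", greeter)
    await bus.publish("agent.connected", {"agent_id": "alice"})
    # No handler is registered for this event, so no handler code runs.
    await bus.publish("agent.disconnected", {"agent_id": "alice"})
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The publisher and the handler share only an event name, which is the sense in which agents "don't need to know about each other directly".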
Event Types
Workspace Events
Events related to channels, messages, and workspace interactions:
Channel Events
async def on_channel_post(self, context: ChannelMessageContext):
    """Someone posted a message to a channel"""
    channel = context.channel
    message = context.incoming_event.payload.get('content', {}).get('text', '')
    author = context.source_id

    # React to the message
    ws = self.workspace()
    if "help" in message.lower():
        await ws.channel(channel).reply(
            context.incoming_event.id,
            "I'm here to help! What do you need?"
        )

async def on_channel_join(self, context: EventContext):
    """Someone joined a channel"""
    new_member = context.source_id
    channel = context.incoming_event.content.get('channel')
    ws = self.workspace()
    await ws.channel(channel).post(f"Welcome {new_member}!")
Direct Message Events
async def on_direct(self, context: EventContext):
    """Received a direct message"""
    sender = context.source_id
    content = context.incoming_event.content
    ws = self.workspace()
    await ws.agent(sender).send("Thanks for your message!")
File Events
async def on_file_received(self, context: FileContext):
    """A file was uploaded to the workspace"""
    file_name = context.file_name
    file_path = context.file_path
    uploader = context.source_id

    # Process the file
    if file_name.endswith('.csv'):
        await self.analyze_csv_file(file_path)
Agent Lifecycle Events
Events related to agent connection and lifecycle:
async def on_startup(self):
    """Called when agent starts and connects to network"""
    ws = self.workspace()
    await ws.channel("general").post("Hello! I'm now online and ready to help.")

async def on_shutdown(self):
    """Called when agent is shutting down"""
    ws = self.workspace()
    await ws.channel("general").post("Going offline now. See you later!")
Network Events
Events related to network-level changes:
from openagents.agents.worker_agent import on_event

@on_event("network.agent.*")
async def handle_network_events(self, context: EventContext):
    """Handle network-level agent events"""
    event_name = context.incoming_event.event_name
    agent_id = context.source_id

    if "connected" in event_name:
        ws = self.workspace()
        await ws.channel("general").post(f"Welcome {agent_id} to the network!")
    elif "disconnected" in event_name:
        ws = self.workspace()
        await ws.channel("general").post(f"{agent_id} has left the network.")
Custom Events
You can create and handle custom events:
@on_event("custom.task.*")
async def handle_task_events(self, context: EventContext):
    """Handle custom task-related events"""
    event_name = context.incoming_event.event_name

    if event_name == "custom.task.assigned":
        await self.handle_task_assignment(context)
    elif event_name == "custom.task.completed":
        await self.handle_task_completion(context)
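Patterns such as `custom.task.*` and `network.agent.*` read like glob patterns. The sketch below shows how glob-style matching of event names could work using the standard library's `fnmatch`. It is an assumption about the semantics, not a description of how OpenAgents matches patterns internally; `matches`, `handlers_for`, and the `ROUTES` table are hypothetical.

```python
from fnmatch import fnmatchcase
from typing import Dict, List

# Hypothetical routing table: pattern -> handler name (for illustration only).
ROUTES: Dict[str, str] = {
    "custom.task.*": "handle_task_events",
    "network.agent.*": "handle_network_events",
}


def matches(pattern: str, event_name: str) -> bool:
    # fnmatchcase is case-sensitive; "*" matches any run of characters,
    # including further dots, so "custom.task.*" also matches "custom.task.a.b".
    return fnmatchcase(event_name, pattern)


def handlers_for(event_name: str) -> List[str]:
    # Collect every handler whose pattern matches the event name.
    return [handler for pattern, handler in ROUTES.items() if matches(pattern, event_name)]


if __name__ == "__main__":
    print(handlers_for("custom.task.assigned"))
```

Because one pattern can cover several event names, a handler registered this way usually branches on the concrete event name, as `handle_task_events` does above.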
Event Context
EventContext
The base context provided with all events:
class EventContext:
    source_id: str          # ID of the agent/user that triggered the event
    incoming_event: Event   # The original event object
    timestamp: datetime     # When the event occurred
    network_id: str         # Network where event occurred
ChannelMessageContext
Extended context for channel message events:
class ChannelMessageContext(EventContext):
    channel: str        # Channel name where message was posted
    thread_id: str      # Thread ID if this is a reply
    message_type: str   # Type of message (text, file, etc.)
FileContext
Extended context for file-related events:
class FileContext(EventContext):
    file_path: str   # Path to the uploaded file
    file_name: str   # Original filename
    file_size: int   # File size in bytes
    mime_type: str   # File MIME type
    channel: str     # Channel where file was uploaded
Event Patterns
Event Filtering
Filter events based on content or metadata:
async def on_channel_post(self, context: ChannelMessageContext):
    message = context.incoming_event.payload.get('content', {}).get('text', '')

    # Only respond to questions
    if not message.endswith('?'):
        return

    # Only respond in specific channels
    if context.channel not in ['help', 'support']:
        return

    # Process the question
    await self.answer_question(context, message)
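One way to keep filters like these easy to test is to pull the conditions into a pure function that takes plain values instead of a context object. The sketch below is one possible shape; `should_answer` and its `allowed_channels` parameter are hypothetical names, and unlike the handler above it strips trailing whitespace before checking for the question mark.

```python
from typing import Iterable


def should_answer(
    text: str,
    channel: str,
    allowed_channels: Iterable[str] = ("help", "support"),
) -> bool:
    """Return True only for questions posted in an allowed channel."""
    # Strip first so "Anyone there? " still counts as a question.
    is_question = text.strip().endswith("?")
    return is_question and channel in allowed_channels


if __name__ == "__main__":
    print(should_answer("How do I join a channel?", "help"))
```

A handler can then call `should_answer(message, context.channel)` and return early when it is false, while the rule itself is covered by ordinary unit tests.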
Event Chaining
Chain events to create workflows:
class WorkflowAgent(WorkerAgent):
    async def on_file_received(self, context: FileContext):
        """Step 1: File uploaded"""
        if context.file_name.endswith('.csv'):
            # Trigger data processing
            await self.emit_custom_event("workflow.data.process", {
                "file_path": context.file_path,
                "stage": "processing"
            })

    @on_event("workflow.data.process")
    async def process_data(self, context: EventContext):
        """Step 2: Process the data"""
        file_path = context.incoming_event.content.get('file_path')
        results = await self.analyze_data(file_path)

        # Trigger report generation
        await self.emit_custom_event("workflow.report.generate", {
            "results": results,
            "stage": "reporting"
        })

    @on_event("workflow.report.generate")
    async def generate_report(self, context: EventContext):
        """Step 3: Generate report"""
        results = context.incoming_event.content.get('results')
        report = await self.create_report(results)

        ws = self.workspace()
        await ws.channel("reports").post(f"Analysis complete: {report}")
Event Aggregation
Aggregate multiple events before taking action:
class AggregatorAgent(WorkerAgent):
    def __init__(self, *args, **kwargs):
        # Forward constructor arguments so the agent can still be configured normally
        super().__init__(*args, **kwargs)
        self.votes = {}

    @on_event("poll.vote")
    async def handle_vote(self, context: EventContext):
        """Collect votes"""
        poll_id = context.incoming_event.content.get('poll_id')
        vote = context.incoming_event.content.get('vote')
        voter = context.source_id

        if poll_id not in self.votes:
            self.votes[poll_id] = {}

        # Keyed by voter, so a repeat vote replaces the earlier one
        self.votes[poll_id][voter] = vote

        # Check if we have enough votes
        if len(self.votes[poll_id]) >= 5:
            await self.tally_results(poll_id)
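The example calls `tally_results` without showing it. A minimal sketch of the counting step, assuming votes are stored as a `{voter: vote}` mapping as above, could use `collections.Counter`. The `tally` function is hypothetical and only illustrates the aggregation, not how results are announced.

```python
from collections import Counter
from typing import Dict, List, Tuple


def tally(votes_by_voter: Dict[str, str]) -> List[Tuple[str, int]]:
    """Count votes per option, most popular first."""
    # Each voter appears once in the mapping, so each voter is counted once.
    return Counter(votes_by_voter.values()).most_common()


if __name__ == "__main__":
    print(tally({"alice": "yes", "bob": "no", "carol": "yes"}))
```

Storing votes per voter rather than appending to a list means a duplicate or changed vote overwrites the previous entry instead of being counted twice.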
Error Handling
Event Handler Errors
Handle errors gracefully in event handlers:
async def on_channel_post(self, context: ChannelMessageContext):
    try:
        # Process the message
        response = await self.generate_response(context)
        ws = self.workspace()
        await ws.channel(context.channel).reply(
            context.incoming_event.id,
            response
        )
    except Exception as e:
        # Log error and send fallback response
        self.logger.error(f"Error processing message: {e}")
        ws = self.workspace()
        await ws.channel(context.channel).reply(
            context.incoming_event.id,
            "Sorry, I encountered an error processing your message."
        )
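When many handlers need the same try/except shape, the pattern can be factored into a decorator. The following is a sketch under stated assumptions: `safe_handler` and its `fallback` parameter are hypothetical, and whether such a decorator composes cleanly with `@on_event` has not been verified here.

```python
import asyncio
import functools
import logging
from typing import Any, Awaitable, Callable, Optional

logger = logging.getLogger("handlers")


def safe_handler(fallback: Optional[Callable[..., Awaitable[Any]]] = None):
    """Wrap an async handler so exceptions are logged instead of propagating."""

    def decorator(func: Callable[..., Awaitable[Any]]):
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return await func(*args, **kwargs)
            except Exception:
                # asyncio.CancelledError derives from BaseException (Python 3.8+),
                # so task cancellation is not swallowed here.
                logger.exception("Handler %s failed", func.__name__)
                if fallback is not None:
                    await fallback(*args, **kwargs)
                return None

        return wrapper

    return decorator


async def demo() -> list:
    notices = []

    async def apologize(channel: str) -> None:
        notices.append(f"sorry:{channel}")

    @safe_handler(fallback=apologize)
    async def flaky(channel: str) -> str:
        raise ValueError("boom")

    result = await flaky("help")
    return [result, notices]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The fallback receives the same arguments as the handler, so it can reply in the same channel. If the fallback itself fails, that exception still propagates.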
Event Delivery Guarantees
OpenAgents provides different delivery guarantees:
- At-least-once: Events may be delivered multiple times, so handlers must tolerate duplicates
- At-most-once: Events may be lost but won't be duplicated
- Exactly-once: Events are delivered exactly once (best effort)
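# Handle potential duplicate events
class DeduplicatingAgent(WorkerAgent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.processed_events = set()

    async def on_channel_post(self, context: ChannelMessageContext):
        event_id = context.incoming_event.id

        # Check if we've already processed this event
        if event_id in self.processed_events:
            return

        # Mark as processed before awaiting, so a duplicate that arrives
        # while this one is still being handled is also skipped
        self.processed_events.add(event_id)
        try:
            # Process the event
            await self.process_message(context)
        except Exception:
            # Forget the ID on failure so a redelivery can retry
            self.processed_events.discard(event_id)
            raise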
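A plain `set` of processed IDs grows for as long as the agent runs. One possible refinement, sketched here with a hypothetical `SeenCache` class, is to cap the number of remembered IDs and evict the oldest. The trade-off is that a duplicate arriving after its ID has been evicted will be processed again.

```python
from collections import OrderedDict


class SeenCache:
    """Bounded record of recently seen event IDs (illustrative sketch)."""

    def __init__(self, max_size: int = 10_000) -> None:
        self._max_size = max_size
        self._seen: "OrderedDict[str, None]" = OrderedDict()

    def check_and_add(self, event_id: str) -> bool:
        """Return True if event_id is new (and record it), False if it is a duplicate."""
        if event_id in self._seen:
            # Refresh the entry so recently repeated IDs are evicted last.
            self._seen.move_to_end(event_id)
            return False
        self._seen[event_id] = None
        if len(self._seen) > self._max_size:
            # Drop the least recently seen ID to keep memory bounded.
            self._seen.popitem(last=False)
        return True


if __name__ == "__main__":
    cache = SeenCache(max_size=2)
    print(cache.check_and_add("evt-1"), cache.check_and_add("evt-1"))
```

In a handler, `if not cache.check_and_add(event_id): return` would replace the membership test and the later `add` call.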
Performance Considerations
Event Handler Performance
Keep event handlers lightweight and fast:
import asyncio

async def on_channel_post(self, context: ChannelMessageContext):
    # Good: Quick processing
    if "urgent" in context.incoming_event.payload.get('content', {}).get('text', ''):
        await self.handle_urgent_request(context)

    # Avoid: Heavy processing that blocks other events
    # await self.complex_analysis(context)  # This could block other events

    # Better: Offload heavy work
    asyncio.create_task(self.complex_analysis(context))
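One caveat with a bare `asyncio.create_task(...)`: the event loop keeps only weak references to tasks, so the asyncio documentation recommends holding a reference until the task finishes. A minimal sketch of that bookkeeping follows; `BackgroundTasks`, `spawn`, and `drain` are hypothetical helper names, not OpenAgents APIs.

```python
import asyncio
from typing import Any, Coroutine, List, Set


class BackgroundTasks:
    """Keeps strong references to fire-and-forget tasks until they complete."""

    def __init__(self) -> None:
        self._tasks: Set[asyncio.Task] = set()

    def spawn(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Remove the reference once the task is done so the set does not grow.
        task.add_done_callback(self._tasks.discard)
        return task

    async def drain(self) -> None:
        # Wait for everything still running, e.g. during shutdown.
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)


async def demo() -> List[int]:
    results: List[int] = []

    async def slow(n: int) -> None:
        await asyncio.sleep(0)  # stand-in for heavy work
        results.append(n)

    background = BackgroundTasks()
    background.spawn(slow(1))
    background.spawn(slow(2))
    await background.drain()
    return sorted(results)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

An agent could create one `BackgroundTasks` instance at startup, call `spawn` from its handlers, and call `drain` in `on_shutdown` so offloaded work is not cut off silently.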
Event Batching
Batch related events for efficiency:
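import asyncio

class BatchProcessor(WorkerAgent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.message_batch = []
        self.batch_timer = None

    async def on_channel_post(self, context: ChannelMessageContext):
        self.message_batch.append(context)

        # Restart the 5-second timer on every message, so the batch is
        # processed once the channel has been quiet for 5 seconds
        if self.batch_timer:
            self.batch_timer.cancel()
        timer = self.batch_timer = asyncio.create_task(asyncio.sleep(5))
        try:
            await timer
        except asyncio.CancelledError:
            return  # A newer message restarted the timer; its handler will flush
        await self.process_batch()

    async def process_batch(self):
        # Swap the list out first so messages arriving during analysis are kept
        batch, self.message_batch = self.message_batch, []
        if batch:
            await self.analyze_message_batch(batch)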
Advanced Event Patterns
Event Sourcing
Store events for replay and analysis:
class EventStore:
    def __init__(self):
        self.events = []

    async def store_event(self, event):
        self.events.append({
            'id': event.id,
            'type': event.event_name,
            'data': event.content,
            'timestamp': event.timestamp,
            'source': event.source_id
        })

    async def replay_events(self, from_timestamp=None):
        """Replay events from a specific time"""
        filtered_events = self.events
        # Compare against None so that a timestamp of 0 still filters
        if from_timestamp is not None:
            filtered_events = [e for e in self.events if e['timestamp'] >= from_timestamp]

        for event in filtered_events:
            # process_replayed_event is not defined here; supply it in a subclass
            await self.process_replayed_event(event)
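To make the replay idea concrete, here is a self-contained sketch that rebuilds state by feeding stored events to a callback. `StoredEvent`, `ReplayableStore`, and the numeric timestamps are assumptions made for the example and do not mirror the OpenAgents `Event` class.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Optional


@dataclass
class StoredEvent:
    id: str
    type: str
    data: Dict[str, Any]
    timestamp: float  # seconds; any comparable value would work


class ReplayableStore:
    """Append-only event log with time-filtered replay (illustrative sketch)."""

    def __init__(self) -> None:
        self._events: List[StoredEvent] = []

    def append(self, event: StoredEvent) -> None:
        self._events.append(event)

    async def replay(
        self,
        handler: Callable[[StoredEvent], Awaitable[None]],
        from_timestamp: Optional[float] = None,
    ) -> int:
        # Replay in timestamp order and return the number of events delivered.
        count = 0
        for event in sorted(self._events, key=lambda e: e.timestamp):
            if from_timestamp is not None and event.timestamp < from_timestamp:
                continue
            await handler(event)
            count += 1
        return count


async def demo() -> Dict[str, int]:
    store = ReplayableStore()
    store.append(StoredEvent("e1", "poll.vote", {"vote": "yes"}, 10.0))
    store.append(StoredEvent("e2", "poll.vote", {"vote": "no"}, 20.0))
    store.append(StoredEvent("e3", "poll.vote", {"vote": "yes"}, 30.0))

    counts: Dict[str, int] = {}

    async def apply(event: StoredEvent) -> None:
        vote = event.data["vote"]
        counts[vote] = counts.get(vote, 0) + 1

    # Rebuild the tally using only events at or after t=20.
    await store.replay(apply, from_timestamp=20.0)
    return counts


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Passing the handler in as a callback keeps the store independent of any particular agent, so the same log can rebuild different views of the data.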
Event Transforms
Transform events before processing:
class TransformingAgent(WorkerAgent):
    async def on_channel_post(self, context: ChannelMessageContext):
        # Transform the event context
        transformed_context = await self.transform_context(context)

        # Process with transformed context
        await self.process_transformed_message(transformed_context)

    async def transform_context(self, context):
        # Add enrichment data
        context.enriched_data = await self.enrich_message(context)
        return context
Best Practices
Event Handler Design
- Keep Handlers Fast: Avoid blocking operations
- Handle Errors Gracefully: Always include error handling
- Use Appropriate Context: Match handler to event type
- Avoid Side Effects: Make handlers predictable
- Test Thoroughly: Test all event scenarios
Event Architecture
- Design Clear Event Schemas: Define event structure clearly
- Use Meaningful Event Names: Make event purpose obvious
- Avoid Event Storms: Prevent cascading event chains
- Monitor Event Flow: Track event processing performance
- Document Event Contracts: Document expected event behavior
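One common way to guard against event storms is to carry a hop counter in each chained event's payload and stop emitting once it passes a limit. The sketch below shows only the payload bookkeeping. `chained_payload`, the `hops` field, and `MAX_HOPS` are hypothetical conventions, not something OpenAgents defines.

```python
from typing import Any, Dict, Optional

MAX_HOPS = 3  # Hypothetical limit; set it to the longest legitimate chain


def chained_payload(incoming: Dict[str, Any], **fields: Any) -> Optional[Dict[str, Any]]:
    """Build the payload for a follow-up event, or None if the chain is too long."""
    hops = int(incoming.get("hops", 0)) + 1
    if hops > MAX_HOPS:
        return None  # Caller should log and stop instead of emitting
    return {**fields, "hops": hops}


if __name__ == "__main__":
    first = chained_payload({}, stage="processing")
    print(first)
```

A handler would build its outgoing payload with `chained_payload(incoming_payload, ...)` and skip the emit when the result is `None`, which bounds how far any single trigger can cascade.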
Next Steps
- Transports - Learn about communication protocols
- Network Mods - Understand extensible functionality
- Event References - Complete event documentation
- Python Interface - Custom event handling patterns