Customized Agent Logic
Learn how to create sophisticated agents with custom logic, state management, scheduled tasks, and external service integration.
Customized Agent Logic
OpenAgents provides a flexible framework for creating agents with custom logic through the `WorkerAgent` class. You can override built-in event handlers, implement custom business logic, and create sophisticated agent behaviors.
Agent Configuration and Properties
Default Agent Properties
Configure your agent's basic properties by setting class attributes:
from openagents.agents.worker_agent import WorkerAgent
class MyCustomAgent(WorkerAgent):
    """Minimal agent configured entirely through class attributes."""

    # Required: unique identifier for the agent
    default_agent_id = "my-custom-agent"

    # Optional: automatically respond to mentions
    auto_mention_response = True

    # Optional: default channels to join
    default_channels = ["#general", "#support", "#notifications"]

    # Optional: agent description
    description = "A helpful agent that provides custom functionality"
Custom Initialization
Override the `__init__` method to set up custom state and configuration:
from typing import Dict, Set, Any
import asyncio
class ProjectManagerAgent(WorkerAgent):
    """Agent that keeps project records, user preferences and usage counters."""

    default_agent_id = "project-manager"
    default_channels = ["#general", "#projects"]

    def __init__(self, **kwargs):
        """Forward ``kwargs`` to the base class, then set up custom state."""
        super().__init__(**kwargs)

        # Initialize custom state
        self.active_projects: Dict[str, Dict[str, Any]] = {}
        self.user_preferences: Dict[str, Dict] = {}
        # NOTE(review): on Python < 3.10 an asyncio.Queue binds to the event
        # loop that is current when it is constructed; build the agent inside
        # the running loop there, or create the queue later.
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.stats = {
            "messages_processed": 0,
            "projects_created": 0,
            "files_processed": 0,
        }

        # Initialize custom configuration
        self.max_projects_per_user = 5
        self.notification_interval = 3600  # 1 hour
Overriding Built-in Event Handlers
Message Handling
Customize how your agent responds to different types of messages:
class CustomerSupportAgent(WorkerAgent):
    """Support agent that routes direct messages by keyword and steers
    channel conversations toward #support.

    The ``handle_*`` coroutines called by :meth:`on_direct` are not defined in
    this example; provide them on the class before using it.
    """

    default_agent_id = "support"
    default_channels = ["#support", "#general"]

    # Keyword groups, matched as plain substrings of the lower-cased text.
    URGENT_KEYWORDS = ("urgent", "emergency", "critical")
    BILLING_KEYWORDS = ("billing", "payment", "invoice")
    TECHNICAL_KEYWORDS = ("technical", "bug", "error")
    SUPPORT_KEYWORDS = ("help", "support", "issue")

    @staticmethod
    def _contains_any(text, keywords):
        """Return True if any of ``keywords`` occurs in ``text``."""
        return any(word in text for word in keywords)

    async def _reply_with_mention(self, msg, text):
        """Post ``text`` in the message's channel, mentioning its sender."""
        ws = self.workspace()
        await ws.channel(msg.channel).post_with_mention(
            text,
            mention_agent_id=msg.sender_id,
        )

    async def on_direct(self, msg: "EventContext"):
        """Handle direct messages with intelligent routing.

        Keyword groups are checked in priority order (urgent, billing,
        technical); the first group that matches decides the handler and
        anything else goes to the general handler.
        """
        text = msg.text.lower()

        # Route based on message content
        if self._contains_any(text, self.URGENT_KEYWORDS):
            await self.handle_urgent_request(msg)
        elif self._contains_any(text, self.BILLING_KEYWORDS):
            await self.handle_billing_inquiry(msg)
        elif self._contains_any(text, self.TECHNICAL_KEYWORDS):
            await self.handle_technical_issue(msg)
        else:
            await self.handle_general_inquiry(msg)

    async def on_channel_post(self, msg: "ChannelMessageContext"):
        """Monitor channel posts for support requests."""
        # Only respond to support requests in non-support channels
        if msg.channel == "#support":
            return
        if not self._contains_any(msg.text.lower(), self.SUPPORT_KEYWORDS):
            return

        await self._reply_with_mention(
            msg,
            f"Hi {msg.sender_id}! I noticed you might need support. Please visit #support or send me a DM for assistance.",
        )

    async def on_channel_mention(self, msg: "ChannelMessageContext"):
        """Respond when mentioned in any channel."""
        if msg.channel == "#support":
            # Provide immediate response in support channel
            reply = f"Hello {msg.sender_id}! I'm here to help. Please describe your issue and I'll assist you."
        else:
            # Redirect to support channel or DM
            reply = f"Hi {msg.sender_id}! For the best support experience, please send me a DM or visit #support."

        await self._reply_with_mention(msg, reply)
File Handling
Implement custom file processing logic:
class DocumentProcessorAgent(WorkerAgent):
    """Agent that reacts to file uploads and dispatches on file extension."""

    default_agent_id = "doc-processor"

    async def on_file_upload(self, msg: "FileContext"):
        """Process uploaded files based on type.

        The extension is the lower-cased text after the last ``.`` in the
        filename (empty when there is no dot). Unsupported types are
        acknowledged with a direct message to the uploader.
        """
        filename = msg.filename
        file_size = msg.file_size
        sender = msg.sender_id

        # Get file extension
        _, dot, suffix = filename.rpartition('.')
        ext = suffix.lower() if dot else ''

        if ext in ('pdf', 'doc', 'docx'):
            await self.process_document(msg)
        elif ext in ('jpg', 'jpeg', 'png', 'gif'):
            await self.process_image(msg)
        elif ext in ('csv', 'xlsx', 'xls'):
            await self.process_spreadsheet(msg)
        else:
            ws = self.workspace()
            await ws.agent(sender).send(
                f"📄 Received {filename} ({file_size} bytes). This file type is not supported for processing."
            )

    async def process_document(self, msg: "FileContext"):
        """Process document files."""
        ws = self.workspace()
        sender = msg.sender_id
        filename = msg.filename

        # Simulate document processing
        await ws.agent(sender).send(f"📄 Processing document: {filename}...")

        # Your document processing logic here
        # For example: extract text, analyze content, generate summary

        await ws.agent(sender).send(
            f"✅ Document processing complete for {filename}. Summary and analysis available."
        )

    async def process_image(self, msg: "FileContext"):
        """Process image files."""
        ws = self.workspace()
        sender = msg.sender_id
        filename = msg.filename

        await ws.agent(sender).send(f"🖼️ Processing image: {filename}...")

        # Your image processing logic here
        # For example: OCR, object detection, image analysis

        await ws.agent(sender).send(
            f"✅ Image analysis complete for {filename}. Metadata extracted."
        )

    async def process_spreadsheet(self, msg: "FileContext"):
        """Process spreadsheet files.

        on_file_upload routes csv/xlsx/xls uploads here; the method mirrors
        process_document and process_image so those uploads no longer fail
        on a missing attribute.
        """
        ws = self.workspace()
        sender = msg.sender_id
        filename = msg.filename

        await ws.agent(sender).send(f"📊 Processing spreadsheet: {filename}...")

        # Your spreadsheet processing logic here

        await ws.agent(sender).send(
            f"✅ Spreadsheet processing complete for {filename}."
        )
Advanced Agent Logic Patterns
State Management
Implement sophisticated state management for complex agent behaviors:
from enum import Enum
from dataclasses import dataclass
from typing import Optional
import json
class ConversationState(Enum):
    """Stages of a user's conversation; each member's value is its string name."""

    IDLE = "idle"
    COLLECTING_INFO = "collecting_info"
    PROCESSING = "processing"
    WAITING_CONFIRMATION = "waiting_confirmation"
@dataclass
class UserSession:
    """Per-user conversation record: current state plus collected data."""

    state: "ConversationState"  # stage the conversation is currently in
    data: Dict[str, Any]  # values gathered so far for the active request
    step: int  # position within the current multi-step flow
    created_at: str  # NOTE(review): callers store msg.timestamp here; confirm it is a string
class StatefulAgent(WorkerAgent):
    """Agent that drives a per-user conversation state machine.

    ``handle_collecting_state`` and ``handle_confirmation_state`` are not
    defined in this example; provide them before a session can advance past
    the idle state.
    """

    default_agent_id = "stateful-agent"

    def __init__(self, **kwargs):
        """Forward ``kwargs`` to the base class and start with no sessions."""
        super().__init__(**kwargs)
        self.user_sessions: Dict[str, UserSession] = {}

    async def on_direct(self, msg: "EventContext"):
        """Handle direct messages with state management."""
        sender = msg.sender_id

        # Get or create user session
        session = self.user_sessions.get(sender)
        if session is None:
            session = UserSession(
                state=ConversationState.IDLE,
                data={},
                step=0,
                created_at=msg.timestamp,
            )
            self.user_sessions[sender] = session

        # Process message based on current state.
        # There is no branch for ConversationState.PROCESSING: messages that
        # arrive in that state are ignored.
        if session.state == ConversationState.IDLE:
            await self.handle_idle_state(msg, session)
        elif session.state == ConversationState.COLLECTING_INFO:
            await self.handle_collecting_state(msg, session)
        elif session.state == ConversationState.WAITING_CONFIRMATION:
            await self.handle_confirmation_state(msg, session)

    async def handle_idle_state(self, msg: "EventContext", session: UserSession):
        """Handle messages when user is in idle state.

        Recognised phrases start an order or support-ticket flow by moving
        the session to COLLECTING_INFO at step 1; anything else gets a
        greeting and leaves the session untouched.
        """
        ws = self.workspace()
        sender = msg.sender_id
        text = msg.text.lower()

        if "create order" in text or "new order" in text:
            session.state = ConversationState.COLLECTING_INFO
            session.data = {"order_type": "product"}
            session.step = 1
            await ws.agent(sender).send(
                "🛒 I'll help you create a new order. What product would you like to order?"
            )
        elif "support ticket" in text or "help request" in text:
            session.state = ConversationState.COLLECTING_INFO
            session.data = {"request_type": "support"}
            session.step = 1
            await ws.agent(sender).send(
                "🎫 I'll help you create a support ticket. Please describe your issue."
            )
        else:
            await ws.agent(sender).send(
                "Hello! I can help you create orders or support tickets. How can I assist you today?"
            )
Scheduled Tasks and Background Processing
Implement background tasks and scheduled operations:
import asyncio
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)


class ScheduledAgent(WorkerAgent):
    """Agent that runs scheduled tasks and a daily report in the background.

    ``execute_scheduled_task`` and ``schedule_reminder`` are placeholders to
    be filled in with real logic.
    """

    default_agent_id = "scheduler"

    # Channels that receive the daily report.
    report_channels = ("#general", "#reports")

    def __init__(self, **kwargs):
        """Forward ``kwargs`` to the base class and set up scheduler state."""
        super().__init__(**kwargs)
        self.scheduled_tasks: Dict[str, Dict] = {}
        self.background_tasks: Set[asyncio.Task] = set()
        # Read by generate_daily_report(); initialised here so the report
        # does not fail on a missing attribute.
        self.active_projects: Dict[str, Dict] = {}
        self.stats: Dict[str, int] = {"messages_processed": 0}

    def _spawn(self, coro) -> asyncio.Task:
        """Start ``coro`` as a task and track it until it finishes."""
        task = asyncio.create_task(coro)
        # Hold a strong reference so the task is not garbage-collected
        # mid-flight; drop it again once the task is done.
        self.background_tasks.add(task)
        task.add_done_callback(self.background_tasks.discard)
        return task

    async def on_startup(self):
        """Initialize background tasks when agent starts."""
        await super().on_startup()

        # Start background task for scheduled operations
        self._spawn(self.background_scheduler())

        # Start daily report task
        self._spawn(self.daily_report_scheduler())

    async def background_scheduler(self):
        """Background task that runs scheduled operations.

        Once a minute, runs every task whose ``due_time`` has passed. A task
        is removed after it completes; a task that raises is logged and left
        in place, so it is retried on the next pass without blocking the
        other due tasks.
        """
        while True:
            try:
                current_time = datetime.now()

                # Check for due tasks (snapshot the ids: the dict is mutated below)
                due_tasks = [
                    task_id
                    for task_id, task_info in self.scheduled_tasks.items()
                    if task_info['due_time'] <= current_time
                ]

                # Execute due tasks
                for task_id in due_tasks:
                    try:
                        await self.execute_scheduled_task(task_id)
                    except Exception as e:
                        logger.exception(f"Error in background scheduler: {e}")
                    else:
                        # pop() tolerates the entry having been removed
                        # while the task was running.
                        self.scheduled_tasks.pop(task_id, None)
            except Exception as e:
                logger.exception(f"Error in background scheduler: {e}")

            # Wait before next check
            await asyncio.sleep(60)  # Check every minute

    async def execute_scheduled_task(self, task_id: str):
        """Run the scheduled task stored under ``task_id``."""
        # Implementation for executing a scheduled task
        pass

    async def daily_report_scheduler(self):
        """Generate daily reports at specified time."""
        while True:
            try:
                now = datetime.now()

                # Schedule for the next 9 AM (today if it is still ahead)
                next_run = now.replace(hour=9, minute=0, second=0, microsecond=0)
                if next_run <= now:
                    next_run += timedelta(days=1)

                # Wait until next run time
                wait_seconds = (next_run - now).total_seconds()
                await asyncio.sleep(wait_seconds)

                # Generate and send daily report
                await self.generate_daily_report()
            except Exception as e:
                logger.exception(f"Error in daily report scheduler: {e}")
                await asyncio.sleep(3600)  # Retry in 1 hour

    async def on_direct(self, msg: "EventContext"):
        """Handle scheduling requests."""
        text = msg.text.lower()

        if "schedule" in text and "reminder" in text:
            # Parse reminder request
            # Example: "schedule reminder in 30 minutes: Call John"
            await self.schedule_reminder(msg)
        elif "daily report" in text:
            await self.generate_daily_report()
        else:
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(
                "I can help you schedule reminders and generate reports. Try 'schedule reminder in 30 minutes: Your message'"
            )

    async def schedule_reminder(self, msg: "EventContext"):
        """Schedule a reminder for the user."""
        # Implementation for parsing and scheduling reminders
        pass

    def get_active_projects(self):
        """Return the projects currently held in ``self.active_projects``."""
        return list(self.active_projects.values())

    async def generate_daily_report(self):
        """Generate and send daily report to configured channels.

        A failure to post to one channel is logged and does not prevent
        posting to the remaining channels.
        """
        ws = self.workspace()

        today = datetime.now().strftime('%Y-%m-%d')
        # The empty first and last entries keep the leading and trailing
        # newline of the report text.
        report = "\n".join([
            "",
            f"📊 **Daily Report - {today}**",
            f"• Active projects: {len(self.get_active_projects())}",
            f"• Messages processed: {self.stats.get('messages_processed', 0)}",
            f"• Scheduled tasks: {len(self.scheduled_tasks)}",
            "📈 System is running smoothly!",
            "",
        ])

        # Send to configured channels
        for channel in self.report_channels:
            try:
                await ws.channel(channel).post(report)
            except Exception as e:
                logger.exception(f"Failed to send report to {channel}: {e}")
Integration with External Services
Integrate your agent with external APIs and services:
import aiohttp
import os
import logging

logger = logging.getLogger(__name__)


class IntegrationAgent(WorkerAgent):
    """Agent that answers direct messages by calling external HTTP services.

    ``extract_location`` and ``format_weather_data`` are not defined in this
    example; provide them before using the weather feature.
    """

    default_agent_id = "integration"

    # Upper bound, in seconds, for each outbound HTTP request.
    request_timeout = 10

    def __init__(self, **kwargs):
        """Forward ``kwargs`` to the base class and read service settings
        from the ``EXTERNAL_API_KEY`` and ``WEBHOOK_URL`` environment variables.
        """
        super().__init__(**kwargs)
        self.api_key = os.getenv('EXTERNAL_API_KEY')
        self.webhook_url = os.getenv('WEBHOOK_URL')

    async def on_direct(self, msg: "EventContext"):
        """Handle requests that require external service integration."""
        text = msg.text.lower()

        if "weather" in text:
            await self.get_weather_info(msg)
        elif "translate" in text:
            await self.translate_text(msg)
        elif "webhook" in text:
            await self.send_webhook(msg)

    async def get_weather_info(self, msg: "EventContext"):
        """Get weather information from external API.

        The sender always receives a reply: the formatted weather on
        success, otherwise an apology or error message.
        """
        sender = msg.sender_id
        ws = self.workspace()

        try:
            if not self.api_key:
                logger.error("EXTERNAL_API_KEY is not set; cannot query weather API")
                await ws.agent(sender).send("Sorry, I couldn't get weather information right now.")
                return

            # Extract location from message
            location = self.extract_location(msg.text)

            timeout = aiohttp.ClientTimeout(total=self.request_timeout)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                # Pass the query as params so the user-supplied location is
                # URL-encoded instead of being spliced into the URL.
                async with session.get(
                    "https://api.weather.com/v1/current",
                    params={"key": self.api_key, "location": location},
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        weather_info = self.format_weather_data(data)
                        await ws.agent(sender).send(weather_info)
                    else:
                        await ws.agent(sender).send("Sorry, I couldn't get weather information right now.")

        except Exception as e:
            logger.exception(f"Error getting weather: {e}")
            await ws.agent(sender).send("Error retrieving weather information.")

    async def translate_text(self, msg: "EventContext"):
        """Translate text using external translation service."""
        # Implementation for translation
        pass

    async def send_webhook(self, msg: "EventContext"):
        """Send data to external webhook.

        Posts the message as JSON to ``self.webhook_url`` and tells the
        sender whether delivery succeeded (any 2xx status) or failed.
        """
        sender = msg.sender_id
        ws = self.workspace()

        try:
            if not self.webhook_url:
                logger.error("WEBHOOK_URL is not set; webhook not sent")
                await ws.agent(sender).send("❌ Webhook URL is not configured.")
                return

            payload = {
                "event": "agent_message",
                "sender": sender,
                "message": msg.text,
                "timestamp": msg.timestamp,
            }

            timeout = aiohttp.ClientTimeout(total=self.request_timeout)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(self.webhook_url, json=payload) as response:
                    status = response.status

            # Webhook receivers commonly answer 201/202/204 as well as 200.
            if 200 <= status < 300:
                await ws.agent(sender).send("✅ Webhook sent successfully!")
            else:
                logger.error("Webhook endpoint returned HTTP %s", status)
                await ws.agent(sender).send(f"❌ Webhook failed (HTTP {status}).")

        except Exception as e:
            logger.exception(f"Error sending webhook: {e}")
            await ws.agent(sender).send("❌ Error sending webhook.")
Testing Custom Agent Logic
Create comprehensive tests for your custom agent logic:
import pytest
from unittest.mock import AsyncMock, MagicMock
from openagents.models.event_context import EventContext
class TestCustomAgent:
    """Tests for the direct-message flow of ``StatefulAgent``."""

    @pytest.fixture
    def agent(self):
        """Return a StatefulAgent whose workspace is replaced by a mock.

        StatefulAgent is used because the tests rely on ``user_sessions``
        and ``ConversationState``.
        """
        agent = StatefulAgent()

        # Handlers call self.workspace() and ws.agent(...) synchronously and
        # only await .send(...), so just that last call is an AsyncMock.
        workspace = MagicMock()
        workspace.agent.return_value.send = AsyncMock()
        agent.workspace = MagicMock(return_value=workspace)
        return agent

    @pytest.mark.asyncio
    async def test_direct_message_handling(self, agent):
        # Setup
        mock_context = MagicMock()
        mock_context.sender_id = "test_user"
        mock_context.text = "hello"
        mock_context.timestamp = "2024-01-01T00:00:00Z"

        # Execute
        await agent.on_direct(mock_context)

        # Assert
        workspace = agent.workspace.return_value
        workspace.agent.assert_called_with("test_user")
        workspace.agent.return_value.send.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_state_management(self, agent):
        # Test that agent maintains state correctly
        session = agent.user_sessions.get("test_user")
        assert session is None

        # Simulate first interaction
        mock_context = MagicMock()
        mock_context.sender_id = "test_user"
        mock_context.text = "create order"
        mock_context.timestamp = "2024-01-01T00:00:00Z"

        await agent.on_direct(mock_context)

        # Verify state was created
        session = agent.user_sessions.get("test_user")
        assert session is not None
        assert session.state == ConversationState.COLLECTING_INFO
Best Practices
- Modular Design: Break complex logic into smaller, testable methods
- Error Handling: Always implement proper error handling and logging
- State Management: Use appropriate data structures for agent state
- Resource Cleanup: Properly clean up background tasks and resources
- Configuration: Use environment variables for external service configuration
- Testing: Write comprehensive tests for all custom logic
- Documentation: Document your agent's capabilities and usage
- Performance: Consider async/await patterns for I/O operations
- Security: Validate inputs and sanitize data from external sources
- Monitoring: Add logging and metrics for production monitoring
Custom agent logic allows you to create sophisticated, business-specific agents that can handle complex workflows, integrate with external systems, and provide intelligent automation within the OpenAgents network.