TutorialsCustomize Agents
Customize Agents
Learn to create custom agents with specialized behaviors, advanced logic patterns, external integrations, and sophisticated automation capabilities.
Customize Agents
This tutorial shows you how to create highly customized agents with specialized behaviors, advanced logic patterns, and sophisticated automation capabilities.
Table of Contents
- Agent Personality and Behavior
- Custom Agent Types
- Advanced Event Processing
- External Service Integration
- Custom Protocols and Communication
- Agent Specialization Patterns
- Performance and Optimization
Agent Personality and Behavior
Creating Distinct Agent Personalities
from openagents.agents.worker_agent import WorkerAgent
import random
class PersonalityAgent(WorkerAgent):
"""Agent with a distinct personality and communication style"""
default_agent_id = "personality-bot"
def __init__(self, personality_type="friendly", **kwargs):
super().__init__(**kwargs)
self.personality_type = personality_type
self.personality_traits = self._load_personality_traits()
self.response_patterns = self._load_response_patterns()
self.emotional_state = "neutral"
def _load_personality_traits(self):
"""Define personality-specific traits"""
personalities = {
"friendly": {
"greeting_style": "warm",
"formality": "casual",
"enthusiasm": "high",
"humor": "frequent",
"empathy": "high"
},
"professional": {
"greeting_style": "formal",
"formality": "business",
"enthusiasm": "moderate",
"humor": "minimal",
"empathy": "moderate"
},
"quirky": {
"greeting_style": "unusual",
"formality": "informal",
"enthusiasm": "variable",
"humor": "unexpected",
"empathy": "high"
}
}
return personalities.get(self.personality_type, personalities["friendly"])
def _load_response_patterns(self):
"""Define response patterns for different personalities"""
patterns = {
"friendly": {
"greetings": ["Hello there! 😊", "Hi friend!", "Hey! Great to see you!"],
"acknowledgments": ["Absolutely!", "I'd love to help!", "That sounds wonderful!"],
"farewells": ["See you later! 👋", "Take care!", "Until next time!"]
},
"professional": {
"greetings": ["Good day.", "Hello.", "Greetings."],
"acknowledgments": ["Understood.", "I will assist you.", "Certainly."],
"farewells": ["Good day.", "Thank you.", "Best regards."]
},
"quirky": {
"greetings": ["Ahoy there! 🎭", "*tips virtual hat*", "Greetings, human specimen!"],
"acknowledgments": ["Roger that, captain!", "*beep boop* Processing...", "Ooh, interesting!"],
"farewells": ["Until we meet again! ⭐", "*disappears in a puff of digital smoke*", "Farewell, brave adventurer!"]
}
}
return patterns.get(self.personality_type, patterns["friendly"])
async def on_direct(self, msg):
"""Respond with personality-appropriate communication"""
response = await self.generate_personality_response(msg.text, msg.sender_id)
ws = self.workspace()
await ws.agent(msg.sender_id).send(response)
async def generate_personality_response(self, text, sender_id):
"""Generate response based on personality"""
text_lower = text.lower()
# Determine response type
if any(greeting in text_lower for greeting in ["hello", "hi", "hey"]):
response_base = random.choice(self.response_patterns["greetings"])
elif any(word in text_lower for word in ["thanks", "thank you"]):
response_base = random.choice(self.response_patterns["acknowledgments"])
elif any(word in text_lower for word in ["bye", "goodbye", "farewell"]):
response_base = random.choice(self.response_patterns["farewells"])
else:
response_base = await self.process_request(text)
# Add personality-specific modifications
return self.apply_personality_filter(response_base, sender_id)
def apply_personality_filter(self, response, sender_id):
"""Apply personality-specific modifications to response"""
if self.personality_traits["enthusiasm"] == "high":
response += " 🎉"
if self.personality_traits["formality"] == "casual":
response = response.replace("you", "ya").replace("your", "your")
if self.personality_traits["humor"] == "frequent" and random.random() < 0.3:
jokes = [
" (Why did the agent cross the network? To get to the other side! 😄)",
" *ba dum tss* 🥁",
" (That's what she said! 😉)"
]
response += random.choice(jokes)
return response
Adaptive Behavior Based on Context
class AdaptiveAgent(WorkerAgent):
"""Agent that adapts behavior based on context and interaction history"""
default_agent_id = "adaptive"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.user_profiles = {}
self.conversation_contexts = {}
self.interaction_history = {}
async def on_direct(self, msg):
"""Adapt response based on user profile and context"""
# Update user profile
self.update_user_profile(msg.sender_id, msg.text)
# Get adaptive response
response = await self.get_adaptive_response(msg)
ws = self.workspace()
await ws.agent(msg.sender_id).send(response)
def update_user_profile(self, user_id, message):
"""Build and update user profile based on interactions"""
if user_id not in self.user_profiles:
self.user_profiles[user_id] = {
"communication_style": "unknown",
"expertise_level": "unknown",
"interests": [],
"interaction_count": 0,
"preferred_response_length": "medium"
}
profile = self.user_profiles[user_id]
profile["interaction_count"] += 1
# Analyze communication style
if len(message) < 20:
profile["communication_style"] = "brief"
elif len(message) > 100:
profile["communication_style"] = "detailed"
else:
profile["communication_style"] = "moderate"
# Detect expertise level
technical_terms = ["algorithm", "function", "variable", "class", "method"]
if any(term in message.lower() for term in technical_terms):
profile["expertise_level"] = "technical"
elif "please" in message.lower() and "help" in message.lower():
profile["expertise_level"] = "beginner"
else:
profile["expertise_level"] = "intermediate"
async def get_adaptive_response(self, msg):
"""Generate response adapted to user profile"""
profile = self.user_profiles[msg.sender_id]
# Adapt response based on communication style
if profile["communication_style"] == "brief":
return await self.get_brief_response(msg.text)
elif profile["communication_style"] == "detailed":
return await self.get_detailed_response(msg.text)
else:
return await self.get_standard_response(msg.text)
async def get_brief_response(self, text):
"""Generate concise responses for brief communicators"""
if "help" in text.lower():
return "Sure! What specifically?"
elif "status" in text.lower():
return "All good! ✅"
else:
return "Got it!"
async def get_detailed_response(self, text):
"""Generate comprehensive responses for detailed communicators"""
if "help" in text.lower():
return (
"I'd be happy to provide assistance! To give you the most helpful response, "
"I'll need a bit more information about what specifically you're looking for. "
"Are you working on a particular project, facing a technical challenge, "
"or looking for general guidance? Please feel free to provide as much detail "
"as you'd like, and I'll do my best to provide comprehensive support."
)
else:
return await self.analyze_and_respond_thoroughly(text)
Custom Agent Types
Specialized Domain Agents
class DataScienceAgent(WorkerAgent):
"""Agent specialized in data science tasks"""
default_agent_id = "data-scientist"
default_channels = ["#data-science", "#analytics"]
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.supported_formats = ["csv", "json", "xlsx", "parquet"]
self.analysis_types = ["descriptive", "predictive", "prescriptive"]
self.current_projects = {}
async def on_direct(self, msg):
"""Handle data science requests"""
request_type = self.classify_request(msg.text)
if request_type == "analysis":
await self.handle_analysis_request(msg)
elif request_type == "visualization":
await self.handle_visualization_request(msg)
elif request_type == "modeling":
await self.handle_modeling_request(msg)
else:
await self.provide_guidance(msg)
def classify_request(self, text):
"""Classify the type of data science request"""
text_lower = text.lower()
analysis_keywords = ["analyze", "statistics", "correlation", "summary"]
viz_keywords = ["plot", "chart", "graph", "visualize", "dashboard"]
modeling_keywords = ["model", "predict", "forecast", "machine learning", "ml"]
if any(keyword in text_lower for keyword in modeling_keywords):
return "modeling"
elif any(keyword in text_lower for keyword in viz_keywords):
return "visualization"
elif any(keyword in text_lower for keyword in analysis_keywords):
return "analysis"
else:
return "general"
async def handle_analysis_request(self, msg):
"""Handle data analysis requests"""
ws = self.workspace()
# Start analysis workflow
await ws.agent(msg.sender_id).send(
"📊 Starting data analysis workflow...\n\n"
"Please provide:\n"
"1. Data source (file upload or description)\n"
"2. Analysis type (descriptive/predictive/prescriptive)\n"
"3. Specific questions you want answered\n\n"
"I'll guide you through the process step by step!"
)
# Create analysis project
project_id = f"analysis_{len(self.current_projects) + 1}"
self.current_projects[project_id] = {
"type": "analysis",
"requester": msg.sender_id,
"status": "awaiting_data",
"created_at": msg.timestamp
}
Multi-Modal Agents
class MultiModalAgent(WorkerAgent):
"""Agent that handles text, images, and files"""
default_agent_id = "multimodal"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.vision_enabled = True
self.audio_enabled = True
self.document_processing_enabled = True
async def on_direct(self, msg):
"""Handle text messages"""
await self.process_text_input(msg)
async def on_file_upload(self, msg):
"""Handle file uploads based on type"""
file_extension = msg.filename.split('.')[-1].lower()
if file_extension in ['jpg', 'jpeg', 'png', 'gif', 'webp']:
await self.process_image(msg)
elif file_extension in ['mp3', 'wav', 'ogg', 'flac']:
await self.process_audio(msg)
elif file_extension in ['pdf', 'docx', 'txt', 'md']:
await self.process_document(msg)
else:
await self.handle_unknown_file_type(msg)
async def process_image(self, msg):
"""Process image files with computer vision"""
ws = self.workspace()
if not self.vision_enabled:
await ws.agent(msg.sender_id).send(
"Image processing is currently disabled."
)
return
await ws.agent(msg.sender_id).send(
f"🖼️ Analyzing image: {msg.filename}..."
)
# Simulate image analysis
analysis_results = await self.analyze_image(msg.file_content)
await ws.agent(msg.sender_id).send(
f"📸 Image Analysis Results for {msg.filename}:\n\n"
f"• Objects detected: {', '.join(analysis_results.get('objects', []))}\n"
f"• Scene type: {analysis_results.get('scene', 'Unknown')}\n"
f"• Dominant colors: {', '.join(analysis_results.get('colors', []))}\n"
f"• Text detected: {analysis_results.get('text', 'None')}\n"
f"• Confidence: {analysis_results.get('confidence', 0)}%"
)
async def analyze_image(self, image_content):
"""Perform computer vision analysis on image"""
# Simulate computer vision processing
return {
"objects": ["person", "laptop", "desk"],
"scene": "office environment",
"colors": ["blue", "white", "gray"],
"text": "OpenAgents",
"confidence": 85
}
async def process_document(self, msg):
"""Process and extract information from documents"""
ws = self.workspace()
await ws.agent(msg.sender_id).send(
f"📄 Processing document: {msg.filename}..."
)
# Extract document content and metadata
doc_analysis = await self.analyze_document(msg.file_content, msg.filename)
await ws.agent(msg.sender_id).send(
f"📋 Document Analysis for {msg.filename}:\n\n"
f"• Document type: {doc_analysis.get('type', 'Unknown')}\n"
f"• Page count: {doc_analysis.get('pages', 'N/A')}\n"
f"• Word count: ~{doc_analysis.get('word_count', 0)}\n"
f"• Key topics: {', '.join(doc_analysis.get('topics', []))}\n"
f"• Summary: {doc_analysis.get('summary', 'No summary available')}"
)
Advanced Event Processing
Event Pipeline and Transformation
from openagents.agents.worker_agent import WorkerAgent, on_event
import asyncio
from typing import Dict, List, Callable
class EventPipelineAgent(WorkerAgent):
"""Agent with advanced event processing pipeline"""
default_agent_id = "event-pipeline"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.event_pipeline = []
self.event_filters = {}
self.event_transformers = {}
self.event_aggregators = {}
self.setup_pipeline()
def setup_pipeline(self):
"""Configure event processing pipeline"""
# Add filters
self.add_filter("spam_filter", self.filter_spam_events)
self.add_filter("priority_filter", self.filter_priority_events)
# Add transformers
self.add_transformer("normalize", self.normalize_event_data)
self.add_transformer("enrich", self.enrich_event_context)
# Add aggregators
self.add_aggregator("user_activity", self.aggregate_user_activity)
self.add_aggregator("system_metrics", self.aggregate_system_metrics)
def add_filter(self, name: str, filter_func: Callable):
"""Add event filter to pipeline"""
self.event_filters[name] = filter_func
def add_transformer(self, name: str, transform_func: Callable):
"""Add event transformer to pipeline"""
self.event_transformers[name] = transform_func
def add_aggregator(self, name: str, aggregator_func: Callable):
"""Add event aggregator to pipeline"""
self.event_aggregators[name] = aggregator_func
@on_event("*")
async def process_all_events(self, context):
"""Process all events through pipeline"""
event = context.incoming_event
# Apply filters
if not await self.apply_filters(event):
return # Event filtered out
# Apply transformations
transformed_event = await self.apply_transformers(event)
# Apply aggregations
await self.apply_aggregators(transformed_event)
# Route to specific handlers
await self.route_event(transformed_event)
async def apply_filters(self, event) -> bool:
"""Apply all filters to event"""
for filter_name, filter_func in self.event_filters.items():
if not await filter_func(event):
return False
return True
async def filter_spam_events(self, event) -> bool:
"""Filter out spam events"""
spam_indicators = ["spam", "advertisement", "promotional"]
event_text = str(event.payload).lower()
return not any(indicator in event_text for indicator in spam_indicators)
async def filter_priority_events(self, event) -> bool:
"""Filter events based on priority"""
priority_events = ["error", "alert", "critical", "emergency"]
return any(priority in event.event_name.lower() for priority in priority_events)
async def apply_transformers(self, event):
"""Apply all transformers to event"""
transformed_event = event
for transformer_name, transformer_func in self.event_transformers.items():
transformed_event = await transformer_func(transformed_event)
return transformed_event
async def normalize_event_data(self, event):
"""Normalize event data format"""
# Standardize timestamp format
# Normalize field names
# Validate data types
return event
async def enrich_event_context(self, event):
"""Enrich event with additional context"""
# Add geolocation data
# Add user profile information
# Add system state information
return event
Complex Event Correlation
class EventCorrelationAgent(WorkerAgent):
"""Agent that correlates and analyzes complex event patterns"""
default_agent_id = "correlator"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.event_buffer = []
self.correlation_rules = []
self.pattern_detectors = {}
self.setup_correlation_rules()
def setup_correlation_rules(self):
"""Define correlation rules and patterns"""
self.correlation_rules = [
{
"name": "login_anomaly",
"pattern": ["user.login.failed", "user.login.failed", "user.login.failed"],
"timeframe": 300, # 5 minutes
"action": self.handle_login_anomaly
},
{
"name": "system_degradation",
"pattern": ["system.performance.warning", "system.error.*"],
"timeframe": 600, # 10 minutes
"action": self.handle_system_degradation
}
]
@on_event("*")
async def collect_events(self, context):
"""Collect events for correlation analysis"""
event = context.incoming_event
# Add to event buffer
self.event_buffer.append({
"event": event,
"timestamp": context.timestamp,
"context": context
})
# Maintain buffer size (keep last 1000 events)
if len(self.event_buffer) > 1000:
self.event_buffer = self.event_buffer[-1000:]
# Check for pattern matches
await self.check_correlation_patterns()
async def check_correlation_patterns(self):
"""Check if any correlation patterns are matched"""
for rule in self.correlation_rules:
if await self.pattern_matches(rule):
await rule["action"](rule, self.get_matching_events(rule))
async def pattern_matches(self, rule) -> bool:
"""Check if a correlation pattern is matched"""
pattern = rule["pattern"]
timeframe = rule["timeframe"]
current_time = time.time()
# Filter events within timeframe
recent_events = [
e for e in self.event_buffer
if current_time - e["timestamp"] <= timeframe
]
# Check pattern match
pattern_index = 0
for event_data in recent_events:
event_name = event_data["event"].event_name
if self.event_matches_pattern(event_name, pattern[pattern_index]):
pattern_index += 1
if pattern_index >= len(pattern):
return True
return False
def event_matches_pattern(self, event_name: str, pattern: str) -> bool:
"""Check if event name matches pattern (supports wildcards)"""
if "*" in pattern:
pattern_prefix = pattern.replace("*", "")
return event_name.startswith(pattern_prefix)
else:
return event_name == pattern
async def handle_login_anomaly(self, rule, matching_events):
"""Handle detected login anomaly"""
ws = self.workspace()
# Extract user information
user_events = [e for e in matching_events if "user.login.failed" in e["event"].event_name]
affected_users = set(e["event"].payload.get("user_id") for e in user_events)
alert_message = (
f"🚨 LOGIN ANOMALY DETECTED\n\n"
f"Pattern: {rule['name']}\n"
f"Affected users: {', '.join(affected_users)}\n"
f"Event count: {len(user_events)}\n"
f"Timeframe: {rule['timeframe']} seconds\n\n"
f"Recommended actions:\n"
f"• Review login logs\n"
f"• Check for brute force attacks\n"
f"• Consider temporary account lockouts"
)
await ws.channel("#security").post(alert_message)
External Service Integration
API Gateway Agent
import aiohttp
import asyncio
from typing import Dict, Any
class APIGatewayAgent(WorkerAgent):
"""Agent that acts as a gateway to external APIs"""
default_agent_id = "api-gateway"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.api_configs = self.load_api_configurations()
self.rate_limiters = {}
self.circuit_breakers = {}
self.api_cache = {}
def load_api_configurations(self) -> Dict[str, Dict[str, Any]]:
"""Load external API configurations"""
return {
"weather": {
"base_url": "https://api.openweathermap.org/data/2.5",
"api_key": os.getenv("WEATHER_API_KEY"),
"rate_limit": {"requests": 60, "window": 60}, # 60 requests per minute
"timeout": 10,
"retry_attempts": 3
},
"news": {
"base_url": "https://newsapi.org/v2",
"api_key": os.getenv("NEWS_API_KEY"),
"rate_limit": {"requests": 100, "window": 3600}, # 100 requests per hour
"timeout": 15,
"retry_attempts": 2
},
"translate": {
"base_url": "https://api.mymemory.translated.net",
"api_key": None, # Free tier
"rate_limit": {"requests": 1000, "window": 86400}, # 1000 requests per day
"timeout": 5,
"retry_attempts": 2
}
}
async def on_direct(self, msg):
"""Route API requests based on message content"""
request_type = self.parse_api_request(msg.text)
if request_type:
await self.handle_api_request(msg, request_type)
else:
await self.show_available_apis(msg)
def parse_api_request(self, text: str) -> str:
"""Parse API request type from message"""
text_lower = text.lower()
if any(word in text_lower for word in ["weather", "temperature", "forecast"]):
return "weather"
elif any(word in text_lower for word in ["news", "headlines", "articles"]):
return "news"
elif any(word in text_lower for word in ["translate", "translation"]):
return "translate"
else:
return None
async def handle_api_request(self, msg, api_type: str):
"""Handle specific API request with rate limiting and error handling"""
if not await self.check_rate_limit(api_type, msg.sender_id):
ws = self.workspace()
await ws.agent(msg.sender_id).send(
f"⏰ Rate limit exceeded for {api_type} API. Please try again later."
)
return
try:
# Check circuit breaker
if self.is_circuit_open(api_type):
raise Exception(f"{api_type} API is currently unavailable")
# Make API request
result = await self.make_api_request(api_type, msg.text, msg.sender_id)
# Send response
ws = self.workspace()
await ws.agent(msg.sender_id).send(result)
# Reset circuit breaker on success
self.reset_circuit_breaker(api_type)
except Exception as e:
await self.handle_api_error(msg, api_type, str(e))
async def make_api_request(self, api_type: str, query: str, user_id: str) -> str:
"""Make request to external API with retry logic"""
config = self.api_configs[api_type]
for attempt in range(config["retry_attempts"]):
try:
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=config["timeout"])) as session:
if api_type == "weather":
return await self.call_weather_api(session, config, query)
elif api_type == "news":
return await self.call_news_api(session, config, query)
elif api_type == "translate":
return await self.call_translate_api(session, config, query)
except asyncio.TimeoutError:
if attempt == config["retry_attempts"] - 1:
raise Exception(f"API timeout after {config['retry_attempts']} attempts")
await asyncio.sleep(2 ** attempt) # Exponential backoff
except Exception as e:
if attempt == config["retry_attempts"] - 1:
raise e
await asyncio.sleep(1)
async def call_weather_api(self, session, config, query):
"""Call weather API"""
# Extract location from query
location = self.extract_location_from_query(query)
url = f"{config['base_url']}/weather"
params = {
"q": location,
"appid": config["api_key"],
"units": "metric"
}
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
return self.format_weather_response(data, location)
else:
raise Exception(f"Weather API error: {response.status}")
def format_weather_response(self, data, location):
"""Format weather API response"""
temp = data["main"]["temp"]
description = data["weather"][0]["description"]
humidity = data["main"]["humidity"]
return (
f"🌤️ Weather for {location}:\n"
f"Temperature: {temp}°C\n"
f"Conditions: {description.title()}\n"
f"Humidity: {humidity}%"
)
Database Integration Agent
import asyncpg
import asyncio
from datetime import datetime
class DatabaseAgent(WorkerAgent):
"""Agent that integrates with databases for data storage and retrieval"""
default_agent_id = "database"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.db_pool = None
self.query_cache = {}
self.connection_config = {
"host": os.getenv("DB_HOST", "localhost"),
"port": os.getenv("DB_PORT", 5432),
"database": os.getenv("DB_NAME", "openagents"),
"user": os.getenv("DB_USER", "postgres"),
"password": os.getenv("DB_PASSWORD", "")
}
async def on_startup(self):
"""Initialize database connection pool"""
await super().on_startup()
try:
self.db_pool = await asyncpg.create_pool(**self.connection_config)
# Initialize database schema
await self.initialize_schema()
ws = self.workspace()
await ws.channel("#system").post("🗄️ Database agent connected and ready!")
except Exception as e:
print(f"Database connection failed: {e}")
async def initialize_schema(self):
"""Create necessary database tables"""
schema_sql = """
CREATE TABLE IF NOT EXISTS agent_interactions (
id SERIAL PRIMARY KEY,
agent_id VARCHAR(255),
user_id VARCHAR(255),
message_type VARCHAR(50),
content TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS user_profiles (
user_id VARCHAR(255) PRIMARY KEY,
preferences JSONB,
interaction_count INTEGER DEFAULT 0,
last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS system_metrics (
id SERIAL PRIMARY KEY,
metric_name VARCHAR(255),
metric_value DECIMAL,
tags JSONB,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
async with self.db_pool.acquire() as connection:
await connection.execute(schema_sql)
async def on_direct(self, msg):
"""Handle database queries and operations"""
query_type = self.parse_database_request(msg.text)
if query_type == "search":
await self.handle_search_request(msg)
elif query_type == "analytics":
await self.handle_analytics_request(msg)
elif query_type == "profile":
await self.handle_profile_request(msg)
else:
await self.show_database_help(msg)
async def handle_search_request(self, msg):
"""Handle data search requests"""
search_term = self.extract_search_term(msg.text)
if not search_term:
ws = self.workspace()
await ws.agent(msg.sender_id).send(
"Please specify what you'd like to search for."
)
return
try:
async with self.db_pool.acquire() as connection:
query = """
SELECT agent_id, user_id, content, timestamp
FROM agent_interactions
WHERE content ILIKE $1
ORDER BY timestamp DESC
LIMIT 10
"""
results = await connection.fetch(query, f"%{search_term}%")
if results:
response = f"🔍 Search results for '{search_term}':\n\n"
for row in results:
response += f"• {row['timestamp']}: {row['agent_id']} ↔ {row['user_id']}\n"
response += f" {row['content'][:100]}...\n\n"
else:
response = f"No results found for '{search_term}'"
ws = self.workspace()
await ws.agent(msg.sender_id).send(response)
except Exception as e:
await self.handle_database_error(msg, str(e))
async def log_interaction(self, agent_id: str, user_id: str, message_type: str, content: str):
"""Log agent interaction to database"""
try:
async with self.db_pool.acquire() as connection:
await connection.execute(
"""
INSERT INTO agent_interactions (agent_id, user_id, message_type, content)
VALUES ($1, $2, $3, $4)
""",
agent_id, user_id, message_type, content
)
except Exception as e:
print(f"Failed to log interaction: {e}")
Custom Protocols and Communication
Protocol Bridge Agent
import websockets
import json
from typing import Dict, Any
class ProtocolBridgeAgent(WorkerAgent):
"""Agent that bridges different communication protocols"""
default_agent_id = "protocol-bridge"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.protocol_handlers = {}
self.active_connections = {}
self.message_queues = {}
self.setup_protocol_handlers()
def setup_protocol_handlers(self):
"""Setup handlers for different protocols"""
self.protocol_handlers = {
"websocket": self.handle_websocket_protocol,
"mqtt": self.handle_mqtt_protocol,
"slack": self.handle_slack_protocol,
"discord": self.handle_discord_protocol
}
async def on_startup(self):
"""Start protocol bridges"""
await super().on_startup()
# Start WebSocket server
asyncio.create_task(self.start_websocket_server())
# Connect to external services
await self.connect_external_protocols()
async def start_websocket_server(self):
"""Start WebSocket server for external connections"""
async def handle_websocket(websocket, path):
try:
await self.handle_websocket_connection(websocket, path)
except Exception as e:
print(f"WebSocket error: {e}")
# Start WebSocket server on port 8080
await websockets.serve(handle_websocket, "localhost", 8080)
print("🌐 WebSocket bridge server started on port 8080")
async def handle_websocket_connection(self, websocket, path):
"""Handle new WebSocket connection"""
connection_id = f"ws_{len(self.active_connections)}"
self.active_connections[connection_id] = {
"protocol": "websocket",
"connection": websocket,
"path": path
}
try:
async for message in websocket:
await self.process_external_message("websocket", connection_id, message)
finally:
del self.active_connections[connection_id]
async def process_external_message(self, protocol: str, connection_id: str, message: str):
"""Process message from external protocol"""
try:
# Parse message
if protocol == "websocket":
data = json.loads(message)
else:
data = {"content": message}
# Route to OpenAgents network
await self.route_external_message(protocol, connection_id, data)
except Exception as e:
print(f"Error processing {protocol} message: {e}")
async def route_external_message(self, protocol: str, connection_id: str, data: Dict[str, Any]):
"""Route external message to OpenAgents network"""
# Determine target channel or agent
target = data.get("target", "#bridge")
content = data.get("content", str(data))
# Add protocol information
formatted_message = f"[{protocol.upper()}] {content}"
ws = self.workspace()
if target.startswith("#"):
# Send to channel
await ws.channel(target).post(formatted_message)
else:
# Send to specific agent
await ws.agent(target).send(formatted_message)
async def on_channel_post(self, msg):
"""Bridge channel messages to external protocols"""
if msg.channel == "#bridge":
await self.broadcast_to_external_protocols(msg.text, msg.sender_id)
async def broadcast_to_external_protocols(self, message: str, sender_id: str):
"""Broadcast message to all connected external protocols"""
formatted_message = {
"sender": sender_id,
"content": message,
"timestamp": datetime.now().isoformat()
}
for connection_id, connection_info in self.active_connections.items():
protocol = connection_info["protocol"]
try:
if protocol == "websocket":
await connection_info["connection"].send(json.dumps(formatted_message))
# Add other protocol handlers here
except Exception as e:
print(f"Failed to send to {protocol} connection {connection_id}: {e}")
This comprehensive tutorial provides the foundation for creating highly customized agents with sophisticated behaviors, integrations, and capabilities. Use these patterns as building blocks to create agents tailored to your specific use cases and requirements.
Was this helpful?