OpenAgents Logo
OpenAgents Documentation
Tutorials › Customize Agents

Customize Agents

Learn to create custom agents with specialized behaviors, advanced logic patterns, external integrations, and sophisticated automation capabilities.

Customize Agents

This tutorial shows you how to create highly customized agents with specialized behaviors, advanced logic patterns, and sophisticated automation capabilities.

Table of Contents

  1. Agent Personality and Behavior
  2. Custom Agent Types
  3. Advanced Event Processing
  4. External Service Integration
  5. Custom Protocols and Communication
  6. Agent Specialization Patterns
  7. Performance and Optimization

Agent Personality and Behavior

Creating Distinct Agent Personalities

from openagents.agents.worker_agent import WorkerAgent
import random
 
class PersonalityAgent(WorkerAgent):
    """Agent with a distinct personality and communication style.

    The personality is chosen once, at construction time, through
    ``personality_type`` ("friendly", "professional" or "quirky"); any other
    value falls back to "friendly".  Replies to direct messages are picked
    from the personality's canned phrases and then decorated by
    :meth:`apply_personality_filter`.

    NOTE(review): ``process_request`` is awaited for messages that are not a
    greeting, thanks or farewell, but it is not defined in this class — it
    has to come from a subclass or the base class; confirm before use.
    """

    default_agent_id = "personality-bot"

    # Keyword groups used to classify an incoming message, checked in order.
    _GREETING_WORDS = ("hello", "hi", "hey")
    _THANKS_WORDS = ("thanks", "thank you")
    _FAREWELL_WORDS = ("bye", "goodbye", "farewell")

    def __init__(self, personality_type="friendly", **kwargs):
        """Store the personality type and load its traits and phrase lists.

        Args:
            personality_type: Key selecting the trait set and response
                patterns; unknown keys behave like "friendly".
            **kwargs: Forwarded unchanged to ``WorkerAgent.__init__``.
        """
        super().__init__(**kwargs)

        self.personality_type = personality_type
        self.personality_traits = self._load_personality_traits()
        self.response_patterns = self._load_response_patterns()
        self.emotional_state = "neutral"

    def _load_personality_traits(self):
        """Return the trait dictionary for ``self.personality_type``."""
        personalities = {
            "friendly": {
                "greeting_style": "warm",
                "formality": "casual",
                "enthusiasm": "high",
                "humor": "frequent",
                "empathy": "high"
            },
            "professional": {
                "greeting_style": "formal",
                "formality": "business",
                "enthusiasm": "moderate",
                "humor": "minimal",
                "empathy": "moderate"
            },
            "quirky": {
                "greeting_style": "unusual",
                "formality": "informal",
                "enthusiasm": "variable",
                "humor": "unexpected",
                "empathy": "high"
            }
        }
        return personalities.get(self.personality_type, personalities["friendly"])

    def _load_response_patterns(self):
        """Return the canned phrase lists for ``self.personality_type``."""
        patterns = {
            "friendly": {
                "greetings": ["Hello there! 😊", "Hi friend!", "Hey! Great to see you!"],
                "acknowledgments": ["Absolutely!", "I'd love to help!", "That sounds wonderful!"],
                "farewells": ["See you later! 👋", "Take care!", "Until next time!"]
            },
            "professional": {
                "greetings": ["Good day.", "Hello.", "Greetings."],
                "acknowledgments": ["Understood.", "I will assist you.", "Certainly."],
                "farewells": ["Good day.", "Thank you.", "Best regards."]
            },
            "quirky": {
                "greetings": ["Ahoy there! 🎭", "*tips virtual hat*", "Greetings, human specimen!"],
                "acknowledgments": ["Roger that, captain!", "*beep boop* Processing...", "Ooh, interesting!"],
                "farewells": ["Until we meet again! ⭐", "*disappears in a puff of digital smoke*", "Farewell, brave adventurer!"]
            }
        }
        return patterns.get(self.personality_type, patterns["friendly"])

    @staticmethod
    def _mentions_any(text, phrases):
        """Return True when *text* contains any of *phrases* as whole words.

        Whole-word matching prevents short keywords from firing inside
        longer words (for example "hi" inside "this").
        """
        import re  # local import keeps this snippet self-contained

        return any(
            re.search(rf"\b{re.escape(phrase)}\b", text) for phrase in phrases
        )

    async def on_direct(self, msg):
        """Reply to a direct message in the agent's personality."""
        response = await self.generate_personality_response(msg.text, msg.sender_id)

        ws = self.workspace()
        await ws.agent(msg.sender_id).send(response)

    async def generate_personality_response(self, text, sender_id):
        """Build the reply for *text* and run it through the personality filter.

        Greetings, thanks and farewells get a random canned phrase; anything
        else is delegated to ``self.process_request(text)``.

        Args:
            text: The incoming message text.
            sender_id: Identifier of the sender, passed on to the filter.

        Returns:
            The decorated response string.
        """
        text_lower = text.lower()

        # Determine response type
        if self._mentions_any(text_lower, self._GREETING_WORDS):
            response_base = random.choice(self.response_patterns["greetings"])
        elif self._mentions_any(text_lower, self._THANKS_WORDS):
            response_base = random.choice(self.response_patterns["acknowledgments"])
        elif self._mentions_any(text_lower, self._FAREWELL_WORDS):
            response_base = random.choice(self.response_patterns["farewells"])
        else:
            response_base = await self.process_request(text)

        # Add personality-specific modifications
        return self.apply_personality_filter(response_base, sender_id)

    def apply_personality_filter(self, response, sender_id):
        """Decorate *response* according to the loaded personality traits.

        * high enthusiasm appends a party emoji;
        * casual formality rewrites the standalone word "you" as "ya";
        * frequent humor appends a random joke roughly 30% of the time.

        Args:
            response: The undecorated response text.
            sender_id: Identifier of the recipient (currently unused).

        Returns:
            The decorated response string.
        """
        import re  # local import keeps this snippet self-contained

        if self.personality_traits["enthusiasm"] == "high":
            response += " 🎉"

        if self.personality_traits["formality"] == "casual":
            # Whole-word substitution only: a plain str.replace would also
            # rewrite "your" into "yar" and "young" into "yaung".
            response = re.sub(r"\byou\b", "ya", response)

        if self.personality_traits["humor"] == "frequent" and random.random() < 0.3:
            jokes = [
                " (Why did the agent cross the network? To get to the other side! 😄)",
                " *ba dum tss* 🥁",
                " (That's what she said! 😉)"
            ]
            response += random.choice(jokes)

        return response

Adaptive Behavior Based on Context

class AdaptiveAgent(WorkerAgent):
    """Agent whose replies are shaped by a per-user profile built from past messages.

    NOTE(review): ``get_standard_response`` and
    ``analyze_and_respond_thoroughly`` are awaited below but are not defined
    in this class; they must be provided elsewhere — confirm before use.
    """

    default_agent_id = "adaptive"

    def __init__(self, **kwargs):
        """Forward *kwargs* to the base class and create the empty stores."""
        super().__init__(**kwargs)
        # Only user_profiles is populated by the methods of this class.
        self.user_profiles = {}
        self.conversation_contexts = {}
        self.interaction_history = {}

    async def on_direct(self, msg):
        """Refresh the sender's profile, then send a profile-adapted reply."""
        self.update_user_profile(msg.sender_id, msg.text)
        reply = await self.get_adaptive_response(msg)
        await self.workspace().agent(msg.sender_id).send(reply)

    def update_user_profile(self, user_id, message):
        """Create the profile for *user_id* if needed and update it from *message*.

        Every call bumps ``interaction_count`` and overwrites both
        ``communication_style`` (from the message length) and
        ``expertise_level`` (from keywords in the message).
        """
        profile = self.user_profiles.setdefault(
            user_id,
            {
                "communication_style": "unknown",
                "expertise_level": "unknown",
                "interests": [],
                "interaction_count": 0,
                "preferred_response_length": "medium"
            },
        )
        profile["interaction_count"] += 1

        # Message length decides the style: <20 brief, >100 detailed.
        length = len(message)
        if length < 20:
            style = "brief"
        elif length > 100:
            style = "detailed"
        else:
            style = "moderate"
        profile["communication_style"] = style

        # Technical vocabulary wins over the "please ... help" beginner cue.
        lowered = message.lower()
        if any(
            term in lowered
            for term in ("algorithm", "function", "variable", "class", "method")
        ):
            level = "technical"
        elif "please" in lowered and "help" in lowered:
            level = "beginner"
        else:
            level = "intermediate"
        profile["expertise_level"] = level

    async def get_adaptive_response(self, msg):
        """Pick the response generator matching the sender's communication style."""
        style = self.user_profiles[msg.sender_id]["communication_style"]

        if style == "brief":
            return await self.get_brief_response(msg.text)
        if style == "detailed":
            return await self.get_detailed_response(msg.text)
        return await self.get_standard_response(msg.text)

    async def get_brief_response(self, text):
        """Return a short reply; "help" is checked before "status"."""
        lowered = text.lower()
        if "help" in lowered:
            return "Sure! What specifically?"
        if "status" in lowered:
            return "All good! ✅"
        return "Got it!"

    async def get_detailed_response(self, text):
        """Return a long-form reply, delegating non-help messages."""
        if "help" not in text.lower():
            return await self.analyze_and_respond_thoroughly(text)

        return (
            "I'd be happy to provide assistance! To give you the most helpful response, "
            "I'll need a bit more information about what specifically you're looking for. "
            "Are you working on a particular project, facing a technical challenge, "
            "or looking for general guidance? Please feel free to provide as much detail "
            "as you'd like, and I'll do my best to provide comprehensive support."
        )

Custom Agent Types

Specialized Domain Agents

class DataScienceAgent(WorkerAgent):
    """Agent specialized in data science tasks.

    Direct messages are classified by keyword into analysis, visualization,
    modeling or general requests and dispatched to the matching handler.

    NOTE(review): ``handle_visualization_request``,
    ``handle_modeling_request`` and ``provide_guidance`` are awaited by
    :meth:`on_direct` but are not defined in this class; they must be
    provided elsewhere — confirm before use.
    """

    default_agent_id = "data-scientist"
    default_channels = ["#data-science", "#analytics"]

    def __init__(self, **kwargs):
        """Forward *kwargs* to the base class and set up agent state."""
        super().__init__(**kwargs)
        self.supported_formats = ["csv", "json", "xlsx", "parquet"]
        self.analysis_types = ["descriptive", "predictive", "prescriptive"]
        # project_id -> project record, filled by handle_analysis_request.
        self.current_projects = {}

    async def on_direct(self, msg):
        """Dispatch a direct message to the handler for its request type."""
        request_type = self.classify_request(msg.text)

        if request_type == "analysis":
            await self.handle_analysis_request(msg)
        elif request_type == "visualization":
            await self.handle_visualization_request(msg)
        elif request_type == "modeling":
            await self.handle_modeling_request(msg)
        else:
            await self.provide_guidance(msg)

    def classify_request(self, text):
        """Classify *text* as "modeling", "visualization", "analysis" or "general".

        Categories are checked in that order, so a message containing both
        modeling and analysis keywords is reported as "modeling".  Keywords
        are matched as case-insensitive substrings, except the abbreviation
        "ml", which must appear as a standalone word so that words such as
        "html" or "xml" are not mistaken for modeling requests.
        """
        import re  # local import keeps this snippet self-contained

        text_lower = text.lower()
        words = set(re.findall(r"[a-z0-9]+", text_lower))

        analysis_keywords = ["analyze", "statistics", "correlation", "summary"]
        viz_keywords = ["plot", "chart", "graph", "visualize", "dashboard"]
        modeling_keywords = ["model", "predict", "forecast", "machine learning"]

        if "ml" in words or any(keyword in text_lower for keyword in modeling_keywords):
            return "modeling"
        elif any(keyword in text_lower for keyword in viz_keywords):
            return "visualization"
        elif any(keyword in text_lower for keyword in analysis_keywords):
            return "analysis"
        else:
            return "general"

    async def handle_analysis_request(self, msg):
        """Ask the requester for analysis inputs and record a new project.

        The project is stored in ``self.current_projects`` under an id of
        the form ``analysis_<n>`` with status ``"awaiting_data"``.
        """
        ws = self.workspace()

        # Start analysis workflow
        await ws.agent(msg.sender_id).send(
            "📊 Starting data analysis workflow...\n\n"
            "Please provide:\n"
            "1. Data source (file upload or description)\n"
            "2. Analysis type (descriptive/predictive/prescriptive)\n"
            "3. Specific questions you want answered\n\n"
            "I'll guide you through the process step by step!"
        )

        # Create analysis project.
        # NOTE(review): ids are derived from the current project count, so
        # they would repeat if projects were ever removed from the dict.
        project_id = f"analysis_{len(self.current_projects) + 1}"
        self.current_projects[project_id] = {
            "type": "analysis",
            "requester": msg.sender_id,
            "status": "awaiting_data",
            "created_at": msg.timestamp
        }

Multi-Modal Agents

class MultiModalAgent(WorkerAgent):
    """Agent that handles text, images, and files.

    Uploaded files are routed by extension to an image, audio or document
    processor.

    NOTE(review): ``process_text_input``, ``process_audio``,
    ``handle_unknown_file_type`` and ``analyze_document`` are awaited below
    but are not defined in this class; they must be provided elsewhere —
    confirm before use.
    """

    default_agent_id = "multimodal"

    # File extensions (lower-case, without the dot) handled by each processor.
    _IMAGE_EXTENSIONS = frozenset({'jpg', 'jpeg', 'png', 'gif', 'webp'})
    _AUDIO_EXTENSIONS = frozenset({'mp3', 'wav', 'ogg', 'flac'})
    _DOCUMENT_EXTENSIONS = frozenset({'pdf', 'docx', 'txt', 'md'})

    def __init__(self, **kwargs):
        """Forward *kwargs* to the base class and enable every modality."""
        super().__init__(**kwargs)
        self.vision_enabled = True
        self.audio_enabled = True
        self.document_processing_enabled = True

    async def on_direct(self, msg):
        """Handle text messages."""
        await self.process_text_input(msg)

    async def on_file_upload(self, msg):
        """Route an uploaded file to the processor matching its extension.

        A filename without a dot has no extension and is treated as an
        unknown file type.
        """
        # rpartition tells "no dot at all" apart from a real extension;
        # split('.')[-1] would treat a file literally named "pdf" as a PDF.
        _, dot, suffix = (msg.filename or "").rpartition('.')
        file_extension = suffix.lower() if dot else ""

        if file_extension in self._IMAGE_EXTENSIONS:
            await self.process_image(msg)
        elif file_extension in self._AUDIO_EXTENSIONS:
            await self.process_audio(msg)
        elif file_extension in self._DOCUMENT_EXTENSIONS:
            await self.process_document(msg)
        else:
            await self.handle_unknown_file_type(msg)

    async def process_image(self, msg):
        """Analyze an uploaded image and send the results to the sender.

        Sends a notice and returns early when ``vision_enabled`` is false.
        """
        ws = self.workspace()

        if not self.vision_enabled:
            await ws.agent(msg.sender_id).send(
                "Image processing is currently disabled."
            )
            return

        await ws.agent(msg.sender_id).send(
            f"🖼️ Analyzing image: {msg.filename}..."
        )

        # Simulate image analysis
        analysis_results = await self.analyze_image(msg.file_content)

        await ws.agent(msg.sender_id).send(
            f"📸 Image Analysis Results for {msg.filename}:\n\n"
            f"• Objects detected: {', '.join(analysis_results.get('objects', []))}\n"
            f"• Scene type: {analysis_results.get('scene', 'Unknown')}\n"
            f"• Dominant colors: {', '.join(analysis_results.get('colors', []))}\n"
            f"• Text detected: {analysis_results.get('text', 'None')}\n"
            f"• Confidence: {analysis_results.get('confidence', 0)}%"
        )

    async def analyze_image(self, image_content):
        """Return a fixed, simulated analysis result; *image_content* is ignored."""
        # Simulate computer vision processing
        return {
            "objects": ["person", "laptop", "desk"],
            "scene": "office environment",
            "colors": ["blue", "white", "gray"],
            "text": "OpenAgents",
            "confidence": 85
        }

    async def process_document(self, msg):
        """Analyze an uploaded document and send the results to the sender.

        Sends a notice and returns early when
        ``document_processing_enabled`` is false, mirroring how
        :meth:`process_image` honours ``vision_enabled``.
        """
        ws = self.workspace()

        if not self.document_processing_enabled:
            await ws.agent(msg.sender_id).send(
                "Document processing is currently disabled."
            )
            return

        await ws.agent(msg.sender_id).send(
            f"📄 Processing document: {msg.filename}..."
        )

        # Extract document content and metadata
        doc_analysis = await self.analyze_document(msg.file_content, msg.filename)

        await ws.agent(msg.sender_id).send(
            f"📋 Document Analysis for {msg.filename}:\n\n"
            f"• Document type: {doc_analysis.get('type', 'Unknown')}\n"
            f"• Page count: {doc_analysis.get('pages', 'N/A')}\n"
            f"• Word count: ~{doc_analysis.get('word_count', 0)}\n"
            f"• Key topics: {', '.join(doc_analysis.get('topics', []))}\n"
            f"• Summary: {doc_analysis.get('summary', 'No summary available')}"
        )

Advanced Event Processing

Event Pipeline and Transformation

from openagents.agents.worker_agent import WorkerAgent, on_event
import asyncio
from typing import Dict, List, Callable
 
class EventPipelineAgent(WorkerAgent):
    """Agent that runs every incoming event through a processing pipeline.

    The stages run in this order: filters (any filter returning a falsy
    value drops the event), transformers (each receives the previous
    transformer's output), aggregators (called for their side effects) and
    finally :meth:`route_event`.

    The transformer, aggregator and routing methods are placeholders meant
    to be overridden with real logic.
    """

    default_agent_id = "event-pipeline"

    def __init__(self, **kwargs):
        """Forward *kwargs* to the base class and register the default stages."""
        super().__init__(**kwargs)
        self.event_pipeline = []
        # Each registry maps a stage name to an async callable; stages run
        # in registration order.
        self.event_filters = {}
        self.event_transformers = {}
        self.event_aggregators = {}
        self.setup_pipeline()

    def setup_pipeline(self):
        """Register the default filters, transformers and aggregators."""
        # Add filters
        self.add_filter("spam_filter", self.filter_spam_events)
        self.add_filter("priority_filter", self.filter_priority_events)

        # Add transformers
        self.add_transformer("normalize", self.normalize_event_data)
        self.add_transformer("enrich", self.enrich_event_context)

        # Add aggregators
        self.add_aggregator("user_activity", self.aggregate_user_activity)
        self.add_aggregator("system_metrics", self.aggregate_system_metrics)

    def add_filter(self, name: str, filter_func: Callable):
        """Register *filter_func* under *name*, replacing any previous entry."""
        self.event_filters[name] = filter_func

    def add_transformer(self, name: str, transform_func: Callable):
        """Register *transform_func* under *name*, replacing any previous entry."""
        self.event_transformers[name] = transform_func

    def add_aggregator(self, name: str, aggregator_func: Callable):
        """Register *aggregator_func* under *name*, replacing any previous entry."""
        self.event_aggregators[name] = aggregator_func

    @on_event("*")
    async def process_all_events(self, context):
        """Run ``context.incoming_event`` through the whole pipeline."""
        event = context.incoming_event

        # Apply filters
        if not await self.apply_filters(event):
            return  # Event filtered out

        # Apply transformations
        transformed_event = await self.apply_transformers(event)

        # Apply aggregations
        await self.apply_aggregators(transformed_event)

        # Route to specific handlers
        await self.route_event(transformed_event)

    async def apply_filters(self, event) -> bool:
        """Return True only if every registered filter accepts *event*.

        Evaluation stops at the first filter that rejects the event.
        """
        for filter_func in self.event_filters.values():
            if not await filter_func(event):
                return False
        return True

    async def filter_spam_events(self, event) -> bool:
        """Return False when the stringified payload contains a spam indicator."""
        spam_indicators = ["spam", "advertisement", "promotional"]
        event_text = str(event.payload).lower()
        return not any(indicator in event_text for indicator in spam_indicators)

    async def filter_priority_events(self, event) -> bool:
        """Return True only for events whose name contains a priority keyword.

        With the default pipeline this means events that are not named like
        an error, alert, critical or emergency event are dropped.
        """
        priority_events = ["error", "alert", "critical", "emergency"]
        return any(priority in event.event_name.lower() for priority in priority_events)

    async def apply_transformers(self, event):
        """Chain *event* through every registered transformer and return the result."""
        transformed_event = event
        for transformer_func in self.event_transformers.values():
            transformed_event = await transformer_func(transformed_event)
        return transformed_event

    async def normalize_event_data(self, event):
        """Placeholder transformer; currently returns *event* unchanged."""
        # Standardize timestamp format
        # Normalize field names
        # Validate data types
        return event

    async def enrich_event_context(self, event):
        """Placeholder transformer; currently returns *event* unchanged."""
        # Add geolocation data
        # Add user profile information
        # Add system state information
        return event

    async def apply_aggregators(self, event):
        """Feed *event* to every registered aggregator, in registration order."""
        for aggregator_func in self.event_aggregators.values():
            await aggregator_func(event)

    async def aggregate_user_activity(self, event):
        """Placeholder aggregator; currently does nothing."""
        # Count events per user
        # Track most recent activity per user
        return None

    async def aggregate_system_metrics(self, event):
        """Placeholder aggregator; currently does nothing."""
        # Count events per event name
        # Track event throughput over time
        return None

    async def route_event(self, event):
        """Placeholder routing hook; currently does nothing.

        NOTE(review): assumes the base class does not already provide a
        ``route_event`` with different semantics — confirm before use.
        """
        # Dispatch the processed event to specialised handlers
        return None

Complex Event Correlation

class EventCorrelationAgent(WorkerAgent):
    """Agent that correlates and analyzes complex event patterns.

    Every incoming event is appended to a bounded buffer; after each append
    all correlation rules are evaluated.  A rule matches when its pattern
    occurs, in order (other events may be interleaved), among the buffered
    events that fall within the rule's timeframe.

    NOTE(review): a matched rule keeps matching on every subsequent event
    until the contributing events age out of the timeframe or the buffer, so
    its action may fire repeatedly; add de-duplication if that matters.
    NOTE(review): buffered timestamps come from ``context.timestamp`` and
    are compared against ``time.time()``; this assumes they are epoch
    seconds — confirm.
    """

    default_agent_id = "correlator"

    # Maximum number of events kept for correlation.
    _MAX_BUFFERED_EVENTS = 1000

    def __init__(self, **kwargs):
        """Forward *kwargs* to the base class and install the default rules."""
        super().__init__(**kwargs)
        self.event_buffer = []
        self.correlation_rules = []
        self.pattern_detectors = {}
        self.setup_correlation_rules()

    def setup_correlation_rules(self):
        """Define the correlation rules.

        Each rule has a ``name``, an ordered ``pattern`` of event names
        (``*`` is a wildcard), a ``timeframe`` in seconds and an async
        ``action`` called as ``action(rule, matching_events)``.
        """
        self.correlation_rules = [
            {
                "name": "login_anomaly",
                "pattern": ["user.login.failed", "user.login.failed", "user.login.failed"],
                "timeframe": 300,  # 5 minutes
                "action": self.handle_login_anomaly
            },
            {
                "name": "system_degradation",
                "pattern": ["system.performance.warning", "system.error.*"],
                "timeframe": 600,  # 10 minutes
                "action": self.handle_system_degradation
            }
        ]

    @on_event("*")
    async def collect_events(self, context):
        """Buffer the incoming event and re-evaluate all correlation rules."""
        event = context.incoming_event

        # Add to event buffer
        self.event_buffer.append({
            "event": event,
            "timestamp": context.timestamp,
            "context": context
        })

        # Maintain buffer size (keep only the most recent events)
        if len(self.event_buffer) > self._MAX_BUFFERED_EVENTS:
            self.event_buffer = self.event_buffer[-self._MAX_BUFFERED_EVENTS:]

        # Check for pattern matches
        await self.check_correlation_patterns()

    async def check_correlation_patterns(self):
        """Run the action of every rule whose pattern currently matches."""
        for rule in self.correlation_rules:
            if await self.pattern_matches(rule):
                await rule["action"](rule, self.get_matching_events(rule))

    def _recent_events(self, timeframe):
        """Return buffered entries no older than *timeframe* seconds."""
        import time  # local import keeps this snippet self-contained

        current_time = time.time()
        return [
            e for e in self.event_buffer
            if current_time - e["timestamp"] <= timeframe
        ]

    def get_matching_events(self, rule):
        """Return recent buffered entries matching any element of the rule's pattern."""
        return [
            entry for entry in self._recent_events(rule["timeframe"])
            if any(
                self.event_matches_pattern(entry["event"].event_name, element)
                for element in rule["pattern"]
            )
        ]

    async def pattern_matches(self, rule) -> bool:
        """Return True if the rule's pattern occurs in order within its timeframe.

        An empty pattern never matches.
        """
        pattern = rule["pattern"]
        if not pattern:
            return False

        # Walk the recent events once, advancing through the pattern each
        # time the next expected element is seen.
        pattern_index = 0
        for event_data in self._recent_events(rule["timeframe"]):
            event_name = event_data["event"].event_name

            if self.event_matches_pattern(event_name, pattern[pattern_index]):
                pattern_index += 1
                if pattern_index >= len(pattern):
                    return True

        return False

    def event_matches_pattern(self, event_name: str, pattern: str) -> bool:
        """Return True if *event_name* matches *pattern*.

        ``*`` matches any run of characters (including none) and may appear
        anywhere in the pattern; every other character is literal.  Without
        a wildcard the names must be equal.
        """
        if "*" not in pattern:
            return event_name == pattern

        import re  # local import keeps this snippet self-contained

        # Escape the literal pieces so only "*" acts as a wildcard.
        regex = ".*".join(re.escape(piece) for piece in pattern.split("*"))
        return re.fullmatch(regex, event_name, flags=re.DOTALL) is not None

    async def handle_login_anomaly(self, rule, matching_events):
        """Post a login-anomaly alert to the ``#security`` channel."""
        ws = self.workspace()

        # Extract user information
        user_events = [e for e in matching_events if "user.login.failed" in e["event"].event_name]
        # Events without a user_id are skipped and ids are stringified so
        # the join below cannot fail; sorting keeps the alert text stable.
        user_ids = (e["event"].payload.get("user_id") for e in user_events)
        affected_users = sorted({str(uid) for uid in user_ids if uid is not None})

        alert_message = (
            f"🚨 LOGIN ANOMALY DETECTED\n\n"
            f"Pattern: {rule['name']}\n"
            f"Affected users: {', '.join(affected_users)}\n"
            f"Event count: {len(user_events)}\n"
            f"Timeframe: {rule['timeframe']} seconds\n\n"
            f"Recommended actions:\n"
            f"• Review login logs\n"
            f"• Check for brute force attacks\n"
            f"• Consider temporary account lockouts"
        )

        await ws.channel("#security").post(alert_message)

    async def handle_system_degradation(self, rule, matching_events):
        """Post a system-degradation alert to the ``#system`` channel.

        Minimal handler so the ``system_degradation`` rule has an action;
        adjust the channel and message to your deployment.
        """
        ws = self.workspace()

        event_names = sorted({e["event"].event_name for e in matching_events})

        alert_message = (
            f"⚠️ SYSTEM DEGRADATION DETECTED\n\n"
            f"Pattern: {rule['name']}\n"
            f"Events: {', '.join(event_names)}\n"
            f"Event count: {len(matching_events)}\n"
            f"Timeframe: {rule['timeframe']} seconds"
        )

        await ws.channel("#system").post(alert_message)

External Service Integration

API Gateway Agent

import aiohttp
import asyncio
from typing import Dict, Any
 
class APIGatewayAgent(WorkerAgent):
    """Agent that acts as a gateway to external APIs.

    Direct messages are classified by keyword into a weather, news or
    translate request, which is then forwarded to the matching external API
    with retries.

    NOTE(review): ``check_rate_limit``, ``is_circuit_open``,
    ``reset_circuit_breaker``, ``handle_api_error``, ``show_available_apis``,
    ``call_news_api``, ``call_translate_api`` and
    ``extract_location_from_query`` are used below but are not defined in
    this class; they must be provided elsewhere — confirm before use.
    """

    default_agent_id = "api-gateway"

    def __init__(self, **kwargs):
        """Forward *kwargs* to the base class and load the API configurations."""
        super().__init__(**kwargs)
        self.api_configs = self.load_api_configurations()
        self.rate_limiters = {}
        self.circuit_breakers = {}
        self.api_cache = {}

    def load_api_configurations(self) -> Dict[str, Dict[str, Any]]:
        """Return the per-API settings keyed by API type.

        API keys are read from the ``WEATHER_API_KEY`` and ``NEWS_API_KEY``
        environment variables and are ``None`` when those are unset.
        """
        import os  # local import keeps this snippet self-contained

        return {
            "weather": {
                "base_url": "https://api.openweathermap.org/data/2.5",
                "api_key": os.getenv("WEATHER_API_KEY"),
                "rate_limit": {"requests": 60, "window": 60},  # 60 requests per minute
                "timeout": 10,
                "retry_attempts": 3
            },
            "news": {
                "base_url": "https://newsapi.org/v2",
                "api_key": os.getenv("NEWS_API_KEY"),
                "rate_limit": {"requests": 100, "window": 3600},  # 100 requests per hour
                "timeout": 15,
                "retry_attempts": 2
            },
            "translate": {
                "base_url": "https://api.mymemory.translated.net",
                "api_key": None,  # Free tier
                "rate_limit": {"requests": 1000, "window": 86400},  # 1000 requests per day
                "timeout": 5,
                "retry_attempts": 2
            }
        }

    async def on_direct(self, msg):
        """Handle the message as an API request, or list the available APIs."""
        request_type = self.parse_api_request(msg.text)

        if request_type:
            await self.handle_api_request(msg, request_type)
        else:
            await self.show_available_apis(msg)

    def parse_api_request(self, text: str) -> "str | None":
        """Return the API type requested in *text*, or None if none is recognised.

        Keywords are matched as case-insensitive substrings; weather is
        checked first, then news, then translate.
        """
        text_lower = text.lower()

        if any(word in text_lower for word in ["weather", "temperature", "forecast"]):
            return "weather"
        elif any(word in text_lower for word in ["news", "headlines", "articles"]):
            return "news"
        elif any(word in text_lower for word in ["translate", "translation"]):
            return "translate"
        else:
            return None

    async def handle_api_request(self, msg, api_type: str):
        """Run one API request for *msg* and send the result to the sender.

        The request is refused when the rate limit is exceeded.  Any
        exception, including an open circuit breaker, is passed as text to
        ``handle_api_error``.
        """
        if not await self.check_rate_limit(api_type, msg.sender_id):
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(
                f"⏰ Rate limit exceeded for {api_type} API. Please try again later."
            )
            return

        try:
            # Check circuit breaker
            if self.is_circuit_open(api_type):
                raise Exception(f"{api_type} API is currently unavailable")

            # Make API request
            result = await self.make_api_request(api_type, msg.text, msg.sender_id)

            # Send response
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(result)

            # Reset circuit breaker on success
            self.reset_circuit_breaker(api_type)

        except Exception as e:
            # Top-level boundary for this request: report instead of crashing.
            await self.handle_api_error(msg, api_type, str(e))

    async def make_api_request(self, api_type: str, query: str, user_id: str) -> str:
        """Call the external API for *api_type*, retrying on failure.

        Timeouts back off exponentially (1s, 2s, 4s, ...); other errors wait
        one second between attempts.  At least one attempt is always made.

        Args:
            api_type: Key into ``self.api_configs``.
            query: The user's message text.
            user_id: Identifier of the requesting user (currently unused).

        Returns:
            The formatted response text.

        Raises:
            KeyError: If *api_type* has no configuration.
            ValueError: If *api_type* is configured but has no request handler.
            Exception: On timeout after the last attempt; any other error
                from the last attempt is re-raised unchanged.
        """
        config = self.api_configs[api_type]
        if api_type not in ("weather", "news", "translate"):
            # Fail loudly instead of silently returning None.
            raise ValueError(f"No request handler for API type: {api_type}")

        attempts = max(1, config["retry_attempts"])
        timeout = aiohttp.ClientTimeout(total=config["timeout"])

        for attempt in range(attempts):
            is_last_attempt = attempt == attempts - 1
            try:
                async with aiohttp.ClientSession(timeout=timeout) as session:
                    if api_type == "weather":
                        return await self.call_weather_api(session, config, query)
                    if api_type == "news":
                        return await self.call_news_api(session, config, query)
                    return await self.call_translate_api(session, config, query)

            except asyncio.TimeoutError as exc:
                if is_last_attempt:
                    raise Exception(f"API timeout after {attempts} attempts") from exc
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

            except Exception:
                if is_last_attempt:
                    raise  # bare raise preserves the original traceback
                await asyncio.sleep(1)

    async def call_weather_api(self, session, config, query):
        """Fetch current weather for the location mentioned in *query*.

        Raises:
            Exception: If the API answers with a status other than 200.
        """
        # Extract location from query
        location = self.extract_location_from_query(query)

        url = f"{config['base_url']}/weather"
        params = {
            "q": location,
            "appid": config["api_key"],
            "units": "metric"
        }

        async with session.get(url, params=params) as response:
            if response.status == 200:
                data = await response.json()
                return self.format_weather_response(data, location)
            else:
                raise Exception(f"Weather API error: {response.status}")

    def format_weather_response(self, data, location):
        """Format the weather payload *data* as a short text report.

        Reads ``data["main"]["temp"]``, ``data["main"]["humidity"]`` and
        ``data["weather"][0]["description"]``.
        """
        temp = data["main"]["temp"]
        description = data["weather"][0]["description"]
        humidity = data["main"]["humidity"]

        return (
            f"🌤️ Weather for {location}:\n"
            f"Temperature: {temp}°C\n"
            f"Conditions: {description.title()}\n"
            f"Humidity: {humidity}%"
        )

Database Integration Agent

import asyncpg
import asyncio
from datetime import datetime
 
class DatabaseAgent(WorkerAgent):
    """Agent that integrates with databases for data storage and retrieval.

    Connection settings come from the ``DB_HOST``, ``DB_PORT``, ``DB_NAME``,
    ``DB_USER`` and ``DB_PASSWORD`` environment variables.  The connection
    pool is created in :meth:`on_startup`; if that fails the error is
    printed and ``self.db_pool`` stays ``None``.

    NOTE(review): ``parse_database_request``, ``extract_search_term``,
    ``handle_analytics_request``, ``handle_profile_request``,
    ``show_database_help`` and ``handle_database_error`` are used below but
    are not defined in this class; they must be provided elsewhere —
    confirm before use.
    NOTE(review): the pool is never closed in this class; consider closing
    it in the agent's shutdown hook.
    """

    default_agent_id = "database"

    # Maximum number of content characters shown per search result.
    _SNIPPET_LENGTH = 100

    def __init__(self, **kwargs):
        """Forward *kwargs* to the base class and read the connection settings."""
        import os  # local import keeps this snippet self-contained

        super().__init__(**kwargs)
        self.db_pool = None
        self.query_cache = {}
        self.connection_config = {
            "host": os.getenv("DB_HOST", "localhost"),
            # Environment values are strings; always hand the driver an int.
            "port": int(os.getenv("DB_PORT", "5432")),
            "database": os.getenv("DB_NAME", "openagents"),
            "user": os.getenv("DB_USER", "postgres"),
            "password": os.getenv("DB_PASSWORD", "")
        }

    async def on_startup(self):
        """Create the connection pool, initialize the schema and announce readiness."""
        await super().on_startup()

        try:
            self.db_pool = await asyncpg.create_pool(**self.connection_config)

            # Initialize database schema
            await self.initialize_schema()

            ws = self.workspace()
            await ws.channel("#system").post("🗄️ Database agent connected and ready!")

        except Exception as e:
            # Best effort: the agent stays up without a database connection.
            print(f"Database connection failed: {e}")

    async def initialize_schema(self):
        """Create the ``agent_interactions``, ``user_profiles`` and
        ``system_metrics`` tables if they do not exist yet."""
        schema_sql = """
        CREATE TABLE IF NOT EXISTS agent_interactions (
            id SERIAL PRIMARY KEY,
            agent_id VARCHAR(255),
            user_id VARCHAR(255),
            message_type VARCHAR(50),
            content TEXT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        
        CREATE TABLE IF NOT EXISTS user_profiles (
            user_id VARCHAR(255) PRIMARY KEY,
            preferences JSONB,
            interaction_count INTEGER DEFAULT 0,
            last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        
        CREATE TABLE IF NOT EXISTS system_metrics (
            id SERIAL PRIMARY KEY,
            metric_name VARCHAR(255),
            metric_value DECIMAL,
            tags JSONB,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """

        async with self.db_pool.acquire() as connection:
            await connection.execute(schema_sql)

    async def on_direct(self, msg):
        """Dispatch a direct message to the handler for its query type."""
        query_type = self.parse_database_request(msg.text)

        if query_type == "search":
            await self.handle_search_request(msg)
        elif query_type == "analytics":
            await self.handle_analytics_request(msg)
        elif query_type == "profile":
            await self.handle_profile_request(msg)
        else:
            await self.show_database_help(msg)

    async def handle_search_request(self, msg):
        """Search logged interactions for the term in *msg* and reply with up to 10 hits.

        The term is passed as a bound query parameter.  Any failure,
        including a missing connection pool, is reported through
        ``handle_database_error``.
        """
        search_term = self.extract_search_term(msg.text)

        if not search_term:
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(
                "Please specify what you'd like to search for."
            )
            return

        try:
            async with self.db_pool.acquire() as connection:
                query = """
                SELECT agent_id, user_id, content, timestamp
                FROM agent_interactions
                WHERE content ILIKE $1
                ORDER BY timestamp DESC
                LIMIT 10
                """

                results = await connection.fetch(query, f"%{search_term}%")

                if results:
                    parts = [f"🔍 Search results for '{search_term}':\n\n"]
                    for row in results:
                        # content is a nullable column; treat NULL as empty.
                        content = row['content'] or ""
                        snippet = content[:self._SNIPPET_LENGTH]
                        if len(content) > self._SNIPPET_LENGTH:
                            snippet += "..."  # only mark real truncation
                        parts.append(
                            f"• {row['timestamp']}: {row['agent_id']} → {row['user_id']}\n"
                        )
                        parts.append(f"  {snippet}\n\n")
                    response = "".join(parts)
                else:
                    response = f"No results found for '{search_term}'"

                ws = self.workspace()
                await ws.agent(msg.sender_id).send(response)

        except Exception as e:
            await self.handle_database_error(msg, str(e))

    async def log_interaction(self, agent_id: str, user_id: str, message_type: str, content: str):
        """Insert one row into ``agent_interactions``; failures are printed, not raised."""
        try:
            async with self.db_pool.acquire() as connection:
                await connection.execute(
                    """
                    INSERT INTO agent_interactions (agent_id, user_id, message_type, content)
                    VALUES ($1, $2, $3, $4)
                    """,
                    agent_id, user_id, message_type, content
                )
        except Exception as e:
            # Logging is best effort and must never break the caller.
            print(f"Failed to log interaction: {e}")

Custom Protocols and Communication

Protocol Bridge Agent

import asyncio
import json
from datetime import datetime
from typing import Dict, Any

import websockets
 
class ProtocolBridgeAgent(WorkerAgent):
    """Agent that bridges different communication protocols.

    Inbound: external clients connect to a WebSocket server on
    ``localhost:8080``; each message they send is forwarded into the
    OpenAgents workspace (to a channel or to a specific agent).

    Outbound: every post seen on the ``#bridge`` channel is broadcast, as a
    JSON document, to all currently connected external clients.
    """

    default_agent_id = "protocol-bridge"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.protocol_handlers = {}
        self.active_connections = {}
        self.message_queues = {}
        # Monotonic counter for connection ids. Deriving ids from
        # len(active_connections) reuses an id that is still live once any
        # earlier connection has closed.
        self._next_connection_number = 0
        # Strong references so the server task and the server object are not
        # garbage-collected while the agent is running.
        self._websocket_server_task = None
        self._websocket_server = None
        self.setup_protocol_handlers()

    def setup_protocol_handlers(self):
        """Register the per-protocol handlers this agent actually implements.

        A protocol ``<name>`` is handled by a method called
        ``handle_<name>_protocol``. Only methods that exist on the instance
        are registered, so a subclass can implement a subset of the protocols
        without construction failing on the missing ones.
        """
        handlers = {}
        for protocol in ("websocket", "mqtt", "slack", "discord"):
            handler = getattr(self, f"handle_{protocol}_protocol", None)
            if handler is not None:
                handlers[protocol] = handler
        self.protocol_handlers = handlers

    async def on_startup(self):
        """Start protocol bridges."""
        await super().on_startup()

        # Start WebSocket server; keep a reference to the task because the
        # event loop itself only holds a weak reference to it.
        self._websocket_server_task = asyncio.create_task(self.start_websocket_server())

        # Connect to external services, if a connect hook is provided.
        connect = getattr(self, "connect_external_protocols", None)
        if connect is not None:
            await connect()

    async def start_websocket_server(self):
        """Start the WebSocket server that accepts external connections."""
        # ``path`` is optional: older ``websockets`` releases call the handler
        # with (websocket, path), newer ones with the connection only.
        async def handle_websocket(websocket, path=None):
            try:
                await self.handle_websocket_connection(websocket, path)
            except Exception as e:
                print(f"WebSocket error: {e}")

        # Start WebSocket server on port 8080
        self._websocket_server = await websockets.serve(handle_websocket, "localhost", 8080)
        print("🌐 WebSocket bridge server started on port 8080")

    async def handle_websocket_connection(self, websocket, path=None):
        """Register a new WebSocket connection and pump its messages.

        Args:
            websocket: The connection object handed over by the server.
            path: Request path, when the server passes it. If omitted it is
                looked up on the connection object (``websocket.path`` or
                ``websocket.request.path``) and stays ``None`` if neither
                attribute is available.

        The connection is removed from ``active_connections`` when the
        message loop ends, whether normally or through an exception.
        """
        if path is None:
            path = getattr(websocket, "path", None)
        if path is None:
            path = getattr(getattr(websocket, "request", None), "path", None)

        connection_id = f"ws_{self._next_connection_number}"
        self._next_connection_number += 1
        self.active_connections[connection_id] = {
            "protocol": "websocket",
            "connection": websocket,
            "path": path
        }

        try:
            async for message in websocket:
                await self.process_external_message("websocket", connection_id, message)
        finally:
            # pop() rather than del: cleanup must not raise if the entry has
            # already been removed.
            self.active_connections.pop(connection_id, None)

    async def process_external_message(self, protocol: str, connection_id: str, message: str):
        """Parse one message from an external protocol and route it.

        WebSocket messages are decoded as JSON; messages from any other
        protocol are wrapped as ``{"content": message}``. Parsing and routing
        errors are printed and swallowed so one bad message does not tear
        down the connection.
        """
        try:
            # Parse message
            if protocol == "websocket":
                data = json.loads(message)
                # Valid JSON need not be an object (e.g. "hi" or [1, 2]);
                # routing expects a mapping.
                if not isinstance(data, dict):
                    data = {"content": data}
            else:
                data = {"content": message}

            # Route to OpenAgents network
            await self.route_external_message(protocol, connection_id, data)

        except Exception as e:
            print(f"Error processing {protocol} message: {e}")

    async def route_external_message(self, protocol: str, connection_id: str, data: Dict[str, Any]):
        """Route an external message into the OpenAgents network.

        Args:
            protocol: Name of the originating protocol; used as a prefix tag.
            connection_id: Id of the originating connection.
            data: Parsed message. ``target`` selects the destination
                (default ``"#bridge"``); ``content`` is the text to deliver
                (default: the string form of ``data``).
        """
        # Determine target channel or agent
        target = data.get("target", "#bridge")
        content = data.get("content", str(data))

        # Add protocol information
        formatted_message = f"[{protocol.upper()}] {content}"

        ws = self.workspace()

        if target.startswith("#"):
            # Send to channel
            await ws.channel(target).post(formatted_message)
        else:
            # Send to specific agent
            await ws.agent(target).send(formatted_message)

    async def on_channel_post(self, msg):
        """Bridge channel messages to external protocols."""
        if msg.channel == "#bridge":
            await self.broadcast_to_external_protocols(msg.text, msg.sender_id)

    async def broadcast_to_external_protocols(self, message: str, sender_id: str):
        """Broadcast a message to all connected external protocols.

        The payload is a JSON object with ``sender``, ``content`` and an
        ISO-8601 ``timestamp``. A failure on one connection is printed and
        does not stop delivery to the others.
        """
        formatted_message = {
            "sender": sender_id,
            "content": message,
            "timestamp": datetime.now().isoformat()
        }
        payload = json.dumps(formatted_message)

        # Iterate over a snapshot: connections may be added or removed by
        # other tasks while this coroutine is suspended in send().
        for connection_id, connection_info in list(self.active_connections.items()):
            protocol = connection_info["protocol"]

            try:
                if protocol == "websocket":
                    await connection_info["connection"].send(payload)
                # Add other protocol handlers here

            except Exception as e:
                print(f"Failed to send to {protocol} connection {connection_id}: {e}")

This comprehensive tutorial provides the foundation for creating highly customized agents with sophisticated behaviors, integrations, and capabilities. Use these patterns as building blocks to create agents tailored to your specific use cases and requirements.

Was this helpful?