Tutorials自定义代理
Updated February 24, 2026
自定义代理
学习创建具有专门行为、高级逻辑模式、外部集成以及复杂自动化能力的自定义代理。
自定义代理
本教程向您展示如何创建高度定制的代理,具备专门的行为、先进的逻辑模式和复杂的自动化能力。
目录
代理个性与行为
创建独特的代理个性
from openagents.agents.worker_agent import WorkerAgent
import random
class PersonalityAgent(WorkerAgent):
"""Agent with a distinct personality and communication style"""
default_agent_id = "personality-bot"
def __init__(self, personality_type="friendly", **kwargs):
super().__init__(**kwargs)
self.personality_type = personality_type
self.personality_traits = self._load_personality_traits()
self.response_patterns = self._load_response_patterns()
self.emotional_state = "neutral"
def _load_personality_traits(self):
"""Define personality-specific traits"""
personalities = {
"friendly": {
"greeting_style": "warm",
"formality": "casual",
"enthusiasm": "high",
"humor": "frequent",
"empathy": "high"
},
"professional": {
"greeting_style": "formal",
"formality": "business",
"enthusiasm": "moderate",
"humor": "minimal",
"empathy": "moderate"
},
"quirky": {
"greeting_style": "unusual",
"formality": "informal",
"enthusiasm": "variable",
"humor": "unexpected",
"empathy": "high"
}
}
return personalities.get(self.personality_type, personalities["friendly"])
def _load_response_patterns(self):
"""Define response patterns for different personalities"""
patterns = {
"friendly": {
"greetings": ["Hello there! 😊", "Hi friend!", "Hey! Great to see you!"],
"acknowledgments": ["Absolutely!", "I'd love to help!", "That sounds wonderful!"],
"farewells": ["See you later! 👋", "Take care!", "Until next time!"]
},
"professional": {
"greetings": ["Good day.", "Hello.", "Greetings."],
"acknowledgments": ["Understood.", "I will assist you.", "Certainly."],
"farewells": ["Good day.", "Thank you.", "Best regards."]
},
"quirky": {
"greetings": ["Ahoy there! 🎭", "*tips virtual hat*", "Greetings, human specimen!"],
"acknowledgments": ["Roger that, captain!", "*beep boop* Processing...", "Ooh, interesting!"],
"farewells": ["Until we meet again! ⭐", "*disappears in a puff of digital smoke*", "Farewell, brave adventurer!"]
}
}
return patterns.get(self.personality_type, patterns["friendly"])
async def on_direct(self, msg):
"""Respond with personality-appropriate communication"""
response = await self.generate_personality_response(msg.text, msg.sender_id)
ws = self.workspace()
await ws.agent(msg.sender_id).send(response)
async def generate_personality_response(self, text, sender_id):
"""Generate response based on personality"""
text_lower = text.lower()
# Determine response type
if any(greeting in text_lower for greeting in ["hello", "hi", "hey"]):
response_base = random.choice(self.response_patterns["greetings"])
elif any(word in text_lower for word in ["thanks", "thank you"]):
response_base = random.choice(self.response_patterns["acknowledgments"])
elif any(word in text_lower for word in ["bye", "goodbye", "farewell"]):
response_base = random.choice(self.response_patterns["farewells"])
else:
response_base = await self.process_request(text)
# Add personality-specific modifications
return self.apply_personality_filter(response_base, sender_id)
def apply_personality_filter(self, response, sender_id):
"""Apply personality-specific modifications to response"""
if self.personality_traits["enthusiasm"] == "high":
response += " 🎉"
if self.personality_traits["formality"] == "casual":
response = response.replace("you", "ya").replace("your", "your")
if self.personality_traits["humor"] == "frequent" and random.random() < 0.3:
jokes = [
" (Why did the agent cross the network? To get to the other side! 😄)",
" *ba dum tss* 🥁",
" (That's what she said! 😉)"
]
response += random.choice(jokes)
return response基于上下文的自适应行为
class AdaptiveAgent(WorkerAgent):
"""Agent that adapts behavior based on context and interaction history"""
default_agent_id = "adaptive"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.user_profiles = {}
self.conversation_contexts = {}
self.interaction_history = {}
async def on_direct(self, msg):
"""Adapt response based on user profile and context"""
# Update user profile
self.update_user_profile(msg.sender_id, msg.text)
# Get adaptive response
response = await self.get_adaptive_response(msg)
ws = self.workspace()
await ws.agent(msg.sender_id).send(response)
def update_user_profile(self, user_id, message):
"""Build and update user profile based on interactions"""
if user_id not in self.user_profiles:
self.user_profiles[user_id] = {
"communication_style": "unknown",
"expertise_level": "unknown",
"interests": [],
"interaction_count": 0,
"preferred_response_length": "medium"
}
profile = self.user_profiles[user_id]
profile["interaction_count"] += 1
# Analyze communication style
if len(message) < 20:
profile["communication_style"] = "brief"
elif len(message) > 100:
profile["communication_style"] = "detailed"
else:
profile["communication_style"] = "moderate"
# Detect expertise level
technical_terms = ["algorithm", "function", "variable", "class", "method"]
if any(term in message.lower() for term in technical_terms):
profile["expertise_level"] = "technical"
elif "please" in message.lower() and "help" in message.lower():
profile["expertise_level"] = "beginner"
else:
profile["expertise_level"] = "intermediate"
async def get_adaptive_response(self, msg):
"""Generate response adapted to user profile"""
profile = self.user_profiles[msg.sender_id]
# Adapt response based on communication style
if profile["communication_style"] == "brief":
return await self.get_brief_response(msg.text)
elif profile["communication_style"] == "detailed":
return await self.get_detailed_response(msg.text)
else:
return await self.get_standard_response(msg.text)
async def get_brief_response(self, text):
"""Generate concise responses for brief communicators"""
if "help" in text.lower():
return "Sure! What specifically?"
elif "status" in text.lower():
return "All good! ✅"
else:
return "Got it!"
async def get_detailed_response(self, text):
"""Generate comprehensive responses for detailed communicators"""
if "help" in text.lower():
return (
"I'd be happy to provide assistance! To give you the most helpful response, "
"I'll need a bit more information about what specifically you're looking for. "
"Are you working on a particular project, facing a technical challenge, "
"or looking for general guidance? Please feel free to provide as much detail "
"as you'd like, and I'll do my best to provide comprehensive support."
)
else:
return await self.analyze_and_respond_thoroughly(text)自定义代理类型
专业领域代理
class DataScienceAgent(WorkerAgent):
"""Agent specialized in data science tasks"""
default_agent_id = "data-scientist"
default_channels = ["#data-science", "#analytics"]
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.supported_formats = ["csv", "json", "xlsx", "parquet"]
self.analysis_types = ["descriptive", "predictive", "prescriptive"]
self.current_projects = {}
async def on_direct(self, msg):
"""Handle data science requests"""
request_type = self.classify_request(msg.text)
if request_type == "analysis":
await self.handle_analysis_request(msg)
elif request_type == "visualization":
await self.handle_visualization_request(msg)
elif request_type == "modeling":
await self.handle_modeling_request(msg)
else:
await self.provide_guidance(msg)
def classify_request(self, text):
"""Classify the type of data science request"""
text_lower = text.lower()
analysis_keywords = ["analyze", "statistics", "correlation", "summary"]
viz_keywords = ["plot", "chart", "graph", "visualize", "dashboard"]
modeling_keywords = ["model", "predict", "forecast", "machine learning", "ml"]
if any(keyword in text_lower for keyword in modeling_keywords):
return "modeling"
elif any(keyword in text_lower for keyword in viz_keywords):
return "visualization"
elif any(keyword in text_lower for keyword in analysis_keywords):
return "analysis"
else:
return "general"
async def handle_analysis_request(self, msg):
"""Handle data analysis requests"""
ws = self.workspace()
# Start analysis workflow
await ws.agent(msg.sender_id).send(
"📊 Starting data analysis workflow...\n\n"
"Please provide:\n"
"1. Data source (file upload or description)\n"
"2. Analysis type (descriptive/predictive/prescriptive)\n"
"3. Specific questions you want answered\n\n"
"I'll guide you through the process step by step!"
)
# Create analysis project
project_id = f"analysis_{len(self.current_projects) + 1}"
self.current_projects[project_id] = {
"type": "analysis",
"requester": msg.sender_id,
"status": "awaiting_data",
"created_at": msg.timestamp
}多模态代理
class MultiModalAgent(WorkerAgent):
"""Agent that handles text, images, and files"""
default_agent_id = "multimodal"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.vision_enabled = True
self.audio_enabled = True
self.document_processing_enabled = True
async def on_direct(self, msg):
"""Handle text messages"""
await self.process_text_input(msg)
async def on_file_upload(self, msg):
"""Handle file uploads based on type"""
file_extension = msg.filename.split('.')[-1].lower()
if file_extension in ['jpg', 'jpeg', 'png', 'gif', 'webp']:
await self.process_image(msg)
elif file_extension in ['mp3', 'wav', 'ogg', 'flac']:
await self.process_audio(msg)
elif file_extension in ['pdf', 'docx', 'txt', 'md']:
await self.process_document(msg)
else:
await self.handle_unknown_file_type(msg)
async def process_image(self, msg):
"""Process image files with computer vision"""
ws = self.workspace()
if not self.vision_enabled:
await ws.agent(msg.sender_id).send(
"Image processing is currently disabled."
)
return
await ws.agent(msg.sender_id).send(
f"🖼️ Analyzing image: {msg.filename}..."
)
# Simulate image analysis
analysis_results = await self.analyze_image(msg.file_content)
await ws.agent(msg.sender_id).send(
f"📸 Image Analysis Results for {msg.filename}:\n\n"
f"• Objects detected: {', '.join(analysis_results.get('objects', []))}\n"
f"• Scene type: {analysis_results.get('scene', 'Unknown')}\n"
f"• Dominant colors: {', '.join(analysis_results.get('colors', []))}\n"
f"• Text detected: {analysis_results.get('text', 'None')}\n"
f"• Confidence: {analysis_results.get('confidence', 0)}%"
)
async def analyze_image(self, image_content):
"""Perform computer vision analysis on image"""
# Simulate computer vision processing
return {
"objects": ["person", "laptop", "desk"],
"scene": "office environment",
"colors": ["blue", "white", "gray"],
"text": "OpenAgents",
"confidence": 85
}
async def process_document(self, msg):
"""Process and extract information from documents"""
ws = self.workspace()
await ws.agent(msg.sender_id).send(
f"📄 Processing document: {msg.filename}..."
)
# Extract document content and metadata
doc_analysis = await self.analyze_document(msg.file_content, msg.filename)
await ws.agent(msg.sender_id).send(
f"📋 Document Analysis for {msg.filename}:\n\n"
f"• Document type: {doc_analysis.get('type', 'Unknown')}\n"
f"• Page count: {doc_analysis.get('pages', 'N/A')}\n"
f"• Word count: ~{doc_analysis.get('word_count', 0)}\n"
f"• Key topics: {', '.join(doc_analysis.get('topics', []))}\n"
f"• Summary: {doc_analysis.get('summary', 'No summary available')}"
)高级事件处理
事件管道与转换
from openagents.agents.worker_agent import WorkerAgent, on_event
import asyncio
from typing import Dict, List, Callable
class EventPipelineAgent(WorkerAgent):
"""Agent with advanced event processing pipeline"""
default_agent_id = "event-pipeline"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.event_pipeline = []
self.event_filters = {}
self.event_transformers = {}
self.event_aggregators = {}
self.setup_pipeline()
def setup_pipeline(self):
"""Configure event processing pipeline"""
# Add filters
self.add_filter("spam_filter", self.filter_spam_events)
self.add_filter("priority_filter", self.filter_priority_events)
# Add transformers
self.add_transformer("normalize", self.normalize_event_data)
self.add_transformer("enrich", self.enrich_event_context)
# Add aggregators
self.add_aggregator("user_activity", self.aggregate_user_activity)
self.add_aggregator("system_metrics", self.aggregate_system_metrics)
def add_filter(self, name: str, filter_func: Callable):
"""Add event filter to pipeline"""
self.event_filters[name] = filter_func
def add_transformer(self, name: str, transform_func: Callable):
"""Add event transformer to pipeline"""
self.event_transformers[name] = transform_func
def add_aggregator(self, name: str, aggregator_func: Callable):
"""Add event aggregator to pipeline"""
self.event_aggregators[name] = aggregator_func
@on_event("*")
async def process_all_events(self, context):
"""Process all events through pipeline"""
event = context.incoming_event
# Apply filters
if not await self.apply_filters(event):
return # Event filtered out
# Apply transformations
transformed_event = await self.apply_transformers(event)
# Apply aggregations
await self.apply_aggregators(transformed_event)
# Route to specific handlers
await self.route_event(transformed_event)
async def apply_filters(self, event) -> bool:
"""Apply all filters to event"""
for filter_name, filter_func in self.event_filters.items():
if not await filter_func(event):
return False
return True
async def filter_spam_events(self, event) -> bool:
"""Filter out spam events"""
spam_indicators = ["spam", "advertisement", "promotional"]
event_text = str(event.payload).lower()
return not any(indicator in event_text for indicator in spam_indicators)
async def filter_priority_events(self, event) -> bool:
"""Filter events based on priority"""
priority_events = ["error", "alert", "critical", "emergency"]
return any(priority in event.event_name.lower() for priority in priority_events)
async def apply_transformers(self, event):
"""Apply all transformers to event"""
transformed_event = event
for transformer_name, transformer_func in self.event_transformers.items():
transformed_event = await transformer_func(transformed_event)
return transformed_event
async def normalize_event_data(self, event):
"""Normalize event data format"""
# Standardize timestamp format
# Normalize field names
# Validate data types
return event
async def enrich_event_context(self, event):
"""Enrich event with additional context"""
# Add geolocation data
# Add user profile information
# Add system state information
return event复杂事件关联
class EventCorrelationAgent(WorkerAgent):
"""Agent that correlates and analyzes complex event patterns"""
default_agent_id = "correlator"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.event_buffer = []
self.correlation_rules = []
self.pattern_detectors = {}
self.setup_correlation_rules()
def setup_correlation_rules(self):
"""Define correlation rules and patterns"""
self.correlation_rules = [
{
"name": "login_anomaly",
"pattern": ["user.login.failed", "user.login.failed", "user.login.failed"],
"timeframe": 300, # 5 minutes
"action": self.handle_login_anomaly
},
{
"name": "system_degradation",
"pattern": ["system.performance.warning", "system.error.*"],
"timeframe": 600, # 10 minutes
"action": self.handle_system_degradation
}
]
@on_event("*")
async def collect_events(self, context):
"""Collect events for correlation analysis"""
event = context.incoming_event
# Add to event buffer
self.event_buffer.append({
"event": event,
"timestamp": context.timestamp,
"context": context
})
# Maintain buffer size (keep last 1000 events)
if len(self.event_buffer) > 1000:
self.event_buffer = self.event_buffer[-1000:]
# Check for pattern matches
await self.check_correlation_patterns()
async def check_correlation_patterns(self):
"""Check if any correlation patterns are matched"""
for rule in self.correlation_rules:
if await self.pattern_matches(rule):
await rule["action"](rule, self.get_matching_events(rule))
async def pattern_matches(self, rule) -> bool:
"""Check if a correlation pattern is matched"""
pattern = rule["pattern"]
timeframe = rule["timeframe"]
current_time = time.time()
# Filter events within timeframe
recent_events = [
e for e in self.event_buffer
if current_time - e["timestamp"] <= timeframe
]
# Check pattern match
pattern_index = 0
for event_data in recent_events:
event_name = event_data["event"].event_name
if self.event_matches_pattern(event_name, pattern[pattern_index]):
pattern_index += 1
if pattern_index >= len(pattern):
return True
return False
def event_matches_pattern(self, event_name: str, pattern: str) -> bool:
"""Check if event name matches pattern (supports wildcards)"""
if "*" in pattern:
pattern_prefix = pattern.replace("*", "")
return event_name.startswith(pattern_prefix)
else:
return event_name == pattern
async def handle_login_anomaly(self, rule, matching_events):
"""Handle detected login anomaly"""
ws = self.workspace()
# Extract user information
user_events = [e for e in matching_events if "user.login.failed" in e["event"].event_name]
affected_users = set(e["event"].payload.get("user_id") for e in user_events)
alert_message = (
f"🚨 LOGIN ANOMALY DETECTED\n\n"
f"Pattern: {rule['name']}\n"
f"Affected users: {', '.join(affected_users)}\n"
f"Event count: {len(user_events)}\n"
f"Timeframe: {rule['timeframe']} seconds\n\n"
f"Recommended actions:\n"
f"• Review login logs\n"
f"• Check for brute force attacks\n"
f"• Consider temporary account lockouts"
)
await ws.channel("#security").post(alert_message)外部服务集成
API 网关代理
import aiohttp
import asyncio
from typing import Dict, Any
class APIGatewayAgent(WorkerAgent):
"""Agent that acts as a gateway to external APIs"""
default_agent_id = "api-gateway"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.api_configs = self.load_api_configurations()
self.rate_limiters = {}
self.circuit_breakers = {}
self.api_cache = {}
def load_api_configurations(self) -> Dict[str, Dict[str, Any]]:
"""Load external API configurations"""
return {
"weather": {
"base_url": "https://api.openweathermap.org/data/2.5",
"api_key": os.getenv("WEATHER_API_KEY"),
"rate_limit": {"requests": 60, "window": 60}, # 60 requests per minute
"timeout": 10,
"retry_attempts": 3
},
"news": {
"base_url": "https://newsapi.org/v2",
"api_key": os.getenv("NEWS_API_KEY"),
"rate_limit": {"requests": 100, "window": 3600}, # 100 requests per hour
"timeout": 15,
"retry_attempts": 2
},
"translate": {
"base_url": "https://api.mymemory.translated.net",
"api_key": None, # Free tier
"rate_limit": {"requests": 1000, "window": 86400}, # 1000 requests per day
"timeout": 5,
"retry_attempts": 2
}
}
async def on_direct(self, msg):
"""Route API requests based on message content"""
request_type = self.parse_api_request(msg.text)
if request_type:
await self.handle_api_request(msg, request_type)
else:
await self.show_available_apis(msg)
def parse_api_request(self, text: str) -> str:
"""Parse API request type from message"""
text_lower = text.lower()
if any(word in text_lower for word in ["weather", "temperature", "forecast"]):
return "weather"
elif any(word in text_lower for word in ["news", "headlines", "articles"]):
return "news"
elif any(word in text_lower for word in ["translate", "translation"]):
return "translate"
else:
return None
async def handle_api_request(self, msg, api_type: str):
"""Handle specific API request with rate limiting and error handling"""
if not await self.check_rate_limit(api_type, msg.sender_id):
ws = self.workspace()
await ws.agent(msg.sender_id).send(
f"⏰ Rate limit exceeded for {api_type} API. Please try again later."
)
return
try:
# Check circuit breaker
if self.is_circuit_open(api_type):
raise Exception(f"{api_type} API is currently unavailable")
# Make API request
result = await self.make_api_request(api_type, msg.text, msg.sender_id)
# Send response
ws = self.workspace()
await ws.agent(msg.sender_id).send(result)
# Reset circuit breaker on success
self.reset_circuit_breaker(api_type)
except Exception as e:
await self.handle_api_error(msg, api_type, str(e))
async def make_api_request(self, api_type: str, query: str, user_id: str) -> str:
"""Make request to external API with retry logic"""
config = self.api_configs[api_type]
for attempt in range(config["retry_attempts"]):
try:
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=config["timeout"])) as session:
if api_type == "weather":
return await self.call_weather_api(session, config, query)
elif api_type == "news":
return await self.call_news_api(session, config, query)
elif api_type == "translate":
return await self.call_translate_api(session, config, query)
except asyncio.TimeoutError:
if attempt == config["retry_attempts"] - 1:
raise Exception(f"API timeout after {config['retry_attempts']} attempts")
await asyncio.sleep(2 ** attempt) # Exponential backoff
except Exception as e:
if attempt == config["retry_attempts"] - 1:
raise e
await asyncio.sleep(1)
async def call_weather_api(self, session, config, query):
"""Call weather API"""
# Extract location from query
location = self.extract_location_from_query(query)
url = f"{config['base_url']}/weather"
params = {
"q": location,
"appid": config["api_key"],
"units": "metric"
}
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
return self.format_weather_response(data, location)
else:
raise Exception(f"Weather API error: {response.status}")
def format_weather_response(self, data, location):
"""Format weather API response"""
temp = data["main"]["temp"]
description = data["weather"][0]["description"]
humidity = data["main"]["humidity"]
return (
f"🌤️ Weather for {location}:\n"
f"Temperature: {temp}°C\n"
f"Conditions: {description.title()}\n"
f"Humidity: {humidity}%"
)数据库集成代理
import asyncpg
import asyncio
from datetime import datetime
class DatabaseAgent(WorkerAgent):
"""Agent that integrates with databases for data storage and retrieval"""
default_agent_id = "database"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.db_pool = None
self.query_cache = {}
self.connection_config = {
"host": os.getenv("DB_HOST", "localhost"),
"port": os.getenv("DB_PORT", 5432),
"database": os.getenv("DB_NAME", "openagents"),
"user": os.getenv("DB_USER", "postgres"),
"password": os.getenv("DB_PASSWORD", "")
}
async def on_startup(self):
"""Initialize database connection pool"""
await super().on_startup()
try:
self.db_pool = await asyncpg.create_pool(**self.connection_config)
# Initialize database schema
await self.initialize_schema()
ws = self.workspace()
await ws.channel("#system").post("🗄️ Database agent connected and ready!")
except Exception as e:
print(f"Database connection failed: {e}")
async def initialize_schema(self):
"""Create necessary database tables"""
schema_sql = """
CREATE TABLE IF NOT EXISTS agent_interactions (
id SERIAL PRIMARY KEY,
agent_id VARCHAR(255),
user_id VARCHAR(255),
message_type VARCHAR(50),
content TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS user_profiles (
user_id VARCHAR(255) PRIMARY KEY,
preferences JSONB,
interaction_count INTEGER DEFAULT 0,
last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS system_metrics (
id SERIAL PRIMARY KEY,
metric_name VARCHAR(255),
metric_value DECIMAL,
tags JSONB,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
async with self.db_pool.acquire() as connection:
await connection.execute(schema_sql)
async def on_direct(self, msg):
"""Handle database queries and operations"""
query_type = self.parse_database_request(msg.text)
if query_type == "search":
await self.handle_search_request(msg)
elif query_type == "analytics":
await self.handle_analytics_request(msg)
elif query_type == "profile":
await self.handle_profile_request(msg)
else:
await self.show_database_help(msg)
async def handle_search_request(self, msg):
"""Handle data search requests"""
search_term = self.extract_search_term(msg.text)
if not search_term:
ws = self.workspace()
await ws.agent(msg.sender_id).send(
"Please specify what you'd like to search for."
)
return
try:
async with self.db_pool.acquire() as connection:
query = """
SELECT agent_id, user_id, content, timestamp
FROM agent_interactions
WHERE content ILIKE $1
ORDER BY timestamp DESC
LIMIT 10
"""
results = await connection.fetch(query, f"%{search_term}%")
if results:
response = f"🔍 Search results for '{search_term}':\n\n"
for row in results:
response += f"• {row['timestamp']}: {row['agent_id']} ↔ {row['user_id']}\n"
response += f" {row['content'][:100]}...\n\n"
else:
response = f"No results found for '{search_term}'"
ws = self.workspace()
await ws.agent(msg.sender_id).send(response)
except Exception as e:
await self.handle_database_error(msg, str(e))
async def log_interaction(self, agent_id: str, user_id: str, message_type: str, content: str):
"""Log agent interaction to database"""
try:
async with self.db_pool.acquire() as connection:
await connection.execute(
"""
INSERT INTO agent_interactions (agent_id, user_id, message_type, content)
VALUES ($1, $2, $3, $4)
""",
agent_id, user_id, message_type, content
)
except Exception as e:
print(f"Failed to log interaction: {e}")自定义协议和通信
协议桥接代理
import websockets
import json
from typing import Dict, Any
class ProtocolBridgeAgent(WorkerAgent):
"""Agent that bridges different communication protocols"""
default_agent_id = "protocol-bridge"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.protocol_handlers = {}
self.active_connections = {}
self.message_queues = {}
self.setup_protocol_handlers()
def setup_protocol_handlers(self):
"""Setup handlers for different protocols"""
self.protocol_handlers = {
"websocket": self.handle_websocket_protocol,
"mqtt": self.handle_mqtt_protocol,
"slack": self.handle_slack_protocol,
"discord": self.handle_discord_protocol
}
async def on_startup(self):
"""Start protocol bridges"""
await super().on_startup()
# Start WebSocket server
asyncio.create_task(self.start_websocket_server())
# Connect to external services
await self.connect_external_protocols()
async def start_websocket_server(self):
"""Start WebSocket server for external connections"""
async def handle_websocket(websocket, path):
try:
await self.handle_websocket_connection(websocket, path)
except Exception as e:
print(f"WebSocket error: {e}")
# Start WebSocket server on port 8080
await websockets.serve(handle_websocket, "localhost", 8080)
print("🌐 WebSocket bridge server started on port 8080")
async def handle_websocket_connection(self, websocket, path):
"""Handle new WebSocket connection"""
connection_id = f"ws_{len(self.active_connections)}"
self.active_connections[connection_id] = {
"protocol": "websocket",
"connection": websocket,
"path": path
}
try:
async for message in websocket:
await self.process_external_message("websocket", connection_id, message)
finally:
del self.active_connections[connection_id]
async def process_external_message(self, protocol: str, connection_id: str, message: str):
"""Process message from external protocol"""
try:
# Parse message
if protocol == "websocket":
data = json.loads(message)
else:
data = {"content": message}
# Route to OpenAgents network
await self.route_external_message(protocol, connection_id, data)
except Exception as e:
print(f"Error processing {protocol} message: {e}")
async def route_external_message(self, protocol: str, connection_id: str, data: Dict[str, Any]):
"""Route external message to OpenAgents network"""
# Determine target channel or agent
target = data.get("target", "#bridge")
content = data.get("content", str(data))
# Add protocol information
formatted_message = f"[{protocol.upper()}] {content}"
ws = self.workspace()
if target.startswith("#"):
# Send to channel
await ws.channel(target).post(formatted_message)
else:
# Send to specific agent
await ws.agent(target).send(formatted_message)
async def on_channel_post(self, msg):
"""Bridge channel messages to external protocols"""
if msg.channel == "#bridge":
await self.broadcast_to_external_protocols(msg.text, msg.sender_id)
async def broadcast_to_external_protocols(self, message: str, sender_id: str):
"""Broadcast message to all connected external protocols"""
formatted_message = {
"sender": sender_id,
"content": message,
"timestamp": datetime.now().isoformat()
}
for connection_id, connection_info in self.active_connections.items():
protocol = connection_info["protocol"]
try:
if protocol == "websocket":
await connection_info["connection"].send(json.dumps(formatted_message))
# Add other protocol handlers here
except Exception as e:
print(f"Failed to send to {protocol} connection {connection_id}: {e}")此全面教程为创建具有复杂行为、集成和功能的高度自定义代理提供了基础。将这些模式用作构建模块,以创建针对您特定用例和需求量身定制的代理。
Was this helpful?