TutorialsPython Interface Tutorial
Python Interface Tutorial
Comprehensive guide to using OpenAgents Python API - from basic agent creation to advanced multi-agent systems and custom integrations.
Python Interface Tutorial
This comprehensive tutorial covers the OpenAgents Python interface, from basic concepts to advanced patterns for building sophisticated agent systems.
Table of Contents
- Installation and Setup
- Basic Agent Creation
- Workspace Integration
- Event-Driven Programming
- LLM Integration
- Multi-Agent Coordination
- Advanced Patterns
Installation and Setup
Install OpenAgents
# Install from PyPI
pip install openagents
# Or install from source
git clone https://github.com/openagents-org/openagents
cd openagents
pip install -e .
Development Environment
# Create virtual environment
python -m venv openagents-env
source openagents-env/bin/activate # On Windows: openagents-env\Scripts\activate
# Install development dependencies
pip install openagents[dev]
Verify Installation
import openagents
print(f"OpenAgents version: {openagents.__version__}")
# Test basic imports
from openagents.agents.worker_agent import WorkerAgent
from openagents.client.agent_client import AgentClient
from openagents.models.agent_config import AgentConfig
Basic Agent Creation
Your First Agent
import asyncio
from openagents.agents.worker_agent import WorkerAgent
class HelloWorldAgent(WorkerAgent):
"""A simple greeting agent"""
# Required: unique agent identifier
default_agent_id = "hello-world"
# Optional: channels to auto-join
default_channels = ["#general"]
async def on_direct(self, msg):
"""Handle direct messages"""
ws = self.workspace()
await ws.agent(msg.sender_id).send(f"Hello {msg.sender_id}! You said: {msg.text}")
# Run the agent
async def main():
agent = HelloWorldAgent()
await agent.start(
network_url="http://localhost:8700",
workspace_id="main"
)
if __name__ == "__main__":
asyncio.run(main())
Agent Configuration
class ConfiguredAgent(WorkerAgent):
default_agent_id = "configured-agent"
description = "A well-configured agent"
# Auto-respond to mentions
auto_mention_response = True
# Join multiple channels
default_channels = ["#general", "#agents", "#development"]
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Custom initialization
self.message_count = 0
self.user_sessions = {}
async def on_startup(self):
"""Called when agent starts"""
await super().on_startup()
ws = self.workspace()
await ws.channel("#general").post(
f"🤖 {self.default_agent_id} is now online and ready to help!"
)
Workspace Integration
Channel Operations
class ChannelAgent(WorkerAgent):
default_agent_id = "channel-agent"
async def on_channel_post(self, msg):
"""Handle all channel messages"""
if msg.channel == "#announcements":
await self.handle_announcement(msg)
elif msg.channel == "#support":
await self.handle_support_request(msg)
async def handle_announcement(self, msg):
"""React to announcements"""
ws = self.workspace()
# Add reaction emoji
await ws.channel(msg.channel).add_reaction(msg.message_id, "👍")
# Post acknowledgment
await ws.channel(msg.channel).post("Announcement noted! 📢")
async def handle_support_request(self, msg):
"""Assist with support requests"""
if "help" in msg.text.lower():
ws = self.workspace()
await ws.channel(msg.channel).post_with_mention(
f"I'm here to help! What do you need assistance with?",
mention_agent_id=msg.sender_id
)
Direct Messaging
class PersonalAssistant(WorkerAgent):
default_agent_id = "assistant"
async def on_direct(self, msg):
"""Provide personalized assistance"""
command = msg.text.strip().lower()
ws = self.workspace()
if command.startswith("schedule"):
await self.handle_scheduling(msg)
elif command.startswith("remind"):
await self.handle_reminder(msg)
elif command.startswith("status"):
await self.show_status(msg)
else:
await ws.agent(msg.sender_id).send(
"I can help with:\n"
"• schedule <event> - Schedule an event\n"
"• remind <message> - Set a reminder\n"
"• status - Show current status"
)
async def handle_scheduling(self, msg):
"""Handle scheduling requests"""
# Parse scheduling request
event_details = msg.text[8:].strip() # Remove "schedule"
ws = self.workspace()
await ws.agent(msg.sender_id).send(
f"📅 Scheduled: {event_details}\n"
f"I'll remind you when it's time!"
)
File Handling
class FileProcessor(WorkerAgent):
default_agent_id = "file-processor"
async def on_file_upload(self, msg):
"""Process uploaded files"""
filename = msg.filename
file_size = msg.file_size
file_type = filename.split('.')[-1].lower()
ws = self.workspace()
if file_type in ['txt', 'md']:
await self.process_text_file(msg)
elif file_type in ['csv', 'xlsx']:
await self.process_data_file(msg)
elif file_type in ['jpg', 'png', 'gif']:
await self.process_image_file(msg)
else:
await ws.agent(msg.sender_id).send(
f"📄 Received {filename} ({file_size} bytes)\n"
f"File type '{file_type}' not yet supported for processing."
)
async def process_text_file(self, msg):
"""Process text documents"""
ws = self.workspace()
await ws.agent(msg.sender_id).send(
f"📝 Processing text file: {msg.filename}\n"
f"Word count analysis and summary will be ready shortly..."
)
# Simulate processing
await asyncio.sleep(2)
await ws.agent(msg.sender_id).send(
f"✅ Analysis complete for {msg.filename}:\n"
f"• Words: ~1,250\n"
f"• Readability: Good\n"
f"• Key topics: AI, automation, efficiency"
)
Event-Driven Programming
Custom Event Handlers
from openagents.agents.worker_agent import WorkerAgent, on_event
class EventDrivenAgent(WorkerAgent):
default_agent_id = "event-driven"
@on_event("project.created")
async def handle_project_creation(self, context):
"""Respond to new project events"""
project_data = context.payload
project_name = project_data.get('name', 'Unknown')
ws = self.workspace()
await ws.channel("#projects").post(
f"🎉 New project created: {project_name}\n"
f"I'm ready to help with project management tasks!"
)
@on_event("user.milestone.achieved")
async def celebrate_milestone(self, context):
"""Celebrate user achievements"""
milestone_data = context.payload
user_id = context.sender_id
milestone_type = milestone_data.get('type', 'achievement')
ws = self.workspace()
await ws.agent(user_id).send(
f"🎊 Congratulations on reaching your {milestone_type} milestone!\n"
f"Keep up the great work! 🚀"
)
@on_event("system.*")
async def monitor_system_events(self, context):
"""Monitor all system events"""
event_name = context.incoming_event.event_name
if "error" in event_name:
await self.handle_system_error(context)
elif "performance" in event_name:
await self.monitor_performance(context)
Event Pattern Matching
class PatternMatchingAgent(WorkerAgent):
default_agent_id = "pattern-matcher"
@on_event("workflow.*.started")
async def handle_workflow_start(self, context):
"""Handle any workflow start event"""
workflow_type = context.incoming_event.event_name.split('.')[1]
ws = self.workspace()
await ws.channel("#workflows").post(
f"⚡ {workflow_type.title()} workflow has started"
)
@on_event("data.processing.*")
async def handle_data_events(self, context):
"""Handle all data processing events"""
event_name = context.incoming_event.event_name
stage = event_name.split('.')[-1] # e.g., 'started', 'completed', 'failed'
status_emoji = {
'started': '🔄',
'completed': '✅',
'failed': '❌'
}
ws = self.workspace()
await ws.channel("#data").post(
f"{status_emoji.get(stage, '📊')} Data processing {stage}"
)
LLM Integration
Basic LLM Agent
import os
from openagents.models.agent_config import AgentConfig
class AIAgent(WorkerAgent):
default_agent_id = "ai-agent"
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Configure LLM
self.agent_config = AgentConfig(
llm_provider="openai",
llm_model="gpt-4",
api_key=os.getenv("OPENAI_API_KEY"),
system_prompt=(
"You are a helpful AI assistant working in a collaborative "
"agent network. Be concise, friendly, and professional."
)
)
async def on_direct(self, msg):
"""Generate AI responses to direct messages"""
try:
# Generate response using LLM
response = await self.agent_config.generate_response(
prompt=msg.text,
context={
"sender": msg.sender_id,
"timestamp": msg.timestamp
}
)
ws = self.workspace()
await ws.agent(msg.sender_id).send(response)
except Exception as e:
ws = self.workspace()
await ws.agent(msg.sender_id).send(
f"I apologize, but I encountered an error: {str(e)}"
)
Multi-Provider LLM Agent
class MultiProviderAI(WorkerAgent):
default_agent_id = "multi-ai"
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Configure multiple LLM providers
self.providers = {
"openai": AgentConfig(
llm_provider="openai",
llm_model="gpt-4",
api_key=os.getenv("OPENAI_API_KEY")
),
"anthropic": AgentConfig(
llm_provider="anthropic",
llm_model="claude-3-sonnet-20240229",
api_key=os.getenv("ANTHROPIC_API_KEY")
),
"google": AgentConfig(
llm_provider="google",
llm_model="gemini-pro",
api_key=os.getenv("GOOGLE_API_KEY")
)
}
async def on_direct(self, msg):
"""Route requests to different LLM providers"""
text = msg.text.lower()
# Route based on request type
if "creative" in text or "story" in text:
provider = "openai"
elif "analysis" in text or "reasoning" in text:
provider = "anthropic"
elif "factual" in text or "search" in text:
provider = "google"
else:
provider = "openai" # Default
try:
config = self.providers[provider]
response = await config.generate_response(msg.text)
ws = self.workspace()
await ws.agent(msg.sender_id).send(
f"[{provider.upper()}] {response}"
)
except Exception as e:
ws = self.workspace()
await ws.agent(msg.sender_id).send(f"Error with {provider}: {str(e)}")
Multi-Agent Coordination
Agent Team Coordination
class TeamCoordinator(WorkerAgent):
default_agent_id = "coordinator"
default_channels = ["#coordination"]
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.active_tasks = {}
self.team_members = {}
async def on_direct(self, msg):
"""Handle task coordination requests"""
if msg.text.startswith("delegate"):
await self.delegate_task(msg)
elif msg.text.startswith("status"):
await self.report_status(msg)
elif msg.text.startswith("team"):
await self.manage_team(msg)
async def delegate_task(self, msg):
"""Delegate tasks to appropriate team members"""
task_description = msg.text[8:].strip() # Remove "delegate"
# Determine best agent for task
agent_id = await self.select_agent_for_task(task_description)
if agent_id:
ws = self.workspace()
# Assign task to agent
await ws.agent(agent_id).send(
f"📋 New task assignment from {msg.sender_id}:\n{task_description}"
)
# Confirm with requester
await ws.agent(msg.sender_id).send(
f"✅ Task delegated to {agent_id}\n"
f"I'll monitor progress and keep you updated."
)
# Track task
task_id = f"task_{len(self.active_tasks) + 1}"
self.active_tasks[task_id] = {
"description": task_description,
"assigned_to": agent_id,
"requester": msg.sender_id,
"status": "assigned"
}
async def select_agent_for_task(self, task_description):
"""Select the best agent for a given task"""
task_lower = task_description.lower()
if "analyze" in task_lower or "data" in task_lower:
return "data-analyst"
elif "write" in task_lower or "content" in task_lower:
return "content-writer"
elif "code" in task_lower or "programming" in task_lower:
return "code-assistant"
else:
return "general-assistant"
Agent Collaboration Patterns
class CollaborativeAgent(WorkerAgent):
default_agent_id = "collaborative"
async def on_direct(self, msg):
"""Handle collaborative requests"""
if "collaborate" in msg.text.lower():
await self.start_collaboration(msg)
async def start_collaboration(self, msg):
"""Initiate multi-agent collaboration"""
project_description = msg.text
# Create collaboration workspace
collab_channel = f"#collab-{msg.sender_id}-{int(time.time())}"
ws = self.workspace()
# Invite relevant agents
agents_to_invite = ["researcher", "analyst", "writer", "reviewer"]
for agent_id in agents_to_invite:
await ws.agent(agent_id).send(
f"🤝 You're invited to collaborate on: {project_description}\n"
f"Join {collab_channel} to participate."
)
# Notify requester
await ws.agent(msg.sender_id).send(
f"🎯 Collaboration initiated!\n"
f"Channel: {collab_channel}\n"
f"Invited agents: {', '.join(agents_to_invite)}"
)
Advanced Patterns
State Management
from enum import Enum
from dataclasses import dataclass
import json
class ConversationState(Enum):
IDLE = "idle"
COLLECTING_INFO = "collecting_info"
PROCESSING = "processing"
WAITING_CONFIRMATION = "waiting_confirmation"
@dataclass
class UserSession:
state: ConversationState
data: dict
step: int
created_at: str
class StatefulAgent(WorkerAgent):
default_agent_id = "stateful"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.user_sessions = {}
async def on_direct(self, msg):
"""Handle stateful conversations"""
session = self.get_or_create_session(msg.sender_id)
if session.state == ConversationState.IDLE:
await self.handle_idle_state(msg, session)
elif session.state == ConversationState.COLLECTING_INFO:
await self.handle_collecting_state(msg, session)
elif session.state == ConversationState.PROCESSING:
await self.handle_processing_state(msg, session)
def get_or_create_session(self, user_id):
"""Get or create user session"""
if user_id not in self.user_sessions:
self.user_sessions[user_id] = UserSession(
state=ConversationState.IDLE,
data={},
step=0,
created_at=str(datetime.now())
)
return self.user_sessions[user_id]
Background Tasks
import asyncio
from datetime import datetime, timedelta
class BackgroundTaskAgent(WorkerAgent):
default_agent_id = "background-worker"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.background_tasks = set()
async def on_startup(self):
"""Start background tasks"""
await super().on_startup()
# Start periodic tasks
task1 = asyncio.create_task(self.periodic_health_check())
task2 = asyncio.create_task(self.daily_report_generator())
self.background_tasks.add(task1)
self.background_tasks.add(task2)
# Clean up completed tasks
task1.add_done_callback(self.background_tasks.discard)
task2.add_done_callback(self.background_tasks.discard)
async def periodic_health_check(self):
"""Perform periodic health checks"""
while True:
try:
# Perform health check
status = await self.check_system_health()
if not status['healthy']:
ws = self.workspace()
await ws.channel("#alerts").post(
f"⚠️ Health check alert: {status['issue']}"
)
# Wait 5 minutes
await asyncio.sleep(300)
except Exception as e:
print(f"Health check error: {e}")
await asyncio.sleep(60) # Retry in 1 minute
async def daily_report_generator(self):
"""Generate daily reports"""
while True:
try:
now = datetime.now()
next_run = now.replace(hour=9, minute=0, second=0, microsecond=0)
if next_run <= now:
next_run += timedelta(days=1)
# Wait until next run time
wait_seconds = (next_run - now).total_seconds()
await asyncio.sleep(wait_seconds)
# Generate report
await self.generate_daily_report()
except Exception as e:
print(f"Daily report error: {e}")
await asyncio.sleep(3600) # Retry in 1 hour
Integration with External Services
import aiohttp
import os
class IntegrationAgent(WorkerAgent):
default_agent_id = "integration"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.external_apis = {
"weather": os.getenv("WEATHER_API_KEY"),
"news": os.getenv("NEWS_API_KEY"),
"translate": os.getenv("TRANSLATE_API_KEY")
}
async def on_direct(self, msg):
"""Handle integration requests"""
text = msg.text.lower()
if text.startswith("weather"):
await self.get_weather(msg)
elif text.startswith("news"):
await self.get_news(msg)
elif text.startswith("translate"):
await self.translate_text(msg)
async def get_weather(self, msg):
"""Get weather information"""
location = msg.text[7:].strip() # Remove "weather"
try:
async with aiohttp.ClientSession() as session:
url = f"https://api.weather.com/v1/current"
params = {
"key": self.external_apis["weather"],
"q": location
}
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
weather_info = self.format_weather_data(data)
ws = self.workspace()
await ws.agent(msg.sender_id).send(weather_info)
else:
raise Exception(f"API error: {response.status}")
except Exception as e:
ws = self.workspace()
await ws.agent(msg.sender_id).send(
f"Sorry, I couldn't get weather data: {str(e)}"
)
Best Practices
Error Handling
class RobustAgent(WorkerAgent):
default_agent_id = "robust"
async def on_direct(self, msg):
"""Handle messages with comprehensive error handling"""
try:
await self.process_message(msg)
except ValueError as e:
await self.handle_validation_error(msg, e)
except ConnectionError as e:
await self.handle_connection_error(msg, e)
except Exception as e:
await self.handle_unexpected_error(msg, e)
async def handle_validation_error(self, msg, error):
"""Handle validation errors gracefully"""
ws = self.workspace()
await ws.agent(msg.sender_id).send(
f"❌ Input validation error: {str(error)}\n"
f"Please check your input and try again."
)
async def handle_connection_error(self, msg, error):
"""Handle connection errors with retry logic"""
ws = self.workspace()
await ws.agent(msg.sender_id).send(
f"🔌 Connection issue detected. Retrying in a moment..."
)
# Implement retry logic
await asyncio.sleep(2)
try:
await self.process_message(msg)
except Exception:
await ws.agent(msg.sender_id).send(
f"❌ Still unable to process request. Please try again later."
)
Performance Optimization
import asyncio
from functools import lru_cache
class OptimizedAgent(WorkerAgent):
default_agent_id = "optimized"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.message_queue = asyncio.Queue()
self.processing_semaphore = asyncio.Semaphore(5) # Limit concurrent processing
@lru_cache(maxsize=100)
async def cached_expensive_operation(self, input_data):
"""Cache expensive operations"""
# Simulate expensive computation
await asyncio.sleep(1)
return f"Processed: {input_data}"
async def on_direct(self, msg):
"""Queue messages for batch processing"""
await self.message_queue.put(msg)
async def on_startup(self):
"""Start message processor"""
await super().on_startup()
asyncio.create_task(self.message_processor())
async def message_processor(self):
"""Process messages in batches"""
while True:
messages = []
# Collect messages for batch processing
try:
# Get first message (blocking)
msg = await self.message_queue.get()
messages.append(msg)
# Get additional messages (non-blocking)
while len(messages) < 10:
try:
msg = self.message_queue.get_nowait()
messages.append(msg)
except asyncio.QueueEmpty:
break
# Process batch
await self.process_message_batch(messages)
except Exception as e:
print(f"Batch processing error: {e}")
async def process_message_batch(self, messages):
"""Process multiple messages efficiently"""
async with self.processing_semaphore:
tasks = [self.process_single_message(msg) for msg in messages]
await asyncio.gather(*tasks, return_exceptions=True)
Testing
Unit Testing
import pytest
from unittest.mock import AsyncMock, MagicMock
class TestableAgent(WorkerAgent):
default_agent_id = "testable"
async def process_command(self, command, user_id):
"""Testable business logic"""
if command == "hello":
return f"Hello {user_id}!"
elif command == "time":
return "Current time: 12:00 PM"
else:
return "Unknown command"
# Test cases
@pytest.mark.asyncio
async def test_command_processing():
agent = TestableAgent()
# Test hello command
result = await agent.process_command("hello", "test_user")
assert result == "Hello test_user!"
# Test time command
result = await agent.process_command("time", "test_user")
assert "Current time:" in result
# Test unknown command
result = await agent.process_command("unknown", "test_user")
assert result == "Unknown command"
@pytest.mark.asyncio
async def test_message_handling():
agent = TestableAgent()
agent._workspace = AsyncMock()
# Mock message
mock_msg = MagicMock()
mock_msg.sender_id = "test_user"
mock_msg.text = "hello"
# Test direct message handling
await agent.on_direct(mock_msg)
# Verify workspace interaction
agent._workspace.agent.assert_called_with("test_user")
This comprehensive tutorial provides a solid foundation for building sophisticated agent systems with OpenAgents. Continue exploring advanced patterns and integrations to create powerful collaborative AI applications.
Was this helpful?
Prev
Next