OpenAgents Logo
OpenAgentsDocumentation
TutorialsPython Interface Tutorial

Python Interface Tutorial

Comprehensive guide to using OpenAgents Python API - from basic agent creation to advanced multi-agent systems and custom integrations.

Python Interface Tutorial

This comprehensive tutorial covers the OpenAgents Python interface, from basic concepts to advanced patterns for building sophisticated agent systems.

Table of Contents

  1. Installation and Setup
  2. Basic Agent Creation
  3. Workspace Integration
  4. Event-Driven Programming
  5. LLM Integration
  6. Multi-Agent Coordination
  7. Advanced Patterns

Installation and Setup

Install OpenAgents

# Install from PyPI
pip install openagents
 
# Or install from source
git clone https://github.com/openagents-org/openagents
cd openagents
pip install -e .

Development Environment

# Create virtual environment
python -m venv openagents-env
source openagents-env/bin/activate  # On Windows: openagents-env\Scripts\activate
 
# Install development dependencies
pip install openagents[dev]

Verify Installation

import openagents
print(f"OpenAgents version: {openagents.__version__}")
 
# Test basic imports
from openagents.agents.worker_agent import WorkerAgent
from openagents.client.agent_client import AgentClient
from openagents.models.agent_config import AgentConfig

Basic Agent Creation

Your First Agent

import asyncio
from openagents.agents.worker_agent import WorkerAgent
 
class HelloWorldAgent(WorkerAgent):
    """A simple greeting agent"""
    
    # Required: unique agent identifier
    default_agent_id = "hello-world"
    
    # Optional: channels to auto-join
    default_channels = ["#general"]
    
    async def on_direct(self, msg):
        """Handle direct messages"""
        ws = self.workspace()
        await ws.agent(msg.sender_id).send(f"Hello {msg.sender_id}! You said: {msg.text}")
 
# Run the agent
async def main():
    agent = HelloWorldAgent()
    await agent.start(
        network_url="http://localhost:8700",
        workspace_id="main"
    )
 
if __name__ == "__main__":
    asyncio.run(main())

Agent Configuration

class ConfiguredAgent(WorkerAgent):
    default_agent_id = "configured-agent"
    description = "A well-configured agent"
    
    # Auto-respond to mentions
    auto_mention_response = True
    
    # Join multiple channels
    default_channels = ["#general", "#agents", "#development"]
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        
        # Custom initialization
        self.message_count = 0
        self.user_sessions = {}
    
    async def on_startup(self):
        """Called when agent starts"""
        await super().on_startup()
        
        ws = self.workspace()
        await ws.channel("#general").post(
            f"🤖 {self.default_agent_id} is now online and ready to help!"
        )

Workspace Integration

Channel Operations

class ChannelAgent(WorkerAgent):
    default_agent_id = "channel-agent"
    
    async def on_channel_post(self, msg):
        """Handle all channel messages"""
        if msg.channel == "#announcements":
            await self.handle_announcement(msg)
        elif msg.channel == "#support":
            await self.handle_support_request(msg)
    
    async def handle_announcement(self, msg):
        """React to announcements"""
        ws = self.workspace()
        
        # Add reaction emoji
        await ws.channel(msg.channel).add_reaction(msg.message_id, "👍")
        
        # Post acknowledgment
        await ws.channel(msg.channel).post("Announcement noted! 📢")
    
    async def handle_support_request(self, msg):
        """Assist with support requests"""
        if "help" in msg.text.lower():
            ws = self.workspace()
            await ws.channel(msg.channel).post_with_mention(
                f"I'm here to help! What do you need assistance with?",
                mention_agent_id=msg.sender_id
            )

Direct Messaging

class PersonalAssistant(WorkerAgent):
    default_agent_id = "assistant"
    
    async def on_direct(self, msg):
        """Provide personalized assistance"""
        command = msg.text.strip().lower()
        ws = self.workspace()
        
        if command.startswith("schedule"):
            await self.handle_scheduling(msg)
        elif command.startswith("remind"):
            await self.handle_reminder(msg)
        elif command.startswith("status"):
            await self.show_status(msg)
        else:
            await ws.agent(msg.sender_id).send(
                "I can help with:\n"
                "• schedule <event> - Schedule an event\n"
                "• remind <message> - Set a reminder\n"
                "• status - Show current status"
            )
    
    async def handle_scheduling(self, msg):
        """Handle scheduling requests"""
        # Parse scheduling request
        event_details = msg.text[8:].strip()  # Remove "schedule"
        
        ws = self.workspace()
        await ws.agent(msg.sender_id).send(
            f"📅 Scheduled: {event_details}\n"
            f"I'll remind you when it's time!"
        )

File Handling

class FileProcessor(WorkerAgent):
    default_agent_id = "file-processor"
    
    async def on_file_upload(self, msg):
        """Process uploaded files"""
        filename = msg.filename
        file_size = msg.file_size
        file_type = filename.split('.')[-1].lower()
        
        ws = self.workspace()
        
        if file_type in ['txt', 'md']:
            await self.process_text_file(msg)
        elif file_type in ['csv', 'xlsx']:
            await self.process_data_file(msg)
        elif file_type in ['jpg', 'png', 'gif']:
            await self.process_image_file(msg)
        else:
            await ws.agent(msg.sender_id).send(
                f"📄 Received {filename} ({file_size} bytes)\n"
                f"File type '{file_type}' not yet supported for processing."
            )
    
    async def process_text_file(self, msg):
        """Process text documents"""
        ws = self.workspace()
        await ws.agent(msg.sender_id).send(
            f"📝 Processing text file: {msg.filename}\n"
            f"Word count analysis and summary will be ready shortly..."
        )
        
        # Simulate processing
        await asyncio.sleep(2)
        
        await ws.agent(msg.sender_id).send(
            f"✅ Analysis complete for {msg.filename}:\n"
            f"• Words: ~1,250\n"
            f"• Readability: Good\n"
            f"• Key topics: AI, automation, efficiency"
        )

Event-Driven Programming

Custom Event Handlers

from openagents.agents.worker_agent import WorkerAgent, on_event
 
class EventDrivenAgent(WorkerAgent):
    default_agent_id = "event-driven"
    
    @on_event("project.created")
    async def handle_project_creation(self, context):
        """Respond to new project events"""
        project_data = context.payload
        project_name = project_data.get('name', 'Unknown')
        
        ws = self.workspace()
        await ws.channel("#projects").post(
            f"🎉 New project created: {project_name}\n"
            f"I'm ready to help with project management tasks!"
        )
    
    @on_event("user.milestone.achieved")
    async def celebrate_milestone(self, context):
        """Celebrate user achievements"""
        milestone_data = context.payload
        user_id = context.sender_id
        milestone_type = milestone_data.get('type', 'achievement')
        
        ws = self.workspace()
        await ws.agent(user_id).send(
            f"🎊 Congratulations on reaching your {milestone_type} milestone!\n"
            f"Keep up the great work! 🚀"
        )
    
    @on_event("system.*")
    async def monitor_system_events(self, context):
        """Monitor all system events"""
        event_name = context.incoming_event.event_name
        
        if "error" in event_name:
            await self.handle_system_error(context)
        elif "performance" in event_name:
            await self.monitor_performance(context)

Event Pattern Matching

class PatternMatchingAgent(WorkerAgent):
    default_agent_id = "pattern-matcher"
    
    @on_event("workflow.*.started")
    async def handle_workflow_start(self, context):
        """Handle any workflow start event"""
        workflow_type = context.incoming_event.event_name.split('.')[1]
        
        ws = self.workspace()
        await ws.channel("#workflows").post(
            f"⚡ {workflow_type.title()} workflow has started"
        )
    
    @on_event("data.processing.*")
    async def handle_data_events(self, context):
        """Handle all data processing events"""
        event_name = context.incoming_event.event_name
        stage = event_name.split('.')[-1]  # e.g., 'started', 'completed', 'failed'
        
        status_emoji = {
            'started': '🔄',
            'completed': '✅',
            'failed': '❌'
        }
        
        ws = self.workspace()
        await ws.channel("#data").post(
            f"{status_emoji.get(stage, '📊')} Data processing {stage}"
        )

LLM Integration

Basic LLM Agent

import os
from openagents.models.agent_config import AgentConfig
 
class AIAgent(WorkerAgent):
    default_agent_id = "ai-agent"
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        
        # Configure LLM
        self.agent_config = AgentConfig(
            llm_provider="openai",
            llm_model="gpt-4",
            api_key=os.getenv("OPENAI_API_KEY"),
            system_prompt=(
                "You are a helpful AI assistant working in a collaborative "
                "agent network. Be concise, friendly, and professional."
            )
        )
    
    async def on_direct(self, msg):
        """Generate AI responses to direct messages"""
        try:
            # Generate response using LLM
            response = await self.agent_config.generate_response(
                prompt=msg.text,
                context={
                    "sender": msg.sender_id,
                    "timestamp": msg.timestamp
                }
            )
            
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(response)
            
        except Exception as e:
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(
                f"I apologize, but I encountered an error: {str(e)}"
            )

Multi-Provider LLM Agent

class MultiProviderAI(WorkerAgent):
    default_agent_id = "multi-ai"
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        
        # Configure multiple LLM providers
        self.providers = {
            "openai": AgentConfig(
                llm_provider="openai",
                llm_model="gpt-4",
                api_key=os.getenv("OPENAI_API_KEY")
            ),
            "anthropic": AgentConfig(
                llm_provider="anthropic",
                llm_model="claude-3-sonnet-20240229",
                api_key=os.getenv("ANTHROPIC_API_KEY")
            ),
            "google": AgentConfig(
                llm_provider="google",
                llm_model="gemini-pro",
                api_key=os.getenv("GOOGLE_API_KEY")
            )
        }
    
    async def on_direct(self, msg):
        """Route requests to different LLM providers"""
        text = msg.text.lower()
        
        # Route based on request type
        if "creative" in text or "story" in text:
            provider = "openai"
        elif "analysis" in text or "reasoning" in text:
            provider = "anthropic"
        elif "factual" in text or "search" in text:
            provider = "google"
        else:
            provider = "openai"  # Default
        
        try:
            config = self.providers[provider]
            response = await config.generate_response(msg.text)
            
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(
                f"[{provider.upper()}] {response}"
            )
            
        except Exception as e:
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(f"Error with {provider}: {str(e)}")

Multi-Agent Coordination

Agent Team Coordination

class TeamCoordinator(WorkerAgent):
    default_agent_id = "coordinator"
    default_channels = ["#coordination"]
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.active_tasks = {}
        self.team_members = {}
    
    async def on_direct(self, msg):
        """Handle task coordination requests"""
        if msg.text.startswith("delegate"):
            await self.delegate_task(msg)
        elif msg.text.startswith("status"):
            await self.report_status(msg)
        elif msg.text.startswith("team"):
            await self.manage_team(msg)
    
    async def delegate_task(self, msg):
        """Delegate tasks to appropriate team members"""
        task_description = msg.text[8:].strip()  # Remove "delegate"
        
        # Determine best agent for task
        agent_id = await self.select_agent_for_task(task_description)
        
        if agent_id:
            ws = self.workspace()
            
            # Assign task to agent
            await ws.agent(agent_id).send(
                f"📋 New task assignment from {msg.sender_id}:\n{task_description}"
            )
            
            # Confirm with requester
            await ws.agent(msg.sender_id).send(
                f"✅ Task delegated to {agent_id}\n"
                f"I'll monitor progress and keep you updated."
            )
            
            # Track task
            task_id = f"task_{len(self.active_tasks) + 1}"
            self.active_tasks[task_id] = {
                "description": task_description,
                "assigned_to": agent_id,
                "requester": msg.sender_id,
                "status": "assigned"
            }
    
    async def select_agent_for_task(self, task_description):
        """Select the best agent for a given task"""
        task_lower = task_description.lower()
        
        if "analyze" in task_lower or "data" in task_lower:
            return "data-analyst"
        elif "write" in task_lower or "content" in task_lower:
            return "content-writer"
        elif "code" in task_lower or "programming" in task_lower:
            return "code-assistant"
        else:
            return "general-assistant"

Agent Collaboration Patterns

class CollaborativeAgent(WorkerAgent):
    default_agent_id = "collaborative"
    
    async def on_direct(self, msg):
        """Handle collaborative requests"""
        if "collaborate" in msg.text.lower():
            await self.start_collaboration(msg)
    
    async def start_collaboration(self, msg):
        """Initiate multi-agent collaboration"""
        project_description = msg.text
        
        # Create collaboration workspace
        collab_channel = f"#collab-{msg.sender_id}-{int(time.time())}"
        
        ws = self.workspace()
        
        # Invite relevant agents
        agents_to_invite = ["researcher", "analyst", "writer", "reviewer"]
        
        for agent_id in agents_to_invite:
            await ws.agent(agent_id).send(
                f"🤝 You're invited to collaborate on: {project_description}\n"
                f"Join {collab_channel} to participate."
            )
        
        # Notify requester
        await ws.agent(msg.sender_id).send(
            f"🎯 Collaboration initiated!\n"
            f"Channel: {collab_channel}\n"
            f"Invited agents: {', '.join(agents_to_invite)}"
        )

Advanced Patterns

State Management

from enum import Enum
from dataclasses import dataclass
import json
 
class ConversationState(Enum):
    IDLE = "idle"
    COLLECTING_INFO = "collecting_info"
    PROCESSING = "processing"
    WAITING_CONFIRMATION = "waiting_confirmation"
 
@dataclass
class UserSession:
    state: ConversationState
    data: dict
    step: int
    created_at: str
 
class StatefulAgent(WorkerAgent):
    default_agent_id = "stateful"
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.user_sessions = {}
    
    async def on_direct(self, msg):
        """Handle stateful conversations"""
        session = self.get_or_create_session(msg.sender_id)
        
        if session.state == ConversationState.IDLE:
            await self.handle_idle_state(msg, session)
        elif session.state == ConversationState.COLLECTING_INFO:
            await self.handle_collecting_state(msg, session)
        elif session.state == ConversationState.PROCESSING:
            await self.handle_processing_state(msg, session)
    
    def get_or_create_session(self, user_id):
        """Get or create user session"""
        if user_id not in self.user_sessions:
            self.user_sessions[user_id] = UserSession(
                state=ConversationState.IDLE,
                data={},
                step=0,
                created_at=str(datetime.now())
            )
        return self.user_sessions[user_id]

Background Tasks

import asyncio
from datetime import datetime, timedelta
 
class BackgroundTaskAgent(WorkerAgent):
    default_agent_id = "background-worker"
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.background_tasks = set()
    
    async def on_startup(self):
        """Start background tasks"""
        await super().on_startup()
        
        # Start periodic tasks
        task1 = asyncio.create_task(self.periodic_health_check())
        task2 = asyncio.create_task(self.daily_report_generator())
        
        self.background_tasks.add(task1)
        self.background_tasks.add(task2)
        
        # Clean up completed tasks
        task1.add_done_callback(self.background_tasks.discard)
        task2.add_done_callback(self.background_tasks.discard)
    
    async def periodic_health_check(self):
        """Perform periodic health checks"""
        while True:
            try:
                # Perform health check
                status = await self.check_system_health()
                
                if not status['healthy']:
                    ws = self.workspace()
                    await ws.channel("#alerts").post(
                        f"⚠️ Health check alert: {status['issue']}"
                    )
                
                # Wait 5 minutes
                await asyncio.sleep(300)
                
            except Exception as e:
                print(f"Health check error: {e}")
                await asyncio.sleep(60)  # Retry in 1 minute
    
    async def daily_report_generator(self):
        """Generate daily reports"""
        while True:
            try:
                now = datetime.now()
                next_run = now.replace(hour=9, minute=0, second=0, microsecond=0)
                
                if next_run <= now:
                    next_run += timedelta(days=1)
                
                # Wait until next run time
                wait_seconds = (next_run - now).total_seconds()
                await asyncio.sleep(wait_seconds)
                
                # Generate report
                await self.generate_daily_report()
                
            except Exception as e:
                print(f"Daily report error: {e}")
                await asyncio.sleep(3600)  # Retry in 1 hour

Integration with External Services

import aiohttp
import os
 
class IntegrationAgent(WorkerAgent):
    default_agent_id = "integration"
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.external_apis = {
            "weather": os.getenv("WEATHER_API_KEY"),
            "news": os.getenv("NEWS_API_KEY"),
            "translate": os.getenv("TRANSLATE_API_KEY")
        }
    
    async def on_direct(self, msg):
        """Handle integration requests"""
        text = msg.text.lower()
        
        if text.startswith("weather"):
            await self.get_weather(msg)
        elif text.startswith("news"):
            await self.get_news(msg)
        elif text.startswith("translate"):
            await self.translate_text(msg)
    
    async def get_weather(self, msg):
        """Get weather information"""
        location = msg.text[7:].strip()  # Remove "weather"
        
        try:
            async with aiohttp.ClientSession() as session:
                url = f"https://api.weather.com/v1/current"
                params = {
                    "key": self.external_apis["weather"],
                    "q": location
                }
                
                async with session.get(url, params=params) as response:
                    if response.status == 200:
                        data = await response.json()
                        weather_info = self.format_weather_data(data)
                        
                        ws = self.workspace()
                        await ws.agent(msg.sender_id).send(weather_info)
                    else:
                        raise Exception(f"API error: {response.status}")
        
        except Exception as e:
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(
                f"Sorry, I couldn't get weather data: {str(e)}"
            )

Best Practices

Error Handling

class RobustAgent(WorkerAgent):
    default_agent_id = "robust"
    
    async def on_direct(self, msg):
        """Handle messages with comprehensive error handling"""
        try:
            await self.process_message(msg)
        except ValueError as e:
            await self.handle_validation_error(msg, e)
        except ConnectionError as e:
            await self.handle_connection_error(msg, e)
        except Exception as e:
            await self.handle_unexpected_error(msg, e)
    
    async def handle_validation_error(self, msg, error):
        """Handle validation errors gracefully"""
        ws = self.workspace()
        await ws.agent(msg.sender_id).send(
            f"❌ Input validation error: {str(error)}\n"
            f"Please check your input and try again."
        )
    
    async def handle_connection_error(self, msg, error):
        """Handle connection errors with retry logic"""
        ws = self.workspace()
        await ws.agent(msg.sender_id).send(
            f"🔌 Connection issue detected. Retrying in a moment..."
        )
        
        # Implement retry logic
        await asyncio.sleep(2)
        try:
            await self.process_message(msg)
        except Exception:
            await ws.agent(msg.sender_id).send(
                f"❌ Still unable to process request. Please try again later."
            )

Performance Optimization

import asyncio
from functools import lru_cache
 
class OptimizedAgent(WorkerAgent):
    default_agent_id = "optimized"
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.message_queue = asyncio.Queue()
        self.processing_semaphore = asyncio.Semaphore(5)  # Limit concurrent processing
    
    @lru_cache(maxsize=100)
    async def cached_expensive_operation(self, input_data):
        """Cache expensive operations"""
        # Simulate expensive computation
        await asyncio.sleep(1)
        return f"Processed: {input_data}"
    
    async def on_direct(self, msg):
        """Queue messages for batch processing"""
        await self.message_queue.put(msg)
    
    async def on_startup(self):
        """Start message processor"""
        await super().on_startup()
        asyncio.create_task(self.message_processor())
    
    async def message_processor(self):
        """Process messages in batches"""
        while True:
            messages = []
            
            # Collect messages for batch processing
            try:
                # Get first message (blocking)
                msg = await self.message_queue.get()
                messages.append(msg)
                
                # Get additional messages (non-blocking)
                while len(messages) < 10:
                    try:
                        msg = self.message_queue.get_nowait()
                        messages.append(msg)
                    except asyncio.QueueEmpty:
                        break
                
                # Process batch
                await self.process_message_batch(messages)
                
            except Exception as e:
                print(f"Batch processing error: {e}")
    
    async def process_message_batch(self, messages):
        """Process multiple messages efficiently"""
        async with self.processing_semaphore:
            tasks = [self.process_single_message(msg) for msg in messages]
            await asyncio.gather(*tasks, return_exceptions=True)

Testing

Unit Testing

import pytest
from unittest.mock import AsyncMock, MagicMock
 
class TestableAgent(WorkerAgent):
    default_agent_id = "testable"
    
    async def process_command(self, command, user_id):
        """Testable business logic"""
        if command == "hello":
            return f"Hello {user_id}!"
        elif command == "time":
            return "Current time: 12:00 PM"
        else:
            return "Unknown command"
 
# Test cases
@pytest.mark.asyncio
async def test_command_processing():
    agent = TestableAgent()
    
    # Test hello command
    result = await agent.process_command("hello", "test_user")
    assert result == "Hello test_user!"
    
    # Test time command
    result = await agent.process_command("time", "test_user")
    assert "Current time:" in result
    
    # Test unknown command
    result = await agent.process_command("unknown", "test_user")
    assert result == "Unknown command"
 
@pytest.mark.asyncio
async def test_message_handling():
    agent = TestableAgent()
    agent._workspace = AsyncMock()
    
    # Mock message
    mock_msg = MagicMock()
    mock_msg.sender_id = "test_user"
    mock_msg.text = "hello"
    
    # Test direct message handling
    await agent.on_direct(mock_msg)
    
    # Verify workspace interaction
    agent._workspace.agent.assert_called_with("test_user")

This comprehensive tutorial provides a solid foundation for building sophisticated agent systems with OpenAgents. Continue exploring advanced patterns and integrations to create powerful collaborative AI applications.

Was this helpful?