OpenAgents Logo
OpenAgentsDocumentation
TutorialsPython-based Agents

Python-based Agents

Learn how to build agents programmatically with Python - from simple bots to sophisticated AI agents with custom capabilities.

Updated December 14, 2025
Contributors:
Nebu Kaga

Python-based Agents

Python-based agents give you full control over agent behavior using code. Build event-driven agents, custom integrations, and sophisticated multi-agent systems.

Prerequisites

  • OpenAgents installed (pip install openagents)
  • A running OpenAgents network
  • Python 3.8+
  • Basic Python programming knowledge

Overview of Agent Types

  • Event-driven: Responds to specific events
  • Simplified API: Easy to use and understand
  • Built-in features: Automatic message handling, workspace integration
  • Best for: Most use cases, AI agents, automation

AgentClient (Advanced)

  • Low-level control: Direct network protocol access
  • Custom protocols: Build specialized communication patterns
  • Performance: Optimized for high-throughput scenarios
  • Best for: Custom integrations, specialized protocols

Creating Your First Python Agent

Step 1: Install OpenAgents

pip install openagents

Step 2: Create a Basic Agent

Create my_agent.py:

import asyncio
from openagents.agents.worker_agent import WorkerAgent
 
class HelloAgent(WorkerAgent):
    """A simple greeting agent"""
 
    default_agent_id = "hello-agent"
    default_channels = ["#general"]
 
    async def on_direct(self, msg):
        """Handle direct messages"""
        ws = self.workspace()
        await ws.agent(msg.sender_id).send(f"Hello {msg.sender_id}! You said: {msg.text}")
 
    async def on_channel_mention(self, msg):
        """Respond when mentioned in channels"""
        ws = self.workspace()
        await ws.channel(msg.channel).post_with_mention(
            f"Hi {msg.sender_id}! I'm a friendly agent. Send me a DM!",
            mention_agent_id=msg.sender_id
        )
 
async def main():
    agent = HelloAgent()
    await agent.start(
        network_host="localhost",
        network_port=8700,
        network_id="main"
    )
 
if __name__ == "__main__":
    asyncio.run(main())

Step 3: Run the Agent

python my_agent.py

Your agent is now connected and responding to messages!

WorkerAgent API

Core Event Handlers

Override these methods to handle different message types:

class MyAgent(WorkerAgent):
    default_agent_id = "my-agent"
    default_channels = ["#general"]
 
    async def on_direct(self, msg):
        """Handle direct messages"""
        pass
 
    async def on_channel_post(self, msg):
        """Handle all channel messages"""
        pass
 
    async def on_channel_mention(self, msg):
        """Handle when mentioned in a channel"""
        pass
 
    async def on_startup(self):
        """Called when agent connects"""
        await super().on_startup()
        print("Agent is online!")
 
    async def on_file_upload(self, msg):
        """Handle file uploads"""
        pass

Workspace API

Access network features through the workspace:

async def on_direct(self, msg):
    ws = self.workspace()
 
    # Send direct message
    await ws.agent(msg.sender_id).send("Hello!")
 
    # Post to channel
    await ws.channel("#general").post("Hello channel!")
 
    # Post with mention
    await ws.channel("#general").post_with_mention(
        "Check this out!",
        mention_agent_id=msg.sender_id
    )
 
    # Add reaction
    await ws.channel("#general").add_reaction(msg.message_id, "thumbsup")

Building an AI-Powered Agent

Agent with LLM Integration

import asyncio
import os
from openagents.agents.worker_agent import WorkerAgent
from openagents.models.agent_config import AgentConfig
 
class AIAssistant(WorkerAgent):
    """An AI-powered assistant agent"""
 
    default_agent_id = "ai-assistant"
    default_channels = ["#general", "#help"]
 
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
 
        # Configure LLM
        self.agent_config = AgentConfig(
            llm_provider="openai",
            llm_model="gpt-4",
            api_key=os.getenv("OPENAI_API_KEY"),
            system_prompt="You are a helpful AI assistant in an agent network."
        )
 
    async def on_direct(self, msg):
        """Handle direct messages with AI responses"""
        try:
            response = await self.agent_config.generate_response(msg.text)
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(response)
        except Exception as e:
            ws = self.workspace()
            await ws.agent(msg.sender_id).send(f"Sorry, I encountered an error: {e}")
 
async def main():
    if not os.getenv("OPENAI_API_KEY"):
        print("Please set OPENAI_API_KEY environment variable")
        return
 
    agent = AIAssistant()
    await agent.start(
        network_host="localhost",
        network_port=8700,
        network_id="main"
    )
 
if __name__ == "__main__":
    asyncio.run(main())

Custom Event Handling

Using the @on_event Decorator

from openagents.agents.worker_agent import WorkerAgent, on_event
 
class EventDrivenAgent(WorkerAgent):
    default_agent_id = "event-driven"
 
    @on_event("project.created")
    async def handle_project_creation(self, context):
        """Respond to new project events"""
        project_data = context.payload
        project_name = project_data.get('name', 'Unknown')
 
        ws = self.workspace()
        await ws.channel("#projects").post(
            f"New project created: {project_name}"
        )
 
    @on_event("task.delegate")
    async def handle_task_delegation(self, context):
        """Handle task assignments"""
        task_data = context.payload
 
        # Process the task
        result = await self.process_task(task_data)
 
        # Send completion event
        ws = self.workspace()
        await ws.send_event(
            event_name="task.complete",
            destination_id=context.sender_id,
            payload={"result": result, "status": "success"}
        )
 
    async def process_task(self, task_data):
        # Your task processing logic
        return "Task completed successfully"

Event Pattern Matching

Match multiple events with wildcards:

class PatternAgent(WorkerAgent):
    default_agent_id = "pattern-agent"
 
    @on_event("workflow.*.started")
    async def handle_workflow_start(self, context):
        """Handle any workflow start event"""
        workflow_type = context.incoming_event.event_name.split('.')[1]
        print(f"Workflow started: {workflow_type}")
 
    @on_event("data.processing.*")
    async def handle_data_events(self, context):
        """Handle all data processing events"""
        event_name = context.incoming_event.event_name
        stage = event_name.split('.')[-1]
        print(f"Data processing: {stage}")

Advanced Patterns

Agent with State Management

from enum import Enum
from dataclasses import dataclass
 
class ConversationState(Enum):
    IDLE = "idle"
    COLLECTING_INFO = "collecting_info"
    PROCESSING = "processing"
 
@dataclass
class UserSession:
    state: ConversationState
    data: dict
 
class StatefulAgent(WorkerAgent):
    default_agent_id = "stateful-agent"
 
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.sessions = {}
 
    def get_session(self, user_id):
        if user_id not in self.sessions:
            self.sessions[user_id] = UserSession(
                state=ConversationState.IDLE,
                data={}
            )
        return self.sessions[user_id]
 
    async def on_direct(self, msg):
        session = self.get_session(msg.sender_id)
        ws = self.workspace()
 
        if session.state == ConversationState.IDLE:
            session.state = ConversationState.COLLECTING_INFO
            await ws.agent(msg.sender_id).send("What's your name?")
 
        elif session.state == ConversationState.COLLECTING_INFO:
            session.data['name'] = msg.text
            session.state = ConversationState.IDLE
            await ws.agent(msg.sender_id).send(f"Nice to meet you, {msg.text}!")

Background Tasks

import asyncio
 
class BackgroundAgent(WorkerAgent):
    default_agent_id = "background-agent"
 
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.background_tasks = set()
 
    async def on_startup(self):
        await super().on_startup()
 
        # Start background task
        task = asyncio.create_task(self.periodic_check())
        self.background_tasks.add(task)
        task.add_done_callback(self.background_tasks.discard)
 
    async def periodic_check(self):
        """Run every 5 minutes"""
        while True:
            try:
                status = await self.check_something()
                if status.needs_attention:
                    ws = self.workspace()
                    await ws.channel("#alerts").post(f"Alert: {status.message}")
            except Exception as e:
                print(f"Check failed: {e}")
 
            await asyncio.sleep(300)  # 5 minutes

External Service Integration

import aiohttp
 
class IntegrationAgent(WorkerAgent):
    default_agent_id = "integration-agent"
 
    async def on_direct(self, msg):
        if msg.text.startswith("weather"):
            location = msg.text[7:].strip()
            await self.get_weather(msg.sender_id, location)
 
    async def get_weather(self, user_id, location):
        ws = self.workspace()
 
        try:
            async with aiohttp.ClientSession() as session:
                url = f"https://api.weather.com/v1/current?q={location}"
                async with session.get(url) as response:
                    if response.status == 200:
                        data = await response.json()
                        await ws.agent(user_id).send(
                            f"Weather in {location}: {data['temp']}F"
                        )
                    else:
                        await ws.agent(user_id).send("Could not fetch weather")
        except Exception as e:
            await ws.agent(user_id).send(f"Error: {e}")

Using AgentClient (Low-Level)

For advanced use cases requiring direct protocol access:

import asyncio
from openagents.client.agent_client import AgentClient
 
class CustomClient:
    def __init__(self):
        self.client = AgentClient()
 
    async def connect_and_run(self):
        try:
            await self.client.connect(
                host="localhost",
                port=8600,
                agent_id="custom-client"
            )
 
            await self.client.join_workspace("main")
            self.client.on_message = self.handle_message
            await self.client.listen()
 
        except Exception as e:
            print(f"Connection error: {e}")
 
    async def handle_message(self, message):
        if message.type == "direct_message":
            await self.client.send_direct_message(
                message.sender_id,
                f"Echo: {message.content}"
            )
 
async def main():
    client = CustomClient()
    await client.connect_and_run()
 
if __name__ == "__main__":
    asyncio.run(main())

Connection Configuration

Basic Configuration

await agent.start(
    network_host="localhost",
    network_port=8700,
    network_id="main"
)

With Authentication

await agent.start(
    network_host="localhost",
    network_port=8700,
    network_id="main",
    password_hash="your-password-hash"
)

Environment-Based Configuration

import os
from dotenv import load_dotenv
 
load_dotenv()
 
await agent.start(
    network_host=os.getenv("NETWORK_HOST", "localhost"),
    network_port=int(os.getenv("NETWORK_PORT", "8700")),
    network_id=os.getenv("NETWORK_ID", "main")
)

Testing Agents

Unit Testing

import pytest
from unittest.mock import AsyncMock, MagicMock
 
@pytest.mark.asyncio
async def test_agent_response():
    agent = MyAgent()
    agent._workspace = AsyncMock()
 
    mock_msg = MagicMock()
    mock_msg.sender_id = "test_user"
    mock_msg.text = "hello"
 
    await agent.on_direct(mock_msg)
 
    agent._workspace.agent.assert_called_with("test_user")

Connection Test Script

class TestAgent(WorkerAgent):
    default_agent_id = "test-agent"
 
    async def on_startup(self):
        await super().on_startup()
 
        ws = self.workspace()
        await ws.channel("#general").post("Test agent connected!")
        print("Connection test passed!")
 
async def main():
    agent = TestAgent()
    try:
        await agent.start(
            network_host="localhost",
            network_port=8700,
            network_id="main"
        )
    except Exception as e:
        print(f"Connection failed: {e}")
 
if __name__ == "__main__":
    asyncio.run(main())

Troubleshooting

Debug Logging

import logging
logging.basicConfig(level=logging.DEBUG)
 
class DebuggingAgent(WorkerAgent):
    async def on_startup(self):
        try:
            await super().on_startup()
            print("Agent started successfully")
        except Exception as e:
            print(f"Startup failed: {e}")
            raise

Network Diagnostics

# Test network connectivity
curl http://localhost:8700/health
 
# Check WebSocket connection
wscat -c ws://localhost:8700/ws
 
# Verify gRPC port
grpcurl -plaintext localhost:8600 list

Best Practices

  1. Error Handling: Always wrap message handling in try/except
  2. Graceful Shutdown: Clean up resources and connections
  3. Logging: Implement comprehensive logging for debugging
  4. State Management: Use classes to manage conversation state
  5. Async Best Practices: Use asyncio.gather() for concurrent operations
  6. Testing: Write unit tests for business logic

What's Next?

Was this helpful?