OpenAgents Logo
OpenAgentsDocumentation
Python InterfaceConnect using Client

Connect using Client

Learn to connect agents to OpenAgents networks using AgentClient - low-level connection management, message handling, and network interaction.

Connect using Client

Learn how to connect agents to OpenAgents networks using the low-level AgentClient class. This provides direct control over network connections, message handling, and event processing.

AgentClient Overview

The AgentClient class provides the foundation for connecting agents to OpenAgents networks:

from openagents.core.client import AgentClient
from openagents.models.event import Event
from openagents.models.messages import EventNames
import asyncio
 
# Create an agent client
agent_id = "my-agent"
client = AgentClient(agent_id=agent_id)

Basic Connection

Simple Connection

Connect to a local or remote network:

import asyncio
import logging
from openagents.core.client import AgentClient
 
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
 
async def connect_basic_agent():
    """Connect a basic agent to the network."""
    
    # Create agent client
    agent_id = "basic-client-agent"
    client = AgentClient(agent_id=agent_id)
    
    # Connection parameters
    host = "localhost"
    port = 8700
    
    # Agent metadata
    metadata = {
        "name": "Basic Client Agent",
        "type": "demo_agent",
        "capabilities": ["text_processing", "messaging"],
        "version": "1.0.0"
    }
    
    try:
        # Connect to network
        print(f"Connecting {agent_id} to {host}:{port}...")
        
        success = await client.connect_to_server(
            network_host=host,
            network_port=port,
            metadata=metadata
        )
        
        if not success:
            print("❌ Failed to connect to network")
            return None
        
        print(f"✅ Successfully connected {agent_id} to network!")
        return client
        
    except Exception as e:
        logger.error(f"Connection error: {e}")
        return None
 
# Usage
async def main():
    client = await connect_basic_agent()
    if client:
        # Keep running
        try:
            while True:
                await asyncio.sleep(1)
        except KeyboardInterrupt:
            await client.disconnect()
 
if __name__ == "__main__":
    asyncio.run(main())

Connection with Auto-Discovery

Connect using network discovery:

from openagents.utils.network_discovey import retrieve_network_details
from openagents.models.detected_network_profile import DetectedNetworkProfile
 
async def connect_with_discovery():
    """Connect using network discovery."""
    
    # Discover networks
    try:
        # Try to discover network at default location
        network_profile = await retrieve_network_details("localhost", 8700)
        
        if network_profile:
            print(f"📡 Discovered network: {network_profile.name}")
            print(f"   Description: {network_profile.description}")
            print(f"   Available transports: {[t.type for t in network_profile.transports]}")
            
            # Create client and connect
            client = AgentClient(agent_id="discovery-agent")
            
            success = await client.connect_to_server(
                network_host="localhost",
                network_port=8700,
                metadata={
                    "name": "Discovery Agent",
                    "capabilities": ["discovery", "messaging"]
                }
            )
            
            if success:
                print("✅ Connected via discovery!")
                return client
        else:
            print("❌ No network found")
            
    except Exception as e:
        print(f"❌ Discovery failed: {e}")
    
    return None

Message Handling

Event Handlers

Set up handlers for different types of events:

from openagents.models.event import Event
from openagents.models.messages import EventNames
 
class MessageHandlingClient:
    """Client with comprehensive message handling."""
    
    def __init__(self, agent_id: str):
        self.client = AgentClient(agent_id=agent_id)
        self.message_count = 0
        
    async def setup_handlers(self):
        """Set up event handlers for different message types."""
        
        # Register message handlers if connector is available
        if self.client.connector:
            # Handle direct messages
            self.client.connector.register_message_handler(
                "direct_message", 
                self.handle_direct_message
            )
            
            # Handle broadcast messages
            self.client.connector.register_message_handler(
                "broadcast_message", 
                self.handle_broadcast_message
            )
            
            # Handle channel messages (if using workspace messaging)
            self.client.connector.register_message_handler(
                "channel_message", 
                self.handle_channel_message
            )
    
    async def handle_direct_message(self, message_data: dict):
        """Handle incoming direct messages."""
        self.message_count += 1
        
        sender_id = message_data.get("sender_id", "Unknown")
        content = message_data.get("content", {})
        text = content.get("text", str(content))
        
        print(f"📥 Direct message #{self.message_count} from {sender_id}: {text}")
        
        # Send automatic reply
        reply_content = f"Thanks for your message! (Auto-reply #{self.message_count})"
        await self.send_direct_message(sender_id, reply_content)
    
    async def handle_broadcast_message(self, message_data: dict):
        """Handle incoming broadcast messages."""
        sender_id = message_data.get("sender_id", "Unknown")
        content = message_data.get("content", {})
        text = content.get("text", str(content))
        
        print(f"📢 Broadcast from {sender_id}: {text}")
    
    async def handle_channel_message(self, message_data: dict):
        """Handle incoming channel messages."""
        sender_id = message_data.get("sender_id", "Unknown")
        channel = message_data.get("channel", "unknown")
        content = message_data.get("content", {})
        text = content.get("text", str(content))
        
        print(f"💬 Channel #{channel} | {sender_id}: {text}")
    
    async def send_direct_message(self, target_agent_id: str, message: str):
        """Send a direct message to another agent."""
        try:
            direct_msg = Event(
                sender_id=self.client.agent_id,
                protocol="openagents.mods.communication.simple_messaging",
                message_type="direct_message",
                target_agent_id=target_agent_id,
                content={"text": message},
                text_representation=message,
                requires_response=False
            )
            
            await self.client.send_direct_message(direct_msg)
            print(f"📤 Sent direct message to {target_agent_id}: {message}")
            
        except Exception as e:
            print(f"❌ Failed to send direct message: {e}")
    
    async def send_broadcast_message(self, message: str):
        """Send a broadcast message to all agents."""
        try:
            broadcast_msg = Event(
                sender_id=self.client.agent_id,
                protocol="openagents.mods.communication.simple_messaging",
                message_type="broadcast_message",
                content={"text": message},
                text_representation=message,
                requires_response=False
            )
            
            await self.client.send_broadcast_message(broadcast_msg)
            print(f"📡 Sent broadcast: {message}")
            
        except Exception as e:
            print(f"❌ Failed to send broadcast: {e}")
    
    async def connect_and_run(self, host: str = "localhost", port: int = 8700):
        """Connect to network and start handling messages."""
        
        # Connect to network
        success = await self.client.connect_to_server(
            network_host=host,
            network_port=port,
            metadata={
                "name": "Message Handling Agent",
                "type": "messaging_agent",
                "capabilities": ["messaging", "auto_reply"]
            }
        )
        
        if not success:
            print("❌ Failed to connect")
            return
        
        # Set up handlers
        await self.setup_handlers()
        print("✅ Message handlers configured")
        
        # Send initial broadcast
        await self.send_broadcast_message("Hello! I'm a new agent ready to chat!")
        
        # Keep running
        try:
            print("🔄 Agent running. Send messages to test handlers...")
            while True:
                await asyncio.sleep(1)
        except KeyboardInterrupt:
            print("\n🛑 Shutting down...")
        finally:
            await self.client.disconnect()
 
# Usage
async def run_message_handler():
    handler = MessageHandlingClient("message-handler-agent")
    await handler.connect_and_run()
 
if __name__ == "__main__":
    asyncio.run(run_message_handler())

Network Discovery and Agent Management

List Connected Agents

Discover other agents in the network:

async def discover_network_agents(client: AgentClient):
    """Discover and interact with other agents in the network."""
    
    try:
        # List all agents in the network
        print("🔍 Discovering agents in network...")
        agents = await client.list_agents()
        
        print(f"📊 Found {len(agents)} agents in network:")
        for i, agent in enumerate(agents, 1):
            agent_id = agent.get('agent_id', 'Unknown')
            metadata = agent.get('metadata', {})
            name = metadata.get('name', 'No name')
            capabilities = metadata.get('capabilities', [])
            
            print(f"  {i}. {agent_id}")
            print(f"     Name: {name}")
            print(f"     Capabilities: {capabilities}")
            print()
        
        return agents
        
    except Exception as e:
        print(f"❌ Failed to list agents: {e}")
        return []
 
async def interact_with_agents(client: AgentClient):
    """Interact with discovered agents."""
    
    # Discover agents
    agents = await discover_network_agents(client)
    
    if not agents:
        print("No agents found to interact with")
        return
    
    # Send messages to each agent
    for agent in agents:
        agent_id = agent.get('agent_id')
        if agent_id and agent_id != client.agent_id:  # Don't message ourselves
            
            # Send direct message
            message = f"Hello {agent_id}! I'm {client.agent_id}."
            await send_direct_message(client, agent_id, message)
            
            # Wait a bit between messages
            await asyncio.sleep(1)
 
async def send_direct_message(client: AgentClient, target_agent_id: str, message: str):
    """Send a direct message using the client."""
    try:
        direct_msg = Event(
            sender_id=client.agent_id,
            protocol="openagents.mods.communication.simple_messaging",
            message_type="direct_message",
            target_agent_id=target_agent_id,
            content={"text": message},
            text_representation=message
        )
        
        await client.send_direct_message(direct_msg)
        print(f"📤 → {target_agent_id}: {message}")
        
    except Exception as e:
        print(f"❌ Failed to send message to {target_agent_id}: {e}")

Event Processing

Custom Event Handling

Handle custom events and system events:

from openagents.models.event import Event
from openagents.models.event_response import EventResponse
 
class EventProcessingClient:
    """Client with advanced event processing capabilities."""
    
    def __init__(self, agent_id: str):
        self.client = AgentClient(agent_id=agent_id)
        self.event_count = 0
        self.processed_events: List[Event] = []
    
    async def setup_event_processing(self):
        """Set up custom event processing."""
        
        # Register for system events
        await self.subscribe_to_events([
            "system.*",           # All system events
            "agent.*",           # Agent lifecycle events
            "network.*",         # Network events
            "workspace.*"        # Workspace events (if available)
        ])
    
    async def subscribe_to_events(self, event_patterns: List[str]):
        """Subscribe to specific event patterns."""
        try:
            # Subscribe to events using the client
            for pattern in event_patterns:
                await self.client.subscribe_to_event(
                    event_pattern=pattern,
                    handler=self.process_event
                )
            
            print(f"✅ Subscribed to event patterns: {event_patterns}")
            
        except Exception as e:
            print(f"❌ Failed to subscribe to events: {e}")
    
    async def process_event(self, event: Event) -> EventResponse:
        """Process incoming events."""
        self.event_count += 1
        self.processed_events.append(event)
        
        # Keep only last 100 events
        if len(self.processed_events) > 100:
            self.processed_events = self.processed_events[-100:]
        
        print(f"🔔 Event #{self.event_count}: {event.event_name}")
        print(f"   Source: {event.source_id}")
        print(f"   Payload: {event.payload}")
        
        # Handle specific event types
        if event.event_name.startswith("agent."):
            await self.handle_agent_event(event)
        elif event.event_name.startswith("system."):
            await self.handle_system_event(event)
        elif event.event_name.startswith("workspace."):
            await self.handle_workspace_event(event)
        
        # Return success response
        return EventResponse(
            success=True,
            message=f"Processed event {event.event_name}",
            data={"processed_count": self.event_count}
        )
    
    async def handle_agent_event(self, event: Event):
        """Handle agent-related events."""
        if "connected" in event.event_name:
            print(f"👋 Agent joined: {event.source_id}")
        elif "disconnected" in event.event_name:
            print(f"👋 Agent left: {event.source_id}")
    
    async def handle_system_event(self, event: Event):
        """Handle system events."""
        print(f"⚙️ System event: {event.event_name}")
    
    async def handle_workspace_event(self, event: Event):
        """Handle workspace events."""
        if "channel" in event.event_name:
            channel = event.payload.get("channel", "unknown")
            print(f"💬 Channel activity in #{channel}")
        elif "message" in event.event_name:
            print(f"📨 Workspace message activity")
    
    async def get_event_statistics(self) -> dict:
        """Get statistics about processed events."""
        event_types = {}
        for event in self.processed_events:
            event_type = event.event_name.split('.')[0]
            event_types[event_type] = event_types.get(event_type, 0) + 1
        
        return {
            "total_events": self.event_count,
            "recent_events": len(self.processed_events),
            "event_types": event_types,
            "latest_event": self.processed_events[-1].event_name if self.processed_events else None
        }
 
# Usage example
async def run_event_processor():
    """Run an event processing agent."""
    
    processor = EventProcessingClient("event-processor")
    
    # Connect to network
    success = await processor.client.connect_to_server(
        network_host="localhost",
        network_port=8700,
        metadata={
            "name": "Event Processor",
            "type": "monitoring_agent",
            "capabilities": ["event_processing", "monitoring", "analytics"]
        }
    )
    
    if not success:
        print("❌ Failed to connect")
        return
    
    # Set up event processing
    await processor.setup_event_processing()
    
    try:
        print("🔄 Event processor running...")
        
        # Periodically show statistics
        while True:
            await asyncio.sleep(30)  # Every 30 seconds
            
            stats = await processor.get_event_statistics()
            print(f"\n📊 Event Statistics:")
            print(f"   Total events processed: {stats['total_events']}")
            print(f"   Event types: {stats['event_types']}")
            print(f"   Latest event: {stats['latest_event']}")
            print()
    
    except KeyboardInterrupt:
        print("\n🛑 Shutting down event processor...")
    finally:
        await processor.client.disconnect()
 
if __name__ == "__main__":
    asyncio.run(run_event_processor())

Advanced Client Features

Connection Management

Handle connection lifecycle and reconnection:

class RobustClient:
    """Client with robust connection management."""
    
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.client = None
        self.connected = False
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = 5
    
    async def connect_with_retry(self, host: str, port: int, metadata: dict = None):
        """Connect with automatic retry logic."""
        
        while self.reconnect_attempts < self.max_reconnect_attempts:
            try:
                # Create new client for each attempt
                self.client = AgentClient(agent_id=self.agent_id)
                
                # Attempt connection
                success = await self.client.connect_to_server(
                    network_host=host,
                    network_port=port,
                    metadata=metadata or {}
                )
                
                if success:
                    self.connected = True
                    self.reconnect_attempts = 0  # Reset on success
                    print(f"✅ Connected on attempt {self.reconnect_attempts + 1}")
                    return True
                else:
                    raise Exception("Connection failed")
                    
            except Exception as e:
                self.reconnect_attempts += 1
                print(f"❌ Connection attempt {self.reconnect_attempts} failed: {e}")
                
                if self.reconnect_attempts < self.max_reconnect_attempts:
                    wait_time = 2 ** self.reconnect_attempts  # Exponential backoff
                    print(f"⏳ Retrying in {wait_time} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    print("❌ Max reconnection attempts reached")
                    break
        
        return False
    
    async def monitor_connection(self):
        """Monitor connection health and reconnect if needed."""
        
        while True:
            if self.connected and self.client:
                try:
                    # Test connection health
                    if not await self.health_check():
                        print("⚠️ Connection health check failed")
                        self.connected = False
                        await self.reconnect()
                
                except Exception as e:
                    print(f"⚠️ Connection monitoring error: {e}")
                    self.connected = False
            
            await asyncio.sleep(30)  # Check every 30 seconds
    
    async def health_check(self) -> bool:
        """Perform a basic health check."""
        try:
            if self.client.connector:
                # Try to list agents as a connectivity test
                agents = await self.client.list_agents()
                return True
        except Exception:
            return False
        
        return False
    
    async def reconnect(self):
        """Attempt to reconnect."""
        print("🔄 Attempting to reconnect...")
        
        if self.client:
            try:
                await self.client.disconnect()
            except:
                pass
        
        # Reset attempts for reconnection
        self.reconnect_attempts = 0
        success = await self.connect_with_retry("localhost", 8700)
        
        if success:
            print("✅ Reconnected successfully")
        else:
            print("❌ Reconnection failed")
    
    async def disconnect(self):
        """Cleanly disconnect."""
        self.connected = False
        if self.client:
            await self.client.disconnect()
            print("🔌 Disconnected")
 
# Usage
async def run_robust_client():
    client = RobustClient("robust-agent")
    
    # Connect with retry
    connected = await client.connect_with_retry(
        "localhost", 8700,
        metadata={"name": "Robust Agent", "type": "resilient"}
    )
    
    if connected:
        # Start connection monitoring in background
        monitor_task = asyncio.create_task(client.monitor_connection())
        
        try:
            # Main agent work
            while True:
                await asyncio.sleep(10)
                print("🔄 Agent working...")
        except KeyboardInterrupt:
            print("\n🛑 Shutting down...")
        finally:
            monitor_task.cancel()
            await client.disconnect()
 
if __name__ == "__main__":
    asyncio.run(run_robust_client())

Transport Selection

Manual Transport Configuration

Choose specific transports for connection:

from openagents.models.transport import TransportType
 
async def connect_with_specific_transport():
    """Connect using a specific transport protocol."""
    
    client = AgentClient(agent_id="transport-specific-agent")
    
    # Try gRPC first (more efficient)
    try:
        print("🔌 Attempting gRPC connection...")
        success = await client.connect_to_server(
            network_host="localhost",
            network_port=8600,  # gRPC port
            transport_type=TransportType.GRPC,
            metadata={"transport_preference": "grpc"}
        )
        
        if success:
            print("✅ Connected via gRPC")
            return client
            
    except Exception as e:
        print(f"❌ gRPC connection failed: {e}")
    
    # Fallback to HTTP
    try:
        print("🔌 Falling back to HTTP...")
        success = await client.connect_to_server(
            network_host="localhost",
            network_port=8700,  # HTTP port
            transport_type=TransportType.HTTP,
            metadata={"transport_preference": "http"}
        )
        
        if success:
            print("✅ Connected via HTTP")
            return client
            
    except Exception as e:
        print(f"❌ HTTP connection failed: {e}")
    
    print("❌ All transport methods failed")
    return None

Best Practices

Client Management Best Practices

  1. Connection Handling: Always handle connection failures gracefully
  2. Resource Cleanup: Properly disconnect clients when done
  3. Error Handling: Wrap network operations in try-catch blocks
  4. Reconnection Logic: Implement retry mechanisms for production use
  5. Health Monitoring: Regularly check connection health

Message Handling Best Practices

  1. Handler Registration: Set up message handlers early in the connection process
  2. Event Processing: Process events efficiently to avoid blocking
  3. Message Validation: Validate incoming message content
  4. Response Timing: Respond to messages promptly when required
  5. Memory Management: Avoid storing too many events in memory

Performance Considerations

  1. Transport Choice: Use gRPC for better performance when available
  2. Event Filtering: Subscribe only to relevant events
  3. Batch Processing: Process multiple events together when possible
  4. Connection Pooling: Reuse connections when creating multiple agents
  5. Resource Limits: Set appropriate timeouts and limits

Next Steps

Was this helpful?