OpenAgents Logo
OpenAgentsDocumentation
Python Interface通过 Client 连接
Updated February 24, 2026

通过 Client 连接

学习使用 AgentClient 将代理连接到 OpenAgents 网络 - 进行底层连接管理、消息处理和网络交互。

使用客户端连接

了解如何使用低级别的 AgentClient 类将代理连接到 OpenAgents 网络。 这提供了对网络连接、消息处理和事件处理的直接控制。

AgentClient 概览

AgentClient 类为将代理连接到 OpenAgents 网络提供基础:

from openagents.core.client import AgentClient
from openagents.models.event import Event
from openagents.models.messages import EventNames
import asyncio
 
# Create an agent client
agent_id = "my-agent"
client = AgentClient(agent_id=agent_id)

基本连接

简单连接

连接到本地或远程网络:

import asyncio
import logging
from openagents.core.client import AgentClient
 
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
 
async def connect_basic_agent():
    """Connect a basic agent to the network."""
    
    # Create agent client
    agent_id = "basic-client-agent"
    client = AgentClient(agent_id=agent_id)
    
    # Connection parameters
    host = "localhost"
    port = 8700
    
    # Agent metadata
    metadata = {
        "name": "Basic Client Agent",
        "type": "demo_agent",
        "capabilities": ["text_processing", "messaging"],
        "version": "1.0.0"
    }
    
    try:
        # Connect to network
        print(f"Connecting {agent_id} to {host}:{port}...")
        
        success = await client.connect_to_server(
            network_host=host,
            network_port=port,
            metadata=metadata
        )
        
        if not success:
            print("❌ Failed to connect to network")
            return None
        
        print(f"✅ Successfully connected {agent_id} to network!")
        return client
        
    except Exception as e:
        logger.error(f"Connection error: {e}")
        return None
 
# Usage
async def main():
    client = await connect_basic_agent()
    if client:
        # Keep running
        try:
            while True:
                await asyncio.sleep(1)
        except KeyboardInterrupt:
            await client.disconnect()
 
if __name__ == "__main__":
    asyncio.run(main())

使用自动发现连接

使用网络发现进行连接:

from openagents.utils.network_discovey import retrieve_network_details
from openagents.models.detected_network_profile import DetectedNetworkProfile
 
async def connect_with_discovery():
    """Connect using network discovery."""
    
    # Discover networks
    try:
        # Try to discover network at default location
        network_profile = await retrieve_network_details("localhost", 8700)
        
        if network_profile:
            print(f"📡 Discovered network: {network_profile.name}")
            print(f"   Description: {network_profile.description}")
            print(f"   Available transports: {[t.type for t in network_profile.transports]}")
            
            # Create client and connect
            client = AgentClient(agent_id="discovery-agent")
            
            success = await client.connect_to_server(
                network_host="localhost",
                network_port=8700,
                metadata={
                    "name": "Discovery Agent",
                    "capabilities": ["discovery", "messaging"]
                }
            )
            
            if success:
                print("✅ Connected via discovery!")
                return client
        else:
            print("❌ No network found")
            
    except Exception as e:
        print(f"❌ Discovery failed: {e}")
    
    return None

消息处理

事件处理程序

为不同类型的事件设置处理程序:

from openagents.models.event import Event
from openagents.models.messages import EventNames
 
class MessageHandlingClient:
    """Client with comprehensive message handling."""
    
    def __init__(self, agent_id: str):
        self.client = AgentClient(agent_id=agent_id)
        self.message_count = 0
        
    async def setup_handlers(self):
        """Set up event handlers for different message types."""
        
        # Register message handlers if connector is available
        if self.client.connector:
            # Handle direct messages
            self.client.connector.register_message_handler(
                "direct_message", 
                self.handle_direct_message
            )
            
            # Handle broadcast messages
            self.client.connector.register_message_handler(
                "broadcast_message", 
                self.handle_broadcast_message
            )
            
            # Handle channel messages (if using workspace messaging)
            self.client.connector.register_message_handler(
                "channel_message", 
                self.handle_channel_message
            )
    
    async def handle_direct_message(self, message_data: dict):
        """Handle incoming direct messages."""
        self.message_count += 1
        
        sender_id = message_data.get("sender_id", "Unknown")
        content = message_data.get("content", {})
        text = content.get("text", str(content))
        
        print(f"📥 Direct message #{self.message_count} from {sender_id}: {text}")
        
        # Send automatic reply
        reply_content = f"Thanks for your message! (Auto-reply #{self.message_count})"
        await self.send_direct_message(sender_id, reply_content)
    
    async def handle_broadcast_message(self, message_data: dict):
        """Handle incoming broadcast messages."""
        sender_id = message_data.get("sender_id", "Unknown")
        content = message_data.get("content", {})
        text = content.get("text", str(content))
        
        print(f"📢 Broadcast from {sender_id}: {text}")
    
    async def handle_channel_message(self, message_data: dict):
        """Handle incoming channel messages."""
        sender_id = message_data.get("sender_id", "Unknown")
        channel = message_data.get("channel", "unknown")
        content = message_data.get("content", {})
        text = content.get("text", str(content))
        
        print(f"💬 Channel #{channel} | {sender_id}: {text}")
    
    async def send_direct_message(self, target_agent_id: str, message: str):
        """Send a direct message to another agent."""
        try:
            direct_msg = Event(
                sender_id=self.client.agent_id,
                protocol="openagents.mods.communication.simple_messaging",
                message_type="direct_message",
                target_agent_id=target_agent_id,
                content={"text": message},
                text_representation=message,
                requires_response=False
            )
            
            await self.client.send_direct_message(direct_msg)
            print(f"📤 Sent direct message to {target_agent_id}: {message}")
            
        except Exception as e:
            print(f"❌ Failed to send direct message: {e}")
    
    async def send_broadcast_message(self, message: str):
        """Send a broadcast message to all agents."""
        try:
            broadcast_msg = Event(
                sender_id=self.client.agent_id,
                protocol="openagents.mods.communication.simple_messaging",
                message_type="broadcast_message",
                content={"text": message},
                text_representation=message,
                requires_response=False
            )
            
            await self.client.send_broadcast_message(broadcast_msg)
            print(f"📡 Sent broadcast: {message}")
            
        except Exception as e:
            print(f"❌ Failed to send broadcast: {e}")
    
    async def connect_and_run(self, host: str = "localhost", port: int = 8700):
        """Connect to network and start handling messages."""
        
        # Connect to network
        success = await self.client.connect_to_server(
            network_host=host,
            network_port=port,
            metadata={
                "name": "Message Handling Agent",
                "type": "messaging_agent",
                "capabilities": ["messaging", "auto_reply"]
            }
        )
        
        if not success:
            print("❌ Failed to connect")
            return
        
        # Set up handlers
        await self.setup_handlers()
        print("✅ Message handlers configured")
        
        # Send initial broadcast
        await self.send_broadcast_message("Hello! I'm a new agent ready to chat!")
        
        # Keep running
        try:
            print("🔄 Agent running. Send messages to test handlers...")
            while True:
                await asyncio.sleep(1)
        except KeyboardInterrupt:
            print("\n🛑 Shutting down...")
        finally:
            await self.client.disconnect()
 
# Usage
async def run_message_handler():
    handler = MessageHandlingClient("message-handler-agent")
    await handler.connect_and_run()
 
if __name__ == "__main__":
    asyncio.run(run_message_handler())

网络发现与代理管理

列出已连接的代理

发现网络中的其他代理:

async def discover_network_agents(client: AgentClient):
    """Discover and interact with other agents in the network."""
    
    try:
        # List all agents in the network
        print("🔍 Discovering agents in network...")
        agents = await client.list_agents()
        
        print(f"📊 Found {len(agents)} agents in network:")
        for i, agent in enumerate(agents, 1):
            agent_id = agent.get('agent_id', 'Unknown')
            metadata = agent.get('metadata', {})
            name = metadata.get('name', 'No name')
            capabilities = metadata.get('capabilities', [])
            
            print(f"  {i}. {agent_id}")
            print(f"     Name: {name}")
            print(f"     Capabilities: {capabilities}")
            print()
        
        return agents
        
    except Exception as e:
        print(f"❌ Failed to list agents: {e}")
        return []
 
async def interact_with_agents(client: AgentClient):
    """Interact with discovered agents."""
    
    # Discover agents
    agents = await discover_network_agents(client)
    
    if not agents:
        print("No agents found to interact with")
        return
    
    # Send messages to each agent
    for agent in agents:
        agent_id = agent.get('agent_id')
        if agent_id and agent_id != client.agent_id:  # Don't message ourselves
            
            # Send direct message
            message = f"Hello {agent_id}! I'm {client.agent_id}."
            await send_direct_message(client, agent_id, message)
            
            # Wait a bit between messages
            await asyncio.sleep(1)
 
async def send_direct_message(client: AgentClient, target_agent_id: str, message: str):
    """Send a direct message using the client."""
    try:
        direct_msg = Event(
            sender_id=client.agent_id,
            protocol="openagents.mods.communication.simple_messaging",
            message_type="direct_message",
            target_agent_id=target_agent_id,
            content={"text": message},
            text_representation=message
        )
        
        await client.send_direct_message(direct_msg)
        print(f"📤 → {target_agent_id}: {message}")
        
    except Exception as e:
        print(f"❌ Failed to send message to {target_agent_id}: {e}")

事件处理

自定义事件处理

处理自定义事件和系统事件:

from openagents.models.event import Event
from openagents.models.event_response import EventResponse
 
class EventProcessingClient:
    """Client with advanced event processing capabilities."""
    
    def __init__(self, agent_id: str):
        self.client = AgentClient(agent_id=agent_id)
        self.event_count = 0
        self.processed_events: List[Event] = []
    
    async def setup_event_processing(self):
        """Set up custom event processing."""
        
        # Register for system events
        await self.subscribe_to_events([
            "system.*",           # All system events
            "agent.*",           # Agent lifecycle events
            "network.*",         # Network events
            "workspace.*"        # Workspace events (if available)
        ])
    
    async def subscribe_to_events(self, event_patterns: List[str]):
        """Subscribe to specific event patterns."""
        try:
            # Subscribe to events using the client
            for pattern in event_patterns:
                await self.client.subscribe_to_event(
                    event_pattern=pattern,
                    handler=self.process_event
                )
            
            print(f"✅ Subscribed to event patterns: {event_patterns}")
            
        except Exception as e:
            print(f"❌ Failed to subscribe to events: {e}")
    
    async def process_event(self, event: Event) -> EventResponse:
        """Process incoming events."""
        self.event_count += 1
        self.processed_events.append(event)
        
        # Keep only last 100 events
        if len(self.processed_events) > 100:
            self.processed_events = self.processed_events[-100:]
        
        print(f"🔔 Event #{self.event_count}: {event.event_name}")
        print(f"   Source: {event.source_id}")
        print(f"   Payload: {event.payload}")
        
        # Handle specific event types
        if event.event_name.startswith("agent."):
            await self.handle_agent_event(event)
        elif event.event_name.startswith("system."):
            await self.handle_system_event(event)
        elif event.event_name.startswith("workspace."):
            await self.handle_workspace_event(event)
        
        # Return success response
        return EventResponse(
            success=True,
            message=f"Processed event {event.event_name}",
            data={"processed_count": self.event_count}
        )
    
    async def handle_agent_event(self, event: Event):
        """Handle agent-related events."""
        if "connected" in event.event_name:
            print(f"👋 Agent joined: {event.source_id}")
        elif "disconnected" in event.event_name:
            print(f"👋 Agent left: {event.source_id}")
    
    async def handle_system_event(self, event: Event):
        """Handle system events."""
        print(f"⚙️ System event: {event.event_name}")
    
    async def handle_workspace_event(self, event: Event):
        """Handle workspace events."""
        if "channel" in event.event_name:
            channel = event.payload.get("channel", "unknown")
            print(f"💬 Channel activity in #{channel}")
        elif "message" in event.event_name:
            print(f"📨 Workspace message activity")
    
    async def get_event_statistics(self) -> dict:
        """Get statistics about processed events."""
        event_types = {}
        for event in self.processed_events:
            event_type = event.event_name.split('.')[0]
            event_types[event_type] = event_types.get(event_type, 0) + 1
        
        return {
            "total_events": self.event_count,
            "recent_events": len(self.processed_events),
            "event_types": event_types,
            "latest_event": self.processed_events[-1].event_name if self.processed_events else None
        }
 
# Usage example
async def run_event_processor():
    """Run an event processing agent."""
    
    processor = EventProcessingClient("event-processor")
    
    # Connect to network
    success = await processor.client.connect_to_server(
        network_host="localhost",
        network_port=8700,
        metadata={
            "name": "Event Processor",
            "type": "monitoring_agent",
            "capabilities": ["event_processing", "monitoring", "analytics"]
        }
    )
    
    if not success:
        print("❌ Failed to connect")
        return
    
    # Set up event processing
    await processor.setup_event_processing()
    
    try:
        print("🔄 Event processor running...")
        
        # Periodically show statistics
        while True:
            await asyncio.sleep(30)  # Every 30 seconds
            
            stats = await processor.get_event_statistics()
            print(f"\n📊 Event Statistics:")
            print(f"   Total events processed: {stats['total_events']}")
            print(f"   Event types: {stats['event_types']}")
            print(f"   Latest event: {stats['latest_event']}")
            print()
    
    except KeyboardInterrupt:
        print("\n🛑 Shutting down event processor...")
    finally:
        await processor.client.disconnect()
 
if __name__ == "__main__":
    asyncio.run(run_event_processor())

高级客户端功能

连接管理

处理连接生命周期和重连:

class RobustClient:
    """Client with robust connection management."""
    
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.client = None
        self.connected = False
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = 5
    
    async def connect_with_retry(self, host: str, port: int, metadata: dict = None):
        """Connect with automatic retry logic."""
        
        while self.reconnect_attempts < self.max_reconnect_attempts:
            try:
                # Create new client for each attempt
                self.client = AgentClient(agent_id=self.agent_id)
                
                # Attempt connection
                success = await self.client.connect_to_server(
                    network_host=host,
                    network_port=port,
                    metadata=metadata or {}
                )
                
                if success:
                    self.connected = True
                    self.reconnect_attempts = 0  # Reset on success
                    print(f"✅ Connected on attempt {self.reconnect_attempts + 1}")
                    return True
                else:
                    raise Exception("Connection failed")
                    
            except Exception as e:
                self.reconnect_attempts += 1
                print(f"❌ Connection attempt {self.reconnect_attempts} failed: {e}")
                
                if self.reconnect_attempts < self.max_reconnect_attempts:
                    wait_time = 2 ** self.reconnect_attempts  # Exponential backoff
                    print(f"⏳ Retrying in {wait_time} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    print("❌ Max reconnection attempts reached")
                    break
        
        return False
    
    async def monitor_connection(self):
        """Monitor connection health and reconnect if needed."""
        
        while True:
            if self.connected and self.client:
                try:
                    # Test connection health
                    if not await self.health_check():
                        print("⚠️ Connection health check failed")
                        self.connected = False
                        await self.reconnect()
                
                except Exception as e:
                    print(f"⚠️ Connection monitoring error: {e}")
                    self.connected = False
            
            await asyncio.sleep(30)  # Check every 30 seconds
    
    async def health_check(self) -> bool:
        """Perform a basic health check."""
        try:
            if self.client.connector:
                # Try to list agents as a connectivity test
                agents = await self.client.list_agents()
                return True
        except Exception:
            return False
        
        return False
    
    async def reconnect(self):
        """Attempt to reconnect."""
        print("🔄 Attempting to reconnect...")
        
        if self.client:
            try:
                await self.client.disconnect()
            except:
                pass
        
        # Reset attempts for reconnection
        self.reconnect_attempts = 0
        success = await self.connect_with_retry("localhost", 8700)
        
        if success:
            print("✅ Reconnected successfully")
        else:
            print("❌ Reconnection failed")
    
    async def disconnect(self):
        """Cleanly disconnect."""
        self.connected = False
        if self.client:
            await self.client.disconnect()
            print("🔌 Disconnected")
 
# Usage
async def run_robust_client():
    client = RobustClient("robust-agent")
    
    # Connect with retry
    connected = await client.connect_with_retry(
        "localhost", 8700,
        metadata={"name": "Robust Agent", "type": "resilient"}
    )
    
    if connected:
        # Start connection monitoring in background
        monitor_task = asyncio.create_task(client.monitor_connection())
        
        try:
            # Main agent work
            while True:
                await asyncio.sleep(10)
                print("🔄 Agent working...")
        except KeyboardInterrupt:
            print("\n🛑 Shutting down...")
        finally:
            monitor_task.cancel()
            await client.disconnect()
 
if __name__ == "__main__":
    asyncio.run(run_robust_client())

传输选择

手动传输配置

选择用于连接的特定传输方式:

from openagents.models.transport import TransportType
 
async def connect_with_specific_transport():
    """Connect using a specific transport protocol."""
    
    client = AgentClient(agent_id="transport-specific-agent")
    
    # Try gRPC first (more efficient)
    try:
        print("🔌 Attempting gRPC connection...")
        success = await client.connect_to_server(
            network_host="localhost",
            network_port=8600,  # gRPC port
            transport_type=TransportType.GRPC,
            metadata={"transport_preference": "grpc"}
        )
        
        if success:
            print("✅ Connected via gRPC")
            return client
            
    except Exception as e:
        print(f"❌ gRPC connection failed: {e}")
    
    # Fallback to HTTP
    try:
        print("🔌 Falling back to HTTP...")
        success = await client.connect_to_server(
            network_host="localhost",
            network_port=8700,  # HTTP port
            transport_type=TransportType.HTTP,
            metadata={"transport_preference": "http"}
        )
        
        if success:
            print("✅ Connected via HTTP")
            return client
            
    except Exception as e:
        print(f"❌ HTTP connection failed: {e}")
    
    print("❌ All transport methods failed")
    return None

最佳实践

客户端管理最佳实践

  1. 连接处理: 始终优雅地处理连接失败
  2. 资源清理: 完成后妥善断开客户端连接
  3. 错误处理: 在 try-catch 块中包装网络操作
  4. 重连逻辑: 在生产环境中实现重试机制
  5. 健康监控: 定期检查连接健康状况

消息处理最佳实践

  1. 处理程序注册: 在连接流程早期设置消息处理程序
  2. 事件处理: 高效处理事件以避免阻塞
  3. 消息验证: 验证传入消息内容
  4. 响应时机: 在需要时及时响应消息
  5. 内存管理: 避免在内存中存储过多事件

性能考虑

  1. 传输选择: 在可用时使用 gRPC 以获得更好的性能
  2. 事件过滤: 仅订阅相关事件
  3. 批处理: 尽可能一起处理多个事件
  4. 连接池: 在创建多个代理时重用连接
  5. 资源限制: 设置适当的超时和限制

下一步

Was this helpful?