Python Interface / Connect using Client
Connect using Client
Learn to connect agents to OpenAgents networks using AgentClient - low-level connection management, message handling, and network interaction.
Connect using Client
Learn how to connect agents to OpenAgents networks using the low-level `AgentClient` class, which gives you direct control over network connections, message handling, and event processing.
AgentClient Overview
The `AgentClient` class provides the foundation for connecting agents to OpenAgents networks:
from openagents.core.client import AgentClient
from openagents.models.event import Event
from openagents.models.messages import EventNames
import asyncio
# Create an agent client
# The id is passed straight to AgentClient; presumably it must be unique
# within the network -- TODO confirm against the OpenAgents API reference.
agent_id = "my-agent"
# Constructing the client does not connect it; the later examples on this
# page call connect_to_server() explicitly.
client = AgentClient(agent_id=agent_id)
Basic Connection
Simple Connection
Connect to a local or remote network:
import asyncio
import logging
from openagents.core.client import AgentClient
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def connect_basic_agent(
    agent_id: str = "basic-client-agent",
    host: str = "localhost",
    port: int = 8700,
    metadata: dict = None,
):
    """Connect a basic agent to the network.

    Args:
        agent_id: Identifier given to the ``AgentClient``.
        host: Network host to connect to.
        port: Network port to connect to.
        metadata: Agent metadata sent with the connection request. When
            omitted, the demo metadata defined below is used.

    Returns:
        The connected ``AgentClient``, or ``None`` when the server refused
        the connection or the connection attempt raised.
    """
    if metadata is None:
        # Default agent metadata for the demo agent.
        metadata = {
            "name": "Basic Client Agent",
            "type": "demo_agent",
            "capabilities": ["text_processing", "messaging"],
            "version": "1.0.0",
        }

    # Create agent client
    client = AgentClient(agent_id=agent_id)

    try:
        # Connect to network; only the network call sits inside the try.
        print(f"Connecting {agent_id} to {host}:{port}...")
        success = await client.connect_to_server(
            network_host=host,
            network_port=port,
            metadata=metadata,
        )
    except Exception as e:
        # Example-level boundary: report the problem and signal failure
        # with None instead of propagating.
        logger.error(f"Connection error: {e}")
        return None

    if not success:
        print("❌ Failed to connect to network")
        return None

    print(f"✅ Successfully connected {agent_id} to network!")
    return client
# Usage
async def main():
    """Connect the basic agent and keep it alive until interrupted.

    The client is always disconnected on the way out, whether the loop ends
    through ``KeyboardInterrupt`` or through task cancellation.
    """
    client = await connect_basic_agent()
    if not client:
        return

    try:
        # Keep running
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Under asyncio.run(), Ctrl+C normally reaches this coroutine as a
        # cancellation rather than KeyboardInterrupt, so cleanup must live
        # in ``finally`` to run in both cases.
        await client.disconnect()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() re-raises KeyboardInterrupt after cancelling main();
        # cleanup has already happened, so exit quietly.
        pass
Connection with Auto-Discovery
Connect using network discovery:
from openagents.utils.network_discovey import retrieve_network_details
from openagents.models.detected_network_profile import DetectedNetworkProfile
async def connect_with_discovery(host: str = "localhost", port: int = 8700):
    """Connect using network discovery.

    Probes ``host:port`` with ``retrieve_network_details`` and, when a
    network profile comes back, connects a new ``AgentClient`` to that same
    endpoint.

    Args:
        host: Host to probe and connect to.
        port: Port to probe and connect to.

    Returns:
        The connected ``AgentClient``, or ``None`` when no network was
        found, the connection was refused, or any step raised.
    """
    try:
        # Try to discover a network at the requested location
        network_profile = await retrieve_network_details(host, port)
        if not network_profile:
            print("❌ No network found")
            return None

        transport_types = [t.type for t in network_profile.transports]
        print(f"📡 Discovered network: {network_profile.name}")
        print(f" Description: {network_profile.description}")
        print(f" Available transports: {transport_types}")

        # Create client and connect to the endpoint that was just probed
        client = AgentClient(agent_id="discovery-agent")
        success = await client.connect_to_server(
            network_host=host,
            network_port=port,
            metadata={
                "name": "Discovery Agent",
                "capabilities": ["discovery", "messaging"],
            },
        )
        if success:
            print("✅ Connected via discovery!")
            return client

        # Previously this path returned None without telling the user why.
        print("❌ Network discovered, but the connection was refused")
    except Exception as e:
        print(f"❌ Discovery failed: {e}")
    return None
Message Handling
Event Handlers
Set up handlers for different types of events:
from openagents.models.event import Event
from openagents.models.messages import EventNames
class MessageHandlingClient:
    """Client with comprehensive message handling.

    Wraps an ``AgentClient``, registers handlers for direct, broadcast and
    channel messages on its connector, and auto-replies to direct messages.
    """

    def __init__(self, agent_id: str):
        self.client = AgentClient(agent_id=agent_id)
        # Number of direct messages received so far (used in auto-replies).
        self.message_count = 0

    @staticmethod
    def _extract_text(message_data: dict):
        """Return the printable text of a message.

        ``content`` is normally a dict with a ``"text"`` key; anything else
        (a plain string, ``None``, ...) is rendered with ``str()`` instead
        of crashing the handler.
        """
        content = message_data.get("content", {})
        if isinstance(content, dict):
            return content.get("text", str(content))
        return str(content)

    async def setup_handlers(self):
        """Set up event handlers for different message types.

        Returns:
            ``True`` when the handlers were registered, ``False`` when the
            client has no connector to register them on.
        """
        connector = self.client.connector
        if not connector:
            return False

        handlers = {
            "direct_message": self.handle_direct_message,
            "broadcast_message": self.handle_broadcast_message,
            # Channel messages are only delivered when workspace messaging
            # is in use.
            "channel_message": self.handle_channel_message,
        }
        for message_type, handler in handlers.items():
            connector.register_message_handler(message_type, handler)
        return True

    async def handle_direct_message(self, message_data: dict):
        """Handle incoming direct messages and send an automatic reply."""
        self.message_count += 1
        sender_id = message_data.get("sender_id", "Unknown")
        text = self._extract_text(message_data)
        print(f"📥 Direct message #{self.message_count} from {sender_id}: {text}")

        # Only reply to a real, different agent: without a sender there is
        # nobody to address, and replying to ourselves would loop forever.
        reply_to = message_data.get("sender_id")
        if not reply_to or reply_to == self.client.agent_id:
            return

        reply_content = f"Thanks for your message! (Auto-reply #{self.message_count})"
        await self.send_direct_message(reply_to, reply_content)

    async def handle_broadcast_message(self, message_data: dict):
        """Handle incoming broadcast messages."""
        sender_id = message_data.get("sender_id", "Unknown")
        text = self._extract_text(message_data)
        print(f"📢 Broadcast from {sender_id}: {text}")

    async def handle_channel_message(self, message_data: dict):
        """Handle incoming channel messages."""
        sender_id = message_data.get("sender_id", "Unknown")
        channel = message_data.get("channel", "unknown")
        text = self._extract_text(message_data)
        print(f"💬 Channel #{channel} | {sender_id}: {text}")

    async def send_direct_message(self, target_agent_id: str, message: str):
        """Send a direct message to another agent.

        Failures are reported on stdout and not propagated.
        """
        try:
            direct_msg = Event(
                sender_id=self.client.agent_id,
                protocol="openagents.mods.communication.simple_messaging",
                message_type="direct_message",
                target_agent_id=target_agent_id,
                content={"text": message},
                text_representation=message,
                requires_response=False,
            )
            await self.client.send_direct_message(direct_msg)
            print(f"📤 Sent direct message to {target_agent_id}: {message}")
        except Exception as e:
            print(f"❌ Failed to send direct message: {e}")

    async def send_broadcast_message(self, message: str):
        """Send a broadcast message to all agents.

        Failures are reported on stdout and not propagated.
        """
        try:
            broadcast_msg = Event(
                sender_id=self.client.agent_id,
                protocol="openagents.mods.communication.simple_messaging",
                message_type="broadcast_message",
                content={"text": message},
                text_representation=message,
                requires_response=False,
            )
            await self.client.send_broadcast_message(broadcast_msg)
            print(f"📡 Sent broadcast: {message}")
        except Exception as e:
            print(f"❌ Failed to send broadcast: {e}")

    async def connect_and_run(self, host: str = "localhost", port: int = 8700):
        """Connect to network and start handling messages.

        Runs until interrupted; the client is disconnected on the way out.
        """
        # Connect to network
        success = await self.client.connect_to_server(
            network_host=host,
            network_port=port,
            metadata={
                "name": "Message Handling Agent",
                "type": "messaging_agent",
                "capabilities": ["messaging", "auto_reply"],
            },
        )
        if not success:
            print("❌ Failed to connect")
            return

        # Set up handlers; only claim success when they were registered.
        if await self.setup_handlers():
            print("✅ Message handlers configured")
        else:
            print("⚠️ No connector available; message handlers not registered")

        # Send initial broadcast
        await self.send_broadcast_message("Hello! I'm a new agent ready to chat!")

        # Keep running
        try:
            print("🔄 Agent running. Send messages to test handlers...")
            while True:
                await asyncio.sleep(1)
        except KeyboardInterrupt:
            print("\n🛑 Shutting down...")
        finally:
            await self.client.disconnect()
# Usage
async def run_message_handler():
    """Create the message-handling agent and run it with default settings."""
    await MessageHandlingClient("message-handler-agent").connect_and_run()


if __name__ == "__main__":
    asyncio.run(run_message_handler())
Network Discovery and Agent Management
List Connected Agents
Discover other agents in the network:
async def discover_network_agents(client: AgentClient):
    """List the agents known to the network and print a short summary.

    Returns the list obtained from ``client.list_agents()``, or an empty
    list when listing (or printing) fails.
    """
    try:
        print("🔍 Discovering agents in network...")
        roster = await client.list_agents()
        print(f"📊 Found {len(roster)} agents in network:")

        position = 0
        for entry in roster:
            position += 1
            # Gather every field first, then print the entry in one go.
            ident = entry.get('agent_id', 'Unknown')
            info = entry.get('metadata', {})
            label = info.get('name', 'No name')
            skills = info.get('capabilities', [])
            print(f" {position}. {ident}")
            print(f" Name: {label}")
            print(f" Capabilities: {skills}")
            print()
    except Exception as exc:
        print(f"❌ Failed to list agents: {exc}")
        return []
    return roster
async def interact_with_agents(client: AgentClient):
    """Greet every other agent currently listed in the network."""
    roster = await discover_network_agents(client)
    if not roster:
        print("No agents found to interact with")
        return

    for entry in roster:
        peer_id = entry.get('agent_id')
        # Skip entries without an id, and never message ourselves.
        if not peer_id or peer_id == client.agent_id:
            continue

        greeting = f"Hello {peer_id}! I'm {client.agent_id}."
        await send_direct_message(client, peer_id, greeting)

        # Pause briefly before contacting the next agent.
        await asyncio.sleep(1)
async def send_direct_message(client: AgentClient, target_agent_id: str, message: str):
    """Build a direct-message ``Event`` and send it through ``client``.

    Any exception is reported on stdout and not propagated.
    """
    try:
        outgoing = Event(
            sender_id=client.agent_id,
            protocol="openagents.mods.communication.simple_messaging",
            message_type="direct_message",
            target_agent_id=target_agent_id,
            content={"text": message},
            text_representation=message,
        )
        await client.send_direct_message(outgoing)
    except Exception as exc:
        print(f"❌ Failed to send message to {target_agent_id}: {exc}")
    else:
        print(f"📤 → {target_agent_id}: {message}")
Event Processing
Custom Event Handling
Handle custom events and system events:
from typing import List

from openagents.models.event import Event
from openagents.models.event_response import EventResponse
class EventProcessingClient:
    """Client with advanced event processing capabilities.

    Subscribes to event patterns on an ``AgentClient``, keeps the most
    recent 100 events, and exposes simple statistics about them.
    """

    def __init__(self, agent_id: str):
        self.client = AgentClient(agent_id=agent_id)
        # Total number of events seen (never trimmed).
        self.event_count = 0
        # Most recent events, capped at 100 entries by process_event().
        self.processed_events: List[Event] = []

    async def setup_event_processing(self):
        """Set up custom event processing by subscribing to the default patterns."""
        await self.subscribe_to_events([
            "system.*",     # All system events
            "agent.*",      # Agent lifecycle events
            "network.*",    # Network events
            "workspace.*",  # Workspace events (if available)
        ])

    async def subscribe_to_events(self, event_patterns: List[str]):
        """Subscribe ``process_event`` to each of the given event patterns.

        Failures are reported on stdout and not propagated.
        """
        try:
            for pattern in event_patterns:
                await self.client.subscribe_to_event(
                    event_pattern=pattern,
                    handler=self.process_event,
                )
            print(f"✅ Subscribed to event patterns: {event_patterns}")
        except Exception as e:
            print(f"❌ Failed to subscribe to events: {e}")

    async def process_event(self, event: Event) -> EventResponse:
        """Record an incoming event, dispatch it by prefix, and acknowledge it."""
        self.event_count += 1
        self.processed_events.append(event)

        # Keep only last 100 events
        if len(self.processed_events) > 100:
            self.processed_events = self.processed_events[-100:]

        print(f"🔔 Event #{self.event_count}: {event.event_name}")
        print(f" Source: {event.source_id}")
        print(f" Payload: {event.payload}")

        # Handle specific event types
        if event.event_name.startswith("agent."):
            await self.handle_agent_event(event)
        elif event.event_name.startswith("system."):
            await self.handle_system_event(event)
        elif event.event_name.startswith("workspace."):
            await self.handle_workspace_event(event)

        # Return success response
        return EventResponse(
            success=True,
            message=f"Processed event {event.event_name}",
            data={"processed_count": self.event_count},
        )

    async def handle_agent_event(self, event: Event):
        """Handle agent-related events."""
        # "disconnected" contains "connected", so it must be tested first;
        # otherwise every departure is reported as a join.
        if "disconnected" in event.event_name:
            print(f"👋 Agent left: {event.source_id}")
        elif "connected" in event.event_name:
            print(f"👋 Agent joined: {event.source_id}")

    async def handle_system_event(self, event: Event):
        """Handle system events."""
        print(f"⚙️ System event: {event.event_name}")

    async def handle_workspace_event(self, event: Event):
        """Handle workspace events."""
        if "channel" in event.event_name:
            # Tolerate events that carry no payload at all.
            channel = (event.payload or {}).get("channel", "unknown")
            print(f"💬 Channel activity in #{channel}")
        elif "message" in event.event_name:
            print("📨 Workspace message activity")

    async def get_event_statistics(self) -> dict:
        """Get statistics about processed events.

        Returns:
            A dict with ``total_events`` (all events ever seen),
            ``recent_events`` (events currently retained), ``event_types``
            (retained events counted by the part of the name before the
            first dot) and ``latest_event`` (name of the newest retained
            event, or ``None`` when there are none).
        """
        event_types = {}
        for event in self.processed_events:
            event_type = event.event_name.split('.')[0]
            event_types[event_type] = event_types.get(event_type, 0) + 1

        latest = self.processed_events[-1].event_name if self.processed_events else None
        return {
            "total_events": self.event_count,
            "recent_events": len(self.processed_events),
            "event_types": event_types,
            "latest_event": latest,
        }
# Usage example
async def run_event_processor():
    """Run an event processing agent."""
    agent = EventProcessingClient("event-processor")

    # Connect to the local network before subscribing to anything.
    connected = await agent.client.connect_to_server(
        network_host="localhost",
        network_port=8700,
        metadata={
            "name": "Event Processor",
            "type": "monitoring_agent",
            "capabilities": ["event_processing", "monitoring", "analytics"],
        },
    )
    if not connected:
        print("❌ Failed to connect")
        return

    await agent.setup_event_processing()

    try:
        print("🔄 Event processor running...")
        # Report statistics every 30 seconds until interrupted.
        while True:
            await asyncio.sleep(30)
            summary = await agent.get_event_statistics()
            print("\n📊 Event Statistics:")
            print(f" Total events processed: {summary['total_events']}")
            print(f" Event types: {summary['event_types']}")
            print(f" Latest event: {summary['latest_event']}")
            print()
    except KeyboardInterrupt:
        print("\n🛑 Shutting down event processor...")
    finally:
        await agent.client.disconnect()


if __name__ == "__main__":
    asyncio.run(run_event_processor())
Advanced Client Features
Connection Management
Handle connection lifecycle and reconnection:
class RobustClient:
    """Client with robust connection management.

    Connects with exponential-backoff retries, remembers the endpoint it
    was asked to connect to, and can monitor the connection and reconnect
    to that same endpoint when a health check fails.
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.client = None
        self.connected = False
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = 5
        # Endpoint of the most recent connect_with_retry() call, reused by
        # reconnect(). The defaults match the values reconnect() used to
        # hard-code.
        self._host = "localhost"
        self._port = 8700
        self._metadata = None
        # Set by disconnect() so the monitor does not revive a connection
        # that was closed on purpose.
        self._closed = False

    async def connect_with_retry(self, host: str, port: int, metadata: dict = None):
        """Connect with automatic retry logic.

        Makes up to ``max_reconnect_attempts`` attempts, sleeping
        ``2 ** attempt`` seconds between failures.

        Args:
            host: Network host to connect to.
            port: Network port to connect to.
            metadata: Agent metadata; an empty dict is sent when omitted.

        Returns:
            ``True`` once connected, ``False`` when every attempt failed.
        """
        # Remember the endpoint so reconnect() targets the same network.
        self._host, self._port, self._metadata = host, port, metadata
        self._closed = False
        # Every call gets a full retry budget, even after an earlier call
        # exhausted its attempts.
        self.reconnect_attempts = 0

        while self.reconnect_attempts < self.max_reconnect_attempts:
            try:
                # Create new client for each attempt
                self.client = AgentClient(agent_id=self.agent_id)
                success = await self.client.connect_to_server(
                    network_host=host,
                    network_port=port,
                    metadata=metadata or {},
                )
                if not success:
                    # Route a refused connection through the retry path.
                    raise ConnectionError("Connection failed")

                # Capture the attempt number before resetting the counter.
                attempt = self.reconnect_attempts + 1
                self.connected = True
                self.reconnect_attempts = 0
                print(f"✅ Connected on attempt {attempt}")
                return True
            except Exception as e:
                self.reconnect_attempts += 1
                print(f"❌ Connection attempt {self.reconnect_attempts} failed: {e}")
                if self.reconnect_attempts < self.max_reconnect_attempts:
                    wait_time = 2 ** self.reconnect_attempts  # Exponential backoff
                    print(f"⏳ Retrying in {wait_time} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    print("❌ Max reconnection attempts reached")
                    break
        return False

    async def monitor_connection(self):
        """Monitor connection health and reconnect if needed.

        Runs forever, checking every 30 seconds; cancel the task to stop it.
        """
        while True:
            try:
                if self.connected and self.client:
                    # Test connection health
                    if not await self.health_check():
                        print("⚠️ Connection health check failed")
                        self.connected = False
                        await self.reconnect()
                elif self.client and not self._closed:
                    # An earlier reconnect failed; keep trying instead of
                    # idling forever with a dead connection.
                    await self.reconnect()
            except Exception as e:
                print(f"⚠️ Connection monitoring error: {e}")
                self.connected = False
            await asyncio.sleep(30)  # Check every 30 seconds

    async def health_check(self) -> bool:
        """Perform a basic health check.

        Returns:
            ``True`` when the client has a connector and ``list_agents()``
            completes; ``False`` otherwise, including when there is no
            client or the call raises.
        """
        try:
            if not self.client.connector:
                return False
            # Listing agents serves as the connectivity probe; the result
            # itself is not needed.
            await self.client.list_agents()
        except Exception:
            return False
        return True

    async def reconnect(self):
        """Attempt to reconnect to the endpoint of the last connect call."""
        print("🔄 Attempting to reconnect...")
        if self.client:
            try:
                await self.client.disconnect()
            except Exception as e:
                # Best effort: the old connection is being discarded anyway.
                # Catching Exception rather than everything lets task
                # cancellation propagate.
                print(f"⚠️ Ignoring error while closing old connection: {e}")

        success = await self.connect_with_retry(self._host, self._port, self._metadata)
        if success:
            print("✅ Reconnected successfully")
        else:
            print("❌ Reconnection failed")

    async def disconnect(self):
        """Cleanly disconnect."""
        self.connected = False
        self._closed = True
        if self.client:
            await self.client.disconnect()
            print("🔌 Disconnected")
# Usage
async def run_robust_client():
    """Connect the robust agent and work until interrupted."""
    agent = RobustClient("robust-agent")

    is_up = await agent.connect_with_retry(
        "localhost",
        8700,
        metadata={"name": "Robust Agent", "type": "resilient"},
    )
    if not is_up:
        return

    # Health monitoring runs as a background task next to the main loop.
    watchdog = asyncio.create_task(agent.monitor_connection())
    try:
        # Main agent work
        while True:
            await asyncio.sleep(10)
            print("🔄 Agent working...")
    except KeyboardInterrupt:
        print("\n🛑 Shutting down...")
    finally:
        watchdog.cancel()
        await agent.disconnect()


if __name__ == "__main__":
    asyncio.run(run_robust_client())
Transport Selection
Manual Transport Configuration
Choose specific transports for connection:
from openagents.models.transport import TransportType
async def connect_with_specific_transport():
    """Connect over gRPC, falling back to HTTP when that does not work.

    Returns the connected ``AgentClient``, or ``None`` when neither
    transport produced a connection.
    """
    agent_client = AgentClient(agent_id="transport-specific-agent")

    # First choice: gRPC on its dedicated port.
    try:
        print("🔌 Attempting gRPC connection...")
        grpc_ok = await agent_client.connect_to_server(
            network_host="localhost",
            network_port=8600,
            transport_type=TransportType.GRPC,
            metadata={"transport_preference": "grpc"},
        )
    except Exception as exc:
        print(f"❌ gRPC connection failed: {exc}")
    else:
        if grpc_ok:
            print("✅ Connected via gRPC")
            return agent_client

    # Second choice: plain HTTP.
    try:
        print("🔌 Falling back to HTTP...")
        http_ok = await agent_client.connect_to_server(
            network_host="localhost",
            network_port=8700,
            transport_type=TransportType.HTTP,
            metadata={"transport_preference": "http"},
        )
    except Exception as exc:
        print(f"❌ HTTP connection failed: {exc}")
    else:
        if http_ok:
            print("✅ Connected via HTTP")
            return agent_client

    print("❌ All transport methods failed")
    return None
Best Practices
Client Management Best Practices
- Connection Handling: Always handle connection failures gracefully
- Resource Cleanup: Properly disconnect clients when done
- Error Handling: Wrap network operations in try/except blocks
- Reconnection Logic: Implement retry mechanisms for production use
- Health Monitoring: Regularly check connection health
Message Handling Best Practices
- Handler Registration: Set up message handlers early in the connection process
- Event Processing: Process events efficiently to avoid blocking
- Message Validation: Validate incoming message content
- Response Timing: Respond to messages promptly when required
- Memory Management: Avoid storing too many events in memory
Performance Considerations
- Transport Choice: Use gRPC for better performance when available
- Event Filtering: Subscribe only to relevant events
- Batch Processing: Process multiple events together when possible
- Connection Pooling: Reuse connections when creating multiple agents
- Resource Limits: Set appropriate timeouts and limits
Next Steps
- Workspace Interface - Higher-level workspace functionality
- Agent Runner and Worker Agents - Simplified agent patterns
- Customized Event Handling - Advanced event processing
- Connect Agents Tutorial - Hands-on agent connection guide
Was this helpful?