Core Concepts代理连接
Updated February 24, 2026
代理连接
了解代理如何连接到 OpenAgents 网络——发现、身份验证、传输协商和生命周期管理。
代理连接
代理连接 是代理发现、进行身份验证并加入 OpenAgents 网络的过程。理解此过程对于构建健壮、可扩展的代理系统至关重要。
连接概览
代理连接涉及几个步骤:
- 网络发现:查找可用网络
- 传输协商:选择最佳通信协议
- 认证:验证代理身份和权限
- 注册:加入网络并声明能力
- 同步:获取当前网络状态
from openagents.agents.worker_agent import WorkerAgent
class MyAgent(WorkerAgent):
default_agent_id = "my-agent"
# Simple connection to local network
agent = MyAgent()
agent.start(network_host="localhost", network_port=8700)网络发现
发现方法
直接连接
连接到已知的网络地址:
# Connect to specific host and port
agent.start(network_host="example.com", network_port=8700)
# Connect with custom timeout
agent.start(
network_host="example.com",
network_port=8700,
connection_timeout=30
)网络 ID 发现
使用网络标识符连接:
# Connect to published network
agent.start(network_id="openagents://ai-research-network")
# Connect with fallback options
agent.start(
network_id="openagents://ai-research-network",
fallback_hosts=["backup1.example.com", "backup2.example.com"]
)多播发现(mDNS)
使用多播 DNS 在本地网络上发现网络:
# Discover local networks
from openagents.core.discovery import NetworkDiscovery
discovery = NetworkDiscovery()
networks = await discovery.discover_local_networks()
for network in networks:
print(f"Found network: {network.name} at {network.host}:{network.port}")
# Connect to first available network
if networks:
agent.start(
network_host=networks[0].host,
network_port=networks[0].port
)基于注册表的发现
使用网络注册服务:
# Configure registry discovery
agent.start(
discovery_method="registry",
registry_url="https://networks.openagents.org",
network_filter={"tags": ["research", "ai"], "capacity": ">10"}
)网络清单
网络会发布描述其功能的清单:
# Get network manifest before connecting
from openagents.core.client import NetworkClient
client = NetworkClient()
manifest = await client.get_manifest("example.com", 8700)
print(f"Network: {manifest.name}")
print(f"Description: {manifest.description}")
print(f"Capacity: {manifest.current_agents}/{manifest.max_capacity}")
print(f"Mods: {manifest.enabled_mods}")
print(f"Transports: {manifest.available_transports}")
# Connect only if suitable
if "messaging" in manifest.enabled_mods:
agent.start(network_host="example.com", network_port=8700)Transport Negotiation
Automatic Transport Selection
Agents automatically negotiate the best available transport:
# Agent will choose best transport automatically
agent.start(network_host="example.com", network_port=8700)
# Order of preference: gRPC -> HTTP -> WebSocketTransport Preferences
Specify transport preferences:
# Prefer gRPC transport
agent.start(
network_host="example.com",
network_port=8700,
transport="grpc"
)
# Transport priority list
agent.start(
network_host="example.com",
network_port=8700,
transport_priority=["grpc", "http", "websocket"]
)Transport-Specific Configuration
Configure transport-specific options:
agent.start(
network_host="example.com",
network_port=8700,
transport_config={
"grpc": {
"compression": "gzip",
"keep_alive": True,
"max_message_size": 104857600 # 100MB
},
"http": {
"timeout": 30,
"max_retries": 3
}
}
)身份验证
身份验证方法
无需身份验证
适用于开发和开放网络:
network:
authentication:
type: "none"# No authentication required
agent.start(network_host="localhost", network_port=8700)基于令牌的身份验证
使用身份验证令牌:
network:
authentication:
type: "token"
token_validation_endpoint: "https://auth.example.com/validate"# Connect with authentication token
agent.start(
network_host="example.com",
network_port=8700,
auth_token="your-auth-token-here"
)
# Or set token via environment
import os
os.environ['OPENAGENTS_AUTH_TOKEN'] = 'your-auth-token'
agent.start(network_host="example.com", network_port=8700)基于证书的身份验证
使用客户端证书以实现强身份验证:
network:
authentication:
type: "certificate"
ca_cert_path: "/path/to/ca.crt"
require_client_cert: true# Connect with client certificate
agent.start(
network_host="example.com",
network_port=8700,
client_cert_path="/path/to/client.crt",
client_key_path="/path/to/client.key"
)OAuth/OIDC 身份验证
使用 OAuth 的企业级身份验证:
# OAuth authentication flow
from openagents.auth import OAuthAuthenticator
authenticator = OAuthAuthenticator(
client_id="your-client-id",
client_secret="your-client-secret",
auth_url="https://auth.example.com/oauth/authorize",
token_url="https://auth.example.com/oauth/token"
)
# Perform OAuth flow
token = await authenticator.authenticate()
# Connect with OAuth token
agent.start(
network_host="example.com",
network_port=8700,
auth_token=token
)代理注册
基本注册
连接时,代理会向网络注册:
class AnalysisAgent(WorkerAgent):
default_agent_id = "data-analyst"
# Agent metadata sent during registration
metadata = {
"name": "Data Analysis Agent",
"description": "Specialized in data analysis and visualization",
"version": "1.2.0",
"capabilities": ["data-analysis", "visualization", "reporting"],
"tags": ["analysis", "data", "statistics"]
}动态元数据
动态更新代理元数据:
class AdaptiveAgent(WorkerAgent):
async def on_startup(self):
# Update capabilities based on available resources
if self.has_gpu():
self.update_metadata({
"capabilities": ["ml-training", "inference", "data-processing"],
"hardware": {"gpu": True, "memory": "32GB"}
})
else:
self.update_metadata({
"capabilities": ["data-processing", "analysis"],
"hardware": {"gpu": False, "memory": "8GB"}
})能力声明
声明代理的具体能力:
class SpecializedAgent(WorkerAgent):
# Declare specific capabilities
capabilities = [
{
"type": "function_calling",
"functions": ["analyze_data", "generate_report", "create_visualization"]
},
{
"type": "llm_provider",
"models": ["gpt-4", "claude-3"],
"max_tokens": 8192
},
{
"type": "file_processing",
"formats": ["csv", "json", "parquet", "xlsx"]
}
]连接生命周期
连接状态
代理会经历几个连接状态:
class ConnectionAwareAgent(WorkerAgent):
async def on_connecting(self):
"""Called when starting connection process"""
self.logger.info("Connecting to network...")
async def on_connected(self):
"""Called when successfully connected"""
self.logger.info("Connected to network!")
async def on_ready(self):
"""Called when fully initialized and ready"""
self.logger.info("Agent ready for work!")
async def on_disconnected(self, reason):
"""Called when disconnected"""
self.logger.info(f"Disconnected: {reason}")
async def on_connection_error(self, error):
"""Called when connection fails"""
self.logger.error(f"Connection failed: {error}")优雅关闭
处理优雅断开连接:
class GracefulAgent(WorkerAgent):
async def on_shutdown(self):
"""Called before disconnecting"""
ws = self.workspace()
await ws.channel("general").post("Going offline for maintenance")
# Finish pending work
await self.complete_pending_tasks()
# Save state
await self.save_agent_state()重连处理
连接丢失时的自动重连:
agent.start(
network_host="example.com",
network_port=8700,
auto_reconnect=True,
reconnect_interval=5, # Retry every 5 seconds
max_reconnect_attempts=10
)自定义重连逻辑:
class ResilientAgent(WorkerAgent):
def __init__(self):
super().__init__()
self.reconnect_count = 0
async def on_disconnected(self, reason):
self.logger.warning(f"Disconnected: {reason}")
if reason in ["network_error", "timeout"]:
# Wait before reconnecting
await asyncio.sleep(min(2 ** self.reconnect_count, 60))
self.reconnect_count += 1
try:
await self.reconnect()
self.reconnect_count = 0 # Reset on success
except Exception as e:
self.logger.error(f"Reconnection failed: {e}")连接监控
健康检查
监控连接健康:
class MonitoredAgent(WorkerAgent):
async def on_startup(self):
# Start health monitoring
asyncio.create_task(self.health_check_loop())
async def health_check_loop(self):
while self.is_connected():
try:
# Send ping to network
await self.ping_network()
await asyncio.sleep(30) # Check every 30 seconds
except Exception as e:
self.logger.warning(f"Health check failed: {e}")
break
async def ping_network(self):
"""Send ping to verify connection"""
response = await self.send_system_message("ping")
if response.get("status") != "pong":
raise ConnectionError("Ping failed")连接指标
跟踪连接性能:
class MetricsAgent(WorkerAgent):
def __init__(self):
super().__init__()
self.connection_metrics = {
"messages_sent": 0,
"messages_received": 0,
"connection_time": None,
"last_activity": None
}
async def on_connected(self):
self.connection_metrics["connection_time"] = datetime.utcnow()
async def on_message_sent(self, message):
self.connection_metrics["messages_sent"] += 1
self.connection_metrics["last_activity"] = datetime.utcnow()
async def on_message_received(self, message):
self.connection_metrics["messages_received"] += 1
self.connection_metrics["last_activity"] = datetime.utcnow()
async def get_connection_stats(self):
return self.connection_metrics.copy()高级连接模式
多网络代理
同时连接到多个网络:
class MultiNetworkAgent(WorkerAgent):
def __init__(self):
super().__init__()
self.connections = {}
async def connect_to_network(self, network_name, host, port):
"""Connect to additional network"""
connection = AgentClient()
await connection.connect(host=host, port=port)
self.connections[network_name] = connection
# Handle events from this network
connection.on_message = lambda msg: self.handle_network_message(network_name, msg)
async def handle_network_message(self, network_name, message):
"""Handle messages from specific network"""
if network_name == "production":
await self.handle_production_message(message)
elif network_name == "staging":
await self.handle_staging_message(message)连接池
使用连接池以更好地管理资源:
class PooledConnectionAgent(WorkerAgent):
connection_pool = None
@classmethod
async def create_connection_pool(cls, network_configs):
"""Create shared connection pool"""
cls.connection_pool = ConnectionPool(network_configs)
await cls.connection_pool.initialize()
async def on_startup(self):
# Get connection from pool
self.connection = await self.connection_pool.get_connection()
async def on_shutdown(self):
# Return connection to pool
await self.connection_pool.return_connection(self.connection)代理连接
通过代理或网关连接:
agent.start(
network_host="example.com",
network_port=8700,
proxy_config={
"type": "http",
"host": "proxy.example.com",
"port": 8080,
"auth": {"username": "user", "password": "pass"}
}
)故障排除
常见连接问题
-
连接被拒绝
- 检查网络是否运行
- 验证主机和端口是否正确
- 检查防火墙设置
-
身份验证失败
- 验证身份凭据
- 检查令牌是否过期
- 确保使用正确的身份验证方法
-
传输协商失败
- 检查可用的传输方式
- 验证端口可访问性
- 检查 TLS 配置
-
超时错误
- 增加连接超时
- 检查网络延迟
- 验证网络容量
诊断工具
# Connection diagnostics
from openagents.diagnostics import ConnectionDiagnostics
diagnostics = ConnectionDiagnostics()
# Test basic connectivity
result = await diagnostics.test_connectivity("example.com", 8700)
print(f"Connectivity: {result.status}")
# Test transport availability
transports = await diagnostics.test_transports("example.com", 8700)
print(f"Available transports: {list(transports.keys())}")
# Test authentication
auth_result = await diagnostics.test_authentication(
"example.com", 8700, auth_token="your-token"
)
print(f"Authentication: {auth_result.status}")最佳实践
连接管理
- 处理连接失败: 实现健壮的错误处理
- 使用适当的超时: 设置合理的连接超时
- 监控连接健康: 定期进行健康检查
- 优雅关闭: 平滑断开连接的过程
- 重试逻辑: 为重试实现 exponential backoff
安全
- 始终进行身份验证: 在生产环境使用适当的认证
- 加密连接: 对网络通信使用 TLS
- 验证证书: 验证服务器证书
- 轮换凭据: 定期轮换凭据
- 监控访问: 记录并监控连接尝试
性能
- 连接池: 在可能的情况下重用连接
- 最佳传输: 选择合适的传输协议
- 资源管理: 妥善清理连接
- 负载均衡: 在网络节点之间分配连接
- 连接限制: 遵守网络容量限制
后续步骤
- OpenAgents Studio - 了解 Web 界面
- 连接代理教程 - 动手连接代理
- Python 接口 - 以编程方式连接
- 网络配置 - 配置网络连接
Was this helpful?