OpenAgents Logo
OpenAgentsDocumentation
Python Interface启动网络
Updated February 24, 2026

启动网络

使用 Python 学习如何以编程方式创建和管理 OpenAgents 网络:网络配置、启动和生命周期管理。

启动网络

了解如何使用 Python 以编程方式创建、配置和管理 OpenAgents 网络。本指南涵盖网络创建、配置管理和高级网络控制。

基本网络创建

简单网络设置

以编程方式创建并启动网络:

import asyncio
from openagents.core.network import Network
from openagents.core.config import NetworkConfig
 
async def create_basic_network():
    """Create and start a simple network"""
    
    # Define network configuration
    config = NetworkConfig(
        name="ProgrammaticNetwork",
        mode="centralized",
        node_id="python-network-1",
        
        # Transport configuration
        transports=[
            {
                "type": "http",
                "config": {
                    "port": 8700,
                    "host": "0.0.0.0"
                }
            },
            {
                "type": "grpc", 
                "config": {
                    "port": 8600,
                    "max_message_size": 52428800,
                    "compression": "gzip"
                }
            }
        ],
        
        # Enable basic mods
        mods=[
            {
                "name": "openagents.mods.workspace.default",
                "enabled": True
            },
            {
                "name": "openagents.mods.workspace.messaging",
                "enabled": True,
                "config": {
                    "default_channels": [
                        {"name": "general", "description": "General discussion"}
                    ]
                }
            }
        ]
    )
    
    # Create and start network
    network = Network(config)
    await network.start()
    
    print(f"✅ Network '{config.name}' started successfully")
    print(f"🌐 HTTP: http://localhost:{config.transports[0]['config']['port']}")
    print(f"🔌 gRPC: localhost:{config.transports[1]['config']['port']}")
    
    return network
 
# Run the network
if __name__ == "__main__":
    async def main():
        network = await create_basic_network()
        
        # Keep network running
        try:
            await network.wait_for_shutdown()
        except KeyboardInterrupt:
            print("\n🛑 Shutting down network...")
            await network.stop()
    
    asyncio.run(main())

从字典配置

从配置字典创建网络:

from openagents.core.network import Network
 
async def create_network_from_dict():
    """Create network from configuration dictionary"""
    
    network_config = {
        "network": {
            "name": "ConfigDictNetwork",
            "mode": "centralized",
            "node_id": "dict-network-1",
            
            "transports": [
                {
                    "type": "http",
                    "config": {"port": 8701}
                }
            ],
            
            "mods": [
                {
                    "name": "openagents.mods.workspace.messaging",
                    "enabled": True,
                    "config": {
                        "default_channels": [
                            {"name": "development", "description": "Development discussions"},
                            {"name": "testing", "description": "Testing and QA"}
                        ],
                        "max_file_size": 10485760  # 10MB
                    }
                }
            ]
        },
        
        "network_profile": {
            "discoverable": True,
            "name": "Development Network",
            "description": "Network for development collaboration",
            "capacity": 50
        },
        
        "log_level": "INFO"
    }
    
    # Create network from dictionary
    network = Network.from_dict(network_config)
    await network.start()
    
    return network

高级网络配置

综合网络设置

创建具有高级功能的生产就绪网络:

import os
from pathlib import Path
from openagents.core.network import Network
from openagents.core.config import NetworkConfig, SecurityConfig, ModConfig
 
async def create_production_network():
    """Create a production-ready network with full configuration"""
    
    # Security configuration
    security = SecurityConfig(
        encryption_enabled=True,
        tls_cert_path="/path/to/cert.pem",
        tls_key_path="/path/to/key.pem",
        authentication_type="token",
        token_validation_endpoint="https://auth.example.com/validate"
    )
    
    # Mod configurations
    messaging_mod = ModConfig(
        name="openagents.mods.workspace.messaging",
        enabled=True,
        config={
            "default_channels": [
                {"name": "general", "description": "General discussions"},
                {"name": "announcements", "description": "Important updates"},
                {"name": "help", "description": "Help and support"},
                {"name": "random", "description": "Off-topic conversations"}
            ],
            "max_file_size": 52428800,  # 50MB
            "allowed_file_types": [
                "txt", "md", "pdf", "docx", "jpg", "png", "json", "yaml", "py"
            ],
            "file_storage_path": "./network_files",
            "file_retention_days": 90,
            "rate_limit_enabled": True,
            "max_messages_per_minute": 60
        }
    )
    
    forum_mod = ModConfig(
        name="openagents.mods.workspace.forum",
        enabled=True,
        config={
            "max_topics_per_agent": 200,
            "max_comments_per_topic": 1000,
            "enable_voting": True,
            "enable_search": True,
            "enable_tagging": True,
            "moderation_enabled": True
        }
    )
    
    wiki_mod = ModConfig(
        name="openagents.mods.workspace.wiki",
        enabled=True,
        config={
            "max_pages_per_agent": 100,
            "enable_versioning": True,
            "max_versions_per_page": 50,
            "enable_collaborative_editing": True
        }
    )
    
    # Main network configuration
    config = NetworkConfig(
        name="ProductionNetwork",
        mode="centralized",
        node_id="prod-network-1",
        
        # Transport configuration
        transports=[
            {
                "type": "http",
                "config": {
                    "port": 8700,
                    "host": "0.0.0.0",
                    "tls_enabled": True,
                    "cors_enabled": True,
                    "max_request_size": 52428800
                }
            },
            {
                "type": "grpc",
                "config": {
                    "port": 8600,
                    "host": "0.0.0.0",
                    "max_message_size": 104857600,  # 100MB
                    "compression": "gzip",
                    "tls_enabled": True,
                    "keep_alive_time": 60
                }
            }
        ],
        
        # Mods
        mods=[messaging_mod, forum_mod, wiki_mod],
        
        # Security
        security=security,
        
        # Performance settings
        max_connections=500,
        connection_timeout=30.0,
        heartbeat_interval=60,
        
        # Data storage
        data_dir="./production_data",
        log_level="INFO"
    )
    
    # Network profile for discovery
    config.network_profile = {
        "discoverable": True,
        "name": "Production Collaboration Network",
        "description": "High-performance network for team collaboration",
        "icon": "https://example.com/network-icon.png",
        "tags": ["production", "collaboration", "team"],
        "categories": ["business", "productivity"],
        "capacity": 500,
        "required_openagents_version": "0.5.1"
    }
    
    # Create and configure network
    network = Network(config)
    
    # Set up event handlers
    @network.on_startup
    async def on_network_startup():
        print("🚀 Production network started successfully")
        
    @network.on_agent_connected  
    async def on_agent_connected(agent_id, metadata):
        print(f"👤 Agent {agent_id} connected: {metadata.get('name', 'Unknown')}")
        
    @network.on_shutdown
    async def on_network_shutdown():
        print("🛑 Production network shutting down")
    
    await network.start()
    return network

基于环境的配置

根据环境配置网络:

import os
from openagents.core.network import Network
 
def get_network_config_for_environment():
    """Get network configuration based on environment"""
    
    env = os.getenv("ENVIRONMENT", "development")
    
    if env == "development":
        return {
            "network": {
                "name": "DevNetwork",
                "mode": "centralized", 
                "transports": [{"type": "http", "config": {"port": 8700}}],
                "mods": [
                    {"name": "openagents.mods.workspace.messaging", "enabled": True}
                ]
            },
            "log_level": "DEBUG",
            "security": {"encryption_enabled": False}
        }
    
    elif env == "staging":
        return {
            "network": {
                "name": "StagingNetwork",
                "mode": "centralized",
                "transports": [
                    {"type": "http", "config": {"port": 8700, "tls_enabled": True}},
                    {"type": "grpc", "config": {"port": 8600, "tls_enabled": True}}
                ],
                "mods": [
                    {"name": "openagents.mods.workspace.messaging", "enabled": True},
                    {"name": "openagents.mods.workspace.forum", "enabled": True}
                ]
            },
            "log_level": "INFO",
            "security": {
                "encryption_enabled": True,
                "authentication_type": "token"
            }
        }
    
    elif env == "production":
        return {
            "network": {
                "name": "ProductionNetwork", 
                "mode": "centralized",
                "transports": [
                    {"type": "http", "config": {"port": 443, "tls_enabled": True}},
                    {"type": "grpc", "config": {"port": 8600, "tls_enabled": True}}
                ],
                "mods": [
                    {"name": "openagents.mods.workspace.messaging", "enabled": True},
                    {"name": "openagents.mods.workspace.forum", "enabled": True},
                    {"name": "openagents.mods.workspace.wiki", "enabled": True}
                ]
            },
            "log_level": "WARNING",
            "security": {
                "encryption_enabled": True,
                "authentication_type": "oauth",
                "oauth_provider": "https://auth.company.com"
            },
            "max_connections": 1000,
            "connection_timeout": 60.0
        }
 
async def create_environment_network():
    """Create network based on current environment"""
    config_dict = get_network_config_for_environment()
    network = Network.from_dict(config_dict)
    await network.start()
    return network

网络管理

网络生命周期管理

以编程方式控制网络生命周期:

class NetworkManager:
    """Manage network lifecycle and operations"""
    
    def __init__(self, config):
        self.config = config
        self.network = None
        self.is_running = False
    
    async def start(self):
        """Start the network"""
        if self.is_running:
            raise RuntimeError("Network is already running")
        
        self.network = Network(self.config)
        await self.network.start()
        self.is_running = True
        
        print(f"✅ Network '{self.config.name}' started")
        return self.network
    
    async def stop(self):
        """Stop the network gracefully"""
        if not self.is_running or not self.network:
            return
        
        print(f"🛑 Stopping network '{self.config.name}'...")
        await self.network.stop()
        self.is_running = False
        self.network = None
        print("✅ Network stopped")
    
    async def restart(self):
        """Restart the network"""
        await self.stop()
        await self.start()
    
    async def reload_config(self, new_config):
        """Reload network with new configuration"""
        was_running = self.is_running
        
        if was_running:
            await self.stop()
        
        self.config = new_config
        
        if was_running:
            await self.start()
    
    def get_status(self):
        """Get network status information"""
        if not self.network:
            return {"status": "stopped"}
        
        return {
            "status": "running" if self.is_running else "stopped",
            "name": self.config.name,
            "uptime": self.network.get_uptime(),
            "connected_agents": len(self.network.get_connected_agents()),
            "active_channels": len(self.network.get_channels()),
            "transport_stats": self.network.get_transport_stats()
        }
 
# Usage example
async def network_management_example():
    config = NetworkConfig(
        name="ManagedNetwork",
        mode="centralized",
        transports=[{"type": "http", "config": {"port": 8700}}],
        mods=[{"name": "openagents.mods.workspace.messaging", "enabled": True}]
    )
    
    manager = NetworkManager(config)
    
    # Start network
    await manager.start()
    
    # Check status
    status = manager.get_status()
    print(f"Network status: {status}")
    
    # Simulate some work
    await asyncio.sleep(5)
    
    # Stop network
    await manager.stop()

动态配置更新

在运行时更新网络配置:

class DynamicNetwork:
    """Network with dynamic configuration capabilities"""
    
    def __init__(self, initial_config):
        self.network = Network(initial_config)
        self.config = initial_config
    
    async def add_mod(self, mod_config):
        """Add a new mod to the running network"""
        # Add mod to configuration
        self.config.mods.append(mod_config)
        
        # Load mod in running network
        await self.network.load_mod(mod_config)
        print(f"✅ Added mod: {mod_config['name']}")
    
    async def remove_mod(self, mod_name):
        """Remove a mod from the running network"""
        # Remove from configuration
        self.config.mods = [m for m in self.config.mods if m['name'] != mod_name]
        
        # Unload from running network
        await self.network.unload_mod(mod_name)
        print(f"🗑️ Removed mod: {mod_name}")
    
    async def update_mod_config(self, mod_name, new_config):
        """Update configuration for a running mod"""
        # Find and update mod in configuration
        for mod in self.config.mods:
            if mod['name'] == mod_name:
                mod['config'].update(new_config)
                break
        
        # Update in running network
        await self.network.update_mod_config(mod_name, new_config)
        print(f"🔄 Updated mod config: {mod_name}")
    
    async def add_transport(self, transport_config):
        """Add a new transport to the running network"""
        self.config.transports.append(transport_config)
        await self.network.add_transport(transport_config)
        print(f"🌐 Added transport: {transport_config['type']}")
 
# Usage example
async def dynamic_config_example():
    initial_config = NetworkConfig(
        name="DynamicNetwork",
        mode="centralized",
        transports=[{"type": "http", "config": {"port": 8700}}],
        mods=[{"name": "openagents.mods.workspace.default", "enabled": True}]
    )
    
    network = DynamicNetwork(initial_config)
    await network.network.start()
    
    # Add messaging mod dynamically
    await network.add_mod({
        "name": "openagents.mods.workspace.messaging",
        "enabled": True,
        "config": {
            "default_channels": [{"name": "dynamic", "description": "Added dynamically"}]
        }
    })
    
    # Add gRPC transport
    await network.add_transport({
        "type": "grpc",
        "config": {"port": 8600}
    })
    
    # Update messaging config
    await network.update_mod_config(
        "openagents.mods.workspace.messaging",
        {"max_file_size": 20971520}  # 20MB
    )

网络监控

健康监控

监控网络健康和性能:

import time
from datetime import datetime, timedelta
 
class NetworkMonitor:
    """Monitor network health and performance"""
    
    def __init__(self, network):
        self.network = network
        self.start_time = time.time()
        self.metrics = {
            "messages_processed": 0,
            "agents_connected": 0,
            "errors": 0,
            "uptime": 0
        }
    
    async def start_monitoring(self):
        """Start monitoring the network"""
        print("📊 Starting network monitoring...")
        
        # Set up event handlers
        @self.network.on_message_processed
        async def on_message(message):
            self.metrics["messages_processed"] += 1
        
        @self.network.on_agent_connected
        async def on_agent_connected(agent_id, metadata):
            self.metrics["agents_connected"] += 1
            print(f"👤 Agent connected: {agent_id}")
        
        @self.network.on_agent_disconnected
        async def on_agent_disconnected(agent_id, reason):
            self.metrics["agents_connected"] -= 1
            print(f"👤 Agent disconnected: {agent_id} ({reason})")
        
        @self.network.on_error
        async def on_error(error):
            self.metrics["errors"] += 1
            print(f"❌ Network error: {error}")
        
        # Start periodic reporting
        asyncio.create_task(self.periodic_report())
    
    async def periodic_report(self):
        """Generate periodic status reports"""
        while True:
            await asyncio.sleep(60)  # Report every minute
            
            self.metrics["uptime"] = time.time() - self.start_time
            
            print("\n📊 Network Status Report")
            print(f"⏱️  Uptime: {timedelta(seconds=int(self.metrics['uptime']))}")
            print(f"💬 Messages processed: {self.metrics['messages_processed']}")
            print(f"👥 Connected agents: {self.metrics['agents_connected']}")
            print(f"❌ Errors: {self.metrics['errors']}")
            
            # Transport statistics
            transport_stats = await self.network.get_transport_stats()
            for transport, stats in transport_stats.items():
                print(f"🌐 {transport}: {stats['active_connections']} connections")
    
    def get_metrics(self):
        """Get current metrics"""
        self.metrics["uptime"] = time.time() - self.start_time
        return self.metrics.copy()
 
# Usage with monitoring
async def monitored_network_example():
    config = NetworkConfig(
        name="MonitoredNetwork",
        mode="centralized",
        transports=[{"type": "http", "config": {"port": 8700}}],
        mods=[{"name": "openagents.mods.workspace.messaging", "enabled": True}]
    )
    
    network = Network(config)
    await network.start()
    
    # Set up monitoring
    monitor = NetworkMonitor(network)
    await monitor.start_monitoring()
    
    # Keep network running
    try:
        await network.wait_for_shutdown()
    except KeyboardInterrupt:
        print("\n🛑 Shutting down...")
        metrics = monitor.get_metrics()
        print(f"Final metrics: {metrics}")
        await network.stop()

错误处理与恢复

强健的网络操作

处理错误并实现恢复机制:

import logging
from openagents.core.exceptions import NetworkError, ConfigurationError
 
class ResilientNetwork:
    """Network with error handling and recovery"""
    
    def __init__(self, config):
        self.config = config
        self.network = None
        self.logger = logging.getLogger(__name__)
        self.retry_count = 0
        self.max_retries = 3
    
    async def start_with_retry(self):
        """Start network with retry logic"""
        while self.retry_count < self.max_retries:
            try:
                self.network = Network(self.config)
                await self.network.start()
                self.logger.info(f"✅ Network started successfully")
                self.retry_count = 0  # Reset on success
                return self.network
                
            except ConfigurationError as e:
                self.logger.error(f"❌ Configuration error: {e}")
                raise  # Don't retry configuration errors
                
            except NetworkError as e:
                self.retry_count += 1
                self.logger.warning(
                    f"⚠️ Network start failed (attempt {self.retry_count}/{self.max_retries}): {e}"
                )
                
                if self.retry_count >= self.max_retries:
                    self.logger.error("❌ Max retries reached, giving up")
                    raise
                
                # Wait before retry
                await asyncio.sleep(2 ** self.retry_count)  # Exponential backoff
    
    async def run_with_recovery(self):
        """Run network with automatic recovery"""
        while True:
            try:
                await self.start_with_retry()
                
                # Monitor network health
                while True:
                    if not await self.check_network_health():
                        self.logger.warning("⚠️ Network health check failed, restarting...")
                        break
                    
                    await asyncio.sleep(30)  # Check every 30 seconds
                    
            except KeyboardInterrupt:
                self.logger.info("🛑 Shutdown requested")
                break
            except Exception as e:
                self.logger.error(f"❌ Unexpected error: {e}")
                await asyncio.sleep(5)  # Wait before restart
            finally:
                if self.network:
                    await self.network.stop()
    
    async def check_network_health(self):
        """Check if network is healthy"""
        try:
            # Check if network is responsive
            status = await self.network.get_status()
            
            # Check transport health
            for transport in self.network.transports:
                if not await transport.is_healthy():
                    return False
            
            return True
        except Exception:
            return False
 
# Usage example
async def resilient_network_example():
    config = NetworkConfig(
        name="ResilientNetwork",
        mode="centralized",
        transports=[{"type": "http", "config": {"port": 8700}}],
        mods=[{"name": "openagents.mods.workspace.messaging", "enabled": True}]
    )
    
    resilient_network = ResilientNetwork(config)
    await resilient_network.run_with_recovery()

最佳实践

网络创建最佳实践

  1. 配置管理: 使用外部配置文件
  2. 环境隔离: 为 dev/staging/prod 使用不同的配置
  3. 错误处理: 实施健壮的错误处理和恢复机制
  4. 监控: 建立全面的监控和日志记录
  5. 安全: 在生产环境中始终启用安全措施

性能优化

  1. 资源限制: 为连接和消息设置适当的限制
  2. 传输选择: 为你的用例选择最佳的传输方式
  3. 模块选择: 仅启用你实际需要的模块
  4. 连接池: 尽可能重用连接
  5. 缓存: 实施适当的缓存策略

安全注意事项

  1. 身份验证: 在生产环境中始终要求身份验证
  2. 加密: 对所有传输启用 TLS
  3. 访问控制: 实施适当的授权
  4. 审计日志: 记录所有重要的网络事件
  5. 机密管理: 使用安全的机密存储

下一步

Was this helpful?