# Publish Your Network

Learn how to deploy, scale, and share your OpenAgents networks - from local development to production deployment and public distribution.

This tutorial covers the complete process of taking your OpenAgents network from development to production, including deployment strategies, scaling considerations, and sharing your network with others.
## Table of Contents

- Preparation for Production
- Deployment Strategies
- Network Configuration
- Security and Authentication
- Monitoring and Observability
- Scaling and Performance
- Distribution and Sharing
## Preparation for Production

### Environment Configuration

Create production-ready configuration files:
**`production.yaml`**

```yaml
network:
  name: "MyProductionNetwork"
  description: "A production-ready OpenAgents network"
  version: "1.0.0"
  environment: "production"

transports:
  http:
    host: "0.0.0.0"
    port: 8700
    ssl:
      enabled: true
      cert_file: "/path/to/cert.pem"
      key_file: "/path/to/key.pem"
  grpc:
    host: "0.0.0.0"
    port: 8600
    ssl:
      enabled: true
      cert_file: "/path/to/cert.pem"
      key_file: "/path/to/key.pem"

mods:
  - name: "workspace.messaging"
    config:
      max_message_size: 10485760  # 10MB
      rate_limiting:
        enabled: true
        requests_per_minute: 60
  - name: "workspace.forum"
    config:
      moderation:
        enabled: true
        auto_moderate: true
  - name: "workspace.wiki"
    config:
      backup:
        enabled: true
        interval: "24h"

security:
  authentication:
    required: true
    providers:
      - type: "jwt"
        secret_key: "${JWT_SECRET_KEY}"
      - type: "oauth2"
        client_id: "${OAUTH_CLIENT_ID}"
        client_secret: "${OAUTH_CLIENT_SECRET}"
  authorization:
    enabled: true
    default_role: "user"
    admin_users:
      - "admin@example.com"
  rate_limiting:
    enabled: true
    global_limit: 1000
    per_user_limit: 100

storage:
  type: "postgresql"
  connection_string: "${DATABASE_URL}"
  backup:
    enabled: true
    schedule: "0 2 * * *"  # Daily at 2 AM
    retention_days: 30

logging:
  level: "info"
  format: "json"
  outputs:
    - type: "file"
      path: "/var/log/openagents/network.log"
      rotation:
        max_size: "100MB"
        max_files: 10
    - type: "stdout"
      format: "json"

monitoring:
  metrics:
    enabled: true
    endpoint: "/metrics"
    port: 9090
  health_check:
    enabled: true
    endpoint: "/health"
    interval: "30s"
  tracing:
    enabled: true
    jaeger_endpoint: "${JAEGER_ENDPOINT}"

clustering:
  enabled: true
  discovery:
    type: "consul"
    addresses:
      - "consul1.example.com:8500"
      - "consul2.example.com:8500"
```
### Environment Variables

Create a `.env.production` file:

```bash
# Network Configuration
NETWORK_NAME=MyProductionNetwork
NETWORK_HOST=my-network.example.com
NETWORK_PORT=8700

# Security
JWT_SECRET_KEY=your-super-secret-jwt-key-here
OAUTH_CLIENT_ID=your-oauth-client-id
OAUTH_CLIENT_SECRET=your-oauth-client-secret

# Database
DATABASE_URL=postgresql://user:password@db.example.com:5432/openagents_prod

# External Services
REDIS_URL=redis://redis.example.com:6379
ELASTICSEARCH_URL=https://elasticsearch.example.com:9200

# Monitoring
JAEGER_ENDPOINT=http://jaeger.example.com:14268/api/traces
PROMETHEUS_ENDPOINT=http://prometheus.example.com:9090

# Email (for notifications)
SMTP_HOST=smtp.example.com
SMTP_PORT=587
SMTP_USER=notifications@example.com
SMTP_PASSWORD=your-smtp-password

# Cloud Storage (for file uploads)
AWS_ACCESS_KEY_ID=your-aws-access-key
AWS_SECRET_ACCESS_KEY=your-aws-secret-key
AWS_S3_BUCKET=openagents-files
AWS_REGION=us-east-1

# SSL Certificates
SSL_CERT_PATH=/etc/ssl/certs/openagents.pem
SSL_KEY_PATH=/etc/ssl/private/openagents.key
```
### Production Startup Script

Create `start-production.py`:

```python
#!/usr/bin/env python3
import asyncio
import logging
import os
import signal
import sys

from openagents.network.network_manager import NetworkManager
from openagents.config.network_config import NetworkConfig

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/openagents/network.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)


class ProductionNetworkManager:
    def __init__(self):
        self.network_manager = None
        self.running = False

    async def start(self):
        """Start the production network."""
        try:
            # Load configuration
            config = NetworkConfig.from_file("production.yaml")

            # Validate environment variables before starting anything
            self.validate_environment()

            # Initialize network manager
            self.network_manager = NetworkManager(config)

            # Register signal handlers for graceful shutdown
            self.setup_signal_handlers()

            logger.info("Starting OpenAgents production network...")
            await self.network_manager.start()
            self.running = True
            logger.info("Production network started successfully!")

            # Keep running until a shutdown signal flips self.running
            while self.running:
                await asyncio.sleep(1)
        except Exception as e:
            logger.error(f"Failed to start production network: {e}")
            raise

    def validate_environment(self):
        """Validate required environment variables."""
        required_vars = [
            'JWT_SECRET_KEY',
            'DATABASE_URL',
            'SSL_CERT_PATH',
            'SSL_KEY_PATH',
        ]
        missing_vars = [var for var in required_vars if not os.getenv(var)]
        if missing_vars:
            raise ValueError(
                f"Missing required environment variables: {', '.join(missing_vars)}"
            )

    def setup_signal_handlers(self):
        """Register graceful-shutdown handlers on the running event loop.

        Using loop.add_signal_handler (instead of signal.signal) lets us
        safely schedule the async shutdown coroutine from the handler.
        """
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(
                sig, lambda s=sig: asyncio.create_task(self.shutdown(s))
            )

    async def shutdown(self, signum=None):
        """Gracefully shut down the network."""
        if signum is not None:
            logger.info(f"Received signal {signum}, initiating graceful shutdown...")
        logger.info("Shutting down production network...")
        if self.network_manager:
            await self.network_manager.shutdown()
        self.running = False
        logger.info("Production network shutdown complete")


async def main():
    manager = ProductionNetworkManager()
    await manager.start()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Shutdown requested by user")
    except Exception as e:
        logger.error(f"Production network failed: {e}")
        sys.exit(1)
```
## Deployment Strategies

### Docker Deployment

Create `Dockerfile`:

```dockerfile
FROM python:3.11-slim

# Set working directory
WORKDIR /app

# Install system dependencies (curl is required by the HEALTHCHECK below)
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    libpq-dev \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user
RUN useradd -m -u 1000 openagents && chown -R openagents:openagents /app
USER openagents

# Expose ports
EXPOSE 8700 8600 9090

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8700/health || exit 1

# Start the application
CMD ["python", "start-production.py"]
```
Create `docker-compose.yml`:

```yaml
version: '3.8'

services:
  openagents-network:
    build: .
    ports:
      - "8700:8700"
      - "8600:8600"
      - "9090:9090"
    environment:
      - NETWORK_NAME=DockerNetwork
      - DATABASE_URL=postgresql://postgres:password@db:5432/openagents
      - REDIS_URL=redis://redis:6379
    volumes:
      - ./production.yaml:/app/production.yaml
      - ./logs:/var/log/openagents
      - ./ssl:/etc/ssl/certs
    depends_on:
      - db
      - redis
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8700/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  db:
    image: postgres:15
    environment:
      - POSTGRES_DB=openagents
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./backups:/backups
    restart: unless-stopped

  redis:
    image: redis:7
    volumes:
      - redis_data:/data
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/ssl/certs
    depends_on:
      - openagents-network
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:
```
### Kubernetes Deployment

Create `k8s-deployment.yaml`:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: openagents-network
  labels:
    app: openagents-network
spec:
  replicas: 3
  selector:
    matchLabels:
      app: openagents-network
  template:
    metadata:
      labels:
        app: openagents-network
    spec:
      containers:
        - name: openagents
          image: openagents/network:latest
          ports:
            - containerPort: 8700
            - containerPort: 8600
            - containerPort: 9090
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: openagents-secrets
                  key: database-url
            - name: JWT_SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: openagents-secrets
                  key: jwt-secret
          livenessProbe:
            httpGet:
              path: /health
              port: 8700
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8700
            initialDelaySeconds: 5
            periodSeconds: 5
          resources:
            requests:
              memory: "256Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
  name: openagents-service
spec:
  selector:
    app: openagents-network
  ports:
    - name: http
      port: 8700
      targetPort: 8700
    - name: grpc
      port: 8600
      targetPort: 8600
    - name: metrics
      port: 9090
      targetPort: 9090
  type: LoadBalancer
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: openagents-ingress
  annotations:
    kubernetes.io/ingress.class: nginx
    cert-manager.io/cluster-issuer: letsencrypt-prod
spec:
  tls:
    - hosts:
        - my-network.example.com
      secretName: openagents-tls
  rules:
    - host: my-network.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: openagents-service
                port:
                  number: 8700
```
### Cloud Platform Deployment

#### AWS ECS Deployment

Create `ecs-task-definition.json`:

```json
{
  "family": "openagents-network",
  "networkMode": "awsvpc",
  "requiresCompatibilities": ["FARGATE"],
  "cpu": "512",
  "memory": "1024",
  "executionRoleArn": "arn:aws:iam::account:role/ecsTaskExecutionRole",
  "taskRoleArn": "arn:aws:iam::account:role/ecsTaskRole",
  "containerDefinitions": [
    {
      "name": "openagents",
      "image": "your-account.dkr.ecr.region.amazonaws.com/openagents:latest",
      "portMappings": [
        { "containerPort": 8700, "protocol": "tcp" },
        { "containerPort": 8600, "protocol": "tcp" }
      ],
      "environment": [
        { "name": "NETWORK_NAME", "value": "AWSNetwork" }
      ],
      "secrets": [
        {
          "name": "DATABASE_URL",
          "valueFrom": "arn:aws:secretsmanager:region:account:secret:openagents-db-url"
        },
        {
          "name": "JWT_SECRET_KEY",
          "valueFrom": "arn:aws:secretsmanager:region:account:secret:openagents-jwt-secret"
        }
      ],
      "logConfiguration": {
        "logDriver": "awslogs",
        "options": {
          "awslogs-group": "/ecs/openagents-network",
          "awslogs-region": "us-east-1",
          "awslogs-stream-prefix": "ecs"
        }
      },
      "healthCheck": {
        "command": ["CMD-SHELL", "curl -f http://localhost:8700/health || exit 1"],
        "interval": 30,
        "timeout": 5,
        "retries": 3
      }
    }
  ]
}
```
## Network Configuration

### Load Balancing and High Availability

Create `nginx.conf` for load balancing:

```nginx
# Rate-limiting zone; nginx requires limit_req_zone at the http level,
# so this file is assumed to be included inside the http {} context.
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;

upstream openagents_backend {
    least_conn;
    server openagents-1:8700 max_fails=3 fail_timeout=30s;
    server openagents-2:8700 max_fails=3 fail_timeout=30s;
    server openagents-3:8700 max_fails=3 fail_timeout=30s;
}

upstream openagents_grpc {
    server openagents-1:8600;
    server openagents-2:8600;
    server openagents-3:8600;
}

server {
    listen 80;
    server_name my-network.example.com;
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name my-network.example.com;

    ssl_certificate /etc/ssl/certs/openagents.pem;
    ssl_certificate_key /etc/ssl/private/openagents.key;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384;

    # HTTP routes
    location / {
        limit_req zone=api burst=20 nodelay;

        proxy_pass http://openagents_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # WebSocket support
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";

        # Timeouts
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
    }

    # gRPC routes
    location /grpc {
        grpc_pass grpc://openagents_grpc;
        grpc_set_header Host $host;
        grpc_set_header X-Real-IP $remote_addr;
    }

    # Health check endpoint
    location /health {
        access_log off;
        proxy_pass http://openagents_backend/health;
    }
}
```
### Database Clustering

Configure PostgreSQL clustering with `postgresql.conf`:

```ini
# Connection settings
listen_addresses = '*'
port = 5432
max_connections = 200

# Memory settings
shared_buffers = 256MB
effective_cache_size = 1GB
work_mem = 4MB
maintenance_work_mem = 64MB

# WAL settings for replication
wal_level = replica
max_wal_senders = 3
max_replication_slots = 3
archive_mode = on
archive_command = 'cp %p /var/lib/postgresql/archive/%f'

# Performance settings
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 100
random_page_cost = 1.1
effective_io_concurrency = 200

# Logging
logging_collector = on
log_directory = 'pg_log'
log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'
log_statement = 'all'
log_min_duration_statement = 1000
```
## Security and Authentication

### JWT Authentication Setup

Create `auth.py`:

```python
import os
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

import jwt  # PyJWT
from fastapi.responses import JSONResponse


class AuthenticationManager:
    def __init__(self):
        self.secret_key = os.getenv('JWT_SECRET_KEY')
        self.algorithm = 'HS256'
        self.token_expiry = timedelta(hours=24)

    def generate_token(self, user_id: str, permissions: list = None) -> str:
        """Generate a JWT for the given user."""
        now = datetime.now(timezone.utc)
        payload = {
            'user_id': user_id,
            'permissions': permissions or [],
            'exp': now + self.token_expiry,
            'iat': now,
        }
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Verify and decode a JWT; return None if invalid or expired."""
        try:
            return jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None

    def has_permission(self, token: str, required_permission: str) -> bool:
        """Check whether the token carries the required permission."""
        payload = self.verify_token(token)
        if not payload:
            return False
        permissions = payload.get('permissions', [])
        return required_permission in permissions or 'admin' in permissions


# Middleware for FastAPI
async def auth_middleware(request, call_next):
    """Authentication middleware."""
    # Skip auth for health checks and metrics
    if request.url.path in ['/health', '/metrics']:
        return await call_next(request)

    # Extract the bearer token from the Authorization header
    auth_header = request.headers.get('Authorization')
    if not auth_header or not auth_header.startswith('Bearer '):
        return JSONResponse(
            status_code=401,
            content={"error": "Missing or invalid authorization header"}
        )

    token = auth_header.split(' ')[1]
    auth_manager = AuthenticationManager()
    payload = auth_manager.verify_token(token)
    if not payload:
        return JSONResponse(
            status_code=401,
            content={"error": "Invalid or expired token"}
        )

    # Attach user info to the request for downstream handlers
    request.state.user_id = payload['user_id']
    request.state.permissions = payload['permissions']
    return await call_next(request)
```
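PyJWT does the encoding and verification above; for intuition, the HS256 mechanics can be sketched with the standard library alone. This is an illustrative sketch (function names are not part of OpenAgents or PyJWT), showing why verification is just recomputing an HMAC over `header.payload` and comparing in constant time:

```python
import base64
import hashlib
import hmac
import json


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWS requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def sign_hs256(payload: dict, secret: str) -> str:
    """Build a compact JWT: base64url(header).base64url(payload).base64url(sig)."""
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url(json.dumps(payload).encode())
    signing_input = f"{header}.{body}".encode()
    sig = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return f"{header}.{body}.{_b64url(sig)}"


def verify_hs256(token: str, secret: str) -> bool:
    """Recompute the HMAC over header.payload and compare in constant time."""
    header, body, sig = token.split(".")
    signing_input = f"{header}.{body}".encode()
    expected = _b64url(hmac.new(secret.encode(), signing_input, hashlib.sha256).digest())
    return hmac.compare_digest(expected, sig)
```

Any change to the payload or a wrong secret changes the recomputed signature, so tampered tokens fail verification. (This sketch omits `exp`/`iat` handling, which PyJWT enforces for you.)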
### Rate Limiting and DDoS Protection

Create `rate_limiter.py`:

```python
import asyncio
import time
from collections import defaultdict
from typing import Dict, Tuple


class RateLimiter:
    def __init__(self):
        self.requests: Dict[str, list] = defaultdict(list)
        self.blocked_ips: Dict[str, float] = {}
        self.cleanup_task = None

    async def is_allowed(self, identifier: str, limit: int = 60, window: int = 60) -> Tuple[bool, int]:
        """Check if a request is allowed under the rate limit."""
        current_time = time.time()

        # Check whether the identifier is currently blocked
        if identifier in self.blocked_ips:
            if current_time < self.blocked_ips[identifier]:
                remaining_time = int(self.blocked_ips[identifier] - current_time)
                return False, remaining_time
            else:
                del self.blocked_ips[identifier]

        # Drop requests that have aged out of the window
        self.requests[identifier] = [
            req_time for req_time in self.requests[identifier]
            if current_time - req_time < window
        ]

        # Enforce the limit: block for 5 minutes when it is exceeded
        if len(self.requests[identifier]) >= limit:
            self.blocked_ips[identifier] = current_time + 300
            return False, 300

        # Record the current request
        self.requests[identifier].append(current_time)

        # Report remaining quota in this window
        remaining = limit - len(self.requests[identifier])
        return True, remaining

    async def cleanup_old_entries(self):
        """Periodically clean up old entries."""
        while True:
            current_time = time.time()

            # Drop stale request history
            for identifier in list(self.requests.keys()):
                self.requests[identifier] = [
                    req_time for req_time in self.requests[identifier]
                    if current_time - req_time < 3600  # keep 1 hour of history
                ]
                if not self.requests[identifier]:
                    del self.requests[identifier]

            # Drop expired IP blocks
            expired_blocks = [
                ip for ip, block_time in self.blocked_ips.items()
                if current_time > block_time
            ]
            for ip in expired_blocks:
                del self.blocked_ips[ip]

            await asyncio.sleep(300)  # clean up every 5 minutes
```
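Stripped of the blocking and cleanup bookkeeping, the core of `is_allowed` is a sliding-window counter. This standalone sketch (not part of the class above) shows the essential logic in a few lines:

```python
def allowed(history: list, now: float, limit: int = 60, window: int = 60) -> bool:
    """Sliding-window rate check: prune timestamps older than `window`,
    refuse if `limit` requests remain, otherwise record this one."""
    history[:] = [t for t in history if now - t < window]  # prune in place
    if len(history) >= limit:
        return False
    history.append(now)
    return True
```

Once the window slides past the recorded timestamps, they are pruned and the caller regains its quota, which is exactly why the class above re-filters `self.requests[identifier]` on every call.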
## Monitoring and Observability

### Metrics Collection

Create `metrics.py`:

```python
import asyncio

import psutil
from prometheus_client import Counter, Gauge, Histogram, start_http_server


class MetricsCollector:
    def __init__(self):
        # Define metrics
        self.request_count = Counter(
            'openagents_requests_total',
            'Total number of requests',
            ['method', 'endpoint', 'status']
        )
        self.request_duration = Histogram(
            'openagents_request_duration_seconds',
            'Request duration in seconds',
            ['method', 'endpoint']
        )
        self.active_connections = Gauge(
            'openagents_active_connections',
            'Number of active connections'
        )
        self.agent_count = Gauge(
            'openagents_connected_agents',
            'Number of connected agents'
        )
        self.memory_usage = Gauge(
            'openagents_memory_usage_bytes',
            'Memory usage in bytes'
        )
        self.cpu_usage = Gauge(
            'openagents_cpu_usage_percent',
            'CPU usage percentage'
        )

    def start_background_collection(self):
        """Schedule system-metrics collection.

        asyncio.create_task requires a running event loop, so call this
        from startup code rather than doing it in __init__.
        """
        asyncio.create_task(self.collect_system_metrics())

    def record_request(self, method: str, endpoint: str, status: int, duration: float):
        """Record request metrics."""
        self.request_count.labels(method=method, endpoint=endpoint, status=status).inc()
        self.request_duration.labels(method=method, endpoint=endpoint).observe(duration)

    def set_active_connections(self, count: int):
        """Update the active connections count."""
        self.active_connections.set(count)

    def set_agent_count(self, count: int):
        """Update the connected agents count."""
        self.agent_count.set(count)

    async def collect_system_metrics(self):
        """Collect system metrics periodically."""
        while True:
            try:
                # Memory usage
                memory = psutil.virtual_memory()
                self.memory_usage.set(memory.used)

                # CPU usage
                cpu_percent = psutil.cpu_percent(interval=1)
                self.cpu_usage.set(cpu_percent)

                await asyncio.sleep(30)  # collect every 30 seconds
            except Exception as e:
                print(f"Error collecting system metrics: {e}")
                await asyncio.sleep(60)


def start_metrics_server(port: int = 9090):
    """Start the Prometheus metrics server."""
    start_http_server(port)
    print(f"Metrics server started on port {port}")
```
### Health Checks

Create `health.py`:

```python
from datetime import datetime
from typing import Any, Dict

import aiohttp
import psutil


class HealthChecker:
    def __init__(self):
        self.checks = {
            'database': self.check_database,
            'redis': self.check_redis,
            'disk_space': self.check_disk_space,
            'memory': self.check_memory,
            'external_apis': self.check_external_apis,
        }
        self.last_check = {}
        self.check_interval = 60  # seconds

    async def health_check(self) -> Dict[str, Any]:
        """Perform a comprehensive health check across all registered checks."""
        results = {
            'status': 'healthy',
            'timestamp': datetime.now().isoformat(),
            'checks': {}
        }
        for check_name, check_func in self.checks.items():
            try:
                check_result = await check_func()
                results['checks'][check_name] = {
                    'status': 'healthy' if check_result['healthy'] else 'unhealthy',
                    'details': check_result.get('details', {})
                }
                if not check_result['healthy']:
                    results['status'] = 'unhealthy'
            except Exception as e:
                results['checks'][check_name] = {
                    'status': 'error',
                    'error': str(e)
                }
                results['status'] = 'unhealthy'
        return results

    async def check_database(self) -> Dict[str, Any]:
        """Check database connectivity and performance."""
        try:
            # Placeholder - implement an actual database connectivity check
            return {
                'healthy': True,
                'details': {
                    'response_time_ms': 10,
                    'connection_pool_size': 10
                }
            }
        except Exception as e:
            return {'healthy': False, 'error': str(e)}

    async def check_redis(self) -> Dict[str, Any]:
        """Check Redis connectivity."""
        try:
            # Placeholder - implement an actual Redis connectivity check
            return {
                'healthy': True,
                'details': {
                    'response_time_ms': 5,
                    'memory_usage_mb': 100
                }
            }
        except Exception as e:
            return {'healthy': False, 'error': str(e)}

    async def check_disk_space(self) -> Dict[str, Any]:
        """Check available disk space."""
        try:
            disk_usage = psutil.disk_usage('/')
            free_percent = (disk_usage.free / disk_usage.total) * 100
            return {
                'healthy': free_percent > 10,  # alert if less than 10% free
                'details': {
                    'free_percent': round(free_percent, 2),
                    'free_gb': round(disk_usage.free / (1024**3), 2),
                    'total_gb': round(disk_usage.total / (1024**3), 2)
                }
            }
        except Exception as e:
            return {'healthy': False, 'error': str(e)}

    async def check_memory(self) -> Dict[str, Any]:
        """Check memory usage."""
        try:
            memory = psutil.virtual_memory()
            return {
                'healthy': memory.percent < 90,  # alert if over 90% used
                'details': {
                    'percent_used': memory.percent,
                    'available_gb': round(memory.available / (1024**3), 2),
                    'total_gb': round(memory.total / (1024**3), 2)
                }
            }
        except Exception as e:
            return {'healthy': False, 'error': str(e)}

    async def check_external_apis(self) -> Dict[str, Any]:
        """Check external API connectivity."""
        try:
            # Test external service connectivity
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get('https://api.github.com') as response:
                    return {
                        'healthy': response.status == 200,
                        'details': {
                            'response_status': response.status,
                            'response_time_ms': 100  # placeholder
                        }
                    }
        except Exception as e:
            return {'healthy': False, 'error': str(e)}
```
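The aggregation rule in `health_check` is strict: a single failing check marks the whole report unhealthy. Isolated as a pure function (an illustrative helper, not part of the class), the rule is just:

```python
def overall_status(checks: dict) -> str:
    """Aggregate per-check results: any failure makes the report unhealthy."""
    return 'healthy' if all(c.get('healthy') for c in checks.values()) else 'unhealthy'
```

This strictness is deliberate for load-balancer probes: an instance with a failing dependency should be rotated out rather than serve degraded responses.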
## Scaling and Performance

### Horizontal Scaling Configuration

Create `scaling.py`:

```python
import asyncio
import os

from openagents.network.cluster_manager import ClusterManager


class AutoScaler:
    def __init__(self):
        self.cluster_manager = ClusterManager()
        self.min_instances = int(os.getenv('MIN_INSTANCES', '2'))
        self.max_instances = int(os.getenv('MAX_INSTANCES', '10'))
        self.target_cpu_percent = int(os.getenv('TARGET_CPU_PERCENT', '70'))
        self.scale_up_threshold = int(os.getenv('SCALE_UP_THRESHOLD', '80'))
        self.scale_down_threshold = int(os.getenv('SCALE_DOWN_THRESHOLD', '50'))

    async def monitor_and_scale(self):
        """Monitor metrics and scale instances accordingly."""
        while True:
            try:
                # Get current cluster metrics
                metrics = await self.get_cluster_metrics()
                current_instances = metrics['instance_count']
                avg_cpu = metrics['average_cpu_percent']

                # Determine the scaling action (CPU-driven)
                if avg_cpu > self.scale_up_threshold and current_instances < self.max_instances:
                    await self.scale_up()
                elif avg_cpu < self.scale_down_threshold and current_instances > self.min_instances:
                    await self.scale_down()

                await asyncio.sleep(60)  # check every minute
            except Exception as e:
                print(f"Auto-scaling error: {e}")
                await asyncio.sleep(60)

    async def scale_up(self):
        """Add a new instance to the cluster."""
        print("Scaling up: adding a new instance")
        await self.cluster_manager.add_instance()

    async def scale_down(self):
        """Remove an instance from the cluster."""
        print("Scaling down: removing an instance")
        await self.cluster_manager.remove_instance()

    async def get_cluster_metrics(self):
        """Get cluster-wide metrics."""
        return await self.cluster_manager.get_metrics()
```
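The scaling policy above reduces to a pure decision over CPU and instance count, with min/max bounds acting as guardrails. This standalone sketch (the function name and defaults mirror the class's environment-variable defaults, but it is not part of the class) makes the three outcomes explicit:

```python
def scale_decision(avg_cpu: float, instances: int,
                   scale_up_at: int = 80, scale_down_at: int = 50,
                   min_instances: int = 2, max_instances: int = 10) -> str:
    """Return 'up', 'down', or 'hold' given average CPU and current fleet size."""
    if avg_cpu > scale_up_at and instances < max_instances:
        return "up"
    if avg_cpu < scale_down_at and instances > min_instances:
        return "down"
    return "hold"
```

Note the dead band between the two thresholds (50-80% here): without it, a fleet hovering near a single threshold would oscillate between scaling up and down on every check.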
### Caching Strategy

Create `caching.py`:

```python
import hashlib
import json
import os
from typing import Any, Optional

import redis.asyncio as redis


class CacheManager:
    def __init__(self):
        self.redis_client = redis.from_url(os.getenv('REDIS_URL', 'redis://localhost:6379'))
        self.default_ttl = 3600  # 1 hour

    async def get(self, key: str) -> Optional[Any]:
        """Get a value from the cache."""
        try:
            value = await self.redis_client.get(key)
            if value:
                return json.loads(value)
            return None
        except Exception as e:
            print(f"Cache get error: {e}")
            return None

    async def set(self, key: str, value: Any, ttl: int = None) -> bool:
        """Set a value in the cache."""
        try:
            ttl = ttl or self.default_ttl
            serialized_value = json.dumps(value)
            await self.redis_client.setex(key, ttl, serialized_value)
            return True
        except Exception as e:
            print(f"Cache set error: {e}")
            return False

    async def delete(self, key: str) -> bool:
        """Delete a value from the cache."""
        try:
            await self.redis_client.delete(key)
            return True
        except Exception as e:
            print(f"Cache delete error: {e}")
            return False

    async def cache_agent_response(self, agent_id: str, query: str, response: str, ttl: int = 300):
        """Cache an agent response."""
        cache_key = self.generate_cache_key(agent_id, query)
        await self.set(cache_key, {'response': response, 'agent_id': agent_id}, ttl)

    async def get_cached_response(self, agent_id: str, query: str) -> Optional[str]:
        """Get a cached agent response."""
        cache_key = self.generate_cache_key(agent_id, query)
        cached_data = await self.get(cache_key)
        return cached_data['response'] if cached_data else None

    def generate_cache_key(self, agent_id: str, query: str) -> str:
        """Generate a cache key from the agent ID and query."""
        key_data = f"{agent_id}:{query}"
        return f"agent_response:{hashlib.md5(key_data.encode()).hexdigest()}"
```
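Cache hits depend entirely on `generate_cache_key` being deterministic: the same agent and query must always hash to the same Redis key. Reproduced in isolation (MD5 is fine here since the hash only needs to be stable and well-distributed, not cryptographically secure):

```python
import hashlib


def generate_cache_key(agent_id: str, query: str) -> str:
    """Deterministic Redis key: fixed prefix plus a 32-hex-char MD5 digest."""
    key_data = f"{agent_id}:{query}"
    return f"agent_response:{hashlib.md5(key_data.encode()).hexdigest()}"
```

Identical inputs always produce identical keys (so repeat queries hit the cache), while any change to the agent ID or query yields a different digest and therefore a cache miss.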
## Distribution and Sharing

### Package Creation

Create `setup.py` for distribution:

```python
from setuptools import setup, find_packages

setup(
    name="my-openagents-network",
    version="1.0.0",
    description="A custom OpenAgents network implementation",
    long_description=open("README.md").read(),
    long_description_content_type="text/markdown",
    author="Your Name",
    author_email="your.email@example.com",
    url="https://github.com/yourusername/my-openagents-network",
    packages=find_packages(),
    install_requires=[
        "openagents>=1.0.0",
        "fastapi>=0.68.0",
        "uvicorn>=0.15.0",
        "redis>=4.0.0",
        "psycopg2-binary>=2.9.0",
        "prometheus-client>=0.11.0",
        "pyjwt>=2.0.0",
    ],
    extras_require={
        "dev": [
            "pytest>=6.0.0",
            "pytest-asyncio>=0.15.0",
            "black>=21.0.0",
            "flake8>=3.9.0",
            "mypy>=0.910",
        ]
    },
    python_requires=">=3.8",
    classifiers=[
        "Development Status :: 4 - Beta",
        "Intended Audience :: Developers",
        "License :: OSI Approved :: MIT License",
        "Programming Language :: Python :: 3",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Programming Language :: Python :: 3.11",
    ],
    entry_points={
        "console_scripts": [
            "my-network=my_network.cli:main",
        ],
    },
)
```
### Documentation and Sharing

Create comprehensive documentation:

**`README.md`**

````markdown
# My OpenAgents Network

A production-ready OpenAgents network with custom agents and specialized features.

## Features

- 🤖 Custom AI agents with specialized capabilities
- 🔐 Enterprise-grade security and authentication
- 📊 Comprehensive monitoring and metrics
- 🚀 Auto-scaling and high availability
- 🌐 Multi-protocol support
- 📱 Web interface and mobile support

## Quick Start

### Using Docker

```bash
git clone https://github.com/yourusername/my-openagents-network
cd my-openagents-network
docker-compose up -d
```

### Using Kubernetes

```bash
kubectl apply -f k8s-deployment.yaml
```

### Manual Installation

```bash
pip install my-openagents-network
my-network start --config production.yaml
```

## Configuration

See the Configuration Guide for detailed setup instructions.

## Agents

This network includes the following specialized agents:

- **DataAnalysisAgent**: Advanced data processing and analytics
- **CustomerSupportAgent**: Intelligent customer service automation
- **SecurityAgent**: Network monitoring and threat detection
- **IntegrationAgent**: External service connectivity

## API Documentation

Interactive API documentation is available at `/docs` when the network is running.

## Contributing

Please read CONTRIBUTING.md for guidelines on contributing to this project.

## License

This project is licensed under the MIT License - see the LICENSE file for details.
````
### Network Registry

Create a network registry entry:

**`network-registry.json`**

```json
{
  "name": "my-production-network",
  "version": "1.0.0",
  "description": "A production-ready OpenAgents network for enterprise use",
  "author": "Your Organization",
  "repository": "https://github.com/yourusername/my-openagents-network",
  "license": "MIT",
  "tags": ["production", "enterprise", "ai", "automation"],
  "features": [
    "custom-agents",
    "security",
    "monitoring",
    "scaling",
    "multi-protocol"
  ],
  "requirements": {
    "openagents": ">=1.0.0",
    "python": ">=3.8"
  },
  "endpoints": {
    "demo": "https://demo.my-network.example.com",
    "documentation": "https://docs.my-network.example.com",
    "api": "https://api.my-network.example.com"
  },
  "deployment": {
    "docker": true,
    "kubernetes": true,
    "cloud": ["aws", "gcp", "azure"]
  },
  "support": {
    "documentation": "https://docs.my-network.example.com",
    "issues": "https://github.com/yourusername/my-openagents-network/issues",
    "community": "https://discord.gg/my-network"
  }
}
```
## Next Steps

After publishing your network:

- **Monitor Performance**: Use metrics and logs to track network health
- **Gather Feedback**: Collect user feedback and improvement suggestions
- **Iterate and Improve**: Ship regular updates and feature additions
- **Community Building**: Engage with users and contributors
- **Documentation**: Keep documentation current and comprehensive

Your OpenAgents network is now ready for production use and for sharing with the community!