API ReferenceAPI 参考
Updated February 24, 2026
API 参考
OpenAgents 客户端库的完整 Python API 文档,包含类、方法和使用示例。
OpenAgents Python API
OpenAgents Python API 提供了一个用于以编程方式构建人工智能代理网络的全面接口。本参考涵盖所有公共类、方法和实用工具。
Core Classes
AgentClient
The main class for creating and managing agents.
from openagents.core.client import AgentClientConstructor
class AgentClient:
def __init__(
self,
agent_id: str,
metadata: Optional[Dict[str, Any]] = None,
capabilities: Optional[List[str]] = None,
transport: str = "websocket"
):
"""
Create a new agent client.
Args:
agent_id: Unique identifier for this agent
metadata: Additional agent information
capabilities: List of agent capabilities
transport: Transport protocol ("websocket" or "grpc")
"""Example:
client = AgentClient(
agent_id="my-agent",
metadata={"name": "Data Processor", "version": "1.0"},
capabilities=["data_analysis", "file_processing"]
)Connection Methods
connect_to_server()
async def connect_to_server(
self,
host: str = "localhost",
port: int = 8570,
metadata: Optional[Dict[str, Any]] = None,
timeout: int = 30
) -> bool:
"""
Connect to an OpenAgents network.
Args:
host: Network coordinator host address
port: Network coordinator port
metadata: Additional metadata to send during connection
timeout: Connection timeout in seconds
Returns:
bool: True if connection successful, False otherwise
Raises:
ConnectionError: If connection fails
TimeoutError: If connection times out
"""Example:
connected = await client.connect_to_server(
host="localhost",
port=8570,
metadata={"role": "worker", "region": "us-west"}
)
if connected:
print("✅ Connected to network")
else:
print("❌ Connection failed")disconnect()
async def disconnect(self) -> None:
"""
Disconnect from the network and clean up resources.
"""Messaging Methods
send_broadcast_message()
async def send_broadcast_message(
self,
message: BroadcastMessage
) -> bool:
"""
Send a message to all agents in the network.
Args:
message: BroadcastMessage instance
Returns:
bool: True if message sent successfully
Raises:
NetworkError: If not connected to network
ProtocolError: If message protocol not supported
"""Example:
from openagents.models.messages import BroadcastMessage
message = BroadcastMessage(
sender_id="my-agent",
protocol="openagents.mods.communication.simple_messaging",
message_type="broadcast_message",
content={"text": "Hello, network!"},
text_representation="Hello, network!",
requires_response=False
)
success = await client.send_broadcast_message(message)send_direct_message()
async def send_direct_message(
self,
target_agent_id: str,
message: DirectMessage
) -> bool:
"""
Send a message to a specific agent.
Args:
target_agent_id: ID of the target agent
message: DirectMessage instance
Returns:
bool: True if message sent successfully
"""send_response()
async def send_response(
self,
original_message: BaseMessage,
response_content: Dict[str, Any],
response_type: str = "response"
) -> bool:
"""
Send a response to a received message.
Args:
original_message: The message being responded to
response_content: Response data
response_type: Type of response
Returns:
bool: True if response sent successfully
"""Discovery Methods
list_agents()
async def list_agents(
self,
include_self: bool = False,
filter_criteria: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
"""
Get list of agents currently connected to the network.
Args:
include_self: Whether to include this agent in results
filter_criteria: Filter agents by metadata/capabilities
Returns:
List of agent information dictionaries
"""Example:
# Get all agents
all_agents = await client.list_agents()
# Get agents with specific capabilities
data_agents = await client.list_agents(
filter_criteria={"capabilities": ["data_analysis"]}
)
for agent in data_agents:
print(f"Agent: {agent['name']} - {agent['capabilities']}")find_agents_by_capability()
async def find_agents_by_capability(
self,
capability: str
) -> List[Dict[str, Any]]:
"""
Find agents that have a specific capability.
Args:
capability: The capability to search for
Returns:
List of matching agents
"""Event Handlers
Override these methods to handle network events:
on_message_received()
async def on_message_received(self, message: BaseMessage) -> None:
"""
Called when a message is received from another agent.
Args:
message: The received message
"""Example:
class MyAgent(AgentClient):
async def on_message_received(self, message):
if message.protocol == "openagents.mods.communication.simple_messaging":
if "help" in message.content.get("text", "").lower():
await self.send_response(
message,
{"text": "I can help with data analysis!"}
)on_agent_joined()
async def on_agent_joined(self, agent_info: Dict[str, Any]) -> None:
"""
Called when a new agent joins the network.
Args:
agent_info: Information about the new agent
"""on_agent_left()
async def on_agent_left(self, agent_id: str) -> None:
"""
Called when an agent leaves the network.
Args:
agent_id: ID of the agent that left
"""Workspace Methods
get_workspace()
async def get_workspace(self) -> Optional[Workspace]:
"""
Get access to the shared workspace if available.
Returns:
Workspace instance or None if not available
"""消息类
BaseMessage
所有消息类型的基类。
from openagents.models.messages import BaseMessage
class BaseMessage:
def __init__(
self,
sender_id: str,
protocol: str,
message_type: str,
content: Dict[str, Any],
message_id: Optional[str] = None,
timestamp: Optional[datetime] = None,
requires_response: bool = False
):
"""
Base message class.
Args:
sender_id: ID of the sending agent
protocol: Protocol identifier
message_type: Type of message within the protocol
content: Message payload
message_id: Unique message ID (auto-generated if None)
timestamp: Message timestamp (auto-generated if None)
requires_response: Whether message expects a response
"""BroadcastMessage
发送到网络中所有代理的消息。
from openagents.models.messages import BroadcastMessage
message = BroadcastMessage(
sender_id="broadcaster",
protocol="openagents.mods.communication.simple_messaging",
message_type="broadcast_message",
content={
"text": "System maintenance in 5 minutes",
"priority": "high",
"category": "announcement"
},
text_representation="System maintenance in 5 minutes",
requires_response=False
)DirectMessage
发送给特定代理的消息。
from openagents.models.messages import DirectMessage
message = DirectMessage(
sender_id="sender",
target_agent_id="receiver",
protocol="openagents.mods.communication.simple_messaging",
message_type="direct_message",
content={
"text": "Can you process this file?",
"file_path": "/shared/data.csv",
"task_type": "analysis"
}
)TaskMessage
用于任务协调和委派的消息。
from openagents.models.messages import TaskMessage
task = TaskMessage(
sender_id="manager",
protocol="openagents.mods.task.task_coordination",
message_type="task_assignment",
content={
"task_id": "process-document-456",
"task_type": "document_analysis",
"parameters": {
"file_path": "/documents/report.pdf",
"analysis_type": "summary"
},
"deadline": "2024-01-15T10:00:00Z",
"priority": "normal"
},
target_agents=["worker-1", "worker-2"],
requires_response=True
)ThreadMessage
用于线程式对话的消息。
from openagents.models.messages import ThreadMessage
thread_msg = ThreadMessage(
sender_id="participant",
protocol="openagents.mods.communication.thread_messaging",
message_type="thread_message",
thread_id="discussion-ai-ethics",
parent_message_id="msg-123",
content={
"text": "I think we should consider the implications...",
"attachments": ["ethics_paper.pdf"]
}
)工作区 API
工作区
提供对共享协作资源的访问。
from openagents.workspace import Workspace
# Get workspace from agent client
workspace = await client.get_workspace()
if workspace:
# Create a shared document
doc_id = await workspace.create_document(
name="project_plan.md",
content="# Project Plan\n\nObjectives:\n- Analyze data\n- Generate report",
content_type="markdown"
)
# Start a forum thread
thread_id = await workspace.create_forum_thread(
title="Project Discussion",
initial_post="Let's discuss our approach to this project..."
)文档方法
class Workspace:
async def create_document(
self,
name: str,
content: str,
content_type: str = "text",
permissions: Optional[Dict[str, str]] = None
) -> str:
"""Create a new shared document."""
async def get_document(self, document_id: str) -> Optional[Document]:
"""Retrieve a document by ID."""
async def update_document(
self,
document_id: str,
content: str,
editor_id: str
) -> bool:
"""Update document content."""
async def delete_document(self, document_id: str) -> bool:
"""Delete a document."""
async def list_documents(self) -> List[Dict[str, Any]]:
"""List all documents in workspace."""实用函数
配置
from openagents.config import load_config, validate_config
# Load network configuration
config = load_config("network.yaml")
# Validate configuration
is_valid, errors = validate_config(config)
if not is_valid:
print(f"Configuration errors: {errors}")日志记录
from openagents.utils.logging import setup_logging
# Setup structured logging
logger = setup_logging(
level="INFO",
format="json",
output_file="agent.log"
)
logger.info("Agent started", extra={"agent_id": "worker-1"})错误处理
您应处理的常见异常:
from openagents.exceptions import (
NetworkError,
ProtocolError,
AuthenticationError,
WorkspaceError
)
try:
await client.connect_to_server("localhost", 8570)
except ConnectionError as e:
logger.error(f"Failed to connect: {e}")
except AuthenticationError as e:
logger.error(f"Authentication failed: {e}")
except NetworkError as e:
logger.error(f"Network error: {e}")完整示例
下面是一个展示大多数 API 功能的综合示例:
import asyncio
import logging
from openagents.core.client import AgentClient
from openagents.models.messages import BroadcastMessage, TaskMessage
from openagents.exceptions import NetworkError
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DataProcessingAgent(AgentClient):
def __init__(self, agent_id: str):
super().__init__(
agent_id=agent_id,
metadata={"name": "Data Processor", "version": "1.0"},
capabilities=["data_analysis", "file_processing", "reporting"]
)
self.processed_files = []
async def on_message_received(self, message):
"""Handle incoming messages"""
if message.protocol == "openagents.mods.task.task_coordination":
if message.message_type == "task_assignment":
await self.handle_task(message)
elif message.protocol == "openagents.mods.communication.simple_messaging":
if "status" in message.content.get("text", "").lower():
await self.send_status_update()
async def handle_task(self, task_message: TaskMessage):
"""Process assigned tasks"""
task_content = task_message.content
task_type = task_content.get("task_type")
if task_type == "data_analysis":
result = await self.analyze_data(task_content["parameters"])
await self.send_response(
task_message,
{"status": "completed", "result": result}
)
async def analyze_data(self, parameters):
"""Simulate data analysis"""
file_path = parameters.get("file_path")
logger.info(f"Analyzing file: {file_path}")
# Simulate processing time
await asyncio.sleep(2)
# Mock analysis result
return {
"file_path": file_path,
"rows_processed": 1000,
"summary": "Data analysis completed successfully"
}
async def send_status_update(self):
"""Send status update to network"""
status_msg = BroadcastMessage(
sender_id=self.agent_id,
protocol="openagents.mods.communication.simple_messaging",
message_type="broadcast_message",
content={
"text": f"Status: Active. Processed {len(self.processed_files)} files.",
"type": "status_update",
"processed_count": len(self.processed_files)
}
)
await self.send_broadcast_message(status_msg)
async def main():
# Create agent
agent = DataProcessingAgent("data-processor-1")
try:
# Connect to network
connected = await agent.connect_to_server(
host="localhost",
port=8570,
metadata={"role": "worker", "specialization": "data"}
)
if not connected:
logger.error("Failed to connect to network")
return
logger.info("🎉 Agent connected successfully")
# Announce availability
announcement = BroadcastMessage(
sender_id=agent.agent_id,
protocol="openagents.mods.communication.simple_messaging",
message_type="broadcast_message",
content={
"text": "Data processing agent online and ready for tasks!",
"type": "agent_online"
}
)
await agent.send_broadcast_message(announcement)
# Get workspace access
workspace = await agent.get_workspace()
if workspace:
await workspace.create_document(
name="agent_log.txt",
content="Agent started successfully",
content_type="text"
)
logger.info("📝 Created workspace log")
# List other agents
other_agents = await agent.list_agents(include_self=False)
logger.info(f"👥 Found {len(other_agents)} other agents")
# Stay connected and process messages
logger.info("👂 Listening for tasks...")
await asyncio.sleep(60) # Run for 1 minute
except NetworkError as e:
logger.error(f"Network error: {e}")
except Exception as e:
logger.error(f"Unexpected error: {e}")
finally:
await agent.disconnect()
logger.info("👋 Agent disconnected")
if __name__ == "__main__":
asyncio.run(main())下一步
信息: API 版本管理: 此 API 参考适用于 OpenAgents v0.5.1。请查看 更新日志 以了解新版本中的更新和不兼容更改。
Was this helpful?
Next