MarkTechPost@AI, the day before yesterday, 15:20
A Coding Guide to Building a Scalable Multi-Agent Communication Systems Using Agent Communication Protocol (ACP)

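This article shows how to build a flexible messaging system in Python that follows the Agent Communication Protocol (ACP) and uses Google's Gemini API for natural language processing. The tutorial starts by installing and configuring the google-generativeai library, then introduces the core abstractions: message types, performatives, and the ACPMessage data class that standardizes communication between agents. By defining the ACPAgent and ACPMessageBroker classes, it demonstrates how to create, send, route, and process structured messages among multiple autonomous agents. The code examples show how to implement queries, action requests, and information broadcasts while maintaining conversation threads, acknowledgments, and error handling.

💡 The tutorial first installs and configures the `google-generativeai` library in preparation for calling the Gemini API, laying the groundwork for the ACP implementation.

🔑 It defines the `ACPMessageType` enumeration, which lists the core ACP message types (request, response, inform, query, and others) so that agents label messages consistently.

🗣️ It provides the `ACPPerformative` enumeration, which defines the speech acts agents use when interacting under ACP, such as asking, requesting, and agreeing, so that agents can respond appropriately to context.

✉️ The `ACPMessage` data class wraps every field an ACP exchange needs, including identifiers, participants, performative, payload, and metadata, and provides `to_acp_format` and `from_acp_format` methods for JSON serialization and deserialization.

🤖 The `ACPAgent` class implements ACP with methods for creating messages, sending information, querying, requesting actions, and replying, plus a `process_message` method that generates a response based on the incoming message's performative.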

In this tutorial, we implement the Agent Communication Protocol (ACP) by building a flexible, ACP-compliant messaging system in Python that uses Google’s Gemini API for natural language processing. After installing and configuring the google-generativeai library, we introduce the core abstractions: message types, performatives, and the ACPMessage data class, which standardizes inter-agent communication. We then define the ACPAgent and ACPMessageBroker classes to show how structured messages are created, sent, routed, and processed among multiple autonomous agents. The code examples cover querying, requesting actions, and broadcasting information, while maintaining conversation threads, acknowledgments, and error handling.

import google.generativeai as genai
import json
import time
import uuid
from enum import Enum
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict

# Placeholder: replace with your own Gemini API key before running.
GEMINI_API_KEY = "Use Your Gemini API Key"
genai.configure(api_key=GEMINI_API_KEY)

We import the Python modules the implementation relies on: JSON handling, timing, unique identifier generation, enumerations, data classes, and type annotations. We then set a placeholder for the Gemini API key and configure the google-generativeai client for later calls to the Gemini language model.
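Hardcoding the key works for a quick demo, but a key kept in the source is easy to leak. One possible alternative, sketched below, reads it from an environment variable. The helper name `load_api_key` and the variable name `GEMINI_API_KEY` are assumptions made for illustration; neither comes from the tutorial.

```python
import os


def load_api_key(env_var: str = "GEMINI_API_KEY") -> str:
    """Return the API key stored in the given environment variable.

    The helper name and the default variable name are illustrative
    assumptions, not part of the original tutorial.
    """
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(f"Set the {env_var} environment variable first")
    return key


# Usage (left as comments so the snippet runs without the Gemini client):
# import google.generativeai as genai
# genai.configure(api_key=load_api_key())
```

Failing early with a clear error keeps a missing key from surfacing later as an opaque API failure inside an agent.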

class ACPMessageType(Enum):
    """Standard ACP message types"""
    REQUEST = "request"
    RESPONSE = "response"
    INFORM = "inform"
    QUERY = "query"
    SUBSCRIBE = "subscribe"
    UNSUBSCRIBE = "unsubscribe"
    ERROR = "error"
    ACK = "acknowledge"

The ACPMessageType enumeration defines the core message categories used in the Agent Communication Protocol: requests, responses, informational messages, and queries, plus control messages for subscription management, error signaling, and acknowledgments. Centralizing these string values in one enumeration gives every agent and the broker a single, shared vocabulary for labeling messages. In the code that follows, dispatch is driven by the performative rather than by this enumeration.
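Because each member wraps a plain string, an incoming label can be validated by looking it up by value. The sketch below repeats the enumeration so it runs on its own; `parse_message_type` is a hypothetical helper, not something the tutorial defines.

```python
from enum import Enum
from typing import Optional


class ACPMessageType(Enum):
    """Same members as the tutorial's enumeration."""
    REQUEST = "request"
    RESPONSE = "response"
    INFORM = "inform"
    QUERY = "query"
    SUBSCRIBE = "subscribe"
    UNSUBSCRIBE = "unsubscribe"
    ERROR = "error"
    ACK = "acknowledge"


def parse_message_type(raw: str) -> Optional[ACPMessageType]:
    """Map a wire string to an enum member, or None if it is unknown."""
    try:
        # Enum lookup by value raises ValueError for unrecognized strings.
        return ACPMessageType(raw.strip().lower())
    except ValueError:
        return None
```

Returning None for unknown labels lets the caller decide whether to drop the message or answer with an error.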

class ACPPerformative(Enum):
    """ACP speech acts (performatives)"""
    TELL = "tell"
    ASK = "ask"
    REPLY = "reply"
    REQUEST_ACTION = "request-action"
    AGREE = "agree"
    REFUSE = "refuse"
    PROPOSE = "propose"
    ACCEPT = "accept"
    REJECT = "reject"

The ACPPerformative enumeration captures the speech acts agents can use when interacting under the ACP framework. It maps high-level intentions, such as stating a fact, posing a question, requesting an action, or negotiating through proposals, onto standardized labels. Because each message carries one of these labels, a receiving agent can decide how to respond from the performative alone, before inspecting the content.

@dataclass
class ACPMessage:
    """Agent Communication Protocol Message Structure"""
    message_id: str
    sender: str
    receiver: str
    performative: str
    content: Dict[str, Any]
    protocol: str = "ACP-1.0"
    conversation_id: Optional[str] = None
    reply_to: Optional[str] = None
    language: str = "english"
    encoding: str = "json"
    timestamp: Optional[float] = None

    def __post_init__(self):
        # Fill in values the caller did not supply.
        if self.timestamp is None:
            self.timestamp = time.time()
        if self.conversation_id is None:
            self.conversation_id = str(uuid.uuid4())

    def to_acp_format(self) -> str:
        """Convert to standard ACP message format"""
        acp_msg = {
            "message-id": self.message_id,
            "sender": self.sender,
            "receiver": self.receiver,
            "performative": self.performative,
            "content": self.content,
            "protocol": self.protocol,
            "conversation-id": self.conversation_id,
            "reply-to": self.reply_to,
            "language": self.language,
            "encoding": self.encoding,
            "timestamp": self.timestamp
        }
        return json.dumps(acp_msg, indent=2)

    @classmethod
    def from_acp_format(cls, acp_string: str) -> 'ACPMessage':
        """Parse ACP message from string format"""
        data = json.loads(acp_string)
        return cls(
            message_id=data["message-id"],
            sender=data["sender"],
            receiver=data["receiver"],
            performative=data["performative"],
            content=data["content"],
            protocol=data.get("protocol", "ACP-1.0"),
            conversation_id=data.get("conversation-id"),
            reply_to=data.get("reply-to"),
            language=data.get("language", "english"),
            encoding=data.get("encoding", "json"),
            timestamp=data.get("timestamp", time.time())
        )

The ACPMessage data class encapsulates all the fields required for a structured ACP exchange, including identifiers, participants, performative, payload, and metadata such as protocol version, language, and timestamps. Its __post_init__ method auto-populates missing timestamp and conversation_id values, so every message carries a creation time and belongs to a conversation. Utility methods to_acp_format and from_acp_format handle serialization to and from the JSON representation, in which field names use hyphens (for example, message-id) instead of underscores.
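The round trip through JSON can be checked with a trimmed stand-in for the data class. `MiniACPMessage` below is a hypothetical reduction that keeps only seven fields and uses `default_factory` where the tutorial uses `__post_init__`; it is a sketch of the serialization idea, not the tutorial's class.

```python
import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class MiniACPMessage:
    """Trimmed, hypothetical stand-in for ACPMessage."""
    message_id: str
    sender: str
    receiver: str
    performative: str
    content: Dict[str, Any]
    conversation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)

    def to_acp_format(self) -> str:
        # Wire format uses hyphenated keys, as in the tutorial.
        return json.dumps({
            "message-id": self.message_id,
            "sender": self.sender,
            "receiver": self.receiver,
            "performative": self.performative,
            "content": self.content,
            "conversation-id": self.conversation_id,
            "timestamp": self.timestamp,
        }, indent=2)

    @classmethod
    def from_acp_format(cls, acp_string: str) -> "MiniACPMessage":
        data = json.loads(acp_string)
        return cls(
            message_id=data["message-id"],
            sender=data["sender"],
            receiver=data["receiver"],
            performative=data["performative"],
            content=data["content"],
            conversation_id=data["conversation-id"],
            timestamp=data["timestamp"],
        )


original = MiniACPMessage("m-1", "agent-001", "agent-002", "ask",
                          {"question": "Is the broker up?"})
restored = MiniACPMessage.from_acp_format(original.to_acp_format())
```

Data classes compare field by field, so `restored == original` is a quick way to confirm that nothing is lost in transit.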

class ACPAgent:
    """Agent implementing Agent Communication Protocol"""

    def __init__(self, agent_id: str, name: str, capabilities: List[str]):
        self.agent_id = agent_id
        self.name = name
        self.capabilities = capabilities
        self.model = genai.GenerativeModel("gemini-1.5-flash")
        self.message_queue: List[ACPMessage] = []
        self.subscriptions: Dict[str, List[str]] = {}
        self.conversations: Dict[str, List[ACPMessage]] = {}

    def create_message(self, receiver: str, performative: str,
                       content: Dict[str, Any], conversation_id: Optional[str] = None,
                       reply_to: Optional[str] = None) -> ACPMessage:
        """Create a new ACP-compliant message"""
        return ACPMessage(
            message_id=str(uuid.uuid4()),
            sender=self.agent_id,
            receiver=receiver,
            performative=performative,
            content=content,
            conversation_id=conversation_id,
            reply_to=reply_to
        )

    def send_inform(self, receiver: str, fact: str, data: Any = None) -> ACPMessage:
        """Send an INFORM message (telling someone a fact)"""
        content = {"fact": fact, "data": data}
        return self.create_message(receiver, ACPPerformative.TELL.value, content)

    def send_query(self, receiver: str, question: str, query_type: str = "yes-no") -> ACPMessage:
        """Send a QUERY message (asking for information)"""
        content = {"question": question, "query-type": query_type}
        return self.create_message(receiver, ACPPerformative.ASK.value, content)

    def send_request(self, receiver: str, action: str,
                     parameters: Optional[Dict] = None) -> ACPMessage:
        """Send a REQUEST message (asking someone to perform an action)"""
        content = {"action": action, "parameters": parameters or {}}
        return self.create_message(receiver, ACPPerformative.REQUEST_ACTION.value, content)

    def send_reply(self, original_msg: ACPMessage, response_data: Any) -> ACPMessage:
        """Send a REPLY message in response to another message"""
        content = {"response": response_data, "original-question": original_msg.content}
        return self.create_message(
            original_msg.sender,
            ACPPerformative.REPLY.value,
            content,
            conversation_id=original_msg.conversation_id,
            reply_to=original_msg.message_id
        )

    def process_message(self, message: ACPMessage) -> Optional[ACPMessage]:
        """Process incoming ACP message and generate appropriate response"""
        self.message_queue.append(message)

        # Record the message under its conversation thread.
        conv_id = message.conversation_id
        if conv_id not in self.conversations:
            self.conversations[conv_id] = []
        self.conversations[conv_id].append(message)

        # Dispatch on the performative; anything else gets no response.
        if message.performative == ACPPerformative.ASK.value:
            return self._handle_query(message)
        elif message.performative == ACPPerformative.REQUEST_ACTION.value:
            return self._handle_request(message)
        elif message.performative == ACPPerformative.TELL.value:
            return self._handle_inform(message)

        return None

    def _handle_query(self, message: ACPMessage) -> ACPMessage:
        """Handle incoming query messages"""
        question = message.content.get("question", "")

        prompt = f"As agent {self.name} with capabilities {self.capabilities}, answer: {question}"
        try:
            response = self.model.generate_content(prompt)
            answer = response.text.strip()
        except Exception:
            # Fall back to a fixed answer if the Gemini call fails.
            answer = "Unable to process query at this time"

        return self.send_reply(message, {"answer": answer, "confidence": 0.8})

    def _handle_request(self, message: ACPMessage) -> ACPMessage:
        """Handle incoming action requests"""
        action = message.content.get("action", "")
        parameters = message.content.get("parameters", {})

        # Substring match: a capability must appear inside the action name.
        if any(capability in action.lower() for capability in self.capabilities):
            result = f"Executing {action} with parameters {parameters}"
            status = "agreed"
        else:
            result = f"Cannot perform {action} - not in my capabilities"
            status = "refused"

        return self.send_reply(message, {"status": status, "result": result})

    def _handle_inform(self, message: ACPMessage) -> Optional[ACPMessage]:
        """Handle incoming information messages"""
        fact = message.content.get("fact", "")
        print(f"[{self.name}] Received information: {fact}")

        ack_content = {"status": "received", "fact": fact}
        return self.create_message(message.sender, ACPMessageType.ACK.value, ack_content,
                                   conversation_id=message.conversation_id)

The ACPAgent class encapsulates an autonomous entity capable of sending, receiving, and processing ACP-compliant messages using Gemini’s language model. It keeps its own message queue and conversation history, declares a subscriptions dictionary that the code shown here does not yet use, and provides helper methods (send_inform, send_query, send_request, send_reply) to construct correctly formatted ACPMessage instances. Incoming messages are routed through process_message, which delegates to specialized handlers for queries, action requests, and informational messages and returns None for any other performative.
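The dispatch logic can be exercised without calling Gemini. The sketch below is a hypothetical `DispatchingAgent` that works on plain dictionaries, swaps the if/elif chain for a handler table, and returns a canned answer where the tutorial calls the model. It keeps the tutorial's substring capability check, which is worth seeing in action.

```python
from typing import Callable, Dict, List, Optional

Message = Dict[str, object]


class DispatchingAgent:
    """Hypothetical, LLM-free agent that routes messages via a handler table."""

    def __init__(self, name: str, capabilities: List[str]):
        self.name = name
        self.capabilities = capabilities
        self.handlers: Dict[str, Callable[[Message], Optional[Message]]] = {
            "ask": self._handle_query,
            "request-action": self._handle_request,
            "tell": self._handle_inform,
        }

    def process_message(self, message: Message) -> Optional[Message]:
        # Unknown performatives get no response, as in the tutorial.
        handler = self.handlers.get(str(message["performative"]))
        return handler(message) if handler else None

    def _handle_query(self, message: Message) -> Message:
        # Canned answer stands in for the Gemini call.
        question = message["content"]["question"]
        return {"performative": "reply",
                "content": {"answer": f"{self.name} received: {question}"}}

    def _handle_request(self, message: Message) -> Message:
        action = message["content"]["action"]
        # Same rule as the tutorial: a capability must be a substring of the action.
        can_do = any(cap in action.lower() for cap in self.capabilities)
        return {"performative": "reply",
                "content": {"status": "agreed" if can_do else "refused"}}

    def _handle_inform(self, message: Message) -> Message:
        return {"performative": "acknowledge", "content": {"status": "received"}}


bot = DispatchingAgent("MathBot", ["calculation"])
refused = bot.process_message(
    {"performative": "request-action", "content": {"action": "calculate"}})
agreed = bot.process_message(
    {"performative": "request-action", "content": {"action": "run calculation"}})
ignored = bot.process_message({"performative": "propose", "content": {}})
```

The first request is refused because "calculation" is not a substring of "calculate", while "run calculation" matches. Action names therefore have to contain a capability string verbatim for a request to be accepted.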

class ACPMessageBroker:
    """Message broker implementing ACP routing and delivery"""

    def __init__(self):
        self.agents: Dict[str, ACPAgent] = {}
        self.message_log: List[ACPMessage] = []
        self.routing_table: Dict[str, str] = {}

    def register_agent(self, agent: ACPAgent):
        """Register an agent with the message broker"""
        self.agents[agent.agent_id] = agent
        self.routing_table[agent.agent_id] = "local"
        print(f"✓ Registered agent: {agent.name} ({agent.agent_id})")

    def route_message(self, message: ACPMessage) -> bool:
        """Route ACP message to appropriate recipient"""
        if message.receiver not in self.agents:
            print(f"✗ Receiver {message.receiver} not found")
            return False

        print("\n ACP MESSAGE ROUTING:")
        print(f"From: {message.sender} → To: {message.receiver}")
        print(f"Performative: {message.performative}")
        print(f"Content: {json.dumps(message.content, indent=2)}")

        receiver_agent = self.agents[message.receiver]
        response = receiver_agent.process_message(message)

        self.message_log.append(message)

        if response:
            print("\n GENERATED RESPONSE:")
            print(f"From: {response.sender} → To: {response.receiver}")
            print(f"Content: {json.dumps(response.content, indent=2)}")

            # Deliver the response to the original sender and log it.
            if response.receiver in self.agents:
                self.agents[response.receiver].process_message(response)
                self.message_log.append(response)

        return True

    def broadcast_message(self, message: ACPMessage, recipients: List[str]):
        """Broadcast message to multiple recipients"""
        for recipient in recipients:
            # Each recipient gets its own copy with a fresh message id.
            msg_copy = ACPMessage(
                message_id=str(uuid.uuid4()),
                sender=message.sender,
                receiver=recipient,
                performative=message.performative,
                content=message.content.copy(),
                conversation_id=message.conversation_id
            )
            self.route_message(msg_copy)

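The ACPMessageBroker serves as the central router for ACP messages, maintaining a registry of agents and a message log. Its register_agent method adds an agent to the registry. The route_message method looks up the receiver, delivers the message, logs it, and passes any generated response back to the original sender; it returns False when the receiver is not registered. The broadcast_message method sends a copy of one message, with a fresh message ID and the same conversation ID, to each recipient in a list.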
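The routing and broadcast behavior can be sketched without agents or an LLM. `MiniBroker` below is a hypothetical stand-in that delivers dictionary messages to plain callables; it returns the per-recipient results from `broadcast`, which the tutorial's broker does not do.

```python
import uuid
from typing import Callable, Dict, List

Message = Dict[str, object]


class MiniBroker:
    """Hypothetical, LLM-free stand-in for ACPMessageBroker."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Callable[[Message], None]] = {}
        self.message_log: List[Message] = []

    def register(self, agent_id: str, handler: Callable[[Message], None]) -> None:
        self.handlers[agent_id] = handler

    def route(self, message: Message) -> bool:
        handler = self.handlers.get(str(message["receiver"]))
        if handler is None:
            return False  # unknown receiver: nothing delivered, nothing logged
        handler(message)
        self.message_log.append(message)
        return True

    def broadcast(self, message: Message, recipients: List[str]) -> List[bool]:
        results = []
        for recipient in recipients:
            # Fresh id and copied content per recipient; conversation id is shared.
            copy = dict(message, receiver=recipient,
                        message_id=str(uuid.uuid4()),
                        content=dict(message["content"]))
            results.append(self.route(copy))
        return results


inbox: Dict[str, List[Message]] = {"agent-001": [], "agent-002": []}
broker = MiniBroker()
for agent_id, messages in inbox.items():
    broker.register(agent_id, messages.append)

announcement = {"message_id": "m-0", "sender": "agent-003", "receiver": "",
                "performative": "tell", "conversation_id": "conv-1",
                "content": {"fact": "Results are ready"}}
delivered = broker.broadcast(announcement, ["agent-001", "agent-002", "agent-404"])
```

Here `agent-404` is deliberately unregistered, so the third delivery fails while the first two succeed and share one conversation ID.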

def demonstrate_acp():
    """Comprehensive demonstration of Agent Communication Protocol"""

    print(" AGENT COMMUNICATION PROTOCOL (ACP) DEMONSTRATION")
    print("=" * 60)

    broker = ACPMessageBroker()

    researcher = ACPAgent("agent-001", "Dr. Research", ["analysis", "research", "data-processing"])
    assistant = ACPAgent("agent-002", "AI Assistant", ["information", "scheduling", "communication"])
    calculator = ACPAgent("agent-003", "MathBot", ["calculation", "mathematics", "computation"])

    broker.register_agent(researcher)
    broker.register_agent(assistant)
    broker.register_agent(calculator)

    print("\n REGISTERED AGENTS:")
    for agent_id, agent in broker.agents.items():
        print(f"  • {agent.name} ({agent_id}): {', '.join(agent.capabilities)}")

    print("\n SCENARIO 1: Information Query (ASK performative)")
    query_msg = assistant.send_query("agent-001", "What are the key factors in AI research?")
    broker.route_message(query_msg)

    print("\n SCENARIO 2: Action Request (REQUEST-ACTION performative)")
    calc_request = researcher.send_request("agent-003", "calculate", {"expression": "sqrt(144) + 10"})
    broker.route_message(calc_request)

    print("\n SCENARIO 3: Information Sharing (TELL performative)")
    info_msg = researcher.send_inform("agent-002", "New research paper published on quantum computing")
    broker.route_message(info_msg)

    print("\n PROTOCOL STATISTICS:")
    print(f"  • Total messages processed: {len(broker.message_log)}")
    print(f"  • Active conversations: {len(set(msg.conversation_id for msg in broker.message_log))}")
    print(f"  • Message types used: {len(set(msg.performative for msg in broker.message_log))}")

    # Build one more message (not routed) to show the serialized format.
    print("\n SAMPLE ACP MESSAGE FORMAT:")
    sample_msg = assistant.send_query("agent-001", "Sample question for format demonstration")
    print(sample_msg.to_acp_format())

The demonstrate_acp function walks through the whole framework: it initializes a broker and three agents (Dr. Research, AI Assistant, and MathBot), registers them, and runs three interaction scenarios: querying for information, requesting a computation, and sharing an update. Note that _handle_request matches capabilities by substring, so the action name "calculate" does not contain any of MathBot's capability strings and the reply in Scenario 2 carries the status "refused". After routing each message and handling the responses, the function prints summary statistics on the message flow, then prints a sample message in ACP format so readers can see the JSON structure that agents exchange.
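The three statistics are simple aggregations over the broker's log. The sketch below recomputes them from a hand-written list of (conversation ID, performative) pairs that mirrors what the three scenarios should log, assuming each request and its response are both recorded; the conversation IDs are shortened placeholders, not real UUIDs.

```python
from collections import Counter
from typing import List, Tuple

# (conversation_id, performative) pairs in the order the demo would log them.
# The ids are placeholders chosen for readability.
message_log: List[Tuple[str, str]] = [
    ("conv-1", "ask"), ("conv-1", "reply"),
    ("conv-2", "request-action"), ("conv-2", "reply"),
    ("conv-3", "tell"), ("conv-3", "acknowledge"),
]

total_messages = len(message_log)
active_conversations = len({conv for conv, _ in message_log})
performative_counts = Counter(perf for _, perf in message_log)
distinct_performatives = len(performative_counts)
```

Under these assumptions the demo reports six messages, three conversations, and five distinct performatives, since "reply" appears twice. The line labeled "Message types used" therefore counts performatives, not ACPMessageType members.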

def setup_guide():
    print("""
    GOOGLE COLAB SETUP GUIDE:

    1. Get Gemini API Key: https://makersuite.google.com/app/apikey
    2. Replace: GEMINI_API_KEY = "YOUR_ACTUAL_API_KEY"
    3. Run: demonstrate_acp()

    ACP PROTOCOL FEATURES:

    • Standardized message format with required fields
    • Speech act performatives (TELL, ASK, REQUEST-ACTION, etc.)
    • Conversation tracking and message threading
    • Error handling and acknowledgments
    • Message routing and delivery confirmation

    EXTEND THE PROTOCOL:
    ```python
    # Create custom agent
    my_agent = ACPAgent("my-001", "CustomBot", ["custom-capability"])
    broker.register_agent(my_agent)

    # Send custom message
    msg = my_agent.send_query("agent-001", "Your question here")
    broker.route_message(msg)
    ```
    """)


if __name__ == "__main__":
    setup_guide()
    demonstrate_acp()

Finally, the setup_guide function prints a quick-start reference for running the ACP demo in Google Colab: how to obtain and configure a Gemini API key and how to invoke demonstrate_acp. It also summarizes the key protocol features, such as the standardized message format, performatives, and message routing, and includes a short snippet showing how to register a custom agent and send a query from it.

In conclusion, this tutorial implements an ACP-based multi-agent system with agents for research, computation, and collaboration tasks. The sample scenarios illustrate three common use cases: information queries, computational requests, and fact sharing. The broker delivers each message to its registered receiver, reports unknown receivers, and logs what it routes. Readers are encouraged to extend the framework by adding new agent capabilities, integrating domain-specific actions, or incorporating more sophisticated subscription and notification mechanisms.
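As one possible starting point for the subscription extension, the sketch below keeps a topic-to-subscribers mapping shaped like the agent's subscriptions dictionary (Dict[str, List[str]]). The class name `SubscriptionRegistry` and its methods are hypothetical; the tutorial does not define how subscriptions should work.

```python
from typing import Dict, List


class SubscriptionRegistry:
    """Hypothetical topic registry; not part of the tutorial's code."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[str]] = {}

    def subscribe(self, topic: str, agent_id: str) -> None:
        subscribers = self._subscribers.setdefault(topic, [])
        if agent_id not in subscribers:  # ignore duplicate subscriptions
            subscribers.append(agent_id)

    def unsubscribe(self, topic: str, agent_id: str) -> None:
        subscribers = self._subscribers.get(topic, [])
        if agent_id in subscribers:
            subscribers.remove(agent_id)

    def recipients(self, topic: str) -> List[str]:
        # Return a copy so callers cannot mutate the registry by accident.
        return list(self._subscribers.get(topic, []))


registry = SubscriptionRegistry()
registry.subscribe("papers", "agent-001")
registry.subscribe("papers", "agent-002")
registry.subscribe("papers", "agent-001")  # duplicate, has no effect
registry.unsubscribe("papers", "agent-002")
```

The list returned by `recipients` could then be passed as the recipients argument of the broker's broadcast_message method to notify every subscriber of a topic.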



The post A Coding Guide to Building a Scalable Multi-Agent Communication Systems Using Agent Communication Protocol (ACP) appeared first on MarkTechPost.
