MarkTechPost@AI, 10 hours ago
How to Build an Asynchronous AI Agent Network Using Gemini for Research, Analysis, and Validation Tasks

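This article introduces the Gemini Agent Network Protocol, a framework for intelligent collaboration among specialized AI agents. Built on Google's Gemini models, the protocol supports dynamic communication between agents in four roles: Analyzer, Researcher, Synthesizer, and Validator. Readers learn how to set up and configure an asynchronous agent network that distributes tasks automatically, solves problems collaboratively, and manages dialogue. The framework suits in-depth research, complex data analysis, and information validation, helping users make efficient use of collective AI intelligence.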

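💡 The protocol is built on Gemini models and forms an asynchronous network of four agent types: Analyzer, Researcher, Synthesizer, and Validator. Each has a specific role: the Analyzer breaks problems down, the Researcher gathers information, the Synthesizer integrates it, and the Validator checks its accuracy.

🤖 Agents communicate through messages that carry a sender, a receiver, content, a message type, and metadata. Each agent has an inbox for incoming messages and uses the Gemini model to process them and generate responses. Agents can send direct messages, broadcast to the network, and keep a context memory to support more informed interactions.

⚙️ The AgentNetwork class coordinates agents and their communication: it adds agents, routes messages, and starts collaborative tasks. Users add agents of different types and launch a task, which is sent first to an Analyzer agent and then handled collaboratively by the rest of the network.

⏱️ The system uses asyncio for concurrent execution, so agents interact asynchronously. Given a run duration, the network runs on its own and stops when the time is up. The article also demonstrates building a network and running an analysis task on the impact of quantum computing on cybersecurity.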

In this tutorial, we introduce the Gemini Agent Network Protocol, a powerful and flexible framework designed to enable intelligent collaboration among specialized AI agents. Leveraging Google’s Gemini models, the protocol facilitates dynamic communication between agents, each equipped with distinct roles: Analyzer, Researcher, Synthesizer, and Validator. Users will learn to set up and configure an asynchronous agent network, enabling automated task distribution, collaborative problem-solving, and enriched dialogue management. Ideal for scenarios such as in-depth research, complex data analysis, and information validation, this framework empowers users to harness collective AI intelligence efficiently.

import asyncio
import json
import random
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Any
from enum import Enum
import google.generativeai as genai

These imports set up asyncio for concurrent execution, dataclasses for structured messages, and Google's Generative AI SDK (google.generativeai) for the model calls that drive each agent. The remaining imports support the rest of the code: json serializes context memory into prompts, random triggers occasional follow-up messages, Enum defines the agent roles, and typing supplies the annotations.
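To make the concurrency claim concrete, here is a minimal sketch of how blocking calls can overlap under asyncio. The function slow_model_call is a hypothetical stand-in for a blocking SDK call; it is not part of the tutorial, and the 0.2-second delay is an arbitrary assumption.

```python
import asyncio
import time


def slow_model_call(prompt: str) -> str:
    """Hypothetical stand-in for a blocking SDK call (not the real Gemini API)."""
    time.sleep(0.2)  # simulate network latency
    return f"echo: {prompt}"


async def ask_all(prompts):
    # Each blocking call runs in a worker thread, so the event loop stays free
    # and the calls overlap instead of running back to back.
    calls = [asyncio.to_thread(slow_model_call, p) for p in prompts]
    return await asyncio.gather(*calls)  # results keep the input order


def timed_demo():
    start = time.perf_counter()
    results = asyncio.run(ask_all(["a", "b", "c"]))
    return results, time.perf_counter() - start
```

Calling timed_demo() should take roughly as long as one call rather than three, because the waits happen in parallel threads. asyncio.to_thread requires Python 3.9 or later.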

API_KEY = None

try:
    import google.colab
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

We initialize the API_KEY and detect whether the code is running in a Colab environment. If the google.colab module is successfully imported, the IN_COLAB flag is set to True; otherwise, it defaults to False, allowing the script to adjust behavior accordingly.
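The same detection can be written as a reusable check that does not import the target module. This is a sketch of an alternative, not the tutorial's approach; module_available is a hypothetical helper name.

```python
import importlib.util


def module_available(name: str) -> bool:
    """Return True if `name` can be imported in the current environment."""
    try:
        return importlib.util.find_spec(name) is not None
    except (ImportError, ValueError):
        # A missing parent package (for example, no `google` package at all)
        # raises ModuleNotFoundError; treat that as "not available".
        return False


# Same flag as in the tutorial, computed without importing google.colab itself.
IN_COLAB = module_available("google.colab")
```

For a dotted name, find_spec imports the parent package to locate the submodule, which is why the missing-parent case needs the except clause.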

class AgentType(Enum):
    ANALYZER = "analyzer"
    RESEARCHER = "researcher"
    SYNTHESIZER = "synthesizer"
    VALIDATOR = "validator"


@dataclass
class Message:
    sender: str
    receiver: str
    content: str
    msg_type: str
    metadata: Optional[Dict] = None  # optional, so the annotation allows None

Check out the Notebook

We define the core structures for agent interaction. The AgentType enum categorizes agents into four distinct roles, Analyzer, Researcher, Synthesizer, and Validator, each with a specific function in the collaborative network. The Message dataclass represents the format for inter-agent communication, encapsulating sender and receiver IDs, message content, type, and optional metadata.
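The sketch below shows how these two structures behave in practice. It makes one change that is an assumption rather than the tutorial's code: metadata uses field(default_factory=dict), so each message starts with its own empty dict instead of None.

```python
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Any, Dict


class AgentType(Enum):
    ANALYZER = "analyzer"
    RESEARCHER = "researcher"
    SYNTHESIZER = "synthesizer"
    VALIDATOR = "validator"


@dataclass
class Message:
    sender: str
    receiver: str
    content: str
    msg_type: str
    # Variation on the tutorial's `metadata: Dict = None`: default_factory
    # gives every message its own dict, so callers never need a None check.
    metadata: Dict[str, Any] = field(default_factory=dict)


msg = Message("system", "deep_analyzer", "Analyze X", "task")
record = asdict(msg)          # plain dict, suitable for a message log or json.dumps
role = AgentType("analyzer")  # look up a role by its string value
```

asdict is what the network later uses to store messages in its log, which is why the log entries can be indexed by keys such as 'sender'.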

class GeminiAgent:
    def __init__(self, agent_id: str, agent_type: AgentType, network: 'AgentNetwork'):
        self.id = agent_id
        self.type = agent_type
        self.network = network
        self.model = genai.GenerativeModel('gemini-2.0-flash')
        self.inbox = asyncio.Queue()
        self.context_memory = []

        self.system_prompts = {
            AgentType.ANALYZER: "You are a data analyzer. Break down complex problems into components and identify key patterns.",
            AgentType.RESEARCHER: "You are a researcher. Gather information and provide detailed context on topics.",
            AgentType.SYNTHESIZER: "You are a synthesizer. Combine information from multiple sources into coherent insights.",
            AgentType.VALIDATOR: "You are a validator. Check accuracy and consistency of information and conclusions."
        }

    async def process_message(self, message: Message):
        """Process incoming message and generate response"""
        if not API_KEY:
            return " API key not configured. Please set API_KEY variable."

        prompt = f"""
        {self.system_prompts[self.type]}

        Context from previous interactions: {json.dumps(self.context_memory[-3:], indent=2)}

        Message from {message.sender}: {message.content}

        Provide a focused response (max 100 words) that adds value to the network discussion.
        """

        try:
            response = await asyncio.to_thread(
                self.model.generate_content, prompt
            )
            return response.text.strip()
        except Exception as e:
            return f"Error processing: {str(e)}"

    async def send_message(self, receiver_id: str, content: str, msg_type: str = "task"):
        """Send message to another agent"""
        message = Message(self.id, receiver_id, content, msg_type)
        await self.network.route_message(message)

    async def broadcast(self, content: str, exclude_self: bool = True):
        """Broadcast message to all agents in network"""
        for agent_id in self.network.agents:
            if exclude_self and agent_id == self.id:
                continue
            await self.send_message(agent_id, content, "broadcast")

    async def run(self):
        """Main agent loop"""
        while True:
            try:
                message = await asyncio.wait_for(self.inbox.get(), timeout=1.0)

                response = await self.process_message(message)

                self.context_memory.append({
                    "from": message.sender,
                    "content": message.content,
                    "my_response": response
                })

                if len(self.context_memory) > 10:
                    self.context_memory = self.context_memory[-10:]

                print(f" {self.id} ({self.type.value}): {response}")

                if random.random() < 0.3:
                    other_agents = [aid for aid in self.network.agents.keys() if aid != self.id]
                    if other_agents:
                        target = random.choice(other_agents)
                        await self.send_message(target, f"Building on that: {response[:50]}...")

            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f" Error in {self.id}: {e}")


The GeminiAgent class defines each agent's behavior. On initialization it stores a unique ID, a role type, and a reference to the agent network, and it loads the Gemini 2.0 Flash model. Incoming messages arrive on an asyncio queue (the inbox) and are answered using a role-specific system prompt. Each agent keeps a context memory of its last 10 interactions and includes the 3 most recent in every prompt. Agents can send targeted messages or broadcast to all other agents. The run() method polls the inbox with a one-second timeout so the loop never blocks indefinitely, prints each response, and with 30% probability forwards the first 50 characters of that response to a randomly chosen peer, which keeps the conversation moving.
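The loop structure described above can be tried without an API key. The sketch below is a reduced, hypothetical EchoAgent that keeps the inbox, the one-second polling timeout, and the 10-entry memory, but replaces the Gemini call with a string echo. It also swaps the manual list slicing for collections.deque(maxlen=...), which is an assumption of this sketch rather than the tutorial's code.

```python
import asyncio
from collections import deque


class EchoAgent:
    """Hypothetical stand-in for GeminiAgent: same loop shape, no API calls."""

    def __init__(self, agent_id: str, memory_size: int = 10):
        self.id = agent_id
        self.inbox: asyncio.Queue = asyncio.Queue()
        # deque(maxlen=...) drops the oldest entry automatically once full.
        self.context_memory = deque(maxlen=memory_size)

    async def process_message(self, text: str) -> str:
        return f"{self.id} saw: {text}"  # a real agent would call the model here

    async def run(self):
        while True:
            try:
                # Wake up at least once per second even when the inbox is empty.
                text = await asyncio.wait_for(self.inbox.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue
            response = await self.process_message(text)
            self.context_memory.append({"content": text, "my_response": response})


async def demo(n_messages: int = 12):
    agent = EchoAgent("echo_1")
    task = asyncio.create_task(agent.run())
    for i in range(n_messages):
        await agent.inbox.put(f"msg {i}")
    await asyncio.sleep(0.1)  # give the loop time to drain the inbox
    task.cancel()             # stop the otherwise endless loop
    try:
        await task
    except asyncio.CancelledError:
        pass
    return list(agent.context_memory)
```

Running asyncio.run(demo(12)) sends twelve messages and returns only the ten most recent memory entries, since the two oldest are evicted.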

class AgentNetwork:
    def __init__(self):
        self.agents: Dict[str, GeminiAgent] = {}
        self.message_log = []
        self.running = False

    def add_agent(self, agent_type: AgentType, agent_id: Optional[str] = None):
        """Add new agent to network"""
        if not agent_id:
            agent_id = f"{agent_type.value}_{len(self.agents)+1}"

        agent = GeminiAgent(agent_id, agent_type, self)
        self.agents[agent_id] = agent
        print(f" Added {agent_id} to network")
        return agent_id

    async def route_message(self, message: Message):
        """Route message to target agent"""
        self.message_log.append(asdict(message))

        if message.receiver in self.agents:
            await self.agents[message.receiver].inbox.put(message)
        else:
            print(f"  Agent {message.receiver} not found")

    async def initiate_task(self, task: str):
        """Start a collaborative task"""
        print(f" Starting task: {task}")

        analyzer_agents = [aid for aid, agent in self.agents.items()
                          if agent.type == AgentType.ANALYZER]

        if analyzer_agents:
            initial_message = Message("system", analyzer_agents[0], task, "task")
            await self.route_message(initial_message)

    async def run_network(self, duration: int = 30):
        """Run the agent network for specified duration"""
        self.running = True
        print(f" Starting agent network for {duration} seconds...")

        agent_tasks = [agent.run() for agent in self.agents.values()]

        try:
            await asyncio.wait_for(asyncio.gather(*agent_tasks), timeout=duration)
        except asyncio.TimeoutError:
            print(" Network session completed")
        finally:
            self.running = False


The AgentNetwork class coordinates communication among all agents. It adds agents with a given role and an ID (auto-generated from the role name when none is supplied), logs every message, and routes each message to its recipient's inbox, printing a warning when the recipient is unknown. initiate_task() starts a collaborative task by sending it to the first Analyzer agent, and run_network() runs every agent loop concurrently for a fixed duration (30 seconds by default), after which the loops are cancelled.
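Routing and the timed run can be isolated in a few lines. The following is a hypothetical MiniNetwork, reduced to plain queues and string payloads so it runs without Gemini. The names MiniNetwork, _worker, and handled are illustrative and do not appear in the tutorial.

```python
import asyncio
from typing import Dict, List


class MiniNetwork:
    """Hypothetical, API-free reduction of AgentNetwork: routing plus a timed run."""

    def __init__(self):
        self.inboxes: Dict[str, asyncio.Queue] = {}
        self.message_log: List[dict] = []
        self.handled: List[str] = []

    def add_agent(self, agent_id: str) -> None:
        self.inboxes[agent_id] = asyncio.Queue()

    async def route_message(self, sender: str, receiver: str, content: str) -> bool:
        # Log every message, then deliver it only if the receiver exists.
        self.message_log.append({"sender": sender, "receiver": receiver, "content": content})
        if receiver not in self.inboxes:
            return False
        await self.inboxes[receiver].put(content)
        return True

    async def _worker(self, agent_id: str):
        while True:  # runs until cancelled by the timeout below
            content = await self.inboxes[agent_id].get()
            self.handled.append(f"{agent_id}:{content}")

    async def run_network(self, duration: float):
        workers = [self._worker(aid) for aid in self.inboxes]
        try:
            # wait_for cancels the gathered workers once `duration` elapses.
            await asyncio.wait_for(asyncio.gather(*workers), timeout=duration)
        except asyncio.TimeoutError:
            pass


async def demo():
    net = MiniNetwork()
    net.add_agent("deep_analyzer")
    runner = asyncio.create_task(net.run_network(0.3))
    delivered = await net.route_message("system", "deep_analyzer", "task A")
    missing = await net.route_message("system", "nobody", "task B")
    await runner
    return net, delivered, missing
```

Both messages are logged, but only the one addressed to a registered agent reaches an inbox. As in the tutorial, the timeout is what ends the otherwise endless worker loops.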

async def demo_agent_network():
    """Demonstrate the Gemini Agent Network Protocol"""
    network = AgentNetwork()

    network.add_agent(AgentType.ANALYZER, "deep_analyzer")
    network.add_agent(AgentType.RESEARCHER, "info_gatherer")
    network.add_agent(AgentType.SYNTHESIZER, "insight_maker")
    network.add_agent(AgentType.VALIDATOR, "fact_checker")

    task = "Analyze the potential impact of quantum computing on cybersecurity"

    # Start the agent loops first, then submit the task once they are running.
    network_task = asyncio.create_task(network.run_network(20))
    await asyncio.sleep(1)
    await network.initiate_task(task)
    await network_task

    print(f"\n Network completed with {len(network.message_log)} messages exchanged")
    agent_participation = {aid: sum(1 for msg in network.message_log if msg['sender'] == aid)
                          for aid in network.agents}
    print("Agent participation:", agent_participation)


def setup_api_key():
    """Interactive API key setup"""
    global API_KEY

    if IN_COLAB:
        from google.colab import userdata
        try:
            API_KEY = userdata.get('GEMINI_API_KEY')
            genai.configure(api_key=API_KEY)
            print(" API key loaded from Colab secrets")
            return True
        except Exception:  # secret missing or not accessible; fall back to a prompt
            print(" To use Colab secrets: Add 'GEMINI_API_KEY' in the secrets panel")

    print(" Please enter your Gemini API key:")
    print("   Get it from: https://makersuite.google.com/app/apikey")

    try:
        if IN_COLAB:
            API_KEY = input("Paste your API key here: ").strip()
        else:
            import getpass
            API_KEY = getpass.getpass("Paste your API key here: ").strip()

        if API_KEY and len(API_KEY) > 10:
            genai.configure(api_key=API_KEY)
            print(" API key configured successfully!")
            return True
        else:
            print(" Invalid API key")
            return False
    except KeyboardInterrupt:
        print("\n Setup cancelled")
        return False


The demo_agent_network() function orchestrates the workflow: it creates a network, adds four role-specific agents, starts the network for 20 seconds, submits the quantum-computing cybersecurity task, and then reports the number of messages exchanged and how many each agent sent. setup_api_key() configures the Gemini API key interactively. In Colab it first tries the GEMINI_API_KEY secret and falls back to a prompt; elsewhere it reads the key with getpass so the key is not echoed to the terminal. The key must be configured before the demo begins, since the agents cannot call the Gemini model without it.
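The participation tally can also be computed in a single pass over the log. The sketch below uses collections.Counter on a hand-written log. The entries are hypothetical and only mirror the shape that asdict(Message(...)) produces; they are not output from a real run.

```python
from collections import Counter

# Hypothetical log entries in the shape produced by asdict(Message(...)).
message_log = [
    {"sender": "system", "receiver": "deep_analyzer", "content": "task", "msg_type": "task"},
    {"sender": "deep_analyzer", "receiver": "info_gatherer", "content": "...", "msg_type": "task"},
    {"sender": "info_gatherer", "receiver": "fact_checker", "content": "...", "msg_type": "task"},
    {"sender": "deep_analyzer", "receiver": "insight_maker", "content": "...", "msg_type": "task"},
]
agent_ids = ["deep_analyzer", "info_gatherer", "insight_maker", "fact_checker"]

sent = Counter(entry["sender"] for entry in message_log)  # one pass over the log
participation = {aid: sent[aid] for aid in agent_ids}     # missing senders count as 0
```

The initial message from "system" is in the log but not in the tally, because "system" is not a registered agent. The tutorial's dictionary comprehension behaves the same way, but it rescans the log once per agent.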

if __name__ == "__main__":
    print(" Gemini Agent Network Protocol")
    print("=" * 40)

    if not setup_api_key():
        print(" Cannot run without valid API key")
        exit()

    print("\n Starting demo...")

    if IN_COLAB:
        import nest_asyncio
        nest_asyncio.apply()
        loop = asyncio.get_event_loop()
        loop.run_until_complete(demo_agent_network())
    else:
        asyncio.run(demo_agent_network())

Finally, the above code serves as the entry point for executing the Gemini Agent Network Protocol. It begins by prompting the user to set up the Gemini API key, exiting if not provided. Upon successful configuration, the demo is launched. If running in Google Colab, it applies nest_asyncio to handle Colab’s event loop restrictions; otherwise, it uses Python’s native asyncio.run() to execute the asynchronous demo of agent collaboration.
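If nest_asyncio is not available, one possible alternative is to check for a running loop and, when one exists, run the coroutine on a fresh loop in a worker thread. This is a sketch under that assumption, not the tutorial's method; run_coroutine, sample, and called_from_running_loop are hypothetical names.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def run_coroutine(coro):
    """Run `coro` to completion whether or not an event loop is already running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread (plain script): asyncio.run is enough.
        return asyncio.run(coro)
    # A loop is already running (for example in a notebook cell): run the
    # coroutine on a fresh loop in a worker thread and block until it finishes.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


async def sample():
    await asyncio.sleep(0.01)
    return "done"


async def called_from_running_loop():
    # Simulates the notebook case: a loop is already running here.
    return run_coroutine(sample())
```

The thread-based branch blocks the calling loop until the coroutine finishes. That is acceptable for a one-off demo, but unsuitable when other tasks on that loop must keep running.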

In conclusion, by completing this tutorial, users gain practical knowledge of implementing an AI-powered collaborative network using Gemini agents. The hands-on experience provided here demonstrates how autonomous agents can effectively break down complex problems, collaboratively generate insights, and ensure the accuracy of information through validation.



The post How to Build an Asynchronous AI Agent Network Using Gemini for Research, Analysis, and Validation Tasks appeared first on MarkTechPost.
