掘金 人工智能 06月07日 14:54
强大的代理通信其实是 A2A + MCP + LangChain
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文介绍如何使用A2A、MCP和LangChain搭建一个多代理聊天机器人,以满足业务或个人需求。文章详细阐述了谷歌的Agent-to-Agent协议(A2A)和模型上下文协议(MCP)在构建智能系统中的应用,并通过一个实时聊天机器人的演示展示了其运作。该机器人能够分析股票新闻,提供实时的股票价格数据、关键指标和专家财务分析。文章还提供了代码示例,展示了如何安装必要的库、导入模块、创建服务器以及处理命令行输入。最终,文章旨在帮助开发者构建协作式、工具使用型AI代理,解决复杂的任务。

🤖 A2A协议专注于代理协作,它建立了一种智能代理发现、沟通和合作的方式,使不同的AI系统能够像人类团队一样协同工作,能够处理和分析复杂的任务。

🔨 MCP协议定义了大型语言模型与各种工具、数据和资源交互的标准方式,使人工智能能够使用各种功能,就像程序员调用函数一样,从而实现更强大的功能。

💻 文章介绍了如何使用Python A2A库和Fastmcp库, 它们分别用于实现谷歌的A2A协议和处理MCP协议的细节。通过装饰函数,开发者可以轻松地创建工具,并将其集成到系统中。

📈 通过LangChain框架,该聊天机器人能够从Finviz抓取相关新闻,并将相对URL转换为绝对链接,从而获取更丰富、更准确的股票市场信息。

🛠️ 开发者在构建过程中解决了网络问题、JSON序列化错误、跨组件数据转换、错误处理以及动态工具选择等技术挑战,最终实现了一个无缝的股票分析系统。

在今天的博文中, 我将通过一个超快速教程, 向你展示如何使用A2A, MCP和LangChain创建一个多代理聊天机器人, 为你的业务或个人使用打造一个强大的代理聊天机器人.

我最近制作了一段关于Agent2Agent协议和模型上下文协议的视频. 有人询问是否可以将MCP和A2A结合使用, 我已完成这一要求. 不仅如此, 我还利用Langchain为聊天机器人提供了动力, 使代理能够分析并抓取最新的股票新闻.

随着人工智能技术的快速发展, 两个关键协议正在重塑我们构建智能系统的方式: 谷歌的Agent-to-Agent协议(A2A)和模型上下文协议(MCP). 这两个协议代表了人工智能架构发展的不同维度, 但它们共同指向一个未来: 我们正从确定性编程转向自主协作系统.

MCP(模型上下文协议)本质上是一个工具访问协议. 它定义了大型语言模型与各种工具, 数据和资源交互的标准方式. 简单来说, MCP使人工智能能够使用各种功能, 就像程序员调用函数一样.

A2A(Agent-to-Agent Protocol)专注于代理协作, 它建立了一种智能代理发现, 沟通和合作的方式, 使不同的AI系统能够像人类团队一样协同工作.

现在, 让我通过一个实时聊天机器人的演示来向你展示我的意思.

我将向聊天机器人提问: “苹果和英伟达的当前股价是多少?” 你可以随意提问任何问题.

如果你观察聊天机器人生成输出结果的过程, 会发现当我输入查询时, 元代理会分析请求并智能选择合适的工具.

首先, 它会将请求路由到StockData工具, 该工具从查询中提取股票代码“AAPL”和“NVDA”, 通过yfinance获取实时价格数据, 并返回包括当前价格, 百分比变化和关键财务指标在内的综合指标.

它可能调用FinancialNews工具从Finviz抓取相关新闻, 并通过将相对URL转换为绝对链接来正确处理相对URL. 元代理随后将这些结构化数据点与来自A2A股票市场专家代理的上下文知识相结合, 该代理提供专业的财务分析.

在开发过程中, 我遇到了并解决了几个重大的技术挑战: 网络问题与端口冲突(通过我自定义的find_available_port函数动态测试端口可用性来解决), JSON序列化错误(通过为所有非可序列化对象如时间戳和numpy类型显式实现类型转换解决), 跨组件数据转换(通过创建包装函数对组件间输入/输出进行 sanitization 解决), 错误处理(通过在每个级别实现全面的try/except块实现), 以及动态工具选择(通过LangChain的代理框架管理)

结果是一个无缝系统, 可将简单的自然语言查询转换为包含价格数据, 关键指标, 相关新闻和专家财务背景的全面股票分析.

让我们开始编码

让我们一步步探索并揭示如何创建 A2A 和 MCP 应用程序. 我们将安装支持该模型的库. 为此, 我们将执行 pip install requirements

pip install requirements

下一步是常规操作, 我们将导入相关库, 其重要性将在后续过程中逐渐显现.

我们通过导入以下类来启动代码:

Python A2A 是一个强大且生产就绪的库, 用于实现 Google 的代理到代理(A2A)协议, 并全面支持模型上下文协议(MCP). 它使开发者能够构建协作式, 工具使用型 AI 代理, 能够解决复杂任务.

Fastmcp 处理所有复杂的协议细节和服务器管理, 因此你可以专注于构建优秀的工具. 它设计为高级且符合 Python 风格——在大多数情况下, 只需装饰一个函数即可.

import osimport sysimport socketimport timeimport threadingimport argparseimport jsonfrom python_a2a import OpenAIA2AServer, run_server, A2AServer, AgentCard, AgentSkillfrom python_a2a.langchain import to_langchain_agent, to_langchain_toolfrom python_a2a.mcp import FastMCPfrom langchain_openai import ChatOpenAIfrom langchain_core.prompts import ChatPromptTemplatefrom langchain_core.output_parsers import StrOutputParserfrom langchain.agents import initialize_agent, Toolfrom langchain.agents import AgentTypeimport yfinance as yfimport pandas as pdimport json # Import required librariesimport jsonimport requestsfrom bs4 import BeautifulSoupimport refrom urllib.parse import urljoin

我创建了一个实用程序, 用于自动查找可用端口并在后台运行服务器, 这对于本地测试或 Web 应用程序等场景非常有用. 首先, 我开发了 find_available_port 函数, 从指定的端口号(默认是 5000)开始检查端口, 并尝试连续 20 个端口.

它通过尝试将套接字绑定到每个端口来实现这一点; 如果由于端口已被占用而失败, 它将转到下一个端口. 如果没有可用端口, 它将回退到一个更高的端口号以避免常见冲突.

然后, 我构建了run_server_in_thread函数, 该函数旨在使用Python的threading模块在单独的线程中启动服务器.

这使得服务器可以在后台运行, 同时主程序继续执行. 我添加了一个sleep时间, 以确保服务器有足够的时间正常启动, 然后其他代码再与之交互.

def find_available_port(start_port=5000, max_tries=20):    """Find an available port starting from start_port"""    for port in range(start_port, start_port + max_tries):        try:            # Try to create a socket on the port            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)            sock.bind(('localhost', port))            sock.close()            return port        except OSError:            # Port is already in use, try the next one            continue        # If we get here, no ports were available    print(f"⚠️  Could not find an available port in range {start_port}-{start_port + max_tries - 1}")    return start_port + 1000  # Try a port well outside the normal rangedef run_server_in_thread(server_func, server, **kwargs):    """Run a server in a background thread"""    thread = threading.Thread(target=server_func, args=(server,), kwargs=kwargs, daemon=True)    thread.start()    time.sleep(2)  # Give the server time to start    return thread

我创建了一个parse_arguments函数, 用于处理命令行输入, 以自定义脚本的运行方式, 特别是配置服务器端口和模型行为. 我使用Python的argparse模块创建了一个带有有用描述的解析器, 使用户能够轻松理解每个选项的作用.

我添加了如--a2a-port--mcp-port等参数, 允许用户手动设置两个不同服务器的端口, 尽管它们默认值为None, 因此程序可在需要时自动选择.

我还包含了如--model选项, 用于指定使用哪个OpenAI模型(默认为gpt-4o), 以及--temperature选项, 用于控制响应的创造性程度, 默认值为0.0以生成确定性输出.

def parse_arguments():    """Parse command line arguments"""    parser = argparse.ArgumentParser(description="A2A + MCP + LangChain Stock Market Integration Example")    parser.add_argument(        "--a2a-port", type=int, default=None,        help="Port to run the A2A server on (default: auto-select)"    )    parser.add_argument(        "--mcp-port", type=int, default=None,        help="Port to run the MCP server on (default: auto-select)"    )    parser.add_argument(        "--model", type=str, default="gpt-4o",        help="OpenAI model to use (default: gpt-4o)"    )    parser.add_argument(        "--temperature", type=float, default=0.0,        help="Temperature for generation (default: 0.0)"    )    return parser.parse_args()

随后, 我利用OpenAI的API和模块化多智能体系统开发了一个用于股票市场分析的AI服务器.

首先, 我检查用户是否通过命令行提供了自定义端口; 如果没有, 我使用 find_available_port 自动为每个服务器选择一个端口, 并使用不同范围以避免端口冲突. 然后, 我打印出选定的端口以帮助调试.

接着进入步骤1, 我创建了一个基于OpenAI的A2A(代理间通信)服务器. 为定义其角色和能力, 我创建了一个包含名称, 描述及技能列表(如市场分析, 投资策略, 公司分析)的AgentCard, 每个技能均附带示例以指导其行为.

之后, 我使用OpenAI模型类型, 温度参数以及一个详细的系统提示信息初始化了OpenAIA2AServer, 该提示信息指示AI以金融专家的身份进行操作.

def main():    """Main function"""    # Check API key    if not check_api_key():        return 1        # Parse arguments    args = parse_arguments()# Find available ports - use different ranges to avoid conflicts    a2a_port = args.a2a_port or find_available_port(5000, 20)    mcp_port = args.mcp_port or find_available_port(7000, 20)  # Try higher port range        print(f"🔍 A2A server port: {a2a_port}")    print(f"🔍 MCP server port: {mcp_port}")        # Step 1: Create the OpenAI-powered A2A server for Stock Market Expertise    print("\n📝 Step 1: Creating OpenAI-Powered A2A Server")        # Create an Agent Card for our OpenAI-powered agent    agent_card = AgentCard(        name="Stock Market Expert",        description=f"An A2A agent specialized in stock market analysis and financial information",        url=f"http://localhost:{a2a_port}",        version="1.0.0",        skills=[            AgentSkill(                name="Market Analysis",                description="Analyzing market trends, stock performance, and market indicators",                examples=["What's the current market sentiment?", "How do interest rates affect tech stocks?"]            ),            AgentSkill(                name="Investment Strategies",                description="Providing information about various investment approaches and strategies",                examples=["What is dollar cost averaging?", "How should I diversify my portfolio?"]            ),            AgentSkill(                name="Company Analysis",                description="Analyzing specific companies, their fundamentals, and performance metrics",                examples=["What are key metrics to evaluate a tech company?", "How to interpret P/E ratios?"]            )        ]    )        # Create the OpenAI server    openai_server = OpenAIA2AServer(        api_key=os.environ["OPENAI_API_KEY"],        model=args.model,        temperature=args.temperature,        system_prompt="You are a stock market and financial analysis expert. Provide accurate, concise information about stocks, market trends, investment strategies, and financial metrics. Focus on educational content and avoid making specific investment recommendations or predictions."    )

我构建了这一部分, 将基于OpenAI的股票市场专家封装为标准的A2A服务器, 以便其能够以结构化, 可扩展的方式处理消息.

我定义了一个新类StockMarketExpert, 该类继承自A2AServer并重写了handle_message方法, 以将传入的消息直接转发至OpenAI后端. 这使得A2A接口与更广泛的多智能体生态系统兼容.

在将OpenAI服务器和代理卡封装到该类后, 我将其实例化为stock_agent. 为了在后台运行服务器而不阻塞主程序, 我使用了run_server_in_thread函数, 并传入run_a2a_server, 该函数将服务器绑定到适当的主机和端口.

    # Wrap it in a standard A2A server for proper handling    class StockMarketExpert(A2AServer):        def __init__(self, openai_server, agent_card):            super().__init__(agent_card=agent_card)            self.openai_server = openai_server                def handle_message(self, message):            """Handle incoming messages by delegating to OpenAI server"""            return self.openai_server.handle_message(message)        # Create the wrapped agent    stock_agent = StockMarketExpert(openai_server, agent_card)        # Start the A2A server in a background thread    a2a_server_url = f"http://localhost:{a2a_port}"    print(f"\nStarting A2A server on {a2a_server_url}...")        def run_a2a_server(server, host="0.0.0.0", port=a2a_port):        """Run the A2A server"""        run_server(server, host=host, port=port)        a2a_thread = run_server_in_thread(run_a2a_server, stock_agent)        # Step 2: Create MCP Server with Finance Tools    print("\n📝 Step 2: Creating MCP Server with Finance Tools")        # Create MCP server with tools    mcp_server = FastMCP(        name="Finance Tools",        description="Advanced tools for stock market analysis"    )

我创建了一个股票数据获取器MCP工具, 允许用户检索财务数据. 我首先设计了一个函数stock_data, 该函数可以接受关键字参数或直接字符串, 然后解析以提取股票代码——无论是逗号分隔的还是嵌入在普通英语中的(如“apple”或“Tesla”). 为了处理这一点,

我构建了一个基于正则表达式的解析器和一个备用字典, 用于将常见公司名称映射到股票代码. 我添加了逻辑来检测输入中的时间范围(如“1y”或“1w”), 以自定义数据周期, 并使用yfinance库提取历史数据和详细公司信息.

我计算了每只股票的关键统计数据, 如价格变动, 百分比变动, 平均成交量, 市值和市盈率. 所有结果都以详细的 JSON 摘要形式呈现, 并且该函数包裹了异常处理以确保健壮性和有用的错误报告.

# Stock Data Fetcher Tool - Enhanced    @mcp_server.tool(        name="stock_data",        description="Fetch stock data for one or more ticker symbols"    )    def stock_data(input_str=None, **kwargs):        """Fetch stock data using yfinance with enhanced parsing."""        try:            # Handle both positional and keyword arguments            if 'input' in kwargs:                input_str = kwargs['input']                        # Make sure we have a string            if input_str is None:                return {"text": "Error: No ticker symbol provided"}                        input_str = str(input_str).strip()                        # Extract tickers - support multiple formats            tickers = []                        # Check for comma-separated tickers            if ',' in input_str:                tickers = [t.strip().upper() for t in input_str.split(',') if t.strip()]            else:                # Extract words that look like tickers (1-5 letters)                import re                tickers = [word.upper() for word in re.findall(r'\b[A-Za-z]{1,5}\b', input_str)]                        # Map common stock names to tickers if no explicit tickers found            if not tickers:                common_stocks = {                    'apple': 'AAPL', 'microsoft': 'MSFT', 'google': 'GOOGL',                     'amazon': 'AMZN', 'tesla': 'TSLA', 'nvidia': 'NVDA'                }                for name, ticker in common_stocks.items():                    if name.lower() in input_str.lower():                        tickers.append(ticker)                        if not tickers:                return {"text": "No valid ticker symbols found in input"}                        # Default parameters            period = "1mo"            interval = "1d"_sy                        # Very simple parameter extraction            if "year" in input_str or "1y" in input_str:                period = "1y"            elif "week" in input_str or "1w" in input_str:                period = "1wk"                        # Import libraries            import yfinance as yf            import pandas as pd            import json                        # Process each ticker            results = {}                        for ticker_symbol in tickers:                try:                    # Fetch data                    ticker = yf.Ticker(ticker_symbol)                    hist = ticker.history(period=period, interval=interval)                                        if hist.empty:                        results[ticker_symbol] = {"error": f"No data found for {ticker_symbol}"}                        continue                                        # Get more comprehensive info                    info = ticker.info                    company_name = info.get('shortName', ticker_symbol)                    sector = info.get('sector', 'Unknown')                    industry = info.get('industry', 'Unknown')                                        # Enhanced summary with more metrics                    latest = hist.iloc[-1]                    earliest = hist.iloc[0]                    price_change = float(latest['Close']) - float(earliest['Close'])                    percent_change = (price_change / float(earliest['Close'])) * 100                                        # Calculate some key statistics                    high_52week = info.get('fiftyTwoWeekHigh', 'Unknown')                    low_52week = info.get('fiftyTwoWeekLow', 'Unknown')                    avg_volume = info.get('averageVolume', 'Unknown')                    market_cap = info.get('marketCap', 'Unknown')                    pe_ratio = info.get('trailingPE', 'Unknown')                                        # Create a more comprehensive summary                    summary = {                        "ticker": ticker_symbol,                        "company_name": company_name,                        "sector": sector,                        "industry": industry,                        "latest_price": float(latest['Close']),                        "price_change": float(price_change),                        "percent_change": float(percent_change),                        "52_week_high": high_52week,                        "52_week_low": low_52week,                        "average_volume": avg_volume,                        "market_cap": market_cap,                        "pe_ratio": pe_ratio,                        "period": period,                        "interval": interval,                        "data_points": len(hist)                    }                                        results[ticker_symbol] = summary                                    except Exception as e:                    results[ticker_symbol] = {"error": f"Error processing {ticker_symbol}: {str(e)}"}                        return {"text": json.dumps(results)}        except Exception as e:            import traceback            error_details = traceback.format_exc()            return {"text": f"Error: {str(e)}\nDetails: {error_details}"}

我开发了一个 web_scraper MCP 工具, 用于通过灵活的输入系统获取财务新闻和公司数据, 该系统支持股票代码, URL 和一般主题.

首先, 我设置了一个具有描述性名称的工具函数, 并配备了输入处理器, 用于检查位置参数和关键字参数. 在确认输入为有效字符串后, 我使用正则表达式识别输入是股票代码, URL还是主题.

对于股票代码如“AAPL”, 代理会向Finviz发送请求, 使用BeautifulSoup解析HTML, 并提取最新五条新闻标题及公司概况详情. 若输入为URL, 则直接返回链接并附带提示信息. 若为通用主题, 则提示用户使用网页搜索替代.

 # Web Scraper Tool - Enhanced    @mcp_server.tool(        name="web_scraper",        description="Get financial news and information"    )    def web_scraper(input_str=None, **kwargs):        """Get financial news using web search."""        try:            # Handle both positional and keyword arguments            if 'input' in kwargs:                input_str = kwargs['input']                        # Make sure we have a string            if input_str is None:                return {"text": "Error: No input provided"}                        input_str = str(input_str).strip()                        # Import required libraries            import json            import requests            from bs4 import BeautifulSoup            import re            from urllib.parse import urljoin                        # Determine if this is a ticker, URL, or topic            if re.match(r'^[A-Za-z]{1,5}$', input_str):                # It's a ticker symbol - get stock news                ticker = input_str.upper()                                # Use a simple approach with Finviz                headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/91.0.4472.124 Safari/537.36'}                url = f"https://finviz.com/quote.ashx?t={ticker.lower()}"                                response = requests.get(url, headers=headers)                soup = BeautifulSoup(response.text, 'html.parser')                                # Get news items                news_table = soup.find('table', {'id': 'news-table'})                news_items = []                                if news_table:                    for row in news_table.find_all('tr')[:5]:                        cells = row.find_all('td')                        if len(cells) >= 2:                            date_cell = cells[0]                            title_cell = cells[1]                                                        link = title_cell.find('a')                            if link:                                news_link = link['href']                                # Fix relative URLs                                if not news_link.startswith('http'):                                    news_link = urljoin(url, news_link)                                                                news_items.append({                                    "title": link.text.strip(),                                    "link": news_link,                                    "date": date_cell.text.strip()                                })                                # Try to get some company details from Finviz as well                company_details = {}                snapshot_table = soup.find('table', {'class': 'snapshot-table2'})                if snapshot_table:                    rows = snapshot_table.find_all('tr')                    for row in rows:                        cells = row.find_all('td')                        for i in range(0, len(cells), 2):                            if i+1 < len(cells):                                key = cells[i].text.strip()                                value = cells[i+1].text.strip()                                if key and value:                                    company_details[key] = value                                return {"text": json.dumps({                    "ticker": ticker,                    "news_items": news_items,                    "company_details": company_details                })}                            elif input_str.startswith('http'):                # It's a URL - just return basic info                return {"text": json.dumps({                    "url": input_str,                    "message": "URL processing is simplified in this version."                })}            else:                # Treat as a topic                topic = input_str.replace("topic:", "").strip()                                return {"text": json.dumps({                    "topic": topic,                    "message": "Please use web search for detailed topic information."                })}                        except Exception as e:            import traceback            error_details = traceback.format_exc()            return {"text": f"Error: {str(e)}\nDetails: {error_details}"}

我创建此系统旨在在后台启动MCP服务器, 将其实际工具和A2A代理转换为LangChain兼容组件, 并在简化工作流中测试所有功能.

我通过后台线程在指定端口初始化MCP服务器, 随后短暂暂停以确保服务器完全上线. 若初始端口失败, 系统将动态查找新可用端口并重启服务器.

一旦验证 MCP 服务器正在运行, 我使用辅助函数将 A2A 代理和 MCP 工具(如 stock_dataweb_scraper MCP 工具)转换为 LangChain 工具.

成功转换后, 我对每个组件进行测试: 首先向A2A代理查询财务洞察, 然后使用如“AAPL”等样本股票代码调用MCP工具, 以确认其返回准确数据.

 # Start the MCP server in a background thread    mcp_server_url = f"http://localhost:{mcp_port}"    print(f"\nStarting MCP server on {mcp_server_url}...")        def run_mcp_server(server, host="0.0.0.0", port=mcp_port):        """Run the MCP server"""        server.run(host=host, port=port)        mcp_thread = run_server_in_thread(run_mcp_server, mcp_server)        # Wait a bit longer for the MCP server to start    print("\nWaiting for servers to initialize...")    time.sleep(5)        # Check if MCP server is actually running    mcp_server_running = False    try:        import requests        response = requests.get(f"{mcp_server_url}/tools")        if response.status_code == 200:            mcp_server_running = True    except:        pass        if not mcp_server_running:        print(f"❌ MCP server failed to start on port {mcp_port}.")        print("Let's try a different port...")                # Try a different port        mcp_port = find_available_port(8000, 20)        mcp_server_url = f"http://localhost:{mcp_port}"        print(f"\nRetrying MCP server on {mcp_server_url}...")                # Start the MCP server in a background thread with new port        mcp_thread = run_server_in_thread(run_mcp_server, mcp_server, port=mcp_port)        time.sleep(5)        # Step 3: Convert A2A agent to LangChain    print("\n📝 Step 3: Converting A2A Agent to LangChain")        try:        langchain_agent = to_langchain_agent(a2a_server_url)        print("✅ Successfully converted A2A agent to LangChain")    except Exception as e:        print(f"❌ Error converting A2A agent to LangChain: {e}")        return 1        # Step 4: Convert MCP tools to LangChain tools    print("\n📝 Step 4: Converting MCP Tools to LangChain")        try:        stock_data_tool = to_langchain_tool(mcp_server_url, "stock_data")        web_scraper_tool = to_langchain_tool(mcp_server_url, "web_scraper")        print("✅ Successfully converted MCP tools to LangChain")    except Exception as e:        print(f"❌ Error converting MCP tools to LangChain: {e}")        print("\nContinuing with only the A2A agent...")        stock_data_tool = None        web_scraper_tool = None        # Step 5: Test the components individually    print("\n📝 Step 5: Testing Individual Components")        # Test A2A agent via LangChain    try:        print("\nTesting A2A-based LangChain agent:")        result = langchain_agent.invoke("What are some key metrics to evaluate a company's stock?")        print(f"A2A Agent Response: {result.get('output', '')}")    except Exception as e:        print(f"❌ Error using A2A-based LangChain agent: {e}")        # Test MCP tools via LangChain if available    if stock_data_tool and web_scraper_tool:        try:            print("\nTesting MCP-based LangChain tools:")                        print("\n1. Stock Data Tool:")            stock_result = stock_data_tool.invoke("AAPL")            print(f"Stock Data Tool Response: {stock_result[:500]}...")  # Truncate for display                        print("\n2. Web Scraper Tool:")            web_result = web_scraper_tool.invoke("AAPL")            print(f"Web Scraper Tool Response: {web_result[:500]}...")  # Truncate for display                    except Exception as e:            print(f"❌ Error using MCP-based LangChain tools: {e}")            import traceback            traceback.print_exc()

我创建了这一步, 旨在利用LangChain构建一个元代理, 将多个智能工具整合为一个统一系统. 首先, 我初始化了一个OpenAI大型语言模型(LLM), 并配置了可调整的参数(如模型和温度). 随后, 我将每个工具的核心逻辑——如基于A2A的股票专家, MCP股票数据提取器和网页抓取器——封装为可调用的Python函数, 确保安全处理输入和错误情况.

我将这些函数注册为LangChain Tool对象, 并为其指定清晰的名称和描述, 使其可供代理访问. 最后, 我使用initialize_agent将所有这些工具整合为一个元代理, 该代理通过OpenAI Functions根据用户查询智能选择要调用的工具.

# Step 6: Creating a Meta-Agent with available components    print("\n📝 Step 6: Creating Meta-Agent with Available Tools")        try:        # Create an LLM for the meta-agent        llm = ChatOpenAI(model=args.model, temperature=args.temperature)                # Create wrapper functions for the LangChain tools        def ask_stock_expert(query):            """Ask the stock market expert agent a question."""            try:                result = langchain_agent.invoke(query)                return result.get('output', 'No response')            except Exception as e:                return f"Error querying stock expert: {str(e)}"                # Create tools list starting with the A2A agent        tools = [            Tool(                name="StockExpert",                func=ask_stock_expert,                description="Ask the stock market expert questions about investing, market trends, financial concepts, etc."            )        ]                # Add MCP tools if available        if stock_data_tool:            def get_stock_data(ticker_info):                """Get stock data for analysis."""                try:                    # Ensure ticker_info is a string                    if ticker_info is None:                        return "Error: No ticker symbol provided"                    if not isinstance(ticker_info, str):                        ticker_info = str(ticker_info)                                        result = stock_data_tool.invoke(ticker_info)                    return result                except Exception as e:                    return f"Error getting stock data: {str(e)}"                                tools.append(Tool(                name="StockData",                func=get_stock_data,                description="Get historical stock data. Input can be one or more ticker symbols (e.g., 'AAPL' or 'AAPL, MSFT')"            ))                    if web_scraper_tool:            def get_financial_news(query):                """Get financial news and information."""                try:                    # Ensure query is a string                    if query is None:                        return "Error: No query provided"                    if not isinstance(query, str):                        query = str(query)                                        result = web_scraper_tool.invoke(query)                    return result                except Exception as e:                    return f"Error getting financial news: {str(e)}"                                tools.append(Tool(                name="FinancialNews",                func=get_financial_news,                description="Get financial news. Input can be a ticker symbol, URL, or topic."            ))                # Create the meta-agent        meta_agent = initialize_agent(            tools,             llm,             agent=AgentType.OPENAI_FUNCTIONS,            verbose=True,            handle_parsing_errors=True        )

我开发了AI代理的这一最终部分, 以测试一个完全集成的元代理, 该代理将所有可用工具(如A2A代理和MCP工具)整合到一个统一的LangChain接口下.

我向系统查询, “苹果和英伟达的当前股票价格是多少?”, 元代理会根据查询内容智能地将任务委托给相应的工具.

在打印元代理的响应后, 我通过进入一个等待用户中断的循环, 确保整个系统持续稳定运行, 同时保持后台服务器处于活跃状态.

# Test the meta-agent        print("\nTesting Meta-Agent with available tools:")                test_query = "What are the current stock prices of Apple and Nvidia?"                print(f"\nQuery: {test_query}")        meta_result = meta_agent.invoke(test_query)        print(f"\nMeta-Agent Response: {meta_result}")            except Exception as e:        print(f"❌ Error creating or using meta-agent: {e}")        import traceback        traceback.print_exc()        # Keep the servers running until user interrupts    print("\n✅ Integration successful!")    print("Press Ctrl+C to stop the servers and exit")        try:        while True:            time.sleep(1)    except KeyboardInterrupt:        print("\nExiting...")        return 0if __name__ == "__main__":    try:        sys.exit(main())    except KeyboardInterrupt:        print("\nProgram interrupted by user")        sys.exit(0)

MCP和A2A代表了AI系统构建的两个关键维度——一个用于工具集成, 另一个用于代理协作. 二者共同标志着软件开发范式的根本性转变: 从显式编程转向描述性, 自主且协作的系统.

随着这些协议的成熟, 我们可以期待更智能, 灵活和强大的AI应用程序——这些应用程序不仅执行预定义的指令, 还能自主思考, 适应和协作以完成复杂任务. 我们不再是编程软件, 而是与智能系统协作.

这不仅仅是AI架构的演进, 而是软件开发方式的革命.

好吧, 今天的内容就分享到这里啦!

一家之言,欢迎拍砖!

Happy coding! Stay GOLDEN!

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

A2A MCP LangChain 聊天机器人 人工智能
相关文章