Juejin | Artificial Intelligence | July 3, 10:23
Deploying ERNIE 4.5 on Domestic Compute: Huawei Ascend 910B Tutorial

This tutorial walks through deploying Baidu's ERNIE 4.5 large model on Huawei Ascend 910B NPU hardware and exposing it through an OpenAI-compatible API service. The setup supports concurrent inference and streaming output, and is aimed specifically at domestic (Chinese) compute environments. Because FastDeploy does not yet support Ascend, this project builds on the transformers framework; it is slower, but useful for learning and reference. The tutorial covers environment configuration, dependency installation, model download and preparation, and API service deployment in detail.

💻 Environment configuration is the first step. Install the NPU driver and verify it with the npu-smi info command. Then run the provided automated setup script env.sh, which detects the NPU driver status and system architecture, downloads and installs the torch_npu package, installs the project dependencies, and finally verifies that torch_npu is usable.

💾 Model download and preparation come next. Use the huggingface-cli download command to fetch Baidu's ERNIE-4.5-21B model from the Hugging Face Hub, specifying a mirror endpoint. After the download, inspect the model directory with ls -la to confirm that the configuration files and model weights are all in place.

🚀 API service deployment is the core of the project. The Python script ernie_api.py implements an OpenAI-compatible API service that supports the standard /v1/chat/completions endpoint, with concurrent request handling, streaming output, and NPU-specific optimizations. The script defines the data models, loads the model, processes requests, and monitors service status.

💡 The service architecture is worth a closer look. ernie_api.py is built on FastAPI with CORS enabled, and defines data structures such as ChatCompletionRequest for incoming requests and ChatCompletionResponse for results. Worker threads consume tasks from a request queue, enabling concurrent inference.
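As a rough illustration of that pattern, here is a minimal, stripped-down sketch of the queue-plus-workers design used by ernie_api.py (the dummy handle_request function stands in for actual model inference and is not part of the project):

import time
import uuid
from queue import Queue
from threading import Thread, Lock

request_queue = Queue()   # incoming (request_id, payload) pairs
response_dict = {}        # request_id -> finished result
response_lock = Lock()

def handle_request(payload):
    # placeholder for model.generate(...) in the real service
    return f"echo: {payload}"

def worker():
    while True:
        request_id, payload = request_queue.get()
        result = handle_request(payload)
        with response_lock:
            response_dict[request_id] = result
        request_queue.task_done()

# start a small pool of worker threads, as the real service does on startup
for _ in range(2):
    Thread(target=worker, daemon=True).start()

# enqueue a request and poll for its result, mirroring the API handler
rid = str(uuid.uuid4())
request_queue.put((rid, "hello"))
while True:
    with response_lock:
        if rid in response_dict:
            print(response_dict.pop(rid))
            break
    time.sleep(0.05)

The HTTP handler in the real service does the same polling, but with an asyncio sleep so the event loop stays responsive while inference runs on the worker threads.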

Project Overview

This tutorial explains in detail how to deploy Baidu's ERNIE 4.5 model (ERNIE-4.5-21B) on Huawei Ascend 910B NPU hardware and provide an OpenAI-compatible API service. The setup supports concurrent inference and streaming output and is suited to deployment on domestic compute.
⚠️ Note: since PaddlePaddle's official FastDeploy toolkit does not yet support Ascend, this project deploys with the transformers framework instead. It has been tested and works, but it is slow; treat it as reference material for learning. Higher-performance deployment guides will follow once vLLM and FastDeploy mature on Ascend.

Environment Requirements

Technical Steps

1. Environment Configuration and Dependency Installation

1.1 Verifying the NPU Driver Installation

# Check NPU device status
npu-smi info

On a correctly configured machine, the command prints a table listing each NPU device.

If the command produces no output or reports an error, install the driver by following the official Huawei Ascend documentation.

1.2 Automated Environment Setup

The project provides an automated environment setup script, env.sh, shown below. Run it with bash env.sh:

#!/bin/bash
# Environment setup script - Huawei Ascend NPU configuration
echo "Starting Huawei Ascend NPU environment setup..."

# Check that the NPU driver is installed
echo "Checking NPU driver status..."
if [ ! -f "/usr/local/bin/npu-smi" ]; then
    echo "Error: npu-smi not found; install the NPU driver first"
    echo "Driver installation guide: https://www.hiascend.com/document/detail/zh/CANNCommunityEdition/80RC2alpha002/quickstart/quickstart/quickstart_18_0004.html"
    exit 1
fi

# Check that npu-smi info produces output
echo "Running npu-smi info to check NPU devices..."
/usr/local/bin/npu-smi info > /tmp/npu_output.txt 2>&1
npu_exit_code=$?
npu_output=$(cat /tmp/npu_output.txt)
if [ $npu_exit_code -ne 0 ] || [ -z "$npu_output" ]; then
    echo "Error: npu-smi info failed or produced no output; the NPU driver is not installed correctly or no NPU device is available"
    echo "Check the driver installation: https://www.hiascend.com/document/detail/zh/CANNCommunityEdition/80RC2alpha002/quickstart/quickstart/quickstart_18_0004.html"
    echo "With the driver installed, npu-smi info should print device information"
    echo "Actual output:"
    echo "$npu_output"
    exit 1
fi

# Show NPU information
echo "NPU device information:"
echo "$npu_output"

# Clean up the temporary file
rm -f /tmp/npu_output.txt

echo "NPU driver check passed; detecting system architecture..."

# Detect the system architecture
ARCH=$(uname -m)
echo "Detected architecture: $ARCH"

# Pick the download URL and file name for this architecture
if [ "$ARCH" = "aarch64" ]; then
    DOWNLOAD_URL="https://gitee.com/ascend/pytorch/releases/download/v7.0.0-pytorch2.5.1/torch_npu-2.5.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl"
    WHEEL_FILE="torch_npu-2.5.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl"
    echo "Using the ARM64 torch_npu wheel"
elif [ "$ARCH" = "x86_64" ]; then
    DOWNLOAD_URL="https://gitee.com/ascend/pytorch/releases/download/v7.0.0-pytorch2.5.1/torch_npu-2.5.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl"
    WHEEL_FILE="torch_npu-2.5.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl"
    echo "Using the x86_64 torch_npu wheel"
else
    echo "Error: unsupported architecture $ARCH"
    echo "Only aarch64 and x86_64 are supported"
    exit 1
fi

echo "Preparing to download the torch_npu package..."

# Check whether the file already exists
if [ -f "$WHEEL_FILE" ]; then
    echo "Existing torch_npu wheel detected; it will be overwritten"
fi

# Download the wheel (-O forces overwrite)
echo "Downloading the torch_npu package..."
wget -O "$WHEEL_FILE" "$DOWNLOAD_URL"
if [ $? -ne 0 ]; then
    echo "Error: torch_npu download failed"
    exit 1
fi

echo "torch_npu downloaded; installing..."

# Install the wheel
pip install $WHEEL_FILE
if [ $? -ne 0 ]; then
    echo "Error: torch_npu installation failed"
    exit 1
fi

echo "torch_npu installed; testing..."

# Test torch_npu (True means NPU support is working)
echo "Testing torch_npu availability..."
result=$(python -c "import torch;import torch_npu;print(torch_npu.npu.is_available())" 2>/dev/null)
if [ "$result" = "True" ]; then
    echo "✅ torch_npu test passed; NPU support is enabled"
else
    echo "❌ torch_npu test failed; NPU support is not enabled"
    echo "Check the NPU driver and the torch_npu installation"
    exit 1
fi

# Install project dependencies if requirements.txt exists
if [ -f "requirements.txt" ]; then
    echo "Installing project dependencies..."
    pip install -r requirements.txt
    if [ $? -eq 0 ]; then
        echo "✅ Project dependencies installed"
    else
        echo "❌ Project dependency installation failed"
        exit 1
    fi
else
    echo "⚠️  requirements.txt not found; skipping dependency installation"
fi

echo "🎉 Huawei Ascend NPU environment setup complete!"
echo "You can now use torch_npu for NPU-accelerated computation."

The script automatically performs the following steps:

- checks that the NPU driver is installed and that npu-smi info reports devices;
- detects the system architecture (aarch64 or x86_64);
- downloads and installs the matching torch_npu wheel;
- verifies that torch_npu can see the NPU;
- installs the project dependencies from requirements.txt.

1.3 Verifying the Environment

# Verify torch_npu availability
python -c "import torch; import torch_npu; print('NPU available:', torch_npu.npu.is_available())"

2. Model Download and Preparation

2.1 Automated Model Download

# Note: there is a quirk here - downloading this model with a recent
# huggingface_hub is very slow, while older versions behave normally
pip install huggingface_hub==0.19.0
export HF_ENDPOINT=https://hf-mirror.com
huggingface-cli download --resume-download baidu/ERNIE-4.5-21B-A3B-PT --local-dir ./baidu/ERNIE-4.5-21B-A3B-PT
# restore the latest huggingface_hub afterwards
pip install huggingface_hub --upgrade
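If you prefer to script the download, the same fetch can be done from Python with huggingface_hub's snapshot_download. This is a sketch, not part of the project; it applies the same mirror-endpoint and pinned-version workaround described above:

import os
# set the mirror before importing huggingface_hub, which reads it at import time
os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"

from huggingface_hub import snapshot_download

snapshot_download(
    repo_id="baidu/ERNIE-4.5-21B-A3B-PT",
    local_dir="./baidu/ERNIE-4.5-21B-A3B-PT",
    resume_download=True,  # resume partial downloads, like --resume-download
)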

2.2 Verifying the Model Files

# Check the model directory structure
ls -la ./baidu/ERNIE-4.5-21B-A3B-PT/

# Verify that the key files exist
ls -la ./baidu/ERNIE-4.5-21B-A3B-PT/config.json
ls -la ./baidu/ERNIE-4.5-21B-A3B-PT/model.safetensors.index.json
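Equivalently, a small Python check (a sketch; the file list mirrors the two files verified above):

import os

model_dir = "./baidu/ERNIE-4.5-21B-A3B-PT"
required = ["config.json", "model.safetensors.index.json"]

# report any of the required files that are missing from the download
missing = [f for f in required if not os.path.isfile(os.path.join(model_dir, f))]
if missing:
    raise SystemExit(f"Missing model files: {missing}")
print("All required model files are present.")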

3. API Service Deployment

3.1 Service Architecture

ernie_api.py implements a complete OpenAI-compatible API service. Its main features:

- standard /v1/chat/completions endpoint, plus /v1/models and /health;
- concurrent request handling via a request queue and worker threads;
- streaming output over server-sent events;
- NPU-aware loading (torch_npu, float16 weights on npu:0).

import asyncio
import time
import uuid
import json
from typing import List, Optional, Dict, Any, AsyncGenerator
from datetime import datetime
from queue import Queue
from threading import Thread, Lock

import torch_npu
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import uvicorn

# OpenAI-compatible data models
class Message(BaseModel):
    role: str
    content: str

class ChatCompletionRequest(BaseModel):
    model: str = "ernie-4.5"
    messages: List[Message]
    max_tokens: Optional[int] = 1024
    temperature: Optional[float] = 0.7
    top_p: Optional[float] = 0.9
    stream: Optional[bool] = False

class ChatCompletionResponse(BaseModel):
    id: str
    object: str = "chat.completion"
    created: int
    model: str
    choices: List[Dict[str, Any]]
    usage: Dict[str, int]

class ModelInfo(BaseModel):
    id: str
    object: str = "model"
    created: int
    owned_by: str = "baidu"

# Global state
app = FastAPI(title="ERNIE API Server", version="1.0.0")
model = None
tokenizer = None
request_queue = Queue()
response_dict = {}
response_lock = Lock()
stream_dict = {}
stream_lock = Lock()

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

def load_model():
    """Load the model and tokenizer."""
    global model, tokenizer
    model_name = "./baidu/ERNIE-4.5-21B-A3B-PT"
    device = "npu:0"

    print("Loading model...")
    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        trust_remote_code=True,
        torch_dtype=torch.float16
    ).to(device)
    print("Model loaded")

def process_request(request_id: str, request_data: ChatCompletionRequest):
    """Handle a single (non-streaming) request."""
    try:
        # Prepare the input
        messages = [{"role": msg.role, "content": msg.content} for msg in request_data.messages]
        text = tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )

        model_inputs = tokenizer([text], add_special_tokens=False, return_tensors="pt").to("npu")

        # Generate text
        with torch.no_grad():
            generated_ids = model.generate(
                model_inputs.input_ids,
                max_new_tokens=request_data.max_tokens,
                temperature=request_data.temperature,
                top_p=request_data.top_p,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id
            )

        output_ids = generated_ids[0][len(model_inputs.input_ids[0]):].tolist()
        generated_text = tokenizer.decode(output_ids, skip_special_tokens=True).strip("\n")

        # Build the response
        response = ChatCompletionResponse(
            id=request_id,
            created=int(time.time()),
            model=request_data.model,
            choices=[
                {
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": generated_text
                    },
                    "finish_reason": "stop"
                }
            ],
            usage={
                "prompt_tokens": len(model_inputs.input_ids[0]),
                "completion_tokens": len(output_ids),
                "total_tokens": len(model_inputs.input_ids[0]) + len(output_ids)
            }
        )

        # Store the response for the waiting handler
        with response_lock:
            response_dict[request_id] = response

    except Exception as e:
        with response_lock:
            response_dict[request_id] = {"error": str(e)}

def process_stream_request(request_id: str, request_data: ChatCompletionRequest):
    """Handle a streaming request."""
    try:
        # Prepare the input
        messages = [{"role": msg.role, "content": msg.content} for msg in request_data.messages]
        text = tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )

        model_inputs = tokenizer([text], add_special_tokens=False, return_tensors="pt").to("npu")

        # Create the streaming generator
        streamer = TextIteratorStreamer(
            tokenizer,
            timeout=60.0,
            skip_prompt=True,
            skip_special_tokens=True
        )

        # Generate text in a new thread
        generation_kwargs = {
            "input_ids": model_inputs.input_ids,
            "max_new_tokens": request_data.max_tokens,
            "temperature": request_data.temperature,
            "top_p": request_data.top_p,
            "do_sample": True,
            "pad_token_id": tokenizer.eos_token_id,
            "streamer": streamer
        }

        def generate():
            with torch.no_grad():
                model.generate(**generation_kwargs)

        thread = Thread(target=generate)
        thread.start()

        # Store the streamer so the response generator can consume it
        with stream_lock:
            stream_dict[request_id] = {
                "streamer": streamer,
                "thread": thread,
                "created": int(time.time()),
                "model": request_data.model
            }

    except Exception as e:
        with stream_lock:
            stream_dict[request_id] = {"error": str(e)}

def worker():
    """Worker thread that processes queued requests."""
    while True:
        try:
            request_id, request_data = request_queue.get(timeout=1)
            if request_data.stream:
                process_stream_request(request_id, request_data)
            else:
                process_request(request_id, request_data)
            request_queue.task_done()
        except:
            continue

async def generate_stream_response(request_id: str) -> AsyncGenerator[str, None]:
    """Generate the streaming (SSE) response."""
    with stream_lock:
        if request_id not in stream_dict:
            yield f"data: {{\"error\": \"Request not found\"}}\n\n"
            return

        stream_info = stream_dict[request_id]
        if "error" in stream_info:
            yield f"data: {{\"error\": \"{stream_info['error']}\"}}\n\n"
            return

        streamer = stream_info["streamer"]
    created = stream_info["created"]
    model_name = stream_info["model"]

    try:
        for new_text in streamer:
            if new_text:
                chunk = {
                    "id": request_id,
                    "object": "chat.completion.chunk",
                    "created": created,
                    "model": model_name,
                    "choices": [
                        {
                            "index": 0,
                            "delta": {
                                "content": new_text
                            },
                            "finish_reason": None
                        }
                    ]
                }
                yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
                await asyncio.sleep(0.01)  # small delay to avoid flooding the client

        # Send the end-of-stream marker
        final_chunk = {
            "id": request_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model_name,
            "choices": [
                {
                    "index": 0,
                    "delta": {},
                    "finish_reason": "stop"
                }
            ]
        }
        yield f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"
        yield "data: [DONE]\n\n"

    except Exception as e:
        error_chunk = {
            "error": {
                "message": str(e),
                "type": "server_error"
            }
        }
        yield f"data: {json.dumps(error_chunk, ensure_ascii=False)}\n\n"

    finally:
        # Clean up
        with stream_lock:
            if request_id in stream_dict:
                del stream_dict[request_id]

@app.on_event("startup")
async def startup_event():
    """Load the model and start the worker threads."""
    load_model()

    # Start several worker threads to support concurrency
    for i in range(2):  # adjust the thread count as needed
        thread = Thread(target=worker, daemon=True)
        thread.start()

@app.get("/v1/models")
async def list_models():
    """List available models."""
    return {
        "object": "list",
        "data": [
            ModelInfo(
                id="ernie-4.5",
                created=int(time.time()),
                owned_by="baidu"
            )
        ]
    }

@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    """Chat completion endpoint."""
    if model is None or tokenizer is None:
        raise HTTPException(status_code=503, detail="Model is not loaded yet")

    request_id = str(uuid.uuid4())

    # Enqueue the request
    request_queue.put((request_id, request))

    if request.stream:
        # Streaming response: wait for stream processing to start
        max_wait_time = 30
        start_time = time.time()

        while time.time() - start_time < max_wait_time:
            with stream_lock:
                if request_id in stream_dict:
                    break
            await asyncio.sleep(0.1)
        else:
            raise HTTPException(status_code=408, detail="Stream initialization timed out")

        return StreamingResponse(
            generate_stream_response(request_id),
            media_type="text/plain",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "Content-Type": "text/event-stream"
            }
        )
    else:
        # Non-streaming response
        max_wait_time = 300  # wait at most 5 minutes
        start_time = time.time()

        while time.time() - start_time < max_wait_time:
            with response_lock:
                if request_id in response_dict:
                    response = response_dict.pop(request_id)
                    if "error" in response:
                        raise HTTPException(status_code=500, detail=response["error"])
                    return response

            await asyncio.sleep(0.1)

        raise HTTPException(status_code=408, detail="Request timed out")

@app.get("/health")
async def health_check():
    """Health check."""
    return {
        "status": "healthy",
        "model_loaded": model is not None,
        "queue_size": request_queue.qsize(),
        "active_streams": len(stream_dict)
    }

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=6006)

3.2 Starting the API Service

# Start the service (default port 6006)
python ernie_api.py

Once started, the service prints:

Loading model...
Model loaded
INFO:     Started server process [PID]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:6006

3.3 Service Configuration

Key configuration parameters (in the load_model function):

def load_model():
    global model, tokenizer
    model_name = "./baidu/ERNIE-4.5-21B-A3B-PT"  # model path
    device = "npu:0"  # NPU device

    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        trust_remote_code=True,
        torch_dtype=torch.float16  # half precision to save memory
    ).to(device)
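If the service needs to move between machines, these two constants are easy to lift into environment variables. A minimal sketch (ERNIE_MODEL_PATH and ERNIE_DEVICE are hypothetical variable names, not part of the original script):

import os

# hypothetical overrides; defaults match the values hard-coded above
model_name = os.environ.get("ERNIE_MODEL_PATH", "./baidu/ERNIE-4.5-21B-A3B-PT")
device = os.environ.get("ERNIE_DEVICE", "npu:0")  # e.g. "npu:1" to pin a different card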

3.4 API Reference

Health check

curl http://localhost:6006/health
{"status":"healthy","model_loaded":true,"queue_size":0,"active_streams":0}

Model list

curl http://localhost:6006/v1/models
{"object":"list","data":[{"id":"ernie-4.5","object":"model","created":1751462626,"owned_by":"baidu"}]}

Chat request (non-streaming)

curl -X POST http://localhost:6006/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "ernie-4.5",
    "messages": [
      {"role": "user", "content": "Hello, please introduce yourself"}
    ],
    "max_tokens": 512,
    "temperature": 0.7
  }'

Chat request (streaming)

curl -X POST http://localhost:6006/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "ernie-4.5",
    "messages": [
      {"role": "user", "content": "Please give a detailed introduction to the Python programming language"}
    ],
    "max_tokens": 512,
    "temperature": 0.7,
    "stream": true
  }'
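Because the endpoint follows the OpenAI wire format, the official openai Python client can also be pointed at it. A sketch (the api_key is a dummy value, since this service performs no authentication):

from openai import OpenAI

client = OpenAI(base_url="http://localhost:6006/v1", api_key="dummy-key")

# non-streaming call
resp = client.chat.completions.create(
    model="ernie-4.5",
    messages=[{"role": "user", "content": "Hello, please introduce yourself"}],
    max_tokens=256,
)
print(resp.choices[0].message.content)

# streaming works the same way with stream=True
for chunk in client.chat.completions.create(
    model="ernie-4.5",
    messages=[{"role": "user", "content": "Introduce Python"}],
    max_tokens=256,
    stream=True,
):
    delta = chunk.choices[0].delta.content
    if delta:
        print(delta, end="", flush=True)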

4. Functional Testing

4.1 Basic Functional Tests

Use test.py for a comprehensive test run:

import requests
import json
import time
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor

# Server configuration
BASE_URL = "http://localhost:6006"

def test_health_check():
    """Test the health check endpoint."""
    print("=== Health check test ===")
    response = requests.get(f"{BASE_URL}/health")
    print(f"Status code: {response.status_code}")
    print(f"Response: {response.json()}")
    print()

def test_list_models():
    """Test the model list endpoint."""
    print("=== Model list test ===")
    response = requests.get(f"{BASE_URL}/v1/models")
    print(f"Status code: {response.status_code}")
    print(f"Response: {json.dumps(response.json(), indent=2, ensure_ascii=False)}")
    print()

def test_single_chat():
    """Test a single chat completion."""
    print("=== Single chat test ===")

    payload = {
        "model": "ernie-4.5",
        "messages": [
            {"role": "user", "content": "Hello, please introduce yourself"}
        ],
        "max_tokens": 512,
        "temperature": 0.7
    }

    start_time = time.time()
    response = requests.post(f"{BASE_URL}/v1/chat/completions", json=payload)
    end_time = time.time()

    print(f"Status code: {response.status_code}")
    print(f"Response time: {end_time - start_time:.2f}s")

    if response.status_code == 200:
        result = response.json()
        print(f"Response: {json.dumps(result, indent=2, ensure_ascii=False)}")
    else:
        print(f"Error: {response.text}")
    print()

def single_request(session, request_id):
    """Issue one request (used by the concurrency test)."""
    payload = {
        "model": "ernie-4.5",
        "messages": [
            {"role": "user", "content": f"Describe the Python programming language in one sentence (request {request_id})"}
        ],
        "max_tokens": 256,
        "temperature": 0.7
    }

    start_time = time.time()
    response = session.post(f"{BASE_URL}/v1/chat/completions", json=payload)
    end_time = time.time()

    return {
        "request_id": request_id,
        "status_code": response.status_code,
        "response_time": end_time - start_time,
        "success": response.status_code == 200
    }

def test_concurrent_requests():
    """Test concurrent requests."""
    print("=== Concurrent request test ===")

    num_requests = 5

    with requests.Session() as session:
        with ThreadPoolExecutor(max_workers=num_requests) as executor:
            start_time = time.time()
            futures = [executor.submit(single_request, session, i+1) for i in range(num_requests)]
            results = [future.result() for future in futures]
            end_time = time.time()

    print(f"Total time: {end_time - start_time:.2f}s")
    print(f"Successful requests: {sum(1 for r in results if r['success'])}/{num_requests}")

    for result in results:
        print(f"Request {result['request_id']}: status={result['status_code']}, time={result['response_time']:.2f}s")
    print()

async def async_single_request(session, request_id):
    """Issue one asynchronous request."""
    payload = {
        "model": "ernie-4.5",
        "messages": [
            {"role": "user", "content": f"Briefly introduce artificial intelligence (async request {request_id})"}
        ],
        "max_tokens": 256,
        "temperature": 0.7
    }

    start_time = time.time()
    async with session.post(f"{BASE_URL}/v1/chat/completions", json=payload) as response:
        end_time = time.time()
        return {
            "request_id": request_id,
            "status_code": response.status,
            "response_time": end_time - start_time,
            "success": response.status == 200
        }

async def test_async_concurrent_requests():
    """Test asynchronous concurrent requests."""
    print("=== Async concurrent request test ===")

    num_requests = 3

    async with aiohttp.ClientSession() as session:
        start_time = time.time()
        tasks = [async_single_request(session, i+1) for i in range(num_requests)]
        results = await asyncio.gather(*tasks)
        end_time = time.time()

    print(f"Total time: {end_time - start_time:.2f}s")
    print(f"Successful requests: {sum(1 for r in results if r['success'])}/{num_requests}")

    for result in results:
        print(f"Async request {result['request_id']}: status={result['status_code']}, time={result['response_time']:.2f}s")
    print()

def test_openai_compatibility():
    """Test OpenAI compatibility."""
    print("=== OpenAI compatibility test ===")

    # Simulate an OpenAI client call
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer dummy-key"  # optional; the current implementation does not validate it
    }

    payload = {
        "model": "ernie-4.5",
        "messages": [
            {"role": "system", "content": "You are a helpful AI assistant."},
            {"role": "user", "content": "Explain what machine learning is."}
        ],
        "max_tokens": 512,
        "temperature": 0.8,
        "top_p": 0.9
    }

    response = requests.post(f"{BASE_URL}/v1/chat/completions", json=payload, headers=headers)

    print(f"Status code: {response.status_code}")
    if response.status_code == 200:
        result = response.json()
        print("OpenAI response structure check:")
        print(f"- id: {result.get('id')}")
        print(f"- object: {result.get('object')}")
        print(f"- model: {result.get('model')}")
        print(f"- number of choices: {len(result.get('choices', []))}")
        print(f"- usage: {result.get('usage')}")
        print(f"- generated content: {result['choices'][0]['message']['content'][:100]}...")
    else:
        print(f"Error: {response.text}")
    print()

def main():
    """Main test entry point."""
    print("Starting ERNIE API service tests...")
    print("Make sure the service is running at http://localhost:6006\n")

    # Wait for the service to come up
    print("Waiting for the service to start...")
    for i in range(30):
        try:
            response = requests.get(f"{BASE_URL}/health", timeout=2)
            if response.status_code == 200:
                print("Service is up\n")
                break
        except:
            time.sleep(2)
    else:
        print("Timed out waiting for the service; check that it is running")
        return

    # Run the tests
    test_health_check()
    test_list_models()
    test_single_chat()
    test_concurrent_requests()

    # Run the async test
    asyncio.run(test_async_concurrent_requests())

    test_openai_compatibility()

    print("All tests finished!")

if __name__ == "__main__":
    main()

The tests cover:

- the health check and model list endpoints;
- a single chat completion;
- concurrent requests via a thread pool;
- asynchronous concurrent requests via aiohttp;
- OpenAI response-format compatibility.

4.2 Streaming Tests

Use test_stream.py to test streaming output:

import requests
import json
import time
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor

# Server configuration
BASE_URL = "http://localhost:6006"

def test_stream_chat():
    """Test streaming chat."""
    print("=== Streaming chat test ===")

    payload = {
        "model": "ernie-4.5",
        "messages": [
            {"role": "user", "content": "Please describe the features and application areas of the Python programming language in detail"}
        ],
        "max_tokens": 512,
        "temperature": 0.7,
        "stream": True
    }

    start_time = time.time()

    with requests.post(f"{BASE_URL}/v1/chat/completions", json=payload, stream=True) as response:
        print(f"Status code: {response.status_code}")

        if response.status_code == 200:
            print("Streamed content:")
            full_content = ""

            for line in response.iter_lines():
                if line:
                    line = line.decode('utf-8')
                    if line.startswith('data: '):
                        data = line[6:]  # strip the 'data: ' prefix

                        if data == '[DONE]':
                            print("\n[stream finished]")
                            break

                        try:
                            chunk = json.loads(data)
                            if 'choices' in chunk and len(chunk['choices']) > 0:
                                delta = chunk['choices'][0].get('delta', {})
                                content = delta.get('content', '')
                                if content:
                                    print(content, end='', flush=True)
                                    full_content += content
                        except json.JSONDecodeError:
                            continue

            end_time = time.time()
            print(f"\n\nTotal time: {end_time - start_time:.2f}s")
            print(f"Generated content length: {len(full_content)} characters")
        else:
            print(f"Error: {response.text}")
    print()

def test_non_stream_chat():
    """Test non-streaming chat for comparison."""
    print("=== Non-streaming chat test (comparison) ===")

    payload = {
        "model": "ernie-4.5",
        "messages": [
            {"role": "user", "content": "Briefly introduce artificial intelligence"}
        ],
        "max_tokens": 256,
        "temperature": 0.7,
        "stream": False
    }

    start_time = time.time()
    response = requests.post(f"{BASE_URL}/v1/chat/completions", json=payload)
    end_time = time.time()

    print(f"Status code: {response.status_code}")
    print(f"Response time: {end_time - start_time:.2f}s")

    if response.status_code == 200:
        result = response.json()
        content = result['choices'][0]['message']['content']
        print(f"Generated content: {content}")
        print(f"Token usage: {result['usage']}")
    else:
        print(f"Error: {response.text}")
    print()

async def test_async_stream():
    """Test an asynchronous streaming request."""
    print("=== Async streaming test ===")

    payload = {
        "model": "ernie-4.5",
        "messages": [
            {"role": "user", "content": "Describe the basic concept of machine learning in one paragraph"}
        ],
        "max_tokens": 300,
        "temperature": 0.8,
        "stream": True
    }

    async with aiohttp.ClientSession() as session:
        start_time = time.time()
        async with session.post(f"{BASE_URL}/v1/chat/completions", json=payload) as response:
            print(f"Status code: {response.status}")

            if response.status == 200:
                print("Async streamed content:")
                full_content = ""

                async for line in response.content:
                    line = line.decode('utf-8').strip()
                    if line.startswith('data: '):
                        data = line[6:]

                        if data == '[DONE]':
                            print("\n[async stream finished]")
                            break

                        try:
                            chunk = json.loads(data)
                            if 'choices' in chunk and len(chunk['choices']) > 0:
                                delta = chunk['choices'][0].get('delta', {})
                                content = delta.get('content', '')
                                if content:
                                    print(content, end='', flush=True)
                                    full_content += content
                        except json.JSONDecodeError:
                            continue

                end_time = time.time()
                print(f"\n\nAsync total time: {end_time - start_time:.2f}s")
                print(f"Generated content length: {len(full_content)} characters")
            else:
                print(f"Error: {await response.text()}")
    print()

def test_concurrent_streams():
    """Test concurrent streaming requests."""
    print("=== Concurrent streaming test ===")

    def single_stream_request(request_id):
        payload = {
            "model": "ernie-4.5",
            "messages": [
                {"role": "user", "content": f"Briefly describe the history of programming languages (request {request_id})"}
            ],
            "max_tokens": 200,
            "temperature": 0.7,
            "stream": True
        }

        start_time = time.time()
        try:
            with requests.post(f"{BASE_URL}/v1/chat/completions", json=payload, stream=True, timeout=60) as response:
                if response.status_code == 200:
                    content_length = 0
                    for line in response.iter_lines():
                        if line:
                            line = line.decode('utf-8')
                            if line.startswith('data: '):
                                data = line[6:]
                                if data == '[DONE]':
                                    break
                                try:
                                    chunk = json.loads(data)
                                    if 'choices' in chunk and len(chunk['choices']) > 0:
                                        delta = chunk['choices'][0].get('delta', {})
                                        content = delta.get('content', '')
                                        content_length += len(content)
                                except json.JSONDecodeError:
                                    continue

                    end_time = time.time()
                    return {
                        "request_id": request_id,
                        "success": True,
                        "response_time": end_time - start_time,
                        "content_length": content_length
                    }
                else:
                    return {
                        "request_id": request_id,
                        "success": False,
                        "error": response.text
                    }
        except Exception as e:
            return {
                "request_id": request_id,
                "success": False,
                "error": str(e)
            }

    num_requests = 3

    with ThreadPoolExecutor(max_workers=num_requests) as executor:
        start_time = time.time()
        futures = [executor.submit(single_stream_request, i+1) for i in range(num_requests)]
        results = [future.result() for future in futures]
        end_time = time.time()

    print(f"Concurrent streaming total time: {end_time - start_time:.2f}s")
    print(f"Successful requests: {sum(1 for r in results if r['success'])}/{num_requests}")

    for result in results:
        if result['success']:
            print(f"Stream request {result['request_id']}: time={result['response_time']:.2f}s, content length={result['content_length']} characters")
        else:
            print(f"Stream request {result['request_id']}: failed - {result['error']}")
    print()

def main():
    """Main test entry point."""
    print("Starting ERNIE API streaming tests...")
    print("Make sure the service is running at http://localhost:6006\n")

    # Wait for the service to come up
    print("Waiting for the service to start...")
    for i in range(30):
        try:
            response = requests.get(f"{BASE_URL}/health", timeout=2)
            if response.status_code == 200:
                print("Service is up\n")
                break
        except:
            time.sleep(2)
    else:
        print("Timed out waiting for the service; check that it is running")
        return

    # Run the tests
    test_stream_chat()
    test_non_stream_chat()

    # Run the async test
    asyncio.run(test_async_stream())

    test_concurrent_streams()

    print("All streaming tests finished!")

if __name__ == "__main__":
    main()

The tests cover:

- streaming chat output parsed chunk by chunk;
- a non-streaming request for latency comparison;
- an asynchronous streaming request via aiohttp;
- concurrent streaming requests via a thread pool.

4.3 Performance Benchmark

# Measure single-request inference latency
time curl -X POST http://localhost:6006/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{"model": "ernie-4.5", "messages": [{"role": "user", "content": "Hello"}], "max_tokens": 100}'
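For a rough tokens-per-second figure, the usage field that the service returns can be combined with wall-clock time. A sketch (numbers will vary with the prompt and max_tokens):

import time
import requests

payload = {
    "model": "ernie-4.5",
    "messages": [{"role": "user", "content": "Hello"}],
    "max_tokens": 100,
}

# time one full round trip and read token counts from the response
start = time.time()
resp = requests.post("http://localhost:6006/v1/chat/completions", json=payload)
elapsed = time.time() - start

usage = resp.json()["usage"]
print(f"latency: {elapsed:.2f}s, "
      f"completion tokens: {usage['completion_tokens']}, "
      f"throughput: {usage['completion_tokens'] / elapsed:.2f} tokens/s")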

5. Performance Tuning Suggestions

- Model quantization: consider INT4/INT8 quantization to reduce memory usage.
- Batching: tune the batch size to trade latency against throughput (see the sketch after this list).
- Caching: use KV-cache optimizations to avoid recomputing repeated prefixes.
- Multi-card deployment: use model parallelism to scale processing capacity.
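As one illustration of the batching idea, the worker could drain several queued non-streaming requests and run them through a single padded generate call. The sketch below is an assumption-laden outline, not code from the project: it reuses the model and tokenizer globals from ernie_api.py, assumes left padding for a decoder-only model, and ignores streaming requests.

import torch
from queue import Queue, Empty

def drain_batch(request_queue: Queue, max_batch: int = 4):
    """Collect up to max_batch pending requests without blocking."""
    batch = []
    while len(batch) < max_batch:
        try:
            batch.append(request_queue.get_nowait())
        except Empty:
            break
    return batch

def generate_batched(prompts, max_new_tokens=256):
    """Run one padded generate call over several prompts at once."""
    tokenizer.padding_side = "left"  # decoder-only models pad on the left
    if tokenizer.pad_token_id is None:
        tokenizer.pad_token = tokenizer.eos_token
    inputs = tokenizer(prompts, return_tensors="pt", padding=True).to("npu")
    with torch.no_grad():
        out = model.generate(
            input_ids=inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_new_tokens=max_new_tokens,
            pad_token_id=tokenizer.eos_token_id,
        )
    # strip the (padded) prompt portion from each row before decoding
    new_tokens = out[:, inputs.input_ids.shape[1]:]
    return tokenizer.batch_decode(new_tokens, skip_special_tokens=True)

Padding forces every sequence in the batch to run until the longest one finishes, so batching helps throughput most when the queued prompts and completion lengths are of similar size.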


Tags: Ascend 910B, ERNIE 4.5, NPU deployment