Original · Wang Shuhong · 2025-07-16 18:50 · Shanghai
How do you build an agent with Kimi K2? This hands-on tutorial shows how to use Kimi K2 + Milvus to build a production-grade agent with direct file upload, intelligent Q&A, and database management, and no manual embedding or chunking.
Moonshot AI's Kimi K2 has recently taken the spotlight. Industry figures, including a co-founder of Hugging Face, broadly agree that K2, an open-source model, already approaches top closed-source models on most capabilities. What is behind this? Two things: SOTA open-source benchmark results, and strong agent capabilities.

On results first: Kimi K2 posted SOTA scores among open-source models on multiple benchmarks, including AIME 2025, and leads open-source models on coding, agent, and tool-calling benchmarks. Users report that its coding ability is roughly on par with Claude 4 at about one fifth of Claude 4's cost, and that it comprehensively outperforms Grok 4, which was released just one day earlier.

Most importantly, Kimi K2 already shows basic task planning and tool use: tell it which tools are available, and it will call them autonomously as the task requires, knowing when to invoke a tool, when to switch, and when to stop. Its API is also compatible with the OpenAI and Anthropic API formats, so the currently popular Claude Code can be hooked up to Kimi K2 as well. It is not hard to see that Kimi K2 is aimed squarely at the agent market.

So how well does Kimi K2 actually perform, and how do you build an agent with it? This article is a hands-on tutorial: with Kimi K2 + Milvus, we will build a production-grade agent that supports direct file upload, intelligent Q&A, and database management, with no manual embedding or chunking.

01 System Architecture Overview

This is an intelligent document Q&A system built on Kimi K2 and Milvus: a single sentence from the user is enough to complete complex tasks such as file upload, vectorization, and search. Its features include automatic file upload handling and chunking, semantic search, and autonomous decision-making. The code centers on two classes:

VectorDatabase: the data-processing core of the system, responsible for all interaction with the vector database.

SmartAssistant: the brain of the system, responsible for understanding user intent and calling the appropriate tools.

The user talks to SmartAssistant in natural language; the assistant calls Kimi K2 to make decisions, and then operates the Milvus vector database through 7 dedicated tool functions.

02 Imports and Basic Configuration

Here pymilvus is the client library for the Milvus vector database, and openai is used to call both the Kimi and OpenAI APIs (this is where Kimi K2's compatibility with the OpenAI and Anthropic API formats pays off).
import json
import numpy as np
from typing import List, Dict
from pymilvus import MilvusClient, DataType
from openai import OpenAI
import time
import os
import re
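Because Kimi K2's API follows the OpenAI format, the same openai library can reach it by pointing base_url at Moonshot's endpoint. Below is a minimal sketch of such a call, including one tool declaration of the kind SmartAssistant registers; the endpoint and model name here are assumptions, so check Moonshot's documentation for the current values.

# Sketch: calling Kimi K2 through its OpenAI-compatible API.
# The base_url and model name are assumptions; verify them against Moonshot's docs.
kimi_client = OpenAI(
    api_key=os.environ["MOONSHOT_API_KEY"],  # hypothetical environment variable
    base_url="https://api.moonshot.cn/v1"    # assumed Moonshot endpoint
)

# One tool declaration in the OpenAI function-calling format; SmartAssistant
# registers seven of these, one per VectorDatabase tool function below.
connect_tool = {
    "type": "function",
    "function": {
        "name": "connect_database",
        "description": "Connect to the Milvus vector database",
        "parameters": {"type": "object", "properties": {}}
    }
}

response = kimi_client.chat.completions.create(
    model="kimi-k2-0711-preview",  # assumed model name
    messages=[{"role": "user", "content": "Connect to the database"}],
    tools=[connect_tool]
)
# When Kimi K2 decides a tool is needed, the request shows up here
print(response.choices[0].message.tool_calls)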
class VectorDatabase:
    """All interaction with the Milvus vector database lives here."""

    def __init__(self, openai_api_key: str):
        print("🔧 Initializing vector database components...")
        # OpenAI client, used to generate text embeddings
        self.openai_client = OpenAI(api_key=openai_api_key)
        self.vector_dimension = 1536  # dimension of OpenAI text-embedding-3-small
        # Milvus client, created on connect
        self.milvus_client = None
        print("✅ Vector database components initialized")
    def generate_vector(self, text: str) -> List[float]:
        """Convert a piece of text into an embedding vector."""
        response = self.openai_client.embeddings.create(
            input=[text],
            model="text-embedding-3-small"
        )
        return response.data[0].embedding
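As a quick sanity check, the embedding dimension returned here should match the vector_dimension the collection schema expects; a two-line sketch, assuming a valid OpenAI key in the environment:

# Sketch: confirm the embedding dimension matches vector_dimension
db = VectorDatabase(openai_api_key=os.environ["OPENAI_API_KEY"])
print(len(db.generate_vector("Milvus is a vector database")))  # -> 1536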
    def connect_database(self) -> dict:
        """Connect to the Milvus vector database."""
        try:
            self.milvus_client = MilvusClient(
                uri="http://localhost:19530"
            )
            return {"success": True, "message": "Connected to the Milvus vector database"}
        except Exception as e:
            return {"success": False, "message": f"Connection failed: {str(e)}"}
    def create_collection(self, collection_name: str, description: str = "") -> dict:
        """Create a document collection."""
        try:
            # Make sure we are connected first
            if self.milvus_client is None:
                return {"success": False, "message": "Please connect to the database first"}
            # Check whether the collection already exists
            if self.milvus_client.has_collection(collection_name):
                return {"success": False, "message": f"Collection {collection_name} already exists"}
            # Define the collection schema
            schema = self.milvus_client.create_schema(
                auto_id=True,
                enable_dynamic_field=False,
                description=description
            )
            # Add fields
            schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
            schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=2000)
            schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=self.vector_dimension)
            # Index parameters: IVF_FLAT with cosine similarity
            index_params = self.milvus_client.prepare_index_params()
            index_params.add_index(
                field_name="vector",
                index_type="IVF_FLAT",
                metric_type="COSINE",
                params={"nlist": 128}
            )
            # Create the collection
            self.milvus_client.create_collection(
                collection_name=collection_name,
                schema=schema,
                index_params=index_params
            )
            return {"success": True, "message": f"Collection {collection_name} created successfully"}
        except Exception as e:
            return {"success": False, "message": f"Failed to create collection: {str(e)}"}
Add documents to a collection

    def add_documents(self, collection_name: str, documents: List[str]) -> dict:
        """Add documents to a collection."""
        try:
            # Make sure we are connected first
            if self.milvus_client is None:
                return {"success": False, "message": "Please connect to the database first"}
            # Generate an embedding for each document
            print(f"📝 Generating embeddings for {len(documents)} documents...")
            vectors = []
            for doc in documents:
                vector = self.generate_vector(doc)
                vectors.append(vector)
            # Prepare the rows to insert
            data = []
            for doc, vector in zip(documents, vectors):
                data.append({
                    "text": doc,
                    "vector": vector
                })
            # Insert the rows
            result = self.milvus_client.insert(
                collection_name=collection_name,
                data=data
            )
            return {
                "success": True,
                "message": f"Added {len(documents)} documents to collection {collection_name}",
                # insert() reports "insert_count" as an int, so use it directly
                "inserted_count": result["insert_count"] if "insert_count" in result else len(documents)
            }
        except Exception as e:
            return {"success": False, "message": f"Failed to add documents: {str(e)}"}
    def search_documents(self, collection_name: str, query: str, limit: int = 5) -> dict:
        """Search for similar documents."""
        try:
            # Make sure we are connected first
            if self.milvus_client is None:
                return {"success": False, "message": "Please connect to the database first"}
            # Convert the query text into a vector
            query_vector = self.generate_vector(query)
            # Search parameters
            search_params = {
                "metric_type": "COSINE",
                "params": {"nprobe": 10}
            }
            # Run the search
            results = self.milvus_client.search(
                collection_name=collection_name,
                data=[query_vector],
                anns_field="vector",
                search_params=search_params,
                limit=limit,
                output_fields=["text"]
            )
            # Collect the results
            found_docs = []
            for result in results[0]:  # results for the single query vector
                found_docs.append({
                    "text": result["entity"]["text"],
                    # With metric_type COSINE, Milvus returns the cosine similarity
                    # itself in "distance" (higher means more similar)
                    "similarity": f"{result['distance'] * 100:.1f}%"
                })
            return {
                "success": True,
                "message": f"Found {len(found_docs)} relevant documents",
                "query": query,
                "results": found_docs
            }
        except Exception as e:
            return {"success": False, "message": f"Search failed: {str(e)}"}
List collections
Retrieve each collection's name, document count, and description
    def list_all_collections(self) -> dict:
        """List every collection in the database."""
        try:
            # Make sure we are connected first
            if self.milvus_client is None:
                return {"success": False, "message": "Please connect to the database first"}
            # Get all collection names
            collections = self.milvus_client.list_collections()
            if not collections:
                return {
                    "success": True,
                    "message": "The database has no collections yet",
                    "collections": []
                }
            # Gather details for each collection
            collection_details = []
            for collection_name in collections:
                try:
                    # Collection statistics
                    stats = self.milvus_client.get_collection_stats(collection_name)
                    doc_count = stats.get("row_count", 0)
                    # Collection description
                    desc_result = self.milvus_client.describe_collection(collection_name)
                    description = desc_result.get("description", "No description")
                    collection_details.append({
                        "name": collection_name,
                        "document_count": doc_count,
                        "description": description
                    })
                except Exception as e:
                    collection_details.append({
                        "name": collection_name,
                        "document_count": "unavailable",
                        "description": f"Error: {str(e)}"
                    })
            return {
                "success": True,
                "message": f"The database contains {len(collections)} collections",
                "total_collections": len(collections),
                "collections": collection_details
            }
        except Exception as e:
            return {"success": False, "message": f"Failed to list collections: {str(e)}"}
    def split_text_into_chunks(self, text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
        """Split a long text into smaller chunks."""
        # Clean up the text
        text = text.strip()
        # Split into paragraphs
        paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
        chunks = []
        current_chunk = ""
        for paragraph in paragraphs:
            # A paragraph longer than chunk_size needs further splitting
            if len(paragraph) > chunk_size:
                # Flush the chunk built so far
                if current_chunk:
                    chunks.append(current_chunk.strip())
                    current_chunk = ""
                # Split the long paragraph by sentence
                sentences = re.split(r'[。!?.!?]', paragraph)
                temp_chunk = ""
                for sentence in sentences:
                    sentence = sentence.strip()
                    if not sentence:
                        continue
                    if len(temp_chunk + sentence) <= chunk_size:
                        temp_chunk += sentence + "。"
                    else:
                        if temp_chunk:
                            chunks.append(temp_chunk.strip())
                        temp_chunk = sentence + "。"
                if temp_chunk:
                    chunks.append(temp_chunk.strip())
            # If adding this paragraph stays within the limit, append it
            elif len(current_chunk + paragraph) <= chunk_size:
                current_chunk += paragraph + "\n\n"
            # Otherwise flush the current chunk and start a new one
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = paragraph + "\n\n"
        # Save the final chunk
        if current_chunk:
            chunks.append(current_chunk.strip())
        # Add overlap between chunks to improve context continuity
        if overlap > 0 and len(chunks) > 1:
            overlapped_chunks = []
            for i, chunk in enumerate(chunks):
                if i == 0:
                    overlapped_chunks.append(chunk)
                else:
                    # Borrow the tail of the previous chunk as overlap
                    prev_chunk = chunks[i-1]
                    overlap_text = prev_chunk[-overlap:] if len(prev_chunk) > overlap else prev_chunk
                    overlapped_chunks.append(overlap_text + "\n" + chunk)
            chunks = overlapped_chunks
        return chunks
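To see how chunk_size and overlap interact, here is a small illustration with made-up text and a deliberately tiny chunk_size, reusing the db instance from the earlier sketch:

# Sketch: chunking behaviour with a tiny chunk_size, for illustration only
sample = ("First paragraph about Milvus.\n\n"
          "Second paragraph about Kimi K2 and agents.\n\n"
          "Third paragraph about embeddings.")
for i, c in enumerate(db.split_text_into_chunks(sample, chunk_size=60, overlap=10)):
    # Every chunk after the first begins with the tail of its predecessor
    print(f"chunk {i}: {c!r}")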
Upload a file to a collection
upload_file_to_collection calls read_and_chunk_file to split the uploaded file into chunks, generates an embedding for each chunk, and stores everything in the specified collection. Metadata enrichment: source_file records where a chunk came from, chunk_index its position in the file, and total_chunks the total number of chunks, making it easy to verify completeness.
    def read_and_chunk_file(self, file_path: str, chunk_size: int = 500, overlap: int = 50) -> dict:
        """Read a local file and split it into chunks."""
        try:
            # Check that the file exists
            if not os.path.exists(file_path):
                return {"success": False, "message": f"File not found: {file_path}"}
            # Gather basic file info
            file_size = os.path.getsize(file_path)
            file_name = os.path.basename(file_path)
            # Choose a reading strategy based on the extension
            file_ext = os.path.splitext(file_path)[1].lower()
            if file_ext in ['.txt', '.md', '.py', '.js', '.html', '.css', '.json']:
                # Text file: try several encodings in turn
                encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1']
                content = None
                used_encoding = None
                for encoding in encodings:
                    try:
                        with open(file_path, 'r', encoding=encoding) as f:
                            content = f.read()
                        used_encoding = encoding
                        break
                    except UnicodeDecodeError:
                        continue
                if content is None:
                    return {"success": False, "message": "Could not read the file: unsupported encoding"}
            else:
                # Anything else is rejected rather than read as garbage
                return {"success": False, "message": f"Unsupported file type: {file_ext}"}
            # Split the text
            chunks = self.split_text_into_chunks(content, chunk_size, overlap)
            # Attach metadata to every chunk
            chunk_data = []
            for i, chunk in enumerate(chunks):
                chunk_data.append({
                    "text": chunk,
                    "source_file": file_name,
                    "chunk_index": i,
                    "total_chunks": len(chunks)
                })
            return {
                "success": True,
                "message": f"File {file_name} read and split successfully (encoding: {used_encoding})",
                # These fields are consumed by upload_file_to_collection below
                "file_name": file_name,
                "file_size": file_size,
                "total_chunks": len(chunks),
                "average_chunk_size": sum(len(c) for c in chunks) // len(chunks) if chunks else 0,
                "chunks": chunk_data
            }
        except Exception as e:
            return {"success": False, "message": f"Failed to read file: {str(e)}"}
    def upload_file_to_collection(self, file_path: str, collection_name: str, chunk_size: int = 500, overlap: int = 50) -> dict:
        """Upload a file into the given collection."""
        try:
            # Make sure we are connected first
            if self.milvus_client is None:
                return {"success": False, "message": "Please connect to the database first"}
            # Read and chunk the file first
            result = self.read_and_chunk_file(file_path, chunk_size, overlap)
            if not result["success"]:
                return result
            chunk_data = result["chunks"]
            print(f"📝 Generating embeddings for {len(chunk_data)} text chunks...")
            # Embed every chunk
            vectors = []
            texts = []
            for chunk_info in chunk_data:
                vector = self.generate_vector(chunk_info["text"])
                vectors.append(vector)
                # Prefix each chunk with its metadata so provenance survives in the "text" field
                enriched_text = f"[file: {chunk_info['source_file']} | chunk: {chunk_info['chunk_index']+1}/{chunk_info['total_chunks']}]\n{chunk_info['text']}"
                texts.append(enriched_text)
            # Prepare the rows to insert
            data = []
            for text, vector in zip(texts, vectors):
                data.append({
                    "text": text,
                    "vector": vector
                })
            # Insert into the collection
            insert_result = self.milvus_client.insert(
                collection_name=collection_name,
                data=data
            )
            return {
                "success": True,
                "message": f"File {result['file_name']} uploaded to collection {collection_name}",
                "file_name": result["file_name"],
                "file_size": result["file_size"],
                "total_chunks": result["total_chunks"],
                "average_chunk_size": result["average_chunk_size"],
                "inserted_count": len(data),
                "collection_name": collection_name
            }
        except Exception as e:
            return {"success": False, "message": f"Failed to upload file: {str(e)}"}
connect_database
- database connection management