大厂不愧是大厂呀!就是稳定可靠,扩展性也是真的流弊。如果想在dify工作流实现一些特殊功能,最理想的方式还得创建自定义插件,然后嵌到项目里面。
前置要求:
- 使用docker成功运行dify,已创建工作流。已下载dify源码。python 3.12 以及对应的IDE(vscode/pycharm)。(可选)uv ,这样初始化虚拟环境以及依赖包安装会非常方便
参考网站:
注意事项:
- 善于使用:
docker logs -f --tail 100 dify-plugin-daemon
命令,要求随时观察插件运行情况,确保插件完全退出之后,才能再重启插件,不然会报错,参考这个:github.com/langgenius/… 。界面无法识别插件,重复启动也不行默认的生成插件源码不支持热重载;dify的文本提取器,输出是不包含页码的,我想开发一个可以输出页码的文本提取器。window10系统,vscode,以此为例:
下载插件
Releases · langgenius/dify-plugin-daemon
下载完成后,将下载好的exe(dify-plugin-windows-amd64.exe
)移动到新项目的根目录中,在该目录打开vscode。(不要修改环境变量,没什么用!)
.\dify-plugin-windows-amd64.exe plugin init
- Plugin name指的是新项目的名称,也就是会在当前目录中创建该文件夹,
my_text_exector_tool
;作者必须全英文,且小写;选python
选tool
- Tool: Tool Providers like Google Search, Stable Diffusion, etc. Used to perform specific tasks.- Model: Model Providers like OpenAI, Anthropic, etc. Use their models to enhance AI capabilities.- Endpoint: Similar to Service API in Dify and Ingress in Kubernetes. Extend HTTP services as endpoints with custom logic.- Agent Strategy: Implement your own agent strategies like Function Calling, ReAct, ToT, CoT, etc.Based on the ability you want to extend, Plugins are divided into four types: Tool, Model, Extension, and Agent Strategy.- Tool: A tool provider that can also implement endpoints. For example, building a Discord Bot requires both Sending and Receiving Messages, so both Tool and Endpoint functionality.- Model: Strictly for model providers, no other extensions allowed.- Extension: For simple HTTP services that extend functionality.- Agent Strategy: Implement custom agent logic with a focused approach.We've provided templates to help you get started. Choose one of the options below:-> tool agent-strategy llm text-embedding rerank tts speech2text moderation extension
建议选tool,功能很全。理论上来讲,我这个功能extionsion应该也能干,官方文档真看不懂(ノへ ̄、)。
Configure the permissions of the plugin, use up and down to navigate, tab to select, after selection, press enter to finishBackwards Invocation:Tools:功能 :允许在 Dify(工作流) 中调用自定义工具(如 PDF 解析库、数据处理脚本等) → Enabled: [✘] You can invoke tools inside Dify if it's enabled Models:功能 :启用后可调用 LLM、文本嵌入、TTS 等 AI 模型。 Enabled: [✘] You can invoke models inside Dify if it's enabled LLM: [✘] You can invoke LLM models inside Dify if it's enabled Text Embedding: [✘] You can invoke text embedding models inside Dify if it's enabled Rerank: [✘] You can invoke rerank models inside Dify if it's enabled TTS: [✘] You can invoke TTS models inside Dify if it's enabled Speech2Text: [✘] You can invoke speech2text models inside Dify if it's enabled Moderation: [✘] You can invoke moderation models inside Dify if it's enabled Apps:功能 :调用 Dify 内部应用(如聊天机器人、工作流)。 Enabled: [✘] Ability to invoke apps like BasicChat/ChatFlow/Agent/Workflow etc. Resources:功能 :持久化存储插件生成的数据(如解析结果、缓存文件)。Storage: Enabled: [✘] Persistence storage for the plugin Size: N/A The maximum size of the storage Endpoints:功能 :对外暴露 API 接口,供外部服务调用插件功能。 Enabled: [✘] Ability to register endpoints
插件要求解析pdf文件,输出页码,所以不需要Models功能,勾选Tools、Storage和EndPoints,
跳过
修改代码
初始化
复制prompt,将该页面文本复制到deepseek或其他大模型中中,大模型会变得非常智能docs.dify.ai/plugin-dev-…
创建虚拟环境
cd my_text_exector_tooluv init --python 3.12# 保证.python-version的内容为:3.12 # 创建虚拟环境uv venv .venv/Scripts/activatecp .env.example .env # pip install -r requirements.txt uv add -r requirements.txt
打开dify项目网站,将KEY复制到.env的REMOTE_INSTALL_KEY中,REMOTE_INSTALL_URL为difyIP地址:5003
,REMOTE_INSTALL_KEY 一定时间后会失效,需重新设置。
# 调试,启动python main.py{"event": "log", "data": {"level": "INFO", "message": "Installed tool: my_text_exector_tool", "timestamp": 1751855707.8209617}}INFO:dify_plugin.plugin:Installed tool: my_text_exector_tool
打开dify后端服务器,实时查看日志docker logs -f --tail 100 dify-plugin-daemon
注意:只有完全后端日志显示完全退出后,才能在此启动插件
界面和代码的对应关系
# provider\my_text_exector_tool.yamlidentity: author: "wwwwwwww" name: "my_text_exector_tool" label: en_US: "文本解析器provider" zh_Hans: "文本解析器provider" pt_BR: "my_text_exector_tool" description: en_US: "test" zh_Hans: "test" pt_BR: "test" icon: "icon.svg"tools: - tools/my_text_exector_tool.yamlextra: python: source: provider/my_text_exector_tool.py #---------------# # tools\my_text_exector_tool.yamlidentity: name: "my_text_exector_tool" author: "wwwwwwww" label: en_US: "my_text_exector_tool" zh_Hans: "文本解析器tool" pt_BR: "my_text_exector_tool"description: human: en_US: "工具" zh_Hans: "工具" pt_BR: "工具" llm: "工具"parameters: - name: file_info type: file required: true label: en_US: PDF File zh_Hans: PDF 文件 human_description: en_US: Upload the PDF file to parse. zh_Hans: 要解析的 PDF 文件。 llm_description: The PDF file to be parsed for text. form: llm # 参数在 UI 中填入(让用户上传文件)extra: python: source: tools/my_text_exector_tool.pyoutput_schema: type: object properties: name: type: string
pyproject.toml 部分dependencies = [ "dify-plugin>=0.2.0,<0.3.0", "pypdfium2==4.30.0",]
uv sync
from collections.abc import Generatorfrom typing import Anyfrom dify_plugin import Toolfrom dify_plugin.entities.tool import ToolInvokeMessageimport ioimport dify_plugin.file.fileimport pypdfium2import requestsimport osclass MyTextExectorToolTool(Tool): def get_uploaded_file(self, file_info: dify_plugin.file.file.File) -> bytes: # dify网站前端地址,例如:192.168.127.1:5201,只能强行指定或者通过环境变量获取 base_url = os.environ.get("DIFY_API_HOST", "192.168.127.1:5201") # 从运行时环境获取API密钥 try: file_dict = file_info.model_dump() url = file_dict.get("url") # print(file_dict) # 1. 获取文件元数据 file_url = f"http://{base_url}{url}" # print("file_url:", file_url) # 3. 下载文件内容 file_response = requests.get(file_url) file_response.raise_for_status() return file_response.content except requests.RequestException as e: raise Exception(f"File download failed: {str(e)}") except ValueError as e: raise Exception(f"Invalid file data: {str(e)}") def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessage]: try: """ IMPLEMENT YOUR VALIDATION HERE """ print("开始处理PDF文件") # 1. 从参数中获取文件ID (与YAML中的'name'字段对应) # print(tool_parameters) file_info: dify_plugin.file.file.File = tool_parameters.get( "file_info") # print(type(file_info)) # print(file_info) if not file_info: yield self.create_text_message("错误:必须上传一个文件。") return # 2. 使用Dify API获取文件内容(字节流) file_bytes = self.get_uploaded_file(file_info) if not file_bytes: yield self.create_text_message("错误:无法获取上传的文件内容。") return file_detail: dict[str, Any] = file_info.model_dump() # 3. 使用PyMuPDF(fitz)从内存中解析PDF字节流 extracted_pages = [] if file_detail["extension"] == ".pdf": pdf_file = io.BytesIO(file_bytes) pdf_document = pypdfium2.PdfDocument(pdf_file, autoclose=True) text = "" page_num = 0 for page in pdf_document: # page.get_ page_num += 1 text_page = page.get_textpage() extracted_pages.append({ "page_number": page_num, "text": text_page.get_text_range() }) text += text_page.get_text_range() + f"\n\n 页码:{page_num}" text_page.close() print(f"已提取第{page_num}页内容") page.close() pdf_document.close() # 5. 检查是否提取到内容 if not extracted_pages: yield self.create_text_message("未能从PDF中提取任何文本内容。") return # 6. 返回结构化的JSON结果(符合您的output_schema) yield self.create_json_message({"pages": extracted_pages}) return except Exception as e: # 捕获所有异常,并返回明确的错误信息 yield self.create_text_message(f"处理PDF时发生严重错误: {str(e)}")
工具嵌入到 dify 工作流
运行
python main.py
- 非常容易出现找不到parameters的
file_info
属性的bug,这个没多少办法,可以尝试刷新缓存必须通过:yield
来返回值点击运行,成功的样式如图,可以看到输入的import_files
变成了parameters中的file_info
;
获取输出参数
获取页码之后,对内容简单分组。(右键添加节点->代码执行)
# 分段def main(input_text): input_text = input_text[0]["pages"] print(input_text) max_length = 20000 grouped_data = [] current_group = {"group": len(grouped_data) + 1, "items": [], "total_length": 0} for item in input_text: text = item.get("text", "") text_length = len(text) print(len(text)) # 如果当前组为空或者加上新文本后超过最大长度,则新建一组 if current_group["total_length"] + text_length > max_length: if current_group["items"]: grouped_data.append(current_group) last_current_group = current_group # if last_current_group["items"].__len__() > = 1: last_current_group_items = last_current_group["items"] d = [last_current_group_items[-1]] if len(last_current_group_items)>0 else [] current_group = { "group": len(grouped_data) + 1, "items": d, "total_length": 0, } # 添加当前文本到组中 current_group["items"].append(item) current_group["total_length"] += text_length # 将最后一组加入结果 if current_group["items"]: grouped_data.append(current_group) grouped_data = grouped_data[:30] return {"result": grouped_data}
注意
输入变量和python函数的入参名必须保持一致
输出变量应python函数的返回值(字典)保持一致
代码执行的主函数为main()
可以通过上次执行查看数据结构