稀土掘金技术社区 04月23日 10:07
性能飞跃!Node.js 亿级文件读写优化
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文探讨了如何优化Node.js环境下的文件处理,特别是针对大文件场景。文章首先分析了传统方案在处理大型文本文件时遇到的内存和CPU瓶颈,如全量加载、单线程CPU过载和I/O瓶颈。接着,文章对比了多种文件处理方式,包括同步/异步读写、流式读写、内存映射等,并分析了它们各自的优缺点。最后,作者提出了一种基于多线程和字节处理的优化方案,通过分块、并行处理和减少字符串操作来提升性能。

💡 传统方案的局限性:文章指出了传统文件处理方案在处理大文件时常遇到的问题,如内存爆仓、CPU过载以及I/O瓶颈,这些问题导致服务稳定性下降。

💻 常见文件处理方式对比:文章对比了Node.js中多种文件处理方式,包括同步读写、回调异步读写、Promise异步读写、流式读写和内存映射。分析了不同方式的适用场景和优缺点,为开发者提供了选择的参考。

🚀 多线程与字节处理优化方案:文章重点介绍了基于多线程和字节处理的优化方案。该方案通过多线程并行处理文件,并使用Buffer进行字节级别的处理,减少了字符串操作,从而显著提升了处理速度。该方案还涉及到分块处理和自定义分隔符的处理。

✅ 优化方案的关键技术细节:文章详细阐述了优化方案的关键技术细节,包括使用多线程、Buffer字节处理、自定义分隔符以及流式读取。这些技术细节的结合,使得该方案能够高效地处理大文件,并有效避免了传统方案的性能瓶颈。

原创 kevlin_coder 2025-04-23 08:30 重庆

点击关注公众号,“技术干货”及时达!

💰即日起至5.11发文瓜分万元奖金
📃主赛道推好文,AI命题叠福利
🔎点击下方活动图了解详情


业务背景

所负责的业务场景是从hdfs中捞取生成的数据生成文本文件(txtcsv)等格式,并且压缩上传到对象存储,文件大小从几kb到几十个gb, 数据行数几十亿,经常是不是的内存就是CPU告警,迫切解决服务稳定性问题。

🔥 痛点爆破:为何传统方案频发内存 / CPU 告警?

    内存黑洞:全量加载百 GB 文件导致堆内存爆仓
    CPU 过载:单线程核心利用率飙至 440%+
    I/O 风暴:频繁读写文件

客户端(浏览器)上传场景

回顾下前端上传文件时对文件的处理的常见方式

    通过JSON传递
      优点:这种方式是最舒适的传递方式,首先前端把文件对象转换成base64,再以字符串的形式传递给服务端。通过这种方式可以享受到JSON的灵活性,需要考虑的问题较少。
      缺点:base64编码会导致传输体积增大,并且这个计算过程也需要消耗浏览器有限的资源。在移动端如果通过这个方式发送稍微大一些的文件,就会感觉到温度和卡顿了。
    通过formData传输
      优点:直接传递二进制文件给服务端,没有额外的带宽和性能开销。
      缺点:在TypeScript开发环境下,formData的使用体验就是不得劲。并且formData是扁平的,不能对象嵌套对象,但是JSON可以。以及服务端接收formData需要写的代码量也比JSON多。
    向服务端请求token后,将文件上传到对象存储
      优点:通常对象存储会封装很多逻辑,上传控制各方面也不需要前端关心了。
      缺点:逻辑会比较复杂,往往需要先和业务逻辑请求一次,再去请求对象存储服务器,上传成功后再请求业务逻辑。 如果文件很小,就几百KB,那毫不犹豫会选择方法 1,实在是太爽了。如果是 MB 级别,还是formData比较好。再大会倾向于方案 3 分片上传下载 + 断点续传 (这里就不展开描述了)

Nodejs 读写文件

同步读写(最简单直接)

const fs = require('node:fs');

// 同步读取
const data = fs.readFileSync('/path/to/small/file.txt', 'utf8');
console.log(data);

// 同步写入
fs.writeFileSync('/path/to/output.txt', 'Hello World', 'utf8');


回调异步读写(传统方式)

const fs = require('node:fs');

// 异步读取
fs.readFile('/path/to/small/file.txt', 'utf8', (err, data) => {
  if (err) throw err;
  console.log(data);
  
  // 异步写入
  fs.writeFile('/path/to/output.txt', data, 'utf8', (err) => {
    if (err) throw err;
  });
});


Promise 异步读写(现代方式)

const fs = require('node:fs/promises');

async function readWriteFile() {
  try {
    // Promise方式读取
    const data = await fs.readFile('/path/to/small/file.txt', 'utf8');
    console.log(data);
    
    // Promise方式写入
    await fs.writeFile('/path/to/output.txt', data, 'utf8');
  } catch (err) {
    console.error(err);
  }
}

readWriteFile();


流式读写(适合稍大文件或需要逐行处理)

const fs = require('node:fs');
const readline = require('node:readline');

// 流式读取
const readStream = fs.createReadStream('/path/to/small/file.txt', {
  encoding: 'utf8',
  highWaterMark: 1024 // 每次读取的字节数
});

// 流式写入
const writeStream = fs.createWriteStream('/path/to/output.txt');

// 逐行处理
const rl = readline.createInterface({
  input: readStream,
  crlfDelay: Infinity
});

rl.on('line', (line) => {
  writeStream.write(`${line}\n`);
});

rl.on('close', () => {
  writeStream.end();
});


const fs = require('node:fs');

function copyLargeFile(src, dest) {
  return new Promise((resolve, reject) => {
    // 1. 创建可读流(128MB分块)
    const readStream = fs.createReadStream(src, {
      highWaterMark: 128 * 1024 * 1024
    });

    // 2. 创建可写流(64MB缓冲)
    const writeStream = fs.createWriteStream(dest, {
      highWaterMark: 64 * 1024 * 1024
    });

    // 3. 管道连接与错误处理
    readStream
      .on('error', reject)
      .pipe(writeStream)
      .on('error', reject)
      .on('finish', () => {
        console.log(`文件 ${src} 已成功拷贝至 ${dest}`);
        resolve();
      });
  });
}

// 使用示例
copyLargeFile('xxx.txt', 'copy_xxx.txt')
  .catch(err => console.error('处理失败:', err));


内存映射(高性能方式)

const fs = require('node:fs');
const { Buffer } = require('node:buffer');

// 内存映射读取
fs.open('/path/to/small/file.txt', 'r', (err, fd) => {
  if (err) throw err;

  const stats = fs.fstatSync(fd);
  const buffer = Buffer.alloc(stats.size);

  fs.read(fd, buffer, 0, buffer.length, 0, (err) => {
    if (err) throw err;
    console.log(buffer.toString('utf8'));
    fs.close(fd);
  });
});


当前业务实现

核心片段

const fs = require('node:fs');
const through2 = require('through2');
const split = require('split');

// 1. 创建可读流
fs.createReadStream('ex.txt')
    .on('error', reject)
    .pipe(split(os.EOL))
    .on('error', reject)
    .pipe(
    .pipe(through2(function (chunk, enc, callback) {
        // 逻辑处理
  }))
  // 3. 创建写入流
  .pipe(fs.createWriteStream('out.txt'))
  // 4. 完成回调
  .on('finish', () => doSomethingSpecial());



    through2[1]
      是 Node.js 流(Streams)工具库,用于快速创建 Transform 流(数据转换流),无需手动继承 stream.Transform 并编写子类。它通过简洁的函数式 API,简化了流的定义和操作,适用于对数据进行中间处理(如修改、过滤、转换等)。
    输入流:fs.createReadStream创建可读流
    转换流水线:
      split(os.EOL):按行分割文本
    输出流:fs.createWriteStream写入目标文件

分析代码并处理

代码问题

    使用createReadStreampipe来处理文件,用到了splitthrough2库来按行处理。这可能存在性能问题
    因逐行处理字符串可能比较慢,尤其是在处理大文件时。避免使readline和字符串分割,改用直接处理字节,这样可以提高速度
    未开启多线程,可以尝试将文件分成多个块,每个工作线程处理一个块,独立处理其中的行和分隔符

最终实现

import * as fs from 'fs';
import * as os from 'os';
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';

// 配置常量
const FILE_DELIMITER = 'YOUR_FILE_DELIMITER';
const EOL = os.EOL;
const EOL_BUFFER = Buffer.from(EOL);
const DELIMITER_LINE = Buffer.from(FILE_DELIMITER + EOL);
const WORKER_COUNT = os.cpus().length;

// 主线程预处理函数
async function findDelimiterPositions(filePath: string) {
  const fileSize = (await fs.promises.stat(filePath)).size;
  const chunkSize = 1024 * 1024;
  const positions: number[] = [];
  let previousRemaining = Buffer.alloc(0);

  const fd = await fs.promises.open(filePath, 'r');
  try {
    for (let offset = 0; offset < fileSize;) {
      const buffer = Buffer.alloc(chunkSize + DELIMITER_LINE.length);
      const { bytesRead } = await fd.read(buffer, 0, chunkSize, offset);
      if (!bytesRead) break;

      const combined = Buffer.concat([previousRemaining, buffer.subarray(0, bytesRead)]);
      let pos = 0;

      while (pos < combined.length) {
        const idx = combined.indexOf(DELIMITER_LINE, pos);
        if (idx === -1) {
          previousRemaining = combined.subarray(pos);
          break;
        }
        positions.push(offset - previousRemaining.length + idx);
        pos = idx + DELIMITER_LINE.length;
      }

      offset += bytesRead;
    }
  } finally {
    await fd.close();
  }
  return positions;
}

// 工作线程处理逻辑
function workerProcess() {
  const { filePath, start, end, filePo } = workerData;
  const stream = fs.createReadStream(filePath, { start, end });
  const result: Buffer[] = [];
  let stationBuffer = Buffer.alloc(100);
  let tempBuffer = Buffer.alloc(5);
  let linePo = 0;

  // 字节处理状态机
  let state = 0; // 0: 正常行处理,1: 分隔符处理
  let buffer = Buffer.alloc(0);

  return new Promise<void>((resolve) => {
    stream.on('data', (chunk: Buffer) => {
      buffer = Buffer.concat([buffer, chunk]);

      while (true) {
       olIndex = buffer.indexOf(EOL_BUFFER);
        if (eolIndex === -1) break;

        const line = buffer.subarray(0, eolIndex);
        buffer = buffer.subarray(eolIndex + EOL_BUFFER.length);

        if (line.equals(DELIMITER_LINE.subarray(0, line.length))) {
          // 分隔符行直接跳过
          continue;
        }

        // 字节到字符串转换优化
        const transformed = processLine(line, linePo, filePo);
        result.push(Buffer.from(transformed + EOL));
        linePo++;
      }
    });

    stream.on('end', () => {
      parentPort.postMessage({
        filePo,
        data: Buffer.concat(result)
      });
      resolve();
    });
  });
}

// 优化的行处理函数
function processLine(lineBuffer: Buffer, linePo: number, filePo: number) {
  // 实现你的自定义转换逻辑
  return lineBuffer.toString() + `_processed_${filePo}_${linePo}`;
}

// 主线程逻辑
async function main() {
  const filePath = process.argv[2];
  const outputPath = process.argv[3];

  const delimiterPositions = await findDelimiterPositions(filePath);
  const fileSize = (await fs.promises.stat(filePath)).size;
  const parts = [];
  let prevPos = 0;

  // 生成处理区间
  for (const pos of delimiterPositions) {
    parts.push({ start: prevPos, end: pos, filePo: parts.length });
    prevPos = pos + DELIMITER_LINE.length;
  }
  parts.push({ start: prevPos, end: fileSize, filePo: parts.length });

  // 创建工作线程
  const workers = new Map();
  const results = new Map();

  parts.forEach((part, idx) => {
    const worker = new Worker(__filename, {
      workerData: { ...part, filePath }
    });
    
    worker.on('message', (msg) => {
      results.set(msg.filePo, msg.data);
      if (results.size === parts.length) {
        // 按顺序写入结果
        const sorted = Array.from(results.keys()).sort((a, b) => a - b);
        const ws = fs.createWriteStream(outputPath);
        sorted.forEach(po => ws.write(results.get(po)));
        ws.end();
      }
    });
    
    workers.set(idx, worker);
  });
}

// 启动入口
if (isMainThread) {
  main();
} else {
  workerProcess();
}


总结

    多用 ai 分析当前代码可能存在的问题
    替换逐行的字符串处理为基于 Buffer 的字节处理,减少 split 和字符串转换的开销
    尽可能的提升代码的执行速度,开启多线程

参考文档

jackyef.com/posts/1brc-…[2]

github.com/rvagg/throu…[3]

关注更多AI编程资讯请去AI Coding专区:https://juejin.cn/aicoding

阅读原文

跳转微信打开

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

Node.js 文件处理 大文件 性能优化
相关文章