Introduction

Main steps:

  1. Once the frontend gets the File object, it computes the file's hash and splits the file into 2 MB chunks;
  2. As soon as slicing finishes, each chunk is sent to the chunk-upload endpoint. After all chunks are uploaded, the frontend calls the merge endpoint; the backend merges the chunks by file hash and file name, writes the resulting file, and saves an upload record to the database.

Main difficulties and pitfalls

  1. When the frontend uploads chunks in batches, the number of concurrent requests must be limited; a queue is a good way to do this.
  2. When the backend (Nest) merges the file with streams, too many chunks means too many listeners attached to the write stream, and Node raises a warning. The fix is to merge recursively: finish one chunk, then start the next.

Frontend implementation

Full code for reference.
Main frontend steps:

  1. Split the file into chunks and compute the file hash;
  2. Call the "check file" endpoint; any chunks that were already uploaded are skipped;
  3. Upload the chunks concurrently with a concurrency limit; once all are done, call the merge endpoint.

Processing the file

After getting the File object, a Worker thread is used to split the file into chunks and compute the file hash; the hashing uses the spark-md5 library.
Worker script: worker.js
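The worker script itself is not listed; a minimal sketch, assuming the `{ file, chunkSize }` message shape used by the caller and spark-md5's incremental `ArrayBuffer` API (the CDN URL is illustrative, not the project's real one):

```javascript
// worker.js (sketch)

// Pure helper: slice a File/Blob into { index, chunk } records of chunkSize bytes.
function createFileChunks(file, chunkSize) {
  const chunkList = [];
  for (let start = 0, index = 0; start < file.size; start += chunkSize, index++) {
    chunkList.push({ index, chunk: file.slice(start, start + chunkSize) });
  }
  return chunkList;
}

// Worker entry point (only runs inside a real Web Worker).
if (typeof self !== 'undefined' && typeof importScripts === 'function') {
  importScripts('https://cdn.jsdelivr.net/npm/spark-md5@3.0.2/spark-md5.min.js');
  self.onmessage = async (e) => {
    const { file, chunkSize } = e.data;
    const chunkList = createFileChunks(file, chunkSize);
    // Feed each chunk's bytes into spark-md5 incrementally to hash the whole file.
    const spark = new self.SparkMD5.ArrayBuffer();
    for (const { chunk } of chunkList) {
      spark.append(await chunk.arrayBuffer());
    }
    // Reply with the same { chunkList, hash } shape the caller destructures.
    self.postMessage({ chunkList, hash: spark.end() });
  };
}
```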

// Create chunks in a worker thread
const createChunksByWorker = (file) => {
    return new Promise((resolve, reject) => {
      const url = new URL('./worker.js', import.meta.url).href
      const myWorker = new Worker(url)
      myWorker.postMessage({ file, chunkSize: ChunkSize, })
      myWorker.onmessage = (e) => {
        resolve(e.data)
        myWorker.terminate()
      }
      // Reject instead of hanging forever if the worker fails
      myWorker.onerror = (e) => {
        reject(e)
        myWorker.terminate()
      }
    })
}

Upload entry function
Once all chunks have uploaded successfully, the merge endpoint is called directly.

  const uploadFile = async (file) => {
    // Set the file name
    fileName.value = file.name
    // Chunk the file and compute its hash
    const { chunkList, hash, } = await createChunksByWorker(file)
    fileHash.value = hash
    chunkTotal.value = chunkList.length
    const [err, data] = await checkFile({ hash, })
    if (err) {
      console.log(err, data)
      return
    }
    // Check the error before destructuring: data may be undefined on failure
    const { isExist, chunks = [], } = data
    curProgress.value = chunks.length
    if (isExist) {
      messageDanger('File already exists')
      loading.value = false
      return
    }
    // Only upload the chunks that have not been uploaded yet
    const filterChunkList = chunkList.filter(v => !chunks.includes(v.index))
    const uploaded = await uploadChunks(filterChunkList)
    if (uploaded) {
      currentIndex = 0
      const [err, data] = await mergeFile({
        chunks: chunkTotal.value,
        fileName: fileName.value,
        hash,
      })
      if (!err) {
        messageSuccess('Upload complete')
      }
      setTimeout(() => {
        starting.value = false
        loading.value = false
      }, 1000)
    }
  }

Uploading the chunks

Concurrency control
The request limit is implemented with a queue: start the maximum number of concurrent requests up front, then each time a request completes, take the next item off the queue and send it (first in, first out).

let currentIndex = 0
let requests = []
function limitRequests2 (chunks, fn, maxRequest) {
    requests = [...chunks]
    const totalRequests = chunks.length
    const requestFn = fn
    let errorNum = 0 // error count
    currentIndex = 0
    return new Promise((resolve, reject) => {
      const makeRequest = async (chunk) => {
        if (stopUpload) {
          reject(new Error('Upload stopped'))
          return
        }
        try {
          await requestFn(toFormData(chunk))
          currentIndex++
          curProgress.value++
        } catch (error) {
          errorNum++
          // Retry failed requests, up to 10 errors in total
          if (errorNum < 10) {
            console.log('Retrying request, error count:', errorNum)
            await makeRequest(chunk) // retry the same chunk
          }
        } finally {
          // Runs after the awaits above:
          // if there are still pending chunks, start the next request
          if (requests.length > 0) {
            await makeRequest(requests.shift())
          }
          if (errorNum >= 10) {
            reject(new Error('Too many errors'))
          }
          if (requests.length === 0 && currentIndex === totalRequests) {
            resolve(true)
          }
        }
      }
      // Kick off the initial batch at the maximum concurrency
      // (never shift more items than the queue holds)
      for (let i = 0; i < Math.min(maxRequest, requests.length); i++) {
        makeRequest(requests.shift())
      }
    })
  }

// Upload chunks in batches
const uploadChunks = async (chunks) => {
  try {
    const resList = await limitRequests2(chunks, uploadHandler, MaxRequest)
    return resList
  } catch (error) {
    console.log(error)
  }
}
// Convert a chunk record to FormData
const toFormData = (chunk) => {
  const fd = new FormData()
  Object.keys(chunk).forEach(k => fd.append(k, chunk[k]))
  return fd
}
// Upload a single chunk
const uploadHandler = (formData) => {
  return uploadFileRequest(formData)
}

Backend implementation

Full code for reference.
Three endpoints are needed:

  1. upload a file chunk;
  2. check whether the file has already been uploaded;
  3. merge the chunks into a file.

Multer configuration

import * as fs from 'fs';
import { Module } from '@nestjs/common';
import { MulterModule } from '@nestjs/platform-express';
import { TypeOrmModule } from '@nestjs/typeorm';
import { diskStorage } from 'multer';

@Module({
  imports: [
    TypeOrmModule.forFeature([FileStore]),
    MulterModule.registerAsync({
      imports: [],
      useFactory: async () => ({
        storage: diskStorage({
          // Folder the uploaded chunk is written to
          destination: (req, file, callback) => {
            const { hash } = req.query;
            const path = `${Config.fileConfig.filePath}tempFolder/${hash}`;
            // Create the folder first and only hand the path to Multer once
            // it exists (calling back before mkdir finishes is a race)
            fs.mkdir(path, { recursive: true }, (err) => {
              if (err) {
                console.log('Failed to create folder', err);
                return callback(err, path);
              }
              callback(null, path);
            });
          },
          filename: (req, file, cb) => {
            const { hash, index } = req.query;
            const filename = `${hash}-${index}`;
            return cb(null, filename);
          },
        }),
      }),
      inject: [],
    }),
  ],
})
export class FileModule {}

Handling the merge

async mergeFile(body: any) {
    return await new Promise(async (resolve, reject) => {
      const { hash, fileName, chunks } = body;
      const basePath = `${Config.fileConfig.filePath}tempFolder`;
      const slicePath = `${basePath}/${hash}`;
      const folderSize = await getFolderSizeBin(basePath);
      const sliceSize = await getFolderSizeBin(slicePath);
      const status = HttpStatus.INTERNAL_SERVER_ERROR;
      // 4194304 = 4 MB, 524288000 = 500 MB, 2147483648 = 2 GB
      if (folderSize > 2147483648) {
        // Reject the promise with an error response
        reject(new HttpException('Not enough disk space', status));
        return;
      }
      let files = [];
      try {
        // readdirSync throws if the folder does not exist
        files = fs.readdirSync(slicePath);
      } catch (error) {
        reject(new HttpException('File does not exist!', status));
        return;
      }
      if (chunks !== files.length) {
        reject(new HttpException('Chunk counts differ between frontend and backend; merge aborted', status));
        return;
      }
      // Chunk files are named `${hash}-${index}`; sort them by index
      const sortedFiles = files.sort((a, b) => {
        const [aIndex, bIndex] = [parseInt(a.split('-')[1]), parseInt(b.split('-')[1])];
        return aIndex - bIndex;
      });
      // Path the merged file is saved to
      const datePath = `${Config.fileConfig.filePath}${dayjs().format('YYYY-MM')}`;
      const fileNameTargetPath = `${datePath}/${hash}-${fileName}`;
      const writeStream = fs.createWriteStream(fileNameTargetPath);
      // Merge one chunk at a time
      const mergeChunk = (index: number) => {
        if (index >= sortedFiles.length) {
          // All chunks have been read and written;
          // end() must be called for the 'finish' event to fire
          writeStream.end();
          return;
        }
        const filePath = path.join(slicePath, sortedFiles[index]);
        const readStream = fs.createReadStream(filePath);
        // https://www.nodeapp.cn/stream.html#stream_class_stream_readable
        readStream.pipe(writeStream, { end: false });
        readStream.on('end', () => {
          // Piping every chunk at once would exceed the max listener count,
          // so chunks are processed strictly one by one
          // Delete the chunk file that was just merged
          fs.unlinkSync(filePath);
          // Process the next chunk
          mergeChunk(index + 1);
        });
      };
      mergeChunk(0);
      // Fired when the merged file has been fully written
      writeStream.on('finish', () => {
        console.log('Merge complete');
        const list: Partial<Express.Multer.File>[] = [];
        const fName = `${hash}-${fileName}`;
        const saveFile = {
          originalname: fileName,
          filename: fName,
          destination: datePath,
          mimetype: fileName.split('.').slice(-1)[0],
          size: sliceSize,
        };
        list.push(saveFile);
        this.resourcesService.uploadFile(list, '19f66b84-8841-4cf5-8932-d11b95947d2d');
        resolve(saveFile);
        // Remove the chunk folder
        deleteFolderRecursive(slicePath);
      });
    });
}

Links

Demo page