Skip to content

大文件上传

交代背景

在聊天应用中,需要上传文件

问题

  • 网络断开后,之前上传的文件,需要重新上传
  • 传着传着,网络波动啥都没有了
  • 没电关机了,想接着传,做不到

专业术语对应:

  • 切片上传
  • 文件秒传
  • 断点续传
  • 断开重连重传

方案

  • 前端切片 chunk 1024M(1048576K),500K,const size = 1048576 / 500; // 分片数量
  • 将切片传递给后代,切片要取名:hash + index
  • 后端组合切片

加料

  • 前端切片:主进程做切片会卡顿,使用 web-worker 多线程切片,处理完后交给主进程发送给后端
  • 切完后,将 blob 存储到 indexdb 中,下次用户进来后,判断一下是否存在未完成上传的切片,有就尝试继续上传
  • websocket 实时上传进度,实时通知,和请求序列控制
  • 整体说大文件上传设计
    • 组件设计
    • props、事件、状态
    • 拖拽上传、多文件选择
    • 通用化不同文件的上传

整套流程

  • 原理
    • 大文件 -> 小文件
    • 逐个上传分片文件
    • 记录上传的进度
    • 发请求通知服务器进行分片合并
  • 实现步骤
    • FileReader 读取文件的内容,计算分片hash
    • 通过 FormData 上传分片,并添加文件hash,索引等标识
    • 服务端接收到分片,并保存到临时目录
    • 发送合并分片请求,服务端合并分片,并返回合并后的文件路径

具体实现

文件分片

点击上传按钮,选择文件后,调用 handleUpload

示例代码
ts
// hash
let fileHash = ref<string>('')
// 文件名
let fileName = ref<string>('')
// 分片大小 1m
const CHUNK_SIZE = 1024 * 1024

const handleUpload = async (e: Event) => {
  const files = (e.target as HTMLInputElement).files
  if (!files) return
  // 文件名
  fileName.value = files[0].name // 上传需要的数据

  // 文件分片
  let chunks = createChunks(files[0]) // 数组
  console.log(chunks)

  // 文件hash计算
  const hash = await calculateHash(chunks)
  fileHash.value = hash as string

  // 上传分片
  uploadChunks(chunks)
}

通过 file.size 进行分片

⚠️ 注意:分片的速度是非常快的,因为 FileBlob 对象都只保存了文件的基本信息(大小、类型、文件名),没有保存全部的文件数据,后续需要读取文件数据的时候,要用 FileReader 进行读取。

js
// 文件分片
const createChunks = (file: File) => {
  let chunks = []
  for (let i = 0; i < file.size; i += CHUNK_SIZE) {
    chunks.push(file.slice(i, i + CHUNK_SIZE))
  }
  return chunks
}

计算文件 hash(增量md5算法)

目的:

  1. 确保数据完整性
  2. 避免重复上传
  3. 实现断点续传
  4. 文件唯一性标识

减少计算等待时间方法有:

  1. 计算部分 hash(例如:第一个和最后一个切片全部参与计算,中间的切片只计算前面两个字节、中间两个字节、最后两个字节,但是文件过大还是会耗时较长)
示例代码
js
// 计算文件的hash
const calculateHash = (chunks: Blob[]) => {
  return new Promise((resolve) => {
    // 1. 第一个和最后一个切片全部参与计算
    // 2. 中间的切片只计算前面两个字节、中间两个字节、最后两个字节

    const targets: Blob[] = [] // 存储所有参与计算的切片 blob数组

    // 循环产生新的分片数组
    chunks.forEach((chunk, index) => {
      if (index === 0 || index === chunks.length - 1) {
        // 1. 第一个和最后一个切片全部参与计算
        targets.push(chunk)
      } else {
        // 2. 中间的切片只计算前面两个字节、中间两个字节、最后两个字节
        // chunk也是一个blob对象, 大小是CHUNK_SIZE
        targets.push(chunk.slice(0, 2)) // 前面两个字节
        targets.push(chunk.slice(CHUNK_SIZE / 2, CHUNK_SIZE / 2 + 2)) // 中间两个字节
        targets.push(chunk.slice(CHUNK_SIZE - 2, CHUNK_SIZE)) // 最后两个字节
      }
    })

    const spark = new SparkMD5.ArrayBuffer() // 初始化一个SparkMD5实例,用于计算ArrayBuffer类型的哈希值
    const fileReader = new FileReader() // 创建一个FileReader对象,用于读取文件或Blob数据

    // new Blob(targets) 将新分片数据转为blob对象
    // 文档用法:提供了一种异步读取文件或Blob数据的方式。
    fileReader.readAsArrayBuffer(new Blob(targets)) // 将Blob对象转换为ArrayBuffer格式进行读取
    // onload方法是异步的
    fileReader.onload = (e) => {
      spark.append((e.target as FileReader).result as ArrayBuffer)
      // 拿到计算出来的hash值
      resolve(spark.end())
    }
  })
}
  1. 推荐:采用 web-worker 多线程进行优化(主要耗时是在计算文件 hash)

思路:(切片总数 / 开启线程的数量) = 一个线程的切片数,循环开启多个线程,去计算 md5,结束后调用 worker.terminate() 关闭线程

上传分片

当切片全部上传成功,发送一个“合并切片”请求,告诉后端开始合并切片

示例代码
js
// 上传分片
const uploadChunks = async (chunks: Blob[], existChunks: string[]) => {
  const dataObj = chunks.map((chunk, index) => {
    // 返回上传需要的数据
    return {
      fileHash: fileHash.value, // 文件的hash: 区分上传的是哪个文件
      chunkHash: fileHash.value + '-' + index, // 切片的hash
      chunk // chunk:每个blob对象
    }
  })

  // 每个切片都要有formData对象
  const formDatas = dataObj
    .filter((item) => !existChunks.includes(item.chunkHash))
    .map((item) => {
      const formData = new FormData()
      formData.append('fileHash', item.fileHash)
      formData.append('chunkHash', item.chunkHash)
      formData.append('chunk', item.chunk)
      return formData
    })

  const max = 6 // 最大并发请求数
  const taskPool: any = [] // 请求池: 用来存放当前执行的请求 Promise数组

  let num = 0 // 当前上传成功数量

  for (let i = 0; i < formDatas.length; i++) {
    const task = fetch('http://127.0.0.1:3000/upload', {
      method: 'POST',
      body: formDatas[i]
    })

    // 请求完成从请求池移除
    task.then(() => {
      num++
      progressNum.value =
        (Math.round((num / formDatas.length) * 100) / 100) * 100
      taskPool.splice(taskPool.findIndex((item: any) => item === task))
    })

    taskPool.push(task) // 将每个请求放入请求池数组中

    // 请求池已经达到最大请求数, 需要等待请求池中要有完成的请求(完成一个就行)
    if (taskPool.length === max) {
      // 到第7个时候,如果前面的还没有执行完,这个for循环会暂停
      await Promise.race(taskPool) // 一个完成 promise状态为成功
    }
  }

  // 为了保证请求池中的请求全部完成
  await Promise.all(taskPool)

  // 全部完成后, 通知服务器去合并分片
  mergeRequest()
}

// 合并分片请求
const mergeRequest = () => {
  fetch('http://127.0.0.1:3000/merge', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      fileHash: fileHash.value,
      fileName: fileName.value,
      size: CHUNK_SIZE
    })
  }).then((res) => {
    console.log('合并成功')
  })
}

文件秒传

实现思路:在上传文件前,就要把文件 hash 值告诉后端, 如果后端有该文件,直接返回上传成功

示例代码
js
const handleUpload = async (e: Event) => {
  const files = (e.target as HTMLInputElement).files
  if (!files) return

  // 文件名
  fileName.value = files[0].name // 上传需要的数据

  // 文件分片
  let chunks = createChunks(files[0]) // 数组

  // 文件hash计算
  const hash = await calculateHash(chunks, files[0])
  fileHash.value = hash as string

  // 文件hash校验  (文件秒传 在上传文件前,就要把文件hash值告诉后端, 如果后端有该文件,直接返回上传成功)
  let shouldUpload = await verify() 
  if (shouldUpload.data.shouldUpload) { 
    // true: 服务器没有该文件需要上传
    // 上传分片
    uploadChunks(chunks, shouldUpload.data?.existChunks) 
  } 

  // 上传分片
  uploadChunks(chunks)
}

// 校验文件hash
const verify = () => { 
  return fetch('http://127.0.0.1:3000/verify', { 
    method: 'POST', 
    headers: { 
      'Content-Type': 'application/json'
    }, 
    body: JSON.stringify({ 
      fileHash: fileHash.value, 
      fileName: fileName.value // 用来获取文件后缀的
    }) 
  }) 
    .then((res) => res.json()) 
    .then((data) => { 
      return data 
    }) 
} 

断点续传

前端发送请求,看后端是否存在切片,如果有切片,只需要补剩余切片内容就行

js
// 获取已上传的切片信息
const response = await fetch(`/check-chunks?filehash=${fileHase.value}`)

断开重连重传

断开重连重传,前端需要记录上传进度,断开重连时,需要从断点处继续上传。可以通过 socket.io(因为有心跳重连机制),当网络重连成功,内容没有上传完成,就继续接着上传。