Appearance
大文件上传
交代背景
在聊天应用中,需要上传文件
问题
- 网络断开后,之前上传的文件,需要重新上传
- 传着传着,网络波动啥都没有了
- 没电关机了,想接着传,做不到
专业术语对应:
- 切片上传
- 文件秒传
- 断点续传
- 断开重连重传
方案
- 前端切片 chunk 1024M(1048576K),500K,const size = 1048576 / 500; // 分片数量
- 将切片传递给后代,切片要取名:hash + index
- 后端组合切片
加料
- 前端切片:主进程做切片会卡顿,使用 web-worker 多线程切片,处理完后交给主进程发送给后端
- 切完后,将 blob 存储到 indexdb 中,下次用户进来后,判断一下是否存在未完成上传的切片,有就尝试继续上传
- websocket 实时上传进度,实时通知,和请求序列控制
- 整体说大文件上传设计
- 组件设计
- props、事件、状态
- 拖拽上传、多文件选择
- 通用化不同文件的上传
整套流程
- 原理
- 大文件 -> 小文件
- 逐个上传分片文件
- 记录上传的进度
- 发请求通知服务器进行分片合并
- 实现步骤
- FileReader 读取文件的内容,计算分片hash
- 通过 FormData 上传分片,并添加文件hash,索引等标识
- 服务端接收到分片,并保存到临时目录
- 发送合并分片请求,服务端合并分片,并返回合并后的文件路径
具体实现
文件分片
点击上传按钮,选择文件后,调用 handleUpload
示例代码
ts
// hash
let fileHash = ref<string>('')
// 文件名
let fileName = ref<string>('')
// 分片大小 1m
const CHUNK_SIZE = 1024 * 1024
const handleUpload = async (e: Event) => {
const files = (e.target as HTMLInputElement).files
if (!files) return
// 文件名
fileName.value = files[0].name // 上传需要的数据
// 文件分片
let chunks = createChunks(files[0]) // 数组
console.log(chunks)
// 文件hash计算
const hash = await calculateHash(chunks)
fileHash.value = hash as string
// 上传分片
uploadChunks(chunks)
}
通过 file.size
进行分片
⚠️ 注意:分片的速度是非常快的,因为
File
和Blob
对象都只保存了文件的基本信息(大小、类型、文件名),没有保存全部的文件数据,后续需要读取文件数据的时候,要用FileReader
进行读取。
js
// 文件分片
const createChunks = (file: File) => {
let chunks = []
for (let i = 0; i < file.size; i += CHUNK_SIZE) {
chunks.push(file.slice(i, i + CHUNK_SIZE))
}
return chunks
}
计算文件 hash(增量md5算法)
目的:
- 确保数据完整性
- 避免重复上传
- 实现断点续传
- 文件唯一性标识
减少计算等待时间方法有:
- 计算部分 hash(例如:第一个和最后一个切片全部参与计算,中间的切片只计算前面两个字节、中间两个字节、最后两个字节,但是文件过大还是会耗时较长)
示例代码
js
// 计算文件的hash
const calculateHash = (chunks: Blob[]) => {
return new Promise((resolve) => {
// 1. 第一个和最后一个切片全部参与计算
// 2. 中间的切片只计算前面两个字节、中间两个字节、最后两个字节
const targets: Blob[] = [] // 存储所有参与计算的切片 blob数组
// 循环产生新的分片数组
chunks.forEach((chunk, index) => {
if (index === 0 || index === chunks.length - 1) {
// 1. 第一个和最后一个切片全部参与计算
targets.push(chunk)
} else {
// 2. 中间的切片只计算前面两个字节、中间两个字节、最后两个字节
// chunk也是一个blob对象, 大小是CHUNK_SIZE
targets.push(chunk.slice(0, 2)) // 前面两个字节
targets.push(chunk.slice(CHUNK_SIZE / 2, CHUNK_SIZE / 2 + 2)) // 中间两个字节
targets.push(chunk.slice(CHUNK_SIZE - 2, CHUNK_SIZE)) // 最后两个字节
}
})
const spark = new SparkMD5.ArrayBuffer() // 初始化一个SparkMD5实例,用于计算ArrayBuffer类型的哈希值
const fileReader = new FileReader() // 创建一个FileReader对象,用于读取文件或Blob数据
// new Blob(targets) 将新分片数据转为blob对象
// 文档用法:提供了一种异步读取文件或Blob数据的方式。
fileReader.readAsArrayBuffer(new Blob(targets)) // 将Blob对象转换为ArrayBuffer格式进行读取
// onload方法是异步的
fileReader.onload = (e) => {
spark.append((e.target as FileReader).result as ArrayBuffer)
// 拿到计算出来的hash值
resolve(spark.end())
}
})
}
- 推荐:采用 web-worker 多线程进行优化(主要耗时是在计算文件 hash)
思路:
(切片总数 / 开启线程的数量) = 一个线程的切片数
,循环开启多个线程,去计算 md5,结束后调用worker.terminate()
关闭线程
上传分片
当切片全部上传成功,发送一个“合并切片”请求,告诉后端开始合并切片
示例代码
js
// 上传分片
const uploadChunks = async (chunks: Blob[], existChunks: string[]) => {
const dataObj = chunks.map((chunk, index) => {
// 返回上传需要的数据
return {
fileHash: fileHash.value, // 文件的hash: 区分上传的是哪个文件
chunkHash: fileHash.value + '-' + index, // 切片的hash
chunk // chunk:每个blob对象
}
})
// 每个切片都要有formData对象
const formDatas = dataObj
.filter((item) => !existChunks.includes(item.chunkHash))
.map((item) => {
const formData = new FormData()
formData.append('fileHash', item.fileHash)
formData.append('chunkHash', item.chunkHash)
formData.append('chunk', item.chunk)
return formData
})
const max = 6 // 最大并发请求数
const taskPool: any = [] // 请求池: 用来存放当前执行的请求 Promise数组
let num = 0 // 当前上传成功数量
for (let i = 0; i < formDatas.length; i++) {
const task = fetch('http://127.0.0.1:3000/upload', {
method: 'POST',
body: formDatas[i]
})
// 请求完成从请求池移除
task.then(() => {
num++
progressNum.value =
(Math.round((num / formDatas.length) * 100) / 100) * 100
taskPool.splice(taskPool.findIndex((item: any) => item === task))
})
taskPool.push(task) // 将每个请求放入请求池数组中
// 请求池已经达到最大请求数, 需要等待请求池中要有完成的请求(完成一个就行)
if (taskPool.length === max) {
// 到第7个时候,如果前面的还没有执行完,这个for循环会暂停
await Promise.race(taskPool) // 一个完成 promise状态为成功
}
}
// 为了保证请求池中的请求全部完成
await Promise.all(taskPool)
// 全部完成后, 通知服务器去合并分片
mergeRequest()
}
// 合并分片请求
const mergeRequest = () => {
fetch('http://127.0.0.1:3000/merge', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
fileHash: fileHash.value,
fileName: fileName.value,
size: CHUNK_SIZE
})
}).then((res) => {
console.log('合并成功')
})
}
文件秒传
实现思路:在上传文件前,就要把文件 hash 值告诉后端, 如果后端有该文件,直接返回上传成功
示例代码
js
const handleUpload = async (e: Event) => {
const files = (e.target as HTMLInputElement).files
if (!files) return
// 文件名
fileName.value = files[0].name // 上传需要的数据
// 文件分片
let chunks = createChunks(files[0]) // 数组
// 文件hash计算
const hash = await calculateHash(chunks, files[0])
fileHash.value = hash as string
// 文件hash校验 (文件秒传 在上传文件前,就要把文件hash值告诉后端, 如果后端有该文件,直接返回上传成功)
let shouldUpload = await verify()
if (shouldUpload.data.shouldUpload) {
// true: 服务器没有该文件需要上传
// 上传分片
uploadChunks(chunks, shouldUpload.data?.existChunks)
}
// 上传分片
uploadChunks(chunks)
}
// 校验文件hash
const verify = () => {
return fetch('http://127.0.0.1:3000/verify', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
fileHash: fileHash.value,
fileName: fileName.value // 用来获取文件后缀的
})
})
.then((res) => res.json())
.then((data) => {
return data
})
}
断点续传
前端发送请求,看后端是否存在切片,如果有切片,只需要补剩余切片内容就行
js
// 获取已上传的切片信息
const response = await fetch(`/check-chunks?filehash=${fileHase.value}`)
断开重连重传
断开重连重传,前端需要记录上传进度,断开重连时,需要从断点处继续上传。可以通过 socket.io
(因为有心跳重连机制),当网络重连成功,内容没有上传完成,就继续接着上传。