主要步骤:
主要难点和坑点
具体代码参考
前端主要步骤:
获取到文件对象之后,使用Worker开启一个线程去执行文件的切片和文件hash值得计算;使用的库为spark-md5
worker执行文件 worker.js
// 使用线程创建切片
const createChunksByWorker = (file) => {
return new Promise((resolve, reject) => {
const url = new URL('./worker.js', import.meta.url).href
const myWorker = new Worker(url)
myWorker.postMessage({ file, chunkSize: ChunkSize, })
myWorker.onmessage = (e) => {
// console.log('线程切片完成', e.data)
resolve(e.data)
myWorker.terminate()
}
})
}
上传执行函数
上传成功之后直接请求合并文件接口
const uploadFile = async (file) => {
// 设置文件名
fileName.value = file.name
// 获取文件hash值
const { chunkList, hash, } = await createChunksByWorker(file)
fileHash.value = hash
chunkTotal.value = chunkList.length
// console.log(chunks, fileName)
const [err, data] = await checkFile({ hash, })
const { isExist, chunks = [], } = data
curProgress.value = chunks.length
if (err) {
console.log(err, data)
return
}
console.log(
'curProgress==================>',
curProgress.value,
chunkTotal.value,
curProgress.value / chunkTotal.value
)
if (isExist) {
messageDanger('文件已存在')
loading.value = false
return
}
// 只上传没上传成功的
const filterChunkList = chunkList.filter(v => !chunks.includes(v.index))
const uploaded = await uploadChunks(filterChunkList)
if (uploaded) {
currentIndex = 0
const [err, data] = await mergeFile({
chunks: chunkTotal.value,
fileName: fileName.value,
hash,
})
if (!err) {
messageSuccess('上传成功')
}
setTimeout(() => {
starting.value = false
loading.value = false
}, 1000)
}
}
并发数控制
限制请求并发数队列实现;首次最大并发数请求,后续一个请求完成则继续从队列取出数据接着请求,先进先出
let currentIndex = 0;
function limitRequests2 (chunks, fn, maxRequest) {
requests = [...chunks]
const totalRequests = chunks.length
const requestFn = fn
let errorNum = 0 // 错误数
currentIndex = 0
return new Promise((resolve, reject) => {
const makeRequest = async (chunk) => {
if (stopUpload) {
reject(new Error('停止'))
return
}
try {
await requestFn(toFormData(chunk))
currentIndex++
curProgress.value++
// console.log('Request to completed', response)
} catch (error) {
errorNum++
// 请求失败重试逻辑
if (errorNum < 10) {
console.log('Retrying request to', errorNum)
await makeRequest(chunk) // 重新发起请求
}
} finally {
// await之后执行
// 如果还有待处理的请求,则发起下一个请求
if (requests.length > 0) {
await makeRequest(requests.shift())
}
if (errorNum > 10) {
reject(new Error('错误数太多'))
}
if (requests.length === 0 && currentIndex === totalRequests) {
resolve(true)
}
}
}
// 初始化拿出最大最大并发数请求
for (let i = 0; i < maxRequest; i++) {
makeRequest(requests.shift())
}
})
}
// 批量上传切片
const uploadChunks = async (chunks) => {
try {
const resList = await limitRequests2(chunks, uploadHandler, MaxRequest)
// const resList = await limitRequests1(chunks)
console.log('resList====>', resList)
return resList
} catch (error) {
// console.log(error)
}
}
// 转为formData
const toFormData = (chunk) => {
const fd = new FormData()
Object.keys(chunk).forEach(k => fd.append(k, chunk[k]))
return fd
}
// 上传
const uploadHandler = (formData) => {
return uploadFileRequest(formData)
}
具体代码实现
需要实现三个接口:
import * as fs from 'fs';
@Module({
imports: [
TypeOrmModule.forFeature([FileStore]),
MulterModule.registerAsync({
imports: [],
useFactory: async () => ({
storage: diskStorage({
// 配置文件上传后的文件夹路径
destination: (req, file, callback) => {
const { hash, index } = req.query;
const path = `${Config.fileConfig.filePath}tempFolder/${hash}`;
// 创建文件夹
fs.mkdir(path, { recursive: true }, (err) => {
if (err) {
console.log('创建文件夹失败', err);
return;
}
});
callback(null, path);
},
filename: (req, file, cb) => {
const { hash, index } = req.query;
const filename = `${hash}-${index}`;
<span class="hljs-keyword">return</span> <span class="hljs-title function_">cb</span>(<span class="hljs-literal">null</span>, filename);
},
}),
}),
<span class="hljs-attr">inject</span>: [],
}),
]
})
export class FileModule {}
async mergeFile(body: any) {
return await new Promise(async (resolve, reject) => {
const { hash, fileName, chunks } = body;
const basePath = `${Config.fileConfig.filePath}tempFolder`;
const slicePath = `${basePath}/${hash}`;
const folderSize = await getFolderSizeBin(basePath);
const sliceSize = await getFolderSizeBin(basePath);
const status = HttpStatus.INTERNAL_SERVER_ERROR;
// console.log('folderSize=============>', folderSize);
// 4194304 4M 524288000 500M 2147483648 2G
if (folderSize > 2147483648) {
// promise 响应错误
reject(new HttpException('硬盘内存不足了~', status));
return;
}
let files = [];
try {
// 文件夹不存在会报错
files = fs.readdirSync(slicePath);
} catch (error) {
reject(new HttpException('文件不存在!', status));
return;
}
<span class="hljs-keyword">if</span> (chunks !== files.<span class="hljs-property">length</span>) {
<span class="hljs-title function_">reject</span>(<span class="hljs-keyword">new</span> <span class="hljs-title class_">HttpException</span>(<span class="hljs-string">'前后切片数量不一致,禁止合并'</span>, status));
<span class="hljs-keyword">return</span>;
}
<span class="hljs-keyword">const</span> sortedFiles = files.<span class="hljs-title function_">sort</span>(<span class="hljs-function">(<span class="hljs-params">a, b</span>) =></span> {
<span class="hljs-keyword">const</span> [aIndex, bIndex] = [<span class="hljs-built_in">parseInt</span>(a.<span class="hljs-title function_">split</span>(<span class="hljs-string">'-'</span>)[<span class="hljs-number">1</span>]), <span class="hljs-built_in">parseInt</span>(b.<span class="hljs-title function_">split</span>(<span class="hljs-string">'-'</span>)[<span class="hljs-number">1</span>])];
<span class="hljs-keyword">return</span> aIndex - bIndex;
});
<span class="hljs-comment">// console.log('sortedFiles=============>', sortedFiles, fileName);</span>
<span class="hljs-comment">// 保存合成文件路径</span>
<span class="hljs-keyword">const</span> datePath = <span class="hljs-string">`<span class="hljs-subst">${Config.fileConfig.filePath}</span><span class="hljs-subst">${dayjs().format(<span class="hljs-string">'YYYY-MM'</span>)}</span>`</span>;
<span class="hljs-keyword">const</span> fileNameTargetPath = <span class="hljs-string">`<span class="hljs-subst">${datePath}</span>/<span class="hljs-subst">${hash}</span>-<span class="hljs-subst">${fileName}</span>`</span>;
<span class="hljs-keyword">const</span> writeStream = fs.<span class="hljs-title function_">createWriteStream</span>(fileNameTargetPath);
<span class="hljs-comment">// 一次合并一块切片</span>
<span class="hljs-keyword">const</span> <span class="hljs-title function_">mergeChunk</span> = (<span class="hljs-params">index: <span class="hljs-built_in">number</span></span>) => {
<span class="hljs-keyword">if</span> (index >= sortedFiles.<span class="hljs-property">length</span>) {
<span class="hljs-comment">// 全部读取完写入</span>
<span class="hljs-comment">// 完结束写入执行end()才会触发finish时间</span>
writeStream.<span class="hljs-title function_">end</span>();
<span class="hljs-keyword">return</span>;
}
<span class="hljs-keyword">const</span> filePath = path.<span class="hljs-title function_">join</span>(slicePath, sortedFiles[index]);
<span class="hljs-keyword">const</span> readStream = fs.<span class="hljs-title function_">createReadStream</span>(filePath);
<span class="hljs-comment">// https://www.nodeapp.cn/stream.html#stream_class_stream_readable</span>
readStream.<span class="hljs-title function_">pipe</span>(writeStream, { <span class="hljs-attr">end</span>: <span class="hljs-literal">false</span> });
readStream.<span class="hljs-title function_">on</span>(<span class="hljs-string">'end'</span>, <span class="hljs-function">() =></span> {
<span class="hljs-comment">// 当切片文件过多时会报监听器超过最大值</span>
<span class="hljs-comment">// 删除已合并的切片文件(单个删除) 每个事件独立的</span>
fs.<span class="hljs-title function_">unlinkSync</span>(filePath);
<span class="hljs-comment">// 处理下一个切片</span>
<span class="hljs-title function_">mergeChunk</span>(index + <span class="hljs-number">1</span>);
});
};
<span class="hljs-title function_">mergeChunk</span>(<span class="hljs-number">0</span>);
<span class="hljs-comment">// 写入完成事件</span>
writeStream.<span class="hljs-title function_">on</span>(<span class="hljs-string">'finish'</span>, <span class="hljs-function">() =></span> {
<span class="hljs-variable language_">console</span>.<span class="hljs-title function_">log</span>(<span class="hljs-string">'Merge complete'</span>);
<span class="hljs-keyword">const</span> <span class="hljs-attr">list</span>: <span class="hljs-title class_">Partial</span><<span class="hljs-title class_">Express</span>.<span class="hljs-property">Multer</span>.<span class="hljs-property">File</span>>[] = [];
<span class="hljs-keyword">const</span> fName = <span class="hljs-string">`<span class="hljs-subst">${hash}</span>-<span class="hljs-subst">${fileName}</span>`</span>;
<span class="hljs-keyword">const</span> saveFile = {
<span class="hljs-attr">originalname</span>: fileName,
<span class="hljs-attr">filename</span>: fName,
<span class="hljs-attr">destination</span>: datePath,
<span class="hljs-attr">mimetype</span>: fileName.<span class="hljs-title function_">split</span>(<span class="hljs-string">'.'</span>).<span class="hljs-title function_">slice</span>(-<span class="hljs-number">1</span>)[<span class="hljs-number">0</span>],
<span class="hljs-attr">size</span>: sliceSize,
};
list.<span class="hljs-title function_">push</span>(saveFile);
<span class="hljs-variable language_">this</span>.<span class="hljs-property">resourcesService</span>.<span class="hljs-title function_">uploadFile</span>(list, <span class="hljs-string">'19f66b84-8841-4cf5-8932-d11b95947d2d'</span>);
<span class="hljs-title function_">resolve</span>(saveFile);
<span class="hljs-comment">// 移除切片文件夹</span>
<span class="hljs-title function_">deleteFolderRecursive</span>(slicePath);
});
});
}