Commit 5a182d17 authored by nano's avatar nano

use Queue

parent f5212323
This diff is collapsed.
...@@ -12,13 +12,13 @@ import {mongodb} from '../models/Iridium'; ...@@ -12,13 +12,13 @@ import {mongodb} from '../models/Iridium';
import {toObjectID} from 'iridium'; import {toObjectID} from 'iridium';
import config from '../../config'; import config from '../../config';
import {UploadOSS} from '../utils'; import {UploadOSS} from '../utils';
import Queue from '../serives/Queue';
import Router = require('koa-router'); import Router = require('koa-router');
const downloader = new Aria2; const downloader = new Aria2;
let DownloadQueue = new Queue;
downloader.open(); downloader.open();
const checkFilePath = async (file) => { const checkFilePath = async (file) => {
if (['.gz', '.rar', '.zip', '.7z'].indexOf(path.extname(file.path)) === -1) { if (['.gz', '.rar', '.zip', '.7z'].indexOf(path.extname(file.path)) === -1) {
console.log(file); console.log(file);
...@@ -153,68 +153,69 @@ const uploadPackageUrl = async (ctx: Context) => { ...@@ -153,68 +153,69 @@ const uploadPackageUrl = async (ctx: Context) => {
} }
// testUrl: https://r.my-card.in/release/dist/0c16a3ecb115fd7cf575ccdd64f62a8f3edc635b087950e4ed4f3f781972bbfd.tar.gz // testUrl: https://r.my-card.in/release/dist/0c16a3ecb115fd7cf575ccdd64f62a8f3edc635b087950e4ed4f3f781972bbfd.tar.gz
let pack = await mongodb.Packages.findOne({_id: toObjectID(ctx.request.body._id)}); DownloadQueue.run(async (ctx, _next) => {
if (!pack) { let pack = await mongodb.Packages.findOne({_id: toObjectID(ctx.request.body._id)});
return ctx.throw(400, 'pack not exists'); if (!pack) {
} return ctx.throw(400, 'pack not exists');
}
downloader.onDownloadStart = async () => {
pack!.status = 'uploading';
await pack!.save();
};
downloader.onDownloadStart = async () => { downloader.onDownloadComplete = async (m) => {
pack!.status = 'uploading'; const {files} = await downloader.send('tellStatus', m.gid);
await pack!.save(); const [file] = files;
};
downloader.onDownloadComplete = async (m) => { try {
const {files} = await downloader.send('tellStatus', m.gid);
const [file] = files;
try { await checkFilePath(file);
// 打包
const bundled = await bundle(path.basename(file.path));
await checkFilePath(file); // 打包完, 上传阿里云
// 打包 await UploadOSS(bundled.distPath);
const bundled = await bundle(path.basename(file.path));
// 打包完, 上传阿里云 Object.assign(pack, bundled);
await UploadOSS(bundled.distPath); pack!.status = 'uploaded';
Object.assign(pack, bundled); await mongodb.Packages.update({id: pack!.id}, {$set: {status: 'deprecated'}}, {multi: true});
pack!.status = 'uploaded'; await pack!.save();
await mongodb.Packages.update({id: pack!.id}, {$set: {status: 'deprecated'}}, {multi: true}); // 上传完,干掉本地目录
await pack!.save(); await fs.removeAsync(bundled.distPath);
_next();
} catch (e) {
console.trace(e);
pack!.status = 'failed';
_next();
await pack!.save();
}
};
// 上传完,干掉本地目录 downloader.onDownloadError = async (err) => {
await fs.removeAsync(bundled.distPath); // console.log(await downloader.send('tellStatus', err.gid))
} catch (e) {
console.trace(e);
pack!.status = 'failed'; pack!.status = 'failed';
await pack!.save(); await pack!.save();
} console.log(err);
await downloader.close() _next();
}; };
downloader.onDownloadError = async (err) => {
// console.log(await downloader.send('tellStatus', err.gid))
pack!.status = 'failed';
await pack!.save();
await downloader.close()
console.log(err);
};
ctx.body = await new Promise((resolve, reject) => {
ctx.body = await new Promise((resolve, reject) => { downloader.onmessage = m => {
if (m['error']) {
reject(m['error']);
} else {
resolve(m);
}
};
downloader.onmessage = m => { downloader.send('addUri', [ctx.request.body.url], {dir: config.upload_path});
if (m['error']) { });
reject(m['error']);
} else {
resolve(m);
}
};
downloader.send('addUri', [ctx.request.body.url], {dir: config.upload_path});
}); });
}; };
router.post('/v1/upload/image', UploadImage); router.post('/v1/upload/image', UploadImage);
......
export default class Queue {
running: number;
queue: Function[];
concurrency: number;
constructor() {
this.concurrency = 1;
this.running = 0;
this.queue = [];
}
set(args) {
return Object.assign(this, args);
}
run(task) {
this.queue.push(task);
this.next();
return this;
}
next() {
while (this.running < this.concurrency && this.queue.length) {
let task = this.queue.shift();
if (task) {
task(this, () => {
this.running--;
this.next();
});
this.running++;
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment