Commit 965f6b03 authored by nanahira's avatar nanahira

add QueueRunner

parent 7927e453
......@@ -25,7 +25,7 @@
"peerDependencies": {
"@nestjs/common": "^9.1.2",
"@nestjs/core": "^9.4.2",
"aragami": "^1.1.2",
"aragami": "^1.1.8",
"rxjs": "^7.8.1"
}
},
......@@ -1653,15 +1653,16 @@
}
},
"node_modules/aragami": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/aragami/-/aragami-1.1.2.tgz",
"integrity": "sha512-asrmtKkLOgtXMYt3TeM/lWsmbKdJwjUPDV9yb+h62FBBMFx0h+lrEQN1XL6ZIkfyggtyC0G5gv0f9G+Ul0cTUg==",
"version": "1.1.8",
"resolved": "https://registry.npmjs.org/aragami/-/aragami-1.1.8.tgz",
"integrity": "sha512-kHwOawwEFfEyxAt6/dRm5YORgIfFtZl2b/TFoZx0LxQzP3uqNgf1tkbfjLEVi5DI+nRAe6iDd+43bn1c5ZNXCg==",
"peer": true,
"dependencies": {
"@nanahira/redlock": "^1.0.0",
"better-lock": "^2.0.3",
"class-transformer": "^0.5.1",
"encoded-buffer": "^0.2.6",
"generic-pool": "^3.9.0",
"ioredis": "^5.2.3",
"lodash": "^4.17.21",
"lru-cache": "^7.13.1",
......@@ -2671,6 +2672,15 @@
"integrity": "sha512-dsKNQNdj6xA3T+QlADDA7mOSlX0qiMINjn0cgr+eGHGsbSHzTabcIogz2+p/iqP1Xs6EP/sS2SbqH+brGTbq0g==",
"dev": true
},
"node_modules/generic-pool": {
"version": "3.9.0",
"resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz",
"integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==",
"peer": true,
"engines": {
"node": ">= 4"
}
},
"node_modules/gensync": {
"version": "1.0.0-beta.2",
"resolved": "https://registry.npmjs.org/gensync/-/gensync-1.0.0-beta.2.tgz",
......@@ -6369,15 +6379,16 @@
}
},
"aragami": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/aragami/-/aragami-1.1.2.tgz",
"integrity": "sha512-asrmtKkLOgtXMYt3TeM/lWsmbKdJwjUPDV9yb+h62FBBMFx0h+lrEQN1XL6ZIkfyggtyC0G5gv0f9G+Ul0cTUg==",
"version": "1.1.8",
"resolved": "https://registry.npmjs.org/aragami/-/aragami-1.1.8.tgz",
"integrity": "sha512-kHwOawwEFfEyxAt6/dRm5YORgIfFtZl2b/TFoZx0LxQzP3uqNgf1tkbfjLEVi5DI+nRAe6iDd+43bn1c5ZNXCg==",
"peer": true,
"requires": {
"@nanahira/redlock": "^1.0.0",
"better-lock": "^2.0.3",
"class-transformer": "^0.5.1",
"encoded-buffer": "^0.2.6",
"generic-pool": "^3.9.0",
"ioredis": "^5.2.3",
"lodash": "^4.17.21",
"lru-cache": "^7.13.1",
......@@ -7147,6 +7158,12 @@
"integrity": "sha512-dsKNQNdj6xA3T+QlADDA7mOSlX0qiMINjn0cgr+eGHGsbSHzTabcIogz2+p/iqP1Xs6EP/sS2SbqH+brGTbq0g==",
"dev": true
},
"generic-pool": {
"version": "3.9.0",
"resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz",
"integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==",
"peer": true
},
"gensync": {
"version": "1.0.0-beta.2",
"resolved": "https://registry.npmjs.org/gensync/-/gensync-1.0.0-beta.2.tgz",
......
......@@ -55,7 +55,7 @@
"peerDependencies": {
"@nestjs/common": "^9.1.2",
"@nestjs/core": "^9.4.2",
"aragami": "^1.1.2",
"aragami": "^1.1.8",
"rxjs": "^7.8.1"
}
}
import { Aragami, ClassType } from 'aragami';
import { Logger } from '@nestjs/common';
import * as os from 'os';
import { InjectAragami } from '../index';
export interface QueueRunnerOptions {
maxConcurrency?: number;
key?: string;
logTask?: boolean;
}
export class _QueueRunner<T> {
logger = new Logger(this._queueClass.name);
_maxConcurrency = this._queueRunnerOptions.maxConcurrency ?? os.cpus().length;
constructor(
public _queueClass: ClassType<T>,
public _queueRunnerOptions: QueueRunnerOptions,
) {}
@InjectAragami()
aragami: Aragami;
async onApplicationBootstrap() {
if (this._queueRunnerOptions.logTask) {
this.logger.log(`Starting ${this._maxConcurrency} workers`);
}
for (let i = 0; i < this._maxConcurrency; i++) {
if (this._queueRunnerOptions.logTask) {
this.logger.log(`Starting worker ${i}`);
}
this._mainLoop().then();
}
}
_quit = false;
async onApplicationShutdown() {
this._quit = true;
}
async runTask(task: T) {
return;
}
async _mainLoop() {
while (!this._quit) {
this.logger.log('Looping');
try {
const task: T = await this.aragami.queueGatherBlocking(
this._queueClass,
this._queueRunnerOptions.key || 'default',
);
if (!task) continue;
if (this._queueRunnerOptions.logTask) {
this.logger.log(`Got task: ${JSON.stringify(task)}`);
}
await this.runTask(task);
} catch (e) {
this.logger.error(`Loop failed: ${e}`);
}
}
}
}
export function QueueRunner<T>(
queueClass: ClassType<T>,
options?: Partial<QueueRunnerOptions>,
) {
const cl = class extends _QueueRunner<T> {
constructor(innerOptions?: Partial<QueueRunnerOptions>) {
super(queueClass, {
...(innerOptions || {}),
...(options || {}),
});
}
};
Object.defineProperty(cl, 'name', { value: queueClass.name + 'Runner' });
return cl;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment