Commit 3255d86e authored by nanahira's avatar nanahira

add queue retry

parent e9b74da1
Pipeline #26519 passed with stages
in 1 minute and 3 seconds
...@@ -2,11 +2,13 @@ import { Aragami, ClassType } from 'aragami'; ...@@ -2,11 +2,13 @@ import { Aragami, ClassType } from 'aragami';
import { ConsoleLogger, Logger } from '@nestjs/common'; import { ConsoleLogger, Logger } from '@nestjs/common';
import * as os from 'os'; import * as os from 'os';
import { InjectAragami } from '../index'; import { InjectAragami } from '../index';
import { retry } from 'rxjs';
export interface QueueRunnerOptions { export interface QueueRunnerOptions {
maxConcurrency?: number; maxConcurrency?: number;
key?: string; key?: string;
logTask?: boolean; logTask?: boolean;
retry?: number;
} }
export class _QueueRunner<T> { export class _QueueRunner<T> {
...@@ -43,6 +45,8 @@ export class _QueueRunner<T> { ...@@ -43,6 +45,8 @@ export class _QueueRunner<T> {
return; return;
} }
_queueKey = this._queueRunnerOptions.key || 'default';
async _mainLoop(i: number) { async _mainLoop(i: number) {
const logger = new ConsoleLogger(`${this._queueClass.name}-${i}`); const logger = new ConsoleLogger(`${this._queueClass.name}-${i}`);
while (!this._quit) { while (!this._quit) {
...@@ -50,13 +54,31 @@ export class _QueueRunner<T> { ...@@ -50,13 +54,31 @@ export class _QueueRunner<T> {
try { try {
const task: T = await this.aragami.queueGatherBlocking( const task: T = await this.aragami.queueGatherBlocking(
this._queueClass, this._queueClass,
this._queueRunnerOptions.key || 'default', this._queueKey,
); );
if (!task) continue; if (!task) continue;
if (this._queueRunnerOptions.logTask) { if (this._queueRunnerOptions.logTask) {
logger.log(`Got task: ${JSON.stringify(task)}`); logger.log(`Got task: ${JSON.stringify(task)}`);
} }
await this.runTask(task, i); try {
await this.runTask(task, i);
} catch (e) {
logger.error(`Task failed: ${e}`);
if (
this._queueRunnerOptions.retry &&
(!task['__retry'] ||
task['__retry'] < this._queueRunnerOptions.retry)
) {
logger.log(`Retrying task: ${JSON.stringify(task)}`);
if (this._queueRunnerOptions.retry > 0) {
task['__retry'] = (task['__retry'] || 0) + 1;
}
await this.aragami.queueAdd(task, {
key: this._queueKey,
prior: true,
});
}
}
} catch (e) { } catch (e) {
logger.error(`Loop failed: ${e}`); logger.error(`Loop failed: ${e}`);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment