Commit ffcd690e authored by nanahira's avatar nanahira

add queues && use redis pool

parent 81f79d1b
......@@ -13,6 +13,7 @@
"better-lock": "^2.0.3",
"class-transformer": "^0.5.1",
"encoded-buffer": "^0.2.6",
"generic-pool": "^3.9.0",
"ioredis": "^5.2.3",
"lodash": "^4.17.21",
"lru-cache": "^7.13.1",
......@@ -2600,6 +2601,14 @@
"integrity": "sha512-yIovAzMX49sF8Yl58fSCWJ5svSLuaibPxXQJFLmBObTuCr0Mf1KiPopGM9NiFjiYBCbfaa2Fh6breQ6ANVTI0A==",
"dev": true
},
"node_modules/generic-pool": {
"version": "3.9.0",
"resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz",
"integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==",
"engines": {
"node": ">= 4"
}
},
"node_modules/gensync": {
"version": "1.0.0-beta.2",
"resolved": "https://registry.npmjs.org/gensync/-/gensync-1.0.0-beta.2.tgz",
......@@ -6854,6 +6863,11 @@
"integrity": "sha512-yIovAzMX49sF8Yl58fSCWJ5svSLuaibPxXQJFLmBObTuCr0Mf1KiPopGM9NiFjiYBCbfaa2Fh6breQ6ANVTI0A==",
"dev": true
},
"generic-pool": {
"version": "3.9.0",
"resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz",
"integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g=="
},
"gensync": {
"version": "1.0.0-beta.2",
"resolved": "https://registry.npmjs.org/gensync/-/gensync-1.0.0-beta.2.tgz",
......
......@@ -35,7 +35,7 @@ export class Aragami {
return 'default';
}
private async getKey(o: any, prototype?: AnyClass) {
private async getKey(o: any, prototype?: AnyClass, fallback?: string) {
if (typeof o === 'string') {
return o;
}
......@@ -44,6 +44,9 @@ export class Aragami {
}
const keyTransformer = reflector.get('AragamiCacheKey', o);
if (!keyTransformer) {
if (fallback) {
return fallback;
}
throw new Error(`No key metadata found for ${o.constructor.name}`);
}
return await keyTransformer(o);
......@@ -218,4 +221,83 @@ export class Aragami {
await this.driver.destroy();
} catch (e) {}
}
async isQueueEmpty<T>(cl: ClassType<T>, key = 'default') {
return this.driver.isQueueEmpty(this.getBaseKey(cl) + ':' + key);
}
async queueLength<T>(cl: ClassType<T>, key = 'default') {
return this.driver.queueLength(this.getBaseKey(cl) + ':' + key);
}
async queueItems<T>(cl: ClassType<T>, key = 'default') {
const items = await this.driver.queueItems(this.getBaseKey(cl) + ':' + key);
return items.map((buf) => this.decode(cl, buf));
}
async queueAdd<T>(
o: T,
options?: { key?: string; prototype?: ClassType<T>; prior?: boolean },
): Promise<T>;
async queueAdd<T>(
prototype: ClassType<T>,
o: PartialDeep<T>,
options?: { key?: string; prior?: boolean },
): Promise<T>;
async queueAdd<T>(...args: any[]) {
let prototype: ClassType<T>;
let o: T;
let options: {
ttl?: number;
key?: string;
prototype?: ClassType<T>;
prior?: boolean;
};
const firstArg = args[0];
if (typeof firstArg === 'function') {
prototype = firstArg;
o = args[1];
options = args[2] || {};
} else {
o = firstArg;
options = args[1] || {};
prototype = options.prototype;
}
if (!o) {
return o;
}
if (prototype) {
o = plainToInstance(prototype, o);
}
const buf = this.encode(o);
const key =
this.getBaseKey(o) +
':' +
(options.key || (await this.getKey(o, undefined, 'default')));
await this.driver.queueAdd(key, buf, options.prior);
return o;
}
async queueGather<T>(prototype: ClassType<T>, key = 'default'): Promise<T> {
const baseKey = this.getBaseKey(prototype);
const buffer = await this.driver.queueGather(baseKey + ':' + key);
if (!buffer) {
return;
}
return this.decode(prototype, buffer);
}
async queueGatherBlocking<T>(
prototype: ClassType<T>,
key = 'default',
): Promise<T> {
const baseKey = this.getBaseKey(prototype);
const buffer = await this.driver.queueGatherBlocking(baseKey + ':' + key);
return this.decode(prototype, buffer);
}
async queueClear<T>(prototype: ClassType<T>, key = 'default') {
const baseKey = this.getBaseKey(prototype);
await this.driver.queueClear(baseKey + ':' + key);
}
}
......@@ -48,4 +48,34 @@ export class BaseDriver {
}
async destroy(): Promise<void> {}
async isQueueEmpty(key: string): Promise<boolean> {
return (await this.queueLength(key)) === 0;
}
async queueLength(key: string): Promise<number> {
return (await this.queueItems(key)).length;
}
async queueItems(key: string): Promise<Buffer[]> {
return [];
}
async queueAdd(key: string, value: Buffer, prior?: boolean): Promise<void> {}
async queueGather(key: string): Promise<Buffer> {
return;
}
async queueGatherBlocking(key: string): Promise<Buffer> {
while (true) {
const value = await this.queueGather(key);
if (value) {
return value;
}
await new Promise((resolve) => setTimeout(resolve, 10));
}
}
async queueClear(key: string): Promise<void> {}
}
......@@ -4,6 +4,7 @@ import { Settings } from '@nanahira/redlock';
export type RedisDriverOptions = RedisOptions & {
uri?: string;
lock?: Settings & { duration?: number; prefix?: string };
queueKey?: string;
};
export interface AragamiOptions {
......
......@@ -73,4 +73,54 @@ export class MemoryDriver extends BaseDriver {
override lock<R>(keys: string[], cb: () => Promise<R>): Promise<R> {
return this.betterLock.acquire(keys, cb);
}
private queues = new Map<string, Buffer[]>();
private blockingGathers = new Map<string, ((buf: Buffer) => void)[]>();
override async queueAdd(
key: string,
value: Buffer,
prior?: boolean,
): Promise<void> {
if (this.blockingGathers.has(key)) {
const cb = this.blockingGathers.get(key).shift();
if (cb) {
cb(value);
return;
}
}
if (!this.queues.has(key)) {
this.queues.set(key, []);
}
const queue = this.queues.get(key);
if (prior) {
queue.unshift(value);
} else {
queue.push(value);
}
}
async queueItems(key: string): Promise<Buffer[]> {
return [...(this.queues.get(key) || [])];
}
override async queueGather(key: string): Promise<Buffer> {
const queue = this.queues.get(key);
if (queue?.length) return queue.shift();
}
override async queueGatherBlocking(key: string): Promise<Buffer> {
const itemInQueue = await this.queueGather(key);
if (itemInQueue) return itemInQueue;
if (!this.blockingGathers.has(key)) {
this.blockingGathers.set(key, []);
}
return new Promise((resolve) => {
this.blockingGathers.get(key).push(resolve);
});
}
override async queueClear(key: string): Promise<void> {
this.queues.delete(key);
}
}
......@@ -2,26 +2,46 @@ import { BaseDriver } from '../base-driver';
import Redis from 'ioredis';
import Redlock from '@nanahira/redlock';
import { RedisDriverOptions } from '../def';
import { createPool } from 'generic-pool';
export class RedisDriver extends BaseDriver {
private readonly redis: Redis;
private readonly redlock: Redlock;
constructor(private options: RedisDriverOptions) {
super();
if (options.uri) {
this.redis = new Redis(options.uri);
async createRedisClient() {
let redis: Redis;
if (this.options.uri) {
redis = new Redis(this.options.uri);
} else {
this.redis = new Redis(options);
redis = new Redis(this.options);
}
this.redlock = new Redlock([this.redis], options.lock);
// await redis.connect();
return redis;
}
private pool = createPool({
create: async () => {
const redis = await this.createRedisClient();
return {
redis,
redlock: new Redlock([redis], this.options.lock),
};
},
destroy: async ({ redis }) => {
await redis.quit();
},
});
constructor(private options: RedisDriverOptions) {
super();
}
override async has(baseKey: string, key: string) {
return (await this.redis.exists(this.usingKey(baseKey, key))) !== 0;
return (
(await this.pool.use((r) =>
r.redis.exists(this.usingKey(baseKey, key)),
)) !== 0
);
}
override async get(baseKey: string, key: string): Promise<Buffer> {
return this.redis.getBuffer(this.usingKey(baseKey, key));
return this.pool.use((r) => r.redis.getBuffer(this.usingKey(baseKey, key)));
}
override async set(
......@@ -31,19 +51,25 @@ export class RedisDriver extends BaseDriver {
ttl: number,
): Promise<void> {
const redisKey = this.usingKey(baseKey, key);
if (ttl) {
await this.redis.set(redisKey, value, 'PX', ttl);
} else {
await this.redis.set(redisKey, value);
}
await this.pool.use((r) => {
if (ttl) {
return r.redis.set(redisKey, value, 'PX', ttl);
} else {
return r.redis.set(redisKey, value);
}
});
}
override async del(baseKey: string, key: string): Promise<boolean> {
return !!this.redis.del(this.usingKey(baseKey, key));
return !!(await this.pool.use((r) =>
r.redis.del(this.usingKey(baseKey, key)),
));
}
private originalKeys(baseKey: string, prefix = '') {
return this.redis.keys(this.usingKey(baseKey, `${prefix}*`));
return this.pool.use((r) =>
r.redis.keys(this.usingKey(baseKey, `${prefix}*`)),
);
}
override async keys(baseKey: string, prefix?: string): Promise<string[]> {
......@@ -56,18 +82,76 @@ export class RedisDriver extends BaseDriver {
if (!keys.length) {
return;
}
await this.redis.del(keys);
await this.pool.use((r) => r.redis.del(keys));
}
override lock<R>(keys: string[], cb: () => Promise<R>): Promise<R> {
return this.redlock.using(
keys.map((key) => `${this.options.lock?.prefix || '_lock'}:${key}`),
this.options.lock?.duration || 5000,
cb,
return this.pool.use((r) =>
r.redlock.using(
keys.map((key) => `${this.options.lock?.prefix || '_lock'}:${key}`),
this.options.lock?.duration || 5000,
cb,
),
);
}
async destroy() {
await this.redis.quit();
await this.pool.drain();
}
private getQueueKey(key: string) {
return `${this.options.queueKey || '_queue'}:${key}`;
}
async queueLength(key: string): Promise<number> {
const _key = this.getQueueKey(key);
return this.pool.use((r) => r.redis.llen(_key));
}
async queueItems(key: string): Promise<Buffer[]> {
const _key = this.getQueueKey(key);
return this.pool.use((r) => r.redis.lrangeBuffer(_key, 0, -1));
}
override async queueAdd(
key: string,
value: Buffer,
prior?: boolean,
): Promise<void> {
const _key = this.getQueueKey(key);
await this.pool.use(async (r) => {
if (prior) {
await r.redis.lpush(_key, value);
} else {
await r.redis.rpush(_key, value);
}
});
}
override async queueGather(key: string): Promise<Buffer> {
const _key = this.getQueueKey(key);
const value = await this.pool.use((r) => r.redis.rpopBuffer(_key));
return value || undefined;
}
override async queueGatherBlocking(key: string): Promise<Buffer> {
const _key = this.getQueueKey(key);
const redisClient = await this.createRedisClient();
try {
const value = await redisClient.brpopBuffer(_key, 0);
if (value) return value?.[1];
console.log('wait2');
return await this.queueGatherBlocking(key);
} catch (e) {
console.log(e);
return this.queueGatherBlocking(key);
} finally {
await redisClient.quit();
}
}
async queueClear(key: string): Promise<void> {
const _key = this.getQueueKey(key);
await this.pool.use((r) => r.redis.del(_key));
}
}
......@@ -144,4 +144,29 @@ describe('Aragami.', () => {
const savedBook = await service.saveBook(book);
expect(savedBook).toEqual(book);
});
it('should run queue', async () => {
class Task {
id: number;
}
const task = new Task();
task.id = 1;
await aragami.queueClear(Task);
await expect(aragami.queueLength(Task)).resolves.toBe(0);
await aragami.queueAdd(task);
await expect(aragami.queueLength(Task)).resolves.toBe(1);
const _task = await aragami.queueGather(Task);
expect(_task.id).toBe(1);
await expect(aragami.queueLength(Task)).resolves.toBe(0);
const taskProm = aragami.queueGatherBlocking(Task);
console.log('before wait');
await new Promise((resolve) => setTimeout(resolve, 100));
await aragami.queueAdd(task);
await new Promise((resolve) => setTimeout(resolve, 100));
console.log('wait');
const _task2 = await taskProm;
expect(_task2.id).toBe(1);
});
});
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment