Commit 43c3f4bf authored by nanahira's avatar nanahira

better destroy process

parent 3dd0ba32
......@@ -11,6 +11,11 @@ export class MemoryDriver extends BaseDriver {
cache.clear();
}
this.cacheMap.clear();
this.queues.clear();
[...this.blockingGathers.values()].forEach((v) =>
v.forEach((f) => f(undefined)),
);
this.blockingGathers.clear();
}
private getCacheInstance(baseKey: string) {
......
......@@ -95,7 +95,11 @@ export class RedisDriver extends BaseDriver {
);
}
quitted = false;
async destroy() {
this.quitted = true;
[...this.waitingBlockingProms.values()].forEach((resolve) => resolve());
await this.pool.drain();
}
......@@ -134,17 +138,28 @@ export class RedisDriver extends BaseDriver {
return value || undefined;
}
private waitingBlockingProms = new Map<
Promise<[Buffer, Buffer]>,
() => void
>();
override async queueGatherBlocking(key: string): Promise<Buffer> {
if (this.quitted) return;
const _key = this.getQueueKey(key);
const redisClient = await this.createRedisClient();
try {
const value = await redisClient.brpopBuffer(_key, 0);
const valueProm = redisClient.brpopBuffer(_key, 0);
const exitProm = new Promise<void>((resolve) => {
this.waitingBlockingProms.set(valueProm, resolve);
});
const value = await Promise.race([valueProm, exitProm]);
this.waitingBlockingProms.delete(valueProm);
if (value) return value?.[1];
//console.log('wait2');
return await this.queueGatherBlocking(key);
} catch (e) {
//console.log(e);
return this.queueGatherBlocking(key);
return await this.queueGatherBlocking(key);
} finally {
await redisClient.quit();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment