Commit c4838659 authored by nanahira's avatar nanahira

unfinished

parent 000e82b2
......@@ -81,5 +81,21 @@ export class BaseDriver {
}
}
async queueAck(key: string, value: Buffer): Promise<void> {
return;
}
async queueResume(
key: string,
value: Buffer,
prior?: boolean,
): Promise<void> {
return;
}
async queueResumeAll(key: string, prior?: boolean): Promise<void> {
return;
}
async queueClear(key: string): Promise<void> {}
}
......@@ -9,6 +9,7 @@ export type RedisDriverOptions = RedisOptions & {
stacked?: boolean;
};
queueKey?: string;
queueBackupKey?: string;
};
export interface AragamiOptions {
......
......@@ -84,8 +84,23 @@ export class MemoryDriver extends BaseDriver {
}
private queues = new Map<string, Buffer[]>();
private backupQueues = new Map<string, Buffer[]>();
private blockingGathers = new Map<string, ((buf: Buffer) => void)[]>();
private getQueue(key: string) {
if (!this.queues.has(key)) {
this.queues.set(key, []);
}
return this.queues.get(key);
}
private getBackupQueue(key: string) {
if (!this.backupQueues.has(key)) {
this.backupQueues.set(key, []);
}
return this.backupQueues.get(key);
}
override async queueAdd(
key: string,
value: Buffer,
......@@ -98,10 +113,7 @@ export class MemoryDriver extends BaseDriver {
return;
}
}
if (!this.queues.has(key)) {
this.queues.set(key, []);
}
const queue = this.queues.get(key);
const queue = this.getQueue(key);
if (prior) {
queue.unshift(value);
} else {
......@@ -115,7 +127,11 @@ export class MemoryDriver extends BaseDriver {
override async queueGather(key: string): Promise<Buffer> {
const queue = this.queues.get(key);
if (queue?.length) return queue.shift();
if (!queue.length) return;
const elem = queue.shift();
const backupQueue = this.getBackupQueue(key);
backupQueue.push(elem);
return elem;
}
override async queueGatherBlocking(key: string): Promise<Buffer> {
......@@ -129,6 +145,36 @@ export class MemoryDriver extends BaseDriver {
});
}
override async queueAck(key: string, value: Buffer): Promise<void> {
const backupQueue = this.getBackupQueue(key);
const idx = backupQueue.findIndex((v) => value.equals(v));
if (idx === -1) return;
backupQueue.splice(idx, 1);
}
override async queueResume(
key: string,
value: Buffer,
prior?: boolean,
): Promise<void> {
const backupQueue = this.getBackupQueue(key);
const idx = backupQueue.findIndex((v) => value.equals(v));
if (idx === -1) return;
backupQueue.splice(idx, 1);
return this.queueAdd(key, value, prior);
}
override async queueResumeAll(key: string, prior?: boolean): Promise<void> {
const backupQueue = this.getBackupQueue(key);
if (prior) {
backupQueue.reverse();
}
for (const value of backupQueue) {
await this.queueAdd(key, value, prior);
}
backupQueue.splice(0, backupQueue.length);
}
override async queueClear(key: string): Promise<void> {
this.queues.delete(key);
}
......
......@@ -133,6 +133,10 @@ export class RedisDriver extends BaseDriver {
return `${this.options.queueKey || '_queue'}:${key}`;
}
private getQueueBackupKey(key: string) {
return `${this.options.queueBackupKey || '_queue_backup'}:${key}`;
}
async queueLength(key: string): Promise<number> {
const _key = this.getQueueKey(key);
return this.pool.use((r) => r.redis.llen(_key));
......@@ -160,32 +164,72 @@ export class RedisDriver extends BaseDriver {
override async queueGather(key: string): Promise<Buffer> {
const _key = this.getQueueKey(key);
const value = await this.pool.use((r) => r.redis.lpopBuffer(_key));
const backupKey = this.getQueueBackupKey(key);
const value = await this.pool.use((r) =>
r.redis.lmoveBuffer(_key, backupKey, 'LEFT', 'RIGHT'),
);
return value || undefined;
}
private waitingBlockingProms = new Map<
Promise<[Buffer, Buffer]>,
() => void
>();
private waitingBlockingProms = new Map<Promise<Buffer>, () => void>();
override async queueGatherBlocking(key: string): Promise<Buffer> {
if (this.quitted) return;
const _key = this.getQueueKey(key);
const backupKey = this.getQueueBackupKey(key);
const res = await this.useTempRedisClient(async (redisClient) => {
try {
const valueProm = redisClient.blpopBuffer(_key, 0);
const valueProm = redisClient.blmoveBuffer(
_key,
backupKey,
'LEFT',
'RIGHT',
0,
);
const exitProm = new Promise<void>((resolve) => {
this.waitingBlockingProms.set(valueProm, resolve);
});
const value = await Promise.race([valueProm, exitProm]);
this.waitingBlockingProms.delete(valueProm);
if (value) return value?.[1];
if (value) return value;
} catch (e) {}
});
return res || this.queueGatherBlocking(key);
}
async queueAck(key: string, value: Buffer): Promise<void> {
if (this.quitted) return;
const backupKey = this.getQueueBackupKey(key);
await this.pool.use((r) => r.redis.lrem(backupKey, 0, value));
}
async queueResume(
key: string,
value: Buffer,
prior?: boolean,
): Promise<void> {
if (this.quitted) return;
const _key = this.getQueueKey(key);
const backupKey = this.getQueueBackupKey(key);
await this.pool.use(async (r) => {
if (!(await r.redis.lrem(backupKey, 0, value))) return;
if (prior) {
await r.redis.rpush(_key, value);
} else {
await r.redis.lpush(_key, value);
}
});
}
async queueResumeAll(key: string, prior?: boolean): Promise<void> {
if (this.quitted) return;
const _key = this.getQueueKey(key);
const backupKey = this.getQueueBackupKey(key);
await this.pool.use(async (r) => {
const values = await r.redis.lrangeBuffer(backupKey, 0, -1);
});
}
async queueClear(key: string): Promise<void> {
const _key = this.getQueueKey(key);
await this.pool.use((r) => r.redis.del(_key));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment