Commit 54cbd172 authored by nanahira's avatar nanahira

add init account queue per proxy

parent 8af6f27f
......@@ -12,6 +12,13 @@ import { AccountProviderService } from '../account-provider/account-provider.ser
import { AccountPoolStatusDto } from './account-pool-status.dto';
import { Interval } from '@nestjs/schedule';
import BetterLock from 'better-lock';
import { resolve } from 'path';
interface AddAccountPayload extends OpenAIAccount {
resolve: (success: boolean) => void;
reject: (error: any) => void;
retry: boolean;
}
@Injectable()
export class AccountPoolService
......@@ -31,11 +38,14 @@ export class AccountPoolService
private async initAccount(
account: OpenAIAccount,
retry = true,
proxy?: string,
): Promise<boolean> {
if (this.accounts.has(account.email)) {
return false;
}
const proxy = await this.proxyPoolService.getProxy();
if (!proxy) {
proxy = await this.proxyPoolService.getProxy();
}
const state = new AccountState(
account,
new this.ChatGPTAPIBrowserConstructor({
......@@ -44,7 +54,7 @@ export class AccountPoolService
isProAccount: !!account.pro,
isGoogleLogin: account.loginType === 'google',
isMicrosoftLogin: account.loginType === 'microsoft',
proxyServer: proxy,
proxyServer: proxy || undefined,
}),
);
this.log(`Adding account ${account.email}`);
......@@ -54,17 +64,52 @@ export class AccountPoolService
this.log(`Added account ${account.email}`);
return true;
} else if (retry) {
await Promise.all([
new Promise((r) => setTimeout(r, 5000)),
state.close(),
]);
return await this.initAccount(account, retry);
await state.close();
return this.initAccountQueued(account, retry);
} else {
await state.close();
return false;
}
}
private initAccountQueue = new Map<string, AddAccountPayload[]>();
private async initAccountQueued(account: OpenAIAccount, retry = true) {
if (this.accounts.has(account.email)) {
return false;
}
const proxy = (await this.proxyPoolService.getProxy()) || '';
if (!this.initAccountQueue.has(proxy)) {
this.initAccountQueue.set(proxy, []);
}
const queue = this.initAccountQueue.get(proxy);
return new Promise<boolean>((resolve, reject) =>
queue.push({
...account,
retry,
resolve,
reject,
}),
);
}
@Interval(1000 * 10)
private async initAccountQueueResolve() {
const entries = [...this.initAccountQueue.entries()];
await Promise.all(
entries.map(async ([proxy, queue]) => {
const item = queue.shift();
if (!item) {
return;
}
try {
item.resolve(await this.initAccount(item, item.retry, proxy));
} catch (e) {
item.reject(e);
}
}),
);
}
private accountInfos: OpenAIAccount[];
@Interval(1000 * 60 * 5)
......@@ -91,7 +136,17 @@ export class AccountPoolService
await eval("import('chatgpt3')")
).ChatGPTAPIBrowser;
this.accountInfos = await this.accountProvider.get();
this.accountInfos.forEach((a) => this.addAccount(a, true));
this.accountInfos.forEach((a, i) => this.addAccount(a, true, i === 0));
}
private addAccountLock = new BetterLock();
async addAccount(account: OpenAIAccount, retry = false, noQueue = false) {
return this.addAccountLock.acquire(account.email, () =>
noQueue
? this.initAccount(account, retry)
: this.initAccountQueued(account, retry),
);
}
async onModuleDestroy() {
......@@ -134,14 +189,6 @@ export class AccountPoolService
return this.accounts.get(email) || this.randomAccount(exclude);
}
private addAccountLock = new BetterLock();
async addAccount(account: OpenAIAccount, retry = false) {
return this.addAccountLock.acquire(account.email, () =>
this.initAccount(account, retry),
);
}
async removeAccount(email: string) {
const state = this.accounts.get(email);
if (!state) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment