Commit 55738829 authored by nanahira's avatar nanahira

refa and add /api/status

parent ce556e1e
export class AccountPoolStatusDto {
free: number;
online: number;
total: number;
queuing: number;
}
import { Test, TestingModule } from '@nestjs/testing';
import { AccountPoolService } from './account-pool.service';
describe('AccountPoolService', () => {
let service: AccountPoolService;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [AccountPoolService],
}).compile();
service = module.get<AccountPoolService>(AccountPoolService);
});
it('should be defined', () => {
expect(service).toBeDefined();
});
});
import {
ConsoleLogger,
Injectable,
OnModuleDestroy,
OnModuleInit,
} from '@nestjs/common';
import { ProxyPoolService } from '../proxy-pool/proxy-pool.service';
import { ChatGPTAPIBrowser } from 'chatgpt3';
import { OpenAIAccount } from '../utility/config';
import { AccountState } from './account-state';
import { AccountProviderService } from '../account-provider/account-provider.service';
import { AccountPoolStatusDto } from './account-pool-status.dto';
@Injectable()
export class AccountPoolService
extends ConsoleLogger
implements OnModuleInit, OnModuleDestroy
{
constructor(
private proxyPoolService: ProxyPoolService,
private accountProvider: AccountProviderService,
) {
super('AccountPoolService');
}
private ChatGPTAPIBrowserConstructor: typeof ChatGPTAPIBrowser;
private accounts = new Map<string, AccountState>();
private async initAccount(
account: OpenAIAccount,
retry = true,
): Promise<boolean> {
const proxy = this.proxyPoolService.getProxy();
const state = new AccountState(
account,
new this.ChatGPTAPIBrowserConstructor({
email: account.email,
password: account.password,
isProAccount: !!account.pro,
isGoogleLogin: account.loginType === 'google',
isMicrosoftLogin: account.loginType === 'microsoft',
proxyServer: proxy,
}),
);
const success = await state.init();
if (success) {
this.accounts.set(account.email, state);
return true;
} else if (retry) {
await Promise.all([
new Promise((r) => setTimeout(r, 5000)),
state.close(),
]);
return this.initAccount(account, retry);
} else {
await state.close();
return false;
}
}
private accountInfos: OpenAIAccount[];
async onModuleInit() {
this.ChatGPTAPIBrowserConstructor = (
await eval("import('chatgpt3')")
).ChatGPTAPIBrowser;
await this.proxyPoolService.init();
this.accountInfos = await this.accountProvider.getAccounts();
this.accountInfos.forEach((a) => this.initAccount(a));
}
async onModuleDestroy() {
await Promise.all(Array.from(this.accounts.values()).map((a) => a.close()));
}
randomAccount(exclude: string[] = []) {
const accounts = Array.from(this.accounts.values()).filter(
(a) => !exclude.includes(a.loginInfo.email),
);
const freeAccounts = accounts.filter((a) => a.isFree());
if (freeAccounts.length) {
return freeAccounts[Math.floor(Math.random() * freeAccounts.length)];
}
if (!accounts.length) {
return;
}
let useAccount: AccountState;
for (const account of accounts) {
if (
!useAccount ||
account.queueSize() < useAccount.queueSize() ||
(account.queueSize() === useAccount.queueSize() &&
account.occupyTimestamp < useAccount.occupyTimestamp)
) {
useAccount = account;
}
}
return useAccount;
}
getAccount(email: string, exclude: string[] = []) {
if (!email || exclude.includes(email)) {
return this.randomAccount(exclude);
}
return this.accounts.get(email) || this.randomAccount(exclude);
}
async addAccount(account: OpenAIAccount) {
if (this.accounts.has(account.email)) {
return false;
}
const result = await this.initAccount(account, false);
if (result) {
this.accountInfos.push(account);
}
await this.accountProvider.writeBack(this.accountInfos);
return result;
}
async removeAccount(email: string) {
const index = this.accountInfos.findIndex((a) => a.email === email);
this.accountInfos.splice(index, 1);
if (index === -1) {
return false;
}
const account = this.accounts.get(email);
if (account) {
this.accounts.delete(email);
await account.close();
}
await this.accountProvider.writeBack(this.accountInfos);
return true;
}
getAccountStatus() {
const accounts = Array.from(this.accounts.values());
const plain = {
free: accounts.filter((a) => a.isFree()).length,
online: accounts.length,
total: this.accountInfos.length,
queuing: accounts.reduce((p, c) => p + c.queueSize(), 0),
};
const dto = new AccountPoolStatusDto();
Object.assign(dto, plain);
return dto;
}
}
import type { ChatGPTAPIBrowser } from 'chatgpt3';
import PQueue from 'p-queue';
import { ConsoleLogger } from '@nestjs/common';
import pTimeout from 'p-timeout';
import { OpenAIAccount } from '../utility/config';
export class AccountState {
private lock = new PQueue({ concurrency: 1 });
occupyTimestamp = 0;
private logger = new ConsoleLogger(`OpenAI Account ${this.loginInfo.email}`);
constructor(
public loginInfo: OpenAIAccount,
private instance: ChatGPTAPIBrowser,
) {}
async init() {
this.logger.log('Initializing ChatGPT API');
try {
await pTimeout(this.instance.initSession(), 120000);
} catch (e) {
this.logger.error('Failed to initialize ChatGPT API');
this.logger.error(e);
return false;
}
this.logger.log('Initialized ChatGPT API');
return true;
}
async close() {
return this.instance.closeSession();
}
isFree() {
return this.occupyTimestamp === 0;
}
queueSize() {
return this.lock.size + this.lock.pending;
}
getEmail() {
return this.loginInfo.email;
}
async sendMessage(...args: Parameters<ChatGPTAPIBrowser['sendMessage']>) {
return this.lock.add(async () => {
this.occupyTimestamp = Date.now();
try {
this.logger.log(`Sending message`);
const result = await this.instance.sendMessage(...args);
this.logger.log(`Sent message`);
return result;
} catch (e) {
this.logger.error(`Failed to send message`);
this.logger.error(e);
return;
} finally {
this.occupyTimestamp = 0;
}
});
}
}
import { Test, TestingModule } from '@nestjs/testing';
import { AccountProviderService } from './account-provider.service';
describe('AccountProviderService', () => {
let service: AccountProviderService;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [AccountProviderService],
}).compile();
service = module.get<AccountProviderService>(AccountProviderService);
});
it('should be defined', () => {
expect(service).toBeDefined();
});
});
import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { OpenAIAccount } from '../utility/config';
@Injectable()
export class AccountProviderService {
constructor(private config: ConfigService) {}
async getAccounts() {
return this.config.get<OpenAIAccount[]>('accounts');
}
async writeBack(accounts: OpenAIAccount[]) {}
}
import {
Body,
Controller,
Header,
Get,
Headers,
Post,
ValidationPipe,
......@@ -15,15 +15,18 @@ import { ConversationDto } from './conversation/conversation.dto';
import {
ApiBody,
ApiCreatedResponse,
ApiHeader,
ApiOkResponse,
ApiOperation,
} from '@nestjs/swagger';
import { TalkDto } from './chatgpt/talk.dto';
import { ResetConversationDto } from './conversation/reset-conversation.dto';
import { ConversationService } from './conversation/conversation.service';
import { AuthService } from './auth/auth.service';
import { AccountPoolStatusDto } from './account-pool/account-pool-status.dto';
import { AccountPoolService } from './account-pool/account-pool.service';
const ResponseDto = ReturnMessageDto(ConversationDto);
const StatusDto = ReturnMessageDto(AccountPoolStatusDto);
@Controller()
export class AppController {
......@@ -31,6 +34,7 @@ export class AppController {
private readonly chatgpt: ChatgptService,
private readonly conversation: ConversationService,
private readonly authService: AuthService,
private readonly accountPool: AccountPoolService,
) {}
@Post('chat')
......@@ -69,4 +73,12 @@ export class AppController {
await this.conversation.resetConversation(dto.session);
return new BlankReturnMessageDto(201, 'success');
}
@Get('status')
@ApiOperation({ summary: 'Get account status.' })
@ApiOkResponse({ type: StatusDto })
async status(@Headers('Authorization') header: string) {
await this.authService.auth(header);
return new StatusDto(200, 'success', this.accountPool.getAccountStatus());
}
}
......@@ -7,6 +7,9 @@ import { ChatgptService } from './chatgpt/chatgpt.service';
import { ConversationService } from './conversation/conversation.service';
import { AuthService } from './auth/auth.service';
import { ProxyPoolService } from './proxy-pool/proxy-pool.service';
import { AccountPoolService } from './account-pool/account-pool.service';
import { AccountProviderService } from './account-provider/account-provider.service';
import { ProxyProviderService } from './proxy-provider/proxy-provider.service';
@Module({
imports: [
......@@ -32,6 +35,14 @@ import { ProxyPoolService } from './proxy-pool/proxy-pool.service';
}),
],
controllers: [AppController],
providers: [ChatgptService, ConversationService, AuthService, ProxyPoolService],
providers: [
ChatgptService,
ConversationService,
AuthService,
ProxyPoolService,
AccountPoolService,
AccountProviderService,
ProxyProviderService,
],
})
export class AppModule {}
import {
ConsoleLogger,
Injectable,
OnModuleDestroy,
OnModuleInit,
} from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import type { ChatGPTAPIBrowser } from 'chatgpt3';
import { ConsoleLogger, Injectable } from '@nestjs/common';
import { TalkDto } from './talk.dto';
import { ConversationService } from '../conversation/conversation.service';
import { v4 as uuid } from 'uuid';
import { ConversationDto } from '../conversation/conversation.dto';
import { OpenAIAccount } from '../utility/config';
import { BetterLock } from 'better-lock/dist/better_lock';
import { BlankReturnMessageDto } from '../dto/ReturnMessage.dto';
import pTimeout from 'p-timeout';
import { ProxyPoolService } from '../proxy-pool/proxy-pool.service';
import PQueue from 'p-queue';
interface AccountState {
email: string;
account: ChatGPTAPIBrowser;
lock: PQueue;
occupyTimestamp: number;
}
import { AccountPoolService } from '../account-pool/account-pool.service';
@Injectable()
export class ChatgptService
extends ConsoleLogger
implements OnModuleInit, OnModuleDestroy
{
export class ChatgptService extends ConsoleLogger {
constructor(
private config: ConfigService,
private conversationService: ConversationService,
private proxyPoolService: ProxyPoolService,
private accountPoolService: AccountPoolService,
) {
super('ChatGPT');
}
private ChatGPTAPIBrowserConstructor: typeof ChatGPTAPIBrowser;
private accounts = new Map<string, AccountState>();
private async initAccount(account: OpenAIAccount) {
this.log(`Initializing ChatGPT API for ${account.email}`);
const instance = new this.ChatGPTAPIBrowserConstructor({
email: account.email,
password: account.password,
isProAccount: !!account.pro,
isGoogleLogin: account.loginType === 'google',
isMicrosoftLogin: account.loginType === 'microsoft',
proxyServer: this.proxyPoolService.getProxy(),
});
try {
await pTimeout(instance.initSession(), 120000);
this.log(`Initialized ChatGPT API for ${account.email}`);
this.accounts.set(account.email, {
email: account.email,
account: instance,
lock: new PQueue({ concurrency: 1 }),
occupyTimestamp: 0,
});
} catch (e) {
this.error(`Failed to initialize ChatGPT API for ${account.email}`, e);
await Promise.all([
new Promise((r) => setTimeout(r, 5000)),
instance.closeSession().catch(),
]);
return this.initAccount(account);
}
}
randomAccount(exclude: string[] = []) {
const accounts = Array.from(this.accounts.values()).filter(
(a) => !exclude.includes(a.email),
);
const freeAccounts = accounts.filter((a) => a.occupyTimestamp === 0);
if (freeAccounts.length) {
return freeAccounts[Math.floor(Math.random() * freeAccounts.length)];
}
if (!accounts.length) {
return;
}
let useAccount: AccountState;
for (const account of accounts) {
if (
!useAccount ||
account.lock.size < useAccount.lock.size ||
(account.lock.size === useAccount.lock.size &&
account.occupyTimestamp < useAccount.occupyTimestamp)
) {
useAccount = account;
}
}
return useAccount;
}
async onModuleInit() {
this.ChatGPTAPIBrowserConstructor = (
await eval("import('chatgpt3')")
).ChatGPTAPIBrowser;
this.config
.get<OpenAIAccount[]>('accounts')
.forEach((a) => this.initAccount(a));
}
async onModuleDestroy() {
await Promise.all(
Array.from(this.accounts.values()).map((a) => a.account.closeSession()),
);
}
async chatProcess(
question: TalkDto,
failedAccounts: string[] = [],
......@@ -117,44 +24,36 @@ export class ChatgptService
const previousConversation = failedAccounts.length
? undefined
: await this.conversationService.getConversation(session);
const account =
this.accounts.get(previousConversation?.account || '_random') ||
this.randomAccount(failedAccounts);
const account = this.accountPoolService.getAccount(
previousConversation?.account,
failedAccounts,
);
if (!account) {
throw new BlankReturnMessageDto(
500,
'No available accounts',
).toException();
}
const result = await account.lock.add(async () => {
account.occupyTimestamp = Date.now();
try {
this.log(
`Processing conversation with account ${account.email} for ${session}`,
);
return await account.account.sendMessage(question.text, {
...(previousConversation
? {
conversationId: previousConversation.conversationId,
parentMessageId: previousConversation.messageId,
}
: {}),
timeoutMs: 300000,
});
} catch (e) {
this.log(`ChatGPT API for ${account.email} failed: ${e.toString()}`);
return;
} finally {
account.occupyTimestamp = 0;
}
this.log(`Processing chat for ${session} with ${account.getEmail()}`);
const result = await account.sendMessage(question.text, {
...(previousConversation
? {
conversationId: previousConversation.conversationId,
parentMessageId: previousConversation.messageId,
}
: {}),
timeoutMs: 300000,
});
if (!result) {
return this.chatProcess(question, [...failedAccounts, account.email]);
return this.chatProcess(question, [
...failedAccounts,
account.getEmail(),
]);
}
await this.conversationService.saveConversation(
session,
result,
account.email,
account.getEmail(),
);
const dto = new ConversationDto();
dto.session = session;
......
import { NestFactory } from '@nestjs/core';
import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger';
import { DocumentBuilder, SwaggerModule } from '@nestjs/swagger';
import { NestExpressApplication } from '@nestjs/platform-express';
import { AppModule } from './app.module';
import { ConfigService } from '@nestjs/config';
......
import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ProxyProviderService } from '../proxy-provider/proxy-provider.service';
@Injectable()
export class ProxyPoolService {
private pointer = 0;
private proxies = this.config.get<string[]>('proxies');
constructor(private config: ConfigService) {}
private proxies: string[] = [];
constructor(private proxyProvider: ProxyProviderService) {}
async init() {
this.proxies = await this.proxyProvider.getProxies();
}
getProxy() {
if (!this.proxies?.length) {
......
import { Test, TestingModule } from '@nestjs/testing';
import { ProxyProviderService } from './proxy-provider.service';
describe('ProxyProviderService', () => {
let service: ProxyProviderService;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [ProxyProviderService],
}).compile();
service = module.get<ProxyProviderService>(ProxyProviderService);
});
it('should be defined', () => {
expect(service).toBeDefined();
});
});
import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
@Injectable()
export class ProxyProviderService {
constructor(private config: ConfigService) {}
async getProxies() {
return this.config.get<string[]>('proxies');
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment