Commit 4dc247fa authored by nanahira's avatar nanahira

finish

parent f8ba79a2
......@@ -3,4 +3,8 @@ bots:
endpoint: 'ws://localhost:6700'
selfId: '1111111111'
token: 'token'
routes: []
routes:
- name: default
botId: '3221204940'
token: 'oONw7YpqUdYuc'
balancePolicy: hash
......@@ -5,21 +5,8 @@ import { KoishiModule } from 'koishi-nestjs';
import { BotLoaderService } from './bot-loader/bot-loader.service';
import { RouteService } from './route/route.service';
import { OnebotGateway } from './onebot.gateway';
import { Adapter, Session } from 'koishi';
import { MessageService } from './message/message.service';
declare module 'koishi' {
interface EventMap {
dispatch: (session: Session) => void;
}
}
const originalDispatch = Adapter.prototype.dispatch;
Adapter.prototype.dispatch = function (this: Adapter, session: Session) {
if (!this.ctx.app.isActive) return;
originalDispatch.call(this, session);
this.ctx.emit(session, 'dispatch', session);
};
import { ReverseWsService } from './reverse-ws/reverse-ws.service';
@Module({
imports: [
......@@ -33,6 +20,12 @@ Adapter.prototype.dispatch = function (this: Adapter, session: Session) {
useWs: true,
}),
],
providers: [BotLoaderService, RouteService, OnebotGateway, MessageService],
providers: [
BotLoaderService,
RouteService,
OnebotGateway,
MessageService,
ReverseWsService,
],
})
export class AppModule {}
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Adapter, Context, Session } from 'koishi';
declare module 'koishi' {
interface EventMap {
dispatch: (session: Session) => void;
}
}
const originalDispatch = Adapter.prototype.dispatch;
Adapter.prototype.dispatch = function (this: Adapter, session: Session) {
if (!this.ctx.app.isActive) return;
originalDispatch.call(this, session);
this.ctx.emit(session, 'dispatch', session);
};
import * as PluginOnebot from '@koishijs/plugin-adapter-onebot';
import { ConfigService } from '@nestjs/config';
import { InjectContext, PluginDef, UsePlugin } from 'koishi-nestjs';
import { BotConfig } from '@koishijs/plugin-adapter-onebot/lib/bot';
import { Context } from 'koishi';
@Injectable()
export class BotLoaderService implements OnModuleInit {
......@@ -15,6 +29,9 @@ export class BotLoaderService implements OnModuleInit {
@UsePlugin()
loadBots() {
const bots = this.config.get<BotConfig[]>('bots');
for (const bot of bots) {
bot.selfId = bot.selfId.toString();
}
return PluginDef(PluginOnebot, { bots });
}
......
......@@ -16,7 +16,7 @@ export class MessageService extends ConsoleLogger {
registerWsEvent(client: WebSocket, route: Route) {
client.on('message', async (data) => {
if (typeof data !== 'string') {
this.warn(`Got non-string.`);
this.warn(`Got non-string from ${route.name}.`);
client.send(
JSON.stringify({
retcode: 1400,
......@@ -24,7 +24,7 @@ export class MessageService extends ConsoleLogger {
data: null,
error: {
code: 1404,
message: `Got non-string`,
message: `Got non-string from ${route.name}.`,
},
}),
);
......@@ -35,7 +35,7 @@ export class MessageService extends ConsoleLogger {
const message = JSON.stringify(await this.onWsEvent(route, parsedData));
client.send(message);
} catch (e) {
this.warn(`Got bad JSON ${data}`);
this.warn(`Got bad JSON ${data} from ${route.name}.`);
client.send(
JSON.stringify({
retcode: 1400,
......@@ -43,7 +43,7 @@ export class MessageService extends ConsoleLogger {
data: null,
error: {
code: 1404,
message: `Got bad JSON.`,
message: `Got bad JSON from ${route.name}.`,
},
}),
);
......@@ -58,33 +58,34 @@ export class MessageService extends ConsoleLogger {
(b) => b.selfId === route.botId && b.platform === 'onebot',
) as OneBotBot;
if (!bot) {
this.error(`Bot ${route.botId} not found`);
this.error(`Bot ${route.botId} from ${route.name} not found.`);
return {
retcode: 1404,
status: 'failed',
data: null,
error: {
code: 1404,
message: `Bot ${route.botId} not found.`,
message: `Bot ${route.botId} from ${route.name} not found.`,
},
echo: data?.echo,
};
}
try {
const result = await bot.internal._request(data.action, data.params);
// console.log(result);
return {
...result,
echo: data?.echo,
};
} catch (e) {
this.error(`Bot ${route.botId} timed out.`);
this.error(`Bot ${route.botId} from ${route.name} timed out.`);
return {
retcode: 1404,
status: 'failed',
data: null,
error: {
code: 1404,
message: `Bot ${route.botId} timed out.`,
message: `Bot ${route.botId} from ${route.name} timed out.`,
},
echo: data?.echo,
};
......
......@@ -56,8 +56,9 @@ export class OnebotGateway
ip: (request.socket.address() as AddressInfo).address,
};
this.clientRouteMap.set(client, clientInfo);
route.addConnection(client);
this.messageService.registerWsEvent(client, route);
this.warn(
this.log(
`Client ${clientInfo.ip} of route ${clientInfo.routeName} connected.`,
);
}
......
import { Test, TestingModule } from '@nestjs/testing';
import { ReverseWsService } from './reverse-ws.service';
describe('ReverseWsService', () => {
let service: ReverseWsService;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [ReverseWsService],
}).compile();
service = module.get<ReverseWsService>(ReverseWsService);
});
it('should be defined', () => {
expect(service).toBeDefined();
});
});
import { ConsoleLogger, Injectable } from '@nestjs/common';
import { MessageService } from '../message/message.service';
import { ReverseWsConfig, Route } from '../route/Route';
import { OutgoingHttpHeaders } from 'http';
import WebSocket from 'ws';
@Injectable()
export class ReverseWsService extends ConsoleLogger {
constructor(private meesageService: MessageService) {
super('reverse-ws');
}
initializeReverseWs(route: Route, revConfig: ReverseWsConfig) {
const headers: OutgoingHttpHeaders = {
'X-Self-ID': route.botId,
'X-Client-Role': 'Universal',
'User-Agent': 'OneBot',
};
if (revConfig.token) {
headers['Authorization'] = `Bearer ${revConfig.token}`;
}
const ws = new WebSocket(revConfig.url, { headers });
ws.on('error', (err) =>
this.warn(`Socket from ${route.name} error: ${err.toString()}`),
);
ws.on('open', () => {
this.log(`Route ${route.name} connected to ${revConfig.url}.`);
route.addConnection(ws);
this.meesageService.registerWsEvent(ws, route);
});
ws.on('close', (code, msg) => {
route.removeConnection(ws);
const interval = revConfig.reconnectInterval || 5000;
this.log(
`Route ${route.name} disconnected from ${revConfig.url}: ${code}: ${msg}. Will retry after ${interval} ms.`,
);
setTimeout(() => this.initializeReverseWs(route, revConfig), interval);
});
}
}
......@@ -5,29 +5,37 @@ import { Random, remove } from 'koishi';
import { createHash } from 'crypto';
import { OneBotBot } from '@koishijs/plugin-adapter-onebot/lib/bot';
export type HashPolicy = 'broadcast' | 'random' | 'round-robin' | 'hash';
export type BalancePolicy = 'broadcast' | 'random' | 'round-robin' | 'hash';
export interface ReverseWsConfig {
url: string;
token?: string;
reconnectInterval?: number;
}
export interface RouteConfig {
name: string;
botId: string;
token?: string;
select?: Selection;
hashPolicy?: HashPolicy;
balancePolicy?: BalancePolicy;
heartbeat?: number;
reverseWs?: ReverseWsConfig[];
}
export class Route implements RouteConfig {
connections: WebSocket[] = [];
private connections: WebSocket[] = [];
private roundCount = 0;
ctx: Context;
name: string;
botId: string;
token?: string;
select?: Selection;
hashPolicy?: HashPolicy;
balancePolicy?: BalancePolicy;
heartbeat?: number;
constructor(routeConfig: RouteConfig, ctx: Context) {
Object.assign(this, routeConfig);
this.hashPolicy ||= 'hash';
this.balancePolicy ||= 'hash';
this.botId = this.botId.toString();
this.ctx = this.getFilteredContext(ctx);
if (this.heartbeat) {
setInterval(() => {
......@@ -41,23 +49,47 @@ export class Route implements RouteConfig {
}, this.heartbeat);
}
}
send(data: any, sess: Session) {
send(data: any, sess: Session, allConns = this.connections) {
const message = JSON.stringify(data);
for (const conn of this.getRelatedConnections(sess)) {
conn.send(message, (err) => {});
const conns = this.getRelatedConnections(sess, allConns);
for (const conn of conns) {
conn.send(message, (err) => {
if (err) {
this.ctx
.logger(`route-${this.name}`)
.error(`Failed to send data: ${err.message}`);
if (allConns.length > 1 && conns.length === 1) {
this.ctx
.logger(`route-${this.name}`)
.warn(`Retrying another connection.`);
this.send(
data,
sess,
allConns.filter((c) => c !== conn),
);
}
}
});
}
}
broadcast(data: any) {
const message = JSON.stringify(data);
for (const conn of this.connections) {
conn.send(message, (err) => {});
conn.send(message, (err) => {
if (err) {
this.ctx
.logger(`route-${this.name}`)
.error(`Failed to broadcast data: ${err.message}`);
}
});
}
}
getFilteredContext(ctx: Context) {
const idCtx = ctx.self(this.botId);
if (!this.select) {
return ctx;
return idCtx;
}
return ctx.select(this.select);
return idCtx.select(this.select);
}
private getSequenceFromSession(sess: Session) {
const hash = createHash('md5');
......@@ -68,23 +100,29 @@ export class Route implements RouteConfig {
}
return parseInt(hash.digest('hex'), 16) % 4294967295;
}
getRelatedConnections(sess: Session): WebSocket[] {
switch (this.hashPolicy) {
getRelatedConnections(
sess: Session,
allConns = this.connections,
): WebSocket[] {
if (allConns.length <= 1) {
return allConns;
}
switch (this.balancePolicy) {
case 'broadcast':
return this.connections;
return allConns;
case 'round-robin':
const index = this.roundCount++ % this.connections.length;
return [this.connections[index]];
const index = this.roundCount++ % allConns.length;
return [allConns[index]];
case 'random':
return [Random.pick(this.connections)];
return [Random.pick(allConns)];
case 'hash':
return [
this.connections[
this.getSequenceFromSession(sess) % this.connections.length
],
];
return [allConns[this.getSequenceFromSession(sess) % allConns.length]];
default:
this.ctx
.logger(`route-${this.name}`)
.error(`Unknown policy ${this.balancePolicy}`);
return [];
}
return [];
}
addConnection(conn: WebSocket) {
this.connections.push(conn);
......
......@@ -29,10 +29,6 @@ export class RouteService
return this.routes.get(name);
}
getRoutesFromBot(botId: string) {
return Array.from(this.routes.values()).filter((r) => r.botId === botId);
}
onApplicationBootstrap() {
for (const route of this.routes.values()) {
route.ctx.on('dispatch', (session) => this.onOnebotEvent(session, route));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment