Commit 2e47df8e authored by nanahira's avatar nanahira

finish

parent b6ccbd7a
Pipeline #6528 canceled with stages
in 1 minute and 26 seconds
......@@ -8,6 +8,40 @@
参考项目的 `config.example.yaml` ,并复制成为 `config.yaml` 以运行。
```yaml
host: 'localhost' # 监听地址
port: 3000 # 监听端口
# OntBot 后端配置。
## 配置模式请参照 [Koishi 文档](https://koishi.js.org/v4/plugins/adapter/onebot.html#%E6%9C%BA%E5%99%A8%E4%BA%BA%E9%80%89%E9%A1%B9)
onebot:
path: /onebot # http 或反向 WebSocket 监听路径
secret: 'secret' # 接收信息时用于验证的字段,应与 OneBot 的 `secret` 配置保持一致。
bots:
- protocol: 'ws' # 可选值: http, ws, ws-reverse
endpoint: 'ws://localhost:6700' # http 或正向 WebSocket 连接地址
selfId: '1111111111' # 机器人 id
token: 'token' # 发送信息时用于验证的字段。
# app 后端路由配置。
## app 后端只支持正向和反向 WebSocket
## 对于每个消息或事件,负载均衡器会发给所有路由的依照策略的某一个连接。
routes:
- name: default # 必填。路由名称。机器人连接的 ws 路径为 ws://<地址>/routes/<name>
selfId: '3221204940' # 必填。机器人 ID,和 OneBot 配置的 selfId 一致
token: 'token' # 连接 token
# 分流策略,有 'broadcast' | 'random' | 'round-robin' | 'hash' 四种。
## 分别为『广播给所有连接』『随机连接』『轮询』『按会话 hash』
## 默认为 hash。在有机器人交互的应用中建议使用 hash
balancePolicy: hash
select: false # 作用域,详见 [Koishi 文档](https://koishi.js.org/v4/guide/plugin/context.html#%E5%9C%A8%E9%85%8D%E7%BD%AE%E6%96%87%E4%BB%B6%E4%B8%AD%E4%BD%BF%E7%94%A8%E9%80%89%E6%8B%A9%E5%99%A8)
heartbeat: 3000 # 心跳包的间隔。0 或不填为禁用心跳包。
readonly: false # 该路由是否为只读。只读路由的连接无法对机器人进行写操作,只会得到模拟响应,但是可以进行 get 操作以及接收事件。
rateLimitInterval: 500 # 限速调用间隔,默认 500ms。
wsReverse: # 该路由的反向 WebSocket 配置。可以配置多个。
- endpoint: 'ws://localhost:8080'
token: 'token'
reconnectInterval: 60000 # 重连间隔
```
## Docker
Docker 容器镜像位于 `git-registry.mycard.moe/3rdeye/onebot-lb`。使用时把 `config.yaml` 挂载到 `/usr/src/app/config.yaml` 即可。
......@@ -18,4 +52,26 @@ Docker 容器镜像位于 `git-registry.mycard.moe/3rdeye/onebot-lb`。使用时
* `/onebot` OneBot 后端 (http 或反向 WebSocket) 连接的路径。
* `/route/<app name>` app 后端连接的路径。
\ No newline at end of file
* `/route/<app name>` app 后端连接的路径。
### 分流策略
`onebot-lb` 支持4种负载均衡分流策略,在 `routes` 进的 `balancePolicy` 进行设定该路由的分流策略。
#### broadcast
该模式下,所有消息和事件会发给该路由的每一个连接。这种情况下,每个路由可能会重复收到消息。
#### random
该模式下,所有消息和事件会随机发送给某一个 app 后端连接。
#### round-robin
该模式下,所有消息和事件会依次轮流发送给某一个 app 后端连接。
#### hash
该模式下,消息和事件会依照消息类型,机器人 ID,用户 ID 以及群号等信息进行 hash,并确定发送的目标 app 后端连接。
该模式是默认分流策略,推荐在有机器人交互的环境下使用。
\ No newline at end of file
......@@ -23,7 +23,9 @@ routes:
balancePolicy: hash
select: false # 作用域,详见 [Koishi 文档](https://koishi.js.org/v4/guide/plugin/context.html#%E5%9C%A8%E9%85%8D%E7%BD%AE%E6%96%87%E4%BB%B6%E4%B8%AD%E4%BD%BF%E7%94%A8%E9%80%89%E6%8B%A9%E5%99%A8)
heartbeat: 3000 # 心跳包的间隔。0 或不填为禁用心跳包。
readonly: false # 该路由是否为只读。只读路由的连接无法对机器人进行写操作,只会得到模拟响应,但是可以进行 get 操作以及接收事件。
rateLimitInterval: 500 # 限速调用间隔,默认 500ms。
wsReverse: # 该路由的反向 WebSocket 配置。可以配置多个。
- endpoint: 'ws://localhost:8080'
token: 'token'
reconnectInterval: 60000 # 重连间隔
reconnectInterval: 5000 # 重连间隔
......@@ -14,6 +14,7 @@
"@nestjs/config": "^1.0.3",
"@nestjs/core": "^8.0.0",
"@nestjs/platform-express": "^8.0.0",
"@nestjs/schedule": "^1.0.1",
"@nestjs/swagger": "^5.1.4",
"@nestjs/websockets": "^8.1.2",
"class-transformer": "^0.4.0",
......@@ -1808,6 +1809,20 @@
"rxjs": "^7.1.0"
}
},
"node_modules/@nestjs/schedule": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/@nestjs/schedule/-/schedule-1.0.1.tgz",
"integrity": "sha512-EU2tB4rxuEgum8JlorAFvXkU982EYZm/IBa7n6kgkyps5BbxQSFf7iR1CLkP9zODO9ApZTWk5z3q9L3O7vrkoQ==",
"dependencies": {
"cron": "1.7.2",
"uuid": "8.3.2"
},
"peerDependencies": {
"@nestjs/common": "^6.10.11 || ^7.0.0 || ^8.0.0",
"@nestjs/core": "^7.0.0 || ^8.0.0",
"reflect-metadata": "^0.1.12"
}
},
"node_modules/@nestjs/schematics": {
"version": "8.0.4",
"resolved": "https://registry.npmjs.org/@nestjs/schematics/-/schematics-8.0.4.tgz",
......@@ -3758,6 +3773,14 @@
"integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==",
"dev": true
},
"node_modules/cron": {
"version": "1.7.2",
"resolved": "https://registry.npmjs.org/cron/-/cron-1.7.2.tgz",
"integrity": "sha512-+SaJ2OfeRvfQqwXQ2kgr0Y5pzBR/lijf5OpnnaruwWnmI799JfWr2jN2ItOV9s3A/+TFOt6mxvKzQq5F0Jp6VQ==",
"dependencies": {
"moment-timezone": "^0.5.x"
}
},
"node_modules/cross-spawn": {
"version": "7.0.3",
"resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz",
......@@ -7290,6 +7313,25 @@
"mkdirp": "bin/cmd.js"
}
},
"node_modules/moment": {
"version": "2.29.1",
"resolved": "https://registry.npmjs.org/moment/-/moment-2.29.1.tgz",
"integrity": "sha512-kHmoybcPV8Sqy59DwNDY3Jefr64lK/by/da0ViFcuA4DH0vQg5Q6Ze5VimxkfQNSC+Mls/Kx53s7TjP1RhFEDQ==",
"engines": {
"node": "*"
}
},
"node_modules/moment-timezone": {
"version": "0.5.33",
"resolved": "https://registry.npmjs.org/moment-timezone/-/moment-timezone-0.5.33.tgz",
"integrity": "sha512-PTc2vcT8K9J5/9rDEPe5czSIKgLoGsH8UNpA4qZTVw0Vd/Uz19geE9abbIOQKaAQFcnQ3v5YEXrbSc5BpshH+w==",
"dependencies": {
"moment": ">= 2.9.0"
},
"engines": {
"node": "*"
}
},
"node_modules/ms": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
......@@ -11195,6 +11237,15 @@
"ws": "7.5.5"
}
},
"@nestjs/schedule": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/@nestjs/schedule/-/schedule-1.0.1.tgz",
"integrity": "sha512-EU2tB4rxuEgum8JlorAFvXkU982EYZm/IBa7n6kgkyps5BbxQSFf7iR1CLkP9zODO9ApZTWk5z3q9L3O7vrkoQ==",
"requires": {
"cron": "1.7.2",
"uuid": "8.3.2"
}
},
"@nestjs/schematics": {
"version": "8.0.4",
"resolved": "https://registry.npmjs.org/@nestjs/schematics/-/schematics-8.0.4.tgz",
......@@ -12743,6 +12794,14 @@
"integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==",
"dev": true
},
"cron": {
"version": "1.7.2",
"resolved": "https://registry.npmjs.org/cron/-/cron-1.7.2.tgz",
"integrity": "sha512-+SaJ2OfeRvfQqwXQ2kgr0Y5pzBR/lijf5OpnnaruwWnmI799JfWr2jN2ItOV9s3A/+TFOt6mxvKzQq5F0Jp6VQ==",
"requires": {
"moment-timezone": "^0.5.x"
}
},
"cross-spawn": {
"version": "7.0.3",
"resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz",
......@@ -15443,6 +15502,19 @@
"minimist": "^1.2.5"
}
},
"moment": {
"version": "2.29.1",
"resolved": "https://registry.npmjs.org/moment/-/moment-2.29.1.tgz",
"integrity": "sha512-kHmoybcPV8Sqy59DwNDY3Jefr64lK/by/da0ViFcuA4DH0vQg5Q6Ze5VimxkfQNSC+Mls/Kx53s7TjP1RhFEDQ=="
},
"moment-timezone": {
"version": "0.5.33",
"resolved": "https://registry.npmjs.org/moment-timezone/-/moment-timezone-0.5.33.tgz",
"integrity": "sha512-PTc2vcT8K9J5/9rDEPe5czSIKgLoGsH8UNpA4qZTVw0Vd/Uz19geE9abbIOQKaAQFcnQ3v5YEXrbSc5BpshH+w==",
"requires": {
"moment": ">= 2.9.0"
}
},
"ms": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
......
......@@ -8,9 +8,11 @@ import { OnebotGateway } from './onebot.gateway';
import { MessageService } from './message/message.service';
import { ReverseWsService } from './reverse-ws/reverse-ws.service';
import { WaitBotService } from './wait-bot/wait-bot.service';
import { ScheduleModule } from '@nestjs/schedule';
@Module({
imports: [
ScheduleModule.forRoot(),
ConfigModule.forRoot({
ignoreEnvVars: true,
load: [loadConfig],
......
......@@ -4,9 +4,19 @@ import { Context } from 'koishi';
import WebSocket from 'ws';
import { Route } from '../route/Route';
import { genMetaEvent } from '../utility/oicq';
import { OnebotProtocol } from '../utility/onebot-protocol';
import {
OnebotProtocol,
OnebotAsyncResponseWithEcho,
} from '../utility/onebot-protocol';
import { OneBotBot } from '@koishijs/plugin-adapter-onebot/lib/bot';
import { WaitBotService } from '../wait-bot/wait-bot.service';
import { Interval } from '@nestjs/schedule';
export interface SendTask {
bot: OneBotBot;
route: Route;
data: OnebotProtocol;
}
@Injectable()
export class MessageService extends ConsoleLogger {
......@@ -57,6 +67,37 @@ export class MessageService extends ConsoleLogger {
client.send(JSON.stringify(genMetaEvent(route.selfId, 'enable')));
}
private async sendToBot(task: SendTask) {
await this.waitBot.waitForBotOnline(task.bot);
try {
const result = await task.bot.internal._request(
task.data.action,
task.data.params,
);
// console.log(result);
return {
...result,
echo: task.data?.echo,
};
} catch (e) {
this.error(
`Bot ${task.route.selfId} from ${
task.route.name
} errored: ${e.toString()}`,
);
return {
retcode: 1404,
status: 'failed',
data: null,
error: {
code: 1404,
message: `Bot ${task.route.selfId} from ${task.route.name} errored.`,
},
echo: task.data?.echo,
};
}
}
private async onWsEvent(route: Route, data: OnebotProtocol) {
const bot = this.ctx.bots.find(
(b) => b.selfId === route.selfId && b.platform === 'onebot',
......@@ -74,28 +115,49 @@ export class MessageService extends ConsoleLogger {
echo: data?.echo,
};
}
await this.waitBot.waitForBotOnline(bot);
try {
const result = await bot.internal._request(data.action, data.params);
// console.log(result);
return {
...result,
echo: data?.echo,
};
} catch (e) {
this.error(
`Bot ${route.selfId} from ${route.name} errored: ${e.toString()}`,
);
// eslint-disable-next-line prefer-const
let { action, params, echo } = data;
const isAsync = action.endsWith('_async');
if (isAsync) action = action.replace('_async', '');
const isQueue = action.endsWith('_rate_limited');
if (isQueue) action = action.replace('_rate_limited', '');
const task: SendTask = { bot, route, data: { action, params, echo } };
if (route.readonly && !action.startsWith('get_')) {
if (isAsync || isQueue) {
return OnebotAsyncResponseWithEcho(echo);
}
return {
retcode: 1404,
status: 'failed',
data: null,
error: {
code: 1404,
message: `Bot ${route.selfId} from ${route.name} errored.`,
},
retcode: 0,
status: 'ok',
data: action.startsWith('send_')
? { message_id: Math.floor(Math.random() * 10000) }
: null,
echo: data?.echo,
};
}
if (isQueue) {
route.addSendTask(task);
return OnebotAsyncResponseWithEcho(echo);
}
const prom = this.sendToBot(task);
if (isAsync) {
return OnebotAsyncResponseWithEcho(echo);
}
return prom;
}
registerRouteTaskInterval(route: Route) {
setInterval(
() => this.resolveSendTaskOfRoute(route),
route.rateLimitInterval,
);
}
private async resolveSendTaskOfRoute(route: Route) {
const task = route.fetchSendTask();
if (!task) {
return;
}
await this.sendToBot(task);
}
}
......@@ -3,6 +3,7 @@ import type WebSocket from 'ws';
import { Context, Session } from 'koishi';
import { Random, remove } from 'koishi';
import { createHash } from 'crypto';
import { SendTask } from '../message/message.service';
export type BalancePolicy = 'broadcast' | 'random' | 'round-robin' | 'hash';
......@@ -19,11 +20,14 @@ export interface RouteConfig {
select?: Selection;
balancePolicy?: BalancePolicy;
heartbeat?: number;
readonly?: boolean;
rateLimitInterval?: number;
reverseWs?: ReverseWsConfig[];
}
export class Route implements RouteConfig {
private connections: WebSocket[] = [];
private roundCount = 0;
private sendQueue: SendTask[] = [];
ctx: Context;
name: string;
selfId: string;
......@@ -32,10 +36,13 @@ export class Route implements RouteConfig {
balancePolicy?: BalancePolicy;
heartbeat?: number;
reverseWs?: ReverseWsConfig[];
readonly?: boolean;
rateLimitInterval: number;
preMessages: { data: any; session: Session }[] = [];
constructor(routeConfig: RouteConfig, ctx: Context) {
Object.assign(this, routeConfig);
this.balancePolicy ||= 'hash';
this.rateLimitInterval ||= 500;
this.selfId = this.selfId.toString();
this.ctx = this.getFilteredContext(ctx);
if (this.heartbeat) {
......@@ -96,11 +103,22 @@ export class Route implements RouteConfig {
}
return idCtx.select(this.select);
}
static sessionKeys: (keyof Session)[] = [
'selfId',
'guildId',
'userId',
'channelId',
'operatorId',
'type',
'subtype',
'subsubtype',
];
private getSequenceFromSession(sess: Session) {
const hash = createHash('md5');
for (const key of ['selfId', 'guildId', 'userId', 'channelId']) {
if (sess[key]) {
hash.update(sess[key]);
for (const key of Route.sessionKeys) {
const value = sess[key] as string;
if (value) {
hash.update(value);
}
}
return parseInt(hash.digest('hex'), 16) % 4294967295;
......@@ -140,4 +158,13 @@ export class Route implements RouteConfig {
removeConnection(conn: WebSocket) {
remove(this.connections, conn);
}
addSendTask(task: SendTask) {
this.sendQueue.push(task);
}
fetchSendTask() {
if (!this.sendQueue.length) {
return;
}
return this.sendQueue.shift();
}
}
......@@ -8,6 +8,7 @@ import { Route, RouteConfig } from './Route';
import { InjectContextPlatform } from 'koishi-nestjs';
import { Context, Session } from 'koishi';
import { ReverseWsService } from '../reverse-ws/reverse-ws.service';
import { MessageService } from '../message/message.service';
@Injectable()
export class RouteService
......@@ -18,6 +19,7 @@ export class RouteService
config: ConfigService,
@InjectContextPlatform('onebot') private ctx: Context,
private reverseWsService: ReverseWsService,
private messageService: MessageService,
) {
super('route');
const routeConfs = config.get<RouteConfig[]>('routes');
......@@ -31,6 +33,10 @@ export class RouteService
return this.routes.get(name);
}
getAllRoutes() {
return Array.from(this.routes.values());
}
onApplicationBootstrap() {
for (const route of this.routes.values()) {
route.ctx.on('dispatch', (session) => this.onOnebotEvent(session, route));
......@@ -39,6 +45,7 @@ export class RouteService
this.reverseWsService.initializeReverseWs(route, revConfig);
}
}
this.messageService.registerRouteTaskInterval(route);
}
}
......
import { Test, TestingModule } from '@nestjs/testing';
import { SendQueueService } from './send-queue.service';
describe('SendQueueService', () => {
let service: SendQueueService;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [SendQueueService],
}).compile();
service = module.get<SendQueueService>(SendQueueService);
});
it('should be defined', () => {
expect(service).toBeDefined();
});
});
import { Injectable } from '@nestjs/common';
@Injectable()
export class SendQueueService {}
......@@ -4,17 +4,17 @@ export interface OnebotProtocol {
echo?: any;
}
export const OnebotWsResponse = {
export const OnebotAsyncResponse = {
retcode: 1,
status: 'async',
data: null,
error: null,
};
export const OnebotWsResponseString = JSON.stringify(OnebotWsResponse);
export const OnebotAsyncResponseString = JSON.stringify(OnebotAsyncResponse);
export function OnebotWsResponseWithEcho(echo: any) {
export function OnebotAsyncResponseWithEcho(echo: any) {
return {
...OnebotWsResponse,
...OnebotAsyncResponse,
echo,
};
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment