Commit 56ca382a authored by Mrs4s's avatar Mrs4s

reverse websocket supported.

parent 54952523
......@@ -9,7 +9,7 @@
- [x] HTTP API
- [x] 反向HTTP POST
- [x] 正向Websocket
- [ ] 反向Websocket (开发中)
- [x] 反向Websocket (测试中)
#### 实现
<details>
<summary>已实现API</summary>
......
......@@ -38,7 +38,7 @@ func NewQQBot(cli *client.QQClient, conf *global.JsonConfig) *CQBot {
opt.EntryIdxMode = nutsdb.HintBPTSparseIdxMode
db, err := nutsdb.Open(opt)
if err != nil {
log.Fatalf("打开数据库失败, 如果频繁遇到此问题请关闭数据库功能。")
log.Fatalf("打开数据库失败, 如果频繁遇到此问题请清理 data/db 文件夹或关闭数据库功能。")
}
bot.db = db
gob.Register(message.Sender{})
......
......@@ -6,14 +6,15 @@ import (
)
type JsonConfig struct {
Uin int64 `json:"uin"`
Password string `json:"password"`
EnableDB bool `json:"enable_db"`
AccessToken string `json:"access_token"`
Reconnect bool `json:"reconnect"`
ReconnectDelay int `json:"reconnect_delay"`
HttpConfig *GoCQHttpConfig `json:"http_config"`
WSConfig *GoCQWebsocketConfig `json:"ws_config"`
Uin int64 `json:"uin"`
Password string `json:"password"`
EnableDB bool `json:"enable_db"`
AccessToken string `json:"access_token"`
ReLogin bool `json:"relogin"`
ReLoginDelay int `json:"relogin_delay"`
HttpConfig *GoCQHttpConfig `json:"http_config"`
WSConfig *GoCQWebsocketConfig `json:"ws_config"`
ReverseServers []*GoCQReverseWebsocketConfig `json:"ws_reverse_servers"`
}
type CQHttpApiConfig struct {
......@@ -48,6 +49,14 @@ type GoCQWebsocketConfig struct {
Port uint16 `json:"port"`
}
type GoCQReverseWebsocketConfig struct {
Enabled bool `json:"enabled"`
ReverseUrl string `json:"reverse_url"`
ReverseApiUrl string `json:"reverse_api_url"`
ReverseEventUrl string `json:"reverse_event_url"`
ReverseReconnectInterval uint16 `json:"reverse_reconnect_interval"`
}
func DefaultConfig() *JsonConfig {
return &JsonConfig{
EnableDB: true,
......@@ -62,6 +71,15 @@ func DefaultConfig() *JsonConfig {
Host: "0.0.0.0",
Port: 6700,
},
ReverseServers: []*GoCQReverseWebsocketConfig{
{
Enabled: false,
ReverseUrl: "ws://you_websocket_universal.server",
ReverseApiUrl: "ws://you_websocket_api.server",
ReverseEventUrl: "ws://you_websocket_event.server",
ReverseReconnectInterval: 3000,
},
},
}
}
......
......@@ -15,5 +15,6 @@ require (
github.com/tidwall/gjson v1.6.0
github.com/xujiajun/nutsdb v0.5.0
golang.org/x/image v0.0.0-20200618115811-c13761719519
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect
)
......@@ -131,18 +131,21 @@ func main() {
if conf.HttpConfig != nil && conf.HttpConfig.Enabled {
server.HttpServer.Run(fmt.Sprintf("%s:%d", conf.HttpConfig.Host, conf.HttpConfig.Port), conf.AccessToken, b)
for k, v := range conf.HttpConfig.PostUrls {
server.NewClient().Run(k, v, b)
server.NewHttpClient().Run(k, v, b)
}
}
if conf.WSConfig != nil && conf.WSConfig.Enabled {
server.WebsocketServer.Run(fmt.Sprintf("%s:%d", conf.WSConfig.Host, conf.WSConfig.Port), conf.AccessToken, b)
}
for _, rc := range conf.ReverseServers {
server.NewWebsocketClient(rc, conf.AccessToken, b).Run()
}
log.Info("资源初始化完成, 开始处理信息.")
log.Info("アトリは、高性能ですから!")
cli.OnDisconnected(func(bot *client.QQClient, e *client.ClientDisconnectedEvent) {
if conf.Reconnect {
log.Warnf("Bot已离线,将在 %v 秒后尝试重连.", conf.ReconnectDelay)
time.Sleep(time.Second * time.Duration(conf.ReconnectDelay))
if conf.ReLogin {
log.Warnf("Bot已离线,将在 %v 秒后尝试重连.", conf.ReLoginDelay)
time.Sleep(time.Second * time.Duration(conf.ReLoginDelay))
rsp, err := cli.Login()
if err != nil {
log.Fatalf("重连失败: %v", err)
......
......@@ -150,7 +150,7 @@ func (s *httpServer) Run(addr, authToken string, bot *coolq.CQBot) {
}()
}
func NewClient() *httpClient {
func NewHttpClient() *httpClient {
return &httpClient{}
}
......@@ -166,9 +166,8 @@ func (c *httpClient) onBotPushEvent(m coolq.MSG) {
var res string
err := gout.POST(c.addr).SetJSON(m).BindBody(&res).SetHeader(func() gout.H {
h := gout.H{
"X-Self_ID": c.bot.Client.Uin,
"X-Client-Role": "Universal",
"User-Agent": "CQHttp/4.15.0",
"X-Self-ID": c.bot.Client.Uin,
"User-Agent": "CQHttp/4.15.0",
}
if c.secret != "" {
mac := hmac.New(sha1.New, []byte(c.secret))
......
......@@ -3,10 +3,13 @@ package server
import (
"fmt"
"github.com/Mrs4s/go-cqhttp/coolq"
"github.com/Mrs4s/go-cqhttp/global"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
wsc "golang.org/x/net/websocket"
"net/http"
"strconv"
"strings"
"sync"
"time"
......@@ -21,6 +24,13 @@ type websocketServer struct {
}
type websocketClient struct {
conf *global.GoCQReverseWebsocketConfig
token string
bot *coolq.CQBot
pushLock *sync.Mutex
universalConn *wsc.Conn
eventConn *wsc.Conn
}
var WebsocketServer = &websocketServer{}
......@@ -46,6 +56,149 @@ func (s *websocketServer) Run(addr, authToken string, b *coolq.CQBot) {
}()
}
func NewWebsocketClient(conf *global.GoCQReverseWebsocketConfig, authToken string, b *coolq.CQBot) *websocketClient {
return &websocketClient{conf: conf, token: authToken, bot: b, pushLock: new(sync.Mutex)}
}
func (c *websocketClient) Run() {
if !c.conf.Enabled {
return
}
if c.conf.ReverseApiUrl != "" {
c.connectApi()
}
if c.conf.ReverseEventUrl != "" {
c.connectEvent()
}
if c.conf.ReverseUrl != "" {
c.connectUniversal()
}
c.bot.OnEventPush(c.onBotPushEvent)
}
func (c *websocketClient) connectApi() {
log.Infof("开始尝试连接到反向Websocket API服务器: %v", c.conf.ReverseApiUrl)
wsConf, err := wsc.NewConfig(c.conf.ReverseApiUrl, c.conf.ReverseApiUrl)
if err != nil {
log.Warnf("连接到反向Websocket API服务器 %v 时出现致命错误: %v", c.conf.ReverseApiUrl, err)
return
}
wsConf.Header["X-Client-Role"] = []string{"API"}
wsConf.Header["X-Self-ID"] = []string{strconv.FormatInt(c.bot.Client.Uin, 10)}
wsConf.Header["User-Agent"] = []string{"CQHttp/4.15.0"}
if c.token != "" {
wsConf.Header["Authorization"] = []string{"Token " + c.token}
}
conn, err := wsc.DialConfig(wsConf)
if err != nil {
log.Warnf("连接到反向Websocket API服务器 %v 时出现错误: %v", c.conf.ReverseApiUrl, err)
if c.conf.ReverseReconnectInterval != 0 {
time.Sleep(time.Millisecond * time.Duration(c.conf.ReverseReconnectInterval))
c.connectApi()
}
return
}
log.Infof("已连接到反向Websocket API服务器 %v", c.conf.ReverseApiUrl)
go c.listenApi(conn, false)
}
func (c *websocketClient) connectEvent() {
log.Infof("开始尝试连接到反向Websocket Event服务器: %v", c.conf.ReverseEventUrl)
wsConf, err := wsc.NewConfig(c.conf.ReverseEventUrl, c.conf.ReverseEventUrl)
if err != nil {
log.Warnf("连接到反向Websocket Event服务器 %v 时出现致命错误: %v", c.conf.ReverseApiUrl, err)
return
}
wsConf.Header["X-Client-Role"] = []string{"Event"}
wsConf.Header["X-Self-ID"] = []string{strconv.FormatInt(c.bot.Client.Uin, 10)}
wsConf.Header["User-Agent"] = []string{"CQHttp/4.15.0"}
if c.token != "" {
wsConf.Header["Authorization"] = []string{"Token " + c.token}
}
conn, err := wsc.DialConfig(wsConf)
if err != nil {
log.Warnf("连接到反向Websocket API服务器 %v 时出现错误: %v", c.conf.ReverseApiUrl, err)
if c.conf.ReverseReconnectInterval != 0 {
time.Sleep(time.Millisecond * time.Duration(c.conf.ReverseReconnectInterval))
c.connectApi()
}
return
}
log.Infof("已连接到反向Websocket Event服务器 %v", c.conf.ReverseEventUrl)
c.eventConn = conn
}
func (c *websocketClient) connectUniversal() {
log.Infof("开始尝试连接到反向Websocket Universal服务器: %v", c.conf.ReverseUrl)
wsConf, err := wsc.NewConfig(c.conf.ReverseUrl, c.conf.ReverseUrl)
if err != nil {
log.Warnf("连接到反向Websocket Universal服务器 %v 时出现致命错误: %v", c.conf.ReverseUrl, err)
return
}
wsConf.Header["X-Client-Role"] = []string{"Universal"}
wsConf.Header["X-Self-ID"] = []string{strconv.FormatInt(c.bot.Client.Uin, 10)}
wsConf.Header["User-Agent"] = []string{"CQHttp/4.15.0"}
if c.token != "" {
wsConf.Header["Authorization"] = []string{"Token " + c.token}
}
conn, err := wsc.DialConfig(wsConf)
if err != nil {
log.Warnf("连接到反向Websocket Universal服务器 %v 时出现错误: %v", c.conf.ReverseUrl, err)
if c.conf.ReverseReconnectInterval != 0 {
time.Sleep(time.Millisecond * time.Duration(c.conf.ReverseReconnectInterval))
c.connectUniversal()
}
return
}
go c.listenApi(conn, true)
c.universalConn = conn
}
func (c *websocketClient) listenApi(conn *wsc.Conn, u bool) {
defer conn.Close()
for {
buf := make([]byte, 10240)
l, err := conn.Read(buf)
if err != nil {
break
}
j := gjson.ParseBytes(buf[:l])
t := strings.ReplaceAll(j.Get("action").Str, "_async", "")
if f, ok := wsApi[t]; ok {
ret := f(c.bot, j.Get("params"))
if j.Get("echo").Exists() {
ret["echo"] = j.Get("echo").Value()
}
_, _ = conn.Write([]byte(ret.ToJson()))
}
}
if c.conf.ReverseReconnectInterval != 0 {
time.Sleep(time.Millisecond * time.Duration(c.conf.ReverseReconnectInterval))
if u {
c.connectUniversal()
return
}
c.connectApi()
}
}
func (c *websocketClient) onBotPushEvent(m coolq.MSG) {
c.pushLock.Lock()
defer c.pushLock.Unlock()
if c.eventConn != nil {
if _, err := c.eventConn.Write([]byte(m.ToJson())); err != nil {
_ = c.eventConn.Close()
if c.conf.ReverseReconnectInterval != 0 {
time.Sleep(time.Millisecond * time.Duration(c.conf.ReverseReconnectInterval))
c.connectEvent()
}
}
}
if c.universalConn != nil {
_, _ = c.universalConn.Write([]byte(m.ToJson()))
}
}
func (s *websocketServer) event(w http.ResponseWriter, r *http.Request) {
if s.token != "" {
if r.URL.Query().Get("access_token") != s.token && strings.SplitN(r.Header.Get("Authorization"), " ", 2)[1] != s.token {
......@@ -114,7 +267,7 @@ func (s *websocketServer) listenApi(c *websocket.Conn) {
if t == websocket.TextMessage {
j := gjson.ParseBytes(payload)
t := strings.ReplaceAll(j.Get("action").Str, "_async", "") //TODO: async support
log.Infof("API调用: %v", j.Get("action").Str)
//log.Infof("API调用: %v", j.Get("action").Str)
if f, ok := wsApi[t]; ok {
ret := f(s.bot, j.Get("params"))
if j.Get("echo").Exists() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment