Commit e8394e3d authored by nanahira's avatar nanahira

Merge branch 'srvpro2' into mc

parents bb037bee caabc5aa
"use strict";
const Struct = require('./struct.js').Struct;
const fs = require("fs");
const _ = require("underscore");
function loadJSON(path) {
return JSON.parse(fs.readFileSync(path, "utf8"));
}
class Handler {
constructor(handler, synchronous) {
this.handler = handler;
this.synchronous = synchronous || false;
}
async handle(buffer, info, datas, params) {
if (this.synchronous) {
return !!(await this.handler(buffer, info, datas, params));
} else {
const newBuffer = Buffer.from(buffer);
const newDatas = datas.map(b => Buffer.from(b));
this.handler(newBuffer, info, newDatas, params);
return false;
}
}
}
class YGOProMessagesHelper {
constructor() {
this.initDatas();
this.initStructs();
}
initDatas() {
this.structs_declaration = loadJSON('./data/structs.json');
this.typedefs = loadJSON('./data/typedefs.json');
this.proto_structs = loadJSON('./data/proto_structs.json');
this.constants = loadJSON('./data/constants.json');
}
initStructs() {
for (let name in this.structs_declaration ) {
const declaration = this.structs_declaration [name];
let result = Struct();
for (let field of declaration) {
if (field.encoding) {
switch (field.encoding) {
case "UTF-16LE":
result.chars(field.name, field.length * 2, field.encoding);
break;
default:
throw `unsupported encoding: ${field.encoding}`;
}
} else {
let type = field.type;
if (this.typedefs[type]) {
type = this.typedefs[type];
}
if (field.length) {
result.array(field.name, field.length, type); //不支持结构体
} else {
if (this.structs[type]) {
result.struct(field.name, this.structs[type]);
} else {
result[type](field.name);
}
}
}
}
this.structs[name] = result;
}
}
getDirectionAndProto(protoStr) {
const protoStrMatch = protoStr.match(/^(STOC|CTOS)_([_A-Z]+)$/);
if (!protoStrMatch) {
throw `Invalid proto string: ${protoStr}`
}
return {
direction: protoStrMatch[1].toUpperCase(),
proto: protoStrMatch[2].toUpperCase()
}
}
translateProto(proto, direction) {
const directionProtoList = this.constants[direction];
if (typeof proto !== "string") {
return proto;
}
let translatedProto = _.find(Object.keys(directionProtoList), p => {
return directionProtoList[p] === proto;
});
if (!translatedProto) {
throw `unknown proto ${direction} ${proto}`;
}
return translatedProto;
}
sendMessage(socket, protostr, info) {
const {
direction,
proto
} = this.getDirectionAndProto(protostr);
let buffer;
if (socket.closed) {
return;
}
//console.log(proto, this.proto_structs[direction][proto]);
//const directionProtoList = this.constants[direction];
if (typeof info === 'undefined') {
buffer = "";
} else if (Buffer.isBuffer(info)) {
buffer = info;
} else {
let struct = this.structs[this.proto_structs[direction][proto]];
struct.allocate();
struct.set(info);
buffer = struct.buffer();
}
const translatedProto = this.translateProto(proto, direction);
let header = Buffer.allocUnsafe(3);
header.writeUInt16LE(buffer.length + 1, 0);
header.writeUInt8(translatedProto, 2);
socket.write(header);
if (buffer.length) {
socket.write(buffer);
}
}
addHandler(protostr, handler, synchronous, priority) {
if (priority < 0 || priority > 4) {
throw "Invalid priority: " + priority;
}
let {
direction,
proto
} = this.getDirectionAndProto(protostr);
synchronous = synchronous || false;
priority = priority || 1;
const handlerObj = new Handler(handler, synchronous);
let handlerCollection = this.handlers[direction][priority];
const translatedProto = this.translateProto(proto, direction);
if (!handlerCollection[translatedProto]) {
handlerCollection[translatedProto] = [];
}
handlerCollection[translatedProto].push(handlerObj);
}
async handleBuffer(messageBuffer, direction, protoFilter, params) {
let feedback = null;
let messageLength = 0;
let bufferProto = 0;
let datas = [];
for (let l = 0; l < 1000; ++l) {
if (messageLength === 0) {
if (messageBuffer.length >= 2) {
messageLength = messageBuffer.readUInt16LE(0);
} else {
if (messageBuffer.length !== 0) {
feedback = {
type: "BUFFER_LENGTH",
message: `Bad ${direction} buffer length`
};
}
break;
}
} else if (bufferProto === 0) {
if (messageBuffer.length >= 3) {
bufferProto = messageBuffer.readUInt8(2);
} else {
feedback = {
type: "PROTO_LENGTH",
message: `Bad ${direction} proto length`
};
break;
}
} else {
if (messageBuffer.length >= 2 + messageLength) {
const proto = this.constants[direction][bufferProto];
let cancel = proto && protoFilter && _.indexOf(protoFilter, proto) === -1;
let buffer = messageBuffer.slice(3, 2 + messageLength);
//console.log(l, direction, proto, cancel);
for (let priority = 0; priority < 4; ++priority) {
if (cancel) {
break;
}
const handlerCollection = this.handlers[direction][priority];
if (proto && handlerCollection[bufferProto]) {
let struct = this.structs[this.proto_structs[direction][proto]];
let info = null;
if (struct) {
struct._setBuff(buffer);
info = _.clone(struct.fields);
}
for (let handler of handlerCollection[bufferProto]) {
cancel = await handler.handle(buffer, info, datas, params);
if (cancel) {
break;
}
}
}
}
if (!cancel) {
datas.push(messageBuffer.slice(0, 2 + messageLength));
}
messageBuffer = messageBuffer.slice(2 + messageLength);
messageLength = 0;
bufferProto = 0;
} else {
if (direction === "STOC" || messageLength !== 17735) {
feedback = {
type: "MESSAGE_LENGTH",
message: `Bad ${direction} message length`
};
}
break;
}
}
if (l === 999) {
feedback = {
type: "OVERSIZE",
message: `Oversized ${direction}`
};
}
}
return {
datas,
feedback
};
}
}
YGOProMessagesHelper.prototype.structs = {}
YGOProMessagesHelper.prototype.handlers = {
STOC: [{},
{},
{},
{},
{},
],
CTOS: [{},
{},
{},
{},
{},
]
}
module.exports = YGOProMessagesHelper;
......@@ -1732,163 +1732,60 @@ net.createServer (client) ->
client.on 'data', (ctos_buffer) ->
if client.is_post_watcher
room=ROOM_all[client.rid]
room.watcher.write ctos_buffer if room and !CLIENT_is_banned_by_mc(client)
if room
handle_data = await ygopro.helper.handleBuffer(ctos_buffer, "CTOS", ["CHAT"], {
client: client,
server: client.server
})
if handle_data.feedback
log.warn(handle_data.feedback.message, client.name, client.ip)
if handle_data.feedback.type == "OVERSIZE" or ROOM_bad_ip[client.ip] > 5
bad_ip_count = ROOM_bad_ip[client.ip]
if bad_ip_count
ROOM_bad_ip[client.ip] = bad_ip_count + 1
else
ROOM_bad_ip[client.ip] = 1
CLIENT_kick(client)
return
room.watcher.write(buffer) for buffer in handle_data.datas
else
#ctos_buffer = Buffer.alloc(0)
ctos_message_length = 0
ctos_proto = 0
#ctos_buffer = Buffer.concat([ctos_buffer, data], ctos_buffer.length + data.length) #buffer的错误使用方式,好孩子不要学
datas = []
looplimit = 0
while true
if ctos_message_length == 0
if ctos_buffer.length >= 2
ctos_message_length = ctos_buffer.readUInt16LE(0)
else
log.warn("bad ctos_buffer length", client.ip) unless ctos_buffer.length == 0
break
else if ctos_proto == 0
if ctos_buffer.length >= 3
ctos_proto = ctos_buffer.readUInt8(2)
else
log.warn("bad ctos_proto length", client.ip)
break
else
if ctos_buffer.length >= 2 + ctos_message_length
#console.log client.pos, "CTOS", ygopro.constants.CTOS[ctos_proto]
cancel = false
if settings.modules.reconnect.enabled and client.pre_reconnecting and ygopro.constants.CTOS[ctos_proto] != 'UPDATE_DECK'
cancel = true
b = ctos_buffer.slice(3, ctos_message_length - 1 + 3)
info = null
struct = ygopro.structs[ygopro.proto_structs.CTOS[ygopro.constants.CTOS[ctos_proto]]]
if struct and !cancel
struct._setBuff(b)
info = _.clone(struct.fields)
if ygopro.ctos_follows_before[ctos_proto] and !cancel
for ctos_event in ygopro.ctos_follows_before[ctos_proto]
result = ctos_event.callback b, info, client, client.server, datas
if result and ctos_event.synchronous
cancel = true
if struct and !cancel
struct._setBuff(b)
info = _.clone(struct.fields)
if ygopro.ctos_follows[ctos_proto] and !cancel
result = ygopro.ctos_follows[ctos_proto].callback b, info, client, client.server, datas
if result and ygopro.ctos_follows[ctos_proto].synchronous
cancel = true
if struct and !cancel
struct._setBuff(b)
info = _.clone(struct.fields)
if ygopro.ctos_follows_after[ctos_proto] and !cancel
for ctos_event in ygopro.ctos_follows_after[ctos_proto]
result = ctos_event.callback b, info, client, client.server, datas
if result and ctos_event.synchronous
cancel = true
datas.push ctos_buffer.slice(0, 2 + ctos_message_length) unless cancel
ctos_buffer = ctos_buffer.slice(2 + ctos_message_length)
ctos_message_length = 0
ctos_proto = 0
else
log.warn("bad ctos_message length", client.ip, ctos_buffer.length, ctos_message_length, ctos_proto) if ctos_message_length != 17735
break
looplimit++
#log.info(looplimit)
if looplimit > 800 or ROOM_bad_ip[client.ip] > 5
log.info("error ctos", client.name, client.ip)
ctos_filter = if settings.modules.reconnect.enabled and client.pre_reconnecting then ["UPDATE_DECK"] else null
handle_data = await ygopro.helper.handleBuffer(ctos_buffer, "CTOS", ctos_filter, {
client: client,
server: client.server
})
if handle_data.feedback
log.warn(handle_data.feedback.message, client.name, client.ip)
if handle_data.feedback.type == "OVERSIZE" or ROOM_bad_ip[client.ip] > 5
bad_ip_count = ROOM_bad_ip[client.ip]
if bad_ip_count
ROOM_bad_ip[client.ip] = bad_ip_count + 1
else
ROOM_bad_ip[client.ip] = 1
CLIENT_kick(client)
break
return
if !client.server
return
if client.established
client.server.write buffer for buffer in datas
client.server.write buffer for buffer in handle_data.datas
else
client.pre_establish_buffers.push buffer for buffer in datas
client.pre_establish_buffers.push buffer for buffer in handle_data.datas
return
# 服务端到客户端(stoc)
server.on 'data', (stoc_buffer)->
#stoc_buffer = Buffer.alloc(0)
stoc_message_length = 0
stoc_proto = 0
#stoc_buffer = Buffer.concat([stoc_buffer, data], stoc_buffer.length + data.length) #buffer的错误使用方式,好孩子不要学
#unless ygopro.stoc_follows[stoc_proto] and ygopro.stoc_follows[stoc_proto].synchronous
#server.client.write data
datas = []
looplimit = 0
while true
if stoc_message_length == 0
if stoc_buffer.length >= 2
stoc_message_length = stoc_buffer.readUInt16LE(0)
else
log.warn("bad stoc_buffer length", server.client.ip) unless stoc_buffer.length == 0
break
else if stoc_proto == 0
if stoc_buffer.length >= 3
stoc_proto = stoc_buffer.readUInt8(2)
else
log.warn("bad stoc_proto length", server.client.ip)
break
else
if stoc_buffer.length >= 2 + stoc_message_length
#console.log client.pos, "STOC", ygopro.constants.STOC[stoc_proto]
cancel = false
b = stoc_buffer.slice(3, stoc_message_length - 1 + 3)
info = null
struct = ygopro.structs[ygopro.proto_structs.STOC[ygopro.constants.STOC[stoc_proto]]]
if struct and !cancel
struct._setBuff(b)
info = _.clone(struct.fields)
if ygopro.stoc_follows_before[stoc_proto] and !cancel
for stoc_event in ygopro.stoc_follows_before[stoc_proto]
result = stoc_event.callback b, info, server.client, server, datas
if result and stoc_event.synchronous
cancel = true
if struct and !cancel
struct._setBuff(b)
info = _.clone(struct.fields)
if ygopro.stoc_follows[stoc_proto] and !cancel
result = ygopro.stoc_follows[stoc_proto].callback b, info, server.client, server, datas
if result and ygopro.stoc_follows[stoc_proto].synchronous
cancel = true
if struct and !cancel
struct._setBuff(b)
info = _.clone(struct.fields)
if ygopro.stoc_follows_after[stoc_proto] and !cancel
for stoc_event in ygopro.stoc_follows_after[stoc_proto]
result = stoc_event.callback b, info, server.client, server, datas
if result and stoc_event.synchronous
cancel = true
datas.push stoc_buffer.slice(0, 2 + stoc_message_length) unless cancel
stoc_buffer = stoc_buffer.slice(2 + stoc_message_length)
stoc_message_length = 0
stoc_proto = 0
else
log.warn("bad stoc_message length", server.client.ip)
break
looplimit++
#log.info(looplimit)
if looplimit > 800
log.info("error stoc", server.client.name)
handle_data = await ygopro.helper.handleBuffer(stoc_buffer, "STOC", null, {
client: server.client,
server: server
})
if handle_data.feedback
log.warn(handle_data.feedback.message, server.client.name, server.client.ip)
if handle_data.feedback.type == "OVERSIZE"
server.destroy()
break
return
if server.client and !server.client.closed
server.client.write buffer for buffer in datas
server.client.write buffer for buffer in handle_data.datas
return
return
......
This diff is collapsed.
......@@ -7,142 +7,44 @@ loadJSON = require('load-json-file').sync
@i18ns = loadJSON './data/i18n.json'
#常量/类型声明
structs_declaration = loadJSON './data/structs.json' #结构体声明
typedefs = loadJSON './data/typedefs.json' #类型声明
@proto_structs = loadJSON './data/proto_structs.json' #消息与结构体的对应,未完成,对着duelclient.cpp加
@constants = loadJSON './data/constants.json' #network.h里定义的常量
YGOProMessageHelper = require("./YGOProMessages.js") # 为 SRVPro2 准备的库,这里拿这个库只用来测试,SRVPro1 对异步支持不是特别完善,因此不会有很多异步优化
@helper = new YGOProMessageHelper()
#结构体定义
@structs = {}
for name, declaration of structs_declaration
result = Struct()
for field in declaration
if field.encoding
switch field.encoding
when "UTF-16LE" then result.chars field.name, field.length*2, field.encoding
else throw "unsupported encoding: #{field.encoding}"
else
type = field.type
type = typedefs[type] if typedefs[type]
if field.length
result.array field.name, field.length, type #不支持结构体
else
if @structs[type]
result.struct field.name, @structs[type]
else
result[type] field.name
@structs[name] = result
@structs = @helper.structs
@structs_declaration = @helper.structs_declaration
@typedefs = @helper.typedefs
@proto_structs = @helper.proto_structs
@constants = @helper.constants
#消息跟踪函数 需要重构, 另暂时只支持异步, 同步没做.
@stoc_follows = {}
@stoc_follows_before = {}
@stoc_follows_after = {}
@ctos_follows = {}
@ctos_follows_before = {}
@ctos_follows_after = {}
@replace_proto = (proto, tp) ->
if typeof(proto) != "string"
return proto
changed_proto = proto
for key, value of @constants[tp]
if value == proto
changed_proto = key
break
throw "unknown proto" if !@constants[tp][changed_proto]
return changed_proto
translateHandler = (handler) ->
return (buffer, info, datas, params)->
await return handler(buffer, info, params.client, params.server, datas)
@stoc_follow = (proto, synchronous, callback)->
changed_proto = @replace_proto(proto, "STOC")
@stoc_follows[changed_proto] = {callback: callback, synchronous: synchronous}
@helper.addHandler("STOC_#{proto}", translateHandler(callback), synchronous, 1)
return
@stoc_follow_before = (proto, synchronous, callback)->
changed_proto = @replace_proto(proto, "STOC")
if !@stoc_follows_before[changed_proto]
@stoc_follows_before[changed_proto] = []
@stoc_follows_before[changed_proto].push({callback: callback, synchronous: synchronous})
@helper.addHandler("STOC_#{proto}", translateHandler(callback), synchronous, 0)
return
@stoc_follow_after = (proto, synchronous, callback)->
changed_proto = @replace_proto(proto, "STOC")
if !@stoc_follows_after[changed_proto]
@stoc_follows_after[changed_proto] = []
@stoc_follows_after[changed_proto].push({callback: callback, synchronous: synchronous})
@helper.addHandler("STOC_#{proto}", translateHandler(callback), synchronous, 2)
return
@ctos_follow = (proto, synchronous, callback)->
changed_proto = @replace_proto(proto, "CTOS")
@ctos_follows[changed_proto] = {callback: callback, synchronous: synchronous}
@helper.addHandler("CTOS_#{proto}", translateHandler(callback), synchronous, 1)
return
@ctos_follow_before = (proto, synchronous, callback)->
changed_proto = @replace_proto(proto, "CTOS")
if !@ctos_follows_before[changed_proto]
@ctos_follows_before[changed_proto] = []
@ctos_follows_before[changed_proto].push({callback: callback, synchronous: synchronous})
@helper.addHandler("CTOS_#{proto}", translateHandler(callback), synchronous, 0)
return
@ctos_follow_after = (proto, synchronous, callback)->
changed_proto = @replace_proto(proto, "CTOS")
if !@ctos_follows_after[changed_proto]
@ctos_follows_after[changed_proto] = []
@ctos_follows_after[changed_proto].push({callback: callback, synchronous: synchronous})
@helper.addHandler("CTOS_#{proto}", translateHandler(callback), synchronous, 2)
return
#消息发送函数,至少要把俩合起来....
@stoc_send = (socket, proto, info)->
if socket.closed
return
#console.log proto, proto_structs.STOC[proto], structs[proto_structs.STOC[proto]]
if typeof info == 'undefined'
buffer = ""
else if Buffer.isBuffer(info)
buffer = info
else
struct = @structs[@proto_structs.STOC[proto]]
struct.allocate()
struct.set info
buffer = struct.buffer()
if typeof proto == 'string' #需要重构
for key, value of @constants.STOC
if value == proto
proto = key
break
throw "unknown proto" if !@constants.STOC[proto]
header = Buffer.allocUnsafe(3)
header.writeUInt16LE buffer.length + 1, 0
header.writeUInt8 proto, 2
socket.write header
socket.write buffer if buffer.length
return
return @helper.sendMessage(socket, "STOC_#{proto}", info)
@ctos_send = (socket, proto, info)->
if socket.closed
return
#console.log proto, proto_structs.CTOS[proto], structs[proto_structs.CTOS[proto]]
if typeof info == 'undefined'
buffer = ""
else if Buffer.isBuffer(info)
buffer = info
else
struct = @structs[@proto_structs.CTOS[proto]]
struct.allocate()
struct.set info
buffer = struct.buffer()
if typeof proto == 'string' #需要重构
for key, value of @constants.CTOS
if value == proto
proto = key
break
throw "unknown proto" if !@constants.CTOS[proto]
header = Buffer.allocUnsafe(3)
header.writeUInt16LE buffer.length + 1, 0
header.writeUInt8 proto, 2
socket.write header
socket.write buffer if buffer.length
return
return @helper.sendMessage(socket, "CTOS_#{proto}", info)
#util
@stoc_send_chat = (client, msg, player = 8)->
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment