Commit 05f29ab5 authored by Chunchi Che's avatar Chunchi Che

Merge branch 'optimize/websocket/stream' into 'main'

Optimize/websocket/stream

See merge request mycard/Neos!197
parents 2869c08b fab74f46
Pipeline #21739 failed with stages
in 12 minutes and 44 seconds
// Some implementation of infrastructure
export * from "./sleep";
export * from "./stream";
export const sleep = (delay: number) =>
new Promise((resolve) => setTimeout(resolve, delay));
import handleSocketMessage from "@/service/onSocketMessage";
import { sleep } from "./sleep";
const SLEEP_INTERVAL = 200;
export class WebSocketStream {
public ws: WebSocket;
stream: ReadableStream;
constructor(ip: string, onWsOpen?: (ws: WebSocket, ev: Event) => any) {
this.ws = new WebSocket("wss://" + ip);
if (onWsOpen) {
this.ws.onopen = (e) => onWsOpen(this.ws, e);
}
const ws = this.ws;
this.stream = new ReadableStream({
start(controller) {
// 当Websocket有数据到达时,加入队列
ws.onmessage = (event) => {
controller.enqueue(event);
};
ws.onclose = () => {
console.info("Websocket closed.");
controller.close();
};
},
pull(_) {
// currently not really need
},
cancel() {
// currently not
},
});
}
// 异步地从Websocket中获取数据并处理
async execute(onMessage: (event: MessageEvent) => Promise<void>) {
const reader: ReadableStreamDefaultReader<MessageEvent> =
this.stream.getReader();
const ws = this.ws;
reader.read().then(async function process({ done, value }): Promise<void> {
if (done) {
if (ws.readyState == WebSocket.CLOSED) {
// websocket connection has been closed
console.info("WebSocket closed, stream complete.");
return;
} else {
// websocket not closed, sleep sometime, wait for next message from server
await sleep(SLEEP_INTERVAL);
return reader.read().then(process);
}
}
if (value) {
await onMessage(value);
} else {
console.warn("value from ReadableStream is undefined!");
}
// read some more, and call process function again
return reader.read().then(process);
});
}
// 关闭流
close() {
this.ws.close();
}
}
......@@ -4,6 +4,8 @@
* 所有长连接/Websocket相关的逻辑都应该收敛在这里。
*
* */
import { WebSocketStream } from "@/infra";
import handleSocketMessage from "../service/onSocketMessage";
import handleSocketOpen from "../service/onSocketOpen";
......@@ -28,24 +30,19 @@ export interface socketAction {
payload?: Uint8Array;
}
let ws: WebSocket | null = null;
let ws: WebSocketStream | null = null;
// FIXME: 应该有个返回值,告诉业务方本次请求的结果。比如建立长连接失败。
export default function (action: socketAction) {
export default async function (action: socketAction) {
switch (action.cmd) {
case socketCmd.CONNECT: {
const info = action.initInfo;
if (info) {
ws = new WebSocket("wss://" + info.ip);
ws = new WebSocketStream(info.ip, (conn, _event) =>
handleSocketOpen(conn, info.ip, info.player, info.passWd)
);
ws.onopen = () => {
handleSocketOpen(ws, info.ip, info.player, info.passWd);
};
ws.onclose = () => {
console.log("WebSocket closed.");
ws = null;
};
ws.onmessage = handleSocketMessage;
await ws.execute(handleSocketMessage);
}
break;
......@@ -60,7 +57,7 @@ export default function (action: socketAction) {
case socketCmd.SEND: {
const payload = action.payload;
if (ws && payload) {
ws.send(payload);
ws.ws.send(payload);
}
break;
......
import { ygopro } from "@/api";
import { sleep } from "@/infra";
import { fetchEsHintMeta, matStore } from "@/stores";
export default (attack: ygopro.StocGameMessage.MsgAttack) => {
export default async (attack: ygopro.StocGameMessage.MsgAttack) => {
fetchEsHintMeta({
originMsg: "「[?]」攻击时",
location: attack.attacker_location,
......@@ -16,7 +17,8 @@ export default (attack: ygopro.StocGameMessage.MsgAttack) => {
if (attack.direct_attack) {
attacker.directAttack = true;
setTimeout(() => (attacker.directAttack = false), 500);
await sleep(500);
attacker.directAttack = false;
} else {
const target = matStore
.in(attack.target_location.location)
......@@ -30,7 +32,8 @@ export default (attack: ygopro.StocGameMessage.MsgAttack) => {
...target,
};
setTimeout(() => (attacker.attackTarget = undefined), 500);
await sleep(500);
attacker.attackTarget = undefined;
}
}
}
......
import { ygopro } from "@/api";
import { useConfig } from "@/config";
import { sleep } from "@/infra";
import { fetchEsHintMeta, matStore } from "@/stores";
export default (chaining: ygopro.StocGameMessage.MsgChaining) => {
export default async (chaining: ygopro.StocGameMessage.MsgChaining) => {
fetchEsHintMeta({
originMsg: "「[?]」被发动时",
cardID: chaining.code,
......@@ -10,8 +11,7 @@ export default (chaining: ygopro.StocGameMessage.MsgChaining) => {
matStore.setChaining(chaining.location, chaining.code, true);
setTimeout(() => {
matStore.setChaining(chaining.location, chaining.code, false);
// TODO: set chained
}, useConfig().ui.chainingDelay);
await sleep(useConfig().ui.chainingDelay);
matStore.setChaining(chaining.location, chaining.code, false);
// TODO: set chained
};
......@@ -56,7 +56,7 @@ const ActiveList = [
"select_yes_no",
];
export default function handleGameMsg(pb: ygopro.YgoStocMsg) {
export default async function handleGameMsg(pb: ygopro.YgoStocMsg) {
const msg = pb.stoc_game_msg;
if (ActiveList.includes(msg.gameMsg)) {
......@@ -100,7 +100,7 @@ export default function handleGameMsg(pb: ygopro.YgoStocMsg) {
break;
}
case "move": {
onMsgMove(msg.move);
await onMsgMove(msg.move);
break;
}
......@@ -215,7 +215,7 @@ export default function handleGameMsg(pb: ygopro.YgoStocMsg) {
break;
}
case "attack": {
onMsgAttack(msg.attack);
await onMsgAttack(msg.attack);
break;
}
......@@ -225,7 +225,7 @@ export default function handleGameMsg(pb: ygopro.YgoStocMsg) {
break;
}
case "chaining": {
onMsgChaining(msg.chaining);
await onMsgChaining(msg.chaining);
break;
}
......
......@@ -4,6 +4,7 @@ import { ygopro } from "@/api";
import { fetchOverlayMeta, store } from "@/stores";
type MsgMove = ygopro.StocGameMessage.MsgMove;
import { useConfig } from "@/config";
import { sleep } from "@/infra";
import { REASON_MATERIAL } from "../../common";
......@@ -12,7 +13,7 @@ const NeosConfig = useConfig();
const OVERLAY_STACK: { uuid: string; code: number; sequence: number }[] = [];
export default (move: MsgMove) => {
export default async (move: MsgMove) => {
const code = move.code;
const from = move.from;
const to = move.to;
......@@ -79,12 +80,8 @@ export default (move: MsgMove) => {
matStore.in(to.location).of(to.controler)[to.sequence].uuid = uuid;
}
setTimeout(
() =>
(matStore.in(to.location).of(to.controler)[to.sequence].focus =
false),
NeosConfig.ui.moveDelay
);
await sleep(NeosConfig.ui.moveDelay);
matStore.in(to.location).of(to.controler)[to.sequence].focus = false;
break;
}
case ygopro.CardZone.REMOVED:
......@@ -112,12 +109,11 @@ export default (move: MsgMove) => {
true
);
setTimeout(() => {
// 因为手牌可能会洗牌,sequence就对不上了,所以这里把所有手牌的focus字段都设置成false
for (const hand of matStore.in(to.location).of(to.controler)) {
hand.focus = false;
}
}, NeosConfig.ui.moveDelay);
await sleep(NeosConfig.ui.moveDelay);
// 因为手牌可能会洗牌,sequence就对不上了,所以这里把所有手牌的focus字段都设置成false
for (const hand of matStore.in(to.location).of(to.controler)) {
hand.focus = false;
}
}
break;
}
......
......@@ -2,11 +2,8 @@
* 长连接消息事件订阅处理逻辑
*
* */
import { ygopro } from "@/api";
import { adaptStoc } from "@/api/ocgcore/ocgAdapter/adapter";
import { YgoProPacket } from "@/api/ocgcore/ocgAdapter/packet";
import { useConfig } from "@/config";
import { matStore } from "@/stores";
import handleGameMsg from "./duel/gameMsg";
import handleTimeLimit from "./duel/timeLimit";
......@@ -21,115 +18,86 @@ import handleHsWatchChange from "./room/hsWatchChange";
import handleJoinGame from "./room/joinGame";
import handleTypeChange from "./room/typeChange";
const NeosConfig = useConfig();
/*
* 先将从长连接中读取到的二进制数据通过Adapter转成protobuf结构体,
* 然后再分发到各个处理函数中去处理。
*
* */
export default function handleSocketMessage(e: MessageEvent) {
export default async function handleSocketMessage(e: MessageEvent) {
const packet = YgoProPacket.deserialize(e.data);
const pb = adaptStoc(packet);
const delay = handleDelay(pb);
setTimeout(() => {
switch (pb.msg) {
case "stoc_join_game": {
handleJoinGame(pb);
break;
}
case "stoc_chat": {
handleChat(pb);
break;
}
case "stoc_hs_player_change": {
handleHsPlayerChange(pb);
break;
}
case "stoc_hs_watch_change": {
handleHsWatchChange(pb);
break;
}
case "stoc_hs_player_enter": {
handleHsPlayerEnter(pb);
break;
}
case "stoc_type_change": {
handleTypeChange(pb);
break;
}
case "stoc_select_hand": {
handleSelectHand(pb);
break;
}
case "stoc_hand_result": {
// TODO
console.log("TODO: handle STOC HandResult.");
break;
}
case "stoc_select_tp": {
handleSelectTp(pb);
break;
}
case "stoc_deck_count": {
handleDeckCount(pb);
break;
}
case "stoc_duel_start": {
handleDuelStart(pb);
break;
}
case "stoc_game_msg": {
handleGameMsg(pb);
break;
}
case "stoc_time_limit": {
handleTimeLimit(pb.stoc_time_limit);
break;
}
default: {
console.log(packet);
break;
}
switch (pb.msg) {
case "stoc_join_game": {
handleJoinGame(pb);
break;
}
}, delay);
}
case "stoc_chat": {
handleChat(pb);
// 该函数用于控频,防止MSG更新太频繁,返回值是延迟的时间戳(毫秒)
//
// 对于一般的MSG,我们会延迟200ms执行处理逻辑;
// 当处理一些带有动画效果的MSG时,比如`MSG_MOVE`,`MSG_CHAINING`,我们会设置下一次执行处理逻辑的延迟,确保动画完整
function handleDelay(stoc: ygopro.YgoStocMsg): number {
const delay = matStore.delay;
// 重置下次`delay`
matStore.delay = NeosConfig.ui.commonDelay;
// 对特定的`MSG`,设置特化的`delay`
if (stoc.has_stoc_game_msg) {
if (stoc.stoc_game_msg.gameMsg == "move") {
matStore.delay = NeosConfig.ui.moveDelay + 500;
} else if (stoc.stoc_game_msg.gameMsg == "chaining") {
matStore.delay = NeosConfig.ui.chainingDelay;
} else if (stoc.stoc_game_msg.gameMsg == "attack") {
matStore.delay = NeosConfig.ui.attackDelay + 200;
break;
}
}
case "stoc_hs_player_change": {
handleHsPlayerChange(pb);
break;
}
case "stoc_hs_watch_change": {
handleHsWatchChange(pb);
break;
}
case "stoc_hs_player_enter": {
handleHsPlayerEnter(pb);
break;
}
case "stoc_type_change": {
handleTypeChange(pb);
break;
}
case "stoc_select_hand": {
handleSelectHand(pb);
break;
}
case "stoc_hand_result": {
// TODO
console.log("TODO: handle STOC HandResult.");
return delay;
break;
}
case "stoc_select_tp": {
handleSelectTp(pb);
break;
}
case "stoc_deck_count": {
handleDeckCount(pb);
break;
}
case "stoc_duel_start": {
handleDuelStart(pb);
break;
}
case "stoc_game_msg": {
await handleGameMsg(pb);
break;
}
case "stoc_time_limit": {
handleTimeLimit(pb.stoc_time_limit);
break;
}
default: {
console.log(packet);
break;
}
}
}
......@@ -12,7 +12,7 @@ const NeosConfig = useConfig();
*
* */
export default function handleSocketOpen(
ws: WebSocket | null,
ws: WebSocket | undefined,
_ip: string,
player: string,
passWd: string
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment