Commit 512f8de6 authored by Chunchi Che's avatar Chunchi Che

use channel

parent a90eadb8
...@@ -6,7 +6,7 @@ import ( ...@@ -6,7 +6,7 @@ import (
"log" "log"
"net" "net"
"net/http" "net/http"
"sync" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
darkneos "github.com/sktt1ryze/ygopro-proxy/DarkNeos" darkneos "github.com/sktt1ryze/ygopro-proxy/DarkNeos"
...@@ -16,6 +16,7 @@ const TARGET_PORT = ":8000" ...@@ -16,6 +16,7 @@ const TARGET_PORT = ":8000"
const PROXY_PORT = ":3344" const PROXY_PORT = ":3344"
const CHANNEL_SIZE = 0x1000 const CHANNEL_SIZE = 0x1000
const BUFFER_SIZE = 0x1000 const BUFFER_SIZE = 0x1000
const TIME_OUT = 5
var upgrader = websocket.Upgrader{ var upgrader = websocket.Upgrader{
ReadBufferSize: 0x1000, ReadBufferSize: 0x1000,
...@@ -23,7 +24,7 @@ var upgrader = websocket.Upgrader{ ...@@ -23,7 +24,7 @@ var upgrader = websocket.Upgrader{
} }
func ygoEndpoint(w http.ResponseWriter, r *http.Request) { func ygoEndpoint(w http.ResponseWriter, r *http.Request) {
var wg sync.WaitGroup defer log.Println("ygoEndpoint finished")
upgrader.CheckOrigin = wsChecker upgrader.CheckOrigin = wsChecker
...@@ -31,81 +32,137 @@ func ygoEndpoint(w http.ResponseWriter, r *http.Request) { ...@@ -31,81 +32,137 @@ func ygoEndpoint(w http.ResponseWriter, r *http.Request) {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer ws.Close()
log.Println("Websocket connected") log.Println("Connection to ws://localhost" + TARGET_PORT + " [websocket] succeeded!")
tcp, err := net.Dial("tcp", "127.0.0.1"+PROXY_PORT) tcp, err := net.Dial("tcp", "127.0.0.1"+PROXY_PORT)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
log.Println("Tcp connected") log.Println("Connection to " + "12.0.0.1" + PROXY_PORT + " [tcp] succeeded!")
defer tcp.Close() wsCh := make(chan []byte, CHANNEL_SIZE)
tcpCh := make(chan []byte, CHANNEL_SIZE)
wsStopCh := make(chan bool, CHANNEL_SIZE)
tcpStopCh := make(chan bool, CHANNEL_SIZE)
wg.Add(2) defer func() {
go wsProxy(ws, &tcp, &wg) wsStopCh <- true
go tcpProxy(&tcp, ws, &wg) tcpStopCh <- true
wg.Wait()
} close(wsStopCh)
close(tcpStopCh)
}()
func wsProxy(ws *websocket.Conn, tcp *net.Conn, wg *sync.WaitGroup) { go wsProxy(ws, wsCh, wsStopCh)
defer wg.Done() go tcpProxy(tcp, tcpCh, tcpStopCh)
for { for {
messageType, buffer, err := ws.ReadMessage() select {
if err != nil { case wsBuf, ok := <-wsCh:
log.Println(err) if !ok {
break return
} }
if messageType == websocket.CloseMessage { if _, err = tcp.Write(wsBuf); err != nil {
log.Println("Websocket closed") log.Println(err)
break return
} }
case tcpBuf, ok := <-tcpCh:
if !ok {
return
}
buffer, err = darkneos.Transform(buffer, darkneos.ProtobufToRawBuf) if err = ws.WriteMessage(websocket.BinaryMessage, tcpBuf); err != nil {
if err != nil { log.Println(err)
log.Fatal(err) return
break }
} }
}
}
_, err = (*tcp).Write(buffer) // todo: generic
if err != nil { func wsProxy(ws *websocket.Conn, Ch chan<- []byte, stopCh <-chan bool) {
log.Fatal(err) defer ws.Close()
break defer close(Ch)
for {
select {
case _, ok := <-stopCh:
log.Println("wsProxy recv stop singal, exit. channel closed: ", ok)
return
default:
// if err := ws.SetReadDeadline(time.Now().Add(time.Second * TIME_OUT)); err != nil {
// log.Println(err)
// return
// }
messageType, buffer, err := ws.ReadMessage()
if err != nil {
// if err, ok := err.(net.Error); ok && err.Timeout() {
// continue
// }
log.Println(err)
return
}
if messageType == websocket.CloseMessage {
log.Println("Websocket closed")
return
}
buffer, err = darkneos.Transform(buffer, darkneos.ProtobufToRawBuf)
if err != nil {
log.Println(err)
return
}
Ch <- buffer
} }
} }
} }
func tcpProxy(tcp *net.Conn, ws *websocket.Conn, wg *sync.WaitGroup) { func tcpProxy(tcp net.Conn, Ch chan<- []byte, stopCh <-chan bool) {
defer wg.Done() defer tcp.Close()
defer close(Ch)
reader := bufio.NewReader(*tcp) reader := bufio.NewReader(tcp)
buffer := make([]byte, BUFFER_SIZE) buffer := make([]byte, BUFFER_SIZE)
for { for {
_, err := reader.Read(buffer) select {
if err != nil { case _, ok := <-stopCh:
if err == io.EOF { log.Println("tcpProxy recv stop singal, exit. channel closed: ", ok)
continue return
default:
if err := tcp.SetReadDeadline(time.Now().Add(time.Second * TIME_OUT)); err != nil {
log.Println(err)
return
} }
log.Println("Tcp read message error: ", err) _, err := reader.Read(buffer)
break if err != nil {
} if err == io.EOF {
continue
}
buffer, err = darkneos.Transform(buffer, darkneos.RawBufToProtobuf) if err, ok := err.(net.Error); ok && err.Timeout() {
if err != nil { continue
log.Fatal(err) }
break
} log.Println(err)
return
}
err = ws.WriteMessage(websocket.BinaryMessage, buffer) buffer, err = darkneos.Transform(buffer, darkneos.RawBufToProtobuf)
if err != nil { if err != nil {
log.Fatal(err) log.Println(err)
break return
}
Ch <- buffer
} }
} }
} }
...@@ -117,6 +174,8 @@ func setupRoutes() { ...@@ -117,6 +174,8 @@ func setupRoutes() {
} }
func main() { func main() {
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile | log.Llongfile)
setupRoutes() setupRoutes()
log.Fatal(http.ListenAndServe(TARGET_PORT, nil)) log.Fatal(http.ListenAndServe(TARGET_PORT, nil))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment