ws.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package initialize
  2. import (
  3. errmsg "confrontation-training/err"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gin-gonic/gin"
  7. "github.com/gorilla/websocket"
  8. "log"
  9. "net/http"
  10. )
  11. // ClientManager is a websocket manager
  12. type ClientManager struct {
  13. Clients map[string]*Client
  14. Broadcast chan []byte
  15. Register chan *Client
  16. Unregister chan *Client
  17. }
  18. // Client is a websocket client
  19. type Client struct {
  20. ID string
  21. Socket *websocket.Conn
  22. Send chan []byte
  23. }
  24. // Message is return msg
  25. type Message struct {
  26. Sender string `json:"sender,omitempty"`
  27. Recipient string `json:"recipient,omitempty"`
  28. Content string `json:"content,omitempty"`
  29. RecordId string `json:"recordId,omitempty"`
  30. Status bool `json:"status,omitempty"`
  31. }
  32. // Manager define a ws server manager
  33. var Manager = ClientManager{
  34. Broadcast: make(chan []byte),
  35. Register: make(chan *Client),
  36. Unregister: make(chan *Client),
  37. Clients: make(map[string]*Client),
  38. }
  39. // Start is 项目运行前, 协程开启start -> go Manager.Start()
  40. func (manager *ClientManager) Start() {
  41. for {
  42. log.Println("<---管道通信--->")
  43. select {
  44. case conn := <-Manager.Register:
  45. log.Printf("新用户加入:%v", conn.ID)
  46. fmt.Println("当前Client连接数:" + string(len(Manager.Clients)))
  47. Manager.Clients[conn.ID] = conn
  48. //jsonMessage, _ := json.Marshal(&Message{Content: "Successful connection to socket service"})
  49. //conn.Send <- jsonMessage
  50. case conn := <-Manager.Unregister:
  51. log.Printf("用户离开:%v", conn.ID)
  52. fmt.Println("当前Client连接数:" + string(len(Manager.Clients)))
  53. if _, ok := Manager.Clients[conn.ID]; ok {
  54. //jsonMessage, _ := json.Marshal(&Message{Content: "A socket has disconnected"})
  55. //conn.Send <- jsonMessage
  56. close(conn.Send)
  57. delete(Manager.Clients, conn.ID)
  58. }
  59. fmt.Println("当前Client连接数:" + string(len(Manager.Clients)))
  60. case message := <-Manager.Broadcast:
  61. MessageStruct := Message{}
  62. err := json.Unmarshal(message, &MessageStruct)
  63. if err != nil {
  64. panic(err)
  65. return
  66. }
  67. for id, conn := range Manager.Clients {
  68. //处理结果回执消息
  69. if MessageStruct.Recipient == "server" {
  70. //更新record
  71. if MessageStruct.RecordId != "" {
  72. //content := ""
  73. if MessageStruct.Status {
  74. //content = errmsg.TestResultSuccess + MessageStruct.Content
  75. } else {
  76. //content = errmsg.TestResultFailed + MessageStruct.Content
  77. }
  78. //recordService.UpdateRecordById(MessageStruct.RecordId, content)
  79. }
  80. } else {
  81. if id == creatId(MessageStruct.Recipient, MessageStruct.Sender) {
  82. continue
  83. }
  84. select {
  85. case conn.Send <- message:
  86. default:
  87. close(conn.Send)
  88. delete(Manager.Clients, conn.ID)
  89. }
  90. }
  91. }
  92. }
  93. }
  94. }
  95. func creatId(uid, touid string) string {
  96. return uid + "_" + touid
  97. }
  98. func (c *Client) Read() {
  99. defer func() {
  100. Manager.Unregister <- c
  101. err := c.Socket.Close()
  102. if err != nil {
  103. fmt.Println(errmsg.SocketCloseError + err.Error())
  104. return
  105. }
  106. }()
  107. for {
  108. c.Socket.PongHandler()
  109. _, message, err := c.Socket.ReadMessage()
  110. if err != nil {
  111. Manager.Unregister <- c
  112. err := c.Socket.Close()
  113. if err != nil {
  114. fmt.Println(errmsg.SocketCloseError + err.Error())
  115. return
  116. }
  117. break
  118. }
  119. log.Printf("读取到客户端的信息:%s", string(message))
  120. Manager.Broadcast <- message
  121. }
  122. }
  123. func (c *Client) Write() {
  124. defer func() {
  125. err := c.Socket.Close()
  126. if err != nil {
  127. fmt.Println(errmsg.SocketCloseError + err.Error())
  128. return
  129. }
  130. }()
  131. for {
  132. select {
  133. case message, ok := <-c.Send:
  134. if !ok {
  135. err := c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
  136. if err != nil {
  137. fmt.Println("socket writeMessage error :" + err.Error())
  138. return
  139. }
  140. return
  141. }
  142. log.Printf("发送到到客户端的信息:%s", string(message))
  143. err := c.Socket.WriteMessage(websocket.TextMessage, message)
  144. if err != nil {
  145. fmt.Println("socket writeMessage error :" + err.Error())
  146. return
  147. }
  148. }
  149. }
  150. }
  151. // WsHandler TestHandler socket 连接 中间件 作用:升级协议,用户验证,自定义信息等
  152. func WsHandler(c *gin.Context) {
  153. uid := c.Query("uid")
  154. touid := c.Query("to_uid")
  155. conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}).Upgrade(c.Writer, c.Request, nil)
  156. if err != nil {
  157. http.NotFound(c.Writer, c.Request)
  158. return
  159. }
  160. //可以添加用户信息验证
  161. client := &Client{
  162. ID: creatId(uid, touid),
  163. Socket: conn,
  164. Send: make(chan []byte),
  165. }
  166. Manager.Register <- client
  167. go client.Read()
  168. go client.Write()
  169. }