ws.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package initialize
  2. import (
  3. errmsg "confrontation-training/err"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gin-gonic/gin"
  7. "github.com/gorilla/websocket"
  8. "log"
  9. "net/http"
  10. )
  11. // ClientManager is a websocket manager
  12. type ClientManager struct {
  13. Clients map[string]*Client
  14. Broadcast chan []byte
  15. Register chan *Client
  16. Unregister chan *Client
  17. }
  18. // Client is a websocket client
  19. type Client struct {
  20. ID string
  21. Socket *websocket.Conn
  22. Send chan []byte
  23. }
  24. // Message is return msg
  25. type Message struct {
  26. Sender string `json:"sender,omitempty"`
  27. Recipient string `json:"recipient,omitempty"`
  28. Content string `json:"content,omitempty"`
  29. RecordId string `json:"recordId,omitempty"`
  30. Status bool `json:"status,omitempty"`
  31. }
  32. // Manager define a ws server manager
  33. var Manager = ClientManager{
  34. Broadcast: make(chan []byte),
  35. Register: make(chan *Client),
  36. Unregister: make(chan *Client),
  37. Clients: make(map[string]*Client),
  38. }
  39. // Start is 项目运行前, 协程开启start -> go Manager.Start()
  40. func (manager *ClientManager) Start() {
  41. for {
  42. log.Println("<---管道通信--->")
  43. select {
  44. case conn := <-Manager.Register:
  45. log.Printf("新用户加入:%v", conn.ID)
  46. Manager.Clients[conn.ID] = conn
  47. //jsonMessage, _ := json.Marshal(&Message{Content: "Successful connection to socket service"})
  48. //conn.Send <- jsonMessage
  49. case conn := <-Manager.Unregister:
  50. //log.Printf("用户离开:%v", conn.ID)
  51. if _, ok := Manager.Clients[conn.ID]; ok {
  52. //jsonMessage, _ := json.Marshal(&Message{Content: "A socket has disconnected"})
  53. //conn.Send <- jsonMessage
  54. close(conn.Send)
  55. delete(Manager.Clients, conn.ID)
  56. }
  57. case message := <-Manager.Broadcast:
  58. MessageStruct := Message{}
  59. err := json.Unmarshal(message, &MessageStruct)
  60. if err != nil {
  61. return
  62. }
  63. for id, conn := range Manager.Clients {
  64. //处理结果回执消息
  65. if MessageStruct.Recipient == "server" {
  66. //更新record
  67. if MessageStruct.RecordId != "" {
  68. //content := ""
  69. if MessageStruct.Status {
  70. //content = errmsg.TestResultSuccess + MessageStruct.Content
  71. } else {
  72. //content = errmsg.TestResultFailed + MessageStruct.Content
  73. }
  74. //recordService.UpdateRecordById(MessageStruct.RecordId, content)
  75. }
  76. } else {
  77. if id == creatId(MessageStruct.Recipient, MessageStruct.Sender) {
  78. continue
  79. }
  80. select {
  81. case conn.Send <- message:
  82. default:
  83. close(conn.Send)
  84. delete(Manager.Clients, conn.ID)
  85. }
  86. }
  87. }
  88. }
  89. }
  90. }
  91. func creatId(uid, touid string) string {
  92. return uid + "_" + touid
  93. }
  94. func (c *Client) Read() {
  95. defer func() {
  96. Manager.Unregister <- c
  97. err := c.Socket.Close()
  98. if err != nil {
  99. fmt.Println(errmsg.SocketCloseError + err.Error())
  100. return
  101. }
  102. }()
  103. for {
  104. c.Socket.PongHandler()
  105. _, message, err := c.Socket.ReadMessage()
  106. if err != nil {
  107. Manager.Unregister <- c
  108. err := c.Socket.Close()
  109. if err != nil {
  110. fmt.Println(errmsg.SocketCloseError + err.Error())
  111. return
  112. }
  113. break
  114. }
  115. log.Printf("读取到客户端的信息:%s", string(message))
  116. Manager.Broadcast <- message
  117. }
  118. }
  119. func (c *Client) Write() {
  120. defer func() {
  121. err := c.Socket.Close()
  122. if err != nil {
  123. fmt.Println(errmsg.SocketCloseError + err.Error())
  124. return
  125. }
  126. }()
  127. for {
  128. select {
  129. case message, ok := <-c.Send:
  130. if !ok {
  131. err := c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
  132. if err != nil {
  133. fmt.Println("socket writeMessage error :" + err.Error())
  134. return
  135. }
  136. return
  137. }
  138. log.Printf("发送到到客户端的信息:%s", string(message))
  139. err := c.Socket.WriteMessage(websocket.TextMessage, message)
  140. if err != nil {
  141. fmt.Println("socket writeMessage error :" + err.Error())
  142. return
  143. }
  144. }
  145. }
  146. }
  147. // WsHandler TestHandler socket 连接 中间件 作用:升级协议,用户验证,自定义信息等
  148. func WsHandler(c *gin.Context) {
  149. uid := c.Query("uid")
  150. touid := c.Query("to_uid")
  151. conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}).Upgrade(c.Writer, c.Request, nil)
  152. if err != nil {
  153. http.NotFound(c.Writer, c.Request)
  154. return
  155. }
  156. //可以添加用户信息验证
  157. client := &Client{
  158. ID: creatId(uid, touid),
  159. Socket: conn,
  160. Send: make(chan []byte),
  161. }
  162. Manager.Register <- client
  163. go client.Read()
  164. go client.Write()
  165. }