ws.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package initialize
  2. import (
  3. "AIT/exception/errmsg"
  4. "AIT/service"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/gin-gonic/gin"
  8. "github.com/gorilla/websocket"
  9. "log"
  10. "net/http"
  11. )
  12. // ClientManager is a websocket manager
  13. type ClientManager struct {
  14. Clients map[string]*Client
  15. Broadcast chan []byte
  16. Register chan *Client
  17. Unregister chan *Client
  18. }
  19. // Client is a websocket client
  20. type Client struct {
  21. ID string
  22. Socket *websocket.Conn
  23. Send chan []byte
  24. }
  // Message is the JSON envelope exchanged over the websocket.
  //
  // Every field is tagged omitempty, so zero values are left out when the
  // struct is marshalled; in particular a false Status is omitted entirely.
  type Message struct {
  	// Sender is the originating user id.
  	Sender string `json:"sender,omitempty"`
  	// Recipient is the target user id; the value "server" marks a result receipt.
  	Recipient string `json:"recipient,omitempty"`
  	// Content is the message text.
  	Content string `json:"content,omitempty"`
  	// RecordId names the record to update when a receipt is handled.
  	RecordId string `json:"recordId,omitempty"`
  	// Status selects the success or failure prefix applied to a receipt.
  	Status bool `json:"status,omitempty"`
  }
  33. // Manager define a ws server manager
  34. var Manager = ClientManager{
  35. Broadcast: make(chan []byte),
  36. Register: make(chan *Client),
  37. Unregister: make(chan *Client),
  38. Clients: make(map[string]*Client),
  39. }
  40. type RecordService struct {
  41. service.RecordService
  42. }
  43. func GetRecord() *RecordService {
  44. return &RecordService{}
  45. }
  46. // Start is 项目运行前, 协程开启start -> go Manager.Start()
  47. func (manager *ClientManager) Start() {
  48. for {
  49. log.Println("<---管道通信--->")
  50. select {
  51. case conn := <-Manager.Register:
  52. log.Printf("新用户加入:%v", conn.ID)
  53. Manager.Clients[conn.ID] = conn
  54. //jsonMessage, _ := json.Marshal(&Message{Content: "Successful connection to socket service"})
  55. //conn.Send <- jsonMessage
  56. case conn := <-Manager.Unregister:
  57. //log.Printf("用户离开:%v", conn.ID)
  58. if _, ok := Manager.Clients[conn.ID]; ok {
  59. //jsonMessage, _ := json.Marshal(&Message{Content: "A socket has disconnected"})
  60. //conn.Send <- jsonMessage
  61. close(conn.Send)
  62. delete(Manager.Clients, conn.ID)
  63. }
  64. case message := <-Manager.Broadcast:
  65. MessageStruct := Message{}
  66. err := json.Unmarshal(message, &MessageStruct)
  67. if err != nil {
  68. return
  69. }
  70. for id, conn := range Manager.Clients {
  71. //处理结果回执消息
  72. if MessageStruct.Recipient == "server" {
  73. //更新record
  74. recordService := GetRecord()
  75. if MessageStruct.RecordId != "" {
  76. content := ""
  77. if MessageStruct.Status {
  78. content = errmsg.TestResultSuccess + MessageStruct.Content
  79. } else {
  80. content = errmsg.TestResultFailed + MessageStruct.Content
  81. }
  82. recordService.UpdateRecordById(MessageStruct.RecordId, content)
  83. }
  84. } else {
  85. if id == creatId(MessageStruct.Recipient, MessageStruct.Sender) {
  86. continue
  87. }
  88. select {
  89. case conn.Send <- message:
  90. default:
  91. close(conn.Send)
  92. delete(Manager.Clients, conn.ID)
  93. }
  94. }
  95. }
  96. }
  97. }
  98. }
  // creatId builds a connection id by joining uid and touid with an underscore.
  func creatId(uid, touid string) string {
  	const separator = "_"
  	return uid + separator + touid
  }
  102. func (c *Client) Read() {
  103. defer func() {
  104. Manager.Unregister <- c
  105. err := c.Socket.Close()
  106. if err != nil {
  107. fmt.Println(errmsg.SocketCloseError + err.Error())
  108. return
  109. }
  110. }()
  111. for {
  112. c.Socket.PongHandler()
  113. _, message, err := c.Socket.ReadMessage()
  114. if err != nil {
  115. Manager.Unregister <- c
  116. err := c.Socket.Close()
  117. if err != nil {
  118. fmt.Println(errmsg.SocketCloseError + err.Error())
  119. return
  120. }
  121. break
  122. }
  123. log.Printf("读取到客户端的信息:%s", string(message))
  124. Manager.Broadcast <- message
  125. }
  126. }
  127. func (c *Client) Write() {
  128. defer func() {
  129. err := c.Socket.Close()
  130. if err != nil {
  131. fmt.Println(errmsg.SocketCloseError + err.Error())
  132. return
  133. }
  134. }()
  135. for {
  136. select {
  137. case message, ok := <-c.Send:
  138. if !ok {
  139. err := c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
  140. if err != nil {
  141. fmt.Println("socket writeMessage error :" + err.Error())
  142. return
  143. }
  144. return
  145. }
  146. log.Printf("发送到到客户端的信息:%s", string(message))
  147. err := c.Socket.WriteMessage(websocket.TextMessage, message)
  148. if err != nil {
  149. fmt.Println("socket writeMessage error :" + err.Error())
  150. return
  151. }
  152. }
  153. }
  154. }
  155. // WsHandler TestHandler socket 连接 中间件 作用:升级协议,用户验证,自定义信息等
  156. func WsHandler(c *gin.Context) {
  157. uid := c.Query("uid")
  158. touid := c.Query("to_uid")
  159. conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}).Upgrade(c.Writer, c.Request, nil)
  160. if err != nil {
  161. http.NotFound(c.Writer, c.Request)
  162. return
  163. }
  164. //可以添加用户信息验证
  165. client := &Client{
  166. ID: creatId(uid, touid),
  167. Socket: conn,
  168. Send: make(chan []byte),
  169. }
  170. Manager.Register <- client
  171. go client.Read()
  172. go client.Write()
  173. }