ws.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package initialize
  2. import (
  3. "AIT/exception/errmsg"
  4. "AIT/service"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/gin-gonic/gin"
  8. "github.com/gorilla/websocket"
  9. "log"
  10. "net/http"
  11. )
  12. // ClientManager is a websocket manager
  13. type ClientManager struct {
  14. Clients map[string]*Client
  15. Broadcast chan []byte
  16. Register chan *Client
  17. Unregister chan *Client
  18. }
  19. // Client is a websocket client
  20. type Client struct {
  21. ID string
  22. Socket *websocket.Conn
  23. Send chan []byte
  24. }
  25. // Message is return msg
  26. type Message struct {
  27. Sender string `json:"sender,omitempty"`
  28. Recipient string `json:"recipient,omitempty"`
  29. CarContent string `json:"car_content,omitempty"`
  30. DirectionContent string `json:"direction_content,omitempty"`
  31. RecordId string `json:"recordId,omitempty"`
  32. Status bool `json:"status,omitempty"`
  33. }
  34. // Manager define a ws server manager
  35. var Manager = ClientManager{
  36. Broadcast: make(chan []byte),
  37. Register: make(chan *Client),
  38. Unregister: make(chan *Client),
  39. Clients: make(map[string]*Client),
  40. }
  41. type RecordService struct {
  42. service.RecordService
  43. }
  44. func GetRecord() *RecordService {
  45. return &RecordService{}
  46. }
  47. // Start is 项目运行前, 协程开启start -> go Manager.Start()
  48. func (manager *ClientManager) Start() {
  49. for {
  50. log.Println("<---管道通信--->")
  51. select {
  52. case conn := <-Manager.Register:
  53. log.Printf("新用户加入:%v", conn.ID)
  54. Manager.Clients[conn.ID] = conn
  55. //jsonMessage, _ := json.Marshal(&Message{Content: "Successful connection to socket service"})
  56. //conn.Send <- jsonMessage
  57. case conn := <-Manager.Unregister:
  58. //log.Printf("用户离开:%v", conn.ID)
  59. if _, ok := Manager.Clients[conn.ID]; ok {
  60. //jsonMessage, _ := json.Marshal(&Message{Content: "A socket has disconnected"})
  61. //conn.Send <- jsonMessage
  62. close(conn.Send)
  63. delete(Manager.Clients, conn.ID)
  64. }
  65. case message := <-Manager.Broadcast:
  66. MessageStruct := Message{}
  67. err := json.Unmarshal(message, &MessageStruct)
  68. if err != nil {
  69. return
  70. }
  71. for id, conn := range Manager.Clients {
  72. //处理结果回执消息
  73. if MessageStruct.Recipient == "server" {
  74. //更新record
  75. recordService := GetRecord()
  76. if MessageStruct.RecordId != "" {
  77. content := ""
  78. if MessageStruct.Status {
  79. if len(MessageStruct.DirectionContent) > 0 {
  80. content = errmsg.TestResultSuccess + MessageStruct.CarContent + ";" + MessageStruct.DirectionContent
  81. } else {
  82. content = errmsg.TestResultSuccess + MessageStruct.CarContent
  83. }
  84. } else {
  85. if len(MessageStruct.DirectionContent) > 0 {
  86. content = errmsg.TestResultSuccess + MessageStruct.CarContent + ";" + MessageStruct.DirectionContent
  87. } else {
  88. content = errmsg.TestResultSuccess + MessageStruct.CarContent
  89. }
  90. }
  91. recordService.UpdateRecordById(MessageStruct.RecordId, content)
  92. }
  93. } else {
  94. if id == creatId(MessageStruct.Recipient, MessageStruct.Sender) {
  95. continue
  96. }
  97. select {
  98. case conn.Send <- message:
  99. default:
  100. close(conn.Send)
  101. delete(Manager.Clients, conn.ID)
  102. }
  103. }
  104. }
  105. }
  106. }
  107. }
  108. func creatId(uid, touid string) string {
  109. return uid + "_" + touid
  110. }
  111. func (c *Client) Read() {
  112. defer func() {
  113. Manager.Unregister <- c
  114. err := c.Socket.Close()
  115. if err != nil {
  116. fmt.Println(errmsg.SocketCloseError + err.Error())
  117. return
  118. }
  119. }()
  120. for {
  121. c.Socket.PongHandler()
  122. _, message, err := c.Socket.ReadMessage()
  123. if err != nil {
  124. Manager.Unregister <- c
  125. err := c.Socket.Close()
  126. if err != nil {
  127. fmt.Println(errmsg.SocketCloseError + err.Error())
  128. return
  129. }
  130. break
  131. }
  132. log.Printf("读取到客户端的信息:%s", string(message))
  133. Manager.Broadcast <- message
  134. }
  135. }
  136. func (c *Client) Write() {
  137. defer func() {
  138. err := c.Socket.Close()
  139. if err != nil {
  140. fmt.Println(errmsg.SocketCloseError + err.Error())
  141. return
  142. }
  143. }()
  144. for {
  145. select {
  146. case message, ok := <-c.Send:
  147. if !ok {
  148. err := c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
  149. if err != nil {
  150. fmt.Println("socket writeMessage error :" + err.Error())
  151. return
  152. }
  153. return
  154. }
  155. log.Printf("发送到到客户端的信息:%s", string(message))
  156. err := c.Socket.WriteMessage(websocket.TextMessage, message)
  157. if err != nil {
  158. fmt.Println("socket writeMessage error :" + err.Error())
  159. return
  160. }
  161. }
  162. }
  163. }
  164. // WsHandler TestHandler socket 连接 中间件 作用:升级协议,用户验证,自定义信息等
  165. func WsHandler(c *gin.Context) {
  166. uid := c.Query("uid")
  167. touid := c.Query("to_uid")
  168. conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}).Upgrade(c.Writer, c.Request, nil)
  169. if err != nil {
  170. http.NotFound(c.Writer, c.Request)
  171. return
  172. }
  173. //可以添加用户信息验证
  174. client := &Client{
  175. ID: creatId(uid, touid),
  176. Socket: conn,
  177. Send: make(chan []byte),
  178. }
  179. Manager.Register <- client
  180. go client.Read()
  181. go client.Write()
  182. }