ws.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package initialize
  2. import (
  3. "AIT/service"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gin-gonic/gin"
  7. "github.com/gorilla/websocket"
  8. "log"
  9. "net/http"
  10. )
  11. // ClientManager is a websocket manager
  12. type ClientManager struct {
  13. Clients map[string]*Client
  14. Broadcast chan []byte
  15. Register chan *Client
  16. Unregister chan *Client
  17. }
  18. // Client is a websocket client
  19. type Client struct {
  20. ID string
  21. Socket *websocket.Conn
  22. Send chan []byte
  23. }
  24. // Message is return msg
  25. type Message struct {
  26. Sender string `json:"sender,omitempty"`
  27. Recipient string `json:"recipient,omitempty"`
  28. Content string `json:"content,omitempty"`
  29. RecordId string `json:"recordId,omitempty"`
  30. }
  31. // Manager define a ws server manager
  32. var Manager = ClientManager{
  33. Broadcast: make(chan []byte),
  34. Register: make(chan *Client),
  35. Unregister: make(chan *Client),
  36. Clients: make(map[string]*Client),
  37. }
  38. type RecordService struct {
  39. service.RecordService
  40. }
  41. func GetRecord() *RecordService {
  42. return &RecordService{}
  43. }
  44. // Start is 项目运行前, 协程开启start -> go Manager.Start()
  45. func (manager *ClientManager) Start() {
  46. for {
  47. log.Println("<---管道通信--->")
  48. select {
  49. case conn := <-Manager.Register:
  50. log.Printf("新用户加入:%v", conn.ID)
  51. Manager.Clients[conn.ID] = conn
  52. //jsonMessage, _ := json.Marshal(&Message{Content: "Successful connection to socket service"})
  53. //conn.Send <- jsonMessage
  54. case conn := <-Manager.Unregister:
  55. //log.Printf("用户离开:%v", conn.ID)
  56. if _, ok := Manager.Clients[conn.ID]; ok {
  57. //jsonMessage, _ := json.Marshal(&Message{Content: "A socket has disconnected"})
  58. //conn.Send <- jsonMessage
  59. close(conn.Send)
  60. delete(Manager.Clients, conn.ID)
  61. }
  62. case message := <-Manager.Broadcast:
  63. MessageStruct := Message{}
  64. err := json.Unmarshal(message, &MessageStruct)
  65. if err != nil {
  66. return
  67. }
  68. for id, conn := range Manager.Clients {
  69. //处理结果回执消息
  70. if MessageStruct.Recipient == "server" {
  71. //更新record
  72. recordService := GetRecord()
  73. if MessageStruct.RecordId != "" {
  74. recordService.UpdateRecordById(MessageStruct.RecordId, MessageStruct.Content)
  75. }
  76. } else {
  77. if id == creatId(MessageStruct.Recipient, MessageStruct.Sender) {
  78. continue
  79. }
  80. select {
  81. case conn.Send <- message:
  82. default:
  83. close(conn.Send)
  84. delete(Manager.Clients, conn.ID)
  85. }
  86. }
  87. }
  88. }
  89. }
  90. }
  91. func creatId(uid, touid string) string {
  92. return uid + "_" + touid
  93. }
  94. func (c *Client) Read() {
  95. defer func() {
  96. Manager.Unregister <- c
  97. err := c.Socket.Close()
  98. if err != nil {
  99. fmt.Println("socket close error:" + err.Error())
  100. return
  101. }
  102. }()
  103. for {
  104. c.Socket.PongHandler()
  105. _, message, err := c.Socket.ReadMessage()
  106. if err != nil {
  107. Manager.Unregister <- c
  108. err := c.Socket.Close()
  109. if err != nil {
  110. return
  111. }
  112. break
  113. }
  114. log.Printf("读取到客户端的信息:%s", string(message))
  115. Manager.Broadcast <- message
  116. }
  117. }
  118. func (c *Client) Write() {
  119. defer func() {
  120. err := c.Socket.Close()
  121. if err != nil {
  122. fmt.Println("socket close error:" + err.Error())
  123. return
  124. }
  125. }()
  126. for {
  127. select {
  128. case message, ok := <-c.Send:
  129. if !ok {
  130. err := c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
  131. if err != nil {
  132. fmt.Println("socket writeMessage error :" + err.Error())
  133. return
  134. }
  135. return
  136. }
  137. log.Printf("发送到到客户端的信息:%s", string(message))
  138. err := c.Socket.WriteMessage(websocket.TextMessage, message)
  139. if err != nil {
  140. fmt.Println("socket writeMessage error :" + err.Error())
  141. return
  142. }
  143. }
  144. }
  145. }
  146. // WsHandler TestHandler socket 连接 中间件 作用:升级协议,用户验证,自定义信息等
  147. func WsHandler(c *gin.Context) {
  148. uid := c.Query("uid")
  149. touid := c.Query("to_uid")
  150. conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}).Upgrade(c.Writer, c.Request, nil)
  151. if err != nil {
  152. http.NotFound(c.Writer, c.Request)
  153. return
  154. }
  155. //可以添加用户信息验证
  156. client := &Client{
  157. ID: creatId(uid, touid),
  158. Socket: conn,
  159. Send: make(chan []byte),
  160. }
  161. Manager.Register <- client
  162. go client.Read()
  163. go client.Write()
  164. }