123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- package initialize
- import (
- "AIT/exception/errmsg"
- "AIT/service"
- "encoding/json"
- "fmt"
- "github.com/gin-gonic/gin"
- "github.com/gorilla/websocket"
- "log"
- "net/http"
- )
- // ClientManager is a websocket manager
- type ClientManager struct {
- Clients map[string]*Client
- Broadcast chan []byte
- Register chan *Client
- Unregister chan *Client
- }
- // Client is a websocket client
- type Client struct {
- ID string
- Socket *websocket.Conn
- Send chan []byte
- }
// Message is the JSON payload exchanged over the websocket. Every field is
// tagged omitempty, so zero-valued fields are left out of the encoded form.
type Message struct {
	// Sender and Recipient are used by the manager to route the message; the
	// recipient value "server" marks a result receipt handled by the manager.
	Sender    string `json:"sender,omitempty"`
	Recipient string `json:"recipient,omitempty"`
	// CarContent and DirectionContent are the text pieces joined into the
	// stored record content.
	CarContent       string `json:"car_content,omitempty"`
	DirectionContent string `json:"direction_content,omitempty"`
	// RecordId names the record to update; an empty value means no update.
	RecordId string `json:"recordId,omitempty"`
	// Status is the boolean result flag carried with a receipt.
	Status bool `json:"status,omitempty"`
}
- // Manager define a ws server manager
- var Manager = ClientManager{
- Broadcast: make(chan []byte),
- Register: make(chan *Client),
- Unregister: make(chan *Client),
- Clients: make(map[string]*Client),
- }
- type RecordService struct {
- service.RecordService
- }
- func GetRecord() *RecordService {
- return &RecordService{}
- }
- // Start is 项目运行前, 协程开启start -> go Manager.Start()
- func (manager *ClientManager) Start() {
- for {
- log.Println("<---管道通信--->")
- select {
- case conn := <-Manager.Register:
- log.Printf("新用户加入:%v", conn.ID)
- Manager.Clients[conn.ID] = conn
- //jsonMessage, _ := json.Marshal(&Message{Content: "Successful connection to socket service"})
- //conn.Send <- jsonMessage
- case conn := <-Manager.Unregister:
- //log.Printf("用户离开:%v", conn.ID)
- if _, ok := Manager.Clients[conn.ID]; ok {
- //jsonMessage, _ := json.Marshal(&Message{Content: "A socket has disconnected"})
- //conn.Send <- jsonMessage
- close(conn.Send)
- delete(Manager.Clients, conn.ID)
- }
- case message := <-Manager.Broadcast:
- MessageStruct := Message{}
- err := json.Unmarshal(message, &MessageStruct)
- if err != nil {
- return
- }
- for id, conn := range Manager.Clients {
- //处理结果回执消息
- if MessageStruct.Recipient == "server" {
- //更新record
- recordService := GetRecord()
- if MessageStruct.RecordId != "" {
- content := ""
- if MessageStruct.Status {
- if len(MessageStruct.DirectionContent) > 0 {
- content = errmsg.TestResultSuccess + MessageStruct.CarContent + ";" + MessageStruct.DirectionContent
- } else {
- content = errmsg.TestResultSuccess + MessageStruct.CarContent
- }
- } else {
- if len(MessageStruct.DirectionContent) > 0 {
- content = errmsg.TestResultSuccess + MessageStruct.CarContent + ";" + MessageStruct.DirectionContent
- } else {
- content = errmsg.TestResultSuccess + MessageStruct.CarContent
- }
- }
- recordService.UpdateRecordById(MessageStruct.RecordId, content)
- }
- } else {
- if id == creatId(MessageStruct.Recipient, MessageStruct.Sender) {
- continue
- }
- select {
- case conn.Send <- message:
- default:
- close(conn.Send)
- delete(Manager.Clients, conn.ID)
- }
- }
- }
- }
- }
- }
// creatId builds a client ID by joining uid and touid with an underscore.
func creatId(uid, touid string) string {
	const sep = "_"
	prefix := uid + sep
	return prefix + touid
}
- func (c *Client) Read() {
- defer func() {
- Manager.Unregister <- c
- err := c.Socket.Close()
- if err != nil {
- fmt.Println(errmsg.SocketCloseError + err.Error())
- return
- }
- }()
- for {
- c.Socket.PongHandler()
- _, message, err := c.Socket.ReadMessage()
- if err != nil {
- Manager.Unregister <- c
- err := c.Socket.Close()
- if err != nil {
- fmt.Println(errmsg.SocketCloseError + err.Error())
- return
- }
- break
- }
- log.Printf("读取到客户端的信息:%s", string(message))
- Manager.Broadcast <- message
- }
- }
- func (c *Client) Write() {
- defer func() {
- err := c.Socket.Close()
- if err != nil {
- fmt.Println(errmsg.SocketCloseError + err.Error())
- return
- }
- }()
- for {
- select {
- case message, ok := <-c.Send:
- if !ok {
- err := c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
- if err != nil {
- fmt.Println("socket writeMessage error :" + err.Error())
- return
- }
- return
- }
- log.Printf("发送到到客户端的信息:%s", string(message))
- err := c.Socket.WriteMessage(websocket.TextMessage, message)
- if err != nil {
- fmt.Println("socket writeMessage error :" + err.Error())
- return
- }
- }
- }
- }
- // WsHandler TestHandler socket 连接 中间件 作用:升级协议,用户验证,自定义信息等
- func WsHandler(c *gin.Context) {
- uid := c.Query("uid")
- touid := c.Query("to_uid")
- conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}).Upgrade(c.Writer, c.Request, nil)
- if err != nil {
- http.NotFound(c.Writer, c.Request)
- return
- }
- //可以添加用户信息验证
- client := &Client{
- ID: creatId(uid, touid),
- Socket: conn,
- Send: make(chan []byte),
- }
- Manager.Register <- client
- go client.Read()
- go client.Write()
- }
|