sse.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package gateway
  import (
  	"context"
  	"encoding/json"
  	"fmt"
  	"net/http"
  	"strings"
  	"sync"
  	"time"

  	"github.com/gorilla/websocket"
  	"github.com/r3labs/sse/v2"

  	"confrontation-training/constant"
  	errors "confrontation-training/err"
  	"confrontation-training/global"
  	"confrontation-training/models/gateway"
  )
  15. // SseScanDevice 扫描设备
  16. func SseScanDevice(paramMap map[string]string, filterType string, r *http.Request) {
  17. var scanUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.ScanUrl
  18. deviceMap := make(map[string]gateway.DeviceScanned)
  19. if len(paramMap) > 0 {
  20. for key, value := range paramMap {
  21. scanUrl += "&" + key + "=" + value
  22. }
  23. }
  24. //events := make(chan *sse.Event)
  25. client := sse.NewClient(scanUrl)
  26. go func() {
  27. fmt.Println("开始SSE")
  28. //err := client.SubscribeChan("messages", events)
  29. //if err != nil {
  30. // return
  31. //}
  32. err := client.SubscribeRaw(func(msg *sse.Event) {
  33. //fmt.Println(string(msg.Data))
  34. var data gateway.DeviceScannedFromGateway
  35. err := json.Unmarshal(msg.Data, &data)
  36. if err != nil {
  37. fmt.Printf("SSE数据转换异常:%s", err.Error())
  38. return
  39. }
  40. deviceScanned := gateway.DeviceScanned{}
  41. deviceScanned.MAC = data.Bdaddrs[0].Bdaddr
  42. if filterType == "1" {
  43. deviceService := GetDeviceService()
  44. _, i := deviceService.FindDeviceByMac(deviceScanned.MAC)
  45. if i > 0 {
  46. deviceScanned.Name = data.Name
  47. deviceScanned.Rssi = data.Rssi
  48. deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType
  49. deviceScanned.Chip = data.ChipId
  50. deviceMap[deviceScanned.MAC] = deviceScanned
  51. fmt.Println(deviceScanned)
  52. }
  53. }
  54. })
  55. if err != nil {
  56. fmt.Println(err.Error())
  57. return
  58. }
  59. <-r.Context().Done()
  60. return
  61. }()
  62. ticket := time.NewTicker(time.Duration(global.Config.Gateway.ScanSecond) * time.Millisecond)
  63. for range ticket.C {
  64. fmt.Println("当前时间:" + time.Now().Format("2006-01-02 15:04:05"))
  65. ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  66. if len(deviceMap) > 0 {
  67. if err != nil {
  68. fmt.Println("Websocket client init error" + err.Error())
  69. return
  70. }
  71. messageMap := make(map[string]string)
  72. messageMap["msgType"] = constant.MessageTypeDeviceScanned
  73. marshal, err := json.Marshal(deviceMap)
  74. if err != nil {
  75. fmt.Println("data parse Error :" + err.Error())
  76. return
  77. }
  78. messageMap["data"] = string(marshal)
  79. bytes, err := json.Marshal(messageMap)
  80. if err != nil {
  81. fmt.Printf("json异常:%s", err.Error())
  82. return
  83. }
  84. err = ws.WriteMessage(websocket.TextMessage, bytes)
  85. if err != nil {
  86. fmt.Println("消息发送异常:" + err.Error())
  87. return
  88. }
  89. } else {
  90. err := ws.WriteMessage(websocket.TextMessage, []byte("{}"))
  91. if err != nil {
  92. fmt.Println("消息发送异常:" + err.Error())
  93. return
  94. }
  95. }
  96. client = nil
  97. ws = nil
  98. }
  99. }
  100. // SseOpenNotify 开启通知
  101. func SseOpenNotify() {
  102. var notifyUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.NotifyUrl
  103. ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  104. if err != nil {
  105. fmt.Println("Websocket client init error" + err.Error())
  106. return
  107. }
  108. client := sse.NewClient(notifyUrl)
  109. err = client.SubscribeRaw(func(msg *sse.Event) {
  110. s := string(msg.Data)
  111. //fmt.Println("notify receive data :" + s)
  112. flag := strings.HasPrefix(s, "E840") || strings.HasPrefix(s, "E841") || strings.HasPrefix(s, "E823")
  113. if !flag {
  114. //websocket 通知数据
  115. //msgMap := make(map[string]string)
  116. //msgMap[]
  117. var receiveData gateway.DeviceDataReceived
  118. errJson := json.Unmarshal(msg.Data, &receiveData)
  119. if errJson != nil {
  120. fmt.Println("receive data parse error:" + errJson.Error())
  121. //panic(err)
  122. return
  123. }
  124. messageMap := make(map[string]string)
  125. messageMap["msgType"] = constant.MessageTypeData
  126. messageMap["data"] = string(msg.Data)
  127. err := ws.WriteMessage(websocket.TextMessage, msg.Data)
  128. if err != nil {
  129. fmt.Println(errors.SendMessageError + err.Error())
  130. return
  131. }
  132. }
  133. })
  134. if err != nil {
  135. return
  136. }
  137. }