sse.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. package gateway
import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
	"github.com/r3labs/sse/v2"

	"confrontation-training/constant"
	errors "confrontation-training/err"
	"confrontation-training/global"
	"confrontation-training/models/gateway"
	"confrontation-training/response"
)
  17. // SseScanDevice 扫描设备
  18. func SseScanDevice(paramMap map[string]string, filterType string, r *http.Request) {
  19. var scanUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.ScanUrl
  20. deviceMap := make(map[string]gateway.DeviceScanned)
  21. if len(paramMap) > 0 {
  22. for key, value := range paramMap {
  23. scanUrl += "&" + key + "=" + value
  24. }
  25. }
  26. //events := make(chan *sse.Event)
  27. client := sse.NewClient(scanUrl)
  28. go func() {
  29. fmt.Println("开始SSE")
  30. //err := client.SubscribeChan("messages", events)
  31. //if err != nil {
  32. // return
  33. //}
  34. err := client.SubscribeRaw(func(msg *sse.Event) {
  35. //fmt.Println(string(msg.Data))
  36. var data gateway.DeviceScannedFromGateway
  37. err := json.Unmarshal(msg.Data, &data)
  38. if err != nil {
  39. fmt.Printf("SSE数据转换异常:%s", err.Error())
  40. return
  41. }
  42. deviceScanned := gateway.DeviceScanned{}
  43. deviceScanned.MAC = data.Bdaddrs[0].Bdaddr
  44. if filterType == "1" {
  45. deviceService := GetDeviceService()
  46. _, i := deviceService.FindDeviceByMac(deviceScanned.MAC)
  47. if i > 0 {
  48. deviceScanned.Name = data.Name
  49. deviceScanned.Rssi = data.Rssi
  50. deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType
  51. deviceScanned.Chip = data.ChipId
  52. deviceMap[deviceScanned.MAC] = deviceScanned
  53. fmt.Println(deviceScanned)
  54. }
  55. }
  56. })
  57. if err != nil {
  58. fmt.Println(err.Error())
  59. return
  60. }
  61. <-r.Context().Done()
  62. return
  63. }()
  64. ticket := time.NewTicker(time.Duration(global.Config.Gateway.ScanSecond) * time.Millisecond)
  65. for range ticket.C {
  66. fmt.Println("当前时间:" + time.Now().Format("2006-01-02 15:04:05"))
  67. ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  68. if len(deviceMap) > 0 {
  69. if err != nil {
  70. fmt.Println("Websocket client init error" + err.Error())
  71. return
  72. }
  73. messageMap := make(map[string]string)
  74. messageMap["msgType"] = constant.MessageTypeDeviceScanned
  75. marshal, err := json.Marshal(deviceMap)
  76. if err != nil {
  77. fmt.Println("data parse Error :" + err.Error())
  78. return
  79. }
  80. messageMap["data"] = string(marshal)
  81. bytes, err := json.Marshal(messageMap)
  82. if err != nil {
  83. fmt.Printf("json异常:%s", err.Error())
  84. return
  85. }
  86. err = ws.WriteMessage(websocket.TextMessage, bytes)
  87. if err != nil {
  88. fmt.Println("消息发送异常:" + err.Error())
  89. return
  90. }
  91. } else {
  92. err := ws.WriteMessage(websocket.TextMessage, []byte("{}"))
  93. if err != nil {
  94. fmt.Println("消息发送异常:" + err.Error())
  95. return
  96. }
  97. }
  98. client = nil
  99. ws = nil
  100. }
  101. }
  102. func SseScanDevice2(c *gin.Context) {
  103. var scanUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.ScanUrl
  104. var param gateway.DeviceScanParam
  105. err := c.ShouldBindJSON(&param)
  106. if err != nil {
  107. fmt.Printf("参数格式化异常:%s", err.Error())
  108. response.Failed(errors.ParamInvalid, c)
  109. return
  110. }
  111. paramMap := make(map[string]string)
  112. if param.Chip != "" {
  113. paramMap["chip"] = param.Chip
  114. }
  115. if param.FilterName != "" {
  116. //查询设备mac过滤信息
  117. filterMac := ""
  118. deviceInfos, count := GetDeviceService().DeviceService.FindDeviceByType(param.FilterName)
  119. if count > 0 {
  120. for i := range deviceInfos {
  121. filterMac += deviceInfos[i].Mac + ","
  122. }
  123. }
  124. if len(filterMac) > 0 {
  125. if strings.HasSuffix(filterMac, ",") {
  126. filterMac = filterMac[0 : len(filterMac)-1]
  127. }
  128. paramMap["filter_mac"] = filterMac
  129. }
  130. if param.FilterName == "0" {
  131. paramMap["filter_name"] = constant.FilterNameEEG
  132. } else if param.FilterName == "1" {
  133. paramMap["filter_name"] = constant.FilterNameECG
  134. } else {
  135. response.Failed(errors.ParamInvalid+":过滤类型-"+param.FilterName+"无效", c)
  136. return
  137. }
  138. } else {
  139. paramMap["filter_name"] = constant.FilterNameALL
  140. }
  141. if param.FilterRssi != "" {
  142. paramMap["filter_rssi"] = param.FilterRssi
  143. }
  144. if param.FilterMac != "" {
  145. paramMap["filter_mac"] = param.FilterMac
  146. }
  147. paramMap["active"] = "1"
  148. paramMap["event"] = "1"
  149. deviceMap := make(map[string]gateway.DeviceScanned)
  150. if len(paramMap) > 0 {
  151. for key, value := range paramMap {
  152. scanUrl += "&" + key + "=" + value
  153. }
  154. }
  155. //events := make(chan *sse.Event)
  156. client := sse.NewClient(scanUrl)
  157. fmt.Println("开始SSE")
  158. flusher := c.Writer.(http.Flusher)
  159. err = client.SubscribeRaw(func(msg *sse.Event) {
  160. //fmt.Println(string(msg.Data))
  161. var data gateway.DeviceScannedFromGateway
  162. err := json.Unmarshal(msg.Data, &data)
  163. if err != nil {
  164. fmt.Printf("SSE数据转换异常:%s", err.Error())
  165. return
  166. }
  167. deviceScanned := gateway.DeviceScanned{}
  168. deviceScanned.MAC = data.Bdaddrs[0].Bdaddr
  169. //if param.FilterType == "1" {
  170. // deviceService := GetDeviceService()
  171. // _, i := deviceService.FindDeviceByMac(deviceScanned.MAC)
  172. // if i > 0 {
  173. // deviceScanned.Name = data.Name
  174. // deviceScanned.Rssi = data.Rssi
  175. // deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType
  176. // deviceScanned.Chip = data.ChipId
  177. // deviceMap[deviceScanned.MAC] = deviceScanned
  178. // fmt.Println(deviceScanned)
  179. // }
  180. //}
  181. deviceScanned.Name = data.Name
  182. deviceScanned.Rssi = data.Rssi
  183. deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType
  184. deviceScanned.Chip = data.ChipId
  185. deviceMap[deviceScanned.MAC] = deviceScanned
  186. fmt.Println(deviceScanned)
  187. if len(deviceMap) > 0 {
  188. marshal, _ := json.Marshal(deviceMap)
  189. c.Writer.Write(marshal)
  190. //fmt.Fprintf(c.Writer, string(marshal))
  191. flusher.Flush()
  192. }
  193. go func() {
  194. <-c.Request.Context().Done()
  195. client = nil
  196. return
  197. }()
  198. })
  199. if err != nil {
  200. fmt.Println(err.Error())
  201. return
  202. }
  203. return
  204. }
  205. // SseOpenNotify 开启通知
  206. func SseOpenNotify() {
  207. var notifyUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.NotifyUrl
  208. ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  209. if err != nil {
  210. fmt.Println("Websocket client init error" + err.Error())
  211. return
  212. }
  213. client := sse.NewClient(notifyUrl)
  214. err = client.SubscribeRaw(func(msg *sse.Event) {
  215. s := string(msg.Data)
  216. //fmt.Println("notify receive data :" + s)
  217. flag := strings.HasPrefix(s, "E840") || strings.HasPrefix(s, "E841") || strings.HasPrefix(s, "E823")
  218. if !flag {
  219. //websocket 通知数据
  220. //msgMap := make(map[string]string)
  221. //msgMap[]
  222. var receiveData gateway.DeviceDataReceived
  223. errJson := json.Unmarshal(msg.Data, &receiveData)
  224. if errJson != nil {
  225. fmt.Println("receive data parse error:" + errJson.Error())
  226. //panic(err)
  227. return
  228. }
  229. messageMap := make(map[string]string)
  230. messageMap["msgType"] = constant.MessageTypeData
  231. messageMap["data"] = string(msg.Data)
  232. err := ws.WriteMessage(websocket.TextMessage, msg.Data)
  233. if err != nil {
  234. fmt.Println(errors.SendMessageError + err.Error())
  235. return
  236. }
  237. }
  238. })
  239. if err != nil {
  240. return
  241. }
  242. }