sse.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package gateway
  2. import (
  3. "confrontation-training/constant"
  4. errors "confrontation-training/err"
  5. "confrontation-training/global"
  6. "confrontation-training/models/gateway"
  7. "encoding/json"
  8. "fmt"
  9. "github.com/gin-gonic/gin"
  10. "github.com/gorilla/websocket"
  11. "github.com/r3labs/sse/v2"
  12. "strings"
  13. "time"
  14. )
  15. // SseScanDevice 扫描设备
  16. func SseScanDevice(paramMap map[string]string, filterType string, c *gin.Context) {
  17. var scanUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.ScanUrl
  18. deviceMap := make(map[string]gateway.DeviceScanned)
  19. if len(paramMap) > 0 {
  20. for key, value := range paramMap {
  21. scanUrl += "&" + key + "=" + value
  22. }
  23. }
  24. fmt.Println("创建新的sse")
  25. //events := make(chan *sse.Event)
  26. client := global.SseClientDevice
  27. client.URL = scanUrl
  28. //ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  29. //if err != nil {
  30. // response.Failed(errors.SocketInitError, c)
  31. //}
  32. ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  33. if err != nil {
  34. panic(err)
  35. return
  36. }
  37. fmt.Println("开始SSE")
  38. err = client.SubscribeRaw(func(msg *sse.Event) {
  39. var data gateway.DeviceScannedFromGateway
  40. err := json.Unmarshal(msg.Data, &data)
  41. if err != nil {
  42. fmt.Printf("SSE数据转换异常:%s", err.Error())
  43. return
  44. }
  45. deviceScanned := gateway.DeviceScanned{}
  46. deviceScanned.MAC = data.Bdaddrs[0].Bdaddr
  47. //库中存在的设备
  48. if filterType == "1" {
  49. deviceService := GetDeviceService()
  50. _, i := deviceService.FindDeviceByMac(deviceScanned.MAC)
  51. if i > 0 {
  52. deviceScanned.Name = data.Name
  53. deviceScanned.Rssi = data.Rssi
  54. deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType
  55. deviceScanned.Chip = data.ChipId
  56. deviceMap[deviceScanned.MAC] = deviceScanned
  57. fmt.Println(deviceScanned)
  58. }
  59. } else {
  60. deviceScanned.Name = data.Name
  61. deviceScanned.Rssi = data.Rssi
  62. deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType
  63. deviceScanned.Chip = data.ChipId
  64. deviceMap[deviceScanned.MAC] = deviceScanned
  65. }
  66. if len(deviceMap) > 0 {
  67. if err != nil {
  68. fmt.Println("Websocket client init error" + err.Error())
  69. return
  70. }
  71. messageMap := make(map[string]string)
  72. messageMap["msgType"] = constant.MessageTypeDeviceScanned
  73. marshal, err := json.Marshal(deviceMap)
  74. if err != nil {
  75. fmt.Println("data parse Error :" + err.Error())
  76. return
  77. }
  78. messageMap["content"] = string(marshal)
  79. messageMap["Sender"] = "server"
  80. messageMap["Recipient"] = "client"
  81. bytes, err := json.Marshal(messageMap)
  82. if err != nil {
  83. fmt.Printf("json异常:%s", err.Error())
  84. return
  85. }
  86. err = ws.WriteMessage(websocket.TextMessage, bytes)
  87. if err != nil {
  88. fmt.Println("消息发送异常:" + err.Error())
  89. return
  90. }
  91. } else {
  92. err := ws.WriteMessage(websocket.TextMessage, []byte("{}"))
  93. if err != nil {
  94. fmt.Println("消息发送异常:" + err.Error())
  95. return
  96. }
  97. }
  98. time.Sleep(200 * time.Millisecond)
  99. })
  100. }
  101. // SseOpenNotify 开启通知
  102. func SseOpenNotify() {
  103. var notifyUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.NotifyUrl
  104. ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  105. if err != nil {
  106. fmt.Println("Websocket client init error" + err.Error())
  107. return
  108. }
  109. //client := global.SseClientData(notifyUrl)
  110. client := global.SseClientData
  111. client.URL = notifyUrl
  112. err = client.SubscribeRaw(func(msg *sse.Event) {
  113. bytes := msg.Data
  114. s := string(bytes)
  115. //fmt.Println("notify receive data :" + s)
  116. flag := strings.HasPrefix(s, "E840") || strings.HasPrefix(s, "E841") || strings.HasPrefix(s, "E823")
  117. //非心电数据
  118. if !flag {
  119. //websocket 通知数据
  120. //msgMap := make(map[string]string)
  121. //msgMap[]
  122. var receiveData gateway.DeviceDataReceived
  123. errJson := json.Unmarshal(msg.Data, &receiveData)
  124. if errJson != nil {
  125. fmt.Println("receive data parse error:" + errJson.Error())
  126. //panic(err)
  127. return
  128. }
  129. messageMap := make(map[string]string)
  130. messageMap["msgType"] = constant.MessageTypeData
  131. messageMap["content"] = string(msg.Data)
  132. messageMap["Sender"] = "server"
  133. messageMap["Recipient"] = "client"
  134. bytes, err := json.Marshal(messageMap)
  135. err = ws.WriteMessage(websocket.TextMessage, bytes)
  136. if err != nil {
  137. fmt.Println(errors.SendMessageError + err.Error())
  138. return
  139. }
  140. //心电数据
  141. } else {
  142. if len(bytes) == 18 {
  143. }
  144. }
  145. //time.Sleep(100)
  146. //time.Sleep(100 * time.Millisecond)
  147. //time.Sleep(10 * time.Millisecond)
  148. })
  149. if err != nil {
  150. return
  151. }
  152. }