sse.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package gateway
  2. import (
  3. "confrontation-training/constant"
  4. errors "confrontation-training/err"
  5. "confrontation-training/global"
  6. "confrontation-training/models/gateway"
  7. "encoding/json"
  8. "fmt"
  9. "github.com/gorilla/websocket"
  10. "github.com/r3labs/sse/v2"
  11. "net/http"
  12. "strconv"
  13. "strings"
  14. "time"
  15. )
  16. // SseScanDevice 扫描设备
  17. func SseScanDevice(paramMap map[string]string, filterType string) {
  18. var scanUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.ScanUrl
  19. deviceMap := make(map[string]gateway.DeviceScanned)
  20. if len(paramMap) > 0 {
  21. for key, value := range paramMap {
  22. scanUrl += "&" + key + "=" + value
  23. }
  24. }
  25. fmt.Println("创建新的sse")
  26. //events := make(chan *sse.Event)
  27. client := global.SseClientDevice
  28. client.URL = scanUrl
  29. //ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  30. //if err != nil {
  31. // response.Failed(errors.SocketInitError, c)
  32. //}
  33. ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  34. if err != nil {
  35. panic(err)
  36. return
  37. }
  38. fmt.Println("开始SSE")
  39. err = client.SubscribeRaw(func(msg *sse.Event) {
  40. var data gateway.DeviceScannedFromGateway
  41. err := json.Unmarshal(msg.Data, &data)
  42. if err != nil {
  43. fmt.Printf("SSE数据转换异常:%s", err.Error())
  44. return
  45. }
  46. deviceScanned := gateway.DeviceScanned{}
  47. deviceScanned.MAC = data.Bdaddrs[0].Bdaddr
  48. //库中存在的设备
  49. if filterType == "1" {
  50. deviceService := GetDeviceService()
  51. d, i := deviceService.FindDeviceByMac(deviceScanned.MAC)
  52. if i > 0 {
  53. deviceScanned.Name = data.Name
  54. deviceScanned.Rssi = data.Rssi
  55. deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType
  56. deviceScanned.Chip = data.ChipId
  57. deviceScanned.DeviceType = d.Type
  58. deviceMap[deviceScanned.MAC] = deviceScanned
  59. fmt.Println(deviceScanned)
  60. }
  61. } else {
  62. deviceScanned.Name = data.Name
  63. deviceScanned.Rssi = data.Rssi
  64. deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType
  65. deviceScanned.Chip = data.ChipId
  66. deviceMap[deviceScanned.MAC] = deviceScanned
  67. }
  68. if len(deviceMap) > 0 {
  69. if err != nil {
  70. fmt.Println("Websocket client init error" + err.Error())
  71. return
  72. }
  73. messageMap := make(map[string]string)
  74. messageMap["msgType"] = constant.MessageTypeDeviceScanned
  75. marshal, err := json.Marshal(deviceMap)
  76. if err != nil {
  77. fmt.Println("data parse Error :" + err.Error())
  78. return
  79. }
  80. messageMap["content"] = string(marshal)
  81. messageMap["Sender"] = "server"
  82. messageMap["Recipient"] = "client"
  83. bytes, err := json.Marshal(messageMap)
  84. if err != nil {
  85. fmt.Printf("json异常:%s", err.Error())
  86. return
  87. }
  88. err = ws.WriteMessage(websocket.TextMessage, bytes)
  89. if err != nil {
  90. fmt.Println("消息发送异常:" + err.Error())
  91. return
  92. }
  93. } else {
  94. err := ws.WriteMessage(websocket.TextMessage, []byte("{}"))
  95. if err != nil {
  96. fmt.Println("消息发送异常:" + err.Error())
  97. return
  98. }
  99. }
  100. time.Sleep(200 * time.Millisecond)
  101. })
  102. }
  // StopScan starts a goroutine that waits until the request's context is done
  // and then prints a notice that the client has disconnected. It returns
  // immediately and never writes to w.
  func StopScan(w http.ResponseWriter, r *http.Request) {
  	watchDisconnect := func() {
  		// Blocks until the request context is cancelled or otherwise done.
  		<-r.Context().Done()
  		println("The client is disconnected here")
  	}
  	go watchDisconnect()
  }
  110. // SseOpenNotify 开启通知
  111. func SseOpenNotify() {
  112. var notifyUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.NotifyUrl
  113. ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  114. if err != nil {
  115. fmt.Println("Websocket client init error" + err.Error())
  116. return
  117. }
  118. //client := global.SseClientData(notifyUrl)
  119. client := global.SseClientData
  120. client.URL = notifyUrl
  121. err = client.SubscribeRaw(func(msg *sse.Event) {
  122. bytes := msg.Data
  123. //s := string(bytes)
  124. //fmt.Println("notify receive data :" + s)
  125. //脑电数据
  126. var receiveData gateway.DeviceDataReceived
  127. errJson := json.Unmarshal(msg.Data, &receiveData)
  128. if errJson != nil {
  129. fmt.Println("receive data parse error:" + errJson.Error())
  130. return
  131. }
  132. messageMap := make(map[string]string)
  133. if strings.Contains(receiveData.Value, "AAAA") {
  134. messageMap["msgType"] = constant.MessageTypeEEGData
  135. marshal, _ := json.Marshal(receiveData)
  136. messageMap["content"] = string(marshal)
  137. } else { //心电数据
  138. flag := strings.HasPrefix(receiveData.Value, "E840") || strings.HasPrefix(receiveData.Value, "E841") || strings.HasPrefix(receiveData.Value, "E823")
  139. if !flag {
  140. //var ecgData []int
  141. ecgData := [12]int{}
  142. fmt.Println("收到的心电数据:" + string(msg.Data))
  143. data := []byte(receiveData.Value[4:])
  144. fmt.Println(len(data))
  145. if len(data) == 36 {
  146. for i := 0; i < 36; i += 3 {
  147. ecgData[i/3] = int(16<<data[i]&0xFF | 8<<data[i+1]&0xFF | data[i+2]&0xFF)
  148. }
  149. } else {
  150. for i := len(data); i < 36; i++ {
  151. data = append(data, data[i])
  152. }
  153. for i := 0; i < 36; i += 3 {
  154. ecgData[i/3] = int(16<<data[i]&0xFF | 8<<data[i+1]&0xFF | data[i+2]&0xFF)
  155. }
  156. }
  157. messageMap["msgType"] = constant.MessageTypeECGData
  158. var dataStr []string
  159. for _, i := range ecgData {
  160. dataStr = append(dataStr, strconv.Itoa(i))
  161. }
  162. receiveData.Value = strings.Join(dataStr, ",")
  163. marshal, _ := json.Marshal(receiveData)
  164. messageMap["content"] = string(marshal)
  165. }
  166. }
  167. messageMap["Sender"] = "server"
  168. messageMap["Recipient"] = "client"
  169. bytes, err := json.Marshal(messageMap)
  170. err = ws.WriteMessage(websocket.TextMessage, bytes)
  171. if err != nil {
  172. fmt.Println(errors.SendMessageError + err.Error())
  173. return
  174. }
  175. })
  176. if err != nil {
  177. return
  178. }
  179. }