sseclient.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. package initialize
  2. import (
  3. "confrontation-training/common"
  4. "confrontation-training/constant"
  5. errors "confrontation-training/err"
  6. "confrontation-training/global"
  7. "confrontation-training/models/gateway"
  8. deviceService "confrontation-training/service/device"
  9. "encoding/hex"
  10. "encoding/json"
  11. "fmt"
  12. "github.com/gorilla/websocket"
  13. "github.com/r3labs/sse/v2"
  14. "github.com/rosshemsley/kalman"
  15. "github.com/rosshemsley/kalman/models"
  16. "strconv"
  17. "strings"
  18. "time"
  19. )
  20. type DeviceService struct {
  21. deviceService.DeviceService
  22. }
  23. func GetDeviceService() *DeviceService {
  24. return &DeviceService{}
  25. }
  26. func StartDataSSeClient() {
  27. fmt.Println("start data sse")
  28. var notifyUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.NotifyUrl
  29. global.SseClientData = sse.NewClient(notifyUrl)
  30. ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  31. if err != nil {
  32. fmt.Println("Websocket client init error" + err.Error())
  33. return
  34. }
  35. //client := global.SseClientData(notifyUrl)
  36. client := global.SseClientData
  37. client.URL = notifyUrl
  38. //库中存在的设备
  39. deviceService := GetDeviceService()
  40. d, i := deviceService.FindDeviceByType("")
  41. global.DeviceMap = make(map[string]gateway.DeviceInfo)
  42. if i > 0 {
  43. for j := 0; j < len(d); j++ {
  44. global.DeviceMap[d[j].Mac] = d[j]
  45. }
  46. }
  47. err = client.SubscribeRaw(func(msg *sse.Event) {
  48. bytes := msg.Data
  49. s := string(bytes)
  50. nowTime := common.NowTime("2006-01-02 15:04:05")
  51. fmt.Println(nowTime)
  52. fmt.Println("notify receive data :" + s)
  53. //脑电数据
  54. var receiveData gateway.DeviceDataReceived
  55. errJson := json.Unmarshal(msg.Data, &receiveData)
  56. if errJson != nil {
  57. fmt.Println("receive data parse error:" + errJson.Error())
  58. return
  59. }
  60. deviceInfo := global.DeviceMap[receiveData.Mac]
  61. messageMap := make(map[string]string)
  62. if deviceInfo.Type == "0" {
  63. messageMap["msgType"] = constant.MessageTypeEEGData
  64. marshal, _ := json.Marshal(receiveData)
  65. messageMap["content"] = string(marshal)
  66. } else if deviceInfo.Type == "1" {
  67. //心电数据
  68. flag := strings.HasPrefix(receiveData.Value, "E840") || strings.HasPrefix(receiveData.Value, "E841") || strings.HasPrefix(receiveData.Value, "E823")
  69. if !flag {
  70. //var ecgData []int
  71. ecgData := [6]int{}
  72. fmt.Println("收到的心电数据:" + string(msg.Data))
  73. fmt.Print("数据长度:")
  74. fmt.Println(len(msg.Data))
  75. data, _ := hex.DecodeString(receiveData.Value[4:])
  76. //data := []byte(receiveData.Value[4:])
  77. fmt.Println(len(data))
  78. if len(data) == 18 {
  79. for i := 0; i < 18; i += 3 {
  80. ecgData[i/3] = int(int32(data[i]&255)<<16 | int32(data[i+1]&255)<<8 | int32(data[i+2]&255))
  81. fmt.Print("ecgData:")
  82. fmt.Println(ecgData[i/3])
  83. }
  84. } else {
  85. for i := len(data); i < 18; i++ {
  86. data = append(data, data[i])
  87. }
  88. for i := 0; i < 18; i += 3 {
  89. ecgData[i/3] = int(int32(data[i]&255)<<16 | int32(data[i+1]&255)<<8 | int32(data[i+2]&255))
  90. fmt.Print("ecgData:")
  91. fmt.Println(ecgData[i/3])
  92. }
  93. }
  94. messageMap["msgType"] = constant.MessageTypeECGData
  95. var dataStr []string
  96. var t time.Time
  97. model := models.NewSimpleModel(t, float64(ecgData[0]), models.SimpleModelConfig{
  98. InitialVariance: 1.0,
  99. ProcessVariance: 1.0,
  100. ObservationVariance: 2.0,
  101. })
  102. filter := kalman.NewKalmanFilter(model)
  103. for _, v := range ecgData {
  104. t = t.Add(time.Second)
  105. filter.Update(t, model.NewMeasurement(float64(v)))
  106. fmt.Printf("filtered value: %f\n", model.Value(filter.State()))
  107. dataStr = append(dataStr, strconv.FormatFloat(model.Value(filter.State()), 'f', 2, 64))
  108. }
  109. //for _, v := range ecgData {
  110. // dataStr = append(dataStr, strconv.Itoa(v))
  111. //}
  112. receiveData.Value = strings.Join(dataStr, ",")
  113. marshal, _ := json.Marshal(receiveData)
  114. messageMap["content"] = string(marshal)
  115. }
  116. }
  117. messageMap["Sender"] = "server"
  118. messageMap["Recipient"] = "client"
  119. bytes, err := json.Marshal(messageMap)
  120. //fmt.Println("sendData:" + common.NowTime("2006-01-02 15:04:05"))
  121. fmt.Println(messageMap)
  122. err = ws.WriteMessage(websocket.TextMessage, bytes)
  123. if err != nil {
  124. fmt.Println(errors.SendMessageError + err.Error())
  125. return
  126. }
  127. })
  128. }
  129. func StartScanSseClient() {
  130. fmt.Println("start scan sse")
  131. var baseUrl = global.Config.Gateway.BaseUrl
  132. var deviceUrl = global.Config.Gateway.ScanUrl
  133. global.SseClientDevice = sse.NewClient(baseUrl + deviceUrl)
  134. deviceMap := make(map[string]gateway.DeviceScanned)
  135. ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  136. if err != nil {
  137. fmt.Println("websocket初始化异常:" + err.Error())
  138. return
  139. }
  140. //库中存在的设备
  141. deviceService := GetDeviceService()
  142. d, i := deviceService.FindDeviceByType("")
  143. temp := make(map[string]gateway.DeviceInfo)
  144. if i > 0 {
  145. for j := 0; j < len(d); j++ {
  146. temp[d[j].Mac] = d[j]
  147. }
  148. err = global.SseClientDevice.SubscribeRaw(func(msg *sse.Event) {
  149. fmt.Print("扫描数据:")
  150. fmt.Print(string(msg.Data))
  151. var data gateway.DeviceScannedFromGateway
  152. err := json.Unmarshal(msg.Data, &data)
  153. if err != nil {
  154. fmt.Printf("SSE数据转换异常:%s", err.Error())
  155. } else {
  156. deviceScanned := gateway.DeviceScanned{}
  157. deviceScanned.MAC = data.Bdaddrs[0].Bdaddr
  158. d := temp[deviceScanned.MAC]
  159. if d.ID > 0 {
  160. deviceScanned.Name = d.Name
  161. deviceScanned.Rssi = data.Rssi
  162. deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType
  163. deviceScanned.Chip = data.ChipId
  164. deviceScanned.DeviceType = d.Type
  165. deviceMap[deviceScanned.MAC] = deviceScanned
  166. }
  167. if deviceScanned.Name == "" {
  168. if d.Type == "0" {
  169. deviceScanned.Name = "MIND" + strings.ReplaceAll(deviceScanned.MAC[len(deviceScanned.MAC)-5:], ":", "")
  170. } else {
  171. deviceScanned.Name = "BW-ECG-01"
  172. }
  173. }
  174. if len(deviceMap) > 0 {
  175. //fmt.Print("deviceMap:")
  176. //fmt.Println(deviceMap)
  177. if err != nil {
  178. fmt.Println("Websocket client init error" + err.Error())
  179. return
  180. }
  181. messageMap := make(map[string]string)
  182. messageMap["msgType"] = constant.MessageTypeDeviceScanned
  183. marshal, err := json.Marshal(deviceMap)
  184. if err != nil {
  185. fmt.Println("data parse Error :" + err.Error())
  186. return
  187. }
  188. messageMap["content"] = string(marshal)
  189. messageMap["Sender"] = "server"
  190. messageMap["Recipient"] = "client"
  191. //fmt.Print("messageMap:")
  192. //fmt.Println(messageMap)
  193. bytes, err := json.Marshal(messageMap)
  194. if err != nil {
  195. fmt.Printf("json异常:%s", err.Error())
  196. return
  197. }
  198. err = ws.WriteMessage(websocket.TextMessage, bytes)
  199. if err != nil {
  200. fmt.Println("消息发送异常:" + err.Error())
  201. return
  202. }
  203. } else {
  204. err := ws.WriteMessage(websocket.TextMessage, []byte("{}"))
  205. if err != nil {
  206. fmt.Println("消息发送异常:" + err.Error())
  207. return
  208. }
  209. }
  210. //time.Sleep(200 * time.Millisecond)
  211. }
  212. })
  213. }
  214. }