sse.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package gateway
  2. import (
  3. "confrontation-training/constant"
  4. errors "confrontation-training/err"
  5. "confrontation-training/global"
  6. "confrontation-training/models/gateway"
  7. "encoding/json"
  8. "fmt"
  9. "github.com/gorilla/websocket"
  10. "github.com/r3labs/sse/v2"
  11. "strings"
  12. "time"
  13. )
  14. // SseScanDevice 扫描设备
  15. func SseScanDevice(paramMap map[string]string, filterType string) {
  16. var scanUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.ScanUrl
  17. deviceMap := make(map[string]gateway.DeviceScanned)
  18. if len(paramMap) > 0 {
  19. for key, value := range paramMap {
  20. scanUrl += "&" + key + "=" + value
  21. }
  22. }
  23. //events := make(chan *sse.Event)
  24. client := sse.NewClient(scanUrl)
  25. go func() {
  26. fmt.Println("开始SSE")
  27. //err := client.SubscribeChan("messages", events)
  28. //if err != nil {
  29. // return
  30. //}
  31. err := client.SubscribeRaw(func(msg *sse.Event) {
  32. //fmt.Println(string(msg.Data))
  33. var data gateway.DeviceScannedFromGateway
  34. err := json.Unmarshal(msg.Data, &data)
  35. if err != nil {
  36. fmt.Printf("SSE数据转换异常:%s", err.Error())
  37. return
  38. }
  39. deviceScanned := gateway.DeviceScanned{}
  40. deviceScanned.MAC = data.Bdaddrs[0].Bdaddr
  41. if filterType == "1" {
  42. deviceService := GetDeviceService()
  43. _, i := deviceService.FindDeviceByMac(deviceScanned.MAC)
  44. if i > 0 {
  45. deviceScanned.Name = data.Name
  46. deviceScanned.Rssi = data.Rssi
  47. deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType
  48. deviceScanned.Chip = data.ChipId
  49. deviceMap[deviceScanned.MAC] = deviceScanned
  50. fmt.Println(deviceScanned)
  51. }
  52. }
  53. })
  54. if err != nil {
  55. fmt.Println(err.Error())
  56. return
  57. }
  58. }()
  59. ticket := time.NewTicker(time.Duration(global.Config.Gateway.ScanSecond) * time.Millisecond)
  60. for range ticket.C {
  61. fmt.Println("当前时间:" + time.Now().Format("2006-01-02 15:04:05"))
  62. if len(deviceMap) > 0 {
  63. ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  64. if err != nil {
  65. fmt.Println("Websocket client init error" + err.Error())
  66. return
  67. }
  68. messageMap := make(map[string]string)
  69. messageMap["msgType"] = constant.MessageTypeDeviceScanned
  70. marshal, err := json.Marshal(deviceMap)
  71. if err != nil {
  72. fmt.Println("data parse Error :" + err.Error())
  73. return
  74. }
  75. messageMap["data"] = string(marshal)
  76. bytes, err := json.Marshal(messageMap)
  77. if err != nil {
  78. fmt.Printf("json异常:%s", err.Error())
  79. return
  80. }
  81. err = ws.WriteMessage(websocket.TextMessage, bytes)
  82. if err != nil {
  83. fmt.Println("消息发送异常:" + err.Error())
  84. return
  85. }
  86. }
  87. }
  88. }
  89. // SseOpenNotify 开启通知
  90. func SseOpenNotify() {
  91. var notifyUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.NotifyUrl
  92. ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  93. if err != nil {
  94. fmt.Println("Websocket client init error" + err.Error())
  95. return
  96. }
  97. client := sse.NewClient(notifyUrl)
  98. err = client.SubscribeRaw(func(msg *sse.Event) {
  99. s := string(msg.Data)
  100. //fmt.Println("notify receive data :" + s)
  101. flag := strings.HasPrefix(s, "E840") || strings.HasPrefix(s, "E841") || strings.HasPrefix(s, "E823")
  102. if !flag {
  103. //websocket 通知数据
  104. //msgMap := make(map[string]string)
  105. //msgMap[]
  106. var receiveData gateway.DeviceDataReceived
  107. errJson := json.Unmarshal(msg.Data, &receiveData)
  108. if errJson != nil {
  109. fmt.Println("receive data parse error:" + errJson.Error())
  110. //panic(err)
  111. return
  112. }
  113. messageMap := make(map[string]string)
  114. messageMap["msgType"] = constant.MessageTypeData
  115. messageMap["data"] = string(msg.Data)
  116. err := ws.WriteMessage(websocket.TextMessage, msg.Data)
  117. if err != nil {
  118. fmt.Println(errors.SendMessageError + err.Error())
  119. return
  120. }
  121. }
  122. })
  123. if err != nil {
  124. return
  125. }
  126. }