123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- package gateway
- import (
- "confrontation-training/constant"
- errors "confrontation-training/err"
- "confrontation-training/global"
- "confrontation-training/models/gateway"
- "confrontation-training/response"
- "encoding/json"
- "fmt"
- "github.com/gin-gonic/gin"
- "github.com/gorilla/websocket"
- "github.com/r3labs/sse/v2"
- "net/http"
- "strings"
- "time"
- )
- // SseScanDevice 扫描设备
- func SseScanDevice(paramMap map[string]string, filterType string, r *http.Request) {
- var scanUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.ScanUrl
- deviceMap := make(map[string]gateway.DeviceScanned)
- if len(paramMap) > 0 {
- for key, value := range paramMap {
- scanUrl += "&" + key + "=" + value
- }
- }
- //events := make(chan *sse.Event)
- client := sse.NewClient(scanUrl)
- go func() {
- fmt.Println("开始SSE")
- //err := client.SubscribeChan("messages", events)
- //if err != nil {
- // return
- //}
- err := client.SubscribeRaw(func(msg *sse.Event) {
- //fmt.Println(string(msg.Data))
- var data gateway.DeviceScannedFromGateway
- err := json.Unmarshal(msg.Data, &data)
- if err != nil {
- fmt.Printf("SSE数据转换异常:%s", err.Error())
- return
- }
- deviceScanned := gateway.DeviceScanned{}
- deviceScanned.MAC = data.Bdaddrs[0].Bdaddr
- if filterType == "1" {
- deviceService := GetDeviceService()
- _, i := deviceService.FindDeviceByMac(deviceScanned.MAC)
- if i > 0 {
- deviceScanned.Name = data.Name
- deviceScanned.Rssi = data.Rssi
- deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType
- deviceScanned.Chip = data.ChipId
- deviceMap[deviceScanned.MAC] = deviceScanned
- fmt.Println(deviceScanned)
- }
- }
- })
- if err != nil {
- fmt.Println(err.Error())
- return
- }
- <-r.Context().Done()
- return
- }()
- ticket := time.NewTicker(time.Duration(global.Config.Gateway.ScanSecond) * time.Millisecond)
- for range ticket.C {
- fmt.Println("当前时间:" + time.Now().Format("2006-01-02 15:04:05"))
- ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
- if len(deviceMap) > 0 {
- if err != nil {
- fmt.Println("Websocket client init error" + err.Error())
- return
- }
- messageMap := make(map[string]string)
- messageMap["msgType"] = constant.MessageTypeDeviceScanned
- marshal, err := json.Marshal(deviceMap)
- if err != nil {
- fmt.Println("data parse Error :" + err.Error())
- return
- }
- messageMap["data"] = string(marshal)
- bytes, err := json.Marshal(messageMap)
- if err != nil {
- fmt.Printf("json异常:%s", err.Error())
- return
- }
- err = ws.WriteMessage(websocket.TextMessage, bytes)
- if err != nil {
- fmt.Println("消息发送异常:" + err.Error())
- return
- }
- } else {
- err := ws.WriteMessage(websocket.TextMessage, []byte("{}"))
- if err != nil {
- fmt.Println("消息发送异常:" + err.Error())
- return
- }
- }
- client = nil
- ws = nil
- }
- }
- func SseScanDevice2(c *gin.Context) {
- var scanUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.ScanUrl
- var param gateway.DeviceScanParam
- err := c.ShouldBindJSON(¶m)
- if err != nil {
- fmt.Printf("参数格式化异常:%s", err.Error())
- response.Failed(errors.ParamInvalid, c)
- return
- }
- paramMap := make(map[string]string)
- if param.Chip != "" {
- paramMap["chip"] = param.Chip
- }
- if param.FilterName != "" {
- //查询设备mac过滤信息
- filterMac := ""
- deviceInfos, count := GetDeviceService().DeviceService.FindDeviceByType(param.FilterName)
- if count > 0 {
- for i := range deviceInfos {
- filterMac += deviceInfos[i].Mac + ","
- }
- }
- if len(filterMac) > 0 {
- if strings.HasSuffix(filterMac, ",") {
- filterMac = filterMac[0 : len(filterMac)-1]
- }
- paramMap["filter_mac"] = filterMac
- }
- if param.FilterName == "0" {
- paramMap["filter_name"] = constant.FilterNameEEG
- } else if param.FilterName == "1" {
- paramMap["filter_name"] = constant.FilterNameECG
- } else {
- response.Failed(errors.ParamInvalid+":过滤类型-"+param.FilterName+"无效", c)
- return
- }
- } else {
- paramMap["filter_name"] = constant.FilterNameALL
- }
- if param.FilterRssi != "" {
- paramMap["filter_rssi"] = param.FilterRssi
- }
- if param.FilterMac != "" {
- paramMap["filter_mac"] = param.FilterMac
- }
- paramMap["active"] = "1"
- paramMap["event"] = "1"
- deviceMap := make(map[string]gateway.DeviceScanned)
- if len(paramMap) > 0 {
- for key, value := range paramMap {
- scanUrl += "&" + key + "=" + value
- }
- }
- //events := make(chan *sse.Event)
- client := sse.NewClient(scanUrl)
- fmt.Println("开始SSE")
- flusher := c.Writer.(http.Flusher)
- err = client.SubscribeRaw(func(msg *sse.Event) {
- //fmt.Println(string(msg.Data))
- var data gateway.DeviceScannedFromGateway
- err := json.Unmarshal(msg.Data, &data)
- if err != nil {
- fmt.Printf("SSE数据转换异常:%s", err.Error())
- return
- }
- deviceScanned := gateway.DeviceScanned{}
- deviceScanned.MAC = data.Bdaddrs[0].Bdaddr
- //if param.FilterType == "1" {
- // deviceService := GetDeviceService()
- // _, i := deviceService.FindDeviceByMac(deviceScanned.MAC)
- // if i > 0 {
- // deviceScanned.Name = data.Name
- // deviceScanned.Rssi = data.Rssi
- // deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType
- // deviceScanned.Chip = data.ChipId
- // deviceMap[deviceScanned.MAC] = deviceScanned
- // fmt.Println(deviceScanned)
- // }
- //}
- deviceScanned.Name = data.Name
- deviceScanned.Rssi = data.Rssi
- deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType
- deviceScanned.Chip = data.ChipId
- deviceMap[deviceScanned.MAC] = deviceScanned
- fmt.Println(deviceScanned)
- if len(deviceMap) > 0 {
- marshal, _ := json.Marshal(deviceMap)
- c.Writer.Write(marshal)
- //fmt.Fprintf(c.Writer, string(marshal))
- flusher.Flush()
- }
- go func() {
- <-c.Request.Context().Done()
- client = nil
- return
- }()
- })
- if err != nil {
- fmt.Println(err.Error())
- return
- }
- return
- }
- // SseOpenNotify 开启通知
- func SseOpenNotify() {
- var notifyUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.NotifyUrl
- ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
- if err != nil {
- fmt.Println("Websocket client init error" + err.Error())
- return
- }
- client := sse.NewClient(notifyUrl)
- err = client.SubscribeRaw(func(msg *sse.Event) {
- s := string(msg.Data)
- //fmt.Println("notify receive data :" + s)
- flag := strings.HasPrefix(s, "E840") || strings.HasPrefix(s, "E841") || strings.HasPrefix(s, "E823")
- if !flag {
- //websocket 通知数据
- //msgMap := make(map[string]string)
- //msgMap[]
- var receiveData gateway.DeviceDataReceived
- errJson := json.Unmarshal(msg.Data, &receiveData)
- if errJson != nil {
- fmt.Println("receive data parse error:" + errJson.Error())
- //panic(err)
- return
- }
- messageMap := make(map[string]string)
- messageMap["msgType"] = constant.MessageTypeData
- messageMap["data"] = string(msg.Data)
- err := ws.WriteMessage(websocket.TextMessage, msg.Data)
- if err != nil {
- fmt.Println(errors.SendMessageError + err.Error())
- return
- }
- }
- })
- if err != nil {
- return
- }
- }
|