package gateway import ( "confrontation-training/constant" errors "confrontation-training/err" "confrontation-training/global" "confrontation-training/models/gateway" "confrontation-training/response" "encoding/json" "fmt" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "github.com/r3labs/sse/v2" "net/http" "strings" "time" ) // SseScanDevice 扫描设备 func SseScanDevice(paramMap map[string]string, filterType string, r *http.Request) { var scanUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.ScanUrl deviceMap := make(map[string]gateway.DeviceScanned) if len(paramMap) > 0 { for key, value := range paramMap { scanUrl += "&" + key + "=" + value } } //events := make(chan *sse.Event) client := sse.NewClient(scanUrl) go func() { fmt.Println("开始SSE") //err := client.SubscribeChan("messages", events) //if err != nil { // return //} err := client.SubscribeRaw(func(msg *sse.Event) { //fmt.Println(string(msg.Data)) var data gateway.DeviceScannedFromGateway err := json.Unmarshal(msg.Data, &data) if err != nil { fmt.Printf("SSE数据转换异常:%s", err.Error()) return } deviceScanned := gateway.DeviceScanned{} deviceScanned.MAC = data.Bdaddrs[0].Bdaddr if filterType == "1" { deviceService := GetDeviceService() _, i := deviceService.FindDeviceByMac(deviceScanned.MAC) if i > 0 { deviceScanned.Name = data.Name deviceScanned.Rssi = data.Rssi deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType deviceScanned.Chip = data.ChipId deviceMap[deviceScanned.MAC] = deviceScanned fmt.Println(deviceScanned) } } }) if err != nil { fmt.Println(err.Error()) return } <-r.Context().Done() return }() ticket := time.NewTicker(time.Duration(global.Config.Gateway.ScanSecond) * time.Millisecond) for range ticket.C { fmt.Println("当前时间:" + time.Now().Format("2006-01-02 15:04:05")) ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil) if len(deviceMap) > 0 { if err != nil { fmt.Println("Websocket client init error" + err.Error()) return } messageMap := make(map[string]string) messageMap["msgType"] = constant.MessageTypeDeviceScanned marshal, err := json.Marshal(deviceMap) if err != nil { fmt.Println("data parse Error :" + err.Error()) return } messageMap["data"] = string(marshal) bytes, err := json.Marshal(messageMap) if err != nil { fmt.Printf("json异常:%s", err.Error()) return } err = ws.WriteMessage(websocket.TextMessage, bytes) if err != nil { fmt.Println("消息发送异常:" + err.Error()) return } } else { err := ws.WriteMessage(websocket.TextMessage, []byte("{}")) if err != nil { fmt.Println("消息发送异常:" + err.Error()) return } } client = nil ws = nil } } func SseScanDevice2(c *gin.Context) { var scanUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.ScanUrl var param gateway.DeviceScanParam err := c.ShouldBindJSON(¶m) if err != nil { fmt.Printf("参数格式化异常:%s", err.Error()) response.Failed(errors.ParamInvalid, c) return } paramMap := make(map[string]string) if param.Chip != "" { paramMap["chip"] = param.Chip } if param.FilterName != "" { //查询设备mac过滤信息 filterMac := "" deviceInfos, count := GetDeviceService().DeviceService.FindDeviceByType(param.FilterName) if count > 0 { for i := range deviceInfos { filterMac += deviceInfos[i].Mac + "," } } if len(filterMac) > 0 { if strings.HasSuffix(filterMac, ",") { filterMac = filterMac[0 : len(filterMac)-1] } paramMap["filter_mac"] = filterMac } if param.FilterName == "0" { paramMap["filter_name"] = constant.FilterNameEEG } else if param.FilterName == "1" { paramMap["filter_name"] = constant.FilterNameECG } else { response.Failed(errors.ParamInvalid+":过滤类型-"+param.FilterName+"无效", c) return } } else { paramMap["filter_name"] = constant.FilterNameALL } if param.FilterRssi != "" { paramMap["filter_rssi"] = param.FilterRssi } if param.FilterMac != "" { paramMap["filter_mac"] = param.FilterMac } paramMap["active"] = "1" paramMap["event"] = "1" deviceMap := make(map[string]gateway.DeviceScanned) if len(paramMap) > 0 { for key, value := range paramMap { scanUrl += "&" + key + "=" + value } } //events := make(chan *sse.Event) client := sse.NewClient(scanUrl) fmt.Println("开始SSE") flusher := c.Writer.(http.Flusher) err = client.SubscribeRaw(func(msg *sse.Event) { //fmt.Println(string(msg.Data)) var data gateway.DeviceScannedFromGateway err := json.Unmarshal(msg.Data, &data) if err != nil { fmt.Printf("SSE数据转换异常:%s", err.Error()) return } deviceScanned := gateway.DeviceScanned{} deviceScanned.MAC = data.Bdaddrs[0].Bdaddr //if param.FilterType == "1" { // deviceService := GetDeviceService() // _, i := deviceService.FindDeviceByMac(deviceScanned.MAC) // if i > 0 { // deviceScanned.Name = data.Name // deviceScanned.Rssi = data.Rssi // deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType // deviceScanned.Chip = data.ChipId // deviceMap[deviceScanned.MAC] = deviceScanned // fmt.Println(deviceScanned) // } //} deviceScanned.Name = data.Name deviceScanned.Rssi = data.Rssi deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType deviceScanned.Chip = data.ChipId deviceMap[deviceScanned.MAC] = deviceScanned fmt.Println(deviceScanned) if len(deviceMap) > 0 { marshal, _ := json.Marshal(deviceMap) c.Writer.Write(marshal) //fmt.Fprintf(c.Writer, string(marshal)) flusher.Flush() } go func() { <-c.Request.Context().Done() client = nil return }() }) if err != nil { fmt.Println(err.Error()) return } return } // SseOpenNotify 开启通知 func SseOpenNotify() { var notifyUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.NotifyUrl ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil) if err != nil { fmt.Println("Websocket client init error" + err.Error()) return } client := sse.NewClient(notifyUrl) err = client.SubscribeRaw(func(msg *sse.Event) { s := string(msg.Data) //fmt.Println("notify receive data :" + s) flag := strings.HasPrefix(s, "E840") || strings.HasPrefix(s, "E841") || strings.HasPrefix(s, "E823") if !flag { //websocket 通知数据 //msgMap := make(map[string]string) //msgMap[] var receiveData gateway.DeviceDataReceived errJson := json.Unmarshal(msg.Data, &receiveData) if errJson != nil { fmt.Println("receive data parse error:" + errJson.Error()) //panic(err) return } messageMap := make(map[string]string) messageMap["msgType"] = constant.MessageTypeData messageMap["data"] = string(msg.Data) err := ws.WriteMessage(websocket.TextMessage, msg.Data) if err != nil { fmt.Println(errors.SendMessageError + err.Error()) return } } }) if err != nil { return } }