package initialize import ( "confrontation-training/common" "confrontation-training/constant" errors "confrontation-training/err" "confrontation-training/global" "confrontation-training/models/gateway" deviceService "confrontation-training/service/device" "encoding/hex" "encoding/json" "fmt" "github.com/gorilla/websocket" "github.com/r3labs/sse/v2" "github.com/rosshemsley/kalman" "github.com/rosshemsley/kalman/models" "strconv" "strings" "time" ) type DeviceService struct { deviceService.DeviceService } func GetDeviceService() *DeviceService { return &DeviceService{} } func StartDataSSeClient() { fmt.Println("start data sse") var notifyUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.NotifyUrl global.SseClientData = sse.NewClient(notifyUrl) ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil) if err != nil { fmt.Println("Websocket client init error" + err.Error()) return } //client := global.SseClientData(notifyUrl) client := global.SseClientData client.URL = notifyUrl //库中存在的设备 deviceService := GetDeviceService() d, i := deviceService.FindDeviceByType("") global.DeviceMap = make(map[string]gateway.DeviceInfo) if i > 0 { for j := 0; j < len(d); j++ { global.DeviceMap[d[j].Mac] = d[j] } } err = client.SubscribeRaw(func(msg *sse.Event) { bytes := msg.Data s := string(bytes) nowTime := common.NowTime("2006-01-02 15:04:05") fmt.Println(nowTime) fmt.Println("notify receive data :" + s) //脑电数据 var receiveData gateway.DeviceDataReceived errJson := json.Unmarshal(msg.Data, &receiveData) if errJson != nil { fmt.Println("receive data parse error:" + errJson.Error()) return } deviceInfo := global.DeviceMap[receiveData.Mac] messageMap := make(map[string]string) if deviceInfo.Type == "0" { messageMap["msgType"] = constant.MessageTypeEEGData marshal, _ := json.Marshal(receiveData) messageMap["content"] = string(marshal) } else if deviceInfo.Type == "1" { //心电数据 flag := strings.HasPrefix(receiveData.Value, "E840") || strings.HasPrefix(receiveData.Value, "E841") || strings.HasPrefix(receiveData.Value, "E823") if !flag { //var ecgData []int ecgData := [6]int{} fmt.Println("收到的心电数据:" + string(msg.Data)) fmt.Print("数据长度:") fmt.Println(len(msg.Data)) data, _ := hex.DecodeString(receiveData.Value[4:]) //data := []byte(receiveData.Value[4:]) fmt.Println(len(data)) if len(data) == 18 { for i := 0; i < 18; i += 3 { ecgData[i/3] = int(int32(data[i]&255)<<16 | int32(data[i+1]&255)<<8 | int32(data[i+2]&255)) fmt.Print("ecgData:") fmt.Println(ecgData[i/3]) } } else { for i := len(data); i < 18; i++ { data = append(data, data[i]) } for i := 0; i < 18; i += 3 { ecgData[i/3] = int(int32(data[i]&255)<<16 | int32(data[i+1]&255)<<8 | int32(data[i+2]&255)) fmt.Print("ecgData:") fmt.Println(ecgData[i/3]) } } messageMap["msgType"] = constant.MessageTypeECGData var dataStr []string var t time.Time model := models.NewSimpleModel(t, float64(ecgData[0]), models.SimpleModelConfig{ InitialVariance: 1.0, ProcessVariance: 1.0, ObservationVariance: 2.0, }) filter := kalman.NewKalmanFilter(model) for _, v := range ecgData { t = t.Add(time.Second) filter.Update(t, model.NewMeasurement(float64(v))) fmt.Printf("filtered value: %f\n", model.Value(filter.State())) dataStr = append(dataStr, strconv.FormatFloat(model.Value(filter.State()), 'f', 2, 64)) } //for _, v := range ecgData { // dataStr = append(dataStr, strconv.Itoa(v)) //} receiveData.Value = strings.Join(dataStr, ",") marshal, _ := json.Marshal(receiveData) messageMap["content"] = string(marshal) } } messageMap["Sender"] = "server" messageMap["Recipient"] = "client" bytes, err := json.Marshal(messageMap) //fmt.Println("sendData:" + common.NowTime("2006-01-02 15:04:05")) fmt.Println(messageMap) err = ws.WriteMessage(websocket.TextMessage, bytes) if err != nil { fmt.Println(errors.SendMessageError + err.Error()) return } }) } func StartScanSseClient() { fmt.Println("start scan sse") var baseUrl = global.Config.Gateway.BaseUrl var deviceUrl = global.Config.Gateway.ScanUrl global.SseClientDevice = sse.NewClient(baseUrl + deviceUrl) deviceMap := make(map[string]gateway.DeviceScanned) ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil) if err != nil { fmt.Println("websocket初始化异常:" + err.Error()) return } //库中存在的设备 deviceService := GetDeviceService() d, i := deviceService.FindDeviceByType("") temp := make(map[string]gateway.DeviceInfo) if i > 0 { for j := 0; j < len(d); j++ { temp[d[j].Mac] = d[j] } err = global.SseClientDevice.SubscribeRaw(func(msg *sse.Event) { fmt.Print("扫描数据:") fmt.Print(string(msg.Data)) var data gateway.DeviceScannedFromGateway err := json.Unmarshal(msg.Data, &data) if err != nil { fmt.Printf("SSE数据转换异常:%s", err.Error()) } else { deviceScanned := gateway.DeviceScanned{} deviceScanned.MAC = data.Bdaddrs[0].Bdaddr d := temp[deviceScanned.MAC] if d.ID > 0 { deviceScanned.Name = d.Name deviceScanned.Rssi = data.Rssi deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType deviceScanned.Chip = data.ChipId deviceScanned.DeviceType = d.Type deviceMap[deviceScanned.MAC] = deviceScanned } if deviceScanned.Name == "" { if d.Type == "0" { deviceScanned.Name = "MIND" + strings.ReplaceAll(deviceScanned.MAC[len(deviceScanned.MAC)-5:], ":", "") } else { deviceScanned.Name = "BW-ECG-01" } } if len(deviceMap) > 0 { //fmt.Print("deviceMap:") //fmt.Println(deviceMap) if err != nil { fmt.Println("Websocket client init error" + err.Error()) return } messageMap := make(map[string]string) messageMap["msgType"] = constant.MessageTypeDeviceScanned marshal, err := json.Marshal(deviceMap) if err != nil { fmt.Println("data parse Error :" + err.Error()) return } messageMap["content"] = string(marshal) messageMap["Sender"] = "server" messageMap["Recipient"] = "client" //fmt.Print("messageMap:") //fmt.Println(messageMap) bytes, err := json.Marshal(messageMap) if err != nil { fmt.Printf("json异常:%s", err.Error()) return } err = ws.WriteMessage(websocket.TextMessage, bytes) if err != nil { fmt.Println("消息发送异常:" + err.Error()) return } } else { err := ws.WriteMessage(websocket.TextMessage, []byte("{}")) if err != nil { fmt.Println("消息发送异常:" + err.Error()) return } } //time.Sleep(200 * time.Millisecond) } }) } }