123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- package initialize
- import (
- "confrontation-training/common"
- "confrontation-training/constant"
- errors "confrontation-training/err"
- "confrontation-training/global"
- "confrontation-training/models/gateway"
- deviceService "confrontation-training/service/device"
- "encoding/hex"
- "encoding/json"
- "fmt"
- "github.com/gorilla/websocket"
- "github.com/r3labs/sse/v2"
- "github.com/rosshemsley/kalman"
- "github.com/rosshemsley/kalman/models"
- "strconv"
- "strings"
- "time"
- )
- type DeviceService struct {
- deviceService.DeviceService
- }
- func GetDeviceService() *DeviceService {
- return &DeviceService{}
- }
- func StartDataSSeClient() {
- fmt.Println("start data sse")
- var notifyUrl = global.Config.Gateway.BaseUrl + global.Config.Gateway.NotifyUrl
- global.SseClientData = sse.NewClient(notifyUrl)
- ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
- if err != nil {
- fmt.Println("Websocket client init error" + err.Error())
- return
- }
- //client := global.SseClientData(notifyUrl)
- client := global.SseClientData
- client.URL = notifyUrl
- //库中存在的设备
- deviceService := GetDeviceService()
- d, i := deviceService.FindDeviceByType("")
- global.DeviceMap = make(map[string]gateway.DeviceInfo)
- if i > 0 {
- for j := 0; j < len(d); j++ {
- global.DeviceMap[d[j].Mac] = d[j]
- }
- }
- err = client.SubscribeRaw(func(msg *sse.Event) {
- bytes := msg.Data
- s := string(bytes)
- nowTime := common.NowTime("2006-01-02 15:04:05")
- fmt.Println(nowTime)
- fmt.Println("notify receive data :" + s)
- //脑电数据
- var receiveData gateway.DeviceDataReceived
- errJson := json.Unmarshal(msg.Data, &receiveData)
- if errJson != nil {
- fmt.Println("receive data parse error:" + errJson.Error())
- return
- }
- deviceInfo := global.DeviceMap[receiveData.Mac]
- messageMap := make(map[string]string)
- if deviceInfo.Type == "0" {
- messageMap["msgType"] = constant.MessageTypeEEGData
- marshal, _ := json.Marshal(receiveData)
- messageMap["content"] = string(marshal)
- } else if deviceInfo.Type == "1" {
- //心电数据
- flag := strings.HasPrefix(receiveData.Value, "E840") || strings.HasPrefix(receiveData.Value, "E841") || strings.HasPrefix(receiveData.Value, "E823")
- if !flag {
- //var ecgData []int
- ecgData := [6]int{}
- fmt.Println("收到的心电数据:" + string(msg.Data))
- fmt.Print("数据长度:")
- fmt.Println(len(msg.Data))
- data, _ := hex.DecodeString(receiveData.Value[4:])
- //data := []byte(receiveData.Value[4:])
- fmt.Println(len(data))
- if len(data) == 18 {
- for i := 0; i < 18; i += 3 {
- ecgData[i/3] = int(int32(data[i]&255)<<16 | int32(data[i+1]&255)<<8 | int32(data[i+2]&255))
- fmt.Print("ecgData:")
- fmt.Println(ecgData[i/3])
- }
- } else {
- for i := len(data); i < 18; i++ {
- data = append(data, data[i])
- }
- for i := 0; i < 18; i += 3 {
- ecgData[i/3] = int(int32(data[i]&255)<<16 | int32(data[i+1]&255)<<8 | int32(data[i+2]&255))
- fmt.Print("ecgData:")
- fmt.Println(ecgData[i/3])
- }
- }
- messageMap["msgType"] = constant.MessageTypeECGData
- var dataStr []string
- var t time.Time
- model := models.NewSimpleModel(t, float64(ecgData[0]), models.SimpleModelConfig{
- InitialVariance: 1.0,
- ProcessVariance: 1.0,
- ObservationVariance: 2.0,
- })
- filter := kalman.NewKalmanFilter(model)
- for _, v := range ecgData {
- t = t.Add(time.Second)
- filter.Update(t, model.NewMeasurement(float64(v)))
- fmt.Printf("filtered value: %f\n", model.Value(filter.State()))
- dataStr = append(dataStr, strconv.FormatFloat(model.Value(filter.State()), 'f', 2, 64))
- }
- //for _, v := range ecgData {
- // dataStr = append(dataStr, strconv.Itoa(v))
- //}
- receiveData.Value = strings.Join(dataStr, ",")
- marshal, _ := json.Marshal(receiveData)
- messageMap["content"] = string(marshal)
- }
- }
- messageMap["Sender"] = "server"
- messageMap["Recipient"] = "client"
- bytes, err := json.Marshal(messageMap)
- //fmt.Println("sendData:" + common.NowTime("2006-01-02 15:04:05"))
- fmt.Println(messageMap)
- err = ws.WriteMessage(websocket.TextMessage, bytes)
- if err != nil {
- fmt.Println(errors.SendMessageError + err.Error())
- return
- }
- })
- }
- func StartScanSseClient() {
- fmt.Println("start scan sse")
- var baseUrl = global.Config.Gateway.BaseUrl
- var deviceUrl = global.Config.Gateway.ScanUrl
- global.SseClientDevice = sse.NewClient(baseUrl + deviceUrl)
- deviceMap := make(map[string]gateway.DeviceScanned)
- ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
- if err != nil {
- fmt.Println("websocket初始化异常:" + err.Error())
- return
- }
- //库中存在的设备
- deviceService := GetDeviceService()
- d, i := deviceService.FindDeviceByType("")
- temp := make(map[string]gateway.DeviceInfo)
- if i > 0 {
- for j := 0; j < len(d); j++ {
- temp[d[j].Mac] = d[j]
- }
- err = global.SseClientDevice.SubscribeRaw(func(msg *sse.Event) {
- fmt.Print("扫描数据:")
- fmt.Print(string(msg.Data))
- var data gateway.DeviceScannedFromGateway
- err := json.Unmarshal(msg.Data, &data)
- if err != nil {
- fmt.Printf("SSE数据转换异常:%s", err.Error())
- } else {
- deviceScanned := gateway.DeviceScanned{}
- deviceScanned.MAC = data.Bdaddrs[0].Bdaddr
- d := temp[deviceScanned.MAC]
- if d.ID > 0 {
- deviceScanned.Name = d.Name
- deviceScanned.Rssi = data.Rssi
- deviceScanned.BdadrType = data.Bdaddrs[0].BdaddrType
- deviceScanned.Chip = data.ChipId
- deviceScanned.DeviceType = d.Type
- deviceMap[deviceScanned.MAC] = deviceScanned
- }
- if deviceScanned.Name == "" {
- if d.Type == "0" {
- deviceScanned.Name = "MIND" + strings.ReplaceAll(deviceScanned.MAC[len(deviceScanned.MAC)-5:], ":", "")
- } else {
- deviceScanned.Name = "BW-ECG-01"
- }
- }
- if len(deviceMap) > 0 {
- //fmt.Print("deviceMap:")
- //fmt.Println(deviceMap)
- if err != nil {
- fmt.Println("Websocket client init error" + err.Error())
- return
- }
- messageMap := make(map[string]string)
- messageMap["msgType"] = constant.MessageTypeDeviceScanned
- marshal, err := json.Marshal(deviceMap)
- if err != nil {
- fmt.Println("data parse Error :" + err.Error())
- return
- }
- messageMap["content"] = string(marshal)
- messageMap["Sender"] = "server"
- messageMap["Recipient"] = "client"
- //fmt.Print("messageMap:")
- //fmt.Println(messageMap)
- bytes, err := json.Marshal(messageMap)
- if err != nil {
- fmt.Printf("json异常:%s", err.Error())
- return
- }
- err = ws.WriteMessage(websocket.TextMessage, bytes)
- if err != nil {
- fmt.Println("消息发送异常:" + err.Error())
- return
- }
- } else {
- err := ws.WriteMessage(websocket.TextMessage, []byte("{}"))
- if err != nil {
- fmt.Println("消息发送异常:" + err.Error())
- return
- }
- }
- //time.Sleep(200 * time.Millisecond)
- }
- })
- }
- }
|