package initialize import ( gateway2 "confrontation-training/api/gateway" "confrontation-training/common" "confrontation-training/constant" "confrontation-training/global" "confrontation-training/models/emq" "confrontation-training/models/gateway" "encoding/hex" "encoding/json" "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gorilla/websocket" log "github.com/sirupsen/logrus" "strconv" "strings" "time" ) func CreateEmqClient() { config := global.Config global.EmqConfig.ClientId = config.EmqConfig.ClientId global.EmqConfig.Qos = config.EmqConfig.Qos global.EmqConfig.Topic = config.EmqConfig.Topic global.EmqConfig.UserName = config.EmqConfig.UserName global.EmqConfig.Password = config.EmqConfig.Password global.EmqConfig.Protocol = config.EmqConfig.Protocol global.EmqConfig.Port = config.EmqConfig.Port global.EmqConfig.Broker = config.EmqConfig.Broker global.EmqConfig.Filter = config.EmqConfig.Filter global.EmqConfig.GatewayMac = config.EmqConfig.GatewayMac global.EmqConfig.FirstOpen = "0" emqConfig := global.EmqConfig connectAddress := fmt.Sprintf("%s://%s:%d", emqConfig.Protocol, emqConfig.Broker, emqConfig.Port) fmt.Println("connect address:", connectAddress) _ = global.Log4J.Info("connect address:", connectAddress) opts := mqtt.NewClientOptions() opts.AddBroker(connectAddress) opts.SetUsername(emqConfig.UserName) opts.SetPassword(emqConfig.Password) opts.SetClientID(emqConfig.ClientId) opts.SetKeepAlive(time.Second * 60) global.EmqClient = mqtt.NewClient(opts) client := global.EmqClient token := client.Connect() if token.WaitTimeout(time.Second*3) && token.Error() != nil { _ = global.Log4J.Error(token.Error()) } SubScribe(client) topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub SendUUID(global.EmqClient, topic) //添加过滤 fmt.Println("添加设备过滤:") _ = global.Log4J.Info("添加设备过滤:") filterCmd := "[{\"cmd\":\"AT+FLT=\",\"total\":\"2\"}" for _, filter := range global.EmqConfig.Filter { filterCmd = filterCmd + ",{\"t\":\"0\",\"d\":\"" + filter + "\"}" } filterCmd = filterCmd + "]" fmt.Println(filterCmd) _ = global.Log4J.Info(filterCmd) //filterCmd = "[{\"cmd\":\"AT+FLT=\",\"total\":\"2\"},{\"t\":\"0\",\"d\":\"MIND\"},{\"t\":\"0\",\"d\":\"BW-ECG\"}]" Publish(client, topic, filterCmd) //SendUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe") //Publish(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", constant.CmdStartScan) //SendUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe") //读取UUID //ReadUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe") //连接设备 //ConnectDevice(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", "DC0D30033AF0", "0", "0", "0") //fmt.Println("link device DC0D30033AF0 over") //ConnectDevice(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", "DC0D300335FC", "0", "0", "0") } func Publish(client mqtt.Client, topic string, message string) { if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, message); token.Wait() && token.Error() != nil { fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, message) _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", message) } else { fmt.Printf("publish success, topic: %s, payload: %s\n", topic, message) _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", message) } //time.Sleep(time.Second * 1) time.Sleep(time.Millisecond * 100) } func SendUUID(client mqtt.Client, topic string) { if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, constant.CmdSetUUID); token.Wait() && token.Error() != nil { fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, constant.CmdSetUUID) _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", constant.CmdSetUUID) } else { fmt.Printf("publish success, topic: %s, payload: %s\n", topic, constant.CmdSetUUID) _ = global.Log4J.Info("publish success, topic: ", topic, "payload: ", constant.CmdSetUUID) } //time.Sleep(time.Second * 1) time.Sleep(time.Millisecond * 100) } func ConnectDevice(client mqtt.Client, topic string, mac string, index string, ai string, at string) { message := "[{\"cmd\":\"AT+CNN=\",\"m\":\"" + mac + "\",\"i\":\"" + index + "\",\"ai\":\"" + ai + "\",\"at\":\"" + at + "\",\"l\":\"1\",\"x\":\"251\",\"relink\":\"0\",\"timeout\":\"12000\"}]" if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, message); token.Wait() && token.Error() != nil { fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, message) _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", message) } else { fmt.Printf("publish success, topic: %s, payload: %s\n", topic, message) _ = global.Log4J.Info("publish success, topic: ", topic, "payload: ", message) } //time.Sleep(time.Second * 1) time.Sleep(time.Millisecond * 100) } func SubScribe(client mqtt.Client) { emqConfig := global.EmqConfig ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil) if err != nil { fmt.Println("socket通道初始化失败") _ = global.Log4J.Error("socket通道初始化失败") panic(err) return } deviceService := GetDeviceService() d, i := deviceService.FindDeviceByType("") global.DeviceMap = make(map[string]gateway.DeviceInfo) if i > 0 { for j := 0; j < len(d); j++ { global.DeviceMap[d[j].Mac] = d[j] } } for _, topic := range emqConfig.Topic { topic = "/" + global.EmqConfig.GatewayMac + topic client.Subscribe(topic, byte(emqConfig.Qos), func(client mqtt.Client, message mqtt.Message) { _ = global.Log4J.Info("`%s` Received `%s` from `%s` topic\n", common.NowTime("2006-01-02 15:04:05"), message.Payload(), message.Topic()) fmt.Printf("`%s` Received `%s` from `%s` topic\n", common.NowTime("2006-01-02 15:04:05"), message.Payload(), message.Topic()) messageMap := make(map[string]string) var payloads []emq.Payload err := json.Unmarshal(message.Payload(), &payloads) if err != nil { _ = fmt.Errorf("%s", err.Error()) return } for _, payload := range payloads { messageMap["state"] = payload.State content := "" sendFlag := true switch payload.Cmd { case constant.PayloadScan: { //关闭扫描 if payload.S == "0" { messageMap["msgType"] = constant.MessageTypeCloseScan content = "关闭扫描" //开启扫描 } else if payload.S == "1" { messageMap["msgType"] = constant.MessageTypeStartScan content = "开启扫描" } else if payload.S == "" { //接收到蓝牙扫描数据 if strings.HasPrefix(payload.N, "MIND") { payload.T = "0" } else if strings.HasPrefix(payload.N, "BW-ECG") { payload.T = "1" } payload.M = transMac(payload.M) info := global.DeviceMap[payload.M] if info.ID != 0 { payload.AliasName = info.AliasName } messageMap["msgType"] = constant.MessageTypeDeviceScanned marshal, _ := json.Marshal(payload) content = string(marshal) } } case constant.PayloadFLT: messageMap["msgType"] = constant.MessageTypeFilter content = "设置过滤" case constant.PayloadSUUID: messageMap["msgType"] = constant.MessageTypeSUUID content = "设置UUID" if payload.State == "SUCCESS" { global.EmqConfig.FirstOpen = "1" } else { global.EmqConfig.FirstOpen = "0" } case constant.PayloadConnect: messageMap["msgType"] = constant.MessageTypeConnect //content = "设备连接" if payload.State == "SUCCESS" { realMac := transMac(payload.M) deviceInfo := global.DeviceMap[realMac] payload.T = deviceInfo.Type payload.M = realMac info := global.DeviceMap[payload.M] if info.ID != 0 { payload.AliasName = info.AliasName } marshal, _ := json.Marshal(payload) content = string(marshal) } for i := 0; i < 6; i++ { topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i)) } case constant.PayloadDisConnect: messageMap["msgType"] = constant.MessageTypeDisConnect //content = "断开设备连接" realMac := transMac(payload.M) deviceInfo := global.DeviceMap[realMac] payload.T = deviceInfo.Type payload.M = realMac info := global.DeviceMap[payload.M] if info.ID != 0 { payload.AliasName = info.AliasName } marshal, _ := json.Marshal(payload) content = string(marshal) if payload.State == "SUCCESS" && payload.ReasonCode == "62" { messageMap["msgType"] = constant.MessageTypeConnect } //断开连接成功或连接失败,发送已连接设备列表 for i := 0; i < 6; i++ { topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i)) } case constant.PayloadQuenue: messageMap["msgType"] = constant.MessageTypeConnectList if payload.M == "" { sendFlag = false } else { realMac := transMac(payload.M) deviceInfo := global.DeviceMap[realMac] if deviceInfo.ID == 0 { continue } else { payload.T = deviceInfo.Type payload.N = deviceInfo.Name payload.M = transMac(payload.M) payload.AliasName = deviceInfo.AliasName marshal, _ := json.Marshal(payload) content = string(marshal) } } case constant.PayloadCNB: connectedNum, _ := strconv.Atoi(payload.N) for i := 0; i < connectedNum; i++ { topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i)) } case constant.PayloadWNP: messageMap["msgType"] = constant.MessageTypeWNP realMac := transMac(payload.M) deviceInfo := global.DeviceMap[realMac] payload.T = deviceInfo.Type payload.N = deviceInfo.Name payload.M = transMac(payload.M) payload.AliasName = deviceInfo.AliasName marshal, _ := json.Marshal(payload) content = string(marshal) for i := 0; i < 6; i++ { topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i)) } case constant.PayloadNotify: //接收到蓝牙通知数据 //1.判断是脑电数据还是心电数据 //1.1脑电数据直接转发 //1.2心电数据做处理 realMac := transMac(payload.M) deviceInfo := global.DeviceMap[realMac] var receiveData gateway.DeviceDataReceived receiveData.Mac = realMac receiveData.Value = payload.D receiveData.AliasName = deviceInfo.AliasName if deviceInfo.Type == "0" { //脑电 messageMap["msgType"] = constant.MessageTypeEEGData marshal, _ := json.Marshal(receiveData) content = string(marshal) } else if deviceInfo.Type == "1" { //心电 messageMap["msgType"] = constant.MessageTypeECGData flag := strings.HasPrefix(receiveData.Value, "E840") || strings.HasPrefix(receiveData.Value, "E841") || strings.HasPrefix(receiveData.Value, "E823") || strings.HasPrefix(receiveData.Value, "E820") || strings.HasPrefix(receiveData.Value, "E81F") || strings.HasPrefix(receiveData.Value, "E813") || strings.HasPrefix(receiveData.Value, "E810") || strings.HasPrefix(receiveData.Value, "E822") || strings.HasPrefix(receiveData.Value, "E826") || strings.HasPrefix(receiveData.Value, "E8FF00000000") if !flag { //var ecgData []int ecgData := [6]int{} data := byteString2ByteArray(receiveData.Value[4:]) if len(data) == 18 { for i := 0; i < 18; i += 3 { ecgData[i/3] = data[i]<<16 | data[i+1]<<8 | data[i+2] } } else { for len(data) < 18 { for i := len(data); i < 18; i++ { data = append(data, data[i]) } } for i := 0; i < 18; i += 3 { ecgData[i/3] = int(16<