123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385 |
- package initialize
- import (
- gateway2 "confrontation-training/api/gateway"
- "confrontation-training/common"
- "confrontation-training/constant"
- "confrontation-training/global"
- "confrontation-training/models/emq"
- "confrontation-training/models/gateway"
- "encoding/hex"
- "encoding/json"
- "fmt"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "github.com/gorilla/websocket"
- log "github.com/sirupsen/logrus"
- "strconv"
- "strings"
- "time"
- )
- func CreateEmqClient() {
- config := global.Config
- global.EmqConfig.ClientId = config.EmqConfig.ClientId
- global.EmqConfig.Qos = config.EmqConfig.Qos
- global.EmqConfig.Topic = config.EmqConfig.Topic
- global.EmqConfig.UserName = config.EmqConfig.UserName
- global.EmqConfig.Password = config.EmqConfig.Password
- global.EmqConfig.Protocol = config.EmqConfig.Protocol
- global.EmqConfig.Port = config.EmqConfig.Port
- global.EmqConfig.Broker = config.EmqConfig.Broker
- global.EmqConfig.Filter = config.EmqConfig.Filter
- global.EmqConfig.GatewayMac = config.EmqConfig.GatewayMac
- global.EmqConfig.FirstOpen = "0"
- emqConfig := global.EmqConfig
- connectAddress := fmt.Sprintf("%s://%s:%d", emqConfig.Protocol, emqConfig.Broker, emqConfig.Port)
- fmt.Println("connect address:", connectAddress)
- _ = global.Log4J.Info("connect address:", connectAddress)
- opts := mqtt.NewClientOptions()
- opts.AddBroker(connectAddress)
- opts.SetUsername(emqConfig.UserName)
- opts.SetPassword(emqConfig.Password)
- opts.SetClientID(emqConfig.ClientId)
- opts.SetKeepAlive(time.Second * 60)
- global.EmqClient = mqtt.NewClient(opts)
- client := global.EmqClient
- token := client.Connect()
- if token.WaitTimeout(time.Second*3) && token.Error() != nil {
- _ = global.Log4J.Error(token.Error())
- }
- SubScribe(client)
- topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
- SendUUID(global.EmqClient, topic)
- //添加过滤
- fmt.Println("添加设备过滤:")
- _ = global.Log4J.Info("添加设备过滤:")
- filterCmd := "[{\"cmd\":\"AT+FLT=\",\"total\":\"2\"}"
- for _, filter := range global.EmqConfig.Filter {
- filterCmd = filterCmd + ",{\"t\":\"0\",\"d\":\"" + filter + "\"}"
- }
- filterCmd = filterCmd + "]"
- fmt.Println(filterCmd)
- _ = global.Log4J.Info(filterCmd)
- //filterCmd = "[{\"cmd\":\"AT+FLT=\",\"total\":\"2\"},{\"t\":\"0\",\"d\":\"MIND\"},{\"t\":\"0\",\"d\":\"BW-ECG\"}]"
- Publish(client, topic, filterCmd)
- //SendUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
- //Publish(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", constant.CmdStartScan)
- //SendUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
- //读取UUID
- //ReadUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
- //连接设备
- //ConnectDevice(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", "DC0D30033AF0", "0", "0", "0")
- //fmt.Println("link device DC0D30033AF0 over")
- //ConnectDevice(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", "DC0D300335FC", "0", "0", "0")
- }
- func Publish(client mqtt.Client, topic string, message string) {
- if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, message); token.Wait() && token.Error() != nil {
- fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, message)
- _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", message)
- } else {
- fmt.Printf("publish success, topic: %s, payload: %s\n", topic, message)
- _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", message)
- }
- //time.Sleep(time.Second * 1)
- time.Sleep(time.Millisecond * 100)
- }
- func SendUUID(client mqtt.Client, topic string) {
- if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, constant.CmdSetUUID); token.Wait() && token.Error() != nil {
- fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, constant.CmdSetUUID)
- _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", constant.CmdSetUUID)
- } else {
- fmt.Printf("publish success, topic: %s, payload: %s\n", topic, constant.CmdSetUUID)
- _ = global.Log4J.Info("publish success, topic: ", topic, "payload: ", constant.CmdSetUUID)
- }
- //time.Sleep(time.Second * 1)
- time.Sleep(time.Millisecond * 100)
- }
- func ConnectDevice(client mqtt.Client, topic string, mac string, index string, ai string, at string) {
- message := "[{\"cmd\":\"AT+CNN=\",\"m\":\"" + mac + "\",\"i\":\"" + index + "\",\"ai\":\"" + ai + "\",\"at\":\"" + at + "\",\"l\":\"1\",\"x\":\"251\",\"relink\":\"0\",\"timeout\":\"12000\"}]"
- if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, message); token.Wait() && token.Error() != nil {
- fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, message)
- _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", message)
- } else {
- fmt.Printf("publish success, topic: %s, payload: %s\n", topic, message)
- _ = global.Log4J.Info("publish success, topic: ", topic, "payload: ", message)
- }
- //time.Sleep(time.Second * 1)
- time.Sleep(time.Millisecond * 100)
- }
- func SubScribe(client mqtt.Client) {
- emqConfig := global.EmqConfig
- ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
- if err != nil {
- fmt.Println("socket通道初始化失败")
- _ = global.Log4J.Error("socket通道初始化失败")
- panic(err)
- return
- }
- deviceService := GetDeviceService()
- d, i := deviceService.FindDeviceByType("")
- global.DeviceMap = make(map[string]gateway.DeviceInfo)
- if i > 0 {
- for j := 0; j < len(d); j++ {
- global.DeviceMap[d[j].Mac] = d[j]
- }
- }
- for _, topic := range emqConfig.Topic {
- topic = "/" + global.EmqConfig.GatewayMac + topic
- client.Subscribe(topic, byte(emqConfig.Qos), func(client mqtt.Client, message mqtt.Message) {
- _ = global.Log4J.Info("`%s` Received `%s` from `%s` topic\n", common.NowTime("2006-01-02 15:04:05"), message.Payload(), message.Topic())
- fmt.Printf("`%s` Received `%s` from `%s` topic\n", common.NowTime("2006-01-02 15:04:05"), message.Payload(), message.Topic())
- messageMap := make(map[string]string)
- var payloads []emq.Payload
- err := json.Unmarshal(message.Payload(), &payloads)
- if err != nil {
- _ = fmt.Errorf("%s", err.Error())
- return
- }
- for _, payload := range payloads {
- messageMap["state"] = payload.State
- content := ""
- sendFlag := true
- switch payload.Cmd {
- case constant.PayloadScan:
- {
- //关闭扫描
- if payload.S == "0" {
- messageMap["msgType"] = constant.MessageTypeCloseScan
- content = "关闭扫描"
- //开启扫描
- } else if payload.S == "1" {
- messageMap["msgType"] = constant.MessageTypeStartScan
- content = "开启扫描"
- } else if payload.S == "" { //接收到蓝牙扫描数据
- if strings.HasPrefix(payload.N, "MIND") {
- payload.T = "0"
- } else if strings.HasPrefix(payload.N, "BW-ECG") {
- payload.T = "1"
- }
- payload.M = transMac(payload.M)
- info := global.DeviceMap[payload.M]
- if info.ID != 0 {
- payload.AliasName = info.AliasName
- }
- messageMap["msgType"] = constant.MessageTypeDeviceScanned
- marshal, _ := json.Marshal(payload)
- content = string(marshal)
- }
- }
- case constant.PayloadFLT:
- messageMap["msgType"] = constant.MessageTypeFilter
- content = "设置过滤"
- case constant.PayloadSUUID:
- messageMap["msgType"] = constant.MessageTypeSUUID
- content = "设置UUID"
- if payload.State == "SUCCESS" {
- global.EmqConfig.FirstOpen = "1"
- } else {
- global.EmqConfig.FirstOpen = "0"
- }
- case constant.PayloadConnect:
- messageMap["msgType"] = constant.MessageTypeConnect
- //content = "设备连接"
- if payload.State == "SUCCESS" {
- realMac := transMac(payload.M)
- deviceInfo := global.DeviceMap[realMac]
- payload.T = deviceInfo.Type
- payload.M = realMac
- info := global.DeviceMap[payload.M]
- if info.ID != 0 {
- payload.AliasName = info.AliasName
- }
- marshal, _ := json.Marshal(payload)
- content = string(marshal)
- }
- for i := 0; i < 6; i++ {
- topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
- gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
- }
- case constant.PayloadDisConnect:
- messageMap["msgType"] = constant.MessageTypeDisConnect
- //content = "断开设备连接"
- realMac := transMac(payload.M)
- deviceInfo := global.DeviceMap[realMac]
- payload.T = deviceInfo.Type
- payload.M = realMac
- info := global.DeviceMap[payload.M]
- if info.ID != 0 {
- payload.AliasName = info.AliasName
- }
- marshal, _ := json.Marshal(payload)
- content = string(marshal)
- if payload.State == "SUCCESS" && payload.ReasonCode == "62" {
- messageMap["msgType"] = constant.MessageTypeConnect
- }
- //断开连接成功或连接失败,发送已连接设备列表
- for i := 0; i < 6; i++ {
- topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
- gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
- }
- case constant.PayloadQuenue:
- messageMap["msgType"] = constant.MessageTypeConnectList
- if payload.M == "" {
- sendFlag = false
- } else {
- realMac := transMac(payload.M)
- deviceInfo := global.DeviceMap[realMac]
- if deviceInfo.ID == 0 {
- continue
- } else {
- payload.T = deviceInfo.Type
- payload.N = deviceInfo.Name
- payload.M = transMac(payload.M)
- payload.AliasName = deviceInfo.AliasName
- marshal, _ := json.Marshal(payload)
- content = string(marshal)
- }
- }
- case constant.PayloadCNB:
- connectedNum, _ := strconv.Atoi(payload.N)
- for i := 0; i < connectedNum; i++ {
- topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
- gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
- }
- case constant.PayloadWNP:
- messageMap["msgType"] = constant.MessageTypeWNP
- realMac := transMac(payload.M)
- deviceInfo := global.DeviceMap[realMac]
- payload.T = deviceInfo.Type
- payload.N = deviceInfo.Name
- payload.M = transMac(payload.M)
- payload.AliasName = deviceInfo.AliasName
- marshal, _ := json.Marshal(payload)
- content = string(marshal)
- for i := 0; i < 6; i++ {
- topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
- gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
- }
- case constant.PayloadNotify: //接收到蓝牙通知数据
- //1.判断是脑电数据还是心电数据
- //1.1脑电数据直接转发
- //1.2心电数据做处理
- realMac := transMac(payload.M)
- deviceInfo := global.DeviceMap[realMac]
- var receiveData gateway.DeviceDataReceived
- receiveData.Mac = realMac
- receiveData.Value = payload.D
- receiveData.AliasName = deviceInfo.AliasName
- if deviceInfo.Type == "0" { //脑电
- messageMap["msgType"] = constant.MessageTypeEEGData
- marshal, _ := json.Marshal(receiveData)
- content = string(marshal)
- } else if deviceInfo.Type == "1" { //心电
- messageMap["msgType"] = constant.MessageTypeECGData
- flag := strings.HasPrefix(receiveData.Value, "E840") ||
- strings.HasPrefix(receiveData.Value, "E841") ||
- strings.HasPrefix(receiveData.Value, "E823") ||
- strings.HasPrefix(receiveData.Value, "E820") ||
- strings.HasPrefix(receiveData.Value, "E81F") ||
- strings.HasPrefix(receiveData.Value, "E813") ||
- strings.HasPrefix(receiveData.Value, "E810") ||
- strings.HasPrefix(receiveData.Value, "E822") ||
- strings.HasPrefix(receiveData.Value, "E826") ||
- strings.HasPrefix(receiveData.Value, "E8FF00000000")
- if !flag {
- //var ecgData []int
- ecgData := [6]int{}
- data := byteString2ByteArray(receiveData.Value[4:])
- if len(data) == 18 {
- for i := 0; i < 18; i += 3 {
- ecgData[i/3] = data[i]<<16 | data[i+1]<<8 | data[i+2]
- }
- } else {
- for len(data) < 18 {
- for i := len(data); i < 18; i++ {
- data = append(data, data[i])
- }
- }
- for i := 0; i < 18; i += 3 {
- ecgData[i/3] = int(16<<data[i]&0xFF | 8<<data[i+1]&0xFF | data[i+2]&0xFF)
- }
- }
- messageMap["msgType"] = constant.MessageTypeECGData
- receiveData.Value = "["
- for _, datum := range ecgData {
- receiveData.Value = receiveData.Value + strconv.Itoa(datum) + ","
- }
- receiveData.Value = strings.TrimRight(receiveData.Value, ",")
- receiveData.Value = receiveData.Value + "]"
- marshal, _ := json.Marshal(receiveData)
- content = string(marshal)
- }
- }
- }
- if messageMap["msgType"] != constant.MessageTypeECGData && messageMap["msgType"] != constant.MessageTypeEEGData && messageMap["msgType"] != constant.MessageTypeDeviceScanned && messageMap["msgType"] != constant.MessageTypeConnectList && messageMap["msgType"] != constant.MessageTypeDisConnect && messageMap["msgType"] != constant.MessageTypeConnect && messageMap["msgType"] != constant.MessageTypeWNP {
- if payload.State == constant.Success {
- content = content + "成功"
- } else {
- content = content + "失败"
- }
- }
- messageMap["content"] = content
- messageMap["Sender"] = "server"
- messageMap["Recipient"] = "client"
- bytes, err := json.Marshal(messageMap)
- if err != nil {
- return
- }
- if sendFlag && content != "成功" && content != "失败" {
- if messageMap["msgType"] == constant.MessageTypeConnectList {
- fmt.Printf("已连接列表===" + common.NowTime("2006-01-02 15:04:05") + "=====" + string(bytes))
- _ = global.Log4J.Info("已连接列表===" + common.NowTime("2006-01-02 15:04:05") + "=====" + string(bytes))
- }
- for k := range messageMap {
- delete(messageMap, k)
- }
- _ = global.Log4J.Info("发送消息===" + common.NowTime("2006-01-02 15:04:05") + "=====" + string(bytes))
- err = ws.WriteMessage(websocket.TextMessage, bytes)
- if err != nil {
- log.Infoln("消息发送异常:" + err.Error())
- _ = global.Log4J.Error("消息发送异常:" + err.Error())
- }
- }
- time.Sleep(time.Millisecond * 5)
- }
- })
- }
- }
- func byteString2ByteArray(byteString string) []int {
- //var byteArr [len(byteString)]byte
- byteArr := make([]int, len(byteString)/2)
- for i := 0; i < 18; i++ {
- subStr := byteString[i*2 : i*2+2]
- decodeString, err := hex.DecodeString(subStr)
- if err != nil {
- }
- byteArr[i] = int(decodeString[0])
- }
- return byteArr
- }
- func transMac(mac string) string {
- result := ""
- for i := 0; i < len(mac); i = i + 2 {
- result = result + mac[i:i+2] + ":"
- }
- return result[0 : len(result)-1]
- }
|