emq.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. package initialize
  2. import (
  3. gateway2 "confrontation-training/api/gateway"
  4. "confrontation-training/constant"
  5. "confrontation-training/global"
  6. "confrontation-training/models/emq"
  7. "confrontation-training/models/gateway"
  8. "encoding/json"
  9. "fmt"
  10. mqtt "github.com/eclipse/paho.mqtt.golang"
  11. "github.com/gorilla/websocket"
  12. "log"
  13. "strconv"
  14. "strings"
  15. "time"
  16. )
  17. func CreateEmqClient() {
  18. config := global.Config
  19. global.EmqConfig.ClientId = config.EmqConfig.ClientId
  20. global.EmqConfig.Qos = config.EmqConfig.Qos
  21. global.EmqConfig.Topic = config.EmqConfig.Topic
  22. global.EmqConfig.UserName = config.EmqConfig.UserName
  23. global.EmqConfig.Password = config.EmqConfig.Password
  24. global.EmqConfig.Protocol = config.EmqConfig.Protocol
  25. global.EmqConfig.Port = config.EmqConfig.Port
  26. global.EmqConfig.Broker = config.EmqConfig.Broker
  27. global.EmqConfig.Filter = config.EmqConfig.Filter
  28. global.EmqConfig.GatewayMac = config.EmqConfig.GatewayMac
  29. emqConfig := global.EmqConfig
  30. connectAddress := fmt.Sprintf("%s://%s:%d", emqConfig.Protocol, emqConfig.Broker, emqConfig.Port)
  31. fmt.Println("connect address:", connectAddress)
  32. opts := mqtt.NewClientOptions()
  33. opts.AddBroker(connectAddress)
  34. opts.SetUsername(emqConfig.UserName)
  35. opts.SetPassword(emqConfig.Password)
  36. opts.SetClientID(emqConfig.ClientId)
  37. opts.SetKeepAlive(time.Second * 60)
  38. global.EmqClient = mqtt.NewClient(opts)
  39. client := global.EmqClient
  40. token := client.Connect()
  41. if token.WaitTimeout(time.Second*3) && token.Error() != nil {
  42. log.Fatal(token.Error())
  43. }
  44. SubScribe(client)
  45. //SendUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
  46. //Publish(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", constant.CmdStartScan)
  47. //SendUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
  48. //读取UUID
  49. //ReadUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
  50. //连接设备
  51. //ConnectDevice(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", "DC0D30033AF0", "0", "0", "0")
  52. //fmt.Println("link device DC0D30033AF0 over")
  53. //ConnectDevice(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", "DC0D300335FC", "0", "0", "0")
  54. }
  55. func Publish(client mqtt.Client, topic string, message string) {
  56. if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, message); token.Wait() && token.Error() != nil {
  57. fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, message)
  58. } else {
  59. fmt.Printf("publish success, topic: %s, payload: %s\n", topic, message)
  60. }
  61. //time.Sleep(time.Second * 1)
  62. time.Sleep(time.Millisecond * 100)
  63. }
  64. func SendUUID(client mqtt.Client, topic string) {
  65. if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, constant.CmdSetUUID); token.Wait() && token.Error() != nil {
  66. fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, constant.CmdSetUUID)
  67. } else {
  68. fmt.Printf("publish success, topic: %s, payload: %s\n", topic, constant.CmdSetUUID)
  69. }
  70. //time.Sleep(time.Second * 1)
  71. time.Sleep(time.Millisecond * 100)
  72. }
  73. func ConnectDevice(client mqtt.Client, topic string, mac string, index string, ai string, at string) {
  74. message := "[{\"cmd\":\"AT+CNN=\",\"m\":\"" + mac + "\",\"i\":\"" + index + "\",\"ai\":\"" + ai + "\",\"at\":\"" + at + "\",\"l\":\"1\",\"x\":\"251\",\"relink\":\"1\",\"timeout\":\"12000\"}]"
  75. if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, message); token.Wait() && token.Error() != nil {
  76. fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, message)
  77. } else {
  78. fmt.Printf("publish success, topic: %s, payload: %s\n", topic, message)
  79. }
  80. //time.Sleep(time.Second * 1)
  81. time.Sleep(time.Millisecond * 100)
  82. }
  83. func SubScribe(client mqtt.Client) {
  84. emqConfig := global.EmqConfig
  85. ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  86. if err != nil {
  87. fmt.Println("socket通道初始化失败")
  88. panic(err)
  89. return
  90. }
  91. deviceService := GetDeviceService()
  92. d, i := deviceService.FindDeviceByType("")
  93. global.DeviceMap = make(map[string]gateway.DeviceInfo)
  94. if i > 0 {
  95. for j := 0; j < len(d); j++ {
  96. global.DeviceMap[d[j].Mac] = d[j]
  97. }
  98. }
  99. for _, topic := range emqConfig.Topic {
  100. topic = "/" + global.EmqConfig.GatewayMac + topic
  101. client.Subscribe(topic, byte(emqConfig.Qos), func(client mqtt.Client, message mqtt.Message) {
  102. //fmt.Printf("`%s` Received `%s` from `%s` topic\n", common.NowTime("2006-01-02 15:04:05"), message.Payload(), message.Topic())
  103. messageMap := make(map[string]string)
  104. var payloads []emq.Payload
  105. err := json.Unmarshal(message.Payload(), &payloads)
  106. if err != nil {
  107. _ = fmt.Errorf("%s", err.Error())
  108. return
  109. }
  110. for _, payload := range payloads {
  111. messageMap["state"] = payload.State
  112. content := ""
  113. sendFlag := true
  114. switch payload.Cmd {
  115. case constant.PayloadScan:
  116. {
  117. //关闭扫描
  118. if payload.S == "0" {
  119. messageMap["msgType"] = constant.MessageTypeCloseScan
  120. content = "关闭扫描"
  121. //开启扫描
  122. } else if payload.S == "1" {
  123. messageMap["msgType"] = constant.MessageTypeStartScan
  124. content = "开启扫描"
  125. } else if payload.S == "" { //接收到蓝牙扫描数据
  126. if strings.HasPrefix(payload.N, "MIND") {
  127. payload.T = "0"
  128. } else if strings.HasPrefix(payload.N, "BW-ECG") {
  129. payload.T = "1"
  130. }
  131. messageMap["msgType"] = constant.MessageTypeDeviceScanned
  132. marshal, _ := json.Marshal(payload)
  133. content = string(marshal)
  134. }
  135. }
  136. case constant.PayloadFLT:
  137. messageMap["msgType"] = constant.MessageTypeFilter
  138. content = "设置过滤"
  139. case constant.PayloadSUUID:
  140. messageMap["msgType"] = constant.MessageTypeSUUID
  141. content = "设置UUID"
  142. case constant.PayloadConnect:
  143. messageMap["msgType"] = constant.MessageTypeConnect
  144. content = "设备连接"
  145. case constant.PayloadDisConnect:
  146. messageMap["msgType"] = constant.MessageTypeDisConnect
  147. //content = "断开设备连接"
  148. marshal, _ := json.Marshal(payload)
  149. content = string(marshal)
  150. case constant.PayloadQuenue:
  151. messageMap["msgType"] = constant.MessageTypeConnectList
  152. if payload.M == "" {
  153. sendFlag = false
  154. } else {
  155. realMac := transMac(payload.M)
  156. deviceInfo := global.DeviceMap[realMac]
  157. if deviceInfo.ID == 0 {
  158. continue
  159. } else {
  160. payload.T = deviceInfo.Type
  161. payload.N = deviceInfo.Name
  162. marshal, _ := json.Marshal(payload)
  163. content = string(marshal)
  164. }
  165. }
  166. case constant.PayloadCNB:
  167. connectedNum, _ := strconv.Atoi(payload.N)
  168. for i := 0; i < connectedNum; i++ {
  169. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  170. gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
  171. }
  172. case constant.PayloadNotify: //接收到蓝牙通知数据
  173. //1.判断是脑电数据还是心电数据
  174. //1.1脑电数据直接转发
  175. //1.2心电数据做处理
  176. realMac := transMac(payload.M)
  177. deviceInfo := global.DeviceMap[realMac]
  178. var receiveData gateway.DeviceDataReceived
  179. receiveData.Mac = payload.M
  180. receiveData.Value = payload.D
  181. if deviceInfo.Type == "0" { //脑电
  182. messageMap["msgType"] = constant.MessageTypeEEGData
  183. marshal, _ := json.Marshal(receiveData)
  184. content = string(marshal)
  185. } else if deviceInfo.Type == "1" { //心电
  186. messageMap["msgType"] = constant.MessageTypeECGData
  187. flag := strings.HasPrefix(receiveData.Value, "E840") || strings.HasPrefix(receiveData.Value, "E841") || strings.HasPrefix(receiveData.Value, "E823") || strings.HasPrefix(receiveData.Value, "E820") || strings.HasPrefix(receiveData.Value, "E81F") || strings.HasPrefix(receiveData.Value, "E813") || strings.HasPrefix(receiveData.Value, "E810") || strings.HasPrefix(receiveData.Value, "E822") || strings.HasPrefix(receiveData.Value, "E826") || strings.HasPrefix(receiveData.Value, "E8FF00000000")
  188. if !flag {
  189. //var ecgData []int
  190. ecgData := [12]int{}
  191. data := []byte(receiveData.Value[4:])
  192. if len(data) == 36 {
  193. for i := 0; i < 36; i += 3 {
  194. ecgData[i/3] = int(16<<data[i]&0xFF | 8<<data[i+1]&0xFF | data[i+2]&0xFF)
  195. }
  196. } else {
  197. for len(data) < 36 {
  198. for i := len(data); i < 36; i++ {
  199. data = append(data, data[i])
  200. }
  201. }
  202. for i := 0; i < 36; i += 3 {
  203. ecgData[i/3] = int(16<<data[i]&0xFF | 8<<data[i+1]&0xFF | data[i+2]&0xFF)
  204. }
  205. }
  206. messageMap["msgType"] = constant.MessageTypeECGData
  207. receiveData.Value = "["
  208. for _, datum := range ecgData {
  209. receiveData.Value = receiveData.Value + strconv.Itoa(datum) + ","
  210. }
  211. receiveData.Value = receiveData.Value + "]"
  212. marshal, _ := json.Marshal(receiveData)
  213. content = string(marshal)
  214. }
  215. }
  216. }
  217. if messageMap["msgType"] != constant.MessageTypeECGData && messageMap["msgType"] != constant.MessageTypeEEGData && messageMap["msgType"] != constant.MessageTypeDeviceScanned && messageMap["msgType"] != constant.MessageTypeConnectList && messageMap["msgType"] != constant.MessageTypeDisConnect {
  218. if payload.State == constant.Success {
  219. content = content + "成功"
  220. } else {
  221. content = content + "失败"
  222. }
  223. }
  224. messageMap["content"] = content
  225. messageMap["Sender"] = "server"
  226. messageMap["Recipient"] = "client"
  227. bytes, err := json.Marshal(messageMap)
  228. if err != nil {
  229. return
  230. }
  231. if sendFlag && content != "成功" && content != "失败" {
  232. err = ws.WriteMessage(websocket.TextMessage, bytes)
  233. if err != nil {
  234. fmt.Println("消息发送异常:" + err.Error())
  235. }
  236. }
  237. time.Sleep(time.Millisecond * 5)
  238. }
  239. })
  240. }
  241. }
  // transMac inserts a colon between every two characters of mac, turning a
  // compact address such as "DC0D30033AF0" into "DC:0D:30:03:3A:F0".
  //
  // An empty mac yields an empty string, and a trailing odd character forms its
  // own final group; both inputs previously caused a slice-bounds panic.
  func transMac(mac string) string {
  	var b strings.Builder
  	b.Grow(len(mac) + len(mac)/2)
  	for i := 0; i < len(mac); i += 2 {
  		if i > 0 {
  			b.WriteByte(':')
  		}
  		end := i + 2
  		if end > len(mac) {
  			end = len(mac)
  		}
  		b.WriteString(mac[i:end])
  	}
  	return b.String()
  }
  248. }