emq.go 14 KB


  1. package initialize
  2. import (
  3. gateway2 "confrontation-training/api/gateway"
  4. "confrontation-training/common"
  5. "confrontation-training/constant"
  6. "confrontation-training/global"
  7. "confrontation-training/models/emq"
  8. "confrontation-training/models/gateway"
  9. "encoding/hex"
  10. "encoding/json"
  11. "fmt"
  12. mqtt "github.com/eclipse/paho.mqtt.golang"
  13. "github.com/gorilla/websocket"
  14. log "github.com/sirupsen/logrus"
  15. "strconv"
  16. "strings"
  17. "time"
  18. )
  19. func CreateEmqClient() {
  20. config := global.Config
  21. global.EmqConfig.ClientId = config.EmqConfig.ClientId
  22. global.EmqConfig.Qos = config.EmqConfig.Qos
  23. global.EmqConfig.Topic = config.EmqConfig.Topic
  24. global.EmqConfig.UserName = config.EmqConfig.UserName
  25. global.EmqConfig.Password = config.EmqConfig.Password
  26. global.EmqConfig.Protocol = config.EmqConfig.Protocol
  27. global.EmqConfig.Port = config.EmqConfig.Port
  28. global.EmqConfig.Broker = config.EmqConfig.Broker
  29. global.EmqConfig.Filter = config.EmqConfig.Filter
  30. global.EmqConfig.GatewayMac = config.EmqConfig.GatewayMac
  31. global.EmqConfig.FirstOpen = "0"
  32. emqConfig := global.EmqConfig
  33. connectAddress := fmt.Sprintf("%s://%s:%d", emqConfig.Protocol, emqConfig.Broker, emqConfig.Port)
  34. fmt.Println("connect address:", connectAddress)
  35. _ = global.Log4J.Info("connect address:", connectAddress)
  36. opts := mqtt.NewClientOptions()
  37. opts.AddBroker(connectAddress)
  38. opts.SetUsername(emqConfig.UserName)
  39. opts.SetPassword(emqConfig.Password)
  40. opts.SetClientID(emqConfig.ClientId)
  41. opts.SetKeepAlive(time.Second * 60)
  42. global.EmqClient = mqtt.NewClient(opts)
  43. client := global.EmqClient
  44. token := client.Connect()
  45. if token.WaitTimeout(time.Second*3) && token.Error() != nil {
  46. _ = global.Log4J.Error(token.Error())
  47. }
  48. SubScribe(client)
  49. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  50. SendUUID(global.EmqClient, topic)
  51. //添加过滤
  52. fmt.Println("添加设备过滤:")
  53. _ = global.Log4J.Info("添加设备过滤:")
  54. filterCmd := "[{\"cmd\":\"AT+FLT=\",\"total\":\"2\"}"
  55. for _, filter := range global.EmqConfig.Filter {
  56. filterCmd = filterCmd + ",{\"t\":\"0\",\"d\":\"" + filter + "\"}"
  57. }
  58. filterCmd = filterCmd + "]"
  59. fmt.Println(filterCmd)
  60. _ = global.Log4J.Info(filterCmd)
  61. //filterCmd = "[{\"cmd\":\"AT+FLT=\",\"total\":\"2\"},{\"t\":\"0\",\"d\":\"MIND\"},{\"t\":\"0\",\"d\":\"BW-ECG\"}]"
  62. Publish(client, topic, filterCmd)
  63. //SendUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
  64. //Publish(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", constant.CmdStartScan)
  65. //SendUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
  66. //读取UUID
  67. //ReadUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
  68. //连接设备
  69. //ConnectDevice(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", "DC0D30033AF0", "0", "0", "0")
  70. //fmt.Println("link device DC0D30033AF0 over")
  71. //ConnectDevice(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", "DC0D300335FC", "0", "0", "0")
  72. }
  73. func Publish(client mqtt.Client, topic string, message string) {
  74. if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, message); token.Wait() && token.Error() != nil {
  75. fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, message)
  76. _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", message)
  77. } else {
  78. fmt.Printf("publish success, topic: %s, payload: %s\n", topic, message)
  79. _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", message)
  80. }
  81. //time.Sleep(time.Second * 1)
  82. time.Sleep(time.Millisecond * 100)
  83. }
  84. func SendUUID(client mqtt.Client, topic string) {
  85. if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, constant.CmdSetUUID); token.Wait() && token.Error() != nil {
  86. fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, constant.CmdSetUUID)
  87. _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", constant.CmdSetUUID)
  88. } else {
  89. fmt.Printf("publish success, topic: %s, payload: %s\n", topic, constant.CmdSetUUID)
  90. _ = global.Log4J.Info("publish success, topic: ", topic, "payload: ", constant.CmdSetUUID)
  91. }
  92. //time.Sleep(time.Second * 1)
  93. time.Sleep(time.Millisecond * 100)
  94. }
  95. func ConnectDevice(client mqtt.Client, topic string, mac string, index string, ai string, at string) {
  96. message := "[{\"cmd\":\"AT+CNN=\",\"m\":\"" + mac + "\",\"i\":\"" + index + "\",\"ai\":\"" + ai + "\",\"at\":\"" + at + "\",\"l\":\"1\",\"x\":\"251\",\"relink\":\"0\",\"timeout\":\"12000\"}]"
  97. if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, message); token.Wait() && token.Error() != nil {
  98. fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, message)
  99. _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", message)
  100. } else {
  101. fmt.Printf("publish success, topic: %s, payload: %s\n", topic, message)
  102. _ = global.Log4J.Info("publish success, topic: ", topic, "payload: ", message)
  103. }
  104. //time.Sleep(time.Second * 1)
  105. time.Sleep(time.Millisecond * 100)
  106. }
  107. func SubScribe(client mqtt.Client) {
  108. emqConfig := global.EmqConfig
  109. ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  110. if err != nil {
  111. fmt.Println("socket通道初始化失败")
  112. _ = global.Log4J.Error("socket通道初始化失败")
  113. panic(err)
  114. return
  115. }
  116. deviceService := GetDeviceService()
  117. d, i := deviceService.FindDeviceByType("")
  118. global.DeviceMap = make(map[string]gateway.DeviceInfo)
  119. if i > 0 {
  120. for j := 0; j < len(d); j++ {
  121. global.DeviceMap[d[j].Mac] = d[j]
  122. }
  123. }
  124. for _, topic := range emqConfig.Topic {
  125. topic = "/" + global.EmqConfig.GatewayMac + topic
  126. client.Subscribe(topic, byte(emqConfig.Qos), func(client mqtt.Client, message mqtt.Message) {
  127. _ = global.Log4J.Info("`%s` Received `%s` from `%s` topic\n", common.NowTime("2006-01-02 15:04:05"), message.Payload(), message.Topic())
  128. fmt.Printf("`%s` Received `%s` from `%s` topic\n", common.NowTime("2006-01-02 15:04:05"), message.Payload(), message.Topic())
  129. messageMap := make(map[string]string)
  130. var payloads []emq.Payload
  131. err := json.Unmarshal(message.Payload(), &payloads)
  132. if err != nil {
  133. _ = fmt.Errorf("%s", err.Error())
  134. return
  135. }
  136. for _, payload := range payloads {
  137. messageMap["state"] = payload.State
  138. content := ""
  139. sendFlag := true
  140. switch payload.Cmd {
  141. case constant.PayloadScan:
  142. {
  143. //关闭扫描
  144. if payload.S == "0" {
  145. messageMap["msgType"] = constant.MessageTypeCloseScan
  146. content = "关闭扫描"
  147. //开启扫描
  148. } else if payload.S == "1" {
  149. messageMap["msgType"] = constant.MessageTypeStartScan
  150. content = "开启扫描"
  151. } else if payload.S == "" { //接收到蓝牙扫描数据
  152. if strings.HasPrefix(payload.N, "MIND") {
  153. payload.T = "0"
  154. } else if strings.HasPrefix(payload.N, "BW-ECG") {
  155. payload.T = "1"
  156. }
  157. payload.M = transMac(payload.M)
  158. messageMap["msgType"] = constant.MessageTypeDeviceScanned
  159. marshal, _ := json.Marshal(payload)
  160. content = string(marshal)
  161. }
  162. }
  163. case constant.PayloadFLT:
  164. messageMap["msgType"] = constant.MessageTypeFilter
  165. content = "设置过滤"
  166. case constant.PayloadSUUID:
  167. messageMap["msgType"] = constant.MessageTypeSUUID
  168. content = "设置UUID"
  169. if payload.State == "SUCCESS" {
  170. global.EmqConfig.FirstOpen = "1"
  171. } else {
  172. global.EmqConfig.FirstOpen = "0"
  173. }
  174. case constant.PayloadConnect:
  175. messageMap["msgType"] = constant.MessageTypeConnect
  176. //content = "设备连接"
  177. if payload.State == "SUCCESS" {
  178. realMac := transMac(payload.M)
  179. deviceInfo := global.DeviceMap[realMac]
  180. payload.T = deviceInfo.Type
  181. payload.M = realMac
  182. marshal, _ := json.Marshal(payload)
  183. content = string(marshal)
  184. //连接成功或连接失败,发送已连接设备列表
  185. }
  186. for i := 0; i < 6; i++ {
  187. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  188. gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
  189. }
  190. case constant.PayloadDisConnect:
  191. messageMap["msgType"] = constant.MessageTypeDisConnect
  192. //content = "断开设备连接"
  193. if payload.State == "SUCCESS" && payload.ReasonCode == "62" {
  194. messageMap["msgType"] = constant.MessageTypeConnect
  195. }
  196. realMac := transMac(payload.M)
  197. deviceInfo := global.DeviceMap[realMac]
  198. payload.T = deviceInfo.Type
  199. payload.M = realMac
  200. marshal, _ := json.Marshal(payload)
  201. content = string(marshal)
  202. //断开连接成功或连接失败,发送已连接设备列表
  203. for i := 0; i < 6; i++ {
  204. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  205. gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
  206. }
  207. case constant.PayloadQuenue:
  208. messageMap["msgType"] = constant.MessageTypeConnectList
  209. if payload.M == "" {
  210. sendFlag = false
  211. } else {
  212. realMac := transMac(payload.M)
  213. deviceInfo := global.DeviceMap[realMac]
  214. if deviceInfo.ID == 0 {
  215. continue
  216. } else {
  217. payload.T = deviceInfo.Type
  218. payload.N = deviceInfo.Name
  219. payload.M = transMac(payload.M)
  220. marshal, _ := json.Marshal(payload)
  221. content = string(marshal)
  222. }
  223. }
  224. case constant.PayloadCNB:
  225. connectedNum, _ := strconv.Atoi(payload.N)
  226. for i := 0; i < connectedNum; i++ {
  227. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  228. gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
  229. }
  230. case constant.PayloadWNP:
  231. messageMap["msgType"] = constant.MessageTypeWNP
  232. realMac := transMac(payload.M)
  233. deviceInfo := global.DeviceMap[realMac]
  234. payload.T = deviceInfo.Type
  235. payload.N = deviceInfo.Name
  236. payload.M = transMac(payload.M)
  237. marshal, _ := json.Marshal(payload)
  238. content = string(marshal)
  239. for i := 0; i < 6; i++ {
  240. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  241. gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
  242. }
  243. case constant.PayloadNotify: //接收到蓝牙通知数据
  244. //1.判断是脑电数据还是心电数据
  245. //1.1脑电数据直接转发
  246. //1.2心电数据做处理
  247. realMac := transMac(payload.M)
  248. deviceInfo := global.DeviceMap[realMac]
  249. var receiveData gateway.DeviceDataReceived
  250. receiveData.Mac = realMac
  251. receiveData.Value = payload.D
  252. if deviceInfo.Type == "0" { //脑电
  253. messageMap["msgType"] = constant.MessageTypeEEGData
  254. marshal, _ := json.Marshal(receiveData)
  255. content = string(marshal)
  256. } else if deviceInfo.Type == "1" { //心电
  257. messageMap["msgType"] = constant.MessageTypeECGData
  258. flag := strings.HasPrefix(receiveData.Value, "E840") || strings.HasPrefix(receiveData.Value, "E841") || strings.HasPrefix(receiveData.Value, "E823") || strings.HasPrefix(receiveData.Value, "E820") || strings.HasPrefix(receiveData.Value, "E81F") || strings.HasPrefix(receiveData.Value, "E813") || strings.HasPrefix(receiveData.Value, "E810") || strings.HasPrefix(receiveData.Value, "E822") || strings.HasPrefix(receiveData.Value, "E826") || strings.HasPrefix(receiveData.Value, "E8FF00000000")
  259. if !flag {
  260. //var ecgData []int
  261. ecgData := [6]int{}
  262. data := byteString2ByteArray(receiveData.Value[4:])
  263. if len(data) == 18 {
  264. for i := 0; i < 18; i += 3 {
  265. ecgData[i/3] = data[i]<<16 | data[i+1]<<8 | data[i+2]
  266. }
  267. } else {
  268. for len(data) < 18 {
  269. for i := len(data); i < 18; i++ {
  270. data = append(data, data[i])
  271. }
  272. }
  273. for i := 0; i < 18; i += 3 {
  274. ecgData[i/3] = int(16<<data[i]&0xFF | 8<<data[i+1]&0xFF | data[i+2]&0xFF)
  275. }
  276. }
  277. messageMap["msgType"] = constant.MessageTypeECGData
  278. receiveData.Value = "["
  279. for _, datum := range ecgData {
  280. receiveData.Value = receiveData.Value + strconv.Itoa(datum) + ","
  281. }
  282. receiveData.Value = strings.TrimRight(receiveData.Value, ",")
  283. receiveData.Value = receiveData.Value + "]"
  284. marshal, _ := json.Marshal(receiveData)
  285. content = string(marshal)
  286. }
  287. }
  288. }
  289. if messageMap["msgType"] != constant.MessageTypeECGData && messageMap["msgType"] != constant.MessageTypeEEGData && messageMap["msgType"] != constant.MessageTypeDeviceScanned && messageMap["msgType"] != constant.MessageTypeConnectList && messageMap["msgType"] != constant.MessageTypeDisConnect && messageMap["msgType"] != constant.MessageTypeConnect && messageMap["msgType"] != constant.MessageTypeWNP {
  290. if payload.State == constant.Success {
  291. content = content + "成功"
  292. } else {
  293. content = content + "失败"
  294. }
  295. }
  296. messageMap["content"] = content
  297. messageMap["Sender"] = "server"
  298. messageMap["Recipient"] = "client"
  299. bytes, err := json.Marshal(messageMap)
  300. if err != nil {
  301. return
  302. }
  303. if sendFlag && content != "成功" && content != "失败" {
  304. if messageMap["msgType"] == constant.MessageTypeECGData {
  305. fmt.Printf("发送心电数据===" + common.NowTime("2006-01-02 15:04:05") + "=====" + string(bytes))
  306. _ = global.Log4J.Info("发送心电数据===" + common.NowTime("2006-01-02 15:04:05") + "=====" + string(bytes))
  307. }
  308. for k := range messageMap {
  309. delete(messageMap, k)
  310. }
  311. _ = global.Log4J.Info("发送消息===" + common.NowTime("2006-01-02 15:04:05") + "=====" + string(bytes))
  312. err = ws.WriteMessage(websocket.TextMessage, bytes)
  313. if err != nil {
  314. log.Infoln("消息发送异常:" + err.Error())
  315. _ = global.Log4J.Error("消息发送异常:" + err.Error())
  316. }
  317. }
  318. time.Sleep(time.Millisecond * 5)
  319. }
  320. })
  321. }
  322. }
  323. func byteString2ByteArray(byteString string) []int {
  324. //var byteArr [len(byteString)]byte
  325. byteArr := make([]int, len(byteString)/2)
  326. for i := 0; i < 18; i++ {
  327. subStr := byteString[i*2 : i*2+2]
  328. decodeString, err := hex.DecodeString(subStr)
  329. if err != nil {
  330. }
  331. byteArr[i] = int(decodeString[0])
  332. }
  333. return byteArr
  334. }
  335. func transMac(mac string) string {
  336. result := ""
  337. for i := 0; i < len(mac); i = i + 2 {
  338. result = result + mac[i:i+2] + ":"
  339. }
  340. return result[0 : len(result)-1]
  341. }