emq.go 14 KB


  1. package initialize
  2. import (
  3. gateway2 "confrontation-training/api/gateway"
  4. "confrontation-training/common"
  5. "confrontation-training/constant"
  6. "confrontation-training/global"
  7. "confrontation-training/models/emq"
  8. "confrontation-training/models/gateway"
  9. "encoding/hex"
  10. "encoding/json"
  11. "fmt"
  12. mqtt "github.com/eclipse/paho.mqtt.golang"
  13. "github.com/gorilla/websocket"
  14. log "github.com/sirupsen/logrus"
  15. "strconv"
  16. "strings"
  17. "time"
  18. )
  19. func CreateEmqClient() {
  20. config := global.Config
  21. global.EmqConfig.ClientId = config.EmqConfig.ClientId
  22. global.EmqConfig.Qos = config.EmqConfig.Qos
  23. global.EmqConfig.Topic = config.EmqConfig.Topic
  24. global.EmqConfig.UserName = config.EmqConfig.UserName
  25. global.EmqConfig.Password = config.EmqConfig.Password
  26. global.EmqConfig.Protocol = config.EmqConfig.Protocol
  27. global.EmqConfig.Port = config.EmqConfig.Port
  28. global.EmqConfig.Broker = config.EmqConfig.Broker
  29. global.EmqConfig.Filter = config.EmqConfig.Filter
  30. global.EmqConfig.GatewayMac = config.EmqConfig.GatewayMac
  31. global.EmqConfig.FirstOpen = "0"
  32. emqConfig := global.EmqConfig
  33. connectAddress := fmt.Sprintf("%s://%s:%d", emqConfig.Protocol, emqConfig.Broker, emqConfig.Port)
  34. fmt.Println("connect address:", connectAddress)
  35. _ = global.Log4J.Info("connect address:", connectAddress)
  36. opts := mqtt.NewClientOptions()
  37. opts.AddBroker(connectAddress)
  38. opts.SetUsername(emqConfig.UserName)
  39. opts.SetPassword(emqConfig.Password)
  40. opts.SetClientID(emqConfig.ClientId)
  41. opts.SetKeepAlive(time.Second * 60)
  42. global.EmqClient = mqtt.NewClient(opts)
  43. client := global.EmqClient
  44. token := client.Connect()
  45. if token.WaitTimeout(time.Second*3) && token.Error() != nil {
  46. _ = global.Log4J.Error(token.Error())
  47. }
  48. SubScribe(client)
  49. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  50. SendUUID(global.EmqClient, topic)
  51. //添加过滤
  52. fmt.Println("添加设备过滤:")
  53. _ = global.Log4J.Info("添加设备过滤:")
  54. filterCmd := "[{\"cmd\":\"AT+FLT=\",\"total\":\"2\"}"
  55. for _, filter := range global.EmqConfig.Filter {
  56. filterCmd = filterCmd + ",{\"t\":\"0\",\"d\":\"" + filter + "\"}"
  57. }
  58. filterCmd = filterCmd + "]"
  59. fmt.Println(filterCmd)
  60. _ = global.Log4J.Info(filterCmd)
  61. //filterCmd = "[{\"cmd\":\"AT+FLT=\",\"total\":\"2\"},{\"t\":\"0\",\"d\":\"MIND\"},{\"t\":\"0\",\"d\":\"BW-ECG\"}]"
  62. Publish(client, topic, filterCmd)
  63. //SendUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
  64. //Publish(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", constant.CmdStartScan)
  65. //SendUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
  66. //读取UUID
  67. //ReadUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
  68. //连接设备
  69. //ConnectDevice(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", "DC0D30033AF0", "0", "0", "0")
  70. //fmt.Println("link device DC0D30033AF0 over")
  71. //ConnectDevice(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", "DC0D300335FC", "0", "0", "0")
  72. }
  73. func Publish(client mqtt.Client, topic string, message string) {
  74. if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, message); token.Wait() && token.Error() != nil {
  75. fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, message)
  76. _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", message)
  77. } else {
  78. fmt.Printf("publish success, topic: %s, payload: %s\n", topic, message)
  79. _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", message)
  80. }
  81. //time.Sleep(time.Second * 1)
  82. time.Sleep(time.Millisecond * 100)
  83. }
  84. func SendUUID(client mqtt.Client, topic string) {
  85. if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, constant.CmdSetUUID); token.Wait() && token.Error() != nil {
  86. fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, constant.CmdSetUUID)
  87. _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", constant.CmdSetUUID)
  88. } else {
  89. fmt.Printf("publish success, topic: %s, payload: %s\n", topic, constant.CmdSetUUID)
  90. _ = global.Log4J.Info("publish success, topic: ", topic, "payload: ", constant.CmdSetUUID)
  91. }
  92. //time.Sleep(time.Second * 1)
  93. time.Sleep(time.Millisecond * 100)
  94. }
  95. func ConnectDevice(client mqtt.Client, topic string, mac string, index string, ai string, at string) {
  96. message := "[{\"cmd\":\"AT+CNN=\",\"m\":\"" + mac + "\",\"i\":\"" + index + "\",\"ai\":\"" + ai + "\",\"at\":\"" + at + "\",\"l\":\"1\",\"x\":\"251\",\"relink\":\"0\",\"timeout\":\"12000\"}]"
  97. if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, message); token.Wait() && token.Error() != nil {
  98. fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, message)
  99. _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", message)
  100. } else {
  101. fmt.Printf("publish success, topic: %s, payload: %s\n", topic, message)
  102. _ = global.Log4J.Info("publish success, topic: ", topic, "payload: ", message)
  103. }
  104. //time.Sleep(time.Second * 1)
  105. time.Sleep(time.Millisecond * 100)
  106. }
  107. func SubScribe(client mqtt.Client) {
  108. emqConfig := global.EmqConfig
  109. ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  110. if err != nil {
  111. fmt.Println("socket通道初始化失败")
  112. _ = global.Log4J.Error("socket通道初始化失败")
  113. panic(err)
  114. return
  115. }
  116. deviceService := GetDeviceService()
  117. d, i := deviceService.FindDeviceByType("")
  118. global.DeviceMap = make(map[string]gateway.DeviceInfo)
  119. if i > 0 {
  120. for j := 0; j < len(d); j++ {
  121. global.DeviceMap[d[j].Mac] = d[j]
  122. }
  123. }
  124. for _, topic := range emqConfig.Topic {
  125. topic = "/" + global.EmqConfig.GatewayMac + topic
  126. client.Subscribe(topic, byte(emqConfig.Qos), func(client mqtt.Client, message mqtt.Message) {
  127. _ = global.Log4J.Info("`%s` Received `%s` from `%s` topic\n", common.NowTime("2006-01-02 15:04:05"), message.Payload(), message.Topic())
  128. fmt.Printf("`%s` Received `%s` from `%s` topic\n", common.NowTime("2006-01-02 15:04:05"), message.Payload(), message.Topic())
  129. messageMap := make(map[string]string)
  130. var payloads []emq.Payload
  131. err := json.Unmarshal(message.Payload(), &payloads)
  132. if err != nil {
  133. _ = fmt.Errorf("%s", err.Error())
  134. return
  135. }
  136. for _, payload := range payloads {
  137. messageMap["state"] = payload.State
  138. content := ""
  139. sendFlag := true
  140. switch payload.Cmd {
  141. case constant.PayloadScan:
  142. {
  143. //关闭扫描
  144. if payload.S == "0" {
  145. messageMap["msgType"] = constant.MessageTypeCloseScan
  146. content = "关闭扫描"
  147. //开启扫描
  148. } else if payload.S == "1" {
  149. messageMap["msgType"] = constant.MessageTypeStartScan
  150. content = "开启扫描"
  151. } else if payload.S == "" { //接收到蓝牙扫描数据
  152. if strings.HasPrefix(payload.N, "MIND") {
  153. payload.T = "0"
  154. } else if strings.HasPrefix(payload.N, "BW-ECG") {
  155. payload.T = "1"
  156. }
  157. payload.M = transMac(payload.M)
  158. messageMap["msgType"] = constant.MessageTypeDeviceScanned
  159. marshal, _ := json.Marshal(payload)
  160. content = string(marshal)
  161. }
  162. }
  163. case constant.PayloadFLT:
  164. messageMap["msgType"] = constant.MessageTypeFilter
  165. content = "设置过滤"
  166. case constant.PayloadSUUID:
  167. messageMap["msgType"] = constant.MessageTypeSUUID
  168. content = "设置UUID"
  169. if payload.State == "SUCCESS" {
  170. global.EmqConfig.FirstOpen = "1"
  171. } else {
  172. global.EmqConfig.FirstOpen = "0"
  173. }
  174. case constant.PayloadConnect:
  175. messageMap["msgType"] = constant.MessageTypeConnect
  176. //content = "设备连接"
  177. if payload.State == "SUCCESS" {
  178. realMac := transMac(payload.M)
  179. deviceInfo := global.DeviceMap[realMac]
  180. payload.T = deviceInfo.Type
  181. payload.M = realMac
  182. marshal, _ := json.Marshal(payload)
  183. content = string(marshal)
  184. }
  185. for i := 0; i < 6; i++ {
  186. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  187. gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
  188. }
  189. case constant.PayloadDisConnect:
  190. messageMap["msgType"] = constant.MessageTypeDisConnect
  191. //content = "断开设备连接"
  192. realMac := transMac(payload.M)
  193. deviceInfo := global.DeviceMap[realMac]
  194. payload.T = deviceInfo.Type
  195. payload.M = realMac
  196. marshal, _ := json.Marshal(payload)
  197. content = string(marshal)
  198. if payload.State == "SUCCESS" && payload.ReasonCode == "62" {
  199. messageMap["msgType"] = constant.MessageTypeConnect
  200. }
  201. //断开连接成功或连接失败,发送已连接设备列表
  202. for i := 0; i < 6; i++ {
  203. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  204. gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
  205. }
  206. case constant.PayloadQuenue:
  207. messageMap["msgType"] = constant.MessageTypeConnectList
  208. if payload.M == "" {
  209. sendFlag = false
  210. } else {
  211. realMac := transMac(payload.M)
  212. deviceInfo := global.DeviceMap[realMac]
  213. if deviceInfo.ID == 0 {
  214. continue
  215. } else {
  216. payload.T = deviceInfo.Type
  217. payload.N = deviceInfo.Name
  218. payload.M = transMac(payload.M)
  219. marshal, _ := json.Marshal(payload)
  220. content = string(marshal)
  221. }
  222. }
  223. case constant.PayloadCNB:
  224. connectedNum, _ := strconv.Atoi(payload.N)
  225. for i := 0; i < connectedNum; i++ {
  226. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  227. gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
  228. }
  229. case constant.PayloadWNP:
  230. messageMap["msgType"] = constant.MessageTypeWNP
  231. realMac := transMac(payload.M)
  232. deviceInfo := global.DeviceMap[realMac]
  233. payload.T = deviceInfo.Type
  234. payload.N = deviceInfo.Name
  235. payload.M = transMac(payload.M)
  236. marshal, _ := json.Marshal(payload)
  237. content = string(marshal)
  238. for i := 0; i < 6; i++ {
  239. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  240. gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
  241. }
  242. case constant.PayloadNotify: //接收到蓝牙通知数据
  243. //1.判断是脑电数据还是心电数据
  244. //1.1脑电数据直接转发
  245. //1.2心电数据做处理
  246. realMac := transMac(payload.M)
  247. deviceInfo := global.DeviceMap[realMac]
  248. var receiveData gateway.DeviceDataReceived
  249. receiveData.Mac = realMac
  250. receiveData.Value = payload.D
  251. if deviceInfo.Type == "0" { //脑电
  252. messageMap["msgType"] = constant.MessageTypeEEGData
  253. marshal, _ := json.Marshal(receiveData)
  254. content = string(marshal)
  255. } else if deviceInfo.Type == "1" { //心电
  256. messageMap["msgType"] = constant.MessageTypeECGData
  257. flag := strings.HasPrefix(receiveData.Value, "E840") || strings.HasPrefix(receiveData.Value, "E841") || strings.HasPrefix(receiveData.Value, "E823") || strings.HasPrefix(receiveData.Value, "E820") || strings.HasPrefix(receiveData.Value, "E81F") || strings.HasPrefix(receiveData.Value, "E813") || strings.HasPrefix(receiveData.Value, "E810") || strings.HasPrefix(receiveData.Value, "E822") || strings.HasPrefix(receiveData.Value, "E826") || strings.HasPrefix(receiveData.Value, "E8FF00000000")
  258. if !flag {
  259. //var ecgData []int
  260. ecgData := [6]int{}
  261. data := byteString2ByteArray(receiveData.Value[4:])
  262. if len(data) == 18 {
  263. for i := 0; i < 18; i += 3 {
  264. ecgData[i/3] = data[i]<<16 | data[i+1]<<8 | data[i+2]
  265. }
  266. } else {
  267. for len(data) < 18 {
  268. for i := len(data); i < 18; i++ {
  269. data = append(data, data[i])
  270. }
  271. }
  272. for i := 0; i < 18; i += 3 {
  273. ecgData[i/3] = int(16<<data[i]&0xFF | 8<<data[i+1]&0xFF | data[i+2]&0xFF)
  274. }
  275. }
  276. messageMap["msgType"] = constant.MessageTypeECGData
  277. receiveData.Value = "["
  278. for _, datum := range ecgData {
  279. receiveData.Value = receiveData.Value + strconv.Itoa(datum) + ","
  280. }
  281. receiveData.Value = strings.TrimRight(receiveData.Value, ",")
  282. receiveData.Value = receiveData.Value + "]"
  283. marshal, _ := json.Marshal(receiveData)
  284. content = string(marshal)
  285. }
  286. }
  287. }
  288. if messageMap["msgType"] != constant.MessageTypeECGData && messageMap["msgType"] != constant.MessageTypeEEGData && messageMap["msgType"] != constant.MessageTypeDeviceScanned && messageMap["msgType"] != constant.MessageTypeConnectList && messageMap["msgType"] != constant.MessageTypeDisConnect && messageMap["msgType"] != constant.MessageTypeConnect && messageMap["msgType"] != constant.MessageTypeWNP {
  289. if payload.State == constant.Success {
  290. content = content + "成功"
  291. } else {
  292. content = content + "失败"
  293. }
  294. }
  295. messageMap["content"] = content
  296. messageMap["Sender"] = "server"
  297. messageMap["Recipient"] = "client"
  298. bytes, err := json.Marshal(messageMap)
  299. if err != nil {
  300. return
  301. }
  302. if sendFlag && content != "成功" && content != "失败" {
  303. if messageMap["msgType"] == constant.MessageTypeConnectList {
  304. fmt.Printf("已连接列表===" + common.NowTime("2006-01-02 15:04:05") + "=====" + string(bytes))
  305. _ = global.Log4J.Info("已连接列表===" + common.NowTime("2006-01-02 15:04:05") + "=====" + string(bytes))
  306. }
  307. for k := range messageMap {
  308. delete(messageMap, k)
  309. }
  310. _ = global.Log4J.Info("发送消息===" + common.NowTime("2006-01-02 15:04:05") + "=====" + string(bytes))
  311. err = ws.WriteMessage(websocket.TextMessage, bytes)
  312. if err != nil {
  313. log.Infoln("消息发送异常:" + err.Error())
  314. _ = global.Log4J.Error("消息发送异常:" + err.Error())
  315. }
  316. }
  317. time.Sleep(time.Millisecond * 5)
  318. }
  319. })
  320. }
  321. }
  322. func byteString2ByteArray(byteString string) []int {
  323. //var byteArr [len(byteString)]byte
  324. byteArr := make([]int, len(byteString)/2)
  325. for i := 0; i < 18; i++ {
  326. subStr := byteString[i*2 : i*2+2]
  327. decodeString, err := hex.DecodeString(subStr)
  328. if err != nil {
  329. }
  330. byteArr[i] = int(decodeString[0])
  331. }
  332. return byteArr
  333. }
  334. func transMac(mac string) string {
  335. result := ""
  336. for i := 0; i < len(mac); i = i + 2 {
  337. result = result + mac[i:i+2] + ":"
  338. }
  339. return result[0 : len(result)-1]
  340. }