emq.go 14 KB


  1. package initialize
  2. import (
  3. gateway2 "confrontation-training/api/gateway"
  4. "confrontation-training/common"
  5. "confrontation-training/constant"
  6. "confrontation-training/global"
  7. "confrontation-training/models/emq"
  8. "confrontation-training/models/gateway"
  9. "encoding/hex"
  10. "encoding/json"
  11. "fmt"
  12. mqtt "github.com/eclipse/paho.mqtt.golang"
  13. "github.com/gorilla/websocket"
  14. log "github.com/sirupsen/logrus"
  15. "strconv"
  16. "strings"
  17. "time"
  18. )
  19. func CreateEmqClient() {
  20. config := global.Config
  21. global.EmqConfig.ClientId = config.EmqConfig.ClientId
  22. global.EmqConfig.Qos = config.EmqConfig.Qos
  23. global.EmqConfig.Topic = config.EmqConfig.Topic
  24. global.EmqConfig.UserName = config.EmqConfig.UserName
  25. global.EmqConfig.Password = config.EmqConfig.Password
  26. global.EmqConfig.Protocol = config.EmqConfig.Protocol
  27. global.EmqConfig.Port = config.EmqConfig.Port
  28. global.EmqConfig.Broker = config.EmqConfig.Broker
  29. global.EmqConfig.Filter = config.EmqConfig.Filter
  30. global.EmqConfig.GatewayMac = config.EmqConfig.GatewayMac
  31. global.EmqConfig.FirstOpen = "0"
  32. emqConfig := global.EmqConfig
  33. connectAddress := fmt.Sprintf("%s://%s:%d", emqConfig.Protocol, emqConfig.Broker, emqConfig.Port)
  34. fmt.Println("connect address:", connectAddress)
  35. _ = global.Log4J.Info("connect address:", connectAddress)
  36. opts := mqtt.NewClientOptions()
  37. opts.AddBroker(connectAddress)
  38. opts.SetUsername(emqConfig.UserName)
  39. opts.SetPassword(emqConfig.Password)
  40. opts.SetClientID(emqConfig.ClientId)
  41. opts.SetKeepAlive(time.Second * 60)
  42. global.EmqClient = mqtt.NewClient(opts)
  43. client := global.EmqClient
  44. token := client.Connect()
  45. if token.WaitTimeout(time.Second*3) && token.Error() != nil {
  46. _ = global.Log4J.Error(token.Error())
  47. }
  48. SubScribe(client)
  49. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  50. SendUUID(global.EmqClient, topic)
  51. //添加过滤
  52. fmt.Println("添加设备过滤:")
  53. _ = global.Log4J.Info("添加设备过滤:")
  54. filterCmd := "[{\"cmd\":\"AT+FLT=\",\"total\":\"2\"}"
  55. for _, filter := range global.EmqConfig.Filter {
  56. filterCmd = filterCmd + ",{\"t\":\"0\",\"d\":\"" + filter + "\"}"
  57. }
  58. filterCmd = filterCmd + "]"
  59. fmt.Println(filterCmd)
  60. _ = global.Log4J.Info(filterCmd)
  61. //filterCmd = "[{\"cmd\":\"AT+FLT=\",\"total\":\"2\"},{\"t\":\"0\",\"d\":\"MIND\"},{\"t\":\"0\",\"d\":\"BW-ECG\"}]"
  62. Publish(client, topic, filterCmd)
  63. //SendUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
  64. //Publish(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", constant.CmdStartScan)
  65. //SendUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
  66. //读取UUID
  67. //ReadUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
  68. //连接设备
  69. //ConnectDevice(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", "DC0D30033AF0", "0", "0", "0")
  70. //fmt.Println("link device DC0D30033AF0 over")
  71. //ConnectDevice(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", "DC0D300335FC", "0", "0", "0")
  72. }
  73. func Publish(client mqtt.Client, topic string, message string) {
  74. if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, message); token.Wait() && token.Error() != nil {
  75. fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, message)
  76. _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", message)
  77. } else {
  78. fmt.Printf("publish success, topic: %s, payload: %s\n", topic, message)
  79. _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", message)
  80. }
  81. //time.Sleep(time.Second * 1)
  82. time.Sleep(time.Millisecond * 100)
  83. }
  84. func SendUUID(client mqtt.Client, topic string) {
  85. if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, constant.CmdSetUUID); token.Wait() && token.Error() != nil {
  86. fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, constant.CmdSetUUID)
  87. _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", constant.CmdSetUUID)
  88. } else {
  89. fmt.Printf("publish success, topic: %s, payload: %s\n", topic, constant.CmdSetUUID)
  90. _ = global.Log4J.Info("publish success, topic: ", topic, "payload: ", constant.CmdSetUUID)
  91. }
  92. //time.Sleep(time.Second * 1)
  93. time.Sleep(time.Millisecond * 100)
  94. }
  95. func ConnectDevice(client mqtt.Client, topic string, mac string, index string, ai string, at string) {
  96. message := "[{\"cmd\":\"AT+CNN=\",\"m\":\"" + mac + "\",\"i\":\"" + index + "\",\"ai\":\"" + ai + "\",\"at\":\"" + at + "\",\"l\":\"1\",\"x\":\"251\",\"relink\":\"0\",\"timeout\":\"12000\"}]"
  97. if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, message); token.Wait() && token.Error() != nil {
  98. fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, message)
  99. _ = global.Log4J.Info("publish failed, topic: ", topic, "payload: ", message)
  100. } else {
  101. fmt.Printf("publish success, topic: %s, payload: %s\n", topic, message)
  102. _ = global.Log4J.Info("publish success, topic: ", topic, "payload: ", message)
  103. }
  104. //time.Sleep(time.Second * 1)
  105. time.Sleep(time.Millisecond * 100)
  106. }
  107. func SubScribe(client mqtt.Client) {
  108. emqConfig := global.EmqConfig
  109. ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  110. if err != nil {
  111. fmt.Println("socket通道初始化失败")
  112. _ = global.Log4J.Error("socket通道初始化失败")
  113. panic(err)
  114. return
  115. }
  116. deviceService := GetDeviceService()
  117. d, i := deviceService.FindDeviceByType("")
  118. global.DeviceMap = make(map[string]gateway.DeviceInfo)
  119. if i > 0 {
  120. for j := 0; j < len(d); j++ {
  121. global.DeviceMap[d[j].Mac] = d[j]
  122. }
  123. }
  124. for _, topic := range emqConfig.Topic {
  125. topic = "/" + global.EmqConfig.GatewayMac + topic
  126. client.Subscribe(topic, byte(emqConfig.Qos), func(client mqtt.Client, message mqtt.Message) {
  127. _ = global.Log4J.Info("`%s` Received `%s` from `%s` topic\n", common.NowTime("2006-01-02 15:04:05"), message.Payload(), message.Topic())
  128. fmt.Printf("`%s` Received `%s` from `%s` topic\n", common.NowTime("2006-01-02 15:04:05"), message.Payload(), message.Topic())
  129. messageMap := make(map[string]string)
  130. var payloads []emq.Payload
  131. err := json.Unmarshal(message.Payload(), &payloads)
  132. if err != nil {
  133. _ = fmt.Errorf("%s", err.Error())
  134. return
  135. }
  136. for _, payload := range payloads {
  137. messageMap["state"] = payload.State
  138. content := ""
  139. sendFlag := true
  140. switch payload.Cmd {
  141. case constant.PayloadScan:
  142. {
  143. //关闭扫描
  144. if payload.S == "0" {
  145. messageMap["msgType"] = constant.MessageTypeCloseScan
  146. content = "关闭扫描"
  147. //开启扫描
  148. } else if payload.S == "1" {
  149. messageMap["msgType"] = constant.MessageTypeStartScan
  150. content = "开启扫描"
  151. } else if payload.S == "" { //接收到蓝牙扫描数据
  152. if strings.HasPrefix(payload.N, "MIND") {
  153. payload.T = "0"
  154. } else if strings.HasPrefix(payload.N, "BW-ECG") {
  155. payload.T = "1"
  156. }
  157. payload.M = transMac(payload.M)
  158. info := global.DeviceMap[payload.M]
  159. if info.ID != 0 {
  160. payload.AliasName = info.AliasName
  161. }
  162. messageMap["msgType"] = constant.MessageTypeDeviceScanned
  163. marshal, _ := json.Marshal(payload)
  164. content = string(marshal)
  165. }
  166. }
  167. case constant.PayloadFLT:
  168. messageMap["msgType"] = constant.MessageTypeFilter
  169. content = "设置过滤"
  170. case constant.PayloadSUUID:
  171. messageMap["msgType"] = constant.MessageTypeSUUID
  172. content = "设置UUID"
  173. if payload.State == "SUCCESS" {
  174. global.EmqConfig.FirstOpen = "1"
  175. } else {
  176. global.EmqConfig.FirstOpen = "0"
  177. }
  178. case constant.PayloadConnect:
  179. messageMap["msgType"] = constant.MessageTypeConnect
  180. //content = "设备连接"
  181. if payload.State == "SUCCESS" {
  182. realMac := transMac(payload.M)
  183. deviceInfo := global.DeviceMap[realMac]
  184. payload.T = deviceInfo.Type
  185. payload.M = realMac
  186. info := global.DeviceMap[payload.M]
  187. if info.ID != 0 {
  188. payload.AliasName = info.AliasName
  189. }
  190. marshal, _ := json.Marshal(payload)
  191. content = string(marshal)
  192. }
  193. for i := 0; i < 6; i++ {
  194. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  195. gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
  196. }
  197. case constant.PayloadDisConnect:
  198. messageMap["msgType"] = constant.MessageTypeDisConnect
  199. //content = "断开设备连接"
  200. realMac := transMac(payload.M)
  201. deviceInfo := global.DeviceMap[realMac]
  202. payload.T = deviceInfo.Type
  203. payload.M = realMac
  204. info := global.DeviceMap[payload.M]
  205. if info.ID != 0 {
  206. payload.AliasName = info.AliasName
  207. }
  208. marshal, _ := json.Marshal(payload)
  209. content = string(marshal)
  210. if payload.State == "SUCCESS" && payload.ReasonCode == "62" {
  211. messageMap["msgType"] = constant.MessageTypeConnect
  212. }
  213. //断开连接成功或连接失败,发送已连接设备列表
  214. for i := 0; i < 6; i++ {
  215. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  216. gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
  217. }
  218. case constant.PayloadQuenue:
  219. messageMap["msgType"] = constant.MessageTypeConnectList
  220. if payload.M == "" {
  221. sendFlag = false
  222. } else {
  223. realMac := transMac(payload.M)
  224. deviceInfo := global.DeviceMap[realMac]
  225. if deviceInfo.ID == 0 {
  226. continue
  227. } else {
  228. payload.T = deviceInfo.Type
  229. payload.N = deviceInfo.Name
  230. payload.M = transMac(payload.M)
  231. payload.AliasName = deviceInfo.AliasName
  232. marshal, _ := json.Marshal(payload)
  233. content = string(marshal)
  234. }
  235. }
  236. case constant.PayloadCNB:
  237. connectedNum, _ := strconv.Atoi(payload.N)
  238. for i := 0; i < connectedNum; i++ {
  239. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  240. gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
  241. }
  242. case constant.PayloadWNP:
  243. messageMap["msgType"] = constant.MessageTypeWNP
  244. realMac := transMac(payload.M)
  245. deviceInfo := global.DeviceMap[realMac]
  246. payload.T = deviceInfo.Type
  247. payload.N = deviceInfo.Name
  248. payload.M = transMac(payload.M)
  249. payload.AliasName = deviceInfo.AliasName
  250. marshal, _ := json.Marshal(payload)
  251. content = string(marshal)
  252. for i := 0; i < 6; i++ {
  253. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  254. gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
  255. }
  256. case constant.PayloadNotify: //接收到蓝牙通知数据
  257. //1.判断是脑电数据还是心电数据
  258. //1.1脑电数据直接转发
  259. //1.2心电数据做处理
  260. realMac := transMac(payload.M)
  261. deviceInfo := global.DeviceMap[realMac]
  262. var receiveData gateway.DeviceDataReceived
  263. receiveData.Mac = realMac
  264. receiveData.Value = payload.D
  265. receiveData.AliasName = deviceInfo.AliasName
  266. if deviceInfo.Type == "0" { //脑电
  267. messageMap["msgType"] = constant.MessageTypeEEGData
  268. marshal, _ := json.Marshal(receiveData)
  269. content = string(marshal)
  270. } else if deviceInfo.Type == "1" { //心电
  271. messageMap["msgType"] = constant.MessageTypeECGData
  272. flag := strings.HasPrefix(receiveData.Value, "E840") ||
  273. strings.HasPrefix(receiveData.Value, "E841") ||
  274. strings.HasPrefix(receiveData.Value, "E823") ||
  275. strings.HasPrefix(receiveData.Value, "E820") ||
  276. strings.HasPrefix(receiveData.Value, "E81F") ||
  277. strings.HasPrefix(receiveData.Value, "E813") ||
  278. strings.HasPrefix(receiveData.Value, "E810") ||
  279. strings.HasPrefix(receiveData.Value, "E822") ||
  280. strings.HasPrefix(receiveData.Value, "E826") ||
  281. strings.HasPrefix(receiveData.Value, "E8FF00000000")
  282. if !flag {
  283. //var ecgData []int
  284. ecgData := [6]int{}
  285. data := byteString2ByteArray(receiveData.Value[4:])
  286. if len(data) == 18 {
  287. for i := 0; i < 18; i += 3 {
  288. ecgData[i/3] = data[i]<<16 | data[i+1]<<8 | data[i+2]
  289. }
  290. } else {
  291. for len(data) < 18 {
  292. for i := len(data); i < 18; i++ {
  293. data = append(data, data[i])
  294. }
  295. }
  296. for i := 0; i < 18; i += 3 {
  297. ecgData[i/3] = int(16<<data[i]&0xFF | 8<<data[i+1]&0xFF | data[i+2]&0xFF)
  298. }
  299. }
  300. messageMap["msgType"] = constant.MessageTypeECGData
  301. receiveData.Value = "["
  302. for _, datum := range ecgData {
  303. receiveData.Value = receiveData.Value + strconv.Itoa(datum) + ","
  304. }
  305. receiveData.Value = strings.TrimRight(receiveData.Value, ",")
  306. receiveData.Value = receiveData.Value + "]"
  307. marshal, _ := json.Marshal(receiveData)
  308. content = string(marshal)
  309. }
  310. }
  311. }
  312. if messageMap["msgType"] != constant.MessageTypeECGData && messageMap["msgType"] != constant.MessageTypeEEGData && messageMap["msgType"] != constant.MessageTypeDeviceScanned && messageMap["msgType"] != constant.MessageTypeConnectList && messageMap["msgType"] != constant.MessageTypeDisConnect && messageMap["msgType"] != constant.MessageTypeConnect && messageMap["msgType"] != constant.MessageTypeWNP {
  313. if payload.State == constant.Success {
  314. content = content + "成功"
  315. } else {
  316. content = content + "失败"
  317. }
  318. }
  319. messageMap["content"] = content
  320. messageMap["Sender"] = "server"
  321. messageMap["Recipient"] = "client"
  322. bytes, err := json.Marshal(messageMap)
  323. if err != nil {
  324. return
  325. }
  326. if sendFlag && content != "成功" && content != "失败" {
  327. if messageMap["msgType"] == constant.MessageTypeConnectList {
  328. fmt.Printf("已连接列表===" + common.NowTime("2006-01-02 15:04:05") + "=====" + string(bytes))
  329. _ = global.Log4J.Info("已连接列表===" + common.NowTime("2006-01-02 15:04:05") + "=====" + string(bytes))
  330. }
  331. for k := range messageMap {
  332. delete(messageMap, k)
  333. }
  334. _ = global.Log4J.Info("发送消息===" + common.NowTime("2006-01-02 15:04:05") + "=====" + string(bytes))
  335. err = ws.WriteMessage(websocket.TextMessage, bytes)
  336. if err != nil {
  337. log.Infoln("消息发送异常:" + err.Error())
  338. _ = global.Log4J.Error("消息发送异常:" + err.Error())
  339. }
  340. }
  341. time.Sleep(time.Millisecond * 5)
  342. }
  343. })
  344. }
  345. }
  346. func byteString2ByteArray(byteString string) []int {
  347. //var byteArr [len(byteString)]byte
  348. byteArr := make([]int, len(byteString)/2)
  349. for i := 0; i < 18; i++ {
  350. subStr := byteString[i*2 : i*2+2]
  351. decodeString, err := hex.DecodeString(subStr)
  352. if err != nil {
  353. }
  354. byteArr[i] = int(decodeString[0])
  355. }
  356. return byteArr
  357. }
  358. func transMac(mac string) string {
  359. result := ""
  360. for i := 0; i < len(mac); i = i + 2 {
  361. result = result + mac[i:i+2] + ":"
  362. }
  363. return result[0 : len(result)-1]
  364. }