emq.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. package initialize
  2. import (
  3. gateway2 "confrontation-training/api/gateway"
  4. "confrontation-training/common"
  5. "confrontation-training/constant"
  6. "confrontation-training/global"
  7. "confrontation-training/models/emq"
  8. "confrontation-training/models/gateway"
  9. "encoding/hex"
  10. "encoding/json"
  11. "fmt"
  12. mqtt "github.com/eclipse/paho.mqtt.golang"
  13. "github.com/gorilla/websocket"
  14. "log"
  15. "strconv"
  16. "strings"
  17. "time"
  18. )
  19. func CreateEmqClient() {
  20. config := global.Config
  21. global.EmqConfig.ClientId = config.EmqConfig.ClientId
  22. global.EmqConfig.Qos = config.EmqConfig.Qos
  23. global.EmqConfig.Topic = config.EmqConfig.Topic
  24. global.EmqConfig.UserName = config.EmqConfig.UserName
  25. global.EmqConfig.Password = config.EmqConfig.Password
  26. global.EmqConfig.Protocol = config.EmqConfig.Protocol
  27. global.EmqConfig.Port = config.EmqConfig.Port
  28. global.EmqConfig.Broker = config.EmqConfig.Broker
  29. global.EmqConfig.Filter = config.EmqConfig.Filter
  30. global.EmqConfig.GatewayMac = config.EmqConfig.GatewayMac
  31. global.EmqConfig.FirstOpen = "0"
  32. emqConfig := global.EmqConfig
  33. connectAddress := fmt.Sprintf("%s://%s:%d", emqConfig.Protocol, emqConfig.Broker, emqConfig.Port)
  34. fmt.Println("connect address:", connectAddress)
  35. opts := mqtt.NewClientOptions()
  36. opts.AddBroker(connectAddress)
  37. opts.SetUsername(emqConfig.UserName)
  38. opts.SetPassword(emqConfig.Password)
  39. opts.SetClientID(emqConfig.ClientId)
  40. opts.SetKeepAlive(time.Second * 60)
  41. global.EmqClient = mqtt.NewClient(opts)
  42. client := global.EmqClient
  43. token := client.Connect()
  44. if token.WaitTimeout(time.Second*3) && token.Error() != nil {
  45. log.Fatal(token.Error())
  46. }
  47. SubScribe(client)
  48. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  49. SendUUID(global.EmqClient, topic)
  50. //添加过滤
  51. fmt.Println("添加设备过滤:")
  52. filterCmd := "[{\"cmd\":\"AT+FLT=\",\"total\":\"2\"}"
  53. for _, filter := range global.EmqConfig.Filter {
  54. filterCmd = filterCmd + ",{\"t\":\"0\",\"d\":\"" + filter + "\"}"
  55. }
  56. filterCmd = filterCmd + "]"
  57. fmt.Println(filterCmd)
  58. //filterCmd = "[{\"cmd\":\"AT+FLT=\",\"total\":\"2\"},{\"t\":\"0\",\"d\":\"MIND\"},{\"t\":\"0\",\"d\":\"BW-ECG\"}]"
  59. Publish(client, topic, filterCmd)
  60. //SendUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
  61. //Publish(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", constant.CmdStartScan)
  62. //SendUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
  63. //读取UUID
  64. //ReadUUID(client, "/EE3870DA24C4/connect_packet/connect1_subscribe")
  65. //连接设备
  66. //ConnectDevice(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", "DC0D30033AF0", "0", "0", "0")
  67. //fmt.Println("link device DC0D30033AF0 over")
  68. //ConnectDevice(client, "/EE3870DA24C4/connect_packet/connect1_subscribe", "DC0D300335FC", "0", "0", "0")
  69. }
  70. func Publish(client mqtt.Client, topic string, message string) {
  71. if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, message); token.Wait() && token.Error() != nil {
  72. fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, message)
  73. } else {
  74. fmt.Printf("publish success, topic: %s, payload: %s\n", topic, message)
  75. }
  76. //time.Sleep(time.Second * 1)
  77. time.Sleep(time.Millisecond * 100)
  78. }
  79. func SendUUID(client mqtt.Client, topic string) {
  80. if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, constant.CmdSetUUID); token.Wait() && token.Error() != nil {
  81. fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, constant.CmdSetUUID)
  82. } else {
  83. fmt.Printf("publish success, topic: %s, payload: %s\n", topic, constant.CmdSetUUID)
  84. }
  85. //time.Sleep(time.Second * 1)
  86. time.Sleep(time.Millisecond * 100)
  87. }
  88. func ConnectDevice(client mqtt.Client, topic string, mac string, index string, ai string, at string) {
  89. message := "[{\"cmd\":\"AT+CNN=\",\"m\":\"" + mac + "\",\"i\":\"" + index + "\",\"ai\":\"" + ai + "\",\"at\":\"" + at + "\",\"l\":\"1\",\"x\":\"251\",\"relink\":\"0\",\"timeout\":\"12000\"}]"
  90. if token := client.Publish(topic, byte(global.EmqConfig.Qos), false, message); token.Wait() && token.Error() != nil {
  91. fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, message)
  92. } else {
  93. fmt.Printf("publish success, topic: %s, payload: %s\n", topic, message)
  94. }
  95. //time.Sleep(time.Second * 1)
  96. time.Sleep(time.Millisecond * 100)
  97. }
  98. func SubScribe(client mqtt.Client) {
  99. emqConfig := global.EmqConfig
  100. ws, _, err := websocket.DefaultDialer.Dial(global.Config.Websocket.WSUrl, nil)
  101. if err != nil {
  102. fmt.Println("socket通道初始化失败")
  103. panic(err)
  104. return
  105. }
  106. deviceService := GetDeviceService()
  107. d, i := deviceService.FindDeviceByType("")
  108. global.DeviceMap = make(map[string]gateway.DeviceInfo)
  109. if i > 0 {
  110. for j := 0; j < len(d); j++ {
  111. global.DeviceMap[d[j].Mac] = d[j]
  112. }
  113. }
  114. for _, topic := range emqConfig.Topic {
  115. topic = "/" + global.EmqConfig.GatewayMac + topic
  116. client.Subscribe(topic, byte(emqConfig.Qos), func(client mqtt.Client, message mqtt.Message) {
  117. fmt.Printf("`%s` Received `%s` from `%s` topic\n", common.NowTime("2006-01-02 15:04:05"), message.Payload(), message.Topic())
  118. messageMap := make(map[string]string)
  119. var payloads []emq.Payload
  120. err := json.Unmarshal(message.Payload(), &payloads)
  121. if err != nil {
  122. _ = fmt.Errorf("%s", err.Error())
  123. return
  124. }
  125. for _, payload := range payloads {
  126. messageMap["state"] = payload.State
  127. content := ""
  128. sendFlag := true
  129. switch payload.Cmd {
  130. case constant.PayloadScan:
  131. {
  132. //关闭扫描
  133. if payload.S == "0" {
  134. messageMap["msgType"] = constant.MessageTypeCloseScan
  135. content = "关闭扫描"
  136. //开启扫描
  137. } else if payload.S == "1" {
  138. messageMap["msgType"] = constant.MessageTypeStartScan
  139. content = "开启扫描"
  140. } else if payload.S == "" { //接收到蓝牙扫描数据
  141. if strings.HasPrefix(payload.N, "MIND") {
  142. payload.T = "0"
  143. } else if strings.HasPrefix(payload.N, "BW-ECG") {
  144. payload.T = "1"
  145. }
  146. messageMap["msgType"] = constant.MessageTypeDeviceScanned
  147. marshal, _ := json.Marshal(payload)
  148. content = string(marshal)
  149. }
  150. }
  151. case constant.PayloadFLT:
  152. messageMap["msgType"] = constant.MessageTypeFilter
  153. content = "设置过滤"
  154. case constant.PayloadSUUID:
  155. messageMap["msgType"] = constant.MessageTypeSUUID
  156. content = "设置UUID"
  157. if payload.State == "SUCCESS" {
  158. global.EmqConfig.FirstOpen = "1"
  159. } else {
  160. global.EmqConfig.FirstOpen = "0"
  161. }
  162. case constant.PayloadConnect:
  163. messageMap["msgType"] = constant.MessageTypeConnect
  164. //content = "设备连接"
  165. realMac := transMac(payload.M)
  166. deviceInfo := global.DeviceMap[realMac]
  167. payload.T = deviceInfo.Type
  168. marshal, _ := json.Marshal(payload)
  169. content = string(marshal)
  170. //连接成功或连接失败,发送已连接设备列表
  171. for i := 0; i < 6; i++ {
  172. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  173. gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
  174. }
  175. case constant.PayloadDisConnect:
  176. messageMap["msgType"] = constant.MessageTypeDisConnect
  177. //content = "断开设备连接"
  178. if payload.State == "SUCCESS" && payload.ReasonCode == "62" {
  179. messageMap["msgType"] = constant.MessageTypeConnect
  180. }
  181. realMac := transMac(payload.M)
  182. deviceInfo := global.DeviceMap[realMac]
  183. payload.T = deviceInfo.Type
  184. marshal, _ := json.Marshal(payload)
  185. content = string(marshal)
  186. //断开连接成功或连接失败,发送已连接设备列表
  187. for i := 0; i < 6; i++ {
  188. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  189. gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
  190. }
  191. case constant.PayloadQuenue:
  192. messageMap["msgType"] = constant.MessageTypeConnectList
  193. if payload.M == "" {
  194. sendFlag = false
  195. } else {
  196. realMac := transMac(payload.M)
  197. deviceInfo := global.DeviceMap[realMac]
  198. if deviceInfo.ID == 0 {
  199. continue
  200. } else {
  201. payload.T = deviceInfo.Type
  202. payload.N = deviceInfo.Name
  203. marshal, _ := json.Marshal(payload)
  204. content = string(marshal)
  205. }
  206. }
  207. case constant.PayloadCNB:
  208. connectedNum, _ := strconv.Atoi(payload.N)
  209. for i := 0; i < connectedNum; i++ {
  210. topic := "/" + global.Config.EmqConfig.GatewayMac + constant.TopicConnectSub
  211. gateway2.ConnectedListEmq(global.EmqClient, topic, strconv.Itoa(i))
  212. }
  213. case constant.PayloadWNP:
  214. messageMap["msgType"] = constant.MessageTypeWNP
  215. realMac := transMac(payload.M)
  216. deviceInfo := global.DeviceMap[realMac]
  217. payload.T = deviceInfo.Type
  218. payload.N = deviceInfo.Name
  219. marshal, _ := json.Marshal(payload)
  220. content = string(marshal)
  221. case constant.PayloadNotify: //接收到蓝牙通知数据
  222. //1.判断是脑电数据还是心电数据
  223. //1.1脑电数据直接转发
  224. //1.2心电数据做处理
  225. realMac := transMac(payload.M)
  226. deviceInfo := global.DeviceMap[realMac]
  227. var receiveData gateway.DeviceDataReceived
  228. receiveData.Mac = payload.M
  229. receiveData.Value = payload.D
  230. if deviceInfo.Type == "0" { //脑电
  231. messageMap["msgType"] = constant.MessageTypeEEGData
  232. marshal, _ := json.Marshal(receiveData)
  233. content = string(marshal)
  234. } else if deviceInfo.Type == "1" { //心电
  235. messageMap["msgType"] = constant.MessageTypeECGData
  236. flag := strings.HasPrefix(receiveData.Value, "E840") || strings.HasPrefix(receiveData.Value, "E841") || strings.HasPrefix(receiveData.Value, "E823") || strings.HasPrefix(receiveData.Value, "E820") || strings.HasPrefix(receiveData.Value, "E81F") || strings.HasPrefix(receiveData.Value, "E813") || strings.HasPrefix(receiveData.Value, "E810") || strings.HasPrefix(receiveData.Value, "E822") || strings.HasPrefix(receiveData.Value, "E826") || strings.HasPrefix(receiveData.Value, "E8FF00000000")
  237. if !flag {
  238. //var ecgData []int
  239. ecgData := [6]int{}
  240. data := byteString2ByteArray(receiveData.Value[4:])
  241. if len(data) == 18 {
  242. for i := 0; i < 18; i += 3 {
  243. ecgData[i/3] = data[i]<<16 | data[i+1]<<8 | data[i+2]
  244. }
  245. } else {
  246. for len(data) < 18 {
  247. for i := len(data); i < 18; i++ {
  248. data = append(data, data[i])
  249. }
  250. }
  251. for i := 0; i < 18; i += 3 {
  252. ecgData[i/3] = int(16<<data[i]&0xFF | 8<<data[i+1]&0xFF | data[i+2]&0xFF)
  253. }
  254. }
  255. messageMap["msgType"] = constant.MessageTypeECGData
  256. receiveData.Value = "["
  257. for _, datum := range ecgData {
  258. receiveData.Value = receiveData.Value + strconv.Itoa(datum) + ","
  259. }
  260. receiveData.Value = strings.TrimRight(receiveData.Value, ",")
  261. receiveData.Value = receiveData.Value + "]"
  262. marshal, _ := json.Marshal(receiveData)
  263. content = string(marshal)
  264. }
  265. }
  266. }
  267. if messageMap["msgType"] != constant.MessageTypeECGData && messageMap["msgType"] != constant.MessageTypeEEGData && messageMap["msgType"] != constant.MessageTypeDeviceScanned && messageMap["msgType"] != constant.MessageTypeConnectList && messageMap["msgType"] != constant.MessageTypeDisConnect && messageMap["msgType"] != constant.MessageTypeConnect && messageMap["msgType"] != constant.MessageTypeWNP {
  268. if payload.State == constant.Success {
  269. content = content + "成功"
  270. } else {
  271. content = content + "失败"
  272. }
  273. }
  274. messageMap["content"] = content
  275. messageMap["Sender"] = "server"
  276. messageMap["Recipient"] = "client"
  277. bytes, err := json.Marshal(messageMap)
  278. if err != nil {
  279. return
  280. }
  281. if sendFlag && content != "成功" && content != "失败" {
  282. if messageMap["msgType"] == constant.MessageTypeECGData {
  283. fmt.Printf("发送心电数据===" + common.NowTime("2006-01-02 15:04:05") + "=====" + string(bytes))
  284. }
  285. for k, _ := range messageMap {
  286. delete(messageMap, k)
  287. }
  288. err = ws.WriteMessage(websocket.TextMessage, bytes)
  289. if err != nil {
  290. fmt.Println("消息发送异常:" + err.Error())
  291. }
  292. }
  293. time.Sleep(time.Millisecond * 5)
  294. }
  295. })
  296. }
  297. }
  298. func byteString2ByteArray(byteString string) []int {
  299. //var byteArr [len(byteString)]byte
  300. byteArr := make([]int, len(byteString)/2)
  301. for i := 0; i < 18; i++ {
  302. subStr := byteString[i*2 : i*2+2]
  303. decodeString, err := hex.DecodeString(subStr)
  304. if err != nil {
  305. }
  306. byteArr[i] = int(decodeString[0])
  307. }
  308. return byteArr
  309. }
  310. func transMac(mac string) string {
  311. result := ""
  312. for i := 0; i < len(mac); i = i + 2 {
  313. result = result + mac[i:i+2] + ":"
  314. }
  315. return result[0 : len(result)-1]
  316. }