import asyncio import datetime import os import websockets import uuid import config from loguru import logger import json from algo.car_cf import car_classification from algo.car_direction import car_dircetion from algo.class_direction import class_direction logger.add(config.log_path, rotation="50 MB", encoding='utf-8') XLS_TYPE = 'xls' XLSX_TYPE = 'xlsx' TXT_TYPE ='txt' WAV_TYPE = 'wav' JSON_TYPE ='json' NPY_TYPE = 'npy' handlers = { XLS_TYPE : 'xls', XLSX_TYPE : 'xlsx', TXT_TYPE : 'txt', WAV_TYPE : 'wav', JSON_TYPE :'json', NPY_TYPE : 'npy' } CLOSE_TYPE = 'close' ERROR_TYPE = 'error' CONNECT_TYPE = 'connect' CAR_CLASS = 'car_classification' CAR_DIREC = 'car_direction' CLASS_DIREC = 'class_direc' class MessageOperation: def __init__(self, uid: str): self._uid = uid async def handle(self, msg: str) -> str: try: ask = json.loads(msg) self.msg_type = ask['msg_type'] self.positive_dir = ask['positive_dir'] self.negativa_dir = ask['negativa_dir'] self.task_type = ask['task_type'] self.record_id = ask['recordId'] self.msg_content = ask['content'] if self.task_type == CAR_CLASS: if self.msg_type in handlers: self.response_status,self.response_car_content,self.response_direction_content = car_classification( self.msg_content) elif self.msg_type == CLOSE_TYPE: self.response_status = True self.response_car_content = "close success" self.response_direction_content = 'close success' else: self.response_status = False self.response_car_content = "无法识别" self.response_direction_content = '无法识别' # self.record_id = "null" elif self.task_type == CAR_DIREC: if self.msg_type in handlers: self.response_status, self.response_direction_content, self.response_car_content = car_dircetion( self.msg_content,self.positive_dir,self.negativa_dir) elif self.msg_type == CLOSE_TYPE: self.response_status = True self.response_direction_content = "close success" self.response_car_content = "close success" else: self.response_status = False self.response_direction_content = "无法识别" self.response_car_content = "无法识别" elif self.task_type == CLASS_DIREC: if self.msg_type in handlers: self.response_status, self.response_car_content,self.response_direction_content = class_direction( self.msg_content,self.positive_dir,self.negativa_dir) elif self.msg_type == CLOSE_TYPE: self.response_status = True self.response_car_content = "close success" # self.response_direction_content = "close success" else: self.response_status = False self.response_car_content = "无法识别" # self.response_direction_content = "无法识别" except Exception as e: self.msg_type = ERROR_TYPE self.response_status = False self.response_content = str(e) def pack(self) -> str: return json.dumps({ 'Sender': self._uid, 'Recipient': 'server', 'recordId': self.record_id, 'status': self.response_status, 'msg_type': self.msg_type, 'car_content': self.response_car_content, 'direction_content': self.response_direction_content, 'time': str(datetime.datetime.now()) }) async def main(): uid = str(uuid.uuid4()) async with websockets.connect( f'{config.WS_URI}?uid={uid}&to_uid=server') as websocket: pid = os.getpid() logger.info(f'uid: {uid}') logger.info(f'pid: {pid}') greeting = json.dumps({ 'status': True, 'msg_type': CONNECT_TYPE, 'pid': pid, 'car_content': '', 'direction_content': '', 'time': str(datetime.datetime.now()) }) logger.debug(f">>> {greeting}") await websocket.send(greeting) await websocket.recv() while True: ask = await websocket.recv() logger.debug(f"<<< {ask}") msg_operation = MessageOperation(uid) await msg_operation.handle(ask) await websocket.send(msg_operation.pack()) if msg_operation.msg_type == CLOSE_TYPE: break if __name__ == "__main__": logger.info('Starting...') asyncio.run(main()) logger.info('End')