auth.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. from config.logger import setup_logging
# Tag value attached to this module's log records via logger.bind(tag=TAG);
# it is the module's import name.
TAG = __name__
# Module-level logger returned by the project's setup_logging() helper.
logger = setup_logging()
  4. class AuthenticationError(Exception):
  5. pass
  6. class AuthMiddleware:
  7. def __init__(self,config):
  8. self.config = config
  9. self.auth_config = config["server"].get("auth",{})
  10. # 构建token查询表
  11. self.tokens = {
  12. item["token"]: item["name"]
  13. for item in self.auth_config.get("tokens",[])
  14. }
  15. # 设备白名单
  16. self.allowed_devices = set(
  17. self.auth_config.get("allowed_devices",[])
  18. )
  19. async def authenticate(self, headers: dict):
  20. """
  21. 验证请求头中的token
  22. Args:
  23. headers (dict): 请求头
  24. Returns:
  25. str: 用户名
  26. """
  27. if not self.auth_config.get("enabled",False):
  28. return True
  29. # 检查设备是否在白名单中
  30. device_id = headers.get("device_id","")
  31. if self.allowed_devices and device_id not in self.allowed_devices:
  32. return True
  33. # 验证Authorization header
  34. auth_header = headers.get("Authorization","")
  35. if not auth_header.startswith("Bearer "):
  36. logger.bind(tag=TAG).error("Missing or invalid Authorization header")
  37. raise AuthenticationError("Missing or invalid Authorization header")
  38. token = auth_header.split(" ")[1]
  39. if token not in self.tokens:
  40. logger.bind(tag=TAG).error(f"Invalid token: {token}")
  41. raise AuthenticationError(f"Invalid token: {token}")
  42. logger.bind(tag=TAG).info(f"Authentication successful - Device: {device_id}, Token: {self.tokens[token]}")
  43. return True
  44. def get_token_name(self, token: str) -> str:
  45. """
  46. 获取token对应的名称
  47. Args:
  48. token (str): 令牌
  49. Returns:
  50. str: 令牌名称
  51. """
  52. return self.tokens.get(token, "Unknown")