12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667 |
- from config.logger import setup_logging
- TAG = __name__  # module name; passed as the `tag` field in logger.bind(tag=TAG) calls below
- logger = setup_logging()  # module-level logger returned by config.logger.setup_logging
class AuthenticationError(Exception):
    """Raised by AuthMiddleware.authenticate when a request is rejected."""
class AuthMiddleware:
    """Authenticate requests using a device allowlist and bearer tokens.

    Settings are read from ``config["server"]["auth"]``:

    * ``enabled`` - when falsy or absent, every request is accepted.
    * ``tokens`` - list of ``{"token": ..., "name": ...}`` entries.
    * ``allowed_devices`` - device ids accepted without a token.
    """

    def __init__(self, config):
        """Build the token lookup table and the device allowlist.

        Args:
            config (dict): Configuration mapping. It must contain a
                ``"server"`` entry; the ``"auth"`` section inside it is
                optional and defaults to an empty dict.
        """
        self.config = config
        self.auth_config = config["server"].get("auth", {})
        # Token lookup table: token string -> configured name.
        self.tokens = {
            item["token"]: item["name"]
            for item in self.auth_config.get("tokens", [])
        }
        # Device allowlist, stored as a set for O(1) membership tests.
        self.allowed_devices = set(
            self.auth_config.get("allowed_devices", [])
        )

    async def authenticate(self, headers: dict) -> bool:
        """Validate a request against the allowlist and the known tokens.

        Args:
            headers (dict): Request headers. The ``device_id`` and
                ``Authorization`` entries are read; each defaults to ``""``
                when absent.

        Returns:
            bool: ``True`` when authentication is disabled, when the device
            is on the allowlist, or when the bearer token is configured.

        Raises:
            AuthenticationError: If the ``Authorization`` header does not
                start with ``"Bearer "`` or the token is not configured.
        """
        if not self.auth_config.get("enabled", False):
            return True

        # Allowlisted devices skip token verification. The membership test
        # must be `in`: with `not in`, every device that is NOT on the
        # allowlist would bypass authentication.
        device_id = headers.get("device_id", "")
        if self.allowed_devices and device_id in self.allowed_devices:
            return True

        # Validate the Authorization header.
        auth_header = headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            logger.bind(tag=TAG).error("Missing or invalid Authorization header")
            raise AuthenticationError("Missing or invalid Authorization header")

        # The startswith check above guarantees at least two parts here.
        token = auth_header.split(" ")[1]
        if token not in self.tokens:
            # NOTE(review): the rejected token value ends up in the log and
            # in the exception message - consider redacting it.
            logger.bind(tag=TAG).error(f"Invalid token: {token}")
            raise AuthenticationError(f"Invalid token: {token}")

        logger.bind(tag=TAG).info(
            f"Authentication successful - Device: {device_id}, Token: {self.tokens[token]}"
        )
        return True

    def get_token_name(self, token: str) -> str:
        """Return the name configured for a token.

        Args:
            token (str): Token value to look up.

        Returns:
            str: The configured name, or ``"Unknown"`` when the token is
            not in the lookup table.
        """
        return self.tokens.get(token, "Unknown")
|