Spaces:
Sleeping
Sleeping
| import os | |
| import secrets | |
| from typing import Annotated | |
| from fastapi.security import HTTPBasic, HTTPBasicCredentials | |
| from fastapi import status, Depends, Cookie | |
| from datetime import datetime, timedelta | |
| from collections import defaultdict | |
| from loguru import logger | |
| from dotenv import load_dotenv | |
| from models.exceptions import MyHTTPException | |
| load_dotenv() | |
| API_USER = os.getenv("API_USER") | |
| API_PWD = os.getenv("API_PWD") | |
| API_KEY = os.getenv("API_KEY") | |
| security = HTTPBasic() | |
| class AuthUsers: | |
| REQUESTS_LIMIT = 3 # 3 REQUESTS PER MINUTE | |
| AUTH_TIME = 1 # 1 MINUTES | |
| users: set | |
| users_auth: defaultdict | |
| def __init__(self): | |
| self.users = set() | |
| self.users_auth = defaultdict(list) | |
| def generate_user_token(self) -> str: | |
| return secrets.token_hex(16) | |
| def verify_user(self, user_token: str) -> bool: | |
| logger.info(f"Check user token: {user_token}") | |
| if user_token in self.users: | |
| print(self.users_auth[user_token]) | |
| if len(self.users_auth[user_token]) < self.REQUESTS_LIMIT: | |
| return True | |
| elif datetime.now() - self.users_auth[user_token][ | |
| -self.REQUESTS_LIMIT | |
| ] >= timedelta(minutes=self.AUTH_TIME): | |
| return True | |
| else: | |
| raise MyHTTPException( | |
| status_code=status.HTTP_429_TOO_MANY_REQUESTS, | |
| detail="Too many requests. Try again later.", | |
| ) | |
| logger.info(f"User {user_token} not found") | |
| raise MyHTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Unauthorized user", | |
| ) | |
| def add_user_session(self, user_token: str): | |
| if user_token in self.users: | |
| self.users_auth[user_token].append(datetime.now()) | |
| else: | |
| self.users.add(user_token) | |
| self.users_auth.setdefault(user_token, []).append(datetime.now()) | |
| print(self.users_auth[user_token]) | |
| while len(self.users_auth[user_token]) > self.REQUESTS_LIMIT: | |
| self.users_auth[user_token].pop(0) | |
| logger.info(f"User's {user_token} session added") | |
| def get_cookie_data( | |
| self, user_token: str = Cookie(default=None, alias="Authorization") | |
| ) -> str: | |
| if not user_token: | |
| user_token = self.generate_user_token() | |
| if user_token not in self.users: | |
| self.users.add(user_token) | |
| logger.info("Unauthorized user") | |
| logger.info(f"Verified user with token: {user_token}") | |
| return user_token | |
| credentials_exception = MyHTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Login or password is incorrect", | |
| headers={"WWW-Authenticate": "Basic"}, | |
| ) | |
| def check_api_credentials( | |
| credentials: Annotated[HTTPBasicCredentials, Depends(security)] | |
| ): | |
| current_username_bytes = credentials.username.encode("utf8") | |
| correct_username_bytes = bytes(API_USER, "utf-8") | |
| is_correct_username = secrets.compare_digest( | |
| current_username_bytes, correct_username_bytes | |
| ) | |
| current_password_bytes = credentials.password.encode("utf8") | |
| correct_password_bytes = bytes(API_PWD, "utf-8") | |
| is_correct_password = secrets.compare_digest( | |
| current_password_bytes, correct_password_bytes | |
| ) | |
| if not (is_correct_username and is_correct_password): | |
| raise credentials_exception | |
| return credentials.username | |