Spaces:
Sleeping
Sleeping
File size: 4,650 Bytes
b9669e1 c1d7a04 b9669e1 c1d7a04 b9669e1 c1d7a04 66536b4 b9669e1 66536b4 b9669e1 c1d7a04 b9669e1 c1d7a04 b9669e1 c1d7a04 b9669e1 c1d7a04 66536b4 c1d7a04 66536b4 c1d7a04 66536b4 c1d7a04 66536b4 c1d7a04 66536b4 b9669e1 66536b4 b9669e1 c1d7a04 b9669e1 c1d7a04 b9669e1 c1d7a04 b9669e1 66536b4 b9669e1 1bffbb8 b9669e1 c1d7a04 b9669e1 c1d7a04 b9669e1 1bffbb8 b9669e1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
import uuid
from src.auth.utils import (
# send_otp_email,
verify_password,
create_refresh_token,
verify_verification_token,
create_access_token,
hash_password,
create_verification_token,
)
from src.core.models import Users
from sqlmodel import Session, select
from fastapi import HTTPException
from sqlmodel.ext.asyncio.session import AsyncSession
async def create_user(session: AsyncSession, name: str, email: str, password: str):
    """Register a new Yuvabe account and issue its first token pair.

    The account is stored as already verified; no verification email is sent.

    Args:
        session: Async database session used for the lookup and the insert.
        name: Display name stored as ``user_name``.
        email: Address stored as ``email_id``; must end in ``@yuvabe.com``.
        password: Plain-text password; only its hash is stored.

    Returns:
        A dict with a confirmation message, the new user's id as a string,
        and the issued access and refresh tokens.

    Raises:
        HTTPException: 400 when the address is not a ``@yuvabe.com`` one.
        ValueError: When a user with exactly this email is already stored.
    """
    # Only company addresses may register; the suffix check ignores case.
    is_company_address = email.lower().endswith("@yuvabe.com")
    if not is_company_address:
        raise HTTPException(status_code=400, detail="Enter you're Yuvabe email ID")

    # Reject duplicates before attempting the insert.
    lookup = select(Users).where(Users.email_id == email)
    if (await session.exec(lookup)).first():
        raise ValueError("User already exists")

    account = Users(
        user_name=name,
        email_id=email,
        password=hash_password(password),
        is_verified=True,
    )
    session.add(account)
    await session.commit()
    # Reload so generated/expired columns (e.g. the id) are populated.
    await session.refresh(account)

    def token_claims():
        # A fresh dict per call, so each token helper gets its own payload.
        return {
            "sub": str(account.id),
            "name": account.user_name,
            "email": account.email_id,
        }

    issued_access = create_access_token(data=token_claims())
    issued_refresh = create_refresh_token(data=token_claims())

    return {
        "message": "User created successfully",
        "user_id": str(account.id),
        "access_token": issued_access,
        "refresh_token": issued_refresh,
    }
# async def send_verification_link(session: Session, email: str):
# """Send verification email for an existing user."""
# result = await session.exec(select(Users).where(Users.email_id == email))
# user = result.first()
# if not user:
# raise HTTPException(status_code=404, detail="User not found")
# if user.is_verified:
# raise HTTPException(status_code=400, detail="User is already verified")
# # Create a token using existing user ID (opaque token)
# token = create_verification_token(str(user.id))
# try:
# send_verification_email(email, token)
# except Exception as e:
# raise HTTPException(
# status_code=500, detail=f"Failed to send verification email: {str(e)}"
# )
# return {
# "message": "Verification link sent successfully",
# "user_id": str(user.id),
# "email": user.email_id,
# }
async def verify_email(session: AsyncSession, token: str):
    """Mark the user named by a verification token as verified and issue tokens.

    Args:
        session: Async database session used to load and update the user.
        token: Verification token to validate.

    Returns:
        A dict with a success message, access and refresh tokens, and
        ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 400 when the token is rejected or does not carry a
            well-formed user id; 404 when no user matches that id.
    """
    try:
        user_id = await verify_verification_token(token)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    # A malformed id is a client error, not a server crash.
    try:
        user_uuid = uuid.UUID(str(user_id))
    except ValueError as e:
        raise HTTPException(
            status_code=400, detail="Invalid verification token"
        ) from e

    user = await session.get(Users, user_uuid)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    if not user.is_verified:
        user.is_verified = True
        await session.commit()
        # commit() may expire loaded attributes; reload explicitly so the
        # attribute reads below never trigger implicit lazy-load I/O, which
        # an async session cannot perform.
        await session.refresh(user)

    claims = {"sub": str(user.id), "name": user.user_name, "email": user.email_id}
    # Each helper receives its own copy of the payload.
    access_token = create_access_token(data=dict(claims))
    refresh_token = create_refresh_token(data=dict(claims))

    return {
        "message": "Email verified successfully!",
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
    }
async def login_user(session: AsyncSession, email: str, password: str):
    """Authenticate a user by email and password and issue a token pair.

    Args:
        session: Async database session used to look the user up.
        email: Login address; must end in ``@yuvabe.com`` (case-insensitive).
        password: Plain-text password checked against the stored hash.

    Returns:
        A dict with access and refresh tokens, ``token_type`` set to
        ``"bearer"``, and a ``user`` summary (id, name, email, is_verified).

    Raises:
        HTTPException: 400 when the address is not a ``@yuvabe.com`` one,
            when the email is unknown or the password does not match (both
            report the same message), or when the account is not verified.
    """
    if not email.lower().endswith("@yuvabe.com"):
        raise HTTPException(status_code=400, detail="Enter you're Yuvabe email ID")

    result = await session.exec(select(Users).where(Users.email_id == email))
    user = result.first()

    # Unknown email and wrong password share one message so the response
    # does not reveal which of the two was wrong.
    if not user:
        raise HTTPException(status_code=400, detail="Invalid email or password")
    if not verify_password(password, user.password):
        raise HTTPException(status_code=400, detail="Invalid email or password")

    if not user.is_verified:
        raise HTTPException(status_code=400, detail="Verify email to login")

    claims = {"sub": str(user.id), "name": user.user_name, "email": user.email_id}
    # Each helper receives its own copy of the payload.
    access_token = create_access_token(data=dict(claims))
    refresh_token = create_refresh_token(data=dict(claims))

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
        "user": {
            "id": str(user.id),
            "name": user.user_name,
            "email": user.email_id,
            "is_verified": user.is_verified,
        },
    }
|