55 lines
1.5 KiB
Python
55 lines
1.5 KiB
Python
from datetime import timedelta
|
|
import uuid as uuid_pkg
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from core.config import settings
|
|
from modules.auth.models import User
|
|
from modules.auth.security import (
|
|
authenticate_user,
|
|
create_access_token,
|
|
create_refresh_token,
|
|
hash_password,
|
|
)
|
|
from modules.auth.schemas import UserRole
|
|
from tests.conftest import fake
|
|
from typing import Optional # Import Optional
|
|
|
|
|
|
def create_user(
|
|
db: Session, is_admin: bool = False, username: Optional[str] = None
|
|
) -> User:
|
|
unhashed_password = fake.password()
|
|
_user = User(
|
|
name=fake.name(),
|
|
username=username or fake.user_name(), # Use provided username or generate one
|
|
hashed_password=hash_password(unhashed_password),
|
|
uuid=uuid_pkg.uuid4(),
|
|
role=UserRole.ADMIN if is_admin else UserRole.USER,
|
|
)
|
|
|
|
db.add(_user)
|
|
db.commit()
|
|
db.refresh(_user)
|
|
return _user, unhashed_password # return for testing
|
|
|
|
|
|
def login(db: Session, username: str, password: str) -> str:
|
|
user = authenticate_user(username, password, db)
|
|
if not user:
|
|
raise Exception("Incorrect username or password")
|
|
|
|
access_token = create_access_token(
|
|
data={"sub": user.username},
|
|
expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES),
|
|
)
|
|
refresh_token = create_refresh_token(data={"sub": user.username})
|
|
|
|
max_age = settings.REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60
|
|
|
|
return {
|
|
"access_token": access_token,
|
|
"refresh_token": refresh_token,
|
|
"max_age": max_age,
|
|
}
|