PKDTO fastapi_users/__init__.py"""Ready-to-use and customizable users management for FastAPI.""" __version__ = "0.1.0" from fastapi_users.fastapi_users import FastAPIUsers # noqa: F401 from fastapi_users.models import BaseUser # noqa: F401 PKDTOcX..fastapi_users/fastapi_users.pyfrom typing import Any, Callable, Type from fastapi import APIRouter from fastapi_users.authentication import BaseAuthentication from fastapi_users.db import BaseUserDatabase from fastapi_users.models import BaseUser, BaseUserDB from fastapi_users.router import get_user_router class FastAPIUsers: """ Main object that ties together the component for users authentication. :param db: Database adapter instance. :param auth: Authentication logic instance. :param user_model: Pydantic model of a user. :param on_after_forgot_password: Hook called after a forgot password request. :param reset_password_token_secret: Secret to encode reset password token. :param reset_password_token_lifetime_seconds: Lifetime of reset password token. :attribute router: FastAPI router exposing authentication routes. :attribute get_current_user: Dependency callable to inject authenticated user. """ db: BaseUserDatabase auth: BaseAuthentication router: APIRouter get_current_user: Callable[..., BaseUserDB] def __init__( self, db: BaseUserDatabase, auth: BaseAuthentication, user_model: Type[BaseUser], on_after_forgot_password: Callable[[BaseUserDB, str], Any], reset_password_token_secret: str, reset_password_token_lifetime_seconds: int = 3600, ): self.db = db self.auth = auth self.router = get_user_router( self.db, user_model, self.auth, on_after_forgot_password, reset_password_token_secret, reset_password_token_lifetime_seconds, ) get_current_user = self.auth.get_current_user(self.db) self.get_current_user = get_current_user # type: ignore get_current_active_user = self.auth.get_current_active_user(self.db) self.get_current_active_user = get_current_active_user # type: ignore get_current_superuser = self.auth.get_current_superuser(self.db) self.get_current_superuser = get_current_superuser # type: ignore PKDTOfastapi_users/models.pyimport uuid from typing import Optional, Type import pydantic from pydantic import BaseModel from pydantic.types import EmailStr class BaseUser(BaseModel): """Base User model.""" id: Optional[str] = None email: Optional[EmailStr] = None is_active: Optional[bool] = True is_superuser: Optional[bool] = False @pydantic.validator("id", pre=True, always=True) def default_id(cls, v): return v or str(uuid.uuid4()) def create_update_dict(self): return self.dict( skip_defaults=True, exclude={"id", "is_superuser", "is_active"} ) class BaseUserCreate(BaseUser): email: EmailStr password: str class BaseUserUpdate(BaseUser): password: Optional[str] class BaseUserDB(BaseUser): hashed_password: str class Models: """Generate models inheriting from the custom User model.""" def __init__(self, user_model: Type[BaseUser]): class UserCreate(user_model, BaseUserCreate): # type: ignore pass class UserUpdate(user_model, BaseUserUpdate): # type: ignore pass class UserDB(user_model, BaseUserDB): # type: ignore pass self.User = user_model self.UserCreate = UserCreate self.UserUpdate = UserUpdate self.UserDB = UserDB PKDTO Hfastapi_users/password.pyfrom typing import Tuple from passlib.context import CryptContext pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def verify_and_update_password( plain_password: str, hashed_password: str ) -> Tuple[bool, str]: return pwd_context.verify_and_update(plain_password, hashed_password) def get_password_hash(password: str) -> str: return pwd_context.hash(password) PKDTO BBfastapi_users/router.pyimport asyncio from typing import Any, Callable, Type import jwt from fastapi import APIRouter, Body, Depends, HTTPException from fastapi.security import OAuth2PasswordRequestForm from pydantic.types import EmailStr from starlette import status from starlette.responses import Response from fastapi_users.authentication import BaseAuthentication from fastapi_users.db import BaseUserDatabase from fastapi_users.models import BaseUser, BaseUserDB, Models from fastapi_users.password import get_password_hash from fastapi_users.utils import JWT_ALGORITHM, generate_jwt def get_user_router( user_db: BaseUserDatabase, user_model: Type[BaseUser], auth: BaseAuthentication, on_after_forgot_password: Callable[[BaseUserDB, str], Any], reset_password_token_secret: str, reset_password_token_lifetime_seconds: int = 3600, ) -> APIRouter: """Generate a router with the authentication routes.""" router = APIRouter() models = Models(user_model) reset_password_token_audience = "fastapi-users:reset" is_on_after_forgot_password_async = asyncio.iscoroutinefunction( on_after_forgot_password ) get_current_active_user = auth.get_current_active_user(user_db) @router.post( "/register", response_model=models.User, status_code=status.HTTP_201_CREATED ) async def register(user: models.UserCreate): # type: ignore existing_user = await user_db.get_by_email(user.email) if existing_user is not None: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) hashed_password = get_password_hash(user.password) db_user = models.UserDB( **user.create_update_dict(), hashed_password=hashed_password ) created_user = await user_db.create(db_user) return created_user @router.post("/login") async def login( response: Response, credentials: OAuth2PasswordRequestForm = Depends() ): user = await user_db.authenticate(credentials) if user is None: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) elif not user.is_active: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) return await auth.get_login_response(user, response) @router.post("/forgot-password", status_code=status.HTTP_202_ACCEPTED) async def forgot_password(email: EmailStr = Body(..., embed=True)): user = await user_db.get_by_email(email) if user is not None and user.is_active: token_data = {"user_id": user.id, "aud": reset_password_token_audience} token = generate_jwt( token_data, reset_password_token_lifetime_seconds, reset_password_token_secret, ) if is_on_after_forgot_password_async: await on_after_forgot_password(user, token) else: on_after_forgot_password(user, token) return None @router.post("/reset-password") async def reset_password(token: str = Body(...), password: str = Body(...)): try: data = jwt.decode( token, reset_password_token_secret, audience=reset_password_token_audience, algorithms=[JWT_ALGORITHM], ) user_id = data.get("user_id") if user_id is None: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) user = await user_db.get(user_id) if user is None or not user.is_active: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) user.hashed_password = get_password_hash(password) await user_db.update(user) except jwt.PyJWTError: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) @router.get("/me", response_model=models.User) async def me( user: models.UserDB = Depends(get_current_active_user) # type: ignore ): return user @router.patch("/me", response_model=models.User) async def update_me( updated_user: models.UserUpdate, # type: ignore user: models.UserDB = Depends(get_current_active_user), # type: ignore ): updated_user_data = updated_user.create_update_dict() for field in updated_user_data: if field == "password": hashed_password = get_password_hash(updated_user_data[field]) user.hashed_password = hashed_password else: setattr(user, field, updated_user_data[field]) return await user_db.update(user) return router PKDTO1fastapi_users/utils.pyfrom datetime import datetime, timedelta import jwt JWT_ALGORITHM = "HS256" def generate_jwt( data: dict, lifetime_seconds: int, secret: str, algorithm: str = JWT_ALGORITHM ) -> str: payload = data.copy() expire = datetime.utcnow() + timedelta(seconds=lifetime_seconds) payload["exp"] = expire return jwt.encode(payload, secret, algorithm=algorithm).decode("utf-8") PKDTOǜ(fastapi_users/authentication/__init__.pyfrom fastapi_users.authentication.base import BaseAuthentication # noqa: F401 from fastapi_users.authentication.jwt import JWTAuthentication # noqa: F401 PKDTO \dd$fastapi_users/authentication/base.pyfrom typing import Callable from fastapi import HTTPException from starlette import status from starlette.responses import Response from fastapi_users.db import BaseUserDatabase from fastapi_users.models import BaseUserDB credentials_exception = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) class BaseAuthentication: """ Base adapter for generating and decoding authentication tokens. Provides dependency injectors to get current active/superuser user. """ async def get_login_response(self, user: BaseUserDB, response: Response): raise NotImplementedError() def get_current_user(self, user_db: BaseUserDatabase): raise NotImplementedError() def get_current_active_user(self, user_db: BaseUserDatabase): raise NotImplementedError() def get_current_superuser(self, user_db: BaseUserDatabase): raise NotImplementedError() def _get_authentication_method( self, user_db: BaseUserDatabase ) -> Callable[..., BaseUserDB]: raise NotImplementedError() def _get_current_user_base(self, user: BaseUserDB) -> BaseUserDB: if user is None: raise self._get_credentials_exception() return user def _get_current_active_user_base(self, user: BaseUserDB) -> BaseUserDB: user = self._get_current_user_base(user) if not user.is_active: raise self._get_credentials_exception() return user def _get_current_superuser_base(self, user: BaseUserDB) -> BaseUserDB: user = self._get_current_active_user_base(user) if not user.is_superuser: raise self._get_credentials_exception(status.HTTP_403_FORBIDDEN) return user def _get_credentials_exception( self, status_code: int = status.HTTP_401_UNAUTHORIZED ) -> HTTPException: return HTTPException(status_code=status_code) PKDTO@"z z #fastapi_users/authentication/jwt.pyimport jwt from fastapi import Depends from fastapi.security import OAuth2PasswordBearer from starlette.responses import Response from fastapi_users.authentication.base import BaseAuthentication from fastapi_users.db.base import BaseUserDatabase from fastapi_users.models import BaseUserDB from fastapi_users.utils import JWT_ALGORITHM, generate_jwt oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login") class JWTAuthentication(BaseAuthentication): """ Authentication using a JWT. :param secret: Secret used to encode the JWT. :param lifetime_seconds: Lifetime duration of the JWT in seconds. """ token_audience: str = "fastapi-users:auth" secret: str lifetime_seconds: int def __init__(self, secret: str, lifetime_seconds: int): self.secret = secret self.lifetime_seconds = lifetime_seconds async def get_login_response(self, user: BaseUserDB, response: Response): data = {"user_id": user.id, "aud": self.token_audience} token = generate_jwt(data, self.lifetime_seconds, self.secret, JWT_ALGORITHM) return {"token": token} def get_current_user(self, user_db: BaseUserDatabase): async def _get_current_user(token: str = Depends(oauth2_scheme)): user = await self._get_authentication_method(user_db)(token) return self._get_current_user_base(user) return _get_current_user def get_current_active_user(self, user_db: BaseUserDatabase): async def _get_current_active_user(token: str = Depends(oauth2_scheme)): user = await self._get_authentication_method(user_db)(token) return self._get_current_active_user_base(user) return _get_current_active_user def get_current_superuser(self, user_db: BaseUserDatabase): async def _get_current_superuser(token: str = Depends(oauth2_scheme)): user = await self._get_authentication_method(user_db)(token) return self._get_current_superuser_base(user) return _get_current_superuser def _get_authentication_method(self, user_db: BaseUserDatabase): async def authentication_method(token: str = Depends(oauth2_scheme)): try: data = jwt.decode( token, self.secret, audience=self.token_audience, algorithms=[JWT_ALGORITHM], ) user_id = data.get("user_id") if user_id is None: return None except jwt.PyJWTError: return None return await user_db.get(user_id) return authentication_method PKDTOyyfastapi_users/db/__init__.pyfrom fastapi_users.db.base import BaseUserDatabase # noqa: F401 from fastapi_users.db.sqlalchemy import ( # noqa: F401 SQLAlchemyBaseUserTable, SQLAlchemyUserDatabase, ) PKDTOfastapi_users/db/base.pyfrom typing import List, Optional from fastapi.security import OAuth2PasswordRequestForm from fastapi_users import password from fastapi_users.models import BaseUserDB class BaseUserDatabase: """Base adapter for retrieving, creating and updating users from a database.""" async def list(self) -> List[BaseUserDB]: """List all users.""" raise NotImplementedError() async def get(self, id: str) -> Optional[BaseUserDB]: """Get a single user by id.""" raise NotImplementedError() async def get_by_email(self, email: str) -> Optional[BaseUserDB]: """Get a single user by email.""" raise NotImplementedError() async def create(self, user: BaseUserDB) -> BaseUserDB: """Create a user.""" raise NotImplementedError() async def update(self, user: BaseUserDB) -> BaseUserDB: """Update a user.""" raise NotImplementedError() async def authenticate( self, credentials: OAuth2PasswordRequestForm ) -> Optional[BaseUserDB]: """ Authenticate and return a user following an email and a password. Will automatically upgrade password hash if necessary. """ user = await self.get_by_email(credentials.username) # Always run the hasher to mitigate timing attack # Inspired from Django: https://code.djangoproject.com/ticket/20760 password.get_password_hash(credentials.password) if user is None: return None else: verified, updated_password_hash = password.verify_and_update_password( credentials.password, user.hashed_password ) if not verified: return None # Update password hash to a more robust one if needed if updated_password_hash is not None: user.hashed_password = updated_password_hash await self.update(user) return user PKDTO'fastapi_users/db/sqlalchemy.pyfrom typing import List, cast from databases import Database from sqlalchemy import Boolean, Column, String, Table from fastapi_users.db.base import BaseUserDatabase from fastapi_users.models import BaseUserDB class SQLAlchemyBaseUserTable: """Base SQLAlchemy users table definition.""" __tablename__ = "user" id = Column(String, primary_key=True) email = Column(String, unique=True, index=True, nullable=False) hashed_password = Column(String, nullable=False) is_active = Column(Boolean, default=True, nullable=False) is_superuser = Column(Boolean, default=False, nullable=False) class SQLAlchemyUserDatabase(BaseUserDatabase): """ Database adapter for SQLAlchemy. :param database: `Database` instance from `encode/databases`. :param users: SQLAlchemy users table instance. """ database: Database users: Table def __init__(self, database: Database, users: Table): self.database = database self.users = users async def list(self) -> List[BaseUserDB]: query = self.users.select() return cast(List[BaseUserDB], await self.database.fetch_all(query)) async def get(self, id: str) -> BaseUserDB: query = self.users.select().where(self.users.c.id == id) return cast(BaseUserDB, await self.database.fetch_one(query)) async def get_by_email(self, email: str) -> BaseUserDB: query = self.users.select().where(self.users.c.email == email) return cast(BaseUserDB, await self.database.fetch_one(query)) async def create(self, user: BaseUserDB) -> BaseUserDB: query = self.users.insert().values(**user.dict()) await self.database.execute(query) return user async def update(self, user: BaseUserDB) -> BaseUserDB: query = ( self.users.update().where(self.users.c.id == user.id).values(**user.dict()) ) await self.database.execute(query) return user PKDTO=00%fastapi_users-0.1.0.dist-info/LICENSEMIT License Copyright (c) 2019 François Voron Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HPO#fastapi_users-0.1.0.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UrPK!H'M"!J &fastapi_users-0.1.0.dist-info/METADATAVn8}W̺6*)6 v׍hvsi0EIdIʩC!)9M̜N!11Ȕ^띑aA!3;޴s>\2Wڈ%QBN B{t<8"gģ>HÄ*pHq7a htu7.M*TޣŷQ(Q4}a>Kľ%\rŴQ!S Co4U\?Aϰ/&ĴocqWxGњ/8 %|z c)XؾV- [LHW!>3ٝP2UA[cK3ƀ2欠IG y )`u[[nU-[U0cم}2mw ~a8 pK hQUIsf_p- >h!G #v׏U&ҕpIqNv ƿ<GpZiA?.ښ =JpT $`I1pc;#`ʹ"wWւQbHD[M nW64dj =h'd_T0Ѿ0esLC"Zek/ c CoSͰWŤnvv(\Tبe}Iu^RU6 Y=>Hy^KE^R:`JNQh AHFT wvIqTaWVɽD:.{E]?հ*|I:P*sYI^KS WqPe$oM=g\Xx*Z[sw̼(>+QYQ:_Bѵ+]|6;!)0 !cӝ?)qU57 iŵ~pcI13k>>88L GO.C4-L׋5?'Ç^rЗ?ү=C0U`p5\/C_Inn >n8Z2tLKLl꿽zVï#:L(zj!R`9o?O >.ݧs9swKb ãr«PA0(V +j?=-n@V. Mnt$`a6sX(oÐ3{VkmQA~>"0⹜4)ܿχ6g࢚[ӍI] NC)*\z V|Y iq"M+(/"u&;ell +pr>rW/|Ke7-]Vݶ',uQnrO`jܮ@fgUmxUHpp ǝ] ǣ_*ů!j *b ~KEG7Rv$P3 cݿ&X+ʎPԯPKDTO fastapi_users/__init__.pyPKDTOcX.. fastapi_users/fastapi_users.pyPKDTOv fastapi_users/models.pyPKDTO Hfastapi_users/password.pyPKDTO BBfastapi_users/router.pyPKDTO1#fastapi_users/utils.pyPKDTOǜ($fastapi_users/authentication/__init__.pyPKDTO \dd$%fastapi_users/authentication/base.pyPKDTO@"z z #F-fastapi_users/authentication/jwt.pyPKDTOyy8fastapi_users/db/__init__.pyPKDTO8fastapi_users/db/base.pyPKDTO'@fastapi_users/db/sqlalchemy.pyPKDTO=00%Hfastapi_users-0.1.0.dist-info/LICENSEPK!HPO#2Mfastapi_users-0.1.0.dist-info/WHEELPK!H'M"!J &Mfastapi_users-0.1.0.dist-info/METADATAPK!Hú:$QRfastapi_users-0.1.0.dist-info/RECORDPKU