PK\O6fastapi_permissions/__init__.py""" Row Level Permissions for FastAPI This module provides an implementation for row level permissions for the FastAPI framework. This is heavily inspired / ripped off the Pyramids Web Framework, so all cudos to them! extremely simple and incomplete example: from fastapi import Depends, FastAPI from fastapi.security import OAuth2PasswordBearer from fastapi_permissions import configure_permissions, Allow, Deny from pydantic import BaseModel app = FastAPI() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token") class Item(BaseModel): name: str owner: str def __acl__(self): return [ (Allow, Authenticated, "view"), (Allow, "role:admin", "edit"), (Allow, f"user:{self.owner}", "delete"), ] class User(BaseModel): name: str def principals(self): return [f"user:{self.name}"] def get_current_user(token: str = Depends(oauth2_scheme)): ... def get_active_user_principals(user:User = Depends(get_current_user)): ... def get_item(item_identifier): ... # Permission is already wrapped in Depends() Permissions = configure_permissions(get_active_user_principals) @app.get("/item/{item_identifier}") async def show_item(item:Item = Permission("view", get_item)): return [{"item": item}] """ __version__ = "0.2.1" from fastapi import Depends, HTTPException from starlette.status import HTTP_403_FORBIDDEN import functools import itertools from typing import Any # constants Allow = "Allow" # acl "allow" action Deny = "Deny" # acl "deny" action Everyone = "system:everyone" # user principal for everyone Authenticated = "system:authenticated" # authenticated user principal class _AllPermissions: """ special container class for the all permissions constant first try was to override the __contains__ method of a str instance, but it turns out to be readonly... """ def __contains__(self, other): """ returns alway true any permission """ return True def __str__(self): """ string representation """ return "permissions:*" All = _AllPermissions() DENY_ALL = (Deny, Everyone, All) # acl shorthand, denies anything ALOW_ALL = (Allow, Everyone, All) # acl shorthand, allows everything # the exception that will be raised, if no sufficient permissions are found # can be configured in the configure_permissions() function permission_exception = HTTPException( status_code=HTTP_403_FORBIDDEN, detail="Insufficient permissions", headers={"WWW-Authenticate": "Bearer"}, ) def configure_permissions( active_principals_func: Any, permission_exception: HTTPException = permission_exception, ): """ sets the basic configuration for the permissions system active_principals_func: a dependency that returns the principals of the current active user permission_exception: the exception used if a permission is denied returns: permission_dependency_factory function, with some parameters already provisioned """ active_principals_func = Depends(active_principals_func) return functools.partial( permission_dependency_factory, active_principals_func=active_principals_func, permission_exception=permission_exception, ) def permission_dependency_factory( permission: str, resource: Any, active_principals_func: Any, permission_exception: HTTPException, ): """ returns a function that acts as a dependable for checking permissions This is the actual function used for creating the permission dependency, with the help of fucntools.partial in the "configure_permissions()" function. permission: the permission to check resource: the resource that will be accessed active_principals_func (provisioned by configure_permissions): a dependency that returns the principals of the current active user permission_exception (provisioned by configure_permissions): exception if permission is denied returns: dependency function for "Depends()" """ if callable(resource): resource = Depends(resource) else: resource = Depends(lambda: resource) # to get the caller signature right, we need to add only the resource and # user dependable in the definition # the permission itself is available through the outer function scope def permission_dependency( resource=resource, principals=active_principals_func ): if has_permission(principals, permission, resource): return resource raise permission_exception return Depends(permission_dependency) def has_permission( user_principals: list, requested_permission: str, resource: Any ): """ checks if a user has the permission for a resource The order of the function parameters can be remembered like "Joe eat apple" user_principals: the principals of a user requested_permission: the permission that should be checked resource: the object the user wants to access, must provide an ACL returns bool: permission granted or denied """ acl = normalize_acl(resource) for action, principal, permissions in acl: if isinstance(permissions, str): permissions = {permissions} if requested_permission in permissions: if principal in user_principals: return action == Allow return False def list_permissions(user_principals: list, resource: Any): """ lists all permissions of a user for a resouce user_principals: the principals of a user resource: the object the user wants to access, must provide an ACL returns dict: every available permission of the resource as key and True / False as value if the permission is granted. """ acl = normalize_acl(resource) acl_permissions = (permissions for _, _, permissions in acl) as_iterables = ({p} if not is_like_list(p) else p for p in acl_permissions) permissions = set(itertools.chain.from_iterable(as_iterables)) return { str(p): has_permission(user_principals, p, acl) for p in permissions } # utility functions def normalize_acl(resource: Any): """ returns the access controll list for a resource If the resource is not an acl list itself it needs to have an "__acl__" attribute. If the "__acl__" attribute is a callable, it will be called and the result of the call returned. An existing __acl__ attribute takes precedence before checking if it is an iterable. """ acl = getattr(resource, "__acl__", None) if callable(acl): return acl() elif acl is not None: return acl elif is_like_list(resource): return resource return [] def is_like_list(something): """ checks if something is iterable but not a string """ if isinstance(something, str): return False return hasattr(something, "__iter__") PKJXO'y55fastapi_permissions/example.pyfrom datetime import datetime, timedelta from typing import List import jwt from fastapi import Depends, FastAPI, HTTPException from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jwt import PyJWTError from passlib.context import CryptContext from pydantic import BaseModel, ValidationError from starlette.status import HTTP_401_UNAUTHORIZED # >>> THIS IS NEW # import of the new "permission" module for row level permissions from fastapi_permissions import ( Allow, Authenticated, Everyone, configure_permissions, list_permissions, ) # <<< # to get a string like this run: # openssl rand -hex 32 SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # >>> THIS IS NEW # users get a new field "principals", that contains a list with # roles and other identifiers for the user # <<< fake_users_db = { "bob": { "username": "bob", "full_name": "Bobby Bob", "email": "bob@example.com", "hashed_password": pwd_context.hash("secret"), # >>> THIS IS NEW "principals": ["user:bob", "role:admin"], # <<< }, "alice": { "username": "alice", "full_name": "Alice Chains", "email": "alicechains@example.com", "hashed_password": pwd_context.hash("secret"), # >>> THIS IS NEW "principals": ["user:alice"], # <<< }, } class Token(BaseModel): access_token: str token_type: str class TokenData(BaseModel): username: str = None class User(BaseModel): username: str email: str = None full_name: str = None # >>> THIS IS NEW # just reflects the changes in the fake_user_db principals: List[str] = [] # <<< class UserInDB(User): hashed_password: str oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token") app = FastAPI() def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) def get_item(item_id: int): if item_id in fake_items_db: item_dict = fake_items_db[item_id] return Item(**item_dict) def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return user def create_access_token(*, data: dict, expires_delta: timedelta): to_encode = data.copy() expire = datetime.utcnow() + expires_delta to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception except (PyJWTError, ValidationError): raise credentials_exception user = get_user(fake_users_db, username=username) if user is None: raise credentials_exception return user # >>> THIS IS NEW # a fake database for some cheesy items fake_items_db = { 1: {"name": "Stilton", "owner": "bob"}, 2: {"name": "Danish Blue", "owner": "alice"}, } # the model class for the items most important is the __acl__ method class Item(BaseModel): name: str owner: str def __acl__(self): """ defines who can do what to the model instance the function returns a list containing tuples in the form of (Allow or Deny, principal identifier, permission name) If a role is not listed (like "role:user") the access will be automatically deny. It's like a (Deny, Everyone, All) is automatically appended at the end. """ return [ (Allow, Authenticated, "view"), (Allow, "role:admin", "use"), (Allow, f"user:{self.owner}", "use"), ] # for resources that don't have a corresponding model in the database # a simple class with an "__acl__" property is defined class ItemListResource: __acl__ = [(Allow, Authenticated, "view")] # you can even use just a list NewItemAcl = [(Allow, Authenticated, "view")] # the current user is determined by the "get_current_user" function. # but the permissions system is not interested in the user itself, but in the # associated principals. def get_active_principals(user: User = Depends(get_current_user)): if user: # user is logged in principals = [Everyone, Authenticated] principals.extend(getattr(user, "principals", [])) else: # user is not logged in principals = [Everyone] return principals # We need to tell the permissions system, how to get the principals of the # active user. # # "configure_permissions" returns a function that will return another function # that can act as a dependable. Confusing? Propably, but easy to use. Permission = configure_permissions(get_active_principals) # <<< @app.post("/token", response_model=Token) async def login_for_access_token( form_data: OAuth2PasswordRequestForm = Depends() ): user = authenticate_user( fake_users_db, form_data.username, form_data.password ) if not user: raise HTTPException( status_code=400, detail="Incorrect username or password" ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} @app.get("/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_user)): return current_user # >>> THIS IS NEW # The most interesting part here is Permission("view", ItemListResource)" # This function call will return a function that acts as a dependable # If the currently logged in user has the permission "view" for the # ItemListResource, the resource will be returned # If the user does not have the proper permission, a HTTP_401_UNAUTHORIZED # exception will be raised # permission result for the fake users: # - bob: granted # - alice: granted @app.get("/items/") async def show_items( ilr: ItemListResource = Permission("view", ItemListResource), user=Depends(get_current_user), ): available_permissions = { index: list_permissions(user, get_item(index)) for index in fake_items_db } return [ { "items": fake_items_db, "available_permissions": available_permissions, } ] # permission result for the fake users: # - bob: DENIED # - alice: DENIED @app.get("/item/add") async def add_items(acls: list = Permission("create", NewItemAcl)): return [{"items": "I can haz cheese?"}] # here is the second interesting thing: instead of using a resource class, # a dependable can be used. This way, we can easily acces database entries # permission result for the fake users: # - bob: item 1: granted, item 2: granted # - alice: item 1: granted, item 2: granted @app.get("/item/{item_id}") async def show_item(item: Item = Permission("view", get_item)): return [{"item": item}] # permission result for the fake users: # - bob: item 1: granted, item 2: granted # - alice: item 1: DENIED, item 2: granted @app.get("/item/{item_id}/use") async def use_item(item: Item = Permission("use", get_item)): return [{"item": item}] # <<< PKN/8ޱ+fastapi_permissions-0.2.1.dist-info/LICENSE/* * ---------------------------------------------------------------------------- * "THE BEER-WARE LICENSE" (Revision 42): * wrote this file. As long as you retain this notice you * can do whatever you want with this stuff. If we meet some day, and you think * this stuff is worth it, you can buy me a beer in return. Holger Frey * ---------------------------------------------------------------------------- */ PK!HPO)fastapi_permissions-0.2.1.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UrPK!H5~n6,fastapi_permissions-0.2.1.dist-info/METADATA[moɑ+`C л. 服vuZ# Lpz2#0 ܯ/{ȡ$#r8}șz}gd*EV|!^˭ZNPV[zkiLoV+u2qR)e٪Y!טx\aZMoL⬻+bOͻy6\0DaRóJ-Æ_6NU[Keg{P?gB٤ԅJoMTf9ͱLVXm[ЊNʊBF7f%͘TWVy2 {aD-l/^hJ\fG{Qu)[++ѧgo?yW:~ɳ}oܥ)tBohe}~ ߪ$6ۘ{,~zHPٹ (s1JdzyL}"^,{vKu3fV}H('pToGcǒn#h6;&vS|G/?zFQpxyOL WɤM(,#_ }}u-V|3-єJ&WPN `Z̅ @R:}GC54l6+QY<ɔKSvEf2Q6o5Ōx3M}u:%n7S&m) Q> |Ai[o0{Sas -dNB<3ZM(s%@v }q$WI?Q?_UiM8 t)X+K[Tv %// %KUFt[=(1JRu_Ny {?EO!0i՟M(E!,'ǍM6 5vV?ќ?&QPT/q3d 䜠2E^asF&تlO\Uy@?=bLѮ$Le*FwZF v!STS3ivCsɢBf>{k GXƮ xPR0qOy@2!w)n:𲴏dKΧIaʠڥ{ B[!3Dt/v% O6MaL3x4';k_>oY{6۬_+È|hE#L! )4QGlgSc(INP9>,FUURɔ+)qSqgtb q(f%早_E`/_|[J>}:OMbOyi/T,.9!N3!w;\ 1ZY (^AZF#!rZiZ3CA ,δ,cZ)ujE3v0>,oK2@^9+C Ub!qEn\MCi@mzFaC;HCw,ɹњRId&,6MrS7L%ͲTs.^oQϞX{R`eXv5Z 5iT=q<XԚk+`瘹Ib܁W2$€';*ȈO0wȔb#x9[-xT (Әzr(-0 =/ !QQ-3m7x;^4w=E@TPb6i@̈́AIm /, /\Hx<=,"ai7J0 % gnt8a)|ÐIʠz~&-^)ɒi+0_f)0,Q v _|C!{bBlqg<~ %#QVTx^v 8kVEKJDg\Y`aD5&*|5}4٘+z-RdeyA*[O\ nP=H,RH'" Ό40-6" %*; )c|l9J8bEzM t'3q+ \3'n *|S0uA[4p8gL%zo3%YspA<ٔQ]+SPשvuiB?'ُAMe쳺GSBoTʡ{T.>kyycU]j } Aoq&/Һ (,q~  K9p uwsn_ثe?EIMk ){|?͠!aDI yo<kM- .&Sd*o_1CfGp&Y1["2 0c߹0JIٲ2l(͎bLn?jXxPK=;. Y`)Y>z@y;'r[-X-30\#/̼ x3h'— Z]RZk>gv>njP}q0Ez!ꠛpH$hZl @ Iy)\uήQҹqAikkQKO(2!Xm{2[(2L8ˊ`J4xv@Zr[ATkYRמ.Ҟ$2r>wzX+fYA.So\ַ 1rn,zP?4:*G6w-*K@KR 4ixuE?YVQ(Z 5 8&H]. v ĸ>b]yrOW61bjʲ<@ 4T.mhl%U> k 6! 'p N CFٌFmG&i?M~("lML񆎝j}6yHߜ W}hpJ Ck2T&MB nݓG;=TIO#'c֧Gk|N"B7-|U9߮#ՁےYK3WC!5+A`ǨKq䟴p5и*-j _2\K^NXJc~fvNW>i[f!S̀D"եJ\7puEJ9 6ȆaecQ/rg k'mE'dG<3\P{'bNylˊQP¨ª_8{)8ǔS6eP xݴzXw>8E}*wB)HۑXwYmLta}$<:TpdupIa!]!BuϤN8`)Ipj˕aS:>O>k[/Em7먌;ux |(_R[Ҭ2M|'Nv0Ne@R.닅+!#;Mnz7懆%τnE>j>/iZgHI}p8,OY:-,8GHc(MonvS*<75OKLzͽ{YDѶl|5ft ט^DtIeaC\_֎x֮:0ߐͅ ZսΙ@Ư9&Ʒ._(aoKF{䈹Jۥ#]lfc5c8.M!N|..q؅KLhCc`_5xMS%$'x>?ê"߳ٶ]q(̗:ɏnh~>v *.4Ø;/ ;"๚g\E:AH62_+]   Ȭ3E.ltYTY@| #*I3{2]v^g/ri|'_PkKni!WQ*7);o??TOG&3lQR;g5ͬSlX3j&ot4.pD r=< |TsYzxYTzfszϣPK!HP=6*fastapi_permissions-0.2.1.dist-info/RECORDMr0@g$ ]"U0Q@ H"o7t o^) IJI]i$g,)#E FOneDؼW4NN &bs*I=d Ԁ'9z<*vf 5؟qEKN5UIBJ(ISoauF]6ujᒀNeM|Gx6%JMqop3Y~v򞀻+Uk]apiWdc6M_И$\{5X4kv`þۈ2H;q@U#˟sPK\O6fastapi_permissions/__init__.pyPKJXO'y55fastapi_permissions/example.pyPKN/8ޱ+;fastapi_permissions-0.2.1.dist-info/LICENSEPK!HPO)z=fastapi_permissions-0.2.1.dist-info/WHEELPK!H5~n6,>fastapi_permissions-0.2.1.dist-info/METADATAPK!HP=6*Pfastapi_permissions-0.2.1.dist-info/RECORDPKGR