PK!ppflask_praetorian/__init__.pyfrom flask_praetorian.base import Praetorian # noqa from flask_praetorian.exceptions import PraetorianError # noqa from flask_praetorian.decorators import ( # noqa auth_required, roles_required, roles_accepted, ) from flask_praetorian.utilities import ( # noqa current_user, current_user_id, current_rolenames, current_custom_claims, ) PK!FdGGflask_praetorian/base.pyimport flask import jwt import pendulum import re import textwrap import uuid import warnings from passlib.context import CryptContext from flask_praetorian.exceptions import ( AuthenticationError, BlacklistedError, ClaimCollisionError, EarlyRefreshError, ExpiredAccessError, ExpiredRefreshError, InvalidTokenHeader, InvalidUserError, MissingClaimError, MissingTokenHeader, MissingUserError, PraetorianError, ) from flask_praetorian.constants import ( DEFAULT_JWT_ACCESS_LIFESPAN, DEFAULT_JWT_ALGORITHM, DEFAULT_JWT_ALLOWED_ALGORITHMS, DEFAULT_JWT_HEADER_NAME, DEFAULT_JWT_HEADER_TYPE, DEFAULT_JWT_REFRESH_LIFESPAN, DEFAULT_USER_CLASS_VALIDATION_METHOD, RESERVED_CLAIMS, VITAM_AETERNUM, AccessType, ) class Praetorian: """ Comprises the implementation for the flask-praetorian flask extension. Provides a tool that allows password authentication and token provision for applications and designated endpoints """ def __init__(self, app=None, user_class=None, is_blacklisted=None): self.pwd_ctx = None self.hash_scheme = None self.salt = None if app is not None and user_class is not None: self.init_app(app, user_class, is_blacklisted) def init_app(self, app, user_class, is_blacklisted=None): """ Initializes the Praetorian extension :param: app: The flask app to bind this extension to :param: user_class: The class used to interact with user data :param: is_blacklisted: A method that may optionally be used to check the token against a blacklist when access or refresh is requested Should take the jti for the token to check as a single argument. Returns True if the jti is blacklisted, False otherwise. By default, always returns False. """ PraetorianError.require_condition( app.config.get('SECRET_KEY') is not None, "There must be a SECRET_KEY app config setting set", ) possible_schemes = [ 'argon2', 'bcrypt', 'pbkdf2_sha512', ] self.pwd_ctx = CryptContext( default='pbkdf2_sha512', schemes=possible_schemes + ['plaintext'], deprecated=[], ) self.hash_scheme = app.config.get('PRAETORIAN_HASH_SCHEME') valid_schemes = self.pwd_ctx.schemes() PraetorianError.require_condition( self.hash_scheme in valid_schemes or self.hash_scheme is None, "If {} is set, it must be one of the following schemes: {}", 'PRAETORIAN_HASH_SCHEME', valid_schemes, ) self.user_class = self._validate_user_class(user_class) self.is_blacklisted = is_blacklisted or (lambda t: False) self.encode_key = app.config['SECRET_KEY'] self.allowed_algorithms = app.config.get( 'JWT_ALLOWED_ALGORITHMS', DEFAULT_JWT_ALLOWED_ALGORITHMS, ) self.encode_algorithm = app.config.get( 'JWT_ALGORITHM', DEFAULT_JWT_ALGORITHM, ) self.access_lifespan = pendulum.Duration(**app.config.get( 'JWT_ACCESS_LIFESPAN', DEFAULT_JWT_ACCESS_LIFESPAN, )) self.refresh_lifespan = pendulum.Duration(**app.config.get( 'JWT_REFRESH_LIFESPAN', DEFAULT_JWT_REFRESH_LIFESPAN, )) self.header_name = app.config.get( 'JWT_HEADER_NAME', DEFAULT_JWT_HEADER_NAME, ) self.header_type = app.config.get( 'JWT_HEADER_TYPE', DEFAULT_JWT_HEADER_TYPE, ) self.user_class_validation_method = app.config.get( 'USER_CLASS_VALIDATION_METHOD', DEFAULT_USER_CLASS_VALIDATION_METHOD, ) if not app.config.get('DISABLE_PRAETORIAN_ERROR_HANDLER'): app.register_error_handler( PraetorianError, PraetorianError.build_error_handler(), ) self.is_testing = app.config.get('TESTING', False) if not hasattr(app, 'extensions'): app.extensions = {} app.extensions['praetorian'] = self @classmethod def _validate_user_class(cls, user_class): """ Validates the supplied user_class to make sure that it has the class methods necessary to function correctly. Requirements: - ``lookup`` method. Accepts a string parameter, returns instance - ``identify`` method. Accepts an identity parameter, returns instance """ PraetorianError.require_condition( getattr(user_class, 'lookup', None) is not None, textwrap.dedent(""" The user_class must have a lookup class method: user_class.lookup() -> """), ) PraetorianError.require_condition( getattr(user_class, 'identify', None) is not None, textwrap.dedent(""" The user_class must have an identify class method: user_class.identify() -> """), ) # TODO: Figure out how to check for an identity property return user_class def authenticate(self, username, password): """ Verifies that a password matches the stored password for that username. If verification passes, the matching user instance is returned """ PraetorianError.require_condition( self.user_class is not None, "Praetorian must be initialized before this method is available", ) user = self.user_class.lookup(username) MissingUserError.require_condition( user is not None, 'Could not find the requested user', ) AuthenticationError.require_condition( self._verify_password(password, user.password), 'The password is incorrect', ) return user def _verify_password(self, raw_password, hashed_password): """ Verifies that a plaintext password matches the hashed version of that password using the stored passlib password context """ PraetorianError.require_condition( self.pwd_ctx is not None, "Praetorian must be initialized before this method is available", ) return self.pwd_ctx.verify(raw_password, hashed_password) def encrypt_password(self, raw_password): """ Encrypts a plaintext password using the stored passlib password context """ PraetorianError.require_condition( self.pwd_ctx is not None, "Praetorian must be initialized before this method is available", ) return self.pwd_ctx.encrypt(raw_password, scheme=self.hash_scheme) def error_handler(self, error): """ Provides a flask error handler that is used for PraetorianErrors (and derived exceptions). """ warnings.warn( """ error_handler is deprecated. Use FlaskBuzz.build_error_handler instead """, warnings.DeprecationWarning, ) return error.jsonify(), error.status_code, error.headers def _check_user(self, user): """ Checks to make sure that a user is valid. First, checks that the user is not None. If this check fails, a MissingUserError is raised. Next, checks if the user has a validation method. If the method does not exist, the check passes. If the method exists, it is called. If the result of the call is not truthy, an InvalidUserError is raised """ MissingUserError.require_condition( user is not None, 'Could not find the requested user', ) user_validate_method = getattr( user, self.user_class_validation_method, None ) if user_validate_method is None: return InvalidUserError.require_condition( user_validate_method(), "The user is not valid or has had access revoked", ) def encode_jwt_token( self, user, override_access_lifespan=None, override_refresh_lifespan=None, **custom_claims ): """ Encodes user data into a jwt token that can be used for authorization at protected endpoints :param: override_access_lifespan: Override's the instance's access lifespan to set a custom duration after which the new token's accessability will expire. May not exceed the refresh_lifespan :param: override_refresh_lifespan: Override's the instance's refresh lifespan to set a custom duration after which the new token's refreshability will expire. :param: custom_claims: Additional claims that should be packed in the payload. Note that any claims supplied here must be JSON compatible types """ ClaimCollisionError.require_condition( set(custom_claims.keys()).isdisjoint(RESERVED_CLAIMS), "The custom claims collide with required claims", ) self._check_user(user) moment = pendulum.now('UTC') if override_refresh_lifespan is None: refresh_lifespan = self.refresh_lifespan else: refresh_lifespan = override_refresh_lifespan refresh_expiration = (moment + refresh_lifespan).int_timestamp if override_access_lifespan is None: access_lifespan = self.access_lifespan else: access_lifespan = override_access_lifespan access_expiration = min( (moment + access_lifespan).int_timestamp, refresh_expiration, ) payload_parts = dict( iat=moment.int_timestamp, exp=access_expiration, rf_exp=refresh_expiration, jti=str(uuid.uuid4()), id=user.identity, rls=','.join(user.rolenames), **custom_claims ) return jwt.encode( payload_parts, self.encode_key, self.encode_algorithm, ).decode('utf-8') def encode_eternal_jwt_token(self, user, **custom_claims): """ This utility function encodes a jwt token that never expires .. note:: This should be used sparingly since the token could become a security concern if it is ever lost. If you use this method, you should be sure that your application also implements a blacklist so that a given token can be blocked should it be lost or become a security concern """ return self.encode_jwt_token( user, override_access_lifespan=VITAM_AETERNUM, override_refresh_lifespan=VITAM_AETERNUM, **custom_claims ) def refresh_jwt_token(self, token, override_access_lifespan=None): """ Creates a new token for a user if and only if the old token's access permission is expired but its refresh permission is not yet expired. The new token's refresh expiration moment is the same as the old token's, but the new token's access expiration is refreshed :param: token: The existing jwt token that needs to be replaced with a new, refreshed token :param: override_access_lifespan: Override's the instance's access lifespan to set a custom duration after which the new token's accessability will expire. May not exceed the refresh lifespan """ moment = pendulum.now('UTC') # Note: we disable exp verification because we do custom checks here with InvalidTokenHeader.handle_errors('failed to decode JWT token'): data = jwt.decode( token, self.encode_key, algorithms=self.allowed_algorithms, options={'verify_exp': False}, ) self._validate_jwt_data(data, access_type=AccessType.refresh) user = self.user_class.identify(data['id']) self._check_user(user) if override_access_lifespan is None: access_lifespan = self.access_lifespan else: access_lifespan = override_access_lifespan refresh_expiration = data['rf_exp'] access_expiration = min( (moment + access_lifespan).int_timestamp, refresh_expiration, ) custom_claims = { k: v for (k, v) in data.items() if k not in RESERVED_CLAIMS } payload_parts = dict( iat=moment.int_timestamp, exp=access_expiration, rf_exp=refresh_expiration, jti=data['jti'], id=data['id'], rls=','.join(user.rolenames), **custom_claims ) return jwt.encode( payload_parts, self.encode_key, self.encode_algorithm, ).decode('utf-8') def extract_jwt_token(self, token): """ Extracts a data dictionary from a jwt token """ # Note: we disable exp verification because we will do it ourselves with InvalidTokenHeader.handle_errors('failed to decode JWT token'): data = jwt.decode( token, self.encode_key, algorithms=self.allowed_algorithms, options={'verify_exp': False}, ) self._validate_jwt_data(data, access_type=AccessType.access) return data def _validate_jwt_data(self, data, access_type): """ Validates that the data for a jwt token is valid """ MissingClaimError.require_condition( 'jti' in data, 'Token is missing jti claim', ) BlacklistedError.require_condition( not self.is_blacklisted(data['jti']), 'Token has a blacklisted jti', ) MissingClaimError.require_condition( 'id' in data, 'Token is missing id field', ) MissingClaimError.require_condition( 'exp' in data, 'Token is missing exp claim', ) MissingClaimError.require_condition( 'rf_exp' in data, 'Token is missing rf_exp claim', ) moment = pendulum.now('UTC').int_timestamp if access_type == AccessType.access: ExpiredAccessError.require_condition( moment <= data['exp'], 'access permission has expired', ) elif access_type == AccessType.refresh: EarlyRefreshError.require_condition( moment > data['exp'], 'access permission for token has not expired. may not refresh', ) ExpiredRefreshError.require_condition( moment <= data['rf_exp'], 'refresh permission for token has expired', ) def _unpack_header(self, headers): """ Unpacks a jwt token from a request header """ jwt_header = headers.get(self.header_name) MissingTokenHeader.require_condition( jwt_header is not None, "JWT token not found in headers under '{}'", self.header_name, ) match = re.match(self.header_type + r'\s*([\w\.-]+)', jwt_header) InvalidTokenHeader.require_condition( match is not None, "JWT header structure is invalid", ) token = match.group(1) return token def read_token_from_header(self): """ Unpacks a jwt token from the current flask request """ return self._unpack_header(flask.request.headers) def pack_header_for_user( self, user, override_access_lifespan=None, override_refresh_lifespan=None, **custom_claims ): """ Encodes a jwt token and packages it into a header dict for a given user :param: user: The user to package the header for :param: override_access_lifespan: Override's the instance's access lifespan to set a custom duration after which the new token's accessability will expire. May not exceed the refresh_lifespan :param: override_refresh_lifespan: Override's the instance's refresh lifespan to set a custom duration after which the new token's refreshability will expire. :param: custom_claims: Additional claims that should be packed in the payload. Note that any claims supplied here must be JSON compatible types """ token = self.encode_jwt_token( user, override_access_lifespan=override_access_lifespan, override_refresh_lifespan=override_refresh_lifespan, **custom_claims ) return {self.header_name: self.header_type + ' ' + token} PK!6qPPflask_praetorian/constants.pyimport pendulum import enum DEFAULT_JWT_HEADER_NAME = 'Authorization' DEFAULT_JWT_HEADER_TYPE = 'Bearer' DEFAULT_JWT_ACCESS_LIFESPAN = dict(minutes=15) DEFAULT_JWT_REFRESH_LIFESPAN = dict(days=30) DEFAULT_JWT_ALGORITHM = 'HS256' DEFAULT_JWT_ALLOWED_ALGORITHMS = ['HS256'] DEFAULT_USER_CLASS_VALIDATION_METHOD = 'is_valid' RESERVED_CLAIMS = {'iat', 'exp', 'rf_exp', 'jti', 'id', 'rls'} # 1M days seems reasonable. If this code is being used in 3000 years...welp VITAM_AETERNUM = pendulum.Duration(days=1000000) class AccessType(enum.Enum): access = 'ACCESS' refresh = 'REFRESH' PK!=+Xx x flask_praetorian/decorators.pyimport functools from flask_praetorian.exceptions import MissingRoleError from flask_praetorian.utilities import ( current_guard, add_jwt_data_to_app_context, app_context_has_jwt_data, remove_jwt_data_from_app_context, current_rolenames, ) def _verify_and_add_jwt(): """ This helper method just checks and adds jwt data to the app context. Will not add jwt data if it is already present. Only use in this module """ if not app_context_has_jwt_data(): guard = current_guard() token = guard.read_token_from_header() jwt_data = guard.extract_jwt_token(token) add_jwt_data_to_app_context(jwt_data) def auth_required(method): """ This decorator is used to ensure that a user is authenticated before being able to access a flask route. It also adds the current user to the current flask context. """ @functools.wraps(method) def wrapper(*args, **kwargs): _verify_and_add_jwt() try: return method(*args, **kwargs) finally: remove_jwt_data_from_app_context() return wrapper def roles_required(*required_rolenames): """ This decorator ensures that any uses accessing the decorated route have all the needed roles to access it. If an @auth_required decorator is not supplied already, this decorator will implicitly check @auth_required first """ def decorator(method): @functools.wraps(method) def wrapper(*args, **kwargs): _verify_and_add_jwt() try: MissingRoleError.require_condition( current_rolenames().issuperset(set(required_rolenames)), "This endpoint requires all the following roles: {}", [', '.join(required_rolenames)], ) return method(*args, **kwargs) finally: remove_jwt_data_from_app_context() return wrapper return decorator def roles_accepted(*accepted_rolenames): """ This decorator ensures that any uses accessing the decorated route have one of the needed roles to access it. If an @auth_required decorator is not supplied already, this decorator will implicitly check @auth_required first """ def decorator(method): @functools.wraps(method) def wrapper(*args, **kwargs): _verify_and_add_jwt() try: MissingRoleError.require_condition( not current_rolenames().isdisjoint( set(accepted_rolenames) ), "This endpoint requires one of the following roles: {}", [', '.join(accepted_rolenames)], ) return method(*args, **kwargs) finally: remove_jwt_data_from_app_context() return wrapper return decorator PK!/**flask_praetorian/exceptions.pyimport flask_buzz class PraetorianError(flask_buzz.FlaskBuzz): """ Provides a custom exception class for flask-praetorian based on flask-buzz. `flask-buzz on gitub `_ """ status_code = 401 class MissingClaimError(PraetorianError): """ The jwt token is missing a required claim """ pass class BlacklistedError(PraetorianError): """ The jwt token has been blacklisted and may not be used any more """ status_code = 403 class ExpiredAccessError(PraetorianError): """ The jwt token has expired for access and must be refreshed """ pass class EarlyRefreshError(PraetorianError): """ The jwt token has not yet expired for access and may not be refreshed """ pass class ExpiredRefreshError(PraetorianError): """ The jwt token has expired for refresh. An entirely new token must be issued """ pass class MissingTokenHeader(PraetorianError): """ The header is missing the required jwt token """ pass class InvalidTokenHeader(PraetorianError): """ The token contained in the header is invalid """ pass class InvalidUserError(PraetorianError): """ The user is no longer valid and is now not authorized """ status_code = 403 class MissingRoleError(PraetorianError): """ The token is missing a required role """ status_code = 403 class MissingUserError(PraetorianError): """ The user could not be identified """ pass class AuthenticationError(PraetorianError): """ The entered user's password did not match the stored password """ pass class ClaimCollisionError(PraetorianError): """" Custom claims to pack into the JWT payload collide with reserved claims """ pass PK!ӿ flask_praetorian/utilities.pyimport flask from flask_praetorian.constants import RESERVED_CLAIMS from flask_praetorian.exceptions import PraetorianError def current_guard(): """ Fetches the current instance of flask-praetorian that is attached to the current flask app """ guard = flask.current_app.extensions.get('praetorian', None) PraetorianError.require_condition( guard is not None, "No current guard found; Praetorian must be initialized first", ) return guard def app_context_has_jwt_data(): """ Checks if there is already jwt_data added to the app context """ return hasattr(flask._app_ctx_stack.top, 'jwt_data') def add_jwt_data_to_app_context(jwt_data): """ Adds a dictionary of jwt data (presumably unpacked from a token) to the top of the flask app's context """ ctx = flask._app_ctx_stack.top ctx.jwt_data = jwt_data def get_jwt_data_from_app_context(): """ Fetches a dict of jwt token data from the top of the flask app's context """ ctx = flask._app_ctx_stack.top jwt_data = getattr(ctx, 'jwt_data', None) PraetorianError.require_condition( jwt_data is not None, """ No jwt_data found in app context. Make sure @auth_required decorator is specified *first* for route """, ) return jwt_data def remove_jwt_data_from_app_context(): """ Removes the dict of jwt token data from the top of the flask app's context """ ctx = flask._app_ctx_stack.top if app_context_has_jwt_data(): del ctx.jwt_data def current_user_id(): """ This method returns the user id retrieved from jwt token data attached to the current flask app's context """ jwt_data = get_jwt_data_from_app_context() user_id = jwt_data.get('id') PraetorianError.require_condition( user_id is not None, "Could not fetch an id for the current user", ) return user_id def current_user(): """ This method returns a user instance for jwt token data attached to the current flask app's context """ user_id = current_user_id() guard = current_guard() user = guard.user_class.identify(user_id) PraetorianError.require_condition( user is not None, "Could not identify the current user from the current id", ) return user def current_rolenames(): """ This method returns the names of all roles associated with the current user """ jwt_data = get_jwt_data_from_app_context() if 'rls' not in jwt_data: # This is necessary so our set arithmetic works correctly return set(['non-empty-but-definitely-not-matching-subset']) else: return set(r.strip() for r in jwt_data['rls'].split(',')) def current_custom_claims(): """ This method returns any custom claims in the current jwt """ jwt_data = get_jwt_data_from_app_context() return {k: v for (k, v) in jwt_data.items() if k not in RESERVED_CLAIMS} PK!ݘ^<<<,flask_praetorian-0.5.3.dist-info/LICENSE.rstMIT License =========== Copyright (c) `2016` `Tucker Beck` Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HڽTU&flask_praetorian-0.5.3.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!Hh?q )flask_praetorian-0.5.3.dist-info/METADATAWrFW!UU([^X΢$:9C`L`Y$_2EL^i~T^e6̈́oVZV$e#gӗ鉸u-m7koMSZmF$Ve)r* VOH GOMAS|޷n2N/Un2j3V͸^9?:SNj84vB(KTv3|ZjB>>IxC_ŕ;h\2p)9;I_N_=bqzpg4t޶ܪ-5u^ʦHcϧ/mmɛgyzd|c8=tE0^põCj>}:ԁu0/>,Ԫ҃oܕjӰ ˰`nfN ?L7DjY24]VK,ێsL(/%r'dU%0xb\)VY)jA-~mƵ`RgҩCV@E\'\lV*+ƻ<\M]^k^vVx7,e\Oë|w+U,NEIezh U]{Yܱ`vv8zLcȌr&үGR[Ebzb(K=e7 LRc>5*@fqN'Y4+)qn0xQl=H93Rs*h-RW1H4Z85 (zyN1bЃL 20җXNjod+j^:&ةw\>+$Vkмk7S~{+8djwwl>s6z%W'NgiR_aPj/"]4TGY\b(l[w K?bJD"l1PK!HyM'flask_praetorian-0.5.3.dist-info/RECORDһ@|%؀EEL8EZhA~`x`J?UUܧm{6IHd R^~we;37Y&/Nсle|F%%ś'eA-,1}47P*FJ|.[pka%ŬmMJدqU fFe,YӚe~EkxW7SAyBi]ў,iCl[]m(^9[LY)|5~cQcSi-]3=yB, ˯.#J(D]o|B<yg$ͳkW_vu)V3 q!,rH( -pc2M,`.]i5$ NK`1XϫBaD5z NR C:~3kp] o?Ov>@t(0|tʼ7S{׭93W5ccFEy{a>~PK!ppflask_praetorian/__init__.pyPK!FdGGflask_praetorian/base.pyPK!6qPPIflask_praetorian/constants.pyPK!=+Xx x 2Lflask_praetorian/decorators.pyPK!/**Wflask_praetorian/exceptions.pyPK!ӿ L_flask_praetorian/utilities.pyPK!ݘ^<<<,Fkflask_praetorian-0.5.3.dist-info/LICENSE.rstPK!HڽTU&oflask_praetorian-0.5.3.dist-info/WHEELPK!Hh?q )dpflask_praetorian-0.5.3.dist-info/METADATAPK!HyM'vflask_praetorian-0.5.3.dist-info/RECORDPK Ux