PK |N<? ? kisee/__init__.py"""HTTP server managing identities.""" __version__ = "19.7.0" PK `N=2{E E kisee/__main__.pyfrom kisee import kisee if __name__ == "__main__": kisee.main() PK TN{f f kisee/authentication.py"""Authentication handler """ import base64 import binascii from typing import Mapping, Tuple, Union import jwt from aiohttp import web from kisee.identity_provider import IdentityProvider, User Claims = Mapping[str, Union[str, bool, int]] async def _basic_authentication( encoded: bytes, idp: IdentityProvider ) -> Tuple[User, Claims]: """Authentication using Basic scheme. """ try: decoded = base64.b64decode(encoded) except binascii.Error: raise web.HTTPUnauthorized(reason="Bad authorization") try: username, password = decoded.decode("utf-8").split(":", 1) except ValueError: raise web.HTTPUnauthorized(reason="Bad authorization") user = await idp.identify(username, password) if user is None: raise web.HTTPUnauthorized(reason="Bad authorization") # Using basic auth means user knows its password so he's authorized to change it. return user, {"can_change_pwd": True} async def _jwt_authentication( token: str, idp: IdentityProvider, public_key: str ) -> Tuple[User, Claims]: """Authentication using JWT. """ try: claims = jwt.decode(token, public_key, algorithms="ES256") except jwt.DecodeError as err: raise web.HTTPUnauthorized(reason="Bad authorization") from err else: return await idp.get_user_by_username(claims["sub"]), claims async def authenticate_user(request: web.Request) -> Tuple[User, Claims]: """Multiple schemes authentication using request Authorization header. Raises HTTPUnauthorized on failure. """ if not request.headers.get("Authorization"): raise web.HTTPUnauthorized(reason="Missing authorization header") scheme, value = request.headers.get("Authorization").strip().split(" ", 1) if scheme == "Basic": return await _basic_authentication(value, request.app["identity_backend"]) if scheme == "Bearer": return await _jwt_authentication( value, request.app["identity_backend"], request.app["settings"]["jwt"]["public_key"], ) raise web.HTTPUnauthorized(reason="Bad authorization") PK TNw54o kisee/emails.py"""Email utils """ import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from typing import Tuple def is_email(email: str) -> bool: """Assert that email has minimun requirements, can be splited in two parts with '@' """ return "@" in email def send_mail(subject: str, text: str, html: str, email_settings: dict, recipient: str): """Basically send a multipart/alternative email with text and HTML to recipient """ msg = MIMEMultipart("alternative") msg["Subject"] = subject msg["From"] = email_settings["sender"] msg.add_header("reply-to", email_settings["sender"]) msg["To"] = recipient msg.attach(MIMEText(text, "plain")) msg.attach(MIMEText(html, "html")) with smtplib.SMTP(email_settings.get("host", "localhost")) as smtp_server: smtp_server.sendmail(email_settings["sender"], recipient, msg.as_string()) def forge_forgotten_email(username: str, email: str, token: str) -> Tuple[str, str]: """Return email template """ del email del token return ( f""" Salut {username}, Un mot de passe est si vite oublié ! Une demande de récuperation de mot de passe à été enregistrée. """, f"""
Salut {username},
Un mot de passe est si vite oublié ! Une demande de récuperation de mot de passe à été enregistrée.
""", ) PK TNQ߃"( ( kisee/identity_provider.py"""Abstract class representing an identity provider """ from abc import ABC, abstractmethod from importlib import import_module from typing import AsyncContextManager, Optional, Type class UserAlreadyExist(Exception): """Exception raised when user already exists """ class User: """Represents a logged-in, correctly identified, person. """ def __init__( self, user_id: str, username: str, email: str, is_superuser: bool = False ) -> None: self.user_id = user_id self.username = username self.email = email self.is_superuser = is_superuser class IdentityProvider( AsyncContextManager, ABC ): # pragma: no cover, pylint: disable=inherit-non-class # See: https://github.com/PyCQA/pylint/issues/2472 """Abtract class representing an identity provider """ def __init__(self, options: dict = None) -> None: self.options = options super().__init__() @abstractmethod async def identify(self, login: str, password: str) -> Optional[User]: """Identifies the given login/password pair, returns a dict if found. """ @abstractmethod async def register_user( self, username: str, password: str, email: str, is_superuser: bool = False ): """Create user with login, password and email """ @abstractmethod async def get_user_by_email(self, email) -> User: """Get user with provided email address """ @abstractmethod async def get_user_by_username(self, username) -> User: """Get user with provided username """ @abstractmethod async def set_password_for_user(self, user: User, password: str): """Set password for user """ @abstractmethod async def is_connection_alive(self) -> bool: """Verify that connection with identity provider datastore is alive """ def import_idp(dotted_path: str) -> Type[IdentityProvider]: """Import a dotted module path and return the attribute/class designated by the last name in the path. Raise ImportError if the import failed. """ try: module_path, class_name = dotted_path.rsplit(".", 1) except ValueError as err: raise ImportError("%s doesn't look like a module path" % dotted_path) from err module = import_module(module_path) try: return getattr(module, class_name) except AttributeError as err: raise ImportError( 'Module "%s" does not define a "%s" attribute/class' % (module_path, class_name) ) from err PK TN2 kisee/kisee.py"""Entry point for the identification provider. """ import argparse import logging import os import sys from typing import Mapping, Any import toml from aiohttp import web import sentry_sdk from kisee import views from kisee.identity_provider import import_idp from kisee.middlewares import verify_input_body_is_json, coreapi_error_middleware Settings = Mapping[str, Any] AIOHTTP_LOGGERS = ( "aiohttp.access", "aiohttp.client", "aiohttp.internal", "aiohttp.server", "aiohttp.web", "aiohttp.websocket", ) def setup_logging(loglevel): # pragma: no cover """Setup basic logging Args: loglevel (int): minimum loglevel for emitting messages """ logformat = "[%(asctime)s] %(levelname)s:%(name)s:%(message)s" logging.basicConfig( level=50 - (loglevel * 10), stream=sys.stdout, format=logformat, datefmt="%Y-%m-%d %H:%M:%S", ) def load_conf(settings_path: str) -> Settings: """Search for a settings.toml file and load it. """ candidates = ( settings_path, os.path.join(os.getcwd(), settings_path), os.path.join(os.getcwd(), "settings.toml"), os.path.expanduser("~/settings.toml"), os.path.expanduser(os.path.join("~/", settings_path)), "/etc/settings.toml", os.path.join("/etc/", settings_path), ) for candidate in candidates: if os.path.exists(candidate): with open(candidate) as candidate_file: return toml.load(candidate_file) print("Failed to locate the settings.toml file.", file=sys.stderr) sys.exit(1) def parse_args(program_args=None) -> argparse.Namespace: """Parses command line arguments. """ if program_args is None: program_args = sys.argv[1:] parser = argparse.ArgumentParser(description="Shape Identity Provider") parser.add_argument("--settings", default="settings.toml") parser.add_argument( "-v", "--verbose", dest="loglevel", default=0, help="Verbose mode (-vv for more, -vvv, …)", action="count", ) return parser.parse_args(program_args) def identification_app(settings: Settings) -> web.Application: """Identification provider entry point: builds and run a webserver. """ app = web.Application( middlewares=[verify_input_body_is_json, coreapi_error_middleware], debug=settings["server"].get("debug", False), ) app["settings"] = settings app["identity_backend"] = import_idp(settings["identity_backend"]["class"])( settings["identity_backend"]["options"] ) async def on_startup_wrapper(app): """Wrapper to call __aenter__. """ await app["identity_backend"].__aenter__() async def on_cleanup_wrapper(app): """Wrapper to call __exit__. """ await app["identity_backend"].__aexit__(None, None, None) app.on_startup.append(on_startup_wrapper) app.on_cleanup.append(on_cleanup_wrapper) app.add_routes( [ web.get("/", views.get_root), web.get("/users/", views.get_users), web.post("/users/", views.post_users), web.get("/jwt/", views.get_jwts), web.post("/jwt/", views.post_jwt), web.get("/jwt/{jid}", views.get_jwt), web.get("/users/", views.get_users), web.patch("/users/{username}/", views.patch_user), web.get("/forgotten_passwords/", views.get_forgotten_passwords), web.post("/forgotten_passwords/", views.post_forgotten_passwords), web.get("/health/", views.get_health), ] ) return app def main() -> None: # pragma: no cover """Command line entry point. """ sentry_sdk.init() args = parse_args() setup_logging(args.loglevel) settings = load_conf(args.settings) app = identification_app(settings) web.run_app( app, host=settings["server"]["host"], port=int(settings["server"]["port"]) ) PK TND@ @ kisee/middlewares.py"""Middlewares for Kisee service """ import json from typing import Callable, Awaitable from aiohttp import web from kisee.serializers import serialize Handler = Callable[[web.Request], Awaitable[web.StreamResponse]] @web.middleware async def verify_input_body_is_json( request: web.Request, handler: Handler ) -> web.StreamResponse: """ Middleware to verify that input body is of json format """ if request.can_read_body: try: await request.json() except json.decoder.JSONDecodeError: raise web.HTTPBadRequest(reason="Malformed JSON.") return await handler(request) @web.middleware async def coreapi_error_middleware( request: web.Request, handler: Handler ) -> web.StreamResponse: """Implementation of: http://www.coreapi.org/specification/transport/#coercing-4xx-and-5xx-responses-to-errors """ try: return await handler(request) except web.HTTPException as ex: return serialize( request, {"_type": "error", "_meta": {"title": ex.reason}}, ex.status ) PK TN#^ kisee/serializers.py"""Serialisers using coreapi, the idea is to (in the future) provide various representations of our resources like mason, json-ld, hal, ... """ import coreapi from aiohttp import web def serialize( request: web.Request, document: dict, status=200, headers=None ) -> web.Response: """Serialize the given document according to the Accept header of the given request. """ accept = request.headers.get("Accept") codec = coreapi.utils.negotiate_encoder([coreapi.codecs.CoreJSONCodec()], accept) content = codec.dump(document, indent=True) return web.Response( body=content, content_type=codec.media_type, headers=headers, status=status ) PK TNKӫ kisee/utils.py"""Some utils for kisee """ from kisee.identity_provider import IdentityProvider, User async def get_user_with_email_or_username( user_input: dict, idp_backend: IdentityProvider ) -> User: """Retrieve user with either email or username """ if "email" in user_input: return await idp_backend.get_user_by_email(user_input["email"]) return await idp_backend.get_user_by_username(user_input["login"]) PK |N( ( kisee/views.py"""Views for the IdP server, implementing: - GET / - GET /jwt/ - POST /jwt/ """ import json import logging from datetime import datetime, timedelta import coreapi import jwt import psutil import shortuuid from aiohttp import web from kisee.authentication import authenticate_user from kisee.emails import forge_forgotten_email, is_email, send_mail from kisee.identity_provider import UserAlreadyExist from kisee.serializers import serialize from kisee.utils import get_user_with_email_or_username logger = logging.getLogger(__name__) async def get_root(request: web.Request) -> web.Response: """https://tools.ietf.org/html/draft-nottingham-json-home-06 """ hostname = request.app["settings"]["server"]["hostname"] home = { "api": { "title": "Identification Provider", "links": { "author": "mailto:julien@palard.fr", "describedBy": "https://kisee.readthedocs.io", }, }, "resources": { "jwt": { "href": f"{hostname}/jwt/", "hints": { "allow": ["GET", "POST"], "formats": {"application/coreapi+json": {}}, }, }, "users": { "href": f"{hostname}/users/", "hints": { "allow": ["GET", "POST", "PATCH"], "formats": {"application/coreapi+json": {}}, }, }, "forgotten-passwords": { "href": f"{hostname}/forgotten-passwords/", "hints": { "allow": ["GET", "POST"], "formats": {"application/coreapi+json": {}}, }, }, }, "actions": { "register-user": { "href": f"{hostname}/users/", "method": "POST", "fields": [ {"name": "username", "required": True}, {"name": "password", "required": True}, {"name": "email", "required": True}, ], }, "create-token": { "href": f"{hostname}/jwt/", "method": "POST", "fields": [ {"name": "login", "required": True}, {"name": "password", "required": True}, ], }, }, } return web.Response( body=json.dumps(home, indent=4), content_type="application/json-home" ) async def get_users(request: web.Request) -> web.Response: """View for GET /users/, just describes that a POST is possible. """ hostname = request.app["settings"]["server"]["hostname"] return serialize( request, coreapi.Document( url=f"{hostname}/users/", title="Users", content={ "users": [], "register_user": coreapi.Link( action="post", title="Register a new user", description="POSTing to this endpoint creates a new user", fields=[ coreapi.Field(name="username", required=True), coreapi.Field(name="password", required=True), coreapi.Field(name="email", required=True), ], ), }, ), ) async def post_users(request: web.Request) -> web.Response: """A client is asking to create a new user """ data = await request.json() if not all(key in data.keys() for key in {"username", "email", "password"}): raise web.HTTPBadRequest(reason="Missing required input fields") logger.debug("Trying to create user %s", data["username"]) if not is_email(data["email"]): raise web.HTTPBadRequest(reason="Email is not valid") try: await request.app["identity_backend"].register_user( data["username"], data["password"], data["email"] ) except UserAlreadyExist: raise web.HTTPConflict(reason="User already exist") location = f"/users/{data['username']}/" return web.Response(status=201, headers={"Location": location}) async def patch_user(request: web.Request) -> web.Response: """Patch user password """ user, claims = await authenticate_user(request) if not claims.get("can_change_pwd"): raise web.HTTPForbidden(reason="Password change forbidden") data = await request.json() if "password" not in data: raise web.HTTPBadRequest(reason="Missing fields to patch") username = request.match_info["username"] if username != user.username: raise web.HTTPForbidden(reason="Token does not apply to user resource") await request.app["identity_backend"].set_password_for_user(user, data["password"]) return web.Response(status=204) async def get_jwts(request: web.Request) -> web.Response: """Handlers for GET /jwt/, just describes that a POST is possible. """ hostname = request.app["settings"]["server"]["hostname"] return serialize( request, coreapi.Document( url=f"{hostname}/jwt/", title="JSON Web Tokens", content={ "tokens": [], "add_token": coreapi.Link( action="post", title="Create a new JWT", description="POSTing to this endpoint create JWT tokens.", fields=[ coreapi.Field(name="login", required=True), coreapi.Field(name="password", required=True), ], ), }, ), ) async def get_jwt(request: web.Request) -> web.Response: """Handler for GET /jwt{/jid}. """ del request # unused raise NotImplementedError() async def post_jwt(request: web.Request) -> web.Response: """A user is asking for a JWT. """ data = await request.json() if "login" not in data or "password" not in data: raise web.HTTPUnprocessableEntity(reason="Missing login or password.") logger.debug("Trying to identify user %s", data["login"]) user = await request.app["identity_backend"].identify( data["login"], data["password"] ) if user is None: raise web.HTTPForbidden(reason="Failed identification for kisee.") jti = shortuuid.uuid() return serialize( request, coreapi.Document( url="/jwt/", title="JSON Web Tokens", content={ "tokens": [ jwt.encode( { "iss": request.app["settings"]["jwt"]["iss"], "sub": user.user_id, "exp": datetime.utcnow() + timedelta(hours=1), "jti": jti, }, request.app["settings"]["jwt"]["private_key"], algorithm="ES256", ).decode("utf8") ], "add_token": coreapi.Link( action="post", title="Create a new JWT", description="POSTing to this endpoint create JWT tokens.", fields=[ coreapi.Field(name="login", required=True), coreapi.Field(name="password", required=True), ], ), }, ), status=201, headers={"Location": "/jwt/" + jti}, ) async def get_forgotten_passwords(request: web.Request) -> web.Response: """Get forgotten password view, just describes that a POST is possible. """ hostname = request.app["settings"]["server"]["hostname"] return serialize( request, coreapi.Document( url=f"{hostname}/forgotten-passwords/", title="Forgotten password management", content={ "reset-password": coreapi.Link( action="post", title="", description=""" POSTing to this endpoint subscribe for a forgotten password """, fields=[coreapi.Field(name="login"), coreapi.Field(name="email")], ) }, ), ) async def post_forgotten_passwords(request: web.Request) -> web.Response: """Create process to register new password """ data = await request.json() if "login" not in data and "email" not in data: raise web.HTTPBadRequest(reason="Missing required fields email or login") user = await get_user_with_email_or_username(data, request.app["identity_backend"]) jwt_token = jwt.encode( { "iss": request.app["settings"]["jwt"]["iss"], "sub": user.username, "exp": datetime.utcnow() + timedelta(hours=12), "jti": shortuuid.uuid(), "can_change_pwd": True, }, request.app["settings"]["jwt"]["private_key"], algorithm="ES256", ).decode("utf-8") content_text, content_html = forge_forgotten_email( user.username, user.email, jwt_token ) subject = "Forgotten password" send_mail( subject, content_text, content_html, request.app["settings"]["email"], user.email, ) return web.Response(status=201) async def get_health(request: web.Request) -> web.Response: """Get service health metrics """ is_database_ok = ( "OK" if await request.app["identity_backend"].is_connection_alive() else "KO" ) disk_usage = psutil.disk_usage("/") disk_free_percentage = disk_usage.free / disk_usage.total * 100 return web.Response( body=json.dumps( { "overall": "OK", "load_average": open("/proc/loadavg").readline().split(" ")[0], "database": is_database_ok, "disk_free": f"{disk_free_percentage:.2f}%", } ), content_type="application/json", ) PK TN kisee/providers/__init__.pyPK TN\֍ kisee/providers/demo.py"""This is a really dumb identification backend: it does not store anything and accepts almost any login/password pair. """ from typing import Optional from kisee.identity_provider import IdentityProvider, User class DemoBackend(IdentityProvider): """Dumb identity backend, for demo purposes. This backend follow the following rules: - Any user exist and have virtually all passwords. - Any password less or equal than 4 characters will fail. - root is a superuser. Yes, this mean than anybody logging as root with any password of more than 4 chars will be superuser. This is for demo purposes only. """ async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_value, traceback): pass async def identify(self, login: str, password: str) -> Optional[User]: """Identifies the given login/password pair, returns a dict if found. """ # pylint: disable=unused-argument if len(password) < 4: return None return User( user_id=login, username=login, email=f"{login}@example.com", is_superuser=login == "root", ) async def register_user( self, username: str, password: str, email: str, is_superuser: bool = False ): pass async def get_user_by_email(self, email): """Get user with provided email address """ return User(user_id=email, username=email, email=email) async def get_user_by_username(self, username): """Get user with provided username """ return User(user_id=username, username=username, email=f"{username}@gmail.com") async def set_password_for_user(self, user: User, password: str): pass async def is_connection_alive(self) -> bool: """Verify that connection is alive, always return True """ return True PK !Hn% * ' kisee-19.7.0.dist-info/entry_points.txtN+I/N.,(),NMz`*713 PK TN(Y* * kisee-19.7.0.dist-info/LICENSEMIT License Copyright (c) 2018 meltygroup Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.PK !HP O kisee-19.7.0.dist-info/WHEELHM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&Ur PK !Hp " kisee-19.7.0.dist-info/METADATAWnH}Wj 5kj,ǖ/XہaX-%EvS&001o11_q gzt5f%Oq~NEAs8wwodQҼAaO