PK!Q566hydra_client/__init__.py__version__ = "0.1.0" from .client import HydraAdmin PK!͖t ((hydra_client/abc.pyfrom __future__ import annotations import abc import typing import requests from . import exceptions from .utils import urljoin if typing.TYPE_CHECKING: from .client import Hydra class AbstractResource(abc.ABC): def __init__(self, resource: AbstractResource = None): if resource is not None: self.session: requests.Session = resource.session self.url: str = resource.url else: self.session = requests.Session() self.url = "" def _request( self, method: str, url: str, params: dict = None, json: dict = None ) -> requests.Response: try: response = self.session.request(method, url, params=params, json=json) except ( requests.exceptions.ConnectionError, requests.exceptions.Timeout, ) as exc: raise exceptions.ConnectionError from exc except requests.exceptions.RequestException as exc: raise exceptions.TransportError from exc try: response.raise_for_status() except requests.exceptions.HTTPError as exc: wrapper_exc = exceptions.status_map.get( exc.response.status_code, exceptions.HTTPError ) raise wrapper_exc from exc return response class AbstractEndpoint(AbstractResource): def __init__(self, resource: AbstractResource = None): super().__init__(resource) self.url = urljoin(self.url, self.endpoint) @abc.abstractproperty def endpoint(self) -> str: pass PK! hydra_client/client.pyimport typing from .abc import AbstractResource from .consent import ConsentRequest from .login import LoginRequest from .oauth2 import OAuth2Client from .version import Version class Hydra(AbstractResource): def __init__(self, url: str): super().__init__() self.url = url class HydraAdmin(Hydra): def clients( self, limit: int = None, offset: int = None ) -> typing.List[OAuth2Client]: return OAuth2Client.list(self, limit, offset) def client(self, id: str) -> OAuth2Client: return OAuth2Client.get(id, self) def create_client( self, allowed_cors_origins: typing.List[str] = None, audience: typing.List[str] = None, client_id: str = None, client_name: str = None, client_secret: str = None, client_secret_expires_at: int = None, client_uri: str = None, contacts: typing.List[str] = None, grant_types: typing.List[str] = None, jwks: dict = None, jwks_uri: str = None, logo_uri: str = None, policy_uri: str = None, redirect_uris: typing.List[str] = None, redirect_object_signing_alg: str = None, request_uris: typing.List[str] = None, response_uris: typing.List[str] = None, scope: str = None, sector_identifier_uri: str = None, subject_type: str = None, token_endpoint_auth_method: str = None, tos_uri: str = None, userinfo_signed_response_alg: str = None, ) -> OAuth2Client: return OAuth2Client.create( self, allowed_cors_origins, audience, client_id, client_name, client_secret, client_secret_expires_at, client_uri, contacts, grant_types, jwks, jwks_uri, logo_uri, policy_uri, redirect_uris, redirect_object_signing_alg, request_uris, response_uris, scope, sector_identifier_uri, subject_type, token_endpoint_auth_method, tos_uri, userinfo_signed_response_alg, ) def login_request(self, challenge: str) -> LoginRequest: return LoginRequest.get(challenge, self) def consent_request(self, challenge: str) -> ConsentRequest: return ConsentRequest.get(challenge, self) def version(self) -> str: return Version.get(self) PK!QU hydra_client/consent.pyfrom __future__ import annotations import typing from .abc import AbstractEndpoint, AbstractResource from .utils import filter_none, urljoin if typing.TYPE_CHECKING: from .client import Hydra class ConsentRequest(AbstractEndpoint): endpoint = "/oauth2/auth/requests/consent" def __init__(self, data: dict, parent: AbstractResource): super().__init__(parent) self.acr = data["acr"] self.challenge = data["challenge"] self.client = data["client"] self.login_challenge = data["login_challenge"] self.login_session_id = data["login_session_id"] self.oidc_context = data["oidc_context"] self.request_url = data["request_url"] self.requested_access_token_audience = data["requested_access_token_audience"] self.requested_scope = data["requested_scope"] self.skip = data["skip"] self.subject = data["subject"] @classmethod def params(cls, challenge: str) -> dict: return {"consent_challenge": challenge} @classmethod def get(cls, challenge: str, hydra: Hydra) -> ConsentRequest: url = urljoin(hydra.url, cls.endpoint) response = hydra._request("GET", url, params=cls.params(challenge)) return cls(response.json(), hydra) def accept( self, grant_access_token_audience: typing.Iterable[str] = None, grant_scope: typing.Iterable[str] = None, remember: bool = False, remember_for: int = None, session: dict = None, ) -> str: data = filter_none( { "grant_access_token_audience": grant_access_token_audience, "grant_scope": grant_scope, "remember": remember, "remember_for": remember_for, "session": session, } ) url = urljoin(self.url, "accept") response = self._request( "PUT", url, params=self.params(self.challenge), json=data ) payload = response.json() return payload["redirect_to"] def reject( self, error: str = None, error_debug: str = None, error_description: str = None, error_hint: str = None, status_code: int = None, ) -> str: url = urljoin(self.url, "reject") data = filter_none( { "error": error, "error_debug": error_debug, "error_description": error_description, "error_hint": error_hint, "status_code": status_code, } ) response = self._request( "PUT", url, params=self.params(self.challenge), json=data ) payload = response.json() return payload["redirect_to"] PK!)44hydra_client/exceptions.pyclass HydraException(Exception): pass class UnboundResourceError(HydraException): pass class TransportError(HydraException): pass class ConnectionError(TransportError): pass class HTTPError(HydraException): pass class BadRequest(HTTPError): pass class Unauthorized(HTTPError): pass class Forbidden(HTTPError): pass class NotFound(HTTPError): pass class ServerError(HTTPError): pass status_map = { 400: BadRequest, 401: Unauthorized, 403: Forbidden, 404: NotFound, 500: ServerError, } PK!^2+ + hydra_client/login.pyfrom __future__ import annotations import typing from .abc import AbstractEndpoint, AbstractResource from .utils import filter_none, urljoin if typing.TYPE_CHECKING: from .client import Hydra class LoginRequest(AbstractEndpoint): endpoint = "/oauth2/auth/requests/login" def __init__(self, data: dict, parent: AbstractResource): super().__init__(parent) self.challenge = data["challenge"] self.client = data["client"] self.oidc_context = data["oidc_context"] self.request_url = data["request_url"] self.requested_access_token_audience = data["requested_access_token_audience"] self.requested_scope = data["requested_scope"] self.session_id = data["session_id"] self.skip = data["skip"] self.subject = data["subject"] @classmethod def params(cls, challenge: str) -> dict: return {"login_challenge": challenge} @classmethod def get(cls, challenge: str, hydra: Hydra) -> LoginRequest: url = urljoin(hydra.url, cls.endpoint) response = hydra._request("GET", url, cls.params(challenge)) return cls(response.json(), hydra) def accept( self, subject: str, acr: str = None, force_subject_identifier: str = None, remember: bool = False, remember_for: int = None, ) -> str: data = filter_none( { "acr": acr, "force_subject_identifier": force_subject_identifier, "remember": remember, "remember_for": remember_for, "subject": subject, } ) url = urljoin(self.url, "accept") response = self._request( "PUT", url, params=self.params(self.challenge), json=data ) payload = response.json() return payload["redirect_to"] def reject( self, error: str = None, error_debug: str = None, error_description: str = None, error_hint: str = None, status_code: int = None, ) -> str: url = urljoin(self.url, "reject") data = filter_none( { "error": error, "error_debug": error_debug, "error_description": error_description, "error_hint": error_hint, "status_code": status_code, } ) response = self._request( "PUT", url, params=self.params(self.challenge), json=data ) payload = response.json() return payload["redirect_to"] PK!4MMhydra_client/oauth2.pyfrom __future__ import annotations import typing from .abc import AbstractEndpoint, AbstractResource from .utils import filter_none, urljoin if typing.TYPE_CHECKING: from .client import Hydra class OAuth2Client(AbstractEndpoint): endpoint = "/clients" def __init__(self, data: dict, parent: AbstractResource): super().__init__(parent) self._update(data) self.url = urljoin(self.url, self.client_id) def _update(self, data: dict): self.allowed_cors_origins = data["allowed_cors_origins"] self.audience = data["audience"] self.client_id = data["client_id"] self.client_name = data["client_name"] self.client_secret = data.get("client_secret") self.client_uri = data["client_uri"] self.contacts = data["contacts"] self.grant_types = data["grant_types"] self.jwks = data.get("jwks") self.jwks_uri = data.get("jwks_uri", []) self.logo_uri = data["logo_uri"] self.owner = data["owner"] self.policy_uri = data["policy_uri"] self.redirect_uris = data["redirect_uris"] self.request_object_signing_alg = data.get("request_object_signing_alg") self.request_uris = data.get("request_uris", []) self.response_types = data["response_types"] self.scope = data["scope"] self.sector_identifier_uri = data.get("sector_identifier_uri") self.subject_type = data["subject_type"] self.token_endpoint_auth_method = data["token_endpoint_auth_method"] self.tos_uri = data["tos_uri"] self.userinfo_signed_response_alg = data["userinfo_signed_response_alg"] @classmethod def list( cls, hydra: Hydra, limit: int = None, offset: int = None ) -> typing.List[OAuth2Client]: url = urljoin(hydra.url, cls.endpoint) params = filter_none({"limit": limit, "offset": offset}) response = hydra._request("GET", url, params=params) payload = response.json() return [OAuth2Client(d, hydra) for d in payload] @classmethod def create( cls, hydra: Hydra, allowed_cors_origins: typing.List[str] = None, audience: typing.List[str] = None, client_id: str = None, client_name: str = None, client_secret: str = None, client_secret_expires_at: int = None, client_uri: str = None, contacts: typing.List[str] = None, grant_types: typing.List[str] = None, jwks: dict = None, jwks_uri: str = None, logo_uri: str = None, policy_uri: str = None, redirect_uris: typing.List[str] = None, redirect_object_signing_alg: str = None, request_uris: typing.List[str] = None, response_uris: typing.List[str] = None, scope: str = None, sector_identifier_uri: str = None, subject_type: str = None, token_endpoint_auth_method: str = None, tos_uri: str = None, userinfo_signed_response_alg: str = None, ) -> OAuth2Client: url = urljoin(hydra.url, cls.endpoint) data = filter_none( { "allowed_cors_origins": allowed_cors_origins, "audience": audience, "client_id": client_id, "client_name": client_name, "client_secret": client_secret, "client_secret_expires_at": client_secret_expires_at, "client_uri": client_uri, "contacts": contacts, "grant_types": grant_types, "jwks": jwks, "jwks_uri": jwks_uri, "logo_uri": logo_uri, "policy_uri": policy_uri, "redirect_uris": redirect_uris, "redirect_object_signing_alg": redirect_object_signing_alg, "request_uris": request_uris, "response_uris": response_uris, "scope": scope, "sector_identifier_uri": sector_identifier_uri, "subject_type": subject_type, "token_endpoint_auth_method": token_endpoint_auth_method, "tos_uri": tos_uri, "userinfo_signed_response_alg": userinfo_signed_response_alg, } ) response = hydra._request("POST", url, json=data) return cls(response.json(), hydra) @classmethod def get(cls, client_id: str, hydra: Hydra) -> OAuth2Client: url = urljoin(hydra.url, cls.endpoint, client_id) response = hydra._request("GET", url) return cls(response.json(), hydra) def update( self, allowed_cors_origins: typing.List[str] = None, audience: typing.List[str] = None, client_id: str = None, client_name: str = None, client_secret: str = None, client_secret_expires_at: int = None, client_uri: str = None, contacts: typing.List[str] = None, grant_types: typing.List[str] = None, jwks: dict = None, jwks_uri: str = None, logo_uri: str = None, policy_uri: str = None, redirect_uris: typing.List[str] = None, redirect_object_signing_alg: str = None, request_uris: typing.List[str] = None, response_uris: typing.List[str] = None, scope: str = None, sector_identifier_uri: str = None, subject_type: str = None, token_endpoint_auth_method: str = None, tos_uri: str = None, userinfo_signed_response_alg: str = None, ) -> OAuth2Client: data = filter_none( { "allowed_cors_origins": allowed_cors_origins, "audience": audience, "client_id": client_id, "client_name": client_name, "client_secret": client_secret, "client_secret_expires_at": client_secret_expires_at, "client_uri": client_uri, "contacts": contacts, "grant_types": grant_types, "jwks": jwks, "jwks_uri": jwks_uri, "logo_uri": logo_uri, "policy_uri": policy_uri, "redirect_uris": redirect_uris, "redirect_object_signing_alg": redirect_object_signing_alg, "request_uris": request_uris, "response_uris": response_uris, "scope": scope, "sector_identifier_uri": sector_identifier_uri, "subject_type": subject_type, "token_endpoint_auth_method": token_endpoint_auth_method, "tos_uri": tos_uri, "userinfo_signed_response_alg": userinfo_signed_response_alg, } ) response = self._request("PUT", self.url, json=data) payload = response.json() self._update(payload) return self def delete(self) -> None: self._request("DELETE", self.url) PK!%y0hydra_client/utils.pydef filter_none(data: dict) -> dict: return {k: v for k, v in data.items() if v is not None} def urljoin(url: str, *parts: str) -> str: return "/".join( (url.rstrip("/"), *(p.strip("/") for p in parts[:-1]), (parts[-1]).lstrip("/")) ) PK!U hydra_client/version.pyfrom __future__ import annotations import typing import requests from . import exceptions from .abc import AbstractEndpoint from .utils import urljoin if typing.TYPE_CHECKING: from .client import Hydra class Version(AbstractEndpoint): endpoint = "/version" @classmethod def get(cls, hydra: Hydra) -> str: url = urljoin(hydra.url, cls.endpoint) response = hydra._request("GET", url) payload = response.json() return payload["version"] PK!HnHTU"hydra_client-0.3.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H^&0/%hydra_client-0.3.0.dist-info/METADATAQn0 +|LB $XLv4i#5I%0qa~xKKtB+FO(C}. E#H6Oوѡ͙{ 8_Io \*!oIJQ5CdJK[sIR WwGVh|umuو9 "p8^~LE ={' xwPn6`#.V9mOe32(PlPUO?J5/ /u/}, 4cp05 !K,PK!H ?6#hydra_client-0.3.0.dist-info/RECORD}ҽ@|\h 6@Q _v 'fߩsfCRGa\PR_aHKpv&osɸ6,O6G$TZsDoٔND*68^˻wxAQ{5?rT ks"RWp. 1]Dc#TZJE9Uei-ޝX"Sݙ%?T d)Z7Oj8hG}r)Wcy`$;UHMRzl W[WTgZNW-D'ϊ3ג+t.|r@WZ ǯHu?xLxL݇1]Zf+3-柜+2ZLRr"7Djӫ-duCMR^s 7 OY>-,_6>.[9)BGAMxxa&̈́YB2~Vo?piuq?(t*0ࡎGUǜ~-ח^g*1Q"ޜm(yIK9r՘6#8q/vΒPK!Q566hydra_client/__init__.pyPK!͖t ((lhydra_client/abc.pyPK! hydra_client/client.pyPK!QU hydra_client/consent.pyPK!)44hydra_client/exceptions.pyPK!^2+ + ^hydra_client/login.pyPK!4MM(hydra_client/oauth2.pyPK!%y0=Dhydra_client/utils.pyPK!U qEhydra_client/version.pyPK!HnHTU"Ghydra_client-0.3.0.dist-info/WHEELPK!H^&0/%$Hhydra_client-0.3.0.dist-info/METADATAPK!H ?6#Ihydra_client-0.3.0.dist-info/RECORDPK [ L