PK!Q566hydra_client/__init__.py__version__ = "0.1.0" from .client import HydraAdmin PK!Cf,,hydra_client/abc.pyfrom __future__ import annotations import abc import typing import requests from . import exceptions from .utils import urljoin if typing.TYPE_CHECKING: from .client import Hydra class AbstractResource(abc.ABC): def __init__(self, resource: AbstractResource = None): if resource is not None: self._session: requests.Session = resource._session self.url: str = resource.url else: self._session = requests.Session() self.url = "" def _request( self, method: str, url: str, params: dict = None, json: dict = None ) -> requests.Response: try: response = self._session.request(method, url, params=params, json=json) except ( requests.exceptions.ConnectionError, requests.exceptions.Timeout, ) as exc: raise exceptions.ConnectionError from exc except requests.exceptions.RequestException as exc: raise exceptions.TransportError from exc try: response.raise_for_status() except requests.exceptions.HTTPError as exc: wrapper_exc = exceptions.status_map.get( exc.response.status_code, exceptions.HTTPError ) raise wrapper_exc from exc return response class AbstractEndpoint(AbstractResource): def __init__(self, resource: AbstractResource = None): super().__init__(resource) self.url = urljoin(self.url, self.endpoint) @abc.abstractproperty def endpoint(self) -> str: pass PK!ybhydra_client/client.pyimport typing from .abc import AbstractResource from .consent import ConsentRequest, ConsentSession from .login import LoginRequest, LoginSession from .logout import LogoutRequest from .oauth2 import OAuth2Client from .version import Version class Hydra(AbstractResource): def __init__(self, url: str): super().__init__() self.url = url class HydraAdmin(Hydra): def clients( self, limit: int = None, offset: int = None ) -> typing.List[OAuth2Client]: return OAuth2Client.list(self, limit, offset) def client(self, id: str) -> OAuth2Client: return OAuth2Client.get(id, self) def create_client( self, allowed_cors_origins: typing.List[str] = None, audience: typing.List[str] = None, backchannel_logout_session_required: bool = None, backchannel_logout_uri: str = None, client_id: str = None, client_name: str = None, client_secret: str = None, client_secret_expires_at: int = None, client_uri: str = None, contacts: typing.List[str] = None, frontchannel_logout_session_required: bool = None, frontchannel_logout_uri: str = None, grant_types: typing.List[str] = None, jwks: dict = None, jwks_uri: str = None, logo_uri: str = None, owner: str = None, policy_uri: str = None, post_logout_redirect_uris: typing.List[str] = None, redirect_uris: typing.List[str] = None, redirect_object_signing_alg: str = None, request_uris: typing.List[str] = None, response_uris: typing.List[str] = None, scope: str = None, sector_identifier_uri: str = None, subject_type: str = None, token_endpoint_auth_method: str = None, tos_uri: str = None, userinfo_signed_response_alg: str = None, ) -> OAuth2Client: return OAuth2Client.create( self, allowed_cors_origins, audience, backchannel_logout_session_required, backchannel_logout_uri, client_id, client_name, client_secret, client_secret_expires_at, client_uri, contacts, frontchannel_logout_session_required, frontchannel_logout_uri, grant_types, jwks, jwks_uri, logo_uri, owner, policy_uri, post_logout_redirect_uris, redirect_uris, redirect_object_signing_alg, request_uris, response_uris, scope, sector_identifier_uri, subject_type, token_endpoint_auth_method, tos_uri, userinfo_signed_response_alg, ) def login_request(self, challenge: str) -> LoginRequest: return LoginRequest.get(challenge, self) def consent_request(self, challenge: str) -> ConsentRequest: return ConsentRequest.get(challenge, self) def logout_request(self, challenge: str) -> LogoutRequest: return LogoutRequest.get(challenge, self) def consent_sessions(self, subject: str) -> typing.Iterator[ConsentSession]: yield from ConsentSession.list(subject, self) def revoke_consent_sessions(self, subject: str, client: str = None) -> None: ConsentSession.revoke(subject, client, self) def invalidate_login_sessions(self, subject: str) -> None: LoginSession.invalidate(subject, self) def version(self) -> str: return Version.get(self) PK!fg11hydra_client/consent.pyfrom __future__ import annotations import typing from .abc import AbstractEndpoint, AbstractResource from .utils import filter_none, urljoin if typing.TYPE_CHECKING: from .client import Hydra class ConsentRequest(AbstractEndpoint): endpoint = "/oauth2/auth/requests/consent" def __init__(self, data: dict, parent: AbstractResource): super().__init__(parent) self.acr = data["acr"] self.challenge = data["challenge"] self.client = data["client"] self.context = data.get("context") self.login_challenge = data["login_challenge"] self.login_session_id = data["login_session_id"] self.oidc_context = data["oidc_context"] self.request_url = data["request_url"] self.requested_access_token_audience = data["requested_access_token_audience"] self.requested_scope = data["requested_scope"] self.skip = data["skip"] self.subject = data["subject"] @classmethod def params(cls, challenge: str) -> dict: return {"consent_challenge": challenge} @classmethod def get(cls, challenge: str, hydra: Hydra) -> ConsentRequest: url = urljoin(hydra.url, cls.endpoint) response = hydra._request("GET", url, params=cls.params(challenge)) return cls(response.json(), hydra) def accept( self, grant_access_token_audience: typing.Iterable[str] = None, grant_scope: typing.Iterable[str] = None, remember: bool = False, remember_for: int = None, session: dict = None, ) -> str: data = filter_none( { "grant_access_token_audience": grant_access_token_audience, "grant_scope": grant_scope, "remember": remember, "remember_for": remember_for, "session": session, } ) url = urljoin(self.url, "accept") response = self._request( "PUT", url, params=self.params(self.challenge), json=data ) payload = response.json() return payload["redirect_to"] def reject( self, error: str = None, error_debug: str = None, error_description: str = None, error_hint: str = None, status_code: int = None, ) -> str: url = urljoin(self.url, "reject") data = filter_none( { "error": error, "error_debug": error_debug, "error_description": error_description, "error_hint": error_hint, "status_code": status_code, } ) response = self._request( "PUT", url, params=self.params(self.challenge), json=data ) payload = response.json() return payload["redirect_to"] class ConsentSession(AbstractResource): endpoint = "/oauth2/auth/sessions/consent" def __init__(self, data: dict, parent: AbstractResource): super().__init__(parent) self.consent_request = ConsentRequest(data["consent_request"], parent) self.grant_access_token_audience = data["grant_access_token_audience"] self.grant_scope = data["grant_scope"] self.remember = data["remember"] self.remember_for = data["remember_for"] self.session = data.get("session") @classmethod def params(cls, subject: str, client: str = None) -> dict: return filter_none({"subject": subject, "client": client}) @classmethod def list(cls, subject: str, hydra: Hydra) -> typing.Iterator[ConsentSession]: url = urljoin(hydra.url, cls.endpoint) response = hydra._request("GET", url, params=cls.params(subject)) session_list = response.json() for consent_session in session_list: yield ConsentSession(consent_session, hydra) @classmethod def revoke(cls, subject: str, client: typing.Optional[str], hydra: Hydra) -> None: url = urljoin(hydra.url, cls.endpoint) # This returns 204/201 without any content hydra._request("DELETE", url, params=cls.params(subject, client)) PK!)44hydra_client/exceptions.pyclass HydraException(Exception): pass class UnboundResourceError(HydraException): pass class TransportError(HydraException): pass class ConnectionError(TransportError): pass class HTTPError(HydraException): pass class BadRequest(HTTPError): pass class Unauthorized(HTTPError): pass class Forbidden(HTTPError): pass class NotFound(HTTPError): pass class ServerError(HTTPError): pass status_map = { 400: BadRequest, 401: Unauthorized, 403: Forbidden, 404: NotFound, 500: ServerError, } PK!c  hydra_client/login.pyfrom __future__ import annotations import typing from .abc import AbstractEndpoint, AbstractResource from .utils import filter_none, urljoin if typing.TYPE_CHECKING: from .client import Hydra class LoginRequest(AbstractEndpoint): endpoint = "/oauth2/auth/requests/login" def __init__(self, data: dict, parent: AbstractResource): super().__init__(parent) self.challenge = data["challenge"] self.client = data["client"] self.oidc_context = data["oidc_context"] self.request_url = data["request_url"] self.requested_access_token_audience = data["requested_access_token_audience"] self.requested_scope = data["requested_scope"] self.session_id = data["session_id"] self.skip = data["skip"] self.subject = data["subject"] @classmethod def params(cls, challenge: str) -> dict: return {"login_challenge": challenge} @classmethod def get(cls, challenge: str, hydra: Hydra) -> LoginRequest: url = urljoin(hydra.url, cls.endpoint) response = hydra._request("GET", url, cls.params(challenge)) return cls(response.json(), hydra) def accept( self, subject: str, acr: str = None, context: dict = None, force_subject_identifier: str = None, remember: bool = False, remember_for: int = None, ) -> str: data = filter_none( { "acr": acr, "context": context, "force_subject_identifier": force_subject_identifier, "remember": remember, "remember_for": remember_for, "subject": subject, } ) url = urljoin(self.url, "accept") response = self._request( "PUT", url, params=self.params(self.challenge), json=data ) payload = response.json() return payload["redirect_to"] def reject( self, error: str = None, error_debug: str = None, error_description: str = None, error_hint: str = None, status_code: int = None, ) -> str: url = urljoin(self.url, "reject") data = filter_none( { "error": error, "error_debug": error_debug, "error_description": error_description, "error_hint": error_hint, "status_code": status_code, } ) response = self._request( "PUT", url, params=self.params(self.challenge), json=data ) payload = response.json() return payload["redirect_to"] class LoginSession(AbstractResource): endpoint = "/oauth2/auth/sessions/login" @classmethod def params(cls, subject: str) -> dict: return {"subject": subject} @classmethod def invalidate(cls, subject: str, hydra: Hydra) -> None: url = urljoin(hydra.url, cls.endpoint) # This returns 204/201 without any content hydra._request("DELETE", url, params=cls.params(subject)) PK!C  hydra_client/logout.pyfrom __future__ import annotations import typing from .abc import AbstractEndpoint, AbstractResource from .utils import filter_none, urljoin if typing.TYPE_CHECKING: from .client import Hydra class LogoutRequest(AbstractEndpoint): endpoint = "/oauth2/auth/requests/logout" def __init__(self, data: dict, parent: AbstractResource): super().__init__(parent) self.challenge = data["_challenge"] self.request_url = data["request_url"] self.rp_initiated = data["rp_initiated"] self.sid = data["sid"] self.subject = data["subject"] @classmethod def params(cls, challenge: str) -> dict: return {"logout_challenge": challenge} @classmethod def get(cls, challenge: str, hydra: Hydra) -> LogoutRequest: url = urljoin(hydra.url, cls.endpoint) response = hydra._request("GET", url, cls.params(challenge)) # NOTE: we have to inject the challenge here since the endpoint doesn't # return it as it's the case for login/consent. data = response.json() data["_challenge"] = challenge return cls(data, hydra) def accept( self, subject: str, acr: str = None, context: dict = None, force_subject_identifier: str = None, remember: bool = False, remember_for: int = None, ) -> str: data = filter_none( { "acr": acr, "context": context, "force_subject_identifier": force_subject_identifier, "remember": remember, "remember_for": remember_for, "subject": subject, } ) url = urljoin(self.url, "accept") response = self._request( "PUT", url, params=self.params(self.challenge), json=data ) payload = response.json() return payload["redirect_to"] def reject( self, error: str = None, error_debug: str = None, error_description: str = None, error_hint: str = None, status_code: int = None, ) -> None: url = urljoin(self.url, "reject") data = filter_none( { "error": error, "error_debug": error_debug, "error_description": error_description, "error_hint": error_hint, "status_code": status_code, } ) # This returns 204/201 without any content self._request("PUT", url, params=self.params(self.challenge), json=data) PK!D=4#4#hydra_client/oauth2.pyfrom __future__ import annotations import typing from .abc import AbstractEndpoint, AbstractResource from .utils import filter_none, urljoin if typing.TYPE_CHECKING: from .client import Hydra class OAuth2Client(AbstractEndpoint): endpoint = "/clients" def __init__(self, data: dict, parent: AbstractResource): super().__init__(parent) self._update(data) self.url = urljoin(self.url, self.client_id) def _update(self, data: dict): self.allowed_cors_origins = data["allowed_cors_origins"] self.audience = data["audience"] self.backchannel_logout_session_required = data.get( "backchannel_logout_session_required" ) self.backchannel_logout_uri = data.get("backchannel_logout_uri") self.client_id = data["client_id"] self.client_name = data["client_name"] self.client_secret = data.get("client_secret") self.client_secret_expires_at = data["client_secret_expires_at"] self.client_uri = data["client_uri"] self.contacts = data["contacts"] self.frontchannel_logout_session_required = data.get( "frontchannel_logout_session_required" ) self.frontchannel_logout_uri = data.get("frontchannel_logout_uri") self.grant_types = data["grant_types"] self.jwks = data.get("jwks") self.jwks_uri = data.get("jwks_uri", []) self.logo_uri = data["logo_uri"] self.owner = data["owner"] self.policy_uri = data["policy_uri"] self.post_logout_redirect_uris = data.get("post_logout_redirect_uris") self.redirect_uris = data["redirect_uris"] self.request_object_signing_alg = data.get("request_object_signing_alg") self.request_uris = data.get("request_uris", []) self.response_types = data["response_types"] self.scope = data["scope"] self.sector_identifier_uri = data.get("sector_identifier_uri") self.subject_type = data["subject_type"] self.token_endpoint_auth_method = data["token_endpoint_auth_method"] self.tos_uri = data["tos_uri"] self.updated_at = data["updated_at"] self.userinfo_signed_response_alg = data["userinfo_signed_response_alg"] @classmethod def list( cls, hydra: Hydra, limit: int = None, offset: int = None ) -> typing.List[OAuth2Client]: url = urljoin(hydra.url, cls.endpoint) params = filter_none({"limit": limit, "offset": offset}) response = hydra._request("GET", url, params=params) payload = response.json() return [OAuth2Client(d, hydra) for d in payload] @classmethod def create( cls, hydra: Hydra, allowed_cors_origins: typing.List[str] = None, audience: typing.List[str] = None, backchannel_logout_session_required: bool = None, backchannel_logout_uri: str = None, client_id: str = None, client_name: str = None, client_secret: str = None, client_secret_expires_at: int = None, client_uri: str = None, contacts: typing.List[str] = None, frontchannel_logout_session_required: bool = None, frontchannel_logout_uri: str = None, grant_types: typing.List[str] = None, jwks: dict = None, jwks_uri: str = None, logo_uri: str = None, owner: str = None, policy_uri: str = None, post_logout_redirect_uris: typing.List[str] = None, redirect_uris: typing.List[str] = None, redirect_object_signing_alg: str = None, request_uris: typing.List[str] = None, response_uris: typing.List[str] = None, scope: str = None, sector_identifier_uri: str = None, subject_type: str = None, token_endpoint_auth_method: str = None, tos_uri: str = None, userinfo_signed_response_alg: str = None, ) -> OAuth2Client: url = urljoin(hydra.url, cls.endpoint) data = filter_none( { "allowed_cors_origins": allowed_cors_origins, "audience": audience, "backchannel_logout_session_required": backchannel_logout_session_required, "backchannel_logout_uri": backchannel_logout_uri, "client_id": client_id, "client_name": client_name, "client_secret": client_secret, "client_secret_expires_at": client_secret_expires_at, "client_uri": client_uri, "contacts": contacts, "frontchannel_logout_session_required": frontchannel_logout_session_required, "frontchannel_logout_uri": frontchannel_logout_uri, "grant_types": grant_types, "jwks": jwks, "jwks_uri": jwks_uri, "logo_uri": logo_uri, "owner": owner, "policy_uri": policy_uri, "post_logout_redirect_uris": post_logout_redirect_uris, "redirect_uris": redirect_uris, "redirect_object_signing_alg": redirect_object_signing_alg, "request_uris": request_uris, "response_uris": response_uris, "scope": scope, "sector_identifier_uri": sector_identifier_uri, "subject_type": subject_type, "token_endpoint_auth_method": token_endpoint_auth_method, "tos_uri": tos_uri, "userinfo_signed_response_alg": userinfo_signed_response_alg, } ) response = hydra._request("POST", url, json=data) return cls(response.json(), hydra) @classmethod def get(cls, client_id: str, hydra: Hydra) -> OAuth2Client: url = urljoin(hydra.url, cls.endpoint, client_id) response = hydra._request("GET", url) return cls(response.json(), hydra) def update( self, allowed_cors_origins: typing.List[str] = None, audience: typing.List[str] = None, backchannel_logout_session_required: bool = None, backchannel_logout_uri: str = None, client_id: str = None, client_name: str = None, client_secret: str = None, client_secret_expires_at: int = None, client_uri: str = None, contacts: typing.List[str] = None, frontchannel_logout_session_required: bool = None, frontchannel_logout_uri: str = None, grant_types: typing.List[str] = None, jwks: dict = None, jwks_uri: str = None, logo_uri: str = None, owner: str = None, policy_uri: str = None, post_logout_redirect_uris: typing.List[str] = None, redirect_uris: typing.List[str] = None, redirect_object_signing_alg: str = None, request_uris: typing.List[str] = None, response_uris: typing.List[str] = None, scope: str = None, sector_identifier_uri: str = None, subject_type: str = None, token_endpoint_auth_method: str = None, tos_uri: str = None, userinfo_signed_response_alg: str = None, ) -> OAuth2Client: data = filter_none( { "allowed_cors_origins": allowed_cors_origins, "audience": audience, "backchannel_logout_session_required": backchannel_logout_session_required, "backchannel_logout_uri": backchannel_logout_uri, "client_id": client_id, "client_name": client_name, "client_secret": client_secret, "client_secret_expires_at": client_secret_expires_at, "client_uri": client_uri, "contacts": contacts, "frontchannel_logout_session_required": frontchannel_logout_session_required, "frontchannel_logout_uri": frontchannel_logout_uri, "grant_types": grant_types, "jwks": jwks, "jwks_uri": jwks_uri, "logo_uri": logo_uri, "owner": owner, "policy_uri": policy_uri, "post_logout_redirect_uris": post_logout_redirect_uris, "redirect_uris": redirect_uris, "redirect_object_signing_alg": redirect_object_signing_alg, "request_uris": request_uris, "response_uris": response_uris, "scope": scope, "sector_identifier_uri": sector_identifier_uri, "subject_type": subject_type, "token_endpoint_auth_method": token_endpoint_auth_method, "tos_uri": tos_uri, "userinfo_signed_response_alg": userinfo_signed_response_alg, } ) response = self._request("PUT", self.url, json=data) payload = response.json() self._update(payload) return self def delete(self) -> None: self._request("DELETE", self.url) PK!%y0hydra_client/utils.pydef filter_none(data: dict) -> dict: return {k: v for k, v in data.items() if v is not None} def urljoin(url: str, *parts: str) -> str: return "/".join( (url.rstrip("/"), *(p.strip("/") for p in parts[:-1]), (parts[-1]).lstrip("/")) ) PK!U hydra_client/version.pyfrom __future__ import annotations import typing import requests from . import exceptions from .abc import AbstractEndpoint from .utils import urljoin if typing.TYPE_CHECKING: from .client import Hydra class Version(AbstractEndpoint): endpoint = "/version" @classmethod def get(cls, hydra: Hydra) -> str: url = urljoin(hydra.url, cls.endpoint) response = hydra._request("GET", url) payload = response.json() return payload["version"] PK!HnHTU"hydra_client-0.4.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H AX.%hydra_client-0.4.0.dist-info/METADATAQn0 +|LB $XLv4i#5I%0qa~=gKKtB+FO(C}. E#H6OKC'%3q46`睫3@UBޒZ/aAk(XEZ˓2$&))sDpR+x2.$mca{N>l:b$D\4h8 /r95`P{egtePJ*ؠ:0k_<^ 8_XziƲ`2bi/T_UY:dPK!Hn"`#hydra_client-0.4.0.dist-info/RECORD}I@{s@ A(EO?&`Oҧ޿1j?,POGuGrrNѤm]2]l`) ԦK@#s5IkAS~ QÖZTfD=Js ޭ?όi,w/¸K-^t7hydra_client/oauth2.pyPK!%y0ahydra_client/utils.pyPK!U chydra_client/version.pyPK!HnHTU">ehydra_client-0.4.0.dist-info/WHEELPK!H AX.%ehydra_client-0.4.0.dist-info/METADATAPK!Hn"`#Cghydra_client-0.4.0.dist-info/RECORDPK i