PK!YC;allegro_pl/__init__.pyfrom .allegro import Allegro from .oauth import AllegroAuth, ClientCredentialsAuth, URL_TOKEN, TokenStore, PassTokenStore __name__ = 'mattes-allegro-pl' __version__ = '0.3.2' PK!w3 allegro_pl/allegro.pyimport logging import typing import allegro_api.configuration import allegro_api.rest import tenacity import zeep from .oauth import AllegroAuth logger = logging.getLogger(__name__) class Allegro: def __init__(self, auth_handler: AllegroAuth): self._webapi_session_handle: typing.Optional[str] = None self.oauth = auth_handler self.oauth.set_token_update_handler(self._on_token_updated) ts = self.oauth.token_store if not ts.access_token: if ts.refresh_token: self.oauth.refresh_token() else: self.oauth.fetch_token() self._init_rest_client() self._init_webapi_client() def _init_rest_client(self): config = allegro_api.configuration.Configuration() config.host = 'https://api.allegro.pl' config.access_token = self.oauth.token_store.access_token self.oauth.configuration = config self._rest_client = allegro_api.ApiClient(config) def _init_webapi_client(self): self._webapi_client = zeep.client.Client('https://webapi.allegro.pl/service.php?wsdl') self._webapi_client_login() def _on_token_updated(self): ts = self.oauth.token_store self._rest_client.configuration.access_token = ts.access_token self._webapi_client_login() def rest_api_client(self) -> allegro_api.ApiClient: """:return OAuth2 authenticated REST client""" return self._rest_client def webapi_client(self): """:return authenticated SOAP (WebAPI) client""" return self._webapi_client def _retry_refresh_token(self, retry_state) -> None: if retry_state.attempt_number <= 1: return self.oauth.refresh_token() def retry(self, fn): """Decorator to handle expired access token exceptions. Example: .. code-block:: python allegro = allegro_api.Allegro(...) @allegro.retry def get_cats(**kwargs): return self._cat_api.get_categories_using_get(**kwargs) """ return tenacity.retry( retry=AllegroAuth.token_needs_refresh, before=self._retry_refresh_token, stop=tenacity.stop_after_attempt(2) )(fn) def _webapi_client_login(self): if not self._webapi_client: return @self.retry def do_webapi_client_login(): logger.info('Login to webapi') self._webapi_session_handle = self._webapi_client.service.doLoginWithAccessToken( self.oauth.token_store.access_token, 1, self.oauth.client_id).sessionHandlePart do_webapi_client_login() @property def webapi_session_handle(self): return self._webapi_session_handle PK![;;allegro_pl/oauth.pyimport abc import json import logging import typing from dataclasses import dataclass import allegro_api.rest import oauthlib.oauth2 import requests.exceptions import requests_oauthlib import zeep.exceptions URL_TOKEN = 'https://allegro.pl/auth/oauth/token' logger = logging.getLogger(__name__) @dataclass class TokenStore(abc.ABC): def __init__(self): self._access_token: typing.Optional[str] = None self._refresh_token: typing.Optional[str] = None @abc.abstractmethod def save(self) -> None: logger.debug('save tokens') @property def access_token(self) -> str: return self._access_token @access_token.setter def access_token(self, access_token: str) -> None: self._access_token = access_token @property def refresh_token(self) -> str: return self._refresh_token @refresh_token.setter def refresh_token(self, refresh_token: str) -> None: self._refresh_token = refresh_token @classmethod def from_dict(cls: typing.Type['TokenStore'], data: dict) -> 'TokenStore': ts = cls() ts.update_from_dict(data) return ts def update_from_dict(self, data: dict) -> None: self.access_token = data.get('access_token') self.refresh_token = data.get('refresh_token') def to_dict(self) -> dict: d = {} if self._access_token: d['access_token'] = self.access_token if self._refresh_token: d['refresh_token'] = self.refresh_token return d class PassTokenStore(TokenStore): """In-memory Token store implementation""" def save(self) -> None: pass class AllegroAuth: """Handle acquiring and refreshing access_token""" def __init__(self, client_id: str, client_secret: str, token_store: TokenStore): self.client_id: str = client_id self.client_secret: str = client_secret assert token_store is not None self._token_store = token_store self._notify_token_updated: typing.Callable[[], None] = self._on_token_updated_pass def _on_token_updated(self, token) -> None: logger.debug('token updated') self._token_store.update_from_dict(token) self._token_store.save() self._notify_token_updated() def _on_token_updated_pass(self): pass def set_token_update_handler(self, f: typing.Callable[[], None]) -> None: self._notify_token_updated = f @property def token_store(self) -> TokenStore: return self._token_store @staticmethod def token_needs_refresh(retry_state) -> bool: x = retry_state.outcome.exception(0) if x is None: return False if isinstance(x, allegro_api.rest.ApiException) and x.status == 401: body = json.loads(x.body) return body['error'] == 'invalid_token' and body['error_description'].startswith('Access token expired: ') elif isinstance(x, zeep.exceptions.Fault): if x.code == 'ERR_INVALID_ACCESS_TOKEN': return True else: raise x elif isinstance(x, requests.exceptions.ConnectionError): return x.args[0].args[0] == 'Connection aborted.' else: raise x @abc.abstractmethod def fetch_token(self) -> None: logger.info('fetch token') @abc.abstractmethod def refresh_token(self) -> None: logger.info('refresh token') class ClientCredentialsAuth(AllegroAuth): """Authenticate with Client credentials flow""" def __init__(self, client_id, client_secret): super().__init__(client_id, client_secret, PassTokenStore()) client = oauthlib.oauth2.BackendApplicationClient(self.client_id, access_token=self.token_store.access_token) self.oauth = requests_oauthlib.OAuth2Session(client=client, token_updater=self._on_token_updated) def fetch_token(self): token = self.oauth.fetch_token(URL_TOKEN, client_id=self.client_id, client_secret=self.client_secret) self._on_token_updated(token) def refresh_token(self): self.fetch_token() PK!HڽTU'mattes_allegro_pl-0.3.2.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!HNХ/%*mattes_allegro_pl-0.3.2.dist-info/METADATAMO1sԄm.Jld L`A8/.)* A3U b4)YUna;aAފU/T"XFbR 2k}lw$DC`j+d)^jm0ΠGVJ;1:*6M>HΔJ:m+/OǁoKb3$h%)^Ѭ3HY(;2Sf}r>Nwi]OAh\˰o h3߄mnv}PK!Hw-(mattes_allegro_pl-0.3.2.dist-info/RECORDͻv0g bh NZKOߥIQɕFu1(Tj$ jS;ލ4OY8,\fHK+\&%iۘEOrs"'pӬ:әx};p[E>+8foA"tF84z Jo +s];jˮU趯G$?-[`PK!YC;allegro_pl/__init__.pyPK!w3 allegro_pl/allegro.pyPK![;; allegro_pl/oauth.pyPK!HڽTU'{mattes_allegro_pl-0.3.2.dist-info/WHEELPK!HNХ/%*mattes_allegro_pl-0.3.2.dist-info/METADATAPK!Hw-(mattes_allegro_pl-0.3.2.dist-info/RECORDPK