PK!aGallegro_pl/__init__.pyfrom .allegro import Allegro from .oauth import AllegroAuth, ClientCredentialsAuth, URL_TOKEN __name__ = 'mattes-allegro-pl' __version__='0.2.0' PK!ZKallegro_pl/allegro.pyimport allegro_api.configuration import allegro_api.rest import tenacity import zeep from .oauth import AllegroAuth class Allegro: def __init__(self, auth_handler: AllegroAuth): self.oauth = auth_handler if not self.oauth.access_token: if self.oauth._refresh_token: self.oauth.refresh_token() else: self.oauth.fetch_token() def rest_api_client(self): config = allegro_api.configuration.Configuration() config.host = 'https://api.allegro.pl' self.oauth.configure(config) return allegro_api.ApiClient(config) def retry(self, fn): return tenacity.retry( retry=AllegroAuth.token_needs_refresh, before=self.oauth.retry_refresh_token, stop=tenacity.stop_after_attempt(2) )(fn) def web_api_client(self): return zeep.client.Client('https://webapi.allegro.pl/service.php?wsdl') PK!upa) ) allegro_pl/oauth.pyimport abc import json import concurrent.futures import allegro_api.rest import oauthlib.oauth2 import requests_oauthlib URL_TOKEN = 'https://allegro.pl/auth/oauth/token' class AllegroAuth: def __init__(self, client_id: str, client_secret: str, access_token=None, refresh_token=None): self.client_id: str = client_id self.client_secret: str = client_secret self._access_token: str = access_token self._refresh_token: str = refresh_token self._config: allegro_api.configuration.Configuration = None @property def access_token(self) -> str: return self._access_token @access_token.setter def access_token(self, access_token: str) -> None: self._access_token = access_token self.update_configuration() @abc.abstractmethod def fetch_token(self): pass def configure(self, config: allegro_api.configuration.Configuration): self._config = config self.update_configuration() def update_configuration(self): if self._config: self._config.access_token = self._access_token def retry_refresh_token(self, _, attempt) -> None: if attempt <= 1: return self.refresh_token() @abc.abstractmethod def refresh_token(self): pass @staticmethod def token_needs_refresh(f: concurrent.futures.Future) -> bool: x = f.exception(0) if isinstance(x, allegro_api.rest.ApiException) and x.status == 401: body = json.loads(x.body) return body['error'] == 'invalid_token' and body['error_description'].startswith('Access token expired: ') else: return False class ClientCredentialsAuth(AllegroAuth): def __init__(self, client_id, client_secret): super().__init__(client_id, client_secret) client = oauthlib.oauth2.BackendApplicationClient(self.client_id, access_token=self.access_token) self.oauth = requests_oauthlib.OAuth2Session(client=client, token_updater=self.access_token) def fetch_token(self): token = self.oauth.fetch_token(URL_TOKEN, client_id=self.client_id, client_secret=self.client_secret) self.access_token = token['access_token'] def refresh_token(self): return self.fetch_token() PK!HڽTU'mattes_allegro_pl-0.2.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!HoE*mattes_allegro_pl-0.2.0.dist-info/METADATAMO1+zԄ6e%6@B ~G]N.yw2s}жQ,ݬ["2[ƀ)ZAErZ.l'ٸ[[l[%Sݎ z]CE#)vL)XM9oqz Zy[y0F7ASt_I*ˋbpFN0+!;\R* Z?'H\%] :bkw H'D,PK!Hش?*(mattes_allegro_pl-0.2.0.dist-info/RECORDͻr@@o!)vQ B<">MMgwsqY׾AmF(sԐa _ˌ!t9#EpSc(6//߰:\nngώcl0@n]C?#ިϚu,h!m=L U۪ E,IyӬ}ӿhw"kec/=L+.VU)[Y:0nAWWyIηӀ~dx b3}Cy/ep%JpڎFQ_PK!aGallegro_pl/__init__.pyPK!ZKallegro_pl/allegro.pyPK!upa) ) allegro_pl/oauth.pyPK!HڽTU'mattes_allegro_pl-0.2.0.dist-info/WHEELPK!HoE*mattes_allegro_pl-0.2.0.dist-info/METADATAPK!Hش?*(mattes_allegro_pl-0.2.0.dist-info/RECORDPKu