PK?N Wpygetty/__init__.pyfrom __future__ import unicode_literals __doc__ = 'Wrapper for Getty v3 API' __author__ = 'Josh Klar ' __version__ = '1.2.3' PK?Ndpygetty/assetchanges.pyfrom __future__ import absolute_import, unicode_literals import requests from .auth import flex_auth from .util import gen_v3_url """ 1. Use the /v3/asset-changes/channels endpoint to retrieve the IDs for your channels (you only need to do this once) 2. Use the /v3/asset-changes/change-sets endpoint along with one of the channel IDs to retrieve a list of image IDs in that channel. The max batch size is 500 IDs. 3. Use the /v3/asset-changes/change-sets/{change-set-id} to confirm that you’ve received the list. 4. Repeat steps 2 and 3 as many time as necessary until the channel returns zero IDs. """ def channels( api_key=None, client_secret=None, auth_token_manager=None, ): """Get a list of asset change notification channels. """ auth_token_manager = flex_auth( api_key=api_key, client_secret=client_secret, auth_token_manager=auth_token_manager, ) res = requests.get( gen_v3_url('asset-changes', 'channels'), headers=auth_token_manager.request_headers(), ) res.raise_for_status() return res.json() def changesets( channel_id, batch_size=None, api_key=None, client_secret=None, auth_token_manager=None, ): """Get asset change notifications """ auth_token_manager = flex_auth( api_key=api_key, client_secret=client_secret, auth_token_manager=auth_token_manager, ) params = { 'channel_id': channel_id, } if batch_size: params['batch_size'] = batch_size res = requests.put( gen_v3_url('asset-changes', 'change-sets'), headers=auth_token_manager.request_headers(), params=params, ) res.raise_for_status() return res.json() def confirm( changeset_id, api_key=None, client_secret=None, auth_token_manager=None, ): """Confirm asset change notifications. """ auth_token_manager = flex_auth( api_key=api_key, client_secret=client_secret, auth_token_manager=auth_token_manager, ) res = requests.delete( gen_v3_url('asset-changes', 'change-sets', changeset_id), headers=auth_token_manager.request_headers(), ) res.raise_for_status() return True PK?N*gMMpygetty/assets.pyfrom __future__ import absolute_import, unicode_literals from copy import deepcopy import requests from .auth import flex_auth from .search import asset_formatters from .util import gen_v3_url def individual_asset( id, asset_type, detailed=False, fields=set(), query_params={}, auth_token_manager=None, api_key=None, client_secret=None, ): assert asset_type in asset_formatters auth_token_manager = flex_auth( auth_token_manager=auth_token_manager, api_key=api_key, client_secret=client_secret, ) params = deepcopy(query_params) new_fields = fields.copy() if detailed: new_fields.add('detail_set') if len(new_fields) > 0: params['fields'] = ','.join(new_fields) url = gen_v3_url(asset_type, str(id)) res = requests.get( url, headers=auth_token_manager.request_headers(), params=params, ) res.raise_for_status() return asset_formatters[asset_type](res.json()) def multiple_assets( ids, asset_type, detailed=False, fields=set(), query_params={}, auth_token_manager=None, api_key=None, client_secret=None, ): assert asset_type in asset_formatters auth_token_manager = flex_auth( auth_token_manager=auth_token_manager, api_key=api_key, client_secret=client_secret, ) params = deepcopy(query_params) params['ids'] = ','.join(map(str, ids)) new_fields = fields.copy() if detailed: new_fields.add('detail_set') if len(new_fields) > 0: params['fields'] = ','.join(new_fields) url = gen_v3_url(asset_type) res = requests.get( url, headers=auth_token_manager.request_headers(), params=params, ) res.raise_for_status() return asset_formatters[asset_type](res.json()) PK?N# pygetty/auth.pyfrom __future__ import absolute_import, unicode_literals from builtins import str import pendulum import requests from .util import gen_url TOKEN_VALIDITY_PADDING = 5 # trim these many seconds off token validity GRANT_TYPE_CLIENT_CREDENTIALS = 'client_credentials' class AuthToken(object): def __init__(self, access_token, expires_in=1800, token_type='Bearer'): if not access_token: raise ValueError('access_token must be provided and truthy') self.access_token = access_token self.expiry = pendulum.now().add(seconds=int(expires_in)) self.token_type = token_type def __repr__(self): return '<{}AuthToken: {}>'.format( '' if self.valid else 'Expired ', str(self), ) def __str__(self): return str(self.access_token) def __unicode__(self): return str(self.access_token) @staticmethod def from_dict(obj): return AuthToken(**obj) @property def valid(self): """ Ensures a token is valid now, and for at least the next few seconds. """ return self.expiry > pendulum.now().add(seconds=TOKEN_VALIDITY_PADDING) class AuthTokenManager(object): def __init__(self, api_key, client_secret, auth_token=None): self.api_key = api_key self.client_secret = client_secret self.auth_token = auth_token def _fetch_token(self): res = requests.post(gen_url('oauth2', 'token'), data={ 'grant_type': GRANT_TYPE_CLIENT_CREDENTIALS, 'client_id': self.api_key, 'client_secret': self.client_secret, }) res.raise_for_status() return AuthToken.from_dict(res.json()) @property def token(self): """ A wrapper around AuthTokenManager.auth_token which will always return a currently-valid token (or raise a requests Exception) """ if not self.auth_token or not self.auth_token.valid: self.auth_token = self._fetch_token() return self.auth_token def request_headers(self): return { 'Api-Key': self.api_key, 'Authorization': 'Bearer {}'.format(self.token), } def flex_auth(api_key=None, client_secret=None, auth_token_manager=None): """ Takes either an AuthTokenManager (which is passed through), or an API Key and Client Secret (which is turned into an AuthTokenManager). Exists so endpoint wrappers can take either an ATM or raw creds at the call validation level, but only need to handle ATMs in the "real" functionality. This entire flow basically exists to make the REPL and one-off calls less boilerplatey. """ if auth_token_manager: return auth_token_manager if not (api_key and client_secret): raise ValueError('Either auth_token_manager or api_key+client_secret required') return AuthTokenManager(api_key, client_secret) PK?NkRD``pygetty/collections.pyfrom __future__ import absolute_import, unicode_literals import requests from .auth import flex_auth from .util import gen_v3_url def collections( api_key=None, client_secret=None, auth_token_manager=None, ): """Get collections applicable for this customer. """ auth_token_manager = flex_auth( api_key=api_key, client_secret=client_secret, auth_token_manager=auth_token_manager, ) res = requests.get( gen_v3_url('collections'), headers=auth_token_manager.request_headers(), ) res.raise_for_status() return res.json() PK?Ncÿpygetty/consts.pyfrom __future__ import unicode_literals base_url = 'https://api.gettyimages.com' base_url_v3 = 'https://api.gettyimages.com/v3' PK?N`[pygetty/downloads.pyfrom __future__ import absolute_import, unicode_literals from builtins import str import requests from .auth import flex_auth from .util import gen_v3_url def image_download_url( id, file_type='jpg', auth_token_manager=None, api_key=None, client_secret=None, ): """ Request the URL from which to download a full-resolution Getty asset. """ auth_token_manager = flex_auth( api_key=api_key, client_secret=client_secret, auth_token_manager=auth_token_manager, ) res = requests.post( gen_v3_url('downloads', 'images', str(id)), headers=auth_token_manager.request_headers(), params={ 'auto_download': False, 'file_type': file_type, }, ) res.raise_for_status() return res.json().get('uri') def video_download_url( id, size='hd1', auth_token_manager=None, api_key=None, client_secret=None, ): """ Request the URL from which to download a full-resolution Getty asset. """ auth_token_manager = flex_auth( api_key=api_key, client_secret=client_secret, auth_token_manager=auth_token_manager, ) res = requests.post( gen_v3_url('downloads', 'videos', str(id)), headers=auth_token_manager.request_headers(), params={ 'auto_download': False, 'size': size, }, ) res.raise_for_status() return res.json().get('uri') PK?Ns pygetty/formatters.pyfrom __future__ import absolute_import, print_function, unicode_literals import logging import re import pendulum try: from itertools import zip_longest except ImportError: # Python 2 from itertools import izip_longest as zip_longest MASTERY_DIMENSIONS_REGEX = re.compile(r'(?P[0-9]+)x(?P[0-9]+)') logger = logging.getLogger(__name__) def format_image(image): if image.get('date_created') is not None: image['date_created'] = pendulum.parse(image['date_created']) if image.get('keywords') is not None: image['raw_keywords'] = image['keywords'] image['keywords'] = [kw['text'] for kw in image['keywords']] return image def format_video(video): if video.get('date_created') is not None: video['date_created'] = pendulum.parse(video['date_created']) if video.get('clip_length') is not None: fields = ('days', 'hours', 'minutes', 'seconds', 'frames') cl = [] for x in re.split('[:;]+', video['clip_length']): try: cl.append(int(x)) except (ValueError, TypeError): # if there is an exception can either pad with 0 # or stop iterating, resulting in lower resolution break # Getty durations are provided as strings that can # omit zeroed leading fields. This forces those # missing fields to be parsed as zero, avoiding # an IndexError in the old implementation which # blindly used `cl[2]` and similar # # https://stackoverflow.com/a/13085898 video['clip_length'] = pendulum.duration( **{ k: v for k, v in zip_longest( reversed(fields), reversed(cl), fillvalue=0, ) # discard frames - pendulum Duration objects # obviously don't support them. # Possible TODO would be to calculate milliseconds # off this value and the reported FPS of the # video # # Also remove any improperly parsed fields (somehow # the duration string had more sections than we can # handle, which should basically never happen) if k != 'frames' and k != 0 } ) if video.get('keywords') is not None: video['raw_keywords'] = video['keywords'] video['keywords'] = [kw['text'] for kw in video['keywords']] if video.get('mastered_to') is not None: matches = re.search(pattern=MASTERY_DIMENSIONS_REGEX, string=video['mastered_to']) if matches is not None: video['parsed_dimensions'] = {k: int(v) for k, v in matches.groupdict().viewitems()} else: logger.warning('Could not parse dimensions from video {ID}'.format(ID=video.get('id'))) return video PK?NHpygetty/images.pyfrom __future__ import absolute_import, unicode_literals from .assets import individual_asset, multiple_assets def image( id, detailed=False, fields=set(), query_params={}, auth_token_manager=None, api_key=None, client_secret=None, ): return individual_asset( id, 'images', detailed=detailed, fields=fields, query_params=query_params, auth_token_manager=auth_token_manager, api_key=api_key, client_secret=client_secret, ) def images( ids, detailed=False, fields=set(), query_params={}, auth_token_manager=None, api_key=None, client_secret=None, ): return multiple_assets( ids, 'images', detailed=detailed, fields=fields, query_params=query_params, auth_token_manager=auth_token_manager, api_key=api_key, client_secret=client_secret, ) PK?NA6 pygetty/search.pyfrom __future__ import absolute_import, unicode_literals import logging import warnings from copy import deepcopy from textwrap import dedent import requests from .auth import flex_auth from .formatters import format_image, format_video from .util import gen_v3_url DEFAULT_PAGE_SIZE = 30 MAX_PAGE_SIZE = 100 DEFAULT_MAX_RESULTS = float('inf') logger = logging.getLogger(__name__) asset_formatters = { 'videos': format_video, 'images': format_image, } class APIPageSizeLimitExceeded(Warning): pass def _fetch_page( page, page_size, query_params, asset_type, search_type=None, auth_token_manager=None, ): params = { 'page': page, 'page_size': page_size, } params.update(query_params) if search_type: url = gen_v3_url('search', asset_type, search_type) else: url = gen_v3_url('search', asset_type) res = requests.get( url, headers=auth_token_manager.request_headers(), params=params, ) res.raise_for_status() return res.json() def search( max_results=DEFAULT_MAX_RESULTS, start_page=1, page_size=DEFAULT_PAGE_SIZE, detailed=False, fields=set(), query_params={}, asset_type=None, search_type=None, auth_token_manager=None, api_key=None, client_secret=None, ): if page_size > MAX_PAGE_SIZE: warnings.warn(dedent(""" search: Requested page_size {page_size} is greater than max {max_page_size}, using {max_page_size} """).format( page_size=page_size, max_page_size=MAX_PAGE_SIZE, ), APIPageSizeLimitExceeded) auth_token_manager = flex_auth( auth_token_manager=auth_token_manager, api_key=api_key, client_secret=client_secret, ) returned = 0 page_num = start_page page_size = min((page_size, MAX_PAGE_SIZE)) params = deepcopy(query_params) new_fields = fields.copy() if detailed: new_fields.add('detail_set') params['fields'] = ','.join(new_fields) total_results = None while returned < max_results if total_results is None else min(max_results, total_results): page = _fetch_page( page=page_num, page_size=page_size, query_params=params, asset_type=asset_type, search_type=search_type, auth_token_manager=auth_token_manager, ) total_results = page.get('result_count') for asset in page[asset_type]: yield asset_formatters[asset_type](asset) returned += 1 if returned >= max_results: return page_num += 1 def all_videos(*args, **kwargs): kwargs['asset_type'] = 'videos' return search(*args, **kwargs) def creative_videos(*args, **kwargs): kwargs['search_type'] = 'creative' kwargs['asset_type'] = 'videos' return search(*args, **kwargs) def editorial_videos(*args, **kwargs): kwargs['search_type'] = 'editorial' kwargs['asset_type'] = 'videos' return search(*args, **kwargs) def all_images(*args, **kwargs): kwargs['asset_type'] = 'images' return search(*args, **kwargs) def creative_images(*args, **kwargs): kwargs['search_type'] = 'creative' kwargs['asset_type'] = 'images' return search(*args, **kwargs) def editorial_images(*args, **kwargs): kwargs['search_type'] = 'editorial' kwargs['asset_type'] = 'images' return search(*args, **kwargs) PK?N*ipygetty/usage.pyfrom __future__ import absolute_import, unicode_literals import uuid from builtins import str import pendulum import requests from .auth import flex_auth from .util import gen_v3_url class ReportingFailed(Exception): def __init__(self, message, invalid_assets): super(ReportingFailed, self).__init__(message) self.invalid_assets = invalid_assets def report_usage_now( id, api_key=None, client_secret=None, auth_token_manager=None, ): """ Reports asset usage at the current moment for a single asset. Returns 1 on success, or raises ReportingFailed. """ return report_usage( id, pendulum.now().isoformat(), api_key=api_key, client_secret=client_secret, auth_token_manager=auth_token_manager, ) def report_usage( id, usage_date, api_key=None, client_secret=None, auth_token_manager=None, ): """ Reports asset usage at the specified time for a single asset. Returns 1 on success, or raises ReportingFailed. """ auth_token_manager = flex_auth( api_key=api_key, client_secret=client_secret, auth_token_manager=auth_token_manager, ) res = requests.put( gen_v3_url('usage-batches', str(uuid.uuid4())), headers=auth_token_manager.request_headers(), json={ 'asset_usages': [{ 'asset_id': id, 'quantity': 1, 'usage_date': usage_date, }], }, ) res.raise_for_status() if res.json().get('invalid_assets'): raise ReportingFailed( 'Some assets failed to report', res.json()['invalid_assets'], ) return res.json() PK?N'%o pygetty/util.pyfrom __future__ import absolute_import, unicode_literals from .consts import base_url, base_url_v3 def gen_url(*args): """ Generate an "old"-style URL. As far as I know this is only needed for auth. """ join_args = [base_url] join_args.extend(args) return '/'.join(map(str, join_args)) def gen_v3_url(*args): """ Generate a v3 REST API URL. """ join_args = [base_url_v3] join_args.extend(args) return '/'.join(map(str, join_args)) PK?Npygetty/videos.pyfrom __future__ import absolute_import, unicode_literals from .assets import individual_asset, multiple_assets def video( id, detailed=False, fields=set(), query_params={}, auth_token_manager=None, api_key=None, client_secret=None, ): return individual_asset( id, 'videos', detailed=detailed, fields=fields, query_params=query_params, auth_token_manager=auth_token_manager, api_key=api_key, client_secret=client_secret, ) def videos( ids, detailed=False, fields=set(), query_params={}, auth_token_manager=None, api_key=None, client_secret=None, ): return multiple_assets( ids, 'videos', detailed=detailed, fields=fields, query_params=query_params, auth_token_manager=auth_token_manager, api_key=api_key, client_secret=client_secret, ) PK?N9̹pygetty-1.2.3.dist-info/LICENSEISC License Copyright 2018 Sniply Projects, Inc. (d/b/a Lumen5) Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies. THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. PK!Hd BUcpygetty-1.2.3.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,rzd&Y)r$[)T&UD"PK!H5}f pygetty-1.2.3.dist-info/METADATA]N0w?ab"*BCZAgӜ$F_*8}缢-<@V r 5L=z?s9]&H`g1h&PlK%&1xoK^!|ҽi$Dz]ov5)eۣbd2jΉN`al c>`;Y(kAXtɃpww93]]kP!W{㏷UZ!PK!H~?pygetty-1.2.3.dist-info/RECORDuGh}f1 D TϏ~.Q1"~q~h E?T%޽{Zdڰe5fnyW^c k!Q0c\mnu3ۙ?YUM'Y3Ҧgee0<0&zG)p_gGXu]V}PD۞@F4nV"qF ط4)uCD&1lQ C ETR 㮮x]{+ s#=Iny{ʊZ9D!Akݷs*8O\xK]n[YͯzIu&GK N_{TFSa^O%r|7*{68ϣv~w5٬2A\£ >B{R J5Є?aGe> {܊h?s078J.!CK АX"0\XdtY*ݤpW=;C8cHH{nݝej{DӲXT\P ![Atr-V$O Q"wD3HI'U7.iBvse8QcD0} FY[UI?C!ߝI$ lMd`sq#3~X,I+ Z,t`XPKj2&ped yNkywIbk qݫOk( %n*W_]KtK_PK?N Wpygetty/__init__.pyPK?Ndpygetty/assetchanges.pyPK?N*gMM pygetty/assets.pyPK?N# Tpygetty/auth.pyPK?NkRD``pygetty/collections.pyPK?Ncÿpygetty/consts.pyPK?N`[X pygetty/downloads.pyPK?Ns E&pygetty/formatters.pyPK?NH2pygetty/images.pyPK?NA6 5pygetty/search.pyPK?N*iCpygetty/usage.pyPK?N'%o Jpygetty/util.pyPK?NLpygetty/videos.pyPK?N9̹Ppygetty-1.2.3.dist-info/LICENSEPK!Hd BUcSpygetty-1.2.3.dist-info/WHEELPK!H5}f Tpygetty-1.2.3.dist-info/METADATAPK!H~?Upygetty-1.2.3.dist-info/RECORDPKtX