PK!99ergal/__init__.py__version__ = '1.0.0' from .profile import Profile PK!ÿ_ ergal/cli.py""" ergal.cli ~~~~~~~~~ This module implements the command line interface for Ergal. """ import os import sys import asyncio from .profile import Profile clear = lambda: os.system('cls' if os.name == 'nt' else 'clear') def main(): clear() print('Welcome to the Ergal CLI.') name = input('\nEnter the name of a Profile to get/create: ') profile = Profile(name, logs=True) main_menu(profile) def main_menu(profile): clear() action = input(f'''Current Profile: {profile.name}\n Management Options (enter corresponding number) 1. Authentication management 4. Profile management 2. Endpoint management 5. Change profile 3. Data target management 6. Quit ''') if action == '1': auth_menu(profile) elif action == '2': endpoint_menu(profile) elif action == '3': input('\nAction not supported! Press enter to return.') main_menu(profile) elif action == '4': profile_menu(profile) elif action == '5': main() elif action == '6': clear() sys.exit() ############################## # # Authentication Management # ############################## def auth_menu(profile): clear() action = input(f"""Current Profile: {profile.name}\n Authentication Management (enter corresponding number) 1. View Authentication 2. Add authentication 3. Return to the main menu """) if action == '1': auth_view(profile) elif action == '2': auth_add(profile) elif action == '3': main_menu(profile) def auth_view(profile): print(f"\nAuthentication Data for {profile.name}: \n") print(profile.auth) input('\nPress enter to return to the authentication management menu.') auth_menu(profile) def auth_add(profile): clear() method = input('\nAuthentication Method: ') if method == 'basic': username = input('\nUsername: ') password = input('Password: ') asyncio.run(profile.add_auth( method, username=username, password=password)) main_menu(profile) elif method == 'params': name = input('\nName: ') value = input('Value: ') asyncio.run(profile.add_auth( method, name=name, value=value)) main_menu(profile) elif method == 'headers': name = input('\nName: ') value = input('Value: ') asyncio.run(profile.add_auth( method, name=name, value=value)) main_menu(profile) elif method == '': input('\nPress enter to return to the authentication management menu.') auth_menu(profile) else: print('\nInvalid method. Try "basic", "params", or "headers".') auth_add(profile) ######################### # # Endpoint Management # ######################### def endpoint_menu(profile): clear() action = input(f"""Current Profile: {profile.name}\n Endpoint management (enter corresponding number) 1. View endpoints 4. Add endpoint 2. Delete endpoint 5. Return to the main menu 3. Update endpoint """) if action == '1': endpoint_view(profile) elif action == '2': endpoint_delete(profile) elif action == '3': endpoint_update(profile) elif action == '4': endpoint_add(profile) elif action == '5': main_menu(profile) def endpoint_view(profile): print('\nDisplaying endpoints...\n') print(profile.endpoints) input('\nEndpoints displayed, press enter to return to the endpoint management menu.') endpoint_menu(profile) def endpoint_delete(profile): endpoint = input('\nEndpoint to Delete: ') asyncio.run(profile.del_endpoint( endpoint)) input('\nPress enter to return to the endpoint management menu.') endpoint_menu(profile) def endpoint_update(profile): input('\nThis feature is not yet supported! Press enter to return to the endpoint management menu.') endpoint_menu(profile) def endpoint_add(profile): name = input('\nName: ') path = input('Path from Base: ') method = input('Request Method: ') asyncio.run(profile.add_endpoint( name, path, method)) input('\nPress enter to return to the endpoint management menu.') endpoint_menu(profile) ####################### # # Profile Management # ####################### def profile_menu(profile): clear() action = input(f"""Current Profile: {profile.name}\n URL Management (enter corresponding number) 1. View URL 2. Change URL 3. Return to the main menu """) if action == '1': url_view(profile) elif action == '2': url_change(profile) elif action == '3': main_menu(profile) def url_view(profile): print('\nThe current base URL is: ', profile.base) input('\nPress enter to return to the URL management menu') profile_menu(profile) def url_change(profile): clear() url = profile.base if url: new = input('\nThe current base URL is: ' + url + ', what would you like to set the URL to? Leave blank to cancel.\n') else: new = input('\nThere is no current base URL for this profile, what would you like to set the URL to? Leave blank to cancel.\n') if new == '': print('Canceled.') else: profile.base = new profile.update() input('\nPress enter to return to the main menu') profile_menu(profile) if __name__ == '__main__': main() PK!""ergal/profile.py""" ergal.profile ~~~~~~~~~~~~~ This module implements the Profile interface, which enables the user to manage their API profiles. """ import json import uuid import sqlite3 from . import utils import xmltodict as xtd import requests from requests.auth import HTTPDigestAuth class Profile: """ Enables API profile management. This class handles the creation/storage/management of API profiles in a local SQLite3 database called `ergal.db`, unless it is instantiated as a test instance, in which case the database is called `ergal_test.db`. :param name: a name for the API profile :param base: (optional) the base URL of the API :param logs: (optional) specifies whether or not log strings are printed on execution of certain methods. :param test: (optional) specifies whether or not the database instance created should be a test instance. Example: >>> profile = Profile('HTTPBin', base='https://httpbin.com') >>> profile.add_endpoint('JSON', '/json', 'get') >>> profile.call('JSON') """ def __init__(self, name, base=None, logs=False, test=False): self.logs = logs self.name = name if type(name) is str else 'default' self.id = ( uuid.uuid5(uuid.NAMESPACE_DNS, self.name).hex if type(name) is str else 'default') self.base = base if type(base) is str else 'default' self.auth = {} self.endpoints = {} self.db, self.cursor = utils.get_db(test=test) try: self._get() except Exception as e: if str(e) == 'get: no matching record': self._create() else: raise Exception('get/create: unknown error occurred') def _get(self): """ Get an existing profile.. """ sql = "SELECT * FROM Profile WHERE id = ?" self.cursor.execute(sql, (self.id,)) record = self.cursor.fetchone() if record: self.id = record[0] self.name = record[1] self.base = record[2] self.auth = json.loads(record[3]) if record[3] else {} self.endpoints = json.loads(record[4]) if record[4] else {} else: raise Exception('get: no matching record') if self.logs: print(f"Profile for {self.name} fetched from {self.id}.") def _create(self): """ Create a new profile. """ sql = "INSERT INTO Profile (id, name, base) VALUES (?, ?, ?)" with self.db: self.cursor.execute(sql, (self.id, self.name, self.base,)) if self.logs: print(f"Profile for {self.name} created on {self.id}.") def update(self): """ Update a profile's database entry. """ sql = """ UPDATE Profile SET base = ?, auth = ?, endpoints = ? WHERE id = ?""" with self.db: self.cursor.execute( sql, ( self.base, json.dumps(self.auth), json.dumps(self.endpoints), self.id)) if self.logs: print(f"Profile for {self.name} updated on {self.id}.") def delete(self): """ Delete a profile's database entry. """ sql = "DELETE FROM Profile WHERE id = ?" with self.db: self.cursor.execute(sql, (self.id,)) if self.logs: print(f"Profile for {self.name} deleted from {self.id}") async def call(self, name, **kwargs): """ Call an endpoint. :param name: the name of the endpoint """ endpoint = self.endpoints[name] url = self.base + endpoint['path'] targets = endpoint['targets'] if 'targets' in endpoint else None if 'pathvars' in kwargs: url = url.format(**kwargs['pathvars']) if 'auth' in endpoint and endpoint['auth']: if self.auth['method'] == 'headers': kwargs['headers'] = {} kwargs['headers'][self.auth['name']] = self.auth['value'] elif self.auth['method'] == 'params': kwargs['params'] = {} kwargs['params'][self.auth['name']] = self.auth['value'] elif self.auth['method'] == 'basic': kwargs['auth'] = ( self.auth['username'], self.auth['password']) elif self.auth['method'] == 'digest': kwargs['auth'] = HTTPDigestAuth( self.auth['username'], self.auth['password']) for k in list(kwargs): if k not in ('headers', 'params', 'data', 'body', 'auth'): kwargs.pop(k) response = getattr(requests, endpoint['method'].lower())(url, **kwargs) if 'parse' in endpoint and endpoint['parse']: data = await utils.parse(response, targets=targets) return data else: return response async def add_auth(self, method, **kwargs): """ Add authentication details. :param method: a supported authentication method """ auth = {'method': method} for k, v in kwargs.items(): if k in ('name', 'value', 'username', 'password'): auth[k] = v self.auth = auth auth_str = json.dumps(self.auth) sql = "UPDATE Profile SET auth = ? WHERE id = ?" with self.db: self.cursor.execute(sql, (auth_str, self.id,)) if self.logs: print(f"Authentication details for {self.name} added on {self.id}.") async def add_endpoint(self, name, path, method, **kwargs): """ Add an endpoint. :param name: a name for the endpoint :param path: the path, from the base URL, to the endpoint :param method: a supported HTTP method """ endpoint = {'path': path, 'method': method} for key in kwargs: if key in ( 'headers', 'params', 'data', 'body', 'auth', 'parse', 'targets'): endpoint[key] = kwargs[key] self.endpoints[name] = endpoint endpoints_str = json.dumps(self.endpoints) sql = "UPDATE Profile SET endpoints = ? WHERE id = ?" with self.db: self.cursor.execute(sql, (endpoints_str, self.id,)) if self.logs: print(f"Endpoint {name} for {self.name} added on {self.id}.") async def del_endpoint(self, name): """ Delete an endpoint. :param name: the name of an endpoint """ del self.endpoints[name] endpoints_str = json.dumps(self.endpoints) sql = "UPDATE Profile SET endpoints = ? WHERE id = ?" with self.db: self.cursor.execute(sql, (endpoints_str, self.id,)) if self.logs: print(f"Endpoint {name} for {self.name} deleted from {self.id}.") async def add_target(self, endpoint, target): """ Add a data target. :param endpoint: the name of the endpoint :param target: the name of the target field """ targets = ( self.endpoints[endpoint]['targets'] if 'targets' in self.endpoints[endpoint] else []) targets.append(target) self.endpoints[endpoint]['targets'] = targets endpoints_str = json.dumps(self.endpoints) sql = "UPDATE Profile SET endpoints = ? WHERE id = ?" with self.db: self.cursor.execute(sql, (endpoints_str, self.id,)) if self.logs: print(f"Target {target} for {endpoint} added on {self.id}.") async def del_target(self, endpoint, target): """ Delete a data target. :param endpoint: the name of the endpoint :param target: the name of the target field """ targets = self.endpoints[endpoint]['targets'] del targets[targets.index(target)] self.endpoints[endpoint]['targets'] = targets endpoints_str = json.dumps(self.endpoints) sql = "UPDATE Profile SET endpoints = ? WHERE id = ?" with self.db: self.cursor.execute(sql, (endpoints_str, self.id,)) if self.logs: print(f"Target {target} for {endpoint} deleted from {self.id}.") PK!ghergal/utils.py""" ergal.utils ~~~~~~~~~~~ This module implements the utility methods used by the Profile interface. """ import json import types import sqlite3 import xmltodict def get_db(test=False): """ Get/create a database connection. If a local ergal.db file exists, a connection is established and returned, otherwise a new database is created and the connection is returned. :param test: (optional) determines whether or not a test database should be created. """ file = 'ergal_test.db' if test else 'ergal.db' db = sqlite3.connect(file) cursor = db.cursor() db.execute(""" CREATE TABLE IF NOT EXISTS Profile ( id TEXT NOT NULL, name TEXT NOT NULL, base TEXT NOT NULL, auth TEXT, endpoints TEXT, PRIMARY KEY(id))""") return db, cursor async def parse(response, targets=None): """ Parse response data. :param response: a requests.Response object :param targets: a list of data targets """ try: data = json.loads(response.text) except json.JSONDecodeError: data = xmltodict.parse(response.text) if type(data) is list: data = {'data': data} output = {} async def search(d): for k, v in d.items(): if k in targets: output[k] = None yield v elif type(v) is dict: async for i in search(v): output[i] = None yield i for k, v in zip(output, [i async for i in search(data)]): output[k] = v return output PK!$$ergal-1.0.3.dist-info/LICENSECopyright 2019 Elliott Maguire Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.PK!HڽTUergal-1.0.3.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!Hyergal-1.0.3.dist-info/METADATA[O1+Qً IDF@}ر4ۮoW@o>sΌ"61{&J^ /QCs^Uk NDmKJBMpO'(-)z`緐x)jkf\qC0v=#K4]@Nk3>')]rE8[W:o$^Ghi~fdhڿ0am-Ѱ6g=f/б`'$bX)o,p* VwJ`mNeoߠ;MyB"x" BiޏhV&~`ez &elcqTCQ("PFHțH %ʬd5Wި>]`?PK!99ergal/__init__.pyPK!ÿ_ hergal/cli.pyPK!""Vergal/profile.pyPK!gh9ergal/utils.pyPK!$$@ergal-1.0.3.dist-info/LICENSEPK!HڽTUDergal-1.0.3.dist-info/WHEELPK!HyEergal-1.0.3.dist-info/METADATAPK!H:@}:Fergal-1.0.3.dist-info/RECORDPKH