PK!encapsia_api/__init__.py#: Keep in sync with git tag and package version in pyproject.toml. __version__ = "0.1.13" class EncapsiaApiError(RuntimeError): # NOQA pass from encapsia_api.credentials import CredentialsStore # NOQA from encapsia_api.rest import EncapsiaApi # NOQA PK!>wencapsia_api/credentials.pyimport os from pathlib import Path import toml import encapsia_api class CredentialsStore: CREDENTIALS_STORE = Path().home() / ".encapsia" / "credentials.toml" def __init__(self): # Create directory and file if they don't exist aleady self.CREDENTIALS_STORE.parent.mkdir(mode=0o700, exist_ok=True) self.CREDENTIALS_STORE.touch(mode=0o600, exist_ok=True) self._store_timestamp = -1 self._store = None self._refresh() def _refresh(self): current_timestamp = self.CREDENTIALS_STORE.stat().st_mtime if self._store_timestamp < current_timestamp: self._store = toml.loads(self.CREDENTIALS_STORE.read_text()) self._store_timestamp = current_timestamp def _save(self): self.CREDENTIALS_STORE.write_text(toml.dumps(self._store)) self._store_timestamp = self.CREDENTIALS_STORE.stat().st_mtime def _get(self, label): return self._store[label] def get(self, label): self._refresh() d = self._get(label) return d["url"], d["token"] def _set(self, label, url, token): if not url.startswith("http"): url = f"https://{url}" self._store[label] = {"url": url, "token": token} self._save() def set(self, label, url, token): self._refresh() self._set(label, url, token) def remove(self, label): self._refresh() if label in self._store: del self._store[label] self._save() def _get_env_var(name): try: return os.environ[name] except KeyError: raise encapsia_api.EncapsiaApiError( f"Environment variable {name} does not exist!" ) def discover_credentials(host=None): """Return (url, token) or raise EncapsiaApiError.""" if not host: host = os.environ.get("ENCAPSIA_HOST") if host: store = CredentialsStore() try: url, token = store.get(host) except KeyError: raise encapsia_api.EncapsiaApiError( f"Cannot find entry for '{host}' in encapsia credentials file." ) else: url, token = _get_env_var("ENCAPSIA_URL"), _get_env_var("ENCAPSIA_TOKEN") return url, token PK!!M88encapsia_api/rest.pyimport collections import mimetypes import tempfile import uuid import requests import encapsia_api class Base: def __init__(self, url, token, version="v1"): """Initialize with server URL (e.g. https://myserver.encapsia.com).""" if not url.startswith("http"): url = f"https://{url}" self.url = url.rstrip("/") self.token = token self.version = version def __str__(self): return self.url def call_api( self, method, path_segments, data=None, json=None, return_json=False, check_json_status=False, extra_headers=None, expected_codes=(200, 201), params=None, ): headers = { "Accept": "application/json", "Authorization": "Bearer {}".format(self.token), "User-Agent": f"encapsia-api/{encapsia_api.__version__}", } if json: headers["Content-type"] = "application/json" if extra_headers: headers.update(extra_headers) if path_segments: segments = [self.url, self.version] if isinstance(path_segments, str): segments.append(path_segments.lstrip("/")) else: segments.extend([s.lstrip("/") for s in path_segments]) else: segments = [self.url] url = "/".join(segments) response = requests.request( method, url, data=data, json=json, params=params, headers=headers, verify=True, allow_redirects=False, ) if response.status_code not in expected_codes: raise encapsia_api.EncapsiaApiError( "{} {}\nFull response:\n{}".format( response.status_code, response.reason, (response.content or "").strip(), ) ) if return_json: answer = response.json() if check_json_status and answer["status"] != "ok": raise encapsia_api.EncapsiaApiError(response.text) return answer else: return response def get(self, *args, **kwargs): return self.call_api( "get", *args, return_json=True, check_json_status=True, **kwargs ) def put(self, *args, **kwargs): return self.call_api( "put", *args, return_json=True, check_json_status=True, **kwargs ) def post(self, *args, **kwargs): return self.call_api( "post", *args, return_json=True, check_json_status=True, **kwargs ) def delete(self, *args, **kwargs): return self.call_api( "delete", *args, return_json=True, check_json_status=True, **kwargs ) class GeneralMixin: def whoami(self): return self.get("whoami")["result"] class ReplicationMixin: def get_hwm(self): answer = self.post( ("sync", "out"), json=[], params=dict(all_zones=True, limit=0) ) return answer["result"]["hwm"] def get_assertions(self, hwm, blocksize): answer = self.post( ("sync", "out"), json=hwm, params=dict(all_zones=True, limit=blocksize) ) assertions = answer["result"]["assertions"] hwm = answer["result"]["hwm"] return assertions, hwm def post_assertions(self, assertions): self.post(("sync", "in"), json=assertions) def guess_mime_type(filename): mime_type = mimetypes.guess_type(filename, strict=False)[0] if mime_type is None: mime_type = "application/octet-stream" return mime_type class BlobsMixin: def upload_file_as_blob(self, filename, mime_type=None): """Upload given file to blob, guessing mime_type if not given.""" blob_id = uuid.uuid4().hex if mime_type is None: mime_type = guess_mime_type(filename) with open(filename, "rb") as f: blob_data = f.read() self.upload_blob_data(blob_id, mime_type, blob_data) return blob_id def upload_blob_data(self, blob_id, mime_type, blob_data): """Upload blob data.""" extra_headers = {"Content-type": mime_type} self.call_api( "put", ("blobs", blob_id), data=blob_data, extra_headers=extra_headers, return_json=True, check_json_status=True, ) def download_blob_to_file(self, blob_id, filename): """Download blob to given filename.""" with open(filename, "wb") as f: data = self.download_blob_data(blob_id) if data: f.write(data) def download_blob_data(self, blob_id): """Download blob data for given blob_id.""" extra_headers = {"Accept": "*/*"} response = self.call_api( "get", ("blobs", blob_id), extra_headers=extra_headers, expected_codes=(200, 302, 404), ) if response.status_code == 200: return response.content elif response.status_code in (302, 404): return None else: raise encapsia_api.EncapsiaApiError( "Unable to download blob {}: {}".format(blob_id, response.status_code) ) def get_blobs(self): return self.get("blobs")["result"]["blobs"] def tag_blobs(self, blob_ids, tag): post_data = [{"blob_id": blob_id, "tag": tag} for blob_id in blob_ids] self.post("blobtags", post_data) def delete_blobtag(self, blob_id, tag): self.delete(("blobtags", blob_id, tag)) def get_blob_ids_with_tag(self, tag): return self.get(("blobtags", "", tag))["result"]["blob_ids"] def trim_blobtags(self, blob_ids, tag): """Ensure only the given blobs have given tag.""" server_blob_ids = self.get_blob_ids_with_tag(tag) unwanted = set(server_blob_ids) - set(blob_ids) for blob_id in unwanted: print("Untagging blob {} for tag {}".format(blob_id, tag)) self.delete_blobtag(blob_id, tag) class LoginMixin: def login_transfer(self, user, lifespan=600): data = {"lifespan": lifespan} answer = self.post(("login", "transfer", user), json=data) return answer["result"]["token"] def login_federate(self, origin_server, origin_token, federated_group): data = { "origin_server": origin_server, "origin_token": origin_token, "group": federated_group, } answer = self.post(("login", "federate"), json=data) return answer["result"]["token"] def login_again(self, capabilities=None, lifespan=None): data = {} if capabilities: data["capabilities"] = capabilities if lifespan: data["lifespan"] = lifespan answer = self.post(("login", "again"), json=data) return answer["result"]["token"] class TaskMixin: def run_task(self, namespace, function, params, data=None): """Run task and return a means to poll for the result. Returns a function and a unique "no result yet" object. When called, the function will return the "no result yet" object until a reply is available, or raise an error, or simply return the result from the function. """ extra_headers = {"Content-type": "application/octet-stream"} if data else None reply = self.post( ("tasks", namespace, function), params=params, data=data, extra_headers=extra_headers, ) task_id = reply["result"]["task_id"] class NoResultYet: pass def get_task_result(): reply = self.get(("tasks", namespace, task_id)) rest_api_result = reply["result"] task_status = rest_api_result["status"] task_result = rest_api_result["result"] if task_status == "finished": return task_result elif task_status == "failed": raise encapsia_api.EncapsiaApiError(rest_api_result) else: return NoResultYet return get_task_result, NoResultYet class DbCtlMixin: def dbctl_action(self, name, params): """Request a Database control action. Returns a function and a unique "no reply yet" object. When called, the function will return the "no repy yet" object until a reply is available, or raise an error, or simply return the result from the function. """ reply = self.post(("dbctl", "action", name), params=params) action_id = reply["result"]["action_id"] class NoResultYet: pass def get_result(): reply = self.get(("dbctl", "action", action_id)) rest_api_result = reply["result"] action_status = rest_api_result["status"] action_result = rest_api_result["result"] if action_status == "finished": return action_result elif action_status == "failed": raise encapsia_api.EncapsiaApiError(rest_api_result) else: return NoResultYet return get_result, NoResultYet def dbctl_download_data(self, handle, filename=None): """Download data and return (temp) filename.""" headers = {"Accept": "*/*", "Authorization": "Bearer {}".format(self.token)} url = "/".join([self.url, self.version, "dbctl/data", handle]) response = requests.get(url, headers=headers, verify=True, stream=True) if response.status_code != 200: raise encapsia_api.EncapsiaApiError( "{} {}".format(response.status_code, response.reason) ) if filename is None: fd, filename = tempfile.mkstemp() with open(filename, "wb") as f: for chunk in response.iter_content(chunk_size=1024): f.write(chunk) return filename def dbctl_upload_data(self, filename): """Upload data from given filename. Return a handle which can be used for future downloads. """ with open(filename, "rb") as f: extra_headers = {"Content-type": "application/octet-stream"} response = self.post(("dbctl", "data"), data=f, extra_headers=extra_headers) return response["result"]["handle"] class ConfigMixin: def get_all_config(self): """Return all server configuration.""" return self.get("config")["result"] def get_config(self, key): """Return server configuration value for given key.""" try: return self.get(("config", key))["result"][key] except encapsia_api.EncapsiaApiError: raise KeyError(key) def set_config(self, key, value): """Set server configuration value for given key.""" self.put(("config", key), json=value) def set_config_multi(self, data): """Set multiple server configuration values from JSON dictionary.""" self.post("config", json=data) def delete_config(self, key): """Delete server configuration value associated with given key.""" self.delete(("config", key)) class UserMixin: def add_system_user(self, description, capabilities): """Add system user and system role for given description and capabilities.""" description = description.capitalize() encoded_description = description.lower().replace(" ", "-") email = f"system@{encoded_description}.encapsia.com" role_name = "System - " + description self.post( "roles", json=[ {"name": role_name, "alias": role_name, "capabilities": capabilities} ], ) self.post( "users", json=[ { "email": email, "first_name": "System", "last_name": description, "role": role_name, "enabled": True, "is_site_user": False, } ], ) def add_super_user(self, email, first_name, last_name): """Add a superuser and superuser role.""" self.post( "roles", json=[ { "name": "Superuser", "alias": "Superuser", "capabilities": ["superuser"], } ], ) self.post( "users", json=[ { "email": email, "first_name": first_name, "last_name": last_name, "role": "Superuser", "enabled": True, "is_site_user": False, } ], ) def delete_user(self, email): self.delete(("users", email)) def get_all_users(self): """Return raw json of all users.""" return self.get("users")["result"]["users"] def get_all_roles(self): """Return raw json of all roles.""" return self.get("roles")["result"]["roles"] def get_super_users(self): """Yield namedtuples of superusers.""" SuperUser = collections.namedtuple("SuperUser", "email first_name last_name") for user in self.get_all_users(): if user["role"] == "Superuser": yield SuperUser(user["email"], user["first_name"], user["last_name"]) def get_system_users(self): """Yield namedtuples of system users.""" users = [ user for user in self.get_all_users() if user["email"].startswith("system@") ] capabilities = { role["name"]: role["capabilities"] for role in self.get_all_roles() } SystemUser = collections.namedtuple( "SystemUser", "email description capabilities" ) for user in users: yield SystemUser( user["email"], user["last_name"], tuple(capabilities[user["role"]]) ) class EncapsiaApi( Base, GeneralMixin, ReplicationMixin, BlobsMixin, LoginMixin, TaskMixin, DbCtlMixin, ConfigMixin, UserMixin, ): """REST API access to an Encapsia server.""" PK!kBړ66%encapsia_api-0.1.13.dist-info/LICENSEMIT License Copyright (c) 2019 Timothy Corbett-Clark Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HڽTU#encapsia_api-0.1.13.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H'pBU0&encapsia_api-0.1.13.dist-info/METADATASQo0~ MWSNL]%&6ڊsK:q/ io{J}}#A h}S3=_BbcVD,0r,6$nlSI9?q|oXFm#2>l8?w`v%>PƓ ,cI5*XpQ\w+[{rgQDs*BVJP,ggF0p?tϳw:.FhreQȯ[yݶ?absu^WJ=S_\tCG4]4g,.':?|}kw`7M-7ĦQiW5TwvhB,ft>K80Shغsd )QXbTUӜ8Tz[vz̛MU&[fNzگ`ƋUy9z}ȭ<pZPK!H2dp[/$encapsia_api-0.1.13.dist-info/RECORDKr0}, DaE ZA%PlTR_Nߕ3v;U6RIڈ$d( d &<*+S@$6P}?~yEZʇrm 7wjZX%}7&v'l)>5hezw̶XDъ9r?f+@&՛;t{޲sWEu.c Vt.6ɾQKlWvU"QwG2uO'q[nG`'O#⒈s H\uܶALx7/Ƥ%\ˍm<uVPK!encapsia_api/__init__.pyPK!>w<encapsia_api/credentials.pyPK!!M88S encapsia_api/rest.pyPK!kBړ66%[Cencapsia_api-0.1.13.dist-info/LICENSEPK!HڽTU#Gencapsia_api-0.1.13.dist-info/WHEELPK!H'pBU0&iHencapsia_api-0.1.13.dist-info/METADATAPK!H2dp[/$Jencapsia_api-0.1.13.dist-info/RECORDPKzL