PK!H2-whost/__init__.py__version__ = "0.1.5" PK!Q1whost/common.py#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu import json import string import logging from pathlib import Path logging.basicConfig(level=logging.INFO) CONFIG_PATH = Path("/etc/cardshop-host.config") NETPLAN_CONF = Path("/etc/netplan/50-cloud-init.yaml") DEFAULT_API_URL = "https://api.cardshop.hotspot.kiwix.org" UPDATE_SCRIPT = Path(__file__).parent.resolve().joinpath("update.sh") ALL_SLOTS = list(string.ascii_uppercase) def getLogger(name): return logging.getLogger(name) logger = getLogger(__name__) def read_conf(): """ read cardshop config file (json) """ try: with open(str(CONFIG_PATH), "r") as fd: return json.load(fd) except Exception as exp: logger.error(exp) return {} def save_conf(config): """ save cardshop config to file (json) """ try: with open(str(CONFIG_PATH), "w") as fd: json.dump(config, fd, indent=4) return True except Exception as exp: logger.error(exp) return False def update_conf(data): """ update values into cardshop config file """ config = read_conf() config.update(data) return save_conf(config) def get_next_slot(): config = read_conf() for slot in ALL_SLOTS: if slot in config.get("writers", {}).keys(): continue return slot # make sure we have a config if not CONFIG_PATH.exists(): save_conf( {"username": "", "password": "", "enabled": False, "api_url": DEFAULT_API_URL} ) PK!{T\::whost/devices.py#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu import os import re import subprocess from pathlib import Path from whost.common import getLogger, read_conf logger = getLogger(__name__) BLOCK_PREFIX = Path("/sys/class/block") DEVICES_PREFIX = Path("/sys/devices") def get_writer(slot, writer_conf): device = get_device_from( pci_ident=writer_conf["pci"], usb_ident=writer_conf["usb"], host_ident=writer_conf["host"], ) writer_conf.update({"device": device, "name": get_name_for(device), "slot": slot}) return writer_conf def get_writers(): config = read_conf() return [ get_writer(slot, writer) for slot, writer in config.get("writers", {}).items() ] def get_block_devices_list(): """ ["sdb", "sdc", ..] returning all block devices """ return [ fname for fname in os.listdir(BLOCK_PREFIX) if re.search("^sd[a-z]+$", fname) ] def get_removable_usb_blocks(): return filter(lambda x: is_removable(x), get_block_devices_list()) def find_device(): """ return block_name (sdX) of the first found removable block dev with geometry """ try: return next( filter( lambda x: get_size(x) is not None, [block_name for block_name in get_removable_usb_blocks()], ) ) except Exception: return None def _block_path(block_name): return BLOCK_PREFIX.joinpath(block_name) def get_metadata(block_name): """ return PCI/USB/HOST unique identifier for a block_name """ parts = _block_path(block_name).resolve().parts return {"pci": parts[4], "usb": parts[8], "host": parts[11]} def get_device_path(pci_ident, usb_ident, host_ident): """ full /sys/devices path for a PCI/USB/HOST combination """ # /sys/devices/pci0000:00/0000:00:14.0 pci_paths = ["pci{}".format(":".join(pci_ident.split(":")[0:2])), pci_ident] # usb4/4-1/4-1.2/4-1.2:1.0 usb_paths = [ "usb{}".format(usb_ident.split("-", 1)[0]), usb_ident.split(".", 1)[0], usb_ident.split(":", 1)[0], usb_ident, ] # host6/target6:0:0/6:0:0:1 host_paths = [ "host{}".format(host_ident.split(":", 1)[0]), "target{}".format(host_ident.rsplit(":", 1)[0]), host_ident, ] return DEVICES_PREFIX.joinpath(*pci_paths + usb_paths + host_paths) def get_block_for(device_path): try: bn = [ fname for fname in os.listdir(device_path.joinpath("block")) if fname.startswith("sd") ][0] except Exception as exp: logger.error(exp) return None return device_path.joinpath("block", bn).name def get_device_from(pci_ident, usb_ident, host_ident): return get_block_for(get_device_path(pci_ident, usb_ident, host_ident)) def get_name_for(block_name): vendor_p = _block_path(block_name).joinpath("device", "vendor") model_p = _block_path(block_name).joinpath("device", "model") with open(str(vendor_p), "r") as vfp, open(str(model_p), "r") as mfp: return "{vendor}{model}".format( vendor=vfp.read().strip(), model=mfp.read().strip() ) def is_removable(block_name): try: removable_p = _block_path(block_name).joinpath("removable") with open(str(removable_p), "r") as fp: return bool(int(fp.read().strip())) except Exception: return False def get_size(block_name): ps = subprocess.run( ["/sbin/fdisk", "-l", "/dev/{}".format(block_name)], universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) if not ps.returncode == 0: return None try: return int(re.search(", ([0-9]+) bytes", ps.stdout.splitlines()[0]).groups()[0]) except Exception as exp: logger.error(exp) return 1 PK!m3` whost/network.py#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu import ipaddress import subprocess import yaml import netifaces import requests from whost.common import getLogger, NETPLAN_CONF logger = getLogger(__name__) BLANK_CONF = {"network": {"ethernets": {}, "version": 2}} NAME_SERVERS = ["8.8.8.8", "8.8.4.4"] NETPLAN_NS = {"nameservers": {"addresses": NAME_SERVERS}} def read_netplan(): """ read netplan config file (yaml) """ with open(str(NETPLAN_CONF), "r") as fd: return yaml.load(fd.read()) def save_netplan(config, apply_conf=True): """ save netplan config file (yaml) """ with open(str(NETPLAN_CONF), "w") as fd: yaml.safe_dump(config, fd) if apply_conf: return subprocess.run(["netplan", "try", "--timeout", "1"]).returncode == 0 return True def update_netplan(data, apply_conf=True): """ update values into netplan config file """ config = read_netplan() config.update(data) save_netplan(config, apply_conf=apply_conf) def reset_netplan(): """ replace netplan config with blank one """ return save_netplan(BLANK_CONF, apply_conf=True) def get_iface_config(iface): """ simple address/netmask/gateway access to real netplan conf for an iface """ conf = read_netplan()["network"]["ethernets"].get(iface, {}) addresses = conf.get("addresses", []) address = addresses[0] if addresses else None address_str = netmask_str = None if address: try: aif = ipaddress.ip_interface(address) address_str = aif.ip.compressed netmask_str = aif.hostmask.compressed except Exception: pass return { "address": address_str, "netmask": netmask_str, "gateway": conf.get("gateway4"), } def get_interfaces(skip_loopback=True): """ list of available network interfaces """ all_ifaces = netifaces.interfaces() if skip_loopback: return filter( lambda x: not x.startswith("lo") if skip_loopback else x, all_ifaces ) return all_ifaces def is_internet_connected(): """ boolean whether kiwix website is avail via HTTP (hence internet OK) """ try: requests.head("http://kiwix.org") return True except Exception as exp: logger.error(exp) return False def save_network_config(iface, dhcp, address=None, netmask=None, gateway=None): if dhcp: return configure_dhcp(iface) return configure_static(iface, address=address, netmask=netmask, gateway=gateway) def configure_dhcp(iface): dhcp_conf = {"addresses": [], "dhcp4": True, "optional": True} dhcp_conf.update(NETPLAN_NS) netplan = read_netplan() netplan["network"]["ethernets"].update({iface: dhcp_conf}) return save_netplan(netplan) def configure_static(iface, address, netmask, gateway): nma = ipaddress.ip_interface("{a}/{m}".format(a=address, m=netmask)) fixed_conf = { "addresses": [nma.compressed], "gateway4": gateway, "dhcp4": False, "optional": True, } fixed_conf.update(NETPLAN_NS) netplan = read_netplan() netplan["network"]["ethernets"].update({iface: fixed_conf}) return save_netplan(netplan) PK!}~eNNwhost/ui/__init__.py#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu import sys from whost.ui import cli CANCEL = "CANCEL" CANCEL_LIST = [CANCEL] def handle_cancel(value): """ print and exit if value i cancel """ if value == CANCEL: display_success("Canceling ; exiting.") sys.exit(1) def display_menu(label, choices=None, menu=None, launch=True, with_cancel=False): # built list of choices if from menu if choices is None: choices = list(menu.keys()) if with_cancel: choices += CANCEL_LIST choices_t = list(choices) def _func_name(x): if menu is None: return x return menu.get(x, (CANCEL,))[0] def _func_desc(x): return choices_t.index(x) selected = cli.ask_choice( label, choices=choices, func_name=_func_name, func_desc=_func_desc ) handle_cancel(selected) if menu is not None and launch: return menu.get(selected)[1]() return selected def display_success(*message): """ standardized way of displaying a success message """ cli.info_2(*message) def display_error(*message): """ standardized way of displaying an error message """ cli.info(cli.red, *message) def restart_line(): """ reset a single printed line (useful for loader) """ sys.stdout.write("\r") sys.stdout.flush() def get_valid_string(label, validator, default=None): """ shortcut to request a string validated via a callback """ value, error = None, None while value is None: if error: cli.info(cli.yellow, error) value, error = validator(cli.ask_string(label, default=default)) return value def nonempty_validator(value): """ validator ensuring value is not empty """ if not value: return None, "Empty value not allowed" return (value, None) PK!dIIwhost/ui/cli.pyfrom typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type, Union, IO import argparse import datetime import difflib import functools import getpass import inspect import io import os import sys import time import traceback import colorama import unidecode import tabulate Dict, Type ConfigValue = Union[None, bool, str] FileObj = IO[str] # Global variable to store configuration CONFIG = { "verbose": os.environ.get("VERBOSE"), "quiet": False, "color": "auto", "title": "auto", "timestamp": False, "record": False, # used for testing } # type: Dict[str, ConfigValue] # used for testing _MESSAGES = list() # so that we don't re-compute this variable over # and over again: _ENABLE_XTERM_TITLE = None # should we call colorama.init()? _INITIALIZED = False Token = Union[str, "Color", "UnicodeSequence", "Symbol"] def setup( *, verbose: bool = False, quiet: bool = False, color: str = "auto", title: str = "auto", timestamp: bool = False ) -> None: """ Configure behavior of message functions. :param verbose: Whether :func:`debug` messages should get printed :param quiet: Hide every message except :func:`warning`, :func:`error`, and :func:`fatal` :param color: Choices: 'auto', 'always', or 'never'. Whether to color output. By default ('auto'), only use color when output is a terminal. :param title: Ditto for setting terminal title :param timestamp: Whether to prefix every message with a time stamp """ _setup(verbose=verbose, quiet=quiet, color=color, title=title, timestamp=timestamp) def _setup(**kwargs: ConfigValue) -> None: for key, value in kwargs.items(): CONFIG[key] = value class Color: """Represent an ANSI escape sequence """ def __init__(self, code: str): self.code = code # fmt: off reset = Color(colorama.Style.RESET_ALL) bold = Color(colorama.Style.BRIGHT) faint = Color(colorama.Style.DIM) # for some reason those are not in colorama standout = Color('\x1b[3m') underline = Color('\x1b[4m') blink = Color('\x1b[5m') overline = Color('\x1b[6m') black = Color(colorama.Fore.BLACK) red = Color(colorama.Fore.RED) green = Color(colorama.Fore.GREEN) yellow = Color(colorama.Fore.YELLOW) blue = Color(colorama.Fore.BLUE) magenta = Color(colorama.Fore.MAGENTA) cyan = Color(colorama.Fore.CYAN) white = Color(colorama.Fore.WHITE) # backward compatibility: brown = yellow # used by ui.warning lightgray = white # used by ui.debug darkred = red darkgreen = green darkblue = blue purple = magenta fuscia = magenta fuschia = magenta turquoise = cyan darkgray = black darkteal = cyan darkyellow = yellow # fmt: on # Other nice-to-have characters: class UnicodeSequence: """ Represent a sequence containing a color followed by a Unicode symbol """ def __init__(self, color: Color, as_unicode: str, as_ascii: str): if os.name == "nt": self.as_string = as_ascii else: self.as_string = as_unicode self.color = color def tuple(self) -> Tuple[Token, ...]: return (reset, self.color, self.as_string, reset) ellipsis = UnicodeSequence(reset, "…", "...") check = UnicodeSequence(green, "✓", "ok") cross = UnicodeSequence(red, "❌", "ko") class Symbol(UnicodeSequence): def __init__(self, as_unicode: str, as_ascii: str): super().__init__(reset, as_unicode, as_ascii) def tuple(self) -> Tuple[Token, ...]: return (self.as_string,) def using_colorama() -> bool: if os.name == "nt": if "TERM" not in os.environ: return True if os.environ["TERM"] == "cygwin": return True return False else: return False def config_color(fileobj: FileObj) -> bool: if CONFIG["color"] == "never": return False if CONFIG["color"] == "always": return True if os.name == "nt": # sys.isatty() is False on mintty, so # let there be colors by default. (when running on windows, # people can use --color=never) # Note that on Windows, when run from cmd.exe, # console.init() does the right thing if sys.stdout is redirected return True else: return fileobj.isatty() def write_title_string(mystr: str, fileobj: FileObj) -> None: if using_colorama(): # By-pass colorama bug: # colorama/win32.py, line 154 # return _SetConsoleTitleW(title) # ctypes.ArgumentError: argument 1: : wrong type return mystr = "\x1b]0;%s\x07" % mystr fileobj.write(mystr) fileobj.flush() def process_tokens( tokens: Sequence[Token], *, end: str = "\n", sep: str = " " ) -> Tuple[str, str]: """ Returns two strings from a list of tokens. One containing ASCII escape codes, the other only the 'normal' characters """ # Flatten the list of tokens in case some of them are of # class UnicodeSequence: flat_tokens = list() # type: List[Token] for token in tokens: if isinstance(token, UnicodeSequence): flat_tokens.extend(token.tuple()) else: flat_tokens.append(token) with_color = _process_tokens(flat_tokens, end=end, sep=sep, color=True) without_color = _process_tokens(flat_tokens, end=end, sep=sep, color=False) return (with_color, without_color) def _process_tokens( tokens: Sequence[Token], *, end: str = "\n", sep: str = " ", color: bool = True ) -> str: res = "" if CONFIG["timestamp"]: now = datetime.datetime.now() res += now.strftime("[%Y-%m-%d %H:%M:%S] ") for i, token in enumerate(tokens): if isinstance(token, Color): if color: res += token.code else: res += str(token) if i != len(tokens) - 1: res += sep res += end if color: res += reset.code return res def write_and_flush(fileobj: FileObj, to_write: str) -> None: try: fileobj.write(to_write) except UnicodeEncodeError: # Maybe the file descritor does not support the full Unicode # set, like stdout on Windows. # Use the unidecode library # to make sure we only have ascii, while still keeping # as much info as we can fileobj.write(unidecode.unidecode(to_write)) fileobj.flush() def message( *tokens: Token, end: str = "\n", sep: str = " ", fileobj: FileObj = sys.stdout, update_title: bool = False ) -> None: """ Helper method for error, warning, info, debug """ if using_colorama(): global _INITIALIZED if not _INITIALIZED: colorama.init() _INITIALIZED = True with_color, without_color = process_tokens(tokens, end=end, sep=sep) if CONFIG["record"]: _MESSAGES.append(without_color) if update_title and with_color: write_title_string(without_color, fileobj) to_write = with_color if config_color(fileobj) else without_color write_and_flush(fileobj, to_write) def fatal(*tokens: Token, **kwargs: Any) -> None: """ Print an error message and call ``sys.exit`` """ error(*tokens, **kwargs) sys.exit(1) def error(*tokens: Token, **kwargs: Any) -> None: """ Print an error message """ tokens = [bold, red, "Error:"] + list(tokens) # type: ignore kwargs["fileobj"] = sys.stderr message(*tokens, **kwargs) def warning(*tokens: Token, **kwargs: Any) -> None: """ Print a warning message """ tokens = [brown, "Warning:"] + list(tokens) # type: ignore kwargs["fileobj"] = sys.stderr message(*tokens, **kwargs) def info(*tokens: Token, **kwargs: Any) -> None: r""" Print an informative message :param tokens: list of `ui` constants or strings, like ``(cli_ui.red, 'this is an error')`` :param sep: separator, defaults to ``' '`` :param end: token to place at the end, defaults to ``'\n'`` :param fileobj: file-like object to print the output, defaults to ``sys.stdout`` :param update_title: whether to update the title of the terminal window """ if CONFIG["quiet"]: return message(*tokens, **kwargs) def info_section(*tokens: Token, **kwargs: Any) -> None: """ Print an underlined section name """ # We need to know the length of the section: process_tokens_kwargs = kwargs.copy() process_tokens_kwargs["color"] = False no_color = _process_tokens(tokens, **process_tokens_kwargs) info(*tokens, **kwargs) info("-" * len(no_color), end="\n\n") def info_1(*tokens: Token, **kwargs: Any) -> None: """ Print an important informative message """ info(bold, blue, "::", reset, *tokens, **kwargs) def info_2(*tokens: Token, **kwargs: Any) -> None: """ Print an not so important informative message """ info(bold, blue, "=>", reset, *tokens, **kwargs) def info_3(*tokens: Token, **kwargs: Any) -> None: """ Print an even less important informative message """ info(bold, blue, "*", reset, *tokens, **kwargs) def dot(*, last: bool = False, fileobj: Any = None) -> None: """ Print a dot without a newline unless it is the last one. Useful when you want to display a progress with very little knowledge. :param last: whether this is the last dot (will insert a newline) """ end = "\n" if last else "" info(".", end=end, fileobj=fileobj) def info_count(i: int, n: int, *rest: Token, **kwargs: Any) -> None: """ Display a counter before the rest of the message. ``rest`` and ``kwargs`` are passed to :func:`info` Current index should start at 0 and end at ``n-1``, like in ``enumerate()`` :param i: current index :param n: total number of items """ num_digits = len(str(n)) counter_format = "(%{}d/%d)".format(num_digits) counter_str = counter_format % (i + 1, n) info(green, "*", reset, counter_str, reset, *rest, **kwargs) def info_progress(prefix: str, value: float, max_value: float) -> None: """ Display info progress in percent. :param value: the current value :param max_value: the max value :param prefix: the prefix message to print """ if sys.stdout.isatty(): percent = float(value) / max_value * 100 sys.stdout.write(prefix + ": %.0f%%\r" % percent) sys.stdout.flush() def debug(*tokens: Token, **kwargs: Any) -> None: """ Print a debug message. Messages are shown only when ``CONFIG["verbose"]`` is true """ if not CONFIG["verbose"] or CONFIG["record"]: return message(*tokens, **kwargs) def indent_iterable(elems: Sequence[str], num: int = 2) -> List[str]: """Indent an iterable.""" return [" " * num + l for l in elems] def indent(text: str, num: int = 2) -> str: """Indent a piece of text.""" lines = text.splitlines() return "\n".join(indent_iterable(lines, num=num)) def tabs(num: int) -> str: """ Compute a blank tab """ return " " * num def info_table( data: Any, *, headers: Optional[List[str]] = None, fileobj: Any = None ) -> None: if not fileobj: fileobj = sys.stdout colored_data = list() plain_data = list() for row in data: colored_row = list() plain_row = list() for item in row: colored_str, plain_str = process_tokens(item, end="") colored_row.append(colored_str) plain_row.append(plain_str) colored_data.append(colored_row) plain_data.append(plain_row) if config_color(fileobj): data_for_tabulate = colored_data else: data_for_tabulate = plain_data res = tabulate.tabulate(data_for_tabulate, headers=headers) res += "\n" write_and_flush(fileobj, res) def message_for_exception(exception: Exception, message: str) -> Sequence[Token]: """ Returns a tuple suitable for cli_ui.error() from the given exception. (Traceback will be part of the message, after the ``message`` argument) Useful when the exception occurs in an other thread than the main one. """ tb = sys.exc_info()[2] buffer = io.StringIO() traceback.print_tb(tb, file=io) # type: ignore # fmt: off return ( red, message + "\n", exception.__class__.__name__, str(exception), "\n", reset, buffer.getvalue() ) # fmt: on def read_input() -> str: """ Read input from the user """ info(green, "> ", end="") return input() def read_password() -> str: """ Read a password from the user """ info(green, "> ", end="") return getpass.getpass(prompt="") def get_ask_tokens(tokens: Sequence[Token]) -> List[Token]: return [green, "::", reset] + list(tokens) + [reset] # type: ignore def ask_string(*question: Token, default: Optional[str] = None) -> Optional[str]: """Ask the user to enter a string. """ tokens = get_ask_tokens(question) if default: tokens.append("(%s)" % default) info(*tokens) answer = read_input() if not answer: return default return answer def ask_password(*question: Token) -> str: """Ask the user to enter a password. """ tokens = get_ask_tokens(question) info(*tokens) answer = read_password() return answer FuncDesc = Callable[[Any], str] FuncName = Callable[[Any], str] def ask_choice( *prompt: Token, choices: List[Any], func_name: Optional[FuncName] = None, func_desc: Optional[FuncDesc] = None ) -> Any: """Ask the user to choose from a list of choices. :return: the selected choice ``func_desc`` will be called on every list item for displaying and sorting the list. If not given, will default to the identity function. Will loop until: * the user enters a valid index * or hits ``ctrl-c`` * or leaves the prompt empty In the last two cases, None will be returned """ if func_name is None: func_name = lambda x: str(x) if func_desc is None: func_desc = lambda x: str(x) tokens = get_ask_tokens(prompt) info(*tokens) choices.sort(key=func_desc) for i, choice in enumerate(choices, start=1): choice_desc = func_name(choice) info(" ", blue, "%i" % i, reset, choice_desc) keep_asking = True res = None while keep_asking: answer = read_input() if not answer: return None try: index = int(answer) except ValueError: info("Please enter a valid number") continue if index not in range(1, len(choices) + 1): info(str(index), "is out of range") continue res = choices[index - 1] keep_asking = False return res def ask_yes_no(*question: Token, default: bool = False) -> bool: """Ask the user to answer by yes or no""" while True: tokens = [green, "::", reset] + list(question) + [reset] if default: tokens.append("(Y/n)") else: tokens.append("(y/N)") info(*tokens) # type: ignore answer = read_input() if answer.lower() in ["y", "yes"]: return True if answer.lower() in ["n", "no"]: return False if not answer: return default warning("Please answer by 'y' (yes) or 'n' (no) ") AnyFunc = Callable[..., Any] class Timer: """ Display time taken when executing a list of statements. """ def __init__(self, description: str): self.description = description self.start_time = datetime.datetime.now() self.stop_time = datetime.datetime.now() self.elapsed_time = 0 def __call__(self, func: AnyFunc, *args: Any, **kwargs: Any) -> AnyFunc: @functools.wraps(func) def res(*args: Any, **kwargs: Any) -> Any: self.start() ret = func(*args, **kwargs) self.stop() return ret return res def __enter__(self) -> "Timer": self.start() return self def __exit__(self, *unused: Any) -> None: self.stop() def start(self) -> None: """ Start the timer """ self.start_time = datetime.datetime.now() def stop(self) -> None: """ Stop the timer and emit a nice log """ end_time = datetime.datetime.now() elapsed_time = end_time - self.start_time elapsed_seconds = elapsed_time.seconds hours, remainder = divmod(int(elapsed_seconds), 3600) minutes, seconds = divmod(remainder, 60) as_str = "%sh %sm %ss %dms" % ( hours, minutes, seconds, elapsed_time.microseconds / 1000, ) info("%s took %s" % (self.description, as_str)) def did_you_mean(message: str, user_input: str, choices: Sequence[str]) -> str: """ Given a list of choices and an invalid user input, display the closest items in the list that match the input. """ if not choices: return message else: result = { difflib.SequenceMatcher(a=user_input, b=choice).ratio(): choice for choice in choices } message += "\nDid you mean: %s?" % result[max(result)] return message def main_test_colors() -> None: this_module = sys.modules[__name__] for name, value in inspect.getmembers(this_module): if isinstance(value, Color): info(value, name) def main_demo() -> None: info("OK", check) up = Symbol("👍", "+1") info("I like it", blue, up) info_section(bold, "python-cli demo") # Monkey-patch message() so that we sleep after # each call global message old_message = message def new_message(*args: Any, **kwargs: Any) -> None: old_message(*args, **kwargs) time.sleep(1) message = new_message info_1("Important info") info_2("Secondary info") info("This is", red, "red") info("this is", bold, "bold") list_of_things = ["foo", "bar", "baz"] for i, thing in enumerate(list_of_things): info_count(i, len(list_of_things), thing) info_progress("Done", 5, 20) info_progress("Done", 10, 20) info_progress("Done", 20, 20) info("\n", check, "all done") # stop monkey patching message = old_message fruits = ["apple", "orange", "banana"] answer = ask_choice("Choose a fruit", choices=fruits) info("You chose:", answer) def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("action", choices=["test_colors", "demo"]) args = parser.parse_args() if args.action == "demo": main_demo() elif args.action == "test_colors": main_test_colors() if __name__ == "__main__": main() PK!Diwhost/ui/credentials.py#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu import requests from whost.common import getLogger, read_conf, DEFAULT_API_URL, update_conf from whost.ui import cli, get_valid_string, nonempty_validator logger = getLogger(__name__) def is_authenticated(): """ boolean if in-config credentials could authenticate """ config = read_conf() try: req = requests.post( url="{}/auth/authorize".format(config.get("api_url")), headers={ "username": config.get("username"), "password": config.get("password"), "Content-type": "application/json", }, ) req.raise_for_status() except Exception as exp: logger.error(exp) return False else: return True def configure_credentials(): """ get username/password/api_url from user and save to config file """ cli.info_2("Credentials") config = read_conf() username = get_valid_string("Username", nonempty_validator, config.get("username")) password = get_valid_string("Password", nonempty_validator, config.get("password")) api_url = get_valid_string( "API URL – use `reset` to use default", nonempty_validator, config.get("api_url", DEFAULT_API_URL), ) if api_url == "reset": api_url = DEFAULT_API_URL update_conf({"username": username, "password": password, "api_url": api_url}) cli.info_2( "Saved crentials as", cli.bold, username, cli.reset, "/", cli.bold, password, cli.reset, "for", cli.bold, api_url, ) PK!|=  whost/ui/devices.py#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu import time from collections import OrderedDict import humanfriendly from whost.ui import cli, display_menu, display_success, display_error from whost.common import getLogger, get_next_slot, read_conf, update_conf from whost.devices import get_writers, get_name_for, find_device, get_size, get_metadata logger = getLogger(__name__) def reset_devices(): pass def add_device(): cli.info_2("Please remove all SD-cards from all writers.") ready = cli.ask_yes_no("Ready?", default=False) if not ready: return cli.info( "Great! Now please insert", cli.bold, "one", cli.reset, "SD-card into the writer you want to configure.", ) print(" waiting for card", end="") device = None while device is None: time.sleep(1) print(".", end="", flush=True) device = find_device() print("FOUND") # we now have a new DEVICE. hw = get_metadata(device) slot = get_next_slot() # update configured writers list writers = read_conf().get("writers", {}) writers.update({slot: hw}) if update_conf({"writers": writers}): display_success( "Found your", humanfriendly.format_size(get_size(device), binary=True), "card on", cli.bold, device, cli.reset, "({})".format(get_name_for(device)), ".\n", "Assigned slot:", cli.bold, slot, ) else: display_error("Failed to configure a slot for", cli.bold, device) def configure_devices(): cli.info_1("Already configured writer devices") writers = get_writers() for writer in writers: cli.info( cli.blue, " *", cli.reset, cli.bold, "{slot}:/dev/{device}".format(**writer), cli.reset, "({name} at {pci}/{usb}/{host})".format(**writer), ) cli.info("") menu = OrderedDict( [ ("reset-writers", ("Reset writers config (remove ALL)", reset_devices)), ("add-device", ("Add one device", add_device)), ] ) display_menu("Choose:", menu=menu, with_cancel=True) PK!Z;UW W whost/ui/home.py#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu import sys from collections import OrderedDict from whost.common import UPDATE_SCRIPT, read_conf, update_conf from whost.ui import cli, restart_line, display_menu, display_success, display_error from whost.ui.devices import configure_devices from whost.ui.network import configure_network from whost.ui.credentials import configure_credentials, is_authenticated from whost.network import is_internet_connected def exit_to_shell(): display_success("Exiting to shell.") sys.exit(10) def exit_to_logout(): display_success("Exiting to logout.") sys.exit(20) def update_code(): cli.info_2("Launching update script…") cli.info(str(UPDATE_SCRIPT)) def toggle_host(): enabled = read_conf().get("enabled", False) answer = cli.ask_yes_no( "You are about to {} this host. Are you sure?".format( "disable" if enabled else "enable" ) ) if answer: ns = "enabled" if not enabled else "disabled" if update_conf({"enabled": not enabled}): display_success("Successfuly {} host!".format(ns)) else: display_error("Error: host could not be {}.".format(ns)) def display_home(): cli.info_section(cli.purple, "Hotsport Cardshop writer-host configurator") print("Checking internet connection…", end="", flush=True) connected = is_internet_connected() authenticated = is_authenticated() if connected else False restart_line() config = read_conf() connected_str = "CONNECTED" if connected else "NOT CONNECTED" connected_color = cli.green if connected else cli.red authenticated_str = "AUTHENTICATED" if authenticated else "NOT AUTHENTICATED" authenticated_color = cli.green if authenticated else cli.red enabled = config.get("enabled", False) enabled_color = cli.green if enabled else cli.red enabled_str = "ENABLED" if enabled else "DISABLED" configured_readers = len(config.get("writers", {})) configured_readers_color = cli.yellow if configured_readers else cli.red menu = OrderedDict( [ ("configure-network", ("Configure Network", configure_network)), ("configure-credentials", ("Configure Credentials", configure_credentials)), ("configure-readers", ("Configure USB Writers", configure_devices)), ("update-code", ("Update code and restart", update_code)), ( "toggle-host", ( "{} this Host".format("Enable" if not enabled else "Disable"), toggle_host, ), ), ("exit-to-shell", ("Exit to a shell", exit_to_shell)), ("logout", ("Exit (logout)", exit_to_logout)), ] ) cli.info_1("Internet Connectivity:", connected_color, connected_str) cli.info_1("Authentication:", authenticated_color, authenticated_str) cli.info_1("Host Status:", enabled_color, enabled_str) cli.info_1("Configured Writers:", configured_readers_color, str(configured_readers)) display_menu("Choose:", menu=menu) PK!GEm whost/ui/network.py#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu import ipaddress from collections import OrderedDict from whost.ui import cli, display_menu, get_valid_string, display_error, display_success from whost.network import get_interfaces, save_network_config, reset_netplan, get_iface_config def ipadress_validator(value): """ IPv4 address validator """ try: ipaddress.ip_address(value).compressed except ValueError: return None, "Invalid IPv4 address format" return (value, None) def reset_network_config(): """ restore initial blank /etc/network/configure """ if reset_netplan(): display_success("Network configuration has been reset via netplan.") else: display_error("Failed to reset network config via netplan.") def configure_network(): cli.info_2("You need one, direct, ethernet Internet connection.") menu = OrderedDict( [ ( "reset-network-config", ("Reset network config (remove ALL interfaces)", reset_network_config), ), ("configure-iface", ("Configure Interface", configure_iface)), ] ) display_menu("Choose:", menu=menu, with_cancel=True) def configure_iface(): # pick interface ifaces = list(get_interfaces()) iface = display_menu("Choose Interface:", choices=ifaces, with_cancel=True) cli.info("You selected", cli.bold, iface) iface_config = get_iface_config(iface) from pprint import pprint as pp pp(iface_config) # select method (dhcp, fixed) dhcp = "dhcp" fixed = "fixed" methods = [dhcp, fixed] method = display_menu("Connection method:", choices=methods, with_cancel=True) if method == dhcp: if save_network_config(iface, dhcp=True): cli.info_2( "Saved your", cli.bold, iface, cli.reset, "configuration as", cli.bold, "DHCP", ) else: display_error("Unable to save DHCP network config for", cli.bold, iface) return # fixed method config # get address address = get_valid_string( "IP Address", ipadress_validator, default=iface_config["address"] ) cli.info("You entered", cli.bold, address) netmask = get_valid_string( "Netmask", ipadress_validator, default=iface_config["netmask"] ) cli.info("You entered", cli.bold, netmask) gateway = get_valid_string( "Gateway", ipadress_validator, default=iface_config["gateway"] ) cli.info("You entered", cli.bold, gateway) if save_network_config( iface, dhcp=False, address=address, netmask=netmask, gateway=gateway ): cli.info_2( "Saved your", cli.bold, iface, cli.reset, "configuration as", cli.bold, address, cli.reset, "/", cli.bold, netmask, cli.reset, "/", cli.bold, gateway, ) else: display_error("Unable to save fixed network config for", cli.bold, iface) PK!8oh whost-config#!/bin/bash ret=0 for (( ; ; )) do reset whost-configurator ret=$? # exit to shell if [ $ret -gt 2 ]; then break fi done if [ $ret -ge 20 ]; then kill -HUP $PPID fi PK!:|Ùwhost-configurator#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu from whost.ui.home import display_home def main(): display_home() if __name__ == '__main__': main() PK!+//whost-restart-all#!/bin/bash whost-stop-all && whost-start-all PK!z/QQ whost-setup#!/bin/bash RED=`tput setaf 1` GREEN=`tput setaf 2` YELLOW=`tput setaf 3` BLUE=`tput setaf 4` NC=`tput sgr0` REG_PUB="ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC4UTXOYXrKA6dR7KizO2AvqqHKQGJE/FZF2oKTiofWEYDf+UWylksH4WjFmVczDUHN653Ve/QOIyRfI6IUuVa2hJ+l02xFV7rdl7L5zSZwKiSJr+SefouzWIFwS3VS3gbLOqk864a1NkUR97yKYjxsZiT9fISf771HqEKhsXOzZDOFbxt5u+YAaAJIJlU0EMKkDRBBtAVxmLFHme0uSpZ8DlYMFARGe1s0I++1eby0NVtzP3TarouvkPN1cFmS7UhQCsHzcmDMcNyrtHGBnlgjihd4m2bppmY75xTTR/PQTKDWqwklyYZhiDCKjZYzxWTk493SwKfZfaT9FOU0r4FT reg@homelet" MAINT_PUB="ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCx9Bfw8wkceadnounwXVHInI0FyNEj3z64bqXA8cwbgkqkXTVWnI3I6vUzKY8dSfL8PXydCaVnGxogP88Y294k4rjIf8NGubwNe5B2oyLNuscBhd1QWzEmvr4ej32I1Ot3oulJsbqt7oSKUr6pQ4fD44WXjGNaQx3WhbsSJb28k4rNRs4bY+HlScsaKlfVRpE+kuI64BNPl4+IVfkJzs+E7NuDp3DnHl4pwbWjsj856/coKe0v0XtMOXZP7pVn/TLRGbNA+w/HVLLRud5taTZXxV5jYHOeftLFupSZL5VdGHWrC6/GeWgtwlvcsfmt6erc4p6MQqKxT3SV/CNIS2j1 maint@cardshop" green() { echo "${GREEN}$1${NC}" } red() { echo "${RED}$1${NC}" } yellow() { echo "${YELLOW}$1${NC}" } blue() { echo "${BLUE}$1${NC}" } step() { yellow "=> $1" } die() { red "$@" exit 1 } # blank the screen reset blue "=== Cardshop WriterHost setup ===" if [[ $(/usr/bin/id -u) -ne 0 ]]; then red "Must be ran as root. exiting." exit 1 fi if [ -z "${REVERSE_SSH_PORT}" ]; then red "you must set the REVERSE_SSH_PORT environ variable to the appropriate port for this writer host." echo " See http://wiki.kiwix.org/wiki/Cardshop-maintenance" exit 1 fi pathadd() { if [ -d "$1" ] && [[ ":$PATH:" != *":$1:"* ]]; then PATH="${PATH:+"$PATH:"}$1" echo "export PATH=${PATH}" > /root/.bash_profile source /root/.bash_profile fi } step "Ugrading base Ubuntu packages" apt update -y && apt --fix-broken install && apt upgrade -y && apt --fix-broken install && apt autoremove -y step "Installing additional packages" apt install -y vim openssh-server step "Add maintenance SSH keys" mkdir -p /root/.ssh chmod 700 /root/.ssh echo "${REG_PUB}" >> /root/.ssh/authorized_keys echo "${MAINT_PUB}" >> /root/.ssh/authorized_keys chmod 600 /root/.ssh/authorized_keys step "Add reverse-SSH connection" #echo " StrictHostKeyChecking no" >> /etc/ssh/ssh_config #echo " UserKnownHostsFile=/dev/null" >> /etc/ssh/ssh_config systemctl enable ssh echo "REVERSE_SSH_PORT=${REVERSE_SSH_PORT}" > /etc/default/reverse-ssh curl -L -o /etc/systemd/system/reverse-ssh.service https://raw.githubusercontent.com/kiwix/cardshop/master/whost/reverse-ssh.service systemctl daemon-reload systemctl enable reverse-ssh.service systemctl restart reverse-ssh.service systemctl status --no-pager -l reverse-ssh.service step "Install docker-CE from official repo" # https://docs.docker.com/install/linux/docker-ce/ubuntu/#set-up-the-repository apt install -y apt-transport-https ca-certificates curl software-properties-common curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add - apt-key fingerprint 0EBFCD88 add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" apt-get update -y apt-get install -y docker-ce mkdir -p /data step "Install basic python dependencies" add-apt-repository universe apt update -y apt install -y python3-pip /usr/local/bin/pip3 install -U pip || die "unable to update pip" /usr/local/bin/pip3 install virtualenv || die "unable to install virtualenv" virtualenv -p /usr/bin/python3 /root/whostenv || die "unable to create venv" source /root/whostenv/bin/activate || die "unable to source venv" step "Download code" pip install -U whost step "Adding whost folder to PATH" WHOST_BINS=`python -c 'import sys ; print([p for p in sys.path if p.endswith("site-packages")][-1])'` pathadd "${WHOST_BINS}" echo $PATH step "Add whost-config to login" inprofile=`cat /root/.bash_profile |grep "^whost-config$" |wc -l` if [ "${inprofile}" = "0" ]; then echo "source /root/whostenv/bin/activate" >> /root/.bash_profile echo "whost-config" >> /root/.bash_profile fi step "Pulling worker" docker pull kiwix/cardshop-worker step "Restarting" read -p "About to reboot. Please type any key once ready." yn PK!f whost-start-all#!/usr/bin/python3 import os import sys import json import logging import subprocess from pathlib import Path logging.basicConfig(level=logging.INFO) logger = logging.getLogger("whost-start-all") CONFIG_PATH = Path("/etc/cardshop-host.config") DOCKER_START = Path("whost-start-worker") DEVICES_PREFIX = Path("/sys/devices") def read_conf(): """ read cardshop config file (json) """ try: with open(str(CONFIG_PATH), "r") as fd: return json.load(fd) except Exception as exp: logger.error("Unable to read config file at {}".format(str(CONFIG_PATH))) logger.error(exp) sys.exit(1) def get_device_path(pci_ident, usb_ident, host_ident): """ full /sys/devices path for a PCI/USB/HOST combination """ # /sys/devices/pci0000:00/0000:00:14.0 pci_paths = ["pci{}".format(":".join(pci_ident.split(":")[0:2])), pci_ident] # usb4/4-1/4-1.2/4-1.2:1.0 usb_paths = [ "usb{}".format(usb_ident.split("-", 1)[0]), usb_ident.split(".", 1)[0], usb_ident.split(":", 1)[0], usb_ident, ] # host6/target6:0:0/6:0:0:1 host_paths = [ "host{}".format(host_ident.split(":", 1)[0]), "target{}".format(host_ident.rsplit(":", 1)[0]), host_ident, ] return DEVICES_PREFIX.joinpath(*pci_paths + usb_paths + host_paths) def get_block_for(device_path): try: bn = [ fname for fname in os.listdir(device_path.joinpath("block")) if fname.startswith("sd") ][0] except Exception as exp: logger.error(exp) return None return device_path.joinpath("block", bn).name def main(): config = read_conf() if not config.get("enabled", False): logger.warn("Host is disabled. exiting.") sys.exit() username = config.get("username", "") password = config.get("password", "") api_url = config.get("api_url", "") if not username or not password or not api_url: logger.error("Host is missing credentials. exiting.") sys.exit(1) writers = config.get("writers", {}) if not len(writers): logger.warn("Host has no configured writers. exiting.") sys.exit() def _start_worker(worker, slot, device_path): args = [ str(DOCKER_START), worker, slot, device_path, username, password, api_url, ] subprocess.run(["echo", " ".join(args)]) subprocess.run(args) logger.info("Starting 1 downloader and {} writers".format(len(writers))) # downloader _start_worker("downloader", "-", "-") for index, slot in enumerate(writers.keys()): writer = writers.get(slot) device_path = "/dev/{}".format( get_block_for( get_device_path( writer.get("pci"), writer.get("usb"), writer.get("host") ) ) ) _start_worker("writer", slot, device_path) if __name__ == "__main__": main() PK!whost-start-worker#!/bin/bash if [ $# -ne 6 ]; then echo "Usage: $0 WORKER SLOT DEV USERNAME PASSWORD API_URL" exit 1 fi WORKER=$1 USB_SLOT=$2 USB_PATH=$3 USERNAME=$4 PASSWORD=$5 API_URL=$6 docker pull kiwix/cardshop-worker docker run --privileged \ -e USB_SLOT="${USB_SLOT}" \ -e USB_PATH="${USB_PATH}" \ -e USERNAME="${USERNAME}" \ -e PASSWORD="${PASSWORD}" \ -e WORKER_TYPE="${WORKER}" \ -e CARDSHOP_API_URL="${API_URL}" \ --detach --restart always kiwix/cardshop-worker PK!7~~whost-stop-all#!/usr/bin/python3 import sys import logging import subprocess logging.basicConfig(level=logging.INFO) logger = logging.getLogger("whost-stop-all") def main(): docker_ps = subprocess.run( ["docker", "ps", "--no-trunc", "--quiet"], universal_newlines=True, stdout=subprocess.PIPE, ) if not docker_ps.returncode == 0: logger.error("Failed to query list of running containers.") sys.exit(1) try: running_containers = [line.strip() for line in docker_ps.stdout.splitlines()] except Exception as exp: logger.error("Unable to parse containers IDs.") sys.exit(1) if not len(running_containers): logger.info("No container to stop.") sys.exit() logger.info("stopping {} containers".format(len(running_containers))) for container_id in running_containers: if subprocess.run(["docker", "stop", container_id]).returncode == 0: logger.info(".. stopped container: {}".format(container_id)) else: logger.warn(".. failed to stop container: {}".format(container_id)) if __name__ == "__main__": main() PK!maa whost-update#!/bin/bash echo "Updating code and all" pip install -U whost docker pull kiwix/cardshop-worker PK!H&whost-0.1.5.dist-info/entry_points.txtPK!HnHTUwhost-0.1.5.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H] whost-0.1.5.dist-info/METADATAVao6_q@,Nڤ3bI,]Ҵ0TDK%R%8{$MݻwG}̲3F(9hܲOi+c8Dy]LottjrU-,t0)UJ%͟ y[DUڨZE*-<߸#g}<%yRc3537*ٲ0l0ll5@' 3j0K hlZ̽'s8X^|~ݛ_]\2hb!-1d 8s_hbEI/F^S?f|u\Z.=@%OO/^^3ͽ?L mn\(w7Y!L>?^B+d`@ԻCqs$'Gz ʺRt2L?)<]D:KSpRZ#KkNr;i~R<\@~l;{r\&]R,dՔMƝ^ХoZZ!kD<IVD/VkqMŮ|_A>uX0,Qwjzi@o1= _8eIvQ0=a_]PK!HᡶCwhost-0.1.5.dist-info/RECORDuɲH}? 2YdP73<Ov޲v_4_݂yԟJ/w92n;1tN.ya]S`P .})ʴ8>qY*'a/¡hV_N܋(? axq< eϰ$LrHlwIApe^PLv(?}ͺI+U9Lonw-g  uØZ2(p&_W2} ACo~bQU|8f(=usu珧.qB-n` rgL 4$qLEP}upGo|@egJ8h:!u K~tXrc)swΤWaH$Em8 O+or~+67as- xDU;[m֛F @NQdJK$>7ޟq γFU#MZdLCkخm[j(]uU[we!-}f~G6u/s߅ >O萌S0LhPUoHWj۴yRmc5˹;e7CDjtSYG! QllR-( KS8gR>)ː!MQaMag<PK!H2-whost/__init__.pyPK!Q1Ewhost/common.pyPK!{T\::swhost/devices.pyPK!m3` whost/network.pyPK!}~eNN"whost/ui/__init__.pyPK!dII>*whost/ui/cli.pyPK!DiJtwhost/ui/credentials.pyPK!|=   {whost/ui/devices.pyPK!Z;UW W `whost/ui/home.pyPK!GEm whost/ui/network.pyPK!8oh whost-configPK!:|Ù큱whost-configuratorPK!+//큟whost-restart-allPK!z/QQ whost-setupPK!f wwhost-start-allPK!큣whost-start-workerPK!7~~큨whost-stop-allPK!maa Rwhost-updatePK!H&whost-0.1.5.dist-info/entry_points.txtPK!HnHTU#whost-0.1.5.dist-info/WHEELPK!H] whost-0.1.5.dist-info/METADATAPK!HᡶCIwhost-0.1.5.dist-info/RECORDPKp