PK!хwhost/__init__.py__version__ = "0.1.13" PK! _twhost/common.py#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu import json import string import logging import subprocess from pathlib import Path logging.basicConfig(level=logging.INFO) CONFIG_PATH = Path("/etc/cardshop-host.config") NETPLAN_CONF = Path("/etc/netplan/50-cloud-init.yaml") DEFAULT_API_URL = "https://api.cardshop.hotspot.kiwix.org" UPDATE_SCRIPT = Path(__file__).parent.resolve().joinpath("update.sh") ALL_SLOTS = list(string.ascii_uppercase) def getLogger(name): return logging.getLogger(name) logger = getLogger(__name__) def read_conf(): """ read cardshop config file (json) """ try: with open(str(CONFIG_PATH), "r") as fd: return json.load(fd) except Exception as exp: logger.error(exp) return {} def save_conf(config): """ save cardshop config to file (json) """ try: with open(str(CONFIG_PATH), "w") as fd: json.dump(config, fd, indent=4) return True except Exception as exp: logger.error(exp) return False def update_conf(data): """ update values into cardshop config file """ config = read_conf() config.update(data) return save_conf(config) def get_next_slot(): config = read_conf() for slot in ALL_SLOTS: if slot in config.get("writers", {}).keys(): continue return slot def toggle_host(enable): if update_conf({"enabled": enable}): script = "whost-restart-all" if enable else "whost-stop-all" subprocess.run([script]) return True return False def disable_host(): return toggle_host(enable=False) # make sure we have a config if not CONFIG_PATH.exists(): save_conf( {"username": "", "password": "", "enabled": False, "api_url": DEFAULT_API_URL} ) PK!N[hwhost/devices.py#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu """ SD-card readers are presented and used (in docker calls) as block devices Examples: /dev/sda, /dev/mmcblk0 block_name is file name of the block device (sda for instance) Whenever card reader is present, the block device is present. If a card is in the reader, then device has a geometry that fdisk can read. /sys/class/block/ is a symlink to the /sys/devices path representing the device. /sys/devices path does not change so we store it in our config. find_device(): returns for the sole SD-card present get_device_path(block_name): /sys/devices path get_block_name(device_path): block_name from stored device path """ import os import re import subprocess from pathlib import Path from whost.common import getLogger, read_conf, update_conf logger = getLogger(__name__) BLOCK_PREFIX = Path("/sys/class/block") DEVICES_PREFIX = Path("/sys/devices") def get_writer(slot, device_path): """ shortcut to retrieve device, slot and name when reading config """ device = get_block_name(Path(device_path)) if device is None: return None return { "device": device, "name": get_display_name(device), "slot": slot, "device_path": Path(device_path), } def get_writers(): """ list of get_writer() for all writers in conf """ config = read_conf() writers = [] for slot, w in config.get("writers", {}).items(): writer = get_writer(slot, w) if writer: writers.append(writer) return writers def reset_writers(): return update_conf({"writers": {}}) def get_block_devices_list(): """ ["sdb", "sdc", ..] returning all block devices """ return [ fname for fname in os.listdir(BLOCK_PREFIX) if re.search("^sd[a-z]+$", fname) or re.search("^mmcblk[0-9]+$", fname) ] def get_removable_usb_blocks(): return filter(lambda x: is_removable(x), get_block_devices_list()) def find_device(): """ return block_name (sdX) of the first found removable block dev with geometry """ try: return next( filter( lambda x: get_block_size(x) is not None, [block_name for block_name in get_removable_usb_blocks()], ) ) except Exception: return None def _block_path(block_name): return BLOCK_PREFIX.joinpath(block_name) def _device_path(device_path): return DEVICES_PREFIX.joinpath(device_path) def get_device_path(block_name): """ hardware path to this block device """ parts = _block_path(block_name).resolve().parts return _device_path("/".join(parts[3:-2])) def get_block_name(device_path): """ current block device name for this hardware path """ try: bn = [ fname for fname in os.listdir(device_path.joinpath("block")) if fname.startswith("sd") or fname.startswith("mmcblk") ][0] except Exception as exp: logger.error(exp) return None return device_path.joinpath("block", bn).name def get_display_name(block_name): """ string from info in block/device/|| """ parts = [] for prop in ("vendor", "model", "name"): pp = _block_path(block_name).joinpath("device", prop) if pp.exists(): parts.append(open(str(pp), "r").read().strip()) return " ".join(parts) def is_removable(block_name): # internal memory card readers are non-removable yet medias are if block_name.startswith("mmcblk"): return True try: removable_p = _block_path(block_name).joinpath("removable") with open(str(removable_p), "r") as fp: return bool(int(fp.read().strip())) except Exception: return False def get_block_size(block_name): ps = subprocess.run( ["/sbin/fdisk", "-l", "/dev/{}".format(block_name)], universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) if not ps.returncode == 0: return None try: return int(re.search(", ([0-9]+) bytes", ps.stdout.splitlines()[0]).groups()[0]) except Exception as exp: logger.error(exp) return 1 PK!m3` whost/network.py#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu import ipaddress import subprocess import yaml import netifaces import requests from whost.common import getLogger, NETPLAN_CONF logger = getLogger(__name__) BLANK_CONF = {"network": {"ethernets": {}, "version": 2}} NAME_SERVERS = ["8.8.8.8", "8.8.4.4"] NETPLAN_NS = {"nameservers": {"addresses": NAME_SERVERS}} def read_netplan(): """ read netplan config file (yaml) """ with open(str(NETPLAN_CONF), "r") as fd: return yaml.load(fd.read()) def save_netplan(config, apply_conf=True): """ save netplan config file (yaml) """ with open(str(NETPLAN_CONF), "w") as fd: yaml.safe_dump(config, fd) if apply_conf: return subprocess.run(["netplan", "try", "--timeout", "1"]).returncode == 0 return True def update_netplan(data, apply_conf=True): """ update values into netplan config file """ config = read_netplan() config.update(data) save_netplan(config, apply_conf=apply_conf) def reset_netplan(): """ replace netplan config with blank one """ return save_netplan(BLANK_CONF, apply_conf=True) def get_iface_config(iface): """ simple address/netmask/gateway access to real netplan conf for an iface """ conf = read_netplan()["network"]["ethernets"].get(iface, {}) addresses = conf.get("addresses", []) address = addresses[0] if addresses else None address_str = netmask_str = None if address: try: aif = ipaddress.ip_interface(address) address_str = aif.ip.compressed netmask_str = aif.hostmask.compressed except Exception: pass return { "address": address_str, "netmask": netmask_str, "gateway": conf.get("gateway4"), } def get_interfaces(skip_loopback=True): """ list of available network interfaces """ all_ifaces = netifaces.interfaces() if skip_loopback: return filter( lambda x: not x.startswith("lo") if skip_loopback else x, all_ifaces ) return all_ifaces def is_internet_connected(): """ boolean whether kiwix website is avail via HTTP (hence internet OK) """ try: requests.head("http://kiwix.org") return True except Exception as exp: logger.error(exp) return False def save_network_config(iface, dhcp, address=None, netmask=None, gateway=None): if dhcp: return configure_dhcp(iface) return configure_static(iface, address=address, netmask=netmask, gateway=gateway) def configure_dhcp(iface): dhcp_conf = {"addresses": [], "dhcp4": True, "optional": True} dhcp_conf.update(NETPLAN_NS) netplan = read_netplan() netplan["network"]["ethernets"].update({iface: dhcp_conf}) return save_netplan(netplan) def configure_static(iface, address, netmask, gateway): nma = ipaddress.ip_interface("{a}/{m}".format(a=address, m=netmask)) fixed_conf = { "addresses": [nma.compressed], "gateway4": gateway, "dhcp4": False, "optional": True, } fixed_conf.update(NETPLAN_NS) netplan = read_netplan() netplan["network"]["ethernets"].update({iface: fixed_conf}) return save_netplan(netplan) PK!#21whost/ui/__init__.py#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu import sys from whost.ui import cli CANCEL = "CANCEL" CANCEL_LIST = [CANCEL] def handle_cancel(value): """ print and exit if value i cancel """ if value == CANCEL: display_success("Canceling ; exiting.") sys.exit(1) def display_menu(label, choices=None, menu=None, launch=True, with_cancel=False): # built list of choices if from menu if choices is None: choices = list(menu.keys()) if with_cancel: choices += CANCEL_LIST choices_t = list(choices) def _func_name(x): if menu is None: return x return menu.get(x, (CANCEL,))[0] def _func_desc(x): return choices_t.index(x) selected = cli.ask_choice( label, choices=choices, func_name=_func_name, func_desc=_func_desc ) handle_cancel(selected) if menu is not None and launch: return menu.get(selected)[1]() return selected def display_success(*message): """ standardized way of displaying a success message """ cli.info_2(*message) def display_error(*message): """ standardized way of displaying an error message """ cli.info(cli.red, *message) def pause(*message): cli.info(message if message else "hit ENTER to continue…") cli.read_input() def restart_line(): """ reset a single printed line (useful for loader) """ sys.stdout.write("\r") sys.stdout.flush() def get_valid_string(label, validator, default=None): """ shortcut to request a string validated via a callback """ value, error = None, None while value is None: if error: cli.info(cli.yellow, error) value, error = validator(cli.ask_string(label, default=default)) return value def nonempty_validator(value): """ validator ensuring value is not empty """ if not value: return None, "Empty value not allowed" return (value, None) PK!C*IIwhost/ui/cli.pyfrom typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type, Union, IO import argparse import datetime import difflib import functools import getpass import inspect import io import os import sys import time import traceback import colorama import unidecode import tabulate Dict, Type ConfigValue = Union[None, bool, str] FileObj = IO[str] # Global variable to store configuration CONFIG = { "verbose": os.environ.get("VERBOSE"), "quiet": False, "color": "auto", "title": "auto", "timestamp": False, "record": False, # used for testing } # type: Dict[str, ConfigValue] # used for testing _MESSAGES = list() # so that we don't re-compute this variable over # and over again: _ENABLE_XTERM_TITLE = None # should we call colorama.init()? _INITIALIZED = False Token = Union[str, "Color", "UnicodeSequence", "Symbol"] def setup( *, verbose: bool = False, quiet: bool = False, color: str = "auto", title: str = "auto", timestamp: bool = False ) -> None: """ Configure behavior of message functions. :param verbose: Whether :func:`debug` messages should get printed :param quiet: Hide every message except :func:`warning`, :func:`error`, and :func:`fatal` :param color: Choices: 'auto', 'always', or 'never'. Whether to color output. By default ('auto'), only use color when output is a terminal. :param title: Ditto for setting terminal title :param timestamp: Whether to prefix every message with a time stamp """ _setup(verbose=verbose, quiet=quiet, color=color, title=title, timestamp=timestamp) def _setup(**kwargs: ConfigValue) -> None: for key, value in kwargs.items(): CONFIG[key] = value class Color: """Represent an ANSI escape sequence """ def __init__(self, code: str): self.code = code # fmt: off reset = Color(colorama.Style.RESET_ALL) bold = Color(colorama.Style.BRIGHT) faint = Color(colorama.Style.DIM) # for some reason those are not in colorama standout = Color('\x1b[3m') underline = Color('\x1b[4m') blink = Color('\x1b[5m') overline = Color('\x1b[6m') black = Color(colorama.Fore.BLACK) red = Color(colorama.Fore.RED) green = Color(colorama.Fore.GREEN) yellow = Color(colorama.Fore.YELLOW) blue = Color(colorama.Fore.BLUE) magenta = Color(colorama.Fore.MAGENTA) cyan = Color(colorama.Fore.CYAN) white = Color(colorama.Fore.WHITE) # backward compatibility: brown = yellow # used by ui.warning lightgray = white # used by ui.debug darkred = red darkgreen = green darkblue = blue purple = magenta fuscia = magenta fuschia = magenta turquoise = cyan darkgray = black darkteal = cyan darkyellow = yellow # fmt: on # Other nice-to-have characters: class UnicodeSequence: """ Represent a sequence containing a color followed by a Unicode symbol """ def __init__(self, color: Color, as_unicode: str, as_ascii: str): if os.name == "nt": self.as_string = as_ascii else: self.as_string = as_unicode self.color = color def tuple(self) -> Tuple[Token, ...]: return (reset, self.color, self.as_string, reset) ellipsis = UnicodeSequence(reset, "…", "...") check = UnicodeSequence(green, "✓", "ok") cross = UnicodeSequence(red, "❌", "ko") class Symbol(UnicodeSequence): def __init__(self, as_unicode: str, as_ascii: str): super().__init__(reset, as_unicode, as_ascii) def tuple(self) -> Tuple[Token, ...]: return (self.as_string,) def using_colorama() -> bool: if os.name == "nt": if "TERM" not in os.environ: return True if os.environ["TERM"] == "cygwin": return True return False else: return False def config_color(fileobj: FileObj) -> bool: if CONFIG["color"] == "never": return False if CONFIG["color"] == "always": return True if os.name == "nt": # sys.isatty() is False on mintty, so # let there be colors by default. (when running on windows, # people can use --color=never) # Note that on Windows, when run from cmd.exe, # console.init() does the right thing if sys.stdout is redirected return True else: return fileobj.isatty() def write_title_string(mystr: str, fileobj: FileObj) -> None: if using_colorama(): # By-pass colorama bug: # colorama/win32.py, line 154 # return _SetConsoleTitleW(title) # ctypes.ArgumentError: argument 1: : wrong type return mystr = "\x1b]0;%s\x07" % mystr fileobj.write(mystr) fileobj.flush() def process_tokens( tokens: Sequence[Token], *, end: str = "\n", sep: str = " " ) -> Tuple[str, str]: """ Returns two strings from a list of tokens. One containing ASCII escape codes, the other only the 'normal' characters """ # Flatten the list of tokens in case some of them are of # class UnicodeSequence: flat_tokens = list() # type: List[Token] for token in tokens: if isinstance(token, UnicodeSequence): flat_tokens.extend(token.tuple()) else: flat_tokens.append(token) with_color = _process_tokens(flat_tokens, end=end, sep=sep, color=True) without_color = _process_tokens(flat_tokens, end=end, sep=sep, color=False) return (with_color, without_color) def _process_tokens( tokens: Sequence[Token], *, end: str = "\n", sep: str = " ", color: bool = True ) -> str: res = "" if CONFIG["timestamp"]: now = datetime.datetime.now() res += now.strftime("[%Y-%m-%d %H:%M:%S] ") for i, token in enumerate(tokens): if isinstance(token, Color): if color: res += token.code else: res += str(token) if i != len(tokens) - 1: res += sep res += end if color: res += reset.code return res def write_and_flush(fileobj: FileObj, to_write: str) -> None: try: fileobj.write(to_write) except UnicodeEncodeError: # Maybe the file descritor does not support the full Unicode # set, like stdout on Windows. # Use the unidecode library # to make sure we only have ascii, while still keeping # as much info as we can fileobj.write(unidecode.unidecode(to_write)) fileobj.flush() def message( *tokens: Token, end: str = "\n", sep: str = " ", fileobj: FileObj = sys.stdout, update_title: bool = False ) -> None: """ Helper method for error, warning, info, debug """ if using_colorama(): global _INITIALIZED if not _INITIALIZED: colorama.init() _INITIALIZED = True with_color, without_color = process_tokens(tokens, end=end, sep=sep) if CONFIG["record"]: _MESSAGES.append(without_color) if update_title and with_color: write_title_string(without_color, fileobj) to_write = with_color if config_color(fileobj) else without_color write_and_flush(fileobj, to_write) def fatal(*tokens: Token, **kwargs: Any) -> None: """ Print an error message and call ``sys.exit`` """ error(*tokens, **kwargs) sys.exit(1) def error(*tokens: Token, **kwargs: Any) -> None: """ Print an error message """ tokens = [bold, red, "Error:"] + list(tokens) # type: ignore kwargs["fileobj"] = sys.stderr message(*tokens, **kwargs) def warning(*tokens: Token, **kwargs: Any) -> None: """ Print a warning message """ tokens = [brown, "Warning:"] + list(tokens) # type: ignore kwargs["fileobj"] = sys.stderr message(*tokens, **kwargs) def info(*tokens: Token, **kwargs: Any) -> None: r""" Print an informative message :param tokens: list of `ui` constants or strings, like ``(cli_ui.red, 'this is an error')`` :param sep: separator, defaults to ``' '`` :param end: token to place at the end, defaults to ``'\n'`` :param fileobj: file-like object to print the output, defaults to ``sys.stdout`` :param update_title: whether to update the title of the terminal window """ if CONFIG["quiet"]: return message(*tokens, **kwargs) def info_section(*tokens: Token, **kwargs: Any) -> None: """ Print an underlined section name """ # We need to know the length of the section: process_tokens_kwargs = kwargs.copy() process_tokens_kwargs["color"] = False no_color = _process_tokens(tokens, **process_tokens_kwargs) info(*tokens, **kwargs) info("-" * len(no_color), end="\n\n") def info_1(*tokens: Token, **kwargs: Any) -> None: """ Print an important informative message """ info(bold, blue, "::", reset, *tokens, **kwargs) def info_2(*tokens: Token, **kwargs: Any) -> None: """ Print an not so important informative message """ info(bold, blue, "=>", reset, *tokens, **kwargs) def info_3(*tokens: Token, **kwargs: Any) -> None: """ Print an even less important informative message """ info(bold, blue, "*", reset, *tokens, **kwargs) def dot(*, last: bool = False, fileobj: FileObj = sys.stdout) -> None: """ Print a dot without a newline unless it is the last one. Useful when you want to display a progress with very little knowledge. :param last: whether this is the last dot (will insert a newline) """ end = "\n" if last else "" info(".", end=end, fileobj=fileobj) def info_count(i: int, n: int, *rest: Token, **kwargs: Any) -> None: """ Display a counter before the rest of the message. ``rest`` and ``kwargs`` are passed to :func:`info` Current index should start at 0 and end at ``n-1``, like in ``enumerate()`` :param i: current index :param n: total number of items """ num_digits = len(str(n)) counter_format = "(%{}d/%d)".format(num_digits) counter_str = counter_format % (i + 1, n) info(green, "*", reset, counter_str, reset, *rest, **kwargs) def info_progress(prefix: str, value: float, max_value: float) -> None: """ Display info progress in percent. :param value: the current value :param max_value: the max value :param prefix: the prefix message to print """ if sys.stdout.isatty(): percent = float(value) / max_value * 100 sys.stdout.write(prefix + ": %.0f%%\r" % percent) sys.stdout.flush() def debug(*tokens: Token, **kwargs: Any) -> None: """ Print a debug message. Messages are shown only when ``CONFIG["verbose"]`` is true """ if not CONFIG["verbose"] or CONFIG["record"]: return message(*tokens, **kwargs) def indent_iterable(elems: Sequence[str], num: int = 2) -> List[str]: """Indent an iterable.""" return [" " * num + l for l in elems] def indent(text: str, num: int = 2) -> str: """Indent a piece of text.""" lines = text.splitlines() return "\n".join(indent_iterable(lines, num=num)) def tabs(num: int) -> str: """ Compute a blank tab """ return " " * num def info_table( data: Any, *, headers: Optional[List[str]] = None, fileobj: Any = None ) -> None: if not fileobj: fileobj = sys.stdout colored_data = list() plain_data = list() for row in data: colored_row = list() plain_row = list() for item in row: colored_str, plain_str = process_tokens(item, end="") colored_row.append(colored_str) plain_row.append(plain_str) colored_data.append(colored_row) plain_data.append(plain_row) if config_color(fileobj): data_for_tabulate = colored_data else: data_for_tabulate = plain_data res = tabulate.tabulate(data_for_tabulate, headers=headers) res += "\n" write_and_flush(fileobj, res) def message_for_exception(exception: Exception, message: str) -> Sequence[Token]: """ Returns a tuple suitable for cli_ui.error() from the given exception. (Traceback will be part of the message, after the ``message`` argument) Useful when the exception occurs in an other thread than the main one. """ tb = sys.exc_info()[2] buffer = io.StringIO() traceback.print_tb(tb, file=io) # type: ignore # fmt: off return ( red, message + "\n", exception.__class__.__name__, str(exception), "\n", reset, buffer.getvalue() ) # fmt: on def read_input() -> str: """ Read input from the user """ info(green, "> ", end="") return input() def read_password() -> str: """ Read a password from the user """ info(green, "> ", end="") return getpass.getpass(prompt="") def get_ask_tokens(tokens: Sequence[Token]) -> List[Token]: return [green, "::", reset] + list(tokens) + [reset] # type: ignore def ask_string(*question: Token, default: Optional[str] = None) -> Optional[str]: """Ask the user to enter a string. """ tokens = get_ask_tokens(question) if default: tokens.append("(%s)" % default) info(*tokens) answer = read_input() if not answer: return default return answer def ask_password(*question: Token) -> str: """Ask the user to enter a password. """ tokens = get_ask_tokens(question) info(*tokens) answer = read_password() return answer FuncDesc = Callable[[Any], str] FuncName = Callable[[Any], str] def ask_choice( *prompt: Token, choices: List[Any], func_name: Optional[FuncName] = None, func_desc: Optional[FuncDesc] = None ) -> Any: """Ask the user to choose from a list of choices. :return: the selected choice ``func_desc`` will be called on every list item for displaying and sorting the list. If not given, will default to the identity function. Will loop until: * the user enters a valid index * or hits ``ctrl-c`` * or leaves the prompt empty In the last two cases, None will be returned """ if func_name is None: func_name = lambda x: str(x) if func_desc is None: func_desc = lambda x: str(x) tokens = get_ask_tokens(prompt) info(*tokens) choices.sort(key=func_desc) for i, choice in enumerate(choices, start=1): choice_desc = func_name(choice) info(" ", blue, "%i" % i, reset, choice_desc) keep_asking = True res = None while keep_asking: answer = read_input() if not answer: return None try: index = int(answer) except ValueError: info("Please enter a valid number") continue if index not in range(1, len(choices) + 1): info(str(index), "is out of range") continue res = choices[index - 1] keep_asking = False return res def ask_yes_no(*question: Token, default: bool = False) -> bool: """Ask the user to answer by yes or no""" while True: tokens = [green, "::", reset] + list(question) + [reset] if default: tokens.append("(Y/n)") else: tokens.append("(y/N)") info(*tokens) # type: ignore answer = read_input() if answer.lower() in ["y", "yes"]: return True if answer.lower() in ["n", "no"]: return False if not answer: return default warning("Please answer by 'y' (yes) or 'n' (no) ") AnyFunc = Callable[..., Any] class Timer: """ Display time taken when executing a list of statements. """ def __init__(self, description: str): self.description = description self.start_time = datetime.datetime.now() self.stop_time = datetime.datetime.now() self.elapsed_time = 0 def __call__(self, func: AnyFunc, *args: Any, **kwargs: Any) -> AnyFunc: @functools.wraps(func) def res(*args: Any, **kwargs: Any) -> Any: self.start() ret = func(*args, **kwargs) self.stop() return ret return res def __enter__(self) -> "Timer": self.start() return self def __exit__(self, *unused: Any) -> None: self.stop() def start(self) -> None: """ Start the timer """ self.start_time = datetime.datetime.now() def stop(self) -> None: """ Stop the timer and emit a nice log """ end_time = datetime.datetime.now() elapsed_time = end_time - self.start_time elapsed_seconds = elapsed_time.seconds hours, remainder = divmod(int(elapsed_seconds), 3600) minutes, seconds = divmod(remainder, 60) as_str = "%sh %sm %ss %dms" % ( hours, minutes, seconds, elapsed_time.microseconds / 1000, ) info("%s took %s" % (self.description, as_str)) def did_you_mean(message: str, user_input: str, choices: Sequence[str]) -> str: """ Given a list of choices and an invalid user input, display the closest items in the list that match the input. """ if not choices: return message else: result = { difflib.SequenceMatcher(a=user_input, b=choice).ratio(): choice for choice in choices } message += "\nDid you mean: %s?" % result[max(result)] return message def main_test_colors() -> None: this_module = sys.modules[__name__] for name, value in inspect.getmembers(this_module): if isinstance(value, Color): info(value, name) def main_demo() -> None: info("OK", check) up = Symbol("👍", "+1") info("I like it", blue, up) info_section(bold, "python-cli demo") # Monkey-patch message() so that we sleep after # each call global message old_message = message def new_message(*args: Any, **kwargs: Any) -> None: old_message(*args, **kwargs) time.sleep(1) message = new_message info_1("Important info") info_2("Secondary info") info("This is", red, "red") info("this is", bold, "bold") list_of_things = ["foo", "bar", "baz"] for i, thing in enumerate(list_of_things): info_count(i, len(list_of_things), thing) info_progress("Done", 5, 20) info_progress("Done", 10, 20) info_progress("Done", 20, 20) info("\n", check, "all done") # stop monkey patching message = old_message fruits = ["apple", "orange", "banana"] answer = ask_choice("Choose a fruit", choices=fruits) info("You chose:", answer) def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("action", choices=["test_colors", "demo"]) args = parser.parse_args() if args.action == "demo": main_demo() elif args.action == "test_colors": main_test_colors() if __name__ == "__main__": main() PK!~?Ƶwhost/ui/credentials.py#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu import requests from whost.common import getLogger, read_conf, DEFAULT_API_URL, update_conf from whost.ui import cli, get_valid_string, nonempty_validator, pause logger = getLogger(__name__) def is_authenticated(): """ boolean if in-config credentials could authenticate """ config = read_conf() try: req = requests.post( url="{}/auth/authorize".format(config.get("api_url")), headers={ "username": config.get("username"), "password": config.get("password"), "Content-type": "application/json", }, ) req.raise_for_status() except Exception as exp: logger.error(exp) return False else: return True def configure_credentials(): """ get username/password/api_url from user and save to config file """ cli.info_2("Credentials") config = read_conf() username = get_valid_string("Username", nonempty_validator, config.get("username")) password = get_valid_string("Password", nonempty_validator, config.get("password")) api_url = get_valid_string( "API URL – use `reset` to use default", nonempty_validator, config.get("api_url", DEFAULT_API_URL), ) if api_url == "reset": api_url = DEFAULT_API_URL update_conf({"username": username, "password": password, "api_url": api_url}) cli.info_2( "Saved crentials as", cli.bold, username, cli.reset, "/", cli.bold, password, cli.reset, "for", cli.bold, api_url, ) pause() PK!՟  whost/ui/devices.py#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu import time from collections import OrderedDict import humanfriendly from whost.ui import cli, display_menu, display_success, display_error, pause from whost.common import getLogger, get_next_slot, read_conf, update_conf, disable_host from whost.devices import ( get_writers, get_display_name, find_device, get_block_size, get_device_path, reset_writers, ) logger = getLogger(__name__) def reset_devices(): cli.info_2("Reseting devices configuration") ready = cli.ask_yes_no("Sure you want to remove all devices conf?", default=False) if not ready: return if reset_writers(): display_success("Devices configuration removed.") else: display_error("Failed to reset devices configuration.") pause() def add_device(): cli.info_2("Please remove all SD-cards from all writers.") ready = cli.ask_yes_no("Ready?", default=False) if not ready: return cli.info( "Great! Now please insert", cli.bold, "one", cli.reset, "SD-card into the writer you want to configure.", ) cli.info_3("waiting for card", end="") block_name = None while block_name is None: time.sleep(1) cli.dot() block_name = find_device() cli.info("FOUND") # we now have a new DEVICE. device_path = get_device_path(block_name) slot = get_next_slot() # update configured writers list writers = read_conf().get("writers", {}) writers.update({slot: str(device_path)}) if update_conf({"writers": writers}): display_success( "Found your", humanfriendly.format_size(get_block_size(block_name), binary=True), "card on", cli.bold, block_name, cli.reset, "({})".format(get_display_name(block_name)), ".\n", "Assigned slot:", cli.bold, slot, ) else: display_error("Failed to configure a slot for", cli.bold, block_name) pause() def configure_devices(): cli.info_1("Already configured writer devices") try: writers = get_writers() except Exception as exp: logger.error(exp) display_error( "Configured devices are not present! " "Reseting devices conf and disabling host.\n" "Please configure devices and re-enable it." ) reset_writers() disable_host() for writer in writers: cli.info( cli.blue, " *", cli.reset, cli.bold, "{slot}:/dev/{device}".format(**writer), cli.reset, "({name} at {device_path})".format(**writer), ) cli.info("") menu = OrderedDict( [ ("reset-writers", ("Reset writers config (remove ALL)", reset_devices)), ("add-device", ("Add one device", add_device)), ] ) display_menu("Choose:", menu=menu, with_cancel=True) PK!] whost/ui/home.py#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu import sys import subprocess from collections import OrderedDict from whost.common import read_conf, toggle_host from whost.ui import ( cli, restart_line, display_menu, display_success, display_error, pause, ) from whost.ui.devices import configure_devices from whost.ui.network import configure_network from whost.ui.credentials import configure_credentials, is_authenticated from whost.network import is_internet_connected def exit_to_shell(): display_success("Exiting to shell.") sys.exit(10) def exit_to_logout(): display_success("Exiting to logout.") sys.exit(20) def update_code(): cli.info_2("Launching update script…") subprocess.run(["whost-update"]) def display_toggle_host(): enabled = read_conf().get("enabled", False) answer = cli.ask_yes_no( "You are about to {} this host. Are you sure?".format( "disable" if enabled else "enable" ) ) if answer: enabled = not enabled # toggled ns = "enabled" if enabled else "disabled" if toggle_host(enabled): display_success("Successfuly {} host!".format(ns)) else: display_error("Error: host could not be {}.".format(ns)) pause() def display_home(): cli.info_section(cli.purple, "Hotsport Cardshop writer-host configurator") print("Checking internet connection…", end="", flush=True) connected = is_internet_connected() authenticated = is_authenticated() if connected else False restart_line() config = read_conf() connected_str = "CONNECTED" if connected else "NOT CONNECTED" connected_color = cli.green if connected else cli.red authenticated_str = "AUTHENTICATED" if authenticated else "NOT AUTHENTICATED" authenticated_color = cli.green if authenticated else cli.red enabled = config.get("enabled", False) enabled_color = cli.green if enabled else cli.red enabled_str = "ENABLED" if enabled else "DISABLED" configured_readers = len(config.get("writers", {})) configured_readers_color = cli.yellow if configured_readers else cli.red menu = OrderedDict( [ ("configure-network", ("Configure Network", configure_network)), ("configure-credentials", ("Configure Credentials", configure_credentials)), ("configure-readers", ("Configure USB Writers", configure_devices)), ("update-code", ("Update code and restart", update_code)), ( "toggle-host", ( "{} this Host".format("Enable" if not enabled else "Disable"), display_toggle_host, ), ), ("exit-to-shell", ("Exit to a shell", exit_to_shell)), ("logout", ("Exit (logout)", exit_to_logout)), ] ) cli.info_1("Internet Connectivity:", connected_color, connected_str) cli.info_1("Authentication:", authenticated_color, authenticated_str) cli.info_1("Host Status:", enabled_color, enabled_str) cli.info_1("Configured Writers:", configured_readers_color, str(configured_readers)) display_menu("Choose:", menu=menu) PK!'V  whost/ui/network.py#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu import ipaddress from collections import OrderedDict from whost.ui import ( cli, display_menu, get_valid_string, display_error, display_success, pause, ) from whost.network import ( get_interfaces, save_network_config, reset_netplan, get_iface_config, ) def ipadress_validator(value): """ IPv4 address validator """ try: ipaddress.ip_address(value).compressed except ValueError: return None, "Invalid IPv4 address format" return (value, None) def reset_network_config(): """ restore initial blank /etc/network/configure """ if reset_netplan(): display_success("Network configuration has been reset via netplan.") else: display_error("Failed to reset network config via netplan.") def configure_network(): cli.info_2("You need one, direct, ethernet Internet connection.") menu = OrderedDict( [ ( "reset-network-config", ("Reset network config (remove ALL interfaces)", reset_network_config), ), ("configure-iface", ("Configure Interface", configure_iface)), ] ) display_menu("Choose:", menu=menu, with_cancel=True) def configure_iface(): # pick interface ifaces = list(get_interfaces()) iface = display_menu("Choose Interface:", choices=ifaces, with_cancel=True) cli.info("You selected", cli.bold, iface) iface_config = get_iface_config(iface) from pprint import pprint as pp pp(iface_config) # select method (dhcp, fixed) dhcp = "dhcp" fixed = "fixed" methods = [dhcp, fixed] method = display_menu("Connection method:", choices=methods, with_cancel=True) if method == dhcp: if save_network_config(iface, dhcp=True): cli.info_2( "Saved your", cli.bold, iface, cli.reset, "configuration as", cli.bold, "DHCP", ) else: display_error("Unable to save DHCP network config for", cli.bold, iface) pause() return # fixed method config # get address address = get_valid_string( "IP Address", ipadress_validator, default=iface_config["address"] ) cli.info("You entered", cli.bold, address) netmask = get_valid_string( "Netmask", ipadress_validator, default=iface_config["netmask"] ) cli.info("You entered", cli.bold, netmask) gateway = get_valid_string( "Gateway", ipadress_validator, default=iface_config["gateway"] ) cli.info("You entered", cli.bold, gateway) if save_network_config( iface, dhcp=False, address=address, netmask=netmask, gateway=gateway ): cli.info_2( "Saved your", cli.bold, iface, cli.reset, "configuration as", cli.bold, address, cli.reset, "/", cli.bold, netmask, cli.reset, "/", cli.bold, gateway, ) else: display_error("Unable to save fixed network config for", cli.bold, iface) pause() PK!8oh whost-config#!/bin/bash ret=0 for (( ; ; )) do reset whost-configurator ret=$? # exit to shell if [ $ret -gt 2 ]; then break fi done if [ $ret -ge 20 ]; then kill -HUP $PPID fi PK!:|Ùwhost-configurator#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: ai ts=4 sts=4 et sw=4 nu from whost.ui.home import display_home def main(): display_home() if __name__ == '__main__': main() PK!+//whost-restart-all#!/bin/bash whost-stop-all && whost-start-all PK!hi& whost-setup#!/bin/bash RED=`tput setaf 1` GREEN=`tput setaf 2` YELLOW=`tput setaf 3` BLUE=`tput setaf 4` NC=`tput sgr0` MAINT_PUB="ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCx9Bfw8wkceadnounwXVHInI0FyNEj3z64bqXA8cwbgkqkXTVWnI3I6vUzKY8dSfL8PXydCaVnGxogP88Y294k4rjIf8NGubwNe5B2oyLNuscBhd1QWzEmvr4ej32I1Ot3oulJsbqt7oSKUr6pQ4fD44WXjGNaQx3WhbsSJb28k4rNRs4bY+HlScsaKlfVRpE+kuI64BNPl4+IVfkJzs+E7NuDp3DnHl4pwbWjsj856/coKe0v0XtMOXZP7pVn/TLRGbNA+w/HVLLRud5taTZXxV5jYHOeftLFupSZL5VdGHWrC6/GeWgtwlvcsfmt6erc4p6MQqKxT3SV/CNIS2j1 maint@cardshop" green() { echo "${GREEN}$1${NC}" } red() { echo "${RED}$1${NC}" } yellow() { echo "${YELLOW}$1${NC}" } blue() { echo "${BLUE}$1${NC}" } step() { yellow "=> $1" } die() { red "$@" exit 1 } # blank the screen reset blue "=== Cardshop WriterHost setup ===" if [[ $(/usr/bin/id -u) -ne 0 ]]; then red "Must be ran as root. exiting." exit 1 fi if [ -z "${REVERSE_SSH_PORT}" ]; then red "you must set the REVERSE_SSH_PORT environ variable to the appropriate port for this writer host." echo " See http://wiki.kiwix.org/wiki/Cardshop-maintenance" exit 1 fi # add a path to PATH if not present pathadd() { if [ -d "$1" ] && [[ ":$PATH:" != *":$1:"* ]]; then PATH="${PATH:+"$PATH:"}$1" echo "export PATH=${PATH}" > /root/.bash_profile source /root/.bash_profile fi } # add line to a file if not present addifnot() { LINE=$1 FILE=$2 grep -qF -- "$LINE" "$FILE" || echo "$LINE" >> "$FILE" } step "Ugrading base Ubuntu packages" echo 'Acquire::ForceIPv4 "true";' > /etc/apt/apt.conf.d/99force-ipv4 apt update -y && apt --fix-broken install && apt upgrade -y && apt --fix-broken install && apt autoremove -y step "Installing additional packages" apt install -y vim openssh-server step "Add maintenance SSH keys" mkdir -p /root/.ssh chmod 700 /root/.ssh addifnot "${MAINT_PUB}" /root/.ssh/authorized_keys chmod 600 /root/.ssh/authorized_keys step "Add reverse-SSH connection" #echo " StrictHostKeyChecking no" >> /etc/ssh/ssh_config #echo " UserKnownHostsFile=/dev/null" >> /etc/ssh/ssh_config systemctl enable ssh echo "REVERSE_SSH_PORT=${REVERSE_SSH_PORT}" > /etc/default/reverse-ssh curl -L -o /etc/systemd/system/reverse-ssh.service https://raw.githubusercontent.com/kiwix/cardshop/master/whost/reverse-ssh.service systemctl daemon-reload systemctl enable reverse-ssh.service systemctl restart reverse-ssh.service systemctl status --no-pager -l reverse-ssh.service step "Install docker-CE from official repo" # https://docs.docker.com/install/linux/docker-ce/ubuntu/#set-up-the-repository apt install -y apt-transport-https ca-certificates curl software-properties-common curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add - apt-key fingerprint 0EBFCD88 add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" apt-get update -y apt-get install -y docker-ce mkdir -p /data step "Install basic python dependencies" add-apt-repository universe apt update -y apt install -y python3-pip /usr/bin/pip3 install -U pip || die "unable to update pip" /usr/local/bin/pip3 install virtualenv || die "unable to install virtualenv" virtualenv -p /usr/bin/python3 /root/whostenv || die "unable to create venv" source /root/whostenv/bin/activate || die "unable to source venv" step "Download code" pip install -U whost step "Adding whost folder to PATH" WHOST_BINS=`python -c 'import sys ; print([p for p in sys.path if p.endswith("site-packages")][-1])'` pathadd "${WHOST_BINS}" echo $PATH step "Add whost-config to login" addifnot "source /root/whostenv/bin/activate" /root/.bash_profile addifnot "whost-config" /root/.bash_profile step "Pulling worker" docker pull kiwix/cardshop-worker docker update --restart=unless-stopped $(docker ps -a -q) step "Start containers on boot" echo "whost-start-all" > /etc/rc.local chmod +x /etc/rc.local step "Restarting" read -p "About to reboot. Press ENTER once ready." shutdown -r now PK!} whost-start-all#!/usr/bin/python3 import os import sys import json import logging import subprocess from pathlib import Path logging.basicConfig(level=logging.INFO) logger = logging.getLogger("whost-start-all") CONFIG_PATH = Path("/etc/cardshop-host.config") DEVICES_PREFIX = Path("/sys/devices") def read_conf(): """ read cardshop config file (json) """ try: with open(str(CONFIG_PATH), "r") as fd: return json.load(fd) except Exception as exp: logger.error("Unable to read config file at {}".format(str(CONFIG_PATH))) logger.error(exp) sys.exit(1) def get_block_name(device_path): """ current block device name for this hardware path """ try: bn = [ fname for fname in os.listdir(device_path.joinpath("block")) if fname.startswith("sd") or fname.startswith("mmcblk") ][0] except Exception as exp: logger.error(exp) return None return device_path.joinpath("block", bn).name def main(): config = read_conf() if not config.get("enabled", False): logger.warn("Host is disabled. exiting.") sys.exit() username = config.get("username", "") password = config.get("password", "") api_url = config.get("api_url", "") if not username or not password or not api_url: logger.error("Host is missing credentials. exiting.") sys.exit(1) writers = config.get("writers", {}) if not len(writers): logger.warn("Host has no configured writers. exiting.") sys.exit() logger.info("Calling stop-all first so we don't duplicate workers") subprocess.run(["whost-stop-all"]) def _start_worker(worker, slot, device_path): args = [ "whost-start-worker", worker, slot, device_path, username, password, api_url, ] subprocess.run(["echo", " ".join(args)]) subprocess.run(args) logger.info("Starting 1 downloader and {} writers".format(len(writers))) # downloader _start_worker("downloader", "-", "-") for index, slot in enumerate(writers.keys()): block_name = get_block_name(Path(writers.get(slot))) if block_name is None: logger.error("\tnot starting its writer container.") continue device_path = "/dev/{}".format(block_name) _start_worker("writer", slot, device_path) if __name__ == "__main__": main() PK!C__whost-start-worker#!/bin/bash if [ $# -ne 6 ]; then echo "Usage: $0 WORKER SLOT DEV USERNAME PASSWORD API_URL" exit 1 fi WORKER=$1 USB_SLOT=$2 USB_DEVICE=$3 USERNAME=$4 PASSWORD=$5 API_URL=$6 USB_PATH="/dev/sdcard" docker pull kiwix/cardshop-worker docker run --privileged \ --name cardshop-worker-$WORKER \ --device=${USB_DEVICE}:${USB_PATH} \ -e USB_PATH="${USB_PATH}" \ -e USB_SLOT="${USB_SLOT}" \ -e HOST_DEVICE="${USB_DEVICE}" \ -e USERNAME="${USERNAME}" \ -e PASSWORD="${PASSWORD}" \ -e WORKER_TYPE="${WORKER}" \ -e CARDSHOP_API_URL="${API_URL}" \ --detach --restart unless-stopped kiwix/cardshop-worker PK!7~~whost-stop-all#!/usr/bin/python3 import sys import logging import subprocess logging.basicConfig(level=logging.INFO) logger = logging.getLogger("whost-stop-all") def main(): docker_ps = subprocess.run( ["docker", "ps", "--no-trunc", "--quiet"], universal_newlines=True, stdout=subprocess.PIPE, ) if not docker_ps.returncode == 0: logger.error("Failed to query list of running containers.") sys.exit(1) try: running_containers = [line.strip() for line in docker_ps.stdout.splitlines()] except Exception as exp: logger.error("Unable to parse containers IDs.") sys.exit(1) if not len(running_containers): logger.info("No container to stop.") sys.exit() logger.info("stopping {} containers".format(len(running_containers))) for container_id in running_containers: if subprocess.run(["docker", "stop", container_id]).returncode == 0: logger.info(".. stopped container: {}".format(container_id)) else: logger.warn(".. failed to stop container: {}".format(container_id)) if __name__ == "__main__": main() PK!maa whost-update#!/bin/bash echo "Updating code and all" pip install -U whost docker pull kiwix/cardshop-worker PK!H'whost-0.1.13.dist-info/entry_points.txtPK!HnHTUwhost-0.1.13.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!HS<whost-0.1.13.dist-info/METADATAVo8bzϒåI5M ;pixD-I1 ,qqgD&*]q2nD%OiShQ2M'Ѽ*at!Lf oF9i#tuiq#r)kdlڨD|Ņvn]T軛O7_~[WhsJF֢(?Uw*y/b4H$q|&9.JaZ) _΍*Ut-K:ź?ƿN,>OxRYǛx1.5 F%o(y{XWF:+T5@O0tj%RiQ=O:=NAʉe[ 'QVLu7>;fBnv}J3h6NmrELR]MNѥQ ];Yv@{N> WEJڄNKN-aD-TCS`j8.;>wD[qƕeTM^`TגJIwC828[R TʯAGVҥXvmMH#n Nx C Y5zU#KI m4pN#3{ׯFѨ;*Xrwyl=:,puwOQm#Ys8#'`ޟæӃ$`#̵?ZQ*=\|SI=sZY,Hn}텗#J(y8404!TxLEX\MZSŒ !wKh:QQO~%yhƴB*N,K!H+і 8~Z&v*xu,zRs(77o0ʾFWu^*[Ov1Jխ{n>hP}Zjtϝ8Jmw}c~ /HoeqKYhZMh1n{BaB=Ȥ} ?iTY"]rA!N[pA\]562|A;9}Jq6Ӂw;;ʧ '[n2"Q ^ ovyY0Y@a+ա"@56䒕U by^_\hqus{5[xϥkz6.yjgK00OfHbJΐWb-qȧR AIx TuYdP4sH[Ysq ¿2{Ǵz +C8/tń rg'@.ޣ|US!5T;ŮK CΘEld/'0HaAYW`p옅|2ˤSch>fOr׳SR \vߝeZ))XUqÆ$ 4 f }K 9i^}Zn%w7ǽF1DTT:>4j6~۷`bpi] cAxږ>UfkΘ4BeCUZ?D  mc\`>n igws0Zx30 Am+J#ks/}{*aJ_ƒh1QW nPKY>%o~ߟ♭8³)$xz;N1aW?l9D< ńQPK!HM_Gwhost-0.1.13.dist-info/RECORDuɖX}= d1 ,j"L"† 2_F>iٹc?S gu6[@}`Or=:Ab'W4٢g*˶s݌0xd5 uMWi)|INm)D= JeXz2|I,p7`)Kћ t $_F\j@RPʘIVk41٘_wV S9q5pJ;iPkƯrIe|eH>)WgG{84(,(ʅ*:]*ByJdY%nƹPlHGbc9 KnYDHץX;{N!E/+U%6Qv_`O)1,\[+3qΎ==}?Gen'渆ӂM|r8Ҏ-/DG08uݞȅgj+ &lɔ`fj̥q>/ct=̌]@q{djT+R2ZѱxHȇt:̫ZUEh]zɥ!WEEW3q6y KY^IK?ЎN!\IuTXS1-*aN5; ?PK!хwhost/__init__.pyPK! _tFwhost/common.pyPK!N[hwhost/devices.pyPK!m3` |whost/network.pyPK!#21_%whost/ui/__init__.pyPK!C*IIL-whost/ui/cli.pyPK!~?Ƶbwwhost/ui/credentials.pyPK!՟  L~whost/ui/devices.pyPK!] whost/ui/home.pyPK!'V  whost/ui/network.pyPK!8oh 클whost-configPK!:|Ù큤whost-configuratorPK!+//큒whost-restart-allPK!hi& whost-setupPK!} 큻whost-start-allPK!C__큮whost-start-workerPK!7~~=whost-stop-allPK!maa whost-updatePK!H'rwhost-0.1.13.dist-info/entry_points.txtPK!HnHTUwhost-0.1.13.dist-info/WHEELPK!HS<Gwhost-0.1.13.dist-info/METADATAPK!HM_GEwhost-0.1.13.dist-info/RECORDPKh