PKVLOOmidgard/__init__.py"""Midgard, the Python Geodesy library Midgard is a collection of useful Python utilities used by the Geodetic institute at the Norwegian Mapping Authority (Kartverket). Although some of these are geodesy-specific, many are also useful in more general settings. Note: Midgard is still in pre-alpha status. Its functionality will change, and it should not be depended on in any production-like setting. Midgard comes organized into different subpackages: {subpackages} Look for help inside each subpackage: >>> from midgard import subpackage >>> help(subpackage) Current maintainers: -------------------- {maintainers} """ # Standard library imports from datetime import date as _date from collections import namedtuple as _namedtuple from pathlib import Path as _Path # Version of Midgard. # # This is automatically set using the bumpversion tool __version__ = "0.1.4" # Authors of Midgard. _Author = _namedtuple("_Author", ["name", "email", "start", "end"]) _AUTHORS = [_Author("Geir Arne Hjelle", "geir.arne.hjelle@kartverket.no", _date.min, _date.max)] __author__ = ", ".join(a.name for a in _AUTHORS if a.start < _date.today() < a.end) __contact__ = ", ".join(a.email for a in _AUTHORS if a.start < _date.today() < a.end) # Copyleft of the library __copyright__ = f"2018 - {_date.today().year} Norwegian Mapping Authority" # Update doc with info about subpackages and maintainers def _update_doc(doc): """Add information to doc-string Args: doc (str): The doc-string to update. 
Returns: str: The updated doc-string """ # Subpackages subpackage_paths = _Path(__file__).parent.iterdir() subpackage_list = [p.name for p in subpackage_paths if p.is_dir() and not p.name.startswith("_")] subpackages = "\n".join(f"+ {p}" for p in subpackage_list) # Maintainers maintainer_list = [f"+ {a.name} <{a.email}>" for a in _AUTHORS if a.start < _date.today() < a.end] maintainers = "\n".join(maintainer_list) # Add to doc-string return doc.format(subpackages=subpackages, maintainers=maintainers) __doc__ = _update_doc(__doc__) PK4Lmidgard/collections/__init__.pyPK=Lip: : midgard/collections/enums.py"""Framework for working with enumerations Description: ------------ Custom enumerations used for structured names. """ # Standard library imports import enum from typing import Callable, Dict, List # Midgard imports from midgard.dev import exceptions # Dictionary of Enumerations. Populated by the @register_enum-decorators. _ENUMS: Dict[str, enum.EnumMeta] = dict() def register_enum(name: str) -> Callable[[enum.EnumMeta], enum.EnumMeta]: """Register a named Enumeration This allows for getting Enumerations with the get_enum-function. Args: name: Name used for Enumeration. Returns: Decorator that registers an Enumeration. """ def register_decorator(enum_cls: enum.EnumMeta) -> enum.EnumMeta: _ENUMS[name] = enum_cls return enum_cls return register_decorator def enums() -> List[str]: """Return a list of available enums Returns: Names of available enums. """ return sorted(_ENUMS) def get_enum(name: str) -> enum.EnumMeta: """Return a named Enumeration Names are defined by the @register_enum-decorator. If the name-parameter is not a valid enum, the function will raise an UnknownEnumError and list the available enumerations. Args: name: Name used for Enumeration. Returns: Enumeration with the given name. """ try: return _ENUMS[name] except KeyError: valid_enums = ", ".join(e for e in _ENUMS) raise exceptions.UnknownEnumError( f"Enumeration '{name}' is not defined. 
Available enumerations are {valid_enums}." ) from None def get_value(name: str, value: str) -> enum.Enum: """Return the value of a named Enumeration Names are defined by the @register_enum-decorator. Args: name: Name used for Enumeration. value: Value of Enumeration. Returns: Value of enumeration with the given name. """ try: return get_enum(name)[value] except KeyError: valid_values = ", ".join(v.name for v in get_enum(name)) # type: ignore raise ValueError( f"Value '{value}' is not valid for a {name}-enumeration. Valid values are {valid_values}." ) from None # # ENUMS # @register_enum("log_level") class LogLevel(enum.IntEnum): """Levels used when deciding how much log output to show""" all = enum.auto() debug = enum.auto() time = enum.auto() dev = enum.auto() info = enum.auto() warn = enum.auto() check = enum.auto() error = enum.auto() fatal = enum.auto() none = enum.auto() PKJL'midgard/collections/dataset/__init__.pyPKwL&midgard/collections/dataset/dataset.pyPKzL$midgard/collections/dataset/table.pyPK\Lmidgard/config/__init__.pyPK Ls^((midgard/config/config.py"""Midgard library module for handling of configuration settings Description: ------------ A Configuration consists of one or several sections. Each ConfigurationSection consists of one or more entries. Each ConfigurationEntry consists of a key and a value. Examples: --------- For basic use, an entry is looked up by simple attribute access. For instance if `cfg` is a Configuration with the section `midgard` which has an entry `foo = bar`: >>> cfg.midgard.foo ConfigurationEntry('foo', 'bar') ConfigurationEntry has several access methods that convert the entry to a given data type: >>> cfg.midgard.foo_pi ConfigurationEntry('foo_pi', '3.14') >>> cfg.midgard.foo_pi.float 3.14 >>> cfg.midgard.foo_pi.str '3.14' >>> cfg.midgard.foo_pi.tuple ('3.14', ) Sources: -------- Each configuration entry records its source. That is, where that entry was defined. 
Examples include read from file, set as a command line option, or programmatically from a dictionary. The source can be looked up on an individual entry, or for all entries in a configuration. >>> cfg.midgard.foo.source '/home/midgard/midgard.conf' >>> cfg.sources {'/home/midgard/midgard.conf', 'command line'} Profiles: --------- Parent Configuration: --------------------- Master Section: --------------- Replacement Variables: ---------------------- Help text and Type hints: ------------------------- """ # Standard library imports import builtins from configparser import ConfigParser, BasicInterpolation, ExtendedInterpolation from contextlib import contextmanager import datetime as stdlib_datetime import os.path import pathlib import re import sys from collections import UserDict # Where imports from midgard.dev import console from midgard.collections import enums from midgard.dev import exceptions # Typing from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union ProfileName = Union[None, str] SectionName = str Sections = Dict[SectionName, "ConfigurationSection"] EntryName = str ConfigVars = Dict[str, Any] # Date and datetime formats FMT_date = "%Y-%m-%d" FMT_datetime = "%Y-%m-%d %H:%M:%S" FMT_dt_file = "%Y%m%d-%H%M%S" class Configuration(): """Represents a Configuration""" def __init__(self, name: str) -> None: """Initialize a Configuration The name is used purely for representation and error messages. Args: name: Name of configuration. """ self.name = name self.parent_config = None self.master_section = None self._profiles: List[ProfileName] = [None] self._profile_sections: Dict[ProfileName, Sections] = dict() self._sections: Sections = dict() self._vars_dict: ConfigVars = dict() self._update_count: int = 0 @classmethod def read_from_file(cls, cfg_name: str, *file_paths: Union[str, pathlib.Path]) -> "Configuration": """Read a configuration from one or more files Args: file_paths: File(s) that will be read. 
Returns: A Configuration representing the file(s). """ cfg = cls(cfg_name) for file_path in file_paths[::-1]: cfg.update_from_file(file_path) return cfg @classmethod @contextmanager def update_on_file(cls, file_path: Union[str, pathlib.Path], **as_str_args) -> None: """Context manager for updating a configuration on file """ # Read config from file cfg = cls.read_from_file("Temporary", file_path) update_count_before = cfg._update_count # Yield config so it can be updated yield cfg # Write config if it has been updated if cfg._update_count > update_count_before: cfg.write_to_file(file_path, **as_str_args) def write_to_file(self, file_path: Union[str, pathlib.Path], **as_str_args) -> None: """Write the configuration to a file In addition to the file path, arguments can be specified and will be passed on to the as_str() function. See `as_str()` for more information. Todo: Use files.open_path """ file_path.parent.mkdir(parents=True, exist_ok=True) with open(file_path, mode="w") as fid: fid.write(self.as_str(**as_str_args) + "\n") @property def sections(self) -> List[SectionName]: """List of sections in Configuration""" return list(self._sections.keys()) @property def sources(self) -> Set[str]: """Sources of entries in Configuration""" return {self[s][k].source for s in self.sections for k in self[s].keys() if self[s][k].source} @property def profiles(self) -> List[ProfileName]: """List of profiles currently being used in Configuration""" return self._profiles @profiles.setter def profiles(self, values: Union[None, List[ProfileName]]) -> None: """Set profiles that will be used in Configuration The list of profiles should be a prioritized list where the first profile listed will be used if available. None is used to indicate default values (no profile), and will be automatically appended at the end of the list of profiles. To not use any profiles, set `cfg.profiles = None`. Args: values: List of profiles to use. 
""" if values is None: values = [None] elif values[-1] is not None: values.append(None) self._profiles = values self._set_sections_for_profiles() def _set_sections_for_profiles(self) -> None: """Update sections according to profiles""" self._sections.clear() # Add values in reverse order so that the first profile is prioritized for profile in self.profiles[::-1]: for section_name, profile_section in self._profile_sections.get(profile, dict()).items(): self._sections.setdefault(section_name, ConfigurationSection(section_name)) for key, entry in profile_section.items(): self._sections[section_name][key] = entry @property def parent_config(self) -> "Configuration": """The parent configuration""" if self._parent_config is None: raise exceptions.MissingConfigurationError( f"Configuration '{self.name}' has not defined a parent configuration" ) return self._parent_config @parent_config.setter def parent_config(self, cfg: Optional["Configuration"]) -> None: """Set the parent configuration""" self._parent_config = cfg @property def master_section(self) -> "ConfigurationSection": """The master section""" if self._master_section is None: raise exceptions.MissingSectionError(f"Configuration '{self.name}' has not defined a master section") try: return self._sections[self._master_section] except KeyError: return ConfigurationSection("undefined") @master_section.setter def master_section(self, section: Optional[SectionName]): """Set the master section""" if section is None or section in self._sections: self._master_section = section else: raise exceptions.MissingSectionError(f"Configuration '{self.name}' does not contain section '{section}'") def get( self, key: str, value: Optional[str] = None, section: Optional[str] = None, default: Optional[str] = None ) -> "ConfigurationEntry": """Get an entry from a configuration with possibility for override and default value A value for an entry is found using the following priorities: 1. An explicit value given in `value`. 
None is used as a marker for no value. 2. Looked up in the current configuration. 3. Looked up in any parent confiurations that are defined. 4. The default value is used. If `value` is not None, that value is simply returned as a `ConfigurationEntry`. If `default` is not given (is None), and a value is not found in any other way, a MissingEntryError is raised. Args: key: Name of option (key in the configuration entry). value: Value of entry. Used for overriding the configuration. section: Section in the configuration in which to look up the key. default: Default value that is returned if value is not found any other way. Returns: Entry representing the value. """ if value is not None: return ConfigurationEntry(key, value=value, source="method call", vars_dict=self.vars) try: return self.master_section[key] if section is None else self[section][key] except (exceptions.MissingSectionError, exceptions.MissingEntryError): try: return self.parent_config.get(key=key, section=section) except (exceptions.MissingConfigurationError, exceptions.MissingEntryError): if default is None: return self.master_section[key] if section is None else self[section][key] else: return ConfigurationEntry(key, value=default, source="default value", vars_dict=self.vars) def update( self, section: SectionName, key: str, value: str, *, profile: ProfileName = None, source: str = "unknown", meta: Optional[Dict[str, str]] = None, allow_new: bool = True, _update_sections: bool = True, ) -> None: """Update a configuration section with a configuration entry If `allow_new` is False, the configuration entry must already exist. If it is True the update is allowed to create a new section and a new entry is necessary. The `_update_sections` flag can be used to not update the sections of the configuration, only the profiles. This should typically not be done, but is used by some of the other update methods which update the sections themselves. Args: section: Section to update. key: Key of entry. 
value: Value of entry. profile: Profile to update. source: Source of the update. meta: Metadata like help text and type hints for the entry. allow_new: Whether to allow the creation of a new section and entry. """ if not allow_new: profile_str = "" if profile is None else f"(profile: '{profile}')" if section not in self._sections: raise exceptions.MissingSectionError( f"Configuration '{self.name}' does not contain section '{section}' {profile_str}" ) if key not in self._sections[section]: raise exceptions.MissingEntryError( f"Section '{section}' of configuration '{self.name}' does not contain entry '{key}' {profile_str}" ) # Add entry to profile source = source if profile is None else f"{source} ({profile})" profile_sections = self._profile_sections.setdefault(profile, dict()) profile_sections.setdefault(section, ConfigurationSection(section)) # Record that configuration has been updated if key not in profile_sections[section] or profile_sections[section][key]._value != value: self._update_count += 1 profile_sections[section][key] = ConfigurationEntry( key, value=value, source=source, meta=meta, vars_dict=self.vars ) # Update sections if _update_sections: self._set_sections_for_profiles() def update_from_file( self, file_path: Union[str, pathlib.Path], allow_new: bool = True, interpolate: bool = False ) -> None: """Update the configuration from a configuration file The Python ConfigParser is used to read the file. The file format that is supported is described at https://docs.python.org/library/configparser.html Different profiles in a configuration file is denoted by double underscores in the sections names. For instance does the following configuration have a `foo` profile in the `spam` section (in addition to the default profile): [spam] ... [spam__foo] ... If `interpolate` is set to True, ExtendedInterpolation of variables in the configuration file is used. See https://docs.python.org/library/configparser.html#configparser.ExtendedInterpolation for details. 
Args: file_path: Path to the configuration file. allow_new: Whether to allow the creation of new sections and entries. interpolate: Whether to interpolate variables in the configuration file. """ # Use ConfigParser to read from file cfg_parser_args = dict( allow_no_value=True, delimiters=("=",), interpolation=ExtendedInterpolation() if interpolate else BasicInterpolation(), ) cfg_parser = ConfigParser(**cfg_parser_args) cfg_parser.read(file_path) # Add configuration entries for cfg_section in cfg_parser.sections(): section, has_profile, profile = cfg_section.partition("__") for key, value in cfg_parser[cfg_section].items(): # Handle meta-information if ":" in key: continue meta = {k.partition(":")[-1]: v for k, v in cfg_parser[cfg_section].items() if k.startswith(f"{key}:")} # Create a configuration entry self.update( section, key, value if value is None else value.replace("\n", " "), profile=profile if has_profile else None, source=str(file_path), meta=meta, allow_new=allow_new, _update_sections=False, ) self._set_sections_for_profiles() def update_from_config_section( self, other_section: "ConfigurationSection", section: Optional[SectionName] = None, allow_new: bool = True ) -> None: section = other_section.name if section is None else section for key, entry in other_section.items(): self.update( section, key, entry.str, source=entry.source, meta=entry.meta, allow_new=allow_new, _update_sections=False, ) self._set_sections_for_profiles() def update_from_options( self, options: Optional[List[str]] = None, profile: ProfileName = None, source: str = "command line", allow_new: bool = False, ) -> None: if options is None: options = sys.argv[1:] for option in options: if not (option.startswith("--") and "=" in option): continue # Parse config name, section, key and value of the form name:section:key=value opt_key, _, opt_value = option[2:].partition("=") opt_section, _, opt_key = opt_key.rpartition(":") opt_name, _, opt_section = opt_section.rpartition(":") # Update 
current configuration if opt_name and opt_name != self.name: continue if not opt_section: opt_section = self.master_section.name self.update( opt_section, opt_key, opt_value, profile=profile, source=f"{source} ({option})", allow_new=allow_new, _update_sections=False, ) self._set_sections_for_profiles() def update_from_dict( self, cfg_dict: Dict[str, Any], section: Optional[SectionName] = None, source: str = "dictionary", allow_new: bool = True, ) -> None: section = self.master_section.name if section is None else section for key, value in cfg_dict.items(): self.update(section, key, value, source=source, allow_new=allow_new, _update_sections=False) self._set_sections_for_profiles() def clear(self) -> None: """Clear the configuration""" self._sections.clear() self.clear_vars() @property def vars(self) -> ConfigVars: """The configuration variables""" return self._vars_dict def clear_vars(self) -> None: """Clear the configuration variables""" self._vars_dict.clear() def update_vars(self, new_vars: ConfigVars) -> None: """Update the configuration variables""" self._vars_dict.update(new_vars) def as_str( self, width: Optional[int] = None, key_width: int = 30, only_used: bool = False, metadata: bool = True ) -> str: """The configuration represented as a string This is simililar to what is shown by `str(configuration)` (and implemented by `__str__`), but has more flexibility. Args: width: Width of text for wrapping. Default is width of console. key_width: Width of the key column. Default is 30 characters. only_used: Only include configuration entries that has been used so far. metadata: Include metadata like type and help text. Returns: String representation of the configuration. 
""" sections = self._sections.values() section_strs = [ s.as_str(width=width, key_width=key_width, only_used=only_used, metadata=metadata) for s in sections ] return "\n\n\n".join(s for s in section_strs if s) def as_dict( self, getters: Optional[Dict[SectionName, Dict[str, str]]] = None, default_getter: str = "str" ) -> Dict[SectionName, Dict[str, Any]]: """The configuration represented as a dictionary Args: getters: How to get the value of each entry in each section. default_getter: How to get the value of entries not specified in getters. Returns: Representation of the configuration as a nested dictionary. """ getters = dict() if getters is None else getters return {k: v.as_dict(getters=getters.get(k), default_getter=default_getter) for k, v in self._sections.items()} def __getitem__(self, key: SectionName) -> "ConfigurationSection": """Get a section or entry from the master section from the configuration""" try: return self._sections[key] except KeyError: try: return self.master_section[key] except exceptions.MissingSectionError: raise exceptions.MissingSectionError(f"Configuration '{self.name}' has no section '{key}'") from None def __getattr__(self, key: SectionName) -> "ConfigurationSection": """Get a section or entry from the master section from the configuration""" return self[key] def __delitem__(self, key: SectionName) -> None: """Delete a section from the configuration""" del self._sections[key] def __delattr__(self, key: SectionName) -> None: """Delete a section from the configuration""" del self._sections[key] def __dir__(self) -> List[str]: """List attributes and sections in the configuration""" try: return super().__dir__() + self.sections + self.master_section.as_list() except exceptions.MissingSectionError: return super().__dir__() + self.sections def __str__(self) -> str: """The configuration represented as a string This string can be stored in a file and read back with `update_from_file`. 
""" return "\n\n".join(str(s) for s in self._sections.values()) def __repr__(self) -> str: """A simple string representation of the configuration""" return f"{self.__class__.__name__}(name='{self.name}')" class ConfigurationSection(UserDict): def __init__(self, name: SectionName) -> None: super().__init__() self.name = name def as_str( self, width: Optional[int] = None, key_width: int = 30, only_used: bool = False, metadata: bool = True ) -> str: """The configuration section represented as a string This is simililar to what is shown by `str(section)` (and implemented by `__str__`), but has more flexibility. Args: width: Width of text for wrapping. Default is width of console. key_width: Width of the key column. Default is 30 characters. only_used: Only include configuration entries that has been used so far. metadata: Include metadata like type and help text. Returns: String representation of the configuration section. """ lines = list() for entry in self.data.values(): if only_used and not entry.is_used: continue lines.append(entry.entry_as_str(width=width, key_width=key_width, metadata=metadata)) if lines: return f"[{self.name}]\n" + "\n".join(lines) else: return "" def as_list(self) -> List[str]: """List of keys of entries in configuration section Returns: List of keys of entries in configuration section. """ return list(self.data.keys()) def as_dict(self, getters: Dict[str, str] = None, default_getter: str = "str") -> Dict[str, Any]: """The configuration section represented as a dictionary Args: getters: How to get the value of each entry in the section. default_getter: How to get the value of entries not specified in getters. Returns: Representation of the configuration section as a dictionary. 
""" getters = dict() if getters is None else getters getters = {k: getters.get(k, default_getter) for k in self.keys()} return {k: getattr(e, getters[k]) for k, e in self.items()} def __getitem__(self, key: EntryName) -> "ConfigurationEntry": """Get an entry from the configuration section""" try: return self.data[key] except KeyError: raise exceptions.MissingEntryError(f"Configuration section '{self.name}' has no entry '{key}'") from None def __getattr__(self, key: EntryName) -> "ConfigurationEntry": """Get an entry from the configuration section""" try: return self.data[key] except KeyError: raise exceptions.MissingEntryError(f"Configuration section '{self.name}' has no entry '{key}'") from None def __dir__(self) -> List[str]: """List attributes and entries in the configuration section""" return super().__dir__() + self.as_list() def __str__(self) -> str: """The configuration section represented as a string""" return f"[{self.name}]\n" + "\n".join(str(v) for v in self.data.values()) def __repr__(self) -> str: """A simple string representation of the configuration section""" return f"{self.__class__.__name__}(name='{self.name}')" class ConfigurationEntry(): _BOOLEAN_STATES = { "0": False, "1": True, "false": False, "true": True, "no": False, "yes": True, "off": False, "on": True } def __init__( self, key: EntryName, value: Any, *, source: builtins.str = "", meta: Optional[Dict[str, Any]] = None, vars_dict: Optional[ConfigVars] = None, _used_as: Optional[Set[builtins.str]] = None, ) -> None: self.source = source self.meta = dict() if meta is None else meta self._key = key self._value = str(value) self._vars_dict = dict() if vars_dict is None else vars_dict self._used_as = set() if _used_as is None else _used_as @property def type(self) -> Optional[builtins.str]: """Type hint for the ConfigurationEntry""" return self.meta.get("type", None) @property def help(self) -> builtins.str: """Help text for the ConfigurationEntry""" return self.meta.get("help", "") @property 
def str(self) -> builtins.str: """Value of ConfigurationEntry as string""" self._using("str") return self._value def as_str(self) -> builtins.str: """Value of ConfigurationEntry as string""" return self.str @property def int(self) -> builtins.int: """Value of ConfigurationEntry converted to an integer""" self._using("int") try: return int(self._value) except ValueError: raise ValueError( f"Value '{self._value}' of '{self._key}' in {self.source} cannot be converted to an integer" ) from None def as_int(self) -> builtins.int: """Value of ConfigurationEntry converted to an integer""" return self.int @property def float(self) -> builtins.float: """Value of ConfigurationEntry converted to a float""" self._using("float") try: return float(self._value) except ValueError: raise ValueError( f"Value '{self._value}' of '{self._key}' in {self.source} cannot be converted to a float" ) from None def as_float(self) -> builtins.float: """Value of ConfigurationEntry converted to a float""" return self.float @property def bool(self) -> builtins.bool: """Value of ConfigurationEntry converted to a boolean The conversion is done by looking up the string value of the entry in _BOOLEAN_STATES. """ self._using("bool") try: return self._BOOLEAN_STATES[self._value.lower()] except KeyError: raise ValueError( f"Value '{self._value}' of '{self._key}' in {self.source} cannot be converted to a boolean" ) from None def as_bool(self) -> builtins.bool: """Value of ConfigurationEntry converted to a boolean The conversion is done by looking up the string value of the entry in _BOOLEAN_STATES. """ return self.bool @property def date(self) -> stdlib_datetime.date: """Value of ConfigurationEntry converted to a date object assuming format `FMT_date`""" return self.as_date(format=FMT_date) def as_date(self, format: builtins.str = FMT_date) -> stdlib_datetime.date: """Value of ConfigurationEntry converted to a date object Args: format (String): Format string, see strftime for information about the string. 
Returns: Date: Value of entry. """ self._using("date") try: return stdlib_datetime.datetime.strptime(self._value, format).date() except ValueError: raise ValueError( f"Value '{self._value}' of '{self._key}' in {self.source} does not match the date format '{format}'" ) from None @property def datetime(self) -> stdlib_datetime.datetime: """Value of ConfigurationEntry converted to a datetime object assuming format `FMT_datetime`""" return self.as_datetime(format=FMT_datetime) def as_datetime(self, format: builtins.str = FMT_datetime) -> stdlib_datetime.datetime: """Value of ConfigurationEntry converted to a datetime object Args: format (String): Format string, see strftime for information about the string. Returns: Datetime: Value of entry. """ self._using("datetime") try: return stdlib_datetime.datetime.strptime(self._value, format) except ValueError: raise ValueError( f"Value '{self._value}' of '{self._key}' in {self.source} does not match the date format '{format}'" ) from None @property def path(self) -> pathlib.Path: """Value of ConfigurationEntry interpreted as a path string""" self._using("path") path = self._value if "~" in path: path = os.path.expanduser(path) return pathlib.Path(path) def as_path(self) -> pathlib.Path: """Value of ConfigurationEntry interpreted as a path string""" return self.path @property def list(self) -> List[builtins.str]: """Value of ConfigurationEntry converted to a list by splitting at commas and whitespace""" self._using("list") return self._value.replace(",", " ").split() def as_list(self, split_re: builtins.str = r"[\s,]", convert: Callable = builtins.str) -> List[Any]: """Value of ConfigurationEntry converted to a list The entry is converted to a list by using the `split_re`-regular expression. By default the entry will be split at commas and whitespace. Args: split_re: Regular expression used to split entry into list. convert: Function used to convert each element of the list. Returns: Value of entry as list. 
""" self._using("list") return [convert(s) for s in re.split(split_re, self._value) if s] @property def list_of_lists(self) -> List[List[builtins.str]]: self._using("list_of_lists") raise NotImplementedError def as_list_of_lists( self, split_res: Tuple[builtins.str, ...] = (r"[\s,]", r"[^_\w]"), num_elements: Optional[builtins.int] = None, convert: Callable = builtins.str, ) -> List[List[Any]]: self._using("list_of_lists") raise NotImplementedError @property def tuple(self) -> Tuple[builtins.str, ...]: """Value of ConfigurationEntry converted to tuple by splitting at commas and whitespace""" self._using("tuple") return tuple(self._value.replace(",", " ").split()) def as_tuple(self, split_re: builtins.str = r"[\s,]", convert: Callable = builtins.str) -> Tuple[Any, ...]: """Value of ConfigurationEntry converted to a tuple The entry is converted to a tuple by using the `split_re`-regular expression. By default the entry will be split at commas and whitespace. Args: split_re: Regular expression used to split entry into tuple. convert: Function used to convert each element of the tuple. Returns: Value of entry as tuple. """ self._using("tuple") return tuple([convert(s) for s in re.split(split_re, self._value) if s]) @property def dict(self) -> Dict[builtins.str, builtins.str]: """Value of ConfigurationEntry converted to a dict""" self._using("dict") return dict(i.split(":", maxsplit=1) for i in self.list) def as_dict( self, item_split_re: builtins.str = r"[\s,]", key_value_split_re: builtins.str = r"[:]", convert: Callable = builtins.str, ) -> Dict[builtins.str, Any]: """Value of ConfigurationEntry converted to a dictionary By default the dictionary is created by splitting items at commas and whitespace, and key from value at colons. Args: item_split_re: Regular expression used to split entry into items. key_value_split_re: Regular expression used to split items into keys and values. convert: Function used to convert each value in the dictionary. 
Returns: Value of entry as dict. """ self._using("dict") items = [s for s in re.split(item_split_re, self._value) if s] key_values = [re.split(key_value_split_re, i, maxsplit=1) for i in items] return {k: convert(v) for k, v in key_values} def as_enum(self, enum: builtins.str) -> enums.enum.Enum: """Value of ConfigurationEntry converted to an enumeration Args: enum (String): Name of Enum. Returns: Enum: Value of entry as Enum. """ self._using("enum") return enums.get_value(enum, self._value) @property def replaced(self) -> "ConfigurationEntry": """Value of ConfigurationEntry with {$}-variables replaced""" return self.replace() def replace(self, default: Optional[builtins.str] = None, **replace_vars: builtins.str) -> "ConfigurationEntry": replacement_vars = dict(self._vars_dict, **replace_vars) replacement_value = self._value replacements = list() matches = re.findall(r"\{\$\w+\}", replacement_value) for match in matches: var = match.strip("${}") replacement = str(replacement_vars.get(var, match if default is None else default)) replacements.append(f"{var}={replacement}") replacement_value = replacement_value.replace(match, replacement) return self.__class__( key=self._key, value=replacement_value, source=self.source + " ({','.join(replacements)})", _used_as=self._used_as, ) @property def is_used(self) -> builtins.bool: return bool(self._used_as) def entry_as_str( self, width: Optional[builtins.int] = None, key_width: builtins.int = 30, metadata: builtins.bool = True ) -> builtins.str: """The configuration entry represented as a string This is simililar to what is shown by `str(entry)` (and implemented by `__str__`), but has more flexibility. Args: width: Width of text for wrapping. Default is width of console. key_width: Width of the key column. Default is 30 characters. metadata: Include metadata like type and help text. Returns: String representation of the configuration entry. 
""" lines = list() width = console.columns() if width is None else width fill_args = dict(width=width, hanging=key_width + 3, break_long_words=False, break_on_hyphens=False) # The entry itself lines.append(console.fill(f"{self._key:<{key_width}} = {self._value}", **fill_args)) # Metadata, including help text and type hints if metadata and self.meta: for meta_key, meta_value in self.meta.items(): if meta_value is None: lines.append(console.fill(f"{self._key}:{meta_key}", **fill_args)) else: lines.append(console.fill(f"{f'{self._key}:{meta_key}':<{key_width}} = {meta_value}", **fill_args)) lines.append("") return "\n".join(lines) def _using(self, as_type: builtins.str) -> None: """Register that entry is used as a type Args: as_type: Name of type entry is used as. """ self._used_as.add(as_type) def __add__(self, other: "ConfigurationEntry") -> "ConfigurationEntry": if isinstance(other, self.__class__): if self.source == other.source: source = f"{self.source} (+)" else: source = f"{self.source} + {other.source}" return self.__class__( key=f"{self._key} + {other._key}", value=self.str + other.str, source=source, vars_dict=self._vars_dict ) else: return NotImplemented def __bool__(self) -> builtins.bool: """A ConfigurationEntry is truthy if the value is not empty""" return bool(self._value) def __str__(self) -> builtins.str: """The configuration entry represented as a string""" return self.entry_as_str() def __repr__(self) -> builtins.str: """A simple string representation of the configuration entry""" return f"{self.__class__.__name__}(key='{self._key}', value='{self._value}')" PKҲLmidgard/config/constant.pyPKԲLmidgard/config/unit.pyPKfLmidgard/coords/__init__.pyPKLmidgard/coords/position.pyPK=Lmidgard/coords/time.pyPK,Lmidgard/dev/__init__.pyPKαLmidgard/dev/cache.pyPKwLLLmidgard/dev/console.py"""Simpler dealing with the console Description: ------------ Utilities for using the console. 
# ---------------------------------------------------------------------------
# midgard/dev/console.py: console helpers
# ---------------------------------------------------------------------------


def lines() -> int:
    """The height of the console

    Returns:
        The height of the console in characters.
    """
    return shutil.get_terminal_size().lines


def columns() -> int:
    """The width of the console

    Returns:
        The width of the console in characters.
    """
    return shutil.get_terminal_size().columns


def fill(text: str, *, width: Optional[int] = None, hanging: Optional[int] = None, **tw_args: Any) -> str:
    """Wrapper around textwrap.fill

    The `tw_args` are passed on to textwrap.fill. See textwrap.TextWrapper for available keyword arguments.

    The default value for `width` is console.columns(), while the new argument `hanging`, if defined, will try to
    set (although not override) the textwrap-arguments `initial_indent` and `subsequent_indent` to create a hanging
    indent (no indent on the first line) of `hanging` spaces.

    Args:
        text:     Text that will be wrapped.
        width:    The maximum width (in characters) of wrapped lines.
        hanging:  Number of characters used for hanging indent.
        tw_args:  Arguments passed on to `textwrap.fill`.

    Returns:
        Wrapped string.
    """
    width = columns() if width is None else width
    if hanging is not None:
        # setdefault keeps any indents the caller passed explicitly
        tw_args.setdefault("initial_indent", "")
        tw_args.setdefault("subsequent_indent", " " * hanging)

    return textwrap.fill(text, width=width, **tw_args)


def dedent(text: str, num_spaces: int) -> str:
    """Wrapper around textwrap.dedent

    Dedents at most num_spaces.

    Args:
        text:        Text that will be dedented.
        num_spaces:  Number of spaces that will be used for dedentation.

    Returns:
        Dedented string.
    """
    # Dedent the text all the way
    dedented_text = textwrap.dedent(text)

    # Measure how much was removed on the first line with content. Looking only at the very start of the text
    # gives 0 for text that begins with a blank line (typical for triple-quoted strings).
    num_removed = 0
    for before, after in zip(text.splitlines(), dedented_text.splitlines()):
        if before.strip():
            num_removed = len(before) - len(after)
            break

    # Indent it back if necessary
    num_indents = num_removed - num_spaces
    if num_indents > 0:
        return indent(dedented_text, num_spaces=num_indents)
    return dedented_text


def indent(text: str, num_spaces: int, **tw_args: Any) -> str:
    """Wrapper around textwrap.indent

    The `tw_args` are passed on to textwrap.indent. Any `prefix` given in `tw_args` is replaced by `num_spaces`
    spaces.

    Args:
        text:        Text that will be indented.
        num_spaces:  Number of spaces that will be used for indentation.
        tw_args:     Arguments passed on to `textwrap.indent`.

    Returns:
        Indented string.
    """
    tw_args["prefix"] = " " * num_spaces
    return textwrap.indent(text, **tw_args)


def num_leading_spaces(text: str, space_char: str = " ") -> int:
    """Count number of leading spaces in a string

    Args:
        text:        String to count.
        space_char:  Which characters count as spaces.

    Returns:
        Number of leading spaces.
    """
    return len(text) - len(text.lstrip(space_char))


# ---------------------------------------------------------------------------
# midgard/dev/exceptions.py: Midgard-specific exceptions
# ---------------------------------------------------------------------------


class MidgardException(Exception):
    """Base class for all Midgard-specific exceptions"""


class MidgardExit(SystemExit, MidgardException):
    """Midgard exception that is also a SystemExit"""


class InitializationError(MidgardException):
    pass


class FieldExistsError(MidgardException):
    pass


class FieldDoesNotExistError(MidgardException):
    pass


class MissingConfigurationError(MidgardException):
    pass


class MissingDataError(MidgardException):
    pass


class MissingEntryError(MidgardException):
    pass


class MissingSectionError(MidgardException):
    pass


class UnknownEnumError(MidgardException):
    pass


class UnknownPluginError(MidgardException):
    pass


class UnitError(MidgardException):
    pass


# ---------------------------------------------------------------------------
# midgard/dev/library.py: Python wrapper around C-libraries
# ---------------------------------------------------------------------------


def load_name(library_name, func_specs=None, name_patterns=None):
    """Load the given shared C-library

    See `load_path` for an explanation of the `func_specs` and `name_patterns`-arguments.

    Args:
        library_name (String):  The name of the library.
        func_specs (Dict):      Specification of types in lib (see load_path).
        name_patterns (List):   Name mangling patterns (see load_path).

    Returns:
        ctypes.CDLL:   Representation of the shared library, or a SimpleMock if the library is not found.
    """
    library_path = c_util.find_library(library_name)
    if library_path is None:
        # Name the mock after the library so that the eventual ImportError says what is missing
        return SimpleMock(name=library_name)
    return load_path(library_path, func_specs=func_specs, name_patterns=name_patterns)


def load_path(library_path, func_specs=None, name_patterns=None):
    """Load the given shared C-library

    The optional func_specs-dictionary can be used to specify argument and return types of functions in the
    library (see the ctypes documentation for information about argtypes and restype). The dictionary should be on
    the form::

        func_spec = {'func_1': dict(func_name='name_of_func_1_in_lib',
                                    argtypes=[ ... argtypes of func_1 ... ],
                                    restype=... restype of func_1 ...),
                     'func_2': ...
                    }

    If the library is not found, a mock library is returned instead. The mock library will print a warning if it is
    used.

    For some libraries, name mangling is used and this might be different depending on operating system and how the
    library is compiled. For instance, in a Fortran library the function `Test` might be represented as `__Test` on
    a Windows system and `test_` (with lower-case `t`) on a Linux system. This can be handled by providing a list
    of possible patterns. The above example can be handled by::

        name_patterns = ('__{func_name}', '{func_name_lower}_')

    In this case, each function in func_specs is looked up by testing each pattern in turn until a match is found.

    Args:
        library_path (String):  The path to the library.
        func_specs (Dict):      Specification of types in library (see above).
        name_patterns (List):   Name mangling patterns (see above).

    Returns:
        ctypes.CDLL:   Representation of the shared library, or a SimpleMock if the library can not be loaded.
    """
    # Return a Mock object if the library is not found. Do not hand None to LoadLibrary: depending on platform
    # that either loads the running process itself or raises.
    if library_path is None:
        return SimpleMock(name=library_path)
    try:
        library_handle = c.cdll.LoadLibrary(library_path)
    except OSError:
        return SimpleMock(name=library_path)

    def mangled_name(name):
        """Find the name used in the library by trying each name mangling pattern in turn"""
        for pattern in name_patterns or ():
            full_name = pattern.format(func_name=name, func_name_lower=name.lower())
            if hasattr(library_handle, full_name):
                return full_name
        return name

    # Set argument and return types on functions
    for name, spec in (func_specs or {}).items():
        func_name = mangled_name(spec.get("func_name", name))
        func = getattr(library_handle, func_name)
        delattr(library_handle, func_name)
        if "argtypes" in spec:
            func.argtypes = spec["argtypes"]
        if "restype" in spec:
            func.restype = spec["restype"]
        # Expose the function under the name used in func_specs
        setattr(library_handle, name, func)

    return library_handle


class SimpleMock:
    """Class that can stand in for any other object

    The SimpleMock is used to stand in for any library that can not be imported. The mock object simply returns
    itself whenever it is called, or any attributes are looked up on the object. This is done, to avoid ImportErrors
    when a library is imported, but never used (typically because a plugin is loaded but never called).

    Instead the ImportError is raised when the SimpleMock is used in any way. The ImportError will only be raised
    once for any SimpleMock-object (which is only important if the ImportError is caught and the program carries
    on).
    """

    # Attributes set by __init__. Looking them up must never go through the mocking in __getattr__.
    _internal_attrs = ("_name", "_children", "_raise_error")

    def __init__(self, name, raise_error=True):
        """Initialize SimpleMock object

        Args:
            name (String):       Name of SimpleMock-object.
            raise_error (Bool):  Should ImportError be raised when using object.
        """
        self._name = name
        self._children = dict()
        self._raise_error = raise_error

    def _raise_import_error(self):
        """Raise an import error when the SimpleMock object is used

        The ImportError is only raised the first time the object is used.
        """
        # Only raise the error once
        if not self._raise_error:
            return
        self._raise_error = False

        # Find calling function: the first frame outside this file (or the outermost frame)
        caller = sys._getframe()
        while caller.f_back is not None and caller.f_code.co_filename == __file__:
            caller = caller.f_back
        func_name = caller.f_code.co_name
        line_no = caller.f_lineno
        file_name = caller.f_code.co_filename

        # Raise ImportError with a helpful message
        raise ImportError(
            f"The library '{self._name}' is not installed, "
            f"but is used by '{func_name}' on line {line_no} of {file_name}"
        )

    def __call__(self, *args, **kwargs):
        """Return the same SimpleMock-object when it is called

        An ImportError is raised the first time the object is used.

        Returns:
            SimpleMock:  Itself.
        """
        self._raise_import_error()
        return self

    def __getattr__(self, key):
        """Create a child-SimpleMock-object and return it.

        The same child object is returned if the same attribute is gotten several times. An ImportError is raised
        the first time the SimpleMock-object is used. Additional errors are not raised for the children.

        Args:
            key (String):  Name of attribute.

        Returns:
            SimpleMock:  A child-SimpleMock-object.
        """
        # Only reached for internal attributes when __init__ has not run (for instance during copying).
        # Mocking them would recurse without end.
        if key in self._internal_attrs:
            raise AttributeError(key)

        self._raise_import_error()

        if key not in self._children:
            self._children[key] = type(self)(f"{self._name}.{key}", raise_error=self._raise_error)
            # Store the child as a real attribute so later lookups bypass __getattr__
            setattr(self, key, self._children[key])

        return self._children[key]

    def __repr__(self):
        """String representation of the SimpleMock-object

        Returns:
            String:  Simple representation of the SimpleMock-object.
        """
        return f"{self.__class__.__name__}('{self._name}')"

    def __str__(self):
        """Convert to the empty string

        Returns:
            String:  An empty string.
        """
        return ""
# ---------------------------------------------------------------------------
# midgard/dev/optional.py: handling of optional dependencies
# ---------------------------------------------------------------------------


class SimpleMock:
    """Class that can stand in for any other object

    The SimpleMock is used to stand in for any library that can not be imported. The mock object simply returns
    itself whenever it is called, or any attributes are looked up on the object. This is done, to avoid ImportErrors
    when a library is imported, but never used (for instance if a plugin is loaded but never called).

    Instead the ImportError is raised when the SimpleMock is used in any way. The ImportError will only be raised
    once for any SimpleMock-object (which is only important if the ImportError is caught and the program carries
    on).

    The exception is if any attributes (`attrs`) are explicitly defined on the mock. No exception is raised if those
    attributes are looked up.
    """

    # Attributes set by __init__. Looking them up must never go through the mocking in __getattr__.
    _internal_attrs = ("_name", "_children", "_raise_error", "_attrs")

    def __init__(self, name: str, raise_error: bool = True, attrs: Optional[Dict[str, Any]] = None) -> None:
        """Initialize SimpleMock object

        Args:
            name:         Name of SimpleMock-object. Used for string representation and when raising Errors.
            raise_error:  Whether ImportError should be raised when object is used.
            attrs:        Attributes that should be added to the SimpleMock.
        """
        self._name = name
        self._children: Dict[str, "SimpleMock"] = dict()
        self._raise_error = raise_error
        self._attrs = attrs

        # Explicit attributes are stored as real attributes, so looking them up never reaches __getattr__
        if attrs is not None:
            for attr_name, attr in attrs.items():
                setattr(self, attr_name, attr)

    def _raise_import_error(self) -> None:
        """Raise an import error when the SimpleMock object is used

        The ImportError is only raised the first time the object is used.
        """
        # Only raise the error once
        if not self._raise_error:
            return
        self._raise_error = False

        # Find calling function: the first frame outside this file (or the outermost frame)
        caller = sys._getframe()
        while caller.f_back is not None and caller.f_code.co_filename == __file__:
            caller = caller.f_back
        func_name = caller.f_code.co_name
        line_num = caller.f_lineno
        file_name = caller.f_code.co_filename

        # Raise ImportError with a helpful message
        raise ImportError(
            f"The module '{self._name}' is not installed, "
            f"but is used by '{func_name}' on line {line_num} of {file_name}"
        )

    def __call__(self, *args: Any, **kwargs: Any) -> "SimpleMock":
        """Return the same SimpleMock-object when it is called

        An ImportError is raised the first time the object is used.

        Returns:
            Itself.
        """
        self._raise_import_error()
        return self

    def __getattr__(self, key: str) -> Any:
        """Create a child-SimpleMock-object and return it.

        The same child object is returned if the same attribute is gotten several times. An ImportError is raised
        the first time the SimpleMock-object is used. Additional errors are not raised for the children.

        Args:
            key:  Name of attribute.

        Returns:
            A child-SimpleMock-object.
        """
        # Only reached for internal attributes when __init__ has not run (for instance during copying).
        # Mocking them would recurse without end.
        if key in self._internal_attrs:
            raise AttributeError(key)

        self._raise_import_error()

        if key not in self._children:
            child_name = f"{self._name}.{key}"
            self._children[key] = type(self)(child_name, raise_error=self._raise_error, attrs=self._attrs)
            # Store the child as a real attribute so later lookups bypass __getattr__
            setattr(self, key, self._children[key])

        return self._children[key]

    def __repr__(self) -> str:
        """String representation of the SimpleMock-object

        Returns:
            Simple string representation of the SimpleMock-object.
        """
        return f"{self.__class__.__name__}('{self._name}')"


class EmptyStringMock(SimpleMock):
    """A mock object whose properties are all empty strings
    """

    def __getattr__(self, key: str) -> str:
        """All attributes are empty strings.

        Args:
            key:  Name of attribute.

        Returns:
            An empty string.
        """
        return ""


def optional_import(
    module_name: str, raise_error: bool = True, mock_cls: type = SimpleMock, attrs: Optional[Dict[str, Any]] = None
) -> Union[Any, SimpleMock]:  # TODO: Should be typed types.ModuleType? but causes typing errors
    """Try to import an optional module

    If the module does not exist, a SimpleMock-object is returned instead. If this SimpleMock-object is later used,
    an ImportError will be raised then (if `raise_error` is True, which is default).

    Args:
        module_name:  Name of module to import.
        raise_error:  Whether an ImportError should be raised if the module does not exist, but is used.
        mock_cls:     Class used to create the stand-in object if the module does not exist.
        attrs:        Attributes that should be added to the SimpleMock used if the module does not exist.

    Returns:
        Imported module object, or a SimpleMock-object if the module can not be imported.
    """
    try:
        return importlib.import_module(module_name)
    except ImportError:
        return mock_cls(module_name, raise_error=raise_error, attrs=attrs)


# ---------------------------------------------------------------------------
# midgard/dev/profiler.py: profiler classes
# ---------------------------------------------------------------------------


class Profiler(metaclass=ABCMeta):
    """Base class for profilers"""

    # Subclasses are expected to override these placeholders
    pip_name: str = NotImplemented
    option: str = NotImplemented
    extension: str = NotImplemented

    @abstractmethod
    def setup(self, options):
        """Set up profiler"""

    @abstractmethod
    def start(self):
        """Start profiler"""

    @abstractmethod
    def end(self):
        """Stop profiler"""

    @abstractmethod
    def show(self):
        """Show results of profiler session in console"""

    @abstractmethod
    def write(self):
        """Write results of profiler session to disk"""

    def _get_func_from_name(self, name: str) -> Generator:
        """Get function objects from a name

        Assume the name is given as `module.function`, or `module.class.method`. A trailing `*` matches every
        name with the given prefix that does not start with a double underscore.

        Args:
            name:  The full name of a function, for instance 'midgard.dev.profiler._get_func_from_name'.

        Yields:
            The function objects represented by name. For properties, the defined getter, setter and deleter
            functions are yielded.

        Raises:
            TypeError:  If a wildcard matches nothing, or a name without wildcard is not a function.
        """
        import importlib
        import types

        # Import module and find function (possibly trying to find function inside class)
        modname, _, fname = name.rpartition(".")
        try:
            obj = importlib.import_module(modname)
        except ImportError:
            modname, _, clsname = modname.rpartition(".")
            mod = importlib.import_module(modname)
            obj = getattr(mod, clsname)

        # Support simple *-wildcard matching
        is_wildcard = name.endswith("*")
        if is_wildcard:
            fnames = [f for f in dir(obj) if f.startswith(fname[:-1]) and not f.startswith("__")]
            if not fnames:
                raise TypeError(f"Found no functions named '{name}'")
        else:
            fnames = [fname]

        # Generate each function in fnames
        for fname in fnames:
            func = getattr(obj, fname)

            # Unwrap decorated functions
            while hasattr(func, "__wrapped__"):
                func = func.__wrapped__

            # Return getter, setter and deleter functions of properties
            if isinstance(func, property):
                for accessor in ("fget", "fset", "fdel"):
                    if getattr(func, accessor) is not None:
                        yield getattr(func, accessor)
            elif isinstance(func, types.FunctionType):
                yield func
            elif not is_wildcard:
                # Wildcard matches that are not functions are silently skipped
                raise TypeError(f"'{modname}.{fname}' is not a function")


class CProfile(Profiler):
    """cprofile is used for profiling the whole program"""

    pip_name = "cprofile"
    option = "--profiler"

    # NOTE(review): looks like a placeholder; the remaining abstract methods are not implemented yet
    def start(self):
        print("hei")


class LineProfiler(Profiler):
    """line_profiler is used to profile one or a few functions in detail"""

    # NOTE(review): no abstract methods are implemented yet, so the class can not be instantiated
    pip_name = "line_profiler"
    option = "--line_profiler"
Store and print profile even if Where crashes log.info("Enable {}, output stored in {}", info["doc"], filename) prof.enable() try: func_to_profile() finally: prof.disable() # Store profile to file inspect_str = "Inspect it using e.g. {}".format(" or ".join(info["inspect"])) log.info("Profile information stored to {f}\n " + inspect_str, f=filename) prof.dump_stats(filename) # Print profile to terminal if util.check_options("--show_profile"): prof.print_stats() def _setup_line_profiler(): ""Set up a line profiler"" import line_profiler prof = line_profiler.LineProfiler() funcs = util.read_option_value("--line_profile", default="").split(",") for fname in funcs: for func in _get_func_from_name(fname): prof.add_function(func) info = dict( extension="lprof", inspect=['"python -m line_profiler {f}"'], doc="line profiling functions {}".format(", ".join(funcs)), ) return prof, info def _get_func_from_name(name): ""Get a function object from a name Assume the name is given as module.function, or module.class.method. Args: name (String): The full name of a function, for instance 'where.__main__._get_func_from_name'. Returns: Function: The function object represented by name "" import importlib import types # Import module and find function (possibly trying to find function inside class) name = name if name.startswith("where.") else "where." 
+ name modname, _, fname = name.rpartition(".") try: obj = importlib.import_module(modname) except ImportError: modname, _, clsname = modname.rpartition(".") mod = importlib.import_module(modname) obj = getattr(mod, clsname) # Support simple *-wildcard matching if name.endswith("*"): fnames = [f for f in dir(obj) if f.startswith(fname[:-1]) and not f.startswith("__")] if not fnames: raise TypeError("Found no functions named '{}'".format(name)) else: fnames = [fname] # Generate each function in fnames for fname in fnames: func = getattr(obj, fname) # Unwrap decorated functions while hasattr(func, "__wrapped__"): func = func.__wrapped__ # Return getter, setter and deleter functions of properties if isinstance(func, property): for accessor in ("fget", "fset", "fdel"): if getattr(func, accessor) is not None: yield getattr(func, accessor) else: if isinstance(func, types.FunctionType): yield func elif not name.endswith("*"): raise TypeError("'{}.{}' is not a function".format(modname, fname)) def _setup_cprofile(): ""Set up a profiler"" import cProfile prof = cProfile.Profile() prof.print_stats = _print_cprofile_stats(prof) info = dict(extension="prof", inspect=['"pyprof2calltree -k -i {f}"', '"python -m pstats {f}"'], doc="profiling") return prof, info def _print_cprofile_stats(prof): ""Print statistics about data from cProfile Print statistics as defined by the `--show_profile` command line option. See the `profile`-function for more information. 
"" import pstats profile_info = util.read_option_value("--show_profile", default="time:50") def print_stats(): stat = pstats.Stats(prof).strip_dirs() if ":" in profile_info: sort_columns, _, amount = profile_info.partition(":") else: sort_keys = stat.get_sort_arg_defs().keys() sort_columns, amount = (profile_info, "") if profile_info in sort_keys else ("time", profile_info) amount = int(amount) if amount.isnumeric() else amount pstats.Stats(prof).strip_dirs().sort_stats(sort_columns).print_stats(amount) return print_stats """ PKLmidgard/dev/timer.pyPKLmidgard/dev/util.pyPKmLmidgard/files/__init__.pyPKZLnOmidgard/files/dates.py"""Convenience functions for working with dates Description: ------------ Formats and converters that can be used for convenience and consistency. """ # Formats that can be passed to datetime.strftime, see http://strftime.org/ FMT_date = "%Y-%m-%d" FMT_datetime = "%Y-%m-%d %H:%M:%S" FMT_dt_file = "%Y%m%d-%H%M%S" def date_vars(date): """Construct a dict of date variables From a given date, construct a dict containing all relevant date variables. This dict can be used to for instance replace variables in file names. Examples: >>> from datetime import date >>> date_vars = date_vars(date(2009, 11, 2)) >>> sorted(date_vars.items()) # doctest: +NORMALIZE_WHITESPACE [('MMM', 'NOV'), ('ce', '20'), ('d', '2'), ('dd', '02'), ('dow', '1'), ('doy', '306'), ('gpsweek', '1556'), ('m', '11'), ('mm', '11'), ('mmm', 'nov'), ('yy', '09'), ('yyyy', '2009')] Args: date (Date/Datetime): The date. Returns: Dict: Dictionary with date variables for the given date. 
""" if date is None: return dict() # Create the dict of date variables return dict( yyyy=date.strftime("%Y"), ce=date.strftime("%Y")[:2], yy=date.strftime("%y"), m=str(date.month), mm=date.strftime("%m"), mmm=date.strftime("%b").lower(), MMM=date.strftime("%b").upper(), d=str(date.day), dd=date.strftime("%d"), doy=date.strftime("%j"), dow=date.strftime("%w"), ) PKƱLmidgard/files/dependencies.pyPKLmidgard/files/files.pyPKLmidgard/math/__init__.pyPKLߪM--midgard/math/interpolation.py"""Methods for interpolating in numpy arrays Description: ------------ Different interpolation methods are decorated with `@register_interpolator` and will then become available for use as `kind` in `interpolate` and `moving_window`. Example: -------- >>> x = np.linspace(-1, 1, 11) >>> y = x**3 - x >>> y array([ 0. , 0.288, 0.384, 0.336, 0.192, 0. , -0.192, -0.336, -0.384, -0.288, 0. ]) >>> x_new = np.linspace(-0.8, 0.8, 11) >>> interpolation.interpolate(x, y, x_new, kind='cubic') array([ 0.288, 0.378, 0.369, 0.287, 0.156, -0. , -0.156, -0.287, -0.369, -0.378, -0.288]) Developer info: --------------- To add your own interpolators, you can simply decorate your interpolator functions with `@register_interpolator`. Your interpolator function should have the signature (x: np.ndarray, y: np.ndarray) -> Callable For instance, the following would implement a terrible interpolation function that sets all values to zero: from midgard.math.interpolation import register_interpolator @register_interpolator def zero(x: np.ndarray, y: np.ndarray) -> Callable: def _zero(x_new: np.ndarray) -> np.ndarray: return np.zeros(y.shape) return _zero This function would then be available as an interpolator. 
For instance, one could do >>> interpolation.interpolate(x, y, x_new, kind='zero') array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]) """ # System library imports from typing import Any, Callable, Dict, List # Third party imports import numpy as np import scipy.interpolate import scipy.misc # Midgard imports from midgard.dev import exceptions # Dictionary of Enumerations. Populated by the @register_enum-decorators. _INTERPOLATORS: Dict[str, Callable] = dict() def register_interpolator(func: Callable) -> Callable: """Register an interpolation function This function should be used as a @register_interpolator-decorator Args: func: Function that will be registered as an interpolator. Returns: Same function. """ name = func.__name__ _INTERPOLATORS[name] = func return func def interpolators() -> List[str]: """Return a list of available interpolators Returns: Names of available interpolators. """ return sorted(_INTERPOLATORS) def get_interpolator(name: str) -> Callable: """Return an interpolation function Interpolation functions are registered by the @register_interpolator-decorator. The name-parameter corresponds to the function name of the interpolator. Args: name: Name of interpolator. Returns: Interpolation function with the given name. """ try: return _INTERPOLATORS[name] except KeyError: interpolator_list = ", ".join(interpolators()) raise exceptions.UnknownPluginError( f"Interpolator '{name}' is not defined. Available interpolators are {interpolator_list}." ) from None def interpolate(x: np.ndarray, y: np.ndarray, x_new: np.ndarray, *, kind: str, **ipargs: Any) -> np.ndarray: """Interpolate values from one x-array to another See `interpolators()` for a list of valid interpolators. Args: x: 1-dimensional array with original x-values. y: Array with original y-values. x_new: 1-dimensional array with new x-values. kind: Name of interpolator to use. ipargs: Keyword arguments passed on to the interpolator. Returns: Array of interpolated y-values. 
""" interpolator = get_interpolator(kind)(x, y, **ipargs) return interpolator(x_new) def interpolate_with_derivative( x: np.ndarray, y: np.ndarray, x_new: np.ndarray, *, kind: str, dx: float = 0.5, **ipargs: Any ) -> np.ndarray: """Interpolate values from one x-array to another as well as find derivatives See `interpolators()` for a list of valid interpolators. Args: x: 1-dimensional array with original x-values. y: Array with original y-values. x_new: 1-dimensional array with new x-values. kind: Name of interpolator to use. dx: Values at x ± dx are used to determine derivative. ipargs: Keyword arguments passed on to the interpolator. Returns: Tuple with array of interpolated y-values and array of derivatives. """ interpolator = get_interpolator(kind)(x, y, **ipargs) y_new = interpolator(x_new) y_dot = scipy.misc.derivative(interpolator, x_new, dx=dx) return y_new, y_dot @register_interpolator def lagrange( x: np.ndarray, y: np.ndarray, *, window: int = 10, bounds_error: bool = True, assume_sorted: bool = False ) -> Callable: """Computes the lagrange polynomial passing through a certain set of points See https://en.wikipedia.org/wiki/Lagrange_polynomial Uses `window` of the original points to calculate the Lagrange polynomials. The window of points is chosen by finding the closest original point and essentially picking the `window // 2` indices on either side. Args: x: 1-dimensional array with original x-values. y: Array with original y-values. window: Number of points used in interpolation. bounds_error: If True, a ValueError is raised if extrapolation is attempted. assume_sorted: If True, x must be an array of monotonically increasing values. 
Returns: Lagrange interpolation function """ # Check input if x.ndim != 1: raise ValueError(f"The x array must have exactly one dimension, currently x.ndim={x.ndim}.") if y.ndim < 1: raise ValueError(f"The y array must have at least one dimension, currently y.ndim={y.ndim}.") if len(y) != len(x): raise ValueError("x and y arrays must be equal in length along the first axis.") if window < 3: raise ValueError("The window should be at least 3") if window > len(x): raise ValueError(f"x and y arrays must have at least window={window} entries") # Sort the input according to the x-array if not assume_sorted: sort_idxs = np.argsort(x) x, y = x[sort_idxs], y[sort_idxs] # Check that x values are monotonically increasing if not all(np.diff(x) > 0): raise ValueError("expected x to be a sorted array with unique values") # Rescale x values to avoid numerical instability _xm, _xs = x.mean(), x.std() x_scaled = (x - _xm) / _xs # Indices to use during calculation of polynomial values indices = np.eye(window) == 0 def _lagrange(x_new: np.ndarray) -> np.ndarray: """Interpolate using a Lagrange polynomial""" if bounds_error and x_new.min() < x.min(): raise ValueError(f"Value {x_new.min()} in x_new is below the interpolation range {x.min()}.") if bounds_error and x_new.max() > x.max(): raise ValueError(f"Value {x_new.max()} in x_new is above the interpolation range {x.max()}.") y_new = np.zeros(x_new.shape[:1] + y.shape[1:]) x_new_scaled = (x_new - _xm) / _xs # Figure out which points to use for the interpolation start_idxs = np.abs(x[:, None] - x_new[None, :]).argmin(axis=0) - window // 2 start_idxs[start_idxs < 0] = 0 start_idxs[start_idxs > len(x) - window] = len(x) - window # Interpolate for each unique set of interpolation points for idx in np.unique(start_idxs): y_idx = start_idxs == idx x_wd, y_wd = x_scaled[idx:idx + window], y[idx:idx + window] diff_x = np.subtract(*np.meshgrid(x_wd, x_wd)) + np.eye(window) r = np.array( [ np.prod((x_new_scaled[y_idx, None] - x_wd[idxs]) / 
@register_interpolator
def linear(x: np.ndarray, y: np.ndarray, **ipargs: Any) -> Callable:
    """Piecewise linear interpolation through the given points

    Thin wrapper around scipy.interpolate.interp1d with kind='linear'. Unless the caller specifies otherwise,
    interpolation is done along the first axis of y.

    Args:
        x:       1-dimensional array with original x-values.
        y:       Array with original y-values.
        ipargs:  Keyword arguments passed on to the interp1d-interpolator.

    Returns:
        Linear interpolation function
    """
    if y.ndim < 1:
        raise ValueError(f"The y array must have at least one dimension, currently y.ndim={y.ndim}.")

    # Caller-supplied options take precedence over the axis=0 default
    options = {"axis": 0, **ipargs}
    return scipy.interpolate.interp1d(x, y, kind="linear", **options)


@register_interpolator
def cubic(x: np.ndarray, y: np.ndarray, **ipargs: Any) -> Callable:
    """Cubic spline interpolation through the given points

    Thin wrapper around scipy.interpolate.interp1d with kind='cubic'. Unless the caller specifies otherwise,
    interpolation is done along the first axis of y.

    Args:
        x:       1-dimensional array with original x-values.
        y:       Array with original y-values.
        ipargs:  Keyword arguments passed on to the interp1d-interpolator.

    Returns:
        Cubic spline interpolation function
    """
    if y.ndim < 1:
        raise ValueError(f"The y array must have at least one dimension, currently y.ndim={y.ndim}.")

    # Caller-supplied options take precedence over the axis=0 default
    options = {"axis": 0, **ipargs}
    return scipy.interpolate.interp1d(x, y, kind="cubic", **options)
@register_interpolator
def interpolated_univariate_spline(x: np.ndarray, y: np.ndarray, **ipargs: Any) -> Callable:
    """One-dimensional interpolating spline for the given points

    Uses the scipy.interpolate.InterpolatedUnivariateSpline function behind the scenes.

    The original only deals with one-dimensional y arrays, so one spline is made for each column of higher
    dimensional y arrays. The dimensions are handled independently of each other. All splines are constructed
    once, when the interpolator is created, and reused for every call.

    Args:
        x:       1-dimensional array with original x-values.
        y:       Array with original y-values.
        ipargs:  Keyword arguments passed on to the scipy-interpolator.

    Returns:
        Interpolating spline function
    """
    if y.ndim < 1:
        raise ValueError(f"The y array must have at least one dimension, currently y.ndim={y.ndim}.")
    if y.ndim == 1:
        return scipy.interpolate.InterpolatedUnivariateSpline(x, y, **ipargs)

    # One spline per combination of trailing indices in y, keyed by that index tuple
    all_rows = (slice(None),)
    splines = {
        last_cols: scipy.interpolate.InterpolatedUnivariateSpline(x, y[all_rows + last_cols], **ipargs)
        for last_cols in np.ndindex(y.shape[1:])
    }

    def _interpolated_univariate_spline(x_new: np.ndarray) -> np.ndarray:
        """Interpolate using an interpolating spline"""
        y_new = np.zeros(x_new.shape[:1] + y.shape[1:])
        for last_cols, spline in splines.items():
            y_new[all_rows + last_cols] = spline(x_new)
        return y_new

    return _interpolated_univariate_spline


@register_interpolator
def barycentric_interpolator(x: np.ndarray, y: np.ndarray, **ipargs: Any) -> Callable:
    """The interpolating polynomial through the given points

    Uses the scipy.interpolate.BarycentricInterpolator function behind the scenes.

    Args:
        x:       1-dimensional array with original x-values.
        y:       Array with original y-values.
        ipargs:  Keyword arguments passed on to the scipy-interpolator.

    Returns:
        Barycentric interpolation function
    """
    if y.ndim < 1:
        raise ValueError(f"The y array must have at least one dimension, currently y.ndim={y.ndim}.")

    return scipy.interpolate.BarycentricInterpolator(x, y, **ipargs)
Returns: Barycentric interpolation function """ if y.ndim < 1: raise ValueError(f"The y array must have at least one dimension, currently y.ndim={y.ndim}.") return scipy.interpolate.BarycentricInterpolator(x, y, **ipargs) PKLmidgard/math/rotation.pyPKLmidgard/parsers/__init__.pyPKLmidgard/parsers/_parser.pyPKL midgard/parsers/_parser_chain.pyPKLmidgard/parsers/_parser_line.pyPKL midgard/parsers/_parser_sinex.pyPK!H) -8(midgard-0.1.4.dist-info/entry_points.txtN+I/N.,()LIO,J/JIM,NE[&fqqPKNL|SFFmidgard-0.1.4.dist-info/LICENSEThe MIT License (MIT) Copyright (c) 2018 Norwegian Mapping Authority Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HNOmidgard-0.1.4.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,zd&Y)r$[)T&UrPK!HP midgard-0.1.4.dist-info/METADATAVMo7W chWMS FHIJ;M嬖1ܒ\)_Tsͼ{Q*e]K5.j-'qֵ ᶋЁ+OAwWc5blb>_XpQAq>^NhX]|\mWa!wQc-]t~k--\˦v a-[̫h ['?*Y\b(n"u!{lD!vZt0]kj(c`A݅ n=fAUh ! |]~c@VnC }pdI4NuK rovmt8L}Cdu!br(]x\%wڶ_o[{Y|` H 5iSY"zD,J&8,sލN9iRMb(4-@P8oWBlxrr+Xu)mu! 
Q6"Șv8};9EuM :N Bwu]ڒ&Pn02!l".`Rjҩl#6v*([ rlQ p&A6 BPFQZHfPmJ#&$NNmt1ܖ!,reR7?vW; qͩ iKLTuc:𭵬F* Q|DN ܡK\q saa{ fQ:LE$z@]"i/̀XWV* MrPҰሡ,9긗@%.NvN)x8ӧva1=*}ܕ0}V"^z-< D 1%sL-iEo,ԁw'|2v-#;qwyJmpȂ.꿇t/3 ^9i zx#TQܺ4Įqt[o!Oψر΃x.:з>&231J/mA(.+.>>8Kh^@Ҥpj}ޡq Ȕ2 ?Q|cdzN[fJz I^ΙE~diqA\)IĪ rj'*PH&Q[4xASN5J_E^&ѣ$ՖeIU]Gҁ\6}.^B@HpCρ7D"PK!Hu^' midgard-0.1.4.dist-info/RECORDvJ}? 2^BET R&ѧ$}4'ٴb,a(`TgN} {AL cNBp4E +, E cŪDsg#X`y9sVNcRQ- &D͋Wc7жKrǧ]Ng%5RE#5&O'6 1L{?5w1˭q(uCjumqWmo oEy5ENͫ5/I@_oXi?aF Bk ib.٭O]|ٛ(-E]۲݉f]d]̜@T8 uu쀼][0V1I_' G;N¡t'ANEtj_\sn*@#W&Uފgw ܴ10ŶdJ*тyIC> 1NМK[G7UL!h^t_NB|:AҷBD`= =]4'cEl1d$\_T^- Ss/8ʁ (B;y{>m>NMm|Ufk