PKdMt midgard/__init__.py"""Midgard, the Python Geodesy library Midgard is a collection of useful Python utilities used by the Geodetic institute at the Norwegian Mapping Authority (Kartverket). Although some of these are geodesy-specific, many are also useful in more general settings. Note: Midgard is still in pre-alpha status. Its functionality will change, and it should not be depended on in any production-like setting. Midgard comes organized into different subpackages: {subpackages} Look for help inside each subpackage: >>> from midgard import subpackage # doctest: +SKIP >>> help(subpackage) # doctest: +SKIP Current maintainers: -------------------- {maintainers} """ # Standard library imports from datetime import date as _date from collections import namedtuple as _namedtuple from pathlib import Path as _Path # Version of Midgard. # # This is automatically set using the bumpversion tool __version__ = "0.1.9" # Authors of Midgard. _Author = _namedtuple("_Author", ["name", "email", "start", "end"]) _AUTHORS = [ _Author("Michael Dähnn", "michael.daehnn@kartverket.no", _date.min, _date.max), _Author("Geir Arne Hjelle", "geir.arne.hjelle@kartverket.no", _date.min, _date.max), _Author("Ann-Silje Kirkvik", "ann-silje.kirkvik@kartverket.no", _date.min, _date.max), _Author("Mohammed Ouasou", "mohammed.ouasou@kartverket.no", _date(2018, 9, 1), _date.max), ] __author__ = ", ".join(a.name for a in _AUTHORS if a.start < _date.today() < a.end) __contact__ = ", ".join(a.email for a in _AUTHORS if a.start < _date.today() < a.end) # Copyleft of the library __copyright__ = f"2018 - {_date.today().year} Norwegian Mapping Authority" # Update doc with info about subpackages and maintainers def _update_doc(doc: str) -> str: """Add information to doc-string Args: doc: The doc-string to update. Returns: The updated doc-string. 
""" # Subpackages subpackage_paths = _Path(__file__).parent.iterdir() subpackage_list = [p.name for p in subpackage_paths if p.is_dir() and not p.name.startswith("_")] subpackages = "\n".join(f"+ {p}" for p in subpackage_list) # Maintainers maintainer_list = [f"+ {a.name} <{a.email}>" for a in _AUTHORS if a.start < _date.today() < a.end] maintainers = "\n".join(maintainer_list) # Add to doc-string return doc.format(subpackages=subpackages, maintainers=maintainers) __doc__ = _update_doc(__doc__) PK4Lmidgard/collections/__init__.pyPK=Lip: : midgard/collections/enums.py"""Framework for working with enumerations Description: ------------ Custom enumerations used for structured names. """ # Standard library imports import enum from typing import Callable, Dict, List # Midgard imports from midgard.dev import exceptions # Dictionary of Enumerations. Populated by the @register_enum-decorators. _ENUMS: Dict[str, enum.EnumMeta] = dict() def register_enum(name: str) -> Callable[[enum.EnumMeta], enum.EnumMeta]: """Register a named Enumeration This allows for getting Enumerations with the get_enum-function. Args: name: Name used for Enumeration. Returns: Decorator that registers an Enumeration. """ def register_decorator(enum_cls: enum.EnumMeta) -> enum.EnumMeta: _ENUMS[name] = enum_cls return enum_cls return register_decorator def enums() -> List[str]: """Return a list of available enums Returns: Names of available enums. """ return sorted(_ENUMS) def get_enum(name: str) -> enum.EnumMeta: """Return a named Enumeration Names are defined by the @register_enum-decorator. If the name-parameter is not a valid enum, the function will raise an UnknownEnumError and list the available enumerations. Args: name: Name used for Enumeration. Returns: Enumeration with the given name. """ try: return _ENUMS[name] except KeyError: valid_enums = ", ".join(e for e in _ENUMS) raise exceptions.UnknownEnumError( f"Enumeration '{name}' is not defined. Available enumerations are {valid_enums}." 
) from None def get_value(name: str, value: str) -> enum.Enum: """Return the value of a named Enumeration Names are defined by the @register_enum-decorator. Args: name: Name used for Enumeration. value: Value of Enumeration. Returns: Value of enumeration with the given name. """ try: return get_enum(name)[value] except KeyError: valid_values = ", ".join(v.name for v in get_enum(name)) # type: ignore raise ValueError( f"Value '{value}' is not valid for a {name}-enumeration. Valid values are {valid_values}." ) from None # # ENUMS # @register_enum("log_level") class LogLevel(enum.IntEnum): """Levels used when deciding how much log output to show""" all = enum.auto() debug = enum.auto() time = enum.auto() dev = enum.auto() info = enum.auto() warn = enum.auto() check = enum.auto() error = enum.auto() fatal = enum.auto() none = enum.auto() PKJL'midgard/collections/dataset/__init__.pyPKwL&midgard/collections/dataset/dataset.pyPKzL$midgard/collections/dataset/table.pyPK\Lmidgard/config/__init__.pyPKsEMHNmidgard/config/config.py"""Midgard library module for handling of configuration settings Description: ------------ A Configuration consists of one or several sections. Each ConfigurationSection consists of one or more entries. Each ConfigurationEntry consists of a key and a value. Examples: --------- For basic use, an entry is looked up by simple attribute access. 
For instance if `cfg` is a Configuration with the section `midgard` which has an entry `foo = bar`: >>> cfg = Configuration("config_name") >>> cfg.update("midgard", "foo", "bar") >>> cfg.midgard.foo ConfigurationEntry(key='foo', value='bar') ConfigurationEntry has several access methods that convert the entry to a given data type: >>> cfg.update("midgard", "foo_pi", 3.14, source="command line") >>> cfg.midgard.foo_pi ConfigurationEntry(key='foo_pi', value='3.14') >>> cfg.midgard.foo_pi.float 3.14 >>> cfg.midgard.foo_pi.str '3.14' >>> cfg.midgard.foo_pi.tuple ('3.14',) Sources: -------- Each configuration entry records its source. That is, where that entry was defined. Examples include read from file, set as a command line option, or programmatically from a dictionary. The source can be looked up on an individual entry, or for all entries in a configuration. >>> cfg.midgard.foo_pi.source 'command line' >>> cfg.sources # doctest: +SKIP {'/home/midgard/midgard.conf', 'command line'} Profiles: --------- Fallback Configuration: --------------------- Master Section: --------------- Replacement Variables: ---------------------- Help text and Type hints: ------------------------- """ # Standard library imports import builtins from configparser import ConfigParser, BasicInterpolation, ExtendedInterpolation from contextlib import contextmanager import datetime as stdlib_datetime import os.path import pathlib import re import sys from collections import UserDict # Midgard imports from midgard.dev import console from midgard.collections import enums from midgard.dev import exceptions # Typing from typing import Any, Callable, Dict, Generator, List, Optional, Set, Tuple, Union ProfileName = Optional[str] Sections = Dict[str, "ConfigurationSection"] EntryName = str ConfigVars = Dict[str, Any] # Date and datetime formats FMT_date = "%Y-%m-%d" FMT_datetime = "%Y-%m-%d %H:%M:%S" FMT_dt_file = "%Y%m%d-%H%M%S" class CasedConfigParser(ConfigParser): """ConfigParser with case-sensitive 
keys""" def optionxform(self, optionstr: str) -> str: """Do not turn optionstr (key) into lowercase""" return optionstr class Configuration: """Represents a Configuration""" def __init__(self, name: str) -> None: """Initialize a Configuration The name is used purely for representation and error messages. Args: name: Name of configuration. """ self.name = name self.fallback_config = None self.master_section = None self._profiles: List[ProfileName] = [None] self._profile_sections: Dict[ProfileName, Sections] = dict() self._sections: Sections = dict() self._vars_dict: ConfigVars = dict() self._update_count: int = 0 @classmethod def read_from_file(cls, cfg_name: str, *file_paths: Union[str, pathlib.Path]) -> "Configuration": """Read a configuration from one or more files Args: file_paths: File(s) that will be read. Returns: A Configuration representing the file(s). """ cfg = cls(cfg_name) for file_path in file_paths[::-1]: cfg.update_from_file(file_path) return cfg @classmethod @contextmanager def update_on_file(cls, file_path: Union[str, pathlib.Path], **as_str_args: Any) -> Generator: """Context manager for updating a configuration on file """ # Read config from file cfg = cls.read_from_file("Temporary", file_path) update_count_before = cfg._update_count # Yield config so it can be updated yield cfg # Write config if it has been updated if cfg._update_count > update_count_before: cfg.write_to_file(file_path, **as_str_args) def write_to_file(self, file_path: Union[str, pathlib.Path], **as_str_args: Any) -> None: """Write the configuration to a file In addition to the file path, arguments can be specified and will be passed on to the as_str() function. See `as_str()` for more information. 
Todo: Use files.open_path """ file_path = pathlib.Path(file_path) file_path.parent.mkdir(parents=True, exist_ok=True) with open(file_path, mode="w") as fid: fid.write(self.as_str(**as_str_args) + "\n") @property def section_names(self) -> List[str]: """Names of sections in Configuration""" return list(self._sections.keys()) @property def sections(self) -> List["ConfigurationSection"]: """Sections in Configuration""" return list(self._sections.values()) @property def sources(self) -> Set[str]: """Sources of entries in Configuration""" return {s[k].source for s in self.sections for k in s.keys() if s[k].source} @property def profiles(self) -> List[ProfileName]: """List of profiles currently being used in Configuration""" return self._profiles @profiles.setter def profiles(self, values: Union[None, List[ProfileName]]) -> None: """Set profiles that will be used in Configuration The list of profiles should be a prioritized list where the first profile listed will be used if available. None is used to indicate default values (no profile), and will be automatically appended at the end of the list of profiles. To not use any profiles, set `cfg.profiles = None`. Args: values: List of profiles to use. 
""" if values is None: values = [None] elif values[-1] is not None: values.append(None) self._profiles = values self._set_sections_for_profiles() def _set_sections_for_profiles(self) -> None: """Update sections according to profiles""" self._sections.clear() # Add values in reverse order so that the first profile is prioritized for profile in self.profiles[::-1]: for section_name, profile_section in self._profile_sections.get(profile, dict()).items(): self._sections.setdefault(section_name, ConfigurationSection(section_name)) for key, entry in profile_section.items(): self._sections[section_name][key] = entry @property def fallback_config(self) -> "Configuration": """The fallback configuration""" if self._fallback_config is None: raise exceptions.MissingConfigurationError( f"Configuration '{self.name}' has not defined a fallback configuration" ) return self._fallback_config @fallback_config.setter def fallback_config(self, cfg: Optional["Configuration"]) -> None: """Set the fallback configuration""" self._fallback_config = cfg @property def master_section(self) -> "ConfigurationSection": """The master section""" if self._master_section is None: raise exceptions.MissingSectionError(f"Configuration {self.name!r} has not defined a master section") if self._master_section not in self._sections: raise exceptions.MissingSectionError(f"Master section {self._master_section!r} does not exist in" f" configuration {self.name!r}") return self._sections[self._master_section] @master_section.setter def master_section(self, section: Optional[str]) -> None: """Set the master section""" self._master_section = section def get( self, key: str, value: Optional[str] = None, section: Optional[str] = None, default: Optional[str] = None ) -> "ConfigurationEntry": """Get an entry from a configuration with possibility for override and default value A value for an entry is found using the following priorities: 1. An explicit value given in `value`. None is used as a marker for no value. 2. 
Looked up in the current configuration. 3. Looked up in any fallback confiurations that are defined. 4. The default value is used. If `value` is not None, that value is simply returned as a `ConfigurationEntry`. If `default` is not given (is None), and a value is not found in any other way, a MissingEntryError is raised. Args: key: Name of option (key in the configuration entry). value: Value of entry. Used for overriding the configuration. section: Section in the configuration in which to look up the key. default: Default value that is returned if value is not found any other way. Returns: Entry representing the value. """ if value is not None: return ConfigurationEntry(key, value=value, source="method call", vars_dict=self.vars) try: section_value = self.master_section if section is None else self[section] if isinstance(section_value, ConfigurationEntry): return section_value else: return section_value[key] except (exceptions.MissingSectionError, exceptions.MissingEntryError) as err: try: return self.fallback_config.get(key=key, section=section) except (exceptions.MissingConfigurationError, exceptions.MissingEntryError): if default is None: # Raise original error raise err else: return ConfigurationEntry(key, value=default, source="default value", vars_dict=self.vars) def update( self, section: str, key: str, value: str, *, profile: ProfileName = None, source: str = "unknown", meta: Optional[Dict[str, str]] = None, allow_new: bool = True, _update_sections: bool = True, ) -> None: """Update a configuration section with a configuration entry If `allow_new` is False, the configuration entry must already exist. If it is True the update is allowed to create a new section and a new entry is necessary. The `_update_sections` flag can be used to not update the sections of the configuration, only the profiles. This should typically not be done, but is used by some of the other update methods which update the sections themselves. Args: section: Section to update. 
key: Key of entry. value: Value of entry. profile: Profile to update. source: Source of the update. meta: Metadata like help text and type hints for the entry. allow_new: Whether to allow the creation of a new section and entry. """ if not allow_new: profile_str = "" if profile is None else f"(profile: '{profile}')" if section not in self._sections: raise exceptions.MissingSectionError( f"Configuration '{self.name}' does not contain section '{section}' {profile_str}" ) if key not in self._sections[section]: raise exceptions.MissingEntryError( f"Section '{section}' of configuration '{self.name}' does not contain entry '{key}' {profile_str}" ) # Add entry to profile source = source if profile is None else f"{source} ({profile})" profile_sections = self._profile_sections.setdefault(profile, dict()) profile_sections.setdefault(section, ConfigurationSection(section)) # Record that configuration has been updated if key not in profile_sections[section] or profile_sections[section][key]._value != value: self._update_count += 1 profile_sections[section][key] = ConfigurationEntry( key, value=value, source=source, meta=meta, vars_dict=self.vars ) # Update sections if _update_sections: self._set_sections_for_profiles() def update_from_file( self, file_path: Union[str, pathlib.Path], allow_new: bool = True, interpolate: bool = False, case_sensitive: bool = False, ) -> None: """Update the configuration from a configuration file The Python ConfigParser is used to read the file. The file format that is supported is described at https://docs.python.org/library/configparser.html Different profiles in a configuration file is denoted by double underscores in the sections names. For instance does the following configuration have a `foo` profile in the `spam` section (in addition to the default profile): [spam] ... [spam__foo] ... If `interpolate` is set to True, ExtendedInterpolation of variables in the configuration file is used. 
See https://docs.python.org/library/configparser.html#configparser.ExtendedInterpolation for details. Args: file_path: Path to the configuration file. allow_new: Whether to allow the creation of new sections and entries. interpolate: Whether to interpolate variables in the configuration file. case_sensitive: Whether to read keys as case sensitive (or convert to lower case). """ # Use ConfigParser to read from file cfg_parser_cls = CasedConfigParser if case_sensitive else ConfigParser cfg_parser = cfg_parser_cls( allow_no_value=True, delimiters=("=",), interpolation=ExtendedInterpolation() if interpolate else BasicInterpolation(), ) cfg_parser.read(file_path) # Add configuration entries for cfg_section in cfg_parser.sections(): section, has_profile, profile = cfg_section.partition("__") for key, value in cfg_parser[cfg_section].items(): # Handle meta-information if ":" in key: continue meta = {k.partition(":")[-1]: v for k, v in cfg_parser[cfg_section].items() if k.startswith(f"{key}:")} # Create a configuration entry self.update( section, key, value if value is None else value.replace("\n", " "), profile=profile if has_profile else None, source=str(file_path), meta=meta, allow_new=allow_new, _update_sections=False, ) self._set_sections_for_profiles() def update_from_config_section( self, other_section: "ConfigurationSection", section: Optional[str] = None, allow_new: bool = True ) -> None: section = other_section.name if section is None else section for key, entry in other_section.data.items(): self.update( section, key, entry.str, source=entry.source, meta=entry.meta, allow_new=allow_new, _update_sections=False, ) self._set_sections_for_profiles() def update_from_options( self, options: Optional[List[str]] = None, profile: ProfileName = None, source: str = "command line", allow_new: bool = False, ) -> None: if options is None: options = sys.argv[1:] for option in options: if not (option.startswith("--") and "=" in option): continue # Parse config name, section, 
key and value of the form name:section:key=value opt_key, _, opt_value = option[2:].partition("=") opt_section, _, opt_key = opt_key.rpartition(":") opt_name, _, opt_section = opt_section.rpartition(":") # Update current configuration if opt_name and opt_name != self.name: continue if not opt_section: opt_section = self.master_section.name self.update( opt_section, opt_key, opt_value, profile=profile, source=f"{source} ({option})", allow_new=allow_new, _update_sections=False, ) self._set_sections_for_profiles() def update_from_dict( self, cfg_dict: Dict[str, Any], section: Optional[str] = None, source: str = "dictionary", allow_new: bool = True, ) -> None: section = self.master_section.name if section is None else section for key, value in cfg_dict.items(): self.update(section, key, value, source=source, allow_new=allow_new, _update_sections=False) self._set_sections_for_profiles() def clear(self) -> None: """Clear the configuration""" self._sections.clear() self.clear_vars() @property def vars(self) -> ConfigVars: """The configuration variables""" return self._vars_dict def clear_vars(self) -> None: """Clear the configuration variables""" self._vars_dict.clear() def update_vars(self, new_vars: ConfigVars) -> None: """Update the configuration variables""" self._vars_dict.update(new_vars) def as_str( self, width: Optional[int] = None, key_width: int = 30, only_used: bool = False, metadata: bool = True ) -> str: """The configuration represented as a string This is simililar to what is shown by `str(configuration)` (and implemented by `__str__`), but has more flexibility. Args: width: Width of text for wrapping. Default is width of console. key_width: Width of the key column. Default is 30 characters. only_used: Only include configuration entries that has been used so far. metadata: Include metadata like type and help text. Returns: String representation of the configuration. 
""" sections = self._sections.values() section_strs = [ s.as_str(width=width, key_width=key_width, only_used=only_used, metadata=metadata) for s in sections ] return "\n\n\n".join(s for s in section_strs if s) def as_dict( self, getters: Optional[Dict[str, Dict[str, str]]] = None, default_getter: str = "str" ) -> Dict[str, Dict[str, Any]]: """The configuration represented as a dictionary Args: getters: How to get the value of each entry in each section. default_getter: How to get the value of entries not specified in getters. Returns: Representation of the configuration as a nested dictionary. """ getters = dict() if getters is None else getters return {k: v.as_dict(getters=getters.get(k), default_getter=default_getter) for k, v in self._sections.items()} def __getitem__(self, key: str) -> Union["ConfigurationSection", "ConfigurationEntry"]: """Get a section or entry from the master section from the configuration""" if key in self.section_names: return self._sections[key] try: return self.master_section[key] except exceptions.MissingSectionError: try: return self.fallback_config[key] except exceptions.MidgardException: raise exceptions.MissingSectionError(f"Configuration {self.name!r} has no section {key!r}") from None def __getattr__(self, key: str) -> Union["ConfigurationSection", "ConfigurationEntry"]: """Get a section or entry from the master section from the configuration""" return self[key] def __delitem__(self, key: str) -> None: """Delete a section from the configuration""" del self._sections[key] def __delattr__(self, key: str) -> None: """Delete a section from the configuration""" del self._sections[key] def __dir__(self) -> List[str]: """List attributes and sections in the configuration""" try: return list(super().__dir__()) + self.section_names + self.master_section.as_list() except exceptions.MissingSectionError: return list(super().__dir__()) + self.section_names def __str__(self) -> str: """The configuration represented as a string This string can be 
stored in a file and read back with `update_from_file`. """ return "\n\n".join(str(s) for s in self._sections.values()) def __repr__(self) -> str: """A simple string representation of the configuration""" return f"{self.__class__.__name__}(name='{self.name}')" class ConfigurationSection(UserDict): data: Dict[str, "ConfigurationEntry"] def __init__(self, name: str) -> None: super().__init__() self.name: str = name def as_str( self, width: Optional[int] = None, key_width: int = 30, only_used: bool = False, metadata: bool = True ) -> str: """The configuration section represented as a string This is simililar to what is shown by `str(section)` (and implemented by `__str__`), but has more flexibility. Args: width: Width of text for wrapping. Default is width of console. key_width: Width of the key column. Default is 30 characters. only_used: Only include configuration entries that has been used so far. metadata: Include metadata like type and help text. Returns: String representation of the configuration section. """ lines = list() for entry in self.data.values(): if only_used and not entry.is_used: continue lines.append(entry.entry_as_str(width=width, key_width=key_width, metadata=metadata)) if lines: return f"[{self.name}]\n" + "\n".join(lines) else: return "" def as_list(self) -> List[str]: """List of keys of entries in configuration section Returns: List of keys of entries in configuration section. """ return list(self.data.keys()) def as_dict(self, getters: Dict[str, str] = None, default_getter: str = "str") -> Dict[str, Any]: """The configuration section represented as a dictionary Args: getters: How to get the value of each entry in the section. default_getter: How to get the value of entries not specified in getters. Returns: Representation of the configuration section as a dictionary. 
""" getters = dict() if getters is None else getters getters = {k: getters.get(k, default_getter) for k in self.keys()} return {k: getattr(e, getters[k]) for k, e in self.items()} def __getitem__(self, key: str) -> "ConfigurationEntry": """Get an entry from the configuration section""" try: return self.data[key] except KeyError: raise exceptions.MissingEntryError(f"Configuration section '{self.name}' has no entry '{key}'") from None def __getattr__(self, key: str) -> "ConfigurationEntry": """Get an entry from the configuration section""" try: return self.data[key] except KeyError: raise exceptions.MissingEntryError(f"Configuration section '{self.name}' has no entry '{key}'") from None def __dir__(self) -> List[str]: """List attributes and entries in the configuration section""" return list(super().__dir__()) + self.as_list() def __str__(self) -> str: """The configuration section represented as a string""" return f"[{self.name}]\n" + "\n".join(str(v) for v in self.data.values()) def __repr__(self) -> str: """A simple string representation of the configuration section""" return f"{self.__class__.__name__}(name='{self.name}')" class ConfigurationEntry: _BOOLEAN_STATES = { "0": False, "1": True, "false": False, "true": True, "no": False, "yes": True, "off": False, "on": True, } def __init__( self, key: str, value: Any, *, source: builtins.str = "", meta: Optional[Dict[str, str]] = None, vars_dict: Optional[ConfigVars] = None, _used_as: Optional[Set[builtins.str]] = None, ) -> None: self.source = source self.meta = dict() if meta is None else meta self._key = key self._value = str(value) self._vars_dict = dict() if vars_dict is None else vars_dict self._used_as = set() if _used_as is None else _used_as @property def type(self) -> Optional[builtins.str]: """Type hint for the ConfigurationEntry""" return self.meta.get("type", None) @property def help(self) -> builtins.str: """Help text for the ConfigurationEntry""" return self.meta.get("help", "") @property def str(self) 
-> builtins.str: """Value of ConfigurationEntry as string""" self._using("str") return self._value def as_str(self) -> builtins.str: """Value of ConfigurationEntry as string""" return self.str @property def int(self) -> builtins.int: """Value of ConfigurationEntry converted to an integer""" self._using("int") try: return int(self._value) except ValueError: raise ValueError( f"Value '{self._value}' of '{self._key}' in {self.source} cannot be converted to an integer" ) from None def as_int(self) -> builtins.int: """Value of ConfigurationEntry converted to an integer""" return self.int @property def float(self) -> builtins.float: """Value of ConfigurationEntry converted to a float""" self._using("float") try: return float(self._value) except ValueError: raise ValueError( f"Value '{self._value}' of '{self._key}' in {self.source} cannot be converted to a float" ) from None def as_float(self) -> builtins.float: """Value of ConfigurationEntry converted to a float""" return self.float @property def bool(self) -> builtins.bool: """Value of ConfigurationEntry converted to a boolean The conversion is done by looking up the string value of the entry in _BOOLEAN_STATES. """ self._using("bool") try: return self._BOOLEAN_STATES[self._value.lower()] except KeyError: raise ValueError( f"Value '{self._value}' of '{self._key}' in {self.source} cannot be converted to a boolean" ) from None def as_bool(self) -> builtins.bool: """Value of ConfigurationEntry converted to a boolean The conversion is done by looking up the string value of the entry in _BOOLEAN_STATES. """ return self.bool @property def date(self) -> stdlib_datetime.date: """Value of ConfigurationEntry converted to a date object assuming format `FMT_date`""" return self.as_date(format=FMT_date) def as_date(self, format: builtins.str = FMT_date) -> stdlib_datetime.date: """Value of ConfigurationEntry converted to a date object Args: format (String): Format string, see strftime for information about the string. 
Returns: Date: Value of entry. """ self._using("date") try: return stdlib_datetime.datetime.strptime(self._value, format).date() except ValueError: raise ValueError( f"Value '{self._value}' of '{self._key}' in {self.source} does not match the date format '{format}'" ) from None @property def datetime(self) -> stdlib_datetime.datetime: """Value of ConfigurationEntry converted to a datetime object assuming format `FMT_datetime`""" return self.as_datetime(format=FMT_datetime) def as_datetime(self, format: builtins.str = FMT_datetime) -> stdlib_datetime.datetime: """Value of ConfigurationEntry converted to a datetime object Args: format (String): Format string, see strftime for information about the string. Returns: Datetime: Value of entry. """ self._using("datetime") try: return stdlib_datetime.datetime.strptime(self._value, format) except ValueError: raise ValueError( f"Value '{self._value}' of '{self._key}' in {self.source} does not match the date format '{format}'" ) from None @property def path(self) -> pathlib.Path: """Value of ConfigurationEntry interpreted as a path string""" self._using("path") path = self._value if "~" in path: path = os.path.expanduser(path) return pathlib.Path(path) def as_path(self) -> pathlib.Path: """Value of ConfigurationEntry interpreted as a path string""" return self.path @property def list(self) -> List[builtins.str]: """Value of ConfigurationEntry converted to a list by splitting at commas and whitespace""" self._using("list") return self._value.replace(",", " ").split() def as_list( self, split_re: builtins.str = r"[\s,]", convert: Callable = builtins.str, maxsplit: builtins.int = 0 ) -> List[Any]: """Value of ConfigurationEntry converted to a list The entry is converted to a list by using the `split_re`-regular expression. By default the entry will be split at commas and whitespace. Args: split_re: Regular expression used to split entry into list. convert: Function used to convert each element of the list. 
maxsplit: If nonzero, at most maxsplit splits occur. Returns: Value of entry as list. """ self._using("list") return [convert(s) for s in re.split(split_re, self._value, maxsplit=maxsplit) if s] @property def list_of_lists(self) -> List[List[builtins.str]]: self._using("list_of_lists") raise NotImplementedError def as_list_of_lists( self, split_res: Tuple[builtins.str, ...] = (r"[\s,]", r"[^_\w]"), num_elements: Optional[builtins.int] = None, convert: Callable = builtins.str, ) -> List[List[Any]]: self._using("list_of_lists") raise NotImplementedError @property def tuple(self) -> Tuple[builtins.str, ...]: """Value of ConfigurationEntry converted to tuple by splitting at commas and whitespace""" self._using("tuple") return tuple(self._value.replace(",", " ").split()) def as_tuple( self, split_re: builtins.str = r"[\s,]", convert: Callable = builtins.str, maxsplit: builtins.int = 0 ) -> Tuple[Any, ...]: """Value of ConfigurationEntry converted to a tuple The entry is converted to a tuple by using the `split_re`-regular expression. By default the entry will be split at commas and whitespace. Args: split_re: Regular expression used to split entry into tuple. convert: Function used to convert each element of the tuple. maxsplit: If nonzero, at most maxsplit splits occur. Returns: Value of entry as tuple. """ self._using("tuple") return tuple([convert(s) for s in re.split(split_re, self._value, maxsplit=maxsplit) if s]) @property def dict(self) -> Dict[builtins.str, builtins.str]: """Value of ConfigurationEntry converted to a dict""" self._using("dict") return dict(i.partition(":")[::2] for i in self.list) def as_dict( self, item_split_re: builtins.str = r"[\s,]", key_value_split_re: builtins.str = r"[:]", convert: Callable = builtins.str, maxsplit: builtins.int = 0, ) -> Dict[builtins.str, Any]: """Value of ConfigurationEntry converted to a dictionary By default the dictionary is created by splitting items at commas and whitespace, and key from value at colons. 
Args: item_split_re: Regular expression used to split entry into items. key_value_split_re: Regular expression used to split items into keys and values. convert: Function used to convert each value in the dictionary. maxsplit: If nonzero, at most maxsplit splits occur when splitting entry into items. Returns: Value of entry as dict. """ self._using("dict") items = [s for s in re.split(item_split_re, self._value, maxsplit=maxsplit) if s] key_values = [re.split(key_value_split_re, i, maxsplit=1) for i in items] return {k: convert(v) for k, v in key_values} def as_enum(self, enum: builtins.str) -> enums.enum.Enum: """Value of ConfigurationEntry converted to an enumeration Args: enum (String): Name of Enum. Returns: Enum: Value of entry as Enum. """ self._using("enum") return enums.get_value(enum, self._value) @property def replaced(self) -> "ConfigurationEntry": """Value of ConfigurationEntry with {$}-variables replaced""" return self.replace() def replace(self, default: Optional[builtins.str] = None, **replace_vars: builtins.str) -> "ConfigurationEntry": replacement_vars = dict(self._vars_dict, **replace_vars) replacement_value = self._value replacements = list() matches = re.findall(r"\{\$\w+\}", replacement_value) for match in matches: var = match.strip("${}") replacement = str(replacement_vars.get(var, match if default is None else default)) replacements.append(f"{var}={replacement}") replacement_value = replacement_value.replace(match, replacement) return self.__class__( key=self._key, value=replacement_value, source=self.source + " ({','.join(replacements)})", _used_as=self._used_as, ) @property def is_used(self) -> builtins.bool: return bool(self._used_as) def entry_as_str( self, width: Optional[builtins.int] = None, key_width: builtins.int = 30, metadata: builtins.bool = True ) -> builtins.str: """The configuration entry represented as a string This is simililar to what is shown by `str(entry)` (and implemented by `__str__`), but has more flexibility. 
Args: width: Width of text for wrapping. Default is width of console. key_width: Width of the key column. Default is 30 characters. metadata: Include metadata like type and help text. Returns: String representation of the configuration entry. """ lines = list() width = console.columns() if width is None else width fill_args = dict(width=width, hanging=key_width + 3, break_long_words=False, break_on_hyphens=False) # The entry itself lines.append(console.fill(f"{self._key:<{key_width}} = {self._value}", **fill_args)) # Metadata, including help text and type hints if metadata and self.meta: for meta_key, meta_value in self.meta.items(): if meta_value is None: lines.append(console.fill(f"{self._key}:{meta_key}", **fill_args)) else: lines.append(console.fill(f"{f'{self._key}:{meta_key}':<{key_width}} = {meta_value}", **fill_args)) lines.append("") return "\n".join(lines) def _using(self, as_type: builtins.str) -> None: """Register that entry is used as a type Args: as_type: Name of type entry is used as. 
""" self._used_as.add(as_type) def __add__(self, other: "ConfigurationEntry") -> "ConfigurationEntry": if isinstance(other, self.__class__): if self.source == other.source: source = f"{self.source} (+)" else: source = f"{self.source} + {other.source}" return self.__class__( key=f"{self._key} + {other._key}", value=self.str + other.str, source=source, vars_dict=self._vars_dict ) else: return NotImplemented def __bool__(self) -> builtins.bool: """A ConfigurationEntry is truthy if the value is not empty""" return bool(self._value) def __str__(self) -> builtins.str: """The configuration entry represented as a string""" return self.entry_as_str() def __repr__(self) -> builtins.str: """A simple string representation of the configuration entry""" return f"{self.__class__.__name__}(key='{self._key}', value='{self._value}')" PKҲLmidgard/config/constant.pyPKԲLmidgard/config/unit.pyPKfLmidgard/coords/__init__.pyPKLmidgard/coords/position.pyPK=Lmidgard/coords/time.pyPK,Lmidgard/dev/__init__.pyPKαLmidgard/dev/cache.pyPKbMG)00midgard/dev/console.py"""Simpler dealing with the console Description: ------------ Utilities for using the console. Mainly wrappers around other libraries to make them easier and more intuitive to use. Size of console: The two functions `lines()` and `columns()` report the current size of the console. Textwrapping: The function `fill()` can be used to rewrap a text-string so that it fits inside the console. Color: The sub-module `color` can be used to set the foreground and background colors. Note that the color functionality depends on the external package `colorama`. If `colorama` is not installed, color gracefully falls back to not showing any color. Examples: --------- >>> from midgard.dev import console >>> console.columns() # doctest: +SKIP 86 >>> print(console.fill(a_very_long_string)) # doctest: +SKIP Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras tempus eleifend feugiat. Maecenas vitae posuere metus. Sed sit amet fermentum velit. 
def lines() -> int:
    """The height of the console

    Returns:
        The height of the console in characters.
    """
    return shutil.get_terminal_size().lines


def columns() -> int:
    """The width of the console

    Returns:
        The width of the console in characters.
    """
    return shutil.get_terminal_size().columns


def fill(text: str, *, width: Optional[int] = None, hanging: Optional[int] = None, **tw_args: Any) -> str:
    """Wrapper around textwrap.fill

    The `tw_args` are passed on to textwrap.fill. See textwrap.TextWrapper for available keyword arguments.

    The default value for `width` is the current console width, while the extra argument `hanging`, if
    defined, will try to set (although not override) `initial_indent` and `subsequent_indent` to create a
    hanging indent (no indent on the first line) of `hanging` spaces.

    Args:
        text:     Text that will be wrapped.
        width:    The maximum width (in characters) of wrapped lines.
        hanging:  Number of characters used for hanging indent.
        tw_args:  Arguments passed on to `textwrap.fill`.

    Returns:
        Wrapped string.
    """
    if width is None:
        width = columns()
    if hanging is not None:
        # setdefault: caller-supplied indents win over the hanging indent
        tw_args.setdefault("initial_indent", "")
        tw_args.setdefault("subsequent_indent", " " * hanging)

    return textwrap.fill(text, width=width, **tw_args)


def dedent(text: str, num_spaces: Optional[int] = None) -> str:
    """Wrapper around textwrap.dedent

    Dedents at most num_spaces. If num_spaces is not specified, dedents as much as possible.

    Args:
        text:        Text that will be dedented.
        num_spaces:  Number of spaces that will be used for dedentation.

    Returns:
        Dedented string.
    """
    fully_dedented = textwrap.dedent(text)
    if num_spaces is None:
        return fully_dedented

    # If more than num_spaces were removed, indent back the surplus
    spaces_removed = num_leading_spaces(text) - num_leading_spaces(fully_dedented)
    surplus = spaces_removed - num_spaces
    return indent(fully_dedented, num_spaces=surplus) if surplus > 0 else fully_dedented


def indent(text: str, num_spaces: int, **tw_args: Any) -> str:
    """Wrapper around textwrap.indent

    The `tw_args` are passed on to textwrap.indent.

    Args:
        text:        Text that will be indented.
        num_spaces:  Number of spaces that will be used for indentation.

    Returns:
        Indented string.
    """
    tw_args["prefix"] = " " * num_spaces
    return textwrap.indent(text, **tw_args)


def num_leading_spaces(text: str, space_char: str = " ") -> int:
    """Count number of leading spaces in a string

    Args:
        text:        String to count.
        space_char:  Which characters count as spaces.

    Returns:
        Number of leading spaces.
    """
    stripped = text.lstrip(space_char)
    return len(text) - len(stripped)
""" class MidgardException(Exception): pass class MidgardExit(SystemExit, MidgardException): pass class InitializationError(MidgardException): pass class FieldExistsError(MidgardException): pass class FieldDoesNotExistError(MidgardException): pass class MissingConfigurationError(MidgardException): pass class MissingDataError(MidgardException): pass class MissingEntryError(MidgardException): pass class MissingSectionError(MidgardException): pass class ParserError(MidgardException): pass class TimerNotRunning(MidgardException): pass class TimerRunning(MidgardException): pass class UnknownEnumError(MidgardException): pass class UnknownPackageError(MidgardException): pass class UnknownPluginError(MidgardException): pass class UnitError(MidgardException): pass PKuLmidgard/dev/interactive.pyPK4[L/8<<midgard/dev/library.py"""Python wrapper around C-libraries Description: ------------ Loads a C-library. If a library is missing, a mock library is returned. If this mock is used for anything, a warning will be printed. This is done to avoid dependencies to all the C/C++-libraries for Python programs only using some of them. """ # Standard library imports import ctypes as c from ctypes import util as c_util import sys def load_name(library_name, func_specs=None, name_patterns=None): """Load the given shared C-library See `load_path` for an explanation of the `func_specs` and `name_patterns`-arguments. Args: library_name (String): The name of the library. func_specs (Dict): Specification of types in lib (see load_path). name_patterns (List): Name mangling patterns (see load_path). Returns: ctypes.CDLL: Representation of the shared library. 
""" library_path = c_util.find_library(library_name) return load_path(library_path, func_specs=func_specs, name_patterns=name_patterns) def load_path(library_path, func_specs=None, name_patterns=None): """Load the given shared C-library The optional func_specs-dictionary can be used to specify argument and return types of functions in the library (see the ctypes documentation for information about argtypes and restype). The dictionary should be on the form:: func_spec = {'func_1': dict(func_name='name_of_func_1_in_lib', argtypes=[ ... argtypes of func_1 ... ], restype=... restype of func_1 ...), 'func_2': ... } If the library is not found, a mock library is returned instead. The mock library will print a warning if it is used. For some libraries, name mangling is used and this might be different depending on operating system and how the library is compiled. For instance, in a Fortran library the function `Test` might be represented as `__Test` on a Windows system and `test_` (with lower-case `t`) on a Linux system. This can be handled by providing a list of possible patterns. The above example can be handled by:: name_patterns = ('__{func_name}', '{func_name_lower}_') In this case, each function in func_specs is looked up by testing each pattern in turn until a match is found. Args: library_path (String): The path to the library. func_specs (Dict): Specification of types in library (see above). name_patterns (List): Name mangling patterns (see above). Returns: ctypes.CDLL: Representation of the shared library. 
""" library_handle = c.cdll.LoadLibrary(library_path) # Return a Mock object if the library is not found if library_handle._name is None: mock = SimpleMock(name=library_path) return mock # Handle name mangling if name_patterns is None: def mangled_name(name): return name else: def mangled_name(name): for pattern in name_patterns: full_name = pattern.format(func_name=name, func_name_lower=name.lower()) if hasattr(library_handle, full_name): return full_name return name # Set argument and return types on functions if func_specs: for name, spec in func_specs.items(): if "func_name" in spec: func_name = mangled_name(spec["func_name"]) else: func_name = mangled_name(name) func = getattr(library_handle, func_name) delattr(library_handle, func_name) if "argtypes" in spec: func.argtypes = spec["argtypes"] if "restype" in spec: func.restype = spec["restype"] setattr(library_handle, name, func) return library_handle class SimpleMock: """Class that can stand in for any other object The SimpleMock is used to stand in for any library that can not be imported. The mock object simply returns itself whenever it is called, or any attributes are looked up on the object. This is done, to avoid ImportErrors when a library is imported, but never used (typically because a plugin is loaded but never called). Instead the ImportError is raised when the SimpleMock is used in any way. The ImportError will only be raised once for any SimpleMock-object (which is only important if the ImportError is caught and the program carries on). """ def __init__(self, name, raise_error=True): """Initialize SimpleMock object Args: name (String): Name of SimpleMock-object. raise_error (Bool): Should ImportError be raised when using object. """ self._name = name self._children = dict() self._raise_error = raise_error def _raise_import_error(self): """Raise an import error when the SimpleMock object is used The ImportError is only raised the first time the object is used. 
""" # Only raise the error once if self._raise_error: self._raise_error = False else: return # Find calling function caller = sys._getframe() while caller.f_code.co_filename == __file__: caller = caller.f_back func_name = caller.f_code.co_name line_no = caller.f_lineno file_name = caller.f_code.co_filename # Raise ImportError with a helpful message raise ImportError( "The library '{}' is not installed, but is used by '{}' on line {} of {}" "".format(self._name, func_name, line_no, file_name) ) def __call__(self, *args, **kwargs): """Return the same SimpleMock-object when it is called An ImportError is raised the first time the object is used. Returns: SimpleMock: Itself. """ self._raise_import_error() return self def __getattr__(self, key): """Create a child-SimpleMock-object and return it. The same child object is returned if the same attribute is gotten several times. An ImportError is raised the first time the SimpleMock-object is used. Additional errors are not raised for the children. Args: key (String): Name of attribute. Returns: SimpleMock: A child-SimpleMock-object. """ self._raise_import_error() if key not in self._children: self._children[key] = type(self)("{}.{}".format(self._name, key), raise_error=self._raise_error) setattr(self, key, self._children[key]) return self._children[key] def __repr__(self): """String representation of the SimpleMock-object Returns: String: Simple representation of the SimpleMock-object. """ return "{}('{}')".format(self.__class__.__name__, self._name) def __str__(self): """Convert to the empty string Returns: String: An empty string. """ return "" PKױLmidgard/dev/log.pyPK*bM S4BBmidgard/dev/optional.py"""Midgard library module for handling optional dependencies Description: ------------ Import dependencies that are only necessary for specific parts of Midgard. Using this module will delay raising an ImportError until the dependency is actually used. 
class SimpleMock:
    """Class that can stand in for any other object

    The SimpleMock is used to stand in for any library that can not be imported. The mock object simply returns
    itself whenever it is called, or any attributes are looked up on the object. This is done, to avoid ImportErrors
    when a library is imported, but never used (for instance if a plugin is loaded but never called).

    Instead the ImportError is raised when the SimpleMock is used in any way. The ImportError will only be raised
    once for any SimpleMock-object (which is only important if the ImportError is caught and the program carries on).

    The exception is if any attributes (`attrs`) are explicitly defined on the mock. No exception is raised if those
    attributes are looked up.
    """

    def __init__(self, name: str, raise_error: bool = True, attrs: Optional[Dict[str, Any]] = None) -> None:
        """Initialize SimpleMock object

        Args:
            name:         Name of SimpleMock-object. Used for string representation and when raising Errors.
            raise_error:  Whether ImportError should be raised when object is used.
            attrs:        Attributes that should be added to the SimpleMock.
        """
        self._name = name
        self._children: Dict[str, "SimpleMock"] = dict()  # Cache of child mocks created by attribute lookup
        self._raise_error = raise_error
        self._attrs = attrs  # Kept so that child mocks get the same explicit attributes
        if attrs is not None:
            # Explicit attributes live in the instance dict, so __getattr__ is never consulted for them.
            # NOTE: the loop variable shadows the `name` parameter; self._name was already stored above.
            for name, attr in attrs.items():
                setattr(self, name, attr)

    def _raise_import_error(self) -> None:
        """Raise an import error when the SimpleMock object is used

        The ImportError is only raised the first time the object is used.
        """
        # Only raise the error once
        if self._raise_error:
            self._raise_error = False
        else:
            return

        # Find calling function: walk up the stack past frames belonging to this file
        caller = sys._getframe()
        while caller.f_code.co_filename == __file__:
            caller = caller.f_back
        func_name = caller.f_code.co_name
        line_num = caller.f_lineno
        file_name = caller.f_code.co_filename

        # Raise ImportError with a helpful message
        raise ImportError(
            f"The module '{self._name}' is not installed, "
            f"but is used by '{func_name}' on line {line_num} of {file_name}"
        )

    def __call__(self, *args: Any, **kwargs: Any) -> "SimpleMock":
        """Return the same SimpleMock-object when it is called

        An ImportError is raised the first time the object is used.

        Returns:
            Itself.
        """
        self._raise_import_error()
        return self

    def __getattr__(self, key: str) -> Any:
        """Create a child-SimpleMock-object and return it.

        The same child object is returned if the same attribute is gotten several times. An ImportError is raised the
        first time the SimpleMock-object is used. Additional errors are not raised for the children.

        Args:
            key:  Name of attribute.

        Returns:
            A child-SimpleMock-object.
        """
        self._raise_import_error()
        if key not in self._children:
            child_name = f"{self._name}.{key}"
            self._children[key] = type(self)(child_name, raise_error=self._raise_error, attrs=self._attrs)
            # setattr puts the child in the instance dict, so __getattr__ is bypassed next time
            setattr(self, key, self._children[key])

        return self._children[key]

    def __repr__(self) -> str:
        """String representation of the SimpleMock-object

        Returns:
            Simple string representation of the SimpleMock-object.
        """
        return f"{self.__class__.__name__}('{self._name}')"


class EmptyStringMock(SimpleMock):
    """A mock object whose properties are all empty strings"""

    def __getattr__(self, key: str) -> str:
        """All attributes are empty strings.

        Only consulted for attributes not set explicitly in __init__ (those are found in the instance dict first).

        Args:
            key:  Name of attribute.

        Returns:
            An empty string.
        """
        return ""


def optional_import(
    module_name: str, raise_error: bool = True, mock_cls: type = SimpleMock, attrs: Optional[Dict[str, Any]] = None
) -> Union[Any, SimpleMock]:  # TODO: Should be typed types.ModuleType? but causes typing errors
    """Try to import an optional module

    If the module does not exist, a SimpleMock-object is returned instead. If this SimpleMock-object is later used,
    an ImportError will be raised then (if `raise_error` is True, which is default).

    Args:
        module_name:  Name of module to import.
        raise_error:  Whether an ImportError should be raised if the module does not exist, but is used.
        mock_cls:     Class used for the mock object if the module does not exist (default SimpleMock).
        attrs:        Attributes that should be added to the SimpleMock used if the module does not exist.

    Returns:
        Imported module object, or a SimpleMock-object if the module can not be imported.
    """
    try:
        return importlib.import_module(module_name)
    except ImportError:
        return mock_cls(module_name, raise_error=raise_error, attrs=attrs)
For instance, if the configuration file contains a line saying ham_models = model_three, model_one then we can list only the `ham_models` as follows: > from midgard.dev import plugins > plugins.names('midgard.models', config_key='ham_models') ['model_one', 'model_three'] Note that the plug-ins by default are sorted alphabetically. To run the plug-ins, use either `call_all` or `call_one`. The former calls all plug-ins and returns a dictionary containing the result from each plug-in. As with `names` the optional parameter `config_key` may be given: > from midgard.dev import plugins > plugins.call_all('midgard.models', config_key='ham_models', arg_to_plugin='hello') {'model_three': , 'model_one': } Arguments to the plug-ins should be passed as named arguments to `call_all`. Similarly, one plug-in may be called explicitly using `call_one`: > from midgard.dev import plugins > plugins.call_one('midgard.models', plugin_name='model_one', arg_to_plugin='hello') There may be more than one function in each plug-in that is decorated by `register`. In this case, the default behaviour is that only the first function will be called. To call the other registered functions one should use the `list_parts` function to get a list of these functions and call them explicitly using the `part` optional parameter to `call_one`: > from midgard.dev import plugins > plugins.list_parts('midgard.techniques', plugin_name='vlbi') ['read', 'edit', 'calculate', 'estimate', 'write_result']) > for part in plugins.list_parts('midgard.techniques', plugin_name='vlbi'): ... plugins.call_one('midgard.techniques', plugin_name='vlbi', part=part, ...) 
""" # Standard library imports import functools import importlib import pathlib import re import sys from typing import Any, Callable, Dict, Iterable, List, NamedTuple, Optional, Tuple # Midgard imports from midgard.dev import console from midgard.dev.exceptions import UnknownPackageError, UnknownPluginError # The _PLUGINS-dict is populated by the `register` decorator in each module. _PLUGINS: Dict[str, Dict[str, Any]] = dict(__aliases__=dict(), __packages__=dict()) # Simple structure containing information about a plug-in class Plugin(NamedTuple): """Information about a plug-in Args: name: Name of the plug-in. function: The plug-in. file_path: Path to the source code of the plug-in, may be used to add the source as a dependency. sort_value: Value used when sorting plug-ins in order to control the order they are called. """ name: str function: Callable file_path: pathlib.Path sort_value: int # # REGISTER PLUG-INS # def register(func: Callable, name: Optional[str] = None, sort_value: int = 0) -> Callable: """Decorator used to register a plug-in Plug-ins are registered based on the name of the module (file) they are defined in, as well as the package (directory) which contains them. Typically all plug-ins of a given type are collected in a package, e.g. models, techniques, parsers, etc. The path to the source code file is also stored. This is used to be able to add the source code as a dependency file when the plug-in is called. If `name` is given, the plug-in is registered based on this name instead of the name of the module. The name of the module is still registered as a part that can be used to distinguish between similar plug-ins in different files (see for instance how `session` is used in `midgard.pipelines`). Args: func: The function that is being registered. name: Alternative name of plug-in. Used by `register_named`. sort_value: The value used when sorting plug-ins. Used by `register_ordered`. Returns: The function that is being registered. 
""" # Get information from the function being registered package_name, _, plugin_name = func.__module__.rpartition(".") package_name = _PLUGINS["__aliases__"].get(package_name, package_name) file_path = pathlib.Path(sys.modules[func.__module__].__file__) # Store Plugin-object in _PLUGINS dictionary _PLUGINS["__packages__"].setdefault(package_name, [package_name]) plugin_info = _PLUGINS.setdefault(package_name, dict()).setdefault(plugin_name, dict()) if name is None: name = func.__name__ # Name of function is used as default name plugin_info.setdefault("__parts__", list()).append(name) # Only unnamed parts are added to list plugin = Plugin(f"{plugin_name}.{name}", func, file_path, sort_value) plugin_info[name] = plugin # log.debug("Registering {} as a {}-plugin from {}", plugin.name, package_name, plugin.file_path) # Add first registered unnamed part as default if "__parts__" in plugin_info: plugin_info["__default__"] = plugin_info[plugin_info["__parts__"][0]] return func def register_named(name: str) -> Callable: """Decorator used to register a named plug-in This allows for overriding the name used to register the plug-in. See `register` for more details. Args: name: Name used for plug-in instead of module name. Returns: Decorator that registers a named function. """ return functools.partial(register, name=name) def register_ordered(sort_value: int) -> Callable: """Decorator used to register a plug-in with a specific sort order The sort value should be a number. Lower numbers are sorted first, higher numbers last. Plug-ins without an explicit sort_order gets the sort value of 0. Args: sort_value: The value used when sorting plug-ins. Returns: Decorator that registers an ordered function. 
""" return functools.partial(register, sort_value=sort_value) # # CALL PLUG-INS # def call( package_name: str, plugin_name: str, part: Optional[str] = None, prefix: Optional[str] = None, plugin_logger: Optional[Callable[[str], None]] = None, **plugin_args: Any, ) -> Any: """Call one plug-in If the plug-in is not part of the package an UnknownPluginError is raised. If there are several functions registered in a plug-in and `part` is not specified, then the first function registered in the plug-in will be called. Args: package_name: Name of package containing plug-ins. plugin_name: Name of the plug-in, i.e. the module containing the plug-in. part: Name of function to call within the plug-in (optional). prefix: Prefix of the plug-in name, used if the plug-in name is not found (optional). plugin_logger: Function used for logging (optional). plugin_args: Named arguments passed on to the plug-in. Returns: Return value of the plug-in. """ # Get Plugin-object plugin_name = load(package_name, plugin_name, prefix=prefix) part = "__default__" if part is None else part try: plugin = _PLUGINS[package_name][plugin_name][part] except KeyError: # TODO: List available plugins raise UnknownPluginError(f"Plugin {part!r} not found for {plugin_name!r} in {package_name!r}") from None # Log message about calling plug-in if plugin_logger is not None: plugin_logger(f"Start plug-in {plugin.name!r} in {package_name!r}") # Call plug-in return plugin.function(**plugin_args) def call_all( package_name: str, plugins: Optional[List[str]] = None, part: Optional[str] = None, prefix: Optional[str] = None, plugin_logger: Optional[Callable[[str], None]] = None, **plugin_args: Any, ) -> Dict[str, Any]: """Call all plug-ins in a package If `plugins` is given, it should be a list of names of plug-ins. If a plug-in listed in the `plugins`-list or in the config file does not exist, an UnknownPluginError is raised. If `plugins` is not given, all available plugins will be called. 
#
# GET DOCUMENTATION FOR PLUG-INS
#
def doc(
    package_name: str,
    plugin_name: str,
    part: Optional[str] = None,
    prefix: Optional[str] = None,
    long_doc: bool = True,
    include_details: bool = False,
) -> str:
    """Document one plug-in

    If the plug-in is not part of the package an UnknownPluginError is raised.

    If there are several functions registered in a plug-in and `part` is not specified, then the first function
    registered in the plug-in will be documented.

    Args:
        package_name:     Name of package containing plug-ins.
        plugin_name:      Name of the plug-in, i.e. the module containing the plug-in.
        part:             Name of function to call within the plug-in (optional).
        prefix:           Prefix of the plug-in name, used if the plug-in name is unknown (optional).
        long_doc:         Whether to return the long doc-string or the short one-line string (optional).
        include_details:  Whether to include development details like parameters and return values (optional).

    Returns:
        Documentation of the plug-in.
    """
    # Get Plugin-object and pick out doc-string
    plugin_name = load(package_name, plugin_name, prefix=prefix)
    part = "__default__" if part is None else part
    try:
        plugin = _PLUGINS[package_name][plugin_name][part]
    except KeyError:
        raise UnknownPluginError(f"Plugin {part!r} not found for {plugin_name!r} in {package_name!r}") from None

    doc = plugin.function.__doc__ if plugin.function.__doc__ else ""
    if long_doc:
        # Strip short description and indentation (first "\n\n"-separated block is the summary)
        doc = console.dedent("\n\n".join(doc.split("\n\n")[1:]))
        lines = doc.rstrip().splitlines()

        # Stop before Args:, Returns: etc if details should not be included
        idx_args = len(lines)
        if not include_details:
            re_args = re.compile("(Args:|Returns:|Details:|Examples?:|Attributes:)$")
            try:
                idx_args = [re_args.match(l) is not None for l in lines].index(True)
            except ValueError:
                pass  # No Args:/Returns:/... section found, keep everything
        return "\n".join(lines[:idx_args]).strip()
    else:
        # Return short description
        return doc.split("\n\n")[0].replace("\n", " ").strip()


def doc_all(
    package_name: str,
    plugins: Optional[Iterable[str]] = None,
    prefix: Optional[str] = None,
    long_doc: bool = True,
    include_details: bool = False,
) -> Dict[str, str]:
    """Document all plug-ins in a package

    If `plugins` is given, it should be a list of names of plug-ins. If a plug-in listed in the `plugins`-list does
    not exist, an UnknownPluginError is raised.

    If `plugins` is not given, all available plugins will be documented. Do note, however, that this will import all
    python files in the package.

    Args:
        package_name:     Name of package containing plug-ins.
        plugins:          List of plug-ins that should be used (optional).
        prefix:           Prefix of the plug-in names, used if any of the plug-ins are unknown (optional).
        long_doc:         Whether to return the long doc-string or the short one-line string (optional).
        include_details:  Whether to include development details like parameters and return values (optional).

    Returns:
        Dictionary with documentation for each plug-in.
    """
    plugin_names = names(package_name, plugins=plugins, prefix=prefix)
    return {p: doc(package_name, p, long_doc=long_doc, include_details=include_details) for p in plugin_names}


#
# LIST AVAILABLE PLUG-INS
#
def names(package_name: str, plugins: Optional[Iterable[str]] = None, prefix: Optional[str] = None) -> List[str]:
    """List plug-ins in a package

    If `plugins` is given, it should be a list of names of plug-ins. If a plug-in listed in the `plugins`-list does
    not exist, an UnknownPluginError is raised.

    If `plugins` is not given, all available plugins will be listed. Do note, however, that this will import all
    python files in the package.

    Args:
        package_name:  Name of package containing plug-ins.
        plugins:       List of plug-ins that should be used (optional).
        prefix:        Prefix of the plug-in names, used if any of the plug-in names are unknown (optional).

    Returns:
        List of strings with names of plug-ins.
    """
    # Figure out names of plug-ins
    if plugins is None:
        _import_all(package_name)
        plugins = _PLUGINS.get(package_name, dict()).keys()

    # Load each plug-in and return them in sort order
    def _sort_value(plugin: str) -> Tuple[int, str]:
        """Pick out sort_value of plugin"""
        return (getattr(_PLUGINS[package_name][plugin].get("__default__"), "sort_value", 0), plugin)

    return sorted((load(package_name, p, prefix=prefix) for p in plugins), key=_sort_value)


def parts(package_name: str, plugin_name: str, prefix: Optional[str] = None) -> List[str]:
    """List all parts of one plug-in

    Args:
        package_name:  Name of package containing plug-ins.
        plugin_name:   Name of the plug-in.
        prefix:        Prefix of the plug-in name, used if the plug-in name is unknown (optional).

    Returns:
        List: Strings with names of parts.
    """
    plugin_name = load(package_name, plugin_name, prefix=prefix)
    return _PLUGINS[package_name][plugin_name].get("__parts__", list())
Args: package_name: Name of package containing plug-ins. plugin_name: Name of the plug-in (module). Returns: True if plug-in exists, False otherwise. """ if plugin_name not in _PLUGINS.get(package_name, dict()): try: _import_one(package_name, plugin_name) except UnknownPluginError: return False return plugin_name in _PLUGINS.get(package_name, dict()) # # LOAD PLUG-INS # def add_alias(package_name: str, alias: str) -> None: """Add alias to plug-in package This allows one package of plug-ins to be spread over several directories Args: package_name: Name of package containing plug-ins. directory: Additional plug-in directory. """ _PLUGINS["__packages__"].setdefault(package_name, [package_name]).append(alias) _PLUGINS["__aliases__"][alias] = package_name def load(package_name: str, plugin_name: str, prefix: Optional[str] = None) -> str: """Load one plug-in from a package First tries to load the plugin with the given name. If that fails, it tries to load {prefix}_{plugin_name} instead. Args: package_name: Name of package containing plug-ins. plugin_name: Name of the plug-in (module). prefix: Prefix of the plug-in name, used if the plug-in name is unknown (optional). Returns: Actual name of plug-in (with or without prefix). """ if plugin_name not in _PLUGINS.get(package_name, dict()): try: _import_one(package_name, plugin_name) except UnknownPluginError as import_err: if prefix: try: return load(package_name, f"{prefix}_{plugin_name}") except UnknownPluginError: pass raise import_err from None # Raise original exception return plugin_name def _aliases(package_name: str) -> List[str]: """Aliases for the given package Args: package_name: Name of package. Returns: List of names of packages (including the given one) containing plugins. """ return _PLUGINS["__packages__"].get(package_name, [package_name]) def _import_one(package_name: str, plugin_name: str) -> None: """Import a plugin from a package This is essentially just a regular python import. 
As the module is imported, the _PLUGINS-dict will be populated by @register decorated functions in the file. Args: package_name: Name of package containing plug-ins. plugin_name: Name of the plug-in (module). """ for package in _aliases(package_name): module_name = f"{package}.{plugin_name}" try: importlib.import_module(module_name) except ImportError: pass # Plugin might be aliased from another package if plugin_name not in _PLUGINS.get(package_name, dict()): raise UnknownPluginError(f"Plug-in {plugin_name!r} not found in package {package_name!r}") from None def _import_all(package_name: str) -> None: """Import the relevant .py-files in the given package directory As each file is imported, the _PLUGINS-dict will be populated by @register decorated functions in the files. Args: package_name: Name of package containing plug-ins. """ file_paths: List[pathlib.Path] = list() for package_alias in _aliases(package_name): # Figure out the directory of the package by importing it try: package = importlib.import_module(package_alias) except ImportError: raise UnknownPackageError(f"Plug-in package {package_name!r} not found") from None # List all .py files in the given directory directory = pathlib.Path(package.__file__).parent file_paths += [f for f in directory.glob("*.py") if not f.stem.startswith("_")] # Import all Python files for file_path in file_paths: plugin_name = file_path.stem try: _import_one(package_name, plugin_name) except UnknownPluginError: pass # OK if .py file does not contain a plugin PKvdMmidgard/dev/profiler.pyPK:bMBmidgard/dev/timer.py"""Class for timing the running time of functions and code blocks Description: ------------ The `dev.timer` can be used to log the running time of functions and general code blocks. Typically, you will import the `Timer`-class from within the module: from midgard.dev.timer import Timer The Timer can then be used in three different ways: 1. 
As a decorator to time one function: @Timer('The time to execute some_function was') def some_function(some_argument, some_other_argument=some_value): pass 2. As a context manager together with `with` to time a code block: with Timer('Finish doing stuff in', logger=logger.debug) as t: do_something() do_something_else() 3. With explicit `start`- and `end`-statements: t = Timer() t.start() do_something() do_something_else() t.end() As can be seen in the examples above, `Timer()` may be called with several optional parameters, including the text to report when the timer ends and which logger is used to report the timing. See `Timer.__init__` for more details. """ # Standard library imports from contextlib import ContextDecorator import time from typing import Any, Callable, Optional # Midgard imports from midgard.dev import exceptions class Timer(ContextDecorator): """Class for timing running time of functions and code blocks. """ def __init__( self, text: str = "Elapsed time:", fmt: str = ".4f", logger: Optional[Callable[[str], None]] = print ) -> None: """Set up a new timer The text to be shown when logging the timer can be customized. Typically, the value of the timer will be added at the end of the string (e.g. 'Elapsed time: 0.1234 seconds'). However, this can be customized by adding a '{}' to the text. For example `text='Used {} to run the code'` will produce something like 'Used 0.1234 seconds to run the code'. Args: text: Text used when logging the timer (see above). fmt: Format used when formatting the time (default 4 decimals). logger: Function used to do the logging. """ super().__init__() self._start: Optional[float] = None self._end: Optional[float] = None self.text = text if (text is None or "{}" in text) else (text + " {}").strip() self.fmt = fmt self.logger = (lambda _: None) if logger is None else logger @staticmethod def timer() -> float: """Get current value of timer Using the built-in `time.perf_counter` to do the timing. 
Returns: Current value of timer. """ return time.perf_counter() def start(self) -> None: """Start the timer """ self._start = self.timer() self._end = None def pause(self) -> float: """Pause the timer without logging. Use .start() to restart the timer """ self._end = self.timer() time_elapsed = self.elapsed() self._start = None return time_elapsed def end(self) -> float: """End the timer and log the time elapsed Returns: The time elapsed in seconds. """ time_elapsed = self.pause() self._log(time_elapsed) return time_elapsed def elapsed(self) -> float: """Log the time elapsed Can be used explicitly to log the time since a timer started without ending the timer. Returns: The time elapsed in seconds. """ if self._start is None: raise exceptions.TimerNotRunning( f"The timer is not running. See `help({self.__module__})` for information on how to start one." ) timer_end = self.timer() if self._end is None else self._end time_elapsed = 0 if self._start is None else timer_end - self._start return time_elapsed def _log(self, time_elapsed: float) -> None: """Do the actual logging of elapsed time Args: The time elapsed in seconds. """ time_text = f"{time_elapsed:{self.fmt}} seconds" if self.text: self.logger(self.text.format(time_text)) def __enter__(self) -> "Timer": """Start the timer as a context manager """ self.start() return self def __exit__(self, *_: Any) -> None: """End the timer and log the time elapsed as a context manager """ self.end() class AccumulatedTimer(Timer): def __init__( self, text: str = "Elapsed time:", fmt: str = ".4f", logger: Optional[Callable[[str], None]] = print ) -> None: super().__init__(text, fmt, logger) self.accumulated = 0.0 def reset(self) -> None: """Reset the timer back to 0 """ if self._end is None: raise exceptions.TimerRunning(f"The timer is running and cannot be reset") self.accumulated = 0.0 def end(self) -> float: """End the timer and log the time elapsed Returns: The time elapsed in seconds. 
""" super().end() return self.accumulated def elapsed(self) -> float: """Log the time elapsed Can be used explicitly to log the time since a timer started without ending the timer. Returns: The time elapsed in seconds. """ try: time_elapsed = super().elapsed() except exceptions.TimerNotRunning: time_elapsed = 0 self.accumulated += time_elapsed return time_elapsed def _log(self, time_elapsed: float) -> None: """Do the actual logging of elapsed time Args: The time elapsed in seconds. """ time_text = f"{time_elapsed:{self.fmt}}/{self.accumulated:{self.fmt}} seconds" if self.text: self.logger(self.text.format(time_text)) PKLmidgard/dev/util.pyPKmLmidgard/files/__init__.pyPK&bMmidgard/files/dates.py"""Convenience functions for working with dates Description: ------------ Formats and converters that can be used for convenience and consistency. """ # Standard library imports import datetime from typing import Dict, Optional, Union # Formats that can be passed to datetime.strftime, see http://strftime.org/ FMT_date = "%Y-%m-%d" FMT_datetime = "%Y-%m-%d %H:%M:%S" FMT_dt_file = "%Y%m%d-%H%M%S" def date_vars(date: Optional[Union[datetime.date, datetime.datetime]]) -> Dict[str, str]: """Construct a dict of date variables From a given date, construct a dict containing all relevant date variables. This dict can be used to for instance replace variables in file names. Examples: >>> from datetime import date >>> date_vars(date(2009, 11, 2)) # doctest: +NORMALIZE_WHITESPACE {'yyyy': '2009', 'ce': '20', 'yy': '09', 'm': '11', 'mm': '11', 'mmm': 'nov', 'MMM': 'NOV', 'd': '2', 'dd': '02', 'doy': '306', 'dow': '1', 'h': '0', 'hh': '00'} >>> date_vars(None) {} Args: date: The given date. Returns: Dictionary with date variables for the given date. 
""" if date is None: return dict() # Create the dict of date variables return dict( yyyy=date.strftime("%Y"), ce=date.strftime("%Y")[:2], yy=date.strftime("%y"), m=str(date.month), mm=date.strftime("%m"), mmm=date.strftime("%b").lower(), MMM=date.strftime("%b").upper(), d=str(date.day), dd=date.strftime("%d"), doy=date.strftime("%j"), dow=date.strftime("%w"), h=date.strftime("%-H"), hh=date.strftime("%H"), ) PKƱLmidgard/files/dependencies.pyPKLmidgard/files/files.pyPKӀXM=;ȷ""midgard/ionosphere/klobuchar.py#!/usr/bin/env python3 # -*- encoding=utf8 -*- # Author : Mohammed Ouassou # Date : October 11, 2018 # Organization : NMA, Geodetic Institute prolog = """ **PROGRAM** klobuchar_model.py **PURPOSE** compute the ionospheric time-delay correction for the single-frequency by broadcast model (klobuchar model). GPS and Beidu satellite navigation systems use this model. The implementation is based on original paper of Klobuchar, J.A. Ionospheric Time-Delay Algorithm for Single-Frequency GPS Users https://scinapse.io/papers/2058160370 **USAGE** """ epilog = """ **EXAMPLE** klobuchar_model.py(time, ion_coeffs, rec_pos, azel) args: time (I) : GPST ion_coeffs (I) : iono model parameters {a0,a1,a2,a3,b0,b1,b2,b3} as vector rec_pos (I) : receiver position {lat,lon,h} [rad, rad, m] as vector azel (I) : azimuth/elevation angle {az,el} [rad] as vector freq (I) : string, e.g. 
L1, L2, L5 (TODO: Not implemented) logger (I) : Function that logs l_result (O) : list containing the following parameters L1_delay : computed path delay on L1 [m] L1_variance: correspong variance [m^2] **COPYRIGHT** | Copyright 2018, by the Geodetic Institute, NMA | All rights reserved **AUTHORS** | Mohammed Ouassou | Geodetic Institute, NMA | Kartverksveien 21, N-3511 | Hønefoss, Norway Keywords: Klobuchar model, Nequick broadcast model """ # # ================================ # # [1] Import system modules # # " =============================== # import argparse import numpy as np import scipy.constants as sp_c # ====================================== # # [2] import user defined modules (TBD) # # ====================================== # # ======================================== # # FUNCTION 1: get_my_parser() # # ======================================== # def get_my_parser(): parser = argparse.ArgumentParser( description=prolog, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter ) # parser.add_argument('-t', action='store', dest='input', required=True, help=' required GPST ') # parser.add_argument('-r', action='store', dest='input', required=True, help=' required receiver position {lat,lon,h} (rad,m) as vector'); # parser.add_argument('-p', action='store', dest='input', required=True, help=' iono model parameters {a0,a1,a2,a3,b0,b1,b2,b3} as vector'); # parser.add_argument('-a', action='store', dest='input', required=True, help=' azimuth/elevation angle {az,el} [rad]'); return parser # ============================================ # # FUNCTION 2: Klobuchar Broadcast algorithm # # ============================================ # def klobuchar(t, ion_coeffs, rec_pos, az, el, logger=print): """Compute the ionospheric time-delay correction for the single-frequency by broadcast model (klobuchar model) GPS and Beidu satellite navigation systems use this model. The implementation is based on original paper of Klobuchar, J.A. 
Ionospheric Time-Delay Algorithm for Single-Frequency GPS Users https://scinapse.io/papers/2058160370 Args: time: GPST ion_coeffs: iono model parameters {a0,a1,a2,a3,b0,b1,b2,b3} as vector rec_pos: receiver position {lat,lon,h} [rad, rad, m] as vector az: azimuth angle [rad] el: elevation angle [rad] logger: Function that logs Returns: L1_delay: computed path delay on L1 [m] L1_variance: corresponding variance [m^2] """ # variables declaration PI = np.pi CLIGHT = sp_c.c alpha, beta = ion_coeffs[:4], ion_coeffs[4:] # check the input args if len(ion_coeffs) != 8: raise ValueError(f"klobuchar_model()::number of iono coefficients={len(ion_coeffs)}, required 8") logger(" klobuchar_model():: input ionosphere parameters (alpha's and beta's) are:") logger(f" \t Alpha coeffs= {alpha[0]:.2E},{alpha[1]:.2E},{alpha[2]:.2E},{alpha[3]:.2E}") logger(f" \t Beta coeffs = {beta[0]:.2E},{beta[1]:.2E},{beta[2]:.2E},{beta[3]:.2E}") # input data checks if rec_pos[2] < -1e3 or el <= 0.0: raise ValueError( f"klobuchar_model():: Invalid input parameters --> site height={rec_pos[2]:.2f}, elevation={el:.2f} [radians]" ) if np.linalg.norm(ion_coeffs, ord=8) <= 0.0: raise ValueError( "klobuchar_model():: Invalid input parameters --> missing ionosphere model parameters (a0,a1,a2,a3,b0,b1,b2,b3) .." ) # ==================================================== # # 1. calculate the Earth centered angle (semi-circle) # # ==================================================== # psi = 0.0137 / (el / PI + 0.11) - 0.022 # ==================================================== # # 2. sub-ionospheric latitude/longitude (semi-circle) # # ==================================================== # phi = rec_pos[0] / PI + psi * np.cos(az) phi = 0.416 if phi > 0.416 else -0.416 phi_ = phi # ==================================================== # # 3. 
compute the sub-ionospheric longitude # # ==================================================== # lam = rec_pos[1] / PI + psi * np.sin(az) / np.cos(phi * PI) # ==================================================== # # 4. compute geomagnetic latitude (semi-circle) # # ==================================================== # phi += 0.064 * np.cos((lam - 1.617) * PI) # ==================================================== # # 5. find the local time (s) # # ==================================================== # # tt = 43200.0*lam + time2gpst(t, week); tt = t tt -= np.floor(tt / 86400.0) * 86400.0 # 0<=tt<86400 # ==================================================== # # 6. compute the slant factor # # ==================================================== # f = 1.0 + 16.0 * (0.53 - el / PI) ** 3 # elevation angle shall be in cycle # ==================================================== # # 7. compute the ionospheric time delay # # ==================================================== # amp = ion_coeffs[0] + phi * (ion_coeffs[1] + phi * (ion_coeffs[2] + phi * ion_coeffs[3])) # compute the amplitude per = ion_coeffs[4] + phi * (ion_coeffs[5] + phi * (ion_coeffs[6] + phi * ion_coeffs[7])) # compute the periode amp = 0.0 if amp < 0.0 else amp per = 72000.0 if per < 72000.0 else per x = 2.0 * PI * (tt - 50400.0) / per L1_delay = ( CLIGHT * f * (5e-9 + amp * (1.0 + x * x * (-0.5 + x * x / 24.0))) if (np.fabs(x) < 1.57) else CLIGHT * f * 5e-9 ) # ========================================================= # # define ERR_BRDCI 0.5: broadcast iono model error factor # # ========================================================= # L1_variance = (L1_delay * 0.5) ** 2 # debuging info logger(" =================================== OUTPUT ============================================") logger(f"\t[1] Earth-centered angle = {psi:10.5f} [semicircles]") logger(f"\t[2] sub-ionospheric latitude = {phi_:10.5f} [semicircles]") logger(f"\t[3] sub-ionospheric longitude = {lam:10.5f} [semicircles]") 
logger(f"\t[4] geomagnetic latitude = {phi:10.5f} [semicircles]") logger(f"\t[5] local time = {tt:10.5f} [seconds]") logger(f"\t[6] slant factor = {f:10.5f} ") logger( f"\t[7] ionosphere delay on L1 and the corresponding variance are: {L1_delay:.5f} (m) and {L1_variance:.5f} (m^2)" ) logger(" ================================================================================================") return L1_delay, L1_variance def main(): # read command line arguments parser = get_my_parser() results = parser.parse_args() # ================================================ # # these values are copied from Klobuchar trest # # ================================================ # tt = 50700.0 ion_coeffs = np.array([3.82e-8, 1.49e-8, -1.79e-7, 0, 1.43e5, 0.0, -3.28e5, 1.13e5]) rec_pos = np.array([40.0 / 180.0, -100.0 / 180.0, 170]) az = 240.0 / 180 el = 20.0 / 180 delay, variance = klobuchar(tt, ion_coeffs, rec_pos, az, el) # user info print(f" Ionospheric path delay on L1= {delay:.5f} [m] and the corresponding variance={variance:.5f} [m^2]") if __name__ == "__main__": main() PKLmidgard/math/__init__.pyPKbMe.e.midgard/math/interpolation.py"""Methods for interpolating in numpy arrays Description: ------------ Different interpolation methods are decorated with `@register_interpolator` and will then become available for use as `kind` in `interpolate` and `moving_window`. Example: -------- >>> import numpy as np >>> np.set_printoptions(precision=3, suppress=True) >>> x = np.linspace(-1, 1, 11) >>> y = x**3 - x >>> y array([ 0. , 0.288, 0.384, 0.336, 0.192, 0. , -0.192, -0.336, -0.384, -0.288, 0. ]) >>> x_new = np.linspace(-0.8, 0.8, 11) >>> interpolate(x, y, x_new, kind='cubic') array([ 0.288, 0.378, 0.369, 0.287, 0.156, -0. , -0.156, -0.287, -0.369, -0.378, -0.288]) Developer info: --------------- To add your own interpolators, you can simply decorate your interpolator functions with `@register_interpolator`. 
Your interpolator function should have the signature (x: np.ndarray, y: np.ndarray) -> Callable For instance, the following would implement a terrible interpolation function that sets all values to zero: from midgard.math.interpolation import register_interpolator @register_interpolator def zero(x: np.ndarray, y: np.ndarray) -> Callable: def _zero(x_new: np.ndarray) -> np.ndarray: return np.zeros(y.shape) return _zero This function would then be available as an interpolator. For instance, one could do >>> interpolate(x, y, x_new, kind='zero') # doctest: +SKIP array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]) """ # System library imports from typing import Any, Callable, Dict, List # Third party imports import numpy as np import scipy.interpolate import scipy.misc # Midgard imports from midgard.dev import exceptions # Dictionary of Enumerations. Populated by the @register_enum-decorators. _INTERPOLATORS: Dict[str, Callable] = dict() def register_interpolator(func: Callable) -> Callable: """Register an interpolation function This function should be used as a @register_interpolator-decorator Args: func: Function that will be registered as an interpolator. Returns: Same function. """ name = func.__name__ _INTERPOLATORS[name] = func return func def interpolators() -> List[str]: """Return a list of available interpolators Returns: Names of available interpolators. """ return sorted(_INTERPOLATORS) def get_interpolator(name: str) -> Callable: """Return an interpolation function Interpolation functions are registered by the @register_interpolator-decorator. The name-parameter corresponds to the function name of the interpolator. Args: name: Name of interpolator. Returns: Interpolation function with the given name. """ try: return _INTERPOLATORS[name] except KeyError: interpolator_list = ", ".join(interpolators()) raise exceptions.UnknownPluginError( f"Interpolator '{name}' is not defined. Available interpolators are {interpolator_list}." 
) from None def interpolate(x: np.ndarray, y: np.ndarray, x_new: np.ndarray, *, kind: str, **ipargs: Any) -> np.ndarray: """Interpolate values from one x-array to another See `interpolators()` for a list of valid interpolators. Args: x: 1-dimensional array with original x-values. y: Array with original y-values. x_new: 1-dimensional array with new x-values. kind: Name of interpolator to use. ipargs: Keyword arguments passed on to the interpolator. Returns: Array of interpolated y-values. """ interpolator = get_interpolator(kind)(x, y, **ipargs) return interpolator(x_new) def interpolate_with_derivative( x: np.ndarray, y: np.ndarray, x_new: np.ndarray, *, kind: str, dx: float = 0.5, **ipargs: Any ) -> np.ndarray: """Interpolate values from one x-array to another as well as find derivatives See `interpolators()` for a list of valid interpolators. Args: x: 1-dimensional array with original x-values. y: Array with original y-values. x_new: 1-dimensional array with new x-values. kind: Name of interpolator to use. dx: Values at x ± dx are used to determine derivative. ipargs: Keyword arguments passed on to the interpolator. Returns: Tuple with array of interpolated y-values and array of derivatives. """ interpolator = get_interpolator(kind)(x, y, **ipargs) y_new = interpolator(x_new) y_dot = scipy.misc.derivative(interpolator, x_new, dx=dx) return y_new, y_dot @register_interpolator def lagrange( x: np.ndarray, y: np.ndarray, *, window: int = 10, bounds_error: bool = True, assume_sorted: bool = False ) -> Callable: """Computes the lagrange polynomial passing through a certain set of points See https://en.wikipedia.org/wiki/Lagrange_polynomial Uses `window` of the original points to calculate the Lagrange polynomials. The window of points is chosen by finding the closest original point and essentially picking the `window // 2` indices on either side. Args: x: 1-dimensional array with original x-values. y: Array with original y-values. 
window: Number of points used in interpolation. bounds_error: If True, a ValueError is raised if extrapolation is attempted. assume_sorted: If True, x must be an array of monotonically increasing values. Returns: Lagrange interpolation function. """ # Check input if x.ndim != 1: raise ValueError(f"The x array must have exactly one dimension, currently x.ndim={x.ndim}.") if y.ndim < 1: raise ValueError(f"The y array must have at least one dimension, currently y.ndim={y.ndim}.") if len(y) != len(x): raise ValueError("x and y arrays must be equal in length along the first axis.") if window < 3: raise ValueError("The window should be at least 3") if window > len(x): raise ValueError(f"x and y arrays must have at least window={window} entries") # Sort the input according to the x-array if not assume_sorted: sort_idxs = np.argsort(x) x, y = x[sort_idxs], y[sort_idxs] # Check that x values are monotonically increasing if not all(np.diff(x) > 0): raise ValueError("expected x to be a sorted array with unique values") # Rescale x values to avoid numerical instability _xm, _xs = x.mean(), x.std() x_scaled = (x - _xm) / _xs # Indices to use during calculation of polynomial values indices = np.eye(window) == 0 def _lagrange(x_new: np.ndarray) -> np.ndarray: """Interpolate using a Lagrange polynomial""" if bounds_error and x_new.min() < x.min(): raise ValueError(f"Value {x_new.min()} in x_new is below the interpolation range {x.min()}.") if bounds_error and x_new.max() > x.max(): raise ValueError(f"Value {x_new.max()} in x_new is above the interpolation range {x.max()}.") y_new = np.zeros(x_new.shape[:1] + y.shape[1:]) x_new_scaled = (x_new - _xm) / _xs # Figure out which points to use for the interpolation start_idxs = np.abs(x[:, None] - x_new[None, :]).argmin(axis=0) - window // 2 start_idxs[start_idxs < 0] = 0 start_idxs[start_idxs > len(x) - window] = len(x) - window # Interpolate for each unique set of interpolation points for idx in np.unique(start_idxs): y_idx = 
start_idxs == idx x_wd, y_wd = x_scaled[idx : idx + window], y[idx : idx + window] diff_x = np.subtract(*np.meshgrid(x_wd, x_wd)) + np.eye(window) r = np.array( [ np.prod((x_new_scaled[y_idx, None] - x_wd[idxs]) / diff_x[idxs, i], axis=1) for i, idxs in enumerate(indices) ] ) y_new[y_idx] = r.T @ y_wd return y_new return _lagrange @register_interpolator def linear(x: np.ndarray, y: np.ndarray, **ipargs: Any) -> Callable: """Linear interpolation through the given points Uses the scipy.interpolate.interp1d function with kind='linear' behind the scenes. Args: x: 1-dimensional array with original x-values. y: Array with original y-values. ipargs: Keyword arguments passed on to the interp1d-interpolator. Returns: Linear interpolation function """ if y.ndim < 1: raise ValueError(f"The y array must have at least one dimension, currently y.ndim={y.ndim}.") # Interpolate along axis=0 by default ipargs.setdefault("axis", 0) return scipy.interpolate.interp1d(x, y, kind="linear", **ipargs) @register_interpolator def cubic(x: np.ndarray, y: np.ndarray, **ipargs: Any) -> Callable: """Cubic spline interpolation through the given points Uses the scipy.interpolate.interp1d function with kind='cubic' behind the scenes. Args: x: 1-dimensional array with original x-values. y: Array with original y-values. ipargs: Keyword arguments passed on to the interp1d-interpolator. Returns: Cubic spline interpolation function """ if y.ndim < 1: raise ValueError(f"The y array must have at least one dimension, currently y.ndim={y.ndim}.") # Interpolate along axis=0 by default ipargs.setdefault("axis", 0) return scipy.interpolate.interp1d(x, y, kind="cubic", **ipargs) @register_interpolator def interpolated_univariate_spline(x: np.ndarray, y: np.ndarray, **ipargs: Any) -> Callable: """One-dimensional interpolating spline for the given points Uses the scipy.interpolate.InterpolatedUnivariateSpline function behind the scenes. 
The original only deals with one-dimensional y arrays, so multiple calls are made for higher dimensional y arrays. The dimensions are handled independently of each other. Args: x: 1-dimensional array with original x-values. y: Array with original y-values. ipargs: Keyword arguments passed on to the scipy-interpolator. Returns: Interpolating spline function """ if y.ndim < 1: raise ValueError(f"The y array must have at least one dimension, currently y.ndim={y.ndim}.") if y.ndim == 1: return scipy.interpolate.InterpolatedUnivariateSpline(x, y, **ipargs) # Loop over columns in y for higher dimensions def _interpolated_univariate_spline(x_new: np.ndarray) -> np.ndarray: """Interpolate using an interpolating spline""" first_y = (slice(len(y)),) first_new = (slice(len(x_new)),) y_new = np.zeros(x_new.shape[:1] + y.shape[1:]) for last_cols in np.ndindex(y.shape[1:]): idx_new = first_new + last_cols idx_y = first_y + last_cols y_new[idx_new] = scipy.interpolate.InterpolatedUnivariateSpline(x, y[idx_y], **ipargs)(x_new) return y_new return _interpolated_univariate_spline @register_interpolator def barycentric_interpolator(x: np.ndarray, y: np.ndarray, **ipargs: Any) -> Callable: """The interpolating polynomial through the given points Uses the scipy.interpolate.BarycentricInterpolator function behind the scenes. Args: x: 1-dimensional array with original x-values. y: Array with original y-values. ipargs: Keyword arguments passed on to the scipy-interpolator. Returns: Barycentric interpolation function """ if y.ndim < 1: raise ValueError(f"The y array must have at least one dimension, currently y.ndim={y.ndim}.") return scipy.interpolate.BarycentricInterpolator(x, y, **ipargs) PKLmidgard/math/rotation.pyPK bM_:_:midgard/math/unit.py"""Midgard library module for handling of SI-unit conversions Description: ------------ This module provides unit conversion constants and functions. The heavy lifting is done by the `pint` package. 
The basic usage is as follows: >>> from midgard.math.unit import Unit >>> seconds_in_two_weeks = 2 * Unit.week2secs >>> seconds_in_two_weeks 1209600.0 In general `Unit.spam2ham` will give the multiplicative conversion scale between the units `spam` and `ham`. Through the `pint` package we support a lot of units. See `Unit.list()` or `https://github.com/hgrecco/pint/blob/master/pint/default_en.txt`. Another notation is also available, and might be necessary for some more complicated conversions: >>> seconds_in_two_weeks = 2 * Unit('week', 'seconds') >>> miles_per_hour_in_meters_per_second = Unit('mph', 'meters / sec') Do note that we support most normal aliases as well as singular and plural forms of the units. For instance can `second` be represented as `s`, `sec`, `secs` and `seconds`. Prefixes are also handled: >>> nanoseconds_in_an_hour = Unit.hour2nanosecs >>> inches_in_a_kilometer = Unit.km2inches For more complicated conversions (for instance from Celsius to Fahrenheit) one can create custom conversion functions using `convert`: >>> c2f = Unit.function('celsius', 'fahrenheit') >>> absolute_zero_in_fahrenheit = c2f(-273.15) For convenience, this can also be written using the attribute notation as `Unit.spam_to_ham(spam_value)`. Then the previous example simply becomes: >>> absolute_zero_in_fahrenheit = Unit.celsius_to_fahrenheit(-273.15) (or even easier `Unit.kelvin_to_fahrenheit(0)`). Finally, we can access the unit/quantity system of `pint` by using the name of a unit by itself, e.g. `Unit.spam`. For instance: >>> distance = 42 * Unit.km >>> time = 31 * Unit('minutes') >>> speed = distance / time >>> speed.to(Unit.mph) >>> speed.to_base_units() However, using the full unit system adds some overhead so we should be careful in using it in heavy calculations. 
""" # Standard library imports import pathlib from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar, Union # External library imports import numpy as np import pint # Midgard imports # from midgard.dev import cache # TODO from midgard.dev import exceptions # Type that can be either float or numpy array np_float = TypeVar("np_float", float, np.array) # The _UNITS-dict is used to keep track of units values returned by functions and methods _UNITS: Dict[str, Dict[str, str]] = dict() class _convert_units(type): """A meta-class that does the parsing of units The meta-class is used for convenience. It allows us to use the `Unit`-class without instantiating it. That is, we can write `Unit.km2m` instead of `Unit().km2m`. """ _ureg = pint.UnitRegistry() # @cache.function # TODO def __call__(cls, from_unit: str, to_unit: Optional[str] = None) -> Any: # type: ignore """Calculate the conversion scale between from_unit and to_unit If `to_unit` is not given, then `from_unit` is returned as a `pint` Quantity. Args: from_unit: The unit to convert from. to_unit: The unit to convert to. Returns: Scale to multiply by to convert from from_unit to to_unit, or from_unit as a Quantity. """ if to_unit is None: return cls._ureg(from_unit) else: return cls._ureg(from_unit).to(to_unit).magnitude def __getattr__(cls, key: str) -> Any: """Simplify notation for converting between units This makes it possible to type `Unit.km2m` instead of `Unit('km', 'm')`. We split on the character `2` (pronounced "to"), and pass the result on to `__call__` to do the conversion. If a `2` is not found, we check if we can split on '_to_' instead, if so it is interpreted as a conversion function and is handed of to `convert`. Finally, if no split is done, the attribute is interpreted as a simple unit. Note that if you need a unit whose name contains a '2' (or '_to_') you need to use the notation `Unit('foot_H2O', 'pascal'). Similarly, more complex units need the same notation, e.g. 
`Unit('meters per second ** 2')`. Args: key: The key (name) of the attribute to the class. Interpreted as units. Returns: Scale to multiply by or function to perform the unit conversion, or Quantity. """ if "2" in key: from_unit, _, to_unit = key.partition("2") return cls(from_unit, to_unit) elif "_to_" in key: from_unit, _, to_unit = key.partition("_to_") return cls.function(from_unit, to_unit) else: return cls(key) def load_definitions(cls, file_path: Union[str, pathlib.Path]) -> None: """Load customized units and constants Piggybacking on `pint`'s system for defining new units and constants, `http://pint.readthedocs.io/en/latest/defining.html`. Args: file_path: File containing definitions of units and constants. """ with open(file_path, mode="rt") as fid: cls._ureg.load_definitions(fid) def function(cls, from_unit: str, to_unit: str) -> Callable[[float], float]: """Create a conversion function This is necessary for unit conversions that are not simple multiplications. The usual example is temperature conversions for instance from Celsius to Fahrenheit. Args: from_unit: The unit to convert from. to_unit: The unit to convert to. Returns: Conversion function that converts from from_unit to to_unit. """ return lambda value: cls._ureg.Quantity(value, cls._ureg(from_unit)).to(cls._ureg(to_unit)).magnitude def register(cls, unit: str) -> Callable: """Register unit of a function/method/property This method should be used as a decorator on the function/method/property, and specify the unit of the value returned by that function/method/property. For instance @property @Unit.register('meter') def calculate_delay(...): return delay_in_meters Units registered with this decorator can be used by the functions returned by the `unit_func_factory`, `convert_func_factory` and `factor_func_factory`. Args: unit: Name of unit. Returns: Decorator that registers the unit. 
""" def register_decorator(func: Callable) -> Callable: """Register unit of func in _UNITS-dictionary""" module_name = func.__module__ func_name = func.__name__ _UNITS.setdefault(module_name, dict())[func_name] = unit return func return register_decorator @staticmethod def _get_unit(module_name: str, func_name: str) -> str: """Get registered unit of function/method/property Outside code should use the `unit_factory` to get registered units. Args: module_name: Name of module containing function/method/property. func_name: Name of function/method/property with registered unit. Returns: Name of unit. """ units = _UNITS.get(module_name, dict()) try: return units[func_name] except KeyError: raise exceptions.UnitError(f"No unit is registered for {func_name!r} in {module_name!r}") from None def unit_factory(cls, module_name: str) -> Callable[[str], str]: """Provide a function that can get registered units of functions/methods/properties The function checks for units registered with the unit.register-decorator. It can for instance be added to a class as follows: unit = staticmethod(Unit.unit_factory(__name__)) Args: module_name: Name of module as returned by `__name__`. Returns: Function that gets unit of values returned by functions. """ def unit(func_name: str) -> str: """Unit of value returned by function/method/property Args: func_name (String): Name of function/method/property. Returns: String: Name of unit. """ return cls._get_unit(module_name, func_name) return unit def convert_factory(cls, module_name: str) -> Callable[[object, str, str], float]: """Provide a function that can convert values of properties to a given unit The function checks for units registered with the unit.register-decorator. It can for instance be added to a class as follows: convert_to = Unit.convert_factory(__name__) Note that unlike the other factories, this one only works for properties. Args: module_name: Name of module as returned by `__name__`. 
Returns: Function that converts values of properties. """ def convert(self: object, property_name: str, to_unit: str) -> float: """Convert value of property to another unit Args: property_name: Name of property. to_unit: Name of other unit. Returns: Value of property converted to other unit. """ from_unit = cls._get_unit(module_name, property_name) factor = cls(from_unit, to_unit) return getattr(self, property_name) * factor return convert def factor_factory(cls, module_name: str) -> Callable[[str, str], float]: """Provide a function that calculates conversion factor to another unit The function finds conversion factors for units registered with the unit.register-decorator. It can for instance be added to a class as follows: unit_factor = staticmethod(Unit.factor_factory(__name__)) Args: module_name: Name of module as returned by `__name__`. Returns: Function that calculates conversion factor to another unit. """ def factor(func_name: str, to_unit: str) -> float: """Conversion factor between unit of function/method/property and another unit Args: func_name: Name of function/method/property. to_unit: Name of other unit. Returns: Conversion factor. """ from_unit = cls._get_unit(module_name, func_name) return cls(from_unit, to_unit) return factor def units_dict(cls, module_name: str) -> Dict[str, str]: """Dictionary of units registered on a module Add a sub-dictionary if the module name is unknown, to set up a reference in case units are registered later. Args: module_name: Name of module. Returns: Dictionary with units registered on a module. """ return _UNITS.setdefault(module_name, dict()) @property def names(cls) -> List[str]: """List available units and constants The list of available units contains aliases (for instance s, sec, second), but not plural forms (secs, seconds) or possible prefixes (milliseconds, usec, ms). 
Returns: List of names of available units and constants """ return dir(cls._ureg) class Unit(metaclass=_convert_units): """Unit converter The implementation of the unit conversion is done in the `_convert_units`-metaclass. """ # Make pint exceptions available from pint.errors import DimensionalityError # # Conversion routines not defined by pint # @classmethod def rad_to_dms(cls, radians: np_float) -> Tuple[np_float, np_float, np_float]: """Converts radians to degrees, minutes and seconds Args: radians: Angle(s) in radians Returns: Tuple with degrees, minutes, and seconds. Examples: >>> Unit.rad_to_dms(1.04570587646256) (59.0, 54.0, 52.3200000000179) >>> Unit.rad_to_dms(-0.2196050301753194) (-12.0, 34.0, 56.78900000000468) >>> Unit.rad_to_dms(-0.005817642339636369) (-0.0, 19.0, 59.974869999999925) """ sign = np.sign(radians) degrees = abs(radians) * cls.radians2degrees minutes = (degrees % 1) * cls.hour2minutes seconds = (minutes % 1) * cls.minute2seconds return sign * np.floor(degrees), np.floor(minutes), seconds @classmethod def dms_to_rad(cls, degrees: np_float, minutes: np_float, seconds: np_float) -> np_float: """Convert degrees, minutes and seconds to radians The sign of degrees will be used. In this case, be careful that the sign of +0 or -0 is correctly passed on. That is, degrees must be specified as a float, not an int. Args: degrees: Degrees as float (including sign) or array of floats minutes: Minutes as int/float or array of ints/floats seconds: Seconds as float or array of floats Returns: Given degrees, minutes and seconds as radians. 
Examples: >>> Unit.dms_to_rad(59, 54, 52.32) 1.04570587646256 >>> Unit.dms_to_rad(-12.0, 34, 56.789) -0.21960503017531938 >>> Unit.dms_to_rad(-0.0, 19, 59.974870) -0.005817642339636369 """ sign = np.copysign(1, degrees) return ( sign * (np.abs(degrees) + minutes * cls.minutes2hours + seconds * cls.seconds2hours) * cls.degrees2radians ) @classmethod def hms_to_rad(cls, hours: np_float, minutes: np_float, seconds: np_float) -> np_float: """Convert hours, minutes and seconds to radians Args: hours: Hours as int or array of ints minutes: Minutes as int or or array of ints seconds: Seconds as float or or array of floats Returns: Given hours, minutes and seconds as radians. Examples: >>> Unit.hms_to_rad(17, 7, 17.753427) 4.482423920139868 >>> Unit.hms_to_rad(12, 0, 0.00) 3.1415926535897936 >>> Unit.hms_to_rad(-12, 34, 56.789) Traceback (most recent call last): ValueError: hours must be non-negative """ if np.any(np.array(hours) < 0): raise ValueError("hours must be non-negative") return 15 * cls.dms_to_rad(hours, minutes, seconds) PKbMTt\ midgard/parsers/__init__.py"""Framework for parsers Description: ------------ To add a new parser, simply create a new .py-file which defines a class inheriting from parsers.Parser. The class needs to be decorated with the `midgard.dev.plugins.register` decorator as follows: from midgard.parsers import parser from midgard.lib import plugins @plugins.register class MyNewParser(parser.Parser): ... To use a parser, you will typically use the `parse_file`-function defined below from midgard import parsers my_new_parser = parsers.parse_file('my_new_parser', 'file_name.txt', ...) my_data = my_new_parser.as_dict() The name used in `parse_file` to call the parser is the name of the module (file) containing the parser. 
""" # Standard library imports import pathlib from typing import Any, Callable, List, Optional, Union # Midgard imports from midgard.dev import plugins from midgard.dev.timer import Timer # Make base Parser-classes available at package level from midgard.parsers._parser import Parser # noqa from midgard.parsers._parser_chain import ParserDef, ChainParser # noqa from midgard.parsers._parser_line import LineParser # noqa from midgard.parsers._parser_rinex import RinexParser, RinexHeader # noqa from midgard.parsers._parser_sinex import SinexParser, SinexBlock, SinexField # noqa def parse_file( parser_name: str, file_path: Union[str, pathlib.Path], encoding: Optional[str] = None, parser_logger: Optional[Callable[[str], None]] = print, timer_logger: Optional[Callable[[str], None]] = None, use_cache: bool = False, **parser_args: Any, ) -> Parser: """Use the given parser on a file and return parsed data Specify `parser_name` and `file_path` to the file that should be parsed. The following parsers are available: {doc_parser_names} Data can be retrieved either as Dictionaries, Pandas DataFrames or Midgard Datasets by using one of the methods `as_dict`, `as_dataframe` or `as_dataset`. Example: >>> df = parse_file('rinex2_obs', 'ande3160.16o').as_dataframe() # doctest: +SKIP Args: parser_name: Name of parser file_path: Path to file that should be parsed. encoding: Encoding in file that is parsed. parser_logger: Logging function that will be used by parser. timer_logger: Logging function that will be used to log timing information. use_cache: Whether to use a cache to avoid parsing the same file several times. 
parser_args: Input arguments to the parser Returns: Parser: Parser with the parsed data """ # TODO: Cache # Create the parser and parse the data parser = plugins.call( package_name=__name__, plugin_name=parser_name, file_path=file_path, encoding=encoding, logger=parser_logger, **parser_args, ) with Timer(f"Finish {parser_name} ({__name__}) - {file_path} in", logger=timer_logger): return parser.parse() def names() -> List[str]: """List the names of the available parsers Returns: Names of the available parsers """ return plugins.names(package_name=__name__) PK3bM%QSmidgard/parsers/_parser.py"""Basic functionality for parsing datafiles, extended by individual parsers Description: ------------ This module contains functions and classes for parsing datafiles. It should typically be used by calling `parsers.parse_file`: Example: -------- from midgard import parsers my_new_parser = parsers.parse_file('my_new_parser', 'file_name.txt', ...) my_data = my_new_parser.as_dict() """ # Standard library imports import pathlib from typing import Any, Callable, Dict, List, NoReturn, Optional, Union import warnings # External library imports import pandas as pd class Parser: """An abstract base class that has basic methods for parsing a datafile This class provides functionality for parsing a file. You should inherit from one of the specific parsers like for instance ChainParser, LineParser, SinexParser etc Attributes: file_path (Path): Path to the datafile that will be read. file_encoding (String): Encoding of the datafile. parser_name (String): Name of the parser (as needed to call parsers.parse_...). data_available (Boolean): Indicator of whether data are available. data (Dict): The (observation) data read from file. meta (Dict): Metainformation read from file. 
""" def __init__( self, file_path: Union[str, pathlib.Path], encoding: Optional[str] = None, logger: Optional[Callable[[str], None]] = print, ) -> None: """Set up the basic information needed by the parser Args: file_path: Path to file that will be read. encoding: Encoding of file that will be read. logger: Function that will be used for logging. """ self.file_path = pathlib.Path(file_path) self.file_encoding = encoding self.parser_name = self.__module__.split(".")[-1] self.logger = (lambda _: None) if logger is None else logger # Initialize the data self.data_available = self.file_path.exists() self.meta: Dict[str, Any] = dict(__parser_name__=self.parser_name, __data_path__=self.file_path) self.data: Dict[str, Any] = dict() def setup_parser(self) -> Any: """Set up information needed for the parser""" pass def setup_postprocessors(self) -> List[Callable[[], None]]: """List postprocessors that should be called after parsing""" return list() def parse(self) -> "Parser": """Parse data This is a basic implementation that carries out the whole pipeline of reading and parsing datafiles including calculating secondary data. Subclasses should typically implement (at least) the `read_data`-method. """ self.setup_parser() if self.data_available: self.read_data() if not self.data_available: # May have been set to False by self.read_data() warnings.warn(f"No data found by {self.__class__.__name__} in {self.file_path}") return self self.postprocess_data() return self def read_data(self) -> None: """Read data from the data file Data should be read from `self.file_path` and stored in the dictionary `self.data`. A description of the data may be placed in the dictionary `self.meta`. If data are not available for some reason, `self.data_available` should be set to False. """ raise NotImplementedError def postprocess_data(self) -> None: """Do simple manipulations on the data after they are read Simple manipulations of data may be performed in postprocessors after they are read. 
They should be kept simple so that a parser returns as true representation of the data file as possible. Advanced calculations may be done inside apriori classes or similar. To add a postprocessor, define it in its own method, and override the `setup_postprocessors`-method to return a list of all postprocessors. """ for postprocessor in self.setup_postprocessors(): postprocessor() def as_dict(self, include_meta: bool = False) -> Dict[str, Any]: """Return the parsed data as a dictionary This is a basic implementation, simply returning a copy of self.data. More advanced parsers may need to reimplement this method. Args: include_meta: Whether to include meta-data in the returned dictionary (default: False). Returns: Dictionary with the parsed data. """ return dict(self.data, __meta__=self.meta) if include_meta else self.data.copy() def as_dataframe(self, index: Optional[Union[str, List[str]]] = None) -> pd.DataFrame: """Return the parsed data as a Pandas DataFrame This is a basic implementation, assuming the `self.data`-dictionary has a simple structure. More advanced parsers may need to reimplement this method. Args: index: Optional name of field to use as index. May also be a list of strings. Returns: Pandas DataFrame with the parsed data. """ df = pd.DataFrame.from_dict(self.data) if index is not None: df.set_index(index, drop=True, inplace=True) return df def as_dataset(self) -> NoReturn: """Return the parsed data as a Midgard Dataset This is a basic implementation, assuming the `self.data`-dictionary has a simple structure. More advanced parsers may need to reimplement this method. Returns: Dataset: The parsed data. """ # from midgard import data # # dset = data.Dataset.from_dict(self.data) # # return dset raise NotImplementedError def update_dataset(self, dset: Any) -> NoReturn: """Update the given dataset with the parsed data This is a basic implementation, assuming the `self.data`-dictionary has a simple structure. 
More advanced parsers may need to reimplement this method. Args: dset: The dataset to update with parsed data. """ # parser_dset = self.as_dataset() # if new fields: # dset.add ... # elif new epochs: # dset.extend ... raise NotImplementedError def __repr__(self) -> str: """Simple string representation of the parser""" return f"{self.__class__.__name__}(file_path='{self.file_path}')" PKObMp"% midgard/parsers/_parser_chain.py"""Basic functionality for parsing datafiles line by line Description: ------------ This module contains functions and classes for parsing datafiles. Example: -------- from midgard import parsers my_new_parser = parsers.parse_file('my_new_parser', 'file_name.txt', ...) my_data = my_new_parser.as_dict() """ # Standard library imports import itertools from typing import Any, Callable, Dict, NamedTuple, Optional # Midgard imports from midgard.parsers._parser import Parser # A simple structure used to define the necessary fields of a parser class ParserDef(NamedTuple): """A convenience class for defining the necessary fields of a parser A single parser can read and parse one group of datalines, defined through the ParserDef by specifying how to parse each line (parser_def), how to identify each line (label), how to recognize the end of the group of lines (end_marker) and finally what (if anything) should be done after all lines in a group is read (end_callback). The end_marker, label, skip_line and end_callback parameters should all be functions with the following signatures: end_marker = func(line, line_num, next_line) label = func(line, line_num) skip_line = func(line) end_callback = func(cache) The parser definition `parser_def` includes the `parser`, `field`, `strip` and `delimiter` entries. The `parser` entry points to the parser function and the `field` entry defines how to separate the line in fields. The separated fields are saved either in a dictionary or in a list. In the last case the line is split on whitespace by default. 
With the `delimiter` entry the default definition can be overwritten. Leading and trailing whitespace characters are removed by default before a line is parsed. This default can be overwritten by defining the characters, which should be removed with the 'strip' entry. The `parser` dictionary is defined like: parser_def = {