# File: midgard/__init__.py

"""Midgard, the Python Geodesy library

Midgard is a collection of useful Python utilities used by the Geodetic
Institute at the Norwegian Mapping Authority (Kartverket). Although some of
these are geodesy-specific, many are also useful in more general settings.

Note: Midgard is still in pre-alpha status. Its functionality will change,
and it should not be depended on in any production-like setting.

Midgard comes organized into different subpackages:

{subpackages}

Look for help inside each subpackage:

    >>> from midgard import subpackage  # doctest: +SKIP
    >>> help(subpackage)  # doctest: +SKIP

Current maintainers:
--------------------

{maintainers}
"""

# Standard library imports
from datetime import date as _date
from collections import namedtuple as _namedtuple
from pathlib import Path as _Path

# Version of Midgard.
#
# This is automatically set using the bumpversion tool
__version__ = "0.3.0"


# Authors of Midgard.
_Author = _namedtuple("_Author", ["name", "email", "start", "end"])

_AUTHORS = [
    _Author("Michael Dähnn", "michael.daehnn@kartverket.no", _date.min, _date.max),
    _Author("Ingrid Fausk", "ingrid.fausk@kartverket.no", _date.min, _date.max),
    _Author("Ann-Silje Kirkvik", "ann-silje.kirkvik@kartverket.no", _date.min, _date.max),
    _Author("Mohammed Ouassou", "mohammed.ouassou@kartverket.no", _date(2018, 9, 1), _date.max),
    _Author("Hans Sverre Smalø", "hans.sverre.smalo@kartverket.no", _date(2018, 12, 1), _date.max),
    _Author("Geir Arne Hjelle", "geirarne@gmail.com", _date(2019, 2, 1), _date.max),
    # Hall of Fame
    _Author("Geir Arne Hjelle", "geir.arne.hjelle@kartverket.no", _date.min, _date(2019, 2, 1)),
]

__author__ = ", ".join(a.name for a in _AUTHORS if a.start < _date.today() < a.end)
__contact__ = ", ".join(a.email for a in _AUTHORS if a.start < _date.today() < a.end)


# Copyleft of the library
__copyright__ = f"2018 - {_date.today().year} Norwegian Mapping Authority"


# Update doc with info about subpackages and maintainers
def _update_doc(doc: str) -> str:
    """Add information to doc-string

    Args:
        doc:  The doc-string to update.

    Returns:
        The updated doc-string.
    """
    # Subpackages
    subpackage_paths = _Path(__file__).parent.iterdir()
    subpackage_list = [p.name for p in subpackage_paths if p.is_dir() and not p.name.startswith("_")]
    subpackages = "\n".join(f"+ {p}" for p in subpackage_list)

    # Maintainers
    maintainer_list = [f"+ {a.name} <{a.email}>" for a in _AUTHORS if a.start < _date.today() < a.end]
    maintainers = "\n".join(maintainer_list)

    # Add to doc-string
    return doc.format(subpackages=subpackages, maintainers=maintainers)


__doc__ = _update_doc(__doc__)


# File: midgard/collections/__init__.py (empty)


# File: midgard/collections/enums.py
#
# Note: the start of this file was lost in the archive. The docstring, imports
# and the register_enum signature below are reconstructed from how the
# surviving code uses them (enum, colorama's Fore/Style, midgard exceptions,
# the _ENUMS registry and the NotGiven sentinel).

"""Framework for working with enumerations"""

# Standard library imports
import enum
from typing import Any, Callable, Dict, List

# Third party imports
import colorama as color

# Midgard imports
from midgard.dev import exceptions

# Dictionary of registered enumerations. Populated by the @register_enum-decorator.
_ENUMS: Dict[str, enum.EnumMeta] = dict()


class NotGiven:
    """Dummy class used as a marker for a value not given"""


def register_enum(name: str) -> Callable[[enum.EnumMeta], enum.EnumMeta]:
    """Register a named Enumeration

    This allows for getting Enumerations with the get_enum-function.

    Args:
        name:  Name used for Enumeration.

    Returns:
        Decorator that registers an Enumeration.
    """

    def register_decorator(enum_cls: enum.EnumMeta) -> enum.EnumMeta:
        _ENUMS[name] = enum_cls
        globals()[name] = enum_cls
        return enum_cls

    return register_decorator


def enums() -> List[str]:
    """Return a list of available enums

    Returns:
        Names of available enums.
    """
    return sorted(_ENUMS)


def get_enum(name: str) -> enum.EnumMeta:
    """Return a named Enumeration

    Names are defined by the @register_enum-decorator. If the name-parameter
    is not a valid enum, the function will raise an UnknownEnumError and list
    the available enumerations.

    Args:
        name:  Name used for Enumeration.

    Returns:
        Enumeration with the given name.
    """
    try:
        return _ENUMS[name]
    except KeyError:
        valid_enums = ", ".join(e for e in _ENUMS)
        raise exceptions.UnknownEnumError(
            f"Enumeration '{name}' is not defined. Available enumerations are {valid_enums}."
        ) from None


def get_value(name: str, value: str, default: Any = NotGiven) -> enum.Enum:
    """Return the value of a named Enumeration

    Names are defined by the @register_enum-decorator.

    Args:
        name:     Name used for Enumeration.
        value:    Value of Enumeration.
        default:  Optional object returned if enumeration does not contain value.

    Returns:
        Value of enumeration with the given name.
    """
    try:
        return get_enum(name)[value]
    except KeyError:
        if default is NotGiven:
            valid_values = ", ".join(v.name for v in get_enum(name))  # type: ignore
            raise ValueError(
                f"Value '{value}' is not valid for a {name}-enumeration. Valid values are {valid_values}."
            ) from None
        else:
            return default


def has_value(name: str, value: str) -> bool:
    """Check whether a named Enumeration defines a given value

    Args:
        name:   Name used for Enumeration.
        value:  Value of Enumeration.

    Returns:
        True if Enumeration defines value, False otherwise.
    """
    return value in get_enum(name).__members__


#
# ENUMS
#
@register_enum("log_level")
class LogLevel(int, enum.Enum):
    """Levels used when deciding how much log output to show"""

    all = enum.auto()
    debug = enum.auto()
    info = enum.auto()
    warn = enum.auto()
    error = enum.auto()
    fatal = enum.auto()
    none = enum.auto()


@register_enum("log_color")
class LogColor(str, enum.Enum):
    """Colors used when logging"""

    warn = color.Fore.YELLOW
    error = color.Fore.RED
    fatal = color.Style.BRIGHT + color.Fore.RED


@register_enum("write_level")
class WriteLevel(enum.IntEnum):
    """Levels used when deciding which fields of a dataset and other information to write to disk"""

    detail = enum.auto()
    analysis = enum.auto()
    operational = enum.auto()


@register_enum("gnss_freq_G")
class GPSFrequency(float, enum.Enum):
    """GPS frequencies"""

    L1 = 1575.42e+6
    L2 = 1227.60e+6
    L5 = 1176.45e+6


# Examples
#
#     from midgard.collections import enums
#     enums.get_value("gnss_freq_G", "L1")
#     enums.get_value("gnss_freq_G", "L1") + 1
#
#     enums.get_enum("gnss_freq_G")
#     enums.get_enum("gnss_freq_G").L1
#     enums.get_enum("gnss_freq_G").L1 + 1
#
#     enums.gnss_freq_G.L1
#     enums.gnss_freq_G.L1 * 2


# File: midgard/collections/dataset/__init__.py (empty)
# File: midgard/collections/dataset/dataset.py (empty)
# File: midgard/collections/dataset/table.py (empty)
# File: midgard/config/__init__.py (empty)


# File: midgard/config/config.py

"""Midgard library module for handling of configuration settings

Description:
------------

A Configuration consists of one or several sections. Each
ConfigurationSection consists of one or more entries. Each ConfigurationEntry
consists of a key and a value.


Examples:
---------

For basic use, an entry is looked up by simple attribute access. For
instance, if `cfg` is a Configuration with the section `midgard` which has an
entry `foo = bar`:

    >>> cfg = Configuration("config_name")
    >>> cfg.update("midgard", "foo", "bar")
    >>> cfg.midgard.foo
    ConfigurationEntry(key='foo', value='bar')

ConfigurationEntry has several access methods that convert the entry to a
given data type:

    >>> cfg.update("midgard", "foo_pi", 3.14, source="command line")
    >>> cfg.midgard.foo_pi
    ConfigurationEntry(key='foo_pi', value='3.14')
    >>> cfg.midgard.foo_pi.float
    3.14
    >>> cfg.midgard.foo_pi.str
    '3.14'
    >>> cfg.midgard.foo_pi.tuple
    ('3.14',)


Sources:
--------

Each configuration entry records its source, that is, where that entry was
defined. Examples include being read from a file, set as a command line
option, or set programmatically from a dictionary. The source can be looked
up on an individual entry, or for all entries in a configuration.

    >>> cfg.midgard.foo_pi.source
    'command line'
    >>> cfg.sources  # doctest: +SKIP
    {'/home/midgard/midgard.conf', 'command line'}


Profiles:
---------


Fallback Configuration:
-----------------------


Master Section:
---------------


Replacement Variables:
----------------------


Help text and Type hints:
-------------------------

"""

# Standard library imports
import builtins
from configparser import ConfigParser, BasicInterpolation, ExtendedInterpolation
from contextlib import contextmanager
import datetime as stdlib_datetime
import os.path
import pathlib
import re
import sys
from collections import UserDict

# Midgard imports
from midgard.dev import console
from midgard.collections import enums
from midgard.dev import exceptions

# Typing
from typing import Any, Callable, Dict, Generator, List, Optional, Set, Tuple, Union

ProfileName = Optional[str]
Sections = Dict[str, "ConfigurationSection"]
EntryName = str
ConfigVars = Dict[str, str]

# Date and datetime formats
FMT_date = "%Y-%m-%d"
FMT_datetime = "%Y-%m-%d %H:%M:%S"
FMT_dt_file = "%Y%m%d-%H%M%S"


class CasedConfigParser(ConfigParser):
    """ConfigParser with case-sensitive keys"""

    def optionxform(self, optionstr: str) -> str:
        """Do not turn optionstr (key) into lowercase"""
        return optionstr


class Configuration:
    """Represents a Configuration"""

    def __init__(self, name: str) -> None:
        """Initialize a Configuration

        The name is used purely for representation and error messages.

        Args:
            name:  Name of configuration.
        """
        self.name = name
        self.fallback_config = None
        self.master_section = None
        self._profiles: List[ProfileName] = [None]
        self._profile_sections: Dict[ProfileName, Sections] = dict()
        self._sections: Sections = dict()
        self._vars_dict: ConfigVars = dict()
        self._update_count: int = 0

    @classmethod
    def read_from_file(cls, cfg_name: str, *file_paths: Union[str, pathlib.Path]) -> "Configuration":
        """Read a configuration from one or more files

        Args:
            cfg_name:    Name of configuration.
            file_paths:  File(s) that will be read.

        Returns:
            A Configuration representing the file(s).
        """
        cfg = cls(cfg_name)
        for file_path in file_paths[::-1]:
            cfg.update_from_file(file_path)

        return cfg

    @classmethod
    @contextmanager
    def update_on_file(cls, file_path: Union[str, pathlib.Path], **as_str_args: Any) -> Generator:
        """Context manager for updating a configuration on file"""
        # Read config from file
        cfg = cls.read_from_file("Temporary", file_path)
        update_count_before = cfg._update_count

        # Yield config so it can be updated
        yield cfg

        # Write config if it has been updated
        if cfg._update_count > update_count_before:
            cfg.write_to_file(file_path, **as_str_args)

    def write_to_file(self, file_path: Union[str, pathlib.Path], **as_str_args: Any) -> None:
        """Write the configuration to a file

        In addition to the file path, arguments can be specified and will be
        passed on to the as_str() function. See `as_str()` for more
        information.

        Todo: Use files.open_path
        """
        file_path = pathlib.Path(file_path)
        file_path.parent.mkdir(parents=True, exist_ok=True)
        with open(file_path, mode="w") as fid:
            fid.write(self.as_str(**as_str_args) + "\n")

    @property
    def section_names(self) -> List[str]:
        """Names of sections in Configuration"""
        return list(self._sections.keys())

    @property
    def sections(self) -> List["ConfigurationSection"]:
        """Sections in Configuration"""
        return list(self._sections.values())

    @property
    def sources(self) -> Set[str]:
        """Sources of entries in Configuration"""
        return {s[k].source for s in self.sections for k in s.keys() if s[k].source}

    @property
    def profiles(self) -> List[ProfileName]:
        """List of profiles currently being used in Configuration"""
        return self._profiles

    @profiles.setter
    def profiles(self, values: Union[None, List[ProfileName]]) -> None:
        """Set profiles that will be used in Configuration

        The list of profiles should be a prioritized list where the first
        profile listed will be used if available. None is used to indicate
        default values (no profile), and will be automatically appended at
        the end of the list of profiles. To not use any profiles, set
        `cfg.profiles = None`.

        Args:
            values:  List of profiles to use.
        """
        if values is None:
            values = [None]
        elif values[-1] is not None:
            values.append(None)

        self._profiles = values
        self._set_sections_for_profiles()

    def _set_sections_for_profiles(self) -> None:
        """Update sections according to profiles"""
        self._sections.clear()

        # Add values in reverse order so that the first profile is prioritized
        for profile in self.profiles[::-1]:
            for section_name, profile_section in self._profile_sections.get(profile, dict()).items():
                self._sections.setdefault(section_name, ConfigurationSection(section_name))
                for key, entry in profile_section.items():
                    self._sections[section_name][key] = entry

    @property
    def fallback_config(self) -> "Configuration":
        """The fallback configuration"""
        if self._fallback_config is None:
            raise exceptions.MissingConfigurationError(
                f"Configuration '{self.name}' has not defined a fallback configuration"
            )
        return self._fallback_config

    @fallback_config.setter
    def fallback_config(self, cfg: Optional["Configuration"]) -> None:
        """Set the fallback configuration"""
        self._fallback_config = cfg

    @property
    def master_section(self) -> "ConfigurationSection":
        """The master section"""
        if self._master_section is None:
            raise exceptions.MissingSectionError(f"Configuration {self.name!r} has not defined a master section")
        if self._master_section not in self._sections:
            raise exceptions.MissingSectionError(
                f"Master section {self._master_section!r} does not exist in configuration {self.name!r}"
            )
        return self._sections[self._master_section]

    @master_section.setter
    def master_section(self, section: Optional[str]) -> None:
        """Set the master section"""
        self._master_section = section

    def get(
        self, key: str, value: Optional[str] = None, section: Optional[str] = None, default: Optional[str] = None
    ) -> "ConfigurationEntry":
        """Get an entry from a configuration with possibility for override and default value

        A value for an entry is found using the following priorities:

            1. An explicit value given in `value`. None is used as a marker
               for no value.
            2. Looked up in the current configuration.
            3. Looked up in any fallback configurations that are defined.
            4. The default value is used.

        If `value` is not None, that value is simply returned as a
        `ConfigurationEntry`. If `default` is not given (is None), and a
        value is not found in any other way, a MissingEntryError is raised.

        Args:
            key:      Name of option (key in the configuration entry).
            value:    Value of entry. Used for overriding the configuration.
            section:  Section in the configuration in which to look up the key.
            default:  Default value that is returned if value is not found any other way.

        Returns:
            Entry representing the value.
        """
        if value is not None:
            return ConfigurationEntry(key, value=value, source="method call", vars_dict=self.vars)

        try:
            section_value = self.master_section if section is None else self[section]
            if isinstance(section_value, ConfigurationEntry):
                return section_value
            else:
                return section_value[key]
        except (exceptions.MissingSectionError, exceptions.MissingEntryError) as err:
            try:
                return self.fallback_config.get(key=key, section=section)
            except (exceptions.MissingConfigurationError, exceptions.MissingEntryError):
                if default is None:
                    # Raise original error
                    raise err
                else:
                    return ConfigurationEntry(key, value=default, source="default value", vars_dict=self.vars)

    def exists(self, key: str, section: Optional[str] = None) -> bool:
        """Check if a configuration entry exists

        Args:
            key:      Name of option (key in the configuration entry).
            section:  Section in the configuration in which to look up the key.

        Returns:
            True if the configuration entry exists, otherwise False.
        """
        if section is None:
            return self.master_section.exists(key)

        try:
            cfg_section = self[section]
        except (exceptions.MissingSectionError, exceptions.MissingEntryError):
            return False

        if isinstance(cfg_section, ConfigurationEntry):
            return False
        else:
            return cfg_section.exists(key)
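    # A minimal sketch of how `get` resolves a value, assuming a
    # configuration with a "midgard" section containing `foo = bar` (as in
    # the module docstring); the "ham" key is illustrative only:
    #
    #     cfg.get("foo", section="midgard")                   # 2. looked up in the configuration
    #     cfg.get("foo", value="baz", section="midgard")      # 1. explicit value wins
    #     cfg.get("ham", section="midgard", default="spam")   # 4. falls back to the default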
""" if not allow_new: profile_str = "" if profile is None else f"(profile: '{profile}')" if section not in self._sections: raise exceptions.MissingSectionError( f"Configuration '{self.name}' does not contain section '{section}' {profile_str}" ) if key not in self._sections[section]: raise exceptions.MissingEntryError( f"Section '{section}' of configuration '{self.name}' does not contain entry '{key}' {profile_str}" ) # Add entry to profile source = source if profile is None else f"{source} ({profile})" profile_sections = self._profile_sections.setdefault(profile, dict()) profile_sections.setdefault(section, ConfigurationSection(section)) # Record that configuration has been updated if key not in profile_sections[section] or profile_sections[section][key]._value != value: self._update_count += 1 profile_sections[section][key] = ConfigurationEntry( key, value=value, source=source, meta=meta, vars_dict=self.vars ) # Update sections if _update_sections: self._set_sections_for_profiles() def update_from_file( self, file_path: Union[str, pathlib.Path], allow_new: bool = True, interpolate: bool = False, case_sensitive: bool = False, ) -> None: """Update the configuration from a configuration file The Python ConfigParser is used to read the file. The file format that is supported is described at https://docs.python.org/library/configparser.html Different profiles in a configuration file are denoted by double underscores in the sections names. For instance will the following configuration have a `foo` profile in the `spam` section (in addition to the default profile): [spam] ... [spam__foo] ... The file may contain a special section called `__replace__` which may contain key-value pairs which will replace format-style strings in keys and values in the rest of the file. Additionally, the file may contain a special section called `__vars__`. The key-value pairs from this section will be added to the `dictionary` of the configuration. If `interpolate` is set to True, ExtendedInterpolation of variables in the configuration file is used. This means that variables of the form ${key:section} can be used for references within the file. See https://docs.python.org/library/configparser.html#configparser.ExtendedInterpolation for details. Args: file_path: Path to the configuration file. allow_new: Whether to allow the creation of new sections and entries. interpolate: Whether to interpolate variables in the configuration file. case_sensitive: Whether to read keys as case sensitive (or convert to lower case). 
""" # Use ConfigParser to read from file cfg_parser_cls = CasedConfigParser if case_sensitive else ConfigParser cfg_parser = cfg_parser_cls( allow_no_value=True, delimiters=("=",), interpolation=ExtendedInterpolation() if interpolate else BasicInterpolation(), ) cfg_parser.read(file_path) # Read special __replace__ replace_vars = {k: v for k, v in cfg_parser["__replace__"].items()} if "__replace__" in cfg_parser else {} # Add __vars__ to vars dictionary if "__vars__" in cfg_parser.sections(): self.update_vars({k: v for k, v in cfg_parser["__vars__"].items()}) # Add configuration entries for cfg_section in cfg_parser.sections(): section, has_profile, profile = cfg_section.partition("__") if not section: # Skip dunder sections continue for key, value in cfg_parser[cfg_section].items(): # Handle meta-information if ":" in key: continue meta = {k.partition(":")[-1]: v for k, v in cfg_parser[cfg_section].items() if k.startswith(f"{key}:")} # Create a configuration entry self.update( section, _replace(key, replace_vars), value if value is None else _replace(value, replace_vars).replace("\n", " "), profile=profile if has_profile else None, source=str(file_path), meta=meta, allow_new=allow_new, _update_sections=False, ) self._set_sections_for_profiles() def update_from_config_section( self, other_section: "ConfigurationSection", section: Optional[str] = None, allow_new: bool = True ) -> None: section = other_section.name if section is None else section for key, entry in other_section.data.items(): self.update( section, key, entry.str, source=entry.source, meta=entry.meta, allow_new=allow_new, _update_sections=False, ) self._set_sections_for_profiles() def update_from_options( self, options: Optional[List[str]] = None, profile: ProfileName = None, source: str = "command line", allow_new: bool = False, ) -> None: if options is None: options = sys.argv[1:] for option in options: if not (option.startswith("--") and "=" in option): continue # Parse config name, section, key and value of the form name:section:key=value opt_key, _, opt_value = option[2:].partition("=") opt_section, _, opt_key = opt_key.rpartition(":") opt_name, _, opt_section = opt_section.rpartition(":") # Update current configuration if opt_name and opt_name != self.name: continue if not opt_section: opt_section = self.master_section.name self.update( opt_section, opt_key, opt_value, profile=profile, source=f"{source} ({option})", allow_new=allow_new, _update_sections=False, ) self._set_sections_for_profiles() def update_from_dict( self, cfg_dict: Dict[str, Any], section: Optional[str] = None, source: str = "dictionary", allow_new: bool = True, ) -> None: section = self.master_section.name if section is None else section for key, value in cfg_dict.items(): self.update(section, key, value, source=source, allow_new=allow_new, _update_sections=False) self._set_sections_for_profiles() def clear(self) -> None: """Clear the configuration""" self._sections.clear() self.clear_vars() @property def vars(self) -> ConfigVars: """The configuration variables""" return self._vars_dict def clear_vars(self) -> None: """Clear the configuration variables""" self._vars_dict.clear() def update_vars(self, new_vars: ConfigVars) -> None: """Update the configuration variables""" self._vars_dict.update(new_vars) def as_str( self, width: Optional[int] = None, key_width: int = 30, only_used: bool = False, metadata: bool = True ) -> str: """The configuration represented as a string This is simililar to what is shown by `str(configuration)` (and implemented by 
`__str__`), but has more flexibility. Args: width: Width of text for wrapping. Default is width of console. key_width: Width of the key column. Default is 30 characters. only_used: Only include configuration entries that has been used so far. metadata: Include metadata like type and help text. Returns: String representation of the configuration. """ sections = self._sections.values() section_strs = [ s.as_str(width=width, key_width=key_width, only_used=only_used, metadata=metadata) for s in sections ] return "\n\n\n".join(s for s in section_strs if s) def as_dict( self, getters: Optional[Dict[str, Dict[str, str]]] = None, default_getter: str = "str" ) -> Dict[str, Dict[str, Any]]: """The configuration represented as a dictionary Args: getters: How to get the value of each entry in each section. default_getter: How to get the value of entries not specified in getters. Returns: Representation of the configuration as a nested dictionary. """ getters = dict() if getters is None else getters return {k: v.as_dict(getters=getters.get(k), default_getter=default_getter) for k, v in self._sections.items()} def __getitem__(self, key: str) -> Union["ConfigurationSection", "ConfigurationEntry"]: """Get a section or entry from the master section from the configuration""" if key in self.section_names: return self._sections[key] try: return self.master_section[key] except exceptions.MissingSectionError: try: return self.fallback_config[key] except exceptions.MidgardException: raise exceptions.MissingSectionError(f"Configuration {self.name!r} has no section {key!r}") from None def __getattr__(self, key: str) -> Union["ConfigurationSection", "ConfigurationEntry"]: """Get a section or entry from the master section from the configuration""" return self[key] def __delitem__(self, key: str) -> None: """Delete a section from the configuration""" del self._sections[key] def __delattr__(self, key: str) -> None: """Delete a section from the configuration""" del self._sections[key] def __dir__(self) -> List[str]: """List attributes and sections in the configuration""" try: return list(super().__dir__()) + self.section_names + self.master_section.as_list() except exceptions.MissingSectionError: return list(super().__dir__()) + self.section_names def __str__(self) -> str: """The configuration represented as a string This string can be stored in a file and read back with `update_from_file`. """ return "\n\n".join(str(s) for s in self._sections.values()) def __repr__(self) -> str: """A simple string representation of the configuration""" return f"{self.__class__.__name__}(name='{self.name}')" class ConfigurationSection(UserDict): data: Dict[str, "ConfigurationEntry"] def __init__(self, name: str) -> None: super().__init__() self.name: str = name def as_str( self, width: Optional[int] = None, key_width: int = 30, only_used: bool = False, metadata: bool = True ) -> str: """The configuration section represented as a string This is simililar to what is shown by `str(section)` (and implemented by `__str__`), but has more flexibility. Args: width: Width of text for wrapping. Default is width of console. key_width: Width of the key column. Default is 30 characters. only_used: Only include configuration entries that has been used so far. metadata: Include metadata like type and help text. Returns: String representation of the configuration section. 
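
# A minimal sketch of typical Configuration usage (the file name
# "midgard.conf" and the "output" entry are illustrative only):
#
#     cfg = Configuration.read_from_file("midgard", "midgard.conf")
#     cfg.master_section = "midgard"
#     cfg.profiles = ["vlbi"]        # prefer [section__vlbi] over [section]
#     path = cfg.get("output", default="/tmp").path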
""" lines = list() for entry in self.data.values(): if only_used and not entry.is_used: continue lines.append(entry.entry_as_str(width=width, key_width=key_width, metadata=metadata)) if lines: return f"[{self.name}]\n" + "\n".join(lines) else: return "" def as_list(self) -> List[str]: """List of keys of entries in configuration section Returns: List of keys of entries in configuration section. """ return list(self.data.keys()) def as_dict(self, getters: Dict[str, str] = None, default_getter: str = "str") -> Dict[str, Any]: """The configuration section represented as a dictionary Args: getters: How to get the value of each entry in the section. default_getter: How to get the value of entries not specified in getters. Returns: Representation of the configuration section as a dictionary. """ getters = dict() if getters is None else getters getters = {k: getters.get(k, default_getter) for k in self.keys()} return {k: getattr(e, getters[k]) for k, e in self.items()} def exists(self, key: str) -> bool: """Check if key exists in section Args: key: Name of configuration key. Returns: True if key is in section, False otherwise. """ return key in self.data def __getitem__(self, key: str) -> "ConfigurationEntry": """Get an entry from the configuration section""" try: return self.data[key] except KeyError: raise exceptions.MissingEntryError(f"Configuration section '{self.name}' has no entry '{key}'") from None def __getattr__(self, key: str) -> "ConfigurationEntry": """Get an entry from the configuration section""" try: return self.data[key] except KeyError: raise exceptions.MissingEntryError(f"Configuration section '{self.name}' has no entry '{key}'") from None def __dir__(self) -> List[str]: """List attributes and entries in the configuration section""" return list(super().__dir__()) + self.as_list() def __str__(self) -> str: """The configuration section represented as a string""" return f"[{self.name}]\n" + "\n".join(str(v) for v in self.data.values()) def __repr__(self) -> str: """A simple string representation of the configuration section""" return f"{self.__class__.__name__}(name='{self.name}')" class ConfigurationEntry: _BOOLEAN_STATES = { "0": False, "1": True, "false": False, "true": True, "no": False, "yes": True, "off": False, "on": True, } def __init__( self, key: str, value: Any, *, source: builtins.str = "", meta: Optional[Dict[str, str]] = None, vars_dict: Optional[ConfigVars] = None, _used_as: Optional[Set[builtins.str]] = None, ) -> None: self.source = source self.meta = dict() if meta is None else meta self._key = key self._value = str(value) self._vars_dict = dict() if vars_dict is None else vars_dict self._used_as = set() if _used_as is None else _used_as @property def type(self) -> Optional[builtins.str]: """Type hint for the ConfigurationEntry""" return self.meta.get("type", None) @property def help(self) -> builtins.str: """Help text for the ConfigurationEntry""" return self.meta.get("help", "") @property def str(self) -> builtins.str: """Value of ConfigurationEntry as string""" self._using("str") return self._value def as_str(self) -> builtins.str: """Value of ConfigurationEntry as string""" return self.str @property def int(self) -> builtins.int: """Value of ConfigurationEntry converted to an integer""" self._using("int") try: return int(self._value) except ValueError: raise ValueError( f"Value '{self._value}' of '{self._key}' in {self.source} cannot be converted to an integer" ) from None def as_int(self) -> builtins.int: """Value of ConfigurationEntry converted to an 
integer""" return self.int @property def float(self) -> builtins.float: """Value of ConfigurationEntry converted to a float""" self._using("float") try: return float(self._value) except ValueError: raise ValueError( f"Value '{self._value}' of '{self._key}' in {self.source} cannot be converted to a float" ) from None def as_float(self) -> builtins.float: """Value of ConfigurationEntry converted to a float""" return self.float @property def bool(self) -> builtins.bool: """Value of ConfigurationEntry converted to a boolean The conversion is done by looking up the string value of the entry in _BOOLEAN_STATES. """ self._using("bool") try: return self._BOOLEAN_STATES[self._value.lower()] except KeyError: raise ValueError( f"Value '{self._value}' of '{self._key}' in {self.source} cannot be converted to a boolean" ) from None def as_bool(self) -> builtins.bool: """Value of ConfigurationEntry converted to a boolean The conversion is done by looking up the string value of the entry in _BOOLEAN_STATES. """ return self.bool @property def date(self) -> stdlib_datetime.date: """Value of ConfigurationEntry converted to a date object assuming format `FMT_date`""" return self.as_date(format=FMT_date) def as_date(self, format: builtins.str = FMT_date) -> stdlib_datetime.date: """Value of ConfigurationEntry converted to a date object Args: format (String): Format string, see strftime for information about the string. Returns: Date: Value of entry. """ self._using("date") try: return stdlib_datetime.datetime.strptime(self._value, format).date() except ValueError: raise ValueError( f"Value '{self._value}' of '{self._key}' in {self.source} does not match the date format '{format}'" ) from None @property def datetime(self) -> stdlib_datetime.datetime: """Value of ConfigurationEntry converted to a datetime object assuming format `FMT_datetime`""" return self.as_datetime(format=FMT_datetime) def as_datetime(self, format: builtins.str = FMT_datetime) -> stdlib_datetime.datetime: """Value of ConfigurationEntry converted to a datetime object Args: format (String): Format string, see strftime for information about the string. Returns: Datetime: Value of entry. """ self._using("datetime") try: return stdlib_datetime.datetime.strptime(self._value, format) except ValueError: raise ValueError( f"Value '{self._value}' of '{self._key}' in {self.source} does not match the date format '{format}'" ) from None @property def path(self) -> pathlib.Path: """Value of ConfigurationEntry interpreted as a path string""" self._using("path") path = self._value if "~" in path: path = os.path.expanduser(path) return pathlib.Path(path) def as_path(self) -> pathlib.Path: """Value of ConfigurationEntry interpreted as a path string""" return self.path @property def list(self) -> List[builtins.str]: """Value of ConfigurationEntry converted to a list by splitting at commas and whitespace""" self._using("list") return self._value.replace(",", " ").split() def as_list( self, split_re: builtins.str = r"[\s,]", convert: Callable = builtins.str, maxsplit: builtins.int = 0 ) -> List[Any]: """Value of ConfigurationEntry converted to a list The entry is converted to a list by using the `split_re`-regular expression. By default the entry will be split at commas and whitespace. Args: split_re: Regular expression used to split entry into list. convert: Function used to convert each element of the list. maxsplit: If nonzero, at most maxsplit splits occur. Returns: Value of entry as list. 
""" self._using("list") return [convert(s) for s in re.split(split_re, self._value, maxsplit=maxsplit) if s] @property def list_of_lists(self) -> List[List[builtins.str]]: self._using("list_of_lists") raise NotImplementedError def as_list_of_lists( self, split_res: Tuple[builtins.str, ...] = (r"[\s,]", r"[^_\w]"), num_elements: Optional[builtins.int] = None, convert: Callable = builtins.str, ) -> List[List[Any]]: self._using("list_of_lists") raise NotImplementedError @property def tuple(self) -> Tuple[builtins.str, ...]: """Value of ConfigurationEntry converted to tuple by splitting at commas and whitespace""" self._using("tuple") return tuple(self._value.replace(",", " ").split()) def as_tuple( self, split_re: builtins.str = r"[\s,]", convert: Callable = builtins.str, maxsplit: builtins.int = 0 ) -> Tuple[Any, ...]: """Value of ConfigurationEntry converted to a tuple The entry is converted to a tuple by using the `split_re`-regular expression. By default the entry will be split at commas and whitespace. Args: split_re: Regular expression used to split entry into tuple. convert: Function used to convert each element of the tuple. maxsplit: If nonzero, at most maxsplit splits occur. Returns: Value of entry as tuple. """ self._using("tuple") return tuple([convert(s) for s in re.split(split_re, self._value, maxsplit=maxsplit) if s]) @property def dict(self) -> Dict[builtins.str, builtins.str]: """Value of ConfigurationEntry converted to a dict""" self._using("dict") return dict(i.partition(":")[::2] for i in self.list) def as_dict( self, item_split_re: builtins.str = r"[\s,]", key_value_split_re: builtins.str = r"[:]", convert: Callable = builtins.str, maxsplit: builtins.int = 0, ) -> Dict[builtins.str, Any]: """Value of ConfigurationEntry converted to a dictionary By default the dictionary is created by splitting items at commas and whitespace, and key from value at colons. Args: item_split_re: Regular expression used to split entry into items. key_value_split_re: Regular expression used to split items into keys and values. convert: Function used to convert each value in the dictionary. maxsplit: If nonzero, at most maxsplit splits occur when splitting entry into items. Returns: Value of entry as dict. """ self._using("dict") items = [s for s in re.split(item_split_re, self._value, maxsplit=maxsplit) if s] key_values = [re.split(key_value_split_re, i, maxsplit=1) for i in items] return {k: convert(v) for k, v in key_values} def as_enum(self, enum: builtins.str) -> enums.enum.Enum: """Value of ConfigurationEntry converted to an enumeration Args: enum (String): Name of Enum. Returns: Enum: Value of entry as Enum. """ self._using("enum") return enums.get_value(enum, self._value) @property def replaced(self) -> "ConfigurationEntry": """Value of ConfigurationEntry with {$}-variables replaced""" return self.replace() def replace(self, default: Optional[builtins.str] = None, **replace_vars: builtins.str) -> "ConfigurationEntry": value = _replace(self._value, dict(self._vars_dict, **replace_vars), default) return self.__class__(key=self._key, value=value, source=self.source, _used_as=self._used_as) @property def is_used(self) -> builtins.bool: return bool(self._used_as) def entry_as_str( self, width: Optional[builtins.int] = None, key_width: builtins.int = 30, metadata: builtins.bool = True ) -> builtins.str: """The configuration entry represented as a string This is simililar to what is shown by `str(entry)` (and implemented by `__str__`), but has more flexibility. 
Args: width: Width of text for wrapping. Default is width of console. key_width: Width of the key column. Default is 30 characters. metadata: Include metadata like type and help text. Returns: String representation of the configuration entry. """ lines = list() width = console.columns() if width is None else width fill_args = dict(width=width, hanging=key_width + 3, break_long_words=False, break_on_hyphens=False) # The entry itself lines.append(console.fill(f"{self._key:<{key_width}} = {self._value}", **fill_args)) # Metadata, including help text and type hints if metadata and self.meta: for meta_key, meta_value in self.meta.items(): if meta_value is None: lines.append(console.fill(f"{self._key}:{meta_key}", **fill_args)) else: lines.append(console.fill(f"{f'{self._key}:{meta_key}':<{key_width}} = {meta_value}", **fill_args)) lines.append("") return "\n".join(lines) def _using(self, as_type: builtins.str) -> None: """Register that entry is used as a type Args: as_type: Name of type entry is used as. """ self._used_as.add(as_type) def __add__(self, other: "ConfigurationEntry") -> "ConfigurationEntry": if isinstance(other, self.__class__): if self.source == other.source: source = f"{self.source} (+)" else: source = f"{self.source} + {other.source}" return self.__class__( key=f"{self._key} + {other._key}", value=self.str + other.str, source=source, vars_dict=self._vars_dict ) else: return NotImplemented def __bool__(self) -> builtins.bool: """A ConfigurationEntry is truthy if the value is not empty""" return bool(self._value) def __str__(self) -> builtins.str: """The configuration entry represented as a string""" return self.entry_as_str() def __repr__(self) -> builtins.str: """A simple string representation of the configuration entry""" return f"{self.__class__.__name__}(key='{self._key}', value='{self._value}')" def _replace(string: str, replace_vars: Dict[str, str], default: Optional[str] = None) -> str: """Replace format style variables in a string Handles nested replacements by first replacing the replace_vars. Format specifiers (after colon, :) are allowed, but can not contain nested format strings. This function is used instead of str.format for three reasons. It handles: - that not all pairs of {...} are replaced at once - optional default values for variables that are not specified - nested replacements where values of replace_vars may be replaced themselves Args: string: Original string replace_vars: Variables that can be replaced default: Optional default value used for variables that are not in replace_vars. """ matches = re.finditer(r"\{(\w+)(:[^\{\}]*)?\}", string) for match in matches: var = match.group(1) var_expr = match.string[slice(*match.span())] replacement = replace_vars.get(var) if replacement is None: replacement = var_expr if default is None else default # Default replacements else: replacement = _replace(replacement, replace_vars, default) # Nested replacements # Use str.format to handle format specifiers string = string.replace(var_expr, var_expr.format(**{var: replacement})) return string PKOnANmidgard/config/constant.pyPKFDNb1B1Bmidgard/config/files.py"""Midgard library module for opening files based on a special configuration Example: -------- from midgard.config import files with files.open('eopc04_iau', mode='rt') as fid: for line in fid: print(line.strip()) Description: ------------ This module handles opening of files registered in a special configuration, typically a configuration file. 
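
# A minimal sketch of how _replace handles nesting and defaults (the
# variable names are illustrative only):
#
#     _replace("{path}/{name}", {"path": "{root}/work", "root": "/data"})
#     # -> '/data/work/{name}'   (unknown variables are left as-is)
#     _replace("{path}/{name}", {"path": "/data"}, default="unknown")
#     # -> '/data/unknown'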


# File: midgard/config/constant.py (empty)


# File: midgard/config/files.py

"""Midgard library module for opening files based on a special configuration

Example:
--------

    from midgard.config import files
    with files.open('eopc04_iau', mode='rt') as fid:
        for line in fid:
            print(line.strip())

Description:
------------

This module handles opening of files registered in a special configuration,
typically a configuration file.

The cfg.files.open and cfg.files.open_path methods are both wrappers around
the built-in open function, and behave largely the same way. In particular,
they accept all the same keyword arguments (like for instance mode).
Furthermore, to make sure files are properly closed they should normally be
used with a context manager as in the example above.
"""

# Standard library imports
import builtins
from contextlib import contextmanager
import itertools
import pathlib
import re
from typing import Any, Callable, Dict, Iterator, List, Optional

# Third party imports
import pycurl

# Midgard imports
from midgard.config.config import Configuration
from midgard.dev import console
from midgard.dev import log
from midgard.files import files
from midgard.files import url


class FileConfiguration(Configuration):
    """Configuration for handling files"""

    download_missing = True

    @contextmanager
    def open(
        self,
        file_key: str,
        file_vars: Dict[str, str] = None,
        create_dirs: bool = False,
        is_zipped: Optional[bool] = None,
        download_missing: bool = True,
        use_aliases: bool = True,
        **open_args: Any,
    ) -> Iterator:
        """Open a file based on information in a configuration

        Open a file based on a file key which is looked up in the
        configuration.

        The method automatically handles reading from gzipped files if the
        filename is specified with the special {gz}-ending (including the
        curly braces) in the file list. In that case, the mode should be
        specified to be 'rt' if the contents of the file should be treated as
        text. If both a zipped and an unzipped version is available, the
        zipped version is used. This can be overridden by specifying True or
        False for the is_zipped-parameter.

        This function behaves similar to the built-in open-function, and
        should typically be used with a context manager as follows:

        Example:

            with cfg.open('eopc04_iau', mode='rt') as fid:
                for line in fid:
                    print(line.strip())

        Args:
            file_key:          String that is looked up in the configuration file list.
            file_vars:         Dict, extra variables used to replace variables in file name and path.
            create_dirs:       True or False, if True missing directories are created.
            is_zipped:         None, True, or False. Whether the file is zipped. If None, this
                               is decided automatically.
            download_missing:  Whether to try to download missing files.
            use_aliases:       Fall back on aliases if file does not exist.
            open_args:         All keyword arguments are passed on to open_path.

        Returns:
            File object representing the file.
        """
        download_missing = download_missing and "r" in open_args.get("mode", "r")
        file_path = self.path(
            file_key, file_vars, is_zipped=is_zipped, download_missing=download_missing, use_aliases=use_aliases
        )
        if "encoding" not in open_args:
            file_encoding = self.get("encoding", section=file_key, default="").str
            open_args["encoding"] = file_encoding or None

        mode = open_args.get("mode", "r")
        _log_file_open(file_path, description=file_key, mode=mode)
        try:
            with files.open(file_path, create_dirs=create_dirs, open_as_gzip=is_zipped, **open_args) as fid:
                yield fid
        except Exception:
            raise

    def path(
        self,
        file_key: str,
        file_vars: Optional[Dict[str, str]] = None,
        default: Optional[str] = None,
        is_zipped: Optional[bool] = None,
        download_missing: bool = False,
        use_aliases: bool = True,
    ) -> pathlib.Path:
        """Construct a filepath for a given file with variables

        If `is_zipped` is None, and the file_path contains `{gz}`, the file
        will be assumed to be a gzip-file if there exists a file named `.gz`.

        When setting `use_aliases` to True, the aliases as specified in the
        files configuration file represent alternative filenames. In
        particular:

        - if directory / file_name exists it is returned
        - otherwise the first directory / alias that exists is returned
        - if none of these exist, directory / file_name is returned

        Args:
            file_key:          Key that is looked up in the configuration.
            file_vars:         Values used to replace variables in file name and path.
            default:           Value to use for variables that are not in file_vars.
            is_zipped:         True, False or None. If True, open with gzip. If None, decide automatically.
            download_missing:  Whether to try to download missing files.
            use_aliases:       Fall back on aliases if file does not exist.

        Returns:
            Full path with replaced variables in file name and path.
        """
        file_vars = dict() if file_vars is None else file_vars
        directory = self[file_key].directory.replace(default=default, **file_vars).path
        file_name = self[file_key].filename.replace(default=default, **file_vars).path
        file_path = self._replace_gz(directory / file_name, is_zipped)

        # Check for aliases
        if use_aliases and not self._path_exists(file_path):
            aliases = self.get("aliases", section=file_key, default="").replace(default=default, **file_vars).list
            for alias in aliases:
                aliased_path = self._replace_gz(file_path.with_name(alias), is_zipped)
                if self._path_exists(aliased_path):
                    return aliased_path

        # Try to download the file if it is missing
        if download_missing and not self._path_exists(file_path):
            self.download_file(file_key, file_vars, file_path, is_zipped=is_zipped)

        return file_path

    def url(self, file_key, file_vars=None, default=None, is_zipped=None, use_aliases=False):
        """Construct a URL for a given file with variables

        If `is_zipped` is None, and the url contains `{gz}`, the url will be
        assumed to point to a gzip-file if there exists a file named `.gz` on
        the server.

        Args:
            file_key:     Key that is looked up in the configuration.
            file_vars:    Values used to replace variables in file name and path.
            default:      Value to use for variables that are not in file_vars.
            is_zipped:    True, False or None. If True, open with gzip. If None, decide automatically.
            use_aliases:  Fall back on aliases if URL does not exist - may be slow.

        Returns:
            Full URL with replaced variables in file name and url.
        """
        file_vars = dict() if file_vars is None else file_vars
        base_url = self[file_key].url.replace(default=default, **file_vars).str.rstrip("/")
        file_name = self[file_key].filename.replace(default=default, **file_vars).str
        file_url = self._replace_gz(url.URL(f"{base_url}/{file_name}"), is_zipped=is_zipped)

        # Check for aliases
        if use_aliases and not file_url.exists():
            aliases = self.get("aliases", section=file_key, default="").replace(default=default, **file_vars).list
            for alias in aliases:
                aliased_url = self._replace_gz(file_url.with_name(alias), is_zipped=is_zipped)
                if aliased_url.exists():
                    return aliased_url

        return file_url

    @staticmethod
    def _replace_gz(file_path: pathlib.Path, is_zipped: Optional[bool] = None) -> pathlib.Path:
        """Replace the {gz} pattern with '.gz' or '' depending on whether the file is zipped

        If `is_zipped` is None, and the file_path contains `{gz}`, the file
        will be assumed to be a gzip-file if there exists a file named `.gz`.

        Args:
            file_path:  Path to a file.
            is_zipped:  True, False or None. If True, open with gzip. If None, decide automatically.

        Returns:
            File path with {gz} replaced.
        """
        if "{gz}" not in file_path.name:
            return file_path

        if is_zipped is None:
            is_zipped = file_path.with_name(file_path.name.replace("{gz}", ".gz")).exists()
        if is_zipped:
            return file_path.with_name(file_path.name.replace("{gz}", ".gz"))
        else:
            return file_path.with_name(file_path.name.replace("{gz}", ""))

    @classmethod
    def _empty_file(cls, file_path: pathlib.Path) -> bool:
        """Check if a file is empty

        Args:
            file_path:  Path to a file.

        Returns:
            Whether path is empty or not.
        """
        if not cls._path_exists(file_path):
            raise FileNotFoundError(f"File '{file_path}' does not exist")

        return not (file_path.stat().st_size > 0)

    @staticmethod
    def _path_exists(file_path: pathlib.Path) -> bool:
        """Check if a path exists

        Unfortunately, Windows throws an error when doing file_path.exists()
        if the file path contains wildcard characters like *. Thus, we wrap
        this in a check on whether the file path syntax is correct before
        calling file_path.exists. If the file path contains non-path
        characters, the file path can not exist.

        Args:
            file_path:  Path to a file.

        Returns:
            Whether path exists or not.
        """
        try:
            return file_path.exists()
        except OSError:
            return False

    @staticmethod
    def is_path_zipped(file_path):
        """Indicate whether a path is to a gzipped file or not

        For now, this simply checks whether the path ends in .gz or not.

        Args:
            file_path:  Path to a file.

        Returns:
            True if path is to a gzipped file, False otherwise.
        """
        try:
            file_name = file_path.name  # Assume file_path is Path-object
        except AttributeError:
            file_name = file_path  # Fall back to file_path being string
        return file_name.endswith(".gz")

    def encoding(self, file_key):
        """Look up the encoding for a given file key

        Args:
            file_key:  Key that is looked up in the configuration.

        Returns:
            Name of encoding. If encoding is not specified None is returned.
        """
        file_encoding = self.get("encoding", section=file_key, default="").str
        return file_encoding or None

    def download_file(
        self,
        file_key: str,
        file_vars: Optional[Dict[str, str]] = None,
        file_path: Optional[pathlib.Path] = None,
        create_dirs: bool = True,
        **path_args: Any,
    ) -> Optional[pathlib.Path]:
        """Download a file from the web and save it to disk

        Use pycurl (libcurl) to do the actual downloading. Requests might be
        nicer for this, but turned out to be much slower (and in practice
        unusable for bigger files) and also not really supporting
        ftp-downloads.

        Args:
            file_key:     File key that should be downloaded.
            file_vars:    File variables used to find path from file_key.
            file_path:    Path where file will be saved, default is to read from configuration.
            create_dirs:  Create directories as necessary before downloading file.
            path_args:    Arguments passed on to .path() to find file_path.

        Returns:
            Path to downloaded file, None if no file was downloaded.
        """
        # Do not download anything if download_missing class variable is False
        if not self.download_missing:
            return None

        # Do not download anything if url is not given in configuration
        if "url" not in self[file_key] or not self[file_key].url.str:
            return None

        # Get file_path from configuration if it's not given explicitly
        file_url = self.url(file_key, file_vars=file_vars, **path_args)
        if file_path is None:
            file_path = self.path(file_key, file_vars=file_vars, download_missing=False, **path_args)
            file_path = file_path.with_name(file_url.name)
        if create_dirs:
            file_path.parent.mkdir(parents=True, exist_ok=True)

        log.info(f"Download {file_key} from '{file_url}' to '{file_path}'")
        with builtins.open(file_path, mode="wb") as fid:
            c = pycurl.Curl()
            c.setopt(c.URL, file_url)
            c.setopt(c.WRITEDATA, fid)
            try:
                c.perform()
                if not (200 <= c.getinfo(c.HTTP_CODE) <= 299):
                    raise pycurl.error()
            except pycurl.error:
                log.error(f"Problem downloading file: {c.getinfo(c.EFFECTIVE_URL)} ({c.getinfo(c.HTTP_CODE)})")
                if file_path.exists():  # Print first 10 lines to console
                    head_of_file = f"Contents of '{file_path}':\n" + "\n".join(file_path.read_text().split("\n")[:10])
                    log.info(console.indent(head_of_file, num_spaces=8))
                    file_path.unlink()
                log.warn(f"Try to download '{file_url}' manually and save it at '{file_path}'")
            else:
                log.info(f"Done downloading {file_key}")
            finally:
                c.close()
        return file_path

    def glob_paths(
        self, file_key: str, file_vars: Optional[Dict[str, str]] = None, is_zipped: Optional[bool] = None
    ) -> List[pathlib.Path]:
        """Find all filepaths matching a filename pattern

        Using pathlib.Path.glob() here is not trivial because we need to
        split into a base directory to start searching from and a pattern
        which may include directories. With glob.glob() this is trivial. The
        downside is that it only returns strings and not pathlib.Paths.
        """
        path_string = str(self.path(file_key, file_vars, default="*", is_zipped=is_zipped))
        glob_path = pathlib.Path(re.sub(r"\*+", "*", path_string))
        idx = min((i for i, p in enumerate(glob_path.parts) if "*" in p), default=len(glob_path.parts) - 1)
        glob_base = pathlib.Path(*glob_path.parts[:idx])
        glob_pattern = str(pathlib.Path(*glob_path.parts[idx:]))
        return list(glob_base.glob(glob_pattern))

    def glob_variable(self, file_key, variable, pattern, file_vars=None):
        """Find all possible values of variable"""
        # Find available paths
        file_vars = dict() if file_vars is None else dict(file_vars)
        file_vars[variable] = "*"
        search_paths = self.glob_paths(file_key, file_vars)

        # Set up the regular expression
        re_vars = {**file_vars, variable: f"(?P<{variable}>__pattern__)"}
        path_pattern = str(self.path(file_key, file_vars=re_vars, default=".*")).replace("\\", "\\\\")
        for i in itertools.count():
            # Give unique names to each occurrence of variable
            path_pattern = path_pattern.replace(f"<{variable}>", f"<{variable}__{i}>", 1)
            if f"<{variable}>" not in path_pattern:
                break
        re_pattern = re.compile(path_pattern.replace("__pattern__", pattern))

        # Find each match
        values = set()
        for search_path in search_paths:
            match = re_pattern.search(str(search_path))
            if match:
                matches = set(match.groupdict().values())
                if len(matches) > 1:
                    log.warn(f"Found multiple values for {variable!r} in {search_path}: {', '.join(matches)}")
                values |= matches

        return values

def _log_file_open(file_path, description="", mode="r"):
    """Write a message to the log about a file being opened

    Args:
        file_path:    The path to file being opened.
        description:  Description used for logging.
        mode:         Same as for the built-in open, usually 'r' or 'w'.
    """
    # Add space at end to handle empty descriptions
    description += " " if description else ""

    # Pick a log message based on the mode being used to open the file
    log_text = f"Read {description}from {file_path}"
    if "w" in mode:
        log_text = f"Write {description}to {file_path}"
        if file_path.is_file():
            log_text = f"Overwrite {description}on {file_path}"
    if "a" in mode:
        log_text = f"Append {description}to {file_path}"
    log.debug(log_text)


# File: midgard/config/unit.py (empty)
# File: midgard/coords/__init__.py (empty)
# File: midgard/coords/position.py (empty)
# File: midgard/coords/time.py (empty)


# File: midgard/data/_h5utils.py

"""Simple utilities used by Dataset when dealing with HDF5 files"""

# Standard library imports
from typing import Dict, List, Sequence, Tuple


def sequence2h5attr(lst: Sequence[str]) -> str:
    """Convert a list to a string that can be stored as an HDF5 attribute"""
    return ",".join(str(s) for s in lst)


def h5attr2list(attr: str) -> List[str]:
    """Convert an HDF5 attribute to a list of strings"""
    return attr.split(",")


def h5attr2tuple(attr: str) -> Tuple[str, ...]:
    """Convert an HDF5 attribute to a tuple of strings"""
    return tuple(attr.split(","))


def dict2h5attr(dct: Dict[str, str]) -> str:
    """Convert a dictionary to a string that can be stored as an HDF5 attribute"""
    return ",".join(f"{k}:{v}" for k, v in dct.items())


def h5attr2dict(attr: str) -> Dict[str, str]:
    """Convert an HDF5 attribute to a dictionary of strings"""
    return dict(a.partition(":")[::2] for a in attr.split(","))
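
# A minimal sketch of round-tripping values through HDF5 attribute strings:
#
#     attr = dict2h5attr({"x": "1", "y": "2"})    # -> 'x:1,y:2'
#     h5attr2dict(attr)                           # -> {'x': '1', 'y': '2'}
#     h5attr2list(sequence2h5attr(["a", "b"]))    # -> ['a', 'b']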


# File: midgard/data/_position.py

"""Module for dealing with positions, velocities and position corrections in different coordinate systems"""

# Standard library imports
from typing import Any, Callable, Dict, List, Tuple
import copy

# Third party imports
import numpy as np

# Midgard imports
from midgard.dev import exceptions
from midgard.math.constant import constant
from midgard.math import rotation
from midgard.math import ellipsoid

_ATTRIBUTES: Dict[str, Dict[str, Callable]] = dict()  # Populated by register_attribute()
_FIELDS: Dict[str, List[str]] = dict()  # Populated by register_field()
_UNITS: Dict[str, Dict[str, str]] = dict()  # Populated by register_field()
_SYSTEMS: Dict[str, Dict[str, Callable]] = dict()  # Populated by register_system()
_CONVERSIONS: Dict[str, Dict[Tuple[str, str], Callable]] = dict()  # Populated by register_system()
_CONVERSION_HOPS: Dict[str, Dict[Tuple[str, str], List[str]]] = dict()  # Cache for to_system()


def register_attribute(cls: Callable, name: str, attr_cls: Callable) -> None:
    """Function used to register new attributes on position arrays

    The registered attributes will be available as attributes on
    PositionArray and its subclasses. In addition, each attribute can be
    given as a parameter when creating a PositionArray.

    The reason for using this register-function instead of a regular
    attribute is to allow additional attributes to be added on all position
    systems.

    Args:
        cls:       Name of class to register the attribute for.
        name:      Name of attribute.
        attr_cls:  Class of attribute.
    """
    _ATTRIBUTES.setdefault(cls.__name__, dict()).setdefault(name, attr_cls)


def register_field(units: List[str]) -> Callable:
    """Decorator used to register fields and their units"""

    def wrapper(func: Callable) -> Callable:
        field = func.__name__
        func_class = func.__qualname__.split(".")[0]
        _FIELDS.setdefault(func_class, list()).append(field)
        _UNITS.setdefault(func_class, dict())[field] = units
        return func

    return wrapper


def register_system(
    convert_to: Dict[str, Callable] = None, convert_from: Dict[str, Callable] = None
) -> Callable[[Callable], Callable]:
    """Decorator used to register new position systems

    The system name is read from the .system attribute of the Position class.

    Args:
        convert_to:    Functions used to convert to other systems.
        convert_from:  Functions used to convert from other systems.

    Returns:
        Decorator registering system.
    """

    def wrapper(cls: Callable) -> Callable:
        name = cls.system
        _SYSTEMS.setdefault(cls.cls_name, dict())[name] = cls

        if convert_to:
            for to_system, converter in convert_to.items():
                _CONVERSIONS.setdefault(cls.cls_name, dict())[(name, to_system)] = converter
        if convert_from:
            for from_system, converter in convert_from.items():
                _CONVERSIONS.setdefault(cls.cls_name, dict())[(from_system, name)] = converter
        return cls

    return wrapper


def _find_conversion_hops(cls: str, hop: Tuple[str, str]) -> List[Tuple[str, str]]:
    """Calculate the hops needed to convert between systems using breadth first search"""
    start_sys, target_sys = hop
    queue = [(start_sys, [])]
    visited = set()

    while queue:
        from_sys, hops = queue.pop(0)
        for to_sys in [t for f, t in _CONVERSIONS[cls] if f == from_sys]:
            one_hop = (from_sys, to_sys)
            if to_sys == target_sys:
                return hops + [one_hop]
            if one_hop not in visited:
                visited.add(one_hop)
                queue.append((to_sys, hops + [one_hop]))

    raise exceptions.UnknownConversionError(f"Can't convert PositionArray from {start_sys!r} to {target_sys!r}")


class PosBase(np.ndarray):
    """Base class for the various position and velocity arrays"""

    system = None
    column_names = None
    _units = None

    @property
    def val(self):
        """Position as a plain numpy array"""
        return np.asarray(self)

    @property
    def mat(self):
        """Position as an array of matrices

        Adds an extra dimension, so that matrix multiplication can be
        performed effectively. By default .mat returns an array of column
        vectors. To effectively access row vectors instead, operate on a
        transposed delta:

        Example:

            >>> pos  # shape (2, 3)
            LlhPosition([[0., 0., 100.],
                         [1., 1., 0.]])

            >>> pos.mat  # Column vectors, shape (2, 3, 1)
            array([[[  0.],
                    [  0.],
                    [100.]],

                   [[  1.],
                    [  1.],
                    [  0.]]])

            >>> pos.T.mat  # Row vectors, shape (2, 1, 3)
            array([[[0., 0., 100.]],

                   [[1., 1., 0.]]])
        """
        is_transposed = self.flags.f_contiguous  # Because we forced order == "C" on creation
        if is_transposed:
            return self.val[:, None, :].T
        else:
            return self.val[:, :, None]

    def to_system(self, system: str) -> "PosDeltaBase":
        """Convert to a different system

        Args:
            system:  Name of new system.

        Returns:
            PosDeltaBase representing the same positions or position deltas in the new system.
        """
        # Don't convert if not necessary
        if system == self.system:
            return self

        # Raise error for unknown systems
        if system not in _SYSTEMS[self.cls_name]:
            systems = ", ".join(_SYSTEMS[self.cls_name])
            raise exceptions.UnknownSystemError(f"System {system!r} unknown. Use one of {systems}")

        # Convert to new system
        hop = (self.system, system)
        if hop in _CONVERSIONS[self.cls_name]:
            return _SYSTEMS[self.cls_name][system].convert_to(self, _CONVERSIONS[self.cls_name][hop])

        if hop not in _CONVERSION_HOPS.setdefault(self.cls_name, {}):
            _CONVERSION_HOPS[self.cls_name][hop] = _find_conversion_hops(self.cls_name, hop)

        val = self
        for one_hop in _CONVERSION_HOPS[self.cls_name][hop]:
            val = _SYSTEMS[self.cls_name][one_hop[-1]].convert_to(val, _CONVERSIONS[self.cls_name][one_hop])
        return val

    @classmethod
    def unit(cls, field: str = "") -> Tuple[str, ...]:
        """Unit of field"""
        mainfield, _, subfield = field.partition(".")

        # Units of position array
        if not field:
            return cls._units
        # Unit of columns in position array
        elif field in cls.column_names:
            return (cls._units[cls.column_names.index(field)],)
        # Units of properties
        elif field in _UNITS.get(cls.cls_name, {}):
            return _UNITS[cls.cls_name][field]
        # Units of systems
        elif mainfield in _SYSTEMS[cls.cls_name]:
            return _SYSTEMS[cls.cls_name][mainfield].unit(subfield)
        else:
            raise exceptions.FieldDoesNotExistError(f"Field {mainfield!r} does not exist") from None

    @classmethod
    def _fieldnames(cls):
        return (
            list(cls._attributes().keys())
            + _FIELDS.get(cls.cls_name, [])
            + [f"{s}.{c}" for s, sc in _SYSTEMS[cls.cls_name].items() for c in sc.column_names]
        )

    @classmethod
    def _attributes(cls):
        return _ATTRIBUTES.get(cls.cls_name, {})

    def __getattr__(self, key):
        """Get attributes with dot notation

        Adds systems and column names as attributes on Position and
        PositionDelta arrays.

        Args:
            key:  Name of attribute.

        Returns:
            Value of attribute.
        """
        # Convert to a different system
        if key in _SYSTEMS[self.cls_name]:
            return self.to_system(key)

        # Return one column as a regular numpy array
        elif key in self.column_names:
            idx = self.column_names.index(key)
            return self.val[:, idx]

        # Raise error for unknown attributes
        else:
            raise AttributeError(f"{type(self).__name__!r} has no attribute {key!r}") from None

    def __dir__(self):
        """List all fields and attributes on the PosDeltaBase array"""
        return super().__dir__() + list(_SYSTEMS[self.cls_name]) + list(self.column_names)

    def __iadd__(self, other):
        """self += other"""
        return self.__add__(other)

    def __isub__(self, other):
        """self -= other"""
        return self.__sub__(other)

    # For matrix-multiplication use trs.mat @ numbers[slice]
    def __matmul__(self, _):
        """self @ _"""
        return NotImplemented

    def __rmatmul__(self, _):
        """_ @ self"""
        return NotImplemented

    def __imatmul__(self, _):
        """self @= _"""
        return NotImplemented

    # For other mathematical operations use pos.val
    def __mul__(self, _):
        """self * _"""
        return NotImplemented

    def __rmul__(self, _):
        """_ * self"""
        return NotImplemented

    def __imul__(self, _):
        """self *= _"""
        return NotImplemented

    def __truediv__(self, _):
        """self / _"""
        return NotImplemented

    def __rtruediv__(self, _):
        """_ / self"""
        return NotImplemented

    def __itruediv__(self, _):
        """self /= _"""
        return NotImplemented

    def __floordiv__(self, _):
        """self // _"""
        return NotImplemented

    def __rfloordiv__(self, _):
        """_ // self"""
        return NotImplemented

    def __ifloordiv__(self, _):
        """self //= _"""
        return NotImplemented

    def __pow__(self, _):
        """self ** _"""
        return NotImplemented

    def __rpow__(self, _):
        """_ ** self"""
        return NotImplemented

    def __ipow__(self, _):
        """self **= _"""
        return NotImplemented
""" cls_name = "PositionArray" def __new__(cls, val, ellipsoid=ellipsoid.GRS80, **pos_args): """Create a new Position""" if cls.system is None or cls.column_names is None: raise ValueError( f"{cls.cls_name} cannot be instantiated. Use {cls.cls_name.strip('Array')}(val=..., system={cls.system!r}) instead" ) obj = np.asarray(val, dtype=float, order="C").view(cls) obj.system = cls.system obj.ellipsoid = ellipsoid for attr in cls._attributes().keys(): setattr(obj, attr, pos_args.get(attr)) return obj def __array_finalize__(self, obj): """Called automatically when a new Position is created""" if obj is None: return # Validate shape num_columns = len(self.column_names) if self.shape[-1] != num_columns: column_names = ", ".join(self.column_names) raise ValueError(f"{type(self).__name__!r} requires {num_columns} columns: {column_names}") if self.ndim == 1: self.resize(1, num_columns) elif self.ndim != 2: raise ValueError(f"{type(self).__name__!r} must be a 1- or 2-dimensional array with {num_columns} columns") # Copy attributes from the original object self.system = getattr(obj, "system", None) self.ellipsoid = getattr(obj, "ellipsoid", ellipsoid.GRS80) for attr in self._attributes().keys(): setattr(self, attr, getattr(obj, attr, None)) @staticmethod def create(val: np.ndarray, system: str, **pos_args: Any) -> "PositionArray": """Factory for creating PositionArrays for different systems See each position class for exact optional parameters. Args: val: Array of position values. system: Name of position system. pos_args: Additional arguments used to create the PositionArray. Returns: Array with positions in the given system. """ if system not in _SYSTEMS["PositionArray"]: systems = ", ".join(_SYSTEMS["PositionArray"]) raise exceptions.UnknownSystemError(f"System {system!r} unknown. Use one of {systems}") return _SYSTEMS["PositionArray"][system](val, **pos_args) @classmethod def from_position(cls, val: np.ndarray, other: "PositionArray") -> "PositionArray": """Create a new position with given values and same attributes as other position Factory method for creating a new position array with the given values. Attributes will be copied from the other position. 
""" attrs = {a: getattr(other, a, None) for a in cls._attributes().keys()} return _SYSTEMS[cls.cls_name][other.system](val, **attrs) @classmethod def convert_to(cls, pos: "PositionArray", converter: Callable) -> "PositionArray": """Convert the position to the type of this class Applies the converter function that is provides and copies all registered attributes to the new position """ attrs = {a: getattr(pos, a, None) for a in cls._attributes().keys()} return _SYSTEMS[cls.cls_name][cls.system](converter(pos), **attrs) @property def pos(self): """Allows base classes to implement this attribute""" return self @property @register_field(units=("unitless", "unitless", "unitless")) def enu2trs(self): lat, lon, _ = self.pos.llh.val.T return rotation.enu2trs(lat, lon) @property @register_field(units=("unitless", "unitless", "unitless")) def trs2enu(self): lat, lon, _ = self.pos.llh.val.T return rotation.enu2trs(lat, lon) @property @register_field(units=("unitless", "unitless", "unitless")) def enu_east(self): """ Unit vector for in the east direction""" return self.enu2trs[:, :, 0:1] @property @register_field(units=("unitless", "unitless", "unitless")) def enu_north(self): """ Unit vector for in the north direction""" return self.enu2trs[:, :, 1:2] @property @register_field(units=("unitless", "unitless", "unitless")) def enu_up(self): """ Unit vector for in the up direction""" return self.enu2trs[:, :, 2:3] def vector_to(self, other: "PositionArray") -> np.ndarray: """Vector to other positions in current system Args: other: Other position array. Returns: Array of vectors to other position array. """ return other.pos.to_system(self.system).val - self.pos.val @property @register_field(units=("meter", "meter", "meter")) def vector(self): """Vector to registered other position""" if self.other is None: raise exceptions.InitializationError("Other position is not defined") return self.vector_to(self.other) def distance_to(self, other): """Distance to other positions in current system Args: other: Other position array Returns: Array of distances to other position array. 
""" return np.linalg.norm(self.vector_to(other), axis=1) @property @register_field(units=("meter",)) def distance(self): """Distance to registered other position""" if self.other is None: raise exceptions.InitializationError("Other position is not defined") return self.distance_to(self.other) def direction_to(self, other): try: return self.vector_to(other) / self.distance_to(other)[:, None] except AttributeError: return other.direction_from(self) @property @register_field(units=("unitless", "unitless", "unitless")) def direction(self): if self.other is None: raise exceptions.InitializationError("Other position is not defined") return self.direction_to(self.other) def aberrated_direction_to(self, other): return other.aberrated_direction_from(self) @property @register_field(units=("unitless", "unitless", "unitless")) def aberrated_direction(self): if self.other is None: raise exceptions.InitializationError("Other position is not defined") return self.aberrated_direction_to(self.other) def aberrated_direction_from(self, other): """Calculate aberrated direction vector from earth based position""" return other.direction_to(self - other.aberration_to(self)) # Todo: Check + or - def aberration_to(self, other): flight_time = self.distance_to(other) / constant.c rotation_angle = flight_time * constant.omega return (rotation.R3(rotation_angle) @ other.mat)[:, :, 0] - other.val @property @register_field(units=("meter", "meter", "meter")) def aberration(self): if self.other is None: raise exceptions.InitializationError("Other position is not defined") return self.aberration_to(self.other) def azimuth_to(self, other): east_proj = (self.trs.direction_to(other)[:, None, :] @ self.enu_east)[:, 0, 0] north_proj = (self.trs.direction_to(other)[:, None, :] @ self.enu_north)[:, 0, 0] return np.arctan2(east_proj, north_proj) @property @register_field(units=("radians",)) def azimuth(self): if self.other is None: raise exceptions.InitializationError("Other position is not defined") return self.azimuth_to(self.other) def aberrated_azimuth_to(self, other): east_proj = (self.trs.aberrated_direction_to(other)[:, None, :] @ self.enu_east)[:, 0, 0] north_proj = (self.trs.aberrated_direction_to(other)[:, None, :] @ self.enu_north)[:, 0, 0] return np.arctan2(east_proj, north_proj) @property @register_field(units=("radians",)) def aberrated_azimuth(self): if self.other is None: raise exceptions.InitializationError("Other position is not defined") return self.aberrated_azimuth_to(self.other) def elevation_to(self, other): up_proj = (self.trs.direction_to(other)[:, None, :] @ self.enu_up)[:, 0, 0] return np.arcsin(up_proj) @property @register_field(units=("radians",)) def elevation(self): if self.other is None: raise exceptions.InitializationError("Other position is not defined") return self.elevation_to(self.other) def aberrated_elevation_to(self, other): up_proj = (self.trs.aberrated_direction_to(other)[:, None, :] @ self.enu_up)[:, 0, 0] return np.arcsin(up_proj) @property @register_field(units=("radians",)) def aberrated_elevation(self): if self.other is None: raise exceptions.InitializationError("Other position is not defined") return self.aberrated_elevation_to(self.other) def zenith_distance_to(self, other): return np.pi / 2 - self.elevation_to(other) @property @register_field(units=("radians",)) def zenith_distance(self): return np.pi / 2 - self.elevation def aberrated_zenith_distance_to(self, other): return np.pi / 2 - self.aberrated_elevation_to(other) @property @register_field(units=("radians",)) def 
aberrated_zenith_distance(self): if self.other is None: raise exceptions.InitializationError("Other position is not defined") return self.aberrated_zenith_distance_to(self.other) def __add__(self, other): """self + other""" if isinstance(other, PositionDeltaArray): if self.system != other.system: return NotImplemented return self.from_position(val=self.val + other.val, other=self) elif isinstance(other, PositionArray): # pos1 + pos2 does not make sense return NotImplemented return NotImplemented def __radd__(self, other): """other + self""" if isinstance(other, PositionDeltaArray): if self.system != other.system: return NotImplemented elif isinstance(other, PositionArray): # pos1 + pos2 does not make sense return NotImplemented return NotImplemented def __sub__(self, other): """self - other""" if isinstance(other, PositionArray): if self.system != other.system: return NotImplemented try: return PositionDeltaArray.from_position(val=self.val - other.val, other=self) except KeyError: return NotImplemented elif isinstance(other, PositionDeltaArray): if self.system != other.system: return NotImplemented return self.from_position(val=self.val - other.val, other=self) return NotImplemented def __rsub__(self, other): """other - self""" if isinstance(other, PositionArray): if self.system != other.system: return NotImplemented elif isinstance(other, PositionDeltaArray): if self.system != other.system: return NotImplemented return NotImplemented def __deepcopy__(self, memo): """Deep copy a PositionArray Makes sure references to other objects are updated correctly """ attrs = {a: copy.deepcopy(getattr(self, a, None), memo) for a in self._attributes().keys()} new_pos = PositionArray.create(val=np.asarray(self).copy(), system=self.system, **attrs) memo[id(self)] = new_pos return new_pos @classmethod def _read(cls, h5_group, memo): system = h5_group.attrs["system"] ellipsoid_ = ellipsoid.get(h5_group.attrs["ellipsoid"]) pos_args = {} for a, attr_cls in PositionArray._attributes().items(): if a in h5_group.attrs: # attribute is a reference to the data of another field pos_args.update({a: memo[h5_group.attrs[a]]}) elif a in h5_group and isinstance(h5_group[a], type(h5_group)): # attribute is a part of this group and is not in a separate field arg = attr_cls._read(h5_group[a], memo) pos_args.update({a: arg}) memo[f"{h5_group.attrs['fieldname']}.{a}"] = arg val = h5_group[h5_group.attrs["fieldname"]][...] return cls.create(val, system=system, ellipsoid=ellipsoid_, **pos_args) def _write(self, h5_group, memo): h5_group.attrs["system"] = self.system h5_group.attrs["ellipsoid"] = self.ellipsoid.name h5_field = h5_group.create_dataset(h5_group.attrs["fieldname"], self.shape, dtype=self.dtype) h5_field[...] = self.val for a in PositionArray._attributes().keys(): attr = getattr(self, a, None) if attr is None: continue if id(attr) in memo: # attribute is a reference to the data of another field h5_group.attrs[a] = memo[id(attr)] else: # attribute is stored as part of this in PositionArray h5_sub_group = h5_group.create_group(a) h5_sub_group.attrs["fieldname"] = a memo[id(attr)] = f"{h5_group.attrs['fieldname']}.{a}" attr._write(h5_sub_group, memo) # Potential recursive call class PositionDeltaArray(PosBase): """Base class for position deltas This PositionDeltaArray should not be instantiated. Instead instantiate one of the system specific subclasses, typically using the PositionDelta factory function. 
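Example (illustrative sketch; a position delta typically arises from subtracting two PositionArrays in the same system): >>> delta = pos1 - pos2 # doctest: +SKIP >>> delta.ref_pos is pos1 # doctest: +SKIP True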
""" system = None column_names = None cls_name = "PositionDeltaArray" def __new__(cls, val, ref_pos, **delta_args): """Create a new Positiondelta""" if cls.system is None or cls.column_names is None: raise ValueError( f"{cls.cls_name} cannot be instantiated. Use {cls.cls_name.strip('Array')}(val=..., system={cls.system!r}) instead" ) obj = np.asarray(val, dtype=float, order="C").view(cls) obj.system = cls.system obj.ref_pos = ref_pos for attr in cls._attributes().keys(): setattr(obj, attr, delta_args.get(attr)) return obj def __array_finalize__(self, obj): """Called automatically when a new PositionDelta is created""" if obj is None: return # Validate shape num_columns = len(self.column_names) if self.shape[-1] != num_columns: column_names = ", ".join(self.column_names) raise ValueError(f"{type(self).__name__!r} requires {num_columns} columns: {column_names}") if self.ndim == 1: self.resize(1, num_columns) elif self.ndim != 2: raise ValueError(f"{type(self).__name__!r} must be a 1- or 2-dimensional array with {num_columns} columns") # Copy attributes from the original object self.system = getattr(obj, "system", None) self.ref_pos = getattr(obj, "ref_pos", None) @staticmethod def create(val: np.ndarray, system: str, ref_pos, **delta_args: Any) -> "PositionDeltaArray": """Factory for creating PositionDeltaArrays for different systems See each position delta class for exact optional parameters. Args: val: Array of position deltas. system: Name of position system. ref_pos: Array of reference positions. delta_args: Additional arguments used to create the PositionDeltaArray. Returns: Array with positions in the given system. """ if system not in _SYSTEMS["PositionDeltaArray"]: systems = ", ".join(_SYSTEMS["PositionDeltaArray"]) raise exceptions.UnknownSystemError(f"System {system!r} unknown. Use one of {systems}") return _SYSTEMS["PositionDeltaArray"][system](val, ref_pos, **delta_args) @classmethod def from_position(cls, val: np.ndarray, other: "PositionArray") -> "PositionDeltaArray": """Create a new position delta with given values and same attributes as other position Factory method for creating a new position array with the given values. Attributes will be copied from the other position. """ attrs = {a: getattr(other, a, None) for a in cls._attributes().keys()} return _SYSTEMS[cls.cls_name][other.system](val, ref_pos=other, **attrs) @classmethod def from_position_delta(cls, val: np.ndarray, other: "PositionDeltaArray") -> "PositionDeltaArray": """Create a new position with given values and same attributes as other position Factory method for creating a new position array with the given values. Attributes will be copied from the other position. 
""" attrs = {a: getattr(other, a, None) for a in cls._attributes().keys()} return _SYSTEMS[cls.cls_name][other.system](val, ref_pos=other.ref_pos, **attrs) @classmethod def convert_to(cls, pos_delta: "PositionDeltaArray", converter: Callable) -> "PositionDeltaArray": """Convert the position delta to the type of this class Applies the converter function that is provides and copies all registered attributes to the new position delta """ attrs = {a: getattr(pos_delta, a, None) for a in cls._attributes().keys()} return _SYSTEMS[cls.cls_name][cls.system](converter(pos_delta), ref_pos=pos_delta.ref_pos, **attrs) @property def pos(self): """Allows base classes to implement this attribute""" return self def __add__(self, other): """self + other""" if isinstance(other, PositionDeltaArray): if self.system != other.system: return NotImplemented return other.from_position_delta(val=self.val + other.val, other=self) elif isinstance(other, PositionArray): if self.system != other.system: return NotImplemented return other.from_position(val=self.val + other.val, other=other) return NotImplemented def __radd__(self, other): """other + self""" if isinstance(other, PositionArray): if self.system != other.system: return NotImplemented elif isinstance(other, PositionDeltaArray): if self.system != other.system: return NotImplemented return NotImplemented def __sub__(self, other): """self - other""" if isinstance(other, PositionDeltaArray): if self.system != other.system: return NotImplemented return other.from_position_delta(val=self.val - other.val, other=self) elif isinstance(other, PositionArray): if self.system != other.system: return NotImplemented return other.from_position(val=self.val - other.val, other=other) return NotImplemented def __rsub__(self, other): """other - self""" if isinstance(other, PositionArray): if self.system != other.system: return NotImplemented elif isinstance(other, PositionDeltaArray): if self.system != other.system: return NotImplemented return NotImplemented def __deepcopy__(self, memo): """Deep copy a PositionDeltaArray Makes sure references to other objects are updated correctly """ attrs = {a: copy.deepcopy(getattr(self, a, None), memo) for a in list(self._attributes().keys()) + ["ref_pos"]} new_pos = PositionDeltaArray.create(val=np.asarray(self).copy(), system=self.system, **attrs) memo[id(self)] = new_pos return new_pos @classmethod def _read(cls, h5_group, memo): system = h5_group.attrs["system"] cls_args = {"ref_pos": PositionArray} delta_args = {} for a, attr_cls in {**PositionDeltaArray._attributes(), **cls_args}.items(): if a in h5_group.attrs: # attribute is a reference to the data of another field delta_args.update({a: memo[h5_group.attrs[a]]}) elif a in h5_group and isinstance(h5_group[a], type(h5_group)): # attribute is a part of this group and is not in a separate field arg = attr_cls._read(h5_group[a], memo) delta_args.update({a: arg}) memo[f"{h5_group.attrs['fieldname']}.{a}"] = arg val = h5_group[h5_group.attrs["fieldname"]][...] return cls.create(val, system=system, **delta_args) def _write(self, h5_group, memo): h5_group.attrs["system"] = self.system h5_field = h5_group.create_dataset(h5_group.attrs["fieldname"], self.shape, dtype=self.dtype) h5_field[...] 
= self.val cls_args = {"ref_pos": PositionArray} for a in {**PositionDeltaArray._attributes(), **cls_args}.keys(): attr = getattr(self, a, None) if attr is None: continue if id(attr) in memo: # attribute is a reference to the data of another field h5_group.attrs[a] = memo[id(attr)] else: # attribute is stored as part of this in PositionArray h5_sub_group = h5_group.create_group(a) h5_sub_group.attrs["fieldname"] = a memo[id(attr)] = f"{h5_group.attrs['fieldname']}.{a}" attr._write(h5_sub_group, memo) # Potential recursive call class VelocityArray(PosBase): """Base class for Velocity arrays This VelocityArray should not be instantiated. Instead instantiate one of the system specific subclasses. The intended usage will be through a PosVelArray """ cls_name = "VelocityArray" def __new__(cls, val, ref_pos, **vel_args): """Create a new VelocityArray""" if cls.system is None or cls.column_names is None: raise ValueError( f"{cls.__name__} cannot be instantiated. Use VelocityArray(val=..., system={cls.system!r}) instead" ) obj = np.asarray(val, dtype=float, order="C").view(cls) obj.system = cls.system # Reference position needed to make conversions to other systems obj.ref_pos = ref_pos for attr in cls._attributes().keys(): setattr(obj, attr, vel_args.get(attr)) return obj def __array_finalize__(self, obj): """Called automatically when a new PositionDelta is created""" if obj is None: return # Validate shape num_columns = len(self.column_names) if self.shape[-1] != num_columns: column_names = ", ".join(self.column_names) raise ValueError(f"{type(self).__name__!r} requires {num_columns} columns: {column_names}") if self.ndim == 1: self.resize(1, num_columns) elif self.ndim != 2: raise ValueError(f"{type(self).__name__!r} must be a 1- or 2-dimensional array with {num_columns} columns") # Copy attributes from the original object self.system = getattr(obj, "system", None) self.ref_pos = getattr(obj, "ref_pos", None) @classmethod def convert_to(cls, vel: "VelocityArray", converter: Callable) -> "VelocityArray": """Convert the position to the type of this class Applies the converter function that is provides and copies all registered attributes to the new position """ attrs = {a: getattr(vel, a, None) for a in cls._attributes().keys()} return _SYSTEMS[cls.cls_name][cls.system](converter(vel), ref_pos=vel.ref_pos, **attrs) def __add__(self, _): """self + other""" return NotImplemented def __radd__(self, _): """other + self""" return NotImplemented def __sub__(self, _): """self - other""" return NotImplemented def __rsub__(self, _): """other - self""" return NotImplemented class VelocityDeltaArray(PosBase): """Base class for Velocity arrays This VelocityArray should not be instantiated. Instead instantiate one of the system specific subclasses. The intended usage will be through a PosVelArray """ cls_name = "VelocityDeltaArray" def __new__(cls, val, ref_pos, **vel_args): """Create a new VelocityArray""" if cls.system is None or cls.column_names is None: raise ValueError( f"{cls.__name__} cannot be instantiated. 
Use VelocityDelta(val=..., system={cls.system!r}) instead" ) obj = np.asarray(val, dtype=float, order="C").view(cls) obj.system = cls.system # Reference position needed to make conversions to other systems obj.ref_pos = ref_pos for attr in cls._attributes().keys(): setattr(obj, attr, vel_args.get(attr)) return obj def __array_finalize__(self, obj): """Called automatically when a new VelocityDelta is created""" if obj is None: return # Validate shape num_columns = len(self.column_names) if self.shape[-1] != num_columns: column_names = ", ".join(self.column_names) raise ValueError(f"{type(self).__name__!r} requires {num_columns} columns: {column_names}") if self.ndim == 1: self.resize(1, num_columns) elif self.ndim != 2: raise ValueError(f"{type(self).__name__!r} must be a 1- or 2-dimensional array with {num_columns} columns") # Copy attributes from the original object self.system = getattr(obj, "system", None) self.ref_pos = getattr(obj, "ref_pos", None) @classmethod def convert_to(cls, vel: "VelocityDeltaArray", converter: Callable) -> "VelocityDeltaArray": """Convert the velocity delta to the type of this class Applies the converter function that is provided and copies all registered attributes to the new velocity delta """ attrs = {a: getattr(vel, a, None) for a in cls._attributes().keys()} return _SYSTEMS[cls.cls_name][cls.system](converter(vel), ref_pos=vel.ref_pos, **attrs) def __add__(self, _): """self + other""" return NotImplemented def __radd__(self, _): """other + self""" return NotImplemented def __sub__(self, _): """self - other""" return NotImplemented def __rsub__(self, _): """other - self""" return NotImplemented class PosVelArray(PositionArray): """Base class for Position and Velocity arrays This PosVelArray should not be instantiated. Instead instantiate one of the system specific subclasses, typically using the Position factory function. """ cls_name = "PosVelArray" @staticmethod def create(val: np.ndarray, system: str, **pos_args: Any) -> "PosVelArray": """Factory for creating PosVelArrays for different systems See each position class for exact optional parameters. Args: val: Array of position and velocity values. system: Name of position system. pos_args: Additional arguments used to create the PosVelArray. Returns: Array with positions in the given system. """ if system not in _SYSTEMS["PosVelArray"]: systems = ", ".join(_SYSTEMS["PosVelArray"]) raise exceptions.UnknownSystemError(f"System {system!r} unknown. Use one of {systems}") return _SYSTEMS["PosVelArray"][system](val, **pos_args) @classmethod def from_posvel(cls, val: np.ndarray, other: "PosVelArray") -> "PosVelArray": """Create a new posvel object with given values and same attributes as other Factory method for creating an array with the given values. Attributes will be copied from the other position.
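Example (illustrative sketch; 'posvel' stands for an existing PosVelArray with six columns in a registered system): >>> new_posvel = PosVelArray.from_posvel(posvel.val, other=posvel) # doctest: +SKIP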
""" attrs = {a: getattr(other, a, None) for a in cls._attributes().keys()} return _SYSTEMS["PosVelArray"][other.system](val, **attrs) @classmethod def convert_to(cls, pos: "PosVelArray", converter: Callable) -> "PosVelArray": """Convert the position to the type of this class Applies the converter function that is provides and copies all registered attributes to the new position """ attrs = {a: getattr(pos, a, None) for a in cls._attributes().keys()} return _SYSTEMS["PosVelArray"][cls.system](converter(pos), **attrs) @property def pos(self): attrs = {a: getattr(self, a, None) for a in self._attributes().keys()} return _SYSTEMS["PositionArray"][self.system](self.val[:, 0:3], **attrs) @property def vel(self): attrs = {a: getattr(self, a, None) for a in self._attributes().keys()} return _SYSTEMS["VelocityArray"][self.system](self.val[:, 3:6], ref_pos=self.pos, **attrs) def __add__(self, other): """self + other""" if isinstance(other, PosVelDeltaArray): if self.system != other.system: return NotImplemented return self.from_position(val=self.val + other.val, other=self) elif isinstance(other, PosVelArray): # pos1 + pos2 does not make sense return NotImplemented return NotImplemented def __radd__(self, other): """other + self""" if isinstance(other, PosVelDeltaArray): if self.system != other.system: return NotImplemented elif isinstance(other, PosVelArray): # pos1 + pos2 does not make sense return NotImplemented return NotImplemented def __sub__(self, other): """self - other""" if isinstance(other, PosVelArray): if self.system != other.system: return NotImplemented try: return PosVelDeltaArray.from_position(val=self.val - other.val, other=self) except KeyError: return NotImplemented elif isinstance(other, PosVelDeltaArray): if self.system != other.system: return NotImplemented return self.from_position(val=self.val - other.val, other=self) return NotImplemented def __rsub__(self, other): """other - self""" if isinstance(other, PosVelArray): if self.system != other.system: return NotImplemented elif isinstance(other, PosVelDeltaArray): if self.system != other.system: return NotImplemented return NotImplemented def __deepcopy__(self, memo): """Deep copy a PosVelArray Makes sure references to other objects are updated correctly """ attrs = {a: copy.deepcopy(getattr(self, a, None), memo) for a in self._attributes().keys()} new_pos = PosVelArray.create(val=np.asarray(self).copy(), system=self.system, **attrs) memo[id(self)] = new_pos return new_pos @classmethod def _read(cls, h5_group, memo): system = h5_group.attrs["system"] ellipsoid_ = ellipsoid.get(h5_group.attrs["ellipsoid"]) pos_args = {} for a, attr_cls in PosVelArray._attributes().items(): if a in h5_group.attrs: # attribute is a reference to the data of another field pos_args.update({a: memo[h5_group.attrs[a]]}) elif a in h5_group and isinstance(h5_group[a], type(h5_group)): # attribute is a part of this group and is not in a separate field arg = attr_cls._read(h5_group[a], memo) pos_args.update({a: arg}) memo[f"{h5_group.attrs['fieldname']}.{a}"] = arg val = h5_group[h5_group.attrs["fieldname"]][...] return cls.create(val, system=system, ellipsoid=ellipsoid_, **pos_args) def _write(self, h5_group, memo): h5_group.attrs["system"] = self.system h5_group.attrs["ellipsoid"] = self.ellipsoid.name h5_field = h5_group.create_dataset(h5_group.attrs["fieldname"], self.shape, dtype=self.dtype) h5_field[...] 
= self.val for a in PosVelArray._attributes().keys(): attr = getattr(self, a, None) if attr is None: continue if id(attr) in memo: # attribute is a reference to the data of another field h5_group.attrs[a] = memo[id(attr)] else: # attribute is stored as part of this in PosVelArray h5_sub_group = h5_group.create_group(a) h5_sub_group.attrs["fieldname"] = a memo[id(attr)] = f"{h5_group.attrs['fieldname']}.{a}" attr._write(h5_sub_group, memo) # Potential recursive call # # Position Delta # class PosVelDeltaArray(PositionDeltaArray): """Base class for position and velocity deltas This PosVelDeltaArray should not be instantiated. Instead instantiate one of the system specific subclasses, typically using the PositionDelta factory function. """ system = None column_names = None cls_name = "PosVelDeltaArray" @staticmethod def create(val: np.ndarray, system: str, ref_pos, **delta_args: Any) -> "PosVelDeltaArray": """Factory for creating PosVelDeltaArrays for different systems See each position delta class for exact optional parameters. Args: val: Array of position deltas. system: Name of position system. ref_pos: Array of reference positions. delta_args: Additional arguments used to create the PosVelDeltaArray. Returns: Array with positions in the given system. """ if system not in _SYSTEMS["PosVelDeltaArray"]: systems = ", ".join(_SYSTEMS["PosVelDeltaArray"]) raise exceptions.UnknownSystemError(f"System {system!r} unknown. Use one of {systems}") return _SYSTEMS["PosVelDeltaArray"][system](val, ref_pos, **delta_args) @property def pos(self): attrs = {a: getattr(self, a, None) for a in self._attributes().keys()} return _SYSTEMS["PositionDeltaArray"][self.system](self.val[:, 0:3], ref_pos=self.ref_pos, **attrs) @property def vel(self): attrs = {a: getattr(self, a, None) for a in self._attributes().keys()} return _SYSTEMS["VelocityDeltaArray"][self.system](self.val[:, 3:6], ref_pos=self.ref_pos, **attrs) def __sub__(self, other): """self - other""" if isinstance(other, PosVelArray): if self.system != other.system: return NotImplemented try: return PosVelDeltaArray.from_position(val=self.val - other.val, other=self) except KeyError: return NotImplemented elif isinstance(other, PosVelDeltaArray): if self.system != other.system: return NotImplemented return self.from_position(val=self.val - other.val, other=self) def __deepcopy__(self, memo): """Deep copy a PositionDeltaArray Makes sure references to other objects are updated correctly """ attrs = {a: copy.deepcopy(getattr(self, a, None), memo) for a in list(self._attributes().keys()) + ["ref_pos"]} new_pos = PosVelDeltaArray.create(val=np.asarray(self).copy(), system=self.system, **attrs) memo[id(self)] = new_pos return new_pos @classmethod def _read(cls, h5_group, memo): system = h5_group.attrs["system"] cls_args = {"ref_pos": PosVelArray} delta_args = {} for a, attr_cls in {**cls._attributes(), **cls_args}.items(): if a in h5_group.attrs: # attribute is a reference to the data of another field delta_args.update({a: memo[h5_group.attrs[a]]}) elif a in h5_group and isinstance(h5_group[a], type(h5_group)): # attribute is a part of this group and is not in a separate field arg = attr_cls._read(h5_group[a], memo) delta_args.update({a: arg}) memo[f"{h5_group.attrs['fieldname']}.{a}"] = arg val = h5_group[h5_group.attrs["fieldname"]][...] 
return cls.create(val, system=system, **delta_args) def _write(self, h5_group, memo): h5_group.attrs["system"] = self.system h5_field = h5_group.create_dataset(h5_group.attrs["fieldname"], self.shape, dtype=self.dtype) h5_field[...] = self.val cls_args = {"ref_pos": PosVelArray} for a in {**PosVelDeltaArray._attributes(), **cls_args}.keys(): attr = getattr(self, a, None) if attr is None: continue if id(attr) in memo: # attribute is a reference to the data of another field h5_group.attrs[a] = memo[id(attr)] else: # attribute is stored as part of this in PosVelArray h5_sub_group = h5_group.create_group(a) h5_sub_group.attrs["fieldname"] = a memo[id(attr)] = f"{h5_group.attrs['fieldname']}.{a}" attr._write(h5_sub_group, memo) # Potential recursive call PKd2N 5D((midgard/data/_position_delta.py"""Array with position deltas """ # Standard library imports from typing import Any, Callable, Dict, List, Tuple # Third party imports import numpy as np # Midgard imports from midgard.dev import exceptions from midgard.math import rotation _SYSTEMS: Dict[str, "PositionDeltaArray"] = dict() # Populated by register_system() _CONVERSIONS: Dict[Tuple[str, str], Callable] = dict() # Populated by register_system() _CONVERSION_HOPS: Dict[Tuple[str, str], List[str]] = dict() # Cache for to_system() def register_system( convert_to: Dict[str, Callable] = None, convert_from: Dict[str, Callable] = None ) -> Callable[[Callable], Callable]: """Decorator used to register new position systems The system name is read from the .system attribute of the Position class. Args: convert_to: Functions used to convert to other systems. convert_from: Functions used to convert from other systems. Returns: Decorator registering system. """ def wrapper(cls: Callable): name = cls.system _SYSTEMS[name] = cls if convert_to: for to_system, converter in convert_to.items(): _CONVERSIONS[(name, to_system)] = converter if convert_from: for from_system, converter in convert_from.items(): _CONVERSIONS[(from_system, name)] = converter return cls return wrapper def _find_conversion_hops(hop: Tuple[str, str]) -> List[Tuple[str, str]]: """Calculate the hops needed to convert between systems using breadth first search""" start_sys, target_sys = hop queue = [(start_sys, [])] visited = set() while queue: from_sys, hops = queue.pop(0) for to_sys in [t for f, t in _CONVERSIONS if f == from_sys]: one_hop = (from_sys, to_sys) if to_sys == target_sys: return hops + [one_hop] if one_hop not in visited: visited.add(one_hop) queue.append((to_sys, hops + [one_hop])) raise exceptions.UnknownConversionError(f"Can't convert PositionDeltaArray from {start_sys!r} to {target_sys!r}") # # Position Delta # class PositionDeltaArray(np.ndarray): """Base class for position deltas This PositionDeltaArray should not be instantiated. Instead instantiate one of the system specific subclasses, typically using the PositionDelta factory function. """ system = None column_names = None _systems = _SYSTEMS def __new__(cls, val, ref_pos, **delta_args): """Create a new PositionDelta""" if cls.system is None or cls.column_names is None: raise ValueError( f"{cls.__name__} cannot be instantiated.
Use PositionDelta(val=..., system={cls.system!r}) instead" ) obj = np.asarray(val, dtype=float, order="C").view(cls) obj.system = cls.system obj.ref_pos = ref_pos return obj def __array_finalize__(self, obj): """Called automatically when a new PositionDelta is created""" if obj is None: return # Validate shape num_columns = len(self.column_names) if self.shape[-1] != num_columns: column_names = ", ".join(self.column_names) raise ValueError(f"{type(self).__name__!r} requires {num_columns} columns: {column_names}") if self.ndim == 1: self.resize(1, num_columns) elif self.ndim != 2: raise ValueError(f"{type(self).__name__!r} must be a 1- or 2-dimensional array with {num_columns} columns") # Copy attributes from the original object self.system = getattr(obj, "system", None) self.ref_pos = getattr(obj, "ref_pos", None) @staticmethod def create(val: np.ndarray, system: str, ref_pos, **delta_args: Any) -> "PositionDeltaArray": """Factory for creating PositionDeltaArrays for different systems See each position delta class for exact optional parameters. Args: val: Array of position deltas. system: Name of position system. ref_pos: Array of reference positions. delta_args: Additional arguments used to create the PositionDeltaArray. Returns: Array with positions in the given system. """ if system not in _SYSTEMS: systems = ", ".join(_SYSTEMS) raise exceptions.UnknownSystemError(f"System {system!r} unknown. Use one of {systems}") return _SYSTEMS[system](val, ref_pos, **delta_args) @property def val(self): """Position delta as a plain numpy array""" return np.asarray(self) @property def mat(self): """Position deltas as an array of matrices Adds an extra dimension, so that matrix multiplication can be performed effectively. By default .mat returns an array of column vectors. To effectively access row vectors instead, operate on a transposed delta: Example: >>> delta # shape (2, 3) EnuPositionDelta([[0., 0., 1.], [1., 1., 0.]]) >>> delta.mat # Column vectors, shape (2, 3, 1) array([[[0.], [0.], [1.]], [[1.], [1.], [0.]]]) >>> delta.T.mat # Row vectors, shape (2, 1, 3) array([[[0., 0., 1.]], [[1., 1., 0.]]]) """ is_transposed = self.flags.f_contiguous # Because we forced order == "C" on creation if is_transposed: return np.asarray(self)[:, None, :].T else: return np.asarray(self)[:, :, None] def to_system(self, system: str) -> "PositionDeltaArray": """Convert position delta to a different system Returns a new PositionDeltaArray with the same position delta in the new system. Args: system: Name of new system. Returns: PositionDeltaArray representing the same positions in the new system. """ # Don't convert if not necessary if system == self.system: return self # Raise error for unknown systems if system not in _SYSTEMS: systems = ", ".join(_SYSTEMS) raise exceptions.UnknownSystemError(f"System {system!r} unknown. Use one of {systems}") # Convert to new system hop = (self.system, system) if hop in _CONVERSIONS: return _SYSTEMS[system](_CONVERSIONS[hop](self), ref_pos=self.ref_pos) if hop not in _CONVERSION_HOPS: _CONVERSION_HOPS[hop] = _find_conversion_hops(hop) val = self for one_hop in _CONVERSION_HOPS[hop]: val = _CONVERSIONS[one_hop](val) return _SYSTEMS[system](val, ref_pos=self.ref_pos) def __getattr__(self, key): """Get attributes with dot notation Add systems and column names to attributes on Position arrays. Args: key: Name of attribute. Returns: Value of attribute.
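Example (illustrative sketch; 'delta' stands for a TrsPositionDelta with a proper reference position): >>> delta.enu # doctest: +SKIP EnuPositionDelta(...) >>> delta.x # doctest: +SKIP array([...])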
""" # Convert to a different system if key in _SYSTEMS: return self.to_system(key) # Return one column as a regular numpy array elif key in self.column_names: idx = self.column_names.index(key) return np.asarray(self)[:, idx] # Raise error for unknown attributes else: raise AttributeError(f"{type(self).__name__!r} has no attribute {key!r}") from None def __dir__(self): """List all fields and attributes on the Position array""" return super().__dir__() + list(_SYSTEMS) + list(self.column_names) def __add__(self, other): """Add position delta with other object Convert other position delta to correct system. If other is a position, delegate to that object. In all other cases, treat """ print(f"{type(self).__name__}.__add__({other})") if isinstance(other, PositionDeltaArray): return self.create( val=np.asarray(self) + np.asarray(other.to_system(self.system)), system=self.system, ref_pos=self.ref_pos, ) return super().__add__(other.to_system(self.system)) # elif isinstance(other, PositionDeltaArray): # return NotImplemented else: return NotImplemented def __radd__(self, other): print(f"{type(self).__name__}.__radd__({other})") return NotImplemented def __iadd__(self, other): print(f"{type(self).__name__}.__iadd__({other})") return self.__add__(other) def __sub__(self, other): print(f"{type(self).__name__}.__sub__({other})") if isinstance(other, PositionDeltaArray): return self.create( val=np.asarray(self) - np.asarray(other.to_system(self.system)), system=self.system, ref_pos=self.ref_pos, ) return NotImplemented def __rsub__(self, other): print(f"{type(self).__name__}.__rsub__({other})") return NotImplemented def __isub__(self, other): print(f"{type(self).__name__}.__isub__({other})") return self.__sub__(other) def __matmul__(self, other): return NotImplemented def __rmatmul__(self, other): return NotImplemented def __imatmul__(self, other): return NotImplemented def delta_trs2enu(trs: "TrsPositionDelta") -> np.ndarray: """Convert position deltas from TRS to ENU""" lat, lon, _ = trs.ref_pos.llh.val.T trs2enu = rotation.trs2enu(lat, lon) return (trs2enu @ trs.mat)[:, :, 0] def delta_enu2trs(enu: "EnuPositionDelta") -> np.ndarray: """Convert position deltas from ENU to TRS""" lat, lon, _ = enu.ref_pos.llh.val.T enu2trs = rotation.enu2trs(lat, lon) return (enu2trs @ enu.mat)[:, :, 0] @register_system(convert_to=dict(enu=delta_trs2enu)) class TrsPositionDelta(PositionDeltaArray): system = "trs" column_names = ("x", "y", "z") @register_system(convert_to=dict(trs=delta_enu2trs)) class EnuPositionDelta(PositionDeltaArray): system = "enu" column_names = ("east", "north", "up") PK΄9Nmidgard/data/_taiutc.txt# Based on https://hpiers.obspm.fr/eoppc/bul/bulc/UTC-TAI.history # columns: start_interval, end_interval, offset_in_seconds, offset_in_mjd, factor_in_seconds # The transition from TAI-UTC at a specific epoch is then given by the formula: # TAI-UTC = offset_in_seconds + (epoch_mjd - offset_in_mjd) * factor_in_seconds # The start and end intervals are given in UTC at 0h hours # # After 1972 the difference between TAI and UTC is simply the number of leap seconds introduced # Whenever a new leap second is introduced this file needs to be updated 1961-01-01 1961-08-01 1.4228180 37300 0.001296 1961-08-01 1962-01-01 1.3728180 37300 0.001296 1962-01-01 1963-11-01 1.8458580 37665 0.0011232 1963-11-01 1964-01-01 1.9458580 37665 0.0011232 1964-01-01 1964-04-01 3.2401300 38761 0.001296 1964-04-01 1964-09-01 3.3401300 38761 0.001296 1964-09-01 1965-01-01 3.4401300 38761 0.001296 1965-01-01 1965-03-01 3.5401300 
38761 0.001296 1965-03-01 1965-07-01 3.6401300 38761 0.001296 1965-07-01 1965-09-01 3.7401300 38761 0.001296 1965-09-01 1966-01-01 3.8401300 38761 0.001296 1966-01-01 1968-02-01 4.3131700 39126 0.002592 1968-02-01 1972-01-01 4.2131700 39126 0.002592 1972-01-01 1972-07-01 10 1972-07-01 1973-01-01 11 1973-01-01 1974-01-01 12 1974-01-01 1975-01-01 13 1975-01-01 1976-01-01 14 1976-01-01 1977-01-01 15 1977-01-01 1978-01-01 16 1978-01-01 1979-01-01 17 1979-01-01 1980-01-01 18 1980-01-01 1981-07-01 19 1981-07-01 1982-07-01 20 1982-07-01 1983-07-01 21 1983-07-01 1985-07-01 22 1985-07-01 1988-01-01 23 1988-01-01 1990-01-01 24 1990-01-01 1991-01-01 25 1991-01-01 1992-07-01 26 1992-07-01 1993-07-01 27 1993-07-01 1994-07-01 28 1994-07-01 1996-01-01 29 1996-01-01 1997-07-01 30 1997-07-01 1999-01-01 31 1999-01-01 2006-01-01 32 2006-01-01 2009-01-01 33 2009-01-01 2012-07-01 34 2012-07-01 2015-07-01 35 2015-07-01 2017-01-01 36 2017-01-01 9999-12-31 37 PKkeNLñ//midgard/data/_time.py"""Array with time epochs """ # Standard library imports from datetime import datetime from typing import Callable, Dict, List, Optional, Tuple # Third party imports import numpy as np # Midgard imports from midgard.dev import exceptions from midgard.math.unit import Unit from midgard.data._time_formats import _FORMATS, _FORMAT_UNITS _SCALES: Dict[str, "TimeArray"] = dict() # Populated by register_scale() _CONVERSIONS: Dict[Tuple[str, str], Callable] = dict() # Populated by register_scale() _CONVERSION_HOPS: Dict[Tuple[str, str], List[str]] = dict() # Cache for to_scale() def register_scale( convert_to: Dict[str, Callable] = None, convert_from: Dict[str, Callable] = None ) -> Callable[[Callable], Callable]: """Decorator used to register new time scales The scale name is read from the .scale attribute of the Time class. Args: convert_to: Functions used to convert to other scales. convert_from: Functions used to convert from other scales. Returns: Decorator registering scale. """ def wrapper(cls: Callable) -> Callable: name = cls.scale _SCALES[name] = cls if convert_to: for to_scale, converter in convert_to.items(): _CONVERSIONS[(name, to_scale)] = converter if convert_from: for from_scale, converter in convert_from.items(): _CONVERSIONS[(from_scale, name)] = converter return cls return wrapper def _find_conversion_hops(hop: Tuple[str, str]) -> List[Tuple[str, str]]: """Calculate the hops needed to convert between scales using breadth first search""" start_scale, target_scale = hop queue = [(start_scale, [])] visited = set() while queue: from_scale, hops = queue.pop(0) for to_scale in [t for f, t in _CONVERSIONS if f == from_scale]: one_hop = (from_scale, to_scale) if to_scale == target_scale: return hops + [one_hop] if one_hop not in visited: visited.add(one_hop) queue.append((to_scale, hops + [one_hop])) raise exceptions.UnknownConversionError(f"Can't convert TimeArray from {start_scale!r} to {target_scale!r}") # # The main Time class # class TimeArray(np.ndarray): scale = None _scales = _SCALES _formats = _FORMATS _unit = staticmethod(Unit.unit_factory(__name__)) def __new__(cls, val, val2=None, format="", _jd1=None, _jd2=None): """Create a new TimeArray""" if cls.scale is None: raise ValueError(f"{cls.__name__} cannot be instantiated. Use Time(val=..., scale={cls.scale!r}) instead") if format not in cls._formats: formats = ", ".join(cls._formats) raise exceptions.UnknownSystemError(f"Format {format!r} unknown.
Use one of {formats}") # Convert to numpy array and read format val = np.asarray(val) if val.ndim == 0: val.resize(1) if val2 is not None: val2 = np.asarray(val2) if val2.shape != val.shape: raise ValueError(f"'val2' must have the same shape as 'val': {val.shape}") fmt_values = cls._formats[format](val, val2) # Store values on array obj = np.asarray(fmt_values.value).view(cls) obj.format = format obj.jd1 = fmt_values.jd1 if _jd1 is None else _jd1 obj.jd2 = fmt_values.jd2 if _jd2 is None else _jd2 return obj def __array_finalize__(self, obj): """Called automatically when a new TimeArray is created""" if obj is None: return # Validate shape if self.ndim == 0: self.resize(1) if self.ndim != 1: raise ValueError(f"{type(self).__name__!r} must be a 1-dimensional array") # Copy attributes from the original object self.format = getattr(obj, "format", None) self.jd1 = getattr(obj, "jd1", None) self.jd2 = getattr(obj, "jd2", None) # Freeze self.flags.writeable = False if self.jd1 is not None and self.jd2 is not None: self.jd1.flags.writeable = False self.jd2.flags.writeable = False self._update_hash() @property def val(self): return np.asarray(self) @property def SCALES(self): return tuple(sorted(self._scales)) @property def FORMATS(self): return tuple(sorted(self._formats)) @classmethod def _cls_scale(cls, scale: str) -> "Type[TimeArray]": """Check that scale is valid and return corresponding type""" if scale not in cls._scales: scales = ", ".join(sorted(cls._scales)) raise exceptions.UnknownSystemError(f"Scale {scale!r} unknown. Use one of {scales}") return cls._scales[scale] @classmethod def create( cls, val: np.ndarray, scale: str, format: str, val2: Optional[np.ndarray] = None, _jd1: Optional[np.ndarray] = None, _jd2: Optional[np.ndarray] = None, ) -> "TimeArray": """Factory for creating TimeArrays for different scales See each time class for exact optional parameters. Args: val: Array of time values. scale: Name of time scale. pos_args: Additional arguments used to create the TimeArray. Returns: Array with times in the given scale. """ return cls._cls_scale(scale)(val, val2=val2, format=format, _jd1=_jd1, _jd2=_jd2) @classmethod def from_jds(cls, jd1: np.ndarray, jd2: np.ndarray, format: str) -> "TimeArray": """Create a new time array with given Julian dates and format, keep scale """ fmt_value = cls._formats[format].from_jds(jd1, jd2) return cls(val=fmt_value, format=format, _jd1=jd1, _jd2=jd2) @classmethod def now(cls, scale="utc", format="datetime") -> "TimeArray": """Create a new time representing now""" dt_now = np.array([datetime.now()]) jd1, jd2 = cls._formats["datetime"].to_jds(dt_now) return cls._cls_scale("utc").from_jds(jd1, jd2, format=format).to_scale(scale) def to_scale(self, scale: str) -> "TimeArray": """Convert time to a different scale Returns a new TimeArray with the same time in the new scale. Args: scale: Name of new scale. Returns: TimeArray representing the same times in the new scale. """ # Don't convert if not necessary if scale == self.scale: return self # Raise error for unknown scales if scale not in self._scales: scales = ", ".join(self._scales) raise exceptions.UnknownSystemError(f"Scale {scale!r} unknown. 
Use one of {scales}") # Convert to new scale hop = (self.scale, scale) if hop in _CONVERSIONS: return self._scales[scale].from_jds(*_CONVERSIONS[hop](self.jd1, self.jd2), self.format) if hop not in _CONVERSION_HOPS: _CONVERSION_HOPS[hop] = _find_conversion_hops(hop) jd1, jd2 = self.jd1, self.jd2 for one_hop in _CONVERSION_HOPS[hop]: jd1, jd2 = _CONVERSIONS[one_hop](jd1, jd2) return self._scales[scale].from_jds(jd1, jd2, self.format) def unit(self, field: str = "") -> Tuple[str, ...]: """Unit of field""" mainfield, _, subfield = field.partition(".") # Units of formats field = self.format if not field else field if field in self.FORMATS: return _FORMAT_UNITS[field] # Units of properties else: return self._unit(field) @property @Unit.register(("year",)) def year(self): return np.array([d.year for d in self.datetime]) @property @Unit.register(("month",)) def month(self): return np.array([d.month for d in self.datetime]) @property @Unit.register(("day",)) def day(self): return np.array([d.day for d in self.datetime]) @property @Unit.register(("hour",)) def hour(self): return np.array([d.hour for d in self.datetime]) @property @Unit.register(("minute",)) def minute(self): return np.array([d.minute for d in self.datetime]) @property @Unit.register(("second",)) def second(self): return np.array([d.second for d in self.datetime]) @property @Unit.register(("second",)) def sec_of_day(self): """Seconds since midnight Note - Does not support leap seconds Returns: Seconds since midnight """ return np.array([d.hour * 60 * 60 + d.minute * 60 + d.second for d in self.datetime]) @property def yydddsssss(self): """Text representation YY:DDD:SSSSS YY - decimal year without century DDD - zero padded decimal day of year SSSSS - zero padded seconds since midnight Note - Does not support leap seconds Returns: Time converted to yydddssss format """ return np.array([d.strftime("%y:%j:") + str(s).zfill(5) for d, s in zip(self.datetime, self.sec_of_day)]) def _todo__iadd__(self, other): self.flags.writeable = True super().__iadd__(other) self.flags.writeable = False self._update_hash() return self def _update_hash(self): self._hash = hash(self.jd1.data.tobytes()) + hash(self.jd2.data.tobytes()) def __hash__(self): return self._hash def __getattr__(self, key): """Get attributes with dot notation Add time scales and formats to attributes on Time arrays. """ # Convert to a different scale if key in self._scales: return self.to_scale(key) # Convert to a different format elif key in self._formats: return self._formats[key].from_jds(self.jd1, self.jd2) # Raise error for unknown attributes else: raise AttributeError(f"{type(self).__name__!r} has no attribute {key!r}") from None def __deepcopy__(self, memo): """Deep copy a TimeArray """ time = self.create(val=self.val.copy(), format=self.format, scale=self.scale, _jd1=self.jd1, _jd2=self.jd2) memo[id(time)] = time return time @classmethod def _read(self, h5_group, memo): scale = h5_group.attrs["scale"] format = h5_group.attrs["format"] val = h5_group[h5_group.attrs["fieldname"]][...] jd1 = h5_group["jd1"][...] jd2 = h5_group["jd2"][...] time = TimeArray.create(val, scale=scale, format=format, _jd1=jd1, _jd2=jd2) memo[f"{h5_group.attrs['fieldname']}"] = time return time def _write(self, h5_group, memo): h5_group.attrs["scale"] = self.scale h5_group.attrs["format"] = self.format h5_field = h5_group.create_dataset(h5_group.attrs["fieldname"], self.shape, dtype=self.dtype) h5_field[...] 
= self.val h5_field = h5_group.create_dataset("jd1", self.jd1.shape, dtype=self.jd1.dtype) h5_field[...] = self.data.jd1 h5_field = h5_group.create_dataset("jd2", self.shape, dtype=self.jd2.dtype) h5_field[...] = self.data.jd2 memo[id(self)] = h5_group.attrs["fieldname"] def __dir__(self): """List all fields and attributes on the Time array""" return super().__dir__() + list(self._scales) + list(self._formats) def __repr__(self): cls_name = type(self).__name__ repr_str = super().__repr__() return repr_str.replace(f"{cls_name}(", f"{cls_name}(scale={self.scale!r}, format={self.format!r}, ") # Turn off arithmetic operations def __add__(self, other): return NotImplemented def __radd__(self, other): return NotImplemented def __iadd__(self, other): return NotImplemented def __sub__(self, other): return NotImplemented def __rsub__(self, other): return NotImplemented def __isub__(self, other): return NotImplemented PKsy>NBɉ<<midgard/data/_time_formats.py"""Array with time epochs """ # Standard library imports from calendar import isleap from datetime import datetime, timedelta from typing import Callable, Dict # Third party imports import numpy as np # Midgard imports from midgard.math.unit import Unit _FORMATS: Dict[str, "TimeFormat"] = dict() # Populated by register_format() _FORMAT_UNITS: Dict[str, str] = dict() # Populated by register_format() def register_format(cls: Callable) -> Callable: """Decorator used to register new time formats The format name is read from the .format attribute of the TimeFormat class. """ name = cls.format _FORMATS[name] = cls _FORMAT_UNITS[name] = cls.unit return cls # # Time formats # class TimeFormat: format = None unit = None day2seconds = Unit.day2seconds week2days = Unit.week2days def __init__(self, val, val2=None): """Convert val and val2 to Julian days""" self.jd1, self.jd2 = self.to_jds(val, val2) @classmethod def to_jds(cls, val, val2): """Convert val and val2 to Julian days and set the .jd1 and .jd2 attributes""" raise NotImplementedError @classmethod def from_jds(cls, jd1, jd2): """Convert Julian days to the right format""" raise NotImplementedError @property def value(self): """Convert Julian days to the right format""" return self.from_jds(self.jd1, self.jd2) @register_format class TimeJD(TimeFormat): format = "jd" unit = ("day",) @classmethod def to_jds(cls, val, val2): if val2 is None: val2 = np.zeros(val.shape) _delta = val - (np.floor(val + val2 - 0.5) + 0.5) jd1 = val - _delta jd2 = val2 + _delta return jd1, jd2 @classmethod def from_jds(cls, jd1, jd2): return jd1 + jd2 @register_format class TimeMJD(TimeFormat): """Modified Julian Date time format. This represents the number of days since midnight on November 17, 1858. For example, 51544.0 in MJD is midnight on January 1, 2000. 
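Example (illustrative sketch; the numbers follow from the definition above): >>> jd1, jd2 = TimeMJD.to_jds(np.array([51544.0]), None) # doctest: +SKIP >>> (jd1 + jd2)[0] # doctest: +SKIP 2451544.5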
""" format = "mjd" unit = ("day",) _mjd0 = 2400000.5 @classmethod def to_jds(cls, val, val2): if val2 is None: val2 = np.zeros(val.shape) _delta = val - (np.floor(val + val2 - 0.5) + 0.5) jd1 = cls._mjd0 + val - _delta jd2 = val2 + _delta return jd1, jd2 @classmethod def from_jds(cls, jd1, jd2): return jd1 - cls._mjd0 + jd2 @register_format class TimeDateTime(TimeFormat): format = "datetime" unit = None _jd2000 = 2451544.5 _dt2000 = datetime(2000, 1, 1) @classmethod def to_jds(cls, val, val2=None): if val2 is not None: raise jd1, jd2 = np.array([cls._dt2jd(dt) for dt in val]).T return jd1, jd2 @classmethod def _dt2jd(cls, dt): """Convert one datetime to one Julian date pair""" delta = dt - cls._dt2000 jd1 = cls._jd2000 + delta.days delta -= timedelta(days=delta.days) jd2 = delta.total_seconds() / cls.day2seconds return jd1, jd2 @classmethod def from_jds(cls, jd1, jd2): return np.array([cls._jd2dt(j1, j2) for j1, j2 in zip(jd1, jd2)]) @classmethod def _jd2dt(cls, jd1, jd2): """Convert one Julian date to a datetime""" return cls._dt2000 + timedelta(days=jd1 - cls._jd2000) + timedelta(days=jd2) @register_format class TimeGPSWeekSec(TimeFormat): """GPS weeks and seconds. """ format = "gps_ws" unit = ("week", "second") _jd19800106 = 2444244.5 @classmethod def to_jds(cls, wwww, sec): # .. Determine GPS day wd = np.floor((sec + 0.5 * cls.day2seconds) / cls.day2seconds) # 0.5 d = 43200.0 s # .. Determine remainder fracSec = sec + 0.5 * cls.day2seconds - wd * cls.day2seconds # .. Conversion GPS week and day to from Julian Date (JD) jd_day = wwww * Unit.week2days + wd + cls._jd19800106 - 0.5 jd_frac = fracSec / cls.day2seconds return jd_day, jd_frac @classmethod def from_jds(cls, jd1, jd2): if np.any(jd1 + jd2 < cls._jd19800106): raise ValueError(f"Julian Day exceeds the GPS time start date of 6-Jan-1980 (JD {cls._jd19800106})") # See Time.jd_int for explanation _delta = jd1 - (np.floor(jd1 + jd2 - 0.5) + 0.5) jd_int = jd1 - _delta jd_frac = jd2 + _delta # .. Conversion from Julian Date (JD) to GPS week and day wwww = np.floor((jd_int - cls._jd19800106) / cls.week2days) wd = np.floor(jd_int - cls._jd19800106 - wwww * cls.week2days) gpssec = (jd_frac + wd) * cls.day2seconds return wwww, gpssec @register_format class TimeYear(TimeFormat): """Year. # TODO: conversion is not correct!!! """ format = "decimalyear" unit = ("common_year",) # TODO: Check this @classmethod def to_jds(cls, val, val2=None): if val2 is not None: raise jd1, jd2 = np.array([TimeDateTime._dt2jd(cls._yr2dt(yr)) for yr in val]).T return jd1, jd2 @classmethod def _yr2dt(cls, year): year_int = int(year) year_frac = year - year_int days_of_year = 366 if isleap(year_int) else 365 # accounting for leap year seconds_of_year = year_frac * days_of_year * Unit.day2seconds return datetime(year_int, 1, 1) + timedelta(seconds=float(seconds_of_year)) @classmethod def from_jds(cls, jds1, jds2): year = list() for jd1, jd2 in zip(jds1, jds2): dt = TimeDateTime._jd2dt(jd1, jd2) days_of_year = 366 if isleap(dt.year) else 365 # accounting for leap year day_frac = ( dt.hour * Unit.hour2day + dt.minute * Unit.minute2day + dt.second * Unit.second2day + dt.microsecond * Unit.microsecond2days ) # day_frac = dt.hour /24 + dt.minute /1440 + dt.second /86400 + dt.microsecond / 86400000000 year.append(dt.year + (dt.timetuple().tm_yday + day_frac) / days_of_year) return np.array(year) PKK8NɭB"midgard/data/_time_leapseconds.txt# This file is used by Midgard to keep track of leap seconds. 
#
# Whenever a new leap second is announced, it should be added to this list
#
# See ...

[leapseconds]
1972-06-30
1972-12-31
1973-12-31

midgard/data/_time_scales.py
"""Time scales and conversions between them
"""
# Standard library imports
from typing import Any, Callable, Dict, List, Tuple

# Third party imports
import numpy as np

# Midgard imports
from midgard.data._time import TimeArray, register_scale
from midgard.math.unit import Unit

#
# Time scale conversions
#
def _utc2tai(utc_jd1, utc_jd2):
    """Convert UTC to TAI"""
    return utc_jd1, utc_jd2  # TODO: Leap seconds are not applied yet, see _time_leapseconds.txt


def _tai2utc(tai_jd1, tai_jd2):
    """Convert TAI to UTC"""
    return tai_jd1, tai_jd2  # TODO: Leap seconds are not applied yet, see _time_leapseconds.txt


def _tai2tt(tai_jd1, tai_jd2):
    """Convert TAI to TT"""
    delta = 32.184 * Unit.seconds2day
    return tai_jd1, tai_jd2 + delta


def _tt2tai(tt_jd1, tt_jd2):
    """Convert TT to TAI"""
    delta = 32.184 * Unit.seconds2day
    return tt_jd1, tt_jd2 - delta


def _gps2tai(gps_jd1, gps_jd2):
    """Convert GPS time scale to TAI

    Args:
        gps_jd1:  Float, part 1 of 'gps'-time as a two-part Julian Day.
        gps_jd2:  Float, part 2 of 'gps'-time as a two-part Julian Day.

    Returns:
        2-tuple of floats representing 'tai' as a two-part Julian Day.
    """
    delta = 19 * Unit.seconds2day
    return gps_jd1, gps_jd2 + delta


def _tai2gps(tai_jd1, tai_jd2):
    """Convert from TAI to GPS time scale

    Args:
        tai_jd1:  Float, part 1 of 'tai'-time as a two-part Julian Day.
        tai_jd2:  Float, part 2 of 'tai'-time as a two-part Julian Day.

    Returns:
        2-tuple of floats representing 'gps' as a two-part Julian Day.
    """
    delta = 19 * Unit.seconds2day
    return tai_jd1, tai_jd2 - delta


#
# Time scales
#
@register_scale(convert_to=dict(tai=_utc2tai))
class UtcTime(TimeArray):

    scale = "utc"


@register_scale(convert_to=dict(utc=_tai2utc, tt=_tai2tt, gps=_tai2gps))
class TaiTime(TimeArray):

    scale = "tai"


@register_scale(convert_to=dict(tai=_gps2tai))
class GpsTime(TimeArray):

    scale = "gps"


@register_scale(convert_to=dict(tai=_tt2tai))
class TtTime(TimeArray):

    scale = "tt"
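# Examples (a sketch; conversions compose via TAI, using the constant offsets
# TT - TAI = 32.184 s and TAI - GPS = 19 s defined above):
#
#   jd1, jd2 = _gps2tai(2451544.5, 0.0)   # GPS -> TAI
#   jd1, jd2 = _tai2tt(jd1, jd2)          # TAI -> TT, i.e. TT = GPS + 51.184 s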
midgard/data/dataset.py
"""A dataset for handling time series data

Description:
------------

"""
# Standard library imports
from collections import UserDict
import copy
import pathlib
from typing import Any, Callable, Dict, List, Optional, Union

# Third party imports
import h5py
import numpy as np
import pandas as pd

# Midgard imports
import midgard
from midgard.collections import enums
from midgard.dev import console
from midgard.dev import exceptions
from midgard.dev import log
from midgard.data import _h5utils
from midgard.data import fieldtypes

# Version of Dataset
__version__ = "3.0"


class Dataset:
    """A dataset representing fields of data arrays"""

    version = f"Dataset v{__version__}, Midgard v{midgard.__version__}"

    def __init__(self, num_obs: int = 0) -> None:
        """Initialize an empty dataset"""
        self._fields = dict()
        self.meta = Meta()
        self.vars = dict()
        self._num_obs = num_obs
        self._default_field_suffix = None

    @classmethod
    def read(cls, file_path: Union[str, pathlib.Path]) -> "Dataset":
        """Read a dataset from file"""
        log.debug(f"Read dataset from {file_path}")

        # Dictionary to keep track of references in the data structure
        # key: field_name, value: object (TimeArray, PositionArray, etc)
        memo = {}

        # Read fields from file
        with h5py.File(file_path, mode="r") as h5_file:
            num_obs = h5_file.attrs["num_obs"]
            dset = cls(num_obs=num_obs)
            dset.vars.update(_h5utils.h5attr2dict(h5_file.attrs["vars"]))

            # Read fields
            for fieldname, fieldtype in _h5utils.h5attr2dict(h5_file.attrs["fields"]).items():
                field = fieldtypes.function(fieldtype).read(h5_file[fieldname], memo)
                if field:  # TODO: Test can be removed when all fieldtypes implement read
                    dset._fields[fieldname] = field
                    setattr(dset, fieldname, field.data)
                    memo[fieldname] = field.data

            # Read meta
            dset.meta.read(h5_file["__meta__"])
        return dset

    def write(self, file_path: Union[str, pathlib.Path], write_level: Optional[enums.WriteLevel] = None) -> None:
        """Write a dataset to file"""
        write_level = (
            min(enums.get_enum("write_level")) if write_level is None else enums.get_value("write_level", write_level)
        )
        log.debug(f"Write dataset to {file_path}")

        # Make sure directory exists
        file_path = pathlib.Path(file_path).resolve()
        file_path.parent.mkdir(parents=True, exist_ok=True)

        # Dictionary to keep track of object references
        # key: id of object (TimeArray, PositionArray, etc), value: field_name
        memo = {}

        with h5py.File(file_path, mode="w") as h5_file:
            # Write each field
            for field_name, field in self._fields.items():
                h5_group = h5_file.create_group(field_name)
                memo[id(getattr(self, field_name))] = field_name
                field.write(h5_group, memo, write_level=write_level)

            # Write meta-information
            self.meta.write(h5_file.create_group("__meta__"))

            # Write information about dataset
            fields = {fn: f.fieldtype for fn, f in self._fields.items()}
            h5_file.attrs["fields"] = _h5utils.dict2h5attr(fields)
            h5_file.attrs["num_obs"] = self.num_obs
            h5_file.attrs["vars"] = _h5utils.dict2h5attr(self.vars)
            h5_file.attrs["version"] = self.version

    def subset(self, idx: np.array) -> None:
        """Remove observations from all fields based on index"""
        raise NotImplementedError  # TODO: Subsetting of fields is not implemented yet

    def extend(self, other_dataset: "Dataset") -> None:
        """Add observations from another dataset to the end of this dataset"""
        raise NotImplementedError  # TODO: Extending of fields is not implemented yet

    def filter(self, idx=None, **filters) -> np.array:
        """Filter observations

        TODO: default_field_suffix
        """
        idx = np.ones(self.num_obs, dtype=bool) if idx is None else idx
        for field, value in filters.items():
            idx &= self[field] == value
        return idx

    def unique(self, field, **filters) -> np.array:
        """List unique values of a given field

        TODO: default_field_suffix
        """
        idx = self.filter(**filters)
        return np.unique(self[field][idx])

    def for_each(self, key):
        """Iterate over all suffixes of the given field, temporarily setting each as the default field suffix"""
        previous_field_suffix = self.default_field_suffix
        suffixes = [f[len(key) :] for f in self.fields if f.startswith(key) and "."
not in f] multipliers = [-1, 1] if len(suffixes) == 2 else [1] * len(suffixes) for multiplier, suffix in zip(multipliers, suffixes): self.default_field_suffix = suffix yield multiplier self.default_field_suffix = previous_field_suffix def unit(self, field): """Unit for values in a given field""" mainfield, _, subfield = field.partition(".") try: return self._fields[mainfield].unit(subfield) except KeyError: raise exceptions.FieldDoesNotExistError(f"Field {mainfield!r} does not exist") from None def plot_values(self, field: str) -> np.array: """Return values of a field in a form that can be plotted""" return self._fields[field].plot_values def as_dict(self, fields=None) -> Dict[str, Any]: """Return a representation of the dataset as a dictionary""" fields_dict = dict() for field in self._fields.values(): fields_dict.update(field.as_dict(fields=fields)) return fields_dict def as_dataframe(self, fields=None, index=None) -> pd.DataFrame: """Return a representation of the dataset as a Pandas DataFrame""" df = pd.DataFrame.from_dict(self.as_dict(fields=fields)) if index is not None: df.set_index(index, drop=True, inplace=True) return df def apply(self, func: Callable, field: str, **filters: Any) -> Any: """Apply a function to a field""" idx = self.filter(**filters) values = self[field][idx] return func(values) def rms(self, field: str, **filters: Any) -> float: """Calculate Root Mean Square of a field""" return self.apply(lambda val: np.sqrt(np.mean(np.square(val))), field, **filters) def mean(self, field: str, **filters: Any) -> float: """Calculate mean of a field""" return self.apply(np.mean, field, **filters) def std(self, field: str, **filters: Any) -> float: """Calculate the standard deviation of a field""" return self.apply(np.std, field, **filters) def count(self, field: str, **filters: Any) -> int: """Count the number of unique values in a field""" return len(self.unique(field, **filters)) def num(self, **filters: Any) -> int: """Number of observations satisfying the filters""" return sum(self.filter(**filters)) @property def num_obs(self) -> int: """Number of observations in dataset""" return self._num_obs @num_obs.setter def num_obs(self, value: int): """Set the number of observations in dataset""" if self._fields and value != self._num_obs: raise AttributeError( "Can not change number of observations after fields are added. Use 'subset' or 'extend' instead" ) self._num_obs = value @property def default_field_suffix(self): """Default field suffix """ return self._default_field_suffix @default_field_suffix.setter def default_field_suffix(self, suffix): """Set the default field suffix TODO: Could we possibly use a context manager for these things? 
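        One possibility (a hypothetical sketch using contextlib, not part of the current API) is a context
        manager that sets the suffix on entry and restores the previous value on exit::

            from contextlib import contextmanager

            @contextmanager
            def suffix(self, suffix):
                previous = self.default_field_suffix
                self.default_field_suffix = suffix
                try:
                    yield
                finally:
                    self.default_field_suffix = previous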
E.g.: with dset.suffix("1"): models.calculate(dset) """ if suffix: suffix = str(suffix) if not suffix.startswith("_"): suffix = "_" + suffix else: suffix = None self._default_field_suffix = suffix @property def fields(self): """Names of fields in the dataset""" all_fields = list() for field in self._fields.values(): all_fields.extend(field.subfields) return sorted(all_fields) def update_from(self, other: "Dataset") -> None: """Transfers dataset fields from other Dataset to this Dataset This will not create a copy of the data in the other Dataset """ self.num_obs = other.num_obs for fieldname, field in other._fields.items(): if fieldname in self._fields: raise exceptions.FieldExistsError(f"Field {fieldname!r} already exists in dataset") self._fields.update({fieldname: field}) setattr(self, fieldname, field.data) def _todo__setattr__(self, key, value): """Protect dataset fields from being accidentally overwritten""" print("__setattr__") def _todo__delitem__(self, key): """Delete a field from the dataset""" print("__delitem__") def _todo__delattr__(self, key): """Delete a field from the dataset""" print("__delattr__") def __dir__(self): """List all fields and attributes on the dataset""" return super().__dir__() + self.fields def _ipython_key_completions_(self) -> List[str]: """Tab completion for dict-style lookups in ipython See http://ipython.readthedocs.io/en/stable/config/integrating.html Returns: Fieldnames in the dataset. """ return self.fields def __bool__(self) -> bool: """Dataset is truthy if it has fields with observations""" return self.num_obs > 0 and len(self._fields) > 0 def __len__(self) -> int: """Length of dataset is equal to the number of observations""" return self.num_obs def __repr__(self) -> str: """A string representing the dataset""" info = [f"num_obs={self.num_obs}", f"num_fields={len(self.fields)}"] # TODO: Based on some vars return f"{type(self).__name__}({', '.join(info)})" def __str__(self) -> str: """A string describing the dataset""" fields = console.fill(f"Fields: {', '.join(self.fields)}", hanging=8) return f"{self!r}\n{fields}" def __deepcopy__(self, memo): new_dset = Dataset(num_obs=self.num_obs) for fieldname, field in self._fields.items(): new_dset._fields[fieldname] = copy.deepcopy(field, memo) setattr(new_dset, fieldname, new_dset._fields[fieldname].data) new_dset.meta = copy.deepcopy(self.meta, memo) memo[id(self)] = new_dset return new_dset class Meta(UserDict): def read(self, h5_group): pass def write(self, h5_group): pass def add(self, section, name, value): """Add information to the metaset""" print("add") import IPython IPython.embed() def add_event(self, timestamp, event_type, description): """Add event to the metaset""" print(f"add_event: {timestamp} {event_type} {description}") events = self.setdefault("__events__", dict()) events.setdefault(event_type, list()).append((timestamp.isoformat(), description)) # events.setdefault(event_type, list()).append((timestamp.utc.isot, description)) def get_events(self, *event_types): """Get events from the metaset""" print("get_event") all_events = self.get("__events__", dict()) if not event_types: event_types = all_events.keys() return sorted(set((ts, k, desc) for k, v in all_events.items() for (ts, desc) in v if k in event_types)) def copy_from(self, other): self.data = copy.deepcopy(other.data) # # Add available fieldtypes to dataset # def _add_field_factory(field_type: str) -> Callable: func = fieldtypes.function(field_type) def _add_field(self, name, val, unit=None, write_level=None, suffix=None, 
**field_args): """Add a {field_type} field to the dataset""" if name in self._fields: raise exceptions.FieldExistsError(f"Field {name!r} already exists in dataset") # Create collections for nested fields collection, _, field_name = name.rpartition(".") if collection and collection not in self._fields: self.add_collection(collection, val=None) # Create field field = func(num_obs=self.num_obs, name=field_name, val=val, unit=unit, write_level=write_level, **field_args) # Add field to list of fields fields = self[collection] if collection else self._fields fields[field_name] = field setattr(self, name, field.data) _add_field.__doc__ = _add_field.__doc__.format(field_type=field_type) return _add_field for field_type in fieldtypes.names(): setattr(Dataset, f"add_{field_type}", _add_field_factory(field_type)) PKyfN-midgard/data/position.py"""Array with positions """ from typing import Any # Third party imports import numpy as np # Midgard imports from midgard.data import _position from midgard.math import transformation def Position(val: np.ndarray, system: str, **pos_args: Any) -> "PositionArray": """Factory for creating PositionArrays for different systems See each position class for exact optional parameters. Args: val: Array of position values. system: Name of position system. pos_args: Additional arguments used to create the PositionArray. Returns: Array with positions in the given system. """ return _position.PositionArray.create(val, system, **pos_args) def PositionDelta( val: np.ndarray, system: str, ref_pos: _position.PositionArray, **delta_args: Any ) -> _position.PositionDeltaArray: """Factory for creating PositionArrays for different systems See each position class for exact optional parameters. Args: val: Array of position values. system: Name of position system. ref_pos: Reference position. delta_args: Additional arguments used to create the PositionArray. Returns: Array with positions in the given system. """ return _position.PositionDeltaArray.create(val, system, ref_pos, **delta_args) def PosVel(val: np.ndarray, system: str, **pos_args: Any) -> "PosVelArray": """Factory for creating PosVelArrays for different systems See each position class for exact optional parameters. Args: val: Array of position values. system: Name of position system. pos_args: Additional arguments used to create the PosVelArray. Returns: Array with positions in the given system. """ return _position.PosVelArray.create(val, system, **pos_args) def PosVelDelta( val: np.ndarray, system: str, ref_pos: _position.PosVelArray, **delta_args: Any ) -> _position.PosVelDeltaArray: """Factory for creating PosVelArrays for different systems See each position class for exact optional parameters. Args: val: Array of position values. system: Name of position system. ref_pos: Reference position. delta_args: Additional arguments used to create the PosVelArray. Returns: Array with positions in the given system. 
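
    Example (a sketch; `ref` is assumed to be an existing PosVelArray in the same system):

        >>> dpv = PosVelDelta(np.zeros((1, 6)), "enu", ref)  # doctest: +SKIP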
""" return _position.PosVelDeltaArray.create(val, system, ref_pos, **delta_args) # # Attributes # _position.register_attribute(_position.PosVelArray, "other", _position.PosVelArray) _position.register_attribute(_position.PositionArray, "other", _position.PositionArray) # # Position systems # @_position.register_system(convert_to=dict(llh=transformation.trs2llh)) class TrsPosition(_position.PositionArray): system = "trs" column_names = ("x", "y", "z") _units = ("meter", "meter", "meter") @_position.register_system(convert_to=dict(trs=transformation.llh2trs)) class LlhPosition(_position.PositionArray): system = "llh" column_names = ("lat", "lon", "height") _units = ("radians", "radians", "meter") # # Position delta systems # @_position.register_system(convert_to=dict(enu=transformation.delta_trs2enu)) class TrsPositionDelta(_position.PositionDeltaArray): system = "trs" column_names = ("x", "y", "z") _units = ("meter", "meter", "meter") @_position.register_system(convert_to=dict(trs=transformation.delta_enu2trs)) class EnuPositionDelta(_position.PositionDeltaArray): system = "enu" column_names = ("east", "north", "up") _units = ("meter", "meter", "meter") # # Velocity systems # @_position.register_system(convert_to=dict()) class TrsVelocity(_position.VelocityArray): system = "trs" column_names = ("x", "y", "z") _units = ("meter/second", "meter/second", "meter/second") # # Velocity delta systems # @_position.register_system(convert_to=dict(enu=transformation.delta_trs2enu)) class TrsVelocityDelta(_position.VelocityDeltaArray): system = "trs" column_names = ("x", "y", "z") _units = ("meter/second", "meter/second", "meter/second") @_position.register_system(convert_to=dict(trs=transformation.delta_enu2trs)) class EnuVelocityDelta(_position.VelocityDeltaArray): system = "enu" column_names = ("veast", "vnorth", "vup") _units = ("meter/second", "meter/second", "meter/second") # # PosVel systems # @_position.register_system(convert_to=dict()) class TrsPosVel(_position.PosVelArray): system = "trs" column_names = ("x", "y", "z", "vx", "vy", "vz") _units = ("meter", "meter", "meter", "meter/second", "meter/second", "meter/second") # # PosVelDelta systems # @_position.register_system(convert_to=dict(enu=transformation.delta_trs2enu_posvel)) class TrsPosVelDelta(_position.PosVelDeltaArray): system = "trs" column_names = ("x", "y", "z", "vx", "vy", "vz") _units = ("meter", "meter", "meter", "meter", "meter", "meter") @_position.register_system(convert_to=dict(trs=transformation.delta_enu2trs_posvel)) class EnuPosVelDelta(_position.PosVelDeltaArray): system = "enu" column_names = ("east", "north", "up", "veast", "vnorth", "vup") _units = ("meter", "meter", "meter", "meter/second", "meter/second", "meter/second") PKKYN 27\\midgard/data/sigma.py"""Array with sigma values See https://docs.scipy.org/doc/numpy/user/basics.subclassing.html for information about subclassing Numpy arrays. SigmaArray is a regular Numpy array with an added field, sigma. 
""" # Third party imports import numpy as np class SigmaArray(np.ndarray): def __new__(cls, values, sigma=None): """Create a new SigmaArray""" obj = np.asarray(values).view(cls) obj.sigma = sigma return obj def __array_finalize__(self, obj): """Called automatically when a new SigmaArray is created""" if obj is None: return # Copy sigma from the original object sigma_sliced = getattr(obj, "_sigma_sliced", None) if sigma_sliced: self.sigma = sigma_sliced else: self.sigma = getattr(obj, "_sigma", None) @property def sigma(self): """Sigma values""" return self._sigma @sigma.setter def sigma(self, value): """Set sigma values""" if value is None: self._sigma = np.full(self.shape, np.nan) else: self._sigma = np.asarray(value) * np.ones(self.shape) def __getitem__(self, item): """Update _sigma_sliced with correct shape, used by __array_finalize__""" self._sigma_sliced = self._sigma[item] return super().__getitem__(item) PK "TimeArray": """Factory for creating TimeArrays for different systems See each time class for exact optional parameters. Args: val: Array of time values. val2: Optional second array for extra precision. scale: Name of time scale. format: Format of values given in val and val2. Returns: Array with epochs in the given time scale and format """ return TimeArray.create(val=val, val2=val2, scale=scale, format=format, _jd1=_jd1, _jd2=_jd2) # Make classmethods available Time.FORMATS = TimeArray.FORMATS # Todo? Time.SCALES = TimeArray.SCALES Time.now = TimeArray.now PKis#N!Fԛ#midgard/data/fieldtypes/__init__.py"""Field types that can be used by Dataset """ # Standard library imports from typing import Callable, List # Midgard imports from midgard.dev import plugins def names() -> List[str]: """Names of fieldtype plugins""" return plugins.names(__name__) def function(plugin_name: str) -> Callable: """Function creating new field""" return plugins.get(__name__, plugin_name=plugin_name).function PKy[N"RsL%midgard/data/fieldtypes/_fieldtype.py"""Abstract class used to define different types of tables for a Dataset """ # Standard library imports import abc from typing import Any, Dict, List # Third party imports import numpy as np import pandas as pd # Midgard imports from midgard.collections import enums from midgard.data import _h5utils class FieldType(abc.ABC): """Abstract class representing a type of field in the Dataset""" _subfields: List[str] = list() _factory = None def __init__(self, num_obs, name, val, unit=None, write_level=None, **field_args): """Create a new field""" self.num_obs = num_obs self.name = name self._unit = unit self._write_level = ( max(enums.get_enum("write_level")) if write_level is None else enums.get_value("write_level", write_level) ) # Use _post_init to set up the actual data structure self.data = None self._post_init(val, **field_args) @abc.abstractmethod def _post_init(self, val, **field_args) -> None: """Do post initialization and validation""" @property def fieldtype(self) -> str: """Get type from module name to coincide with plug-in name""" return self.__module__.rpartition(".")[-1] @classmethod def read(cls, h5_group, memo) -> "FieldType": """Read a field from a HDF5 data source""" field = cls._read(h5_group, memo) if field: # TODO: Test can be removed when read is implemented on all fields field._unit = _h5utils.h5attr2tuple(h5_group.attrs["unit"]) if not any(field._unit): field._unit = None field._write_level = enums.get_value("write_level", h5_group.attrs["write_level"]) return field @classmethod @abc.abstractmethod def _read(cls, h5_group, memo) -> 
"FieldType": """Read a field from a HDF5 data source""" def write(self, h5_group, memo, write_level: enums.WriteLevel) -> None: """Write data to a HDF5 data source""" if self._write_level < write_level: return self._write(h5_group, memo) h5_group.attrs["unit"] = "" if self._unit is None else _h5utils.sequence2h5attr(self._unit) h5_group.attrs["write_level"] = self._write_level.name @abc.abstractmethod def _write(self, h5_group, memo) -> None: """Write values of field to HDF5""" def _fields_at_write_level(self, write_level: enums.WriteLevel) -> List[str]: """Names of all subfields at or above the given write level """ print("at_write_level") return [self.name] + self._subfields def subset(self, idx: np.array) -> None: """Remove observations from a field based on index""" def extend(self, other_field) -> None: """Add observations from another field""" @property def plot_values(self) -> np.array: """Return values of the field in a form that can be plotted""" return self.data def as_dict(self, fields=None) -> Dict[str, Any]: """Return a representation of the field as a dictionary""" field_dict = dict() fields = set(self.subfields if fields is None else fields) for field in self.subfields: if field not in fields: continue name, _, attr = field.partition(".") if name != self.name: continue if attr: value = getattr(self, attr) else: value = self.values # Split 2-dimensional fields into separate columns field_name = field.replace(".", "_") if value.ndim == 1: field_dict[field_name] = value else: for i, val in enumerate(value.T): field_dict[f"{field_name}_{i}"] = val return field_dict def as_dataframe(self, fields=None, index=None) -> pd.DataFrame: """Return a representation of the field as a Pandas DataFrame""" df = pd.DataFrame.from_dict(self.as_dict(fields=fields)) if index is not None: df.set_index(index, drop=True, inplace=True) return df def unit(self, subfield): """Unit(s) of field""" if not subfield: return self._unit else: return self.data.unit(subfield) @property def subfields(self): """Names of field and all subfields""" return [self.name] + [f"{self.name}.{f}" for f in sorted(self._subfields)] # def __getitem__(self, key): # """Pass item access on to the underlying data structure""" # return self.data[key] # def __getattr__(self, key): # """Get a subfield from field or attribute from underlying data structure""" # return getattr(self.data, key) # def __dir__(self): # """List all subfields and attributes on the field""" # return dir(super()) + dir(self.data) # def __len__(self) -> int: # """Length of dataset is equal to number of observations""" # return self.num_obs # def __repr__(self) -> str: # """A string representing the field # """ # return repr(self.data) # def __str__(self) -> str: # """A string describing the field # """ # return str(self.data) PKI[NP''%midgard/data/fieldtypes/collection.py"""A Dataset collection field consisting of other fields """ # Standard library imports from typing import List # Third party imports import numpy as np # Midgard imports from midgard.data import _h5utils from midgard.data import fieldtypes from midgard.data.fieldtypes._fieldtype import FieldType from midgard.dev import console from midgard.dev import exceptions from midgard.dev import plugins class Collection: def __init__(self): self._fields = dict() @property def fields(self): """Names of fields in the collection""" all_fields = list() for field in self._fields.values(): all_fields.extend(field.subfields) return sorted(all_fields) @property def data(self): values = [self[f] for f in 
self.fields]
        if values:
            return np.stack(values, axis=1).reshape(len(values[0]), -1)
        else:
            return np.empty((0, 0))

    def unit(self, field):
        """Unit for values in a given field"""
        mainfield, _, subfield = field.partition(".")
        try:
            return self._fields[mainfield].unit(subfield)
        except KeyError:
            raise exceptions.FieldDoesNotExistError(f"Field {mainfield!r} does not exist") from None

    def __getitem__(self, key):
        """Get a field from the collection using dict-notation"""
        field, _, attribute = key.partition(".")
        if attribute:
            return getattr(self[field], attribute)
        else:
            return self._fields[key].data

    def __setitem__(self, key, value):
        """Add a field to the collection"""
        self._fields[key] = value

    def __getattr__(self, key):
        """Get a field from the collection using dot-notation"""
        try:
            return self.__getitem__(key)
        except KeyError:
            raise AttributeError(f"{type(self).__name__!r} has no attribute {key!r}") from None

    def __dir__(self):
        """List all fields and attributes on the collection"""
        return super().__dir__() + self.fields

    def _ipython_key_completions_(self) -> List[str]:
        """Tab completion for dict-style lookups in ipython

        See http://ipython.readthedocs.io/en/stable/config/integrating.html

        Returns:
            Fieldnames in the collection.
        """
        return self.fields

    def __repr__(self) -> str:
        """A string representing the collection"""
        return f"{type(self).__name__}(num_fields={len(self.fields)})"

    def __str__(self) -> str:
        """A string describing the collection"""
        fields = console.fill(f"Fields: {', '.join(self.fields)}", hanging=8)
        return f"{self!r}\n{fields}"


@plugins.register
class CollectionField(FieldType):

    _factory = staticmethod(Collection)

    def _post_init(self, val, **field_args):
        """Initialize field"""
        self.data = self._factory()

        # Check that unit is not given
        if self._unit is not None:
            raise exceptions.InitializationError("Parameter 'unit' should not be specified for collections")

    @property
    def _subfields(self):
        return self.data.fields

    @classmethod
    def read(cls, h5_group, memo):
        name = h5_group.attrs["fieldname"]
        field = cls(num_obs=None, name=name, val=None)  # num_obs and val not used
        fields = _h5utils.h5attr2dict(h5_group.attrs["fields"])
        for fieldname, fieldtype in fields.items():
            field.data[fieldname] = fieldtypes.function(fieldtype).read(h5_group[fieldname], memo)
        return field

    @classmethod
    def _read(cls, h5_group, memo):
        """Abstract method needs implementation, but is not used by collections"""
        pass

    def write(self, h5_group, memo, write_level):
        """Write data to a HDF5 data source"""
        # Write each field in the collection
        h5_group.attrs["fieldname"] = self.name
        for field_name, field in self.data._fields.items():
            h5_subgroup = h5_group.create_group(field_name)
            field.write(h5_subgroup, memo, write_level=write_level)
        fields = {fn: f.fieldtype for fn, f in self.data._fields.items()}
        h5_group.attrs["fields"] = _h5utils.dict2h5attr(fields)

    def _write(self, h5_group, memo):
        """Abstract method needs implementation, but is not used by collections"""
        pass

midgard/data/fieldtypes/float.py
"""A Dataset float field
"""
# Third party imports
import numpy as np

# Midgard imports
from midgard.data.fieldtypes._fieldtype import FieldType
from midgard.dev import plugins


@plugins.register
class FloatField(FieldType):

    _factory = staticmethod(np.array)

    def _post_init(self, val, **field_args):
        """Initialize float field"""
        data = self._factory(val, dtype=float)

        # Check that the correct number of observations are given
        if len(data) != self.num_obs:
            raise ValueError(f"{self.name!r} initialized with {len(data)} values, expected
{self.num_obs}") # We only support 1- and 2-dimensional arrays if data.ndim < 1 or data.ndim > 2: raise ValueError( f"{self.name!r} initialized with {data.ndim}-dimensional data, " "only 1- and 2-dimensional values are supported" ) # Handle units if self._unit is not None: cols = 1 if data.ndim == 1 else data.shape[1] if isinstance(self._unit, str): self._unit = (self._unit,) * cols elif len(self._unit) != cols: raise ValueError(f"Number of units ({len(self._units)}) must equal number of columns ({cols})") # Store the data as a regular numpy array self.data = data @classmethod def _read(cls, h5_group, _) -> "FieldType": """Read a field from a HDF5 data source""" name = h5_group.attrs["fieldname"] val = h5_group[name][...] return cls(num_obs=len(val), name=name, val=val) def _write(self, h5_group, _) -> None: """Write data to a HDF5 data source""" h5_group.attrs["fieldname"] = self.name h5_field = h5_group.create_dataset(self.name, self.data.shape, dtype=self.data.dtype) h5_field[...] = self.data PKxfNz2ZZ#midgard/data/fieldtypes/position.py"""A Dataset position field """ # Midgard imports from midgard.data.fieldtypes._fieldtype import FieldType from midgard.data.position import Position from midgard.data._position import PositionArray from midgard.dev import exceptions from midgard.dev import plugins @plugins.register class PositionField(FieldType): _subfields = PositionArray._fieldnames() _factory = staticmethod(Position) def _post_init(self, val, **field_args): """Initialize float field""" if isinstance(val, PositionArray): data = val else: data = self._factory(val=val, **field_args) # Check that unit is not given, overwrite with system units if self._unit is not None: raise exceptions.InitializationError("Parameter 'unit' should not be specified for positions") self._unit = data.unit() # Check that the correct number of observations are given if len(data) != self.num_obs: raise ValueError(f"{self.name!r} initialized with {len(data)} values, expected {self.num_obs}") # Store the data as a PositionArray self.data = data @classmethod def _read(cls, h5_group, memo) -> "PositionField": """Read a PositionField from a HDF5 data source""" name = h5_group.attrs["fieldname"] pos = PositionArray._read(h5_group, memo) return cls(num_obs=len(pos), name=name, val=pos) def _write(self, h5_group, memo) -> None: """Write a PositionField to a HDF5 data source""" h5_group.attrs["fieldname"] = self.name self.data._write(h5_group, memo) PKyfN?,)midgard/data/fieldtypes/position_delta.py"""A Dataset position delta field """ # Midgard imports from midgard.data.fieldtypes._fieldtype import FieldType from midgard.data.position import PositionDelta from midgard.data._position import PositionDeltaArray from midgard.dev import exceptions from midgard.dev import plugins @plugins.register class PositionDeltaField(FieldType): _subfields = PositionDeltaArray._fieldnames() _factory = staticmethod(PositionDelta) def _post_init(self, val, **field_args): """Initialize position delta field""" if isinstance(val, PositionDeltaArray): data = val else: data = self._factory(val, **field_args) # Check that unit is not given, overwrite with system units if self._unit is not None: raise exceptions.InitializationError("Parameter 'unit' should not be specified for position deltas") self._unit = data.unit() # Check that the correct number of observations are given if len(data) != self.num_obs: raise ValueError(f"{self.name!r} initialized with {len(data)} values, expected {self.num_obs}") # Store the data as a PositionDeltaArray 
self.data = data @classmethod def _read(cls, h5_group, memo) -> "PositionDeltaField": """Read a PositionDeltaField from a HDF5 data source""" name = h5_group.attrs["fieldname"] pos_delta = PositionDeltaArray._read(h5_group, memo) return cls(num_obs=len(pos_delta), name=name, val=pos_delta) def _write(self, h5_group, memo) -> None: """Write a PositionDeltaField to a HDF5 data source""" h5_group.attrs["fieldname"] = self.name self.data._write(h5_group, memo) PKyfNW(n{{!midgard/data/fieldtypes/posvel.py"""A Dataset position field """ # Midgard imports from midgard.data.fieldtypes._fieldtype import FieldType from midgard.data.position import PosVel from midgard.data._position import PosVelArray from midgard.dev import exceptions from midgard.dev import plugins @plugins.register class PositionField(FieldType): _subfields = PosVelArray._fieldnames() _factory = staticmethod(PosVel) def _post_init(self, val, **field_args): """Initialize float field""" if isinstance(val, PosVelArray): data = val else: data = self._factory(val=val, **field_args) # Check that unit is not given, overwrite with system units if self._unit is not None: raise exceptions.InitializationError( "Parameter 'unit' should not be specified for positions and velocities" ) self._unit = data.unit() # Check that the correct number of observations are given if len(data) != self.num_obs: raise ValueError(f"{self.name!r} initialized with {len(data)} values, expected {self.num_obs}") # Store the data as a PositionArray self.data = data @classmethod def _read(cls, h5_group, memo) -> "PositionField": """Read a PositionField from a HDF5 data source""" name = h5_group.attrs["fieldname"] pos = PosVelArray._read(h5_group, memo) return cls(num_obs=len(pos), name=name, val=pos) def _write(self, h5_group, memo) -> None: """Write a PositionField to a HDF5 data source""" h5_group.attrs["fieldname"] = self.name self.data._write(h5_group, memo) PKyfNd'midgard/data/fieldtypes/posvel_delta.py"""A Dataset position delta field """ # Midgard imports from midgard.data.fieldtypes._fieldtype import FieldType from midgard.data.position import PosVelDelta from midgard.data._position import PosVelDeltaArray from midgard.dev import exceptions from midgard.dev import plugins @plugins.register class PositionDeltaField(FieldType): _subfields = PosVelDeltaArray._fieldnames() _factory = staticmethod(PosVelDelta) def _post_init(self, val, **field_args): """Initialize position delta field""" if isinstance(val, PosVelDeltaArray): data = val else: data = self._factory(val, **field_args) # Check that unit is not given, overwrite with system units if self._unit is not None: raise exceptions.InitializationError( "Parameter 'unit' should not be specified for position and velocity deltas" ) self._unit = data.unit() # Check that the correct number of observations are given if len(data) != self.num_obs: raise ValueError(f"{self.name!r} initialized with {len(data)} values, expected {self.num_obs}") # Store the data as a PosVelDeltaArray self.data = data @classmethod def _read(cls, h5_group, memo) -> "PositionDeltaField": """Read a PositionDeltaField from a HDF5 data source""" name = h5_group.attrs["fieldname"] pos_delta = PosVelDeltaArray._read(h5_group, memo) return cls(num_obs=len(pos_delta), name=name, val=pos_delta) def _write(self, h5_group, memo) -> None: """Write a PositionDeltaField to a HDF5 data source""" h5_group.attrs["fieldname"] = self.name self.data._write(h5_group, memo) PKn\NxY[btt midgard/data/fieldtypes/sigma.py"""A Dataset sigma field """ # Midgard imports 
from midgard.data.fieldtypes._fieldtype import FieldType from midgard.data.sigma import SigmaArray from midgard.dev import plugins @plugins.register class SigmaField(FieldType): _subfields = ["sigma"] _factory = staticmethod(SigmaArray) def _post_init(self, val, **field_args): """Initialize float field""" if isinstance(val, SigmaArray): data = val else: data = self._factory(val, **field_args) # Check that the correct number of observations are given if len(data) != self.num_obs: raise ValueError(f"{self.name!r} initialized with {len(data)} values, expected {self.num_obs}") # We only support 1- and 2-dimensional arrays if data.ndim < 1 or data.ndim > 2: raise ValueError( f"{self.name!r} initialized with {data.ndim}-dimensional data, " "only 1- and 2-dimensional values are supported" ) # Handle units if self._unit is not None: cols = 1 if data.ndim == 1 else data.shape[1] if isinstance(self._unit, str): self._unit = (self._unit,) * cols elif len(self._unit) != cols: raise ValueError(f"Number of units ({len(self._units)}) must equal number of columns ({cols})") # Store the data as a SigmaArray self.data = data @classmethod def _read(cls, h5_group, _) -> "SigmaField": """Read a SigmaField from a HDF5 data source""" name = h5_group.attrs["fieldname"] val = h5_group[name][...] sigma = h5_group["sigma"][...] return cls(num_obs=len(val), name=name, val=val, sigma=sigma) def _write(self, h5_group, _) -> None: """Write a SigmaField to a HDF5 data source""" h5_group.attrs["fieldname"] = self.name h5_field = h5_group.create_dataset(self.name, self.data.shape, dtype=self.data.dtype) h5_field[...] = self.data h5_field = h5_group.create_dataset("sigma", self.data.shape, dtype=self.data.sigma.dtype) h5_field[...] = self.data.sigma PKJn[NIbmidgard/data/fieldtypes/text.py"""A Dataset text field """ # Third party imports import numpy as np # Midgard imports from midgard.data.fieldtypes._fieldtype import FieldType from midgard.dev import plugins @plugins.register class TextField(FieldType): _factory = staticmethod(np.array) def _post_init(self, val): """Initialize float field""" data = self._factory(val, dtype=str) # Check that the correct number of observations are given if len(data) != self.num_obs: raise ValueError(f"{self.name!r} initialized with {len(data)} values, expected {self.num_obs}") # We only support 1- and 2-dimensional arrays if data.ndim < 1 or data.ndim > 2: raise ValueError( f"{self.name!r} initialized with {data.ndim}-dimensional data, " "only 1- and 2-dimensional values are supported" ) # Handle units if self._unit is not None: cols = 1 if data.ndim == 1 else data.shape[1] if isinstance(self._unit, str): self._unit = (self._unit,) * cols elif len(self._unit) != cols: raise ValueError(f"Number of units ({len(self._units)}) must equal number of columns ({cols})") # Store the data as a regular numpy array self.data = data @classmethod def _read(cls, h5_group, _) -> "FieldType": """Read a field from a HDF5 data source""" name = h5_group.attrs["fieldname"] val = h5_group[name][...] return cls(num_obs=len(val), name=name, val=val) def _write(self, h5_group, _) -> None: """Write data to a HDF5 data source""" h5_group.attrs["fieldname"] = self.name # Convert text from unicode to string to avoid error in h5py data = np.asarray(self.data, dtype=np.string_) h5_field = h5_group.create_dataset(self.name, self.data.shape, dtype=data.dtype) h5_field[...] 
= data PKzfN=*midgard/data/fieldtypes/time.py"""A Dataset time field """ # Midgard imports from midgard.data.fieldtypes._fieldtype import FieldType from midgard.data.time import Time, TimeArray from midgard.dev import exceptions from midgard.dev import plugins @plugins.register class TimeField(FieldType): _factory = staticmethod(Time) def _post_init(self, val, **field_args): """Initialize float field""" if isinstance(val, TimeArray): data = val else: data = self._factory(val, **field_args) # Check that unit is not given, overwrite with time scale if self._unit is not None: raise exceptions.InitializationError("Parameter 'unit' should not be specified for times") self._unit = (data.scale,) # Check that the correct number of observations are given if len(data) != self.num_obs: raise ValueError(f"{self.name!r} initialized with {len(data)} values, expected {self.num_obs}") # Store the data as a TimeArray self.data = data @classmethod def _read(cls, h5_group, memo) -> "TimeField": """Read a TimeField from a HDF5 data source""" name = h5_group.attrs["fieldname"] time = TimeArray._read(h5_group, memo) return cls(num_obs=len(time), name=name, val=time) def _write(self, h5_group, memo) -> None: """Write a TimeField to a HDF5 data source""" h5_group.attrs["fieldname"] = self.name self.data._write(h5_group, memo) PKv|Lmidgard/dev/__init__.pyPK5J>Nmidgard/dev/cache.py"""Midgard library module for caching Description: ------------ Adds caching to properties on classes, using the recipe explained in Python Cookbook, 3rd ed. Recipe 8.10. Also, adds a cached property that may depend on other data, with the possibility to reset the cache if the dependencies change. """ # Standard library imports import functools from typing import Any, cast, Callable, Dict, List, Set, Tuple # Cache for dependent properties _DEPENDENT_PROPERTIES: Dict[str, Set[Tuple[str, str]]] = dict() class property: """Cached property, see Python Cookbook, 3rd ed. Recipe 8.10.""" def __init__(self, fget: Callable) -> None: """Set up decorator""" self.fget = fget functools.update_wrapper(cast(Callable, self), fget) def __get__(self, instance: Any, cls: Any) -> Any: """Overwrite the value, the first time the property is calculated""" if instance is None: return self else: value = self.fget(instance) setattr(instance, self.fget.__name__, value) return value class register_dependencies(type): """Metaclass for registering dependencies using dot notation""" _dependencies: List[str] = list() def __call__(cls, fget: Callable) -> Any: *container, name = fget.__qualname__.split(".") for dependency in cls._dependencies: _DEPENDENT_PROPERTIES.setdefault(dependency, set()).add((".".join(container), name)) return super().__call__(fget) def __getattr__(cls, key: str) -> Any: cls._dependencies.append(key) return cls class dependent_property(property, metaclass=register_dependencies): """Decorator for cached properties that can be reset when dependencies change""" pass def forget_dependent_values(obj: object, *dependencies: str) -> None: """Reset cache when given dependencies change""" for dependency in dependencies: for container, prop in _DEPENDENT_PROPERTIES.get(dependency, ()): if container == obj.__class__.__qualname__: try: delattr(obj, prop) except AttributeError: pass # Include lru_cache from standard library for completeness function = functools.lru_cache PKsiMG)00midgard/dev/console.py"""Simpler dealing with the console Description: ------------ Utilities for using the console. 
Mainly wrappers around other libraries to make them easier and more intuitive to use.

Size of console: The two functions `lines()` and `columns()` report the current size of the console.

Textwrapping: The function `fill()` can be used to rewrap a text-string so that it fits inside the console.

Color: The sub-module `color` can be used to set the foreground and background colors. Note that the color
functionality depends on the external package `colorama`. If `colorama` is not installed, color gracefully falls
back to not showing any color.

Examples:
---------

>>> from midgard.dev import console
>>> console.columns()  # doctest: +SKIP
86

>>> print(console.fill(a_very_long_string))  # doctest: +SKIP
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras tempus eleifend
feugiat. Maecenas vitae posuere metus. Sed sit amet fermentum velit. Aenean
vitae turpis at risus sollicitudin fringilla in in nisi. Maecenas vitae ante
libero. Aenean ut eros consequat, ornare erat at, tempus arcu. Suspendisse
velit leo, eleifend eget mi non, vehicula ultricies erat. Vestibulum id nisi
eget nisl venenatis dignissim. Duis cursus quam dui, vel hendrerit nibh
lacinia id.

>>> print(console.color.Fore.YELLOW + console.color.Back.BLUE + 'I am YELLOW text on a BLUE background!')  # doctest: +SKIP
I am YELLOW text on a BLUE background!
"""
# Standard library imports
import shutil
import textwrap
from typing import Any, Optional

# Midgard imports
from midgard.dev import optional

# Use colorama for coloring in the console, graceful fallback to no color if colorama is not installed
_empty_string = optional.EmptyStringMock("_empty_string")
color = optional.optional_import(
    "colorama", attrs=dict(init=lambda **_: None, Back=_empty_string, Fore=_empty_string, Style=_empty_string)
)
color.init(autoreset=True)


def lines() -> int:
    """The height of the console

    Returns:
        The height of the console in characters.
    """
    return shutil.get_terminal_size().lines


def columns() -> int:
    """The width of the console

    Returns:
        The width of the console in characters.
    """
    return shutil.get_terminal_size().columns


def fill(text: str, *, width: Optional[int] = None, hanging: Optional[int] = None, **tw_args: Any) -> str:
    """Wrapper around textwrap.fill

    The `tw_args` are passed on to textwrap.fill. See textwrap.TextWrapper for available keyword arguments.

    The default value for `width` is console.columns(), while the new argument `hanging`, if defined, will try to
    set (although not override) the textwrap-arguments `initial_indent` and `subsequent_indent` to create a hanging
    indent (no indent on the first line) of `hanging` spaces.

    Args:
        text:     Text that will be wrapped.
        width:    The maximum width (in characters) of wrapped lines.
        hanging:  Number of characters used for hanging indent.
        tw_args:  Arguments passed on to `textwrap.fill`.

    Returns:
        Wrapped string.
    """
    width = columns() if width is None else width
    if hanging is not None:
        tw_args.setdefault("initial_indent", "")
        tw_args.setdefault("subsequent_indent", " " * hanging)

    return textwrap.fill(text, width=width, **tw_args)


def dedent(text: str, num_spaces: Optional[int] = None) -> str:
    """Wrapper around textwrap.dedent

    Dedents at most num_spaces. If num_spaces is not specified, dedents as much as possible.

    Args:
        text:        Text that will be dedented.
        num_spaces:  Number of spaces that will be used for dedentation.

    Returns:
        Dedented string.
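
    Example (a sketch; the text is dedented by 4 spaces, then indented back by 2):

        >>> print(dedent("    abc\n      def", num_spaces=2))  # doctest: +SKIP
          abc
            def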
""" # Dedent the text all the way dedented_text = textwrap.dedent(text) if num_spaces is None: return dedented_text # Indent it back if necessary num_indents = (num_leading_spaces(text) - num_leading_spaces(dedented_text)) - num_spaces if num_indents > 0: return indent(dedented_text, num_spaces=num_indents) else: return dedented_text def indent(text: str, num_spaces: int, **tw_args: Any) -> str: """Wrapper around textwrap.indent The `tw_args` are passed on to textwrap.indent. Args: text: Text that will be indented. num_spaces: Number of spaces that will be used for indentation. Returns: Indented string. """ tw_args["prefix"] = " " * num_spaces return textwrap.indent(text, **tw_args) def num_leading_spaces(text: str, space_char: str = " ") -> int: """Count number of leading spaces in a string Args: text: String to count. space_char: Which characters count as spaces. Returns: Number of leading spaces. """ return len(text) - len(text.lstrip(space_char)) PKK8NP3midgard/dev/exceptions.py"""Definition of Midgard-specific exceptions Description: ------------ Custom exceptions used by Midgard for more specific error messages and handling. """ class MidgardException(Exception): pass class MidgardExit(SystemExit, MidgardException): pass class InitializationError(MidgardException): pass class FieldExistsError(MidgardException): pass class FieldDoesNotExistError(MidgardException): pass class MissingConfigurationError(MidgardException): pass class MissingDataError(MidgardException): pass class MissingEntryError(MidgardException): pass class MissingSectionError(MidgardException): pass class ParserError(MidgardException): pass class TimerNotRunning(MidgardException): pass class TimerRunning(MidgardException): pass class UnknownConstantError(MidgardException): pass class UnknownConversionError(MidgardException): pass class UnknownEnumError(MidgardException): pass class UnknownPackageError(MidgardException): pass class UnknownPluginError(MidgardException): pass class UnknownSystemError(MidgardException): pass class UnitError(MidgardException): pass PKOnANmidgard/dev/interactive.pyPK8YL/8<<midgard/dev/library.py"""Python wrapper around C-libraries Description: ------------ Loads a C-library. If a library is missing, a mock library is returned. If this mock is used for anything, a warning will be printed. This is done to avoid dependencies to all the C/C++-libraries for Python programs only using some of them. """ # Standard library imports import ctypes as c from ctypes import util as c_util import sys def load_name(library_name, func_specs=None, name_patterns=None): """Load the given shared C-library See `load_path` for an explanation of the `func_specs` and `name_patterns`-arguments. Args: library_name (String): The name of the library. func_specs (Dict): Specification of types in lib (see load_path). name_patterns (List): Name mangling patterns (see load_path). Returns: ctypes.CDLL: Representation of the shared library. """ library_path = c_util.find_library(library_name) return load_path(library_path, func_specs=func_specs, name_patterns=name_patterns) def load_path(library_path, func_specs=None, name_patterns=None): """Load the given shared C-library The optional func_specs-dictionary can be used to specify argument and return types of functions in the library (see the ctypes documentation for information about argtypes and restype). The dictionary should be on the form:: func_spec = {'func_1': dict(func_name='name_of_func_1_in_lib', argtypes=[ ... argtypes of func_1 ... ], restype=... 
restype of func_1 ...), 'func_2': ... } If the library is not found, a mock library is returned instead. The mock library will print a warning if it is used. For some libraries, name mangling is used and this might be different depending on operating system and how the library is compiled. For instance, in a Fortran library the function `Test` might be represented as `__Test` on a Windows system and `test_` (with lower-case `t`) on a Linux system. This can be handled by providing a list of possible patterns. The above example can be handled by:: name_patterns = ('__{func_name}', '{func_name_lower}_') In this case, each function in func_specs is looked up by testing each pattern in turn until a match is found. Args: library_path (String): The path to the library. func_specs (Dict): Specification of types in library (see above). name_patterns (List): Name mangling patterns (see above). Returns: ctypes.CDLL: Representation of the shared library. """ library_handle = c.cdll.LoadLibrary(library_path) # Return a Mock object if the library is not found if library_handle._name is None: mock = SimpleMock(name=library_path) return mock # Handle name mangling if name_patterns is None: def mangled_name(name): return name else: def mangled_name(name): for pattern in name_patterns: full_name = pattern.format(func_name=name, func_name_lower=name.lower()) if hasattr(library_handle, full_name): return full_name return name # Set argument and return types on functions if func_specs: for name, spec in func_specs.items(): if "func_name" in spec: func_name = mangled_name(spec["func_name"]) else: func_name = mangled_name(name) func = getattr(library_handle, func_name) delattr(library_handle, func_name) if "argtypes" in spec: func.argtypes = spec["argtypes"] if "restype" in spec: func.restype = spec["restype"] setattr(library_handle, name, func) return library_handle class SimpleMock: """Class that can stand in for any other object The SimpleMock is used to stand in for any library that can not be imported. The mock object simply returns itself whenever it is called, or any attributes are looked up on the object. This is done, to avoid ImportErrors when a library is imported, but never used (typically because a plugin is loaded but never called). Instead the ImportError is raised when the SimpleMock is used in any way. The ImportError will only be raised once for any SimpleMock-object (which is only important if the ImportError is caught and the program carries on). """ def __init__(self, name, raise_error=True): """Initialize SimpleMock object Args: name (String): Name of SimpleMock-object. raise_error (Bool): Should ImportError be raised when using object. """ self._name = name self._children = dict() self._raise_error = raise_error def _raise_import_error(self): """Raise an import error when the SimpleMock object is used The ImportError is only raised the first time the object is used. 
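        A sketch of the behaviour:

            >>> mock = SimpleMock("somelib")  # doctest: +SKIP
            >>> mock.some_func                # doctest: +SKIP
            Traceback (most recent call last):
                ...
            ImportError: The library 'somelib' is not installed, but is used by ...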
""" # Only raise the error once if self._raise_error: self._raise_error = False else: return # Find calling function caller = sys._getframe() while caller.f_code.co_filename == __file__: caller = caller.f_back func_name = caller.f_code.co_name line_no = caller.f_lineno file_name = caller.f_code.co_filename # Raise ImportError with a helpful message raise ImportError( "The library '{}' is not installed, but is used by '{}' on line {} of {}" "".format(self._name, func_name, line_no, file_name) ) def __call__(self, *args, **kwargs): """Return the same SimpleMock-object when it is called An ImportError is raised the first time the object is used. Returns: SimpleMock: Itself. """ self._raise_import_error() return self def __getattr__(self, key): """Create a child-SimpleMock-object and return it. The same child object is returned if the same attribute is gotten several times. An ImportError is raised the first time the SimpleMock-object is used. Additional errors are not raised for the children. Args: key (String): Name of attribute. Returns: SimpleMock: A child-SimpleMock-object. """ self._raise_import_error() if key not in self._children: self._children[key] = type(self)("{}.{}".format(self._name, key), raise_error=self._raise_error) setattr(self, key, self._children[key]) return self._children[key] def __repr__(self): """String representation of the SimpleMock-object Returns: String: Simple representation of the SimpleMock-object. """ return "{}('{}')".format(self.__class__.__name__, self._name) def __str__(self): """Convert to the empty string Returns: String: An empty string. """ return "" PK.tNN6ZRRmidgard/dev/log.py"""Midgard library module for logging Description: ------------ This module provides simple logging inside Midgard. To use it, you must first add a an active logger. This is typically done using one of the init-functions: init() or file_init(). To write a log message, simply call one of midgard.log.debug, midgard.log.info, midgard.log.warn, midgard.log.error or midgard.log.fatal with a log message. To add a different logger, you should subclass the Logger abstract class. 
Example: -------- >>> from midgard.dev import log >>> log.init("info", prefix="My prefix") >>> n, m = 5, 3 >>> log.info(f"Calculating the inverse of a {n:>2d}x{m:<2d} matrix") INFO [My prefix] Calculating the inverse of a 5x3 matrix """ # Standard library imports import abc from datetime import datetime import functools import pathlib import sys from typing import Callable, Optional, Union # Midgard imports from midgard.collections import enums # LogLevel type LogLevel = enums.get_enum("log_level") # Overview over active loggers _ACTIVE_LOGGERS = dict() # # Loggers # class Logger(abc.ABC): """Abstract class that can be specialized to create new loggers""" name = "logger" log_levels = enums.get_enum("log_level") def __init__(self, log_level: Optional[str] = None, prefix: str = ""): """Register as an active logger, and set up parameters""" _ACTIVE_LOGGERS[self.name] = self # Use "warn" as default log level, None used as default parameter to allow for other default behaviours log_level = "warn" if log_level is None else log_level # Check and set log_level and prefix self.log_level = log_level self.prefix = prefix @property def log_level(self) -> LogLevel: """Log level as an enum""" return self._log_level @log_level.setter def log_level(self, value: str) -> None: """Set log level using string, checks that log level is a valid level""" self._log_level = enums.get_value("log_level", value) @property def prefix(self) -> str: """Prefix before log text""" return self._prefix @prefix.setter def prefix(self, value: str) -> None: """Set prefix""" self._prefix = f"[{value}] " if value else "" @classmethod def is_level(cls, level: str) -> bool: """Checks that level is a valid log level""" return enums.has_value("log_level", level) @classmethod def get_color(cls, level: LogLevel) -> str: """Get color string for the given log level""" return enums.get_value("log_color", level.name, default="") @abc.abstractmethod def blank(self) -> None: """Log blank line""" @abc.abstractmethod def log(self, level: LogLevel, log_text: str) -> None: """Log text at given level""" def __repr__(self): """Representation of logger""" return f"{type(self).__name__}({self.name!r}, log_level={self.log_level.name!r})" class ConsoleLogger(Logger): """Log to the console, the log level can also be set using command line parameters""" name = "console" def __init__(self, log_level: Optional[str] = None, prefix: str = "", use_command_line: bool = True) -> None: """Create console logger and register as an active logger""" # Use command line parameters if log level is given as `--level` if use_command_line: for arg in [o for o in sys.argv[1:] if o.startswith("--")]: if self.is_level(arg[2:]): log_level = arg[2:] break # Register as normal super().__init__(log_level=log_level, prefix=prefix) def blank(self) -> None: """Log blank line""" if self.log_level < self.log_levels.none: print() def log(self, level: LogLevel, log_text: str) -> None: """Log text at given level""" if level < self.log_level: return color = self.get_color(level) stream = sys.stderr if level >= self.log_levels.warn else sys.stdout print(f"{color}{level.name.upper():<6}{self.prefix}{log_text}", file=stream) class FileLogger(Logger): """Log to a file, the log files can be rotated so that older files are kept""" def __init__( self, file_path: Union[str, pathlib.Path], log_level: Optional[str] = None, prefix: str = "", rotation: Optional[int] = None, ) -> None: """Create file logger and register as an active logger""" # Store file path and generate name self.file_path = 
pathlib.Path(file_path).resolve() self.name = f"file://{self.file_path}" # Rotate old log files and open log file for writing if rotation is not None: self.rotate_files(rotation) self.fid = open(file_path, mode="wt") # Register as an active logger super().__init__(log_level=log_level, prefix=prefix) def __del__(self): """Close log file when logger is deleted""" try: self.fid.close() except AttributeError: pass # self.fid not defined, nothing to close def blank(self) -> None: """Log blank line""" if self.log_level < self.log_levels.none: self.fid.write("\n") def log(self, level: LogLevel, log_text: str) -> None: """Log text at given level""" if level < self.log_level: return timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.fid.write(f"{level.name.upper():<6} {timestamp} {self.prefix}{log_text}\n") def rotate_files(self, rotation: int) -> None: """Perform necessary rolling of log files Rolls the log files. That is, if there are old log files, they will be moved to files with extension .0, .1 and so on. The number of rolled logs to keep is specified by the rotation parameter. Args: rotation: Number of log files to keep. """ if not self.file_path.exists(): return directory = self.file_path.parent log_name = self.file_path.name for roll in range(rotation - 1)[::-1]: if (directory / f"{log_name}.{roll}").exists(): (directory / f"{log_name}.{roll}").replace(directory / f"{log_name}.{roll + 1}") if rotation > 0: self.file_path.replace(directory / f"{log_name}.0") # # Module functions used for logging # def log(log_text: str, level: str) -> None: """Log text at the given level""" if not _ACTIVE_LOGGERS: return # Looking up the level dynamically, as the enum may have been updated level = enums.get_value("log_level", level) for output in _ACTIVE_LOGGERS.values(): output.log(level, log_text) def blank() -> None: """Log blank line""" for output in _ACTIVE_LOGGERS.values(): output.blank() # Aliases for initializing loggers init = ConsoleLogger file_init = FileLogger # Make each log level available as a function for level in enums.get_enum("log_level"): globals()[level.name] = functools.partial(log, level=level.name) # # Convenience functions related to logging # def print_file( log_path: Union[str, pathlib.Path], log_level: str = "info", print_func: Callable[[str], None] = print ) -> None: """Print a log file with colors, stripping away any item below log_level""" log_level = enums.get_value("log_level", log_level) log_path = pathlib.Path(log_path) current_level = log_level with log_path.open(mode="r") as fid: # lib.files is not available in lib.log for line in fid: line_level = line.partition(" ")[0].lower() current_level = enums.get_value("log_level", line_level, default=current_level) if current_level >= log_level: color = Logger.get_color(current_level) print_func(f"{color}| {line.rstrip()}") PKHwM {4kkmidgard/dev/optional.py"""Midgard library module for handling optional dependencies Description: ------------ Import dependencies that are only necessary for specific parts of Midgard. Using this module will delay raising an ImportError until the dependency is actually used. This means that if one for instance only wants to run a GNSS analysis (or only use a Rinex-parser) installing special libraries only used for VLBI is not necessary. 
Examples: --------- The optional import is typically used as follows:: from midgard.dev import optional netCDF4 = optional.optional_import('netCDF4') """ # Standard library imports import importlib import sys from typing import Any, Dict, Optional, Union class SimpleMock: """Class that can stand in for any other object The SimpleMock is used to stand in for any library that cannot be imported. The mock object simply returns itself whenever it is called, or any attributes are looked up on the object. This is done to avoid ImportErrors when a library is imported, but never used (for instance if a plugin is loaded but never called). Instead the ImportError is raised when the SimpleMock is used in any way. The ImportError will only be raised once for any SimpleMock-object (which is only important if the ImportError is caught and the program carries on). The exception is if any attributes (`attrs`) are explicitly defined on the mock. No exception is raised if those attributes are looked up. """ def __init__( self, name: str, raise_error: bool = True, attrs: Optional[Dict[str, Any]] = None, error_msg: Optional[str] = None, ) -> None: """Initialize SimpleMock object Args: name: Name of SimpleMock-object. Used for string representation and when raising errors. raise_error: Whether ImportError should be raised when object is used. attrs: Attributes that should be added to the SimpleMock. error_msg: Text that will be added to error message. """ self._name = name self._children: Dict[str, "SimpleMock"] = dict() self._raise_error = raise_error self._attrs = attrs self._error_msg = error_msg if attrs is not None: for name, attr in attrs.items(): setattr(self, name, attr) def _raise_import_error(self) -> None: """Raise an import error when the SimpleMock object is used The ImportError is only raised the first time the object is used. """ # Only raise the error once if self._raise_error: self._raise_error = False else: return # Find calling function caller = sys._getframe() while caller.f_code.co_filename == __file__: caller = caller.f_back func_name = caller.f_code.co_name line_num = caller.f_lineno file_name = caller.f_code.co_filename # Raise ImportError with a helpful message error_msg = ( f"The module '{self._name}' is not installed, " f"but is used by '{func_name}' on line {line_num} of {file_name}" ) if self._error_msg: error_msg += f".\n {self._error_msg}" raise ImportError(error_msg) def __call__(self, *args: Any, **kwargs: Any) -> "SimpleMock": """Return the same SimpleMock-object when it is called An ImportError is raised the first time the object is used. Returns: Itself. """ self._raise_import_error() return self def __getattr__(self, key: str) -> Any: """Create a child-SimpleMock-object and return it. The same child object is returned if the same attribute is accessed several times. An ImportError is raised the first time the SimpleMock-object is used. Additional errors are not raised for the children. Args: key: Name of attribute. Returns: A child-SimpleMock-object. """ self._raise_import_error() if key not in self._children: child_name = f"{self._name}.{key}" self._children[key] = type(self)(child_name, raise_error=self._raise_error, attrs=self._attrs) setattr(self, key, self._children[key]) return self._children[key] def __repr__(self) -> str: """String representation of the SimpleMock-object Returns: Simple string representation of the SimpleMock-object.
""" return f"{self.__class__.__name__}('{self._name}')" class EmptyStringMock(SimpleMock): """A mock object whose properties are all empty strings """ def __getattr__(self, key: str) -> str: """All attributes are empty strings. Args: key: Name of attribute. Returns: An empty string. """ return "" def optional_import( module_name: str, raise_error: bool = True, mock_cls: type = SimpleMock, attrs: Optional[Dict[str, Any]] = None ) -> Union[Any, SimpleMock]: # TODO: Should be typed types.ModuleType? but causes typing errors """Try to import an optional module If the module does not exist, a SimpleMock-object is returned instead. If this SimpleMock-object is later used, an ImportError will be raised then (if `raise_error` is True, which is default). Args: module_name: Name of module to import. raise_error: Whether an ImportError should be raised if the module does not exist, but is used. attrs: Attributes that should be added to the SimpleMock used if the module does not exist. Returns: Imported module object, or a SimpleMock-object if the module can not be imported. """ try: return importlib.import_module(module_name) except ImportError: return mock_cls(module_name, raise_error=raise_error, attrs=attrs) PK@lAN$UUmidgard/dev/plugins.py"""Set up a plug-in architecture for Midgard Description: ------------ In order to be able to add models, parsers, data sources etc without needing to hardcode names, but rather pick them from configuration files, we use a simple plug-in architecture. The plug-in mechanism is based on the different plug-ins registering themselves using the `register` decorator: from midgard.dev import plugins @plugins.register def simple_model(rundate, tech, dset): ... Plug-ins are registered based on the name of the module (file) they are defined in, as well as the package (directory) which contains them. Typically all plug-ins of a given type are collected in a package, e.g. models, techniques, parsers, etc. To list all plug-ins in a package use `names`: > from midgard.dev import plugins > plugins.names('midgard.models') ['model_one', 'model_three', 'model_two'] If the optional parameter `config_key` is given, then only plug-ins listed in the corresponding section in the current configuration file is listed. For instance, if the configuration file contains a line saying ham_models = model_three, model_one then we can list only the `ham_models` as follows: > from midgard.dev import plugins > plugins.names('midgard.models', config_key='ham_models') ['model_one', 'model_three'] Note that the plug-ins by default are sorted alphabetically. To run the plug-ins, use either `call_all` or `call_one`. The former calls all plug-ins and returns a dictionary containing the result from each plug-in. As with `names` the optional parameter `config_key` may be given: > from midgard.dev import plugins > plugins.call_all('midgard.models', config_key='ham_models', arg_to_plugin='hello') {'model_three': , 'model_one': } Arguments to the plug-ins should be passed as named arguments to `call_all`. Similarly, one plug-in may be called explicitly using `call_one`: > from midgard.dev import plugins > plugins.call_one('midgard.models', plugin_name='model_one', arg_to_plugin='hello') There may be more than one function in each plug-in that is decorated by `register`. In this case, the default behaviour is that only the first function will be called. 
To call the other registered functions one should use the `parts` function to get a list of these functions and call them explicitly using the `part` optional parameter to `call`: > from midgard.dev import plugins > plugins.parts('midgard.techniques', plugin_name='vlbi') ['read', 'edit', 'calculate', 'estimate', 'write_result'] > for part in plugins.parts('midgard.techniques', plugin_name='vlbi'): ... plugins.call('midgard.techniques', plugin_name='vlbi', part=part, ...) """ # Standard library imports import functools import importlib import inspect import pathlib import re import sys from typing import Any, Callable, Dict, Iterable, List, NamedTuple, Optional, Tuple # Midgard imports from midgard.dev import console from midgard.dev.exceptions import UnknownPackageError, UnknownPluginError from midgard.dev import log from midgard.files import dependencies # The _PLUGINS-dict is populated by the `register` decorator in each module. _PLUGINS: Dict[str, Dict[str, Any]] = dict(__aliases__=dict(), __packages__=dict()) # Simple structure containing information about a plug-in class Plugin(NamedTuple): """Information about a plug-in Args: name: Name of the plug-in. function: The plug-in. file_path: Path to the source code of the plug-in, may be used to add the source as a dependency. sort_value: Value used when sorting plug-ins in order to control the order they are called. """ name: str function: Callable file_path: pathlib.Path sort_value: int # # REGISTER PLUG-INS # def register(func: Callable, name: Optional[str] = None, sort_value: int = 0) -> Callable: """Decorator used to register a plug-in Plug-ins are registered based on the name of the module (file) they are defined in, as well as the package (directory) which contains them. Typically all plug-ins of a given type are collected in a package, e.g. models, techniques, parsers, etc. The path to the source code file is also stored. This is used to be able to add the source code as a dependency file when the plug-in is called. If `name` is given, the plug-in is registered based on this name instead of the name of the module. The name of the module is still registered as a part that can be used to distinguish between similar plug-ins in different files (see for instance how `session` is used in `midgard.pipelines`). Args: func: The function that is being registered. name: Alternative name of plug-in. Used by `register_named`. sort_value: The value used when sorting plug-ins. Used by `register_ordered`. Returns: The function that is being registered.
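Example:
    Registering a plug-in under an explicit name is typically done with the `register_named` decorator defined below (a sketch; the function name is hypothetical)::

        from midgard.dev import plugins

        @plugins.register_named("session")
        def default_session(pipeline):
            ...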
""" # Get information from the function being registered package_name, _, plugin_name = func.__module__.rpartition(".") package_name = _PLUGINS["__aliases__"].get(package_name, package_name) file_path = pathlib.Path(sys.modules[func.__module__].__file__) # Store Plugin-object in _PLUGINS dictionary _PLUGINS["__packages__"].setdefault(package_name, [package_name]) plugin_info = _PLUGINS.setdefault(package_name, dict()).setdefault(plugin_name, dict()) if name is None: name = func.__name__ # Name of function is used as default name plugin_info.setdefault("__parts__", list()).append(name) # Only unnamed parts are added to list plugin = Plugin(f"{plugin_name}.{name}", func, file_path, sort_value) plugin_info[name] = plugin log.debug(f"Registering {plugin.name} ({plugin.file_path}) as a {package_name}-plugin") # Add first registered unnamed part as default if "__parts__" in plugin_info: plugin_info["__default__"] = plugin_info[plugin_info["__parts__"][0]] return func def register_named(name: str) -> Callable: """Decorator used to register a named plug-in This allows for overriding the name used to register the plug-in. See `register` for more details. Args: name: Name used for plug-in instead of module name. Returns: Decorator that registers a named function. """ return functools.partial(register, name=name) def register_ordered(sort_value: int) -> Callable: """Decorator used to register a plug-in with a specific sort order The sort value should be a number. Lower numbers are sorted first, higher numbers last. Plug-ins without an explicit sort_order gets the sort value of 0. Args: sort_value: The value used when sorting plug-ins. Returns: Decorator that registers an ordered function. """ return functools.partial(register, sort_value=sort_value) def get(package_name: str, plugin_name: str, part: Optional[str] = None, prefix: Optional[str] = None) -> Plugin: """Get a specific plugin-object If the plug-in is not part of the package an UnknownPluginError is raised. If there are several functions registered in a plug-in and `part` is not specified, then the first function registered in the plug-in will be called. Args: package_name: Name of package containing plug-ins. plugin_name: Name of the plug-in, i.e. the module containing the plug-in. part: Name of function to call within the plug-in (optional). prefix: Prefix of the plug-in name, used if the plug-in name is not found (optional). Returns: Plugin-namedtuple representing the plug-in. """ # Get Plugin-object plugin_name = load(package_name, plugin_name, prefix=prefix) part = "__default__" if part is None else part try: return _PLUGINS[package_name][plugin_name][part] except KeyError: # TODO: List available plugins raise UnknownPluginError(f"Plugin {part!r} not found for {plugin_name!r} in {package_name!r}") from None # # CALL PLUG-INS # def call( package_name: str, plugin_name: str, part: Optional[str] = None, prefix: Optional[str] = None, plugin_logger: Optional[Callable[[str], None]] = None, **plugin_args: Any, ) -> Any: """Call one plug-in Args: package_name: Name of package containing plug-ins. plugin_name: Name of the plug-in, i.e. the module containing the plug-in. part: Name of function to call within the plug-in (optional). prefix: Prefix of the plug-in name, used if the plug-in name is not found (optional). plugin_logger: Function used for logging (optional). plugin_args: Named arguments passed on to the plug-in. Returns: Return value of the plug-in. 
""" plugin = get(package_name, plugin_name, part, prefix) # Log message about calling plug-in if plugin_logger is not None: plugin_logger(f"Start plug-in {plugin.name!r} in {package_name!r}") # Add dependency to the plug-in dependencies.add(plugin.file_path, label=f"plugin:{package_name}") # Call plug-in return plugin.function(**plugin_args) def call_all( package_name: str, plugins: Optional[List[str]] = None, part: Optional[str] = None, prefix: Optional[str] = None, plugin_logger: Optional[Callable[[str], None]] = None, **plugin_args: Any, ) -> Dict[str, Any]: """Call all plug-ins in a package If `plugins` is given, it should be a list of names of plug-ins. If a plug-in listed in the `plugins`-list or in the config file does not exist, an UnknownPluginError is raised. If `plugins` is not given, all available plugins will be called. Do note, however, that this will import all python files in the package. Args: package_name: Name of package containing plug-ins. plugins: List of plug-in names that should be used (optional). part: Name of function to call within the plug-ins (optional). prefix: Prefix of the plug-in names, used for a plug-in if it is not found (optional). plugin_logger: Function used for logging (optional). plugin_args: Named arguments passed on to all the plug-ins. Returns: Dictionary of all results from the plug-ins. """ plugin_names = names(package_name, plugins=plugins, prefix=prefix) return {p: call(package_name, p, part=part, plugin_logger=plugin_logger, **plugin_args) for p in plugin_names} # # GET DOCUMENTATION FOR PLUG-INS # def doc( package_name: str, plugin_name: str, part: Optional[str] = None, prefix: Optional[str] = None, long_doc: bool = True, include_details: bool = False, use_module: bool = False, ) -> str: """Document one plug-in If the plug-in is not part of the package an UnknownPluginError is raised. If there are several functions registered in a plug-in and `part` is not specified, then the first function registered in the plug-in will be documented. Args: package_name: Name of package containing plug-ins. plugin_name: Name of the plug-in, i.e. the module containing the plug-in. part: Name of function to call within the plug-in (optional). prefix: Prefix of the plug-in name, used if the plug-in name is unknown (optional). long_doc: Whether to return the long doc-string or the short one-line string (optional). include_details: Whether to include development details like parameters and return values (optional). use_module: Whether to use module doc-string instead of plug-in doc-string (optional). Returns: Documentation of the plug-in. 
""" # Get Plugin-object and pick out doc-string plugin = get(package_name, plugin_name, part, prefix) if use_module: doc = sys.modules[plugin.function.__module__].__doc__ or "" else: doc = plugin.function.__doc__ or "" if long_doc: # Strip short description and indentation doc = console.dedent("\n\n".join(doc.split("\n\n")[1:])) lines = doc.rstrip().splitlines() # Stop before Args:, Returns: etc if details should not be included idx_args = len(lines) if not include_details: re_args = re.compile("(Args:|Returns:|Details:|Examples?:|Attributes:)$") try: idx_args = [re_args.match(l) is not None for l in lines].index(True) except ValueError: pass return "\n".join(lines[:idx_args]).strip() else: # Return short description return doc.split("\n\n")[0].replace("\n", " ").strip() def doc_all( package_name: str, plugins: Optional[Iterable[str]] = None, prefix: Optional[str] = None, long_doc: bool = True, include_details: bool = False, use_module: bool = False, ) -> Dict[str, str]: """Call all plug-ins in a package If `plugins` is given, it should be a list of names of plug-ins. If a plug-in listed in the `plugins`-list does not exist, an UnknownPluginError is raised. If `plugins` is not given, all available plugins will be called. Do note, however, that this will import all python files in the package. Args: package_name: Name of package containing plug-ins. plugins: List of plug-ins that should be used (optional). prefix: Prefix of the plug-in names, used if any of the plug-ins are unknown (optional). long_doc: Whether to return the long doc-string or the short one-line string (optional). include_details: Whether to include development details like parameters and return values (optional). use_module: Whether to use module doc-string instead of plug-in doc-string (optional). Returns: Dictionary of all doc-strings from the plug-ins. """ plugin_names = names(package_name, plugins=plugins, prefix=prefix) return { p: doc(package_name, p, long_doc=long_doc, include_details=include_details, use_module=use_module) for p in plugin_names } def signature( package_name: str, plugin_name: str, part: Optional[str] = None, prefix: Optional[str] = None ) -> inspect.Signature: """Get signature of a plug-in If the plug-in is not part of the package an UnknownPluginError is raised. If there are several functions registered in a plug-in and `part` is not specified, then the first function registered in the plug-in will be documented. Args: package_name: Name of package containing plug-ins. plugin_name: Name of the plug-in, i.e. the module containing the plug-in. part: Name of function to call within the plug-in (optional). prefix: Prefix of the plug-in name, used if the plug-in name is unknown (optional). Returns: Signature of the plugin """ # Get Plugin-object and pick out signature plugin = get(package_name, plugin_name, part, prefix) return inspect.signature(plugin.function) # # LIST AVAILABLE PLUG-INS # def names(package_name: str, plugins: Optional[Iterable[str]] = None, prefix: Optional[str] = None) -> List[str]: """List plug-ins in a package If `plugins` is given, it should be a list of names of plug-ins. If a plug-in listed in the `plugins`-list does not exist, an UnknownPluginError is raised. If `plugins` is not given, all available plugins will be listed. Do note, however, that this will import all python files in the package. Args: package_name: Name of package containing plug-ins. plugins: List of plug-ins that should be used (optional). 
prefix: Prefix of the plug-in names, used if any of the plug-in names are unknown (optional). Returns: List of strings with names of plug-ins. """ # Figure out names of plug-ins if plugins is None: _import_all(package_name) plugins = _PLUGINS.get(package_name, dict()).keys() # Load each plug-in and return them in sort order def _sort_value(plugin: str) -> Tuple[int, str]: """Pick out sort_value of plugin""" return (getattr(_PLUGINS[package_name][plugin].get("__default__"), "sort_value", 0), plugin) return sorted((load(package_name, p, prefix=prefix) for p in plugins), key=_sort_value) def parts(package_name: str, plugin_name: str, prefix: Optional[str] = None) -> List[str]: """List all parts of one plug-in Args: package_name: Name of package containing plug-ins. plugin_name: Name of the plug-in. prefix: Prefix of the plug-in name, used if the plug-in name is unknown (optional). Returns: List: Strings with names of parts. """ plugin_name = load(package_name, plugin_name, prefix=prefix) return _PLUGINS[package_name][plugin_name].get("__parts__", list()) def exists(package_name: str, plugin_name: str) -> bool: """Check whether or not a plug-in exists in a package Tries to import the given plug-in. Args: package_name: Name of package containing plug-ins. plugin_name: Name of the plug-in (module). Returns: True if plug-in exists, False otherwise. """ if plugin_name not in _PLUGINS.get(package_name, dict()): try: _import_one(package_name, plugin_name) except UnknownPluginError: return False return plugin_name in _PLUGINS.get(package_name, dict()) # # LOAD PLUG-INS # def add_alias(package_name: str, alias: str) -> None: """Add alias to plug-in package This allows one package of plug-ins to be spread over several directories Args: package_name: Name of package containing plug-ins. directory: Additional plug-in directory. """ _PLUGINS["__packages__"].setdefault(package_name, [package_name]).append(alias) _PLUGINS["__aliases__"][alias] = package_name def load(package_name: str, plugin_name: str, prefix: Optional[str] = None) -> str: """Load one plug-in from a package First tries to load the plugin with the given name. If that fails, it tries to load {prefix}_{plugin_name} instead. Args: package_name: Name of package containing plug-ins. plugin_name: Name of the plug-in (module). prefix: Prefix of the plug-in name, used if the plug-in name is unknown (optional). Returns: Actual name of plug-in (with or without prefix). """ if plugin_name not in _PLUGINS.get(package_name, dict()): try: _import_one(package_name, plugin_name) except UnknownPluginError as import_err: if prefix: try: return load(package_name, f"{prefix}_{plugin_name}") except UnknownPluginError: pass raise import_err from None # Raise original exception return plugin_name def _aliases(package_name: str) -> List[str]: """Aliases for the given package Args: package_name: Name of package. Returns: List of names of packages (including the given one) containing plugins. """ return _PLUGINS["__packages__"].get(package_name, [package_name]) def _import_one(package_name: str, plugin_name: str) -> None: """Import a plugin from a package This is essentially just a regular python import. As the module is imported, the _PLUGINS-dict will be populated by @register decorated functions in the file. Args: package_name: Name of package containing plug-ins. plugin_name: Name of the plug-in (module). 
""" for package in _aliases(package_name): module_name = f"{package}.{plugin_name}" try: importlib.import_module(module_name) except ImportError as err: # Error comes from a subimport inside the plugin if not hasattr(err, "name") or err.name != module_name: raise # Plugin does not exist, but might be aliased from another package else: pass # Just to be explicit, not really necessary # Success, do not import more aliases else: continue if plugin_name not in _PLUGINS.get(package_name, dict()): raise UnknownPluginError(f"Plug-in {plugin_name!r} not found in package {package_name!r}") from None def _import_all(package_name: str) -> None: """Import the relevant .py-files in the given package directory As each file is imported, the _PLUGINS-dict will be populated by @register decorated functions in the files. Args: package_name: Name of package containing plug-ins. """ file_paths: List[pathlib.Path] = list() for package_alias in _aliases(package_name): # Figure out the directory of the package by importing it try: package = importlib.import_module(package_alias) except ImportError: raise UnknownPackageError(f"Plug-in package {package_name!r} not found") from None # List all .py files in the given directory directory = pathlib.Path(package.__file__).parent file_paths += [f for f in directory.glob("*.py") if not f.stem.startswith("_")] # Import all Python files for file_path in file_paths: plugin_name = file_path.stem try: _import_one(package_name, plugin_name) except UnknownPluginError: pass # OK if .py file does not contain a plugin PKX3MAC##midgard/dev/profiler.py"""Add a profiler when running Supports several profilers including cprofile, line_profiler, memprof and memory_profiler. """ # Standard library imports from abc import ABCMeta, abstractmethod from typing import Callable, Generator class Profiler(metaclass=ABCMeta): """Base class for profilers""" pip_name: str = NotImplemented option: str = NotImplemented extension: str = NotImplemented @abstractmethod def setup(self, options): """Set up profiler""" @abstractmethod def start(self): """Start profiler""" @abstractmethod def end(self): """Stop profiler""" @abstractmethod def show(self): """Show results of profiler session in console""" @abstractmethod def write(self): """Write results of profiler session to disk""" def _get_func_from_name(self, name: str) -> Generator: """Get a function object from a name Assume the name is given as `module.function`, or `module.class.method`. Args: name: The full name of a function, for instance 'midgard.dev.profiler._get_func_from_name'. Returns: Function: The function object represented by name. 
""" import importlib import types # Import module and find function (possibly trying to find function inside class) modname, _, fname = name.rpartition(".") try: obj = importlib.import_module(modname) except ImportError: modname, _, clsname = modname.rpartition(".") mod = importlib.import_module(modname) obj = getattr(mod, clsname) # Support simple *-wildcard matching if name.endswith("*"): fnames = [f for f in dir(obj) if f.startswith(fname[:-1]) and not f.startswith("__")] if not fnames: raise TypeError("Found no functions named '{}'".format(name)) else: fnames = [fname] # Generate each function in fnames for fname in fnames: func = getattr(obj, fname) # Unwrap decorated functions while hasattr(func, "__wrapped__"): func = func.__wrapped__ # Return getter, setter and deleter functions of properties if isinstance(func, property): for accessor in ("fget", "fset", "fdel"): if getattr(func, accessor) is not None: yield getattr(func, accessor) else: if isinstance(func, types.FunctionType): yield func elif not name.endswith("*"): raise TypeError("'{}.{}' is not a function".format(modname, fname)) class CProfile(Profiler): """cprofile is used for profiling the whole program""" pip_name = "cprofile" option = "--profiler" def start(self): print("hei") class LineProfiler(Profiler): """line_profiler is used to profile one or a few functions in detail""" pip_name = "line_profiler" option = "--line_profiler" """ def profiler(func_to_profile): ""Run a function with profiling turned on Args: func_to_profile (Function): Function that will be called after profiling is turned on. "" # Should we do line or function profiling? prof, info = _setup_line_profiler() if util.check_options("--line_profile") else _setup_cprofile() # Read command line options filename = util.read_option_value("--profile_output", default="where") if not os.path.splitext(filename)[1]: filename = "{}.{}".format(filename, info["extension"]) # Start profiling and run Where as usual. Store and print profile even if Where crashes log.info("Enable {}, output stored in {}", info["doc"], filename) prof.enable() try: func_to_profile() finally: prof.disable() # Store profile to file inspect_str = "Inspect it using e.g. {}".format(" or ".join(info["inspect"])) log.info("Profile information stored to {f}\n " + inspect_str, f=filename) prof.dump_stats(filename) # Print profile to terminal if util.check_options("--show_profile"): prof.print_stats() def _setup_line_profiler(): ""Set up a line profiler"" import line_profiler prof = line_profiler.LineProfiler() funcs = util.read_option_value("--line_profile", default="").split(",") for fname in funcs: for func in _get_func_from_name(fname): prof.add_function(func) info = dict( extension="lprof", inspect=['"python -m line_profiler {f}"'], doc="line profiling functions {}".format(", ".join(funcs)), ) return prof, info def _get_func_from_name(name): ""Get a function object from a name Assume the name is given as module.function, or module.class.method. Args: name (String): The full name of a function, for instance 'where.__main__._get_func_from_name'. Returns: Function: The function object represented by name "" import importlib import types # Import module and find function (possibly trying to find function inside class) name = name if name.startswith("where.") else "where." 
+ name modname, _, fname = name.rpartition(".") try: obj = importlib.import_module(modname) except ImportError: modname, _, clsname = modname.rpartition(".") mod = importlib.import_module(modname) obj = getattr(mod, clsname) # Support simple *-wildcard matching if name.endswith("*"): fnames = [f for f in dir(obj) if f.startswith(fname[:-1]) and not f.startswith("__")] if not fnames: raise TypeError("Found no functions named '{}'".format(name)) else: fnames = [fname] # Generate each function in fnames for fname in fnames: func = getattr(obj, fname) # Unwrap decorated functions while hasattr(func, "__wrapped__"): func = func.__wrapped__ # Return getter, setter and deleter functions of properties if isinstance(func, property): for accessor in ("fget", "fset", "fdel"): if getattr(func, accessor) is not None: yield getattr(func, accessor) else: if isinstance(func, types.FunctionType): yield func elif not name.endswith("*"): raise TypeError("'{}.{}' is not a function".format(modname, fname)) def _setup_cprofile(): ""Set up a profiler"" import cProfile prof = cProfile.Profile() prof.print_stats = _print_cprofile_stats(prof) info = dict(extension="prof", inspect=['"pyprof2calltree -k -i {f}"', '"python -m pstats {f}"'], doc="profiling") return prof, info def _print_cprofile_stats(prof): ""Print statistics about data from cProfile Print statistics as defined by the `--show_profile` command line option. See the `profile`-function for more information. "" import pstats profile_info = util.read_option_value("--show_profile", default="time:50") def print_stats(): stat = pstats.Stats(prof).strip_dirs() if ":" in profile_info: sort_columns, _, amount = profile_info.partition(":") else: sort_keys = stat.get_sort_arg_defs().keys() sort_columns, amount = (profile_info, "") if profile_info in sort_keys else ("time", profile_info) amount = int(amount) if amount.isnumeric() else amount pstats.Stats(prof).strip_dirs().sort_stats(sort_columns).print_stats(amount) return print_stats """ PKC None: """Set up a new timer The text to be shown when logging the timer can be customized. Typically, the value of the timer will be added at the end of the string (e.g. 'Elapsed time: 0.1234 seconds'). However, this can be customized by adding a '{}' to the text. For example `text='Used {} to run the code'` will produce something like 'Used 0.1234 seconds to run the code'. Args: text: Text used when logging the timer (see above). fmt: Format used when formatting the time (default 4 decimals). logger: Function used to do the logging. """ super().__init__() self._start: Optional[float] = None self._end: Optional[float] = None self.text = text if (text is None or "{}" in text) else (text + " {}").strip() self.fmt = fmt self.logger = (lambda _: None) if logger is None else logger @staticmethod def timer() -> float: """Get current value of timer Using the built-in `time.perf_counter` to do the timing. Returns: Current value of timer. """ return time.perf_counter() def start(self) -> None: """Start the timer """ self._start = self.timer() self._end = None def pause(self) -> float: """Pause the timer without logging. Use .start() to restart the timer """ self._end = self.timer() time_elapsed = self.elapsed() self._start = None return time_elapsed def end(self) -> float: """End the timer and log the time elapsed Returns: The time elapsed in seconds. 
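Example:
    Timing a block of code explicitly (an illustration; the reported time will vary from run to run)::

        >>> from midgard.dev.timer import Timer
        >>> t = Timer("Spent {}", logger=print)
        >>> t.start()
        >>> elapsed = t.end()  # doctest: +SKIP
        Spent 0.1234 seconds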
""" time_elapsed = self.pause() self._log(time_elapsed) return time_elapsed def elapsed(self) -> float: """Log the time elapsed Can be used explicitly to log the time since a timer started without ending the timer. Returns: The time elapsed in seconds. """ if self._start is None: raise exceptions.TimerNotRunning( f"The timer is not running. See `help({self.__module__})` for information on how to start one." ) timer_end = self.timer() if self._end is None else self._end time_elapsed = 0 if self._start is None else timer_end - self._start return time_elapsed def _log(self, time_elapsed: float) -> None: """Do the actual logging of elapsed time Args: The time elapsed in seconds. """ time_text = f"{time_elapsed:{self.fmt}} seconds" if self.text: self.logger(self.text.format(time_text)) def __enter__(self) -> "Timer": """Start the timer as a context manager """ self.start() return self def __exit__(self, *_: Any) -> None: """End the timer and log the time elapsed as a context manager """ self.end() class AccumulatedTimer(Timer): def __init__( self, text: str = "Elapsed time:", fmt: str = ".4f", logger: Optional[Callable[[str], None]] = log.info ) -> None: super().__init__(text, fmt, logger) self.accumulated = 0.0 def reset(self) -> None: """Reset the timer back to 0 """ if self._end is None: raise exceptions.TimerRunning(f"The timer is running and cannot be reset") self.accumulated = 0.0 def end(self) -> float: """End the timer and log the time elapsed Returns: The time elapsed in seconds. """ super().end() return self.accumulated def elapsed(self) -> float: """Log the time elapsed Can be used explicitly to log the time since a timer started without ending the timer. Returns: The time elapsed in seconds. """ try: time_elapsed = super().elapsed() except exceptions.TimerNotRunning: time_elapsed = 0 self.accumulated += time_elapsed return time_elapsed def _log(self, time_elapsed: float) -> None: """Do the actual logging of elapsed time Args: The time elapsed in seconds. """ time_text = f"{time_elapsed:{self.fmt}}/{self.accumulated:{self.fmt}} seconds" if self.text: self.logger(self.text.format(time_text)) PKOnANmidgard/dev/util.pyPKMLmidgard/files/__init__.pyPKsiMmidgard/files/dates.py"""Convenience functions for working with dates Description: ------------ Formats and converters that can be used for convenience and consistency. """ # Standard library imports import datetime from typing import Dict, Optional, Union # Formats that can be passed to datetime.strftime, see http://strftime.org/ FMT_date = "%Y-%m-%d" FMT_datetime = "%Y-%m-%d %H:%M:%S" FMT_dt_file = "%Y%m%d-%H%M%S" def date_vars(date: Optional[Union[datetime.date, datetime.datetime]]) -> Dict[str, str]: """Construct a dict of date variables From a given date, construct a dict containing all relevant date variables. This dict can be used to for instance replace variables in file names. Examples: >>> from datetime import date >>> date_vars(date(2009, 11, 2)) # doctest: +NORMALIZE_WHITESPACE {'yyyy': '2009', 'ce': '20', 'yy': '09', 'm': '11', 'mm': '11', 'mmm': 'nov', 'MMM': 'NOV', 'd': '2', 'dd': '02', 'doy': '306', 'dow': '1', 'h': '0', 'hh': '00'} >>> date_vars(None) {} Args: date: The given date. Returns: Dictionary with date variables for the given date. 
""" if date is None: return dict() # Create the dict of date variables return dict( yyyy=date.strftime("%Y"), ce=date.strftime("%Y")[:2], yy=date.strftime("%y"), m=str(date.month), mm=date.strftime("%m"), mmm=date.strftime("%b").lower(), MMM=date.strftime("%b").upper(), d=str(date.day), dd=date.strftime("%d"), doy=date.strftime("%j"), dow=date.strftime("%w"), h=date.strftime("%-H"), hh=date.strftime("%H"), ) PKC None: """Start a clean list of dependencies The file_path is to the file in which dependencies are stored. This is cached, so after init() is run, the other functions do not need to specify the file_path. Args: file_path: Path to dependency file. fast_check: Fast check uses timestamps, slow check uses md5 checksums. """ file_path = pathlib.Path(file_path) # Store current dependencies to disk write() # Update and cache variables _DEPENDENCY_CACHE.clear() _DEPENDENCY_CACHE["file_path"] = file_path _DEPENDENCY_CACHE["fast_check"] = fast_check # Delete any existing dependency file try: file_path.unlink() log.debug(f"Removing old dependency file {file_path}") except FileNotFoundError: pass # If dependency file does not exist, we don't do anything # Register _write in case program exits without writing all dependencies to disk atexit.register(_write) def add(*file_paths: Union[str, pathlib.Path], label: str = "") -> None: """Add a list of files to the list of dependencies Records the current time stamp or md5 hash of the files specified by file paths, and stores as dependencies on the dependency file. Before adding dependencies, a call to `init()` has to be done, to set up where to store the dependencies. Args: file_paths: List of file paths to add to the dependency file. label: Optional label for dependencies. """ # Ignore dependency if no dependency file has been set (init() has not been called) if not _DEPENDENCY_CACHE: return # Add or update dependency information fast_check = _DEPENDENCY_CACHE["fast_check"] for file_path in file_paths: file_info = _file_info(file_path, fast_check, label=label) _CURRENT_DEPENDENCIES[str(file_path)] = file_info log.debug(f"Adding dependency: {file_path} ({file_info['checksum']})") def _file_info(file_path: Union[str, pathlib.Path], fast_check: bool, **info_args: str) -> Dict[str, str]: """Get file info for a file path The contents of the file info depends on whether we are doing a fast check or not. Args: file_path: File path. fast_check: Whether to do a fast check. info_args: Optional arguments that will be added to file info. Returns: Dictionary with info about file. """ file_info = dict(timestamp=get_timestamp(file_path)) if fast_check: file_info["checksum"] = file_info["timestamp"] else: file_info["checksum"] = get_md5(file_path) file_info.update(info_args) return file_info def write() -> None: """Write dependencies to file """ atexit.unregister(_write) _write(write_as_crash=False) def _write(write_as_crash: bool = True) -> None: """Write dependencies to file This function is called either when starting a new list of dependencies (with a call to `init`) or when the program exits (including with an error). If `write_as_crash` is True, a special dependency is stored that will force `changed` to return True. This will in particular make sure that a stage is rerun if it crashed the previous time it ran. Args: write_as_crash (Boolean): Whether to note that the current dependendee crashed. 
""" # Ignore dependency if no dependency path is set (init() has not been called) if not _DEPENDENCY_CACHE: return # Store timestamp of crash, this will also force the current stage to be rerun next time if write_as_crash: _CURRENT_DEPENDENCIES["CRASHED"] = _file_info("CRASHED", fast_check=True, checksum="CRASHED", label="CRASHED") # No need to open and close files if there are no dependencies to store if not _CURRENT_DEPENDENCIES: return # Open dependency file or start from a fresh configuration/dictionary dependencies = Configuration.read_from_file("dependecies", _DEPENDENCY_CACHE["file_path"]) # Update dependency information and clear list of current dependencies for file_path, info in _CURRENT_DEPENDENCIES.items(): dependencies.update_from_dict(info, section=file_path) _CURRENT_DEPENDENCIES.clear() # Write to dependency file dependencies.write_to_file(_DEPENDENCY_CACHE["file_path"]) def changed(file_path: Union[str, pathlib.Path], fast_check: bool = True) -> bool: """Check if the dependencies have changed Returns True if any of the files listed in the dependency file have changed, or if the dependency file itself does not exist. Args: file_path: Path to dependency file. fast_check: Fast check uses timestamps, slow check uses md5 checksums. Returns: True if any file has changed or if the dependecy file does not exist, False otherwise. """ # Make sure dependency file exists file_path = pathlib.Path(file_path) if not file_path.exists(): log.debug(f"Dependency file {file_path} does not exist") return True # Check if any dependencies have changed dependencies = Configuration.read_from_file("dependencies", file_path) for file_path in dependencies.section_names: previous_checksum = dependencies[file_path].checksum.str current_checksum = _file_info(file_path, fast_check=fast_check)["checksum"] if current_checksum != previous_checksum: log.debug(f"Dependency {file_path} changed from {previous_checksum} to {current_checksum}") return True return False def get_paths_with_label(file_path: Union[str, pathlib.Path], label_pattern: str) -> List[pathlib.Path]: """Find all paths with the given label Args: file_path: Path to dependency file. label_pattern: String with label or regular expression (e.g. 'gnss_rinex_nav_[MGE]' or 'gnss_rinex_nav_.'). Returns: List: List of file paths. """ label_re = re.compile(f"^{label_pattern}$") # ^ and $ is used to match the whole string # Make sure dependency file exists file_path = pathlib.Path(file_path) if not file_path.exists(): log.debug(f"Dependency file {file_path} does not exist") return [] # Find dependencies with the given label dependencies = Configuration.read_from_file("dependencies", file_path) paths = list() for file_path in dependencies.section_names: label = dependencies[file_path].label.str if label_re.match(label): paths.append(pathlib.Path(file_path)) return paths def get_timestamp(file_path: Union[str, pathlib.Path]) -> str: """Return a textual timestamp from the modification date of a file Args: file_path: Path to file. Returns: String representing the modification date of the file. """ file_path = pathlib.Path(file_path) try: mod_time = file_path.stat().st_mtime except FileNotFoundError: return "File does not exist" return datetime.fromtimestamp(mod_time).isoformat() def get_md5(file_path: Union[str, pathlib.Path]) -> str: """Return a md5 checksum based on a file. Args: file_path: Path to file. Returns: Hex-string representing the contents of the file. 
""" md5 = hashlib.md5() block_size = 128 * md5.block_size # Chunk file to avoid memory problems try: with open(file_path, mode="rb") as fid: for chunk in iter(lambda: fid.read(block_size), b""): md5.update(chunk) return md5.hexdigest() except FileNotFoundError: return "File does not exist" PKH5N % % midgard/files/files.py"""Utilities for working with files """ # Standard library imports import builtins from contextlib import contextmanager import gzip import pathlib from typing import Any, Iterator, Optional, Union @contextmanager def open( file_path: Union[str, pathlib.Path], create_dirs: bool = False, open_as_gzip: Optional[bool] = None, **open_args: Any ) -> Iterator: """Open a file. Can automatically create the necessary directories before writing to a file, as well as handle gzipped files. With `open_as_gzip` set to None (default), it will try to detect whether the path is a .gz file simply by looking at the path suffix. For more control, you can set the parameter to True or False explicitly. Args: file_path: String or pathlib.Path representing the full file path. create_dirs: True or False, if True missing directories are created. open_as_gzip: Use gzip library to open file. open_args: All keyword arguments are passed on to the built-in open. Returns: File object representing the file. """ file_path = pathlib.Path(file_path).resolve() if create_dirs: file_path.parent.mkdir(parents=True, exist_ok=True) # Simple detection of .gz-files if open_as_gzip is None: open_as_gzip = file_path.suffix == ".gz" open_func = gzip.open if open_as_gzip else builtins.open try: with open_func(file_path, **open_args) as fid: yield fid except Exception: raise def move(from_path: Union[str, pathlib.Path], to_path: Union[str, pathlib.Path], overwrite: bool = True) -> None: """Move a file to another path With overwrite set to True, to_path may already exist and will be overwritten without warning. Setting overwrite to False will raise a FileExistsError if to_path already exists. Args: from_path: Path of file to be moved. to_path: Path file will be moved to. overwrite: If True, to_path may already exist. If False, to_path will never be overwritten. """ from_path = pathlib.Path(from_path) to_path = pathlib.Path(to_path) # Create directories if necessary to_path.parent.mkdir(parents=True, exist_ok=True) # Be careful not to overwrite destination if overwrite is False if overwrite: # No worries from_path.replace(to_path) else: # Be careful, see https://realpython.com/python-pathlib/#moving-and-deleting-files with to_path.open(mode="xb") as fid: fid.write(from_path.read_bytes()) from_path.unlink() PKH5NQ{hhmidgard/files/url.py"""Midgard library module, defining a URL class that mirrors Pathlib.Path Warning: There are many intricacies of URLs that are not handled by this class at the moment. """ # Standard library imports import pathlib from typing import Union # Third party imports import pycurl class URL(str): """Simple wrapper around String to have URLs work similar to pathlib.Path """ @property def name(self) -> str: """Part of URL after the last /""" return self.split("/")[-1] def with_name(self, name: str) -> "URL": """Replace part of URL after the last / with a new name Args: name: New name. Return: URL with part after the last / replaced with the new name. 
""" base_url, slash, _ = self.rpartition("/") return self.__class__(f"{base_url}{slash}{name}") def exists(self) -> bool: """Check whether the given URL returns a valid document Try to download the first byte of the document (avoid downloading a big file if it exists). Warning: Because of network latency, this will be a slow operation. Return: True if URL leads to a valid document, False otherwise. """ c = pycurl.Curl() c.setopt(c.URL, self) c.setopt(c.RANGE, "0-0") try: c.perform() except pycurl.error: return False return True def download_to(self, path: Union[str, pathlib.Path]) -> None: """Download the URL to path Args: path: Path to save the URL. """ raise NotImplementedError PKC Tuple[np.ndarray, ...]: """Compute DOP (dilution of precision) Args: az: Satellite azimuth angle (radians) el: Satellite elevation angle (radians) el_mask: Elevation cut-off angle (radians) min_num_sats: Minimum number of satellites for a valid solution Returns: Tuple with GDOP, PDOP, HDOP and VDOP """ gdop = np.array([0.0]) pdop = np.array([0.0]) hdop = np.array([0.0]) vdop = np.array([0.0]) # Make sure that we have enough valid satellites n_valid_sats = np.sum(el >= el_mask) if n_valid_sats < min_num_sats: log.warn( f"Not enough valid observation={n_valid_sats:02d}, at least {min_num_sats:02d} observations are needed ...!" ) return gdop, pdop, hdop, vdop # Construct the design matrix H based on observed & valid satellites # NB: treat H as a vector # # | cos(e1)sin(a1) cos(e1)cos(a1) sin(e1) 1 | # | cos(e2)sin(a2) cos(e2)cos(a2) sin(e2) 1 | # | cos(e3)sin(a3) cos(e3)cos(a3) sin(a3) 1 | # H = | cos(e4)sin(a4) cos(e4)cos(a4) sin(e4) 1 | # | .. .. .. .. | # | cos(e_n)sin(a_n) cos(e_n)cos(a_n) sin(a_n) 1 | H = np.stack((np.cos(el) * np.sin(az), np.cos(el) * np.cos(az), np.sin(el), np.ones(el.shape)), axis=1) Q = H.T @ H # H^t*H # User info log.debug("Q=H^t*H::") log.debug(Q) # Check if the inverse of Q exists by computing the conditional number OR computation of the detereminant if not np.isfinite(np.linalg.cond(Q)): log.warn("error computing the inverse of the co-factor matrix Q") else: Q = np.linalg.inv(Q) # (H*H^t)^{-1} gdop = np.sqrt(np.trace(Q)) # GDOP pdop = np.sqrt(np.trace(Q[0:3])) # PDOP hdop = np.sqrt(np.trace(Q[0:2])) # HDOP vdop = np.sqrt(np.trace(Q[0:1])) # VDOP return gdop, pdop, hdop, vdop def comp_quality_indicators(sol_vc_mat: np.ndarray) -> tuple: """Compute quality indicators Following quality indicators are computed 1. compute the standard error ellipse(SEE) 2. compute the distance root mean squared (DRMS) 3. compute the circular error probable (CEP) Args: sol_vc_mat: Variance-covariance matrix of the unknown Returns: Tuple with DRMS, CEP and SEE """ # 1. Compute the standard error ellipse(SEE). Requires to compute # lam_min, lam_max and the orientation angle (alpha) eig_val, eig_vec = np.linalg.eig(sol_vc_mat) lambda_max = np.max(np.diag(eig_val)) idx_a = np.argmax(eig_val) a = np.sqrt(lam_max) lambda_min = np.min(np.diag(eig_val)) idx_b = np.argmin(eig_val) b = np.sqrt(lam_min) # Direction of the largest vector alpha = arctan2(eig_val[1, idx_a], eig_val[0, idx_b]) * 180 / np.pi see = [lambda_max, lambda_min, alpha] # 2. Compute the distance root mean squred (DRMS) drms = np.sqrt(sol_vc_mat[0, 0] + sol_vc_mat[1, 1]) # 3. 
Compute the circular error probable (CEP) std_of_unknown = np.sqrt(np.diag(sol_vc_mat)) std_ratio = std_of_unknown[0] / std_of_unknown[1] if 0.2 <= std_ratio <= 1.0: cep = 0.589 * (std_of_unknown[0] + std_of_unknown[1]) else: cep = 0.0 return drms, cep, see def sol_validation(residuals: np.ndarray, alpha_siglev: float, n_params: int = 4) -> bool: """Validate the GNSS solution using a Chi-square test The Chi-square test is used for outlier detection and rejection. Args: residuals: Postfit residuals for all satellites in each epoch alpha_siglev: Alpha significance level n_params: Number of parameters (states), normally 4 parameters for station coordinates and receiver clock Returns: True if the Chi-square test is passed, False otherwise. """ # Regular checks num_obs = len(residuals) df = num_obs - n_params - 1 if df < 0: log.warn(f"sol_validation():: degrees of freedom < 0 (df = {df}) --> TEST NOT PASSED") return False # Chi-square validation of residuals vv = np.dot(residuals, residuals) # sum (v(i) * v(i)) chi_sqr = stats.chi2.ppf(1 - alpha_siglev, df=df) if vv > chi_sqr: log.debug( f"sol_validation():: number of valid obs={num_obs:03} vv={vv:.2f} chi-square value={chi_sqr:.2f} --> TEST NOT PASSED" ) return False else: log.debug( f"sol_validation():: number of valid obs={num_obs:02} vv={vv:.2f} < chi-square value={chi_sqr:.2f} --> TEST PASSED for alpha significance level= {(1.0-alpha_siglev)*100:.2f} %" ) return True def main(): """Main program for testing the solution validation implementation #TODO: This should be done via midgard/tests/gnss !!! """ log.init(log_level="info") # Upper bound for GDOP max_gdops = 30.0 # Read command line arguments (get_my_parser() was defined in a part of this file lost in the archive) parser = get_my_parser() results = parser.parse_args() no_print = lambda _: None # User info/test log.info(f" number of arguments ={len(sys.argv):d}") log.info(f" program name ={sys.argv}") # Testing the implemented function if len(sys.argv) == 1: i_res_cnt = 9 n_val_sats = i_res_cnt alpha_siglev = 0.01 n_params = 5 my_residuals = np.random.normal(0.0, 1, size=i_res_cnt) az, el = np.random.normal(60, 30, size=(2 * n_val_sats)).reshape(-1, 2).T my_result = sol_validation(my_residuals, alpha_siglev, n_params) dops_vals = compute_dops(az, el) if dops_vals[0] <= 0.0 or dops_vals[0] > max_gdops: log.warn( f"sol_validation():: compute_dops(): not valid solution, number of valid sats={n_val_sats:02d} and GDOP={dops_vals[0]:.2f}" ) log.info(f" DOPS results::") log.info(f" compute_dops(): GDOP={dops_vals[0]:.2f}") log.info(f" compute_dops(): PDOP={dops_vals[1]:.2f}") log.info(f" compute_dops(): HDOP={dops_vals[2]:.2f}") log.info(f" compute_dops(): VDOP={dops_vals[3]:.2f}") if __name__ == "__main__": main() # NOTE: The original header of the following ionosphere module (file name, module doc-string, imports and the signature of the klobuchar function) is garbled in this archive. The reconstruction below is an assumption based on how the function is called in main() further down and on the parameters used in the body; in particular the function signature, the default L1 frequency and the first input check are assumptions. import numpy as np from midgard.math.constant import constant def klobuchar(time, ion_coeffs, rec_pos, az, el, freq_l1=1.57542e9, freq=None, logger=print): """Compute the ionospheric time delay and its variance with the Klobuchar model (doc-string reconstructed)""" if rec_pos[2] < -1e3 or el <= 0.0: # assumed input check, matching the original error message below raise ValueError( f"klobuchar_model():: Invalid input parameters --> " f"site height={rec_pos[2]:.2f}, elevation={el:.2f} [radians]" ) if np.linalg.norm(ion_coeffs, ord=8) <= 0.0: raise ValueError( "klobuchar_model():: Invalid input parameters --> " "missing ionosphere model parameters (a0, a1, a2, a3, b0, b1, b2, b3) .." ) # ==================================================== # # 1. calculate the Earth centered angle (semi-circle) # # ==================================================== # psi = 0.0137 / (el / np.pi + 0.11) - 0.022 # ==================================================== # # 2.
sub-ionospheric latitude/longitude (semi-circle) # # ==================================================== # phi = (rec_pos[0] / np.pi) + psi * np.cos(az) # +TODO: old # phi = 0.416 if phi > 0.416 else -0.416 # -TODO: old if phi > 0.416: phi = 0.416 elif phi < -0.416: phi = -0.416 phi_ = phi # ==================================================== # # 3. compute the sub-ionospheric longitude # # ==================================================== # lam = rec_pos[1] / np.pi + psi * np.sin(az) / np.cos(phi * np.pi) # ==================================================== # # 4. compute geomagnetic latitude (semi-circle) # # ==================================================== # phi += 0.064 * np.cos((lam - 1.617) * np.pi) # ==================================================== # # 5. find the local time (s) # # ==================================================== # # +TODO: old # # tt = 43200.0*lam + time2gpst(t, week); # tt = t # tt -= np.floor(tt / 86400.0) * 86400.0 # 0<=tt<86400 # -TODO: old tt = 43200.0 * lam + time tt -= np.floor(tt / 86400.0) * 86400.0 # Seconds of day (0<=tt<86400) # TODO: Do we need that? if tt > 86400.0: tt -= 86400.0 elif tt < 0.0: tt += 86400.0 # ==================================================== # # 6. compute the slant factor # # ==================================================== # f = 1.0 + 16.0 * (0.53 - el / np.pi) ** 3 # elevation angle shall be in cycle # ==================================================== # # 7. compute the L1 ionospheric time delay # # ==================================================== # amp = ion_coeffs[0] + phi * (ion_coeffs[1] + phi * (ion_coeffs[2] + phi * ion_coeffs[3])) # compute the amplitude per = ion_coeffs[4] + phi * (ion_coeffs[5] + phi * (ion_coeffs[6] + phi * ion_coeffs[7])) # compute the periode amp = 0.0 if amp < 0.0 else amp per = 72000.0 if per < 72000.0 else per x = 2.0 * np.pi * (tt - 50400.0) / per L1_delay = ( constant.c * f * (5e-9 + amp * (1.0 + x * x * (-0.5 + x * x / 24.0))) if (np.fabs(x) < 1.57) else constant.c * f * 5e-9 ) # ==================================================== # # 8. Ionospheric time delay for given frequency # # ==================================================== # # The ionospheric delay for another frequency than GPS L1 or BeiDou B1 can be determined via Eq. 5.5 in Sanz # Subirana et al. (2013). if freq is None: iono_delay = L1_delay else: iono_delay = (freq_l1 / freq) ** 2 * L1_delay # ========================================================= # # define ERR_BRDCI 0.5: broadcast iono model error factor # # ========================================================= # # TODO: How to determine variance for other GNSS frequencies? 
L1_variance = (L1_delay * 0.5) ** 2 # debugging info logger(" =================================== OUTPUT ============================================") logger(f"\t[1] Earth-centered angle = {psi:11.5f} [semicircles]") logger(f"\t[2] sub-ionospheric latitude = {phi_:11.5f} [semicircles]") logger(f"\t[3] sub-ionospheric longitude = {lam:11.5f} [semicircles]") logger(f"\t[4] geomagnetic latitude = {phi:11.5f} [semicircles]") logger(f"\t[5] local time = {tt:11.5f} [seconds]") logger(f"\t[6] slant factor = {f:11.5f} ") logger(f"\t[7] amplitude = {amp:11.5e}") logger(f"\t[8] period = {per:11.5f}") logger( f"\t[9] ionosphere delay for frequency {(freq_l1 if freq is None else freq) * 1e-6:11.2f} MHz and the corresponding variance are: " f"{iono_delay:.5f} (m) and {L1_variance:.5f} (m^2)" ) logger(" ================================================================================================") return iono_delay, L1_variance def main(): # ================================================ # # these values are copied from the Klobuchar test # # ================================================ # tt = 50700.0 ion_coeffs = np.array([3.82e-8, 1.49e-8, -1.79e-7, 0, 1.43e5, 0.0, -3.28e5, 1.13e5]) rec_pos = np.array([40.0 / 180.0, -100.0 / 180.0, 170]) az = 240.0 / 180 el = 20.0 / 180 delay, variance = klobuchar(tt, ion_coeffs, rec_pos, az, el) # user info print(f" Ionospheric path delay on L1= {delay:.5f} [m] and the corresponding variance={variance:.5f} [m^2]") if __name__ == "__main__": main() PKv|Lmidgard/math/__init__.pyPKƀ>> from midgard.math.constant import constant >>> print(f"The speed of light is {constant.c:0.2f}") The speed of light is 299792458.00 Todo: ----- Rewrite as a class instead of a module, to have somewhat cleaner code (and be more consistent with things like lib.unit). """ # Standard library imports from contextlib import contextmanager from typing import List try: import importlib.resources as importlib_resources # Python >= 3.7 except ImportError: import importlib_resources # Python <= 3.6: pip install importlib_resources # Midgard imports from midgard.config.config import Configuration from midgard.dev import exceptions class Constant: _constants = Configuration("constant") def __init__(self) -> None: # Read constants from file package, _, name = __name__.rpartition(".") with importlib_resources.path(package, f"{name}.txt") as path: self.update_from_file(path) # Set default source self._source = "default" def update_from_file(self, file_path): """Update list of constants from file""" self._constants.update_from_file(file_path, case_sensitive=True) @property def source(self): return self._source def get(self, constant: str, source: str = None) -> float: """Get the value of one constant Note that if you need the default value of the constant (from the default source) it is typically better to read it as a property. That is, `constant.c` is preferred to `constant.get('c')`. Args: constant: Name of the constant. source: Source from which the constant is defined. Returns: Value of constant.
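Example: >>> constant.get("GM", source="de430") 398600435436000.0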
""" source = self.source if source is None else source try: return self._constants[constant][source].float except exceptions.MissingSectionError: raise exceptions.UnknownConstantError( f"Constant {constant!r} is not defined in {', '.join(self._constants.sources)}" ) from None except exceptions.MissingEntryError: raise exceptions.UnknownConstantError( f"Constant {constant!r} is not defined by source {source!r} in {', '.join(self._constants.sources)}" ) from None def unit(self, constant: str) -> str: """Unit of constant""" try: return self._constants[constant].__unit__.str except exceptions.MissingSectionError: raise exceptions.UnknownConstantError( f"Constant {constant!r} is not defined in {', '.join(self._constants.sources)}" ) from None except exceptions.MissingEntryError: raise exceptions.UnitError( f"Constant {constant!r} has not defined a unit in {', '.join(self._constants.sources)}" ) from None @contextmanager def use_source(self, source: str) -> None: """Context manager for handling different sources Example: >>> constant.GM 398600441800000.0 >>> with constant.use_source("de430"): ... print(constant.GM) ... 398600435436000.0 Args: source: Name of source of constant. """ previous_source = self.source self._source = source try: yield None finally: self._source = previous_source def __getattr__(self, constant: str) -> float: """Get constant as an attribute with dot-notation""" if constant in self._constants.section_names: try: return self._constants[constant][self.source].float except exceptions.MissingEntryError: raise exceptions.UnknownConstantError( f"Constant {constant!r} is not defined by source {self.source!r} in " f"{', '.join(self._constants.sources)}" ) from None # Raise error for unknown attributes else: raise AttributeError( f"Constant {constant!r} is not defined in {', '.join(self._constants.sources)}" ) from None def __dir__(self) -> List[str]: """List all constants""" return super().__dir__() + list(self._constants.section_names) def __repr__(self): return f"{type(self).__name__}({', '.join(self._constants.sources)!r})" constant = Constant() PKR8NmbAAmidgard/math/constant.txt# This file defines the MIDGARD constants. It is read and handled by the midgard.lib.constant-module. # # It is possible to use different values for the same constant. Unfortunately this is sometimes necessary as some # models are only consistent with given values for the constant. # # References: # [1] Petit, G. and Luzum, B. (eds.), IERS Conventions (2010), # IERS Technical Note No. 36, BKG (2010). # http://www.iers.org/IERS/EN/Publications/TechnicalNotes/tn36.html # # [2] EGM2008 Global Gravitational Model # http://earth-info.nga.mil/GandG/wgs84/gravitymod/egm2008/README_FIRST.pdf # # [3] Folkner et al. (2014), The Planetary and Lunar Ephemerides DE430 and DE431. # https://naif.jpl.nasa.gov/pub/naif/generic_kernels/spk/planets/de430_and_de431.pdf # # [4] IS-GPS-200H (2013), Interface specification IS-GPS-200, Navstar GPS Space # Segment/Navigation User Interfaces, Global Positioning Systems Directorate # Systems Engineering & Integration # # [5] DE421 header file. GM values are given in AU^3 / day^2 # ftp://ssd.jpl.nasa.gov/pub/eph/planets/ascii/de421/header.421 # # [6] Williams et al. (2008), DE421 Lunar Orbit, Physical Librations, and Surface Coordinates. 
# naif.jpl.nasa.gov/pub/naif/generic_kernels/spk/planets/a_old_versions/de421_lunar_ephemeris_and_orientation.pdf # # [7] OS SIS ICD (2015), European GNSS (Galileo) Open Service Signal In Space Interface Control Document, # European Union, Issue 1.2 # # [8] BDS-SIS-ICD-2.0 (2013), BeiDou Navigation Satellite System Signal in Space Interface Control Document - Open # Service Signal (Version 2.0), China Satellite Navigation Office # # [9] IS-QZSS-PNT-001 (2017), Quasi-Zenith Satellite System - Interface specification: Satellite Positioning, Navigation # and Timing Service, Japan Aerospace Exploration Agency # # [10] IRNSS-ICD-SPS (2014), Indian Regional Navigation Satellite System - Signal in space ICD for standard positioning # service, ISRO Satellite Centre, Indian Space Research Organization, Bangalore, Version 1.0 # # [11] Gurtner, W. and Estey, L. (2015). Rinex: The receiver independent exchange format version 3.03. # # [12] Montenbruck, O., Steigenberger, P., and Hauschild, A. (2018). Multi-GNSS Signal-in-Space Range Error Assessment - # Methodology and Results. Advances in Space Research, DOI 10.1016/j.asr.2018.03.041. ## \f$ a_E \f$: Equatorial radius of the Earth. # # Scaling parameter in the spherical harmonic function describing the # gravity field of the Earth. # # Unit: Meters, \f$ m \f$. # # References: IERS Conventions [1], table 1.1, # EGM2008 Global Gravitational Model [2]. [a] __unit__ = meter default = %(iers_2010)s iers_2010 = 6378136.6 egm_2008 = 6378136.3 ## \f$ R_{sun} \f$: Radius of the Sun ## # # Unit: Meters, \f$ m \f$. # # References: https://en.wikipedia.org/wiki/Solar_radius # [R_sun] __unit__ = meter default = %(web)s web = 6.955e8 ## \f$ c \f$: Speed of light. # # Unit: Meters per second, \f$ m / s \f$. # # References: IERS Conventions [1], table 1.1, # http://en.wikipedia.org/wiki/Speed_of_light [c] __unit__ = meter per second default = %(iers_2010)s iers_2010 = 299792458.0 ## \f$ G \f$: Gravitational constant # # Unit: \f$ m^3 / kg / s^2 \f$. # # References: IERS Conventions [1], table 1.1, [G] __unit__ = meter**3 / (kilogram*second**2) default = %(iers_2010)s iers_2010 = 6.67428e-11 ## \f$ L_G \f$: Conversion factor between TT and TCG time or length, x. # As a result, x can also be GM with M as mass. # All values are relative to the SI second, defined by the involved # atomic clocks, and the SI meter, defined by the definition of c. # The use of the TT # spacetime system implies that the space and time coordinates of the # reference system are scaled wrt the spacetime coordinates of a # commonly used form of the Einstein field equations. The reason for the # scaling is that TAI-time_{TT} should be only periodic (approximately # constant). If (GM)_{TT} is estimated based on observations, what would # the estimated GM be if we do not scale the spacetime, (GM)_{TCG}: # # # Applied as \f$ x_{TT} = (1-L_G) x_{TCG} \f$. # # Unit: none. # # References: IERS Conventions [1], table 1.1. [L_G] __unit__ = default = %(iers_2010)s iers_2010 = 6.969290134e-10 ## \f$ L_B \f$: Conversion factor between TDB and TCB time or length, x. # As a result, x can also be GM with M as mass. # All values are relative to the SI second, defined by the involved # atomic clocks, and the SI meter, defined by the definition of c. # The use of the TDB # spacetime system implies that the space and time coordinates of the # reference system are scaled wrt the spacetime coordinates of a # commonly used form of the Einstein field equations.
The reason for the # scaling is that TAI-time_{TDB} should be only periodic. # If (GM)_{TDB} is estimated based on observations, what would # the estimated GM be if we do not scale the spacetime, (GM)_{TCB}: # # Applied as \f$ x_{TDB} = (1-L_B) x_{TCB} \f$. # # Unit: none. # # References: IERS Conventions [1], table 1.1. [L_B] __unit__ = default = %(iers_2010)s iers_2010 = 1.550519768e-8 ## \f$ L_C \f$: Conversion factor between TDT and TDB constants, x, like GM. # The relation between TDT time and TDB time is much more complicated than # a scaling. The connection below is made on the basis that constants in TCG have # equal value as constants in TCB. L_C = L_B - L_G, see above. # # Applied as \f$ x_{TDT} = (1+L_C) x_{TDB} \f$. # # Unit: none. # # References: IERS Conventions [1], table 1.1. [L_C] __unit__ = default = %(iers_2010)s iers_2010 = 1.48082686741e-8 ## \f$ T_0 \f$: Epoch used in the transformation between TT and TCG and between # TDB and TCB. See equations (10.1) and (10.3) in IERS Conventions [1]. # # Unit: none. # # References: IERS Conventions [1], chapter 10.1 [T_0] __unit__ = default = %(iers_2010)s iers_2010 = 2443144.5003725 ## \f$ TDB_0 \f$: Defining constant in transformation between TCB and TDB. # # Unit: seconds # # References: IERS Conventions [1]. Table 1.1 and chapter 10.1 [TDB_0] __unit__ = second default = %(iers_2010)s iers_2010 = -6.55e-5 ## \f$ GM_\oplus \f$: Gravitational constant times the mass of Earth. # # Unit: \f$ m^3 / s^2 \f$. # # References: # cgcs2000 - BDS-SIS-ICD-2.0 [8], table 5.11 # de430 - DE430 [3], table 8, page 49 [tdb]. # de421 - DE421 [6], table 1, page 6 [tdb]. # egm_2008 - EGM2008 Global Gravitational Model [2]. # gtrf - Galileo OS-SIS-ICD [7], table 58 # iers_2010 - IERS Conventions [1], table 1.1.; Galileo OS-SIS-ICD [7], table 58 # jgs - QZSS IS-QZSS-PNT-001 [9], section 5.3.3 # wgs84 - IS-GPS-200H [4], table 20-IV. [GM] __unit__ = meter**3 / second **2 default = %(iers_2010)s cgcs2000 = 3.986004418e14 de430 = 3.98600435436e14 de421 = 3.986004362e14 egm_2008 = 3.986004415e14 gtrf = 3.986004418e14 iers_2010 = 3.986004418e14 jgs = 3.986005e14 wgs84 = 3.986005e14 ## \f$ GM_\odot \f$: Heliocentric gravitational constant. # # Unit: \f$ m^3 / s^2 \f$. # # References: # iers_2010 - IERS Conventions [1], table 1.1. # de430 - DE430 [3], table 8, page 49 [tdb]. # de421 - DE421 [5], constant GMS [tdb]. [GM_sun] __unit__ = meter**3 / second **2 default = %(iers_2010)s iers_2010 = 1.32712442099e20 de430 = 1.327124400419394e20 de421 = 1.327124400409446e20 ## \f$ GM_\text{moon} \f$: Lunacentric gravitational constant. # # Unit: \f$ m^3 / s^2 \f$. # # References: # de430 - DE430 [3], table 8, page 49 [tdb]. # de421 - DE421 [6], table 2, page 7 [tdb]. [GM_moon] __unit__ = meter**3 / second **2 default = %(de430)s de430 = 4.902800066e12 de421 = 4.90280008e12 ## \f$ GM_\text{mercury} \f$: Mercury gravitational constant. # # Unit: \f$ m^3 / s^2 \f$. # # References: # de430 - DE430 [3], table 8, page 49 [tdb]. # de421 - DE421 [5], constant GM1 [tdb]. [GM_mercury] __unit__ = meter**3 / second **2 default = %(de430)s de430 = 2.203178e13 de421 = 2.203209e13 ## \f$ GM_\text{venus} \f$: Venus gravitational constant. # # Unit: \f$ m^3 / s^2 \f$. # # References: # de430 - DE430 [3], table 8, page 49 [tdb]. # de421 - DE421 [5], constant GM2 [tdb]. [GM_venus] __unit__ = meter**3 / second **2 default = %(de430)s de430 = 3.24858592e14 de421 = 3.24858592e14 ## \f$ GM_\text{mars} \f$: Mars gravitational constant. # # Unit: \f$ m^3 / s^2 \f$.
# # References: # de430 - DE430 [3], table 8, page 49 [tdb]. # de421 - DE421 [5], constant GM4 [tdb]. [GM_mars] __unit__ = meter**3 / second **2 default = %(de430)s de430 = 4.2828375214e13 de421 = 4.2828375214e13 ## \f$ GM_\text{jupiter} \f$: Jupiter gravitational constant. # # Unit: \f$ m^3 / s^2 \f$. # # References: # de430 - DE430 [3], table 8, page 49 [tdb]. # de421 - DE421 [5], constant GM5 [tdb]. [GM_jupiter] __unit__ = meter**3 / second **2 default = %(de430)s de430 = 1.267127648e17 de421 = 1.267127648e17 ## \f$ GM_\text{saturn} \f$: Saturn gravitational constant. # # Unit: \f$ m^3 / s^2 \f$. # # References: # de430 - DE430 [3], table 8, page 49 [tdb]. # de421 - DE421 [5], constant GM6 [tdb]. [GM_saturn] __unit__ = meter**3 / second **2 default = %(de430)s de430 = 3.79405852e16 de421 = 3.79405852e16 ## \f$ GM_\text{uranus} \f$: Uranus gravitational constant. # # Unit: \f$ m^3 / s^2 \f$. # # References: # de430 - DE430 [3], table 8, page 49 [tdb]. # de421 - DE421 [5], constant GM7 [tdb]. [GM_uranus] __unit__ = meter**3 / second **2 default = %(de430)s de430 = 5.7945486e15 de421 = 5.7945486e15 ## \f$ GM_\text{neptune} \f$: Neptune gravitational constant. # # Unit: \f$ m^3 / s^2 \f$. # # References: # de430 - DE430 [3], table 8, page 49 [tdb]. # de421 - DE421 [5], constant GM8 [tdb]. [GM_neptune] __unit__ = meter**3 / second **2 default = %(de430)s de430 = 6.83652710058e15 de421 = 6.836535e15 ## \f$ GM_\text{pluto} \f$: Pluto gravitational constant. # # Unit: \f$ m^3 / s^2 \f$. # # References: # de430 - DE430 [3], table 8, page 49 [tdb]. # de421 - DE421 [5], constant GM9 [tdb]. [GM_pluto] __unit__ = meter**3 / second **2 default = %(de430)s de430 = 9.77e11 de421 = 9.77e11 ## Astronomical unit. # # The astronomical unit roughly represents the distance from the Earth to the Sun. It has been defined as exactly 149 # 597 870 700 meters since 2012. # # Unit: meters # # References: # iers_2010 - IERS Conventions [1]: in TDB system # web - https://en.wikipedia.org/wiki/Astronomical_unit # de430 - DE430 [3], table 8, page 49 [tdb]. # de421 - DE421 [5], constant AU. [AU] __unit__ = meter default = %(iers_2010)s iers_2010 = 149597870700 web = 149597870700 de430 = 149597870700 de421 = 149597870699.6262 ## Solar constant. # # Unit: watts per square meter (W/m^2) # # Reference: https://en.wikipedia.org/wiki/Solar_constant # # Montenbruck, Oliver and Gill, Eberhard, # Satellite Orbits, Springer Verlag, 2000. # [S] __unit__ = watt / meter**2 default = %(book)s web = 1361 book = 1367 ## \f$ d\theta / dt \f$: Rate of advance of ERA. # # Unit: \f$ \mathrm{rev} / \mathrm{UT1day} \f$ # # Reference: IERS Conventions [1], table 1.1 [era_dot] __unit__ = revolution per day default = %(iers_2010)s iers_2010 = 1.00273781191135448 ## \f$ \omega \f$: Nominal mean Earth's angular velocity. # # Unit: \f$ \mathrm{rads}^{-1} \f$ # # Reference: cgcs2000 - BeiDou BDS-SIS-ICD-2.0 [8], table 5.11 # gtrf - Galileo OS-SIS-ICD [7], table 58 # iers_2010 - IERS Conventions [1], table 1.2 # jgs - QZSS IS-QZSS-PNT-001 [9], section 5.3.3 # wgs84 - IS-GPS-200H [4], table 20-IV. [omega] __unit__ = radians per second default = %(iers_2010)s cgcs2000 = 7.292115e-5 gtrf = 7.2921151467e-5 iers_2010 = 7.292115e-5 jgs = 7.2921151467e-5 wgs84 = 7.2921151467e-5 ## \f$ \rho_w \f$: Density of sea water # # Unit: \f$ kg / m^3 \f$. # # References: IERS Conventions [1], section 7.1.5 [rho_w] __unit__ = kilogram per meter ** 3 default = %(iers_2010)s iers_2010 = 1025 ## \f$ g_E \f$: Mean equatorial gravity # # Unit: \f$ m / s^2 \f$. 
# # References: IERS Conventions [1], table 1.1 [g_E] __unit__ = meter per second ** 2 default = %(iers_2010)s iers_2010 = 9.7803278 ## \f$ f_1, \ldots, f_n \f$: GNSS frequencies # # Unit: \f$ s^{-1} \f$. # # References: RINEX 3.03 format [11], section 5.1 # # Overview over frequency number (1 .. 9) and observation code dependent on GNSS (C, E, G, I and J): # ______________________________ # freq | C E G I J # ______|_______________________ # 1 | E1 L1 L1 # 2 | B1 L2 L2 # 5 | E5a L5 L5 L5 # 6 | B3 E6 # 7 | B2 E5b # 8 | E5 # 9 | S # ______|_______________________ # [gnss_freq_1] __unit__ = hertz default = %(G)s E = 1575.42e+6 G = 1575.42e+6 J = 1575.42e+6 [gnss_freq_2] __unit__ = hertz default = %(G)s C = 1561.098e+6 G = 1227.60e+6 J = 1227.60e+6 [gnss_freq_5] __unit__ = hertz default = %(G)s E = 1176.45e+6 G = 1176.45e+6 I = 1176.45e+6 J = 1176.45e+6 [gnss_freq_6] __unit__ = hertz default = %(E)s C = 1268.52e+6 E = 1278.75e+6 J = 1278.75e+6 [gnss_freq_7] __unit__ = hertz default = %(E)s C = 1207.14e+6 E = 1207.14e+6 [gnss_freq_8] __unit__ = hertz default = %(E)s E = 1191.795e+6 [gnss_freq_9] __unit__ = hertz default = %(I)s I = 2492.028e+6 #TODO: What is a better solution for defining GNSS frequencies (above or under)? [gnss_freq_C] __unit__ = hertz default = %(B1)s B1 = 1561.098e+6 B2 = 1207.14e+6 B3 = 1268.52e+6 [gnss_freq_E] __unit__ = hertz default = %(E1)s E1 = 1575.42e+6 E5 = 1191.795e+6 E5a = 1176.45e+6 E5b = 1207.140e+6 E6 = 1278.75e+6 [gnss_freq_G] __unit__ = hertz default = %(L1)s L1 = 1575.42e+6 L2 = 1227.60e+6 L5 = 1176.45e+6 [gnss_freq_I] __unit__ = hertz default = %(L5)s L5 = 1176.45e+6 S = 2492.028e+6 [gnss_freq_J] __unit__ = hertz default = %(L1)s L1 = 1575.42e+6 L2 = 1227.60e+6 L5 = 1176.45e+6 [gnss_freq_S] __unit__ = hertz default = %(L1)s L1 = 1575.42e+6 L5 = 1176.45e+6 ## \f$ w_{R} \f$: Global average signal-in-space ranging error (SISRE) radial weight factor # # Note: The radial SISRE weight factors are given for an elevation of 0 and 5 degrees, except for QZSS (J) with # 0 degree (average of QZSS perigee and apogee). # # Unit: None. # # References: # Montenbruck 2018 [12], table 4 [sisre_weight_radial_0deg] __unit__ = default = %(G)s C = 0.981 E = 0.984 G = 0.979 J = 0.991 R = 0.977 [sisre_weight_along_cross_0deg] __unit__ = default = %(G)s C = 0.136 E = 0.128 G = 0.143 J = 0.096 R = 0.149 ## \f$ w_{A, C} \f$: Global average signal-in-space ranging error (SISRE) along-/cross-track weight factor # # Note: The along-/cross-track SISRE weight factors are given for an elevation of 0 and 5 degrees, except for QZSS (J) with # 0 degree (average of QZSS perigee and apogee). # # Unit: None.
# # References: # Montenbruck 2018 [12], table 4 [sisre_weight_radial_5deg] __unit__ = default = %(G)s C = 0.982 E = 0.984 G = 0.980 J = 0.991 R = 0.979 [sisre_weight_along_cross_5deg] __unit__ = default = %(G)s C = 0.132 E = 0.124 G = 0.139 J = 0.096 R = 0.145 PKH5N4w`midgard/math/ellipsoid.py"""Midgard library module for handling Earth ellipsoids Description: ------------ """ # Standard library imports from dataclasses import dataclass # pip install dataclasses on Python 3.6 import math from typing import Dict # Third party imports import numpy as np # Midgard imports # from midgard.dev import cache # TODO from midgard.dev import exceptions _ELLIPSOIDS: Dict[str, "Ellipsoid"] = dict() def get(ellipsoid: str) -> "Ellipsoid": """Get an ellipsoid by name""" try: return _ELLIPSOIDS[ellipsoid] except KeyError: ellipsoids = ", ".join(sorted(_ELLIPSOIDS)) raise exceptions.UnknownSystemError(f"Ellipsoid {ellipsoid!r} unknown. Use one of {ellipsoids}") @dataclass class Ellipsoid: name: str a: float f_inv: float description: str def __post_init__(self) -> None: """Simple registration of Ellipsoids""" _ELLIPSOIDS[self.name] = self @property def b(self) -> float: """Semi-minor axis""" return self.a * (1 - self.f) @property def f(self) -> float: """Flattening""" return 1 / self.f_inv @property def e(self) -> float: """Eccentricity""" return np.sqrt(self.e2) @property def e2(self) -> float: """Eccentricity squared""" return 1 - self.b ** 2 / self.a ** 2 @property def eps(self) -> float: return self.e2 / (1 - self.e2) # # Ellipsoids, see https://en.wikipedia.org/wiki/Earth_ellipsoid # sphere = Ellipsoid("sphere", a=6371008.8, f_inv=math.inf, description="Regular sphere, mean radius") WGS72 = Ellipsoid("WGS72", a=6378135, f_inv=298.26, description="WGS72") GRS80 = Ellipsoid("GRS80", a=6378137, f_inv=298.257222101, description="Used by ITRS") WGS84 = Ellipsoid("WGS84", a=6378137, f_inv=298.257223563, description="Used by GPS") IERS2003 = Ellipsoid("IERS2003", a=6378136.6, f_inv=298.25642, description="IERS conventions 2003, p. 12") IERS2010 = Ellipsoid("IERS2010", a=6378136.6, f_inv=298.25642, description="IERS conventions 2010, p. 18") PKH5Nryg.g.midgard/math/interpolation.py"""Methods for interpolating in numpy arrays Description: ------------ Different interpolation methods are decorated with `@register_interpolator` and will then become available for use as `kind` in `interpolate` and `moving_window`. Example: -------- >>> import numpy as np >>> np.set_printoptions(precision=3, suppress=True) >>> x = np.linspace(-1, 1, 11) >>> y = x**3 - x >>> y array([ 0. , 0.288, 0.384, 0.336, 0.192, 0. , -0.192, -0.336, -0.384, -0.288, 0. ]) >>> x_new = np.linspace(-0.8, 0.8, 11) >>> interpolate(x, y, x_new, kind='cubic') array([ 0.288, 0.378, 0.369, 0.287, 0.156, -0. , -0.156, -0.287, -0.369, -0.378, -0.288]) Developer info: --------------- To add your own interpolators, you can simply decorate your interpolator functions with `@register_interpolator`. Your interpolator function should have the signature (x: np.ndarray, y: np.ndarray) -> Callable For instance, the following would implement a terrible interpolation function that sets all values to zero: from midgard.math.interpolation import register_interpolator @register_interpolator def zero(x: np.ndarray, y: np.ndarray) -> Callable: def _zero(x_new: np.ndarray) -> np.ndarray: return np.zeros(y.shape) return _zero This function would then be available as an interpolator.
For instance, one could do >>> interpolate(x, y, x_new, kind='zero') # doctest: +SKIP array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]) """ # Standard library imports from typing import Any, Callable, Dict, List # Third party imports import numpy as np import scipy.interpolate import scipy.misc # Midgard imports from midgard.dev import exceptions # Dictionary of interpolators. Populated by the @register_interpolator-decorator. _INTERPOLATORS: Dict[str, Callable] = dict() def register_interpolator(func: Callable) -> Callable: """Register an interpolation function This function should be used as a @register_interpolator-decorator Args: func: Function that will be registered as an interpolator. Returns: Same function. """ name = func.__name__ _INTERPOLATORS[name] = func return func def interpolators() -> List[str]: """Return a list of available interpolators Returns: Names of available interpolators. """ return sorted(_INTERPOLATORS) def get_interpolator(name: str) -> Callable: """Return an interpolation function Interpolation functions are registered by the @register_interpolator-decorator. The name-parameter corresponds to the function name of the interpolator. Args: name: Name of interpolator. Returns: Interpolation function with the given name. """ try: return _INTERPOLATORS[name] except KeyError: interpolator_list = ", ".join(interpolators()) raise exceptions.UnknownPluginError( f"Interpolator '{name}' is not defined. Available interpolators are {interpolator_list}." ) from None def interpolate(x: np.ndarray, y: np.ndarray, x_new: np.ndarray, *, kind: str, **ipargs: Any) -> np.ndarray: """Interpolate values from one x-array to another See `interpolators()` for a list of valid interpolators. Args: x: 1-dimensional array with original x-values. y: Array with original y-values. x_new: 1-dimensional array with new x-values. kind: Name of interpolator to use. ipargs: Keyword arguments passed on to the interpolator. Returns: Array of interpolated y-values. """ interpolator = get_interpolator(kind)(x, y, **ipargs) return interpolator(x_new) def interpolate_with_derivative( x: np.ndarray, y: np.ndarray, x_new: np.ndarray, *, kind: str, dx: float = 0.5, **ipargs: Any ) -> np.ndarray: """Interpolate values from one x-array to another as well as find derivatives See `interpolators()` for a list of valid interpolators. Args: x: 1-dimensional array with original x-values. y: Array with original y-values. x_new: 1-dimensional array with new x-values. kind: Name of interpolator to use. dx: Values at x ± dx are used to determine derivative. ipargs: Keyword arguments passed on to the interpolator. Returns: Tuple with array of interpolated y-values and array of derivatives. """ interpolator = get_interpolator(kind)(x, y, **ipargs) y_new = interpolator(x_new) y_dot = scipy.misc.derivative(interpolator, x_new, dx=dx) return y_new, y_dot @register_interpolator def lagrange( x: np.ndarray, y: np.ndarray, *, window: int = 10, bounds_error: bool = True, assume_sorted: bool = False ) -> Callable: """Computes the lagrange polynomial passing through a certain set of points See https://en.wikipedia.org/wiki/Lagrange_polynomial Uses `window` of the original points to calculate the Lagrange polynomials. The window of points is chosen by finding the closest original point and essentially picking the `window // 2` indices on either side. Args: x: 1-dimensional array with original x-values. y: Array with original y-values. window: Number of points used in interpolation.
bounds_error: If True, a ValueError is raised if extrapolation is attempted. assume_sorted: If True, x must be an array of monotonically increasing values. Returns: Lagrange interpolation function. """ # Check input if x.ndim != 1: raise ValueError(f"The x array must have exactly one dimension, currently x.ndim={x.ndim}.") if y.ndim < 1: raise ValueError(f"The y array must have at least one dimension, currently y.ndim={y.ndim}.") if len(y) != len(x): raise ValueError("x and y arrays must be equal in length along the first axis.") if window < 3: raise ValueError("The window should be at least 3") if window > len(x): raise ValueError(f"x and y arrays must have at least window={window} entries") # Sort the input according to the x-array if not assume_sorted: sort_idxs = np.argsort(x) x, y = x[sort_idxs], y[sort_idxs] # Check that x values are monotonically increasing if not all(np.diff(x) > 0): raise ValueError("expected x to be a sorted array with unique values") # Rescale x values to avoid numerical instability _xm, _xs = x.mean(), x.std() x_scaled = (x - _xm) / _xs # Indices to use during calculation of polynomial values indices = np.eye(window) == 0 def _lagrange(x_new: np.ndarray) -> np.ndarray: """Interpolate using a Lagrange polynomial""" if bounds_error and x_new.min() < x.min(): raise ValueError(f"Value {x_new.min()} in x_new is below the interpolation range {x.min()}.") if bounds_error and x_new.max() > x.max(): raise ValueError(f"Value {x_new.max()} in x_new is above the interpolation range {x.max()}.") y_new = np.zeros(x_new.shape[:1] + y.shape[1:]) x_new_scaled = (x_new - _xm) / _xs # Figure out which points to use for the interpolation start_idxs = np.abs(x[:, None] - x_new[None, :]).argmin(axis=0) - window // 2 start_idxs[start_idxs < 0] = 0 start_idxs[start_idxs > len(x) - window] = len(x) - window # Interpolate for each unique set of interpolation points for idx in np.unique(start_idxs): y_idx = start_idxs == idx x_wd, y_wd = x_scaled[idx : idx + window], y[idx : idx + window] diff_x = np.subtract(*np.meshgrid(x_wd, x_wd)) + np.eye(window) r = np.array( [ np.prod((x_new_scaled[y_idx, None] - x_wd[idxs]) / diff_x[idxs, i], axis=1) for i, idxs in enumerate(indices) ] ) y_new[y_idx] = r.T @ y_wd return y_new return _lagrange @register_interpolator def linear(x: np.ndarray, y: np.ndarray, **ipargs: Any) -> Callable: """Linear interpolation through the given points Uses the scipy.interpolate.interp1d function with kind='linear' behind the scenes. Args: x: 1-dimensional array with original x-values. y: Array with original y-values. ipargs: Keyword arguments passed on to the interp1d-interpolator. Returns: Linear interpolation function """ if y.ndim < 1: raise ValueError(f"The y array must have at least one dimension, currently y.ndim={y.ndim}.") # Interpolate along axis=0 by default ipargs.setdefault("axis", 0) return scipy.interpolate.interp1d(x, y, kind="linear", **ipargs) @register_interpolator def cubic(x: np.ndarray, y: np.ndarray, **ipargs: Any) -> Callable: """Cubic spline interpolation through the given points Uses the scipy.interpolate.interp1d function with kind='cubic' behind the scenes. Args: x: 1-dimensional array with original x-values. y: Array with original y-values. ipargs: Keyword arguments passed on to the interp1d-interpolator. 
Returns: Cubic spline interpolation function """ if y.ndim < 1: raise ValueError(f"The y array must have at least one dimension, currently y.ndim={y.ndim}.") # Interpolate along axis=0 by default ipargs.setdefault("axis", 0) return scipy.interpolate.interp1d(x, y, kind="cubic", **ipargs) @register_interpolator def interpolated_univariate_spline(x: np.ndarray, y: np.ndarray, **ipargs: Any) -> Callable: """One-dimensional interpolating spline for the given points Uses the scipy.interpolate.InterpolatedUnivariateSpline function behind the scenes. The original only deals with one-dimensional y arrays, so multiple calls are made for higher dimensional y arrays. The dimensions are handled independently of each other. Args: x: 1-dimensional array with original x-values. y: Array with original y-values. ipargs: Keyword arguments passed on to the scipy-interpolator. Returns: Interpolating spline function """ if y.ndim < 1: raise ValueError(f"The y array must have at least one dimension, currently y.ndim={y.ndim}.") if y.ndim == 1: return scipy.interpolate.InterpolatedUnivariateSpline(x, y, **ipargs) # Loop over columns in y for higher dimensions def _interpolated_univariate_spline(x_new: np.ndarray) -> np.ndarray: """Interpolate using an interpolating spline""" first_y = (slice(len(y)),) first_new = (slice(len(x_new)),) y_new = np.zeros(x_new.shape[:1] + y.shape[1:]) for last_cols in np.ndindex(y.shape[1:]): idx_new = first_new + last_cols idx_y = first_y + last_cols y_new[idx_new] = scipy.interpolate.InterpolatedUnivariateSpline(x, y[idx_y], **ipargs)(x_new) return y_new return _interpolated_univariate_spline @register_interpolator def barycentric_interpolator(x: np.ndarray, y: np.ndarray, **ipargs: Any) -> Callable: """The interpolating polynomial through the given points Uses the scipy.interpolate.BarycentricInterpolator function behind the scenes. Args: x: 1-dimensional array with original x-values. y: Array with original y-values. ipargs: Keyword arguments passed on to the scipy-interpolator. Returns: Barycentric interpolation function """ if y.ndim < 1: raise ValueError(f"The y array must have at least one dimension, currently y.ndim={y.ndim}.") return scipy.interpolate.BarycentricInterpolator(x, y, **ipargs) PKd2N~[%%midgard/math/rotation.py"""Library for basic rotation matrices Description: ------------ Creates rotation matrices for rotation around the axes of a right handed Cartesian coordinate system and their derivatives. For instance, for an XYZ-system, R1 returns a rotation matrix around the x-axis and for an ENU-system, R1 returns a rotation matrix around the east-axis. dR1 returns the derivative of the R1 matrix with respect to the rotation angle. All functions are vectorized, so that one rotation matrix is returned per input angle. Example: -------- >>> from midgard.math import rotation >>> rotation.R1([0, 1]) array([[[ 1. , 0. , 0. ], [ 0. , 1. , 0. ], [ 0. , -0. , 1. ]], [[ 1. , 0. , 0. ], [ 0. , 0.54030231, 0.84147098], [ 0. , -0.84147098, 0.54030231]]]) """ # Standard library imports from typing import TypeVar # Third party imports import numpy as np # Type specification: scalar float or numpy array np_float = TypeVar("np_float", float, np.ndarray) def R1(angle: np_float) -> np.ndarray: """Rotation matrix around the first axis Args: angle: Scalar, list or numpy array of angles in radians. Returns: Numpy array: Rotation matrix or array of rotation matrices.
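Example (a scalar angle gives a single 3x3 matrix): >>> R1(0) # doctest: +SKIP array([[ 1., 0., 0.], [ 0., 1., 0.], [ 0., -0., 1.]])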
""" zero, one = _zero(angle), _one(angle) cosA, sinA = np.cos(angle), np.sin(angle) return _roll_axes(np.array([[one, zero, zero], [zero, cosA, sinA], [zero, -sinA, cosA]])) def R2(angle: np_float) -> np.ndarray: """Rotation matrix around the second axis Args: angle: Scalar, list or numpy array of angles in radians. Returns: Numpy array: Rotation matrix or array of rotation matrices. """ zero, one = _zero(angle), _one(angle) cosA, sinA = np.cos(angle), np.sin(angle) return _roll_axes(np.array([[cosA, zero, -sinA], [zero, one, zero], [sinA, zero, cosA]])) def R3(angle: np_float) -> np.ndarray: """Rotation matrix around the third axis Args: angle: Scalar, list or numpy array of angles in radians. Returns: Numpy array: Rotation matrix or array of rotation matrices. """ zero, one = _zero(angle), _one(angle) cosA, sinA = np.cos(angle), np.sin(angle) return _roll_axes(np.array([[cosA, sinA, zero], [-sinA, cosA, zero], [zero, zero, one]])) def dR1(angle: np_float) -> np.ndarray: """Derivative of a rotation matrix around the first axis with respect to the rotation angle. Args: angle: Scalar, list or numpy array of angles in radians. Returns: Numpy array: Rotation matrix or array of rotation matrices. """ zero = _zero(angle) cosA, sinA = np.cos(angle), np.sin(angle) return _roll_axes(np.array([[zero, zero, zero], [zero, -sinA, cosA], [zero, -cosA, -sinA]])) def dR2(angle: np_float) -> np.ndarray: """Derivative of a rotation matrix around the second axis with respect to the rotation angle Args: angle: Scalar, list or numpy array of angles in radians. Returns: Numpy array: Rotation matrix or array of rotation matrices. """ zero = _zero(angle) cosA, sinA = np.cos(angle), np.sin(angle) return _roll_axes(np.array([[-sinA, zero, -cosA], [zero, zero, zero], [cosA, zero, -sinA]])) def dR3(angle: np_float) -> np.ndarray: """Derivative of a rotation matrix around the third axis with respect to the rotation angle Args: angle: Scalar, list or numpy array of angles in radians. Returns: Numpy array: Rotation matrix or array of rotation matrices. """ zero = _zero(angle) cosA, sinA = np.cos(angle), np.sin(angle) return _roll_axes(np.array([[-sinA, cosA, zero], [-cosA, -sinA, zero], [zero, zero, zero]])) def enu2trs(lat: np_float, lon: np_float) -> np.ndarray: """Rotation matrix for rotating an ENU coordinate system to an earth oriented one See for instance http://www.navipedia.net/index.php/Transformations_between_ECEF_and_ENU_coordinates This is equal to doing:: R3(-(np.pi/2 + lon)) @ R1(-(np.pi/2 - lat)) Args: lat (Float or Array): Latitude of origin of ENU coordinate system. lon (Float or Array): Longitude of origin of ENU coordinate system. Returns: Numpy array: Rotation matrix or array of rotation matrices. """ zero = _zero(lat) coslat, coslon, sinlat, sinlon = np.cos(lat), np.cos(lon), np.sin(lat), np.sin(lon) return _roll_axes( np.array( [ [-sinlon, -coslon * sinlat, coslon * coslat], [coslon, -sinlon * sinlat, sinlon * coslat], [zero, coslat, sinlat], ] ) ) def trs2enu(lat: np_float, lon: np_float) -> np.ndarray: """Rotation matrix for rotating an earth oriented coordinate system to an ENU one See for instance http://www.navipedia.net/index.php/Transformations_between_ECEF_and_ENU_coordinates This is equal to doing:: R1(np.pi/2 - lat) @ R3(np.pi/2 + lon) Args: lat (Float or Array): Latitude of origin of ENU coordinate system. lon (Float or Array): Longitude of origin of ENU coordinate system. Returns: Numpy array: Rotation matrix or array of rotation matrices. 
""" zero = _zero(lat) coslat, coslon, sinlat, sinlon = np.cos(lat), np.cos(lon), np.sin(lat), np.sin(lon) return _roll_axes( np.array( [ [-sinlon, coslon, zero], [-sinlat * coslon, -sinlat * sinlon, coslat], [coslat * coslon, coslat * sinlon, sinlat], ] ) ) def _roll_axes(mat: np.ndarray) -> np.ndarray: """Move the axes of an array of 2-d matrices properly Roll the first two dimensions to the end, so that indexing works as expected. Args: mat (numpy array): Array of 2-d matrices, can be a numpy array of any dimension. Returns: Numpy array: The same array as mat, but with the first two dimensions rolled to the end. """ return mat if mat.ndim < 3 else mat.transpose(np.roll(np.arange(mat.ndim), -2)) def _zero(angle: np_float) -> np_float: """Returns a scalar or array of zeros with the same shape as angle Args: angle: Scalar, list or numpy array. Returns: Scalar or numpy array: Zero-scalar or array. """ try: return np.zeros(angle.shape) # angle is numpy array except AttributeError: try: return np.zeros(len(angle)) # angle is list or other iterable except TypeError: return 0 # angle is scalar def _one(angle: np_float) -> np_float: """Returns a scalar or array of ones with the same shape as angle Args: angle: Scalar, list or numpy array. Returns: Scalar or numpy array: One-scalar or array. """ try: return np.ones(angle.shape) # angle is numpy array except AttributeError: try: return np.ones(len(angle)) # angle is list or other iterable except TypeError: return 1 # angle is scalar PKyfN+midgard/math/transformation.py"""Midgard library module for handling of geodetic conversions Description: ------------ """ # Third party imports import numpy as np # Midgard imports # from midgard.dev import cache # TODO from midgard.math.ellipsoid import Ellipsoid, GRS80 def trs2llh(trs: np.ndarray, ellipsoid: Ellipsoid = None) -> np.ndarray: """Convert geocentric xyz-coordinates to geodetic latitude-, longitude-, height-coordinates Reimplementation of GC2GDE.for from the IUA SOFA software collection. 
""" if ellipsoid is None: ellipsoid = trs.ellipsoid if hasattr(trs, "ellipsoid") else GRS80 trs = np.asarray(trs) if trs.ndim < 1 or trs.ndim > 2 or trs.shape[-1] != 3: raise ValueError("'trs' must be a 1- or 2-dimensional array with 3 columns") x, y, z = trs.T e4t = ellipsoid.e2 ** 2 * 1.5 ec2 = 1 - ellipsoid.e2 ec = np.sqrt(ec2) # Compute longitude lon = np.arctan2(y, x) lat = np.zeros(len(trs)) height = np.zeros(len(trs)) p2 = x ** 2 + y ** 2 # Distance from polar axis squared absz = np.abs(z) pi = np.ones(len(trs)) * np.pi # Identify positions close to the poles pole_idx = p2 <= ellipsoid.a ** 2 * 1e-32 p = np.sqrt(p2) # Normalization s0 = absz / ellipsoid.a pn = p / ellipsoid.a zc = ec * s0 # Newton factors: c0 = ec * pn a0 = np.sqrt(c0 ** 2 + s0 ** 2) d0 = zc * a0 ** 3 + ellipsoid.e2 * s0 ** 3 f0 = pn * a0 ** 3 - ellipsoid.e2 ** c0 ** 3 # Halley correction factor b0 = e4t * s0 ** 2 ** c0 ** 2 * pn * (a0 - ec) s1 = d0 * f0 - b0 * s0 cc = ec * (f0 ** 2 - b0 * c0) # Compute latitude and height lat[~pole_idx] = (np.arctan(s1 / cc))[~pole_idx] height[~pole_idx] = ( (p * cc + absz * s1 - ellipsoid.a * np.sqrt(ec2 * s1 ** 2 + cc ** 2)) / np.sqrt(s1 ** 2 + cc * 2) )[~pole_idx] lat[pole_idx] = (pi / 2)[pole_idx] height[pole_idx] = (absz - ellipsoid.b)[pole_idx] return np.stack((lat, lon, height)).T def llh2trs(llh: np.ndarray, ellipsoid: Ellipsoid = None) -> np.ndarray: """Convert geodetic latitude-, longitude-, height-coordinates to geocentric xyz-coordinates Reimplementation of GD2GCE.for from the IUA SOFA software collection. """ if ellipsoid is None: ellipsoid = llh.ellipsoid if hasattr(llh, "ellipsoid") else GRS80 llh = np.asarray(llh) if llh.ndim < 1 or llh.ndim > 2 or llh.shape[-1] != 3: raise ValueError("'llh' must be a 1- or 2-dimensional array with 3 columns") lat, lon, height = llh.T coslat, sinlat = np.cos(lat), np.sin(lat) coslon, sinlon = np.cos(lon), np.sin(lon) w = (1 - ellipsoid.f) ** 2 ac = ellipsoid.a / np.sqrt(coslat ** 2 + w * sinlat ** 2) r = (ac + height) * coslat x = r * coslon y = r * sinlon z = (w * ac + height) * sinlat return np.stack((x, y, z)).T def delta_trs2enu(trs: "TrsPositionDelta") -> "EnuPositionDelta": """Convert position deltas from TRS to ENU""" return (trs.ref_pos.trs2enu @ trs.mat)[:, :, 0] def delta_enu2trs(enu: "EnuPositionDelta") -> "TrsPositionDelta": """Convert position deltas from ENU to TRS""" return (enu.ref_pos.enu2trs @ enu.mat)[:, :, 0] def delta_trs2enu_posvel(trs: "TrsPosVelDelta") -> "EnuPosVelDelta": """Convert position deltas from TRS to ENU""" t2e = trs.ref_pos.trs2enu trs2enu = np.block([[t2e, np.zeros(t2e.shape)], [np.zeros(t2e.shape), t2e]]) return (trs2enu @ trs.mat)[:, :, 0] def delta_enu2trs_posvel(enu: "EnuPosVelDelta") -> "TrsPosVelDelta": """Convert position deltas from ENU to TRS""" e2t = enu.ref_pos.enu2trs enu2trs = np.block([[e2t, np.zeros(e2t.shape)], [np.zeros(e2t.shape), e2t]]) return (enu2trs @ enu.mat)[:, :, 0] PKC>> from midgard.math.unit import Unit >>> seconds_in_two_weeks = 2 * Unit.week2secs >>> seconds_in_two_weeks 1209600.0 In general `Unit.spam2ham` will give the multiplicative conversion scale between the units `spam` and `ham`. Through the `pint` package we support a lot of units. See `Unit.list()` or `https://github.com/hgrecco/pint/blob/master/pint/default_en.txt`. 
Another notation is also available, and might be necessary for some more complicated conversions: >>> seconds_in_two_weeks = 2 * Unit('week', 'seconds') >>> miles_per_hour_in_meters_per_second = Unit('mph', 'meters / sec') Do note that we support most normal aliases as well as singular and plural forms of the units. For instance, `second` can be represented as `s`, `sec`, `secs` and `seconds`. Prefixes are also handled: >>> nanoseconds_in_an_hour = Unit.hour2nanosecs >>> inches_in_a_kilometer = Unit.km2inches For more complicated conversions (for instance from Celsius to Fahrenheit) one can create custom conversion functions using `convert`: >>> c2f = Unit.function('celsius', 'fahrenheit') >>> absolute_zero_in_fahrenheit = c2f(-273.15) For convenience, this can also be written using the attribute notation as `Unit.spam_to_ham(spam_value)`. Then the previous example simply becomes: >>> absolute_zero_in_fahrenheit = Unit.celsius_to_fahrenheit(-273.15) (or even easier `Unit.kelvin_to_fahrenheit(0)`). Finally, we can access the unit/quantity system of `pint` by using the name of a unit by itself, e.g. `Unit.spam`. For instance: >>> distance = 42 * Unit.km >>> time = 31 * Unit('minutes') >>> speed = distance / time >>> speed.to(Unit.mph) # doctest: +SKIP >>> speed.to_base_units() # doctest: +SKIP However, using the full unit system adds some overhead so we should be careful in using it in heavy calculations. Note that `pint` has a system for defining new units and constants if necessary, `http://pint.readthedocs.io/en/latest/defining.html`. To use this system, add units to the `unit.txt` file in the current (midgard/math) directory. """ # Standard library imports import pathlib from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar, Union try: import importlib.resources as importlib_resources # Python >= 3.7 except ImportError: import importlib_resources # Python <= 3.6: pip install importlib_resources # Third party imports import numpy as np import pint # Midgard imports # from midgard.dev import cache # TODO from midgard.dev import exceptions # Type that can be either float or numpy array np_float = TypeVar("np_float", float, np.ndarray) # The _UNITS-dict is used to keep track of units of values returned by functions and methods _UNITS: Dict[str, Dict[str, str]] = dict() class _convert_units(type): """A meta-class that does the parsing of units The meta-class is used for convenience. It allows us to use the `Unit`-class without instantiating it. That is, we can write `Unit.km2m` instead of `Unit().km2m`. """ _ureg = pint.UnitRegistry() # @cache.function # TODO def __call__(cls, from_unit: str, to_unit: Optional[str] = None) -> Any: # type: ignore """Calculate the conversion scale between from_unit and to_unit If `to_unit` is not given, then `from_unit` is returned as a `pint` Quantity. Args: from_unit: The unit to convert from. to_unit: The unit to convert to. Returns: Scale to multiply by to convert from from_unit to to_unit, or from_unit as a Quantity. """ if to_unit is None: return cls._ureg(from_unit) else: return cls._ureg(from_unit).to(to_unit).magnitude def __getattr__(cls, key: str) -> Any: """Simplify notation for converting between units This makes it possible to type `Unit.km2m` instead of `Unit('km', 'm')`. We split on the character `2` (pronounced "to"), and pass the result on to `__call__` to do the conversion. If a `2` is not found, we check if we can split on '_to_' instead; if so, it is interpreted as a conversion function and is handed off to `function`.
Finally, if no split is done, the attribute is interpreted as a simple unit. Note that if you need a unit whose name contains a '2' (or '_to_') you need to use the notation `Unit('foot_H2O', 'pascal'). Similarly, more complex units need the same notation, e.g. `Unit('meters per second ** 2')`. Args: key: The key (name) of the attribute to the class. Interpreted as units. Returns: Scale to multiply by or function to perform the unit conversion, or Quantity. """ if "2" in key: from_unit, _, to_unit = key.partition("2") return cls(from_unit, to_unit) elif "_to_" in key: from_unit, _, to_unit = key.partition("_to_") return cls.function(from_unit, to_unit) else: return cls(key) def load_definitions(cls, file_path: Union[str, pathlib.Path]) -> None: """Load customized units and constants Piggybacking on `pint`'s system for defining new units and constants, `http://pint.readthedocs.io/en/latest/defining.html`. Args: file_path: File containing definitions of units and constants. """ with open(file_path, mode="rt") as fid: cls._ureg.load_definitions(fid) def function(cls, from_unit: str, to_unit: str) -> Callable[[float], float]: """Create a conversion function This is necessary for unit conversions that are not simple multiplications. The usual example is temperature conversions for instance from Celsius to Fahrenheit. Args: from_unit: The unit to convert from. to_unit: The unit to convert to. Returns: Conversion function that converts from from_unit to to_unit. """ return lambda value: cls._ureg.Quantity(value, cls._ureg(from_unit)).to(cls._ureg(to_unit)).magnitude def register(cls, unit: str) -> Callable: """Register unit of a function/method/property This method should be used as a decorator on the function/method/property, and specify the unit of the value returned by that function/method/property. For instance @property @Unit.register('meter') def calculate_delay(...): return delay_in_meters Units registered with this decorator can be used by the functions returned by the `unit_func_factory`, `convert_func_factory` and `factor_func_factory`. Args: unit: Name of unit. Returns: Decorator that registers the unit. """ def register_decorator(func: Callable) -> Callable: """Register unit of func in _UNITS-dictionary""" module_name = func.__module__ func_name = func.__name__ _UNITS.setdefault(module_name, dict())[func_name] = unit return func return register_decorator @staticmethod def _get_unit(module_name: str, func_name: str) -> str: """Get registered unit of function/method/property Outside code should use the `unit_factory` to get registered units. Args: module_name: Name of module containing function/method/property. func_name: Name of function/method/property with registered unit. Returns: Name of unit. """ units = _UNITS.get(module_name, dict()) try: return units[func_name] except KeyError: raise exceptions.UnitError(f"No unit is registered for {func_name!r} in {module_name!r}") from None def unit_factory(cls, module_name: str) -> Callable[[str], str]: """Provide a function that can get registered units of functions/methods/properties The function checks for units registered with the unit.register-decorator. It can for instance be added to a class as follows: unit = staticmethod(Unit.unit_factory(__name__)) Args: module_name: Name of module as returned by `__name__`. Returns: Function that gets unit of values returned by functions. """ def unit(func_name: str) -> str: """Unit of value returned by function/method/property Args: func_name (String): Name of function/method/property. 
Returns: String: Name of unit. """ return cls._get_unit(module_name, func_name) return unit def convert_factory(cls, module_name: str) -> Callable[[object, str, str], float]: """Provide a function that can convert values of properties to a given unit The function checks for units registered with the unit.register-decorator. It can for instance be added to a class as follows: convert_to = Unit.convert_factory(__name__) Note that unlike the other factories, this one only works for properties. Args: module_name: Name of module as returned by `__name__`. Returns: Function that converts values of properties. """ def convert(self: object, property_name: str, to_unit: str) -> float: """Convert value of property to another unit Args: property_name: Name of property. to_unit: Name of other unit. Returns: Value of property converted to other unit. """ from_unit = cls._get_unit(module_name, property_name) factor = cls(from_unit, to_unit) return getattr(self, property_name) * factor return convert def factor_factory(cls, module_name: str) -> Callable[[str, str], float]: """Provide a function that calculates conversion factor to another unit The function finds conversion factors for units registered with the unit.register-decorator. It can for instance be added to a class as follows: unit_factor = staticmethod(Unit.factor_factory(__name__)) Args: module_name: Name of module as returned by `__name__`. Returns: Function that calculates conversion factor to another unit. """ def factor(func_name: str, to_unit: str) -> float: """Conversion factor between unit of function/method/property and another unit Args: func_name: Name of function/method/property. to_unit: Name of other unit. Returns: Conversion factor. """ from_unit = cls._get_unit(module_name, func_name) return cls(from_unit, to_unit) return factor def units_dict(cls, module_name: str) -> Dict[str, str]: """Dictionary of units registered on a module Add a sub-dictionary if the module name is unknown, to set up a reference in case units are registered later. Args: module_name: Name of module. Returns: Dictionary with units registered on a module. """ return _UNITS.setdefault(module_name, dict()) @property def names(cls) -> List[str]: """List available units and constants The list of available units contains aliases (for instance s, sec, second), but not plural forms (secs, seconds) or possible prefixes (milliseconds, usec, ms). Returns: List of names of available units and constants """ return dir(cls._ureg) class Unit(metaclass=_convert_units): """Unit converter The implementation of the unit conversion is done in the `_convert_units`-metaclass. """ # Make pint exceptions available from pint.errors import DimensionalityError # # Conversion routines not defined by pint # @classmethod def rad_to_dms(cls, radians: np_float) -> Tuple[np_float, np_float, np_float]: """Converts radians to degrees, minutes and seconds Args: radians: Angle(s) in radians Returns: Tuple with degrees, minutes, and seconds. 
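Note: The sign of the angle is carried on the degrees component (see the examples below); the minutes and seconds components are always non-negative.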
Examples: >>> Unit.rad_to_dms(1.04570587646256) (59.0, 54.0, 52.3200000000179) >>> Unit.rad_to_dms(-0.2196050301753194) (-12.0, 34.0, 56.78900000000468) >>> Unit.rad_to_dms(-0.005817642339636369) (-0.0, 19.0, 59.974869999999925) """ sign = np.sign(radians) degrees = abs(radians) * cls.radians2degrees minutes = (degrees % 1) * cls.hour2minutes seconds = (minutes % 1) * cls.minute2seconds return sign * np.floor(degrees), np.floor(minutes), seconds @classmethod def dms_to_rad(cls, degrees: np_float, minutes: np_float, seconds: np_float) -> np_float: """Convert degrees, minutes and seconds to radians The sign of degrees will be used. In this case, be careful that the sign of +0 or -0 is correctly passed on. That is, degrees must be specified as a float, not an int. Args: degrees: Degrees as float (including sign) or array of floats minutes: Minutes as int/float or array of ints/floats seconds: Seconds as float or array of floats Returns: Given degrees, minutes and seconds as radians. Examples: >>> Unit.dms_to_rad(59, 54, 52.32) 1.04570587646256 >>> Unit.dms_to_rad(-12.0, 34, 56.789) -0.21960503017531938 >>> Unit.dms_to_rad(-0.0, 19, 59.974870) -0.005817642339636369 """ sign = np.copysign(1, degrees) return ( sign * (np.abs(degrees) + minutes * cls.minutes2hours + seconds * cls.seconds2hours) * cls.degrees2radians ) @classmethod def hms_to_rad(cls, hours: np_float, minutes: np_float, seconds: np_float) -> np_float: """Convert hours, minutes and seconds to radians Args: hours: Hours as int or array of ints minutes: Minutes as int or array of ints seconds: Seconds as float or array of floats Returns: Given hours, minutes and seconds as radians. Examples: >>> Unit.hms_to_rad(17, 7, 17.753427) 4.482423920139868 >>> Unit.hms_to_rad(12, 0, 0.00) 3.1415926535897936 >>> Unit.hms_to_rad(-12, 34, 56.789) Traceback (most recent call last): ValueError: hours must be non-negative """ if np.any(np.array(hours) < 0): raise ValueError("hours must be non-negative") return 15 * cls.dms_to_rad(hours, minutes, seconds) # Read extra units defined specially for Midgard with importlib_resources.open_text("midgard.math", "unit.txt") as fid: Unit._ureg.load_definitions(fid) PKJ7N?'TTmidgard/math/unit.txt# Extra units used in Midgard, not already defined # # Midgard is using Pint for handling units, http://pint.readthedocs.io/. The list of units already defined in Pint is # quite comprehensive. See https://github.com/hgrecco/pint/blob/master/pint/default_en.txt or simply do # # >>> from midgard.math.unit import Unit # >>> print(Unit.names) # # However, the list can also be extended. Any units defined in this file will also be available to Midgard. See # http://pint.readthedocs.io/en/stable/defining.html for a description of how to define new units. unit = [] percent = unit / 100 PKC Parser: """Use the given parser on a file and return parsed data Specify the `parser_name` and the `file_path` of the file that should be parsed. The following parsers are available: {doc_parser_names} Data can be retrieved either as Dictionaries, Pandas DataFrames or Midgard Datasets by using one of the methods `as_dict`, `as_dataframe` or `as_dataset`. Example: >>> df = parse_file('rinex2_obs', 'ande3160.16o').as_dataframe() # doctest: +SKIP Args: parser_name: Name of parser file_path: Path to file that should be parsed. encoding: Encoding in file that is parsed. timer_logger: Logging function that will be used to log timing information. use_cache: Whether to use a cache to avoid parsing the same file several times.
parser_args: Input arguments to the parser Returns: Parser: Parser with the parsed data """ # TODO: Cache # Create the parser and parse the data parser = plugins.call( package_name=__name__, plugin_name=parser_name, file_path=file_path, encoding=encoding, **parser_args ) with Timer(f"Finish {parser_name} ({__name__}) - {file_path} in", logger=timer_logger): return parser.parse() def names() -> List[str]: """List the names of the available parsers Returns: Names of the available parsers """ return plugins.names(package_name=__name__) PKQDN豰x44midgard/parsers/_parser.py"""Basic functionality for parsing datafiles, extended by individual parsers Description: ------------ This module contains functions and classes for parsing datafiles. It should typically be used by calling `parsers.parse_file`: Example: -------- from midgard import parsers my_new_parser = parsers.parse_file('my_new_parser', 'file_name.txt', ...) my_data = my_new_parser.as_dict() """ # Standard library imports import pathlib from typing import Any, Callable, Dict, List, NoReturn, Optional, Union # Third party imports import pandas as pd # Midgard imports from midgard.dev import log class Parser: """An abstract base class that has basic methods for parsing a datafile This class provides functionality for parsing a file. You should inherit from one of the specific parsers like for instance ChainParser, LineParser, SinexParser etc Attributes: file_path (Path): Path to the datafile that will be read. file_encoding (String): Encoding of the datafile. parser_name (String): Name of the parser (as needed to call parsers.parse_...). data_available (Boolean): Indicator of whether data are available. data (Dict): The (observation) data read from file. meta (Dict): Metainformation read from file. """ def __init__(self, file_path: Union[str, pathlib.Path], encoding: Optional[str] = None) -> None: """Set up the basic information needed by the parser Args: file_path: Path to file that will be read. encoding: Encoding of file that will be read. """ self.file_path = pathlib.Path(file_path) self.file_encoding = encoding self.parser_name = self.__module__.split(".")[-1] # Initialize the data self.data_available = self.file_path.exists() self.meta: Dict[str, Any] = dict(__parser_name__=self.parser_name, __data_path__=str(self.file_path)) self.data: Dict[str, Any] = dict() def setup_parser(self) -> Any: """Set up information needed for the parser""" pass def setup_postprocessors(self) -> List[Callable[[], None]]: """List postprocessors that should be called after parsing""" return list() def parse(self) -> "Parser": """Parse data This is a basic implementation that carries out the whole pipeline of reading and parsing datafiles including calculating secondary data. Subclasses should typically implement (at least) the `read_data`-method. """ self.setup_parser() if self.data_available: self.read_data() if not self.data_available: # May have been set to False by self.read_data() log.warn(f"No data found by {self.__class__.__name__} in {self.file_path}") return self self.postprocess_data() return self def read_data(self) -> None: """Read data from the data file Data should be read from `self.file_path` and stored in the dictionary `self.data`. A description of the data may be placed in the dictionary `self.meta`. If data are not available for some reason, `self.data_available` should be set to False. 
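Example (a minimal sketch of a `read_data` override in a subclass, simply storing the raw lines of the file): def read_data(self) -> None: with open(self.file_path, mode="rt", encoding=self.file_encoding) as fid: self.data["lines"] = fid.read().splitlines()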
""" raise NotImplementedError def postprocess_data(self) -> None: """Do simple manipulations on the data after they are read Simple manipulations of data may be performed in postprocessors after they are read. They should be kept simple so that a parser returns as true representation of the data file as possible. Advanced calculations may be done inside apriori classes or similar. To add a postprocessor, define it in its own method, and override the `setup_postprocessors`-method to return a list of all postprocessors. """ for postprocessor in self.setup_postprocessors(): postprocessor() def as_dict(self, include_meta: bool = False) -> Dict[str, Any]: """Return the parsed data as a dictionary This is a basic implementation, simply returning a copy of self.data. More advanced parsers may need to reimplement this method. Args: include_meta: Whether to include meta-data in the returned dictionary (default: False). Returns: Dictionary with the parsed data. """ return dict(self.data, __meta__=self.meta) if include_meta else self.data.copy() def as_dataframe(self, index: Optional[Union[str, List[str]]] = None) -> pd.DataFrame: """Return the parsed data as a Pandas DataFrame This is a basic implementation, assuming the `self.data`-dictionary has a simple structure. More advanced parsers may need to reimplement this method. Args: index: Optional name of field to use as index. May also be a list of strings. Returns: Pandas DataFrame with the parsed data. """ df = pd.DataFrame.from_dict(self.data) if index is not None: df.set_index(index, drop=True, inplace=True) return df def as_dataset(self) -> NoReturn: """Return the parsed data as a Midgard Dataset This is a basic implementation, assuming the `self.data`-dictionary has a simple structure. More advanced parsers may need to reimplement this method. Returns: Dataset: The parsed data. """ # from midgard import data # # dset = data.Dataset.from_dict(self.data) # # return dset raise NotImplementedError def update_dataset(self, dset: Any) -> NoReturn: """Update the given dataset with the parsed data This is a basic implementation, assuming the `self.data`-dictionary has a simple structure. More advanced parsers may need to reimplement this method. Args: dset: The dataset to update with parsed data. """ # parser_dset = self.as_dataset() # if new fields: # dset.add ... # elif new epochs: # dset.extend ... raise NotImplementedError def __repr__(self) -> str: """Simple string representation of the parser""" return f"{self.__class__.__name__}(file_path='{self.file_path}')" PKis#N? midgard/parsers/_parser_chain.py"""Basic functionality for parsing datafiles line by line Description: ------------ This module contains functions and classes for parsing datafiles. Example: -------- from midgard import parsers my_new_parser = parsers.parse_file('my_new_parser', 'file_name.txt', ...) 
midgard/parsers/_parser_chain.py

"""Basic functionality for parsing datafiles line by line

Description:
------------

This module contains functions and classes for parsing datafiles.

Example:
--------

    from midgard import parsers
    my_new_parser = parsers.parse_file('my_new_parser', 'file_name.txt', ...)
    my_data = my_new_parser.as_dict()
"""

# Standard library imports
import itertools
from typing import Any, Callable, Dict, NamedTuple, Optional

# Midgard imports
from midgard.parsers._parser import Parser


# A simple structure used to define the necessary fields of a parser
class ParserDef(NamedTuple):
    """A convenience class for defining the necessary fields of a parser

    A single parser can read and parse one group of datalines, defined through the ParserDef, by
    specifying how to parse each line (parser_def), how to identify each line (label), how to
    recognize the end of the group of lines (end_marker), and finally what (if anything) should be
    done after all lines in a group are read (end_callback).

    The end_marker, label, skip_line and end_callback parameters should all be functions with the
    following signatures:

        end_marker   = func(line, line_num, next_line)
        label        = func(line, line_num)
        skip_line    = func(line)
        end_callback = func(cache)

    The parser definition `parser_def` includes the `parser`, `field`, `strip` and `delimiter`
    entries. The `parser` entry points to the parser function and the `field` entry defines how to
    separate the line into fields. The separated fields are saved either in a dictionary or in a
    list. In the latter case, the line is split on whitespace by default; with the `delimiter`
    entry this default can be overridden. Leading and trailing whitespace characters are removed
    by default before a line is parsed. This default can be overridden by defining the characters
    that should be removed with the `strip` entry. The `parser` dictionary is defined like:

        parser_def = {
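# Sketch of the callback signatures listed in the ParserDef docstring above (the function
# names follow the docstring; the bodies are illustrative only):
#
# def end_marker(line, line_num, next_line):
#     """Return True when line ends the current group of lines"""
#     return not next_line.strip()
#
# def label(line, line_num):
#     """Return a label identifying how line should be parsed"""
#     return "header" if line_num == 1 else "data"
#
# def skip_line(line):
#     """Return True for lines that should be skipped"""
#     return line.startswith("#")
#
# def end_callback(cache):
#     """Do final work after all lines in a group are read"""
#     pass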