PK!FFFnornir/__init__.pyfrom nornir.init_nornir import InitNornir __all__ = ("InitNornir",) PK! llnornir/core/__init__.pyimport logging import logging.config from multiprocessing.dummy import Pool from nornir.core.configuration import Config from nornir.core.inventory import Inventory from nornir.core.state import GlobalState from nornir.core.task import AggregatedResult, Task logger = logging.getLogger(__name__) class Nornir(object): """ This is the main object to work with. It contains the inventory and it serves as task dispatcher. Arguments: inventory (:obj:`nornir.core.inventory.Inventory`): Inventory to work with data(GlobalState): shared data amongst different iterations of nornir dry_run(``bool``): Whether if we are testing the changes or not config (:obj:`nornir.core.configuration.Config`): Configuration object Attributes: inventory (:obj:`nornir.core.inventory.Inventory`): Inventory to work with data(:obj:`nornir.core.GlobalState`): shared data amongst different iterations of nornir dry_run(``bool``): Whether if we are testing the changes or not config (:obj:`nornir.core.configuration.Config`): Configuration parameters """ def __init__( self, inventory: Inventory, config: Config = None, data: GlobalState = None ) -> None: self.data = data if data is not None else GlobalState() self.inventory = inventory self.config = config or Config() def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close_connections(on_good=True, on_failed=True) def filter(self, *args, **kwargs): """ See :py:meth:`nornir.core.inventory.Inventory.filter` Returns: :obj:`Nornir`: A new object with same configuration as ``self`` but filtered inventory. """ b = Nornir(**self.__dict__) b.inventory = self.inventory.filter(*args, **kwargs) return b def _run_serial(self, task, hosts, **kwargs): result = AggregatedResult(kwargs.get("name") or task.__name__) for host in hosts: result[host.name] = Task(task, **kwargs).start(host, self) return result def _run_parallel(self, task, hosts, num_workers, **kwargs): result = AggregatedResult(kwargs.get("name") or task.__name__) pool = Pool(processes=num_workers) result_pool = [ pool.apply_async(Task(task, **kwargs).start, args=(h, self)) for h in hosts ] pool.close() pool.join() for rp in result_pool: r = rp.get() result[r.host.name] = r return result def run( self, task, num_workers=None, raise_on_error=None, on_good=True, on_failed=False, **kwargs, ): """ Run task over all the hosts in the inventory. 
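Example (illustrative sketch; assumes ``nr`` is an already initialised :obj:`Nornir` object and ``hello`` is a plain task function, neither of which is defined in this module)::

    def hello(task):
        return f"{task.host.name} says hello"

    result = nr.run(task=hello, num_workers=10)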
Arguments: task (``callable``): function or callable that will be run against each device in the inventory num_workers(``int``): Override for how many hosts to run in paralell for this task raise_on_error (``bool``): Override raise_on_error behavior on_good(``bool``): Whether to run or not this task on hosts marked as good on_failed(``bool``): Whether to run or not this task on hosts marked as failed **kwargs: additional argument to pass to ``task`` when calling it Raises: :obj:`nornir.core.exceptions.NornirExecutionError`: if at least a task fails and self.config.core.raise_on_error is set to ``True`` Returns: :obj:`nornir.core.task.AggregatedResult`: results of each execution """ num_workers = num_workers or self.config.core.num_workers run_on = [] if on_good: for name, host in self.inventory.hosts.items(): if name not in self.data.failed_hosts: run_on.append(host) if on_failed: for name, host in self.inventory.hosts.items(): if name in self.data.failed_hosts: run_on.append(host) num_hosts = len(self.inventory.hosts) task_name = kwargs.get("name") or task.__name__ if num_hosts: logger.info( f"Running task %r with args %s on %d hosts", task_name, kwargs, num_hosts, ) else: logger.warning("Task %r has not been run – 0 hosts selected", task_name) if num_workers == 1: result = self._run_serial(task, run_on, **kwargs) else: result = self._run_parallel(task, run_on, num_workers, **kwargs) raise_on_error = ( raise_on_error if raise_on_error is not None else self.config.core.raise_on_error ) # noqa if raise_on_error: result.raise_on_error() else: self.data.failed_hosts.update(result.failed_hosts.keys()) return result def dict(self): """ Return a dictionary representing the object. """ return {"data": self.data.dict(), "inventory": self.inventory.dict()} def close_connections(self, on_good=True, on_failed=False): def close_connections_task(task): task.host.close_connections() self.run(task=close_connections_task, on_good=on_good, on_failed=on_failed) @classmethod def get_validators(cls): yield cls.validate @classmethod def validate(cls, v): if not isinstance(v, cls): raise ValueError(f"Nornir: Nornir expected not {type(v)}") return v @property def state(self): return GlobalState PK!7nornir/core/configuration.pyimport logging import logging.handlers import sys import warnings from pathlib import Path from typing import Any, Callable, Dict, Optional, TYPE_CHECKING, Type, List from nornir.core.exceptions import ConflictingConfigurationWarning if TYPE_CHECKING: from nornir.core.deserializer.inventory import Inventory # noqa class SSHConfig(object): __slots__ = "config_file" def __init__(self, config_file: str) -> None: self.config_file = config_file class InventoryConfig(object): __slots__ = "plugin", "options", "transform_function", "transform_function_options" def __init__( self, plugin: Type["Inventory"], options: Dict[str, Any], transform_function: Optional[Callable[..., Any]], transform_function_options: Optional[Dict[str, Any]], ) -> None: self.plugin = plugin self.options = options self.transform_function = transform_function self.transform_function_options = transform_function_options class LoggingConfig(object): __slots__ = "enabled", "level", "file", "format", "to_console", "loggers" def __init__( self, enabled: Optional[bool], level: str, file_: str, format_: str, to_console: bool, loggers: List[str], ) -> None: self.enabled = enabled self.level = level self.file = file_ self.format = format_ self.to_console = to_console self.loggers = loggers def configure(self) -> None: if not 
self.enabled: return root_logger = logging.getLogger() if root_logger.hasHandlers() or root_logger.level != logging.WARNING: msg = ( "Native Python logging configuration has been detected, but Nornir " "logging is enabled too. " "This can lead to unexpected logging results. " "Please set logging.enabled config to False " "to disable automatic Nornir logging configuration. Refer to " "https://nornir.readthedocs.io/en/stable/configuration/index.html#logging" # noqa ) warnings.warn(msg, ConflictingConfigurationWarning) formatter = logging.Formatter(self.format) # log INFO and DEBUG to stdout stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setFormatter(formatter) stdout_handler.setLevel(logging.DEBUG) stdout_handler.addFilter(lambda record: record.levelno <= logging.INFO) # log WARNING, ERROR and CRITICAL to stderr stderr_handler = logging.StreamHandler(sys.stderr) stderr_handler.setFormatter(formatter) stderr_handler.setLevel(logging.WARNING) for logger_name in self.loggers: logger_ = logging.getLogger(logger_name) logger_.propagate = False logger_.setLevel(self.level) if logger_.hasHandlers(): # Don't add handlers if some handlers are already attached to the logger # This is crucial to avoid duplicate handlers # Alternative would be to clear all handlers and reconfigure them # with Nornir # There are several situations this branch can be executed: # multiple calls to InitNornir, # logging.config.dictConfig configuring 'nornir' logger, etc. # The warning is not emitted in this scenario continue if self.file: handler = logging.handlers.RotatingFileHandler( str(Path(self.file)), maxBytes=1024 * 1024 * 10, backupCount=20 ) handler.setFormatter(formatter) logger_.addHandler(handler) if self.to_console: logger_.addHandler(stdout_handler) logger_.addHandler(stderr_handler) class Jinja2Config(object): __slots__ = "filters" def __init__(self, filters: Optional[Dict[str, Callable[..., Any]]]) -> None: self.filters = filters or {} class CoreConfig(object): __slots__ = ("num_workers", "raise_on_error") def __init__(self, num_workers: int, raise_on_error: bool) -> None: self.num_workers = num_workers self.raise_on_error = raise_on_error class Config(object): __slots__ = ("core", "ssh", "inventory", "jinja2", "logging", "user_defined") def __init__( self, inventory: InventoryConfig, ssh: SSHConfig, logging: LoggingConfig, jinja2: Jinja2Config, core: CoreConfig, user_defined: Dict[str, Any], ) -> None: self.inventory = inventory self.ssh = ssh self.logging = logging self.jinja2 = jinja2 self.core = core self.user_defined = user_defined PK!a˦nornir/core/connections.pyfrom abc import ABC, abstractmethod from typing import Any, Dict, NoReturn, Optional, Type from nornir.core.configuration import Config from nornir.core.exceptions import ( ConnectionPluginAlreadyRegistered, ConnectionPluginNotRegistered, ) class ConnectionPlugin(ABC): """ Connection plugins have to inherit from this class and provide implementations for both the :meth:`open` and :meth:`close` methods. Attributes: connection: Underlying connection. Populated by :meth:`open`. state: Dictionary to hold any data that needs to be shared between the connection plugin and the plugin tasks using this connection. 
""" __slots__ = ("connection", "state") def __init__(self) -> None: self.connection: Any = UnestablishedConnection() self.state: Dict[str, Any] = {} @abstractmethod def open( self, hostname: Optional[str], username: Optional[str], password: Optional[str], port: Optional[int], platform: Optional[str], extras: Optional[Dict[str, Any]] = None, configuration: Optional[Config] = None, ) -> None: """ Connect to the device and populate the attribute :attr:`connection` with the underlying connection """ pass @abstractmethod def close(self) -> None: """Close the connection with the device""" pass class UnestablishedConnection(object): def close(self) -> NoReturn: raise ValueError("Connection not established") disconnect = close class Connections(Dict[str, ConnectionPlugin]): available: Dict[str, Type[ConnectionPlugin]] = {} @classmethod def register(cls, name: str, plugin: Type[ConnectionPlugin]) -> None: """Registers a connection plugin with a specified name Args: name: name of the connection plugin to register plugin: defined connection plugin class Raises: :obj:`nornir.core.exceptions.ConnectionPluginAlreadyRegistered` if another plugin with the specified name was already registered """ existing_plugin = cls.available.get(name) if existing_plugin is None: cls.available[name] = plugin elif existing_plugin != plugin: raise ConnectionPluginAlreadyRegistered( f"Connection plugin {plugin.__name__} can't be registered as " f"{name!r} because plugin {existing_plugin.__name__} " f"was already registered under this name" ) @classmethod def deregister(cls, name: str) -> None: """Deregisters a registered connection plugin by its name Args: name: name of the connection plugin to deregister Raises: :obj:`nornir.core.exceptions.ConnectionPluginNotRegistered` """ if name not in cls.available: raise ConnectionPluginNotRegistered( f"Connection {name!r} is not registered" ) cls.available.pop(name) @classmethod def deregister_all(cls) -> None: """Deregisters all registered connection plugins""" cls.available = {} @classmethod def get_plugin(cls, name: str) -> Type[ConnectionPlugin]: """Fetches the connection plugin by name if already registered Args: name: name of the connection plugin Raises: :obj:`nornir.core.exceptions.ConnectionPluginNotRegistered` """ if name not in cls.available: raise ConnectionPluginNotRegistered( f"Connection {name!r} is not registered" ) return cls.available[name] PK!$nornir/core/deserializer/__init__.pyPK!w`)nornir/core/deserializer/configuration.pyimport importlib import logging from pathlib import Path from typing import Any, Callable, Dict, Optional, Type, Union, List, cast from nornir.core import configuration from nornir.core.deserializer.inventory import Inventory from pydantic import BaseSettings, Schema import ruamel.yaml logger = logging.getLogger(__name__) class BaseNornirSettings(BaseSettings): def _build_values(self, init_kwargs: Dict[str, Any]) -> Dict[str, Any]: config_settings = init_kwargs.pop("__config_settings__", {}) return {**config_settings, **self._build_environ(), **init_kwargs} class SSHConfig(BaseNornirSettings): config_file: str = Schema( default="~/.ssh/config", description="Path to ssh configuration file" ) class Config: env_prefix = "NORNIR_SSH_" ignore_extra = False @classmethod def deserialize(cls, **kwargs: Any) -> configuration.SSHConfig: s = SSHConfig(**kwargs) s.config_file = str(Path(s.config_file).expanduser()) return configuration.SSHConfig(**s.dict()) class InventoryConfig(BaseNornirSettings): plugin: str = Schema( 
default="nornir.plugins.inventory.simple.SimpleInventory", description="Import path to inventory plugin", ) options: Dict[str, Any] = Schema( default={}, description="kwargs to pass to the inventory plugin" ) transform_function: str = Schema( default="", description=( "Path to transform function. The transform_function " "you provide will run against each host in the inventory" ), ) transform_function_options: Dict[str, Any] = Schema( default={}, description="kwargs to pass to the transform_function" ) class Config: env_prefix = "NORNIR_INVENTORY_" ignore_extra = False @classmethod def deserialize(cls, **kwargs: Any) -> configuration.InventoryConfig: inv = InventoryConfig(**kwargs) return configuration.InventoryConfig( plugin=cast(Type[Inventory], _resolve_import_from_string(inv.plugin)), options=inv.options, transform_function=_resolve_import_from_string(inv.transform_function), transform_function_options=inv.transform_function_options, ) class LoggingConfig(BaseNornirSettings): enabled: Optional[bool] = Schema( default=None, description="Whether to configure logging or not" ) level: str = Schema(default="INFO", description="Logging level") file: str = Schema(default="nornir.log", description="Logging file") format: str = Schema( default="%(asctime)s - %(name)12s - %(levelname)8s - %(funcName)10s() - %(message)s", description="Logging format", ) to_console: bool = Schema( default=False, description="Whether to log to console or not" ) loggers: List[str] = Schema(default=["nornir"], description="Loggers to configure") class Config: env_prefix = "NORNIR_LOGGING_" ignore_extra = False @classmethod def deserialize(cls, **kwargs) -> configuration.LoggingConfig: conf = cls(**kwargs) return configuration.LoggingConfig( enabled=conf.enabled, level=conf.level.upper(), file_=conf.file, format_=conf.format, to_console=conf.to_console, loggers=conf.loggers, ) class Jinja2Config(BaseNornirSettings): filters: str = Schema( default="", description="Path to callable returning jinja filters to be used" ) class Config: env_prefix = "NORNIR_JINJA2_" ignore_extra = False @classmethod def deserialize(cls, **kwargs: Any) -> configuration.Jinja2Config: c = Jinja2Config(**kwargs) jinja_filter_func = _resolve_import_from_string(c.filters) jinja_filters = jinja_filter_func() if jinja_filter_func else {} return configuration.Jinja2Config(filters=jinja_filters) class CoreConfig(BaseNornirSettings): num_workers: int = Schema( default=20, description="Number of Nornir worker threads that are run at the same time by default", ) raise_on_error: bool = Schema( default=False, description=( "If set to ``True``, (:obj:`nornir.core.Nornir.run`) method of " "will raise exception :obj:`nornir.core.exceptions.NornirExecutionError` " "if at least a host failed" ), ) class Config: env_prefix = "NORNIR_CORE_" ignore_extra = False @classmethod def deserialize(cls, **kwargs: Any) -> configuration.CoreConfig: c = CoreConfig(**kwargs) return configuration.CoreConfig(**c.dict()) class Config(BaseNornirSettings): core: CoreConfig = CoreConfig() inventory: InventoryConfig = InventoryConfig() ssh: SSHConfig = SSHConfig() logging: LoggingConfig = LoggingConfig() jinja2: Jinja2Config = Jinja2Config() user_defined: Dict[str, Any] = Schema( default={}, description="User-defined pairs" ) class Config: env_prefix = "NORNIR_" ignore_extra = False @classmethod def deserialize( cls, __config_settings__: Optional[Dict[str, Any]] = None, **kwargs: Any ) -> configuration.Config: __config_settings__ = __config_settings__ or {} c = Config( 
core=CoreConfig( __config_settings__=__config_settings__.pop("core", {}), **kwargs.pop("core", {}), ), ssh=SSHConfig( __config_settings__=__config_settings__.pop("ssh", {}), **kwargs.pop("ssh", {}), ), inventory=InventoryConfig( __config_settings__=__config_settings__.pop("inventory", {}), **kwargs.pop("inventory", {}), ), logging=LoggingConfig( __config_settings__=__config_settings__.pop("logging", {}), **kwargs.pop("logging", {}), ), jinja2=Jinja2Config( __config_settings__=__config_settings__.pop("jinja2", {}), **kwargs.pop("jinja2", {}), ), __config_settings__=__config_settings__, **kwargs, ) return configuration.Config( core=CoreConfig.deserialize(**c.core.dict()), inventory=InventoryConfig.deserialize(**c.inventory.dict()), ssh=SSHConfig.deserialize(**c.ssh.dict()), logging=LoggingConfig.deserialize(**c.logging.dict()), jinja2=Jinja2Config.deserialize(**c.jinja2.dict()), user_defined=c.user_defined, ) @classmethod def load_from_file(cls, config_file: str, **kwargs: Any) -> configuration.Config: config_dict: Dict[str, Any] = {} if config_file: yml = ruamel.yaml.YAML(typ="safe") with open(config_file, "r") as f: config_dict = yml.load(f) or {} return Config.deserialize(__config_settings__=config_dict, **kwargs) def _resolve_import_from_string( import_path: Union[Callable[..., Any], str] ) -> Optional[Callable[..., Any]]: try: if not import_path: return None elif callable(import_path): return import_path module_name, obj_name = import_path.rsplit(".", 1) module = importlib.import_module(module_name) return getattr(module, obj_name) except Exception: logger.error("Failed to import %r", import_path, exc_info=True) raise PK!=l..%nornir/core/deserializer/inventory.pyfrom typing import Any, Callable, Dict, List, Optional, Union from nornir.core import inventory from pydantic import BaseModel VarsDict = Dict[str, Any] HostsDict = Dict[str, VarsDict] GroupsDict = Dict[str, VarsDict] DefaultsDict = VarsDict class BaseAttributes(BaseModel): hostname: Optional[str] = None port: Optional[int] = None username: Optional[str] = None password: Optional[str] = None platform: Optional[str] = None class Config: ignore_extra = False class ConnectionOptions(BaseAttributes): extras: Optional[Dict[str, Any]] @classmethod def serialize(cls, i: inventory.ConnectionOptions) -> "ConnectionOptions": return ConnectionOptions( hostname=i.hostname, port=i.port, username=i.username, password=i.password, platform=i.platform, extras=i.extras, ) class InventoryElement(BaseAttributes): groups: List[str] = [] data: Dict[str, Any] = {} connection_options: Dict[str, ConnectionOptions] = {} @classmethod def deserialize( cls, name: str, hostname: Optional[str] = None, port: Optional[int] = None, username: Optional[str] = None, password: Optional[str] = None, platform: Optional[str] = None, groups: Optional[List[str]] = None, data: Optional[Dict[str, Any]] = None, connection_options: Optional[Dict[str, ConnectionOptions]] = None, defaults: inventory.Defaults = None, ) -> Dict[str, Any]: parent_groups = inventory.ParentGroups(groups) connection_options = connection_options or {} conn_opts = { k: inventory.ConnectionOptions(**v) for k, v in connection_options.items() } return { "name": name, "hostname": hostname, "port": port, "username": username, "password": password, "platform": platform, "groups": parent_groups, "data": data, "connection_options": conn_opts, "defaults": defaults, } @classmethod def deserialize_host(cls, **kwargs: Any) -> inventory.Host: return inventory.Host(**cls.deserialize(**kwargs)) @classmethod def 
deserialize_group(cls, **kwargs: Any) -> inventory.Group: return inventory.Group(**cls.deserialize(**kwargs)) @classmethod def serialize(cls, e: Union[inventory.Host, inventory.Group]) -> "InventoryElement": d = {} for f in cls.__fields__: d[f] = object.__getattribute__(e, f) d["groups"] = list(d["groups"]) d["connection_options"] = { k: ConnectionOptions.serialize(v) for k, v in d["connection_options"].items() } return InventoryElement(**d) class Defaults(BaseAttributes): data: Dict[str, Any] = {} connection_options: Dict[str, ConnectionOptions] = {} @classmethod def serialize(cls, defaults: inventory.Defaults) -> "InventoryElement": d = {} for f in cls.__fields__: d[f] = getattr(defaults, f) d["connection_options"] = { k: ConnectionOptions.serialize(v) for k, v in d["connection_options"].items() } return Defaults(**d) class Inventory(BaseModel): hosts: Dict[str, InventoryElement] groups: Dict[str, InventoryElement] defaults: Defaults @classmethod def deserialize( cls, transform_function: Optional[Callable[..., Any]] = None, transform_function_options: Optional[Dict[str, Any]] = None, *args: Any, **kwargs: Any ) -> inventory.Inventory: transform_function_options = transform_function_options or {} deserialized = cls(*args, **kwargs) defaults_dict = deserialized.defaults.dict() for k, v in defaults_dict["connection_options"].items(): defaults_dict["connection_options"][k] = inventory.ConnectionOptions(**v) defaults = inventory.Defaults(**defaults_dict) hosts = inventory.Hosts() for n, h in deserialized.hosts.items(): hosts[n] = InventoryElement.deserialize_host( defaults=defaults, name=n, **h.dict() ) groups = inventory.Groups() for n, g in deserialized.groups.items(): groups[n] = InventoryElement.deserialize_group(name=n, **g.dict()) return inventory.Inventory( hosts=hosts, groups=groups, defaults=defaults, transform_function=transform_function, transform_function_options=transform_function_options, ) @classmethod def serialize(cls, inv: inventory.Inventory) -> "Inventory": hosts = {} for n, h in inv.hosts.items(): hosts[n] = InventoryElement.serialize(h) groups = {} for n, g in inv.groups.items(): groups[n] = InventoryElement.serialize(g) defaults = Defaults.serialize(inv.defaults) return Inventory(hosts=hosts, groups=groups, defaults=defaults) PK!M nornir/core/exceptions.pyfrom typing import Dict, TYPE_CHECKING if TYPE_CHECKING: from nornir.core.connection import Connection from nornir.core.result import AggregatedResult, MultiResult, Result # noqa from nornir.core.tasks import Task class ConnectionException(Exception): """ Superclass for all the Connection* Exceptions """ def __init__(self, connection: "Connection") -> None: self.connection = connection class ConnectionAlreadyOpen(ConnectionException): """ Raised when opening an already opened connection """ pass class ConnectionNotOpen(ConnectionException): """ Raised when trying to close a connection that isn't open """ pass class ConnectionPluginAlreadyRegistered(ConnectionException): """ Raised when trying to register an already registered plugin """ pass class ConnectionPluginNotRegistered(ConnectionException): """ Raised when trying to access a plugin that is not registered """ pass class CommandError(Exception): """ Raised when there is a command error. 
""" def __init__( self, command: str, status_code: int, stdout: str, stderr: str ) -> None: self.status_code = status_code self.stdout = stdout self.stderr = stderr self.command = command super().__init__(command, status_code, stdout, stderr) class NornirExecutionError(Exception): """ Raised by nornir when any of the tasks managed by :meth:`nornir.core.Nornir.run` when any of the tasks fail. """ def __init__(self, result: "AggregatedResult") -> None: self.result = result @property def failed_hosts(self) -> Dict[str, "MultiResult"]: """ Hosts that failed to complete the task """ return {k: v for k, v in self.result.items() if v.failed} def __str__(self) -> str: text = "\n" for k, r in self.result.items(): text += "{}\n".format("#" * 40) if r.failed: text += "# {} (failed)\n".format(k) else: text += "# {} (succeeded)\n".format(k) text += "{}\n".format("#" * 40) for sub_r in r: text += "**** {}\n".format(sub_r.name) text += "{}\n".format(sub_r) return text class NornirSubTaskError(Exception): """ Raised by nornir when a sub task managed by :meth:`nornir.core.Task.run` has failed """ def __init__(self, task: "Task", result: "Result"): self.task = task self.result = result def __str__(self) -> str: return "Subtask: {} (failed)\n".format(self.task) class ConflictingConfigurationWarning(UserWarning): pass PK!,k%7 7 nornir/core/filter.pyfrom typing import Any, List from nornir.core.inventory import Host class F_BASE(object): def __call__(self, host: Host) -> bool: raise NotImplementedError() class F_OP_BASE(F_BASE): def __init__(self, op1: F_BASE, op2: F_BASE) -> None: self.op1 = op1 self.op2 = op2 def __and__(self, other: F_BASE) -> "AND": return AND(self, other) def __or__(self, other: F_BASE) -> "OR": return OR(self, other) def __repr__(self) -> str: return "( {} {} {} )".format(self.op1, self.__class__.__name__, self.op2) class AND(F_OP_BASE): def __call__(self, host: Host) -> bool: return self.op1(host) and self.op2(host) class OR(F_OP_BASE): def __call__(self, host: Host) -> bool: return self.op1(host) or self.op2(host) class F(F_BASE): def __init__(self, **kwargs: Any) -> None: self.filters = kwargs def __call__(self, host: Host) -> bool: return all( F._verify_rules(host, k.split("__"), v) for k, v in self.filters.items() ) def __and__(self, other: "F") -> AND: return AND(self, other) def __or__(self, other: "F") -> OR: return OR(self, other) def __invert__(self) -> "F": return NOT_F(**self.filters) def __repr__(self) -> str: return "".format(self.filters) @staticmethod def _verify_rules(data: Any, rule: List[str], value: Any) -> bool: if len(rule) > 1: try: return F._verify_rules(data.get(rule[0], {}), rule[1:], value) except AttributeError: return False elif len(rule) == 1: operator = "__{}__".format(rule[0]) if hasattr(data, operator): return bool(getattr(data, operator)(value)) elif hasattr(data, rule[0]): if callable(getattr(data, rule[0])): return bool(getattr(data, rule[0])(value)) else: return bool(getattr(data, rule[0]) == value) elif rule == ["in"]: return bool(data in value) elif rule == ["any"]: return any([x in data for x in value]) elif rule == ["all"]: return all([x in data for x in value]) else: return bool(data.get(rule[0]) == value) else: raise Exception( "I don't know how I got here:\n{}\n{}\n{}".format(data, rule, value) ) class NOT_F(F): def __call__(self, host: Host) -> bool: return not any( F._verify_rules(host, k.split("__"), v) for k, v in self.filters.items() ) def __invert__(self) -> F: return F(**self.filters) def __repr__(self) -> str: return 
"".format(self.filters) PK!nnornir/core/helpers/__init__.pyfrom typing import Any, Dict def merge_two_dicts(x: Dict[Any, Any], y: Dict[Any, Any]) -> Dict[Any, Any]: try: z = x.copy() except AttributeError: z = dict(x) z.update(y) return z PK!"i99#nornir/core/helpers/jinja_helper.pyfrom typing import Any, Dict, Optional from jinja2 import Environment, FileSystemLoader, StrictUndefined def render_from_file( path: str, template: str, jinja_filters: Optional[Dict[str, Any]] = None, **kwargs: Any ) -> str: jinja_filters = jinja_filters or {} env = Environment( loader=FileSystemLoader(path), undefined=StrictUndefined, trim_blocks=True ) env.filters.update(jinja_filters) t = env.get_template(template) return t.render(**kwargs) def render_from_string( template: str, jinja_filters: Optional[Dict[str, Any]] = None, **kwargs: Any ) -> str: jinja_filters = jinja_filters or {} env = Environment(undefined=StrictUndefined, trim_blocks=True) env.filters.update(jinja_filters) t = env.from_string(template) return t.render(**kwargs) PK!6 ^:^:nornir/core/inventory.pyimport warnings from collections import UserList from typing import Any, Dict, List, Optional, Set, Union from nornir.core import deserializer from nornir.core.configuration import Config from nornir.core.connections import ConnectionPlugin, Connections from nornir.core.exceptions import ConnectionAlreadyOpen, ConnectionNotOpen class BaseAttributes(object): __slots__ = ("hostname", "port", "username", "password", "platform") def __init__( self, hostname: Optional[str] = None, port: Optional[int] = None, username: Optional[str] = None, password: Optional[str] = None, platform: Optional[str] = None, ) -> None: self.hostname = hostname self.port = port self.username = username self.password = password self.platform = platform def dict(self): w = f"{self.dict.__qualname__} is deprecated, use nornir.core.deserializer instead" warnings.warn(w) return ( getattr(deserializer.inventory, self.__class__.__name__) .serialize(self) .dict() ) class ConnectionOptions(BaseAttributes): __slots__ = ("extras",) def __init__(self, extras: Optional[Dict[str, Any]] = None, **kwargs) -> None: self.extras = extras super().__init__(**kwargs) class ParentGroups(UserList): __slots__ = "refs" def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.refs: List["Group"] = kwargs.get("refs", []) def __contains__(self, value) -> bool: return value in self.data or value in self.refs class InventoryElement(BaseAttributes): __slots__ = ("groups", "data", "connection_options") def __init__( self, groups: Optional[ParentGroups] = None, data: Optional[Dict[str, Any]] = None, connection_options: Optional[Dict[str, ConnectionOptions]] = None, **kwargs, ) -> None: self.groups = groups or ParentGroups() self.data = data or {} self.connection_options = connection_options or {} super().__init__(**kwargs) class Defaults(BaseAttributes): __slots__ = ("data", "connection_options") def __init__( self, data: Optional[Dict[str, Any]] = None, connection_options: Optional[Dict[str, ConnectionOptions]] = None, **kwargs, ) -> None: self.data = data or {} self.connection_options = connection_options or {} super().__init__(**kwargs) class Host(InventoryElement): __slots__ = ("name", "connections", "defaults") def __init__( self, name: str, defaults: Optional[Defaults] = None, **kwargs ) -> None: self.name = name self.defaults = defaults or Defaults() self.connections: Connections = Connections() super().__init__(**kwargs) def _resolve_data(self): processed = [] result = {} 
for k, v in self.data.items(): processed.append(k) result[k] = v for g in self.groups.refs: for k, v in g.items(): if k not in processed: processed.append(k) result[k] = v for k, v in self.defaults.data.items(): if k not in processed: processed.append(k) result[k] = v return result def keys(self): """Returns the keys of the attribute ``data`` and of the parent(s) groups.""" return self._resolve_data().keys() def values(self): """Returns the values of the attribute ``data`` and of the parent(s) groups.""" return self._resolve_data().values() def items(self): """ Returns all the data accessible from a device, including the one inherited from parent groups """ return self._resolve_data().items() def has_parent_group(self, group): """Retuns whether the object is a child of the :obj:`Group` ``group``""" if isinstance(group, str): return self._has_parent_group_by_name(group) else: return self._has_parent_group_by_object(group) def _has_parent_group_by_name(self, group): for g in self.groups.refs: if g.name == group or g.has_parent_group(group): return True def _has_parent_group_by_object(self, group): for g in self.groups.refs: if g is group or g.has_parent_group(group): return True def __getitem__(self, item): try: return self.data[item] except KeyError: for g in self.groups.refs: try: r = g[item] return r except KeyError: continue r = self.defaults.data.get(item) if r: return r raise def __getattribute__(self, name): if name not in ("hostname", "port", "username", "password", "platform"): return object.__getattribute__(self, name) v = object.__getattribute__(self, name) if v is None: for g in self.groups.refs: r = getattr(g, name) if r is not None: return r return object.__getattribute__(self.defaults, name) else: return v def __bool__(self): return bool(self.name) def __setitem__(self, item, value): self.data[item] = value def __len__(self): return len(self._resolve_data().keys()) def __iter__(self): return self.data.__iter__() def __str__(self): return self.name def __repr__(self): return "{}: {}".format(self.__class__.__name__, self.name or "") def get(self, item, default=None): """ Returns the value ``item`` from the host or hosts group variables. 
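Example (illustrative; assumes ``site`` is set on a parent group rather than on the host itself)::

    host.get("site", default="unknown")  # resolves host data, then parent groups, then defaults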
Arguments: item(``str``): The variable to get default(``any``): Return value if item not found """ if hasattr(self, item): return getattr(self, item) try: return self.__getitem__(item) except KeyError: return default def get_connection_parameters( self, connection: Optional[str] = None ) -> ConnectionOptions: if not connection: d = ConnectionOptions( hostname=self.hostname, port=self.port, username=self.username, password=self.password, platform=self.platform, extras={}, ) else: r = self._get_connection_options_recursively(connection) if r is not None: d = ConnectionOptions( hostname=r.hostname if r.hostname is not None else self.hostname, port=r.port if r.port is not None else self.port, username=r.username if r.username is not None else self.username, password=r.password if r.password is not None else self.password, platform=r.platform if r.platform is not None else self.platform, extras=r.extras if r.extras is not None else {}, ) else: d = ConnectionOptions( hostname=self.hostname, port=self.port, username=self.username, password=self.password, platform=self.platform, extras={}, ) return d def _get_connection_options_recursively( self, connection: str ) -> Optional[ConnectionOptions]: p = self.connection_options.get(connection) if p is None: p = ConnectionOptions() for g in self.groups.refs: sp = g._get_connection_options_recursively(connection) if sp is not None: p.hostname = p.hostname if p.hostname is not None else sp.hostname p.port = p.port if p.port is not None else sp.port p.username = p.username if p.username is not None else sp.username p.password = p.password if p.password is not None else sp.password p.platform = p.platform if p.platform is not None else sp.platform p.extras = p.extras if p.extras is not None else sp.extras sp = self.defaults.connection_options.get(connection, None) if sp is not None: p.hostname = p.hostname if p.hostname is not None else sp.hostname p.port = p.port if p.port is not None else sp.port p.username = p.username if p.username is not None else sp.username p.password = p.password if p.password is not None else sp.password p.platform = p.platform if p.platform is not None else sp.platform p.extras = p.extras if p.extras is not None else sp.extras return p def get_connection(self, connection: str, configuration: Config) -> Any: """ The function of this method is twofold: 1. If an existing connection is already established for the given type return it 2. If none exists, establish a new connection of that type with default parameters and return it Raises: AttributeError: if it's unknown how to establish a connection for the given type Arguments: connection: Name of the connection, for instance, netmiko, paramiko, napalm... Returns: An already established connection """ if connection not in self.connections: conn = self.get_connection_parameters(connection) self.open_connection( connection=connection, configuration=configuration, hostname=conn.hostname, port=conn.port, username=conn.username, password=conn.password, platform=conn.platform, extras=conn.extras, ) return self.connections[connection].connection def get_connection_state(self, connection: str) -> Dict[str, Any]: """ For an already established connection return its state. 
""" if connection not in self.connections: raise ConnectionNotOpen(connection) return self.connections[connection].state def open_connection( self, connection: str, configuration: Config, hostname: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, port: Optional[int] = None, platform: Optional[str] = None, extras: Optional[Dict[str, Any]] = None, default_to_host_attributes: bool = True, ) -> ConnectionPlugin: """ Open a new connection. If ``default_to_host_attributes`` is set to ``True`` arguments will default to host attributes if not specified. Raises: AttributeError: if it's unknown how to establish a connection for the given type Returns: An already established connection """ if connection in self.connections: raise ConnectionAlreadyOpen(connection) self.connections[connection] = self.connections.get_plugin(connection)() if default_to_host_attributes: conn_params = self.get_connection_parameters(connection) self.connections[connection].open( hostname=hostname if hostname is not None else conn_params.hostname, username=username if username is not None else conn_params.username, password=password if password is not None else conn_params.password, port=port if port is not None else conn_params.port, platform=platform if platform is not None else conn_params.platform, extras=extras if extras is not None else conn_params.extras, configuration=configuration, ) else: self.connections[connection].open( hostname=hostname, username=username, password=password, port=port, platform=platform, extras=extras, configuration=configuration, ) return self.connections[connection] def close_connection(self, connection: str) -> None: """ Close the connection""" if connection not in self.connections: raise ConnectionNotOpen(connection) self.connections.pop(connection).close() def close_connections(self) -> None: # Decouple deleting dictionary elements from iterating over connections dict existing_conns = list(self.connections.keys()) for connection in existing_conns: self.close_connection(connection) class Group(Host): pass class Hosts(Dict[str, Host]): pass class Groups(Dict[str, Group]): pass class Inventory(object): __slots__ = ("hosts", "groups", "defaults") def __init__( self, hosts: Hosts, groups: Optional[Groups] = None, defaults: Optional[Defaults] = None, transform_function=None, transform_function_options=None, ) -> None: self.hosts = hosts self.groups = groups or Groups() self.defaults = defaults or Defaults() for host in self.hosts.values(): host.groups.refs = [self.groups[p] for p in host.groups] for group in self.groups.values(): group.groups.refs = [self.groups[p] for p in group.groups] if transform_function: for h in self.hosts.values(): transform_function(h, **transform_function_options) def filter(self, filter_obj=None, filter_func=None, *args, **kwargs): filter_func = filter_obj or filter_func if filter_func: filtered = {n: h for n, h in self.hosts.items() if filter_func(h, **kwargs)} else: filtered = { n: h for n, h in self.hosts.items() if all(h.get(k) == v for k, v in kwargs.items()) } return Inventory(hosts=filtered, groups=self.groups, defaults=self.defaults) def __len__(self): return self.hosts.__len__() def children_of_group(self, group: Union[str, Group]) -> Set[Host]: """ Returns set of hosts that belongs to a group including those that belong indirectly via inheritance """ hosts: List[Host] = set() for host in self.hosts.values(): if host.has_parent_group(group): hosts.add(host) return hosts PK!myz2nornir/core/state.pyfrom typing import Any, 
Dict, Set class GlobalState(object): """ This class is just a placeholder to share data amongst different versions of Nornir after running ``filter`` multiple times. Attributes: failed_hosts: Hosts that have failed to run a task properly """ __slots__ = "dry_run", "failed_hosts" def __init__(self, dry_run: bool = None, failed_hosts: Set[str] = None) -> None: self.dry_run = dry_run self.failed_hosts = failed_hosts or set() def recover_host(self, host: str) -> None: """Remove ``host`` from list of failed hosts.""" self.failed_hosts.discard(host) def reset_failed_hosts(self) -> None: """Reset failed hosts and make all hosts available for future tasks.""" self.failed_hosts = set() def to_dict(self) -> Dict[str, Any]: """ Return a dictionary representing the object. """ return self.__dict__ PK!N"N"nornir/core/task.pyimport logging import traceback from typing import Any, Optional, TYPE_CHECKING from nornir.core.exceptions import NornirExecutionError from nornir.core.exceptions import NornirSubTaskError if TYPE_CHECKING: from nornir.core.inventory import Host logger = logging.getLogger(__name__) class Task(object): """ A task is basically a wrapper around a function that has to be run against multiple devices. You won't probably have to deal with this class yourself as :meth:`nornir.core.Nornir.run` will create it automatically. Arguments: task (callable): function or callable we will be calling name (``string``): name of task, defaults to ``task.__name__`` severity_level (logging.LEVEL): Severity level associated to the task **kwargs: Parameters that will be passed to the ``task`` Attributes: task (callable): function or callable we will be calling name (``string``): name of task, defaults to ``task.__name__`` params: Parameters that will be passed to the ``task``. self.results (:obj:`nornir.core.task.MultiResult`): Intermediate results host (:obj:`nornir.core.inventory.Host`): Host we are operating with. Populated right before calling the ``task`` nornir(:obj:`nornir.core.Nornir`): Populated right before calling the ``task`` severity_level (logging.LEVEL): Severity level associated to the task """ def __init__(self, task, name=None, severity_level=logging.INFO, **kwargs): self.name = name or task.__name__ self.task = task self.params = kwargs self.results = MultiResult(self.name) self.severity_level = severity_level def __repr__(self): return self.name def start(self, host, nornir): """ Run the task for the given host. Arguments: host (:obj:`nornir.core.inventory.Host`): Host we are operating with. 
Populated right before calling the ``task`` nornir(:obj:`nornir.core.Nornir`): Populated right before calling the ``task`` Returns: host (:obj:`nornir.core.task.MultiResult`): Results of the task and its subtasks """ self.host = host self.nornir = nornir try: logger.debug("Host %r: running task %r", self.host.name, self.name) r = self.task(self, **self.params) if not isinstance(r, Result): r = Result(host=host, result=r) except NornirSubTaskError as e: tb = traceback.format_exc() logger.error( "Host %r: task %r failed with traceback:\n%s", self.host.name, self.name, tb, ) r = Result(host, exception=e, result=str(e), failed=True) except Exception as e: tb = traceback.format_exc() logger.error( "Host %r: task %r failed with traceback:\n%s", self.host.name, self.name, tb, ) r = Result(host, exception=e, result=tb, failed=True) r.name = self.name r.severity_level = logging.ERROR if r.failed else self.severity_level self.results.insert(0, r) return self.results def run(self, task, **kwargs): """ This is a utility method to call a task from within a task. For instance: def grouped_tasks(task): task.run(my_first_task) task.run(my_second_task) nornir.run(grouped_tasks) This method will ensure the subtask is run only for the host in the current thread. """ if not self.host or not self.nornir: msg = ( "You have to call this after setting host and nornir attributes. ", "You probably called this from outside a nested task", ) raise Exception(msg) if "severity_level" not in kwargs: kwargs["severity_level"] = self.severity_level task = Task(task, **kwargs) r = task.start(self.host, self.nornir) self.results.append(r[0] if len(r) == 1 else r) if r.failed: # Without this we will keep running the grouped task raise NornirSubTaskError(task=task, result=r) return r def is_dry_run(self, override: bool = None) -> bool: """ Returns whether current task is a dry_run or not. """ return override if override is not None else self.nornir.data.dry_run class Result(object): """ Result of running individual tasks. 
Arguments: changed (bool): ``True`` if the task is changing the system diff (obj): Diff between state of the system before/after running this task result (obj): Result of the task execution, see task's documentation for details host (:obj:`nornir.core.inventory.Host`): Reference to the host that lead ot this result failed (bool): Whether the execution failed or not severity_level (logging.LEVEL): Severity level associated to the result of the excecution exception (Exception): uncaught exception thrown during the exection of the task (if any) Attributes: changed (bool): ``True`` if the task is changing the system diff (obj): Diff between state of the system before/after running this task result (obj): Result of the task execution, see task's documentation for details host (:obj:`nornir.core.inventory.Host`): Reference to the host that lead ot this result failed (bool): Whether the execution failed or not severity_level (logging.LEVEL): Severity level associated to the result of the excecution exception (Exception): uncaught exception thrown during the exection of the task (if any) """ def __init__( self, host: "Host", result: Any = None, changed: bool = False, diff: str = "", failed: bool = False, exception: Optional[BaseException] = None, severity_level: int = logging.INFO, **kwargs: Any ): self.result = result self.host = host self.changed = changed self.diff = diff self.failed = failed self.exception = exception self.name = None self.severity_level = severity_level self.stdout: Optional[str] = None self.stderr: Optional[str] = None for k, v in kwargs.items(): setattr(self, k, v) def __repr__(self): return '{}: "{}"'.format(self.__class__.__name__, self.name) def __str__(self): if self.exception: return str(self.exception) else: return str(self.result) class AggregatedResult(dict): """ It basically is a dict-like object that aggregates the results for all devices. You can access each individual result by doing ``my_aggr_result["hostname_of_device"]``. """ def __init__(self, name, **kwargs): self.name = name super().__init__(**kwargs) def __repr__(self): return "{} ({}): {}".format( self.__class__.__name__, self.name, super().__repr__() ) @property def failed(self): """If ``True`` at least a host failed.""" return any([h.failed for h in self.values()]) @property def failed_hosts(self): """Hosts that failed during the execution of the task.""" return {h: r for h, r in self.items() if r.failed} def raise_on_error(self): """ Raises: :obj:`nornir.core.exceptions.NornirExecutionError`: When at least a task failed """ if self.failed: raise NornirExecutionError(self) class MultiResult(list): """ It is basically is a list-like object that gives you access to the results of all subtasks for a particular device/task. """ def __init__(self, name): self.name = name def __getattr__(self, name): return getattr(self[0], name) def __repr__(self): return "{}: {}".format(self.__class__.__name__, super().__repr__()) @property def failed(self): """If ``True`` at least a task failed.""" return any([h.failed for h in self]) @property def changed(self): """If ``True`` at least a task changed the system.""" return any([h.changed for h in self]) def raise_on_error(self): """ Raises: :obj:`nornir.core.exceptions.NornirExecutionError`: When at least a task failed """ if self.failed: raise NornirExecutionError(self) PK!. 
nornir/init_nornir.pyfrom typing import Any, Callable, Dict, Optional import warnings from nornir.core import Nornir from nornir.core.connections import Connections from nornir.core.deserializer.configuration import Config from nornir.core.state import GlobalState from nornir.plugins.connections.napalm import Napalm from nornir.plugins.connections.netmiko import Netmiko from nornir.plugins.connections.paramiko import Paramiko def register_default_connection_plugins() -> None: Connections.register("napalm", Napalm) Connections.register("netmiko", Netmiko) Connections.register("paramiko", Paramiko) def cls_to_string(cls: Callable[..., Any]) -> str: return f"{cls.__module__}.{cls.__name__}" def InitNornir( config_file: str = "", dry_run: bool = False, configure_logging: Optional[bool] = None, **kwargs: Dict[str, Any], ) -> Nornir: """ Arguments: config_file(str): Path to the configuration file (optional) dry_run(bool): Whether to simulate changes or not configure_logging: Whether to configure logging or not. This argument is being deprecated. Please use logging.enabled parameter in the configuration instead. **kwargs: Extra information to pass to the :obj:`nornir.core.configuration.Config` object Returns: :obj:`nornir.core.Nornir`: fully instantiated and configured """ register_default_connection_plugins() if callable(kwargs.get("inventory", {}).get("plugin", "")): kwargs["inventory"]["plugin"] = cls_to_string(kwargs["inventory"]["plugin"]) if callable(kwargs.get("inventory", {}).get("transform_function", "")): kwargs["inventory"]["transform_function"] = cls_to_string( kwargs["inventory"]["transform_function"] ) conf = Config.load_from_file(config_file, **kwargs) data = GlobalState(dry_run=dry_run) if configure_logging is not None: msg = ( "'configure_logging' argument is deprecated, please use " "'logging.enabled' parameter in the configuration instead: " "https://nornir.readthedocs.io/en/stable/configuration/index.html" ) warnings.warn(msg, DeprecationWarning) if conf.logging.enabled is None: if configure_logging is not None: conf.logging.enabled = configure_logging else: conf.logging.enabled = True conf.logging.configure() inv = conf.inventory.plugin.deserialize( transform_function=conf.inventory.transform_function, transform_function_options=conf.inventory.transform_function_options, config=conf, **conf.inventory.options, ) return Nornir(inventory=inv, config=conf, data=data) PK!nornir/plugins/__init__.pyPK!&nornir/plugins/connections/__init__.pyPK!qM$nornir/plugins/connections/napalm.pyfrom typing import Any, Dict, Optional from napalm import get_network_driver from nornir.core.configuration import Config from nornir.core.connections import ConnectionPlugin class Napalm(ConnectionPlugin): """ This plugin connects to the device using the NAPALM driver and sets the relevant connection. 
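Example (illustrative host entry for a SimpleInventory-style YAML hosts file; the host name and ``optional_args`` values are assumptions, not defined by this plugin)::

    rtr00:
        platform: eos
        connection_options:
            napalm:
                extras:
                    optional_args:
                        transport: https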
Inventory: extras: passed as it is to the napalm driver """ def open( self, hostname: Optional[str], username: Optional[str], password: Optional[str], port: Optional[int], platform: Optional[str], extras: Optional[Dict[str, Any]] = None, configuration: Optional[Config] = None, ) -> None: extras = extras or {} parameters: Dict[str, Any] = { "hostname": hostname, "username": username, "password": password, "optional_args": {}, } try: parameters["optional_args"][ "ssh_config_file" ] = configuration.ssh.config_file # type: ignore except AttributeError: pass parameters.update(extras) if port and "port" not in parameters["optional_args"]: parameters["optional_args"]["port"] = port network_driver = get_network_driver(platform) connection = network_driver(**parameters) connection.open() self.connection = connection def close(self) -> None: self.connection.close() PK!b]%nornir/plugins/connections/netmiko.pyfrom typing import Any, Dict, Optional from netmiko import ConnectHandler from nornir.core.configuration import Config from nornir.core.connections import ConnectionPlugin napalm_to_netmiko_map = { "ios": "cisco_ios", "nxos": "cisco_nxos", "nxos_ssh": "cisco_nxos", "eos": "arista_eos", "junos": "juniper_junos", "iosxr": "cisco_xr", } class Netmiko(ConnectionPlugin): """ This plugin connects to the device using the Netmiko driver and sets the relevant connection. Inventory: extras: maps to argument passed to ``ConnectHandler``. """ def open( self, hostname: Optional[str], username: Optional[str], password: Optional[str], port: Optional[int], platform: Optional[str], extras: Optional[Dict[str, Any]] = None, configuration: Optional[Config] = None, ) -> None: parameters = { "host": hostname, "username": username, "password": password, "port": port, } try: parameters[ "ssh_config_file" ] = configuration.ssh.config_file # type: ignore except AttributeError: pass if platform is not None: # Look platform up in corresponding map, if no entry return the host.nos unmodified platform = napalm_to_netmiko_map.get(platform, platform) parameters["device_type"] = platform extras = extras or {} parameters.update(extras) self.connection = ConnectHandler(**parameters) def close(self) -> None: self.connection.disconnect() PK!{&nornir/plugins/connections/paramiko.pyimport os from typing import Any, Dict, Optional from nornir.core.configuration import Config from nornir.core.connections import ConnectionPlugin import paramiko class Paramiko(ConnectionPlugin): """ This plugin connects to the device with paramiko to the device and sets the relevant connection. Inventory: extras: maps to argument passed to ``ConnectHandler``. 
""" def open( self, hostname: Optional[str], username: Optional[str], password: Optional[str], port: Optional[int], platform: Optional[str], extras: Optional[Dict[str, Any]] = None, configuration: Optional[Config] = None, ) -> None: extras = extras or {} client = paramiko.SSHClient() client._policy = paramiko.WarningPolicy() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh_config = paramiko.SSHConfig() ssh_config_file = configuration.ssh.config_file # type: ignore if os.path.exists(ssh_config_file): with open(ssh_config_file) as f: ssh_config.parse(f) parameters = { "hostname": hostname, "username": username, "password": password, "port": port, } user_config = ssh_config.lookup(hostname) for k in ("hostname", "username", "port"): if k in user_config: parameters[k] = user_config[k] if "proxycommand" in user_config: parameters["sock"] = paramiko.ProxyCommand(user_config["proxycommand"]) self.state["ssh_forward_agent"] = user_config.get("forwardagent") == "yes" # TODO configurable # if ssh_key_file: # parameters['key_filename'] = ssh_key_file if "identityfile" in user_config: parameters["key_filename"] = user_config["identityfile"] extras.update(parameters) client.connect(**extras) self.connection = client def close(self) -> None: self.connection.close() PK!$nornir/plugins/functions/__init__.pyPK! )nornir/plugins/functions/text/__init__.pyimport logging import pprint import threading from typing import List, Optional, cast from collections import OrderedDict import json from colorama import Fore, Style, init from nornir.core.task import AggregatedResult, MultiResult, Result LOCK = threading.Lock() init(autoreset=True, strip=False) def print_title(title: str) -> None: """ Helper function to print a title. """ msg = "**** {} ".format(title) print("{}{}{}{}".format(Style.BRIGHT, Fore.GREEN, msg, "*" * (80 - len(msg)))) def _get_color(result: Result, failed: bool) -> str: if result.failed or failed: color = Fore.RED elif result.changed: color = Fore.YELLOW else: color = Fore.GREEN return cast(str, color) def _print_individual_result( result: Result, host: Optional[str], attrs: List[str], failed: bool, severity_level: int, task_group: bool = False, ) -> None: if result.severity_level < severity_level: return color = _get_color(result, failed) subtitle = ( "" if result.changed is None else " ** changed : {} ".format(result.changed) ) level_name = logging.getLevelName(result.severity_level) symbol = "v" if task_group else "-" msg = "{} {}{}".format(symbol * 4, result.name, subtitle) print( "{}{}{}{} {}".format( Style.BRIGHT, color, msg, symbol * (80 - len(msg)), level_name ) ) for attribute in attrs: x = getattr(result, attribute, "") if isinstance(x, BaseException): # for consistency between py3.6 and py3.7 print(f"{x.__class__.__name__}{x.args}") elif x and not isinstance(x, str): if isinstance(x, OrderedDict): print(json.dumps(x, indent=2)) else: pprint.pprint(x, indent=2) elif x: print(x) def _print_result( result: Result, host: Optional[str] = None, attrs: List[str] = None, failed: bool = False, severity_level: int = logging.INFO, ) -> None: attrs = attrs or ["diff", "result", "stdout"] if isinstance(attrs, str): attrs = [attrs] if isinstance(result, AggregatedResult): msg = result.name print("{}{}{}{}".format(Style.BRIGHT, Fore.CYAN, msg, "*" * (80 - len(msg)))) for host, host_data in sorted(result.items()): title = ( "" if host_data.changed is None else " ** changed : {} ".format(host_data.changed) ) msg = "* {}{}".format(host, title) print( "{}{}{}{}".format(Style.BRIGHT, 
Fore.BLUE, msg, "*" * (80 - len(msg))) ) _print_result(host_data, host, attrs, failed, severity_level) elif isinstance(result, MultiResult): _print_individual_result( result[0], host, attrs, failed, severity_level, task_group=True ) for r in result[1:]: _print_result(r, host, attrs, failed, severity_level) color = _get_color(result[0], failed) msg = "^^^^ END {} ".format(result[0].name) print("{}{}{}{}".format(Style.BRIGHT, color, msg, "^" * (80 - len(msg)))) elif isinstance(result, Result): _print_individual_result(result, host, attrs, failed, severity_level) def print_result( result: Result, host: Optional[str] = None, vars: List[str] = None, failed: bool = False, severity_level: int = logging.INFO, ) -> None: """ Prints the :obj:`nornir.core.task.Result` from a previous task to screen Arguments: result: from a previous task host: # TODO vars: Which attributes you want to print failed: if ``True`` assume the task failed severity_level: Print only errors with this severity level or higher """ LOCK.acquire() try: _print_result(result, host, vars, failed, severity_level) finally: LOCK.release() PK!$nornir/plugins/inventory/__init__.pyPK!5""#nornir/plugins/inventory/ansible.pyimport configparser as cp import logging import os from collections import defaultdict from pathlib import Path from typing import Any, DefaultDict, Dict, MutableMapping, Optional, Tuple, Union, cast from mypy_extensions import TypedDict from nornir.core.deserializer.inventory import ( DefaultsDict, GroupsDict, HostsDict, Inventory, InventoryElement, VarsDict, ) import ruamel.yaml from ruamel.yaml.composer import ComposerError from ruamel.yaml.scanner import ScannerError VARS_FILENAME_EXTENSIONS = ["", ".yml", ".yaml"] YAML = ruamel.yaml.YAML(typ="safe") logger = logging.getLogger(__name__) AnsibleHostsDict = Dict[str, Optional[VarsDict]] AnsibleGroupDataDict = TypedDict( "AnsibleGroupDataDict", {"children": Dict[str, Any], "vars": VarsDict, "hosts": AnsibleHostsDict}, total=False, ) # bug: https://github.com/python/mypy/issues/5357 AnsibleGroupsDict = Dict[str, AnsibleGroupDataDict] class AnsibleParser(object): def __init__(self, hostsfile: str) -> None: self.hostsfile = hostsfile self.path = os.path.dirname(hostsfile) self.hosts: HostsDict = {} self.groups: GroupsDict = {} self.defaults: DefaultsDict = {"data": {}} self.original_data: Optional[AnsibleGroupsDict] = None self.load_hosts_file() def parse_group( self, group: str, data: AnsibleGroupDataDict, parent: Optional[str] = None ) -> None: data = data or {} if group == "defaults": group_file = "all" dest_group = self.defaults else: self.add(group, self.groups) group_file = group dest_group = self.groups[group] if parent and parent != "defaults": dest_group["groups"].append(parent) group_data = data.get("vars", {}) vars_file_data = self.read_vars_file(group_file, self.path, False) or {} self.normalize_data(dest_group, group_data, vars_file_data) self.map_nornir_vars(dest_group) self.parse_hosts(data.get("hosts", {}), parent=group) for children, children_data in data.get("children", {}).items(): self.parse_group( children, cast(AnsibleGroupDataDict, children_data), parent=group ) def parse(self) -> None: if self.original_data is not None: self.parse_group("defaults", self.original_data["all"]) self.sort_groups() def parse_hosts( self, hosts: AnsibleHostsDict, parent: Optional[str] = None ) -> None: for host, data in hosts.items(): data = data or {} self.add(host, self.hosts) if parent and parent != "defaults": self.hosts[host]["groups"].append(parent) vars_file_data 
= self.read_vars_file(host, self.path, True) self.normalize_data(self.hosts[host], data, vars_file_data) self.map_nornir_vars(self.hosts[host]) def normalize_data( self, host: HostsDict, data: Dict[str, Any], vars_data: Dict[str, Any] ) -> None: reserved_fields = InventoryElement.__fields__.keys() self.map_nornir_vars(data) for k, v in data.items(): if k in reserved_fields: host[k] = v else: host["data"][k] = v self.map_nornir_vars(vars_data) for k, v in vars_data.items(): if k in reserved_fields: host[k] = v else: host["data"][k] = v def sort_groups(self) -> None: for host in self.hosts.values(): host["groups"].sort() for name, group in self.groups.items(): if name == "defaults": continue group["groups"].sort() @staticmethod def read_vars_file(element: str, path: str, is_host: bool = True) -> VarsDict: sub_dir = "host_vars" if is_host else "group_vars" vars_dir = Path(path) / sub_dir if vars_dir.is_dir(): vars_file_base = vars_dir / element for extension in VARS_FILENAME_EXTENSIONS: vars_file = vars_file_base.with_suffix( vars_file_base.suffix + extension ) if vars_file.is_file(): with open(vars_file) as f: logger.debug("AnsibleInventory: reading var file %r", vars_file) return cast(Dict[str, Any], YAML.load(f)) logger.debug( "AnsibleInventory: no vars file was found with the path %r " "and one of the supported extensions: %s", vars_file_base, VARS_FILENAME_EXTENSIONS, ) return {} @staticmethod def map_nornir_vars(obj: VarsDict) -> None: mappings = { "ansible_host": "hostname", "ansible_port": "port", "ansible_user": "username", "ansible_password": "password", } for ansible_var, nornir_var in mappings.items(): if ansible_var in obj: obj[nornir_var] = obj.pop(ansible_var) @staticmethod def add(element: str, element_dict: Dict[str, VarsDict]) -> None: if element not in element_dict: element_dict[element] = {"groups": [], "data": {}} def load_hosts_file(self) -> None: raise NotImplementedError class INIParser(AnsibleParser): @staticmethod def normalize_value(value: str) -> Union[str, int]: try: return int(value) except (ValueError, TypeError): return value @staticmethod def normalize_content(content: str) -> VarsDict: result: VarsDict = {} if not content: return result for option in content.split(): key, value = option.split("=") result[key] = INIParser.normalize_value(value) return result @staticmethod def process_meta( meta: Optional[str], section: MutableMapping[str, str] ) -> Dict[str, Any]: if meta == "vars": return { key: INIParser.normalize_value(value) for key, value in section.items() } elif meta == "children": return {group_name: {} for group_name in section} else: raise ValueError(f"Unknown tag {meta}") def normalize(self, data: cp.ConfigParser) -> Dict[str, AnsibleGroupDataDict]: groups: DefaultDict[str, Dict[str, Any]] = defaultdict(dict) # Dict[str, AnsibleGroupDataDict] does not work because of # https://github.com/python/mypy/issues/5359 result: Dict[str, Dict[str, Dict[str, Dict[str, Any]]]] = { "all": {"children": groups} } for section_name, section in data.items(): if section_name == "DEFAULT": continue if ":" in section_name: group_name, meta = section_name.split(":") subsection = self.process_meta(meta, section) if group_name == "all": result["all"][meta] = subsection else: groups[group_name][meta] = subsection else: groups[section_name]["hosts"] = { host: self.normalize_content(host_vars) for host, host_vars in section.items() } return cast(AnsibleGroupsDict, result) def load_hosts_file(self) -> None: original_data = cp.ConfigParser( interpolation=None, 
allow_no_value=True, delimiters=" =" ) original_data.read(self.hostsfile) self.original_data = self.normalize(original_data) class YAMLParser(AnsibleParser): def load_hosts_file(self) -> None: with open(self.hostsfile, "r") as f: self.original_data = cast(AnsibleGroupsDict, YAML.load(f)) def parse(hostsfile: str) -> Tuple[HostsDict, GroupsDict, DefaultsDict]: try: parser: AnsibleParser = INIParser(hostsfile) except cp.Error: try: parser = YAMLParser(hostsfile) except (ScannerError, ComposerError): logger.error("AnsibleInventory: file %r is not INI or YAML file", hostsfile) raise parser.parse() return parser.hosts, parser.groups, parser.defaults class AnsibleInventory(Inventory): def __init__(self, hostsfile: str = "hosts", *args: Any, **kwargs: Any) -> None: host_vars, group_vars, defaults = parse(hostsfile) super().__init__( hosts=host_vars, groups=group_vars, defaults=defaults, *args, **kwargs ) PK!d? ? "nornir/plugins/inventory/netbox.pyimport os from typing import Any, Dict, Optional from nornir.core.deserializer.inventory import Inventory, HostsDict import requests class NBInventory(Inventory): def __init__( self, nb_url: Optional[str] = None, nb_token: Optional[str] = None, use_slugs: bool = True, flatten_custom_fields: bool = True, filter_parameters: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> None: """ Netbox plugin Arguments: nb_url: Netbox url, defaults to http://localhost:8080. You can also use env variable NB_URL nb_token: Netbokx token. You can also use env variable NB_TOKEN use_slugs: Whether to use slugs or not flatten_custom_fields: Whether to assign custom fields directly to the host or not filter_parameters: Key-value pairs to filter down hosts """ filter_parameters = filter_parameters or {} nb_url = nb_url or os.environ.get("NB_URL", "http://localhost:8080") nb_token = nb_token or os.environ.get( "NB_TOKEN", "0123456789abcdef0123456789abcdef01234567" ) headers = {"Authorization": "Token {}".format(nb_token)} # Create dict of hosts using 'devices' from NetBox r = requests.get( "{}/api/dcim/devices/?limit=0".format(nb_url), headers=headers, params=filter_parameters, ) r.raise_for_status() nb_devices = r.json() hosts = {} for d in nb_devices["results"]: host: HostsDict = {"data": {}} # Add value for IP address if d.get("primary_ip", {}): host["hostname"] = d["primary_ip"]["address"].split("/")[0] # Add values that don't have an option for 'slug' host["data"]["serial"] = d["serial"] host["data"]["vendor"] = d["device_type"]["manufacturer"]["name"] host["data"]["asset_tag"] = d["asset_tag"] if flatten_custom_fields: for cf, value in d["custom_fields"].items(): host["data"][cf] = value else: host["data"]["custom_fields"] = d["custom_fields"] # Add values that do have an option for 'slug' if use_slugs: host["data"]["site"] = d["site"]["slug"] host["data"]["role"] = d["device_role"]["slug"] host["data"]["model"] = d["device_type"]["slug"] # Attempt to add 'platform' based of value in 'slug' host["platform"] = d["platform"]["slug"] if d["platform"] else None else: host["data"]["site"] = d["site"]["name"] host["data"]["role"] = d["device_role"] host["data"]["model"] = d["device_type"] host["platform"] = d["platform"] # Assign temporary dict to outer dict hosts[d["name"]] = host # Pass the data back to the parent class super().__init__(hosts=hosts, groups={}, defaults={}, **kwargs) PK!Е nornir/plugins/inventory/nsot.pyimport os from typing import Any from nornir.core.deserializer.inventory import Inventory, InventoryElement import requests class NSOTInventory(Inventory): 
""" Inventory plugin that uses `nsot `_ as backend. Note: An extra attribute ``site`` will be assigned to the host. The value will be the name of the site the host belongs to. Environment Variables: * ``NSOT_URL``: Corresponds to nsot_url argument * ``NSOT_EMAIL``: Corresponds to nsot_email argument * ``NSOT_AUTH_HEADER``: Corresponds to nsot_auth_header argument * ``NSOT_SECRET_KEY``: Corresponds to nsot_secret_key argument Arguments: flatten_attributes: Assign host attributes to the root object. Useful for filtering hosts. nsot_url: URL to nsot's API (defaults to ``http://localhost:8990/api``) nsot_email: email for authtication (defaults to admin@acme.com) nsot_auth_header: String for auth_header authentication (defaults to X-NSoT-Email) nsot_secret_key: Secret Key for auth_token method. If given auth_token will be used as auth_method. """ def __init__( self, nsot_url: str = "", nsot_email: str = "", nsot_secret_key: str = "", nsot_auth_header: str = "", flatten_attributes: bool = True, *args: Any, **kwargs: Any ) -> None: nsot_url = nsot_url or os.environ.get("NSOT_URL", "http://localhost:8990/api") nsot_email = nsot_email or os.environ.get("NSOT_EMAIL", "admin@acme.com") secret_key = nsot_secret_key or os.environ.get("NSOT_SECRET_KEY") if secret_key: data = {"email": nsot_email, "secret_key": secret_key} res = requests.post("{}/authenticate/".format(nsot_url), data=data) auth_token = res.json().get("auth_token") headers = { "Authorization": "AuthToken {}:{}".format(nsot_email, auth_token) } else: nsot_auth_header = nsot_auth_header or os.environ.get( "NSOT_AUTH_HEADER", "X-NSoT-Email" ) headers = {nsot_auth_header: nsot_email} devices = requests.get("{}/devices".format(nsot_url), headers=headers).json() sites = requests.get("{}/sites".format(nsot_url), headers=headers).json() interfaces = requests.get( "{}/interfaces".format(nsot_url), headers=headers ).json() # We resolve site_id and assign "site" variable with the name of the site for d in devices: d["data"] = {"site": sites[d["site_id"] - 1]["name"], "interfaces": {}} remove_keys = [] for k, v in d.items(): if k not in InventoryElement().fields: remove_keys.append(k) d["data"][k] = v for r in remove_keys: d.pop(r) if flatten_attributes: # We assign attributes to the root for k, v in d["data"].pop("attributes").items(): d["data"][k] = v # We assign the interfaces to the hosts for i in interfaces: devices[i["device"] - 1]["data"]["interfaces"][i["name"]] = i # Finally the inventory expects a dict of hosts where the key is the hostname hosts = {d["hostname"]: d for d in devices} super().__init__(hosts=hosts, groups={}, defaults={}, *args, **kwargs) PK![%"nornir/plugins/inventory/simple.pyimport logging import os from typing import Any from nornir.core.deserializer.inventory import GroupsDict, Inventory, VarsDict import ruamel.yaml logger = logging.getLogger(__name__) class SimpleInventory(Inventory): def __init__( self, host_file: str = "hosts.yaml", group_file: str = "groups.yaml", defaults_file: str = "defaults.yaml", *args: Any, **kwargs: Any ) -> None: yml = ruamel.yaml.YAML(typ="safe") with open(host_file, "r") as f: hosts = yml.load(f) groups: GroupsDict = {} if group_file: if os.path.exists(group_file): with open(group_file, "r") as f: groups = yml.load(f) or {} else: logger.debug("File %r was not found", group_file) groups = {} defaults: VarsDict = {} if defaults_file: if os.path.exists(defaults_file): with open(defaults_file, "r") as f: defaults = yml.load(f) or {} else: logger.debug("File %r was not found", defaults_file) 
defaults = {} super().__init__(hosts=hosts, groups=groups, defaults=defaults, *args, **kwargs) PK! nornir/plugins/tasks/__init__.pyPK!AA%nornir/plugins/tasks/apis/__init__.pyfrom .http_method import http_method __all__ = ("http_method",) PK!|:܍(nornir/plugins/tasks/apis/http_method.pyfrom typing import Optional, Any from nornir.core.task import Result, Task import requests def http_method( task: Optional[Task] = None, method: str = "get", url: str = "", raise_for_status: bool = True, **kwargs: Any ) -> Result: """ This is a convenience task that uses `requests `_ to interact with an HTTP server. Arguments: method: HTTP method to call url: URL to connect to raise_for_status: Whether to call `raise_for_status `_ method automatically or not. For quick reference, raise_for_status will consider an error if the return code is any of 4xx or 5xx **kwargs: Keyword arguments will be passed to the `request `_ method Returns: Result object with the following attributes set: * result (``string/dict``): Body of the response. Either text or a dict if the response was a json object * response (``requests.Response``): Original `Response `_ """ r = requests.request(method, url, **kwargs) if raise_for_status: r.raise_for_status() try: content_type = r.headers["Content-type"] except KeyError: content_type = "text" result = r.json() if "application/json" == content_type else r.text return Result(host=task.host if task else None, response=r, result=result) PK!^rr)nornir/plugins/tasks/commands/__init__.pyfrom .command import command from .remote_command import remote_command __all__ = ("command", "remote_command") PK!4(nornir/plugins/tasks/commands/command.pyimport shlex import subprocess from nornir.core.exceptions import CommandError from nornir.core.task import Result, Task def command(task: Task, command: str) -> Result: """ Executes a command locally Arguments: command: command to execute Returns: Result object with the following attributes set: * result (``str``): stderr or stdout * stdout (``str``): stdout * stderr (``str``): stderr Raises: :obj:`nornir.core.exceptions.CommandError`: when there is a command error """ cmd = subprocess.Popen( shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False, ) stdout, stderr = cmd.communicate() stdout = stdout.decode() stderr = stderr.decode() if cmd.poll(): raise CommandError(command, cmd.returncode, stdout, stderr) result = stderr if stderr else stdout return Result(result=result, host=task.host, stderr=stderr, stdout=stdout) PK!C/nornir/plugins/tasks/commands/remote_command.pyfrom nornir.core.exceptions import CommandError from nornir.core.task import Result, Task from paramiko.agent import AgentRequestHandler def remote_command(task: Task, command: str) -> Result: """ Executes a command remotely on the host Arguments: command (``str``): command to execute Returns: Result object with the following attributes set: * result (``str``): stderr or stdout * stdout (``str``): stdout * stderr (``str``): stderr Raises: :obj:`nornir.core.exceptions.CommandError`: when there is a command error """ client = task.host.get_connection("paramiko", task.nornir.config) connection_state = task.host.get_connection_state("paramiko") chan = client.get_transport().open_session() if connection_state["ssh_forward_agent"]: AgentRequestHandler(chan) chan.exec_command(command) with chan.makefile() as f: stdout = f.read().decode() with chan.makefile_stderr() as f: stderr = f.read().decode() exit_status_code = chan.recv_exit_status() if exit_status_code: raise 
CommandError(command, exit_status_code, stdout, stderr) result = stderr if stderr else stdout return Result(result=result, host=task.host, stderr=stderr, stdout=stdout) PK!P%nornir/plugins/tasks/data/__init__.pyfrom .load_json import load_json from .load_yaml import load_yaml from .echo_data import echo_data __all__ = ("load_json", "load_yaml", "echo_data") PK!&nornir/plugins/tasks/data/echo_data.pyfrom typing import Any from nornir.core.task import Result, Task def echo_data(task: Task, **kwargs: Any) -> Result: """ Dummy task that echoes the data passed to it. Useful in grouped_tasks to debug data passed to tasks. Arguments: ``**kwargs``: Any pair you want Returns: Result object with the following attributes set: * result (``dict``): ``**kwargs`` passed to the task """ return Result(host=task.host, result=kwargs) PK!"$))&nornir/plugins/tasks/data/load_json.pyimport json from typing import Any, Dict, MutableMapping, Type from nornir.core.task import Result, Task def load_json(task: Task, file: str) -> Result: """ Loads a json file. Arguments: file: path to the file containing the json file to load Examples: Simple example with ``ordered_dict``:: > nr.run(task=load_json, file="mydata.json") file: path to the file containing the json file to load Returns: Result object with the following attributes set: * result (``dict``): dictionary with the contents of the file """ kwargs: Dict[str, Type[MutableMapping[str, Any]]] = {} with open(file, "r") as f: data = json.loads(f.read(), **kwargs) return Result(host=task.host, result=data) PK!&&nornir/plugins/tasks/data/load_yaml.pyfrom nornir.core.task import Result, Task import ruamel.yaml def load_yaml(task: Task, file: str) -> Result: """ Loads a yaml file. Arguments: file: path to the file containing the yaml file to load Examples: Simple example with ``ordered_dict``:: > nr.run(task=load_yaml, file="mydata.yaml") Returns: Result object with the following attributes set: * result (``dict``): dictionary with the contents of the file """ with open(file, "r") as f: yml = ruamel.yaml.YAML(typ="safe") data = yml.load(f) return Result(host=task.host, result=data) PK!xB]]&nornir/plugins/tasks/files/__init__.pyfrom .sftp import sftp from .write_file import write_file __all__ = ("sftp", "write_file") PK!WQ"nornir/plugins/tasks/files/sftp.pyimport hashlib import os import stat from typing import List, Optional from nornir.core.exceptions import CommandError from nornir.core.task import Result, Task from nornir.plugins.tasks import commands import paramiko from scp import SCPClient def get_src_hash(filename: str) -> str: sha1sum = hashlib.sha1() with open(filename, "rb") as f: block = f.read(2 ** 16) while len(block) != 0: sha1sum.update(block) block = f.read(2 ** 16) return sha1sum.hexdigest() def get_dst_hash(task: Task, filename: str) -> str: command = "sha1sum {}".format(filename) try: result = commands.remote_command(task, command) if result.stdout is not None: return result.stdout.split()[0] except CommandError as e: if "No such file or directory" in e.stderr: return "" raise return "" def remote_exists(sftp_client: paramiko.SFTPClient, f: str) -> bool: try: sftp_client.stat(f) return True except IOError: return False def compare_put_files( task: Task, sftp_client: paramiko.SFTPClient, src: str, dst: str ) -> List[str]: changed = [] if os.path.isfile(src): src_hash = get_src_hash(src) try: dst_hash = get_dst_hash(task, dst) except IOError: dst_hash = "" if src_hash != dst_hash: changed.append(dst) else: if remote_exists(sftp_client, dst): for f in 
os.listdir(src): s = os.path.join(src, f) d = os.path.join(dst, f) changed.extend(compare_put_files(task, sftp_client, s, d)) else: changed.append(dst) return changed def compare_get_files( task: Task, sftp_client: paramiko.SFTPClient, src: str, dst: str ) -> List[str]: changed = [] if stat.S_ISREG(sftp_client.stat(src).st_mode): # is a file src_hash = get_dst_hash(task, src) try: dst_hash = get_src_hash(dst) except IOError: dst_hash = "" if src_hash != dst_hash: changed.append(dst) else: if os.path.exists(dst): for f in sftp_client.listdir(src): s = os.path.join(src, f) d = os.path.join(dst, f) changed.extend(compare_get_files(task, sftp_client, s, d)) else: changed.append(dst) return changed def get( task: Task, scp_client: SCPClient, sftp_client: paramiko.SFTPClient, src: str, dst: str, dry_run: Optional[bool] = None, ) -> List[str]: changed = compare_get_files(task, sftp_client, src, dst) if changed and not dry_run: scp_client.get(src, dst, recursive=True) return changed def put( task: Task, scp_client: SCPClient, sftp_client: paramiko.SFTPClient, src: str, dst: str, dry_run: Optional[bool] = None, ) -> List[str]: changed = compare_put_files(task, sftp_client, src, dst) if changed and not dry_run: scp_client.put(src, dst, recursive=True) return changed def sftp( task: Task, src: str, dst: str, action: str, dry_run: Optional[bool] = None ) -> Result: """ Transfer files from/to the device using sftp protocol Example:: nornir.run(files.sftp, action="put", src="README.md", dst="/tmp/README.md") Arguments: dry_run: Whether to apply changes or not src: source file dst: destination action: ``put``, ``get``. Returns: Result object with the following attributes set: * changed (``bool``): * files_changed (``list``): list of files that changed """ dry_run = task.is_dry_run(dry_run) actions = {"put": put, "get": get} client = task.host.get_connection("paramiko", task.nornir.config) scp_client = SCPClient(client.get_transport()) sftp_client = paramiko.SFTPClient.from_transport(client.get_transport()) files_changed = actions[action](task, scp_client, sftp_client, src, dst, dry_run) return Result( host=task.host, changed=bool(files_changed), files_changed=files_changed ) PK!(^(nornir/plugins/tasks/files/write_file.pyimport difflib import os from typing import List, Optional from nornir.core.task import Result, Task def _read_file(file: str) -> List[str]: if not os.path.exists(file): return [] with open(file, "r") as f: return f.read().splitlines() def _generate_diff(filename: str, content: str, append: bool) -> str: original = _read_file(filename) if append: c = list(original) c.extend(content.splitlines()) new_content = c else: new_content = content.splitlines() diff = difflib.unified_diff(original, new_content, fromfile=filename, tofile="new") return "\n".join(diff) def write_file( task: Task, filename: str, content: str, append: bool = False, dry_run: Optional[bool] = None, ) -> Result: """ Write contents to a file (locally) Arguments: dry_run: Whether to apply changes or not filename: file you want to write into content: content you want to write append: whether you want to replace the contents or append to it Returns: Result object with the following attributes set: * changed (``bool``): * diff (``str``): unified diff """ diff = _generate_diff(filename, content, append) if not task.is_dry_run(dry_run): mode = "a+" if append else "w+" with open(filename, mode=mode) as f: f.write(content) return Result(host=task.host, diff=diff, changed=bool(diff)) PK! 
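# --- usage sketch -----------------------------------------------------------
# A minimal, hedged example of combining the two file tasks defined above
# (write_file and sftp). The grouped-task name, the local/remote paths and the
# "config" data key are illustrative assumptions, not part of this package.
from nornir.core.task import Task
from nornir.plugins.tasks.files import sftp, write_file


def stage_config(task: Task) -> None:
    # Write the host's config data to a local file, then push it to the device.
    local = f"{task.host.name}.cfg"  # hypothetical local file name
    task.run(
        task=write_file,
        filename=local,
        content=task.host.get("config", ""),  # assumes a "config" key in host data
    )
    task.run(task=sftp, action="put", src=local, dst=f"/tmp/{local}")
# ----------------------------------------------------------------------------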
||+nornir/plugins/tasks/networking/__init__.pyfrom .napalm_cli import napalm_cli from .napalm_configure import napalm_configure from .napalm_get import napalm_get from .napalm_validate import napalm_validate from .netmiko_file_transfer import netmiko_file_transfer from .netmiko_send_command import netmiko_send_command from .netmiko_send_config import netmiko_send_config from .netmiko_save_config import netmiko_save_config from .tcp_ping import tcp_ping __all__ = ( "napalm_cli", "napalm_configure", "napalm_get", "napalm_validate", "netmiko_file_transfer", "netmiko_send_command", "netmiko_send_config", "netmiko_save_config", "tcp_ping", ) PK!vb6-nornir/plugins/tasks/networking/napalm_cli.pyfrom typing import List from nornir.core.task import Result, Task def napalm_cli(task: Task, commands: List[str]) -> Result: """ Run commands on remote devices using napalm Arguments: commands: commands to execute Returns: Result object with the following attributes set: * result (``dict``): result of the commands execution """ device = task.host.get_connection("napalm", task.nornir.config) result = device.cli(commands) return Result(host=task.host, result=result) PK!Įr3nornir/plugins/tasks/networking/napalm_configure.pyfrom typing import Optional from nornir.core.task import Result, Task def napalm_configure( task: Task, dry_run: Optional[bool] = None, filename: Optional[str] = None, configuration: Optional[str] = None, replace: bool = False, ) -> Result: """ Loads configuration into a network devices using napalm Arguments: dry_run: Whether to apply changes or not filename: filename containing the configuration to load into the device configuration: configuration to load into the device replace: whether to replace or merge the configuration Returns: Result object with the following attributes set: * changed (``bool``): whether the task is changing the system or not * diff (``string``): change in the system """ device = task.host.get_connection("napalm", task.nornir.config) if replace: device.load_replace_candidate(filename=filename, config=configuration) else: device.load_merge_candidate(filename=filename, config=configuration) diff = device.compare_config() dry_run = task.is_dry_run(dry_run) if not dry_run and diff: device.commit_config() else: device.discard_config() return Result(host=task.host, diff=diff, changed=len(diff) > 0) PK!{ GOO-nornir/plugins/tasks/networking/napalm_get.pyimport copy from typing import Any, Dict, List, Optional from nornir.core.task import Result, Task GetterOptionsDict = Optional[Dict[str, Dict[str, Any]]] def napalm_get( task: Task, getters: List[str], getters_options: GetterOptionsDict = None, **kwargs: Any ) -> Result: """ Gather information from network devices using napalm Arguments: getters: getters to use getters_options (dict of dicts): When passing multiple getters you pass a dictionary where the outer key is the getter name and the included dictionary represents the options to pass to the getter **kwargs: will be passed as they are to the getters Examples: Simple example:: > nr.run(task=napalm_get, > getters=["interfaces", "facts"]) Passing options using ``**kwargs``:: > nr.run(task=napalm_get, > getters=["config"], > retrieve="all") Passing options using ``getters_options``:: > nr.run(task=napalm_get, > getters=["config", "interfaces"], > getters_options={"config": {"retrieve": "all"}}) Returns: Result object with the following attributes set: * result (``dict``): dictionary with the result of the getter """ device = task.host.get_connection("napalm", 
task.nornir.config) getters_options = getters_options or {} if isinstance(getters, str): getters = [getters] result = {} for g in getters: options = copy.deepcopy(kwargs) options.update(getters_options.get(g, {})) getter = g if g.startswith("get_") else "get_{}".format(g) method = getattr(device, getter) result[g] = method(**options) return Result(host=task.host, result=result) PK!h x2nornir/plugins/tasks/networking/napalm_validate.pyfrom typing import Any, Dict, Optional from nornir.core.task import Result, Task ValidationSourceData = Optional[Dict[str, Dict[str, Any]]] def napalm_validate( task: Task, src: Optional[str] = None, validation_source: ValidationSourceData = None, ) -> Result: """ Gather information with napalm and validate it: http://napalm.readthedocs.io/en/develop/validate/index.html Arguments: src: file to use as validation source validation_source (list): data to validate device's state Returns: Result object with the following attributes set: * result (``dict``): dictionary with the result of the validation * complies (``bool``): Whether the device complies or not """ device = task.host.get_connection("napalm", task.nornir.config) r = device.compliance_report( validation_file=src, validation_source=validation_source ) return Result(host=task.host, result=r) PK!~Gee8nornir/plugins/tasks/networking/netmiko_file_transfer.pyfrom typing import Any from netmiko import file_transfer from nornir.core.task import Result, Task def netmiko_file_transfer( task: Task, source_file: str, dest_file: str, **kwargs: Any ) -> Result: """ Execute Netmiko file_transfer method Arguments: source_file: Source file. dest_file: Destination file. kwargs: Additional arguments to pass to file_transfer Returns: Result object with the following attributes set: * result (``bool``): file exists and MD5 is valid * changed (``bool``): the destination file was changed """ net_connect = task.host.get_connection("netmiko", task.nornir.config) kwargs.setdefault("direction", "put") scp_result = file_transfer( net_connect, source_file=source_file, dest_file=dest_file, **kwargs ) if kwargs.get("disable_md5") is True: file_valid = scp_result["file_exists"] else: file_valid = scp_result["file_exists"] and scp_result["file_verified"] return Result( host=task.host, result=file_valid, changed=scp_result["file_transferred"] ) PK!R  6nornir/plugins/tasks/networking/netmiko_save_config.pyfrom __future__ import unicode_literals from nornir.core.task import Result, Task def netmiko_save_config( task: Task, cmd: str = "", confirm: bool = False, confirm_response: str = "" ) -> Result: """ Execute Netmiko save_config method Arguments: cmd(str, optional): Command used to save the configuration. 
confirm(bool, optional): Does device prompt for confirmation before executing save operation confirm_response(str, optional): Response send to device when it prompts for confirmation Returns: :obj: `nornir.core.task.Result`: * result (``str``): String showing the CLI output from the save operation """ conn = task.host.get_connection("netmiko", task.nornir.config) if cmd: result = conn.save_config( cmd=cmd, confirm=confirm, confirm_response=confirm_response ) else: result = conn.save_config(confirm=confirm, confirm_response=confirm_response) return Result(host=task.host, result=result, changed=True) PK!{;QQ7nornir/plugins/tasks/networking/netmiko_send_command.pyfrom typing import Any from nornir.core.task import Result, Task def netmiko_send_command( task: Task, command_string: str, use_timing: bool = False, enable: bool = False, **kwargs: Any ) -> Result: """ Execute Netmiko send_command method (or send_command_timing) Arguments: command_string: Command to execute on the remote network device. use_timing: Set to True to switch to send_command_timing method. enable: Set to True to force Netmiko .enable() call. kwargs: Additional arguments to pass to send_command method. Returns: Result object with the following attributes set: * result: Result of the show command (generally a string, but depends on use of TextFSM). """ net_connect = task.host.get_connection("netmiko", task.nornir.config) if enable: net_connect.enable() if use_timing: result = net_connect.send_command_timing(command_string, **kwargs) else: result = net_connect.send_command(command_string, **kwargs) return Result(host=task.host, result=result) PK!CW6nornir/plugins/tasks/networking/netmiko_send_config.pyfrom typing import Any, List, Optional from nornir.core.task import Result, Task def netmiko_send_config( task: Task, config_commands: Optional[List[str]] = None, config_file: Optional[str] = None, **kwargs: Any ) -> Result: """ Execute Netmiko send_config_set method (or send_config_from_file) Arguments: config_commands: Commands to configure on the remote network device. config_file: File to read configuration commands from. kwargs: Additional arguments to pass to method. Returns: Result object with the following attributes set: * result (``str``): string showing the CLI from the configuration changes. """ net_connect = task.host.get_connection("netmiko", task.nornir.config) net_connect.enable() if config_commands: result = net_connect.send_config_set(config_commands=config_commands, **kwargs) elif config_file: result = net_connect.send_config_from_file(config_file=config_file, **kwargs) else: raise ValueError("Must specify either config_commands or config_file") return Result(host=task.host, result=result, changed=True) PK!1+N+nornir/plugins/tasks/networking/tcp_ping.pyimport socket from typing import Optional, List from nornir.core.task import Result, Task def tcp_ping( task: Task, ports: List[int], timeout: int = 2, host: Optional[str] = None ) -> Result: """ Tests connection to a tcp port and tries to establish a three way handshake. To be used for network discovery or testing. 
Arguments: ports (list of int): tcp ports to ping timeout (int, optional): defaults to 2 host (string, optional): defaults to ``hostname`` Returns: Result object with the following attributes set: * result (``dict``): Contains port numbers as keys with True/False as values """ if isinstance(ports, int): ports = [ports] if isinstance(ports, list): if not all(isinstance(port, int) for port in ports): raise ValueError("Invalid value for 'ports'") else: raise ValueError("Invalid value for 'ports'") host = host or task.host.hostname result = {} for port in ports: s = socket.socket() s.settimeout(timeout) try: status = s.connect_ex((host, port)) if status == 0: connection = True else: connection = False except (socket.gaierror, socket.timeout, socket.error): connection = False finally: s.close() result[port] = connection return Result(host=task.host, result=result) PK!ŸL%nornir/plugins/tasks/text/__init__.pyfrom .template_file import template_file from .template_string import template_string __all__ = ("template_file", "template_string") PK!=ŋ??*nornir/plugins/tasks/text/template_file.pyfrom typing import Any, Optional, Dict, Callable from nornir.core.helpers import jinja_helper from nornir.core.task import Result, Task FiltersDict = Optional[Dict[str, Callable[..., str]]] def template_file( task: Task, template: str, path: str, jinja_filters: FiltersDict = None, **kwargs: Any ) -> Result: """ Renders contants of a file with jinja2. All the host data is available in the template Arguments: template: filename path: path to dir with templates jinja_filters: jinja filters to enable. Defaults to nornir.config.jinja2.filters **kwargs: additional data to pass to the template Returns: Result object with the following attributes set: * result (``string``): rendered string """ jinja_filters = jinja_filters or {} or task.nornir.config.jinja2.filters text = jinja_helper.render_from_file( template=template, path=path, host=task.host, jinja_filters=jinja_filters, **kwargs ) return Result(host=task.host, result=text) PK!̚,nornir/plugins/tasks/text/template_string.pyfrom typing import Any, Optional, Dict, Callable from nornir.core.helpers import jinja_helper from nornir.core.task import Result, Task FiltersDict = Optional[Dict[str, Callable[..., str]]] def template_string( task: Task, template: str, jinja_filters: FiltersDict = None, **kwargs: Any ) -> Result: """ Renders a string with jinja2. All the host data is available in the template Arguments: template (string): template string jinja_filters (dict): jinja filters to enable. Defaults to nornir.config.jinja2.filters **kwargs: additional data to pass to the template Returns: Result object with the following attributes set: * result (``string``): rendered string """ jinja_filters = jinja_filters or {} or task.nornir.config.jinja2.filters text = jinja_helper.render_from_string( template=template, host=task.host, jinja_filters=jinja_filters, **kwargs ) return Result(host=task.host, result=text) PK!;U],],nornir-2.1.1.dist-info/LICENSE Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. 
"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. 
Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. 
This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "{}" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright {yyyy} {name of copyright owner} Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
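A minimal sketch of wiring the inventory plugins above into InitNornir; the file names (hosts.yaml, groups.yaml, ansible_hosts) are illustrative and the nested-dict style of configuration assumes nornir 2.x.

from nornir import InitNornir

# SimpleInventory: plain YAML hosts/groups/defaults files.
nr = InitNornir(
    core={"num_workers": 20},
    inventory={
        "plugin": "nornir.plugins.inventory.simple.SimpleInventory",
        "options": {"host_file": "hosts.yaml", "group_file": "groups.yaml"},
    },
)

# AnsibleInventory: reuse an existing Ansible INI or YAML hosts file.
nr_ansible = InitNornir(
    inventory={
        "plugin": "nornir.plugins.inventory.ansible.AnsibleInventory",
        "options": {"hostsfile": "ansible_hosts"},
    },
)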
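A minimal sketch of running the networking tasks above and printing their output with print_result; the config.yaml file and the platform filter value are illustrative assumptions.

import logging

from nornir import InitNornir
from nornir.plugins.functions.text import print_result
from nornir.plugins.tasks.networking import napalm_get, netmiko_send_command

nr = InitNornir(config_file="config.yaml")  # hypothetical configuration file
ios = nr.filter(platform="ios")             # narrow the inventory before running

# Gather structured data with napalm and show only warnings or worse.
facts = ios.run(task=napalm_get, getters=["facts", "interfaces"])
print_result(facts, severity_level=logging.WARNING)

# Run a raw CLI command with netmiko and print everything.
version = ios.run(task=netmiko_send_command, command_string="show version")
print_result(version)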