PK!Ёbics_nornir/.gitignore# Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] *$py.class .pytest_cache/ # C extensions *.so # Distribution / packaging .Python env/ build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ *.egg-info/ .installed.cfg *.egg # PyInstaller # Usually these files are written by a python script from a template # before PyInstaller builds the exe, so as to inject date/other infos into it. *.manifest *.spec # Installer logs pip-log.txt pip-delete-this-directory.txt # Unit test / coverage reports htmlcov/ .tox/ .coverage .coverage.* .cache nosetests.xml coverage.xml *,cover .hypothesis/ # Translations *.mo *.pot # Django stuff: *.log *.log.* local_settings.py # Flask stuff: instance/ .webassets-cache # Scrapy stuff: .scrapy # Sphinx documentation docs/_build/ docs/configuration/generated/*.rst # PyBuilder target/ # IPython Notebook .ipynb_checkpoints # pyenv .python-version # celery beat schedule file celerybeat-schedule # dotenv .env # virtualenv venv/ ENV/ # Spyder project settings .spyderproject # Rope project settings .ropeproject *.swp tags .vagrant .vars output/ .DS_Store .pytest_cache/ .mypy_cache/ .vscode PK!bics_nornir/__init__.pyPK!bics_nornir/plugins/__init__.pyPK!+bics_nornir/plugins/connections/__init__.pyPK!Mg!!+bics_nornir/plugins/connections/ncclient.pyimport json import difflib from copy import deepcopy from collections import OrderedDict from typing import Any, Dict, List, Optional from ncclient import manager import xmltodict from nornir.core.configuration import Config from nornir.core.connections import ConnectionPlugin class Ncclient(ConnectionPlugin): """ This plugin connects to the device using ncclient and sets the relevant connection. Inventory: extras: passed as it is to the napalm driver """ def open( self, hostname: Optional[str], username: Optional[str], password: Optional[str], port: Optional[int], platform: Optional[str], extras: Optional[Dict[str, Any]] = None, configuration: Optional[Config] = None, ) -> None: extras = extras or {} parameters: Dict[str, Any] = { "host": hostname, "username": username, "password": password, "hostkey_verify": False, } _connection = manager.connect(**parameters) self.connection = self self._connection = _connection self.state = { "connected": _connection.connected, "session_id": _connection.session_id, "timeout": _connection.timeout, } def close(self) -> None: self._connection.close_session() def get_config( self, source: str = "running", path: str = None, depth: int = None, exclude: List[str] = None, strip: bool = True, path_sep: str = "/", ): return self._get_data( data_type="configuration", source=source, path=path, depth=depth, exclude=exclude, strip=strip, path_sep=path_sep, ) def get( self, path: str = None, depth: int = None, exclude: List[str] = None, strip: bool = True, path_sep: str = "/", ): return self._get_data( data_type="state", path=path, depth=depth, exclude=exclude, strip=strip, path_sep=path_sep, ) def edit_config( self, config: Dict[str, Any], target: str = "candidate", default_operation: str = "merge", path: str = "", ) -> None: path = path.strip("/").split("/") expanded_config = deepcopy(config) for el in reversed(path): if not el: # if e.g. path is "/" continue if "=" in el: k, v = el.split("=") expanded_config[k] = v else: expanded_config = {el: expanded_config} try: xml_str = xmltodict.unparse(expanded_config, full_document=False) except ValueError as e: raise ValueError(f"Cannot convert dict {expanded_config} to xml: {e}") xml_str = ( '' + xml_str + "" ) self._connection.edit_config( xml_str, target=target, default_operation=default_operation ) def compare_config(self, path: Optional[str] = None): """ Compares 'running' and 'candidate' datastores on router Requires netconf user to have access to console in addition to netconf to query full running config (/admin display-config) """ source_dict = self.get_config(source="running", path=path) dest_dict = self.get_config("candidate", path=path) source_json = json.dumps(source_dict, indent=2) dest_json = json.dumps(dest_dict, indent=2) diff = "" for line in difflib.unified_diff( source_json.splitlines(keepends=True), dest_json.splitlines(keepends=True), fromfile=f"running@{path}", tofile=f"candidate@{path}", ): diff += line return diff def commit_config(self): self._connection.commit() def discard_config(self): self._connection.discard_changes() def _get_data( self, data_type: str = "configuration", source: str = None, path: str = None, depth: int = None, exclude: List[str] = None, strip: bool = True, path_sep: str = "/", ): if data_type not in ["configuration", "state"]: raise ValueError(f"Invalid data_type param: {data_type}") if path: path = path.strip(path_sep) if path: path = path.split(path_sep) else: path = [] else: path = [] if len(path) > 0: filter_str = self._expand_filter(path) else: filter_str = "" if data_type == "configuration": nc_filter = f""" {filter_str} """ reply = self._connection.get_config(source, filter=nc_filter) else: nc_filter = f""" {filter_str} """ reply = self._connection.get(filter=nc_filter) if strip: d = OrderedDict() d = xmltodict.parse(reply.xml)["rpc-reply"]["data"] if not d: return {} if data_type == "configuration": d = d.get("configure", {}) else: d = d.get("state", {}) del d["@xmlns"] for node in path: if "=" in node: continue if isinstance(d, list): break d = d.get(node) if not d: return {} else: d = xmltodict.parse(reply.xml)["rpc-reply"]["data"] if isinstance(d, list): d_temp = OrderedDict() d_temp["_count"] = len(d) for n, elem in enumerate(d): d_temp[n] = elem d = d_temp if depth or exclude: d = reduce_dict(d, depth=depth, exclude=exclude) return d @staticmethod def _expand_filter(filter: List[str]) -> str: f = filter.copy() expanded_filter = "" while len(f): e = f.pop() if "=" in e: k, v = e.split("=") expanded_filter += f"<{k}>{v}" else: expanded_filter = f"<{e}>{expanded_filter}" return expanded_filter def reduce_dict(d: OrderedDict, depth: int, exclude: List[str]) -> Dict[str, Any]: if isinstance(d, OrderedDict): result = OrderedDict() else: result = dict() for k, v in d.items(): if exclude and k in exclude: continue if isinstance(v, list): if len(v) > 0 and isinstance(v[0], dict): depth -= 1 if depth > 0: new_list = [] for e in v: r_dict = reduce_dict(e, depth=depth, exclude=exclude) if len(r_dict.keys()) > 0: new_list.append( reduce_dict(e, depth=depth, exclude=exclude) ) if len(new_list) > 0: result[k] = new_list else: break else: if isinstance(v, dict): depth -= 1 if depth > 0: v = reduce_dict(v, depth=depth, exclude=exclude) else: break result[k] = v return result PK!%bics_nornir/plugins/tasks/__init__.pyPK!=5}44*bics_nornir/plugins/tasks/data/__init__.pyfrom .load_merge_yaml import load_merge_yaml from .load_service_intents import load_service_intents from .render_templates import render_templates from .download_configs import download_configs __all__ = ( "load_merge_yaml", "render_templates", "load_service_intents", "download_configs", ) PK!!NN2bics_nornir/plugins/tasks/data/download_configs.pyfrom nornir.core.task import Result, Task from bics_nornir.plugins.tasks.networking import nc def download_configs(task: Task, resources: set) -> None: """ Loads configs from device to inventory Arguments: resources: list of resource paths to load Examples: """ config = {} for rsc_path in resources: r = task.run( name=f"Get {rsc_path}", task=nc.get_config, source=nc.NcDatastore.running, path=rsc_path, ) config.update({rsc_path: r.result}) task.host["config"] = config PK!ԳbY1bics_nornir/plugins/tasks/data/load_merge_yaml.pyimport os from copy import deepcopy from nornir.core.task import Result, Task import ruamel.yaml def load_merge_yaml(task: Task, directory: str) -> Result: """ Loads intents from intent files Arguments: Examples: """ global_data = {} group_data = {} host_data = {} for dirpath, _, files in os.walk(directory): for name in files: if not name.lower().endswith((".yml", ".yaml")): continue with open(os.path.join(dirpath, name), "r") as f: yml = ruamel.yaml.YAML(typ="safe") data = yml.load(f) target_scope = data.pop("_target_scope", "") target_groups = data.pop("_target_groups", []) target_host = data.pop("_target_host", "") if "_path" in data: path = data["_path"] data = {path: data} if target_scope.upper() == "GLOBAL": _merge(global_data, data) elif target_scope.upper() == "GROUP": for group in target_groups: if task.host.has_parent_group(group): _merge(group_data, data) elif target_scope.upper() == "HOST": if target_host == task.host.name: _merge(host_data, data) _merge(group_data, global_data) _merge(host_data, group_data) return Result(host=task.host, result=host_data) def _merge(a, b): for key in b: if key in a: if isinstance(a[key], dict) and isinstance(b[key], dict): _merge(a[key], b[key]) else: pass # a always wins else: a[key] = b[key] PK!\.6bics_nornir/plugins/tasks/data/load_service_intents.pyimport os from copy import deepcopy from nornir.core.task import Result, Task import ruamel.yaml def load_service_intents(task: Task, directory: str) -> Result: """ Loads intents from intent files Arguments: Examples: """ host_data = {} resources_to_populate = set() for dirpath, _, files in os.walk(directory): for name in files: if not name.lower().endswith((".yml", ".yaml")): continue with open(os.path.join(dirpath, name), "r") as f: yml = ruamel.yaml.YAML(typ="safe") data = yml.load(f) service_hosts = data.get("_service_hosts", []) if task.host.name not in service_hosts: continue service_id = data.get("_service_id", None) if not service_id: raise ValueError(f"'_service_id' missing in file {name}") for rsc in data.pop("_service_required_resources", []): resources_to_populate.add(rsc) host_params = data.get("host_params", {}) if task.host.name in host_params: data.update(host_params[task.host.name]) if host_params: del (data["host_params"]) host_data.update({service_id: data}) task.host["service_intent"] = host_data task.host["rsc_to_populate"] = resources_to_populate return Result(host=task.host, result=host_data) PK!oGWW2bics_nornir/plugins/tasks/data/render_templates.pyimport os from pathlib import Path from copy import deepcopy from nornir.core.task import Result, Task from ruamel.yaml import YAML from ruamel.yaml.parser import ParserError from jinja2 import Environment, FileSystemLoader, StrictUndefined def render_templates( task: Task, src_path: str, dst_path: str, dst_prefix: str = None, **kwargs ) -> Result: """ Loads intents from intent files Arguments: Examples: """ env = Environment( loader=FileSystemLoader(src_path), trim_blocks=True, lstrip_blocks=True, undefined=StrictUndefined, ) yml = YAML(typ="safe") if dst_prefix: dst_path = Path(dst_path) / str(dst_prefix) dst_path = Path(dst_path) / task.host.name if not dst_path.exists(): dst_path.mkdir(parents=True) for template in env.list_templates(extensions="j2"): t = env.get_template(template) text = t.render(hostname=task.host.name, **kwargs) if not "_target_scope" in text: continue try: _ = yml.load(text) except ParserError as e: raise ValueError(f"Cannot parse rendered {template}: {e}") filename = dst_path / str(template.split(".")[0] + ".yml") with open(filename, "w") as f: f.write(text) return Result(host=task.host, result={}) PK!إNN0bics_nornir/plugins/tasks/networking/__init__.pyfrom .nc import nc_configure, get_config, get, nc_validate __all__ = ("nc",) PK!K**bics_nornir/plugins/tasks/networking/nc.pyfrom copy import deepcopy from typing import Any, Dict, List, Optional from enum import Enum import json import difflib from nornir.core.task import Result, Task class NcDatastore(str, Enum): running = ("running",) candidate = ("candidate",) startup = "startup" def get_config( task: Task, source: NcDatastore, path: str, exclude: List[str] = None, depth: int = 99, **kwargs, ) -> Result: """ Get configuration from specified datastore `source` on router Arguments: source: datastore to use (type: `NcDatastore`), e.g. `NcDatastore.candidate` path: path to intended element (e.g.: `/router/interface`) last element of path mqy be an '=' expression, e.g. `/router/interface/interface-name=to_sr2` depth: level depth of nested objects. A value of 0 means all is returned Examples: Simple example:: > nr.run(task=nc_get_config, > source=NcDatastore.running, > path=["router", "interface"]) Returns: Result object with the following attributes set: * result (``dict``): dictionary with the result of the getter """ conn = task.host.get_connection("ncclient", task.nornir.config) result = conn.get_config( getattr(source, "name"), path=path, depth=depth, exclude=exclude, **kwargs ) return Result(host=task.host, result=result) def get( task: Task, path: List[str], depth: int = 99, exclude: List[str] = None ) -> Result: """ Get state information from specified resource ('path') on router Arguments: path: list of elements from root to intended element. (e.g.: `['router', 'interface']`) depth: level depth of nested objects. A value of 0 means all is returned Examples: Simple example:: > nr.run(task=nc_get_config, > path=["router", "interface"]) Returns: Result object with the following attributes set: * result (``dict``): dictionary with the result of the getter """ conn = task.host.get_connection("ncclient", task.nornir.config) result = conn.get(path=path, depth=depth, exclude=exclude) return Result(host=task.host, result=result) def nc_validate(task: Task, source: Any, destination: Any, path: str) -> Result: conn = task.host.get_connection("ncclient", task.nornir.config) if isinstance(source, NcDatastore): source_dict = conn.get_config(getattr(source, "name"), path=path) elif isinstance(source, dict): source_dict = source else: raise ValueError("parameter 'source' has invalid type") if isinstance(destination, NcDatastore): dest_dict = conn.get_config(getattr(destination, "name"), path=path) elif isinstance(destination, dict): dest_dict = destination else: raise ValueError("parameter 'destination' has invalid type") source_json = json.dumps(source_dict, indent=2) dest_json = json.dumps(dest_dict, indent=2) result = "" for line in difflib.unified_diff( source_json.splitlines(keepends=True), dest_json.splitlines(keepends=True), fromfile=source, tofile=destination, ): result += line return Result(host=task.host, result=result) def nc_configure( task: Task, dry_run: Optional[bool] = None, configuration: Optional[str] = None, path: Optional[str] = None, force: Optional[bool] = False # replace: bool = False ) -> Result: """ Loads configuration into network device using netconf Arguments: dry_run: only show what would change rather than modifying config configuration: config to load path: path to resource to configure (/a/b/...) force: Force change when candidate is not clean (by discarding it first) Returns: Result object with following attributes set: * changed (``bool``): task has changed config or not * diff (``str``): changes to device config """ conn = task.host.get_connection("ncclient", task.nornir.config) if force: conn.discard_config() else: diff = conn.compare_config() if len(diff) > 0: raise Exception( f"Candidate datastore not clean! Use force=True to override\n\ {diff}" ) config_data = deepcopy(configuration) meta_data = {} meta_keys = [k for k in config_data.keys() if k.startswith("_")] for k in meta_keys: meta_data[k] = config_data.pop(k) conn.edit_config(config=config_data, target="candidate", path=path) diff = conn.compare_config(path=path) dry_run = task.is_dry_run(dry_run) if not dry_run and diff: conn.commit_config() else: conn.discard_config() return Result(host=task.host, diff=diff, changed=len(diff) > 0) def close(task: Task) -> None: task.host.close_connection("ncclient") PK!3̨.bics_nornir/plugins/tasks/services/__init__.pyfrom .eline import eline PK!U[8+bics_nornir/plugins/tasks/services/eline.pyfrom nornir.core.task import Result, Task from bics_nornir.plugins.tasks.networking import nc def eline(task: Task, **kwargs): service_id = kwargs.pop("_service_id") service_hosts = kwargs.pop("_service_hosts") service_type = kwargs.pop("_service_type") operation = kwargs.pop("_service_operation") data = {} data.update(kwargs) data["service_id"] = service_id if operation.lower() == "replace": data["operation"] = "replace" other_host = "" for h in service_hosts: if h not in task.host.name: other_host = h break saps = kwargs.get("saps") local_epipe = False if isinstance(saps, list): if len(saps) == 2: # local epipe local_epipe = True else: raise ValueError(f"saps must be a list") if not local_epipe: if not other_host: raise Exception(f"Eline {service_id} has not remote host") other_sys_ip = task.nornir.inventory.hosts[other_host]["config"].get( "/router/router-name=Base/interface/interface-name=system", {} ) if not other_sys_ip: raise Exception(f"Cannot get system-ip for {other_host} from inventory") other_sys_ip = other_sys_ip["ipv4"]["primary"]["address"] sdp_info = task.host["config"].get("/service/sdp", {}) sdp_id = "" if sdp_info.get("_count", None): # multiple sdp's for seq, attrs in sdp_info.items(): if str(seq).startswith("_"): continue if attrs.get("far-end", {}).get("ip-address", "") == other_sys_ip: sdp_id = attrs.get("sdp-id") break else: sdp_id = sdp_info.get("sdp-id", "") if not sdp_id: raise Exception(f"Cannot find appropriate SDP to {other_sys_ip}") data["sdp_id"] = sdp_id elif operation.lower() == "delete": data["operation"] = "delete" else: raise Exception(f"unknown action {operation}") return Result(host=task.host, result=data) PK!HڽTU!bics_nornir-0.1.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H27A$bics_nornir-0.1.0.dist-info/METADATAUMs6WLgDВ'8:q݌G V$l`e.@v\_do߾HRKqVF:o/OTok(`)l9}~0/Gr!,|m9c]%,ej ˚Fs1Z>#S+8D"1m"agy6M[v..AZ$ZpG$ '\QFll,׉v;ey<- 7flr.Sbx_i\V#.P=Udc^;Utt)$ S:>):'䫅#MYfEPK!H1+"bics_nornir-0.1.0.dist-info/RECORD98|~ x8li6Pj`kwSd6$yOR>uasC@E _9 = luMlANp 66C,HrQ1$QhR2Lɶv贮|ZxCY ;y_L14&p"0y7|!;~:# V$ET4HܗMg1iQjX[7/BreY*It髁v23wBVOq},$ PL!3{F$ `5ڄ$&%(R.W < [e•ƺeb%t#L?ϕ}$=Nyy׻tp0]SRv"kBHpK9l͔T]i2q/fYd'4O6gy m cЙwRUgr]