PK ! Ё bics_nornir/.gitignore# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
.pytest_cache/
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
*.log.*
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
docs/configuration/generated/*.rst
# PyBuilder
target/
# IPython Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# dotenv
.env
# virtualenv
venv/
ENV/
# Spyder project settings
.spyderproject
# Rope project settings
.ropeproject
*.swp
tags
.vagrant
.vars
output/
.DS_Store
.pytest_cache/
.mypy_cache/
.vscode
PK ! bics_nornir/__init__.pyPK ! bics_nornir/plugins/__init__.pyPK ! + bics_nornir/plugins/connections/__init__.pyPK ! Mg! ! + bics_nornir/plugins/connections/ncclient.pyimport json
import difflib
from copy import deepcopy
from collections import OrderedDict
from typing import Any, Dict, List, Optional
from ncclient import manager
import xmltodict
from nornir.core.configuration import Config
from nornir.core.connections import ConnectionPlugin
class Ncclient(ConnectionPlugin):
    """
    This plugin connects to the device using ncclient and exposes NETCONF
    helpers (get, get-config, edit-config, compare, commit, discard).

    ``self.connection`` is set to the plugin instance itself, so the object
    returned by ``host.get_connection("ncclient", ...)`` offers the methods
    defined below.

    Inventory:
        extras: passed as they are to ``ncclient.manager.connect``
    """

    # NOTE(review): the XML wrapper literals around the edit-config payload
    # and the get/get-config filters were empty in the file as received (their
    # markup appears to have been stripped). They are rebuilt here from what
    # the code itself shows: replies are unwrapped through a top-level
    # ``configure`` / ``state`` element that carries an ``@xmlns`` attribute.
    # The namespace URIs below are the Nokia SR OS YANG namespaces and are an
    # assumption -- confirm against the target devices before merging.
    _CONF_NS = "urn:nokia.com:sros:ns:yang:sr:conf"
    _STATE_NS = "urn:nokia.com:sros:ns:yang:sr:state"

    def open(
        self,
        hostname: Optional[str],
        username: Optional[str],
        password: Optional[str],
        port: Optional[int],
        platform: Optional[str],
        extras: Optional[Dict[str, Any]] = None,
        configuration: Optional[Config] = None,
    ) -> None:
        """
        Open the NETCONF session and record its basic state.

        ``platform`` and ``configuration`` are accepted for interface
        compatibility and are not used.
        """
        parameters: Dict[str, Any] = {
            "host": hostname,
            "username": username,
            "password": password,
            # Host key checking is disabled by default; override it through
            # ``extras`` when the device keys are known.
            "hostkey_verify": False,
        }
        # Only pass a port when the inventory provides one so that ncclient's
        # own default still applies otherwise.
        if port is not None:
            parameters["port"] = port
        # Inventory extras win over the defaults above.
        parameters.update(extras or {})

        _connection = manager.connect(**parameters)
        self.connection = self
        self._connection = _connection
        self.state = {
            "connected": _connection.connected,
            "session_id": _connection.session_id,
            "timeout": _connection.timeout,
        }

    def close(self) -> None:
        """Close the NETCONF session."""
        self._connection.close_session()

    def get_config(
        self,
        source: str = "running",
        path: Optional[str] = None,
        depth: Optional[int] = None,
        exclude: Optional[List[str]] = None,
        strip: bool = True,
        path_sep: str = "/",
    ):
        """
        Fetch configuration from datastore `source`.

        See `_get_data` for the meaning of the remaining arguments.
        """
        return self._get_data(
            data_type="configuration",
            source=source,
            path=path,
            depth=depth,
            exclude=exclude,
            strip=strip,
            path_sep=path_sep,
        )

    def get(
        self,
        path: Optional[str] = None,
        depth: Optional[int] = None,
        exclude: Optional[List[str]] = None,
        strip: bool = True,
        path_sep: str = "/",
    ):
        """
        Fetch state data.

        See `_get_data` for the meaning of the arguments.
        """
        return self._get_data(
            data_type="state",
            path=path,
            depth=depth,
            exclude=exclude,
            strip=strip,
            path_sep=path_sep,
        )

    def edit_config(
        self,
        config: Dict[str, Any],
        target: str = "candidate",
        default_operation: str = "merge",
        path: str = "",
    ) -> None:
        """
        Push `config` into datastore `target`.

        Arguments:
            config: dictionary that is converted to XML with xmltodict
            target: datastore to edit
            default_operation: NETCONF default-operation for the edit
            path: "/"-separated location of `config`. Plain elements wrap the
                configuration; ``key=value`` elements are added as a leaf to
                the level they appear at.

        Raises:
            ValueError: if the resulting dictionary cannot be rendered as XML
        """
        expanded_config = deepcopy(config)
        # ``path`` may arrive as None from callers whose own default is None.
        for el in reversed((path or "").strip("/").split("/")):
            if not el:  # if e.g. path is "/"
                continue
            if "=" in el:
                # Split once only so values that contain "=" stay intact.
                k, v = el.split("=", 1)
                # Emit the key leaf first; the value from the path wins over
                # any value for the same leaf already present in the config.
                keyed: Dict[str, Any] = {k: v}
                keyed.update(
                    (name, value)
                    for name, value in expanded_config.items()
                    if name != k
                )
                expanded_config = keyed
            else:
                expanded_config = {el: expanded_config}
        try:
            xml_str = xmltodict.unparse(expanded_config, full_document=False)
        except ValueError as e:
            raise ValueError(
                f"Cannot convert dict {expanded_config} to xml: {e}"
            ) from e
        xml_str = (
            f'<config><configure xmlns="{self._CONF_NS}">'
            + xml_str
            + "</configure></config>"
        )
        self._connection.edit_config(
            xml_str, target=target, default_operation=default_operation
        )

    def compare_config(self, path: Optional[str] = None):
        """
        Compares 'running' and 'candidate' datastores on router

        Both datastores are fetched with `get_config`, dumped as indented
        JSON and compared with a unified diff. Returns the diff as a string,
        which is empty when both datastores are equal under `path`.
        """
        source_dict = self.get_config(source="running", path=path)
        dest_dict = self.get_config("candidate", path=path)
        source_json = json.dumps(source_dict, indent=2)
        dest_json = json.dumps(dest_dict, indent=2)
        return "".join(
            difflib.unified_diff(
                source_json.splitlines(keepends=True),
                dest_json.splitlines(keepends=True),
                fromfile=f"running@{path}",
                tofile=f"candidate@{path}",
            )
        )

    def commit_config(self):
        """Commit the candidate datastore."""
        self._connection.commit()

    def discard_config(self):
        """Discard the changes held in the candidate datastore."""
        self._connection.discard_changes()

    def _get_data(
        self,
        data_type: str = "configuration",
        source: Optional[str] = None,
        path: Optional[str] = None,
        depth: Optional[int] = None,
        exclude: Optional[List[str]] = None,
        strip: bool = True,
        path_sep: str = "/",
    ):
        """
        Retrieve configuration or state data and return it as a dictionary.

        Arguments:
            data_type: "configuration" or "state"
            source: datastore to read (only used for "configuration")
            path: location of the wanted element; elements are separated by
                `path_sep` and may be ``key=value`` expressions
            depth: passed to `reduce_dict` together with `exclude`
            exclude: keys to drop from the result
            strip: when true, return only the data found under `path`
                instead of the full reply
            path_sep: separator used in `path`

        Returns:
            The parsed data. A list result is turned into a dictionary with a
            ``_count`` entry plus one entry per element, keyed by its index.
            ``{}`` is returned when `strip` is set and nothing is found.

        Raises:
            ValueError: if `data_type` is not supported
        """
        if data_type not in ("configuration", "state"):
            raise ValueError(f"Invalid data_type param: {data_type}")

        # Empty elements (e.g. from a doubled separator) carry no information
        # and would produce an invalid filter, so they are dropped.
        nodes = [n for n in path.strip(path_sep).split(path_sep) if n] if path else []
        filter_str = self._expand_filter(nodes) if nodes else ""

        if data_type == "configuration":
            nc_filter = (
                f'<filter><configure xmlns="{self._CONF_NS}">'
                f"{filter_str}"
                "</configure></filter>"
            )
            reply = self._connection.get_config(source, filter=nc_filter)
        else:
            nc_filter = (
                f'<filter><state xmlns="{self._STATE_NS}">'
                f"{filter_str}"
                "</state></filter>"
            )
            reply = self._connection.get(filter=nc_filter)

        d = xmltodict.parse(reply.xml)["rpc-reply"]["data"]
        if strip:
            if not d:
                return {}
            root = "configure" if data_type == "configuration" else "state"
            d = d.get(root)
            if not d:
                return {}
            if isinstance(d, dict):
                # The namespace attribute is not part of the data; it may be
                # absent, so do not insist on it.
                d.pop("@xmlns", None)
            for node in nodes:
                if "=" in node:
                    continue
                if not isinstance(d, dict):
                    # A list (or a leaf value) cannot be descended into.
                    break
                d = d.get(node)
                if not d:
                    return {}

        if isinstance(d, list):
            d_temp = OrderedDict()
            d_temp["_count"] = len(d)
            for n, elem in enumerate(d):
                d_temp[n] = elem
            d = d_temp
        # Only mappings can be reduced; leaf values are returned unchanged.
        if (depth or exclude) and isinstance(d, dict):
            d = reduce_dict(d, depth=depth, exclude=exclude)
        return d

    @staticmethod
    def _expand_filter(filter: List[str]) -> str:
        """
        Build the nested XML for a path given as a list of elements.

        Elements are processed from the last to the first: a ``key=value``
        element is appended as ``<key>value</key>`` next to what has been
        built so far, any other element wraps it.
        """
        expanded_filter = ""
        for e in reversed(filter):
            if "=" in e:
                k, v = e.split("=", 1)
                expanded_filter += f"<{k}>{v}</{k}>"
            else:
                expanded_filter = f"<{e}>{expanded_filter}</{e}>"
        return expanded_filter
def reduce_dict(
    d: OrderedDict, depth: Optional[int] = None, exclude: Optional[List[str]] = None
) -> Dict[str, Any]:
    """
    Return a copy of `d` limited to `depth` levels and without `exclude` keys.

    Arguments:
        d: mapping to reduce; the result is an ``OrderedDict`` when `d` is one
        depth: number of mapping levels to keep, counting `d` itself as
            level 1. ``None`` or ``0`` keeps every level.
        exclude: keys to drop at every level

    Nested mappings (and lists of mappings) below the allowed depth are
    omitted; all other values at an allowed level are kept. List elements
    that end up empty are dropped, and a list of mappings with no remaining
    elements is omitted altogether.
    """
    unlimited = not depth
    # The budget is computed once per level so that sibling keys do not
    # consume each other's depth.
    child_depth = None if unlimited else depth - 1
    too_deep = not unlimited and child_depth <= 0

    result: Dict[str, Any] = OrderedDict() if isinstance(d, OrderedDict) else {}
    for k, v in d.items():
        if exclude and k in exclude:
            continue
        if isinstance(v, dict):
            if too_deep:
                continue  # skip this container, keep processing siblings
            result[k] = reduce_dict(v, depth=child_depth, exclude=exclude)
        elif isinstance(v, list) and v and isinstance(v[0], dict):
            if too_deep:
                continue
            kept = []
            for elem in v:
                if isinstance(elem, dict):
                    elem = reduce_dict(elem, depth=child_depth, exclude=exclude)
                    if not elem:
                        continue
                kept.append(elem)
            if kept:
                result[k] = kept
        else:
            result[k] = v
    return result
PK ! % bics_nornir/plugins/tasks/__init__.pyPK ! =5}4 4 * bics_nornir/plugins/tasks/data/__init__.pyfrom .load_merge_yaml import load_merge_yaml
from .load_service_intents import load_service_intents
from .render_templates import render_templates
from .download_configs import download_configs
__all__ = (
"load_merge_yaml",
"render_templates",
"load_service_intents",
"download_configs",
)
PK ! !N N 2 bics_nornir/plugins/tasks/data/download_configs.pyfrom nornir.core.task import Result, Task
from bics_nornir.plugins.tasks.networking import nc
def download_configs(task: Task, resources: set) -> None:
    """
    Loads configs from device to inventory

    Every resource path is read from the running datastore through the
    ``nc.get_config`` task; the results are stored, keyed by resource path,
    under ``task.host["config"]``.

    Arguments:
        resources: list of resource paths to load
    """
    fetched = {}
    for resource in resources:
        outcome = task.run(
            name=f"Get {resource}",
            task=nc.get_config,
            source=nc.NcDatastore.running,
            path=resource,
        )
        fetched[resource] = outcome.result
    task.host["config"] = fetched
PK ! ԳbY 1 bics_nornir/plugins/tasks/data/load_merge_yaml.pyimport os
from copy import deepcopy
from nornir.core.task import Result, Task
import ruamel.yaml
def load_merge_yaml(task: Task, directory: str) -> Result:
    """
    Loads intents from intent files

    Walks `directory` recursively, loads every ``.yml`` / ``.yaml`` file and
    merges the ones that apply to this host. A file selects its audience with
    the ``_target_scope`` key:

    * ``global``: always applies
    * ``group``: applies when the host belongs to one of ``_target_groups``
    * ``host``: applies when ``_target_host`` equals the host name

    When a file has a ``_path`` key, its content is nested under that key.
    On conflicting keys host data wins over group data, which wins over
    global data; within one scope the file loaded first wins. Files are
    visited in sorted order so that this precedence is reproducible.

    Arguments:
        directory: root directory of the data files

    Returns:
        Result object whose ``result`` is the merged dictionary
    """
    global_data = {}
    group_data = {}
    host_data = {}
    yml = ruamel.yaml.YAML(typ="safe")

    for dirpath, dirs, files in os.walk(directory):
        dirs.sort()  # os.walk order is otherwise filesystem dependent
        for name in sorted(files):
            if not name.lower().endswith((".yml", ".yaml")):
                continue
            with open(os.path.join(dirpath, name), "r") as f:
                data = yml.load(f)
            # An empty file loads as None and a top-level list is not an
            # intent mapping: neither has anything to merge.
            if not isinstance(data, dict):
                continue

            target_scope = str(data.pop("_target_scope", "") or "").upper()
            target_groups = data.pop("_target_groups", None) or []
            target_host = data.pop("_target_host", "")
            if "_path" in data:
                data = {data["_path"]: data}

            if target_scope == "GLOBAL":
                _merge(global_data, data)
            elif target_scope == "GROUP":
                # Merging is idempotent, so one matching group is enough.
                if any(task.host.has_parent_group(group) for group in target_groups):
                    _merge(group_data, data)
            elif target_scope == "HOST":
                if target_host == task.host.name:
                    _merge(host_data, data)

    _merge(group_data, global_data)
    _merge(host_data, group_data)
    return Result(host=task.host, result=host_data)
def _merge(a, b):
for key in b:
if key in a:
if isinstance(a[key], dict) and isinstance(b[key], dict):
_merge(a[key], b[key])
else:
pass # a always wins
else:
a[key] = b[key]
PK ! \. 6 bics_nornir/plugins/tasks/data/load_service_intents.pyimport os
from copy import deepcopy
from nornir.core.task import Result, Task
import ruamel.yaml
def load_service_intents(task: Task, directory: str) -> Result:
    """
    Loads intents from intent files

    Walks `directory` recursively and loads every ``.yml`` / ``.yaml`` file
    that lists this host under ``_service_hosts``. For each such file:

    * the entries of ``_service_required_resources`` are collected,
    * the values found under ``host_params`` for this host are merged into
      the top level of the data, and ``host_params`` itself is removed,
    * the data is stored under its ``_service_id``.

    The collected intents are saved as ``task.host["service_intent"]`` and
    the required resources as ``task.host["rsc_to_populate"]``.

    Arguments:
        directory: root directory of the service intent files

    Returns:
        Result object whose ``result`` maps service ids to their intent

    Raises:
        ValueError: if a file for this host has no ``_service_id``
    """
    host_data = {}
    resources_to_populate = set()
    yml = ruamel.yaml.YAML(typ="safe")

    for dirpath, dirs, files in os.walk(directory):
        # Visit files in a reproducible order: when two files share a
        # service id the one loaded last is kept.
        dirs.sort()
        for name in sorted(files):
            if not name.lower().endswith((".yml", ".yaml")):
                continue
            with open(os.path.join(dirpath, name), "r") as f:
                data = yml.load(f)
            # An empty file loads as None; anything but a mapping cannot
            # describe a service.
            if not isinstance(data, dict):
                continue
            if task.host.name not in (data.get("_service_hosts") or []):
                continue

            service_id = data.get("_service_id", None)
            if not service_id:
                raise ValueError(f"'_service_id' missing in file {name}")

            resources_to_populate.update(
                data.pop("_service_required_resources", None) or []
            )
            host_params = data.pop("host_params", None) or {}
            if task.host.name in host_params:
                data.update(host_params[task.host.name] or {})
            host_data[service_id] = data

    task.host["service_intent"] = host_data
    task.host["rsc_to_populate"] = resources_to_populate
    return Result(host=task.host, result=host_data)
PK ! oGW W 2 bics_nornir/plugins/tasks/data/render_templates.pyimport os
from pathlib import Path
from copy import deepcopy
from nornir.core.task import Result, Task
from ruamel.yaml import YAML
from ruamel.yaml.parser import ParserError
from jinja2 import Environment, FileSystemLoader, StrictUndefined
def render_templates(
    task: Task, src_path: str, dst_path: str, dst_prefix: str = None, **kwargs
) -> Result:
    """
    Renders the Jinja2 templates found in `src_path` for this host

    Every ``.j2`` template is rendered with ``hostname`` set to the host name
    plus the given keyword arguments. Renderings that do not contain the text
    ``_target_scope`` are skipped. The others must be valid YAML and are
    written to ``<dst_path>[/<dst_prefix>]/<host name>/<name>.yml``, where
    ``<name>`` is the template name up to its first dot.

    Arguments:
        src_path: directory holding the templates
        dst_path: root directory for the rendered files
        dst_prefix: optional sub-directory inserted below `dst_path`
        **kwargs: variables made available to the templates

    Returns:
        Result object with an empty dictionary as ``result``

    Raises:
        ValueError: if a rendered template cannot be parsed as YAML
    """
    env = Environment(
        loader=FileSystemLoader(src_path),
        trim_blocks=True,
        lstrip_blocks=True,
        undefined=StrictUndefined,
    )
    yml = YAML(typ="safe")

    dst_dir = Path(dst_path)
    if dst_prefix:
        dst_dir = dst_dir / str(dst_prefix)
    dst_dir = dst_dir / task.host.name
    # exist_ok avoids failing when another worker creates the directory
    # between a check and the creation.
    dst_dir.mkdir(parents=True, exist_ok=True)

    # Pass a list: with a bare string the extension test becomes a substring
    # test and would also accept ".j" or ".2".
    for template in env.list_templates(extensions=["j2"]):
        text = env.get_template(template).render(hostname=task.host.name, **kwargs)
        if "_target_scope" not in text:
            continue
        try:
            yml.load(text)  # validation only, the parsed data is not used
        except ParserError as e:
            raise ValueError(f"Cannot parse rendered {template}: {e}") from e
        filename = dst_dir / str(template.split(".")[0] + ".yml")
        # Template names include their sub-directory, which has to exist
        # below the destination as well.
        filename.parent.mkdir(parents=True, exist_ok=True)
        with open(filename, "w") as f:
            f.write(text)
    return Result(host=task.host, result={})
PK ! إN N 0 bics_nornir/plugins/tasks/networking/__init__.pyfrom .nc import nc_configure, get_config, get, nc_validate
__all__ = ("nc",)
PK ! K* * bics_nornir/plugins/tasks/networking/nc.pyfrom copy import deepcopy
from typing import Any, Dict, List, Optional
from enum import Enum
import json
import difflib
from nornir.core.task import Result, Task
class NcDatastore(str, Enum):
    """NETCONF datastores; each member's value equals its name."""

    # Plain strings instead of one-element tuples: the tuples only worked
    # because Enum unpacks them as constructor arguments for ``str``.
    running = "running"
    candidate = "candidate"
    startup = "startup"
def get_config(
    task: Task,
    source: NcDatastore,
    path: str,
    exclude: List[str] = None,
    depth: int = 99,
    **kwargs,
) -> Result:
    """
    Get configuration from specified datastore `source` on router

    Arguments:
        source: datastore to use (type: `NcDatastore`), e.g. `NcDatastore.candidate`.
            The plain datastore name (e.g. ``"candidate"``) is accepted too.
        path: path to intended element (e.g.: `/router/interface`)
            last element of path may be an '=' expression, e.g. `/router/interface/interface-name=to_sr2`
        exclude: keys to leave out of the result
        depth: level depth of nested objects. A value of 0 means all is returned
        **kwargs: passed on to the connection's ``get_config``

    Examples:
        Simple example::

            > nr.run(task=get_config,
            >        source=NcDatastore.running,
            >        path="/router/interface")

    Returns:
        Result object with the following attributes set:
          * result (``dict``): dictionary with the result of the getter

    Raises:
        ValueError: if `source` is not a known datastore
    """
    # Normalising through the enum accepts both members and plain names and
    # rejects unknown datastores before anything is sent to the device.
    datastore = NcDatastore(source)
    conn = task.host.get_connection("ncclient", task.nornir.config)
    result = conn.get_config(
        datastore.name, path=path, depth=depth, exclude=exclude, **kwargs
    )
    return Result(host=task.host, result=result)
def get(
    task: Task, path: str, depth: int = 99, exclude: List[str] = None
) -> Result:
    """
    Get state information from specified resource ('path') on router

    Arguments:
        path: path to intended element (e.g.: `/router/interface`).
            A list of elements from root to intended element
            (e.g.: `['router', 'interface']`) is accepted as well and is
            joined with "/".
        depth: level depth of nested objects. A value of 0 means all is returned
        exclude: keys to leave out of the result

    Examples:
        Simple example::

            > nr.run(task=get,
            >        path="/router/interface")

    Returns:
        Result object with the following attributes set:
          * result (``dict``): dictionary with the result of the getter
    """
    # The connection works on "/"-separated strings; the list form used to
    # be documented here, so keep supporting it.
    if path is not None and not isinstance(path, str):
        path = "/".join(path)
    conn = task.host.get_connection("ncclient", task.nornir.config)
    result = conn.get(path=path, depth=depth, exclude=exclude)
    return Result(host=task.host, result=result)
def _nc_validate_side(conn: Any, value: Any, param: str, path: str):
    """Return ``(data, label)`` for one side of the `nc_validate` diff."""
    if isinstance(value, NcDatastore):
        return conn.get_config(value.name, path=path), value.name
    if isinstance(value, dict):
        return value, param
    raise ValueError(f"parameter '{param}' has invalid type")


def nc_validate(task: Task, source: Any, destination: Any, path: str) -> Result:
    """
    Compare two configurations and return their differences

    Arguments:
        source: `NcDatastore` to read from the router, or a dictionary
        destination: `NcDatastore` to read from the router, or a dictionary
        path: path of the element to read when a datastore is given

    Returns:
        Result object with the following attributes set:
          * result (``str``): unified diff of both sides dumped as indented
            JSON; empty when they are equal. A datastore is labelled with its
            name in the diff header, a dictionary with the name of its
            parameter.

    Raises:
        ValueError: if `source` or `destination` is neither a `NcDatastore`
            nor a dictionary
    """
    conn = task.host.get_connection("ncclient", task.nornir.config)
    source_dict, source_label = _nc_validate_side(conn, source, "source", path)
    dest_dict, dest_label = _nc_validate_side(conn, destination, "destination", path)

    source_json = json.dumps(source_dict, indent=2)
    dest_json = json.dumps(dest_dict, indent=2)
    # The header labels must be short strings: passing the raw arguments
    # would print a whole dictionary in the diff header.
    result = "".join(
        difflib.unified_diff(
            source_json.splitlines(keepends=True),
            dest_json.splitlines(keepends=True),
            fromfile=source_label,
            tofile=dest_label,
        )
    )
    return Result(host=task.host, result=result)
def nc_configure(
    task: Task,
    dry_run: Optional[bool] = None,
    configuration: Optional[Dict[str, Any]] = None,
    path: Optional[str] = None,
    force: Optional[bool] = False
    # replace: bool = False
) -> Result:
    """
    Loads configuration into network device using netconf

    The configuration is written to the candidate datastore and compared with
    the running one. It is committed when there are differences and this is
    not a dry run; otherwise the candidate is discarded again.

    Arguments:
        dry_run: only show what would change rather than modifying config
        configuration: config to load (dictionary); keys that start with
            "_" are treated as metadata and are not sent to the device
        path: path to resource to configure (/a/b/...)
        force: Force change when candidate is not clean (by discarding it first)

    Returns:
        Result object with following attributes set:
          * changed (``bool``): task has changed config or not
          * diff (``str``): changes to device config

    Raises:
        ValueError: if `configuration` is not a dictionary
        Exception: if the candidate datastore holds uncommitted changes and
            `force` is not set
    """
    # Validate before touching the device so a bad call cannot discard a
    # candidate as a side effect.
    if not isinstance(configuration, dict):
        raise ValueError("parameter 'configuration' must be a dictionary")

    conn = task.host.get_connection("ncclient", task.nornir.config)
    if force:
        conn.discard_config()
    else:
        diff = conn.compare_config()
        if len(diff) > 0:
            raise Exception(
                "Candidate datastore not clean! Use force=True to override\n"
                f"{diff}"
            )

    config_data = {
        k: deepcopy(v) for k, v in configuration.items() if not k.startswith("_")
    }
    # The connection expects a string path; the default here is None.
    conn.edit_config(config=config_data, target="candidate", path=path or "")
    diff = conn.compare_config(path=path)

    dry_run = task.is_dry_run(dry_run)
    if diff and not dry_run:
        conn.commit_config()
    else:
        conn.discard_config()
    return Result(host=task.host, diff=diff, changed=len(diff) > 0)
def close(task: Task) -> None:
    """Close the "ncclient" connection of the task's host."""
    host = task.host
    host.close_connection("ncclient")
PK ! 3̨ . bics_nornir/plugins/tasks/services/__init__.pyfrom .eline import eline
PK ! U[8 + bics_nornir/plugins/tasks/services/eline.pyfrom nornir.core.task import Result, Task
from bics_nornir.plugins.tasks.networking import nc
def _eline_find_sdp_id(sdp_info, far_end_ip):
    """Return the id of the SDP in `sdp_info` that leads to `far_end_ip`, or ""."""
    if not sdp_info.get("_count", None):
        # NOTE(review): a single SDP is used without comparing its far-end
        # address with `far_end_ip` -- confirm this is intended.
        return sdp_info.get("sdp-id", "")
    # multiple sdp's: entries are keyed by sequence number next to "_count"
    for seq, attrs in sdp_info.items():
        if str(seq).startswith("_"):
            continue
        if attrs.get("far-end", {}).get("ip-address", "") == far_end_ip:
            return attrs.get("sdp-id")
    return ""


def eline(task: Task, **kwargs) -> Result:
    """
    Build the data for an eline service on this host

    The keyword arguments are the service intent. ``_service_id``,
    ``_service_hosts``, ``_service_type`` and ``_service_operation`` are
    required; everything else is copied to the result.

    For operation "replace", ``saps`` must be a list. With exactly two saps
    the service is local to this host. Otherwise the remote host is the first
    entry of ``_service_hosts`` that is not this host; its system address is
    read from its "config" inventory data and used to select the SDP from
    this host's "/service/sdp" data, which is returned as ``sdp_id``.

    Returns:
        Result object whose ``result`` holds the intent plus ``service_id``,
        ``operation`` and, for non-local services, ``sdp_id``

    Raises:
        ValueError: if ``saps`` is not a list
        Exception: if the operation is unknown, or the remote host, its
            system address or a matching SDP cannot be determined
    """
    service_id = kwargs.pop("_service_id")
    service_hosts = kwargs.pop("_service_hosts")
    kwargs.pop("_service_type")  # required, but not needed to build the data
    operation = kwargs.pop("_service_operation")

    data = dict(kwargs)
    data["service_id"] = service_id

    action = operation.lower()
    if action == "delete":
        data["operation"] = "delete"
        return Result(host=task.host, result=data)
    if action != "replace":
        raise Exception(f"unknown action {operation}")

    data["operation"] = "replace"
    saps = kwargs.get("saps")
    if not isinstance(saps, list):
        raise ValueError("saps must be a list")
    if len(saps) == 2:  # local epipe
        return Result(host=task.host, result=data)

    # Compare whole names: a substring test would treat e.g. "sr1" as being
    # this host when this host is "sr10".
    other_host = next((h for h in service_hosts if h != task.host.name), "")
    if not other_host:
        raise Exception(f"Eline {service_id} has not remote host")

    other_sys_ip = task.nornir.inventory.hosts[other_host]["config"].get(
        "/router/router-name=Base/interface/interface-name=system", {}
    )
    if not other_sys_ip:
        raise Exception(f"Cannot get system-ip for {other_host} from inventory")
    other_sys_ip = other_sys_ip["ipv4"]["primary"]["address"]

    sdp_id = _eline_find_sdp_id(task.host["config"].get("/service/sdp", {}), other_sys_ip)
    if not sdp_id:
        raise Exception(f"Cannot find appropriate SDP to {other_sys_ip}")
    data["sdp_id"] = sdp_id
    return Result(host=task.host, result=data)
PK !HڽT U ! bics_nornir-0.1.0.dist-info/WHEEL
A
н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK !H27A $ bics_nornir-0.1.0.dist-info/METADATAUMs6WLgDВ'8:qG V$l`e.@v\_do߾HRKqVF:o/OTok(`