PKAr|M!be e pyntc/__init__.py"""Kickoff functions for getting instancs of device objects. """ import os from .devices import supported_devices from .errors import UnsupportedDeviceError, DeviceNameNotFoundError, ConfFileNotFoundError try: from configparser import ConfigParser as SafeConfigParser except ImportError: from ConfigParser import SafeConfigParser __version__ = "0.0.9" LIB_PATH_ENV_VAR = "PYNTC_CONF" LIB_PATH_DEFAULT = "~/.ntc.conf" def ntc_device(device_type, *args, **kwargs): """Instantiate and return an instance of a device subclassed from ``pyntc.devices.BaseDevice``. ``*args`` and ``*kwargs`` are passed directly to the device initializer. Arguments: device_type (string): A valid device_type listed in ``pyntc.devices.supported_devices`` Returns: An instance of a subclass of ``pyntc.devices.BaseDevice``. Raises: UnsupportedDeviceError: if the device_type is unsupported. """ try: device_class = supported_devices[device_type] return device_class(*args, **kwargs) except KeyError: raise UnsupportedDeviceError(device_type) def ntc_device_by_name(name, filename=None): """Instantiate and return an instance of a device subclassed from ``pyntc.devices.BaseDevice`` based on its name in an NTC configuration file. If no filename is given the environment variable PYNTC_CONF is checked for a path, and then ~/.ntc.conf. Arguments: name (string): Name of the device as listed in teh NTC configuration file. filename (string): (Optional) Path to NTC configuration file that includes the ``name`` argument as section header. Raises: DeviceNameNotFoundError: if the name is not found in the NTC configuration file. ConfFileNotFoundError: if no NTC configuration can be found. """ config, filename = _get_config_from_file(filename=filename) sections = config.sections() if not sections: raise ConfFileNotFoundError(filename) for section in sections: if ":" in section: device_type_and_conn_name = section.split(":") device_type = device_type_and_conn_name[0] conn_name = device_type_and_conn_name[1] if name == conn_name: device_kwargs = dict(config.items(section)) if "host" not in device_kwargs: device_kwargs["host"] = name return ntc_device(device_type, **device_kwargs) raise DeviceNameNotFoundError(name, filename) def _get_config_from_file(filename=None): if filename is None: if LIB_PATH_ENV_VAR in os.environ: filename = os.path.expanduser(os.environ[LIB_PATH_ENV_VAR]) else: filename = os.path.expanduser(LIB_PATH_DEFAULT) config = SafeConfigParser() config.read(filename) return config, filename PKce|M2k k pyntc/errors.pyclass NTCError(Exception): def __init__(self, message): self.message = message def __repr__(self): return "%s: \n%s" % (self.__class__.__name__, self.message) __str__ = __repr__ class UnsupportedDeviceError(NTCError): def __init__(self, vendor): message = "%s is not a supported vendor." % vendor super(UnsupportedDeviceError, self).__init__(message) class DeviceNameNotFoundError(NTCError): def __init__(self, name, filename): message = "Name %s not found in %s. The file may not exist." % (name, filename) super(DeviceNameNotFoundError, self).__init__(message) class ConfFileNotFoundError(NTCError): def __init__(self, filename): message = "NTC Configuration file %s could not be found." % filename super(ConfFileNotFoundError, self).__init__(message) class CommandError(NTCError): def __init__(self, command, message): self.cli_error_msg = message message = "Command %s was not successful: %s" % (command, message) super(CommandError, self).__init__(message) class CommandListError(NTCError): def __init__(self, commands, command, message): self.commands = commands self.command = command message = "\nCommand %s failed with message: %s" % (command, message) message += "\nCommand List: \n" for command in commands: message += "\t%s\n" % command super(CommandListError, self).__init__(message) class FeatureNotFoundError(NTCError): def __init__(self, feature, device_type): message = "%s feature not found for %s device type." % (feature, device_type) super(FeatureNotFoundError, self).__init__(message) class FileSystemNotFoundError(NTCError): def __init__(self, hostname, command): message = 'Unable to parse "{0}" command to identify the default file system on {1}.'.format(command, hostname) super(FileSystemNotFoundError, self).__init__(message) class RebootTimeoutError(NTCError): def __init__(self, hostname, wait_time): message = "Unable to reconnect to {0} after {1} seconds".format(hostname, wait_time) super(RebootTimeoutError, self).__init__(message) class NotEnoughFreeSpaceError(NTCError): def __init__(self, hostname, min_space): message = "{0} does not meet the minimum disk space requirements of {1}".format(hostname, min_space) super(NotEnoughFreeSpace, self).__init__(message) class OSInstallError(NTCError): def __init__(self, hostname, desired_boot): message = "{0} was unable to boot into {1}".format(hostname, desired_boot) super(OSInstallError, self).__init__(message) class NTCFileNotFoundError(NTCError): def __init__(self, hostname, file, dir): message = "{0} was not found in {1} on {2}".format(file, dir, hostname) super(NTCFileNotFoundError, self).__init__(message) PKce|Mpyntc/data_model/__init__.pyPKce|M4^O pyntc/data_model/converters.py"""This module provides methods for manipulating and converting data received from an API to a desired format. """ import collections import sys def convert_dict_by_key(original, key_map, fill_in=False, whitelist=[], blacklist=[]): """Use a key map to convert a dictionary to desired keys. Args: original (dict): Original dictionary to be converted. key_map (dict): Key map to use to convert dictionary. fill_in (dict): Whether the returned dictionary should contain keys and values from the original dictionary if not specified in the key map. whitelist: If fill_in is True, and whitelist isn't empty, only fill in the keys in the whitelist in the returned dictionary. blacklist: If fill_in is True, and blacklist isn't empty, fill in with all keys from the original dictionary besides those in the blacklist. Returns: A converted dictionary through the key map. """ converted = {} for converted_key in key_map: original_key = key_map[converted_key] converted[converted_key] = recursive_key_lookup(original_key, original) if fill_in: original_key_subset = [] # ignore complex values in key map key_map_values = list(x for x in key_map.values() if not isinstance(x, list)) if whitelist: original_key_subset.extend(list(set(whitelist) - set(key_map_values))) else: original_key_subset.extend(list(set(original.keys()) - set(blacklist) - set(key_map_values))) for original_key in original_key_subset: if original_key in original: converted[original_key] = original[original_key] return converted def convert_list_by_key(original_list, key_map, fill_in=False, whitelist=[], blacklist=[]): """Apply a dictionary conversion for all dictionaries in original_list. """ converted_list = [] for original in list(original_list): converted_list.append( convert_dict_by_key(original, key_map, fill_in=fill_in, whitelist=whitelist, blacklist=blacklist) ) return converted_list def recursive_key_lookup(keys, obj): """Return obj[keys] if keys is actually a single key. Otherwise return obj[keys[0]][keys[1]]...[keys[n]] if keys is a list. """ if not isinstance(keys, list): return obj.get(keys) for key in keys: if obj is not None: obj = obj.get(key) return obj def strip_unicode(data): """Return the original data but with all internal instances of type (unicode) converted to type (str). """ if sys.version_info.major >= 3: return data if isinstance(data, basestring): return str(data) elif isinstance(data, collections.Mapping): return dict(map(strip_unicode, data.iteritems())) elif isinstance(data, collections.Iterable): return type(data)(map(strip_unicode, data)) else: return data PKce|M%pyntc/data_model/key_maps/__init__.pyPKce|MVK)pyntc/data_model/key_maps/eos_key_maps.py# Key maps for EOS devices are stored here. BASIC_FACTS_KM = {"model": "modelName", "os_version": "internalVersion", "serial_number": "serialNumber"} INTERFACES_KM = { "speed": "bandwidth", "duplex": "duplex", "vlan": ["vlanInformation", "vlanId"], "state": "linkStatus", "description": "description", } VLAN_KM = {"state": "state", "name": "name", "id": "vlan_id"} PKce|Mui)pyntc/data_model/key_maps/ios_key_maps.py# Key maps for IOS devices are stored here. BASIC_FACTS_KM = {"model": "hardware", "os_version": "version", "serial_number": "serial", "hostname": "hostname"} PKce|MSqqpyntc/devices/__init__.py"""Supported devices are stored here. Every supported device needs a device_type stored as a string, and a class subclassed from BaseDevice. """ from .eos_device import EOSDevice from .nxos_device import NXOSDevice from .ios_device import IOSDevice from .jnpr_device import JunosDevice from .asa_device import ASADevice from .f5_device import F5Device from .base_device import BaseDevice supported_devices = { "cisco_asa_ssh": ASADevice, "arista_eos_eapi": EOSDevice, "f5_tmos_icontrol": F5Device, "cisco_ios_ssh": IOSDevice, "juniper_junos_netconf": JunosDevice, "cisco_nxos_nxapi": NXOSDevice, } PKce|M`+b00pyntc/devices/asa_device.py"""Module for using a Cisco ASA device over SSH. """ import os import re import signal import time from netmiko import ConnectHandler from netmiko import FileTransfer from pyntc.templates import get_structured_data from .base_device import BaseDevice, fix_docs from .system_features.file_copy.base_file_copy import FileTransferError from pyntc.errors import ( CommandError, CommandListError, FileSystemNotFoundError, NTCError, NTCFileNotFoundError, RebootTimeoutError, OSInstallError, ) @fix_docs class ASADevice(BaseDevice): def __init__(self, host, username, password, secret="", port=22, **kwargs): super(ASADevice, self).__init__(host, username, password, vendor="cisco", device_type="cisco_asa_ssh") self.native = None self.host = host self.username = username self.password = password self.secret = secret self.port = int(port) self.global_delay_factor = kwargs.get("global_delay_factor", 1) self.delay_factor = kwargs.get("delay_factor", 1) self._connected = False self.open() def _enable(self): self.native.exit_config_mode() if not self.native.check_enable_mode(): self.native.enable() def _enter_config(self): self._enable() self.native.config_mode() def _file_copy_instance(self, src, dest=None, file_system="flash:"): if dest is None: dest = os.path.basename(src) fc = FileTransfer(self.native, src, dest, file_system=file_system) return fc def _get_file_system(self): """Determines the default file system or directory for device. Returns: str: The name of the default file system or directory for the device. Raises: FileSystemNotFound: When the module is unable to determine the default file system. """ raw_data = self.show("dir") try: file_system = re.match(r"\s*.*?(\S+:)", raw_data).group(1) return file_system except AttributeError: # TODO: Get proper hostname raise FileSystemNotFoundError(hostname=self.host, command="dir") def _image_booted(self, image_name, **vendor_specifics): version_data = self.show("show version") if re.search(image_name, version_data): return True return False def _interfaces_detailed_list(self): ip_int = self.show("show interface") ip_int_data = get_structured_data("cisco_asa_show_interface.template", ip_int) return ip_int_data def _is_catalyst(self): return self.facts["model"].startswith("WS-") def _raw_version_data(self): show_version_out = self.show("show version") try: version_data = get_structured_data("cisco_asa_show_version.template", show_version_out)[0] return version_data except IndexError: return {} def _send_command(self, command, expect=False, expect_string=""): if expect: if expect_string: response = self.native.send_command_expect(command, expect_string=expect_string) else: response = self.native.send_command_expect(command) else: response = self.native.send_command_timing(command) if "% " in response or "Error:" in response: raise CommandError(command, response) return response def _show_vlan(self): show_vlan_out = self.show("show vlan") show_vlan_data = get_structured_data("cisco_ios_show_vlan.template", show_vlan_out) return show_vlan_data def _uptime_components(self, uptime_full_string): match_days = re.search(r"(\d+) days?", uptime_full_string) match_hours = re.search(r"(\d+) hours?", uptime_full_string) match_minutes = re.search(r"(\d+) minutes?", uptime_full_string) days = int(match_days.group(1)) if match_days else 0 hours = int(match_hours.group(1)) if match_hours else 0 minutes = int(match_minutes.group(1)) if match_minutes else 0 return days, hours, minutes def _uptime_to_seconds(self, uptime_full_string): days, hours, minutes = self._uptime_components(uptime_full_string) seconds = days * 24 * 60 * 60 seconds += hours * 60 * 60 seconds += minutes * 60 return seconds def _uptime_to_string(self, uptime_full_string): days, hours, minutes = self._uptime_components(uptime_full_string) return "%02d:%02d:%02d:00" % (days, hours, minutes) def _wait_for_device_reboot(self, timeout=3600): start = time.time() while time.time() - start < timeout: try: self.open() return except: pass # TODO: Get proper hostname parameter raise RebootTimeoutError(hostname=self.host, wait_time=timeout) def backup_running_config(self, filename): with open(filename, "w") as f: f.write(self.running_config) def checkpoint(self, checkpoint_file): self.save(filename=checkpoint_file) def close(self): if self._connected: self.native.disconnect() self._connected = False def config(self, command): self._enter_config() self._send_command(command) self.native.exit_config_mode() def config_list(self, commands): self._enter_config() entered_commands = [] for command in commands: entered_commands.append(command) try: self._send_command(command) except CommandError as e: raise CommandListError(entered_commands, command, e.cli_error_msg) self.native.exit_config_mode() @property def facts(self): """Implement this once facts' re-factor is done. """ return {} def file_copy(self, src, dest=None, file_system=None): self._enable() if file_system is None: file_system = self._get_file_system() if not self.file_copy_remote_exists(src, dest, file_system): fc = self._file_copy_instance(src, dest, file_system=file_system) # if not self.fc.verify_space_available(): # raise FileTransferError('Not enough space available.') try: fc.enable_scp() fc.establish_scp_conn() fc.transfer_file() except: raise FileTransferError finally: fc.close_scp_chan() if not self.file_copy_remote_exists(src, dest, file_system): raise FileTransferError( message="Attempted file copy, but could not validate file existed after transfer" ) # TODO: Make this an internal method since exposing file_copy should be sufficient def file_copy_remote_exists(self, src, dest=None, file_system=None): self._enable() if file_system is None: file_system = self._get_file_system() fc = self._file_copy_instance(src, dest, file_system=file_system) if fc.check_file_exists() and fc.compare_md5(): return True return False def get_boot_options(self): show_boot_out = self.show("show boot | i BOOT variable") # Improve regex to get only the first boot $var in the sequence! boot_path_regex = r"Current BOOT variable = (\S+):\/(\S+)" match = re.search(boot_path_regex, show_boot_out) if match: boot_image = match.group(2) else: boot_image = None return dict(sys=boot_image) def install_os(self, image_name, **vendor_specifics): timeout = vendor_specifics.get("timeout", 3600) if not self._image_booted(image_name): self.set_boot_options(image_name, **vendor_specifics) self.reboot(confirm=True) self._wait_for_device_reboot(timeout=timeout) if not self._image_booted(image_name): raise OSInstallError(hostname=self.facts.get("hostname"), desired_boot=image_name) return True return False def open(self): if self._connected: try: self.native.find_prompt() except: self._connected = False if not self._connected: self.native = ConnectHandler( device_type="cisco_asa", ip=self.host, username=self.username, password=self.password, port=self.port, global_delay_factor=self.global_delay_factor, secret=self.secret, verbose=False, ) self._connected = True def reboot(self, timer=0, confirm=False): if confirm: def handler(signum, frame): raise RebootSignal("Interrupting after reload") signal.signal(signal.SIGALRM, handler) signal.alarm(10) try: if timer > 0: first_response = self.show("reload in %d" % timer) else: first_response = self.show("reload") if "System configuration" in first_response: self.native.send_command_timing("no") self.native.send_command_timing("\n") except RebootSignal: signal.alarm(0) signal.alarm(0) else: print("Need to confirm reboot with confirm=True") def rollback(self, rollback_to): raise NotImplementedError @property def running_config(self): return self.show("show running-config", expect=True) def save(self, filename="startup-config"): command = "copy running-config %s" % filename # Changed to send_command_timing to not require a direct prompt return. self.native.send_command_timing(command) # If the user has enabled 'file prompt quiet' which dose not require any confirmation or feedback. # This will send return without requiring an OK. # Send a return to pass the [OK]? message - Increase delay_factor for looking for response. self.native.send_command_timing("\n", delay_factor=2) # Confirm that we have a valid prompt again before returning. self.native.find_prompt() return True def set_boot_options(self, image_name, **vendor_specifics): current_boot = self.show("show running-config | inc ^boot system ") file_system = vendor_specifics.get("file_system") if file_system is None: file_system = self._get_file_system() file_system_files = self.show("dir {0}".format(file_system)) if re.search(image_name, file_system_files) is None: raise NTCFileNotFoundError( # TODO: Update to use hostname hostname=self.host, file=image_name, dir=file_system, ) current_images = current_boot.splitlines() commands_to_exec = ["no {0}".format(image) for image in current_images] commands_to_exec.append("boot system {0}/{1}".format(file_system, image_name)) self.config_list(commands_to_exec) self.save() if self.get_boot_options()["sys"] != image_name: raise CommandError( command="boot system {0}/{1}".format(file_system, image_name), message="Setting boot command did not yield expected results", ) def show(self, command, expect=False, expect_string=""): self._enable() return self._send_command(command, expect=expect, expect_string=expect_string) def show_list(self, commands): self._enable() responses = [] entered_commands = [] for command in commands: entered_commands.append(command) try: responses.append(self._send_command(command)) except CommandError as e: raise CommandListError(entered_commands, command, e.cli_error_msg) return responses @property def startup_config(self): return self.show("show startup-config") class RebootSignal(NTCError): pass PKce|MN 11pyntc/devices/base_device.py"""The module contains the base class that all device classes must inherit from. """ import abc import importlib from pyntc.errors import NTCError, FeatureNotFoundError def fix_docs(cls): for name, func in vars(cls).items(): if hasattr(func, "__call__") and not func.__doc__: # print(func, 'needs doc') for parent in cls.__bases__: parfunc = getattr(parent, name, None) if parfunc and getattr(parfunc, "__doc__", None): func.__doc__ = parfunc.__doc__ break return cls class BaseDevice(object): __metaclass__ = abc.ABCMeta def __init__(self, host, username, password, vendor=None, device_type=None, **kwargs): self.host = host self.username = username self.password = password self.vendor = vendor self.device_type = device_type self._facts = None def _image_booted(self, image_name, **vendor_specifics): """Determines if a particular image is serving as the active OS. Args: image_name (str): The image that you would like the device to be using for active OS. vendor_specifics (kwargs): volume: Required by F5Device as F5 boots into a volume. Returns: bool: True if image is currently being used by the device, else False. """ raise NotImplementedError #################### # ABSTRACT METHODS # #################### @abc.abstractmethod def backup_running_config(self, filename): """Save a local copy of the running config. Args: filename (str): The local file path on which to save the running config. """ raise NotImplementedError @abc.abstractmethod def checkpoint(self, filename): """Save a checkpoint of the running configuration to the device. Args: filename (str): The filename to save the checkpoint as on the remote device. """ raise NotImplementedError @abc.abstractmethod def close(self): """Close the connection to the device. """ raise NotImplementedError @abc.abstractmethod def config(self, command): """Send a configuration command. Args: command (str): The command to send to the device. Raises: CommandError: If there is a problem with the supplied command. """ raise NotImplementedError @abc.abstractmethod def config_list(self, commands): """Send a list of configuration commands. Args: commands (list): A list of commands to send to the device. Raises: CommandListError: If there is a problem with one of the commands in the list. """ raise NotImplementedError @abc.abstractproperty def facts(self): """Return a dictionary of facts about the device. The dictionary must include the following keys. All keys are strings, the value type is given in parenthesis: uptime (int) vendor (str) os_version (str) interfaces (list of strings) hostname (str) fqdn (str) uptime_string (str) serial_number (str) model (str) vlans (list of strings) The dictionary can also include a vendor-specific dictionary, with the device type as a key in the outer dictionary. Example: { "uptime": 1819711, "vendor": "cisco", "os_version": "7.0(3)I2(1)", "interfaces": [ "mgmt0", "Ethernet1/1", "Ethernet1/2", "Ethernet1/3", "Ethernet1/4", "Ethernet1/5", "Ethernet1/6", ], "hostname": "n9k1", "fqdn": "N/A", "uptime_string": "21:01:28:31", "serial_number": "SAL1819S6LU", "model": "Nexus9000 C9396PX Chassis", "vlans": [ "1", "2", "3", ] } """ raise NotImplementedError @abc.abstractmethod def file_copy(self, src, dest=None, **kwargs): """Send a local file to the device. Args: src (str): Path to the local file to send. dest (str): The destination file path to be saved on remote flash. If none is supplied, the implementing class should use the basename of the source path. Keyword Args: file_system (str): Supported only for IOS and NXOS. The file system for the remote fle. If no file_system is provided, then the ``get_file_system`` method is used to determine the correct file system to use. """ raise NotImplementedError @abc.abstractmethod def file_copy_remote_exists(self, src, dest=None, **kwargs): """Check if a remote file exists. A remote file exists if it has the same name as supplied dest, and the same md5 hash as the source. Args: src (str): Path to local file to check. dest (str): The destination file path to be saved on remote the remote device. If none is supplied, the implementing class should use the basename of the source path. Keyword Args: file_system (str): Supported only for IOS and NXOS. The file system for the remote fle. If no file_system is provided, then the ``get_file_system`` method is used to determine the correct file system to use. Returns: True if the remote file exists, False if it doesn't. """ @abc.abstractmethod def get_boot_options(self): """Get current boot variables like system image and kickstart image. Returns: A dictionary, e.g. {'kick': router_kick.img, 'sys': 'router_sys.img'} """ raise NotImplementedError @abc.abstractmethod def install_os(self, image_name, **vendor_specifics): """Install the OS from specified image_name Args: image_name(str): The name of the image on the device to install. Keyword Args: kickstart (str): Option for ``NXOSDevice`` for devices that require a kickstart image. volume (str): Option for ``F5Device`` to set the target boot volume. file_system (str): Option for ``ASADevice``, ``EOSDevice``, ``IOSDevice``, and ``NXOSDevice`` to set where the OS files are stored. The default will use the ``_get_file_system`` method. timeout (int): Option for ``IOSDevice`` and ``NXOSDevice`` to set the wait time for device installation to complete. Returns: True if system has been installed during function's call, False if OS has not been installed Raises: OSInstallError: When device finishes installation process, but the running image does not match ``image_name``. CommandError: When sending a command to the device fails, or when the config status after sending a command does not yield expected results. CommandListError: When sending commands to the device fails. NotEnoughFreeSpaceError: When the device does not have enough free space for install. NTCFileNotFoundError: When the ``image_name`` is not found in the devices ``file_system``. FileSystemNotFoundError: When the ``file_system`` is left to default, and the ``file_system`` cannot be identified. RebootTimeoutError: When device is rebooted and is unreachable longer than ``timeout`` period. """ raise NotImplementedError @abc.abstractmethod def open(self): """Open a connection to the device. """ raise NotImplementedError @abc.abstractmethod def reboot(self, timer=0, confirm=False): """Reboot the device. Args: confirm(bool): if False, this method has no effect. timer(int): number of seconds to wait before rebooting. """ raise NotImplementedError @abc.abstractmethod def rollback(self, checkpoint_file): """Rollback to a checkpoint file. Args: filename (str): The filename of the checkpoint file to load into the running configuration. """ raise NotImplementedError @abc.abstractproperty def running_config(self): """Return the running configuration of the device. """ raise NotImplementedError @abc.abstractmethod def save(self, filename=None): """Save a device's running configuration. Args: filename (str): The filename on the remote device. If none is supplied, the implementing class should save to the "startup configuration". """ raise NotImplementedError @abc.abstractmethod def set_boot_options(self, image_name, **vendor_specifics): """Set boot variables like system image and kickstart image. Args: image_name: The main system image file name. Keyword Args: kickstart: Option for ``NXOSDevice`` for devices that require a kickstart image. volume: Option for ``F5Device`` to set which volume should have image installed. file_system: Option for ``ASADevice`` and ``IOSDevice`` to set which directory to use when setting the boot path. The default will use the directory returned by the ``_get_file_system()`` method. Raises: ValueError: When the boot options returned by the ``get_boot_options`` method does not match the ``image_name`` after the config command(s) have been sent to the device. """ raise NotImplementedError @abc.abstractmethod def show(self, command, raw_text=False): """Send a non-configuration command. Args: command (str): The command to send to the device. Keyword Args: raw_text (bool): Whether to return raw text or structured data. Returns: The output of the show command, which could be raw text or structured data. """ raise NotImplementedError @abc.abstractmethod def show_list(self, commands, raw_text=False): """Send a list of non-configuration commands. Args: commands (list): A list of commands to send to the device. Keyword Args: raw_text (bool): Whether to return raw text or structured data. Returns: A list of outputs for each show command """ raise NotImplementedError @abc.abstractproperty def startup_config(self): """Return the startup configuration of the device. """ raise NotImplementedError ################################# # Inherited implemented methods # ################################# def feature(self, feature_name): """Return a feature class based on the ``feature_name`` for the appropriate subclassed device type. """ try: feature_module = importlib.import_module( "pyntc.devices.system_features.%s.%s_%s" % (feature_name, self.device_type, feature_name) ) return feature_module.instance(self) except ImportError: raise FeatureNotFoundError(feature_name, self.device_type) except AttributeError: raise def refresh(self): """Refresh caches on device instance. """ self.refresh_facts() def refresh_facts(self): """Refresh cached facts. """ # Persist values that were not added by facts getter if isinstance(self._facts, dict): facts_backup = self._facts.copy() self._facts = None facts_backup.update(self.facts) self._facts = facts_backup.copy() else: self._facts = None return self.facts class FileTransferError(NTCError): pass class RebootTimerError(NTCError): def __init__(self, device_type): super(RebootTimerError, self).__init__("Reboot timer not supported on %s." % device_type) class RollbackError(NTCError): pass class SetBootImageError(NTCError): pass PKce|M~##pyntc/devices/eos_device.py"""Module for using an Arista EOS device over the eAPI. """ import re import time from pyntc.data_model.converters import convert_dict_by_key, convert_list_by_key, strip_unicode from pyntc.data_model.key_maps import eos_key_maps from .system_features.file_copy.eos_file_copy import EOSFileCopy from .system_features.vlans.eos_vlans import EOSVlans from .base_device import BaseDevice, RollbackError, RebootTimerError, fix_docs from pyntc.errors import ( CommandError, CommandListError, FileSystemNotFoundError, NTCError, NTCFileNotFoundError, RebootTimeoutError, OSInstallError, ) from pyeapi import connect as eos_connect from pyeapi.client import Node as EOSNative from pyeapi.eapilib import CommandError as EOSCommandError from .system_features.file_copy.base_file_copy import FileTransferError @fix_docs class EOSDevice(BaseDevice): def __init__(self, host, username, password, transport="http", timeout=60, **kwargs): super(EOSDevice, self).__init__(host, username, password, vendor="arista", device_type="arista_eos_eapi") self.transport = transport self.timeout = timeout self.connection = eos_connect(transport, host=host, username=username, password=password, timeout=timeout) self.native = EOSNative(self.connection) def _get_file_system(self): """Determines the default file system or directory for device. Returns: str: The name of the default file system or directory for the device. Raises: FileSystemNotFound: When the module is unable to determine the default file system. """ raw_data = self.show("dir", raw_text=True) try: file_system = re.match(r"\s*.*?(\S+:)", raw_data).group(1) return file_system except AttributeError: raise FileSystemNotFoundError(hostname=self.facts.get("hostname"), command="dir") def _get_interface_list(self): iface_detailed_list = self._interfaces_status_list() iface_list = sorted(list(x["interface"] for x in iface_detailed_list)) return iface_list def _get_vlan_list(self): vlans = EOSVlans(self) vlan_list = vlans.get_list() return vlan_list def _image_booted(self, image_name, **vendor_specifics): version_data = self.show("show boot", raw_text=True) if re.search(image_name, version_data): return True return False def _interfaces_status_list(self): interfaces_list = [] interfaces_status_dictionary = self.show("show interfaces status")["interfaceStatuses"] for key in interfaces_status_dictionary: interface_dictionary = interfaces_status_dictionary[key] interface_dictionary["interface"] = key interfaces_list.append(interface_dictionary) return convert_list_by_key(interfaces_list, eos_key_maps.INTERFACES_KM, fill_in=True, whitelist=["interface"]) def _parse_response(self, response, raw_text): if raw_text: return list(x["result"]["output"] for x in response) else: return list(x["result"] for x in response) def _uptime_to_string(self, uptime): days = uptime / (24 * 60 * 60) uptime = uptime % (24 * 60 * 60) hours = uptime / (60 * 60) uptime = uptime % (60 * 60) mins = uptime / 60 uptime = uptime % 60 seconds = uptime return "%02d:%02d:%02d:%02d" % (days, hours, mins, seconds) def _wait_for_device_reboot(self, timeout=3600): start = time.time() while time.time() - start < timeout: try: self.show("show hostname") return except: pass raise RebootTimeoutError(hostname=self.facts["hostname"], wait_time=timeout) def backup_running_config(self, filename): with open(filename, "w") as f: f.write(self.running_config) def checkpoint(self, checkpoint_file): self.show("copy running-config %s" % checkpoint_file) def close(self): pass def config(self, command): try: self.config_list([command]) except CommandListError as e: raise CommandError(e.command, e.message) def config_list(self, commands): try: self.native.config(commands) except EOSCommandError as e: raise CommandListError(commands, e.commands[len(e.commands) - 1], e.message) @property def facts(self): if self._facts is None: sh_version_output = self.show("show version") self._facts = convert_dict_by_key(sh_version_output, eos_key_maps.BASIC_FACTS_KM) self._facts["vendor"] = self.vendor uptime = int(time.time() - sh_version_output["bootupTimestamp"]) self._facts["uptime"] = uptime self._facts["uptime_string"] = self._uptime_to_string(uptime) sh_hostname_output = self.show("show hostname") self._facts.update( convert_dict_by_key(sh_hostname_output, {}, fill_in=True, whitelist=["hostname", "fqdn"]) ) self._facts["interfaces"] = self._get_interface_list() self._facts["vlans"] = self._get_vlan_list() return self._facts def file_copy(self, src, dest=None, **kwargs): if not self.file_copy_remote_exists(src, dest, **kwargs): fc = EOSFileCopy(self, src, dest) fc.send() if not self.file_copy_remote_exists(src, dest, **kwargs): raise FileTransferError( message="Attempted file copy, but could not validate file existed after transfer" ) # TODO: Make this an internal method since exposing file_copy should be sufficient def file_copy_remote_exists(self, src, dest=None, **kwargs): fc = EOSFileCopy(self, src, dest) if fc.remote_file_exists() and fc.already_transfered(): return True return False def get_boot_options(self): image = self.show("show boot-config")["softwareImage"] image = image.replace("flash:", "") return dict(sys=image) def install_os(self, image_name, **vendor_specifics): timeout = vendor_specifics.get("timeout", 3600) if not self._image_booted(image_name): self.set_boot_options(image_name, **vendor_specifics) self.reboot(confirm=True) self._wait_for_device_reboot(timeout=timeout) if not self._image_booted(image_name): raise OSInstallError(hostname=self.facts.get("hostname"), desired_boot=image_name) return True return False def open(self): pass def reboot(self, confirm=False, timer=0): if timer != 0: raise RebootTimerError(self.device_type) if confirm: self.show("reload now") else: print("Need to confirm reboot with confirm=True") def rollback(self, rollback_to): try: self.show("configure replace %s force" % rollback_to) except (CommandError, CommandListError): raise RollbackError("Rollback unsuccessful. %s may not exist." % rollback_to) @property def running_config(self): return self.show("show running-config", raw_text=True) def save(self, filename="startup-config"): self.show("copy running-config %s" % filename) return True def set_boot_options(self, image_name, **vendor_specifics): file_system = vendor_specifics.get("file_system") if file_system is None: file_system = self._get_file_system() file_system_files = self.show("dir {0}".format(file_system), raw_text=True) if re.search(image_name, file_system_files) is None: raise NTCFileNotFoundError(hostname=self.facts.get("hostname"), file=image_name, dir=file_system) self.show("install source {0}{1}".format(file_system, image_name)) if self.get_boot_options()["sys"] != image_name: raise CommandError( command="install source {0}".format(image_name), message="Setting install source did not yield expected results", ) def show(self, command, raw_text=False): try: response_list = self.show_list([command], raw_text=raw_text) return response_list[0] except CommandListError as e: raise CommandError(e.command, e.message) def show_list(self, commands, raw_text=False): if raw_text: encoding = "text" else: encoding = "json" try: return strip_unicode( self._parse_response(self.native.enable(commands, encoding=encoding), raw_text=raw_text) ) except EOSCommandError as e: raise CommandListError(commands, e.commands[len(e.commands) - 1], e.message) @property def startup_config(self): return self.show("show startup-config", raw_text=True) class RebootSignal(NTCError): pass PKce|M= min_space: return elif free_space < min_space: raise NotEnoughFreeSpaceError(hostname=self.facts.get("hostname"), min_space=min_space) def _check_md5sum(self, filename, checksum): """Checks if md5sum is correct Returns: bool - True / False if checksums match """ md5sum = self._file_copy_remote_md5(filename) if checksum == md5sum: return True else: return False @staticmethod def _file_copy_local_file_exists(filepath): return os.path.isfile(filepath) def _file_copy_local_md5(self, filepath, blocksize=2 ** 20): """Gets md5 checksum from the filepath Returns: str - if the file exists None - if the file does not exist """ if self._file_copy_local_file_exists(filepath): m = hashlib.md5() with open(filepath, "rb") as f: buf = f.read(blocksize) while buf: m.update(buf) buf = f.read(blocksize) return m.hexdigest() def _file_copy_remote_md5(self, filepath): """Gets md5 checksum of the filename Example of 'md5sum' command: [root@ntc:Active:Standalone] config # md5sum /tmp/systemauth.pl c813ac405cab73591492db326ad8893a /tmp/systemauth.pl Returns: str - md5sum of the filename """ md5sum_result = None md5sum_output = self.api_handler.tm.util.bash.exec_cmd("run", utilCmdArgs='-c "md5sum {}"'.format(filepath)) if md5sum_output: md5sum_result = md5sum_output.commandResult md5sum_result = md5sum_result.split()[0] return md5sum_result def _get_active_volume(self): """Gets name of active volume on the device Returns: str - name of active volume """ volumes = self._get_volumes() for _volume in volumes: if hasattr(_volume, "active") and _volume.active is True: current_volume = _volume.name return current_volume def _get_free_space(self): """Gets free space on the device Example of 'vgdisplay -s --units G' command: [root@ntc:Active:Standalone] config # vgdisplay -s --units G "vg-db-sda" 30.98 GB [23.89 GB used / 7.10 GB free] Returns: int - number of gigabytes of free space """ free_space = None free_space_output = self.api_handler.tm.util.bash.exec_cmd("run", utilCmdArgs='-c "vgdisplay -s --units G"') if free_space_output: free_space = free_space_output.commandResult free_space_regex = ".*\s\/\s(\d+\.?\d+) GB free" match = re.match(free_space_regex, free_space) if match: free_space = float(match.group(1)) return free_space def _get_hostname(self): return self.soap_handler.Management.Device.get_hostname(self.devices)[0] def _get_images(self): """Gets list of images on the device Returns: list - of images """ images = self.api_handler.tm.sys.software.images.get_collection() return images def _get_interfaces_list(self): interfaces = self.soap_handler.Networking.Interfaces.get_list() return interfaces def _get_model(self): return self.soap_handler.System.SystemInfo.get_marketing_name() def _get_serial_number(self): system_information = self.soap_handler.System.SystemInfo.get_system_information() chassis_serial = system_information.get("chassis_serial") return chassis_serial def _get_uptime(self): return self.soap_handler.System.SystemInfo.get_uptime() def _get_version(self): return self.soap_handler.System.SystemInfo.get_version() def _get_vlans(self): rd_list = self.soap_handler.Networking.RouteDomainV2.get_list() rd_vlan_list = self.soap_handler.Networking.RouteDomainV2.get_vlan(rd_list) return rd_vlan_list def _get_volumes(self): """Gets list of volumes on the device Returns: list - of volumes """ volumes = self.api_handler.tm.sys.software.volumes.get_collection() return volumes def _image_booted(self, image_name, **vendor_specifics): """Checks if requested booted volume is an active volume. F5 does not provide reliable way to rely on image_name once the volume has been installed so check needs to be performed against volume parameter. """ volume = vendor_specifics.get("volume") return True if self._get_active_volume() == volume else False def _image_exists(self, image_name): """Checks if image exists on the device Returns: bool - True / False if image exists """ all_images_output = self.api_handler.tm.util.unix_ls.exec_cmd("run", utilCmdArgs="/shared/images") if all_images_output: all_images = all_images_output.commandResult.splitlines() else: return None if image_name in all_images: return True else: return False def _image_install(self, image_name, volume): """Requests the installation of the image on a volume Returns: None """ options = [] create_volume = not self._volume_exists(volume) if create_volume: options.append({"create-volume": True}) self.api_handler.tm.sys.software.images.exec_cmd("install", name=image_name, volume=volume, options=options) def _image_match(self, image_name, checksum): """Checks if image name matches the checksum Returns: bool - True / False if image matches the checksum """ if self._image_exists(image_name): image = os.path.join("/shared/images", image_name) if self._check_md5sum(image, checksum): return True return False def _open_soap(self): try: self.soap_handler = bigsuds.BIGIP(hostname=self.hostname, username=self.username, password=self.password) self.devices = self.soap_handler.Management.Device.get_list() except bigsuds.OperationFailed as err: raise RuntimeError("ConfigSync API Error ({})".format(err)) def _reboot_to_volume(self, volume_name=None): """Requests the reboot (activiation) to a specified volume Returns: None """ if volume_name: self.api_handler.tm.sys.software.volumes.exec_cmd("reboot", volume=volume_name) else: # F5 SDK API does not support reboot to the current volume. # This is a workaround by issuing reboot command from bash directly. self.api_handler.tm.util.bash.exec_cmd("run", utilCmdArgs='-c "reboot"') def _reconnect(self): """ Reconnects to the device """ self.api_handler = ManagementRoot(self.hostname, self.username, self.password) def _upload_image(self, image_filepath): """Uploads an iso image to the device Returns: None """ image_filename = os.path.basename(image_filepath) _URI = "https://{hostname}/mgmt/cm/autodeploy/software-image-uploads/{filename}".format( hostname=self.hostname, filename=image_filename ) chunk_size = 512 * 1024 size = os.path.getsize(image_filepath) headers = {"Content-Type": "application/octet-stream"} requests.packages.urllib3.disable_warnings() start = 0 with open(image_filepath, "rb") as fileobj: while True: payload = fileobj.read(chunk_size) if not payload: break end = fileobj.tell() if end < chunk_size: end = size content_range = "{}-{}/{}".format(start, end - 1, size) headers["Content-Range"] = content_range resp = requests.post( _URI, auth=(self.username, self.password), data=payload, headers=headers, verify=False ) start += len(payload) @staticmethod def _uptime_to_string(uptime): days = uptime / (24 * 60 * 60) uptime = uptime % (24 * 60 * 60) hours = uptime / (60 * 60) uptime = uptime % (60 * 60) mins = uptime / 60 uptime = uptime % 60 seconds = uptime return "%02d:%02d:%02d:%02d" % (days, hours, mins, seconds) def _volume_exists(self, volume_name): """Checks if volume exists on the device Returns: bool - True / False if volume exists """ result = self.api_handler.tm.sys.software.volumes.volume.exists(name=volume_name) return result def _wait_for_device_reboot(self, volume_name, timeout=600): """Waits for the device to be booted into a specified volume Returns: bool - True / False if reboot has been successful """ end_time = time.time() + timeout time.sleep(60) while time.time() < end_time: time.sleep(5) try: self._reconnect() volume = self.api_handler.tm.sys.software.volumes.volume.load(name=volume_name) if hasattr(volume, "active") and volume.active is True: return True except Exception: pass return False def _wait_for_image_installed(self, image_name, volume, timeout=1800): """Waits for the device to install image on a volume Args: image_name (str): The name of the image that should be booting. volume (str): The volume that the device should be booting into. timeout (int): The number of seconds to wait for device to boot up. Raises: OSInstallError: When the volume is not booted before the timeout is reached. """ end_time = time.time() + timeout while time.time() < end_time: time.sleep(20) # Avoid race-conditions issues. Newly created volumes _might_ lack # of .version attribute in first seconds of their live. try: if self.image_installed(image_name=image_name, volume=volume): return except: pass raise OSInstallError(hostname=self.facts.get("hostname"), desired_boot=volume) def backup_running_config(self, filename): raise NotImplementedError def checkpoint(self, filename): raise NotImplementedError def close(self): pass def config(self, command): raise NotImplementedError def config_list(self, commands): raise NotImplementedError @property def facts(self): if self._facts is None: self._facts = { "uptime": self._get_uptime(), "vendor": self.vendor, "model": self._get_model(), "hostname": self._get_hostname(), "fqdn": self._get_hostname(), "os_version": self._get_version(), "serial_number": self._get_serial_number(), "interfaces": self._get_interfaces_list(), "vlans": self._get_vlans(), "uptime_string": self._uptime_to_string(self._get_uptime()), } return self._facts def file_copy(self, src, dest=None, **kwargs): if not self.file_copy_remote_exists(src, dest, **kwargs): self._check_free_space(min_space=6) self._upload_image(image_filepath=src) if not self.file_copy_remote_exists(src, dest, **kwargs): raise FileTransferError( message="Attempted file copy, but could not validate file existed after transfer" ) # TODO: Make this an internal method since exposing file_copy should be sufficient def file_copy_remote_exists(self, src, dest=None, **kwargs): if dest and not dest.startswith("/shared/images"): raise NotImplementedError("Support only for images - destination is always /shared/images") local_md5sum = self._file_copy_local_md5(filepath=src) file_basename = os.path.basename(src) if not self._image_match(image_name=file_basename, checksum=local_md5sum): return False else: return True def get_boot_options(self): active_volume = self._get_active_volume() return {"active_volume": active_volume} def image_installed(self, image_name, volume): """Checks if image is installed on a specified volume Returns: bool - True / False if image installed on a specified volume """ if not image_name or not volume: raise RuntimeError("image_name and volume must be specified") image = None images_on_device = self._get_images() for _image in images_on_device: # fullPath = u'BIGIP-11.6.0.0.0.401.iso' if _image.fullPath == image_name: image = _image if image: volumes = self._get_volumes() for _volume in volumes: if ( _volume.name == volume and _volume.version == image.version and _volume.basebuild == image.build and _volume.status == "complete" ): return True return False def install_os(self, image_name, **vendor_specifics): volume = vendor_specifics.get('volume') if not self.image_installed(image_name, volume): self._check_free_space(min_space=6) if not self._image_exists(image_name): raise NTCFileNotFoundError( hostname=self._get_hostname(), file=image_name, dir='/shared/images' ) self._image_install(image_name=image_name, volume=volume) self._wait_for_image_installed(image_name=image_name, volume=volume) return True return False def open(self): pass def reboot(self, timer=0, confirm=False, volume=None): if confirm: if self._get_active_volume() == volume: volume_name = None else: volume_name = volume self._reboot_to_volume(volume_name=volume_name) if not self._wait_for_device_reboot(volume_name=volume): raise RuntimeError("Reboot to volume {} failed".format(volume)) else: print("Need to confirm reboot with confirm=True") def rollback(self, checkpoint_file): raise NotImplementedError def running_config(self): raise NotImplementedError def save(self, filename=None): raise NotImplementedError def set_boot_options(self, image_name, **vendor_specifics): volume = vendor_specifics.get("volume") self._check_free_space(min_space=6) if not self._image_exists(image_name): raise NTCFileNotFoundError( hostname=self._get_hostname(), file=image_name, dir='/shared/images' ) self._image_install(image_name=image_name, volume=volume) self._wait_for_image_installed(image_name=image_name, volume=volume) def show(self, command, raw_text=False): raise NotImplementedError def show_list(self, commands, raw_text=False): raise NotImplementedError def startup_config(self): raise NotImplementedError PKce|MNg8g8pyntc/devices/ios_device.py"""Module for using a Cisco IOS device over SSH. """ import signal import os import re import time from pyntc.templates import get_structured_data from pyntc.data_model.converters import convert_dict_by_key from pyntc.data_model.key_maps import ios_key_maps from .system_features.file_copy.base_file_copy import FileTransferError from .base_device import BaseDevice, RollbackError, fix_docs from pyntc.errors import ( CommandError, CommandListError, FileSystemNotFoundError, NTCError, NTCFileNotFoundError, OSInstallError, RebootTimeoutError, ) from netmiko import ConnectHandler from netmiko import FileTransfer @fix_docs class IOSDevice(BaseDevice): def __init__(self, host, username, password, secret="", port=22, **kwargs): super(IOSDevice, self).__init__(host, username, password, vendor="cisco", device_type="cisco_ios_ssh") self.native = None self.host = host self.username = username self.password = password self.secret = secret self.port = int(port) self.global_delay_factor = kwargs.get("global_delay_factor", 1) self.delay_factor = kwargs.get("delay_factor", 1) self._connected = False self.open() def _enable(self): self.native.exit_config_mode() if not self.native.check_enable_mode(): self.native.enable() def _enter_config(self): self._enable() self.native.config_mode() def _file_copy_instance(self, src, dest=None, file_system="flash:"): if dest is None: dest = os.path.basename(src) fc = FileTransfer(self.native, src, dest, file_system=file_system) return fc def _get_file_system(self): """Determines the default file system or directory for device. Returns: str: The name of the default file system or directory for the device. Raises: FileSystemNotFound: When the module is unable to determine the default file system. """ raw_data = self.show("dir") try: file_system = re.match(r"\s*.*?(\S+:)", raw_data).group(1) return file_system except AttributeError: raise FileSystemNotFoundError(hostname=self.facts.get("hostname"), command="dir") def _image_booted(self, image_name, **vendor_specifics): version_data = self.show("show version") if re.search(image_name, version_data): return True return False def _interfaces_detailed_list(self): ip_int_br_out = self.show("show ip int br") ip_int_br_data = get_structured_data("cisco_ios_show_ip_int_brief.template", ip_int_br_out) return ip_int_br_data def _is_catalyst(self): return self.facts["model"].startswith("WS-") def _raw_version_data(self): show_version_out = self.show("show version") try: version_data = get_structured_data("cisco_ios_show_version.template", show_version_out)[0] return version_data except IndexError: return {} def _send_command(self, command, expect=False, expect_string=""): if expect: if expect_string: response = self.native.send_command_expect(command, expect_string=expect_string) else: response = self.native.send_command_expect(command) else: response = self.native.send_command_timing(command) if "% " in response or "Error:" in response: raise CommandError(command, response) return response def _show_vlan(self): show_vlan_out = self.show("show vlan") show_vlan_data = get_structured_data("cisco_ios_show_vlan.template", show_vlan_out) return show_vlan_data def _uptime_components(self, uptime_full_string): match_days = re.search(r"(\d+) days?", uptime_full_string) match_hours = re.search(r"(\d+) hours?", uptime_full_string) match_minutes = re.search(r"(\d+) minutes?", uptime_full_string) days = int(match_days.group(1)) if match_days else 0 hours = int(match_hours.group(1)) if match_hours else 0 minutes = int(match_minutes.group(1)) if match_minutes else 0 return days, hours, minutes def _uptime_to_seconds(self, uptime_full_string): days, hours, minutes = self._uptime_components(uptime_full_string) seconds = days * 24 * 60 * 60 seconds += hours * 60 * 60 seconds += minutes * 60 return seconds def _uptime_to_string(self, uptime_full_string): days, hours, minutes = self._uptime_components(uptime_full_string) return "%02d:%02d:%02d:00" % (days, hours, minutes) def _wait_for_device_reboot(self, timeout=3600): start = time.time() while time.time() - start < timeout: try: self.open() return except: pass raise RebootTimeoutError(hostname=self.facts["hostname"], wait_time=timeout) def backup_running_config(self, filename): with open(filename, "w") as f: f.write(self.running_config) def checkpoint(self, checkpoint_file): self.save(filename=checkpoint_file) def close(self): if self._connected: self.native.disconnect() self._connected = False def config(self, command): self._enter_config() self._send_command(command) self.native.exit_config_mode() def config_list(self, commands): self._enter_config() entered_commands = [] for command in commands: entered_commands.append(command) try: self._send_command(command) except CommandError as e: raise CommandListError(entered_commands, command, e.cli_error_msg) self.native.exit_config_mode() @property def facts(self): if self._facts is None: version_data = self._raw_version_data() self._facts = convert_dict_by_key(version_data, ios_key_maps.BASIC_FACTS_KM) self._facts["vendor"] = self.vendor uptime_full_string = version_data["uptime"] self._facts["uptime"] = self._uptime_to_seconds(uptime_full_string) self._facts["uptime_string"] = self._uptime_to_string(uptime_full_string) self._facts["fqdn"] = "N/A" self._facts["interfaces"] = list(x["intf"] for x in self._interfaces_detailed_list()) if self._facts["model"].startswith("WS"): self._facts["vlans"] = list(str(x["vlan_id"]) for x in self._show_vlan()) else: self._facts["vlans"] = [] # ios-specific facts self._facts[self.device_type] = {"config_register": version_data["config_register"]} return self._facts def file_copy(self, src, dest=None, file_system=None): self._enable() if file_system is None: file_system = self._get_file_system() if not self.file_copy_remote_exists(src, dest, file_system): fc = self._file_copy_instance(src, dest, file_system=file_system) # if not self.fc.verify_space_available(): # raise FileTransferError('Not enough space available.') try: fc.enable_scp() fc.establish_scp_conn() fc.transfer_file() except: raise FileTransferError finally: fc.close_scp_chan() if not self.file_copy_remote_exists(src, dest, file_system): raise FileTransferError( message="Attempted file copy, but could not validate file existed after transfer" ) # TODO: Make this an internal method since exposing file_copy should be sufficient def file_copy_remote_exists(self, src, dest=None, file_system=None): self._enable() if file_system is None: file_system = self._get_file_system() fc = self._file_copy_instance(src, dest, file_system=file_system) if fc.check_file_exists() and fc.compare_md5(): return True return False def get_boot_options(self): # TODO: CREATE A MOCK FOR TESTING THIS FUNCTION boot_path_regex = r"(?:BOOT variable\s+=\s+(\S+)\s*$|BOOT path-list\s+:\s*(\S+)\s*$)" try: # Try show bootvar command first show_boot_out = self.show("show bootvar") show_boot_out = show_boot_out.split("Boot Variables on next reload", 1)[-1] except CommandError: try: # Try show boot if previous command was invalid show_boot_out = self.show("show boot") show_boot_out = show_boot_out.split("Boot Variables on next reload", 1)[-1] except CommandError: # Default to running config value show_boot_out = self.show("show run | inc boot") boot_path_regex = r"boot\s+system\s+(?:\S+\s+|)(\S+)\s*$" match = re.search(boot_path_regex, show_boot_out, re.MULTILINE) if match: boot_path = match.group(1) file_system = self._get_file_system() boot_image = boot_path.replace(file_system, "") boot_image = boot_image.replace("/", "") boot_image = boot_image.split(",")[0] boot_image = boot_image.split(";")[0] else: boot_image = None return {"sys": boot_image} def install_os(self, image_name, **vendor_specifics): timeout = vendor_specifics.get("timeout", 3600) if not self._image_booted(image_name): self.set_boot_options(image_name, **vendor_specifics) self.reboot(confirm=True) self._wait_for_device_reboot(timeout=timeout) if not self._image_booted(image_name): raise OSInstallError(hostname=self.facts.get("hostname"), desired_boot=image_name) return True return False def open(self): if self._connected: try: self.native.find_prompt() except: self._connected = False if not self._connected: self.native = ConnectHandler( device_type="cisco_ios", ip=self.host, username=self.username, password=self.password, port=self.port, global_delay_factor=self.global_delay_factor, secret=self.secret, verbose=False, ) self._connected = True def reboot(self, timer=0, confirm=False): if confirm: def handler(signum, frame): raise RebootSignal("Interrupting after reload") signal.signal(signal.SIGALRM, handler) signal.alarm(10) try: if timer > 0: first_response = self.show("reload in %d" % timer) else: first_response = self.show("reload") if "System configuration" in first_response: self.native.send_command_timing("no") self.native.send_command_timing("\n") except RebootSignal: signal.alarm(0) signal.alarm(0) else: print("Need to confirm reboot with confirm=True") def rollback(self, rollback_to): try: self.show("configure replace flash:%s force" % rollback_to) except CommandError: raise RollbackError("Rollback unsuccessful. %s may not exist." % rollback_to) @property def running_config(self): return self.show("show running-config", expect=True) def save(self, filename="startup-config"): command = "copy running-config %s" % filename # Changed to send_command_timing to not require a direct prompt return. self.native.send_command_timing(command) # If the user has enabled 'file prompt quiet' which dose not require any confirmation or feedback. # This will send return without requiring an OK. # Send a return to pass the [OK]? message - Incease delay_factor for looking for response. self.native.send_command_timing("\n", delay_factor=2) # Confirm that we have a valid prompt again before returning. self.native.find_prompt() return True def set_boot_options(self, image_name, **vendor_specifics): file_system = vendor_specifics.get("file_system") if file_system is None: file_system = self._get_file_system() file_system_files = self.show("dir {0}".format(file_system)) if re.search(image_name, file_system_files) is None: raise NTCFileNotFoundError(hostname=self.facts.get("hostname"), file=image_name, dir=file_system) try: command = "boot system {0}/{1}".format(file_system, image_name) self.config_list(["no boot system", command]) except CommandError: file_system = file_system.replace(":", "") command = "boot system {0} {1}".format(file_system, image_name) self.config_list(["no boot system", command]) self.save() new_boot_options = self.get_boot_options()["sys"] if new_boot_options != image_name: raise CommandError( command=command, message="Setting boot command did not yield expected results, found {0}".format(new_boot_options), ) def show(self, command, expect=False, expect_string=""): self._enable() return self._send_command(command, expect=expect, expect_string=expect_string) def show_list(self, commands): self._enable() responses = [] entered_commands = [] for command in commands: entered_commands.append(command) try: responses.append(self._send_command(command)) except CommandError as e: raise CommandListError(entered_commands, command, e.cli_error_msg) return responses @property def startup_config(self): return self.show("show startup-config") class RebootSignal(NTCError): pass PKce|Mv9!!pyntc/devices/jnpr_device.pyimport os import re import time import hashlib from tempfile import NamedTemporaryFile from jnpr.junos import Device as JunosNativeDevice from jnpr.junos.utils.config import Config as JunosNativeConfig from jnpr.junos.utils.fs import FS as JunosNativeFS from jnpr.junos.utils.sw import SW as JunosNativdSW from jnpr.junos.utils.scp import SCP from jnpr.junos.op.ethport import EthPortTable from jnpr.junos.exception import ConfigLoadError from .tables.jnpr.loopback import LoopbackTable from .base_device import BaseDevice, fix_docs from pyntc.errors import CommandError, CommandListError, RebootTimeoutError from .system_features.file_copy.base_file_copy import FileTransferError @fix_docs class JunosDevice(BaseDevice): def __init__(self, host, username, password, *args, **kwargs): super(JunosDevice, self).__init__( host, username, password, *args, vendor="juniper", device_type="juniper_junos_netconf", **kwargs ) self.native = JunosNativeDevice(*args, host=host, user=username, passwd=password, **kwargs) self.open() self.cu = JunosNativeConfig(self.native) self.fs = JunosNativeFS(self.native) self.sw = JunosNativdSW(self.native) def _file_copy_local_file_exists(self, filepath): return os.path.isfile(filepath) def _file_copy_local_md5(self, filepath, blocksize=2 ** 20): if self._file_copy_local_file_exists(filepath): m = hashlib.md5() with open(filepath, "rb") as f: buf = f.read(blocksize) while buf: m.update(buf) buf = f.read(blocksize) return m.hexdigest() def _file_copy_remote_md5(self, filename): return self.fs.checksum(filename) def _get_interfaces(self): eth_ifaces = EthPortTable(self.native) eth_ifaces.get() loop_ifaces = LoopbackTable(self.native) loop_ifaces.get() ifaces = eth_ifaces.keys() ifaces.extend(loop_ifaces.keys()) return ifaces def _image_booted(self, image_name, **vendor_specifics): raise NotImplementedError def _uptime_components(self, uptime_full_string): match_days = re.search(r"(\d+) days?", uptime_full_string) match_hours = re.search(r"(\d+) hours?", uptime_full_string) match_minutes = re.search(r"(\d+) minutes?", uptime_full_string) match_seconds = re.search(r"(\d+) seconds?", uptime_full_string) days = int(match_days.group(1)) if match_days else 0 hours = int(match_hours.group(1)) if match_hours else 0 minutes = int(match_minutes.group(1)) if match_minutes else 0 seconds = int(match_seconds.group(1)) if match_seconds else 0 return days, hours, minutes, seconds def _uptime_to_seconds(self, uptime_full_string): days, hours, minutes, seconds = self._uptime_components(uptime_full_string) seconds += days * 24 * 60 * 60 seconds += hours * 60 * 60 seconds += minutes * 60 return seconds def _uptime_to_string(self, uptime_full_string): days, hours, minutes, seconds = self._uptime_components(uptime_full_string) return "%02d:%02d:%02d:%02d" % (days, hours, minutes, seconds) def _wait_for_device_reboot(self, timeout=3600): start = time.time() while time.time() - start < timeout: try: self.open() return except: pass raise RebootTimeoutError(hostname=self.facts["hostname"], wait_time=timeout) def backup_running_config(self, filename): with open(filename, "w") as f: f.write(self.running_config) def checkpoint(self, filename): self.save(filename) def close(self): if self.connected: self.native.close() def config(self, command, format="set"): try: self.cu.load(command, format=format) self.cu.commit() except ConfigLoadError as e: raise CommandError(command, e.message) def config_list(self, commands, format="set"): try: for command in commands: self.cu.load(command, format=format) self.cu.commit() except ConfigLoadError as e: raise CommandListError(commands, command, e.message) @property def connected(self): return self.native.connected @property def facts(self): if self._facts is None: native_facts = self.native.facts try: native_uptime_string = native_facts["RE0"]["up_time"] except (AttributeError, TypeError): native_uptime_string = None self._facts = { "hostname": native_facts.get("hostname"), "fqdn": native_facts.get("fqdn"), "model": native_facts.get("model"), "uptime": None, "uptime_string": None, "serial_number": native_facts.get("serialnumber"), "interfaces": self._get_interfaces(), "vendor": self.vendor, "version": native_facts.get("version"), } # TODO: Use a more reliable method for determining uptime (show system uptime) if native_uptime_string is not None: self._facts["uptime"] = self._uptime_to_seconds(native_uptime_string) self._facts["uptime_string"] = self._uptime_to_string(native_uptime_string) return self._facts def file_copy(self, src, dest=None, **kwargs): if not self.file_copy_remote_exists(src, dest, **kwargs): if dest is None: dest = os.path.basename(src) with SCP(self.native) as scp: scp.put(src, remote_path=dest) if not self.file_copy_remote_exists(src, dest, **kwargs): raise FileTransferError( message="Attempted file copy, but could not validate file existed after transfer" ) # TODO: Make this an internal method since exposing file_copy should be sufficient def file_copy_remote_exists(self, src, dest=None, **kwargs): if dest is None: dest = os.path.basename(src) local_hash = self._file_copy_local_md5(src) remote_hash = self._file_copy_remote_md5(dest) if local_hash is not None and local_hash == remote_hash: return True return False def get_boot_options(self): return self.facts["os_version"] def install_os(self, image_name, **vendor_specifics): raise NotImplementedError def open(self): if not self.connected: self.native.open() def reboot(self, timer=0, confirm=False): if confirm: self.sw.reboot(in_min=timer) else: print("Need to confirm reboot with confirm=True") def rollback(self, filename): self.native.timeout = 60 temp_file = NamedTemporaryFile() with SCP(self.native) as scp: scp.get(filename, local_path=temp_file.name) self.cu.load(path=temp_file.name, format="text", overwrite=True) self.cu.commit() temp_file.close() self.native.timeout = 30 @property def running_config(self): return self.show("show config") def save(self, filename=None): if filename is None: self.cu.commit() return temp_file = NamedTemporaryFile() temp_file.write(self.show("show config")) temp_file.flush() with SCP(self.native) as scp: scp.put(temp_file.name, remote_path=filename) temp_file.close() return True def set_boot_options(self, sys): raise NotImplementedError def show(self, command, raw_text=True): if not raw_text: raise ValueError( 'Juniper only supports raw text output. \ Append " | display xml" to your commands for a structured string.' ) if not command.startswith("show"): raise CommandError(command, 'Juniper "show" commands must begin with "show".') return self.native.cli(command, warning=False) def show_list(self, commands, raw_text=True): responses = [] for command in commands: responses.append(self.show(command, raw_text=raw_text)) return responses @property def startup_config(self): return self.show("show config") PKce|M+3YYpyntc/devices/nxos_device.py"""Module for using an NXOX device over NX-API. """ import os import re import time from pyntc.data_model.converters import strip_unicode from .system_features.file_copy.base_file_copy import FileTransferError from .base_device import BaseDevice, RollbackError, RebootTimerError, fix_docs from pyntc.errors import CommandError, CommandListError, NTCFileNotFoundError, RebootTimeoutError, OSInstallError from pynxos.device import Device as NXOSNative from pynxos.features.file_copy import FileTransferError as NXOSFileTransferError from pynxos.errors import CLIError @fix_docs class NXOSDevice(BaseDevice): def __init__(self, host, username, password, transport="http", timeout=30, port=None, **kwargs): super(NXOSDevice, self).__init__(host, username, password, vendor="cisco", device_type="cisco_nxos_nxapi") self.transport = transport self.timeout = timeout self.native = NXOSNative(host, username, password, transport=transport, timeout=timeout, port=port) def _image_booted(self, image_name, **vendor_specifics): version_data = self.show("show version", raw_text=True) if re.search(image_name, version_data): return True return False def _wait_for_device_reboot(self, timeout=600): start = time.time() while time.time() - start < timeout: try: self.refresh_facts() if self.facts["uptime"] < 180: return except: pass raise RebootTimeoutError(hostname=self.facts["hostname"], wait_time=timeout) def backup_running_config(self, filename): self.native.backup_running_config(filename) def checkpoint(self, filename): return self.native.checkpoint(filename) def close(self): pass def config(self, command): try: self.native.config(command) except CLIError as e: raise CommandError(command, str(e)) def config_list(self, commands): try: self.native.config_list(commands) except CLIError as e: raise CommandListError(commands, e.command, str(e)) @property def facts(self): if self._facts is None: if hasattr(self.native, "_facts"): del self.native._facts self._facts = strip_unicode(self.native.facts) self._facts["vendor"] = self.vendor return self._facts def file_copy(self, src, dest=None, file_system="bootflash:"): if not self.file_copy_remote_exists(src, dest, file_system): dest = dest or os.path.basename(src) try: file_copy = self.native.file_copy(src, dest, file_system=file_system) if not self.file_copy_remote_exists(src, dest, file_system): raise FileTransferError( message="Attempted file copy, but could not validate file existed after transfer" ) return file_copy except NXOSFileTransferError as e: print(str(e)) raise FileTransferError # TODO: Make this an internal method since exposing file_copy should be sufficient def file_copy_remote_exists(self, src, dest=None, file_system="bootflash:"): dest = dest or os.path.basename(src) return self.native.file_copy_remote_exists(src, dest, file_system=file_system) def get_boot_options(self): return self.native.get_boot_options() def install_os(self, image_name, **vendor_specifics): timeout = vendor_specifics.get("timeout", 3600) if not self._image_booted(image_name): self.set_boot_options(image_name, **vendor_specifics) self._wait_for_device_reboot(timeout=timeout) if not self._image_booted(image_name): raise OSInstallError(hostname=self.facts.get("hostname"), desired_boot=image_name) self.save() return True return False def open(self): pass def reboot(self, confirm=False, timer=0): if timer != 0: raise RebootTimerError(self.device_type) self.native.reboot(confirm=confirm) def rollback(self, filename): try: self.native.rollback(filename) except CLIError: raise RollbackError("Rollback unsuccessful, %s may not exist." % filename) @property def running_config(self): return self.native.running_config def save(self, filename="startup-config"): return self.native.save(filename=filename) def set_boot_options(self, image_name, kickstart=None, **vendor_specifics): file_system = vendor_specifics.get("file_system") if file_system is None: file_system = "bootflash:" file_system_files = self.show("dir {0}".format(file_system), raw_text=True) if re.search(image_name, file_system_files) is None: raise NTCFileNotFoundError(hostname=self.facts.get("hostname"), file=image_name, dir=file_system) if kickstart is not None: if re.search(kickstart, file_system_files) is None: raise NTCFileNotFoundError(hostname=self.facts.get("hostname"), file=kickstart, dir=file_system) kickstart = file_system + kickstart image_name = file_system + image_name self.native.timeout = 300 upgrade_result = self.native.set_boot_options(image_name, kickstart=kickstart) self.native.timeout = 30 return upgrade_result def set_timeout(self, timeout): self.native.timeout = timeout def show(self, command, raw_text=False): try: return strip_unicode(self.native.show(command, raw_text=raw_text)) except CLIError as e: raise CommandError(command, str(e)) def show_list(self, commands, raw_text=False): try: return strip_unicode(self.native.show_list(commands, raw_text=raw_text)) except CLIError as e: raise CommandListError(commands, e.command, str(e)) @property def startup_config(self): return self.show("show startup-config", raw_text=True) PKce|M)pyntc/devices/system_features/__init__.pyPKce|Mw-pyntc/devices/system_features/base_feature.pyclass BaseFeature(object): def config(self, vlan_id, **params): raise NotImplementedError def get_all(self): raise NotImplementedError def get(self, vlan_id): raise NotImplementedError def get_list(self): raise NotImplementedError PKce|M3pyntc/devices/system_features/file_copy/__init__.pyPKce|ME>9pyntc/devices/system_features/file_copy/base_file_copy.pyfrom ..base_feature import BaseFeature from pyntc.errors import NTCError class BaseFileCopy(BaseFeature): pass class FileTransferError(NTCError): def __init__(self, message=None): if message is None: message = ( "An error occurred during transfer. " "Please make sure the local file exists and " "that appropriate permissions are set on the remote device." ) super(FileTransferError, self).__init__(message) PKce|M] 8pyntc/devices/system_features/file_copy/eos_file_copy.pyimport paramiko import os import hashlib import re from scp import SCPClient from .base_file_copy import BaseFileCopy, FileTransferError class EOSFileCopy(BaseFileCopy): def __init__(self, device, local, remote=None, port=22): self.device = device self.local = local self.remote = remote or os.path.basename(local) self.port = port def already_transfered(self): remote_hash = self.get_remote_md5() local_hash = self.get_local_md5() if local_hash is not None: if local_hash == remote_hash: return True return False def enough_remote_space(self): remote_size = self.get_remote_size() file_size = os.path.getsize(self.local) if file_size > remote_size: return False return True def get(self): self.transfer_file(pull=True) def get_local_md5(self, blocksize=2 ** 20): if self.local_file_exists(): m = hashlib.md5() with open(self.local, "rb") as f: buf = f.read(blocksize) while buf: m.update(buf) buf = f.read(blocksize) return m.hexdigest() def get_remote_md5(self): try: hash_out = self.device.show("verify /md5 {}".format(self.remote), raw_text=True) hash_out = hash_out.split("=")[1].strip() return hash_out except: return None def get_remote_size(self): dir_out = self.device.show("dir", raw_text=True) match = re.search(r"(\d+) bytes free", dir_out) bytes_free = match.group(1) return int(bytes_free) def local_file_exists(self): return os.path.isfile(self.local) def remote_file_exists(self): try: self.device.show("dir {}".format(self.remote)) except: return False return True def send(self): self.transfer_file() def transfer_file(self, pull=False): if pull is False: if not self.local_file_exists(): raise FileTransferError("Could not transfer file. Local file doesn't exist.") if not self.enough_remote_space(): raise FileTransferError("Could not transfer file. Not enough space on device.") ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect( hostname=self.device.host, username=self.device.username, password=self.device.password, port=self.port, allow_agent=False, look_for_keys=False, ) scp = SCPClient(ssh.get_transport(), socket_timeout=30.0) try: if pull: scp.get(self.remote, self.local) else: scp.put(self.local, self.remote) except Exception as e: raise FileTransferError finally: scp.close() return True PKce|M/pyntc/devices/system_features/vlans/__init__.pyPKce|M31pyntc/devices/system_features/vlans/base_vlans.pyfrom ..base_feature import BaseFeature from pyntc.errors import NTCError def vlan_not_in_range_error(vlan_id, lower=1, upper=4094): vlan_id = int(vlan_id) if vlan_id < lower or vlan_id > upper: raise VlanNotInRangeError(lower, upper) class BaseVlans(BaseFeature): pass class VlanNotInRangeError(NTCError): def __init__(self, lower, upper): super(VlanNotInRangeError, self).__init__("Vlan Id must be in range %s-%s" % (lower, upper)) PKce|MVp0pyntc/devices/system_features/vlans/eos_vlans.pyfrom .base_vlans import BaseVlans, vlan_not_in_range_error from pyntc.data_model.key_maps.eos_key_maps import VLAN_KM from pyntc.data_model.converters import convert_dict_by_key, convert_list_by_key, strip_unicode def instance(device): return EOSVlans(device) class EOSVlans(BaseVlans): def __init__(self, device): self.native_vlans = device.native.api("vlans") # def config(self, vlan_id, **params): # vlan_not_in_range_error(vlan_id) # # self.native_vlans.create(vlan_id) # vlan_name = params.get('name') # if vlan_name: # self.native_vlans.set_name(vlan_id, vlan_name) def get(self, vlan_id): vlan_not_in_range_error(vlan_id) vlan_id = str(vlan_id) native_vlan_response = self.native_vlans.get(vlan_id) converted = convert_dict_by_key(native_vlan_response, VLAN_KM) converted["id"] = vlan_id return strip_unicode(converted) # def get_all(self): # native_all_vlan_response = self.native_vlans.getall() # detailed_vlan_list = convert_list_by_key(native_all_vlan_response.values(), VLAN_KM) # # return strip_unicode(detailed_vlan_list) def get_list(self): native_all_vlan_response = self.native_vlans.getall() extracted_vlan_ids = sorted(list(native_all_vlan_response.keys())) return strip_unicode(extracted_vlan_ids) def remove(self, vlan_id): vlan_not_in_range_error(vlan_id) self.native_vlans.delete(vlan_id) # def set_name(self, vlan_id, vlan_name, default=False, disable=False): # vlan_not_in_range_error(vlan_id) # # self.native_vlans.set_name(vlan_id, vlan_name, default=default, disable=disable) PKce|M pyntc/devices/tables/__init__.pyPKce|M%pyntc/devices/tables/jnpr/__init__.pyPKce|M+ξ%pyntc/devices/tables/jnpr/loopback.py""" Pythonifier for Loopback Table/View """ from jnpr.junos.factory import loadyaml from os.path import splitext _YAML_ = splitext(__file__)[0] + ".yml" globals().update(loadyaml(_YAML_)) PKce|M>n~&pyntc/devices/tables/jnpr/loopback.yml--- LoopbackTable: rpc: get-interface-information args: media: True interface_name: 'lo*' args_key: interface_name item: physical-interface view: LoopbackView LoopbackView: groups: mac_stats: ethernet-mac-statistics flags: if-device-flags fields: oper: oper-status admin: admin-status description: description mtu: { mtu : int } link_mode: link-mode macaddr: current-physical-address fields_mac_stats: rx_bytes: input-bytes rx_packets: input-packets tx_bytes: output-bytes tx_packets: output-packets fields_flags: running: { ifdf-running: flag } present: { ifdf-present: flag } PKce|MӷSpyntc/templates/__init__.pyimport os import textfsm TEMPLATE_PATH_ENV_VAR = "NTC_TEMPLATES" def get_structured_data(template_name, rawtxt): """Returns structured data given raw text using TextFSM templates """ template_file = get_template(template_name) with open(template_file) as template: fsm = textfsm.TextFSM(template) table = fsm.ParseText(rawtxt) structured_data = [] for row in table: temp_dict = {} for index, element in enumerate(row): temp_dict[fsm.header[index].lower()] = element structured_data.append(temp_dict) return structured_data def get_template(template_name): template_dir = get_template_dir() return os.path.join(template_dir, template_name) def get_template_dir(): try: return os.environ[TEMPLATE_PATH_ENV_VAR] except KeyError: return os.path.realpath(os.path.dirname(__file__)) PKce|Mn%n1pyntc/templates/cisco_asa_show_interface.templateValue Required INTERFACE (\S+) Value INTERFACE_ZONE (.+?) Value LINK_STATUS (\w+) Value PROTOCOL_STATUS (.*) Value HARDWARE_TYPE ([\w ]+) Value BANDWIDTH (\d+\s+\w+) Value DELAY (\d+\s+\w+) Value DUPLEX (\w+\-\w+) Value SPEED (\d+\w+\s\w+) Value DESCRIPTION (.*) Value ADDRESS ([a-zA-Z0-9]+.[a-zA-Z0-9]+.[a-zA-Z0-9]+) Value MTU (\d+) Value IP_ADDRESS (\d+\.\d+\.\d+\.\d+) Value NET_MASK (\d+\.\d+\.\d+\.\d+) Value ONEMIN_IN_PPS (\d+) Value ONEMIN_IN_RATE (\d+) Value ONEMIN_OUT_PPS (\d+) Value ONEMIN_OUT_RATE (\d+) Value ONEMIN_DROP_RATE (\d+) Value FIVEMIN_IN_PPS (\d+) Value FIVEMIN_IN_RATE (\d+) Value FIVEMIN_OUT_PPS (\d+) Value FIVEMIN_OUT_RATE (\d+) Value FIVEMIN_DROP_RATE (\d+) Start ^.*Interface ${INTERFACE} "${INTERFACE_ZONE}", is ${LINK_STATUS}.*protocol is ${PROTOCOL_STATUS} ^\s+Hardware is ${HARDWARE_TYPE} -> Continue ^.*BW ${BANDWIDTH}.*DLY ${DELAY} ^.*\(${DUPLEX}.*Auto-Speed\(${SPEED}\) ^.*Description: ${DESCRIPTION} ^.*MAC address ${ADDRESS}.*MTU ${MTU} ^.*IP address ${IP_ADDRESS}, .*subnet mask ${NET_MASK} ^.*1 minute input rate ${ONEMIN_IN_PPS} pkts/sec,\s+${ONEMIN_IN_RATE} bytes/sec ^.*1 minute output rate ${ONEMIN_OUT_PPS} pkts/sec,\s+${ONEMIN_OUT_RATE} bytes/sec ^.*1 minute drop rate, ${ONEMIN_DROP_RATE} ^.*5 minute input rate ${FIVEMIN_IN_PPS} pkts/sec,\s+${FIVEMIN_IN_RATE} bytes/sec ^.*5 minute output rate ${FIVEMIN_OUT_PPS} pkts/sec,\s+${FIVEMIN_OUT_RATE} bytes/sec ^.*5 minute drop rate, ${FIVEMIN_DROP_RATE} -> RecordPKce|M9=kk/pyntc/templates/cisco_asa_show_version.templateValue VERSION (\S+) Value DEVICE_MGR_VERSION (\S+) Value IMAGE (\S+) Value HOSTNAME (\S+) Value UPTIME (.+) Value HARDWARE (.+) Value MODEL (\S+) Value FLASH (\S+) Value List INTERFACES (\S+) Value LICENSE_MODE (.+) Value LICENSE_STATE (.+) Value MAX_INTF (\d+) Value MAX_VLANS (\d+) Value FAILOVER (\S+) Value CLUSTER (\S+) Value SERIAL (\S+) Value LAST_MOD (.+) Start ^.*Software\sVersion\s${VERSION} ^Device.+\s${DEVICE_MGR_VERSION} ^System image file.+"${IMAGE}" ^${HOSTNAME} up ${UPTIME} ^Hardware:\s+${HARDWARE}, ^Model Id:\s+${MODEL} ^Internal.+Flash,\s${FLASH} ^ \d+:.\S+\s${INTERFACES}.* ^License mode:\s${LICENSE_MODE} ^.+License State:\s${LICENSE_STATE} ^Maximum Physical.+:\s${MAX_INTF} ^Maximum VLANs.+:\s${MAX_VLANS} ^Failover\s+:\s${FAILOVER} ^Cluster\s+:\s${CLUSTER} ^Serial Number:\s${SERIAL} ^.+last modified by\s${LAST_MOD}PKce|MͲ4pyntc/templates/cisco_ios_show_ip_int_brief.templateValue INTF (\S+) Value IPADDR (\S+) Value STATUS (up|down|administratively down) Value PROTO (up|down) Start ^${INTF}\s+${IPADDR}\s+\w+\s+\w+\s+${STATUS}\s+${PROTO} -> Record PKce|M\U/pyntc/templates/cisco_ios_show_version.templateValue VERSION (\d+\.\d+(.+).*) Value HOSTNAME (\S+) Value UPTIME (.+) Value RUNNING_IMAGE (\S+) Value HARDWARE ((WS-C\S+)|(\d+)|(CSR\S+)) Value SERIAL (\S+) Value CONFIG_REGISTER (\S+) Start ^.*Software\s.+\),\sVersion\s${VERSION}, RELEASE.* ^${HOSTNAME}\s+uptime\s+is\s+${UPTIME} ^[sS]ystem\s+image\s+file\s+is\s+".*flash:${RUNNING_IMAGE}" ^[Cc]isco\s+${HARDWARE}.+ ^[Ss]ystem serial number\s+:\s+${SERIAL} ^[Cc]onfiguration\s+register\s+is\s+${CONFIG_REGISTER} -> Record PKce|M!,pyntc/templates/cisco_ios_show_vlan.templateValue VLAN_ID (\d+) Value NAME (\S+) Value STATUS (\S+) Start ^VLAN\s+Type -> Done ^${VLAN_ID}\s+${NAME}\s+${STATUS} -> Record Done PK!Hd BUcpyntc-0.0.9.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,rzd&Y)r$[)T&UD"PK!H{$ pyntc-0.0.9.dist-info/METADATAYms6_3'cQ/DsΝ#ˍDN^#R$d![ ҶHݦX.}vx$2%xć΀MHxsU}dEyjmXE9_$y..c{xs7I$w-yQw#e1w$"O<@ >V8qzӔ2F|jy1xWD GKPGLdswĹ*\<=xw$1 ~-d&TD|3|*WΡߜ7gE^dbsvE6N{t|?$؛aosODxnGh]h{8d[wUp9:7ԶKVx8Q][o(?eYr' Mq'Nbg_2 e兺npНT]_:Ivbuwy."OXylv*G&L}LUo7Oi{Zzƾgq%Aӏ1-ť^̓T\%E0;ϋ$桄lsڛR-LQp(tnQZ0r҆[3ř(=G2)ޖ:;s v-ő1O 0?P..Ń r8Z{VJq#HʋI'2fczy"k Io^#מ]Bc)E&Y.~z9Ba{>Oi9 ]B<./N%: a+b1q.CJ-b~'^oP3OOu5.P8*\a0$ sM|0bu]Ox*Sahv8>1ǻB_hʡU s?DcMp0h4V"/R']6!i0[@4;{ojՖù^de*'a ߂qB3&ז=y[@8S\2C!}ZHY#B+(Խ!jܓg)`ycNuӫ똸z Djv{C\˙H /^YHvɐXh/]={gpg-| gIj5͈hut`%#0ͩP8AAXNɝ~+LzSOS9SdfJ-ϔA{lG;fIԙ3 DT=MFp=78٫QOF| ?|:O뷓?=K̄9dltG-h d8j!^Lg ;y0PlVt:l2~Q)=BYmoO'?v&~PMg褺*}Zq&8@“YB(uSPI]Z\sҟ*!Rd%&lERjz X 8 l\JOVKk RINq`ol /إɕaSU`6nB-l aLs7`:K9k#M3@7MՒ#7pþs Y՟ ~]sd֣1[xpx ΋QGwã_fC%?ZB`2e#9P?1_u7ǟ,ꉥ f|K/mGM0ߤ]3]gZ E.F1os-дVRV觐vWhjjZ h sE@X$͌i&9=P;O0: B)ΰzjA/%%򸱨ԝF#gSH B.[Z_DQs67YA7ݜEکZwTm٭-:TT݃s/AGdu7{fҚ^v yʶހ=>x1G˜ؒ=s ¨e*15M<}MtTE?p97cc뺳ORW&=UԞ%KXY̊t=ڙ 7 )o#6:-%ۍ eT>䎌MC6Kf;u$ӸJkƭw`V@3ԦpU^N_W RHD,}?E@J"f(\ؼ! a̷1җ4q3FõnƱ L(NS3NXe5߶"j#JVMбĿ^NkUezJ=:Dqo5j2D@,Ra1O>wۤ"igm3HaDTb{ZTy!! j:r eC?ۼ6BA_3~%{~@m>zf3=I;oS+E8c<Q߭_=7ʢSDXfzv)njIӤڗ 8[s\RuW&sCo`=l 2n{ZE zMo-uD?omo-,k#6^|:LNvg^'OPK!H| pyntc-0.0.9.dist-info/RECORDɒF@/@bhB}כiKjܨʬwLzR@ws84P4{/Ȼ`B@hJ ۶jj@RՏvJmD7ݻA(NEXn_ A]0FrN)5]б M8B#+p]g[Do W}@c\E@\%GlYmx8)XeF(fnêsjtBTI̅U&!oS 'H^ ;@P;kSТyB-+I«'މE" ME9? npu3 k}_}PlݦEea(\G:beXDXGIW).l!Ȃ%9 DMbn:\o-9 -;i?CԿW>Kɮ_4.,U<6!|F}V?9rVYN r9Srԑ]ڤދ }_ϫȡ]Zhbc#T&)w9Ќ""yoG)?!jpżCmIL-=dS<]۾MҖ(F>fgp۟PKAr|M!be e pyntc/__init__.pyPKce|M2k k  pyntc/errors.pyPKce|M,pyntc/data_model/__init__.pyPKce|M4^O fpyntc/data_model/converters.pyPKce|M%Z#pyntc/data_model/key_maps/__init__.pyPKce|MVK)#pyntc/data_model/key_maps/eos_key_maps.pyPKce|Mui)j%pyntc/data_model/key_maps/ios_key_maps.pyPKce|MSqqQ&pyntc/devices/__init__.pyPKce|M`+b00(pyntc/devices/asa_device.pyPKce|MN 11Ypyntc/devices/base_device.pyPKce|M~##pyntc/devices/eos_device.pyPKce|M9hpyntc/devices/system_features/file_copy/base_file_copy.pyPKce|M] 8kpyntc/devices/system_features/file_copy/eos_file_copy.pyPKce|M/Rwpyntc/devices/system_features/vlans/__init__.pyPKce|M31wpyntc/devices/system_features/vlans/base_vlans.pyPKce|MVp0ypyntc/devices/system_features/vlans/eos_vlans.pyPKce|M pyntc/devices/tables/__init__.pyPKce|M%Epyntc/devices/tables/jnpr/__init__.pyPKce|M+ξ%pyntc/devices/tables/jnpr/loopback.pyPKce|M>n~&pyntc/devices/tables/jnpr/loopback.ymlPKce|MӷSbpyntc/templates/__init__.pyPKce|Mn%n19pyntc/templates/cisco_asa_show_interface.templatePKce|M9=kk/Tpyntc/templates/cisco_asa_show_version.templatePKce|MͲ4 pyntc/templates/cisco_ios_show_ip_int_brief.templatePKce|M\U/pyntc/templates/cisco_ios_show_version.templatePKce|M!,Cpyntc/templates/cisco_ios_show_vlan.templatePK!Hd BUcpyntc-0.0.9.dist-info/WHEELPK!H{$ pyntc-0.0.9.dist-info/METADATAPK!H| tpyntc-0.0.9.dist-info/RECORDPK$$ `