PK$OH#i MMnetlink/__init__.pyimport collections import errno import ipaddress import platform import socket import warnings import netlink.low.netif import netlink.low.netlink import netlink.low.ethtool # In case somebody tries this… if platform.system() != "Linux": warnings.warn("This is a Linux-only library, running it on another platform will very likely fail") class NetworkDevice: def __init__(self, index, name, network): self._index = index self._name = name self._network = network self._low_netlink = network._low_netlink self._low_ethtool = netlink.low.ethtool.ETHTool("", network._low_netif) def __eq__(self, device): if not isinstance(device, self.__class__): return hash(device) == self.index elif self._index is not None: return device._index == self._index else: return device._name == self._name def __hash__(self): return self.index @property def index(self): if self._index is None: self._index = self._network.device_name2index(self._name) return self._index @property def name(self): if self._name is None: self._name = self._network.device_index2name(self._index) return self._name def add_address(self, address, *args, **kwargs): """ Add a new IP address to this interface Parameters ---------- address : `str` | :func:`ipaddress.ip_interface` .. _device-add-address-address: An IP address (optionally) including the network prefix The string must be in the form of ``address/prefix_len``. Where ``address`` is an IP address in the form of ``a.b.c.d`` or ``a:b:c:d:e::f`` and ``prefix_len`` is the number of network address bits. If ``prefix_len`` is omitted then a prefix length of 32 bits for IPv4 and 128 bits for IPv6 will be assumed. flags : collections.abc.Sequence .. _device-add-address-flags: A list of flags to enable for this address Currently the following flags are known: * ``'HomeAddress'``: (IPv6 only) Designate this address the "home address" (as defined in :rfc:`6275#section-6.3`). * ``'ManageTempAddr'``: (IPv6 only) Make the kernel manage temporary addresses created from this one as template on behalf of Privacy Extensions (:rfc:`3041`). For this to become active, the ``use_tempaddr`` sysctl setting has to be set to a value greater than zero and the given address needs to have a prefix length of exactly ``64``. This flag allows to use privacy extensions in a manually configured network, just like if stateless auto-configuration was active. * ``'NoDAD'``: (IPv6 only) Do not perform Duplicate Address Detection (:rfc:`4862#section-5.4`). * ``'NoPrefixRoute'``: Do not automatically create a route for the network prefix of the added address, and don't search for one to delete when removing the address. Modifying an address to add this flag will remove the automatically added prefix route, modifying it to remove this flag will create the prefix route automatically. scope : int|str .. _device-add-address-scope: The "distance" from this host to the next host on the network represented by `address` If this is a string is must have one of the following values: * ``'Universe'``: (``0``) Address points to some host far, far away (default) * ``'Site'``: (``200``) Address points to a host that is located somewhere on the same network area (like a server in the next room of the same data center) * ``'Link'``: (``253``) Address points to a host that is directly connected to this network host (using this network link) * ``'Host'``: (``254``) Address points to this host (loopback address) * ``'Nowhere'``: (``255``) Reserved for non-existing destinations Intermediate (numeric) values, that are not listed here, are also possible. This can be used, for instance, to declare interior routes between ``'Universe'`` and ``'Link'``. peer : `str` | :func:`ipaddress.ip_address` .. _device-add-address-peer: The address of the remote endpoint for point-to-point interfaces broadcast : `str` | :func:`ipaddress.ip_address` .. _device-add-address-broadcast: The broadcast address on the interface The broadcast address is the address used when your host wants to comunicate with all devices on the local network. anycast : `str` | :func:`ipaddress.ip_address` .. _device-add-address-anycast: label : str .. _device-add-address-label: Custom label that may be used to tag individual addresses The label may only be up to 16 characters in length. valid_lft : int .. _device-add-address-valid-lft: The valid lifetime (in seconds) of this address When this value reaches ``0``, then this address is removed by the kernel. For details on lifetime handling please read :rfc:`4862#section-5.5.4` preferred_lft : int .. _device-add-address-preferred-lft: The preferred lifetime (in seconds) of this address When this value reaches ``0``, then this address won't be used for any new outgoing connections. This value must always be smaller than ``valid_lft``. For details on lifetime handling please read :rfc:`4862#section-5.5.4`. """ self._change_address('NewAddr', ('CREATE', 'EXCL'), address, *args, **kwargs) def modify_address(self, address, *args, create = False, **kwargs): """ Modify the flags of an existing IP address Parameters ---------- create : bool Add a new address to this interface, if no matching address exists See :meth:`~netlink.NetworkDevice.add_address` for a description of other possible parameters. """ nl_flags = ('CREATE', 'REPLACE') if create else ('REPLACE',) self._change_address('NewAddr', nl_flags, address, *args, **kwargs) def remove_address(self, address): """ Remove the given IP address from this interface Parameters ---------- address : `str` | :func:`ipaddress.ip_interface` An IP address (optionally) including the network prefix The string must be in the form of ``address/prefix_len``. Where ``address`` is an IP address in the form of ``a.b.c.d`` or ``a:b:c:d:e::f`` and ``prefix_len`` is the number of network address bits. If ``prefix_len`` is omitted then a prefix length of 32 bits for IPv4 and 128 bits for IPv6 will be assumed. """ self._change_address('DelAddr', (), address) def _change_address(self, nl_cmd, nl_flags, address, flags = (), scope = 0, peer = None, broadcast = None, anycast = None, label = None, valid_lft = 0xFFFFFFFF, preferred_lft = 0xFFFFFFFF): sock = self._low_netlink # Parse given IP network interface address address = ipaddress.ip_interface(address) family = socket.AF_INET if address.version == 4 else socket.AF_INET6 # Create message header message = netlink.low.netlink.AddressRequest(sock, nl_cmd, nl_flags + ('REQUEST',)) message.add_header(family, self.index, address.network.prefixlen, flags, scope) message.add_attribute('LOCAL', address.packed) # Set peer address (must be the same as `address` if not set to something else) if peer: peer = ipaddress.ip_address(peer) assert address.version == peer.version message.add_attribute('ADDRESS', peer.packed) else: message.add_attribute('ADDRESS', address.packed) # Set broadcast address if broadcast: broadcast = ipaddress.ip_address(broadcast) assert address.version == broadcast.version message.add_attribute('BROADCAST', broadcast.packed) # Set anycast address if anycast: anycast = ipaddress.ip_address(anycast) assert address.version == anycast.version message.add_attribute('ANYCAST', anycast.packed) # Set label address if label: label = label.encode('utf-8') assert len(label) <= 16 message.add_attribute('LABEL', label) # Set cache information (maximum/valid and preferred lifetime) if valid_lft < 0xFFFFFFFF or preferred_lft < 0xFFFFFFFF: assert preferred_lft <= valid_lft, "preferred_lft is greater than valid_lft" message.add_attribute('CACHEINFO', {"preferred": preferred_lft, "valid": valid_lft}) message.send() def get_interface(self): """ """ request = netlink.low.netlink.InterfaceRequest(self._low_netlink, 'GetLink', ('Request',)) request.add_header(socket.AF_INET, self.index) (message, attributes) = request.send()[0] return self._process_interface(message, attributes) _BROKEN_LINK_ATTRIBUTES = ( 'WIRELESS', 'MAP', 'LINKINFO', 'LINKMODE', 'STATS', 'TXQLEN', 'NUM_VF', 'VFINFO_LIST', 'STATS64', 'VF_PORTS', 'PORT_SELF', 'CARRIER', 'CARRIER_CHANGES' ) #XXX: _BROKEN_LINK_ATTRIBUTES = () @classmethod def _process_interface(self, message, attributes): # Add message properties as interface attributes attributes['FAMILY'] = message.family attributes['INDEX'] = message.index attributes['TYPE'] = message.TYPE.find(message.type) attributes['FLAGS'] = message.FLAG.find_bitmask(message.flags) # Remove all attributes that we cannot parse properly yet # (We don't want to expose half-broken stuff in the high-level API.) for name in list(attributes.keys()): if not isinstance(name, str) or name in self._BROKEN_LINK_ATTRIBUTES: del attributes[name] return attributes def get_addresses(self): """ Obtain all network addresses associated with this interface Returns a dictionary of network addresses and a dictionary of attributes of each address. **List of known attributes:** * ``'ADDRESS'``: (*str*) – The network address itself * ``'LOCAL'``: (*str*) – The local network address Unless this device is connected to a network tunnel this will always be the same as the value in ``'ADDRESS'``. * ``'PREFIXLEN'`` (*int*) – The length of the common network prefix in bits * ``'FAMILY'`` (*int*) – The address family value as integer Use the ``AF_*`` constants from the :mod:`socket` module to determine the address family type. * ``'FLAGS'`` (*list*) – Flags set on the interface See the :ref:`flags parameter ` of :meth:`~netlink.NetworkDevice.add_address` for a list of possible values. * ``'INDEX'`` (*int*) – The numerical index of the network device containing this address * ``'SCOPE'`` (*str* | *int*) – Network address *distance* See the :ref:`scope parameter ` of :meth:`~netlink.NetworkDevice.add_address` for details. * ``'LABEL'`` (*str*) – Custom free-text label attached to this address See the :ref:`label parameter ` of :meth:`~netlink.NetworkDevice.add_address` for information. * ``'CACHEINFO'`` (*dict*) – Lifetime related attributes of this address: * ``'preferred'`` (*int*): Number of seconds that this address will be considered new outgoing connections See the :ref:`preferred_lft parameter ` of :meth:`~netlink.NetworkDevice.add_address` for details. * ``'valid'`` (*int*): Number of secons before this address will be removed entirely See the :ref:`valid_lft parameter ` of :meth:`~netlink.NetworkDevice.add_address` for details. * ``'cstamp'`` (*int*) * ``'tstamp'`` (*int*) Example result of using :meth:`~netlink.NetworkDevice.get_addresses` on the loopback interface (``lo``): .. code-block:: python { '127.0.0.1': { 'ADDRESS': '127.0.0.1', 'LOCAL': '127.0.0.1', 'PREFIXLEN': 8, 'FAMILY': 2, 'FLAGS': ['PERMANENT'], 'INDEX': 1, 'SCOPE': 'HOST', 'LABEL': 'lo', 'CACHEINFO': {'cstamp': 602, 'preferred': 4294967295, 'tstamp': 1407340, 'valid': 4294967295} }, '::1': { 'ADDRESS': '::1', 'PREFIXLEN': 128, 'FAMILY': 10, 'FLAGS': ['PERMANENT'], 'INDEX': 1, 'SCOPE': 'HOST', 'CACHEINFO': {'cstamp': 602, 'preferred': 4294967295, 'tstamp': 602, 'valid': 4294967295} } } See the :ref:`print_all example ` for a more elaborate code sample. Returns ------- dict """ sock = self._low_netlink request = netlink.low.netlink.AddressRequest(sock, 'GetAddr', ('Request', 'Dump')) request.add_header(socket.AF_UNSPEC, self.index, 0) request.add_attribute('LOCAL', b'\x7F\x00\x00\x01') response = {} for message, attributes in request.send(): if message.index != self.index or 'ADDRESS' not in attributes: continue attributes = self._process_addresses(message, attributes) response[attributes['ADDRESS']] = attributes return response @classmethod def _process_addresses(self, message, attributes): # Add message properties as interface attributes attributes['FAMILY'] = message.family attributes['INDEX'] = message.index attributes['PREFIXLEN'] = message.prefixlen attributes['SCOPE'] = message.SCOPE.find(message.scope) attributes['FLAGS'] = message.FLAG.find_bitmask(message.flags) # Remove all attributes that we cannot parse properly yet # (We don't want to expose half-broken stuff in the high-level API.) for name in list(attributes.keys()): if not isinstance(name, str): del attributes[name] return attributes def get_statistics(self): """ Retrieve all available statistical information for the associated network interface Returns ------- dict """ try: self._low_ethtool.set_device_name(self.name) # Get list of statistic labels for device strings = self._low_ethtool.get_stringset(netlink.struct.ethtool.SSetInfo.STATS) # Retrieve the actual statistical values from kernel import ctypes class StatsWithBuf(ctypes.Structure): _fields_ = ( ('hdr', netlink.struct.ethtool.Stats), ('buf', ctypes.c_uint64 * len(strings)) ) stats = StatsWithBuf() stats.hdr.cmd = netlink.struct.ethtool.Stats.CMD stats.hdr.n_stats = len(strings) self._low_ethtool.ioctl(stats) return collections.OrderedDict(zip(strings, stats.buf)) except OSError as e: if e.errno == errno.ENOTSUP: # Operation not supported return collections.OrderedDict() else: raise class Network: def __init__(self): self._low_netif = netlink.low.netif.NetworkDeviceSocket() self._low_netlink = netlink.low.netif.RTNetlinkSocket() def __enter__(self): return self def __exit__(self, type, value, traceback): self.close() def close(self): self._low_netif.close() self._low_netlink.close() def device(self, identifier): """ Obtain `NetworkDevice` instance for this network interface client Parameters ---------- identifier : int|str The name or index of the network device Returns ------- :class:`~netlink.NetworkDevice` """ # Convert string network device names to network device indexes if isinstance(identifier, str): return NetworkDevice(None, identifier, self) else: return NetworkDevice(identifier, None, self) def device_index2name(self, index): """ Obtain the name for the network device with the given interface number Parameters ---------- index : int The interface number of the network device Raises ------ OSError System error reported during look up This usually means that there is no device with such number or that the interface does not have a name (since it is in another container for instance). Returns ------- str """ request = self._low_netif.create_request("") request.ifru.ivalue = index # GEIfName: Get Interface Name self._low_netif.ioctl("GIfName", request) return request.ifrn_name.decode("ascii") def device_name2index(self, name): """ Obtain the index of the network device with the given interface name Parameters ---------- name : str The name of the network interface Raises ------ OSError System error reported during look up This usually means that there is no device with such number or that the interface does not have a name (since it is in another container for instance). Returns ------- int """ request = self._low_netif.create_request(name) # GIfIndex: Get Interface Index self._low_netif.ioctl("GIFIndex", request) return request.ifru.ivalue def get_interfaces(self): """ """ sock = self._low_netlink request = netlink.low.netlink.InterfaceRequest(sock, 'GetLink', ('Request', 'Dump')) request.add_header(socket.AF_INET, 0) information = collections.OrderedDict() for message, attributes in request.send(): # Unify network interface information attributes = NetworkDevice._process_interface(message, attributes) if 'IFNAME' not in attributes: continue # Create network device instance device = NetworkDevice(attributes['INDEX'], attributes['IFNAME'], self) # Store network device instance with its attributes information[device] = attributes return information def get_addresses(self): """ Get all network addresses of all devices Returns ------- collections.OrderedDict Dictionary of device objects and addresses *Keys* (:class:`~netlink.NetworkDevice`): :class:`~netlink.NetworkDevice` objects, each representing a network interface *Values* (:class:`collections.OrderedDict`): Network address to attribute mappings as described in the documentation of the :meth:`netlink.NetworkDevice.get_addresses` method """ sock = self._low_netlink request = netlink.low.netlink.AddressRequest(sock, 'GetAddr', ('Request', 'Dump')) request.add_header(socket.AF_UNSPEC, 0, 0) information = collections.OrderedDict() for message, attributes in request.send(): if 'ADDRESS' not in attributes: continue # Look for network device instance for device index referenced in the current message device = None for device in information.keys(): if device.index == message.index: break if not device or device.index != message.index: device = NetworkDevice(message.index, None, self) if device not in information: information[device] = collections.OrderedDict() attributes = NetworkDevice._process_addresses(message, attributes) information[device][attributes['ADDRESS']] = attributes return information def get_interfaces_with_addresses(self): """ Get all network device along with their interface attributes and their addresses This is a convinience method that merges the results of :meth:`~netlink.Network.get_addresses` and :meth:`~netlink.Network.get_interfaces`. This method is a lot more efficient to use when performing look up of both network interface and network address information compared to using :meth:`~netlink.Network.get_interfaces` to obtain the network device list and then calling :meth:`netlink.NetworkDevice.get_addresses` on each result. """ # Obtain all interface information interfaces = self.get_interfaces() # Obtain all known address entries address_info = self.get_addresses() # Insert address list for every interface in its attributes for device, addresses in address_info.items(): interfaces[device]["ADDRESSES"] = addresses # Add address slot in every interface attribute dict for device, attributes in interfaces.items(): if "ADDRESSES" not in attributes: attributes["ADDRESSES"] = collections.OrderedDict() return interfacesPKOH^Qbnetlink/low/ethtool.pyimport ctypes import netlink.struct.ethtool class ETHTool: def __init__(self, device_name, netif): self.netif = netif self.request = netif.create_request(device_name) self._cache = {} def set_device_name(self, device_name): self.request.ifrn_name = device_name.encode('ascii') def ioctl(self, command): """ Send the given ETHTool command to kernel using the """ self.request.ifru.data = ctypes.addressof(command) self.netif.ioctl("ethtool", self.request) def get_stringset(self, set_id): """ Retrieve a list of available strings for the given `set_id` from the kernel """ if set_id not in self._cache: class SSetInfoWithBuf(ctypes.Structure): _fields_ = [ ('hdr', netlink.struct.ethtool.SSetInfo), ('buf', ctypes.c_uint32 * 1), ] sset_info = SSetInfoWithBuf() # Ask kernel for number of strings sset_info.hdr.cmd = netlink.struct.ethtool.SSetInfo.CMD sset_info.hdr.sset_mask = 1 << set_id self.ioctl(sset_info) length = sset_info.buf[0] # Actually retrieve the list of strings from the kernel class GStringsWithBuf(ctypes.Structure): _fields_ = ( ('hdr', netlink.struct.ethtool.GStrings), ('buf', ctypes.c_char * netlink.struct.ethtool.GStrings.LENGTH * length) ) strings = GStringsWithBuf() strings.hdr.cmd = netlink.struct.ethtool.GStrings.CMD strings.hdr.string_set = set_id strings.hdr.len = length if strings.hdr.len > 0: self.ioctl(strings) # Extract list of names from result array2string = lambda ca: ctypes.cast(ca, ctypes.c_char_p).value.decode('ascii') names = list(map(array2string, strings.buf)) self._cache[set_id] = names return self._cache[set_id] PKrNHU@::netlink/low/netlinkattrib.pyimport ctypes import math import ipaddress import socket import sys import netlink.struct.netlink class AttributeDecoder: AttributeType = netlink.struct.netlink.Attribute types = {} @staticmethod def ALIGN(size): return math.ceil(size / 4) * 4 def __init__(self, message = None): self.message = message self.decoders = {} self.encoders = {} for prop in dir(self): if prop.startswith('attribute_decode_'): type = prop.split('_', 2)[2] func = getattr(self, prop) self.decoders[type] = func if prop.startswith('attribute_encode_'): type = prop.split('_', 2)[2] func = getattr(self, prop) self.encoders[type] = func def attribute_decode_str(self, data): """Decode a C byte string to a Python string""" return data[:-1].decode('utf-8') def attribute_encode_str(self, string): """Encode a Python string to a C byte string""" return string.encode('utf-8') + b'\x00' def attribute_decode_bool(self, data): """Decode boolean value from a binary 8-bit number""" return bool(self.attribute_decode_int_u8(data)) def attribute_encode_bool(self, state): """Encode boolean value to a binary 8-bit number""" return self.attribute_encode_int_u8(int(state)) def attribute_decode_int(self, data): """Decode a binary integer in host byte order into a Python int""" return int.from_bytes(data, sys.byteorder) attribute_decode_int_u8 = attribute_decode_int attribute_decode_int_u16 = attribute_decode_int attribute_decode_int_u32 = attribute_decode_int attribute_decode_int_u64 = attribute_decode_int def attribute_encode_int_u8(self, number): return int(number).to_bytes(1, sys.byteorder) def attribute_encode_int_u16(self, number): return int(number).to_bytes(2, sys.byteorder) def attribute_encode_int_u32(self, number): return int(number).to_bytes(4, sys.byteorder) def attribute_encode_int_u64(self, number): return int(number).to_bytes(8, sys.byteorder) def decode_address(self, data, family): """ Decode a network address The address will be decoded to an IPv4, IPv6 or MAC address string depending on its length and the given address family value. Parameters ---------- data : bytes Binary representation of the address family : int The address family that this address (probably) belongs to Returns ------- str """ if len(data) == 4 and family == socket.AF_INET: return str(ipaddress.IPv4Address(data)) elif len(data) == 16 and family == socket.AF_INET6: return str(ipaddress.IPv6Address(data)) else: return ':'.join("{0:02X}".format(b) for b in data) def encode_address(self, address, family): if family == socket.AF_INET: return ipaddress.IPv4Address(address).packed elif family == socket.AF_INET6: return ipaddress.IPv6Address(address).packed else: hex_bytes = address.translate({ord(':'): None, ord('-'): None}) return int(hex_bytes, 16).to_bytes(len(hex_bytes) / 2, sys.byteorder) def decode_single_type(self, type): """ Convert the given type integer constant to (hopefully) a human-readable string Parameters ---------- type : int The type constant as Python ``int`` Returns ------- :func:`int` | :func:`str` """ if hasattr(self.AttributeType, 'TYPE'): return self.AttributeType.TYPE.find(type) else: return type def encode_single_type(self, type): if hasattr(self.AttributeType, 'TYPE'): return self.AttributeType.TYPE.get(type) else: return type def decode_single(self, type, data): """ Convert the given type and binary data to Pythonic, human-readable values This method calls :meth:`~netlink.low.netlinkattrib.AttributeDecoder.decode_single_type` internally to decode the ``type`` parameter. Parameters ---------- type : int The type constant as Python ``int`` data : bytes Binary data produced by the kernel Returns ------- ( :func:`int` | :func:`str` , :func:`bytes` | :func:`object` ) """ name = self.decode_single_type(type) # Determine decoder name for this attribute type decoder_name = None if name in self.types: decoder_name = self.types[name] # Determine decoding function for this attribute type decoder_func = lambda data: data if decoder_name in self.decoders: decoder_func = self.decoders[decoder_name] # Do the decoding value = decoder_func(data) return (name, value) def encode_single(self, name, value): if isinstance(value, bytes): type = self.encode_single_type(name) return (type, value) else: # Determine decoder name for this attribute type encoder_name = None if name in self.types: encoder_name = self.types[name] # Determine encoding function for this attribute type encoder_func = lambda data: data if encoder_name in self.encoders: encoder_func = self.encoders[encoder_name] # Do the encoding type = self.encode_single_type(name) data = bytes(encoder_func(value)) return (type, data) def decode(self, data): """ Decode the given attribute stream into a dictionary Parameters ---------- data : bytes Byte voodo, as produced by the Linux kernel Returns ------- dict """ offset = 0 attribute = self.AttributeType() attributes = {} while (offset + ctypes.sizeof(attribute)) <= len(data): offset_payload = offset + math.ceil(ctypes.sizeof(attribute) / 4) * 4 # "Parse" attribute header ctypes.memmove( ctypes.addressof(attribute), data[offset : offset_payload], ctypes.sizeof(attribute) ) # Store attribute type and contents data_start = offset_payload data_end = offset_payload + attribute.len - (data_start - offset) if len(data) >= data_end: # Convert attribute type and data into human-friendly representation (name, value) = (attribute.type, data[data_start : data_end]) (name, value) = self.decode_single(name, value) attributes[name] = value # Move to next attribute offset += math.ceil(attribute.len / 4) * 4 return attributes def encode(self, attributes): """ Encode the given list of attibutes into a paltform-dependant attribute byte stream This is the opposite of the :meth:`~netlink.low.netlinkattrib.AttributeDecoder.decode` method and the assertion ``encode(decode(data)) == data`` is guaranteed in most cases. Parameters ---------- attributes : dict NetLink attribute list, as produced by the :meth:`~netlink.low.netlinkattrib.AttributeDecoder.decode` method Returns ------- bytes """ result = bytearray() for name, value in attributes.items(): (type, data) = self.encode_single(name, value) attribute = self.AttributeType() attribute.len = self.ALIGN(ctypes.sizeof(attribute)) + self.ALIGN(len(data)) attribute.type = type # Store header with alignment bytes result += bytes(attribute) result += b'\x00' * (self.ALIGN(ctypes.sizeof(attribute)) - ctypes.sizeof(attribute)) # Store content with alignment bytes result += bytes(data) result += b'\x00' * (self.ALIGN(len(data)) - len(data)) return result class AddressAttributeDecoder(AttributeDecoder): AttributeType = netlink.struct.netlink.AddressAttribute types = dict(AttributeDecoder.types) types['ADDRESS'] = 'address' # Peer address (or local address if there is no peer) types['LOCAL'] = 'address' # Actual device address types['LABEL'] = 'str' # Custom identification label for device types['BROADCAST'] = 'address' # Broadcast address types['ANYCAST'] = 'address' # Anycast address types['CACHEINFO'] = 'cacheinfo' # Remaining maximum and preferred time for use types['MULTICAST'] = 'address' # Multicast address types['FLAGS'] = 'flags' def attribute_decode_address(self, data): """ Decode a network address The address will be decoded to an IPv4, IPv6 or MAC address string depending on its length and the address family of the given address message. """ return self.decode_address(data, self.message.family) def attribute_encode_address(self, address): return self.encode_address(address, self.message.family) def attribute_decode_flags(self, data): """ Decode the flag list bits into a list of human-readable strings """ return self.AttributeType.FLAG.find_bitmask(self.attribute_decode_int_u32(data)) def attribute_encode_flags(self, flags): return self.attribute_encode_int_u32(self.AttributeType.FLAG.get_with_iter(flags)) def attribute_decode_cacheinfo(self, data): return netlink.struct.netlink.CacheInformation(data).to_dict() def attribute_encode_cacheinfo(self, cacheinfo): return netlink.struct.netlink.CacheInformation.from_dict(cacheinfo) class InterfaceAttributeDecoder(AttributeDecoder): AttributeType = netlink.struct.netlink.InterfaceAttribute types = dict(AttributeDecoder.types) types['ADDRESS'] = 'address' # Device MAC address types['BROADCAST'] = 'address' # Broadcast MAC address types['IFNAME'] = 'str' # Device interface name types['MTU'] = 'int_u32' # Maximum Transmission Unit types['LINK'] = 'int_u32' #XXX: Figure what this is types['QDISC'] = 'str' # Traffic Control 'Queuing DISCiplin' types['STATS'] = None #TODO: Parse this format? types['COST'] = 'int_u32' # Bridge interface routing cost types['PRIORITY'] = 'int_u16' # Bridge interface routing priority types['MASTER'] = 'int_u32' # Bridge master device index types['WIRELESS'] = None #TODO: Figure what this is (WEXT is deprecated) types['PROTINFO'] = 'protinfo' #XXX: Figure what this is types['TXQLEN'] = None #TODO: Transfer Queue Length types['MAP'] = None #TODO: Figure what this is types['WEIGHT'] = 'int_u8' #XXX types['OPERSTATE'] = 'operstate' # Operational state (DOWN, DORMANT, UP, …) types['LINKMODE'] = 'linkmode' #XXX: Figure what this is types['LINKINFO'] = 'linkinfo' #TODO: Figure what this is types['NET_NS_PID'] = 'int_u32' # Network NameSpace Process ID types['IFALIAS'] = 'str' #XXX: Figure what this is types['NUM_VF'] = None #TODO: Number of VFs if device is SR-IOV PF types['VFINFO_LIST'] = None #TODO: Parse this format? types['STATS64'] = None #TODO: Parse this format? types['VF_PORTS'] = None #TODO: Figure what this is types['PORT_SELF'] = 'portself' #TODO: Parse this format? types['AF_SPEC'] = 'int_u32' #XXX: Figure what this is types['GROUP'] = 'int_u32' # Group the device belongs to types['NET_NS_PID'] = 'int_u32' # Network NameSpace File Descriptor types['EXT_MASK'] = 'ext_mask' # Extended information mask types['PROMISCUITY'] = 'int_u32' # Promiscuity count: > 0 means acts PROMISC types['NUM_TX_QUEUES'] = 'int_u32' # Number of transfer queues types['NUM_RX_QUEUES'] = 'int_u8' # Number of receive queues types['CARRIER'] = None #TODO: Figure what this is types['PHYS_PORT_ID'] = 'int_u32' # ID of the physical network port types['CARRIER_CHANGES'] = None #TODO: Figure what this is types['PHYS_SWITCH_ID'] = 'int_u32' #XXX: Somehow related to PHYS_PORT_ID types['LINK_NETNSID'] = 'int_u32' #XXX: Figure what this is types['PHYS_PORT_NAME'] = 'str' #XXX: Somehow related to PHYS_PORT_ID types['PROTO_DOWN'] = 'bool' #XXX: Figure what this is def attribute_decode_address(self, data): """ Decode a network address The address will be decoded to an IPv4, IPv6 or MAC address string depending on its length and the type of link that it is present on. """ message_type = self.message.TYPE.find(self.message.type) if message_type in ('TUNNEL', 'SIT', 'IPGRE'): return self.decode_address(data, socket.AF_INET) elif message_type in ('TUNNEL6',): return self.decode_address(data, socket.AF_INET6) else: return self.decode_address(data, None) def attribute_encode_address(self, address): message_type = self.message.TYPE.find(self.message.type) if message_type in ('TUNNEL', 'SIT', 'IPGRE'): return self.encode_address(address, socket.AF_INET) elif message_type in ('TUNNEL6',): return self.encode_address(address, socket.AF_INET6) else: return self.encode_address(address, None) def attribute_decode_operstate(self, data): """ Decode the operational state of a device from a binary integer to its state name """ return self.AttributeType.OPER.find(self.attribute_decode_int_u8(data)) def attribute_encode_operstate(self, operstate): return self.attribute_encode_int_u8(self.AttributeType.OPER.get(data)) def attribute_decode_protinfo(self, data): """ Decode the bridge port state of a device from a binary integer to its state name """ return self.AttributeType.BRIDGE_STATE.find(self.attribute_decode_int_u32(data)) def attribute_encode_protinfo(self, protinfo): return self.attribute_encode_int_u32(self.AttributeType.BRIDGE_STATE.get(protinfo)) def attribute_decode_linkmode(self, data): return self.AttributeType.LINKMODE.find(self.attribute_decode_int_u8(data)) def attribute_encode_linkmode(self, linkmode): return self.attribute_encode_int_u8(self.AttributeType.LINKMODE.get(linkmode)) def attribute_decode_linkinfo(self, data): return LinkInfoDecoder(self.message).decode(data) def attribute_encode_linkinfo(self, linkinfo): return LinkInfoDecoder(self.message).encode(linkinfo) def attribute_decode_portself(self, data): return PortSelfDecoder(self.message).decode(data) def attribute_encode_portself(self, portself): return PortSelfDecoder(self.message).encode(portself) def attribute_decode_ext_mask(self, data): return self.AttributeType.EXT_MASK.find_bitmask(self.attribute_decode_int_u32(data)) def attribute_encode_ext_mask(self, ext_mask): return self.attribute_encode_int_u32(self.AttributeType.EXT_MASK.find_bitmask(ext_mask)) class LinkInfoDecoder(AttributeDecoder): """ TODO: Implement proper decoding (and figure out which attribute structure this is related to) """ types = dict(AttributeDecoder.types) class PortSelfDecoder(AttributeDecoder): """ TODO: Implement proper decoding (and figure out which attribute structure this is related to) """ types = dict(AttributeDecoder.types) def decode_single(self, type, data): # Do standard name decoding (type, value) = super().decode_single(type, data) # Decode value by running it through the attribute parser again #TODO: Implement proper decoding for the second round value = AttributeDecoder(self.message).decode(value) return (type, value) PKOH-))netlink/low/netlink.pyimport ctypes import errno import ipaddress import os import socket import sys import netlink.low.netlinkattrib import netlink.struct.netlink class NetLinkError(OSError): def __init__(self, code, message, response): super().__init__(code, message) self.response = response class NetLinkEOFError(NetLinkError): def __init__(self, message, response): super().__init__(errno.ENODATA, message, response) class NetLinkTruncatedError(NetLinkError): def __init__(self, message, response): super().__init__(errno.EMSGSIZE, message, response) class NetLinkInterruptedError(NetLinkError): def __init__(self, message, response): super().__init__(errno.EINTR, message, response) class NetLinkNotAcknowledged(NetLinkError): def __init__(self, message, response): super().__init__(errno.ENOMSG, message, response) class Request: AttributeCoderType = netlink.low.netlinkattrib.AttributeDecoder MessageType = None def __init__(self, netif, cmd, flags = (), pid = 0): self._netif = netif self.header = netlink.struct.netlink.MessageHeader() self.header.type = self.header.TYPE.get(cmd) self.header.pid = pid # Send messages to kernel (pid 0) by default self.header.len = self.AttributeCoderType.ALIGN(ctypes.sizeof(self.header)) self.header.flags = self.header.FLAG.get_with_iter(flags) self.coder = self.AttributeCoderType() self._buffer = bytearray() self._buffer += b'\x00' * (self.header.len - ctypes.sizeof(self.header)) def add_attributes(self, attributes): self._buffer += self.coder.encode(attributes) return self def add_attribute(self, type, data = None): return self.add_attributes({type: data}) def add_raw(self, data): # Store given raw data self._buffer += data # Store additional alignment bytes (header alignment is responsibility of the caller) self._buffer += b'\x00' * (self.coder.ALIGN(len(data)) - len(data)) return self def send(self, require_acknowledgement = True): pid = self._netif.sockname[0] self.header.seq = self._netif.next_sequence_number() self.header.len = ctypes.sizeof(self.header) + len(self._buffer) # Prepare acknowledgement response tracking acknowledged = False if require_acknowledgement: self.header.flags |= self.header.FLAG.ACK # Send assembled request packet data = bytearray(self.header.len) data[:ctypes.sizeof(self.header)] = bytes(self.header) data[ctypes.sizeof(self.header):] = self._buffer self._netif.sendmsg(data) response = [] finished = False while not finished: try: (data, _, msg_flags, _) = self._netif.recvmsg() except OSError as e: if e.errno == errno.EINTR or e.errno == errno.EAGAIN: # Read was interrupted by signal – retry continue else: # Serious error during `recvmsg` raise if (msg_flags & socket.MSG_TRUNC) != 0: # NetLink message was too big for buffer raise NetLinkTruncatedError("Truncated NetLink message", response) if len(data) < 1: # Empty NetLink message received (may never happen) raise NetLinkEOFError("EOF on NetLink read", response) # Iterate of all response messages offset = 0 header = netlink.struct.netlink.MessageHeader() while (offset + ctypes.sizeof(header)) <= len(data): # Populate header with next data set ctypes.memmove(ctypes.addressof(header), data[offset:], ctypes.sizeof(header)) offset_payload = offset + self.coder.ALIGN(ctypes.sizeof(header)) if header.pid != pid or header.seq != self.header.seq: # Skip responses that were not intended for us offset += header.len if not hasattr(self, '_allow_recv_other_destination'): assert False continue if (header.flags & header.FLAG.DUMP_INTR) != 0: raise NetLinkInterruptedError("Message interrupted", response) if header.type == header.RESPONSE.ERROR: # Whoops error = netlink.struct.netlink.MessageError() if (len(data) - offset_payload) < ctypes.sizeof(error): # Sanity check to make sure the error message was not truncated raise NetLinkTruncatedError("Truncated NetLink error message", response) ctypes.memmove(ctypes.addressof(error), data[offset_payload:], ctypes.sizeof(error)) if error.error == 0: # Acknowledgement received acknowledged = True finished = True # Move to next message offset += header.len else: message = "RTNETLINK answers: {0}".format(os.strerror(-error.error)) raise NetLinkError(-error.error, message, response) elif header.type == header.RESPONSE.DONE: finished = True break else: data_start = offset_payload data_end = offset_payload + header.len - (offset_payload - offset) response.append(self.parse_response(data[data_start : data_end])) # Any received message is also an acknowledgement that data has been sent acknowledged = True if (header.flags & header.FLAG.MULTI) == 0: # Single message only – don't expect any DONE response message finished = True break # Move to next message offset += self.coder.ALIGN(header.len) if not acknowledged and require_acknowledgement: raise NetLinkNotAcknowledged("Missing acknowledgement", response) return response @classmethod def parse_response(cls, data): message = cls.MessageType() ctypes.memmove(ctypes.addressof(message), data, ctypes.sizeof(message)) # Add attribute decoder to message for attribute parsing later on offset = cls.AttributeCoderType.ALIGN(ctypes.sizeof(message)) attributes = cls.AttributeCoderType(message).decode(data[offset:]) return message, attributes class AddressRequest(Request): AttributeCoderType = netlink.low.netlinkattrib.AddressAttributeDecoder MessageType = netlink.struct.netlink.AddressMessage def add_header(self, family, dev_index, prefixlen, flags = 0, scope = 0): # Create and populate address message header message = self.MessageType() message.family = family message.prefixlen = prefixlen message.scope = message.SCOPE.get(scope) message.index = dev_index # Make attributes from the extended header available to the message coder self.coder.message = message flags = message.FLAG.get_with_iter(flags) # The flags list eventually got larger than 1 byte so a custom u32 attribute has to be added # if any extra flags need to be sent if (flags & 0xFF) != flags: self.add_raw(bytes(message)) flags = int(flags).to_bytes(4, sys.byteorder) return self.add_attribute(netlink.struct.netlink.Attribute.TYPE.FLAGS, flags) else: message.flags = flags return self.add_raw(bytes(message)) class InterfaceRequest(Request): AttributeCoderType = netlink.low.netlinkattrib.InterfaceAttributeDecoder MessageType = netlink.struct.netlink.InterfaceMessage def add_header(self, family, dev_index, type = 0, flags = 0, change = 0): # Create and populate interface message header message = self.MessageType() message.family = family message.type = message.TYPE.get(type) message.index = dev_index message.flags = message.FLAG.get_with_iter(flags) message.change = message.FLAG.get_with_iter(change) # Make attributes from the extended header available to the message coder self.coder.message = message return self.add_raw(bytes(message)) PKA#H+netlink/low/netif.pyimport ctypes import fcntl import socket import time import netlink.struct.netif class Socket: ########################################### # Public ioctl() calls (from sys/ioctl.h) # ########################################### # Routing table calls SIOCADDRT = 0x890B # add routing table entry SIOCDELRT = 0x890C # delete routing table entry SIOCRTMSG = 0x890D # call to routing system # Socket configuration controls SIOCGIFNAME = 0x8910 # get iface name SIOCSIFLINK = 0x8911 # set iface channel SIOCGIFCONF = 0x8912 # get iface list SIOCGIFFLAGS = 0x8913 # get flags SIOCSIFFLAGS = 0x8914 # set flags SIOCGIFADDR = 0x8915 # get PA address SIOCSIFADDR = 0x8916 # set PA address SIOCGIFDSTADDR = 0x8917 # get remote PA address SIOCSIFDSTADDR = 0x8918 # set remote PA address SIOCGIFBRDADDR = 0x8919 # get broadcast PA address SIOCSIFBRDADDR = 0x891a # set broadcast PA address SIOCGIFNETMASK = 0x891b # get network PA mask SIOCSIFNETMASK = 0x891c # set network PA mask SIOCGIFMETRIC = 0x891d # get metric SIOCSIFMETRIC = 0x891e # set metric SIOCGIFMEM = 0x891f # get memory address (BSD) SIOCSIFMEM = 0x8920 # set memory address (BSD) SIOCGIFMTU = 0x8921 # get MTU size SIOCSIFMTU = 0x8922 # set MTU size SIOCSIFNAME = 0x8923 # set interface name SIOCSIFHWADDR = 0x8924 # set hardware address SIOCGIFENCAP = 0x8925 # get/set encapsulations SIOCSIFENCAP = 0x8926 SIOCGIFHWADDR = 0x8927 # Get hardware address SIOCGIFSLAVE = 0x8929 # Driver slaving support SIOCSIFSLAVE = 0x8930 SIOCADDMULTI = 0x8931 # Multicast address lists SIOCDELMULTI = 0x8932 SIOCGIFINDEX = 0x8933 # name -> if_index mapping SIOCSIFPFLAGS = 0x8934 # set/get extended flags set SIOCGIFPFLAGS = 0x8935 SIOCDIFADDR = 0x8936 # delete PA address SIOCSIFHWBROADCAST = 0x8937 # set hardware broadcast addr SIOCGIFCOUNT = 0x8938 # get number of devices SIOCETHTOOL = 0x8946 SIOCGIFBR = 0x8940 # Bridging support SIOCSIFBR = 0x8941 # Set bridging options SIOCGIFTXQLEN = 0x8942 # Get the tx queue length SIOCSIFTXQLEN = 0x8943 # Set the tx queue length # ARP cache control calls SIOCDARP = 0x8953 # delete ARP table entry SIOCGARP = 0x8954 # get ARP table entry SIOCSARP = 0x8955 # set ARP table entry # RARP cache control calls SIOCDRARP = 0x8960 # delete RARP table entry SIOCGRARP = 0x8961 # get RARP table entry SIOCSRARP = 0x8962 # set RARP table entry # Driver configuration calls SIOCGIFMAP = 0x8970 # Get device parameters SIOCSIFMAP = 0x8971 # Set device parameters # DLCI configuration calls SIOCADDDLCI = 0x8980 # Create new DLCI device SIOCDELDLCI = 0x8981 # Delete DLCI device def __init__(self, connection): self._connection = connection # Store sequence counter for NetLink self._seq = int(time.time()) & 0xFFFFFFFF def __del__(self): self._connection.close() def __enter__(self): return self def __exit__(self, type, value, traceback): self._connection.close() @property def bufsize(self): return self._bufsize @bufsize.setter def bufsize(self, bufsize): self._bufsize = bufsize self._connection.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize) self._connection.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize) @property def sockname(self): return self._connection.getsockname() def ioctl(self, reqtype, request): if isinstance(reqtype, str): reqtype = getattr(self, "SIOC{0}".format(reqtype.upper())) fcntl.ioctl(self._connection.fileno(), reqtype, ctypes.addressof(request)) def next_sequence_number(self): self._seq += 1 self._seq &= 0xFFFFFFFF return self._seq def recvmsg(self, ancbufsize = 0, flags = 0): return self._connection.recvmsg(self.bufsize, ancbufsize, flags) def sendmsg(self, buffer, ancdata = (), flags = 0, address = None): return self._connection.sendmsg((buffer,), ancdata, flags, address) def close(self): self._connection.close() class NetworkDeviceSocket(Socket): def __init__(self): connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_IP) super().__init__(connection) @classmethod def create_request(cls, device_name): """ Create kernel network interface request structure """ request = netlink.struct.netif.RequestData() request.ifrn_name = device_name.encode('ascii') return request class RTNetlinkSocket(Socket): def __init__(self, subscriptions = 0): """ """ connection = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, socket.NETLINK_ROUTE) connection.bind((0, subscriptions)) super().__init__(connection) # Increase send and receive buffer size self.bufsize = 32768 PKg#HE.mnetlink/struct/ethtool.pyimport ctypes class GStrings(ctypes.Structure): CMD = 0x0000001b LENGTH = 32 _fields_ = [ ('cmd', ctypes.c_uint32), ('string_set', ctypes.c_uint32), ('len', ctypes.c_uint32), ] class SSetInfo(ctypes.Structure): CMD = 0x00000037 STATS = 1 _fields_ = [ ('cmd', ctypes.c_uint32), ('reserved', ctypes.c_uint32), ('sset_mask', ctypes.c_uint64), ] class Stats(ctypes.Structure): CMD = 0x0000001d _fields_ = [ ('cmd', ctypes.c_uint32), ('n_stats', ctypes.c_uint32), ] PK"H/jE,E,netlink/struct/netlink.pyimport ctypes import netlink.struct.misc import netlink.struct.routing class SocketAddress(ctypes.Structure): _fields_ = [ ('family', ctypes.c_ushort), ('pad', ctypes.c_ushort), ('pid', ctypes.c_uint32), ('groups', ctypes.c_uint32), ] class MessageHeader(ctypes.Structure): _fields_ = [ ('len', ctypes.c_uint32), ('type', ctypes.c_uint16), ('flags', ctypes.c_uint16), ('seq', ctypes.c_uint32), ('pid', ctypes.c_uint32), ] class TYPE(metaclass = netlink.struct.misc.Enumeration): _fields_ = [ ("BASE", 16), ("NEWLINK", 16), ("DELLINK", 17), ("GETLINK", 18), ("SETLINK", 19), ("NEWADDR", 20), ("DELADDR", 21), ("GETADDR", 22), ("NEWROUTE", 24), ("DELROUTE", 25), ("GETROUTE", 26), ("NEWNEIGH", 28), ("DELNEIGH", 29), ("GETNEIGH", 30), ("NEWRULE", 32), ("DELRULE", 33), ("GETRULE", 34), ("NEWQDISC", 36), ("DELQDISC", 37), ("GETQDISC", 38), ("NEWTCLASS", 40), ("DELTCLASS", 41), ("GETTCLASS", 42), ("NEWTFILTER", 44), ("DELTFILTER", 45), ("GETTFILTER", 46), ("NEWACTION", 48), ("DELACTION", 49), ("GETACTION", 50), ("NEWPREFIX", 52), ("GETMULTICAST", 58), ("GETANYCAST", 62), ("NEWNEIGHTBL", 64), ("GETNEIGHTBL", 66), ("SETNEIGHTBL", 67), ("NEWNDUSEROPT", 68), ("NEWADDRLABEL", 72), ("DELADDRLABEL", 73), ("GETADDRLABEL", 74), ("GETDCB", 78), ("SETDCB", 79), ("NEWNETCONF", 80), ("GETNETCONF", 82), ("NEWMDB", 84), ("DELMDB", 85), ("GETMDB", 86), ("NEWNSID", 88), ("DELNSID", 89), ("GETNSID", 90), ] class RESPONSE(metaclass = netlink.struct.misc.Enumeration): _fields_ = [ ("NOOP", 0x1, "Nothing"), ("ERROR", 0x2, "Error"), ("DONE", 0x3, "End of dump"), ("OVERRUN", 0x4, "Data lost"), ] class FLAG(metaclass = netlink.struct.misc.Enumeration): _fields_ = [ ("REQUEST", 1, "It is request message"), ("MULTI", 2, "Multipart message, terminated by MSG_DONE"), ("ACK", 4, "Reply with ack, with zero or error code"), ("ECHO", 8, "Echo this request"), ("DUMP_INTR", 16, "Dump was inconsistent due to sequence change"), # Modifiers to GET request ("ROOT", 0x100, "Specify tree root"), ("MATCH", 0x200, "Return all matching"), ("ATOMIC", 0x400, "Atomic GET"), ("DUMP", 0x300), # Modifiers to NEW request ("REPLACE", 0x100, "Override existing"), ("EXCL", 0x200, "Do not touch, if it exists"), ("CREATE", 0x400, "Create, if it does not exist"), ("APPEND", 0x800, "Add to end of list"), ] class AddressMessage(ctypes.Structure): _fields_ = [ ('family', ctypes.c_uint8), ('prefixlen', ctypes.c_uint8), ('flags', ctypes.c_uint8), ('scope', ctypes.c_uint8), ('index', ctypes.c_uint32), ] class FLAG(metaclass = netlink.struct.misc.Enumeration): _fields_ = [ # ifa_flags ("SECONDARY", 0x01), ("TEMPORARY", 0x01), ("NODAD", 0x0002), ("OPTIMISTIC", 0x0004), ("DADFAILED", 0x0008), ("HOMEADDRESS", 0x0010), ("DEPRECATED", 0x0020), ("TENTATIVE", 0x0040), ("PERMANENT", 0x0080), ("MANAGETEMPADDR", 0x0100), ("NOPREFIXROUTE", 0x0200), ("MCAUTOJOIN", 0x0400), ("STABLE_PRIVACY", 0x0800), ] SCOPE = netlink.struct.routing.Message.SCOPE class InterfaceMessage(ctypes.Structure): _fields_ = [ ('family', ctypes.c_uint8), # Address family ('unused_1', ctypes.c_char), ('type', ctypes.c_ushort), ('index', ctypes.c_int), # Link index ('flags', ctypes.c_uint), ('change', ctypes.c_uint), # Flag change mask ] class TYPE(metaclass = netlink.struct.misc.Enumeration): """ ARP header types (0-255) and constants for many other network protocols (256-65535) """ _fields_ = [ ("NETROM", 0, "from KA9Q: NET/ROM pseudo"), ("ETHER", 1, "Ethernet 10Mbps"), ("EETHER", 2, "Experimental Ethernet"), ("AX25", 3, "AX.25 Level 2"), ("PRONET", 4, "PROnet token ring"), ("CHAOS", 5, "Chaosnet"), ("IEEE802", 6, "IEEE 802.2 Ethernet/TR/TB"), ("ARCNET", 7, "ARCnet"), ("APPLETLK", 8, "APPLEtalk"), ("DLCI", 15, "Frame Relay DLCI"), ("ATM", 19, "ATM"), ("METRICOM", 23, "Metricom STRIP (new IANA id)"), ("IEEE1394", 24, "IEEE 1394 IPv4 - RFC 2734"), ("EUI64", 27, "EUI-64"), ("INFINIBAND", 32, "InfiniBand"), # Dummy types for non ARP hardware ("SLIP", 256), ("CSLIP", 257), ("SLIP6", 258), ("CSLIP6", 259), ("RSRVD", 260, "Notional KISS type"), ("ADAPT", 264), ("ROSE", 270), ("X25", 271, "CCITT X.25"), ("HWX25", 272, "Boards with X.25 in firmware"), ("CAN", 280, "Controller Area Network"), ("PPP", 512), ("CISCO", 513, "Cisco HDLC"), ("HDLC", 513, "Cisco HDLC"), ("LAPB", 516, "LAPB"), ("DDCMP", 517, "Digital's DDCMP protocol"), ("RAWHDLC", 518, "Raw HDLC"), ("TUNNEL", 768, "IPIP tunnel"), ("TUNNEL6", 769, "IP6IP6 tunnel"), ("FRAD", 770, "Frame Relay Access Device"), ("SKIP", 771, "SKIP vif"), ("LOOPBACK", 772, "Loopback device"), ("LOCALTLK", 773, "Localtalk device"), ("FDDI", 774, "Fiber Distributed Data Interface"), ("BIF", 775, "AP1000 BIF"), ("SIT", 776, "sit0 device - IPv6-in-IPv4"), ("IPDDP", 777, "IP over DDP tunneller"), ("IPGRE", 778, "GRE over IP"), ("PIMREG", 779, "PIMSM register interface"), ("HIPPI", 780, "High Performance Parallel Interface"), ("ASH", 781, "Nexus 64Mbps Ash"), ("ECONET", 782, "Acorn Econet"), ("IRDA", 783, "Linux-IrDA"), ("FCPP", 784, "Point to point fibrechannel"), ("FCAL", 785, "Fibrechannel arbitrated loop"), ("FCPL", 786, "Fibrechannel public loop"), ("FCFABRIC", 787, "Fibrechannel fabric"), # 787->799 reserved for fibrechannel media types ("IEEE802_TR", 800, "Magic type ident for TR"), ("IEEE80211", 801, "IEEE 802.11"), ("IEEE80211_PRISM", 802, "IEEE 802.11 + Prism2 header"), ("IEEE80211_RADIOTAP", 803, "IEEE 802.11 + radiotap header"), ("IEEE802154", 804), ("IEEE802154_MONITOR", 805, "IEEE 802.15.4 network monitor"), ("PHONET", 820, "PhoNet media type"), ("PHONET_PIPE", 821, "PhoNet pipe header"), ("CAIF", 822, "CAIF media type"), ("IP6GRE", 823, "GRE over IPv6"), ("NETLINK", 824, "Netlink header"), ("IP6LOWPAN", 825, "IPv6 over LoWPAN"), ("VOID", 0xFFFF, "Void type, nothing is known"), ("NONE", 0xFFFE, "zero header length"), ] class FLAG(metaclass = netlink.struct.misc.Enumeration): _fields_ = [ ("UP", 1<<0, "Interface is up"), ("BROADCAST", 1<<1, "Broadcast address is valid"), ("DEBUG", 1<<2, "Debugging enabled"), ("LOOPBACK", 1<<3, "Interface is a loopback network"), ("POINTOPOINT", 1<<4, "Interface is a Point-to-Point link"), ("NOTRAILERS", 1<<5, "Avoid the use of trailers"), ("RUNNING", 1<<6, "Interface has operating status UP (:rfc:`2863#section-3.1.14`)"), ("NOARP", 1<<7, "ARP protocol is not used"), ("PROMISC", 1<<8, "Receive all packets (even if they are for another host)"), ("ALLMULTI", 1<<9, "Receive all Multicast packets"), ("MASTER", 1<<10, "Master of a load balancer"), ("SLAVE", 1<<11, "Slave of a load balancer"), ("MULTICAST", 1<<12, "Supports Multicast"), ("PORTSEL", 1<<13, "Media type selectable"), ("AUTOMEDIA", 1<<14, "Automatic media selection active"), ("DYNAMIC", 1<<15, "Is dail-up device with changing addresses"), ("LOWER_UP", 1<<16, "Device signals L1 up"), ("DORMANT", 1<<17, "Driver signals dormant"), ("ECHO", 1<<18, "Echo sent packets"), ] class MessageError(ctypes.Structure): _fields_ = [ ('error', ctypes.c_int), ('msg', MessageHeader), ] class Attribute(ctypes.Structure): _fields_ = [ ('len', ctypes.c_ushort), ('type', ctypes.c_ushort), ] class AddressAttribute(Attribute): class TYPE(metaclass = netlink.struct.misc.Enumeration): _fields_ = [ ("UNSPEC", 0), ("ADDRESS", 1, 'Peer address (or local address if there is no peer)'), ("LOCAL", 2, 'Actual device address'), ("LABEL", 3, 'Custom identification label for device'), ("BROADCAST", 4, 'Broadcast address'), ("ANYCAST", 5, 'Anycast address'), ("CACHEINFO", 6, 'Remaining maximum and preferred time for use'), ("MULTICAST", 7, 'Multicast address'), ("FLAGS", 8), ] FLAG = AddressMessage.FLAG class InterfaceAttribute(Attribute): class TYPE(metaclass = netlink.struct.misc.Enumeration): _fields_ = [ ("UNSPEC", 0), ("ADDRESS", 1), ("BROADCAST", 2), ("IFNAME", 3), ("MTU", 4), ("LINK", 5), ("QDISC", 6), ("STATS", 7), ("COST", 8), ("PRIORITY", 9), ("MASTER", 10), ("WIRELESS", 12, "Wireless Extension event"), ("PROTINFO", 13, "Protocol specific information for a link"), ("TXQLEN", 14), ("MAP", 15), ("WEIGHT", 16), ("OPERSTATE", 17), ("LINKINFO", 18), ("LINKMODE", 19), ("NET_NS_PID", 20), ("IFALIAS", 21), ("NUM_VF", 22, "Number of VFs if device is SR-IOV PF"), ("VFINFO_LIST", 23), ("STATS64", 24), ("VF_PORTS", 25), ("PORT_SELF", 26), ("AF_SPEC", 27), ("GROUP", 28, "Group the device belongs to"), ("NET_NS_FD", 29), ("EXT_MASK", 30, "Extended info mask = VFs = etc"), ("PROMISCUITY", 31, "Promiscuity count: > 0 means acts PROMISC"), ("NUM_TX_QUEUES", 32), ("NUM_RX_QUEUES", 33), ("CARRIER", 34), ("PHYS_PORT_ID", 35), ("CARRIER_CHANGES", 36), ("PHYS_SWITCH_ID", 37), ("LINK_NETNSID", 38), ("PHYS_PORT_NAME", 39), ("PROTO_DOWN", 40), ] class OPER(metaclass = netlink.struct.misc.Enumeration): """ RFC 2863 operational status """ _fields_ = [ ("UNKNOWN", 0), ("NOTPRESENT", 1), ("DOWN", 2), ("LOWERLAYERDOWN", 3), ("TESTING", 4), ("DORMANT", 5), ("UP", 6), ] class BRIDGE_STATE(metaclass = netlink.struct.misc.Enumeration): _fields_ = [ ("DISABLED", 0), ("LISTENING", 1), ("LEARNING", 2), ("FORWARDING", 3), ("BLOCKING", 4), ] class LINKMODE(metaclass = netlink.struct.misc.Enumeration): _fields_ = [ ('DEFAULT', 0), ('DORMANT', 1, 'Limit upward transition to dormant'), ] class EXT_MASK(metaclass = netlink.struct.misc.Enumeration): _fields_ = [ ('VF', 1<<0), ('BRVLAN', 1<<1), ('BRVLAN_COMPRESSED', 1<<2), ] class CacheInformation(netlink.struct.misc.Structure): _fields_ = [ ('preferred', ctypes.c_uint32), ('valid', ctypes.c_uint32), ('cstamp', ctypes.c_uint32), # Creation timestamp (hundredths of seconds) ('tstamp', ctypes.c_uint32), # Update timestamp (hundredths of seconds) ] PK0NH5o00netlink/struct/netif.pyimport ctypes import netlink.struct.misc import netlink.struct.routing class RequestData(ctypes.Structure): """ Interface request structure for generic request data pointers """ IFNAMSIZ = 16 class IFRU(ctypes.Union): IFNAMSIZ = 16 _fields_ = [ ('flags', ctypes.c_short), ('ivalue', ctypes.c_int), ('mtu', ctypes.c_int), ('slave', ctypes.c_char * IFNAMSIZ), ('newname', ctypes.c_char * IFNAMSIZ), ('data', ctypes.c_void_p), ] _fields_ = [ ('ifrn_name', ctypes.c_char * IFNAMSIZ), ('ifru', IFRU), ] PKJGw]netlink/struct/socket.pyimport ctypes import netlink.struct.misc class MessageHeader(ctypes.Structure): """ Structure describing messages sent by `sendmsg` and received by `recvmsg` """ _fields_ = [ ('name', ctypes.c_void_p), # Address to send to/receive from ('namelen', ctypes.c_uint32), # Length of address data ('iov', ctypes.c_void_p), # Vector of data to send/receive (netlink.struct.misc.IOVector *) ('iovlen', ctypes.c_size_t), # Number of elements in the vector ('control', ctypes.c_void_p), # Ancillary data ('controllen', ctypes.c_size_t), # Ancillary data buffer length ('flags', ctypes.c_int), # Flags on received messages ] PKPNHT netlink/struct/misc.pyimport collections.abc import ctypes class IOVector(ctypes.Structure): """ Some generic "thing that points somewhere" structure from linux/uio.h © 1994 Allon Cox :-) """ _fields_ = [ ('base', ctypes.c_void_p), ('len', ctypes.c_size_t), ] def __init__(self, base = 0, length = 0): super().__init__() self.base = base self.len = length class Structure(ctypes.Structure): def __init__(self, data = None): super().__init__() if data: ctypes.memmove(ctypes.addressof(self), data, ctypes.sizeof(self)) def to_dict(self): result = {} for name, type in self.__class__._fields_: result[name] = getattr(self, name) return result @classmethod def from_dict(cls, attributes): struct = cls() for name, type in cls._fields_: if name in attributes: setattr(struct, name, attributes[name]) return struct class EnumerationBase: """ Base class for all users of the :class:`netlink.struct.misc.Enumeration` meta-class """ @classmethod def get(cls, name): if isinstance(name, str): return cls[name] else: return name @classmethod def get_with_iter(cls, names): if isinstance(names, str): names = (names,) if not isinstance(names, collections.abc.Iterable): return names value = 0 for name in names: value |= cls[name] return value @classmethod def find(cls, value): for name, value2 in cls.__items__.items(): if value2 == value: return name return value @classmethod def find_bitmask(cls, value): names = [] for name, mask in cls.__items__.items(): if (value & mask) != 0: # Add name to list names.append(name) # Remove bit from value value ^= mask # Add remaining bits as integers to the list shift = 0 while value != 0: if (value & 0x01) != 0: names.append(1 << shift) value >>= 1 shift += 1 return names class Enumeration(type): """ Common base class for all constant list classes """ def __new__(cls, name, bases, namespace, **kwargs): # Add our helper base class as class decendant for this class bases += (EnumerationBase,) # Obtain list of enumeration fields fields = namespace['_fields_'] del namespace['_fields_'] namespace['__items__'] = dict() for field in fields: if len(field) > 2: # Wrapper for the standard integer that adds uses the given documentation string # as `__doc__` value class Integer(int): __slots__ = () __doc__ = field[2] field_name = field[0] field_value = Integer(field[1]) else: field_name = field[0] field_value = field[1] field_name = field_name.upper().replace('-', '_') namespace['__items__'][field_name] = field_value # Construct class object return type.__new__(cls, name, bases, namespace) def __getitem__(self, name): if isinstance(name, str): name = name.upper().replace('-', '_') return self.__items__[name] def __getattr__(self, name): return self[name] PKGg netlink/struct/routing.pyimport ctypes import netlink.struct.misc class Message(ctypes.Structure): _fields_ = [ ('family', ctypes.c_ubyte), ('dst_len', ctypes.c_ubyte), ('src_len', ctypes.c_ubyte), ('tos', ctypes.c_ubyte), ('table', ctypes.c_ubyte), ('protocol', ctypes.c_ubyte), ('scope', ctypes.c_ubyte), ('type', ctypes.c_ubyte), ('flags', ctypes.c_uint), ] class FLAG(metaclass = netlink.struct.misc.Enumeration): _fields_ = [ ("NOTIFY", 0x100, "Notify user of route change"), ("CLONED", 0x200, "This route is cloned"), ("EQUALIZE", 0x400, "Multipath equalizer: NI"), ("PREFIX", 0x800, "Prefix addresses"), ] class TYPE(metaclass = netlink.struct.misc.Enumeration): """ Possible routing types """ _fields_ = [ ("UNSPEC", 0), ("UNICAST", 1, "Gateway or direct route"), ("LOCAL", 2, "Accept locally"), ("BROADCAST", 3, "Accept locally as broadcast, sens as broadcast"), ("ANYCAST", 4, "Accept locally as broadcast, but send as unicast"), ("MULTICAST", 5, "Multicast route"), ("BLACKHOLE", 6, "Drop"), ("UNREACHABLE", 7, "Destination is unreachable"), ("PROHIBIT", 8, "Administratively prohibited"), ("THROW", 9, "Not in this table"), ("NAT", 10, "Translate this address"), ("XRESOLVE", 11, "Use external resolver"), ] class PROTOCOL(metaclass = netlink.struct.misc.Enumeration): """ Possible route creation protocols These constants describe how the route came to be and how it should be treated by miscellaneous routing modification software (such as DHCP/NDP clients). """ _fields_ = [ ("UNSPEC", 0), ("REDIRECT", 1, "Route installed by ICMP redirects (not used by current IPv4)"), ("KERNEL", 2, "Route installed by kernel"), ("BOOT", 3, "Route installed during boot"), ("STATIC", 4, "Route installed by administrator"), # Values above STATIC are not interpreted by kernel: # They are just passed from user and back as is and are be used by different routing daemons to # mark "their" routes. New protocol values should be standardized in order to avoid conflicts. ("GATED", 8, "GateD"), ("RA", 9, "RDISC/ND router advertisements"), ("MRT", 10, "Merit MRT"), ("ZEBRA", 11, "Zebra"), ("BIRD", 12, "BIRD"), ("DNROUTED", 13, "DECnet routing daemon"), ("XORP", 14, "XORP"), ("NTK", 15, "Netsukuku"), ("DHCP", 16, "DHCP client"), ("MROUTED", 17, "Multicast daemon"), ("BABEL", 18, "Babel daemon"), ] class SCOPE(metaclass = netlink.struct.misc.Enumeration): """ Enumeration class of possible link scopes The name "scope" is somewhat misleading, as these constants actually attempt to describe the distance from this host to the next hop. Intermediate values, not listed here, are also possible. This can be used, for instance, to declare interior routes between UNIVERSE and LINK. """ _fields_ = [ ("UNIVERSE", 0, "Some place far, far away"), ("SITE", 200), ("LINK", 253, "Destinations attached directly to this system"), ("HOST", 254, "Local addresses"), ("NOWHERE", 255, "None-existing destinations"), ] class TABLE(metaclass = netlink.struct.misc.Enumeration): """ Enumeration class of reserved routing table class identifiers """ _fields_ = [ ("UNSPEC", 0), ("COMPAT", 252), ("DEFAULT", 253), ("MAIN", 254), ("LOCAL", 255), ] PKOHݳw%NetLink-0.1.dist-info/DESCRIPTION.rstPython NetLink ============== Python NetLink is a pythonic pure-Python API for the Linux NetLink set of APIs (and some related ones where it makes sense). It aims to be as efficient as possible, while still giving you high-level and easy-to-use access to most common operations. When high-level access is not enough, Python NetLink also gives you the powerful low-level API that can be used to assemble arbitrary NetLink and ETHTool commands, send them to the kernel and parse the response. Documentation ------------- Documentation is available `online `__ Releases -------- Releases are available on PyPI and on the `documentation server `__. PKOHY`__ Releases -------- Releases are available on PyPI and on the `documentation server `__. PKOHBC]]NetLink-0.1.dist-info/RECORDNetLink-0.1.dist-info/DESCRIPTION.rst,sha256=yaueSm3qeJCraemcnAB05YcqZxGNLaJXWXOunW3GUnc,755 NetLink-0.1.dist-info/METADATA,sha256=Pe0z553ucTvjylQmg_BdOjgjjbFXkNPMufwjRk1G7Fk,1681 NetLink-0.1.dist-info/RECORD,, NetLink-0.1.dist-info/WHEEL,sha256=zX7PHtH_7K-lEzyK75et0UBa3Bj8egCBMXe1M4gc6SU,92 NetLink-0.1.dist-info/metadata.json,sha256=HZCFyvZt3AnwjjkN1lR6zoYnGIYJM7YbuY09Yxfc4JM,1044 NetLink-0.1.dist-info/top_level.txt,sha256=uucbUVb-PDPGU5CqlYgbGaOYHsO7fT6V6XwHJN0wvEc,8 netlink/__init__.py,sha256=8VB-p4i7ZZXDEcEMn7mZvcivQgIThKLWWNLEhhkBrgE,19939 netlink/low/ethtool.py,sha256=5NxDjv-0EDYippfvZR599w0QdDxBcvq4-O_9kVtlQB4,1750 netlink/low/netif.py,sha256=ZcsgSxBqnbXUU9NzvTdfeO3hwSPogmY699KONRgwesk,5031 netlink/low/netlink.py,sha256=afsoACrhmP3TEpi3rCZ9ulU1U5wD9LopCD4cFLDVTr0,7465 netlink/low/netlinkattrib.py,sha256=rCvLiOWJYpU-FtFrNWafdO2IaeW0GBDkVEnt8z_wToo,14853 netlink/struct/ethtool.py,sha256=g_RSY_1WZUQO4X1CBTPX78oqokBEuyVixdW0aiG5VeQ,528 netlink/struct/misc.py,sha256=Euq70ZHfWO1PAO9iMsomSiZamERpGiY69kwnJGXYBC0,2984 netlink/struct/netif.py,sha256=ZCGKI0yIk9BXb2ZIeVcAxhgNTqTTuRMySE1VUauVroM,560 netlink/struct/netlink.py,sha256=suy6DInWO3MIKjjL7J9HZGT7e84QgdGp-kUekHh-8UM,11333 netlink/struct/routing.py,sha256=m_ZI7H9aIEp3TjhAjEQpbpO6WXUZ7_04cM9majnDDmc,3495 netlink/struct/socket.py,sha256=QyBuqqAnvFpNeavhNzHsSNOrHh9fErhWYsEvqsSbZdI,682 PK$OH#i MMnetlink/__init__.pyPKOH^QbNnetlink/low/ethtool.pyPKrNHU@::Unetlink/low/netlinkattrib.pyPKOH-))]netlink/low/netlink.pyPKA#H+netlink/low/netif.pyPKg#HE.mnetlink/struct/ethtool.pyPK"H/jE,E,netlink/struct/netlink.pyPK0NH5o00Vnetlink/struct/netif.pyPKJGw]netlink/struct/socket.pyPKPNHT netlink/struct/misc.pyPKGg wnetlink/struct/routing.pyPKOHݳw%UNetLink-0.1.dist-info/DESCRIPTION.rstPKOHY