Source code for tBB.net_elements

# tBB is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# tBB is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <>.


Network elements representations for Python.


import datetime
import math
import logging

logger = logging.getLogger(__name__)

[docs]def netmask_from_netlength(hosts): if not (hosts != 0 and ((hosts & (hosts - 1)) == 0)): raise ValueError("expected argument hosts to be a power of 2. Got: {}".format(hosts)) return 32 - math.log(hosts, 2)
[docs]class IPElement(object): """ IP node object representation for Python. This object responds to the following interfaces: - addition (and subtraction) - equality (and inequality) - hashable """ def __init__(self, *args, **kwargs): """ Possible initializations:: IPElement('') IPElement(ip='', mask=25) """ self._ip = ('', [0, 0, 0, 0]) self._mask = 0 if len(args) == 1: self.mask, self.ip = reversed(args[0].split('/')) # mask needs to be initialized before ip else: self.ip = kwargs['ip'] self.mask = kwargs['mask'] # if 'detect_element_type' in kwargs and kwargs['detect_element_type']: # if self.is_network(): # return Network(self) @property def ip(self): return self._ip @ip.setter def ip(self, *args): if len(args) == 1: self._ip = self.parse_ip(args[0]) elif len(args) == 4: i = 0 for elem in args: i += 1 if elem < 0 or elem > 255: raise ValueError("IP address (part {}) out of range: {}.".format(i, elem)) self._ip = ('.'.join(args), args) else: raise TypeError("either a string or a 4-length iterable are required for ip.") @property def mask(self): return self._mask @mask.setter def mask(self, value): if type(value) == int: if value < 0 or value > 32: raise ValueError("IP mask out of range: {}.".format(value)) self._mask = value self._mask = self.parse_mask(value) @staticmethod
[docs] def parse_ip_with_mask(string): """ Parses a string and returns the network IP and mask. :return: (ip: str, mask: int) """ ip = IPElement.parse_ip(string[:string.find('/')]) mask = IPElement.parse_mask(string[string.find('/')+1:]) if ip and mask: return ip, mask
[docs] def parse_ip(string): """ Parses a string and checks if it is a valid IP. :return: string, list """ split = string.split('.') if len(split) != 4: raise ValueError("IP address malformed.") i = 0 for elem in split: elem = int(elem) split[i] = elem i += 1 if elem < 0 or elem > 255: raise ValueError("IP address (part {}) out of range: {}.".format(i, elem)) return string, split
[docs] def parse_mask(string): """ Parses a string and checks if it is a valid mask. :return: int """ mask = int(string) if mask < 0 or mask > 32: raise ValueError("IP mask out of range: {}.".format(mask)) return mask
[docs] def is_network(self): """ Returns ``True`` if ``self.ip`` is a network IP in accordance to ``self.mask``. :return: bool """ num = '' for i in self.ip[1]: num += '{0:08b}'.format(i) for i in num[self.mask:]: if i != '0': return False return True
[docs] def is_broadcast(self): """ Returns ``True`` if ``self.ip`` is a broadcast IP in accordance to ``self.mask``. :return: bool """ num = '' for i in self.ip[1]: num += '{0:08b}'.format(i) for i in num[self.mask:]: if i != '1': return False return True
[docs] def as_int(self): num = 0 for i, ip_part in enumerate(reversed(self.ip[1])): num += ip_part * 10**(3*i) return num
[docs] def as_string(self): return '/'.join([self._ip[0], str(self.mask)])
def __eq__(self, other): if type(other) == str: try: other = self.__class__(other) except (TypeError, ValueError): return False if not isinstance(other, self.__class__): return False return self.ip == other.ip def __lt__(self, other): if type(other) == str: try: other = self.__class__(other) except (TypeError, ValueError): return False if not isinstance(other, self.__class__): raise TypeError("unorderable types: {} < {}.".format(self, other)) return self.as_int() < other.as_int() def __hash__(self): return self.ip[0].__hash__() def __add__(self, other): if type(other) != int: raise TypeError("expected an integer.") ip = self.ip[1].copy() to_add = other for elem in reversed(range(4)): ip[elem] += to_add to_add = ip[elem] // 256 # truncated division ip[elem] %= 256 return IPElement(ip='.'.join([str(x) for x in ip]), mask=self.mask) def __sub__(self, other): if type(other) != int: raise TypeError("expected an integer.") return self + (-other) def __repr__(self): try: return "<{} {}/{}>".format(self.__class__.__name__, self.ip[0], self.mask) except AttributeError: return "<{} not initialized>".format(self.__class__.__name__)
[docs]class MACElement(object): def __init__(self, mac): if type(mac) != str: raise TypeError("expected argument mac to be of type str. Got: {}.".format(type(mac))) self.mac = mac.lower() def __eq__(self, other): if type(other) == str: try: other = self.__class__(other) except (TypeError, ValueError): return False if not isinstance(other, self.__class__): return False return self.mac == other.mac def __hash__(self): return self.mac.__hash__() def __repr__(self): try: return "<{} {}>".format(self.__class__.__name__, self.mac) except AttributeError: return "<{} not initialized>".format(self.__class__.__name__)
[docs]class Network(IPElement): """ IP network object representation for Python. This object responds to the following interfaces (inherited from IPElement): - addition (and subtraction) - equality (and inequality) - hashable In addition, also responds to the following: - iterable - sliceable """ def __init__(self, *args, **kwargs): """ Possible initializations:: Network('') Network(ip='', mask=25) Network('') Network(ip='', mask=25, force_length=10) """ if len(args) == 1 and isinstance(args[0], IPElement): self._ip = args[0].ip self._mask = args[0].mask else: if len(args) == 1: if len(args[0].split('-')) == 2: ip, fl = args[0].split('-') self.forced_length = int(fl) super().__init__(ip) else: super().__init__(*args, **kwargs) else: super().__init__(*args, **kwargs) if not self.is_network(): raise ValueError("IP address must be a valid network IP address.") if not hasattr(self, 'forced_length'): self.forced_length = -1 if 'force_length' in kwargs: # TODO: provide documentation for force_length and its behaviour self.forced_length = kwargs['force_length'] self.__iter_index = 0 @property def ip(self): return super().ip @ip.setter def ip(self, value): prev_ip = '/'.join([self.ip[0], str(self.mask)]) super(Network, Network).ip.__set__(self, value) if not self.is_network(): if not prev_ip.startswith('/'): super(Network, Network).ip.__set__(self, prev_ip) raise ValueError("IP address must be a valid network IP address.") def __len__(self): try: if self.forced_length >= 0: return self.forced_length return 2 ** (32 - self.mask) except AttributeError: return 0 def __getitem__(self, item): if item >= len(self): raise ValueError("requested item '{}' out of network range ({}).".format(item, len(self))) return self + item def __iter__(self): return self def __next__(self): return
[docs] def as_string(self): return '-'.join([super().as_string(), str(len(self))])
[docs] def next(self): if self.forced_length == -1: if self.__iter_index >= len(self)-1: # remove broadcast self.__iter_index = 0 raise StopIteration() else: if self.__iter_index >= len(self): self.__iter_index = 0 raise StopIteration() self.__iter_index += 1 return self + self.__iter_index
[docs] def broadcast(self): return self + (len(self) - 1)
[docs] def last_host(self): return self + (len(self) - 2)
def __repr__(self): return super().__repr__().replace('>', '-{}>'.format(len(self)))
[docs]class IPHost(object): def __init__(self, ip): if not isinstance(ip, IPElement): raise TypeError("expected an IPElement instance for argument ip. Got: {}".format(ip)) self._ip = ip self.mac_history = {} self.is_up_history = {} self.discovery_history = {} self.name_history = {} self.last_check = datetime.datetime.fromtimestamp(0) self.last_seen = datetime.datetime.fromtimestamp(0) @property def ip(self): return self._ip.ip @ip.setter def ip(self, value): self._ip.ip = value @property def is_up(self): try: return self.is_up_history[sorted(self.is_up_history.keys())[-1]] except IndexError: # history is empty return None @property def mac(self): try: return self.mac_history[sorted(self.mac_history.keys())[-1]] except IndexError: # history is empty return None @property def name(self): try: return self.name_history[sorted(self.name_history.keys())[-1]] except IndexError: # history is empty return None @property def second_last_mac(self): try: return self.mac_history[sorted(self.mac_history.keys())[-2]] except IndexError: # history is empty return None @property def second_last_name(self): try: return self.name_history[sorted(self.name_history.keys())[-2]] except IndexError: # history is empty return None @property def ago(self): return - self.last_check @property def last_discovery_method(self): try: return self.discovery_history[sorted(self.discovery_history.keys())[-1]] except IndexError: # history is empty return None
[docs] def add_to_mac_history(self, entry): self.mac_history[self.last_check] = entry
[docs] def add_to_discovery_history(self, entry): self.discovery_history[self.last_check] = entry
[docs] def add_to_is_up_history(self, entry): self.is_up_history[self.last_check] = entry
[docs] def add_to_name_history(self, entry): self.name_history[self.last_check] = entry
[docs] def update(self, mac, method, is_up, name): self.last_check = if is_up: self.last_seen = self.last_check changed = False what = [] if mac != self.mac and mac is not None: self.add_to_mac_history(mac) changed = True what.append('mac') if method != self.last_discovery_method and method is not None: self.add_to_discovery_history(method) changed = True what.append('method') if is_up != self.is_up: self.add_to_is_up_history(is_up) changed = True what.append('is_up') if name != and name != (): self.add_to_name_history(name) changed = True what.append('name') what = ' '.join(what) return changed, what
[docs] def print_histories(self): print("MAC HISTORY FOR IPHOST: {}".format(repr(self))) for entry in sorted(self.mac_history): print(entry, " - ", self.mac_history[entry]) print("UP HISTORY FOR IPHOST: {}".format(repr(self))) for entry in sorted(self.is_up_history): print(entry, " - ", self.is_up_history[entry]) print("DISCOVERY HISTORY FOR IPHOST: {}".format(repr(self))) for entry in sorted(self.discovery_history): print(entry, " - ", self.discovery_history[entry]) print("NAME HISTORY FOR IPHOST: {}".format(repr(self))) for entry in sorted(self.name_history): print(entry, " - ", self.name_history[entry])
def __eq__(self, other): if not isinstance(other, self.__class__): return False return self.ip == other.ip def __hash__(self): return self._ip.__hash__() def __repr__(self): if self.mac is not None: return "<{} {}/{}@{}>".format(self.__class__.__name__, self.ip[0], self._ip.mask, self.mac) else: return "<{} {}/{}>".format(self.__class__.__name__, self.ip[0], self._ip.mask)
[docs]class MACHost(object): def __init__(self, mac): if not isinstance(mac, MACElement): raise TypeError("expected a MACElement instance for argument mac. Got: {}".format(mac)) self.mac = mac self.history = {} self.last_update = datetime.datetime.fromtimestamp(0) @property def ip(self): try: return self.history[sorted(self.history.keys())[-1]] except IndexError: # history is empty return None @property def ago(self): return - self.last_update
[docs] def add_to_history(self, entry): if not isinstance(entry, tuple): entry = (entry,) self.history[self.last_update] = entry
[docs] def update(self, ip): self.last_update = changed = False what = [] if self.ip is not None: if type(ip) == tuple: if self.ip != ip: self.add_to_history(ip) changed = True what.append('ip_left') else: if ip not in self.ip: _ip = list(self.ip) _ip.append(ip) _ip = tuple(_ip) self.add_to_history(_ip) changed = True what.append('ip_joined') else: self.add_to_history(ip) changed = True what.append('ip_none') what = ' '.join(what) return changed, what
[docs] def update_ip_disconnected(self, ip): ips = list(self.ip) try: ips.remove(ip) except ValueError: logger.error("MACHost '{}' was told that IP '{}' changed mac, " "but it wasn't part of this MACHost.".format(self, ip)) self.update(tuple(ips))
[docs] def print_histories(self): print("IP HISTORY FOR macHOST: {}".format(repr(self))) for entry in sorted(self.history): print(entry, " - ", self.history[entry])
def __eq__(self, other): if not isinstance(other, self.__class__): return False return self.mac == other.mac def __hash__(self): return self.mac.__hash__() def __repr__(self): if self.mac is not None: return "<{} {}@{}>".format(self.__class__.__name__, self.mac, self.ip) else: return "<{} {}>".format(self.__class__.__name__, self.mac)
[docs]class NameHost(object): def __init__(self, name): if type(name) != str: raise TypeError("expected a string for argument name. Got: {}".format(name)) = name self.history = {} self.last_update = datetime.datetime.fromtimestamp(0) @property def ip(self): try: return self.history[sorted(self.history.keys())[-1]] except IndexError: # history is empty return None @property def ago(self): return - self.last_update
[docs] def add_to_history(self, entry): if not isinstance(entry, tuple): entry = (entry,) self.history[self.last_update] = entry
[docs] def update(self, ip): self.last_update = changed = False what = [] if self.ip is not None: if type(ip) == tuple: if self.ip != ip: self.add_to_history(ip) changed = True what.append('ip_left') else: if ip not in self.ip: _ip = list(self.ip) _ip.append(ip) _ip = tuple(_ip) self.add_to_history(_ip) changed = True what.append('ip_joined') else: self.add_to_history(ip) changed = True what.append('ip_none') what = ' '.join(what) return changed, what
[docs] def update_ip_disconnected(self, ip): ips = list(self.ip) try: ips.remove(ip) except ValueError: logger.error("NameHost '{}' was told that IP '{}' changed name, " "but it wasn't part of this NameHost.".format(self, ip)) self.update(tuple(ips))
[docs] def print_histories(self): print("IP HISTORY FOR NameHOST: {}".format(repr(self))) for entry in sorted(self.history): print(entry, " - ", self.history[entry])
def __eq__(self, other): if not isinstance(other, self.__class__): return False return == def __hash__(self): return def __repr__(self): if is not None: return "<{} {}@{}>".format(self.__class__.__name__,, self.ip) else: return "<{} {}>".format(self.__class__.__name__,