#!/usr/bin/python3
#
# tBB is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# tBB is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Serialization module for tBB.
"""
import asyncio
import os
import json
import datetime
import time
import logging
from concurrent.futures import ProcessPoolExecutor
import paths
import tracker
from net_elements import *
# Time this process started; _save() records the span from start-up until
# "now" as the currently running session.
start_datetime = datetime.datetime.now()
# Module-level logger, following the standard getLogger(__name__) convention.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
def path_for_network(network, saving_path=paths.scans, suffix='.tbbscan'):
    """Return the file path where the scan for *network* is stored.

    The network's CIDR string becomes the file name; '/' is replaced with
    '\\' so the CIDR is a valid single file-name component.
    """
    filename = network.as_string().replace('/', '\\') + suffix
    return os.path.join(saving_path, filename)
class Serializer(object):
    """Persists a tBB scan (sessions, trackers and host histories) as JSON.

    The on-disk document has five top-level sections: SESSIONS,
    TRACKERS_HANDLER, IP_HOSTS, MAC_HOSTS and NAME_HOSTS.  All datetimes
    are stored as POSIX timestamps.
    """

    def __init__(self, network=None, path=None, track=None, config=None, sessions=None):
        """Bind this serializer to a scan file.

        Either ``network`` (from which the path is derived via
        :func:`path_for_network`) or an explicit ``path`` must be given.
        NOTE(review): ``config`` is effectively required — ``config.indent``
        is dereferenced below even though the parameter defaults to None.
        """
        if network is not None:
            self.path = path_for_network(network)
        else:
            self.path = path
        self.out_indent = config.indent.value
        self.out_sort_keys = config.do_sort.value
        self.track = track
        # Bug fix: the previous default (sessions=[]) was a mutable default
        # argument shared by every Serializer created without one, so
        # load() on one instance leaked sessions into all the others.
        self.sessions = sessions if sessions is not None else []
        self.last_save = datetime.datetime.fromtimestamp(0)
        self.saving = False  # is this Serializer saving to file?

    def load(self):
        """Load the scan stored at ``self.path`` into this serializer.

        Rebuilds ``self.track`` (a TrackersHandler), its ignore lists and
        priorities, and the per-IP/MAC/name host histories from the JSON
        document on disk.
        """
        start = time.time()
        logger.debug("Loading scan from file {}.".format(self.path))
        # The network CIDR is encoded in the file name with '/' escaped as '\\'.
        net = Network(os.path.splitext(os.path.basename(self.path))[0].replace('\\', '/'))
        with open(self.path, 'r') as f:
            read = json.load(f)
        for session in read['SESSIONS']:
            self.sessions.append(
                (datetime.datetime.fromtimestamp(float(session)),
                 datetime.datetime.fromtimestamp(read['SESSIONS'][session]))
            )
        self.track = tracker.TrackersHandler(net, read['TRACKERS_HANDLER']['hosts'])
        self.track.serializer = self
        # Merge the saved ignore entries into the handler's own lists,
        # skipping duplicates.
        ignore_list = self.track.ignore
        for ignore in read['TRACKERS_HANDLER']['ignore']:
            if IPElement(ignore) not in ignore_list:
                ignore_list.append(IPElement(ignore))
        self.track.ignore = ignore_list
        ignore_list = self.track.ignore_mac
        for ignore in read['TRACKERS_HANDLER']['ignore_mac']:
            if MACElement(ignore) not in ignore_list:
                ignore_list.append(MACElement(ignore))
        self.track.ignore_mac = ignore_list
        ignore_list = self.track.ignore_name
        for ignore in read['TRACKERS_HANDLER']['ignore_name']:
            if ignore not in ignore_list:
                ignore_list.append(ignore)
        # Bug fix: this used to assign the name list to self.track.ignore_mac,
        # clobbering the MAC ignore list and dropping the loaded name list.
        self.track.ignore_name = ignore_list
        priorities = self.track.priorities
        for ip in read['TRACKERS_HANDLER']['priorities']:
            # 0 is the default priority, so only non-zero entries are restored.
            if read['TRACKERS_HANDLER']['priorities'][ip] != 0:
                priorities.update({IPElement(ip): read['TRACKERS_HANDLER']['priorities'][ip]})
        self.track.priorities = priorities
        for ip in read['IP_HOSTS']:
            host = self.track.ip_hosts[IPElement(ip)]
            host.last_check = datetime.datetime.fromtimestamp(read['IP_HOSTS'][ip]['last_check'])
            host.last_seen = datetime.datetime.fromtimestamp(read['IP_HOSTS'][ip]['last_seen'])
            for history_name in ('mac', 'is_up', 'discovery', 'name'):
                history_name += "_history"
                history = getattr(host, history_name)
                for entry in read['IP_HOSTS'][ip][history_name]:
                    # History keys are POSIX timestamps (stringified by JSON).
                    decoded = datetime.datetime.fromtimestamp(float(entry))
                    history[decoded] = read['IP_HOSTS'][ip][history_name][entry]
                    if history_name == 'name_history':
                        # JSON stores tuples as lists; restore the tuple form
                        # used in memory for name history entries.
                        history[decoded] = tuple(history[decoded])
        for mac in read['MAC_HOSTS']:
            try:
                host = self.track.mac_hosts[MACElement(mac)]
            except KeyError:
                # Host not currently tracked: recreate it from the file.
                host = MACHost(MACElement(mac))
            host.last_update = datetime.datetime.fromtimestamp(read['MAC_HOSTS'][mac]['last_update'])
            for entry in read['MAC_HOSTS'][mac]['history']:
                decoded = datetime.datetime.fromtimestamp(float(entry))
                decoded_ips = []
                for ip in read['MAC_HOSTS'][mac]['history'][entry]:
                    decoded_ips.append(IPElement(ip))
                host.history[decoded] = tuple(decoded_ips)
            self.track.trackers[0].mac_hosts[MACElement(mac)] = host
        for name in read['NAME_HOSTS']:
            try:
                host = self.track.name_hosts[name]
            except KeyError:
                host = NameHost(name)
            host.last_update = datetime.datetime.fromtimestamp(read['NAME_HOSTS'][name]['last_update'])
            for entry in read['NAME_HOSTS'][name]['history']:
                decoded = datetime.datetime.fromtimestamp(float(entry))
                decoded_ips = []
                for ip in read['NAME_HOSTS'][name]['history'][entry]:
                    decoded_ips.append(IPElement(ip))
                host.history[decoded] = tuple(decoded_ips)
            self.track.trackers[0].name_hosts[name] = host
        logger.debug("Loading took {:.3f} seconds.".format(time.time() - start))

    @asyncio.coroutine
    def save(self):
        """Serialize asynchronously by running :meth:`_save` in a worker process."""
        yield from asyncio.get_event_loop().run_in_executor(ProcessPoolExecutor(max_workers=3), self._save)
        # self._save already sets self.last_save, but it runs in another
        # process, so that change never reaches this instance: set it here too.
        self.last_save = datetime.datetime.now()

    def _save(self):
        """Write the current scan state to ``self.path`` as JSON (blocking).

        Skips the write if another save is already in progress.
        """
        if self.saving:
            logger.debug('skipping saving due to pending operation.')
            return
        self.saving = True
        start = time.time()
        logger.debug("Saving scan to file {}.".format(self.path))
        try:
            to_save = {
                "SESSIONS": {},
                "TRACKERS_HANDLER": {},
                "IP_HOSTS": {},
                "MAC_HOSTS": {},
                "NAME_HOSTS": {},
            }
            for session in self.sessions:
                to_save['SESSIONS'][session[0].timestamp()] = session[1].timestamp()
            # Record the currently running session (start-up until now) as well.
            to_save['SESSIONS'][start_datetime.timestamp()] = datetime.datetime.now().timestamp()
            to_save['TRACKERS_HANDLER']['hosts'] = self.track.hosts
            to_save['TRACKERS_HANDLER']['ignore'] = []
            to_save['TRACKERS_HANDLER']['ignore_mac'] = []
            to_save['TRACKERS_HANDLER']['ignore_name'] = []
            to_save['TRACKERS_HANDLER']['priorities'] = {}
            for ignore in self.track.ignore:
                if ignore.as_string() not in to_save['TRACKERS_HANDLER']['ignore']:
                    to_save['TRACKERS_HANDLER']['ignore'].append(ignore.as_string())
            for ignore in self.track.ignore_mac:
                if ignore.mac not in to_save['TRACKERS_HANDLER']['ignore_mac']:
                    to_save['TRACKERS_HANDLER']['ignore_mac'].append(ignore.mac)
            for ignore in self.track.ignore_name:
                if ignore not in to_save['TRACKERS_HANDLER']['ignore_name']:
                    to_save['TRACKERS_HANDLER']['ignore_name'].append(ignore)
            for ip in self.track.priorities:
                # 0 is the default priority; don't waste space storing it.
                if self.track.priorities[ip] != 0:
                    to_save['TRACKERS_HANDLER']['priorities'].update({ip.as_string(): self.track.priorities[ip]})
            for ip_host in self.track.ip_hosts:
                host = self.track.ip_hosts[ip_host]
                to_save['IP_HOSTS'][ip_host.as_string()] = {
                    'mac_history': {},
                    'is_up_history': {},
                    'discovery_history': {},
                    'name_history': {},
                    'last_check': host.last_check.timestamp(),
                    'last_seen': host.last_seen.timestamp()
                }
                for history_name in ('mac', 'is_up', 'discovery', 'name'):
                    history_name += "_history"
                    history = getattr(host, history_name)
                    for entry in history:
                        encoded = entry.timestamp()
                        to_save['IP_HOSTS'][ip_host.as_string()][history_name][encoded] = history[entry]
            for mac_host in self.track.mac_hosts:
                host = self.track.mac_hosts[mac_host]
                to_save['MAC_HOSTS'][mac_host.mac] = {
                    'history': {},
                    'last_update': host.last_update.timestamp(),
                }
                for entry in host.history:
                    encoded = str(entry.timestamp())
                    encoded_ips = []
                    for ip in host.history[entry]:
                        encoded_ips.append(ip.as_string())
                    # Bug fix: this was indexed with MACElement(mac_host.mac),
                    # which never matches the plain-string key created above
                    # (and a non-str key would break json.dump).
                    to_save['MAC_HOSTS'][mac_host.mac]['history'][encoded] = encoded_ips
            for name_host in self.track.name_hosts:
                host = self.track.name_hosts[name_host]
                to_save['NAME_HOSTS'][name_host] = {
                    'history': {},
                    'last_update': host.last_update.timestamp(),
                }
                if host.name == 'empty-name':
                    # this name usually takes up a lot of space for some very useless
                    # information, so let's only save the latest status for 'empty-name'.
                    latest = sorted(host.history)[-1]  # hoisted: computed once
                    ips = []
                    for ip in host.history[latest]:
                        ips.append(ip.as_string())
                    to_save['NAME_HOSTS']['empty-name']['history'][str(latest.timestamp())] = ips
                    continue
                for entry in host.history:
                    encoded = str(entry.timestamp())
                    encoded_ips = []
                    for ip in host.history[entry]:
                        encoded_ips.append(ip.as_string())
                    to_save['NAME_HOSTS'][name_host]['history'][encoded] = encoded_ips
            with open(self.path, 'w') as f:
                json.dump(to_save, f, indent=self.out_indent, sort_keys=self.out_sort_keys)
        finally:
            # Bug fix: reset the flag even when serialization fails, so a
            # single error does not permanently disable future saves.
            self.saving = False
        self.last_save = datetime.datetime.now()
        logger.debug("Saving took {:.3f} seconds.".format(time.time() - start))

    @asyncio.coroutine
    def keep_saving(self, frequency):
        """Save automatically every *frequency* seconds while the loop runs."""
        while asyncio.get_event_loop().is_running():
            if (datetime.datetime.now() - self.last_save).total_seconds() >= frequency:
                yield from self.save()
                logger.info("Automatic saving.")
            # Poll every 10 seconds rather than sleeping the whole interval,
            # so a last_save updated elsewhere is noticed promptly.
            yield from asyncio.sleep(10)