#!/usr/bin/python3
#
# tBB is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# tBB is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
This module helps handling tBB front-ends.
"""
import asyncio
import json
import logging
import datetime
import os
import ssl
import socket # only used for open port checking
import contextlib # only used for open port checking
from aiohttp import web
from asyncio import coroutine
import paths
from net_elements import *
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
[docs]class FrontendsHandler(object):
def __init__(self, tracker, password, config, loop=None):
self.tracker = tracker
self.password = password
self.host = config.host.value
self.port = self.determine_port(self.host,
config.port.value,
config.maximum_port_lookup.value)
self.app = web.Application(logger=logger)
self.handler = None # will be defined at start
self.srv = None # will be defined at start
if config.ssl.enable:
ca_file_path = os.path.join(paths.certs, "cert.pem")
key_file_path = os.path.join(paths.certs, "key.pem")
if os.path.isfile(ca_file_path) and os.path.isfile(key_file_path):
self.sslcontext = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
self.sslcontext.options |= ssl.OP_NO_SSLv2
self.sslcontext.options |= ssl.OP_NO_SSLv3
self.sslcontext.load_cert_chain(certfile=ca_file_path,
keyfile=key_file_path)
self.sslcontext.check_hostname = config.ssl.check_hostname.value
else:
logger.warning("Asked to use SSL, but no certificates can be found. "
"Running plain HTTP.")
if loop is None:
self.loop = asyncio.get_event_loop()
else:
self.loop = loop
@staticmethod
[docs] def determine_port(host, starting_port, maximum_port_lookup):
"""
This method searches the first port available
after (and including) starting_port.
To limit this method from looking up to port 65535,
use the maximum_port_lookup argument.
"""
port = starting_port
while True:
with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
if sock.connect_ex((host, port)) != 0:
break
else:
port += 1
if port > starting_port + maximum_port_lookup:
raise RuntimeError("maximum lookup exceeded while looking for available port.")
return port
@coroutine
[docs] def start(self):
self.loop = asyncio.get_event_loop()
self.handler = self.app.make_handler(logger=logger)
if hasattr(self, 'sslcontext'):
sslcon = self.sslcontext
else:
sslcon = None
self.srv = yield from self.loop.create_server(
self.handler, self.host, self.port, ssl=sslcon
)
self.bind_requests()
logger.info("Frontend socket opened at {}://{}:{}/.".format(
'https' if sslcon else 'http',
self.host, self.port
))
[docs] def close(self):
self.srv.close()
self.loop.run_until_complete(self.srv.wait_closed())
self.loop.run_until_complete(self.app.shutdown())
self.loop.run_until_complete(self.handler.finish_connections(1.0))
self.loop.run_until_complete(self.app.cleanup())
[docs] def bind_requests(self):
self.app.router.add_route('GET', '/test/', self.test)
self.app.router.add_route('GET', '/stats/{password}/', self.stats)
self.app.router.add_route('GET', '/ip_info/{addr}/{password}/', self.ip_info)
self.app.router.add_route('GET', '/mac_info/{addr}/{password}/', self.mac_info)
self.app.router.add_route('GET', '/name_info/{addr}/{password}/', self.name_info)
self.app.router.add_route('GET', '/status/{password}/', self.status)
self.app.router.add_route('GET', '/settings/get/{what}/{password}/', self.settings_get)
self.app.router.add_route('GET', '/settings/set/{what}/{value}/{password}/', self.settings_set)
self.app.router.add_route('GET', '/ignore/{method}/{ip}/{password}/', self.ignore)
self.app.router.add_route('GET', '/ignore_mac/{method}/{mac}/{password}/', self.ignore_mac)
self.app.router.add_route('GET', '/ignore_name/{method}/{name}/{password}/', self.ignore_name)
self.app.router.add_route('GET', '/is_ignored/{ip}/{password}/', self.is_ignored)
self.app.router.add_route('GET', '/is_mac_ignored/{mac}/{password}/', self.is_mac_ignored)
self.app.router.add_route('GET', '/is_name_ignored/{name}/{password}/', self.is_name_ignored)
self.app.router.add_route('GET', '/ignored_ips/{password}/', self.ignored_ips)
self.app.router.add_route('GET', '/ignored_macs/{password}/', self.ignored_macs)
self.app.router.add_route('GET', '/ignored_names/{password}/', self.ignored_names)
self.app.router.add_route('GET', '/set_priority/{ip}/{value}/{password}/', self.set_priority)
self.app.router.add_route('GET', '/get_priority/{ip}/{password}/', self.get_priority)
self.app.router.add_route('GET', '/ip_host_changes/{ip}/{from}/{to}/{password}/', self.ip_host_changes)
self.app.router.add_route('GET', '/mac_host_changes/{mac}/{from}/{to}/{password}/', self.mac_host_changes)
self.app.router.add_route('GET', '/name_host_changes/{name}/{from}/{to}/{password}/', self.name_host_changes)
self.app.router.add_route('GET', '/up_ip_hosts/{password}/', self.up_ip_hosts)
self.app.router.add_route('GET', '/up_mac_hosts/{password}/', self.up_mac_hosts)
self.app.router.add_route('GET', '/up_name_hosts/{password}/', self.up_name_hosts)
@coroutine
[docs] def test(self, request):
return web.Response(body=b"Connectivity test.")
@coroutine
[docs] def stats(self, request):
self.check_request_input(request, [])
hosts_up = []
for host in self.tracker.ip_hosts:
hosts_up.append((host.as_string(), self.tracker.ip_hosts[host].mac))
return web.Response(body=json.dumps({
'network': self.tracker.network.as_string(),
'up_hosts': self.tracker.up_hosts,
'hosts_up': hosts_up,
}).encode('utf-8'))
@coroutine
[docs] def ip_info(self, request):
check = self.check_request_input(request, ['addr'])
if check is not None:
return check
as_ip = self.check_ip(request.match_info['addr'])
if not isinstance(as_ip, IPElement):
return as_ip
host = self.tracker.ip_hosts[as_ip]
info = {
'ip': as_ip.as_string(),
'is_up': host.is_up,
'mac': host.mac,
'method': host.last_discovery_method,
'name': host.name,
'last_check': host.last_check.timestamp(),
'last_seen': host.last_seen.timestamp(),
'mac_history': {},
'is_up_history': {},
'discovery_history': {},
'name_history': {},
}
for history_name in ('mac', 'is_up', 'discovery', 'name'):
history_name += "_history"
history = getattr(host, history_name)
for entry in history:
encoded = entry.timestamp()
info[history_name][encoded] = history[entry]
return web.Response(
body=json.dumps(info).encode('utf-8'))
@coroutine
[docs] def mac_info(self, request):
check = self.check_request_input(request, ['addr'])
if check is not None:
return check
as_mac = self.check_mac(request.match_info['addr'])
if not isinstance(as_mac, MACElement):
return as_mac
host = self.tracker.mac_hosts[as_mac]
info = {
'mac': as_mac.mac,
'last_update': host.last_update.timestamp(),
'ip': [],
'history': {},
}
for ip in host.ip:
info['ip'].append(ip.ip[0])
for entry in host.history:
encoded = str(entry.timestamp())
encoded_ips = []
for ip in host.history[entry]:
encoded_ips.append(ip.ip[0])
info['history'][encoded] = encoded_ips
return web.Response(
body=json.dumps(info).encode('utf-8'))
@coroutine
[docs] def name_info(self, request):
check = self.check_request_input(request, ['addr'])
if check is not None:
return check
name = self.check_name(request.match_info['addr'])
if type(name) != str:
return name
host = self.tracker.name_hosts[name]
info = {
'name': name,
'last_update': host.last_update.timestamp(),
'ip': [],
'history': {},
}
for ip in host.ip:
info['ip'].append(ip.ip[0])
for entry in host.history:
encoded = str(entry.timestamp())
encoded_ips = []
for ip in host.history[entry]:
encoded_ips.append(ip.ip[0])
info['history'][encoded] = encoded_ips
return web.Response(
body=json.dumps(info).encode('utf-8'))
@coroutine
[docs] def status(self, request):
check = self.check_request_input(request, [])
if check is not None:
return check
info = {}
for i, track in enumerate(self.tracker.trackers):
info.update({
i: (track.outer_status, track.status)
})
return web.Response(
body=json.dumps(info).encode('utf-8'))
@coroutine
[docs] def settings_get(self, request):
check = self.check_request_input(request, ['what'])
if check is not None:
return check
settings = {
'time_between_checks': [':'.join([str(t.seconds // 60), str(t.seconds - t.seconds//60*60)])
for t in self.tracker.time_between_checks],
'maximum_seconds_randomly_added': self.tracker.maximum_seconds_randomly_added,
'auto_ignore_broadcasts': self.tracker.auto_ignore_broadcasts,
}
if request.match_info['what'] not in settings and request.match_info['what'] != 'all':
return web.Response(status=406, body=b"what invalid.") # NotAcceptable
if request.match_info['what'] == 'all':
return web.Response(
body=json.dumps(settings).encode('utf-8'))
return web.Response(
body=json.dumps(settings[request.match_info['what']]).encode('utf-8')
)
@coroutine
[docs] def settings_set(self, request):
check = self.check_request_input(request, ['what', 'value'])
if check is not None:
return check
what = request.match_info['what']
value = request.match_info['value']
if what == 'time_between_checks':
try:
_value = value.split(':')
value = []
for v in _value:
value.append(int(v))
except:
return web.Response(status=406, body=b"value invalid.") # NotAcceptable
if len(value) != 2:
return web.Response(status=406, body=b"value invalid.") # NotAcceptable
self.tracker.time_between_checks = datetime.timedelta(minutes=value[0], seconds=value[1])
return web.Response(status=200)
elif what == 'maximum_seconds_randomly_added':
try:
value = int(value)
except ValueError:
return web.Response(status=406, body=b"value invalid.") # NotAcceptable
else:
self.tracker.maximum_seconds_randomly_added = value
return web.Response(status=200)
elif what == 'auto_ignore_broadcasts':
if value == 'True':
self.tracker.auto_ignore_broadcasts = True
return web.Response(status=200)
elif value == 'False':
self.tracker.auto_ignore_broadcasts = False
return web.Response(status=200)
else:
return web.Response(status=406, body=b"value invalid.") # NotAcceptable
else:
return web.Response(status=406, body=b"what invalid.") # NotAcceptable
@coroutine
[docs] def ignore(self, request):
check = self.check_request_input(request, ['method', 'ip'])
if check is not None:
return check
as_ip = self.check_ip(request.match_info['ip'], check_in_tracker=False)
if not isinstance(as_ip, IPElement):
return as_ip
method = request.match_info['method']
if method == 'toggle':
if as_ip in self.tracker.ignore:
method = 'remove'
else:
method = 'add'
if method == 'add':
_ignore_list = self.tracker.ignore
_ignore_list.append(as_ip)
ignore_list = []
for elem in _ignore_list:
if elem not in ignore_list:
ignore_list.append(elem)
self.tracker.ignore = ignore_list
return web.Response(status=200)
elif method == 'remove':
ignore_list = []
for elem in self.tracker.ignore:
if elem != as_ip and elem not in ignore_list:
ignore_list.append(elem)
self.tracker.ignore = ignore_list
return web.Response(status=200)
else:
return web.Response(status=406, body=b"method invalid.")
@coroutine
[docs] def ignore_mac(self, request):
check = self.check_request_input(request, ['method', 'mac'])
if check is not None:
return check
as_mac = self.check_mac(request.match_info['mac'], check_in_tracker=False)
if not isinstance(as_mac, MACElement):
return as_mac
method = request.match_info['method']
if method == 'toggle':
if as_mac in self.tracker.ignore_mac:
method = 'remove'
else:
method = 'add'
if method == 'add':
_ignore_list = self.tracker.ignore_mac
_ignore_list.append(as_mac)
ignore_list = []
for elem in _ignore_list:
if elem not in ignore_list:
ignore_list.append(elem)
self.tracker.ignore_mac = ignore_list
return web.Response(status=200)
elif method == 'remove':
ignore_list = []
for elem in self.tracker.ignore_mac:
if elem != as_mac and elem not in ignore_list:
ignore_list.append(elem)
self.tracker.ignore_mac = ignore_list
return web.Response(status=200)
else:
return web.Response(status=406, body=b"method invalid.")
@coroutine
[docs] def ignore_name(self, request):
check = self.check_request_input(request, ['method', 'name'])
if check is not None:
return check
name = self.check_name(request.match_info['name'], check_in_tracker=False)
if type(name) != str:
return name
method = request.match_info['method']
if method == 'toggle':
if name in self.tracker.ignore_name:
method = 'remove'
else:
method = 'add'
if method == 'add':
_ignore_list = self.tracker.ignore_name
_ignore_list.append(name)
ignore_list = []
for elem in _ignore_list:
if elem not in ignore_list:
ignore_list.append(elem)
self.tracker.ignore_name = ignore_list
return web.Response(status=200)
elif method == 'remove':
ignore_list = []
for elem in self.tracker.ignore_name:
if elem != name and elem not in ignore_list:
ignore_list.append(elem)
self.tracker.ignore_name = ignore_list
return web.Response(status=200)
else:
return web.Response(status=406, body=b"method invalid.")
@coroutine
[docs] def is_ignored(self, request):
check = self.check_request_input(request, ['ip'])
if check is not None:
return check
as_ip = self.check_ip(request.match_info['ip'])
if isinstance(as_ip, web.Response):
return as_ip
is_ignored = {as_ip.ip[0]: as_ip in self.tracker.ignore}
return web.Response(status=200, body=
json.dumps(is_ignored).encode('utf-8')
)
@coroutine
[docs] def is_mac_ignored(self, request):
check = self.check_request_input(request, ['mac'])
if check is not None:
return check
as_mac = self.check_mac(request.match_info['mac'], check_in_tracker=False)
if isinstance(as_mac, web.Response):
return as_mac
is_ignored = {as_mac.mac: as_mac in self.tracker.ignore_mac}
return web.Response(status=200, body=
json.dumps(is_ignored).encode('utf-8')
)
@coroutine
[docs] def is_name_ignored(self, request):
check = self.check_request_input(request, ['name'])
if check is not None:
return check
name = self.check_name(request.match_info['name'], check_in_tracker=False)
if isinstance(name, web.Response):
return name
is_ignored = {name: name in self.tracker.ignore_name}
return web.Response(status=200, body=
json.dumps(is_ignored).encode('utf-8')
)
@coroutine
[docs] def ignored_ips(self, request):
check = self.check_request_input(request, [])
if check is not None:
return check
ignored = []
for host in self.tracker.ignore:
ignored.append(
host.ip[0]
)
return web.Response(status=200, body=
json.dumps(ignored).encode('utf-8')
)
@coroutine
[docs] def ignored_macs(self, request):
check = self.check_request_input(request, [])
if check is not None:
return check
ignored = []
for host in self.tracker.ignore_mac:
ignored.append(
host.mac
)
return web.Response(status=200, body=
json.dumps(ignored).encode('utf-8')
)
@coroutine
[docs] def ignored_names(self, request):
check = self.check_request_input(request, [])
if check is not None:
return check
ignored = []
for host in self.tracker.ignore_name:
ignored.append(
host
)
return web.Response(status=200, body=
json.dumps(ignored).encode('utf-8')
)
@coroutine
[docs] def set_priority(self, request):
check = self.check_request_input(request, ['ip', 'value'])
if check is not None:
return check
as_ip = self.check_ip(request.match_info['ip'])
if not isinstance(as_ip, IPElement):
return as_ip
value = request.match_info['value']
try:
value = int(value)
except ValueError:
return web.Response(status=406, body=b"value is invalid.")
priorities = self.tracker.priorities
priorities[as_ip] = value
self.tracker.priorities = priorities
return web.Response(status=200)
@coroutine
[docs] def get_priority(self, request):
check = self.check_request_input(request, ['ip'])
if check is not None:
return check
as_ip = self.check_ip(request.match_info['ip'])
if not isinstance(as_ip, IPElement):
return as_ip
try:
priority = {as_ip.ip[0]: self.tracker.priorities[as_ip]}
except KeyError:
priority = {as_ip.ip[0]: 0}
return web.Response(status=200, body=
json.dumps(priority).encode('utf-8')
)
@coroutine
[docs] def ip_host_changes(self, request):
check = self.check_request_input(request, ['ip', 'from', 'to'])
if check is not None:
return check
if request.match_info['ip'] == 'all':
as_ip = None
else:
as_ip = self.check_ip(request.match_info['ip'])
if not isinstance(as_ip, IPElement):
return as_ip
from_ = self.check_datetime(request.match_info['from'])
to = self.check_datetime(request.match_info['to'])
if not isinstance(from_, datetime.datetime):
return from_
if not isinstance(to, datetime.datetime):
return to
if as_ip is None:
changes = yield from self.tracker.ip_changes([], from_, to, json_compatible=True)
else:
changes = yield from self.tracker.ip_changes([IPHost(as_ip)], from_, to, json_compatible=True)
return web.Response(status=200, body=
json.dumps(changes).encode('utf-8')
)
@coroutine
[docs] def mac_host_changes(self, request):
check = self.check_request_input(request, ['mac', 'from', 'to'])
if check is not None:
return check
if request.match_info['mac'] == 'all':
as_mac = None
else:
as_mac = self.check_mac(request.match_info['mac'])
if not isinstance(as_mac, MACElement):
return as_mac
from_ = self.check_datetime(request.match_info['from'])
to = self.check_datetime(request.match_info['to'])
if not isinstance(from_, datetime.datetime):
return from_
if not isinstance(to, datetime.datetime):
return to
if as_mac is None:
changes = yield from self.tracker.changes([], from_, to, json_compatible=True)
else:
changes = yield from self.tracker.changes([MACHost(as_mac)], from_, to, json_compatible=True)
return web.Response(status=200, body=
json.dumps(changes).encode('utf-8')
)
@coroutine
[docs] def name_host_changes(self, request):
check = self.check_request_input(request, ['name', 'from', 'to'])
if check is not None:
return check
if request.match_info['name'] == 'all':
name = None
else:
name = self.check_name(request.match_info['name'])
if type(name) != str:
return name
from_ = self.check_datetime(request.match_info['from'])
to = self.check_datetime(request.match_info['to'])
if not isinstance(from_, datetime.datetime):
return from_
if not isinstance(to, datetime.datetime):
return to
if name is None:
changes = yield from self.tracker.changes([], from_, to, json_compatible=True)
else:
changes = yield from self.tracker.changes([NameHost(name)], from_, to, json_compatible=True)
return web.Response(status=200, body=
json.dumps(changes).encode('utf-8')
)
@coroutine
[docs] def up_ip_hosts(self, request):
check = self.check_request_input(request, [])
if check is not None:
return check
up_hosts = []
for host in self.tracker.up_ip_hosts:
up_hosts.append(host.ip[0])
yield
return web.Response(status=200, body=
json.dumps(up_hosts).encode('utf-8')
)
@coroutine
[docs] def up_mac_hosts(self, request):
check = self.check_request_input(request, [])
if check is not None:
return check
up_hosts = []
for host in self.tracker.up_mac_hosts:
up_hosts.append(host.mac)
yield
return web.Response(status=200, body=
json.dumps(up_hosts).encode('utf-8')
)
@coroutine
[docs] def up_name_hosts(self, request):
check = self.check_request_input(request, [])
if check is not None:
return check
up_hosts = []
for host in self.tracker.up_name_hosts:
up_hosts.append(host)
yield
return web.Response(status=200, body=
json.dumps(up_hosts).encode('utf-8')
)
[docs] def check_ip(self, ip, check_in_tracker=True):
try:
as_ip = IPElement(ip=ip, mask=self.tracker.network.mask)
except:
return web.Response(status=406, body=b"ip invalid.") # NotAcceptable
if as_ip not in self.tracker.ip_hosts and check_in_tracker:
return web.Response(status=406, body=b"ip not found.") # NotAcceptable
return as_ip
[docs] def check_mac(self, mac, check_in_tracker=True):
try:
as_mac = MACElement(mac)
except:
return web.Response(status=406, body=b"mac invalid.") # NotAcceptable
if as_mac not in self.tracker.mac_hosts and check_in_tracker:
return web.Response(status=406, body=b"mac not found.") # NotAcceptable
return as_mac
[docs] def check_name(self, name, check_in_tracker=True):
if type(name) != str:
return web.Response(status=406, body=b"name invalid.") # NotAcceptable
if name not in self.tracker.name_hosts and check_in_tracker:
return web.Response(status=406, body=b"name not found.") # NotAcceptable
return name
[docs] def check_datetime(self, input_, accept_now=True):
# expected format: dd.mm.yyyy-hh.mm.ss
if accept_now:
if input_ == 'now':
return datetime.datetime.now()
try:
datet = []
for got in input_.split('-'):
for g in got.split('.'):
datet.append(int(g))
return datetime.datetime(
day=datet[0], month=datet[1], year=datet[2], hour=datet[3], minute=datet[4], second=datet[5])
except:
return web.Response(status=406, body=b"date is invalid.") # NotAcceptable