Commit b854b76c authored by Benjamin "Ziirish" SANS's avatar Benjamin "Ziirish" SANS

Merge branch 'parallel-for-agent' into 'master'

Parallel for agent

See merge request !103
parents c9a65e0f ad1ffb67
Pipeline #1537 failed with stages
in 23 seconds
......@@ -11,8 +11,6 @@ import time
import datetime
from functools import wraps
from wsgiref.handlers import format_date_time
from flask_restplus.utils import unpack
def browser_cache(expires=None):
......@@ -31,6 +29,9 @@ def browser_cache(expires=None):
return render_template('index.html')
"""
from wsgiref.handlers import format_date_time
from flask_restplus.utils import unpack
def cache_decorator(view):
@wraps(view)
def cache_func(*args, **kwargs):
......@@ -64,5 +65,25 @@ def implement(func):
to indicate we don't want the default "magic" implementation and use the
custom implementation instead.
"""
func.__ismethodimplemented__ = True
try:
func.__ismethodimplemented__ = True
except AttributeError:
# properties seem immutable
pass
return func
def usetriorun(func):
"""A decorator indicating the method uses trio.run
Such functions should always be written like this:
::
def function(self, *args, **kwargs):
return trio.run(self._async_function, partial(*args, **kwargs))
"""
try:
func.__isusingtriorun__ = True
except AttributeError:
pass
return func
......@@ -15,8 +15,6 @@ import json
import logging
import trio
from logging.handlers import RotatingFileHandler
from ..exceptions import BUIserverException
from ..misc.backend.interface import BUIbackend
from .._compat import pickle, to_bytes, to_unicode
......@@ -53,20 +51,24 @@ class BurpHandler(BUIbackend):
BUIbackend.__abstractmethods__ = frozenset()
def __init__(self, backend='burp2', logger=None, conf=None):
self.backend = backend
self.backend_name = backend
self.is_async = backend == 'parallel'
self.logger = logger
top = __name__
if '.' in self.backend:
module = self.backend
if '.' in self.backend_name:
module = self.backend_name
else:
if '.' in top:
top = top.split('.')[0]
module = '{0}.misc.backend.{1}'.format(top, self.backend)
module = '{0}.misc.backend.{1}'.format(top, self.backend_name)
try:
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
mod = __import__(module, fromlist=['Burp'])
Client = mod.Burp
if self.is_async:
Client = mod.AsyncBurp
else:
Client = mod.Burp
self.backend = Client(conf=conf)
stats = self.backend.statistics()
if 'alive' not in stats or not stats['alive']:
......@@ -78,7 +80,7 @@ class BurpHandler(BUIbackend):
def __getattribute__(self, name):
# always return this value because we need it and if we don't do that
# we'll end up with an infinite loop
if name == 'foreign' or name == 'backend':
if name in ['foreign', 'backend', 'logger', 'is_async']:
return object.__getattribute__(self, name)
# now we can retrieve the 'foreign' list and know if the object called
# is in the backend
......@@ -97,36 +99,10 @@ class BUIAgent(BUIbackend):
def __init__(self, conf=None, level=0, logfile=None):
self.padding = 1
level = level or 0
if level > logging.NOTSET:
levels = [
logging.CRITICAL,
logging.ERROR,
logging.WARNING,
logging.INFO,
logging.DEBUG,
]
if level >= len(levels):
level = len(levels) - 1
lvl = levels[level]
self.logger.setLevel(lvl)
if lvl > logging.DEBUG:
LOG_FORMAT = '[%(asctime)s] %(levelname)s in %(module)s.%(funcName)s: %(message)s'
else:
LOG_FORMAT = (
'-' * 80 + '\n' +
'%(levelname)s in %(module)s.%(funcName)s [%(pathname)s:%(lineno)d]:\n' +
'%(message)s\n' +
'-' * 80
)
if logfile:
handler = RotatingFileHandler(logfile, maxBytes=1024 * 1024 * 100, backupCount=20)
else:
handler = logging.StreamHandler()
handler.setLevel(lvl)
handler.setFormatter(logging.Formatter(LOG_FORMAT))
self.logger.addHandler(handler)
self.logger.info('conf: {}'.format(conf))
self.logger.info('level: {}'.format(logging.getLevelName(lvl)))
self.logger.init_logger(config=dict(level=level, logfile=logfile))
lvl = self.logger.getEffectiveLevel()
self.logger.info('conf: {}'.format(conf))
self.logger.info('level: {}'.format(logging.getLevelName(lvl)))
if not conf:
raise IOError('No configuration file found')
......@@ -194,7 +170,10 @@ class BUIAgent(BUIbackend):
elif j['func'] == 'agent_version':
res = json.dumps(__version__)
elif j['func'] == 'restore_files':
res, err = getattr(self.client, j['func'])(**j['args'])
if self.client.is_async:
res, err = await getattr(self.client, j['func'])(**j['args'])
else:
res, err = getattr(self.client, j['func'])(**j['args'])
if err:
await server_stream.send_all(b'ER')
await server_stream.send_all(struct.pack('!Q', len(err)))
......@@ -258,6 +237,7 @@ class BUIAgent(BUIbackend):
os.unlink(path)
res = json.dumps(True)
else:
callback = getattr(self.client, j['func'])
if j['args']:
if 'pickled' in j and j['pickled']:
# de-serialize arguments if needed
......@@ -280,9 +260,16 @@ class BUIAgent(BUIbackend):
data = b64decode(pickles)
data = data.replace(b'burpui.datastructures', to_bytes(f'{mod}.datastructures'))
j['args'] = pickle.loads(data)
res = json.dumps(getattr(self.client, j['func'])(**j['args']))
if self.client.is_async:
res = json.dumps(await callback(**j['args']))
else:
res = json.dumps(callback(**j['args']))
else:
res = json.dumps(getattr(self.client, j['func'])())
if self.client.is_async:
res = json.dumps(await callback())
else:
res = json.dumps(callback())
self.logger.info(f'result: {res}')
await server_stream.send_all(b'OK')
except (BUIserverException, Exception) as exc:
......
......@@ -541,9 +541,7 @@ class Burp(BUIbackend):
def get_clients_report(self, clients, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.get_clients_report`"""
ret = {}
cls = []
bkp = []
data = []
for cli in clients:
if not cli:
continue
......@@ -551,19 +549,27 @@ class Burp(BUIbackend):
if not client or not client[-1]:
continue
stats = self.get_backup_logs(client[-1]['number'], cli['name'])
data.append((cli, client, stats))
return self._do_get_clients_report(data)
def _do_get_clients_report(self, data):
ret = {}
cls = []
bkp = []
for client, backups, stats in data:
os = stats['os'] if 'os' in stats else "unknown"
totsize = stats['totsize'] if 'totsize' in stats else 0
total = stats['total']['total'] if \
'total' in stats and 'total' in stats['total'] else 0
cls.append({
'name': cli['name'],
'name': client['name'],
'stats': {
'os': os,
'totsize': totsize,
'total': total
}
})
bkp.append({'name': cli['name'], 'number': len(client)})
bkp.append({'name': client['name'], 'number': len(backups)})
ret = {'clients': cls, 'backups': bkp}
return ret
......
......@@ -376,7 +376,11 @@ class Burp(Burp1):
except KeyError:
self.logger.warning('Client not found')
return ret
return self._do_get_counters(client)
def _do_get_counters(self, data):
ret = {}
client = data
# check the client is currently backing-up
if 'run_status' not in client or client['run_status'] != 'running':
return ret
......@@ -486,13 +490,14 @@ class Burp(Burp1):
query = self.status('c:{0}\n'.format(name))
except BUIserverException:
return False
if not query:
return False
try:
return query['clients'][0]['run_status'] in ['running']
except KeyError:
self.logger.warning('Client not found')
return False
return self._do_is_backup_running(query)
def _do_is_backup_running(self, data):
if data:
try:
return data['clients'][0]['run_status'] in ['running']
except KeyError:
pass
return False
def is_one_backup_running(self, agent=None):
......@@ -504,7 +509,11 @@ class Burp(Burp1):
clients = self.get_all_clients()
except BUIserverException:
return ret
for client in clients:
return self._do_is_one_backup_running(clients)
def _do_is_one_backup_running(self, data):
ret = []
for client in data:
if client['state'] in ['running']:
ret.append(client['name'])
return ret
......@@ -631,6 +640,11 @@ class Burp(Burp1):
except (KeyError, IndexError):
self.logger.warning('Client not found')
return ret
return self._do_get_client_status(client)
def _do_get_client_status(self, data):
ret = {}
client = data
ret['state'] = self._status_human_readable(client['run_status'])
infos = client['backups']
if ret['state'] in ['running']:
......@@ -641,7 +655,7 @@ class Burp(Burp1):
if 'action' in child and child['action'] == 'backup':
ret['phase'] = child['phase']
break
counters = self.get_counters(name)
counters = self._do_get_counters(client)
if 'percent' in counters:
ret['percent'] = counters['percent']
else:
......@@ -736,6 +750,10 @@ class Burp(Burp1):
query = self.status('c:{0}:b:{1}\n'.format(name, backup))
if not query:
return False
return self._do_is_backup_deletable(query)
def _do_is_backup_deletable(self, data):
query = data
try:
flags = query['clients'][0]['backups'][0]['flags']
return 'deletable' in flags
......
......@@ -9,10 +9,11 @@
"""
import os
import re
import logging
from abc import ABCMeta, abstractmethod
from ...tools.logging import logger
G_BURPPORT = 4972
G_BURPHOST = '::1'
G_BURPBIN = '/usr/sbin/burp'
......@@ -44,7 +45,7 @@ class BUIbackend(object, metaclass=ABCMeta):
# Defaults config parameters
defaults = {}
logger = logging.getLogger('burp-ui')
logger = logger
def __init__(self, server=None, conf=None): # pragma: no cover
"""
......@@ -1143,3 +1144,6 @@ class BUIbackend(object, metaclass=ABCMeta):
returns the version of the given agent.
"""
raise NotImplementedError("Sorry, the current Backend does not implement this method!") # pragma: no cover
BUIBACKEND_INTERFACE_METHODS = BUIbackend.__abstractmethods__.copy()
......@@ -12,14 +12,17 @@ import json
import ssl
import trio
import struct
import logging
from asyncio import iscoroutinefunction
from .burp2 import Burp as Burp2
from .interface import BUIbackend
from .interface import BUIbackend, BUIBACKEND_INTERFACE_METHODS
from .utils.constant import BURP_STATUS_FORMAT_V2
from ..parser.burp2 import Parser
from ...exceptions import BUIserverException
from ...decorators import implement, usetriorun
from ..._compat import to_unicode, to_bytes
from ...tools.logging import logger
BUI_DEFAULTS = {
'Parallel': {
......@@ -34,7 +37,7 @@ BUI_DEFAULTS = {
class Parallel:
logger = logging.getLogger('burp-ui') # type: logging.Logger
logger = logger
def __init__(self, conf):
"""Parallel client
......@@ -210,15 +213,11 @@ class Burp(Burp2):
@property
def client_version(self):
if self._client_version is None:
self._client_version = trio.run(self._async_request, 'client_version')
return self._client_version
return self.get_client_version()
@property
def server_version(self):
if self._server_version is None:
self._server_version = trio.run(self._async_request, 'server_version')
return self._server_version
return self.get_server_version()
@property
def batch_list_supported(self):
......@@ -229,7 +228,17 @@ class Burp(Burp2):
def statistics(self, agent=None):
return json.loads(trio.run(self._async_request, 'statistics'))
async def _async_status(self, query='c:\n', timeout=None, cache=True):
def get_client_version(self, agent=None):
if self._client_version is None:
self._client_version = trio.run(self._async_request, 'client_version')
return self._client_version
def get_server_version(self, agent=None):
if self._server_version is None:
self._server_version = trio.run(self._async_request, 'server_version')
return self._server_version
async def _async_status(self, query='c:\n', timeout=None, cache=True, agent=None):
async_client = Parallel(self.conf)
try:
return await async_client.status(query, timeout, cache)
......@@ -243,6 +252,7 @@ class Burp(Burp2):
except OSError as exc:
raise BUIserverException(str(exc))
@usetriorun
def status(self, query='c:\n', timeout=None, cache=True, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.status`"""
return trio.run(self._async_status, query, timeout, cache)
......@@ -365,14 +375,87 @@ class Burp(Burp2):
bucket.append(ret)
return ret
# def get_clients_report(self, clients, agent=None):
async def _async_get_clients_report(self, clients, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.get_clients_report`"""
async def __compute_client_report(cli, limit, queue):
async with limit:
if not cli:
return
client = await self._async_get_client(cli['name'])
if not client or not client[-1]:
return
stats = await self._async_get_backup_logs(client[-1]['number'], cli['name'])
queue.append((cli, client, stats))
data = []
limiter = trio.CapacityLimiter(self.concurrency)
async with trio.open_nursery() as nursery:
for client in clients:
nursery.start_soon(__compute_client_report, client, limiter, data)
return self._do_get_clients_report(data)
@usetriorun
def get_clients_report(self, clients, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.get_clients_report`"""
return trio.run(self._async_get_clients_report, clients)
async def _async_get_counters(self, name=None, agent=None):
ret = {}
query = await self._async_status('c:{0}\n'.format(name), cache=False)
# check the status returned something
if not query:
return ret
try:
client = query['clients'][0]
except KeyError:
self.logger.warning('Client not found')
return ret
return self._do_get_counters(client)
# inherited
# def get_counters(self, name=None, agent=None):
@usetriorun
def get_counters(self, name=None, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.get_counters`"""
return trio.run(self._async_get_counters, name)
# def is_backup_running(self, name=None, agent=None):
async def _async_is_backup_running(self, name=None, agent=None):
"""See
:func:`burpui.misc.backend.interface.BUIbackend.is_backup_running`
"""
if not name:
return False
try:
query = await self._async_status('c:{0}\n'.format(name))
except BUIserverException:
return False
return self._do_is_backup_running(query)
# def is_one_backup_running(self, agent=None):
@usetriorun
def is_backup_running(self, name=None, agent=None):
"""See
:func:`burpui.misc.backend.interface.BUIbackend.is_backup_running`
"""
return trio.run(self._async_is_backup_running, name)
async def _async_is_one_backup_running(self, agent=None):
"""See
:func:`burpui.misc.backend.interface.BUIbackend.is_one_backup_running`
"""
ret = []
try:
clients = await self._async_get_all_clients()
except BUIserverException:
return ret
return self._do_is_one_backup_running(clients)
@usetriorun
def is_one_backup_running(self, agent=None):
"""See
:func:`burpui.misc.backend.interface.BUIbackend.is_one_backup_running`
"""
return trio.run(self._async_is_one_backup_running)
async def _async_get_last_backup(self, name):
"""Return the last backup of a given client
......@@ -389,6 +472,7 @@ class Burp(Burp2):
except (KeyError, BUIserverException):
return None
@usetriorun
def _get_last_backup(self, name):
"""Return the last backup of a given client
......@@ -444,6 +528,7 @@ class Burp(Burp2):
self._os_cache[name] = ret
return ret
@usetriorun
def _guess_os(self, name):
"""Return the OS of the given client based on the magic *os* label
......@@ -459,7 +544,7 @@ class Burp(Burp2):
"""
return trio.run(self._async_guess_os, name)
async def _async_get_all_clients(self):
async def _async_get_all_clients(self, agent=None):
ret = []
query = await self._async_status()
if not query or 'clients' not in query:
......@@ -501,16 +586,36 @@ class Burp(Burp2):
# the deep inspection can take advantage of async processing
return trio.run(self._async_get_all_clients)
# def get_client_status(self, name=None, agent=None):
async def _async_get_client_status(self, name=None, agent=None):
ret = {}
if not name:
return ret
query = await self._async_status('c:{0}\n'.format(name))
if not query:
return ret
try:
client = query['clients'][0]
except (KeyError, IndexError):
self.logger.warning('Client not found')
return ret
return self._do_get_client_status(client)
@usetriorun
def get_client_status(self, name=None, agent=None):
"""See
:func:`burpui.misc.backend.interface.BUIbackend.get_client_status`
"""
return trio.run(self._async_get_client_status, name)
async def _async_get_client(self, name=None):
async def _async_get_client(self, name=None, agent=None):
return await self._async_get_client_filtered(name)
@usetriorun
def get_client(self, name=None, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.get_client`"""
return trio.run(self._async_get_client, name)
async def _async_get_client_filtered(self, name=None, limit=-1, page=None, start=None, end=None):
async def _async_get_client_filtered(self, name=None, limit=-1, page=None, start=None, end=None, agent=None):
ret = []
if not name:
return ret
......@@ -588,13 +693,27 @@ class Burp(Burp2):
ret = sorted(queue, key=lambda x: x['number'])
return ret
@usetriorun
def get_client_filtered(self, name=None, limit=-1, page=None, start=None, end=None, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.get_client_filtered`"""
return trio.run(self._async_get_client_filtered, name, limit, page, start, end)
# def is_backup_deletable(self, name=None, backup=None, agent=None):
async def _async_is_backup_deletable(self, name=None, backup=None, agent=None):
if not name or not backup:
return False
query = await self._async_status('c:{0}:b:{1}\n'.format(name, backup))
if not query:
return False
return self._do_is_backup_deletable(query)
@usetriorun
def is_backup_deletable(self, name=None, backup=None, agent=None):
"""See
:func:`burpui.misc.backend.interface.BUIbackend.is_backup_deletable`
"""
return trio.run(self._async_is_backup_deletable, name, backup)
async def _async_get_tree(self, name=None, backup=None, root=None, level=-1):
async def _async_get_tree(self, name=None, backup=None, root=None, level=-1, agent=None):
ret = []
if not name or not backup:
return ret
......@@ -615,15 +734,12 @@ class Burp(Burp2):
)
return self._format_tree(query, top, level)
@usetriorun
def get_tree(self, name=None, backup=None, root=None, level=-1, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.get_tree`"""
return trio.run(self._async_get_tree, name, backup, root, level)
# def get_client_version(self, agent=None):
# def get_server_version(self, agent=None):
async def _async_get_client_labels(self, client=None):
async def _async_get_client_labels(self, client=None, agent=None):
"""See
:func:`burpui.misc.backend.interface.BUIbackend.get_client_labels`
"""
......@@ -643,6 +759,7 @@ class Burp(Burp2):
except KeyError:
return ret
@usetriorun
def get_client_labels(self, client=None, agent=None):
"""See
:func:`burpui.misc.backend.interface.BUIbackend.get_client_labels`
......@@ -669,3 +786,124 @@ class Burp(Burp2):
# def store_conf_srv(self, data, agent=None):
# def get_parser_attr(self, attr=None, agent=None):
# Make every "Burp" method async
class AsyncBurp(Burp):
# this method must not be async!
@implement