Commit 15be9a62 authored by Ziirish's avatar Ziirish

various performance improvements for the parallel backend

parent 77f0554d
Pipeline #1618 passed with stages
in 22 minutes and 5 seconds
......@@ -118,7 +118,6 @@ class MonitorPool:
self.conf.setdefault('BUI_MONITOR', True)
# self.monitor_pool = trio.Queue(self.pool)
self.pool = Pool(self.pool_size)
def _ssl_context(self):
......@@ -157,10 +156,6 @@ class MonitorPool:
@asynccontextmanager
async def get_mon(self, ident) -> Monitor:
if self.pool.empty():
await self.fill_pool()
if self.pool.empty():
raise BUIserverException("Unable to spawn Monitors")
self.logger.info(f'{ident} - Waiting for a monitor...')
t1 = trio.current_time()
mon = await self.pool.get() # type: Monitor
......@@ -206,9 +201,7 @@ class MonitorPool:
'server_version': 'unknown',
'client_version': 'unknown'
}
if self.pool.empty():
await self.fill_pool()
while not self.pool.empty():
while not res['alive'] and len(tmp) < self.pool.size:
mon = await self.pool.get()
tmp.append(mon)
if mon.alive:
......@@ -218,6 +211,7 @@ class MonitorPool:
'client_version': getattr(mon, 'client_version', '')
}
break
await trio.sleep(0.5)
for mon in tmp:
await self.pool.put(mon)
response = json.dumps(res)
......@@ -294,15 +288,21 @@ class MonitorPool:
for i in range(self.pool_size):
nursery.start_soon(self.launch_monitor, i + 1)
async def _run(self):
self.logger.info(f'Ready to serve requests on {self.bind}:{self.port}')
ctx = self._ssl_context()
if ctx:
await trio.serve_ssl_over_tcp(self.handle, self.port, ctx, host=self.bind)
else:
await trio.serve_tcp(self.handle, self.port, host=self.bind)
async def run(self):
async with self.pool:
await self.fill_pool()
self.logger.info(f'Ready to serve requests on {self.bind}:{self.port}')
try:
ctx = self._ssl_context()
if ctx:
await trio.serve_ssl_over_tcp(self.handle, self.port, ctx, host=self.bind)
else:
await trio.serve_tcp(self.handle, self.port, host=self.bind)
async with trio.open_nursery() as nursery:
# listen to connections as soon as possible
nursery.start_soon(self._run)
# in parallel we start to populate the pool
nursery.start_soon(self.fill_pool)
except KeyboardInterrupt:
pass
......@@ -39,11 +39,11 @@ BUI_DEFAULTS = {
}
class Parallel:
class Connector:
logger = logger
def __init__(self, conf):
"""Parallel client
"""Connector client
:param conf: Configuration to use
:type conf: :class:`burpui.config.BUIConfig`
......@@ -79,13 +79,16 @@ class Parallel:
self.connected = True
return self.client_stream
async def _do_process(self, data):
res = '[]'
async def _send(self, data):
data = to_bytes(data)
length = struct.pack('!Q', len(data))
await self.client_stream.send_all(length)
self.logger.debug(f'Sending: {data!r}')
self.logger.debug(f"Sending: {data!r}")
await self.client_stream.send_all(data)
async def _do_process(self, data):
res = '[]'
await self._send(data)
tmp = await self.client_stream.receive_some(2)
tmp = to_unicode(tmp)
if tmp == 'ER':
......@@ -223,12 +226,13 @@ class Burp(Burp2):
if self.init_wait:
exc = None
init_mon = Parallel(conf)
for i in range(self.init_wait):
connector = Connector(conf)
try:
self.logger.warning('monitor not ready, waiting for it... {}/{}'.format(i, self.init_wait))
trio.run(init_mon.conn)
if init_mon.connected:
trio.run(connector.conn)
if connector.connected:
trio.run(connector._send, 'RE')
break
except BUIserverException as eee:
exc = eee
......@@ -236,7 +240,6 @@ class Burp(Burp2):
else:
self.logger.error('monitor not ready, giving up!')
raise exc
del init_mon
stats = self.statistics()
if 'alive' in stats and stats['alive']:
self.init_all()
......@@ -279,18 +282,18 @@ class Burp(Burp2):
return self._server_version or ''
async def _async_status(self, query='c:\n', timeout=None, cache=True, agent=None):
async_client = Parallel(self.conf)
try:
return await async_client.status(query, timeout, cache)
connector = Connector(self.conf)
return await connector.status(query, timeout, cache)
except (OSError, IOError) as exc:
raise BUIserverException(str(exc))
if not self._ready:
self.init_all()
async def _async_request(self, func, *args, **kwargs):
async_client = Parallel(self.conf)
try:
return await async_client.request(func, *args, **kwargs)
connector = Connector(self.conf)
return await connector.request(func, *args, **kwargs)
except (OSError, IOError) as exc:
raise BUIserverException(str(exc))
......@@ -419,7 +422,7 @@ class Burp(Burp2):
async def _async_get_clients_report(self, clients, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.get_clients_report`"""
async def __compute_client_report(cli, limit, queue):
async def __compute_client_report(cli, queue, limit):
async with limit:
if not cli:
return
......@@ -434,7 +437,7 @@ class Burp(Burp2):
async with trio.open_nursery() as nursery:
for client in clients:
nursery.start_soon(__compute_client_report, client, limiter, data)
nursery.start_soon(__compute_client_report, client, data, limiter)
return self._do_get_clients_report(data)
......@@ -487,7 +490,7 @@ class Burp(Burp2):
"""
ret = []
try:
clients = await self._async_get_all_clients()
clients = await self._async_get_all_clients(deep=False)
except BUIserverException:
return ret
return self._do_is_one_backup_running(clients)
......@@ -586,13 +589,13 @@ class Burp(Burp2):
"""
return trio.run(self._async_guess_os, name)
async def _async_get_all_clients(self, agent=None):
async def _async_get_all_clients(self, agent=None, deep=True):
ret = []
query = await self._async_status()
if not query or 'clients' not in query:
return ret
async def __compute_client_data(client, limit, queue):
async def __compute_client_data(client, queue, limit):
async with limit:
cli = {}
cli['name'] = client['name']
......@@ -604,8 +607,11 @@ class Burp(Burp2):
cli['last'] = 'never'
else:
infos = infos[0]
logs = await self._async_get_backup_logs(infos['number'], client['name'])
cli['last'] = logs['start']
if deep:
logs = await self._async_get_backup_logs(infos['number'], client['name'])
cli['last'] = logs['start']
else:
cli['last'] = infos['timestamp']
queue.append(cli)
clients = query['clients']
......@@ -613,7 +619,7 @@ class Burp(Burp2):
async with trio.open_nursery() as nursery:
for client in clients:
nursery.start_soon(__compute_client_data, client, limiter, ret)
nursery.start_soon(__compute_client_data, client, ret, limiter)
return ret
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment