update trio requirement and drop use of deprecated Queues

parent f9930c91
Pipeline #1493 passed with stages
in 3 minutes and 32 seconds
......@@ -10,6 +10,7 @@
import ssl
import trio
import json
import math
import struct
import logging
import datetime
......@@ -41,6 +42,47 @@ BUI_DEFAULTS = {
}
class Pool:
def __init__(self, pool_size):
self._size = pool_size
self.send_channel, self.receive_channel = trio.open_memory_channel(pool_size)
@property
def size(self):
return self._size
@property
def stats(self):
return self.send_channel.statistics()
async def put(self, data):
await self.send_channel.send(data)
async def get(self):
return await self.receive_channel.receive()
def empty(self):
stats = self.stats
if self.size != 0 and stats.current_buffer_used == 0:
return True
return False
def full(self):
stats = self.stats
max_buffer = self.size
if max_buffer > 0 and max_buffer != math.inf:
return stats.current_buffer_used == max_buffer
return False
async def __aenter__(self):
return self
async def __aexit__(self, *args):
await self.send_channel.aclose()
await self.receive_channel.aclose()
class MonitorPool:
logger = logging.getLogger('burp-ui') # type: logging.Logger
......@@ -95,7 +137,7 @@ class MonitorPool:
self.sslcert = self.conf.safe_get('sslcert')
self.sslkey = self.conf.safe_get('sslkey')
self.password = self.conf.safe_get('password')
self.pool = self.conf.safe_get('pool', 'integer')
self.pool_size = self.conf.safe_get('pool', 'integer')
self.burpbin = self.conf.safe_get('burpbin', section='Burp')
self.bconfcli = self.conf.safe_get('bconfcli', section='Burp')
......@@ -103,7 +145,8 @@ class MonitorPool:
self.conf.setdefault('BUI_MONITOR', True)
self.monitor_pool = trio.Queue(self.pool)
# self.monitor_pool = trio.Queue(self.pool)
self.pool = Pool(self.pool_size)
def _ssl_context(self):
if not self.ssl:
......@@ -141,23 +184,23 @@ class MonitorPool:
@asynccontextmanager
async def get_mon(self, ident) -> Monitor:
if self.monitor_pool.empty():
async with trio.open_nursery() as nursery:
for i in range(self.pool):
nursery.start_soon(self.launch_monitor, i + 1)
if self.monitor_pool.empty():
raise OSError('Unable to run burp client')
# if self.monitor_pool.empty():
# async with trio.open_nursery() as nursery:
# for i in range(self.pool):
# nursery.start_soon(self.launch_monitor, i + 1)
# if self.monitor_pool.empty():
# raise OSError('Unable to run burp client')
self.logger.info(f'{ident} - Waiting for a monitor...')
t1 = trio.current_time()
mon = await self.monitor_pool.get() # type: Monitor
mon = await self.pool.get() # type: Monitor
t2 = trio.current_time()
t = t2 - t1
self.logger.info(f'{ident} - Waited {t:.3f}s')
yield mon
self.logger.info(f'{ident} - Releasing monitor')
await self.monitor_pool.put(mon)
await self.pool.put(mon)
async def handle(self, server_stream: trio.StapledStream):
async def handle(self, server_stream: trio.abc.Stream):
try:
ident = next(CONNECTION_COUNTER)
self.logger.info(f'{ident} - handle_request: started')
......@@ -236,10 +279,13 @@ class MonitorPool:
async def launch_monitor(self, id):
self.logger.info(f'Starting client n°{id}')
try:
if self.pool.full():
self.logger.warning('pool full!')
return
mon = Monitor(self.burpbin, self.bconfcli, timeout=self.timeout, ident=id)
# warm up monitor
mon.status()
await self.monitor_pool.put(mon)
await self.pool.put(mon)
except (BUIserverException, OSError):
pass
......@@ -250,20 +296,17 @@ class MonitorPool:
del mon
async def run(self):
self.logger.info('Starting clients...')
async with trio.open_nursery() as nursery:
for i in range(self.pool):
nursery.start_soon(self.launch_monitor, i + 1)
self.logger.info(f'Ready to serve requests on {self.bind}:{self.port}')
try:
ctx = self._ssl_context()
if ctx:
await trio.serve_ssl_over_tcp(self.handle, self.port, ctx, host=self.bind)
else:
await trio.serve_tcp(self.handle, self.port, host=self.bind)
except KeyboardInterrupt:
pass
self.logger.info('Cleaning up')
async with trio.open_nursery() as nursery:
nursery.start_soon(self.cleanup_monitor)
async with self.pool:
self.logger.info('Starting clients...')
async with trio.open_nursery() as nursery:
for i in range(self.pool_size):
nursery.start_soon(self.launch_monitor, i + 1)
self.logger.info(f'Ready to serve requests on {self.bind}:{self.port}')
try:
ctx = self._ssl_context()
if ctx:
await trio.serve_ssl_over_tcp(self.handle, self.port, ctx, host=self.bind)
else:
await trio.serve_tcp(self.handle, self.port, host=self.bind)
except KeyboardInterrupt:
pass
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment