Verified Commit 43253d82 authored by Benjamin "Ziirish" SANS's avatar Benjamin "Ziirish" SANS
Browse files

use black to format the code

parent 3dd7885b
......@@ -13,7 +13,7 @@ import warnings
from .app import create_app
warnings.simplefilter('always', RuntimeWarning)
warnings.simplefilter("always", RuntimeWarning)
# backward compatibility
......
......@@ -17,34 +17,73 @@ from argparse import ArgumentParser, REMAINDER
ROOT = os.path.dirname(os.path.realpath(__file__))
# Try to load modules from our current env first
sys.path.insert(0, os.path.join(ROOT, '..'))
sys.path.insert(0, os.path.join(ROOT, ".."))
def parse_args(mode=True, name=None):
mname = name
if not name:
mname = 'burp-ui'
mname = "burp-ui"
parser = ArgumentParser(prog=mname)
parser.add_argument('-v', '--verbose', dest='log', help='increase output verbosity (e.g., -vv is more verbose than -v)', action='count')
parser.add_argument('-d', '--debug', dest='debug', help='enable debug mode', action='store_true')
parser.add_argument('-V', '--version', dest='version', help='print version and exit', action='store_true')
parser.add_argument('-c', '--config', dest='config', help='burp-ui configuration file', metavar='<CONFIG>')
parser.add_argument('-l', '--logfile', dest='logfile', help='output logs in defined file', metavar='<FILE>')
parser.add_argument('-i', '--migrations', dest='migrations', help='migrations directory', metavar='<MIGRATIONSDIR>')
parser.add_argument('remaining', nargs=REMAINDER)
parser.add_argument(
"-v",
"--verbose",
dest="log",
help="increase output verbosity (e.g., -vv is more verbose than -v)",
action="count",
)
parser.add_argument(
"-d", "--debug", dest="debug", help="enable debug mode", action="store_true"
)
parser.add_argument(
"-V",
"--version",
dest="version",
help="print version and exit",
action="store_true",
)
parser.add_argument(
"-c",
"--config",
dest="config",
help="burp-ui configuration file",
metavar="<CONFIG>",
)
parser.add_argument(
"-l",
"--logfile",
dest="logfile",
help="output logs in defined file",
metavar="<FILE>",
)
parser.add_argument(
"-i",
"--migrations",
dest="migrations",
help="migrations directory",
metavar="<MIGRATIONSDIR>",
)
parser.add_argument("remaining", nargs=REMAINDER)
if mode:
parser.add_argument('-m', '--mode', dest='mode', help='application mode', metavar='<agent|server|celery|manage|monitor|legacy>')
parser.add_argument(
"-m",
"--mode",
dest="mode",
help="application mode",
metavar="<agent|server|celery|manage|monitor|legacy>",
)
options, unknown = parser.parse_known_args()
if mode and options.mode and options.mode not in ['celery', 'manage', 'server']:
if mode and options.mode and options.mode not in ["celery", "manage", "server"]:
options = parser.parse_args()
unknown = []
if options.version:
from burpui.desc import __title__, __version__, __release__
ver = '{}: v{}'.format(mname or __title__, __version__)
ver = "{}: v{}".format(mname or __title__, __version__)
if options.log:
ver = '{} ({})'.format(ver, __release__)
ver = "{} ({})".format(ver, __release__)
print(ver)
sys.exit(0)
......@@ -57,20 +96,20 @@ def main():
"""
options, unknown = parse_args(mode=True)
if not options.mode or options.mode == 'server':
if not options.mode or options.mode == "server":
server(options, unknown)
elif options.mode == 'agent':
elif options.mode == "agent":
agent(options)
elif options.mode == 'celery':
elif options.mode == "celery":
celery()
elif options.mode == 'manage':
elif options.mode == "manage":
manage()
elif options.mode == 'monitor':
elif options.mode == "monitor":
monitor(options)
elif options.mode == 'legacy':
elif options.mode == "legacy":
legacy(options, unknown)
else:
print('Wrong mode!')
print("Wrong mode!")
sys.exit(1)
......@@ -86,31 +125,28 @@ def server(options=None, unknown=None):
if options.config:
conf = lookup_file(options.config, guess=False)
else:
if 'BUI_CONFIG' in env:
conf = env['BUI_CONFIG']
if "BUI_CONFIG" in env:
conf = env["BUI_CONFIG"]
else:
conf = lookup_file()
check_config(conf)
if os.path.isdir('burpui'):
env['FLASK_APP'] = 'burpui/cli.py'
if os.path.isdir("burpui"):
env["FLASK_APP"] = "burpui/cli.py"
else:
env['FLASK_APP'] = 'burpui.cli'
env['BUI_CONFIG'] = conf
env['BUI_VERBOSE'] = str(options.log)
env["FLASK_APP"] = "burpui.cli"
env["BUI_CONFIG"] = conf
env["BUI_VERBOSE"] = str(options.log)
if options.logfile:
env['BUI_LOGFILE'] = options.logfile
env["BUI_LOGFILE"] = options.logfile
if options.debug:
env['BUI_DEBUG'] = '1'
env['FLASK_DEBUG'] = '1'
env['BUI_MODE'] = 'server'
args = [
'flask',
'run'
]
env["BUI_DEBUG"] = "1"
env["FLASK_DEBUG"] = "1"
env["BUI_MODE"] = "server"
args = ["flask", "run"]
args += unknown
args += [x for x in options.remaining if x != '--']
args += [x for x in options.remaining if x != "--"]
os.execvpe(args[0], args, env)
......@@ -121,9 +157,9 @@ def agent(options=None):
from burpui.utils import lookup_file
if not options:
options, _ = parse_args(mode=False, name='bui-agent')
options, _ = parse_args(mode=False, name="bui-agent")
conf = ['buiagent.cfg', 'buiagent.sample.cfg']
conf = ["buiagent.cfg", "buiagent.sample.cfg"]
if options.config:
conf = lookup_file(options.config, guess=False)
else:
......@@ -140,9 +176,9 @@ def monitor(options=None):
from burpui.utils import lookup_file
if not options:
options, _ = parse_args(mode=False, name='bui-agent')
options, _ = parse_args(mode=False, name="bui-agent")
conf = ['buimonitor.cfg', 'buimonitor.sample.cfg']
conf = ["buimonitor.cfg", "buimonitor.sample.cfg"]
if options.config:
conf = lookup_file(options.config, guess=False)
else:
......@@ -156,11 +192,25 @@ def monitor(options=None):
def celery():
from burpui.utils import lookup_file
parser = ArgumentParser('bui-celery')
parser.add_argument('-c', '--config', dest='config', help='burp-ui configuration file', metavar='<CONFIG>')
parser.add_argument('-t', '--type', dest='type', help='celery mode', metavar='<worker|beat|flower>')
parser.add_argument('-m', '--mode', dest='mode', help='application mode', metavar='<agent|server|worker|manage|legacy>')
parser.add_argument('remaining', nargs=REMAINDER)
parser = ArgumentParser("bui-celery")
parser.add_argument(
"-c",
"--config",
dest="config",
help="burp-ui configuration file",
metavar="<CONFIG>",
)
parser.add_argument(
"-t", "--type", dest="type", help="celery mode", metavar="<worker|beat|flower>"
)
parser.add_argument(
"-m",
"--mode",
dest="mode",
help="application mode",
metavar="<agent|server|worker|manage|legacy>",
)
parser.add_argument("remaining", nargs=REMAINDER)
options, unknown = parser.parse_known_args()
env = os.environ
......@@ -168,18 +218,18 @@ def celery():
if options.config:
conf = lookup_file(options.config, guess=False)
else:
if 'BUI_CONFIG' in env:
conf = env['BUI_CONFIG']
if "BUI_CONFIG" in env:
conf = env["BUI_CONFIG"]
else:
conf = lookup_file()
if options.type:
celery_mode = options.type
else:
celery_mode = 'worker'
celery_mode = "worker"
# make conf path absolute
if not conf.startswith('/'):
if not conf.startswith("/"):
curr = os.getcwd()
conf = os.path.join(curr, conf)
......@@ -187,17 +237,12 @@ def celery():
os.chdir(ROOT)
env['BUI_MODE'] = 'celery'
env['BUI_CONFIG'] = conf
env["BUI_MODE"] = "celery"
env["BUI_CONFIG"] = conf
args = [
'celery',
celery_mode,
'-A',
'engines.worker.celery'
]
args = ["celery", celery_mode, "-A", "engines.worker.celery"]
args += unknown
args += [x for x in options.remaining if x != '--']
args += [x for x in options.remaining if x != "--"]
os.execvpe(args[0], args, env)
......@@ -205,48 +250,78 @@ def celery():
def manage():
from burpui.utils import lookup_file
parser = ArgumentParser('bui-manage')
parser.add_argument('-v', '--verbose', dest='log', help='increase output verbosity (e.g., -vv is more verbose than -v)', action='count')
parser.add_argument('-c', '--config', dest='config', help='burp-ui configuration file', metavar='<CONFIG>')
parser.add_argument('-i', '--migrations', dest='migrations', help='migrations directory', metavar='<MIGRATIONSDIR>')
parser.add_argument('-m', '--mode', dest='mode', help='application mode', metavar='<agent|server|worker|manage|legacy>')
parser.add_argument('-l', '--logfile', dest='logfile', help='output logs in defined file', metavar='<FILE>')
parser.add_argument('remaining', nargs=REMAINDER)
parser = ArgumentParser("bui-manage")
parser.add_argument(
"-v",
"--verbose",
dest="log",
help="increase output verbosity (e.g., -vv is more verbose than -v)",
action="count",
)
parser.add_argument(
"-c",
"--config",
dest="config",
help="burp-ui configuration file",
metavar="<CONFIG>",
)
parser.add_argument(
"-i",
"--migrations",
dest="migrations",
help="migrations directory",
metavar="<MIGRATIONSDIR>",
)
parser.add_argument(
"-m",
"--mode",
dest="mode",
help="application mode",
metavar="<agent|server|worker|manage|legacy>",
)
parser.add_argument(
"-l",
"--logfile",
dest="logfile",
help="output logs in defined file",
metavar="<FILE>",
)
parser.add_argument("remaining", nargs=REMAINDER)
options, unknown = parser.parse_known_args()
env = os.environ
if options.logfile:
env['BUI_LOGFILE'] = options.logfile
env["BUI_LOGFILE"] = options.logfile
if options.config:
conf = lookup_file(options.config, guess=False)
else:
if 'BUI_CONFIG' in env:
conf = env['BUI_CONFIG']
if "BUI_CONFIG" in env:
conf = env["BUI_CONFIG"]
else:
conf = lookup_file()
check_config(conf)
if options.migrations:
migrations = lookup_file(options.migrations, guess=False, directory=True, check=False)
migrations = lookup_file(
options.migrations, guess=False, directory=True, check=False
)
else:
migrations = lookup_file('migrations', directory=True)
migrations = lookup_file("migrations", directory=True)
env['BUI_MODE'] = 'manage'
env['BUI_CONFIG'] = conf
env['BUI_VERBOSE'] = str(options.log)
env["BUI_MODE"] = "manage"
env["BUI_CONFIG"] = conf
env["BUI_VERBOSE"] = str(options.log)
if migrations:
env['BUI_MIGRATIONS'] = migrations
if os.path.isdir('burpui') and os.path.isfile('burpui/cli.py'):
env['FLASK_APP'] = 'burpui/cli.py'
env["BUI_MIGRATIONS"] = migrations
if os.path.isdir("burpui") and os.path.isfile("burpui/cli.py"):
env["FLASK_APP"] = "burpui/cli.py"
else:
env['FLASK_APP'] = 'burpui.cli'
env["FLASK_APP"] = "burpui.cli"
args = [
'flask'
]
args = ["flask"]
args += unknown
args += [x for x in options.remaining if x != '--']
args += [x for x in options.remaining if x != "--"]
os.execvpe(args[0], args, env)
......@@ -257,47 +332,44 @@ def legacy(options=None, unknown=None):
if unknown is None:
unknown = []
if not options:
options, unknown = parse_args(mode=False, name='burpui-legacy')
options, unknown = parse_args(mode=False, name="burpui-legacy")
env = os.environ
if options.config:
conf = lookup_file(options.config, guess=False)
else:
if 'BUI_CONFIG' in env:
conf = env['BUI_CONFIG']
if "BUI_CONFIG" in env:
conf = env["BUI_CONFIG"]
else:
conf = lookup_file()
check_config(conf)
env['BUI_MODE'] = 'legacy'
env['BUI_CONFIG'] = conf
if os.path.isdir('burpui'):
env['FLASK_APP'] = 'burpui/cli.py'
env["BUI_MODE"] = "legacy"
env["BUI_CONFIG"] = conf
if os.path.isdir("burpui"):
env["FLASK_APP"] = "burpui/cli.py"
else:
env['FLASK_APP'] = 'burpui.cli'
env['BUI_VERBOSE'] = str(options.log)
env["FLASK_APP"] = "burpui.cli"
env["BUI_VERBOSE"] = str(options.log)
if options.logfile:
env['BUI_LOGFILE'] = options.logfile
env["BUI_LOGFILE"] = options.logfile
if options.debug:
env['BUI_DEBUG'] = '1'
env['FLASK_DEBUG'] = '1'
env["BUI_DEBUG"] = "1"
env["FLASK_DEBUG"] = "1"
args = [
'flask',
'legacy'
]
args = ["flask", "legacy"]
args += unknown
args += [x for x in options.remaining if x != '--']
args += [x for x in options.remaining if x != "--"]
os.execvpe(args[0], args, env)
def check_config(conf):
if not conf:
raise IOError('No configuration file found')
raise IOError("No configuration file found")
if not os.path.isfile(conf):
raise IOError('File does not exist: \'{0}\''.format(conf))
raise IOError("File does not exist: '{0}'".format(conf))
if __name__ == '__main__':
if __name__ == "__main__":
main()
......@@ -10,6 +10,7 @@
import pickle # noqa
from urllib.parse import unquote, quote, urlparse, urljoin # noqa
text_type = str
string_types = (str,)
......@@ -17,15 +18,15 @@ string_types = (str,)
def to_bytes(text):
"""Transform string to bytes."""
if isinstance(text, text_type):
text = text.encode('utf-8')
return text or b''
text = text.encode("utf-8")
return text or b""
def to_unicode(input_bytes, encoding='utf-8'):
def to_unicode(input_bytes, encoding="utf-8"):
"""Decodes input_bytes to text if needed."""
if not isinstance(input_bytes, string_types):
input_bytes = input_bytes.decode(encoding)
return input_bytes or ''
return input_bytes or ""
# maps module name -> attribute name -> original item
......@@ -39,24 +40,24 @@ def patch_item(module, attr, newitem, newmodule=None):
olditem = getattr(module, attr, NONE)
if olditem is not NONE:
saved.setdefault(module.__name__, {}).setdefault(attr, olditem)
if newmodule and not getattr(newmodule, 'ori_' + attr, None):
setattr(newmodule, 'ori_' + attr, olditem)
if not getattr(newmodule, 'ori_' + attr, None):
if newmodule and not getattr(newmodule, "ori_" + attr, None):
setattr(newmodule, "ori_" + attr, olditem)
if not getattr(newmodule, "ori_" + attr, None):
setattr(module, attr, newitem)
def patch_module(name, items=None):
toimport = items or []
mod = __name__
if '.' in mod:
mod = mod.split('.')[0]
replace_module = __import__('{}._{}'.format(mod, name), fromlist=toimport)
if "." in mod:
mod = mod.split(".")[0]
replace_module = __import__("{}._{}".format(mod, name), fromlist=toimport)
module_name = name
module = __import__(module_name)
if items is None:
items = getattr(replace_module, '__implements__', None)
items = getattr(replace_module, "__implements__", None)
if items is None:
raise AttributeError('%r does not have __implements__' % replace_module)
raise AttributeError("%r does not have __implements__" % replace_module)
for attr in items:
patch_item(module, attr, getattr(replace_module, attr), replace_module)
......@@ -67,4 +68,4 @@ def patch_json():
except ImportError:
# ujson is not available, we won't patch anything
return
patch_module('json', ['dumps', 'loads'])
patch_module("json", ["dumps", "loads"])
......@@ -9,18 +9,18 @@
"""
import ujson
__implements__ = ['dumps', 'loads']
__implements__ = ["dumps", "loads"]
ori_dumps = None
ori_loads = None
IMPLEMENTED_DUMPS_KWARGS = [
'ensure_ascii',
'double_precision',
'encode_html_chars',
'sort_keys',
"ensure_ascii",
"double_precision",
"encode_html_chars",
"sort_keys",
]
IMPLEMENTED_LOADS_KWARGS = [
'precise_float',
"precise_float",
]
......
......@@ -14,4 +14,4 @@ from . import create_app
# This is a lie we are not really unittesting, but we want to avoid the v2
# errors
app = create_app(conf='/dev/null', gunicorn=False, unittest=True)
app = create_app(conf="/dev/null", gunicorn=False, unittest=True)
......@@ -27,22 +27,23 @@ from ..config import config
from ..tools.logging import logger
bui = current_app # type: BUIServer
EXEMPT_METHODS = set(['OPTIONS'])
EXEMPT_METHODS = set(["OPTIONS"])
def force_refresh():
return request.headers.get('X-No-Cache', False) is not False or \
getattr(g, 'DONOTCACHE', False)
return request.headers.get("X-No-Cache", False) is not False or getattr(
g, "DONOTCACHE", False
)
def cache_key():
key = '{}-{}-{}-{}-{}-{}'.format(
session.get('login', uuid.uuid4()),
key = "{}-{}-{}-{}-{}-{}".format(
session.get("login", uuid.uuid4()),
request.path,
request.values,
request.headers.get('X-Session-Tag', ''),
request.headers.get("X-Session-Tag", ""),
request.cookies,
session.get('language', '')
session.get("language", ""),
)
key = hashlib.sha256(to_bytes(key)).hexdigest()
return key
......@@ -52,6 +53,7 @@ def api_login_required(func):
"""Custom login decorator that is able to parse Basic credentials as well as
Cookies set with the traditional login.
"""
@wraps(func)
def decorated_view(*args, **kwargs):
"""decorator"""
......@@ -60,23 +62,29 @@ def api_login_required(func):
# 'func' is a Flask.view.MethodView so we have access to some special
# params
cls = func.view_class
login_required = getattr(cls, 'login_required', True)
if (bui.auth != 'none' and
login_required and
not bui.config.get('LOGIN_DISABLED', False)):
login_required = getattr(cls, "login_required", True)
if (
bui.auth != "none"
and login_required
and not bui.config.get("LOGIN_DISABLED", False)
):
if not current_user.is_authenticated:
if request.headers.get('X-From-UI', False):
if request.headers.get("X-From-UI", False):
abort(403)
return Response(
'Could not verify your access level for that URL.\n'
'You have to login with proper credentials', 401,
{'WWW-Authenticate': 'Basic realm="Login Required"'})
"Could not verify your access level for that URL.\n"
"You have to login with proper credentials",
401,
{"WWW-Authenticate": 'Basic realm="Login Required"'},
)
return func(*args, **kwargs)
return decorated_view
def check_acl(func):