# Copyright (C) 2016 Custodia Project Contributors - see LICENSE file from __future__ import absolute_import import abc import configparser import grp import inspect import json import pwd import re from jwcrypto.common import json_encode import six from .log import CustodiaLoggingAdapter, auditlog, getLogger logger = getLogger(__name__) class _Required: __slots__ = () def __repr__(self): return 'REQUIRED' class INHERIT_GLOBAL: # noqa: N801 __slots__ = ('default',) def __init__(self, default): self.default = default def __repr__(self): return 'INHERIT_GLOBAL({})'.format(self.default) REQUIRED = _Required() class CustodiaException(Exception): pass class HTTPError(CustodiaException): def __init__(self, code=None, message=None): self.code = code if code is not None else 500 self.mesg = message errstring = '%d: %s' % (self.code, self.mesg) super(HTTPError, self).__init__(errstring) class CSStoreError(CustodiaException): pass class CSStoreExists(CustodiaException): pass class CSStoreUnsupported(CustodiaException): pass class CSStoreDenied(CustodiaException): pass class OptionHandler: """Handler and parser for plugin options """ def __init__(self, parser, section): self.parser = parser self.section = section # handler is reserved to look up the plugin class self.seen = {'handler'} def get(self, po): """Lookup value for a PluginOption instance Args: po: PluginOption Returns: converted value """ name = po.name typ = po.typ default = po.default handler = getattr(self, '_get_{}'.format(typ), None) if handler is None: raise ValueError(typ) self.seen.add(name) if not self.parser.has_option(self.section, name): if default is REQUIRED: raise NameError(self.section, name) if isinstance(default, INHERIT_GLOBAL): return handler('global', name, default.default) # don't return default here, give the handler a chance to modify # the default, e.g. pw_uid with default='root' returns 0. return handler(self.section, name, default) def check_surplus(self): surplus = [] for name, _value in self.parser.items(self.section): if (name not in self.seen and not self.parser.has_option(configparser.DEFAULTSECT, name)): surplus.append(name) return surplus def _get_int(self, section, name, default): return self.parser.getint(section, name, fallback=default) def _get_oct(self, section, name, default): value = self.parser.get(section, name, fallback=default) return int(value, 8) def _get_hex(self, section, name, default): value = self.parser.get(section, name, fallback=default) return int(value, 16) def _get_float(self, section, name, default): return self.parser.getfloat(section, name, fallback=default) def _get_bool(self, section, name, default): return self.parser.getboolean(section, name, fallback=default) def _get_regex(self, section, name, default): value = self.parser.get(section, name, fallback=default) if not value: return None else: return re.compile(value) def _get_str(self, section, name, default): return self.parser.get(section, name, fallback=default) def _split_string(self, value): if ',' in value: values = value.split(',') else: values = value.split(' ') return list(v.strip() for v in values if v.strip()) def _get_str_set(self, section, name, default): try: value = self.parser.get(section, name) except configparser.NoOptionError: return default if not value or not value.strip(): return None else: return set(self._split_string(value)) def _get_str_list(self, section, name, default): try: value = self.parser.get(section, name) except configparser.NoOptionError: return default if not value or not value.strip(): return None else: return self._split_string(value) def _get_store(self, section, name, default): return self.parser.get(section, name, fallback=default) def _get_pwd_uid(self, section, name, default): value = self.parser.get(section, name, fallback=default) try: return int(value) except ValueError: return pwd.getpwnam(value).pw_uid def _get_grp_gid(self, section, name, default): value = self.parser.get(section, name, fallback=default) try: return int(value) except ValueError: return grp.getgrnam(value).gr_gid def _get_json(self, section, name, default): value = self.parser.get(section, name, fallback=default) return json.loads(value) class PluginOption: """Plugin option code:: class MyPlugin(CustodiaPlugin): number = PluginOption(int, REQUIRED, 'my value') values = PluginOption('str_list', 'foo bar', 'a list of strings') config:: [myplugin] handler = MyPlugin number = 1 values = egg spam python **Supported value types** *str* plain string *str_set* set of comma-separated or space-separated strings *str_list* ordered list of comma-separated or space-separated strings *int* number (converted from base 10) *hex* number (converted from base 16) *oct* number (converted from base 8) *float* floating point number *bool* boolean (true: on, true, yes, 1; false: off, false, no, 0) *regex* regular expression string *store* special value for refer to a store plugin *pwd_uid* numeric user id or user name *grp_gid* numeric group id or group name *json* JSON string """ __slots__ = ('name', 'typ', 'default', 'doc') def __init__(self, typ, default, doc): self.name = None if typ in {str, int, float, bool, oct, hex}: self.typ = typ.__name__ else: self.typ = typ self.default = default self.doc = doc def __repr__(self): if self.default is REQUIRED: msg = "" else: msg = ("