# -*- coding: utf-8 -*- """ @author: Fabio Erculiani @contact: lxnay@sabayon.org @copyright: Fabio Erculiani @license: GPL-2 B{Entropy Command Line Client}. """ import os import sys import argparse from entropy.i18n import _ from entropy.const import const_convert_to_unicode, \ const_convert_to_rawstring from entropy.output import darkgreen, teal, purple, print_error, bold, \ brown from entropy.exceptions import PermissionDenied from entropy.client.interfaces import Client from entropy.core.settings.base import SystemSettings import entropy.tools from solo.utils import enlightenatom class SoloCommand(object): """ Base class for Solo commands """ # Set this to the command name from where this object # gets triggered (for equo help, "help" is the NAME # that should be set). NAME = None # Set this to a list of aliases for NAME ALIASES = [] # Set this to True if command is a catch-all (fallback) CATCH_ALL = False # Allow unprivileged access ? ALLOW_UNPRIVILEGED = False # If True, the command is not shown in the help output HIDDEN = False # These two class variables are used in the man page # generation. You also need to override man() INTRODUCTION = "No introduction available" SEE_ALSO = "" def __init__(self, args): self._args = args def _get_parser(self): """ This is the argparse parser setup method, it shall return the ArgumentParser object that will be used by parse(). """ raise NotImplementedError() def _argparse_is_valid_directory(self, string): """ To be used with argparse add_argument() type parameter for validating directory paths. """ if os.path.isdir(string) and os.path.exists(string): # cope with broken symlinks return string msg = "%s: %s" % (_("not a valid directory"), string) # see bug 3873, requires raw string msg = const_convert_to_rawstring( msg, from_enctype="utf-8") raise argparse.ArgumentTypeError(msg) def _argparse_is_valid_entropy_package(self, string): """ To be used with argparse add_argument() type parameter for validating entropy package paths. """ if os.path.isfile(string) and os.path.exists(string): if entropy.tools.is_entropy_package_file(string): return string msg = "%s: %s" % (_("not a valid Entropy package file"), string) raise argparse.ArgumentTypeError(msg) def _setup_verbose_quiet_parser(self, parser): """ Add --verbose and --quiet switches to parser. """ parser.add_argument( "--verbose", "-v", action="store_true", default=False, help=_("verbose output")) parser.add_argument( "--quiet", "-q", action="store_true", default=False, help=_("quiet output")) def parse(self): """ Parse the actual arguments and return the function that should be called and its arguments. The function signature is: int function([list of args]) The return value represents the exit status of the "command" """ raise NotImplementedError() def bashcomp(self, last_arg): """ Print to standard output the bash completion outcome for given arguments (self._args). Raise NotImplementedError() if not supported. @param last_arg: last argument in the argv. Useful for allowing its automagic completion. Can be None !! @type last_arg: string or None """ raise NotImplementedError() def _hierarchical_bashcomp(self, last_arg, outcome, commands): """ This method implements bash completion through a hierarchical (commands) dictionary object. """ # navigate through commands, finding the list of commands _commands = commands if not self._args: # show all the commands outcome += sorted(commands.keys()) for index, item in enumerate(self._args): if item in _commands: _commands = commands[item] if index == (len(self._args) - 1): # if this is the last one, generate # proper outcome elements. outcome += sorted(_commands.keys()) # reset last_arg so that outcome list # won't be filtered last_arg = "" elif index == (len(self._args) - 1): # if this is the last one, and item # is not in _commands, outcome becomes # _commands.keys() outcome += sorted(_commands.keys()) # no need to break here else: # item not in commands, but that's not the # last one, we must generate proper outcome # elements and stop right after outcome += sorted(_commands.keys()) break return self._bashcomp(sys.stdout, last_arg, outcome) def _bashcomp(self, stdout, last_arg, available_args): """ This method must be called from inside bashcomp() and does the actual bash-completion rendering on stdout. """ def _startswith(string): if last_arg is not None: if last_arg not in available_args: return string.startswith(last_arg) return True if self._args: # only filter out if last_arg is actually # something after this.NAME. available_args = sorted(filter(_startswith, available_args)) for arg in self._args: if arg in available_args: available_args.remove(arg) stdout.write(" ".join(available_args) + "\n") stdout.flush() def man(self): """ Return a dictionary containing the following man entries (in a2x format), excluding the entry title: name, synopsis, introduction, options. Optional keys are: seealso. All of them are mandatory. """ raise NotImplementedError() def _man(self): """ Standard man page outcome generator that can be used to implement class-specific man() methods. You need to provide your own INTRODUCTION and SEE_ALSO class fields (see class-level variables). """ parser = self._get_parser() prog = "%s %s" % ("equo", self.NAME) formatter = parser.formatter_class(prog=prog) usage = formatter._format_usage(parser.usage, parser._actions, parser._mutually_exclusive_groups, "").rstrip() options_txt = [] action_groups = parser._action_groups if action_groups: options_header = "\"equo " + self.NAME + "\" " options_header += "supports the following options which " options_header += "alters its behaviour.\n\n" options_txt.append(options_header) for group in action_groups: if group._group_actions: options_txt.append(group.title.upper()) options_txt.append("~" * len(group.title)) for action in group._group_actions: action_name = action.metavar option_strings = action.option_strings if not option_strings: # positional args if action_name is None: # SubParsers action_lst = [] for sub_action in action._get_subactions(): sub_action_str = "*" + sub_action.dest + "*::\n" sub_action_str += " " + sub_action.help + "\n" action_lst.append(sub_action_str) action_str = "\n".join(action_lst) else: action_str = "*" + action_name + "*::\n" action_str += " " + action.help + "\n" else: action_str = "" for option_str in option_strings: action_str = "*" + option_str + "*" if action_name: action_str += "=" + action_name action_str += "::\n" action_str += " " + action.help + "\n" options_txt.append(action_str) data = { 'name': self.NAME, 'description': parser.description, 'introduction': self.INTRODUCTION, 'seealso': self.SEE_ALSO, 'synopsis': usage, 'options': "\n".join(options_txt), } return data def _entropy(self, *args, **kwargs): """ Return the Entropy Client object. This method is not thread safe. """ return Client(*args, **kwargs) def _entropy_bashcomp(self): """ Return an Entropy Client object that MUST be used only inside bashcomp methods. This object is faster to load than the standard Entropy object loaded by _entropy() at the cost of less consistency checks. """ return Client(indexing=False, repo_validation=False) def _entropy_ws(self, entropy_client, repository_id, tx_cb=False): """ Initialize an Entropy Web Services object for the given Repository name. @param entropy_client: Entropy Client interface @type entropy_client: entropy.client.interfaces.Client @param repository_id: repository identifier @type repository_id: string @return: the ClientWebService instance @rtype: entropy.client.services.interfaces.ClientWebService @raise WebService.UnsupportedService: if service is unsupported by repository """ def _transfer_callback(transfered, total, download): if download: action = _("Downloading") else: action = _("Uploading") percent = 100 if (total > 0) and (transfered <= total): percent = int(round((float(transfered)/total) * 100, 1)) msg = "[%s%s] %s ..." % ( purple(str(percent)), "%", teal(action)) entropy_client.output(msg, back=True) factory = entropy_client.WebServices() webserv = factory.new(repository_id) if tx_cb: webserv._set_transfer_callback(_transfer_callback) return webserv def _call_locked(self, func): """ Execute the given function at func after acquiring Entropy Resources Lock, for given repository at repo. The signature of func is: int func(entropy_client). """ client = None acquired = False try: try: client = self._entropy() except PermissionDenied as err: print_error(err.value) return 1 blocking = os.getenv("__EQUO_LOCKS_BLOCKING__") if blocking: client.output(darkgreen( _("Acquiring Entropy Resources " "Lock, please wait...")), back=True) acquired = entropy.tools.acquire_entropy_locks( client, blocking=blocking) if not acquired: client.output( darkgreen(_("Another Entropy is currently running.")), level="error", importance=1 ) return 1 return func(client) finally: if client is not None: if acquired: entropy.tools.release_entropy_locks(client) client.shutdown() def _call_unlocked(self, func): """ Execute the given function at func after acquiring Entropy Resources Lock in shared mode, for given repository at repo. The signature of func is: int func(entropy_client). """ client = None acquired = False try: try: client = self._entropy() except PermissionDenied as err: print_error(err.value) return 1 # use blocking mode to avoid tainting stdout acquired = entropy.tools.acquire_entropy_locks( client, blocking=True, shared=True) if not acquired: client.output( darkgreen(_("Another Entropy is currently running.")), level="error", importance=1 ) return 1 return func(client) finally: if client is not None: if acquired: entropy.tools.release_entropy_locks(client) client.shutdown() def _settings(self): """ Return a SystemSettings instance. """ return SystemSettings() def _show_did_you_mean(self, entropy_client, package, from_installed): """ Show "Did you mean?" results for the given package name. """ items = entropy_client.get_meant_packages( package, from_installed=from_installed) if not items: return mytxt = "%s %s %s %s %s" % ( bold(const_convert_to_unicode(" ?")), teal(_("When you wrote")), bold(const_convert_to_unicode(package)), darkgreen(_("You Meant(tm)")), teal(_("one of these below?")), ) entropy_client.output(mytxt) _cache = set() for pkg_id, repo_id in items: if from_installed: repo = entropy_client.installed_repository() else: repo = entropy_client.open_repository(repo_id) key_slot = repo.retrieveKeySlotAggregated(pkg_id) if key_slot not in _cache: entropy_client.output( enlightenatom(key_slot), header=brown(" # ")) _cache.add(key_slot)