From c8f6fb0e6051bc06c5d70d7ea046cc8a951ef438 Mon Sep 17 00:00:00 2001 From: lxnay Date: Fri, 3 Oct 2008 01:59:47 +0000 Subject: [PATCH] Entropy/System Manager: - cache socket connection to improve speed - add extended_result keyword inside queue items metadata to redirect big results there and avoid to have remote get_queue() taking a lot due to the need to serialize big data - complete 'reagent' part - implemented most of the 'activator' functions git-svn-id: http://svn.sabayonlinux.org/projects/entropy/trunk@2452 cd1c1023-2f26-0410-ae45-c471fc1f0318 --- libraries/entropy.py | 1235 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 1113 insertions(+), 122 deletions(-) diff --git a/libraries/entropy.py b/libraries/entropy.py index 38422e896..d9c0acb8e 100644 --- a/libraries/entropy.py +++ b/libraries/entropy.py @@ -739,7 +739,7 @@ class EquoInterface(TextInterface): return -1 def openRepositoryDatabase(self, repoid): - t_ident = thread.get_ident() + t_ident = 1 # thread.get_ident() disabled for now if not self.repoDbCache.has_key((repoid,etpConst['systemroot'],t_ident,)) or (etpConst['packagemasking'] == None): if etpConst['packagemasking'] == None: self.closeAllRepositoryDatabases() @@ -13829,22 +13829,28 @@ class SocketHostInterface: def _get_args_kwargs(self, args): myargs = [] mykwargs = {} + + def is_int(x): + try: + int(x) + except ValueError: + return False + return True + for arg in args: if (arg.find("=") != -1) and not arg.startswith("="): x = arg.split("=") a = x[0] b = ''.join(x[1:]) - try: + if (b in ("True","False",)) or is_int(b): mykwargs[a] = eval(b) - except (NameError,SyntaxError,): + else: myargs.append(arg) else: - try: + if (arg in ("True","False",)) or is_int(arg): myargs.append(eval(arg)) - except (NameError, SyntaxError): + else: myargs.append(arg) - except TypeError: - pass return myargs, mykwargs def spawn_function(self, cmd, myargs, mykwargs, session, Entropy, authenticator): @@ -15179,7 +15185,8 @@ class ServerInterface(TextInterface): repo = None, indexing = True, warnings = True, - do_cache = True + do_cache = True, + ask_treeupdates = True ): if repo == None: @@ -15195,20 +15202,21 @@ class ServerInterface(TextInterface): if etpConst['packagemasking'] == None: self.ClientService.parse_masking_settings() - t_ident = thread.get_ident() + t_ident = 1 # thread.get_ident() disabled for now local_dbfile = self.get_local_database_file(repo) - cached = self.serverDbCache.get( - ( etpConst['systemroot'], - local_dbfile, - read_only, - no_upload, - just_reading, - repo, - t_ident, - ) - ) - if cached != None: - return cached + if do_cache: + cached = self.serverDbCache.get( + ( etpConst['systemroot'], + local_dbfile, + read_only, + no_upload, + just_reading, + repo, + t_ident, + ) + ) + if cached != None: + return cached if not os.path.isdir(os.path.dirname(local_dbfile)): os.makedirs(os.path.dirname(local_dbfile)) @@ -15236,7 +15244,7 @@ class ServerInterface(TextInterface): if (repo not in etpConst['server_treeupdatescalled']) and (not just_reading): # sometimes, when filling a new server db, we need to avoid tree updates if valid: - conn.serverUpdatePackagesData() + conn.serverUpdatePackagesData(ask = ask_treeupdates) elif warnings: mytxt = _( "Entropy database is probably empty. If you don't agree with what I'm saying, then it's probably corrupted! I won't stop you here btw...") self.updateProgress( @@ -15988,7 +15996,7 @@ class ServerInterface(TextInterface): def get_remote_mirrors(self, repo = None): if repo == None: repo = self.default_repository - return etpConst['server_repositories'][repo]['mirrors'] + return etpConst['server_repositories'][repo]['mirrors'][:] def get_remote_packages_relative_path(self, repo = None): if repo == None: @@ -16099,6 +16107,20 @@ class ServerInterface(TextInterface): else: return 0 + def get_remote_database_revision(self, repo = None): + + if repo == None: + repo = self.default_repository + + remote_status = self.MirrorsService.get_remote_databases_status(repo) + if not [x for x in remote_status if x[1]]: + remote_revision = 0 + else: + remote_revision = max([x[1] for x in remote_status]) + + return remote_revision + + def atomMatch(self, *args, **kwargs): repos = etpConst['server_repositories'].keys() kwargs['server_repos'] = repos @@ -20944,6 +20966,7 @@ class SystemManagerExecutorInterface: class SystemManagerExecutorServerRepositoryInterface: + import entropyTools def __init__(self, SystemManagerExecutorInstance, *args, **kwargs): import subprocess, entropyTools self.subprocess = subprocess @@ -21000,6 +21023,34 @@ class SystemManagerExecutorServerRepositoryInterface: 'func': self.move_entropy_packages_to_repository, 'args': 5, }, + 'scan_entropy_packages_database_changes': { + 'func': self.scan_entropy_packages_database_changes, + 'args': 1, + }, + 'run_entropy_database_updates': { + 'func': self.run_entropy_database_updates, + 'args': 4, + }, + 'run_entropy_dependency_test': { + 'func': self.run_entropy_dependency_test, + 'args': 1, + }, + 'run_entropy_library_test': { + 'func': self.run_entropy_library_test, + 'args': 1, + }, + 'run_entropy_treeupdates': { + 'func': self.run_entropy_treeupdates, + 'args': 2, + }, + 'scan_entropy_mirror_updates': { + 'func': self.scan_entropy_mirror_updates, + 'args': 2, + }, + 'run_entropy_mirror_updates': { + 'func': self.run_entropy_mirror_updates, + 'args': 2, + } } def _set_processing_pid(self, queue_id, process_pid): @@ -21026,6 +21077,7 @@ class SystemManagerExecutorServerRepositoryInterface: self._set_processing_pid(queue_id, p.pid) rc = p.wait() finally: + stdout_err.write("\n### Done ###\n") stdout_err.flush() stdout_err.close() return True,rc @@ -21073,6 +21125,7 @@ class SystemManagerExecutorServerRepositoryInterface: self._set_processing_pid(queue_id, p.pid) rc = p.wait() finally: + stdout_err.write("\n### Done ###\n") stdout_err.flush() stdout_err.close() return True,rc @@ -21103,6 +21156,7 @@ class SystemManagerExecutorServerRepositoryInterface: self._set_processing_pid(queue_id, p.pid) rc = p.wait() finally: + stdout_err.write("\n### Done ###\n") stdout_err.flush() stdout_err.close() return True,rc @@ -21186,6 +21240,7 @@ class SystemManagerExecutorServerRepositoryInterface: if not package_data.has_key(category): package_data[category] = {} package_data[category][package] = self._get_spm_pkginfo(package) + return True, package_data def get_spm_categories_installed(self, queue_id, categories): @@ -21205,6 +21260,7 @@ class SystemManagerExecutorServerRepositoryInterface: if not package_data.has_key(category): package_data[category] = {} package_data[category][package] = self._get_spm_pkginfo(package, from_installed = True) + return True, package_data def run_spm_info(self, queue_id): @@ -21225,6 +21281,7 @@ class SystemManagerExecutorServerRepositoryInterface: self._set_processing_pid(queue_id, p.pid) rc = p.wait() finally: + stdout_err.write("\n### Done ###\n") stdout_err.flush() stdout_err.close() return True,rc @@ -21250,6 +21307,7 @@ class SystemManagerExecutorServerRepositoryInterface: self._set_processing_pid(queue_id, p.pid) rc = p.wait() finally: + stdout_err.write("\n### Done ###\n") stdout_err.flush() stdout_err.close() return True,rc @@ -21268,7 +21326,7 @@ class SystemManagerExecutorServerRepositoryInterface: stdout_err = open(queue_data['stdout'],"aw") def myupdateprogress(*myargs, **mykwargs): - stdout_err.write(myargs[0]+"\n") + self._file_updateProgress(stdout_err, *myargs, **mykwargs) old_updprogress = self.SystemManagerExecutor.SystemInterface.Entropy.updateProgress self.SystemManagerExecutor.SystemInterface.Entropy.updateProgress = myupdateprogress @@ -21280,6 +21338,7 @@ class SystemManagerExecutorServerRepositoryInterface: do_copy = do_copy ) finally: + stdout_err.write("\n### Done ###\n") self.SystemManagerExecutor.SystemInterface.Entropy.updateProgress = old_updprogress stdout_err.flush() stdout_err.close() @@ -21289,6 +21348,429 @@ class SystemManagerExecutorServerRepositoryInterface: rc = 0 return True,rc + def scan_entropy_packages_database_changes(self, queue_id): + + queue_data, key = self.SystemManagerExecutor.SystemInterface.get_item_by_queue_id(queue_id) + if queue_data == None: + return False,'no item in queue' + + to_add, to_remove, to_inject = self.SystemManagerExecutor.SystemInterface.Entropy.scan_package_changes() + mydict = { 'add': to_add, 'remove': to_remove, 'inject': to_inject } + + # setup add data + mydict['add_data'] = {} + for portage_atom, portage_counter in to_add: + mydict['add_data'][(portage_atom, portage_counter,)] = self._get_spm_pkginfo(portage_atom,from_installed = True) + + mydict['remove_data'] = {} + for idpackage, repoid in to_remove: + dbconn = self.SystemManagerExecutor.SystemInterface.Entropy.openServerDatabase(repo = repoid, just_reading = True, warnings = False, do_cache = False) + mydict['remove_data'][(idpackage, repoid,)] = self._get_entropy_pkginfo(dbconn, idpackage, repoid) + dbconn.closeDB() + + mydict['inject_data'] = {} + for idpackage, repoid in to_inject: + dbconn = self.SystemManagerExecutor.SystemInterface.Entropy.openServerDatabase(repo = repoid, just_reading = True, warnings = False, do_cache = False) + mydict['inject_data'][(idpackage, repoid,)] = self._get_entropy_pkginfo(dbconn, idpackage, repoid) + dbconn.closeDB() + + return True,mydict + + def run_entropy_database_updates(self, queue_id, to_add, to_remove, to_inject): + + queue_data, key = self.SystemManagerExecutor.SystemInterface.get_item_by_queue_id(queue_id) + if queue_data == None: + return False,'no item in queue' + + stdout_err = open(queue_data['stdout'],"a+") + + def myupdateprogress(*myargs, **mykwargs): + self._file_updateProgress(stdout_err, *myargs, **mykwargs) + + Entropy = self.SystemManagerExecutor.SystemInterface.Entropy + + old_updprogress = Entropy.updateProgress + old_client_updprogress = Entropy.ClientService.updateProgress + Entropy.updateProgress = myupdateprogress + Entropy.ClientService.updateProgress = myupdateprogress + try: + + atoms_removed = [] + matches_injected = set() + + if to_inject: + Entropy.updateProgress(_("Running package injection")) + + # run inject + for idpackage, repoid in to_inject: + matches_injected.add((idpackage,repoid,)) + Entropy.transform_package_into_injected(idpackage, repo = repoid) + + if to_remove: + Entropy.updateProgress(_("Running package removal")) + + # run remove + remdata = {} + for idpackage,repoid in to_remove: + dbconn = Entropy.openServerDatabase(repo = repoid, just_reading = True, warnings = False, do_cache = False) + atoms_removed.append(dbconn.retrieveAtom(idpackage)) + dbconn.closeDB() + if not remdata.has_key(repoid): + remdata[repoid] = set() + remdata[repoid].add(idpackage) + for repoid in remdata: + Entropy.remove_packages(remdata[repoid], repo = repoid) + + if to_add: + Entropy.updateProgress(_("Running package quickpkg")) + + # run quickpkg + for repoid in to_add: + store_dir = Entropy.get_local_store_directory(repo = repoid) + for atom in to_add[repoid]: + Entropy.quickpkg(atom,store_dir) + + # inject new into db + avail_repos = Entropy.get_available_repositories() + if etpConst['clientserverrepoid'] in avail_repos: + avail_repos.pop(etpConst['clientserverrepoid']) + matches_added = set() + for repoid in avail_repos: + store_dir = Entropy.get_local_store_directory(repo = repoid) + package_files = os.listdir(store_dir) + if not package_files: continue + package_files = [(os.path.join(store_dir,x),etpConst['branch'],False) for x in package_files] + + Entropy.updateProgress( "[%s|%s] %s" % ( + repoid, + etpConst['branch'], + _("Adding packages"), + ) + ) + for package_file, branch, inject in package_files: + Entropy.updateProgress(" %s" % (package_file,)) + + idpackages = Entropy.add_packages_to_repository(package_files, ask = False, repo = repoid) + matches_added |= set([(x,repoid,) for x in idpackages]) + + + Entropy.dependencies_test() + + mydict = { + 'added_data': {}, + 'remove_data': atoms_removed, + 'inject_data': {} + } + for idpackage, repoid in matches_added: + dbconn = Entropy.openServerDatabase(repo = repoid, just_reading = True, warnings = False, do_cache = False) + mydict['added_data'][(idpackage, repoid,)] = self._get_entropy_pkginfo(dbconn, idpackage, repoid) + dbconn.closeDB() + for idpackage, repoid in matches_injected: + dbconn = Entropy.openServerDatabase(repo = repoid, just_reading = True, warnings = False, do_cache = False) + mydict['inject_data'][(idpackage, repoid,)] = self._get_entropy_pkginfo(dbconn, idpackage, repoid) + dbconn.closeDB() + + except Exception, e: + self.entropyTools.printTraceback(f = stdout_err) + return False,str(e) + finally: + stdout_err.write("\n### Done ###\n") + Entropy.updateProgress = old_updprogress + Entropy.ClientService.updateProgress = old_client_updprogress + stdout_err.flush() + stdout_err.close() + + return True,mydict + + def run_entropy_dependency_test(self, queue_id): + + queue_data, key = self.SystemManagerExecutor.SystemInterface.get_item_by_queue_id(queue_id) + if queue_data == None: + return False,'no item in queue' + + stdout_err = open(queue_data['stdout'],"a+") + + def myupdateprogress(*myargs, **mykwargs): + self._file_updateProgress(stdout_err, *myargs, **mykwargs) + + Entropy = self.SystemManagerExecutor.SystemInterface.Entropy + old_updprogress = Entropy.updateProgress + old_client_updprogress = Entropy.ClientService.updateProgress + Entropy.updateProgress = myupdateprogress + Entropy.ClientService.updateProgress = myupdateprogress + + try: + deps_not_matched = Entropy.dependencies_test() + except Exception, e: + self.entropyTools.printTraceback(f = stdout_err) + return False,str(e) + finally: + stdout_err.write("\n### Done ###\n") + Entropy.updateProgress = old_updprogress + Entropy.ClientService.updateProgress = old_client_updprogress + stdout_err.flush() + stdout_err.close() + + return True,deps_not_matched + + def run_entropy_library_test(self, queue_id): + + queue_data, key = self.SystemManagerExecutor.SystemInterface.get_item_by_queue_id(queue_id) + if queue_data == None: + return False,'no item in queue' + + stdout_err = open(queue_data['stdout'],"a+") + + def myupdateprogress(*myargs, **mykwargs): + self._file_updateProgress(stdout_err, *myargs, **mykwargs) + + Entropy = self.SystemManagerExecutor.SystemInterface.Entropy + old_updprogress = Entropy.updateProgress + old_client_updprogress = Entropy.ClientService.updateProgress + Entropy.updateProgress = myupdateprogress + Entropy.ClientService.updateProgress = myupdateprogress + + try: + status, result = Entropy.libraries_test() + except Exception, e: + self.entropyTools.printTraceback(f = stdout_err) + return False,str(e) + finally: + stdout_err.write("\n### Done ###\n") + Entropy.updateProgress = old_updprogress + Entropy.ClientService.updateProgress = old_client_updprogress + stdout_err.flush() + stdout_err.close() + + status = False + if status == 0: + status = True + if not result: + result = set() + return status,result + + def run_entropy_treeupdates(self, queue_id, repoid): + + queue_data, key = self.SystemManagerExecutor.SystemInterface.get_item_by_queue_id(queue_id) + if queue_data == None: + return False,'no item in queue' + + stdout_err = open(queue_data['stdout'],"a+") + + def myupdateprogress(*myargs, **mykwargs): + self._file_updateProgress(stdout_err, *myargs, **mykwargs) + + Entropy = self.SystemManagerExecutor.SystemInterface.Entropy + old_updprogress = Entropy.updateProgress + old_client_updprogress = Entropy.ClientService.updateProgress + Entropy.updateProgress = myupdateprogress + Entropy.ClientService.updateProgress = myupdateprogress + try: + stdout_err.write(_("Opening database to let it run treeupdates. If you won't see anything below, it's just fine.")+"\n") + dbconn = Entropy.openServerDatabase(repo = repoid, do_cache = False, ask_treeupdates = False, read_only = True) + dbconn.closeDB() + finally: + stdout_err.write("\n### Done ###\n") + Entropy.updateProgress = old_updprogress + Entropy.ClientService.updateProgress = old_client_updprogress + stdout_err.flush() + stdout_err.close() + + return True,0 + + def scan_entropy_mirror_updates(self, queue_id, repositories): + + queue_data, key = self.SystemManagerExecutor.SystemInterface.get_item_by_queue_id(queue_id) + if queue_data == None: + return False,'no item in queue' + + stdout_err = open(queue_data['stdout'],"a+") + + def myupdateprogress(*myargs, **mykwargs): + self._file_updateProgress(stdout_err, *myargs, **mykwargs) + + Entropy = self.SystemManagerExecutor.SystemInterface.Entropy + old_updprogress = Entropy.updateProgress + old_client_updprogress = Entropy.ClientService.updateProgress + Entropy.updateProgress = myupdateprogress + Entropy.ClientService.updateProgress = myupdateprogress + import socket + + try: + + stdout_err.write(_("Scanning")+"\n") + repo_data = {} + for repoid in repositories: + + repo_data[repoid] = {} + + for uri in Entropy.get_remote_mirrors(repoid): + + crippled_uri = self.entropyTools.extractFTPHostFromUri(uri) + + repo_data[repoid][crippled_uri] = {} + repo_data[repoid][crippled_uri]['packages'] = {} + + for mybranch in etpConst['branches']: + + try: + upload_queue, download_queue, removal_queue, \ + fine_queue, remote_packages_data = Entropy.MirrorsService.calculate_packages_to_sync(uri, mybranch, repoid) + except socket.error, e: + self.entropyTools.printTraceback(f = stdout_err) + stdout_err.write("\n"+_("Socket error, continuing...")+"\n") + continue + + if (upload_queue or download_queue or removal_queue): + upload, download, removal, copy, metainfo = Entropy.MirrorsService.expand_queues( + upload_queue, + download_queue, + removal_queue, + remote_packages_data, + mybranch, + repoid + ) + if len(upload)+len(download)+len(removal)+len(copy): + repo_data[repoid][crippled_uri]['packages'][mybranch] = { + 'upload': upload, + 'download': download, + 'removal': removal, + 'copy': copy, + } + + # now the db + current_revision = Entropy.get_local_database_revision(repoid) + remote_revision = Entropy.get_remote_database_revision(repoid) + download_latest, upload_queue = Entropy.MirrorsService.calculate_database_sync_queues(repoid) + + repo_data[repoid][crippled_uri]['database'] = { + 'current_revision': current_revision, + 'remote_revision': remote_revision, + 'download_latest': download_latest, + 'upload_queue': [(self.entropyTools.extractFTPHostFromUri(x[0]),x[1],) for x in upload_queue] + } + + finally: + + stdout_err.write("\n### Done ###\n") + Entropy.updateProgress = old_updprogress + Entropy.ClientService.updateProgress = old_client_updprogress + stdout_err.flush() + stdout_err.close() + + return True,repo_data + + def run_entropy_mirror_updates(self, queue_id, repository_data): + + queue_data, key = self.SystemManagerExecutor.SystemInterface.get_item_by_queue_id(queue_id) + if queue_data == None: + return False,'no item in queue' + + stdout_err = open(queue_data['stdout'],"a+") + + def myupdateprogress(*myargs, **mykwargs): + self._file_updateProgress(stdout_err, *myargs, **mykwargs) + + Entropy = self.SystemManagerExecutor.SystemInterface.Entropy + old_updprogress = Entropy.updateProgress + old_client_updprogress = Entropy.ClientService.updateProgress + Entropy.updateProgress = myupdateprogress + Entropy.ClientService.updateProgress = myupdateprogress + + def sync_remote_databases(repoid, pretend): + + rdb_status = Entropy.MirrorsService.get_remote_databases_status() + Entropy.updateProgress( + "%s:" % (_("Remote Entropy Database Repository Status"),), + header = " * " + ) + for myuri, myrev in rdb_status: + Entropy.updateProgress("\t %s:\t %s" % (_("Host"),self.entropyTools.extractFTPHostFromUri(myuri),)) + Entropy.updateProgress("\t * %s: %s" % (_("Database revision"),myrev,)) + local_revision = Entropy.get_local_database_revision(repoid) + Entropy.updateProgress("\t * %s: %s" % (_("Database local revision currently at"),local_revision,)) + if pretend: + return 0,set(),set() + + errors, fine_uris, broken_uris = Entropy.MirrorsService.sync_databases(no_upload = False) + remote_status = Entropy.MirrorsService.get_remote_databases_status(repoid) + Entropy.updateProgress(" * %s: " % (_("Remote Entropy Database Repository Status"),)) + for myuri, myrev in remote_status: + Entropy.updateProgress("\t %s:\t%s" % (_("Host"),Entropy.entropyTools.extractFTPHostFromUri(myuri),)) + Entropy.updateProgress("\t * %s: %s" % (_("Database revision"),myrev,) ) + + return errors, fine_uris, broken_uris + + repo_data = {} + try: + + for repoid in repository_data: + + # avoid __default__ + if repoid == etpConst['clientserverrepoid']: continue + + successfull_mirrors = set() + mirrors_errors = False + mirrors_tainted = False + broken_mirrors = set() + check_data = [] + + repo_data[repoid] = { + 'mirrors_tainted': mirrors_tainted, + 'mirrors_errors': mirrors_errors, + 'successfull_mirrors': successfull_mirrors.copy(), + 'broken_mirrors': broken_mirrors.copy(), + 'check_data': check_data, + 'db_errors': 0, + 'db_fine': set(), + 'db_broken': set(), + } + + if repository_data[repoid]['pkg']: + + mirrors_tainted, mirrors_errors, \ + successfull_mirrors, broken_mirrors, \ + check_data = Entropy.MirrorsService.sync_packages( + ask = False, pretend = repository_data[repoid]['pretend'], + packages_check = repository_data[repoid]['pkg_check'], repo = repoid) + + repo_data[repoid]['mirrors_tainted'] = mirrors_tainted + repo_data[repoid]['mirrors_errors'] = mirrors_errors + repo_data[repoid]['successfull_mirrors'] = successfull_mirrors + repo_data[repoid]['broken_mirrors'] = broken_mirrors + repo_data[repoid]['check_data'] = check_data + + if (not successfull_mirrors) and (not repository_data[repoid]['pretend']): continue + + if (not mirrors_errors) and repository_data[repoid]['db']: + + if mirrors_tainted and etpConst['rss-feed']: + commit_msg = repository_data[repoid]['commit_msg'] + if not commit_msg: commit_msg = "Autodriven update" + etpRSSMessages['commitmessage'] = commit_msg + + errors, fine, broken = sync_remote_databases(repoid, repository_data[repoid]['pretend']) + repo_data[repoid]['db_errors'] = errors + repo_data[repoid]['db_fine'] = fine.copy() + repo_data[repoid]['db_broken'] = broken.copy() + if errors: continue + Entropy.MirrorsService.lock_mirrors(lock = False, repo = repoid) + Entropy.MirrorsService.tidy_mirrors( + repo = repoid, ask = False, + pretend = repository_data[repoid]['pretend'] + ) + + + finally: + + stdout_err.write("\n### Done ###\n") + Entropy.updateProgress = old_updprogress + Entropy.ClientService.updateProgress = old_client_updprogress + stdout_err.flush() + stdout_err.close() + + return True,repo_data + def get_spm_glsa_data(self, queue_id, list_type): queue_data, key = self.SystemManagerExecutor.SystemInterface.get_item_by_queue_id(queue_id) @@ -21302,8 +21784,64 @@ class SystemManagerExecutorServerRepositoryInterface: data[myid] = self.SystemManagerExecutor.SystemInterface.Entropy.SpmService.get_glsa_id_information(myid) return True,data + def _file_updateProgress(self, f, *myargs, **mykwargs): + + f.flush() + back = mykwargs.get("back") + count = mykwargs.get("count") + header = mykwargs.get("header") + percent = mykwargs.get("percent") + text = myargs[0].encode('utf-8') + if not header: header = '' + + count_str = "" + if count: + if len(count) > 1: + if percent: + count_str = " ("+str(round((float(count[0])/count[1])*100,1))+"%) " + else: + count_str = " (%s/%s) " % (red(str(count[0])),blue(str(count[1])),) + + def is_last_newline(f): + try: + f.seek(-1,2) + last = f.read(1) + if last == "\n": + return True + except IOError: + pass + return False + + if back: + self.entropyTools.seek_till_newline(f) + txt = header+count_str+text + else: + if not is_last_newline(f): f.write("\n") + txt = header+count_str+text+"\n" + f.write(txt) + + f.flush() + + # !!! duplicate + def _get_entropy_pkginfo(self, dbconn, idpackage, repoid): + data = {} + try: + data['atom'], data['name'], data['version'], data['versiontag'], \ + data['description'], data['category'], data['chost'], \ + data['cflags'], data['cxxflags'],data['homepage'], \ + data['license'], data['branch'], data['download'], \ + data['digest'], data['slot'], data['etpapi'], \ + data['datecreation'], data['size'], data['revision'] = dbconn.getBaseData(idpackage) + except TypeError: + return data + data['injected'] = dbconn.isInjected(idpackage) + data['repoid'] = repoid + data['idpackage'] = idpackage + return data + def _get_spm_pkginfo(self, matched_atom, from_installed = False): data = {} + data['atom'] = matched_atom data['key'] = self.entropyTools.dep_getkey(matched_atom) if from_installed: data['slot'] = self.SystemManagerExecutor.SystemInterface.Entropy.SpmService.get_installed_package_slot(matched_atom) @@ -21326,7 +21864,7 @@ class SystemManagerExecutorServerRepositoryInterface: class SystemManagerRepositoryCommands(SocketCommandsSkel): - import entropyTools + import entropyTools, dumpTools def __init__(self, HostInterface): SocketCommandsSkel.__init__(self, HostInterface, inst_name = "srvrepo") @@ -21337,7 +21875,9 @@ class SystemManagerRepositoryCommands(SocketCommandsSkel): 'srvrepo:spm_remove_atoms', 'srvrepo:run_custom_shell_command', 'srvrepo:remove_entropy_packages', - 'srvrepo:search_entropy_packages' + 'srvrepo:search_entropy_packages', + 'srvrepo:run_entropy_database_updates', + 'srvrepo:run_entropy_mirror_updates' ] self.valid_commands = { 'srvrepo:sync_spm': { @@ -21497,7 +22037,7 @@ class SystemManagerRepositoryCommands(SocketCommandsSkel): 'args': ["myargs"], 'as_user': False, 'desc': "remove Entropy packages using their match -> (idpackage,repo)", - 'syntax': " srvrepo:remove_entropy_packages [(idpackage,repo,),(idpackage,repo,)]", + 'syntax': " srvrepo:remove_entropy_packages idpackage:repoid,idpackage,repoid,...", 'from': str(self) }, 'srvrepo:search_entropy_packages': { @@ -21520,6 +22060,76 @@ class SystemManagerRepositoryCommands(SocketCommandsSkel): 'syntax': " srvrepo:move_entropy_packages_to_repository ", 'from': str(self) }, + 'srvrepo:scan_entropy_packages_database_changes': { + 'auth': True, + 'built_in': False, + 'cb': self.docmd_scan_entropy_packages_database_changes, + 'args': ["cmd","authenticator"], + 'as_user': False, + 'desc': "scan Spm package changes to retrieve a list of action that should be run on the repositories", + 'syntax': " srvrepo:scan_entropy_packages_database_changes", + 'from': str(self) + }, + 'srvrepo:run_entropy_database_updates': { + 'auth': True, + 'built_in': False, + 'cb': self.docmd_run_entropy_database_updates, + 'args': ["cmd","myargs","authenticator"], + 'as_user': False, + 'desc': "run Entropy database updates", + 'syntax': " srvrepo:run_entropy_database_updates ", + 'from': str(self) + }, + 'srvrepo:run_entropy_dependency_test': { + 'auth': True, + 'built_in': False, + 'cb': self.docmd_run_entropy_dependency_test, + 'args': ["cmd","authenticator"], + 'as_user': False, + 'desc': "run Entropy dependency test", + 'syntax': " srvrepo:run_entropy_dependency_test", + 'from': str(self) + }, + 'srvrepo:run_entropy_library_test': { + 'auth': True, + 'built_in': False, + 'cb': self.docmd_run_entropy_library_test, + 'args': ["cmd","authenticator"], + 'as_user': False, + 'desc': "run Entropy dependency test", + 'syntax': " srvrepo:run_entropy_library_test", + 'from': str(self) + }, + 'srvrepo:run_entropy_treeupdates': { + 'auth': True, + 'built_in': False, + 'cb': self.docmd_run_entropy_treeupdates, + 'args': ["cmd","myargs","authenticator"], + 'as_user': False, + 'desc': "run Entropy database treeupdates", + 'syntax': " srvrepo:run_entropy_treeupdates ", + 'from': str(self) + }, + 'srvrepo:scan_entropy_mirror_updates': { + 'auth': True, + 'built_in': False, + 'cb': self.docmd_scan_entropy_mirror_updates, + 'args': ["cmd","myargs","authenticator"], + 'as_user': False, + 'desc': "scan mirror updates for the specified repository identifiers", + 'syntax': " srvrepo:scan_entropy_mirror_updates <...>", + 'from': str(self) + }, + 'srvrepo:run_entropy_mirror_updates': { + 'auth': True, + 'built_in': False, + 'cb': self.docmd_run_entropy_mirror_updates, + 'args': ["cmd","myargs","authenticator"], + 'as_user': False, + 'desc': "run mirror updates for the provided repositories", + 'syntax': " srvrepo:run_entropy_mirror_updates ", + 'from': str(self) + }, } def docmd_sync_spm(self, cmd, myargs, authenticator): @@ -21528,7 +22138,7 @@ class SystemManagerRepositoryCommands(SocketCommandsSkel): uid = userdata.get('uid') gid = userdata.get('gid') - queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'sync_spm', [], {}, False) + queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'sync_spm', [], {}, False, False) if queue_id < 0: return False, queue_id return True, queue_id @@ -21594,7 +22204,7 @@ class SystemManagerRepositoryCommands(SocketCommandsSkel): 'cflags': cflags, } - queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'compile_atoms', [atoms][:], add_dict.copy(), False) + queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'compile_atoms', [atoms][:], add_dict.copy(), False, False) if queue_id < 0: return False, queue_id return True, queue_id @@ -21636,7 +22246,7 @@ class SystemManagerRepositoryCommands(SocketCommandsSkel): 'nocolor': nocolor, } - queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'spm_remove_atoms', [atoms][:], add_dict.copy(), False) + queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'spm_remove_atoms', [atoms][:], add_dict.copy(), False, False) if queue_id < 0: return False, queue_id return True, queue_id @@ -21648,7 +22258,7 @@ class SystemManagerRepositoryCommands(SocketCommandsSkel): uid = userdata.get('uid') gid = userdata.get('gid') - queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'get_spm_categories_updates', [myargs], {}, True) + queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'get_spm_categories_updates', [myargs], {}, True, True) if queue_id < 0: return False, queue_id return True, queue_id @@ -21660,7 +22270,7 @@ class SystemManagerRepositoryCommands(SocketCommandsSkel): uid = userdata.get('uid') gid = userdata.get('gid') - queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'get_spm_categories_installed', [myargs], {}, True) + queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'get_spm_categories_installed', [myargs], {}, True, True) if queue_id < 0: return False, queue_id return True, queue_id @@ -21685,7 +22295,7 @@ class SystemManagerRepositoryCommands(SocketCommandsSkel): uid = userdata.get('uid') gid = userdata.get('gid') - queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'enable_uses_for_atoms', [atoms,useflags], {}, True) + queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'enable_uses_for_atoms', [atoms,useflags], {}, True, True) if queue_id < 0: return False, queue_id return True, queue_id @@ -21710,7 +22320,7 @@ class SystemManagerRepositoryCommands(SocketCommandsSkel): uid = userdata.get('uid') gid = userdata.get('gid') - queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'disable_uses_for_atoms', [atoms,useflags], {}, True) + queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'disable_uses_for_atoms', [atoms,useflags], {}, True, True) if queue_id < 0: return False, queue_id return True, queue_id @@ -21722,7 +22332,7 @@ class SystemManagerRepositoryCommands(SocketCommandsSkel): uid = userdata.get('uid') gid = userdata.get('gid') - queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'get_spm_atoms_info', [myargs], {}, True) + queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'get_spm_atoms_info', [myargs], {}, True, True) if queue_id < 0: return False, queue_id return True, queue_id @@ -21732,7 +22342,7 @@ class SystemManagerRepositoryCommands(SocketCommandsSkel): uid = userdata.get('uid') gid = userdata.get('gid') - queue_id = self.HostInterface.add_to_queue(cmd, '', uid, gid, 'run_spm_info', [], {}, True) + queue_id = self.HostInterface.add_to_queue(cmd, '', uid, gid, 'run_spm_info', [], {}, True, False) if queue_id < 0: return False, queue_id return True, queue_id @@ -21745,7 +22355,7 @@ class SystemManagerRepositoryCommands(SocketCommandsSkel): gid = userdata.get('gid') command = ' '.join(myargs) - queue_id = self.HostInterface.add_to_queue(cmd, command, uid, gid, 'run_custom_shell_command', [command], {}, False) + queue_id = self.HostInterface.add_to_queue(cmd, command, uid, gid, 'run_custom_shell_command', [command], {}, False, False) if queue_id < 0: return False, queue_id return True, queue_id @@ -21757,13 +22367,15 @@ class SystemManagerRepositoryCommands(SocketCommandsSkel): uid = userdata.get('uid') gid = userdata.get('gid') - queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'get_spm_glsa_data', [myargs[0]], {}, True) + queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'get_spm_glsa_data', [myargs[0]], {}, True, True) if queue_id < 0: return False, queue_id return True, queue_id def docmd_get_available_repositories(self): data = {} data['available'] = self.HostInterface.Entropy.get_available_repositories() + if etpConst['clientserverrepoid'] in data['available']: + data['available'].pop(etpConst['clientserverrepoid']) data['community_mode'] = self.HostInterface.Entropy.community_repo data['current'] = self.HostInterface.Entropy.default_repository data['branches'] = etpConst['branches'] @@ -21825,11 +22437,14 @@ class SystemManagerRepositoryCommands(SocketCommandsSkel): def docmd_remove_entropy_packages(self, myargs): if not myargs: return False,'wrong arguments' - string_to_eval = ' '.join(myargs) + string = myargs[0].split(",") + matched_atoms = [] try: - matched_atoms = eval(string_to_eval) + for item in string: + mysplit = item.split(":") + matched_atoms.append((int(mysplit[0]),mysplit[1],)) except: - return False,'cannot eval() string' + return False,'cannot eval() string correctly' repo_data = {} for idpackage,repoid in matched_atoms: @@ -21865,11 +22480,128 @@ class SystemManagerRepositoryCommands(SocketCommandsSkel): queue_id = self.HostInterface.add_to_queue( cmd, ' '.join([str(x) for x in myargs]), uid, gid, 'move_entropy_packages_to_repository', - [from_repo,to_repo,idpackages,do_copy], {}, False + [from_repo,to_repo,idpackages,do_copy], {}, False, True ) if queue_id < 0: return False, queue_id return True, queue_id + def docmd_scan_entropy_packages_database_changes(self, cmd, authenticator): + + status, userdata, err_str = authenticator.docmd_userdata() + uid = userdata.get('uid') + gid = userdata.get('gid') + + queue_id = self.HostInterface.add_to_queue(cmd, '', uid, gid, 'scan_entropy_packages_database_changes', [], {}, True, True) + if queue_id < 0: return False, queue_id + return True, queue_id + + def docmd_run_entropy_dependency_test(self, cmd, authenticator): + + status, userdata, err_str = authenticator.docmd_userdata() + uid = userdata.get('uid') + gid = userdata.get('gid') + + queue_id = self.HostInterface.add_to_queue(cmd, '', uid, gid, 'run_entropy_dependency_test', [], {}, True, True) + if queue_id < 0: return False, queue_id + return True, queue_id + + def docmd_run_entropy_library_test(self, cmd, authenticator): + + status, userdata, err_str = authenticator.docmd_userdata() + uid = userdata.get('uid') + gid = userdata.get('gid') + + queue_id = self.HostInterface.add_to_queue(cmd, '', uid, gid, 'run_entropy_library_test', [], {}, True, True) + if queue_id < 0: return False, queue_id + return True, queue_id + + def docmd_run_entropy_treeupdates(self, cmd, myargs, authenticator): + + if not myargs: + return False,'wrong arguments' + + status, userdata, err_str = authenticator.docmd_userdata() + uid = userdata.get('uid') + gid = userdata.get('gid') + + queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'run_entropy_treeupdates', [myargs[0]], {}, False, False) + if queue_id < 0: return False, queue_id + return True, queue_id + + def docmd_scan_entropy_mirror_updates(self, cmd, myargs, authenticator): + + if not myargs: + return False,'wrong arguments' + + status, userdata, err_str = authenticator.docmd_userdata() + uid = userdata.get('uid') + gid = userdata.get('gid') + + queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'scan_entropy_mirror_updates', [myargs], {}, True, True) + if queue_id < 0: return False, queue_id + return True, queue_id + + def docmd_run_entropy_mirror_updates(self, cmd, myargs, authenticator): + + if not myargs: + return False,'wrong arguments' + + serialized_string = '\n'.join(myargs) + try: + mydict = self.dumpTools.unserialize_string(serialized_string) + except Exception, e: + return False,'cannot parse data: %s' % (e,) + + status, userdata, err_str = authenticator.docmd_userdata() + uid = userdata.get('uid') + gid = userdata.get('gid') + + queue_id = self.HostInterface.add_to_queue(cmd, '', uid, gid, 'run_entropy_mirror_updates', [mydict], {}, False, False) + if queue_id < 0: return False, queue_id + return True, queue_id + + def docmd_run_entropy_database_updates(self, cmd, myargs, authenticator): + + to_add = {} + to_remove = [] + to_inject = [] + to_add_string = '' + to_remove_string = '' + to_inject_string = '' + if myargs: + to_add_string = myargs[0].split(",") + if len(myargs) > 1: + to_remove_string = myargs[1].split(",") + if len(myargs) > 2: + to_inject_string = myargs[2].split(",") + try: + + for item in to_add_string: + atom, counter, repoid = item.split(":") + if not to_add.has_key(repoid): + to_add[repoid] = [] + to_add[repoid].append(atom) + + for item in to_remove_string: + idpackage, repoid = item.split(":") + to_remove.append((idpackage, repoid,)) + + for item in to_inject_string: + idpackage, repoid = item.split(":") + to_inject.append((idpackage, repoid,)) + + except Exception, e: + return False,'cannot run database updates properly: %s' % (e,) + + status, userdata, err_str = authenticator.docmd_userdata() + uid = userdata.get('uid') + gid = userdata.get('gid') + + queue_id = self.HostInterface.add_to_queue(cmd, ' '.join(myargs), uid, gid, 'run_entropy_database_updates', [to_add,to_remove,to_inject], {}, False, True) + if queue_id < 0: return False, queue_id + return True, queue_id + + def docmd_search_entropy_packages(self, myargs): if len(myargs) < 3: return False,'wrong arguments' @@ -21993,7 +22725,7 @@ class SystemManagerServerInterface(SocketHostInterface): 'auth': True, 'built_in': False, 'cb': self.docmd_get_queue, - 'args': [], + 'args': ["myargs"], 'as_user': False, 'desc': "get current queue", 'syntax': " systemsrv:get_queue", @@ -22111,8 +22843,27 @@ class SystemManagerServerInterface(SocketHostInterface): }, } - def docmd_get_queue(self): - return True, self.HostInterface.ManagerQueue.copy() + def docmd_get_queue(self, myargs): + + extended = False + if myargs: + extended = myargs[0] + + if extended: + return True, self.HostInterface.ManagerQueue.copy() + else: + # FUCK YEAH BITCH! + import copy + myqueue = copy.deepcopy(self.HostInterface.ManagerQueue) + for key in myqueue: + if key not in self.HostInterface.done_queue_keys: + continue + for queue_id in myqueue.get(key): + item = myqueue[key].get(queue_id) + if not item.has_key('extended_result'): + continue + item['extended_result'] = None + return True, myqueue def docmd_get_queue_item_by_id(self, myargs): @@ -22182,7 +22933,6 @@ class SystemManagerServerInterface(SocketHostInterface): item, key = self.HostInterface.get_item_by_queue_id(queue_id) if item == None: return False,'wrong queue id' - item = item.copy() if key not in self.HostInterface.done_queue_keys: return False,'process not completed yet' @@ -22190,7 +22940,13 @@ class SystemManagerServerInterface(SocketHostInterface): if not item.has_key('result'): return False,'result not available' - return True,item['result'] + item = item.copy() + + ext_result = None + if item.has_key('extended_result'): + ext_result = item['extended_result'] + + return True,(item['result'],ext_result,) def docmd_remove_queue_ids(self, myargs): @@ -22334,6 +23090,7 @@ class SystemManagerServerInterface(SocketHostInterface): kwargs['external_cmd_classes'].insert(0,self.SystemCommands) self.Entropy = EntropyInterface(**entropy_interface_kwargs) + self.Text = TextInterface() self.SystemExecutor = SystemManagerExecutorInterface(self, self.Entropy) self.ExecutorCommandClasses = [(self.BuiltInSystemManagerExecutorCommands,[],{},)] @@ -22378,7 +23135,7 @@ class SystemManagerServerInterface(SocketHostInterface): SocketHostInterface.__init__( self, self.FakeServiceInterface, #self.ServiceInterface, - sock_output = self.Entropy, + sock_output = self.Text, ssl = do_ssl, **kwargs ) @@ -22535,7 +23292,7 @@ class SystemManagerServerInterface(SocketHostInterface): return True - def add_to_queue(self, command_name, command_text, user_id, group_id, function, args, kwargs, do_parallel): + def add_to_queue(self, command_name, command_text, user_id, group_id, function, args, kwargs, do_parallel, extended_result): if function not in self.SystemExecutor.available_commands: return -1 @@ -22559,6 +23316,8 @@ class SystemManagerServerInterface(SocketHostInterface): 'processing_pid': None, 'do_parallel': do_parallel, } + if extended_result: + myqueue_dict['extended_result'] = None self.ManagerQueue['queue'][queue_id] = myqueue_dict self.ManagerQueue['queue_order'].append(queue_id) self.store_queue() @@ -22708,7 +23467,16 @@ class SystemManagerServerInterface(SocketHostInterface): raise wait_and_takeover() - command_data['result'] = copy_obj(result) + if command_data.has_key('extended_result') and done: + try: + command_data['result'], command_data['extended_result'] = copy_obj(result) + except TypeError: + done = False + command_data['result'] = 'wrong tuple split from queue processor (1)' + command_data['extended_result'] = None + else: + command_data['result'] = copy_obj(result) + if not done: self.ManagerQueue['processing'].pop(queue_id) self.ManagerQueue['processing_order'].remove(queue_id) @@ -22722,7 +23490,7 @@ class SystemManagerServerInterface(SocketHostInterface): done, cmd_result = result except TypeError: done = False - cmd_result = 'wrong tuple split from queue processor' + command_data['result'] = 'wrong tuple split from queue processor (2)' if not done: self.ManagerQueue['processing'].pop(queue_id) self.ManagerQueue['processing_order'].remove(queue_id) @@ -22765,11 +23533,12 @@ class SystemManagerClientCommands(EntropySocketClientCommands): ) return self.do_generic_handler(cmd, session_id) - def get_queue(self, session_id): + def get_queue(self, session_id, extended): - cmd = "%s %s" % ( + cmd = "%s %s %s" % ( session_id, 'systemsrv:get_queue', + extended, ) return self.do_generic_handler(cmd, session_id) @@ -22889,6 +23658,8 @@ class SystemManagerClientCommands(EntropySocketClientCommands): class SystemManagerRepositoryClientCommands(SystemManagerClientCommands): + import dumpTools + def sync_spm(self, session_id): cmd = "%s %s" % ( @@ -23078,7 +23849,7 @@ class SystemManagerRepositoryClientCommands(SystemManagerClientCommands): cmd = "%s %s %s" % ( session_id, 'srvrepo:remove_entropy_packages', - matched_atoms, + ','.join(["%s:%s" % (str(x[0]),str(x[1]),) for x in matched_atoms]), # 1:repoid,2:repoid ) return self.do_generic_handler(cmd, session_id) @@ -23105,6 +23876,69 @@ class SystemManagerRepositoryClientCommands(SystemManagerClientCommands): ) return self.do_generic_handler(cmd, session_id) + def scan_entropy_packages_database_changes(self, session_id): + + cmd = "%s %s" % ( + session_id, + 'srvrepo:scan_entropy_packages_database_changes', + ) + return self.do_generic_handler(cmd, session_id) + + def run_entropy_database_updates(self, session_id, to_add, to_remove, to_inject): + + cmd = "%s %s %s %s %s" % ( + session_id, + 'srvrepo:run_entropy_database_updates', + ','.join(["%s:%s:%s" % (str(x[0]),str(x[1]),str(x[2]),) for x in to_add]), + ','.join(["%s:%s" % (str(x[0]),str(x[1]),) for x in to_remove]), + ','.join(["%s:%s" % (str(x[0]),str(x[1]),) for x in to_inject]), + ) + return self.do_generic_handler(cmd, session_id) + + def run_entropy_dependency_test(self, session_id): + + cmd = "%s %s" % ( + session_id, + 'srvrepo:run_entropy_dependency_test', + ) + return self.do_generic_handler(cmd, session_id) + + def run_entropy_library_test(self, session_id): + + cmd = "%s %s" % ( + session_id, + 'srvrepo:run_entropy_library_test', + ) + return self.do_generic_handler(cmd, session_id) + + def run_entropy_treeupdates(self, session_id, repoid): + + cmd = "%s %s %s" % ( + session_id, + 'srvrepo:run_entropy_treeupdates', + repoid, + ) + return self.do_generic_handler(cmd, session_id) + + def scan_entropy_mirror_updates(self, session_id, repositories): + + cmd = "%s %s %s" % ( + session_id, + 'srvrepo:scan_entropy_mirror_updates', + ' '.join(repositories), + ) + return self.do_generic_handler(cmd, session_id) + + def run_entropy_mirror_updates(self, session_id, repository_data): + + serialized_string = self.dumpTools.serialize_string(repository_data) + cmd = "%s %s %s" % ( + session_id, + 'srvrepo:run_entropy_mirror_updates', + serialized_string, + ) + return self.do_generic_handler(cmd, session_id) + class SystemManagerMethodsInterface: def __init__(self, SystemManagerClientInstance): @@ -23118,14 +23952,16 @@ class SystemManagerMethodsInterface: }, 'get_queue': { 'desc': _("Get current queue content"), - 'params': [], + 'params': [ + ('extended',bool,_('Extended results'),False,) + ], 'call': self.get_queue, 'private': True, }, 'get_queue_item_by_id': { 'desc': _("Get queue item using its queue unique identifier"), 'params': [('queue_id',int,_('Queue Identifier'),True,)], - 'call': self.get_queue, + 'call': self.get_queue_item_by_id, 'private': True, }, 'get_queue_id_stdout': { @@ -23202,8 +24038,8 @@ class SystemManagerMethodsInterface: def get_available_commands(self): return self.Manager.do_cmd(False, "available_commands", [], {}) - def get_queue(self): - return self.Manager.do_cmd(True, "get_queue", [], {}) + def get_queue(self, extended = False): + return self.Manager.do_cmd(True, "get_queue", [extended], {}) def get_queue_item_by_id(self, queue_id): return self.Manager.do_cmd(True, "get_queue_item_by_id", [queue_id], {}) @@ -23393,6 +24229,58 @@ class SystemManagerRepositoryMethodsInterface(SystemManagerMethodsInterface): 'call': self.search_entropy_packages, 'private': False, }, + 'scan_entropy_packages_database_changes': { + 'desc': _("Scan Spm package changes and retrieve a list of action that should be run on the repositories"), + 'params': [], + 'call': self.scan_entropy_packages_database_changes, + 'private': False, + }, + 'run_entropy_database_updates': { + 'desc': _("Run Entropy database updates"), + 'params': [ + ('to_add',list,_('Matches to add from Spm'),True,), + ('to_remove',list,_('Matches to remove from repository database'),True,), + ('to_inject',list,_('Matches to inject on repository database'),True,), + ], + 'call': self.run_entropy_database_updates, + 'private': False, + }, + 'run_entropy_dependency_test': { + 'desc': _("Run Entropy dependency test"), + 'params': [], + 'call': self.run_entropy_dependency_test, + 'private': False, + }, + 'run_entropy_library_test': { + 'desc': _("Run Entropy library test"), + 'params': [], + 'call': self.run_entropy_library_test, + 'private': False, + }, + 'run_entropy_treeupdates': { + 'desc': _("Run Entropy tree updates"), + 'params': [ + ('repoid',basestring,_('Repository Identifier'),True,), + ], + 'call': self.run_entropy_treeupdates, + 'private': False, + }, + 'scan_entropy_mirror_updates': { + 'desc': _("Scan for Mirror updates and retrieve a list of action that should be run"), + 'params': [ + ('repositories',list,_('list of repository identifiers'),True,), + ], + 'call': self.scan_entropy_mirror_updates, + 'private': False, + }, + 'run_entropy_mirror_updates': { + 'desc': _("Run Mirror updates for the provided repositories and its data"), + 'params': [ + ('repository_data',dict,_('composed repository data'),True,), + ], + 'call': self.run_entropy_mirror_updates, + 'private': False, + }, }) def sync_spm(self): @@ -23473,6 +24361,27 @@ class SystemManagerRepositoryMethodsInterface(SystemManagerMethodsInterface): def move_entropy_packages_to_repository(self, idpackages, from_repo, to_repo, do_copy = False): return self.Manager.do_cmd(True, "move_entropy_packages_to_repository", [idpackages,from_repo,to_repo, do_copy], {}) + def scan_entropy_packages_database_changes(self): + return self.Manager.do_cmd(True, "scan_entropy_packages_database_changes", [], {}) + + def run_entropy_database_updates(self, to_add, to_remove, to_inject): + return self.Manager.do_cmd(True, "run_entropy_database_updates", [to_add,to_remove,to_inject], {}) + + def run_entropy_dependency_test(self): + return self.Manager.do_cmd(True, "run_entropy_dependency_test", [], {}) + + def run_entropy_library_test(self): + return self.Manager.do_cmd(True, "run_entropy_library_test", [], {}) + + def run_entropy_treeupdates(self, repoid): + return self.Manager.do_cmd(True, "run_entropy_treeupdates", [repoid], {}) + + def scan_entropy_mirror_updates(self, repositories): + return self.Manager.do_cmd(True, "scan_entropy_mirror_updates", [repositories], {}) + + def run_entropy_mirror_updates(self, repository_data): + return self.Manager.do_cmd(True, "run_entropy_mirror_updates", [repository_data], {}) + class SystemManagerClientInterface: ssl_connection = True @@ -23499,8 +24408,8 @@ class SystemManagerClientInterface: else: self.MethodsInterface = SystemManagerMethodsInterface - import socket, struct - self.socket, self.struct = socket, struct + import socket, struct, entropyTools + self.socket, self.struct, self.entropyTools = socket, struct, entropyTools self.Entropy = EntropyInstance self.hostname = None self.hostport = None @@ -23510,6 +24419,15 @@ class SystemManagerClientInterface: self.show_progress = show_progress self.ClientCommandsInterface = ClientCommandsInterface self.Methods = self.MethodsInterface(self) + self.connection_cache = {} + self.CacheLock = thread.allocate_lock() + self.shutdown = False + self.connection_killer = self.entropyTools.parallelTask(self.connection_killer_handler) + self.connection_killer.start() + + def __del__(self): + if hasattr(self,'shutdown'): + self.shutdown = True def _validate_credentials(self): if not isinstance(self.hostname,basestring): @@ -23523,6 +24441,56 @@ class SystemManagerClientInterface: if not isinstance(self.ssl_connection,bool): raise exceptionTools.IncorrectParameter("IncorrectParameter: ssl_connection: %s. %s" % (_('not a bool'),_('Please use setup_connection() properly'),)) + def get_connection_cache_key(self): + return hash((self.hostname, self.hostport, self.username, self.password, self.ssl_connection,)) + + def get_connection_cache(self): + key = self.get_connection_cache_key() + return self.connection_cache.get(key) + + def cache_connection(self, srv): + key = self.get_connection_cache_key() + self.connection_cache[key] = { + 'conn': srv, + 'ts': self.get_ts(), + } + + def update_connection_ts(self): + key = self.get_connection_cache_key() + if key not in self.connection_cache: + return + self.connection_cache[key]['ts'] = self.get_ts() + + def connection_killer_handler(self): + while 1: + + time.sleep(2) + + if self.shutdown: + break + + if not self.connection_cache: + continue + + keys = self.connection_cache.keys() + for key in keys: + curr_ts = self.get_ts() + ts = self.connection_cache[key]['ts'] + delta = curr_ts - ts + if delta.seconds < 60: + continue + self.CacheLock.acquire() + try: + data = self.connection_cache.pop(key) + finally: + self.CacheLock.release() + srv = data['conn'] + srv.disconnect() + + def get_ts(self): + from datetime import datetime + return datetime.fromtimestamp(time.time()) + def setup_connection(self, hostname, port, username, password, ssl): self.hostname = hostname self.hostport = port @@ -23562,27 +24530,39 @@ class SystemManagerClientInterface: # eval(func) must have session as first param def do_cmd(self, login_required, func, args, kwargs): - srv = self.get_service_connection() - if srv == None: - return False, 'no connection' - session = srv.open_session() - if session == None: - return False, 'no session' - args.insert(0,session) + self.CacheLock.acquire() + try: + srv = self.get_connection_cache() + if srv == None: + srv = self.get_service_connection() + if srv != None: self.cache_connection(srv) + else: + srv = srv['conn'] - if login_required: - logged, error = self.login(srv, session) - if not logged: - srv.close_session(session) - srv.disconnect() - return False, error + if srv == None: + return False, 'no connection' + session = srv.open_session() + if session == None: + return False, 'no session' - rslt = eval("srv.CmdInterface.%s" % (func,))(*args,**kwargs) - if login_required: - self.logout(srv, session) - srv.close_session(session) - srv.disconnect() - return rslt + self.update_connection_ts() + args.insert(0,session) + + if login_required: + logged, error = self.login(srv, session) + if not logged: + srv.close_session(session) + #srv.disconnect() + return False, error + + rslt = eval("srv.CmdInterface.%s" % (func,))(*args,**kwargs) + if login_required: + self.logout(srv, session) + srv.close_session(session) + #srv.disconnect() + return rslt + finally: + self.CacheLock.release() def get_available_client_commands(self): return self.Methods.available_commands.copy() @@ -24701,13 +25681,16 @@ class ServerMirrorsInterface: return False - def get_remote_databases_status(self, repo = None): + def get_remote_databases_status(self, repo = None, mirrors = []): if repo == None: repo = self.Entropy.default_repository + if not mirrors: + mirrors = self.Entropy.get_remote_mirrors(repo) + data = [] - for uri in self.Entropy.get_remote_mirrors(repo): + for uri in mirrors: ftp = self.FtpInterface(uri, self.Entropy) try: @@ -26745,25 +27728,32 @@ class ServerMirrorsInterface: # if at least one server has been synced successfully, move files if (len(successfull_mirrors) > 0) and not pretend: for branch in pkgbranches: - branch_dir = os.path.join(self.Entropy.get_local_upload_directory(repo),branch) - branchcontent = os.listdir(branch_dir) - for xfile in branchcontent: - source = os.path.join(self.Entropy.get_local_upload_directory(repo),branch,xfile) - destdir = os.path.join(self.Entropy.get_local_packages_directory(repo),branch) - if not os.path.isdir(destdir): - os.makedirs(destdir) - dest = os.path.join(destdir,xfile) - shutil.move(source,dest) - # clear expiration file - dest_expiration = dest+etpConst['packagesexpirationfileext'] - if os.path.isfile(dest_expiration): - os.remove(dest_expiration) + self.remove_expiration_files(branch, repo) if packages_check: check_data = self.Entropy.verify_local_packages([], ask = ask, repo = repo) return mirrors_tainted, mirrors_errors, successfull_mirrors, broken_mirrors, check_data + def remove_expiration_files(self, branch, repo = None): + + if repo == None: + repo = self.Entropy.default_repository + + branch_dir = os.path.join(self.Entropy.get_local_upload_directory(repo),branch) + branchcontent = os.listdir(branch_dir) + for xfile in branchcontent: + source = os.path.join(self.Entropy.get_local_upload_directory(repo),branch,xfile) + destdir = os.path.join(self.Entropy.get_local_packages_directory(repo),branch) + if not os.path.isdir(destdir): + os.makedirs(destdir) + dest = os.path.join(destdir,xfile) + shutil.move(source,dest) + # clear expiration file + dest_expiration = dest+etpConst['packagesexpirationfileext'] + if os.path.isfile(dest_expiration): + os.remove(dest_expiration) + def is_package_expired(self, package_file, branch, repo = None): pkg_path = os.path.join(self.Entropy.get_local_packages_directory(repo),branch,package_file) @@ -27183,7 +28173,7 @@ class EntropyDatabaseInterface: ) # check for /usr/portage/profiles/updates changes - def serverUpdatePackagesData(self): + def serverUpdatePackagesData(self, ask = True): etpConst['server_treeupdatescalled'].add(self.server_repo) @@ -27270,7 +28260,7 @@ class EntropyDatabaseInterface: self.ServiceInterface.doServerDatabaseSyncLock(self.server_repo, self.noUpload) # now run queue try: - self.runTreeUpdatesActions(update_actions) + self.runTreeUpdatesActions(update_actions, ask = ask) except: # destroy digest self.setRepositoryUpdatesDigest(self.server_repo, "-1") @@ -27409,7 +28399,7 @@ class EntropyDatabaseInterface: return new_actions # this is the place to add extra actions support - def runTreeUpdatesActions(self, actions): + def runTreeUpdatesActions(self, actions, ask = True): # just run fixpackages if gentoo-compat is enabled if etpConst['gentoo-compat']: @@ -27458,7 +28448,7 @@ class EntropyDatabaseInterface: if quickpkg_atoms and not self.clientDatabase: # quickpkg package and packages owning it as a dependency try: - self.runTreeUpdatesQuickpkgAction(quickpkg_atoms) + self.runTreeUpdatesQuickpkgAction(quickpkg_atoms, ask = ask) except: self.entropyTools.printTraceback() mytxt = "%s: %s: %s, %s." % ( @@ -27640,7 +28630,7 @@ class EntropyDatabaseInterface: quickpkg_queue.add(myatom) return quickpkg_queue - def runTreeUpdatesQuickpkgAction(self, atoms): + def runTreeUpdatesQuickpkgAction(self, atoms, ask = True): branch = etpConst['branch'] # ask branch question @@ -27648,29 +28638,30 @@ class EntropyDatabaseInterface: _("Would you like to continue with the default branch"), # it is a question branch, ) - rc = self.askQuestion(mytxt) - if rc == "No": - # ask which - while 1: - branch = readtext("%s: " % (_("Type your branch"),)) # use the keyboard! - if branch not in self.listAllBranches(): - mytxt = "%s: %s: %s" % ( - bold(_("ATTENTION")), - red(_("the specified branch does not exist")), - blue(branch), - ) - self.updateProgress( - mytxt, - importance = 1, - type = "warning", - header = darkred(" * ") - ) - continue - # ask to confirm - mytxt = "%s '%s' ?" % (_("Do you confirm"),branch,) - rc = self.askQuestion(mytxt) - if rc == "Yes": - break + if ask: + rc = self.askQuestion(mytxt) + if rc == "No": + # ask which + while 1: + branch = readtext("%s: " % (_("Type your branch"),)) # use the keyboard! + if branch not in self.listAllBranches(): + mytxt = "%s: %s: %s" % ( + bold(_("ATTENTION")), + red(_("the specified branch does not exist")), + blue(branch), + ) + self.updateProgress( + mytxt, + importance = 1, + type = "warning", + header = darkred(" * ") + ) + continue + # ask to confirm + mytxt = "%s '%s' ?" % (_("Do you confirm"),branch,) + rc = self.askQuestion(mytxt) + if rc == "Yes": + break self.commitChanges()