From 2e92fddbf5b86d956af68bb0e7b8b396c2afe440 Mon Sep 17 00:00:00 2001 From: Fabio Erculiani Date: Tue, 3 Dec 2013 17:58:09 +0100 Subject: [PATCH] [entropy.client.package.actions] multifetch: add concurrency support --- .../interfaces/package/actions/fetch.py | 16 +- .../interfaces/package/actions/multifetch.py | 509 ++++++++++-------- 2 files changed, 310 insertions(+), 215 deletions(-) diff --git a/lib/entropy/client/interfaces/package/actions/fetch.py b/lib/entropy/client/interfaces/package/actions/fetch.py index c234bc238..179379afc 100644 --- a/lib/entropy/client/interfaces/package/actions/fetch.py +++ b/lib/entropy/client/interfaces/package/actions/fetch.py @@ -20,7 +20,7 @@ from entropy.const import etpConst, const_debug_write, const_debug_enabled, \ from entropy.client.mirrors import StatusInterface from entropy.fetchers import UrlFetcher from entropy.i18n import _ -from entropy.output import red, darkred, blue, purple, darkgreen +from entropy.output import red, darkred, blue, purple, darkgreen, brown from entropy.security import Repository as RepositorySecurity import entropy.dep @@ -1059,8 +1059,11 @@ class _PackageFetchAction(PackageAction): if cmp_func is None: continue + down_name = os.path.basename(download_path) + self._entropy.output( - "%s: %s" % ( + "[%s] %s: %s" % ( + brown(down_name), blue(_("Checking package signature")), purple(hash_type.upper()), ), @@ -1073,7 +1076,8 @@ class _PackageFetchAction(PackageAction): valid = cmp_func(download_path, hash_val) if valid is None: self._entropy.output( - "%s '%s' %s" % ( + "[%s] %s '%s' %s" % ( + brown(down_name), darkred(_("Package signature verification")), purple(hash_type.upper()), darkred(_("temporarily unavailable")), @@ -1086,7 +1090,8 @@ class _PackageFetchAction(PackageAction): if not valid: self._entropy.output( - "%s: %s %s" % ( + "[%s] %s: %s %s" % ( + brown(down_name), darkred(_("Package signature")), purple(hash_type.upper()), darkred(_("does not match the recorded one")), @@ -1098,7 
+1103,8 @@ class _PackageFetchAction(PackageAction): return 1 self._entropy.output( - "%s %s" % ( + "[%s] %s %s" % ( + brown(down_name), purple(hash_type.upper()), darkgreen(_("matches")), ), diff --git a/lib/entropy/client/interfaces/package/actions/multifetch.py b/lib/entropy/client/interfaces/package/actions/multifetch.py index ca004040b..9c0b2f79d 100644 --- a/lib/entropy/client/interfaces/package/actions/multifetch.py +++ b/lib/entropy/client/interfaces/package/actions/multifetch.py @@ -9,10 +9,12 @@ B{Entropy Package Manager Client Package Interface}. """ +import contextlib import errno import os +import threading -from entropy.const import etpConst, const_setup_perms +from entropy.const import etpConst, const_setup_perms, const_mkstemp from entropy.client.mirrors import StatusInterface from entropy.fetchers import UrlFetcher from entropy.output import blue, darkblue, bold, red, darkred, brown, darkgreen @@ -81,64 +83,28 @@ class _PackageMultiFetchAction(_PackageFetchAction): metadata['matches'] = self._package_matches - temp_fetch_list = [] - temp_checksum_list = [] - temp_already_downloaded_count = 0 - - def _setup_download(download, size, package_id, repository_id, digest, - signatures): - - inst_repo = self._entropy.installed_repository() - installed_package_id = None - - repo = self._entropy.open_repository(repository_id) - key_slot = repo.retrieveKeySlotAggregated(package_id) - if key_slot: - installed_package_id, _inst_rc = inst_repo.atomMatch(key_slot) - - obj = (package_id, repository_id, download, digest, signatures) - temp_checksum_list.append(obj) - - down_path = self.get_standard_fetch_disk_path(download) - try: - down_st = os.lstat(down_path) - st_size = down_st.st_size - except OSError as err: - if err.errno != errno.ENOENT: - raise - st_size = None - - if st_size is not None: - if st_size == size: - return 1 - else: - obj = (package_id, repository_id, download, digest, signatures) - temp_fetch_list.append(obj) - - return 0 + download_list = [] 
for package_id, repository_id in self._package_matches: - if repository_id.endswith(etpConst['packagesext']): + if self._entropy._is_package_repository(repository_id): continue repo = self._entropy.open_repository(repository_id) - atom = repo.retrieveAtom(package_id) - download = repo.retrieveDownloadURL(package_id) digest = repo.retrieveDigest(package_id) sha1, sha256, sha512, gpg = repo.retrieveSignatures(package_id) - size = repo.retrieveSize(package_id) + extra_downloads = repo.retrieveExtraDownload(package_id) + signatures = { 'sha1': sha1, 'sha256': sha256, 'sha512': sha512, 'gpg': gpg, } - temp_already_downloaded_count += _setup_download( - download, size, package_id, repository_id, digest, signatures) - extra_downloads = repo.retrieveExtraDownload(package_id) + obj = (package_id, repository_id, download, digest, signatures) + download_list.append(obj) splitdebug = metadata['splitdebug'] # if splitdebug is enabled, check if it's also enabled @@ -153,7 +119,6 @@ class _PackageMultiFetchAction(_PackageFetchAction): for extra_download in extra_downloads: download = extra_download['download'] - size = extra_download['size'] digest = extra_download['md5'] signatures = { 'sha1': extra_download['sha1'], @@ -161,22 +126,15 @@ class _PackageMultiFetchAction(_PackageFetchAction): 'sha512': extra_download['sha512'], 'gpg': extra_download['gpg'], } - temp_already_downloaded_count += _setup_download( - download, size, package_id, repository_id, - digest, signatures) - metadata['multi_fetch_list'] = temp_fetch_list - metadata['multi_checksum_list'] = temp_checksum_list + obj = (package_id, repository_id, download, digest, signatures) + download_list.append(obj) + + metadata['multi_fetch_list'] = download_list metadata['phases'] = [] if metadata['multi_fetch_list']: - metadata['phases'].append(self._fetch) - - if metadata['multi_checksum_list']: - metadata['phases'].append(self._checksum) - - if temp_already_downloaded_count == len(temp_checksum_list): - 
metadata['phases'].reverse() + metadata['phases'].append(self._fetch_phase) self._meta = metadata @@ -193,47 +151,92 @@ class _PackageMultiFetchAction(_PackageFetchAction): break return exit_st + def _setup_url_directories(self, url_data): + """ + Create the directories needed to download the files in url_data. + """ + for _pkg_id, _repository_id, _url, dest_path, _cksum, _sig in url_data: + dest_dir = os.path.dirname(dest_path) + try: + os.makedirs(dest_dir, 0o775) + const_setup_perms(dest_dir, etpConst['entropygid']) + except OSError as err: + if err.errno != errno.EEXIST: + raise + def _try_edelta_multifetch(self, url_data, resume): """ Attempt to download and use the edelta file. """ # no edelta support enabled if not self._meta.get('edelta_support'): - return [], 0.0, False + return [], 0.0, 0 if not entropy.tools.is_entropy_delta_available(): - return [], 0.0, False + return [], 0.0, 0 + + self._setup_url_directories(url_data) + + edelta_approvals = [] + inst_repo = self._entropy.installed_repository() + with inst_repo.shared(): + + for (pkg_id, repository_id, url, download_path, + cksum, signs) in url_data: + + repo = self._entropy.open_repository(repository_id) + if cksum is None: + # cannot setup edelta without checksum, get from repository + cksum = repo.retrieveDigest(pkg_id) + if cksum is None: + # still nothing + continue + + key_slot = repo.retrieveKeySlotAggregated(pkg_id) + if key_slot is None: + # wtf corrupted entry, skip + continue + + installed_package_id, _inst_rc = inst_repo.atomMatch(key_slot) + if installed_package_id == -1: + # package is not installed + continue + + installed_url = inst_repo.retrieveDownloadURL( + installed_package_id) + installed_checksum = inst_repo.retrieveDigest( + installed_package_id) + installed_download_path = self.get_standard_fetch_disk_path( + installed_url) + + if installed_download_path == download_path: + # collision between what we need locally and what we need + # remotely, definitely edelta fetch is not 
going to work. + # Abort here. + continue + + edelta_approvals.append( + (pkg_id, repository_id, + url, cksum, signs, download_path, installed_url, + installed_checksum, installed_download_path)) + + if not edelta_approvals: + return [], 0.0, 0 url_path_list = [] url_data_map = {} url_data_map_idx = 0 - inst_repo = self._entropy.installed_repository() - for pkg_id, repository_id, url, dest_path, cksum in url_data: - repo = self._entropy.open_repository(repository_id) - if cksum is None: - # cannot setup edelta without checksum, get from repository - cksum = repo.retrieveDigest(pkg_id) - if cksum is None: - # still nothing - continue + for tup in edelta_approvals: - key_slot = repo.retrieveKeySlotAggregated(pkg_id) - if key_slot is None: - # wtf corrupted entry, skip - continue + (pkg_id, repository_id, url, + cksum, signs, download_path, installed_url, + installed_checksum, installed_download_path) = tup - installed_package_id, _inst_rc = inst_repo.atomMatch(key_slot) - if installed_package_id == -1: - # package is not installed - continue - - installed_url = inst_repo.retrieveDownloadURL( - installed_package_id) - installed_checksum = inst_repo.retrieveDigest( - installed_package_id) - installed_download_path = self.get_standard_fetch_disk_path( - installed_url) + # installed_download_path is read in a fault-tolerant mode + # so, there is no need for locking. 
+ edelta_download_path = download_path + edelta_download_path += etpConst['packagesdeltaext'] edelta_url = self._approve_edelta_unlocked( url, cksum, installed_url, installed_checksum, @@ -242,23 +245,55 @@ class _PackageMultiFetchAction(_PackageFetchAction): # no edelta support continue - edelta_save_path = dest_path + etpConst['packagesdeltaext'] - key = (edelta_url, edelta_save_path) + key = (edelta_url, edelta_download_path) + url_path_list.append(key) url_data_map_idx += 1 url_data_map[url_data_map_idx] = ( pkg_id, repository_id, url, - dest_path, cksum, edelta_url, - edelta_save_path, installed_download_path) + download_path, cksum, signs, edelta_url, + edelta_download_path, installed_download_path) if not url_path_list: # no martini, no party! - return [], 0.0, False + return [], 0.0, 0 + + return self._try_edelta_multifetch_internal( + url_path_list, url_data_map, resume) + + def _try_edelta_multifetch_internal(self, url_path_list, + url_data_map, resume): + """ + _try_edelta_multifetch(), assuming that the relevant file locks + are held. + """ + @contextlib.contextmanager + def download_context(path): + lock = None + try: + lock = self.path_lock(path) + with lock.exclusive(): + yield + finally: + if lock is not None: + lock.close() + + def pre_download_hook(path): + # assume that, if path is available, it's been + # downloaded already. checksum verification will + # happen afterwards. 
+ if self._stat_path(path): + # this is not None and not an established + # UrlFetcher return code + return self fetch_abort_function = self._meta.get('fetch_abort_function') - fetch_intf = self._entropy._multiple_url_fetcher(url_path_list, - resume = resume, abort_check_func = fetch_abort_function, - url_fetcher_class = self._entropy._url_fetcher) + fetch_intf = self._entropy._multiple_url_fetcher( + url_path_list, resume = resume, + abort_check_func = fetch_abort_function, + url_fetcher_class = self._entropy._url_fetcher, + download_context_func = download_context, + pre_download_hook = pre_download_hook) try: # make sure that we don't need to abort already # doing the check here avoids timeouts @@ -267,7 +302,7 @@ class _PackageMultiFetchAction(_PackageFetchAction): data = fetch_intf.download() except KeyboardInterrupt: - return [], 0.0, True + return [], 0.0, -100 data_transfer = fetch_intf.get_transfer_rate() fetch_errors = ( @@ -284,31 +319,53 @@ class _PackageMultiFetchAction(_PackageFetchAction): continue (pkg_id, repository_id, url, dest_path, - orig_cksum, edelta_url, edelta_save_path, - installed_fetch_path) = url_data_map[url_data_map_idx] + orig_cksum, _signs, _edelta_url, edelta_download_path, + installed_download_path) = url_data_map[url_data_map_idx] - # now check - tmp_dest_path = dest_path + ".edelta_pkg_tmp" - # yay, we can apply the delta and cook the new package file! 
+ dest_path_dir = os.path.dirname(dest_path) + + lock = None try: - entropy.tools.apply_entropy_delta(installed_fetch_path, - edelta_save_path, tmp_dest_path) - except IOError: - # give up with this edelta - try: - os.remove(tmp_dest_path) - except (OSError, IOError): - pass - continue + lock = self.path_lock(edelta_download_path) + with lock.shared(): - os.rename(tmp_dest_path, dest_path) - valid_idxs.append(url_data_map_idx) + tmp_fd, tmp_path = None, None + try: + tmp_fd, tmp_path = const_mkstemp( + dir=dest_path_dir, suffix=".edelta_pkg_tmp") + + try: + entropy.tools.apply_entropy_delta( + installed_download_path, # best effort read + edelta_download_path, # shared lock + tmp_path) # atomically created path + except IOError: + continue + + os.rename(tmp_path, dest_path) + valid_idxs.append(url_data_map_idx) + + finally: + if tmp_fd is not None: + try: + os.close(tmp_fd) + except OSError: + pass + if tmp_path is not None: + try: + os.remove(tmp_path) + except OSError: + pass + + finally: + if lock is not None: + lock.close() fetched_url_data = [] for url_data_map_idx in valid_idxs: (pkg_id, repository_id, url, dest_path, - orig_cksum, edelta_url, edelta_save_path, - installed_fetch_path) = url_data_map[url_data_map_idx] + orig_cksum, signs, _edelta_url, edelta_download_path, + installed_download_path) = url_data_map[url_data_map_idx] try: valid = entropy.tools.compare_md5(dest_path, orig_cksum) @@ -316,95 +373,138 @@ class _PackageMultiFetchAction(_PackageFetchAction): valid = False if valid: - url_data_item = (pkg_id, repository_id, url, - dest_path, orig_cksum) + url_data_item = ( + pkg_id, repository_id, url, + dest_path, orig_cksum, signs + ) fetched_url_data.append(url_data_item) - return fetched_url_data, data_transfer, False + return fetched_url_data, data_transfer, 0 - def _fetch_files(self, url_data_list, resume = True): + def _download_files(self, url_data, resume = True): """ Effectively fetch the package files. 
""" + self._setup_url_directories(url_data) - def _generate_checksum_map(url_data): - ck_map = {} - ck_map_id = 0 - for _pkg_id, _repository_id, _url, _dest_path, cksum in url_data: - ck_map_id += 1 - if cksum is not None: - ck_map[ck_map_id] = cksum - return ck_map + @contextlib.contextmanager + def download_context(path): + lock = None + try: + lock = self.path_lock(path) + with lock.exclusive(): + yield # hooks running inside here + finally: + if lock is not None: + lock.close() - fetch_abort_function = self._meta.get('fetch_abort_function') - # avoid tainting data pointed by url_data_list - url_data = url_data_list[:] - diff_map = {} + # set of download ids that have been verified and don't need any + # further match_checksum() call. + validated_download_ids_lock = threading.Lock() + validated_download_ids = set() - # setup directories - for pkg_id, repository_id, url, dest_path, _cksum in url_data: - dest_dir = os.path.dirname(dest_path) - if not os.path.isdir(dest_dir): - os.makedirs(dest_dir, 0o775) - const_setup_perms(dest_dir, etpConst['entropygid']) + # Note: the following two hooks are running in separate threads. 
- checksum_map = _generate_checksum_map(url_data) - fetched_url_data, data_transfer, abort = self._try_edelta_multifetch( - url_data, resume) - if abort: - return -100, {}, 0 + def pre_download_hook(path, download_id): + path_data = url_data[download_id - 1] + (_hook_package_id, hook_repository_id, _hook_url, + hook_download_path, hook_cksum, hook_signs) = path_data - for url_data_item in fetched_url_data: - url_data.remove(url_data_item) + if self._stat_path(path): + verify_st = self._match_checksum( + hook_download_path, + hook_repository_id, + hook_cksum, + hook_signs) + if verify_st == 0: + # UrlFetcher returns the md5 checksum on success + with validated_download_ids_lock: + validated_download_ids.add(download_id) + return hook_cksum - # some packages haven't been downloaded using edelta - if url_data: + # request the download + return None - url_path_list = [] - for pkg_id, repository_id, url, dest_path, _cksum in url_data: - url_path_list.append((url, dest_path,)) - self._setup_differential_download( - self._entropy._multiple_url_fetcher, url, resume, dest_path, + def post_download_hook(_path, _status, download_id): + path_data = url_data[download_id - 1] + (_hook_package_id, hook_repository_id, _hook_url, + hook_download_path, hook_cksum, hook_signs) = path_data + + with validated_download_ids_lock: + if hook_download_path in validated_download_ids: + # nothing to check, path already verified + return + + verify_st = self._match_checksum( + hook_download_path, + hook_repository_id, + hook_cksum, + hook_signs) + if verify_st == 0: + with validated_download_ids_lock: + validated_download_ids.add(download_id) + + url_path_list = [] + for pkg_id, repository_id, url, download_path, _cksum, _sig in url_data: + url_path_list.append((url, download_path)) + + lock = None + try: + # hold a lock against the download path, like fetch.py does. 
+ lock = self.path_lock(download_path) + with lock.exclusive(): + + self._setup_differential_download( + self._entropy._multiple_url_fetcher, url, + resume, download_path, repository_id, pkg_id) - # load class - fetch_intf = self._entropy._multiple_url_fetcher( - url_path_list, resume = resume, - abort_check_func = fetch_abort_function, - url_fetcher_class = self._entropy._url_fetcher, - checksum = True) - try: - # make sure that we don't need to abort already - # doing the check here avoids timeouts - if fetch_abort_function != None: - fetch_abort_function() + finally: + if lock is not None: + lock.close() - data = fetch_intf.download() - except KeyboardInterrupt: - return -100, {}, 0 + fetch_abort_function = self._meta.get('fetch_abort_function') + fetch_intf = self._entropy._multiple_url_fetcher( + url_path_list, resume = resume, + abort_check_func = fetch_abort_function, + url_fetcher_class = self._entropy._url_fetcher, + download_context_func = download_context, + pre_download_hook = pre_download_hook, + post_download_hook = post_download_hook) + try: + # make sure that we don't need to abort already + # doing the check here avoids timeouts + if fetch_abort_function != None: + fetch_abort_function() - data_transfer = fetch_intf.get_transfer_rate() - checksum_map = _generate_checksum_map(url_data) - for ck_id in checksum_map: - orig_checksum = checksum_map.get(ck_id) - if orig_checksum != data.get(ck_id): - diff_map[url_path_list[ck_id-1][0]] = orig_checksum + data = fetch_intf.download() + except KeyboardInterrupt: + return -100, {}, 0 - if diff_map: - defval = -1 - for key, val in tuple(diff_map.items()): - if val == UrlFetcher.GENERIC_FETCH_WARN: - diff_map[key] = -2 - elif val == UrlFetcher.TIMEOUT_FETCH_ERROR: - diff_map[key] = -4 - elif val == UrlFetcher.GENERIC_FETCH_ERROR: - diff_map[key] = -3 - elif val == -100: - defval = -100 + failed_map = {} + for download_id, tup in enumerate(url_data, 1): - return defval, diff_map, data_transfer + if download_id 
in validated_download_ids: + # valid, nothing to do + continue - return 0, diff_map, data_transfer + (_pkg_id, repository_id, _url, + _download_path, _ignore_checksum, signatures) = tup + + # use the outcome returned by download(), it + # contains an error code if download failed. + val = data.get(download_id) + failed_map[url_path_list[download_id - 1][0]] = ( + val, signatures) + + exit_st = 0 + # determine if we got a -100, KeyboardInterrupt + for _key, (val, _signs) in tuple(failed_map.items()): + if val == -100: + exit_st = -100 + break + + return exit_st, failed_map, fetch_intf.get_transfer_rate() def _download_packages(self, download_list): """ @@ -613,7 +713,7 @@ class _PackageMultiFetchAction(_PackageFetchAction): while True: fetch_files_list = [] - for pkg_id, repository_id, fname, cksum, _signs in d_list: + for pkg_id, repository_id, fname, cksum, signs in d_list: best_mirror = get_best_mirror(repository_id) # set working mirror, dont care if its None mirror_status.set_working_mirror(best_mirror) @@ -629,13 +729,26 @@ class _PackageMultiFetchAction(_PackageFetchAction): myuri = os.path.join(best_mirror, fname) pkg_path = self.get_standard_fetch_disk_path(fname) fetch_files_list.append( - (pkg_id, repository_id, myuri, pkg_path, cksum,) + (pkg_id, repository_id, myuri, pkg_path, cksum, signs) ) show_download_summary(d_list) - (exit_st, failed_downloads, - data_transfer) = self._fetch_files( - fetch_files_list, resume = do_resume) + + (edelta_fetch_files_list, data_transfer, + exit_st) = self._try_edelta_multifetch( + fetch_files_list, do_resume) + + if exit_st == 0: + # O(nm) but both lists are very small... 
+ updated_fetch_files_list = [ + x for x in fetch_files_list if x not in + edelta_fetch_files_list] + + if updated_fetch_files_list: + (exit_st, failed_downloads, + data_transfer) = self._download_files( + updated_fetch_files_list, + resume = do_resume) if exit_st == 0: show_successful_download( @@ -675,13 +788,13 @@ class _PackageMultiFetchAction(_PackageFetchAction): return 0, [] - def _fetch(self): + def _fetch_phase(self): """ Execute the fetch phase. """ m_fetch_len = len(self._meta['multi_fetch_list']) / 2 xterm_title = "%s: %s %s" % ( - _("Multi Fetching"), + _("Downloading"), m_fetch_len, ngettext("package", "packages", m_fetch_len), ) @@ -691,7 +804,7 @@ class _PackageMultiFetchAction(_PackageFetchAction): txt = "%s: %s %s" % ( blue(_("Downloading")), darkred("%s" % (m_fetch_len,)), - ngettext("archive", "archives", m_fetch_len), + ngettext("package", "packages", m_fetch_len), ) self._entropy.output( txt, @@ -740,27 +853,3 @@ class _PackageMultiFetchAction(_PackageFetchAction): ) return exit_st - - def _checksum(self): - """ - Execute the checksum verification phase. - """ - m_len = len(self._meta['multi_checksum_list']) - xterm_title = "%s: %s %s" % ( - _("Multi Verification"), - m_len, - ngettext("package", "packages", m_len), - ) - self._entropy.set_title(xterm_title) - - exit_st = 0 - ck_list = self._meta['multi_checksum_list'] - - for (pkg_id, repository_id, download, digest, signatures) in ck_list: - download_path = self.get_standard_fetch_disk_path(download) - exit_st = self._match_checksum( - download_path, repository_id, digest, signatures) - if exit_st != 0: - break - - return exit_st