[entropy.client.package.actions] multifetch: add concurrency support

This commit is contained in:
Fabio Erculiani
2013-12-03 17:58:09 +01:00
parent 1ac0e38115
commit 2e92fddbf5
2 changed files with 310 additions and 215 deletions
@@ -20,7 +20,7 @@ from entropy.const import etpConst, const_debug_write, const_debug_enabled, \
from entropy.client.mirrors import StatusInterface
from entropy.fetchers import UrlFetcher
from entropy.i18n import _
from entropy.output import red, darkred, blue, purple, darkgreen
from entropy.output import red, darkred, blue, purple, darkgreen, brown
from entropy.security import Repository as RepositorySecurity
import entropy.dep
@@ -1059,8 +1059,11 @@ class _PackageFetchAction(PackageAction):
if cmp_func is None:
continue
down_name = os.path.basename(download_path)
self._entropy.output(
"%s: %s" % (
"[%s] %s: %s" % (
brown(down_name),
blue(_("Checking package signature")),
purple(hash_type.upper()),
),
@@ -1073,7 +1076,8 @@ class _PackageFetchAction(PackageAction):
valid = cmp_func(download_path, hash_val)
if valid is None:
self._entropy.output(
"%s '%s' %s" % (
"[%s] %s '%s' %s" % (
brown(down_name),
darkred(_("Package signature verification")),
purple(hash_type.upper()),
darkred(_("temporarily unavailable")),
@@ -1086,7 +1090,8 @@ class _PackageFetchAction(PackageAction):
if not valid:
self._entropy.output(
"%s: %s %s" % (
"[%s] %s: %s %s" % (
brown(down_name),
darkred(_("Package signature")),
purple(hash_type.upper()),
darkred(_("does not match the recorded one")),
@@ -1098,7 +1103,8 @@ class _PackageFetchAction(PackageAction):
return 1
self._entropy.output(
"%s %s" % (
"[%s] %s %s" % (
brown(down_name),
purple(hash_type.upper()),
darkgreen(_("matches")),
),
@@ -9,10 +9,12 @@
B{Entropy Package Manager Client Package Interface}.
"""
import contextlib
import errno
import os
import threading
from entropy.const import etpConst, const_setup_perms
from entropy.const import etpConst, const_setup_perms, const_mkstemp
from entropy.client.mirrors import StatusInterface
from entropy.fetchers import UrlFetcher
from entropy.output import blue, darkblue, bold, red, darkred, brown, darkgreen
@@ -81,64 +83,28 @@ class _PackageMultiFetchAction(_PackageFetchAction):
metadata['matches'] = self._package_matches
temp_fetch_list = []
temp_checksum_list = []
temp_already_downloaded_count = 0
def _setup_download(download, size, package_id, repository_id, digest,
signatures):
inst_repo = self._entropy.installed_repository()
installed_package_id = None
repo = self._entropy.open_repository(repository_id)
key_slot = repo.retrieveKeySlotAggregated(package_id)
if key_slot:
installed_package_id, _inst_rc = inst_repo.atomMatch(key_slot)
obj = (package_id, repository_id, download, digest, signatures)
temp_checksum_list.append(obj)
down_path = self.get_standard_fetch_disk_path(download)
try:
down_st = os.lstat(down_path)
st_size = down_st.st_size
except OSError as err:
if err.errno != errno.ENOENT:
raise
st_size = None
if st_size is not None:
if st_size == size:
return 1
else:
obj = (package_id, repository_id, download, digest, signatures)
temp_fetch_list.append(obj)
return 0
download_list = []
for package_id, repository_id in self._package_matches:
if repository_id.endswith(etpConst['packagesext']):
if self._entropy._is_package_repository(repository_id):
continue
repo = self._entropy.open_repository(repository_id)
atom = repo.retrieveAtom(package_id)
download = repo.retrieveDownloadURL(package_id)
digest = repo.retrieveDigest(package_id)
sha1, sha256, sha512, gpg = repo.retrieveSignatures(package_id)
size = repo.retrieveSize(package_id)
extra_downloads = repo.retrieveExtraDownload(package_id)
signatures = {
'sha1': sha1,
'sha256': sha256,
'sha512': sha512,
'gpg': gpg,
}
temp_already_downloaded_count += _setup_download(
download, size, package_id, repository_id, digest, signatures)
extra_downloads = repo.retrieveExtraDownload(package_id)
obj = (package_id, repository_id, download, digest, signatures)
download_list.append(obj)
splitdebug = metadata['splitdebug']
# if splitdebug is enabled, check if it's also enabled
@@ -153,7 +119,6 @@ class _PackageMultiFetchAction(_PackageFetchAction):
for extra_download in extra_downloads:
download = extra_download['download']
size = extra_download['size']
digest = extra_download['md5']
signatures = {
'sha1': extra_download['sha1'],
@@ -161,22 +126,15 @@ class _PackageMultiFetchAction(_PackageFetchAction):
'sha512': extra_download['sha512'],
'gpg': extra_download['gpg'],
}
temp_already_downloaded_count += _setup_download(
download, size, package_id, repository_id,
digest, signatures)
metadata['multi_fetch_list'] = temp_fetch_list
metadata['multi_checksum_list'] = temp_checksum_list
obj = (package_id, repository_id, download, digest, signatures)
download_list.append(obj)
metadata['multi_fetch_list'] = download_list
metadata['phases'] = []
if metadata['multi_fetch_list']:
metadata['phases'].append(self._fetch)
if metadata['multi_checksum_list']:
metadata['phases'].append(self._checksum)
if temp_already_downloaded_count == len(temp_checksum_list):
metadata['phases'].reverse()
metadata['phases'].append(self._fetch_phase)
self._meta = metadata
@@ -193,47 +151,92 @@ class _PackageMultiFetchAction(_PackageFetchAction):
break
return exit_st
def _setup_url_directories(self, url_data):
"""
Create the directories needed to download the files in url_data.
"""
for _pkg_id, _repository_id, _url, dest_path, _cksum, _sig in url_data:
dest_dir = os.path.dirname(dest_path)
try:
os.makedirs(dest_dir, 0o775)
const_setup_perms(dest_dir, etpConst['entropygid'])
except OSError as err:
if err.errno != errno.EEXIST:
raise
def _try_edelta_multifetch(self, url_data, resume):
"""
Attempt to download and use the edelta file.
"""
# no edelta support enabled
if not self._meta.get('edelta_support'):
return [], 0.0, False
return [], 0.0, 0
if not entropy.tools.is_entropy_delta_available():
return [], 0.0, False
return [], 0.0, 0
self._setup_url_directories(url_data)
edelta_approvals = []
inst_repo = self._entropy.installed_repository()
with inst_repo.shared():
for (pkg_id, repository_id, url, download_path,
cksum, signs) in url_data:
repo = self._entropy.open_repository(repository_id)
if cksum is None:
# cannot setup edelta without checksum, get from repository
cksum = repo.retrieveDigest(pkg_id)
if cksum is None:
# still nothing
continue
key_slot = repo.retrieveKeySlotAggregated(pkg_id)
if key_slot is None:
# wtf corrupted entry, skip
continue
installed_package_id, _inst_rc = inst_repo.atomMatch(key_slot)
if installed_package_id == -1:
# package is not installed
continue
installed_url = inst_repo.retrieveDownloadURL(
installed_package_id)
installed_checksum = inst_repo.retrieveDigest(
installed_package_id)
installed_download_path = self.get_standard_fetch_disk_path(
installed_url)
if installed_download_path == download_path:
# collision between what we need locally and what we need
# remotely, definitely edelta fetch is not going to work.
# Abort here.
continue
edelta_approvals.append(
(pkg_id, repository_id,
url, cksum, signs, download_path, installed_url,
installed_checksum, installed_download_path))
if not edelta_approvals:
return [], 0.0, 0
url_path_list = []
url_data_map = {}
url_data_map_idx = 0
inst_repo = self._entropy.installed_repository()
for pkg_id, repository_id, url, dest_path, cksum in url_data:
repo = self._entropy.open_repository(repository_id)
if cksum is None:
# cannot setup edelta without checksum, get from repository
cksum = repo.retrieveDigest(pkg_id)
if cksum is None:
# still nothing
continue
for tup in edelta_approvals:
key_slot = repo.retrieveKeySlotAggregated(pkg_id)
if key_slot is None:
# wtf corrupted entry, skip
continue
(pkg_id, repository_id, url,
cksum, signs, download_path, installed_url,
installed_checksum, installed_download_path) = tup
installed_package_id, _inst_rc = inst_repo.atomMatch(key_slot)
if installed_package_id == -1:
# package is not installed
continue
installed_url = inst_repo.retrieveDownloadURL(
installed_package_id)
installed_checksum = inst_repo.retrieveDigest(
installed_package_id)
installed_download_path = self.get_standard_fetch_disk_path(
installed_url)
# installed_download_path is read in a fault-tolerant mode
# so, there is no need for locking.
edelta_download_path = download_path
edelta_download_path += etpConst['packagesdeltaext']
edelta_url = self._approve_edelta_unlocked(
url, cksum, installed_url, installed_checksum,
@@ -242,23 +245,55 @@ class _PackageMultiFetchAction(_PackageFetchAction):
# no edelta support
continue
edelta_save_path = dest_path + etpConst['packagesdeltaext']
key = (edelta_url, edelta_save_path)
key = (edelta_url, edelta_download_path)
url_path_list.append(key)
url_data_map_idx += 1
url_data_map[url_data_map_idx] = (
pkg_id, repository_id, url,
dest_path, cksum, edelta_url,
edelta_save_path, installed_download_path)
download_path, cksum, signs, edelta_url,
edelta_download_path, installed_download_path)
if not url_path_list:
# no martini, no party!
return [], 0.0, False
return [], 0.0, 0
return self._try_edelta_multifetch_internal(
url_path_list, url_data_map, resume)
def _try_edelta_multifetch_internal(self, url_path_list,
url_data_map, resume):
"""
_try_edelta_multifetch(), assuming that the relevant file locks
are held.
"""
@contextlib.contextmanager
def download_context(path):
    """
    Context manager wrapping a single download: take the exclusive
    lock returned by self.path_lock(path) for the duration of the
    with-body and always close the lock object afterwards.
    """
    with contextlib.closing(self.path_lock(path)) as path_lock:
        with path_lock.exclusive():
            yield
def pre_download_hook(path):
# Hook handed to the multiple URL fetcher as pre_download_hook.
# Returns self when self._stat_path(path) is truthy; otherwise it
# falls through and implicitly returns None.
# NOTE(review): the pre_download_hook defined in _download_files takes
# (path, download_id); confirm which signature the fetcher really calls.
# assume that, if path is available, it's been
# downloaded already. checksum verification will
# happen afterwards.
if self._stat_path(path):
# this is not None and not an established
# UrlFetcher return code
return self
fetch_abort_function = self._meta.get('fetch_abort_function')
fetch_intf = self._entropy._multiple_url_fetcher(url_path_list,
resume = resume, abort_check_func = fetch_abort_function,
url_fetcher_class = self._entropy._url_fetcher)
fetch_intf = self._entropy._multiple_url_fetcher(
url_path_list, resume = resume,
abort_check_func = fetch_abort_function,
url_fetcher_class = self._entropy._url_fetcher,
download_context_func = download_context,
pre_download_hook = pre_download_hook)
try:
# make sure that we don't need to abort already
# doing the check here avoids timeouts
@@ -267,7 +302,7 @@ class _PackageMultiFetchAction(_PackageFetchAction):
data = fetch_intf.download()
except KeyboardInterrupt:
return [], 0.0, True
return [], 0.0, -100
data_transfer = fetch_intf.get_transfer_rate()
fetch_errors = (
@@ -284,31 +319,53 @@ class _PackageMultiFetchAction(_PackageFetchAction):
continue
(pkg_id, repository_id, url, dest_path,
orig_cksum, edelta_url, edelta_save_path,
installed_fetch_path) = url_data_map[url_data_map_idx]
orig_cksum, _signs, _edelta_url, edelta_download_path,
installed_download_path) = url_data_map[url_data_map_idx]
# now check
tmp_dest_path = dest_path + ".edelta_pkg_tmp"
# yay, we can apply the delta and cook the new package file!
dest_path_dir = os.path.dirname(dest_path)
lock = None
try:
entropy.tools.apply_entropy_delta(installed_fetch_path,
edelta_save_path, tmp_dest_path)
except IOError:
# give up with this edelta
try:
os.remove(tmp_dest_path)
except (OSError, IOError):
pass
continue
lock = self.path_lock(edelta_download_path)
with lock.shared():
os.rename(tmp_dest_path, dest_path)
valid_idxs.append(url_data_map_idx)
tmp_fd, tmp_path = None, None
try:
tmp_fd, tmp_path = const_mkstemp(
dir=dest_path_dir, suffix=".edelta_pkg_tmp")
try:
entropy.tools.apply_entropy_delta(
installed_download_path, # best effort read
edelta_download_path, # shared lock
tmp_path) # atomically created path
except IOError:
continue
os.rename(tmp_path, dest_path)
valid_idxs.append(url_data_map_idx)
finally:
if tmp_fd is not None:
try:
os.close(tmp_fd)
except OSError:
pass
if tmp_path is not None:
try:
os.remove(tmp_path)
except OSError:
pass
finally:
if lock is not None:
lock.close()
fetched_url_data = []
for url_data_map_idx in valid_idxs:
(pkg_id, repository_id, url, dest_path,
orig_cksum, edelta_url, edelta_save_path,
installed_fetch_path) = url_data_map[url_data_map_idx]
orig_cksum, signs, _edelta_url, edelta_download_path,
installed_download_path) = url_data_map[url_data_map_idx]
try:
valid = entropy.tools.compare_md5(dest_path, orig_cksum)
@@ -316,95 +373,138 @@ class _PackageMultiFetchAction(_PackageFetchAction):
valid = False
if valid:
url_data_item = (pkg_id, repository_id, url,
dest_path, orig_cksum)
url_data_item = (
pkg_id, repository_id, url,
dest_path, orig_cksum, signs
)
fetched_url_data.append(url_data_item)
return fetched_url_data, data_transfer, False
return fetched_url_data, data_transfer, 0
def _fetch_files(self, url_data_list, resume = True):
def _download_files(self, url_data, resume = True):
"""
Effectively fetch the package files.
"""
self._setup_url_directories(url_data)
def _generate_checksum_map(url_data):
ck_map = {}
ck_map_id = 0
for _pkg_id, _repository_id, _url, _dest_path, cksum in url_data:
ck_map_id += 1
if cksum is not None:
ck_map[ck_map_id] = cksum
return ck_map
@contextlib.contextmanager
def download_context(path):
    """
    Hold the exclusive lock obtained from self.path_lock(path) while
    the with-body runs, then close the lock object in every case.
    """
    with contextlib.closing(self.path_lock(path)) as path_lock:
        with path_lock.exclusive():
            yield  # hooks running inside here
fetch_abort_function = self._meta.get('fetch_abort_function')
# avoid tainting data pointed by url_data_list
url_data = url_data_list[:]
diff_map = {}
# set of download ids that have been verified and don't need any
# further match_checksum() call.
validated_download_ids_lock = threading.Lock()
validated_download_ids = set()
# setup directories
for pkg_id, repository_id, url, dest_path, _cksum in url_data:
dest_dir = os.path.dirname(dest_path)
if not os.path.isdir(dest_dir):
os.makedirs(dest_dir, 0o775)
const_setup_perms(dest_dir, etpConst['entropygid'])
# Note: the following two hooks are running in separate threads.
checksum_map = _generate_checksum_map(url_data)
fetched_url_data, data_transfer, abort = self._try_edelta_multifetch(
url_data, resume)
if abort:
return -100, {}, 0
def pre_download_hook(path, download_id):
path_data = url_data[download_id - 1]
(_hook_package_id, hook_repository_id, _hook_url,
hook_download_path, hook_cksum, hook_signs) = path_data
for url_data_item in fetched_url_data:
url_data.remove(url_data_item)
if self._stat_path(path):
verify_st = self._match_checksum(
hook_download_path,
hook_repository_id,
hook_cksum,
hook_signs)
if verify_st == 0:
# UrlFetcher returns the md5 checksum on success
with validated_download_ids_lock:
validated_download_ids.add(download_id)
return hook_cksum
# some packages haven't been downloaded using edelta
if url_data:
# request the download
return None
url_path_list = []
for pkg_id, repository_id, url, dest_path, _cksum in url_data:
url_path_list.append((url, dest_path,))
self._setup_differential_download(
self._entropy._multiple_url_fetcher, url, resume, dest_path,
def post_download_hook(_path, _status, download_id):
    """
    Verify a file once its download has completed.

    download_id is the 1-based index of the item inside url_data.
    If the id is already recorded in validated_download_ids
    (pre_download_hook records it when an existing file verifies
    fine), nothing is done. Otherwise the file is checked through
    self._match_checksum() and, on success (return value 0), the id
    is recorded as validated.

    The hook returns None in every case.
    """
    (_hook_package_id, hook_repository_id, _hook_url,
     hook_download_path, hook_cksum, hook_signs) = url_data[download_id - 1]

    with validated_download_ids_lock:
        # The set holds download ids, not paths: testing the path here
        # could never match and forced a second, redundant verification.
        if download_id in validated_download_ids:
            # nothing to check, download already verified
            return

    # the checksum is computed outside of the lock, it may take a while
    verify_st = self._match_checksum(
        hook_download_path,
        hook_repository_id,
        hook_cksum,
        hook_signs)
    if verify_st == 0:
        with validated_download_ids_lock:
            validated_download_ids.add(download_id)
url_path_list = []
for pkg_id, repository_id, url, download_path, _cksum, _sig in url_data:
url_path_list.append((url, download_path))
lock = None
try:
# hold a lock against the download path, like fetch.py does.
lock = self.path_lock(download_path)
with lock.exclusive():
self._setup_differential_download(
self._entropy._multiple_url_fetcher, url,
resume, download_path,
repository_id, pkg_id)
# load class
fetch_intf = self._entropy._multiple_url_fetcher(
url_path_list, resume = resume,
abort_check_func = fetch_abort_function,
url_fetcher_class = self._entropy._url_fetcher,
checksum = True)
try:
# make sure that we don't need to abort already
# doing the check here avoids timeouts
if fetch_abort_function != None:
fetch_abort_function()
finally:
if lock is not None:
lock.close()
data = fetch_intf.download()
except KeyboardInterrupt:
return -100, {}, 0
fetch_abort_function = self._meta.get('fetch_abort_function')
fetch_intf = self._entropy._multiple_url_fetcher(
url_path_list, resume = resume,
abort_check_func = fetch_abort_function,
url_fetcher_class = self._entropy._url_fetcher,
download_context_func = download_context,
pre_download_hook = pre_download_hook,
post_download_hook = post_download_hook)
try:
# make sure that we don't need to abort already
# doing the check here avoids timeouts
if fetch_abort_function != None:
fetch_abort_function()
data_transfer = fetch_intf.get_transfer_rate()
checksum_map = _generate_checksum_map(url_data)
for ck_id in checksum_map:
orig_checksum = checksum_map.get(ck_id)
if orig_checksum != data.get(ck_id):
diff_map[url_path_list[ck_id-1][0]] = orig_checksum
data = fetch_intf.download()
except KeyboardInterrupt:
return -100, {}, 0
if diff_map:
defval = -1
for key, val in tuple(diff_map.items()):
if val == UrlFetcher.GENERIC_FETCH_WARN:
diff_map[key] = -2
elif val == UrlFetcher.TIMEOUT_FETCH_ERROR:
diff_map[key] = -4
elif val == UrlFetcher.GENERIC_FETCH_ERROR:
diff_map[key] = -3
elif val == -100:
defval = -100
failed_map = {}
for download_id, tup in enumerate(url_data, 1):
return defval, diff_map, data_transfer
if download_id in validated_download_ids:
# valid, nothing to do
continue
return 0, diff_map, data_transfer
(_pkg_id, repository_id, _url,
_download_path, _ignore_checksum, signatures) = tup
# use the outcome returned by download(), it
# contains an error code if download failed.
val = data.get(download_id)
failed_map[url_path_list[download_id - 1][0]] = (
val, signatures)
exit_st = 0
# determine if we got a -100, KeyboardInterrupt
for _key, (val, _signs) in tuple(failed_map.items()):
if val == -100:
exit_st = -100
break
return exit_st, failed_map, fetch_intf.get_transfer_rate()
def _download_packages(self, download_list):
"""
@@ -613,7 +713,7 @@ class _PackageMultiFetchAction(_PackageFetchAction):
while True:
fetch_files_list = []
for pkg_id, repository_id, fname, cksum, _signs in d_list:
for pkg_id, repository_id, fname, cksum, signs in d_list:
best_mirror = get_best_mirror(repository_id)
# set working mirror, dont care if its None
mirror_status.set_working_mirror(best_mirror)
@@ -629,13 +729,26 @@ class _PackageMultiFetchAction(_PackageFetchAction):
myuri = os.path.join(best_mirror, fname)
pkg_path = self.get_standard_fetch_disk_path(fname)
fetch_files_list.append(
(pkg_id, repository_id, myuri, pkg_path, cksum,)
(pkg_id, repository_id, myuri, pkg_path, cksum, signs)
)
show_download_summary(d_list)
(exit_st, failed_downloads,
data_transfer) = self._fetch_files(
fetch_files_list, resume = do_resume)
(edelta_fetch_files_list, data_transfer,
exit_st) = self._try_edelta_multifetch(
fetch_files_list, do_resume)
if exit_st == 0:
# O(nm) but both lists are very small...
updated_fetch_files_list = [
x for x in fetch_files_list if x not in
edelta_fetch_files_list]
if updated_fetch_files_list:
(exit_st, failed_downloads,
data_transfer) = self._download_files(
updated_fetch_files_list,
resume = do_resume)
if exit_st == 0:
show_successful_download(
@@ -675,13 +788,13 @@ class _PackageMultiFetchAction(_PackageFetchAction):
return 0, []
def _fetch(self):
def _fetch_phase(self):
"""
Execute the fetch phase.
"""
m_fetch_len = len(self._meta['multi_fetch_list']) / 2
xterm_title = "%s: %s %s" % (
_("Multi Fetching"),
_("Downloading"),
m_fetch_len,
ngettext("package", "packages", m_fetch_len),
)
@@ -691,7 +804,7 @@ class _PackageMultiFetchAction(_PackageFetchAction):
txt = "%s: %s %s" % (
blue(_("Downloading")),
darkred("%s" % (m_fetch_len,)),
ngettext("archive", "archives", m_fetch_len),
ngettext("package", "packages", m_fetch_len),
)
self._entropy.output(
txt,
@@ -740,27 +853,3 @@ class _PackageMultiFetchAction(_PackageFetchAction):
)
return exit_st
def _checksum(self):
    """
    Run the checksum verification phase.

    Sets the terminal title to the number of packages being verified,
    then checks every item of self._meta['multi_checksum_list'] with
    self._match_checksum(), stopping at the first non-zero status.

    Returns the status of the last verification executed, or 0 when
    the list is empty.
    """
    checksum_items = self._meta['multi_checksum_list']
    item_count = len(checksum_items)

    self._entropy.set_title(
        "%s: %s %s" % (
            _("Multi Verification"),
            item_count,
            ngettext("package", "packages", item_count),
        )
    )

    status = 0
    for _pkg_id, repository_id, download, digest, signatures in checksum_items:
        status = self._match_checksum(
            self.get_standard_fetch_disk_path(download),
            repository_id, digest, signatures)
        if status != 0:
            return status
    return status