Package entropy :: Package client :: Package interfaces :: Module fetch

Source Code for Module entropy.client.interfaces.fetch

  1  # -*- coding: utf-8 -*- 
  2  """ 
  3   
  4      @author: Fabio Erculiani <lxnay@sabayonlinux.org> 
  5      @contact: lxnay@sabayonlinux.org 
  6      @copyright: Fabio Erculiani 
  7      @license: GPL-2 
  8   
  9      B{Entropy Package Manager Client Packages retrieval Interface}. 
 10   
 11  """ 
 12  from __future__ import with_statement 
 13  import os 
 14  from entropy.i18n import _ 
 15  from entropy.const import * 
 16  from entropy.exceptions import * 
 17  from entropy.output import purple, bold, red, blue, darkgreen, darkred, brown, darkblue 
 18   
19 -class FetchersMixin:
20
21 - def check_needed_package_download(self, filepath, checksum = None):
22 # is the file available 23 if os.path.isfile(etpConst['entropyworkdir']+"/"+filepath): 24 if checksum is None: 25 return 0 26 else: 27 # check digest 28 md5res = self.entropyTools.compare_md5(etpConst['entropyworkdir']+"/"+filepath,checksum) 29 if (md5res): 30 return 0 31 else: 32 return -2 33 else: 34 return -1
35
36 - def fetch_files(self, url_data_list, checksum = True, resume = True, fetch_file_abort_function = None):
37 """ 38 Fetch multiple files simultaneously on URLs. 39 40 @param url_data_list list 41 [(url,dest_path [or None],checksum ['ab86fff46f6ec0f4b1e0a2a4a82bf323' or None],branch,),..] 42 @param digest bool, digest check (checksum) 43 @param resume bool enable resume support 44 @param fetch_file_abort_function callable method that could raise exceptions 45 @return general_status_code, {'url': (status_code,checksum,resumed,)}, data_transfer 46 """ 47 pkgs_bindir = etpConst['packagesbindir'] 48 url_path_list = [] 49 checksum_map = {} 50 count = 0 51 for url, dest_path, cksum, branch in url_data_list: 52 count += 1 53 filename = os.path.basename(url) 54 if dest_path == None: 55 dest_path = os.path.join(pkgs_bindir,branch,filename,) 56 57 dest_dir = os.path.dirname(dest_path) 58 if not os.path.isdir(dest_dir): 59 os.makedirs(dest_dir,0755) 60 61 url_path_list.append((url,dest_path,)) 62 if cksum != None: checksum_map[count] = cksum 63 64 # load class 65 fetchConn = self.MultipleUrlFetcher(url_path_list, resume = resume, 66 abort_check_func = fetch_file_abort_function, OutputInterface = self, 67 UrlFetcherClass = self.urlFetcher, checksum = checksum) 68 try: 69 data = fetchConn.download() 70 except KeyboardInterrupt: 71 return -100, {}, 0 72 73 diff_map = {} 74 if checksum_map and checksum: # verify checksums 75 diff_map = dict((url_path_list[x-1][0],checksum_map.get(x)) for x in checksum_map \ 76 if checksum_map.get(x) != data.get(x)) 77 78 data_transfer = fetchConn.get_data_transfer() 79 if diff_map: 80 defval = -1 81 for key, val in diff_map.items(): 82 if val == "-1": # general error 83 diff_map[key] = -1 84 elif val == "-2": 85 diff_map[key] = -2 86 elif val == "-4": # timeout 87 diff_map[key] = -4 88 elif val == "-3": # not found 89 diff_map[key] = -3 90 elif val == -100: 91 defval = -100 92 return defval, diff_map, data_transfer 93 94 return 0, diff_map, data_transfer
95
96 - def fetch_files_on_mirrors(self, download_list, checksum = False, fetch_abort_function = None):
97 """ 98 @param download_map list [(repository,branch,filename,checksum (digest),signatures,),..] 99 @param checksum bool verify checksum? 100 @param fetch_abort_function callable method that could raise exceptions 101 """ 102 repo_uris = dict(((x[0],self.SystemSettings['repositories']['available'][x[0]]['packages'][::-1],) for x in download_list)) 103 remaining = repo_uris.copy() 104 my_download_list = download_list[:] 105 106 def get_best_mirror(repository): 107 try: 108 return remaining[repository][0] 109 except IndexError: 110 return None
111 112 def update_download_list(down_list, failed_down): 113 newlist = [] 114 for repo, branch, fname, cksum, signatures in down_list: 115 myuri = get_best_mirror(repo) 116 myuri = os.path.join(myuri,fname) 117 if myuri not in failed_down: 118 continue 119 newlist.append((repo,branch,fname,cksum,signatures,)) 120 return newlist
121 122 # return True: for failing, return False: for fine 123 def mirror_fail_check(repository, best_mirror): 124 # check if uri is sane 125 if not self.MirrorStatus.get_failing_mirror_status(best_mirror) >= 30: 126 return False 127 # set to 30 for convenience 128 self.MirrorStatus.set_failing_mirror_status(best_mirror, 30) 129 mirrorcount = repo_uris[repo].index(best_mirror)+1 130 mytxt = "( mirror #%s ) " % (mirrorcount,) 131 mytxt += blue(" %s: ") % (_("Mirror"),) 132 mytxt += red(self.entropyTools.spliturl(best_mirror)[1]) 133 mytxt += " - %s." % (_("maximum failure threshold reached"),) 134 self.updateProgress( 135 mytxt, 136 importance = 1, 137 type = "warning", 138 header = red(" ## ") 139 ) 140 141 if self.MirrorStatus.get_failing_mirror_status(best_mirror) == 30: 142 self.MirrorStatus.add_failing_mirror(best_mirror,45) 143 elif self.MirrorStatus.get_failing_mirror_status(best_mirror) > 31: 144 self.MirrorStatus.add_failing_mirror(best_mirror,-4) 145 else: 146 self.MirrorStatus.set_failing_mirror_status(best_mirror, 0) 147 148 remaining[repository].discard(best_mirror) 149 return True 150 151 def show_download_summary(down_list): 152 # fetch_files_list.append((myuri,None,cksum,branch,)) 153 for repo, branch, fname, cksum, signatures in down_list: 154 best_mirror = get_best_mirror(repo) 155 mirrorcount = repo_uris[repo].index(best_mirror)+1 156 mytxt = "( mirror #%s ) " % (mirrorcount,) 157 basef = os.path.basename(fname) 158 mytxt += "[%s] %s " % (brown(basef),blue("@"),) 159 mytxt += red(self.entropyTools.spliturl(best_mirror)[1]) 160 # now fetch the new one 161 self.updateProgress( 162 mytxt, 163 importance = 1, 164 type = "info", 165 header = red(" ## ") 166 ) 167 168 def show_successful_download(down_list, data_transfer): 169 for repo, branch, fname, cksum, signatures in down_list: 170 best_mirror = get_best_mirror(repo) 171 mirrorcount = repo_uris[repo].index(best_mirror)+1 172 mytxt = "( mirror #%s ) " % (mirrorcount,) 173 basef = os.path.basename(fname) 174 mytxt += "[%s] %s %s " % (brown(basef),darkred(_("success")),blue("@"),) 175 mytxt += red(self.entropyTools.spliturl(best_mirror)[1]) 176 self.updateProgress( 177 mytxt, 178 importance = 1, 179 type = "info", 180 header = red(" ## ") 181 ) 182 mytxt = " %s: %s%s%s" % ( 183 blue(_("Aggregated transfer rate")), 184 bold(self.entropyTools.bytes_into_human(data_transfer)), 185 darkred("/"), 186 darkblue(_("second")), 187 ) 188 self.updateProgress( 189 mytxt, 190 importance = 1, 191 type = "info", 192 header = red(" ## ") 193 ) 194 195 def show_download_error(down_list, rc): 196 for repo, branch, fname, cksum, signatures in down_list: 197 best_mirror = get_best_mirror(repo) 198 mirrorcount = repo_uris[repo].index(best_mirror)+1 199 mytxt = "( mirror #%s ) " % (mirrorcount,) 200 mytxt += blue("%s: %s") % ( 201 _("Error downloading from"), 202 red(self.entropyTools.spliturl(best_mirror)[1]), 203 ) 204 if rc == -1: 205 mytxt += " - %s." % (_("data not available on this mirror"),) 206 elif rc == -2: 207 self.MirrorStatus.add_failing_mirror(best_mirror,1) 208 mytxt += " - %s." % (_("wrong checksum"),) 209 elif rc == -3: 210 mytxt += " - %s." % (_("not found"),) 211 elif rc == -4: # timeout! 212 mytxt += " - %s." % (_("timeout error"),) 213 elif rc == -100: 214 mytxt += " - %s." % (_("discarded download"),) 215 else: 216 self.MirrorStatus.add_failing_mirror(best_mirror, 5) 217 mytxt += " - %s." % (_("unknown reason"),) 218 self.updateProgress( 219 mytxt, 220 importance = 1, 221 type = "warning", 222 header = red(" ## ") 223 ) 224 225 def remove_failing_mirrors(repos): 226 for repo in repos: 227 best_mirror = get_best_mirror(repo) 228 if remaining[repo]: 229 remaining[repo].pop(0) 230 231 def check_remaining_mirror_failure(repos): 232 return [x for x in repos if not remaining.get(x)] 233 234 while 1: 235 236 do_resume = True 237 timeout_try_count = 50 238 while 1: 239 240 fetch_files_list = [] 241 for repo, branch, fname, cksum, signatures in my_download_list: 242 best_mirror = get_best_mirror(repo) 243 # set working mirror, dont care if its None 244 self.MirrorStatus.set_working_mirror(best_mirror) 245 if best_mirror != None: 246 mirror_fail_check(repo, best_mirror) 247 best_mirror = get_best_mirror(repo) 248 if best_mirror == None: 249 # at least one package failed to download 250 # properly, give up with everything 251 return 3, my_download_list 252 myuri = os.path.join(best_mirror,fname) 253 fetch_files_list.append((myuri,None,cksum,branch,)) 254 255 try: 256 257 show_download_summary(my_download_list) 258 rc, failed_downloads, data_transfer = self.fetch_files( 259 fetch_files_list, checksum = checksum, 260 fetch_file_abort_function = fetch_abort_function, 261 resume = do_resume 262 ) 263 if rc == 0: 264 show_successful_download(my_download_list, data_transfer) 265 return 0, [] 266 267 # update my_download_list 268 my_download_list = update_download_list(my_download_list,failed_downloads) 269 if rc not in (-3,-4,-100,) and failed_downloads and do_resume: 270 # disable resume 271 do_resume = False 272 continue 273 else: 274 show_download_error(my_download_list, rc) 275 if rc == -4: # timeout 276 timeout_try_count -= 1 277 if timeout_try_count > 0: 278 continue 279 elif rc == -100: # user discarded fetch 280 return 1, [] 281 myrepos = set([x[0] for x in my_download_list]) 282 remove_failing_mirrors(myrepos) 283 # make sure we don't have nasty issues 284 remaining_failure = check_remaining_mirror_failure(myrepos) 285 if remaining_failure: 286 return 3, my_download_list 287 break 288 except KeyboardInterrupt: 289 return 1, [] 290 return 0, [] 291 292
293 - def fetch_file(self, url, branch, digest = None, resume = True, fetch_file_abort_function = None, filepath = None):
294 295 def do_stfu_rm(xpath): 296 try: 297 os.remove(xpath) 298 except OSError: 299 pass
300 301 filename = os.path.basename(url) 302 if not filepath: 303 filepath = os.path.join(etpConst['packagesbindir'],branch,filename) 304 filepath_dir = os.path.dirname(filepath) 305 if not os.path.isdir(filepath_dir): 306 os.makedirs(filepath_dir,0755) 307 308 existed_before = False 309 if os.path.isfile(filepath) and os.path.exists(filepath): 310 existed_before = True 311 312 # load class 313 fetchConn = self.urlFetcher(url, filepath, resume = resume, 314 abort_check_func = fetch_file_abort_function, OutputInterface = self) 315 fetchConn.progress = self.progress 316 317 # start to download 318 data_transfer = 0 319 resumed = False 320 try: 321 fetchChecksum = fetchConn.download() 322 data_transfer = fetchConn.get_transfer_rate() 323 resumed = fetchConn.is_resumed() 324 except KeyboardInterrupt: 325 return -100, data_transfer, resumed 326 except NameError: 327 raise 328 except: 329 if etpUi['debug']: 330 self.updateProgress( 331 "fetch_file:", 332 importance = 1, 333 type = "warning", 334 header = red(" ## ") 335 ) 336 self.entropyTools.print_traceback() 337 if not existed_before: 338 do_stfu_rm(filepath) 339 return -1, data_transfer, resumed 340 if fetchChecksum == "-3": 341 # not found 342 return -3, data_transfer, resumed 343 elif fetchChecksum == "-4": 344 # timeout 345 return -4, data_transfer, resumed 346 347 del fetchConn 348 349 if digest and (fetchChecksum != digest): 350 # not properly downloaded 351 if not existed_before: 352 do_stfu_rm(filepath) 353 return -2, data_transfer, resumed 354 355 return 0, data_transfer, resumed 356 357
358 - def fetch_file_on_mirrors(self, repository, branch, filename, 359 digest = False, fetch_abort_function = None):
360 361 uris = self.SystemSettings['repositories']['available'][repository]['packages'][::-1] 362 remaining = set(uris) 363 364 mirrorcount = 0 365 for uri in uris: 366 367 if not remaining: 368 # tried all the mirrors, quitting for error 369 self.MirrorStatus.set_working_mirror(None) 370 return 3 371 372 self.MirrorStatus.set_working_mirror(uri) 373 mirrorcount += 1 374 mirrorCountText = "( mirror #%s ) " % (mirrorcount,) 375 url = uri+"/"+filename 376 377 # check if uri is sane 378 if self.MirrorStatus.get_failing_mirror_status(uri) >= 30: 379 # ohohoh! 380 # set to 30 for convenience 381 self.MirrorStatus.set_failing_mirror_status(uri, 30) 382 mytxt = mirrorCountText 383 mytxt += blue(" %s: ") % (_("Mirror"),) 384 mytxt += red(self.entropyTools.spliturl(uri)[1]) 385 mytxt += " - %s." % (_("maximum failure threshold reached"),) 386 self.updateProgress( 387 mytxt, 388 importance = 1, 389 type = "warning", 390 header = red(" ## ") 391 ) 392 393 if self.MirrorStatus.get_failing_mirror_status(uri) == 30: 394 # put to 75 then decrement by 4 so we 395 # won't reach 30 anytime soon ahahaha 396 self.MirrorStatus.add_failing_mirror(uri,45) 397 elif self.MirrorStatus.get_failing_mirror_status(uri) > 31: 398 # now decrement each time this point is reached, 399 # if will be back < 30, then equo will try to use it again 400 self.MirrorStatus.add_failing_mirror(uri,-4) 401 else: 402 # put to 0 - reenable mirror, welcome back uri! 403 self.MirrorStatus.set_failing_mirror_status(uri, 0) 404 405 remaining.discard(uri) 406 continue 407 408 do_resume = True 409 timeout_try_count = 50 410 while 1: 411 try: 412 mytxt = mirrorCountText 413 mytxt += blue("%s: ") % (_("Downloading from"),) 414 mytxt += red(self.entropyTools.spliturl(uri)[1]) 415 # now fetch the new one 416 self.updateProgress( 417 mytxt, 418 importance = 1, 419 type = "warning", 420 header = red(" ## ") 421 ) 422 rc, data_transfer, resumed = self.fetch_file( 423 url, 424 branch, 425 digest, 426 do_resume, 427 fetch_file_abort_function = fetch_abort_function 428 ) 429 if rc == 0: 430 mytxt = mirrorCountText 431 mytxt += blue("%s: ") % (_("Successfully downloaded from"),) 432 mytxt += red(self.entropyTools.spliturl(uri)[1]) 433 mytxt += " %s %s/%s" % (_("at"),self.entropyTools.bytes_into_human(data_transfer),_("second"),) 434 self.updateProgress( 435 mytxt, 436 importance = 1, 437 type = "info", 438 header = red(" ## ") 439 ) 440 441 self.MirrorStatus.set_working_mirror(None) 442 return 0 443 elif resumed and (rc not in (-3,-4,-100,)): 444 do_resume = False 445 continue 446 else: 447 error_message = mirrorCountText 448 error_message += blue("%s: %s") % ( 449 _("Error downloading from"), 450 red(self.entropyTools.spliturl(uri)[1]), 451 ) 452 # something bad happened 453 if rc == -1: 454 error_message += " - %s." % (_("file not available on this mirror"),) 455 elif rc == -2: 456 self.MirrorStatus.add_failing_mirror(uri,1) 457 error_message += " - %s." % (_("wrong checksum"),) 458 elif rc == -3: 459 error_message += " - %s." % (_("not found"),) 460 elif rc == -4: # timeout! 461 timeout_try_count -= 1 462 if timeout_try_count > 0: 463 error_message += " - %s." % (_("timeout, retrying on this mirror"),) 464 else: 465 error_message += " - %s." % (_("timeout, giving up"),) 466 elif rc == -100: 467 error_message += " - %s." % (_("discarded download"),) 468 else: 469 self.MirrorStatus.add_failing_mirror(uri, 5) 470 error_message += " - %s." % (_("unknown reason"),) 471 self.updateProgress( 472 error_message, 473 importance = 1, 474 type = "warning", 475 header = red(" ## ") 476 ) 477 if rc == -4: # timeout 478 if timeout_try_count > 0: 479 continue 480 elif rc == -100: # user discarded fetch 481 self.MirrorStatus.set_working_mirror(None) 482 return 1 483 remaining.discard(uri) 484 # make sure we don't have nasty issues 485 if not remaining: 486 self.MirrorStatus.set_working_mirror(None) 487 return 3 488 break 489 except KeyboardInterrupt: 490 self.MirrorStatus.set_working_mirror(None) 491 return 1 492 493 self.MirrorStatus.set_working_mirror(None) 494 return 0
495