Package entropy :: Package client :: Package interfaces :: Module fetch

Source Code for Module entropy.client.interfaces.fetch

  1  # -*- coding: utf-8 -*- 
  2  ''' 
  3      # DESCRIPTION: 
  4      # Entropy Object Oriented Interface 
  5   
  6      Copyright (C) 2007-2009 Fabio Erculiani 
  7   
  8      This program is free software; you can redistribute it and/or modify 
  9      it under the terms of the GNU General Public License as published by 
 10      the Free Software Foundation; either version 2 of the License, or 
 11      (at your option) any later version. 
 12   
 13      This program is distributed in the hope that it will be useful, 
 14      but WITHOUT ANY WARRANTY; without even the implied warranty of 
 15      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 16      GNU General Public License for more details. 
 17   
 18      You should have received a copy of the GNU General Public License 
 19      along with this program; if not, write to the Free Software 
 20      Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 21  ''' 
 22  from __future__ import with_statement 
 23  import os 
 24  from entropy.i18n import _ 
 25  from entropy.const import * 
 26  from entropy.exceptions import * 
 27  from entropy.output import purple, bold, red, blue, darkgreen, darkred, brown, darkblue 
 28   
class FetchersMixin:
    """
    Package download helpers for the Entropy client interface.

    This mixin defines no state of its own. Its methods read the following
    attributes from the host instance: entropyTools, SystemSettings,
    MirrorStatus, urlFetcher, MultipleUrlFetcher, updateProgress and progress.
    """

    def check_needed_package_download(self, filepath, checksum = None):
        """
        Tell whether a package file is already available locally.

        @param filepath string path appended to etpConst['entropyworkdir']
        @param checksum string digest handed to entropyTools.compare_md5,
            None to skip the digest check
        @return 0 if the file exists (and the digest matches, when given),
            -1 if the file does not exist, -2 if the digest check fails
        """
        # plain concatenation on purpose: os.path.join would discard the
        # work directory if filepath happened to be absolute
        pkg_path = etpConst['entropyworkdir'] + "/" + filepath
        if not os.path.isfile(pkg_path):
            return -1
        if checksum is None:
            return 0
        if self.entropyTools.compare_md5(pkg_path, checksum):
            return 0
        return -2

    def fetch_files(self, url_data_list, checksum = True, resume = True, fetch_file_abort_function = None):
        """
        Fetch multiple files simultaneously on URLs.

        @param url_data_list list
            [(url,dest_path [or None],checksum ['ab86fff46f6ec0f4b1e0a2a4a82bf323' or None],branch,),..]
        @param checksum bool, digest check (checksum)
        @param resume bool enable resume support
        @param fetch_file_abort_function callable method that could raise exceptions
        @return general_status_code, failed_map, data_transfer
            general_status_code is 0 when nothing failed, -100 when the
            fetcher reported -100 for an entry (or KeyboardInterrupt was
            raised while downloading), -1 otherwise.
            failed_map is keyed by url; the value is the integer error code
            (-1, -2, -3, -4) when the fetcher reported one of those as a
            string, otherwise the expected checksum that was not matched.
        """
        url_path_list = []
        checksum_map = {}
        count = 0
        for url, dest_path, cksum, branch in url_data_list:
            count += 1
            if dest_path is None:
                dest_path = os.path.join(etpConst['packagesbindir'],
                    branch, os.path.basename(url),)

            dest_dir = os.path.dirname(dest_path)
            # dest_dir is '' for a bare file name, nothing to create then
            if dest_dir and not os.path.isdir(dest_dir):
                os.makedirs(dest_dir, 0o755)

            url_path_list.append((url, dest_path,))
            if cksum is not None:
                # fetcher results are looked up by 1-based position
                checksum_map[count] = cksum

        # load class
        fetchConn = self.MultipleUrlFetcher(url_path_list, resume = resume,
            abort_check_func = fetch_file_abort_function, OutputInterface = self,
            UrlFetcherClass = self.urlFetcher, checksum = checksum)
        try:
            data = fetchConn.download()
        except KeyboardInterrupt:
            return -100, {}, 0

        data_transfer = fetchConn.get_data_transfer()
        if not (checksum_map and checksum):
            # nothing to verify
            return 0, {}, data_transfer

        error_codes = {"-1": -1, "-2": -2, "-3": -3, "-4": -4}
        general_status = -1
        diff_map = {}
        for idx, expected in checksum_map.items():
            actual = data.get(idx)
            if actual == expected:
                continue
            url = url_path_list[idx-1][0]
            # classify using what the fetcher returned, not the expected
            # checksum (which can never equal an error code)
            diff_map[url] = error_codes.get(actual, expected)
            if actual == -100:
                general_status = -100

        if diff_map:
            return general_status, diff_map, data_transfer
        return 0, diff_map, data_transfer

    def fetch_files_on_mirrors(self, download_list, checksum = False, fetch_abort_function = None):
        """
        Download a set of package files, walking through the mirrors of
        each involved repository until every file has been fetched.

        @param download_list list [(repository,branch,filename,checksum (digest),signatures,),..]
        @param checksum bool verify checksum?
        @param fetch_abort_function callable method that could raise exceptions
        @return (status, pending_list)
            0, [] when everything has been downloaded
            1, [] when the fetch was discarded (-100) or interrupted
            3, [entries still to download] when a repository ran out of mirrors
        """
        available_repos = self.SystemSettings['repositories']['available']
        # mirrors of every involved repository, in reversed configuration
        # order; never mutated, used to number the mirrors in messages
        repo_uris = dict((x[0], available_repos[x[0]]['packages'][::-1],) \
            for x in download_list)
        # mirrors still worth trying; every repository gets its own list so
        # that dropping a mirror does not alter repo_uris
        remaining = dict((repo, list(uris),) for repo, uris in repo_uris.items())
        my_download_list = download_list[:]

        def get_best_mirror(repository):
            # first mirror still queued for the repository, None if exhausted
            try:
                return remaining[repository][0]
            except IndexError:
                return None

        def mirror_label(repository, mirror):
            # "( mirror #N ) " prefix, N being the 1-based mirror position
            mirrorcount = repo_uris[repository].index(mirror)+1
            return "( mirror #%s ) " % (mirrorcount,)

        def mirror_host(mirror):
            return red(self.entropyTools.spliturl(mirror)[1])

        def notify(mytxt, msg_type):
            self.updateProgress(
                mytxt,
                importance = 1,
                type = msg_type,
                header = red(" ## ")
            )

        def update_download_list(down_list, failed_down):
            # keep only the entries whose current URL is among the failed ones
            newlist = []
            for repo, branch, fname, cksum, signatures in down_list:
                myuri = os.path.join(get_best_mirror(repo), fname)
                if myuri in failed_down:
                    newlist.append((repo, branch, fname, cksum, signatures,))
            return newlist

        # return True: for failing, return False: for fine
        def mirror_fail_check(repository, best_mirror):
            # check if uri is sane
            if not self.MirrorStatus.get_failing_mirror_status(best_mirror) >= 30:
                return False
            # set to 30 for convenience
            self.MirrorStatus.set_failing_mirror_status(best_mirror, 30)
            mytxt = mirror_label(repository, best_mirror)
            mytxt += blue(" %s: ") % (_("Mirror"),)
            mytxt += mirror_host(best_mirror)
            mytxt += " - %s." % (_("maximum failure threshold reached"),)
            notify(mytxt, "warning")

            # NOTE(review): the status has just been set to 30, so unless
            # MirrorStatus stores something else the two branches below
            # look unreachable - confirm the intended decay logic
            if self.MirrorStatus.get_failing_mirror_status(best_mirror) == 30:
                self.MirrorStatus.add_failing_mirror(best_mirror, 45)
            elif self.MirrorStatus.get_failing_mirror_status(best_mirror) > 31:
                self.MirrorStatus.add_failing_mirror(best_mirror, -4)
            else:
                self.MirrorStatus.set_failing_mirror_status(best_mirror, 0)

            # drop the mirror from the queue (a list, hence no discard())
            if best_mirror in remaining[repository]:
                remaining[repository].remove(best_mirror)
            return True

        def show_download_summary(down_list):
            for repo, branch, fname, cksum, signatures in down_list:
                best_mirror = get_best_mirror(repo)
                mytxt = mirror_label(repo, best_mirror)
                basef = os.path.basename(fname)
                mytxt += "[%s] %s " % (brown(basef), blue("@"),)
                mytxt += mirror_host(best_mirror)
                # now fetch the new one
                notify(mytxt, "info")

        def show_successful_download(down_list, data_transfer):
            for repo, branch, fname, cksum, signatures in down_list:
                best_mirror = get_best_mirror(repo)
                mytxt = mirror_label(repo, best_mirror)
                basef = os.path.basename(fname)
                mytxt += "[%s] %s %s " % (brown(basef), darkred(_("success")), blue("@"),)
                mytxt += mirror_host(best_mirror)
                notify(mytxt, "info")
            mytxt = " %s: %s%s%s" % (
                blue(_("Aggregated transfer rate")),
                bold(self.entropyTools.bytes_into_human(data_transfer)),
                darkred("/"),
                darkblue(_("second")),
            )
            notify(mytxt, "info")

        def show_download_error(down_list, rc):
            for repo, branch, fname, cksum, signatures in down_list:
                best_mirror = get_best_mirror(repo)
                mytxt = mirror_label(repo, best_mirror)
                mytxt += blue("%s: %s") % (
                    _("Error downloading from"),
                    mirror_host(best_mirror),
                )
                if rc == -1:
                    mytxt += " - %s." % (_("data not available on this mirror"),)
                elif rc == -2:
                    self.MirrorStatus.add_failing_mirror(best_mirror, 1)
                    mytxt += " - %s." % (_("wrong checksum"),)
                elif rc == -3:
                    mytxt += " - %s." % (_("not found"),)
                elif rc == -4: # timeout!
                    mytxt += " - %s." % (_("timeout error"),)
                elif rc == -100:
                    mytxt += " - %s." % (_("discarded download"),)
                else:
                    self.MirrorStatus.add_failing_mirror(best_mirror, 5)
                    mytxt += " - %s." % (_("unknown reason"),)
                notify(mytxt, "warning")

        def remove_failing_mirrors(repos):
            # give up on the current best mirror of every listed repository
            for repo in repos:
                if remaining[repo]:
                    remaining[repo].pop(0)

        def check_remaining_mirror_failure(repos):
            # repositories that have no mirror left
            return [x for x in repos if not remaining.get(x)]

        # the outer loop restarts resume/timeout accounting each time the
        # inner loop moves on to the next mirror; it only ends by returning
        while 1:

            do_resume = True
            timeout_try_count = 50
            while 1:

                fetch_files_list = []
                for repo, branch, fname, cksum, signatures in my_download_list:
                    best_mirror = get_best_mirror(repo)
                    # set working mirror, dont care if its None
                    self.MirrorStatus.set_working_mirror(best_mirror)
                    if best_mirror is not None:
                        mirror_fail_check(repo, best_mirror)
                        best_mirror = get_best_mirror(repo)
                    if best_mirror is None:
                        # at least one package failed to download
                        # properly, give up with everything
                        return 3, my_download_list
                    myuri = os.path.join(best_mirror, fname)
                    fetch_files_list.append((myuri, None, cksum, branch,))

                try:

                    show_download_summary(my_download_list)
                    rc, failed_downloads, data_transfer = self.fetch_files(
                        fetch_files_list, checksum = checksum,
                        fetch_file_abort_function = fetch_abort_function,
                        resume = do_resume
                    )
                    if rc == 0:
                        show_successful_download(my_download_list, data_transfer)
                        return 0, []

                    # update my_download_list
                    my_download_list = update_download_list(my_download_list, failed_downloads)
                    if rc not in (-3,-4,-100,) and failed_downloads and do_resume:
                        # disable resume
                        do_resume = False
                        continue

                    show_download_error(my_download_list, rc)
                    if rc == -4: # timeout
                        timeout_try_count -= 1
                        if timeout_try_count > 0:
                            continue
                    elif rc == -100: # user discarded fetch
                        return 1, []
                    myrepos = set([x[0] for x in my_download_list])
                    remove_failing_mirrors(myrepos)
                    # make sure we don't have nasty issues
                    remaining_failure = check_remaining_mirror_failure(myrepos)
                    if remaining_failure:
                        return 3, my_download_list
                    break
                except KeyboardInterrupt:
                    return 1, []

    def fetch_file(self, url, branch, digest = None, resume = True, fetch_file_abort_function = None, filepath = None):
        """
        Download a single file through self.urlFetcher.

        @param url string remote URL
        @param branch string used to build the default destination path
        @param digest string compared with the value returned by the
            fetcher's download(); a false value skips the comparison
        @param resume bool handed to the fetcher
        @param fetch_file_abort_function callable handed to the fetcher as
            abort_check_func
        @param filepath string destination path, defaults to
            etpConst['packagesbindir']/branch/basename(url)
        @return (status, data_transfer, resumed)
            status: 0 success, -1 download raised an exception,
            -2 digest mismatch, -3 not found, -4 timeout,
            -100 KeyboardInterrupt
        """

        def do_stfu_rm(xpath):
            # best-effort removal, a missing file is not an error here
            try:
                os.remove(xpath)
            except OSError:
                pass

        if not filepath:
            filepath = os.path.join(etpConst['packagesbindir'], branch,
                os.path.basename(url))
        filepath_dir = os.path.dirname(filepath)
        if not os.path.isdir(filepath_dir):
            os.makedirs(filepath_dir, 0o755)

        # a file that was already there is never removed on failure
        existed_before = os.path.isfile(filepath)

        # load class
        fetchConn = self.urlFetcher(url, filepath, resume = resume,
            abort_check_func = fetch_file_abort_function, OutputInterface = self)
        fetchConn.progress = self.progress

        # start to download
        data_transfer = 0
        resumed = False
        try:
            fetchChecksum = fetchConn.download()
            data_transfer = fetchConn.get_transfer_rate()
            resumed = fetchConn.is_resumed()
        except KeyboardInterrupt:
            return -100, data_transfer, resumed
        except NameError:
            raise
        except Exception:
            # Exception rather than a bare except: SystemExit and
            # GeneratorExit must not be turned into a download failure
            if etpUi['debug']:
                self.updateProgress(
                    "fetch_file:",
                    importance = 1,
                    type = "warning",
                    header = red(" ## ")
                )
                self.entropyTools.print_traceback()
            if not existed_before:
                do_stfu_rm(filepath)
            return -1, data_transfer, resumed

        if fetchChecksum == "-3":
            # not found
            return -3, data_transfer, resumed
        if fetchChecksum == "-4":
            # timeout
            return -4, data_transfer, resumed

        if digest and (fetchChecksum != digest):
            # not properly downloaded
            if not existed_before:
                do_stfu_rm(filepath)
            return -2, data_transfer, resumed

        return 0, data_transfer, resumed

    def fetch_file_on_mirrors(self, repository, branch, filename,
            digest = False, fetch_abort_function = None):
        """
        Download one file trying the mirrors of a repository in turn.

        @param repository string repository identifier, key of
            SystemSettings['repositories']['available']
        @param branch string handed to fetch_file
        @param filename string appended to every mirror URI
        @param digest handed to fetch_file as digest
        @param fetch_abort_function callable handed to fetch_file
        @return 0 when the file has been downloaded, 1 when the fetch was
            discarded (-100) or interrupted, 3 when no mirror could
            provide the file
        """

        uris = self.SystemSettings['repositories']['available'][repository]['packages'][::-1]
        remaining = set(uris)

        mirrorcount = 0
        for uri in uris:

            if not remaining:
                # tried all the mirrors, quitting for error
                self.MirrorStatus.set_working_mirror(None)
                return 3

            self.MirrorStatus.set_working_mirror(uri)
            mirrorcount += 1
            mirrorCountText = "( mirror #%s ) " % (mirrorcount,)
            url = uri+"/"+filename

            # check if uri is sane
            if self.MirrorStatus.get_failing_mirror_status(uri) >= 30:
                # ohohoh!
                # set to 30 for convenience
                self.MirrorStatus.set_failing_mirror_status(uri, 30)
                mytxt = mirrorCountText
                mytxt += blue(" %s: ") % (_("Mirror"),)
                mytxt += red(self.entropyTools.spliturl(uri)[1])
                mytxt += " - %s." % (_("maximum failure threshold reached"),)
                self.updateProgress(
                    mytxt,
                    importance = 1,
                    type = "warning",
                    header = red(" ## ")
                )

                # NOTE(review): the status has just been set to 30, so
                # unless MirrorStatus stores something else the two
                # branches below look unreachable - confirm the intent
                if self.MirrorStatus.get_failing_mirror_status(uri) == 30:
                    # put to 75 then decrement by 4 so we
                    # won't reach 30 anytime soon ahahaha
                    self.MirrorStatus.add_failing_mirror(uri, 45)
                elif self.MirrorStatus.get_failing_mirror_status(uri) > 31:
                    # now decrement each time this point is reached,
                    # if will be back < 30, then equo will try to use it again
                    self.MirrorStatus.add_failing_mirror(uri, -4)
                else:
                    # put to 0 - reenable mirror, welcome back uri!
                    self.MirrorStatus.set_failing_mirror_status(uri, 0)

                remaining.discard(uri)
                continue

            do_resume = True
            timeout_try_count = 50
            while 1:
                try:
                    mytxt = mirrorCountText
                    mytxt += blue("%s: ") % (_("Downloading from"),)
                    mytxt += red(self.entropyTools.spliturl(uri)[1])
                    # now fetch the new one
                    self.updateProgress(
                        mytxt,
                        importance = 1,
                        type = "warning",
                        header = red(" ## ")
                    )
                    rc, data_transfer, resumed = self.fetch_file(
                        url,
                        branch,
                        digest,
                        do_resume,
                        fetch_file_abort_function = fetch_abort_function
                    )
                    if rc == 0:
                        mytxt = mirrorCountText
                        mytxt += blue("%s: ") % (_("Successfully downloaded from"),)
                        mytxt += red(self.entropyTools.spliturl(uri)[1])
                        mytxt += " %s %s/%s" % (_("at"),
                            self.entropyTools.bytes_into_human(data_transfer),
                            _("second"),)
                        self.updateProgress(
                            mytxt,
                            importance = 1,
                            type = "info",
                            header = red(" ## ")
                        )

                        self.MirrorStatus.set_working_mirror(None)
                        return 0

                    if resumed and (rc not in (-3,-4,-100,)):
                        # a resumed transfer failed, retry from scratch
                        do_resume = False
                        continue

                    error_message = mirrorCountText
                    error_message += blue("%s: %s") % (
                        _("Error downloading from"),
                        red(self.entropyTools.spliturl(uri)[1]),
                    )
                    # something bad happened
                    if rc == -1:
                        error_message += " - %s." % (_("file not available on this mirror"),)
                    elif rc == -2:
                        self.MirrorStatus.add_failing_mirror(uri, 1)
                        error_message += " - %s." % (_("wrong checksum"),)
                    elif rc == -3:
                        error_message += " - %s." % (_("not found"),)
                    elif rc == -4: # timeout!
                        timeout_try_count -= 1
                        if timeout_try_count > 0:
                            error_message += " - %s." % (_("timeout, retrying on this mirror"),)
                        else:
                            error_message += " - %s." % (_("timeout, giving up"),)
                    elif rc == -100:
                        error_message += " - %s." % (_("discarded download"),)
                    else:
                        self.MirrorStatus.add_failing_mirror(uri, 5)
                        error_message += " - %s." % (_("unknown reason"),)
                    self.updateProgress(
                        error_message,
                        importance = 1,
                        type = "warning",
                        header = red(" ## ")
                    )
                    if rc == -4: # timeout
                        if timeout_try_count > 0:
                            continue
                    elif rc == -100: # user discarded fetch
                        self.MirrorStatus.set_working_mirror(None)
                        return 1
                    remaining.discard(uri)
                    # make sure we don't have nasty issues
                    if not remaining:
                        self.MirrorStatus.set_working_mirror(None)
                        return 3
                    break
                except KeyboardInterrupt:
                    self.MirrorStatus.set_working_mirror(None)
                    return 1

        # every successful download returns from inside the loop: getting
        # here means the mirror list was empty or its tail was skipped,
        # hence nothing has been fetched
        self.MirrorStatus.set_working_mirror(None)
        return 3
505