Package entropy :: Module transceivers

Source Code for Module entropy.transceivers

   1  # -*- coding: utf-8 -*- 
   2  """ 
   3   
   4      @author: Fabio Erculiani <lxnay@sabayonlinux.org> 
   5      @contact: lxnay@sabayonlinux.org 
   6      @copyright: Fabio Erculiani 
   7      @license: GPL-2 
   8   
   9      B{Entropy transceiver module}. 
  10   
  11  """ 
  12  from __future__ import with_statement 
  13  import os 
  14  import urllib2 
  15  import time 
  16  from entropy.const import etpConst 
  17  from entropy.output import TextInterface, darkblue, darkred, purple, blue, \ 
  18      brown, darkgreen, red, bold 
  19  from entropy.exceptions import * 
  20  from entropy.i18n import _ 
  21  from entropy.misc import TimeScheduled, ParallelTask 
  22  from entropy.core.settings.base import SystemSettings 
  23   
24 -class UrlFetcher:
25
def __init__(self, url, path_to_save, checksum = True,
        show_speed = True, resume = True,
        abort_check_func = None, disallow_redirect = False,
        thread_stop_func = None, speed_limit = None,
        OutputInterface = None):
    """
    Prepare a single-URL fetcher; nothing is transferred until
    download() is called.

    @param url: remote URL, its basename gets percent-quoted
    @param path_to_save: local path the fetched data is written to
    @param checksum: bool, make download() return the md5 of the file
    @param show_speed: bool, enable statistics/progress reporting
    @param resume: bool, allow appending to an existing local file
    @param abort_check_func: callable invoked after every chunk read
    @param disallow_redirect: bool, fail if the final URL differs
    @param thread_stop_func: callable invoked after every chunk read
    @param speed_limit: kbytes/sec cap, None reads the
        'transfer_limit' repositories setting
    @param OutputInterface: object with a callable updateProgress,
        a TextInterface instance is created when None
    @raise IncorrectParameter: OutputInterface has no callable
        updateProgress
    """
    self.__system_settings = SystemSettings()
    if speed_limit == None:
        repo_settings = self.__system_settings['repositories']
        speed_limit = repo_settings['transfer_limit']

    self.progress = None
    import entropy.tools as entropyTools
    import socket
    self.entropyTools = entropyTools
    self.socket = socket

    # plain configuration
    self.__th_id = 0
    self.__resume = resume
    self.__url = self.__encode_url(url)
    self.__path_to_save = path_to_save
    self.__checksum = checksum
    self.__show_speed = show_speed
    self.__abort_check_func = abort_check_func
    self.__thread_stop_func = thread_stop_func
    self.__disallow_redirect = disallow_redirect
    self.__speedlimit = speed_limit # kbytes/sec
    self.__existed_before = False
    self.localfile = None

    # important to have this here too
    self.__datatransfer = 0
    self.__resumed = False

    sysname, _node, release, _version, machine = os.uname()[:5]
    self.user_agent = "Entropy/%s (compatible; %s; %s: %s %s %s)" % (
        etpConst['entropyversion'],
        "Entropy",
        os.path.basename(self.__url),
        sysname,
        machine,
        release,
    )
    self.__extra_header_data = {}

    self.__Output = OutputInterface
    if self.__Output == None:
        self.__Output = TextInterface()
    elif (not hasattr(self.__Output,'updateProgress')) or \
        (not callable(self.__Output.updateProgress)):
        mytxt = _("Output interface passed doesn't have the updateProgress method")
        raise IncorrectParameter("IncorrectParameter: %s" % (mytxt,))
76
def _init_vars(self):
    """
    Reset every per-transfer counter, then set up the local file
    (resume support) and the proxy configuration.
    """
    # transfer bookkeeping
    self.__resumed = False
    self.__status = None
    self.__remotefile = None
    self.__buffersize = 8192
    self.__remotesize = 0
    self.__downloadedsize = 0
    self.__average = 0
    self.__oldaverage = 0.0
    # transfer status data
    self.__startingposition = 0
    self.__datatransfer = 0
    self.__elapsed = 0.0
    self.__time_remaining = "(infinite)"
    self.__time_remaining_secs = 0
    self.__updatestep = 0.2
    self.__transferpollingtime = float(1)/4
    # remember whether the target was already there before we touch it
    self.__existed_before = False
    if os.path.lexists(self.__path_to_save):
        self.__existed_before = True
    self.__setup_resume_support()
    self._setup_proxy()
100
def __setup_resume_support(self):
    """
    Close any file left open by a previous run and, when possible,
    open the target path for resuming.

    If the path is a writable regular file and resume is enabled, it is
    opened in append mode and its size becomes the starting position.
    If the path exists but entropyTools.is_valid_path() rejects it, it
    is removed (best effort) and opened for writing from scratch.
    In every other case no file is opened here.
    """
    # if client uses this instance more than once, make sure we close
    # previously opened files. A handle already closed by a previous
    # run must be skipped: flush() on it would raise ValueError.
    previous = self.localfile
    if (previous is not None) and not getattr(previous, "closed", True):
        try:
            previous.flush()
            previous.close()
        except (IOError, OSError, ValueError,):
            pass

    path = self.__path_to_save
    resumable = os.path.isfile(path) and os.access(path, os.W_OK) \
        and self.__resume

    if resumable:
        # "ab" is the portable spelling of the former "awb" mode
        self.localfile = open(path, "ab")
        self.localfile.seek(0, os.SEEK_END)
        self.__startingposition = int(self.localfile.tell())
        self.__resumed = True

    elif os.path.lexists(path) and not \
        self.entropyTools.is_valid_path(path):
        try:
            os.remove(path)
        except OSError: # I won't stop you here
            pass
        self.localfile = open(path, "wb")
129
def _setup_proxy(self):
    """
    Install (or drop) the urllib2 proxy opener according to the
    current 'proxy' system settings.
    """
    # setup proxy, doing here because config is dynamic
    proxy_data = self.__system_settings['system']['proxy']
    mydict = {}
    for scheme in ('ftp', 'http'):
        if proxy_data[scheme]:
            mydict[scheme] = proxy_data[scheme]

    if not mydict:
        # unset
        urllib2._opener = None
        return

    mydict['username'] = proxy_data['username']
    mydict['password'] = proxy_data['password']
    self.entropyTools.add_proxy_opener(urllib2, mydict)
145
def __encode_url(self, url):
    """
    Percent-quote the last path component of a URL.

    @param url: URL string
    @return: url with only its basename quoted, the leading part is
        left untouched
    """
    try:
        from urllib import quote
    except ImportError:
        # urllib.quote lives in urllib.parse on newer interpreters
        from urllib.parse import quote
    head, tail = os.path.dirname(url), os.path.basename(url)
    return os.path.join(head, quote(tail))
151
def set_id(self, th_id):
    """
    Store the identifier that is later passed to handle_statistics().

    @param th_id: identifier of this fetcher
    """
    self.__th_id = th_id
154
def download(self):
    """
    Fetch the URL into the local path.

    @return: "-3" on generic failure (also redirect refused),
        "-4" on socket timeout, otherwise the value of
        __prepare_return() (md5 of the file, or "-2" when checksumming
        is disabled)
    @raise KeyboardInterrupt: re-raised after cleaning up
    """
    self._init_vars()
    self.speedUpdater = TimeScheduled(
        self.__transferpollingtime,
        self.__update_speed,
    )
    self.speedUpdater.start()
    # set timeout
    self.socket.setdefaulttimeout(20)

    if self.__url.startswith("http://"):
        headers = { 'User-Agent' : self.user_agent }
        # NOTE(review): a non-None data argument presumably turns this
        # into a POST request, which may be what triggers the 405
        # handled below -- confirm against urllib2.Request docs
        req = urllib2.Request(self.__url, self.__extra_header_data, headers)
    else:
        req = self.__url

    u_agent_error = False
    while 1:
        # get file size if available
        try:
            self.__remotefile = urllib2.urlopen(req)
        except KeyboardInterrupt:
            self.__close(False)
            raise
        except urllib2.HTTPError as e:
            if (e.code == 405) and not u_agent_error:
                # server doesn't like our user agent
                req = self.__url
                u_agent_error = True
                continue
            self.__close(True)
            self.__status = "-3"
            return self.__status
        except:
            self.__close(True)
            self.__status = "-3"
            return self.__status
        break

    try:
        self.__remotesize = int(self.__remotefile.headers.get(
            "content-length"))
        self.__remotefile.close()
    except KeyboardInterrupt:
        self.__close(False)
        raise
    except:
        # size not available, __remotesize stays 0 (unknown)
        pass

    # handle user stupidity
    try:
        request = self.__url
        if ((self.__startingposition > 0) and (self.__remotesize > 0)) \
            and (self.__startingposition < self.__remotesize):

            try:
                request = urllib2.Request(
                    self.__url,
                    headers = {
                        "Range" : "bytes=" + \
                            str(self.__startingposition) + "-" + \
                            str(self.__remotesize)
                    }
                )
            except KeyboardInterrupt:
                self.__close(False)
                raise
            except:
                pass
        elif (self.__remotesize > 0) and \
            (self.__startingposition == self.__remotesize):
            # all fine then! (only when the remote size is known:
            # 0 == 0 just means "nothing known, nothing fetched")
            self.__close(False)
            return self.__prepare_return()
        else:
            # cannot resume: restart from scratch, dropping the
            # handle opened by the resume setup and its state
            old_file = self.localfile
            if (old_file is not None) and \
                not getattr(old_file, "closed", True):
                old_file.close()
            self.localfile = open(self.__path_to_save,"wb")
            self.__startingposition = 0
            self.__resumed = False
        self.__remotefile = urllib2.urlopen(request)
    except KeyboardInterrupt:
        self.__close(False)
        raise
    except:
        self.__close(True)
        self.__status = "-3"
        return self.__status

    if self.__remotesize > 0:
        # from now on the remote size is expressed in kbytes
        self.__remotesize = float(int(self.__remotesize))/1024

    if self.__disallow_redirect and \
        (self.__url != self.__remotefile.geturl()):

        self.__close(True)
        self.__status = "-3"
        return self.__status

    while 1:
        try:
            rsx = self.__remotefile.read(self.__buffersize)
            if rsx == '':
                break
            # both hooks may raise to stop the transfer; whatever they
            # raise is turned into a "-3" status by the bare except
            if self.__abort_check_func is not None:
                self.__abort_check_func()
            if self.__thread_stop_func is not None:
                self.__thread_stop_func()
        except KeyboardInterrupt:
            self.__close(False)
            raise
        except self.socket.timeout:
            self.__close(False)
            self.__status = "-4"
            return self.__status
        except:
            # python 2.4 timeouts go here
            self.__close(True)
            self.__status = "-3"
            return self.__status
        self.__commit(rsx)
        if self.__show_speed:
            self.handle_statistics(self.__th_id, self.__downloadedsize,
                self.__remotesize, self.__average, self.__oldaverage,
                self.__updatestep, self.__show_speed, self.__datatransfer,
                self.__time_remaining, self.__time_remaining_secs
            )
            self.updateProgress()
            self.__oldaverage = self.__average
        if self.__speedlimit:
            # __datatransfer is refreshed by the speedUpdater task
            while self.__datatransfer > self.__speedlimit*1024:
                time.sleep(0.1)
                if self.__show_speed:
                    self.updateProgress()
                    self.__oldaverage = self.__average

    # kill thread
    self.__close(False)
    return self.__prepare_return()
289 290
def __prepare_return(self):
    """
    Compute and store the final transfer status.

    @return: md5 of the saved file when checksumming is enabled,
        "-2" otherwise
    """
    if not self.__checksum:
        self.__status = "-2"
    else:
        self.__status = self.entropyTools.md5sum(self.__path_to_save)
    return self.__status
297
def __commit(self, mybuffer):
    """
    Write a chunk to the local file and refresh the progress counters.

    @param mybuffer: data just read from the remote file
    """
    # writing file buffer
    self.localfile.write(mybuffer)
    # update progress info
    self.__downloadedsize = self.localfile.tell()
    kbytecount = float(self.__downloadedsize)/1024
    if self.__remotesize:
        self.__average = int((kbytecount/self.__remotesize)*100)
    else:
        # remote size unknown (no content-length): a percentage cannot
        # be computed, and dividing would raise ZeroDivisionError
        self.__average = 0
305
def __close(self, errored):
    """
    Release everything a transfer holds: local file, remote file,
    speed updater task; then set the socket default timeout to 2.

    @param errored: bool, when True and the target path did not exist
        before the transfer, the (partial) file is removed
    """
    # local side
    try:
        if isinstance(self.localfile, file):
            self.localfile.flush()
            self.localfile.close()
    except IOError:
        pass

    # do not leave garbage we created ourselves
    created_by_us = not self.__existed_before
    if created_by_us and errored:
        try:
            os.remove(self.__path_to_save)
        except OSError:
            pass

    # remote side, best effort
    try:
        self.__remotefile.close()
    except:
        pass

    self.speedUpdater.kill()
    self.socket.setdefaulttimeout(2)
324
325 - def __update_speed(self):
326 self.__elapsed += self.__transferpollingtime 327 # we have the diff size 328 x_delta = self.__downloadedsize - self.__startingposition 329 self.__datatransfer = x_delta / self.__elapsed 330 if self.__datatransfer < 0: 331 self.__datatransfer = 0 332 try: 333 rounded_remote = int(round(self.__remotesize*1024,0)) 334 rounded_downloaded = int(round(self.__downloadedsize,0)) 335 x_delta = rounded_remote - rounded_downloaded 336 tx_round = int(round(x_delta/self.__datatransfer,0)) 337 self.__time_remaining_secs = tx_round 338 self.__time_remaining = self.entropyTools.convert_seconds_to_fancy_output(self.__time_remaining_secs) 339 except: 340 self.__time_remaining = "(%s)" % (_("infinite"),)
341
def get_transfer_rate(self):
    """
    @return: the transfer rate computed by the last speed update
    """
    return self.__datatransfer
344
def is_resumed(self):
    """
    @return: the resume flag set while preparing the local file
    """
    return self.__resumed
347
def handle_statistics(self, th_id, downloaded_size, total_size,
        average, old_average, update_step, show_speed, data_transfer,
        time_remaining, time_remaining_secs):
    """
    Statistics hook called by download() after each chunk when speed
    reporting is enabled. Does nothing here; meant to be overridden.

    @return: None
    """
    return None
352
def updateProgress(self):
    """
    Print a one-line progress report (sizes, percentage, bar and,
    when enabled, speed and ETA) through the output interface.
    Nothing is printed unless the percentage grew by more than the
    update step since the last report.
    """
    file_tag = _("[F]")
    eta_txt = _("ETA")
    sec_txt = _("sec") # as in XX kb/sec

    currentText = darkred(" %s: " % (file_tag,)) + \
        darkgreen(str(round(float(self.__downloadedsize)/1024,1))) + "/" + \
        red(str(round(self.__remotesize,1))) + " kB"

    if not (self.__average > self.__oldaverage+self.__updatestep):
        return

    # create progress bar
    barsize = 10
    used = 1
    bar = "["
    filled = (self.__average*barsize)/100
    while filled > 0:
        used += 1
        bar += "="
        filled -= 1
    bar += ">"
    padding = barsize-used
    while padding > 0:
        bar += " "
        padding -= 1

    if self.__show_speed:
        bar += "] => %s" % (self.entropyTools.bytes_into_human(self.__datatransfer),)
        bar += "/%s : %s: %s" % (sec_txt,eta_txt,self.__time_remaining,)
    else:
        bar += "]"

    percent = str(self.__average)
    if len(percent) < 2:
        percent = " "+percent
    currentText += " <-> "+percent+"% "+bar
    self.__Output.updateProgress(currentText, back = True)
387
388 -class MultipleUrlFetcher:
389 390 import entropy.tools as entropyTools
def __init__(self, url_path_list, checksum = True,
        show_speed = True, resume = True,
        abort_check_func = None, disallow_redirect = False,
        OutputInterface = None, UrlFetcherClass = None):
    """
    @param url_path_list list [(url,path_to_save,),...]
    @param checksum bool return checksum data
    @param show_speed bool show transfer speed on the output
    @param resume bool enable resume support
    @param abort_check_func callable function that could
        raise exception and stop transfer
    @param disallow_redirect bool disable automatic HTTP redirect
    @param OutputInterface TextInterface instance used to
        print instance output through a common interface
    @param UrlFetcherClass, urlFetcher instance/interface used
    @raise IncorrectParameter: OutputInterface has no callable
        updateProgress
    """
    self.__system_settings = SystemSettings()

    # caller configuration
    self.__url_path_list = url_path_list
    self.__checksum = checksum
    self.__resume = resume
    self.__show_speed = show_speed
    self.__disallow_redirect = disallow_redirect
    self.__abort_check_func = abort_check_func

    # important to have a declaration here
    self.__data_transfer = 0
    self.__average = 0
    self.__old_average = 0
    self.__time_remaining_sec = 0

    self.__Output = OutputInterface
    if self.__Output == None:
        self.__Output = TextInterface()
    elif (not hasattr(self.__Output,'updateProgress')) or \
        (not callable(self.__Output.updateProgress)):
        mytxt = _("Output interface passed doesn't have the updateProgress method")
        raise IncorrectParameter("IncorrectParameter: %s" % (mytxt,))

    self.__url_fetcher = UrlFetcherClass
    if self.__url_fetcher == None:
        self.__url_fetcher = UrlFetcher
434 435
def __handle_threads_stop(self):
    """
    Hook handed to every fetcher as thread_stop_func.

    @raise InterruptError: when the stop flag has been set
    """
    if not self.__stop_threads:
        return
    raise InterruptError
439
def _init_vars(self):
    """
    Reset all the per-download state (thread pool, statuses,
    aggregated statistics) before a new download() run.
    """
    # containers
    self.__progress_data = {}
    self.__thread_pool = {}
    self.__download_statuses = {}
    # flags
    self.__show_progress = False
    self.__stop_threads = False
    # number of progress refreshes that are always printed
    self.__first_refreshes = 50
    # aggregated statistics
    self.__data_transfer = 0
    self.__average = 0
    self.__old_average = 0
    self.__time_remaining_sec = 0
451
def download(self):
    """
    Start one fetcher per (url, path) pair, each inside its own
    ParallelTask, and wait until all of them have stored a status.

    @return: dict mapping the fetcher id (1-based) to the value
        returned by its download()
    """
    self._init_vars()

    # the global transfer limit is shared among the fetchers
    speed_limit = 0
    dsl = self.__system_settings['repositories']['transfer_limit']
    if isinstance(dsl,int) and self.__url_path_list:
        speed_limit = dsl/len(self.__url_path_list)

    class MyFetcher(self.__url_fetcher):
        # fetcher that redirects statistics and progress to us

        def __init__(self, klass, multiple, *args, **kwargs):
            klass.__init__(self, *args, **kwargs)
            self.__multiple_fetcher = multiple

        def updateProgress(self):
            return self.__multiple_fetcher.updateProgress()

        def handle_statistics(self, *args, **kwargs):
            return self.__multiple_fetcher.handle_statistics(*args,
                **kwargs)

    def do_download(ds, th_id, downloader):
        ds[th_id] = downloader.download()

    th_id = 0
    for url, path_to_save in self.__url_path_list:
        th_id += 1
        downloader = MyFetcher(self.__url_fetcher, self, url, path_to_save,
            checksum = self.__checksum, show_speed = self.__show_speed,
            resume = self.__resume,
            abort_check_func = self.__abort_check_func,
            disallow_redirect = self.__disallow_redirect,
            thread_stop_func = self.__handle_threads_stop,
            speed_limit = speed_limit,
            OutputInterface = self.__Output
        )
        downloader.set_id(th_id)

        task = ParallelTask(do_download, self.__download_statuses, th_id,
            downloader)
        self.__thread_pool[th_id] = task
        task.start()

    self.show_download_files_info()
    self.__show_progress = True
    # every task stores its status when done
    while len(self.__url_path_list) != len(self.__download_statuses):
        try:
            time.sleep(0.5)
        except (SystemExit, KeyboardInterrupt,):
            self.__stop_threads = True
            raise

    return self.__download_statuses
def get_data_transfer(self):
    """
    @return: the aggregated transfer rate stored by updateProgress()
    """
    return self.__data_transfer
506
def get_average(self):
    """
    @return: the aggregated percentage stored by updateProgress()
    """
    return self.__average
509
def get_seconds_remaining(self):
    """
    @return: the aggregated remaining seconds stored by
        updateProgress()
    """
    return self.__time_remaining_sec
512
def show_download_files_info(self):
    """
    Print the number of queued items followed by one numbered line
    per item, through the output interface.
    """
    # work on a snapshot of the list
    items = self.__url_path_list[:]
    summary = "%s: %s %s" % (
        darkblue(_("Aggregated download")),
        darkred(str(len(items))),
        darkblue(_("items")),
    )
    self.__Output.updateProgress(
        summary,
        importance = 0,
        type = "info",
        header = purple(" ## ")
    )
    position = 0
    for url, save_path in items:
        position += 1
        fname = os.path.basename(url)
        uri = self.entropyTools.spliturl(url)[1]
        item_line = "[%s] %s => %s" % (
            darkblue(str(position)),
            darkgreen(uri),
            blue(fname),
        )
        self.__Output.updateProgress(
            item_line,
            importance = 0,
            type = "info",
            header = brown(" # ")
        )
540
def handle_statistics(self, th_id, downloaded_size, total_size,
        average, old_average, update_step, show_speed, data_transfer,
        time_remaining, time_remaining_secs):
    """
    Record the latest statistics of one fetcher, keyed by its id, so
    that updateProgress() can aggregate them.
    """
    self.__progress_data[th_id] = dict(
        th_id = th_id,
        downloaded_size = downloaded_size,
        total_size = total_size,
        average = average,
        old_average = old_average,
        update_step = update_step,
        show_speed = show_speed,
        data_transfer = data_transfer,
        time_remaining = time_remaining,
        time_remaining_secs = time_remaining_secs,
    )
557
def updateProgress(self):
    """
    Aggregate the statistics recorded by handle_statistics() for all
    the fetchers, store the totals on the instance and, when progress
    reporting is on, print a single progress line.

    Does nothing when no fetcher has reported statistics yet.
    """
    pd = self.__progress_data.copy()
    pdlen = len(pd)
    if not pdlen:
        # nothing to aggregate; averaging update_step over zero
        # fetchers would raise ZeroDivisionError
        return

    eta_txt = _("ETA")
    sec_txt = _("sec") # as in XX kb/sec
    downloaded_size = 0
    total_size = 0
    time_remaining = 0
    data_transfer = 0
    update_step = 0
    average = 100

    # calculation
    for th_id in sorted(pd):
        data = pd.get(th_id)
        downloaded_size += data.get('downloaded_size',0)
        total_size += data.get('total_size',0)
        data_transfer += data.get('data_transfer',0)
        tr = data.get('time_remaining_secs',0)
        if tr > 0:
            time_remaining += tr
        update_step += data.get('update_step',0)

    # total_size is in kbytes
    # downloaded_size is in bytes
    if total_size > 0:
        # convert to float before dividing, otherwise the kbyte
        # count gets truncated
        average = int(float(downloaded_size)/1024/total_size * 100)
    self.__data_transfer = data_transfer
    self.__average = average
    update_step = update_step/pdlen
    self.__time_remaining_sec = time_remaining
    time_remaining = self.entropyTools.convert_seconds_to_fancy_output(time_remaining)

    if ((average > self.__old_average+update_step) or \
        (self.__first_refreshes > 0)) and self.__show_progress:

        self.__first_refreshes -= 1
        currentText = darkgreen(str(round(float(downloaded_size)/1024,1))) + "/" + \
            red(str(round(total_size,1))) + " kB"
        # create progress bar
        barsize = 10
        bartext = "["
        curbarsize = 1
        averagesize = (average*barsize)/100
        while averagesize > 0:
            curbarsize += 1
            bartext += "="
            averagesize -= 1
        bartext += ">"
        diffbarsize = barsize-curbarsize
        while diffbarsize > 0:
            bartext += " "
            diffbarsize -= 1
        if self.__show_speed:
            bartext += "] => %s" % (self.entropyTools.bytes_into_human(data_transfer),)
            bartext += "/%s : %s: %s" % (sec_txt,eta_txt,time_remaining,)
        else:
            bartext += "]"
        myavg = str(average)
        if len(myavg) < 2:
            myavg = " "+myavg
        currentText += " <-> "+myavg+"% "+bartext+" "
        self.__Output.updateProgress(currentText, back = True)

    self.__old_average = average
623 624
625 -class FtpInterface:
626 627 # this must be run before calling the other functions
def __init__(self, ftpuri, OutputInterface, verbose = True):
    """
    Connect to the FTP host named by ftpuri, log in and change to the
    remote directory (creating it when missing).

    @param ftpuri: FTP URI; host, credentials, port and directory are
        extracted through entropyTools
    @param OutputInterface: object exposing a callable updateProgress
    @param verbose: bool, print connection steps
    @raise IncorrectParameter: OutputInterface has no callable
        updateProgress
    @raise ConnectionError: address resolution failed, or socket
        errors persisted for all the 10 attempts
    @raise FtpError: login refused
    """
    if (not hasattr(OutputInterface,'updateProgress')) or \
        (not callable(OutputInterface.updateProgress)):
        mytxt = _("OutputInterface does not have an updateProgress method")
        raise IncorrectParameter("IncorrectParameter: %s, (! %s !)" % (OutputInterface,mytxt,))

    import socket, ftplib
    import entropy.tools as entropyTools
    self.socket = socket
    self.ftplib = ftplib
    self.entropyTools = entropyTools
    self.Entropy = OutputInterface
    self.__verbose = verbose
    self.__init_vars()
    self.socket.setdefaulttimeout(60)
    self.__ftpuri = ftpuri
    self.__speed_updater = None
    self.__currentdir = '.'
    self.__ftphost = self.entropyTools.extract_ftp_host_from_uri(self.__ftpuri)
    ftp_data = self.entropyTools.extract_ftp_data(ftpuri)
    self.__ftpuser, self.__ftppassword, self.__ftpport, self.__ftpdir = ftp_data

    attempts_left = 10
    while True:
        attempts_left -= 1
        try:
            self.__ftpconn = self.ftplib.FTP(self.__ftphost)
        except (self.socket.gaierror,) as e:
            # resolution problems are not worth a retry
            raise ConnectionError('ConnectionError: %s' % (e,))
        except (self.socket.error,) as e:
            if not attempts_left:
                raise ConnectionError('ConnectionError: %s' % (e,))
        except:
            if not attempts_left:
                raise
        else:
            break

    if self.__verbose:
        mytxt = _("connecting with user")
        self.Entropy.updateProgress(
            "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(self.__ftpuser),),
            importance = 1,
            type = "info",
            header = darkgreen(" * ")
        )
    try:
        self.__ftpconn.login(self.__ftpuser,self.__ftppassword)
    except self.ftplib.error_perm as e:
        raise FtpError('FtpError: %s' % (e,))
    if self.__verbose:
        mytxt = _("switching to")
        self.Entropy.updateProgress(
            "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(self.__ftpdir),),
            importance = 1,
            type = "info",
            header = darkgreen(" * ")
        )
    self.set_cwd(self.__ftpdir, dodir = True)
687
def __init_vars(self):
    """
    Reset the per-transfer counters used for progress and speed
    reporting.
    """
    self.__oldprogress = 0.0
    self.__filesize = 0
    self.__filekbcount = 0
    self.__transfersize = 0
    self.__startingposition = 0
    self.__elapsed = 0.0
    # updateProgress() reads the transfer rate, which is otherwise
    # only assigned by the periodic speed update
    self.__datatransfer = 0
    self.__time_remaining_secs = 0
    self.__time_remaining = "(%s)" % (_("infinite"),)
    self.__transferpollingtime = float(1)/4
698
def set_basedir(self):
    """
    Change to the directory extracted from the FTP URI.

    @return: whatever set_cwd() returns
    """
    basedir = self.__ftpdir
    return self.set_cwd(basedir)
701 702 # this can be used in case of exceptions
def reconnect_host(self):
    """
    Open a new connection to the same host (up to 10 attempts), log
    in again and go back to the last known working directory.
    """
    self.socket.setdefaulttimeout(60)
    attempts_left = 10
    while True:
        attempts_left -= 1
        try:
            self.__ftpconn = self.ftplib.FTP(self.__ftphost)
        except:
            # give up only when the attempts are over
            if not attempts_left:
                raise
        else:
            break

    if self.__verbose:
        mytxt = _("reconnecting with user")
        self.Entropy.updateProgress(
            "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(self.__ftpuser),),
            importance = 1,
            type = "info",
            header = darkgreen(" * ")
        )
    self.__ftpconn.login(self.__ftpuser,self.__ftppassword)
    if self.__verbose:
        mytxt = _("switching to")
        self.Entropy.updateProgress(
            "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(self.__ftpdir),),
            importance = 1,
            type = "info",
            header = darkgreen(" * ")
        )
    self.set_cwd(self.__currentdir)
734
def get_host(self):
    """
    @return: the host extracted from the FTP URI
    """
    return self.__ftphost
737
def get_port(self):
    """
    @return: the port extracted from the FTP URI
    """
    return self.__ftpport
740
def get_dir(self):
    """
    @return: the directory extracted from the FTP URI
    """
    return self.__ftpdir
743
def get_cwd(self):
    """
    @return: the current remote directory as reported by the server
    """
    return self.__ftpconn.pwd()
747
def set_cwd(self, mydir, dodir = False):
    """
    Change the remote working directory.

    @param mydir: remote directory
    @param dodir: bool, create the directory when it is missing
    @return: whatever _set_cwd() returns
    @raise FtpError: the server answered with a permanent error
    """
    try:
        result = self._set_cwd(mydir, dodir)
    except self.ftplib.error_perm as e:
        raise FtpError('FtpError: %s' % (e,))
    return result
753
def _set_cwd(self, mydir, dodir = False):
    """
    Change the remote working directory and remember the directory
    the server reports afterwards.

    @param mydir: remote directory
    @param dodir: bool, on a 550 answer create the directory
        recursively and retry
    @raise ftplib.error_perm: permanent error not handled by dodir
    """
    if self.__verbose:
        mytxt = _("switching to")
        self.Entropy.updateProgress(
            "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(mydir),),
            importance = 1,
            type = "info",
            header = darkgreen(" * ")
        )
    try:
        self.__ftpconn.cwd(mydir)
    except self.ftplib.error_perm as e:
        missing_dir = e.args[0][:3] == '550'
        if not (missing_dir and dodir):
            raise
        self.recursive_mkdir(mydir)
        self.__ftpconn.cwd(mydir)
    self.__currentdir = self.get_cwd()
772
def set_pasv(self,bool):
    """
    Forward the passive mode flag to the FTP connection.

    @param bool: value handed to the connection's set_pasv()
    """
    connection = self.__ftpconn
    connection.set_pasv(bool)
775
def set_chmod(self,chmodvalue,file):
    """
    Send a "SITE CHMOD" command for a remote file.

    @param chmodvalue: permission value, converted with str()
    @param file: remote file name, converted with str()
    @return: the connection's voidcmd() result
    """
    command = "SITE CHMOD %s %s" % (str(chmodvalue), str(file),)
    return self.__ftpconn.voidcmd(command)
778
def get_file_mtime(self,path):
    """
    Ask the server for the modification time of a remote path.

    @param path: remote path
    @return: last whitespace-separated token of the "mdtm" answer
    """
    answer = self.__ftpconn.sendcmd("mdtm "+path)
    tokens = answer.split()
    return tokens[-1]
782
def send_cmd(self,cmd):
    """
    Send a raw command over the FTP connection.

    @param cmd: command string
    @return: the connection's sendcmd() result
    """
    connection = self.__ftpconn
    return connection.sendcmd(cmd)
785
def list_dir(self):
    """
    @return: list with the basename of every entry returned by the
        connection's nlst()
    """
    names = []
    for entry in self.__ftpconn.nlst():
        names.append(os.path.basename(entry))
    return names
788
def is_file_available(self, filename):
    """
    Tell whether the NLST listing contains an entry equal to filename.

    @param filename: name to look for
    @return: bool
    """
    matches = []

    def collect(entry):
        if entry == filename:
            matches.append(entry)

    self.__ftpconn.retrlines('NLST',collect)
    return bool(matches)
796
def delete_file(self,file):
    """
    Delete a remote file.

    @param file: remote file name
    @return: True when the server answers 250, or refuses with a 550
        permanent error; False otherwise
    """
    try:
        answer = self.__ftpconn.delete(file)
    except self.ftplib.error_perm as e:
        # 550 is reported as success, any other permanent error is not
        return e.args[0][:3] == '550'
    return answer.startswith("250")
807
def recursive_mkdir(self, mypath):
    """
    Create every component of mypath, one level at a time, skipping
    the ones is_file_available() reports as existing.

    @param mypath: "/"-separated remote path
    @raise ftplib.error_perm: on "permission denied" answers and on
        any permanent error other than 550
    """
    partial = ""
    for component in mypath.split("/"):
        if not component:
            continue
        partial = os.path.join(partial,component)
        if self.is_file_available(partial):
            continue
        try:
            self.mkdir(partial)
        except self.ftplib.error_perm as e:
            message = e.args[0]
            denied = message.lower().find("permission denied") != -1
            if denied or (message[:3] != '550'):
                raise
821
def mkdir(self,directory):
    """
    Create a remote directory.

    @param directory: remote directory name
    @return: the connection's mkd() result
    """
    connection = self.__ftpconn
    return connection.mkd(directory)
824
def upload_file(self, file, ascii = False):
    """
    Upload a local file into the current remote directory. Data is
    stored as "<name>.tmp" and renamed once the transfer is over.
    On any exception the host is reconnected, remote leftovers are
    deleted and the upload is retried, 10 attempts at most.

    @param file: local file path
    @param ascii: bool, use the connection's storlines() instead of
        the binary transfer with progress reporting
    @return: True when the server answer contains "226", False when it
        does not, None when all the attempts failed
    """

    # this function also supports callback, because storbinary doesn't
    def stor_with_progress(cmd, fp):
        ''' Store a file in binary mode. Our version supports a callback function'''
        self.__ftpconn.voidcmd('TYPE I')
        conn = self.__ftpconn.transfercmd(cmd)
        while True:
            chunk = fp.readline()
            if not chunk:
                break
            conn.sendall(chunk)
            self.updateProgress(len(chunk))
        conn.close()

        # that's another workaround
        #return "226"
        try:
            return self.__ftpconn.voidresp()
        except:
            self.reconnect_host()
            return "226"

    for tries in range(1, 11):

        filename = os.path.basename(file)
        tmp_name = filename+".tmp"
        self.__init_vars()
        self.__start_speed_counter()
        try:

            with open(file,"r") as f:

                self.__filesize = round(float(self.entropyTools.get_file_size(file))/1024,1)
                self.__filekbcount = 0

                # delete old one, if exists
                self.delete_file(tmp_name)

                if ascii:
                    rc = self.__ftpconn.storlines("STOR "+tmp_name,f)
                else:
                    rc = stor_with_progress("STOR "+tmp_name, f)

                # now we can rename the file with its original name
                self.rename_file(tmp_name,filename)

            # upload complete?
            return rc.find("226") != -1

        except Exception as e: # connection reset by peer

            self.entropyTools.print_traceback()
            mytxt = red("%s: %s, %s... #%s") % (
                _("Upload issue"),
                e,
                _("retrying"),
                tries+1,
            )
            self.Entropy.updateProgress(
                mytxt,
                importance = 1,
                type = "warning",
                header = " "
            )
            self.reconnect_host() # reconnect
            self.delete_file(filename)
            self.delete_file(tmp_name)

        finally:
            self.__stop_speed_counter()
def download_file(self, filename, downloaddir, ascii = False):
    """
    Download a file from the current remote directory. On any
    exception the host is reconnected and the download is retried,
    10 attempts at most.

    @param filename: remote file name, also used as local file name
    @param downloaddir: local directory the file is written into
    @param ascii: bool, transfer in text mode, line by line
    @return: True when the server answer contains "226", False when it
        does not or the remote file is not available, None when all
        the attempts failed
    """

    tries = 10
    while tries:
        tries -= 1

        self.__init_vars()
        self.__start_speed_counter()
        try:

            self.__filekbcount = 0
            # get the file size
            self.__filesize = self.get_file_size_compat(filename)
            if (self.__filesize):
                self.__filesize = round(float(int(self.__filesize))/1024,1)
                if (self.__filesize == 0):
                    self.__filesize = 1
            elif not self.is_file_available(filename):
                return False
            else:
                self.__filesize = 0

            local_path = downloaddir+"/"+filename
            if not ascii:
                f = open(local_path,"wb")
            else:
                f = open(local_path,"w")
            # the handle must be released even when the transfer fails
            try:
                if not ascii:

                    def df_up(buf):
                        # writing file buffer
                        f.write(buf)
                        self.updateProgress(len(buf))

                    rc = self.__ftpconn.retrbinary('RETR '+filename, df_up, 1024)
                else:

                    def write_line(line):
                        # retrlines hands over each line without its
                        # line ending: put it back
                        f.write(line + "\n")

                    rc = self.__ftpconn.retrlines('RETR '+filename, write_line)
                f.flush()
            finally:
                f.close()

            if rc.find("226") != -1: # download complete
                return True
            return False

        except Exception as e: # connection reset by peer

            self.entropyTools.print_traceback()
            mytxt = red("%s: %s, %s... #%s") % (
                _("Download issue"),
                e,
                _("retrying"),
                tries+1,
            )
            self.Entropy.updateProgress(
                mytxt,
                importance = 1,
                type = "warning",
                header = " "
            )
            self.reconnect_host() # reconnect

        finally:
            self.__stop_speed_counter()

# also used to move files
def rename_file(self, fromfile, tofile):
    """
    Rename (or move) a remote file.

    @param fromfile: current remote name
    @param tofile: new remote name
    @return: the connection's rename() result
    """
    return self.__ftpconn.rename(fromfile,tofile)
962
def get_file_md5(self, filename):
    """
    Ask the server for the md5 of a remote file through "SITE MD5".

    @param filename: remote file name
    @return: the text between the first "-" and the first tab of the
        first answer line; None when the command is refused or the
        answer has an unexpected layout
    """
    # PROFTPD with mod_md5 supports it!
    try:
        answer = self.__ftpconn.sendcmd("SITE MD5 %s" % (filename,))
    except self.ftplib.error_perm:
        return None # not supported
    try:
        first_line = answer.split("\n")[0]
        first_field = first_line.split("\t")[0]
        return first_field.split("-")[1]
    except (IndexError, TypeError,): # wrong output
        return None
973
def get_file_size(self, filename):
    """
    @param filename: remote file name
    @return: the connection's size() result for filename
    """
    connection = self.__ftpconn
    return connection.size(filename)
976
def get_file_size_compat(self, filename):
    """
    Read the size of a remote file out of the answer to "stat".

    @param filename: remote file name
    @return: fifth whitespace-separated column (as string) of the
        first answer line ending with filename; "" when the command
        fails temporarily or no such line is found
    """
    try:
        answer = self.__ftpconn.sendcmd("stat %s" % (filename,))
    except self.ftplib.error_temp:
        return ""
    for line in answer.split("\n"):
        fields = line.split()
        # blank or short lines have no size column to read
        if len(fields) > 4 and fields[-1] == filename:
            return fields[4]
    return ""
986
def get_raw_list(self):
    """
    @return: list with every line the connection's dir() hands to its
        callback
    """
    lines = []
    self.__ftpconn.dir(lines.append)
    return lines
def close(self):
    """
    Quit the FTP session, ignoring the errors of a connection that is
    already broken.
    """
    # AttributeError is raised when socket gets trashed
    # EOFError is raised when the connection breaks
    # timeout, who cares!
    ignored = (
        EOFError,
        AttributeError,
        self.socket.timeout,
        self.ftplib.error_reply,
    )
    try:
        self.__ftpconn.quit()
    except ignored:
        pass
1002
def __start_speed_counter(self):
    """
    Create and start the scheduled task that periodically calls the
    speed update routine.
    """
    updater = TimeScheduled(
        self.__transferpollingtime,
        self.__update_speed,
    )
    self.__speed_updater = updater
    self.__speed_updater.start()
1009
def __stop_speed_counter(self):
    """
    Kill the speed update task, if one was ever started.
    """
    if self.__speed_updater == None:
        return
    self.__speed_updater.kill()
1013
def __update_speed(self):
    """
    Periodic task body: advance the elapsed time by one polling
    interval and recompute transfer rate and remaining time.
    """
    self.__elapsed += self.__transferpollingtime
    # we have the diff size
    rate = (self.__transfersize-self.__startingposition) / self.__elapsed
    if rate < 0:
        rate = 0
    self.__datatransfer = rate
    try:
        total_bytes = int(round(self.__filesize*1024,0))
        done_bytes = int(round(self.__transfersize,0))
        self.__time_remaining_secs = int(
            round((total_bytes-done_bytes)/self.__datatransfer,0))
        self.__time_remaining = \
            self.entropyTools.convert_seconds_to_fancy_output(
                self.__time_remaining_secs)
    except:
        # e.g. zero transfer rate
        self.__time_remaining = "(%s)" % (_("infinite"),)
1025
def updateProgress(self, buf_len):
    """
    Account for a transferred chunk and, when the percentage grew by
    more than one point, print a transfer status line.

    @param buf_len: size in bytes of the chunk just transferred
    """
    # get the buffer size
    self.__filekbcount += float(buf_len)/1024
    self.__transfersize += buf_len
    # create percentage
    percentage = 100.0
    if self.__filesize >= 1:
        percentage = round((round(self.__filekbcount,1)/self.__filesize)*100,1)
    currentprogress = percentage
    done_kb = round(self.__filekbcount,1)

    worth_printing = (currentprogress > self.__oldprogress+1.0) and \
        (percentage < 100.1) and \
        (done_kb <= self.__filesize)
    if not worth_printing:
        return

    percentage_txt = str(percentage)+"%"
    # create text
    mytxt = _("Transfer status")
    currentText = brown(" <-> %s: " % (mytxt,)) + \
        darkgreen(str(done_kb)) + "/" + red(str(self.__filesize)) + " kB " + \
        brown("[") + str(percentage_txt) + brown("]") + " " + self.__time_remaining + \
        " " + self.entropyTools.bytes_into_human(self.__datatransfer) + "/"+_("sec")
    # WARN: re-enabled updateProgress, this may cause slowdowns
    # print_info(currentText, back = True)
    self.Entropy.updateProgress(currentText, back = True)
    self.__oldprogress = currentprogress
1051 1052
1053 -class FtpServerHandler:
1054 1055 import entropy.tools as entropyTools
def __init__(self, ftp_interface, entropy_interface, uris, files_to_upload,
    download = False, remove = False, ftp_basedir = None, local_basedir = None,
    critical_files = None, use_handlers = False, handlers_data = None, repo = None):
    """
    @param ftp_interface: stored as self.FtpInterface
    @param entropy_interface: stored as self.Entropy; repository
        defaults, settings and database paths are read from it
    @param uris: list of URIs
    @param files_to_upload: list, or dict whose sorted keys are used
    @param download: bool, forced to False when remove is True
    @param remove: bool
    @param ftp_basedir: remote base directory, defaults to the remote
        database relative path of the repository joined with the
        configured branch
    @param local_basedir: local base directory, defaults to the
        directory of the repository local database file
    @param critical_files: list, a new empty one when omitted
    @param use_handlers: bool, forced to False when remove is True
    @param handlers_data: dict, copied; empty when omitted
    @param repo: repository identifier, defaults to the default
        repository of entropy_interface
    @raise InvalidDataType: uris is not a list, or files_to_upload is
        neither a list nor a dict
    """
    self.FtpInterface = ftp_interface
    self.Entropy = entropy_interface
    if not isinstance(uris,list):
        raise InvalidDataType("InvalidDataType: %s" % (_("uris must be a list instance"),))
    if not isinstance(files_to_upload,(list,dict)):
        raise InvalidDataType("InvalidDataType: %s" % (
                _("files_to_upload must be a list or dict instance"),
            )
        )
    self.uris = uris
    if isinstance(files_to_upload,list):
        self.myfiles = files_to_upload[:]
    else:
        self.myfiles = sorted(files_to_upload)
    self.download = download
    self.remove = remove
    self.repo = repo
    if self.repo is None:
        self.repo = self.Entropy.default_repository
    self.use_handlers = use_handlers
    if self.remove:
        self.download = False
        self.use_handlers = False
    if not ftp_basedir:
        # default to database directory; use the resolved repository,
        # consistently with local_basedir below
        branch = self.Entropy.SystemSettings['repositories']['branch']
        my_path = os.path.join(
            self.Entropy.get_remote_database_relative_path(self.repo),
            branch)
        self.ftp_basedir = unicode(my_path)
    else:
        self.ftp_basedir = unicode(ftp_basedir)
    if not local_basedir:
        # default to database directory
        self.local_basedir = os.path.dirname(self.Entropy.get_local_database_file(self.repo))
    else:
        self.local_basedir = unicode(local_basedir)
    # defaults are created per instance: a shared default list or dict
    # would leak state between instances
    if critical_files is None:
        critical_files = []
    self.critical_files = critical_files
    if handlers_data is None:
        handlers_data = {}
    self.handlers_data = handlers_data.copy()
1097
def handler_verify_upload(self, local_filepath, uri, counter, maxcount,
    tries, remote_md5 = None):
    """
    Verify that a transferred file matches the local one, reporting the
    outcome through self.Entropy.updateProgress().

    Two checks are attempted, in this order:
      1. remote_md5, when it is a string, is compared against the md5 of
         local_filepath; a match returns True immediately.
      2. when self.use_handlers is set, the checksum returned by
         self.Entropy.get_remote_package_checksum() is compared instead.

    @param local_filepath: path of the local file
    @param uri: mirror URI (only its host part is shown in messages)
    @param counter: index of the current file, for messages
    @param maxcount: total number of files, for messages
    @param tries: current attempt number, for messages
    @keyword remote_md5: md5 string reported by the server, or None
    @return: True if verification succeeded or could not be performed,
        False if a checksum mismatch or a missing remote file was detected
    @raise KeyError: if handlers are enabled and self.handlers_data has
        no 'branch' key
    """
    crippled_uri = self.entropyTools.extract_ftp_host_from_uri(uri)
    filename = os.path.basename(local_filepath)

    self.Entropy.updateProgress(
        "[%s|#%s|(%s/%s)] %s: %s" % (
            blue(crippled_uri),
            darkgreen(str(tries)),
            blue(str(counter)),
            bold(str(maxcount)),
            darkgreen(_("verifying upload (if supported)")),
            blue(filename),
        ),
        importance = 0,
        type = "info",
        header = red(" @@ "),
        back = True
    )

    def report(verdict, warning = False):
        # single place emitting the "digest verification" status line;
        # informational lines use a red header, warnings a brown one
        if warning:
            level, header = "warning", brown(" @@ ")
        else:
            level, header = "info", red(" @@ ")
        self.Entropy.updateProgress(
            "[%s|#%s|(%s/%s)] %s: %s: %s" % (
                blue(crippled_uri),
                darkgreen(str(tries)),
                blue(str(counter)),
                bold(str(maxcount)),
                blue(_("digest verification")),
                filename,
                verdict,
            ),
            importance = 0,
            type = level,
            header = header
        )

    valid_remote_md5 = True
    # if remote server supports MD5 commands, remote_md5 is filled
    if isinstance(remote_md5, basestring):
        valid_md5 = self.entropyTools.is_valid_md5(remote_md5)
        ckres = False
        if valid_md5: # seems valid
            ckres = self.entropyTools.compare_md5(local_filepath,
                remote_md5)
        if ckres:
            report(darkgreen(_("so far, so good!")))
            return True
        if not valid_md5:
            # malformed md5: not counted as a failure, handlers (if
            # enabled) get a chance below
            report(bold(_("malformed md5 provided to function")),
                warning = True)
        else:
            # well-formed md5 that does not match the local file
            report(bold(_("remote md5 is invalid")), warning = True)
            valid_remote_md5 = False

    if not self.use_handlers:
        # handlers usage is disabled, the remote md5 verdict is final
        return valid_remote_md5

    checksum = self.Entropy.get_remote_package_checksum(
        self.repo,
        filename,
        self.handlers_data['branch']
    )
    if checksum is None:
        report(darkred(_("not supported")))
        return valid_remote_md5

    if isinstance(checksum, bool) and not checksum:
        report(bold(_("file not found")), warning = True)
        return False

    if self.entropyTools.is_valid_md5(checksum):
        if self.entropyTools.compare_md5(local_filepath, checksum):
            report(darkgreen(_("so far, so good!")))
            return True
        report(darkred(_("invalid checksum")), warning = True)
        return False

    # neither None, False nor a well-formed md5: fall back to the
    # verdict obtained from remote_md5
    report(darkred(_("unknown data returned")), warning = True)
    return valid_remote_md5
1268
def go(self):
    """
    Run the configured action (upload, download or remove) for every
    entry of self.myfiles on every mirror listed in self.uris.

    Each file is attempted up to five times per mirror. Uploads are
    verified through handler_verify_upload(). When a file keeps failing:
      - if it is not in self.critical_files, it is skipped;
      - otherwise the mirror is recorded in broken_uris and processing
        moves on to the next mirror.

    If opening a connection raises ConnectionError, the traceback is
    printed and the method returns immediately with errors set to True;
    remaining mirrors are not tried and that uri is not added to
    broken_uris.

    @return: tuple (errors, fine_uris, broken_uris) where errors is a
        bool, fine_uris is a set of uris fully processed and broken_uris
        is a set of (uri, last return value) tuples
    """
    max_tries = 5
    broken_uris = set()
    fine_uris = set()
    errors = False
    action = 'upload'
    if self.download:
        action = 'download'
    elif self.remove:
        action = 'remove'
    # naive action + "ing" would render "removeing"
    if action == 'remove':
        action_ing = 'removing'
    else:
        action_ing = action + "ing"

    for uri in self.uris:

        crippled_uri = self.entropyTools.extract_ftp_host_from_uri(uri)
        self.Entropy.updateProgress(
            "[%s|%s] %s..." % (
                blue(crippled_uri),
                brown(action),
                blue(_("connecting to mirror")),
            ),
            importance = 0,
            type = "info",
            header = blue(" @@ ")
        )
        try:
            ftp = self.FtpInterface(uri, self.Entropy)
        except ConnectionError:
            self.entropyTools.print_traceback()
            return True, fine_uris, broken_uris # issues

        mirror_failed = False
        # the connection is always released, even if a transfer raises
        try:
            branch = self.Entropy.SystemSettings['repositories']['branch']
            my_path = os.path.join(
                self.Entropy.get_remote_database_relative_path(self.repo),
                branch)
            # NOTE(review): the path shown here is the default database
            # path, while the directory actually entered below is
            # self.ftp_basedir; they differ when a custom ftp_basedir
            # was given. Confirm which one should be displayed.
            self.Entropy.updateProgress(
                "[%s|%s] %s %s..." % (
                    blue(crippled_uri),
                    brown(action),
                    blue(_("changing directory to")),
                    darkgreen(my_path),
                ),
                importance = 0,
                type = "info",
                header = blue(" @@ ")
            )

            ftp.set_cwd(self.ftp_basedir, dodir = True)
            maxcount = len(self.myfiles)
            counter = 0

            for mypath in self.myfiles:

                # start every file from the base working directory
                ftp.set_basedir()
                ftp.set_cwd(self.ftp_basedir, dodir = True)

                mycwd = None
                if isinstance(mypath, tuple):
                    # (subdirectory, path) entries
                    if len(mypath) < 2:
                        continue
                    mycwd = mypath[0]
                    mypath = mypath[1]
                    ftp.set_cwd(mycwd, dodir = True)

                syncer = ftp.upload_file
                myargs = [mypath]
                if self.download:
                    syncer = ftp.download_file
                    myargs = [os.path.basename(mypath), self.local_basedir]
                elif self.remove:
                    syncer = ftp.delete_file

                counter += 1
                tries = 0
                done = False
                lastrc = None
                while tries < max_tries:
                    tries += 1
                    self.Entropy.updateProgress(
                        "[%s|#%s|(%s/%s)] %s: %s" % (
                            blue(crippled_uri),
                            darkgreen(str(tries)),
                            blue(str(counter)),
                            bold(str(maxcount)),
                            blue(action_ing),
                            red(os.path.basename(mypath)),
                        ),
                        importance = 0,
                        type = "info",
                        header = red(" @@ ")
                    )
                    rc = syncer(*myargs)
                    if rc and action == 'upload':
                        # only uploads leave a remote file to verify;
                        # try with "SITE MD5 command first"
                        # proftpd's mod_md5 supports it
                        remote_md5 = ftp.get_file_md5(
                            os.path.basename(mypath))
                        rc = self.handler_verify_upload(mypath, uri,
                            counter, maxcount, tries,
                            remote_md5 = remote_md5)
                    if rc:
                        self.Entropy.updateProgress(
                            "[%s|#%s|(%s/%s)] %s %s: %s" % (
                                blue(crippled_uri),
                                darkgreen(str(tries)),
                                blue(str(counter)),
                                bold(str(maxcount)),
                                blue(action),
                                _("successful"),
                                red(os.path.basename(mypath)),
                            ),
                            importance = 0,
                            type = "info",
                            header = darkgreen(" @@ ")
                        )
                        done = True
                        break

                    self.Entropy.updateProgress(
                        "[%s|#%s|(%s/%s)] %s %s: %s" % (
                            blue(crippled_uri),
                            darkgreen(str(tries)),
                            blue(str(counter)),
                            bold(str(maxcount)),
                            blue(action),
                            brown(_("failed, retrying")),
                            red(os.path.basename(mypath)),
                        ),
                        importance = 0,
                        type = "warning",
                        header = brown(" @@ ")
                    )
                    lastrc = rc

                if done:
                    continue

                self.Entropy.updateProgress(
                    "[%s|(%s/%s)] %s %s: %s - %s: %s" % (
                        blue(crippled_uri),
                        blue(str(counter)),
                        bold(str(maxcount)),
                        blue(action),
                        darkred("failed, giving up"),
                        red(os.path.basename(mypath)),
                        _("error"),
                        lastrc,
                    ),
                    importance = 1,
                    type = "error",
                    header = darkred(" !!! ")
                )

                if mypath not in self.critical_files:
                    self.Entropy.updateProgress(
                        "[%s|(%s/%s)] %s: %s, %s..." % (
                            blue(crippled_uri),
                            blue(str(counter)),
                            bold(str(maxcount)),
                            blue(_("not critical")),
                            os.path.basename(mypath),
                            blue(_("continuing")),
                        ),
                        importance = 1,
                        type = "warning",
                        header = brown(" @@ ")
                    )
                    continue

                # critical file failed: give up on this mirror
                errors = True
                broken_uris.add((uri, lastrc))
                mirror_failed = True
                break

        finally:
            # close connection (exactly once per mirror)
            ftp.close()

        # a mirror that failed on a critical file must not be reported
        # as fine as well
        if not mirror_failed:
            fine_uris.add(uri)

    return errors, fine_uris, broken_uris
1442