Package entropy :: Module transceivers

Source Code for Module entropy.transceivers

   1  # -*- coding: utf-8 -*- 
   2  ''' 
   3      # DESCRIPTION: 
   4      # Entropy Object Oriented Interface 
   5   
   6      Copyright (C) 2007-2009 Fabio Erculiani 
   7   
   8      This program is free software; you can redistribute it and/or modify 
   9      it under the terms of the GNU General Public License as published by 
  10      the Free Software Foundation; either version 2 of the License, or 
  11      (at your option) any later version. 
  12   
  13      This program is distributed in the hope that it will be useful, 
  14      but WITHOUT ANY WARRANTY; without even the implied warranty of 
  15      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
  16      GNU General Public License for more details. 
  17   
  18      You should have received a copy of the GNU General Public License 
  19      along with this program; if not, write to the Free Software 
  20      Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
  21  ''' 
  22  from __future__ import with_statement 
  23  import os 
  24  import urllib2 
  25  import time 
  26  from entropy.const import etpConst 
  27  from entropy.output import TextInterface, darkblue, darkred, purple, blue, \ 
  28      brown, darkgreen, red, bold 
  29  from entropy.exceptions import * 
  30  from entropy.i18n import _ 
  31  from entropy.misc import TimeScheduled, ParallelTask 
  32  from entropy.core import SystemSettings 
  33   
34 -class UrlFetcher:
35
36 - def __init__(self, url, path_to_save, checksum = True, 37 show_speed = True, resume = True, 38 abort_check_func = None, disallow_redirect = False, 39 thread_stop_func = None, speed_limit = None, 40 OutputInterface = None):
41 42 self.__system_settings = SystemSettings() 43 if speed_limit == None: 44 speed_limit = self.__system_settings['repositories']['transfer_limit'] 45 46 self.progress = None 47 import entropy.tools as entropyTools 48 import socket 49 self.entropyTools, self.socket = entropyTools, socket 50 self.__th_id = 0 51 self.__resume = resume 52 self.__url = self.__encode_url(url) 53 self.__path_to_save = path_to_save 54 self.__checksum = checksum 55 self.__show_speed = show_speed 56 self.__abort_check_func = abort_check_func 57 self.__thread_stop_func = thread_stop_func 58 self.__disallow_redirect = disallow_redirect 59 self.__speedlimit = speed_limit # kbytes/sec 60 self.__existed_before = False 61 self.localfile = None 62 63 # important to have this here too 64 self.__datatransfer = 0 65 self.__resumed = False 66 67 uname = os.uname() 68 self.user_agent = "Entropy/%s (compatible; %s; %s: %s %s %s)" % ( 69 etpConst['entropyversion'], 70 "Entropy", 71 os.path.basename(self.__url), 72 uname[0], 73 uname[4], 74 uname[2], 75 ) 76 self.__extra_header_data = {} 77 self.__Output = OutputInterface 78 if self.__Output == None: 79 self.__Output = TextInterface() 80 elif not hasattr(self.__Output,'updateProgress'): 81 mytxt = _("Output interface passed doesn't have the updateProgress method") 82 raise IncorrectParameter("IncorrectParameter: %s" % (mytxt,)) 83 elif not callable(self.__Output.updateProgress): 84 mytxt = _("Output interface passed doesn't have the updateProgress method") 85 raise IncorrectParameter("IncorrectParameter: %s" % (mytxt,))
86
87 - def _init_vars(self):
88 89 self.__resumed = False 90 self.__buffersize = 8192 91 self.__status = None 92 self.__remotefile = None 93 self.__downloadedsize = 0 94 self.__average = 0 95 self.__remotesize = 0 96 self.__oldaverage = 0.0 97 # transfer status data 98 self.__startingposition = 0 99 self.__datatransfer = 0 100 self.__time_remaining = "(infinite)" 101 self.__time_remaining_secs = 0 102 self.__elapsed = 0.0 103 self.__updatestep = 0.2 104 self.__transferpollingtime = float(1)/4 105 self.__existed_before = False 106 if os.path.lexists(self.__path_to_save): 107 self.__existed_before = True 108 self.__setup_resume_support() 109 self._setup_proxy()
110
111 - def __setup_resume_support(self):
112 113 # if client uses this instance more than 114 # once, make sure we close previously opened 115 # files. 116 if isinstance(self.localfile, file): 117 try: 118 self.localfile.flush() 119 self.localfile.close() 120 except (IOError, OSError,): 121 pass 122 123 # resume support 124 if os.path.isfile(self.__path_to_save) and \ 125 os.access(self.__path_to_save,os.W_OK) and self.__resume: 126 127 self.localfile = open(self.__path_to_save,"awb") 128 self.localfile.seek(0,os.SEEK_END) 129 self.__startingposition = int(self.localfile.tell()) 130 self.__resumed = True 131 132 elif os.path.lexists(self.__path_to_save) and not \ 133 self.entropyTools.is_valid_path(self.__path_to_save): 134 try: 135 os.remove(self.__path_to_save) 136 except OSError: # I won't stop you here 137 pass 138 self.localfile = open(self.__path_to_save,"wb")
139
140 - def _setup_proxy(self):
141 # setup proxy, doing here because config is dynamic 142 mydict = {} 143 proxy_data = self.__system_settings['system']['proxy'] 144 if proxy_data['ftp']: 145 mydict['ftp'] = proxy_data['ftp'] 146 if proxy_data['http']: 147 mydict['http'] = proxy_data['http'] 148 if mydict: 149 mydict['username'] = proxy_data['username'] 150 mydict['password'] = proxy_data['password'] 151 self.entropyTools.add_proxy_opener(urllib2, mydict) 152 else: 153 # unset 154 urllib2._opener = None
155
156 - def __encode_url(self, url):
157 import urllib 158 url = os.path.join(os.path.dirname(url), 159 urllib.quote(os.path.basename(url))) 160 return url
161
162 - def set_id(self, th_id):
163 self.__th_id = th_id
164
165 - def download(self):
166 167 self._init_vars() 168 self.speedUpdater = TimeScheduled( 169 self.__transferpollingtime, 170 self.__update_speed, 171 ) 172 self.speedUpdater.start() 173 # set timeout 174 self.socket.setdefaulttimeout(20) 175 176 if self.__url.startswith("http://"): 177 headers = { 'User-Agent' : self.user_agent } 178 req = urllib2.Request(self.__url, self.__extra_header_data, headers) 179 else: 180 req = self.__url 181 182 u_agent_error = False 183 while 1: 184 # get file size if available 185 try: 186 self.__remotefile = urllib2.urlopen(req) 187 except KeyboardInterrupt: 188 self.__close(False) 189 raise 190 except urllib2.HTTPError, e: 191 if (e.code == 405) and not u_agent_error: 192 # server doesn't like our user agent 193 req = self.__url 194 u_agent_error = True 195 continue 196 self.__close(True) 197 self.__status = "-3" 198 return self.__status 199 except: 200 self.__close(True) 201 self.__status = "-3" 202 return self.__status 203 break 204 205 try: 206 self.__remotesize = int(self.__remotefile.headers.get( 207 "content-length")) 208 self.__remotefile.close() 209 except KeyboardInterrupt: 210 self.__close(False) 211 raise 212 except: 213 pass 214 215 # handle user stupidity 216 try: 217 request = self.__url 218 if ((self.__startingposition > 0) and (self.__remotesize > 0)) \ 219 and (self.__startingposition < self.__remotesize): 220 221 try: 222 request = urllib2.Request( 223 self.__url, 224 headers = { 225 "Range" : "bytes=" + \ 226 str(self.__startingposition) + "-" + \ 227 str(self.__remotesize) 228 } 229 ) 230 except KeyboardInterrupt: 231 self.__close(False) 232 raise 233 except: 234 pass 235 elif (self.__startingposition == self.__remotesize): 236 # all fine then! 237 self.__close(False) 238 return self.__prepare_return() 239 else: 240 self.localfile = open(self.__path_to_save,"wb") 241 self.__remotefile = urllib2.urlopen(request) 242 except KeyboardInterrupt: 243 self.__close(False) 244 raise 245 except: 246 self.__close(True) 247 self.__status = "-3" 248 return self.__status 249 250 if self.__remotesize > 0: 251 self.__remotesize = float(int(self.__remotesize))/1024 252 253 if self.__disallow_redirect and \ 254 (self.__url != self.__remotefile.geturl()): 255 256 self.__close(True) 257 self.__status = "-3" 258 return self.__status 259 260 while 1: 261 try: 262 rsx = self.__remotefile.read(self.__buffersize) 263 if rsx == '': break 264 if self.__abort_check_func != None: 265 self.__abort_check_func() 266 if self.__thread_stop_func != None: 267 self.__thread_stop_func() 268 except KeyboardInterrupt: 269 self.__close(False) 270 raise 271 except self.socket.timeout: 272 self.__close(False) 273 self.__status = "-4" 274 return self.__status 275 except: 276 # python 2.4 timeouts go here 277 self.__close(True) 278 self.__status = "-3" 279 return self.__status 280 self.__commit(rsx) 281 if self.__show_speed: 282 self.handle_statistics(self.__th_id, self.__downloadedsize, 283 self.__remotesize, self.__average, self.__oldaverage, 284 self.__updatestep, self.__show_speed, self.__datatransfer, 285 self.__time_remaining, self.__time_remaining_secs 286 ) 287 self.updateProgress() 288 self.__oldaverage = self.__average 289 if self.__speedlimit: 290 while self.__datatransfer > self.__speedlimit*1024: 291 time.sleep(0.1) 292 if self.__show_speed: 293 self.updateProgress() 294 self.__oldaverage = self.__average 295 296 # kill thread 297 self.__close(False) 298 return self.__prepare_return()
299 300
301 - def __prepare_return(self):
302 if self.__checksum: 303 self.__status = self.entropyTools.md5sum(self.__path_to_save) 304 return self.__status 305 self.__status = "-2" 306 return self.__status
307
308 - def __commit(self, mybuffer):
309 # writing file buffer 310 self.localfile.write(mybuffer) 311 # update progress info 312 self.__downloadedsize = self.localfile.tell() 313 kbytecount = float(self.__downloadedsize)/1024 314 self.__average = int((kbytecount/self.__remotesize)*100)
315
316 - def __close(self, errored):
317 try: 318 if isinstance(self.localfile, file): 319 self.localfile.flush() 320 self.localfile.close() 321 except IOError: 322 pass 323 if (not self.__existed_before) and errored: 324 try: 325 os.remove(self.__path_to_save) 326 except OSError: 327 pass 328 try: 329 self.__remotefile.close() 330 except: 331 pass 332 self.speedUpdater.kill() 333 self.socket.setdefaulttimeout(2)
334
335 - def __update_speed(self):
336 self.__elapsed += self.__transferpollingtime 337 # we have the diff size 338 x_delta = self.__downloadedsize - self.__startingposition 339 self.__datatransfer = x_delta / self.__elapsed 340 if self.__datatransfer < 0: 341 self.__datatransfer = 0 342 try: 343 rounded_remote = int(round(self.__remotesize*1024,0)) 344 rounded_downloaded = int(round(self.__downloadedsize,0)) 345 x_delta = rounded_remote - rounded_downloaded 346 tx_round = int(round(x_delta/self.__datatransfer,0)) 347 self.__time_remaining_secs = tx_round 348 self.__time_remaining = self.entropyTools.convert_seconds_to_fancy_output(self.__time_remaining_secs) 349 except: 350 self.__time_remaining = "(%s)" % (_("infinite"),)
351
352 - def get_transfer_rate(self):
353 return self.__datatransfer
354
355 - def is_resumed(self):
356 return self.__resumed
357
358 - def handle_statistics(self, th_id, downloaded_size, total_size, 359 average, old_average, update_step, show_speed, data_transfer, 360 time_remaining, time_remaining_secs):
361 return
362
363 - def updateProgress(self):
364 365 mytxt = _("[F]") 366 eta_txt = _("ETA") 367 sec_txt = _("sec") # as in XX kb/sec 368 369 currentText = darkred(" %s: " % (mytxt,)) + \ 370 darkgreen(str(round(float(self.__downloadedsize)/1024,1))) + "/" + \ 371 red(str(round(self.__remotesize,1))) + " kB" 372 # create progress bar 373 barsize = 10 374 bartext = "[" 375 curbarsize = 1 376 if self.__average > self.__oldaverage+self.__updatestep: 377 averagesize = (self.__average*barsize)/100 378 while averagesize > 0: 379 curbarsize += 1 380 bartext += "=" 381 averagesize -= 1 382 bartext += ">" 383 diffbarsize = barsize-curbarsize 384 while diffbarsize > 0: 385 bartext += " " 386 diffbarsize -= 1 387 if self.__show_speed: 388 bartext += "] => %s" % (self.entropyTools.bytes_into_human(self.__datatransfer),) 389 bartext += "/%s : %s: %s" % (sec_txt,eta_txt,self.__time_remaining,) 390 else: 391 bartext += "]" 392 average = str(self.__average) 393 if len(average) < 2: 394 average = " "+average 395 currentText += " <-> "+average+"% "+bartext 396 self.__Output.updateProgress(currentText, back = True)
397
398 -class MultipleUrlFetcher:
399 400 import entropy.tools as entropyTools
401 - def __init__(self, url_path_list, checksum = True, 402 show_speed = True, resume = True, 403 abort_check_func = None, disallow_redirect = False, 404 OutputInterface = None, UrlFetcherClass = None):
405 """ 406 @param url_path_list list [(url,path_to_save,),...] 407 @param checksum bool return checksum data 408 @param show_speed bool show transfer speed on the output 409 @param resume bool enable resume support 410 @param abort_check_func callable function that could 411 raise exception and stop transfer 412 @param disallow_redirect bool disable automatic HTTP redirect 413 @param OutputInterface TextInterface instance used to 414 print instance output through a common interface 415 @param UrlFetcherClass, urlFetcher instance/interface used 416 """ 417 self.__system_settings = SystemSettings() 418 self.__url_path_list = url_path_list 419 self.__resume = resume 420 self.__checksum = checksum 421 self.__show_speed = show_speed 422 self.__abort_check_func = abort_check_func 423 self.__disallow_redirect = disallow_redirect 424 425 # important to have a declaration here 426 self.__data_transfer = 0 427 self.__average = 0 428 self.__old_average = 0 429 self.__time_remaining_sec = 0 430 431 self.__Output = OutputInterface 432 if self.__Output == None: 433 self.__Output = TextInterface() 434 elif not hasattr(self.__Output,'updateProgress'): 435 mytxt = _("Output interface passed doesn't have the updateProgress method") 436 raise IncorrectParameter("IncorrectParameter: %s" % (mytxt,)) 437 elif not callable(self.__Output.updateProgress): 438 mytxt = _("Output interface passed doesn't have the updateProgress method") 439 raise IncorrectParameter("IncorrectParameter: %s" % (mytxt,)) 440 441 self.__url_fetcher = UrlFetcherClass 442 if self.__url_fetcher == None: 443 self.__url_fetcher = UrlFetcher
444 445
446 - def __handle_threads_stop(self):
447 if self.__stop_threads: 448 raise InterruptError
449
450 - def _init_vars(self):
451 self.__progress_data = {} 452 self.__thread_pool = {} 453 self.__download_statuses = {} 454 self.__show_progress = False 455 self.__stop_threads = False 456 self.__first_refreshes = 50 457 self.__data_transfer = 0 458 self.__average = 0 459 self.__old_average = 0 460 self.__time_remaining_sec = 0
461
462 - def download(self):
463 self._init_vars() 464 465 th_id = 0 466 speed_limit = 0 467 dsl = self.__system_settings['repositories']['transfer_limit'] 468 if isinstance(dsl,int) and self.__url_path_list: 469 speed_limit = dsl/len(self.__url_path_list) 470 471 class MyFetcher(self.__url_fetcher): 472 473 def __init__(self, klass, multiple, *args, **kwargs): 474 klass.__init__(self, *args, **kwargs) 475 self.__multiple_fetcher = multiple
476 477 def updateProgress(self): 478 return self.__multiple_fetcher.updateProgress()
479 480 def handle_statistics(self, *args, **kwargs): 481 return self.__multiple_fetcher.handle_statistics(*args, 482 **kwargs) 483 484 for url, path_to_save in self.__url_path_list: 485 th_id += 1 486 downloader = MyFetcher(self.__url_fetcher, self, url, path_to_save, 487 checksum = self.__checksum, show_speed = self.__show_speed, 488 resume = self.__resume, 489 abort_check_func = self.__abort_check_func, 490 disallow_redirect = self.__disallow_redirect, 491 thread_stop_func = self.__handle_threads_stop, 492 speed_limit = speed_limit, 493 OutputInterface = self.__Output 494 ) 495 downloader.set_id(th_id) 496 497 def do_download(ds, th_id, downloader): 498 ds[th_id] = downloader.download() 499 500 t = ParallelTask(do_download, self.__download_statuses, th_id, downloader) 501 self.__thread_pool[th_id] = t 502 t.start() 503 self.show_download_files_info() 504 self.__show_progress = True 505 while len(self.__url_path_list) != len(self.__download_statuses): 506 try: 507 time.sleep(0.5) 508 except (SystemExit, KeyboardInterrupt,): 509 self.__stop_threads = True 510 raise 511 512 return self.__download_statuses 513
514 - def get_data_transfer(self):
515 return self.__data_transfer
516
517 - def get_average(self):
518 return self.__average
519
520 - def get_seconds_remaining(self):
521 return self.__time_remaining_sec
522
523 - def show_download_files_info(self):
524 count = 0 525 pl = self.__url_path_list[:] 526 self.__Output.updateProgress( 527 "%s: %s %s" % ( 528 darkblue(_("Aggregated download")), 529 darkred(str(len(pl))), 530 darkblue(_("items")), 531 ), 532 importance = 0, 533 type = "info", 534 header = purple(" ## ") 535 ) 536 for url, save_path in pl: 537 count += 1 538 fname = os.path.basename(url) 539 uri = self.entropyTools.spliturl(url)[1] 540 self.__Output.updateProgress( 541 "[%s] %s => %s" % ( 542 darkblue(str(count)), 543 darkgreen(uri), 544 blue(fname), 545 ), 546 importance = 0, 547 type = "info", 548 header = brown(" # ") 549 )
550
551 - def handle_statistics(self, th_id, downloaded_size, total_size, 552 average, old_average, update_step, show_speed, data_transfer, 553 time_remaining, time_remaining_secs):
554 data = { 555 'th_id': th_id, 556 'downloaded_size': downloaded_size, 557 'total_size': total_size, 558 'average': average, 559 'old_average': old_average, 560 'update_step': update_step, 561 'show_speed': show_speed, 562 'data_transfer': data_transfer, 563 'time_remaining': time_remaining, 564 'time_remaining_secs': time_remaining_secs, 565 } 566 self.__progress_data[th_id] = data
567
568 - def updateProgress(self):
569 570 eta_txt = _("ETA") 571 sec_txt = _("sec") # as in XX kb/sec 572 downloaded_size = 0 573 total_size = 0 574 time_remaining = 0 575 data_transfer = 0 576 update_step = 0 577 average = 100 578 pd = self.__progress_data.copy() 579 pdlen = len(pd) 580 581 # calculation 582 for th_id in sorted(pd): 583 data = pd.get(th_id) 584 downloaded_size += data.get('downloaded_size',0) 585 total_size += data.get('total_size',0) 586 data_transfer += data.get('data_transfer',0) 587 tr = data.get('time_remaining_secs',0) 588 if tr > 0: time_remaining += tr 589 update_step += data.get('update_step',0) 590 591 # total_size is in kbytes 592 # downloaded_size is in bytes 593 if total_size > 0: 594 average = int(float(downloaded_size/1024)/total_size * 100) 595 self.__data_transfer = data_transfer 596 self.__average = average 597 update_step = update_step/pdlen 598 self.__time_remaining_sec = time_remaining 599 time_remaining = self.entropyTools.convert_seconds_to_fancy_output(time_remaining) 600 601 if ((average > self.__old_average+update_step) or \ 602 (self.__first_refreshes > 0)) and self.__show_progress: 603 604 self.__first_refreshes -= 1 605 currentText = darkgreen(str(round(float(downloaded_size)/1024,1))) + "/" + \ 606 red(str(round(total_size,1))) + " kB" 607 # create progress bar 608 barsize = 10 609 bartext = "[" 610 curbarsize = 1 611 averagesize = (average*barsize)/100 612 while averagesize > 0: 613 curbarsize += 1 614 bartext += "=" 615 averagesize -= 1 616 bartext += ">" 617 diffbarsize = barsize-curbarsize 618 while diffbarsize > 0: 619 bartext += " " 620 diffbarsize -= 1 621 if self.__show_speed: 622 bartext += "] => %s" % (self.entropyTools.bytes_into_human(data_transfer),) 623 bartext += "/%s : %s: %s" % (sec_txt,eta_txt,time_remaining,) 624 else: 625 bartext += "]" 626 myavg = str(average) 627 if len(myavg) < 2: 628 myavg = " "+myavg 629 currentText += " <-> "+myavg+"% "+bartext+" " 630 self.__Output.updateProgress(currentText, back = True) 631 632 self.__old_average = average
633 634
635 -class FtpInterface:
636 637 # this must be run before calling the other functions
638 - def __init__(self, ftpuri, OutputInterface, verbose = True):
639 640 if not hasattr(OutputInterface,'updateProgress'): 641 mytxt = _("OutputInterface does not have an updateProgress method") 642 raise IncorrectParameter("IncorrectParameter: %s, (! %s !)" % (OutputInterface,mytxt,)) 643 elif not callable(OutputInterface.updateProgress): 644 mytxt = _("OutputInterface does not have an updateProgress method") 645 raise IncorrectParameter("IncorrectParameter: %s, (! %s !)" % (OutputInterface,mytxt,)) 646 647 import socket, ftplib 648 import entropy.tools as entropyTools 649 self.socket, self.ftplib, self.entropyTools = socket, ftplib, entropyTools 650 self.Entropy = OutputInterface 651 self.__verbose = verbose 652 self.__init_vars() 653 self.socket.setdefaulttimeout(60) 654 self.__ftpuri = ftpuri 655 self.__speed_updater = None 656 self.__currentdir = '.' 657 self.__ftphost = self.entropyTools.extract_ftp_host_from_uri(self.__ftpuri) 658 self.__ftpuser, self.__ftppassword, self.__ftpport, self.__ftpdir = self.entropyTools.extract_ftp_data(ftpuri) 659 660 count = 10 661 while 1: 662 count -= 1 663 try: 664 self.__ftpconn = self.ftplib.FTP(self.__ftphost) 665 break 666 except (self.socket.gaierror,), e: 667 raise ConnectionError('ConnectionError: %s' % (e,)) 668 except (self.socket.error,), e: 669 if not count: 670 raise ConnectionError('ConnectionError: %s' % (e,)) 671 continue 672 except: 673 if not count: raise 674 continue 675 676 if self.__verbose: 677 mytxt = _("connecting with user") 678 self.Entropy.updateProgress( 679 "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(self.__ftpuser),), 680 importance = 1, 681 type = "info", 682 header = darkgreen(" * ") 683 ) 684 try: 685 self.__ftpconn.login(self.__ftpuser,self.__ftppassword) 686 except self.ftplib.error_perm, e: 687 raise FtpError('FtpError: %s' % (e,)) 688 if self.__verbose: 689 mytxt = _("switching to") 690 self.Entropy.updateProgress( 691 "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(self.__ftpdir),), 692 importance = 1, 693 type = "info", 694 header = darkgreen(" * ") 695 ) 696 self.set_cwd(self.__ftpdir, dodir = True)
697
698 - def __init_vars(self):
699 self.__oldprogress = 0.0 700 self.__filesize = 0 701 self.__filekbcount = 0 702 self.__transfersize = 0 703 self.__startingposition = 0 704 self.__elapsed = 0.0 705 self.__time_remaining_secs = 0 706 self.__time_remaining = "(%s)" % (_("infinite"),) 707 self.__transferpollingtime = float(1)/4
708
709 - def set_basedir(self):
710 return self.set_cwd(self.__ftpdir)
711 712 # this can be used in case of exceptions
713 - def reconnect_host(self):
714 # import FTP modules 715 self.socket.setdefaulttimeout(60) 716 counter = 10 717 while 1: 718 counter -= 1 719 try: 720 self.__ftpconn = self.ftplib.FTP(self.__ftphost) 721 break 722 except: 723 if not counter: 724 raise 725 continue 726 if self.__verbose: 727 mytxt = _("reconnecting with user") 728 self.Entropy.updateProgress( 729 "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(self.__ftpuser),), 730 importance = 1, 731 type = "info", 732 header = darkgreen(" * ") 733 ) 734 self.__ftpconn.login(self.__ftpuser,self.__ftppassword) 735 if self.__verbose: 736 mytxt = _("switching to") 737 self.Entropy.updateProgress( 738 "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(self.__ftpdir),), 739 importance = 1, 740 type = "info", 741 header = darkgreen(" * ") 742 ) 743 self.set_cwd(self.__currentdir)
744
745 - def get_host(self):
746 return self.__ftphost
747
748 - def get_port(self):
749 return self.__ftpport
750
751 - def get_dir(self):
752 return self.__ftpdir
753
754 - def get_cwd(self):
755 pwd = self.__ftpconn.pwd() 756 return pwd
757
758 - def set_cwd(self, mydir, dodir = False):
759 try: 760 return self._set_cwd(mydir, dodir) 761 except self.ftplib.error_perm, e: 762 raise FtpError('FtpError: %s' % (e,))
763
764 - def _set_cwd(self, mydir, dodir = False):
765 if self.__verbose: 766 mytxt = _("switching to") 767 self.Entropy.updateProgress( 768 "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(mydir),), 769 importance = 1, 770 type = "info", 771 header = darkgreen(" * ") 772 ) 773 try: 774 self.__ftpconn.cwd(mydir) 775 except self.ftplib.error_perm, e: 776 if e[0][:3] == '550' and dodir: 777 self.recursive_mkdir(mydir) 778 self.__ftpconn.cwd(mydir) 779 else: 780 raise 781 self.__currentdir = self.get_cwd()
782
783 - def set_pasv(self,bool):
784 self.__ftpconn.set_pasv(bool)
785
786 - def set_chmod(self,chmodvalue,file):
787 return self.__ftpconn.voidcmd("SITE CHMOD "+str(chmodvalue)+" "+str(file))
788
789 - def get_file_mtime(self,path):
790 rc = self.__ftpconn.sendcmd("mdtm "+path) 791 return rc.split()[-1]
792
793 - def send_cmd(self,cmd):
794 return self.__ftpconn.sendcmd(cmd)
795
796 - def list_dir(self):
797 return [os.path.basename(x) for x in self.__ftpconn.nlst()]
798
799 - def is_file_available(self, filename):
800 xx = [] 801 def cb(x): 802 if x == filename: xx.append(x)
803 self.__ftpconn.retrlines('NLST',cb) 804 if xx: return True 805 return False
806
807 - def delete_file(self,file):
808 try: 809 rc = self.__ftpconn.delete(file) 810 except self.ftplib.error_perm, e: 811 if e[0][:3] == '550': 812 return True 813 return False # not found 814 if rc.startswith("250"): 815 return True 816 return False
817
818 - def recursive_mkdir(self, mypath):
819 mydirs = [x for x in mypath.split("/") if x] 820 mycurpath = "" 821 for mydir in mydirs: 822 mycurpath = os.path.join(mycurpath,mydir) 823 if not self.is_file_available(mycurpath): 824 try: 825 self.mkdir(mycurpath) 826 except self.ftplib.error_perm, e: 827 if e[0].lower().find("permission denied") != -1: 828 raise 829 elif e[0][:3] != '550': 830 raise
831
832 - def mkdir(self,directory):
833 return self.__ftpconn.mkd(directory)
834
835 - def upload_file(self, file, ascii = False):
836 837 # this function also supports callback, because storbinary doesn't 838 def advanced_stor(cmd, fp): 839 ''' Store a file in binary mode. Our version supports a callback function''' 840 self.__ftpconn.voidcmd('TYPE I') 841 conn = self.__ftpconn.transfercmd(cmd) 842 while 1: 843 buf = fp.readline() 844 if not buf: 845 break 846 conn.sendall(buf) 847 self.updateProgress(len(buf)) 848 conn.close() 849 850 # that's another workaround 851 #return "226" 852 try: 853 rc = self.__ftpconn.voidresp() 854 except: 855 self.reconnect_host() 856 return "226" 857 return rc
858 859 tries = 0 860 while tries < 10: 861 862 tries += 1 863 filename = os.path.basename(file) 864 self.__init_vars() 865 self.__start_speed_counter() 866 try: 867 868 with open(file,"r") as f: 869 870 self.__filesize = round(float(self.entropyTools.get_file_size(file))/1024,1) 871 self.__filekbcount = 0 872 873 # delete old one, if exists 874 self.delete_file(filename+".tmp") 875 876 if ascii: 877 rc = self.__ftpconn.storlines("STOR "+filename+".tmp",f) 878 else: 879 rc = advanced_stor("STOR "+filename+".tmp", f) 880 881 # now we can rename the file with its original name 882 self.rename_file(filename+".tmp",filename) 883 884 if rc.find("226") != -1: # upload complete 885 return True 886 return False 887 888 except Exception, e: # connection reset by peer 889 890 self.entropyTools.print_traceback() 891 mytxt = red("%s: %s, %s... #%s") % ( 892 _("Upload issue"), 893 e, 894 _("retrying"), 895 tries+1, 896 ) 897 self.Entropy.updateProgress( 898 mytxt, 899 importance = 1, 900 type = "warning", 901 header = " " 902 ) 903 self.reconnect_host() # reconnect 904 self.delete_file(filename) 905 self.delete_file(filename+".tmp") 906 907 finally: 908 self.__stop_speed_counter() 909
910 - def download_file(self, filename, downloaddir, ascii = False):
911 912 def df_up(buf): 913 # writing file buffer 914 f.write(buf) 915 self.updateProgress(len(buf))
916 917 tries = 10 918 while tries: 919 tries -= 1 920 921 self.__init_vars() 922 self.__start_speed_counter() 923 try: 924 925 self.__filekbcount = 0 926 # get the file size 927 self.__filesize = self.get_file_size_compat(filename) 928 if (self.__filesize): 929 self.__filesize = round(float(int(self.__filesize))/1024,1) 930 if (self.__filesize == 0): 931 self.__filesize = 1 932 elif not self.is_file_available(filename): 933 return False 934 else: 935 self.__filesize = 0 936 if not ascii: 937 f = open(downloaddir+"/"+filename,"wb") 938 rc = self.__ftpconn.retrbinary('RETR '+filename, df_up, 1024) 939 else: 940 f = open(downloaddir+"/"+filename,"w") 941 rc = self.__ftpconn.retrlines('RETR '+filename, f.write) 942 f.flush() 943 f.close() 944 if rc.find("226") != -1: # upload complete 945 return True 946 return False 947 948 except Exception, e: # connection reset by peer 949 950 self.entropyTools.print_traceback() 951 mytxt = red("%s: %s, %s... #%s") % ( 952 _("Download issue"), 953 e, 954 _("retrying"), 955 tries+1, 956 ) 957 self.Entropy.updateProgress( 958 mytxt, 959 importance = 1, 960 type = "warning", 961 header = " " 962 ) 963 self.reconnect_host() # reconnect 964 965 finally: 966 self.__stop_speed_counter() 967 968 # also used to move files
969 - def rename_file(self, fromfile, tofile):
970 rc = self.__ftpconn.rename(fromfile,tofile) 971 return rc
972
973 - def get_file_md5(self, filename):
974 # PROFTPD with mod_md5 supports it! 975 try: 976 rc_data = self.__ftpconn.sendcmd("SITE MD5 %s" % (filename,)) 977 except self.ftplib.error_perm: 978 return None # not supported 979 try: 980 return rc_data.split("\n")[0].split("\t")[0].split("-")[1] 981 except (IndexError, TypeError,): # wrong output 982 return None
983
984 - def get_file_size(self, filename):
985 return self.__ftpconn.size(filename)
986
987 - def get_file_size_compat(self, filename):
988 try: 989 data = [x.split() for x in self.__ftpconn.sendcmd("stat %s" % (filename,)).split("\n")] 990 except self.ftplib.error_temp: 991 return "" 992 for item in data: 993 if item[-1] == filename: 994 return item[4] 995 return ""
996
997 - def get_raw_list(self):
998 mybuffer = [] 999 def bufferizer(buf): 1000 mybuffer.append(buf)
1001 self.__ftpconn.dir(bufferizer) 1002 return mybuffer 1003
1004 - def close(self):
1005 try: 1006 self.__ftpconn.quit() 1007 except (EOFError,AttributeError,self.socket.timeout,self.ftplib.error_reply,): 1008 # AttributeError is raised when socket gets trashed 1009 # EOFError is raised when the connection breaks 1010 # timeout, who cares! 1011 pass
1012
1013 - def __start_speed_counter(self):
1014 self.__speed_updater = TimeScheduled( 1015 self.__transferpollingtime, 1016 self.__update_speed, 1017 ) 1018 self.__speed_updater.start()
1019
1020 - def __stop_speed_counter(self):
1021 if self.__speed_updater != None: 1022 self.__speed_updater.kill()
1023
1024 - def __update_speed(self):
1025 self.__elapsed += self.__transferpollingtime 1026 # we have the diff size 1027 self.__datatransfer = (self.__transfersize-self.__startingposition) / self.__elapsed 1028 if self.__datatransfer < 0: 1029 self.__datatransfer = 0 1030 try: 1031 self.__time_remaining_secs = int(round((int(round(self.__filesize*1024,0))-int(round(self.__transfersize,0)))/self.__datatransfer,0)) 1032 self.__time_remaining = self.entropyTools.convert_seconds_to_fancy_output(self.__time_remaining_secs) 1033 except: 1034 self.__time_remaining = "(%s)" % (_("infinite"),)
1035
1036 - def updateProgress(self, buf_len):
1037 # get the buffer size 1038 self.__filekbcount += float(buf_len)/1024 1039 self.__transfersize += buf_len 1040 # create percentage 1041 myUploadPercentage = 100.0 1042 if self.__filesize >= 1: 1043 myUploadPercentage = round((round(self.__filekbcount,1)/self.__filesize)*100,1) 1044 currentprogress = myUploadPercentage 1045 myUploadSize = round(self.__filekbcount,1) 1046 if (currentprogress > self.__oldprogress+1.0) and \ 1047 (myUploadPercentage < 100.1) and \ 1048 (myUploadSize <= self.__filesize): 1049 1050 myUploadPercentage = str(myUploadPercentage)+"%" 1051 # create text 1052 mytxt = _("Transfer status") 1053 currentText = brown(" <-> %s: " % (mytxt,)) + \ 1054 darkgreen(str(myUploadSize)) + "/" + red(str(self.__filesize)) + " kB " + \ 1055 brown("[") + str(myUploadPercentage) + brown("]") + " " + self.__time_remaining + \ 1056 " " + self.entropyTools.bytes_into_human(self.__datatransfer) + "/"+_("sec") 1057 # WARN: re-enabled updateProgress, this may cause slowdowns 1058 # print_info(currentText, back = True) 1059 self.Entropy.updateProgress(currentText, back = True) 1060 self.__oldprogress = currentprogress
1061 1062
1063 -class FtpServerHandler:
1064 1065 import entropy.tools as entropyTools
1066 - def __init__(self, ftp_interface, entropy_interface, uris, files_to_upload, 1067 download = False, remove = False, ftp_basedir = None, local_basedir = None, 1068 critical_files = [], use_handlers = False, handlers_data = {}, repo = None):
1069 1070 self.FtpInterface = ftp_interface 1071 self.Entropy = entropy_interface 1072 if not isinstance(uris,list): 1073 raise InvalidDataType("InvalidDataType: %s" % (_("uris must be a list instance"),)) 1074 if not isinstance(files_to_upload,(list,dict)): 1075 raise InvalidDataType("InvalidDataType: %s" % ( 1076 _("files_to_upload must be a list or dict instance"), 1077 ) 1078 ) 1079 self.uris = uris 1080 if isinstance(files_to_upload,list): 1081 self.myfiles = files_to_upload[:] 1082 else: 1083 self.myfiles = sorted([x for x in files_to_upload]) 1084 self.download = download 1085 self.remove = remove 1086 self.repo = repo 1087 if self.repo == None: 1088 self.repo = self.Entropy.default_repository 1089 self.use_handlers = use_handlers 1090 if self.remove: 1091 self.download = False 1092 self.use_handlers = False 1093 if not ftp_basedir: 1094 # default to database directory 1095 branch = self.Entropy.SystemSettings['repositories']['branch'] 1096 my_path = os.path.join(self.Entropy.get_remote_database_relative_path(repo), branch) 1097 self.ftp_basedir = unicode(my_path) 1098 else: 1099 self.ftp_basedir = unicode(ftp_basedir) 1100 if not local_basedir: 1101 # default to database directory 1102 self.local_basedir = os.path.dirname(self.Entropy.get_local_database_file(self.repo)) 1103 else: 1104 self.local_basedir = unicode(local_basedir) 1105 self.critical_files = critical_files 1106 self.handlers_data = handlers_data.copy()
1107
1108 - def handler_verify_upload(self, local_filepath, uri, counter, maxcount, 1109 tries, remote_md5 = None):
1110 1111 crippled_uri = self.entropyTools.extract_ftp_host_from_uri(uri) 1112 1113 self.Entropy.updateProgress( 1114 "[%s|#%s|(%s/%s)] %s: %s" % ( 1115 blue(crippled_uri), 1116 darkgreen(str(tries)), 1117 blue(str(counter)), 1118 bold(str(maxcount)), 1119 darkgreen(_("verifying upload (if supported)")), 1120 blue(os.path.basename(local_filepath)), 1121 ), 1122 importance = 0, 1123 type = "info", 1124 header = red(" @@ "), 1125 back = True 1126 ) 1127 1128 valid_remote_md5 = True 1129 # if remote server supports MD5 commands, remote_md5 is filled 1130 if isinstance(remote_md5, basestring): 1131 valid_md5 = self.entropyTools.is_valid_md5(remote_md5) 1132 ckres = False 1133 if valid_md5: # seems valid 1134 ckres = self.entropyTools.compare_md5(local_filepath, 1135 remote_md5) 1136 if ckres: 1137 self.Entropy.updateProgress( 1138 "[%s|#%s|(%s/%s)] %s: %s: %s" % ( 1139 blue(crippled_uri), 1140 darkgreen(str(tries)), 1141 blue(str(counter)), 1142 bold(str(maxcount)), 1143 blue(_("digest verification")), 1144 os.path.basename(local_filepath), 1145 darkgreen(_("so far, so good!")), 1146 ), 1147 importance = 0, 1148 type = "info", 1149 header = red(" @@ ") 1150 ) 1151 return True 1152 # ouch! 1153 elif not valid_md5: 1154 # mmmh... malformed md5, try with handlers 1155 self.Entropy.updateProgress( 1156 "[%s|#%s|(%s/%s)] %s: %s: %s" % ( 1157 blue(crippled_uri), 1158 darkgreen(str(tries)), 1159 blue(str(counter)), 1160 bold(str(maxcount)), 1161 blue(_("digest verification")), 1162 os.path.basename(local_filepath), 1163 bold(_("malformed md5 provided to function")), 1164 ), 1165 importance = 0, 1166 type = "warning", 1167 header = brown(" @@ ") 1168 ) 1169 else: # it's really bad! 1170 self.Entropy.updateProgress( 1171 "[%s|#%s|(%s/%s)] %s: %s: %s" % ( 1172 blue(crippled_uri), 1173 darkgreen(str(tries)), 1174 blue(str(counter)), 1175 bold(str(maxcount)), 1176 blue(_("digest verification")), 1177 os.path.basename(local_filepath), 1178 bold(_("remote md5 is invalid")), 1179 ), 1180 importance = 0, 1181 type = "warning", 1182 header = brown(" @@ ") 1183 ) 1184 valid_remote_md5 = False 1185 1186 if not self.use_handlers: 1187 # handlers usage is disabled 1188 return valid_remote_md5 # always valid 1189 1190 checksum = self.Entropy.get_remote_package_checksum( 1191 self.repo, 1192 os.path.basename(local_filepath), 1193 self.handlers_data['branch'] 1194 ) 1195 if checksum == None: 1196 self.Entropy.updateProgress( 1197 "[%s|#%s|(%s/%s)] %s: %s: %s" % ( 1198 blue(crippled_uri), 1199 darkgreen(str(tries)), 1200 blue(str(counter)), 1201 bold(str(maxcount)), 1202 blue(_("digest verification")), 1203 os.path.basename(local_filepath), 1204 darkred(_("not supported")), 1205 ), 1206 importance = 0, 1207 type = "info", 1208 header = red(" @@ ") 1209 ) 1210 return valid_remote_md5 1211 elif isinstance(checksum, bool) and not checksum: 1212 self.Entropy.updateProgress( 1213 "[%s|#%s|(%s/%s)] %s: %s: %s" % ( 1214 blue(crippled_uri), 1215 darkgreen(str(tries)), 1216 blue(str(counter)), 1217 bold(str(maxcount)), 1218 blue(_("digest verification")), 1219 os.path.basename(local_filepath), 1220 bold(_("file not found")), 1221 ), 1222 importance = 0, 1223 type = "warning", 1224 header = brown(" @@ ") 1225 ) 1226 return False 1227 elif self.entropyTools.is_valid_md5(checksum): 1228 # valid? checking 1229 ckres = self.entropyTools.compare_md5(local_filepath,checksum) 1230 if ckres: 1231 self.Entropy.updateProgress( 1232 "[%s|#%s|(%s/%s)] %s: %s: %s" % ( 1233 blue(crippled_uri), 1234 darkgreen(str(tries)), 1235 blue(str(counter)), 1236 bold(str(maxcount)), 1237 blue(_("digest verification")), 1238 os.path.basename(local_filepath), 1239 darkgreen(_("so far, so good!")), 1240 ), 1241 importance = 0, 1242 type = "info", 1243 header = red(" @@ ") 1244 ) 1245 return True 1246 else: 1247 self.Entropy.updateProgress( 1248 "[%s|#%s|(%s/%s)] %s: %s: %s" % ( 1249 blue(crippled_uri), 1250 darkgreen(str(tries)), 1251 blue(str(counter)), 1252 bold(str(maxcount)), 1253 blue(_("digest verification")), 1254 os.path.basename(local_filepath), 1255 darkred(_("invalid checksum")), 1256 ), 1257 importance = 0, 1258 type = "warning", 1259 header = brown(" @@ ") 1260 ) 1261 return False 1262 else: 1263 self.Entropy.updateProgress( 1264 "[%s|#%s|(%s/%s)] %s: %s: %s" % ( 1265 blue(crippled_uri), 1266 darkgreen(str(tries)), 1267 blue(str(counter)), 1268 bold(str(maxcount)), 1269 blue(_("digest verification")), 1270 os.path.basename(local_filepath), 1271 darkred(_("unknown data returned")), 1272 ), 1273 importance = 0, 1274 type = "warning", 1275 header = brown(" @@ ") 1276 ) 1277 return valid_remote_md5
1278
1279 - def go(self):
1280 1281 broken_uris = set() 1282 fine_uris = set() 1283 errors = False 1284 action = 'upload' 1285 if self.download: 1286 action = 'download' 1287 elif self.remove: 1288 action = 'remove' 1289 1290 for uri in self.uris: 1291 1292 crippled_uri = self.entropyTools.extract_ftp_host_from_uri(uri) 1293 self.Entropy.updateProgress( 1294 "[%s|%s] %s..." % ( 1295 blue(crippled_uri), 1296 brown(action), 1297 blue(_("connecting to mirror")), 1298 ), 1299 importance = 0, 1300 type = "info", 1301 header = blue(" @@ ") 1302 ) 1303 try: 1304 ftp = self.FtpInterface(uri, self.Entropy) 1305 except ConnectionError: 1306 self.entropyTools.print_traceback() 1307 return True,fine_uris,broken_uris # issues 1308 branch = self.Entropy.SystemSettings['repositories']['branch'] 1309 my_path = os.path.join(self.Entropy.get_remote_database_relative_path(self.repo), branch) 1310 self.Entropy.updateProgress( 1311 "[%s|%s] %s %s..." % ( 1312 blue(crippled_uri), 1313 brown(action), 1314 blue(_("changing directory to")), 1315 darkgreen(my_path), 1316 ), 1317 importance = 0, 1318 type = "info", 1319 header = blue(" @@ ") 1320 ) 1321 1322 ftp.set_cwd(self.ftp_basedir, dodir = True) 1323 maxcount = len(self.myfiles) 1324 counter = 0 1325 1326 for mypath in self.myfiles: 1327 1328 ftp.set_basedir() 1329 ftp.set_cwd(self.ftp_basedir, dodir = True) 1330 1331 mycwd = None 1332 if isinstance(mypath,tuple): 1333 if len(mypath) < 2: continue 1334 mycwd = mypath[0] 1335 mypath = mypath[1] 1336 ftp.set_cwd(mycwd, dodir = True) 1337 1338 syncer = ftp.upload_file 1339 myargs = [mypath] 1340 if self.download: 1341 syncer = ftp.download_file 1342 myargs = [os.path.basename(mypath),self.local_basedir] 1343 elif self.remove: 1344 syncer = ftp.delete_file 1345 1346 counter += 1 1347 tries = 0 1348 done = False 1349 lastrc = None 1350 while tries < 5: 1351 tries += 1 1352 self.Entropy.updateProgress( 1353 "[%s|#%s|(%s/%s)] %s: %s" % ( 1354 blue(crippled_uri), 1355 darkgreen(str(tries)), 1356 blue(str(counter)), 1357 bold(str(maxcount)), 1358 blue(action+"ing"), 1359 red(os.path.basename(mypath)), 1360 ), 1361 importance = 0, 1362 type = "info", 1363 header = red(" @@ ") 1364 ) 1365 rc = syncer(*myargs) 1366 if rc and not self.download: 1367 # try with "SITE MD5 command first" 1368 # proftpd's mod_md5 supports it 1369 remote_md5 = ftp.get_file_md5(os.path.basename(mypath)) 1370 rc = self.handler_verify_upload(mypath, uri, 1371 counter, maxcount, tries, remote_md5 = remote_md5) 1372 if rc: 1373 self.Entropy.updateProgress( 1374 "[%s|#%s|(%s/%s)] %s %s: %s" % ( 1375 blue(crippled_uri), 1376 darkgreen(str(tries)), 1377 blue(str(counter)), 1378 bold(str(maxcount)), 1379 blue(action), 1380 _("successful"), 1381 red(os.path.basename(mypath)), 1382 ), 1383 importance = 0, 1384 type = "info", 1385 header = darkgreen(" @@ ") 1386 ) 1387 done = True 1388 break 1389 else: 1390 self.Entropy.updateProgress( 1391 "[%s|#%s|(%s/%s)] %s %s: %s" % ( 1392 blue(crippled_uri), 1393 darkgreen(str(tries)), 1394 blue(str(counter)), 1395 bold(str(maxcount)), 1396 blue(action), 1397 brown(_("failed, retrying")), 1398 red(os.path.basename(mypath)), 1399 ), 1400 importance = 0, 1401 type = "warning", 1402 header = brown(" @@ ") 1403 ) 1404 lastrc = rc 1405 continue 1406 1407 if not done: 1408 1409 self.Entropy.updateProgress( 1410 "[%s|(%s/%s)] %s %s: %s - %s: %s" % ( 1411 blue(crippled_uri), 1412 blue(str(counter)), 1413 bold(str(maxcount)), 1414 blue(action), 1415 darkred("failed, giving up"), 1416 red(os.path.basename(mypath)), 1417 _("error"), 1418 lastrc, 1419 ), 1420 importance = 1, 1421 type = "error", 1422 header = darkred(" !!! ") 1423 ) 1424 1425 if mypath not in self.critical_files: 1426 self.Entropy.updateProgress( 1427 "[%s|(%s/%s)] %s: %s, %s..." % ( 1428 blue(crippled_uri), 1429 blue(str(counter)), 1430 bold(str(maxcount)), 1431 blue(_("not critical")), 1432 os.path.basename(mypath), 1433 blue(_("continuing")), 1434 ), 1435 importance = 1, 1436 type = "warning", 1437 header = brown(" @@ ") 1438 ) 1439 continue 1440 1441 ftp.close() 1442 errors = True 1443 broken_uris.add((uri,lastrc)) 1444 # next mirror 1445 break 1446 1447 # close connection 1448 ftp.close() 1449 fine_uris.add(uri) 1450 1451 return errors,fine_uris,broken_uris
1452