Package entropy :: Module transceivers

Source Code for Module entropy.transceivers

   1  # -*- coding: utf-8 -*- 
   2  ''' 
   3      # DESCRIPTION: 
   4      # Entropy Object Oriented Interface 
   5   
   6      Copyright (C) 2007-2009 Fabio Erculiani 
   7   
   8      This program is free software; you can redistribute it and/or modify 
   9      it under the terms of the GNU General Public License as published by 
  10      the Free Software Foundation; either version 2 of the License, or 
  11      (at your option) any later version. 
  12   
  13      This program is distributed in the hope that it will be useful, 
  14      but WITHOUT ANY WARRANTY; without even the implied warranty of 
  15      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
  16      GNU General Public License for more details. 
  17   
  18      You should have received a copy of the GNU General Public License 
  19      along with this program; if not, write to the Free Software 
  20      Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
  21  ''' 
  22  from __future__ import with_statement 
  23  import os 
  24  import urllib2 
  25  import time 
  26  from entropy.const import etpConst 
  27  from entropy.output import TextInterface, darkblue, darkred, purple, blue, \ 
  28      brown, darkgreen, red, bold 
  29  from entropy.exceptions import * 
  30  from entropy.i18n import _ 
  31  from entropy.misc import TimeScheduled, ParallelTask 
  32  from entropy.core import SystemSettings 
  33   
34 -class UrlFetcher:
35
36 - def __init__(self, url, path_to_save, checksum = True, 37 show_speed = True, resume = True, 38 abort_check_func = None, disallow_redirect = False, 39 thread_stop_func = None, speed_limit = None, 40 OutputInterface = None):
41 42 self.__system_settings = SystemSettings() 43 if speed_limit == None: 44 speed_limit = self.__system_settings['repositories']['transfer_limit'] 45 46 self.progress = None 47 import entropy.tools as entropyTools 48 import socket 49 self.entropyTools, self.socket = entropyTools, socket 50 self.__th_id = 0 51 self.__resume = resume 52 self.__url = self.__encode_url(url) 53 self.__path_to_save = path_to_save 54 self.__checksum = checksum 55 self.__show_speed = show_speed 56 self.__abort_check_func = abort_check_func 57 self.__thread_stop_func = thread_stop_func 58 self.__disallow_redirect = disallow_redirect 59 self.__speedlimit = speed_limit # kbytes/sec 60 self.__existed_before = False 61 self.localfile = None 62 63 # important to have this here too 64 self.__datatransfer = 0 65 self.__resumed = False 66 67 uname = os.uname() 68 self.user_agent = "Entropy/%s (compatible; %s; %s: %s %s %s)" % ( 69 etpConst['entropyversion'], 70 "Entropy", 71 os.path.basename(self.__url), 72 uname[0], 73 uname[4], 74 uname[2], 75 ) 76 self.__extra_header_data = {} 77 self.__Output = OutputInterface 78 if self.__Output == None: 79 self.__Output = TextInterface() 80 elif not hasattr(self.__Output,'updateProgress'): 81 mytxt = _("Output interface passed doesn't have the updateProgress method") 82 raise IncorrectParameter("IncorrectParameter: %s" % (mytxt,)) 83 elif not callable(self.__Output.updateProgress): 84 mytxt = _("Output interface passed doesn't have the updateProgress method") 85 raise IncorrectParameter("IncorrectParameter: %s" % (mytxt,))
86
87 - def _init_vars(self):
88 89 self.__resumed = False 90 self.__buffersize = 8192 91 self.__status = None 92 self.__remotefile = None 93 self.__downloadedsize = 0 94 self.__average = 0 95 self.__remotesize = 0 96 self.__oldaverage = 0.0 97 # transfer status data 98 self.__startingposition = 0 99 self.__datatransfer = 0 100 self.__time_remaining = "(infinite)" 101 self.__time_remaining_secs = 0 102 self.__elapsed = 0.0 103 self.__updatestep = 0.2 104 self.__transferpollingtime = float(1)/4 105 self.__existed_before = False 106 if os.path.lexists(self.__path_to_save): 107 self.__existed_before = True 108 self.__setup_resume_support() 109 self._setup_proxy()
110
111 - def __setup_resume_support(self):
112 113 # if client uses this instance more than 114 # once, make sure we close previously opened 115 # files. 116 if isinstance(self.localfile, file): 117 try: 118 self.localfile.flush() 119 self.localfile.close() 120 except (IOError, OSError,): 121 pass 122 123 # resume support 124 if os.path.isfile(self.__path_to_save) and \ 125 os.access(self.__path_to_save,os.W_OK) and self.__resume: 126 127 self.localfile = open(self.__path_to_save,"awb") 128 self.localfile.seek(0,os.SEEK_END) 129 self.__startingposition = int(self.localfile.tell()) 130 self.__resumed = True 131 132 elif os.path.lexists(self.__path_to_save) and not \ 133 self.entropyTools.is_valid_path(self.__path_to_save): 134 try: 135 os.remove(self.__path_to_save) 136 except OSError: # I won't stop you here 137 pass 138 self.localfile = open(self.__path_to_save,"wb")
139
140 - def _setup_proxy(self):
141 # setup proxy, doing here because config is dynamic 142 mydict = {} 143 proxy_data = self.__system_settings['system']['proxy'] 144 if proxy_data['ftp']: 145 mydict['ftp'] = proxy_data['ftp'] 146 if proxy_data['http']: 147 mydict['http'] = proxy_data['http'] 148 if mydict: 149 mydict['username'] = proxy_data['username'] 150 mydict['password'] = proxy_data['password'] 151 self.entropyTools.add_proxy_opener(urllib2, mydict) 152 else: 153 # unset 154 urllib2._opener = None
155
156 - def __encode_url(self, url):
157 import urllib 158 url = os.path.join(os.path.dirname(url), 159 urllib.quote(os.path.basename(url))) 160 return url
161
162 - def set_id(self, th_id):
163 self.__th_id = th_id
164
165 - def download(self):
166 167 self._init_vars() 168 self.speedUpdater = TimeScheduled( 169 self.__transferpollingtime, 170 self.__update_speed, 171 ) 172 self.speedUpdater.start() 173 # set timeout 174 self.socket.setdefaulttimeout(20) 175 176 if self.__url.startswith("http://"): 177 headers = { 'User-Agent' : self.user_agent } 178 req = urllib2.Request(self.__url, self.__extra_header_data, headers) 179 else: 180 req = self.__url 181 182 u_agent_error = False 183 while 1: 184 # get file size if available 185 try: 186 self.__remotefile = urllib2.urlopen(req) 187 except KeyboardInterrupt: 188 self.__close(False) 189 raise 190 except urllib2.HTTPError, e: 191 if (e.code == 405) and not u_agent_error: 192 # server doesn't like our user agent 193 req = self.__url 194 u_agent_error = True 195 continue 196 self.__close(True) 197 self.__status = "-3" 198 return self.__status 199 except: 200 self.__close(True) 201 self.__status = "-3" 202 return self.__status 203 break 204 205 try: 206 self.__remotesize = int(self.__remotefile.headers.get( 207 "content-length")) 208 self.__remotefile.close() 209 except KeyboardInterrupt: 210 self.__close(False) 211 raise 212 except: 213 pass 214 215 # handle user stupidity 216 try: 217 request = self.__url 218 if ((self.__startingposition > 0) and (self.__remotesize > 0)) \ 219 and (self.__startingposition < self.__remotesize): 220 221 try: 222 request = urllib2.Request( 223 self.__url, 224 headers = { 225 "Range" : "bytes=" + \ 226 str(self.__startingposition) + "-" + \ 227 str(self.__remotesize) 228 } 229 ) 230 except KeyboardInterrupt: 231 self.__close(False) 232 raise 233 except: 234 pass 235 elif (self.__startingposition == self.__remotesize): 236 # all fine then! 237 self.__close(False) 238 return self.__prepare_return() 239 else: 240 self.localfile = open(self.__path_to_save,"wb") 241 self.__remotefile = urllib2.urlopen(request) 242 except KeyboardInterrupt: 243 self.__close(False) 244 raise 245 except: 246 self.__close(True) 247 self.__status = "-3" 248 return self.__status 249 250 if self.__remotesize > 0: 251 self.__remotesize = float(int(self.__remotesize))/1024 252 253 if self.__disallow_redirect and \ 254 (self.__url != self.__remotefile.geturl()): 255 256 self.__close(True) 257 self.__status = "-3" 258 return self.__status 259 260 while 1: 261 try: 262 rsx = self.__remotefile.read(self.__buffersize) 263 if rsx == '': break 264 if self.__abort_check_func != None: 265 self.__abort_check_func() 266 if self.__thread_stop_func != None: 267 self.__thread_stop_func() 268 except KeyboardInterrupt: 269 self.__close(False) 270 raise 271 except self.socket.timeout: 272 self.__close(False) 273 self.__status = "-4" 274 return self.__status 275 except: 276 # python 2.4 timeouts go here 277 self.__close(True) 278 self.__status = "-3" 279 return self.__status 280 self.__commit(rsx) 281 if self.__show_speed: 282 self.handle_statistics(self.__th_id, self.__downloadedsize, 283 self.__remotesize, self.__average, self.__oldaverage, 284 self.__updatestep, self.__show_speed, self.__datatransfer, 285 self.__time_remaining, self.__time_remaining_secs 286 ) 287 self.updateProgress() 288 self.__oldaverage = self.__average 289 if self.__speedlimit: 290 while self.__datatransfer > self.__speedlimit*1024: 291 time.sleep(0.1) 292 if self.__show_speed: 293 self.updateProgress() 294 self.__oldaverage = self.__average 295 296 # kill thread 297 self.__close(False) 298 return self.__prepare_return()
299 300
301 - def __prepare_return(self):
302 if self.__checksum: 303 self.__status = self.entropyTools.md5sum(self.__path_to_save) 304 return self.__status 305 self.__status = "-2" 306 return self.__status
307
308 - def __commit(self, mybuffer):
309 # writing file buffer 310 self.localfile.write(mybuffer) 311 # update progress info 312 self.__downloadedsize = self.localfile.tell() 313 kbytecount = float(self.__downloadedsize)/1024 314 self.__average = int((kbytecount/self.__remotesize)*100)
315
316 - def __close(self, errored):
317 try: 318 if isinstance(self.localfile, file): 319 self.localfile.flush() 320 self.localfile.close() 321 except IOError: 322 pass 323 if (not self.__existed_before) and errored: 324 try: 325 os.remove(self.__path_to_save) 326 except OSError: 327 pass 328 try: 329 self.__remotefile.close() 330 except: 331 pass 332 self.speedUpdater.kill() 333 self.socket.setdefaulttimeout(2)
334
335 - def __update_speed(self):
336 self.__elapsed += self.__transferpollingtime 337 # we have the diff size 338 x_delta = self.__downloadedsize - self.__startingposition 339 self.__datatransfer = x_delta / self.__elapsed 340 if self.__datatransfer < 0: 341 self.__datatransfer = 0 342 try: 343 rounded_remote = int(round(self.__remotesize*1024,0)) 344 rounded_downloaded = int(round(self.__downloadedsize,0)) 345 x_delta = rounded_remote - rounded_downloaded 346 tx_round = int(round(x_delta/self.__datatransfer,0)) 347 self.__time_remaining_secs = tx_round 348 self.__time_remaining = self.entropyTools.convert_seconds_to_fancy_output(self.__time_remaining_secs) 349 except: 350 self.__time_remaining = "(%s)" % (_("infinite"),)
351
352 - def get_transfer_rate(self):
353 return self.__datatransfer
354
355 - def is_resumed(self):
356 return self.__resumed
357
358 - def handle_statistics(self, th_id, downloaded_size, total_size, 359 average, old_average, update_step, show_speed, data_transfer, 360 time_remaining, time_remaining_secs):
361 return
362
363 - def updateProgress(self):
364 365 mytxt = _("[F]") 366 eta_txt = _("ETA") 367 sec_txt = _("sec") # as in XX kb/sec 368 369 currentText = darkred(" %s: " % (mytxt,)) + \ 370 darkgreen(str(round(float(self.__downloadedsize)/1024,1))) + "/" + \ 371 red(str(round(self.__remotesize,1))) + " kB" 372 # create progress bar 373 barsize = 10 374 bartext = "[" 375 curbarsize = 1 376 if self.__average > self.__oldaverage+self.__updatestep: 377 averagesize = (self.__average*barsize)/100 378 while averagesize > 0: 379 curbarsize += 1 380 bartext += "=" 381 averagesize -= 1 382 bartext += ">" 383 diffbarsize = barsize-curbarsize 384 while diffbarsize > 0: 385 bartext += " " 386 diffbarsize -= 1 387 if self.__show_speed: 388 bartext += "] => %s" % (self.entropyTools.bytes_into_human(self.__datatransfer),) 389 bartext += "/%s : %s: %s" % (sec_txt,eta_txt,self.__time_remaining,) 390 else: 391 bartext += "]" 392 average = str(self.__average) 393 if len(average) < 2: 394 average = " "+average 395 currentText += " <-> "+average+"% "+bartext 396 self.__Output.updateProgress(currentText, back = True)
397
398 -class MultipleUrlFetcher:
399 400 import entropy.tools as entropyTools
401 - def __init__(self, url_path_list, checksum = True, 402 show_speed = True, resume = True, 403 abort_check_func = None, disallow_redirect = False, 404 OutputInterface = None, UrlFetcherClass = None):
405 """ 406 @param url_path_list list [(url,path_to_save,),...] 407 @param checksum bool return checksum data 408 @param show_speed bool show transfer speed on the output 409 @param resume bool enable resume support 410 @param abort_check_func callable function that could 411 raise exception and stop transfer 412 @param disallow_redirect bool disable automatic HTTP redirect 413 @param OutputInterface TextInterface instance used to 414 print instance output through a common interface 415 @param UrlFetcherClass, urlFetcher instance/interface used 416 """ 417 self.__system_settings = SystemSettings() 418 self.__url_path_list = url_path_list 419 self.__resume = resume 420 self.__checksum = checksum 421 self.__show_speed = show_speed 422 self.__abort_check_func = abort_check_func 423 self.__disallow_redirect = disallow_redirect 424 425 # important to have a declaration here 426 self.__data_transfer = 0 427 self.__average = 0 428 self.__old_average = 0 429 self.__time_remaining_sec = 0 430 431 self.__Output = OutputInterface 432 if self.__Output == None: 433 self.__Output = TextInterface() 434 elif not hasattr(self.__Output,'updateProgress'): 435 mytxt = _("Output interface passed doesn't have the updateProgress method") 436 raise IncorrectParameter("IncorrectParameter: %s" % (mytxt,)) 437 elif not callable(self.__Output.updateProgress): 438 mytxt = _("Output interface passed doesn't have the updateProgress method") 439 raise IncorrectParameter("IncorrectParameter: %s" % (mytxt,)) 440 441 self.__url_fetcher = UrlFetcherClass 442 if self.__url_fetcher == None: 443 self.__url_fetcher = UrlFetcher
444 445
446 - def __handle_threads_stop(self):
447 if self.__stop_threads: 448 raise InterruptError
449
450 - def _init_vars(self):
451 self.__progress_data = {} 452 self.__thread_pool = {} 453 self.__download_statuses = {} 454 self.__show_progress = False 455 self.__stop_threads = False 456 self.__first_refreshes = 50 457 self.__data_transfer = 0 458 self.__average = 0 459 self.__old_average = 0 460 self.__time_remaining_sec = 0
461
462 - def download(self):
463 self._init_vars() 464 465 th_id = 0 466 speed_limit = 0 467 dsl = self.__system_settings['repositories']['transfer_limit'] 468 if isinstance(dsl,int) and self.__url_path_list: 469 speed_limit = dsl/len(self.__url_path_list) 470 471 for url, path_to_save in self.__url_path_list: 472 th_id += 1 473 downloader = self.__url_fetcher(url, path_to_save, 474 checksum = self.__checksum, show_speed = self.__show_speed, 475 resume = self.__resume, abort_check_func = self.__abort_check_func, 476 disallow_redirect = self.__disallow_redirect, 477 thread_stop_func = self.__handle_threads_stop, 478 speed_limit = speed_limit, 479 OutputInterface = self.__Output 480 ) 481 downloader.set_id(th_id) 482 downloader.updateProgress = self.updateProgress 483 downloader.handle_statistics = self.handle_statistics 484 485 def do_download(ds, th_id, downloader): 486 ds[th_id] = downloader.download()
487 488 t = ParallelTask(do_download, self.__download_statuses, th_id, downloader) 489 self.__thread_pool[th_id] = t 490 t.start() 491 self.show_download_files_info() 492 self.__show_progress = True 493 while len(self.__url_path_list) != len(self.__download_statuses): 494 try: 495 time.sleep(0.5) 496 except (SystemExit, KeyboardInterrupt,): 497 self.__stop_threads = True 498 raise 499 500 return self.__download_statuses
501
502 - def get_data_transfer(self):
503 return self.__data_transfer
504
505 - def get_average(self):
506 return self.__average
507
508 - def get_seconds_remaining(self):
509 return self.__time_remaining_sec
510
511 - def show_download_files_info(self):
512 count = 0 513 pl = self.__url_path_list[:] 514 self.__Output.updateProgress( 515 "%s: %s %s" % ( 516 darkblue(_("Aggregated download")), 517 darkred(str(len(pl))), 518 darkblue(_("items")), 519 ), 520 importance = 0, 521 type = "info", 522 header = purple(" ## ") 523 ) 524 for url, save_path in pl: 525 count += 1 526 fname = os.path.basename(url) 527 uri = self.entropyTools.spliturl(url)[1] 528 self.__Output.updateProgress( 529 "[%s] %s => %s" % ( 530 darkblue(str(count)), 531 darkgreen(uri), 532 blue(fname), 533 ), 534 importance = 0, 535 type = "info", 536 header = brown(" # ") 537 )
538
539 - def handle_statistics(self, th_id, downloaded_size, total_size, 540 average, old_average, update_step, show_speed, data_transfer, 541 time_remaining, time_remaining_secs):
542 data = { 543 'th_id': th_id, 544 'downloaded_size': downloaded_size, 545 'total_size': total_size, 546 'average': average, 547 'old_average': old_average, 548 'update_step': update_step, 549 'show_speed': show_speed, 550 'data_transfer': data_transfer, 551 'time_remaining': time_remaining, 552 'time_remaining_secs': time_remaining_secs, 553 } 554 self.__progress_data[th_id] = data
555
556 - def updateProgress(self):
557 558 eta_txt = _("ETA") 559 sec_txt = _("sec") # as in XX kb/sec 560 downloaded_size = 0 561 total_size = 0 562 time_remaining = 0 563 data_transfer = 0 564 update_step = 0 565 average = 100 566 pd = self.__progress_data.copy() 567 pdlen = len(pd) 568 569 # calculation 570 for th_id in sorted(pd): 571 data = pd.get(th_id) 572 downloaded_size += data.get('downloaded_size',0) 573 total_size += data.get('total_size',0) 574 data_transfer += data.get('data_transfer',0) 575 tr = data.get('time_remaining_secs',0) 576 if tr > 0: time_remaining += tr 577 update_step += data.get('update_step',0) 578 579 # total_size is in kbytes 580 # downloaded_size is in bytes 581 if total_size > 0: 582 average = int(float(downloaded_size/1024)/total_size * 100) 583 self.__data_transfer = data_transfer 584 self.__average = average 585 update_step = update_step/pdlen 586 self.__time_remaining_sec = time_remaining 587 time_remaining = self.entropyTools.convert_seconds_to_fancy_output(time_remaining) 588 589 if ((average > self.__old_average+update_step) or \ 590 (self.__first_refreshes > 0)) and self.__show_progress: 591 592 self.__first_refreshes -= 1 593 currentText = darkgreen(str(round(float(downloaded_size)/1024,1))) + "/" + \ 594 red(str(round(total_size,1))) + " kB" 595 # create progress bar 596 barsize = 10 597 bartext = "[" 598 curbarsize = 1 599 averagesize = (average*barsize)/100 600 while averagesize > 0: 601 curbarsize += 1 602 bartext += "=" 603 averagesize -= 1 604 bartext += ">" 605 diffbarsize = barsize-curbarsize 606 while diffbarsize > 0: 607 bartext += " " 608 diffbarsize -= 1 609 if self.__show_speed: 610 bartext += "] => %s" % (self.entropyTools.bytes_into_human(data_transfer),) 611 bartext += "/%s : %s: %s" % (sec_txt,eta_txt,time_remaining,) 612 else: 613 bartext += "]" 614 myavg = str(average) 615 if len(myavg) < 2: 616 myavg = " "+myavg 617 currentText += " <-> "+myavg+"% "+bartext+" " 618 self.__Output.updateProgress(currentText, back = True) 619 620 self.__old_average = average
621 622
623 -class FtpInterface:
624 625 # this must be run before calling the other functions
626 - def __init__(self, ftpuri, OutputInterface, verbose = True):
627 628 if not hasattr(OutputInterface,'updateProgress'): 629 mytxt = _("OutputInterface does not have an updateProgress method") 630 raise IncorrectParameter("IncorrectParameter: %s, (! %s !)" % (OutputInterface,mytxt,)) 631 elif not callable(OutputInterface.updateProgress): 632 mytxt = _("OutputInterface does not have an updateProgress method") 633 raise IncorrectParameter("IncorrectParameter: %s, (! %s !)" % (OutputInterface,mytxt,)) 634 635 import socket, ftplib 636 import entropy.tools as entropyTools 637 self.socket, self.ftplib, self.entropyTools = socket, ftplib, entropyTools 638 self.Entropy = OutputInterface 639 self.__verbose = verbose 640 self.__init_vars() 641 self.socket.setdefaulttimeout(60) 642 self.__ftpuri = ftpuri 643 self.__speed_updater = None 644 self.__currentdir = '.' 645 self.__ftphost = self.entropyTools.extract_ftp_host_from_uri(self.__ftpuri) 646 self.__ftpuser, self.__ftppassword, self.__ftpport, self.__ftpdir = self.entropyTools.extract_ftp_data(ftpuri) 647 648 count = 10 649 while 1: 650 count -= 1 651 try: 652 self.__ftpconn = self.ftplib.FTP(self.__ftphost) 653 break 654 except (self.socket.gaierror,), e: 655 raise ConnectionError('ConnectionError: %s' % (e,)) 656 except (self.socket.error,), e: 657 if not count: 658 raise ConnectionError('ConnectionError: %s' % (e,)) 659 continue 660 except: 661 if not count: raise 662 continue 663 664 if self.__verbose: 665 mytxt = _("connecting with user") 666 self.Entropy.updateProgress( 667 "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(self.__ftpuser),), 668 importance = 1, 669 type = "info", 670 header = darkgreen(" * ") 671 ) 672 try: 673 self.__ftpconn.login(self.__ftpuser,self.__ftppassword) 674 except self.ftplib.error_perm, e: 675 raise FtpError('FtpError: %s' % (e,)) 676 if self.__verbose: 677 mytxt = _("switching to") 678 self.Entropy.updateProgress( 679 "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(self.__ftpdir),), 680 importance = 1, 681 type = "info", 682 header = darkgreen(" * ") 683 ) 684 self.set_cwd(self.__ftpdir, dodir = True)
685
686 - def __init_vars(self):
687 self.__oldprogress = 0.0 688 self.__filesize = 0 689 self.__filekbcount = 0 690 self.__transfersize = 0 691 self.__startingposition = 0 692 self.__elapsed = 0.0 693 self.__time_remaining_secs = 0 694 self.__time_remaining = "(%s)" % (_("infinite"),) 695 self.__transferpollingtime = float(1)/4
696
697 - def set_basedir(self):
698 return self.set_cwd(self.__ftpdir)
699 700 # this can be used in case of exceptions
701 - def reconnect_host(self):
702 # import FTP modules 703 self.socket.setdefaulttimeout(60) 704 counter = 10 705 while 1: 706 counter -= 1 707 try: 708 self.__ftpconn = self.ftplib.FTP(self.__ftphost) 709 break 710 except: 711 if not counter: 712 raise 713 continue 714 if self.__verbose: 715 mytxt = _("reconnecting with user") 716 self.Entropy.updateProgress( 717 "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(self.__ftpuser),), 718 importance = 1, 719 type = "info", 720 header = darkgreen(" * ") 721 ) 722 self.__ftpconn.login(self.__ftpuser,self.__ftppassword) 723 if self.__verbose: 724 mytxt = _("switching to") 725 self.Entropy.updateProgress( 726 "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(self.__ftpdir),), 727 importance = 1, 728 type = "info", 729 header = darkgreen(" * ") 730 ) 731 self.set_cwd(self.__currentdir)
732
733 - def get_host(self):
734 return self.__ftphost
735
736 - def get_port(self):
737 return self.__ftpport
738
739 - def get_dir(self):
740 return self.__ftpdir
741
742 - def get_cwd(self):
743 pwd = self.__ftpconn.pwd() 744 return pwd
745
746 - def set_cwd(self, mydir, dodir = False):
747 try: 748 return self._set_cwd(mydir, dodir) 749 except self.ftplib.error_perm, e: 750 raise FtpError('FtpError: %s' % (e,))
751
752 - def _set_cwd(self, mydir, dodir = False):
753 if self.__verbose: 754 mytxt = _("switching to") 755 self.Entropy.updateProgress( 756 "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(mydir),), 757 importance = 1, 758 type = "info", 759 header = darkgreen(" * ") 760 ) 761 try: 762 self.__ftpconn.cwd(mydir) 763 except self.ftplib.error_perm, e: 764 if e[0][:3] == '550' and dodir: 765 self.recursive_mkdir(mydir) 766 self.__ftpconn.cwd(mydir) 767 else: 768 raise 769 self.__currentdir = self.get_cwd()
770
771 - def set_pasv(self,bool):
772 self.__ftpconn.set_pasv(bool)
773
774 - def set_chmod(self,chmodvalue,file):
775 return self.__ftpconn.voidcmd("SITE CHMOD "+str(chmodvalue)+" "+str(file))
776
777 - def get_file_mtime(self,path):
778 rc = self.__ftpconn.sendcmd("mdtm "+path) 779 return rc.split()[-1]
780
781 - def send_cmd(self,cmd):
782 return self.__ftpconn.sendcmd(cmd)
783
784 - def list_dir(self):
785 return [os.path.basename(x) for x in self.__ftpconn.nlst()]
786
787 - def is_file_available(self, filename):
788 xx = [] 789 def cb(x): 790 if x == filename: xx.append(x)
791 self.__ftpconn.retrlines('NLST',cb) 792 if xx: return True 793 return False
794
795 - def delete_file(self,file):
796 try: 797 rc = self.__ftpconn.delete(file) 798 except self.ftplib.error_perm, e: 799 if e[0][:3] == '550': 800 return True 801 return False # not found 802 if rc.startswith("250"): 803 return True 804 return False
805
806 - def recursive_mkdir(self, mypath):
807 mydirs = [x for x in mypath.split("/") if x] 808 mycurpath = "" 809 for mydir in mydirs: 810 mycurpath = os.path.join(mycurpath,mydir) 811 if not self.is_file_available(mycurpath): 812 try: 813 self.mkdir(mycurpath) 814 except self.ftplib.error_perm, e: 815 if e[0].lower().find("permission denied") != -1: 816 raise 817 elif e[0][:3] != '550': 818 raise
819
820 - def mkdir(self,directory):
821 return self.__ftpconn.mkd(directory)
822
823 - def upload_file(self, file, ascii = False):
824 825 # this function also supports callback, because storbinary doesn't 826 def advanced_stor(cmd, fp): 827 ''' Store a file in binary mode. Our version supports a callback function''' 828 self.__ftpconn.voidcmd('TYPE I') 829 conn = self.__ftpconn.transfercmd(cmd) 830 while 1: 831 buf = fp.readline() 832 if not buf: 833 break 834 conn.sendall(buf) 835 self.updateProgress(len(buf)) 836 conn.close() 837 838 # that's another workaround 839 #return "226" 840 try: 841 rc = self.__ftpconn.voidresp() 842 except: 843 self.reconnect_host() 844 return "226" 845 return rc
846 847 tries = 0 848 while tries < 10: 849 850 tries += 1 851 filename = os.path.basename(file) 852 self.__init_vars() 853 self.__start_speed_counter() 854 try: 855 856 with open(file,"r") as f: 857 858 self.__filesize = round(float(self.entropyTools.get_file_size(file))/1024,1) 859 self.__filekbcount = 0 860 861 # delete old one, if exists 862 self.delete_file(filename+".tmp") 863 864 if ascii: 865 rc = self.__ftpconn.storlines("STOR "+filename+".tmp",f) 866 else: 867 rc = advanced_stor("STOR "+filename+".tmp", f) 868 869 # now we can rename the file with its original name 870 self.rename_file(filename+".tmp",filename) 871 872 if rc.find("226") != -1: # upload complete 873 return True 874 return False 875 876 except Exception, e: # connection reset by peer 877 878 self.entropyTools.print_traceback() 879 mytxt = red("%s: %s, %s... #%s") % ( 880 _("Upload issue"), 881 e, 882 _("retrying"), 883 tries+1, 884 ) 885 self.Entropy.updateProgress( 886 mytxt, 887 importance = 1, 888 type = "warning", 889 header = " " 890 ) 891 self.reconnect_host() # reconnect 892 self.delete_file(filename) 893 self.delete_file(filename+".tmp") 894 895 finally: 896 self.__stop_speed_counter() 897
898 - def download_file(self, filename, downloaddir, ascii = False):
899 900 def df_up(buf): 901 # writing file buffer 902 f.write(buf) 903 self.updateProgress(len(buf))
904 905 tries = 10 906 while tries: 907 tries -= 1 908 909 self.__init_vars() 910 self.__start_speed_counter() 911 try: 912 913 self.__filekbcount = 0 914 # get the file size 915 self.__filesize = self.get_file_size_compat(filename) 916 if (self.__filesize): 917 self.__filesize = round(float(int(self.__filesize))/1024,1) 918 if (self.__filesize == 0): 919 self.__filesize = 1 920 elif not self.is_file_available(filename): 921 return False 922 else: 923 self.__filesize = 0 924 if not ascii: 925 f = open(downloaddir+"/"+filename,"wb") 926 rc = self.__ftpconn.retrbinary('RETR '+filename, df_up, 1024) 927 else: 928 f = open(downloaddir+"/"+filename,"w") 929 rc = self.__ftpconn.retrlines('RETR '+filename, f.write) 930 f.flush() 931 f.close() 932 if rc.find("226") != -1: # upload complete 933 return True 934 return False 935 936 except Exception, e: # connection reset by peer 937 938 self.entropyTools.print_traceback() 939 mytxt = red("%s: %s, %s... #%s") % ( 940 _("Download issue"), 941 e, 942 _("retrying"), 943 tries+1, 944 ) 945 self.Entropy.updateProgress( 946 mytxt, 947 importance = 1, 948 type = "warning", 949 header = " " 950 ) 951 self.reconnect_host() # reconnect 952 953 finally: 954 self.__stop_speed_counter() 955 956 # also used to move files
957 - def rename_file(self, fromfile, tofile):
958 rc = self.__ftpconn.rename(fromfile,tofile) 959 return rc
960
961 - def get_file_md5(self, filename):
962 # PROFTPD with mod_md5 supports it! 963 try: 964 rc_data = self.__ftpconn.sendcmd("SITE MD5 %s" % (filename,)) 965 except self.ftplib.error_perm: 966 return None # not supported 967 try: 968 return rc_data.split("\n")[0].split("\t")[0].split("-")[1] 969 except (IndexError, TypeError,): # wrong output 970 return None
971
972 - def get_file_size(self, filename):
973 return self.__ftpconn.size(filename)
974
975 - def get_file_size_compat(self, filename):
976 try: 977 data = [x.split() for x in self.__ftpconn.sendcmd("stat %s" % (filename,)).split("\n")] 978 except self.ftplib.error_temp: 979 return "" 980 for item in data: 981 if item[-1] == filename: 982 return item[4] 983 return ""
984
985 - def get_raw_list(self):
986 mybuffer = [] 987 def bufferizer(buf): 988 mybuffer.append(buf)
989 self.__ftpconn.dir(bufferizer) 990 return mybuffer 991
992 - def close(self):
993 try: 994 self.__ftpconn.quit() 995 except (EOFError,AttributeError,self.socket.timeout,self.ftplib.error_reply,): 996 # AttributeError is raised when socket gets trashed 997 # EOFError is raised when the connection breaks 998 # timeout, who cares! 999 pass
1000
1001 - def __start_speed_counter(self):
1002 self.__speed_updater = TimeScheduled( 1003 self.__transferpollingtime, 1004 self.__update_speed, 1005 ) 1006 self.__speed_updater.start()
1007
1008 - def __stop_speed_counter(self):
1009 if self.__speed_updater != None: 1010 self.__speed_updater.kill()
1011
1012 - def __update_speed(self):
1013 self.__elapsed += self.__transferpollingtime 1014 # we have the diff size 1015 self.__datatransfer = (self.__transfersize-self.__startingposition) / self.__elapsed 1016 if self.__datatransfer < 0: 1017 self.__datatransfer = 0 1018 try: 1019 self.__time_remaining_secs = int(round((int(round(self.__filesize*1024,0))-int(round(self.__transfersize,0)))/self.__datatransfer,0)) 1020 self.__time_remaining = self.entropyTools.convert_seconds_to_fancy_output(self.__time_remaining_secs) 1021 except: 1022 self.__time_remaining = "(%s)" % (_("infinite"),)
1023
1024 - def updateProgress(self, buf_len):
1025 # get the buffer size 1026 self.__filekbcount += float(buf_len)/1024 1027 self.__transfersize += buf_len 1028 # create percentage 1029 myUploadPercentage = 100.0 1030 if self.__filesize >= 1: 1031 myUploadPercentage = round((round(self.__filekbcount,1)/self.__filesize)*100,1) 1032 currentprogress = myUploadPercentage 1033 myUploadSize = round(self.__filekbcount,1) 1034 if (currentprogress > self.__oldprogress+1.0) and \ 1035 (myUploadPercentage < 100.1) and \ 1036 (myUploadSize <= self.__filesize): 1037 1038 myUploadPercentage = str(myUploadPercentage)+"%" 1039 # create text 1040 mytxt = _("Transfer status") 1041 currentText = brown(" <-> %s: " % (mytxt,)) + \ 1042 darkgreen(str(myUploadSize)) + "/" + red(str(self.__filesize)) + " kB " + \ 1043 brown("[") + str(myUploadPercentage) + brown("]") + " " + self.__time_remaining + \ 1044 " " + self.entropyTools.bytes_into_human(self.__datatransfer) + "/"+_("sec") 1045 # WARN: re-enabled updateProgress, this may cause slowdowns 1046 # print_info(currentText, back = True) 1047 self.Entropy.updateProgress(currentText, back = True) 1048 self.__oldprogress = currentprogress
1049 1050
1051 -class FtpServerHandler:
1052 1053 import entropy.tools as entropyTools
1054 - def __init__(self, ftp_interface, entropy_interface, uris, files_to_upload, 1055 download = False, remove = False, ftp_basedir = None, local_basedir = None, 1056 critical_files = [], use_handlers = False, handlers_data = {}, repo = None):
1057 1058 self.FtpInterface = ftp_interface 1059 self.Entropy = entropy_interface 1060 if not isinstance(uris,list): 1061 raise InvalidDataType("InvalidDataType: %s" % (_("uris must be a list instance"),)) 1062 if not isinstance(files_to_upload,(list,dict)): 1063 raise InvalidDataType("InvalidDataType: %s" % ( 1064 _("files_to_upload must be a list or dict instance"), 1065 ) 1066 ) 1067 self.uris = uris 1068 if isinstance(files_to_upload,list): 1069 self.myfiles = files_to_upload[:] 1070 else: 1071 self.myfiles = sorted([x for x in files_to_upload]) 1072 self.download = download 1073 self.remove = remove 1074 self.repo = repo 1075 if self.repo == None: 1076 self.repo = self.Entropy.default_repository 1077 self.use_handlers = use_handlers 1078 if self.remove: 1079 self.download = False 1080 self.use_handlers = False 1081 if not ftp_basedir: 1082 # default to database directory 1083 branch = self.Entropy.SystemSettings['repositories']['branch'] 1084 my_path = os.path.join(self.Entropy.get_remote_database_relative_path(repo), branch) 1085 self.ftp_basedir = unicode(my_path) 1086 else: 1087 self.ftp_basedir = unicode(ftp_basedir) 1088 if not local_basedir: 1089 # default to database directory 1090 self.local_basedir = os.path.dirname(self.Entropy.get_local_database_file(self.repo)) 1091 else: 1092 self.local_basedir = unicode(local_basedir) 1093 self.critical_files = critical_files 1094 self.handlers_data = handlers_data.copy()
1095
1096 - def handler_verify_upload(self, local_filepath, uri, counter, maxcount, 1097 tries, remote_md5 = None):
1098 1099 crippled_uri = self.entropyTools.extract_ftp_host_from_uri(uri) 1100 1101 self.Entropy.updateProgress( 1102 "[%s|#%s|(%s/%s)] %s: %s" % ( 1103 blue(crippled_uri), 1104 darkgreen(str(tries)), 1105 blue(str(counter)), 1106 bold(str(maxcount)), 1107 darkgreen(_("verifying upload (if supported)")), 1108 blue(os.path.basename(local_filepath)), 1109 ), 1110 importance = 0, 1111 type = "info", 1112 header = red(" @@ "), 1113 back = True 1114 ) 1115 1116 valid_remote_md5 = True 1117 # if remote server supports MD5 commands, remote_md5 is filled 1118 if isinstance(remote_md5, basestring): 1119 valid_md5 = self.entropyTools.is_valid_md5(remote_md5) 1120 ckres = False 1121 if valid_md5: # seems valid 1122 ckres = self.entropyTools.compare_md5(local_filepath, 1123 remote_md5) 1124 if ckres: 1125 self.Entropy.updateProgress( 1126 "[%s|#%s|(%s/%s)] %s: %s: %s" % ( 1127 blue(crippled_uri), 1128 darkgreen(str(tries)), 1129 blue(str(counter)), 1130 bold(str(maxcount)), 1131 blue(_("digest verification")), 1132 os.path.basename(local_filepath), 1133 darkgreen(_("so far, so good!")), 1134 ), 1135 importance = 0, 1136 type = "info", 1137 header = red(" @@ ") 1138 ) 1139 return True 1140 # ouch! 1141 elif not valid_md5: 1142 # mmmh... malformed md5, try with handlers 1143 self.Entropy.updateProgress( 1144 "[%s|#%s|(%s/%s)] %s: %s: %s" % ( 1145 blue(crippled_uri), 1146 darkgreen(str(tries)), 1147 blue(str(counter)), 1148 bold(str(maxcount)), 1149 blue(_("digest verification")), 1150 os.path.basename(local_filepath), 1151 bold(_("malformed md5 provided to function")), 1152 ), 1153 importance = 0, 1154 type = "warning", 1155 header = brown(" @@ ") 1156 ) 1157 else: # it's really bad! 1158 self.Entropy.updateProgress( 1159 "[%s|#%s|(%s/%s)] %s: %s: %s" % ( 1160 blue(crippled_uri), 1161 darkgreen(str(tries)), 1162 blue(str(counter)), 1163 bold(str(maxcount)), 1164 blue(_("digest verification")), 1165 os.path.basename(local_filepath), 1166 bold(_("remote md5 is invalid")), 1167 ), 1168 importance = 0, 1169 type = "warning", 1170 header = brown(" @@ ") 1171 ) 1172 valid_remote_md5 = False 1173 1174 if not self.use_handlers: 1175 # handlers usage is disabled 1176 return valid_remote_md5 # always valid 1177 1178 checksum = self.Entropy.get_remote_package_checksum( 1179 self.repo, 1180 os.path.basename(local_filepath), 1181 self.handlers_data['branch'] 1182 ) 1183 if checksum == None: 1184 self.Entropy.updateProgress( 1185 "[%s|#%s|(%s/%s)] %s: %s: %s" % ( 1186 blue(crippled_uri), 1187 darkgreen(str(tries)), 1188 blue(str(counter)), 1189 bold(str(maxcount)), 1190 blue(_("digest verification")), 1191 os.path.basename(local_filepath), 1192 darkred(_("not supported")), 1193 ), 1194 importance = 0, 1195 type = "info", 1196 header = red(" @@ ") 1197 ) 1198 return valid_remote_md5 1199 elif isinstance(checksum, bool) and not checksum: 1200 self.Entropy.updateProgress( 1201 "[%s|#%s|(%s/%s)] %s: %s: %s" % ( 1202 blue(crippled_uri), 1203 darkgreen(str(tries)), 1204 blue(str(counter)), 1205 bold(str(maxcount)), 1206 blue(_("digest verification")), 1207 os.path.basename(local_filepath), 1208 bold(_("file not found")), 1209 ), 1210 importance = 0, 1211 type = "warning", 1212 header = brown(" @@ ") 1213 ) 1214 return False 1215 elif self.entropyTools.is_valid_md5(checksum): 1216 # valid? checking 1217 ckres = self.entropyTools.compare_md5(local_filepath,checksum) 1218 if ckres: 1219 self.Entropy.updateProgress( 1220 "[%s|#%s|(%s/%s)] %s: %s: %s" % ( 1221 blue(crippled_uri), 1222 darkgreen(str(tries)), 1223 blue(str(counter)), 1224 bold(str(maxcount)), 1225 blue(_("digest verification")), 1226 os.path.basename(local_filepath), 1227 darkgreen(_("so far, so good!")), 1228 ), 1229 importance = 0, 1230 type = "info", 1231 header = red(" @@ ") 1232 ) 1233 return True 1234 else: 1235 self.Entropy.updateProgress( 1236 "[%s|#%s|(%s/%s)] %s: %s: %s" % ( 1237 blue(crippled_uri), 1238 darkgreen(str(tries)), 1239 blue(str(counter)), 1240 bold(str(maxcount)), 1241 blue(_("digest verification")), 1242 os.path.basename(local_filepath), 1243 darkred(_("invalid checksum")), 1244 ), 1245 importance = 0, 1246 type = "warning", 1247 header = brown(" @@ ") 1248 ) 1249 return False 1250 else: 1251 self.Entropy.updateProgress( 1252 "[%s|#%s|(%s/%s)] %s: %s: %s" % ( 1253 blue(crippled_uri), 1254 darkgreen(str(tries)), 1255 blue(str(counter)), 1256 bold(str(maxcount)), 1257 blue(_("digest verification")), 1258 os.path.basename(local_filepath), 1259 darkred(_("unknown data returned")), 1260 ), 1261 importance = 0, 1262 type = "warning", 1263 header = brown(" @@ ") 1264 ) 1265 return valid_remote_md5
1266
1267 - def go(self):
1268 1269 broken_uris = set() 1270 fine_uris = set() 1271 errors = False 1272 action = 'upload' 1273 if self.download: 1274 action = 'download' 1275 elif self.remove: 1276 action = 'remove' 1277 1278 for uri in self.uris: 1279 1280 crippled_uri = self.entropyTools.extract_ftp_host_from_uri(uri) 1281 self.Entropy.updateProgress( 1282 "[%s|%s] %s..." % ( 1283 blue(crippled_uri), 1284 brown(action), 1285 blue(_("connecting to mirror")), 1286 ), 1287 importance = 0, 1288 type = "info", 1289 header = blue(" @@ ") 1290 ) 1291 try: 1292 ftp = self.FtpInterface(uri, self.Entropy) 1293 except ConnectionError: 1294 self.entropyTools.print_traceback() 1295 return True,fine_uris,broken_uris # issues 1296 branch = self.Entropy.SystemSettings['repositories']['branch'] 1297 my_path = os.path.join(self.Entropy.get_remote_database_relative_path(self.repo), branch) 1298 self.Entropy.updateProgress( 1299 "[%s|%s] %s %s..." % ( 1300 blue(crippled_uri), 1301 brown(action), 1302 blue(_("changing directory to")), 1303 darkgreen(my_path), 1304 ), 1305 importance = 0, 1306 type = "info", 1307 header = blue(" @@ ") 1308 ) 1309 1310 ftp.set_cwd(self.ftp_basedir, dodir = True) 1311 maxcount = len(self.myfiles) 1312 counter = 0 1313 1314 for mypath in self.myfiles: 1315 1316 ftp.set_basedir() 1317 ftp.set_cwd(self.ftp_basedir, dodir = True) 1318 1319 mycwd = None 1320 if isinstance(mypath,tuple): 1321 if len(mypath) < 2: continue 1322 mycwd = mypath[0] 1323 mypath = mypath[1] 1324 ftp.set_cwd(mycwd, dodir = True) 1325 1326 syncer = ftp.upload_file 1327 myargs = [mypath] 1328 if self.download: 1329 syncer = ftp.download_file 1330 myargs = [os.path.basename(mypath),self.local_basedir] 1331 elif self.remove: 1332 syncer = ftp.delete_file 1333 1334 counter += 1 1335 tries = 0 1336 done = False 1337 lastrc = None 1338 while tries < 5: 1339 tries += 1 1340 self.Entropy.updateProgress( 1341 "[%s|#%s|(%s/%s)] %s: %s" % ( 1342 blue(crippled_uri), 1343 darkgreen(str(tries)), 1344 blue(str(counter)), 1345 bold(str(maxcount)), 1346 blue(action+"ing"), 1347 red(os.path.basename(mypath)), 1348 ), 1349 importance = 0, 1350 type = "info", 1351 header = red(" @@ ") 1352 ) 1353 rc = syncer(*myargs) 1354 if rc and not self.download: 1355 # try with "SITE MD5 command first" 1356 # proftpd's mod_md5 supports it 1357 remote_md5 = ftp.get_file_md5(os.path.basename(mypath)) 1358 rc = self.handler_verify_upload(mypath, uri, 1359 counter, maxcount, tries, remote_md5 = remote_md5) 1360 if rc: 1361 self.Entropy.updateProgress( 1362 "[%s|#%s|(%s/%s)] %s %s: %s" % ( 1363 blue(crippled_uri), 1364 darkgreen(str(tries)), 1365 blue(str(counter)), 1366 bold(str(maxcount)), 1367 blue(action), 1368 _("successful"), 1369 red(os.path.basename(mypath)), 1370 ), 1371 importance = 0, 1372 type = "info", 1373 header = darkgreen(" @@ ") 1374 ) 1375 done = True 1376 break 1377 else: 1378 self.Entropy.updateProgress( 1379 "[%s|#%s|(%s/%s)] %s %s: %s" % ( 1380 blue(crippled_uri), 1381 darkgreen(str(tries)), 1382 blue(str(counter)), 1383 bold(str(maxcount)), 1384 blue(action), 1385 brown(_("failed, retrying")), 1386 red(os.path.basename(mypath)), 1387 ), 1388 importance = 0, 1389 type = "warning", 1390 header = brown(" @@ ") 1391 ) 1392 lastrc = rc 1393 continue 1394 1395 if not done: 1396 1397 self.Entropy.updateProgress( 1398 "[%s|(%s/%s)] %s %s: %s - %s: %s" % ( 1399 blue(crippled_uri), 1400 blue(str(counter)), 1401 bold(str(maxcount)), 1402 blue(action), 1403 darkred("failed, giving up"), 1404 red(os.path.basename(mypath)), 1405 _("error"), 1406 lastrc, 1407 ), 1408 importance = 1, 1409 type = "error", 1410 header = darkred(" !!! ") 1411 ) 1412 1413 if mypath not in self.critical_files: 1414 self.Entropy.updateProgress( 1415 "[%s|(%s/%s)] %s: %s, %s..." % ( 1416 blue(crippled_uri), 1417 blue(str(counter)), 1418 bold(str(maxcount)), 1419 blue(_("not critical")), 1420 os.path.basename(mypath), 1421 blue(_("continuing")), 1422 ), 1423 importance = 1, 1424 type = "warning", 1425 header = brown(" @@ ") 1426 ) 1427 continue 1428 1429 ftp.close() 1430 errors = True 1431 broken_uris.add((uri,lastrc)) 1432 # next mirror 1433 break 1434 1435 # close connection 1436 ftp.close() 1437 fine_uris.add(uri) 1438 1439 return errors,fine_uris,broken_uris
1440