1
2 """
3
4 @author: Fabio Erculiani <lxnay@sabayonlinux.org>
5 @contact: lxnay@sabayonlinux.org
6 @copyright: Fabio Erculiani
7 @license: GPL-2
8
9 B{Entropy transceiver module}.
10
11 """
12 from __future__ import with_statement
13 import os
14 import urllib2
15 import time
16 from entropy.const import etpConst
17 from entropy.output import TextInterface, darkblue, darkred, purple, blue, \
18 brown, darkgreen, red, bold
19 from entropy.exceptions import *
20 from entropy.i18n import _
21 from entropy.misc import TimeScheduled, ParallelTask
22 from entropy.core.settings.base import SystemSettings
23
25
26 - def __init__(self, url, path_to_save, checksum = True,
27 show_speed = True, resume = True,
28 abort_check_func = None, disallow_redirect = False,
29 thread_stop_func = None, speed_limit = None,
30 OutputInterface = None):
31
32 self.__system_settings = SystemSettings()
33 if speed_limit == None:
34 speed_limit = self.__system_settings['repositories']['transfer_limit']
35
36 self.progress = None
37 import entropy.tools as entropyTools
38 import socket
39 self.entropyTools, self.socket = entropyTools, socket
40 self.__th_id = 0
41 self.__resume = resume
42 self.__url = self.__encode_url(url)
43 self.__path_to_save = path_to_save
44 self.__checksum = checksum
45 self.__show_speed = show_speed
46 self.__abort_check_func = abort_check_func
47 self.__thread_stop_func = thread_stop_func
48 self.__disallow_redirect = disallow_redirect
49 self.__speedlimit = speed_limit
50 self.__existed_before = False
51 self.localfile = None
52
53
54 self.__datatransfer = 0
55 self.__resumed = False
56
57 uname = os.uname()
58 self.user_agent = "Entropy/%s (compatible; %s; %s: %s %s %s)" % (
59 etpConst['entropyversion'],
60 "Entropy",
61 os.path.basename(self.__url),
62 uname[0],
63 uname[4],
64 uname[2],
65 )
66 self.__extra_header_data = {}
67 self.__Output = OutputInterface
68 if self.__Output == None:
69 self.__Output = TextInterface()
70 elif not hasattr(self.__Output,'updateProgress'):
71 mytxt = _("Output interface passed doesn't have the updateProgress method")
72 raise IncorrectParameter("IncorrectParameter: %s" % (mytxt,))
73 elif not callable(self.__Output.updateProgress):
74 mytxt = _("Output interface passed doesn't have the updateProgress method")
75 raise IncorrectParameter("IncorrectParameter: %s" % (mytxt,))
76
78
79 self.__resumed = False
80 self.__buffersize = 8192
81 self.__status = None
82 self.__remotefile = None
83 self.__downloadedsize = 0
84 self.__average = 0
85 self.__remotesize = 0
86 self.__oldaverage = 0.0
87
88 self.__startingposition = 0
89 self.__datatransfer = 0
90 self.__time_remaining = "(infinite)"
91 self.__time_remaining_secs = 0
92 self.__elapsed = 0.0
93 self.__updatestep = 0.2
94 self.__transferpollingtime = float(1)/4
95 self.__existed_before = False
96 if os.path.lexists(self.__path_to_save):
97 self.__existed_before = True
98 self.__setup_resume_support()
99 self._setup_proxy()
100
102
103
104
105
106 if isinstance(self.localfile, file):
107 try:
108 self.localfile.flush()
109 self.localfile.close()
110 except (IOError, OSError,):
111 pass
112
113
114 if os.path.isfile(self.__path_to_save) and \
115 os.access(self.__path_to_save,os.W_OK) and self.__resume:
116
117 self.localfile = open(self.__path_to_save,"awb")
118 self.localfile.seek(0,os.SEEK_END)
119 self.__startingposition = int(self.localfile.tell())
120 self.__resumed = True
121
122 elif os.path.lexists(self.__path_to_save) and not \
123 self.entropyTools.is_valid_path(self.__path_to_save):
124 try:
125 os.remove(self.__path_to_save)
126 except OSError:
127 pass
128 self.localfile = open(self.__path_to_save,"wb")
129
131
132 mydict = {}
133 proxy_data = self.__system_settings['system']['proxy']
134 if proxy_data['ftp']:
135 mydict['ftp'] = proxy_data['ftp']
136 if proxy_data['http']:
137 mydict['http'] = proxy_data['http']
138 if mydict:
139 mydict['username'] = proxy_data['username']
140 mydict['password'] = proxy_data['password']
141 self.entropyTools.add_proxy_opener(urllib2, mydict)
142 else:
143
144 urllib2._opener = None
145
147 import urllib
148 url = os.path.join(os.path.dirname(url),
149 urllib.quote(os.path.basename(url)))
150 return url
151
154
156
157 self._init_vars()
158 self.speedUpdater = TimeScheduled(
159 self.__transferpollingtime,
160 self.__update_speed,
161 )
162 self.speedUpdater.start()
163
164 self.socket.setdefaulttimeout(20)
165
166 if self.__url.startswith("http://"):
167 headers = { 'User-Agent' : self.user_agent }
168 req = urllib2.Request(self.__url, self.__extra_header_data, headers)
169 else:
170 req = self.__url
171
172 u_agent_error = False
173 while 1:
174
175 try:
176 self.__remotefile = urllib2.urlopen(req)
177 except KeyboardInterrupt:
178 self.__close(False)
179 raise
180 except urllib2.HTTPError, e:
181 if (e.code == 405) and not u_agent_error:
182
183 req = self.__url
184 u_agent_error = True
185 continue
186 self.__close(True)
187 self.__status = "-3"
188 return self.__status
189 except:
190 self.__close(True)
191 self.__status = "-3"
192 return self.__status
193 break
194
195 try:
196 self.__remotesize = int(self.__remotefile.headers.get(
197 "content-length"))
198 self.__remotefile.close()
199 except KeyboardInterrupt:
200 self.__close(False)
201 raise
202 except:
203 pass
204
205
206 try:
207 request = self.__url
208 if ((self.__startingposition > 0) and (self.__remotesize > 0)) \
209 and (self.__startingposition < self.__remotesize):
210
211 try:
212 request = urllib2.Request(
213 self.__url,
214 headers = {
215 "Range" : "bytes=" + \
216 str(self.__startingposition) + "-" + \
217 str(self.__remotesize)
218 }
219 )
220 except KeyboardInterrupt:
221 self.__close(False)
222 raise
223 except:
224 pass
225 elif (self.__startingposition == self.__remotesize):
226
227 self.__close(False)
228 return self.__prepare_return()
229 else:
230 self.localfile = open(self.__path_to_save,"wb")
231 self.__remotefile = urllib2.urlopen(request)
232 except KeyboardInterrupt:
233 self.__close(False)
234 raise
235 except:
236 self.__close(True)
237 self.__status = "-3"
238 return self.__status
239
240 if self.__remotesize > 0:
241 self.__remotesize = float(int(self.__remotesize))/1024
242
243 if self.__disallow_redirect and \
244 (self.__url != self.__remotefile.geturl()):
245
246 self.__close(True)
247 self.__status = "-3"
248 return self.__status
249
250 while 1:
251 try:
252 rsx = self.__remotefile.read(self.__buffersize)
253 if rsx == '': break
254 if self.__abort_check_func != None:
255 self.__abort_check_func()
256 if self.__thread_stop_func != None:
257 self.__thread_stop_func()
258 except KeyboardInterrupt:
259 self.__close(False)
260 raise
261 except self.socket.timeout:
262 self.__close(False)
263 self.__status = "-4"
264 return self.__status
265 except:
266
267 self.__close(True)
268 self.__status = "-3"
269 return self.__status
270 self.__commit(rsx)
271 if self.__show_speed:
272 self.handle_statistics(self.__th_id, self.__downloadedsize,
273 self.__remotesize, self.__average, self.__oldaverage,
274 self.__updatestep, self.__show_speed, self.__datatransfer,
275 self.__time_remaining, self.__time_remaining_secs
276 )
277 self.updateProgress()
278 self.__oldaverage = self.__average
279 if self.__speedlimit:
280 while self.__datatransfer > self.__speedlimit*1024:
281 time.sleep(0.1)
282 if self.__show_speed:
283 self.updateProgress()
284 self.__oldaverage = self.__average
285
286
287 self.__close(False)
288 return self.__prepare_return()
289
290
292 if self.__checksum:
293 self.__status = self.entropyTools.md5sum(self.__path_to_save)
294 return self.__status
295 self.__status = "-2"
296 return self.__status
297
299
300 self.localfile.write(mybuffer)
301
302 self.__downloadedsize = self.localfile.tell()
303 kbytecount = float(self.__downloadedsize)/1024
304 self.__average = int((kbytecount/self.__remotesize)*100)
305
307 try:
308 if isinstance(self.localfile, file):
309 self.localfile.flush()
310 self.localfile.close()
311 except IOError:
312 pass
313 if (not self.__existed_before) and errored:
314 try:
315 os.remove(self.__path_to_save)
316 except OSError:
317 pass
318 try:
319 self.__remotefile.close()
320 except:
321 pass
322 self.speedUpdater.kill()
323 self.socket.setdefaulttimeout(2)
324
326 self.__elapsed += self.__transferpollingtime
327
328 x_delta = self.__downloadedsize - self.__startingposition
329 self.__datatransfer = x_delta / self.__elapsed
330 if self.__datatransfer < 0:
331 self.__datatransfer = 0
332 try:
333 rounded_remote = int(round(self.__remotesize*1024,0))
334 rounded_downloaded = int(round(self.__downloadedsize,0))
335 x_delta = rounded_remote - rounded_downloaded
336 tx_round = int(round(x_delta/self.__datatransfer,0))
337 self.__time_remaining_secs = tx_round
338 self.__time_remaining = self.entropyTools.convert_seconds_to_fancy_output(self.__time_remaining_secs)
339 except:
340 self.__time_remaining = "(%s)" % (_("infinite"),)
341
343 return self.__datatransfer
344
346 return self.__resumed
347
348 - def handle_statistics(self, th_id, downloaded_size, total_size,
349 average, old_average, update_step, show_speed, data_transfer,
350 time_remaining, time_remaining_secs):
352
354
355 mytxt = _("[F]")
356 eta_txt = _("ETA")
357 sec_txt = _("sec")
358
359 currentText = darkred(" %s: " % (mytxt,)) + \
360 darkgreen(str(round(float(self.__downloadedsize)/1024,1))) + "/" + \
361 red(str(round(self.__remotesize,1))) + " kB"
362
363 barsize = 10
364 bartext = "["
365 curbarsize = 1
366 if self.__average > self.__oldaverage+self.__updatestep:
367 averagesize = (self.__average*barsize)/100
368 while averagesize > 0:
369 curbarsize += 1
370 bartext += "="
371 averagesize -= 1
372 bartext += ">"
373 diffbarsize = barsize-curbarsize
374 while diffbarsize > 0:
375 bartext += " "
376 diffbarsize -= 1
377 if self.__show_speed:
378 bartext += "] => %s" % (self.entropyTools.bytes_into_human(self.__datatransfer),)
379 bartext += "/%s : %s: %s" % (sec_txt,eta_txt,self.__time_remaining,)
380 else:
381 bartext += "]"
382 average = str(self.__average)
383 if len(average) < 2:
384 average = " "+average
385 currentText += " <-> "+average+"% "+bartext
386 self.__Output.updateProgress(currentText, back = True)
387
389
390 import entropy.tools as entropyTools
391 - def __init__(self, url_path_list, checksum = True,
392 show_speed = True, resume = True,
393 abort_check_func = None, disallow_redirect = False,
394 OutputInterface = None, UrlFetcherClass = None):
395 """
396 @param url_path_list list [(url,path_to_save,),...]
397 @param checksum bool return checksum data
398 @param show_speed bool show transfer speed on the output
399 @param resume bool enable resume support
400 @param abort_check_func callable function that could
401 raise exception and stop transfer
402 @param disallow_redirect bool disable automatic HTTP redirect
403 @param OutputInterface TextInterface instance used to
404 print instance output through a common interface
405 @param UrlFetcherClass, urlFetcher instance/interface used
406 """
407 self.__system_settings = SystemSettings()
408 self.__url_path_list = url_path_list
409 self.__resume = resume
410 self.__checksum = checksum
411 self.__show_speed = show_speed
412 self.__abort_check_func = abort_check_func
413 self.__disallow_redirect = disallow_redirect
414
415
416 self.__data_transfer = 0
417 self.__average = 0
418 self.__old_average = 0
419 self.__time_remaining_sec = 0
420
421 self.__Output = OutputInterface
422 if self.__Output == None:
423 self.__Output = TextInterface()
424 elif not hasattr(self.__Output,'updateProgress'):
425 mytxt = _("Output interface passed doesn't have the updateProgress method")
426 raise IncorrectParameter("IncorrectParameter: %s" % (mytxt,))
427 elif not callable(self.__Output.updateProgress):
428 mytxt = _("Output interface passed doesn't have the updateProgress method")
429 raise IncorrectParameter("IncorrectParameter: %s" % (mytxt,))
430
431 self.__url_fetcher = UrlFetcherClass
432 if self.__url_fetcher == None:
433 self.__url_fetcher = UrlFetcher
434
435
439
441 self.__progress_data = {}
442 self.__thread_pool = {}
443 self.__download_statuses = {}
444 self.__show_progress = False
445 self.__stop_threads = False
446 self.__first_refreshes = 50
447 self.__data_transfer = 0
448 self.__average = 0
449 self.__old_average = 0
450 self.__time_remaining_sec = 0
451
453 self._init_vars()
454
455 th_id = 0
456 speed_limit = 0
457 dsl = self.__system_settings['repositories']['transfer_limit']
458 if isinstance(dsl,int) and self.__url_path_list:
459 speed_limit = dsl/len(self.__url_path_list)
460
461 class MyFetcher(self.__url_fetcher):
462
463 def __init__(self, klass, multiple, *args, **kwargs):
464 klass.__init__(self, *args, **kwargs)
465 self.__multiple_fetcher = multiple
466
467 def updateProgress(self):
468 return self.__multiple_fetcher.updateProgress()
469
470 def handle_statistics(self, *args, **kwargs):
471 return self.__multiple_fetcher.handle_statistics(*args,
472 **kwargs)
473
474 for url, path_to_save in self.__url_path_list:
475 th_id += 1
476 downloader = MyFetcher(self.__url_fetcher, self, url, path_to_save,
477 checksum = self.__checksum, show_speed = self.__show_speed,
478 resume = self.__resume,
479 abort_check_func = self.__abort_check_func,
480 disallow_redirect = self.__disallow_redirect,
481 thread_stop_func = self.__handle_threads_stop,
482 speed_limit = speed_limit,
483 OutputInterface = self.__Output
484 )
485 downloader.set_id(th_id)
486
487 def do_download(ds, th_id, downloader):
488 ds[th_id] = downloader.download()
489
490 t = ParallelTask(do_download, self.__download_statuses, th_id, downloader)
491 self.__thread_pool[th_id] = t
492 t.start()
493 self.show_download_files_info()
494 self.__show_progress = True
495 while len(self.__url_path_list) != len(self.__download_statuses):
496 try:
497 time.sleep(0.5)
498 except (SystemExit, KeyboardInterrupt,):
499 self.__stop_threads = True
500 raise
501
502 return self.__download_statuses
503
505 return self.__data_transfer
506
508 return self.__average
509
511 return self.__time_remaining_sec
512
514 count = 0
515 pl = self.__url_path_list[:]
516 self.__Output.updateProgress(
517 "%s: %s %s" % (
518 darkblue(_("Aggregated download")),
519 darkred(str(len(pl))),
520 darkblue(_("items")),
521 ),
522 importance = 0,
523 type = "info",
524 header = purple(" ## ")
525 )
526 for url, save_path in pl:
527 count += 1
528 fname = os.path.basename(url)
529 uri = self.entropyTools.spliturl(url)[1]
530 self.__Output.updateProgress(
531 "[%s] %s => %s" % (
532 darkblue(str(count)),
533 darkgreen(uri),
534 blue(fname),
535 ),
536 importance = 0,
537 type = "info",
538 header = brown(" # ")
539 )
540
541 - def handle_statistics(self, th_id, downloaded_size, total_size,
542 average, old_average, update_step, show_speed, data_transfer,
543 time_remaining, time_remaining_secs):
544 data = {
545 'th_id': th_id,
546 'downloaded_size': downloaded_size,
547 'total_size': total_size,
548 'average': average,
549 'old_average': old_average,
550 'update_step': update_step,
551 'show_speed': show_speed,
552 'data_transfer': data_transfer,
553 'time_remaining': time_remaining,
554 'time_remaining_secs': time_remaining_secs,
555 }
556 self.__progress_data[th_id] = data
557
559
560 eta_txt = _("ETA")
561 sec_txt = _("sec")
562 downloaded_size = 0
563 total_size = 0
564 time_remaining = 0
565 data_transfer = 0
566 update_step = 0
567 average = 100
568 pd = self.__progress_data.copy()
569 pdlen = len(pd)
570
571
572 for th_id in sorted(pd):
573 data = pd.get(th_id)
574 downloaded_size += data.get('downloaded_size',0)
575 total_size += data.get('total_size',0)
576 data_transfer += data.get('data_transfer',0)
577 tr = data.get('time_remaining_secs',0)
578 if tr > 0: time_remaining += tr
579 update_step += data.get('update_step',0)
580
581
582
583 if total_size > 0:
584 average = int(float(downloaded_size/1024)/total_size * 100)
585 self.__data_transfer = data_transfer
586 self.__average = average
587 update_step = update_step/pdlen
588 self.__time_remaining_sec = time_remaining
589 time_remaining = self.entropyTools.convert_seconds_to_fancy_output(time_remaining)
590
591 if ((average > self.__old_average+update_step) or \
592 (self.__first_refreshes > 0)) and self.__show_progress:
593
594 self.__first_refreshes -= 1
595 currentText = darkgreen(str(round(float(downloaded_size)/1024,1))) + "/" + \
596 red(str(round(total_size,1))) + " kB"
597
598 barsize = 10
599 bartext = "["
600 curbarsize = 1
601 averagesize = (average*barsize)/100
602 while averagesize > 0:
603 curbarsize += 1
604 bartext += "="
605 averagesize -= 1
606 bartext += ">"
607 diffbarsize = barsize-curbarsize
608 while diffbarsize > 0:
609 bartext += " "
610 diffbarsize -= 1
611 if self.__show_speed:
612 bartext += "] => %s" % (self.entropyTools.bytes_into_human(data_transfer),)
613 bartext += "/%s : %s: %s" % (sec_txt,eta_txt,time_remaining,)
614 else:
615 bartext += "]"
616 myavg = str(average)
617 if len(myavg) < 2:
618 myavg = " "+myavg
619 currentText += " <-> "+myavg+"% "+bartext+" "
620 self.__Output.updateProgress(currentText, back = True)
621
622 self.__old_average = average
623
624
626
627
628 - def __init__(self, ftpuri, OutputInterface, verbose = True):
629
630 if not hasattr(OutputInterface,'updateProgress'):
631 mytxt = _("OutputInterface does not have an updateProgress method")
632 raise IncorrectParameter("IncorrectParameter: %s, (! %s !)" % (OutputInterface,mytxt,))
633 elif not callable(OutputInterface.updateProgress):
634 mytxt = _("OutputInterface does not have an updateProgress method")
635 raise IncorrectParameter("IncorrectParameter: %s, (! %s !)" % (OutputInterface,mytxt,))
636
637 import socket, ftplib
638 import entropy.tools as entropyTools
639 self.socket, self.ftplib, self.entropyTools = socket, ftplib, entropyTools
640 self.Entropy = OutputInterface
641 self.__verbose = verbose
642 self.__init_vars()
643 self.socket.setdefaulttimeout(60)
644 self.__ftpuri = ftpuri
645 self.__speed_updater = None
646 self.__currentdir = '.'
647 self.__ftphost = self.entropyTools.extract_ftp_host_from_uri(self.__ftpuri)
648 self.__ftpuser, self.__ftppassword, self.__ftpport, self.__ftpdir = self.entropyTools.extract_ftp_data(ftpuri)
649
650 count = 10
651 while 1:
652 count -= 1
653 try:
654 self.__ftpconn = self.ftplib.FTP(self.__ftphost)
655 break
656 except (self.socket.gaierror,), e:
657 raise ConnectionError('ConnectionError: %s' % (e,))
658 except (self.socket.error,), e:
659 if not count:
660 raise ConnectionError('ConnectionError: %s' % (e,))
661 continue
662 except:
663 if not count: raise
664 continue
665
666 if self.__verbose:
667 mytxt = _("connecting with user")
668 self.Entropy.updateProgress(
669 "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(self.__ftpuser),),
670 importance = 1,
671 type = "info",
672 header = darkgreen(" * ")
673 )
674 try:
675 self.__ftpconn.login(self.__ftpuser,self.__ftppassword)
676 except self.ftplib.error_perm, e:
677 raise FtpError('FtpError: %s' % (e,))
678 if self.__verbose:
679 mytxt = _("switching to")
680 self.Entropy.updateProgress(
681 "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(self.__ftpdir),),
682 importance = 1,
683 type = "info",
684 header = darkgreen(" * ")
685 )
686 self.set_cwd(self.__ftpdir, dodir = True)
687
689 self.__oldprogress = 0.0
690 self.__filesize = 0
691 self.__filekbcount = 0
692 self.__transfersize = 0
693 self.__startingposition = 0
694 self.__elapsed = 0.0
695 self.__time_remaining_secs = 0
696 self.__time_remaining = "(%s)" % (_("infinite"),)
697 self.__transferpollingtime = float(1)/4
698
700 return self.set_cwd(self.__ftpdir)
701
702
704
705 self.socket.setdefaulttimeout(60)
706 counter = 10
707 while 1:
708 counter -= 1
709 try:
710 self.__ftpconn = self.ftplib.FTP(self.__ftphost)
711 break
712 except:
713 if not counter:
714 raise
715 continue
716 if self.__verbose:
717 mytxt = _("reconnecting with user")
718 self.Entropy.updateProgress(
719 "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(self.__ftpuser),),
720 importance = 1,
721 type = "info",
722 header = darkgreen(" * ")
723 )
724 self.__ftpconn.login(self.__ftpuser,self.__ftppassword)
725 if self.__verbose:
726 mytxt = _("switching to")
727 self.Entropy.updateProgress(
728 "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(self.__ftpdir),),
729 importance = 1,
730 type = "info",
731 header = darkgreen(" * ")
732 )
733 self.set_cwd(self.__currentdir)
734
736 return self.__ftphost
737
739 return self.__ftpport
740
743
745 pwd = self.__ftpconn.pwd()
746 return pwd
747
748 - def set_cwd(self, mydir, dodir = False):
749 try:
750 return self._set_cwd(mydir, dodir)
751 except self.ftplib.error_perm, e:
752 raise FtpError('FtpError: %s' % (e,))
753
754 - def _set_cwd(self, mydir, dodir = False):
755 if self.__verbose:
756 mytxt = _("switching to")
757 self.Entropy.updateProgress(
758 "[ftp:%s] %s: %s" % (darkgreen(self.__ftphost),mytxt,blue(mydir),),
759 importance = 1,
760 type = "info",
761 header = darkgreen(" * ")
762 )
763 try:
764 self.__ftpconn.cwd(mydir)
765 except self.ftplib.error_perm, e:
766 if e[0][:3] == '550' and dodir:
767 self.recursive_mkdir(mydir)
768 self.__ftpconn.cwd(mydir)
769 else:
770 raise
771 self.__currentdir = self.get_cwd()
772
775
777 return self.__ftpconn.voidcmd("SITE CHMOD "+str(chmodvalue)+" "+str(file))
778
780 rc = self.__ftpconn.sendcmd("mdtm "+path)
781 return rc.split()[-1]
782
784 return self.__ftpconn.sendcmd(cmd)
785
787 return [os.path.basename(x) for x in self.__ftpconn.nlst()]
788
790 xx = []
791 def cb(x):
792 if x == filename: xx.append(x)
793 self.__ftpconn.retrlines('NLST',cb)
794 if xx: return True
795 return False
796
798 try:
799 rc = self.__ftpconn.delete(file)
800 except self.ftplib.error_perm, e:
801 if e[0][:3] == '550':
802 return True
803 return False
804 if rc.startswith("250"):
805 return True
806 return False
807
809 mydirs = [x for x in mypath.split("/") if x]
810 mycurpath = ""
811 for mydir in mydirs:
812 mycurpath = os.path.join(mycurpath,mydir)
813 if not self.is_file_available(mycurpath):
814 try:
815 self.mkdir(mycurpath)
816 except self.ftplib.error_perm, e:
817 if e[0].lower().find("permission denied") != -1:
818 raise
819 elif e[0][:3] != '550':
820 raise
821
822 - def mkdir(self,directory):
823 return self.__ftpconn.mkd(directory)
824
826
827
828 def advanced_stor(cmd, fp):
829 ''' Store a file in binary mode. Our version supports a callback function'''
830 self.__ftpconn.voidcmd('TYPE I')
831 conn = self.__ftpconn.transfercmd(cmd)
832 while 1:
833 buf = fp.readline()
834 if not buf:
835 break
836 conn.sendall(buf)
837 self.updateProgress(len(buf))
838 conn.close()
839
840
841
842 try:
843 rc = self.__ftpconn.voidresp()
844 except:
845 self.reconnect_host()
846 return "226"
847 return rc
848
849 tries = 0
850 while tries < 10:
851
852 tries += 1
853 filename = os.path.basename(file)
854 self.__init_vars()
855 self.__start_speed_counter()
856 try:
857
858 with open(file,"r") as f:
859
860 self.__filesize = round(float(self.entropyTools.get_file_size(file))/1024,1)
861 self.__filekbcount = 0
862
863
864 self.delete_file(filename+".tmp")
865
866 if ascii:
867 rc = self.__ftpconn.storlines("STOR "+filename+".tmp",f)
868 else:
869 rc = advanced_stor("STOR "+filename+".tmp", f)
870
871
872 self.rename_file(filename+".tmp",filename)
873
874 if rc.find("226") != -1:
875 return True
876 return False
877
878 except Exception, e:
879
880 self.entropyTools.print_traceback()
881 mytxt = red("%s: %s, %s... #%s") % (
882 _("Upload issue"),
883 e,
884 _("retrying"),
885 tries+1,
886 )
887 self.Entropy.updateProgress(
888 mytxt,
889 importance = 1,
890 type = "warning",
891 header = " "
892 )
893 self.reconnect_host()
894 self.delete_file(filename)
895 self.delete_file(filename+".tmp")
896
897 finally:
898 self.__stop_speed_counter()
899
906
907 tries = 10
908 while tries:
909 tries -= 1
910
911 self.__init_vars()
912 self.__start_speed_counter()
913 try:
914
915 self.__filekbcount = 0
916
917 self.__filesize = self.get_file_size_compat(filename)
918 if (self.__filesize):
919 self.__filesize = round(float(int(self.__filesize))/1024,1)
920 if (self.__filesize == 0):
921 self.__filesize = 1
922 elif not self.is_file_available(filename):
923 return False
924 else:
925 self.__filesize = 0
926 if not ascii:
927 f = open(downloaddir+"/"+filename,"wb")
928 rc = self.__ftpconn.retrbinary('RETR '+filename, df_up, 1024)
929 else:
930 f = open(downloaddir+"/"+filename,"w")
931 rc = self.__ftpconn.retrlines('RETR '+filename, f.write)
932 f.flush()
933 f.close()
934 if rc.find("226") != -1:
935 return True
936 return False
937
938 except Exception, e:
939
940 self.entropyTools.print_traceback()
941 mytxt = red("%s: %s, %s... #%s") % (
942 _("Download issue"),
943 e,
944 _("retrying"),
945 tries+1,
946 )
947 self.Entropy.updateProgress(
948 mytxt,
949 importance = 1,
950 type = "warning",
951 header = " "
952 )
953 self.reconnect_host()
954
955 finally:
956 self.__stop_speed_counter()
957
958
960 rc = self.__ftpconn.rename(fromfile,tofile)
961 return rc
962
964
965 try:
966 rc_data = self.__ftpconn.sendcmd("SITE MD5 %s" % (filename,))
967 except self.ftplib.error_perm:
968 return None
969 try:
970 return rc_data.split("\n")[0].split("\t")[0].split("-")[1]
971 except (IndexError, TypeError,):
972 return None
973
975 return self.__ftpconn.size(filename)
976
978 try:
979 data = [x.split() for x in self.__ftpconn.sendcmd("stat %s" % (filename,)).split("\n")]
980 except self.ftplib.error_temp:
981 return ""
982 for item in data:
983 if item[-1] == filename:
984 return item[4]
985 return ""
986
988 mybuffer = []
989 def bufferizer(buf):
990 mybuffer.append(buf)
991 self.__ftpconn.dir(bufferizer)
992 return mybuffer
993
995 try:
996 self.__ftpconn.quit()
997 except (EOFError,AttributeError,self.socket.timeout,self.ftplib.error_reply,):
998
999
1000
1001 pass
1002
1004 self.__speed_updater = TimeScheduled(
1005 self.__transferpollingtime,
1006 self.__update_speed,
1007 )
1008 self.__speed_updater.start()
1009
1011 if self.__speed_updater != None:
1012 self.__speed_updater.kill()
1013
1015 self.__elapsed += self.__transferpollingtime
1016
1017 self.__datatransfer = (self.__transfersize-self.__startingposition) / self.__elapsed
1018 if self.__datatransfer < 0:
1019 self.__datatransfer = 0
1020 try:
1021 self.__time_remaining_secs = int(round((int(round(self.__filesize*1024,0))-int(round(self.__transfersize,0)))/self.__datatransfer,0))
1022 self.__time_remaining = self.entropyTools.convert_seconds_to_fancy_output(self.__time_remaining_secs)
1023 except:
1024 self.__time_remaining = "(%s)" % (_("infinite"),)
1025
1027
1028 self.__filekbcount += float(buf_len)/1024
1029 self.__transfersize += buf_len
1030
1031 myUploadPercentage = 100.0
1032 if self.__filesize >= 1:
1033 myUploadPercentage = round((round(self.__filekbcount,1)/self.__filesize)*100,1)
1034 currentprogress = myUploadPercentage
1035 myUploadSize = round(self.__filekbcount,1)
1036 if (currentprogress > self.__oldprogress+1.0) and \
1037 (myUploadPercentage < 100.1) and \
1038 (myUploadSize <= self.__filesize):
1039
1040 myUploadPercentage = str(myUploadPercentage)+"%"
1041
1042 mytxt = _("Transfer status")
1043 currentText = brown(" <-> %s: " % (mytxt,)) + \
1044 darkgreen(str(myUploadSize)) + "/" + red(str(self.__filesize)) + " kB " + \
1045 brown("[") + str(myUploadPercentage) + brown("]") + " " + self.__time_remaining + \
1046 " " + self.entropyTools.bytes_into_human(self.__datatransfer) + "/"+_("sec")
1047
1048
1049 self.Entropy.updateProgress(currentText, back = True)
1050 self.__oldprogress = currentprogress
1051
1052
1054
1055 import entropy.tools as entropyTools
1056 - def __init__(self, ftp_interface, entropy_interface, uris, files_to_upload,
1057 download = False, remove = False, ftp_basedir = None, local_basedir = None,
1058 critical_files = [], use_handlers = False, handlers_data = {}, repo = None):
1059
1060 self.FtpInterface = ftp_interface
1061 self.Entropy = entropy_interface
1062 if not isinstance(uris,list):
1063 raise InvalidDataType("InvalidDataType: %s" % (_("uris must be a list instance"),))
1064 if not isinstance(files_to_upload,(list,dict)):
1065 raise InvalidDataType("InvalidDataType: %s" % (
1066 _("files_to_upload must be a list or dict instance"),
1067 )
1068 )
1069 self.uris = uris
1070 if isinstance(files_to_upload,list):
1071 self.myfiles = files_to_upload[:]
1072 else:
1073 self.myfiles = sorted([x for x in files_to_upload])
1074 self.download = download
1075 self.remove = remove
1076 self.repo = repo
1077 if self.repo == None:
1078 self.repo = self.Entropy.default_repository
1079 self.use_handlers = use_handlers
1080 if self.remove:
1081 self.download = False
1082 self.use_handlers = False
1083 if not ftp_basedir:
1084
1085 branch = self.Entropy.SystemSettings['repositories']['branch']
1086 my_path = os.path.join(self.Entropy.get_remote_database_relative_path(repo), branch)
1087 self.ftp_basedir = unicode(my_path)
1088 else:
1089 self.ftp_basedir = unicode(ftp_basedir)
1090 if not local_basedir:
1091
1092 self.local_basedir = os.path.dirname(self.Entropy.get_local_database_file(self.repo))
1093 else:
1094 self.local_basedir = unicode(local_basedir)
1095 self.critical_files = critical_files
1096 self.handlers_data = handlers_data.copy()
1097
1098 - def handler_verify_upload(self, local_filepath, uri, counter, maxcount,
1099 tries, remote_md5 = None):
1100
1101 crippled_uri = self.entropyTools.extract_ftp_host_from_uri(uri)
1102
1103 self.Entropy.updateProgress(
1104 "[%s|#%s|(%s/%s)] %s: %s" % (
1105 blue(crippled_uri),
1106 darkgreen(str(tries)),
1107 blue(str(counter)),
1108 bold(str(maxcount)),
1109 darkgreen(_("verifying upload (if supported)")),
1110 blue(os.path.basename(local_filepath)),
1111 ),
1112 importance = 0,
1113 type = "info",
1114 header = red(" @@ "),
1115 back = True
1116 )
1117
1118 valid_remote_md5 = True
1119
1120 if isinstance(remote_md5, basestring):
1121 valid_md5 = self.entropyTools.is_valid_md5(remote_md5)
1122 ckres = False
1123 if valid_md5:
1124 ckres = self.entropyTools.compare_md5(local_filepath,
1125 remote_md5)
1126 if ckres:
1127 self.Entropy.updateProgress(
1128 "[%s|#%s|(%s/%s)] %s: %s: %s" % (
1129 blue(crippled_uri),
1130 darkgreen(str(tries)),
1131 blue(str(counter)),
1132 bold(str(maxcount)),
1133 blue(_("digest verification")),
1134 os.path.basename(local_filepath),
1135 darkgreen(_("so far, so good!")),
1136 ),
1137 importance = 0,
1138 type = "info",
1139 header = red(" @@ ")
1140 )
1141 return True
1142
1143 elif not valid_md5:
1144
1145 self.Entropy.updateProgress(
1146 "[%s|#%s|(%s/%s)] %s: %s: %s" % (
1147 blue(crippled_uri),
1148 darkgreen(str(tries)),
1149 blue(str(counter)),
1150 bold(str(maxcount)),
1151 blue(_("digest verification")),
1152 os.path.basename(local_filepath),
1153 bold(_("malformed md5 provided to function")),
1154 ),
1155 importance = 0,
1156 type = "warning",
1157 header = brown(" @@ ")
1158 )
1159 else:
1160 self.Entropy.updateProgress(
1161 "[%s|#%s|(%s/%s)] %s: %s: %s" % (
1162 blue(crippled_uri),
1163 darkgreen(str(tries)),
1164 blue(str(counter)),
1165 bold(str(maxcount)),
1166 blue(_("digest verification")),
1167 os.path.basename(local_filepath),
1168 bold(_("remote md5 is invalid")),
1169 ),
1170 importance = 0,
1171 type = "warning",
1172 header = brown(" @@ ")
1173 )
1174 valid_remote_md5 = False
1175
1176 if not self.use_handlers:
1177
1178 return valid_remote_md5
1179
1180 checksum = self.Entropy.get_remote_package_checksum(
1181 self.repo,
1182 os.path.basename(local_filepath),
1183 self.handlers_data['branch']
1184 )
1185 if checksum == None:
1186 self.Entropy.updateProgress(
1187 "[%s|#%s|(%s/%s)] %s: %s: %s" % (
1188 blue(crippled_uri),
1189 darkgreen(str(tries)),
1190 blue(str(counter)),
1191 bold(str(maxcount)),
1192 blue(_("digest verification")),
1193 os.path.basename(local_filepath),
1194 darkred(_("not supported")),
1195 ),
1196 importance = 0,
1197 type = "info",
1198 header = red(" @@ ")
1199 )
1200 return valid_remote_md5
1201 elif isinstance(checksum, bool) and not checksum:
1202 self.Entropy.updateProgress(
1203 "[%s|#%s|(%s/%s)] %s: %s: %s" % (
1204 blue(crippled_uri),
1205 darkgreen(str(tries)),
1206 blue(str(counter)),
1207 bold(str(maxcount)),
1208 blue(_("digest verification")),
1209 os.path.basename(local_filepath),
1210 bold(_("file not found")),
1211 ),
1212 importance = 0,
1213 type = "warning",
1214 header = brown(" @@ ")
1215 )
1216 return False
1217 elif self.entropyTools.is_valid_md5(checksum):
1218
1219 ckres = self.entropyTools.compare_md5(local_filepath,checksum)
1220 if ckres:
1221 self.Entropy.updateProgress(
1222 "[%s|#%s|(%s/%s)] %s: %s: %s" % (
1223 blue(crippled_uri),
1224 darkgreen(str(tries)),
1225 blue(str(counter)),
1226 bold(str(maxcount)),
1227 blue(_("digest verification")),
1228 os.path.basename(local_filepath),
1229 darkgreen(_("so far, so good!")),
1230 ),
1231 importance = 0,
1232 type = "info",
1233 header = red(" @@ ")
1234 )
1235 return True
1236 else:
1237 self.Entropy.updateProgress(
1238 "[%s|#%s|(%s/%s)] %s: %s: %s" % (
1239 blue(crippled_uri),
1240 darkgreen(str(tries)),
1241 blue(str(counter)),
1242 bold(str(maxcount)),
1243 blue(_("digest verification")),
1244 os.path.basename(local_filepath),
1245 darkred(_("invalid checksum")),
1246 ),
1247 importance = 0,
1248 type = "warning",
1249 header = brown(" @@ ")
1250 )
1251 return False
1252 else:
1253 self.Entropy.updateProgress(
1254 "[%s|#%s|(%s/%s)] %s: %s: %s" % (
1255 blue(crippled_uri),
1256 darkgreen(str(tries)),
1257 blue(str(counter)),
1258 bold(str(maxcount)),
1259 blue(_("digest verification")),
1260 os.path.basename(local_filepath),
1261 darkred(_("unknown data returned")),
1262 ),
1263 importance = 0,
1264 type = "warning",
1265 header = brown(" @@ ")
1266 )
1267 return valid_remote_md5
1268
1270
1271 broken_uris = set()
1272 fine_uris = set()
1273 errors = False
1274 action = 'upload'
1275 if self.download:
1276 action = 'download'
1277 elif self.remove:
1278 action = 'remove'
1279
1280 for uri in self.uris:
1281
1282 crippled_uri = self.entropyTools.extract_ftp_host_from_uri(uri)
1283 self.Entropy.updateProgress(
1284 "[%s|%s] %s..." % (
1285 blue(crippled_uri),
1286 brown(action),
1287 blue(_("connecting to mirror")),
1288 ),
1289 importance = 0,
1290 type = "info",
1291 header = blue(" @@ ")
1292 )
1293 try:
1294 ftp = self.FtpInterface(uri, self.Entropy)
1295 except ConnectionError:
1296 self.entropyTools.print_traceback()
1297 return True,fine_uris,broken_uris
1298 branch = self.Entropy.SystemSettings['repositories']['branch']
1299 my_path = os.path.join(self.Entropy.get_remote_database_relative_path(self.repo), branch)
1300 self.Entropy.updateProgress(
1301 "[%s|%s] %s %s..." % (
1302 blue(crippled_uri),
1303 brown(action),
1304 blue(_("changing directory to")),
1305 darkgreen(my_path),
1306 ),
1307 importance = 0,
1308 type = "info",
1309 header = blue(" @@ ")
1310 )
1311
1312 ftp.set_cwd(self.ftp_basedir, dodir = True)
1313 maxcount = len(self.myfiles)
1314 counter = 0
1315
1316 for mypath in self.myfiles:
1317
1318 ftp.set_basedir()
1319 ftp.set_cwd(self.ftp_basedir, dodir = True)
1320
1321 mycwd = None
1322 if isinstance(mypath,tuple):
1323 if len(mypath) < 2: continue
1324 mycwd = mypath[0]
1325 mypath = mypath[1]
1326 ftp.set_cwd(mycwd, dodir = True)
1327
1328 syncer = ftp.upload_file
1329 myargs = [mypath]
1330 if self.download:
1331 syncer = ftp.download_file
1332 myargs = [os.path.basename(mypath),self.local_basedir]
1333 elif self.remove:
1334 syncer = ftp.delete_file
1335
1336 counter += 1
1337 tries = 0
1338 done = False
1339 lastrc = None
1340 while tries < 5:
1341 tries += 1
1342 self.Entropy.updateProgress(
1343 "[%s|#%s|(%s/%s)] %s: %s" % (
1344 blue(crippled_uri),
1345 darkgreen(str(tries)),
1346 blue(str(counter)),
1347 bold(str(maxcount)),
1348 blue(action+"ing"),
1349 red(os.path.basename(mypath)),
1350 ),
1351 importance = 0,
1352 type = "info",
1353 header = red(" @@ ")
1354 )
1355 rc = syncer(*myargs)
1356 if rc and not self.download:
1357
1358
1359 remote_md5 = ftp.get_file_md5(os.path.basename(mypath))
1360 rc = self.handler_verify_upload(mypath, uri,
1361 counter, maxcount, tries, remote_md5 = remote_md5)
1362 if rc:
1363 self.Entropy.updateProgress(
1364 "[%s|#%s|(%s/%s)] %s %s: %s" % (
1365 blue(crippled_uri),
1366 darkgreen(str(tries)),
1367 blue(str(counter)),
1368 bold(str(maxcount)),
1369 blue(action),
1370 _("successful"),
1371 red(os.path.basename(mypath)),
1372 ),
1373 importance = 0,
1374 type = "info",
1375 header = darkgreen(" @@ ")
1376 )
1377 done = True
1378 break
1379 else:
1380 self.Entropy.updateProgress(
1381 "[%s|#%s|(%s/%s)] %s %s: %s" % (
1382 blue(crippled_uri),
1383 darkgreen(str(tries)),
1384 blue(str(counter)),
1385 bold(str(maxcount)),
1386 blue(action),
1387 brown(_("failed, retrying")),
1388 red(os.path.basename(mypath)),
1389 ),
1390 importance = 0,
1391 type = "warning",
1392 header = brown(" @@ ")
1393 )
1394 lastrc = rc
1395 continue
1396
1397 if not done:
1398
1399 self.Entropy.updateProgress(
1400 "[%s|(%s/%s)] %s %s: %s - %s: %s" % (
1401 blue(crippled_uri),
1402 blue(str(counter)),
1403 bold(str(maxcount)),
1404 blue(action),
1405 darkred("failed, giving up"),
1406 red(os.path.basename(mypath)),
1407 _("error"),
1408 lastrc,
1409 ),
1410 importance = 1,
1411 type = "error",
1412 header = darkred(" !!! ")
1413 )
1414
1415 if mypath not in self.critical_files:
1416 self.Entropy.updateProgress(
1417 "[%s|(%s/%s)] %s: %s, %s..." % (
1418 blue(crippled_uri),
1419 blue(str(counter)),
1420 bold(str(maxcount)),
1421 blue(_("not critical")),
1422 os.path.basename(mypath),
1423 blue(_("continuing")),
1424 ),
1425 importance = 1,
1426 type = "warning",
1427 header = brown(" @@ ")
1428 )
1429 continue
1430
1431 ftp.close()
1432 errors = True
1433 broken_uris.add((uri,lastrc))
1434
1435 break
1436
1437
1438 ftp.close()
1439 fine_uris.add(uri)
1440
1441 return errors,fine_uris,broken_uris
1442