From e8b5eb83dc013af59a5df3f1d86e18d853586f64 Mon Sep 17 00:00:00 2001 From: Fabio Erculiani Date: Wed, 25 Apr 2012 18:04:02 +0200 Subject: [PATCH] [entropy.spm] PortagePlugin: rewrite StdoutSplitter Rewrite StdoutSplitter in order to make possible to catch full Portage stdout/stderr redirecting it to Entropy log file. --- .../interfaces/portage_plugin/__init__.py | 364 +++++++++++------- 1 file changed, 218 insertions(+), 146 deletions(-) diff --git a/lib/entropy/spm/plugins/interfaces/portage_plugin/__init__.py b/lib/entropy/spm/plugins/interfaces/portage_plugin/__init__.py index cd628b904..cc0c48a08 100644 --- a/lib/entropy/spm/plugins/interfaces/portage_plugin/__init__.py +++ b/lib/entropy/spm/plugins/interfaces/portage_plugin/__init__.py @@ -33,13 +33,204 @@ from entropy.output import darkred, darkgreen, brown, darkblue, teal, purple, \ red, bold, blue, getcolor, decolorize from entropy.i18n import _ from entropy.core.settings.base import SystemSettings -from entropy.misc import LogFile +from entropy.misc import LogFile, ParallelTask from entropy.spm.plugins.skel import SpmPlugin import entropy.dep import entropy.tools from entropy.spm.plugins.interfaces.portage_plugin import xpak from entropy.spm.plugins.interfaces.portage_plugin import xpaktools + +class StdoutSplitter(object): + + def __init__(self, phase, logger, std): + self._phase = phase + self._logger = logger + self._std = std + self._closed = False + self._rfd, self._wfd = os.pipe() + + self._task = ParallelTask(self._pusher) + self._task.name = "StdoutSplitterPusher" + self._task.daemon = True + self._task.start() + + if sys.hexversion >= 0x3000000: + + class Writer(object): + + def __init__(self, parent, buf): + self._buf = buf + self._parent = parent + + def write(self, b): + self._buf.write(b) + self._parent.write(const_convert_to_unicode(b)) + + def flush(self): + self._buf.flush() + self._parent.flush() + + self.buffer = Writer(self, self._std.buffer) + + def __iter__(self): + return self._std + + def __hash__(self): + return hash(self._std) + + def _pusher(self): + while True: + try: + chunk = os.read(self._rfd, 512) # BLOCKS + except (IOError, OSError) as err: + # both can raise EINTR + if err.errno == errno.EINTR: + continue + if err.errno == errno.EBADF: + # fd has been closed + break + raise + try: + self._std.write(chunk) + except (OSError, IOError) as err: + sys.__stderr__.write( + "_pusher thread: " + "cannot write to stdout: " + "%s" % (repr(err),)) + try: + # write directly without mangling + os.write(self._logger.fileno(), chunk) + except (OSError, IOError) as err: + sys.__stderr__.write( + "_pusher thread: " + "cannot write to logger: " + "%s" % (repr(err),)) + if self._closed: + break + + @property + def softspace(self): + return self._std.softspace + + @property + def name(self): + return self._std.name + + @property + def newlines(self): + return self._std.newlines + + @property + def mode(self): + return self._std.mode + + @property + def errors(self): + return self._std.errors + + @property + def encoding(self): + return self._std.encoding + + @property + def closed(self): + return self._closed + + def fileno(self): + # redirect Portage to our pipe + return self._wfd + + def flush(self): + self._logger.flush() + return self._std.flush() + + def close(self): + self._closed = True + err = None + try: + os.close(self._wfd) + except OSError as err: + pass + try: + os.close(self._rfd) + except OSError: + pass + self._task.join() + if err is not None: + raise err + + def isatty(self): + return self._std.isatty() + + if sys.hexversion < 0x3000000: + def next(self): + return self._std.next() + else: + def __next__(self): + return next(self._std) + + def read(self, *args, **kwargs): + return self._std.read(*args, **kwargs) + + def readline(self, *args, **kwargs): + return self._std.readline(*args, **kwargs) + + def readlines(self, *args, **kwargs): + return self._std.readlines(*args, **kwargs) + + def seek(self, *args, **kwargs): + return self._std.seek(*args, **kwargs) + + def tell(self): + return self._std.tell() + + def truncate(self, *args, **kwargs): + return self._std.truncate(*args, **kwargs) + + def write(self, mystr): + try: + raw_string = const_convert_to_rawstring(mystr) + except UnicodeEncodeError: + raw_string = const_convert_to_rawstring( + mystr, etpConst['conf_encoding']) + + to_write = len(raw_string) + count = 0 + while to_write < count: + try: + count += os.write(self._wfd, raw_string[count:]) + except (IOError, OSError) as err: + # both can raise EINTR + if err.errno == errno.EINTR: + continue + raise + + def writelines(self, lst): + for line in lst: + self.write(line) + + if sys.hexversion >= 0x3000000: + + # line_buffering readable seekable writable + def readable(self): + return self._std.readable() + + def seekable(self): + return self._std.seekable() + + def writable(self): + return self._std.writable() + + @property + def line_buffering(self): + return self._std.line_buffering + + else: + def xreadlines(self): + return self._std.xreadlines() + + class PortagePackageGroups(dict): """ Entropy Package categories group representation @@ -1684,139 +1875,6 @@ class PortagePlugin(SpmPlugin): if key == PortagePlugin._PORTAGE_ENTROPY_PACKAGE_NAME: self._reload_modules() - class StdoutSplitter(object): - - def __init__(self, phase, logger, std): - self._phase = phase - self._logger = logger - self._std = std - - if sys.hexversion >= 0x3000000: - - class Writer(object): - - def __init__(self, parent, buf): - self._buf = buf - self._parent = parent - - def write(self, b): - self._buf.write(b) - self._parent.write(const_convert_to_unicode(b)) - - def flush(self): - self._buf.flush() - self._parent.flush() - - self.buffer = Writer(self, self._std.buffer) - - def __iter__(self): - return self._std - - def __hash__(self): - return hash(self._std) - - @property - def softspace(self): - return self._std.softspace - - @property - def name(self): - return self._std.name - - @property - def newlines(self): - return self._std.newlines - - @property - def mode(self): - return self._std.mode - - @property - def errors(self): - return self._std.errors - - @property - def encoding(self): - return self._std.encoding - - @property - def closed(self): - return self._std.closed - - def fileno(self): - return self._std.fileno() - - def flush(self): - self._logger.flush() - return self._std.flush() - - def close(self): - self._logger.close() - return self._std.close() - - def isatty(self): - return self._std.isatty() - - if sys.hexversion < 0x3000000: - def next(self): - return self._std.next() - else: - def __next__(self): - return next(self._std) - - def read(self, *args, **kwargs): - return self._std.read(*args, **kwargs) - - def readline(self, *args, **kwargs): - return self._std.readline(*args, **kwargs) - - def readlines(self, *args, **kwargs): - return self._std.readlines(*args, **kwargs) - - def seek(self, *args, **kwargs): - return self._std.seek(*args, **kwargs) - - def tell(self): - return self._std.tell() - - def truncate(self, *args, **kwargs): - return self._std.truncate(*args, **kwargs) - - def write(self, mystr): - self._logger.log( - "[Portage %s]" % (self._phase,), - etpConst['logging']['normal_loglevel_id'], "\n" + \ - decolorize(mystr)) - return self._std.write(mystr) - - def writelines(self, lst): - self._logger.log( - "[Portage %s]" % (self._phase,), - etpConst['logging']['normal_loglevel_id'], "") - self._logger.writelines([decolorize(x) for x in lst]) - return self._std.writelines(lst) - - if sys.hexversion >= 0x3000000: - - # line_buffering readable seekable writable - def readable(self): - return self._std.readable() - - def seekable(self): - return self._std.seekable() - - def writable(self): - return self._std.writable() - - @property - def line_buffering(self): - return self._std.line_buffering - - else: - - def xreadlines(self): - return self._std.xreadlines() - def _portage_doebuild(self, myebuild, action, action_metadata, mydo, tree, cpv, portage_tmpdir = None, licenses = None): @@ -1963,19 +2021,23 @@ class PortagePlugin(SpmPlugin): oldsysstdout = sys.stdout oldsysstderr = sys.stderr - if etpUi['mute']: - tmp_fd, tmp_file = tempfile.mkstemp( - prefix="entropy.spm.portage._portage_doebuild") - tmp_fw = os.fdopen(tmp_fd, "w") - sys.stdout = tmp_fw - sys.stderr = tmp_fw - else: - splitter_out = self.StdoutSplitter(mydo, logger, sys.stdout) - splitter_err = self.StdoutSplitter(mydo, logger, sys.stderr) - sys.stdout = splitter_out - sys.stderr = splitter_err - + splitter_out = None + splitter_err = None try: + if etpUi['mute']: + tmp_fd, tmp_file = tempfile.mkstemp( + prefix="entropy.spm.portage._portage_doebuild") + tmp_fw = os.fdopen(tmp_fd, "w") + sys.stdout = tmp_fw + sys.stderr = tmp_fw + else: + splitter_out = StdoutSplitter( + mydo, logger, sys.stdout) + splitter_err = StdoutSplitter( + mydo, logger, sys.stderr) + sys.stdout = splitter_out + sys.stderr = splitter_err + rc = self._portage.doebuild( str(myebuild), str(mydo), @@ -1992,6 +2054,16 @@ class PortagePlugin(SpmPlugin): finally: sys.stdout = oldsysstdout sys.stderr = oldsysstderr + if splitter_out is not None: + try: + splitter_out.close() + except OSError: + pass + if splitter_err is not None: + try: + splitter_err.close() + except OSError: + pass if etpUi['mute']: tmp_fw.flush() tmp_fw.close()