#!/usr/bin/python
'''
    # DESCRIPTION:
    # generic tools for all the handlers applications

    Copyright (C) 2007 Fabio Erculiani

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
'''


# Defined before the imports on purpose: it is called right after
# entropyConstants (which provides etpConst) has been imported, see below.
def initializePortageTree():
    """Point portage at the tree and overlays configured in etpConst.

    Unlocks portage.settings, overrides PORTDIR and PORTDIR_OVERLAY, locks
    the settings again and re-initialises portage.portdb on the new tree.
    """
    portage.settings.unlock()
    portage.settings['PORTDIR'] = etpConst['portagetreedir']
    portage.settings['PORTDIR_OVERLAY'] = etpConst['overlays']
    portage.settings.lock()
    portage.portdb.__init__(etpConst['portagetreedir'])


import portage
import portage_const
from portage_dep import isvalidatom, isspecific, isjustname, dep_getkey, dep_getcpv
from entropyConstants import *
initializePortageTree()

# colours support
import output
from output import bold, colorize, green, red, yellow, blue, darkblue, nocolor
import re
import sys
import random
import commands
# "os" is used all over this module; import it explicitly instead of
# relying on the star import above to provide it.
import os


def isRoot():
    """Return True when the process runs with root privileges.

    The effective uid is checked instead of the login name:
    getpass.getuser() looks at the LOGNAME/USER environment variables
    first, so the previous name comparison could be fooled by simply
    exporting USER=root.
    """
    return os.geteuid() == 0


def getPortageEnv(var):
    """Return the value of *var* from the portage environment.

    Returns None when the variable is not set.
    """
    try:
        return portage.config(clone=portage.settings).environ()[var]
    except KeyError:
        return None


def getRandomNumber():
    """Return a random integer between 0 and 99999 (both included)."""
    # The former implementation sliced five digits out of
    # str(random.random()); values printed in exponent notation
    # (e.g. "5e-05") made that slice negative or not a number at all.
    return random.randint(0, 99999)


def getThirdPartyMirrors(mirrorname):
    """Return portage's third-party mirror entry for *mirrorname*.

    Raises KeyError when the mirror name is unknown.
    """
    return portage.thirdpartymirrors[mirrorname]


# FIXME: implement SLOTS support
# resolve atoms automagically (best, not current!)
# sys-libs/application --> sys-libs/application-1.2.3-r1
def getBestAtom(atom):
    """Return the best visible match for *atom* in the portage tree.

    Returns the marker string "!!conflicts" when portage raises ValueError
    for the lookup.
    """
    try:
        return portage.portdb.xmatch("bestmatch-visible", str(atom))
    except ValueError:
        return "!!conflicts"  # I need a valid complete atom...


def calculateFullAtomsDependencies(atoms, deep = False):
    """Ask "emerge --pretend" which packages *atoms* would pull in.

    *atoms* is a string appended to the emerge command line; with
    *deep* the "-Du" options are added.

    Returns (deplist, blocklist): the package names parsed from the
    "[ebuild" lines and from the "[blocks" lines, in emerge's order.
    """
    # in order... thanks emerge :-)
    deepOpt = ""
    if deep:
        deepOpt = "-Du"
    deplist = []
    blocklist = []
    cmd = "USE='"+getUSEFlags()+"' "+cdbRunEmerge+" --pretend --color=n --quiet "+deepOpt+" "+atoms
    for line in commands.getoutput(cmd).split("\n"):
        if line.startswith("[ebuild"):
            # "[ebuild  N    ] cat/pkg-ver [old-ver] ..." -> "cat/pkg-ver"
            deplist.append(line.split("] ")[1].split(" [")[0].strip())
        elif line.startswith("[blocks"):
            # "[blocks B     ] cat/pkg (...)" -> "cat/pkg"
            blocklist.append(line.split("] ")[1].split()[0].strip())
    return deplist, blocklist


# should be only used when a pkgcat/pkgname <-- is not specified (example: db, amarok, AND NOT media-sound/amarok)
def getAtomCategory(atom):
    """Return the category of the first tree match for *atom*, or None.

    Any lookup error (no match, ambiguous name, ...) yields None.
    """
    try:
        return portage.portdb.xmatch("match-all", str(atom))[0].split("/")[0]
    except Exception:
        # best effort, but let KeyboardInterrupt/SystemExit propagate
        return None


# This function compares the version number of two atoms
# This function needs a complete atom, pkgcat (not mandatory) - pkgname - pkgver
# if atom1 < atom2 --> returns a NEGATIVE number
# if atom1 > atom2 --> returns a POSITIVE number
# if atom1 == atom2 --> returns 0
def compareAtoms(atom1,atom2):
    """Compare the version parts of two complete atoms with portage's vercmp."""
    from portage_versions import vercmp
    # filter pkgver: only the version half of each atom is compared
    version1 = extractPkgNameVer(atom1)[1]
    version2 = extractPkgNameVer(atom2)[1]
    return vercmp(version1, version2)


def countdown(secs=5,what="Counting..."):
    """Print *what*, then count down from *secs* to 1, one number per second.

    Nothing is printed when *secs* is 0.
    """
    import time
    if not secs:
        return
    print(what)
    # count downwards, as the name promises (it used to count 0, 1, 2, ...)
    for remaining in range(secs, 0, -1):
        sys.stdout.write(str(remaining)+" ")
        sys.stdout.flush()
        time.sleep(1)


def spinner(rotations, interval, message=''):
    """Show *message* followed by a spinning bar for *rotations* steps.

    Each step lasts *interval* seconds. When done, the message is blanked
    out and the cursor returns to the start of the line.
    """
    # local import: "time" is not imported at module level
    import time
    for step in range(rotations):
        writechar(message + '|/-\\'[step % 4] + '\r')
        time.sleep(interval)
    # overwrite the message plus the spinner glyph with blanks
    writechar(' ' * (len(message) + 1) + '\r')


def writechar(char):
    """Write *char* to stdout and flush immediately."""
    sys.stdout.write(char)
    sys.stdout.flush()


def getArchFromChost(chost):
    """Map a CHOST / machine string to an arch keyword.

    "x86_64*" gives "amd64", "i?86*" gives "x86", anything else gives the
    string "ERROR".
    """
    # when we'll add new archs, we'll have to add a testcase here
    cpu = chost.split("-")[0]
    if chost.startswith("x86_64"):
        return "amd64"
    if cpu.startswith("i") and cpu.endswith("86"):
        return "x86"
    return "ERROR"


def translateArchFromUname(string):
    """Like translateArch(), taking the machine type from "uname -m"."""
    import commands
    machine = commands.getoutput("uname -m").split("\n")[0]
    return translateArch(string, machine)


def translateArch(string,chost):
    """Replace every ETP_ARCH_CONST placeholder in *string* with the arch of *chost*.

    The string is returned untouched when it holds no placeholder.
    """
    if string.find(ETP_ARCH_CONST) == -1:
        return string
    # substitute %ARCH% -- literal replacement, the placeholder is not a regex
    return string.replace(ETP_ARCH_CONST, getArchFromChost(chost))


# FIXME: add slotted packages support
def getInstalledAtom(atom):
    """Return the last installed package matching *atom*, or None."""
    matches = portage.db['/']['vartree'].dep_match(str(atom))
    if matches:
        return matches[-1]
    return None


def getInstalledAtoms(atom):
    """Return the list of installed packages matching *atom*, or None."""
    matches = portage.db['/']['vartree'].dep_match(str(atom))
    if matches:
        return matches
    return None


# YOU MUST PROVIDE A COMPLETE ATOM with a pkgcat !
def unmerge(atom):
    """Unmerge *atom* through portage.

    Returns 1 without doing anything when *atom* is not a valid, complete
    "category/name-version" atom, otherwise the value returned by
    portage.unmerge().
    """
    if isjustname(atom) or (not isvalidatom(atom)) or (atom.find("/") == -1):
        return 1
    pkgcat = atom.split("/")[0]
    pkgnamever = atom.split("/")[1]
    portage.settings.unlock()
    try:
        return portage.unmerge(pkgcat, pkgnamever, ETP_ROOT_DIR, portage.settings, 1)
    finally:
        # relock the settings even when unmerge raises
        portage.settings.lock()


# TO THIS FUNCTION:
# must be provided a valid and complete atom
# FIXME: needs some love !
def extractPkgNameVer(atom):
    """Split a complete atom into (package name, version).

    The category, when present, is dropped. A trailing "-rN" revision is
    kept as part of the version:
    "sys-libs/application-1.2.3-r1" -> ("application", "1.2.3-r1").
    """
    parts = atom.split("/")[-1].split("-")
    last = parts[-1]
    # only "r<digits>" is a revision; a bare startswith("r") test would also
    # swallow other trailing components that merely begin with "r"
    if len(parts) >= 3 and last.startswith("r") and last[1:].isdigit():
        return "-".join(parts[:-2]), parts[-2]+"-"+last
    return "-".join(parts[:-1]), last


def emerge(atom, options, outfile = None, redirect = "&>", simulate = False):
    """Run emerge on *atom* with *options*, optionally redirecting its output.

    With *simulate* nothing is executed and (0, "") is returned.
    With redirect=None no redirection is done and the returned outfile is "".
    When a redirect is requested but *outfile* is None, a file under
    etpConst['packagestmpdir'] is used. A stale output file is removed first.

    Returns (exit status from spawnCommand, outfile).
    """
    if simulate:
        return 0, ""  # simulation enabled
    if redirect is None:
        outfile = ""
        redirect = ""
    elif outfile is None:
        # also covers non-default redirects, which used to crash on
        # os.path.isfile(None) below
        outfile = etpConst['packagestmpdir']+"/.emerge-"+str(getRandomNumber())
    if os.path.isfile(outfile):
        try:
            os.remove(outfile)
        except OSError:
            spawnCommand("rm -rf "+outfile)
    rc = spawnCommand(cdbRunEmerge+" "+options+" "+atom, redirect+outfile)
    return rc, outfile


# create a .tbz2 file in the specified path
def quickpkg(atom,dirpath):
    """Build <dirpath>/<pkgname>.tbz2 from the installed package *atom*.

    *atom* must be "category/name-version". The file list is read from the
    package's CONTENTS file (dbCONTENTS) under /var/db/pkg, archived with
    tar, and the package db directory is then attached through
    xpak.tbz2.recompose().

    Returns the path of the archive, or False when it was not created.
    """
    # getting package info
    pkgcat, pkgname = atom.split("/")[0], atom.split("/")[1]
    dirpath += "/"+pkgname+".tbz2"
    tmpdirpath = etpConst['packagestmpdir']+"/"+pkgname+".tbz2"+"-tmpdir"
    if os.path.isdir(tmpdirpath):
        spawnCommand("rm -rf "+tmpdirpath)
    os.makedirs(tmpdirpath)
    dbdir = "/var/db/pkg/"+pkgcat+"/"+pkgname+"/"

    # create file list
    contents = open(dbdir+dbCONTENTS, "r")
    try:
        entries = contents.readlines()
    finally:
        contents.close()
    paths = []
    for entry in entries:
        fields = entry.strip().split()
        if len(fields) < 2:
            continue  # blank or malformed line
        # NOTE(review): the second field is taken as the path, so paths
        # containing spaces get truncated -- confirm against CONTENTS format
        path = fields[1]
        # NOTE(review): this only drops entries that are both a directory
        # and a symlink; check whether plain directories were meant too
        if not (os.path.isdir(path) and os.path.islink(path)):
            paths.append(path)
    filelist = open(tmpdirpath+"/"+dbCONTENTS, "w")
    try:
        for path in paths:
            filelist.write(path+"\n")
        filelist.flush()
    finally:
        filelist.close()

    # package them into a file
    spawnCommand("tar cjf "+dirpath+" -C / --files-from='"+tmpdirpath+"/"+dbCONTENTS+"' --no-recursion", redirect = "&>/dev/null")

    # appending xpak informations
    import xpak
    tbz2 = xpak.tbz2(dirpath)
    tbz2.recompose(dbdir)

    # Remove tmp file
    os.system("rm -rf "+tmpdirpath)

    if os.path.isfile(dirpath):
        return dirpath
    return False


def unpackTbz2(tbz2File,tmpdir = None):
    """Extract the xpak data of *tbz2File* into *tmpdir*.

    *tmpdir* defaults to etpConst['packagestmpdir']/<file name without
    .tbz2>/. An existing directory is emptied first.

    Returns the directory used, always ending with "/".
    """
    import xpak
    if tmpdir is None:
        basename = tbz2File.split("/")[-1].split(".tbz2")[0]
        tmpdir = etpConst['packagestmpdir']+"/"+basename+"/"
    if not tmpdir.endswith("/"):
        tmpdir += "/"
    tbz2 = xpak.tbz2(tbz2File)
    if os.path.isdir(tmpdir):
        os.system("rm -rf "+tmpdir+"*")
    tbz2.decompose(tmpdir)
    return tmpdir


# NOTE: atom must be a COMPLETE atom, with version!
def isTbz2PackageAvailable(atom, verbose = False):
    """Look for <name-version>.tbz2 in the bin, store and upload directories.

    Returns the path of the file, or False when none of the directories
    has it. When several have it, the one probed last wins.
    """
    # check if the package have been already merged
    atomName = atom.split("/")[-1]
    tbz2Available = False
    # probing order matters: a later hit overrides an earlier one
    for dirkey in ('packagesbindir', 'packagesstoredir', 'packagessuploaddir'):
        candidate = etpConst[dirkey]+"/"+atomName+".tbz2"
        if verbose:
            print("testing in directory: "+candidate)
        if os.path.isfile(candidate):
            tbz2Available = candidate
    if verbose:
        print("found here: "+str(tbz2Available))
    return tbz2Available


def checkAtom(atom):
    """Return True when *atom* is a valid atom or has a best match in the tree."""
    bestAtom = getBestAtom(atom)
    if bestAtom == "!!conflicts":
        bestAtom = ""
    return (isvalidatom(atom) == 1) or (bestAtom != "")


def removeSpaceAtTheEnd(string):
    """Return *string* without its last character if that is a space.

    At most one trailing space is removed.
    """
    if string.endswith(" "):
        return string[:-1]
    return string


def md5sum(filepath):
    """Return the hexadecimal MD5 digest of the file at *filepath*."""
    try:
        import hashlib
        digest = hashlib.md5()
    except ImportError:
        # Python older than 2.5
        import md5
        digest = md5.new()
    # binary mode, and always closed: the handle used to be leaked
    readfile = open(filepath, "rb")
    try:
        block = readfile.read(1024)
        while block:
            digest.update(block)
            block = readfile.read(1024)
    finally:
        readfile.close()
    return digest.hexdigest()


# Tool to run commands
def spawnCommand(command, redirect = None):
    """Run *command* through os.system(), appending *redirect* when given.

    Returns the value returned by os.system().
    """
    if redirect is not None:
        command += " "+redirect
    return os.system(command)


def getPackageDependencyList(atom):
    """Return the tokens of DEPEND, RDEPEND and PDEPEND of *atom*, in that order."""
    pkgSplittedDeps = []
    for depvar in ("DEPEND", "RDEPEND", "PDEPEND"):
        pkgSplittedDeps.extend(portage.portdb.aux_get(atom, [depvar])[0].split())
    return pkgSplittedDeps


# parser of the gentoo db "NEEDED" file
# this file is contained in the .tbz2->.xpak file
def getPackageRuntimeDependencies(NEEDED):
    """Work out which packages own the libraries the listed binaries link to.

    *NEEDED* is the path of a NEEDED file; the first word of each line is
    run through "ldd", and every absolute library path found is looked up
    with the pFindLibraryXT command.

    Returns (package keys, full lookup results), both without duplicates
    and in no particular order; two empty lists when *NEEDED* is not a file.
    """
    if not os.path.isfile(NEEDED):
        return [], []  # both empty

    needed = open(NEEDED, "r")
    try:
        includedBins = needed.readlines()
    finally:
        needed.close()

    neededLibraries = set()
    for line in includedBins:
        # filter the first word
        fields = line.strip().split()
        if not fields:
            continue  # blank line
        for lddline in commands.getoutput("ldd "+fields[0]).split("\n"):
            lddline = lddline.strip()
            if lddline.find("=>") != -1:
                # "libfoo.so => /usr/lib/libfoo.so (0x...)": keep the right side
                lddline = lddline.split("=>")[1].strip()
            # format properly
            if lddline.startswith("//"):
                lddline = lddline[1:]
            tokens = lddline.split()
            if tokens:
                neededLibraries.add(tokens[0])

    runtimeNeededPackages = set()
    runtimeNeededPackagesXT = set()
    for library in neededLibraries:
        if not library.startswith("/"):
            continue  # filter garbage
        pkgs = commands.getoutput(pFindLibraryXT+library).split("\n")
        if pkgs[0] != "":
            for pkg in pkgs:
                runtimeNeededPackagesXT.add(pkg)
                runtimeNeededPackages.add(dep_getkey(pkg))

    return list(runtimeNeededPackages), list(runtimeNeededPackagesXT)


def getUSEFlags():
    """Return the USE variable of the portage environment (None if unset)."""
    return getPortageEnv('USE')


# you must provide a complete atom
def getPackageIUSE(atom):
    """Return the IUSE string of *atom* (see getPackageVar)."""
    return getPackageVar(atom,"IUSE")


# you must provide a complete atom
def getPackageUSEList(atom):
    """Return the IUSE flags of *atom* as one coloured, space-separated string.

    Flags found in the current USE come first, in red; the others follow
    prefixed with "-", in dark blue. Returns "" when the package has no
    IUSE flags.
    """
    # getPackageVar strips a leading "=" itself
    packageFlags = getPackageVar(atom, "IUSE").split()
    activeFlags = set(getUSEFlags().split())

    flags = []
    for flag in packageFlags:
        if flag in activeFlags:
            flags.append(flag)
        else:
            flags.append("-"+flag)

    # order: enabled flags first, then the disabled ones
    enabled = [red(flag) for flag in flags if not flag.startswith("-")]
    disabled = [darkblue(flag) for flag in flags if flag.startswith("-")]
    return " ".join(enabled + disabled)


def getPackageVar(atom,var):
    """Return metadata variable *var* of *atom* from the portage tree.

    A leading "=" is ignored. Returns the integer 1 when *atom* has no
    category ("/"), because it cannot be looked up then.
    """
    if atom.startswith("="):
        atom = atom[1:]
    # can't check - return error
    if atom.find("/") == -1:
        return 1
    return portage.portdb.aux_get(atom,[var])[0]


def synthetizeRoughDependencies(roughDependencies, useflags = None):
    """Reduce a tokenised dependency list to plain dependencies and conflicts.

    *roughDependencies* is a list of tokens such as the one returned by
    getPackageDependencyList(); *useflags* is a space-separated string and
    defaults to getUSEFlags().

    Tokens inside a "flag? ( ... )" group are kept only when the flag
    matched; members of a "|| ( ... )" group are chained with dbOR.

    Returns (dependencies, conflicts), two space-separated strings without
    duplicates and in no particular order; conflicts have their leading
    "!" removed.
    """
    if useflags is None:
        useflags = getUSEFlags()
    # returns dependencies, conflicts
    useMatch = False
    openParenthesis = 0
    openOr = False
    useFlagQuestion = False
    dependencies = ""
    conflicts = ""
    for atom in roughDependencies:

        if atom.endswith("?"):
            # we need to see if that useflag is enabled
            useFlag = atom.split("?")[0]
            useFlagQuestion = True
            # NOTE(review): the "!" test below is applied to the flags in
            # *useflags*, not to the conditional itself ("!flag?") -- that
            # looks unintended, but the logic is kept as it was
            for i in useflags.split():
                if i.startswith("!"):
                    if i != useFlag:
                        useMatch = True
                        break
                else:
                    if i == useFlag:
                        useMatch = True
                        break

        if atom.startswith("("):
            openParenthesis += 1

        if atom.startswith(")"):
            if openOr:
                # remove last "_or_" from dependencies
                openOr = False
                if dependencies.endswith(dbOR):
                    dependencies = dependencies[:len(dependencies)-len(dbOR)]
                    dependencies += " "
            openParenthesis -= 1
            if openParenthesis == 0:
                # left the outermost group: forget the use conditional
                useFlagQuestion = False
                useMatch = False

        if atom.startswith("||"):
            openOr = True

        # a token counts when it sits in a matched use group or in none at all
        wanted = (useFlagQuestion and useMatch) or ((not useFlagQuestion) and (not useMatch))

        if atom.find("/") != -1 and (not atom.startswith("!")) and (not atom.endswith("?")):
            # it's a package name /-???
            if wanted:
                # check if there's an OR
                dependencies += atom
                if openOr:
                    dependencies += dbOR
                else:
                    dependencies += " "

        if atom.startswith("!") and (not atom.endswith("?")):
            if wanted:
                conflicts += atom
                if openOr:
                    conflicts += dbOR
                else:
                    conflicts += " "

    # format properly: drop duplicates and, for conflicts, the leading "!"
    conflicts = " ".join([i[1:] for i in set(conflicts.split())])
    dependencies = " ".join(set(dependencies.split()))

    return dependencies, conflicts


# Collect installed packages
def getInstalledPackages():
    """List every package recorded in the installed-packages database.

    Returns (atoms, count) where atoms are "category/name-version" strings;
    entries containing "-MERGING-" are skipped.
    """
    import os
    appDbDir = getPortageAppDbPath()
    installedAtoms = []
    for pkgcat in os.listdir(appDbDir):
        # stray files in the db root would make listdir() fail
        if not os.path.isdir(appDbDir+pkgcat):
            continue
        for pdir in os.listdir(appDbDir+pkgcat):
            pkgatom = pkgcat+"/"+pdir
            if pkgatom.find("-MERGING-") == -1:
                installedAtoms.append(pkgatom)
    return installedAtoms, len(installedAtoms)


def getPortageAppDbPath():
    """Return ROOT + portage_const.VDB_PATH, always ending with "/"."""
    rc = getPortageEnv("ROOT")+portage_const.VDB_PATH
    if not rc.endswith("/"):
        return rc+"/"
    return rc


# get a list, returns a sorted list
def alphaSorter(seq):
    """Return the items of *seq* sorted digits first, then A a B b ... Z z.

    Characters outside that alphabet are ignored at both ends of an item
    and sort after the alphabet when they occur inside it.
    """
    order = '0123456789AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz'

    def trimmed(value):
        # strip, from both ends, every character that is not in "order"
        badchrs = ''.join([c for c in set(value) if c not in order])
        return value.strip(badchrs)

    def sortkey(value):
        ranks = []
        for c in trimmed(value):
            rank = order.find(c)
            if rank == -1:
                rank = len(order)+ord(c)
            ranks.append(rank)
        # the item itself breaks ties between equal rank lists
        return (ranks, value)

    return sorted(seq, key=sortkey)


# Temporary files cleaner
def cleanup(options):
    """Delete everything inside etpConst['packagestmpdir'] and report the count.

    *options* is accepted but not used.
    """
    toCleanDirs = [ etpConst['packagestmpdir'] ]
    counter = 0
    for cleandir in toCleanDirs:
        print_info(red(" * ")+"Cleaning "+yellow(cleandir)+" directory...", back = True)
        for data in os.listdir(cleandir):
            os.system("rm -rf "+cleandir+"/"+data)
            counter += 1
    print_info(green(" * ")+"Cleaned: "+str(counter)+" files and directories")


def mountProc():
    """Mount /proc unless it already holds more than two entries. Returns True."""
    # check if it's already mounted
    if len(os.listdir("/proc")) <= 2:
        os.system("mount -t proc proc /proc &> /dev/null")
    return True


def umountProc():
    """Unmount /proc (three attempts) when it looks mounted. Returns True."""
    # check if it's already mounted
    if len(os.listdir("/proc")) > 2:
        for attempt in range(3):
            os.system("umount /proc &> /dev/null")
    return True


def print_error(msg):
    """Print *msg* behind a red ">>" marker."""
    print(red(">>")+" "+msg)


def print_info(msg, back = False):
    """Print *msg* behind a green ">>" marker.

    With *back* the message is written without a newline, so that the next
    message overwrites it.
    """
    # NOTE(review): this only blanks a single column; the literal probably
    # held a long run of spaces before the file's whitespace was collapsed
    writechar("\r \r")
    if back:
        writechar("\r"+green(">>")+" "+msg)
        return
    print(green(">>")+" "+msg)


def print_warning(msg):
    """Print *msg* behind a yellow ">>" marker."""
    print(yellow(">>")+" "+msg)


def print_generic(msg):
    """Print *msg* as it is."""
    # here we'll wrap any nice formatting
    print(msg)