From 0efbc8ea27c4625fc01675f26ca3bbf817198395 Mon Sep 17 00:00:00 2001 From: "(no author)" <(no author)@cd1c1023-2f26-0410-ae45-c471fc1f0318> Date: Tue, 25 Mar 2008 08:33:54 +0000 Subject: [PATCH] Entropy: - fixed sqlite IntegrityError trapping in etpDatabase.addPackage() - third big commit for the Socket Interface, adding sessions management, changed protocol git-svn-id: http://svn.sabayonlinux.org/projects/entropy/trunk@1509 cd1c1023-2f26-0410-ae45-c471fc1f0318 --- client/entropy-system-test-client | 61 +++++++++++++---------- libraries/databaseTools.py | 2 +- libraries/entropy.py | 82 ++++++++++++++++++++++++------- 3 files changed, 99 insertions(+), 46 deletions(-) diff --git a/client/entropy-system-test-client b/client/entropy-system-test-client index 83790b06c..9303d5718 100644 --- a/client/entropy-system-test-client +++ b/client/entropy-system-test-client @@ -3,9 +3,9 @@ import sys import socket serverHost = "localhost" serverPort = 999 -s = None -def connect(): - global s +def spawn(cmd, silent = False): + + # connect s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: s.connect((serverHost, serverPort)) @@ -15,31 +15,40 @@ def connect(): sys.exit(1) else: raise -connect() -try: - #s.send("reposync reponames=['sabayonlinux.org'] forceUpdate=True") - s.send("match 'x11-libs/qt'") - data = s.recv(1024) - while data: - if not data: - break - sys.stdout.write(data) - sys.stdout.flush() - data = s.recv(1024) - # get rc + # send command + s.send(cmd) + + # get data + data = '' + x = s.recv(1024) + while x: + data += x + if not silent: + sys.stdout.write(x) + sys.stdout.flush() + x = s.recv(1024) s.close() - sys.stdout.write("\n") - connect() - s.send("rc") - result = '' - d = s.recv(1024) - while d: - result += d - d = s.recv(1024) - print "returned data:",result + return data + +try: + + # 1st step, get session + session = spawn("begin", silent = True) + print "session is:",session # XXX VALIDATE + + # 2nd step, run the command + #s.send("reposync reponames=['sabayonlinux.org'] forceUpdate=True") + result = spawn("%s match 'x11-libs/qt'" % (session,), silent = True) + print "spawn result is:",result + + # 3rd step, get rc + rc = spawn("%s rc" % (session,), silent = True) + print "REAL returned data:",rc + + # 4th step, end session + rc = spawn("%s end" % (session,), silent = True) + print "END session result",rc except KeyboardInterrupt: - s.close() sys.exit(0) -s.close() \ No newline at end of file diff --git a/libraries/databaseTools.py b/libraries/databaseTools.py index 850aea536..9c6326846 100644 --- a/libraries/databaseTools.py +++ b/libraries/databaseTools.py @@ -884,7 +884,7 @@ class etpDatabase: etpData['branch'], ) ) - except IntegrityError: # we have a PRIMARY KEY we need to remove + except dbapi2.IntegrityError: # we have a PRIMARY KEY we need to remove self.migrateCountersTable() self.cursor.execute( 'INSERT into counters VALUES ' diff --git a/libraries/entropy.py b/libraries/entropy.py index 258f5894d..18991323d 100644 --- a/libraries/entropy.py +++ b/libraries/entropy.py @@ -9346,9 +9346,12 @@ class SocketHostInterface: self.hostname = etpConst['socket_service']['hostname'] self.port = etpConst['socket_service']['port'] self.threads = etpConst['socket_service']['threads'] + self.sessions = {} # FIXME: add policy handling self.valid_commands = [ + 'begin', + 'end', 'reposync', 'rc', 'match' @@ -9358,7 +9361,6 @@ class SocketHostInterface: self.conn_active = False self.channel = None self.Entropy = intf - self.last_result = None self.Entropy = intf(*args, **kwds) self.Entropy_updateProgress = self.Entropy.updateProgress self.updateProgress = self.localUpdateProgress @@ -9366,7 +9368,6 @@ class SocketHostInterface: self.Entropy.progress = self.remoteUpdateProgress self.Entropy.urlFetcher = self.SocketUrlFetcher self.lastoutput = '' - self.lastresult = None self.SocketServer = self.socket.socket ( self.socket.AF_INET, self.socket.SOCK_STREAM ) while 1: @@ -9382,6 +9383,18 @@ class SocketHostInterface: raise self.SocketServer.listen ( self.threads ) + def get_new_session(self): + rng = str(int(random.random()*100000)) + while rng in self.sessions: + rng = str(int(random.random()*100000)) + self.sessions[rng] = {} + return rng + + def destroy_session(self, session): + if self.sessions.has_key(session): + del self.sessions[session] + return True + return False def go(self): try: @@ -9412,18 +9425,18 @@ class SocketHostInterface: # validate command args = data.split() - cmd = args[0] - if cmd not in self.valid_commands: + session = args[0] + if session in ["begin","rc"]: + cmd = args[0] + session = None + else: + cmd = args[1] + args = args[1:] # remove session + + # answer to invalid commands + if (cmd not in self.valid_commands) or (session not in self.sessions and (cmd != "begin")): self.channel.send ( "NOP" ) self.channel.close() - self.lastresult = None - break - - # get last returned output - if cmd == "rc": - self.channel.send ( str(self.lastresult) ) - self.channel.close() - self.lastresult = None break try: @@ -9431,16 +9444,16 @@ class SocketHostInterface: if len(args) > 1: myargs = args[1:] # FIXME: run this in parallel to avoid locks and check when done (sleep until done) - self.lastresult = self.run_task(cmd, myargs) + rc = self.run_task(cmd, myargs, session) except Exception, e: self.channel.send ( "Exception: %s\n" % (str(Exception),) ) self.channel.send ( "Error: %s\n" % (e,) ) self.channel.send ( "ERR" ) self.channel.close() - self.lastresult = None break - self.channel.send ( "SUC" ) + if cmd not in ["rc","begin","end"]: + self.channel.send ( "SUC" ) self.updateProgress('close: %s' % (details,)) self.channel.close() break @@ -9457,7 +9470,7 @@ class SocketHostInterface: else: raise - def run_task(self, cmd, args): + def run_task(self, cmd, args, session): myargs = [] mykwargs = {} for arg in args: @@ -9468,20 +9481,51 @@ class SocketHostInterface: mykwargs[a] = eval(b) else: myargs.append(eval(arg)) - rc = self.spawn_function(cmd, myargs, mykwargs) - return rc + return self.spawn_function(cmd, myargs, mykwargs, session) - def spawn_function(self, cmd, myargs, mykwargs): + def _store_rc(self, rc, session): + if type(rc) in (list,tuple,): + rc_item = rc[:] + elif type(rc) in (set,frozenset,dict,): + rc_item = rc.copy() + else: + rc_item = rc + self.sessions[session]['rc'] = rc_item + + def _get_rc(self, session): + return self.sessions[session]['rc'] + + def spawn_function(self, cmd, myargs, mykwargs, session): self.updateProgress('called %s: args: %s, kwargs: %s' % (cmd,myargs,mykwargs,)) if cmd == "reposync": repoConn = self.Entropy.Repositories(*myargs, **mykwargs) rc = repoConn.sync() + self._store_rc(rc, session) return rc elif cmd == "match": rc = self.Entropy.atomMatch(*myargs, **mykwargs) + self._store_rc(rc, session) + return rc + + elif cmd == "rc": + rc = self._get_rc(session) + self.channel.send(str(rc)) + return rc + + elif cmd == "begin": + session = self.get_new_session() + self.channel.send(session) + return session + + elif cmd == "end": + rc = self.destroy_session(session) + if rc: + self.channel.send ( "SUC" ) + else: + self.channel.send ( "ERR" ) return rc def remoteUpdateProgress(self, text, header = "", footer = "", back = False, importance = 0, type = "info", count = [], percent = False):