Entropy:
- fixed sqlite IntegrityError trapping in etpDatabase.addPackage() - third big commit for the Socket Interface, adding sessions management, changed protocol git-svn-id: http://svn.sabayonlinux.org/projects/entropy/trunk@1509 cd1c1023-2f26-0410-ae45-c471fc1f0318
This commit is contained in:
@@ -3,9 +3,9 @@ import sys
|
||||
import socket
|
||||
serverHost = "localhost"
|
||||
serverPort = 999
|
||||
s = None
|
||||
def connect():
|
||||
global s
|
||||
def spawn(cmd, silent = False):
|
||||
|
||||
# connect
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
try:
|
||||
s.connect((serverHost, serverPort))
|
||||
@@ -15,31 +15,40 @@ def connect():
|
||||
sys.exit(1)
|
||||
else:
|
||||
raise
|
||||
connect()
|
||||
try:
|
||||
#s.send("reposync reponames=['sabayonlinux.org'] forceUpdate=True")
|
||||
s.send("match 'x11-libs/qt'")
|
||||
data = s.recv(1024)
|
||||
while data:
|
||||
if not data:
|
||||
break
|
||||
sys.stdout.write(data)
|
||||
sys.stdout.flush()
|
||||
data = s.recv(1024)
|
||||
|
||||
# get rc
|
||||
# send command
|
||||
s.send(cmd)
|
||||
|
||||
# get data
|
||||
data = ''
|
||||
x = s.recv(1024)
|
||||
while x:
|
||||
data += x
|
||||
if not silent:
|
||||
sys.stdout.write(x)
|
||||
sys.stdout.flush()
|
||||
x = s.recv(1024)
|
||||
s.close()
|
||||
sys.stdout.write("\n")
|
||||
connect()
|
||||
s.send("rc")
|
||||
result = ''
|
||||
d = s.recv(1024)
|
||||
while d:
|
||||
result += d
|
||||
d = s.recv(1024)
|
||||
print "returned data:",result
|
||||
return data
|
||||
|
||||
try:
|
||||
|
||||
# 1st step, get session
|
||||
session = spawn("begin", silent = True)
|
||||
print "session is:",session # XXX VALIDATE
|
||||
|
||||
# 2nd step, run the command
|
||||
#s.send("reposync reponames=['sabayonlinux.org'] forceUpdate=True")
|
||||
result = spawn("%s match 'x11-libs/qt'" % (session,), silent = True)
|
||||
print "spawn result is:",result
|
||||
|
||||
# 3rd step, get rc
|
||||
rc = spawn("%s rc" % (session,), silent = True)
|
||||
print "REAL returned data:",rc
|
||||
|
||||
# 4th step, end session
|
||||
rc = spawn("%s end" % (session,), silent = True)
|
||||
print "END session result",rc
|
||||
|
||||
except KeyboardInterrupt:
|
||||
s.close()
|
||||
sys.exit(0)
|
||||
s.close()
|
||||
@@ -884,7 +884,7 @@ class etpDatabase:
|
||||
etpData['branch'],
|
||||
)
|
||||
)
|
||||
except IntegrityError: # we have a PRIMARY KEY we need to remove
|
||||
except dbapi2.IntegrityError: # we have a PRIMARY KEY we need to remove
|
||||
self.migrateCountersTable()
|
||||
self.cursor.execute(
|
||||
'INSERT into counters VALUES '
|
||||
|
||||
+63
-19
@@ -9346,9 +9346,12 @@ class SocketHostInterface:
|
||||
self.hostname = etpConst['socket_service']['hostname']
|
||||
self.port = etpConst['socket_service']['port']
|
||||
self.threads = etpConst['socket_service']['threads']
|
||||
self.sessions = {}
|
||||
|
||||
# FIXME: add policy handling
|
||||
self.valid_commands = [
|
||||
'begin',
|
||||
'end',
|
||||
'reposync',
|
||||
'rc',
|
||||
'match'
|
||||
@@ -9358,7 +9361,6 @@ class SocketHostInterface:
|
||||
self.conn_active = False
|
||||
self.channel = None
|
||||
self.Entropy = intf
|
||||
self.last_result = None
|
||||
self.Entropy = intf(*args, **kwds)
|
||||
self.Entropy_updateProgress = self.Entropy.updateProgress
|
||||
self.updateProgress = self.localUpdateProgress
|
||||
@@ -9366,7 +9368,6 @@ class SocketHostInterface:
|
||||
self.Entropy.progress = self.remoteUpdateProgress
|
||||
self.Entropy.urlFetcher = self.SocketUrlFetcher
|
||||
self.lastoutput = ''
|
||||
self.lastresult = None
|
||||
|
||||
self.SocketServer = self.socket.socket ( self.socket.AF_INET, self.socket.SOCK_STREAM )
|
||||
while 1:
|
||||
@@ -9382,6 +9383,18 @@ class SocketHostInterface:
|
||||
raise
|
||||
self.SocketServer.listen ( self.threads )
|
||||
|
||||
def get_new_session(self):
|
||||
rng = str(int(random.random()*100000))
|
||||
while rng in self.sessions:
|
||||
rng = str(int(random.random()*100000))
|
||||
self.sessions[rng] = {}
|
||||
return rng
|
||||
|
||||
def destroy_session(self, session):
|
||||
if self.sessions.has_key(session):
|
||||
del self.sessions[session]
|
||||
return True
|
||||
return False
|
||||
|
||||
def go(self):
|
||||
try:
|
||||
@@ -9412,18 +9425,18 @@ class SocketHostInterface:
|
||||
|
||||
# validate command
|
||||
args = data.split()
|
||||
cmd = args[0]
|
||||
if cmd not in self.valid_commands:
|
||||
session = args[0]
|
||||
if session in ["begin","rc"]:
|
||||
cmd = args[0]
|
||||
session = None
|
||||
else:
|
||||
cmd = args[1]
|
||||
args = args[1:] # remove session
|
||||
|
||||
# answer to invalid commands
|
||||
if (cmd not in self.valid_commands) or (session not in self.sessions and (cmd != "begin")):
|
||||
self.channel.send ( "NOP" )
|
||||
self.channel.close()
|
||||
self.lastresult = None
|
||||
break
|
||||
|
||||
# get last returned output
|
||||
if cmd == "rc":
|
||||
self.channel.send ( str(self.lastresult) )
|
||||
self.channel.close()
|
||||
self.lastresult = None
|
||||
break
|
||||
|
||||
try:
|
||||
@@ -9431,16 +9444,16 @@ class SocketHostInterface:
|
||||
if len(args) > 1:
|
||||
myargs = args[1:]
|
||||
# FIXME: run this in parallel to avoid locks and check when done (sleep until done)
|
||||
self.lastresult = self.run_task(cmd, myargs)
|
||||
rc = self.run_task(cmd, myargs, session)
|
||||
except Exception, e:
|
||||
self.channel.send ( "Exception: %s\n" % (str(Exception),) )
|
||||
self.channel.send ( "Error: %s\n" % (e,) )
|
||||
self.channel.send ( "ERR" )
|
||||
self.channel.close()
|
||||
self.lastresult = None
|
||||
break
|
||||
|
||||
self.channel.send ( "SUC" )
|
||||
if cmd not in ["rc","begin","end"]:
|
||||
self.channel.send ( "SUC" )
|
||||
self.updateProgress('close: %s' % (details,))
|
||||
self.channel.close()
|
||||
break
|
||||
@@ -9457,7 +9470,7 @@ class SocketHostInterface:
|
||||
else:
|
||||
raise
|
||||
|
||||
def run_task(self, cmd, args):
|
||||
def run_task(self, cmd, args, session):
|
||||
myargs = []
|
||||
mykwargs = {}
|
||||
for arg in args:
|
||||
@@ -9468,20 +9481,51 @@ class SocketHostInterface:
|
||||
mykwargs[a] = eval(b)
|
||||
else:
|
||||
myargs.append(eval(arg))
|
||||
rc = self.spawn_function(cmd, myargs, mykwargs)
|
||||
return rc
|
||||
return self.spawn_function(cmd, myargs, mykwargs, session)
|
||||
|
||||
def spawn_function(self, cmd, myargs, mykwargs):
|
||||
def _store_rc(self, rc, session):
|
||||
if type(rc) in (list,tuple,):
|
||||
rc_item = rc[:]
|
||||
elif type(rc) in (set,frozenset,dict,):
|
||||
rc_item = rc.copy()
|
||||
else:
|
||||
rc_item = rc
|
||||
self.sessions[session]['rc'] = rc_item
|
||||
|
||||
def _get_rc(self, session):
|
||||
return self.sessions[session]['rc']
|
||||
|
||||
def spawn_function(self, cmd, myargs, mykwargs, session):
|
||||
|
||||
self.updateProgress('called %s: args: %s, kwargs: %s' % (cmd,myargs,mykwargs,))
|
||||
|
||||
if cmd == "reposync":
|
||||
repoConn = self.Entropy.Repositories(*myargs, **mykwargs)
|
||||
rc = repoConn.sync()
|
||||
self._store_rc(rc, session)
|
||||
return rc
|
||||
|
||||
elif cmd == "match":
|
||||
rc = self.Entropy.atomMatch(*myargs, **mykwargs)
|
||||
self._store_rc(rc, session)
|
||||
return rc
|
||||
|
||||
elif cmd == "rc":
|
||||
rc = self._get_rc(session)
|
||||
self.channel.send(str(rc))
|
||||
return rc
|
||||
|
||||
elif cmd == "begin":
|
||||
session = self.get_new_session()
|
||||
self.channel.send(session)
|
||||
return session
|
||||
|
||||
elif cmd == "end":
|
||||
rc = self.destroy_session(session)
|
||||
if rc:
|
||||
self.channel.send ( "SUC" )
|
||||
else:
|
||||
self.channel.send ( "ERR" )
|
||||
return rc
|
||||
|
||||
def remoteUpdateProgress(self, text, header = "", footer = "", back = False, importance = 0, type = "info", count = [], percent = False):
|
||||
|
||||
Reference in New Issue
Block a user