- Socket Interface:
  - implemented sessions timeouts collector (I like calling it GARBAGE collector), each session has now a configurable TTL
  - added 'alive' command to see if a session is still alive


git-svn-id: http://svn.sabayonlinux.org/projects/entropy/trunk@1528 cd1c1023-2f26-0410-ae45-c471fc1f0318
This commit is contained in:
lxnay
2008-03-28 13:32:14 +00:00
parent ff09af6efe
commit c1e8f26720
4 changed files with 117 additions and 17 deletions
+1
View File
@@ -16,5 +16,6 @@ srv = SocketHostInterface(Equo,EquoInterface)
try:
srv.go()
except KeyboardInterrupt:
srv.Gc.kill()
sys.exit(0)
+8
View File
@@ -38,3 +38,11 @@
# listen-threads|5
#
#listen-threads|5
#
# Sessions TTL:
# session-ttl|<number of seconds>
#
# example (default):
# session-ttl|120
#
#session-ttl|120
+95 -12
View File
@@ -9525,6 +9525,7 @@ class SocketHostInterface:
import socket
import SocketServer
import entropyTools
from threading import Thread
class HostServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
@@ -9593,7 +9594,7 @@ class SocketHostInterface:
self.lastoutput = None
def handle_termination_commands(self, data):
if data.strip() in ["quit","close"]:
if data.strip() in self.HostInterface.termination_commands:
self.HostInterface.updateProgress('close: %s' % (self.channel,))
self.transmit(self.HostInterface.answers['cl'])
return "close"
@@ -9623,7 +9624,7 @@ class SocketHostInterface:
self.transmit(self.HostInterface.answers['no'])
elif whoops:
self.transmit(self.HostInterface.answers['er'])
elif cmd not in ["rc","begin","end","hello"]:
elif cmd not in self.HostInterface.no_acked_commands:
self.transmit(self.HostInterface.answers['ok'])
self.channel.sendall(self.HostInterface.answers['eot'])
@@ -9634,11 +9635,16 @@ class SocketHostInterface:
return False
if session == None:
if cmd not in ["begin","hello"]:
if cmd not in self.HostInterface.no_session_commands:
return False
elif session not in self.HostInterface.sessions:
return False
# keep session alive
if session != None:
self.HostInterface.set_session_running(session)
self.HostInterface.update_session_time(session)
return True
def process(self, data, channel):
@@ -9676,6 +9682,9 @@ class SocketHostInterface:
self.HostInterface.store_rc(str(e),session)
whoops = True
if session != None:
self.HostInterface.update_session_time(session)
self.HostInterface.unset_session_running(session)
self.handle_end_answer(cmd, whoops, valid_cmd)
def duplicate_termination_cmd(self, data):
@@ -9726,9 +9735,17 @@ class SocketHostInterface:
elif cmd == "end":
return self.docmd_end(session)
elif cmd == "hello":
return self.docmd_hello(session)
return self.docmd_hello()
elif cmd == "alive":
return self.docmd_alive(session)
def docmd_hello(self, session):
def docmd_alive(self, session):
cmd = self.HostInterface.answers['no']
if session in self.HostInterface.sessions:
cmd = self.HostInterface.answers['ok']
self.transmit(cmd)
def docmd_hello(self):
uname = os.uname()
kern_string = uname[2]
running_host = uname[1]
@@ -9788,10 +9805,10 @@ class SocketHostInterface:
# settings
self.timeout = etpConst['socket_service']['timeout']
self.hostname = etpConst['socket_service']['hostname']
self.session_ttl = etpConst['socket_service']['session_ttl']
if self.hostname == "*": self.hostname = ''
self.port = etpConst['socket_service']['port']
self.threads = etpConst['socket_service']['threads'] # maximum number of allowed sessions
# FIXME: add self.sessions garbage collection
self.sessions = {}
self.answers = etpConst['socket_service']['answers']
# FIXME: add policy handling
@@ -9801,15 +9818,55 @@ class SocketHostInterface:
'reposync',
'rc',
'match',
'hello'
'hello',
'alive'
]
self.no_acked_commands = [
"rc",
"begin",
"end",
"hello",
"alive"
]
self.termination_commands = [
"quit",
"close"
]
self.no_session_commands = [
"begin",
"hello",
"alive"
]
self.EntropyInstantiation = (serviceIntf, args, kwds)
self.__Entropy = outputIntf
self.Server = None
self.setup_hostname()
self.Gc = None
self.start_session_garbage_collector()
def start_session_garbage_collector(self):
# do it every 30 seconds
self.Gc = self.entropyTools.TimeScheduled( self.gc_clean, 5 )
self.Gc.setName("Socket_GC::"+str(random.random()))
self.Gc.start()
def gc_clean(self):
if not self.sessions:
return
for session_id in self.sessions.keys():
sess_time = self.sessions[session_id]['t']
is_running = self.sessions[session_id]['running']
if is_running:
continue
cur_time = time.time()
ttl = self.session_ttl
check_time = sess_time + ttl
if cur_time > check_time:
self.updateProgress('killing session %s, ttl: %ss: no activity' % (session_id,ttl,) )
self.destroy_session(session_id)
def setup_hostname(self):
if self.hostname:
try:
@@ -9831,8 +9888,22 @@ class SocketHostInterface:
while rng in self.sessions:
rng = str(int(random.random()*100000)+1)
self.sessions[rng] = {}
self.sessions[rng]['running'] = False
self.sessions[rng]['t'] = time.time()
return rng
def update_session_time(self, session):
self.sessions[session]['t'] = time.time()
self.updateProgress('session time updated for %s' % (session,) )
def set_session_running(self, session):
if self.sessions.has_key(session):
self.sessions[session]['running'] = True
def unset_session_running(self, session):
if self.sessions.has_key(session):
self.sessions[session]['running'] = False
def destroy_session(self, session):
if self.sessions.has_key(session):
del self.sessions[session]
@@ -9841,13 +9912,25 @@ class SocketHostInterface:
def go(self):
self.socket.setdefaulttimeout(self.timeout)
self.Server = self.HostServer(
(self.hostname, self.port),
self.RequestHandler,
self.CommandProcessor(self)
)
while 1:
try:
self.Server = self.HostServer(
(self.hostname, self.port),
self.RequestHandler,
self.CommandProcessor(self)
)
break
except self.socket.error, e:
if e[0] == 98:
# Address already in use
self.updateProgress('address already in use (%s, port: %s), waiting 5 seconds...' % (self.hostname,self.port,))
time.sleep(5)
continue
else:
raise
self.updateProgress('server connected, listening on: %s, port: %s' % (self.hostname,self.port,))
self.Server.serve_forever()
self.Gc.kill()
def store_rc(self, rc, session):
if type(rc) in (list,tuple,):
+13 -5
View File
@@ -704,12 +704,13 @@ def const_defaultSettings(rootdir):
'port': 999,
'timeout': 200,
'threads': 5,
'session_ttl': 120,
'answers': {
'ok': chr(0)+chr(0),
'er': chr(0)+chr(1),
'no': chr(0)+chr(2),
'cl': chr(0)+chr(3),
'eot': chr(0)+chr(4)+"\n"
'ok': chr(0)+"OK\n"+chr(0),
'er': chr(0)+"ER\n"+chr(1),
'no': chr(0)+"NO\n"+chr(2),
'cl': chr(0)+"CL\n"+chr(3),
'eot': chr(0)+"EOT\n"+chr(4)
},
}
@@ -858,6 +859,13 @@ def const_readSocketSettings():
etpConst['socket_service']['threads'] = x
except ValueError:
pass
elif line.startswith("session-ttl|") and (len(line.split("|")) > 1):
x = line.split("|")[1].strip()
try:
x = int(x)
etpConst['socket_service']['session_ttl'] = x
except ValueError:
pass
def const_readEntropySettings():
# entropy section