diff --git a/client/entropy-system-daemon b/client/entropy-system-daemon index e83196244..0280f01f8 100644 --- a/client/entropy-system-daemon +++ b/client/entropy-system-daemon @@ -16,5 +16,6 @@ srv = SocketHostInterface(Equo,EquoInterface) try: srv.go() except KeyboardInterrupt: + srv.Gc.kill() sys.exit(0) diff --git a/conf/socket.conf b/conf/socket.conf index bb7901cfc..cf8bee892 100644 --- a/conf/socket.conf +++ b/conf/socket.conf @@ -38,3 +38,11 @@ # listen-threads|5 # #listen-threads|5 +# +# Sessions TTL: +# session-ttl| +# +# example (default): +# session-ttl|120 +# +#session-ttl|120 diff --git a/libraries/entropy.py b/libraries/entropy.py index 229d80daa..a32387f1f 100644 --- a/libraries/entropy.py +++ b/libraries/entropy.py @@ -9525,6 +9525,7 @@ class SocketHostInterface: import socket import SocketServer + import entropyTools from threading import Thread class HostServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): @@ -9593,7 +9594,7 @@ class SocketHostInterface: self.lastoutput = None def handle_termination_commands(self, data): - if data.strip() in ["quit","close"]: + if data.strip() in self.HostInterface.termination_commands: self.HostInterface.updateProgress('close: %s' % (self.channel,)) self.transmit(self.HostInterface.answers['cl']) return "close" @@ -9623,7 +9624,7 @@ class SocketHostInterface: self.transmit(self.HostInterface.answers['no']) elif whoops: self.transmit(self.HostInterface.answers['er']) - elif cmd not in ["rc","begin","end","hello"]: + elif cmd not in self.HostInterface.no_acked_commands: self.transmit(self.HostInterface.answers['ok']) self.channel.sendall(self.HostInterface.answers['eot']) @@ -9634,11 +9635,16 @@ class SocketHostInterface: return False if session == None: - if cmd not in ["begin","hello"]: + if cmd not in self.HostInterface.no_session_commands: return False elif session not in self.HostInterface.sessions: return False + # keep session alive + if session != None: + self.HostInterface.set_session_running(session) + self.HostInterface.update_session_time(session) + return True def process(self, data, channel): @@ -9676,6 +9682,9 @@ class SocketHostInterface: self.HostInterface.store_rc(str(e),session) whoops = True + if session != None: + self.HostInterface.update_session_time(session) + self.HostInterface.unset_session_running(session) self.handle_end_answer(cmd, whoops, valid_cmd) def duplicate_termination_cmd(self, data): @@ -9726,9 +9735,17 @@ class SocketHostInterface: elif cmd == "end": return self.docmd_end(session) elif cmd == "hello": - return self.docmd_hello(session) + return self.docmd_hello() + elif cmd == "alive": + return self.docmd_alive(session) - def docmd_hello(self, session): + def docmd_alive(self, session): + cmd = self.HostInterface.answers['no'] + if session in self.HostInterface.sessions: + cmd = self.HostInterface.answers['ok'] + self.transmit(cmd) + + def docmd_hello(self): uname = os.uname() kern_string = uname[2] running_host = uname[1] @@ -9788,10 +9805,10 @@ class SocketHostInterface: # settings self.timeout = etpConst['socket_service']['timeout'] self.hostname = etpConst['socket_service']['hostname'] + self.session_ttl = etpConst['socket_service']['session_ttl'] if self.hostname == "*": self.hostname = '' self.port = etpConst['socket_service']['port'] self.threads = etpConst['socket_service']['threads'] # maximum number of allowed sessions - # FIXME: add self.sessions garbage collection self.sessions = {} self.answers = etpConst['socket_service']['answers'] # FIXME: add policy handling @@ -9801,15 +9818,55 @@ class SocketHostInterface: 'reposync', 'rc', 'match', - 'hello' + 'hello', + 'alive' + ] + self.no_acked_commands = [ + "rc", + "begin", + "end", + "hello", + "alive" + ] + self.termination_commands = [ + "quit", + "close" + ] + self.no_session_commands = [ + "begin", + "hello", + "alive" ] self.EntropyInstantiation = (serviceIntf, args, kwds) self.__Entropy = outputIntf self.Server = None self.setup_hostname() + self.Gc = None + self.start_session_garbage_collector() + def start_session_garbage_collector(self): + # do it every 30 seconds + self.Gc = self.entropyTools.TimeScheduled( self.gc_clean, 5 ) + self.Gc.setName("Socket_GC::"+str(random.random())) + self.Gc.start() + + def gc_clean(self): + if not self.sessions: + return + for session_id in self.sessions.keys(): + sess_time = self.sessions[session_id]['t'] + is_running = self.sessions[session_id]['running'] + if is_running: + continue + cur_time = time.time() + ttl = self.session_ttl + check_time = sess_time + ttl + if cur_time > check_time: + self.updateProgress('killing session %s, ttl: %ss: no activity' % (session_id,ttl,) ) + self.destroy_session(session_id) + def setup_hostname(self): if self.hostname: try: @@ -9831,8 +9888,22 @@ class SocketHostInterface: while rng in self.sessions: rng = str(int(random.random()*100000)+1) self.sessions[rng] = {} + self.sessions[rng]['running'] = False + self.sessions[rng]['t'] = time.time() return rng + def update_session_time(self, session): + self.sessions[session]['t'] = time.time() + self.updateProgress('session time updated for %s' % (session,) ) + + def set_session_running(self, session): + if self.sessions.has_key(session): + self.sessions[session]['running'] = True + + def unset_session_running(self, session): + if self.sessions.has_key(session): + self.sessions[session]['running'] = False + def destroy_session(self, session): if self.sessions.has_key(session): del self.sessions[session] @@ -9841,13 +9912,25 @@ class SocketHostInterface: def go(self): self.socket.setdefaulttimeout(self.timeout) - self.Server = self.HostServer( - (self.hostname, self.port), - self.RequestHandler, - self.CommandProcessor(self) - ) + while 1: + try: + self.Server = self.HostServer( + (self.hostname, self.port), + self.RequestHandler, + self.CommandProcessor(self) + ) + break + except self.socket.error, e: + if e[0] == 98: + # Address already in use + self.updateProgress('address already in use (%s, port: %s), waiting 5 seconds...' % (self.hostname,self.port,)) + time.sleep(5) + continue + else: + raise self.updateProgress('server connected, listening on: %s, port: %s' % (self.hostname,self.port,)) self.Server.serve_forever() + self.Gc.kill() def store_rc(self, rc, session): if type(rc) in (list,tuple,): diff --git a/libraries/entropyConstants.py b/libraries/entropyConstants.py index 64c2372fc..7694a82c7 100644 --- a/libraries/entropyConstants.py +++ b/libraries/entropyConstants.py @@ -704,12 +704,13 @@ def const_defaultSettings(rootdir): 'port': 999, 'timeout': 200, 'threads': 5, + 'session_ttl': 120, 'answers': { - 'ok': chr(0)+chr(0), - 'er': chr(0)+chr(1), - 'no': chr(0)+chr(2), - 'cl': chr(0)+chr(3), - 'eot': chr(0)+chr(4)+"\n" + 'ok': chr(0)+"OK\n"+chr(0), + 'er': chr(0)+"ER\n"+chr(1), + 'no': chr(0)+"NO\n"+chr(2), + 'cl': chr(0)+"CL\n"+chr(3), + 'eot': chr(0)+"EOT\n"+chr(4) }, } @@ -858,6 +859,13 @@ def const_readSocketSettings(): etpConst['socket_service']['threads'] = x except ValueError: pass + elif line.startswith("session-ttl|") and (len(line.split("|")) > 1): + x = line.split("|")[1].strip() + try: + x = int(x) + etpConst['socket_service']['session_ttl'] = x + except ValueError: + pass def const_readEntropySettings(): # entropy section