Package entropy :: Package services :: Module interfaces

Source Code for Module entropy.services.interfaces

   1  # -*- coding: utf-8 -*- 
   2  ''' 
   3      # DESCRIPTION: 
   4      # Entropy Object Oriented Interface 
   5   
   6      Copyright (C) 2007-2009 Fabio Erculiani 
   7   
   8      This program is free software; you can redistribute it and/or modify 
   9      it under the terms of the GNU General Public License as published by 
  10      the Free Software Foundation; either version 2 of the License, or 
  11      (at your option) any later version. 
  12   
  13      This program is distributed in the hope that it will be useful, 
  14      but WITHOUT ANY WARRANTY; without even the implied warranty of 
  15      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
  16      GNU General Public License for more details. 
  17   
  18      You should have received a copy of the GNU General Public License 
  19      along with this program; if not, write to the Free Software 
  20      Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
  21  ''' 
  22   
  23  from __future__ import with_statement 
  24  import os 
  25  import select 
  26  import shutil 
  27  import time 
  28  from entropy.const import etpConst, ETP_LOGLEVEL_NORMAL, ETP_LOGPRI_INFO, \ 
  29      const_setup_perms 
  30  from entropy.exceptions import * 
  31  from entropy.services.skel import SocketAuthenticator, SocketCommands 
  32  from entropy.i18n import _ 
  33  from entropy.output import blue, red, darkgreen 
  34   
35 -class SocketHost:
36 37 import socket 38 import SocketServer 39 from threading import Thread 40
41 - class BasicPamAuthenticator(SocketAuthenticator):
42 43 import entropy.tools as entropyTools 44
def __init__(self, HostInterface, *args, **kwargs):
    """
    Prepare the authenticator for the given host interface.

    Extra positional and keyword arguments are accepted but ignored.
    """
    # authentication schemes that docmd_login() lets through
    supported_schemes = ("plain", "shadow", "md5")
    self.valid_auth_types = list(supported_schemes)
    SocketAuthenticator.__init__(self, HostInterface)
48
def docmd_login(self, arguments):
    """
    Validate a login request.

    arguments is expected to be [user, auth_type, auth_string].
    Returns a tuple (success, user, uid, message).
    On success the current session is flagged 'admin' when the uid is 0
    (falsy) and 'user' otherwise.
    """
    # reject malformed requests straight away
    if not arguments or len(arguments) != 3:
        return False, None, None, 'wrong arguments'

    user = arguments[0]
    auth_type = arguments[1]
    auth_string = arguments[2]

    # the requested scheme must be one we support
    if auth_type not in self.valid_auth_types:
        return False, user, None, 'invalid auth type'

    passwd_entry = self.__get_user_data(user)
    if passwd_entry is None:
        return False, user, None, 'invalid user'

    uid = passwd_entry[2]
    # only members of the Entropy group may log in
    if not self.entropyTools.is_user_in_entropy_group(uid):
        return False, user, uid, 'user not in %s group' % (etpConst['sysgroup'],)

    # finally check the credentials
    if not self.__validate_auth(user, auth_type, auth_string):
        return False, user, uid, 'auth failed'

    session_data = self.HostInterface.sessions[self.session]
    if uid:
        session_data['user'] = True
    else:
        session_data['admin'] = True
    return True, user, uid, "ok"
82 83 # if we get here, it is because the user is logged in
def docmd_userdata(self):
    """
    Return the passwd information of the user bound to the current session.

    Returns (True, info, 'ok'); info stays empty when the uid stored in
    the session has no passwd record.
    """
    auth_uid = self.HostInterface.sessions[self.session]['auth_uid']
    passwd_entry = self.__get_uid_data(auth_uid)

    user_info = {}
    if passwd_entry:
        # (result key, index inside the passwd record)
        layout = (
            ('username', 0),
            ('uid', 2),
            ('gid', 3),
            ('references', 4),
            ('home', 5),
            ('shell', 6),
        )
        for key, index in layout:
            user_info[key] = passwd_entry[index]
    return True, user_info, 'ok'
97
def __get_uid_data(self, user_id):
    """Return the passwd record for user_id, or None when there is none."""
    import pwd
    try:
        return pwd.getpwuid(user_id)
    except KeyError:
        # unknown uid
        return None
106
def __get_user_data(self, user):
    """Return the passwd record for the user name, or None when unknown."""
    import pwd
    try:
        return pwd.getpwnam(user)
    except KeyError:
        # unknown user name
        return None
115
def __validate_auth(self, user, auth_type, auth_string):
    """
    Dispatch the credential check for the given scheme.

    Returns the result of __do_auth() for "plain", "shadow" and "md5",
    and False for any other scheme.
    """
    if auth_type == "plain":
        # plain is __do_auth()'s default mode
        return self.__do_auth(user, auth_string)
    for scheme in ("shadow", "md5"):
        if auth_type == scheme:
            return self.__do_auth(user, auth_string, auth_type = scheme)
    return False
125
def __do_auth(self, user, password, auth_type = None):
    """
    Compare the supplied credential with the user's shadow entry.

    auth_type None means "plain": password is hashed with crypt() using
    the stored hash as salt. "shadow" compares password directly with the
    stored hash. "md5" compares password with the MD5 hex digest of the
    stored hash. Returns True on match, False otherwise (including when
    the user has no shadow entry).

    NOTE(review): "shadow" and "md5" accept values derived from the stored
    hash itself, and the comparison is a plain ==; confirm that both are
    acceptable for this protocol.
    """
    import spwd

    try:
        # field 1 of the shadow record is the encrypted password
        enc_pass = spwd.getspnam(user)[1]
    except KeyError:
        return False

    if auth_type == None: # plain
        import crypt
        generated_pass = crypt.crypt(str(password), enc_pass)
    elif auth_type == "shadow":
        generated_pass = password
    elif auth_type == "md5": # md5
        import hashlib
        m = hashlib.md5()
        m.update(enc_pass)
        enc_pass = m.hexdigest()
        generated_pass = str(password)
    else: # unknown scheme, can never match
        generated_pass = None

    if generated_pass == enc_pass:
        return True
    return False
151
def docmd_logout(self, myargs):
    """
    Validate a logout request.

    myargs must hold exactly one non-empty string, the user name.
    Returns (True, user, "ok") or (False, None, reason).
    """
    # exactly one argument is required
    if len(myargs) != 1:
        return False, None, 'wrong arguments'

    user = myargs[0]
    if user and isinstance(user, basestring):
        return True, user, "ok"
    return False, None, "wrong user"
164
def set_exc_permissions(self, uid, gid):
    """
    Switch the current process to the given group and user ids.

    A None value leaves the corresponding id untouched. The group is
    changed before the user.
    """
    if gid is not None:
        os.setgid(gid)
    if uid is not None:
        os.setuid(uid)
170
def hide_login_data(self, args):
    """
    Return a copy of args with the last item (the password) masked.

    The input sequence is never modified. An empty sequence is returned
    as an empty list instead of raising IndexError, so a login command
    sent without parameters can still be logged and rejected normally.
    """
    masked = list(args)
    if masked:
        masked[-1] = 'hidden'
    return masked
175
def terminate_instance(self):
    """Termination hook; this authenticator has nothing to release."""
    return None
178
179 - class HostServerMixin(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
180
class ConnWrapper:
    '''
    Base class for implementing the rest of the wrappers in this module.
    Operates by taking a connection argument which is used when 'self' doesn't
    provide the functionality being requested.
    '''

    def __init__(self, connection) :
        self.connection = connection

    def __getattr__(self, function) :
        # __getattr__ only runs when normal lookup fails. If 'connection'
        # itself is missing (instance not initialised yet), delegating
        # would recurse without bound, so fail cleanly instead.
        if function == 'connection':
            raise AttributeError(function)
        return getattr(self.connection, function)
# class-level imports: the methods of this class reach these modules as
# self.entropyTools, self.socket_mod and self.SocketServer
import entropy.tools as entropyTools
import socket as socket_mod
import SocketServer
# This means the main server will not do the equivalent of a
# pthread_join() on the new threads. With this set, Ctrl-C will
# kill the server reliably.
daemon_threads = True

# By setting this we allow the server to re-bind to the address by
# setting SO_REUSEADDR, meaning you don't have to wait for
# timeouts when you kill the server and the sockets don't get
# closed down correctly.
allow_reuse_address = True
def __init__(self, server_address, RequestHandlerClass, processor, HostInterface, authorized_clients_only = False):
    """
    Create the server, with or without SSL depending on HostInterface.SSL.

    With SSL only BaseServer is initialised here; the listening socket is
    then created by load_ssl_context() and make_ssl_connection_alive().
    Without SSL the regular TCPServer constructor is used.

    Raises ConnectionError when binding fails with error code 13;
    any other socket error is re-raised unchanged.
    """

    self.alive = True
    self.socket = self.socket_mod
    self.processor = processor
    self.server_address = server_address
    self.HostInterface = HostInterface
    self.SSL = self.HostInterface.SSL
    self.real_sock = None
    self.ssl_authorized_clients_only = authorized_clients_only

    if self.SSL:
        self.SocketServer.BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.load_ssl_context()
        self.make_ssl_connection_alive()
    else:
        try:
            self.SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass)
        except self.socket_mod.error, e:
            # 13 is presumably EACCES (permission denied) -- TODO confirm
            if e[0] == 13:
                raise ConnectionError('ConnectionError: %s' % (_("Cannot bind the service"),))
            raise
229
def load_ssl_context(self):
    """
    Build self.context from the SSL settings stored in self.SSL.

    self.SSL is read as a mapping with the keys 'm' (the SSL module),
    'key', 'cert', 'ca_cert' and 'ca_pkey'.
    """
    ssl_module = self.SSL['m']

    # create the context and ask the peer for a certificate
    self.context = ssl_module.Context(ssl_module.SSLv23_METHOD)
    self.context.set_verify(ssl_module.VERIFY_PEER, self.verify_ssl_cb)
    self.context.set_options(ssl_module.OP_NO_SSLv2)

    # load key, certificate and CA material
    self.context.use_privatekey_file(self.SSL['key'])
    self.context.use_certificate_file(self.SSL['cert'])
    self.context.load_verify_locations(self.SSL['ca_cert'])
    self.context.load_client_ca(self.SSL['ca_cert'])

    summary = 'SSL context loaded, key: %s - cert: %s, CA cert: %s, CA pkey: %s' % (
        self.SSL['key'],
        self.SSL['cert'],
        self.SSL['ca_cert'],
        self.SSL['ca_pkey'],
    )
    self.HostInterface.updateProgress(summary)
247
def make_ssl_connection_alive(self):
    """
    Create the listening socket, wrap it in an SSL connection, then bind
    and activate the server.
    """
    plain_socket = self.socket_mod.socket(self.address_family, self.socket_type)
    self.real_sock = plain_socket
    ssl_connection = self.SSL['m'].Connection(self.context, plain_socket)
    self.socket = self.ConnWrapper(ssl_connection)
    self.server_bind()
    self.server_activate()
253 254 # this function should do the authentication checking to see that 255 # the client is who they say they are.
def verify_ssl_cb(self, conn, cert, errnum, depth, ok) :
    """
    Certificate verification callback registered by load_ssl_context().

    No extra checks are made: the 'ok' value received is handed back.
    """
    verdict = ok
    return verdict
258
def verify_request(self, request, client_address):
    """
    Decide whether an incoming connection may be served.

    The connection is refused when the client IP is blacklisted, when the
    per-host limit is exceeded or when the global connection limit has
    been reached. On acceptance HostInterface.connections is incremented.

    ip_max_connections_check() counts the connection in
    HostInterface.per_host_connections before it is known whether the
    connection will be served. finish_request() is the only place that
    counter is decremented, so a refusal has to give the slot back here;
    otherwise the counter only ever grows and the host ends up locked out.

    Returns True when the request is accepted, False otherwise.
    """
    host = self.HostInterface
    self.do_ssl = bool(host.SSL)
    ip_address = client_address[0]

    def release_host_slot():
        # undo the increment made by ip_max_connections_check()
        per_host = host.per_host_connections
        count = per_host.get(ip_address)
        if count is None:
            return
        if count > 1:
            per_host[ip_address] = count - 1
        else:
            del per_host[ip_address]

    counted = False
    allowed = self.ip_blacklist_check(ip_address)
    if allowed:
        allowed = self.ip_max_connections_check(ip_address)
        counted = True
    if not allowed:
        if counted:
            release_host_slot()
        host.updateProgress(
            '[from: %s | SSL: %s] connection refused, ip blacklisted or maximum connections per IP reached' % (
                client_address,
                self.do_ssl,
            )
        )
        return False

    if not self.max_connections_check(request):
        release_host_slot()
        host.updateProgress(
            '[from: %s | SSL: %s] connection refused (max connections reached: %s)' % (
                client_address,
                self.do_ssl,
                host.max_connections,
            )
        )
        return False

    # accepted
    host.connections += 1
    host.updateProgress(
        '[from: %s | SSL: %s] connection established (%s of %s max connections)' % (
            client_address,
            self.do_ssl,
            host.connections,
            host.max_connections,
        )
    )
    return True
298
def ip_blacklist_check(self, client_addr):
    """Return True when client_addr is NOT in HostInterface.ip_blacklist."""
    banned = client_addr in self.HostInterface.ip_blacklist
    return not banned
303
def ip_max_connections_check(self, ip_address):
    """
    Count a new connection from ip_address and enforce per-host limits.

    The counter in HostInterface.per_host_connections is incremented on
    every call, including when the connection is refused. The first
    connection from a host is always accepted. Above
    max_connections_per_host the connection is refused; above
    max_connections_per_host_barrier it is accepted after sleeping
    between 5 and 8 seconds.

    Returns True when the connection may proceed, False otherwise.
    """
    host = self.HostInterface
    hard_limit = host.max_connections_per_host
    soft_limit = host.max_connections_per_host_barrier
    counters = host.per_host_connections

    previous = counters.get(ip_address)
    if previous is None:
        # first connection seen from this host
        counters[ip_address] = 1
        return True

    current = previous + 1
    counters[ip_address] += 1

    if current > hard_limit:
        host.updateProgress(
            '[from: %s] ------- :EEK: !! connection closed too many simultaneous connections from host (current: %s | limit: %s) -------' % (
                ip_address,
                current,
                hard_limit,
            )
        )
        return False

    if current > soft_limit:
        delays = [5,6,7,8]
        host.updateProgress(
            '[from: %s] ------- :EEEK: !! connection warning simultaneous connection barrier reached from host (current: %s | soft limit: %s) -------' % (
                ip_address,
                current,
                soft_limit,
            )
        )
        # slow the client down by a pseudo-random number of seconds
        random_value = self.entropyTools.get_random_number()
        time.sleep(delays[abs(hash(random_value)) % len(delays)])

    return True
336
def max_connections_check(self, request):
    """
    Tell whether the server can accept one more connection.

    When HostInterface.connections has reached
    HostInterface.max_connections the client is sent the 'mcr' answer on
    a best-effort basis and False is returned; otherwise True.

    Failures while notifying the client are ignored on purpose, but only
    regular exceptions: KeyboardInterrupt and SystemExit now propagate
    instead of being swallowed by a bare except.
    """
    host = self.HostInterface
    if host.connections < host.max_connections:
        return True

    try:
        host.transmit(request, host.answers['mcr'])
    except Exception:
        # the client may already be gone; refusing is all that matters
        pass
    return False
351
def serve_forever(self):
    """Handle requests one after another until self.alive becomes false."""
    while True:
        if not self.alive:
            break
        self.handle_request()
357 358 # taken from SocketServer.py
def finish_request(self, request, client_address):
    """
    Finish one request by instantiating RequestHandlerClass, then release
    the per-host connection slot of the client.

    The slot is released in a finally clause so that a handler which
    raises does not leave HostInterface.per_host_connections permanently
    incremented. Entries that drop to zero are removed, so the mapping
    does not keep one stale entry per client address ever seen.
    """
    host = self.HostInterface
    try:
        self.RequestHandlerClass(request, client_address, self)

        host.updateProgress(
            '[from: %s] connection closed (%s of %s max connections)' % (
                client_address,
                host.connections - 1,
                host.max_connections,
            )
        )
    finally:
        per_host_connections = host.per_host_connections
        ip_address = client_address[0]
        count = per_host_connections.get(ip_address)
        if count is not None:
            if count > 1:
                per_host_connections[ip_address] = count - 1
            else:
                del per_host_connections[ip_address]
377
def close_request(self, request):
    """
    Decrement the global connection counter, never going below zero.

    The request object itself is not touched here.
    """
    host = self.HostInterface
    if host.connections <= 0:
        return
    host.connections -= 1
381
382 - class RequestHandler(SocketServer.BaseRequestHandler):
# class-level imports, reachable from the methods as attributes of self
import SocketServer
import select
import socket
import entropy.tools as entropyTools
import gc
# class-wide default; data_receiver() overrides it per instance
timed_out = False
def __init__(self, request, client_address, server):
    """
    Initialise the per-connection state and hand over to
    BaseRequestHandler.

    NOTE(review): the stdlib BaseRequestHandler constructor is documented
    to call setup(), handle() and finish(), so the whole request is
    presumably processed before the final assignment below runs --
    confirm before reordering anything here.
    """

    # pre-init attributes
    self.__DEBUG = False
    self.__buffered_data = None
    self.__inst_token = self.entropyTools.get_random_number()
    self.server = None
    self.request = None
    self.client_address = None
    self.SocketServer.BaseRequestHandler.__init__(self, request,
        client_address, server)
    self.__data_counter = None
403
def data_receiver(self):
    """
    Wait for one command from the client, read it and process it.

    A transmission starts with the payload length followed by the EOS
    marker (self.myeos); the remaining payload is then read until the
    announced number of bytes has arrived.

    Returns True when the connection has to be closed (timeout, client
    closed, malformed data, error, or the processor asked to close) and
    a false value when the caller should keep listening.
    """

    # a previous call timed out: stop
    if self.timed_out:
        return True
    # assume a timeout until data shows up
    self.timed_out = True
    try:
        ready_to_read, ready_to_write, in_error = select.select(
            [self.request], [], [], self.default_timeout)
    except KeyboardInterrupt:
        self.timed_out = True
        return True

    if len(ready_to_read) == 1 and ready_to_read[0] == self.request:

        self.timed_out = False
        # for ValueError exception trapping:
        data = None

        if self.__DEBUG:
            self.server.processor.HostInterface.updateProgress(
                '[from: %s] request arrived :: counter: %s | buf_data: %s' % (
                    self.client_address,
                    self.__data_counter,
                    len(self.__buffered_data),
                )
            )

        try:

            if self.ssl and hasattr(self.request, 'setblocking'):
                # set SSL socket in blocking mode
                # this fixes bugs related to data stream flooding
                # with SSL - pyOpenSSL, probably because handshake
                # and WantRead/WantWrite conditions are handled
                # automatically
                self.request.setblocking(True)

            data = self.request.recv(1024)
            if self.ssl:
                # drain whatever the SSL layer has already buffered
                while self.request.pending():
                    data += self.request.recv(1024)

            if self.__data_counter is None:
                if data == '': # client wants to close
                    return True
                elif data == self.server.processor.HostInterface.answers['noop']:
                    return False
                elif len(data) < len(self.myeos):
                    self.server.processor.HostInterface.updateProgress(
                        'interrupted: %s, reason: %s - from client: %s - data: "%s" - counter: %s' % (
                            self.server.server_address,
                            "malformed EOS",
                            self.client_address,
                            repr(data),
                            self.__data_counter,
                        )
                    )
                    self.__buffered_data = ''
                    return True
                # header: "<length><eos>"; int() raises ValueError on garbage
                mystrlen = data.split(self.myeos)[0]
                self.__data_counter = int(mystrlen)
                # NOTE(review): the +1 assumes a one-character EOS marker
                data = data[len(mystrlen)+1:]
                self.__data_counter -= len(data)
                self.__buffered_data += data

            # command length exceeds our command length limit
            if self.__data_counter > self.max_command_length:
                raise InterruptError(
                    'InterruptError: command too long: %s, limit: %s' % (
                        self.__data_counter, self.max_command_length,))

            buf_empty_watchdog_count = 50 # * 0.05 = 2,5 seconds
            buf_len = 1024
            while self.__data_counter > 0:
                # never ask for more than what is still missing
                data_buf = buf_len
                if self.__data_counter < buf_len:
                    data_buf = self.__data_counter
                # both branches currently perform the same call
                if self.ssl:
                    x = self.request.recv(data_buf)
                else:
                    x = self.request.recv(data_buf)
                xlen = len(x)
                self.__data_counter -= xlen
                self.__buffered_data += x
                # if nothing was received and we still
                # need some data, trigger the watchdog
                if (xlen == 0) and (self.__data_counter > 0):
                    buf_empty_watchdog_count -= 1
                    time.sleep(0.05)
                    if buf_empty_watchdog_count < 1:
                        raise ValueError(
                            "buffer counter watchdog trigger")

            # transmission complete
            self.__data_counter = None
        except ValueError:
            tb = self.entropyTools.get_traceback()
            print tb
            self.server.processor.HostInterface.socketLog.write(tb)
            self.server.processor.HostInterface.socketLog.write(repr(data))
            self.server.processor.HostInterface.socketLog.write(str(self))
            self.server.processor.HostInterface.socketLog.write(str(self.__inst_token))
            self.server.processor.HostInterface.updateProgress(
                'interrupted: %s, reason: %s - from client: %s' % (
                    self.server.server_address,
                    "malformed transmission",
                    self.client_address,
                )
            )
            return True
        except self.socket.timeout, e:
            self.server.processor.HostInterface.updateProgress(
                'interrupted: %s, reason: %s - from client: %s' % (
                    self.server.server_address,
                    e,
                    self.client_address,
                )
            )
            return True
        except self.socket.sslerror, e:
            self.server.processor.HostInterface.updateProgress(
                'interrupted: %s, SSL socket error reason: %s - from client: %s' % (
                    self.server.server_address,
                    e,
                    self.client_address,
                )
            )
            return True
        except self.ssl_exceptions['WantX509LookupError']:
            return False
        except self.ssl_exceptions['WantReadError']:
            self.server.processor.HostInterface._ssl_poll(
                self.request, select.POLLIN, 'read')
            return False
        except self.ssl_exceptions['WantWriteError']:
            # NOTE(review): the label is 'read' on the write path as well --
            # confirm whether 'write' was intended
            self.server.processor.HostInterface._ssl_poll(
                self.request, select.POLLOUT, 'read')
            return False
        except self.ssl_exceptions['ZeroReturnError']:
            return True
        except self.ssl_exceptions['Error'], e:
            self.server.processor.HostInterface.updateProgress(
                'interrupted: SSL Error, reason: %s - from client: %s' % (
                    e,
                    self.client_address,
                )
            )
            return True
        except InterruptError, e:
            self.server.processor.HostInterface.updateProgress(
                'interrupted: Command Error, reason: %s - from client: %s' % (
                    e,
                    self.client_address,
                )
            )
            return True

        if not self.__buffered_data:
            return True

        cmd = self.server.processor.process(self.__buffered_data, self.request, self.client_address)
        if cmd == 'close':
            # tell the client we are closing
            self.server.processor.transmit(self.server.processor.HostInterface.answers['cl'])
            return True
        self.__buffered_data = ''
        return False
570
def fork_lock_acquire(self):
    """
    Acquire HostInterface.ForkLock when it exists and looks like a lock
    (it must offer acquire, release and locked); otherwise do nothing.
    """
    host = self.server.processor.HostInterface
    if not hasattr(host, 'ForkLock'):
        return
    lock = getattr(host, 'ForkLock')
    for method_name in ('acquire', 'release', 'locked'):
        if not hasattr(lock, method_name):
            return
    lock.acquire()
576
def fork_lock_release(self):
    """
    Release HostInterface.ForkLock when it exists, looks like a lock
    (acquire, release and locked are available) and is currently locked.
    """
    host = self.server.processor.HostInterface
    if not hasattr(host, 'ForkLock'):
        return
    lock = getattr(host, 'ForkLock')
    for method_name in ('acquire', 'release', 'locked'):
        if not hasattr(lock, method_name):
            return
    if lock.locked():
        lock.release()
582
def handle(self):
    """
    Serve the connection, in a forked child when
    HostInterface.fork_requests is set, inline otherwise.

    When forking, the parent polls the child once per second and sends
    SIGKILL once fork_request_timeout_seconds is exceeded; the fork lock
    is held for the whole time and released in any case.
    """
    # not using spawnFunction because it causes some mess
    # forking this way avoids having memory leaks
    if self.server.processor.HostInterface.fork_requests:
        self.fork_lock_acquire()
        try:
            my_timeout = self.server.processor.HostInterface.fork_request_timeout_seconds
            pid = os.fork()
            seconds = 0
            if pid > 0: # parent here
                # pid killer after timeout
                passed_away = False
                while 1:
                    time.sleep(1)
                    seconds += 1
                    try:
                        # non-blocking: 0 while the child is still running
                        dead = os.waitpid(pid, os.WNOHANG)[0]
                    except OSError, e:
                        # 10 is presumably ECHILD (no such child) -- TODO confirm
                        if e.errno != 10: raise
                        dead = True
                    if passed_away:
                        # the child was killed on the previous round and
                        # has just been reaped above
                        break
                    if dead: break
                    if seconds > my_timeout:
                        self.server.processor.HostInterface.updateProgress(
                            'interrupted: forked request timeout: %s,%s from client: %s' % (
                                seconds,
                                dead,
                                self.client_address,
                            )
                        )
                        if not dead:
                            import signal
                            os.kill(pid,signal.SIGKILL)
                            passed_away = True # in this way, the process table should be clean
                            continue
                        break
            else:
                # child: serve the request and leave without cleanup handlers
                self.do_handle()
                os._exit(0)
        finally:
            self.fork_lock_release()
    else:
        self.do_handle()
627 #self.entropyTools.spawn_function(self.do_handle) 628
def do_handle(self):
    """
    Main loop of a connection: call data_receiver() until it asks to
    stop or an unexpected exception occurs, then close the request.
    """

    # cache the settings that data_receiver() reads from self
    self.default_timeout = self.server.processor.HostInterface.timeout
    self.ssl = self.server.processor.HostInterface.SSL
    self.ssl_exceptions = self.server.processor.HostInterface.SSL_exceptions
    self.myeos = self.server.processor.HostInterface.answers['eos']
    self.max_command_length = self.server.processor.HostInterface.max_command_length

    while 1:

        try:
            if self.__DEBUG:
                self.server.processor.HostInterface.updateProgress(
                    '[from: %s] calling data_receiver' % (
                        self.client_address,
                    )
                )
            dobreak = self.data_receiver()
            if self.__DEBUG:
                self.server.processor.HostInterface.updateProgress(
                    '[from: %s] quitting data_receiver :: dobreak: %s' % (
                        self.client_address,
                        dobreak,
                    )
                )
            if dobreak: break
        except Exception, e:
            # note: the first value logged is the Exception class itself,
            # not the type of the exception that was caught
            self.server.processor.HostInterface.updateProgress(
                'interrupted: Unhandled exception: %s, error: %s - from client: %s' % (
                    Exception,
                    e,
                    self.client_address,
                )
            )
            # print exception
            tb = self.entropyTools.get_traceback()
            print tb
            self.server.processor.HostInterface.socketLog.write(tb)
            break

    self.request.close()
670
671 - def setup(self):
672 self.__data_counter = None 673 self.__buffered_data = ''
674 675
676 - class CommandProcessor:
677 678 import entropy.tools as entropyTools 679 import socket 680 import gc 681
def __init__(self, HostInterface):
    """Bind the processor to its host interface; no channel is set yet."""
    self.channel = None
    self.HostInterface = HostInterface
685
def handle_termination_commands(self, data):
    """
    Detect termination requests and blank input.

    Returns "close" (after logging and sending the 'cl' answer) when the
    stripped data is a termination command, "ignore" when it is empty,
    and None otherwise.
    """
    stripped = data.strip()
    host = self.HostInterface

    if stripped in host.termination_commands:
        host.updateProgress('close: %s' % (self.client_address,))
        self.transmit(host.answers['cl'])
        return "close"

    if stripped:
        return None
    return "ignore"
694
def handle_command_string(self, string):
    """
    Split a raw command line into (cmd, args, session).

    The first word is taken as the session id unless it is an
    initialization command, a session-less command, or the only word.
    When the session is in stream mode and the command is not a
    configuration command, everything after the command name is returned
    as one single raw argument.

    The session lookup uses the `in` operator, as validate_command()
    does, instead of the deprecated dict.has_key().
    """
    host = self.HostInterface

    # validate command
    args = string.strip().split(" ")
    session = args[0]
    if (session in host.initialization_commands) or \
        (session in host.no_session_commands) or \
        len(args) < 2:
        cmd = args[0]
        session = None
    else:
        cmd = args[1]
        args = args[1:] # remove session

    stream_enabled = False
    if (session is not None) and (session in host.sessions):
        stream_enabled = host.sessions[session].get('stream_mode')

    if stream_enabled and (cmd not in host.config_commands):
        # raw payload: skip "<session> " and "<cmd> " in the original string
        session_len = 0
        if session:
            session_len = len(session) + 1
        return cmd, [string[session_len+len(cmd)+1:]], session

    myargs = []
    if len(args) > 1:
        myargs = args[1:]
    return cmd, myargs, session
722
def handle_end_answer(self, cmd, whoops, valid_cmd):
    """
    Send the closing answer of a command to the client.

    'no' for an invalid command, 'er' when the command failed, 'ok'
    otherwise -- unless the command is listed in no_acked_commands, in
    which case nothing is sent.
    """
    if not valid_cmd:
        answer_key = 'no'
    elif whoops:
        answer_key = 'er'
    elif cmd not in self.HostInterface.no_acked_commands:
        answer_key = 'ok'
    else:
        return
    self.transmit(self.HostInterface.answers[answer_key])
730
def validate_command(self, cmd, args, session):
    """
    Check whether cmd may run in the given session.

    Returns (True, "all good") or (False, reason). When a session is
    given and the command is accepted, the session is marked as running
    and its timestamp is refreshed. Authentication is only checked when
    a session is given.
    """
    host = self.HostInterface

    # unknown commands are refused outright
    if cmd not in host.valid_commands:
        return False, "not a valid command"

    if session is None:
        if cmd not in host.no_session_commands:
            return False, "need a valid session"
        return True, "all good"

    if session not in host.sessions:
        return False, "session is not alive"

    # commands flagged 'auth' need a logged-in session
    if host.valid_commands[cmd]['auth']:
        if host.sessions[session]['auth_uid'] is None:
            return False, "not authenticated"

    # keep session alive
    host.set_session_running(session)
    host.update_session_time(session)
    return True, "all good"
759
def load_authenticator(self):
    """
    Build a new authenticator from HostInterface.AuthenticatorInst, a
    (callable, args, kwargs) triple.
    """
    factory, factory_args, factory_kwargs = self.HostInterface.AuthenticatorInst
    return factory(*factory_args, **factory_kwargs)
764
def load_service_interface(self, session):
    """
    Build the service interface from HostInterface.EntropyInstantiation,
    a (callable, args, kwargs) sequence.

    The uid of the session is looked up but not passed on; a missing
    session still raises KeyError here.
    """
    host = self.HostInterface
    if session is not None:
        # value currently unused, lookup kept for its KeyError
        host.sessions[session]['auth_uid']

    spec = host.EntropyInstantiation
    factory, factory_args, factory_kwargs = spec[0], spec[1], spec[2]
    return factory(*factory_args, **factory_kwargs)
775
776 - def process(self, data, channel, client_address):
777 778 self.channel = channel 779 self.client_address = client_address 780 781 term = self.handle_termination_commands(data) 782 if term: 783 del authenticator 784 return term 785 786 cmd, args, session = self.handle_command_string(data) 787 valid_cmd, reason = self.validate_command(cmd, args, session) 788 789 # decide if we need to load authenticator or Entropy 790 authenticator = None 791 cmd_data = self.HostInterface.valid_commands.get(cmd) 792 if not isinstance(cmd_data,dict): 793 self.HostInterface.updateProgress( 794 '[from: %s] command error: invalid command: %s' % ( 795 self.client_address, 796 cmd, 797 ) 798 ) 799 return "close" 800 elif (("authenticator" in cmd_data['args']) or (cmd in self.HostInterface.login_pass_commands)): 801 try: 802 authenticator = self.load_authenticator() 803 except ConnectionError, e: 804 self.HostInterface.updateProgress( 805 '[from: %s] authenticator error: cannot load: %s' % ( 806 self.client_address, 807 e, 808 ) 809 ) 810 tb = self.entropyTools.get_traceback() 811 print tb 812 self.HostInterface.socketLog.write(tb) 813 return "close" 814 except Exception, e: 815 self.HostInterface.updateProgress( 816 '[from: %s] authenticator error: cannot load: %s - unknown error' % ( 817 self.client_address, 818 e, 819 ) 820 ) 821 tb = self.entropyTools.get_traceback() 822 print tb 823 self.HostInterface.socketLog.write(tb) 824 return "close" 825 826 p_args = args 827 if (cmd in self.HostInterface.login_pass_commands) and authenticator != None: 828 p_args = authenticator.hide_login_data(p_args) 829 elif cmd in self.HostInterface.raw_commands: 830 p_args = ['raw data'] 831 self.HostInterface.updateProgress( 832 '[from: %s] command validation :: called %s: length: %s, args: %s, session: %s, valid: %s, reason: %s' % ( 833 self.client_address, 834 cmd, 835 len(data), 836 p_args, 837 session, 838 valid_cmd, 839 reason, 840 ) 841 ) 842 843 whoops = False 844 if valid_cmd: 845 846 if authenticator != None: 847 # now set session 848 
authenticator.set_session(session) 849 850 Entropy = None 851 if "Entropy" in cmd_data['args']: 852 Entropy = self.load_service_interface(session) 853 try: 854 self.run_task(cmd, args, session, Entropy, authenticator) 855 except self.socket.timeout: 856 self.HostInterface.updateProgress( 857 '[from: %s] command error: timeout, closing connection' % ( 858 self.client_address, 859 ) 860 ) 861 # close connection 862 del authenticator 863 del Entropy 864 return "close" 865 except self.socket.error, e: 866 self.HostInterface.updateProgress( 867 '[from: %s] command error: socket error: %s' % ( 868 self.client_address, 869 e, 870 ) 871 ) 872 # close connection 873 del authenticator 874 del Entropy 875 return "close" 876 except self.HostInterface.SSL_exceptions['SysCallError'], e: 877 self.HostInterface.updateProgress( 878 '[from: %s] command error: SSL SysCallError: %s' % ( 879 self.client_address, 880 e, 881 ) 882 ) 883 # close connection 884 del authenticator 885 del Entropy 886 return "close" 887 except Exception, e: 888 # write to self.HostInterface.socketLog 889 tb = self.entropyTools.get_traceback() 890 print tb 891 self.HostInterface.socketLog.write(tb) 892 # store error 893 self.HostInterface.updateProgress( 894 '[from: %s] command error: %s, type: %s' % ( 895 self.client_address, 896 e, 897 type(e), 898 ) 899 ) 900 if session != None: 901 self.HostInterface.store_rc(str(e),session) 902 whoops = True 903 904 del Entropy 905 906 if session != None: 907 self.HostInterface.update_session_time(session) 908 self.HostInterface.unset_session_running(session) 909 rcmd = None 910 try: 911 self.handle_end_answer(cmd, whoops, valid_cmd) 912 except (self.socket.error, self.socket.timeout,self.HostInterface.SSL_exceptions['SysCallError'],): 913 rcmd = "close" 914 915 if authenticator != None: 916 authenticator.terminate_instance() 917 del authenticator 918 if not self.HostInterface.fork_requests: 919 self.gc.collect() 920 return rcmd
921
def transmit(self, data):
    """Send data to the client over the channel currently being served."""
    host = self.HostInterface
    host.transmit(self.channel, data)
924
def run_task(self, cmd, args, session, Entropy, authenticator):
    """
    Run a validated command and store its result in the session.

    Arguments of non-raw commands are converted through
    _get_args_kwargs(); raw commands get their arguments untouched and
    no keyword arguments. The return value of the command is stored with
    store_rc() when the session is still known, and returned.

    The session lookup uses the `in` operator instead of the deprecated
    dict.has_key().
    """
    host = self.HostInterface

    # never log passwords or raw payloads
    p_args = args
    if cmd in host.login_pass_commands:
        p_args = authenticator.hide_login_data(p_args)
    elif cmd in host.raw_commands:
        p_args = ['raw data']
    host.updateProgress(
        '[from: %s] run_task :: called %s: args: %s, session: %s' % (
            self.client_address,
            cmd,
            p_args,
            session,
        )
    )

    myargs = args
    mykwargs = {}
    if cmd not in host.raw_commands:
        myargs, mykwargs = self._get_args_kwargs(args)

    rc = self.spawn_function(cmd, myargs, mykwargs, session, Entropy, authenticator)
    if (session is not None) and (session in host.sessions):
        host.store_rc(rc, session)
    return rc
950
def _get_args_kwargs(self, args):
    """
    Turn command-line words into positional and keyword arguments.

    "True"/"False" and integer words are converted to bool/int. A word
    of the form key=value becomes a keyword argument when value converts
    that way; every other word is passed through as a positional string.

    Values are converted explicitly instead of through eval(): the words
    come from the client, and eval() did not agree with the int() check
    (for instance "010" evaluates to 8 on Python 2). A word is split on
    the first "=" only, so "=" characters inside the value are no longer
    dropped.

    Returns (myargs, mykwargs).
    """
    literals = {"True": True, "False": False}

    def parse_literal(text):
        # returns (recognized, value)
        if text in literals:
            return True, literals[text]
        try:
            return True, int(text)
        except ValueError:
            return False, None

    myargs = []
    mykwargs = {}
    for arg in args:
        key, separator, value = arg.partition("=")
        if separator and key:
            recognized, parsed = parse_literal(value)
            if recognized:
                mykwargs[key] = parsed
            else:
                myargs.append(arg)
            continue

        recognized, parsed = parse_literal(arg)
        if recognized:
            myargs.append(parsed)
        else:
            myargs.append(arg)
    return myargs, mykwargs
977
def spawn_function(self, cmd, myargs, mykwargs, session, Entropy, authenticator):
    """
    Log the call about to be made (login data and raw payloads hidden)
    and delegate to do_spawn(), returning its result.
    """
    host = self.HostInterface

    if cmd in host.login_pass_commands:
        printable_args = authenticator.hide_login_data(myargs)
    elif cmd in host.raw_commands:
        printable_args = ['raw data']
    else:
        printable_args = myargs

    message = '[from: %s] called %s: args: %s, kwargs: %s' % (
        self.client_address,
        cmd,
        printable_args,
        mykwargs,
    )
    host.updateProgress(message)
    return self.do_spawn(cmd, myargs, mykwargs, session, Entropy, authenticator)
994
def do_spawn(self, cmd, myargs, mykwargs, session, Entropy, authenticator):
    """
    Call the callback registered for cmd in HostInterface.valid_commands.

    Each entry of the command's 'args' list is a string evaluated in the
    scope of this function, so it can name self attributes, the
    parameters above or the local variables below; renaming any of them
    changes what those strings resolve to. Entries that fail to evaluate
    with NameError or SyntaxError are passed as plain strings.

    Commands flagged 'as_user' are run through fork_task() with the
    client arguments appended; the others are called directly with the
    evaluated arguments only.

    NOTE(review): eval() is applied to entries of the valid_commands
    table, presumably filled by server-side code only -- confirm that no
    client-controlled text can end up there.
    """

    cmd_data = self.HostInterface.valid_commands.get(cmd)
    do_fork = cmd_data['as_user']
    f = cmd_data['cb']
    func_args = []
    for arg in cmd_data['args']:
        try:
            func_args.append(eval(arg))
        except (NameError, SyntaxError):
            func_args.append(str(arg))

    if do_fork:
        # work on a copy, then append what the client sent
        myfargs = func_args[:]
        myfargs.extend(myargs)
        return self.fork_task(f, session, authenticator, *myfargs, **mykwargs)
    else:
        return f(*func_args)
1013
def fork_task(self, f, session, authenticator, *args, **kwargs):
    """
    Run f through entropyTools.spawn_function() and _do_fork().

    When the session has an authenticated uid, that uid and the Entropy
    group id are handed over; otherwise both are None.
    """
    uid = gid = None
    if session is not None:
        auth_uid = self.HostInterface.sessions[session]['auth_uid']
        if auth_uid is not None:
            uid, gid = auth_uid, etpConst['entropygid']
    return self.entropyTools.spawn_function(self._do_fork, f, authenticator, uid, gid, *args, **kwargs)
1023
def _do_fork(self, f, authenticator, uid, gid, *args, **kwargs):
    """
    Switch to the given uid/gid through the authenticator, then call f
    with the remaining arguments and return its result.
    """
    authenticator.set_exc_permissions(uid, gid)
    return f(*args, **kwargs)
1028
1029 - class BuiltInCommands(SocketCommands):
1030 1031 import entropy.dump as dumpTools 1032 import zlib 1033
def __init__(self, HostInterface):
    """
    Set up the built-in command set.

    Initializes the SocketCommands base with the instance name "builtin",
    then fills self.valid_commands and the per-category command name
    lists.

    HostInterface -- host object handed over to SocketCommands.__init__
    """
    SocketCommands.__init__(self, HostInterface, inst_name = "builtin")

    def describe(callback, call_args, description, usage, needs_auth = False):
        # all built-in entries share this layout, only the given fields vary
        return {
            'auth': needs_auth, # does it need authentication ?
            'built_in': True, # is it built-in ?
            'cb': callback, # function to call
            # arguments to be passed before *args and **kwards,
            # in SocketHostInterface.do_spawn()
            'args': call_args,
            # do I have to fork the process and run it as logged user?
            # needs auth = True
            'as_user': False,
            'desc': description, # description
            'syntax': usage, # syntax
            'from': unicode(self), # from what class
        }

    self.valid_commands = {
        'begin': describe(
            self.docmd_begin,
            ["self.transmit", "self.client_address"],
            "instantiate a session",
            "begin",
        ),
        'end': describe(
            self.docmd_end,
            ["self.transmit", "session"],
            "end a session",
            "<SESSION_ID> end",
        ),
        'session_config': describe(
            self.docmd_session_config,
            ["session","myargs"],
            "set session configuration options",
            "<SESSION_ID> session_config <option> [parameters]",
        ),
        'rc': describe(
            self.docmd_rc,
            ["self.transmit","session"],
            "get data returned by the last valid command (streamed python object)",
            "<SESSION_ID> rc",
        ),
        'hello': describe(
            self.docmd_hello,
            ["self.transmit"],
            "get server status",
            "hello",
        ),
        'alive': describe(
            self.docmd_alive,
            ["self.transmit","self.client_address","myargs"],
            "check if a session is still alive",
            "alive <SESSION_ID>",
            needs_auth = True,
        ),
        'login': describe(
            self.docmd_login,
            ["self.transmit", "authenticator", "session", "self.client_address", "myargs"],
            "login on the running server (allows running extra commands)",
            "<SESSION_ID> login <authenticator parameters, default: <user> <auth_type> <password> >",
        ),
        'user_data': describe(
            self.docmd_userdata,
            ["self.transmit", "authenticator", "session"],
            "get general user information, user must be logged in",
            "<SESSION_ID> user_data",
            needs_auth = True,
        ),
        'logout': describe(
            self.docmd_logout,
            ["self.transmit", "authenticator", "session", "self.client_address", "myargs"],
            "logout on the running server",
            "<SESSION_ID> logout <USER>",
            needs_auth = True,
        ),
        'help': describe(
            self.docmd_help,
            ["self.transmit"],
            "this output",
            "help",
        ),
        'available_commands': describe(
            self.docmd_available_commands,
            ["self.HostInterface"],
            "get info about available commands (you must retrieve this using the 'rc' command)",
            "available_commands",
        ),
        'stream': describe(
            self.docmd_stream,
            ["session", "myargs"],
            "send a chunk of data to be saved on the session temp file path (will be removed on session expiration)",
            "<SESSION_ID> stream <chunk of byte-string to write to file>",
            needs_auth = True,
        ),
    }

    # command name lists, one per category
    self.no_acked_commands = ["rc", "begin", "end", "hello", "alive", "login", "logout","help"]
    self.termination_commands = ["quit","close"]
    self.initialization_commands = ["begin"]
    self.login_pass_commands = ["login"]
    self.no_session_commands = ["begin","hello","alive","help"]
    self.raw_commands = ["stream"]
    self.config_commands = ["session_config"]
1169
def docmd_session_config(self, session, myargs):
    """
    Change a configuration option of the given session.

    myargs[0] selects the option, the remaining items are its parameters:
      - "compression": the first parameter, when it parses as a false
        Python literal, disables compression; otherwise compression is
        "zlib" if "zlib" is among the parameters, "gzip" if not.
      - "stream": stream mode is switched on unless "off" is among the
        parameters.

    Returns a (status, message) tuple.
    """
    if not myargs:
        return False,"not enough parameters"

    option = myargs[0]
    myopts = myargs[1:]

    if option == "compression":
        docomp = True
        if myopts:
            if isinstance(myopts[0],bool):
                docomp = myopts[0]
            else:
                # The parameter comes from the remote client: parse it as
                # a plain literal only, never eval() it. Anything that is
                # not a literal (e.g. the bare word "zlib") keeps the
                # default. Requires Python >= 2.6.
                from ast import literal_eval
                try:
                    docomp = literal_eval(myopts[0])
                except (ValueError, SyntaxError, TypeError,):
                    pass
        if not docomp:
            docomp = None
        elif "zlib" in myopts:
            docomp = "zlib"
        else:
            docomp = "gzip"
        self.HostInterface.sessions[session]['compression'] = docomp
        return True,"compression now: %s" % (docomp,)

    if option == "stream":
        dostream = "off" not in myopts
        self.HostInterface.sessions[session]['stream_mode'] = dostream
        return True,'stream mode: %s' % (dostream,)

    return False,"invalid config option"
1207
def docmd_available_commands(self, host_interface):
    """
    Build a description of the commands known to host_interface.

    Returns a dict with two keys:
      - 'disabled_commands': shallow copy of host_interface.disabled_commands
      - 'valid_commands': for every command, only those metadata items
        whose value is a bool, string, number, list, tuple, set or dict
        (callables such as the 'cb' entry are therefore left out)
    """

    def shallow_clone(value):
        # containers are copied one level deep, anything else is shared
        if isinstance(value, (set, dict)):
            return value.copy()
        if isinstance(value, (list, tuple)):
            return value[:]
        return value

    def is_streamable(value):
        return isinstance(value, (bool,basestring,int,float,list,tuple,set,dict,))

    result = {}
    result['disabled_commands'] = shallow_clone(host_interface.disabled_commands)
    commands = shallow_clone(host_interface.valid_commands)
    result['valid_commands'] = {}
    for name in commands:
        metadata = commands[name]
        result['valid_commands'][name] = dict(
            (key, metadata[key]) for key in metadata
            if is_streamable(metadata[key])
        )

    return result
def docmd_stream(self, session, myargs):
    """
    Append a chunk of data to the stream file of the given session.

    The chunk is myargs[0]; when the session has compression enabled it
    is decompressed with self.zlib before being written. The directory of
    the stream file is created on demand.

    Returns a (status, message) tuple.
    """
    session_data = self.HostInterface.sessions[session]
    if not session_data['stream_mode']:
        return False,'not in stream mode'
    if not myargs:
        return False,'no stream sent'

    compression = session_data['compression']

    stream = myargs[0]
    stream_path = session_data['stream_path']
    stream_dir = os.path.dirname(stream_path)
    if not os.path.isdir(stream_dir):
        try:
            os.makedirs(stream_dir)
            if etpConst['entropygid'] is not None:
                const_setup_perms(stream_dir,etpConst['entropygid'])
        except OSError:
            return False,'cannot initialize stream directory'

    # decompress before touching the file, so that a corrupted chunk
    # neither creates an empty file nor leaves a handle open
    if compression:
        stream = self.zlib.decompress(stream)

    # 'ab': binary append ('abw' is not a valid mode string)
    with open(stream_path,'ab') as stream_f:
        stream_f.write(stream)

    return True,'ok'
1265
def docmd_login(self, transmitter, authenticator, session, client_address, myargs):
    """
    Authenticate the given session through the authenticator.

    The outcome is logged via HostInterface.updateProgress and the
    matching 'ok' / 'no' answer is sent with transmitter. On success the
    uid returned by the authenticator is stored as the session 'auth_uid'.

    Returns a (status, reason) tuple.
    """
    host = self.HostInterface

    # is already auth'd?
    if host.sessions[session]['auth_uid'] != None:
        return False,"already authenticated"

    status, user, uid, reason = authenticator.docmd_login(myargs)

    if status:
        host.updateProgress(
            '[from: %s] user %s logged in successfully, session: %s' % (
                client_address, user, session,
            )
        )
        host.sessions[session]['auth_uid'] = uid
        transmitter(host.answers['ok'])
        return True,reason

    # failure: the log line depends on whether a user name was given
    if user == None:
        failure = '[from: %s] user -not specified- login failed, session: %s, reason: %s' % (
            client_address, session, reason,
        )
    else:
        failure = '[from: %s] user %s login failed, session: %s, reason: %s' % (
            client_address, user, session, reason,
        )
    host.updateProgress(failure)
    transmitter(host.answers['no'])
    return False,reason
1306
def docmd_userdata(self, transmitter, authenticator, session):
    """
    Return the authenticator's user data for an authenticated session.

    When the session has no 'auth_uid' set, (False, None,
    "not authenticated") is returned instead. transmitter is not used.
    """
    session_data = self.HostInterface.sessions[session]
    if session_data['auth_uid'] == None:
        return False,None,"not authenticated"

    return authenticator.docmd_userdata()
1314
def docmd_logout(self, transmitter, authenticator, session, client_address, myargs):
    """
    Log the session out through the authenticator.

    The outcome is logged via HostInterface.updateProgress and the
    matching 'ok' / 'no' answer is sent with transmitter. On success the
    session 'auth_uid' is reset to None.

    Returns a (status, reason) tuple.
    """
    host = self.HostInterface
    status, user, reason = authenticator.docmd_logout(myargs)

    if status:
        host.updateProgress(
            '[from: %s] user %s logged out successfully, session: %s, args: %s ' % (
                client_address, user, session, myargs,
            )
        )
        host.sessions[session]['auth_uid'] = None
        transmitter(host.answers['ok'])
        return True,reason

    # failure: the log line depends on whether a user name was given
    if user == None:
        failure = '[from: %s] user -not specified- logout failed, session: %s, args: %s, reason: %s' % (
            client_address, session, myargs, reason,
        )
    else:
        failure = '[from: %s] user %s logout failed, session: %s, args: %s, reason: %s' % (
            client_address, user, session, myargs, reason,
        )
    host.updateProgress(failure)
    transmitter(host.answers['no'])
    return False,reason
1352
def docmd_alive(self, transmitter, client_address, myargs):
    """
    Tell whether the session named by myargs[0] exists and was opened
    from the IP address of the calling client.

    Sends the 'ok' or 'no' answer with transmitter and returns the same
    information as a boolean.
    """
    answers = self.HostInterface.answers
    reply = answers['no']
    alive = False

    if myargs:
        session_data = self.HostInterface.sessions.get(myargs[0])
        if session_data != None and \
            client_address[0] == session_data.get('ip_address'):
            reply = answers['ok']
            alive = True

    transmitter(reply)
    return alive
1364
def docmd_hello(self, transmitter):
    """
    Send a one-line server status banner with transmitter.

    The banner carries the Entropy version, the current connection count,
    the configured system name, the os.uname() host / arch / kernel
    fields and the first output line of the `uptime` command.
    """
    from entropy.tools import getstatusoutput
    from entropy.core import SystemSettings

    settings = SystemSettings()
    uname_info = os.uname()
    kernel, host_name, arch = uname_info[2], uname_info[1], uname_info[4]
    uptime_output = getstatusoutput('uptime')[1]
    load_stats = uptime_output.split("\n")[0]

    banner = "Entropy Server %s, connections: %s ~ running on: %s ~ host: %s ~ arch: %s, kernel: %s, stats: %s\n"
    transmitter(banner % (
        etpConst['entropyversion'],
        self.HostInterface.connections,
        settings['system']['name'],
        host_name,
        arch,
        kernel,
        load_stats
    ))
1384
def docmd_help(self, transmitter):
    """
    Send the help menu with transmitter.

    The menu lists every command of HostInterface.valid_commands in
    sorted order, together with its origin, description and syntax;
    placeholders are used for metadata a command does not provide.
    """
    commands = self.HostInterface.valid_commands
    text = '\nEntropy Socket Interface Help Menu\n' + \
        'Available Commands:\n\n'

    for cmd in sorted(commands.keys()):
        metadata = self.HostInterface.valid_commands[cmd]
        desc = metadata.get('desc', 'no description available')
        syntax = metadata.get('syntax', 'no syntax available')
        myfrom = metadata.get('from', 'N/A')
        entry = "[%s] %s\n %s: %s\n %s: %s\n" % (
            myfrom,
            blue(cmd),
            red("description"),
            desc.strip(),
            darkgreen("syntax"),
            syntax,
        )
        text += entry

    transmitter(text)
1412
def docmd_end(self, transmitter, session):
    """
    Destroy the given session.

    Sends the 'ok' answer with transmitter when
    HostInterface.destroy_session() returned a true value, 'no'
    otherwise, and returns that value.
    """
    host = self.HostInterface
    destroyed = host.destroy_session(session)
    reply = host.answers['no']
    if destroyed:
        reply = host.answers['ok']
    transmitter(reply)
    return destroyed
1419
def docmd_begin(self, transmitter, client_address):
    """
    Open a new session for the calling client.

    The session id obtained from HostInterface.get_new_session() (called
    with the client IP, client_address[0]) is sent with transmitter and
    returned.
    """
    client_ip = client_address[0]
    session_id = self.HostInterface.get_new_session(client_ip)
    transmitter(session_id)
    return session_id
1424
def docmd_rc(self, transmitter, session):
    """
    Send the stored return value of the session in serialized form.

    The object is serialized with self.dumpTools and, depending on the
    session 'compression' setting, compressed with zlib (level 7) or
    gzip before being handed to transmitter.

    Returns the (unserialized) stored object.
    """

    def gzip_serialize(obj):
        # old gzip path: serialize to a memory file, then gzip it in
        # 8192 bytes chunks into a second memory file
        import gzip
        try:
            import cStringIO as stringio
        except ImportError:
            import StringIO as stringio
        plain_f = stringio.StringIO()
        self.dumpTools.serialize(obj, plain_f)
        packed_f = stringio.StringIO()
        gz_f = gzip.GzipFile(
            mode = 'wb',
            fileobj = packed_f
        )
        plain_f.seek(0)
        while True:
            chunk = plain_f.read(8192)
            if not chunk:
                break
            gz_f.write(chunk)
        gz_f.flush()
        gz_f.close()
        packed = packed_f.getvalue()
        plain_f.close()
        packed_f.close()
        return packed

    rc = self.HostInterface.get_rc(session)
    comp = self.HostInterface.sessions[session]['compression']
    payload = self.dumpTools.serialize_string(rc)
    if comp == "zlib": # new shiny zlib
        payload = self.zlib.compress(payload, 7) # compression level 1-9
    elif comp == "gzip": # old and buried
        payload = gzip_serialize(rc)

    transmitter(payload)

    return rc
1459
def __init__(self, service_interface, *args, **kwds):
    """
    Initialize the socket host.

    Keeps references to the helper modules and classes used later on,
    opens the socket log file, takes a private deep copy of the
    'socket_service' system settings, initializes connection / session /
    command state and finally runs the setup steps (external commands,
    output interface, authenticator, hostname, commands, garbage
    collectors, SSL).

    service_interface -- stored, together with args and kwds, in
        self.EntropyInstantiation
    args, kwds -- stored as self.args / self.kwds; the setup steps pop
        the keywords they handle from self.kwds
    """
    import copy
    import gc
    import threading
    import entropy.tools as entropyTools
    from entropy.core import SystemSettings
    from entropy.misc import LogFile, TimeScheduled

    # helper modules and classes reachable through the instance
    self.gc = gc
    self.threading = threading
    self.TimeScheduled = TimeScheduled
    self.entropyTools = entropyTools

    # filled in later by go() and the setup / start methods
    self.Server = None
    self.Gc = None
    self.PythonGarbageCollector = None
    self.AuthenticatorInst = None

    self.args = args
    self.kwds = kwds
    self.socketLog = LogFile(
        level = etpConst['socketloglevel'],
        filename = etpConst['socketlogfile'],
        header = "[Socket]"
    )

    # settings
    # SystemSettings is a singleton, and we just need to read
    # socket configuration. we don't want to mess other instances
    # so we pay attention to not use it more than what is needed.
    self.__socket_settings = copy.deepcopy(SystemSettings()['socket_service'])
    conf = self.__socket_settings

    self.SessionsLock = self.threading.Lock()
    self.fork_requests = True # used by the command processor
    self.fork_request_timeout_seconds = conf['forked_requests_timeout']
    self.stdout_logging = True
    self.timeout = conf['timeout']
    self.hostname = conf['hostname']
    self.session_ttl = conf['session_ttl']
    if self.hostname == "*":
        self.hostname = ''
    self.port = conf['port']
    self.threads = conf['threads'] # maximum number of allowed sessions
    self.max_connections = conf['max_connections']
    self.max_connections_per_host = conf['max_connections_per_host']
    self.max_connections_per_host_barrier = conf['max_connections_per_host_barrier']
    self.max_command_length = conf['max_command_length']
    self.disabled_commands = conf['disabled_cmds']
    self.ip_blacklist = conf['ip_blacklist']
    self.answers = conf['answers']

    # runtime state
    self.connections = 0
    self.per_host_connections = {}
    self.sessions = {}
    self.__output = None

    # SSL data, populated by setup_ssl() when SSL is requested
    self.SSL = {}
    self.SSL_exceptions = {
        'WantReadError': None,
        'WantWriteError': None,
        'WantX509LookupError': None,
        'ZeroReturnError': None,
        'SysCallError': None,
        'Error': [],
    }
    self.last_print = ''

    # command registry, populated by setup_commands()
    self.valid_commands = {}
    self.no_acked_commands = []
    self.raw_commands = []
    self.config_commands = []
    self.termination_commands = []
    self.initialization_commands = []
    self.login_pass_commands = []
    self.no_session_commands = []
    self.command_classes = [self.BuiltInCommands]
    self.command_instances = []
    self.EntropyInstantiation = (service_interface, self.args, self.kwds)

    self.setup_external_command_classes()
    self.start_local_output_interface()
    self.setup_authenticator()
    self.setup_hostname()
    self.setup_commands()
    self.disable_commands()
    self.start_session_garbage_collector()
    self.setup_ssl()
    self.start_python_garbage_collector()
1546
def killall(self):
    """
    Shut the service helpers down: close the socket log (when present),
    flag the server as not alive and kill both scheduled garbage
    collectors. Parts that were never started (None) are skipped.
    """
    if hasattr(self,'socketLog'):
        self.socketLog.close()

    if self.Server != None:
        self.Server.alive = False

    for timer_name in ('Gc', 'PythonGarbageCollector'):
        timer = getattr(self, timer_name)
        if timer != None:
            timer.kill()
1556
def append_eos(self, data):
    """
    Frame data for transmission: the result is the length of data,
    followed by the configured 'eos' answer, followed by data itself.
    """
    header = str(len(data)) + self.answers['eos']
    return header + data
1561
1562 - def setup_ssl(self):
1563 1564 do_ssl = False 1565 if self.kwds.has_key('ssl'): 1566 do_ssl = self.kwds.pop('ssl') 1567 1568 if not do_ssl: 1569 return 1570 1571 try: 1572 from OpenSSL import SSL, crypto 1573 except ImportError, e: 1574 self.updateProgress('Unable to load OpenSSL, error: %s' % (repr(e),)) 1575 return 1576 self.SSL_exceptions['WantReadError'] = SSL.WantReadError 1577 self.SSL_exceptions['Error'] = SSL.Error 1578 self.SSL_exceptions['WantWriteError'] = SSL.WantWriteError 1579 self.SSL_exceptions['WantX509LookupError'] = SSL.WantX509LookupError 1580 self.SSL_exceptions['ZeroReturnError'] = SSL.ZeroReturnError 1581 self.SSL_exceptions['SysCallError'] = SSL.SysCallError 1582 self.SSL['m'] = SSL 1583 self.SSL['crypto'] = crypto 1584 self.SSL['key'] = self.__socket_settings['ssl_key'] 1585 self.SSL['cert'] = self.__socket_settings['ssl_cert'] 1586 self.SSL['ca_cert'] = self.__socket_settings['ssl_ca_cert'] 1587 self.SSL['ca_pkey'] = self.__socket_settings['ssl_ca_pkey'] 1588 # change port 1589 self.port = self.__socket_settings['ssl_port'] 1590 self.SSL['not_before'] = 0 1591 self.SSL['not_after'] = 60*60*24*365*5 # 5 years 1592 self.SSL['serial'] = 0 1593 self.SSL['digest'] = 'md5' 1594 1595 if not (os.path.isfile(self.SSL['ca_cert']) and \ 1596 os.path.isfile(self.SSL['ca_pkey']) and \ 1597 os.path.isfile(self.SSL['key']) and \ 1598 os.path.isfile(self.SSL['cert'])): 1599 self.create_ca_server_certs( 1600 self.SSL['serial'], 1601 self.SSL['digest'], 1602 self.SSL['not_before'], 1603 self.SSL['not_after'], 1604 self.SSL['ca_pkey'], 1605 self.SSL['ca_cert'], 1606 self.SSL['key'], 1607 self.SSL['cert'] 1608 ) 1609 os.chmod(self.SSL['ca_cert'],0644) 1610 try: 1611 os.chown(self.SSL['ca_cert'],-1,0) 1612 except OSError: 1613 pass 1614 os.chmod(self.SSL['ca_pkey'],0600) 1615 try: 1616 os.chown(self.SSL['ca_pkey'],-1,0) 1617 except OSError: 1618 pass 1619 1620 os.chmod(self.SSL['key'],0600) 1621 try: 1622 os.chown(self.SSL['key'],-1,0) 1623 except OSError: 1624 pass 1625 
os.chmod(self.SSL['cert'],0644) 1626 try: 1627 os.chown(self.SSL['cert'],-1,0) 1628 except OSError: 1629 pass
1630
1631 - def create_ca_server_certs(self, serial, digest, not_before, not_after, ca_pkey_dest, ca_cert_dest, server_key, server_cert):
1632 1633 mycn = 'Entropy Repository Service' 1634 cakey = self.create_ssl_key_pair(self.SSL['crypto'].TYPE_RSA, 1024) 1635 careq = self.create_ssl_certificate_request(cakey, digest, CN = mycn) 1636 cert = self.SSL['crypto'].X509() 1637 cert.set_serial_number(serial) 1638 cert.gmtime_adj_notBefore(not_before) 1639 cert.gmtime_adj_notAfter(not_after) 1640 cert.set_issuer(careq.get_subject()) 1641 cert.set_subject(careq.get_subject()) 1642 cert.sign(cakey, digest) 1643 1644 # now create server key + cert 1645 s_pkey = self.create_ssl_key_pair(self.SSL['crypto'].TYPE_RSA, 1024) 1646 s_req = self.create_ssl_certificate_request(s_pkey, digest, CN = mycn) 1647 s_cert = self.SSL['crypto'].X509() 1648 s_cert.set_serial_number(serial+1) 1649 s_cert.gmtime_adj_notBefore(not_before) 1650 s_cert.gmtime_adj_notAfter(not_after) 1651 s_cert.set_issuer(cert.get_subject()) 1652 s_cert.set_subject(s_req.get_subject()) 1653 s_cert.set_pubkey(s_req.get_pubkey()) 1654 s_cert.sign(cakey, digest) 1655 1656 # write CA 1657 if os.path.isfile(ca_pkey_dest): 1658 shutil.move(ca_pkey_dest,ca_pkey_dest+".moved") 1659 f = open(ca_pkey_dest,"w") 1660 f.write(self.SSL['crypto'].dump_privatekey(self.SSL['crypto'].FILETYPE_PEM, cakey)) 1661 f.flush() 1662 f.close() 1663 if os.path.isfile(ca_cert_dest): 1664 shutil.move(ca_cert_dest,ca_cert_dest+".moved") 1665 f = open(ca_cert_dest,"w") 1666 f.write(self.SSL['crypto'].dump_certificate(self.SSL['crypto'].FILETYPE_PEM, cert)) 1667 f.flush() 1668 f.close() 1669 1670 if os.path.isfile(server_key): 1671 shutil.move(server_key,server_key+".moved") 1672 # write server 1673 f = open(server_key,"w") 1674 f.write(self.SSL['crypto'].dump_privatekey(self.SSL['crypto'].FILETYPE_PEM, s_pkey)) 1675 f.flush() 1676 f.close() 1677 if os.path.isfile(server_cert): 1678 shutil.move(server_cert,server_cert+".moved") 1679 f = open(server_cert,"w") 1680 f.write(self.SSL['crypto'].dump_certificate(self.SSL['crypto'].FILETYPE_PEM, s_cert)) 1681 f.flush() 1682 f.close()
1683
def create_ssl_key_pair(self, keytype, bits):
    """
    Generate and return a new key pair object of the given type and
    size, using the crypto module stored in self.SSL.
    """
    key_pair = self.SSL['crypto'].PKey()
    key_pair.generate_key(keytype, bits)
    return key_pair
1688
def create_ssl_certificate_request(self, pkey, digest, **name):
    """
    Build and return a certificate request signed with pkey.

    Every keyword argument is set as an attribute on the request
    subject (e.g. CN = "...").
    """
    request = self.SSL['crypto'].X509Req()
    subject = request.get_subject()
    for field in name:
        setattr(subject, field, name[field])
    request.set_pubkey(pkey)
    request.sign(pkey, digest)
    return request
1697
1698 - def setup_external_command_classes(self):
1699 1700 if self.kwds.has_key('external_cmd_classes'): 1701 ext_commands = self.kwds.pop('external_cmd_classes') 1702 if not isinstance(ext_commands,list): 1703 raise InvalidDataType("InvalidDataType: external_cmd_classes must be a list") 1704 self.command_classes += ext_commands
1705
1706 - def setup_commands(self):
1707 1708 identifiers = set() 1709 for myclass in self.command_classes: 1710 1711 myargs = [] 1712 mykwargs = {} 1713 if isinstance(myclass,tuple) or isinstance(myclass,list): 1714 if len(myclass) > 2: 1715 mykwargs = myclass[2] 1716 if len(myclass) > 1: 1717 myargs = myclass[1] 1718 myclass = myclass[0] 1719 1720 myinst = myclass(self, *myargs, **mykwargs) 1721 if str(myinst) in identifiers: 1722 raise PermissionDenied("PermissionDenied: another command instance is owning this name") 1723 identifiers.add(str(myinst)) 1724 self.command_instances.append(myinst) 1725 # now register 1726 myinst.register( self.valid_commands, 1727 self.no_acked_commands, 1728 self.termination_commands, 1729 self.initialization_commands, 1730 self.login_pass_commands, 1731 self.no_session_commands, 1732 self.raw_commands, 1733 self.config_commands 1734 )
1735
1736 - def disable_commands(self):
1737 for cmd in self.disabled_commands: 1738 1739 if cmd in self.valid_commands: 1740 self.valid_commands.pop(cmd) 1741 1742 if cmd in self.no_acked_commands: 1743 self.no_acked_commands.remove(cmd) 1744 1745 if cmd in self.termination_commands: 1746 self.termination_commands.remove(cmd) 1747 1748 if cmd in self.initialization_commands: 1749 self.initialization_commands.remove(cmd) 1750 1751 if cmd in self.login_pass_commands: 1752 self.login_pass_commands.remove(cmd) 1753 1754 if cmd in self.no_session_commands: 1755 self.no_session_commands.remove(cmd) 1756 1757 if cmd in self.raw_commands: 1758 self.raw_commands.remove(cmd) 1759 1760 if cmd in self.config_commands: 1761 self.config_commands.remove(cmd)
1762
1763 - def start_local_output_interface(self):
1764 if self.kwds.has_key('sock_output'): 1765 outputIntf = self.kwds.pop('sock_output') 1766 self.__output = outputIntf
1767
1768 - def setup_authenticator(self):
1769 1770 # lock, if perhaps some implementations need it 1771 self.AuthenticatorLock = self.threading.Lock() 1772 auth_inst = (self.BasicPamAuthenticator, [self], {},) # authentication class, args, keywords 1773 # external authenticator 1774 if self.kwds.has_key('sock_auth'): 1775 authIntf = self.kwds.pop('sock_auth') 1776 if type(authIntf) is tuple: 1777 if len(authIntf) == 3: 1778 auth_inst = authIntf[:] 1779 else: 1780 raise IncorrectParameter("IncorrectParameter: wront authentication interface specified") 1781 else: 1782 raise IncorrectParameter("IncorrectParameter: wront authentication interface specified") 1783 # initialize authenticator 1784 self.AuthenticatorInst = (auth_inst[0],[self]+auth_inst[1],auth_inst[2],)
1785
1786 - def start_python_garbage_collector(self):
1787 self.PythonGarbageCollector = self.TimeScheduled(3600, self.python_garbage_collect) 1788 self.PythonGarbageCollector.set_accuracy(False) 1789 self.PythonGarbageCollector.start()
1790
1791 - def start_session_garbage_collector(self):
1792 self.Gc = self.TimeScheduled(5, self.gc_clean) 1793 self.Gc.start()
1794
1795 - def python_garbage_collect(self):
1796 self.gc.collect() 1797 self.gc.collect() 1798 self.gc.collect()
1799
1800 - def gc_clean(self):
1801 if not self.sessions: 1802 return 1803 1804 with self.SessionsLock: 1805 for session_id in self.sessions.keys(): 1806 sess_time = self.sessions[session_id]['t'] 1807 is_running = self.sessions[session_id]['running'] 1808 auth_uid = self.sessions[session_id]['auth_uid'] # is kept alive? 1809 if (is_running) or (auth_uid == -1): 1810 if auth_uid == -1: 1811 self.updateProgress('not killing session %s, since it is kept alive by auth_uid=-1' % (session_id,) ) 1812 continue 1813 cur_time = time.time() 1814 ttl = self.session_ttl 1815 check_time = sess_time + ttl 1816 if cur_time > check_time: 1817 self.updateProgress('killing session %s, ttl: %ss: no activity' % (session_id,ttl,) ) 1818 self._destroy_session(session_id)
1819
1820 - def setup_hostname(self):
1821 if self.hostname: 1822 try: 1823 self.hostname = self.get_ip_address(self.hostname) 1824 except IOError: # it isn't a device name 1825 pass
1826
1827 - def get_ip_address(self, ifname):
1828 import fcntl 1829 import struct 1830 mysock = self.socket.socket ( self.socket.AF_INET, self.socket.SOCK_STREAM ) 1831 return self.socket.inet_ntoa(fcntl.ioctl(mysock.fileno(), 0x8915, struct.pack('256s', ifname[:15]))[20:24])
1832
1833 - def get_md5_hash(self, salt):
1834 import hashlib 1835 m = hashlib.md5() 1836 m.update(os.urandom(2)) 1837 m.update(salt) 1838 return m.hexdigest()
1839
1840 - def get_new_session(self, ip_address):
1841 with self.SessionsLock: 1842 if len(self.sessions) > self.threads: 1843 # fuck! 1844 return "0" 1845 rng = self.get_md5_hash(str(ip_address)) 1846 while rng in self.sessions: 1847 rng = self.get_md5_hash(str(ip_address)) 1848 self.sessions[rng] = {} 1849 self.sessions[rng]['running'] = False 1850 self.sessions[rng]['auth_uid'] = None 1851 self.sessions[rng]['admin'] = False 1852 self.sessions[rng]['moderator'] = False 1853 self.sessions[rng]['user'] = False 1854 self.sessions[rng]['developer'] = False 1855 self.sessions[rng]['compression'] = None 1856 self.sessions[rng]['stream_mode'] = False 1857 try: 1858 self.sessions[rng]['stream_path'] = self.entropyTools.get_random_temp_file() 1859 except (IOError,OSError,): 1860 self.sessions[rng]['stream_path'] = '' 1861 self.sessions[rng]['t'] = time.time() 1862 self.sessions[rng]['ip_address'] = ip_address 1863 return rng
1864
1865 - def update_session_time(self, session):
1866 with self.SessionsLock: 1867 if self.sessions.has_key(session): 1868 self.sessions[session]['t'] = time.time() 1869 self.updateProgress('session time updated for %s' % (session,) )
1870
1871 - def set_session_running(self, session):
1872 with self.SessionsLock: 1873 if self.sessions.has_key(session): 1874 self.sessions[session]['running'] = True
1875
1876 - def unset_session_running(self, session):
1877 with self.SessionsLock: 1878 if self.sessions.has_key(session): 1879 self.sessions[session]['running'] = False
1880
1881 - def destroy_session(self, session):
1882 with self.SessionsLock: 1883 self._destroy_session(session)
1884
1885 - def _destroy_session(self, session):
1886 if self.sessions.has_key(session): 1887 stream_path = self.sessions[session]['stream_path'] 1888 del self.sessions[session] 1889 if os.path.isfile(stream_path) and os.access(stream_path,os.W_OK) and not os.path.islink(stream_path): 1890 try: os.remove(stream_path) 1891 except OSError: pass 1892 return True 1893 return False
1894
1895 - def go(self):
1896 self.socket.setdefaulttimeout(self.timeout) 1897 while 1: 1898 try: 1899 self.Server = self.HostServerMixin( 1900 (self.hostname, self.port), 1901 self.RequestHandler, 1902 self.CommandProcessor(self), 1903 self 1904 ) 1905 break 1906 except self.socket.error, e: 1907 if e[0] == 98: 1908 # Address already in use 1909 self.updateProgress('address already in use (%s, port: %s), waiting 5 seconds...' % (self.hostname,self.port,)) 1910 time.sleep(5) 1911 continue 1912 else: 1913 raise 1914 self.updateProgress('server connected, listening on: %s, port: %s, timeout: %s' % (self.hostname,self.port,self.timeout,)) 1915 self.Server.serve_forever() 1916 self.Gc.kill()
1917
1918 - def store_rc(self, rc, session):
1919 with self.SessionsLock: 1920 if session in self.sessions: 1921 if type(rc) in (list,tuple,): 1922 rc_item = rc[:] 1923 elif type(rc) in (set,frozenset,dict,): 1924 rc_item = rc.copy() 1925 else: 1926 rc_item = rc 1927 self.sessions[session]['rc'] = rc_item
1928
1929 - def get_rc(self, session):
1930 with self.SessionsLock: 1931 if session in self.sessions: 1932 return self.sessions[session].get('rc')
1933
1934 - def _ssl_poll(self, sock_obj, filter_type, caller_name):
1935 poller = select.poll() 1936 poller.register(sock_obj, filter_type) 1937 res = poller.poll(sock_obj.gettimeout() * 1000) 1938 if len(res) != 1: 1939 raise TimeoutError("Connection timed out on %s" % caller_name)
1940
1941 - def transmit(self, channel, data):
1942 if self.SSL: 1943 mydata = self.append_eos(data) 1944 encode_done = False 1945 while 1: 1946 try: 1947 sent = channel.send(mydata) 1948 if sent == len(mydata): 1949 break 1950 mydata = mydata[sent:] 1951 except self.SSL_exceptions['WantWriteError']: 1952 self._ssl_poll(channel, select.POLLOUT, 'write') 1953 except self.SSL_exceptions['WantReadError']: 1954 self._ssl_poll(channel, select.POLLIN, 'write') 1955 except UnicodeEncodeError: 1956 if encode_done: 1957 raise 1958 mydata = mydata.encode('utf-8') 1959 encode_done = True 1960 continue 1961 else: 1962 channel.sendall(self.append_eos(data))
1963
1964 - def updateProgress(self, *args, **kwargs):
1965 message = args[0] 1966 if message != self.last_print: 1967 self.socketLog.log(ETP_LOGPRI_INFO,ETP_LOGLEVEL_NORMAL,str(args[0])) 1968 if self.__output != None and self.stdout_logging: 1969 self.__output.updateProgress(*args,**kwargs) 1970 self.last_print = message
1971