# Copyright (c) 2009-2011 by Minor Gordon, Bjoern Kolbeck, Zuse Institute Berlin
# Licensed under the BSD License, see LICENSE file for details.

from datetime import datetime
from time import sleep
import os
import signal
import subprocess
import sys


def _log(*parts):
    """Print all parts on one line, separated by single spaces.

    A single pre-joined string is passed to print so that the output is the
    same whether print is a statement (Python 2) or a function (Python 3).
    """
    print(" ".join(str(part) for part in parts))


class Server(object):
    """Base class for one XtreemFS server process run by the test environment.

    Holds the server's configuration as a flat key/value dictionary, writes it
    to and reads it from a config file, and starts/stops the Java process.
    The Java class that is started is derived from the name of the concrete
    subclass (see start()).
    """

    def __init__(self,
                 start_stop_retries,
                 config_file_path,
                 run_dir_path,
                 xtreemfs_dir,
                 data_dir,
                 rpc_port,
                 uuid,
                 storage_threads):
        """Store the paths and initialize the configuration with defaults.

        start_stop_retries -- number of kill attempts per signal in stop()
        config_file_path   -- file used by read_config_file/write_config_file
        run_dir_path       -- directory in which the pid file is kept
        xtreemfs_dir       -- root of the source tree searched for the jars
        data_dir           -- data directory used by the subclasses
        rpc_port           -- stored as 'listen.port'; the HTTP port defaults
                              to rpc_port - 2000
        uuid               -- stored as 'uuid'; also names the pid file
        storage_threads    -- stored as 'storage_threads'
        """
        self._start_stop_retries = start_stop_retries
        self._config_file_path = config_file_path
        self._run_dir_path = run_dir_path
        self._xtreemfs_dir = xtreemfs_dir
        self._data_dir = data_dir

        # Initialize with default values
        self._config = {
            'listen.port': rpc_port,
            'http_port': rpc_port - 2000,
            'debug.level': 6,
            'uuid': uuid,
            'ssl.enabled': 'false',
            'storage_threads': storage_threads,
        }

    def configure(self):
        """Hook for server specific settings; the base class has none."""
        pass  # Nothing to do here.

    def set_debug_level(self, debug_level):
        """Set the 'debug.level' configuration value."""
        self._config['debug.level'] = debug_level

    def enable_ssl(self,
                   use_gridssl,
                   pkcs12_file_path,
                   pkcs12_passphrase,
                   trusted_certs_jks_file_path,
                   trusted_certs_jks_passphrase):
        """Switch the configuration to SSL.

        The service credentials are registered as a PKCS12 container and the
        trusted certificates as a JKS container. use_gridssl selects the
        value of 'ssl.grid_ssl'.
        """
        self._config['ssl.enabled'] = 'true'
        self._config['ssl.grid_ssl'] = 'true' if use_gridssl else 'false'

        self._config['ssl.service_creds'] = pkcs12_file_path
        self._config['ssl.service_creds.pw'] = pkcs12_passphrase
        self._config['ssl.service_creds.container'] = 'PKCS12'
        self._config['ssl.trusted_certs'] = trusted_certs_jks_file_path
        self._config['ssl.trusted_certs.pw'] = trusted_certs_jks_passphrase
        self._config['ssl.trusted_certs.container'] = 'JKS'

    def enable_snmp(self, snmp_port, snmp_address, snmp_aclfile):
        """Set the configuration parameters required for SNMP support."""
        self._config['snmp.enabled'] = 'true'
        self._config['snmp.port'] = snmp_port
        self._config['snmp.address'] = snmp_address
        self._config['snmp.aclfile'] = snmp_aclfile

    def read_config_file(self):
        """Replace the current configuration with the config file's content.

        Every 'key=value' line becomes an entry; keys and values are stripped
        and always stored as strings. Blank lines, lines starting with '#'
        and lines without '=' are ignored. The current configuration is only
        replaced once the file has been read completely.
        """
        config = {}
        with open(self._config_file_path) as config_file:
            for line in config_file:
                stripped = line.strip()
                # Comments may contain '=' and must not turn into keys.
                if not stripped or stripped.startswith("#"):
                    continue
                key, separator, value = stripped.partition("=")
                if separator:
                    config[key.strip()] = value.strip()
        self._config = config

    def write_config_file(self):
        """Write the configuration, sorted by key, to the config file.

        The file starts with a comment line containing the current time.
        """
        lines = ["# autogenerated by test_server.py at " + str(datetime.now()) + "\n"]
        for key in sorted(self._config.keys()):
            lines.append(str(key) + "=" + str(self._config[key]) + "\n")
        with open(self._config_file_path, 'w') as config_file:
            config_file.writelines(lines)

    def get_config_file_path(self):
        """Return the path of the config file."""
        return self._config_file_path

    def _get_config_property(self, key):
        """Return the configuration value of key; raises KeyError if unset."""
        return self._config[key]

    def _get_pid_file_path(self):
        """Return '<run dir>/<uuid>.pid'."""
        return os.path.join(self._run_dir_path, self.get_uuid() + ".pid")

    def get_http_port(self):
        """Return the 'http_port' configuration value as an int."""
        return int(self._config["http_port"])

    def get_rpc_port(self):
        """Return the 'listen.port' configuration value as an int."""
        return int(self._config["listen.port"])

    def get_uuid(self):
        """Return the 'uuid' configuration value."""
        return self._config["uuid"]

    def getServiceUrl(self):
        """Return the URL of this server on localhost.

        The scheme is 'pbrpc' without SSL, 'pbrpcg' with grid SSL and
        'pbrpcs' with plain SSL. A missing 'ssl.grid_ssl' entry (e.g. after
        reading a config file that lacks it) counts as plain SSL.
        """
        if self._config['ssl.enabled'] != 'true':
            scheme = "pbrpc://"
        elif self._config.get('ssl.grid_ssl') == 'true':
            scheme = "pbrpcg://"
        else:
            scheme = "pbrpcs://"
        return scheme + "localhost:" + str(self._config["listen.port"]) + "/"

    def is_running(self):
        """Return True if the process named in the pid file looks alive.

        Returns False when the pid file is missing, unreadable or does not
        contain a number, or when os.waitpid() fails for the pid.
        """
        try:
            with open(self._get_pid_file_path()) as pid_file:
                pid = int(pid_file.read().strip())
        except (IOError, OSError, ValueError):
            return False

        try:
            reaped_pid, exit_status = os.waitpid(pid, os.WNOHANG)
        except OSError:
            return False

        # NOTE(review): a child that has just exited with status 0 is still
        # reported as running by this call (the following call fails in
        # waitpid and returns False). Kept as found -- confirm whether
        # callers rely on it.
        return not (reaped_pid != 0 and exit_status != 0)

    def save_status_page(self, to_file_path):
        """Download the page served on the HTTP port into to_file_path.

        Uses the external 'wget' program. The arguments are passed as a list
        so that the path is not interpreted by a shell. A failing or missing
        wget does not raise.
        """
        url = "http://localhost:%u" % self.get_http_port()
        try:
            subprocess.call(["wget", "-O", to_file_path, url])
        except OSError:
            # wget could not be executed at all; this is best effort only.
            _log("xtestenv: could not run wget to save the status page of",
                 self.__class__.__name__, "server")

    def _build_java_args(self):
        """Return the argument list of the Java command starting this server."""
        java_args = [os.path.join(os.environ["JAVA_HOME"], "bin", "java")]

        # Enable assertions.
        java_args.append("-ea")

        def in_tree(*parts):
            return os.path.abspath(
                os.path.join(self._xtreemfs_dir, "java", *parts))

        # Construct the -cp classpath: prefer the jars of the source tree and
        # fall back to /usr/share/java if the tree has not been built.
        xtreemfs_jar_file_path = in_tree("servers", "dist", "XtreemFS.jar")
        if os.path.exists(xtreemfs_jar_file_path):
            classpath = (
                xtreemfs_jar_file_path,
                in_tree("lib", "BabuDB.jar"),
                in_tree("lib", "protobuf-java-2.5.0.jar"),
                in_tree("flease", "dist", "Flease.jar"),
                in_tree("foundation", "dist", "Foundation.jar"),
                in_tree("lib", "jdmkrt.jar"),
                in_tree("lib", "commons-codec-1.3.jar"),
            )
        else:
            classpath = tuple(
                os.path.join("/usr/share/java", jar_name)
                for jar_name in (
                    "XtreemFS.jar",
                    "BabuDB.jar",
                    "protobuf-java-2.5.0.jar",
                    "Flease.jar",
                    "Foundation.jar",
                    "jdmkrt.jar",
                    "commons-codec-1.3.jar",
                )
            )
        separator = ";" if sys.platform.startswith("win") else ":"
        java_args.extend(("-cp", separator.join(classpath)))

        # Name of the class to start, e.g. org.xtreemfs.dir.DIR
        class_name = self.__class__.__name__
        java_args.append(
            "org.xtreemfs." + class_name.lower() + "." + class_name.upper())

        # .config file
        java_args.append(self.get_config_file_path())

        # Don't .join java_args, since Popen wants a sequence when shell=False
        return java_args

    def start(self, log_file_path=None):
        """Start the server process unless it is already running.

        log_file_path -- if given, stdout and stderr of the process are
                         appended to this file; otherwise the streams of the
                         calling process are used.

        Writes the pid of the new process to the pid file and raises
        RuntimeError if is_running() is false one second after the start.
        Raises KeyError if the JAVA_HOME environment variable is not set.
        """
        if sys.platform != "win32" and self.is_running():
            _log("xtestenv:", self.__class__.__name__, "server with UUID",
                 self.get_uuid(), "is already running")
            return

        try:
            os.mkdir(self._run_dir_path)
        except OSError:
            pass  # Usually: the directory already exists.
        pid_file_path = self._get_pid_file_path()

        java_args = self._build_java_args()

        log_file = None
        if log_file_path is None:
            stdout, stderr = sys.stdout, sys.stderr
        else:
            # Redirect stderr and stdout to a log file
            log_file = open(log_file_path, "a")
            stdout = stderr = log_file

        try:
            # No shell=True: we only want one process (java), not two
            # (/bin/sh and java)
            p = subprocess.Popen(java_args, stdout=stdout, stderr=stderr)
        finally:
            # The child has its own copy of the descriptor; the parent's
            # handle is not needed any more.
            if log_file is not None:
                log_file.close()

        if p.returncode is not None:
            raise RuntimeError(
                self.get_uuid() + " failed to start: " + str(p.returncode))

        with open(pid_file_path, "w+") as pid_file:
            pid_file.write(str(p.pid))

        _log("xtestenv: started", self.__class__.__name__, "server with UUID",
             self.get_uuid(), "on port", self.get_rpc_port(),
             "with pid", p.pid)

        sleep(1.0)

        if not self.is_running():
            raise RuntimeError(self.get_uuid() + " failed to start")

    def _kill_and_wait(self, pid):
        """Signal pid until it is gone; return True once that is the case.

        Sends SIGTERM up to start_stop_retries times, then SIGKILL up to
        start_stop_retries times, waiting half a second after each signal.
        """
        for signo in (signal.SIGTERM, signal.SIGKILL):
            for try_i in range(self._start_stop_retries):
                _log("xtestenv: stopping", self.__class__.__name__,
                     "server with pid", pid,
                     "with signal", str(int(signo)) + ", try", try_i)

                try:
                    os.kill(pid, signo)
                except OSError:
                    pass  # Most likely the process is already gone.

                sleep(0.5)

                try:
                    if os.waitpid(pid, os.WNOHANG)[0] != 0:
                        return True
                except OSError:
                    # Nothing left to wait for.
                    return True
        return False

    def stop(self):
        """Stop the process named in the pid file and remove the pid file.

        The pid file is kept if the process could not be stopped. Without a
        pid file only a message is printed. Raises ValueError if the pid file
        does not contain a number.
        """
        pid_file_path = self._get_pid_file_path()
        if not os.path.exists(pid_file_path):
            _log("xtestenv: no pid file for", self.__class__.__name__, "server")
            return

        with open(pid_file_path) as pid_file:
            pid = int(pid_file.read().strip())

        if sys.platform.startswith("win"):
            subprocess.call(["TASKKILL", "/PID", str(pid), "/F", "/T"])
            killed = True
        else:
            killed = self._kill_and_wait(pid)

        if killed:
            os.unlink(pid_file_path)

    def _make_data_dir(self):
        """Create the data directory; an existing directory is not an error."""
        try:
            os.mkdir(self._data_dir)
        except OSError:
            pass

    def _configure_babudb(self, sync_mode, pseudo_sync_wait):
        """Set the BabuDB/database settings shared by DIR and MRC.

        sync_mode        -- value of 'babudb.sync'
        pseudo_sync_wait -- value of 'babudb.pseudoSyncWait'
        """
        self._config['babudb.debug.level'] = self._config['debug.level']
        self._config['babudb.logDir'] = self._data_dir
        self._config['babudb.baseDir'] = self._data_dir
        self._config['babudb.sync'] = sync_mode
        self._config['babudb.worker.maxQueueLength'] = '250'
        self._config['babudb.worker.numThreads'] = '0'
        self._config['babudb.maxLogfileSize'] = '16777216'
        self._config['babudb.checkInterval'] = '300'
        self._config['babudb.pseudoSyncWait'] = pseudo_sync_wait
        self._config['database.dir'] = self._data_dir
        self._config['database.log'] = self._data_dir
        self._config['authentication_provider'] = 'org.xtreemfs.common.auth.NullAuthProvider'


class DIR(Server):
    """Server started from the Java class org.xtreemfs.dir.DIR."""

    def configure(self):
        """Create the data directory and add the DIR settings."""
        self._make_data_dir()
        self._configure_babudb('FSYNC', '200')


class MRC(Server):
    """Server started from the Java class org.xtreemfs.mrc.MRC."""

    def configure(self, dir_host, dir_port):
        """Create the data directory and add the MRC settings.

        dir_host, dir_port -- stored as 'dir_service.host' and
                              'dir_service.port'
        """
        self._make_data_dir()

        self._config['dir_service.host'] = dir_host
        self._config['dir_service.port'] = dir_port
        self._config['osd_check_interval'] = 300
        self._config['no_atime'] = 'true'
        self._config['no_fsync'] = 'true'
        self._config['local_clock_renewal'] = 0
        self._config['remote_time_sync'] = 60000
        self._config['capability_secret'] = 'testsecret'
        self._config['database.checkpoint.interval'] = 1800000
        self._config['database.checkpoint.idle_interval'] = 1000
        self._config['database.checkpoint.logfile_size'] = 16384
        self._configure_babudb('ASYNC', '0')


class OSD(Server):
    """Server started from the Java class org.xtreemfs.osd.OSD."""

    def configure(self, dir_host, dir_port):
        """Create the data directory and add the OSD settings.

        dir_host, dir_port -- stored as 'dir_service.host' and
                              'dir_service.port'
        """
        self._make_data_dir()

        self._config['dir_service.host'] = dir_host
        self._config['dir_service.port'] = dir_port
        self._config['local_clock_renewal'] = 0
        self._config['remote_time_sync'] = 60000
        self._config['capability_secret'] = 'testsecret'
        self._config['report_free_space'] = 'true'
        self._config['checksums.enabled'] = 'false'
        self._config['object_dir'] = self._data_dir
        # Some tests overload the test system, increase timeouts.
        self._config['flease.lease_timeout_ms'] = 60000
        self._config['flease.message_to_ms'] = 2000