#!/usr/bin/python # Copyright (c) 2009-2011 by Bjoern Kolbeck, Zuse Institute Berlin # Licensed under the BSD License, see LICENSE file for details. from config_parser import TestConfig from time import sleep from test_volume import Volume from optparse import OptionParser import test_server, os, traceback, subprocess, shutil, sys, socket, time import datetime import json START_STOP_SERVICE_RETRIES = 3 class TeeLog: def __init__(self, fp): self.file = fp self.stdout = sys.stdout def close(self): self.file.close() def write(self, data): self.file.write(data) self.stdout.write(data) def flush(self): self.file.flush() self.stdout.flush() def fileno(self): return self.file.fileno() def __eq__(self, other): return self.file == other class TestEnvironment: def __init__(self, config_file, test_set_name, volume, xtreemfs_dir, test_dir, debug_level, storage_threads, progress): self.__xtreemfs_dir = xtreemfs_dir self.__test_dir = os.path.abspath(test_dir) self.__debug_level = debug_level self.__config = TestConfig(config_file, test_set_name, volume) self.__volumes = dict() self.__dir = None self.__mrc = None self.__osds = list() self.__storage_threads = storage_threads self.__progress = progress def start(self): try: print 'xtestenv: Starting test environment for the following configuration...' self.__config.printConfig() # find out how many OSDs we need maxOsds = 1 for vc in self.__config.getVolumeConfigs().values(): if vc['stripe_width'] > maxOsds: maxOsds = vc['stripe_width'] if vc['rwr_factor'] > maxOsds: maxOsds = vc['rwr_factor'] if vc['ronly_factor'] > maxOsds: maxOsds = vc['ronly_factor'] if vc.setdefault('min_osds', maxOsds) > maxOsds: maxOsds = vc['min_osds'] print 'xtestenv: Number of OSDs for this test set: ', maxOsds ssl_enabled = False if self.__config.getTestSetConfig()['ssl']: print 'xtestenv: SSL enabled' ssl_enabled = True if self.__config.getTestSetConfig()['mrc_repl']: print 'xtestenv: Replicated MRC not yet supported' if self.__config.getTestSetConfig()['dir_repl']: print 'xtestenv: Replicated DIR not yet supported' snmp_enabled = False if self.__config.getTestSetConfig()['snmp']: print 'xtestenv: SNMP enabled' snmp_enabled = True print 'xtestenv: Creating directories...' try: os.mkdir(self.__test_dir) except: pass try: os.mkdir(os.path.join(self.__test_dir, "config")) except: pass try: os.mkdir(os.path.join(self.__test_dir, "run")) except: pass try: os.mkdir(os.path.join(self.__test_dir, "data")) except: pass try: os.mkdir(os.path.join(self.__test_dir, "log")) except: pass try: os.mkdir(os.path.join(self.__test_dir, "mnt")) except: pass print 'xtestenv: Searching for a free port range...' dir_port = 32638 PORT_SKIP = 3000 while dir_port < 65000: try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(('localhost', dir_port)) s.close() break except: print 'xtestenv: port', dir_port, 'is already in use' #traceback.print_exc() dir_port += PORT_SKIP if dir_port > 65000: raise Exception("Cannot get free TCP port") print 'xtestenv: DIR port :', dir_port print 'xtestenv: MRC port :', dir_port - 2 print 'xtestenv: OSD ports:', dir_port + 2 print 'xtestenv: Starting services...' self.__dir = test_server.DIR(START_STOP_SERVICE_RETRIES, os.path.join(self.__test_dir, "config", "dir.properties"), os.path.join(self.__test_dir, "run"), self.__xtreemfs_dir, os.path.join(self.__test_dir, "data", "dir"), dir_port, "test-dir", self.__storage_threads) self.__dir.configure() self.__dir.set_debug_level(self.__debug_level) if ssl_enabled: self.__dir.enable_ssl(False, os.path.join(self.__xtreemfs_dir, "tests", "certs", "DIR.p12"), 'passphrase', os.path.join(self.__xtreemfs_dir, "tests", "certs", "trusted.jks"), 'passphrase') if snmp_enabled: self.__dir.enable_snmp(dir_port+2000,'localhost', os.path.join(self.__xtreemfs_dir,"config","snmp.acl")) self.__dir.write_config_file() self.__mrc = test_server.MRC(START_STOP_SERVICE_RETRIES, os.path.join(self.__test_dir, "config", "mrc.properties"), os.path.join(self.__test_dir, "run"), self.__xtreemfs_dir, os.path.join(self.__test_dir, "data", "mrc"), dir_port - 2, "test-mrc", self.__storage_threads) self.__mrc.configure("localhost", self.__dir.get_rpc_port()) self.__mrc.set_debug_level(self.__debug_level) if ssl_enabled: self.__mrc.enable_ssl(False, os.path.join(self.__xtreemfs_dir, "tests", "certs", "MRC.p12"), 'passphrase', os.path.join(self.__xtreemfs_dir, "tests", "certs", "trusted.jks"), 'passphrase') if snmp_enabled: self.__mrc.enable_snmp(dir_port -2 +2000,'localhost', os.path.join(self.__xtreemfs_dir,"config","snmp.acl")) self.__mrc.write_config_file() for osdNum in range(maxOsds): osd = test_server.OSD(START_STOP_SERVICE_RETRIES, os.path.join(self.__test_dir, "config", "osd"+str(osdNum)+".properties"), os.path.join(self.__test_dir, "run"), self.__xtreemfs_dir, os.path.join(self.__test_dir, "data", "osd"+str(osdNum)), dir_port + 2 +osdNum, "test-osd"+str(osdNum), self.__storage_threads) osd.configure("localhost", self.__dir.get_rpc_port()) osd.set_debug_level(self.__debug_level) if ssl_enabled: osd.enable_ssl(False, os.path.join(self.__xtreemfs_dir, "tests", "certs", "OSD.p12"), 'passphrase', os.path.join(self.__xtreemfs_dir, "tests", "certs", "trusted.jks"), 'passphrase') if snmp_enabled: osd.enable_snmp(dir_port+2000+2+osdNum,'localhost', os.path.join(self.__xtreemfs_dir,"config","snmp.acl")) osd.write_config_file() self.__osds.append(osd) self.__dir.start(os.path.join(self.__test_dir, "log", "dir.log")) self.__mrc.start(os.path.join(self.__test_dir, "log", "mrc.log")) for osdNum in range(maxOsds): self.__osds[osdNum].start(os.path.join(self.__test_dir, "log", "osd"+str(osdNum)+".log")) print 'xtestenv: All services ready. Creating and mounting volumes...' pkcs12_file_path = None pkcs12_passphrase = None if ssl_enabled: pkcs12_file_path = os.path.join(self.__xtreemfs_dir, "tests", "certs", "Client.p12") pkcs12_passphrase = 'passphrase' for k, v in self.__config.getVolumeConfigs().items(): volume = Volume(name=k, mount_point_dir_path=os.path.join(self.__test_dir, "mnt", k), xtreemfs_dir=self.__xtreemfs_dir, debug_level=self.__debug_level, mount_options=v['mount_options'], mkfs_options=v.setdefault('mkfs_options', []), mrc_uri=self.__mrc.getServiceUrl(), dir_uri=self.__dir.getServiceUrl(), pkcs12_file_path=pkcs12_file_path, pkcs12_passphrase=pkcs12_passphrase, stripe_width=v['stripe_width'], stripe_size=v['stripe_size'], rwr_policy=v.setdefault('rwr_policy', 'quorum'), rwr_factor=v['rwr_factor'], ronly_factor=v['ronly_factor']) self.__volumes[k] = volume for k in self.__volumes.keys(): self.__volumes[k].create() self.__volumes[k].mount(os.path.join(self.__test_dir, "log", "client_vol_"+str(k)+".log")) print 'xtestenv: All volumes mounted.' except KeyboardInterrupt: self.stop() raise def stop(self): print 'xtestenv: Shutting down test environment.' print 'xtestenv: Unmounting volumes...' for k in self.__volumes.keys(): self.__volumes[k].unmount() print 'xtestenv: All volumes unmounted.' print 'xtestenv: Stopping services...' for osdNum in range(len(self.__osds)): self.__osds[osdNum].stop() self.__mrc.stop() self.__dir.stop() print 'xtestenv: All services stopped.' def _runSingleTest(self, test, testlog): original_cwd = os.getcwd() perVolumeSuccess = dict() try: for vcName in test['VolumeConfigs']: safe_test_name = test['name'].replace(" ","_") safe_test_name = safe_test_name.replace("/","_") safe_test_name = safe_test_name.replace("\\","_") safe_test_name = safe_test_name.replace(".","_") test_run_dir_path = os.path.join(self.__test_dir, "mnt", vcName, safe_test_name) try: os.mkdir( test_run_dir_path ) except KeyboardInterrupt: raise except: traceback.print_exc() pass # The directory may already exist because of direct/nondirect volume sharing os.chdir(test_run_dir_path) testlog.write(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " Executing test '" + test['name'] + "' on VolumeConfig '" + vcName + "'\n\n") testlog.flush() sys.stdout.write("xtestenv: Test '" + test['name'] + "' on VolumeConfig '" + vcName + "' ... ") sys.stdout.flush() testargs = list() testargs.append(os.path.join(self.__xtreemfs_dir,"tests", "test_scripts", test['file'])) testargs.append(self.__xtreemfs_dir) testargs.append(self.__dir.getServiceUrl()) testargs.append(self.__mrc.getServiceUrl()) testargs.append(self.__test_dir) # Execute test... t_start = time.time() try: testprocess = subprocess.Popen(testargs, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) for line in iter(testprocess.stdout.readline,''): testlog.write(line) testprocess.wait() if testprocess.returncode is not 0: perVolumeSuccess[vcName] = False else: perVolumeSuccess[vcName] = True except KeyboardInterrupt: perVolumeSuccess[vcName] = False raise except: traceback.print_exc() perVolumeSuccess[vcName] = False t_end = time.time() testlog.write("\n\n==================================================================\n") testlog.flush() if perVolumeSuccess[vcName]: print "ok (", int(t_end - t_start), "s)" else: print "FAILED (", int(t_end - t_start), "s)" os.chdir(original_cwd) # Change back so we can rmtree test_run_dir_path try: shutil.rmtree(test_run_dir_path) except KeyboardInterrupt: raise except: print "xtestenv: error cleaning up test directory", test_run_dir_path traceback.print_exc() except KeyboardInterrupt: os.chdir(original_cwd) raise except: traceback.print_exc() os.chdir(original_cwd) return perVolumeSuccess def _runSystemTest(self, test, testlog): testlog.write(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " Executing system test '" + test['name'] + "'\n\n") testlog.flush() sys.stdout.write("xtestenv: Executing system test '" + test['name'] + "' ... \n") sys.stdout.flush() testargs = list() testargs.append(os.path.join(self.__xtreemfs_dir,"tests", "test_scripts", test['file'])) testargs.append(self.__xtreemfs_dir) testargs.append(self.__dir.getServiceUrl()) testargs.append(self.__mrc.getServiceUrl()) testargs.append(self.__test_dir) # Execute test... t_start = time.time() success = False try: testprocess = subprocess.Popen(testargs, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=self.__xtreemfs_dir) for line in iter(testprocess.stdout.readline,''): testlog.write(line) testprocess.wait() if testprocess.returncode == 0: success = True except KeyboardInterrupt: raise except: traceback.print_exc() t_end = time.time() testlog.write("\n\n==================================================================\n") testlog.flush() if success: print "ok (", int(t_end - t_start), "s)" else: print "FAILED (", int(t_end - t_start), "s)" return success def runTests(self): try: testlog = open(os.path.join(self.__test_dir, "test.log"), "w") if self.__progress: testlog = TeeLog(testlog) sysTestResults = list() volTestResults = list() print print 'Starting system tests...' for test in self.__config.getSystemTests(): sysTestResults.append(self._runSystemTest(test, testlog)) print 'System tests complete.' print print 'Starting volume tests...' for test in self.__config.getVolumeTests(): volTestResults.append(self._runSingleTest(test, testlog)) failed = False for testResult in sysTestResults: if not testResult: failed = True break for testResult in volTestResults: for k,v in testResult.items(): if v == False: failed = True break testlog.close() self._writeHtml(volTestResults, sysTestResults) self._writeJSON(volTestResults, sysTestResults) print 'Volume tests complete.' print return failed except KeyboardInterrupt: print print 'Tests aborted.' print self.stop() raise def _writeHtml(self, volTestResults, sysTestResults): out = open(os.path.join(self.__test_dir, "result.html"), "w") out.write('XtreemFS Autotest - '+"\n") out.write(''+"\n") out.write(''+"\n") out.write(''+"\n") out.write('') out.write('') out.write(''+"\n") for i in range(len(sysTestResults)): testname = self.__config.getSystemTests()[i]['name'] out.write(''+"\n") out.write('') out.write('') out.write(''+"\n") for i in range(len(volTestResults)): testname = self.__config.getVolumeTests()[i]['name'] out.write(''+"\n") out.write('') out.write('') out.write(''+"\n") out.write('
TestVolume Configs
' + testname + '') if sysTestResults[i]: out.write('ok
') else: out.write('FAILED
') out.write('
' + testname + '') for k,v in volTestResults[i].items(): out.write(k) out.write(":") if v: out.write('ok
') else: out.write('FAILED
') out.write('
'+"\n") out.close() def _writeJSON(self, volTestResults, sysTestResults): results = {} for i in range(len(sysTestResults)): results[self.__config.getSystemTests()[i]['name']] = sysTestResults[i] for i in range(len(volTestResults)): results[self.__config.getVolumeTests()[i]['name']] = volTestResults[i] with open(os.path.join(self.__test_dir, "result.json"), "w") as out: json.dump(results, out) if __name__ == "__main__": xtreemfs_dir = os.path.join(os.path.dirname( os.path.abspath( sys.modules[__name__].__file__ ) ), "..") option_parser = OptionParser( "usage: %prog [options] test_set_name") option_parser.add_option( "-t", "--test-dir", action="store", dest="test_dir", default='/tmp/xtreemfs_xtestenv/', help="Directory where all temporary test files will be stored.") option_parser.add_option( "-v", "--volume", action="store", dest="volume", help="Test a specific volume config.") option_parser.add_option( "--clean-test-dir", action="store_true", dest="clean_test_dir", help="Remove content of --test-dir before executing the test") option_parser.add_option( "-c", "--config-file-path", action="store", dest="config_file_path", default='test_config.py') option_parser.add_option( "-d", "--debug-level", action="store", dest="debug_level", default=6 ) option_parser.add_option( "-x", "--xtreemfs-base-dir", action="store", dest="xtreemfs_dir", default=xtreemfs_dir) option_parser.add_option( "-e", "--start-test-env", action="store_true", dest="start_test_env") option_parser.add_option( "-f", "--start-test-env-with-ssl", action="store_true", dest="start_test_env_with_ssl", help="Same as -e, but with SSL enabled.") option_parser.add_option( "-s", "--storage-threads", action="store", dest="storage_threads", default=1, help="Number of storage threads per OSD.") option_parser.add_option( "-p", "--progress", action="store_true", dest="progress", help="Show progress by printing the test script output to stdout.") if len(sys.argv) == 1: option_parser.print_help() sys.exit(2) options, positional_args = option_parser.parse_args() print 'Reading config from:', options.config_file_path print 'XtreemFS base dir :', options.xtreemfs_dir print 'Test dir :', options.test_dir print 'Debug level :', options.debug_level if options.clean_test_dir and os.path.exists(options.test_dir): print "Cleaning test dir : " + options.test_dir try: for dir_name in ( "config", "run", "data", "log", "mnt" ): shutil.rmtree(os.path.join(options.test_dir, dir_name)) except: print "Failed to clean all directories below: " + options.test_dir if options.start_test_env or options.start_test_env_with_ssl: manual_testset = 'manual' if options.start_test_env_with_ssl: manual_testset = 'manual-ssl' print "Starting test environment (", manual_testset, ")..." testEnv = TestEnvironment(options.config_file_path, manual_testset, options.volume, options.xtreemfs_dir, options.test_dir, options.debug_level, options.storage_threads, options.progress) testEnv.start() try: print print "*** Press enter to shut down test environment ***" raw_input() except KeyboardInterrupt: pass testEnv.stop() else: # run tests if len(positional_args) is not 1: option_parser.print_usage(sys.stdout) sys.exit(2) print 'Executing test set :', positional_args[0] testEnv = TestEnvironment(options.config_file_path, positional_args[0], options.volume, options.xtreemfs_dir, options.test_dir, options.debug_level, options.storage_threads, options.progress) testEnv.start() try: failed = testEnv.runTests() except: traceback.print_exc() failed = True testEnv.stop() if failed: print 'FAILED!' sys.exit(1) else: print 'SUCCESS.'