564 lines
23 KiB
Python
Executable File
564 lines
23 KiB
Python
Executable File
#!/usr/bin/python
|
|
|
|
# Copyright (c) 2009-2011 by Bjoern Kolbeck, Zuse Institute Berlin
|
|
# Licensed under the BSD License, see LICENSE file for details.
|
|
|
|
from config_parser import TestConfig
|
|
from time import sleep
|
|
from test_volume import Volume
|
|
from optparse import OptionParser
|
|
import test_server, os, traceback, subprocess, shutil, sys, socket, time
|
|
import datetime
|
|
import json
|
|
|
|
START_STOP_SERVICE_RETRIES = 3
|
|
|
|
class TeeLog:
    """File-like wrapper that writes to a file and to stdout at once.

    Everything passed to write() goes to the wrapped file object first and
    then to the stream that was sys.stdout when the TeeLog was created.
    fileno() and the comparison operators delegate to the wrapped file only.
    """

    def __init__(self, fp):
        """Wrap the already opened file object *fp*."""
        self.file = fp
        # Captured once, so a later reassignment of sys.stdout does not
        # change where this object echoes its output.
        self.stdout = sys.stdout

    def close(self):
        """Close the wrapped file; the stdout stream is left open."""
        self.file.close()

    def write(self, data):
        """Write *data* to the wrapped file and echo it to stdout."""
        self.file.write(data)
        self.stdout.write(data)

    def flush(self):
        """Flush both the wrapped file and stdout."""
        self.file.flush()
        self.stdout.flush()

    def fileno(self):
        """Return the file descriptor of the wrapped file."""
        return self.file.fileno()

    def __eq__(self, other):
        """Compare equal to whatever the wrapped file compares equal to."""
        return self.file == other

    def __ne__(self, other):
        """Inverse of __eq__.

        Python 2 does not derive != from __eq__, so without this method
        "tee != fp" would fall back to an identity comparison and contradict
        "tee == fp".
        """
        return not self.__eq__(other)
|
|
|
|
|
|
class TestEnvironment:
    """Sets up a local XtreemFS test installation and runs test scripts on it.

    start() creates the working directories below the test directory,
    configures and starts one DIR, one MRC and as many OSDs as the volume
    configurations require, then creates and mounts every configured volume.
    runTests() executes the system and volume test scripts of the test set and
    writes test.log, result.html and result.json into the test directory.
    stop() unmounts the volumes and stops the services again.
    """

    def __init__(self,
                 config_file,
                 test_set_name,
                 volume,
                 xtreemfs_dir,
                 test_dir,
                 debug_level,
                 storage_threads,
                 progress):
        """Remember the settings and load the test configuration.

        config_file, test_set_name and volume are handed to TestConfig
        unchanged. test_dir is converted to an absolute path because the
        tests change the working directory while they run. debug_level and
        storage_threads are passed on to the services; progress enables
        echoing of the test script output to stdout (see runTests).
        No service is created or started here.
        """
        self.__xtreemfs_dir = xtreemfs_dir
        self.__test_dir = os.path.abspath(test_dir)
        self.__debug_level = debug_level
        self.__config = TestConfig(config_file, test_set_name, volume)
        self.__volumes = dict()
        self.__dir = None
        self.__mrc = None
        self.__osds = list()
        self.__storage_threads = storage_threads
        self.__progress = progress

    def _printLine(self, *parts):
        """Write *parts* to stdout, separated by blanks, followed by a newline.

        Produces the same text as the Python 2 statement "print a, b, c"
        (and a bare "print" when called without arguments) while also being
        valid syntax for a Python 3 interpreter.
        """
        sys.stdout.write(' '.join([str(part) for part in parts]) + '\n')

    def _requiredOsdCount(self):
        """Return the number of OSDs needed by the configured volumes.

        This is the largest stripe_width, rwr_factor, ronly_factor or
        min_osds of any volume config, and at least 1. A volume config
        without 'min_osds' gets that key set to the count reached so far.
        """
        maxOsds = 1
        for vc in self.__config.getVolumeConfigs().values():
            maxOsds = max(maxOsds, vc['stripe_width'], vc['rwr_factor'], vc['ronly_factor'])
            maxOsds = max(maxOsds, vc.setdefault('min_osds', maxOsds))
        return maxOsds

    def _createDirectories(self):
        """Create the test directory and its working subdirectories.

        Failures of os.mkdir are ignored on purpose: the usual reason is that
        the directory is left over from an earlier run.
        """
        paths = [self.__test_dir]
        for sub_dir in ("config", "run", "data", "log", "mnt"):
            paths.append(os.path.join(self.__test_dir, sub_dir))
        for path in paths:
            try:
                os.mkdir(path)
            except OSError:
                pass

    def _findFreeDirPort(self):
        """Return the first candidate DIR port that can be bound on localhost.

        Candidates start at 32638 and advance in steps of 3000. Only the DIR
        port itself is probed; the MRC and OSD ports derived from it are not.
        Raises Exception when no candidate below 65000 is free.
        """
        dir_port = 32638
        PORT_SKIP = 3000

        while dir_port < 65000:
            probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                probe.bind(('localhost', dir_port))
                return dir_port
            except socket.error:
                self._printLine('xtestenv: port', dir_port, 'is already in use')
                dir_port += PORT_SKIP
            finally:
                # Release the probe socket on success and on failure alike.
                probe.close()

        raise Exception("Cannot get free TCP port")

    def _applySslAndSnmp(self, service, cert_file, snmp_port, ssl_enabled, snmp_enabled):
        """Enable SSL and/or SNMP on *service* as requested by the test set.

        cert_file is the name of the service certificate below tests/certs of
        the XtreemFS directory; snmp_port is the port given to enable_snmp.
        """
        if ssl_enabled:
            service.enable_ssl(False,
                               os.path.join(self.__xtreemfs_dir, "tests", "certs", cert_file),
                               'passphrase',
                               os.path.join(self.__xtreemfs_dir, "tests", "certs", "trusted.jks"),
                               'passphrase')
        if snmp_enabled:
            service.enable_snmp(snmp_port, 'localhost',
                                os.path.join(self.__xtreemfs_dir, "config", "snmp.acl"))

    def start(self):
        """Start all services and mount all volumes of the test set.

        On KeyboardInterrupt whatever has been set up so far is shut down via
        stop() and the interrupt is re-raised. Other exceptions propagate
        without any cleanup.
        """
        try:
            self._printLine('xtestenv: Starting test environment for the following configuration...')
            self.__config.printConfig()

            # find out how many OSDs we need
            maxOsds = self._requiredOsdCount()
            self._printLine('xtestenv: Number of OSDs for this test set: ', maxOsds)

            test_set_config = self.__config.getTestSetConfig()

            ssl_enabled = False
            if test_set_config['ssl']:
                self._printLine('xtestenv: SSL enabled')
                ssl_enabled = True

            if test_set_config['mrc_repl']:
                self._printLine('xtestenv: Replicated MRC not yet supported')

            if test_set_config['dir_repl']:
                self._printLine('xtestenv: Replicated DIR not yet supported')

            snmp_enabled = False
            if test_set_config['snmp']:
                self._printLine('xtestenv: SNMP enabled')
                snmp_enabled = True

            self._printLine('xtestenv: Creating directories...')
            self._createDirectories()

            self._printLine('xtestenv: Searching for a free port range...')
            dir_port = self._findFreeDirPort()
            self._printLine('xtestenv: DIR port :', dir_port)
            self._printLine('xtestenv: MRC port :', dir_port - 2)
            self._printLine('xtestenv: OSD ports:', dir_port + 2)

            self._printLine('xtestenv: Starting services...')

            self.__dir = test_server.DIR(START_STOP_SERVICE_RETRIES,
                                         os.path.join(self.__test_dir, "config", "dir.properties"),
                                         os.path.join(self.__test_dir, "run"),
                                         self.__xtreemfs_dir,
                                         os.path.join(self.__test_dir, "data", "dir"),
                                         dir_port,
                                         "test-dir",
                                         self.__storage_threads)
            self.__dir.configure()
            self.__dir.set_debug_level(self.__debug_level)
            self._applySslAndSnmp(self.__dir, "DIR.p12", dir_port + 2000,
                                  ssl_enabled, snmp_enabled)
            self.__dir.write_config_file()

            self.__mrc = test_server.MRC(START_STOP_SERVICE_RETRIES,
                                         os.path.join(self.__test_dir, "config", "mrc.properties"),
                                         os.path.join(self.__test_dir, "run"),
                                         self.__xtreemfs_dir,
                                         os.path.join(self.__test_dir, "data", "mrc"),
                                         dir_port - 2,
                                         "test-mrc",
                                         self.__storage_threads)
            self.__mrc.configure("localhost", self.__dir.get_rpc_port())
            self.__mrc.set_debug_level(self.__debug_level)
            self._applySslAndSnmp(self.__mrc, "MRC.p12", dir_port - 2 + 2000,
                                  ssl_enabled, snmp_enabled)
            self.__mrc.write_config_file()

            for osdNum in range(maxOsds):
                osd = test_server.OSD(START_STOP_SERVICE_RETRIES,
                                      os.path.join(self.__test_dir, "config", "osd" + str(osdNum) + ".properties"),
                                      os.path.join(self.__test_dir, "run"),
                                      self.__xtreemfs_dir,
                                      os.path.join(self.__test_dir, "data", "osd" + str(osdNum)),
                                      dir_port + 2 + osdNum,
                                      "test-osd" + str(osdNum),
                                      self.__storage_threads)
                osd.configure("localhost", self.__dir.get_rpc_port())
                osd.set_debug_level(self.__debug_level)
                self._applySslAndSnmp(osd, "OSD.p12", dir_port + 2000 + 2 + osdNum,
                                      ssl_enabled, snmp_enabled)
                osd.write_config_file()
                self.__osds.append(osd)

            # All services are configured before the first one is started.
            self.__dir.start(os.path.join(self.__test_dir, "log", "dir.log"))
            self.__mrc.start(os.path.join(self.__test_dir, "log", "mrc.log"))
            for osdNum, osd in enumerate(self.__osds):
                osd.start(os.path.join(self.__test_dir, "log", "osd" + str(osdNum) + ".log"))

            self._printLine('xtestenv: All services ready. Creating and mounting volumes...')

            pkcs12_file_path = None
            pkcs12_passphrase = None
            if ssl_enabled:
                pkcs12_file_path = os.path.join(self.__xtreemfs_dir, "tests", "certs", "Client.p12")
                pkcs12_passphrase = 'passphrase'

            for k, v in self.__config.getVolumeConfigs().items():
                self.__volumes[k] = Volume(name=k,
                                           mount_point_dir_path=os.path.join(self.__test_dir, "mnt", k),
                                           xtreemfs_dir=self.__xtreemfs_dir,
                                           debug_level=self.__debug_level,
                                           mount_options=v['mount_options'],
                                           mkfs_options=v.setdefault('mkfs_options', []),
                                           mrc_uri=self.__mrc.getServiceUrl(),
                                           dir_uri=self.__dir.getServiceUrl(),
                                           pkcs12_file_path=pkcs12_file_path,
                                           pkcs12_passphrase=pkcs12_passphrase,
                                           stripe_width=v['stripe_width'],
                                           stripe_size=v['stripe_size'],
                                           rwr_policy=v.setdefault('rwr_policy', 'quorum'),
                                           rwr_factor=v['rwr_factor'],
                                           ronly_factor=v['ronly_factor'])

            for k, volume in self.__volumes.items():
                volume.create()
                volume.mount(os.path.join(self.__test_dir, "log", "client_vol_" + str(k) + ".log"))

            self._printLine('xtestenv: All volumes mounted.')
        except KeyboardInterrupt:
            self.stop()
            raise

    def stop(self):
        """Unmount all volumes and stop the OSDs, the MRC and the DIR.

        Safe to call before the services exist (start() interrupted early)
        and safe to call more than once: everything that was shut down is
        forgotten, so a repeated call only prints the progress messages.
        """
        self._printLine('xtestenv: Shutting down test environment.')
        self._printLine('xtestenv: Unmounting volumes...')

        for volume in self.__volumes.values():
            volume.unmount()
        self.__volumes.clear()

        self._printLine('xtestenv: All volumes unmounted.')

        self._printLine('xtestenv: Stopping services...')

        for osd in self.__osds:
            osd.stop()
        del self.__osds[:]

        if self.__mrc is not None:
            self.__mrc.stop()
            self.__mrc = None
        if self.__dir is not None:
            self.__dir.stop()
            self.__dir = None
        self._printLine('xtestenv: All services stopped.')

    def _executeTestScript(self, test, testlog, cwd=None):
        """Run the script of *test*, log its output and report the outcome.

        The script tests/test_scripts/<test['file']> is started with the
        XtreemFS directory, the DIR URL, the MRC URL and the test directory
        as arguments. stdout and stderr of the script are copied line by line
        into *testlog*. *cwd* is the working directory of the script; None
        keeps the current one.

        Prints "ok" or "FAILED" with the duration in seconds and returns True
        only when the script exited with status 0. Exceptions other than
        KeyboardInterrupt are printed and count as failure.
        """
        testargs = [os.path.join(self.__xtreemfs_dir, "tests", "test_scripts", test['file']),
                    self.__xtreemfs_dir,
                    self.__dir.getServiceUrl(),
                    self.__mrc.getServiceUrl(),
                    self.__test_dir]

        t_start = time.time()
        success = False
        try:
            testprocess = subprocess.Popen(testargs, stdout=subprocess.PIPE,
                                           stderr=subprocess.STDOUT, cwd=cwd)
            # readline() returns '' at end of file, which ends the iteration.
            for line in iter(testprocess.stdout.readline, ''):
                testlog.write(line)
            testprocess.wait()
            success = (testprocess.returncode == 0)
        except Exception:
            # KeyboardInterrupt is not an Exception subclass and propagates.
            traceback.print_exc()
        t_end = time.time()

        testlog.write("\n\n==================================================================\n")
        testlog.flush()

        if success:
            self._printLine("ok (", int(t_end - t_start), "s)")
        else:
            self._printLine("FAILED (", int(t_end - t_start), "s)")

        return success

    def _runTestOnVolume(self, test, vcName, safe_test_name, testlog, original_cwd):
        """Run *test* inside a scratch directory on the volume *vcName*.

        The scratch directory mnt/<vcName>/<safe_test_name> is created, made
        the working directory for the duration of the test and removed
        afterwards. Returns True when the test script succeeded.
        """
        test_run_dir_path = os.path.join(self.__test_dir, "mnt", vcName, safe_test_name)
        try:
            os.mkdir(test_run_dir_path)
        except Exception:
            # The directory may already exist because of direct/nondirect volume sharing
            traceback.print_exc()

        os.chdir(test_run_dir_path)
        try:
            testlog.write(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " Executing test '" + test['name'] + "' on VolumeConfig '" + vcName + "'\n\n")
            testlog.flush()
            sys.stdout.write("xtestenv: Test '" + test['name'] + "' on VolumeConfig '" + vcName + "' ... ")
            sys.stdout.flush()
            success = self._executeTestScript(test, testlog)
        finally:
            # Change back so we can rmtree test_run_dir_path
            os.chdir(original_cwd)

        try:
            shutil.rmtree(test_run_dir_path)
        except Exception:
            self._printLine("xtestenv: error cleaning up test directory", test_run_dir_path)
            traceback.print_exc()

        return success

    def _runSingleTest(self, test, testlog):
        """Run a volume test on each of its volume configs.

        Returns a dict that maps the name of every attempted volume config to
        True (test passed) or False. A config whose run could not even be
        set up is recorded as False and the remaining configs are still run.
        The working directory is restored before returning. Only
        KeyboardInterrupt is allowed to propagate.
        """
        original_cwd = os.getcwd()
        perVolumeSuccess = dict()

        try:
            # The test name becomes a directory name, so neutralise
            # blanks, path separators and dots.
            safe_test_name = test['name']
            for unsafe in (" ", "/", "\\", "."):
                safe_test_name = safe_test_name.replace(unsafe, "_")

            for vcName in test['VolumeConfigs']:
                # Pessimistic default: stays False if anything below raises.
                perVolumeSuccess[vcName] = False
                try:
                    perVolumeSuccess[vcName] = self._runTestOnVolume(
                        test, vcName, safe_test_name, testlog, original_cwd)
                except Exception:
                    traceback.print_exc()
        except Exception:
            traceback.print_exc()
        finally:
            os.chdir(original_cwd)

        return perVolumeSuccess

    def _runSystemTest(self, test, testlog):
        """Run a system test with the XtreemFS directory as working directory.

        Returns True when the test script exited with status 0.
        """
        testlog.write(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " Executing system test '" + test['name'] + "'\n\n")
        testlog.flush()
        sys.stdout.write("xtestenv: Executing system test '" + test['name'] + "' ... \n")
        sys.stdout.flush()
        return self._executeTestScript(test, testlog, cwd=self.__xtreemfs_dir)

    def runTests(self):
        """Run all system tests, then all volume tests of the test set.

        The output of the test scripts goes to test.log in the test
        directory (and to stdout as well when progress was requested). After
        the tests, result.html and result.json are written.

        Returns True if at least one test FAILED, False otherwise. On
        KeyboardInterrupt the environment is stopped and the interrupt is
        re-raised.
        """
        try:
            testlog = open(os.path.join(self.__test_dir, "test.log"), "w")
            if self.__progress:
                testlog = TeeLog(testlog)

            sysTestResults = list()
            volTestResults = list()

            try:
                self._printLine()
                self._printLine('Starting system tests...')
                for test in self.__config.getSystemTests():
                    sysTestResults.append(self._runSystemTest(test, testlog))
                self._printLine('System tests complete.')

                self._printLine()
                self._printLine('Starting volume tests...')
                for test in self.__config.getVolumeTests():
                    volTestResults.append(self._runSingleTest(test, testlog))
            finally:
                # Close the log even when a test run is aborted.
                testlog.close()

            failed = (not all(sysTestResults)) or any(
                not passed
                for testResult in volTestResults
                for passed in testResult.values())

            self._writeHtml(volTestResults, sysTestResults)
            self._writeJSON(volTestResults, sysTestResults)
            self._printLine('Volume tests complete.')
            self._printLine()
            return failed
        except KeyboardInterrupt:
            self._printLine()
            self._printLine('Tests aborted.')
            self._printLine()
            self.stop()
            raise

    def _writeHtml(self, volTestResults, sysTestResults):
        """Write result.html, a table with one row per executed test.

        sysTestResults holds one boolean per system test and volTestResults
        one {volume config name: boolean} dict per volume test, both in the
        order in which the configuration lists the tests.
        """
        with open(os.path.join(self.__test_dir, "result.html"), "w") as out:
            out.write('<HTML><HEAD><TITLE>XtreemFS Autotest - </TITLE></HEAD>' + "\n")
            out.write('<BODY>' + "\n")
            out.write('<TABLE>' + "\n")
            out.write('<TR>' + "\n")
            out.write('<TD><B>Test</B></TD>')
            out.write('<TD><B>Volume Configs</B></TD>')
            out.write('</TR>' + "\n")

            for test, passed in zip(self.__config.getSystemTests(), sysTestResults):
                out.write('<TR>' + "\n")
                out.write('<TD>' + test['name'] + '</TD>')
                out.write('<TD>')
                if passed:
                    out.write('ok<BR>')
                else:
                    out.write('FAILED<BR>')
                out.write('</TD>')
                out.write('</TR>' + "\n")

            for test, perVolume in zip(self.__config.getVolumeTests(), volTestResults):
                out.write('<TR>' + "\n")
                out.write('<TD>' + test['name'] + '</TD>')
                out.write('<TD>')
                for k, v in perVolume.items():
                    out.write(k)
                    out.write(":")
                    if v:
                        out.write('ok<BR>')
                    else:
                        out.write('FAILED<BR>')
                out.write('</TD>')
                out.write('</TR>' + "\n")

            out.write('</TABLE></BODY></HTML>' + "\n")

    def _writeJSON(self, volTestResults, sysTestResults):
        """Write result.json, mapping each test name to its result.

        A system test maps to a boolean, a volume test to its
        {volume config name: boolean} dict. A volume test with the same name
        as a system test replaces the entry of the system test.
        """
        results = {}
        for test, passed in zip(self.__config.getSystemTests(), sysTestResults):
            results[test['name']] = passed

        for test, perVolume in zip(self.__config.getVolumeTests(), volTestResults):
            results[test['name']] = perVolume

        with open(os.path.join(self.__test_dir, "result.json"), "w") as out:
            json.dump(results, out)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
xtreemfs_dir = os.path.join(os.path.dirname( os.path.abspath( sys.modules[__name__].__file__ ) ), "..")
|
|
|
|
option_parser = OptionParser( "usage: %prog [options] test_set_name")
|
|
option_parser.add_option( "-t", "--test-dir", action="store", dest="test_dir", default='/tmp/xtreemfs_xtestenv/', help="Directory where all temporary test files will be stored.")
|
|
option_parser.add_option( "-v", "--volume", action="store", dest="volume", help="Test a specific volume config.")
|
|
option_parser.add_option( "--clean-test-dir", action="store_true", dest="clean_test_dir", help="Remove content of --test-dir before executing the test")
|
|
option_parser.add_option( "-c", "--config-file-path", action="store", dest="config_file_path", default='test_config.py')
|
|
option_parser.add_option( "-d", "--debug-level", action="store", dest="debug_level", default=6 )
|
|
option_parser.add_option( "-x", "--xtreemfs-base-dir", action="store", dest="xtreemfs_dir", default=xtreemfs_dir)
|
|
option_parser.add_option( "-e", "--start-test-env", action="store_true", dest="start_test_env")
|
|
option_parser.add_option( "-f", "--start-test-env-with-ssl", action="store_true", dest="start_test_env_with_ssl", help="Same as -e, but with SSL enabled.")
|
|
option_parser.add_option( "-s", "--storage-threads", action="store", dest="storage_threads", default=1, help="Number of storage threads per OSD.")
|
|
option_parser.add_option( "-p", "--progress", action="store_true", dest="progress", help="Show progress by printing the test script output to stdout.")
|
|
|
|
if len(sys.argv) == 1:
|
|
option_parser.print_help()
|
|
sys.exit(2)
|
|
options, positional_args = option_parser.parse_args()
|
|
|
|
print 'Reading config from:', options.config_file_path
|
|
print 'XtreemFS base dir :', options.xtreemfs_dir
|
|
print 'Test dir :', options.test_dir
|
|
print 'Debug level :', options.debug_level
|
|
|
|
if options.clean_test_dir and os.path.exists(options.test_dir):
|
|
print "Cleaning test dir : " + options.test_dir
|
|
try:
|
|
for dir_name in ( "config", "run", "data", "log", "mnt" ):
|
|
shutil.rmtree(os.path.join(options.test_dir, dir_name))
|
|
except:
|
|
print "Failed to clean all directories below: " + options.test_dir
|
|
|
|
if options.start_test_env or options.start_test_env_with_ssl:
|
|
manual_testset = 'manual'
|
|
if options.start_test_env_with_ssl:
|
|
manual_testset = 'manual-ssl'
|
|
print "Starting test environment (", manual_testset, ")..."
|
|
testEnv = TestEnvironment(options.config_file_path,
|
|
manual_testset,
|
|
options.volume,
|
|
options.xtreemfs_dir,
|
|
options.test_dir,
|
|
options.debug_level,
|
|
options.storage_threads,
|
|
options.progress)
|
|
testEnv.start()
|
|
try:
|
|
print
|
|
print "*** Press enter to shut down test environment ***"
|
|
raw_input()
|
|
except KeyboardInterrupt:
|
|
pass
|
|
testEnv.stop()
|
|
|
|
|
|
else:
|
|
# run tests
|
|
if len(positional_args) is not 1:
|
|
option_parser.print_usage(sys.stdout)
|
|
sys.exit(2)
|
|
|
|
print 'Executing test set :', positional_args[0]
|
|
|
|
testEnv = TestEnvironment(options.config_file_path,
|
|
positional_args[0],
|
|
options.volume,
|
|
options.xtreemfs_dir,
|
|
options.test_dir,
|
|
options.debug_level,
|
|
options.storage_threads,
|
|
options.progress)
|
|
testEnv.start()
|
|
try:
|
|
failed = testEnv.runTests()
|
|
except:
|
|
traceback.print_exc()
|
|
failed = True
|
|
testEnv.stop()
|
|
|
|
if failed:
|
|
print 'FAILED!'
|
|
sys.exit(1)
|
|
else:
|
|
print 'SUCCESS.'
|