/*
 * Copyright (c) 2011-2014 by Michael Berlin, Zuse Institute Berlin
 *               2010-2011 by Patrick Schaefer, Zuse Institute Berlin
 *
 *
 * Licensed under the BSD License, see LICENSE file for details.
 *
 */

#include "libxtreemfs/client_implementation.h"

#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <cassert>
#include <cstdlib>
#include <list>
#include <sstream>
#include <string>

#include "libxtreemfs/async_write_handler.h"
#include "libxtreemfs/execute_sync_request.h"
#include "libxtreemfs/helper.h"
#include "libxtreemfs/options.h"
#include "libxtreemfs/pbrpc_url.h"
#include "libxtreemfs/uuid_iterator.h"
#include "libxtreemfs/vivaldi.h"
#include "libxtreemfs/volume_implementation.h"
#include "libxtreemfs/xtreemfs_exception.h"
#include "util/logging.h"
#include "util/error_log.h"
#include "xtreemfs/DIRServiceClient.h"
#include "xtreemfs/MRCServiceClient.h"
#include "xtreemfs/OSDServiceClient.h"

using namespace std;
using namespace xtreemfs::pbrpc;
using namespace xtreemfs::util;

namespace xtreemfs {

/** Adds every address contained in "service_addresses" to "uuid_iterator". */
static void AddAddresses(const ServiceAddresses& service_addresses,
                         SimpleUUIDIterator* uuid_iterator) {
  const ServiceAddresses::Addresses& as_list = service_addresses.GetAddresses();
  for (ServiceAddresses::Addresses::const_iterator iter = as_list.begin();
       iter != as_list.end();
       ++iter) {
    uuid_iterator->AddUUID(*iter);
  }
}

/** Returns "volume_name" with everything from the first '@' on removed.
 *
 *  The part behind the '@' identifies the snapshot and is not part of the
 *  name under which the volume is looked up at the DIR. */
static string StripSnapshotSuffix(const string& volume_name) {
  const size_t at_pos = volume_name.find('@');
  if (at_pos == string::npos) {
    return volume_name;
  }
  return volume_name.substr(0, at_pos);
}

/** Stores the DIR addresses, the credentials and the options which are used
 *  for all subsequent requests sent to the DIR. */
DIRUUIDResolver::DIRUUIDResolver(
    const ServiceAddresses& dir_addresses,
    const pbrpc::UserCredentials& user_credentials,
    const Options& options)
    : dir_service_user_credentials_(user_credentials),
      options_(options) {
  AddAddresses(dir_addresses, &dir_service_addresses_);

  // Currently no AUTH is needed to access the DIR.
  dir_service_auth_.set_auth_type(AUTH_NONE);
}

/** Creates the DIR service client on top of "network_client".
 *
 *  Has to be called before any of the resolve methods is used. */
void DIRUUIDResolver::Initialize(xtreemfs::rpc::Client* network_client) {
  dir_service_client_.reset(new DIRServiceClient(network_client));
}

/** Resolves "uuid" to "address" using the RPC options derived from the
 *  options given at construction. */
void DIRUUIDResolver::UUIDToAddress(const std::string& uuid,
                                    std::string* address) {
  UUIDToAddressWithOptions(uuid, address, RPCOptionsFromOptions(options_));
}

/** Resolves "uuid" to "address" ("host:port"), asking the DIR on a cache miss.
 *
 *  A mapping whose match_network is one of the local networks is preferred;
 *  otherwise the last default ("*") mapping seen before is used.
 *
 *  @throws UnknownAddressSchemeException    if a returned mapping uses a
 *                                           protocol other than the PBRPC
 *                                           schemes.
 *  @throws AddressToUUIDNotFoundException   if no usable mapping was found.
 */
void DIRUUIDResolver::UUIDToAddressWithOptions(const std::string& uuid,
                                               std::string* address,
                                               const RPCOptions& options) {
  // The UUID must never be empty.
  assert(!uuid.empty());

  // Try to search in cache.
  *address = uuid_cache_.get(uuid);
  if (!address->empty()) {
    return;  // Cache-Hit.
  }

  addressMappingGetRequest rq;
  rq.set_uuid(uuid);

  boost::scoped_ptr<rpc::SyncCallbackBase> response(
      ExecuteSyncRequest(
          boost::bind(
              &xtreemfs::pbrpc::DIRServiceClient::
                  xtreemfs_address_mappings_get_sync,
              dir_service_client_.get(),
              _1,
              boost::cref(dir_service_auth_),
              boost::cref(dir_service_user_credentials_),
              &rq),
          &dir_service_addresses_,
          NULL,
          options,
          true));

  boost::unordered_set<string> local_networks = GetNetworks();

  AddressMappingSet* set = static_cast<AddressMappingSet*>(
      response->response());
  // Copy of the selected mapping: it has to outlive response->DeleteBuffers().
  AddressMapping found_address_mapping;
  for (int i = 0; i < set->mappings_size(); i++) {
    const AddressMapping& am = set->mappings(i);
    const string& protocol = am.protocol();
    if (protocol != PBRPCURL::GetSchemePBRPC() &&
        protocol != PBRPCURL::GetSchemePBRPCS() &&
        protocol != PBRPCURL::GetSchemePBRPCG() &&
        protocol != PBRPCURL::GetSchemePBRPCU()) {
      Logging::log->getLog(LEVEL_ERROR)
          << "Unknown scheme: " << protocol << endl;
      // Build the exception before the buffers (and thereby "am") are freed.
      UnknownAddressSchemeException exception("Unknown scheme: " + protocol);
      response->DeleteBuffers();
      throw exception;
    }

    const string& network = am.match_network();
    // Prefer the UUID for a matching network, use the default otherwise.
    if (network == "*") {
      found_address_mapping = am;
    } else if (local_networks.find(network) != local_networks.end()) {
      found_address_mapping = am;
      break;
    }
  }

  if (found_address_mapping.IsInitialized()) {
    uuid_cache_.update(uuid,
                       found_address_mapping.address(),
                       found_address_mapping.port(),
                       found_address_mapping.ttl_s());

    ostringstream s;
    s << found_address_mapping.address() << ":"
      << found_address_mapping.port();
    if (Logging::log->loggingActive(LEVEL_DEBUG)) {
      Logging::log->getLog(LEVEL_DEBUG)
          << "Service found for UUID: " << s.str() << endl;
    }
    response->DeleteBuffers();
    *address = s.str();
  } else {
    Logging::log->getLog(LEVEL_ERROR)
        << "Service not found for UUID: " << uuid << endl;
    response->DeleteBuffers();
    throw AddressToUUIDNotFoundException(uuid);
  }
}

/** Looks up the volume "volume_name" at the DIR and stores the value of its
 *  "mrc" data entry in "mrc_uuid".
 *
 *  A snapshot suffix ("@...") in "volume_name" is ignored for the lookup.
 *
 *  @throws VolumeNotFoundException   if no MRC UUID was found.
 */
void DIRUUIDResolver::VolumeNameToMRCUUID(const std::string& volume_name,
                                          std::string* mrc_uuid) {
  if (Logging::log->loggingActive(LEVEL_DEBUG)) {
    Logging::log->getLog(LEVEL_DEBUG)
        << "MRC: searching volume on MRC: " << volume_name << endl;
  }

  // Everything behind the @ has to be removed as it identifies the snapshot.
  const string parsed_volume_name = StripSnapshotSuffix(volume_name);

  serviceGetByNameRequest rq;
  rq.set_name(parsed_volume_name);

  boost::scoped_ptr<rpc::SyncCallbackBase> response(
      ExecuteSyncRequest(
          boost::bind(
              &xtreemfs::pbrpc::DIRServiceClient::
                  xtreemfs_service_get_by_name_sync,
              dir_service_client_.get(),
              _1,
              boost::cref(dir_service_auth_),
              boost::cref(dir_service_user_credentials_),
              &rq),
          &dir_service_addresses_,
          NULL,
          RPCOptionsFromOptions(options_),
          true));

  ServiceSet* service_set = static_cast<ServiceSet*>(response->response());
  *mrc_uuid = "";
  for (int i = 0; i < service_set->services_size(); i++) {
    const Service& service = service_set->services(i);
    if ((service.type() == SERVICE_TYPE_VOLUME)
        && (service.name() == parsed_volume_name)) {
      const ServiceDataMap& data = service.data();
      for (int j = 0; j < data.data_size(); j++) {
        if (data.data(j).key() == "mrc") {
          *mrc_uuid = data.data(j).value();
          // Only leaves the inner loop: a later matching service still
          // overwrites the result.
          break;
        }
      }
    }
  }

  response->DeleteBuffers();
  if (mrc_uuid->empty()) {
    Logging::log->getLog(LEVEL_ERROR)
        << "No MRC found for volume: " << volume_name << std::endl;
    throw VolumeNotFoundException(volume_name);
  }
}

/** Looks up the volume "volume_name" at the DIR and adds the value of every
 *  data entry whose key starts with "mrc" to "uuid_iterator".
 *
 *  A snapshot suffix ("@...") in "volume_name" is ignored for the lookup.
 *
 *  @throws VolumeNotFoundException   if no MRC UUID was found.
 */
void DIRUUIDResolver::VolumeNameToMRCUUID(const std::string& volume_name,
                                          SimpleUUIDIterator* uuid_iterator) {
  assert(uuid_iterator);

  if (Logging::log->loggingActive(LEVEL_DEBUG)) {
    Logging::log->getLog(LEVEL_DEBUG)
        << "MRC: searching volume on MRC: " << volume_name << endl;
  }

  // Everything behind the @ has to be removed as it identifies the snapshot.
  const string parsed_volume_name = StripSnapshotSuffix(volume_name);

  serviceGetByNameRequest rq;
  rq.set_name(parsed_volume_name);

  boost::scoped_ptr<rpc::SyncCallbackBase> response(
      ExecuteSyncRequest(
          boost::bind(
              &xtreemfs::pbrpc::DIRServiceClient::
                  xtreemfs_service_get_by_name_sync,
              dir_service_client_.get(),
              _1,
              boost::cref(dir_service_auth_),
              boost::cref(dir_service_user_credentials_),
              &rq),
          &dir_service_addresses_,
          NULL,
          RPCOptionsFromOptions(options_),
          true));

  bool mrc_found = false;
  ServiceSet* service_set = static_cast<ServiceSet*>(response->response());
  for (int i = 0; i < service_set->services_size(); i++) {
    const Service& service = service_set->services(i);
    if ((service.type() == SERVICE_TYPE_VOLUME)
        && (service.name() == parsed_volume_name)) {
      const ServiceDataMap& data = service.data();
      for (int j = 0; j < data.data_size(); j++) {
        if (data.data(j).key().substr(0, 3) == "mrc") {
          if (Logging::log->loggingActive(LEVEL_DEBUG)) {
            Logging::log->getLog(LEVEL_DEBUG)
                << "MRC with UUID: " << data.data(j).value()
                << " added (key: " << data.data(j).key() << ")." << std::endl;
          }
          uuid_iterator->AddUUID(data.data(j).value());
          mrc_found = true;
        }
      }
    }
  }

  response->DeleteBuffers();
  if (!mrc_found) {
    Logging::log->getLog(LEVEL_ERROR)
        << "No MRC found for volume: " << volume_name << std::endl;
    throw VolumeNotFoundException(volume_name);
  }
}

/** Initializes logging and the error log and, if enabled in "options",
 *  creates the Vivaldi object. No threads are started before Start(). */
ClientImplementation::ClientImplementation(
    const ServiceAddresses& dir_service_addresses,
    const pbrpc::UserCredentials& user_credentials,
    const rpc::SSLOptions* ssl_options,
    const Options& options)
    : was_shutdown_(false),
      options_(options),
      dir_service_ssl_options_(ssl_options),
      uuid_resolver_(dir_service_addresses,
                     user_credentials,
                     options) {
  // Set bogus auth object.
  auth_bogus_.set_auth_type(AUTH_NONE);

  initialize_logger(options.log_level_string,
                    options.log_file_path,
                    LEVEL_WARN);
  initialize_error_log(20);

  if (options_.vivaldi_enable) {
    vivaldi_.reset(new Vivaldi(dir_service_addresses,
                               GetUUIDResolver(),
                               options_));
  }

  if (Logging::log->loggingActive(LEVEL_DEBUG)) {
    Logging::log->getLog(LEVEL_DEBUG) << "Created a new libxtreemfs Client "
       "object (version " << options.version_string << ")" << endl;
  }
}

/** Shuts the client down (if not done yet), stops the network client and
 *  waits for the helper threads before the loggers are torn down. */
ClientImplementation::~ClientImplementation() {
  Shutdown();

  if (!list_open_volumes_.empty()) {
    string error = "Client::~Client(): Not all XtreemFS volumes were closed."
        " Did you forget to call Client::Shutdown()? Memory leaks are the"
        " consequence.";
    Logging::log->getLog(LEVEL_ERROR) << error << endl;
    ErrorLog::error_log->AppendError(error);
  }

  // The network client and its thread only exist after Start() was called.
  if (network_client_.get()) {
    network_client_->shutdown();
  }
  if (network_client_thread_.get() && network_client_thread_->joinable()) {
    network_client_thread_->join();
  }

  // Since we wait for outstanding requests, the RPC client (network_client_)
  // has to shutdown first and then we can wait for the Vivaldi thread.
  // The other way around a deadlock might occur.
  if (vivaldi_thread_.get() && vivaldi_thread_->joinable()) {
    vivaldi_thread_->join();
  }

  atexit(google::protobuf::ShutdownProtobufLibrary);

  shutdown_logger();
  shutdown_error_log();
}

/** Starts the network client thread, generates the client UUID and starts
 *  the Vivaldi (if enabled) and async write callback threads. */
void ClientImplementation::Start() {
  // start network (rpc) client
  network_client_.reset(new xtreemfs::rpc::Client(
      options_.connect_timeout_s,
      options_.request_timeout_s,
      options_.linger_timeout_s,
      dir_service_ssl_options_));

  network_client_thread_.reset(
      new boost::thread(boost::bind(&xtreemfs::rpc::Client::run,
                                    network_client_.get())));

  GenerateVersion4UUID(&client_uuid_);
  assert(!client_uuid_.empty());

  uuid_resolver_.Initialize(network_client_.get());

  // Start vivaldi thread if configured
  if (options_.vivaldi_enable) {
    if (Logging::log->loggingActive(LEVEL_INFO)) {
      Logging::log->getLog(LEVEL_INFO) << "Starting vivaldi..." << endl;
    }
    vivaldi_->Initialize(network_client_.get());
    vivaldi_thread_.reset(new boost::thread(boost::bind(&xtreemfs::Vivaldi::Run,
                                                        vivaldi_.get())));
  }

  async_write_callback_thread_.reset(
      new boost::thread(&xtreemfs::AsyncWriteHandler::ProcessCallbacks,
                        boost::ref(async_write_callback_queue_)));
}

/** Closes and frees all open volumes and stops the helper threads.
 *
 *  Only the first call has an effect. Safe to call even if Start() was never
 *  called. */
void ClientImplementation::Shutdown() {
  if (!was_shutdown_) {
    was_shutdown_ = true;

    boost::mutex::scoped_lock lock(list_open_volumes_mutex_);

    // Issue Close() on every Volume and remove its pointer.
    while (!list_open_volumes_.empty()) {
      VolumeImplementation* volume = list_open_volumes_.front();
      volume->CloseInternal();
      delete volume;
      list_open_volumes_.pop_front();
    }

    // The callback thread only exists after Start() was called.
    if (async_write_callback_thread_.get() &&
        async_write_callback_thread_->joinable()) {
      async_write_callback_thread_->interrupt();
      async_write_callback_thread_->join();
    }

    // Stop vivaldi thread if running. It is joined in the destructor after
    // the network client was shut down.
    if (vivaldi_thread_.get() && vivaldi_thread_->joinable()) {
      vivaldi_thread_->interrupt();
    }
  }
}

/** Resolves the MRCs of "volume_name", creates the volume object, registers
 *  it in the list of open volumes and starts it.
 *
 *  The returned volume stays registered with this client until CloseVolume()
 *  or Shutdown() frees it.
 *
 *  @throws VolumeNotFoundException   if no MRC was found for the volume.
 */
Volume* ClientImplementation::OpenVolume(
    const std::string& volume_name,
    const xtreemfs::rpc::SSLOptions* ssl_options,
    const Options& options) {
  SimpleUUIDIterator* mrc_uuid_iterator = new SimpleUUIDIterator;
  try {
    uuid_resolver_.VolumeNameToMRCUUID(volume_name, mrc_uuid_iterator);
  } catch (...) {
    // Nobody else knows the iterator yet: free it before passing the error on.
    delete mrc_uuid_iterator;
    throw;
  }

  // NOTE(review): the iterator is handed over to the volume here; presumably
  // VolumeImplementation takes ownership of it - confirm before adding any
  // cleanup for a throwing VolumeImplementation constructor.
  VolumeImplementation* volume = new VolumeImplementation(
      this,
      client_uuid_,
      mrc_uuid_iterator,
      volume_name,
      ssl_options,
      options);
  {
    boost::mutex::scoped_lock lock(list_open_volumes_mutex_);
    list_open_volumes_.push_back(volume);
  }
  volume->Start();

  return volume;
}

/** Removes "volume" from the list of open volumes and frees it.
 *
 *  Does nothing if "volume" is not in the list. */
void ClientImplementation::CloseVolume(xtreemfs::Volume* volume) {
  boost::mutex::scoped_lock lock(list_open_volumes_mutex_);

  // Find given volume pointer address in list of open volumes and erase it.
  // Free it afterwards.
  list<VolumeImplementation*>::iterator it;
  for (it = list_open_volumes_.begin();
       it != list_open_volumes_.end();
       ++it) {
    if (*it == volume) {
      // Free Volume object.
      delete *it;
      list_open_volumes_.erase(it);
      // "it" is invalid now and must not be incremented. Every volume is
      // added to the list exactly once, so the search is finished.
      break;
    }
  }
}

/** Creates the volume "volume_name" with the given attributes by sending a
 *  mkvol request to one of the MRCs in "mrc_address". */
void ClientImplementation::CreateVolume(
    const ServiceAddresses& mrc_address,
    const xtreemfs::pbrpc::Auth& auth,
    const xtreemfs::pbrpc::UserCredentials& user_credentials,
    const std::string& volume_name,
    int mode,
    const std::string& owner_username,
    const std::string& owner_groupname,
    const xtreemfs::pbrpc::AccessControlPolicyType& access_policy,
    long volume_quota,
    const xtreemfs::pbrpc::StripingPolicyType& default_striping_policy_type,
    int default_stripe_size,
    int default_stripe_width,
    const std::list<xtreemfs::pbrpc::KeyValuePair*>& volume_attributes) {
  MRCServiceClient mrc_service_client(network_client_.get());

  xtreemfs::pbrpc::Volume new_volume;
  new_volume.set_id("");
  new_volume.set_mode(mode);
  new_volume.set_name(volume_name);
  new_volume.set_owner_user_id(owner_username);
  new_volume.set_owner_group_id(owner_groupname);
  new_volume.set_access_control_policy(access_policy);
  new_volume.set_quota(volume_quota);
  new_volume.mutable_default_striping_policy()
      ->set_type(default_striping_policy_type);
  new_volume.mutable_default_striping_policy()
      ->set_stripe_size(default_stripe_size);
  new_volume.mutable_default_striping_policy()->set_width(default_stripe_width);
  for (list<KeyValuePair*>::const_iterator it = volume_attributes.begin();
       it != volume_attributes.end();
       ++it) {
    // add_attrs() returns the newly appended element.
    KeyValuePair* attribute = new_volume.add_attrs();
    attribute->set_key((*it)->key());
    attribute->set_value((*it)->value());
  }

  SimpleUUIDIterator temp_uuid_iterator_with_addresses;
  AddAddresses(mrc_address, &temp_uuid_iterator_with_addresses);

  boost::scoped_ptr<rpc::SyncCallbackBase> response(
      ExecuteSyncRequest(
          boost::bind(
              &xtreemfs::pbrpc::MRCServiceClient::xtreemfs_mkvol_sync,
              &mrc_service_client,
              _1,
              boost::cref(auth),
              boost::cref(user_credentials),
              &new_volume),
          &temp_uuid_iterator_with_addresses,
          NULL,
          RPCOptionsFromOptions(options_),
          true));
  response->DeleteBuffers();
}

/** Deletes the volume "volume_name" by sending a rmvol request to one of the
 *  MRCs in "mrc_address". */
void ClientImplementation::DeleteVolume(
    const ServiceAddresses& mrc_address,
    const xtreemfs::pbrpc::Auth& auth,
    const xtreemfs::pbrpc::UserCredentials& user_credentials,
    const std::string& volume_name) {
  MRCServiceClient mrc_service_client(network_client_.get());

  xtreemfs_rmvolRequest rmvol_request;
  rmvol_request.set_volume_name(volume_name);

  SimpleUUIDIterator temp_uuid_iterator_with_addresses;
  AddAddresses(mrc_address, &temp_uuid_iterator_with_addresses);

  boost::scoped_ptr<rpc::SyncCallbackBase> response(
      ExecuteSyncRequest(
          boost::bind(
              &xtreemfs::pbrpc::MRCServiceClient::xtreemfs_rmvol_sync,
              &mrc_service_client,
              _1,
              boost::cref(auth),
              boost::cref(user_credentials),
              &rmvol_request),
          &temp_uuid_iterator_with_addresses,
          NULL,
          RPCOptionsFromOptions(options_),
          true));
  response->DeleteBuffers();
}

/** Retrieves the list of volumes from one of the MRCs in "mrc_addresses".
 *
 *  The returned object is not freed here (only the data and error buffers of
 *  the response are); the caller has to delete it. */
xtreemfs::pbrpc::Volumes* ClientImplementation::ListVolumes(
    const ServiceAddresses& mrc_addresses,
    const xtreemfs::pbrpc::Auth& auth) {
  // Create a MRCServiceClient
  MRCServiceClient mrc_service_client(network_client_.get());

  // Use bogus user_credentials;
  UserCredentials user_credentials;
  user_credentials.set_username("xtreemfs");

  SimpleUUIDIterator mrc_service_addresses;
  AddAddresses(mrc_addresses, &mrc_service_addresses);

  // Retrieve the list of volumes from the MRC.
  boost::scoped_ptr<rpc::SyncCallbackBase> response(
      ExecuteSyncRequest(
          boost::bind(
              &xtreemfs::pbrpc::MRCServiceClient::xtreemfs_lsvol_sync,
              &mrc_service_client,
              _1,
              boost::cref(auth),
              boost::cref(user_credentials)),
          &mrc_service_addresses,
          NULL,
          RPCOptionsFromOptions(options_),
          true));

  // Delete everything except the response.
  delete[] response->data();
  delete response->error();

  // Return the list of volumes.
  return static_cast<xtreemfs::pbrpc::Volumes*>(response->response());
}

/** Returns the DIR based UUID resolver owned by this client. The pointer
 *  stays valid as long as this client object exists. */
UUIDResolver* ClientImplementation::GetUUIDResolver() {
  return &uuid_resolver_;
}

/** Convenience wrapper which returns the address resolved for "uuid" by the
 *  client's UUID resolver. */
std::string ClientImplementation::UUIDToAddress(const std::string& uuid) {
  std::string result;
  uuid_resolver_.UUIDToAddress(uuid, &result);
  return result;
}

/** Returns the current Vivaldi coordinates.
 *
 *  Must only be called if Vivaldi was enabled in the options, as the Vivaldi
 *  object is not created otherwise. */
const VivaldiCoordinates& ClientImplementation::GetVivaldiCoordinates() const {
  assert(vivaldi_.get() != NULL);
  return vivaldi_->GetVivaldiCoordinates();
}

/** Returns the queue from which the async write callback thread takes the
 *  callbacks it processes. */
util::SynchronizedQueue<AsyncWriteHandler::CallbackEntry>&
    ClientImplementation::GetAsyncWriteCallbackQueue() {
  return async_write_callback_queue_;
}

}  // namespace xtreemfs