Files
xtreemfs/cpp/src/libxtreemfs/client_implementation.cpp
2020-09-22 02:25:22 +02:00

568 lines
19 KiB
C++

/*
* Copyright (c) 2011-2014 by Michael Berlin, Zuse Institute Berlin
* 2010-2011 by Patrick Schaefer, Zuse Institute Berlin
*
*
* Licensed under the BSD License, see LICENSE file for details.
*
*/
#include "libxtreemfs/client_implementation.h"
#include <cstdlib>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include "libxtreemfs/async_write_handler.h"
#include "libxtreemfs/execute_sync_request.h"
#include "libxtreemfs/helper.h"
#include "libxtreemfs/options.h"
#include "libxtreemfs/pbrpc_url.h"
#include "libxtreemfs/uuid_iterator.h"
#include "libxtreemfs/vivaldi.h"
#include "libxtreemfs/volume_implementation.h"
#include "libxtreemfs/xtreemfs_exception.h"
#include "util/logging.h"
#include "util/error_log.h"
#include "xtreemfs/DIRServiceClient.h"
#include "xtreemfs/MRCServiceClient.h"
#include "xtreemfs/OSDServiceClient.h"
using namespace std;
using namespace xtreemfs::pbrpc;
using namespace xtreemfs::util;
namespace xtreemfs {
static void AddAddresses(const ServiceAddresses& service_addresses,
SimpleUUIDIterator* uuid_iterator) {
ServiceAddresses::Addresses as_list = service_addresses.GetAddresses();
for (ServiceAddresses::Addresses::const_iterator iter = as_list.begin();
iter != as_list.end(); ++iter) {
uuid_iterator->AddUUID(*iter);
}
}
DIRUUIDResolver::DIRUUIDResolver(
const ServiceAddresses& dir_addresses,
const pbrpc::UserCredentials& user_credentials,
const Options& options)
: dir_service_user_credentials_(user_credentials),
options_(options) {
AddAddresses(dir_addresses, &dir_service_addresses_);
// Currently no AUTH is needed to access the DIR.
dir_service_auth_.set_auth_type(AUTH_NONE);
}
void DIRUUIDResolver::Initialize(xtreemfs::rpc::Client* network_client) {
dir_service_client_.reset(new DIRServiceClient(network_client));
}
void DIRUUIDResolver::UUIDToAddress(const std::string& uuid,
std::string* address) {
UUIDToAddressWithOptions(uuid, address, RPCOptionsFromOptions(options_));
}
void DIRUUIDResolver::UUIDToAddressWithOptions(const std::string& uuid,
std::string* address,
const RPCOptions& options) {
// The UUID must never be empty.
assert(!uuid.empty());
// Try to search in cache.
*address = uuid_cache_.get(uuid);
if (!address->empty()) {
return; // Cache-Hit.
}
addressMappingGetRequest rq = addressMappingGetRequest();
rq.set_uuid(uuid);
boost::scoped_ptr<rpc::SyncCallbackBase> response(
ExecuteSyncRequest(
boost::bind(
&xtreemfs::pbrpc::DIRServiceClient::
xtreemfs_address_mappings_get_sync,
dir_service_client_.get(),
_1,
boost::cref(dir_service_auth_),
boost::cref(dir_service_user_credentials_),
&rq),
&dir_service_addresses_,
NULL,
options,
true));
boost::unordered_set<string> local_networks = GetNetworks();
AddressMappingSet* set = static_cast<AddressMappingSet*>(
response->response());
AddressMapping found_address_mapping;
for (int i = 0; i < set->mappings_size(); i++) {
const AddressMapping& am = set->mappings(i);
if (am.protocol() != PBRPCURL::GetSchemePBRPC()
&& am.protocol() != PBRPCURL::GetSchemePBRPCS()
&& am.protocol() != PBRPCURL::GetSchemePBRPCG()
&& am.protocol() != PBRPCURL::GetSchemePBRPCU()) {
Logging::log->getLog(LEVEL_ERROR)
<< "Unknown scheme: " << am.protocol() << endl;
response->DeleteBuffers();
throw UnknownAddressSchemeException("Unknown scheme: " + am.protocol());
}
const string& network = am.match_network();
// Prefer the UUID for a matching network, use the default otherwise.
if (network == "*") {
found_address_mapping = am;
} else {
boost::unordered_set<string>::const_iterator local_network
= local_networks.find(network);
if (local_network != local_networks.end()) {
found_address_mapping = am;
break;
}
}
}
if (found_address_mapping.IsInitialized()) {
uuid_cache_.update(uuid,
found_address_mapping.address(),
found_address_mapping.port(),
found_address_mapping.ttl_s());
ostringstream s;
s << found_address_mapping.address() << ":" << found_address_mapping.port();
if (Logging::log->loggingActive(LEVEL_DEBUG)) {
Logging::log->getLog(LEVEL_DEBUG)
<< "Service found for UUID: " << s.str() << endl;
}
response->DeleteBuffers();
*address = s.str();
} else {
Logging::log->getLog(LEVEL_ERROR)
<< "Service not found for UUID: " << uuid << endl;
response->DeleteBuffers();
throw AddressToUUIDNotFoundException(uuid);
}
}
void DIRUUIDResolver::VolumeNameToMRCUUID(const std::string& volume_name,
std::string* mrc_uuid) {
if (Logging::log->loggingActive(LEVEL_DEBUG)) {
Logging::log->getLog(LEVEL_DEBUG)
<< "MRC: searching volume on MRC: " << volume_name << endl;
}
// Check if there is a @ in the volume_name.
// Everything behind the @ has to be removed as it identifies the snapshot.
string parsed_volume_name = volume_name;
size_t at_pos = volume_name.find("@");
if (at_pos != string::npos) {
parsed_volume_name = volume_name.substr(0, at_pos);
}
serviceGetByNameRequest rq = serviceGetByNameRequest();
rq.set_name(parsed_volume_name);
boost::scoped_ptr<rpc::SyncCallbackBase> response(
ExecuteSyncRequest(
boost::bind(
&xtreemfs::pbrpc::DIRServiceClient::
xtreemfs_service_get_by_name_sync,
dir_service_client_.get(),
_1,
boost::cref(dir_service_auth_),
boost::cref(dir_service_user_credentials_),
&rq),
&dir_service_addresses_,
NULL,
RPCOptionsFromOptions(options_),
true));
ServiceSet* service_set = static_cast<ServiceSet*>(response->response());
*mrc_uuid = "";
for (int i = 0; i < service_set->services_size(); i++) {
Service service = service_set->services(i);
if ((service.type() == SERVICE_TYPE_VOLUME)
&& (service.name() == parsed_volume_name)) {
const ServiceDataMap& data = service.data();
for (int j = 0; j < data.data_size(); j++) {
if (data.data(j).key() == "mrc") {
*mrc_uuid = data.data(j).value();
break;
}
}
}
}
response->DeleteBuffers();
if (mrc_uuid->empty()) {
Logging::log->getLog(LEVEL_ERROR) << "No MRC found for volume: "
<< volume_name << std::endl;
throw VolumeNotFoundException(volume_name);
}
}
void DIRUUIDResolver::VolumeNameToMRCUUID(const std::string& volume_name,
SimpleUUIDIterator* uuid_iterator) {
assert(uuid_iterator);
if (Logging::log->loggingActive(LEVEL_DEBUG)) {
Logging::log->getLog(LEVEL_DEBUG)
<< "MRC: searching volume on MRC: " << volume_name << endl;
}
// Check if there is a @ in the volume_name.
// Everything behind the @ has to be removed as it identifies the snapshot.
string parsed_volume_name = volume_name;
size_t at_pos = volume_name.find("@");
if (at_pos != string::npos) {
parsed_volume_name = volume_name.substr(0, at_pos);
}
serviceGetByNameRequest rq = serviceGetByNameRequest();
rq.set_name(parsed_volume_name);
boost::scoped_ptr<rpc::SyncCallbackBase> response(
ExecuteSyncRequest(
boost::bind(
&xtreemfs::pbrpc::DIRServiceClient::
xtreemfs_service_get_by_name_sync,
dir_service_client_.get(),
_1,
boost::cref(dir_service_auth_),
boost::cref(dir_service_user_credentials_),
&rq),
&dir_service_addresses_,
NULL,
RPCOptionsFromOptions(options_),
true));
bool mrc_found = false;
ServiceSet* service_set = static_cast<ServiceSet*>(response->response());
for (int i = 0; i < service_set->services_size(); i++) {
Service service = service_set->services(i);
if ((service.type() == SERVICE_TYPE_VOLUME)
&& (service.name() == parsed_volume_name)) {
const ServiceDataMap& data = service.data();
for (int j = 0; j < data.data_size(); j++) {
if (data.data(j).key().substr(0, 3) == "mrc") {
if (Logging::log->loggingActive(LEVEL_DEBUG)) {
Logging::log->getLog(LEVEL_DEBUG)
<< "MRC with UUID: " << data.data(j).value()
<< " added (key: " << data.data(j).key() << ")." << std::endl;
}
uuid_iterator->AddUUID(data.data(j).value());
mrc_found = true;
}
}
}
}
response->DeleteBuffers();
if (!mrc_found) {
Logging::log->getLog(LEVEL_ERROR) << "No MRC found for volume: "
<< volume_name << std::endl;
throw VolumeNotFoundException(volume_name);
}
}
ClientImplementation::ClientImplementation(
const ServiceAddresses& dir_service_addresses,
const pbrpc::UserCredentials& user_credentials,
const rpc::SSLOptions* ssl_options,
const Options& options)
: was_shutdown_(false),
options_(options),
dir_service_ssl_options_(ssl_options),
uuid_resolver_(dir_service_addresses,
user_credentials,
options) {
// Set bogus auth object.
auth_bogus_.set_auth_type(AUTH_NONE);
initialize_logger(options.log_level_string,
options.log_file_path,
LEVEL_WARN);
initialize_error_log(20);
if (options_.vivaldi_enable) {
vivaldi_.reset(new Vivaldi(dir_service_addresses,
GetUUIDResolver(),
options_));
}
if (Logging::log->loggingActive(LEVEL_DEBUG)) {
Logging::log->getLog(LEVEL_DEBUG) << "Created a new libxtreemfs Client "
"object (version " << options.version_string << ")" << endl;
}
}
ClientImplementation::~ClientImplementation() {
Shutdown();
if (!list_open_volumes_.empty()) {
string error = "Client::~Client(): Not all XtreemFS volumes were closed."
" Did you forget to call Client::Shutdown()? Memory leaks are the"
" consequence.";
Logging::log->getLog(LEVEL_ERROR) << error << endl;
ErrorLog::error_log->AppendError(error);
}
network_client_->shutdown();
network_client_thread_->join();
// Since we wait for outstanding requests, the RPC client (network_client_)
// has to shutdown first and then we can wait for the Vivaldi thread.
// The other way around a deadlock might occur.
if (vivaldi_thread_.get() && vivaldi_thread_->joinable()) {
vivaldi_thread_->join();
}
atexit(google::protobuf::ShutdownProtobufLibrary);
shutdown_logger();
shutdown_error_log();
}
void ClientImplementation::Start() {
// start network (rpc) client
network_client_.reset(new xtreemfs::rpc::Client(
options_.connect_timeout_s,
options_.request_timeout_s,
options_.linger_timeout_s,
dir_service_ssl_options_));
network_client_thread_.reset(
new boost::thread(boost::bind(&xtreemfs::rpc::Client::run,
network_client_.get())));
GenerateVersion4UUID(&client_uuid_);
assert(!client_uuid_.empty());
uuid_resolver_.Initialize(network_client_.get());
// Start vivaldi thread if configured
if (options_.vivaldi_enable) {
if (Logging::log->loggingActive(LEVEL_INFO)) {
Logging::log->getLog(LEVEL_INFO)
<< "Starting vivaldi..." << endl;
}
vivaldi_->Initialize(network_client_.get());
vivaldi_thread_.reset(new boost::thread(boost::bind(&xtreemfs::Vivaldi::Run,
vivaldi_.get())));
}
async_write_callback_thread_.reset(
new boost::thread(&xtreemfs::AsyncWriteHandler::ProcessCallbacks,
boost::ref(async_write_callback_queue_)));
}
void ClientImplementation::Shutdown() {
if (!was_shutdown_) {
was_shutdown_ = true;
boost::mutex::scoped_lock lock(list_open_volumes_mutex_);
// Issue Close() on every Volume and remove it's pointer.
list<VolumeImplementation*>::iterator it;
while (!list_open_volumes_.empty()) {
it = list_open_volumes_.begin();
(*it)->CloseInternal();
delete *it;
it = list_open_volumes_.erase(it);
}
if (async_write_callback_thread_->joinable()) {
async_write_callback_thread_->interrupt();
async_write_callback_thread_->join();
}
// Stop vivaldi thread if running
if (vivaldi_thread_.get() && vivaldi_thread_->joinable()) {
vivaldi_thread_->interrupt();
}
}
}
Volume* ClientImplementation::OpenVolume(
const std::string& volume_name,
const xtreemfs::rpc::SSLOptions* ssl_options,
const Options& options) {
// TODO(mberlin): Fix possible leak through the use of scoped_ptr and swap().
SimpleUUIDIterator* mrc_uuid_iterator = new SimpleUUIDIterator;
uuid_resolver_.VolumeNameToMRCUUID(volume_name, mrc_uuid_iterator);
VolumeImplementation* volume = new VolumeImplementation(
this,
client_uuid_,
mrc_uuid_iterator,
volume_name,
ssl_options,
options);
{
boost::mutex::scoped_lock lock(list_open_volumes_mutex_);
list_open_volumes_.push_back(volume);
}
volume->Start();
return volume;
}
void ClientImplementation::CloseVolume(xtreemfs::Volume* volume) {
boost::mutex::scoped_lock lock(list_open_volumes_mutex_);
// Find given volume pointer address in list of open volumes and erase it.
// Free it afterwards.
list<VolumeImplementation*>::iterator it;
for (it = list_open_volumes_.begin(); it != list_open_volumes_.end(); ++it) {
if (*it == volume) {
// Free Volume object.
delete *it;
it = list_open_volumes_.erase(it);
}
}
}
void ClientImplementation::CreateVolume(
const ServiceAddresses& mrc_address,
const xtreemfs::pbrpc::Auth& auth,
const xtreemfs::pbrpc::UserCredentials& user_credentials,
const std::string& volume_name,
int mode,
const std::string& owner_username,
const std::string& owner_groupname,
const xtreemfs::pbrpc::AccessControlPolicyType& access_policy,
long volume_quota,
const xtreemfs::pbrpc::StripingPolicyType& default_striping_policy_type,
int default_stripe_size,
int default_stripe_width,
const std::list<xtreemfs::pbrpc::KeyValuePair*>& volume_attributes) {
MRCServiceClient mrc_service_client(network_client_.get());
xtreemfs::pbrpc::Volume new_volume;
new_volume.set_id("");
new_volume.set_mode(mode);
new_volume.set_name(volume_name);
new_volume.set_owner_user_id(owner_username);
new_volume.set_owner_group_id(owner_groupname);
new_volume.set_access_control_policy(access_policy);
new_volume.set_quota(volume_quota);
new_volume.mutable_default_striping_policy()
->set_type(default_striping_policy_type);
new_volume.mutable_default_striping_policy()
->set_stripe_size(default_stripe_size);
new_volume.mutable_default_striping_policy()->set_width(default_stripe_width);
for (list<KeyValuePair*>::const_iterator it = volume_attributes.begin();
it != volume_attributes.end();
++it) {
new_volume.add_attrs();
new_volume.mutable_attrs(new_volume.attrs_size() - 1)->set_key((*it)->key());
new_volume.mutable_attrs(new_volume.attrs_size() - 1)
->set_value((*it)->value());
}
SimpleUUIDIterator temp_uuid_iterator_with_addresses;
AddAddresses(mrc_address, &temp_uuid_iterator_with_addresses);
boost::scoped_ptr<rpc::SyncCallbackBase> response(
ExecuteSyncRequest(
boost::bind(
&xtreemfs::pbrpc::MRCServiceClient::xtreemfs_mkvol_sync,
&mrc_service_client,
_1,
boost::cref(auth),
boost::cref(user_credentials),
&new_volume),
&temp_uuid_iterator_with_addresses,
NULL,
RPCOptionsFromOptions(options_),
true));
response->DeleteBuffers();
}
void ClientImplementation::DeleteVolume(
const ServiceAddresses& mrc_address,
const xtreemfs::pbrpc::Auth& auth,
const xtreemfs::pbrpc::UserCredentials& user_credentials,
const std::string& volume_name) {
MRCServiceClient mrc_service_client(network_client_.get());
xtreemfs_rmvolRequest rmvol_request;
rmvol_request.set_volume_name(volume_name);
SimpleUUIDIterator temp_uuid_iterator_with_addresses;
AddAddresses(mrc_address, &temp_uuid_iterator_with_addresses);
boost::scoped_ptr<rpc::SyncCallbackBase> response(
ExecuteSyncRequest(
boost::bind(
&xtreemfs::pbrpc::MRCServiceClient::xtreemfs_rmvol_sync,
&mrc_service_client,
_1,
boost::cref(auth),
boost::cref(user_credentials),
&rmvol_request),
&temp_uuid_iterator_with_addresses,
NULL,
RPCOptionsFromOptions(options_),
true));
response->DeleteBuffers();
}
xtreemfs::pbrpc::Volumes* ClientImplementation::ListVolumes(
const ServiceAddresses& mrc_addresses,
const xtreemfs::pbrpc::Auth& auth) {
// Create a MRCServiceClient
MRCServiceClient mrc_service_client(network_client_.get());
// Use bogus user_credentials;
UserCredentials user_credentials;
user_credentials.set_username("xtreemfs");
SimpleUUIDIterator mrc_service_addresses_;
AddAddresses(mrc_addresses, &mrc_service_addresses_);
// Retrieve the list of volumes from the MRC.
boost::scoped_ptr<rpc::SyncCallbackBase> response(
ExecuteSyncRequest(
boost::bind(
&xtreemfs::pbrpc::MRCServiceClient::xtreemfs_lsvol_sync,
&mrc_service_client,
_1,
boost::cref(auth),
boost::cref(user_credentials)),
&mrc_service_addresses_,
NULL,
RPCOptionsFromOptions(options_),
true));
// Delete everything except the response.
delete[] response->data();
delete response->error();
// Return the list of volumes.
return static_cast<xtreemfs::pbrpc::Volumes*>(response->response());
}
/** ClientImplementation already implements UUIDResolver and therefore this
* returns just a cast to this. */
UUIDResolver* ClientImplementation::GetUUIDResolver() {
return &uuid_resolver_;
}
std::string ClientImplementation::UUIDToAddress(const std::string& uuid) {
std::string result;
uuid_resolver_.UUIDToAddress(uuid, &result);
return result;
}
const VivaldiCoordinates& ClientImplementation::GetVivaldiCoordinates() const {
return vivaldi_->GetVivaldiCoordinates();
}
util::SynchronizedQueue<AsyncWriteHandler::CallbackEntry>& ClientImplementation::GetAsyncWriteCallbackQueue() {
return async_write_callback_queue_;
}
} // namespace xtreemfs