/***********************************************************************
 *
 *  Copyright (C) 2006 Novell, Inc. All Rights Reserved.
 *
 *  This library is free software; you can redistribute it and/or
 *  modify it under the terms of the GNU Lesser General Public
 *  License as published by the Free Software Foundation; version 2.1
 *  of the License.
 *
 *  This library is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 *  Library Lesser General Public License for more details.
 *
 *  You should have received a copy of the GNU Lesser General Public
 *  License along with this library; if not, Novell, Inc.
 *
 *  To contact Novell about this file by physical or electronic mail,
 *  you may find current contact information at www.novell.com.
 *
 *  Author: Juan Carlos Luciani
 *
 ***********************************************************************/

//===[ Include files ]=====================================================

#include "ipcint.h"
#include "cchannel.h"
#include "clientreq.h"
#include "remoteendpoint.h"
#include <assert.h>     // Ensure that NDEBUG is defined for release builds!
#include <string.h>     // memset, strlen, strncpy
//===[ External data ]===================================================== //===[ External prototypes ]=============================================== //===[ Manifest constants ]================================================ #define REMOTE_ENDPOINT_SIGNATURE 0X50454D52 // RMEP //===[ Type definitions ]================================================== //===[ Function prototypes ]=============================================== //===[ Global variables ]================================================== //===[ Type definitions ]================================================== //===[ Function prototypes ]=============================================== //===[ Global variables ]================================================== // // Object Counters // unsigned long numRemoteEndPointObjects = 0; //++======================================================================= RemoteEndPoint::RemoteEndPoint( bool multithreaded, int maxRpcRetries, char *pSocketFileName) : m_signature (REMOTE_ENDPOINT_SIGNATURE), m_numChannelSubmits (0), m_maxRpcRetries (maxRpcRetries) // // Arguments: // // Returns: // // Abstract: // // Notes: // // L2 //=======================================================================-- { DbgTrace(1, "RemoteEndPoint::RemoteEndPoint- Start, Obj = %0X\n", this); // Verify that the specified path is not too long if (strlen(pSocketFileName) < sizeof(m_serverUnAddr.sun_path)) { // Initialize our mutex pthread_mutex_init(&m_mutex, NULL); // Set the necessary information in the m_serverUnAddr variable m_serverUnAddr.sun_family = AF_UNIX; strncpy(m_serverUnAddr.sun_path, pSocketFileName, sizeof(m_serverUnAddr.sun_path) - 1); // Set the necessary flags to indicate that DOMAIN sockets // should be used for communications. m_Use_PF_UNIX = true; m_Use_AF_INET = false; // Setup the number of channels that we may have based on // whether the application is multi-threaded or not. 
if (multithreaded) m_numCChannels = MAX_CHANNELS_PER_ENDPOINT; else m_numCChannels = 1; // Instantiate entries in SmartCChannel vector try { for (int i = 0; i < m_numCChannels; i++) m_cchannelVector.push_back(SmartCChannelPointer()); } catch (...) { DbgTrace(0, "RemoteEndPoint::RemoteEndPoint- Exception caught while initializing the cchannelVector\n", 0); pthread_mutex_destroy(&m_mutex); throw bad_alloc(); } } else { DbgTrace(0, "RemoteEndPoint::RemoteEndPoint- Socket file path name too long\n", 0); throw bad_alloc(); } // Increment the object count InterlockedIncrement(&numRemoteEndPointObjects); DbgTrace(1, "RemoteEndPoint::RemoteEndPoint- End\n", 0); } /*-- RemoteEndPoint::RemoteEndPoint() --*/ //++======================================================================= RemoteEndPoint::RemoteEndPoint( bool multithreaded, int maxRpcRetries, unsigned short int port, uint32_t address) : m_signature (REMOTE_ENDPOINT_SIGNATURE), m_numChannelSubmits (0), m_maxRpcRetries (maxRpcRetries) // // Arguments: // // Returns: // // Abstract: // // Notes: // // L2 //=======================================================================-- { DbgTrace(1, "RemoteEndPoint::RemoteEndPoint- Start, Obj = %0X\n", this); // Initialize our mutex pthread_mutex_init(&m_mutex, NULL); // Set the necessary information in the serverInAddr variable m_serverInAddr.sin_family = AF_INET; m_serverInAddr.sin_port = htons(port); m_serverInAddr.sin_addr.s_addr = htonl(address); // Set the necessary flags to indicate that TCP sockets // should be used for communications. m_Use_AF_INET = true; m_Use_PF_UNIX = false; // Setup the number of channels that we may have based on // whether the application is multi-threaded or not. if (multithreaded) m_numCChannels = MAX_CHANNELS_PER_ENDPOINT; else m_numCChannels = 1; // Instantiate entries in SmartCChannel vector try { for (int i = 0; i < m_numCChannels; i++) m_cchannelVector.push_back(SmartCChannelPointer()); } catch (...) 
{ DbgTrace(0, "RemoteEndPoint::RemoteEndPoint- Exception caught while initializing the cchannelVector\n", 0); pthread_mutex_destroy(&m_mutex); throw bad_alloc(); } // Increment the object count InterlockedIncrement(&numRemoteEndPointObjects); DbgTrace(1, "RemoteEndPoint::RemoteEndPoint- End\n", 0); } /*-- RemoteEndPoint::RemoteEndPoint() --*/ //++======================================================================= RemoteEndPoint::~RemoteEndPoint(void) // // Arguments: // // Returns: // // Abstract: // // Notes: // // L2 //=======================================================================-- { DbgTrace(1, "RemoteEndPoint::~RemoteEndPoint- Start, Obj = %0X\n", this); // Clean up all allocated SmartCChannel objects for (int i = 0; i < m_cchannelVector.size(); i++) { if (m_cchannelVector[i].getPointer() != NULL) { CChannel *pCChannel = *(m_cchannelVector[i].getPointer()); pCChannel->closeChannel(); } } m_cchannelVector.clear(); // Decrement the object count InterlockedDecrement(&numRemoteEndPointObjects); DbgTrace(1, "RemoteEndPoint::~RemoteEndPoint- End\n", 0); } /*-- RemoteEndPoint::~RemoteEndPoint() --*/ //++======================================================================= SmartCChannel* RemoteEndPoint::getCChannel(void) // // Arguments: // // Returns: // // Abstract: // // Notes: // // L2 //=======================================================================-- { SmartCChannel *pSmartCChannel = NULL; int channelSelector = (m_numChannelSubmits++) % m_numCChannels; DbgTrace(1, "RemoteEndPoint::getCChannel- Start, Obj = %0X\n", this); #if DEBUG assert(m_signature == REMOTE_ENDPOINT_SIGNATURE); #endif // Obtain our mutex pthread_mutex_lock(&m_mutex); // Check if there is an available and usable channel for the client if (m_cchannelVector[channelSelector].getPointer() != NULL && (*m_cchannelVector[channelSelector].getPointer())->ok()) { // Use the available channel pSmartCChannel = new SmartCChannel(*m_cchannelVector[channelSelector].getPointer()); } else 
{ // The channel is either unavailable or unusable, clean up // the channel if it is indeed unusable. if (m_cchannelVector[channelSelector].getPointer() != NULL) { // Clean up the channel CChannel *pCChannel = *m_cchannelVector[channelSelector].getPointer(); pCChannel->closeChannel(); delete m_cchannelVector[channelSelector].getPointer(); m_cchannelVector[channelSelector].setPointer(NULL); } CChannel *pCChannel = NULL; try { // Use the appropriate server address when instantiating // the CChannel object. if (m_Use_PF_UNIX) { // PF_UNIX pCChannel = new CChannel(&m_serverUnAddr); } else { // Assume AF_INET pCChannel = new CChannel(&m_serverInAddr); } // CChannel object created, now associate a SmartCChannel // object with it. It is important to do this to keep // the object from being deleted as we initialize it. m_cchannelVector[channelSelector].setPointer(new SmartCChannel(pCChannel)); // Initialize the CChannel if (pCChannel->init() == 0) { // CChannel initialization succeeded, use it to // satisfy the caller. pSmartCChannel = new SmartCChannel(*m_cchannelVector[channelSelector].getPointer()); } else { // CChannel initialization failed delete m_cchannelVector[channelSelector].getPointer(); m_cchannelVector[channelSelector].setPointer(NULL); } } catch (...) 
{ DbgTrace(0, "getCChannel- Exception caught\n", 0); // Try to clean things up just in case if (m_cchannelVector[channelSelector].getPointer()) { delete m_cchannelVector[channelSelector].getPointer(); m_cchannelVector[channelSelector].setPointer(NULL); } else { if (pCChannel != NULL) delete pCChannel; } } } // Release client mutex pthread_mutex_unlock(&m_mutex); DbgTrace(1, "RemoteEndPoint::getCChannel- End\n", 0); return pSmartCChannel; } /*-- RemoteEndPoint::getCChannel() --*/ //++======================================================================= int RemoteEndPoint::submitReq( char *pClientData, uint32_t clientDataLen, char **ppServerData, uint32_t *pServerDataLen) // // Arguments: // // Returns: // // Abstract: // // Notes: // // L2 //=======================================================================-- { int retStatus = -1; DbgTrace(1, "RemoteEndPoint::submitReq- Start, Obj = %0X\n", this); try { SmartCChannel *pSmartCChannel; // Perform the following in a loop to deal with abnormal connection terminations unsigned long rpcRetryCount = 0; bool okToRetry = true; while (rpcRetryCount < m_maxRpcRetries) { // Get SmartCChannel pSmartCChannel = getCChannel(); if (pSmartCChannel != NULL) { // Get pointer to channel object CChannel *pCChannel = *pSmartCChannel; // Allocate a requestId uint32_t reqId = pCChannel->allocReqId(); // Allocate client request object. ClientReq clientReq(reqId); // Submit the request via the channel if (pCChannel->submitReq(reqId, clientReq, pClientData, clientDataLen) == 0) { // Request submission over the channel succeeded, now // wait for the completion of the request. if (clientReq.waitForCompletion(ppServerData, pServerDataLen) == 0) { // Now proceed based on the completion status ClientReq::CompletionStatus compStatus = clientReq.completionStatus(); if (compStatus == ClientReq::SuccessCompletionStatus) { // Success retStatus = 0; } else { // Received a failure from the server, do not retry. 
okToRetry = false; } } else { // Error waiting for the completion. okToRetry = false; } // Remove the request from the channel pCChannel->removeReq(reqId); } else { DbgTrace(0, "RemoteEndPoint::submitReq- Request submittion over the channel failed\n", 0); // Remove the request from the channel pCChannel->removeReq(reqId); } // Delete the SmartCChannel delete pSmartCChannel; } else { DbgTrace(0, "RemoteEndPoint::submitReq- Channel unavailable\n", 0); } // Stop trying if the RPC succeeded or if it is not ok to retry if (retStatus == 0 || !okToRetry) break; // Account for this RPC try rpcRetryCount ++; DbgTrace(0, "RemoteEndPoint::submitReq- Will attempt to retry RPC, count = %d\n", rpcRetryCount); } } catch(...) { DbgTrace(0, "RemoteEndPoint::submitReq- Exception caught\n", 0); } DbgTrace(1, "RemoteEndPoint::submitReq- End\n", 0); return retStatus; } /*-- RemoteEndPoint::submitReq() --*/ //========================================================================= //=========================================================================