Enhanced to allow clients to submit requests to different remote

endpoints.
This commit is contained in:
Juan Carlos Luciani 2006-09-04 06:10:09 +00:00
parent 48d4d947ea
commit 960d27db69
5 changed files with 1064 additions and 461 deletions

View File

@ -56,6 +56,7 @@ CFILES =
CPPFILES = channelproto.cpp \
cchannel.cpp \
clientreq.cpp \
remoteendpoint.cpp \
client.cpp
CSFILES_CSC :=

View File

@ -33,6 +33,7 @@ extern "C" {
#include "cchannel.h"
#include "clientreq.h"
#include "remoteendpoint.h"
//===[ External data ]=====================================================
@ -41,27 +42,10 @@ extern "C" {
//===[ Manifest constants ]================================================
#define MAX_RPC_RETRIES 3
#define MAX_CHANNELS 3
#define DEFAULT_MAX_RPC_RETRIES 3
//===[ Type definitions ]==================================================
//
// Class for maintaining SmartCChannel pointers within the daemonVector.
//
class SmartCChannelPointer
{
private:
SmartCChannel *m_pSmartCChannel;
public:
SmartCChannelPointer() : m_pSmartCChannel(NULL) {}
~SmartCChannelPointer() { if (m_pSmartCChannel != NULL) delete m_pSmartCChannel; }
SmartCChannel* getPointer() { return m_pSmartCChannel; }
void setPointer(SmartCChannel *pSmartCChannel) { m_pSmartCChannel = pSmartCChannel; }
};
//===[ Function prototypes ]===============================================
//===[ Global variables ]==================================================
@ -74,9 +58,8 @@ bool UseSyslog = false;
char unInitialized[] = "Uninitialized";
char *pAppName = unInitialized;
vector<SmartCChannelPointer> cchannelVector;
int numCChannels;
int numChannelSubmits = 0;
// Application threaded information
bool appMultithreaded;
// Client mutex
pthread_mutex_t clientMutex;
@ -84,52 +67,379 @@ pthread_mutex_t clientMutex;
// Mutex for interlocked operations
pthread_mutex_t interlockedMutex;
// Indications
// Indicators
bool svcInitialized = false;
bool serverAddressSet = false;
// Server address variables
bool use_AF_INET;
bool use_PF_UNIX;
struct sockaddr_in serverInAddr = {0};
struct sockaddr_un serverUnAddr = {0};
// Map of open remote endpoints.
//
// This map contains all of the open remote
// endpoint objects. The key used to obtain
// RemoteEndPoint objects from the map is an
// object handle.
//
typedef map<uint32_t, SmartPtr<RemoteEndPoint>*> REPMap;
typedef REPMap::iterator REPMapIter;
typedef pair<REPMapIter, bool> REPIterBoolPair;
REPMap repMap;
// RemoteEndPoint handle allocator
uint32_t remoteEndPointHandleAllocator = 1;
//++=======================================================================
void
ReInitializeIpc(void)
extern "C"
int
IpcClientOpenUnixRemoteEndPoint(
IN char *pSocketFileName,
IN int maxRpcRetries,
INOUT uint32_t *pEndPointHandle)
//
// Arguments In: None.
// Arguments In: port - Server's listening port number.
//
// address - The server's IP Address. Use
// 0x7F000001 if the server is local.
//
// maxRpcRetries - Maximum number of Rpc retries that
// should be utilized when submitting
// a request to the endpoint. A value
// of zero requests that the default
// setting be utilized.
//
// Arguments Out: None.
// Arguments Out: pEndPointHandle - Pointer to variable that will receive
// the endpoint handle.
//
// Returns: Nothing.
// Returns: 0 == Success
// -1 == Failure
//
// Abstract: Method to re-initialize the IPC infrastructure for process.
// Abstract: Method to open a UNIX (PF_UNIX) remote endpoint.
//
// Note: The service should have been initialized before calling
// this procedure.
//
// L2
// L0
//=======================================================================--
{
CChannel *pCChannel;
int retStatus = -1;
DbgTrace(1, "ReInitializeIpc- Start\n", 0);
DbgTrace(1, "IpcClientOpenUnixRemoteEndPoint- Start\n", 0);
// Clean up all allocated SmartCChannel objects
for (int i = 0; i < cchannelVector.size(); i++)
// Verify the input parameters
if (pSocketFileName == NULL
|| pEndPointHandle == NULL)
{
// Close the channel if present
if (cchannelVector[i].getPointer() != NULL)
{
pCChannel = *(cchannelVector[i].getPointer());
pCChannel->closeChannel();
// Free the SmartCChannel
delete cchannelVector[i].getPointer();
cchannelVector[i].setPointer(NULL);
}
DbgTrace(0, "IpcClientOpenUnixRemoteEndPoint- Invalid parameter\n", 0);
goto exit;
}
DbgTrace(1, "ReInitializeIpc- End\n", 0);
// Verify that we have been initialized
if (svcInitialized)
{
// Set the default max rpc retry value if necessary
if (maxRpcRetries == 0)
maxRpcRetries = DEFAULT_MAX_RPC_RETRIES;
// Acquire our mutex
pthread_mutex_lock(&clientMutex);
try {
// Instantiate a RemoteEndPoint object and keep track of it
// with a smart pointer.
SmartRemoteEndPoint *pSmartRemoteEndPoint = new SmartRemoteEndPoint(new RemoteEndPoint(appMultithreaded,
maxRpcRetries,
pSocketFileName));
// Allocate a handle for the endpoint
uint32_t handle = remoteEndPointHandleAllocator ++;
// Insert the new RemoteEndPoint into the REP map
REPIterBoolPair insertResult;
insertResult = repMap.insert(make_pair(handle, pSmartRemoteEndPoint));
if (!insertResult.second)
{
// Insertion failed
DbgTrace(0, "IpcClientOpenUnixRemoteEndPoint- Unable to insert RemoteEndPoint into REP\n", 0);
delete pSmartRemoteEndPoint;
}
else
{
// RemoteEndPoint inserted in the REP map, success.
*pEndPointHandle = handle;
retStatus = 0;
}
} catch (...) {
DbgTrace(0, "IpcClientOpenUnixRemoteEndPoint- Exception caught\n", 0);
}
pthread_mutex_unlock(&clientMutex);
}
else
{
DbgTrace(0, "IpcClientOpenUnixRemoteEndPoint- Not initialized\n", 0);
}
exit:
DbgTrace(1, "IpcClientOpenUnixRemoteEndPoint- End, status = %08X\n", retStatus);
return retStatus;
}
//++=======================================================================
extern "C"
int
IpcClientOpenInetRemoteEndPoint(
IN unsigned short int port,
IN uint32_t address,
IN int maxRpcRetries,
INOUT uint32_t *pEndPointHandle)
//
// Arguments In: port - Server's listening port number.
//
// address - The server's IP Address. Use
// 0x7F000001 if the server is local.
//
// maxRpcRetries - Maximum number of Rpc retries that
// should be utilized when submitting
// a request to the endpoint. A value
// of zero requests that the default
// setting be utilized.
//
// Arguments Out: pEndPointHandle - Pointer to variable that will receive
// the endpoint handle.
//
// Returns: 0 == Success
// -1 == Failure
//
// Abstract: Method to open a TCP (AF_INET) remote endpoint.
//
// Note: The service should have been initialized before calling
// this procedure.
//
// L0
//=======================================================================--
{
int retStatus = -1;
DbgTrace(1, "IpcClientOpenInetRemoteEndPoint- Start\n", 0);
// Verify the input parameters
if (pEndPointHandle == NULL)
{
DbgTrace(0, "IpcClientOpenInetRemoteEndPoint- Invalid parameter\n", 0);
goto exit;
}
// Verify that we have been initialized
if (svcInitialized)
{
// Set the default max rpc retry value if necessary
if (maxRpcRetries == 0)
maxRpcRetries = DEFAULT_MAX_RPC_RETRIES;
// Acquire our mutex
pthread_mutex_lock(&clientMutex);
try {
// Instantiate a RemoteEndPoint object and keep track of it
// with a smart pointer.
SmartRemoteEndPoint *pSmartRemoteEndPoint = new SmartRemoteEndPoint(new RemoteEndPoint(appMultithreaded,
maxRpcRetries,
port,
address));
// Allocate a handle for the endpoint
uint32_t handle = remoteEndPointHandleAllocator ++;
// Insert the new RemoteEndPoint into the REP map
REPIterBoolPair insertResult;
insertResult = repMap.insert(make_pair(handle, pSmartRemoteEndPoint));
if (!insertResult.second)
{
// Insertion failed
DbgTrace(0, "IpcClientOpenInetRemoteEndPoint- Unable to insert RemoteEndPoint into REP\n", 0);
delete pSmartRemoteEndPoint;
}
else
{
// RemoteEndPoint inserted in the REP map, success.
*pEndPointHandle = handle;
retStatus = 0;
}
} catch (...) {
DbgTrace(0, "IpcClientOpenInetRemoteEndPoint- Exception caught\n", 0);
}
pthread_mutex_unlock(&clientMutex);
}
else
{
DbgTrace(0, "IpcClientOpenInetRemoteEndPoint- Not initialized\n", 0);
}
exit:
DbgTrace(1, "IpcClientOpenInetRemoteEndPoint- End, status = %08X\n", retStatus);
return retStatus;
}
//++=======================================================================
extern "C"
int
IpcClientCloseRemoteEndPoint(
IN uint32_t endPointHandle)
//
// Arguments In: endpointHandle - Handle of the endpoint being closed.
//
//
// Arguments Out: None.
// the endpoint handle.
//
// Returns: 0 == Success
// -1 == Failure
//
// Abstract: Method to close a remote endpoint.
//
// Note: The service should have been initialized before calling
// this procedure.
//
// L0
//=======================================================================--
{
int retStatus = -1;
DbgTrace(1, "IpcClientCloseRemoteEndPoint- Start\n", 0);
// Verify that we have been initialized
if (svcInitialized)
{
// Acquire our mutex
pthread_mutex_lock(&clientMutex);
// Find the appropriate RemoteEndPoint object in the REP Map using
// the handle provided by the caller.
REPMapIter iter = repMap.find(endPointHandle);
if (iter != repMap.end())
{
// Object was found in the map, remove it.
SmartRemoteEndPoint *pSmartRemoteEndPoint = iter->second;
repMap.erase(iter);
// Release our mutex before deleting the endpoint
pthread_mutex_unlock(&clientMutex);
// Close the endpoint
delete pSmartRemoteEndPoint;
// Success
retStatus = 0;
}
else
{
DbgTrace(0, "IpcClientCloseRemoteEndPoint- Invalid handle\n", 0);
// Release our mutex
pthread_mutex_unlock(&clientMutex);
}
}
else
{
DbgTrace(0, "IpcClientCloseRemoteEndPoint- Not initialized\n", 0);
}
exit:
DbgTrace(1, "IpcClientCloseRemoteEndPoint- End, status = %08X\n", retStatus);
return retStatus;
}
//++=======================================================================
extern "C"
int
IpcClientSubmitReq(
IN uint32_t endPointHandle,
IN char *pClientData,
IN int clientDataLen,
INOUT char **ppServerData,
INOUT int *pServerDataLen)
//
// Arguments In: endPointHandle - Handle of the remote endpoint that will
// be the target of the request.
//
// pClientData - Pointer to client data that must be sent to
// the server. Buffer is NEVER released by the
// procedure.
//
// clientDataLen - Length of the client data.
//
// Arguments Out: ppServerData - Pointer to variable that will receive a
// pointer to the buffer containing the data
// received from the server.
//
// pServerDataLen - Pointer to variable that will receive the
// length of the data received from the server.
//
// Returns: 0 == Request completed gracefully
// -1 == Request did not complete gracefully
//
// Abstract: Method to submit a request.
//
// Note: The routine blocks until the request completes.
//
// L0
//=======================================================================--
{
int retStatus = -1;
DbgTrace(1, "IpcClientSubmitReq- Start\n", 0);
// Verify that we have been initialized
if (svcInitialized)
{
// Acquire our mutex
pthread_mutex_lock(&clientMutex);
// Find the appropriate RemoteEndPoint object in the REP Map using
// the handle provided by the caller.
REPMapIter iter = repMap.find(endPointHandle);
if (iter != repMap.end())
{
// Object was found in the map, use it to submit
// the request.
SmartRemoteEndPoint *pSmartRemoteEndPoint = new SmartRemoteEndPoint(*(iter->second));
// Release our mutex before deleting the endpoint
pthread_mutex_unlock(&clientMutex);
// Submit the request
retStatus = (*pSmartRemoteEndPoint)->submitReq(pClientData,
clientDataLen,
ppServerData,
pServerDataLen);
// Get rid of the reference to the remote endpoint
delete pSmartRemoteEndPoint;
}
else
{
DbgTrace(0, "IpcClientSubmitReq- Invalid handle\n", 0);
// Release our mutex
pthread_mutex_unlock(&clientMutex);
}
}
else
{
DbgTrace(0, "IpcClientSubmitReq- Not initialized\n", 0);
}
DbgTrace(1, "IpcClientSubmitReq- End, retStatus = %08X\n", retStatus);
return retStatus;
}
@ -181,54 +491,36 @@ IpcClientInit(
goto exit;
}
// Save a copy of the application name
pAppName = new char[strlen(pName) + 1];
if (pAppName == NULL)
// Verify that we have not been initialized already
if (!svcInitialized)
{
DbgTrace(0, "IpcClientInit- Memory allocation failure\n", 0);
goto exit;
}
strcpy(pAppName, pName);
// Save the rest of the debug settings
DebugLevel = debugLevel;
UseSyslog = useSyslog;
// Initialize our mutexes
pthread_mutex_init(&clientMutex, NULL);
pthread_mutex_init(&interlockedMutex, NULL);
// Proceed based on whether or not we have already instantiated
// SmartCChannel vectors.
if (cchannelVector.size() == 0)
{
// SmartCChannel entries have not been instantiated
//
// Setup the number of channels that we may have based on
// whether the application is multi-threaded or not.
if (multithreaded)
numCChannels = MAX_CHANNELS;
else
numCChannels = 1;
// Instantiate entries in SmartCChannel vector
try {
for (int i = 0; i < numCChannels; i++)
cchannelVector.push_back(SmartCChannelPointer());
// Done initializing
svcInitialized = true;
retStatus = 0;
} catch (...) {
DbgTrace(0, "IpcClientInit- Exception caught while initializing the cchannelVector\n", 0);
// Save a copy of the application name
pAppName = new char[strlen(pName) + 1];
if (pAppName == NULL)
{
DbgTrace(0, "IpcClientInit- Memory allocation failure\n", 0);
goto exit;
}
strcpy(pAppName, pName);
// Save the app multithreaded information
appMultithreaded = multithreaded;
// Save the rest of the debug settings
DebugLevel = debugLevel;
UseSyslog = useSyslog;
// Initialize our mutexes
pthread_mutex_init(&clientMutex, NULL);
pthread_mutex_init(&interlockedMutex, NULL);
// Success
svcInitialized = true;
retStatus = 0;
}
else
{
// SmartCChannel vector has already been instantiated
ReInitializeIpc();
retStatus = 0;
DbgTrace(0, "IpcClientInit- Initialized already\n", 0);
}
exit:
@ -239,134 +531,6 @@ exit:
}
//++=======================================================================
extern "C"
int
IpcClientSetUnAddress(
IN char *pSocketFileName)
//
// Arguments In: pSocketFileName - Pointer to string containing the name
// of the socket file.
//
// Arguments Out: None.
//
// Returns: 0 == Success
// -1 == Failure
//
// Abstract: Method to set the socket file name to utilize for
// communicating with the server via DOMAIN sockets.
//
// Note: The service should have been initialized before calling
// this procedure.
//
// L1
//=======================================================================--
{
int retStatus = -1;
DbgTrace(1, "IpcClientSetUnAddress- Start\n", 0);
// Verify that we have been initialized
if (svcInitialized)
{
// Verify that the address has not already been set
if (serverAddressSet == false)
{
// Set the necessary information in the serverUnAddr variable
serverUnAddr.sun_family = AF_UNIX;
strcpy(serverUnAddr.sun_path, pSocketFileName);
// Set the necessary flags to indicate that DOMAIN sockets
// should be used for communications.
use_PF_UNIX = true;
use_AF_INET = false;
// Success
serverAddressSet = true;
retStatus = 0;
}
else
{
DbgTrace(0, "IpcClientSetUnAddress- Already set\n", 0);
}
}
else
{
DbgTrace(0, "IpcClientSetUnAddress- Not initialized\n", 0);
}
DbgTrace(1, "IpcClientSetUnAddress- End, status = %08X\n", retStatus);
return retStatus;
}
//++=======================================================================
extern "C"
int
IpcClientSetInAddress(
IN unsigned short int serverPort,
IN uint32_t serverAddress)
//
// Arguments In: serverPort - Server's listening port number.
//
// serverAddress - The server's IP Address. Use
// 0x7F000001 if the server is local.
//
// Arguments Out: None.
//
// Returns: 0 == Success
// -1 == Failure
//
// Abstract: Method to set the address to utilize for communicating
// with the server via TCP sockets.
//
// Note: The service should have been initialized before calling
// this procedure.
//
// L1
//=======================================================================--
{
int retStatus = -1;
DbgTrace(1, "IpcClientSetInAddress- Start\n", 0);
// Verify that we have been initialized
if (svcInitialized)
{
// Verify that the address has not already been set
if (serverAddressSet == false)
{
// Set the necessary information in the serverInAddr variable
serverInAddr.sin_family = AF_INET;
serverInAddr.sin_port = htons(serverPort);
serverInAddr.sin_addr.s_addr = htonl(serverAddress);
// Set the necessary flags to indicate that TCP sockets
// should be used for communications.
use_AF_INET = true;
use_PF_UNIX = false;
// Success
serverAddressSet = true;
retStatus = 0;
}
else
{
DbgTrace(0, "IpcClientSetInAddress- Already set\n", 0);
}
}
else
{
DbgTrace(0, "IpcClientSetInAddress- Not initialized\n", 0);
}
DbgTrace(1, "IpcClientSetInAddress- End, status = %08X\n", retStatus);
return retStatus;
}
//++=======================================================================
extern "C"
void
@ -380,251 +544,47 @@ IpcClientShutdown(void)
//
// Abstract: Method to shutdown the IPC infrastructure for process.
//
// L2
// L0
//=======================================================================--
{
DbgTrace(1, "IpcClientShutdown- Start\n", 0);
ReInitializeIpc();
// Free the AppName string if necessary
if (pAppName != unInitialized)
// Verify that we have been initialized
if (svcInitialized)
{
delete[] pAppName;
pAppName = unInitialized;
// Forget about having been initialized
svcInitialized = false;
// Clean up the REP map
pthread_mutex_lock(&clientMutex);
while (!repMap.empty())
{
REPMapIter iter = repMap.begin();
SmartRemoteEndPoint *pSmartRemoteEndPoint = iter->second;
repMap.erase(iter);
pthread_mutex_unlock(&clientMutex);
delete pSmartRemoteEndPoint;
pthread_mutex_lock(&clientMutex);
}
pthread_mutex_unlock(&clientMutex);
// Free the AppName string if necessary
if (pAppName != unInitialized)
{
delete[] pAppName;
pAppName = unInitialized;
}
}
else
{
DbgTrace(0, "IpcClientShutdown- Not initialized\n", 0);
}
DbgTrace(1, "IpcClientShutdown- End\n", 0);
}
//++=======================================================================
SmartCChannel *
getCChannel(void)
//
// Arguments In: Nothing.
//
// Arguments Out: Nothing.
//
// Returns: Pointer to SmartCChannel object if successful, otherwise
// NULL.
//
// Abstract: Method to get a SmartCChannel for submitting a request.
//
// L2
//=======================================================================--
{
SmartCChannel *pSmartCChannel = NULL;
int channelSelector = (numChannelSubmits++) % numCChannels;
DbgTrace(1, "IPCCLNT -getCChannel- Start\n", 0);
// Just exit if the server address has not been set
if (!serverAddressSet)
{
DbgTrace(0, "IPCCLNT -getCChannel- Server address not set\n", 0);
goto exit;
}
// Obtain client mutex
pthread_mutex_lock(&clientMutex);
// Check if there is an available and usable channel for the client
if (cchannelVector[channelSelector].getPointer() != NULL
&& (*cchannelVector[channelSelector].getPointer())->ok())
{
// Use the available channel
pSmartCChannel = new SmartCChannel(*cchannelVector[channelSelector].getPointer());
}
else
{
// The channel is either unavailable or unusable, clean up
// the channel if it is indeed unusable.
if (cchannelVector[channelSelector].getPointer() != NULL)
{
// Clean up the channel
CChannel *pCChannel = *cchannelVector[channelSelector].getPointer();
pCChannel->closeChannel();
delete cchannelVector[channelSelector].getPointer();
cchannelVector[channelSelector].setPointer(NULL);
}
CChannel *pCChannel;
try {
// Use the appropriate server address when instantiating
// the CChannel object.
if (use_PF_UNIX)
{
// PF_UNIX
pCChannel = new CChannel(&serverUnAddr);
}
else
{
// Assume AF_INET
pCChannel = new CChannel(&serverInAddr);
}
// CChannel object created, now associate a SmartCChannel
// object with it. It is important to do this to keep
// the object from being deleted as we initialize it.
cchannelVector[channelSelector].setPointer(new SmartCChannel(pCChannel));
// Initialize the CChannel
if (pCChannel->init() == 0)
{
// CChannel initialization succeeded, use it to
// satisfy the caller.
pSmartCChannel = new SmartCChannel(*cchannelVector[channelSelector].getPointer());
}
else
{
// CChannel initialization failed
delete cchannelVector[channelSelector].getPointer();
cchannelVector[channelSelector].setPointer(NULL);
}
}
catch (...) {
DbgTrace(0, "getCChannel- Exception caught\n", 0);
// Try to clean things up just in case
if (cchannelVector[channelSelector].getPointer())
{
delete cchannelVector[channelSelector].getPointer();
cchannelVector[channelSelector].setPointer(NULL);
}
else
{
if (pCChannel != NULL)
delete pCChannel;
}
}
}
// Release client mutex
pthread_mutex_unlock(&clientMutex);
exit:
DbgTrace(1, "getCChannel- End, Obj = %08X\n", pSmartCChannel);
return pSmartCChannel;
}
//++=======================================================================
extern "C"
int
IpcClientSubmitReq(
IN char *pClientData,
IN int clientDataLen,
INOUT char **ppServerData,
INOUT int *pServerDataLen)
//
// Arguments In: pClientData - Pointer to client data that must be sent to
// the server. Buffer is NEVER released by the
// procedure.
//
// clientDataLen - Length of the client data.
//
// Arguments Out: ppServerData - Pointer to variable that will receive a
// pointer to the buffer containing the data
// received from the server.
//
// pServerDataLen - Pointer to variable that will receive the
// length of the data received from the server.
//
// Returns: 0 == Request completed gracefully
// -1 == Request did not complete gracefully
//
// Abstract: Method to submit a request.
//
// Note: The routine blocks until the request completes.
//
// L2
//=======================================================================--
{
int retStatus = -1;
DbgTrace(1, "IpcClientSubmitReq- Start\n", 0);
try {
SmartCChannel *pSmartCChannel;
// Perform the following in a loop to deal with abnormal connection terminations
unsigned long rpcRetryCount = 0;
while (rpcRetryCount < MAX_RPC_RETRIES)
{
// Get SmartCChannel
pSmartCChannel = getCChannel();
if (pSmartCChannel != NULL)
{
// Get pointer to channel object
CChannel *pCChannel = *pSmartCChannel;
// Allocate a requestId
uint32_t reqId = pCChannel->allocReqId();
// Allocate client request object.
ClientReq clientReq(reqId);
// Submit the request via the channel
if (pCChannel->submitReq(reqId,
clientReq,
pClientData,
clientDataLen) == 0)
{
// Request submission over the channel succeeded, now
// wait for the completion of the request.
clientReq.waitForCompletion(ppServerData,
pServerDataLen);
// Remove the request from the channel
pCChannel->removeReq(reqId);
// Now proceed based on the completion status
ClientReq::CompletionStatus compStatus = clientReq.completionStatus();
if (compStatus == ClientReq::SuccessCompletionStatus)
{
// Success
retStatus = 0;
}
}
else
{
DbgTrace(0, "IpcClientSubmitReq- Request submittion over the channel failed\n", 0);
// Remove the request from the channel
pCChannel->removeReq(reqId);
}
// Delete the SmartCChannel
delete pSmartCChannel;
}
else
{
DbgTrace(0, "IpcClientSubmitReq- Channel unavailable\n", 0);
}
// Stop trying if the RPC succeeded
if (retStatus == 0)
break;
// Account for this RPC try
rpcRetryCount ++;
DbgTrace(0, "IpcClientSubmitReq- Will attempt to retry RPC, count = %d\n", rpcRetryCount);
}
}
catch(...) {
DbgTrace(0, "IpcClientSubmitReq-- Exception caught\n", 0);
}
DbgTrace(1, "IpcClientSubmitReq- End, retStatus = %08X\n", retStatus);
return retStatus;
}
//=========================================================================
//=========================================================================

View File

@ -0,0 +1,428 @@
/***********************************************************************
*
* Copyright (C) 2006 Novell, Inc. All Rights Reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; version 2.1
* of the License.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, Novell, Inc.
*
* To contact Novell about this file by physical or electronic mail,
* you may find current contact information at www.novell.com.
*
* Author: Juan Carlos Luciani <jluciani@novell.com>
*
***********************************************************************/
//===[ Include files ]=====================================================
#include "ipcint.h"
#include "cchannel.h"
#include "clientreq.h"
#include "remoteendpoint.h"
#include <assert.h> // Ensure that NDEBUG is defined for release builds!
//===[ External data ]=====================================================
//===[ External prototypes ]===============================================
//===[ Manifest constants ]================================================
#define REMOTE_ENDPOINT_SIGNATURE 0x54525653 // SVRT
//===[ Type definitions ]==================================================
//===[ Function prototypes ]===============================================
//===[ Global variables ]==================================================
//===[ Type definitions ]==================================================
//===[ Function prototypes ]===============================================
//===[ Global variables ]==================================================
//
// Object Counters
//
unsigned long numRemoteEndPointObjects = 0;
//++=======================================================================
RemoteEndPoint::RemoteEndPoint(
bool multithreaded,
int maxRpcRetries,
char *pSocketFileName) :
m_signature (REMOTE_ENDPOINT_SIGNATURE),
m_numChannelSubmits (0),
m_maxRpcRetries (maxRpcRetries)
//
// Arguments:
//
// Returns:
//
// Abstract:
//
// Notes:
//
// L0
//=======================================================================--
{
DbgTrace(1, "RemoteEndPoint::RemoteEndPoint- Start, Obj = %08X\n", this);
// Initialize our mutex
pthread_mutex_init(&m_mutex, NULL);
// Set the necessary information in the m_serverUnAddr variable
m_serverUnAddr.sun_family = AF_UNIX;
strcpy(m_serverUnAddr.sun_path, pSocketFileName);
// Set the necessary flags to indicate that DOMAIN sockets
// should be used for communications.
m_Use_PF_UNIX = true;
m_Use_AF_INET = false;
// Setup the number of channels that we may have based on
// whether the application is multi-threaded or not.
if (multithreaded)
m_numCChannels = MAX_CHANNELS_PER_ENDPOINT;
else
m_numCChannels = 1;
// Instantiate entries in SmartCChannel vector
try {
for (int i = 0; i < m_numCChannels; i++)
m_cchannelVector.push_back(SmartCChannelPointer());
} catch (...) {
DbgTrace(0, "RemoteEndPoint::RemoteEndPoint- Exception caught while initializing the cchannelVector\n", 0);
throw bad_alloc();
}
// Increment the object count
InterlockedIncrement(&numRemoteEndPointObjects);
DbgTrace(1, "RemoteEndPoint::RemoteEndPoint- End\n", 0);
} /*-- RemoteEndPoint::RemoteEndPoint() --*/
//++=======================================================================
RemoteEndPoint::RemoteEndPoint(
bool multithreaded,
int maxRpcRetries,
unsigned short int port,
uint32_t address) :
m_signature (REMOTE_ENDPOINT_SIGNATURE),
m_numChannelSubmits (0),
m_maxRpcRetries (maxRpcRetries)
//
// Arguments:
//
// Returns:
//
// Abstract:
//
// Notes:
//
// L0
//=======================================================================--
{
DbgTrace(1, "RemoteEndPoint::RemoteEndPoint- Start, Obj = %08X\n", this);
// Initialize our mutex
pthread_mutex_init(&m_mutex, NULL);
// Set the necessary information in the serverInAddr variable
m_serverInAddr.sin_family = AF_INET;
m_serverInAddr.sin_port = htons(port);
m_serverInAddr.sin_addr.s_addr = htonl(address);
// Set the necessary flags to indicate that TCP sockets
// should be used for communications.
m_Use_AF_INET = true;
m_Use_PF_UNIX = false;
// Setup the number of channels that we may have based on
// whether the application is multi-threaded or not.
if (multithreaded)
m_numCChannels = MAX_CHANNELS_PER_ENDPOINT;
else
m_numCChannels = 1;
// Instantiate entries in SmartCChannel vector
try {
for (int i = 0; i < m_numCChannels; i++)
m_cchannelVector.push_back(SmartCChannelPointer());
} catch (...) {
DbgTrace(0, "RemoteEndPoint::RemoteEndPoint- Exception caught while initializing the cchannelVector\n", 0);
throw bad_alloc();
}
// Increment the object count
InterlockedIncrement(&numRemoteEndPointObjects);
DbgTrace(1, "RemoteEndPoint::RemoteEndPoint- End\n", 0);
} /*-- RemoteEndPoint::RemoteEndPoint() --*/
//++=======================================================================
RemoteEndPoint::~RemoteEndPoint(void)
//
// Arguments:
//
// Returns:
//
// Abstract:
//
// Notes:
//
// L0
//=======================================================================--
{
DbgTrace(1, "RemoteEndPoint::~RemoteEndPoint- Start, Obj = %08X\n", this);
// Clean up all allocated SmartCChannel objects
for (int i = 0; i < m_cchannelVector.size(); i++)
{
if (m_cchannelVector[i].getPointer() != NULL)
{
CChannel *pCChannel = *(m_cchannelVector[i].getPointer());
pCChannel->closeChannel();
}
}
m_cchannelVector.clear();
// Decrement the object count
InterlockedDecrement(&numRemoteEndPointObjects);
DbgTrace(1, "RemoteEndPoint::~RemoteEndPoint- End\n", 0);
} /*-- RemoteEndPoint::~RemoteEndPoint() --*/
//++=======================================================================
SmartCChannel*
RemoteEndPoint::getCChannel(void)
//
// Arguments:
//
// Returns:
//
// Abstract:
//
// Notes:
//
// L0
//=======================================================================--
{
SmartCChannel *pSmartCChannel = NULL;
int channelSelector = (m_numChannelSubmits++) % m_numCChannels;
DbgTrace(1, "RemoteEndPoint::getCChannel- Start, Obj = %08X\n", this);
#if DEBUG
assert(m_signature == REMOTE_ENDPOINT_SIGNATURE);
#endif
// Obtain our mutex
pthread_mutex_lock(&m_mutex);
// Check if there is an available and usable channel for the client
if (m_cchannelVector[channelSelector].getPointer() != NULL
&& (*m_cchannelVector[channelSelector].getPointer())->ok())
{
// Use the available channel
pSmartCChannel = new SmartCChannel(*m_cchannelVector[channelSelector].getPointer());
}
else
{
// The channel is either unavailable or unusable, clean up
// the channel if it is indeed unusable.
if (m_cchannelVector[channelSelector].getPointer() != NULL)
{
// Clean up the channel
CChannel *pCChannel = *m_cchannelVector[channelSelector].getPointer();
pCChannel->closeChannel();
delete m_cchannelVector[channelSelector].getPointer();
m_cchannelVector[channelSelector].setPointer(NULL);
}
CChannel *pCChannel;
try {
// Use the appropriate server address when instantiating
// the CChannel object.
if (m_Use_PF_UNIX)
{
// PF_UNIX
pCChannel = new CChannel(&m_serverUnAddr);
}
else
{
// Assume AF_INET
pCChannel = new CChannel(&m_serverInAddr);
}
// CChannel object created, now associate a SmartCChannel
// object with it. It is important to do this to keep
// the object from being deleted as we initialize it.
m_cchannelVector[channelSelector].setPointer(new SmartCChannel(pCChannel));
// Initialize the CChannel
if (pCChannel->init() == 0)
{
// CChannel initialization succeeded, use it to
// satisfy the caller.
pSmartCChannel = new SmartCChannel(*m_cchannelVector[channelSelector].getPointer());
}
else
{
// CChannel initialization failed
delete m_cchannelVector[channelSelector].getPointer();
m_cchannelVector[channelSelector].setPointer(NULL);
}
}
catch (...) {
DbgTrace(0, "getCChannel- Exception caught\n", 0);
// Try to clean things up just in case
if (m_cchannelVector[channelSelector].getPointer())
{
delete m_cchannelVector[channelSelector].getPointer();
m_cchannelVector[channelSelector].setPointer(NULL);
}
else
{
if (pCChannel != NULL)
delete pCChannel;
}
}
}
// Release client mutex
pthread_mutex_unlock(&m_mutex);
DbgTrace(1, "RemoteEndPoint::getCChannel- End\n", 0);
return pSmartCChannel;
} /*-- RemoteEndPoint::getCChannel() --*/
//++=======================================================================
int
RemoteEndPoint::submitReq(
char *pClientData,
int clientDataLen,
char **ppServerData,
int *pServerDataLen)
//
// Arguments:
//
// Returns:
//
// Abstract:
//
// Notes:
//
// L0
//=======================================================================--
{
int retStatus = -1;
DbgTrace(1, "RemoteEndPoint::submitReq- Start, Obj = %08X\n", this);
try {
SmartCChannel *pSmartCChannel;
// Perform the following in a loop to deal with abnormal connection terminations
unsigned long rpcRetryCount = 0;
while (rpcRetryCount < m_maxRpcRetries)
{
// Get SmartCChannel
pSmartCChannel = getCChannel();
if (pSmartCChannel != NULL)
{
// Get pointer to channel object
CChannel *pCChannel = *pSmartCChannel;
// Allocate a requestId
uint32_t reqId = pCChannel->allocReqId();
// Allocate client request object.
ClientReq clientReq(reqId);
// Submit the request via the channel
if (pCChannel->submitReq(reqId,
clientReq,
pClientData,
clientDataLen) == 0)
{
// Request submission over the channel succeeded, now
// wait for the completion of the request.
clientReq.waitForCompletion(ppServerData,
pServerDataLen);
// Remove the request from the channel
pCChannel->removeReq(reqId);
// Now proceed based on the completion status
ClientReq::CompletionStatus compStatus = clientReq.completionStatus();
if (compStatus == ClientReq::SuccessCompletionStatus)
{
// Success
retStatus = 0;
}
}
else
{
DbgTrace(0, "RemoteEndPoint::submitReq- Request submittion over the channel failed\n", 0);
// Remove the request from the channel
pCChannel->removeReq(reqId);
}
// Delete the SmartCChannel
delete pSmartCChannel;
}
else
{
DbgTrace(0, "RemoteEndPoint::submitReq- Channel unavailable\n", 0);
}
// Stop trying if the RPC succeeded
if (retStatus == 0)
break;
// Account for this RPC try
rpcRetryCount ++;
DbgTrace(0, "RemoteEndPoint::submitReq- Will attempt to retry RPC, count = %d\n", rpcRetryCount);
}
}
catch(...) {
DbgTrace(0, "RemoteEndPoint::submitReq- Exception caught\n", 0);
}
DbgTrace(1, "RemoteEndPoint::submitReq- End\n", 0);
return retStatus;
} /*-- RemoteEndPoint::submitReq() --*/
//=========================================================================
//=========================================================================

View File

@ -0,0 +1,203 @@
/***********************************************************************
*
* Copyright (C) 2006 Novell, Inc. All Rights Reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; version 2.1
* of the License.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, Novell, Inc.
*
* To contact Novell about this file by physical or electronic mail,
* you may find current contact information at www.novell.com.
*
* Author: Juan Carlos Luciani <jluciani@novell.com>
*
***********************************************************************/
#ifndef _REMOTEENDPOINT_
#define _REMOTEENDPOINT_
//===[ Include files ]=====================================================
//===[ External data ]=====================================================
//===[ External prototypes ]===============================================
//===[ Manifest constants ]================================================
#define MAX_CHANNELS_PER_ENDPOINT 3
//===[ Type definitions ]==================================================
//===[ Function prototypes ]===============================================
//===[ Global variables ]==================================================
//===[ Type definitions ]==================================================
//
// Server Thread Class
//
class RemoteEndPoint : public ObjRef
{
//
// Class for maintaining SmartCChannel pointers within the daemonVector.
//
class SmartCChannelPointer
{
private:
SmartCChannel *m_pSmartCChannel;
public:
SmartCChannelPointer() : m_pSmartCChannel(NULL) {}
~SmartCChannelPointer() { if (m_pSmartCChannel != NULL) delete m_pSmartCChannel; }
SmartCChannel* getPointer() { return m_pSmartCChannel; }
void setPointer(SmartCChannel *pSmartCChannel) { m_pSmartCChannel = pSmartCChannel; }
};
// Signature
unsigned long m_signature;
// End-point address
bool m_Use_AF_INET;
bool m_Use_PF_UNIX;
struct sockaddr_in m_serverInAddr;
struct sockaddr_un m_serverUnAddr;
// SmartCChannelPointers vector
vector<SmartCChannelPointer> m_cchannelVector;
int m_numCChannels;
// Endpoint mutex
pthread_mutex_t m_mutex;
// Number of submits made to the endpoint
int m_numChannelSubmits;
// Max number of Rpc retries
int m_maxRpcRetries;
public:
//
// Destructor
~RemoteEndPoint(void);
//
// Constructor
//
// Parameters:
// multithreaded (input) -
// Set to TRUE if the process is
// multithreaded.
//
// maxRpcRetries (input) -
// Max Rpc retries.
//
// pSocketFileName (input) -
// Pointer to string containing the name
// of the socket file.
//
// Abstract: Constructs RemoteEndPoint object and initializes it using
// a domain socket file name.
//
// Returns: Nothing.
//
RemoteEndPoint(bool multiThreaded,
int maxRpcRetries,
char *pSocketFileName);
//
// Constructor
//
// Parameters:
// multithreaded (input) -
// Set to TRUE if the process is
// multithreaded.
//
// maxRpcRetries (input) -
// Max Rpc retries.
//
// port (input) -
// Server's listening port number.
//
// address (input) -
// The server's IP Address. Use
// 0x7F000001 if the endpoint is local.
//
// Abstract: Constructs RemoteEndPoint object and initializes it using
// a tcp socket address.
//
// Returns: Nothing.
//
RemoteEndPoint(bool multiThreaded,
int maxRpcRetries,
unsigned short int port,
uint32_t address);
//
// Get a SmartCChannel.
//
// Parameters: None.
//
// Abstract: Gets a SmartCChannel for submitting requests to the
// remote endpoint.
//
// Returns: Pointer to SmartCChannel object if successful, otherwise
// NULL.
//
SmartCChannel *getCChannel(void);
//
// Submit a request to the endpoint,
//
// Parameters:
// pClientData (input) -
// Pointer to client data that must be sent to
// the server. Buffer is NEVER released by the
// procedure.
//
// clientDataLen (input) -
// Length of the client data.
//
// ppServerData (input/output) -
// Pointer to variable that will receive a
// pointer to the buffer containing the data
// received from the server.
//
// pServerDataLen (input/output) -
// Pointer to variable that will receive the
// length of the data received from the server.
//
// Abstract: Method to submit a request.
//
// Returns: 0 == Request completed gracefully
// -1 == Request did not complete gracefully
//
// Note: The routine blocks until the request completes.
//
int submitReq(char *pClientData,
int clientDataLen,
char **ppServerData,
int *pServerDataLen);
};
typedef SmartPtr<RemoteEndPoint> SmartRemoteEndPoint;
//===[ Function prototypes ]===============================================
#endif // _REMOTEENDPOINT_
//=========================================================================
//=========================================================================

View File

@ -74,7 +74,8 @@ bool errorDetected = false;
//++=======================================================================
void* SubmitThread()
void* SubmitThread(
uint32_t remoteEndPointHandle)
//
// Arguments:
//
@ -102,7 +103,8 @@ void* SubmitThread()
pthread_mutex_unlock(&testMutex);
// Submit request to the server
if (IpcClientSubmitReq(reqData,
if (IpcClientSubmitReq(remoteEndPointHandle,
reqData,
strlen(reqData),
&pReplyData,
&replyDataLen) != 0)
@ -148,6 +150,7 @@ ExecuteTests(void)
int threadCreateStatus;
int threadCreatedCount = 0;
int i;
uint32_t endPointHandle;
DbgTrace(1, "ExecuteTests- Start\n", 0);
@ -158,8 +161,10 @@ ExecuteTests(void)
false) == 0)
{
// Set the server listen address
if (IpcClientSetInAddress(5000,
0x7F000001) == 0)
if (IpcClientOpenInetRemoteEndPoint(5000,
0x7F000001,
0,
&endPointHandle) == 0)
{
// Start the configured number of threads to submit requests to
// the server.
@ -168,7 +173,7 @@ ExecuteTests(void)
threadCreateStatus = pthread_create(&thread,
NULL,
(void*(*)(void*))SubmitThread,
(void*)NULL);
(void*)endPointHandle);
if (threadCreateStatus == 0)
threadCreatedCount ++;
else
@ -185,10 +190,16 @@ ExecuteTests(void)
sleep(1);
sleep(1);
}
// Close the remote endpoint
if (IpcClientCloseRemoteEndPoint(endPointHandle) != 0)
{
DbgTrace(0, "ExecuteTests- Error closing remote endpoint\n", 0);
}
}
else
{
DbgTrace(0, "ExecuteTests- Error setting server address\n", 0);
DbgTrace(0, "ExecuteTests- Error opening remote endpoint\n", 0);
}
IpcClientShutdown();