Changes due to continue development of the IpcLibs. Not done yet.

This commit is contained in:
Juan Carlos Luciani 2006-09-01 05:37:43 +00:00
parent 4326223276
commit f45c0f4c9e
10 changed files with 64 additions and 215 deletions

View File

@ -61,7 +61,7 @@ CPPFILES = channelproto.cpp \
CSFILES_CSC :=
INCLUDES = -I. -I.. -I../common -I../../../../include
RESOURCES =
DEFINES += -Wno-format-extra-args -fno-strict-aliasing
DEFINES += -Wno-format-extra-args -fno-strict-aliasing -fshort-wchar
CFLAGS += $(INCLUDES) $(DEFINES)
CPPFLAGS += $(INCLUDES) $(DEFINES)
LIBS = -lpthread -ldl -lexpat

View File

@ -394,7 +394,7 @@ CChannel::connectionThread(
uint32_t reqId;
int payloadLength;
unsigned long totalPayloadBytesReceived = 0;
char reqDataPktHdr[sizeof(ReqDataPktHdrTemplate) - 1];
char reqDataPktHdr[ReqDataPktHdrTemplate.length()];
char *pRecvBuff;
RCMapIter iter;
ClientReq *pClientReq;
@ -420,7 +420,7 @@ CChannel::connectionThread(
{
bytesReceived = recv(pCChannel->m_socket,
reqDataPktHdr,
sizeof(ReqDataPktHdrTemplate) - 1,
sizeof(reqDataPktHdr),
MSG_WAITALL);
if (bytesReceived == SOCKET_ERROR
&& errno == EINTR)
@ -432,7 +432,7 @@ CChannel::connectionThread(
if (bytesReceived != SOCKET_ERROR)
{
// Check if the connection was terminated
if (bytesReceived == (sizeof(ReqDataPktHdrTemplate) - 1))
if (bytesReceived == sizeof(reqDataPktHdr))
{
// Get the payload length
if (ChannelProto::getReqIdAndPayloadLength(reqDataPktHdr,
@ -759,7 +759,7 @@ CChannel::submitReq(
//=======================================================================--
{
int retStatus = -1;
char reqDataPktHdr[sizeof(ReqDataPktHdrTemplate) - 1];
char reqDataPktHdr[ReqDataPktHdrTemplate.length()];
struct msghdr sendmsgHdr = {0};
struct iovec ioVectors[2];
unsigned long bytesSent;
@ -941,140 +941,6 @@ CChannel::removeReq(
} /*-- CChannel::removeReq() --*/
//++=======================================================================
int
CChannel::sendData(
uint32_t reqId,
char *pClientData,
int clientDataLen)
//
// Arguments:
//
// Returns:
//
// Abstract:
//
// Notes:
//
// L2
//=======================================================================--
{
int retStatus = -1;
char reqDataPktHdr[sizeof(ReqDataPktHdrTemplate) - 1];
struct msghdr sendmsgHdr = {0};
struct iovec ioVectors[2];
unsigned long bytesSent;
unsigned long totalBytesSent = 0;
unsigned long bytesToSend = sizeof(reqDataPktHdr) + clientDataLen;
DbgTrace(1, "CChannel::sendData- Start, Obj = %08X\n", this);
// Acquire exclusive access to the channel object
pthread_mutex_lock(&m_mutex);
// Verify that the channel is connected
if (m_state == State_Connected)
{
// Build ReqDataHeader
if (ChannelProto::buildReqDataPktHdr(reqId,
clientDataLen,
reqDataPktHdr) == 0)
{
// Packet header was built, now sent it along with the client data to
// the server.
ioVectors[0].iov_base = reqDataPktHdr;
ioVectors[0].iov_len = sizeof(reqDataPktHdr);
ioVectors[1].iov_base = (char*) pClientData;
ioVectors[1].iov_len = clientDataLen;
sendmsgHdr.msg_iov = ioVectors;
sendmsgHdr.msg_iovlen = 2;
while (1)
{
bytesSent = sendmsg(m_socket,
&sendmsgHdr,
MSG_NOSIGNAL);
if (bytesSent == SOCKET_ERROR)
{
// Check if we were interrupted during the transfer
if (errno == EINTR)
{
// Just try again
continue;
}
// An unrecoverable error was encountered during the send operation,
// assume there was a communication failure. Close the socket to make
// sure that the connectionThread cleans up.
DbgTrace(0, "CChannel::sendData- sendmsg error, errno = %d\n", errno);
m_state = State_Disconnected;
shutdown(m_socket, SHUT_RDWR);
struct linger linger_opt = {1, 15};
setsockopt(m_socket, SOL_SOCKET, SO_LINGER, &linger_opt, sizeof(linger_opt));
closesocket(m_socket);
m_socket = INVALID_SOCKET;
break;
}
else
{
// Account for the bytes sent
totalBytesSent += bytesSent;
// Check if we are done sending all of the data
if (totalBytesSent >= bytesToSend)
{
// We are done
break;
}
else
{
// Adjust the ioVector structure to send data not yet sent
if (totalBytesSent >= sizeof(reqDataPktHdr))
{
// The packet header was sent, use only one ioVector.
int clientDataAlreadySent = totalBytesSent - sizeof(reqDataPktHdr);
ioVectors[0].iov_base = (char*) pClientData + clientDataAlreadySent;
ioVectors[0].iov_len = clientDataLen - clientDataAlreadySent;
sendmsgHdr.msg_iov = ioVectors;
sendmsgHdr.msg_iovlen = 1;
}
else
{
// Not all of the packet header was sent, use two ioVectors.
ioVectors[0].iov_base = (char*) reqDataPktHdr + totalBytesSent;
ioVectors[0].iov_len = sizeof(reqDataPktHdr) - totalBytesSent;
ioVectors[1].iov_base = (char*) pClientData;
ioVectors[1].iov_len = clientDataLen;
sendmsgHdr.msg_iov = ioVectors;
sendmsgHdr.msg_iovlen = 2;
}
}
}
}
// Return success even if the send failed to allow things to be cleaned up
// by the connectionThread routine.
retStatus = 0;
}
else
{
DbgTrace(0, "CChannel::sendData- Error building Req Data Pkt Header, Obj = %08X\n", this);
}
}
else
{
DbgTrace(1, "CChannel::sendData- Channel not connected, state = %08X\n", m_state);
}
// Release exclusive access to the channel object
pthread_mutex_unlock(&m_mutex);
DbgTrace(1, "CChannel::sendData- End, retStatus = %08X\n", retStatus);
return retStatus;
} /*-- CChannel::sendData() --*/
//=========================================================================
//=========================================================================

View File

@ -167,7 +167,7 @@ IpcClientInit(
// Note: It is necessary to call the appropriate function to
// set the server address before a request can be submitted.
//
// L0
// L1
//=======================================================================--
{
int retStatus = -1;
@ -259,7 +259,7 @@ IpcClientSetUnAddress(
// Note: The service should have been initialized before calling
// this procedure.
//
// L0
// L1
//=======================================================================--
{
int retStatus = -1;
@ -324,7 +324,7 @@ IpcClientSetInAddress(
// Note: The service should have been initialized before calling
// this procedure.
//
// L0
// L1
//=======================================================================--
{
int retStatus = -1;

View File

@ -160,41 +160,20 @@ ClientReq::processServerData(
// Acquire exclusive access to the object
pthread_mutex_lock(&m_mutex);
try {
// Save server dataetup the ServerData object
m_pServerData = pServerData;
m_serverDataLen = serverDataLength;
// Save server dataetup the ServerData object
m_pServerData = pServerData;
m_serverDataLen = serverDataLength;
// Mark the request as completed
m_completed = true;
// Check if we must awaken the thread that submitted the request
// so that it can service the server data.
if (!m_submitThreadActive)
{
// The submit thread is not active, awaken it.
m_submitThreadActive = true;
pthread_cond_signal(&m_condition);
}
}
catch(...) {
DbgTrace(0, "ClientReq::processServerData- Exception caught, Obj = %08X\n", this);
// Free the server data buffer
delete[] pServerData;
// Record that we suffered an internal problem and mark the
// request as completed.
m_internalProblem = true;
m_completed = true;
// Check if we must awaken the thread that submitted the request
// so that it can deal with the problem.
if (!m_submitThreadActive)
{
// The submit thread is not active, awaken it.
m_submitThreadActive = true;
pthread_cond_signal(&m_condition);
}
// Check if we must awaken the thread that submitted the request
// so that it can service the server data.
if (!m_submitThreadActive)
{
// The submit thread is not active, awaken it.
m_submitThreadActive = true;
pthread_cond_signal(&m_condition);
}
// Release exclusive access to the object

View File

@ -181,7 +181,7 @@ ExecuteTests(void)
if (threadCreatedCount != 0)
{
while (submitReqCount
&& !errorDatected)
&& !errorDetected)
sleep(1);
sleep(1);
}

View File

@ -100,33 +100,45 @@ extern pthread_mutex_t interlockedMutex;
__inline static unsigned long
InterlockedIncrement(unsigned long *pValue)
{
unsigned long retVal;
pthread_mutex_lock(&interlockedMutex);
*pValue ++;
(*pValue) ++;
retVal = *pValue;
pthread_mutex_unlock(&interlockedMutex);
return retVal;
}
__inline static unsigned long
InterlockedDecrement(unsigned long *pValue)
{
unsigned long retVal;
pthread_mutex_lock(&interlockedMutex);
*pValue --;
(*pValue) --;
retVal = *pValue;
pthread_mutex_unlock(&interlockedMutex);
return retVal;
}
__inline static uint32_t
InterlockedIncrement(uint32_t *pValue)
{
uint32_t retVal;
pthread_mutex_lock(&interlockedMutex);
*pValue ++;
(*pValue) ++;
retVal = *pValue;
pthread_mutex_unlock(&interlockedMutex);
return retVal;
}
__inline static uint32_t
InterlockedDecrement(uint32_t *pValue)
{
uint32_t retVal;
pthread_mutex_lock(&interlockedMutex);
*pValue --;
(*pValue) --;
retVal = *pValue;
pthread_mutex_unlock(&interlockedMutex);
return retVal;
}
//===[ Include files ]=====================================================

View File

@ -18,8 +18,6 @@
* To contact Novell about this file by physical or electronic mail,
* you may find current contact information at www.novell.com.
*
* Author: Juan Carlos Luciani <jluciani@novell.com>
*
***********************************************************************/
#ifndef SMARTPTR_H
@ -56,12 +54,12 @@ class ObjRef
void IncRefCount(void)
{
InterlockedIncrement((unsigned long*)&m_Count);
InterlockedIncrement(&m_Count);
}
bool DecRefCount(void)
{
if ((m_Count > 0) && (InterlockedDecrement((unsigned long*)&m_Count) == 0))
if ((m_Count > 0) && (InterlockedDecrement(&m_Count) == 0))
{
return true;
}
@ -81,9 +79,7 @@ class ObjRef
//
private:
// BUGBUG!! - Need to put a lock in here so the count can be updated atomically.
// or use an interlocked inc/dec if one exists.
mutable unsigned int m_Count;
mutable unsigned long m_Count;
};
@ -273,8 +269,8 @@ inline void SmartPtr<T>::resetPtr(T* newPtr)
} // End of SmartPtr::resetPtr()
#endif // SMARTPTR_H
/******************************************************************************/
/******************************************************************************/

View File

@ -62,7 +62,7 @@ CPPFILES = channelproto.cpp \
CSFILES_CSC :=
INCLUDES = -I. -I.. -I../common -I../../../../include
RESOURCES =
DEFINES += -Wno-format-extra-args -fno-strict-aliasing
DEFINES += -Wno-format-extra-args -fno-strict-aliasing -fshort-wchar
CFLAGS += $(INCLUDES) $(DEFINES)
CPPFLAGS += $(INCLUDES) $(DEFINES)
LIBS = -lpthread -ldl -lexpat

View File

@ -558,6 +558,8 @@ SChannel::sendReplyData(
ioVectors[1].iov_len = serverDataLen;
sendmsgHdr.msg_iov = ioVectors;
sendmsgHdr.msg_iovlen = 2;
printf("SChannel::sendReplyData- Sending %d header bytes\n", sizeof(reqDataPktHdr));
printf("SChannel::sendReplyData- Sending %d payload bytes\n", serverDataLen);
while (1)
{
bytesSent = sendmsg(m_socket,
@ -619,14 +621,11 @@ SChannel::sendReplyError(
//
// Notes:
//
// L0
// L1
//=======================================================================--
{
int retStatus = -1;
char reqErrorPktHdr[ReqErrorPktHdrTemplate.length()];
struct msghdr sendmsgHdr = {0};
struct iovec ioVectors[2];
unsigned long bytesSent;
DbgTrace(1, "SChannel::sendReplyError- Start, Obj = %08X\n", this);

View File

@ -134,10 +134,10 @@ ServiceRequest(
//
// Notes:
//
// L0
// L1
//=======================================================================--
{
int retStatus = -1;
int retStatus = -1;
DbgTrace(1, "ServiceRequest- Start\n", 0);
@ -197,7 +197,7 @@ AbortPendingRequests(void)
//
// Notes:
//
// L0
// L2
//=======================================================================--
{
DbgTrace(1, "AbortPendingRequests- Start\n", 0);
@ -237,7 +237,7 @@ AwakenSuspendedServerThreads(void)
//
// Notes:
//
// L0
// L2
//=======================================================================--
{
DbgTrace(1, "AwakenSuspendedServerThreads- Start\n", 0);
@ -662,7 +662,7 @@ void* ServiceConnectionsThread(void)
//
// Notes:
//
// L0
// L2
//=======================================================================--
{
DbgTrace(1, "ServiceConnectionsThread- Start\n", 0);
@ -707,7 +707,7 @@ IpcServerGetRequest(void)
// An application can execute this method from multiple
// threads to allow requests to be process concurrently.
//
// L0
// L1
//=======================================================================--
{
int32_t requestId = 0;
@ -724,7 +724,7 @@ IpcServerGetRequest(void)
try {
// Instantiate ServerThread object
ServerThread *pServerThread = new ServerThread();
ServerThread serverThread;
// Obtain server mutex
pthread_mutex_lock(&serverMutex);
@ -759,7 +759,7 @@ IpcServerGetRequest(void)
nextReqId = 2;
}
// Place the request in the pending request map
// Place the request in the active request map
RSIterBoolPair insertResult = rsMap.insert(make_pair(requestId, pServerReq));
if (!insertResult.second)
{
@ -786,17 +786,14 @@ IpcServerGetRequest(void)
{
// There is not a request for us to process, place us on the waiting
// server thread list and wait to be awaken.
waitingServerThreadList.push_back(pServerThread);
waitingServerThreadList.push_back(&serverThread);
waitingServerThreads ++;
pServerThread->suspend(&serverMutex);
serverThread.suspend(&serverMutex);
}
}
// Release server mutex
pthread_mutex_unlock(&serverMutex);
// Free ServerThread object
delete pServerThread;
}
catch (...) {
@ -833,7 +830,7 @@ IpcServerGetRequestData(
// Notes: The returned buffer SHOULD NOT be released by the calling
// application.
//
// L0
// L1
//=======================================================================--
{
int32_t reqDataLen = 0;
@ -899,7 +896,7 @@ IpcServerCompleteRequest(
//
// Notes: The returned buffer will not NOT be released by the method.
//
// L0
// L1
//=======================================================================--
{
DbgTrace(1, "IpcServerCompleteRequest- Start, requestId = %08X\n", requestId);
@ -961,7 +958,7 @@ IpcServerAbortRequest(
//
// Notes:
//
// L0
// L1
//=======================================================================--
{
DbgTrace(1, "IpcServerAbortRequest- Start, requestId = %08X\n", requestId);
@ -1024,7 +1021,7 @@ IpcServerStart(void)
// Note: The service needs to be initialized and the listen address
// needs to be set before calling this procedure.
//
// L0
// L2
//=======================================================================--
{
int retStatus = -1;
@ -1088,7 +1085,7 @@ IpcServerSetUnAddress(
//
// Note: The service needs to be initialized before calling this procedure.
//
// L0
// L1
//=======================================================================--
{
int retStatus = -1;
@ -1097,7 +1094,7 @@ IpcServerSetUnAddress(
// Verify the input parameters
if (pSocketFileName == NULL
|| strlen(pSocketFileName) >= sizeof(pSocketFileName))
|| strlen(pSocketFileName) >= sizeof(listenSocketFile))
{
DbgTrace(0, "IpcServerSetUnAddress- Invalid input parameter\n", 0);
goto exit;
@ -1157,7 +1154,7 @@ IpcServerSetInAddress(
//
// Note: The service needs to be initialized before calling this procedure.
//
// L0
// L2
//=======================================================================--
{
int retStatus = -1;
@ -1237,7 +1234,7 @@ IpcServerInit(
// Note: It is necessary to call the start procedure to start
// servicing requests.
//
// L0
// L2
//=======================================================================--
{
int retStatus = -1;
@ -1299,7 +1296,7 @@ IpcServerShutdown(void)
//
// Note:
//
// L0
// L2
//=======================================================================--
{
DbgTrace(1, "IpcServerShutdown- Start\n", 0);