Reverted the forked process model changes after finding out that

they needed to be more involved (Each child process needed to
initialize the JVM) and because it introduced a whole set of
new problems that were not acceptible. Instead, now we will support
an optional single-threaded process model that can be invoked to
deal with JVMs that have trouble executing when invoked from a
thread.
This commit is contained in:
Juan Carlos Luciani 2006-11-15 05:06:54 +00:00
parent 075d5d6da2
commit f90f3d91bf
2 changed files with 151 additions and 451 deletions

View File

@ -45,7 +45,6 @@ extern "C" {
#include <stdlib.h> #include <stdlib.h>
#include <pthread.h> #include <pthread.h>
#include <syslog.h> #include <syslog.h>
#include <sys/mman.h>
#include <signal.h> #include <signal.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/file.h> #include <sys/file.h>

View File

@ -38,40 +38,17 @@
#define DOMAIN_SOCKET_FILE_NAME "/var/lib/CASA/authtoken/validate/socket" #define DOMAIN_SOCKET_FILE_NAME "/var/lib/CASA/authtoken/validate/socket"
#define SHARED_DATA_SZ_INCREMENT 512
//===[ Type definitions ]================================================== //===[ Type definitions ]==================================================
typedef
void*
(*WorkerThreadType)(void*);
typedef struct _WorkerSharedSeg
{
pthread_mutex_t mutexReply;
pthread_mutex_t mutexRequest;
bool childTerminate;
int compStatus;
int dataLenAllowed;
int dataLen;
char data[1];
} WorkerSharedSeg, *PWorkerSharedSeg;
//===[ Function prototypes ]=============================================== //===[ Function prototypes ]===============================================
void* void*
WorkerThread(void*); WorkerThread(void*);
void*
WorkerThreadForked(void*);
//===[ Global variables ]================================================== //===[ Global variables ]==================================================
// Usage string // Usage string
char usage[] = "\nCasaAuthtokenValidateD: usage: [-p ListenPort] [-b BeginThreads] [-g GrowThreads] [-m MaxThreads] [-D DebugLevel] [-d] [-f]\n"; char usage[] = "\nCasaAuthtokenValidateD: usage: [-p ListenPort] [-b BeginThreads] [-g GrowThreads] [-m MaxThreads] [-D DebugLevel] [-d] [-s]\n";
// Worker thread pool configuration parameters // Worker thread pool configuration parameters
int beginThreads = 5; int beginThreads = 5;
@ -109,17 +86,12 @@ pthread_mutex_t serverMutex;
pthread_cond_t serverCondition; pthread_cond_t serverCondition;
// Operating parameters // Operating parameters
bool singleThreaded = false;
bool terminating = false; bool terminating = false;
// Java parameters // Java parameters
JavaVM *g_jvm = NULL; JavaVM *g_jvm = NULL;
JNIEnv *g_env = NULL; JNIEnv *g_env = NULL;
bool g_execJavaFromForkedProcess = false;
// Parameters only utilized if g_execJavaFromForkedProcess is true
int g_sharedDataSz = 0;
jclass g_helperClass;
jmethodID g_mId;
char classpath[] = "-Djava.class.path=/usr/share/java/CASA/authtoken/CasaAuthToken.jar:/usr/share/java/CASA/authtoken/external/axis.jar:/usr/share/java/CASA/authtoken/external/axis-ant.jar:/usr/share/java/CASA/authtoken/external/commons-discovery-0.2.jar:/usr/share/java/CASA/authtoken/external/commons-logging-1.0.4.jar:/usr/share/java/CASA/authtoken/external/jaxrpc.jar:/usr/share/java/CASA/authtoken/external/log4j-1.2.8.jar:/usr/share/java/CASA/authtoken/external/saaj.jar:/usr/share/java/CASA/authtoken/external/wsdl4j-1.5.1.jar:/usr/share/java/CASA/authtoken/external/wss4j-1.5.0.jar:/usr/share/java/CASA/authtoken/external/xalan.jar:/usr/share/java/CASA/authtoken/external/xercesImpl.jar:/usr/share/java/CASA/authtoken/external/xml-apis.jar:/usr/share/java/CASA/authtoken/external/xmlsec-1.2.1.jar:/usr/share/java/CASA/authtoken/external:/etc/CASA/authtoken/keys/client"; char classpath[] = "-Djava.class.path=/usr/share/java/CASA/authtoken/CasaAuthToken.jar:/usr/share/java/CASA/authtoken/external/axis.jar:/usr/share/java/CASA/authtoken/external/axis-ant.jar:/usr/share/java/CASA/authtoken/external/commons-discovery-0.2.jar:/usr/share/java/CASA/authtoken/external/commons-logging-1.0.4.jar:/usr/share/java/CASA/authtoken/external/jaxrpc.jar:/usr/share/java/CASA/authtoken/external/log4j-1.2.8.jar:/usr/share/java/CASA/authtoken/external/saaj.jar:/usr/share/java/CASA/authtoken/external/wsdl4j-1.5.1.jar:/usr/share/java/CASA/authtoken/external/wss4j-1.5.0.jar:/usr/share/java/CASA/authtoken/external/xalan.jar:/usr/share/java/CASA/authtoken/external/xercesImpl.jar:/usr/share/java/CASA/authtoken/external/xml-apis.jar:/usr/share/java/CASA/authtoken/external/xmlsec-1.2.1.jar:/usr/share/java/CASA/authtoken/external:/etc/CASA/authtoken/keys/client";
// Java AuthenticationToken Class and method name // Java AuthenticationToken Class and method name
@ -129,6 +101,127 @@ char authTokenClassName[] = "com/novell/casa/authtoksvc/AuthToken";
char authTokenClassValidateMethodName[] = "validate"; char authTokenClassValidateMethodName[] = "validate";
//++=======================================================================
void
ServiceRequests(void)
//
// Arguments:
//
// Returns:
//
// Abstract:
//
// Notes:
//
// L2
//=======================================================================--
{
DbgTrace(1, "ServiceRequests- Start\n", 0);
// We are now attached to the JVM, find the helper class that
// we need.
jclass helperClass = g_env->FindClass(authTokenClassName);
if (helperClass)
{
// Helper class found, now get the id of the method that we invoke
jmethodID mId = g_env->GetStaticMethodID(helperClass,
authTokenClassValidateMethodName,
"(Ljava/lang/String;)Ljava/lang/String;");
if (mId)
{
// Loop until told to terminate
while (!terminating)
{
// Get a request that needs servicing
int32_t requestId = IpcServerGetRequest();
if (requestId != 0)
{
// We got a request that needs servicing, now get the
// data associated with it.
char *pReqData;
int dataLen = IpcServerGetRequestData(requestId, &pReqData);
if (dataLen != 0)
{
// Lets push the jvm local frame to allow us to clean up our local
// references later.
g_env->PushLocalFrame(10);
// Encapsulate the request data into a string object
jstring inString = g_env->NewStringUTF(pReqData);
if (inString)
{
// Invoke our helper method
jstring outString = (jstring) g_env->CallStaticObjectMethod(helperClass, mId, inString);
// Check if an excption occurred
if (g_env->ExceptionCheck() == JNI_TRUE)
{
// There is a pending exception, display the info which in turn clears it.
g_env->ExceptionDescribe();
IpcServerAbortRequest(requestId);
}
else
{
if (outString)
{
// The helper method succeded, complete the request
// with the data returned.
const char *pOutChars = g_env->GetStringUTFChars(outString, NULL);
if (pOutChars)
{
IpcServerCompleteRequest(requestId, (char*) pOutChars);
g_env->ReleaseStringUTFChars(outString, pOutChars);
}
else
{
DbgTrace(0, "ServiceRequests- Unable to get UTF characters\n", 0);
IpcServerAbortRequest(requestId);
}
}
else
{
// The helper method failed, just abort the request.
IpcServerAbortRequest(requestId);
}
}
}
else
{
DbgTrace(0, "ServiceRequests- UTF String allocation failure\n", 0);
IpcServerAbortRequest(requestId);
}
// Pop the jvm local frame to clean up our local references
g_env->PopLocalFrame(NULL);
}
else
{
DbgTrace(0, "ServiceRequests- Error obtaining Request data\n", 0);
IpcServerAbortRequest(requestId);
}
}
else
{
// No need to service requests any longer
break;
}
}
}
else
{
DbgTrace(0, "ServiceRequests- Failed to get method id\n", 0);
}
}
else
{
DbgTrace(0, "ServiceRequests- Failed to find helper class\n", 0);
}
DbgTrace(1, "ServiceRequests- End\n", 0);
} /*-- ServiceRequests() --*/
//++======================================================================= //++=======================================================================
void void
GrowWorkerThreadPool(int growNumber) GrowWorkerThreadPool(int growNumber)
@ -145,16 +238,8 @@ GrowWorkerThreadPool(int growNumber)
// L2 // L2
//=======================================================================-- //=======================================================================--
{ {
WorkerThreadType worker;
DbgTrace(1, "GrowWorkerThreadPool- Start\n", 0); DbgTrace(1, "GrowWorkerThreadPool- Start\n", 0);
// Determine what worker thread routine to utilze
if (g_execJavaFromForkedProcess)
worker = (void*(*)(void*)) WorkerThreadForked;
else
worker = (void*(*)(void*)) WorkerThread;
for (int i = 0; i < growNumber; i++) for (int i = 0; i < growNumber; i++)
{ {
int threadCreateStatus; int threadCreateStatus;
@ -162,7 +247,7 @@ GrowWorkerThreadPool(int growNumber)
if ((threadCreateStatus = pthread_create(&thread, if ((threadCreateStatus = pthread_create(&thread,
NULL, NULL,
worker, (void*(*)(void*))WorkerThread,
NULL) == 0)) NULL) == 0))
{ {
// Worker thread created // Worker thread created
@ -334,6 +419,7 @@ WorkerThread(void*)
{ {
// There is a pending exception, display the info which in turn clears it. // There is a pending exception, display the info which in turn clears it.
env->ExceptionDescribe(); env->ExceptionDescribe();
IpcServerAbortRequest(requestId);
} }
else else
{ {
@ -435,369 +521,6 @@ WorkerThread(void*)
} /*-- WorkerThread() --*/ } /*-- WorkerThread() --*/
//++=======================================================================
WorkerSharedSeg*
GetWorkerSharedSeg(int dataLen)
//
// Arguments:
//
// Returns:
//
// Abstract:
//
// Notes:
//
// L0
//=======================================================================--
{
WorkerSharedSeg *pSharedSeg = NULL;
int fd;
DbgTrace(1, "GetWorkerSharedSeg- Start\n", 0);
// Check if the Determine the data size that must be handled
if (dataLen > g_sharedDataSz)
{
pthread_mutex_lock(&serverMutex);
// Increment shared data size until it is large enough
while (g_sharedDataSz < dataLen)
{
g_sharedDataSz += SHARED_DATA_SZ_INCREMENT;
}
pthread_mutex_unlock(&serverMutex);
}
// Open dev/zero
fd = open("/dev/zero", O_RDWR);
if (fd)
{
int dataLenAllowed = g_sharedDataSz; // Save in a local variable to avoid
// having another thread change it while
// we need the value to remain constant.
// Map dev/zero into memory
pSharedSeg = (WorkerSharedSeg*) mmap(NULL,
dataLenAllowed + sizeof(WorkerSharedSeg) - 1,
PROT_READ | PROT_WRITE, MAP_SHARED,
fd,
0);
close(fd);
if (pSharedSeg != MAP_FAILED)
{
// Remember the size of the dataLenAllowed
pSharedSeg->dataLenAllowed = dataLenAllowed;
// Initialize the needed mutexes in the shared data segment
pthread_mutex_init(&pSharedSeg->mutexReply, NULL);
pthread_mutex_init(&pSharedSeg->mutexRequest, NULL);
}
else
{
DbgTrace(0, "GetWorkerSharedSeg- Failed to mmap, error = %d\n", errno);
pSharedSeg = NULL;
}
}
else
{
DbgTrace(0, "GetWorkerSharedSeg- Failed to open /dev/zero, error = %d\n", errno);
}
DbgTrace(1, "GetWorkerSharedSeg- End, pSharedSeg = %0X\n", pSharedSeg);
return pSharedSeg;
} /*-- GetWorkerSharedSeg() --*/
//++=======================================================================
void
RelWorkerSharedSeg(WorkerSharedSeg *pSharedSeg)
//
// Arguments:
//
// Returns:
//
// Abstract:
//
// Notes:
//
// L0
//=======================================================================--
{
DbgTrace(1, "RelWorkerSharedSeg- Start\n", 0);
pthread_mutex_destroy(&pSharedSeg->mutexReply);
pthread_mutex_destroy(&pSharedSeg->mutexRequest);
munmap(pSharedSeg, pSharedSeg->dataLenAllowed + sizeof(WorkerSharedSeg) - 1);
DbgTrace(1, "RelWorkerSharedSeg- End\n", 0);
} /*-- RelWorkerSharedSeg() --*/
//++=======================================================================
void*
WorkerThreadForked(void*)
//
// Arguments:
//
// Returns:
//
// Abstract:
//
// Notes:
//
// L0
//=======================================================================--
{
bool perishingThread = false;
WorkerSharedSeg *pSharedSeg = NULL;
DbgTrace(1, "WorkerThreadForked- Start\n", 0);
// Set the thread in the detached state so that it is cleaned up when it exits
pthread_detach(pthread_self());
// Loop until told to terminate
while (!terminating)
{
// Get a request that needs servicing
int32_t requestId = IpcServerGetRequest();
if (requestId != 0)
{
// We got a request that needs servicing, now get the
// data associated with it.
char *pReqData;
int dataLen = IpcServerGetRequestData(requestId, &pReqData);
if (dataLen != 0)
{
bool okToProcessRequest = true;
// Indicate that we are now busy
WorkerThreadBusy();
// Deal with shared data segment to small to process request
if (pSharedSeg
&& pSharedSeg->dataLenAllowed < dataLen)
{
// We already have a shared data segment that is too small
// to process the current request, request helper child to
// terminate itself and release the shared data segment.
pSharedSeg->childTerminate = true;
pthread_mutex_unlock(&pSharedSeg->mutexRequest);
pthread_mutex_lock(&pSharedSeg->mutexReply);
// The child terminated, now release the shared segment
RelWorkerSharedSeg(pSharedSeg);
pSharedSeg = NULL;
}
// Check if we do not yet have a shared data segment
if (pSharedSeg == NULL)
{
// Get shared data segment
pSharedSeg = GetWorkerSharedSeg(dataLen);
if (pSharedSeg)
{
pid_t pid;
// Shared data segment created, spawn helper child
// after acquiring the request and reply mutexes
pthread_mutex_lock(&pSharedSeg->mutexRequest);
pthread_mutex_lock(&pSharedSeg->mutexReply);
pid = fork();
if (pid == 0) // If helper child
{
bool childTerminate = false;
// Help parent until asked to terminate
while (childTerminate == false)
{
// Wait for a request
pthread_mutex_lock(&pSharedSeg->mutexRequest);
// Check if we are being requested to terminate
if (!pSharedSeg->childTerminate)
{
// Not being requested to terminate.
//
// Lets push the jvm local frame to allow us to clean up our local
// references later.
g_env->PushLocalFrame(10);
// Initialize the completion status to -1 (Failure).
pSharedSeg->compStatus = 0;
// Encapsulate the request data into a string object (We are
// dealing with a NULL terminated string).
jstring inString = g_env->NewStringUTF(pSharedSeg->data);
if (inString)
{
// Invoke our helper method
jstring outString = (jstring) g_env->CallStaticObjectMethod(g_helperClass, g_mId, inString);
// Check if an excption occurred
if (g_env->ExceptionCheck() == JNI_TRUE)
{
// There is a pending exception, display the info which in turn clears it.
g_env->ExceptionDescribe();
}
else
{
if (outString)
{
// The helper method succeded, complete the request
// with the data returned.
const char *pOutChars = g_env->GetStringUTFChars(outString, NULL);
if (pOutChars)
{
// Determine the length of the reply data
pSharedSeg->dataLen = strlen(pOutChars) + 1;
// Verify that the shared data segment can handle
// the reply data. Our assumption is that this should
// always be true for the operation being performed.
if (pSharedSeg->dataLen <= pSharedSeg->dataLenAllowed)
{
// Copy the reply data to the shared data segment
memcpy(pSharedSeg->data, pOutChars, pSharedSeg->dataLen);
// Reply successfully processed
pSharedSeg->compStatus = 0;
}
else
{
DbgTrace(0, "WorkerThreadForked- Reply data too large for Shared data segment\n", 0);
}
g_env->ReleaseStringUTFChars(outString, pOutChars);
}
else
{
DbgTrace(0, "WorkerThreadForked- Unable to get UTF characters\n", 0);
}
}
else
{
// The helper method failed, just abort the request.
DbgTrace(1, "WorkerThreadForked- Helper method failed\n", 0);
}
}
}
else
{
DbgTrace(0, "WorkerThreadForked- UTF String allocation failure\n", 0);
}
// Pop the jvm local frame to clean up our local references
g_env->PopLocalFrame(NULL);
}
else
{
// We are being requested to terminate
childTerminate = true;
}
// Let parent know that we are done with the request
pthread_mutex_unlock(&pSharedSeg->mutexReply);
}
// Child exit
exit(0);
}
else if (pid == -1)
{
DbgTrace(0, "WorkerThreadForked- Fork failed, error = %d\n", errno);
okToProcessRequest = false;
}
}
else
{
{
DbgTrace(0, "WorkerThreadForked- Failed to get shared data segment\n", 0);
okToProcessRequest = false;
}
}
}
// Check if we should continue processing the request
if (okToProcessRequest)
{
// Copy the request data onto the shared data segment
memcpy(pSharedSeg->data, pReqData, dataLen);
// Invoke the services of our child helper and
// wait for it to post its reply.
pthread_mutex_unlock(&pSharedSeg->mutexRequest);
pthread_mutex_lock(&pSharedSeg->mutexReply);
// Proceed based on the completion status
if (pSharedSeg->compStatus == 0)
{
IpcServerCompleteRequest(requestId, pSharedSeg->data);
}
else
{
// Request processing failed, abort the request.
IpcServerAbortRequest(requestId);
}
}
else
{
// Abort the request
IpcServerAbortRequest(requestId);
}
// Indicate that we are no longer busy and get indication of
// whether or not we should continue to try to process requests.
if (WorkerThreadWaiting() == true)
{
DbgTrace(1, "WorkerThreadForked- Requested to terminate\n", 0);
// Remember that we are a perishing thread so that we can reduce the
// count as we exit.
perishingThread = true;
break;
}
}
else
{
DbgTrace(0, "WorkerThreadForked- Error obtaining Request data\n", 0);
IpcServerAbortRequest(requestId);
}
}
else
{
// No need to service requests any longer
break;
}
}
// Decrement the number of worker threads and signal our main thread
// to terminate itself if we are the last worker thread.
pthread_mutex_lock(&serverMutex);
if (perishingThread)
numPerishingThreads --;
numThreads --;
if (numThreads == 0)
pthread_cond_signal(&serverCondition);
pthread_mutex_unlock(&serverMutex);
DbgTrace(1, "WorkerThread- End\n", 0);
// Exit
pthread_exit(NULL);
return 0; // never-reached!
} /*-- WorkerThreadForked() --*/
//++======================================================================= //++=======================================================================
void void
SigTermHandler( SigTermHandler(
@ -860,37 +583,8 @@ InitJavaInvoke(void)
vm_args.ignoreUnrecognized = true; vm_args.ignoreUnrecognized = true;
if (JNI_CreateJavaVM(&g_jvm, (void**)&g_env, &vm_args) >= 0) if (JNI_CreateJavaVM(&g_jvm, (void**)&g_env, &vm_args) >= 0)
{ {
// Do a little more work if executing Java from forked processes // Success
if (g_execJavaFromForkedProcess) retStatus = 0;
{
// Find the helper class that we need.
g_helperClass = g_env->FindClass(authTokenClassName);
if (g_helperClass)
{
// Helper class found, now get the id of the method that we invoke
g_mId = g_env->GetStaticMethodID(g_helperClass,
authTokenClassValidateMethodName,
"(Ljava/lang/String;)Ljava/lang/String;");
if (g_mId)
{
// Success
retStatus = 0;
}
else
{
DbgTrace(0, "InitJavaInvoke- Error obtaining method id\n", 0);
}
}
else
{
DbgTrace(0, "InitJavaInvoke- Error obtaining helper class\n", 0);
}
}
else
{
// Success
retStatus = 0;
}
} }
else else
{ {
@ -1108,7 +802,7 @@ main(
while (!doneScanning) while (!doneScanning)
{ {
opterr = 0; opterr = 0;
option = getopt(argc, argv, "m:p:b:g:D:df"); option = getopt(argc, argv, "m:p:b:g:D:ds");
// Proceed based on the result // Proceed based on the result
switch (option) switch (option)
@ -1152,13 +846,13 @@ main(
optionsSpecified ++; optionsSpecified ++;
break; break;
case 'f': case 's':
// Execute Java from a forked process // Run single threaded
g_execJavaFromForkedProcess = true; singleThreaded = true;
optionsSpecified ++; optionsSpecified ++;
break; break;
case 'D': case 'D':
// Set the debug level // Set the debug level
DebugLevel = atoi(optarg); DebugLevel = atoi(optarg);
@ -1192,10 +886,6 @@ main(
// Set a handler for SIGTERM // Set a handler for SIGTERM
signal(SIGTERM, SigTermHandler); signal(SIGTERM, SigTermHandler);
// Set to ignore SIGCHLD if executing Java from forked processes
if (g_execJavaFromForkedProcess)
sigignore(SIGCHLD);
// Initialize our mutexes // Initialize our mutexes
pthread_mutex_init(&interlockedMutex, NULL); pthread_mutex_init(&interlockedMutex, NULL);
pthread_mutex_init(&serverMutex, NULL); pthread_mutex_init(&serverMutex, NULL);
@ -1224,17 +914,28 @@ main(
// Now start the IPC server // Now start the IPC server
if (IpcServerStart() == 0) if (IpcServerStart() == 0)
{ {
// Acquire our mutex // Proceed according to run model configured
pthread_mutex_lock(&serverMutex); if (singleThreaded)
{
// Runninf in single-threaded model
ServiceRequests();
}
else
{
// Running in multi-threaded model
//
// Acquire our mutex
pthread_mutex_lock(&serverMutex);
// Start worker threads // Start worker threads
GrowWorkerThreadPool(beginThreads); GrowWorkerThreadPool(beginThreads);
// Wait for the worker threads to terminate // Wait for the worker threads to terminate
pthread_cond_wait(&serverCondition, &serverMutex); pthread_cond_wait(&serverCondition, &serverMutex);
// Release our mutex // Release our mutex
pthread_mutex_unlock(&serverMutex); pthread_mutex_unlock(&serverMutex);
}
DbgTrace(0, "main- Exiting, numThreads = %d\n", numThreads); DbgTrace(0, "main- Exiting, numThreads = %d\n", numThreads);
} }