archie/tcl-dp/generic/dpRPC.c
2024-05-27 16:13:40 +02:00

1589 lines
44 KiB
C
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* generic/dpRPC.c
*
* Todo:
* Make activation records dynamic structure.
* Change DpRPC/DpRDO to use diff parameters. Parsing should be done
* in dpCmds.c
*
* This file contains code for sending and receiving Tcl-DP
* RPCs and RDOs. All of this code is OS and, to some extent,
* transport layer independent.
*
* An RPC/RDO is a message formatted as follows:
*
* ---------------------------------------------------------------------
* | len:6 | space:1 | tok:1 | space:1 | id:6 | space:1 | msg:len-16 |
* ---------------------------------------------------------------------
*
* Length is a 6 byte character string that gives the length
* of the rest of the message (including the header)
* as a decimal integer. Whitespace is prepended to pad it to
* 6 bytes if the message length is less than 100,000 bytes.
*
* Tok is a character that tells what this packet
* represents in RPC/RDO terms. It can be one of four
* values, one each for RPC, RDO, error, or return value
* messages. See the #defines for TOK_* below for details.
*
* A space follows the token, followed by a 6 digit numeric
* identifier, formatted as a decimal string with leading
* whitespace padding. The id field uniquely identifies the
* RPC call, and is returned to the sender with the RPC's
* return value or error. The id field is ignored for RDO calls.
*
* A space is tacked on after that and then the message
* follows. For RPC and RDO messages, the message is a
* Tcl command string to evaluate. For return values,
* message contains a Tcl return value. For errors, message
* contains a Tcl list with the errorCode and errorInfo variables
* set.
*
* When a message comes in, ReadRPCChannel is called. Its
* job is to read the available messages from the channel and
* process each message. It first reads all available input
* from the channel. There are 3 cases:
* o if the message is short, the input is buffered, and
* ReadRPCChannel returns.
* o if the input is badly formatted (invalid length field, or
* bad token), or some other error occurs, the channel is closed.
* o otherwise, each complete message is processed using
* ProcessRPCMessage. Any remaining incomplete data is
* buffered.
*
* ProcessRPCMessage processes an incoming RPC, RDO, error, or
* return value message. There are 4 cases:
* o if the message is an RPC message, the global variable dp_rpcFile
* is set to the channel identifier and the string is evaluated.
* Security checks, if any, are performed at this point.
* If an error code is generated, a return message is sent on
* the channel consisting of the errorCode and errorInfo variable,
* formatted as a Tcl list. If no error is generated, the return
* value is sent on the channel.
* o if the message is an RDO message, processing proceeds as with
* RPC, but no return value is sent in any case (if a return value
* is requested in an RDO, the evaluated string will contain a
* call to dp_RDO).
* o if the message is a return value or error message, the message
* is stored in the activeRPCs global variable, which stores the
* values that dp_RPC should return in interp->result and the
* errorCode (typically, TCL_OK or TCL_ERROR).
*
* When an RPC is sent, the message is sent (along with a unique id),
* and the process typically enters an event loop, waiting for the
* return message or error. The id is used as an index in the
* activeRPCs global variable. When the return value (or error)
* comes back, the waiting field for the corresponding RPC is cleared.
* If a timeout occurs, the timedOut field will be set, in which case
* the RPC will return (with an appropriate message).
*
* Timeouts cause a garbage collection problem in the activeRPCs
* structure. Since each RPC corresponds to a unique outstanding
* RPC message, only 1000000 RPCs (the range of the 6 digit id field) can be active at any time. If
* RPCs time out, then that slot in the activeRPCs structure can't
* be reused until the RPC returns. If the RPC never returns, we
* still need to eventually free them. We know we can free a slot
* if a return value message comes back (with the appropriate id) or
* when the associated channel closes.
*
* To solve this problem, we store the time the RPC was sent in the
* slot. If we ever run out of slots, the oldest timed out slots
* will be reclaimed.
*/
#include "generic/dpInt.h"
#include "tclInt.h"
/*
 * Debug tracing. When DEBUG is defined, DBG(x) expands to its argument,
 * so the printf() calls wrapped in it produce a lot of output on stdout
 * describing what the RPC subsystem is doing. When DEBUG is not defined,
 * DBG(x) expands to nothing and the wrapped statement disappears.
 */
#ifndef DEBUG
#define DBG(x)
#else
#define DBG(x) x
#endif
/*
 * The following tokens appear in the "tok" field of the RPC messages
 * which are transmitted between processes:
 *
 *   TOK_RPC - request sent by dp_RPC; the receiver evaluates the message
 *             and answers with TOK_RET or TOK_ERR.
 *   TOK_RDO - request sent by dp_RDO; the receiver evaluates the message
 *             and sends no reply.
 *   TOK_RET - reply carrying the result of a successful RPC.
 *   TOK_ERR - reply carrying a Tcl list describing a failed RPC.
 */
#define TOK_RPC 'e'
#define TOK_RDO 'd'
#define TOK_RET 'r'
#define TOK_ERR 'x'
/*
 * Sentinel stored in ActiveRPC.token when no timeout timer was created
 * for the RPC.
 */
#define NO_TOKEN (Tcl_TimerToken)(-1)
/*
 * NOTE(review): RPC_BUFFER_SIZE is not referenced in the part of the file
 * reviewed here; presumably the initial size of a channel's input
 * buffer -- confirm against the channel registration code.
 */
#define RPC_BUFFER_SIZE 8192
/*
 * One of the following structures is maintained for each Tcl channel
 * that is receiving/sending RPCs. The structures form a singly linked
 * list headed by registeredChannels and are looked up by name.
 */
typedef struct RPCChannel {
char *name; /* Name of channel in Tcl interpreter */
Tcl_Interp *interp; /* Associated interpreter */
Tcl_Channel chan; /* Associated channel */
char *buffer; /* Any buffered input (incomplete messages) */
int error; /* Has an error occurred on this channel? */
int bufLen; /* Length of any buffered input */
int maxLen; /* Number of bytes allocated to buffer */
char *checkCmd; /* Tcl command to run to check RPCs, or NULL
                 * if incoming RPCs are not checked */
struct RPCChannel *next; /* Next registered channel, or NULL */
int flags; /* Channel status */
/* Set while DpProcessRPCMessage is handling a message on the channel */
#define CHAN_BUSY 1
/*
 * When set, DpReadRPCChannel deletes the channel once the current message
 * has been processed. NOTE(review): presumably set by DpDeleteRPCChannel
 * when asked to delete a busy channel -- confirm.
 */
#define CHAN_FREE 2
} RPCChannel;
/* Head of the list of registered RPC channels */
static RPCChannel *registeredChannels = NULL;
/*
 * One of the following structures exists for each active (outstanding)
 * RPC. The structures are held in the activeRPCs hash table, keyed by
 * the RPC id.
 */
typedef struct ActiveRPC {
int flags; /* Flags (see below) */
int id; /* ID for the RPC; also its hash table key */
unsigned long time; /* Time RPC was sent (for garbage collection) */
Tcl_TimerToken token; /* Token for timeout, or NO_TOKEN if the RPC
                       * has no timeout */
RPCChannel *chanPtr; /* Associated channel */
int returnValue; /* Value to return from dp_RPC */
char *result; /* Value to set in interp->result; allocated
               * when the reply arrives, freed by dp_RPC */
} ActiveRPC;
/* All outstanding RPCs, keyed by id */
static Tcl_HashTable activeRPCs;
static int activeId; /* Next free id */
/*
 * Values for ActiveRPC->flags. dp_RPC keeps running its event loop for
 * as long as the flags are exactly RPC_WAITING; a reply sets the flags
 * to 0, a timeout to RPC_TIMEDOUT and a cancellation to RPC_CANCELLED.
 */
#define RPC_WAITING (1<<0) /* Waiting for a reply */
#define RPC_TIMEDOUT (1<<1) /* Timed out, no reply yet. When reply
* comes, clear it */
#define RPC_CANCELLED (1<<2) /* Cancelled by user -- same semantics
* for clearing as timeout */
/*
 * Forward Declarations
 */
/* Receiving side: reading, checking and dispatching incoming messages */
static void DpReadRPCChannel _ANSI_ARGS_((RPCChannel *rpcChanPtr));
static void DpReadRPCChannelCallback _ANSI_ARGS_((ClientData clientData,
int mask));
static void DpProcessRPCMessage _ANSI_ARGS_((Tcl_Interp *interp,
RPCChannel *chan,
int id, int token,
char *message));
static int DpCheckRPC _ANSI_ARGS_((Tcl_Interp *interp,
RPCChannel *rcPtr,
char *rpcStr));
/* Sending side: timeouts, option parsing and activation records */
static void DpTimeoutHandler _ANSI_ARGS_((ClientData clientData));
static int DpParseEventList _ANSI_ARGS_((Tcl_Interp *interp,
char *eventList, int *maskPtr));
static ActiveRPC *MakeActivationRecord _ANSI_ARGS_((RPCChannel *rpcChanPtr,
int setTimeout, int timeout));
/* Channel registration */
static int DpRegisterRPCChannel _ANSI_ARGS_((Tcl_Interp *interp,
char *chanName,
char *checkCmd));
static int DpDeleteRPCChannel _ANSI_ARGS_((Tcl_Interp *interp,
RPCChannel *rpcChanPtr));
/* Tcl command procedures and package initialization */
int Dp_RPCCmd _ANSI_ARGS_((ClientData clientData,
Tcl_Interp *interp, int argc,
char **argv));
int Dp_RDOCmd _ANSI_ARGS_((ClientData clientData,
Tcl_Interp *interp, int argc,
char **argv));
int Dp_AdminCmd _ANSI_ARGS_((ClientData clientData,
Tcl_Interp *interp, int argc,
char **argv));
int Dp_CancelRPCCmd _ANSI_ARGS_((ClientData unused,
Tcl_Interp *interp, int argc,
char **argv));
int Dp_RPCInit _ANSI_ARGS_((Tcl_Interp *interp));
/* Formats one message (token, id, text) and writes it to a channel */
static int DpSendRPCMessage _ANSI_ARGS_((Tcl_Channel chan,
int token, int id,
char *mesgStr));
/*
 * The following strings are used to provide callback and/or error
 * catching for RDOs. c = callback, e = onerror, ce = both.
 *
 * Each template is a Tcl script that Dp_RDOCmd fills in with sprintf and
 * sends to the remote side in place of the plain command. The script
 * runs the command remotely and reports back with dp_RDO on the channel
 * named by the remote dp_rpcFile variable. The number and order of the
 * %s conversions must match the sprintf calls in Dp_RDOCmd.
 */
/*
 * sprintf template when both callback and onerror are specified.
 * Params are, in order: cmd (the command to run), the full dp_RDO
 * invocation (quoted in the "while remotely executing" errorInfo text),
 * onerror, callback.
 */
static char *ceCmdTemplate =
"if [catch {%s} dp_rv] {\
dp_RDO $dp_rpcFile set errorInfo \"$errorInfo\n while remotely executing\n%s\"; \
dp_RDO $dp_rpcFile eval \"%s \\{$dp_rv\\}\"\
} else {\
dp_RDO $dp_rpcFile eval \"%s \\{$dp_rv\\}\"\
}";
/*
 * sprintf template when just onerror is specified.
 * Params are, in order: cmd, the full dp_RDO invocation (for the
 * errorInfo text), onerror.
 */
static char *eCmdTemplate =
"if [catch {%s} dp_rv] {\
dp_RDO $dp_rpcFile set errorInfo \"$errorInfo\n while remotely executing\n%s\"; \
dp_RDO $dp_rpcFile eval \"%s \\{$dp_rv\\}\"\
}";
/*
 * sprintf template when just callback is specified.
 * Params are, in order: cmd, callback. Errors raised by cmd are not
 * caught by this script.
 */
static char *cCmdTemplate =
"set dp_rv [%s]; dp_RDO $dp_rpcFile eval \"%s \\{$dp_rv\\}\"";
/*
*--------------------------------------------------------------
*
* ReadRPCChannel --
*
* This function is called by the event loop (through
* ReadRPCChannelCallback) whenever the file is readable
* (i.e., there's an inbound message).
* It reads all available input on the channel, and calls
* ProcessRPCMessage on each valid message. If the input
* is badly formatted, the channel is closed and an error
* is returned. If the message is incomplete, it is buffered.
*
* Results:
* None
*
* Side effects:
* May close the channel (in which case a background error is
* triggered).
*
*--------------------------------------------------------------
*/
static void
DpReadRPCChannel (rpcChanPtr)
RPCChannel *rpcChanPtr;
{
char *buffer;
int bufLen, maxLen;
char str[100];
char scratch[100000];
Tcl_DString dstr;
int blocking;
int numRead;
char *curMsg;
int id, msgLen;
char token, saveChar;
/*
* Make sure the socket's non-blocking (it was set to be non-blocking
* when it was registered, and shouldn't have changed. If it did, then
* it's an error in the user program).
*/
memset(scratch, 0, 1000);
Tcl_DStringInit(&dstr);
Tcl_GetChannelOption (
#ifdef _TCL80
rpcChanPtr->interp,
#endif
rpcChanPtr->chan, "-blocking", &dstr);
Tcl_GetBoolean (rpcChanPtr->interp, Tcl_DStringValue(&dstr), &blocking);
if (blocking) {
sprintf (str, "Channel %s must be in non-blocking mode", rpcChanPtr->name);
Tcl_SetResult (rpcChanPtr->interp, str, TCL_VOLATILE);
Tcl_BackgroundError (rpcChanPtr->interp);
DpDeleteRPCChannel (rpcChanPtr->interp, rpcChanPtr);
return;
}
Tcl_DStringFree(&dstr);
/*
* If everything's ok, read in up to 1000000 bytes of data (the maximum
* message length) and append it onto the buffer (expanding if necessary)
*
* Note that we only read in 100000 bytes since more causes stack
* problems in Win32. There are other systems that limit a thread's
* stack to 64k also, so this is not a good solution. This is a
* gaping wound in the RPC code that needs to be fixed.
*
* You need help if you are sending a megabyte RPC anyhow. :)
*/
numRead = Tcl_Read (rpcChanPtr->chan, scratch, sizeof(scratch));
if (numRead < 1) {
if (Tcl_Eof(rpcChanPtr->chan)) {
/*
* End of File on the RPC channel. This will usually happen
* if the TCP connection is reset.
*
* We'll just close the channel quietly.
*/
Tcl_Channel chan = rpcChanPtr->chan;
Tcl_Interp *interp = rpcChanPtr->interp;
DpDeleteRPCChannel(rpcChanPtr->interp, rpcChanPtr);
DBG(printf("Closing EOF'd RPC channel"));
DpClose(interp, chan);
return;
} else {
return;
}
}
buffer = rpcChanPtr->buffer;
bufLen = rpcChanPtr->bufLen;
maxLen = rpcChanPtr->maxLen;
if (bufLen + numRead > maxLen) {
char *newBuffer;
maxLen = bufLen + numRead + 1024;
newBuffer = ckalloc (maxLen);
memcpy (newBuffer, buffer, bufLen);
ckfree (buffer);
buffer = newBuffer;
}
memcpy (buffer+bufLen, scratch, numRead);
bufLen += numRead;
/*
* Process all the messages, one by one. We break out of this loop
* when there's not a complete message left in the buffer
*/
curMsg = buffer;
while (1) {
/*
* Make sure message length field is there.
*/
if (bufLen < 6) {
break;
}
/*
* Ok, we have the length field of the message. Make sure it's
* valid.
*/
memcpy(str, curMsg, 6);
str[6] = 0;
if (Tcl_GetInt(rpcChanPtr->interp, str, &msgLen) != TCL_OK) {
goto badFormat;
}
/*
* Got a valid length field. Make sure rest of message is there.
*/
if (bufLen < msgLen) {
break;
}
saveChar = curMsg[msgLen];
curMsg[msgLen] = '\0';
DBG(printf("\nIncoming RPC: %s on %s\n", curMsg, Tcl_GetChannelName(rpcChanPtr->chan)));
/*
* Ok, we've got enough data. Extract the fields and call
* ProcessRPCMessage. If we get a bad message, shut down the channel.
* Recall that the message format is:
* ---------------------------------------------------------------------
* | len:6 | space:1 | tok:1 | space:1 | id:6 | space:1 | msg:len-16 |
* ---------------------------------------------------------------------
*/
token = curMsg[7];
memcpy(str, &curMsg[9], 6);
str[6] = 0;
if (Tcl_GetInt(rpcChanPtr->interp, str, &id) != TCL_OK) {
goto badFormat;
}
DpProcessRPCMessage(rpcChanPtr->interp, rpcChanPtr, id, token, &curMsg[16]);
curMsg += msgLen;
*curMsg = saveChar;
bufLen -= msgLen;
if (rpcChanPtr->flags & CHAN_FREE) {
DpDeleteRPCChannel (rpcChanPtr->interp, rpcChanPtr);
return;
}
}
/*
* Done! Shift down any remaining data to the beginning of the buffer
* and return!
*/
memcpy (buffer, curMsg, bufLen);
rpcChanPtr->bufLen = bufLen;
rpcChanPtr->buffer = buffer;
rpcChanPtr->maxLen = maxLen;
return;
badFormat:
DBG(printf("Bad RPC packet: %s\n", curMsg));
sprintf(str, "Received badly formatted packet on RPC channel %s",
rpcChanPtr->name);
Tcl_SetResult(rpcChanPtr->interp, str, TCL_VOLATILE);
Tcl_BackgroundError(rpcChanPtr->interp);
return;
}
/*
*--------------------------------------------------------------
*
* DpReadRPCChannelCallback --
*
* This function is called by the event handling system
* whenever the channel is readable. Calls ReadRPCChannel
* on the channel.
*
* Results:
* None
*
* Side effects:
* None.
*
*--------------------------------------------------------------
*/
static void
DpReadRPCChannelCallback (clientData, mask)
ClientData clientData;
int mask;
{
DpReadRPCChannel((RPCChannel *)clientData);
}
/*
*--------------------------------------------------------------
*
* DpProcessRPCMessage --
*
* Process an incoming RPC, RDO, error, or return value message.
* See the documentation at the top of this file for details.
*
* Results:
* None.
*
* Side effects:
* Since a Tcl command is evaluated, almost anything can happen.
*
* ProcessRPCMessage processes an incoming RPC, RDO, error, or
* return value message. There are 4 cases:
*
* o if the message is an RPC message, the global variable dp_rpcFile
* is set to the channel identifier and the string is evaluated.
* Security checks, if any, are performed at this point.
* If an error code is generated, a return message is sent on
* the channel consisting of the errorCode and errorInfo variable,
* formatted as a Tcl list. If no error is generated, the return
* value is sent on the channel.
* o if the message is an RDO message, processing proceeds as with
* RPC, but no return value is sent in any case (if a return value
* is requested in an RDO, the evaluated string will contain a
* call to dp_RDO).
* o if the message is a return value or error message, the message
* is stored in the activeRPCs global variable, which stores the
* values that dp_RPC should return in interp->result and the
* errorCode (typically, TCL_OK or TCL_ERROR).
*
*--------------------------------------------------------------
*/
static void
DpProcessRPCMessage(interp, chan, id, token, message)
Tcl_Interp *interp; /* (in) Tcl interpreter for error reporting */
RPCChannel *chan; /* (in) Incoming RPC data channel */
int id; /* (in) Integer id for RPC */
int token; /* (in) Message token */
char *message; /* (in) The command to evaluate (zero terminated) */
{
ActiveRPC *arPtr = NULL;
RPCChannel *rcPtr;
Tcl_HashEntry *entryPtr;
int retCode, len;
char **argv;
int argc;
rcPtr = chan;
rcPtr->flags |= CHAN_BUSY;
entryPtr = Tcl_FindHashEntry(&activeRPCs, (char *)id);
if (entryPtr != NULL) {
arPtr = (ActiveRPC *) Tcl_GetHashValue(entryPtr);
}
if (((token == TOK_RET) || (token == TOK_ERR)) && (arPtr == NULL)) {
DBG(printf("Received reply to cancelled RPC\n"));
rcPtr->flags &= ~CHAN_BUSY;
return;
}
switch (token) {
case TOK_RPC:
Tcl_SetVar(interp, "dp_rpcFile", rcPtr->name, TCL_GLOBAL_ONLY);
if (DpCheckRPC(interp, rcPtr, message) == TCL_OK) {
retCode = Tcl_GlobalEval(interp, message);
if (retCode != TCL_OK) {
char *rv[2];
char *errMsg;
rv[0] = interp->result;
rv[1] = Tcl_GetVar(interp, "errorInfo", TCL_GLOBAL_ONLY);
errMsg = Tcl_Merge(2, rv);
if (DpSendRPCMessage(rcPtr->chan, TOK_ERR, id, errMsg)
!= TCL_OK) {
ckfree(errMsg);
goto error;
}
ckfree(errMsg);
} else {
if (DpSendRPCMessage(rcPtr->chan, TOK_RET, id,
interp->result) != TCL_OK) {
goto error;
}
}
} else {
if (DpSendRPCMessage(rcPtr->chan, TOK_ERR, id,
"RPC authorization denied") != TCL_OK) {
goto error;
}
}
break;
case TOK_RDO:
/*
* RDOs are very simple.
*/
Tcl_SetVar(interp, "dp_rpcFile", rcPtr->name, TCL_GLOBAL_ONLY);
if (DpCheckRPC(interp, rcPtr, message) == TCL_OK) {
Tcl_GlobalEval(interp, message);
}
break;
case TOK_ERR:
/*
* Split the error message into the result and info
* and save both.
*/
Tcl_SplitList(interp, message, &argc, &argv);
Tcl_AddErrorInfo(rcPtr->interp, argv[1]);
len = strlen(argv[0]) + 1;
arPtr->result = ckalloc(len);
memcpy(arPtr->result, argv[0], len);
ckfree((char *)argv);
arPtr->flags = 0;
arPtr->returnValue = TCL_ERROR;
break;
case TOK_RET:
/*
* Provide the return result to the interp.
*/
len = strlen(message) + 1;
arPtr->result = ckalloc(len);
memcpy(arPtr->result, message, len);
arPtr->flags = 0;
arPtr->returnValue = TCL_OK;
break;
default:
fprintf(stderr, "Invalid token received in incoming RPC.\n");
break;
}
rcPtr->flags &= ~CHAN_BUSY;
return;
error:
Tcl_AppendResult(interp, "Error sending RPC response on \"",
Tcl_GetChannelName(rcPtr->chan), "\"", NULL);
rcPtr->flags &= ~CHAN_BUSY;
return;
}
/* ----------------------------------------------------
 *
 * DpCheckRPC --
 *
 *	Checks an incoming RPC with the channel's user-defined
 *	check command, if there is one. The check command is
 *	evaluated at global level with the RPC string appended
 *	as a single argument.
 *
 * Returns
 *
 *	TCL_OK if the channel has no check command; otherwise the
 *	completion code of the check command (TCL_OK means the
 *	RPC is allowed).
 *
 * Side Effects
 *
 *	The command checking procedure is evaluated.
 *
 * -----------------------------------------------------
 */
static int
DpCheckRPC(interp, rcPtr, rpcStr)
    Tcl_Interp *interp;	/* Interpreter to run the check in */
    RPCChannel *rcPtr;	/* Channel the RPC arrived on */
    char *rpcStr;	/* The RPC command string to check */
{
    Tcl_DString cmd;
    int resCode;

    if (rcPtr->checkCmd == NULL) {
	return TCL_OK;
    }
    Tcl_DStringInit(&cmd);
    Tcl_DStringAppend(&cmd, rcPtr->checkCmd, -1);

    /*
     * rpcStr comes from the remote side. Append it as a properly quoted
     * list element: wrapping it in braces by hand would let a string
     * with unbalanced braces break out of the argument and run
     * arbitrary commands as part of the check script.
     */
    Tcl_DStringAppendElement(&cmd, rpcStr);
    resCode = Tcl_GlobalEval(interp, Tcl_DStringValue(&cmd));
    Tcl_DStringFree(&cmd);
    return resCode;
}
/*
*--------------------------------------------------------------
*
* TimeoutHandler --
*
* This function is called by a timer handler when the associated
* RPC times out
*
* Results:
* None.
*
* Side effects:
* The associated RPC will return as soon as possible.
*
*--------------------------------------------------------------
*/
static void
DpTimeoutHandler (clientData)
ClientData clientData; /* Id of the RPC that timed out */
{
int id = (int)clientData;
ActiveRPC *activePtr;
Tcl_HashEntry *entryPtr;
/*
* Cancel the RPC by setting the flag on its active record (the
* event loop with the waiting RPC will notice this when it's its
* turn to return).
*/
entryPtr = Tcl_FindHashEntry(&activeRPCs, (char *)id);
if (entryPtr != NULL) {
activePtr = (ActiveRPC *)Tcl_GetHashValue(entryPtr);
activePtr->flags = RPC_TIMEDOUT;
}
}
/*
 *--------------------------------------------------------------
 *
 * DpParseEventList --
 *
 *	Helper function to parse the "-events" option of dp_RPC.
 *	Each element of the list names an event type; the matching
 *	Tcl event flag is OR'ed into *maskPtr, so bits already set
 *	by the caller are kept.
 *
 * Results:
 *	A standard tcl result. On error a message is appended to
 *	the interpreter result; bits for elements seen before the
 *	bad one have already been added to *maskPtr.
 *
 * Side effects:
 *	None.
 *
 *--------------------------------------------------------------
 */
static int
DpParseEventList (interp, eventList, maskPtr)
    Tcl_Interp *interp;	/* Tcl interpreter for error reporting */
    char *eventList;	/* Event list to parse */
    int *maskPtr;	/* Returned event mask */
{
    /* Accepted event names and the flag each one stands for. */
    static struct {
	char *name;
	int mask;
    } knownEvents[] = {
	{"x",      TCL_WINDOW_EVENTS},
	{"window", TCL_WINDOW_EVENTS},
	{"rpc",    TCL_FILE_EVENTS},
	{"file",   TCL_FILE_EVENTS},
	{"timer",  TCL_TIMER_EVENTS},
	{"idle",   TCL_IDLE_EVENTS}
    };
    int numKnown = (int) (sizeof(knownEvents) / sizeof(knownEvents[0]));
    int listLen;
    char **listElems;
    int elem, k;

    if (Tcl_SplitList(interp, eventList, &listLen, &listElems) != TCL_OK) {
	Tcl_AppendResult(interp, "bad parameter \"", eventList,
		"\" for -events flag", (char *) NULL);
	return TCL_ERROR;
    }

    for (elem = 0; elem < listLen; elem++) {
	for (k = 0; k < numKnown; k++) {
	    if (strcmp(listElems[elem], knownEvents[k].name) == 0) {
		break;
	    }
	}
	if (k == numKnown) {
	    Tcl_AppendResult(interp, "unknown event type \"", listElems[elem],
		    "\" : should be all, or list of ",
		    "x, window, rpc, file, timer, or idle", (char *) NULL);
	    ckfree((char *) listElems);
	    return TCL_ERROR;
	}
	*maskPtr |= knownEvents[k].mask;
    }
    ckfree((char *) listElems);
    return TCL_OK;
}
/*
*--------------------------------------------------------------
*
* MakeActivationRecord --
*
* Create an activation record for a new, outbound RPC.
*
* Results:
* Pointer to the new record, or NULL on error.
*
* Side effects:
* Creates a timer handler that may need to be cleared
* later on. The hash table activeRPCs now contains an
* entry for the activation record, so it'll have to be
* free'd later on.
*
*--------------------------------------------------------------
*/
static ActiveRPC *
MakeActivationRecord (rpcChanPtr, setTimeout, timeout)
RPCChannel *rpcChanPtr; /* Channel we're sending RPC on. */
int setTimeout; /* Should we set a timeout? */
int timeout; /* Timeout (in milliseconds) */
{
int i;
ActiveRPC *newRecPtr;
Tcl_HashEntry *entryPtr;
int isNew;
/*
* Create/initialize new activation record
*/
newRecPtr = (ActiveRPC *)ckalloc (sizeof(ActiveRPC));
newRecPtr->flags = RPC_WAITING;
newRecPtr->time = TclpGetSeconds();
newRecPtr->chanPtr = rpcChanPtr;
newRecPtr->id = 0;
/*
* Find a free id, and install a new activation record for it
*/
for (i = activeId; i != activeId-1; i = (i+1)%1000000) {
entryPtr = Tcl_CreateHashEntry(&activeRPCs, (char *)i, &isNew);
if (isNew) {
DBG(printf("Adding %s:%d to hash table\n",
Tcl_GetChannelName(newRecPtr->chanPtr->chan), i));
Tcl_SetHashValue(entryPtr, (ClientData)newRecPtr);
newRecPtr->id = i;
activeId = i+1;
if (setTimeout) {
newRecPtr->token = Tcl_CreateTimerHandler(timeout,
DpTimeoutHandler, (ClientData)i);
} else {
newRecPtr->token = NO_TOKEN;
}
return newRecPtr;
}
}
/*
* Couldn't find a free one -- return error
*/
return NULL;
}
/*
*--------------------------------------------------------------
*
* DpRPCCmd --
*
* Send an RPC message and wait for a reply.
*
* Results:
* A standard Tcl result
*
* Side effects:
* A message is sent on the channel, and the procedure blocks
* until the return value is received or the RPC times out.
*
*--------------------------------------------------------------
*/
/* ARGSUSED */
int
Dp_RPCCmd (clientData, interp, argc, argv)
ClientData clientData; /* ignored */
Tcl_Interp *interp; /* tcl interpreter */
int argc; /* Number of arguments */
char *argv[]; /* Arg list */
{
RPCChannel *rpcChanPtr;
ActiveRPC *activePtr, *arPtr;
int i;
int rpcArgc;
char **rpcArgv;
char *command;
Tcl_HashEntry *entryPtr;
int returnValue;
int rc = TCL_OK;
/*
* The default values for the value-option pairs
*/
int timeout = -1;
char *timeoutReturn = NULL;
int events = TCL_FILE_EVENTS;
/*
* Flags to indicate that a certain option has been set by the
* command line
*/
int setTimeout = 0;
int setTimeoutReturn = 0;
if (argc < 3) {
goto usage;
}
for (rpcChanPtr = registeredChannels; rpcChanPtr != NULL;
rpcChanPtr = rpcChanPtr->next) {
if (strcmp(argv[1], rpcChanPtr->name) == 0) {
break;
}
}
if (rpcChanPtr == NULL) {
Tcl_AppendResult(interp, "Attempt to send RPC over unregistered ",
"channel. Use dp_admin to register channel first.", NULL);
return TCL_ERROR;
}
/*
* Ok, found the channel and it's valid. Now, parse the value/option
* pairs.
*/
for (i=2; i<argc; i+=2) {
int v = i+1;
int len = strlen(argv[i]);
if (strncmp(argv[i], "-timeout", len)==0) {
if (v==argc) {goto arg_missing;}
if (Tcl_GetInt(interp, argv[v], &timeout) != TCL_OK) {
return TCL_ERROR;
}
setTimeout = 1;
} else if (strncmp(argv[i], "-timeoutReturn", len)==0) {
if (v==argc) {goto arg_missing;}
timeoutReturn = argv[v];
setTimeoutReturn = 1;
} else if (strncmp(argv[i], "-events", len)==0) {
if (v==argc) {goto arg_missing;}
if (strcmp (argv[v], "all") == 0) {
events = TCL_ALL_EVENTS;
} else {
DpParseEventList (interp, argv[v], &events);
}
} else {
rpcArgc = argc - i;
rpcArgv = argv + i;
break;
}
}
/*
* If -timeoutReturn is specified, then -timeout must be
*/
if (setTimeoutReturn && !setTimeout) {
Tcl_AppendResult(interp, " Using -timeoutReturn requires -timeout\n",
NULL);
return TCL_ERROR;
}
/*
* Adjust the event mask so it's consistent
*/
if (setTimeout) {
events |= TCL_TIMER_EVENTS;
}
/*
* Make an activation record for this RPC,
* package up the RPC and send it
*/
activePtr = MakeActivationRecord (rpcChanPtr, setTimeout, timeout);
if (activePtr == NULL) {
Tcl_AppendResult (interp, "Error -- ran out of RPC identifiers ",
"(over a million active RPCs?)\n", NULL);
return TCL_ERROR;
}
command = Tcl_Merge(rpcArgc, rpcArgv);
if (DpSendRPCMessage (rpcChanPtr->chan, TOK_RPC, activePtr->id, command)
!= TCL_OK) {
Tcl_AppendResult(interp, "Error sending RPC on channel ",
Tcl_GetChannelName(rpcChanPtr->chan), NULL);
ckfree((char *) command);
rc = TCL_ERROR;
goto cleanup;
}
ckfree((char *) command);
/*
* Wait for reply in an event loop.
*/
while (activePtr->flags == RPC_WAITING) {
Tcl_DoOneEvent (events);
}
/*
* If we got a timeout, call the timeout handler (if there is one)
* or trigger an error.
*
* If the RPC was cancelled, clear the timer event and return an
* error.
*
* If there wasn't a timeout or error, clear the timer event, set
* the variables according to the reply, and return.
*/
if (activePtr->flags & RPC_TIMEDOUT) {
if (timeoutReturn) {
/*
* Allocate memory for timeout proc and channel name
*/
char *cmd = ckalloc(strlen(timeoutReturn) + 16);
sprintf(cmd, "%s %s", timeoutReturn,
Tcl_GetChannelName(rpcChanPtr->chan));
rc = Tcl_GlobalEval(interp, cmd);
ckfree(cmd);
goto cleanup;
} else {
Tcl_AppendResult(interp, "RPC timed out on channel ", argv[1],
NULL);
rc = TCL_ERROR;
goto cleanup;
}
} else {
if (activePtr->token != NO_TOKEN) {
Tcl_DeleteTimerHandler(activePtr->token);
}
if (activePtr->flags & RPC_CANCELLED) {
Tcl_AppendResult(interp, "RPC cancelled on channel ", argv[1],
NULL);
rc = TCL_ERROR;
goto cleanup;
}
Tcl_SetResult (interp, activePtr->result, TCL_VOLATILE);
rc = activePtr->returnValue;
ckfree((char *)activePtr->result);
goto cleanup;
}
cleanup:
entryPtr = Tcl_FindHashEntry(&activeRPCs, (char *)activePtr->id);
if (entryPtr != NULL) {
arPtr = (ActiveRPC *) Tcl_GetHashValue(entryPtr);
DBG(printf("Deleting %s:%d from hash table\n",
Tcl_GetChannelName(arPtr->chanPtr->chan), arPtr->id, activePtr->id));
Tcl_DeleteHashEntry(entryPtr);
}
ckfree((char *)activePtr);
return rc;
usage:
Tcl_AppendResult(interp, "Usage:\n", "\"", argv[0],
" <channel> ?-timeout milliseconds ?-timeoutReturn callback??",
" ?-events eventList? command ?args ...?\"\n",
NULL);
return TCL_ERROR;
arg_missing:
Tcl_AppendResult(interp, "value for \"", argv[argc-1], "\" missing", NULL);
return TCL_ERROR;
}
/*
 *--------------------------------------------------------------
 *
 * Dp_RDOCmd --
 *
 *	Implements the dp_RDO command: send an RDO message and
 *	return without waiting for a reply.
 *
 *	    dp_RDO channel ?-callback script? ?-onerror script?
 *		    command ?args ...?
 *
 *	Options may be abbreviated. The first argument that is not
 *	an option starts the command. If -callback and/or -onerror
 *	are given, the command is wrapped in one of the script
 *	templates (ceCmdTemplate, eCmdTemplate, cCmdTemplate) that
 *	make the remote side report back with an RDO of its own.
 *	"-onerror none" selects "tkerror" as the error handler.
 *
 * Results:
 *	A standard Tcl result. TCL_ERROR if the channel is not
 *	registered, the arguments are malformed or the message
 *	could not be sent.
 *
 * Side effects:
 *	A message is sent on the channel. This may cause the receiving
 *	process to send us a reply, which will invoke DpReadRPCChannel.
 *
 *--------------------------------------------------------------
 */
/* ARGSUSED */
int
Dp_RDOCmd (clientData, interp, argc, argv)
    ClientData clientData;	/* ignored */
    Tcl_Interp *interp;		/* tcl interpreter */
    int argc;			/* Number of arguments */
    char *argv[];		/* Arg list */
{
    RPCChannel *rpcChanPtr;
    int rpcArgc = 0, i;		/* Command words found after the options */
    char **rpcArgv = NULL;	/* NULL until a command has been found */
    char *command, *cmdStr;
    char *onerror, *callback;
    char *cmd;
    int rc;

    if (argc < 3) {
	Tcl_AppendResult(interp, "Wrong number of args", NULL);
	goto usage;
    }
    for (rpcChanPtr = registeredChannels; rpcChanPtr != NULL;
	    rpcChanPtr = rpcChanPtr->next) {
	if (strcmp(argv[1], rpcChanPtr->name) == 0) {
	    break;
	}
    }
    if (rpcChanPtr == NULL) {
	Tcl_AppendResult(interp, "Attempted to send RDO over unregistered ",
		"channel.\nUse dp_admin to register channel first.", NULL);
	return TCL_ERROR;
    }

    /*
     * Ok, found the channel and it's valid. Now, parse the value/option
     * pairs.
     */
    onerror = callback = NULL;
    for (i=2; i<argc; i+=2) {
	int v = i+1;
	int len = strlen(argv[i]);

	if (strncmp(argv[i], "-callback", len)==0) {
	    if (v==argc) {goto arg_missing;}
	    callback = argv[v];
	} else if (strncmp(argv[i], "-onerror", len)==0) {
	    if (v==argc) {goto arg_missing;}
	    if (!strcmp(argv[v], "none")) {
		onerror = "tkerror";
	    } else {
		onerror = argv[v];
	    }
	} else {
	    rpcArgc = argc - i;
	    rpcArgv = argv + i;
	    break;
	}
    }

    /*
     * Only options were given: there is no command to send.
     */
    if (rpcArgv == NULL) {
	Tcl_AppendResult(interp, "Wrong number of args", NULL);
	goto usage;
    }

    /*
     * Build the script to send. In each case the buffer is sized as the
     * template plus its arguments plus the terminating NUL (the "%s"
     * markers in the template give some additional slack).
     */
    cmd = Tcl_Merge(rpcArgc, rpcArgv);
    if (onerror != NULL) {
	/*
	 * The error reporting templates quote the complete dp_RDO
	 * invocation in the errorInfo text.
	 */
	cmdStr = Tcl_Merge(argc, argv);
	if (callback != NULL) {
	    /*
	     * Both onerror & callback specified. Form of command is: if
	     * [catch $cmd dp_rv] { dp_RDO $dp_rpcFile eval {$onerror $dp_rv}
	     * } else { dp_RDO $dp_rpcFile eval {$callback $dp_rv} }
	     */
	    command = (char *) ckalloc(strlen(cmd) +
		    strlen(cmdStr) +
		    strlen(onerror) +
		    strlen(callback) +
		    strlen(ceCmdTemplate) + 1);
	    sprintf(command, ceCmdTemplate, cmd, cmdStr, onerror, callback);
	} else {
	    /*
	     * Just onerror specified. Form of command is: if [catch $cmd
	     * dp_rv] { dp_RDO $dp_rpcFile eval {$onerror $dp_rv} }
	     */
	    command = (char *) ckalloc(strlen(cmd) +
		    strlen(onerror) +
		    strlen(cmdStr) +
		    strlen(eCmdTemplate) + 1);
	    sprintf(command, eCmdTemplate, cmd, cmdStr, onerror);
	}
	ckfree(cmdStr);
    } else if (callback != NULL) {
	/*
	 * Just callback specified. Form of command is: dp_RDO
	 * $dp_rpcFile $callback [$cmd]
	 */
	command = (char *) ckalloc(strlen(cmd) +
		strlen(callback) +
		strlen(cCmdTemplate) + 1);
	sprintf(command, cCmdTemplate, cmd, callback);
    } else {
	/*
	 * No callbacks specified. Form of command is: $cmd
	 */
	command = (char *) ckalloc(strlen(cmd) + 1);
	strcpy(command, cmd);
    }
    ckfree((char *) cmd);

    /*
     * Send it. A failure to send is reported to the caller rather than
     * silently dropped, as dp_RPC does.
     */
    rc = DpSendRPCMessage(rpcChanPtr->chan, TOK_RDO, 0, command);
    ckfree((char *) command);
    if (rc != TCL_OK) {
	Tcl_AppendResult(interp, "Error sending RDO on channel ", argv[1],
		NULL);
	return TCL_ERROR;
    }
    return TCL_OK;

usage:
    Tcl_AppendResult(interp, " Usage:\n", "\"", argv[0],
	    " <channel> ?-events eventList? ?-callback script? ?-onerror script? command ?args ...?\"\n",
	    NULL);
    return TCL_ERROR;

arg_missing:
    /* The option lacking its value is always the last argument. */
    Tcl_AppendResult(interp, "value for \"", argv[argc-1], "\" missing", NULL);
    return TCL_ERROR;
}
/* ----------------------------------------------------
 *
 * DpCancelRPC --
 *
 *	Cancels every outstanding RPC that was sent on the given
 *	channel by setting the flags of its activation record to
 *	RPC_CANCELLED. The records themselves are left in the
 *	hash table.
 *
 * Returns
 *
 *	Nothing.
 *
 * Side Effects
 *
 *	All waiting RPCs on the given channel are cancelled; the
 *	event loops waiting on them exit and report an error.
 *
 * -----------------------------------------------------
 */
static void
DpCancelRPC(rpcChanPtr)
    RPCChannel *rpcChanPtr;	/* (in) RPCChannel to cancel */
{
    Tcl_HashSearch walk;
    Tcl_HashEntry *slot;
    ActiveRPC *pending;

    slot = Tcl_FirstHashEntry(&activeRPCs, &walk);
    while (slot != NULL) {
	pending = (ActiveRPC *) Tcl_GetHashValue(slot);
	if (pending->chanPtr == rpcChanPtr) {
	    /* No longer RPC_WAITING, so the waiting event loop exits. */
	    pending->flags = RPC_CANCELLED;
	}
	slot = Tcl_NextHashEntry(&walk);
    }
}
/* ----------------------------------------------------
 *
 * Dp_CancelRPCCmd --
 *
 *	Command procedure for the dp_CancelRPC command. The
 *	arguments are either the single word "all", which cancels
 *	the RPCs on every registered channel, or one or more
 *	channel ids. A channel id that names an existing Tcl
 *	channel which is not registered for RPC is ignored.
 *
 * Returns
 *
 *	TCL_OK, or TCL_ERROR if no arguments are given or an
 *	argument does not name a Tcl channel (RPCs on channels
 *	named before the bad argument have been cancelled by then).
 *
 * Side Effects
 *
 *	RPCs may be cancelled.
 *
 * -----------------------------------------------------
 */
int
Dp_CancelRPCCmd(unused, interp, argc, argv)
    ClientData unused;		/* ignored */
    Tcl_Interp *interp;		/* tcl interpreter */
    int argc;			/* Number of arguments */
    char **argv;		/* Arg list */
{
    RPCChannel *rpcChan;
    Tcl_Channel tclChan;
    int accessMode;
    int argIdx;
    char *wantedName;

    if (argc < 2) {
	Tcl_AppendResult(interp, "wrong # args: should be \"",
		argv[0], " [?chanID? ?chanID? ... | all]",
		(char *) NULL);
	return TCL_ERROR;
    }

    if ((argc == 2) && (strcmp(argv[1], "all") == 0)) {
	rpcChan = registeredChannels;
	while (rpcChan != NULL) {
	    DpCancelRPC(rpcChan);
	    rpcChan = rpcChan->next;
	}
	return TCL_OK;
    }

    /*
     * Cycle through all the channel IDs given: look up the Tcl channel,
     * then find the registered RPC channel corresponding to it by name
     * and cancel its RPCs.
     */
    for (argIdx = 1; argIdx < argc; argIdx++) {
	tclChan = Tcl_GetChannel(interp, argv[argIdx], &accessMode);
	if (tclChan == NULL) {
	    return TCL_ERROR;
	}
	wantedName = Tcl_GetChannelName(tclChan);
	for (rpcChan = registeredChannels; rpcChan != NULL;
		rpcChan = rpcChan->next) {
	    if (strcmp(Tcl_GetChannelName(rpcChan->chan), wantedName) == 0) {
		DpCancelRPC(rpcChan);
		break;
	    }
	}
    }
    return TCL_OK;
}
/*
*--------------------------------------------------------------
*
* DpRegisterRPCChannel --
*
* This function turns a Tcl-DP channel into an RPC channel
* by creating an event handler and the appropriate data structure.
*
* Results:
* A standard Tcl result
*
* Side effects:
* From now on, whenever the channel is readable, ReadRPCChannel
* will be called
*
*--------------------------------------------------------------
*/
static int
DpRegisterRPCChannel (interp, chanName, checkCmd)
Tcl_Interp *interp;
char *chanName;
char *checkCmd;
{
Tcl_Channel chan;
int mode;
RPCChannel *prev, *searchPtr;
RPCChannel *newRpcChannelPtr;
/*
* Make sure the channel hasn't been registered already.
*/
for (prev = NULL, searchPtr = registeredChannels; searchPtr != NULL;
prev = searchPtr, searchPtr = searchPtr->next) {
if (strcmp(chanName, searchPtr->name) == 0) {
Tcl_AppendResult(interp, "Channel ", chanName,
" is already registered", NULL);
return TCL_ERROR;
}
}
/*
* Check to make sure we can find the channel, and that it's
* readable/writeable
*/
chan = Tcl_GetChannel(interp, chanName, &mode);
if (chan == NULL) {
return TCL_ERROR;
}
if (mode != (TCL_READABLE | TCL_WRITABLE)) {
Tcl_AppendResult(interp, "Can't use channel ", chanName,
" for RPC -- channel is not both readable and writeable",
NULL);
return TCL_ERROR;
}
/*
* Things look ok! Make the file non-blocking, create a file handler
* for the channel, and return
*/
if (Tcl_SetChannelOption (interp, chan, "-blocking", "0") != TCL_OK) {
return TCL_ERROR;
}
newRpcChannelPtr = (RPCChannel *)ckalloc(sizeof(RPCChannel));
newRpcChannelPtr->name = ckalloc(strlen(chanName) + 1);
strcpy(newRpcChannelPtr->name, chanName);
newRpcChannelPtr->interp = interp;
newRpcChannelPtr->error = 0;
newRpcChannelPtr->bufLen = 0;
newRpcChannelPtr->maxLen = RPC_BUFFER_SIZE;
newRpcChannelPtr->buffer = ckalloc(RPC_BUFFER_SIZE);
memset(newRpcChannelPtr->buffer, 0, RPC_BUFFER_SIZE);
newRpcChannelPtr->chan = chan;
newRpcChannelPtr->checkCmd = NULL;
newRpcChannelPtr->flags = 0;
if (checkCmd) {
newRpcChannelPtr->checkCmd = ckalloc(strlen(checkCmd) + 1);
strcpy(newRpcChannelPtr->checkCmd, checkCmd);
}
newRpcChannelPtr->next = registeredChannels;
registeredChannels = newRpcChannelPtr;
Tcl_CreateChannelHandler(chan, TCL_READABLE, DpReadRPCChannelCallback,
(ClientData)newRpcChannelPtr);
return TCL_OK;
}
/*
 *--------------------------------------------------------------
 *
 * DpDeleteRPCChannel --
 *
 *    Removes the channel handler installed by
 *    DpRegisterRPCChannel, cancels the RPCs still pending on
 *    the channel, unlinks its record from registeredChannels
 *    and frees the record together with the strings and buffer
 *    it owns.
 *
 *    interp is only used for channel lookup and error reporting
 *    and may be NULL.
 *
 * Results:
 *    TCL_OK if the record was deleted, or if the deletion was
 *    deferred because the record is flagged CHAN_BUSY.
 *    TCL_ERROR if rpcChanPtr is NULL or if no record with the
 *    same name is on the registeredChannels list.
 *
 * Side effects:
 *    Unless the deletion was deferred or failed, rpcChanPtr is
 *    free'd and must not be referenced afterwards.
 *
 *--------------------------------------------------------------
 */

static int
DpDeleteRPCChannel (interp, rpcChanPtr)
    Tcl_Interp *interp;
    RPCChannel *rpcChanPtr;
{
    RPCChannel *searchPtr, *prev;
    Tcl_Channel chan;
    int mode;

    if (rpcChanPtr == NULL) {
        return TCL_ERROR;
    }

    /*
     * A busy record is not torn down here; it is only tagged
     * with CHAN_FREE.
     * NOTE(review): presumably whoever set CHAN_BUSY performs
     * the real delete once it is done -- confirm against the
     * code that clears CHAN_BUSY.
     */

    if (rpcChanPtr->flags & CHAN_BUSY) {
        rpcChanPtr->flags |= CHAN_FREE;
        return TCL_OK;
    }

    /*
     * Find the record (and its predecessor) on the list of
     * registered channels; entries are matched by name.
     */

    for (searchPtr = registeredChannels, prev = NULL;
            searchPtr != NULL;
            prev = searchPtr, searchPtr = searchPtr->next) {
        if (!strcmp(rpcChanPtr->name, searchPtr->name)) {
            break;
        }
    }

    /*
     * Not on the list: there is nothing to unlink, and falling
     * through would dereference a NULL searchPtr.
     */

    if (searchPtr == NULL) {
        if (interp != NULL) {
            Tcl_AppendResult(interp, "Channel \"", rpcChanPtr->name,
                    "\" not registered.", (char *) NULL);
        }
        return TCL_ERROR;
    }

    /*
     * Clear the file handler, clear all active RPCs,
     * and free its memory.
     *
     * The channel is looked up by name instead of using the
     * stored handle; the handler is only removed when the
     * lookup succeeds.
     * NOTE(review): presumably this protects against a channel
     * that has already been closed -- confirm.  A failed lookup
     * may leave Tcl_GetChannel's message in the interp result
     * even though TCL_OK is returned.
     */

    if ((chan = Tcl_GetChannel(interp, searchPtr->name, &mode)) != NULL) {
        Tcl_DeleteChannelHandler(chan, DpReadRPCChannelCallback,
                (ClientData)searchPtr);
    }

    DpCancelRPC(searchPtr);

    if (prev == NULL) {
        registeredChannels = searchPtr->next;
    } else {
        prev->next = searchPtr->next;
    }

    ckfree((char *) searchPtr->name);
    ckfree((char *) searchPtr->buffer);
    if (searchPtr->checkCmd) {
        ckfree((char *) searchPtr->checkCmd);
    }
    ckfree((char *) searchPtr);

    return TCL_OK;
}
/*
 *--------------------------------------------------------------
 *
 * Dp_AdminCmd --
 *
 *    Implements the "dp_admin" command, which handles the
 *    administrative functions of the RPC module.  Its usage:
 *
 *        dp_admin register <chan> ?-check checkCmd?
 *        dp_admin delete <chan>
 *
 *    "register" makes the channel an RPC channel (i.e., RPCs can
 *    be sent/received across it); "delete" undoes that.  The
 *    subcommand may be abbreviated to any non-empty prefix.  A
 *    checkCmd of "none" is treated the same as giving no check
 *    command.  The -check option is also accepted syntactically
 *    with "delete", where its value is not used.
 *
 * Results:
 *    A standard Tcl result.
 *
 * Side effects:
 *    Those of DpRegisterRPCChannel or DpDeleteRPCChannel,
 *    depending on the subcommand.
 *
 *--------------------------------------------------------------
 */

        /* ARGSUSED */
int
Dp_AdminCmd (clientData, interp, argc, argv)
    ClientData clientData;      /* ignored */
    Tcl_Interp *interp;         /* tcl interpreter */
    int argc;                   /* Number of arguments */
    char *argv[];               /* Arg list */
{
    char *subCmd;
    int subLen;
    char *checkCmd = NULL;
    RPCChannel *rpcChan;

    if ((argc != 3) && (argc != 5)) {
        Tcl_AppendResult(interp, "Wrong number of args", NULL);
        goto usage;
    }

    /*
     * The five-argument form must carry "-check <cmd>" in the
     * last two slots.
     */

    if (argc == 5) {
        if (strcmp(argv[3], "-check") != 0) {
            goto usage;
        }
        checkCmd = argv[4];
        if (strcmp(checkCmd, "none") == 0) {
            checkCmd = NULL;
        }
    }

    subCmd = argv[1];
    subLen = strlen(subCmd);

    /* ------------------------ REGISTER ------------------------------ */

    if ((subCmd[0] == 'r') && (strncmp(subCmd, "register", subLen) == 0)) {
        return DpRegisterRPCChannel (interp, argv[2], checkCmd);
    }

    /* ------------------------ DELETE -------------------------------- */

    if ((subCmd[0] == 'd') && (strncmp(subCmd, "delete", subLen) == 0)) {
        rpcChan = registeredChannels;
        while ((rpcChan != NULL) && (strcmp(argv[2], rpcChan->name) != 0)) {
            rpcChan = rpcChan->next;
        }
        if (rpcChan == NULL) {
            Tcl_AppendResult(interp, "Channel \"", argv[2],
                    "\" not registered.", NULL);
            return TCL_ERROR;
        }
        return DpDeleteRPCChannel (interp, rpcChan);
    }

    /* ------------------------ ERROR --------------------------------- */

    /* Unknown subcommand: fall through to the usage message. */

usage:
    Tcl_AppendResult(interp, " Possible usages:\n",
            "\"", argv[0], " register <channel> ?-check checkCmd?\"\n",
            "\"", argv[0], " delete <channel>\"\n",
            NULL);
    return TCL_ERROR;
}
/*
 *--------------------------------------------------------------
 *
 * DpSendRPCMessage --
 *
 *    Formats an RPC packet and writes it to the given channel.
 *    The packet is a 16 byte header -- total length in 6
 *    columns, a space, the one-character token, a space, the id
 *    in 6 columns, a space -- followed by mesgStr.  The length
 *    field counts the header as well as the message.
 *
 * Results:
 *    TCL_OK if the whole packet was accepted by Tcl_Write.
 *    TCL_ERROR if Tcl_Write took fewer bytes, or if the packet
 *    cannot be represented: the total length or the id does not
 *    fit in its 6-column header field.
 *
 * Side effects:
 *    Data is written to the channel.
 *
 *--------------------------------------------------------------
 */

static int
DpSendRPCMessage (chan, token, id, mesgStr)
    Tcl_Channel chan;
    int token;                  /* in: msg type token */
    int id;                     /* in: RPC ID # */
    char *mesgStr;              /* in: actual RPC string */
{
    char *bufStr;
    int result, totalLength;
    size_t mesgLen;

    /*
     * "%6d" only sets a minimum width.  A value that needs more
     * than six characters would widen the header, which both
     * overruns the buffer sized below and shifts every field
     * the receiver expects at a fixed offset.  Reject such
     * packets up front instead of emitting them.
     */

    mesgLen = strlen(mesgStr);
    if (mesgLen > (size_t) (999999 - 16)) {
        return TCL_ERROR;
    }
    if ((id < -99999) || (id > 999999)) {
        return TCL_ERROR;
    }

    /* 16 == 6 (length) + 1 + 1 (token) + 1 + 6 (id) + 1 */
    totalLength = (int) mesgLen + 16;
    bufStr = ckalloc(totalLength + 1);
    sprintf(bufStr, "%6d %c %6d %s", totalLength, (char)token, id, mesgStr);

    DBG(printf("\nSending RPC: %s on %s\n", bufStr, Tcl_GetChannelName(chan)));

    result = Tcl_Write(chan, bufStr, totalLength);
    ckfree(bufStr);

    if (result != totalLength) {
        return TCL_ERROR;
    }
    return TCL_OK;
}
/*
 *--------------------------------------------------------------
 *
 * DpRPCInit --
 *
 *    Sets up the global data structures of this module.  The
 *    interp argument is not referenced.
 *
 * Results:
 *    Always TCL_OK.
 *
 * Side effects:
 *    activeId is reset to 0 and activeRPCs is initialised as a
 *    hash table with one-word keys.
 *
 *--------------------------------------------------------------
 */

int
DpRPCInit (interp)
    Tcl_Interp *interp;
{
    /* The two initialisations are independent of each other. */
    activeId = 0;
    Tcl_InitHashTable(&activeRPCs, TCL_ONE_WORD_KEYS);

    return TCL_OK;
}