421 lines
13 KiB
C
421 lines
13 KiB
C
/*
|
|
* Copyright (c) 1991, 1992, 1993 by the University of Southern California
|
|
*
|
|
* For copying and distribution information, please see the file
|
|
* <usc-license.h>
|
|
*
|
|
* Written by bcn 1989-92 as part of dirsend.c in the Prospero distribution
|
|
* Modified by bcn 1/93 moved to ardp library - added asynchrony
|
|
*/
|
|
|
|
#include <usc-license.h>
|
|
|
|
#include <stdio.h>
|
|
#include <sys/types.h>
|
|
#include <sys/socket.h>
|
|
#ifdef AIX
|
|
#include <sys/select.h>
|
|
#endif
|
|
|
|
#include <ardp.h>
|
|
#include <perrno.h>
|
|
#include <pmachine.h> /* for bcopy() */
|
|
|
|
extern RREQ ardp_activeQ; /* Info about active requests */
|
|
extern RREQ ardp_completeQ; /* Info about completed reqs */
|
|
extern int ardp_port; /* Opened UDP port */
|
|
extern int pfs_debug; /* Debug level */
|
|
int ardp_activeQ_len; /* Length of activeQ */
|
|
|
|
static struct timeval zerotime = {0, 0}; /* Used by select */
|
|
|
|
#define max(x,y) (x > y ? x : y)
|
|
|
|
static ardp__pr_ferror(int errcode);
|
|
|
|
/*
|
|
* ardp_process_active - process active requests
|
|
*
|
|
* ardp_process_active takes no arguments. It checks to see if any responses
|
|
* are pending on the UDP port, and if so processes them, adding them
|
|
* to the request structures on ardp_activeQ. If no requests are pending
|
|
* ardp_process_active processes and requests requiring retries then
|
|
* returns.
|
|
*
|
|
*/
|
|
int
|
|
ardp_process_active()
|
|
{
|
|
fd_set readfds; /* Used for select */
|
|
struct sockaddr_in from; /* Reply received from */
|
|
int from_sz; /* Size of address structure */
|
|
int hdr_len; /* Header Length */
|
|
char *ctlptr; /* Pointer to control field */
|
|
unsigned short cid = 0; /* Connection ID from rcvd pkt */
|
|
unsigned char rdflag11; /* First byte of flags (bit vect)*/
|
|
unsigned char rdflag12; /* Second byte of flags (int) */
|
|
unsigned char tchar; /* For decoding extra fields */
|
|
unsigned short stmp; /* Temp short for conversions */
|
|
unsigned long ltmp; /* Temp long for converions */
|
|
int tmp; /* Temporaty return value */
|
|
int nr; /* Number octest received */
|
|
long now; /* The current time */
|
|
|
|
RREQ req = NOREQ; /* To store request info */
|
|
PTEXT pkt = NOPKT; /* Packet being processed */
|
|
PTEXT ptmp = NOPKT; /* Packet being processed */
|
|
|
|
assert(P_IS_THIS_THREAD_MASTER()); /* something_received not MT-Safe */
|
|
check_for_pending:
|
|
|
|
/* Set list of file descriptors to be checked by select */
|
|
FD_ZERO(&readfds);
|
|
FD_SET(ardp_port, &readfds);
|
|
|
|
/* select - either recv is ready, or not */
|
|
/* see if timeout or error or wrong descriptor */
|
|
tmp = select(ardp_port + 1, &readfds, (fd_set *)0, (fd_set *)0, &zerotime);
|
|
|
|
/* If tmp is zero, then nothing to process, check for timeouts */
|
|
if(tmp == 0) {
|
|
now = time(NULL);
|
|
req = ardp_activeQ;
|
|
while(req) {
|
|
if (req->status == ARDP_STATUS_ACKPEND) {
|
|
ardp_xmit(req, -1 /* just ACK; send no data packets */);
|
|
req->status = ARDP_STATUS_ACTIVE;
|
|
}
|
|
if((req->wait_till.tv_sec) && (now >= req->wait_till.tv_sec)) {
|
|
if(req->retries_rem-- > 0) {
|
|
req->timeout_adj.tv_sec = ARDP_BACKOFF(req->timeout_adj.tv_sec);
|
|
if (pfs_debug >= 8) {
|
|
fprintf(stderr,"Connection %d timed out - Setting timeout to %d seconds.\n",
|
|
ntohs(req->cid), (int) req->timeout_adj.tv_sec);
|
|
}
|
|
req->wait_till.tv_sec = now + req->timeout_adj.tv_sec;
|
|
ardp_xmit(req, req->pwindow_sz);
|
|
} else {
|
|
if(pfs_debug >= 8) {
|
|
fprintf(stderr,"Connection %d timed out - Retry count exceeded.\n",
|
|
ntohs(req->cid) );
|
|
(void) fflush(stderr);
|
|
}
|
|
req->status = ARDP_TIMEOUT;
|
|
EXTRACT_ITEM(req,ardp_activeQ);
|
|
--ardp_activeQ_len;
|
|
APPEND_ITEM(req,ardp_completeQ);
|
|
}
|
|
}
|
|
req = req->next;
|
|
}
|
|
return(ARDP_SUCCESS);
|
|
}
|
|
|
|
/* If negative, then return an error */
|
|
if((tmp < 0) || !FD_ISSET(ardp_port, &readfds)) {
|
|
if (pfs_debug)
|
|
fprintf(stderr, "select failed : returned %d port=%d\n",
|
|
tmp, ardp_port);
|
|
|
|
return(ardp__pr_ferror(ARDP_SELECT_FAILED));
|
|
}
|
|
|
|
/* Process incoming packets */
|
|
pkt = ardp_ptalloc();
|
|
pkt->start = pkt->dat;
|
|
from_sz = sizeof(from);
|
|
|
|
if ((nr = recvfrom(ardp_port, pkt->start, sizeof(pkt->dat),
|
|
0, &from, &from_sz)) < 0) {
|
|
if (pfs_debug) perror("recvfrom");
|
|
ardp_ptlfree(pkt);
|
|
return(ardp__pr_ferror(ARDP_BAD_RECV));
|
|
}
|
|
pkt->length = nr;
|
|
*(pkt->start + pkt->length) = '\0';
|
|
|
|
if (pfs_debug >= 6)
|
|
fprintf(stderr,"Received packet from %s\n", inet_ntoa(from.sin_addr));
|
|
|
|
/* If the first byte is greater than 31 we have an old version */
|
|
/* response - return error */
|
|
if((hdr_len = (unsigned char) *(pkt->start)) > 31) {
|
|
ardp_ptfree(pkt);
|
|
if(ardp_activeQ->next == NOREQ)
|
|
return(ardp__pr_ferror(ARDP_BAD_VERSION));
|
|
else goto check_for_pending;
|
|
}
|
|
|
|
/* For the current format, the first two bits are a version number */
|
|
/* and the next six are the header length (including the first byte). */
|
|
ctlptr = pkt->start + 1;
|
|
|
|
if(hdr_len >= 3) { /* Connection ID */
|
|
bcopy2(ctlptr,&stmp);
|
|
if(stmp) cid = stmp;
|
|
ctlptr += 2;
|
|
}
|
|
|
|
/* Match up response with request */
|
|
req = ardp_activeQ;
|
|
while(req) {
|
|
if(cid == req->cid) break;
|
|
req = req->next;
|
|
}
|
|
if(!req) { /* The request isn't on the active queue */
|
|
if (pfs_debug>=6)
|
|
fprintf(stderr,"Packet received for inactive request (cid %d)\n",
|
|
ntohs(cid));
|
|
ardp_ptfree(pkt);
|
|
/* The following can be removed when old servers are gone */
|
|
if((cid == 0) && (ardp_activeQ->next == NOREQ))
|
|
return(ardp__pr_ferror(ARDP_BAD_VERSION));
|
|
goto check_for_pending;
|
|
}
|
|
|
|
if(hdr_len >= 5) { /* Packet number */
|
|
bcopy2(ctlptr,&stmp);
|
|
pkt->seq = ntohs(stmp);
|
|
ctlptr += 2;
|
|
}
|
|
else { /* No packet number specified, so this is the only one */
|
|
pkt->seq = 1;
|
|
req->rcvd_tot = 1;
|
|
}
|
|
if(hdr_len >= 7) { /* Total number of packets */
|
|
bcopy2(ctlptr,&stmp); /* 0 means don't know */
|
|
if(stmp) req->rcvd_tot = ntohs(stmp);
|
|
ctlptr += 2;
|
|
}
|
|
if(hdr_len >= 9) { /* Receievd through */
|
|
bcopy2(ctlptr,&stmp); /* 1 means received request */
|
|
req->prcvd_thru = max(ntohs(stmp),req->prcvd_thru);
|
|
ctlptr += 2;
|
|
}
|
|
if(hdr_len >= 11) { /* Service requested wait */
|
|
bcopy2(ctlptr,&stmp);
|
|
if(stmp || (req->svc_rwait != ntohs(stmp))) {
|
|
now = time(NULL);
|
|
/* New or non-zero requested wait value */
|
|
req->svc_rwait = ntohs(stmp);
|
|
if(pfs_debug >= 8)
|
|
fprintf(stderr,"Service asked us to wait %d seconds\n",
|
|
req->svc_rwait);
|
|
/* Adjust out timeout */
|
|
req->timeout_adj.tv_sec = max(req->timeout.tv_sec,req->svc_rwait);
|
|
req->wait_till.tv_sec = now + req->timeout_adj.tv_sec;
|
|
/* Reset the retry count */
|
|
req->retries_rem = req->retries;
|
|
}
|
|
ctlptr += 2;
|
|
}
|
|
if(hdr_len >= 12) { /* Flags (1st byte) */
|
|
bcopy1(ctlptr,&rdflag11);
|
|
if(rdflag11 & 0x80) {
|
|
if(pfs_debug >= 8)
|
|
fprintf(stderr,"Ack requested\n");
|
|
req->status = ARDP_STATUS_ACKPEND;
|
|
}
|
|
if(rdflag11 & 0x40) {
|
|
if(pfs_debug >= 8)
|
|
fprintf(stderr,"Sequenced control packet\n");
|
|
pkt->length = -1;
|
|
}
|
|
ctlptr += 1;
|
|
}
|
|
if(hdr_len >= 13) { /* Flags (2nd byte) */
|
|
/* Reserved for future use */
|
|
bcopy1(ctlptr,&rdflag12);
|
|
ctlptr += 1;
|
|
if(rdflag12 == 2) {
|
|
/* Attempt to set back prcvd_thru */
|
|
bcopy2(pkt->start+7,&stmp);
|
|
req->prcvd_thru = ntohs(stmp);
|
|
}
|
|
if(rdflag12 == 1) {
|
|
/* ARDP Connection Refused */
|
|
if(pfs_debug >= 8)
|
|
fprintf(stderr,"***ARDP connection refused by server***\n");
|
|
req->status = ARDP_REFUSED;
|
|
EXTRACT_ITEM(req,ardp_activeQ);
|
|
--ardp_activeQ_len;
|
|
APPEND_ITEM(req,ardp_completeQ);
|
|
goto check_for_pending;
|
|
}
|
|
if(rdflag12 == 4) {
|
|
/* Server Redirect */
|
|
bcopy4(ctlptr,&(req->peer_addr));
|
|
ctlptr += 4;
|
|
bcopy2(ctlptr,&(req->peer_port));
|
|
ctlptr += 2;
|
|
if(pfs_debug >= 8)
|
|
fprintf(stderr,"***ARDP redirect to %s(%d)***\n",
|
|
inet_ntoa(req->peer_addr),PEER_PORT(req));
|
|
ardp_xmit(req, req->pwindow_sz);
|
|
}
|
|
if(rdflag12 == 254) {
|
|
tchar = *ctlptr;
|
|
ctlptr++;
|
|
if(tchar & 0x1) { /* Queue position */
|
|
bcopy2(ctlptr,&stmp);
|
|
req->inf_queue_pos = ntohs(stmp);
|
|
if(pfs_debug >= 8)
|
|
fprintf(stderr,"Current queue position on server is %d\n",
|
|
req->inf_queue_pos);
|
|
ctlptr += 2;
|
|
}
|
|
if(tchar & 0x2) { /* Expected system time */
|
|
bcopy4(ctlptr,<mp);
|
|
req->inf_sys_time = ntohl(ltmp);
|
|
if(pfs_debug >= 8)
|
|
fprintf(stderr,"Expected system time is %d seconds\n",
|
|
req->inf_sys_time);
|
|
ctlptr += 4;
|
|
}
|
|
}
|
|
}
|
|
/* If ->seq 0, then an unsequenced control packet */
|
|
if(pkt->seq == 0) {
|
|
ardp_ptfree(pkt);
|
|
goto check_for_pending;
|
|
}
|
|
if(pkt->length >= 0) pkt->length -= hdr_len;
|
|
pkt->start += hdr_len;
|
|
pkt->text = pkt->start;
|
|
pkt->ioptr = pkt->start;
|
|
|
|
if(pfs_debug >= 8) fprintf(stderr,"Packet %d of %d (cid=%d)\n", pkt->seq,
|
|
req->rcvd_tot, ntohs(req->cid));
|
|
|
|
if (req->rcvd == NOPKT) {
|
|
/* Add it as the head of empty doubly linked list */
|
|
req->rcvd = pkt;
|
|
pkt->previous = pkt;
|
|
if(pkt->seq == 1) {
|
|
req->comp_thru = pkt;
|
|
req->rcvd_thru = 1;
|
|
/* If only one packet, then return it. We will assume */
|
|
/* that it is not a sequenced control packet */
|
|
if(req->rcvd_tot == 1) goto req_complete;
|
|
}
|
|
goto ins_done;
|
|
}
|
|
|
|
if(req->comp_thru && (pkt->seq <= req->rcvd_thru))
|
|
ardp_ptfree(pkt); /* Duplicate */
|
|
else if(pkt->seq < req->rcvd->seq) { /* First (sequentially) */
|
|
pkt->next = req->rcvd;
|
|
pkt->previous = req->rcvd->previous;
|
|
req->rcvd->previous = pkt;
|
|
req->rcvd = pkt;
|
|
if(req->rcvd->seq == 1) {
|
|
req->comp_thru = req->rcvd;
|
|
req->rcvd_thru = 1;
|
|
}
|
|
}
|
|
else { /* Insert later in the packet list unless duplicate */
|
|
ptmp = (req->comp_thru ? req->comp_thru : req->rcvd);
|
|
while (pkt->seq > ptmp->seq) {
|
|
if(ptmp->next == NULL) {
|
|
/* Insert at end */
|
|
ptmp->next = pkt;
|
|
pkt->previous = ptmp;
|
|
pkt->next = NULL;
|
|
req->rcvd->previous = pkt;
|
|
goto ins_done;
|
|
}
|
|
ptmp = ptmp->next;
|
|
}
|
|
if(ptmp->seq == pkt->seq) /* Duplicate */
|
|
ardp_ptfree(pkt);
|
|
else { /* insert before ptmp (we know were not first) */
|
|
ptmp->previous->next = pkt;
|
|
pkt->previous = ptmp->previous;
|
|
pkt->next = ptmp;
|
|
ptmp->previous = pkt;
|
|
}
|
|
}
|
|
|
|
ins_done:
|
|
/* Find out how much is complete and remove sequenced control packets */
|
|
while(req->comp_thru && req->comp_thru->next &&
|
|
(req->comp_thru->next->seq == (req->comp_thru->seq + 1))) {
|
|
/* We have added one more in sequence */
|
|
if(pfs_debug >= 8)
|
|
fprintf(stderr,"Packets now received through %d\n",
|
|
req->comp_thru->next->seq);
|
|
|
|
if(req->comp_thru->length == -1) {
|
|
/* Old comp_thru was a sequenced control packet */
|
|
ptmp = req->comp_thru;
|
|
req->comp_thru = req->comp_thru->next;
|
|
req->rcvd_thru = req->comp_thru->seq;
|
|
EXTRACT_ITEM(ptmp,req->rcvd);
|
|
ardp_ptfree(ptmp);
|
|
}
|
|
else {
|
|
/* Old comp_thru was a data packet */
|
|
req->comp_thru = req->comp_thru->next;
|
|
req->rcvd_thru = req->comp_thru->seq;
|
|
}
|
|
|
|
/* Update outgoing packets */
|
|
ardp_headers(req);
|
|
|
|
/* We've made progress, so reset timeout and retry count */
|
|
req->timeout_adj.tv_sec = max(req->timeout.tv_sec,req->svc_rwait);
|
|
req->retries_rem = req->retries;
|
|
}
|
|
|
|
/* See if there are any gaps - possibly toggle GAP status */
|
|
if(!req->comp_thru || req->comp_thru->next) {
|
|
if (req->status == ARDP_STATUS_ACTIVE) req->status = ARDP_STATUS_GAPS;
|
|
}
|
|
else if (req->status == ARDP_STATUS_GAPS) req->status = ARDP_STATUS_ACTIVE;
|
|
|
|
/* If incomplete, wait for more */
|
|
if (!(req->comp_thru) || (req->comp_thru->seq != req->rcvd_tot))
|
|
goto check_for_pending;
|
|
|
|
req_complete:
|
|
|
|
if (pfs_debug >= 7) {
|
|
fprintf(stderr,"The complete response has been received.\n");
|
|
(void) fflush(stderr);
|
|
}
|
|
|
|
if(req->status == ARDP_STATUS_ACKPEND) {
|
|
if (pfs_debug >= 7) {
|
|
if (req->peer.sin_family == AF_INET)
|
|
fprintf(stderr,"Acknowledging final packet to %s(%d)\n",
|
|
inet_ntoa(req->peer_addr), PEER_PORT(req));
|
|
else fprintf(stderr,"Acknowledging final packet\n");
|
|
(void) fflush(stderr);
|
|
}
|
|
ardp_xmit(req, req->pwindow_sz);
|
|
}
|
|
|
|
req->status = ARDP_STATUS_COMPLETE;
|
|
req->inpkt = req->rcvd;
|
|
EXTRACT_ITEM(req,ardp_activeQ);
|
|
--ardp_activeQ_len;
|
|
APPEND_ITEM(req,ardp_completeQ);
|
|
return(ARDP_SUCCESS);
|
|
}
|
|
|
|
static int
|
|
ardp__pr_ferror(int errcode)
|
|
{
|
|
RREQ creq;
|
|
|
|
while(ardp_activeQ) {
|
|
creq = ardp_activeQ;
|
|
ardp_activeQ->status = errcode;
|
|
EXTRACT_ITEM(creq,ardp_activeQ);
|
|
--ardp_activeQ_len;
|
|
APPEND_ITEM(creq,ardp_completeQ);
|
|
}
|
|
perrno = errcode;
|
|
return(errcode);
|
|
}
|