From e88ba3953d4dea83f2699669a73ce48b2e573227 Mon Sep 17 00:00:00 2001 From: mid Date: Mon, 29 May 2000 12:37:45 +0000 Subject: [PATCH] - Mon May 29 12:08:28 GMT 2000 - Rearranged aim_tx_flushqueue(); moved write operation to aim_tx_sendframe() - Turned aim_tx_enqueue() into a macro that calls sess->tx_enqueue, a function pointer to whatever you want to use to enqueue things for transmition (or not) - Old aim_tx_enqueue becomes aim_tx_enqueue__queuebased. Added aim_tx_enqueue__immediate for doing immediate writes. Default is to use queue-based procedure. - Cleaned up parts of aim_conn.c - Added locking around the sockets themselves. Should allow for full cross-thread usage. - Unfortunatly, only pthreads are supported at this time. If you don't have pthreads on your arch, implement the macros your arch needs and send me a patch. (A SysV semaphore implementation would be trivial, as would a simple integer-based lock.) --- CHANGES | 18 ++++++ aim_conn.c | 77 +++++++++------------- aim_misc.c | 3 +- aim_rxqueue.c | 6 ++ aim_txqueue.c | 161 +++++++++++++++++++++++++++++++--------------- faim/aim.h | 27 +++++++- faim/faimconfig.h | 11 ++++ 7 files changed, 201 insertions(+), 102 deletions(-) diff --git a/CHANGES b/CHANGES index 0b36050..335d4ba 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,24 @@ No release numbers ------------------ + - Mon May 29 12:08:28 GMT 2000 + - Rearranged aim_tx_flushqueue(); moved write operation + to aim_tx_sendframe() + - Turned aim_tx_enqueue() into a macro that calls sess->tx_enqueue, + a function pointer to whatever you want to use to enqueue + things for transmition (or not) + - Old aim_tx_enqueue becomes aim_tx_enqueue__queuebased. Added + aim_tx_enqueue__immediate for doing immediate writes. Default + is to use queue-based procedure. + - Cleaned up parts of aim_conn.c + - Added locking around the sockets themselves. Should allow + for full cross-thread usage. + - Unfortunatly, only pthreads are supported at this time. + If you don't have pthreads on your arch, implement the + macros your arch needs and send me a patch. (A SysV + semaphore implementation would be trivial, as would a + simple integer-based lock.) + - Sun May 21 14:59:20 GMT 2000 - Added infotype parameter to aim_getinfo() for requesting different types of messages. AIM_GETINFO_GENERALINFO diff --git a/aim_conn.c b/aim_conn.c index 5cb4a79..0396f88 100644 --- a/aim_conn.c +++ b/aim_conn.c @@ -11,18 +11,9 @@ void aim_connrst(struct aim_session_t *sess) { int i; - for (i = 0; i < AIM_CONN_MAX; i++) - { - sess->conns[i].fd = -1; - sess->conns[i].type = -1; - sess->conns[i].status = 0; - sess->conns[i].seqnum = 0; - sess->conns[i].lastactivity = 0; - sess->conns[i].forcedlatency = 0; - aim_clearhandlers(&(sess->conns[i])); - sess->conns[i].handlerlist = NULL; - } - + for (i = 0; i < AIM_CONN_MAX; i++) { + aim_conn_close(&sess->conns[i]); + } } struct aim_conn_t *aim_conn_getnext(struct aim_session_t *sess) @@ -48,6 +39,7 @@ void aim_conn_close(struct aim_conn_t *deadconn) if (deadconn->priv) free(deadconn->priv); deadconn->priv = NULL; + faim_mutex_init(&deadconn->active, NULL); } struct aim_conn_t *aim_getconn_type(struct aim_session_t *sess, @@ -100,13 +92,12 @@ struct aim_conn_t *aim_newconn(struct aim_session_t *sess, * */ - for(i=0;istatus = (h_errno | AIM_CONN_STATUS_RESOLVERR); - return connstruct; - } + if (hp == NULL) { + connstruct->status = (h_errno | AIM_CONN_STATUS_RESOLVERR); + return connstruct; + } memset(&sa.sin_zero, 0, 8); sa.sin_port = htons(port); @@ -127,12 +117,11 @@ struct aim_conn_t *aim_newconn(struct aim_session_t *sess, connstruct->fd = socket(hp->h_addrtype, SOCK_STREAM, 0); ret = connect(connstruct->fd, (struct sockaddr *)&sa, sizeof(struct sockaddr_in)); - if( ret < 0) - { - connstruct->fd = -1; - connstruct->status = (errno | AIM_CONN_STATUS_CONNERR); - return connstruct; - } + if(ret < 0) { + connstruct->fd = -1; + connstruct->status = (errno | AIM_CONN_STATUS_CONNERR); + return connstruct; + } return connstruct; } @@ -140,8 +129,8 @@ struct aim_conn_t *aim_newconn(struct aim_session_t *sess, int aim_conngetmaxfd(struct aim_session_t *sess) { int i,j; - j=0; - for (i=0;iconns[i].fd > j) j = sess->conns[i].fd; return j; @@ -150,8 +139,8 @@ int aim_conngetmaxfd(struct aim_session_t *sess) int aim_countconn(struct aim_session_t *sess) { int i,cnt; - cnt = 0; - for (i=0;iconns[i].fd > -1) cnt++; return cnt; @@ -239,8 +228,6 @@ int aim_conn_setlatency(struct aim_conn_t *conn, int newval) void aim_session_init(struct aim_session_t *sess) { - int i; - if (!sess) return; @@ -250,23 +237,19 @@ void aim_session_init(struct aim_session_t *sess) sess->logininfo.email = NULL; sess->logininfo.regstatus = 0x00; - for (i = 0; i < AIM_CONN_MAX; i++) - { - sess->conns[i].fd = -1; - sess->conns[i].type = -1; - sess->conns[i].status = 0; - sess->conns[i].seqnum = 0; - sess->conns[i].lastactivity = 0; - sess->conns[i].forcedlatency = 0; - sess->conns[i].handlerlist = NULL; - sess->conns[i].priv = NULL; - } - + aim_connrst(sess); + sess->queue_outgoing = NULL; sess->queue_incoming = NULL; sess->pendingjoin = NULL; sess->outstanding_snacs = NULL; sess->snac_nextid = 0x00000001; + /* + * This must always be set. Default to the queue-based + * version for back-compatibility. + */ + sess->tx_enqueue = &aim_tx_enqueue__queuebased; + return; } diff --git a/aim_misc.c b/aim_misc.c index 242d28d..9a0d1f3 100644 --- a/aim_misc.c +++ b/aim_misc.c @@ -648,7 +648,7 @@ u_long aim_bos_reqlocaterights(struct aim_session_t *sess, } /* - * aim_bos_reqicbmparaminfo() +* aim_bos_reqicbmparaminfo() * * Request ICBM parameter information. * @@ -658,3 +658,4 @@ u_long aim_bos_reqicbmparaminfo(struct aim_session_t *sess, { return aim_genericreq_n(sess, conn, 0x0004, 0x0004); } + diff --git a/aim_rxqueue.c b/aim_rxqueue.c index 4298b47..d82d6f4 100644 --- a/aim_rxqueue.c +++ b/aim_rxqueue.c @@ -31,10 +31,13 @@ int aim_get_command(struct aim_session_t *sess, struct aim_conn_t *conn) * 2 short -- Sequence number * 4 short -- Number of data bytes that follow. */ + faim_mutex_lock(&conn->active); if (read(conn->fd, generic, 6) < 6){ aim_conn_close(conn); + faim_mutex_unlock(&conn->active); return -1; } + faim_mutex_unlock(&conn->active); /* * This shouldn't happen unless the socket breaks, the server breaks, @@ -72,12 +75,15 @@ int aim_get_command(struct aim_session_t *sess, struct aim_conn_t *conn) } /* read the data portion of the packet */ + faim_mutex_lock(&conn->active); if (read(conn->fd, newrx->data, newrx->commandlen) < newrx->commandlen){ free(newrx->data); free(newrx); aim_conn_close(conn); + faim_mutex_unlock(&conn->active); return -1; } + faim_mutex_unlock(&conn->active); newrx->conn = conn; diff --git a/aim_txqueue.c b/aim_txqueue.c index 7967cbf..8c3a9fb 100644 --- a/aim_txqueue.c +++ b/aim_txqueue.c @@ -19,8 +19,10 @@ struct command_tx_struct *aim_tx_new(int chan, struct aim_conn_t *conn, int data { struct command_tx_struct *new; - if (!conn) + if (!conn) { + printf("aim_tx_new: ERROR: no connection specified\n"); return NULL; + } new = (struct command_tx_struct *)malloc(sizeof(struct command_tx_struct)); if (!new) @@ -39,7 +41,7 @@ struct command_tx_struct *aim_tx_new(int chan, struct aim_conn_t *conn, int data } /* - * aim_tx_enqeue() + * aim_tx_enqeue__queuebased() * * The overall purpose here is to enqueue the passed in command struct * into the outgoing (tx) queue. Basically... @@ -50,9 +52,12 @@ struct command_tx_struct *aim_tx_new(int chan, struct aim_conn_t *conn, int data * 5) Unlock the struct once it's linked in * 6) Return * + * Note that this is only used when doing queue-based transmitting; + * that is, when sess->tx_enqueue is set to &aim_tx_enqueue__queuebased. + * */ -int aim_tx_enqueue(struct aim_session_t *sess, - struct command_tx_struct *newpacket) +int aim_tx_enqueue__queuebased(struct aim_session_t *sess, + struct command_tx_struct *newpacket) { struct command_tx_struct *cur; @@ -90,6 +95,41 @@ int aim_tx_enqueue(struct aim_session_t *sess, return 0; } +/* + * aim_tx_enqueue__immediate() + * + * Parallel to aim_tx_enqueue__queuebased, however, this bypasses + * the whole queue mess when you want immediate writes to happen. + * + * Basically the same as its __queuebased couterpart, however + * instead of doing a list append, it just calls aim_tx_sendframe() + * right here. + * + */ +int aim_tx_enqueue__immediate(struct aim_session_t *sess, struct command_tx_struct *newpacket) +{ + if (newpacket->conn == NULL) { + faimdprintf(1, "aim_tx_enqueue: ERROR: packet has no connection\n"); + if (newpacket->data) + free(newpacket->data); + free(newpacket); + return -1; + } + + newpacket->seqnum = aim_get_next_txseqnum(newpacket->conn); + + newpacket->lock = 1; /* lock */ + newpacket->sent = 0; /* not sent yet */ + + aim_tx_sendframe(newpacket); + + if (newpacket->data) + free(newpacket->data); + free(newpacket); + + return 0; +} + /* * aim_get_next_txseqnum() * @@ -161,10 +201,72 @@ int aim_tx_printqueue(struct aim_session_t *sess) * 9) Step to next struct in list and go back to 1. * */ +int aim_tx_sendframe(struct command_tx_struct *cur) +{ + u_char *curPacket; + + if (!cur) + return -1; /* fatal */ + + cur->lock = 1; /* lock the struct */ + + /* allocate full-packet buffer */ + curPacket = (char *) malloc(cur->commandlen + 6); + + /* command byte */ + curPacket[0] = 0x2a; + + /* type/family byte */ + curPacket[1] = cur->type; + + /* bytes 3+4: word: FLAP sequence number */ + aimutil_put16(curPacket+2, cur->seqnum); + + /* bytes 5+6: word: SNAC len */ + aimutil_put16(curPacket+4, cur->commandlen); + + /* bytes 7 and on: raw: SNAC data */ /* XXX: ye gods! get rid of this! */ + memcpy(&(curPacket[6]), cur->data, cur->commandlen); + + /* full image of raw packet data now in curPacket */ + faim_mutex_lock(&cur->conn->active); + if ( (u_int)write(cur->conn->fd, curPacket, (cur->commandlen + 6)) != (cur->commandlen + 6)) { + faim_mutex_unlock(&cur->conn->active); + printf("\nWARNING: Error in sending packet 0x%4x -- will try again next time\n\n", cur->seqnum); + cur->sent = 0; /* mark it unsent */ + return 0; /* bail out -- continuable error */ + } else { + faimdprintf(2, "\nSENT 0x%4x\n\n", cur->seqnum); + + cur->sent = 1; /* mark the struct as sent */ + cur->conn->lastactivity = time(NULL); + } + faim_mutex_unlock(&cur->conn->active); + +#if debug > 2 + faimdprintf(2, "\nPacket:"); + for (i = 0; i < (cur->commandlen + 6); i++) { + if ((i % 8) == 0) { + faimdprintf(2, "\n\t"); + } + if (curPacket[i] >= ' ' && curPacket[i]<127) { + faimdprintf(2, "%c=%02x ", curPacket[i], curPacket[i]); + } else { + faimdprintf(2, "0x%2x ", curPacket[i]); + } + } + faimdprintf(2, "\n"); +#endif + cur->lock = 0; /* unlock the struct */ + free(curPacket); /* free up full-packet buffer */ + + return 1; /* success */ +} + int aim_tx_flushqueue(struct aim_session_t *sess) { struct command_tx_struct *cur; - u_char *curPacket = NULL; + #if debug > 1 int i = 0; #endif @@ -185,54 +287,9 @@ int aim_tx_flushqueue(struct aim_session_t *sess) /* FIXME FIXME -- should be a break! we dont want to block the upper layers */ sleep((cur->conn->lastactivity + cur->conn->forcedlatency) - time(NULL)); } - - cur->lock = 1; /* lock the struct */ - - /* allocate full-packet buffer */ - curPacket = (char *) malloc(cur->commandlen + 6); - - /* command byte */ - curPacket[0] = 0x2a; - - /* type/family byte */ - curPacket[1] = cur->type; - - /* bytes 3+4: word: FLAP sequence number */ - aimutil_put16(curPacket+2, cur->seqnum); - /* bytes 5+6: word: SNAC len */ - aimutil_put16(curPacket+4, cur->commandlen); - - /* bytes 7 and on: raw: SNAC data */ - memcpy(&(curPacket[6]), cur->data, cur->commandlen); - - /* full image of raw packet data now in curPacket */ - if ( (u_int)write(cur->conn->fd, curPacket, (cur->commandlen + 6)) != (cur->commandlen + 6)) { - printf("\nWARNING: Error in sending packet 0x%4x -- will try again next time\n\n", cur->seqnum); - cur->sent = 0; /* mark it unsent */ - continue; /* bail out */ - } else { - faimdprintf(2, "\nSENT 0x%4x\n\n", cur->seqnum); - - cur->sent = 1; /* mark the struct as sent */ - cur->conn->lastactivity = time(NULL); - } -#if debug > 2 - faimdprintf(2, "\nPacket:"); - for (i = 0; i < (cur->commandlen + 6); i++) { - if ((i % 8) == 0) { - faimdprintf(2, "\n\t"); - } - if (curPacket[i] >= ' ' && curPacket[i]<127) { - faimdprintf(2, "%c=%02x ", curPacket[i], curPacket[i]); - } else { - faimdprintf(2, "0x%2x ", curPacket[i]); - } - } - faimdprintf(2, "\n"); -#endif - cur->lock = 0; /* unlock the struct */ - free(curPacket); /* free up full-packet buffer */ + if (aim_tx_sendframe(cur) == -1) + break; } } diff --git a/faim/aim.h b/faim/aim.h index 8216628..c15e646 100644 --- a/faim/aim.h +++ b/faim/aim.h @@ -9,6 +9,10 @@ #include #include +#ifndef FAIM_USEPTHREADS +#error pthreads are currently required. +#endif + #include #include #include @@ -17,6 +21,14 @@ #include #include +#ifdef FAIM_USEPTHREADS +#include +#define faim_mutex_t pthread_mutex_t +#define faim_mutex_init pthread_mutex_init +#define faim_mutex_lock pthread_mutex_lock +#define faim_mutex_unlock pthread_mutex_unlock +#endif + #ifdef _WIN32 #include #include @@ -121,6 +133,9 @@ struct aim_conn_t { time_t lastactivity; /* time of last transmit */ int forcedlatency; struct aim_rxcblist_t *handlerlist; +#ifdef FAIM_USEPTHREADS + faim_mutex_t active; +#endif }; /* struct for incoming commands */ @@ -180,9 +195,14 @@ struct aim_session_t { /* * TX/RX queues */ - struct command_tx_struct *queue_outgoing; + struct command_tx_struct *queue_outgoing; struct command_rx_struct *queue_incoming; + /* + * Tx Enqueuing function + */ + int (*tx_enqueue)(struct aim_session_t *, struct command_tx_struct *); + /* * This is a dreadful solution to the what-room-are-we-joining * problem. (There's no connection between the service @@ -301,7 +321,10 @@ int aim_parse_missed_im(struct aim_session_t *, struct command_rx_struct *, ...) int aim_parse_last_bad(struct aim_session_t *, struct command_rx_struct *, ...); struct command_tx_struct *aim_tx_new(int, struct aim_conn_t *, int); -int aim_tx_enqueue(struct aim_session_t *, struct command_tx_struct *); +int aim_tx_enqueue__queuebased(struct aim_session_t *, struct command_tx_struct *); +int aim_tx_enqueue__immediate(struct aim_session_t *, struct command_tx_struct *); +#define aim_tx_enqueue(x, y) ((*(x->tx_enqueue))(x, y)) +int aim_tx_sendframe(struct command_tx_struct *cur); u_int aim_get_next_txseqnum(struct aim_conn_t *); int aim_tx_flushqueue(struct aim_session_t *); int aim_tx_printqueue(struct aim_session_t *); diff --git a/faim/faimconfig.h b/faim/faimconfig.h index 0184f77..b0ccc4a 100644 --- a/faim/faimconfig.h +++ b/faim/faimconfig.h @@ -103,4 +103,15 @@ */ #define AIMUTIL_USEMACROS +/* + * Select whether or not to use POSIX thread functionality. + * + * Default: defined on Linux, otherwise undefined + */ +#ifdef __linux__ +#define FAIM_USEPTHREADS +#endif + #endif /* __FAIMCONFIG_H__ */ + + -- 2.45.1