]> andersk Git - libfaim.git/blob - src/txqueue.c
2a1a92af88c49713158d6f4e0c5dd0cd0dd0e9e8
[libfaim.git] / src / txqueue.c
1 /*
2  *  aim_txqueue.c
3  *
4  * Herein lies all the mangement routines for the transmit (Tx) queue.
5  *
6  */
7
8 #define FAIM_INTERNAL
9 #include <aim.h>
10
11 #ifndef _WIN32
12 #include <sys/socket.h>
13 #endif
14
15 /*
16  * Allocate a new tx frame.
17  *
18  * This is more for looks than anything else.
19  *
20  * Right now, that is.  If/when we implement a pool of transmit
21  * frames, this will become the request-an-unused-frame part.
22  *
23  * framing = AIM_FRAMETYPE_OFT/OSCAR
24  * chan = channel for OSCAR, hdrtype for OFT
25  *
26  */
27 faim_internal struct command_tx_struct *aim_tx_new(struct aim_session_t *sess, struct aim_conn_t *conn, unsigned char framing, int chan, int datalen)
28 {
29   struct command_tx_struct *newtx;
30
31   if (!conn) {
32     faimdprintf(sess, 0, "aim_tx_new: ERROR: no connection specified\n");
33     return NULL;
34   }
35
36   /* For sanity... */
37   if ((conn->type == AIM_CONN_TYPE_RENDEZVOUS) || (conn->type == AIM_CONN_TYPE_RENDEZVOUS_OUT)) {
38     if (framing != AIM_FRAMETYPE_OFT) {
39       faimdprintf(sess, 0, "aim_tx_new: attempted to allocate inappropriate frame type for rendezvous connection\n");
40       return NULL;
41     }
42   } else {
43     if (framing != AIM_FRAMETYPE_OSCAR) {
44       faimdprintf(sess, 0, "aim_tx_new: attempted to allocate inappropriate frame type for FLAP connection\n");
45       return NULL;
46     }
47   }
48
49   newtx = (struct command_tx_struct *)malloc(sizeof(struct command_tx_struct));
50   if (!newtx)
51     return NULL;
52   memset(newtx, 0, sizeof(struct command_tx_struct));
53
54   newtx->conn = conn; 
55
56   if(datalen) {
57     newtx->data = (unsigned char *)malloc(datalen);
58     newtx->commandlen = datalen;
59   } else
60     newtx->data = NULL;
61
62   newtx->hdrtype = framing;
63   if (newtx->hdrtype == AIM_FRAMETYPE_OSCAR) {
64     newtx->hdr.oscar.type = chan;
65   } else if (newtx->hdrtype == AIM_FRAMETYPE_OFT) {
66     newtx->hdr.oft.type = chan;
67     newtx->hdr.oft.hdr2len = 0; /* this will get setup by caller */
68   } else { 
69     faimdprintf(sess, 0, "tx_new: unknown framing\n");
70   }
71
72   return newtx;
73 }
74
75 /*
76  * aim_tx_enqeue__queuebased()
77  *
78  * The overall purpose here is to enqueue the passed in command struct
79  * into the outgoing (tx) queue.  Basically...
80  *   1) Make a scope-irrelevent copy of the struct
81  *   2) Lock the struct
82  *   3) Mark as not-sent-yet
83  *   4) Enqueue the struct into the list
84  *   5) Unlock the struct once it's linked in
85  *   6) Return
86  *
87  * Note that this is only used when doing queue-based transmitting;
88  * that is, when sess->tx_enqueue is set to &aim_tx_enqueue__queuebased.
89  *
90  */
91 static int aim_tx_enqueue__queuebased(struct aim_session_t *sess, struct command_tx_struct *newpacket)
92 {
93   struct command_tx_struct *cur;
94
95   if (newpacket->conn == NULL) {
96       faimdprintf(sess, 1, "aim_tx_enqueue: WARNING: enqueueing packet with no connecetion\n");
97       newpacket->conn = aim_getconn_type(sess, AIM_CONN_TYPE_BOS);
98   }
99  
100   if (newpacket->hdrtype == AIM_FRAMETYPE_OSCAR) {
101     /* assign seqnum */
102     newpacket->hdr.oscar.seqnum = aim_get_next_txseqnum(newpacket->conn);
103   }
104   /* set some more fields */
105   newpacket->lock = 1; /* lock */
106   newpacket->sent = 0; /* not sent yet */
107   newpacket->next = NULL; /* always last */
108
109   /* see overhead note in aim_rxqueue counterpart */
110   if (sess->queue_outgoing == NULL) {
111     sess->queue_outgoing = newpacket;
112   } else {
113     for (cur = sess->queue_outgoing;
114          cur->next;
115          cur = cur->next)
116       ;
117     cur->next = newpacket;
118   }
119
120   newpacket->lock = 0; /* unlock so it can be sent */
121
122   return 0;
123 }
124
125 /*
126  * aim_tx_enqueue__immediate()
127  *
128  * Parallel to aim_tx_enqueue__queuebased, however, this bypasses
129  * the whole queue mess when you want immediate writes to happen.
130  *
131  * Basically the same as its __queuebased couterpart, however
132  * instead of doing a list append, it just calls aim_tx_sendframe()
133  * right here. 
134  * 
135  */
136 static int aim_tx_enqueue__immediate(struct aim_session_t *sess, struct command_tx_struct *newpacket)
137 {
138   if (newpacket->conn == NULL) {
139     faimdprintf(sess, 1, "aim_tx_enqueue: ERROR: packet has no connection\n");
140     if (newpacket->data)
141       free(newpacket->data);
142     free(newpacket);
143     return -1;
144   }
145
146   if (newpacket->hdrtype == AIM_FRAMETYPE_OSCAR)
147     newpacket->hdr.oscar.seqnum = aim_get_next_txseqnum(newpacket->conn);
148
149   newpacket->lock = 1; /* lock */
150   newpacket->sent = 0; /* not sent yet */
151
152   aim_tx_sendframe(sess, newpacket);
153
154   if (newpacket->data)
155     free(newpacket->data);
156   free(newpacket);
157
158   return 0;
159 }
160
161 faim_export int aim_tx_setenqueue(struct aim_session_t *sess, int what,  int (*func)(struct aim_session_t *, struct command_tx_struct *))
162 {
163   if (!sess)
164     return -1;
165
166   if (what == AIM_TX_QUEUED)
167     sess->tx_enqueue = &aim_tx_enqueue__queuebased;
168   else if (what == AIM_TX_IMMEDIATE) 
169     sess->tx_enqueue = &aim_tx_enqueue__immediate;
170   else if (what == AIM_TX_USER) {
171     if (!func)
172       return -1;
173     sess->tx_enqueue = func;
174   } else
175     return -1; /* unknown action */
176
177   return 0;
178 }
179
180 faim_internal int aim_tx_enqueue(struct aim_session_t *sess, struct command_tx_struct *command)
181 {
182   /*
183    * If we want to send a connection thats inprogress, we have to force
184    * them to use the queue based version. Otherwise, use whatever they
185    * want.
186    */
187   if (command && command->conn && (command->conn->status & AIM_CONN_STATUS_INPROGRESS)) {
188     return aim_tx_enqueue__queuebased(sess, command);
189   }
190   return (*sess->tx_enqueue)(sess, command);
191 }
192
193 /* 
194  *  aim_get_next_txseqnum()
195  *
196  *   This increments the tx command count, and returns the seqnum
197  *   that should be stamped on the next FLAP packet sent.  This is
198  *   normally called during the final step of packet preparation
199  *   before enqueuement (in aim_tx_enqueue()).
200  *
201  */
202 faim_internal unsigned int aim_get_next_txseqnum(struct aim_conn_t *conn)
203 {
204   u_int ret;
205   
206   faim_mutex_lock(&conn->seqnum_lock);
207   ret = ++conn->seqnum;
208   faim_mutex_unlock(&conn->seqnum_lock);
209   return ret;
210 }
211
212 /*
213  *  aim_tx_flushqueue()
214  *
215  *  This the function is responsable for putting the queued commands
216  *  onto the wire.  This function is critical to the operation of 
217  *  the queue and therefore is the most prone to brokenness.  It
218  *  seems to be working quite well at this point.
219  *
220  *  Procedure:
221  *    1) Traverse the list, only operate on commands that are unlocked
222  *       and haven't been sent yet.
223  *    2) Lock the struct
224  *    3) Allocate a temporary buffer to store the finished, fully
225  *       processed packet in.
226  *    4) Build the packet from the command_tx_struct data.
227  *    5) Write the packet to the socket.
228  *    6) If success, mark the packet sent, if fail report failure, do NOT
229  *       mark the packet sent (so it will not get purged and therefore
230  *       be attempted again on next call).
231  *    7) Unlock the struct.
232  *    8) Free the temp buffer
233  *    9) Step to next struct in list and go back to 1.
234  *
235  */
236 faim_internal int aim_tx_sendframe(struct aim_session_t *sess, struct command_tx_struct *cur)
237 {
238   int buflen = 0;
239   unsigned char *curPacket;
240
241   if (!cur)
242     return -1; /* fatal */
243
244   cur->lock = 1; /* lock the struct */
245
246   if (cur->hdrtype == AIM_FRAMETYPE_OSCAR)
247     buflen = cur->commandlen + 6;
248   else if (cur->hdrtype == AIM_FRAMETYPE_OFT)
249     buflen = cur->hdr.oft.hdr2len + 8;
250   else {
251     cur->lock = 0;
252     return -1;
253   }
254
255   /* allocate full-packet buffer */
256   if (!(curPacket = (unsigned char *) malloc(buflen))) {
257     cur->lock = 0;
258     return -1;
259   }
260       
261   if (cur->hdrtype == AIM_FRAMETYPE_OSCAR) {
262     /* command byte */
263     curPacket[0] = 0x2a;
264       
265     /* type/family byte */
266     curPacket[1] = cur->hdr.oscar.type;
267       
268     /* bytes 3+4: word: FLAP sequence number */
269     aimutil_put16(curPacket+2, cur->hdr.oscar.seqnum);
270
271     /* bytes 5+6: word: SNAC len */
272     aimutil_put16(curPacket+4, cur->commandlen);
273       
274     /* bytes 7 and on: raw: SNAC data */  /* XXX: ye gods! get rid of this! */
275     memcpy(&(curPacket[6]), cur->data, cur->commandlen);
276
277   } else if (cur->hdrtype == AIM_FRAMETYPE_OFT) {
278     int z = 0;
279
280     z += aimutil_put8(curPacket+z, cur->hdr.oft.magic[0]);
281     z += aimutil_put8(curPacket+z, cur->hdr.oft.magic[1]);
282     z += aimutil_put8(curPacket+z, cur->hdr.oft.magic[2]);
283     z += aimutil_put8(curPacket+z, cur->hdr.oft.magic[3]);
284
285     z += aimutil_put16(curPacket+z, cur->hdr.oft.hdr2len + 8);
286     z += aimutil_put16(curPacket+z, cur->hdr.oft.type);
287
288     memcpy(curPacket+z, cur->hdr.oft.hdr2, cur->hdr.oft.hdr2len);
289   }
290
291   /* 
292    * For OSCAR, a full image of the raw packet data now in curPacket.
293    * For OFT, an image of just the bloated header is in curPacket, 
294    * since OFT allows us to do the data in a different write (yay!).
295    */
296   faim_mutex_lock(&cur->conn->active);
297   if (send(cur->conn->fd, curPacket, buflen, 0) != buflen) {
298     faim_mutex_unlock(&cur->conn->active);
299     cur->sent = 1;
300     aim_conn_close(cur->conn);
301     return 0; /* bail out */
302   }
303
304   if ((cur->hdrtype == AIM_FRAMETYPE_OFT) && cur->commandlen) {
305     int curposi;
306     for(curposi = 0; curposi < cur->commandlen; curposi++)
307       faimdprintf(sess, 0, "%02x ", cur->data[curposi]);
308
309     if (send(cur->conn->fd, cur->data, cur->commandlen, 0) != (int)cur->commandlen) {
310       /* 
311        * Theres nothing we can do about this since we've already sent the 
312        * header!  The connection is unstable.
313        */
314       faim_mutex_unlock(&cur->conn->active);
315       cur->sent = 1;
316       aim_conn_close(cur->conn);
317       return 0; /* bail out */
318     }
319
320   }
321
322   cur->sent = 1; /* mark the struct as sent */
323   cur->conn->lastactivity = time(NULL);
324
325   faim_mutex_unlock(&cur->conn->active);
326
327   if (sess->debug >= 2) {
328     int i;
329
330     faimdprintf(sess, 2, "\nOutgoing packet: (only valid for OSCAR)");
331     for (i = 0; i < buflen; i++) {
332       if (!(i % 8)) 
333         faimdprintf(sess, 2, "\n\t");
334       faimdprintf(sess, 2, "0x%02x ", curPacket[i]);
335     }
336     faimdprintf(sess, 2, "\n");
337   }
338
339   cur->lock = 0; /* unlock the struct */
340
341   free(curPacket); /* free up full-packet buffer */
342
343   return 1; /* success */
344 }
345
346 faim_export int aim_tx_flushqueue(struct aim_session_t *sess)
347 {
348   struct command_tx_struct *cur;
349    
350   if (sess->queue_outgoing == NULL)
351     return 0;
352
353   faimdprintf(sess, 2, "beginning txflush...\n");
354   for (cur = sess->queue_outgoing; cur; cur = cur->next) {
355     /* only process if its unlocked and unsent */
356     if (!cur->lock && !cur->sent) {
357
358       if (cur->conn && (cur->conn->status & AIM_CONN_STATUS_INPROGRESS))
359         continue;
360
361       /*
362        * And now for the meager attempt to force transmit
363        * latency and avoid missed messages.
364        */
365       if ((cur->conn->lastactivity + cur->conn->forcedlatency) >= time(NULL)) {
366         /* FIXME FIXME -- should be a break! we dont want to block the upper layers */
367         sleep((cur->conn->lastactivity + cur->conn->forcedlatency) - time(NULL));
368       }
369
370       /* XXX XXX XXX this should call the custom "queuing" function!! */
371       if (aim_tx_sendframe(sess, cur) == -1)
372         break;
373     }
374   }
375
376   /* purge sent commands from queue */
377   aim_tx_purgequeue(sess);
378
379   return 0;
380 }
381
382 /*
383  *  aim_tx_purgequeue()
384  *  
385  *  This is responsable for removing sent commands from the transmit 
386  *  queue. This is not a required operation, but it of course helps
387  *  reduce memory footprint at run time!  
388  *
389  */
390 faim_export void aim_tx_purgequeue(struct aim_session_t *sess)
391 {
392   struct command_tx_struct *cur = NULL;
393   struct command_tx_struct *tmp;
394
395   if (sess->queue_outgoing == NULL)
396     return;
397   
398   if (sess->queue_outgoing->next == NULL) {
399     if (!sess->queue_outgoing->lock && sess->queue_outgoing->sent) {
400       tmp = sess->queue_outgoing;
401       sess->queue_outgoing = NULL;
402       if (tmp->hdrtype == AIM_FRAMETYPE_OFT)
403         free(tmp->hdr.oft.hdr2);
404       free(tmp->data);
405       free(tmp);
406     }
407     return;
408   }
409
410   for(cur = sess->queue_outgoing; cur->next != NULL; ) {
411     if (!cur->next->lock && cur->next->sent) {
412       tmp = cur->next;
413       cur->next = tmp->next;
414       if (tmp->hdrtype == AIM_FRAMETYPE_OFT)
415         free(tmp->hdr.oft.hdr2);
416       free(tmp->data);
417       free(tmp);
418     }   
419     cur = cur->next;
420
421     /* 
422      * Be careful here.  Because of the way we just
423      * manipulated the pointer, cur may be NULL and 
424      * the for() will segfault doing the check unless
425      * we find this case first.
426      */
427     if (cur == NULL)    
428       break;
429   }
430   return;
431 }
This page took 0.059502 seconds and 3 git commands to generate.