source: roaraudio/roard/streams.c @ 826:bab1cca7d75f

Last change on this file since 826:bab1cca7d75f was 826:bab1cca7d75f, checked in by phi, 16 years ago

added experimental support for bidirectional streams

File size: 19.6 KB
Line 
1//streams.c:
2
3/*
4 *      Copyright (C) Philipp 'ph3-der-loewe' Schafft - 2008
5 *
6 *  This file is part of roard a part of RoarAudio,
7 *  a cross-platform sound system for both, home and professional use.
8 *  See README for details.
9 *
10 *  This file is free software; you can redistribute it and/or modify
11 *  it under the terms of the GNU General Public License version 3
12 *  as published by the Free Software Foundation.
13 *
14 *  RoarAudio is distributed in the hope that it will be useful,
15 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
16 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 *  GNU General Public License for more details.
18 *
19 *  You should have received a copy of the GNU General Public License
20 *  along with this software; see the file COPYING.  If not, write to
21 *  the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22 *
23 */
24
25#include "roard.h"
26
27int streams_init (void) {
28 int i;
29
30 for (i = 0; i < ROAR_STREAMS_MAX; i++)
31  g_streams[i] = NULL;
32
33 return 0;
34}
35
36int streams_free (void) {
37 int i;
38
39 for (i = 0; i < ROAR_STREAMS_MAX; i++) {
40  if ( g_streams[i] != NULL ) {
41   streams_delete(i);
42  }
43 }
44
45 return 0;
46}
47
48
49int streams_new    (void) {
50 int i, j;
51 struct roar_stream        * n = NULL;
52 struct roar_stream_server * s = NULL;
53
54 for (i = 0; i < ROAR_STREAMS_MAX; i++) {
55  if ( g_streams[i] == NULL ) {
56   s = ROAR_STREAM_SERVER(n = ROAR_STREAM(malloc(sizeof(struct roar_stream_server))));
57   if ( n == NULL ) {
58    ROAR_ERR("streams_new(void): can not allocate memory for new stream: %s", strerror(errno));
59    ROAR_DBG("streams_new(void) = -1");
60    return -1;
61   }
62
63   n->id         = i;
64   n->fh         = -1;
65//   n->pos_rel_id = i;
66   n->database   = NULL;
67   n->dataoff    = NULL;
68   n->datalen    = 0;
69   n->offset     = 0;
70   n->pos        = 0;
71
72   s->client          = -1;
73   s->socktype        = ROAR_SOCKET_TYPE_UNKNOWN;
74   s->buffer          = NULL;
75   s->need_extra      =  0;
76   s->output          = NULL;
77   s->is_new          =  1;
78   s->codecfilter     = -1;
79   s->pre_underruns   =  0;
80   s->post_underruns  =  0;
81   s->codec_orgi      = -1;
82   s->primary         =  0;
83
84   s->mixer.scale     = 65535;
85   s->mixer.rpg_mul   = 1;
86   s->mixer.rpg_div   = 1;
87   for (j = 0; j < ROAR_MAX_CHANNELS; j++)
88    s->mixer.mixer[j] = 65535;
89
90   for (j = 0; j < ROAR_META_MAX_PER_STREAM; j++) {
91    s->meta[j].type   = ROAR_META_TYPE_NONE;
92    s->meta[j].key[0] = 0;
93    s->meta[j].value  = NULL;
94   }
95
96   roar_vio_init_calls(&(s->vio));
97
98   g_streams[i] = s;
99   ROAR_DBG("streams_new(void): n->id=%i", n->id);
100   ROAR_DBG("streams_new(void) = %i", i);
101   return i;
102  }
103 }
104
105 return -1;
106}
107
108int streams_delete (int id) {
109 struct roar_stream_server * s;
110 int prim;
111
112 if ( (s = g_streams[id]) == NULL )
113  return 0;
114
115 ROAR_DBG("streams_delete(id=%i) = ?", id);
116 ROAR_DBG("streams_delete(id=%i): g_streams[id]->id=%i", id, ROAR_STREAM(s)->id);
117
118 if ( s->codecfilter != -1 ) {
119  codecfilter_close(s->codecfilter_inst, s->codecfilter);
120  s->codecfilter_inst = NULL;
121  s->codecfilter = -1;
122 }
123
124 if ( s->client != -1 ) {
125  ROAR_DBG("streams_delete(id=%i): Stream is owned by client %i", id, g_streams[id]->client);
126  client_stream_delete(s->client, id);
127 }
128
129 if ( s->buffer != NULL )
130  roar_buffer_free(s->buffer);
131
132 if ( s->output != NULL )
133  free(s->output);
134
135 if ( ROAR_STREAM(s)->fh != -1 )
136  close(ROAR_STREAM(s)->fh);
137
138 prim = s->primary;
139
140 free(s);
141
142 g_streams[id] = NULL;
143
144 if ( prim ) {
145  alive = 0;
146  clean_quit();
147 }
148
149 ROAR_DBG("streams_delete(id=%i) = 0", id);
150 return 0;
151}
152
153int streams_set_client (int id, int client) {
154 if ( g_streams[id] == NULL )
155  return -1;
156
157 ROAR_DBG("streams_set_client(id=%i): g_streams[id]->id=%i", id, ROAR_STREAM(g_streams[id])->id);
158 g_streams[id]->client = client;
159
160 return 0;
161}
162
163int streams_get_client (int id) {
164 if ( g_streams[id] == NULL )
165  return -1;
166
167 return g_streams[id]->client;
168}
169
170
171int streams_set_fh     (int id, int fh) {
172 int dir;
173
174 if ( g_streams[id] == NULL )
175  return -1;
176
177 ROAR_DBG("streams_set_fh(id=%i): g_streams[id]->id=%i", id, ROAR_STREAM(g_streams[id])->id);
178
179 ROAR_STREAM(g_streams[id])->fh = fh;
180
181 if ( codecfilter_open(&(g_streams[id]->codecfilter_inst), &(g_streams[id]->codecfilter), NULL,
182                  ROAR_STREAM(g_streams[id])->info.codec, g_streams[id]) == -1 ) {
183  return streams_delete(id);
184 }
185
186 if ( fh == -1 ) { // yes, this is valid, indecats full vio!
187  return 0;
188 }
189
190 dir = ROAR_STREAM(g_streams[id])->dir;
191
192 if ( dir == ROAR_DIR_MONITOR || dir == ROAR_DIR_RECORD || dir == ROAR_DIR_OUTPUT ) {
193  shutdown(fh, SHUT_RD);
194 }
195
196 if ( dir == ROAR_DIR_FILTER ) {
197  return 0;
198 } else {
199  return roar_socket_nonblock(fh, ROAR_SOCKET_NONBLOCK);
200 }
201}
202
203int streams_get_fh     (int id) {
204 if ( id < 0 )
205  return -1;
206
207 if ( g_streams[id] == NULL )
208  return -1;
209
210 return ROAR_STREAM(g_streams[id])->fh;
211}
212
213int streams_get    (int id, struct roar_stream_server ** stream) {
214 if ( g_streams[id] == NULL )
215  return -1;
216
217 *stream = g_streams[id];
218
219 return 0;
220}
221
222int streams_set_socktype (int id, int socktype) {
223 if ( g_streams[id] == NULL )
224  return -1;
225
226 g_streams[id]->socktype = socktype;
227
228 return 0;
229}
230
231int streams_get_socktype (int id) {
232 if ( g_streams[id] == NULL )
233  return -1;
234
235 return g_streams[id]->socktype;
236}
237
238int streams_set_primary (int id, int prim) {
239 if ( g_streams[id] == NULL )
240  return -1;
241
242 g_streams[id]->primary = prim;
243
244 return 0;
245}
246
/* Convenience wrapper: flag stream `id` as primary.
 *
 * Returns whatever streams_set_primary(id, 1) returns.
 */
int streams_mark_primary (int id) {
 const int is_primary = 1;

 return streams_set_primary(id, is_primary);
}
250int streams_get_outputbuffer  (int id, void ** buffer, size_t size) {
251 if ( g_streams[id] == NULL )
252  return -1;
253
254 // output buffer size does never change.
255 if ( g_streams[id]->output != NULL ) {
256  *buffer = g_streams[id]->output;
257  return 0;
258 }
259
260 if ( (g_streams[id]->output = malloc(size)) == NULL ) {
261  ROAR_ERR("streams_get_outputbuffer(*): Can not alloc: %s", strerror(errno));
262  return -1;
263 }
264
265 *buffer = g_streams[id]->output;
266
267 return 0;
268}
269
270int streams_fill_mixbuffer (int id, struct roar_audio_info * info) {
271 // TODO: decide: is this the most complex, hacked, un-understadable,
272 //               un-writeable and even worse: un-readable
273 //               function in the whole project?
274 size_t todo = ROAR_OUTPUT_CALC_OUTBUFSIZE(info);
275 size_t needed = todo;
276 size_t todo_in;
277 size_t len, outlen;
278 size_t mul = 1, div = 1;
279 void * rest = NULL;
280 void * in   = NULL;
281 struct roar_buffer     * buf;
282 struct roar_audio_info * stream_info;
283 struct roar_stream_server * stream = g_streams[id];
284 int is_the_same = 0;
285
286 if ( g_streams[id] == NULL )
287  return -1;
288
289 if ( streams_get_outputbuffer(id, &rest, todo) == -1 ) {
290  return -1;
291 }
292
293 if ( rest == NULL ) {
294  return -1;
295 }
296
297 // set up stream_info
298
299 stream_info = &(ROAR_STREAM(stream)->info);
300
301 // calc todo_in
302 todo_in = ROAR_OUTPUT_CALC_OUTBUFSIZE(stream_info);
303
304 // calc mul and div:
305 mul = todo    / todo_in;
306 div = todo_in / todo;
307
308 if ( mul == 0 ) {
309  mul = 1;
310 } else {
311  div = 1;
312 }
313
314 ROAR_DBG("streams_fill_mixbuffer(*): mul=%i, div=%i", mul, div);
315
316 ROAR_DBG("streams_fill_mixbuffer(*): rest=%p, todo=%i->%i (in->out)", rest, todo_in, todo);
317 // are both (input and output) of same format?
318
319
320 ROAR_DBG("streams_fill_mixbuffer(*): stream_info:");
321 roar_debug_audio_info_print(stream_info);
322 ROAR_DBG("streams_fill_mixbuffer(*): info:");
323 roar_debug_audio_info_print(info);
324
325 is_the_same = stream_info->rate     == info->rate     && stream_info->bits  == info->bits &&
326               stream_info->channels == info->channels && stream_info->codec == info->codec;
327
328 ROAR_DBG("streams_fill_mixbuffer(*): is_the_same=%i", is_the_same);
329
330/* How it works:
331 *
332 * set a counter to the number of samples we need.
333 * loop until we have all samples done or no samples are
334 * left in our input buffer.
335 * If there a no samples left in the input buffer: fill the rest
336 * of the output buffer with zeros.
337 *
338 * The loop:
339 * get a buffer from the input.
340 * if it's bigger than needed, set an offset.
341 * The rest of the data:
342 * 0) convert endianness (codec) from remote to local...
343 * 1) change bits in of the samples
344 * 2) change sample rate
345 * 3) change the nummber of channels
346 * 4) insert into output buffer
347 */
348
349/*
350 // get our first buffer:
351
352 if ( stream_shift_buffer(id, &buf) == -1 ) {
353  return -1;
354 }
355
356 // first test for some basic simple cases...
357
358 if ( buf == NULL ) { // we habe nothing in input queue
359                      // we may memset() our output buffer OR
360                      // just return with -1 so we are going to
361                      // be ignored.
362  return -1;
363 }
364*/
365
366 while (todo) { // main loop
367  ROAR_DBG("streams_fill_mixbuffer(*): looping...");
368  // exit loop if nothing is left, even if we need more data..
369  if ( stream_shift_buffer(id, &buf) == -1 )
370   break;
371  if ( buf == NULL )
372   break;
373
374  // read the data for this loop...
375  roar_buffer_get_data(buf, &in);
376  roar_buffer_get_len(buf, &len);
377
378  ROAR_DBG("streams_fill_mixbuffer(*): len = %i", len);
379
380  if ( len > todo_in ) {
381   roar_buffer_set_offset(buf, todo_in);
382   len = todo_in;
383  } else {
384   roar_buffer_set_len(buf, 0); // queue for deletation
385  }
386
387  // we now have 'len' bytes in 'in'
388
389  // calc how much outlen this has...
390  outlen = (len * mul) / div;
391
392  ROAR_DBG("streams_fill_mixbuffer(*): outlen = %i, buf = %p, len = %i", outlen, in, len);
393
394  if ( is_the_same ) {
395/*
396 * 0) convert endianness (codec) from remote to local...
397 * 1) change bits in of the samples
398 * 2) change sample rate
399 * 3) change the nummber of channels
400   \\==> skiping,...
401 */
402   // * 4) insert into output buffer
403   ROAR_DBG("streams_fill_mixbuffer(*): memcpy: in->rest: %p -> %p", in, rest);
404   if ( memcpy(rest, in, len) != rest ) {
405    ROAR_ERR("streams_fill_mixbuffer(*): memcpy returned invalid pointer.");
406   }
407
408  } else {
409
410/*
411   // * 0) convert endianness (codec) from remote to local...
412   if ( stream_info->codec != info->codec ) {
413    // we neet to convert...
414    return -1;
415   }
416
417   // * 1) change bits in of the samples
418   if ( stream_info->bits != info->bits ) {
419    return -1;
420   }
421
422   // * 2) change sample rate
423   if ( stream_info->rate != info->rate ) {
424    return -1;
425   }
426
427   // * 3) change the nummber of channels
428   if ( stream_info->channels != info->channels ) {
429    return -1;
430   }
431
432   // * 4) insert into output buffer
433*/
434  // hey! we have roar_conv() :)
435
436  if ( roar_conv(rest, in, 8*len / stream_info->bits, stream_info, info) == -1 )
437   return -1;
438  }
439
440  if ( change_vol(rest, info->bits, rest, 8*outlen / info->bits, info->channels, &(stream->mixer)) == -1 )
441   return -1;
442
443  // we habe outlen bytes more...
444  todo    -= outlen;
445  rest    += outlen;
446  todo_in -= len;
447
448  roar_buffer_get_len(buf, &len);
449  ROAR_DBG("streams_fill_mixbuffer(*): New length of buffer %p is %i", buf, len);
450  if ( len == 0 ) {
451   roar_buffer_delete(buf, NULL);
452  } else {
453   stream_unshift_buffer(id, buf);
454  }
455 }
456
457//len = 0;
458//roar_buffer_get_len(buf, &len);
459
460/*
461 if ( len > 0 ) // we still have some data in this buffer, re-inserting it to the input buffers...
462  stream_unshift_buffer(id, buf);
463 else
464  buffer_delete(buf, NULL);
465*/
466
467 ROAR_STREAM(g_streams[id])->pos =
468      ROAR_MATH_OVERFLOW_ADD(ROAR_STREAM(g_streams[id])->pos,
469          ROAR_OUTPUT_CALC_OUTBUFSAMP(info, needed-todo));
470 //ROAR_WARN("stream=%i, pos=%u", id, ((struct roar_stream*)g_streams[id])->pos);
471
472 if ( todo > 0 ) { // zeroize the rest of the buffer
473  memset(rest, 0, todo);
474
475  if ( todo != ROAR_OUTPUT_CALC_OUTBUFSIZE(info) ) {
476   if ( g_streams[id]->is_new ) {
477    stream->pre_underruns++;
478   } else {
479    ROAR_WARN("streams_fill_mixbuffer(*): Underrun in stream: %u bytes missing, filling with zeros", (unsigned int)todo);
480    stream->post_underruns++;
481   }
482
483   stream->is_new = 0;
484  }
485 } else {
486  stream->is_new = 0;
487 }
488
489 return 0;
490}
491
492
493int streams_get_mixbuffers (void *** bufferlist, struct roar_audio_info * info, unsigned int pos) {
494 static void * bufs[ROAR_STREAMS_MAX+1];
495 int i;
496 int have = 0;
497
498 for (i = 0; i < ROAR_STREAMS_MAX; i++) {
499  if ( g_streams[i] != NULL ) {
500   if ( ROAR_STREAM(g_streams[i])->dir != ROAR_DIR_PLAY && ROAR_STREAM(g_streams[i])->dir != ROAR_DIR_BIDIR )
501    continue;
502
503   if ( streams_get_outputbuffer(i, &bufs[have], ROAR_OUTPUT_CALC_OUTBUFSIZE(info)) == -1 ) {
504    ROAR_ERR("streams_get_mixbuffer(*): Can not alloc output buffer for stream %i, BAD!", i);
505    ROAR_ERR("streams_get_mixbuffer(*): Ignoring stream for this round.");
506    continue;
507   }
508   if ( streams_fill_mixbuffer(i, info) == -1 ) {
509    ROAR_ERR("streams_get_mixbuffer(*): Can not fill output buffer for stream %i, this should not happen", i);
510    continue;
511   }
512
513//   printf("D: bufs[have=%i] = %p\n", have, bufs[have]);
514
515   ROAR_DBG("streams_get_mixbuffers(*):  bufs[have] = %p", bufs[have]);
516   ROAR_DBG("streams_get_mixbuffers(*): *bufs[have] = 0x%08x...", *(uint32_t*)bufs[have]);
517
518   have++; // we have a new stream!
519  }
520 }
521
522 bufs[have] = NULL;
523 //printf("D: bufs[have=%i] = %p\n", have, bufs[have]);
524
525 ROAR_DBG("streams_get_mixbuffers(*): have = %i", have);
526
527 *bufferlist = bufs;
528 return have;
529}
530
531
532int stream_add_buffer  (int id, struct roar_buffer * buf) {
533 ROAR_DBG("stream_add_buffer(id=%i, buf=%p) = ?", id, buf);
534
535 if ( g_streams[id] == NULL )
536  return -1;
537
538 if ( g_streams[id]->buffer == NULL ) {
539  g_streams[id]->buffer = buf;
540  ROAR_DBG("stream_add_buffer(id=%i, buf=%p) = 0", id, buf);
541  return 0;
542 }
543
544 ROAR_DBG("stream_add_buffer(id=%i, buf=%p) = ?", id, buf);
545 return roar_buffer_add(g_streams[id]->buffer, buf);
546}
547
548int stream_shift_buffer   (int id, struct roar_buffer ** buf) {
549 struct roar_buffer * next;
550
551 if ( g_streams[id] == NULL )
552  return -1;
553
554 if ( g_streams[id]->buffer == NULL ) {
555  *buf = NULL;
556  return 0;
557 }
558
559 roar_buffer_get_next(g_streams[id]->buffer, &next);
560
561 *buf                  = g_streams[id]->buffer;
562 g_streams[id]->buffer = next;
563
564 return 0;
565}
566int stream_unshift_buffer (int id, struct roar_buffer *  buf) {
567 if ( g_streams[id] == NULL )
568  return -1;
569
570 if ( g_streams[id]->buffer == NULL ) {
571  g_streams[id]->buffer = buf;
572  return 0;
573 }
574
575 buf->next = NULL;
576
577 roar_buffer_add(buf, g_streams[id]->buffer);
578
579 g_streams[id]->buffer = buf;
580
581 return 0;
582}
583
584int streams_check  (int id) {
585 int fh;
586 ssize_t req, realreq, done;
587 struct roar_stream        *   s;
588 struct roar_stream_server *  ss;
589 struct roar_buffer        *   b;
590 char                      * buf;
591
592 if ( g_streams[id] == NULL )
593  return -1;
594
595 ROAR_DBG("streams_check(id=%i) = ?", id);
596
597 s = ROAR_STREAM(ss = g_streams[id]);
598
599 if ( (fh = s->fh) == -1 )
600  return 0;
601
602 if ( s->dir != ROAR_DIR_PLAY && s->dir != ROAR_DIR_BIDIR )
603  return 0;
604
605 ROAR_DBG("streams_check(id=%i): fh = %i", id, fh);
606
607 req  = ROAR_OUTPUT_BUFFER_SAMPLES * s->info.channels * s->info.bits / 8; // optimal size
608 req += ss->need_extra; // bytes left we sould get....
609
610 if ( roar_buffer_new(&b, req) == -1 ) {
611  ROAR_ERR("streams_check(*): Can not alloc buffer space!");
612  ROAR_DBG("streams_check(*) = -1");
613  return -1;
614 }
615
616 roar_buffer_get_data(b, (void **)&buf);
617
618 ROAR_DBG("streams_check(id=%i): buffer is up and ready ;)", id);
619
620 if ( ss->codecfilter == -1 ) {
621  realreq = req;
622/*
623  req = read(fh, buf, req);
624  if ( req < realreq ) { // we can do this as the stream is in nonblocking mode!
625   if ( (realreq = read(fh, buf+req, realreq-req)) > 0 )
626    req += realreq;
627  }
628*/
629  done = 0;
630  while (req > 0 && done != realreq) {
631   if ( (req = read(fh, buf+done, realreq-done)) > 0 )
632    done += req;
633  }
634  req = done;
635 } else {
636  req = codecfilter_read(ss->codecfilter_inst, ss->codecfilter, buf, req);
637 }
638
639 if ( req > 0 ) {
640  ROAR_DBG("streams_check(id=%i): got %i bytes", id, req);
641
642  roar_buffer_set_len(b, req);
643
644  if ( stream_add_buffer(id, b) != -1 )
645   return 0;
646
647  ROAR_ERR("streams_check(id=%i): something is wrong, could not add buffer to stream!", id);
648  roar_buffer_free(b);
649 } else {
650  ROAR_DBG("streams_check(id=%i): read() = %i // errno: %s", id, req, strerror(errno));
651#ifdef ROAR_HAVE_LIBVORBISFILE
652  if ( errno != EAGAIN && errno != ESPIPE ) { // libvorbis file trys to seek a bit ofen :)
653#else
654  if ( errno != EAGAIN ) {
655#endif
656   ROAR_DBG("streams_check(id=%i): EOF!", id);
657   streams_delete(id);
658   ROAR_DBG("streams_check(id=%i) = 0", id);
659  }
660  roar_buffer_free(b);
661  return 0;
662 }
663
664
665 ROAR_DBG("streams_check(id=%i) = -1", id);
666 return -1;
667}
668
669
670int streams_send_mon   (int id) {
671 int fh;
672 struct roar_stream        *   s;
673 struct roar_stream_server *  ss;
674 void * obuf;
675 int    olen;
676 int    need_to_free = 0;
677
678 if ( g_streams[id] == NULL )
679  return -1;
680
681 ROAR_DBG("streams_send_mon(id=%i) = ?", id);
682
683 s = ROAR_STREAM((ss = g_streams[id]));
684
685 if ( (fh = s->fh) == -1 )
686  return 0;
687
688 if ( s->dir != ROAR_DIR_MONITOR && s->dir != ROAR_DIR_OUTPUT && s->dir != ROAR_DIR_BIDIR )
689  return 0;
690
691 ROAR_DBG("streams_send_mon(id=%i): fh = %i", id, fh);
692
693 if ( s->info.channels != g_sa->channels || s->info.bits  != g_sa->bits ||
694      s->info.rate     != g_sa->rate     || s->info.codec != g_sa->codec  ) {
695  olen = ROAR_OUTPUT_CALC_OUTBUFSIZE(&(s->info)); // we hope g_output_buffer_len
696                                                  // is ROAR_OUTPUT_CALC_OUTBUFSIZE(g_sa) here
697  if ( (obuf = malloc(olen)) == NULL )
698   return -1;
699
700  need_to_free = 1;
701
702  ROAR_DBG("streams_send_mon(id=%i): obuf=%p, olen=%i", id, obuf, olen);
703
704  if ( roar_conv(obuf, g_output_buffer, ROAR_OUTPUT_BUFFER_SAMPLES*g_sa->channels, g_sa, &(s->info)) == -1 ) {
705   free(obuf);
706   return -1;
707  }
708 } else {
709  obuf = g_output_buffer;
710  olen = g_output_buffer_len;
711 }
712
713 errno = 0;
714
715 if ( ss->codecfilter == -1 ) {
716  if ( write(fh, obuf, olen) == olen ) {
717   if ( need_to_free ) free(obuf);
718   return 0;
719  }
720 } else {
721  if ( codecfilter_write(ss->codecfilter_inst, ss->codecfilter, obuf, olen)
722            == olen ) {
723   if ( need_to_free ) free(obuf);
724   return 0;
725  } else { // we cann't retry on codec filetered streams
726   if ( need_to_free ) free(obuf);
727   streams_delete(id);
728   return -1;
729  }
730 }
731
732 if ( errno == EAGAIN ) {
733  // ok, the client blocks for a moment, we try to sleep a bit an retry in the hope not to
734  // make any gapes in any output because of this
735
736  usleep(100); // 0.1ms
737
738  if ( write(fh, obuf, olen) == olen ) {
739   if ( need_to_free ) free(obuf);
740   return 0;
741  }
742 }
743
744 // ug... error... delete stream!
745
746 if ( need_to_free ) free(obuf);
747 streams_delete(id);
748
749 return -1;
750}
751
752int streams_send_filter(int id) {
753 int fh;
754 int have = 0;
755 int len;
756 struct roar_stream        *   s;
757 struct roar_stream_server *  ss;
758
759 if ( g_streams[id] == NULL )
760  return -1;
761
762 ROAR_DBG("streams_send_filter(id=%i) = ?", id);
763
764 s = ROAR_STREAM(ss = g_streams[id]);
765
766 if ( (fh = s->fh) == -1 )
767  return 0;
768
769 if ( s->dir != ROAR_DIR_FILTER )
770  return 0;
771
772 ROAR_DBG("streams_send_filter(id=%i): fh = %i", id, fh);
773
774 if ( write(fh, g_output_buffer, g_output_buffer_len) == g_output_buffer_len ) {
775  while ( have < g_output_buffer_len ) {
776   if ( (len = read(fh, g_output_buffer+have, g_output_buffer_len-have)) < 1 ) {
777    streams_delete(id);
778    return -1;
779   }
780   have += len;
781  }
782  return 0;
783 }
784
785 // ug... error... delete stream!
786
787 streams_delete(id);
788
789 return -1;
790}
791
792
793// VIO:
794
795ssize_t stream_vio_read (int stream, void *buf, size_t count) {
796 struct roar_stream_server * s = g_streams[stream];
797
798 if ( !s )
799  return -1;
800
801 return stream_vio_s_read(s, buf, count);
802}
803
804ssize_t stream_vio_write(int stream, void *buf, size_t count) {
805 struct roar_stream_server * s = g_streams[stream];
806
807 if ( !s )
808  return -1;
809
810 return stream_vio_s_write(s, buf, count);
811}
812
813
814ssize_t stream_vio_s_read (struct roar_stream_server * stream, void *buf, size_t count) {
815  size_t len =  0;
816 ssize_t r   = -1;
817
818 errno = 0;
819
820 if ( !stream )
821  return -1;
822
823 if ( ! stream->vio.read )
824  return -1;
825
826 while ( (r = stream->vio.read(ROAR_STREAM(stream)->fh, buf, count, stream->vio.inst)) > 0 ) {
827  len   += r;
828  buf   += r;
829  count -= r;
830  if ( count == 0 )
831   break;
832 }
833
834 if ( len == 0 && r == -1 )
835  return -1;
836
837 return len;
838}
839
840ssize_t stream_vio_s_write(struct roar_stream_server * stream, void *buf, size_t count) {
841 errno = 0;
842
843 if ( !stream )
844  return -1;
845
846 if ( ! stream->vio.write )
847  return -1;
848
849 return stream->vio.write(ROAR_STREAM(stream)->fh, buf, count, stream->vio.inst);
850}
851
852//ll
Note: See TracBrowser for help on using the repository browser.