source: roaraudio/roard/streams.c @ 334:89308957eafe

Last change on this file since 334:89308957eafe was 334:89308957eafe, checked in by phi, 16 years ago

resolved a memory leak

File size: 15.0 KB
Line 
1//streams.c:
2
3#include "roard.h"
4
5int streams_init (void) {
6 int i;
7
8 for (i = 0; i < ROAR_STREAMS_MAX; i++)
9  g_streams[i] = NULL;
10
11 return 0;
12}
13
14int streams_free (void) {
15 int i;
16
17 for (i = 0; i < ROAR_STREAMS_MAX; i++) {
18  if ( g_streams[i] != NULL ) {
19   streams_delete(i);
20  }
21 }
22
23 return 0;
24}
25
26
27int streams_new    (void) {
28 int i, j;
29 struct roar_stream * n = NULL;
30
31 for (i = 0; i < ROAR_STREAMS_MAX; i++) {
32  if ( g_streams[i] == NULL ) {
33   n = malloc(sizeof(struct roar_stream_server));
34   if ( n == NULL ) {
35    ROAR_ERR("streams_new(void): can not allocate memory for new stream: %s", strerror(errno));
36    ROAR_DBG("streams_new(void) = -1");
37    return -1;
38   }
39
40   n->id         = i;
41   n->fh         = -1;
42//   n->pos_rel_id = i;
43   n->database   = NULL;
44   n->dataoff    = NULL;
45   n->datalen    = 0;
46   n->offset     = 0;
47
48   ((struct roar_stream_server*)n)->client        = -1;
49   ((struct roar_stream_server*)n)->buffer        = NULL;
50   ((struct roar_stream_server*)n)->need_extra    =  0;
51   ((struct roar_stream_server*)n)->output        = NULL;
52   ((struct roar_stream_server*)n)->is_new        =  1;
53   ((struct roar_stream_server*)n)->codecfilter   = -1;
54   ((struct roar_stream_server*)n)->mixer.scale   = 65535;
55   ((struct roar_stream_server*)n)->mixer.rpg_mul = 1;
56   ((struct roar_stream_server*)n)->mixer.rpg_div = 1;
57   for (j = 0; j < ROAR_MAX_CHANNELS; j++)
58    ((struct roar_stream_server*)n)->mixer.mixer[j] = 65535;
59   for (j = 0; j < ROAR_META_MAX_PER_STREAM; j++) {
60    ((struct roar_stream_server*)n)->meta[j].type   = ROAR_META_TYPE_NONE;
61    ((struct roar_stream_server*)n)->meta[j].key[0] = 0;
62    ((struct roar_stream_server*)n)->meta[j].value  = NULL;
63   }
64
65   g_streams[i] = (struct roar_stream_server*)n;
66   ROAR_DBG("streams_new(void) = %i", i);
67   return i;
68  }
69 }
70
71 return -1;
72}
73
74int streams_delete (int id) {
75 if ( g_streams[id] == NULL )
76  return 0;
77
78 ROAR_DBG("streams_delete(id=%i) = ?", id);
79
80 if ( g_streams[id]->codecfilter != -1 ) {
81  codecfilter_close(g_streams[id]->codecfilter_inst, g_streams[id]->codecfilter);
82  g_streams[id]->codecfilter = -1;
83 }
84
85 if ( g_streams[id]->client != -1 ) {
86  ROAR_DBG("streams_delete(id=%i): Stream is owned by client %i", id, g_streams[id]->client);
87  client_stream_delete(g_streams[id]->client, id);
88 }
89
90 if ( g_streams[id]->buffer != NULL )
91  roar_buffer_free(g_streams[id]->buffer);
92
93 if ( g_streams[id]->output != NULL )
94  free(g_streams[id]->output);
95
96 if ( ROAR_STREAM(g_streams[id])->fh != -1 )
97  close(ROAR_STREAM(g_streams[id])->fh);
98
99 free(g_streams[id]);
100
101 g_streams[id] = NULL;
102
103 ROAR_DBG("streams_delete(id=%i) = 0", id);
104 return 0;
105}
106
107int streams_set_client (int id, int client) {
108 if ( g_streams[id] == NULL )
109  return -1;
110
111 g_streams[id]->client = client;
112
113 return 0;
114}
115
116int streams_set_fh     (int id, int fh) {
117 int dir;
118
119 if ( g_streams[id] == NULL )
120  return -1;
121
122 ((struct roar_stream *)g_streams[id])->fh = fh;
123
124 codecfilter_open(&(g_streams[id]->codecfilter_inst), &(g_streams[id]->codecfilter), NULL,
125                  ((struct roar_stream *)g_streams[id])->info.codec, g_streams[id]);
126
127 dir = ((struct roar_stream *)g_streams[id])->dir;
128
129 if ( dir == ROAR_DIR_MONITOR || dir == ROAR_DIR_RECORD ) {
130  shutdown(fh, SHUT_RD);
131 }
132
133 if ( dir == ROAR_DIR_FILTER ) {
134  return 0;
135 } else {
136  return roar_socket_nonblock(fh, ROAR_SOCKET_NONBLOCK);
137 }
138}
139
140int streams_get_fh     (int id) {
141 if ( id < 0 )
142  return -1;
143
144 if ( g_streams[id] == NULL )
145  return -1;
146
147 return ((struct roar_stream *)g_streams[id])->fh;
148}
149
150int streams_get    (int id, struct roar_stream_server ** stream) {
151 if ( g_streams[id] == NULL )
152  return -1;
153
154 *stream = g_streams[id];
155
156 return 0;
157}
158
159
160int streams_get_outputbuffer  (int id, void ** buffer, size_t size) {
161 if ( g_streams[id] == NULL )
162  return -1;
163
164 // output buffer size does never change.
165 if ( g_streams[id]->output != NULL ) {
166  *buffer = g_streams[id]->output;
167  return 0;
168 }
169
170 if ( (g_streams[id]->output = malloc(size)) == NULL ) {
171  ROAR_ERR("streams_get_outputbuffer(*): Can not alloc: %s", strerror(errno));
172  return -1;
173 }
174
175 *buffer = g_streams[id]->output;
176
177 return 0;
178}
179
180int streams_fill_mixbuffer (int id, struct roar_audio_info * info) {
181 // TODO: decide: is this the most complex, hacked, un-understadable,
182 //               un-writeable and even worse: un-readable
183 //               function in the whole project?
184 size_t todo = ROAR_OUTPUT_CALC_OUTBUFSIZE(info);
185 size_t todo_in;
186 size_t len, outlen;
187 size_t mul = 1, div = 1;
188 void * rest = NULL;
189 void * in   = NULL;
190 struct roar_buffer     * buf;
191 struct roar_audio_info * stream_info;
192 int is_the_same = 0;
193
194 if ( g_streams[id] == NULL )
195  return -1;
196
197 if ( streams_get_outputbuffer(id, &rest, todo) == -1 ) {
198  return -1;
199 }
200
201 if ( rest == NULL ) {
202  return -1;
203 }
204
205 // set up stream_info
206
207 stream_info = &(((struct roar_stream*)g_streams[id])->info);
208
209 // calc todo_in
210 todo_in = ROAR_OUTPUT_CALC_OUTBUFSIZE(stream_info);
211
212 // calc mul and div:
213 mul = todo    / todo_in;
214 div = todo_in / todo;
215
216 if ( mul == 0 ) {
217  mul = 1;
218 } else {
219  div = 1;
220 }
221
222 ROAR_DBG("streams_fill_mixbuffer(*): mul=%i, div=%i", mul, div);
223
224 ROAR_DBG("streams_fill_mixbuffer(*): rest=%p, todo=%i->%i (in->out)", rest, todo_in, todo);
225 // are both (input and output) of same format?
226
227
228 ROAR_DBG("streams_fill_mixbuffer(*): stream_info:");
229 roar_debug_audio_info_print(stream_info);
230 ROAR_DBG("streams_fill_mixbuffer(*): info:");
231 roar_debug_audio_info_print(info);
232
233 is_the_same = stream_info->rate     == info->rate     && stream_info->bits  == info->bits &&
234               stream_info->channels == info->channels && stream_info->codec == info->codec;
235
236 ROAR_DBG("streams_fill_mixbuffer(*): is_the_same=%i", is_the_same);
237
238/* How it works:
239 *
240 * set a counter to the number of samples we need.
241 * loop until we have all samples done or no samples are
242 * left in our input buffer.
243 * If there a no samples left in the input buffer: fill the rest
244 * of the output buffer with zeros.
245 *
246 * The loop:
247 * get a buffer from the input.
248 * if it's bigger than needed, set an offset.
249 * The rest of the data:
250 * 0) convert endianness (codec) from remote to local...
251 * 1) change bits in of the samples
252 * 2) change sample rate
253 * 3) change the nummber of channels
254 * 4) insert into output buffer
255 */
256
257/*
258 // get our first buffer:
259
260 if ( stream_shift_buffer(id, &buf) == -1 ) {
261  return -1;
262 }
263
264 // first test for some basic simple cases...
265
266 if ( buf == NULL ) { // we habe nothing in input queue
267                      // we may memset() our output buffer OR
268                      // just return with -1 so we are going to
269                      // be ignored.
270  return -1;
271 }
272*/
273
274 while (todo) { // main loop
275  ROAR_DBG("streams_fill_mixbuffer(*): looping...");
276  // exit loop if nothing is left, even if we need more data..
277  if ( stream_shift_buffer(id, &buf) == -1 )
278   break;
279  if ( buf == NULL )
280   break;
281
282  // read the data for this loop...
283  roar_buffer_get_data(buf, &in);
284  roar_buffer_get_len(buf, &len);
285
286  ROAR_DBG("streams_fill_mixbuffer(*): len = %i", len);
287
288  if ( len > todo_in ) {
289   roar_buffer_set_offset(buf, todo_in);
290   len = todo_in;
291  } else {
292   roar_buffer_set_len(buf, 0); // queue for deletation
293  }
294
295  // we now have 'len' bytes in 'in'
296
297  // calc how much outlen this has...
298  outlen = (len * mul) / div;
299
300  ROAR_DBG("streams_fill_mixbuffer(*): outlen = %i, buf = %p, len = %i", outlen, in, len);
301
302  if ( is_the_same ) {
303/*
304 * 0) convert endianness (codec) from remote to local...
305 * 1) change bits in of the samples
306 * 2) change sample rate
307 * 3) change the nummber of channels
308   \\==> skiping,...
309 */
310   // * 4) insert into output buffer
311   ROAR_DBG("streams_fill_mixbuffer(*): memcpy: in->rest: %p -> %p", in, rest);
312   if ( memcpy(rest, in, len) != rest ) {
313    ROAR_ERR("streams_fill_mixbuffer(*): memcpy returned invalid pointer.");
314   }
315
316  } else {
317
318/*
319   // * 0) convert endianness (codec) from remote to local...
320   if ( stream_info->codec != info->codec ) {
321    // we neet to convert...
322    return -1;
323   }
324
325   // * 1) change bits in of the samples
326   if ( stream_info->bits != info->bits ) {
327    return -1;
328   }
329
330   // * 2) change sample rate
331   if ( stream_info->rate != info->rate ) {
332    return -1;
333   }
334
335   // * 3) change the nummber of channels
336   if ( stream_info->channels != info->channels ) {
337    return -1;
338   }
339
340   // * 4) insert into output buffer
341*/
342  // hey! we have roar_conv() :)
343
344  if ( roar_conv(rest, in, 8*len / stream_info->bits, stream_info, info) == -1 )
345   return -1;
346  }
347
348  if ( change_vol(rest, info->bits, rest, 8*outlen / info->bits, info->channels, &(((struct roar_stream_server*)g_streams[id])->mixer)) == -1 )
349   return -1;
350
351  // we habe outlen bytes more...
352  todo    -= outlen;
353  rest    += outlen;
354  todo_in -= len;
355
356  roar_buffer_get_len(buf, &len);
357  ROAR_DBG("streams_fill_mixbuffer(*): New length of buffer %p is %i", buf, len);
358  if ( len == 0 ) {
359   roar_buffer_delete(buf, NULL);
360  } else {
361   stream_unshift_buffer(id, buf);
362  }
363 }
364
365//len = 0;
366//roar_buffer_get_len(buf, &len);
367
368/*
369 if ( len > 0 ) // we still have some data in this buffer, re-inserting it to the input buffers...
370  stream_unshift_buffer(id, buf);
371 else
372  buffer_delete(buf, NULL);
373*/
374
375 if ( todo > 0 ) { // zeroize the rest of teh buffer
376  memset(rest, 0, todo);
377
378  if ( todo != ROAR_OUTPUT_CALC_OUTBUFSIZE(info) ) {
379   if ( !g_streams[id]->is_new )
380    ROAR_WARN("streams_fill_mixbuffer(*): Underrun in stream: %i bytes missing, filling with zeros", todo);
381
382   g_streams[id]->is_new = 0;
383  }
384 } else {
385  g_streams[id]->is_new = 0;
386 }
387
388 return 0;
389}
390
391
392int streams_get_mixbuffers (void *** bufferlist, struct roar_audio_info * info, unsigned int pos) {
393 static void * bufs[ROAR_STREAMS_MAX+1];
394 int i;
395 int have = 0;
396
397 for (i = 0; i < ROAR_STREAMS_MAX; i++) {
398  if ( g_streams[i] != NULL ) {
399   if ( ((struct roar_stream *)g_streams[i])->dir != ROAR_DIR_PLAY )
400    continue;
401
402   if ( streams_get_outputbuffer(i, &bufs[have], ROAR_OUTPUT_CALC_OUTBUFSIZE(info)) == -1 ) {
403    ROAR_ERR("streams_get_mixbuffer(*): Can not alloc output buffer for stream %i, BAD!", i);
404    ROAR_ERR("streams_get_mixbuffer(*): Ignoring stream for this round.");
405    continue;
406   }
407   if ( streams_fill_mixbuffer(i, info) == -1 ) {
408    ROAR_ERR("streams_get_mixbuffer(*): Can not fill output buffer for stream %i, this should not happen", i);
409    continue;
410   }
411
412//   printf("D: bufs[have=%i] = %p\n", have, bufs[have]);
413
414   ROAR_DBG("streams_get_mixbuffers(*):  bufs[have] = %p", bufs[have]);
415   ROAR_DBG("streams_get_mixbuffers(*): *bufs[have] = 0x%08x...", *(uint32_t*)bufs[have]);
416
417   have++; // we have a new stream!
418  }
419 }
420
421 bufs[have] = NULL;
422 //printf("D: bufs[have=%i] = %p\n", have, bufs[have]);
423
424 ROAR_DBG("streams_get_mixbuffers(*): have = %i", have);
425
426 *bufferlist = bufs;
427 return 0;
428}
429
430
431int stream_add_buffer  (int id, struct roar_buffer * buf) {
432 ROAR_DBG("stream_add_buffer(id=%i, buf=%p) = ?", id, buf);
433
434 if ( g_streams[id] == NULL )
435  return -1;
436
437 if ( g_streams[id]->buffer == NULL ) {
438  g_streams[id]->buffer = buf;
439  ROAR_DBG("stream_add_buffer(id=%i, buf=%p) = 0", id, buf);
440  return 0;
441 }
442
443 ROAR_DBG("stream_add_buffer(id=%i, buf=%p) = ?", id, buf);
444 return roar_buffer_add(g_streams[id]->buffer, buf);
445}
446
447int stream_shift_buffer   (int id, struct roar_buffer ** buf) {
448 struct roar_buffer * next;
449
450 if ( g_streams[id] == NULL )
451  return -1;
452
453 if ( g_streams[id]->buffer == NULL ) {
454  *buf = NULL;
455  return 0;
456 }
457
458 roar_buffer_get_next(g_streams[id]->buffer, &next);
459
460 *buf                  = g_streams[id]->buffer;
461 g_streams[id]->buffer = next;
462
463 return 0;
464}
465int stream_unshift_buffer (int id, struct roar_buffer *  buf) {
466 if ( g_streams[id] == NULL )
467  return -1;
468
469 if ( g_streams[id]->buffer == NULL ) {
470  g_streams[id]->buffer = buf;
471  return 0;
472 }
473
474 buf->next = NULL;
475
476 roar_buffer_add(buf, g_streams[id]->buffer);
477
478 g_streams[id]->buffer = buf;
479
480 return 0;
481}
482
483int streams_check  (int id) {
484 int fh;
485 ssize_t req;
486 struct roar_stream        *   s;
487 struct roar_stream_server *  ss;
488 struct roar_buffer        *   b;
489 char                      * buf;
490
491 if ( g_streams[id] == NULL )
492  return -1;
493
494 ROAR_DBG("streams_check(id=%i) = ?", id);
495
496 s = (struct roar_stream *) (ss = g_streams[id]);
497
498 if ( (fh = s->fh) == -1 )
499  return 0;
500
501 if ( s->dir != ROAR_DIR_PLAY )
502  return 0;
503
504 ROAR_DBG("streams_check(id=%i): fh = %i", id, fh);
505
506 req  = ROAR_OUTPUT_BUFFER_SAMPLES * s->info.channels * s->info.bits / 8; // optimal size
507 req += ss->need_extra; // bytes left we sould get....
508
509 if ( roar_buffer_new(&b, req) == -1 ) {
510  ROAR_ERR("streams_check(*): Can not alloc buffer space!");
511  ROAR_DBG("streams_check(*) = -1");
512  return -1;
513 }
514
515 roar_buffer_get_data(b, (void **)&buf);
516
517 ROAR_DBG("streams_check(id=%i): buffer is up and ready ;)", id);
518
519 if ( ss->codecfilter == -1 ) {
520  req = read(fh, buf, req);
521 } else {
522  req = codecfilter_read(ss->codecfilter_inst, ss->codecfilter, buf, req);
523 }
524
525 if ( req > 0 ) {
526  ROAR_DBG("streams_check(id=%i): got %i bytes", id, req);
527
528  roar_buffer_set_len(b, req);
529
530  if ( stream_add_buffer(id, b) != -1 )
531   return 0;
532
533  ROAR_ERR("streams_check(id=%i): something is wrong, could not add buffer to stream!", id);
534  roar_buffer_free(b);
535 } else {
536  ROAR_DBG("streams_check(id=%i): read() = %i // errno: %s", id, req, strerror(errno));
537#ifdef ROAR_HAVE_LIBVORBISFILE
538  if ( errno != EAGAIN && errno != ESPIPE ) { // libvorbis file trys to seek a bit ofen :)
539#else
540  if ( errno != EAGAIN ) {
541#endif
542   ROAR_DBG("streams_check(id=%i): EOF!", id);
543   streams_delete(id);
544   ROAR_DBG("streams_check(id=%i) = 0", id);
545  }
546  roar_buffer_free(b);
547  return 0;
548 }
549
550
551 ROAR_DBG("streams_check(id=%i) = -1", id);
552 return -1;
553}
554
555
556int streams_send_mon   (int id) {
557 int fh;
558 struct roar_stream        *   s;
559 struct roar_stream_server *  ss;
560
561 if ( g_streams[id] == NULL )
562  return -1;
563
564 ROAR_DBG("streams_send_mon(id=%i) = ?", id);
565
566 s = (struct roar_stream *) (ss = g_streams[id]);
567
568 if ( (fh = s->fh) == -1 )
569  return 0;
570
571 if ( s->dir != ROAR_DIR_MONITOR )
572  return 0;
573
574 ROAR_DBG("streams_send_mon(id=%i): fh = %i", id, fh);
575
576 if ( write(fh, g_output_buffer, g_output_buffer_len) == g_output_buffer_len )
577  return 0;
578
579 if ( errno == EAGAIN ) {
580  // ok, the client blocks for a moment, we try to sleep a bit an retry in the hope not to
581  // make any gapes in any output because of this
582
583  usleep(100); // 0.1ms
584
585  if ( write(fh, g_output_buffer, g_output_buffer_len) == g_output_buffer_len )
586   return 0;
587 }
588
589 // ug... error... delete stream!
590
591 streams_delete(id);
592
593 return -1;
594}
595
596int streams_send_filter(int id) {
597 int fh;
598 int have = 0;
599 int len;
600 struct roar_stream        *   s;
601 struct roar_stream_server *  ss;
602
603 if ( g_streams[id] == NULL )
604  return -1;
605
606 ROAR_DBG("streams_send_filter(id=%i) = ?", id);
607
608 s = (struct roar_stream *) (ss = g_streams[id]);
609
610 if ( (fh = s->fh) == -1 )
611  return 0;
612
613 if ( s->dir != ROAR_DIR_FILTER )
614  return 0;
615
616 ROAR_DBG("streams_send_filter(id=%i): fh = %i", id, fh);
617
618 if ( write(fh, g_output_buffer, g_output_buffer_len) == g_output_buffer_len ) {
619  while ( have < g_output_buffer_len ) {
620   if ( (len = read(fh, g_output_buffer+have, g_output_buffer_len-have)) < 1 ) {
621    streams_delete(id);
622    return -1;
623   }
624   have += len;
625  }
626  return 0;
627 }
628
629 // ug... error... delete stream!
630
631 streams_delete(id);
632
633 return -1;
634}
635
636//ll
Note: See TracBrowser for help on using the repository browser.