Cog Audio: Enhance playback queue handler, so it always halts buffering when there are at least 30 seconds worth of buffers filled, possibly spanning multiple files. Also improve the chain reset function so that playlist changes and playback order control reset the queue properly when the queue refill function is currently entered in another thread.

CQTexperiment
Christopher Snowhill 2022-01-13 23:05:32 -08:00
parent 7cc89c9f92
commit c8d2864862
8 changed files with 102 additions and 27 deletions

View File

@ -8,6 +8,10 @@
#import <Cocoa/Cocoa.h>
#import <CogAudio/Semaphore.h>
#import <stdatomic.h>
@class BufferChain;
@class OutputNode;
@ -30,6 +34,11 @@
BOOL endOfInputReached;
BOOL startedPaused;
BOOL initialBufferFilled;
Semaphore *semaphore;
atomic_bool resettingNow;
atomic_int refCount;
}
- (id)init;

View File

@ -29,6 +29,11 @@
endOfInputReached = NO;
chainQueue = [[NSMutableArray alloc] init];
semaphore = [[Semaphore alloc] init];
atomic_init(&resettingNow, false);
atomic_init(&refCount, 0);
}
return self;
@ -189,6 +194,21 @@
// Called when the playlist changed before we actually started playing a requested stream. We will re-request.
- (void)resetNextStreams
{
// This sucks! And since the thread that's inside the function can be calling
// event dispatches, we have to pump the message queue if we're on the main
// thread. Damn.
if (atomic_load_explicit(&refCount, memory_order_relaxed) != 0) {
BOOL mainThread = (dispatch_queue_get_label(dispatch_get_main_queue()) == dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL));
atomic_store(&resettingNow, true);
while (atomic_load_explicit(&refCount, memory_order_relaxed) != 0) {
[semaphore signal]; // Gotta poke this periodically
if (mainThread)
[[NSRunLoop currentRunLoop] runUntilDate:[NSDate dateWithTimeIntervalSinceNow:0.001]];
else
usleep(500);
}
atomic_store(&resettingNow, false);
}
@synchronized (chainQueue) {
for (id anObject in chainQueue) {
[anObject setShouldContinue:NO];
@ -246,31 +266,14 @@
}
- (BOOL)endOfInputReached:(BufferChain *)sender //Sender is a BufferChain
{
// Stop single or series of short tracks from queueing forever
{
unsigned long queueCount;
@synchronized (chainQueue) {
queueCount = [chainQueue count];
}
while (queueCount >= 5)
{
usleep(10000);
@synchronized (chainQueue) {
queueCount = [chainQueue count];
}
}
}
return [self endOfInputReachedInternal:sender];
}
- (BOOL)endOfInputReachedInternal:(BufferChain *)sender //Sender is a BufferChain
{
BufferChain *newChain = nil;
if (atomic_load_explicit(&resettingNow, memory_order_relaxed))
return YES;
atomic_fetch_add(&refCount, 1);
@synchronized (chainQueue) {
// No point in constructing new chain for the next playlist entry
// if there's already one at the head of chainQueue... r-r-right?
@ -278,6 +281,7 @@
{
if ([chain isRunning])
{
atomic_fetch_sub(&refCount, 1);
return YES;
}
}
@ -287,17 +291,42 @@
//{
// return YES;
//}
nextStreamUserInfo = [sender userInfo];
nextStreamRGInfo = [sender rgInfo];
}
double duration = 0.0;
@synchronized (chainQueue) {
for (BufferChain *chain in chainQueue) {
duration += [chain secondsBuffered];
}
}
while (duration >= 30.0)
{
[semaphore wait];
if (atomic_load_explicit(&resettingNow, memory_order_relaxed)) {
atomic_fetch_sub(&refCount, 1);
return YES;
}
@synchronized (chainQueue) {
duration = 0.0;
for (BufferChain *chain in chainQueue) {
duration += [chain secondsBuffered];
}
}
}
nextStreamUserInfo = [sender userInfo];
nextStreamRGInfo = [sender rgInfo];
// This call can sometimes lead to invoking a chainQueue block on another thread
[self requestNextStream: nextStreamUserInfo];
if (!nextStream)
if (!nextStream) {
atomic_fetch_sub(&refCount, 1);
return YES;
}
@synchronized (chainQueue) {
newChain = [[BufferChain alloc] initWithController:self];
@ -326,6 +355,7 @@
//Keep on-playin
newChain = nil;
atomic_fetch_sub(&refCount, 1);
return NO;
}
}
@ -337,6 +367,7 @@
if (nextStream == nil)
{
newChain = nil;
atomic_fetch_sub(&refCount, 1);
return YES;
}
@ -362,6 +393,7 @@
// - head of chainQueue is the buffer chain for the next entry (which has launched its threads already)
}
atomic_fetch_sub(&refCount, 1);
return YES;
}
@ -390,6 +422,8 @@
[chainQueue removeObjectAtIndex:0];
DLog(@"New!!! %@ %@", bufferChain, [[bufferChain inputNode] decoder]);
[semaphore signal];
}
[self notifyStreamChanged:[bufferChain userInfo]];

View File

@ -73,4 +73,6 @@
- (ConverterNode *)converter;
- (AudioStreamBasicDescription)inputFormat;
- (double)secondsBuffered;
@end

View File

@ -228,4 +228,15 @@
return inputFormat;
}
- (double)secondsBuffered
{
double duration = 0.0;
Node * node = [self finalNode];
while (node) {
duration += [node secondsBuffered];
node = [node previousNode];
}
return duration;
}
@end

View File

@ -1000,4 +1000,9 @@ static float db_to_scale(float db)
floatSize = 0;
}
- (double) secondsBuffered
{
return ((double)[buffer bufferedLength] / (outputFormat.mSampleRate * outputFormat.mBytesPerPacket));
}
@end

View File

@ -236,4 +236,10 @@
return decoder;
}
- (double) secondsBuffered
{
AudioStreamBasicDescription inputFormat = [[[controller controller] bufferChain] inputFormat];
return ((double)[buffer bufferedLength] / (inputFormat.mSampleRate * inputFormat.mBytesPerPacket));
}
@end

View File

@ -60,4 +60,6 @@
- (BOOL)endOfStream;
- (void)setEndOfStream:(BOOL)e;
- (double)secondsBuffered;
@end

View File

@ -219,5 +219,11 @@
return shouldReset;
}
// Buffering nodes should implement this
- (double)secondsBuffered
{
return 0.0;
}
@end