Audio samples producer multiple threads OSX

Posted 2019-09-12 09:59

Question:

This question is a follow-up to a former question (Audio producer threads with OSX AudioComponent consumer thread and callback in C), which included a test example that works and behaves as expected but does not quite answer the question. I have substantially rephrased the question and re-coded the example so that it contains only plain-C code. (I found that the few Objective-C portions of code in the former example only caused confusion and distracted the reader from what is essential in the question.)

In order to take advantage of multiple processor cores, and to make the CoreAudio pull-model render thread as lightweight as possible, the LPCM sample producer routine clearly has to "sit" on a different thread, outside the real-time-priority render thread/callback. It must feed the samples to a circular buffer (TPCircularBuffer in this example), from which the system pulls data in chunks of inNumberFrames frames.
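To illustrate the hand-off described above, here is a minimal single-producer/single-consumer ring buffer sketch in C11. It is not TPCircularBuffer's implementation and does not use its API; the names ring_t, ring_init, ring_write and ring_read, and the tiny RING_CAPACITY, are assumptions made for this example only. It assumes exactly one writer thread and one reader thread.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

#define RING_CAPACITY 8u  /* hypothetical size, tiny for illustration */
static_assert((RING_CAPACITY & (RING_CAPACITY - 1u)) == 0u,
              "RING_CAPACITY must be a power of two");

typedef struct {
    float         data[RING_CAPACITY];
    atomic_size_t head;  /* total frames written; stored only by the producer */
    atomic_size_t tail;  /* total frames read; stored only by the consumer */
} ring_t;

static void ring_init(ring_t *r) {
    atomic_init(&r->head, 0);
    atomic_init(&r->tail, 0);
}

/* Producer side: copies up to n frames in and returns how many were accepted. */
static size_t ring_write(ring_t *r, const float *src, size_t n) {
    size_t head  = atomic_load_explicit(&r->head, memory_order_relaxed);
    size_t tail  = atomic_load_explicit(&r->tail, memory_order_acquire);
    size_t space = RING_CAPACITY - (head - tail);
    if (n > space) n = space;
    for (size_t i = 0; i < n; ++i)
        r->data[(head + i) & (RING_CAPACITY - 1u)] = src[i];
    /* Publish the new frames only after they have been written. */
    atomic_store_explicit(&r->head, head + n, memory_order_release);
    return n;
}

/* Consumer side (the render callback's role): copies up to n frames out. */
static size_t ring_read(ring_t *r, float *dst, size_t n) {
    size_t tail  = atomic_load_explicit(&r->tail, memory_order_relaxed);
    size_t head  = atomic_load_explicit(&r->head, memory_order_acquire);
    size_t avail = head - tail;
    if (n > avail) n = avail;
    for (size_t i = 0; i < n; ++i)
        dst[i] = r->data[(tail + i) & (RING_CAPACITY - 1u)];
    /* Free the slots only after they have been copied out. */
    atomic_store_explicit(&r->tail, tail + n, memory_order_release);
    return n;
}
```

Neither side ever blocks or takes a lock: a full buffer makes ring_write accept fewer frames, and an empty one makes ring_read return fewer frames, which the consumer would pad with silence.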

The Grand Central Dispatch API offers a simple solution, which I arrived at through my own research (including trial-and-error coding). The solution is elegant, since it blocks nothing and causes no conflict between the push and pull models. Yet GCD, which is supposed to take care of "sub-threading", falls far short of the specific parallelization requirements of the producer's worker threads, so I had to explicitly spawn a number of POSIX threads, depending on the number of logical cores available. Although the results are already remarkable in terms of speeding up the computation, I still feel a bit uncomfortable mixing POSIX threads and GCD. This applies in particular to the variable wait_interval: I would like to compute it properly, rather than by predicting how many PCM samples the render thread may require in the next cycle.

Here's the shortened and simplified (pseudo)code for my test program, in plain-C.

Controller declaration:

#include "TPCircularBuffer.h"
#include <AudioToolbox/AudioToolbox.h>
#include <AudioUnit/AudioUnit.h>
#include <dispatch/dispatch.h>
#include <sys/sysctl.h>
#include <pthread.h>

typedef struct {
    TPCircularBuffer            buffer;
    AudioComponentInstance      toneUnit;
    Float64                     sampleRate;
    AudioStreamBasicDescription streamFormat;
    Float32*                    f;     //array of updated frequencies
    Float32*                    a;     //array of updated amps
    Float32*                    prevf; //array of prev. frequencies
    Float32*                    preva; //array of prev. amps
    Float32*                    val;
    int*                        arg;
    int*                        previous_arg;
    UInt32                      frames;
    int                         state;
    Boolean                     midif; //wip
} MyAudioController;

MyAudioController gen;    
dispatch_semaphore_t mSemaphore;
Boolean multithreading, NF;

typedef struct data{
    int tid;
    int cpuCount;
}data;

Controller management:

void setup (void){
    // Initialize circular buffer
    TPCircularBufferInit(&(gen.buffer), kBufferLength);
    // Create the semaphore
    mSemaphore = dispatch_semaphore_create(0);
    // Setup audio
    createToneUnit(&gen);  
}

void dealloc (void) {
    // Stop and dispose of the audio unit first, so that the render
    // callback can no longer touch the buffer or the semaphore
    if(gen.toneUnit){
        AudioOutputUnitStop(gen.toneUnit);
        AudioUnitUninitialize(gen.toneUnit);
        AudioComponentInstanceDispose(gen.toneUnit);
    }
    // Release buffer resources
    TPCircularBufferCleanup(&(gen.buffer));
    // Clean up semaphore
    dispatch_release(mSemaphore);
}

Dispatcher call (launching producer queue from the main thread):

void dproducer (Boolean on, Boolean multithreading, Boolean NF)
{
    if (on == true)
    {
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
            if((multithreading)||(NF))
                producerSum(on);
            else
                producer(on);
        });
    }
    return;
}

Threadable producer routine:

void producerSum(Boolean on)
{
    int rc;
    int num = getCPUnum();
    pthread_t threads[num];
    data thread_args[num];
    void* resulT;
    static Float32 frames  [FR_MAX];
    Float32 wait_interval;
    int     bytesToCopy;
    int32_t availableBytes;   // same type as in the render callback
    Float32 floatmax;

    // fbuffW (the per-thread partial-sum buffers) is assumed to be declared
    // at file scope as Float32**, so that the worker threads can write to it.

    while(on){
        wait_interval = FACT*(gen.frames)/(gen.sampleRate);
        Float32 damp = 1./(Float32)(gen.frames);
        bytesToCopy = gen.frames*sizeof(Float32);
        memset(frames,  0, FR_MAX*sizeof(Float32));
        availableBytes = 0;
        fbuffW  = (Float32**)calloc(num + 1, sizeof(Float32*));
        for (int i=0; i<num; ++i)
        {
            fbuffW[i] = (Float32*)calloc(gen.frames, sizeof(Float32));
            thread_args[i].tid = i;
            thread_args[i].cpuCount = num;
            // the thread entry point is producerT, defined below
            rc = pthread_create(&threads[i], NULL, producerT, (void *) &thread_args[i]);
        }

        // wait for all worker threads to finish
        for (int i=0; i<num; ++i)  rc = pthread_join(threads[i], &resulT);

        // sum the per-thread partial buffers into one block
        for(UInt32 samp = 0; samp < gen.frames; samp++)
            for(int i = 0; i < num; i++)
                frames[samp] += fbuffW[i][samp];

        //code for managing producer state and GUI updates
        { ... }

        float *head = TPCircularBufferHead(&(gen.buffer), &availableBytes);
        memcpy(head,(const void*)frames,MIN(bytesToCopy, availableBytes));//copies frames to head
        TPCircularBufferProduce(&(gen.buffer),MIN(bytesToCopy,availableBytes));

        // sleep until the render callback signals, or until wait_interval elapses
        dispatch_semaphore_wait(mSemaphore, dispatch_time(DISPATCH_TIME_NOW, (int64_t)(wait_interval * NSEC_PER_SEC)));
        if(gen.state == stopped){gen.state = idle; on = false;}

        for(int i = 0; i < num; i++)
            free(fbuffW[i]);
        free(fbuffW);
    }
    return;
}

A single producer thread may look somewhat like this:

void *producerT  (void *TN)
{
    Float32 samples[FR_MAX];
    data threadData;
    threadData = *((data *)TN);
    int tid  = threadData.tid;
    int step = threadData.cpuCount;
    int *ret = calloc(1,sizeof(int));

    do_something(tid, step, &samples);

    { … }
    return (void*)ret;
}
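The split-and-sum pattern used by producerSum and producerT can be sketched in portable C as follows. This is only an illustration under stated assumptions, not the original do_something: the names worker_t, voice_sample, worker_main and render_block, and the constants NUM_FRAMES, NUM_VOICES and MAX_WORKERS, are hypothetical, and the "synthesis" is a trivial stand-in. Each worker takes every workers-th voice starting at its own tid and writes only to its private partial buffer, so no locking is needed until the join.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define NUM_FRAMES  8  /* frames per block (hypothetical) */
#define NUM_VOICES  6  /* voices to mix (hypothetical) */
#define MAX_WORKERS 4

typedef struct {
    int   tid;                  /* worker index, 0..workers-1 */
    int   workers;              /* total number of workers, used as the stride */
    float partial[NUM_FRAMES];  /* this worker's private partial sum */
} worker_t;

/* Stand-in for real synthesis: voice v contributes (v + 1) to every frame. */
static float voice_sample(int voice, int frame) {
    (void)frame;
    return (float)(voice + 1);
}

/* Each worker handles voices tid, tid + workers, tid + 2*workers, ... */
static void *worker_main(void *arg) {
    worker_t *w = (worker_t *)arg;
    for (int f = 0; f < NUM_FRAMES; ++f) w->partial[f] = 0.0f;
    for (int v = w->tid; v < NUM_VOICES; v += w->workers)
        for (int f = 0; f < NUM_FRAMES; ++f)
            w->partial[f] += voice_sample(v, f);
    return NULL;
}

/* Renders one block into out[NUM_FRAMES]. Returns 0 on success,
   -1 if a thread could not be created or joined. */
static int render_block(float *out, int workers) {
    assert(workers >= 1 && workers <= MAX_WORKERS);
    pthread_t threads[MAX_WORKERS];
    worker_t  args[MAX_WORKERS];
    int started = 0, status = 0;

    for (int i = 0; i < workers; ++i) {
        args[i].tid = i;
        args[i].workers = workers;
        if (pthread_create(&threads[i], NULL, worker_main, &args[i]) != 0) {
            status = -1;
            break;
        }
        ++started;
    }
    /* Join every thread that was started, even after a failure. */
    for (int i = 0; i < started; ++i)
        if (pthread_join(threads[i], NULL) != 0) status = -1;
    if (status != 0) return status;

    /* Reduction: sum the private partial buffers into the output block. */
    for (int f = 0; f < NUM_FRAMES; ++f) {
        out[f] = 0.0f;
        for (int i = 0; i < workers; ++i) out[f] += args[i].partial[f];
    }
    return 0;
}
```

Calling render_block(out, 3) leaves 21.0 (1 + 2 + ... + 6) in every frame of out, the same result as with a single worker. Keeping the partial buffers inside the argument structs avoids a heap allocation per block, which is one possible design choice rather than a requirement.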

Here is the render callback (CoreAudio real-time consumer thread):

static OSStatus audioRenderCallback(void *inRefCon,
                                  AudioUnitRenderActionFlags *ioActionFlags,
                                  const AudioTimeStamp *inTimeStamp,
                                  UInt32 inBusNumber,
                                  UInt32 inNumberFrames,
                                  AudioBufferList *ioData) {    

    MyAudioController *THIS = (MyAudioController *)inRefCon;

    // An event happens in the render thread- signal whoever is waiting
    if (THIS->state == active) dispatch_semaphore_signal(mSemaphore);

    // Mono audio rendering: we only need one target buffer
    const int channel = 0;
    Float32* targetBuffer = (Float32 *)ioData->mBuffers[channel].mData;
    memset(targetBuffer,0,inNumberFrames*sizeof(Float32));

    // Pull samples from circular buffer
    int32_t availableBytes;
    Float32 *buffer = TPCircularBufferTail(&THIS->buffer, &availableBytes);

    //copy circularBuffer content to target buffer
    int bytesToCopy = ioData->mBuffers[channel].mDataByteSize;
    memcpy(targetBuffer, buffer, MIN(bytesToCopy, availableBytes));
    { … };

    // Consume only the bytes that were actually copied; consuming all of
    // availableBytes would discard samples that have not been played yet
    TPCircularBufferConsume(&THIS->buffer, MIN(bytesToCopy, availableBytes));
    THIS->frames = inNumberFrames;
    return noErr;
}

Answer 1:

Grand Central Dispatch already takes care of dispatching operations to multiple processor cores and threads. In typical real-time audio rendering or processing, one never needs to wait on a signal or semaphore, as the circular buffer consumption rate is very predictable and drifts extremely slowly over time. The AVAudioSession API (if available) and the Audio Unit API and callback allow you to set and determine the callback buffer size, and thus the maximum rate at which the circular buffer can change. You can therefore dispatch all render operations on a timer, render exactly the number of samples needed per timer period, and let the buffer size and state compensate for any jitter in thread dispatch time.
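As a rough sketch of "the exact number needed per timer period": the frame count per tick is sample rate times period, and when that product is not a whole number the fractional part can be carried over to the next tick so that the long-run total stays exact. The type frame_pacer_t and the functions pacer_init and pacer_next are hypothetical names for this example, and the millisecond timer period is an assumption.

```c
#include <assert.h>
#include <stdint.h>

typedef struct {
    uint32_t sample_rate;  /* frames per second, e.g. 44100 */
    uint32_t period_ms;    /* timer period in milliseconds (assumed unit) */
    uint32_t remainder;    /* carried fraction, in units of 1/1000 frame */
} frame_pacer_t;

static void pacer_init(frame_pacer_t *p, uint32_t sample_rate, uint32_t period_ms) {
    assert(sample_rate > 0 && period_ms > 0);
    p->sample_rate = sample_rate;
    p->period_ms   = period_ms;
    p->remainder   = 0;
}

/* Returns the number of frames to render on this timer tick. */
static uint32_t pacer_next(frame_pacer_t *p) {
    /* sample_rate * period_ms is the frame count scaled by 1000. */
    uint64_t total = (uint64_t)p->sample_rate * p->period_ms + p->remainder;
    p->remainder = (uint32_t)(total % 1000u);  /* keep the fraction for later */
    return (uint32_t)(total / 1000u);
}
```

At 44100 Hz with a 5 ms timer the nominal count is 220.5 frames, so successive calls to pacer_next return 220, 221, 220, 221, and so on, adding up to exactly 44100 frames per second of timer ticks.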

In extremely long-running audio renders, you might want to measure the drift between timer operations and real-time audio consumption (sample rate), and tweak the number of samples rendered or the timer offset.
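One possible way to apply such a tweak, sketched under assumptions: instead of comparing clocks directly, treat the circular buffer's fill level as the accumulated drift and nudge the per-tick frame count toward a target fill, limited to a small step. The function frames_for_tick and its parameters are hypothetical; with TPCircularBuffer the fill level could be derived from the available byte count reported by TPCircularBufferTail.

```c
#include <assert.h>
#include <stdint.h>

/* Frames to render on the next tick: the nominal count, nudged toward the
   target fill level by at most max_adjust frames in either direction.
   All quantities are in frames. */
static uint32_t frames_for_tick(uint32_t nominal, uint32_t fill,
                                uint32_t target, uint32_t max_adjust) {
    assert(max_adjust <= nominal);
    /* error > 0: the buffer is running low, so render a little more. */
    int64_t error = (int64_t)target - (int64_t)fill;
    if (error >  (int64_t)max_adjust) error =  (int64_t)max_adjust;
    if (error < -(int64_t)max_adjust) error = -(int64_t)max_adjust;
    return (uint32_t)((int64_t)nominal + error);
}
```

For example, with a nominal 512 frames, a target fill of 1024 and max_adjust of 8, a fill of 1000 yields 520 frames and a fill of 2000 yields 504. The clamp keeps each correction small, so jitter in one tick does not cause a large swing in the next.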