Here is the piece of code in which segmentation fault occurs (the perror is not being called):
job = malloc(sizeof(task_t));
if(job == NULL)
perror("malloc");
To be more precise, gdb says that the segfault
happens inside a __int_malloc
call, which is a sub-routine call made by malloc
.
Since the malloc function is called in parallel with other threads, initially I thought that it could be the problem. I was using version 2.19 of glibc.
The data structures:
typedef struct rv_thread thread_wrapper_t;
typedef struct future
{
pthread_cond_t wait;
pthread_mutex_t mutex;
long completed;
} future_t;
typedef struct task
{
future_t * f;
void * data;
void *
(*fun)(thread_wrapper_t *, void *);
} task_t;
typedef struct
{
queue_t * queue;
} pool_worker_t;
typedef struct
{
task_t * t;
} sfuture_t;
struct rv_thread
{
pool_worker_t * pool;
};
Now the future implementation:
future_t *
create_future()
{
future_t * new_f = malloc(sizeof(future_t));
if(new_f == NULL)
perror("malloc");
new_f->completed = 0;
pthread_mutex_init(&(new_f->mutex), NULL);
pthread_cond_init(&(new_f->wait), NULL);
return new_f;
}
int
wait_future(future_t * f)
{
pthread_mutex_lock(&(f->mutex));
while (!f->completed)
{
pthread_cond_wait(&(f->wait),&(f->mutex));
}
pthread_mutex_unlock(&(f->mutex));
return 0;
}
void
complete(future_t * f)
{
pthread_mutex_lock(&(f->mutex));
f->completed = 1;
pthread_mutex_unlock(&(f->mutex));
pthread_cond_broadcast(&(f->wait));
}
The thread pool itself:
pool_worker_t *
create_work_pool(int threads)
{
pool_worker_t * new_p = malloc(sizeof(pool_worker_t));
if(new_p == NULL)
perror("malloc");
threads = 1;
new_p->queue = create_queue();
int i;
for (i = 0; i < threads; i++){
thread_wrapper_t * w = malloc(sizeof(thread_wrapper_t));
if(w == NULL)
perror("malloc");
w->pool = new_p;
pthread_t n;
pthread_create(&n, NULL, work, w);
}
return new_p;
}
task_t *
try_get_new_task(thread_wrapper_t * thr)
{
task_t * t = NULL;
try_dequeue(thr->pool->queue, t);
return t;
}
void
submit_job(pool_worker_t * p, task_t * t)
{
enqueue(p->queue, t);
}
void *
work(void * data)
{
thread_wrapper_t * thr = (thread_wrapper_t *) data;
while (1){
task_t * t = NULL;
while ((t = (task_t *) try_get_new_task(thr)) == NULL);
future_t * f = t->f;
(*(t->fun))(thr,t->data);
complete(f);
}
pthread_exit(NULL);
}
And finally the task.c:
pool_worker_t *
create_tpool()
{
return (create_work_pool(8));
}
sfuture_t *
async(pool_worker_t * p, thread_wrapper_t * thr, void *
(*fun)(thread_wrapper_t *, void *), void * data)
{
task_t * job = NULL;
job = malloc(sizeof(task_t));
if(job == NULL)
perror("malloc");
job->data = data;
job->fun = fun;
job->f = create_future();
submit_job(p, job);
sfuture_t * new_t = malloc(sizeof(sfuture_t));
if(new_t == NULL)
perror("malloc");
new_t->t = job;
return (new_t);
}
void
mywait(thread_wrapper_t * thr, sfuture_t * sf)
{
if (sf == NULL)
return;
if (thr != NULL)
{
while (!sf->t->f->completed)
{
task_t * t_n = try_get_new_task(thr);
if (t_n != NULL)
{
future_t * f = t_n->f;
(*(t_n->fun))(thr,t_n->data);
complete(f);
}
}
return;
}
wait_future(sf->t->f);
return ;
}
The queue is the lfds lock-free queue.
#define enqueue(q,t) { \
if(!lfds611_queue_enqueue(q->lq, t)) \
{ \
lfds611_queue_guaranteed_enqueue(q->lq, t); \
} \
}
#define try_dequeue(q,t) { \
lfds611_queue_dequeue(q->lq, &t); \
}
The problem happens whenever the number of calls to async is very high.
Valgrind output:
Process terminating with default action of signal 11 (SIGSEGV)
==12022== Bad permissions for mapped region at address 0x5AF9FF8
==12022== at 0x4C28737: malloc (in /usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so)