aboutsummaryrefslogtreecommitdiffstats
path: root/libar2simplified_init_context.c
diff options
context:
space:
mode:
Diffstat (limited to 'libar2simplified_init_context.c')
-rw-r--r--libar2simplified_init_context.c50
1 files changed, 37 insertions, 13 deletions
diff --git a/libar2simplified_init_context.c b/libar2simplified_init_context.c
index ad35f13..6a5cb88 100644
--- a/libar2simplified_init_context.c
+++ b/libar2simplified_init_context.c
@@ -2,6 +2,7 @@
#include "common.h"
#include <pthread.h>
#include <semaphore.h>
+#include <stdatomic.h>
struct user_data;
@@ -11,7 +12,7 @@ struct thread_data {
struct user_data *master;
pthread_t thread;
sem_t semaphore;
- int error;
+ _Atomic int error;
void (*function)(void *data);
void *function_input;
};
@@ -84,31 +85,44 @@ static void *
thread_loop(void *data_)
{
struct thread_data *data = data_;
+ void (*function)(void *data);
+ void *function_input;
int err;
for (;;) {
if (sem_wait(&data->semaphore)) {
if (errno == EINTR)
continue;
- data->error = errno;
+ atomic_store(&data->error, errno);
return NULL;
}
- if (!data->function) {
- data->error = ENOTRECOVERABLE;
+ err = pthread_mutex_lock(&data->master->mutex);
+ if (err) {
+ atomic_store(&data->error, err);
+ return NULL;
+ }
+ function = data->function;
+ function_input = data->function_input;
+ if (!function) {
+ atomic_store(&data->error, ENOTRECOVERABLE);
+ pthread_mutex_unlock(&data->master->mutex);
return NULL;
}
- data->function(data->function_input);
+ pthread_mutex_unlock(&data->master->mutex);
+
+ function(function_input);
err = pthread_mutex_lock(&data->master->mutex);
if (err) {
- data->error = err;
+ atomic_store(&data->error, err);
return NULL;
}
data->master->resting[data->index / 64] |= (uint_least64_t)1 << (data->index % 64);
+
pthread_mutex_unlock(&data->master->mutex);
if (sem_post(&data->master->semaphore)) {
- data->error = errno;
+ atomic_store(&data->error, errno);
return NULL;
}
}
@@ -127,15 +141,17 @@ run_thread(size_t index, void (*function)(void *arg), void *arg, struct libar2_c
return -1;
}
data->resting[index / 64] ^= (uint_least64_t)1 << (index % 64);
- pthread_mutex_unlock(&data->mutex);
- if (data->threads[index].error) {
- errno = data->threads[index].error;
+ err = atomic_load(&data->threads[index].error);
+ if (err) {
+ pthread_mutex_unlock(&data->mutex);
+ errno = err;
return -1;
}
data->threads[index].function = function;
data->threads[index].function_input = arg;
+ pthread_mutex_unlock(&data->mutex);
if (sem_post(&data->threads[index].semaphore))
return -1;
@@ -148,15 +164,16 @@ destroy_thread_pool(struct libar2_context *ctx)
{
struct user_data *data = ctx->user_data;
size_t i;
- int ret = 0;
+ int err, ret = 0;
for (i = data->nthreads; i--;)
if (run_thread(i, pthread_exit, NULL, ctx))
return -1;
for (i = data->nthreads; i--;) {
pthread_join(data->threads[i].thread, NULL);
sem_destroy(&data->threads[i].semaphore);
- if (data->threads[i].error)
- ret = data->threads[i].error;
+ err = atomic_load(&data->threads[i].error);
+ if (err)
+ ret = err;
}
sem_destroy(&data->semaphore);
pthread_mutex_destroy(&data->mutex);
@@ -247,6 +264,7 @@ init_thread_pool(size_t desired, size_t *createdp, struct libar2_context *ctx)
for (i = 0; i < data->nthreads; i++) {
memset(&data->threads[i], 0, sizeof(data->threads[i]));
+ atomic_init(&data->threads[i].error, 0);
data->threads[i].master = data;
data->threads[i].index = i;
data->resting[i / 64] |= (uint_least64_t)1 << (i % 64);
@@ -293,6 +311,11 @@ await_threads(size_t *indices, size_t n, size_t require, struct libar2_context *
memset(data->joined, 0, (data->nthreads + 63) / 64 * sizeof(*data->joined));
+ err = pthread_mutex_lock(&data->mutex);
+ if (err) {
+ errno = err;
+ return 0;
+ }
for (i = 0; i < data->nthreads; i += 64) {
for (;;) {
one = data->resting[i / 64];
@@ -305,6 +328,7 @@ await_threads(size_t *indices, size_t n, size_t require, struct libar2_context *
indices[ret - 1] = i + lb(one);
}
}
+ pthread_mutex_unlock(&data->mutex);
for (;;) {
if (ret < require) {