diff options
Diffstat (limited to 'libar2simplified_hash.c')
-rw-r--r-- | libar2simplified_hash.c | 307 |
1 files changed, 165 insertions, 142 deletions
diff --git a/libar2simplified_hash.c b/libar2simplified_hash.c index bdc5dd2..4583dfb 100644 --- a/libar2simplified_hash.c +++ b/libar2simplified_hash.c @@ -4,13 +4,13 @@ #include <semaphore.h> +struct user_data; + struct thread_data { + size_t index; + struct user_data *master; pthread_t thread; - pthread_mutex_t mutex; sem_t semaphore; - pthread_mutex_t *master_mutex; - sem_t *master_semaphore; - int *master_needs_a_thread; int error; void (*function)(void *data); void *function_input; @@ -19,14 +19,15 @@ struct thread_data { struct user_data { struct thread_data *threads; size_t nthreads; - int need_a_thread; - pthread_mutex_t master_mutex; - sem_t master_semaphore; + pthread_mutex_t mutex; + sem_t semaphore; + uint_least64_t *joined; + uint_least64_t resting[]; }; static void * -alignedalloc(size_t num, size_t size, size_t alignment, size_t extra) +alignedalloc(size_t num, size_t size, size_t extra, size_t alignment) { void *ptr; int err; @@ -50,7 +51,7 @@ static void * allocate(size_t num, size_t size, size_t alignment, struct libar2_context *ctx) { size_t pad = (alignment - ((2 * sizeof(size_t)) & (alignment - 1))) & (alignment - 1); - char *ptr = alignedalloc(num, size, alignment, pad + 2 * sizeof(size_t)); + char *ptr = alignedalloc(num, size, pad + 2 * sizeof(size_t), alignment); if (ptr) { ptr = &ptr[pad]; *(size_t *)ptr = pad; @@ -81,8 +82,6 @@ thread_loop(void *data_) { struct thread_data *data = data_; int err; - void (*function)(void *data); - void *function_input; for (;;) { if (sem_wait(&data->semaphore)) { @@ -90,43 +89,22 @@ thread_loop(void *data_) return NULL; } - err = pthread_mutex_lock(&data->mutex); + if (!data->function) { + data->error = ENOTRECOVERABLE; + return NULL; + } + data->function(data->function_input); + + err = pthread_mutex_lock(&data->master->mutex); if (err) { data->error = err; return NULL; } - function_input = data->function_input; - function = data->function; - pthread_mutex_unlock(&data->mutex); - - if (function) { - function(function_input); - - err = pthread_mutex_lock(data->master_mutex); - if (err) { - data->error = err; - return NULL; - } - - err = pthread_mutex_lock(&data->mutex); - if (err) { - pthread_mutex_unlock(data->master_mutex); - data->error = err; - return NULL; - } - data->function = NULL; - data->function_input = NULL; - pthread_mutex_unlock(&data->mutex); - if (*data->master_needs_a_thread) { - *data->master_needs_a_thread = 0; - if (sem_post(data->master_semaphore)) { - err = errno; - pthread_mutex_unlock(data->master_mutex); - data->error = err; - return NULL; - } - } - pthread_mutex_unlock(data->master_mutex); + data->master->resting[data->index / 64] |= (uint_least64_t)1 << (data->index % 64); + pthread_mutex_unlock(&data->master->mutex); + if (sem_post(&data->master->semaphore)) { + data->error = errno; + return NULL; } } } @@ -137,23 +115,25 @@ run_thread(size_t index, void (*function)(void *arg), void *arg, struct libar2_c { struct user_data *data = ctx->user_data; int err; - err = pthread_mutex_lock(&data->threads[index].mutex); + + err = pthread_mutex_lock(&data->mutex); if (err) { errno = err; return -1; } + data->resting[index / 64] ^= (uint_least64_t)1 << (index % 64); + pthread_mutex_unlock(&data->mutex); + if (data->threads[index].error) { - err = data->threads[index].error; - pthread_mutex_unlock(&data->threads[index].mutex); - errno = err; + errno = data->threads[index].error; return -1; } - data->threads[index].function_input = arg; + data->threads[index].function = function; - if (sem_post(&data->threads[index].semaphore)) { + data->threads[index].function_input = arg; + if (sem_post(&data->threads[index].semaphore)) return -1; - } - pthread_mutex_unlock(&data->threads[index].mutex); + return 0; } @@ -163,24 +143,20 @@ destroy_thread_pool(struct libar2_context *ctx) { struct user_data *data = ctx->user_data; size_t i; - int ret = 0, err; + int ret = 0; for (i = data->nthreads; i--;) if (run_thread(i, pthread_exit, NULL, ctx)) return -1; for (i = data->nthreads; i--;) { pthread_join(data->threads[i].thread, NULL); - err = pthread_mutex_lock(&data->threads[i].mutex); - if (err) - ret = err; sem_destroy(&data->threads[i].semaphore); if (data->threads[i].error) ret = data->threads[i].error; - pthread_mutex_unlock(&data->threads[i].mutex); - pthread_mutex_destroy(&data->threads[i].mutex); } free(data->threads); - sem_destroy(&data->master_semaphore); - pthread_mutex_destroy(&data->master_mutex); + sem_destroy(&data->semaphore); + pthread_mutex_destroy(&data->mutex); + free(data); return ret; } @@ -188,20 +164,21 @@ destroy_thread_pool(struct libar2_context *ctx) static int init_thread_pool(size_t desired, size_t *createdp, struct libar2_context *ctx) { - struct user_data *data = ctx->user_data; + struct user_data *data; int err; - size_t i; + size_t i, size; long int nproc, nproc_limit; #ifdef __linux__ char path[sizeof("/sys/devices/system/cpu/cpu") + 3 * sizeof(nproc)]; #endif +#ifdef _SC_SEM_VALUE_MAX + long int semlimit; +#endif -#ifdef TODO if (desired < 2) { *createdp = 0; return 0; } -#endif nproc = sysconf(_SC_NPROCESSORS_ONLN); #ifdef __linux__ @@ -217,49 +194,61 @@ init_thread_pool(size_t desired, size_t *createdp, struct libar2_context *ctx) if (nproc < 1) nproc = FALLBACK_NPROC; +#ifdef _SC_SEM_VALUE_MAX + semlimit = sysconf(_SC_SEM_VALUE_MAX); + if (semlimit >= 1 && semlimit < nproc) + nproc = semlimit; +#endif + if (nproc == 1) { *createdp = 0; return 0; } - data->nthreads = (size_t)nproc < desired ? (size_t)nproc : desired; - *createdp = data->nthreads; + desired = (size_t)nproc < desired ? (size_t)nproc : desired; - data->threads = alignedalloc(data->nthreads, sizeof(*data->threads), ALIGNOF(struct thread_data), 0); + if (desired > SIZE_MAX - 63 || (desired + 63) / 64 > SIZE_MAX / sizeof(uint_least64_t) / 2) { + errno = ENOMEM; + return -1; + } + size = (desired + 63) / 64; + size *= sizeof(uint_least64_t) * 2; + data = alignedalloc(1, offsetof(struct user_data, resting), size, ALIGNOF(struct user_data)); + memset(data, 0, offsetof(struct user_data, resting) + size); + data->joined = &data->resting[(desired + 63) / 64]; + ctx->user_data = data; + + *createdp = data->nthreads = desired; + + data->threads = alignedalloc(data->nthreads, sizeof(*data->threads), 0, ALIGNOF(struct thread_data)); if (!data->threads) return -1; - err = pthread_mutex_init(&data->master_mutex, NULL); + err = pthread_mutex_init(&data->mutex, NULL); if (err) { free(data->threads); return -1; } - err = sem_init(&data->master_semaphore, 0, 0); + err = sem_init(&data->semaphore, 0, 0); if (err) { - pthread_mutex_destroy(&data->master_mutex); + pthread_mutex_destroy(&data->mutex); free(data->threads); return -1; } - data->need_a_thread = 0; for (i = 0; i < data->nthreads; i++) { memset(&data->threads[i], 0, sizeof(data->threads[i])); - data->threads[i].master_mutex = &data->master_mutex; - data->threads[i].master_semaphore = &data->master_semaphore; - data->threads[i].master_needs_a_thread = &data->need_a_thread; - err = pthread_mutex_init(&data->threads[i].mutex, NULL); - if (err) - goto fail_post_mutex; + data->threads[i].master = data; + data->threads[i].index = i; + data->resting[i / 64] |= (uint_least64_t)1 << (i % 64); if (sem_init(&data->threads[i].semaphore, 0, 0)) { err = errno; - goto fail_post_cond; + goto fail_post_sem; } err = pthread_create(&data->threads[i].thread, NULL, thread_loop, &data->threads[i]); if (err) { sem_destroy(&data->threads[i].semaphore); - fail_post_cond: - pthread_mutex_destroy(&data->threads[i].mutex); - fail_post_mutex: + fail_post_sem: data->nthreads = i; destroy_thread_pool(ctx); errno = err; @@ -271,78 +260,112 @@ init_thread_pool(size_t desired, size_t *createdp, struct libar2_context *ctx) } -static int -set_need_a_thread(struct user_data *data, int need) -{ - int err; - err = pthread_mutex_lock(&data->master_mutex); - if (err) { - errno = err; - return -1; - } - data->need_a_thread = need; - pthread_mutex_unlock(&data->master_mutex); - return 0; -} - - -static int -await_some_thread(struct user_data *data) +/* + * INIT_THREAD_POOL + * slave semaphores: 0 + * master semaphore: 0 + * all ressting + * + * THREAD_LOOP + * acquire slave + * run function(input) + * with lock on master { + * mark as resting + * release master + * } + * + * RUN_THREAD + * with lock on master { + * mark as busy + * } + * set function & input + * release slave + * + * AWAIT_THREADS + * for (ret = 0; ret < require;) { + * acquire master + * with lock on master { + * mark one resting as joined + * ret += 1 + * } + * } + * while (try-acquire master) { + * with lock on master { + * mark one resting as joined + * ret += 1 + * } + * } + * return ret + */ + +#if defined(__GNUC__) +__attribute__((__const__)) +#endif +static size_t +lb(uint_least64_t x) { - int err, need_a_thread; - err = pthread_mutex_lock(&data->master_mutex); - if (err) { - errno = err; - return -1; + size_t r = 0; + while (x > 1) { + x >>= 1; + r += 1; } - need_a_thread = data->need_a_thread; - pthread_mutex_unlock(&data->master_mutex); - if (need_a_thread) { - if (sem_wait(&data->master_semaphore)) { - err = errno; - pthread_mutex_unlock(&data->master_mutex); - errno = err; - return -1; - } - } - return 0; + return r; } - static size_t await_threads(size_t *indices, size_t n, size_t require, struct libar2_context *ctx) { struct user_data *data = ctx->user_data; - size_t i, ret = 0, first = 0; + size_t ret = 0, i; + uint_least64_t one; int err; + + memset(data->joined, 0, (data->nthreads + 63) / 64 * sizeof(*data->joined)); + + for (i = 0; i < data->nthreads; i += 64) { + for (;;) { + one = data->resting[i / 64]; + one ^= data->joined[i / 64]; + if (!one) + break; + one &= ~(one - 1); + data->joined[i / 64] |= one; + if (ret++ < n) + indices[ret - 1] = i + lb(one); + } + } + for (;;) { - if (set_need_a_thread(data, 1)) - return 0; - for (i = first; i < data->nthreads; i++) { - err = pthread_mutex_lock(&data->threads[i].mutex); - if (err) { - errno = err; - return 0; - } - if (!data->threads[i].function) { - if (ret++ < n) - indices[ret - 1] = i; - first += (i == first); - } - if (data->threads[i].error) { - errno = data->threads[i].error; + if (ret < require) { + if (sem_wait(&data->semaphore)) return 0; - } - pthread_mutex_unlock(&data->threads[i].mutex); - } - if (ret >= require) { - if (set_need_a_thread(data, 0)) + } else if (sem_trywait(&data->semaphore)) { + if (errno == EAGAIN) + break; + else return 0; - return ret; } - if (await_some_thread(data)) + + err = pthread_mutex_lock(&data->mutex); + if (err) { + errno = err; return 0; + } + for (i = 0; i < data->nthreads; i += 64) { + one = data->resting[i / 64]; + one ^= data->joined[i / 64]; + if (!one) + continue; + one &= ~(one - 1); + data->joined[i / 64] |= one; + if (ret++ < n) + indices[ret - 1] = i + lb(one); + break; + } + pthread_mutex_unlock(&data->mutex); } + + return ret; } @@ -357,20 +380,20 @@ static int join_thread_pool(struct libar2_context *ctx) { struct user_data *data = ctx->user_data; - return await_threads(NULL, 0, data->nthreads, ctx) ? 0 : -1; + if (await_threads(NULL, 0, data->nthreads, ctx)) + return 0; + destroy_thread_pool(ctx); + return -1; } int libar2simplified_hash(void *hash, void *msg, size_t msglen, struct libar2_argon2_parameters *params) { - struct user_data ctx_data; struct libar2_context ctx; memset(&ctx, 0, sizeof(ctx)); - ctx.user_data = &ctx_data; ctx.autoerase_message = 1; - ctx.autoerase_salt = 1; ctx.allocate = allocate; ctx.deallocate = deallocate; ctx.init_thread_pool = init_thread_pool; |