diff --git a/src/include/abt.h.in b/src/include/abt.h.in index 4207f479..500e46ce 100644 --- a/src/include/abt.h.in +++ b/src/include/abt.h.in @@ -522,7 +522,31 @@ enum ABT_pool_kind { * * If the user does not know how \c ABT_POOL_FIFO_WAIT works, * \c ABT_POOL_FIFO is recommended. */ - ABT_POOL_FIFO_WAIT + ABT_POOL_FIFO_WAIT, + /** + * Random work-stealing pool which is often adopted by fine-grained task + * parallel runtime systems such as Cilk. + * + * Create push Non-create push (e.g., yield/resume) + * \ / + * (head) <- <- <- (tail) + * / \ + * Local pop Remote pop + * + * Specifically, if one of the following flags is set to ABT_pool_context, a + * work unit is pushed to the head. + * - ABT_POOL_CONTEXT_OP_THREAD_CREATE + * - ABT_POOL_CONTEXT_OP_THREAD_CREATE_TO + * - ABT_POOL_CONTEXT_OP_THREAD_REVIVE + * - ABT_POOL_CONTEXT_OP_THREAD_REVIVE_TO + * Otherwise, a work unit is pushed to the tail. + * + * If ABT_POOL_CONTEXT_OWNER_SECONDARY is set to ABT_pool_context, a work + * unit is popped from the tail. Otherwise, a work unit is popped from the + * head. + * + * The user is recommended to use this pool with ABT_SCHED_RANDWS. */ + ABT_POOL_RANDWS }; /** diff --git a/src/include/abti.h b/src/include/abti.h index 0c5b45ea..428c8c3f 100644 --- a/src/include/abti.h +++ b/src/include/abti.h @@ -609,6 +609,11 @@ ABTI_pool_get_fifo_wait_def(ABT_pool_access access, ABTI_pool_required_def *p_required_def, ABTI_pool_optional_def *p_optional_def, ABTI_pool_deprecated_def *p_deprecated_def); +ABTU_ret_err int +ABTI_pool_get_randws_def(ABT_pool_access access, + ABTI_pool_required_def *p_required_def, + ABTI_pool_optional_def *p_optional_def, + ABTI_pool_deprecated_def *p_deprecated_def); void ABTI_pool_print(ABTI_pool *p_pool, FILE *p_os, int indent); void ABTI_pool_reset_id(void); diff --git a/src/pool/Makefile.mk b/src/pool/Makefile.mk index 96b09e97..f421c916 100644 --- a/src/pool/Makefile.mk +++ b/src/pool/Makefile.mk @@ -9,4 +9,5 @@ abt_sources += \ pool/pool.c \ pool/pool_config.c \ pool/pool_user_def.c \ + pool/randws.c \ pool/thread_queue.h diff --git a/src/pool/pool.c b/src/pool/pool.c index 6e9f0cc1..2e5b7309 100644 --- a/src/pool/pool.c +++ b/src/pool/pool.c @@ -1411,6 +1411,11 @@ ABTU_ret_err int ABTI_pool_create_basic(ABT_pool_kind kind, ABTI_pool_get_fifo_wait_def(access, &required_def, &optional_def, &deprecated_def); break; + case ABT_POOL_RANDWS: + abt_errno = + ABTI_pool_get_randws_def(access, &required_def, &optional_def, + &deprecated_def); + break; default: abt_errno = ABT_ERR_INV_POOL_KIND; break; diff --git a/src/pool/randws.c b/src/pool/randws.c new file mode 100644 index 00000000..0a3116d4 --- /dev/null +++ b/src/pool/randws.c @@ -0,0 +1,415 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */ +/* + * See COPYRIGHT in top-level directory. + */ + +#include "abti.h" +#include "thread_queue.h" +#include + +/* RANDWS pool implementation */ + +static int pool_init(ABT_pool pool, ABT_pool_config config); +static void pool_free(ABT_pool pool); +static ABT_bool pool_is_empty(ABT_pool pool); +static size_t pool_get_size(ABT_pool pool); +static void pool_push_shared(ABT_pool pool, ABT_unit unit, + ABT_pool_context context); +static void pool_push_private(ABT_pool pool, ABT_unit unit, + ABT_pool_context context); +static ABT_thread pool_pop_shared(ABT_pool pool, ABT_pool_context context); +static ABT_thread pool_pop_private(ABT_pool pool, ABT_pool_context context); +static ABT_thread pool_pop_wait(ABT_pool pool, double time_secs, + ABT_pool_context context); +static void pool_push_many_shared(ABT_pool pool, const ABT_unit *units, + size_t num_units, ABT_pool_context context); +static void pool_push_many_private(ABT_pool pool, const ABT_unit *units, + size_t num_units, ABT_pool_context context); +static void pool_pop_many_shared(ABT_pool pool, ABT_thread *threads, + size_t max_threads, size_t *num_popped, + ABT_pool_context context); +static void pool_pop_many_private(ABT_pool pool, ABT_thread *threads, + size_t max_threads, size_t *num_popped, + ABT_pool_context context); +static void pool_print_all(ABT_pool pool, void *arg, + void (*print_fn)(void *, ABT_thread)); +static ABT_unit pool_create_unit(ABT_pool pool, ABT_thread thread); +static void pool_free_unit(ABT_pool pool, ABT_unit unit); + +/* For backward compatibility */ +static int pool_remove_shared(ABT_pool pool, ABT_unit unit); +static int pool_remove_private(ABT_pool pool, ABT_unit unit); +static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs); +static ABT_bool pool_unit_is_in_pool(ABT_unit unit); + +#define POOL_CONTEXT_PUSH_HEAD \ + (ABT_POOL_CONTEXT_OP_THREAD_CREATE | \ + ABT_POOL_CONTEXT_OP_THREAD_CREATE_TO | \ + ABT_POOL_CONTEXT_OP_THREAD_REVIVE | ABT_POOL_CONTEXT_OP_THREAD_REVIVE_TO) +#define POOL_CONTEXT_POP_TAIL (ABT_POOL_CONTEXT_OWNER_SECONDARY) + +struct data { + ABTD_spinlock mutex; + thread_queue_t queue; +}; +typedef struct data data_t; + +static inline data_t *pool_get_data_ptr(void *p_data) +{ + return (data_t *)p_data; +} + +/* Obtain the RANDWS pool definition according to the access type */ +ABTU_ret_err int +ABTI_pool_get_randws_def(ABT_pool_access access, + ABTI_pool_required_def *p_required_def, + ABTI_pool_optional_def *p_optional_def, + ABTI_pool_deprecated_def *p_deprecated_def) +{ + /* Definitions according to the access type */ + /* FIXME: need better implementation, e.g., lock-free one */ + switch (access) { + case ABT_POOL_ACCESS_PRIV: + p_required_def->p_push = pool_push_private; + p_required_def->p_pop = pool_pop_private; + p_optional_def->p_push_many = pool_push_many_private; + p_optional_def->p_pop_many = pool_pop_many_private; + p_deprecated_def->p_remove = pool_remove_private; + break; + + case ABT_POOL_ACCESS_SPSC: + case ABT_POOL_ACCESS_MPSC: + case ABT_POOL_ACCESS_SPMC: + case ABT_POOL_ACCESS_MPMC: + p_required_def->p_push = pool_push_shared; + p_required_def->p_pop = pool_pop_shared; + p_optional_def->p_push_many = pool_push_many_shared; + p_optional_def->p_pop_many = pool_pop_many_shared; + p_deprecated_def->p_remove = pool_remove_shared; + break; + + default: + ABTI_HANDLE_ERROR(ABT_ERR_INV_POOL_ACCESS); + } + + /* Common definitions regardless of the access type */ + p_optional_def->p_init = pool_init; + p_optional_def->p_free = pool_free; + p_required_def->p_is_empty = pool_is_empty; + p_optional_def->p_get_size = pool_get_size; + p_optional_def->p_pop_wait = pool_pop_wait; + p_optional_def->p_print_all = pool_print_all; + p_required_def->p_create_unit = pool_create_unit; + p_required_def->p_free_unit = pool_free_unit; + + p_deprecated_def->p_pop_timedwait = pool_pop_timedwait; + p_deprecated_def->u_is_in_pool = pool_unit_is_in_pool; + return ABT_SUCCESS; +} + +/* Pool functions */ + +static int pool_init(ABT_pool pool, ABT_pool_config config) +{ + ABTI_UNUSED(config); + int abt_errno = ABT_SUCCESS; + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + ABT_pool_access access; + + data_t *p_data; + abt_errno = ABTU_malloc(sizeof(data_t), (void **)&p_data); + ABTI_CHECK_ERROR(abt_errno); + + access = p_pool->access; + if (access != ABT_POOL_ACCESS_PRIV) { + /* Initialize the mutex */ + ABTD_spinlock_clear(&p_data->mutex); + } + thread_queue_init(&p_data->queue); + + p_pool->data = p_data; + return abt_errno; +} + +static void pool_free(ABT_pool pool) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + data_t *p_data = pool_get_data_ptr(p_pool->data); + thread_queue_free(&p_data->queue); + ABTU_free(p_data); +} + +static ABT_bool pool_is_empty(ABT_pool pool) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + data_t *p_data = pool_get_data_ptr(p_pool->data); + return thread_queue_is_empty(&p_data->queue); +} + +static size_t pool_get_size(ABT_pool pool) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + data_t *p_data = pool_get_data_ptr(p_pool->data); + return thread_queue_get_size(&p_data->queue); +} + +static void pool_push_shared(ABT_pool pool, ABT_unit unit, + ABT_pool_context context) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + data_t *p_data = pool_get_data_ptr(p_pool->data); + ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit); + ABTD_spinlock_acquire(&p_data->mutex); + if (context & POOL_CONTEXT_PUSH_HEAD) { + thread_queue_push_head(&p_data->queue, p_thread); + } else { + thread_queue_push_tail(&p_data->queue, p_thread); + } + ABTD_spinlock_release(&p_data->mutex); +} + +static void pool_push_private(ABT_pool pool, ABT_unit unit, + ABT_pool_context context) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + data_t *p_data = pool_get_data_ptr(p_pool->data); + ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit); + if (context & POOL_CONTEXT_PUSH_HEAD) { + thread_queue_push_head(&p_data->queue, p_thread); + } else { + thread_queue_push_tail(&p_data->queue, p_thread); + } +} + +static void pool_push_many_shared(ABT_pool pool, const ABT_unit *units, + size_t num_units, ABT_pool_context context) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + data_t *p_data = pool_get_data_ptr(p_pool->data); + if (num_units > 0) { + ABTD_spinlock_acquire(&p_data->mutex); + size_t i; + for (i = 0; i < num_units; i++) { + ABTI_thread *p_thread = + ABTI_unit_get_thread_from_builtin_unit(units[i]); + if (context & POOL_CONTEXT_PUSH_HEAD) { + thread_queue_push_head(&p_data->queue, p_thread); + } else { + thread_queue_push_tail(&p_data->queue, p_thread); + } + } + ABTD_spinlock_release(&p_data->mutex); + } +} + +static void pool_push_many_private(ABT_pool pool, const ABT_unit *units, + size_t num_units, ABT_pool_context context) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + data_t *p_data = pool_get_data_ptr(p_pool->data); + size_t i; + for (i = 0; i < num_units; i++) { + ABTI_thread *p_thread = + ABTI_unit_get_thread_from_builtin_unit(units[i]); + if (context & POOL_CONTEXT_PUSH_HEAD) { + thread_queue_push_head(&p_data->queue, p_thread); + } else { + thread_queue_push_tail(&p_data->queue, p_thread); + } + } +} + +static ABT_thread pool_pop_wait(ABT_pool pool, double time_secs, + ABT_pool_context context) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + data_t *p_data = pool_get_data_ptr(p_pool->data); + double time_start = 0.0; + while (1) { + if (thread_queue_acquire_spinlock_if_not_empty(&p_data->queue, + &p_data->mutex) == 0) { + ABTI_thread *p_thread; + if (context & POOL_CONTEXT_POP_TAIL) { + p_thread = thread_queue_pop_tail(&p_data->queue); + } else { + p_thread = thread_queue_pop_head(&p_data->queue); + } + ABTD_spinlock_release(&p_data->mutex); + if (p_thread) + return ABTI_thread_get_handle(p_thread); + } + if (time_start == 0.0) { + time_start = ABTI_get_wtime(); + } else { + double elapsed = ABTI_get_wtime() - time_start; + if (elapsed > time_secs) + return ABT_THREAD_NULL; + } + /* Sleep. */ + const int sleep_nsecs = 100; + struct timespec ts = { 0, sleep_nsecs }; + nanosleep(&ts, NULL); + } +} + +static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + data_t *p_data = pool_get_data_ptr(p_pool->data); + while (1) { + if (thread_queue_acquire_spinlock_if_not_empty(&p_data->queue, + &p_data->mutex) == 0) { + ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue); + ABTD_spinlock_release(&p_data->mutex); + if (p_thread) { + return ABTI_unit_get_builtin_unit(p_thread); + } + } + const int sleep_nsecs = 100; + struct timespec ts = { 0, sleep_nsecs }; + nanosleep(&ts, NULL); + + if (ABTI_get_wtime() > abstime_secs) + return ABT_UNIT_NULL; + } +} + +static ABT_thread pool_pop_shared(ABT_pool pool, ABT_pool_context context) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + data_t *p_data = pool_get_data_ptr(p_pool->data); + if (thread_queue_acquire_spinlock_if_not_empty(&p_data->queue, + &p_data->mutex) == 0) { + ABTI_thread *p_thread; + if (context & POOL_CONTEXT_POP_TAIL) { + p_thread = thread_queue_pop_tail(&p_data->queue); + } else { + p_thread = thread_queue_pop_head(&p_data->queue); + } + ABTD_spinlock_release(&p_data->mutex); + return ABTI_thread_get_handle(p_thread); + } else { + return ABT_THREAD_NULL; + } +} + +static ABT_thread pool_pop_private(ABT_pool pool, ABT_pool_context context) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + data_t *p_data = pool_get_data_ptr(p_pool->data); + ABTI_thread *p_thread; + if (context & POOL_CONTEXT_POP_TAIL) { + p_thread = thread_queue_pop_tail(&p_data->queue); + } else { + p_thread = thread_queue_pop_head(&p_data->queue); + } + return ABTI_thread_get_handle(p_thread); +} + +static void pool_pop_many_shared(ABT_pool pool, ABT_thread *threads, + size_t max_threads, size_t *num_popped, + ABT_pool_context context) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + data_t *p_data = pool_get_data_ptr(p_pool->data); + if (max_threads != 0 && + thread_queue_acquire_spinlock_if_not_empty(&p_data->queue, + &p_data->mutex) == 0) { + size_t i; + for (i = 0; i < max_threads; i++) { + ABTI_thread *p_thread; + if (context & POOL_CONTEXT_POP_TAIL) { + p_thread = thread_queue_pop_tail(&p_data->queue); + } else { + p_thread = thread_queue_pop_head(&p_data->queue); + } + if (!p_thread) + break; + threads[i] = ABTI_thread_get_handle(p_thread); + } + *num_popped = i; + ABTD_spinlock_release(&p_data->mutex); + } else { + *num_popped = 0; + } +} + +static void pool_pop_many_private(ABT_pool pool, ABT_thread *threads, + size_t max_threads, size_t *num_popped, + ABT_pool_context context) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + data_t *p_data = pool_get_data_ptr(p_pool->data); + size_t i; + for (i = 0; i < max_threads; i++) { + ABTI_thread *p_thread; + if (context & POOL_CONTEXT_POP_TAIL) { + p_thread = thread_queue_pop_tail(&p_data->queue); + } else { + p_thread = thread_queue_pop_head(&p_data->queue); + } + if (!p_thread) + break; + threads[i] = ABTI_thread_get_handle(p_thread); + } + *num_popped = i; +} + +static int pool_remove_shared(ABT_pool pool, ABT_unit unit) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + data_t *p_data = pool_get_data_ptr(p_pool->data); + ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit); + ABTD_spinlock_acquire(&p_data->mutex); + int abt_errno = thread_queue_remove(&p_data->queue, p_thread); + ABTD_spinlock_release(&p_data->mutex); + return abt_errno; +} + +static int pool_remove_private(ABT_pool pool, ABT_unit unit) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + data_t *p_data = pool_get_data_ptr(p_pool->data); + ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit); + return thread_queue_remove(&p_data->queue, p_thread); +} + +static void pool_print_all(ABT_pool pool, void *arg, + void (*print_fn)(void *, ABT_thread)) +{ + ABT_pool_access access; + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + data_t *p_data = pool_get_data_ptr(p_pool->data); + + access = p_pool->access; + if (access != ABT_POOL_ACCESS_PRIV) { + ABTD_spinlock_acquire(&p_data->mutex); + } + thread_queue_print_all(&p_data->queue, arg, print_fn); + if (access != ABT_POOL_ACCESS_PRIV) { + ABTD_spinlock_release(&p_data->mutex); + } +} + +/* Unit functions */ + +static ABT_bool pool_unit_is_in_pool(ABT_unit unit) +{ + ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit); + return ABTD_atomic_acquire_load_int(&p_thread->is_in_pool) ? ABT_TRUE + : ABT_FALSE; +} + +static ABT_unit pool_create_unit(ABT_pool pool, ABT_thread thread) +{ + /* Call ABTI_unit_init_builtin() instead. */ + ABTI_ASSERT(0); + return ABT_UNIT_NULL; +} + +static void pool_free_unit(ABT_pool pool, ABT_unit unit) +{ + /* A built-in unit does not need to be freed. This function may not be + * called. */ + ABTI_ASSERT(0); +} diff --git a/src/pool/thread_queue.h b/src/pool/thread_queue.h index bfa1d829..1e0b54c5 100644 --- a/src/pool/thread_queue.h +++ b/src/pool/thread_queue.h @@ -66,6 +66,29 @@ static inline size_t thread_queue_get_size(const thread_queue_t *p_queue) return p_queue->num_threads; } +static inline void thread_queue_push_head(thread_queue_t *p_queue, + ABTI_thread *p_thread) +{ + if (p_queue->num_threads == 0) { + p_thread->p_prev = p_thread; + p_thread->p_next = p_thread; + p_queue->p_head = p_thread; + p_queue->p_tail = p_thread; + p_queue->num_threads = 1; + ABTD_atomic_release_store_int(&p_queue->is_empty, 0); + } else { + ABTI_thread *p_head = p_queue->p_head; + ABTI_thread *p_tail = p_queue->p_tail; + p_tail->p_next = p_thread; + p_head->p_prev = p_thread; + p_thread->p_prev = p_tail; + p_thread->p_next = p_head; + p_queue->p_head = p_thread; + p_queue->num_threads++; + } + ABTD_atomic_release_store_int(&p_thread->is_in_pool, 1); +} + static inline void thread_queue_push_tail(thread_queue_t *p_queue, ABTI_thread *p_thread) { @@ -114,6 +137,31 @@ static inline ABTI_thread *thread_queue_pop_head(thread_queue_t *p_queue) } } +static inline ABTI_thread *thread_queue_pop_tail(thread_queue_t *p_queue) +{ + if (p_queue->num_threads > 0) { + ABTI_thread *p_thread = p_queue->p_tail; + if (p_queue->num_threads == 1) { + p_queue->p_head = NULL; + p_queue->p_tail = NULL; + p_queue->num_threads = 0; + ABTD_atomic_release_store_int(&p_queue->is_empty, 1); + } else { + p_thread->p_prev->p_next = p_thread->p_next; + p_thread->p_next->p_prev = p_thread->p_prev; + p_queue->p_tail = p_thread->p_prev; + p_queue->num_threads--; + } + + p_thread->p_prev = NULL; + p_thread->p_next = NULL; + ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0); + return p_thread; + } else { + return NULL; + } +} + ABTU_ret_err static inline int thread_queue_remove(thread_queue_t *p_queue, ABTI_thread *p_thread) {