Skip to content

Commit

Permalink
Introduced callback to Pthread, Win32 and OpenMP backend
Browse files Browse the repository at this point in the history
  • Loading branch information
shivammonaka committed Apr 1, 2024
1 parent 87f83eb commit 7061af9
Show file tree
Hide file tree
Showing 8 changed files with 323 additions and 186 deletions.
5 changes: 5 additions & 0 deletions cblas.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ char* openblas_get_config(void);
/*Get the CPU corename on runtime.*/
char* openblas_get_corename(void);

/*Set the threading backend to a custom callback.*/
typedef void (*openblas_dojob_callback)(int thread_num, void *jobdata, int dojob_data);
typedef void (*openblas_threads_callback)(int sync, openblas_dojob_callback dojob, int numjobs, size_t jobdata_elsize, void *jobdata, int dojob_data);
void openblas_set_threads_callback_function(openblas_threads_callback callback);

#ifdef OPENBLAS_OS_LINUX
/* Sets thread affinity for OpenBLAS threads. `thread_idx` is in [0, openblas_get_num_threads()-1]. */
int openblas_setaffinity(int thread_idx, size_t cpusetsize, cpu_set_t* cpu_set);
Expand Down
5 changes: 5 additions & 0 deletions common_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ int BLASFUNC(xerbla)(char *, blasint *info, blasint);

void openblas_set_num_threads_(int *);

/*Set the threading backend to a custom callback.*/
typedef void (*openblas_dojob_callback)(int thread_num, void *jobdata, int dojob_data);
typedef void (*openblas_threads_callback)(int sync, openblas_dojob_callback dojob, int numjobs, size_t jobdata_elsize, void *jobdata, int dojob_data);
extern openblas_threads_callback openblas_threads_callback_;

FLOATRET BLASFUNC(sdot) (blasint *, float *, blasint *, float *, blasint *);
FLOATRET BLASFUNC(sdsdot)(blasint *, float *, float *, blasint *, float *, blasint *);

Expand Down
1 change: 1 addition & 0 deletions driver/others/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ if (USE_THREAD)
${BLAS_SERVER}
divtable.c # TODO: Makefile has -UDOUBLE
blas_l1_thread.c
blas_server_callback.c
)

if (NOT NO_AFFINITY)
Expand Down
5 changes: 4 additions & 1 deletion driver/others/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ COMMONOBJS = memory.$(SUFFIX) xerbla.$(SUFFIX) c_abs.$(SUFFIX) z_abs.$(SUFFIX)
#COMMONOBJS += slamch.$(SUFFIX) slamc3.$(SUFFIX) dlamch.$(SUFFIX) dlamc3.$(SUFFIX)

ifdef SMP
COMMONOBJS += blas_server.$(SUFFIX) divtable.$(SUFFIX) blasL1thread.$(SUFFIX)
COMMONOBJS += blas_server.$(SUFFIX) divtable.$(SUFFIX) blasL1thread.$(SUFFIX) blas_server_callback.$(SUFFIX)
ifneq ($(NO_AFFINITY), 1)
COMMONOBJS += init.$(SUFFIX)
endif
Expand Down Expand Up @@ -140,6 +140,9 @@ memory.$(SUFFIX) : $(MEMORY) ../../common.h ../../param.h
blas_server.$(SUFFIX) : $(BLAS_SERVER) ../../common.h ../../common_thread.h ../../param.h
$(CC) $(CFLAGS) -c $< -o $(@F)

blas_server_callback.$(SUFFIX) : blas_server_callback.c ../../common.h
$(CC) $(CFLAGS) -c $< -o $(@F)

openblas_set_num_threads.$(SUFFIX) : openblas_set_num_threads.c
$(CC) $(CFLAGS) -c $< -o $(@F)

Expand Down
235 changes: 144 additions & 91 deletions driver/others/blas_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ int blas_server_avail __attribute__((aligned(ATTRIBUTE_SIZE))) = 0;

int blas_omp_threads_local = 1;

static void * blas_thread_buffer[MAX_CPU_NUMBER];

/* Local Variables */
#if defined(USE_PTHREAD_LOCK)
static pthread_mutex_t server_lock = PTHREAD_MUTEX_INITIALIZER;
Expand Down Expand Up @@ -190,6 +192,10 @@ static int main_status[MAX_CPU_NUMBER];
BLASLONG exit_time[MAX_CPU_NUMBER];
#endif

//Prototypes
static void exec_threads(int , blas_queue_t *, int);
static void adjust_thread_buffers();

static void legacy_exec(void *func, int mode, blas_arg_t *args, void *sb){

if (!(mode & BLAS_COMPLEX)){
Expand Down Expand Up @@ -375,7 +381,6 @@ static void* blas_thread_server(void *arg){
/* Thread identifier */
BLASLONG cpu = (BLASLONG)arg;
unsigned int last_tick;
void *buffer, *sa, *sb;
blas_queue_t *queue;

blas_queue_t *tscq;
Expand All @@ -395,8 +400,6 @@ blas_queue_t *tscq;
main_status[cpu] = MAIN_ENTER;
#endif

buffer = blas_memory_alloc(2);

#ifdef SMP_DEBUG
fprintf(STDERR, "Server[%2ld] Thread has just been spawned!\n", cpu);
#endif
Expand Down Expand Up @@ -457,92 +460,8 @@ blas_queue_t *tscq;
#endif

if (queue) {
int (*routine)(blas_arg_t *, void *, void *, void *, void *, BLASLONG) = (int (*)(blas_arg_t *, void *, void *, void *, void *, BLASLONG))queue -> routine;

atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)1);

sa = queue -> sa;
sb = queue -> sb;

#ifdef SMP_DEBUG
if (queue -> args) {
fprintf(STDERR, "Server[%2ld] Calculation started. Mode = 0x%03x M = %3ld N=%3ld K=%3ld\n",
cpu, queue->mode, queue-> args ->m, queue->args->n, queue->args->k);
}
#endif

#ifdef CONSISTENT_FPCSR
#ifdef __aarch64__
__asm__ __volatile__ ("msr fpcr, %0" : : "r" (queue -> sse_mode));
#else
__asm__ __volatile__ ("ldmxcsr %0" : : "m" (queue -> sse_mode));
__asm__ __volatile__ ("fldcw %0" : : "m" (queue -> x87_mode));
#endif
#endif

#ifdef MONITOR
main_status[cpu] = MAIN_RUNNING1;
#endif

if (sa == NULL) sa = (void *)((BLASLONG)buffer + GEMM_OFFSET_A);

if (sb == NULL) {
if (!(queue -> mode & BLAS_COMPLEX)){
#ifdef EXPRECISION
if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE){
sb = (void *)(((BLASLONG)sa + ((QGEMM_P * QGEMM_Q * sizeof(xdouble)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
} else
#endif
if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE) {
#ifdef BUILD_DOUBLE
sb = (void *)(((BLASLONG)sa + ((DGEMM_P * DGEMM_Q * sizeof(double)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
} else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) {
#ifdef BUILD_SINGLE
sb = (void *)(((BLASLONG)sa + ((SGEMM_P * SGEMM_Q * sizeof(float)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
} else {
/* Other types in future */
}
} else {
#ifdef EXPRECISION
if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE){
sb = (void *)(((BLASLONG)sa + ((XGEMM_P * XGEMM_Q * 2 * sizeof(xdouble)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
} else
#endif
if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE){
#ifdef BUILD_COMPLEX16
sb = (void *)(((BLASLONG)sa + ((ZGEMM_P * ZGEMM_Q * 2 * sizeof(double)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
} else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) {
#ifdef BUILD_COMPLEX
sb = (void *)(((BLASLONG)sa + ((CGEMM_P * CGEMM_Q * 2 * sizeof(float)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
} else {
/* Other types in future */
}
}
queue->sb=sb;
}

#ifdef MONITOR
main_status[cpu] = MAIN_RUNNING2;
#endif

if (queue -> mode & BLAS_LEGACY) {
legacy_exec(routine, queue -> mode, queue -> args, sb);
} else
if (queue -> mode & BLAS_PTHREAD) {
void (*pthreadcompat)(void *) = (void(*)(void*))queue -> routine;
(pthreadcompat)(queue -> args);
} else
(routine)(queue -> args, queue -> range_m, queue -> range_n, sa, sb, queue -> position);
exec_threads(cpu, queue, 0);

#ifdef SMP_DEBUG
fprintf(STDERR, "Server[%2ld] Calculation finished!\n", cpu);
Expand All @@ -557,7 +476,7 @@ blas_queue_t *tscq;
MB;
atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)0);


}

#ifdef MONITOR
Expand All @@ -580,8 +499,6 @@ blas_queue_t *tscq;
fprintf(STDERR, "Server[%2ld] Shutdown!\n", cpu);
#endif

blas_memory_free(buffer);

//pthread_exit(NULL);

return NULL;
Expand Down Expand Up @@ -663,6 +580,9 @@ int blas_thread_init(void){

LOCK_COMMAND(&server_lock);

// Adjust thread buffers
adjust_thread_buffers();

if (!blas_server_avail){

thread_timeout_env=openblas_thread_timeout();
Expand Down Expand Up @@ -893,6 +813,18 @@ int exec_blas(BLASLONG num, blas_queue_t *queue){
fprintf(STDERR, "Exec_blas is called. Number of executing threads : %ld\n", num);
#endif

//Redirect to caller's callback routine
if (openblas_threads_callback_) {
int buf_index = 0, i = 0;
#ifndef USE_SIMPLE_THREADED_LEVEL3
for (i = 0; i < num; i ++)
queue[i].position = i;
#endif
openblas_threads_callback_(1, (openblas_dojob_callback) exec_threads, num, sizeof(blas_queue_t), (void*) queue, buf_index);
return 0;
}


#ifdef __ELF__
if (omp_in_parallel && (num > 1)) {
if (omp_in_parallel() > 0) {
Expand Down Expand Up @@ -1066,6 +998,14 @@ int BLASFUNC(blas_thread_shutdown)(void){

LOCK_COMMAND(&server_lock);

//Free buffers allocated for threads
for(i=0; i<MAX_CPU_NUMBER; i++){
if(blas_thread_buffer[i]!=NULL){
blas_memory_free(blas_thread_buffer[i]);
blas_thread_buffer[i]=NULL;
}
}

if (blas_server_avail) {

for (i = 0; i < blas_num_threads - 1; i++) {
Expand Down Expand Up @@ -1102,5 +1042,118 @@ int BLASFUNC(blas_thread_shutdown)(void){
return 0;
}

static void adjust_thread_buffers() {

int i=0;

//adjust buffer for each thread
for(i=0; i < blas_cpu_number; i++){
if(blas_thread_buffer[i] == NULL){
blas_thread_buffer[i] = blas_memory_alloc(2);
}
}
for(; i < MAX_CPU_NUMBER; i++){
if(blas_thread_buffer[i] != NULL){
blas_memory_free(blas_thread_buffer[i]);
blas_thread_buffer[i] = NULL;
}
}
}

static void exec_threads(int cpu, blas_queue_t *queue, int buf_index)
{

void *buffer, *sa, *sb;

buffer = blas_thread_buffer[cpu];

int (*routine)(blas_arg_t *, void *, void *, void *, void *, BLASLONG) = (int (*)(blas_arg_t *, void *, void *, void *, void *, BLASLONG))queue -> routine;

atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)1);

sa = queue -> sa;
sb = queue -> sb;

#ifdef SMP_DEBUG
if (queue -> args) {
fprintf(STDERR, "Server[%2ld] Calculation started. Mode = 0x%03x M = %3ld N=%3ld K=%3ld\n",
cpu, queue->mode, queue-> args ->m, queue->args->n, queue->args->k);
}
#endif

#ifdef CONSISTENT_FPCSR
#ifdef __aarch64__
__asm__ __volatile__ ("msr fpcr, %0" : : "r" (queue -> sse_mode));
#else
__asm__ __volatile__ ("ldmxcsr %0" : : "m" (queue -> sse_mode));
__asm__ __volatile__ ("fldcw %0" : : "m" (queue -> x87_mode));
#endif
#endif

#ifdef MONITOR
main_status[cpu] = MAIN_RUNNING1;
#endif

if (sa == NULL) sa = (void *)((BLASLONG)buffer + GEMM_OFFSET_A);

if (sb == NULL) {
if (!(queue -> mode & BLAS_COMPLEX)){
#ifdef EXPRECISION
if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE){
sb = (void *)(((BLASLONG)sa + ((QGEMM_P * QGEMM_Q * sizeof(xdouble)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
} else
#endif
if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE) {
#ifdef BUILD_DOUBLE
sb = (void *)(((BLASLONG)sa + ((DGEMM_P * DGEMM_Q * sizeof(double)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
} else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) {
#ifdef BUILD_SINGLE
sb = (void *)(((BLASLONG)sa + ((SGEMM_P * SGEMM_Q * sizeof(float)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
} else {
/* Other types in future */
}
} else {
#ifdef EXPRECISION
if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE){
sb = (void *)(((BLASLONG)sa + ((XGEMM_P * XGEMM_Q * 2 * sizeof(xdouble)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
} else
#endif
if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE){
#ifdef BUILD_COMPLEX16
sb = (void *)(((BLASLONG)sa + ((ZGEMM_P * ZGEMM_Q * 2 * sizeof(double)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
} else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) {
#ifdef BUILD_COMPLEX
sb = (void *)(((BLASLONG)sa + ((CGEMM_P * CGEMM_Q * 2 * sizeof(float)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
} else {
/* Other types in future */
}
}
queue->sb=sb;
}

#ifdef MONITOR
main_status[cpu] = MAIN_RUNNING2;
#endif

if (queue -> mode & BLAS_LEGACY) {
legacy_exec(routine, queue -> mode, queue -> args, sb);
} else
if (queue -> mode & BLAS_PTHREAD) {
void (*pthreadcompat)(void *) = (void(*)(void*))queue -> routine;
(pthreadcompat)(queue -> args);
} else
(routine)(queue -> args, queue -> range_m, queue -> range_n, sa, sb, queue -> position);

}

#endif
12 changes: 12 additions & 0 deletions driver/others/blas_server_callback.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#include "common.h"

/* global variable to change threading backend from openblas-managed to caller-managed */
openblas_threads_callback openblas_threads_callback_ = 0;

/* non-threadsafe function should be called before any other
openblas function to change how threads are managed */

void openblas_set_threads_callback_function(openblas_threads_callback callback)
{
openblas_threads_callback_ = callback;
}
Loading

0 comments on commit 7061af9

Please sign in to comment.