Blame src/izem/src/cond/zm_wskip.c

Packit Service c5cf8c
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
Packit Service c5cf8c
/*
Packit Service c5cf8c
 * See COPYRIGHT in top-level directory.
Packit Service c5cf8c
 */
Packit Service c5cf8c
Packit Service c5cf8c
#include <stdlib.h>
Packit Service c5cf8c
#include <hwloc.h>
Packit Service c5cf8c
#include "lock/zm_mcs.h"
Packit Service c5cf8c
#include "cond/zm_wskip.h"
Packit Service c5cf8c
Packit Service c5cf8c
/*
 * Values stored in a queue node's `status` field, as used in this file:
 *
 *   ZM_WAIT    - written by enq() into the caller's node; wake() tries to
 *                move a successor from WAIT to WAKE, and skip() tries to
 *                move a node from WAIT to SKIP.
 *   ZM_WAKE    - written by enq() when the queue was empty, and by wake()
 *                on the successor it releases; wake() only acts on a node
 *                whose status is WAKE.
 *   ZM_SKIP    - written by skip() when the node was in WAIT.
 *   ZM_RECYCLE - initial status of every node (see new_wskip()); written by
 *                wake() on its own node and on the nodes it walked past.
 *                enq() links a node into the queue only when its previous
 *                status was RECYCLE.
 *   ZM_CHECK   - written by wake() on a successor that was not in WAIT,
 *                before that node is later set to RECYCLE; an enq() that
 *                observes CHECK spins until it reads RECYCLE.
 */
#define ZM_WAIT 0
Packit Service c5cf8c
#define ZM_WAKE 1
Packit Service c5cf8c
#define ZM_SKIP 2
Packit Service c5cf8c
#define ZM_RECYCLE 3
Packit Service c5cf8c
#define ZM_CHECK 4
Packit Service c5cf8c
Packit Service c5cf8c
/*
 * Shared state of one wait/skip object. Allocated and initialised by
 * new_wskip(); its node array and topology are released by free_wskip().
 */
struct zm_mcs {
Packit Service c5cf8c
    zm_atomic_ptr_t lock; /* queue tail: enq() swaps its node in, wake() resets it to ZM_NULL */
Packit Service c5cf8c
    struct zm_mcs_qnode *local_nodes; /* one node per hwloc PU, indexed by `tid` in wskip_wait() */
Packit Service c5cf8c
    hwloc_topology_t topo; /* topology initialised and loaded in new_wskip() */
Packit Service c5cf8c
};
Packit Service c5cf8c
Packit Service c5cf8c
/* Index of the caller's node in local_nodes[]; stays -1 until the first
 * wskip_wait() call sets it from get_hwthread_id().
 * NOTE(review): zm_thread_local is defined elsewhere; presumably it makes
 * this variable per-thread -- confirm. */
static zm_thread_local int tid = -1;
Packit Service c5cf8c
Packit Service c5cf8c
/* Check the actual affinity mask assigned to the thread */
Packit Service c5cf8c
static inline void check_affinity(hwloc_topology_t topo) {
Packit Service c5cf8c
    hwloc_cpuset_t cpuset = hwloc_bitmap_alloc();
Packit Service c5cf8c
    int set_length;
Packit Service c5cf8c
    hwloc_get_cpubind(topo, cpuset, HWLOC_CPUBIND_THREAD);
Packit Service c5cf8c
    set_length = hwloc_get_nbobjs_inside_cpuset_by_type(topo, cpuset, HWLOC_OBJ_PU);
Packit Service c5cf8c
    hwloc_bitmap_free(cpuset);
Packit Service c5cf8c
Packit Service c5cf8c
    if(set_length != 1) {
Packit Service c5cf8c
        printf("IZEM:WSKIP:ERROR: thread bound to more than one HW thread!\n");
Packit Service c5cf8c
        exit(EXIT_FAILURE);
Packit Service c5cf8c
    }
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
/*
 * Return the hwloc logical index of the first PU (hardware thread) inside
 * the calling thread's CPU binding.
 *
 * topo: loaded hwloc topology used for the query.
 *
 * The process is terminated with EXIT_FAILURE when the binding cannot be
 * queried or contains no PU, instead of dereferencing a NULL object.
 */
static inline int get_hwthread_id(hwloc_topology_t topo){
    hwloc_cpuset_t cpuset = hwloc_bitmap_alloc();
    hwloc_obj_t obj = NULL;

    if (cpuset != NULL) {
        if (hwloc_get_cpubind(topo, cpuset, HWLOC_CPUBIND_THREAD) == 0)
            obj = hwloc_get_obj_inside_cpuset_by_type(topo, cpuset, HWLOC_OBJ_PU, 0);
        hwloc_bitmap_free(cpuset);
    }

    if (obj == NULL) {
        fprintf(stderr, "IZEM:WSKIP:ERROR: unable to determine the HW thread of the calling thread!\n");
        exit(EXIT_FAILURE);
    }

    return (int) obj->logical_index;
}
Packit Service c5cf8c
Packit Service c5cf8c
static void* new_wskip() {
Packit Service c5cf8c
    int max_threads;
Packit Service c5cf8c
    struct zm_mcs_qnode *qnodes;
Packit Service c5cf8c
Packit Service c5cf8c
Packit Service c5cf8c
    struct zm_mcs *L;
Packit Service c5cf8c
    posix_memalign((void **) &L, ZM_CACHELINE_SIZE, sizeof(struct zm_mcs));
Packit Service c5cf8c
Packit Service c5cf8c
    hwloc_topology_init(&L->topo);
Packit Service c5cf8c
    hwloc_topology_load(L->topo);
Packit Service c5cf8c
Packit Service c5cf8c
    max_threads = hwloc_get_nbobjs_by_type(L->topo, HWLOC_OBJ_PU);
Packit Service c5cf8c
Packit Service c5cf8c
    posix_memalign((void **) &qnodes, ZM_CACHELINE_SIZE, sizeof(struct zm_mcs_qnode) * max_threads);
Packit Service c5cf8c
    for (int i = 0; i < max_threads; i ++)
Packit Service c5cf8c
        zm_atomic_store(&qnodes[i].status, ZM_RECYCLE, zm_memord_release);
Packit Service c5cf8c
Packit Service c5cf8c
    zm_atomic_store(&L->lock, (zm_ptr_t)ZM_NULL, zm_memord_release);
Packit Service c5cf8c
    L->local_nodes = qnodes;
Packit Service c5cf8c
Packit Service c5cf8c
    return L;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
/* This routine is just to insert myself into the queue and block
Packit Service c5cf8c
 * whoever comes after me. */
Packit Service c5cf8c
static inline int enq(struct zm_mcs *L, zm_mcs_qnode_t* I, int *wait) {
Packit Service c5cf8c
    *wait = 1;
Packit Service c5cf8c
    zm_mcs_qnode_t* pred;
Packit Service c5cf8c
    int status = zm_atomic_exchange_int(&I->status, ZM_WAIT, zm_memord_acq_rel);
Packit Service c5cf8c
    /* wake() passed this node and is in the processs of setting it to RECYCLE */
Packit Service c5cf8c
    if(status == ZM_CHECK) {
Packit Service c5cf8c
        while (status != ZM_RECYCLE)
Packit Service c5cf8c
            status = zm_atomic_load(&I->status, zm_memord_acquire); /* wait */
Packit Service c5cf8c
        zm_atomic_store(&I->status, ZM_WAIT, zm_memord_release);
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
    if (status == ZM_RECYCLE) {
Packit Service c5cf8c
        zm_atomic_store(&I->next, ZM_NULL, zm_memord_release);
Packit Service c5cf8c
        pred = (zm_mcs_qnode_t*)zm_atomic_exchange_ptr(&L->lock, (zm_ptr_t)I, zm_memord_acq_rel);
Packit Service c5cf8c
        if((zm_ptr_t)pred == ZM_NULL) {
Packit Service c5cf8c
            zm_atomic_store(&I->status, ZM_WAKE, zm_memord_release);
Packit Service c5cf8c
            *wait = 0;
Packit Service c5cf8c
            return 0;
Packit Service c5cf8c
        }
Packit Service c5cf8c
        zm_atomic_store(&pred->next, (zm_ptr_t)I, zm_memord_release);
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
    return 0;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
/* Main routines */
Packit Service c5cf8c
/*
 * Enqueue node I on L and, if enq() asks for it, spin until the node's
 * status reads ZM_WAKE or ZM_RECYCLE. Always returns 0.
 */
static inline int wait(struct zm_mcs *L, zm_mcs_qnode_t* I) {
    int must_spin = 0;

    /* First, insert the qnode into the queue */
    enq(L, I, &must_spin);
    if (!must_spin)
        return 0;

    /* wait in line */
    for (;;) {
        if (zm_atomic_load(&I->status, zm_memord_acquire) == ZM_WAKE)
            break;
        if (zm_atomic_load(&I->status, zm_memord_acquire) == ZM_RECYCLE)
            break;
    }

    return 0;
}
Packit Service c5cf8c
Packit Service c5cf8c
/* Release the lock */
Packit Service c5cf8c
static inline int wake(struct zm_mcs *L, zm_mcs_qnode_t *I) {
Packit Service c5cf8c
Packit Service c5cf8c
    int status = ZM_WAKE;
Packit Service c5cf8c
    if(!zm_atomic_compare_exchange_strong(&I->status,
Packit Service c5cf8c
                                         &status,
Packit Service c5cf8c
                                         ZM_RECYCLE,
Packit Service c5cf8c
                                         zm_memord_acq_rel,
Packit Service c5cf8c
                                         zm_memord_acquire))
Packit Service c5cf8c
        return 0;
Packit Service c5cf8c
Packit Service c5cf8c
    zm_mcs_qnode_t *cur_node = I;
Packit Service c5cf8c
    zm_mcs_qnode_t *next = (zm_mcs_qnode_t*)zm_atomic_load(&cur_node->next, zm_memord_acquire);
Packit Service c5cf8c
    zm_mcs_qnode_t *pred = NULL;
Packit Service c5cf8c
    /* traverse queue until end or encountering a node that wasn't skipped */
Packit Service c5cf8c
    while ((zm_ptr_t)next != ZM_NULL) {
Packit Service c5cf8c
        int status = ZM_WAIT;
Packit Service c5cf8c
        if(zm_atomic_compare_exchange_strong(&next->status,
Packit Service c5cf8c
                                         &status,
Packit Service c5cf8c
                                         ZM_WAKE,
Packit Service c5cf8c
                                         zm_memord_acq_rel,
Packit Service c5cf8c
                                         zm_memord_acquire))
Packit Service c5cf8c
            break;
Packit Service c5cf8c
        zm_atomic_store(&next->status, ZM_CHECK, zm_memord_release);
Packit Service c5cf8c
        /* modify next for reverse traversal later */
Packit Service c5cf8c
        zm_atomic_store(&cur_node->next, pred, zm_memord_release);
Packit Service c5cf8c
        pred = cur_node;
Packit Service c5cf8c
        cur_node = next;
Packit Service c5cf8c
        next = (zm_mcs_qnode_t*)zm_atomic_load(&next->next, zm_memord_acquire);
Packit Service c5cf8c
    }
Packit Service c5cf8c
    /* reverse traversal for recycling */
Packit Service c5cf8c
    zm_atomic_store(&cur_node->status, ZM_RECYCLE, zm_memord_release);
Packit Service c5cf8c
    zm_mcs_qnode_t *rev = pred;
Packit Service c5cf8c
    while((zm_ptr_t)rev != ZM_NULL) {
Packit Service c5cf8c
        zm_atomic_store(&rev->status, ZM_RECYCLE, zm_memord_release);
Packit Service c5cf8c
        rev = (zm_mcs_qnode_t*)zm_atomic_load(&rev->next, zm_memord_acquire);
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
    if ((zm_ptr_t)next == ZM_NULL) {
Packit Service c5cf8c
        zm_mcs_qnode_t *tmp = cur_node;
Packit Service c5cf8c
        if(zm_atomic_compare_exchange_strong(&L->lock,
Packit Service c5cf8c
                                             (zm_ptr_t*)&tmp,
Packit Service c5cf8c
                                             ZM_NULL,
Packit Service c5cf8c
                                             zm_memord_acq_rel,
Packit Service c5cf8c
                                             zm_memord_acquire))
Packit Service c5cf8c
            return 0;
Packit Service c5cf8c
        while(zm_atomic_load(&cur_node->next, zm_memord_acquire) == ZM_NULL)
Packit Service c5cf8c
            ; /* SPIN */
Packit Service c5cf8c
        zm_atomic_store(&((zm_mcs_qnode_t*)zm_atomic_load(&cur_node->next, zm_memord_acquire))->status, ZM_WAKE, zm_memord_release);
Packit Service c5cf8c
    }
Packit Service c5cf8c
    zm_atomic_store(&I->next, NULL, zm_memord_release);
Packit Service c5cf8c
Packit Service c5cf8c
    return 0;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
/*
 * Mark node I as skipped: its status is switched to ZM_SKIP only if it is
 * currently ZM_WAIT; any other status is left untouched. Always returns 0.
 */
static inline int skip(zm_mcs_qnode_t *I) {
    int expected = ZM_WAIT;

    /* The outcome of the exchange is deliberately not reported. */
    (void) zm_atomic_compare_exchange_strong(&I->status,
                                             &expected,
                                             ZM_SKIP,
                                             zm_memord_acq_rel,
                                             zm_memord_acquire);
    return 0;
}
Packit Service c5cf8c
Packit Service c5cf8c
/*
 * Return 1 when node I has no successor linked behind it (its `next` field
 * reads ZM_NULL), 0 otherwise. L is not consulted.
 */
static inline int nowaiters(struct zm_mcs *L, zm_mcs_qnode_t *I) {
    (void) L;

    if (zm_atomic_load(&I->next, zm_memord_acquire) == ZM_NULL)
        return 1;
    return 0;
}
Packit Service c5cf8c
Packit Service c5cf8c
/*
 * Wait on L using the calling thread's own queue node.
 *
 * On the first call (tid still -1) the thread's binding is validated with
 * check_affinity() and tid is set from get_hwthread_id(). The node
 * L->local_nodes[tid] is returned through *I and passed to wait().
 * Returns the result of wait().
 */
int wskip_wait(struct zm_mcs *L, zm_mcs_qnode_t** I) {
    zm_mcs_qnode_t *mine;

    if (zm_unlikely(tid == -1)) {
        check_affinity(L->topo);
        tid = get_hwthread_id(L->topo);
    }

    mine = L->local_nodes + tid;
    *I = mine;
    return wait(L, mine);
}
Packit Service c5cf8c
Packit Service c5cf8c
/*
 * Enqueue node I on L without waiting afterwards. The "must wait" flag
 * produced by enq() is discarded. Returns the result of enq().
 */
int wskip_enq(struct zm_mcs *L, zm_mcs_qnode_t *I) {
    int ignored = 0; /* enq() overwrites this; the value is never read */

    return enq(L, I, &ignored);
}
Packit Service c5cf8c
Packit Service c5cf8c
/* Forward to wake() for node I of L and return its result. */
int wskip_wake(struct zm_mcs *L, zm_mcs_qnode_t *I) {
    const int rc = wake(L, I);

    return rc;
}
Packit Service c5cf8c
Packit Service c5cf8c
/* Forward to skip() for node I and return its result. */
int wskip_skip(zm_mcs_qnode_t *I) {
    const int rc = skip(I);

    return rc;
}
Packit Service c5cf8c
Packit Service c5cf8c
/* Forward to nowaiters() for node I of L and return its result. */
int wskip_nowaiters(struct zm_mcs *L, zm_mcs_qnode_t *I) {
    const int rc = nowaiters(L, I);

    return rc;
}
Packit Service c5cf8c
Packit Service c5cf8c
static inline int free_wskip(struct zm_mcs *L)
Packit Service c5cf8c
{
Packit Service c5cf8c
    free(L->local_nodes);
Packit Service c5cf8c
    hwloc_topology_destroy(L->topo);
Packit Service c5cf8c
    return 0;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
Packit Service c5cf8c
/*
 * Create a wait/skip object and store it in *handle.
 *
 * handle: where the new object is stored; must not be NULL.
 *
 * Returns 0 on success. Returns -1 if handle is NULL or the object could
 * not be created; in the latter case *handle is set to a null handle.
 */
int zm_wskip_init(zm_mcs_t *handle) {
    void *p;

    if (handle == NULL)
        return -1;

    p = new_wskip();
    *handle  = (zm_mcs_t) p;
    return (p == NULL) ? -1 : 0;
}
Packit Service c5cf8c
Packit Service c5cf8c
/*
 * Release the wait/skip object referenced by *L via free_wskip().
 * Always returns 0.
 */
int zm_wskip_destroy(zm_mcs_t *L) {
    struct zm_mcs *self = (struct zm_mcs*)(*L);

    free_wskip(self);
    return 0;
}
Packit Service c5cf8c
Packit Service c5cf8c
/* Public entry point: convert the opaque handle and forward to wskip_wait(). */
int zm_wskip_wait(zm_mcs_t L, zm_mcs_qnode_t** I) {
    struct zm_mcs *self = (struct zm_mcs*)(void *)L;

    return wskip_wait(self, I);
}
Packit Service c5cf8c
Packit Service c5cf8c
/* Public entry point: convert the opaque handle and forward to wskip_enq(). */
int zm_wskip_enq(zm_mcs_t L, zm_mcs_qnode_t* I) {
    struct zm_mcs *self = (struct zm_mcs*)(void *)L;

    return wskip_enq(self, I);
}
Packit Service c5cf8c
Packit Service c5cf8c
/* Public entry point: convert the opaque handle and forward to wskip_wake(). */
int zm_wskip_wake(zm_mcs_t L, zm_mcs_qnode_t *I) {
    struct zm_mcs *self = (struct zm_mcs*)(void *)L;

    return wskip_wake(self, I);
}
Packit Service c5cf8c
Packit Service c5cf8c
/* Public entry point: forward node I to wskip_skip() and return its result. */
int zm_wskip_skip(zm_mcs_qnode_t *I) {
    const int rc = wskip_skip(I);

    return rc;
}
Packit Service c5cf8c
Packit Service c5cf8c
/* Public entry point: convert the opaque handle and forward to wskip_nowaiters(). */
int zm_wskip_nowaiters(zm_mcs_t L, zm_mcs_qnode_t *I) {
    struct zm_mcs *self = (struct zm_mcs*)(void *)L;

    return wskip_nowaiters(self, I);
}
Packit Service c5cf8c