/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#include "arbiter.h"
#include <ucs/debug/assert.h>
#include <ucs/debug/log.h>
#define SENTINEL ((ucs_arbiter_elem_t*)0x1)
void ucs_arbiter_init(ucs_arbiter_t *arbiter)
{
arbiter->current = NULL;
UCS_ARBITER_GUARD_INIT(arbiter);
}
void ucs_arbiter_group_init(ucs_arbiter_group_t *group)
{
group->tail = NULL;
}
void ucs_arbiter_cleanup(ucs_arbiter_t *arbiter)
{
ucs_assert(arbiter->current == NULL);
}
void ucs_arbiter_group_cleanup(ucs_arbiter_group_t *group)
{
ucs_assert(group->tail == NULL);
}
void ucs_arbiter_group_push_elem_always(ucs_arbiter_group_t *group,
ucs_arbiter_elem_t *elem)
{
ucs_arbiter_elem_t *tail = group->tail;
if (tail == NULL) {
elem->list.next = NULL; /* Not scheduled yet */
elem->next = elem; /* Connect to itself */
} else {
elem->next = tail->next; /* Point to first element */
tail->next = elem; /* Point previous element to new one */
}
elem->group = group; /* Always point to group */
group->tail = elem; /* Update group tail */
}
static int ucs_arbiter_group_is_scheduled(ucs_arbiter_elem_t *head)
{
return head->list.next != NULL;
}
void ucs_arbiter_group_push_head_elem_always(ucs_arbiter_t *arbiter,
ucs_arbiter_group_t *group,
ucs_arbiter_elem_t *elem)
{
ucs_arbiter_elem_t *tail = group->tail;
ucs_arbiter_elem_t *head;
elem->group = group; /* Always point to group */
elem->list.next = NULL; /* Not scheduled yet */
if (tail == NULL) {
elem->next = elem; /* Connect to itself */
group->tail = elem; /* Update group tail */
return;
}
head = tail->next;
elem->next = head; /* Point to first element */
tail->next = elem; /* Point previous element to new one */
if (!ucs_arbiter_group_is_scheduled(head)) {
return;
}
ucs_assert(arbiter != NULL);
if (head->list.next == &head->list) {
/* single group which was scheduled */
ucs_assert(arbiter->current == head);
ucs_list_head_init(&elem->list);
arbiter->current = elem;
} else {
ucs_list_insert_replace(head->list.prev, head->list.next, &elem->list);
if (arbiter->current == head) {
arbiter->current = elem;
}
}
}
void ucs_arbiter_group_head_desched(ucs_arbiter_t *arbiter,
ucs_arbiter_elem_t *head)
{
ucs_arbiter_elem_t *next;
if (!ucs_arbiter_group_is_scheduled(head)) {
return; /* Not scheduled */
}
/* If this group is the next to be scheduled, skip it */
if (arbiter->current == head) {
next = ucs_list_next(&head->list, ucs_arbiter_elem_t, list);
arbiter->current = (next == head) ? NULL : next;
}
ucs_list_del(&head->list);
}
void ucs_arbiter_group_purge(ucs_arbiter_t *arbiter,
ucs_arbiter_group_t *group,
ucs_arbiter_callback_t cb, void *cb_arg)
{
ucs_arbiter_elem_t *tail = group->tail;
ucs_arbiter_elem_t *next_group = NULL;
ucs_arbiter_elem_t *prev_group = NULL;
ucs_arbiter_elem_t *ptr, *next, *prev;
ucs_arbiter_elem_t *head, *orig_head;
ucs_arbiter_cb_result_t result;
int is_scheduled;
if (tail == NULL) {
return; /* Empty group */
}
orig_head = head = tail->next;
is_scheduled = ucs_arbiter_group_is_scheduled(head);
next = head;
prev = tail;
if (is_scheduled) {
prev_group = ucs_list_prev(&head->list, ucs_arbiter_elem_t, list);
next_group = ucs_list_next(&head->list, ucs_arbiter_elem_t, list);
}
do {
ptr = next;
next = ptr->next;
/* Can't touch the element if it gets removed. But it can be reused
* later as well, so it's next should be NULL. */
ptr->next = NULL;
result = cb(arbiter, ptr, cb_arg);
if (result == UCS_ARBITER_CB_RESULT_REMOVE_ELEM) {
if (ptr == head) {
head = next;
if (ptr == tail) {
/* Last element is being removed - mark group as empty */
group->tail = NULL;
/* Break here to keep ptr->next = NULL, otherwise ptr->next
will be set to itself below */
break;
}
} else if (ptr == tail) {
group->tail = prev;
/* tail->next should point to head, make sure next is head
* (it is assigned 2 lines below) */
ucs_assert_always(next == head);
}
prev->next = next;
} else {
/* keep the element */
ptr->next = next; /* Restore next pointer */
prev = ptr;
}
} while (ptr != tail);
if (is_scheduled) {
if (orig_head == prev_group) {
/* this is the only group which was scheduled */
if (group->tail == NULL) {
/* group became empty - no more groups scheduled */
arbiter->current = NULL;
} else if (orig_head != head) {
/* keep the group scheduled, but with new head element */
arbiter->current = head;
ucs_list_head_init(&head->list);
}
} else {
if (group->tail == NULL) {
/* group became empty - deschedule it */
prev_group->list.next = &next_group->list;
next_group->list.prev = &prev_group->list;
if (arbiter->current == orig_head) {
arbiter->current = next_group;
}
} else if (orig_head != head) {
/* keep the group scheduled, but with new head element */
ucs_list_insert_replace(&prev_group->list, &next_group->list,
&head->list);
if (arbiter->current == orig_head) {
arbiter->current = head;
}
}
}
} else if ((orig_head != head) && (group->tail != NULL)) {
/* Mark new head as unscheduled */
head->list.next = NULL;
}
}
void ucs_arbiter_group_schedule_nonempty(ucs_arbiter_t *arbiter,
ucs_arbiter_group_t *group)
{
ucs_arbiter_elem_t *tail = group->tail;
ucs_arbiter_elem_t *current, *head;
UCS_ARBITER_GUARD_CHECK(arbiter);
ucs_assert(tail != NULL);
head = tail->next;
if (head == NULL) {
/* it means that 1 element group is
* scheduled during dispatch.
* Restore next pointer.
*/
head = tail->next = tail;
}
if (ucs_arbiter_group_is_scheduled(head)) {
return; /* Already scheduled */
}
current = arbiter->current;
if (current == NULL) {
ucs_list_head_init(&head->list);
arbiter->current = head;
} else {
ucs_list_insert_before(¤t->list, &head->list);
}
}
void ucs_arbiter_dispatch_nonempty(ucs_arbiter_t *arbiter, unsigned per_group,
ucs_arbiter_callback_t cb, void *cb_arg)
{
ucs_arbiter_elem_t *group_head, *last_elem, *elem, *next_elem;
ucs_list_link_t *elem_list_next;
ucs_arbiter_elem_t *next_group, *prev_group;
ucs_arbiter_group_t *group;
ucs_arbiter_cb_result_t result;
unsigned group_dispatch_count;
int is_single_group;
UCS_LIST_HEAD(resched_groups);
next_group = arbiter->current;
ucs_assert(next_group != NULL);
do {
group_head = next_group;
ucs_assert(group_head != NULL);
prev_group = ucs_list_prev(&group_head->list, ucs_arbiter_elem_t, list);
next_group = ucs_list_next(&group_head->list, ucs_arbiter_elem_t, list);
ucs_assert(prev_group != NULL);
ucs_assert(next_group != NULL);
ucs_assert(prev_group->list.next == &group_head->list);
ucs_assert(next_group->list.prev == &group_head->list);
group_dispatch_count = 0;
group = group_head->group;
last_elem = group->tail;
next_elem = group_head;
is_single_group = group_head == prev_group;
do {
elem = next_elem;
next_elem = elem->next;
/* zero pointer to next elem here because:
* - user callback may free() the element
* - push_elem() will fail if next is not NULL
* and elem is reused later. For example in
* rc/ud transports control.
*/
elem->next = NULL;
elem_list_next = elem->list.next;
elem->list.next = NULL;
ucs_assert(elem->group == group);
ucs_trace_poll("dispatching arbiter element %p", elem);
UCS_ARBITER_GUARD_ENTER(arbiter);
result = cb(arbiter, elem, cb_arg);
UCS_ARBITER_GUARD_EXIT(arbiter);
ucs_trace_poll("dispatch result %d", result);
++group_dispatch_count;
if (result == UCS_ARBITER_CB_RESULT_REMOVE_ELEM) {
if (elem == last_elem) {
/* Only element */
group->tail = NULL; /* Group is empty now */
if (is_single_group) {
next_group = NULL; /* No more groups */
} else {
/* Remove the group */
prev_group->list.next = &next_group->list;
next_group->list.prev = &prev_group->list;
}
} else {
/* Not only element */
ucs_assert(elem == last_elem->next); /* first element should be removed */
if (is_single_group) {
next_group = next_elem; /* No more groups, point arbiter
to next element in this group */
ucs_list_head_init(&next_elem->list);
} else {
ucs_list_insert_replace(&prev_group->list,
&next_group->list,
&next_elem->list);
}
last_elem->next = next_elem; /* Tail points to new head */
}
} else if (result == UCS_ARBITER_CB_RESULT_NEXT_GROUP) {
elem->next = next_elem;
/* avoid infinite loop */
elem->list.next = elem_list_next;
break;
} else if ((result == UCS_ARBITER_CB_RESULT_DESCHED_GROUP) ||
(result == UCS_ARBITER_CB_RESULT_RESCHED_GROUP)) {
elem->next = next_elem;
if (is_single_group) {
next_group = NULL; /* No more groups */
} else {
prev_group->list.next = &next_group->list;
next_group->list.prev = &prev_group->list;
}
if (result == UCS_ARBITER_CB_RESULT_RESCHED_GROUP) {
ucs_list_add_tail(&resched_groups, &elem->list);
}
break;
} else if (result == UCS_ARBITER_CB_RESULT_STOP) {
elem->next = next_elem;
elem->list.next = elem_list_next;
/* make sure that next dispatch() will continue
* from the current group */
arbiter->current = elem;
goto out;
} else {
elem->next = next_elem;
elem->list.next = elem_list_next;
ucs_bug("unexpected return value from arbiter callback");
}
} while ((elem != last_elem) && (group_dispatch_count < per_group));
} while (next_group != NULL);
arbiter->current = NULL;
out:
ucs_list_for_each_safe(elem, next_elem, &resched_groups, list) {
ucs_list_del(&elem->list);
elem->list.next = NULL;
ucs_trace_poll("reschedule group %p", elem->group);
ucs_arbiter_group_schedule_nonempty(arbiter, elem->group);
}
}
void ucs_arbiter_dump(ucs_arbiter_t *arbiter, FILE *stream)
{
ucs_arbiter_elem_t *first_group, *group_head, *elem;
fprintf(stream, "-------\n");
if (arbiter->current == NULL) {
fprintf(stream, "(empty)\n");
goto out;
}
first_group = arbiter->current;
group_head = first_group;
do {
elem = group_head;
if (group_head == first_group) {
fprintf(stream, "=> ");
} else {
fprintf(stream, " * ");
}
do {
fprintf(stream, "[%p", elem);
if (elem == group_head) {
fprintf(stream, " prev_g:%p", elem->list.prev);
fprintf(stream, " next_g:%p", elem->list.next);
}
fprintf(stream, " next_e:%p grp:%p]", elem->next, elem->group);
if (elem->next != group_head) {
fprintf(stream, "->");
}
elem = elem->next;
} while (elem != group_head);
fprintf(stream, "\n");
group_head = ucs_list_next(&group_head->list, ucs_arbiter_elem_t, list);
} while (group_head != first_group);
out:
fprintf(stream, "-------\n");
}