Blame usr/actor.c

Packit Service 646995
/*
Packit Service 646995
 * iSCSI timeout & deferred work handling
Packit Service 646995
 *
Packit Service 646995
 * Copyright (C) 2004 Dmitry Yusupov, Alex Aizman
Packit Service 646995
 * Copyright (C) 2014 Red Hat Inc.
Packit Service 646995
 * maintained by open-iscsi@googlegroups.com
Packit Service 646995
 *
Packit Service 646995
 * This program is free software; you can redistribute it and/or modify
Packit Service 646995
 * it under the terms of the GNU General Public License as published
Packit Service 646995
 * by the Free Software Foundation; either version 2 of the License, or
Packit Service 646995
 * (at your option) any later version.
Packit Service 646995
 *
Packit Service 646995
 * This program is distributed in the hope that it will be useful, but
Packit Service 646995
 * WITHOUT ANY WARRANTY; without even the implied warranty of
Packit Service 646995
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Packit Service 646995
 * General Public License for more details.
Packit Service 646995
 *
Packit Service 646995
 * See the file COPYING included with this distribution for more details.
Packit Service 646995
 */
Packit Service 646995
#include <inttypes.h>
Packit Service 646995
#include <time.h>
Packit Service 646995
#include <sys/signalfd.h>
Packit Service 646995
#include <assert.h>
Packit Service 646995
#include <unistd.h>
Packit Service 646995
#include "actor.h"
Packit Service 646995
#include "log.h"
Packit Service 646995
#include "list.h"
Packit Service 646995
Packit Service 646995
static LIST_HEAD(pend_list);
Packit Service 646995
static LIST_HEAD(ready_list);
Packit Service 646995
static volatile int poll_in_progress;
Packit Service 646995
Packit Service 646995
static uint64_t
Packit Service 646995
actor_time_left(actor_t *thread, time_t current_time)
Packit Service 646995
{
Packit Service 646995
	if (current_time > thread->ttschedule)
Packit Service 646995
		return 0;
Packit Service 646995
	else
Packit Service 646995
		return (thread->ttschedule - current_time);
Packit Service 646995
}
Packit Service 646995
Packit Service 646995
#define time_after(a,b) \
Packit Service 646995
	((int64_t)(b) - (int64_t)(a) < 0)
Packit Service 646995
Packit Service 646995
void
Packit Service 646995
actor_init(actor_t *thread, void (*callback)(void *), void *data)
Packit Service 646995
{
Packit Service 646995
	INIT_LIST_HEAD(&thread->list);
Packit Service 646995
	thread->state = ACTOR_NOTSCHEDULED;
Packit Service 646995
	thread->callback = callback;
Packit Service 646995
	thread->data = data;
Packit Service 646995
}
Packit Service 646995
Packit Service 646995
void
Packit Service 646995
actor_delete(actor_t *thread)
Packit Service 646995
{
Packit Service 646995
	log_debug(7, "thread %08lx delete: state %d", (long)thread,
Packit Service 646995
			thread->state);
Packit Service 646995
	switch(thread->state) {
Packit Service 646995
	case ACTOR_WAITING:
Packit Service 646995
		/* TODO: remove/reset alarm if we were 1st entry in pend_list */
Packit Service 646995
		/* priority: low */
Packit Service 646995
		/* fallthrough */
Packit Service 646995
	case ACTOR_SCHEDULED:
Packit Service 646995
		log_debug(1, "deleting a scheduled/waiting thread!");
Packit Service 646995
		list_del_init(&thread->list);
Packit Service 646995
		if (list_empty(&pend_list)) {
Packit Service 646995
			log_debug(7, "nothing left on pend_list, deactivating alarm");
Packit Service 646995
			alarm(0);
Packit Service 646995
		}
Packit Service 646995
Packit Service 646995
		break;
Packit Service 646995
	default:
Packit Service 646995
		break;
Packit Service 646995
	}
Packit Service 646995
	thread->state = ACTOR_NOTSCHEDULED;
Packit Service 646995
}
Packit Service 646995
Packit Service 646995
/*
Packit Service 646995
 * Inserts actor on pend list and sets alarm if new item is
Packit Service 646995
 * sooner than previous entries.
Packit Service 646995
 */
Packit Service 646995
static void
Packit Service 646995
actor_insert_on_pend_list(actor_t *thread, uint32_t delay_secs)
Packit Service 646995
{
Packit Service 646995
	struct actor *orig_head;
Packit Service 646995
	struct actor *new_head;
Packit Service 646995
	struct actor *next_thread;
Packit Service 646995
Packit Service 646995
	orig_head = list_first_entry_or_null(&pend_list,
Packit Service 646995
					     struct actor, list);
Packit Service 646995
Packit Service 646995
	/* insert new entry in sort order */
Packit Service 646995
	list_for_each_entry(next_thread, &pend_list, list) {
Packit Service 646995
		if (time_after(next_thread->ttschedule, thread->ttschedule)) {
Packit Service 646995
			log_debug(7, "next thread %p due %lld", next_thread,
Packit Service 646995
			  (long long)next_thread->ttschedule);
Packit Service 646995
			log_debug(7, "new thread %p is before (%lld), inserting", thread,
Packit Service 646995
			  (long long)thread->ttschedule);
Packit Service 646995
Packit Service 646995
			/* insert new thread before the next thread */
Packit Service 646995
			__list_add(&thread->list, next_thread->list.prev, &next_thread->list);
Packit Service 646995
			goto inserted;
Packit Service 646995
		}
Packit Service 646995
	}
Packit Service 646995
Packit Service 646995
	if (orig_head) {
Packit Service 646995
		log_debug(7, "last thread %p due %lld", next_thread,
Packit Service 646995
			  (long long)next_thread->ttschedule);
Packit Service 646995
		log_debug(7, "new thread %p is after (%lld), inserting at tail", thread,
Packit Service 646995
			  (long long)thread->ttschedule);
Packit Service 646995
	}
Packit Service 646995
	else
Packit Service 646995
		log_debug(7, "new thread %p due %lld is first item on pend_list", thread,
Packit Service 646995
			  (long long)thread->ttschedule);
Packit Service 646995
Packit Service 646995
	/* Not before any existing entries */
Packit Service 646995
	list_add_tail(&thread->list, &pend_list);
Packit Service 646995
Packit Service 646995
inserted:
Packit Service 646995
	new_head = list_first_entry(&pend_list, struct actor, list);
Packit Service 646995
	if (orig_head != new_head) {
Packit Service 646995
		int result = alarm(delay_secs);
Packit Service 646995
		log_debug(7, "new alarm set for %d seconds, old alarm %d",
Packit Service 646995
			  delay_secs, result);
Packit Service 646995
	}
Packit Service 646995
}
Packit Service 646995
Packit Service 646995
static void
Packit Service 646995
actor_schedule_private(actor_t *thread, uint32_t delay_secs, int head)
Packit Service 646995
{
Packit Service 646995
	time_t current_time;
Packit Service 646995
Packit Service 646995
	struct timespec tv;
Packit Service 646995
Packit Service 646995
	if (clock_gettime(CLOCK_MONOTONIC_COARSE, &tv)) {
Packit Service 646995
		log_error("clock_getime failed, can't schedule!");
Packit Service 646995
		return;
Packit Service 646995
	}
Packit Service 646995
Packit Service 646995
	current_time = tv.tv_sec;
Packit Service 646995
Packit Service 646995
	log_debug(7, "thread %p schedule: delay %u state %d",
Packit Service 646995
		thread, delay_secs, thread->state);
Packit Service 646995
Packit Service 646995
	switch(thread->state) {
Packit Service 646995
	case ACTOR_WAITING:
Packit Service 646995
		log_error("rescheduling a waiting thread!");
Packit Service 646995
		list_del(&thread->list);
Packit Service 646995
		/* fall-through */
Packit Service 646995
	case ACTOR_NOTSCHEDULED:
Packit Service 646995
		INIT_LIST_HEAD(&thread->list);
Packit Service 646995
Packit Service 646995
		if (delay_secs == 0) {
Packit Service 646995
			thread->state = ACTOR_SCHEDULED;
Packit Service 646995
			if (head)
Packit Service 646995
				list_add(&thread->list, &ready_list);
Packit Service 646995
			else
Packit Service 646995
				list_add_tail(&thread->list, &ready_list);
Packit Service 646995
		} else {
Packit Service 646995
			thread->state = ACTOR_WAITING;
Packit Service 646995
			thread->ttschedule = current_time + delay_secs;
Packit Service 646995
Packit Service 646995
			actor_insert_on_pend_list(thread, delay_secs);
Packit Service 646995
		}
Packit Service 646995
		break;
Packit Service 646995
	case ACTOR_SCHEDULED:
Packit Service 646995
		// don't do anything
Packit Service 646995
		break;
Packit Service 646995
	case ACTOR_INVALID:
Packit Service 646995
		log_error("BUG: Trying to schedule a thread that has not been "
Packit Service 646995
			  "setup. Ignoring sched.");
Packit Service 646995
		break;
Packit Service 646995
	}
Packit Service 646995
Packit Service 646995
}
Packit Service 646995
Packit Service 646995
void
Packit Service 646995
actor_schedule_head(actor_t *thread)
Packit Service 646995
{
Packit Service 646995
	actor_schedule_private(thread, 0, 1);
Packit Service 646995
}
Packit Service 646995
Packit Service 646995
void
Packit Service 646995
actor_schedule(actor_t *thread)
Packit Service 646995
{
Packit Service 646995
	actor_schedule_private(thread, 0, 0);
Packit Service 646995
}
Packit Service 646995
Packit Service 646995
void
Packit Service 646995
actor_timer(actor_t *thread, uint32_t timeout_secs, void (*callback)(void *),
Packit Service 646995
	    void *data)
Packit Service 646995
{
Packit Service 646995
	actor_init(thread, callback, data);
Packit Service 646995
	actor_schedule_private(thread, timeout_secs, 0);
Packit Service 646995
}
Packit Service 646995
Packit Service 646995
void
Packit Service 646995
actor_timer_mod(actor_t *thread, uint32_t new_timeout_secs, void *data)
Packit Service 646995
{
Packit Service 646995
	actor_delete(thread);
Packit Service 646995
	thread->data = data;
Packit Service 646995
	actor_schedule_private(thread, new_timeout_secs, 0);
Packit Service 646995
}
Packit Service 646995
Packit Service 646995
/*
Packit Service 646995
 * Execute all items that have expired.
Packit Service 646995
 *
Packit Service 646995
 * Set an alarm if items remain. Caller must catch SIGALRM and
Packit Service 646995
 * then re-invoke this function.
Packit Service 646995
 */
Packit Service 646995
void
Packit Service 646995
actor_poll(void)
Packit Service 646995
{
Packit Service 646995
	struct actor *thread, *tmp;
Packit Service 646995
	uint64_t current_time;
Packit Service 646995
	struct timespec tv;
Packit Service 646995
Packit Service 646995
	if (poll_in_progress) {
Packit Service 646995
		log_error("recursive actor_poll() is not allowed");
Packit Service 646995
		return;
Packit Service 646995
	}
Packit Service 646995
Packit Service 646995
	if (clock_gettime(CLOCK_MONOTONIC_COARSE, &tv)) {
Packit Service 646995
		log_error("clock_gettime failed, can't schedule!");
Packit Service 646995
		return;
Packit Service 646995
	}
Packit Service 646995
Packit Service 646995
	current_time = tv.tv_sec;
Packit Service 646995
Packit Service 646995
	/*
Packit Service 646995
	 * Move items that are ripe from pend_list to ready_list.
Packit Service 646995
	 * Actors are in sorted order of ascending run time, so
Packit Service 646995
	 * stop at the first unripe entry.
Packit Service 646995
	 */
Packit Service 646995
	log_debug(7, "current time %" PRIu64, current_time);
Packit Service 646995
Packit Service 646995
	list_for_each_entry_safe(thread, tmp, &pend_list, list) {
Packit Service 646995
		uint64_t time_left = actor_time_left(thread, current_time);
Packit Service 646995
		if (time_left) {
Packit Service 646995
			log_debug(7, "thread %08lx due %" PRIu64 ", wait %" PRIu64 " more",
Packit Service 646995
				  (long)thread,
Packit Service 646995
				  (uint64_t)thread->ttschedule, time_left);
Packit Service 646995
Packit Service 646995
			alarm(time_left);
Packit Service 646995
			break;
Packit Service 646995
		}
Packit Service 646995
Packit Service 646995
		/* This entry can be run now */
Packit Service 646995
		list_del_init(&thread->list);
Packit Service 646995
Packit Service 646995
		log_debug(2, "thread %08lx was scheduled for "
Packit Service 646995
			  "%" PRIu64 ", curtime %" PRIu64 " q_forw %p "
Packit Service 646995
			  "&pend_list %p",
Packit Service 646995
			  (long)thread, (uint64_t)thread->ttschedule,
Packit Service 646995
			  current_time, pend_list.next, &pend_list);
Packit Service 646995
Packit Service 646995
		list_add_tail(&thread->list, &ready_list);
Packit Service 646995
		assert(thread->state == ACTOR_WAITING);
Packit Service 646995
		thread->state = ACTOR_SCHEDULED;
Packit Service 646995
		log_debug(7, "thread %08lx now in ready_list",
Packit Service 646995
			  (long)thread);
Packit Service 646995
	}
Packit Service 646995
Packit Service 646995
	/* Disable alarm if nothing else pending */
Packit Service 646995
	if (list_empty(&pend_list)) {
Packit Service 646995
		log_debug(7, "nothing on pend_list, deactivating alarm");
Packit Service 646995
		alarm(0);
Packit Service 646995
	}
Packit Service 646995
Packit Service 646995
	poll_in_progress = 1;
Packit Service 646995
	while (!list_empty(&ready_list)) {
Packit Service 646995
		thread = list_first_entry(&ready_list, struct actor, list);
Packit Service 646995
		list_del_init(&thread->list);
Packit Service 646995
Packit Service 646995
		if (thread->state != ACTOR_SCHEDULED)
Packit Service 646995
			log_error("ready_list: thread state corrupted! "
Packit Service 646995
				  "Thread with state %d in actor list.",
Packit Service 646995
				  thread->state);
Packit Service 646995
		thread->state = ACTOR_NOTSCHEDULED;
Packit Service 646995
		log_debug(7, "exec thread %08lx callback", (long)thread);
Packit Service 646995
		thread->callback(thread->data);
Packit Service 646995
		log_debug(7, "thread %08lx done", (long)thread);
Packit Service 646995
	}
Packit Service 646995
	poll_in_progress = 0;
Packit Service 646995
}