Blame usr/actor.c

Packit eace71
/*
Packit eace71
 * iSCSI timeout & deferred work handling
Packit eace71
 *
Packit eace71
 * Copyright (C) 2004 Dmitry Yusupov, Alex Aizman
Packit eace71
 * Copyright (C) 2014 Red Hat Inc.
Packit eace71
 * maintained by open-iscsi@googlegroups.com
Packit eace71
 *
Packit eace71
 * This program is free software; you can redistribute it and/or modify
Packit eace71
 * it under the terms of the GNU General Public License as published
Packit eace71
 * by the Free Software Foundation; either version 2 of the License, or
Packit eace71
 * (at your option) any later version.
Packit eace71
 *
Packit eace71
 * This program is distributed in the hope that it will be useful, but
Packit eace71
 * WITHOUT ANY WARRANTY; without even the implied warranty of
Packit eace71
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Packit eace71
 * General Public License for more details.
Packit eace71
 *
Packit eace71
 * See the file COPYING included with this distribution for more details.
Packit eace71
 */
Packit eace71
#include <inttypes.h>
Packit eace71
#include <time.h>
Packit eace71
#include <sys/signalfd.h>
Packit eace71
#include <assert.h>
Packit eace71
#include <unistd.h>
Packit eace71
#include "actor.h"
Packit eace71
#include "log.h"
Packit eace71
#include "list.h"
Packit eace71
Packit eace71
/* Actors waiting for their timeout, kept sorted by ascending ttschedule. */
static LIST_HEAD(pend_list);
Packit eace71
/* Actors whose callbacks are run by the next actor_poll() pass. */
static LIST_HEAD(ready_list);
Packit eace71
/* Non-zero while actor_poll() runs callbacks; actor_poll() refuses to start when set. */
static volatile int poll_in_progress;
Packit eace71
Packit eace71
static uint64_t
Packit eace71
actor_time_left(actor_t *thread, uint64_t current_time)
Packit eace71
{
Packit eace71
	if (current_time > thread->ttschedule)
Packit eace71
		return 0;
Packit eace71
	else
Packit eace71
		return (thread->ttschedule - current_time);
Packit eace71
}
Packit eace71
Packit eace71
/*
 * True when time @a is strictly later than time @b.
 *
 * The difference is taken in unsigned arithmetic and only then
 * reinterpreted as signed, so the subtraction can never hit signed
 * overflow and the comparison keeps working across counter wraparound.
 */
#define time_after(a,b) \
	((int64_t)((uint64_t)(b) - (uint64_t)(a)) < 0)
Packit eace71
Packit eace71
void
Packit eace71
actor_init(actor_t *thread, void (*callback)(void *), void *data)
Packit eace71
{
Packit eace71
	INIT_LIST_HEAD(&thread->list);
Packit eace71
	thread->state = ACTOR_NOTSCHEDULED;
Packit eace71
	thread->callback = callback;
Packit eace71
	thread->data = data;
Packit eace71
}
Packit eace71
Packit eace71
void
Packit eace71
actor_delete(actor_t *thread)
Packit eace71
{
Packit eace71
	log_debug(7, "thread %08lx delete: state %d", (long)thread,
Packit eace71
			thread->state);
Packit eace71
	switch(thread->state) {
Packit eace71
	case ACTOR_WAITING:
Packit eace71
		/* TODO: remove/reset alarm if we were 1st entry in pend_list */
Packit eace71
		/* priority: low */
Packit eace71
		/* fallthrough */
Packit eace71
	case ACTOR_SCHEDULED:
Packit eace71
		log_debug(1, "deleting a scheduled/waiting thread!");
Packit eace71
		list_del_init(&thread->list);
Packit eace71
		if (list_empty(&pend_list)) {
Packit eace71
			log_debug(7, "nothing left on pend_list, deactivating alarm");
Packit eace71
			alarm(0);
Packit eace71
		}
Packit eace71
Packit eace71
		break;
Packit eace71
	default:
Packit eace71
		break;
Packit eace71
	}
Packit eace71
	thread->state = ACTOR_NOTSCHEDULED;
Packit eace71
}
Packit eace71
Packit eace71
/*
Packit eace71
 * Inserts actor on pend list and sets alarm if new item is
Packit eace71
 * sooner than previous entries.
Packit eace71
 */
Packit eace71
static void
Packit eace71
actor_insert_on_pend_list(actor_t *thread, uint32_t delay_secs)
Packit eace71
{
Packit eace71
	struct actor *orig_head;
Packit eace71
	struct actor *new_head;
Packit eace71
	struct actor *next_thread;
Packit eace71
Packit eace71
	orig_head = list_first_entry_or_null(&pend_list,
Packit eace71
					     struct actor, list);
Packit eace71
Packit eace71
	/* insert new entry in sort order */
Packit eace71
	list_for_each_entry(next_thread, &pend_list, list) {
Packit eace71
		if (time_after(next_thread->ttschedule, thread->ttschedule)) {
Packit eace71
			log_debug(7, "next thread %p due %lld", next_thread,
Packit eace71
			  (long long)next_thread->ttschedule);
Packit eace71
			log_debug(7, "new thread %p is before (%lld), inserting", thread,
Packit eace71
			  (long long)thread->ttschedule);
Packit eace71
Packit eace71
			/* insert new thread before the next thread */
Packit eace71
			__list_add(&thread->list, next_thread->list.prev, &next_thread->list);
Packit eace71
			goto inserted;
Packit eace71
		}
Packit eace71
	}
Packit eace71
Packit eace71
	if (orig_head) {
Packit eace71
		log_debug(7, "last thread %p due %lld", next_thread,
Packit eace71
			  (long long)next_thread->ttschedule);
Packit eace71
		log_debug(7, "new thread %p is after (%lld), inserting at tail", thread,
Packit eace71
			  (long long)thread->ttschedule);
Packit eace71
	}
Packit eace71
	else
Packit eace71
		log_debug(7, "new thread %p due %lld is first item on pend_list", thread,
Packit eace71
			  (long long)thread->ttschedule);
Packit eace71
Packit eace71
	/* Not before any existing entries */
Packit eace71
	list_add_tail(&thread->list, &pend_list);
Packit eace71
Packit eace71
inserted:
Packit eace71
	new_head = list_first_entry(&pend_list, struct actor, list);
Packit eace71
	if (orig_head != new_head) {
Packit eace71
		int result = alarm(delay_secs);
Packit eace71
		log_debug(7, "new alarm set for %d seconds, old alarm %d",
Packit eace71
			  delay_secs, result);
Packit eace71
	}
Packit eace71
}
Packit eace71
Packit eace71
static void
Packit eace71
actor_schedule_private(actor_t *thread, uint32_t delay_secs, int head)
Packit eace71
{
Packit eace71
	time_t current_time;
Packit eace71
Packit eace71
	struct timespec tv;
Packit eace71
Packit eace71
	if (clock_gettime(CLOCK_MONOTONIC_COARSE, &tv)) {
Packit eace71
		log_error("clock_getime failed, can't schedule!");
Packit eace71
		return;
Packit eace71
	}
Packit eace71
Packit eace71
	current_time = tv.tv_sec;
Packit eace71
Packit eace71
	log_debug(7, "thread %p schedule: delay %u state %d",
Packit eace71
		thread, delay_secs, thread->state);
Packit eace71
Packit eace71
	switch(thread->state) {
Packit eace71
	case ACTOR_WAITING:
Packit eace71
		log_error("rescheduling a waiting thread!");
Packit eace71
		list_del(&thread->list);
Packit eace71
		/* fall-through */
Packit eace71
	case ACTOR_NOTSCHEDULED:
Packit eace71
		INIT_LIST_HEAD(&thread->list);
Packit eace71
Packit eace71
		if (delay_secs == 0) {
Packit eace71
			thread->state = ACTOR_SCHEDULED;
Packit eace71
			if (head)
Packit eace71
				list_add(&thread->list, &ready_list);
Packit eace71
			else
Packit eace71
				list_add_tail(&thread->list, &ready_list);
Packit eace71
		} else {
Packit eace71
			thread->state = ACTOR_WAITING;
Packit eace71
			thread->ttschedule = current_time + delay_secs;
Packit eace71
Packit eace71
			actor_insert_on_pend_list(thread, delay_secs);
Packit eace71
		}
Packit eace71
		break;
Packit eace71
	case ACTOR_SCHEDULED:
Packit eace71
		// don't do anything
Packit eace71
		break;
Packit eace71
	case ACTOR_INVALID:
Packit eace71
		log_error("BUG: Trying to schedule a thread that has not been "
Packit eace71
			  "setup. Ignoring sched.");
Packit eace71
		break;
Packit eace71
	}
Packit eace71
Packit eace71
}
Packit eace71
Packit eace71
void
Packit eace71
actor_schedule_head(actor_t *thread)
Packit eace71
{
Packit eace71
	actor_schedule_private(thread, 0, 1);
Packit eace71
}
Packit eace71
Packit eace71
void
Packit eace71
actor_schedule(actor_t *thread)
Packit eace71
{
Packit eace71
	actor_schedule_private(thread, 0, 0);
Packit eace71
}
Packit eace71
Packit eace71
void
Packit eace71
actor_timer(actor_t *thread, uint32_t timeout_secs, void (*callback)(void *),
Packit eace71
	    void *data)
Packit eace71
{
Packit eace71
	actor_init(thread, callback, data);
Packit eace71
	actor_schedule_private(thread, timeout_secs, 0);
Packit eace71
}
Packit eace71
Packit eace71
void
Packit eace71
actor_timer_mod(actor_t *thread, uint32_t new_timeout_secs, void *data)
Packit eace71
{
Packit eace71
	actor_delete(thread);
Packit eace71
	thread->data = data;
Packit eace71
	actor_schedule_private(thread, new_timeout_secs, 0);
Packit eace71
}
Packit eace71
Packit eace71
/*
Packit eace71
 * Execute all items that have expired.
Packit eace71
 *
Packit eace71
 * Set an alarm if items remain. Caller must catch SIGALRM and
Packit eace71
 * then re-invoke this function.
Packit eace71
 */
Packit eace71
void
Packit eace71
actor_poll(void)
Packit eace71
{
Packit eace71
	struct actor *thread, *tmp;
Packit eace71
	uint64_t current_time;
Packit eace71
	struct timespec tv;
Packit eace71
Packit eace71
	if (poll_in_progress) {
Packit eace71
		log_error("recursive actor_poll() is not allowed");
Packit eace71
		return;
Packit eace71
	}
Packit eace71
Packit eace71
	if (clock_gettime(CLOCK_MONOTONIC_COARSE, &tv)) {
Packit eace71
		log_error("clock_gettime failed, can't schedule!");
Packit eace71
		return;
Packit eace71
	}
Packit eace71
Packit eace71
	current_time = tv.tv_sec;
Packit eace71
Packit eace71
	/*
Packit eace71
	 * Move items that are ripe from pend_list to ready_list.
Packit eace71
	 * Actors are in sorted order of ascending run time, so
Packit eace71
	 * stop at the first unripe entry.
Packit eace71
	 */
Packit eace71
	log_debug(7, "current time %" PRIu64, current_time);
Packit eace71
Packit eace71
	list_for_each_entry_safe(thread, tmp, &pend_list, list) {
Packit eace71
		uint64_t time_left = actor_time_left(thread, current_time);
Packit eace71
		if (time_left) {
Packit eace71
			log_debug(7, "thread %08lx due %" PRIu64 ", wait %" PRIu64 " more",
Packit eace71
				  (long)thread, thread->ttschedule, time_left);
Packit eace71
Packit eace71
			alarm(time_left);
Packit eace71
			break;
Packit eace71
		}
Packit eace71
Packit eace71
		/* This entry can be run now */
Packit eace71
		list_del_init(&thread->list);
Packit eace71
Packit eace71
		log_debug(2, "thread %08lx was scheduled for "
Packit eace71
			  "%" PRIu64 ", curtime %" PRIu64 " q_forw %p "
Packit eace71
			  "&pend_list %p",
Packit eace71
			  (long)thread, thread->ttschedule,
Packit eace71
			  current_time, pend_list.next, &pend_list);
Packit eace71
Packit eace71
		list_add_tail(&thread->list, &ready_list);
Packit eace71
		assert(thread->state == ACTOR_WAITING);
Packit eace71
		thread->state = ACTOR_SCHEDULED;
Packit eace71
		log_debug(7, "thread %08lx now in ready_list",
Packit eace71
			  (long)thread);
Packit eace71
	}
Packit eace71
Packit eace71
	/* Disable alarm if nothing else pending */
Packit eace71
	if (list_empty(&pend_list)) {
Packit eace71
		log_debug(7, "nothing on pend_list, deactivating alarm");
Packit eace71
		alarm(0);
Packit eace71
	}
Packit eace71
Packit eace71
	poll_in_progress = 1;
Packit eace71
	while (!list_empty(&ready_list)) {
Packit eace71
		thread = list_first_entry(&ready_list, struct actor, list);
Packit eace71
		list_del_init(&thread->list);
Packit eace71
Packit eace71
		if (thread->state != ACTOR_SCHEDULED)
Packit eace71
			log_error("ready_list: thread state corrupted! "
Packit eace71
				  "Thread with state %d in actor list.",
Packit eace71
				  thread->state);
Packit eace71
		thread->state = ACTOR_NOTSCHEDULED;
Packit eace71
		log_debug(7, "exec thread %08lx callback", (long)thread);
Packit eace71
		thread->callback(thread->data);
Packit eace71
		log_debug(7, "thread %08lx done", (long)thread);
Packit eace71
	}
Packit eace71
	poll_in_progress = 0;
Packit eace71
}