/* * teamd_workq.c - Teamd work queue * Copyright (C) 2013-2015 Jiri Pirko * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include "teamd_workq.h" #define WORKQ_CB_NAME "workq" static int teamd_workq_callback_socket(struct teamd_context *ctx, int events, void *priv) { struct teamd_workq *workq; struct teamd_workq *tmp; char bytes[16]; int ret; int err; again: ret = read(ctx->workq.pipe_r, bytes, sizeof(bytes)); if (ret == -1) { if (errno == EINTR) goto again; else if (errno != EAGAIN) return -errno; } teamd_loop_callback_disable(ctx, WORKQ_CB_NAME, ctx); list_for_each_node_entry_safe(workq, tmp, &ctx->workq.work_list, list) { list_del(&workq->list); list_init(&workq->list); err = workq->func(ctx, workq); if (err) return err; } return 0; } int teamd_workq_init(struct teamd_context *ctx) { int fds[2]; int err; list_init(&ctx->workq.work_list); err = pipe2(fds, O_NONBLOCK); if (err) return -errno; ctx->workq.pipe_r = fds[0]; ctx->workq.pipe_w = fds[1]; err = teamd_loop_callback_fd_add_tail(ctx, WORKQ_CB_NAME, ctx, teamd_workq_callback_socket, ctx->workq.pipe_r, TEAMD_LOOP_FD_EVENT_READ); if (err) { teamd_log_err("Failed add workq callback."); goto close_pipe; } return 0; close_pipe: close(ctx->workq.pipe_r); close(ctx->workq.pipe_w); return 0; } void teamd_workq_fini(struct teamd_context *ctx) { struct teamd_workq *workq; struct teamd_workq *tmp; teamd_loop_callback_del(ctx, WORKQ_CB_NAME, ctx); close(ctx->workq.pipe_r); close(ctx->workq.pipe_w); list_for_each_node_entry_safe(workq, tmp, &ctx->workq.work_list, list) { list_del(&workq->list); list_init(&workq->list); } } static void teamd_workq_set_for_process(struct teamd_context *ctx) { int err; const char byte = 0; retry: err = write(ctx->workq.pipe_w, &byte, 1); if (err == -1 && errno == EINTR) goto retry; teamd_loop_callback_enable(ctx, WORKQ_CB_NAME, ctx); } void teamd_workq_schedule_work(struct teamd_context *ctx, struct teamd_workq *workq) { if (!list_empty(&workq->list)) return; list_add_tail(&ctx->workq.work_list, &workq->list); teamd_workq_set_for_process(ctx); } void teamd_workq_init_work(struct teamd_workq *workq, teamd_workq_func_t func) { workq->func = func; list_init(&workq->list); }