Blame teamd/teamd_zmq.c

Packit cac203
/*
Packit cac203
 *   teamd_zmq.c - Teamd ZeroMQ socket api
Packit cac203
 *   Copyright (C) 2013 Jiri Zupka <jzupka@redhat.com>
Packit cac203
 *
Packit cac203
 *   This library is free software; you can redistribute it and/or
Packit cac203
 *   modify it under the terms of the GNU Lesser General Public
Packit cac203
 *   License as published by the Free Software Foundation; either
Packit cac203
 *   version 2.1 of the License, or (at your option) any later version.
Packit cac203
 *
Packit cac203
 *   This library is distributed in the hope that it will be useful,
Packit cac203
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit cac203
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Packit cac203
 *   Lesser General Public License for more details.
Packit cac203
 *
Packit cac203
 *   You should have received a copy of the GNU Lesser General Public
Packit cac203
 *   License along with this library; if not, write to the Free Software
Packit cac203
 *   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
Packit cac203
 */
Packit cac203
Packit cac203
#include "config.h"
Packit cac203
Packit cac203
#ifdef ENABLE_ZMQ
Packit cac203
Packit cac203
#include <stdbool.h>
Packit cac203
#include <stdlib.h>
Packit cac203
#include <string.h>
Packit cac203
#include <unistd.h>
Packit cac203
#include <sys/types.h>
Packit cac203
#include <sys/socket.h>
Packit cac203
#include <sys/un.h>
Packit cac203
#include <errno.h>
Packit cac203
#include <ctype.h>
Packit cac203
#include <private/misc.h>
Packit cac203
#include <private/list.h>
Packit cac203
#include <team.h>
Packit cac203
Packit cac203
#include "teamd.h"
Packit cac203
#include "teamd_zmq.h"
Packit cac203
#include "teamd_zmq_common.h"
Packit cac203
#include "teamd_ctl.h"
Packit cac203
#include "teamd_config.h"
Packit cac203
Packit cac203
struct zmq_ops_priv {
Packit cac203
	char *rcv_msg_args;
Packit cac203
	void *sock;
Packit cac203
};
Packit cac203
Packit cac203
struct zmq_acc_conn {
Packit cac203
	struct list_item list;
Packit cac203
	int sock;
Packit cac203
};
Packit cac203
Packit cac203
static int zmq_op_get_args(void *ops_priv, const char *fmt, ...)
Packit cac203
{
Packit cac203
	va_list ap;
Packit cac203
	struct zmq_ops_priv *zmq_ops_priv = ops_priv;
Packit cac203
	char **pstr;
Packit cac203
	char *str;
Packit cac203
	char *rest = zmq_ops_priv->rcv_msg_args;
Packit cac203
	int err = 0;
Packit cac203
Packit cac203
	va_start(ap, fmt);
Packit cac203
	while (*fmt) {
Packit cac203
		switch (*fmt++) {
Packit cac203
		case 's': /* string */
Packit cac203
			pstr = va_arg(ap, char **);
Packit cac203
			str = teamd_zmq_msg_getline(&rest);
Packit cac203
			if (!str) {
Packit cac203
				teamd_log_err("Insufficient number of arguments in message.");
Packit cac203
				err = -EINVAL;
Packit cac203
				goto out;
Packit cac203
			}
Packit cac203
			*pstr = str;
Packit cac203
			break;
Packit cac203
		default:
Packit cac203
			teamd_log_err("Unknown argument type requested");
Packit cac203
			err = -EINVAL;
Packit cac203
			goto out;
Packit cac203
		}
Packit cac203
	}
Packit cac203
out:
Packit cac203
	va_end(ap);
Packit cac203
	return err;
Packit cac203
}
Packit cac203
Packit cac203
static void zmq_custom_send(struct zmq_ops_priv *zmq_ops_priv,
Packit cac203
			    char *buf, size_t buflen)
Packit cac203
{
Packit cac203
	int ret;
Packit cac203
Packit cac203
	ret = zmq_send(zmq_ops_priv->sock, buf, buflen, 0);
Packit cac203
	if (ret == -1)
Packit cac203
		teamd_log_warn("zmq: send failed: %s", strerror(errno));
Packit cac203
}
Packit cac203
Packit cac203
static int zmq_op_reply_err(void *ops_priv, const char *err_code,
Packit cac203
			      const char *err_msg)
Packit cac203
{
Packit cac203
	struct zmq_ops_priv *zmq_ops_priv = ops_priv;
Packit cac203
	char *strbuf;
Packit cac203
	int err;
Packit cac203
Packit cac203
	err = asprintf(&strbuf, "%s\n%s\n%s\n", TEAMD_ZMQ_REPLY_ERR_PREFIX,
Packit cac203
		       err_code, err_msg);
Packit cac203
	if (err == -1)
Packit cac203
		return -ENOMEM;
Packit cac203
	zmq_custom_send(zmq_ops_priv, strbuf, strlen(strbuf));
Packit cac203
	free(strbuf);
Packit cac203
	return 0;
Packit cac203
}
Packit cac203
Packit cac203
static int zmq_op_reply_succ(void *ops_priv, const char *msg)
Packit cac203
{
Packit cac203
	struct zmq_ops_priv *zmq_ops_priv = ops_priv;
Packit cac203
	char *strbuf;
Packit cac203
	int err;
Packit cac203
Packit cac203
	err = asprintf(&strbuf, "%s\n%s", TEAMD_ZMQ_REPLY_SUCC_PREFIX,
Packit cac203
		       msg ? msg : "");
Packit cac203
	if (err == -1)
Packit cac203
		return -ENOMEM;
Packit cac203
	zmq_custom_send(zmq_ops_priv, strbuf, strlen(strbuf));
Packit cac203
	free(strbuf);
Packit cac203
	return 0;
Packit cac203
}
Packit cac203
Packit cac203
static const struct teamd_ctl_method_ops teamd_zmq_ctl_method_ops = {
Packit cac203
	.get_args = zmq_op_get_args,
Packit cac203
	.reply_err = zmq_op_reply_err,
Packit cac203
	.reply_succ = zmq_op_reply_succ,
Packit cac203
};
Packit cac203
Packit cac203
static int process_rcv_msg(struct teamd_context *ctx, char *rcv_msg)
Packit cac203
{
Packit cac203
	struct zmq_ops_priv zmq_ops_priv;
Packit cac203
	char *str;
Packit cac203
	char *rest = rcv_msg;
Packit cac203
Packit cac203
	str = teamd_zmq_msg_getline(&rest);
Packit cac203
	if (!str) {
Packit cac203
		teamd_log_dbg(ctx, "zmq: Incomplete message.");
Packit cac203
		return 0;
Packit cac203
	}
Packit cac203
	if (strcmp(TEAMD_ZMQ_REQUEST_PREFIX, str)) {
Packit cac203
		teamd_log_dbg(ctx, "zmq: Unsupported message type.");
Packit cac203
		return 0;
Packit cac203
	}
Packit cac203
Packit cac203
	str = teamd_zmq_msg_getline(&rest);
Packit cac203
	if (!str) {
Packit cac203
		teamd_log_dbg(ctx, "zmq: Incomplete message.");
Packit cac203
		return 0;
Packit cac203
	}
Packit cac203
	if (!teamd_ctl_method_exists(str)) {
Packit cac203
		teamd_log_dbg(ctx, "zmq: Unknown method \"%s\".", str);
Packit cac203
		return 0;
Packit cac203
	}
Packit cac203
Packit cac203
	zmq_ops_priv.sock = ctx->zmq.sock;
Packit cac203
	zmq_ops_priv.rcv_msg_args = rest;
Packit cac203
Packit cac203
	teamd_log_dbg(ctx, "zmq: calling method \"%s\"", str);
Packit cac203
Packit cac203
	return teamd_ctl_method_call(ctx, str, &teamd_zmq_ctl_method_ops,
Packit cac203
				     &zmq_ops_priv);
Packit cac203
}
Packit cac203
Packit cac203
static int callback_zmq(struct teamd_context *ctx, int events, void *priv)
Packit cac203
{
Packit cac203
	int err = 0;
Packit cac203
	int poolmask;
Packit cac203
	size_t poolmask_size = sizeof(poolmask);
Packit cac203
Packit cac203
	err = zmq_getsockopt(ctx->zmq.sock, ZMQ_EVENTS, &poolmask,
Packit cac203
			     &poolmask_size);
Packit cac203
	if (err == -1)
Packit cac203
		return -errno;
Packit cac203
Packit cac203
	while (poolmask & ZMQ_POLLIN) {
Packit cac203
		zmq_msg_t msg;
Packit cac203
Packit cac203
		zmq_msg_init(&msg;;
Packit cac203
		if (zmq_msg_recv(&msg, ctx->zmq.sock, 0) == -1) {
Packit cac203
			zmq_msg_close(&msg;;
Packit cac203
			return -errno;
Packit cac203
		}
Packit cac203
Packit cac203
		err = process_rcv_msg(ctx, zmq_msg_data(&msg));
Packit cac203
Packit cac203
		zmq_msg_close(&msg;;
Packit cac203
Packit cac203
		if (err == -1)
Packit cac203
			break;
Packit cac203
Packit cac203
		err = zmq_getsockopt(ctx->zmq.sock, ZMQ_EVENTS, &poolmask,
Packit cac203
				     &poolmask_size);
Packit cac203
		if (err == -1)
Packit cac203
			return -errno;
Packit cac203
	}
Packit cac203
	return err;
Packit cac203
}
Packit cac203
Packit cac203
#define ZMQ_MAX_CLIENT_COUNT 10
Packit cac203
Packit cac203
static int teamd_zmq_sock_open(struct teamd_context *ctx)
Packit cac203
{
Packit cac203
	int err;
Packit cac203
	void *context, *sock;
Packit cac203
	int rc;
Packit cac203
	const char *addr;
Packit cac203
Packit cac203
	context = zmq_ctx_new();
Packit cac203
	if (!context) {
Packit cac203
		teamd_log_err("zmq: Failed to create context.");
Packit cac203
		return -errno;
Packit cac203
	}
Packit cac203
Packit cac203
	sock = zmq_socket(context, ZMQ_REP);
Packit cac203
	if (!sock) {
Packit cac203
		teamd_log_err("zmq: Failed to create socket.");
Packit cac203
		return -errno;
Packit cac203
	}
Packit cac203
Packit cac203
	if (ctx->zmq.addr) {
Packit cac203
		addr = ctx->zmq.addr;
Packit cac203
	} else {
Packit cac203
		err = teamd_config_string_get(ctx, &addr, "$.runner.addr");
Packit cac203
		if (err) {
Packit cac203
			teamd_log_err("zmq: Failed to get address from config.");
Packit cac203
			return err;
Packit cac203
		}
Packit cac203
	}
Packit cac203
Packit cac203
	rc = zmq_bind(sock, addr);
Packit cac203
	if (rc != 0) {
Packit cac203
		teamd_log_err("zmq: Failed to bind socket.");
Packit cac203
		err = -errno;
Packit cac203
		goto close_sock;
Packit cac203
	}
Packit cac203
Packit cac203
	ctx->zmq.context = context;
Packit cac203
	ctx->zmq.sock = sock;
Packit cac203
	return 0;
Packit cac203
Packit cac203
close_sock:
Packit cac203
	zmq_close(sock);
Packit cac203
	zmq_ctx_destroy(context);
Packit cac203
	return err;
Packit cac203
}
Packit cac203
Packit cac203
static void teamd_zmq_sock_close(struct teamd_context *ctx)
Packit cac203
{
Packit cac203
	zmq_close(ctx->zmq.sock);
Packit cac203
	zmq_ctx_destroy(ctx->zmq.context);
Packit cac203
}
Packit cac203
Packit cac203
#define ZMQ_CB_NAME "zmq"
Packit cac203
Packit cac203
int teamd_zmq_init(struct teamd_context *ctx)
Packit cac203
{
Packit cac203
	int err;
Packit cac203
	int fd;
Packit cac203
	size_t fd_size;
Packit cac203
Packit cac203
	if (!ctx->zmq.enabled)
Packit cac203
		return 0;
Packit cac203
	err = teamd_zmq_sock_open(ctx);
Packit cac203
	if (err)
Packit cac203
		return err;
Packit cac203
Packit cac203
Packit cac203
	fd_size = sizeof(fd);
Packit cac203
	zmq_getsockopt(ctx->zmq.sock, ZMQ_FD, &fd, &fd_size);
Packit cac203
Packit cac203
	err = teamd_loop_callback_fd_add(ctx, ZMQ_CB_NAME, ctx, callback_zmq,
Packit cac203
					 fd, TEAMD_LOOP_FD_EVENT_READ);
Packit cac203
	if (err)
Packit cac203
		goto sock_close;
Packit cac203
	teamd_loop_callback_enable(ctx, ZMQ_CB_NAME, ctx);
Packit cac203
	return 0;
Packit cac203
sock_close:
Packit cac203
	teamd_zmq_sock_close(ctx);
Packit cac203
	return err;
Packit cac203
}
Packit cac203
Packit cac203
void teamd_zmq_fini(struct teamd_context *ctx)
Packit cac203
{
Packit cac203
	if (!ctx->zmq.enabled)
Packit cac203
		return;
Packit cac203
	teamd_loop_callback_del(ctx, ZMQ_CB_NAME, ctx);
Packit cac203
	teamd_zmq_sock_close(ctx);
Packit cac203
}
Packit cac203
Packit cac203
#endif /* ENABLE_ZMQ */