|
Packit |
cac203 |
/*
|
|
Packit |
cac203 |
* teamd_zmq.c - Teamd ZeroMQ socket api
|
|
Packit |
cac203 |
* Copyright (C) 2013 Jiri Zupka <jzupka@redhat.com>
|
|
Packit |
cac203 |
*
|
|
Packit |
cac203 |
* This library is free software; you can redistribute it and/or
|
|
Packit |
cac203 |
* modify it under the terms of the GNU Lesser General Public
|
|
Packit |
cac203 |
* License as published by the Free Software Foundation; either
|
|
Packit |
cac203 |
* version 2.1 of the License, or (at your option) any later version.
|
|
Packit |
cac203 |
*
|
|
Packit |
cac203 |
* This library is distributed in the hope that it will be useful,
|
|
Packit |
cac203 |
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
Packit |
cac203 |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
Packit |
cac203 |
* Lesser General Public License for more details.
|
|
Packit |
cac203 |
*
|
|
Packit |
cac203 |
* You should have received a copy of the GNU Lesser General Public
|
|
Packit |
cac203 |
* License along with this library; if not, write to the Free Software
|
|
Packit |
cac203 |
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
Packit |
cac203 |
*/
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
#include "config.h"
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
#ifdef ENABLE_ZMQ
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
#include <stdbool.h>
|
|
Packit |
cac203 |
#include <stdlib.h>
|
|
Packit |
cac203 |
#include <string.h>
|
|
Packit |
cac203 |
#include <unistd.h>
|
|
Packit |
cac203 |
#include <sys/types.h>
|
|
Packit |
cac203 |
#include <sys/socket.h>
|
|
Packit |
cac203 |
#include <sys/un.h>
|
|
Packit |
cac203 |
#include <errno.h>
|
|
Packit |
cac203 |
#include <ctype.h>
|
|
Packit |
cac203 |
#include <private/misc.h>
|
|
Packit |
cac203 |
#include <private/list.h>
|
|
Packit |
cac203 |
#include <team.h>
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
#include "teamd.h"
|
|
Packit |
cac203 |
#include "teamd_zmq.h"
|
|
Packit |
cac203 |
#include "teamd_zmq_common.h"
|
|
Packit |
cac203 |
#include "teamd_ctl.h"
|
|
Packit |
cac203 |
#include "teamd_config.h"
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
struct zmq_ops_priv {
|
|
Packit |
cac203 |
char *rcv_msg_args;
|
|
Packit |
cac203 |
void *sock;
|
|
Packit |
cac203 |
};
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
struct zmq_acc_conn {
|
|
Packit |
cac203 |
struct list_item list;
|
|
Packit |
cac203 |
int sock;
|
|
Packit |
cac203 |
};
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
static int zmq_op_get_args(void *ops_priv, const char *fmt, ...)
|
|
Packit |
cac203 |
{
|
|
Packit |
cac203 |
va_list ap;
|
|
Packit |
cac203 |
struct zmq_ops_priv *zmq_ops_priv = ops_priv;
|
|
Packit |
cac203 |
char **pstr;
|
|
Packit |
cac203 |
char *str;
|
|
Packit |
cac203 |
char *rest = zmq_ops_priv->rcv_msg_args;
|
|
Packit |
cac203 |
int err = 0;
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
va_start(ap, fmt);
|
|
Packit |
cac203 |
while (*fmt) {
|
|
Packit |
cac203 |
switch (*fmt++) {
|
|
Packit |
cac203 |
case 's': /* string */
|
|
Packit |
cac203 |
pstr = va_arg(ap, char **);
|
|
Packit |
cac203 |
str = teamd_zmq_msg_getline(&rest);
|
|
Packit |
cac203 |
if (!str) {
|
|
Packit |
cac203 |
teamd_log_err("Insufficient number of arguments in message.");
|
|
Packit |
cac203 |
err = -EINVAL;
|
|
Packit |
cac203 |
goto out;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
*pstr = str;
|
|
Packit |
cac203 |
break;
|
|
Packit |
cac203 |
default:
|
|
Packit |
cac203 |
teamd_log_err("Unknown argument type requested");
|
|
Packit |
cac203 |
err = -EINVAL;
|
|
Packit |
cac203 |
goto out;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
out:
|
|
Packit |
cac203 |
va_end(ap);
|
|
Packit |
cac203 |
return err;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
static void zmq_custom_send(struct zmq_ops_priv *zmq_ops_priv,
|
|
Packit |
cac203 |
char *buf, size_t buflen)
|
|
Packit |
cac203 |
{
|
|
Packit |
cac203 |
int ret;
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
ret = zmq_send(zmq_ops_priv->sock, buf, buflen, 0);
|
|
Packit |
cac203 |
if (ret == -1)
|
|
Packit |
cac203 |
teamd_log_warn("zmq: send failed: %s", strerror(errno));
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
static int zmq_op_reply_err(void *ops_priv, const char *err_code,
|
|
Packit |
cac203 |
const char *err_msg)
|
|
Packit |
cac203 |
{
|
|
Packit |
cac203 |
struct zmq_ops_priv *zmq_ops_priv = ops_priv;
|
|
Packit |
cac203 |
char *strbuf;
|
|
Packit |
cac203 |
int err;
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
err = asprintf(&strbuf, "%s\n%s\n%s\n", TEAMD_ZMQ_REPLY_ERR_PREFIX,
|
|
Packit |
cac203 |
err_code, err_msg);
|
|
Packit |
cac203 |
if (err == -1)
|
|
Packit |
cac203 |
return -ENOMEM;
|
|
Packit |
cac203 |
zmq_custom_send(zmq_ops_priv, strbuf, strlen(strbuf));
|
|
Packit |
cac203 |
free(strbuf);
|
|
Packit |
cac203 |
return 0;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
static int zmq_op_reply_succ(void *ops_priv, const char *msg)
|
|
Packit |
cac203 |
{
|
|
Packit |
cac203 |
struct zmq_ops_priv *zmq_ops_priv = ops_priv;
|
|
Packit |
cac203 |
char *strbuf;
|
|
Packit |
cac203 |
int err;
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
err = asprintf(&strbuf, "%s\n%s", TEAMD_ZMQ_REPLY_SUCC_PREFIX,
|
|
Packit |
cac203 |
msg ? msg : "");
|
|
Packit |
cac203 |
if (err == -1)
|
|
Packit |
cac203 |
return -ENOMEM;
|
|
Packit |
cac203 |
zmq_custom_send(zmq_ops_priv, strbuf, strlen(strbuf));
|
|
Packit |
cac203 |
free(strbuf);
|
|
Packit |
cac203 |
return 0;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
static const struct teamd_ctl_method_ops teamd_zmq_ctl_method_ops = {
|
|
Packit |
cac203 |
.get_args = zmq_op_get_args,
|
|
Packit |
cac203 |
.reply_err = zmq_op_reply_err,
|
|
Packit |
cac203 |
.reply_succ = zmq_op_reply_succ,
|
|
Packit |
cac203 |
};
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
static int process_rcv_msg(struct teamd_context *ctx, char *rcv_msg)
|
|
Packit |
cac203 |
{
|
|
Packit |
cac203 |
struct zmq_ops_priv zmq_ops_priv;
|
|
Packit |
cac203 |
char *str;
|
|
Packit |
cac203 |
char *rest = rcv_msg;
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
str = teamd_zmq_msg_getline(&rest);
|
|
Packit |
cac203 |
if (!str) {
|
|
Packit |
cac203 |
teamd_log_dbg(ctx, "zmq: Incomplete message.");
|
|
Packit |
cac203 |
return 0;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
if (strcmp(TEAMD_ZMQ_REQUEST_PREFIX, str)) {
|
|
Packit |
cac203 |
teamd_log_dbg(ctx, "zmq: Unsupported message type.");
|
|
Packit |
cac203 |
return 0;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
str = teamd_zmq_msg_getline(&rest);
|
|
Packit |
cac203 |
if (!str) {
|
|
Packit |
cac203 |
teamd_log_dbg(ctx, "zmq: Incomplete message.");
|
|
Packit |
cac203 |
return 0;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
if (!teamd_ctl_method_exists(str)) {
|
|
Packit |
cac203 |
teamd_log_dbg(ctx, "zmq: Unknown method \"%s\".", str);
|
|
Packit |
cac203 |
return 0;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
zmq_ops_priv.sock = ctx->zmq.sock;
|
|
Packit |
cac203 |
zmq_ops_priv.rcv_msg_args = rest;
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
teamd_log_dbg(ctx, "zmq: calling method \"%s\"", str);
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
return teamd_ctl_method_call(ctx, str, &teamd_zmq_ctl_method_ops,
|
|
Packit |
cac203 |
&zmq_ops_priv);
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
static int callback_zmq(struct teamd_context *ctx, int events, void *priv)
|
|
Packit |
cac203 |
{
|
|
Packit |
cac203 |
int err = 0;
|
|
Packit |
cac203 |
int poolmask;
|
|
Packit |
cac203 |
size_t poolmask_size = sizeof(poolmask);
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
err = zmq_getsockopt(ctx->zmq.sock, ZMQ_EVENTS, &poolmask,
|
|
Packit |
cac203 |
&poolmask_size);
|
|
Packit |
cac203 |
if (err == -1)
|
|
Packit |
cac203 |
return -errno;
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
while (poolmask & ZMQ_POLLIN) {
|
|
Packit |
cac203 |
zmq_msg_t msg;
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
zmq_msg_init(&msg;;
|
|
Packit |
cac203 |
if (zmq_msg_recv(&msg, ctx->zmq.sock, 0) == -1) {
|
|
Packit |
cac203 |
zmq_msg_close(&msg;;
|
|
Packit |
cac203 |
return -errno;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
err = process_rcv_msg(ctx, zmq_msg_data(&msg));
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
zmq_msg_close(&msg;;
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
if (err == -1)
|
|
Packit |
cac203 |
break;
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
err = zmq_getsockopt(ctx->zmq.sock, ZMQ_EVENTS, &poolmask,
|
|
Packit |
cac203 |
&poolmask_size);
|
|
Packit |
cac203 |
if (err == -1)
|
|
Packit |
cac203 |
return -errno;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
return err;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
#define ZMQ_MAX_CLIENT_COUNT 10
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
static int teamd_zmq_sock_open(struct teamd_context *ctx)
|
|
Packit |
cac203 |
{
|
|
Packit |
cac203 |
int err;
|
|
Packit |
cac203 |
void *context, *sock;
|
|
Packit |
cac203 |
int rc;
|
|
Packit |
cac203 |
const char *addr;
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
context = zmq_ctx_new();
|
|
Packit |
cac203 |
if (!context) {
|
|
Packit |
cac203 |
teamd_log_err("zmq: Failed to create context.");
|
|
Packit |
cac203 |
return -errno;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
sock = zmq_socket(context, ZMQ_REP);
|
|
Packit |
cac203 |
if (!sock) {
|
|
Packit |
cac203 |
teamd_log_err("zmq: Failed to create socket.");
|
|
Packit |
cac203 |
return -errno;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
if (ctx->zmq.addr) {
|
|
Packit |
cac203 |
addr = ctx->zmq.addr;
|
|
Packit |
cac203 |
} else {
|
|
Packit |
cac203 |
err = teamd_config_string_get(ctx, &addr, "$.runner.addr");
|
|
Packit |
cac203 |
if (err) {
|
|
Packit |
cac203 |
teamd_log_err("zmq: Failed to get address from config.");
|
|
Packit |
cac203 |
return err;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
rc = zmq_bind(sock, addr);
|
|
Packit |
cac203 |
if (rc != 0) {
|
|
Packit |
cac203 |
teamd_log_err("zmq: Failed to bind socket.");
|
|
Packit |
cac203 |
err = -errno;
|
|
Packit |
cac203 |
goto close_sock;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
ctx->zmq.context = context;
|
|
Packit |
cac203 |
ctx->zmq.sock = sock;
|
|
Packit |
cac203 |
return 0;
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
close_sock:
|
|
Packit |
cac203 |
zmq_close(sock);
|
|
Packit |
cac203 |
zmq_ctx_destroy(context);
|
|
Packit |
cac203 |
return err;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
static void teamd_zmq_sock_close(struct teamd_context *ctx)
|
|
Packit |
cac203 |
{
|
|
Packit |
cac203 |
zmq_close(ctx->zmq.sock);
|
|
Packit |
cac203 |
zmq_ctx_destroy(ctx->zmq.context);
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
#define ZMQ_CB_NAME "zmq"
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
int teamd_zmq_init(struct teamd_context *ctx)
|
|
Packit |
cac203 |
{
|
|
Packit |
cac203 |
int err;
|
|
Packit |
cac203 |
int fd;
|
|
Packit |
cac203 |
size_t fd_size;
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
if (!ctx->zmq.enabled)
|
|
Packit |
cac203 |
return 0;
|
|
Packit |
cac203 |
err = teamd_zmq_sock_open(ctx);
|
|
Packit |
cac203 |
if (err)
|
|
Packit |
cac203 |
return err;
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
fd_size = sizeof(fd);
|
|
Packit |
cac203 |
zmq_getsockopt(ctx->zmq.sock, ZMQ_FD, &fd, &fd_size);
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
err = teamd_loop_callback_fd_add(ctx, ZMQ_CB_NAME, ctx, callback_zmq,
|
|
Packit |
cac203 |
fd, TEAMD_LOOP_FD_EVENT_READ);
|
|
Packit |
cac203 |
if (err)
|
|
Packit |
cac203 |
goto sock_close;
|
|
Packit |
cac203 |
teamd_loop_callback_enable(ctx, ZMQ_CB_NAME, ctx);
|
|
Packit |
cac203 |
return 0;
|
|
Packit |
cac203 |
sock_close:
|
|
Packit |
cac203 |
teamd_zmq_sock_close(ctx);
|
|
Packit |
cac203 |
return err;
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
void teamd_zmq_fini(struct teamd_context *ctx)
|
|
Packit |
cac203 |
{
|
|
Packit |
cac203 |
if (!ctx->zmq.enabled)
|
|
Packit |
cac203 |
return;
|
|
Packit |
cac203 |
teamd_loop_callback_del(ctx, ZMQ_CB_NAME, ctx);
|
|
Packit |
cac203 |
teamd_zmq_sock_close(ctx);
|
|
Packit |
cac203 |
}
|
|
Packit |
cac203 |
|
|
Packit |
cac203 |
#endif /* ENABLE_ZMQ */
|