/* * Copyright (c) 2011 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include #include #include #include #include #include #include #ifdef HAVE_GLIB #include static GMainLoop *glib_loop; static qb_array_t *gio_map; #endif /* HAVE_GLIB */ #define ONE_MEG 1048576 static int32_t use_glib = QB_FALSE; static int32_t use_events = QB_FALSE; static qb_loop_t *bms_loop; static qb_ipcs_service_t *s1; static int32_t s1_connection_accept_fn(qb_ipcs_connection_t * c, uid_t uid, gid_t gid) { #if 0 if (uid == 0 && gid == 0) { qb_log(LOG_INFO, "Authenticated connection"); return 1; } qb_log(LOG_NOTICE, "BAD user!"); return 0; #else return 0; #endif } static void s1_connection_created_fn(qb_ipcs_connection_t * c) { struct qb_ipcs_stats srv_stats; qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE); qb_log(LOG_INFO, "Connection created (active:%d, closed:%d)", srv_stats.active_connections, srv_stats.closed_connections); } static void s1_connection_destroyed_fn(qb_ipcs_connection_t * c) { qb_log(LOG_INFO, "Connection about to be freed"); } static int32_t s1_connection_closed_fn(qb_ipcs_connection_t * c) { struct qb_ipcs_connection_stats stats; struct qb_ipcs_stats srv_stats; qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE); qb_ipcs_connection_stats_get(c, &stats, QB_FALSE); qb_log(LOG_INFO, "Connection to pid:%d destroyed (active:%d, closed:%d)", stats.client_pid, srv_stats.active_connections, srv_stats.closed_connections); qb_log(LOG_DEBUG, " Requests %"PRIu64"", stats.requests); qb_log(LOG_DEBUG, " Responses %"PRIu64"", stats.responses); qb_log(LOG_DEBUG, " Events %"PRIu64"", stats.events); qb_log(LOG_DEBUG, " Send retries %"PRIu64"", stats.send_retries); qb_log(LOG_DEBUG, " Recv retries %"PRIu64"", stats.recv_retries); qb_log(LOG_DEBUG, " FC state %d", stats.flow_control_state); qb_log(LOG_DEBUG, " FC count %"PRIu64"", stats.flow_control_count); return 0; } struct my_req { struct qb_ipc_request_header hdr; char message[256]; }; static int32_t s1_msg_process_fn(qb_ipcs_connection_t * c, void *data, size_t size) { struct qb_ipc_request_header *hdr; struct my_req *req_pt; struct qb_ipc_response_header response; ssize_t res; struct iovec iov[2]; char resp[100]; int32_t sl; int32_t send_ten_events = QB_FALSE; hdr = (struct qb_ipc_request_header *)data; if (hdr->id == (QB_IPC_MSG_USER_START + 1)) { return 0; } req_pt = (struct my_req *)data; qb_log(LOG_DEBUG, "msg received (id:%d, size:%d, data:%s)", req_pt->hdr.id, req_pt->hdr.size, req_pt->message); if (strcmp(req_pt->message, "kill") == 0) { exit(0); } response.size = sizeof(struct qb_ipc_response_header); response.id = 13; response.error = 0; sl = snprintf(resp, 100, "ACK %zu bytes", size) + 1; iov[0].iov_len = sizeof(response); iov[0].iov_base = &response; iov[1].iov_len = sl; iov[1].iov_base = resp; response.size += sl; send_ten_events = (strcmp(req_pt->message, "events") == 0); if (use_events && !send_ten_events) { res = qb_ipcs_event_sendv(c, iov, 2); } else { res = qb_ipcs_response_sendv(c, iov, 2); } if (res < 0) { errno = - res; qb_perror(LOG_ERR, "qb_ipcs_response_send"); } if (send_ten_events) { int32_t i; qb_log(LOG_INFO, "request to send 10 events"); for (i = 0; i < 10; i++) { res = qb_ipcs_event_sendv(c, iov, 2); qb_log(LOG_INFO, "sent event %d res:%d", i, res); } } return 0; } static void sigusr1_handler(int32_t num) { qb_log(LOG_DEBUG, "(%d)", num); qb_ipcs_destroy(s1); exit(0); } static void show_usage(const char *name) { printf("usage: \n"); printf("%s \n", name); printf("\n"); printf(" options:\n"); printf("\n"); printf(" -h show this help text\n"); printf(" -m use shared memory\n"); printf(" -u use unix sockets\n"); printf(" -g use glib mainloop\n"); printf(" -e use events\n"); printf("\n"); } #ifdef HAVE_GLIB struct gio_to_qb_poll { int32_t is_used; int32_t events; int32_t source; int32_t fd; void *data; qb_ipcs_dispatch_fn_t fn; enum qb_loop_priority p; }; static gboolean gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data) { struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data; gint fd = g_io_channel_unix_get_fd(gio); return (adaptor->fn(fd, condition, adaptor->data) == 0); } static void gio_poll_destroy(gpointer data) { struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data; adaptor->is_used--; if (adaptor->is_used == 0) { qb_log(LOG_DEBUG, "fd %d adaptor destroyed\n", adaptor->fd); adaptor->fd = 0; adaptor->source = 0; } } static int32_t my_g_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn, gboolean is_new) { struct gio_to_qb_poll *adaptor; GIOChannel *channel; int32_t res = 0; res = qb_array_index(gio_map, fd, (void **)&adaptor); if (res < 0) { return res; } if (adaptor->is_used && adaptor->source) { if (is_new) { return -EEXIST; } g_source_remove(adaptor->source); adaptor->source = 0; } channel = g_io_channel_unix_new(fd); if (!channel) { return -ENOMEM; } adaptor->fn = fn; adaptor->events = evts; adaptor->data = data; adaptor->p = p; adaptor->is_used++; adaptor->fd = fd; adaptor->source = g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor, gio_poll_destroy); /* we are handing the channel off to be managed by mainloop now. * remove our reference. */ g_io_channel_unref(channel); return 0; } static int32_t my_g_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { return my_g_dispatch_update(p, fd, evts, data, fn, TRUE); } static int32_t my_g_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { return my_g_dispatch_update(p, fd, evts, data, fn, FALSE); } static int32_t my_g_dispatch_del(int32_t fd) { struct gio_to_qb_poll *adaptor; if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) { g_source_remove(adaptor->source); adaptor->source = 0; } return 0; } #endif /* HAVE_GLIB */ static int32_t my_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn) { return qb_loop_job_add(bms_loop, p, data, fn); } static int32_t my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { return qb_loop_poll_add(bms_loop, p, fd, evts, data, fn); } static int32_t my_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { return qb_loop_poll_mod(bms_loop, p, fd, evts, data, fn); } static int32_t my_dispatch_del(int32_t fd) { return qb_loop_poll_del(bms_loop, fd); } int32_t main(int32_t argc, char *argv[]) { const char *options = "mpseugh"; int32_t opt; int32_t rc; enum qb_ipc_type ipc_type = QB_IPC_NATIVE; struct qb_ipcs_service_handlers sh = { .connection_accept = s1_connection_accept_fn, .connection_created = s1_connection_created_fn, .msg_process = s1_msg_process_fn, .connection_destroyed = s1_connection_destroyed_fn, .connection_closed = s1_connection_closed_fn, }; struct qb_ipcs_poll_handlers ph = { .job_add = my_job_add, .dispatch_add = my_dispatch_add, .dispatch_mod = my_dispatch_mod, .dispatch_del = my_dispatch_del, }; #ifdef HAVE_GLIB struct qb_ipcs_poll_handlers glib_ph = { .job_add = NULL, /* FIXME */ .dispatch_add = my_g_dispatch_add, .dispatch_mod = my_g_dispatch_mod, .dispatch_del = my_g_dispatch_del, }; #endif /* HAVE_GLIB */ while ((opt = getopt(argc, argv, options)) != -1) { switch (opt) { case 'm': ipc_type = QB_IPC_SHM; break; case 'u': ipc_type = QB_IPC_SOCKET; break; case 'g': use_glib = QB_TRUE; break; case 'e': use_events = QB_TRUE; break; case 'h': default: show_usage(argv[0]); exit(0); break; } } signal(SIGINT, sigusr1_handler); qb_log_init("ipcserver", LOG_USER, LOG_TRACE); qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_TRACE); qb_log_format_set(QB_LOG_STDERR, "%f:%l [%p] %b"); qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); s1 = qb_ipcs_create("ipcserver", 0, ipc_type, &sh); if (s1 == 0) { qb_perror(LOG_ERR, "qb_ipcs_create"); exit(1); } /* This forces the clients to use a minimum buffer size */ qb_ipcs_enforce_buffer_size(s1, ONE_MEG); if (!use_glib) { bms_loop = qb_loop_create(); qb_ipcs_poll_handlers_set(s1, &ph); rc = qb_ipcs_run(s1); if (rc != 0) { errno = -rc; qb_perror(LOG_ERR, "qb_ipcs_run"); exit(1); } qb_loop_run(bms_loop); } else { #ifdef HAVE_GLIB glib_loop = g_main_loop_new(NULL, FALSE); gio_map = qb_array_create_2(16, sizeof(struct gio_to_qb_poll), 1); qb_ipcs_poll_handlers_set(s1, &glib_ph); rc = qb_ipcs_run(s1); if (rc != 0) { errno = -rc; qb_perror(LOG_ERR, "qb_ipcs_run"); exit(1); } g_main_loop_run(glib_loop); #else qb_log(LOG_ERR, "You don't seem to have glib-devel installed.\n"); #endif } qb_log_fini(); return EXIT_SUCCESS; }