/*
* Soft: Keepalived is a failover program for the LVS project
* <www.linuxvirtualserver.org>. It monitor & manipulate
* a loadbalanced server pool using multi-layer checks.
*
* Part: Scheduling framework for bfd code
*
* Author: Ilya Voronin, <ivoronin@gmail.com>
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version
* 2 of the License, or (at your option) any later version.
*
* Copyright (C) 2015-2017 Alexandre Cassen, <acassen@gmail.com>
*/
#include "config.h"
#include <sys/socket.h>
#include <unistd.h>
#include <netdb.h>
#include <inttypes.h>
#include <stdio.h>
#include <fcntl.h>
#include <stdlib.h>
#include "bfd.h"
#include "bfd_data.h"
#include "bfd_scheduler.h"
#include "bfd_event.h"
#include "parser.h"
#include "logger.h"
#include "memory.h"
#include "main.h"
#include "bitops.h"
#include "utils.h"
#include "signals.h"
#include "assert_debug.h"
/* Locals */
/* Forward declarations of file-local functions, so they can be referenced
 * before their definitions further down in this file. */
static int bfd_send_packet(int, bfdpkt_t *, bool);
static void bfd_sender_schedule(bfd_t *);
static void bfd_state_down(bfd_t *, uint8_t diag);
static void bfd_state_admindown(bfd_t *);
static void bfd_state_up(bfd_t *);
static void bfd_dump_timers(FILE *fp, bfd_t *);
/*
* Session sender thread
*
* Runs every local_tx_intv, or after reception of a packet
* with Poll bit set
*/
/*
 * Returns the time left until the given timer thread is due, measured
 * against the cached time_now and converted with timer_long().
 *
 * When the deadline is not strictly in the future, 1 is returned instead
 * of computing a zero or negative interval.
 */
inline static unsigned long
thread_time_to_wakeup(thread_ref_t thread)
{
	struct timeval remaining;

	/* Deadline already reached or passed */
	if (!timercmp(&thread->sands, &time_now, >))
		return 1;

	timersub(&thread->sands, &time_now, &remaining);

	return timer_long(remaining);
}
/*
 * Sender thread callback: builds one BFD control packet for the session
 * and transmits it on the session's output socket.
 *
 * It may run as a timer thread or as an event thread (THREAD_EVENT).
 * Only the non-event form clears bfd->thread_out and re-arms itself via
 * bfd_sender_schedule(); an event run sends a single packet and returns.
 *
 * A send failure is logged here only on the first failure of a run of
 * failures (tracked in bfd->send_error); the flag is cleared again by the
 * next successful send.
 */
static void
bfd_sender_thread(thread_ref_t thread)
{
	bfd_t *bfd;
	bfdpkt_t pkt;
	bool periodic;
	bool sent_ok;

	assert(thread);
	bfd = THREAD_ARG(thread);
	assert(bfd);
	assert(!BFD_ISADMINDOWN(bfd));

	/* The timer that fired is the one recorded in thread_out */
	periodic = thread->type != THREAD_EVENT;
	if (periodic)
		bfd->thread_out = NULL;

	bfd_build_packet(&pkt, bfd, bfd_buffer, BFD_BUFFER_SIZE);

	sent_ok = bfd_send_packet(bfd->fd_out, &pkt, !bfd->send_error) != -1;
	if (!sent_ok && !bfd->send_error)
		log_message(LOG_ERR, "BFD_Instance(%s) Error sending packet", bfd->iname);
	bfd->send_error = !sent_ok;

	/* Reset final flag if set */
	bfd->final = 0;

	/* Schedule next run if not called as an event thread */
	if (periodic)
		bfd_sender_schedule(bfd);
}
/*
 * Returns the random amount (in the same units as bfd->local_tx_intv)
 * by which the next transmit interval is to be shortened.
 *
 * The upper bound is always 25% of local_tx_intv. The lower bound is 10%
 * of local_tx_intv when the local detect multiplier is 1, and 0 otherwise.
 * rand_intv() is presumed to return a value within the bounds it is given.
 */
static uint32_t
get_jitter(bfd_t * bfd)
{
	uint32_t min_jitter;

	/*
	 * RFC5880:
	 * The periodic transmission of BFD Control packets MUST be jittered
	 * on a per-packet basis by up to 25%, that is, the interval MUST be
	 * reduced by a random value of 0 to 25% <...>
	 *
	 * If bfd.DetectMult is equal to 1, the interval between transmitted
	 * BFD Control packets MUST be no more than 90% of the negotiated
	 * transmission interval, and MUST be no less than 75% of the
	 * negotiated transmission interval.
	 */
	if (bfd->local_detect_mult == 1)
		min_jitter = bfd->local_tx_intv / 10;	/* 10% <=> / 10 */
	else
		min_jitter = 0;

	return rand_intv(min_jitter, bfd->local_tx_intv / 4);	/* 25% <=> / 4 */
}
/*
 * Arms the sender timer: bfd_sender_thread will run after local_tx_intv
 * reduced by the jitter returned from get_jitter().
 * The sender must not already be scheduled.
 */
static void
bfd_sender_schedule(bfd_t *bfd)
{
	assert(bfd);
	assert(!bfd->thread_out);

	bfd->thread_out = thread_add_timer(master, bfd_sender_thread, bfd,
					   bfd->local_tx_intv - get_jitter(bfd));
}
/*
 * Cancels the pending sender timer and forgets its handle.
 * The sender must currently be scheduled.
 */
static void
bfd_sender_cancel(bfd_t *bfd)
{
	assert(bfd);
	assert(bfd->thread_out);

	thread_cancel(bfd->thread_out);
	bfd->thread_out = NULL;
}
/*
 * Updates the timeout of the already scheduled sender timer to a freshly
 * jittered local_tx_intv (usually after local_tx_intv has changed).
 * The sender must currently be scheduled.
 */
static void
bfd_sender_reschedule(bfd_t *bfd)
{
	assert(bfd);
	assert(bfd->thread_out);

	timer_thread_update_timeout(bfd->thread_out,
				    bfd->local_tx_intv - get_jitter(bfd));
}
/* Tells whether a sender timer is pending: 1 if bfd->thread_out is set,
 * 0 otherwise */
static int __attribute__ ((pure))
bfd_sender_scheduled(bfd_t *bfd)
{
	assert(bfd);

	return bfd->thread_out ? 1 : 0;
}
/*
 * Suspends the sender: stores the time remaining until the pending sender
 * timer fires in bfd->sands_out, then cancels the timer.
 * The remaining time is computed against time_now, so the caller must have
 * refreshed time_now beforehand. The sender must be scheduled and must not
 * already be suspended.
 */
static void
bfd_sender_suspend(bfd_t * bfd)
{
	assert(bfd);
	assert(bfd->thread_out);
	assert(bfd->sands_out == TIMER_NEVER);

	bfd->sands_out = thread_time_to_wakeup(bfd->thread_out);

	bfd_sender_cancel(bfd);
}
/*
 * Resumes a suspended sender: re-arms the sender timer with the remaining
 * time saved in bfd->sands_out and clears the saved value.
 * For a passive session that is not in Up state no timer is armed; the
 * saved value is still cleared.
 */
static void
bfd_sender_resume(bfd_t *bfd)
{
	bool restart;

	assert(bfd);
	assert(!bfd->thread_out);
	assert(bfd->sands_out != TIMER_NEVER);

	restart = !bfd->passive || bfd->local_state == BFD_STATE_UP;
	if (restart)
		bfd->thread_out = thread_add_timer(master, bfd_sender_thread,
						   bfd, bfd->sands_out);

	bfd->sands_out = TIMER_NEVER;
}
/* Tells whether the sender is suspended: 1 if a remaining time is saved in
 * bfd->sands_out, 0 if it holds TIMER_NEVER */
static int __attribute__ ((pure))
bfd_sender_suspended(bfd_t *bfd)
{
	assert(bfd);

	return !(bfd->sands_out == TIMER_NEVER);
}
/* Drops the saved state of a suspended sender without re-arming it.
 * The sender must currently be suspended. */
static void
bfd_sender_discard(bfd_t *bfd)
{
	assert(bfd);
	assert(bfd->sands_out != TIMER_NEVER);

	bfd->sands_out = TIMER_NEVER;
}
/*
* Session expiration thread
*
* Runs after local_detect_time has passed since receipt of last
* BFD control packet from neighbor
*/
/*
 * Expiration thread callback: the detection time has elapsed, so the
 * session is taken Down with diagnostic BFD_DIAG_EXPIRED.
 *
 * Logs how long it has been since bfd->last_seen (relative to time_now)
 * and how far past local_detect_time that is. The message is emitted when
 * the session was Up or when extra-detail logging is enabled. The remote
 * discriminator is zeroed before the state change.
 */
static void
bfd_expire_thread(thread_ref_t thread)
{
	bfd_t *bfd;
	timeval_t silence_tv;
	uint32_t silence_time, late_time;

	assert(thread);
	bfd = THREAD_ARG(thread);
	assert(bfd);

	/* Session cannot expire while not in Up or Init states */
	assert(BFD_ISUP(bfd) || BFD_ISINIT(bfd));

	/* This timer has fired, so it is no longer pending */
	bfd->thread_exp = NULL;

	/* Time since last received control packet */
	timersub(&time_now, &bfd->last_seen, &silence_tv);
	silence_time = timer_long(silence_tv);

	/* Difference between expected and actual failure detection time */
	late_time = silence_time - bfd->local_detect_time;

	if (bfd->local_state == BFD_STATE_UP ||
	    __test_bit(LOG_EXTRA_DETAIL_BIT, &debug))
		log_message(LOG_WARNING,
			    "BFD_Instance(%s) Expired after %" PRIu32
			    " ms (%" PRIu32 " usec overdue)",
			    bfd->iname, silence_time / 1000, late_time);

	/*
	 * RFC5880:
	 * <...> If a period of a Detection Time passes without the
	 * receipt of a valid, authenticated BFD packet from the remote
	 * system, this <bfd.RemoteDiscr> variable MUST be set to zero.
	 */
	bfd->remote_discr = 0;

	bfd_state_down(bfd, BFD_DIAG_EXPIRED);
}
/*
 * Arms the expiration timer: bfd_expire_thread will run after
 * local_detect_time. The timer must not already be scheduled.
 */
static void
bfd_expire_schedule(bfd_t *bfd)
{
	assert(bfd);
	assert(!bfd->thread_exp);

	bfd->thread_exp = thread_add_timer(master, bfd_expire_thread, bfd,
					   bfd->local_detect_time);
}
/*
 * Cancels the pending expiration timer and forgets its handle.
 * The timer must currently be scheduled.
 */
static void
bfd_expire_cancel(bfd_t *bfd)
{
	assert(bfd);
	assert(bfd->thread_exp);

	thread_cancel(bfd->thread_exp);
	bfd->thread_exp = NULL;
}
/*
 * Pushes the pending expiration timer out to a full local_detect_time
 * again (usually after a control packet has been received).
 * The timer must currently be scheduled.
 */
static void
bfd_expire_reschedule(bfd_t *bfd)
{
	assert(bfd);
	assert(bfd->thread_exp);

	timer_thread_update_timeout(bfd->thread_exp,
				    bfd->local_detect_time);
}
/* Tells whether an expiration timer is pending: 1 if bfd->thread_exp is
 * set, 0 otherwise */
static int __attribute__ ((pure))
bfd_expire_scheduled(bfd_t *bfd)
{
	assert(bfd);

	return bfd->thread_exp ? 1 : 0;
}
/*
 * Suspends the expiration timer: stores the time remaining until it fires
 * in bfd->sands_exp, then cancels it.
 * The remaining time is computed against time_now, so the caller must have
 * refreshed time_now beforehand. The timer must be scheduled and must not
 * already be suspended.
 */
static void
bfd_expire_suspend(bfd_t *bfd)
{
	assert(bfd);
	assert(bfd->thread_exp);
	assert(bfd->sands_exp == TIMER_NEVER);

	bfd->sands_exp = thread_time_to_wakeup(bfd->thread_exp);

	bfd_expire_cancel(bfd);
}
/*
 * Resumes a suspended expiration timer: re-arms it with the remaining time
 * saved in bfd->sands_exp and clears the saved value.
 */
static void
bfd_expire_resume(bfd_t *bfd)
{
	assert(bfd);
	assert(!bfd->thread_exp);
	assert(bfd->sands_exp != TIMER_NEVER);

	bfd->thread_exp = thread_add_timer(master, bfd_expire_thread, bfd,
					   bfd->sands_exp);

	bfd->sands_exp = TIMER_NEVER;
}
/* Tells whether the expiration timer is suspended: 1 if a remaining time
 * is saved in bfd->sands_exp, 0 if it holds TIMER_NEVER */
static int __attribute__ ((pure))
bfd_expire_suspended(bfd_t *bfd)
{
	assert(bfd);

	return !(bfd->sands_exp == TIMER_NEVER);
}
/* Drops the saved state of a suspended expiration timer without re-arming
 * it. The timer must currently be suspended. */
static void
bfd_expire_discard(bfd_t *bfd)
{
	assert(bfd);
	assert(bfd->sands_exp != TIMER_NEVER);

	bfd->sands_exp = TIMER_NEVER;
}
/*
* Session reset thread
*
* Runs after local_detect_time has passed after BFD session
* gone to Down state.
*/
/*
 * Reset thread callback: clears the pending-reset handle and calls
 * bfd_reset_state() to bring the session back to its initial state.
 */
static void
bfd_reset_thread(thread_ref_t thread)
{
	bfd_t *bfd;

	assert(thread);
	bfd = THREAD_ARG(thread);
	assert(bfd);
	assert(bfd->thread_rst);

	/* This timer has fired, so it is no longer pending */
	bfd->thread_rst = NULL;

	bfd_reset_state(bfd);
}
/*
 * Arms the reset timer: bfd_reset_thread will run after local_detect_time.
 * The timer must not already be scheduled.
 */
static void
bfd_reset_schedule(bfd_t * bfd)
{
	assert(bfd);
	assert(!bfd->thread_rst);

	bfd->thread_rst = thread_add_timer(master, bfd_reset_thread, bfd,
					   bfd->local_detect_time);
}
/*
 * Cancels the pending reset timer and forgets its handle.
 * The timer must currently be scheduled.
 */
static void
bfd_reset_cancel(bfd_t *bfd)
{
	assert(bfd);
	assert(bfd->thread_rst);

	thread_cancel(bfd->thread_rst);
	bfd->thread_rst = NULL;
}
/* Tells whether a reset timer is pending: 1 if bfd->thread_rst is set,
 * 0 otherwise */
static int __attribute__ ((pure))
bfd_reset_scheduled(bfd_t *bfd)
{
	assert(bfd);

	return bfd->thread_rst ? 1 : 0;
}
/*
 * Suspends the reset timer: stores the time remaining until it fires in
 * bfd->sands_rst, then cancels it.
 * The remaining time is computed against time_now, so the caller must have
 * refreshed time_now beforehand. The timer must be scheduled and must not
 * already be suspended.
 */
static void
bfd_reset_suspend(bfd_t *bfd)
{
	assert(bfd);
	assert(bfd->thread_rst);
	assert(bfd->sands_rst == TIMER_NEVER);

	bfd->sands_rst = thread_time_to_wakeup(bfd->thread_rst);

	bfd_reset_cancel(bfd);
}
/*
 * Resumes a suspended reset timer: re-arms it with the remaining time
 * saved in bfd->sands_rst and clears the saved value.
 */
static void
bfd_reset_resume(bfd_t *bfd)
{
	assert(bfd);
	assert(!bfd->thread_rst);
	assert(bfd->sands_rst != TIMER_NEVER);

	bfd->thread_rst = thread_add_timer(master, bfd_reset_thread, bfd,
					   bfd->sands_rst);

	bfd->sands_rst = TIMER_NEVER;
}
/* Tells whether the reset timer is suspended: 1 if a remaining time is
 * saved in bfd->sands_rst, 0 if it holds TIMER_NEVER */
static int __attribute__ ((pure))
bfd_reset_suspended(bfd_t *bfd)
{
	assert(bfd);

	return !(bfd->sands_rst == TIMER_NEVER);
}
/* Drops the saved state of a suspended reset timer without re-arming it.
 * The timer must currently be suspended. */
static void
bfd_reset_discard(bfd_t *bfd)
{
	assert(bfd);
	assert(bfd->sands_rst != TIMER_NEVER);

	bfd->sands_rst = TIMER_NEVER;
}
/*
* State change handlers
*/
/*
 * Actions shared by the transitions to Down and AdminDown:
 *  - switch the local transmit interval to its idle value,
 *  - cancel the expiration timer if one is pending,
 *  - send a state event, but only when send_event is set and the remote
 *    state is not AdminDown.
 */
static void
bfd_state_fall(bfd_t *bfd, bool send_event)
{
	assert(bfd);

	/*
	 * RFC5880:
	 * When bfd.SessionState is not Up, the system MUST set
	 * bfd.DesiredMinTxInterval to a value of not less than
	 * one second (1,000,000 microseconds)
	 */
	bfd_idle_local_tx_intv(bfd);

	if (bfd_expire_scheduled(bfd))
		bfd_expire_cancel(bfd);

	if (!send_event)
		return;
	if (bfd->remote_state == BFD_STATE_ADMINDOWN)
		return;

	bfd_event_send(bfd);
}
/*
 * Moves the session to Down state with the given local diagnostic.
 *
 * When leaving Up state a new local discriminator is drawn. The transition
 * is logged when leaving Up or when extra-detail logging is enabled. The
 * reset timer is armed, a passive session stops its sender, and the common
 * fall actions run; a state event is requested only if the session was Up.
 */
static void
bfd_state_down(bfd_t *bfd, uint8_t diag)
{
	bool was_up;

	assert(bfd);
	assert(BFD_VALID_DIAG(diag));

	/* Remember the state we are leaving before it is overwritten */
	was_up = bfd->local_state == BFD_STATE_UP;

	if (was_up)
		bfd->local_discr = bfd_get_random_discr(bfd_data);

	if (was_up || __test_bit(LOG_EXTRA_DETAIL_BIT, &debug))
		log_message(LOG_WARNING,
			    "BFD_Instance(%s) Entering %s state"
			    " (Local diagnostic - %s, Remote diagnostic - %s)",
			    bfd->iname,
			    BFD_STATE_STR(BFD_STATE_DOWN),
			    BFD_DIAG_STR(diag),
			    BFD_DIAG_STR(bfd->remote_diag));

	bfd->local_state = BFD_STATE_DOWN;
	bfd->local_diag = diag;

	bfd_reset_schedule(bfd);

	if (bfd->passive && bfd_sender_scheduled(bfd))
		bfd_sender_cancel(bfd);

	bfd_state_fall(bfd, was_up);
}
/*
 * Moves the session to AdminDown state: records the state and the
 * BFD_DIAG_ADMIN_DOWN diagnostic, stops the sender if it is scheduled,
 * logs the transition and runs the common fall actions without requesting
 * a state event.
 */
static void
bfd_state_admindown(bfd_t *bfd)
{
	assert(bfd);

	bfd->local_state = BFD_STATE_ADMINDOWN;
	bfd->local_diag = BFD_DIAG_ADMIN_DOWN;

	if (bfd_sender_scheduled(bfd))
		bfd_sender_cancel(bfd);

	log_message(LOG_WARNING, "BFD_Instance(%s) Entering %s state",
		    bfd->iname, BFD_STATE_STR(bfd->local_state));

	bfd_state_fall(bfd, false);
}
/*
 * Actions shared by the transitions to Init and Up. The caller has already
 * stored the new state in bfd->local_state.
 *  - clear the local diagnostic,
 *  - log the transition (when now Up, or with extra-detail logging),
 *  - cancel a pending reset timer,
 *  - make sure the expiration timer is running.
 */
static void
bfd_state_rise(bfd_t *bfd)
{
	/* RFC5880 doesn't state if this must be done or not */
	bfd->local_diag = BFD_DIAG_NO_DIAG;

	if (bfd->local_state == BFD_STATE_UP ||
	    __test_bit(LOG_EXTRA_DETAIL_BIT, &debug))
		log_message(LOG_INFO, "BFD_Instance(%s) Entering %s state",
			    bfd->iname, BFD_STATE_STR(bfd->local_state));

	if (bfd_reset_scheduled(bfd))
		bfd_reset_cancel(bfd);

	if (!bfd_expire_scheduled(bfd))
		bfd_expire_schedule(bfd);
}
/*
 * Moves the session to Up state: runs the common rise actions, sets the
 * poll flag via bfd_set_poll() when the idle and the configured minimum
 * transmit intervals differ, and sends a state event.
 */
static void
bfd_state_up(bfd_t *bfd)
{
	assert(bfd);

	bfd->local_state = BFD_STATE_UP;
	bfd_state_rise(bfd);

	if (bfd->local_idle_tx_intv != bfd->local_min_tx_intv)
		bfd_set_poll(bfd);

	bfd_event_send(bfd);
}
/*
 * Moves the session to Init state and runs the common rise actions.
 * A passive session starts its sender here if it is not already scheduled.
 */
static void
bfd_state_init(bfd_t *bfd)
{
	assert(bfd);

	/* According to RFC5880 a session cannot transition
	   directly from Up to Init state */
	assert(!BFD_ISUP(bfd));

	bfd->local_state = BFD_STATE_INIT;
	bfd_state_rise(bfd);

	if (bfd->passive && !bfd_sender_scheduled(bfd))
		bfd_sender_schedule(bfd);
}
/*
 * Writes the session's local and remote timer parameters through
 * conf_write() as a small four-line table (title, column headings, local
 * row, remote row). Interval and detection-time values are divided by 1000
 * before printing.
 *
 * The local min_tx column shows local_min_tx_intv while the session is Up
 * and local_idle_tx_intv otherwise.
 * fp is passed through to conf_write() unchanged; it may be NULL (this file
 * calls it that way).
 */
static void
bfd_dump_timers(FILE *fp, bfd_t *bfd)
{
	assert(bfd);

	/* Title */
	conf_write(fp,
		   "BFD_Instance(%s)"
		   " --------------< Session parameters >-------------",
		   bfd->iname);

	/* Column headings */
	conf_write(fp,
		   "BFD_Instance(%s)"
		   " min_tx min_rx tx_intv mult detect_time",
		   bfd->iname);

	/* Local row */
	conf_write(fp,
		   "BFD_Instance(%s)"
		   " local %7u %7u %8u %5u %12" PRIu64,
		   bfd->iname,
		   (bfd->local_state == BFD_STATE_UP ? bfd->local_min_tx_intv : bfd->local_idle_tx_intv) / 1000,
		   bfd->local_min_rx_intv / 1000,
		   bfd->local_tx_intv / 1000,
		   bfd->local_detect_mult,
		   bfd->local_detect_time / 1000);

	/* Remote row */
	conf_write(fp,
		   "BFD_Instance(%s)" " remote %6u %7u %8u %5u %12" PRIu64,
		   bfd->iname,
		   bfd->remote_min_tx_intv / 1000,
		   bfd->remote_min_rx_intv / 1000,
		   bfd->remote_tx_intv / 1000,
		   bfd->remote_detect_mult,
		   bfd->remote_detect_time / 1000);
}
/*
* Packet handling functions
*/
/*
 * Sends a control packet to the neighbor (called from bfd_sender_thread).
 *
 * fd        - output socket, must be >= 0
 * pkt       - packet to send; pkt->buf / pkt->len hold the data and
 *             pkt->dst_addr the destination
 * log_error - when true, a sendto() failure is logged here
 *
 * The destination address length is that of sockaddr_in for AF_INET and
 * that of sockaddr_in6 for anything else.
 * Returns the value of sendto(), i.e. -1 on error.
 */
static int
bfd_send_packet(int fd, bfdpkt_t *pkt, bool log_error)
{
	socklen_t addr_len;
	int rc;

	assert(fd >= 0);
	assert(pkt);

	addr_len = pkt->dst_addr.ss_family == AF_INET
			? sizeof (struct sockaddr_in)
			: sizeof (struct sockaddr_in6);

	rc = sendto(fd, pkt->buf, pkt->len, 0,
		    (struct sockaddr *) &pkt->dst_addr, addr_len);

	if (log_error && rc == -1)
		log_message(LOG_ERR, "sendto() error (%m)");

	return rc;
}
/*
 * Handles an incoming control packet (called from bfd_receiver_thread) and
 * processes it through the BFD state machine.
 *
 * Steps, in order:
 *  1. Validate the packet and find the session it belongs to: by address
 *     pair when the packet's "your discriminator" is zero, by discriminator
 *     otherwise. Packets that fail validation, match no session, fail the
 *     per-instance TTL check, have the auth bit set, or arrive while the
 *     session is AdminDown are discarded.
 *  2. Copy the remote parameters from the header into the session.
 *  3. Renegotiate the transmit intervals where required and recompute both
 *     detection times.
 *  4. Run the state machine.
 *  5. Stop or start the sender depending on remote demand mode, answer a
 *     Poll with an immediate packet carrying Final, and refresh the
 *     last-seen time / expiration timer.
 */
static void
bfd_handle_packet(bfdpkt_t *pkt)
{
	uint32_t old_local_tx_intv;
	uint32_t old_remote_rx_intv;
	uint32_t old_remote_tx_intv;
	uint8_t old_remote_detect_mult;
	uint64_t old_local_detect_time;
	bfd_t *bfd;

	assert(pkt);
	assert(pkt->hdr);

	/* Perform sanity checks on a packet */
	if (bfd_check_packet(pkt)) {
		if (__test_bit(LOG_DETAIL_BIT, &debug))
			log_message(LOG_ERR,
				    "Discarding bogus packet from %s",
				    inet_sockaddrtopair(&pkt->src_addr));

		return;
	}

	/* Lookup session */
	if (!pkt->hdr->remote_discr)
		bfd = find_bfd_by_addr(&pkt->src_addr, &pkt->dst_addr);
	else
		bfd = find_bfd_by_discr(ntohl(pkt->hdr->remote_discr));

	if (!bfd) {
		/* The header field is in network byte order; convert it so the
		 * logged value matches the one used for the lookup above */
		if (__test_bit(LOG_DETAIL_BIT, &debug))
			log_message(LOG_ERR, "Discarding packet from %s"
				    " (session is not found - your"
				    " discriminator field is %u)",
				    inet_sockaddrtopair(&pkt->src_addr),
				    ntohl(pkt->hdr->remote_discr));

		return;
	}

	/* We can't check the TTL any earlier, since we need to know what
	 * is configured for this particular instance */
	if (bfd->max_hops != UCHAR_MAX && bfd_check_packet_ttl(pkt, bfd))
		return;

	/* Authentication is not supported for now */
	if (pkt->hdr->auth != 0) {
		if (__test_bit(LOG_DETAIL_BIT, &debug))
			log_message(LOG_ERR, "Discarding packet from %s"
				    " (auth bit is set, but no authentication"
				    "  is in use)",
				    inet_sockaddrtopair(&pkt->src_addr));

		return;
	}

	/* Discard all packets while in AdminDown state */
	if (bfd->local_state == BFD_STATE_ADMINDOWN) {
		if (__test_bit(LOG_DETAIL_BIT, &debug))
			log_message(LOG_INFO, "Discarding packet from %s"
				    " (session is in AdminDown state)",
				    inet_sockaddrtopair(&pkt->src_addr));

		return;
	}

	/* Save old timers */
	old_remote_rx_intv = bfd->remote_min_rx_intv;
	old_remote_tx_intv = bfd->remote_min_tx_intv;
	old_remote_detect_mult = bfd->remote_detect_mult;
	old_local_detect_time = bfd->local_detect_time;
	old_local_tx_intv = bfd->local_tx_intv;

	/* Update state variables */
	bfd->remote_discr = ntohl(pkt->hdr->local_discr);
	bfd->remote_state = pkt->hdr->state;
	bfd->remote_diag = pkt->hdr->diag;
	bfd->remote_min_rx_intv = ntohl(pkt->hdr->min_rx_intv);
	bfd->remote_min_tx_intv = ntohl(pkt->hdr->min_tx_intv);
	bfd->remote_demand = pkt->hdr->demand;
	bfd->remote_detect_mult = pkt->hdr->detect_mult;

	/* Terminate poll sequence */
	if (pkt->hdr->final)
		bfd->poll = 0;

	/*
	 * Recalculate local and remote TX intervals if:
	 *  Control packet with 'Final' bit is received OR
	 *  Control packet with 'Poll' bit is received OR
	 *  Session is not UP
	 */
	if ((bfd->local_state == BFD_STATE_UP &&
	     (pkt->hdr->poll || pkt->hdr->final)) ||
	    bfd->local_state != BFD_STATE_UP) {
		if (bfd->remote_state == BFD_STATE_UP &&
		    (bfd->local_state == BFD_STATE_INIT || bfd->local_state == BFD_STATE_UP))
			bfd_update_local_tx_intv(bfd);
		else
			bfd_idle_local_tx_intv(bfd);
		bfd_update_remote_tx_intv(bfd);
	}

	/* Update the Detection Time. The products are computed in 64 bits:
	 * the detection time fields are 64 bits wide, and a large interval
	 * times the multiplier would otherwise wrap in 32-bit arithmetic. */
	bfd->local_detect_time = (uint64_t)bfd->remote_detect_mult * bfd->remote_tx_intv;
	bfd->remote_detect_time = (uint64_t)bfd->local_detect_mult * bfd->local_tx_intv;

	/* Check if timers are changed */
	if (__test_bit(LOG_EXTRA_DETAIL_BIT, &debug) ||
	    (__test_bit(LOG_DETAIL_BIT, &debug) &&
	     (bfd->remote_min_rx_intv != old_remote_rx_intv ||
	      bfd->remote_min_tx_intv != old_remote_tx_intv ||
	      bfd->remote_detect_mult != old_remote_detect_mult ||
	      bfd->local_tx_intv != old_local_tx_intv)))
		bfd_dump_timers(NULL, bfd);

	/* Reschedule sender if local_tx_intv is being reduced */
	if (bfd->local_tx_intv < old_local_tx_intv &&
	    bfd_sender_scheduled(bfd))
		bfd_sender_reschedule(bfd);

	/* Report detection time changes */
	if (bfd->local_detect_time != old_local_detect_time)
		log_message(LOG_INFO, "BFD_Instance(%s) Detection time"
			    " is %" PRIu64 " ms (was %" PRIu64 " ms)", bfd->iname,
			    bfd->local_detect_time / 1000,
			    old_local_detect_time / 1000);

	/* BFD state machine */
	if (bfd->remote_state == BFD_STATE_ADMINDOWN &&
	    bfd->local_state != BFD_STATE_DOWN)
		bfd_state_down(bfd, BFD_DIAG_NBR_SIGNALLED_DOWN);
	else {
		if (bfd->local_state == BFD_STATE_DOWN) {
			if (bfd->remote_state == BFD_STATE_DOWN)
				bfd_state_init(bfd);
			else if (bfd->remote_state == BFD_STATE_INIT)
				bfd_state_up(bfd);
		} else if (bfd->local_state == BFD_STATE_INIT) {
			if (bfd->remote_state == BFD_STATE_INIT ||
			    bfd->remote_state == BFD_STATE_UP)
				bfd_state_up(bfd);
		} else if (bfd->local_state == BFD_STATE_UP) {
			if (bfd->remote_state == BFD_STATE_DOWN)
				bfd_state_down(bfd, BFD_DIAG_NBR_SIGNALLED_DOWN);
		}
	}

	/* Remote demand mode with both ends Up: stop periodic transmission */
	if (bfd->remote_demand &&
	    bfd->local_state == BFD_STATE_UP &&
	    bfd->remote_state == BFD_STATE_UP) {
		if (bfd_sender_scheduled(bfd))
			bfd_sender_cancel(bfd);
	}

	/* Otherwise make sure periodic transmission is running */
	if (!bfd->remote_demand ||
	    bfd->local_state != BFD_STATE_UP ||
	    bfd->remote_state != BFD_STATE_UP) {
		if (!bfd_sender_scheduled(bfd))
			bfd_sender_schedule(bfd);
	}

	/* Answer a Poll right away with a packet that has Final set */
	if (pkt->hdr->poll) {
		bfd->final = 1;
		thread_add_event(master, bfd_sender_thread, bfd, 0);
	}

	/* Update last seen timer */
	bfd->last_seen = timer_now();

	/* Delay expiration if scheduled */
	if (bfd->local_state == BFD_STATE_UP &&
	    bfd_expire_scheduled(bfd))
		bfd_expire_reschedule(bfd);
}
/*
 * Reads one packet from the input socket without blocking.
 *
 * pkt   - filled in on success: hdr points into buf, len is the number of
 *         bytes received, ttl is the received TTL / hop limit (0 if none
 *         was reported), src_addr is the sender and dst_addr the address
 *         the packet was sent to
 * fd    - input socket
 * buf   - receive buffer
 * bufsz - size of buf
 *
 * TTL and destination address come from ancillary data. IPv4-mapped IPv6
 * addresses are converted to plain AF_INET addresses. If no IPV6_PKTINFO
 * message is received, dst_addr.ss_family is AF_UNSPEC.
 *
 * Returns 0 on success, 1 if recvmsg() failed or the datagram was
 * truncated.
 */
static int
bfd_receive_packet(bfdpkt_t *pkt, int fd, char *buf, ssize_t bufsz)
{
	ssize_t len;
	unsigned int ttl = 0;
	int cmsg_ttl;
	struct msghdr msg;
	struct cmsghdr *cmsg = NULL;
	char cbuf[CMSG_SPACE(sizeof (struct in6_pktinfo)) + CMSG_SPACE(sizeof(ttl))];
	struct iovec iov[1];
	struct in6_pktinfo *pktinfo;

	assert(pkt);
	assert(fd >= 0);
	assert(buf);
	assert(bufsz);

	iov[0].iov_base = buf;
	iov[0].iov_len = bufsz;

	msg.msg_name = &pkt->src_addr;
	msg.msg_namelen = sizeof (pkt->src_addr);
	msg.msg_iov = iov;
	msg.msg_iovlen = 1;
	msg.msg_control = cbuf;
	msg.msg_controllen = sizeof (cbuf);
	msg.msg_flags = 0;	/* Unnecessary, but keep coverity happy */

	len = recvmsg(fd, &msg, MSG_DONTWAIT);
	if (len == -1) {
		log_message(LOG_ERR, "recvmsg() error (%m)");
		return 1;
	}

	if (msg.msg_flags & MSG_TRUNC) {
		log_message(LOG_WARNING, "recvmsg() message truncated");
		return 1;
	}

	if (msg.msg_flags & MSG_CTRUNC)
		log_message(LOG_WARNING, "recvmsg() control message truncated");

	/* The destination is only known once an IPV6_PKTINFO message has been
	 * seen; do not leave the family indeterminate if none arrives */
	pkt->dst_addr.ss_family = AF_UNSPEC;

	for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL;
	     cmsg = CMSG_NXTHDR(&msg, cmsg)) {
		if ((cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_TTL) ||
		    (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_HOPLIMIT)) {
			/* The payload is an int. Copy the whole value rather
			 * than reading its first byte, which only yields the
			 * TTL on little-endian hosts. */
			if (cmsg->cmsg_len >= CMSG_LEN(sizeof(cmsg_ttl))) {
				memcpy(&cmsg_ttl, CMSG_DATA(cmsg), sizeof(cmsg_ttl));
				if (cmsg_ttl > 0)
					ttl = (unsigned int)cmsg_ttl;
			}
		}
		else if (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_PKTINFO) {
			pktinfo = (struct in6_pktinfo *)CMSG_DATA(cmsg);
			if (IN6_IS_ADDR_V4MAPPED(&pktinfo->ipi6_addr)) {
				((struct sockaddr_in *)&pkt->dst_addr)->sin_addr.s_addr = pktinfo->ipi6_addr.s6_addr32[3];
				pkt->dst_addr.ss_family = AF_INET;
			} else {
				memcpy(&((struct sockaddr_in6 *)&pkt->dst_addr)->sin6_addr, &pktinfo->ipi6_addr, sizeof(pktinfo->ipi6_addr));
				pkt->dst_addr.ss_family = AF_INET6;
			}
		}
		else
			log_message(LOG_WARNING, "recvmsg() received"
				    " unexpected control message (level %d type %d)",
				    cmsg->cmsg_level, cmsg->cmsg_type);
	}

	if (!ttl)
		log_message(LOG_WARNING, "recvmsg() returned no TTL control message");

	pkt->hdr = (bfdhdr_t *) buf;
	pkt->len = len;
	pkt->ttl = ttl;

	/* Convert an IPv4-mapped IPv6 address to a real IPv4 address */
	if (IN6_IS_ADDR_V4MAPPED(&((struct sockaddr_in6 *)&pkt->src_addr)->sin6_addr)) {
		((struct sockaddr_in *)&pkt->src_addr)->sin_addr.s_addr = ((struct sockaddr_in6 *)&pkt->src_addr)->sin6_addr.s6_addr32[3];
		pkt->src_addr.ss_family = AF_INET;
	}

	return 0;
}
/*
* Receiver thread
*/
/*
 * Receiver thread callback for the listening socket.
 *
 * When invoked because the socket is readable, one packet is read and, if
 * that succeeds, handed to bfd_handle_packet(). Any other thread type (for
 * instance a read timeout) is ignored. In every case the read thread is
 * registered again, so the callback keeps being invoked.
 */
static void
bfd_receiver_thread(thread_ref_t thread)
{
	bfd_data_t *data;
	bfdpkt_t pkt;
	int fd;

	assert(thread);
	data = THREAD_ARG(thread);
	assert(data);

	fd = thread->u.f.fd;
	assert(fd >= 0);

	/* The thread that just fired is no longer pending */
	data->thread_in = NULL;

	/* Ignore THREAD_READ_TIMEOUT */
	if (thread->type == THREAD_READY_READ_FD &&
	    !bfd_receive_packet(&pkt, fd, bfd_buffer, BFD_BUFFER_SIZE))
		bfd_handle_packet(&pkt);

	data->thread_in = thread_add_read(thread->master, bfd_receiver_thread,
					  data, fd, TIMER_NEVER, false);
}
/*
* Initialization functions
*/
/*
 * Prepares the UDP listening socket on the BFD control port. An AF_INET6
 * socket bound to the wildcard address is used to receive both IPv4 and
 * IPv6 traffic. Reception of the TTL / hop limit and of the packet
 * destination address as ancillary data is enabled.
 *
 * On success data->fd_in holds the socket and 0 is returned.
 * On any failure the error is logged, a socket that was already created is
 * closed again, data->fd_in is left at -1 and 1 is returned.
 */
static int
bfd_open_fd_in(bfd_data_t *data)
{
	struct addrinfo hints;
	struct addrinfo *ai_in = NULL;
	int ret;
	int yes = 1;
	int failed = 1;

	assert(data);
	assert(data->fd_in == -1);

	memset(&hints, 0, sizeof hints);
	hints.ai_family = AF_INET6;
	hints.ai_flags = AI_NUMERICSERV | AI_PASSIVE;
	hints.ai_protocol = IPPROTO_UDP;
	hints.ai_socktype = SOCK_DGRAM;

	ret = getaddrinfo(NULL, BFD_CONTROL_PORT, &hints, &ai_in);
	if (ret) {
		/* Nothing was allocated, so there is nothing to free */
		log_message(LOG_ERR, "getaddrinfo() error %d (%s)", ret, gai_strerror(ret));
		return 1;
	}

	if ((data->fd_in = socket(AF_INET6, ai_in->ai_socktype, ai_in->ai_protocol)) == -1)
		log_message(LOG_ERR, "socket() error %d (%m)", errno);
	else if (setsockopt(data->fd_in, IPPROTO_IP, IP_RECVTTL, &yes, sizeof (yes)) == -1)
		log_message(LOG_ERR, "setsockopt(IP_RECVTTL) error %d (%m)", errno);
	else if (setsockopt(data->fd_in, IPPROTO_IPV6, IPV6_RECVHOPLIMIT, &yes, sizeof (yes)) == -1)
		log_message(LOG_ERR, "setsockopt(IPV6_RECVHOPLIMIT) error %d (%m)", errno);
	else if (setsockopt(data->fd_in, IPPROTO_IPV6, IPV6_RECVPKTINFO, &yes, sizeof (yes)) == -1)
		log_message(LOG_ERR, "setsockopt(IPV6_RECVPKTINFO) error %d (%m)", errno);
	else if (bind(data->fd_in, ai_in->ai_addr, ai_in->ai_addrlen) == -1)
		log_message(LOG_ERR, "bind() error %d (%m)", errno);
	else
		failed = 0;

	/* Do not leave a half-configured socket behind */
	if (failed && data->fd_in != -1) {
		close(data->fd_in);
		data->fd_in = -1;
	}

	freeaddrinfo(ai_in);

	return failed;
}
/*
 * Reads the local port range from
 * /proc/sys/net/ipv4/ip_local_port_range.
 *
 * port_limits - receives the lower bound in [0] and the upper bound in [1]
 *
 * The file content is expected to be two positive decimal numbers, the
 * first followed by a tab or a space and the second by a newline.
 *
 * Returns true if the file was read and parsed. Otherwise returns false
 * with port_limits set to the defaults 49152 and 60999.
 */
static bool
read_local_port_range(uint32_t port_limits[2])
{
	char text[5 + 1 + 5 + 1 + 1];	/* 32768<TAB>60999<NL> */
	char *cursor;
	ssize_t nread;
	long low, high;
	int fd;

	/* Default to sensible values */
	port_limits[0] = 49152;
	port_limits[1] = 60999;

	fd = open("/proc/sys/net/ipv4/ip_local_port_range", O_RDONLY);
	if (fd == -1)
		return false;

	nread = read(fd, text, sizeof(text));
	close(fd);

	/* A completely filled buffer leaves no room for the terminator */
	if (nread == -1 || nread == sizeof(text))
		return false;
	text[nread] = '\0';

	low = strtol(text, &cursor, 10);
	if (low <= 0 || low == LONG_MAX)
		return false;
	if (*cursor != '\t' && *cursor != ' ')
		return false;

	high = strtol(cursor + 1, &cursor, 10);
	if (high <= 0 || high == LONG_MAX)
		return false;
	if (*cursor != '\n')
		return false;

	port_limits[0] = low;
	port_limits[1] = high;

	return true;
}
/*
 * Prepares the UDP socket used for sending control packets to the neighbor.
 *
 * If a source address is configured, the socket is bound to it using a
 * source port picked at random from the usable range. The range is the
 * system local port range clamped to [BFD_MIN_PORT, BFD_MAX_PORT], or that
 * BFD range itself if fewer than 1024 ports remain. Ports that are already
 * in use are skipped, wrapping round at the top of the range, until one
 * binds or every port has been tried.
 * The configured TTL / hop limit is then set on the socket.
 *
 * Returns 0 on success, 1 on failure (the error is logged). On a failure
 * after socket() succeeded, bfd->fd_out still holds the open socket.
 */
static int
bfd_open_fd_out(bfd_t *bfd)
{
	int ttl;
	int ret;
	uint32_t port_limits[2];
	uint32_t orig_port, port;
	socklen_t sockaddr_len;

	assert(bfd);
	assert(bfd->fd_out == -1);

	bfd->fd_out = socket(bfd->nbr_addr.ss_family, SOCK_DGRAM, IPPROTO_UDP);
	if (bfd->fd_out == -1) {
		log_message(LOG_ERR, "BFD_Instance(%s) socket() error (%m)",
			    bfd->iname);
		return 1;
	}

	if (bfd->src_addr.ss_family) {
		/* Generate a random port number within the valid range */
		read_local_port_range(port_limits);
		if (port_limits[0] < BFD_MIN_PORT)
			port_limits[0] = BFD_MIN_PORT;
		if (port_limits[1] > BFD_MAX_PORT)
			port_limits[1] = BFD_MAX_PORT;

		/* Ensure we have a range of at least 1024 ports (an arbitrary number)
		 * to try. */
		if (port_limits[0] + 1023 > port_limits[1]) {
			/* Just use the BFD defaults */
			port_limits[0] = BFD_MIN_PORT;
			port_limits[1] = BFD_MAX_PORT;
		}

		orig_port = port = rand_intv(port_limits[0], port_limits[1]);
		sockaddr_len = bfd->src_addr.ss_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);

		for (;;) {
			/* Try binding socket to the address until we find one available */
			if (bfd->src_addr.ss_family == AF_INET)
				((struct sockaddr_in *)&bfd->src_addr)->sin_port = htons((uint16_t)port);
			else
				((struct sockaddr_in6 *)&bfd->src_addr)->sin6_port = htons((uint16_t)port);

			ret = bind(bfd->fd_out, (struct sockaddr *) &bfd->src_addr, sockaddr_len);
			if (ret != -1 || errno != EADDRINUSE)
				break;

			/* Port already in use, try next. The wrap is decided
			 * before incrementing so that the counter can never
			 * roll over to port 0, which would make the kernel
			 * pick an arbitrary port. */
			if (port >= port_limits[1])
				port = port_limits[0];
			else
				port++;

			/* Every port in the range has been tried */
			if (port == orig_port)
				break;
		}

		if (ret == -1) {
			log_message(LOG_ERR,
				    "BFD_Instance(%s) bind() error (%m)",
				    bfd->iname);
			return 1;
		}
	} else {
		/* We have a problem here - we do not have a source address, and so
		 * cannot bind the socket. That means that we will get a system allocated
		 * port, which may be outside the range [49152, 65535], as specified in
		 * RFC5881. */
	}

	ttl = bfd->ttl;
	if (bfd->nbr_addr.ss_family == AF_INET)
		ret = setsockopt(bfd->fd_out, IPPROTO_IP, IP_TTL, &ttl, sizeof (ttl));
	else
		ret = setsockopt(bfd->fd_out, IPPROTO_IPV6, IPV6_UNICAST_HOPS, &ttl, sizeof (ttl));

	if (ret == -1) {
		log_message(LOG_ERR, "BFD_Instance(%s) setsockopt() "
			    " error (%m)", bfd->iname);
		return 1;
	}

	return 0;
}
/*
 * Opens all needed sockets: the shared listening socket (unless it is
 * already open, as on reload) and one output socket per BFD instance.
 *
 * Returns 1 if the listening socket cannot be opened, 0 otherwise. An
 * instance whose output socket cannot be opened is put into AdminDown
 * state; that does not make the function fail.
 *
 * NOTE(review): the "already open" test reads the global bfd_data, while
 * the socket is opened on the data argument - confirm both always refer to
 * the same object.
 */
static int
bfd_open_fds(bfd_data_t *data)
{
	bfd_t *bfd;

	assert(data);

	/* Do not reopen input socket on reload */
	if (bfd_data->fd_in == -1 && bfd_open_fd_in(data)) {
		log_message(LOG_ERR, "Unable to open listening socket");

		/* There is no point to stay alive w/o listening socket */
		return 1;
	}

	list_for_each_entry(bfd, &data->bfd, e_list) {
		if (!bfd_open_fd_out(bfd))
			continue;

		log_message(LOG_ERR, "BFD_Instance(%s) Unable to"
			    " open output socket, disabling instance",
			    bfd->iname);
		bfd_state_admindown(bfd);
	}

	return 0;
}
/*
 * Registers the receiver thread and starts the per-instance timers.
 *
 * For every instance:
 *  - a suspended sender / expire / reset timer is resumed, or its saved
 *    state is discarded if the instance is in AdminDown state;
 *  - if the sender was not suspended, it is scheduled unless the instance
 *    is AdminDown or passive;
 *  - the current status is sent to the VRRP process;
 *  - on startup (not reload) an immediate packet is queued for instances
 *    that are neither passive nor AdminDown.
 */
static void
bfd_register_workers(bfd_data_t *data)
{
	bfd_t *bfd;

	assert(data);
	assert(!data->thread_in);

	/* Set timeout to not expire */
	data->thread_in = thread_add_read(master, bfd_receiver_thread,
					  data, data->fd_in, TIMER_NEVER, false);

	/* Resume or schedule threads */
	list_for_each_entry(bfd, &data->bfd, e_list) {
		/* Do not start anything if instance is in AdminDown state.
		   Discard saved state if any */
		if (bfd_sender_suspended(bfd)) {
			if (BFD_ISADMINDOWN(bfd))
				bfd_sender_discard(bfd);
			else
				bfd_sender_resume(bfd);
		} else if (!BFD_ISADMINDOWN(bfd) && !bfd->passive)
			bfd_sender_schedule(bfd);

		if (bfd_expire_suspended(bfd)) {
			if (BFD_ISADMINDOWN(bfd))
				bfd_expire_discard(bfd);
			else
				bfd_expire_resume(bfd);
		}

		if (bfd_reset_suspended(bfd)) {
			if (BFD_ISADMINDOWN(bfd))
				bfd_reset_discard(bfd);
			else
				bfd_reset_resume(bfd);
		}

		/* Send our status to VRRP process */
		bfd_event_send(bfd);

		/* If we are starting up, send a packet. An AdminDown instance
		 * is skipped: bfd_sender_thread must not run for it, and its
		 * output socket may have failed to open. */
		if (!reload && !bfd->passive && !BFD_ISADMINDOWN(bfd))
			thread_add_event(master, bfd_sender_thread, bfd, 0);
	}
}
/*
 * Stops the dispatcher: cancels the receiver thread, suspends every
 * instance's pending timers so that they can be resumed after a
 * reconfiguration, and closes the output sockets. The listening socket is
 * closed as well, except on reload.
 *
 * Does nothing if the dispatcher was never started (no receiver thread).
 */
void
bfd_dispatcher_release(bfd_data_t *data)
{
	bfd_t *bfd;

	assert(data);

	/* Looks like dispatcher wasn't initialized yet
	   This can happen is case of a configuration error */
	if (!data->thread_in)
		return;

	assert(data->fd_in != -1);

	thread_cancel(data->thread_in);
	data->thread_in = NULL;

	/* Do not close fd_in on reload */
	if (!reload) {
		close(data->fd_in);
		data->fd_in = -1;
	}

	/* Suspend threads for possible resuming after reconfiguration.
	 * The suspend helpers need a freshly updated time_now. */
	set_time_now();
	list_for_each_entry(bfd, &data->bfd, e_list) {
		if (bfd_sender_scheduled(bfd))
			bfd_sender_suspend(bfd);

		if (bfd_expire_scheduled(bfd))
			bfd_expire_suspend(bfd);

		if (bfd_reset_scheduled(bfd))
			bfd_reset_suspend(bfd);

		/* An instance whose socket() call failed in bfd_open_fd_out
		 * has no descriptor to close */
		if (bfd->fd_out != -1) {
			close(bfd->fd_out);
			bfd->fd_out = -1;
		}
	}

	cancel_signal_read_thread();
}
/*
 * Starts the BFD dispatcher (thread callback): opens the sockets for the
 * BFD data passed as the thread argument and registers the worker threads.
 * The process exits with EXIT_FAILURE if the sockets cannot be opened.
 */
void
bfd_dispatcher_init(thread_ref_t thread)
{
	bfd_data_t *data;

	assert(thread);
	data = THREAD_ARG(thread);

	if (bfd_open_fds(data) != 0)
		exit(EXIT_FAILURE);

	bfd_register_workers(data);
}
#ifdef THREAD_DUMP
/*
 * Registers the names of this file's thread callbacks through
 * register_thread_address(). Only built when THREAD_DUMP is defined.
 */
void
register_bfd_scheduler_addresses(void)
{
	/* Timer / event driven threads */
	register_thread_address("bfd_sender_thread", bfd_sender_thread);
	register_thread_address("bfd_expire_thread", bfd_expire_thread);
	register_thread_address("bfd_reset_thread", bfd_reset_thread);

	/* Socket read thread */
	register_thread_address("bfd_receiver_thread", bfd_receiver_thread);
}
#endif