/*
* Copyright (C) 2010 Miroslav Lichvar <mlichvar@redhat.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "node.h"
#include "network.h"
#include "protocol.h"
#include "sysheaders.h"
/* Create a node with the given index that belongs to the given network.
   The node starts without a socket (fd is -1), with REQ_REGISTER as its
   pending request, a start time of zero and the terminate flag cleared. */
Node::Node(int index, Network *network) {
	/* the parameters shadow the members of the same name */
	this->index = index;
	this->network = network;

	this->fd = -1;
	this->start_time = 0.0;
	this->terminate = false;
	this->pending_request = REQ_REGISTER;
}
/* Destroy the node: free the packets still queued for it, set the terminate
   flag, keep serving the node's requests until process_fd() fails, and then
   close the socket if one was set. */
Node::~Node() {
	/* free the packets that were never handed over to the node */
	for (; !incoming_packets.empty(); incoming_packets.pop_back())
		delete incoming_packets.back();

	terminate = true;

	/* resume the node whenever it is waiting and process its next request;
	   stop as soon as process_fd() reports that nothing was received */
	for (;;) {
		if (waiting())
			resume();
		if (!process_fd())
			break;
	}

	if (fd >= 0)
		close(fd);
}
/* Store the descriptor used by process_fd() and reply() to talk to the node. */
void Node::set_fd(int fd) {
	/* qualify the member, the parameter has the same name */
	Node::fd = fd;
}
int Node::get_fd() const {
return fd;
}
void Node::set_start_time(double time) {
start_time = time;
}
/* Receive one request from the node's socket and dispatch it to the matching
   process_*() handler. Returns false when recv() delivered less than a
   request header (which includes a failed recv()), true after the request
   was dispatched. No request may be pending when a new one is received. */
bool Node::process_fd() {
	Request_packet request;

	const int received = recv(fd, &request, sizeof (request), 0);
	if (received < (int)sizeof (request.header))
		return false;

	/* number of received bytes located at or after the data member */
	const int payload_len = received - (int)offsetof(Request_packet, data);

	assert(pending_request == 0);
	pending_request = request.header.request;

#ifdef DEBUG
	printf("received request %ld in node %d at %f\n",
			pending_request, index, clock.get_real_time());
#endif

	/* every request is checked to carry exactly the payload of its type */
	switch (pending_request) {
		case REQ_GETTIME:
			assert(payload_len == 0);
			process_gettime();
			break;
		case REQ_SETTIME:
			assert(payload_len == sizeof (Request_settime));
			process_settime(&request.data.settime);
			break;
		case REQ_ADJTIMEX:
			assert(payload_len == sizeof (Request_adjtimex));
			process_adjtimex(&request.data.adjtimex);
			break;
		case REQ_ADJTIME:
			assert(payload_len == sizeof (Request_adjtime));
			process_adjtime(&request.data.adjtime);
			break;
		case REQ_SELECT:
			assert(payload_len == sizeof (Request_select));
			process_select(&request.data.select);
			break;
		case REQ_SEND:
			/* the only request with a variable length: the fixed part
			   must be complete and the declared data length must fit
			   both the data buffer and the bytes actually received */
			assert(payload_len >= (int)offsetof(Request_send, data) &&
					payload_len <= (int)sizeof (Request_send));
			assert(request.data.send.len <= sizeof (request.data.send.data));
			assert((int)(request.data.send.len + offsetof(Request_send, data)) <= payload_len);
			process_send(&request.data.send);
			break;
		case REQ_RECV:
			assert(payload_len == 0);
			process_recv();
			break;
		case REQ_GETREFSAMPLE:
			assert(payload_len == 0);
			process_getrefsample();
			break;
		case REQ_GETREFOFFSETS:
			assert(payload_len == 0);
			process_getrefoffsets();
			break;
		case REQ_DEREGISTER:
			/* no handler is called and no reply is sent here, the
			   request stays pending */
			assert(payload_len == 0);
			break;
		default:
			assert(0);
	}

	return true;
}
/* Complete the pending request, which must be the given one, and send len
   bytes of data to the node as the reply. When data is NULL the request is
   completed without anything being sent. */
void Node::reply(void *data, int len, int request) {
	assert(request == pending_request);
	pending_request = 0;

	if (!data)
		return;

	const int sent = send(fd, data, len, 0);
	assert(sent == len);
}
void Node::process_gettime() {
Reply_gettime r;
r.real_time = clock.get_real_time();
r.monotonic_time = clock.get_monotonic_time();
r.network_time = network->get_time();
reply(&r, sizeof (r), REQ_GETTIME);
}
/* Handle REQ_SETTIME: pass the requested time to the node's clock and
   complete the request with an empty reply. */
void Node::process_settime(Request_settime *req) {
	Request_settime &settime = *req;

	clock.set_time(settime.time);

	/* nothing is sent back for this request */
	reply(NULL, 0, REQ_SETTIME);
}
/* Handle REQ_ADJTIMEX: hand the request's timex structure to the clock and
   reply with the clock's return value and the contents of that structure
   after the call. */
void Node::process_adjtimex(Request_adjtimex *req) {
	Reply_adjtimex rep;

	rep.ret = clock.adjtimex(&req->timex);

	/* copied after the call, so the reply carries whatever the clock left
	   in the structure */
	rep.timex = req->timex;
	rep._pad = 0;

	reply(&rep, sizeof (rep), REQ_ADJTIMEX);
}
/* Handle REQ_ADJTIME: call the clock's adjtime() with the value from the
   request and send back the value it stored in the reply. */
void Node::process_adjtime(Request_adjtime *req) {
	Reply_adjtime rep;

	/* the second argument points into the reply, which is sent as is */
	this->clock.adjtime(&req->tv, &rep.tv);

	reply(&rep, sizeof (rep), REQ_ADJTIME);
}
void Node::try_select() {
Reply_select rep = {-1, 0, 0};
if (terminate) {
rep.ret = REPLY_SELECT_TERMINATE;
#ifdef DEBUG
printf("select returned on termination in %d at %f\n",
index, clock.get_real_time());
#endif
} else if (select_timeout - clock.get_monotonic_time() <= 0.0) {
assert(select_timeout - clock.get_monotonic_time() > -1e-10);
rep.ret = REPLY_SELECT_TIMEOUT;
#ifdef DEBUG
printf("select returned on timeout in %d at %f\n", index, clock.get_real_time());
#endif
} else if (select_read && incoming_packets.size() > 0) {
rep.ret = incoming_packets.back()->broadcast ?
REPLY_SELECT_BROADCAST :
REPLY_SELECT_NORMAL;
rep.subnet = incoming_packets.back()->subnet;
rep.dst_port = incoming_packets.back()->dst_port;
#ifdef DEBUG
printf("select returned for packet in %d at %f\n", index, clock.get_real_time());
#endif
}
if (rep.ret >= 0) {
rep.time.real_time = clock.get_real_time();
rep.time.monotonic_time = clock.get_monotonic_time();
rep.time.network_time = network->get_time();
reply(&rep, sizeof (rep), REQ_SELECT);
}
}
/* Handle REQ_SELECT: remember when the select times out (on the monotonic
   clock) and whether the node wants to read, then check right away whether
   the select can already return. */
void Node::process_select(Request_select *req) {
	/* a negative timeout is handled as a zero timeout */
	if (req->timeout < 0.0)
		req->timeout = 0.0;

	select_read = req->read;
	select_timeout = clock.get_monotonic_time() + req->timeout;

#ifdef DEBUG
	printf("select called with timeout %f read %d in %d at %f\n",
			req->timeout, req->read, index, clock.get_real_time());
#endif

	try_select();
}
/* Handle REQ_SEND: build a packet from the request and pass it to the
   network, then complete the request with an empty reply. While the node is
   being terminated the packet is not created, but the request is still
   completed. */
void Node::process_send(Request_send *req) {
	struct Packet *packet;

	if (!terminate) {
		packet = new struct Packet;

		/* process_fd() checks the length against the request's buffer only;
		   make sure the data also fits in the packet's own buffer before
		   copying, like process_recv() does for the reply buffer */
		assert(req->len <= sizeof (packet->data));

		/* a destination of (unsigned int)-1 marks the packet as broadcast */
		packet->broadcast = req->to == (unsigned int)-1;
		packet->subnet = req->subnet;
		packet->from = index;
		packet->to = req->to;
		packet->src_port = req->src_port;
		packet->dst_port = req->dst_port;
		packet->len = req->len;
		memcpy(packet->data, req->data, req->len);

		/* the packet is handed over to the network and is not freed here */
		network->send(packet);
	}

	reply(NULL, 0, REQ_SEND);
}
/* Handle REQ_RECV: reply with the packet at the back of the incoming queue
   and free it. When the queue is empty, a reply with only the header fields
   is sent, with from set to -1 and the other fields zero. */
void Node::process_recv() {
	Reply_recv rep;

	if (incoming_packets.empty()) {
		rep.subnet = 0;
		rep.from = -1;
		rep.src_port = 0;
		rep.dst_port = 0;
		rep.len = 0;

		/* only the part of the reply before the data is sent */
		reply(&rep, offsetof (Reply_recv, data), REQ_RECV);
		return;
	}

	/* take the packet out of the queue before it is freed */
	struct Packet *received = incoming_packets.back();
	incoming_packets.pop_back();

	rep.subnet = received->subnet;
	rep.from = received->from;
	rep.src_port = received->src_port;
	rep.dst_port = received->dst_port;
	rep.len = received->len;

	assert(received->len <= sizeof (rep.data));
	memcpy(rep.data, received->data, received->len);

	delete received;

	/* send the header fields followed by the valid part of the data */
	reply(&rep, offsetof (Reply_recv, data) + rep.len, REQ_RECV);

#ifdef DEBUG
	printf("received packet in %d at %f\n", index, clock.get_real_time());
#endif
}
/* Deliver a packet to the node. The packet is freed immediately when the
   node's pending request is REQ_REGISTER or REQ_DEREGISTER. Otherwise it is
   inserted at the front of the incoming queue and a pending select gets a
   chance to return. */
void Node::receive(struct Packet *packet) {
	switch (pending_request) {
		case REQ_REGISTER:
		case REQ_DEREGISTER:
			/* the packet is dropped in these two states */
			delete packet;
			return;
		default:
			break;
	}

	/* packets are read from the back, so insert new ones at the front */
	incoming_packets.insert(incoming_packets.begin(), packet);

	if (pending_request == REQ_SELECT)
		try_select();
}
void Node::process_getrefsample() {
Reply_getrefsample r;
refclock.set_generation(true);
r.valid = refclock.get_sample(&r.time, &r.offset);
r._pad = 0;
reply(&r, sizeof (r), REQ_GETREFSAMPLE);
}
void Node::process_getrefoffsets() {
Reply_getrefoffsets r;
refclock.get_offsets(r.offsets, REPLY_GETREFOFFSETS_SIZE);
reply(&r, sizeof (r), REQ_GETREFOFFSETS);
}
/* Give a waiting node a chance to continue. A pending select is rechecked,
   a pending registration is answered once the start time has been reached
   or the node is being terminated, and a deregistered node is left alone.
   Must not be called with any other pending request. */
void Node::resume() {
	switch (pending_request) {
		case REQ_SELECT:
			try_select();
			break;
		case REQ_REGISTER: {
			const bool start_reached = start_time - network->get_time() <= 0.0;

			/* keep waiting unless it is time to start or to terminate */
			if (!start_reached && !terminate)
				break;

			Reply_register rep;
			rep.subnets = network->get_subnets();
			reply(&rep, sizeof (rep), REQ_REGISTER);
#ifdef DEBUG
			printf("starting %d at %f\n", index, network->get_time());
#endif
			break;
		}
		case REQ_DEREGISTER:
			/* nothing to do */
			break;
		default:
			assert(0);
	}
}
/* Return true when the pending request is one of those that are not
   answered immediately: REQ_SELECT, REQ_REGISTER or REQ_DEREGISTER. */
bool Node::waiting() const {
	switch (pending_request) {
		case REQ_SELECT:
		case REQ_REGISTER:
		case REQ_DEREGISTER:
			return true;
		default:
			return false;
	}
}
/* Return true when the node's pending request is REQ_DEREGISTER. */
bool Node::finished() const {
	if (pending_request != REQ_DEREGISTER)
		return false;
	return true;
}
/* Return the timeout of a waiting node, depending on its pending request:
   for REQ_SELECT the time left until the select timeout on the monotonic
   clock, passed through the clock's get_true_interval(); for REQ_REGISTER
   the difference between the start time and the network time; for
   REQ_DEREGISTER a constant 10.0. Must only be called when waiting() is
   true; any other pending request trips the assertion. */
double Node::get_timeout() const {
	switch (pending_request) {
		case REQ_SELECT:
			return clock.get_true_interval(select_timeout - clock.get_monotonic_time());
		case REQ_REGISTER:
			return start_time - network->get_time();
		case REQ_DEREGISTER:
			return 10.0;
		default:
			break;
	}

	assert(0);

	/* reached only when assertions are compiled out; return a defined value
	   instead of flowing off the end of a non-void function, which is
	   undefined behavior */
	return 0.0;
}
/* Return a pointer to the clock contained in the node; the pointer refers
   to a member of the node. */
Clock *Node::get_clock() {
	return &this->clock;
}
/* Return a pointer to the reference clock contained in the node; the
   pointer refers to a member of the node. */
Refclock *Node::get_refclock() {
	return &this->refclock;
}