/*
 * Copyright (C) 2010 Miroslav Lichvar
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "node.h"
#include "network.h"
#include "protocol.h"
#include "sysheaders.h"

/*
 * Create a node that belongs to the given network.  The node starts without
 * a socket (fd == -1) and in the REQ_REGISTER state, i.e. it is waiting to
 * be started by resume().
 */
Node::Node(int index, Network *network) {
	this->network = network;
	this->index = index;
	fd = -1;
	pending_request = REQ_REGISTER;
	start_time = 0.0;
	/* give the select state defined values until the first
	   process_select() call overwrites them */
	select_timeout = 0.0;
	select_read = false;
	terminate = false;
}

/*
 * Free all queued packets, then keep answering the client with the terminate
 * flag set until no further complete request can be read from the socket,
 * and finally close the socket.
 */
Node::~Node() {
	while (!incoming_packets.empty()) {
		delete incoming_packets.back();
		incoming_packets.pop_back();
	}

	terminate = true;

	/* with terminate set, resume() answers a pending select or register
	   request immediately; process_fd() returns false when recv() fails
	   or returns less than a request header, which ends the loop */
	do {
		if (waiting())
			resume();
	} while (process_fd());

	if (fd >= 0)
		close(fd);
}

/* Set the socket descriptor used to talk to the client. */
void Node::set_fd(int fd) {
	this->fd = fd;
}

/* Return the socket descriptor (-1 if none has been set). */
int Node::get_fd() const {
	return fd;
}

/* Set the network time at which a registering node is allowed to start. */
void Node::set_start_time(double time) {
	start_time = time;
}

/*
 * Read one request from the socket and dispatch it to its handler.
 *
 * Returns false when recv() fails or returns fewer bytes than the request
 * header, true after a request has been dispatched.  No other request may be
 * pending when this is called (asserted).  The payload length of each request
 * type is checked with assertions only.
 */
bool Node::process_fd() {
	Request_packet request;
	int received, reqlen;

	received = recv(fd, &request, sizeof (request), 0);
	if (received < (int)sizeof (request.header))
		return false;

	/* number of bytes received after the header */
	reqlen = received - (int)offsetof(Request_packet, data);

	assert(pending_request == 0);
	pending_request = request.header.request;

#ifdef DEBUG
	printf("received request %ld in node %d at %f\n", pending_request, index, clock.get_real_time());
#endif

	switch (pending_request) {
		case REQ_GETTIME:
			assert(reqlen == 0);
			process_gettime();
			break;
		case REQ_SETTIME:
			assert(reqlen == sizeof (Request_settime));
			process_settime(&request.data.settime);
			break;
		case REQ_ADJTIMEX:
			assert(reqlen == sizeof (Request_adjtimex));
			process_adjtimex(&request.data.adjtimex);
			break;
		case REQ_ADJTIME:
			assert(reqlen == sizeof (Request_adjtime));
			process_adjtime(&request.data.adjtime);
			break;
		case REQ_SELECT:
			assert(reqlen == sizeof (Request_select));
			process_select(&request.data.select);
			break;
		case REQ_SEND:
			/* request with variable length */
			assert(reqlen >= (int)offsetof(Request_send, data) &&
					reqlen <= (int)sizeof (Request_send));
			assert(request.data.send.len <= sizeof (request.data.send.data));
			assert((int)(request.data.send.len + offsetof(Request_send, data)) <= reqlen);
			process_send(&request.data.send);
			break;
		case REQ_RECV:
			assert(reqlen == 0);
			process_recv();
			break;
		case REQ_GETREFSAMPLE:
			assert(reqlen == 0);
			process_getrefsample();
			break;
		case REQ_GETREFOFFSETS:
			assert(reqlen == 0);
			process_getrefoffsets();
			break;
		case REQ_DEREGISTER:
			/* no reply is sent; the request stays pending */
			assert(reqlen == 0);
			break;
		default:
			assert(0);
	}

	return true;
}

/*
 * Complete the pending request, which must be `request` (asserted), and send
 * `len` bytes of `data` to the client.  When `data` is NULL the request is
 * only marked as completed and nothing is sent.
 */
void Node::reply(void *data, int len, int request) {
	int sent;

	assert(request == pending_request);
	pending_request = 0;

	if (data) {
		sent = send(fd, data, len, 0);
		assert(sent == len);
		/* sent is otherwise unused when assertions are compiled out */
		(void)sent;
	}
}

/* Reply with the node's real and monotonic time and the network time. */
void Node::process_gettime() {
	Reply_gettime r;

	r.real_time = clock.get_real_time();
	r.monotonic_time = clock.get_monotonic_time();
	r.network_time = network->get_time();

	reply(&r, sizeof (r), REQ_GETTIME);
}

/* Set the node's clock to the requested time; no data is sent back. */
void Node::process_settime(Request_settime *req) {
	clock.set_time(req->time);
	reply(NULL, 0, REQ_SETTIME);
}

/*
 * Pass the request's timex structure to the clock and reply with the return
 * value and the structure as it is after the call.
 */
void Node::process_adjtimex(Request_adjtimex *req) {
	Reply_adjtimex rep;
	struct timex *buf = &req->timex;

	rep.ret = clock.adjtimex(buf);
	rep.timex = *buf;
	rep._pad = 0;

	reply(&rep, sizeof (rep), REQ_ADJTIMEX);
}

/* Pass the request's timeval to the clock's adjtime() and reply with the
   timeval it fills in. */
void Node::process_adjtime(Request_adjtime *req) {
	Reply_adjtime rep;

	clock.adjtime(&req->tv, &rep.tv);

	reply(&rep, sizeof (rep), REQ_ADJTIME);
}

/*
 * Complete the pending select request if one of its conditions is met:
 * the node is terminating, the timeout has been reached, or reading was
 * requested and a packet is queued.  Otherwise the request stays pending.
 */
void Node::try_select() {
	/* ret stays negative as long as no condition is met */
	Reply_select rep = {-1, 0, 0};

	if (terminate) {
		rep.ret = REPLY_SELECT_TERMINATE;
#ifdef DEBUG
		printf("select returned on termination in %d at %f\n", index, clock.get_real_time());
#endif
	} else if (select_timeout - clock.get_monotonic_time() <= 0.0) {
		/* the timeout must not be overshot by more than 1e-10 */
		assert(select_timeout - clock.get_monotonic_time() > -1e-10);
		rep.ret = REPLY_SELECT_TIMEOUT;
#ifdef DEBUG
		printf("select returned on timeout in %d at %f\n", index, clock.get_real_time());
#endif
	} else if (select_read && incoming_packets.size() > 0) {
		/* describe the packet that the next REQ_RECV will return */
		rep.ret = incoming_packets.back()->broadcast ?
			REPLY_SELECT_BROADCAST :
			REPLY_SELECT_NORMAL;
		rep.subnet = incoming_packets.back()->subnet;
		rep.dst_port = incoming_packets.back()->dst_port;
#ifdef DEBUG
		printf("select returned for packet in %d at %f\n", index, clock.get_real_time());
#endif
	}

	if (rep.ret >= 0) {
		rep.time.real_time = clock.get_real_time();
		rep.time.monotonic_time = clock.get_monotonic_time();
		rep.time.network_time = network->get_time();
		reply(&rep, sizeof (rep), REQ_SELECT);
	}
}

/*
 * Start a select request: store the absolute timeout (negative timeouts are
 * treated as zero) and the read flag, then try to complete it right away.
 */
void Node::process_select(Request_select *req) {
	if (req->timeout < 0.0)
		req->timeout = 0.0;

	select_timeout = clock.get_monotonic_time() + req->timeout;
	select_read = req->read;

#ifdef DEBUG
	printf("select called with timeout %f read %d in %d at %f\n", req->timeout, req->read, index, clock.get_real_time());
#endif

	try_select();
}

/*
 * Build a packet from the send request and hand it to the network, unless
 * the node is terminating.  The request is completed without reply data in
 * both cases.
 */
void Node::process_send(Request_send *req) {
	struct Packet *packet;

	if (!terminate) {
		packet = new struct Packet;
		/* destination (unsigned int)-1 marks a broadcast */
		packet->broadcast = req->to == (unsigned int)-1;
		packet->subnet = req->subnet;
		packet->from = index;
		packet->to = req->to;
		packet->src_port = req->src_port;
		packet->dst_port = req->dst_port;
		packet->len = req->len;
		/* same kind of bound check as process_recv() does for its copy */
		assert(req->len <= sizeof (packet->data));
		memcpy(packet->data, req->data, req->len);
		network->send(packet);
	}

	reply(NULL, 0, REQ_SEND);
}

/*
 * Reply with the packet at the back of the incoming queue and remove it from
 * the queue.  If the queue is empty, a header-only reply with zero length and
 * from == -1 is sent instead.
 */
void Node::process_recv() {
	Reply_recv rep;
	struct Packet *packet;

	if (incoming_packets.empty()) {
		rep.subnet = 0;
		rep.from = -1;
		rep.src_port = 0;
		rep.dst_port = 0;
		rep.len = 0;
		reply(&rep, offsetof (Reply_recv, data), REQ_RECV);
		return;
	}

	packet = incoming_packets.back();

	rep.subnet = packet->subnet;
	rep.from = packet->from;
	rep.src_port = packet->src_port;
	rep.dst_port = packet->dst_port;
	rep.len = packet->len;

	assert(packet->len <= sizeof (rep.data));
	memcpy(rep.data, packet->data, packet->len);

	/* remove the packet from the queue before freeing it, so that the
	   queue never holds a dangling pointer */
	incoming_packets.pop_back();
	delete packet;

	/* only the header and the used part of the data are sent */
	reply(&rep, offsetof (Reply_recv, data) + rep.len, REQ_RECV);

#ifdef DEBUG
	printf("received packet in %d at %f\n", index, clock.get_real_time());
#endif
}

/*
 * Deliver a packet to this node.  The node takes ownership of the packet:
 * it is dropped if the node is still registering or has deregistered,
 * otherwise it is queued and a pending select request is re-evaluated.
 */
void Node::receive(struct Packet *packet) {
	if (pending_request == REQ_REGISTER || pending_request == REQ_DEREGISTER) {
		delete packet;
		return;
	}

	/* new packets go to the front; process_recv() takes from the back */
	incoming_packets.insert(incoming_packets.begin(), packet);

	if (pending_request == REQ_SELECT)
		try_select();
}

/* Enable sample generation in the reference clock and reply with the sample
   it returns together with its validity flag. */
void Node::process_getrefsample() {
	Reply_getrefsample r;

	refclock.set_generation(true);
	r.valid = refclock.get_sample(&r.time, &r.offset);
	r._pad = 0;

	reply(&r, sizeof (r), REQ_GETREFSAMPLE);
}

/* Reply with REPLY_GETREFOFFSETS_SIZE offsets provided by the reference
   clock. */
void Node::process_getrefoffsets() {
	Reply_getrefoffsets r;

	refclock.get_offsets(r.offsets, REPLY_GETREFOFFSETS_SIZE);

	reply(&r, sizeof (r), REQ_GETREFOFFSETS);
}

/*
 * Re-evaluate the request the node is waiting on.  Must only be called when
 * waiting() is true (asserted).  A registering node is started once the
 * network time has reached its start time or the node is terminating.
 */
void Node::resume() {
	switch (pending_request) {
		case REQ_SELECT:
			try_select();
			break;
		case REQ_REGISTER:
			if (start_time - network->get_time() <= 0.0 || terminate) {
				Reply_register rep;

				rep.subnets = network->get_subnets();
				reply(&rep, sizeof (rep), REQ_REGISTER);
#ifdef DEBUG
				printf("starting %d at %f\n", index, network->get_time());
#endif
			}
			break;
		case REQ_DEREGISTER:
			break;
		default:
			assert(0);
	}
}

/* Return true if the pending request is one that is not answered
   immediately (select, register or deregister). */
bool Node::waiting() const {
	return pending_request == REQ_SELECT ||
		pending_request == REQ_REGISTER ||
		pending_request == REQ_DEREGISTER;
}

/* Return true if the node has sent a deregister request. */
bool Node::finished() const {
	return pending_request == REQ_DEREGISTER;
}

/*
 * Return the time remaining until the node needs to be resumed: the select
 * timeout converted by the clock's get_true_interval(), the time left until
 * the start time of a registering node, or a fixed 10.0 for a deregistered
 * node.  Must only be called when waiting() is true (asserted).
 */
double Node::get_timeout() const {
	switch (pending_request) {
		case REQ_SELECT:
			return clock.get_true_interval(select_timeout - clock.get_monotonic_time());
		case REQ_REGISTER:
			return start_time - network->get_time();
		case REQ_DEREGISTER:
			return 10.0;
		default:
			assert(0);
	}

	/* reached only when assertions are compiled out; return a defined
	   value instead of flowing off the end of a non-void function */
	return 0.0;
}

/* Return a pointer to the node's clock. */
Clock *Node::get_clock() {
	return &clock;
}

/* Return a pointer to the node's reference clock. */
Refclock *Node::get_refclock() {
	return &refclock;
}