Blame testsuite/clknetsim/node.cc

Packit 9c3e7e
/*
Packit 9c3e7e
 * Copyright (C) 2010  Miroslav Lichvar <mlichvar@redhat.com>
Packit 9c3e7e
 * 
Packit 9c3e7e
 * This program is free software; you can redistribute it and/or modify
Packit 9c3e7e
 * it under the terms of the GNU General Public License as published by
Packit 9c3e7e
 * the Free Software Foundation; either version 2 of the License, or
Packit 9c3e7e
 * (at your option) any later version.
Packit 9c3e7e
 * 
Packit 9c3e7e
 * This program is distributed in the hope that it will be useful,
Packit 9c3e7e
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit 9c3e7e
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit 9c3e7e
 * GNU General Public License for more details.
Packit 9c3e7e
 * 
Packit 9c3e7e
 * You should have received a copy of the GNU General Public License
Packit 9c3e7e
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
Packit 9c3e7e
 */
Packit 9c3e7e
Packit 9c3e7e
#include "node.h"
Packit 9c3e7e
#include "network.h"
Packit 9c3e7e
#include "protocol.h"
Packit 9c3e7e
#include "sysheaders.h"
Packit 9c3e7e
Packit 9c3e7e
Node::Node(int index, Network *network) {
Packit 9c3e7e
	this->network = network;
Packit 9c3e7e
	this->index = index;
Packit 9c3e7e
	fd = -1;
Packit 9c3e7e
	pending_request = REQ_REGISTER;
Packit 9c3e7e
	start_time = 0.0;
Packit 9c3e7e
	terminate = false;
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
Node::~Node() {
Packit 9c3e7e
	while (!incoming_packets.empty()) {
Packit 9c3e7e
		delete incoming_packets.back();
Packit 9c3e7e
		incoming_packets.pop_back();
Packit 9c3e7e
	}
Packit 9c3e7e
Packit 9c3e7e
	terminate = true;
Packit 9c3e7e
Packit 9c3e7e
	do {
Packit 9c3e7e
		if (waiting())
Packit 9c3e7e
			resume();
Packit 9c3e7e
	} while (process_fd());
Packit 9c3e7e
Packit 9c3e7e
	if (fd >= 0)
Packit 9c3e7e
		close(fd);
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
void Node::set_fd(int fd) {
Packit 9c3e7e
	this->fd = fd;
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
int Node::get_fd() const {
Packit 9c3e7e
	return fd;
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
void Node::set_start_time(double time) {
Packit 9c3e7e
	start_time = time;
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
bool Node::process_fd() {
Packit 9c3e7e
	Request_packet request;
Packit 9c3e7e
	int received, reqlen;
Packit 9c3e7e
Packit 9c3e7e
	received = recv(fd, &request, sizeof (request), 0);
Packit 9c3e7e
	if (received < (int)sizeof (request.header))
Packit 9c3e7e
		return false;
Packit 9c3e7e
Packit 9c3e7e
	reqlen = received - (int)offsetof(Request_packet, data);
Packit 9c3e7e
Packit 9c3e7e
	assert(pending_request == 0);
Packit 9c3e7e
	pending_request = request.header.request;
Packit 9c3e7e
Packit 9c3e7e
#ifdef DEBUG
Packit 9c3e7e
	printf("received request %ld in node %d at %f\n",
Packit 9c3e7e
			pending_request, index, clock.get_real_time());
Packit 9c3e7e
#endif
Packit 9c3e7e
Packit 9c3e7e
	switch (pending_request) {
Packit 9c3e7e
		case REQ_GETTIME:
Packit 9c3e7e
			assert(reqlen == 0);
Packit 9c3e7e
			process_gettime();
Packit 9c3e7e
			break;
Packit 9c3e7e
		case REQ_SETTIME:
Packit 9c3e7e
			assert(reqlen == sizeof (Request_settime));
Packit 9c3e7e
			process_settime(&request.data.settime);
Packit 9c3e7e
			break;
Packit 9c3e7e
		case REQ_ADJTIMEX:
Packit 9c3e7e
			assert(reqlen == sizeof (Request_adjtimex));
Packit 9c3e7e
			process_adjtimex(&request.data.adjtimex);
Packit 9c3e7e
			break;
Packit 9c3e7e
		case REQ_ADJTIME:
Packit 9c3e7e
			assert(reqlen == sizeof (Request_adjtime));
Packit 9c3e7e
			process_adjtime(&request.data.adjtime);
Packit 9c3e7e
			break;
Packit 9c3e7e
		case REQ_SELECT:
Packit 9c3e7e
			assert(reqlen == sizeof (Request_select));
Packit 9c3e7e
			process_select(&request.data.select);
Packit 9c3e7e
			break;
Packit 9c3e7e
		case REQ_SEND:
Packit 9c3e7e
			/* request with variable length */
Packit 9c3e7e
			assert(reqlen >= (int)offsetof(Request_send, data) &&
Packit 9c3e7e
					reqlen <= (int)sizeof (Request_send));
Packit 9c3e7e
			assert(request.data.send.len <= sizeof (request.data.send.data));
Packit 9c3e7e
			assert((int)(request.data.send.len + offsetof(Request_send, data)) <= reqlen);
Packit 9c3e7e
			process_send(&request.data.send);
Packit 9c3e7e
			break;
Packit 9c3e7e
		case REQ_RECV:
Packit 9c3e7e
			assert(reqlen == 0);
Packit 9c3e7e
			process_recv();
Packit 9c3e7e
			break;
Packit 9c3e7e
		case REQ_GETREFSAMPLE:
Packit 9c3e7e
			assert(reqlen == 0);
Packit 9c3e7e
			process_getrefsample();
Packit 9c3e7e
			break;
Packit 9c3e7e
		case REQ_GETREFOFFSETS:
Packit 9c3e7e
			assert(reqlen == 0);
Packit 9c3e7e
			process_getrefoffsets();
Packit 9c3e7e
			break;
Packit 9c3e7e
		case REQ_DEREGISTER:
Packit 9c3e7e
			assert(reqlen == 0);
Packit 9c3e7e
			break;
Packit 9c3e7e
		default:
Packit 9c3e7e
			assert(0);
Packit 9c3e7e
	}
Packit 9c3e7e
Packit 9c3e7e
	return true;
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
void Node::reply(void *data, int len, int request) {
Packit 9c3e7e
	int sent;
Packit 9c3e7e
Packit 9c3e7e
	assert(request == pending_request);
Packit 9c3e7e
	pending_request = 0;
Packit 9c3e7e
Packit 9c3e7e
	if (data) {
Packit 9c3e7e
		sent = send(fd, data, len, 0);
Packit 9c3e7e
		assert(sent == len);
Packit 9c3e7e
	}
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
Packit 9c3e7e
void Node::process_gettime() {
Packit 9c3e7e
	Reply_gettime r;
Packit 9c3e7e
Packit 9c3e7e
	r.real_time = clock.get_real_time();
Packit 9c3e7e
	r.monotonic_time = clock.get_monotonic_time();
Packit 9c3e7e
	r.network_time = network->get_time();
Packit 9c3e7e
	reply(&r, sizeof (r), REQ_GETTIME);
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
void Node::process_settime(Request_settime *req) {
Packit 9c3e7e
	clock.set_time(req->time);
Packit 9c3e7e
	reply(NULL, 0, REQ_SETTIME);
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
void Node::process_adjtimex(Request_adjtimex *req) {
Packit 9c3e7e
	Reply_adjtimex rep;
Packit 9c3e7e
	struct timex *buf = &req->timex;
Packit 9c3e7e
Packit 9c3e7e
	rep.ret = clock.adjtimex(buf);
Packit 9c3e7e
	rep.timex = *buf;
Packit 9c3e7e
	rep._pad = 0;
Packit 9c3e7e
	reply(&rep, sizeof (rep), REQ_ADJTIMEX);
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
void Node::process_adjtime(Request_adjtime *req) {
Packit 9c3e7e
	Reply_adjtime rep;
Packit 9c3e7e
Packit 9c3e7e
	clock.adjtime(&req->tv, &rep.tv);
Packit 9c3e7e
	reply(&rep, sizeof (rep), REQ_ADJTIME);
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
void Node::try_select() {
Packit 9c3e7e
	Reply_select rep = {-1, 0, 0};
Packit 9c3e7e
Packit 9c3e7e
	if (terminate) {
Packit 9c3e7e
		rep.ret = REPLY_SELECT_TERMINATE;
Packit 9c3e7e
#ifdef DEBUG
Packit 9c3e7e
		printf("select returned on termination in %d at %f\n",
Packit 9c3e7e
				index, clock.get_real_time());
Packit 9c3e7e
#endif
Packit 9c3e7e
	} else if (select_timeout - clock.get_monotonic_time() <= 0.0) {
Packit 9c3e7e
		assert(select_timeout - clock.get_monotonic_time() > -1e-10);
Packit 9c3e7e
		rep.ret = REPLY_SELECT_TIMEOUT;
Packit 9c3e7e
#ifdef DEBUG
Packit 9c3e7e
		printf("select returned on timeout in %d at %f\n", index, clock.get_real_time());
Packit 9c3e7e
#endif
Packit 9c3e7e
	} else if (select_read && incoming_packets.size() > 0) {
Packit 9c3e7e
		rep.ret = incoming_packets.back()->broadcast ?
Packit 9c3e7e
			REPLY_SELECT_BROADCAST :
Packit 9c3e7e
			REPLY_SELECT_NORMAL;
Packit 9c3e7e
		rep.subnet = incoming_packets.back()->subnet;
Packit 9c3e7e
		rep.dst_port = incoming_packets.back()->dst_port;
Packit 9c3e7e
#ifdef DEBUG
Packit 9c3e7e
		printf("select returned for packet in %d at %f\n", index, clock.get_real_time());
Packit 9c3e7e
#endif
Packit 9c3e7e
	}
Packit 9c3e7e
Packit 9c3e7e
	if (rep.ret >= 0) {
Packit 9c3e7e
		rep.time.real_time = clock.get_real_time();
Packit 9c3e7e
		rep.time.monotonic_time = clock.get_monotonic_time();
Packit 9c3e7e
		rep.time.network_time = network->get_time();
Packit 9c3e7e
		reply(&rep, sizeof (rep), REQ_SELECT);
Packit 9c3e7e
	}
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
void Node::process_select(Request_select *req) {
Packit 9c3e7e
	if (req->timeout < 0.0)
Packit 9c3e7e
		req->timeout = 0.0;
Packit 9c3e7e
	select_timeout = clock.get_monotonic_time() + req->timeout;
Packit 9c3e7e
	select_read = req->read;
Packit 9c3e7e
#ifdef DEBUG
Packit 9c3e7e
	printf("select called with timeout %f read %d in %d at %f\n",
Packit 9c3e7e
			req->timeout, req->read, index, clock.get_real_time());
Packit 9c3e7e
#endif
Packit 9c3e7e
	try_select();
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
void Node::process_send(Request_send *req) {
Packit 9c3e7e
	struct Packet *packet;
Packit 9c3e7e
Packit 9c3e7e
	if (!terminate) {
Packit 9c3e7e
		packet = new struct Packet;
Packit 9c3e7e
		packet->broadcast = req->to == (unsigned int)-1;
Packit 9c3e7e
		packet->subnet = req->subnet;
Packit 9c3e7e
		packet->from = index;
Packit 9c3e7e
		packet->to = req->to;
Packit 9c3e7e
		packet->src_port = req->src_port;
Packit 9c3e7e
		packet->dst_port = req->dst_port;
Packit 9c3e7e
		packet->len = req->len;
Packit 9c3e7e
		memcpy(packet->data, req->data, req->len);
Packit 9c3e7e
		network->send(packet);
Packit 9c3e7e
	}
Packit 9c3e7e
Packit 9c3e7e
	reply(NULL, 0, REQ_SEND);
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
void Node::process_recv() {
Packit 9c3e7e
	Reply_recv rep;
Packit 9c3e7e
	struct Packet *packet;
Packit 9c3e7e
Packit 9c3e7e
	if (incoming_packets.empty()) {
Packit 9c3e7e
		rep.subnet = 0;
Packit 9c3e7e
		rep.from = -1;
Packit 9c3e7e
		rep.src_port = 0;
Packit 9c3e7e
		rep.dst_port = 0;
Packit 9c3e7e
		rep.len = 0;
Packit 9c3e7e
		reply(&rep, offsetof (Reply_recv, data), REQ_RECV);
Packit 9c3e7e
Packit 9c3e7e
		return;
Packit 9c3e7e
	}
Packit 9c3e7e
Packit 9c3e7e
	packet = incoming_packets.back();
Packit 9c3e7e
Packit 9c3e7e
	rep.subnet = packet->subnet;
Packit 9c3e7e
	rep.from = packet->from;
Packit 9c3e7e
	rep.src_port = packet->src_port;
Packit 9c3e7e
	rep.dst_port = packet->dst_port;
Packit 9c3e7e
	rep.len = packet->len;
Packit 9c3e7e
Packit 9c3e7e
	assert(packet->len <= sizeof (rep.data));
Packit 9c3e7e
	memcpy(rep.data, packet->data, packet->len);
Packit 9c3e7e
	
Packit 9c3e7e
	delete packet;
Packit 9c3e7e
Packit 9c3e7e
	reply(&rep, offsetof (Reply_recv, data) + rep.len, REQ_RECV);
Packit 9c3e7e
Packit 9c3e7e
	incoming_packets.pop_back();
Packit 9c3e7e
#ifdef DEBUG
Packit 9c3e7e
	printf("received packet in %d at %f\n", index, clock.get_real_time());
Packit 9c3e7e
#endif
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
void Node::receive(struct Packet *packet) {
Packit 9c3e7e
	if (pending_request == REQ_REGISTER || pending_request == REQ_DEREGISTER) {
Packit 9c3e7e
		delete packet;
Packit 9c3e7e
		return;
Packit 9c3e7e
	}
Packit 9c3e7e
Packit 9c3e7e
	incoming_packets.insert(incoming_packets.begin(), packet);
Packit 9c3e7e
Packit 9c3e7e
	if (pending_request == REQ_SELECT)
Packit 9c3e7e
		try_select();
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
void Node::process_getrefsample() {
Packit 9c3e7e
	Reply_getrefsample r;
Packit 9c3e7e
Packit 9c3e7e
	refclock.set_generation(true);
Packit 9c3e7e
	r.valid = refclock.get_sample(&r.time, &r.offset);
Packit 9c3e7e
	r._pad = 0;
Packit 9c3e7e
	reply(&r, sizeof (r), REQ_GETREFSAMPLE);
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
void Node::process_getrefoffsets() {
Packit 9c3e7e
	Reply_getrefoffsets r;
Packit 9c3e7e
Packit 9c3e7e
	refclock.get_offsets(r.offsets, REPLY_GETREFOFFSETS_SIZE);
Packit 9c3e7e
	reply(&r, sizeof (r), REQ_GETREFOFFSETS);
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
void Node::resume() {
Packit 9c3e7e
	switch (pending_request) {
Packit 9c3e7e
		case REQ_SELECT:
Packit 9c3e7e
			try_select();
Packit 9c3e7e
			break;
Packit 9c3e7e
		case REQ_REGISTER:
Packit 9c3e7e
			if (start_time - network->get_time() <= 0.0 || terminate) {
Packit 9c3e7e
				Reply_register rep;
Packit 9c3e7e
				rep.subnets = network->get_subnets();
Packit 9c3e7e
				reply(&rep, sizeof (rep), REQ_REGISTER);
Packit 9c3e7e
#ifdef DEBUG
Packit 9c3e7e
				printf("starting %d at %f\n", index, network->get_time());
Packit 9c3e7e
#endif
Packit 9c3e7e
			}
Packit 9c3e7e
			break;
Packit 9c3e7e
		case REQ_DEREGISTER:
Packit 9c3e7e
			break;
Packit 9c3e7e
		default:
Packit 9c3e7e
			assert(0);
Packit 9c3e7e
Packit 9c3e7e
	}
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
bool Node::waiting() const {
Packit 9c3e7e
	return pending_request == REQ_SELECT ||
Packit 9c3e7e
		pending_request == REQ_REGISTER ||
Packit 9c3e7e
		pending_request == REQ_DEREGISTER;
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
bool Node::finished() const {
Packit 9c3e7e
	return pending_request == REQ_DEREGISTER;
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
double Node::get_timeout() const {
Packit 9c3e7e
	switch (pending_request) {
Packit 9c3e7e
		case REQ_SELECT:
Packit 9c3e7e
			return clock.get_true_interval(select_timeout - clock.get_monotonic_time());
Packit 9c3e7e
		case REQ_REGISTER:
Packit 9c3e7e
			return start_time - network->get_time();
Packit 9c3e7e
		case REQ_DEREGISTER:
Packit 9c3e7e
			return 10.0;
Packit 9c3e7e
		default:
Packit 9c3e7e
			assert(0);
Packit 9c3e7e
	}
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
Clock *Node::get_clock() {
Packit 9c3e7e
	return &clock;
Packit 9c3e7e
}
Packit 9c3e7e
Packit 9c3e7e
Refclock *Node::get_refclock() {
Packit 9c3e7e
	return &refclock;
Packit 9c3e7e
}