/*
* Copyright (C) 2010 Miroslav Lichvar <mlichvar@redhat.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sysheaders.h"
#include "network.h"
Packet_queue::Packet_queue() {
}
Packet_queue::~Packet_queue() {
while (!queue.empty()) {
delete queue.back();
queue.pop_back();
}
}
void Packet_queue::insert(struct Packet *packet) {
deque<struct Packet *>::iterator i;
for (i = queue.begin(); i < queue.end(); i++)
if (packet->receive_time < (*i)->receive_time)
break;
queue.insert(i, packet);
}
struct Packet *Packet_queue::dequeue() {
struct Packet *ret;
assert(!queue.empty());
ret = queue.front();
queue.pop_front();
return ret;
}
double Packet_queue::get_timeout(double time) const {
if (!queue.empty()) {
return queue[0]->receive_time - time;
}
return 1e20;
}
Network::Network(const char *socket, unsigned int n, unsigned int subnets, unsigned int rate) {
time = 0.0;
this->subnets = subnets;
socket_name = socket;
update_rate = rate;
update_count = 0;
offset_log = NULL;
freq_log = NULL;
rawfreq_log = NULL;
packet_log = NULL;
assert(n > 0);
while (nodes.size() < n)
nodes.push_back(new Node(nodes.size(), this));
stats.resize(n);
link_delays.resize(n * n);
}
Network::~Network() {
while (!nodes.empty()) {
delete nodes.back();
nodes.pop_back();
}
while (!link_delays.empty()) {
delete link_delays.back();
link_delays.pop_back();
}
unlink(socket_name);
if (offset_log)
fclose(offset_log);
if (freq_log)
fclose(freq_log);
if (rawfreq_log)
fclose(rawfreq_log);
if (packet_log)
fclose(packet_log);
}
bool Network::prepare_clients() {
struct sockaddr_un s;
int sockfd, fd;
unsigned int i;
s.sun_family = AF_UNIX;
snprintf(s.sun_path, sizeof (s.sun_path), "%s", socket_name);
sockfd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
if (sockfd < 0) {
fprintf(stderr, "socket() failed\n");
return false;
}
unlink(socket_name);
if (bind(sockfd, (struct sockaddr *)&s, sizeof (s)) < 0) {
fprintf(stderr, "bind() failed\n");
return false;
}
if (listen(sockfd, nodes.size()) < 0) {
fprintf(stderr, "listen() failed\n");
return false;
}
for (i = 0; i < nodes.size(); i++) {
Request_packet req;
unsigned int node;
fprintf(stderr, "\rWaiting for %u clients...", (unsigned int)nodes.size() - i);
fd = accept(sockfd, NULL, NULL);
if (fd < 0) {
fprintf(stderr, "accept() failed\n");
return false;
}
if (recv(fd, &req, sizeof (req), 0) != offsetof(Request_packet, data) +
sizeof (Request_register) || req.header.request != REQ_REGISTER) {
fprintf(stderr, "client didn't register correctly.\n");
return false;
}
node = req.data._register.node;
assert(node < nodes.size() && nodes[node]->get_fd() < 0);
nodes[node]->set_fd(fd);
}
fprintf(stderr, "done\n");
close(sockfd);
update();
return true;
}
Node *Network::get_node(unsigned int node) {
assert(node < nodes.size());
return nodes[node];
}
void Network::set_link_delay_generator(unsigned int from, unsigned int to, Generator *generator) {
unsigned int i;
assert(from < nodes.size() && to < nodes.size());
i = from * nodes.size() + to;
if (link_delays[i])
delete link_delays[i];
link_delays[i] = generator;
}
bool Network::run(double time_limit) {
int i, n = nodes.size(), waiting;
bool pending_update;
double min_timeout, timeout, next_update;
while (time < time_limit) {
for (i = 0, waiting = 0; i < n; i++)
if (nodes[i]->waiting())
waiting++;
else
stats[i].update_wakeup_stats();
while (waiting < n) {
for (i = 0; i < n; i++) {
if (nodes[i]->waiting())
continue;
if (!nodes[i]->process_fd()) {
fprintf(stderr, "client %d failed.\n", i + 1);
return false;
}
if (nodes[i]->waiting())
waiting++;
}
}
do {
min_timeout = nodes[0]->get_timeout();
for (i = 1; i < n; i++) {
timeout = nodes[i]->get_timeout();
if (min_timeout > timeout)
min_timeout = timeout;
}
timeout = packet_queue.get_timeout(time);
if (timeout <= min_timeout)
min_timeout = timeout;
next_update = floor(time) + (double)(update_count + 1) / update_rate;
timeout = next_update - time;
if (timeout <= min_timeout) {
min_timeout = timeout;
pending_update = true;
} else
pending_update = false;
//min_timeout += 1e-12;
assert(min_timeout >= 0.0);
if (pending_update)
time = next_update;
else
time += min_timeout;
for (i = 0; i < n; i++)
nodes[i]->get_clock()->advance(min_timeout);
if (pending_update)
update();
} while (pending_update && time < time_limit);
for (i = 0; i < n; i++)
nodes[i]->resume();
while (packet_queue.get_timeout(time) <= 0) {
assert(packet_queue.get_timeout(time) > -1e-10);
struct Packet *packet = packet_queue.dequeue();
stats[packet->to].update_packet_stats(true, time, packet->delay);
nodes[packet->to]->receive(packet);
}
}
return true;
}
void Network::update() {
int i, n = nodes.size();
update_count++;
update_count %= update_rate;
for (i = 0; i < n; i++) {
nodes[i]->get_clock()->update(update_count == 0);
nodes[i]->get_refclock()->update(time, nodes[i]->get_clock());
}
update_clock_stats();
}
void Network::update_clock_stats() {
int i, n = nodes.size();
if (offset_log) {
for (i = 0; i < n; i++)
fprintf(offset_log, "%.9f%c", nodes[i]->get_clock()->get_real_time() - time, i + 1 < n ? '\t' : '\n');
}
if (freq_log) {
for (i = 0; i < n; i++)
fprintf(freq_log, "%e%c", nodes[i]->get_clock()->get_total_freq() - 1.0, i + 1 < n ? '\t' : '\n');
}
if (rawfreq_log) {
for (i = 0; i < n; i++)
fprintf(rawfreq_log, "%e%c", nodes[i]->get_clock()->get_raw_freq() - 1.0, i + 1 < n ? '\t' : '\n');
}
for (i = 0; i < n; i++)
stats[i].update_clock_stats(nodes[i]->get_clock()->get_real_time() - time,
nodes[i]->get_clock()->get_total_freq() - 1.0,
nodes[i]->get_clock()->get_raw_freq() - 1.0);
}
void Network::open_offset_log(const char *log) {
offset_log = fopen(log, "w");
}
void Network::open_freq_log(const char *log) {
freq_log = fopen(log, "w");
}
void Network::open_rawfreq_log(const char *log) {
rawfreq_log = fopen(log, "w");
}
void Network::open_packet_log(const char *log) {
packet_log = fopen(log, "w");
}
void Network::print_stats(int verbosity) const {
int i, n = nodes.size();
if (verbosity <= 0)
return;
for (i = 0; i < n; i++) {
if (verbosity > 1)
printf("\n---------------------- Node %d ----------------------\n\n", i + 1);
stats[i].print(verbosity);
}
if (verbosity == 1)
printf("\n");
}
void Network::reset_stats() {
int i, n = nodes.size();
for (i = 0; i < n; i++)
stats[i].reset();
}
void Network::reset_clock_stats() {
int i, n = nodes.size();
for (i = 0; i < n; i++)
stats[i].reset_clock_stats();
}
void Network::send(struct Packet *packet) {
double delay = -1.0;
unsigned int i;
/* broadcast */
if (packet->to == (unsigned int)-1) {
for (i = 0; i < nodes.size(); i++) {
struct Packet *p;
if (i == packet->from)
continue;
p = new struct Packet;
memcpy(p, packet, sizeof (struct Packet));
p->to = i;
send(p);
}
delete packet;
return;
}
assert(packet->to < nodes.size() && packet->from < nodes.size() &&
packet->subnet < subnets);
i = packet->from * nodes.size() + packet->to;
if (link_delays[i]) {
link_delay_variables["time"] = time;
link_delay_variables["from"] = packet->from + 1;
link_delay_variables["to"] = packet->to + 1;
link_delay_variables["subnet"] = packet->subnet + 1;
link_delay_variables["port"] = packet->dst_port;
link_delay_variables["length"] = packet->len;
delay = link_delays[i]->generate(&link_delay_variables);
}
stats[packet->from].update_packet_stats(false, time, delay);
if (packet_log)
fprintf(packet_log, "%e\t%d\t%d\t%e\t%d\t%d\t%d\n", time,
packet->from + 1, packet->to + 1, delay,
packet->src_port, packet->dst_port,
packet->subnet + 1);
if (delay > 0.0) {
packet->receive_time = time + delay;
packet->delay = delay;
packet_queue.insert(packet);
#ifdef DEBUG
printf("sending packet from %d to %d:%d:%d at %f delay %f \n",
packet->from, packet->subnet, packet->to,
packet->dst_port, time, delay);
#endif
} else {
#ifdef DEBUG
printf("dropping packet from %d to %d:%d:%d at %f\n",
packet->from, packet->subnet, packet->to,
packet->dst_port, time);
#endif
delete packet;
}
}
double Network::get_time() const {
return time;
}
unsigned int Network::get_subnets() const {
return subnets;
}