#include <chrono>
#include <condition_variable>
#include <cstring>
#include <giomm.h>
#include <glibmm.h>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
namespace
{
Glib::RefPtr<Glib::MainLoop> loop;
bool verbose = false;
bool non_blocking = false;
bool use_udp = false;
bool use_source = false;
bool use_ipv6 = false;
int cancel_timeout = 0;
bool stop_thread = false;
std::mutex mutex_thread;
std::condition_variable cond_thread;
class ClientOptionGroup : public Glib::OptionGroup
{
public:
ClientOptionGroup() : Glib::OptionGroup("client_group", "", "")
{
Glib::OptionEntry entry;
entry.set_long_name("cancel");
entry.set_short_name('c');
entry.set_description("Cancel any op after the specified amount of seconds");
add_entry(entry, cancel_timeout);
entry.set_long_name("udp");
entry.set_short_name('u');
entry.set_description("Use UDP instead of TCP");
add_entry(entry, use_udp);
entry.set_long_name("verbose");
entry.set_short_name('v');
entry.set_description("Be verbose");
add_entry(entry, verbose);
entry.set_long_name("non-blocking");
entry.set_short_name('n');
entry.set_description("Enable non-blocking I/O");
add_entry(entry, non_blocking);
entry.set_long_name("use-source");
entry.set_short_name('s');
entry.set_description("Use Gio::SocketSource to wait for non-blocking I/O");
add_entry(entry, use_source);
entry.set_long_name("use-ipv6");
entry.set_short_name('6');
entry.set_description("Use IPv6 address family");
add_entry(entry, use_ipv6);
}
};
Glib::ustring
socket_address_to_string(const Glib::RefPtr<Gio::SocketAddress>& address)
{
Glib::RefPtr<Gio::InetAddress> inet_address;
Glib::ustring str, res;
int port;
auto isockaddr = Glib::RefPtr<Gio::InetSocketAddress>::cast_dynamic(address);
if (!isockaddr)
return Glib::ustring();
inet_address = isockaddr->get_address();
str = inet_address->to_string();
port = isockaddr->get_port();
res = Glib::ustring::compose("%1:%2", str, port);
return res;
}
static bool source_ready(Glib::IOCondition /*condition*/)
{
loop->quit();
return false;
}
static void
ensure_condition(const Glib::RefPtr<Gio::Socket>& socket, const Glib::ustring& where,
const Glib::RefPtr<Gio::Cancellable>& cancellable, Glib::IOCondition condition)
{
if (!non_blocking)
return;
if (use_source)
{
auto source = socket->create_source(condition, cancellable);
source->connect(sigc::ptr_fun(&source_ready));
source->attach();
loop->run();
}
else
{
try
{
socket->condition_wait(condition, cancellable);
}
catch (const Gio::Error& error)
{
std::cerr << Glib::ustring::compose("condition wait error for %1: %2\n", where, error.what());
exit(1);
}
}
}
static void
cancel_thread(Glib::RefPtr<Gio::Cancellable> cancellable)
{
std::unique_lock<std::mutex> lock(mutex_thread);
if (!cond_thread.wait_for(
lock, std::chrono::seconds(cancel_timeout), []() { return stop_thread; }))
{
// !stop_thread, i.e. timeout
std::cout << "Cancelling\n";
cancellable->cancel();
}
}
class JoinAndDelete
{
public:
void operator()(std::thread* thread)
{
stop_thread = true;
cond_thread.notify_all();
thread->join();
delete thread;
}
};
} // end anonymous namespace
int
main(int argc, char* argv[])
{
Glib::RefPtr<Gio::Socket> socket;
Glib::RefPtr<Gio::SocketAddress> src_address;
Glib::RefPtr<Gio::SocketAddress> address;
Gio::SocketType socket_type;
Gio::SocketFamily socket_family;
Glib::RefPtr<Gio::Cancellable> cancellable;
Glib::RefPtr<Gio::SocketAddressEnumerator> enumerator;
Glib::RefPtr<Gio::SocketConnectable> connectable;
Gio::init();
Glib::OptionContext option_context(" <hostname>[:port] - Test Gio::Socket client stuff");
option_context.set_summary("Default port: 7777\n"
"For a local test with socket-server:\n"
" ./socket-client [option...] localhost\n"
"or, if that fails\n"
" ./socket-client [option...] 127.0.0.1 (IPv4)\n"
" ./socket-client [option...] ::1 (IPv6)");
ClientOptionGroup option_group;
option_context.set_main_group(option_group);
try
{
option_context.parse(argc, argv);
}
catch (const Glib::Error& error)
{
std::cerr << Glib::ustring::compose("%1: %2\n", argv[0], error.what());
return 1;
}
if (argc != 2)
{
const auto error_message = "Need to specify hostname";
std::cerr << Glib::ustring::compose("%1: %2\n", argv[0], error_message);
return 1;
}
std::unique_ptr<std::thread, JoinAndDelete> thread;
if (cancel_timeout)
{
cancellable = Gio::Cancellable::create();
thread.reset(new std::thread(&cancel_thread, cancellable));
}
loop = Glib::MainLoop::create();
socket_type = use_udp ? Gio::SOCKET_TYPE_DATAGRAM : Gio::SOCKET_TYPE_STREAM;
socket_family = use_ipv6 ? Gio::SOCKET_FAMILY_IPV6 : Gio::SOCKET_FAMILY_IPV4;
try
{
socket = Gio::Socket::create(socket_family, socket_type, Gio::SOCKET_PROTOCOL_DEFAULT);
}
catch (const Gio::Error& error)
{
std::cerr << Glib::ustring::compose("%1: %2\n", argv[0], error.what());
return 1;
}
try
{
connectable = Gio::NetworkAddress::parse(argv[1], 7777);
}
catch (const Gio::Error& error)
{
std::cerr << Glib::ustring::compose("%1: %2\n", argv[0], error.what());
return 1;
}
enumerator = connectable->enumerate();
while (true)
{
try
{
address = enumerator->next(cancellable);
if (!address)
{
std::cerr << Glib::ustring::compose("%1: No more addresses to try\n", argv[0]);
return 1;
}
}
catch (const Gio::Error& error)
{
std::cerr << Glib::ustring::compose("%1: %2\n", argv[0], error.what());
return 1;
}
try
{
socket->connect(address, cancellable);
break;
}
catch (const Gio::Error& error)
{
std::cerr << Glib::ustring::compose("%1: Connection to %2 failed: %3, trying next\n", argv[0],
socket_address_to_string(address), error.what());
}
}
std::cout << Glib::ustring::compose("Connected to %1\n", socket_address_to_string(address));
/* TODO: Test non-blocking connect */
if (non_blocking)
socket->set_blocking(false);
try
{
src_address = socket->get_local_address();
}
catch (const Gio::Error& error)
{
std::cerr << Glib::ustring::compose("Error getting local address: %1\n", error.what());
return 1;
}
std::cout << Glib::ustring::compose("local address: %1\n", socket_address_to_string(src_address));
while (true)
{
gchar buffer[4096] = {};
gssize size;
gsize to_send;
if (!std::cin.getline(buffer, sizeof buffer - 1))
break;
to_send = strlen(buffer);
buffer[to_send++] = '\n';
buffer[to_send] = '\0';
while (to_send > 0)
{
ensure_condition(socket, "send", cancellable, Glib::IO_OUT);
try
{
if (use_udp)
size = socket->send_to(address, buffer, to_send, cancellable);
else
size = socket->send(buffer, to_send, cancellable);
}
catch (const Gio::Error& error)
{
if (error.code() == Gio::Error::WOULD_BLOCK)
{
std::cout << "socket send would block, handling\n";
continue;
}
else
{
std::cerr << Glib::ustring::compose("Error sending to socket: %1\n", error.what());
return 1;
}
}
std::cout << Glib::ustring::compose("sent %1 bytes of data\n", size);
if (size == 0)
{
std::cerr << "Unexpected short write\n";
return 1;
}
to_send -= size;
}
ensure_condition(socket, "receive", cancellable, Glib::IO_IN);
try
{
if (use_udp)
size = socket->receive_from(src_address, buffer, sizeof buffer, cancellable);
else
size = socket->receive(buffer, sizeof buffer, cancellable);
}
catch (const Gio::Error& error)
{
std::cerr << Glib::ustring::compose("Error receiving from socket: %1\n", error.what());
return 1;
}
if (size == 0)
break;
std::cout << Glib::ustring::compose("received %1 bytes of data", size);
if (use_udp)
std::cout << Glib::ustring::compose(" from %1", socket_address_to_string(src_address));
std::cout << std::endl;
if (verbose)
g_print("-------------------------\n"
"%.*s"
"-------------------------\n",
(int)size, buffer);
}
std::cout << "closing socket\n";
try
{
socket->close();
}
catch (const Gio::Error& error)
{
std::cerr << Glib::ustring::compose("Error closing master socket: %1\n", error.what());
return 1;
}
return 0;
}