/* t-poll.c - Check the poll function * Copyright (C) 2015 g10 Code GmbH * * This file is part of libgpg-error. * * libgpg-error is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License * as published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * libgpg-error is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this program; if not, see . */ /* FIXME: We need much better tests that this very basic one. */ #if HAVE_CONFIG_H # include #endif #include #include #include #include #include #include #include #ifdef _WIN32 # include # include #else # ifdef USE_POSIX_THREADS # include # endif #endif #define PGM "t-poll" #include "t-common.h" #ifdef _WIN32 # define THREAD_RET_TYPE DWORD WINAPI # define THREAD_RET_VALUE 0 #else # define THREAD_RET_TYPE void * # define THREAD_RET_VALUE NULL #endif /* Object to convey data to a thread. */ struct thread_arg { const char *name; estream_t stream; volatile int stop_me; #ifdef USE_POSIX_THREADS pthread_t thread; #elif _WIN32 HANDLE thread; #endif }; static struct thread_arg peer_stdin; /* Thread to feed the stdin. */ static struct thread_arg peer_stdout; /* Thread to feed the stdout. */ static struct thread_arg peer_stderr; /* Thread to feed the stderr. */ static estream_t test_stdin; static estream_t test_stdout; static estream_t test_stderr; #if defined(_WIN32) || defined(USE_POSIX_THREADS) /* This thread feeds data to the given stream. */ static THREAD_RET_TYPE producer_thread (void *argaddr) { struct thread_arg *arg = argaddr; int i = 0; (void)arg; while (!arg->stop_me && i++ < 3) { show ("thread '%s' about to write\n", arg->name); es_fprintf (arg->stream, "This is '%s' count=%d\n", arg->name, i); es_fflush (arg->stream); } es_fclose (arg->stream); return THREAD_RET_VALUE; } /* This thread eats data from the given stream. */ static THREAD_RET_TYPE consumer_thread (void *argaddr) { struct thread_arg *arg = argaddr; char buf[15]; (void)arg; while (!arg->stop_me) { show ("thread '%s' ready to read\n", arg->name); if (!es_fgets (buf, sizeof buf, arg->stream)) { show ("Thread '%s' received EOF or error\n", arg->name); break; } show ("Thread '%s' got: '%s'\n", arg->name, buf); } es_fclose (arg->stream); return THREAD_RET_VALUE; } #endif /*_WIN32 || USE_POSIX_THREADS */ static void launch_thread (THREAD_RET_TYPE (*fnc)(void *), struct thread_arg *th) { int fd; th->stop_me = 0; fd = es_fileno (th->stream); #ifdef _WIN32 th->thread = CreateThread (NULL, 0, fnc, th, 0, NULL); if (!th->thread) die ("creating thread '%s' failed: rc=%d", th->name, (int)GetLastError ()); show ("thread '%s' launched (fd=%d)\n", th->name, fd); #elif USE_POSIX_THREADS if (pthread_create (&th->thread, NULL, fnc, th)) die ("creating thread '%s' failed: %s\n", th->name, strerror (errno)); show ("thread '%s' launched (fd=%d)\n", th->name, fd); # else /* no thread support */ verbose++; show ("no thread support - skipping test\n", PGM); verbose--; #endif /* no thread support */ } static void join_thread (struct thread_arg *th) { #ifdef _WIN32 int rc; rc = WaitForSingleObject (th->thread, INFINITE); if (rc == WAIT_OBJECT_0) show ("thread '%s' has terminated\n", th->name); else fail ("waiting for thread '%s' failed: %d", th->name, (int)GetLastError ()); CloseHandle (th->thread); #elif USE_POSIX_THREADS pthread_join (th->thread, NULL); show ("thread '%s' has terminated\n", th->name); #endif } static void create_pipe (estream_t *r_in, estream_t *r_out) { gpg_error_t err; int filedes[2]; #ifdef _WIN32 if (_pipe (filedes, 512, 0) == -1) #else if (pipe (filedes) == -1) #endif { err = gpg_error_from_syserror (); die ("error creating a pipe: %s\n", gpg_strerror (err)); } show ("created pipe [%d, %d]\n", filedes[0], filedes[1]); *r_in = es_fdopen (filedes[0], "r,pollable"); if (!*r_in) { err = gpg_error_from_syserror (); die ("error creating a stream for a pipe: %s\n", gpg_strerror (err)); } *r_out = es_fdopen (filedes[1], "w,pollable"); if (!*r_out) { err = gpg_error_from_syserror (); die ("error creating a stream for a pipe: %s\n", gpg_strerror (err)); } } static void test_poll (void) { int ret; gpgrt_poll_t fds[3]; char buffer[16]; size_t used, nwritten; int c; memset (fds, 0, sizeof fds); fds[0].stream = test_stdin; fds[0].want_read = 1; fds[1].stream = test_stdout; fds[1].want_write = 1; /* FIXME: We don't use the next stream at all. */ fds[2].stream = test_stderr; fds[2].want_write = 1; fds[2].ignore = 1; used = 0; while (used || !fds[0].ignore) { ret = gpgrt_poll (fds, DIM(fds), -1); if (ret == -1) { fail ("gpgrt_poll failed: %s\n", strerror (errno)); continue; } if (!ret) { fail ("gpgrt_poll unexpectedly timed out\n"); continue; } show ("gpgrt_poll detected %d events\n", ret); if (debug) show ("gpgrt_poll: r=%d" " 0:%c%c%c%c%c%c%c%c%c%c%c%c" " 1:%c%c%c%c%c%c%c%c%c%c%c%c" " 2:%c%c%c%c%c%c%c%c%c%c%c%c" "\n", ret, fds[0].want_read? 'r':'-', fds[0].want_write? 'w':'-', fds[0].want_oob? 'o':'-', fds[0].want_rdhup? 'h':'-', fds[0].ignore? '!':'=', fds[0].got_read? 'r':'-', fds[0].got_write? 'w':'-', fds[0].got_oob? 'o':'-', fds[0].got_rdhup? 'h':'-', fds[0].got_hup? 'H':' ', fds[0].got_err? 'e':' ', fds[0].got_nval? 'n':' ', fds[1].want_read? 'r':'-', fds[1].want_write? 'w':'-', fds[1].want_oob? 'o':'-', fds[1].want_rdhup? 'h':'-', fds[1].ignore? '!':'=', fds[1].got_read? 'r':'-', fds[1].got_write? 'w':'-', fds[1].got_oob? 'o':'-', fds[1].got_rdhup? 'h':'-', fds[1].got_hup? 'H':' ', fds[1].got_err? 'e':' ', fds[1].got_nval? 'n':' ', fds[2].want_read? 'r':'-', fds[2].want_write? 'w':'-', fds[2].want_oob? 'o':'-', fds[2].want_rdhup? 'h':'-', fds[2].ignore? '!':'=', fds[2].got_read? 'r':'-', fds[2].got_write? 'w':'-', fds[2].got_oob? 'o':'-', fds[2].got_rdhup? 'h':'-', fds[2].got_hup? 'H':' ', fds[2].got_err? 'e':' ', fds[2].got_nval? 'n':' ' ); else show ("gpgrt_poll detected %d events\n", ret); if (fds[0].got_read) { /* Read from the producer. */ for (;;) { c = es_fgetc (fds[0].stream); if (c == EOF) { if (es_feof (fds[0].stream)) { show ("reading '%s': EOF\n", peer_stdin.name); fds[0].ignore = 1; /* Not anymore needed. */ peer_stdin.stop_me = 1; /* Tell the thread to stop. */ } else if (es_ferror (fds[0].stream)) { fail ("error reading '%s': %s\n", peer_stdin.name, strerror (errno)); fds[0].ignore = 1; /* Disable. */ peer_stdin.stop_me = 1; /* Tell the thread to stop. */ } else show ("reading '%s': EAGAIN\n", peer_stdin.name); break; } else { if (used <= sizeof buffer -1) buffer[used++] = c; if (used == sizeof buffer) { show ("throttling reading from '%s'\n", peer_stdin.name); fds[0].ignore = 1; break; } } } show ("read from '%s': %zu bytes\n", peer_stdin.name, used); if (used) fds[1].ignore = 0; /* Data to send. */ } if (fds[1].got_write) { if (used) { ret = es_write (fds[1].stream, buffer, used, &nwritten); show ("result for writing to '%s': ret=%d, n=%zu, nwritten=%zu\n", peer_stdout.name, ret, used, nwritten); if (!ret) { assert (nwritten <= used); /* Move the remaining data to the front of buffer. */ memmove (buffer, buffer + nwritten, sizeof buffer - nwritten); used -= nwritten; } ret = es_fflush (fds[1].stream); if (ret) fail ("Flushing for '%s' failed: %s\n", peer_stdout.name, strerror (errno)); } if (!used) fds[1].ignore = 1; /* No need to send data. */ } if (used < sizeof buffer / 2 && !peer_stdin.stop_me && fds[0].ignore) { show ("accelerate reading from '%s'\n", peer_stdin.name); fds[0].ignore = 0; } } } int main (int argc, char **argv) { int last_argc = -1; if (argc) { argc--; argv++; } while (argc && last_argc != argc ) { last_argc = argc; if (!strcmp (*argv, "--help")) { puts ( "usage: ./t-poll [options]\n" "\n" "Options:\n" " --verbose Show what is going on\n" " --debug Flyswatter\n" ); exit (0); } if (!strcmp (*argv, "--verbose")) { verbose = 1; argc--; argv++; } else if (!strcmp (*argv, "--debug")) { verbose = debug = 1; argc--; argv++; } } if (!gpg_error_check_version (GPG_ERROR_VERSION)) { die ("gpg_error_check_version returned an error"); errorcount++; } peer_stdin.name = "stdin producer"; create_pipe (&test_stdin, &peer_stdin.stream); peer_stdout.name = "stdout consumer"; create_pipe (&peer_stdout.stream, &test_stdout); peer_stderr.name = "stderr consumer"; create_pipe (&peer_stderr.stream, &test_stderr); if (es_set_nonblock (test_stdin, 1)) fail ("error setting test_stdin to nonblock: %s\n", strerror (errno)); if (es_set_nonblock (test_stdout, 1)) fail ("error setting test_stdout to nonblock: %s\n", strerror (errno)); if (es_set_nonblock (test_stderr, 1)) fail ("error setting test_stderr to nonblock: %s\n", strerror (errno)); launch_thread (producer_thread, &peer_stdin ); launch_thread (consumer_thread, &peer_stdout); launch_thread (consumer_thread, &peer_stderr); test_poll (); show ("Waiting for threads to terminate...\n"); es_fclose (test_stdin); es_fclose (test_stdout); es_fclose (test_stderr); peer_stdin.stop_me = 1; peer_stdout.stop_me = 1; peer_stderr.stop_me = 1; join_thread (&peer_stdin); join_thread (&peer_stdout); join_thread (&peer_stderr); return errorcount ? 1 : 0; }