#undef __STRICT_ANSI__ #include #include #include #include #include #include "chan.h" int passed = 0; void assert_true(int expression, chan_t* chan, char* msg) { if (!expression) { chan_dispose(chan); fprintf(stderr, "Assertion failed: %s\n", msg); exit(1); } } void pass() { printf("."); fflush(stdout); passed++; } void wait_for_reader(chan_t* chan) { for (;;) { pthread_mutex_lock(&chan->m_mu); int send = chan->r_waiting > 0; pthread_mutex_unlock(&chan->m_mu); if (send) break; sched_yield(); } } void wait_for_writer(chan_t* chan) { for (;;) { pthread_mutex_lock(&chan->m_mu); int recv = chan->w_waiting > 0; pthread_mutex_unlock(&chan->m_mu); if (recv) break; sched_yield(); } } void test_chan_init_buffered() { size_t size = 5; chan_t* chan = chan_init(size); assert_true(chan->queue != NULL, chan, "Queue is NULL"); assert_true((size_t) chan->queue->capacity == size, chan, "Size is not 5"); assert_true(!chan->closed, chan, "Chan is closed"); chan_dispose(chan); pass(); } void test_chan_init_unbuffered() { chan_t* chan = chan_init(0); assert_true(chan->queue == NULL, chan, "Queue is not NULL"); assert_true(!chan->closed, chan, "Chan is closed"); chan_dispose(chan); pass(); } void test_chan_init() { test_chan_init_buffered(); test_chan_init_unbuffered(); } void test_chan_close() { chan_t* chan = chan_init(0); assert_true(!chan->closed, chan, "Chan is closed"); assert_true(!chan_is_closed(chan), chan, "Chan is closed"); assert_true(chan_close(chan) == 0, chan, "Close failed"); assert_true(chan->closed, chan, "Chan is not closed"); assert_true(chan_close(chan) == -1, chan, "Close succeeded"); assert_true(chan_is_closed(chan), chan, "Chan is not closed"); chan_dispose(chan); pass(); } void test_chan_send_buffered() { chan_t* chan = chan_init(1); void* msg = "foo"; assert_true(chan_size(chan) == 0, chan, "Queue is not empty"); assert_true(chan_send(chan, msg) == 0, chan, "Send failed"); assert_true(chan_size(chan) == 1, chan, "Queue is empty"); chan_dispose(chan); pass(); } void* receiver(void* chan) { void* msg; chan_recv(chan, &msg); return NULL; } void test_chan_send_unbuffered() { chan_t* chan = chan_init(0); void* msg = "foo"; pthread_t th; pthread_create(&th, NULL, receiver, chan); wait_for_reader(chan); assert_true(chan_size(chan) == 0, chan, "Chan size is not 0"); assert_true(!chan->w_waiting, chan, "Chan has sender"); assert_true(chan_send(chan, msg) == 0, chan, "Send failed"); assert_true(!chan->w_waiting, chan, "Chan has sender"); assert_true(chan_size(chan) == 0, chan, "Chan size is not 0"); pthread_join(th, NULL); chan_dispose(chan); pass(); } void test_chan_send() { test_chan_send_buffered(); test_chan_send_unbuffered(); } void test_chan_recv_buffered() { chan_t* chan = chan_init(1); void* msg = "foo"; assert_true(chan_size(chan) == 0, chan, "Queue is not empty"); chan_send(chan, msg); assert_true(chan_size(chan) == 1, chan, "Queue is empty"); void* received; assert_true(chan_recv(chan, &received) == 0, chan, "Recv failed"); assert_true(msg == received, chan, "Messages are not equal"); assert_true(chan_size(chan) == 0, chan, "Queue is not empty"); chan_dispose(chan); pass(); } void* sender(void* chan) { chan_send(chan, "foo"); return NULL; } void test_chan_recv_unbuffered() { chan_t* chan = chan_init(0); pthread_t th; pthread_create(&th, NULL, sender, chan); assert_true(chan_size(chan) == 0, chan, "Chan size is not 0"); assert_true(!chan->r_waiting, chan, "Chan has reader"); void *msg; assert_true(chan_recv(chan, &msg) == 0, chan, "Recv failed"); assert_true(strcmp(msg, "foo") == 0, chan, "Messages are not equal"); assert_true(!chan->r_waiting, chan, "Chan has reader"); assert_true(chan_size(chan) == 0, chan, "Chan size is not 0"); pthread_join(th, NULL); chan_dispose(chan); pass(); } void test_chan_recv() { test_chan_recv_buffered(); test_chan_recv_unbuffered(); } void test_chan_select_recv() { chan_t* chan1 = chan_init(0); chan_t* chan2 = chan_init(1); chan_t* chans[2] = {chan1, chan2}; void* recv; void* msg = "foo"; chan_send(chan2, msg); switch(chan_select(chans, 2, &recv, NULL, 0, NULL)) { case 0: chan_dispose(chan1); chan_dispose(chan2); fprintf(stderr, "Received on wrong channel\n"); exit(1); case 1: if (strcmp(recv, msg) != 0) { chan_dispose(chan1); chan_dispose(chan2); fprintf(stderr, "Messages are not equal\n"); exit(1); } recv = NULL; break; default: chan_dispose(chan1); chan_dispose(chan2); fprintf(stderr, "Received on no channels\n"); exit(1); } pthread_t th; pthread_create(&th, NULL, sender, chan1); wait_for_writer(chan1); switch(chan_select(chans, 2, &recv, NULL, 0, NULL)) { case 0: if (strcmp(recv, "foo") != 0) { chan_dispose(chan1); chan_dispose(chan2); fprintf(stderr, "Messages are not equal\n"); exit(1); } recv = NULL; break; case 1: chan_dispose(chan1); chan_dispose(chan2); fprintf(stderr, "Received on wrong channel\n"); exit(1); default: chan_dispose(chan1); chan_dispose(chan2); fprintf(stderr, "Received on no channels\n"); exit(1); } switch(chan_select(&chan2, 1, &msg, NULL, 0, NULL)) { case 0: chan_dispose(chan1); chan_dispose(chan2); fprintf(stderr, "Received on channel\n"); exit(1); default: break; } pthread_join(th, NULL); chan_dispose(chan1); chan_dispose(chan2); pass(); } void test_chan_select_send() { chan_t* chan1 = chan_init(0); chan_t* chan2 = chan_init(1); chan_t* chans[2] = {chan1, chan2}; void* msg[] = {"foo", "bar"}; switch(chan_select(NULL, 0, NULL, chans, 2, msg)) { case 0: chan_dispose(chan1); chan_dispose(chan2); fprintf(stderr, "Sent on wrong channel"); exit(1); case 1: break; default: chan_dispose(chan1); chan_dispose(chan2); fprintf(stderr, "Sent on no channels"); exit(1); } void* recv; chan_recv(chan2, &recv); if (strcmp(recv, "bar") != 0) { chan_dispose(chan1); chan_dispose(chan2); fprintf(stderr, "Messages are not equal"); exit(1); } chan_send(chan2, "foo"); pthread_t th; pthread_create(&th, NULL, receiver, chan1); wait_for_reader(chan1); switch(chan_select(NULL, 0, NULL, chans, 2, msg)) { case 0: break; case 1: chan_dispose(chan1); chan_dispose(chan2); fprintf(stderr, "Sent on wrong channel"); exit(1); default: chan_dispose(chan1); chan_dispose(chan2); fprintf(stderr, "Sent on no channels"); exit(1); } switch(chan_select(NULL, 0, NULL, &chan1, 1, msg[0])) { case 0: chan_dispose(chan1); chan_dispose(chan2); fprintf(stderr, "Sent on channel"); exit(1); default: break; } pthread_join(th, NULL); chan_dispose(chan1); chan_dispose(chan2); pass(); } void test_chan_select() { test_chan_select_recv(); test_chan_select_send(); } void test_chan_int() { chan_t* chan = chan_init(1); int s = 12345, r = 0; chan_send_int(chan, s); chan_recv_int(chan, &r); assert_true(s == r, chan, "Wrong value of int(12345)"); int32_t s32 = 12345, r32 = 0; chan_send_int32(chan, s32); chan_recv_int32(chan, &r32); assert_true(s32 == r32, chan, "Wrong value of int32(12345)"); int64_t s64 = 12345, r64 = 0; chan_send_int64(chan, s64); chan_recv_int64(chan, &r64); assert_true(s64 == r64, chan, "Wrong value of int64(12345)"); chan_dispose(chan); pass(); } void test_chan_double() { chan_t* chan = chan_init(1); double s = 123.45, r = 0; chan_send_double(chan, s); chan_recv_double(chan, &r); assert_true(s == r, chan, "Wrong value of double(123.45)"); chan_dispose(chan); pass(); } void test_chan_buf() { chan_t* chan = chan_init(1); char s[256], r[256]; strcpy(s, "hello world"); chan_send_buf(chan, s, sizeof(s)); strcpy(s, "Hello World"); chan_recv_buf(chan, &r, sizeof(s)); assert_true(memcmp(s, r, sizeof(s)), chan, "Wrong value of buf"); chan_dispose(chan); pass(); } void test_chan_multi() { chan_t* chan = chan_init(5); pthread_t th[100]; for (int i = 0; i < 50; ++i) { pthread_create(&th[i], NULL, sender, chan); } for (;;) { pthread_mutex_lock(&chan->m_mu); int all_waiting = chan->w_waiting == 45; pthread_mutex_unlock(&chan->m_mu); if (all_waiting) break; sched_yield(); } for (int i = 50; i < 100; ++i) { pthread_create(&th[i], NULL, receiver, chan); } for (int i = 0; i < 100; ++i) { pthread_join(th[i], NULL); } chan_dispose(chan); pass(); } void test_chan_multi2() { chan_t* chan = chan_init(5); pthread_t th[100]; for (int i = 0; i < 100; ++i) { pthread_create(&th[i], NULL, receiver, chan); } for (;;) { pthread_mutex_lock(&chan->m_mu); int all_waiting = chan->r_waiting == 100; pthread_mutex_unlock(&chan->m_mu); if (all_waiting) break; sched_yield(); } // Simulate 5 high-priority writing threads. pthread_mutex_lock(&chan->m_mu); for (int i = 0; i < 5; ++i) { assert_true(0 == queue_add(chan->queue, "foo"), chan, "Simulate writer thread"); pthread_cond_signal(&chan->r_cond); // wakeup reader } // Simulate 6th high-priority waiting writer. assert_true(chan->queue->size == chan->queue->capacity, chan, "6th writer has to wait"); chan->w_waiting++; // Simulated writer must be woken up by reader. pthread_cond_wait(&chan->w_cond, &chan->m_mu); chan->w_waiting--; pthread_mutex_unlock(&chan->m_mu); // Wake up other waiting reader. for (int i = 5; i < 100; ++i) { chan_send(chan, "foo"); } for (int i = 0; i < 100; ++i) { pthread_join(th[i], NULL); } chan_dispose(chan); pass(); } int main() { test_chan_init(); test_chan_close(); test_chan_send(); test_chan_recv(); test_chan_select(); test_chan_int(); test_chan_double(); test_chan_buf(); test_chan_multi(); test_chan_multi2(); printf("\n%d passed\n", passed); return 0; }