/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include <assert.h>
#include <stdlib.h>
#include "uv.h"
#include "internal.h"
#include "handle-inl.h"
#include "stream-inl.h"
#include "req-inl.h"
/*
 * Threshold of active tcp streams for which to preallocate tcp read buffers.
 * (Because the node slab allocator performs poorly under this pattern, the
 * optimization is temporarily disabled (threshold=0). This will be revisited
 * once the node allocator is improved.)
 */
const unsigned int uv_active_tcp_streams_threshold = 0;
/*
* Number of simultaneous pending AcceptEx calls.
*/
const unsigned int uv_simultaneous_server_accepts = 32;
/* A zero-size buffer for use by uv_tcp_queue_read */
static char uv_zero_[] = "";
static int uv__tcp_nodelay(uv_tcp_t* handle, SOCKET socket, int enable) {
if (setsockopt(socket,
IPPROTO_TCP,
TCP_NODELAY,
(const char*)&enable,
sizeof enable) == -1) {
return WSAGetLastError();
}
return 0;
}
static int uv__tcp_keepalive(uv_tcp_t* handle,
                             SOCKET socket,
                             int enable,
                             unsigned int delay) {
if (setsockopt(socket,
SOL_SOCKET,
SO_KEEPALIVE,
(const char*)&enable,
sizeof enable) == -1) {
return WSAGetLastError();
}
if (enable && setsockopt(socket,
IPPROTO_TCP,
TCP_KEEPALIVE,
(const char*)&delay,
sizeof delay) == -1) {
return WSAGetLastError();
}
return 0;
}
static int uv_tcp_set_socket(uv_loop_t* loop,
uv_tcp_t* handle,
SOCKET socket,
int family,
int imported) {
DWORD yes = 1;
int non_ifs_lsp;
int err;
if (handle->socket != INVALID_SOCKET)
return UV_EBUSY;
/* Set the socket to nonblocking mode */
if (ioctlsocket(socket, FIONBIO, &yes) == SOCKET_ERROR) {
return WSAGetLastError();
}
/* Make the socket non-inheritable */
if (!SetHandleInformation((HANDLE) socket, HANDLE_FLAG_INHERIT, 0))
return GetLastError();
/* Associate it with the I/O completion port. Use uv_handle_t pointer as
* completion key. */
if (CreateIoCompletionPort((HANDLE)socket,
loop->iocp,
(ULONG_PTR)socket,
0) == NULL) {
if (imported) {
handle->flags |= UV_HANDLE_EMULATE_IOCP;
} else {
return GetLastError();
}
}
if (family == AF_INET6) {
non_ifs_lsp = uv_tcp_non_ifs_lsp_ipv6;
} else {
non_ifs_lsp = uv_tcp_non_ifs_lsp_ipv4;
}
if (!(handle->flags & UV_HANDLE_EMULATE_IOCP) && !non_ifs_lsp) {
UCHAR sfcnm_flags =
FILE_SKIP_SET_EVENT_ON_HANDLE | FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;
if (!SetFileCompletionNotificationModes((HANDLE) socket, sfcnm_flags))
return GetLastError();
handle->flags |= UV_HANDLE_SYNC_BYPASS_IOCP;
}
if (handle->flags & UV_HANDLE_TCP_NODELAY) {
err = uv__tcp_nodelay(handle, socket, 1);
if (err)
return err;
}
/* TODO: Use stored delay. */
if (handle->flags & UV_HANDLE_TCP_KEEPALIVE) {
err = uv__tcp_keepalive(handle, socket, 1, 60);
if (err)
return err;
}
handle->socket = socket;
if (family == AF_INET6) {
handle->flags |= UV_HANDLE_IPV6;
} else {
assert(!(handle->flags & UV_HANDLE_IPV6));
}
return 0;
}
int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* handle, unsigned int flags) {
int domain;
/* Use the lower 8 bits for the domain */
domain = flags & 0xFF;
if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC)
return UV_EINVAL;
if (flags & ~0xFF)
return UV_EINVAL;
uv_stream_init(loop, (uv_stream_t*) handle, UV_TCP);
handle->tcp.serv.accept_reqs = NULL;
handle->tcp.serv.pending_accepts = NULL;
handle->socket = INVALID_SOCKET;
handle->reqs_pending = 0;
handle->tcp.serv.func_acceptex = NULL;
handle->tcp.conn.func_connectex = NULL;
handle->tcp.serv.processed_accepts = 0;
handle->delayed_error = 0;
/* If anything fails beyond this point we need to remove the handle from
* the handle queue, since it was added by uv__handle_init in uv_stream_init.
*/
if (domain != AF_UNSPEC) {
SOCKET sock;
DWORD err;
sock = socket(domain, SOCK_STREAM, 0);
if (sock == INVALID_SOCKET) {
err = WSAGetLastError();
QUEUE_REMOVE(&handle->handle_queue);
return uv_translate_sys_error(err);
}
err = uv_tcp_set_socket(handle->loop, handle, sock, domain, 0);
if (err) {
closesocket(sock);
QUEUE_REMOVE(&handle->handle_queue);
return uv_translate_sys_error(err);
}
}
return 0;
}
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) {
return uv_tcp_init_ex(loop, handle, AF_UNSPEC);
}
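/* Usage sketch (illustrative only, not part of this file; names are
 * hypothetical): initializing a TCP handle from application code.
 * uv_tcp_init defers socket creation until the first bind/connect/open,
 * while uv_tcp_init_ex creates the socket eagerly for a given family:
 *
 *   uv_loop_t* loop = uv_default_loop();
 *   uv_tcp_t handle;
 *   int err = uv_tcp_init_ex(loop, &handle, AF_INET);
 *   if (err != 0)
 *     fprintf(stderr, "init failed: %s\n", uv_strerror(err));
 */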
void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) {
int err;
unsigned int i;
uv_tcp_accept_t* req;
if (handle->flags & UV_HANDLE_CONNECTION &&
handle->stream.conn.shutdown_req != NULL &&
handle->stream.conn.write_reqs_pending == 0) {
UNREGISTER_HANDLE_REQ(loop, handle, handle->stream.conn.shutdown_req);
err = 0;
if (handle->flags & UV_HANDLE_CLOSING) {
err = ERROR_OPERATION_ABORTED;
} else if (shutdown(handle->socket, SD_SEND) == SOCKET_ERROR) {
err = WSAGetLastError();
}
if (handle->stream.conn.shutdown_req->cb) {
handle->stream.conn.shutdown_req->cb(handle->stream.conn.shutdown_req,
uv_translate_sys_error(err));
}
handle->stream.conn.shutdown_req = NULL;
DECREASE_PENDING_REQ_COUNT(handle);
return;
}
if (handle->flags & UV_HANDLE_CLOSING &&
handle->reqs_pending == 0) {
assert(!(handle->flags & UV_HANDLE_CLOSED));
if (!(handle->flags & UV_HANDLE_TCP_SOCKET_CLOSED)) {
closesocket(handle->socket);
handle->socket = INVALID_SOCKET;
handle->flags |= UV_HANDLE_TCP_SOCKET_CLOSED;
}
    if (!(handle->flags & UV_HANDLE_CONNECTION) &&
        handle->tcp.serv.accept_reqs) {
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
for (i = 0; i < uv_simultaneous_server_accepts; i++) {
req = &handle->tcp.serv.accept_reqs[i];
if (req->wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(req->wait_handle);
req->wait_handle = INVALID_HANDLE_VALUE;
}
if (req->event_handle) {
CloseHandle(req->event_handle);
req->event_handle = NULL;
}
}
}
uv__free(handle->tcp.serv.accept_reqs);
handle->tcp.serv.accept_reqs = NULL;
}
if (handle->flags & UV_HANDLE_CONNECTION &&
handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(handle->read_req.wait_handle);
handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
}
if (handle->read_req.event_handle) {
CloseHandle(handle->read_req.event_handle);
handle->read_req.event_handle = NULL;
}
}
uv__handle_close(handle);
loop->active_tcp_streams--;
}
}
/* Unlike on Unix, here we don't set SO_REUSEADDR, because it doesn't just
* allow binding to addresses that are in use by sockets in TIME_WAIT, it
* effectively allows 'stealing' a port which is in use by another application.
*
* SO_EXCLUSIVEADDRUSE is also not good here because it does check all sockets,
* regardless of state, so we'd get an error even if the port is in use by a
* socket in TIME_WAIT state.
*
* See issue #1360.
*
*/
static int uv_tcp_try_bind(uv_tcp_t* handle,
const struct sockaddr* addr,
unsigned int addrlen,
unsigned int flags) {
DWORD err;
int r;
if (handle->socket == INVALID_SOCKET) {
SOCKET sock;
/* Cannot set IPv6-only mode on non-IPv6 socket. */
if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6)
return ERROR_INVALID_PARAMETER;
sock = socket(addr->sa_family, SOCK_STREAM, 0);
if (sock == INVALID_SOCKET) {
return WSAGetLastError();
}
err = uv_tcp_set_socket(handle->loop, handle, sock, addr->sa_family, 0);
if (err) {
closesocket(sock);
return err;
}
}
#ifdef IPV6_V6ONLY
if (addr->sa_family == AF_INET6) {
int on;
on = (flags & UV_TCP_IPV6ONLY) != 0;
    /* TODO: how to handle errors? This may fail if there is no ipv4 stack
     * available, or when run on XP/2003 which have no support for dual-stack
     * sockets. For now we're silently ignoring the error. */
setsockopt(handle->socket,
IPPROTO_IPV6,
IPV6_V6ONLY,
(const char*)&on,
sizeof on);
}
#endif
r = bind(handle->socket, addr, addrlen);
if (r == SOCKET_ERROR) {
err = WSAGetLastError();
if (err == WSAEADDRINUSE) {
/* Some errors are not to be reported until connect() or listen() */
handle->delayed_error = err;
} else {
return err;
}
}
handle->flags |= UV_HANDLE_BOUND;
return 0;
}
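/* Usage sketch (illustrative only; handle/addr are hypothetical): binding
 * through the public API, which reaches uv_tcp_try_bind via uv__tcp_bind
 * below. Note that WSAEADDRINUSE is stashed in handle->delayed_error and
 * only surfaces from a later listen() or connect():
 *
 *   struct sockaddr_in addr;
 *   uv_ip4_addr("0.0.0.0", 8000, &addr);
 *   int err = uv_tcp_bind(&handle, (const struct sockaddr*) &addr, 0);
 */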
static void CALLBACK post_completion(void* context, BOOLEAN timed_out) {
uv_req_t* req;
uv_tcp_t* handle;
req = (uv_req_t*) context;
assert(req != NULL);
handle = (uv_tcp_t*)req->data;
assert(handle != NULL);
assert(!timed_out);
if (!PostQueuedCompletionStatus(handle->loop->iocp,
req->u.io.overlapped.InternalHigh,
0,
&req->u.io.overlapped)) {
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
}
}
static void CALLBACK post_write_completion(void* context, BOOLEAN timed_out) {
uv_write_t* req;
uv_tcp_t* handle;
req = (uv_write_t*) context;
assert(req != NULL);
handle = (uv_tcp_t*)req->handle;
assert(handle != NULL);
assert(!timed_out);
if (!PostQueuedCompletionStatus(handle->loop->iocp,
req->u.io.overlapped.InternalHigh,
0,
&req->u.io.overlapped)) {
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
}
}
static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
uv_loop_t* loop = handle->loop;
BOOL success;
DWORD bytes;
SOCKET accept_socket;
short family;
assert(handle->flags & UV_HANDLE_LISTENING);
assert(req->accept_socket == INVALID_SOCKET);
/* choose family and extension function */
if (handle->flags & UV_HANDLE_IPV6) {
family = AF_INET6;
} else {
family = AF_INET;
}
/* Open a socket for the accepted connection. */
accept_socket = socket(family, SOCK_STREAM, 0);
if (accept_socket == INVALID_SOCKET) {
SET_REQ_ERROR(req, WSAGetLastError());
uv_insert_pending_req(loop, (uv_req_t*)req);
handle->reqs_pending++;
return;
}
/* Make the socket non-inheritable */
if (!SetHandleInformation((HANDLE) accept_socket, HANDLE_FLAG_INHERIT, 0)) {
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req(loop, (uv_req_t*)req);
handle->reqs_pending++;
closesocket(accept_socket);
return;
}
/* Prepare the overlapped structure. */
memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
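    /* Setting the low-order bit of the event handle prevents the completion
     * from being queued to the IOCP when the operation finishes; this is the
     * documented hEvent convention for GetQueuedCompletionStatus. In this
     * emulation mode, completions are picked up via
     * RegisterWaitForSingleObject instead. */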
req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
}
success = handle->tcp.serv.func_acceptex(handle->socket,
accept_socket,
(void*)req->accept_buffer,
0,
sizeof(struct sockaddr_storage),
sizeof(struct sockaddr_storage),
&bytes,
&req->u.io.overlapped);
if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
/* Process the req without IOCP. */
req->accept_socket = accept_socket;
handle->reqs_pending++;
uv_insert_pending_req(loop, (uv_req_t*)req);
} else if (UV_SUCCEEDED_WITH_IOCP(success)) {
/* The req will be processed with IOCP. */
req->accept_socket = accept_socket;
handle->reqs_pending++;
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
req->wait_handle == INVALID_HANDLE_VALUE &&
!RegisterWaitForSingleObject(&req->wait_handle,
req->event_handle, post_completion, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD)) {
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req(loop, (uv_req_t*)req);
}
} else {
/* Make this req pending reporting an error. */
SET_REQ_ERROR(req, WSAGetLastError());
uv_insert_pending_req(loop, (uv_req_t*)req);
handle->reqs_pending++;
/* Destroy the preallocated client socket. */
closesocket(accept_socket);
/* Destroy the event handle */
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
CloseHandle(req->u.io.overlapped.hEvent);
req->event_handle = NULL;
}
}
}
static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) {
uv_read_t* req;
uv_buf_t buf;
int result;
DWORD bytes, flags;
assert(handle->flags & UV_HANDLE_READING);
assert(!(handle->flags & UV_HANDLE_READ_PENDING));
  req = &handle->read_req;
/*
* Preallocate a read buffer if the number of active streams is below
* the threshold.
*/
if (loop->active_tcp_streams < uv_active_tcp_streams_threshold) {
handle->flags &= ~UV_HANDLE_ZERO_READ;
handle->tcp.conn.read_buffer = uv_buf_init(NULL, 0);
handle->alloc_cb((uv_handle_t*) handle, 65536, &handle->tcp.conn.read_buffer);
if (handle->tcp.conn.read_buffer.base == NULL ||
handle->tcp.conn.read_buffer.len == 0) {
handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &handle->tcp.conn.read_buffer);
return;
}
assert(handle->tcp.conn.read_buffer.base != NULL);
buf = handle->tcp.conn.read_buffer;
} else {
handle->flags |= UV_HANDLE_ZERO_READ;
buf.base = (char*) &uv_zero_;
buf.len = 0;
}
/* Prepare the overlapped structure. */
memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
assert(req->event_handle);
req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
}
flags = 0;
result = WSARecv(handle->socket,
(WSABUF*)&buf,
1,
&bytes,
&flags,
&req->u.io.overlapped,
NULL);
if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) {
/* Process the req without IOCP. */
handle->flags |= UV_HANDLE_READ_PENDING;
req->u.io.overlapped.InternalHigh = bytes;
handle->reqs_pending++;
uv_insert_pending_req(loop, (uv_req_t*)req);
} else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
/* The req will be processed with IOCP. */
handle->flags |= UV_HANDLE_READ_PENDING;
handle->reqs_pending++;
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
req->wait_handle == INVALID_HANDLE_VALUE &&
!RegisterWaitForSingleObject(&req->wait_handle,
req->event_handle, post_completion, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD)) {
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req(loop, (uv_req_t*)req);
}
} else {
/* Make this req pending reporting an error. */
SET_REQ_ERROR(req, WSAGetLastError());
uv_insert_pending_req(loop, (uv_req_t*)req);
handle->reqs_pending++;
}
}
int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
  uv_loop_t* loop = handle->loop;
  unsigned int i, simultaneous_accepts;
  uv_tcp_accept_t* req;
  int err;
assert(backlog > 0);
if (handle->flags & UV_HANDLE_LISTENING) {
handle->stream.serv.connection_cb = cb;
}
if (handle->flags & UV_HANDLE_READING) {
return WSAEISCONN;
}
if (handle->delayed_error) {
return handle->delayed_error;
}
if (!(handle->flags & UV_HANDLE_BOUND)) {
err = uv_tcp_try_bind(handle,
(const struct sockaddr*) &uv_addr_ip4_any_,
sizeof(uv_addr_ip4_any_),
0);
if (err)
return err;
if (handle->delayed_error)
return handle->delayed_error;
}
  if (!handle->tcp.serv.func_acceptex) {
    if (!uv_get_acceptex_function(handle->socket,
                                  &handle->tcp.serv.func_acceptex)) {
      return WSAEAFNOSUPPORT;
    }
  }
if (!(handle->flags & UV_HANDLE_SHARED_TCP_SOCKET) &&
listen(handle->socket, backlog) == SOCKET_ERROR) {
return WSAGetLastError();
}
handle->flags |= UV_HANDLE_LISTENING;
handle->stream.serv.connection_cb = cb;
INCREASE_ACTIVE_COUNT(loop, handle);
simultaneous_accepts = handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT ? 1
: uv_simultaneous_server_accepts;
  if (!handle->tcp.serv.accept_reqs) {
handle->tcp.serv.accept_reqs = (uv_tcp_accept_t*)
uv__malloc(uv_simultaneous_server_accepts * sizeof(uv_tcp_accept_t));
if (!handle->tcp.serv.accept_reqs) {
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
}
for (i = 0; i < simultaneous_accepts; i++) {
req = &handle->tcp.serv.accept_reqs[i];
UV_REQ_INIT(req, UV_ACCEPT);
req->accept_socket = INVALID_SOCKET;
req->data = handle;
req->wait_handle = INVALID_HANDLE_VALUE;
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
if (!req->event_handle) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
} else {
req->event_handle = NULL;
}
uv_tcp_queue_accept(handle, req);
}
/* Initialize other unused requests too, because uv_tcp_endgame doesn't
* know how many requests were initialized, so it will try to clean up
* {uv_simultaneous_server_accepts} requests. */
for (i = simultaneous_accepts; i < uv_simultaneous_server_accepts; i++) {
req = &handle->tcp.serv.accept_reqs[i];
UV_REQ_INIT(req, UV_ACCEPT);
req->accept_socket = INVALID_SOCKET;
req->data = handle;
req->wait_handle = INVALID_HANDLE_VALUE;
req->event_handle = NULL;
}
}
return 0;
}
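/* Usage sketch (illustrative only; names are hypothetical): a minimal server
 * built on the public API, which lands in uv_tcp_listen/uv_tcp_accept:
 *
 *   static void on_connection(uv_stream_t* server, int status) {
 *     uv_tcp_t* client;
 *     if (status < 0)
 *       return;  // the accept machinery reported an error
 *     client = (uv_tcp_t*) malloc(sizeof(*client));
 *     uv_tcp_init(server->loop, client);
 *     if (uv_accept(server, (uv_stream_t*) client) != 0)
 *       uv_close((uv_handle_t*) client, NULL);  // cleanup omitted for brevity
 *   }
 *
 *   uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
 *   uv_listen((uv_stream_t*) &server, 128, on_connection);
 */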
int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
uv_loop_t* loop = server->loop;
int err = 0;
int family;
uv_tcp_accept_t* req = server->tcp.serv.pending_accepts;
if (!req) {
/* No valid connections found, so we error out. */
return WSAEWOULDBLOCK;
}
if (req->accept_socket == INVALID_SOCKET) {
return WSAENOTCONN;
}
if (server->flags & UV_HANDLE_IPV6) {
family = AF_INET6;
} else {
family = AF_INET;
}
err = uv_tcp_set_socket(client->loop,
client,
req->accept_socket,
family,
0);
if (err) {
closesocket(req->accept_socket);
} else {
uv_connection_init((uv_stream_t*) client);
/* AcceptEx() implicitly binds the accepted socket. */
client->flags |= UV_HANDLE_BOUND | UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
}
/* Prepare the req to pick up a new connection */
server->tcp.serv.pending_accepts = req->next_pending;
req->next_pending = NULL;
req->accept_socket = INVALID_SOCKET;
if (!(server->flags & UV_HANDLE_CLOSING)) {
    /* Check if we're in the middle of changing the number of pending
     * accepts. */
    if (!(server->flags & UV_HANDLE_TCP_ACCEPT_STATE_CHANGING)) {
uv_tcp_queue_accept(server, req);
} else {
/* We better be switching to a single pending accept. */
assert(server->flags & UV_HANDLE_TCP_SINGLE_ACCEPT);
server->tcp.serv.processed_accepts++;
if (server->tcp.serv.processed_accepts >= uv_simultaneous_server_accepts) {
server->tcp.serv.processed_accepts = 0;
/*
* All previously queued accept requests are now processed.
* We now switch to queueing just a single accept.
*/
uv_tcp_queue_accept(server, &server->tcp.serv.accept_reqs[0]);
server->flags &= ~UV_HANDLE_TCP_ACCEPT_STATE_CHANGING;
server->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;
}
}
}
loop->active_tcp_streams++;
return err;
}
int uv_tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb,
uv_read_cb read_cb) {
uv_loop_t* loop = handle->loop;
handle->flags |= UV_HANDLE_READING;
handle->read_cb = read_cb;
handle->alloc_cb = alloc_cb;
INCREASE_ACTIVE_COUNT(loop, handle);
/* If reading was stopped and then started again, there could still be a read
* request pending. */
if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
!handle->read_req.event_handle) {
handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
if (!handle->read_req.event_handle) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
}
uv_tcp_queue_read(loop, handle);
}
return 0;
}
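/* Usage sketch (illustrative only; names are hypothetical): the alloc/read
 * callback pair expected by uv_read_start, which funnels into
 * uv_tcp_read_start for TCP handles. Returning a 0-length buffer from the
 * alloc callback makes the read fail with UV_ENOBUFS:
 *
 *   static void on_alloc(uv_handle_t* h, size_t suggested, uv_buf_t* buf) {
 *     buf->base = (char*) malloc(suggested);
 *     buf->len = buf->base == NULL ? 0 : (unsigned int) suggested;
 *   }
 *
 *   static void on_read(uv_stream_t* s, ssize_t nread, const uv_buf_t* buf) {
 *     if (nread > 0)
 *       fwrite(buf->base, 1, (size_t) nread, stdout);
 *     else if (nread < 0)  // UV_EOF or a translated system error
 *       uv_close((uv_handle_t*) s, NULL);
 *     free(buf->base);
 *   }
 *
 *   uv_read_start((uv_stream_t*) &client, on_alloc, on_read);
 */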
static int uv_tcp_try_connect(uv_connect_t* req,
uv_tcp_t* handle,
const struct sockaddr* addr,
unsigned int addrlen,
uv_connect_cb cb) {
uv_loop_t* loop = handle->loop;
const struct sockaddr* bind_addr;
struct sockaddr_storage converted;
BOOL success;
DWORD bytes;
int err;
err = uv__convert_to_localhost_if_unspecified(addr, &converted);
if (err)
return err;
if (handle->delayed_error) {
return handle->delayed_error;
}
if (!(handle->flags & UV_HANDLE_BOUND)) {
if (addrlen == sizeof(uv_addr_ip4_any_)) {
bind_addr = (const struct sockaddr*) &uv_addr_ip4_any_;
} else if (addrlen == sizeof(uv_addr_ip6_any_)) {
bind_addr = (const struct sockaddr*) &uv_addr_ip6_any_;
} else {
abort();
}
err = uv_tcp_try_bind(handle, bind_addr, addrlen, 0);
if (err)
return err;
if (handle->delayed_error)
return handle->delayed_error;
}
  if (!handle->tcp.conn.func_connectex) {
    if (!uv_get_connectex_function(handle->socket,
                                   &handle->tcp.conn.func_connectex)) {
      return WSAEAFNOSUPPORT;
    }
  }
UV_REQ_INIT(req, UV_CONNECT);
req->handle = (uv_stream_t*) handle;
req->cb = cb;
memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
success = handle->tcp.conn.func_connectex(handle->socket,
(const struct sockaddr*) &converted,
addrlen,
NULL,
0,
&bytes,
&req->u.io.overlapped);
if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
/* Process the req without IOCP. */
handle->reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
uv_insert_pending_req(loop, (uv_req_t*)req);
} else if (UV_SUCCEEDED_WITH_IOCP(success)) {
/* The req will be processed with IOCP. */
handle->reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
} else {
return WSAGetLastError();
}
return 0;
}
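/* Usage sketch (illustrative only; names are hypothetical): connecting via
 * the public uv_tcp_connect, which ends up in uv_tcp_try_connect above. An
 * unbound handle is implicitly bound to the wildcard address first:
 *
 *   static void on_connect(uv_connect_t* req, int status) {
 *     if (status < 0) {
 *       fprintf(stderr, "connect failed: %s\n", uv_strerror(status));
 *       return;
 *     }
 *     // req->handle is now readable and writable
 *   }
 *
 *   struct sockaddr_in addr;
 *   uv_connect_t req;
 *   uv_ip4_addr("127.0.0.1", 8000, &addr);
 *   uv_tcp_connect(&req, &handle, (const struct sockaddr*) &addr, on_connect);
 */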
int uv_tcp_getsockname(const uv_tcp_t* handle,
struct sockaddr* name,
int* namelen) {
int result;
if (handle->socket == INVALID_SOCKET) {
return UV_EINVAL;
}
if (handle->delayed_error) {
return uv_translate_sys_error(handle->delayed_error);
}
result = getsockname(handle->socket, name, namelen);
if (result != 0) {
return uv_translate_sys_error(WSAGetLastError());
}
return 0;
}
int uv_tcp_getpeername(const uv_tcp_t* handle,
struct sockaddr* name,
int* namelen) {
int result;
if (handle->socket == INVALID_SOCKET) {
return UV_EINVAL;
}
if (handle->delayed_error) {
return uv_translate_sys_error(handle->delayed_error);
}
result = getpeername(handle->socket, name, namelen);
if (result != 0) {
return uv_translate_sys_error(WSAGetLastError());
}
return 0;
}
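/* Usage sketch (illustrative only; names are hypothetical): retrieving the
 * peer address of a connected handle:
 *
 *   struct sockaddr_storage peer;
 *   int len = sizeof(peer);
 *   char ip[64];
 *   if (uv_tcp_getpeername(&handle, (struct sockaddr*) &peer, &len) == 0 &&
 *       peer.ss_family == AF_INET)
 *     uv_ip4_name((const struct sockaddr_in*) &peer, ip, sizeof(ip));
 */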
int uv_tcp_write(uv_loop_t* loop,
uv_write_t* req,
uv_tcp_t* handle,
const uv_buf_t bufs[],
unsigned int nbufs,
uv_write_cb cb) {
int result;
DWORD bytes;
UV_REQ_INIT(req, UV_WRITE);
req->handle = (uv_stream_t*) handle;
req->cb = cb;
/* Prepare the overlapped structure. */
memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
if (!req->event_handle) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
req->wait_handle = INVALID_HANDLE_VALUE;
}
result = WSASend(handle->socket,
(WSABUF*) bufs,
nbufs,
&bytes,
0,
&req->u.io.overlapped,
NULL);
if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) {
/* Request completed immediately. */
req->u.io.queued_bytes = 0;
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
uv_insert_pending_req(loop, (uv_req_t*) req);
} else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
/* Request queued by the kernel. */
req->u.io.queued_bytes = uv__count_bufs(bufs, nbufs);
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
handle->write_queue_size += req->u.io.queued_bytes;
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
!RegisterWaitForSingleObject(&req->wait_handle,
req->event_handle, post_write_completion, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) {
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req(loop, (uv_req_t*)req);
}
} else {
/* Send failed due to an error, report it later */
req->u.io.queued_bytes = 0;
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
SET_REQ_ERROR(req, WSAGetLastError());
uv_insert_pending_req(loop, (uv_req_t*) req);
}
return 0;
}
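/* Usage sketch (illustrative only; names are hypothetical): writing through
 * the public uv_write, which dispatches to uv_tcp_write for TCP streams. The
 * bytes referenced by the bufs must remain valid until the callback runs:
 *
 *   static void on_write(uv_write_t* req, int status) {
 *     if (status < 0)
 *       fprintf(stderr, "write failed: %s\n", uv_strerror(status));
 *   }
 *
 *   uv_write_t req;
 *   uv_buf_t buf = uv_buf_init((char*) "hello", 5);
 *   uv_write(&req, (uv_stream_t*) &handle, &buf, 1, on_write);
 */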
int uv__tcp_try_write(uv_tcp_t* handle,
const uv_buf_t bufs[],
unsigned int nbufs) {
int result;
DWORD bytes;
if (handle->stream.conn.write_reqs_pending > 0)
return UV_EAGAIN;
result = WSASend(handle->socket,
(WSABUF*) bufs,
nbufs,
&bytes,
0,
NULL,
NULL);
if (result == SOCKET_ERROR)
return uv_translate_sys_error(WSAGetLastError());
else
return bytes;
}
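/* Usage sketch (illustrative only): the public uv_try_write maps onto
 * uv__tcp_try_write for TCP. It refuses to run while completion-based writes
 * are pending, so callers fall back to uv_write on UV_EAGAIN:
 *
 *   int n = uv_try_write((uv_stream_t*) &handle, &buf, 1);
 *   if (n == UV_EAGAIN)
 *     uv_write(&req, (uv_stream_t*) &handle, &buf, 1, on_write);
 */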
void uv_process_tcp_read_req(uv_loop_t* loop, uv_tcp_t* handle,
uv_req_t* req) {
DWORD bytes, flags, err;
uv_buf_t buf;
assert(handle->type == UV_TCP);
handle->flags &= ~UV_HANDLE_READ_PENDING;
if (!REQ_SUCCESS(req)) {
/* An error occurred doing the read. */
if ((handle->flags & UV_HANDLE_READING) ||
!(handle->flags & UV_HANDLE_ZERO_READ)) {
handle->flags &= ~UV_HANDLE_READING;
DECREASE_ACTIVE_COUNT(loop, handle);
buf = (handle->flags & UV_HANDLE_ZERO_READ) ?
uv_buf_init(NULL, 0) : handle->tcp.conn.read_buffer;
err = GET_REQ_SOCK_ERROR(req);
if (err == WSAECONNABORTED) {
/* Turn WSAECONNABORTED into UV_ECONNRESET to be consistent with Unix.
*/
err = WSAECONNRESET;
}
handle->read_cb((uv_stream_t*)handle,
uv_translate_sys_error(err),
&buf);
}
} else {
if (!(handle->flags & UV_HANDLE_ZERO_READ)) {
/* The read was done with a non-zero buffer length. */
if (req->u.io.overlapped.InternalHigh > 0) {
/* Successful read */
handle->read_cb((uv_stream_t*)handle,
req->u.io.overlapped.InternalHigh,
&handle->tcp.conn.read_buffer);
/* Read again only if bytes == buf.len */
if (req->u.io.overlapped.InternalHigh < handle->tcp.conn.read_buffer.len) {
goto done;
}
} else {
/* Connection closed */
if (handle->flags & UV_HANDLE_READING) {
handle->flags &= ~UV_HANDLE_READING;
DECREASE_ACTIVE_COUNT(loop, handle);
}
handle->flags &= ~UV_HANDLE_READABLE;
buf.base = 0;
buf.len = 0;
handle->read_cb((uv_stream_t*)handle, UV_EOF, &handle->tcp.conn.read_buffer);
goto done;
}
}
/* Do nonblocking reads until the buffer is empty */
while (handle->flags & UV_HANDLE_READING) {
buf = uv_buf_init(NULL, 0);
handle->alloc_cb((uv_handle_t*) handle, 65536, &buf);
if (buf.base == NULL || buf.len == 0) {
handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
break;
}
assert(buf.base != NULL);
flags = 0;
if (WSARecv(handle->socket,
(WSABUF*)&buf,
1,
&bytes,
&flags,
NULL,
NULL) != SOCKET_ERROR) {
if (bytes > 0) {
/* Successful read */
handle->read_cb((uv_stream_t*)handle, bytes, &buf);
/* Read again only if bytes == buf.len */
if (bytes < buf.len) {
break;
}
} else {
/* Connection closed */
handle->flags &= ~(UV_HANDLE_READING | UV_HANDLE_READABLE);
DECREASE_ACTIVE_COUNT(loop, handle);
handle->read_cb((uv_stream_t*)handle, UV_EOF, &buf);
break;
}
} else {
err = WSAGetLastError();
if (err == WSAEWOULDBLOCK) {
/* Read buffer was completely empty, report a 0-byte read. */
handle->read_cb((uv_stream_t*)handle, 0, &buf);
} else {
/* Ouch! serious error. */
handle->flags &= ~UV_HANDLE_READING;
DECREASE_ACTIVE_COUNT(loop, handle);
if (err == WSAECONNABORTED) {
/* Turn WSAECONNABORTED into UV_ECONNRESET to be consistent with
* Unix. */
err = WSAECONNRESET;
}
handle->read_cb((uv_stream_t*)handle,
uv_translate_sys_error(err),
&buf);
}
break;
}
}
done:
/* Post another read if still reading and not closing. */
if ((handle->flags & UV_HANDLE_READING) &&
!(handle->flags & UV_HANDLE_READ_PENDING)) {
uv_tcp_queue_read(loop, handle);
}
}
DECREASE_PENDING_REQ_COUNT(handle);
}
void uv_process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle,
uv_write_t* req) {
int err;
assert(handle->type == UV_TCP);
assert(handle->write_queue_size >= req->u.io.queued_bytes);
handle->write_queue_size -= req->u.io.queued_bytes;
UNREGISTER_HANDLE_REQ(loop, handle, req);
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (req->wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(req->wait_handle);
req->wait_handle = INVALID_HANDLE_VALUE;
}
if (req->event_handle) {
CloseHandle(req->event_handle);
req->event_handle = NULL;
}
}
if (req->cb) {
err = uv_translate_sys_error(GET_REQ_SOCK_ERROR(req));
if (err == UV_ECONNABORTED) {
/* use UV_ECANCELED for consistency with Unix */
err = UV_ECANCELED;
}
req->cb(req, err);
}
handle->stream.conn.write_reqs_pending--;
if (handle->stream.conn.shutdown_req != NULL &&
handle->stream.conn.write_reqs_pending == 0) {
uv_want_endgame(loop, (uv_handle_t*)handle);
}
DECREASE_PENDING_REQ_COUNT(handle);
}
void uv_process_tcp_accept_req(uv_loop_t* loop, uv_tcp_t* handle,
uv_req_t* raw_req) {
uv_tcp_accept_t* req = (uv_tcp_accept_t*) raw_req;
int err;
assert(handle->type == UV_TCP);
  /* If req->accept_socket is not a valid socket, then uv_tcp_queue_accept
   * must have failed. This is a serious error. We stop accepting connections
   * and report this error to the connection callback. */
if (req->accept_socket == INVALID_SOCKET) {
if (handle->flags & UV_HANDLE_LISTENING) {
handle->flags &= ~UV_HANDLE_LISTENING;
DECREASE_ACTIVE_COUNT(loop, handle);
if (handle->stream.serv.connection_cb) {
err = GET_REQ_SOCK_ERROR(req);
handle->stream.serv.connection_cb((uv_stream_t*)handle,
uv_translate_sys_error(err));
}
}
} else if (REQ_SUCCESS(req) &&
setsockopt(req->accept_socket,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char*)&handle->socket,
sizeof(handle->socket)) == 0) {
req->next_pending = handle->tcp.serv.pending_accepts;
handle->tcp.serv.pending_accepts = req;
/* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */
if (handle->stream.serv.connection_cb) {
handle->stream.serv.connection_cb((uv_stream_t*)handle, 0);
}
} else {
    /* An error related to the accepted socket is ignored because the server
     * socket may still be healthy. If the server socket is broken,
     * uv_tcp_queue_accept will detect it. */
closesocket(req->accept_socket);
req->accept_socket = INVALID_SOCKET;
if (handle->flags & UV_HANDLE_LISTENING) {
uv_tcp_queue_accept(handle, req);
}
}
DECREASE_PENDING_REQ_COUNT(handle);
}
void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
uv_connect_t* req) {
int err;
assert(handle->type == UV_TCP);
UNREGISTER_HANDLE_REQ(loop, handle, req);
err = 0;
if (REQ_SUCCESS(req)) {
if (handle->flags & UV_HANDLE_CLOSING) {
/* use UV_ECANCELED for consistency with Unix */
err = ERROR_OPERATION_ABORTED;
} else if (setsockopt(handle->socket,
SOL_SOCKET,
SO_UPDATE_CONNECT_CONTEXT,
NULL,
0) == 0) {
uv_connection_init((uv_stream_t*)handle);
handle->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
loop->active_tcp_streams++;
} else {
err = WSAGetLastError();
}
} else {
err = GET_REQ_SOCK_ERROR(req);
}
req->cb(req, uv_translate_sys_error(err));
DECREASE_PENDING_REQ_COUNT(handle);
}
int uv__tcp_xfer_export(uv_tcp_t* handle,
int target_pid,
uv__ipc_socket_xfer_type_t* xfer_type,
uv__ipc_socket_xfer_info_t* xfer_info) {
if (handle->flags & UV_HANDLE_CONNECTION) {
*xfer_type = UV__IPC_SOCKET_XFER_TCP_CONNECTION;
} else {
*xfer_type = UV__IPC_SOCKET_XFER_TCP_SERVER;
/* We're about to share the socket with another process. Because this is a
* listening socket, we assume that the other process will be accepting
* connections on it. Thus, before sharing the socket with another process,
* we call listen here in the parent process. */
if (!(handle->flags & UV_HANDLE_LISTENING)) {
if (!(handle->flags & UV_HANDLE_BOUND)) {
return ERROR_NOT_SUPPORTED;
}
if (handle->delayed_error == 0 &&
listen(handle->socket, SOMAXCONN) == SOCKET_ERROR) {
handle->delayed_error = WSAGetLastError();
}
}
}
if (WSADuplicateSocketW(handle->socket, target_pid, &xfer_info->socket_info))
return WSAGetLastError();
xfer_info->delayed_error = handle->delayed_error;
/* Mark the local copy of the handle as 'shared' so we behave in a way that's
* friendly to the process(es) that we share the socket with. */
handle->flags |= UV_HANDLE_SHARED_TCP_SOCKET;
return 0;
}
int uv__tcp_xfer_import(uv_tcp_t* tcp,
uv__ipc_socket_xfer_type_t xfer_type,
uv__ipc_socket_xfer_info_t* xfer_info) {
int err;
SOCKET socket;
assert(xfer_type == UV__IPC_SOCKET_XFER_TCP_SERVER ||
xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION);
socket = WSASocketW(FROM_PROTOCOL_INFO,
FROM_PROTOCOL_INFO,
FROM_PROTOCOL_INFO,
&xfer_info->socket_info,
0,
WSA_FLAG_OVERLAPPED);
if (socket == INVALID_SOCKET) {
return WSAGetLastError();
}
err = uv_tcp_set_socket(
tcp->loop, tcp, socket, xfer_info->socket_info.iAddressFamily, 1);
if (err) {
closesocket(socket);
return err;
}
tcp->delayed_error = xfer_info->delayed_error;
tcp->flags |= UV_HANDLE_BOUND | UV_HANDLE_SHARED_TCP_SOCKET;
if (xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION) {
uv_connection_init((uv_stream_t*)tcp);
tcp->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
}
tcp->loop->active_tcp_streams++;
return 0;
}
int uv_tcp_nodelay(uv_tcp_t* handle, int enable) {
int err;
if (handle->socket != INVALID_SOCKET) {
err = uv__tcp_nodelay(handle, handle->socket, enable);
if (err)
return err;
}
if (enable) {
handle->flags |= UV_HANDLE_TCP_NODELAY;
} else {
handle->flags &= ~UV_HANDLE_TCP_NODELAY;
}
return 0;
}
int uv_tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) {
int err;
if (handle->socket != INVALID_SOCKET) {
err = uv__tcp_keepalive(handle, handle->socket, enable, delay);
if (err)
return err;
}
if (enable) {
handle->flags |= UV_HANDLE_TCP_KEEPALIVE;
} else {
handle->flags &= ~UV_HANDLE_TCP_KEEPALIVE;
}
/* TODO: Store delay if handle->socket isn't created yet. */
return 0;
}
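/* Usage sketch (illustrative only): both options may be set before the
 * socket exists; the flags are then applied in uv_tcp_set_socket (where the
 * keep-alive delay is currently hardcoded to 60 seconds, see the TODOs
 * above):
 *
 *   uv_tcp_nodelay(&handle, 1);        // disable Nagle's algorithm
 *   uv_tcp_keepalive(&handle, 1, 60);  // probe after 60s of inactivity
 */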
int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) {
if (handle->flags & UV_HANDLE_CONNECTION) {
return UV_EINVAL;
}
/* Check if we're already in the desired mode. */
if ((enable && !(handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) ||
(!enable && handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
return 0;
}
/* Don't allow switching from single pending accept to many. */
if (enable) {
return UV_ENOTSUP;
}
  /* Check if we're in the middle of changing the number of pending accepts. */
if (handle->flags & UV_HANDLE_TCP_ACCEPT_STATE_CHANGING) {
return 0;
}
handle->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;
/* Flip the changing flag if we have already queued multiple accepts. */
if (handle->flags & UV_HANDLE_LISTENING) {
handle->flags |= UV_HANDLE_TCP_ACCEPT_STATE_CHANGING;
}
return 0;
}
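/* Usage sketch (illustrative only): disabling simultaneous accepts before
 * listening, e.g. so accepts spread evenly across processes sharing a listen
 * socket. Note that switching back from single to many is refused with
 * UV_ENOTSUP:
 *
 *   uv_tcp_simultaneous_accepts(&server, 0);  // queue a single AcceptEx
 *   uv_listen((uv_stream_t*) &server, 128, on_connection);
 */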
static int uv_tcp_try_cancel_io(uv_tcp_t* tcp) {
SOCKET socket = tcp->socket;
int non_ifs_lsp;
/* Check if we have any non-IFS LSPs stacked on top of TCP */
non_ifs_lsp = (tcp->flags & UV_HANDLE_IPV6) ? uv_tcp_non_ifs_lsp_ipv6 :
uv_tcp_non_ifs_lsp_ipv4;
/* If there are non-ifs LSPs then try to obtain a base handle for the socket.
* This will always fail on Windows XP/3k. */
if (non_ifs_lsp) {
DWORD bytes;
if (WSAIoctl(socket,
SIO_BASE_HANDLE,
NULL,
0,
&socket,
sizeof socket,
&bytes,
NULL,
NULL) != 0) {
/* Failed. We can't do CancelIo. */
return -1;
}
}
assert(socket != 0 && socket != INVALID_SOCKET);
if (!CancelIo((HANDLE) socket)) {
return GetLastError();
}
/* It worked. */
return 0;
}
void uv_tcp_close(uv_loop_t* loop, uv_tcp_t* tcp) {
int close_socket = 1;
if (tcp->flags & UV_HANDLE_READ_PENDING) {
    /* In order for winsock to do a graceful close there must not be any
     * pending reads, or the socket must be shut down for writing */
if (!(tcp->flags & UV_HANDLE_SHARED_TCP_SOCKET)) {
/* Just do shutdown on non-shared sockets, which ensures graceful close. */
shutdown(tcp->socket, SD_SEND);
} else if (uv_tcp_try_cancel_io(tcp) == 0) {
      /* In case of a shared socket, we try to cancel all outstanding I/O. If
       * that works, don't close the socket yet - wait for the read req to
       * return and close the socket in uv_tcp_endgame. */
close_socket = 0;
} else {
      /* When cancelling isn't possible (which could happen when an LSP is
       * present on an old Windows version), we will have to close the socket
       * with a read pending. That is not nice because trailing sent bytes may
       * not make it to the other side. */
}
} else if ((tcp->flags & UV_HANDLE_SHARED_TCP_SOCKET) &&
tcp->tcp.serv.accept_reqs != NULL) {
/* Under normal circumstances closesocket() will ensure that all pending
* accept reqs are canceled. However, when the socket is shared the
* presence of another reference to the socket in another process will keep
* the accept reqs going, so we have to ensure that these are canceled. */
if (uv_tcp_try_cancel_io(tcp) != 0) {
/* When cancellation is not possible, there is another option: we can
* close the incoming sockets, which will also cancel the accept
* operations. However this is not cool because we might inadvertently
* close a socket that just accepted a new connection, which will cause
* the connection to be aborted. */
unsigned int i;
for (i = 0; i < uv_simultaneous_server_accepts; i++) {
uv_tcp_accept_t* req = &tcp->tcp.serv.accept_reqs[i];
if (req->accept_socket != INVALID_SOCKET &&
!HasOverlappedIoCompleted(&req->u.io.overlapped)) {
closesocket(req->accept_socket);
req->accept_socket = INVALID_SOCKET;
}
}
}
}
if (tcp->flags & UV_HANDLE_READING) {
tcp->flags &= ~UV_HANDLE_READING;
DECREASE_ACTIVE_COUNT(loop, tcp);
}
if (tcp->flags & UV_HANDLE_LISTENING) {
tcp->flags &= ~UV_HANDLE_LISTENING;
DECREASE_ACTIVE_COUNT(loop, tcp);
}
if (close_socket) {
closesocket(tcp->socket);
tcp->socket = INVALID_SOCKET;
tcp->flags |= UV_HANDLE_TCP_SOCKET_CLOSED;
}
tcp->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
uv__handle_closing(tcp);
if (tcp->reqs_pending == 0) {
uv_want_endgame(tcp->loop, (uv_handle_t*)tcp);
}
}
int uv_tcp_open(uv_tcp_t* handle, uv_os_sock_t sock) {
WSAPROTOCOL_INFOW protocol_info;
int opt_len;
int err;
struct sockaddr_storage saddr;
int saddr_len;
/* Detect the address family of the socket. */
opt_len = (int) sizeof protocol_info;
if (getsockopt(sock,
SOL_SOCKET,
SO_PROTOCOL_INFOW,
(char*) &protocol_info,
&opt_len) == SOCKET_ERROR) {
return uv_translate_sys_error(GetLastError());
}
err = uv_tcp_set_socket(handle->loop,
handle,
sock,
protocol_info.iAddressFamily,
1);
if (err) {
return uv_translate_sys_error(err);
}
/* Support already active socket. */
saddr_len = sizeof(saddr);
if (!uv_tcp_getsockname(handle, (struct sockaddr*) &saddr, &saddr_len)) {
/* Socket is already bound. */
handle->flags |= UV_HANDLE_BOUND;
saddr_len = sizeof(saddr);
if (!uv_tcp_getpeername(handle, (struct sockaddr*) &saddr, &saddr_len)) {
/* Socket is already connected. */
uv_connection_init((uv_stream_t*) handle);
handle->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
}
}
return 0;
}
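/* Usage sketch (illustrative only; 'sock' is a hypothetical native SOCKET
 * created elsewhere, ideally with WSA_FLAG_OVERLAPPED): importing an
 * existing socket. Bound/connected state is detected automatically:
 *
 *   uv_tcp_t handle;
 *   uv_tcp_init(loop, &handle);
 *   if (uv_tcp_open(&handle, sock) == 0) {
 *     // 'handle' now owns the socket
 *   }
 */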
/* This function is an egress point, i.e. it returns libuv errors rather than
* system errors.
*/
int uv__tcp_bind(uv_tcp_t* handle,
const struct sockaddr* addr,
unsigned int addrlen,
unsigned int flags) {
int err;
err = uv_tcp_try_bind(handle, addr, addrlen, flags);
if (err)
return uv_translate_sys_error(err);
return 0;
}
/* This function is an egress point, i.e. it returns libuv errors rather than
* system errors.
*/
int uv__tcp_connect(uv_connect_t* req,
uv_tcp_t* handle,
const struct sockaddr* addr,
unsigned int addrlen,
uv_connect_cb cb) {
int err;
err = uv_tcp_try_connect(req, handle, addr, addrlen, cb);
if (err)
return uv_translate_sys_error(err);
return 0;
}