| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| #define _GNU_SOURCE |
| #include <config.h> |
| |
| #include <sys/types.h> |
| #include <sys/socket.h> |
| #include <sys/stat.h> |
| #include <sys/mman.h> |
| #include <sys/sendfile.h> |
| #include <stdarg.h> |
| #include <dlfcn.h> |
| #include <netdb.h> |
| #include <unistd.h> |
| #include <fcntl.h> |
| #include <string.h> |
| #include <netinet/tcp.h> |
| #include <unistd.h> |
| #include <semaphore.h> |
| #include <ctype.h> |
| #include <stdlib.h> |
| #include <stdio.h> |
| |
| #include <sys/uio.h> |
| |
| #include <rdma/rdma_cma.h> |
| #include <rdma/rdma_verbs.h> |
| #include <rdma/rsocket.h> |
| #include "cma.h" |
| #include "indexer.h" |
| |
| struct socket_calls { |
| int (*socket)(int domain, int type, int protocol); |
| int (*bind)(int socket, const struct sockaddr *addr, socklen_t addrlen); |
| int (*listen)(int socket, int backlog); |
| int (*accept)(int socket, struct sockaddr *addr, socklen_t *addrlen); |
| int (*connect)(int socket, const struct sockaddr *addr, socklen_t addrlen); |
| ssize_t (*recv)(int socket, void *buf, size_t len, int flags); |
| ssize_t (*recvfrom)(int socket, void *buf, size_t len, int flags, |
| struct sockaddr *src_addr, socklen_t *addrlen); |
| ssize_t (*recvmsg)(int socket, struct msghdr *msg, int flags); |
| ssize_t (*read)(int socket, void *buf, size_t count); |
| ssize_t (*readv)(int socket, const struct iovec *iov, int iovcnt); |
| ssize_t (*send)(int socket, const void *buf, size_t len, int flags); |
| ssize_t (*sendto)(int socket, const void *buf, size_t len, int flags, |
| const struct sockaddr *dest_addr, socklen_t addrlen); |
| ssize_t (*sendmsg)(int socket, const struct msghdr *msg, int flags); |
| ssize_t (*write)(int socket, const void *buf, size_t count); |
| ssize_t (*writev)(int socket, const struct iovec *iov, int iovcnt); |
| int (*poll)(struct pollfd *fds, nfds_t nfds, int timeout); |
| int (*shutdown)(int socket, int how); |
| int (*close)(int socket); |
| int (*getpeername)(int socket, struct sockaddr *addr, socklen_t *addrlen); |
| int (*getsockname)(int socket, struct sockaddr *addr, socklen_t *addrlen); |
| int (*setsockopt)(int socket, int level, int optname, |
| const void *optval, socklen_t optlen); |
| int (*getsockopt)(int socket, int level, int optname, |
| void *optval, socklen_t *optlen); |
| int (*fcntl)(int socket, int cmd, ... ); |
| int (*dup2)(int oldfd, int newfd); |
| ssize_t (*sendfile)(int out_fd, int in_fd, off_t *offset, size_t count); |
| int (*fxstat)(int ver, int fd, struct stat *buf); |
| }; |
| |
| static struct socket_calls real; |
| static struct socket_calls rs; |
| |
| static struct index_map idm; |
| static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; |
| |
| static int sq_size; |
| static int rq_size; |
| static int sq_inline; |
| static int fork_support; |
| |
| enum fd_type { |
| fd_normal, |
| fd_rsocket |
| }; |
| |
| enum fd_fork_state { |
| fd_ready, |
| fd_fork, |
| fd_fork_listen, |
| fd_fork_active, |
| fd_fork_passive |
| }; |
| |
| struct fd_info { |
| enum fd_type type; |
| enum fd_fork_state state; |
| int fd; |
| int dupfd; |
| _Atomic(int) refcnt; |
| }; |
| |
| struct config_entry { |
| char *name; |
| int domain; |
| int type; |
| int protocol; |
| }; |
| |
| static struct config_entry *config; |
| static int config_cnt; |
| |
| static void free_config(void) |
| { |
| while (config_cnt) |
| free(config[--config_cnt].name); |
| |
| free(config); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| static void scan_config(void) |
| { |
| struct config_entry *new_config; |
| FILE *fp; |
| char line[120], prog[64], dom[16], type[16], proto[16]; |
| |
| fp = fopen(RS_CONF_DIR "/preload_config", "r"); |
| if (!fp) |
| return; |
| |
| while (fgets(line, sizeof(line), fp)) { |
| if (line[0] == '#') |
| continue; |
| |
| if (sscanf(line, "%63s%15s%15s%15s", prog, dom, type, proto) != 4) |
| continue; |
| |
| new_config = realloc(config, (config_cnt + 1) * |
| sizeof(struct config_entry)); |
| if (!new_config) |
| break; |
| |
| config = new_config; |
| memset(&config[config_cnt], 0, sizeof(struct config_entry)); |
| |
| if (!strcasecmp(dom, "INET") || |
| !strcasecmp(dom, "AF_INET") || |
| !strcasecmp(dom, "PF_INET")) { |
| config[config_cnt].domain = AF_INET; |
| } else if (!strcasecmp(dom, "INET6") || |
| !strcasecmp(dom, "AF_INET6") || |
| !strcasecmp(dom, "PF_INET6")) { |
| config[config_cnt].domain = AF_INET6; |
| } else if (!strcasecmp(dom, "IB") || |
| !strcasecmp(dom, "AF_IB") || |
| !strcasecmp(dom, "PF_IB")) { |
| config[config_cnt].domain = AF_IB; |
| } else if (strcmp(dom, "*")) { |
| continue; |
| } |
| |
| if (!strcasecmp(type, "STREAM") || |
| !strcasecmp(type, "SOCK_STREAM")) { |
| config[config_cnt].type = SOCK_STREAM; |
| } else if (!strcasecmp(type, "DGRAM") || |
| !strcasecmp(type, "SOCK_DGRAM")) { |
| config[config_cnt].type = SOCK_DGRAM; |
| } else if (strcmp(type, "*")) { |
| continue; |
| } |
| |
| if (!strcasecmp(proto, "TCP") || |
| !strcasecmp(proto, "IPPROTO_TCP")) { |
| config[config_cnt].protocol = IPPROTO_TCP; |
| } else if (!strcasecmp(proto, "UDP") || |
| !strcasecmp(proto, "IPPROTO_UDP")) { |
| config[config_cnt].protocol = IPPROTO_UDP; |
| } else if (strcmp(proto, "*")) { |
| continue; |
| } |
| |
| if (strcmp(prog, "*")) { |
| if (!(config[config_cnt].name = strdup(prog))) |
| continue; |
| } |
| |
| config_cnt++; |
| } |
| |
| fclose(fp); |
| if (config_cnt) |
| atexit(free_config); |
| } |
| |
| static int intercept_socket(int domain, int type, int protocol) |
| { |
| int i; |
| |
| if (!config_cnt) |
| return 1; |
| |
| if (!protocol) { |
| if (type == SOCK_STREAM) |
| protocol = IPPROTO_TCP; |
| else if (type == SOCK_DGRAM) |
| protocol = IPPROTO_UDP; |
| } |
| |
| for (i = 0; i < config_cnt; i++) { |
| if ((!config[i].name || |
| !strncasecmp(config[i].name, program_invocation_short_name, |
| strlen(config[i].name))) && |
| (!config[i].domain || config[i].domain == domain) && |
| (!config[i].type || config[i].type == type) && |
| (!config[i].protocol || config[i].protocol == protocol)) |
| return 1; |
| } |
| |
| return 0; |
| } |
| |
| static int fd_open(void) |
| { |
| struct fd_info *fdi; |
| int ret, index; |
| |
| fdi = calloc(1, sizeof(*fdi)); |
| if (!fdi) |
| return ERR(ENOMEM); |
| |
| index = open("/dev/null", O_RDONLY); |
| if (index < 0) { |
| ret = index; |
| goto err1; |
| } |
| |
| fdi->dupfd = -1; |
| atomic_store(&fdi->refcnt, 1); |
| pthread_mutex_lock(&mut); |
| ret = idm_set(&idm, index, fdi); |
| pthread_mutex_unlock(&mut); |
| if (ret < 0) |
| goto err2; |
| |
| return index; |
| |
| err2: |
| real.close(index); |
| err1: |
| free(fdi); |
| return ret; |
| } |
| |
| static void fd_store(int index, int fd, enum fd_type type, enum fd_fork_state state) |
| { |
| struct fd_info *fdi; |
| |
| fdi = idm_at(&idm, index); |
| fdi->fd = fd; |
| fdi->type = type; |
| fdi->state = state; |
| } |
| |
| static inline enum fd_type fd_get(int index, int *fd) |
| { |
| struct fd_info *fdi; |
| |
| fdi = idm_lookup(&idm, index); |
| if (fdi) { |
| *fd = fdi->fd; |
| return fdi->type; |
| |
| } else { |
| *fd = index; |
| return fd_normal; |
| } |
| } |
| |
| static inline int fd_getd(int index) |
| { |
| struct fd_info *fdi; |
| |
| fdi = idm_lookup(&idm, index); |
| return fdi ? fdi->fd : index; |
| } |
| |
| static inline enum fd_fork_state fd_gets(int index) |
| { |
| struct fd_info *fdi; |
| |
| fdi = idm_lookup(&idm, index); |
| return fdi ? fdi->state : fd_ready; |
| } |
| |
| static inline enum fd_type fd_gett(int index) |
| { |
| struct fd_info *fdi; |
| |
| fdi = idm_lookup(&idm, index); |
| return fdi ? fdi->type : fd_normal; |
| } |
| |
| static enum fd_type fd_close(int index, int *fd) |
| { |
| struct fd_info *fdi; |
| enum fd_type type; |
| |
| fdi = idm_lookup(&idm, index); |
| if (fdi) { |
| idm_clear(&idm, index); |
| *fd = fdi->fd; |
| type = fdi->type; |
| real.close(index); |
| free(fdi); |
| } else { |
| *fd = index; |
| type = fd_normal; |
| } |
| return type; |
| } |
| |
| static void getenv_options(void) |
| { |
| char *var; |
| |
| var = getenv("RS_SQ_SIZE"); |
| if (var) |
| sq_size = atoi(var); |
| |
| var = getenv("RS_RQ_SIZE"); |
| if (var) |
| rq_size = atoi(var); |
| |
| var = getenv("RS_INLINE"); |
| if (var) |
| sq_inline = atoi(var); |
| |
| var = getenv("RDMAV_FORK_SAFE"); |
| if (var) |
| fork_support = atoi(var); |
| } |
| |
| static void init_preload(void) |
| { |
| static int init; |
| |
| |
| if (init) |
| return; |
| |
| pthread_mutex_lock(&mut); |
| if (init) |
| goto out; |
| |
| real.socket = dlsym(RTLD_NEXT, "socket"); |
| real.bind = dlsym(RTLD_NEXT, "bind"); |
| real.listen = dlsym(RTLD_NEXT, "listen"); |
| real.accept = dlsym(RTLD_NEXT, "accept"); |
| real.connect = dlsym(RTLD_NEXT, "connect"); |
| real.recv = dlsym(RTLD_NEXT, "recv"); |
| real.recvfrom = dlsym(RTLD_NEXT, "recvfrom"); |
| real.recvmsg = dlsym(RTLD_NEXT, "recvmsg"); |
| real.read = dlsym(RTLD_NEXT, "read"); |
| real.readv = dlsym(RTLD_NEXT, "readv"); |
| real.send = dlsym(RTLD_NEXT, "send"); |
| real.sendto = dlsym(RTLD_NEXT, "sendto"); |
| real.sendmsg = dlsym(RTLD_NEXT, "sendmsg"); |
| real.write = dlsym(RTLD_NEXT, "write"); |
| real.writev = dlsym(RTLD_NEXT, "writev"); |
| real.poll = dlsym(RTLD_NEXT, "poll"); |
| real.shutdown = dlsym(RTLD_NEXT, "shutdown"); |
| real.close = dlsym(RTLD_NEXT, "close"); |
| real.getpeername = dlsym(RTLD_NEXT, "getpeername"); |
| real.getsockname = dlsym(RTLD_NEXT, "getsockname"); |
| real.setsockopt = dlsym(RTLD_NEXT, "setsockopt"); |
| real.getsockopt = dlsym(RTLD_NEXT, "getsockopt"); |
| real.fcntl = dlsym(RTLD_NEXT, "fcntl"); |
| real.dup2 = dlsym(RTLD_NEXT, "dup2"); |
| real.sendfile = dlsym(RTLD_NEXT, "sendfile"); |
| real.fxstat = dlsym(RTLD_NEXT, "__fxstat"); |
| |
| rs.socket = dlsym(RTLD_DEFAULT, "rsocket"); |
| rs.bind = dlsym(RTLD_DEFAULT, "rbind"); |
| rs.listen = dlsym(RTLD_DEFAULT, "rlisten"); |
| rs.accept = dlsym(RTLD_DEFAULT, "raccept"); |
| rs.connect = dlsym(RTLD_DEFAULT, "rconnect"); |
| rs.recv = dlsym(RTLD_DEFAULT, "rrecv"); |
| rs.recvfrom = dlsym(RTLD_DEFAULT, "rrecvfrom"); |
| rs.recvmsg = dlsym(RTLD_DEFAULT, "rrecvmsg"); |
| rs.read = dlsym(RTLD_DEFAULT, "rread"); |
| rs.readv = dlsym(RTLD_DEFAULT, "rreadv"); |
| rs.send = dlsym(RTLD_DEFAULT, "rsend"); |
| rs.sendto = dlsym(RTLD_DEFAULT, "rsendto"); |
| rs.sendmsg = dlsym(RTLD_DEFAULT, "rsendmsg"); |
| rs.write = dlsym(RTLD_DEFAULT, "rwrite"); |
| rs.writev = dlsym(RTLD_DEFAULT, "rwritev"); |
| rs.poll = dlsym(RTLD_DEFAULT, "rpoll"); |
| rs.shutdown = dlsym(RTLD_DEFAULT, "rshutdown"); |
| rs.close = dlsym(RTLD_DEFAULT, "rclose"); |
| rs.getpeername = dlsym(RTLD_DEFAULT, "rgetpeername"); |
| rs.getsockname = dlsym(RTLD_DEFAULT, "rgetsockname"); |
| rs.setsockopt = dlsym(RTLD_DEFAULT, "rsetsockopt"); |
| rs.getsockopt = dlsym(RTLD_DEFAULT, "rgetsockopt"); |
| rs.fcntl = dlsym(RTLD_DEFAULT, "rfcntl"); |
| |
| getenv_options(); |
| scan_config(); |
| init = 1; |
| out: |
| pthread_mutex_unlock(&mut); |
| } |
| |
| |
| |
| |
| static int copysockopts(int dfd, int sfd, struct socket_calls *dapi, |
| struct socket_calls *sapi) |
| { |
| socklen_t len; |
| int param, ret; |
| |
| ret = sapi->fcntl(sfd, F_GETFL); |
| if (ret > 0) |
| ret = dapi->fcntl(dfd, F_SETFL, ret); |
| if (ret) |
| return ret; |
| |
| len = sizeof param; |
| ret = sapi->getsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, ¶m, &len); |
| if (param && !ret) |
| ret = dapi->setsockopt(dfd, SOL_SOCKET, SO_REUSEADDR, ¶m, len); |
| if (ret) |
| return ret; |
| |
| len = sizeof param; |
| ret = sapi->getsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, ¶m, &len); |
| if (param && !ret) |
| ret = dapi->setsockopt(dfd, IPPROTO_TCP, TCP_NODELAY, ¶m, len); |
| if (ret) |
| return ret; |
| |
| return 0; |
| } |
| |
| |
| |
| |
| static int transpose_socket(int socket, enum fd_type new_type) |
| { |
| socklen_t len = 0; |
| int sfd, dfd, param, ret; |
| struct socket_calls *sapi, *dapi; |
| |
| sfd = fd_getd(socket); |
| if (new_type == fd_rsocket) { |
| dapi = &rs; |
| sapi = ℜ |
| } else { |
| dapi = ℜ |
| sapi = &rs; |
| } |
| |
| ret = sapi->getsockname(sfd, NULL, &len); |
| if (ret) |
| return ret; |
| |
| param = (len == sizeof(struct sockaddr_in6)) ? PF_INET6 : PF_INET; |
| dfd = dapi->socket(param, SOCK_STREAM, 0); |
| if (dfd < 0) |
| return dfd; |
| |
| ret = copysockopts(dfd, sfd, dapi, sapi); |
| if (ret) |
| goto err; |
| |
| fd_store(socket, dfd, new_type, fd_ready); |
| return dfd; |
| |
| err: |
| dapi->close(dfd); |
| return ret; |
| } |
| |
| |
| |
| |
| static void set_rsocket_options(int rsocket) |
| { |
| if (sq_size) |
| rsetsockopt(rsocket, SOL_RDMA, RDMA_SQSIZE, &sq_size, sizeof sq_size); |
| |
| if (rq_size) |
| rsetsockopt(rsocket, SOL_RDMA, RDMA_RQSIZE, &rq_size, sizeof rq_size); |
| |
| if (sq_inline) |
| rsetsockopt(rsocket, SOL_RDMA, RDMA_INLINE, &sq_inline, sizeof sq_inline); |
| } |
| |
| int socket(int domain, int type, int protocol) |
| { |
| static __thread int recursive; |
| int index, ret; |
| |
| init_preload(); |
| |
| if (recursive || !intercept_socket(domain, type, protocol)) |
| goto real; |
| |
| index = fd_open(); |
| if (index < 0) |
| return index; |
| |
| if (fork_support && (domain == PF_INET || domain == PF_INET6) && |
| (type == SOCK_STREAM) && (!protocol || protocol == IPPROTO_TCP)) { |
| ret = real.socket(domain, type, protocol); |
| if (ret < 0) |
| return ret; |
| fd_store(index, ret, fd_normal, fd_fork); |
| return index; |
| } |
| |
| recursive = 1; |
| ret = rsocket(domain, type, protocol); |
| recursive = 0; |
| if (ret >= 0) { |
| fd_store(index, ret, fd_rsocket, fd_ready); |
| set_rsocket_options(ret); |
| return index; |
| } |
| fd_close(index, &ret); |
| real: |
| return real.socket(domain, type, protocol); |
| } |
| |
| int bind(int socket, const struct sockaddr *addr, socklen_t addrlen) |
| { |
| int fd; |
| return (fd_get(socket, &fd) == fd_rsocket) ? |
| rbind(fd, addr, addrlen) : real.bind(fd, addr, addrlen); |
| } |
| |
| int listen(int socket, int backlog) |
| { |
| int fd, ret; |
| if (fd_get(socket, &fd) == fd_rsocket) { |
| ret = rlisten(fd, backlog); |
| } else { |
| ret = real.listen(fd, backlog); |
| if (!ret && fd_gets(socket) == fd_fork) |
| fd_store(socket, fd, fd_normal, fd_fork_listen); |
| } |
| return ret; |
| } |
| |
| int accept(int socket, struct sockaddr *addr, socklen_t *addrlen) |
| { |
| int fd, index, ret; |
| |
| if (fd_get(socket, &fd) == fd_rsocket) { |
| index = fd_open(); |
| if (index < 0) |
| return index; |
| |
| ret = raccept(fd, addr, addrlen); |
| if (ret < 0) { |
| fd_close(index, &fd); |
| return ret; |
| } |
| |
| fd_store(index, ret, fd_rsocket, fd_ready); |
| return index; |
| } else if (fd_gets(socket) == fd_fork_listen) { |
| index = fd_open(); |
| if (index < 0) |
| return index; |
| |
| ret = real.accept(fd, addr, addrlen); |
| if (ret < 0) { |
| fd_close(index, &fd); |
| return ret; |
| } |
| |
| fd_store(index, ret, fd_normal, fd_fork_passive); |
| return index; |
| } else { |
| return real.accept(fd, addr, addrlen); |
| } |
| } |
| |
| |
| |
| |
| |
| |
| |
| static void fork_active(int socket) |
| { |
| struct sockaddr_storage addr; |
| int sfd, dfd, ret; |
| socklen_t len; |
| uint32_t msg; |
| long flags; |
| |
| sfd = fd_getd(socket); |
| |
| flags = real.fcntl(sfd, F_GETFL); |
| real.fcntl(sfd, F_SETFL, 0); |
| ret = real.recv(sfd, &msg, sizeof msg, MSG_PEEK); |
| real.fcntl(sfd, F_SETFL, flags); |
| if ((ret != sizeof msg) || msg) |
| goto err1; |
| |
| len = sizeof addr; |
| ret = real.getpeername(sfd, (struct sockaddr *) &addr, &len); |
| if (ret) |
| goto err1; |
| |
| dfd = rsocket(addr.ss_family, SOCK_STREAM, 0); |
| if (dfd < 0) |
| goto err1; |
| |
| ret = rconnect(dfd, (struct sockaddr *) &addr, len); |
| if (ret) |
| goto err2; |
| |
| set_rsocket_options(dfd); |
| copysockopts(dfd, sfd, &rs, &real); |
| real.shutdown(sfd, SHUT_RDWR); |
| real.close(sfd); |
| fd_store(socket, dfd, fd_rsocket, fd_ready); |
| return; |
| |
| err2: |
| rclose(dfd); |
| err1: |
| fd_store(socket, sfd, fd_normal, fd_ready); |
| } |
| |
| |
| |
| |
| |
| |
| |
| static void fork_passive(int socket) |
| { |
| struct sockaddr_in6 sin6; |
| sem_t *sem; |
| int lfd, sfd, dfd, ret, param; |
| socklen_t len; |
| uint32_t msg; |
| |
| sfd = fd_getd(socket); |
| |
| len = sizeof sin6; |
| ret = real.getsockname(sfd, (struct sockaddr *) &sin6, &len); |
| if (ret) |
| goto out; |
| sin6.sin6_flowinfo = 0; |
| sin6.sin6_scope_id = 0; |
| memset(&sin6.sin6_addr, 0, sizeof sin6.sin6_addr); |
| |
| sem = sem_open("/rsocket_fork", O_CREAT | O_RDWR, |
| S_IRWXU | S_IRWXG, 1); |
| if (sem == SEM_FAILED) { |
| ret = -1; |
| goto out; |
| } |
| |
| lfd = rsocket(sin6.sin6_family, SOCK_STREAM, 0); |
| if (lfd < 0) { |
| ret = lfd; |
| goto sclose; |
| } |
| |
| param = 1; |
| rsetsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, ¶m, sizeof param); |
| |
| sem_wait(sem); |
| ret = rbind(lfd, (struct sockaddr *) &sin6, sizeof sin6); |
| if (ret) |
| goto lclose; |
| |
| ret = rlisten(lfd, 1); |
| if (ret) |
| goto lclose; |
| |
| msg = 0; |
| len = real.write(sfd, &msg, sizeof msg); |
| if (len != sizeof msg) |
| goto lclose; |
| |
| dfd = raccept(lfd, NULL, NULL); |
| if (dfd < 0) { |
| ret = dfd; |
| goto lclose; |
| } |
| |
| set_rsocket_options(dfd); |
| copysockopts(dfd, sfd, &rs, &real); |
| real.shutdown(sfd, SHUT_RDWR); |
| real.close(sfd); |
| fd_store(socket, dfd, fd_rsocket, fd_ready); |
| |
| lclose: |
| rclose(lfd); |
| sem_post(sem); |
| sclose: |
| sem_close(sem); |
| out: |
| if (ret) |
| fd_store(socket, sfd, fd_normal, fd_ready); |
| } |
| |
| static inline enum fd_type fd_fork_get(int index, int *fd) |
| { |
| struct fd_info *fdi; |
| |
| fdi = idm_lookup(&idm, index); |
| if (fdi) { |
| if (fdi->state == fd_fork_passive) |
| fork_passive(index); |
| else if (fdi->state == fd_fork_active) |
| fork_active(index); |
| *fd = fdi->fd; |
| return fdi->type; |
| |
| } else { |
| *fd = index; |
| return fd_normal; |
| } |
| } |
| |
| int connect(int socket, const struct sockaddr *addr, socklen_t addrlen) |
| { |
| int fd, ret; |
| |
| if (fd_get(socket, &fd) == fd_rsocket) { |
| ret = rconnect(fd, addr, addrlen); |
| if (!ret || errno == EINPROGRESS) |
| return ret; |
| |
| ret = transpose_socket(socket, fd_normal); |
| if (ret < 0) |
| return ret; |
| |
| rclose(fd); |
| fd = ret; |
| } else if (fd_gets(socket) == fd_fork) { |
| fd_store(socket, fd, fd_normal, fd_fork_active); |
| } |
| |
| return real.connect(fd, addr, addrlen); |
| } |
| |
| ssize_t recv(int socket, void *buf, size_t len, int flags) |
| { |
| int fd; |
| return (fd_fork_get(socket, &fd) == fd_rsocket) ? |
| rrecv(fd, buf, len, flags) : real.recv(fd, buf, len, flags); |
| } |
| |
| ssize_t recvfrom(int socket, void *buf, size_t len, int flags, |
| struct sockaddr *src_addr, socklen_t *addrlen) |
| { |
| int fd; |
| return (fd_fork_get(socket, &fd) == fd_rsocket) ? |
| rrecvfrom(fd, buf, len, flags, src_addr, addrlen) : |
| real.recvfrom(fd, buf, len, flags, src_addr, addrlen); |
| } |
| |
| ssize_t recvmsg(int socket, struct msghdr *msg, int flags) |
| { |
| int fd; |
| return (fd_fork_get(socket, &fd) == fd_rsocket) ? |
| rrecvmsg(fd, msg, flags) : real.recvmsg(fd, msg, flags); |
| } |
| |
| ssize_t read(int socket, void *buf, size_t count) |
| { |
| int fd; |
| init_preload(); |
| return (fd_fork_get(socket, &fd) == fd_rsocket) ? |
| rread(fd, buf, count) : real.read(fd, buf, count); |
| } |
| |
| ssize_t readv(int socket, const struct iovec *iov, int iovcnt) |
| { |
| int fd; |
| init_preload(); |
| return (fd_fork_get(socket, &fd) == fd_rsocket) ? |
| rreadv(fd, iov, iovcnt) : real.readv(fd, iov, iovcnt); |
| } |
| |
| ssize_t send(int socket, const void *buf, size_t len, int flags) |
| { |
| int fd; |
| return (fd_fork_get(socket, &fd) == fd_rsocket) ? |
| rsend(fd, buf, len, flags) : real.send(fd, buf, len, flags); |
| } |
| |
| ssize_t sendto(int socket, const void *buf, size_t len, int flags, |
| const struct sockaddr *dest_addr, socklen_t addrlen) |
| { |
| int fd; |
| return (fd_fork_get(socket, &fd) == fd_rsocket) ? |
| rsendto(fd, buf, len, flags, dest_addr, addrlen) : |
| real.sendto(fd, buf, len, flags, dest_addr, addrlen); |
| } |
| |
| ssize_t sendmsg(int socket, const struct msghdr *msg, int flags) |
| { |
| int fd; |
| return (fd_fork_get(socket, &fd) == fd_rsocket) ? |
| rsendmsg(fd, msg, flags) : real.sendmsg(fd, msg, flags); |
| } |
| |
| ssize_t write(int socket, const void *buf, size_t count) |
| { |
| int fd; |
| init_preload(); |
| return (fd_fork_get(socket, &fd) == fd_rsocket) ? |
| rwrite(fd, buf, count) : real.write(fd, buf, count); |
| } |
| |
| ssize_t writev(int socket, const struct iovec *iov, int iovcnt) |
| { |
| int fd; |
| init_preload(); |
| return (fd_fork_get(socket, &fd) == fd_rsocket) ? |
| rwritev(fd, iov, iovcnt) : real.writev(fd, iov, iovcnt); |
| } |
| |
| static struct pollfd *fds_alloc(nfds_t nfds) |
| { |
| static __thread struct pollfd *rfds; |
| static __thread nfds_t rnfds; |
| |
| if (nfds > rnfds) { |
| if (rfds) |
| free(rfds); |
| |
| rfds = malloc(sizeof(*rfds) * nfds); |
| rnfds = rfds ? nfds : 0; |
| } |
| |
| return rfds; |
| } |
| |
| int poll(struct pollfd *fds, nfds_t nfds, int timeout) |
| { |
| struct pollfd *rfds; |
| int i, ret; |
| |
| init_preload(); |
| for (i = 0; i < nfds; i++) { |
| if (fd_gett(fds[i].fd) == fd_rsocket) |
| goto use_rpoll; |
| } |
| |
| return real.poll(fds, nfds, timeout); |
| |
| use_rpoll: |
| rfds = fds_alloc(nfds); |
| if (!rfds) |
| return ERR(ENOMEM); |
| |
| for (i = 0; i < nfds; i++) { |
| rfds[i].fd = fd_getd(fds[i].fd); |
| rfds[i].events = fds[i].events; |
| rfds[i].revents = 0; |
| } |
| |
| ret = rpoll(rfds, nfds, timeout); |
| |
| for (i = 0; i < nfds; i++) |
| fds[i].revents = rfds[i].revents; |
| |
| return ret; |
| } |
| |
| static void select_to_rpoll(struct pollfd *fds, int *nfds, |
| fd_set *readfds, fd_set *writefds, fd_set *exceptfds) |
| { |
| int fd, events, i = 0; |
| |
| for (fd = 0; fd < *nfds; fd++) { |
| events = (readfds && FD_ISSET(fd, readfds)) ? POLLIN : 0; |
| if (writefds && FD_ISSET(fd, writefds)) |
| events |= POLLOUT; |
| |
| if (events || (exceptfds && FD_ISSET(fd, exceptfds))) { |
| fds[i].fd = fd_getd(fd); |
| fds[i++].events = events; |
| } |
| } |
| |
| *nfds = i; |
| } |
| |
| static int rpoll_to_select(struct pollfd *fds, int nfds, |
| fd_set *readfds, fd_set *writefds, fd_set *exceptfds) |
| { |
| int fd, rfd, i, cnt = 0; |
| |
| for (i = 0, fd = 0; i < nfds; fd++) { |
| rfd = fd_getd(fd); |
| if (rfd != fds[i].fd) |
| continue; |
| |
| if (readfds && (fds[i].revents & POLLIN)) { |
| FD_SET(fd, readfds); |
| cnt++; |
| } |
| |
| if (writefds && (fds[i].revents & POLLOUT)) { |
| FD_SET(fd, writefds); |
| cnt++; |
| } |
| |
| if (exceptfds && (fds[i].revents & ~(POLLIN | POLLOUT))) { |
| FD_SET(fd, exceptfds); |
| cnt++; |
| } |
| i++; |
| } |
| |
| return cnt; |
| } |
| |
| static int rs_convert_timeout(struct timeval *timeout) |
| { |
| return !timeout ? -1 : timeout->tv_sec * 1000 + timeout->tv_usec / 1000; |
| } |
| |
| int select(int nfds, fd_set *readfds, fd_set *writefds, |
| fd_set *exceptfds, struct timeval *timeout) |
| { |
| struct pollfd *fds; |
| int ret; |
| |
| fds = fds_alloc(nfds); |
| if (!fds) |
| return ERR(ENOMEM); |
| |
| select_to_rpoll(fds, &nfds, readfds, writefds, exceptfds); |
| ret = rpoll(fds, nfds, rs_convert_timeout(timeout)); |
| |
| if (readfds) |
| FD_ZERO(readfds); |
| if (writefds) |
| FD_ZERO(writefds); |
| if (exceptfds) |
| FD_ZERO(exceptfds); |
| |
| if (ret > 0) |
| ret = rpoll_to_select(fds, nfds, readfds, writefds, exceptfds); |
| |
| return ret; |
| } |
| |
| int shutdown(int socket, int how) |
| { |
| int fd; |
| return (fd_get(socket, &fd) == fd_rsocket) ? |
| rshutdown(fd, how) : real.shutdown(fd, how); |
| } |
| |
| int close(int socket) |
| { |
| struct fd_info *fdi; |
| int ret; |
| |
| init_preload(); |
| fdi = idm_lookup(&idm, socket); |
| if (!fdi) |
| return real.close(socket); |
| |
| if (fdi->dupfd != -1) { |
| ret = close(fdi->dupfd); |
| if (ret) |
| return ret; |
| } |
| |
| if (atomic_fetch_sub(&fdi->refcnt, 1) != 1) |
| return 0; |
| |
| idm_clear(&idm, socket); |
| real.close(socket); |
| ret = (fdi->type == fd_rsocket) ? rclose(fdi->fd) : real.close(fdi->fd); |
| free(fdi); |
| return ret; |
| } |
| |
| int getpeername(int socket, struct sockaddr *addr, socklen_t *addrlen) |
| { |
| int fd; |
| return (fd_get(socket, &fd) == fd_rsocket) ? |
| rgetpeername(fd, addr, addrlen) : |
| real.getpeername(fd, addr, addrlen); |
| } |
| |
| int getsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) |
| { |
| int fd; |
| init_preload(); |
| return (fd_get(socket, &fd) == fd_rsocket) ? |
| rgetsockname(fd, addr, addrlen) : |
| real.getsockname(fd, addr, addrlen); |
| } |
| |
| int setsockopt(int socket, int level, int optname, |
| const void *optval, socklen_t optlen) |
| { |
| int fd; |
| return (fd_get(socket, &fd) == fd_rsocket) ? |
| rsetsockopt(fd, level, optname, optval, optlen) : |
| real.setsockopt(fd, level, optname, optval, optlen); |
| } |
| |
| int getsockopt(int socket, int level, int optname, |
| void *optval, socklen_t *optlen) |
| { |
| int fd; |
| return (fd_get(socket, &fd) == fd_rsocket) ? |
| rgetsockopt(fd, level, optname, optval, optlen) : |
| real.getsockopt(fd, level, optname, optval, optlen); |
| } |
| |
| int fcntl(int socket, int cmd, ... ) |
| { |
| va_list args; |
| long lparam; |
| void *pparam; |
| int fd, ret; |
| |
| init_preload(); |
| va_start(args, cmd); |
| switch (cmd) { |
| case F_GETFD: |
| case F_GETFL: |
| case F_GETOWN: |
| case F_GETSIG: |
| case F_GETLEASE: |
| ret = (fd_get(socket, &fd) == fd_rsocket) ? |
| rfcntl(fd, cmd) : real.fcntl(fd, cmd); |
| break; |
| case F_DUPFD: |
| |
| case F_SETFD: |
| case F_SETFL: |
| case F_SETOWN: |
| case F_SETSIG: |
| case F_SETLEASE: |
| case F_NOTIFY: |
| lparam = va_arg(args, long); |
| ret = (fd_get(socket, &fd) == fd_rsocket) ? |
| rfcntl(fd, cmd, lparam) : real.fcntl(fd, cmd, lparam); |
| break; |
| default: |
| pparam = va_arg(args, void *); |
| ret = (fd_get(socket, &fd) == fd_rsocket) ? |
| rfcntl(fd, cmd, pparam) : real.fcntl(fd, cmd, pparam); |
| break; |
| } |
| va_end(args); |
| return ret; |
| } |
| |
| |
| |
| |
| int dup2(int oldfd, int newfd) |
| { |
| struct fd_info *oldfdi, *newfdi; |
| int ret; |
| |
| init_preload(); |
| oldfdi = idm_lookup(&idm, oldfd); |
| if (oldfdi) { |
| if (oldfdi->state == fd_fork_passive) |
| fork_passive(oldfd); |
| else if (oldfdi->state == fd_fork_active) |
| fork_active(oldfd); |
| } |
| |
| newfdi = idm_lookup(&idm, newfd); |
| if (newfdi) { |
| |
| if (atomic_load(&newfdi->refcnt) > 1) |
| return ERR(EBUSY); |
| close(newfd); |
| } |
| |
| ret = real.dup2(oldfd, newfd); |
| if (!oldfdi || ret != newfd) |
| return ret; |
| |
| newfdi = calloc(1, sizeof(*newfdi)); |
| if (!newfdi) { |
| close(newfd); |
| return ERR(ENOMEM); |
| } |
| |
| pthread_mutex_lock(&mut); |
| idm_set(&idm, newfd, newfdi); |
| pthread_mutex_unlock(&mut); |
| |
| newfdi->fd = oldfdi->fd; |
| newfdi->type = oldfdi->type; |
| if (oldfdi->dupfd != -1) { |
| newfdi->dupfd = oldfdi->dupfd; |
| oldfdi = idm_lookup(&idm, oldfdi->dupfd); |
| } else { |
| newfdi->dupfd = oldfd; |
| } |
| atomic_store(&newfdi->refcnt, 1); |
| atomic_fetch_add(&oldfdi->refcnt, 1); |
| return newfd; |
| } |
| |
| ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count) |
| { |
| void *file_addr; |
| int fd; |
| size_t ret; |
| |
| if (fd_get(out_fd, &fd) != fd_rsocket) |
| return real.sendfile(fd, in_fd, offset, count); |
| |
| file_addr = mmap(NULL, count, PROT_READ, 0, in_fd, offset ? *offset : 0); |
| if (file_addr == (void *) -1) |
| return -1; |
| |
| ret = rwrite(fd, file_addr, count); |
| if ((ret > 0) && offset) |
| lseek(in_fd, ret, SEEK_CUR); |
| munmap(file_addr, count); |
| return ret; |
| } |
| |
| int __fxstat(int ver, int socket, struct stat *buf) |
| { |
| int fd, ret; |
| |
| init_preload(); |
| if (fd_get(socket, &fd) == fd_rsocket) { |
| ret = real.fxstat(ver, socket, buf); |
| if (!ret) |
| buf->st_mode = (buf->st_mode & ~S_IFMT) | S_IFSOCK; |
| } else { |
| ret = real.fxstat(ver, fd, buf); |
| } |
| return ret; |
| } |