From c887eabd45164e5810f80e2dbd57b144ebca1288 Mon Sep 17 00:00:00 2001 From: Isaac Jurado Date: Sat, 8 Nov 2014 18:30:57 +0100 Subject: [PATCH 1/4] Add support for listening to UNIX sockets Extend the semantics of the "-l" command line option to support creating local sockets by using the "unix:" prefix, similarly to nginx. --- net.c | 316 ++++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 206 insertions(+), 110 deletions(-) diff --git a/net.c b/net.c index 958a106f..4621fa87 100644 --- a/net.c +++ b/net.c @@ -7,41 +7,35 @@ #include #include #include +#include +#include #include #include -int -make_server_socket(char *host, char *port) +static int +set_nonblocking(int fd) { - int fd = -1, flags, r; - struct linger linger = {0, 0}; - struct addrinfo *airoot, *ai, hints; + int flags, r; - /* See if we got a listen fd from systemd. If so, all socket options etc - * are already set, so we check that the fd is a TCP listen socket and - * return. */ - r = sd_listen_fds(1); - if (r < 0) { - return twarn("sd_listen_fds"), -1; + flags = fcntl(fd, F_GETFL, 0); + if (flags < 0) { + twarn("getting flags"); + return -1; } - if (r > 0) { - if (r > 1) { - twarnx("inherited more than one listen socket;" - " ignoring all but the first"); - } - fd = SD_LISTEN_FDS_START; - r = sd_is_socket_inet(fd, 0, SOCK_STREAM, 1, 0); - if (r < 0) { - errno = -r; - twarn("sd_is_socket_inet"); - return -1; - } - if (!r) { - twarnx("inherited fd is not a TCP listen socket"); - return -1; - } - return fd; + r = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + if (r == -1) { + twarn("setting O_NONBLOCK"); + return -1; } + return 0; +} + +static int +make_inet_socket(char *host, char *port) +{ + int fd = -1, flags, r; + struct linger linger = {0, 0}; + struct addrinfo *airoot, *ai, hints; memset(&hints, 0, sizeof(hints)); hints.ai_family = PF_UNSPEC; @@ -49,101 +43,203 @@ make_server_socket(char *host, char *port) hints.ai_flags = AI_PASSIVE; r = getaddrinfo(host, port, &hints, &airoot); if (r != 0) { - twarnx("getaddrinfo(): %s", gai_strerror(r)); - return -1; + twarnx("getaddrinfo(): %s", gai_strerror(r)); + return -1; } - for(ai = airoot; ai; ai = ai->ai_next) { - fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); - if (fd == -1) { - twarn("socket()"); - continue; - } + for (ai = airoot; ai; ai = ai->ai_next) { + fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if (fd == -1) { + twarn("socket()"); + continue; + } - flags = fcntl(fd, F_GETFL, 0); - if (flags < 0) { - twarn("getting flags"); - close(fd); - continue; - } + r = set_nonblocking(fd); + if (r == -1) { + close(fd); + continue; + } - r = fcntl(fd, F_SETFL, flags | O_NONBLOCK); - if (r == -1) { - twarn("setting O_NONBLOCK"); - close(fd); - continue; - } + flags = 1; + r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof flags); + if (r == -1) { + twarn("setting SO_REUSEADDR on fd %d", fd); + close(fd); + continue; + } + r = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof flags); + if (r == -1) { + twarn("setting SO_KEEPALIVE on fd %d", fd); + close(fd); + continue; + } + r = setsockopt(fd, SOL_SOCKET, SO_LINGER, &linger, sizeof linger); + if (r == -1) { + twarn("setting SO_LINGER on fd %d", fd); + close(fd); + continue; + } + r = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof flags); + if (r == -1) { + twarn("setting TCP_NODELAY on fd %d", fd); + close(fd); + continue; + } - flags = 1; - r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof flags); - if (r == -1) { - twarn("setting SO_REUSEADDR on fd %d", fd); - close(fd); - continue; - } - r = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof flags); - if (r == -1) { - twarn("setting SO_KEEPALIVE on fd %d", fd); - close(fd); - continue; - } - r = setsockopt(fd, SOL_SOCKET, SO_LINGER, &linger, sizeof linger); - if (r == -1) { - twarn("setting SO_LINGER on fd %d", fd); - close(fd); - continue; - } - r = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof flags); - if (r == -1) { - twarn("setting TCP_NODELAY on fd %d", fd); + r = bind(fd, ai->ai_addr, ai->ai_addrlen); + if (r == -1) { + twarn("bind()"); + close(fd); + continue; + } + if (verbose) { + char hbuf[NI_MAXHOST], pbuf[NI_MAXSERV], *h = host, *p = port; + struct sockaddr_in addr; + socklen_t addrlen; + + addrlen = sizeof(addr); + r = getsockname(fd, (struct sockaddr *) &addr, &addrlen); + if (!r) { + r = getnameinfo((struct sockaddr *) &addr, addrlen, + hbuf, sizeof(hbuf), + pbuf, sizeof(pbuf), + NI_NUMERICHOST|NI_NUMERICSERV); + if (!r) { + h = hbuf; + p = pbuf; + } + } + if (ai->ai_family == AF_INET6) { + printf("bind %d [%s]:%s\n", fd, h, p); + } else { + printf("bind %d %s:%s\n", fd, h, p); + } + } + + r = listen(fd, 1024); + if (r == -1) { + twarn("listen()"); + close(fd); + continue; + } + + break; + } + + freeaddrinfo(airoot); + + if(ai == NULL) + fd = -1; + + return fd; +} + +static int +make_unix_socket(char *path) +{ + int fd = -1, r; + struct stat st; + struct sockaddr_un addr; + const size_t maxlen = sizeof(addr.sun_path) - 1; // Reserve the last position for '\0' + + memset(&addr, 0, sizeof(struct sockaddr_un)); + addr.sun_family = AF_UNIX; + if (strlen(path) > maxlen) { + warnx("socket path %s is too long (%ld characters), where maximum allowed is %ld", + path, strlen(path), maxlen); + return -1; + } + strncpy(addr.sun_path, path, maxlen); + + r = stat(path, &st); + if (r == 0) { + if (S_ISSOCK(st.st_mode)) { + warnx("removing existing local socket to replace it"); + r = unlink(path); + if (r == -1) { + twarn("unlink"); + return -1; + } + } else { + twarnx("another file already exists in the given path"); + return -1; + } + } + + fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (fd == -1) { + twarn("socket()"); + return -1; + } + + r = set_nonblocking(fd); + if (r == -1) { close(fd); - continue; - } + return -1; + } - r = bind(fd, ai->ai_addr, ai->ai_addrlen); - if (r == -1) { + r = bind(fd, (struct sockaddr *) &addr, sizeof(struct sockaddr_un)); + if (r == -1) { twarn("bind()"); close(fd); - continue; - } - if (verbose) { - char hbuf[NI_MAXHOST], pbuf[NI_MAXSERV], *h = host, *p = port; - struct sockaddr_in addr; - socklen_t addrlen; - - addrlen = sizeof(addr); - r = getsockname(fd, (struct sockaddr *) &addr, &addrlen); - if (!r) { - r = getnameinfo((struct sockaddr *) &addr, addrlen, - hbuf, sizeof(hbuf), - pbuf, sizeof(pbuf), - NI_NUMERICHOST|NI_NUMERICSERV); - if (!r) { - h = hbuf; - p = pbuf; - } - } - if (ai->ai_family == AF_INET6) { - printf("bind %d [%s]:%s\n", fd, h, p); - } else { - printf("bind %d %s:%s\n", fd, h, p); - } - } - - r = listen(fd, 1024); - if (r == -1) { + return -1; + } + if (verbose) { + printf("bind %d %s\n", fd, path); + } + + r = listen(fd, 1024); + if (r == -1) { twarn("listen()"); close(fd); - continue; - } - - break; + return -1; } - freeaddrinfo(airoot); + return fd; +} - if(ai == NULL) - fd = -1; +int +make_server_socket(char *host, char *port) +{ + int fd = -1, r; - return fd; + /* See if we got a listen fd from systemd. If so, all socket options etc + * are already set, so we check that the fd is a TCP or UNIX listen socket + * and return. */ + r = sd_listen_fds(1); + if (r < 0) { + return twarn("sd_listen_fds"), -1; + } + if (r > 0) { + if (r > 1) { + twarnx("inherited more than one listen socket;" + " ignoring all but the first"); + } + fd = SD_LISTEN_FDS_START; + r = sd_is_socket_inet(fd, 0, SOCK_STREAM, 1, 0); + if (r < 0) { + twarn("sd_is_socket_inet"); + errno = -r; + return -1; + } + if (r == 0) { + r = sd_is_socket_unix(fd, SOCK_STREAM, 1, NULL, 0); + if (r < 0) { + twarn("sd_is_socket_unix"); + errno = -r; + return -1; + } + if (r == 0) { + twarnx("inherited fd is not a TCP or UNIX listening socket"); + return -1; + } + } + return fd; + } + + if (host && !strncmp(host, "unix:", 5)) { + return make_unix_socket(&host[5]); + } else { + return make_inet_socket(host, port); + } } From ff35633227ae72aba8e6913efa787ee8ee204c01 Mon Sep 17 00:00:00 2001 From: Isaac Jurado Date: Sat, 8 Nov 2014 18:51:44 +0100 Subject: [PATCH 2/4] Use a generic address storage for accept() Now that AF_UNIX socket family is supported, a more generic sockaddr variant can be used. This is not essential for local socket support, it is only a way to be more pedantic. --- prot.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/prot.c b/prot.c index e62f6268..2931bb2d 100644 --- a/prot.c +++ b/prot.c @@ -11,7 +11,6 @@ #include #include #include -#include #include #include #include @@ -2112,7 +2111,7 @@ void h_accept(const int fd, const short which, Server *s) { UNUSED_PARAMETER(which); - struct sockaddr_in6 addr; + struct sockaddr_storage addr; socklen_t addrlen = sizeof addr; int cfd = accept(fd, (struct sockaddr *)&addr, &addrlen); From ce9e1291a1850b3cabe1f288dc623eaf25daa300 Mon Sep 17 00:00:00 2001 From: Isaac Jurado Date: Sat, 8 Nov 2014 18:53:30 +0100 Subject: [PATCH 3/4] Update the man page to explain local socket support --- doc/beanstalkd.1 | 5 ++++- doc/beanstalkd.1.html | 6 +++++- doc/beanstalkd.ronn | 4 ++++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/doc/beanstalkd.1 b/doc/beanstalkd.1 index 43ff0f43..b344433e 100644 --- a/doc/beanstalkd.1 +++ b/doc/beanstalkd.1 @@ -1,7 +1,7 @@ .\" generated with Ronn/v0.7.3 .\" http://github.com/rtomayko/ronn/tree/0.7.3 . -.TH "BEANSTALKD" "1" "April 2012" "" "" +.TH "BEANSTALKD" "1" "August 2019" "" "" . .SH "NAME" \fBbeanstalkd\fR \- simple, fast work queue @@ -57,6 +57,9 @@ Show a brief help message and exit\. Listen on address \fIaddr\fR (default is 0\.0\.0\.0)\. . .IP +When \fIaddr\fR starts with "unix:", the unprefixed value of it will be used as the local filesystem path to create a UNIX socket instead of a TCP socket\. In this case the value of \fB\-p\fR will be ignored\. +. +.IP (Option \fB\-l\fR has no effect if sd\-daemon(5) socket activation is being used\. See also \fIENVIRONMENT\fR\.) . .TP diff --git a/doc/beanstalkd.1.html b/doc/beanstalkd.1.html index aba5e863..af0c2b1d 100644 --- a/doc/beanstalkd.1.html +++ b/doc/beanstalkd.1.html @@ -121,6 +121,10 @@

OPTIONS

-h

Show a brief help message and exit.

-l addr

Listen on address addr (default is 0.0.0.0).

+

When addr starts with "unix:", the unprefixed value of it will be +used as the local filesystem path to create a UNIX socket instead of +a TCP socket. In this case the value of -p will be ignored.

+

(Option -l has no effect if sd-daemon(5) socket activation is being used. See also ENVIRONMENT.)

-n

Turn off binlog compaction, negating -c.

@@ -167,7 +171,7 @@

AUTHOR

  1. -
  2. April 2012
  3. +
  4. August 2019
  5. beanstalkd(1)
diff --git a/doc/beanstalkd.ronn b/doc/beanstalkd.ronn index d5c42841..0efc35a2 100644 --- a/doc/beanstalkd.ronn +++ b/doc/beanstalkd.ronn @@ -59,6 +59,10 @@ and format of the `beanstalkd` protocol. * `-l` : Listen on address (default is 0.0.0.0). + When starts with "unix:", the unprefixed value of it will be + used as the local filesystem path to create a UNIX socket instead of + a TCP socket. In this case the value of `-p` will be ignored. + (Option `-l` has no effect if sd-daemon(5) socket activation is being used. See also [ENVIRONMENT][].) From 4225bc2bc5d8bd7fe2a6c2cd1ea61fb3cacfb791 Mon Sep 17 00:00:00 2001 From: Isaac Jurado Date: Sat, 10 Aug 2019 00:31:58 +0200 Subject: [PATCH 4/4] Add some tests over unix sockets. To increase Codecov coverage report, add a couple of tests that exercise the newly added code. --- testserv.c | 106 ++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 97 insertions(+), 9 deletions(-) diff --git a/testserv.c b/testserv.c index 35c40f56..2c905919 100644 --- a/testserv.c +++ b/testserv.c @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -30,6 +31,15 @@ static int64 timeout = 5000000000LL; static byte fallocpat[3]; +static int +exist(char *path) +{ + struct stat s; + + int r = stat(path, &s); + return r != -1; +} + static int wrapfalloc(int fd, int size) { @@ -100,6 +110,29 @@ mustdiallocal(int port) return fd; } +static int +mustdialunix(char *socket_file) +{ + struct sockaddr_un addr; + const size_t maxlen = sizeof(addr.sun_path); + addr.sun_family = AF_UNIX; + snprintf(addr.sun_path, maxlen, "%s", socket_file); + + int fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (fd == -1) { + twarn("socket"); + exit(1); + } + + int r = connect(fd, (struct sockaddr *)&addr, sizeof addr); + if (r == -1) { + twarn("connect"); + exit(1); + } + + return fd; +} + static void exit_process(int signum) { @@ -142,6 +175,7 @@ kill_srvpid(void) } #define SERVER() (progname=__func__, mustforksrv()) +#define SERVER_UNIX() (progname=__func__, mustforksrv_unix()) // Forks the server storing the pid in srvpid. // The parent process returns port assigned. @@ -189,6 +223,44 @@ mustforksrv(void) exit(1); /* satisfy the compiler */ } +static char * +mustforksrv_unix(void) +{ + static char path[90]; + char name[95]; + snprintf(path, sizeof(path), "%s/socket", ctdir()); + snprintf(name, sizeof(name), "unix:%s", path); + srv.sock.fd = make_server_socket(name, NULL); + if (srv.sock.fd == -1) { + puts("mustforksrv_unix failed"); + exit(1); + } + + srvpid = fork(); + if (srvpid < 0) { + twarn("fork"); + exit(1); + } + + if (srvpid > 0) { + // On exit the parent (test) sends SIGTERM to the child. + atexit(kill_srvpid); + printf("start server socket=%s\n", path); + assert(exist(path)); + return path; + } + + /* now in child */ + + set_sig_handler(); + prot_init(); + + srv_acquire_wal(&srv); + + srvserve(&srv); /* does not return */ + exit(1); /* satisfy the compiler */ +} + static char * readline(int fd) { @@ -293,15 +365,6 @@ filesize(char *path) return s.st_size; } -static int -exist(char *path) -{ - struct stat s; - - int r = stat(path, &s); - return r != -1; -} - void cttest_unknown_command() { @@ -363,6 +426,31 @@ cttest_peek_not_found() ckresp(fd, "NOT_FOUND\r\n"); } +void +cttest_peek_ok_unix() +{ + char *name = SERVER_UNIX(); + int fd = mustdialunix(name); + mustsend(fd, "put 0 0 1 1\r\n"); + mustsend(fd, "a\r\n"); + ckresp(fd, "INSERTED 1\r\n"); + + mustsend(fd, "peek 1\r\n"); + ckresp(fd, "FOUND 1 1\r\n"); + ckresp(fd, "a\r\n"); + + unlink(name); +} + +void +cttest_unix_auto_removal() +{ + // Twice, to trigger autoremoval + SERVER_UNIX(); + kill_srvpid(); + SERVER_UNIX(); +} + void cttest_peek_bad_format() {