diff options
author | Maria Matejka <mq@ucw.cz> | 2023-04-02 19:15:22 +0200 |
---|---|---|
committer | Maria Matejka <mq@ucw.cz> | 2023-04-04 17:00:59 +0200 |
commit | 836e857b3098f8962c621a33f7ae5b532cf512f3 (patch) | |
tree | 96c62361fd7b2dd57cecdf9d9b9323f8aa95f021 /sysdep/unix/io.c | |
parent | 571c4f69bfbcf437d848b332bb2f4995fea2347d (diff) |
Sockets: Unified API for main and other loops
Now sk_open() requires an explicit IO loop to open the socket in. Also
specific functions for socket RX pause / resume are added to allow for
BGP corking.
And last but not least, socket reloop is now synchronous to resolve
weird cases of the target loop stopping before actually picking up the
relooped socket. Now the caller must ensure that both loops are locked
while relooping, and this way all sockets always have their respective
loop.
Diffstat (limited to 'sysdep/unix/io.c')
-rw-r--r-- | sysdep/unix/io.c | 155 |
1 files changed, 57 insertions, 98 deletions
diff --git a/sysdep/unix/io.c b/sysdep/unix/io.c index 06797096..88d187a4 100644 --- a/sysdep/unix/io.c +++ b/sysdep/unix/io.c @@ -722,10 +722,7 @@ sk_log_error(sock *s, const char *p) * Actual struct birdsock code */ -static struct birdsock *current_sock; -static struct birdsock *stored_sock; - -static inline sock * +sock * sk_next(sock *s) { if (!s->n.next->next) @@ -787,6 +784,7 @@ sk_ssh_free(sock *s) } #endif + static void sk_free(resource *r) { @@ -799,18 +797,10 @@ sk_free(resource *r) sk_ssh_free(s); #endif - if ((s->fd < 0) || (s->flags & SKF_THREAD)) - return; - - if (s == current_sock) - current_sock = sk_next(s); - if (s == stored_sock) - stored_sock = sk_next(s); - - if (enlisted(&s->n)) - rem_node(&s->n); + if (s->loop) + birdloop_remove_socket(s->loop, s); - if (s->type != SK_SSH && s->type != SK_SSH_ACTIVE) + if (s->fd >= 0 && s->type != SK_SSH && s->type != SK_SSH_ACTIVE) close(s->fd); s->fd = -1; @@ -1023,12 +1013,6 @@ sk_setup(sock *s) } static void -sk_insert(sock *s) -{ - add_tail(&main_birdloop.sock_list, &s->n); -} - -static void sk_tcp_connected(sock *s) { sockaddr sa; @@ -1101,10 +1085,7 @@ sk_passive_connected(sock *s, int type) return 1; } - if (s->flags & SKF_PASSIVE_THREAD) - t->flags |= SKF_THREAD; - else - sk_insert(t); + birdloop_add_socket(s->loop, t); sk_alloc_bufs(t); s->rx_hook(t, 0); @@ -1319,6 +1300,7 @@ sk_open_ssh(sock *s) /** * sk_open - open a socket + * @loop: loop * @s: socket * * This function takes a socket resource created by sk_new() and @@ -1328,7 +1310,7 @@ sk_open_ssh(sock *s) * Result: 0 for success, -1 for an error. */ int -sk_open(sock *s) +sk_open(sock *s, struct birdloop *loop) { int af = AF_UNSPEC; int fd = -1; @@ -1481,9 +1463,7 @@ sk_open(sock *s) sk_alloc_bufs(s); } - if (!(s->flags & SKF_THREAD)) - sk_insert(s); - + birdloop_add_socket(loop, s); return 0; err: @@ -1493,7 +1473,7 @@ err: } int -sk_open_unix(sock *s, char *name) +sk_open_unix(sock *s, struct birdloop *loop, char *name) { struct sockaddr_un sa; int fd; @@ -1520,40 +1500,10 @@ sk_open_unix(sock *s, char *name) return -1; s->fd = fd; - sk_insert(s); + birdloop_add_socket(loop, s); return 0; } -static void -sk_reloop_hook(void *_vs) -{ - sock *s = _vs; - if (birdloop_inside(&main_birdloop)) - { - s->flags &= ~SKF_THREAD; - sk_insert(s); - } - else - { - s->flags |= SKF_THREAD; - sk_start(s); - } -} - -void -sk_reloop(sock *s, struct birdloop *loop) -{ - if (enlisted(&s->n)) - rem_node(&s->n); - - s->reloop = (event) { - .hook = sk_reloop_hook, - .data = s, - }; - - ev_send_loop(loop, &s->reloop); -} - #define CMSG_RX_SPACE MAX(CMSG4_SPACE_PKTINFO+CMSG4_SPACE_TTL, \ CMSG6_SPACE_PKTINFO+CMSG6_SPACE_TTL) @@ -1676,6 +1626,13 @@ sk_recvmsg(sock *s) static inline void reset_tx_buffer(sock *s) { s->ttx = s->tpos = s->tbuf; } +_Bool +sk_tx_pending(sock *s) +{ + return s->ttx != s->tpos; +} + + static int sk_maybe_write(sock *s) { @@ -1686,7 +1643,7 @@ sk_maybe_write(sock *s) case SK_TCP: case SK_MAGIC: case SK_UNIX: - while (s->ttx != s->tpos) + while (sk_tx_pending(s)) { e = write(s->fd, s->ttx, s->tpos - s->ttx); @@ -1708,7 +1665,7 @@ sk_maybe_write(sock *s) #ifdef HAVE_LIBSSH case SK_SSH: - while (s->ttx != s->tpos) + while (sk_tx_pending(s)) { e = ssh_channel_write(s->ssh->channel, s->ttx, s->tpos - s->ttx); @@ -1791,7 +1748,12 @@ sk_send(sock *s, unsigned len) { s->ttx = s->tbuf; s->tpos = s->tbuf + len; - return sk_maybe_write(s); + + int e = sk_maybe_write(s); + if (e == 0) /* Trigger thread poll reload to poll this socket's write. */ + socket_changed(s); + + return e; } /** @@ -1838,7 +1800,7 @@ call_rx_hook(sock *s, int size) if (s->rx_hook(s, size)) { /* We need to be careful since the socket could have been deleted by the hook */ - if (current_sock == s) + if (s->loop->sock_active == s) s->rpos = s->rbuf; } } @@ -2002,7 +1964,7 @@ sk_write_noflush(sock *s) #endif default: - if (s->ttx != s->tpos && sk_maybe_write(s) > 0) + if (sk_tx_pending(s) && sk_maybe_write(s) > 0) { if (s->tx_hook) s->tx_hook(s); @@ -2224,6 +2186,8 @@ static int short_loops = 0; #define SHORT_LOOP_MAX 10 #define WORK_EVENTS_MAX 10 +sock *stored_sock; + void io_loop(void) { @@ -2312,17 +2276,13 @@ io_loop(void) times_update(); /* guaranteed to be non-empty */ - current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); + main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); - while (current_sock) + while (main_birdloop.sock_active) + { + sock *s = main_birdloop.sock_active; + if (s->index != -1) { - sock *s = current_sock; - if (s->index == -1) - { - current_sock = sk_next(s); - goto next; - } - int e; int steps; @@ -2333,10 +2293,11 @@ io_loop(void) steps--; io_log_event(s->rx_hook, s->data); e = sk_read(s, pfd.pfd.data[s->index].revents); - if (s != current_sock) - goto next; } - while (e && s->rx_hook && steps); + while (e && (main_birdloop.sock_active == s) && s->rx_hook && steps); + + if (s != main_birdloop.sock_active) + continue; steps = MAX_STEPS; if (pfd.pfd.data[s->index].revents & POLLOUT) @@ -2345,56 +2306,54 @@ io_loop(void) steps--; io_log_event(s->tx_hook, s->data); e = sk_write(s); - if (s != current_sock) - goto next; } - while (e && steps); + while (e && (main_birdloop.sock_active == s) && steps); - current_sock = sk_next(s); - next: ; + if (s != main_birdloop.sock_active) + continue; } + main_birdloop.sock_active = sk_next(s); + } + short_loops++; if (events && (short_loops < SHORT_LOOP_MAX)) continue; short_loops = 0; int count = 0; - current_sock = stored_sock; - if (current_sock == NULL) - current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); + main_birdloop.sock_active = stored_sock; + if (main_birdloop.sock_active == NULL) + main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); - while (current_sock && count < MAX_RX_STEPS) + while (main_birdloop.sock_active && count < MAX_RX_STEPS) { - sock *s = current_sock; + sock *s = main_birdloop.sock_active; if (s->index == -1) - { - current_sock = sk_next(s); - goto next2; - } + goto next2; if (!s->fast_rx && (pfd.pfd.data[s->index].revents & POLLIN) && s->rx_hook) { count++; io_log_event(s->rx_hook, s->data); sk_read(s, pfd.pfd.data[s->index].revents); - if (s != current_sock) - goto next2; + if (s != main_birdloop.sock_active) + continue; } if (pfd.pfd.data[s->index].revents & (POLLHUP | POLLERR)) { sk_err(s, pfd.pfd.data[s->index].revents); - if (s != current_sock) - goto next2; + if (s != main_birdloop.sock_active) + continue; } - current_sock = sk_next(s); next2: ; + main_birdloop.sock_active = sk_next(s); } - stored_sock = current_sock; + stored_sock = main_birdloop.sock_active; } } } |