diff options
author | Maria Matejka <mq@ucw.cz> | 2023-04-02 19:15:22 +0200 |
---|---|---|
committer | Maria Matejka <mq@ucw.cz> | 2023-04-04 17:00:59 +0200 |
commit | 836e857b3098f8962c621a33f7ae5b532cf512f3 (patch) | |
tree | 96c62361fd7b2dd57cecdf9d9b9323f8aa95f021 /sysdep/unix | |
parent | 571c4f69bfbcf437d848b332bb2f4995fea2347d (diff) |
Sockets: Unified API for main and other loops
Now sk_open() requires an explicit IO loop to open the socket in. Also
specific functions for socket RX pause / resume are added to allow for
BGP corking.
And last but not least, socket reloop is now synchronous to resolve
weird cases of the target loop stopping before actually picking up the
relooped socket. Now the caller must ensure that both loops are locked
while relooping, and this way all sockets always have their respective
loop.
Diffstat (limited to 'sysdep/unix')
-rw-r--r-- | sysdep/unix/io-loop.c | 206 | ||||
-rw-r--r-- | sysdep/unix/io-loop.h | 9 | ||||
-rw-r--r-- | sysdep/unix/io.c | 155 | ||||
-rw-r--r-- | sysdep/unix/main.c | 2 | ||||
-rw-r--r-- | sysdep/unix/unix.h | 5 |
5 files changed, 204 insertions, 173 deletions
diff --git a/sysdep/unix/io-loop.c b/sysdep/unix/io-loop.c index df162c6e..0e2396f7 100644 --- a/sysdep/unix/io-loop.c +++ b/sysdep/unix/io-loop.c @@ -310,59 +310,95 @@ sockets_init(struct birdloop *loop) loop->sock_num = 0; } -static void -sockets_add(struct birdloop *loop, sock *s) +void +socket_changed(sock *s) +{ + struct birdloop *loop = s->loop; + ASSERT_DIE(birdloop_inside(loop)); + + loop->sock_changed++; + birdloop_ping(loop); +} + +void +birdloop_add_socket(struct birdloop *loop, sock *s) { + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(!s->loop); + LOOP_TRACE(loop, "adding socket %p (total=%d)", s, loop->sock_num); add_tail(&loop->sock_list, &s->n); loop->sock_num++; + s->loop = loop; s->index = -1; - if (loop->thread) - atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release); - birdloop_ping(loop); + socket_changed(s); } +extern sock *stored_sock; /* mainloop hack */ + void -sk_start(sock *s) +birdloop_remove_socket(struct birdloop *loop, sock *s) { - ASSERT_DIE(birdloop_current != &main_birdloop); - sockets_add(birdloop_current, s); -} + ASSERT_DIE(!enlisted(&s->n) == !s->loop); -static void -sockets_remove(struct birdloop *loop, sock *s) -{ - if (!enlisted(&s->n)) + if (!s->loop) return; + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(s->loop == loop); + /* Decouple the socket from the loop at all. */ LOOP_TRACE(loop, "removing socket %p (total=%d)", s, loop->sock_num); + if (loop->sock_active == s) + loop->sock_active = sk_next(s); + + if ((loop == &main_birdloop) && (s == stored_sock)) + stored_sock = sk_next(s); + rem_node(&s->n); loop->sock_num--; - if (loop->thread) - atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release); + socket_changed(s); + + s->loop = NULL; s->index = -1; +} - /* Close the filedescriptor. If it ever gets into the poll(), it just returns - * POLLNVAL for this fd which then is ignored because nobody checks for - * that result. Or some other routine opens another fd, getting this number, - * yet also in this case poll() at worst spuriously returns and nobody checks - * for the result in this fd. No further precaution is needed. */ - close(s->fd); +void +sk_reloop(sock *s, struct birdloop *loop) +{ + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(birdloop_inside(s->loop)); + + if (loop == s->loop) + return; + + birdloop_remove_socket(s->loop, s); + birdloop_add_socket(loop, s); +} + +void +sk_pause_rx(struct birdloop *loop, sock *s) +{ + ASSERT_DIE(birdloop_inside(loop)); + s->rx_hook = NULL; + socket_changed(s); } void -sk_stop(sock *s) +sk_resume_rx(struct birdloop *loop, sock *s, int (*hook)(sock *, uint)) { - sockets_remove(birdloop_current, s); + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(hook); + s->rx_hook = hook; + socket_changed(s); } static inline uint sk_want_events(sock *s) -{ return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); } +{ return (s->rx_hook ? POLLIN : 0) | (sk_tx_pending(s) ? POLLOUT : 0); } void sockets_prepare(struct birdloop *loop, struct pfd *pfd) @@ -392,40 +428,60 @@ sockets_prepare(struct birdloop *loop, struct pfd *pfd) int sk_read(sock *s, int revents); int sk_write(sock *s); +void sk_err(sock *s, int revents); -static void +static int sockets_fire(struct birdloop *loop) { + if (EMPTY_LIST(loop->sock_list)) + return 0; + + int sch = 0; + times_update(); struct pollfd *pfd = loop->thread->pfd->pfd.data; - sock *s; node *n, *nxt; - WALK_LIST2_DELSAFE(s, n, nxt, loop->sock_list, n) + loop->sock_active = SKIP_BACK(sock, n, HEAD(loop->sock_list)); + + while (loop->sock_active) { - if (s->index < 0) - continue; + sock *s = loop->sock_active; - int rev = pfd[s->index].revents; + int rev; + if ((s->index >= 0) && (rev = pfd[s->index].revents) && !(rev & POLLNVAL)) + { + int e = 1; - if (!rev) - continue; + if (rev & POLLOUT) + { + while ((s == loop->sock_active) && (e = sk_write(s))) + ; - if (rev & POLLNVAL) - bug("poll: invalid fd %d", s->fd); + if (s != loop->sock_active) + continue; - int e = 1; + if (!sk_tx_pending(s)) + sch++; + } - if (rev & POLLIN) - while (e && s->rx_hook) - e = sk_read(s, rev); + if (rev & POLLIN) + while (e && (s == loop->sock_active) && s->rx_hook) + e = sk_read(s, rev); - if (rev & POLLOUT) - { - atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release); - while (e = sk_write(s)) - ; + if (s != loop->sock_active) + continue; + + if (!(rev & (POLLOUT | POLLIN)) && (rev & POLLERR)) + sk_err(s, rev); + + if (s != loop->sock_active) + continue; } + + loop->sock_active = sk_next(s); } + + return sch; } /* @@ -547,7 +603,8 @@ bird_thread_main(void *arg) thr->meta->thread = thr; birdloop_enter(thr->meta); - u32 refresh_sockets = 1; + thr->sock_changed = 1; + struct pfd pfd; BUFFER_INIT(pfd.pfd, thr->pool, 16); BUFFER_INIT(pfd.loop, thr->pool, 16); @@ -563,7 +620,7 @@ bird_thread_main(void *arg) { birdloop_enter(loop); if (!EMPTY_LIST(loop->sock_list)) - refresh_sockets = 1; + thr->sock_changed = 1; birdloop_leave(loop); } @@ -590,10 +647,10 @@ bird_thread_main(void *arg) ev_run_list(&thr->priority_events); /* Do we have to refresh sockets? */ - refresh_sockets += atomic_exchange_explicit(&thr->poll_changed, 0, memory_order_acq_rel); - - if (refresh_sockets) + if (thr->sock_changed) { + thr->sock_changed = 0; + BUFFER_FLUSH(pfd.pfd); BUFFER_FLUSH(pfd.loop); @@ -608,7 +665,6 @@ bird_thread_main(void *arg) } ASSERT_DIE(pfd.loop.used == pfd.pfd.used); - refresh_sockets = 0; } /* Nothing to do in at least 5 seconds, flush local hot page cache */ else if (timeout > 5000) @@ -957,6 +1013,15 @@ birdloop_init(void) static void birdloop_stop_internal(struct birdloop *loop) { + LOOP_TRACE(loop, "Stopping"); + + /* Block incoming pings */ + u32 ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire); + while (!atomic_compare_exchange_strong_explicit( + &loop->thread_transition, <t, LTT_PING, + memory_order_acq_rel, memory_order_acquire)) + ; + /* Flush remaining events */ ASSERT_DIE(!ev_run_list(&loop->event_list)); @@ -965,17 +1030,27 @@ birdloop_stop_internal(struct birdloop *loop) while (t = timers_first(&loop->time)) tm_stop(t); - /* No sockets allowed */ - ASSERT_DIE(EMPTY_LIST(loop->sock_list)); + /* Drop sockets */ + sock *s; + WALK_LIST_FIRST2(s, n, loop->sock_list) + birdloop_remove_socket(loop, s); /* Unschedule from Meta */ ev_postpone(&loop->event); tm_stop(&loop->timer); - /* Declare loop stopped */ + /* Remove from thread loop list */ rem_node(&loop->n); + loop->thread = NULL; + + /* Leave the loop context without causing any other fuss */ + ASSERT_DIE(!ev_active(&loop->event)); + loop->ping_pending = 0; birdloop_leave(loop); + /* Request local socket reload */ + this_thread->sock_changed++; + /* Tail-call the stopped hook */ loop->stopped(loop->stop_data); } @@ -989,12 +1064,14 @@ birdloop_run(void *_loop) struct birdloop *loop = _loop; birdloop_enter(loop); + LOOP_TRACE(loop, "Regular run"); + if (loop->stopped) /* Birdloop left inside the helper function */ return birdloop_stop_internal(loop); /* Process sockets */ - sockets_fire(loop); + this_thread->sock_changed += sockets_fire(loop); /* Run timers */ timers_fire(&loop->time, 0); @@ -1016,6 +1093,10 @@ birdloop_run(void *_loop) else tm_stop(&loop->timer); + /* Collect socket change requests */ + this_thread->sock_changed += loop->sock_changed; + loop->sock_changed = 0; + birdloop_leave(loop); } @@ -1074,6 +1155,8 @@ birdloop_new(pool *pp, uint order, const char *name) static void birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data) { + LOOP_TRACE(loop, "Stop requested"); + loop->stopped = stopped; loop->stop_data = data; @@ -1100,8 +1183,7 @@ birdloop_stop_self(struct birdloop *loop, void (*stopped)(void *data), void *dat void birdloop_free(struct birdloop *loop) { - ASSERT_DIE(loop->links == 0); - ASSERT_DIE(birdloop_in_this_thread(loop)); + ASSERT_DIE(loop->thread == NULL); domain_free(loop->time.domain); rfree(loop->pool); @@ -1171,20 +1253,6 @@ birdloop_unmask_wakeups(struct birdloop *loop) } void -birdloop_link(struct birdloop *loop) -{ - ASSERT_DIE(birdloop_inside(loop)); - loop->links++; -} - -void -birdloop_unlink(struct birdloop *loop) -{ - ASSERT_DIE(birdloop_inside(loop)); - loop->links--; -} - -void birdloop_yield(void) { usleep(100); diff --git a/sysdep/unix/io-loop.h b/sysdep/unix/io-loop.h index e606f07e..7ec903af 100644 --- a/sysdep/unix/io-loop.h +++ b/sysdep/unix/io-loop.h @@ -22,6 +22,7 @@ struct pfd { }; void sockets_prepare(struct birdloop *, struct pfd *); +void socket_changed(struct birdsock *); void pipe_new(struct pipe *); void pipe_pollin(struct pipe *, struct pfd *); @@ -40,12 +41,12 @@ struct birdloop struct timeloop time; event_list event_list; list sock_list; + struct birdsock *sock_active; int sock_num; + uint sock_changed; uint ping_pending; - uint links; - _Atomic u32 thread_transition; #define LTT_PING 1 #define LTT_MOVE 2 @@ -66,8 +67,6 @@ struct bird_thread { node n; - _Atomic u32 poll_changed; - struct pipe wakeup; event_list priority_events; @@ -83,6 +82,8 @@ struct bird_thread struct pfd *pfd; event cleanup_event; + + int sock_changed; }; #endif diff --git a/sysdep/unix/io.c b/sysdep/unix/io.c index 06797096..88d187a4 100644 --- a/sysdep/unix/io.c +++ b/sysdep/unix/io.c @@ -722,10 +722,7 @@ sk_log_error(sock *s, const char *p) * Actual struct birdsock code */ -static struct birdsock *current_sock; -static struct birdsock *stored_sock; - -static inline sock * +sock * sk_next(sock *s) { if (!s->n.next->next) @@ -787,6 +784,7 @@ sk_ssh_free(sock *s) } #endif + static void sk_free(resource *r) { @@ -799,18 +797,10 @@ sk_free(resource *r) sk_ssh_free(s); #endif - if ((s->fd < 0) || (s->flags & SKF_THREAD)) - return; - - if (s == current_sock) - current_sock = sk_next(s); - if (s == stored_sock) - stored_sock = sk_next(s); - - if (enlisted(&s->n)) - rem_node(&s->n); + if (s->loop) + birdloop_remove_socket(s->loop, s); - if (s->type != SK_SSH && s->type != SK_SSH_ACTIVE) + if (s->fd >= 0 && s->type != SK_SSH && s->type != SK_SSH_ACTIVE) close(s->fd); s->fd = -1; @@ -1023,12 +1013,6 @@ sk_setup(sock *s) } static void -sk_insert(sock *s) -{ - add_tail(&main_birdloop.sock_list, &s->n); -} - -static void sk_tcp_connected(sock *s) { sockaddr sa; @@ -1101,10 +1085,7 @@ sk_passive_connected(sock *s, int type) return 1; } - if (s->flags & SKF_PASSIVE_THREAD) - t->flags |= SKF_THREAD; - else - sk_insert(t); + birdloop_add_socket(s->loop, t); sk_alloc_bufs(t); s->rx_hook(t, 0); @@ -1319,6 +1300,7 @@ sk_open_ssh(sock *s) /** * sk_open - open a socket + * @loop: loop * @s: socket * * This function takes a socket resource created by sk_new() and @@ -1328,7 +1310,7 @@ sk_open_ssh(sock *s) * Result: 0 for success, -1 for an error. */ int -sk_open(sock *s) +sk_open(sock *s, struct birdloop *loop) { int af = AF_UNSPEC; int fd = -1; @@ -1481,9 +1463,7 @@ sk_open(sock *s) sk_alloc_bufs(s); } - if (!(s->flags & SKF_THREAD)) - sk_insert(s); - + birdloop_add_socket(loop, s); return 0; err: @@ -1493,7 +1473,7 @@ err: } int -sk_open_unix(sock *s, char *name) +sk_open_unix(sock *s, struct birdloop *loop, char *name) { struct sockaddr_un sa; int fd; @@ -1520,40 +1500,10 @@ sk_open_unix(sock *s, char *name) return -1; s->fd = fd; - sk_insert(s); + birdloop_add_socket(loop, s); return 0; } -static void -sk_reloop_hook(void *_vs) -{ - sock *s = _vs; - if (birdloop_inside(&main_birdloop)) - { - s->flags &= ~SKF_THREAD; - sk_insert(s); - } - else - { - s->flags |= SKF_THREAD; - sk_start(s); - } -} - -void -sk_reloop(sock *s, struct birdloop *loop) -{ - if (enlisted(&s->n)) - rem_node(&s->n); - - s->reloop = (event) { - .hook = sk_reloop_hook, - .data = s, - }; - - ev_send_loop(loop, &s->reloop); -} - #define CMSG_RX_SPACE MAX(CMSG4_SPACE_PKTINFO+CMSG4_SPACE_TTL, \ CMSG6_SPACE_PKTINFO+CMSG6_SPACE_TTL) @@ -1676,6 +1626,13 @@ sk_recvmsg(sock *s) static inline void reset_tx_buffer(sock *s) { s->ttx = s->tpos = s->tbuf; } +_Bool +sk_tx_pending(sock *s) +{ + return s->ttx != s->tpos; +} + + static int sk_maybe_write(sock *s) { @@ -1686,7 +1643,7 @@ sk_maybe_write(sock *s) case SK_TCP: case SK_MAGIC: case SK_UNIX: - while (s->ttx != s->tpos) + while (sk_tx_pending(s)) { e = write(s->fd, s->ttx, s->tpos - s->ttx); @@ -1708,7 +1665,7 @@ sk_maybe_write(sock *s) #ifdef HAVE_LIBSSH case SK_SSH: - while (s->ttx != s->tpos) + while (sk_tx_pending(s)) { e = ssh_channel_write(s->ssh->channel, s->ttx, s->tpos - s->ttx); @@ -1791,7 +1748,12 @@ sk_send(sock *s, unsigned len) { s->ttx = s->tbuf; s->tpos = s->tbuf + len; - return sk_maybe_write(s); + + int e = sk_maybe_write(s); + if (e == 0) /* Trigger thread poll reload to poll this socket's write. */ + socket_changed(s); + + return e; } /** @@ -1838,7 +1800,7 @@ call_rx_hook(sock *s, int size) if (s->rx_hook(s, size)) { /* We need to be careful since the socket could have been deleted by the hook */ - if (current_sock == s) + if (s->loop->sock_active == s) s->rpos = s->rbuf; } } @@ -2002,7 +1964,7 @@ sk_write_noflush(sock *s) #endif default: - if (s->ttx != s->tpos && sk_maybe_write(s) > 0) + if (sk_tx_pending(s) && sk_maybe_write(s) > 0) { if (s->tx_hook) s->tx_hook(s); @@ -2224,6 +2186,8 @@ static int short_loops = 0; #define SHORT_LOOP_MAX 10 #define WORK_EVENTS_MAX 10 +sock *stored_sock; + void io_loop(void) { @@ -2312,17 +2276,13 @@ io_loop(void) times_update(); /* guaranteed to be non-empty */ - current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); + main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); - while (current_sock) + while (main_birdloop.sock_active) + { + sock *s = main_birdloop.sock_active; + if (s->index != -1) { - sock *s = current_sock; - if (s->index == -1) - { - current_sock = sk_next(s); - goto next; - } - int e; int steps; @@ -2333,10 +2293,11 @@ io_loop(void) steps--; io_log_event(s->rx_hook, s->data); e = sk_read(s, pfd.pfd.data[s->index].revents); - if (s != current_sock) - goto next; } - while (e && s->rx_hook && steps); + while (e && (main_birdloop.sock_active == s) && s->rx_hook && steps); + + if (s != main_birdloop.sock_active) + continue; steps = MAX_STEPS; if (pfd.pfd.data[s->index].revents & POLLOUT) @@ -2345,56 +2306,54 @@ io_loop(void) steps--; io_log_event(s->tx_hook, s->data); e = sk_write(s); - if (s != current_sock) - goto next; } - while (e && steps); + while (e && (main_birdloop.sock_active == s) && steps); - current_sock = sk_next(s); - next: ; + if (s != main_birdloop.sock_active) + continue; } + main_birdloop.sock_active = sk_next(s); + } + short_loops++; if (events && (short_loops < SHORT_LOOP_MAX)) continue; short_loops = 0; int count = 0; - current_sock = stored_sock; - if (current_sock == NULL) - current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); + main_birdloop.sock_active = stored_sock; + if (main_birdloop.sock_active == NULL) + main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); - while (current_sock && count < MAX_RX_STEPS) + while (main_birdloop.sock_active && count < MAX_RX_STEPS) { - sock *s = current_sock; + sock *s = main_birdloop.sock_active; if (s->index == -1) - { - current_sock = sk_next(s); - goto next2; - } + goto next2; if (!s->fast_rx && (pfd.pfd.data[s->index].revents & POLLIN) && s->rx_hook) { count++; io_log_event(s->rx_hook, s->data); sk_read(s, pfd.pfd.data[s->index].revents); - if (s != current_sock) - goto next2; + if (s != main_birdloop.sock_active) + continue; } if (pfd.pfd.data[s->index].revents & (POLLHUP | POLLERR)) { sk_err(s, pfd.pfd.data[s->index].revents); - if (s != current_sock) - goto next2; + if (s != main_birdloop.sock_active) + continue; } - current_sock = sk_next(s); next2: ; + main_birdloop.sock_active = sk_next(s); } - stored_sock = current_sock; + stored_sock = main_birdloop.sock_active; } } } diff --git a/sysdep/unix/main.c b/sysdep/unix/main.c index a9ae842a..79db32d3 100644 --- a/sysdep/unix/main.c +++ b/sysdep/unix/main.c @@ -542,7 +542,7 @@ cli_init_unix(uid_t use_uid, gid_t use_gid) /* Return value intentionally ignored */ unlink(path_control_socket); - if (sk_open_unix(s, path_control_socket) < 0) + if (sk_open_unix(s, &main_birdloop, path_control_socket) < 0) die("Cannot create control socket %s: %m", path_control_socket); if (use_uid || use_gid) diff --git a/sysdep/unix/unix.h b/sysdep/unix/unix.h index e26afe41..606b79cd 100644 --- a/sysdep/unix/unix.h +++ b/sysdep/unix/unix.h @@ -9,6 +9,9 @@ #ifndef _BIRD_UNIX_H_ #define _BIRD_UNIX_H_ +#include "nest/bird.h" +#include "lib/io-loop.h" + #include <sys/socket.h> #include <signal.h> @@ -110,7 +113,7 @@ extern volatile sig_atomic_t async_shutdown_flag; void io_init(void); void io_loop(void); void io_log_dump(void); -int sk_open_unix(struct birdsock *s, char *name); +int sk_open_unix(struct birdsock *s, struct birdloop *, char *name); struct rfile *rf_open(struct pool *, const char *name, const char *mode); void *rf_file(struct rfile *f); int rf_fileno(struct rfile *f); |