diff options
Diffstat (limited to 'sysdep/unix')
-rw-r--r-- | sysdep/unix/io-loop.c | 206 | ||||
-rw-r--r-- | sysdep/unix/io-loop.h | 9 | ||||
-rw-r--r-- | sysdep/unix/io.c | 155 | ||||
-rw-r--r-- | sysdep/unix/main.c | 2 | ||||
-rw-r--r-- | sysdep/unix/unix.h | 5 |
5 files changed, 204 insertions, 173 deletions
diff --git a/sysdep/unix/io-loop.c b/sysdep/unix/io-loop.c index df162c6e..0e2396f7 100644 --- a/sysdep/unix/io-loop.c +++ b/sysdep/unix/io-loop.c @@ -310,59 +310,95 @@ sockets_init(struct birdloop *loop) loop->sock_num = 0; } -static void -sockets_add(struct birdloop *loop, sock *s) +void +socket_changed(sock *s) +{ + struct birdloop *loop = s->loop; + ASSERT_DIE(birdloop_inside(loop)); + + loop->sock_changed++; + birdloop_ping(loop); +} + +void +birdloop_add_socket(struct birdloop *loop, sock *s) { + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(!s->loop); + LOOP_TRACE(loop, "adding socket %p (total=%d)", s, loop->sock_num); add_tail(&loop->sock_list, &s->n); loop->sock_num++; + s->loop = loop; s->index = -1; - if (loop->thread) - atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release); - birdloop_ping(loop); + socket_changed(s); } +extern sock *stored_sock; /* mainloop hack */ + void -sk_start(sock *s) +birdloop_remove_socket(struct birdloop *loop, sock *s) { - ASSERT_DIE(birdloop_current != &main_birdloop); - sockets_add(birdloop_current, s); -} + ASSERT_DIE(!enlisted(&s->n) == !s->loop); -static void -sockets_remove(struct birdloop *loop, sock *s) -{ - if (!enlisted(&s->n)) + if (!s->loop) return; + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(s->loop == loop); + /* Decouple the socket from the loop at all. */ LOOP_TRACE(loop, "removing socket %p (total=%d)", s, loop->sock_num); + if (loop->sock_active == s) + loop->sock_active = sk_next(s); + + if ((loop == &main_birdloop) && (s == stored_sock)) + stored_sock = sk_next(s); + rem_node(&s->n); loop->sock_num--; - if (loop->thread) - atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release); + socket_changed(s); + + s->loop = NULL; s->index = -1; +} - /* Close the filedescriptor. If it ever gets into the poll(), it just returns - * POLLNVAL for this fd which then is ignored because nobody checks for - * that result. Or some other routine opens another fd, getting this number, - * yet also in this case poll() at worst spuriously returns and nobody checks - * for the result in this fd. No further precaution is needed. */ - close(s->fd); +void +sk_reloop(sock *s, struct birdloop *loop) +{ + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(birdloop_inside(s->loop)); + + if (loop == s->loop) + return; + + birdloop_remove_socket(s->loop, s); + birdloop_add_socket(loop, s); +} + +void +sk_pause_rx(struct birdloop *loop, sock *s) +{ + ASSERT_DIE(birdloop_inside(loop)); + s->rx_hook = NULL; + socket_changed(s); } void -sk_stop(sock *s) +sk_resume_rx(struct birdloop *loop, sock *s, int (*hook)(sock *, uint)) { - sockets_remove(birdloop_current, s); + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(hook); + s->rx_hook = hook; + socket_changed(s); } static inline uint sk_want_events(sock *s) -{ return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); } +{ return (s->rx_hook ? POLLIN : 0) | (sk_tx_pending(s) ? POLLOUT : 0); } void sockets_prepare(struct birdloop *loop, struct pfd *pfd) @@ -392,40 +428,60 @@ sockets_prepare(struct birdloop *loop, struct pfd *pfd) int sk_read(sock *s, int revents); int sk_write(sock *s); +void sk_err(sock *s, int revents); -static void +static int sockets_fire(struct birdloop *loop) { + if (EMPTY_LIST(loop->sock_list)) + return 0; + + int sch = 0; + times_update(); struct pollfd *pfd = loop->thread->pfd->pfd.data; - sock *s; node *n, *nxt; - WALK_LIST2_DELSAFE(s, n, nxt, loop->sock_list, n) + loop->sock_active = SKIP_BACK(sock, n, HEAD(loop->sock_list)); + + while (loop->sock_active) { - if (s->index < 0) - continue; + sock *s = loop->sock_active; - int rev = pfd[s->index].revents; + int rev; + if ((s->index >= 0) && (rev = pfd[s->index].revents) && !(rev & POLLNVAL)) + { + int e = 1; - if (!rev) - continue; + if (rev & POLLOUT) + { + while ((s == loop->sock_active) && (e = sk_write(s))) + ; - if (rev & POLLNVAL) - bug("poll: invalid fd %d", s->fd); + if (s != loop->sock_active) + continue; - int e = 1; + if (!sk_tx_pending(s)) + sch++; + } - if (rev & POLLIN) - while (e && s->rx_hook) - e = sk_read(s, rev); + if (rev & POLLIN) + while (e && (s == loop->sock_active) && s->rx_hook) + e = sk_read(s, rev); - if (rev & POLLOUT) - { - atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release); - while (e = sk_write(s)) - ; + if (s != loop->sock_active) + continue; + + if (!(rev & (POLLOUT | POLLIN)) && (rev & POLLERR)) + sk_err(s, rev); + + if (s != loop->sock_active) + continue; } + + loop->sock_active = sk_next(s); } + + return sch; } /* @@ -547,7 +603,8 @@ bird_thread_main(void *arg) thr->meta->thread = thr; birdloop_enter(thr->meta); - u32 refresh_sockets = 1; + thr->sock_changed = 1; + struct pfd pfd; BUFFER_INIT(pfd.pfd, thr->pool, 16); BUFFER_INIT(pfd.loop, thr->pool, 16); @@ -563,7 +620,7 @@ bird_thread_main(void *arg) { birdloop_enter(loop); if (!EMPTY_LIST(loop->sock_list)) - refresh_sockets = 1; + thr->sock_changed = 1; birdloop_leave(loop); } @@ -590,10 +647,10 @@ bird_thread_main(void *arg) ev_run_list(&thr->priority_events); /* Do we have to refresh sockets? */ - refresh_sockets += atomic_exchange_explicit(&thr->poll_changed, 0, memory_order_acq_rel); - - if (refresh_sockets) + if (thr->sock_changed) { + thr->sock_changed = 0; + BUFFER_FLUSH(pfd.pfd); BUFFER_FLUSH(pfd.loop); @@ -608,7 +665,6 @@ bird_thread_main(void *arg) } ASSERT_DIE(pfd.loop.used == pfd.pfd.used); - refresh_sockets = 0; } /* Nothing to do in at least 5 seconds, flush local hot page cache */ else if (timeout > 5000) @@ -957,6 +1013,15 @@ birdloop_init(void) static void birdloop_stop_internal(struct birdloop *loop) { + LOOP_TRACE(loop, "Stopping"); + + /* Block incoming pings */ + u32 ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire); + while (!atomic_compare_exchange_strong_explicit( + &loop->thread_transition, <t, LTT_PING, + memory_order_acq_rel, memory_order_acquire)) + ; + /* Flush remaining events */ ASSERT_DIE(!ev_run_list(&loop->event_list)); @@ -965,17 +1030,27 @@ birdloop_stop_internal(struct birdloop *loop) while (t = timers_first(&loop->time)) tm_stop(t); - /* No sockets allowed */ - ASSERT_DIE(EMPTY_LIST(loop->sock_list)); + /* Drop sockets */ + sock *s; + WALK_LIST_FIRST2(s, n, loop->sock_list) + birdloop_remove_socket(loop, s); /* Unschedule from Meta */ ev_postpone(&loop->event); tm_stop(&loop->timer); - /* Declare loop stopped */ + /* Remove from thread loop list */ rem_node(&loop->n); + loop->thread = NULL; + + /* Leave the loop context without causing any other fuss */ + ASSERT_DIE(!ev_active(&loop->event)); + loop->ping_pending = 0; birdloop_leave(loop); + /* Request local socket reload */ + this_thread->sock_changed++; + /* Tail-call the stopped hook */ loop->stopped(loop->stop_data); } @@ -989,12 +1064,14 @@ birdloop_run(void *_loop) struct birdloop *loop = _loop; birdloop_enter(loop); + LOOP_TRACE(loop, "Regular run"); + if (loop->stopped) /* Birdloop left inside the helper function */ return birdloop_stop_internal(loop); /* Process sockets */ - sockets_fire(loop); + this_thread->sock_changed += sockets_fire(loop); /* Run timers */ timers_fire(&loop->time, 0); @@ -1016,6 +1093,10 @@ birdloop_run(void *_loop) else tm_stop(&loop->timer); + /* Collect socket change requests */ + this_thread->sock_changed += loop->sock_changed; + loop->sock_changed = 0; + birdloop_leave(loop); } @@ -1074,6 +1155,8 @@ birdloop_new(pool *pp, uint order, const char *name) static void birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data) { + LOOP_TRACE(loop, "Stop requested"); + loop->stopped = stopped; loop->stop_data = data; @@ -1100,8 +1183,7 @@ birdloop_stop_self(struct birdloop *loop, void (*stopped)(void *data), void *dat void birdloop_free(struct birdloop *loop) { - ASSERT_DIE(loop->links == 0); - ASSERT_DIE(birdloop_in_this_thread(loop)); + ASSERT_DIE(loop->thread == NULL); domain_free(loop->time.domain); rfree(loop->pool); @@ -1171,20 +1253,6 @@ birdloop_unmask_wakeups(struct birdloop *loop) } void -birdloop_link(struct birdloop *loop) -{ - ASSERT_DIE(birdloop_inside(loop)); - loop->links++; -} - -void -birdloop_unlink(struct birdloop *loop) -{ - ASSERT_DIE(birdloop_inside(loop)); - loop->links--; -} - -void birdloop_yield(void) { usleep(100); diff --git a/sysdep/unix/io-loop.h b/sysdep/unix/io-loop.h index e606f07e..7ec903af 100644 --- a/sysdep/unix/io-loop.h +++ b/sysdep/unix/io-loop.h @@ -22,6 +22,7 @@ struct pfd { }; void sockets_prepare(struct birdloop *, struct pfd *); +void socket_changed(struct birdsock *); void pipe_new(struct pipe *); void pipe_pollin(struct pipe *, struct pfd *); @@ -40,12 +41,12 @@ struct birdloop struct timeloop time; event_list event_list; list sock_list; + struct birdsock *sock_active; int sock_num; + uint sock_changed; uint ping_pending; - uint links; - _Atomic u32 thread_transition; #define LTT_PING 1 #define LTT_MOVE 2 @@ -66,8 +67,6 @@ struct bird_thread { node n; - _Atomic u32 poll_changed; - struct pipe wakeup; event_list priority_events; @@ -83,6 +82,8 @@ struct bird_thread struct pfd *pfd; event cleanup_event; + + int sock_changed; }; #endif diff --git a/sysdep/unix/io.c b/sysdep/unix/io.c index 06797096..88d187a4 100644 --- a/sysdep/unix/io.c +++ b/sysdep/unix/io.c @@ -722,10 +722,7 @@ sk_log_error(sock *s, const char *p) * Actual struct birdsock code */ -static struct birdsock *current_sock; -static struct birdsock *stored_sock; - -static inline sock * +sock * sk_next(sock *s) { if (!s->n.next->next) @@ -787,6 +784,7 @@ sk_ssh_free(sock *s) } #endif + static void sk_free(resource *r) { @@ -799,18 +797,10 @@ sk_free(resource *r) sk_ssh_free(s); #endif - if ((s->fd < 0) || (s->flags & SKF_THREAD)) - return; - - if (s == current_sock) - current_sock = sk_next(s); - if (s == stored_sock) - stored_sock = sk_next(s); - - if (enlisted(&s->n)) - rem_node(&s->n); + if (s->loop) + birdloop_remove_socket(s->loop, s); - if (s->type != SK_SSH && s->type != SK_SSH_ACTIVE) + if (s->fd >= 0 && s->type != SK_SSH && s->type != SK_SSH_ACTIVE) close(s->fd); s->fd = -1; @@ -1023,12 +1013,6 @@ sk_setup(sock *s) } static void -sk_insert(sock *s) -{ - add_tail(&main_birdloop.sock_list, &s->n); -} - -static void sk_tcp_connected(sock *s) { sockaddr sa; @@ -1101,10 +1085,7 @@ sk_passive_connected(sock *s, int type) return 1; } - if (s->flags & SKF_PASSIVE_THREAD) - t->flags |= SKF_THREAD; - else - sk_insert(t); + birdloop_add_socket(s->loop, t); sk_alloc_bufs(t); s->rx_hook(t, 0); @@ -1319,6 +1300,7 @@ sk_open_ssh(sock *s) /** * sk_open - open a socket + * @loop: loop * @s: socket * * This function takes a socket resource created by sk_new() and @@ -1328,7 +1310,7 @@ sk_open_ssh(sock *s) * Result: 0 for success, -1 for an error. */ int -sk_open(sock *s) +sk_open(sock *s, struct birdloop *loop) { int af = AF_UNSPEC; int fd = -1; @@ -1481,9 +1463,7 @@ sk_open(sock *s) sk_alloc_bufs(s); } - if (!(s->flags & SKF_THREAD)) - sk_insert(s); - + birdloop_add_socket(loop, s); return 0; err: @@ -1493,7 +1473,7 @@ err: } int -sk_open_unix(sock *s, char *name) +sk_open_unix(sock *s, struct birdloop *loop, char *name) { struct sockaddr_un sa; int fd; @@ -1520,40 +1500,10 @@ sk_open_unix(sock *s, char *name) return -1; s->fd = fd; - sk_insert(s); + birdloop_add_socket(loop, s); return 0; } -static void -sk_reloop_hook(void *_vs) -{ - sock *s = _vs; - if (birdloop_inside(&main_birdloop)) - { - s->flags &= ~SKF_THREAD; - sk_insert(s); - } - else - { - s->flags |= SKF_THREAD; - sk_start(s); - } -} - -void -sk_reloop(sock *s, struct birdloop *loop) -{ - if (enlisted(&s->n)) - rem_node(&s->n); - - s->reloop = (event) { - .hook = sk_reloop_hook, - .data = s, - }; - - ev_send_loop(loop, &s->reloop); -} - #define CMSG_RX_SPACE MAX(CMSG4_SPACE_PKTINFO+CMSG4_SPACE_TTL, \ CMSG6_SPACE_PKTINFO+CMSG6_SPACE_TTL) @@ -1676,6 +1626,13 @@ sk_recvmsg(sock *s) static inline void reset_tx_buffer(sock *s) { s->ttx = s->tpos = s->tbuf; } +_Bool +sk_tx_pending(sock *s) +{ + return s->ttx != s->tpos; +} + + static int sk_maybe_write(sock *s) { @@ -1686,7 +1643,7 @@ sk_maybe_write(sock *s) case SK_TCP: case SK_MAGIC: case SK_UNIX: - while (s->ttx != s->tpos) + while (sk_tx_pending(s)) { e = write(s->fd, s->ttx, s->tpos - s->ttx); @@ -1708,7 +1665,7 @@ sk_maybe_write(sock *s) #ifdef HAVE_LIBSSH case SK_SSH: - while (s->ttx != s->tpos) + while (sk_tx_pending(s)) { e = ssh_channel_write(s->ssh->channel, s->ttx, s->tpos - s->ttx); @@ -1791,7 +1748,12 @@ sk_send(sock *s, unsigned len) { s->ttx = s->tbuf; s->tpos = s->tbuf + len; - return sk_maybe_write(s); + + int e = sk_maybe_write(s); + if (e == 0) /* Trigger thread poll reload to poll this socket's write. */ + socket_changed(s); + + return e; } /** @@ -1838,7 +1800,7 @@ call_rx_hook(sock *s, int size) if (s->rx_hook(s, size)) { /* We need to be careful since the socket could have been deleted by the hook */ - if (current_sock == s) + if (s->loop->sock_active == s) s->rpos = s->rbuf; } } @@ -2002,7 +1964,7 @@ sk_write_noflush(sock *s) #endif default: - if (s->ttx != s->tpos && sk_maybe_write(s) > 0) + if (sk_tx_pending(s) && sk_maybe_write(s) > 0) { if (s->tx_hook) s->tx_hook(s); @@ -2224,6 +2186,8 @@ static int short_loops = 0; #define SHORT_LOOP_MAX 10 #define WORK_EVENTS_MAX 10 +sock *stored_sock; + void io_loop(void) { @@ -2312,17 +2276,13 @@ io_loop(void) times_update(); /* guaranteed to be non-empty */ - current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); + main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); - while (current_sock) + while (main_birdloop.sock_active) + { + sock *s = main_birdloop.sock_active; + if (s->index != -1) { - sock *s = current_sock; - if (s->index == -1) - { - current_sock = sk_next(s); - goto next; - } - int e; int steps; @@ -2333,10 +2293,11 @@ io_loop(void) steps--; io_log_event(s->rx_hook, s->data); e = sk_read(s, pfd.pfd.data[s->index].revents); - if (s != current_sock) - goto next; } - while (e && s->rx_hook && steps); + while (e && (main_birdloop.sock_active == s) && s->rx_hook && steps); + + if (s != main_birdloop.sock_active) + continue; steps = MAX_STEPS; if (pfd.pfd.data[s->index].revents & POLLOUT) @@ -2345,56 +2306,54 @@ io_loop(void) steps--; io_log_event(s->tx_hook, s->data); e = sk_write(s); - if (s != current_sock) - goto next; } - while (e && steps); + while (e && (main_birdloop.sock_active == s) && steps); - current_sock = sk_next(s); - next: ; + if (s != main_birdloop.sock_active) + continue; } + main_birdloop.sock_active = sk_next(s); + } + short_loops++; if (events && (short_loops < SHORT_LOOP_MAX)) continue; short_loops = 0; int count = 0; - current_sock = stored_sock; - if (current_sock == NULL) - current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); + main_birdloop.sock_active = stored_sock; + if (main_birdloop.sock_active == NULL) + main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); - while (current_sock && count < MAX_RX_STEPS) + while (main_birdloop.sock_active && count < MAX_RX_STEPS) { - sock *s = current_sock; + sock *s = main_birdloop.sock_active; if (s->index == -1) - { - current_sock = sk_next(s); - goto next2; - } + goto next2; if (!s->fast_rx && (pfd.pfd.data[s->index].revents & POLLIN) && s->rx_hook) { count++; io_log_event(s->rx_hook, s->data); sk_read(s, pfd.pfd.data[s->index].revents); - if (s != current_sock) - goto next2; + if (s != main_birdloop.sock_active) + continue; } if (pfd.pfd.data[s->index].revents & (POLLHUP | POLLERR)) { sk_err(s, pfd.pfd.data[s->index].revents); - if (s != current_sock) - goto next2; + if (s != main_birdloop.sock_active) + continue; } - current_sock = sk_next(s); next2: ; + main_birdloop.sock_active = sk_next(s); } - stored_sock = current_sock; + stored_sock = main_birdloop.sock_active; } } } diff --git a/sysdep/unix/main.c b/sysdep/unix/main.c index a9ae842a..79db32d3 100644 --- a/sysdep/unix/main.c +++ b/sysdep/unix/main.c @@ -542,7 +542,7 @@ cli_init_unix(uid_t use_uid, gid_t use_gid) /* Return value intentionally ignored */ unlink(path_control_socket); - if (sk_open_unix(s, path_control_socket) < 0) + if (sk_open_unix(s, &main_birdloop, path_control_socket) < 0) die("Cannot create control socket %s: %m", path_control_socket); if (use_uid || use_gid) diff --git a/sysdep/unix/unix.h b/sysdep/unix/unix.h index e26afe41..606b79cd 100644 --- a/sysdep/unix/unix.h +++ b/sysdep/unix/unix.h @@ -9,6 +9,9 @@ #ifndef _BIRD_UNIX_H_ #define _BIRD_UNIX_H_ +#include "nest/bird.h" +#include "lib/io-loop.h" + #include <sys/socket.h> #include <signal.h> @@ -110,7 +113,7 @@ extern volatile sig_atomic_t async_shutdown_flag; void io_init(void); void io_loop(void); void io_log_dump(void); -int sk_open_unix(struct birdsock *s, char *name); +int sk_open_unix(struct birdsock *s, struct birdloop *, char *name); struct rfile *rf_open(struct pool *, const char *name, const char *mode); void *rf_file(struct rfile *f); int rf_fileno(struct rfile *f); |