diff options
-rw-r--r-- | lib/io-loop.h | 8 | ||||
-rw-r--r-- | lib/socket.h | 12 | ||||
-rw-r--r-- | proto/babel/packets.c | 2 | ||||
-rw-r--r-- | proto/bfd/bfd.c | 11 | ||||
-rw-r--r-- | proto/bfd/packets.c | 15 | ||||
-rw-r--r-- | proto/bgp/bgp.c | 4 | ||||
-rw-r--r-- | proto/bgp/packets.c | 5 | ||||
-rw-r--r-- | proto/ospf/iface.c | 4 | ||||
-rw-r--r-- | proto/radv/packets.c | 2 | ||||
-rw-r--r-- | proto/rip/packets.c | 2 | ||||
-rw-r--r-- | proto/rpki/ssh_transport.c | 4 | ||||
-rw-r--r-- | proto/rpki/tcp_transport.c | 4 | ||||
-rw-r--r-- | proto/rpki/transport.c | 2 | ||||
-rw-r--r-- | sysdep/bsd/krt-sock.c | 2 | ||||
-rw-r--r-- | sysdep/linux/netlink.c | 2 | ||||
-rw-r--r-- | sysdep/unix/io-loop.c | 206 | ||||
-rw-r--r-- | sysdep/unix/io-loop.h | 9 | ||||
-rw-r--r-- | sysdep/unix/io.c | 155 | ||||
-rw-r--r-- | sysdep/unix/main.c | 2 | ||||
-rw-r--r-- | sysdep/unix/unix.h | 5 |
20 files changed, 236 insertions, 220 deletions
diff --git a/lib/io-loop.h b/lib/io-loop.h index ae58bbee..502d77fc 100644 --- a/lib/io-loop.h +++ b/lib/io-loop.h @@ -16,10 +16,6 @@ extern struct birdloop main_birdloop; -void sk_start(sock *s); -void sk_stop(sock *s); -void sk_reloop(sock *s, struct birdloop *loop); - /* Start a new birdloop owned by given pool and domain */ struct birdloop *birdloop_new(pool *p, uint order, const char *name); @@ -58,6 +54,10 @@ struct birdloop_flag_handler { void birdloop_flag(struct birdloop *loop, u32 flag); void birdloop_flag_set_handler(struct birdloop *, struct birdloop_flag_handler *); +/* Setup sockets */ +void birdloop_add_socket(struct birdloop *, struct birdsock *); +void birdloop_remove_socket(struct birdloop *, struct birdsock *); + void birdloop_init(void); /* Yield for a little while. Use only in special cases. */ diff --git a/lib/socket.h b/lib/socket.h index 5c69482e..4b169581 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -80,17 +80,22 @@ typedef struct birdsock { const char *password; /* Password for MD5 authentication */ const char *err; /* Error message */ struct ssh_sock *ssh; /* Used in SK_SSH */ - struct event reloop; /* Reloop event */ + struct birdloop *loop; /* BIRDLoop owning this socket */ } sock; sock *sock_new(pool *); /* Allocate new socket */ #define sk_new(X) sock_new(X) /* Wrapper to avoid name collision with OpenSSL */ -int sk_open(sock *); /* Open socket */ +int sk_open(sock *, struct birdloop *); /* Open socket */ +void sk_reloop(sock *, struct birdloop *); /* Move socket to another loop. Both loops must be locked. */ + int sk_rx_ready(sock *s); +_Bool sk_tx_pending(sock *s); int sk_send(sock *, uint len); /* Send data, <0=err, >0=ok, 0=sleep */ int sk_send_to(sock *, uint len, ip_addr to, uint port); /* sk_send to given destination */ void sk_reallocate(sock *); /* Free and allocate tbuf & rbuf */ +void sk_pause_rx(struct birdloop *loop, sock *s); +void sk_resume_rx(struct birdloop *loop, sock *s, int (*hook)(sock *, uint)); void sk_set_rbsize(sock *s, uint val); /* Resize RX buffer */ void sk_set_tbsize(sock *s, uint val); /* Resize TX buffer, keeping content */ void sk_set_tbuf(sock *s, void *tbuf); /* Switch TX buffer, NULL-> return to internal */ @@ -114,6 +119,7 @@ int sk_set_icmp6_filter(sock *s, int p1, int p2); void sk_log_error(sock *s, const char *p); byte * sk_rx_buffer(sock *s, int *len); /* Temporary */ +sock *sk_next(sock *s); extern int sk_priority_control; /* Suggested priority for control traffic, should be sysdep define */ @@ -127,11 +133,9 @@ extern int sk_priority_control; /* Suggested priority for control traffic, shou #define SKF_HIGH_PORT 0x20 /* Choose port from high range if possible */ #define SKF_FREEBIND 0x40 /* Allow socket to bind to a nonlocal address */ -#define SKF_THREAD 0x100 /* Socked used in thread, Do not add to main loop */ #define SKF_TRUNCATED 0x200 /* Received packet was truncated, set by IO layer */ #define SKF_HDRINCL 0x400 /* Used internally */ #define SKF_PKTINFO 0x800 /* Used internally */ -#define SKF_PASSIVE_THREAD 0x1000 /* Child sockets used in thread, do not add to main loop */ /* * Socket types SA SP DA DP IF TTL SendTo (?=may, -=must not, *=must) diff --git a/proto/babel/packets.c b/proto/babel/packets.c index d4acc170..f13bb5ba 100644 --- a/proto/babel/packets.c +++ b/proto/babel/packets.c @@ -1617,7 +1617,7 @@ babel_open_socket(struct babel_iface *ifa) sk->ttl = 1; sk->flags = SKF_LADDR_RX; - if (sk_open(sk) < 0) + if (sk_open(sk, p->p.loop) < 0) goto err; if (sk_setup_multicast(sk) < 0) diff --git a/proto/bfd/bfd.c b/proto/bfd/bfd.c index 1ef92b18..b6ccdb9a 100644 --- a/proto/bfd/bfd.c +++ b/proto/bfd/bfd.c @@ -603,16 +603,10 @@ bfd_free_iface(struct bfd_iface *ifa) return; if (ifa->sk) - { - sk_stop(ifa->sk); rfree(ifa->sk); - } if (ifa->rx) - { - sk_stop(ifa->rx); rfree(ifa->rx); - } rem_node(&ifa->n); mb_free(ifa); @@ -1100,11 +1094,6 @@ bfd_shutdown(struct proto *P) bfd_drop_requests(p); - if (p->rx4_1) sk_stop(p->rx4_1); - if (p->rx4_m) sk_stop(p->rx4_m); - if (p->rx6_1) sk_stop(p->rx6_1); - if (p->rx6_m) sk_stop(p->rx6_m); - return PS_DOWN; } diff --git a/proto/bfd/packets.c b/proto/bfd/packets.c index 2200ab09..a22f223b 100644 --- a/proto/bfd/packets.c +++ b/proto/bfd/packets.c @@ -430,12 +430,11 @@ bfd_open_rx_sk(struct bfd_proto *p, int multihop, int af) /* TODO: configurable ToS and priority */ sk->tos = IP_PREC_INTERNET_CONTROL; sk->priority = sk_priority_control; - sk->flags = SKF_THREAD | SKF_LADDR_RX | (!multihop ? SKF_TTL_RX : 0); + sk->flags = SKF_LADDR_RX | (!multihop ? SKF_TTL_RX : 0); - if (sk_open(sk) < 0) + if (sk_open(sk, p->p.loop) < 0) goto err; - sk_start(sk); return sk; err: @@ -462,12 +461,11 @@ bfd_open_rx_sk_bound(struct bfd_proto *p, ip_addr local, struct iface *ifa) /* TODO: configurable ToS and priority */ sk->tos = IP_PREC_INTERNET_CONTROL; sk->priority = sk_priority_control; - sk->flags = SKF_THREAD | SKF_BIND | (ifa ? SKF_TTL_RX : 0); + sk->flags = SKF_BIND | (ifa ? SKF_TTL_RX : 0); - if (sk_open(sk) < 0) + if (sk_open(sk, p->p.loop) < 0) goto err; - sk_start(sk); return sk; err: @@ -494,12 +492,11 @@ bfd_open_tx_sk(struct bfd_proto *p, ip_addr local, struct iface *ifa) sk->tos = IP_PREC_INTERNET_CONTROL; sk->priority = sk_priority_control; sk->ttl = ifa ? 255 : -1; - sk->flags = SKF_THREAD | SKF_BIND | SKF_HIGH_PORT; + sk->flags = SKF_BIND | SKF_HIGH_PORT; - if (sk_open(sk) < 0) + if (sk_open(sk, p->p.loop) < 0) goto err; - sk_start(sk); return sk; err: diff --git a/proto/bgp/bgp.c b/proto/bgp/bgp.c index 122b0c22..a9028353 100644 --- a/proto/bgp/bgp.c +++ b/proto/bgp/bgp.c @@ -263,7 +263,7 @@ bgp_listen_create(void *_ UNUSED) sk->rx_hook = bgp_incoming_connection; sk->err_hook = bgp_listen_sock_err; - if (sk_open(sk) < 0) + if (sk_open(sk, &main_birdloop) < 0) { sk_log_error(sk, p->p.name); log(L_ERR "%s: Cannot open listening socket", p->p.name); @@ -1203,7 +1203,7 @@ bgp_connect(struct bgp_proto *p) /* Enter Connect state and start establishing c bgp_setup_sk(conn, s); bgp_conn_set_state(conn, BS_CONNECT); - if (sk_open(s) < 0) + if (sk_open(s, p->p.loop) < 0) goto err; /* Set minimal receive TTL if needed */ diff --git a/proto/bgp/packets.c b/proto/bgp/packets.c index 924d6828..c90798a0 100644 --- a/proto/bgp/packets.c +++ b/proto/bgp/packets.c @@ -3015,7 +3015,7 @@ bgp_kick_tx(void *vconn) ; if (!max && !ev_active(conn->tx_ev)) - ev_schedule(conn->tx_ev); + proto_send_event(&conn->bgp->p, conn->tx_ev); } void @@ -3023,13 +3023,14 @@ bgp_tx(sock *sk) { struct bgp_conn *conn = sk->data; + ASSERT_DIE(birdloop_inside(conn->bgp->p.loop)); DBG("BGP: TX hook\n"); uint max = 1024; while (--max && (bgp_fire_tx(conn) > 0)) ; if (!max && !ev_active(conn->tx_ev)) - ev_schedule(conn->tx_ev); + proto_send_event(&conn->bgp->p, conn->tx_ev); } diff --git a/proto/ospf/iface.c b/proto/ospf/iface.c index 1919bccb..0aa7fa00 100644 --- a/proto/ospf/iface.c +++ b/proto/ospf/iface.c @@ -136,7 +136,7 @@ ospf_sk_open(struct ospf_iface *ifa) sk->flags = SKF_LADDR_RX | (ifa->check_ttl ? SKF_TTL_RX : 0); sk->ttl = ifa->cf->ttl_security ? 255 : 1; - if (sk_open(sk) < 0) + if (sk_open(sk, p->p.loop) < 0) goto err; /* 12 is an offset of the checksum in an OSPFv3 packet */ @@ -220,7 +220,7 @@ ospf_open_vlink_sk(struct ospf_proto *p) sk->data = (void *) p; sk->flags = 0; - if (sk_open(sk) < 0) + if (sk_open(sk, p->p.loop) < 0) goto err; /* 12 is an offset of the checksum in an OSPFv3 packet */ diff --git a/proto/radv/packets.c b/proto/radv/packets.c index 5cd8b2de..c6b565d2 100644 --- a/proto/radv/packets.c +++ b/proto/radv/packets.c @@ -493,7 +493,7 @@ radv_sk_open(struct radv_iface *ifa) sk->data = ifa; sk->flags = SKF_LADDR_RX; - if (sk_open(sk) < 0) + if (sk_open(sk, ifa->ra->p.loop) < 0) goto err; /* We want listen just to ICMPv6 messages of type RS and RA */ diff --git a/proto/rip/packets.c b/proto/rip/packets.c index 9c3bd7a3..fecdf896 100644 --- a/proto/rip/packets.c +++ b/proto/rip/packets.c @@ -1012,7 +1012,7 @@ rip_open_socket(struct rip_iface *ifa) /* sk->rbsize and sk->tbsize are handled in rip_iface_update_buffers() */ - if (sk_open(sk) < 0) + if (sk_open(sk, p->p.loop) < 0) goto err; if (ifa->cf->mode == RIP_IM_MULTICAST) diff --git a/proto/rpki/ssh_transport.c b/proto/rpki/ssh_transport.c index 223afa80..425ad460 100644 --- a/proto/rpki/ssh_transport.c +++ b/proto/rpki/ssh_transport.c @@ -35,11 +35,9 @@ rpki_tr_ssh_open(struct rpki_tr_sock *tr) sk->ssh->subsystem = "rpki-rtr"; sk->ssh->state = SK_SSH_CONNECT; - if (sk_open(sk) != 0) + if (sk_open(sk, cache->p->p.loop) != 0) return RPKI_TR_ERROR; - sk_start(sk); - return RPKI_TR_SUCCESS; } diff --git a/proto/rpki/tcp_transport.c b/proto/rpki/tcp_transport.c index 4e850c44..ebb8030f 100644 --- a/proto/rpki/tcp_transport.c +++ b/proto/rpki/tcp_transport.c @@ -28,11 +28,9 @@ rpki_tr_tcp_open(struct rpki_tr_sock *tr) sk->type = SK_TCP_ACTIVE; - if (sk_open(sk) != 0) + if (sk_open(sk, tr->cache->p->p.loop) != 0) return RPKI_TR_ERROR; - sk_start(sk); - return RPKI_TR_SUCCESS; } diff --git a/proto/rpki/transport.c b/proto/rpki/transport.c index 4026fca4..81bd6dd8 100644 --- a/proto/rpki/transport.c +++ b/proto/rpki/transport.c @@ -85,7 +85,6 @@ rpki_tr_open(struct rpki_tr_sock *tr) sk->rbsize = RPKI_RX_BUFFER_SIZE; sk->tbsize = RPKI_TX_BUFFER_SIZE; sk->tos = IP_PREC_INTERNET_CONTROL; - sk->flags |= SKF_THREAD; sk->vrf = cache->p->p.vrf; if (ipa_zero(sk->daddr) && sk->host) @@ -121,7 +120,6 @@ rpki_tr_close(struct rpki_tr_sock *tr) if (tr->sk) { - sk_stop(tr->sk); rfree(tr->sk); tr->sk = NULL; } diff --git a/sysdep/bsd/krt-sock.c b/sysdep/bsd/krt-sock.c index e7b21474..094268b7 100644 --- a/sysdep/bsd/krt-sock.c +++ b/sysdep/bsd/krt-sock.c @@ -1088,7 +1088,7 @@ krt_sock_open(pool *pool, void *data, int table_id UNUSED) sk->fd = fd; sk->data = data; - if (sk_open(sk) < 0) + if (sk_open(sk, &main_birdloop) < 0) bug("krt-sock: sk_open failed"); return sk; diff --git a/sysdep/linux/netlink.c b/sysdep/linux/netlink.c index b18cb899..e8a86ce4 100644 --- a/sysdep/linux/netlink.c +++ b/sysdep/linux/netlink.c @@ -2043,7 +2043,7 @@ nl_open_async(void) sk->rx_hook = nl_async_hook; sk->err_hook = nl_async_err_hook; sk->fd = fd; - if (sk_open(sk) < 0) + if (sk_open(sk, &main_birdloop) < 0) bug("Netlink: sk_open failed"); } diff --git a/sysdep/unix/io-loop.c b/sysdep/unix/io-loop.c index df162c6e..0e2396f7 100644 --- a/sysdep/unix/io-loop.c +++ b/sysdep/unix/io-loop.c @@ -310,59 +310,95 @@ sockets_init(struct birdloop *loop) loop->sock_num = 0; } -static void -sockets_add(struct birdloop *loop, sock *s) +void +socket_changed(sock *s) +{ + struct birdloop *loop = s->loop; + ASSERT_DIE(birdloop_inside(loop)); + + loop->sock_changed++; + birdloop_ping(loop); +} + +void +birdloop_add_socket(struct birdloop *loop, sock *s) { + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(!s->loop); + LOOP_TRACE(loop, "adding socket %p (total=%d)", s, loop->sock_num); add_tail(&loop->sock_list, &s->n); loop->sock_num++; + s->loop = loop; s->index = -1; - if (loop->thread) - atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release); - birdloop_ping(loop); + socket_changed(s); } +extern sock *stored_sock; /* mainloop hack */ + void -sk_start(sock *s) +birdloop_remove_socket(struct birdloop *loop, sock *s) { - ASSERT_DIE(birdloop_current != &main_birdloop); - sockets_add(birdloop_current, s); -} + ASSERT_DIE(!enlisted(&s->n) == !s->loop); -static void -sockets_remove(struct birdloop *loop, sock *s) -{ - if (!enlisted(&s->n)) + if (!s->loop) return; + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(s->loop == loop); + /* Decouple the socket from the loop at all. */ LOOP_TRACE(loop, "removing socket %p (total=%d)", s, loop->sock_num); + if (loop->sock_active == s) + loop->sock_active = sk_next(s); + + if ((loop == &main_birdloop) && (s == stored_sock)) + stored_sock = sk_next(s); + rem_node(&s->n); loop->sock_num--; - if (loop->thread) - atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release); + socket_changed(s); + + s->loop = NULL; s->index = -1; +} - /* Close the filedescriptor. If it ever gets into the poll(), it just returns - * POLLNVAL for this fd which then is ignored because nobody checks for - * that result. Or some other routine opens another fd, getting this number, - * yet also in this case poll() at worst spuriously returns and nobody checks - * for the result in this fd. No further precaution is needed. */ - close(s->fd); +void +sk_reloop(sock *s, struct birdloop *loop) +{ + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(birdloop_inside(s->loop)); + + if (loop == s->loop) + return; + + birdloop_remove_socket(s->loop, s); + birdloop_add_socket(loop, s); +} + +void +sk_pause_rx(struct birdloop *loop, sock *s) +{ + ASSERT_DIE(birdloop_inside(loop)); + s->rx_hook = NULL; + socket_changed(s); } void -sk_stop(sock *s) +sk_resume_rx(struct birdloop *loop, sock *s, int (*hook)(sock *, uint)) { - sockets_remove(birdloop_current, s); + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(hook); + s->rx_hook = hook; + socket_changed(s); } static inline uint sk_want_events(sock *s) -{ return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); } +{ return (s->rx_hook ? POLLIN : 0) | (sk_tx_pending(s) ? POLLOUT : 0); } void sockets_prepare(struct birdloop *loop, struct pfd *pfd) @@ -392,40 +428,60 @@ sockets_prepare(struct birdloop *loop, struct pfd *pfd) int sk_read(sock *s, int revents); int sk_write(sock *s); +void sk_err(sock *s, int revents); -static void +static int sockets_fire(struct birdloop *loop) { + if (EMPTY_LIST(loop->sock_list)) + return 0; + + int sch = 0; + times_update(); struct pollfd *pfd = loop->thread->pfd->pfd.data; - sock *s; node *n, *nxt; - WALK_LIST2_DELSAFE(s, n, nxt, loop->sock_list, n) + loop->sock_active = SKIP_BACK(sock, n, HEAD(loop->sock_list)); + + while (loop->sock_active) { - if (s->index < 0) - continue; + sock *s = loop->sock_active; - int rev = pfd[s->index].revents; + int rev; + if ((s->index >= 0) && (rev = pfd[s->index].revents) && !(rev & POLLNVAL)) + { + int e = 1; - if (!rev) - continue; + if (rev & POLLOUT) + { + while ((s == loop->sock_active) && (e = sk_write(s))) + ; - if (rev & POLLNVAL) - bug("poll: invalid fd %d", s->fd); + if (s != loop->sock_active) + continue; - int e = 1; + if (!sk_tx_pending(s)) + sch++; + } - if (rev & POLLIN) - while (e && s->rx_hook) - e = sk_read(s, rev); + if (rev & POLLIN) + while (e && (s == loop->sock_active) && s->rx_hook) + e = sk_read(s, rev); - if (rev & POLLOUT) - { - atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release); - while (e = sk_write(s)) - ; + if (s != loop->sock_active) + continue; + + if (!(rev & (POLLOUT | POLLIN)) && (rev & POLLERR)) + sk_err(s, rev); + + if (s != loop->sock_active) + continue; } + + loop->sock_active = sk_next(s); } + + return sch; } /* @@ -547,7 +603,8 @@ bird_thread_main(void *arg) thr->meta->thread = thr; birdloop_enter(thr->meta); - u32 refresh_sockets = 1; + thr->sock_changed = 1; + struct pfd pfd; BUFFER_INIT(pfd.pfd, thr->pool, 16); BUFFER_INIT(pfd.loop, thr->pool, 16); @@ -563,7 +620,7 @@ bird_thread_main(void *arg) { birdloop_enter(loop); if (!EMPTY_LIST(loop->sock_list)) - refresh_sockets = 1; + thr->sock_changed = 1; birdloop_leave(loop); } @@ -590,10 +647,10 @@ bird_thread_main(void *arg) ev_run_list(&thr->priority_events); /* Do we have to refresh sockets? */ - refresh_sockets += atomic_exchange_explicit(&thr->poll_changed, 0, memory_order_acq_rel); - - if (refresh_sockets) + if (thr->sock_changed) { + thr->sock_changed = 0; + BUFFER_FLUSH(pfd.pfd); BUFFER_FLUSH(pfd.loop); @@ -608,7 +665,6 @@ bird_thread_main(void *arg) } ASSERT_DIE(pfd.loop.used == pfd.pfd.used); - refresh_sockets = 0; } /* Nothing to do in at least 5 seconds, flush local hot page cache */ else if (timeout > 5000) @@ -957,6 +1013,15 @@ birdloop_init(void) static void birdloop_stop_internal(struct birdloop *loop) { + LOOP_TRACE(loop, "Stopping"); + + /* Block incoming pings */ + u32 ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire); + while (!atomic_compare_exchange_strong_explicit( + &loop->thread_transition, <t, LTT_PING, + memory_order_acq_rel, memory_order_acquire)) + ; + /* Flush remaining events */ ASSERT_DIE(!ev_run_list(&loop->event_list)); @@ -965,17 +1030,27 @@ birdloop_stop_internal(struct birdloop *loop) while (t = timers_first(&loop->time)) tm_stop(t); - /* No sockets allowed */ - ASSERT_DIE(EMPTY_LIST(loop->sock_list)); + /* Drop sockets */ + sock *s; + WALK_LIST_FIRST2(s, n, loop->sock_list) + birdloop_remove_socket(loop, s); /* Unschedule from Meta */ ev_postpone(&loop->event); tm_stop(&loop->timer); - /* Declare loop stopped */ + /* Remove from thread loop list */ rem_node(&loop->n); + loop->thread = NULL; + + /* Leave the loop context without causing any other fuss */ + ASSERT_DIE(!ev_active(&loop->event)); + loop->ping_pending = 0; birdloop_leave(loop); + /* Request local socket reload */ + this_thread->sock_changed++; + /* Tail-call the stopped hook */ loop->stopped(loop->stop_data); } @@ -989,12 +1064,14 @@ birdloop_run(void *_loop) struct birdloop *loop = _loop; birdloop_enter(loop); + LOOP_TRACE(loop, "Regular run"); + if (loop->stopped) /* Birdloop left inside the helper function */ return birdloop_stop_internal(loop); /* Process sockets */ - sockets_fire(loop); + this_thread->sock_changed += sockets_fire(loop); /* Run timers */ timers_fire(&loop->time, 0); @@ -1016,6 +1093,10 @@ birdloop_run(void *_loop) else tm_stop(&loop->timer); + /* Collect socket change requests */ + this_thread->sock_changed += loop->sock_changed; + loop->sock_changed = 0; + birdloop_leave(loop); } @@ -1074,6 +1155,8 @@ birdloop_new(pool *pp, uint order, const char *name) static void birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data) { + LOOP_TRACE(loop, "Stop requested"); + loop->stopped = stopped; loop->stop_data = data; @@ -1100,8 +1183,7 @@ birdloop_stop_self(struct birdloop *loop, void (*stopped)(void *data), void *dat void birdloop_free(struct birdloop *loop) { - ASSERT_DIE(loop->links == 0); - ASSERT_DIE(birdloop_in_this_thread(loop)); + ASSERT_DIE(loop->thread == NULL); domain_free(loop->time.domain); rfree(loop->pool); @@ -1171,20 +1253,6 @@ birdloop_unmask_wakeups(struct birdloop *loop) } void -birdloop_link(struct birdloop *loop) -{ - ASSERT_DIE(birdloop_inside(loop)); - loop->links++; -} - -void -birdloop_unlink(struct birdloop *loop) -{ - ASSERT_DIE(birdloop_inside(loop)); - loop->links--; -} - -void birdloop_yield(void) { usleep(100); diff --git a/sysdep/unix/io-loop.h b/sysdep/unix/io-loop.h index e606f07e..7ec903af 100644 --- a/sysdep/unix/io-loop.h +++ b/sysdep/unix/io-loop.h @@ -22,6 +22,7 @@ struct pfd { }; void sockets_prepare(struct birdloop *, struct pfd *); +void socket_changed(struct birdsock *); void pipe_new(struct pipe *); void pipe_pollin(struct pipe *, struct pfd *); @@ -40,12 +41,12 @@ struct birdloop struct timeloop time; event_list event_list; list sock_list; + struct birdsock *sock_active; int sock_num; + uint sock_changed; uint ping_pending; - uint links; - _Atomic u32 thread_transition; #define LTT_PING 1 #define LTT_MOVE 2 @@ -66,8 +67,6 @@ struct bird_thread { node n; - _Atomic u32 poll_changed; - struct pipe wakeup; event_list priority_events; @@ -83,6 +82,8 @@ struct bird_thread struct pfd *pfd; event cleanup_event; + + int sock_changed; }; #endif diff --git a/sysdep/unix/io.c b/sysdep/unix/io.c index 06797096..88d187a4 100644 --- a/sysdep/unix/io.c +++ b/sysdep/unix/io.c @@ -722,10 +722,7 @@ sk_log_error(sock *s, const char *p) * Actual struct birdsock code */ -static struct birdsock *current_sock; -static struct birdsock *stored_sock; - -static inline sock * +sock * sk_next(sock *s) { if (!s->n.next->next) @@ -787,6 +784,7 @@ sk_ssh_free(sock *s) } #endif + static void sk_free(resource *r) { @@ -799,18 +797,10 @@ sk_free(resource *r) sk_ssh_free(s); #endif - if ((s->fd < 0) || (s->flags & SKF_THREAD)) - return; - - if (s == current_sock) - current_sock = sk_next(s); - if (s == stored_sock) - stored_sock = sk_next(s); - - if (enlisted(&s->n)) - rem_node(&s->n); + if (s->loop) + birdloop_remove_socket(s->loop, s); - if (s->type != SK_SSH && s->type != SK_SSH_ACTIVE) + if (s->fd >= 0 && s->type != SK_SSH && s->type != SK_SSH_ACTIVE) close(s->fd); s->fd = -1; @@ -1023,12 +1013,6 @@ sk_setup(sock *s) } static void -sk_insert(sock *s) -{ - add_tail(&main_birdloop.sock_list, &s->n); -} - -static void sk_tcp_connected(sock *s) { sockaddr sa; @@ -1101,10 +1085,7 @@ sk_passive_connected(sock *s, int type) return 1; } - if (s->flags & SKF_PASSIVE_THREAD) - t->flags |= SKF_THREAD; - else - sk_insert(t); + birdloop_add_socket(s->loop, t); sk_alloc_bufs(t); s->rx_hook(t, 0); @@ -1319,6 +1300,7 @@ sk_open_ssh(sock *s) /** * sk_open - open a socket + * @loop: loop * @s: socket * * This function takes a socket resource created by sk_new() and @@ -1328,7 +1310,7 @@ sk_open_ssh(sock *s) * Result: 0 for success, -1 for an error. */ int -sk_open(sock *s) +sk_open(sock *s, struct birdloop *loop) { int af = AF_UNSPEC; int fd = -1; @@ -1481,9 +1463,7 @@ sk_open(sock *s) sk_alloc_bufs(s); } - if (!(s->flags & SKF_THREAD)) - sk_insert(s); - + birdloop_add_socket(loop, s); return 0; err: @@ -1493,7 +1473,7 @@ err: } int -sk_open_unix(sock *s, char *name) +sk_open_unix(sock *s, struct birdloop *loop, char *name) { struct sockaddr_un sa; int fd; @@ -1520,40 +1500,10 @@ sk_open_unix(sock *s, char *name) return -1; s->fd = fd; - sk_insert(s); + birdloop_add_socket(loop, s); return 0; } -static void -sk_reloop_hook(void *_vs) -{ - sock *s = _vs; - if (birdloop_inside(&main_birdloop)) - { - s->flags &= ~SKF_THREAD; - sk_insert(s); - } - else - { - s->flags |= SKF_THREAD; - sk_start(s); - } -} - -void -sk_reloop(sock *s, struct birdloop *loop) -{ - if (enlisted(&s->n)) - rem_node(&s->n); - - s->reloop = (event) { - .hook = sk_reloop_hook, - .data = s, - }; - - ev_send_loop(loop, &s->reloop); -} - #define CMSG_RX_SPACE MAX(CMSG4_SPACE_PKTINFO+CMSG4_SPACE_TTL, \ CMSG6_SPACE_PKTINFO+CMSG6_SPACE_TTL) @@ -1676,6 +1626,13 @@ sk_recvmsg(sock *s) static inline void reset_tx_buffer(sock *s) { s->ttx = s->tpos = s->tbuf; } +_Bool +sk_tx_pending(sock *s) +{ + return s->ttx != s->tpos; +} + + static int sk_maybe_write(sock *s) { @@ -1686,7 +1643,7 @@ sk_maybe_write(sock *s) case SK_TCP: case SK_MAGIC: case SK_UNIX: - while (s->ttx != s->tpos) + while (sk_tx_pending(s)) { e = write(s->fd, s->ttx, s->tpos - s->ttx); @@ -1708,7 +1665,7 @@ sk_maybe_write(sock *s) #ifdef HAVE_LIBSSH case SK_SSH: - while (s->ttx != s->tpos) + while (sk_tx_pending(s)) { e = ssh_channel_write(s->ssh->channel, s->ttx, s->tpos - s->ttx); @@ -1791,7 +1748,12 @@ sk_send(sock *s, unsigned len) { s->ttx = s->tbuf; s->tpos = s->tbuf + len; - return sk_maybe_write(s); + + int e = sk_maybe_write(s); + if (e == 0) /* Trigger thread poll reload to poll this socket's write. */ + socket_changed(s); + + return e; } /** @@ -1838,7 +1800,7 @@ call_rx_hook(sock *s, int size) if (s->rx_hook(s, size)) { /* We need to be careful since the socket could have been deleted by the hook */ - if (current_sock == s) + if (s->loop->sock_active == s) s->rpos = s->rbuf; } } @@ -2002,7 +1964,7 @@ sk_write_noflush(sock *s) #endif default: - if (s->ttx != s->tpos && sk_maybe_write(s) > 0) + if (sk_tx_pending(s) && sk_maybe_write(s) > 0) { if (s->tx_hook) s->tx_hook(s); @@ -2224,6 +2186,8 @@ static int short_loops = 0; #define SHORT_LOOP_MAX 10 #define WORK_EVENTS_MAX 10 +sock *stored_sock; + void io_loop(void) { @@ -2312,17 +2276,13 @@ io_loop(void) times_update(); /* guaranteed to be non-empty */ - current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); + main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); - while (current_sock) + while (main_birdloop.sock_active) + { + sock *s = main_birdloop.sock_active; + if (s->index != -1) { - sock *s = current_sock; - if (s->index == -1) - { - current_sock = sk_next(s); - goto next; - } - int e; int steps; @@ -2333,10 +2293,11 @@ io_loop(void) steps--; io_log_event(s->rx_hook, s->data); e = sk_read(s, pfd.pfd.data[s->index].revents); - if (s != current_sock) - goto next; } - while (e && s->rx_hook && steps); + while (e && (main_birdloop.sock_active == s) && s->rx_hook && steps); + + if (s != main_birdloop.sock_active) + continue; steps = MAX_STEPS; if (pfd.pfd.data[s->index].revents & POLLOUT) @@ -2345,56 +2306,54 @@ io_loop(void) steps--; io_log_event(s->tx_hook, s->data); e = sk_write(s); - if (s != current_sock) - goto next; } - while (e && steps); + while (e && (main_birdloop.sock_active == s) && steps); - current_sock = sk_next(s); - next: ; + if (s != main_birdloop.sock_active) + continue; } + main_birdloop.sock_active = sk_next(s); + } + short_loops++; if (events && (short_loops < SHORT_LOOP_MAX)) continue; short_loops = 0; int count = 0; - current_sock = stored_sock; - if (current_sock == NULL) - current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); + main_birdloop.sock_active = stored_sock; + if (main_birdloop.sock_active == NULL) + main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); - while (current_sock && count < MAX_RX_STEPS) + while (main_birdloop.sock_active && count < MAX_RX_STEPS) { - sock *s = current_sock; + sock *s = main_birdloop.sock_active; if (s->index == -1) - { - current_sock = sk_next(s); - goto next2; - } + goto next2; if (!s->fast_rx && (pfd.pfd.data[s->index].revents & POLLIN) && s->rx_hook) { count++; io_log_event(s->rx_hook, s->data); sk_read(s, pfd.pfd.data[s->index].revents); - if (s != current_sock) - goto next2; + if (s != main_birdloop.sock_active) + continue; } if (pfd.pfd.data[s->index].revents & (POLLHUP | POLLERR)) { sk_err(s, pfd.pfd.data[s->index].revents); - if (s != current_sock) - goto next2; + if (s != main_birdloop.sock_active) + continue; } - current_sock = sk_next(s); next2: ; + main_birdloop.sock_active = sk_next(s); } - stored_sock = current_sock; + stored_sock = main_birdloop.sock_active; } } } diff --git a/sysdep/unix/main.c b/sysdep/unix/main.c index a9ae842a..79db32d3 100644 --- a/sysdep/unix/main.c +++ b/sysdep/unix/main.c @@ -542,7 +542,7 @@ cli_init_unix(uid_t use_uid, gid_t use_gid) /* Return value intentionally ignored */ unlink(path_control_socket); - if (sk_open_unix(s, path_control_socket) < 0) + if (sk_open_unix(s, &main_birdloop, path_control_socket) < 0) die("Cannot create control socket %s: %m", path_control_socket); if (use_uid || use_gid) diff --git a/sysdep/unix/unix.h b/sysdep/unix/unix.h index e26afe41..606b79cd 100644 --- a/sysdep/unix/unix.h +++ b/sysdep/unix/unix.h @@ -9,6 +9,9 @@ #ifndef _BIRD_UNIX_H_ #define _BIRD_UNIX_H_ +#include "nest/bird.h" +#include "lib/io-loop.h" + #include <sys/socket.h> #include <signal.h> @@ -110,7 +113,7 @@ extern volatile sig_atomic_t async_shutdown_flag; void io_init(void); void io_loop(void); void io_log_dump(void); -int sk_open_unix(struct birdsock *s, char *name); +int sk_open_unix(struct birdsock *s, struct birdloop *, char *name); struct rfile *rf_open(struct pool *, const char *name, const char *mode); void *rf_file(struct rfile *f); int rf_fileno(struct rfile *f); |