summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- lib/io-loop.h 8
-rw-r--r-- lib/socket.h 12
-rw-r--r-- proto/babel/packets.c 2
-rw-r--r-- proto/bfd/bfd.c 11
-rw-r--r-- proto/bfd/packets.c 15
-rw-r--r-- proto/bgp/bgp.c 4
-rw-r--r-- proto/bgp/packets.c 5
-rw-r--r-- proto/ospf/iface.c 4
-rw-r--r-- proto/radv/packets.c 2
-rw-r--r-- proto/rip/packets.c 2
-rw-r--r-- proto/rpki/ssh_transport.c 4
-rw-r--r-- proto/rpki/tcp_transport.c 4
-rw-r--r-- proto/rpki/transport.c 2
-rw-r--r-- sysdep/bsd/krt-sock.c 2
-rw-r--r-- sysdep/linux/netlink.c 2
-rw-r--r-- sysdep/unix/io-loop.c 206
-rw-r--r-- sysdep/unix/io-loop.h 9
-rw-r--r-- sysdep/unix/io.c 155
-rw-r--r-- sysdep/unix/main.c 2
-rw-r--r-- sysdep/unix/unix.h 5
20 files changed, 236 insertions, 220 deletions
diff --git a/lib/io-loop.h b/lib/io-loop.h
index ae58bbee..502d77fc 100644
--- a/lib/io-loop.h
+++ b/lib/io-loop.h
@@ -16,10 +16,6 @@
extern struct birdloop main_birdloop;
-void sk_start(sock *s);
-void sk_stop(sock *s);
-void sk_reloop(sock *s, struct birdloop *loop);
-
/* Start a new birdloop owned by given pool and domain */
struct birdloop *birdloop_new(pool *p, uint order, const char *name);
@@ -58,6 +54,10 @@ struct birdloop_flag_handler {
void birdloop_flag(struct birdloop *loop, u32 flag);
void birdloop_flag_set_handler(struct birdloop *, struct birdloop_flag_handler *);
+/* Setup sockets */
+void birdloop_add_socket(struct birdloop *, struct birdsock *);
+void birdloop_remove_socket(struct birdloop *, struct birdsock *);
+
void birdloop_init(void);
/* Yield for a little while. Use only in special cases. */
diff --git a/lib/socket.h b/lib/socket.h
index 5c69482e..4b169581 100644
--- a/lib/socket.h
+++ b/lib/socket.h
@@ -80,17 +80,22 @@ typedef struct birdsock {
const char *password; /* Password for MD5 authentication */
const char *err; /* Error message */
struct ssh_sock *ssh; /* Used in SK_SSH */
- struct event reloop; /* Reloop event */
+ struct birdloop *loop; /* BIRDLoop owning this socket */
} sock;
sock *sock_new(pool *); /* Allocate new socket */
#define sk_new(X) sock_new(X) /* Wrapper to avoid name collision with OpenSSL */
-int sk_open(sock *); /* Open socket */
+int sk_open(sock *, struct birdloop *); /* Open socket */
+void sk_reloop(sock *, struct birdloop *); /* Move socket to another loop. Both loops must be locked. */
+
int sk_rx_ready(sock *s);
+_Bool sk_tx_pending(sock *s);
int sk_send(sock *, uint len); /* Send data, <0=err, >0=ok, 0=sleep */
int sk_send_to(sock *, uint len, ip_addr to, uint port); /* sk_send to given destination */
void sk_reallocate(sock *); /* Free and allocate tbuf & rbuf */
+void sk_pause_rx(struct birdloop *loop, sock *s);
+void sk_resume_rx(struct birdloop *loop, sock *s, int (*hook)(sock *, uint));
void sk_set_rbsize(sock *s, uint val); /* Resize RX buffer */
void sk_set_tbsize(sock *s, uint val); /* Resize TX buffer, keeping content */
void sk_set_tbuf(sock *s, void *tbuf); /* Switch TX buffer, NULL-> return to internal */
@@ -114,6 +119,7 @@ int sk_set_icmp6_filter(sock *s, int p1, int p2);
void sk_log_error(sock *s, const char *p);
byte * sk_rx_buffer(sock *s, int *len); /* Temporary */
+sock *sk_next(sock *s);
extern int sk_priority_control; /* Suggested priority for control traffic, should be sysdep define */
@@ -127,11 +133,9 @@ extern int sk_priority_control; /* Suggested priority for control traffic, shou
#define SKF_HIGH_PORT 0x20 /* Choose port from high range if possible */
#define SKF_FREEBIND 0x40 /* Allow socket to bind to a nonlocal address */
-#define SKF_THREAD 0x100 /* Socked used in thread, Do not add to main loop */
#define SKF_TRUNCATED 0x200 /* Received packet was truncated, set by IO layer */
#define SKF_HDRINCL 0x400 /* Used internally */
#define SKF_PKTINFO 0x800 /* Used internally */
-#define SKF_PASSIVE_THREAD 0x1000 /* Child sockets used in thread, do not add to main loop */
/*
* Socket types SA SP DA DP IF TTL SendTo (?=may, -=must not, *=must)
diff --git a/proto/babel/packets.c b/proto/babel/packets.c
index d4acc170..f13bb5ba 100644
--- a/proto/babel/packets.c
+++ b/proto/babel/packets.c
@@ -1617,7 +1617,7 @@ babel_open_socket(struct babel_iface *ifa)
sk->ttl = 1;
sk->flags = SKF_LADDR_RX;
- if (sk_open(sk) < 0)
+ if (sk_open(sk, p->p.loop) < 0)
goto err;
if (sk_setup_multicast(sk) < 0)
diff --git a/proto/bfd/bfd.c b/proto/bfd/bfd.c
index 1ef92b18..b6ccdb9a 100644
--- a/proto/bfd/bfd.c
+++ b/proto/bfd/bfd.c
@@ -603,16 +603,10 @@ bfd_free_iface(struct bfd_iface *ifa)
return;
if (ifa->sk)
- {
- sk_stop(ifa->sk);
rfree(ifa->sk);
- }
if (ifa->rx)
- {
- sk_stop(ifa->rx);
rfree(ifa->rx);
- }
rem_node(&ifa->n);
mb_free(ifa);
@@ -1100,11 +1094,6 @@ bfd_shutdown(struct proto *P)
bfd_drop_requests(p);
- if (p->rx4_1) sk_stop(p->rx4_1);
- if (p->rx4_m) sk_stop(p->rx4_m);
- if (p->rx6_1) sk_stop(p->rx6_1);
- if (p->rx6_m) sk_stop(p->rx6_m);
-
return PS_DOWN;
}
diff --git a/proto/bfd/packets.c b/proto/bfd/packets.c
index 2200ab09..a22f223b 100644
--- a/proto/bfd/packets.c
+++ b/proto/bfd/packets.c
@@ -430,12 +430,11 @@ bfd_open_rx_sk(struct bfd_proto *p, int multihop, int af)
/* TODO: configurable ToS and priority */
sk->tos = IP_PREC_INTERNET_CONTROL;
sk->priority = sk_priority_control;
- sk->flags = SKF_THREAD | SKF_LADDR_RX | (!multihop ? SKF_TTL_RX : 0);
+ sk->flags = SKF_LADDR_RX | (!multihop ? SKF_TTL_RX : 0);
- if (sk_open(sk) < 0)
+ if (sk_open(sk, p->p.loop) < 0)
goto err;
- sk_start(sk);
return sk;
err:
@@ -462,12 +461,11 @@ bfd_open_rx_sk_bound(struct bfd_proto *p, ip_addr local, struct iface *ifa)
/* TODO: configurable ToS and priority */
sk->tos = IP_PREC_INTERNET_CONTROL;
sk->priority = sk_priority_control;
- sk->flags = SKF_THREAD | SKF_BIND | (ifa ? SKF_TTL_RX : 0);
+ sk->flags = SKF_BIND | (ifa ? SKF_TTL_RX : 0);
- if (sk_open(sk) < 0)
+ if (sk_open(sk, p->p.loop) < 0)
goto err;
- sk_start(sk);
return sk;
err:
@@ -494,12 +492,11 @@ bfd_open_tx_sk(struct bfd_proto *p, ip_addr local, struct iface *ifa)
sk->tos = IP_PREC_INTERNET_CONTROL;
sk->priority = sk_priority_control;
sk->ttl = ifa ? 255 : -1;
- sk->flags = SKF_THREAD | SKF_BIND | SKF_HIGH_PORT;
+ sk->flags = SKF_BIND | SKF_HIGH_PORT;
- if (sk_open(sk) < 0)
+ if (sk_open(sk, p->p.loop) < 0)
goto err;
- sk_start(sk);
return sk;
err:
diff --git a/proto/bgp/bgp.c b/proto/bgp/bgp.c
index 122b0c22..a9028353 100644
--- a/proto/bgp/bgp.c
+++ b/proto/bgp/bgp.c
@@ -263,7 +263,7 @@ bgp_listen_create(void *_ UNUSED)
sk->rx_hook = bgp_incoming_connection;
sk->err_hook = bgp_listen_sock_err;
- if (sk_open(sk) < 0)
+ if (sk_open(sk, &main_birdloop) < 0)
{
sk_log_error(sk, p->p.name);
log(L_ERR "%s: Cannot open listening socket", p->p.name);
@@ -1203,7 +1203,7 @@ bgp_connect(struct bgp_proto *p) /* Enter Connect state and start establishing c
bgp_setup_sk(conn, s);
bgp_conn_set_state(conn, BS_CONNECT);
- if (sk_open(s) < 0)
+ if (sk_open(s, p->p.loop) < 0)
goto err;
/* Set minimal receive TTL if needed */
diff --git a/proto/bgp/packets.c b/proto/bgp/packets.c
index 924d6828..c90798a0 100644
--- a/proto/bgp/packets.c
+++ b/proto/bgp/packets.c
@@ -3015,7 +3015,7 @@ bgp_kick_tx(void *vconn)
;
if (!max && !ev_active(conn->tx_ev))
- ev_schedule(conn->tx_ev);
+ proto_send_event(&conn->bgp->p, conn->tx_ev);
}
void
@@ -3023,13 +3023,14 @@ bgp_tx(sock *sk)
{
struct bgp_conn *conn = sk->data;
+ ASSERT_DIE(birdloop_inside(conn->bgp->p.loop));
DBG("BGP: TX hook\n");
uint max = 1024;
while (--max && (bgp_fire_tx(conn) > 0))
;
if (!max && !ev_active(conn->tx_ev))
- ev_schedule(conn->tx_ev);
+ proto_send_event(&conn->bgp->p, conn->tx_ev);
}
diff --git a/proto/ospf/iface.c b/proto/ospf/iface.c
index 1919bccb..0aa7fa00 100644
--- a/proto/ospf/iface.c
+++ b/proto/ospf/iface.c
@@ -136,7 +136,7 @@ ospf_sk_open(struct ospf_iface *ifa)
sk->flags = SKF_LADDR_RX | (ifa->check_ttl ? SKF_TTL_RX : 0);
sk->ttl = ifa->cf->ttl_security ? 255 : 1;
- if (sk_open(sk) < 0)
+ if (sk_open(sk, p->p.loop) < 0)
goto err;
/* 12 is an offset of the checksum in an OSPFv3 packet */
@@ -220,7 +220,7 @@ ospf_open_vlink_sk(struct ospf_proto *p)
sk->data = (void *) p;
sk->flags = 0;
- if (sk_open(sk) < 0)
+ if (sk_open(sk, p->p.loop) < 0)
goto err;
/* 12 is an offset of the checksum in an OSPFv3 packet */
diff --git a/proto/radv/packets.c b/proto/radv/packets.c
index 5cd8b2de..c6b565d2 100644
--- a/proto/radv/packets.c
+++ b/proto/radv/packets.c
@@ -493,7 +493,7 @@ radv_sk_open(struct radv_iface *ifa)
sk->data = ifa;
sk->flags = SKF_LADDR_RX;
- if (sk_open(sk) < 0)
+ if (sk_open(sk, ifa->ra->p.loop) < 0)
goto err;
/* We want listen just to ICMPv6 messages of type RS and RA */
diff --git a/proto/rip/packets.c b/proto/rip/packets.c
index 9c3bd7a3..fecdf896 100644
--- a/proto/rip/packets.c
+++ b/proto/rip/packets.c
@@ -1012,7 +1012,7 @@ rip_open_socket(struct rip_iface *ifa)
/* sk->rbsize and sk->tbsize are handled in rip_iface_update_buffers() */
- if (sk_open(sk) < 0)
+ if (sk_open(sk, p->p.loop) < 0)
goto err;
if (ifa->cf->mode == RIP_IM_MULTICAST)
diff --git a/proto/rpki/ssh_transport.c b/proto/rpki/ssh_transport.c
index 223afa80..425ad460 100644
--- a/proto/rpki/ssh_transport.c
+++ b/proto/rpki/ssh_transport.c
@@ -35,11 +35,9 @@ rpki_tr_ssh_open(struct rpki_tr_sock *tr)
sk->ssh->subsystem = "rpki-rtr";
sk->ssh->state = SK_SSH_CONNECT;
- if (sk_open(sk) != 0)
+ if (sk_open(sk, cache->p->p.loop) != 0)
return RPKI_TR_ERROR;
- sk_start(sk);
-
return RPKI_TR_SUCCESS;
}
diff --git a/proto/rpki/tcp_transport.c b/proto/rpki/tcp_transport.c
index 4e850c44..ebb8030f 100644
--- a/proto/rpki/tcp_transport.c
+++ b/proto/rpki/tcp_transport.c
@@ -28,11 +28,9 @@ rpki_tr_tcp_open(struct rpki_tr_sock *tr)
sk->type = SK_TCP_ACTIVE;
- if (sk_open(sk) != 0)
+ if (sk_open(sk, tr->cache->p->p.loop) != 0)
return RPKI_TR_ERROR;
- sk_start(sk);
-
return RPKI_TR_SUCCESS;
}
diff --git a/proto/rpki/transport.c b/proto/rpki/transport.c
index 4026fca4..81bd6dd8 100644
--- a/proto/rpki/transport.c
+++ b/proto/rpki/transport.c
@@ -85,7 +85,6 @@ rpki_tr_open(struct rpki_tr_sock *tr)
sk->rbsize = RPKI_RX_BUFFER_SIZE;
sk->tbsize = RPKI_TX_BUFFER_SIZE;
sk->tos = IP_PREC_INTERNET_CONTROL;
- sk->flags |= SKF_THREAD;
sk->vrf = cache->p->p.vrf;
if (ipa_zero(sk->daddr) && sk->host)
@@ -121,7 +120,6 @@ rpki_tr_close(struct rpki_tr_sock *tr)
if (tr->sk)
{
- sk_stop(tr->sk);
rfree(tr->sk);
tr->sk = NULL;
}
diff --git a/sysdep/bsd/krt-sock.c b/sysdep/bsd/krt-sock.c
index e7b21474..094268b7 100644
--- a/sysdep/bsd/krt-sock.c
+++ b/sysdep/bsd/krt-sock.c
@@ -1088,7 +1088,7 @@ krt_sock_open(pool *pool, void *data, int table_id UNUSED)
sk->fd = fd;
sk->data = data;
- if (sk_open(sk) < 0)
+ if (sk_open(sk, &main_birdloop) < 0)
bug("krt-sock: sk_open failed");
return sk;
diff --git a/sysdep/linux/netlink.c b/sysdep/linux/netlink.c
index b18cb899..e8a86ce4 100644
--- a/sysdep/linux/netlink.c
+++ b/sysdep/linux/netlink.c
@@ -2043,7 +2043,7 @@ nl_open_async(void)
sk->rx_hook = nl_async_hook;
sk->err_hook = nl_async_err_hook;
sk->fd = fd;
- if (sk_open(sk) < 0)
+ if (sk_open(sk, &main_birdloop) < 0)
bug("Netlink: sk_open failed");
}
diff --git a/sysdep/unix/io-loop.c b/sysdep/unix/io-loop.c
index df162c6e..0e2396f7 100644
--- a/sysdep/unix/io-loop.c
+++ b/sysdep/unix/io-loop.c
@@ -310,59 +310,95 @@ sockets_init(struct birdloop *loop)
loop->sock_num = 0;
}
-static void
-sockets_add(struct birdloop *loop, sock *s)
+void
+socket_changed(sock *s)
+{
+ struct birdloop *loop = s->loop;
+ ASSERT_DIE(birdloop_inside(loop));
+
+ loop->sock_changed++;
+ birdloop_ping(loop);
+}
+
+void
+birdloop_add_socket(struct birdloop *loop, sock *s)
{
+ ASSERT_DIE(birdloop_inside(loop));
+ ASSERT_DIE(!s->loop);
+
LOOP_TRACE(loop, "adding socket %p (total=%d)", s, loop->sock_num);
add_tail(&loop->sock_list, &s->n);
loop->sock_num++;
+ s->loop = loop;
s->index = -1;
- if (loop->thread)
- atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release);
- birdloop_ping(loop);
+ socket_changed(s);
}
+extern sock *stored_sock; /* mainloop hack: io_loop()'s saved position in the main loop's socket list (defined in io.c) */
+
void
-sk_start(sock *s)
+birdloop_remove_socket(struct birdloop *loop, sock *s)
{
- ASSERT_DIE(birdloop_current != &main_birdloop);
- sockets_add(birdloop_current, s);
-}
+ ASSERT_DIE(!enlisted(&s->n) == !s->loop);
-static void
-sockets_remove(struct birdloop *loop, sock *s)
-{
- if (!enlisted(&s->n))
+ if (!s->loop)
return;
+ ASSERT_DIE(birdloop_inside(loop));
+ ASSERT_DIE(s->loop == loop);
+
/* Decouple the socket from the loop at all. */
LOOP_TRACE(loop, "removing socket %p (total=%d)", s, loop->sock_num);
+ if (loop->sock_active == s)
+ loop->sock_active = sk_next(s);
+
+ if ((loop == &main_birdloop) && (s == stored_sock))
+ stored_sock = sk_next(s);
+
rem_node(&s->n);
loop->sock_num--;
- if (loop->thread)
- atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release);
+ socket_changed(s);
+
+ s->loop = NULL;
s->index = -1;
+}
- /* Close the filedescriptor. If it ever gets into the poll(), it just returns
- * POLLNVAL for this fd which then is ignored because nobody checks for
- * that result. Or some other routine opens another fd, getting this number,
- * yet also in this case poll() at worst spuriously returns and nobody checks
- * for the result in this fd. No further precaution is needed. */
- close(s->fd);
+void
+sk_reloop(sock *s, struct birdloop *loop)
+{
+ ASSERT_DIE(birdloop_inside(loop));
+ ASSERT_DIE(birdloop_inside(s->loop));
+
+ if (loop == s->loop)
+ return;
+
+ birdloop_remove_socket(s->loop, s);
+ birdloop_add_socket(loop, s);
+}
+
+void
+sk_pause_rx(struct birdloop *loop, sock *s)
+{
+ ASSERT_DIE(birdloop_inside(loop));
+ s->rx_hook = NULL;
+ socket_changed(s);
}
void
-sk_stop(sock *s)
+sk_resume_rx(struct birdloop *loop, sock *s, int (*hook)(sock *, uint))
{
- sockets_remove(birdloop_current, s);
+ ASSERT_DIE(birdloop_inside(loop));
+ ASSERT_DIE(hook);
+ s->rx_hook = hook;
+ socket_changed(s);
}
static inline uint sk_want_events(sock *s)
-{ return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); }
+{ return (s->rx_hook ? POLLIN : 0) | (sk_tx_pending(s) ? POLLOUT : 0); }
void
sockets_prepare(struct birdloop *loop, struct pfd *pfd)
@@ -392,40 +428,60 @@ sockets_prepare(struct birdloop *loop, struct pfd *pfd)
int sk_read(sock *s, int revents);
int sk_write(sock *s);
+void sk_err(sock *s, int revents);
-static void
+static int
sockets_fire(struct birdloop *loop)
{
+ if (EMPTY_LIST(loop->sock_list))
+ return 0;
+
+ int sch = 0;
+
times_update();
struct pollfd *pfd = loop->thread->pfd->pfd.data;
- sock *s; node *n, *nxt;
- WALK_LIST2_DELSAFE(s, n, nxt, loop->sock_list, n)
+ loop->sock_active = SKIP_BACK(sock, n, HEAD(loop->sock_list));
+
+ while (loop->sock_active)
{
- if (s->index < 0)
- continue;
+ sock *s = loop->sock_active;
- int rev = pfd[s->index].revents;
+ int rev;
+ if ((s->index >= 0) && (rev = pfd[s->index].revents) && !(rev & POLLNVAL))
+ {
+ int e = 1;
- if (!rev)
- continue;
+ if (rev & POLLOUT)
+ {
+ while ((s == loop->sock_active) && (e = sk_write(s)))
+ ;
- if (rev & POLLNVAL)
- bug("poll: invalid fd %d", s->fd);
+ if (s != loop->sock_active)
+ continue;
- int e = 1;
+ if (!sk_tx_pending(s))
+ sch++;
+ }
- if (rev & POLLIN)
- while (e && s->rx_hook)
- e = sk_read(s, rev);
+ if (rev & POLLIN)
+ while (e && (s == loop->sock_active) && s->rx_hook)
+ e = sk_read(s, rev);
- if (rev & POLLOUT)
- {
- atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release);
- while (e = sk_write(s))
- ;
+ if (s != loop->sock_active)
+ continue;
+
+ if (!(rev & (POLLOUT | POLLIN)) && (rev & POLLERR))
+ sk_err(s, rev);
+
+ if (s != loop->sock_active)
+ continue;
}
+
+ loop->sock_active = sk_next(s);
}
+
+ return sch;
}
/*
@@ -547,7 +603,8 @@ bird_thread_main(void *arg)
thr->meta->thread = thr;
birdloop_enter(thr->meta);
- u32 refresh_sockets = 1;
+ thr->sock_changed = 1;
+
struct pfd pfd;
BUFFER_INIT(pfd.pfd, thr->pool, 16);
BUFFER_INIT(pfd.loop, thr->pool, 16);
@@ -563,7 +620,7 @@ bird_thread_main(void *arg)
{
birdloop_enter(loop);
if (!EMPTY_LIST(loop->sock_list))
- refresh_sockets = 1;
+ thr->sock_changed = 1;
birdloop_leave(loop);
}
@@ -590,10 +647,10 @@ bird_thread_main(void *arg)
ev_run_list(&thr->priority_events);
/* Do we have to refresh sockets? */
- refresh_sockets += atomic_exchange_explicit(&thr->poll_changed, 0, memory_order_acq_rel);
-
- if (refresh_sockets)
+ if (thr->sock_changed)
{
+ thr->sock_changed = 0;
+
BUFFER_FLUSH(pfd.pfd);
BUFFER_FLUSH(pfd.loop);
@@ -608,7 +665,6 @@ bird_thread_main(void *arg)
}
ASSERT_DIE(pfd.loop.used == pfd.pfd.used);
- refresh_sockets = 0;
}
/* Nothing to do in at least 5 seconds, flush local hot page cache */
else if (timeout > 5000)
@@ -957,6 +1013,15 @@ birdloop_init(void)
static void
birdloop_stop_internal(struct birdloop *loop)
{
+ LOOP_TRACE(loop, "Stopping");
+
+ /* Block incoming pings */
+ u32 ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire);
+ while (!atomic_compare_exchange_strong_explicit(
+ &loop->thread_transition, &ltt, LTT_PING,
+ memory_order_acq_rel, memory_order_acquire))
+ ;
+
/* Flush remaining events */
ASSERT_DIE(!ev_run_list(&loop->event_list));
@@ -965,17 +1030,27 @@ birdloop_stop_internal(struct birdloop *loop)
while (t = timers_first(&loop->time))
tm_stop(t);
- /* No sockets allowed */
- ASSERT_DIE(EMPTY_LIST(loop->sock_list));
+ /* Drop sockets */
+ sock *s;
+ WALK_LIST_FIRST2(s, n, loop->sock_list)
+ birdloop_remove_socket(loop, s);
/* Unschedule from Meta */
ev_postpone(&loop->event);
tm_stop(&loop->timer);
- /* Declare loop stopped */
+ /* Remove from thread loop list */
rem_node(&loop->n);
+ loop->thread = NULL;
+
+ /* Leave the loop context without causing any other fuss */
+ ASSERT_DIE(!ev_active(&loop->event));
+ loop->ping_pending = 0;
birdloop_leave(loop);
+ /* Request local socket reload */
+ this_thread->sock_changed++;
+
/* Tail-call the stopped hook */
loop->stopped(loop->stop_data);
}
@@ -989,12 +1064,14 @@ birdloop_run(void *_loop)
struct birdloop *loop = _loop;
birdloop_enter(loop);
+ LOOP_TRACE(loop, "Regular run");
+
if (loop->stopped)
/* Birdloop left inside the helper function */
return birdloop_stop_internal(loop);
/* Process sockets */
- sockets_fire(loop);
+ this_thread->sock_changed += sockets_fire(loop);
/* Run timers */
timers_fire(&loop->time, 0);
@@ -1016,6 +1093,10 @@ birdloop_run(void *_loop)
else
tm_stop(&loop->timer);
+ /* Collect socket change requests */
+ this_thread->sock_changed += loop->sock_changed;
+ loop->sock_changed = 0;
+
birdloop_leave(loop);
}
@@ -1074,6 +1155,8 @@ birdloop_new(pool *pp, uint order, const char *name)
static void
birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data)
{
+ LOOP_TRACE(loop, "Stop requested");
+
loop->stopped = stopped;
loop->stop_data = data;
@@ -1100,8 +1183,7 @@ birdloop_stop_self(struct birdloop *loop, void (*stopped)(void *data), void *dat
void
birdloop_free(struct birdloop *loop)
{
- ASSERT_DIE(loop->links == 0);
- ASSERT_DIE(birdloop_in_this_thread(loop));
+ ASSERT_DIE(loop->thread == NULL);
domain_free(loop->time.domain);
rfree(loop->pool);
@@ -1171,20 +1253,6 @@ birdloop_unmask_wakeups(struct birdloop *loop)
}
void
-birdloop_link(struct birdloop *loop)
-{
- ASSERT_DIE(birdloop_inside(loop));
- loop->links++;
-}
-
-void
-birdloop_unlink(struct birdloop *loop)
-{
- ASSERT_DIE(birdloop_inside(loop));
- loop->links--;
-}
-
-void
birdloop_yield(void)
{
usleep(100);
diff --git a/sysdep/unix/io-loop.h b/sysdep/unix/io-loop.h
index e606f07e..7ec903af 100644
--- a/sysdep/unix/io-loop.h
+++ b/sysdep/unix/io-loop.h
@@ -22,6 +22,7 @@ struct pfd {
};
void sockets_prepare(struct birdloop *, struct pfd *);
+void socket_changed(struct birdsock *);
void pipe_new(struct pipe *);
void pipe_pollin(struct pipe *, struct pfd *);
@@ -40,12 +41,12 @@ struct birdloop
struct timeloop time;
event_list event_list;
list sock_list;
+ struct birdsock *sock_active;
int sock_num;
+ uint sock_changed;
uint ping_pending;
- uint links;
-
_Atomic u32 thread_transition;
#define LTT_PING 1
#define LTT_MOVE 2
@@ -66,8 +67,6 @@ struct bird_thread
{
node n;
- _Atomic u32 poll_changed;
-
struct pipe wakeup;
event_list priority_events;
@@ -83,6 +82,8 @@ struct bird_thread
struct pfd *pfd;
event cleanup_event;
+
+ int sock_changed;
};
#endif
diff --git a/sysdep/unix/io.c b/sysdep/unix/io.c
index 06797096..88d187a4 100644
--- a/sysdep/unix/io.c
+++ b/sysdep/unix/io.c
@@ -722,10 +722,7 @@ sk_log_error(sock *s, const char *p)
* Actual struct birdsock code
*/
-static struct birdsock *current_sock;
-static struct birdsock *stored_sock;
-
-static inline sock *
+sock *
sk_next(sock *s)
{
if (!s->n.next->next)
@@ -787,6 +784,7 @@ sk_ssh_free(sock *s)
}
#endif
+
static void
sk_free(resource *r)
{
@@ -799,18 +797,10 @@ sk_free(resource *r)
sk_ssh_free(s);
#endif
- if ((s->fd < 0) || (s->flags & SKF_THREAD))
- return;
-
- if (s == current_sock)
- current_sock = sk_next(s);
- if (s == stored_sock)
- stored_sock = sk_next(s);
-
- if (enlisted(&s->n))
- rem_node(&s->n);
+ if (s->loop)
+ birdloop_remove_socket(s->loop, s);
- if (s->type != SK_SSH && s->type != SK_SSH_ACTIVE)
+ if (s->fd >= 0 && s->type != SK_SSH && s->type != SK_SSH_ACTIVE)
close(s->fd);
s->fd = -1;
@@ -1023,12 +1013,6 @@ sk_setup(sock *s)
}
static void
-sk_insert(sock *s)
-{
- add_tail(&main_birdloop.sock_list, &s->n);
-}
-
-static void
sk_tcp_connected(sock *s)
{
sockaddr sa;
@@ -1101,10 +1085,7 @@ sk_passive_connected(sock *s, int type)
return 1;
}
- if (s->flags & SKF_PASSIVE_THREAD)
- t->flags |= SKF_THREAD;
- else
- sk_insert(t);
+ birdloop_add_socket(s->loop, t);
sk_alloc_bufs(t);
s->rx_hook(t, 0);
@@ -1319,6 +1300,7 @@ sk_open_ssh(sock *s)
/**
* sk_open - open a socket
+ * @loop: birdloop the opened socket is added to (becomes its owning loop)
* @s: socket
*
* This function takes a socket resource created by sk_new() and
@@ -1328,7 +1310,7 @@ sk_open_ssh(sock *s)
* Result: 0 for success, -1 for an error.
*/
int
-sk_open(sock *s)
+sk_open(sock *s, struct birdloop *loop)
{
int af = AF_UNSPEC;
int fd = -1;
@@ -1481,9 +1463,7 @@ sk_open(sock *s)
sk_alloc_bufs(s);
}
- if (!(s->flags & SKF_THREAD))
- sk_insert(s);
-
+ birdloop_add_socket(loop, s);
return 0;
err:
@@ -1493,7 +1473,7 @@ err:
}
int
-sk_open_unix(sock *s, char *name)
+sk_open_unix(sock *s, struct birdloop *loop, char *name)
{
struct sockaddr_un sa;
int fd;
@@ -1520,40 +1500,10 @@ sk_open_unix(sock *s, char *name)
return -1;
s->fd = fd;
- sk_insert(s);
+ birdloop_add_socket(loop, s);
return 0;
}
-static void
-sk_reloop_hook(void *_vs)
-{
- sock *s = _vs;
- if (birdloop_inside(&main_birdloop))
- {
- s->flags &= ~SKF_THREAD;
- sk_insert(s);
- }
- else
- {
- s->flags |= SKF_THREAD;
- sk_start(s);
- }
-}
-
-void
-sk_reloop(sock *s, struct birdloop *loop)
-{
- if (enlisted(&s->n))
- rem_node(&s->n);
-
- s->reloop = (event) {
- .hook = sk_reloop_hook,
- .data = s,
- };
-
- ev_send_loop(loop, &s->reloop);
-}
-
#define CMSG_RX_SPACE MAX(CMSG4_SPACE_PKTINFO+CMSG4_SPACE_TTL, \
CMSG6_SPACE_PKTINFO+CMSG6_SPACE_TTL)
@@ -1676,6 +1626,13 @@ sk_recvmsg(sock *s)
static inline void reset_tx_buffer(sock *s) { s->ttx = s->tpos = s->tbuf; }
+_Bool
+sk_tx_pending(sock *s)
+{
+ return s->ttx != s->tpos;
+}
+
+
static int
sk_maybe_write(sock *s)
{
@@ -1686,7 +1643,7 @@ sk_maybe_write(sock *s)
case SK_TCP:
case SK_MAGIC:
case SK_UNIX:
- while (s->ttx != s->tpos)
+ while (sk_tx_pending(s))
{
e = write(s->fd, s->ttx, s->tpos - s->ttx);
@@ -1708,7 +1665,7 @@ sk_maybe_write(sock *s)
#ifdef HAVE_LIBSSH
case SK_SSH:
- while (s->ttx != s->tpos)
+ while (sk_tx_pending(s))
{
e = ssh_channel_write(s->ssh->channel, s->ttx, s->tpos - s->ttx);
@@ -1791,7 +1748,12 @@ sk_send(sock *s, unsigned len)
{
s->ttx = s->tbuf;
s->tpos = s->tbuf + len;
- return sk_maybe_write(s);
+
+ int e = sk_maybe_write(s);
+ if (e == 0) /* Trigger thread poll reload to poll this socket's write. */
+ socket_changed(s);
+
+ return e;
}
/**
@@ -1838,7 +1800,7 @@ call_rx_hook(sock *s, int size)
if (s->rx_hook(s, size))
{
/* We need to be careful since the socket could have been deleted by the hook */
- if (current_sock == s)
+ if (s->loop->sock_active == s)
s->rpos = s->rbuf;
}
}
@@ -2002,7 +1964,7 @@ sk_write_noflush(sock *s)
#endif
default:
- if (s->ttx != s->tpos && sk_maybe_write(s) > 0)
+ if (sk_tx_pending(s) && sk_maybe_write(s) > 0)
{
if (s->tx_hook)
s->tx_hook(s);
@@ -2224,6 +2186,8 @@ static int short_loops = 0;
#define SHORT_LOOP_MAX 10
#define WORK_EVENTS_MAX 10
+sock *stored_sock;
+
void
io_loop(void)
{
@@ -2312,17 +2276,13 @@ io_loop(void)
times_update();
/* guaranteed to be non-empty */
- current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
+ main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
- while (current_sock)
+ while (main_birdloop.sock_active)
+ {
+ sock *s = main_birdloop.sock_active;
+ if (s->index != -1)
{
- sock *s = current_sock;
- if (s->index == -1)
- {
- current_sock = sk_next(s);
- goto next;
- }
-
int e;
int steps;
@@ -2333,10 +2293,11 @@ io_loop(void)
steps--;
io_log_event(s->rx_hook, s->data);
e = sk_read(s, pfd.pfd.data[s->index].revents);
- if (s != current_sock)
- goto next;
}
- while (e && s->rx_hook && steps);
+ while (e && (main_birdloop.sock_active == s) && s->rx_hook && steps);
+
+ if (s != main_birdloop.sock_active)
+ continue;
steps = MAX_STEPS;
if (pfd.pfd.data[s->index].revents & POLLOUT)
@@ -2345,56 +2306,54 @@ io_loop(void)
steps--;
io_log_event(s->tx_hook, s->data);
e = sk_write(s);
- if (s != current_sock)
- goto next;
}
- while (e && steps);
+ while (e && (main_birdloop.sock_active == s) && steps);
- current_sock = sk_next(s);
- next: ;
+ if (s != main_birdloop.sock_active)
+ continue;
}
+ main_birdloop.sock_active = sk_next(s);
+ }
+
short_loops++;
if (events && (short_loops < SHORT_LOOP_MAX))
continue;
short_loops = 0;
int count = 0;
- current_sock = stored_sock;
- if (current_sock == NULL)
- current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
+ main_birdloop.sock_active = stored_sock;
+ if (main_birdloop.sock_active == NULL)
+ main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
- while (current_sock && count < MAX_RX_STEPS)
+ while (main_birdloop.sock_active && count < MAX_RX_STEPS)
{
- sock *s = current_sock;
+ sock *s = main_birdloop.sock_active;
if (s->index == -1)
- {
- current_sock = sk_next(s);
- goto next2;
- }
+ goto next2;
if (!s->fast_rx && (pfd.pfd.data[s->index].revents & POLLIN) && s->rx_hook)
{
count++;
io_log_event(s->rx_hook, s->data);
sk_read(s, pfd.pfd.data[s->index].revents);
- if (s != current_sock)
- goto next2;
+ if (s != main_birdloop.sock_active)
+ continue;
}
if (pfd.pfd.data[s->index].revents & (POLLHUP | POLLERR))
{
sk_err(s, pfd.pfd.data[s->index].revents);
- if (s != current_sock)
- goto next2;
+ if (s != main_birdloop.sock_active)
+ continue;
}
- current_sock = sk_next(s);
next2: ;
+ main_birdloop.sock_active = sk_next(s);
}
- stored_sock = current_sock;
+ stored_sock = main_birdloop.sock_active;
}
}
}
diff --git a/sysdep/unix/main.c b/sysdep/unix/main.c
index a9ae842a..79db32d3 100644
--- a/sysdep/unix/main.c
+++ b/sysdep/unix/main.c
@@ -542,7 +542,7 @@ cli_init_unix(uid_t use_uid, gid_t use_gid)
/* Return value intentionally ignored */
unlink(path_control_socket);
- if (sk_open_unix(s, path_control_socket) < 0)
+ if (sk_open_unix(s, &main_birdloop, path_control_socket) < 0)
die("Cannot create control socket %s: %m", path_control_socket);
if (use_uid || use_gid)
diff --git a/sysdep/unix/unix.h b/sysdep/unix/unix.h
index e26afe41..606b79cd 100644
--- a/sysdep/unix/unix.h
+++ b/sysdep/unix/unix.h
@@ -9,6 +9,9 @@
#ifndef _BIRD_UNIX_H_
#define _BIRD_UNIX_H_
+#include "nest/bird.h"
+#include "lib/io-loop.h"
+
#include <sys/socket.h>
#include <signal.h>
@@ -110,7 +113,7 @@ extern volatile sig_atomic_t async_shutdown_flag;
void io_init(void);
void io_loop(void);
void io_log_dump(void);
-int sk_open_unix(struct birdsock *s, char *name);
+int sk_open_unix(struct birdsock *s, struct birdloop *, char *name);
struct rfile *rf_open(struct pool *, const char *name, const char *mode);
void *rf_file(struct rfile *f);
int rf_fileno(struct rfile *f);