summaryrefslogtreecommitdiff
path: root/sysdep/unix/io-loop.c
diff options
context:
space:
mode:
authorMaria Matejka <mq@ucw.cz>2023-04-02 19:15:22 +0200
committerMaria Matejka <mq@ucw.cz>2023-04-04 17:00:59 +0200
commit836e857b3098f8962c621a33f7ae5b532cf512f3 (patch)
tree96c62361fd7b2dd57cecdf9d9b9323f8aa95f021 /sysdep/unix/io-loop.c
parent571c4f69bfbcf437d848b332bb2f4995fea2347d (diff)
Sockets: Unified API for main and other loops
Now sk_open() requires an explicit IO loop to open the socket in. Also specific functions for socket RX pause / resume are added to allow for BGP corking. And last but not least, socket reloop is now synchronous to resolve weird cases of the target loop stopping before actually picking up the relooped socket. Now the caller must ensure that both loops are locked while relooping, and this way all sockets always have their respective loop.
Diffstat (limited to 'sysdep/unix/io-loop.c')
-rw-r--r--sysdep/unix/io-loop.c206
1 files changed, 137 insertions, 69 deletions
diff --git a/sysdep/unix/io-loop.c b/sysdep/unix/io-loop.c
index df162c6e..0e2396f7 100644
--- a/sysdep/unix/io-loop.c
+++ b/sysdep/unix/io-loop.c
@@ -310,59 +310,95 @@ sockets_init(struct birdloop *loop)
loop->sock_num = 0;
}
-static void
-sockets_add(struct birdloop *loop, sock *s)
+void
+socket_changed(sock *s)
+{
+ struct birdloop *loop = s->loop;
+ ASSERT_DIE(birdloop_inside(loop));
+
+ loop->sock_changed++;
+ birdloop_ping(loop);
+}
+
+void
+birdloop_add_socket(struct birdloop *loop, sock *s)
{
+ ASSERT_DIE(birdloop_inside(loop));
+ ASSERT_DIE(!s->loop);
+
LOOP_TRACE(loop, "adding socket %p (total=%d)", s, loop->sock_num);
add_tail(&loop->sock_list, &s->n);
loop->sock_num++;
+ s->loop = loop;
s->index = -1;
- if (loop->thread)
- atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release);
- birdloop_ping(loop);
+ socket_changed(s);
}
+extern sock *stored_sock; /* mainloop hack */
+
void
-sk_start(sock *s)
+birdloop_remove_socket(struct birdloop *loop, sock *s)
{
- ASSERT_DIE(birdloop_current != &main_birdloop);
- sockets_add(birdloop_current, s);
-}
+ ASSERT_DIE(!enlisted(&s->n) == !s->loop);
-static void
-sockets_remove(struct birdloop *loop, sock *s)
-{
- if (!enlisted(&s->n))
+ if (!s->loop)
return;
+ ASSERT_DIE(birdloop_inside(loop));
+ ASSERT_DIE(s->loop == loop);
+
/* Decouple the socket from the loop at all. */
LOOP_TRACE(loop, "removing socket %p (total=%d)", s, loop->sock_num);
+ if (loop->sock_active == s)
+ loop->sock_active = sk_next(s);
+
+ if ((loop == &main_birdloop) && (s == stored_sock))
+ stored_sock = sk_next(s);
+
rem_node(&s->n);
loop->sock_num--;
- if (loop->thread)
- atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release);
+ socket_changed(s);
+
+ s->loop = NULL;
s->index = -1;
+}
- /* Close the filedescriptor. If it ever gets into the poll(), it just returns
- * POLLNVAL for this fd which then is ignored because nobody checks for
- * that result. Or some other routine opens another fd, getting this number,
- * yet also in this case poll() at worst spuriously returns and nobody checks
- * for the result in this fd. No further precaution is needed. */
- close(s->fd);
+void
+sk_reloop(sock *s, struct birdloop *loop)
+{
+ ASSERT_DIE(birdloop_inside(loop));
+ ASSERT_DIE(birdloop_inside(s->loop));
+
+ if (loop == s->loop)
+ return;
+
+ birdloop_remove_socket(s->loop, s);
+ birdloop_add_socket(loop, s);
+}
+
+void
+sk_pause_rx(struct birdloop *loop, sock *s)
+{
+ ASSERT_DIE(birdloop_inside(loop));
+ s->rx_hook = NULL;
+ socket_changed(s);
}
void
-sk_stop(sock *s)
+sk_resume_rx(struct birdloop *loop, sock *s, int (*hook)(sock *, uint))
{
- sockets_remove(birdloop_current, s);
+ ASSERT_DIE(birdloop_inside(loop));
+ ASSERT_DIE(hook);
+ s->rx_hook = hook;
+ socket_changed(s);
}
static inline uint sk_want_events(sock *s)
-{ return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); }
+{ return (s->rx_hook ? POLLIN : 0) | (sk_tx_pending(s) ? POLLOUT : 0); }
void
sockets_prepare(struct birdloop *loop, struct pfd *pfd)
@@ -392,40 +428,60 @@ sockets_prepare(struct birdloop *loop, struct pfd *pfd)
int sk_read(sock *s, int revents);
int sk_write(sock *s);
+void sk_err(sock *s, int revents);
-static void
+static int
sockets_fire(struct birdloop *loop)
{
+ if (EMPTY_LIST(loop->sock_list))
+ return 0;
+
+ int sch = 0;
+
times_update();
struct pollfd *pfd = loop->thread->pfd->pfd.data;
- sock *s; node *n, *nxt;
- WALK_LIST2_DELSAFE(s, n, nxt, loop->sock_list, n)
+ loop->sock_active = SKIP_BACK(sock, n, HEAD(loop->sock_list));
+
+ while (loop->sock_active)
{
- if (s->index < 0)
- continue;
+ sock *s = loop->sock_active;
- int rev = pfd[s->index].revents;
+ int rev;
+ if ((s->index >= 0) && (rev = pfd[s->index].revents) && !(rev & POLLNVAL))
+ {
+ int e = 1;
- if (!rev)
- continue;
+ if (rev & POLLOUT)
+ {
+ while ((s == loop->sock_active) && (e = sk_write(s)))
+ ;
- if (rev & POLLNVAL)
- bug("poll: invalid fd %d", s->fd);
+ if (s != loop->sock_active)
+ continue;
- int e = 1;
+ if (!sk_tx_pending(s))
+ sch++;
+ }
- if (rev & POLLIN)
- while (e && s->rx_hook)
- e = sk_read(s, rev);
+ if (rev & POLLIN)
+ while (e && (s == loop->sock_active) && s->rx_hook)
+ e = sk_read(s, rev);
- if (rev & POLLOUT)
- {
- atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release);
- while (e = sk_write(s))
- ;
+ if (s != loop->sock_active)
+ continue;
+
+ if (!(rev & (POLLOUT | POLLIN)) && (rev & POLLERR))
+ sk_err(s, rev);
+
+ if (s != loop->sock_active)
+ continue;
}
+
+ loop->sock_active = sk_next(s);
}
+
+ return sch;
}
/*
@@ -547,7 +603,8 @@ bird_thread_main(void *arg)
thr->meta->thread = thr;
birdloop_enter(thr->meta);
- u32 refresh_sockets = 1;
+ thr->sock_changed = 1;
+
struct pfd pfd;
BUFFER_INIT(pfd.pfd, thr->pool, 16);
BUFFER_INIT(pfd.loop, thr->pool, 16);
@@ -563,7 +620,7 @@ bird_thread_main(void *arg)
{
birdloop_enter(loop);
if (!EMPTY_LIST(loop->sock_list))
- refresh_sockets = 1;
+ thr->sock_changed = 1;
birdloop_leave(loop);
}
@@ -590,10 +647,10 @@ bird_thread_main(void *arg)
ev_run_list(&thr->priority_events);
/* Do we have to refresh sockets? */
- refresh_sockets += atomic_exchange_explicit(&thr->poll_changed, 0, memory_order_acq_rel);
-
- if (refresh_sockets)
+ if (thr->sock_changed)
{
+ thr->sock_changed = 0;
+
BUFFER_FLUSH(pfd.pfd);
BUFFER_FLUSH(pfd.loop);
@@ -608,7 +665,6 @@ bird_thread_main(void *arg)
}
ASSERT_DIE(pfd.loop.used == pfd.pfd.used);
- refresh_sockets = 0;
}
/* Nothing to do in at least 5 seconds, flush local hot page cache */
else if (timeout > 5000)
@@ -957,6 +1013,15 @@ birdloop_init(void)
static void
birdloop_stop_internal(struct birdloop *loop)
{
+ LOOP_TRACE(loop, "Stopping");
+
+ /* Block incoming pings */
+ u32 ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire);
+ while (!atomic_compare_exchange_strong_explicit(
+ &loop->thread_transition, &ltt, LTT_PING,
+ memory_order_acq_rel, memory_order_acquire))
+ ;
+
/* Flush remaining events */
ASSERT_DIE(!ev_run_list(&loop->event_list));
@@ -965,17 +1030,27 @@ birdloop_stop_internal(struct birdloop *loop)
while (t = timers_first(&loop->time))
tm_stop(t);
- /* No sockets allowed */
- ASSERT_DIE(EMPTY_LIST(loop->sock_list));
+ /* Drop sockets */
+ sock *s;
+ WALK_LIST_FIRST2(s, n, loop->sock_list)
+ birdloop_remove_socket(loop, s);
/* Unschedule from Meta */
ev_postpone(&loop->event);
tm_stop(&loop->timer);
- /* Declare loop stopped */
+ /* Remove from thread loop list */
rem_node(&loop->n);
+ loop->thread = NULL;
+
+ /* Leave the loop context without causing any other fuss */
+ ASSERT_DIE(!ev_active(&loop->event));
+ loop->ping_pending = 0;
birdloop_leave(loop);
+ /* Request local socket reload */
+ this_thread->sock_changed++;
+
/* Tail-call the stopped hook */
loop->stopped(loop->stop_data);
}
@@ -989,12 +1064,14 @@ birdloop_run(void *_loop)
struct birdloop *loop = _loop;
birdloop_enter(loop);
+ LOOP_TRACE(loop, "Regular run");
+
if (loop->stopped)
/* Birdloop left inside the helper function */
return birdloop_stop_internal(loop);
/* Process sockets */
- sockets_fire(loop);
+ this_thread->sock_changed += sockets_fire(loop);
/* Run timers */
timers_fire(&loop->time, 0);
@@ -1016,6 +1093,10 @@ birdloop_run(void *_loop)
else
tm_stop(&loop->timer);
+ /* Collect socket change requests */
+ this_thread->sock_changed += loop->sock_changed;
+ loop->sock_changed = 0;
+
birdloop_leave(loop);
}
@@ -1074,6 +1155,8 @@ birdloop_new(pool *pp, uint order, const char *name)
static void
birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data)
{
+ LOOP_TRACE(loop, "Stop requested");
+
loop->stopped = stopped;
loop->stop_data = data;
@@ -1100,8 +1183,7 @@ birdloop_stop_self(struct birdloop *loop, void (*stopped)(void *data), void *dat
void
birdloop_free(struct birdloop *loop)
{
- ASSERT_DIE(loop->links == 0);
- ASSERT_DIE(birdloop_in_this_thread(loop));
+ ASSERT_DIE(loop->thread == NULL);
domain_free(loop->time.domain);
rfree(loop->pool);
@@ -1171,20 +1253,6 @@ birdloop_unmask_wakeups(struct birdloop *loop)
}
void
-birdloop_link(struct birdloop *loop)
-{
- ASSERT_DIE(birdloop_inside(loop));
- loop->links++;
-}
-
-void
-birdloop_unlink(struct birdloop *loop)
-{
- ASSERT_DIE(birdloop_inside(loop));
- loop->links--;
-}
-
-void
birdloop_yield(void)
{
usleep(100);