summaryrefslogtreecommitdiff
path: root/sysdep/unix/io-loop.c
diff options
context:
space:
mode:
Diffstat (limited to 'sysdep/unix/io-loop.c')
-rw-r--r--sysdep/unix/io-loop.c1297
1 files changed, 1004 insertions, 293 deletions
diff --git a/sysdep/unix/io-loop.c b/sysdep/unix/io-loop.c
index 9b107a1a..efb408e0 100644
--- a/sysdep/unix/io-loop.c
+++ b/sysdep/unix/io-loop.c
@@ -17,8 +17,8 @@
#include "nest/bird.h"
#include "lib/buffer.h"
-#include "lib/coro.h"
#include "lib/lists.h"
+#include "lib/locking.h"
#include "lib/resource.h"
#include "lib/event.h"
#include "lib/timer.h"
@@ -27,15 +27,54 @@
#include "lib/io-loop.h"
#include "sysdep/unix/io-loop.h"
#include "conf/conf.h"
+#include "nest/cli.h"
+
+#define THREAD_STACK_SIZE 65536 /* To be lowered in near future */
+
+static struct birdloop *birdloop_new_internal(pool *pp, uint order, const char *name, int request_pickup, struct birdloop_pickup_group *group);
+
+/*
+ * Nanosecond time for accounting purposes
+ *
+ * A fixed point on startup is set as zero, all other values are relative to that.
+ * Caution: this overflows after like 500 years or so. If you plan to run
+ * BIRD for such a long time, please implement some means of overflow prevention.
+ */
+
+static struct timespec ns_begin;
+
+static void ns_init(void)
+{
+ if (clock_gettime(CLOCK_MONOTONIC, &ns_begin))
+ bug("clock_gettime: %m");
+}
+
+static u64 ns_now(void)
+{
+ struct timespec ts;
+ if (clock_gettime(CLOCK_MONOTONIC, &ts))
+ bug("clock_gettime: %m");
+
+ return (u64) (ts.tv_sec - ns_begin.tv_sec) * 1000000000 + ts.tv_nsec - ns_begin.tv_nsec;
+}
+
/*
* Current thread context
*/
-_Thread_local struct birdloop *birdloop_current = NULL;
+_Thread_local struct birdloop *birdloop_current;
static _Thread_local struct birdloop *birdloop_wakeup_masked;
static _Thread_local uint birdloop_wakeup_masked_count;
+#define LOOP_NAME(loop) domain_name((loop)->time.domain)
+
+#define LOOP_TRACE(loop, fmt, args...) do { if (config && config->latency_debug) log(L_TRACE "%s (%p): " fmt, LOOP_NAME(loop), (loop), ##args); } while (0)
+#define THREAD_TRACE(...) do { if (config && config->latency_debug) log(L_TRACE "Thread: " __VA_ARGS__); } while (0)
+
+#define LOOP_WARN(loop, fmt, args...) log(L_TRACE "%s (%p): " fmt, LOOP_NAME(loop), (loop), ##args)
+
+
event_list *
birdloop_event_list(struct birdloop *loop)
{
@@ -48,12 +87,6 @@ birdloop_time_loop(struct birdloop *loop)
return &loop->time;
}
-pool *
-birdloop_pool(struct birdloop *loop)
-{
- return loop->pool;
-}
-
_Bool
birdloop_inside(struct birdloop *loop)
{
@@ -64,91 +97,210 @@ birdloop_inside(struct birdloop *loop)
return 0;
}
+_Bool
+birdloop_in_this_thread(struct birdloop *loop)
+{
+ return pthread_equal(pthread_self(), loop->thread->thread_id);
+}
+
+void
+birdloop_flag(struct birdloop *loop, u32 flag)
+{
+ atomic_fetch_or_explicit(&loop->flags, flag, memory_order_acq_rel);
+ birdloop_ping(loop);
+}
+
+void
+birdloop_flag_set_handler(struct birdloop *loop, struct birdloop_flag_handler *fh)
+{
+ ASSERT_DIE(birdloop_inside(loop));
+ loop->flag_handler = fh;
+}
+
+static int
+birdloop_process_flags(struct birdloop *loop)
+{
+ if (!loop->flag_handler)
+ return 0;
+
+ u32 flags = atomic_exchange_explicit(&loop->flags, 0, memory_order_acq_rel);
+ if (!flags)
+ return 0;
+
+ loop->flag_handler->hook(loop->flag_handler, flags);
+ return 1;
+}
+
/*
* Wakeup code for birdloop
*/
-static void
-pipe_new(int *pfds)
+void
+pipe_new(struct pipe *p)
{
- int rv = pipe(pfds);
+ int rv = pipe(p->fd);
if (rv < 0)
die("pipe: %m");
- if (fcntl(pfds[0], F_SETFL, O_NONBLOCK) < 0)
+ if (fcntl(p->fd[0], F_SETFL, O_NONBLOCK) < 0)
die("fcntl(O_NONBLOCK): %m");
- if (fcntl(pfds[1], F_SETFL, O_NONBLOCK) < 0)
+ if (fcntl(p->fd[1], F_SETFL, O_NONBLOCK) < 0)
die("fcntl(O_NONBLOCK): %m");
}
void
-pipe_drain(int fd)
+pipe_drain(struct pipe *p)
{
- char buf[64];
- int rv;
-
- try:
- rv = read(fd, buf, 64);
- if (rv < 0)
- {
- if (errno == EINTR)
- goto try;
- if (errno == EAGAIN)
+ while (1) {
+ char buf[64];
+ int rv = read(p->fd[0], buf, sizeof(buf));
+ if ((rv < 0) && (errno == EAGAIN))
return;
- die("wakeup read: %m");
+
+ if (rv == 0)
+ bug("wakeup read eof");
+ if ((rv < 0) && (errno != EINTR))
+ bug("wakeup read: %m");
+ }
+}
+
+int
+pipe_read_one(struct pipe *p)
+{
+ while (1) {
+ char v;
+ int rv = read(p->fd[0], &v, sizeof(v));
+ if (rv == 1)
+ return 1;
+ if ((rv < 0) && (errno == EAGAIN))
+ return 0;
+ if (rv > 1)
+ bug("wakeup read more bytes than expected: %d", rv);
+ if (rv == 0)
+ bug("wakeup read eof");
+ if (errno != EINTR)
+ bug("wakeup read: %m");
}
- if (rv == 64)
- goto try;
}
void
-pipe_kick(int fd)
+pipe_kick(struct pipe *p)
{
- u64 v = 1;
+ char v = 1;
int rv;
- try:
- rv = write(fd, &v, sizeof(u64));
- if (rv < 0)
- {
- if (errno == EINTR)
- goto try;
- if (errno == EAGAIN)
+ while (1) {
+ rv = write(p->fd[1], &v, sizeof(v));
+ if ((rv >= 0) || (errno == EAGAIN))
return;
- die("wakeup write: %m");
+ if (errno != EINTR)
+ bug("wakeup write: %m");
}
}
+void
+pipe_pollin(struct pipe *p, struct pfd *pfd)
+{
+ BUFFER_PUSH(pfd->pfd) = (struct pollfd) {
+ .fd = p->fd[0],
+ .events = POLLIN,
+ };
+ BUFFER_PUSH(pfd->loop) = NULL;
+}
+
static inline void
-wakeup_init(struct birdloop *loop)
+wakeup_init(struct bird_thread *loop)
{
- pipe_new(loop->wakeup_fds);
+ pipe_new(&loop->wakeup);
}
static inline void
-wakeup_drain(struct birdloop *loop)
+wakeup_drain(struct bird_thread *loop)
{
- pipe_drain(loop->wakeup_fds[0]);
+ pipe_drain(&loop->wakeup);
}
static inline void
-wakeup_do_kick(struct birdloop *loop)
+wakeup_do_kick(struct bird_thread *loop)
{
- pipe_kick(loop->wakeup_fds[1]);
+ pipe_kick(&loop->wakeup);
}
-void
-birdloop_ping(struct birdloop *loop)
+static inline _Bool
+birdloop_try_ping(struct birdloop *loop, u32 ltt)
{
- u32 ping_sent = atomic_fetch_add_explicit(&loop->ping_sent, 1, memory_order_acq_rel);
- if (ping_sent)
- return;
+ /* Somebody else is already pinging, be idempotent */
+ if (ltt & LTT_PING)
+ {
+ LOOP_TRACE(loop, "already being pinged");
+ return 0;
+ }
+
+ /* Thread moving is an implicit ping */
+ if (ltt & LTT_MOVE)
+ {
+ LOOP_TRACE(loop, "ping while moving");
+ return 1;
+ }
+ /* No more flags allowed */
+ ASSERT_DIE(!ltt);
+
+ /* No ping when not picked up */
+ if (!loop->thread)
+ {
+ LOOP_TRACE(loop, "not picked up yet, can't ping");
+ return 1;
+ }
+
+ /* No ping when masked */
if (loop == birdloop_wakeup_masked)
+ {
+ LOOP_TRACE(loop, "wakeup masked, can't ping");
birdloop_wakeup_masked_count++;
+ return 1;
+ }
+
+ /* Send meta event to ping */
+ if ((loop != loop->thread->meta) && (loop != &main_birdloop))
+ {
+ LOOP_TRACE(loop, "Ping by meta event to %p", loop->thread->meta);
+ ev_send_loop(loop->thread->meta, &loop->event);
+ return 1;
+ }
+
+ /* Do the real ping */
+ LOOP_TRACE(loop, "sending pipe ping");
+ wakeup_do_kick(loop->thread);
+ return 0;
+}
+
+static inline void
+birdloop_do_ping(struct birdloop *loop)
+{
+ /* Register our ping effort */
+ u32 ltt = atomic_fetch_or_explicit(&loop->thread_transition, LTT_PING, memory_order_acq_rel);
+
+ /* Try to ping in multiple ways */
+ if (birdloop_try_ping(loop, ltt))
+ atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_PING, memory_order_acq_rel);
+}
+
+void
+birdloop_ping(struct birdloop *loop)
+{
+ if (!birdloop_inside(loop))
+ {
+ LOOP_TRACE(loop, "ping from outside");
+ birdloop_do_ping(loop);
+ }
else
- wakeup_do_kick(loop);
+ {
+ LOOP_TRACE(loop, "ping from inside, pending=%d", loop->ping_pending);
+ if (!loop->ping_pending)
+ loop->ping_pending++;
+ }
}
@@ -161,204 +313,748 @@ sockets_init(struct birdloop *loop)
{
init_list(&loop->sock_list);
loop->sock_num = 0;
+}
+
+void
+socket_changed(sock *s)
+{
+ struct birdloop *loop = s->loop;
+ ASSERT_DIE(birdloop_inside(loop));
- BUFFER_INIT(loop->poll_sk, loop->pool, 4);
- BUFFER_INIT(loop->poll_fd, loop->pool, 4);
- loop->poll_changed = 1; /* add wakeup fd */
+ loop->sock_changed++;
+ birdloop_ping(loop);
}
-static void
-sockets_add(struct birdloop *loop, sock *s)
+void
+birdloop_add_socket(struct birdloop *loop, sock *s)
{
- ASSERT_DIE(!enlisted(&s->n));
+ ASSERT_DIE(birdloop_inside(loop));
+ ASSERT_DIE(!s->loop);
+ LOOP_TRACE(loop, "adding socket %p (total=%d)", s, loop->sock_num);
add_tail(&loop->sock_list, &s->n);
loop->sock_num++;
s->loop = loop;
s->index = -1;
- loop->poll_changed = 1;
- birdloop_ping(loop);
+ socket_changed(s);
}
+extern sock *stored_sock; /* mainloop hack */
+
void
-sk_start(sock *s)
+birdloop_remove_socket(struct birdloop *loop, sock *s)
{
- ASSERT_DIE(birdloop_current != &main_birdloop);
- sockets_add(birdloop_current, s);
-}
+ ASSERT_DIE(!enlisted(&s->n) == !s->loop);
-static void
-sockets_remove(struct birdloop *loop, sock *s)
-{
+ if (!s->loop)
+ return;
+
+ ASSERT_DIE(birdloop_inside(loop));
ASSERT_DIE(s->loop == loop);
- if (!enlisted(&s->n))
- return;
+ /* Decouple the socket from the loop at all. */
+ LOOP_TRACE(loop, "removing socket %p (total=%d)", s, loop->sock_num);
+
+ if (loop->sock_active == s)
+ loop->sock_active = sk_next(s);
+
+ if ((loop == &main_birdloop) && (s == stored_sock))
+ stored_sock = sk_next(s);
rem_node(&s->n);
loop->sock_num--;
- if (s->index >= 0)
- {
- loop->poll_sk.data[s->index] = NULL;
- s->index = -1;
- loop->poll_changed = 1;
- birdloop_ping(loop);
- }
+ socket_changed(s);
s->loop = NULL;
+ s->index = -1;
}
void
-sk_stop(sock *s)
+sk_reloop(sock *s, struct birdloop *loop)
{
- sockets_remove(birdloop_current, s);
+ ASSERT_DIE(birdloop_inside(loop));
+ ASSERT_DIE(birdloop_inside(s->loop));
+
+ if (loop == s->loop)
+ return;
+
+ birdloop_remove_socket(s->loop, s);
+ birdloop_add_socket(loop, s);
+}
+
+void
+sk_pause_rx(struct birdloop *loop, sock *s)
+{
+ ASSERT_DIE(birdloop_inside(loop));
+ s->rx_hook = NULL;
+ socket_changed(s);
+}
+
+void
+sk_resume_rx(struct birdloop *loop, sock *s, int (*hook)(sock *, uint))
+{
+ ASSERT_DIE(birdloop_inside(loop));
+ ASSERT_DIE(hook);
+ s->rx_hook = hook;
+ socket_changed(s);
}
static inline uint sk_want_events(sock *s)
+{ return (s->rx_hook ? POLLIN : 0) | (sk_tx_pending(s) ? POLLOUT : 0); }
+
+void
+sockets_prepare(struct birdloop *loop, struct pfd *pfd)
{
- uint out = ((s->ttx != s->tpos) ? POLLOUT : 0);
- if (s->rx_hook)
- if (s->cork)
+ node *n;
+ WALK_LIST(n, loop->sock_list)
+ {
+ sock *s = SKIP_BACK(sock, n, n);
+ uint w = sk_want_events(s);
+
+ if (!w)
{
- LOCK_DOMAIN(cork, s->cork->lock);
- if (!enlisted(&s->cork_node))
- if (s->cork->count)
- {
-// log(L_TRACE "Socket %p corked", s);
- add_tail(&s->cork->sockets, &s->cork_node);
- }
- else
- out |= POLLIN;
- UNLOCK_DOMAIN(cork, s->cork->lock);
+ s->index = -1;
+ continue;
}
- else
- out |= POLLIN;
-// log(L_TRACE "sk_want_events(%p) = %x", s, out);
- return out;
+ s->index = pfd->pfd.used;
+ LOOP_TRACE(loop, "socket %p poll index is %d", s, s->index);
+
+ BUFFER_PUSH(pfd->pfd) = (struct pollfd) {
+ .fd = s->fd,
+ .events = sk_want_events(s),
+ };
+ BUFFER_PUSH(pfd->loop) = loop;
+ }
}
+int sk_read(sock *s, int revents);
+int sk_write(sock *s);
+void sk_err(sock *s, int revents);
-void
-sk_ping(sock *s)
+static int
+sockets_fire(struct birdloop *loop)
{
- s->loop->poll_changed = 1;
- birdloop_ping(s->loop);
+ if (EMPTY_LIST(loop->sock_list))
+ return 0;
+
+ int repeat = 0;
+
+ times_update();
+
+ struct pollfd *pfd = loop->thread->pfd->pfd.data;
+ loop->sock_active = SKIP_BACK(sock, n, HEAD(loop->sock_list));
+
+ while (loop->sock_active)
+ {
+ sock *s = loop->sock_active;
+
+ int rev;
+ if ((s->index >= 0) && (rev = pfd[s->index].revents) && !(rev & POLLNVAL))
+ {
+ int e = 1;
+
+ if (rev & POLLOUT)
+ {
+ /* Write everything. */
+ while ((s == loop->sock_active) && (e = sk_write(s)))
+ ;
+
+ if (s != loop->sock_active)
+ continue;
+
+ if (!sk_tx_pending(s))
+ loop->thread->sock_changed++;
+ }
+
+ if (rev & POLLIN)
+ /* Read just one packet and request repeat. */
+ if ((s == loop->sock_active) && s->rx_hook)
+ if (sk_read(s, rev))
+ repeat++;
+
+ if (s != loop->sock_active)
+ continue;
+
+ if (!(rev & (POLLOUT | POLLIN)) && (rev & POLLERR))
+ sk_err(s, rev);
+
+ if (s != loop->sock_active)
+ continue;
+ }
+
+ loop->sock_active = sk_next(s);
+ }
+
+ return repeat;
}
/*
-FIXME: this should be called from sock code
+ * Threads
+ */
+
+DEFINE_DOMAIN(resource);
+
+struct birdloop_pickup_group {
+ DOMAIN(resource) domain;
+ list loops;
+ list threads;
+ btime max_latency;
+} pickup_groups[2] = {
+ {
+ /* all zeroes */
+ },
+ {
+ /* FIXME: make this dynamic, now it copies the loop_max_latency value from proto/bfd/config.Y */
+ .max_latency = 10 MS,
+ },
+};
+
+static _Thread_local struct bird_thread *this_thread;
static void
-sockets_update(struct birdloop *loop, sock *s)
+birdloop_set_thread(struct birdloop *loop, struct bird_thread *thr, struct birdloop_pickup_group *group)
+{
+ struct bird_thread *old = loop->thread;
+ ASSERT_DIE(!thr != !old);
+
+ /* Signal our moving effort */
+ u32 ltt = atomic_fetch_or_explicit(&loop->thread_transition, LTT_MOVE, memory_order_acq_rel);
+ ASSERT_DIE((ltt & LTT_MOVE) == 0);
+
+ while (ltt & LTT_PING)
+ {
+ birdloop_yield();
+ ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire);
+ ASSERT_DIE(ltt & LTT_MOVE);
+ }
+ /* Now we are free of running pings */
+
+ if (loop->thread = thr)
+ {
+ add_tail(&thr->loops, &loop->n);
+ thr->loop_count++;
+ }
+ else
+ {
+ old->loop_count--;
+
+ LOCK_DOMAIN(resource, group->domain);
+ add_tail(&group->loops, &loop->n);
+ UNLOCK_DOMAIN(resource, group->domain);
+ }
+
+ /* Finished */
+ atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_MOVE, memory_order_acq_rel);
+
+ /* Request to run by force */
+ ev_send_loop(loop->thread->meta, &loop->event);
+}
+
+static struct birdloop *
+birdloop_take(struct birdloop_pickup_group *group)
{
- if (s->index >= 0)
- loop->poll_fd.data[s->index].events = sk_want_events(s);
+ struct birdloop *loop = NULL;
+
+ LOCK_DOMAIN(resource, group->domain);
+ if (!EMPTY_LIST(group->loops))
+ {
+ /* Take the first loop from the pickup list and unlock */
+ loop = SKIP_BACK(struct birdloop, n, HEAD(group->loops));
+ rem_node(&loop->n);
+ UNLOCK_DOMAIN(resource, group->domain);
+
+ birdloop_set_thread(loop, this_thread, group);
+
+ /* This thread goes to the end of the pickup list */
+ LOCK_DOMAIN(resource, group->domain);
+ rem_node(&this_thread->n);
+ add_tail(&group->threads, &this_thread->n);
+
+ /* If there are more loops to be picked up, wakeup the next thread in order */
+ if (!EMPTY_LIST(group->loops))
+ wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(group->threads)));
+ }
+ UNLOCK_DOMAIN(resource, group->domain);
+
+ return loop;
}
-*/
static void
-sockets_prepare(struct birdloop *loop)
+birdloop_drop(struct birdloop *loop, struct birdloop_pickup_group *group)
{
- BUFFER_SET(loop->poll_sk, loop->sock_num + 1);
- BUFFER_SET(loop->poll_fd, loop->sock_num + 1);
+ /* Remove loop from this thread's list */
+ rem_node(&loop->n);
- struct pollfd *pfd = loop->poll_fd.data;
- sock **psk = loop->poll_sk.data;
- uint i = 0;
- node *n;
+ /* Unset loop's thread */
+ if (birdloop_inside(loop))
+ birdloop_set_thread(loop, NULL, group);
+ else
+ {
+ birdloop_enter(loop);
+ birdloop_set_thread(loop, NULL, group);
+ birdloop_leave(loop);
+ }
- WALK_LIST(n, loop->sock_list)
+ /* Put loop into pickup list */
+ LOCK_DOMAIN(resource, group->domain);
+ add_tail(&group->loops, &loop->n);
+ UNLOCK_DOMAIN(resource, group->domain);
+}
+
+static int
+poll_timeout(struct birdloop *loop)
+{
+ timer *t = timers_first(&loop->time);
+ if (!t)
+ return -1;
+
+ btime remains = tm_remains(t);
+ return remains TO_MS + ((remains TO_MS) MS < remains);
+}
+
+static void *
+bird_thread_main(void *arg)
+{
+ struct bird_thread *thr = this_thread = arg;
+
+ rcu_thread_start(&thr->rcu);
+ synchronize_rcu();
+
+ tmp_init(thr->pool);
+ init_list(&thr->loops);
+
+ thr->meta = birdloop_new_internal(thr->pool, DOMAIN_ORDER(meta), "Thread Meta", 0, thr->group);
+ thr->meta->thread = thr;
+ birdloop_enter(thr->meta);
+
+ thr->sock_changed = 1;
+
+ struct pfd pfd;
+ BUFFER_INIT(pfd.pfd, thr->pool, 16);
+ BUFFER_INIT(pfd.loop, thr->pool, 16);
+ thr->pfd = &pfd;
+
+ while (1)
{
- sock *s = SKIP_BACK(sock, n, n);
+ u64 thr_loop_start = ns_now();
+ int timeout;
+
+ /* Pickup new loops */
+ struct birdloop *loop = birdloop_take(thr->group);
+ if (loop)
+ {
+ birdloop_enter(loop);
+ if (!EMPTY_LIST(loop->sock_list))
+ thr->sock_changed = 1;
+ birdloop_leave(loop);
+ }
+
+ /* Schedule all loops with timed out timers */
+ timers_fire(&thr->meta->time, 0);
+
+ /* Compute maximal time per loop */
+ u64 thr_before_run = ns_now();
+ if (thr->loop_count > 0)
+ thr->max_loop_time_ns = (thr->max_latency_ns / 2 - (thr_before_run - thr_loop_start)) / (u64) thr->loop_count;
+
+ /* Run all scheduled loops */
+ int more_events = ev_run_list(&thr->meta->event_list);
+ if (more_events)
+ {
+ THREAD_TRACE("More events to run");
+ timeout = 0;
+ }
+ else
+ {
+ timeout = poll_timeout(thr->meta);
+ if (timeout == -1)
+ THREAD_TRACE("No timers, no events");
+ else
+ THREAD_TRACE("Next timer in %d ms", timeout);
+ }
+
+ /* Run priority events before sleeping */
+ ev_run_list(&thr->priority_events);
+
+ /* Do we have to refresh sockets? */
+ if (thr->sock_changed)
+ {
+ thr->sock_changed = 0;
+
+ BUFFER_FLUSH(pfd.pfd);
+ BUFFER_FLUSH(pfd.loop);
+
+ pipe_pollin(&thr->wakeup, &pfd);
+
+ node *nn;
+ WALK_LIST2(loop, nn, thr->loops, n)
+ {
+ birdloop_enter(loop);
+ sockets_prepare(loop, &pfd);
+ birdloop_leave(loop);
+ }
+
+ ASSERT_DIE(pfd.loop.used == pfd.pfd.used);
+ }
+ /* Nothing to do in at least 5 seconds, flush local hot page cache */
+ else if (timeout > 5000)
+ flush_local_pages();
+
+poll_retry:;
+ int rv = poll(pfd.pfd.data, pfd.pfd.used, timeout);
+ if (rv < 0)
+ {
+ if (errno == EINTR || errno == EAGAIN)
+ goto poll_retry;
+ bug("poll in %p: %m", thr);
+ }
- ASSERT(i < loop->sock_num);
+ /* Drain wakeup fd */
+ if (pfd.pfd.data[0].revents & POLLIN)
+ {
+ ASSERT_DIE(rv > 0);
+ rv--;
+ wakeup_drain(thr);
+ }
- s->index = i;
- *psk = s;
- pfd->fd = s->fd;
- pfd->events = sk_want_events(s);
- pfd->revents = 0;
+ atomic_fetch_and_explicit(&thr->meta->thread_transition, ~LTT_PING, memory_order_acq_rel);
- pfd++;
- psk++;
- i++;
+ /* Schedule loops with active sockets */
+ if (rv)
+ for (uint i = 1; i < pfd.pfd.used; i++)
+ if (pfd.pfd.data[i].revents)
+ {
+ LOOP_TRACE(pfd.loop.data[i], "socket id %d got revents=%d", i, pfd.pfd.data[i].revents);
+ ev_send_loop(thr->meta, &pfd.loop.data[i]->event);
+ }
}
- ASSERT(i == loop->sock_num);
+ bug("An infinite loop has ended.");
+}
+
+static void
+bird_thread_cleanup(void *_thr)
+{
+ struct bird_thread *thr = _thr;
+ ASSERT_DIE(birdloop_inside(&main_birdloop));
+
+ /* Thread attributes no longer needed */
+ pthread_attr_destroy(&thr->thread_attr);
+
+ /* Free all remaining memory */
+ rfree(thr->pool);
+}
+
+static struct bird_thread *
+bird_thread_start(struct birdloop_pickup_group *group)
+{
+ ASSERT_DIE(birdloop_inside(&main_birdloop));
+ ASSERT_DIE(DOMAIN_IS_LOCKED(resource, group->domain));
+
+ pool *p = rp_new(&root_pool, "Thread");
+
+ struct bird_thread *thr = mb_allocz(p, sizeof(*thr));
+ thr->pool = p;
+ thr->cleanup_event = (event) { .hook = bird_thread_cleanup, .data = thr, };
+ thr->group = group;
+ thr->max_latency_ns = (group->max_latency ?: 5 S) TO_NS;
+
+ wakeup_init(thr);
+ ev_init_list(&thr->priority_events, NULL, "Thread direct event list");
- /* Add internal wakeup fd */
- *psk = NULL;
- pfd->fd = loop->wakeup_fds[0];
- pfd->events = POLLIN;
- pfd->revents = 0;
+ add_tail(&group->threads, &thr->n);
- loop->poll_changed = 0;
+ int e = 0;
+
+ if (e = pthread_attr_init(&thr->thread_attr))
+ die("pthread_attr_init() failed: %M", e);
+
+ /* We don't have to worry about thread stack size so much.
+ if (e = pthread_attr_setstacksize(&thr->thread_attr, THREAD_STACK_SIZE))
+ die("pthread_attr_setstacksize(%u) failed: %M", THREAD_STACK_SIZE, e);
+ */
+
+ if (e = pthread_attr_setdetachstate(&thr->thread_attr, PTHREAD_CREATE_DETACHED))
+ die("pthread_attr_setdetachstate(PTHREAD_CREATE_DETACHED) failed: %M", e);
+
+ if (e = pthread_create(&thr->thread_id, &thr->thread_attr, bird_thread_main, thr))
+ die("pthread_create() failed: %M", e);
+
+ return thr;
}
-int sk_read(sock *s, int revents);
-int sk_write(sock *s);
+static struct birdloop *thread_dropper;
+static event *thread_dropper_event;
+static uint thread_dropper_goal;
static void
-sockets_fire(struct birdloop *loop)
+bird_thread_shutdown(void * _ UNUSED)
{
- struct pollfd *pfd = loop->poll_fd.data;
- sock **psk = loop->poll_sk.data;
- int poll_num = loop->poll_fd.used - 1;
+ struct birdloop_pickup_group *group = this_thread->group;
+ LOCK_DOMAIN(resource, group->domain);
+ int dif = list_length(&group->threads) - thread_dropper_goal;
+ struct birdloop *tdl_stop = NULL;
- times_update();
+ if (dif > 0)
+ ev_send_loop(thread_dropper, thread_dropper_event);
+ else
+ {
+ tdl_stop = thread_dropper;
+ thread_dropper = NULL;
+ }
+
+ UNLOCK_DOMAIN(resource, group->domain);
- /* Last fd is internal wakeup fd */
- if (pfd[poll_num].revents & POLLIN)
+ DBG("Thread pickup size differs from dropper goal by %d%s\n", dif, tdl_stop ? ", stopping" : "");
+
+ if (tdl_stop)
{
- wakeup_drain(loop);
- loop->poll_changed = 1;
+ birdloop_stop_self(tdl_stop, NULL, NULL);
+ return;
}
- int i;
- for (i = 0; i < poll_num; pfd++, psk++, i++)
+ struct bird_thread *thr = this_thread;
+
+ /* Leave the thread-picker list to get no more loops */
+ LOCK_DOMAIN(resource, group->domain);
+ rem_node(&thr->n);
+ UNLOCK_DOMAIN(resource, group->domain);
+
+ /* Drop loops including the thread dropper itself */
+ while (!EMPTY_LIST(thr->loops))
+ birdloop_drop(HEAD(thr->loops), group);
+
+ /* Let others know about new loops */
+ if (!EMPTY_LIST(group->loops))
+ wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(group->threads)));
+ UNLOCK_DOMAIN(resource, group->domain);
+
+ /* Leave the thread-dropper loop as we aren't going to return. */
+ birdloop_leave(thread_dropper);
+
+ /* Stop the meta loop */
+ birdloop_leave(thr->meta);
+ domain_free(thr->meta->time.domain);
+ rfree(thr->meta->pool);
+
+ /* Local pages not needed anymore */
+ flush_local_pages();
+
+ /* Unregister from RCU */
+ rcu_thread_stop(&thr->rcu);
+
+ /* Request thread cleanup from main loop */
+ ev_send_loop(&main_birdloop, &thr->cleanup_event);
+
+ /* Exit! */
+ pthread_exit(NULL);
+}
+
+
+void
+bird_thread_commit(struct config *new, struct config *old UNUSED)
+{
+ ASSERT_DIE(birdloop_inside(&main_birdloop));
+
+ if (new->shutdown)
+ return;
+
+ if (!new->thread_count)
+ new->thread_count = 1;
+
+ while (1)
{
- if (!*psk)
- continue;
+ struct birdloop_pickup_group *group = &pickup_groups[0];
+ LOCK_DOMAIN(resource, group->domain);
+
+ int dif = list_length(&group->threads) - (thread_dropper_goal = new->thread_count);
+ _Bool thread_dropper_running = !!thread_dropper;
- if (! pfd->revents)
+ if (dif < 0)
+ {
+ bird_thread_start(group);
+ UNLOCK_DOMAIN(resource, group->domain);
continue;
+ }
+
+ UNLOCK_DOMAIN(resource, group->domain);
+
+ if ((dif > 0) && !thread_dropper_running)
+ {
+ struct birdloop *tdl = birdloop_new(&root_pool, DOMAIN_ORDER(control), "Thread dropper", group->max_latency);
+ event *tde = ev_new_init(tdl->pool, bird_thread_shutdown, NULL);
+
+ LOCK_DOMAIN(resource, group->domain);
+ thread_dropper = tdl;
+ thread_dropper_event = tde;
+ UNLOCK_DOMAIN(resource, group->domain);
+
+ ev_send_loop(thread_dropper, thread_dropper_event);
+ }
+
+ return;
+ }
+}
+
+
+DEFINE_DOMAIN(control);
+
+struct bird_thread_show_data {
+ cli *cli;
+ pool *pool;
+ DOMAIN(control) lock;
+ uint total;
+ uint done;
+ u8 show_loops;
+};
- if (pfd->revents & POLLNVAL)
- bug("poll: invalid fd %d", pfd->fd);
+static void
+bird_thread_show_cli_cont(struct cli *c UNUSED)
+{
+ /* Explicitly do nothing to prevent CLI from trying to parse another command. */
+}
+
+static int
+bird_thread_show_cli_cleanup(struct cli *c UNUSED)
+{
+ return 1; /* Defer the cleanup until the writeout is finished. */
+}
- int e = 1;
+static void
+bird_thread_show(void *data)
+{
+ struct bird_thread_show_data *tsd = data;
- if (pfd->revents & POLLIN)
- while (e && *psk && (*psk)->rx_hook)
- e = sk_read(*psk, pfd->revents);
+ LOCK_DOMAIN(control, tsd->lock);
+ if (tsd->show_loops)
+ cli_printf(tsd->cli, -1026, "Thread %p", this_thread);
+
+ u64 total_time_ns = 0;
+ struct birdloop *loop;
+ WALK_LIST(loop, this_thread->loops)
+ {
+ if (tsd->show_loops)
+ cli_printf(tsd->cli, -1026, " Loop %s time: %t", domain_name(loop->time.domain), loop->total_time_spent_ns NS);
+ total_time_ns += loop->total_time_spent_ns;
+ }
+
+ tsd->done++;
+ int last = (tsd->done == tsd->total);
+
+ if (last)
+ {
+ tsd->cli->cont = NULL;
+ tsd->cli->cleanup = NULL;
+ }
- e = 1;
- if (pfd->revents & POLLOUT)
+ if (tsd->show_loops)
+ cli_printf(tsd->cli, (last ? 1 : -1) * 1026, " Total time: %t", total_time_ns NS);
+ else
+ cli_printf(tsd->cli, (last ? 1 : -1) * 1026, "Thread %p time %t", this_thread, total_time_ns NS);
+
+ UNLOCK_DOMAIN(control, tsd->lock);
+
+ if (last)
+ {
+ the_bird_lock();
+
+ for (int i=0; i<2; i++)
{
- loop->poll_changed = 1;
- while (e && *psk)
- e = sk_write(*psk);
+ struct birdloop_pickup_group *group = &pickup_groups[i];
+
+ LOCK_DOMAIN(resource, group->domain);
+ if (!EMPTY_LIST(group->loops))
+ if (tsd->show_loops)
+ {
+ cli_printf(tsd->cli, -1026, "Unassigned loops");
+ WALK_LIST(loop, group->loops)
+ cli_printf(tsd->cli, -1026, " Loop %s time: %t", domain_name(loop->time.domain), loop->total_time_spent_ns NS);
+ }
+ else
+ {
+ uint count = 0;
+ u64 total_time_ns = 0;
+ WALK_LIST(loop, group->loops)
+ {
+ count++;
+ total_time_ns += loop->total_time_spent_ns;
+ }
+ cli_printf(tsd->cli, -1026, "Unassigned loops: %d, total time %t", count, total_time_ns NS);
+ }
+ UNLOCK_DOMAIN(resource, group->domain);
}
+
+ cli_write_trigger(tsd->cli);
+ DOMAIN_FREE(control, tsd->lock);
+ rfree(tsd->pool);
+
+ the_bird_unlock();
}
}
+void
+cmd_show_threads(int show_loops)
+{
+ pool *p = rp_new(&root_pool, "Show Threads");
+
+ struct bird_thread_show_data *tsd = mb_allocz(p, sizeof(struct bird_thread_show_data));
+ tsd->lock = DOMAIN_NEW(control, "Show Threads");
+ tsd->cli = this_cli;
+ tsd->pool = p;
+ tsd->show_loops = show_loops;
+
+ this_cli->cont = bird_thread_show_cli_cont;
+ this_cli->cleanup = bird_thread_show_cli_cleanup;
+
+ for (int i=0; i<2; i++)
+ {
+ struct birdloop_pickup_group *group = &pickup_groups[i];
+
+ LOCK_DOMAIN(control, tsd->lock);
+ LOCK_DOMAIN(resource, group->domain);
+
+ struct bird_thread *thr;
+ WALK_LIST(thr, group->threads)
+ {
+ tsd->total++;
+ ev_send(&thr->priority_events, ev_new_init(p, bird_thread_show, tsd));
+ wakeup_do_kick(thr);
+ }
+
+ UNLOCK_DOMAIN(resource, group->domain);
+ UNLOCK_DOMAIN(control, tsd->lock);
+ }
+}
+
/*
* Birdloop
*/
-struct birdloop main_birdloop;
+static struct bird_thread main_thread;
+struct birdloop main_birdloop = { .thread = &main_thread, };
static void birdloop_enter_locked(struct birdloop *loop);
void
birdloop_init(void)
{
- wakeup_init(&main_birdloop);
+ ns_init();
+
+ for (int i=0; i<2; i++)
+ {
+ struct birdloop_pickup_group *group = &pickup_groups[i];
+
+ group->domain = DOMAIN_NEW(resource, "Loop Pickup");
+ init_list(&group->loops);
+ init_list(&group->threads);
+ }
+
+ wakeup_init(main_birdloop.thread);
main_birdloop.time.domain = the_bird_domain.the_bird;
main_birdloop.time.loop = &main_birdloop;
@@ -366,85 +1062,177 @@ birdloop_init(void)
times_update();
timers_init(&main_birdloop.time, &root_pool);
- root_pool.loop = &main_birdloop;
- main_birdloop.pool = &root_pool;
-
birdloop_enter_locked(&main_birdloop);
}
-static void birdloop_main(void *arg);
-
-void
-birdloop_free(resource *r)
+static void
+birdloop_stop_internal(struct birdloop *loop)
{
- struct birdloop *loop = (void *) r;
+ LOOP_TRACE(loop, "Stopping");
- ASSERT_DIE(loop->links == 0);
- domain_free(loop->time.domain);
-}
+ /* Block incoming pings */
+ u32 ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire);
+ while (!atomic_compare_exchange_strong_explicit(
+ &loop->thread_transition, &ltt, LTT_PING,
+ memory_order_acq_rel, memory_order_acquire))
+ ;
-void
-birdloop_dump(resource *r)
-{
- struct birdloop *loop = (void *) r;
+ /* Flush remaining events */
+ ASSERT_DIE(!ev_run_list(&loop->event_list));
+
+ /* Drop timers */
+ timer *t;
+ while (t = timers_first(&loop->time))
+ tm_stop(t);
- debug("%s\n", loop->pool->name);
+ /* Drop sockets */
+ sock *s;
+ WALK_LIST_FIRST2(s, n, loop->sock_list)
+ birdloop_remove_socket(loop, s);
+
+ /* Unschedule from Meta */
+ ev_postpone(&loop->event);
+ tm_stop(&loop->timer);
+
+ /* Remove from thread loop list */
+ rem_node(&loop->n);
+ loop->thread = NULL;
+
+ /* Leave the loop context without causing any other fuss */
+ ASSERT_DIE(!ev_active(&loop->event));
+ loop->ping_pending = 0;
+ birdloop_leave(loop);
+
+ /* Request local socket reload */
+ this_thread->sock_changed++;
+
+ /* Tail-call the stopped hook */
+ loop->stopped(loop->stop_data);
}
-struct resmem birdloop_memsize(resource *r)
+static void
+birdloop_run(void *_loop)
{
- struct birdloop *loop = (void *) r;
+ /* Run priority events before the loop is executed */
+ ev_run_list(&this_thread->priority_events);
+
+ u64 start_time = ns_now();
+ u64 end_time = start_time + this_thread->max_loop_time_ns;
+
+ struct birdloop *loop = _loop;
+ birdloop_enter(loop);
+
+ u64 locked_time = ns_now(), task_done_time;
+ if (locked_time > end_time)
+ LOOP_WARN(loop, "locked %luns after its scheduled end time", locked_time - end_time);
+
+ uint repeat, loop_runs = 0;
+ do {
+ repeat = 0;
+ LOOP_TRACE(loop, "Regular run");
+ loop_runs++;
+
+ if (loop->stopped)
+ /* Birdloop left inside the helper function */
+ return birdloop_stop_internal(loop);
+
+ /* Process sockets */
+ repeat += sockets_fire(loop);
- return (struct resmem) {
- .effective = sizeof(struct birdloop) - sizeof(resource) - ALLOC_OVERHEAD,
- .overhead = ALLOC_OVERHEAD + sizeof(resource) + page_size * list_length(&loop->pages.list),
- };
+ /* Run timers */
+ timers_fire(&loop->time, 0);
+
+ /* Run flag handlers */
+ repeat += birdloop_process_flags(loop);
+
+ /* Run events */
+ repeat += ev_run_list(&loop->event_list);
+
+ /* Check end time */
+ } while (((task_done_time = ns_now()) < end_time) && repeat);
+
+ /* Request meta timer */
+ timer *t = timers_first(&loop->time);
+ if (t)
+ tm_start_in(&loop->timer, tm_remains(t), this_thread->meta);
+ else
+ tm_stop(&loop->timer);
+
+ /* Request re-run if needed */
+ if (repeat)
+ ev_send_loop(this_thread->meta, &loop->event);
+
+ /* Collect socket change requests */
+ this_thread->sock_changed += loop->sock_changed;
+ loop->sock_changed = 0;
+
+ birdloop_leave(loop);
}
-struct resclass birdloop_class = {
- .name = "IO Loop",
- .size = sizeof(struct birdloop),
- .free = birdloop_free,
- .dump = birdloop_dump,
- .memsize = birdloop_memsize,
-};
+static void
+birdloop_run_timer(timer *tm)
+{
+ struct birdloop *loop = tm->data;
+ LOOP_TRACE(loop, "Timer ready, requesting run");
+ ev_send_loop(loop->thread->meta, &loop->event);
+}
-struct birdloop *
-birdloop_new(pool *pp, uint order, const char *name)
+static struct birdloop *
+birdloop_new_internal(pool *pp, uint order, const char *name, int request_pickup, struct birdloop_pickup_group *group)
{
struct domain_generic *dg = domain_new(name, order);
- struct birdloop *loop = ralloc(pp, &birdloop_class);
+ pool *p = rp_new(pp, name);
+ struct birdloop *loop = mb_allocz(p, sizeof(struct birdloop));
+ loop->pool = p;
loop->time.domain = dg;
loop->time.loop = loop;
- birdloop_enter(loop);
+ atomic_store_explicit(&loop->thread_transition, 0, memory_order_relaxed);
- loop->pool = rp_new(pp, loop, name);
- loop->parent = pp;
- rmove(&loop->r, loop->pool);
+ birdloop_enter(loop);
- wakeup_init(loop);
ev_init_list(&loop->event_list, loop, name);
- timers_init(&loop->time, loop->pool);
+ timers_init(&loop->time, p);
sockets_init(loop);
- init_pages(loop);
+ loop->event = (event) { .hook = birdloop_run, .data = loop, };
+ loop->timer = (timer) { .hook = birdloop_run_timer, .data = loop, };
- loop->time.coro = coro_run(loop->pool, birdloop_main, loop);
+ if (request_pickup)
+ {
+ LOCK_DOMAIN(resource, group->domain);
+ add_tail(&group->loops, &loop->n);
+ if (EMPTY_LIST(group->threads))
+ bird_thread_start(group);
+
+ wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(group->threads)));
+ UNLOCK_DOMAIN(resource, group->domain);
+ }
+ else
+ loop->n.next = loop->n.prev = &loop->n;
birdloop_leave(loop);
return loop;
}
+struct birdloop *
+birdloop_new(pool *pp, uint order, const char *name, btime max_latency)
+{
+ return birdloop_new_internal(pp, order, name, 1, max_latency ? &pickup_groups[1] : &pickup_groups[0]);
+}
+
static void
birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data)
{
+ LOOP_TRACE(loop, "Stop requested");
+
loop->stopped = stopped;
loop->stop_data = data;
- wakeup_do_kick(loop);
+
+ birdloop_do_ping(loop);
}
void
@@ -464,6 +1252,15 @@ birdloop_stop_self(struct birdloop *loop, void (*stopped)(void *data), void *dat
birdloop_do_stop(loop, stopped, data);
}
+void
+birdloop_free(struct birdloop *loop)
+{
+ ASSERT_DIE(loop->thread == NULL);
+
+ domain_free(loop->time.domain);
+ rfree(loop->pool);
+}
+
static void
birdloop_enter_locked(struct birdloop *loop)
{
@@ -490,6 +1287,14 @@ birdloop_leave_locked(struct birdloop *loop)
/* Check the current context */
ASSERT_DIE(birdloop_current == loop);
+ /* Send pending pings */
+ if (loop->ping_pending)
+ {
+ LOOP_TRACE(loop, "sending pings on leave");
+ loop->ping_pending = 0;
+ birdloop_do_ping(loop);
+ }
+
/* Restore the old context */
birdloop_current = loop->prev_loop;
}
@@ -514,107 +1319,13 @@ birdloop_unmask_wakeups(struct birdloop *loop)
ASSERT_DIE(birdloop_wakeup_masked == loop);
birdloop_wakeup_masked = NULL;
if (birdloop_wakeup_masked_count)
- wakeup_do_kick(loop);
+ wakeup_do_kick(loop->thread);
birdloop_wakeup_masked_count = 0;
}
void
-birdloop_link(struct birdloop *loop)
-{
- ASSERT_DIE(birdloop_inside(loop));
- loop->links++;
-}
-
-void
-birdloop_unlink(struct birdloop *loop)
-{
- ASSERT_DIE(birdloop_inside(loop));
- ASSERT_DIE(loop->links);
- if (!--loop->links)
- birdloop_ping(loop);
-}
-
-static void
-birdloop_main(void *arg)
+birdloop_yield(void)
{
- struct birdloop *loop = arg;
- timer *t;
- int rv, timeout;
-
- btime loop_begin = current_time();
-
- birdloop_enter(loop);
- while (1)
- {
- timers_fire(&loop->time, 0);
- if (ev_run_list(&loop->event_list))
- timeout = 0;
- else if (t = timers_first(&loop->time))
- timeout = (tm_remains(t) TO_MS) + 1;
- else
- timeout = -1;
-
- if (loop->poll_changed)
- sockets_prepare(loop);
-
- btime duration = current_time_update() - loop_begin;
- if (duration > config->watchdog_warning)
- log(L_WARN "I/O loop cycle took %d ms", (int) (duration TO_MS));
-
- birdloop_leave(loop);
-
- try:
- rv = poll(loop->poll_fd.data, loop->poll_fd.used, timeout);
- if (rv < 0)
- {
- if (errno == EINTR || errno == EAGAIN)
- goto try;
- die("poll: %m");
- }
-
- birdloop_enter(loop);
-
- if (loop->stopped && !loop->links)
- break;
-
- loop_begin = current_time_update();
-
- if (rv)
- sockets_fire(loop);
-
- atomic_exchange_explicit(&loop->ping_sent, 0, memory_order_acq_rel);
- }
-
- /* Flush remaining events */
- ASSERT_DIE(!ev_run_list(&loop->event_list));
-
- /* No timers allowed */
- ASSERT_DIE(timers_count(&loop->time) == 0);
- ASSERT_DIE(EMPTY_LIST(loop->sock_list));
- ASSERT_DIE(loop->sock_num == 0);
-
- birdloop_leave(loop);
-
- /* Lock parent loop */
- pool *parent = loop->parent;
- birdloop_enter(parent->loop);
-
- /* Move the loop temporarily to parent pool */
- birdloop_enter(loop);
- rmove(&loop->r, parent);
- birdloop_leave(loop);
-
- /* Announce loop stop */
- loop->stopped(loop->stop_data);
-
- /* Free the pool and loop */
- birdloop_enter(loop);
- rp_free(loop->pool, parent);
- flush_pages(loop);
- birdloop_leave(loop);
- rfree(&loop->r);
-
- /* And finally leave the parent loop before finishing */
- birdloop_leave(parent->loop);
+ usleep(100);
}