diff options
Diffstat (limited to 'sysdep/unix/io-loop.c')
-rw-r--r-- | sysdep/unix/io-loop.c | 1297 |
1 files changed, 1004 insertions, 293 deletions
diff --git a/sysdep/unix/io-loop.c b/sysdep/unix/io-loop.c index 9b107a1a..efb408e0 100644 --- a/sysdep/unix/io-loop.c +++ b/sysdep/unix/io-loop.c @@ -17,8 +17,8 @@ #include "nest/bird.h" #include "lib/buffer.h" -#include "lib/coro.h" #include "lib/lists.h" +#include "lib/locking.h" #include "lib/resource.h" #include "lib/event.h" #include "lib/timer.h" @@ -27,15 +27,54 @@ #include "lib/io-loop.h" #include "sysdep/unix/io-loop.h" #include "conf/conf.h" +#include "nest/cli.h" + +#define THREAD_STACK_SIZE 65536 /* To be lowered in near future */ + +static struct birdloop *birdloop_new_internal(pool *pp, uint order, const char *name, int request_pickup, struct birdloop_pickup_group *group); + +/* + * Nanosecond time for accounting purposes + * + * A fixed point on startup is set as zero, all other values are relative to that. + * Caution: this overflows after like 500 years or so. If you plan to run + * BIRD for such a long time, please implement some means of overflow prevention. + */ + +static struct timespec ns_begin; + +static void ns_init(void) +{ + if (clock_gettime(CLOCK_MONOTONIC, &ns_begin)) + bug("clock_gettime: %m"); +} + +static u64 ns_now(void) +{ + struct timespec ts; + if (clock_gettime(CLOCK_MONOTONIC, &ts)) + bug("clock_gettime: %m"); + + return (u64) (ts.tv_sec - ns_begin.tv_sec) * 1000000000 + ts.tv_nsec - ns_begin.tv_nsec; +} + /* * Current thread context */ -_Thread_local struct birdloop *birdloop_current = NULL; +_Thread_local struct birdloop *birdloop_current; static _Thread_local struct birdloop *birdloop_wakeup_masked; static _Thread_local uint birdloop_wakeup_masked_count; +#define LOOP_NAME(loop) domain_name((loop)->time.domain) + +#define LOOP_TRACE(loop, fmt, args...) do { if (config && config->latency_debug) log(L_TRACE "%s (%p): " fmt, LOOP_NAME(loop), (loop), ##args); } while (0) +#define THREAD_TRACE(...) do { if (config && config->latency_debug) log(L_TRACE "Thread: " __VA_ARGS__); } while (0) + +#define LOOP_WARN(loop, fmt, args...) 
log(L_TRACE "%s (%p): " fmt, LOOP_NAME(loop), (loop), ##args) + + event_list * birdloop_event_list(struct birdloop *loop) { @@ -48,12 +87,6 @@ birdloop_time_loop(struct birdloop *loop) return &loop->time; } -pool * -birdloop_pool(struct birdloop *loop) -{ - return loop->pool; -} - _Bool birdloop_inside(struct birdloop *loop) { @@ -64,91 +97,210 @@ birdloop_inside(struct birdloop *loop) return 0; } +_Bool +birdloop_in_this_thread(struct birdloop *loop) +{ + return pthread_equal(pthread_self(), loop->thread->thread_id); +} + +void +birdloop_flag(struct birdloop *loop, u32 flag) +{ + atomic_fetch_or_explicit(&loop->flags, flag, memory_order_acq_rel); + birdloop_ping(loop); +} + +void +birdloop_flag_set_handler(struct birdloop *loop, struct birdloop_flag_handler *fh) +{ + ASSERT_DIE(birdloop_inside(loop)); + loop->flag_handler = fh; +} + +static int +birdloop_process_flags(struct birdloop *loop) +{ + if (!loop->flag_handler) + return 0; + + u32 flags = atomic_exchange_explicit(&loop->flags, 0, memory_order_acq_rel); + if (!flags) + return 0; + + loop->flag_handler->hook(loop->flag_handler, flags); + return 1; +} + /* * Wakeup code for birdloop */ -static void -pipe_new(int *pfds) +void +pipe_new(struct pipe *p) { - int rv = pipe(pfds); + int rv = pipe(p->fd); if (rv < 0) die("pipe: %m"); - if (fcntl(pfds[0], F_SETFL, O_NONBLOCK) < 0) + if (fcntl(p->fd[0], F_SETFL, O_NONBLOCK) < 0) die("fcntl(O_NONBLOCK): %m"); - if (fcntl(pfds[1], F_SETFL, O_NONBLOCK) < 0) + if (fcntl(p->fd[1], F_SETFL, O_NONBLOCK) < 0) die("fcntl(O_NONBLOCK): %m"); } void -pipe_drain(int fd) +pipe_drain(struct pipe *p) { - char buf[64]; - int rv; - - try: - rv = read(fd, buf, 64); - if (rv < 0) - { - if (errno == EINTR) - goto try; - if (errno == EAGAIN) + while (1) { + char buf[64]; + int rv = read(p->fd[0], buf, sizeof(buf)); + if ((rv < 0) && (errno == EAGAIN)) return; - die("wakeup read: %m"); + + if (rv == 0) + bug("wakeup read eof"); + if ((rv < 0) && (errno != EINTR)) + bug("wakeup read: %m"); 
+ } +} + +int +pipe_read_one(struct pipe *p) +{ + while (1) { + char v; + int rv = read(p->fd[0], &v, sizeof(v)); + if (rv == 1) + return 1; + if ((rv < 0) && (errno == EAGAIN)) + return 0; + if (rv > 1) + bug("wakeup read more bytes than expected: %d", rv); + if (rv == 0) + bug("wakeup read eof"); + if (errno != EINTR) + bug("wakeup read: %m"); } - if (rv == 64) - goto try; } void -pipe_kick(int fd) +pipe_kick(struct pipe *p) { - u64 v = 1; + char v = 1; int rv; - try: - rv = write(fd, &v, sizeof(u64)); - if (rv < 0) - { - if (errno == EINTR) - goto try; - if (errno == EAGAIN) + while (1) { + rv = write(p->fd[1], &v, sizeof(v)); + if ((rv >= 0) || (errno == EAGAIN)) return; - die("wakeup write: %m"); + if (errno != EINTR) + bug("wakeup write: %m"); } } +void +pipe_pollin(struct pipe *p, struct pfd *pfd) +{ + BUFFER_PUSH(pfd->pfd) = (struct pollfd) { + .fd = p->fd[0], + .events = POLLIN, + }; + BUFFER_PUSH(pfd->loop) = NULL; +} + static inline void -wakeup_init(struct birdloop *loop) +wakeup_init(struct bird_thread *loop) { - pipe_new(loop->wakeup_fds); + pipe_new(&loop->wakeup); } static inline void -wakeup_drain(struct birdloop *loop) +wakeup_drain(struct bird_thread *loop) { - pipe_drain(loop->wakeup_fds[0]); + pipe_drain(&loop->wakeup); } static inline void -wakeup_do_kick(struct birdloop *loop) +wakeup_do_kick(struct bird_thread *loop) { - pipe_kick(loop->wakeup_fds[1]); + pipe_kick(&loop->wakeup); } -void -birdloop_ping(struct birdloop *loop) +static inline _Bool +birdloop_try_ping(struct birdloop *loop, u32 ltt) { - u32 ping_sent = atomic_fetch_add_explicit(&loop->ping_sent, 1, memory_order_acq_rel); - if (ping_sent) - return; + /* Somebody else is already pinging, be idempotent */ + if (ltt & LTT_PING) + { + LOOP_TRACE(loop, "already being pinged"); + return 0; + } + + /* Thread moving is an implicit ping */ + if (ltt & LTT_MOVE) + { + LOOP_TRACE(loop, "ping while moving"); + return 1; + } + /* No more flags allowed */ + ASSERT_DIE(!ltt); + + /* No ping 
when not picked up */ + if (!loop->thread) + { + LOOP_TRACE(loop, "not picked up yet, can't ping"); + return 1; + } + + /* No ping when masked */ if (loop == birdloop_wakeup_masked) + { + LOOP_TRACE(loop, "wakeup masked, can't ping"); birdloop_wakeup_masked_count++; + return 1; + } + + /* Send meta event to ping */ + if ((loop != loop->thread->meta) && (loop != &main_birdloop)) + { + LOOP_TRACE(loop, "Ping by meta event to %p", loop->thread->meta); + ev_send_loop(loop->thread->meta, &loop->event); + return 1; + } + + /* Do the real ping */ + LOOP_TRACE(loop, "sending pipe ping"); + wakeup_do_kick(loop->thread); + return 0; +} + +static inline void +birdloop_do_ping(struct birdloop *loop) +{ + /* Register our ping effort */ + u32 ltt = atomic_fetch_or_explicit(&loop->thread_transition, LTT_PING, memory_order_acq_rel); + + /* Try to ping in multiple ways */ + if (birdloop_try_ping(loop, ltt)) + atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_PING, memory_order_acq_rel); +} + +void +birdloop_ping(struct birdloop *loop) +{ + if (!birdloop_inside(loop)) + { + LOOP_TRACE(loop, "ping from outside"); + birdloop_do_ping(loop); + } else - wakeup_do_kick(loop); + { + LOOP_TRACE(loop, "ping from inside, pending=%d", loop->ping_pending); + if (!loop->ping_pending) + loop->ping_pending++; + } } @@ -161,204 +313,748 @@ sockets_init(struct birdloop *loop) { init_list(&loop->sock_list); loop->sock_num = 0; +} + +void +socket_changed(sock *s) +{ + struct birdloop *loop = s->loop; + ASSERT_DIE(birdloop_inside(loop)); - BUFFER_INIT(loop->poll_sk, loop->pool, 4); - BUFFER_INIT(loop->poll_fd, loop->pool, 4); - loop->poll_changed = 1; /* add wakeup fd */ + loop->sock_changed++; + birdloop_ping(loop); } -static void -sockets_add(struct birdloop *loop, sock *s) +void +birdloop_add_socket(struct birdloop *loop, sock *s) { - ASSERT_DIE(!enlisted(&s->n)); + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(!s->loop); + LOOP_TRACE(loop, "adding socket %p (total=%d)", s, loop->sock_num); 
add_tail(&loop->sock_list, &s->n); loop->sock_num++; s->loop = loop; s->index = -1; - loop->poll_changed = 1; - birdloop_ping(loop); + socket_changed(s); } +extern sock *stored_sock; /* mainloop hack */ + void -sk_start(sock *s) +birdloop_remove_socket(struct birdloop *loop, sock *s) { - ASSERT_DIE(birdloop_current != &main_birdloop); - sockets_add(birdloop_current, s); -} + ASSERT_DIE(!enlisted(&s->n) == !s->loop); -static void -sockets_remove(struct birdloop *loop, sock *s) -{ + if (!s->loop) + return; + + ASSERT_DIE(birdloop_inside(loop)); ASSERT_DIE(s->loop == loop); - if (!enlisted(&s->n)) - return; + /* Decouple the socket from the loop at all. */ + LOOP_TRACE(loop, "removing socket %p (total=%d)", s, loop->sock_num); + + if (loop->sock_active == s) + loop->sock_active = sk_next(s); + + if ((loop == &main_birdloop) && (s == stored_sock)) + stored_sock = sk_next(s); rem_node(&s->n); loop->sock_num--; - if (s->index >= 0) - { - loop->poll_sk.data[s->index] = NULL; - s->index = -1; - loop->poll_changed = 1; - birdloop_ping(loop); - } + socket_changed(s); s->loop = NULL; + s->index = -1; } void -sk_stop(sock *s) +sk_reloop(sock *s, struct birdloop *loop) { - sockets_remove(birdloop_current, s); + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(birdloop_inside(s->loop)); + + if (loop == s->loop) + return; + + birdloop_remove_socket(s->loop, s); + birdloop_add_socket(loop, s); +} + +void +sk_pause_rx(struct birdloop *loop, sock *s) +{ + ASSERT_DIE(birdloop_inside(loop)); + s->rx_hook = NULL; + socket_changed(s); +} + +void +sk_resume_rx(struct birdloop *loop, sock *s, int (*hook)(sock *, uint)) +{ + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(hook); + s->rx_hook = hook; + socket_changed(s); } static inline uint sk_want_events(sock *s) +{ return (s->rx_hook ? POLLIN : 0) | (sk_tx_pending(s) ? POLLOUT : 0); } + +void +sockets_prepare(struct birdloop *loop, struct pfd *pfd) { - uint out = ((s->ttx != s->tpos) ? 
POLLOUT : 0); - if (s->rx_hook) - if (s->cork) + node *n; + WALK_LIST(n, loop->sock_list) + { + sock *s = SKIP_BACK(sock, n, n); + uint w = sk_want_events(s); + + if (!w) { - LOCK_DOMAIN(cork, s->cork->lock); - if (!enlisted(&s->cork_node)) - if (s->cork->count) - { -// log(L_TRACE "Socket %p corked", s); - add_tail(&s->cork->sockets, &s->cork_node); - } - else - out |= POLLIN; - UNLOCK_DOMAIN(cork, s->cork->lock); + s->index = -1; + continue; } - else - out |= POLLIN; -// log(L_TRACE "sk_want_events(%p) = %x", s, out); - return out; + s->index = pfd->pfd.used; + LOOP_TRACE(loop, "socket %p poll index is %d", s, s->index); + + BUFFER_PUSH(pfd->pfd) = (struct pollfd) { + .fd = s->fd, + .events = sk_want_events(s), + }; + BUFFER_PUSH(pfd->loop) = loop; + } } +int sk_read(sock *s, int revents); +int sk_write(sock *s); +void sk_err(sock *s, int revents); -void -sk_ping(sock *s) +static int +sockets_fire(struct birdloop *loop) { - s->loop->poll_changed = 1; - birdloop_ping(s->loop); + if (EMPTY_LIST(loop->sock_list)) + return 0; + + int repeat = 0; + + times_update(); + + struct pollfd *pfd = loop->thread->pfd->pfd.data; + loop->sock_active = SKIP_BACK(sock, n, HEAD(loop->sock_list)); + + while (loop->sock_active) + { + sock *s = loop->sock_active; + + int rev; + if ((s->index >= 0) && (rev = pfd[s->index].revents) && !(rev & POLLNVAL)) + { + int e = 1; + + if (rev & POLLOUT) + { + /* Write everything. */ + while ((s == loop->sock_active) && (e = sk_write(s))) + ; + + if (s != loop->sock_active) + continue; + + if (!sk_tx_pending(s)) + loop->thread->sock_changed++; + } + + if (rev & POLLIN) + /* Read just one packet and request repeat. 
*/ + if ((s == loop->sock_active) && s->rx_hook) + if (sk_read(s, rev)) + repeat++; + + if (s != loop->sock_active) + continue; + + if (!(rev & (POLLOUT | POLLIN)) && (rev & POLLERR)) + sk_err(s, rev); + + if (s != loop->sock_active) + continue; + } + + loop->sock_active = sk_next(s); + } + + return repeat; } /* -FIXME: this should be called from sock code + * Threads + */ + +DEFINE_DOMAIN(resource); + +struct birdloop_pickup_group { + DOMAIN(resource) domain; + list loops; + list threads; + btime max_latency; +} pickup_groups[2] = { + { + /* all zeroes */ + }, + { + /* FIXME: make this dynamic, now it copies the loop_max_latency value from proto/bfd/config.Y */ + .max_latency = 10 MS, + }, +}; + +static _Thread_local struct bird_thread *this_thread; static void -sockets_update(struct birdloop *loop, sock *s) +birdloop_set_thread(struct birdloop *loop, struct bird_thread *thr, struct birdloop_pickup_group *group) +{ + struct bird_thread *old = loop->thread; + ASSERT_DIE(!thr != !old); + + /* Signal our moving effort */ + u32 ltt = atomic_fetch_or_explicit(&loop->thread_transition, LTT_MOVE, memory_order_acq_rel); + ASSERT_DIE((ltt & LTT_MOVE) == 0); + + while (ltt & LTT_PING) + { + birdloop_yield(); + ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire); + ASSERT_DIE(ltt & LTT_MOVE); + } + /* Now we are free of running pings */ + + if (loop->thread = thr) + { + add_tail(&thr->loops, &loop->n); + thr->loop_count++; + } + else + { + old->loop_count--; + + LOCK_DOMAIN(resource, group->domain); + add_tail(&group->loops, &loop->n); + UNLOCK_DOMAIN(resource, group->domain); + } + + /* Finished */ + atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_MOVE, memory_order_acq_rel); + + /* Request to run by force */ + ev_send_loop(loop->thread->meta, &loop->event); +} + +static struct birdloop * +birdloop_take(struct birdloop_pickup_group *group) { - if (s->index >= 0) - loop->poll_fd.data[s->index].events = sk_want_events(s); + struct birdloop 
*loop = NULL; + + LOCK_DOMAIN(resource, group->domain); + if (!EMPTY_LIST(group->loops)) + { + /* Take the first loop from the pickup list and unlock */ + loop = SKIP_BACK(struct birdloop, n, HEAD(group->loops)); + rem_node(&loop->n); + UNLOCK_DOMAIN(resource, group->domain); + + birdloop_set_thread(loop, this_thread, group); + + /* This thread goes to the end of the pickup list */ + LOCK_DOMAIN(resource, group->domain); + rem_node(&this_thread->n); + add_tail(&group->threads, &this_thread->n); + + /* If there are more loops to be picked up, wakeup the next thread in order */ + if (!EMPTY_LIST(group->loops)) + wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(group->threads))); + } + UNLOCK_DOMAIN(resource, group->domain); + + return loop; } -*/ static void -sockets_prepare(struct birdloop *loop) +birdloop_drop(struct birdloop *loop, struct birdloop_pickup_group *group) { - BUFFER_SET(loop->poll_sk, loop->sock_num + 1); - BUFFER_SET(loop->poll_fd, loop->sock_num + 1); + /* Remove loop from this thread's list */ + rem_node(&loop->n); - struct pollfd *pfd = loop->poll_fd.data; - sock **psk = loop->poll_sk.data; - uint i = 0; - node *n; + /* Unset loop's thread */ + if (birdloop_inside(loop)) + birdloop_set_thread(loop, NULL, group); + else + { + birdloop_enter(loop); + birdloop_set_thread(loop, NULL, group); + birdloop_leave(loop); + } - WALK_LIST(n, loop->sock_list) + /* Put loop into pickup list */ + LOCK_DOMAIN(resource, group->domain); + add_tail(&group->loops, &loop->n); + UNLOCK_DOMAIN(resource, group->domain); +} + +static int +poll_timeout(struct birdloop *loop) +{ + timer *t = timers_first(&loop->time); + if (!t) + return -1; + + btime remains = tm_remains(t); + return remains TO_MS + ((remains TO_MS) MS < remains); +} + +static void * +bird_thread_main(void *arg) +{ + struct bird_thread *thr = this_thread = arg; + + rcu_thread_start(&thr->rcu); + synchronize_rcu(); + + tmp_init(thr->pool); + init_list(&thr->loops); + + thr->meta = 
birdloop_new_internal(thr->pool, DOMAIN_ORDER(meta), "Thread Meta", 0, thr->group); + thr->meta->thread = thr; + birdloop_enter(thr->meta); + + thr->sock_changed = 1; + + struct pfd pfd; + BUFFER_INIT(pfd.pfd, thr->pool, 16); + BUFFER_INIT(pfd.loop, thr->pool, 16); + thr->pfd = &pfd; + + while (1) { - sock *s = SKIP_BACK(sock, n, n); + u64 thr_loop_start = ns_now(); + int timeout; + + /* Pickup new loops */ + struct birdloop *loop = birdloop_take(thr->group); + if (loop) + { + birdloop_enter(loop); + if (!EMPTY_LIST(loop->sock_list)) + thr->sock_changed = 1; + birdloop_leave(loop); + } + + /* Schedule all loops with timed out timers */ + timers_fire(&thr->meta->time, 0); + + /* Compute maximal time per loop */ + u64 thr_before_run = ns_now(); + if (thr->loop_count > 0) + thr->max_loop_time_ns = (thr->max_latency_ns / 2 - (thr_before_run - thr_loop_start)) / (u64) thr->loop_count; + + /* Run all scheduled loops */ + int more_events = ev_run_list(&thr->meta->event_list); + if (more_events) + { + THREAD_TRACE("More events to run"); + timeout = 0; + } + else + { + timeout = poll_timeout(thr->meta); + if (timeout == -1) + THREAD_TRACE("No timers, no events"); + else + THREAD_TRACE("Next timer in %d ms", timeout); + } + + /* Run priority events before sleeping */ + ev_run_list(&thr->priority_events); + + /* Do we have to refresh sockets? 
*/ + if (thr->sock_changed) + { + thr->sock_changed = 0; + + BUFFER_FLUSH(pfd.pfd); + BUFFER_FLUSH(pfd.loop); + + pipe_pollin(&thr->wakeup, &pfd); + + node *nn; + WALK_LIST2(loop, nn, thr->loops, n) + { + birdloop_enter(loop); + sockets_prepare(loop, &pfd); + birdloop_leave(loop); + } + + ASSERT_DIE(pfd.loop.used == pfd.pfd.used); + } + /* Nothing to do in at least 5 seconds, flush local hot page cache */ + else if (timeout > 5000) + flush_local_pages(); + +poll_retry:; + int rv = poll(pfd.pfd.data, pfd.pfd.used, timeout); + if (rv < 0) + { + if (errno == EINTR || errno == EAGAIN) + goto poll_retry; + bug("poll in %p: %m", thr); + } - ASSERT(i < loop->sock_num); + /* Drain wakeup fd */ + if (pfd.pfd.data[0].revents & POLLIN) + { + ASSERT_DIE(rv > 0); + rv--; + wakeup_drain(thr); + } - s->index = i; - *psk = s; - pfd->fd = s->fd; - pfd->events = sk_want_events(s); - pfd->revents = 0; + atomic_fetch_and_explicit(&thr->meta->thread_transition, ~LTT_PING, memory_order_acq_rel); - pfd++; - psk++; - i++; + /* Schedule loops with active sockets */ + if (rv) + for (uint i = 1; i < pfd.pfd.used; i++) + if (pfd.pfd.data[i].revents) + { + LOOP_TRACE(pfd.loop.data[i], "socket id %d got revents=%d", i, pfd.pfd.data[i].revents); + ev_send_loop(thr->meta, &pfd.loop.data[i]->event); + } } - ASSERT(i == loop->sock_num); + bug("An infinite loop has ended."); +} + +static void +bird_thread_cleanup(void *_thr) +{ + struct bird_thread *thr = _thr; + ASSERT_DIE(birdloop_inside(&main_birdloop)); + + /* Thread attributes no longer needed */ + pthread_attr_destroy(&thr->thread_attr); + + /* Free all remaining memory */ + rfree(thr->pool); +} + +static struct bird_thread * +bird_thread_start(struct birdloop_pickup_group *group) +{ + ASSERT_DIE(birdloop_inside(&main_birdloop)); + ASSERT_DIE(DOMAIN_IS_LOCKED(resource, group->domain)); + + pool *p = rp_new(&root_pool, "Thread"); + + struct bird_thread *thr = mb_allocz(p, sizeof(*thr)); + thr->pool = p; + thr->cleanup_event = (event) { .hook = 
bird_thread_cleanup, .data = thr, }; + thr->group = group; + thr->max_latency_ns = (group->max_latency ?: 5 S) TO_NS; + + wakeup_init(thr); + ev_init_list(&thr->priority_events, NULL, "Thread direct event list"); - /* Add internal wakeup fd */ - *psk = NULL; - pfd->fd = loop->wakeup_fds[0]; - pfd->events = POLLIN; - pfd->revents = 0; + add_tail(&group->threads, &thr->n); - loop->poll_changed = 0; + int e = 0; + + if (e = pthread_attr_init(&thr->thread_attr)) + die("pthread_attr_init() failed: %M", e); + + /* We don't have to worry about thread stack size so much. + if (e = pthread_attr_setstacksize(&thr->thread_attr, THREAD_STACK_SIZE)) + die("pthread_attr_setstacksize(%u) failed: %M", THREAD_STACK_SIZE, e); + */ + + if (e = pthread_attr_setdetachstate(&thr->thread_attr, PTHREAD_CREATE_DETACHED)) + die("pthread_attr_setdetachstate(PTHREAD_CREATE_DETACHED) failed: %M", e); + + if (e = pthread_create(&thr->thread_id, &thr->thread_attr, bird_thread_main, thr)) + die("pthread_create() failed: %M", e); + + return thr; } -int sk_read(sock *s, int revents); -int sk_write(sock *s); +static struct birdloop *thread_dropper; +static event *thread_dropper_event; +static uint thread_dropper_goal; static void -sockets_fire(struct birdloop *loop) +bird_thread_shutdown(void * _ UNUSED) { - struct pollfd *pfd = loop->poll_fd.data; - sock **psk = loop->poll_sk.data; - int poll_num = loop->poll_fd.used - 1; + struct birdloop_pickup_group *group = this_thread->group; + LOCK_DOMAIN(resource, group->domain); + int dif = list_length(&group->threads) - thread_dropper_goal; + struct birdloop *tdl_stop = NULL; - times_update(); + if (dif > 0) + ev_send_loop(thread_dropper, thread_dropper_event); + else + { + tdl_stop = thread_dropper; + thread_dropper = NULL; + } + + UNLOCK_DOMAIN(resource, group->domain); - /* Last fd is internal wakeup fd */ - if (pfd[poll_num].revents & POLLIN) + DBG("Thread pickup size differs from dropper goal by %d%s\n", dif, tdl_stop ? 
", stopping" : ""); + + if (tdl_stop) { - wakeup_drain(loop); - loop->poll_changed = 1; + birdloop_stop_self(tdl_stop, NULL, NULL); + return; } - int i; - for (i = 0; i < poll_num; pfd++, psk++, i++) + struct bird_thread *thr = this_thread; + + /* Leave the thread-picker list to get no more loops */ + LOCK_DOMAIN(resource, group->domain); + rem_node(&thr->n); + UNLOCK_DOMAIN(resource, group->domain); + + /* Drop loops including the thread dropper itself */ + while (!EMPTY_LIST(thr->loops)) + birdloop_drop(HEAD(thr->loops), group); + + /* Let others know about new loops */ + if (!EMPTY_LIST(group->loops)) + wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(group->threads))); + UNLOCK_DOMAIN(resource, group->domain); + + /* Leave the thread-dropper loop as we aren't going to return. */ + birdloop_leave(thread_dropper); + + /* Stop the meta loop */ + birdloop_leave(thr->meta); + domain_free(thr->meta->time.domain); + rfree(thr->meta->pool); + + /* Local pages not needed anymore */ + flush_local_pages(); + + /* Unregister from RCU */ + rcu_thread_stop(&thr->rcu); + + /* Request thread cleanup from main loop */ + ev_send_loop(&main_birdloop, &thr->cleanup_event); + + /* Exit! */ + pthread_exit(NULL); +} + + +void +bird_thread_commit(struct config *new, struct config *old UNUSED) +{ + ASSERT_DIE(birdloop_inside(&main_birdloop)); + + if (new->shutdown) + return; + + if (!new->thread_count) + new->thread_count = 1; + + while (1) { - if (!*psk) - continue; + struct birdloop_pickup_group *group = &pickup_groups[0]; + LOCK_DOMAIN(resource, group->domain); + + int dif = list_length(&group->threads) - (thread_dropper_goal = new->thread_count); + _Bool thread_dropper_running = !!thread_dropper; - if (! 
pfd->revents) + if (dif < 0) + { + bird_thread_start(group); + UNLOCK_DOMAIN(resource, group->domain); continue; + } + + UNLOCK_DOMAIN(resource, group->domain); + + if ((dif > 0) && !thread_dropper_running) + { + struct birdloop *tdl = birdloop_new(&root_pool, DOMAIN_ORDER(control), "Thread dropper", group->max_latency); + event *tde = ev_new_init(tdl->pool, bird_thread_shutdown, NULL); + + LOCK_DOMAIN(resource, group->domain); + thread_dropper = tdl; + thread_dropper_event = tde; + UNLOCK_DOMAIN(resource, group->domain); + + ev_send_loop(thread_dropper, thread_dropper_event); + } + + return; + } +} + + +DEFINE_DOMAIN(control); + +struct bird_thread_show_data { + cli *cli; + pool *pool; + DOMAIN(control) lock; + uint total; + uint done; + u8 show_loops; +}; - if (pfd->revents & POLLNVAL) - bug("poll: invalid fd %d", pfd->fd); +static void +bird_thread_show_cli_cont(struct cli *c UNUSED) +{ + /* Explicitly do nothing to prevent CLI from trying to parse another command. */ +} + +static int +bird_thread_show_cli_cleanup(struct cli *c UNUSED) +{ + return 1; /* Defer the cleanup until the writeout is finished. */ +} - int e = 1; +static void +bird_thread_show(void *data) +{ + struct bird_thread_show_data *tsd = data; - if (pfd->revents & POLLIN) - while (e && *psk && (*psk)->rx_hook) - e = sk_read(*psk, pfd->revents); + LOCK_DOMAIN(control, tsd->lock); + if (tsd->show_loops) + cli_printf(tsd->cli, -1026, "Thread %p", this_thread); + + u64 total_time_ns = 0; + struct birdloop *loop; + WALK_LIST(loop, this_thread->loops) + { + if (tsd->show_loops) + cli_printf(tsd->cli, -1026, " Loop %s time: %t", domain_name(loop->time.domain), loop->total_time_spent_ns NS); + total_time_ns += loop->total_time_spent_ns; + } + + tsd->done++; + int last = (tsd->done == tsd->total); + + if (last) + { + tsd->cli->cont = NULL; + tsd->cli->cleanup = NULL; + } - e = 1; - if (pfd->revents & POLLOUT) + if (tsd->show_loops) + cli_printf(tsd->cli, (last ? 
1 : -1) * 1026, " Total time: %t", total_time_ns NS); + else + cli_printf(tsd->cli, (last ? 1 : -1) * 1026, "Thread %p time %t", this_thread, total_time_ns NS); + + UNLOCK_DOMAIN(control, tsd->lock); + + if (last) + { + the_bird_lock(); + + for (int i=0; i<2; i++) { - loop->poll_changed = 1; - while (e && *psk) - e = sk_write(*psk); + struct birdloop_pickup_group *group = &pickup_groups[i]; + + LOCK_DOMAIN(resource, group->domain); + if (!EMPTY_LIST(group->loops)) + if (tsd->show_loops) + { + cli_printf(tsd->cli, -1026, "Unassigned loops"); + WALK_LIST(loop, group->loops) + cli_printf(tsd->cli, -1026, " Loop %s time: %t", domain_name(loop->time.domain), loop->total_time_spent_ns NS); + } + else + { + uint count = 0; + u64 total_time_ns = 0; + WALK_LIST(loop, group->loops) + { + count++; + total_time_ns += loop->total_time_spent_ns; + } + cli_printf(tsd->cli, -1026, "Unassigned loops: %d, total time %t", count, total_time_ns NS); + } + UNLOCK_DOMAIN(resource, group->domain); } + + cli_write_trigger(tsd->cli); + DOMAIN_FREE(control, tsd->lock); + rfree(tsd->pool); + + the_bird_unlock(); } } +void +cmd_show_threads(int show_loops) +{ + pool *p = rp_new(&root_pool, "Show Threads"); + + struct bird_thread_show_data *tsd = mb_allocz(p, sizeof(struct bird_thread_show_data)); + tsd->lock = DOMAIN_NEW(control, "Show Threads"); + tsd->cli = this_cli; + tsd->pool = p; + tsd->show_loops = show_loops; + + this_cli->cont = bird_thread_show_cli_cont; + this_cli->cleanup = bird_thread_show_cli_cleanup; + + for (int i=0; i<2; i++) + { + struct birdloop_pickup_group *group = &pickup_groups[i]; + + LOCK_DOMAIN(control, tsd->lock); + LOCK_DOMAIN(resource, group->domain); + + struct bird_thread *thr; + WALK_LIST(thr, group->threads) + { + tsd->total++; + ev_send(&thr->priority_events, ev_new_init(p, bird_thread_show, tsd)); + wakeup_do_kick(thr); + } + + UNLOCK_DOMAIN(resource, group->domain); + UNLOCK_DOMAIN(control, tsd->lock); + } +} + /* * Birdloop */ -struct birdloop 
main_birdloop; +static struct bird_thread main_thread; +struct birdloop main_birdloop = { .thread = &main_thread, }; static void birdloop_enter_locked(struct birdloop *loop); void birdloop_init(void) { - wakeup_init(&main_birdloop); + ns_init(); + + for (int i=0; i<2; i++) + { + struct birdloop_pickup_group *group = &pickup_groups[i]; + + group->domain = DOMAIN_NEW(resource, "Loop Pickup"); + init_list(&group->loops); + init_list(&group->threads); + } + + wakeup_init(main_birdloop.thread); main_birdloop.time.domain = the_bird_domain.the_bird; main_birdloop.time.loop = &main_birdloop; @@ -366,85 +1062,177 @@ birdloop_init(void) times_update(); timers_init(&main_birdloop.time, &root_pool); - root_pool.loop = &main_birdloop; - main_birdloop.pool = &root_pool; - birdloop_enter_locked(&main_birdloop); } -static void birdloop_main(void *arg); - -void -birdloop_free(resource *r) +static void +birdloop_stop_internal(struct birdloop *loop) { - struct birdloop *loop = (void *) r; + LOOP_TRACE(loop, "Stopping"); - ASSERT_DIE(loop->links == 0); - domain_free(loop->time.domain); -} + /* Block incoming pings */ + u32 ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire); + while (!atomic_compare_exchange_strong_explicit( + &loop->thread_transition, &ltt, LTT_PING, + memory_order_acq_rel, memory_order_acquire)) + ; -void -birdloop_dump(resource *r) -{ - struct birdloop *loop = (void *) r; + /* Flush remaining events */ + ASSERT_DIE(!ev_run_list(&loop->event_list)); + + /* Drop timers */ + timer *t; + while (t = timers_first(&loop->time)) + tm_stop(t); - debug("%s\n", loop->pool->name); + /* Drop sockets */ + sock *s; + WALK_LIST_FIRST2(s, n, loop->sock_list) + birdloop_remove_socket(loop, s); + + /* Unschedule from Meta */ + ev_postpone(&loop->event); + tm_stop(&loop->timer); + + /* Remove from thread loop list */ + rem_node(&loop->n); + loop->thread = NULL; + + /* Leave the loop context without causing any other fuss */ + ASSERT_DIE(!ev_active(&loop->event)); 
+ loop->ping_pending = 0; + birdloop_leave(loop); + + /* Request local socket reload */ + this_thread->sock_changed++; + + /* Tail-call the stopped hook */ + loop->stopped(loop->stop_data); } -struct resmem birdloop_memsize(resource *r) +static void +birdloop_run(void *_loop) { - struct birdloop *loop = (void *) r; + /* Run priority events before the loop is executed */ + ev_run_list(&this_thread->priority_events); + + u64 start_time = ns_now(); + u64 end_time = start_time + this_thread->max_loop_time_ns; + + struct birdloop *loop = _loop; + birdloop_enter(loop); + + u64 locked_time = ns_now(), task_done_time; + if (locked_time > end_time) + LOOP_WARN(loop, "locked %luns after its scheduled end time", locked_time - end_time); + + uint repeat, loop_runs = 0; + do { + repeat = 0; + LOOP_TRACE(loop, "Regular run"); + loop_runs++; + + if (loop->stopped) + /* Birdloop left inside the helper function */ + return birdloop_stop_internal(loop); + + /* Process sockets */ + repeat += sockets_fire(loop); - return (struct resmem) { - .effective = sizeof(struct birdloop) - sizeof(resource) - ALLOC_OVERHEAD, - .overhead = ALLOC_OVERHEAD + sizeof(resource) + page_size * list_length(&loop->pages.list), - }; + /* Run timers */ + timers_fire(&loop->time, 0); + + /* Run flag handlers */ + repeat += birdloop_process_flags(loop); + + /* Run events */ + repeat += ev_run_list(&loop->event_list); + + /* Check end time */ + } while (((task_done_time = ns_now()) < end_time) && repeat); + + /* Request meta timer */ + timer *t = timers_first(&loop->time); + if (t) + tm_start_in(&loop->timer, tm_remains(t), this_thread->meta); + else + tm_stop(&loop->timer); + + /* Request re-run if needed */ + if (repeat) + ev_send_loop(this_thread->meta, &loop->event); + + /* Collect socket change requests */ + this_thread->sock_changed += loop->sock_changed; + loop->sock_changed = 0; + + birdloop_leave(loop); } -struct resclass birdloop_class = { - .name = "IO Loop", - .size = sizeof(struct birdloop), - 
.free = birdloop_free, - .dump = birdloop_dump, - .memsize = birdloop_memsize, -}; +static void +birdloop_run_timer(timer *tm) +{ + struct birdloop *loop = tm->data; + LOOP_TRACE(loop, "Timer ready, requesting run"); + ev_send_loop(loop->thread->meta, &loop->event); +} -struct birdloop * -birdloop_new(pool *pp, uint order, const char *name) +static struct birdloop * +birdloop_new_internal(pool *pp, uint order, const char *name, int request_pickup, struct birdloop_pickup_group *group) { struct domain_generic *dg = domain_new(name, order); - struct birdloop *loop = ralloc(pp, &birdloop_class); + pool *p = rp_new(pp, name); + struct birdloop *loop = mb_allocz(p, sizeof(struct birdloop)); + loop->pool = p; loop->time.domain = dg; loop->time.loop = loop; - birdloop_enter(loop); + atomic_store_explicit(&loop->thread_transition, 0, memory_order_relaxed); - loop->pool = rp_new(pp, loop, name); - loop->parent = pp; - rmove(&loop->r, loop->pool); + birdloop_enter(loop); - wakeup_init(loop); ev_init_list(&loop->event_list, loop, name); - timers_init(&loop->time, loop->pool); + timers_init(&loop->time, p); sockets_init(loop); - init_pages(loop); + loop->event = (event) { .hook = birdloop_run, .data = loop, }; + loop->timer = (timer) { .hook = birdloop_run_timer, .data = loop, }; - loop->time.coro = coro_run(loop->pool, birdloop_main, loop); + if (request_pickup) + { + LOCK_DOMAIN(resource, group->domain); + add_tail(&group->loops, &loop->n); + if (EMPTY_LIST(group->threads)) + bird_thread_start(group); + + wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(group->threads))); + UNLOCK_DOMAIN(resource, group->domain); + } + else + loop->n.next = loop->n.prev = &loop->n; birdloop_leave(loop); return loop; } +struct birdloop * +birdloop_new(pool *pp, uint order, const char *name, btime max_latency) +{ + return birdloop_new_internal(pp, order, name, 1, max_latency ? 
&pickup_groups[1] : &pickup_groups[0]); +} + static void birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data) { + LOOP_TRACE(loop, "Stop requested"); + loop->stopped = stopped; loop->stop_data = data; - wakeup_do_kick(loop); + + birdloop_do_ping(loop); } void @@ -464,6 +1252,15 @@ birdloop_stop_self(struct birdloop *loop, void (*stopped)(void *data), void *dat birdloop_do_stop(loop, stopped, data); } +void +birdloop_free(struct birdloop *loop) +{ + ASSERT_DIE(loop->thread == NULL); + + domain_free(loop->time.domain); + rfree(loop->pool); +} + static void birdloop_enter_locked(struct birdloop *loop) { @@ -490,6 +1287,14 @@ birdloop_leave_locked(struct birdloop *loop) /* Check the current context */ ASSERT_DIE(birdloop_current == loop); + /* Send pending pings */ + if (loop->ping_pending) + { + LOOP_TRACE(loop, "sending pings on leave"); + loop->ping_pending = 0; + birdloop_do_ping(loop); + } + /* Restore the old context */ birdloop_current = loop->prev_loop; } @@ -514,107 +1319,13 @@ birdloop_unmask_wakeups(struct birdloop *loop) ASSERT_DIE(birdloop_wakeup_masked == loop); birdloop_wakeup_masked = NULL; if (birdloop_wakeup_masked_count) - wakeup_do_kick(loop); + wakeup_do_kick(loop->thread); birdloop_wakeup_masked_count = 0; } void -birdloop_link(struct birdloop *loop) -{ - ASSERT_DIE(birdloop_inside(loop)); - loop->links++; -} - -void -birdloop_unlink(struct birdloop *loop) -{ - ASSERT_DIE(birdloop_inside(loop)); - ASSERT_DIE(loop->links); - if (!--loop->links) - birdloop_ping(loop); -} - -static void -birdloop_main(void *arg) +birdloop_yield(void) { - struct birdloop *loop = arg; - timer *t; - int rv, timeout; - - btime loop_begin = current_time(); - - birdloop_enter(loop); - while (1) - { - timers_fire(&loop->time, 0); - if (ev_run_list(&loop->event_list)) - timeout = 0; - else if (t = timers_first(&loop->time)) - timeout = (tm_remains(t) TO_MS) + 1; - else - timeout = -1; - - if (loop->poll_changed) - sockets_prepare(loop); - - btime 
duration = current_time_update() - loop_begin; - if (duration > config->watchdog_warning) - log(L_WARN "I/O loop cycle took %d ms", (int) (duration TO_MS)); - - birdloop_leave(loop); - - try: - rv = poll(loop->poll_fd.data, loop->poll_fd.used, timeout); - if (rv < 0) - { - if (errno == EINTR || errno == EAGAIN) - goto try; - die("poll: %m"); - } - - birdloop_enter(loop); - - if (loop->stopped && !loop->links) - break; - - loop_begin = current_time_update(); - - if (rv) - sockets_fire(loop); - - atomic_exchange_explicit(&loop->ping_sent, 0, memory_order_acq_rel); - } - - /* Flush remaining events */ - ASSERT_DIE(!ev_run_list(&loop->event_list)); - - /* No timers allowed */ - ASSERT_DIE(timers_count(&loop->time) == 0); - ASSERT_DIE(EMPTY_LIST(loop->sock_list)); - ASSERT_DIE(loop->sock_num == 0); - - birdloop_leave(loop); - - /* Lock parent loop */ - pool *parent = loop->parent; - birdloop_enter(parent->loop); - - /* Move the loop temporarily to parent pool */ - birdloop_enter(loop); - rmove(&loop->r, parent); - birdloop_leave(loop); - - /* Announce loop stop */ - loop->stopped(loop->stop_data); - - /* Free the pool and loop */ - birdloop_enter(loop); - rp_free(loop->pool, parent); - flush_pages(loop); - birdloop_leave(loop); - rfree(&loop->r); - - /* And finally leave the parent loop before finishing */ - birdloop_leave(parent->loop); + usleep(100); } |