diff options
Diffstat (limited to 'sysdep/unix/io-loop.c')
-rw-r--r-- | sysdep/unix/io-loop.c | 514 |
1 files changed, 345 insertions, 169 deletions
diff --git a/sysdep/unix/io-loop.c b/sysdep/unix/io-loop.c index cc2e0523..df162c6e 100644 --- a/sysdep/unix/io-loop.c +++ b/sysdep/unix/io-loop.c @@ -31,6 +31,8 @@ #define THREAD_STACK_SIZE 65536 /* To be lowered in near future */ +static struct birdloop *birdloop_new_internal(pool *pp, uint order, const char *name, int request_pickup); + /* * Nanosecond time for accounting purposes * @@ -65,6 +67,9 @@ _Thread_local struct birdloop *birdloop_current; static _Thread_local struct birdloop *birdloop_wakeup_masked; static _Thread_local uint birdloop_wakeup_masked_count; +#define LOOP_TRACE(loop, fmt, args...) do { if (config && config->latency_debug) log(L_TRACE "%s (%p): " fmt, domain_name((loop)->time.domain), (loop), ##args); } while (0) +#define THREAD_TRACE(...) do { if (config && config->latency_debug) log(L_TRACE "Thread: " __VA_ARGS__); } while (0) + event_list * birdloop_event_list(struct birdloop *loop) { @@ -190,11 +195,13 @@ pipe_kick(struct pipe *p) } void -pipe_pollin(struct pipe *p, struct pollfd *pfd) +pipe_pollin(struct pipe *p, struct pfd *pfd) { - pfd->fd = p->fd[0]; - pfd->events = POLLIN; - pfd->revents = 0; + BUFFER_PUSH(pfd->pfd) = (struct pollfd) { + .fd = p->fd[0], + .events = POLLIN, + }; + BUFFER_PUSH(pfd->loop) = NULL; } static inline void @@ -215,28 +222,80 @@ wakeup_do_kick(struct bird_thread *loop) pipe_kick(&loop->wakeup); } -static inline void -birdloop_do_ping(struct birdloop *loop) +static inline _Bool +birdloop_try_ping(struct birdloop *loop, u32 ltt) { - if (!loop->thread) - return; + /* Somebody else is already pinging, be idempotent */ + if (ltt & LTT_PING) + { + LOOP_TRACE(loop, "already being pinged"); + return 0; + } - if (atomic_fetch_add_explicit(&loop->thread->ping_sent, 1, memory_order_acq_rel)) - return; + /* Thread moving is an implicit ping */ + if (ltt & LTT_MOVE) + { + LOOP_TRACE(loop, "ping while moving"); + return 1; + } + + /* No more flags allowed */ + ASSERT_DIE(!ltt); + /* No ping when not picked up */ + if (!loop->thread) + { + LOOP_TRACE(loop, "not picked up yet, can't ping"); + return 1; + } + + /* No ping when masked */ if (loop == birdloop_wakeup_masked) + { + LOOP_TRACE(loop, "wakeup masked, can't ping"); birdloop_wakeup_masked_count++; - else - wakeup_do_kick(loop->thread); + return 1; + } + + /* Send meta event to ping */ + if ((loop != loop->thread->meta) && (loop != &main_birdloop)) + { + LOOP_TRACE(loop, "Ping by meta event to %p", loop->thread->meta); + ev_send_loop(loop->thread->meta, &loop->event); + return 1; + } + + /* Do the real ping */ + LOOP_TRACE(loop, "sending pipe ping"); + wakeup_do_kick(loop->thread); + return 0; +} + +static inline void +birdloop_do_ping(struct birdloop *loop) +{ + /* Register our ping effort */ + u32 ltt = atomic_fetch_or_explicit(&loop->thread_transition, LTT_PING, memory_order_acq_rel); + + /* Try to ping in multiple ways */ + if (birdloop_try_ping(loop, ltt)) + atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_PING, memory_order_acq_rel); } void birdloop_ping(struct birdloop *loop) { - if (birdloop_inside(loop) && !loop->ping_pending) - loop->ping_pending++; - else + if (!birdloop_inside(loop)) + { + LOOP_TRACE(loop, "ping from outside"); birdloop_do_ping(loop); + } + else + { + LOOP_TRACE(loop, "ping from inside, pending=%d", loop->ping_pending); + if (!loop->ping_pending) + loop->ping_pending++; + } } @@ -254,6 +313,7 @@ sockets_init(struct birdloop *loop) static void sockets_add(struct birdloop *loop, sock *s) { + LOOP_TRACE(loop, "adding socket %p (total=%d)", s, loop->sock_num); add_tail(&loop->sock_list, &s->n); loop->sock_num++; @@ -278,6 +338,8 @@ sockets_remove(struct birdloop *loop, sock *s) return; /* Decouple the socket from the loop at all. */ + LOOP_TRACE(loop, "removing socket %p (total=%d)", s, loop->sock_num); + rem_node(&s->n); loop->sock_num--; if (loop->thread) @@ -302,30 +364,30 @@ sk_stop(sock *s) static inline uint sk_want_events(sock *s) { return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); } -static struct pollfd * -sockets_prepare(struct birdloop *loop, struct pollfd *pfd, struct pollfd *end) +void +sockets_prepare(struct birdloop *loop, struct pfd *pfd) { node *n; - loop->pfd = pfd; - WALK_LIST(n, loop->sock_list) { sock *s = SKIP_BACK(sock, n, n); + uint w = sk_want_events(s); - /* Out of space for pfds. Force reallocation. */ - if (pfd >= end) - return NULL; - - s->index = pfd - loop->pfd; + if (!w) + { + s->index = -1; + continue; + } - pfd->fd = s->fd; - pfd->events = sk_want_events(s); - pfd->revents = 0; + s->index = pfd->pfd.used; + LOOP_TRACE(loop, "socket %p poll index is %d", s, s->index); - pfd++; + BUFFER_PUSH(pfd->pfd) = (struct pollfd) { + .fd = s->fd, + .events = sk_want_events(s), + }; + BUFFER_PUSH(pfd->loop) = loop; } - - return pfd; } int sk_read(sock *s, int revents); @@ -334,10 +396,9 @@ int sk_write(sock *s); static void sockets_fire(struct birdloop *loop) { - struct pollfd *pfd = loop->pfd; - times_update(); + struct pollfd *pfd = loop->thread->pfd->pfd.data; sock *s; node *n, *nxt; WALK_LIST2_DELSAFE(s, n, nxt, loop->sock_list, n) { @@ -378,6 +439,99 @@ static list bird_thread_pickup; static _Thread_local struct bird_thread *this_thread; +static void +birdloop_set_thread(struct birdloop *loop, struct bird_thread *thr) +{ + /* Signal our moving effort */ + u32 ltt = atomic_fetch_or_explicit(&loop->thread_transition, LTT_MOVE, memory_order_acq_rel); + ASSERT_DIE((ltt & LTT_MOVE) == 0); + + while (ltt & LTT_PING) + { + birdloop_yield(); + ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire); + ASSERT_DIE(ltt & LTT_MOVE); + } + /* Now we are free of running pings */ + + if (loop->thread = thr) + add_tail(&thr->loops, &loop->n); + else + { + LOCK_DOMAIN(resource, birdloop_domain); + add_tail(&birdloop_pickup, &loop->n); + UNLOCK_DOMAIN(resource, birdloop_domain); + } + + /* Finished */ + atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_MOVE, memory_order_acq_rel); + + /* Request to run by force */ + ev_send_loop(loop->thread->meta, &loop->event); +} + +static struct birdloop * +birdloop_take(void) +{ + struct birdloop *loop = NULL; + + LOCK_DOMAIN(resource, birdloop_domain); + if (!EMPTY_LIST(birdloop_pickup)) + { + /* Take the first loop from the pickup list and unlock */ + loop = SKIP_BACK(struct birdloop, n, HEAD(birdloop_pickup)); + rem_node(&loop->n); + UNLOCK_DOMAIN(resource, birdloop_domain); + + birdloop_set_thread(loop, this_thread); + + /* This thread goes to the end of the pickup list */ + LOCK_DOMAIN(resource, birdloop_domain); + rem_node(&this_thread->n); + add_tail(&bird_thread_pickup, &this_thread->n); + + /* If there are more loops to be picked up, wakeup the next thread in order */ + if (!EMPTY_LIST(birdloop_pickup)) + wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup))); + } + UNLOCK_DOMAIN(resource, birdloop_domain); + + return loop; +} + +static void +birdloop_drop(struct birdloop *loop) +{ + /* Remove loop from this thread's list */ + rem_node(&loop->n); + + /* Unset loop's thread */ + if (birdloop_inside(loop)) + birdloop_set_thread(loop, NULL); + else + { + birdloop_enter(loop); + birdloop_set_thread(loop, NULL); + birdloop_leave(loop); + } + + /* Put loop into pickup list */ + LOCK_DOMAIN(resource, birdloop_domain); + add_tail(&birdloop_pickup, &loop->n); + UNLOCK_DOMAIN(resource, birdloop_domain); +} + +static int +poll_timeout(struct birdloop *loop) +{ + timer *t = timers_first(&loop->time); + if (!t) + return -1; + + btime remains = tm_remains(t); + return remains TO_MS + ((remains TO_MS) MS < remains); +} + static void * bird_thread_main(void *arg) { @@ -389,103 +543,79 @@ bird_thread_main(void *arg) tmp_init(thr->pool); init_list(&thr->loops); - u32 refresh_sockets = 1; + thr->meta = birdloop_new_internal(thr->pool, DOMAIN_ORDER(meta), "Thread Meta", 0); + thr->meta->thread = thr; + birdloop_enter(thr->meta); - struct pollfd *pfd, *end; + u32 refresh_sockets = 1; + struct pfd pfd; + BUFFER_INIT(pfd.pfd, thr->pool, 16); + BUFFER_INIT(pfd.loop, thr->pool, 16); + thr->pfd = &pfd; while (1) { - /* Wakeup at least once a minute. */ - int timeout = 60000; + int timeout; /* Pickup new loops */ - LOCK_DOMAIN(resource, birdloop_domain); - if (!EMPTY_LIST(birdloop_pickup)) + struct birdloop *loop = birdloop_take(); + if (loop) { - struct birdloop *loop = SKIP_BACK(struct birdloop, n, HEAD(birdloop_pickup)); - rem_node(&loop->n); - UNLOCK_DOMAIN(resource, birdloop_domain); - - add_tail(&thr->loops, &loop->n); - birdloop_enter(loop); - loop->thread = thr; if (!EMPTY_LIST(loop->sock_list)) refresh_sockets = 1; birdloop_leave(loop); + } - /* If there are more loops to be picked up, wakeup the next thread */ - LOCK_DOMAIN(resource, birdloop_domain); - rem_node(&thr->n); - add_tail(&bird_thread_pickup, &thr->n); + /* Schedule all loops with timed out timers */ + timers_fire(&thr->meta->time, 0); - if (!EMPTY_LIST(birdloop_pickup)) - wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup))); + /* Run all scheduled loops */ + int more_events = ev_run_list(&thr->meta->event_list); + if (more_events) + { + THREAD_TRACE("More events to run"); + timeout = 0; } - UNLOCK_DOMAIN(resource, birdloop_domain); - - struct birdloop *loop; node *nn; - WALK_LIST2(loop, nn, thr->loops, n) + else { - birdloop_enter(loop); - u64 after_enter = ns_now(); - - timer *t; - - times_update(); - timers_fire(&loop->time, 0); - int again = birdloop_process_flags(loop) + ev_run_list(&loop->event_list); - -#if 0 - if (loop->n.next->next) - __builtin_prefetch(SKIP_BACK(struct birdloop, n, loop->n.next)->time.domain); -#endif - - if (again) - timeout = MIN(0, timeout); - else if (t = timers_first(&loop->time)) - timeout = MIN(((tm_remains(t) TO_MS) + 1), timeout); - - u64 before_leave = ns_now(); - loop->total_time_spent_ns += (before_leave - after_enter); - birdloop_leave(loop); - - ev_run_list(&thr->priority_events); + timeout = poll_timeout(thr->meta); + if (timeout == -1) + THREAD_TRACE("No timers, no events"); + else + THREAD_TRACE("Next timer in %d ms", timeout); } - refresh_sockets += atomic_exchange_explicit(&thr->poll_changed, 0, memory_order_acq_rel); + /* Run priority events before sleeping */ + ev_run_list(&thr->priority_events); - if (!refresh_sockets && ((timeout < 0) || (timeout > 5000))) - flush_local_pages(); + /* Do we have to refresh sockets? */ + refresh_sockets += atomic_exchange_explicit(&thr->poll_changed, 0, memory_order_acq_rel); - while (refresh_sockets) + if (refresh_sockets) { -sock_retry:; - end = (pfd = thr->pfd) + thr->pfd_max; + BUFFER_FLUSH(pfd.pfd); + BUFFER_FLUSH(pfd.loop); - /* Add internal wakeup fd */ - pipe_pollin(&thr->wakeup, pfd); - pfd++; + pipe_pollin(&thr->wakeup, &pfd); + node *nn; WALK_LIST2(loop, nn, thr->loops, n) { birdloop_enter(loop); - pfd = sockets_prepare(loop, pfd, end); + sockets_prepare(loop, &pfd); birdloop_leave(loop); - - if (!pfd) - { - mb_free(thr->pfd); - thr->pfd = mb_alloc(thr->pool, sizeof(struct pollfd) * (thr->pfd_max *= 2)); - goto sock_retry; - } } + ASSERT_DIE(pfd.loop.used == pfd.pfd.used); refresh_sockets = 0; } + /* Nothing to do in at least 5 seconds, flush local hot page cache */ + else if (timeout > 5000) + flush_local_pages(); poll_retry:; - int rv = poll(thr->pfd, pfd - thr->pfd, timeout); + int rv = poll(pfd.pfd.data, pfd.pfd.used, timeout); if (rv < 0) { if (errno == EINTR || errno == EAGAIN) @@ -494,51 +624,26 @@ poll_retry:; } /* Drain wakeup fd */ - if (thr->pfd[0].revents & POLLIN) + if (pfd.pfd.data[0].revents & POLLIN) { ASSERT_DIE(rv > 0); rv--; wakeup_drain(thr); } - atomic_exchange_explicit(&thr->ping_sent, 0, memory_order_acq_rel); + atomic_fetch_and_explicit(&thr->meta->thread_transition, ~LTT_PING, memory_order_acq_rel); - if (!rv && !atomic_exchange_explicit(&thr->run_cleanup, 0, memory_order_acq_rel)) - continue; - - /* Process stops and regular sockets */ - node *nxt; - WALK_LIST2_DELSAFE(loop, nn, nxt, thr->loops, n) - { - birdloop_enter(loop); - - if (loop->stopped) - { - /* Flush remaining events */ - ASSERT_DIE(!ev_run_list(&loop->event_list)); - - /* Drop timers */ - timer *t; - while (t = timers_first(&loop->time)) - tm_stop(t); - - /* No sockets allowed */ - ASSERT_DIE(EMPTY_LIST(loop->sock_list)); - - /* Declare loop stopped */ - rem_node(&loop->n); - birdloop_leave(loop); - loop->stopped(loop->stop_data); - - /* Birdloop already left */ - continue; - } - else if (rv) - sockets_fire(loop); - - birdloop_leave(loop); - } + /* Schedule loops with active sockets */ + if (rv) + for (uint i = 1; i < pfd.pfd.used; i++) + if (pfd.pfd.data[i].revents) + { + LOOP_TRACE(pfd.loop.data[i], "socket id %d got revents=%d", i, pfd.pfd.data[i].revents); + ev_send_loop(thr->meta, &pfd.loop.data[i]->event); + } } + + bug("An infinite loop has ended."); } static void @@ -563,11 +668,8 @@ bird_thread_start(void) struct bird_thread *thr = mb_allocz(p, sizeof(*thr)); thr->pool = p; - thr->pfd = mb_alloc(p, sizeof(struct pollfd) * (thr->pfd_max = 16)); thr->cleanup_event = (event) { .hook = bird_thread_cleanup, .data = thr, }; - atomic_store_explicit(&thr->ping_sent, 0, memory_order_relaxed); - wakeup_init(thr); ev_init_list(&thr->priority_events, NULL, "Thread direct event list"); @@ -615,7 +717,7 @@ bird_thread_shutdown(void * _ UNUSED) UNLOCK_DOMAIN(resource, birdloop_domain); - log(L_INFO "Thread pickup size differs from dropper goal by %d%s", dif, tdl_stop ? ", stopping" : ""); + DBG("Thread pickup size differs from dropper goal by %d%s\n", dif, tdl_stop ? ", stopping" : ""); if (tdl_stop) { @@ -628,29 +730,11 @@ bird_thread_shutdown(void * _ UNUSED) /* Leave the thread-picker list to get no more loops */ LOCK_DOMAIN(resource, birdloop_domain); rem_node(&thr->n); + UNLOCK_DOMAIN(resource, birdloop_domain); /* Drop loops including the thread dropper itself */ while (!EMPTY_LIST(thr->loops)) - { - /* Remove loop from this thread's list */ - struct birdloop *loop = HEAD(thr->loops); - rem_node(&loop->n); - UNLOCK_DOMAIN(resource, birdloop_domain); - - /* Unset loop's thread */ - if (birdloop_inside(loop)) - loop->thread = NULL; - else - { - birdloop_enter(loop); - loop->thread = NULL; - birdloop_leave(loop); - } - - /* Put loop into pickup list */ - LOCK_DOMAIN(resource, birdloop_domain); - add_tail(&birdloop_pickup, &loop->n); - } + birdloop_drop(HEAD(thr->loops)); /* Let others know about new loops */ if (!EMPTY_LIST(birdloop_pickup)) @@ -660,6 +744,11 @@ bird_thread_shutdown(void * _ UNUSED) /* Leave the thread-dropper loop as we aren't going to return. */ birdloop_leave(thread_dropper); + /* Stop the meta loop */ + birdloop_leave(thr->meta); + domain_free(thr->meta->time.domain); + rfree(thr->meta->pool); + /* Local pages not needed anymore */ flush_local_pages(); @@ -865,8 +954,81 @@ birdloop_init(void) birdloop_enter_locked(&main_birdloop); } -struct birdloop * -birdloop_new(pool *pp, uint order, const char *name) +static void +birdloop_stop_internal(struct birdloop *loop) +{ + /* Flush remaining events */ + ASSERT_DIE(!ev_run_list(&loop->event_list)); + + /* Drop timers */ + timer *t; + while (t = timers_first(&loop->time)) + tm_stop(t); + + /* No sockets allowed */ + ASSERT_DIE(EMPTY_LIST(loop->sock_list)); + + /* Unschedule from Meta */ + ev_postpone(&loop->event); + tm_stop(&loop->timer); + + /* Declare loop stopped */ + rem_node(&loop->n); + birdloop_leave(loop); + + /* Tail-call the stopped hook */ + loop->stopped(loop->stop_data); +} + +static void +birdloop_run(void *_loop) +{ + /* Run priority events before the loop is executed */ + ev_run_list(&this_thread->priority_events); + + struct birdloop *loop = _loop; + birdloop_enter(loop); + + if (loop->stopped) + /* Birdloop left inside the helper function */ + return birdloop_stop_internal(loop); + + /* Process sockets */ + sockets_fire(loop); + + /* Run timers */ + timers_fire(&loop->time, 0); + + /* Run flag handlers */ + if (birdloop_process_flags(loop)) + { + LOOP_TRACE(loop, "Flag processing needs another run"); + ev_send_loop(this_thread->meta, &loop->event); + } + + /* Run events */ + ev_run_list(&loop->event_list); + + /* Request meta timer */ + timer *t = timers_first(&loop->time); + if (t) + tm_start_in(&loop->timer, tm_remains(t), this_thread->meta); + else + tm_stop(&loop->timer); + + birdloop_leave(loop); +} + +static void +birdloop_run_timer(timer *tm) +{ + struct birdloop *loop = tm->data; + LOOP_TRACE(loop, "Timer ready, requesting run"); + ev_send_loop(loop->thread->meta, &loop->event); +} + +static struct birdloop * +birdloop_new_internal(pool *pp, uint order, const char *name, int request_pickup) { struct domain_generic *dg = domain_new(name, order); @@ -877,32 +1039,45 @@ birdloop_new(pool *pp, uint order, const char *name) loop->time.domain = dg; loop->time.loop = loop; + atomic_store_explicit(&loop->thread_transition, 0, memory_order_relaxed); + birdloop_enter(loop); ev_init_list(&loop->event_list, loop, name); timers_init(&loop->time, p); sockets_init(loop); - LOCK_DOMAIN(resource, birdloop_domain); - add_tail(&birdloop_pickup, &loop->n); - wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup))); - UNLOCK_DOMAIN(resource, birdloop_domain); + loop->event = (event) { .hook = birdloop_run, .data = loop, }; + loop->timer = (timer) { .hook = birdloop_run_timer, .data = loop, }; + + if (request_pickup) + { + LOCK_DOMAIN(resource, birdloop_domain); + add_tail(&birdloop_pickup, &loop->n); + wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup))); + UNLOCK_DOMAIN(resource, birdloop_domain); + } + else + loop->n.next = loop->n.prev = &loop->n; birdloop_leave(loop); return loop; } +struct birdloop * +birdloop_new(pool *pp, uint order, const char *name) +{ + return birdloop_new_internal(pp, order, name, 1); +} + static void birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data) { loop->stopped = stopped; loop->stop_data = data; - if (loop->thread) - { - atomic_store_explicit(&loop->thread->run_cleanup, 1, memory_order_release); - wakeup_do_kick(loop->thread); - } + + birdloop_do_ping(loop); } void @@ -961,6 +1136,7 @@ birdloop_leave_locked(struct birdloop *loop) /* Send pending pings */ if (loop->ping_pending) { + LOOP_TRACE(loop, "sending pings on leave"); loop->ping_pending = 0; birdloop_do_ping(loop); } |