diff options
Diffstat (limited to 'sysdep')
-rw-r--r-- | sysdep/unix/config.Y | 11 | ||||
-rw-r--r-- | sysdep/unix/domain.c | 6 | ||||
-rw-r--r-- | sysdep/unix/io-loop.c | 773 | ||||
-rw-r--r-- | sysdep/unix/io-loop.h | 46 | ||||
-rw-r--r-- | sysdep/unix/io.c | 6 | ||||
-rw-r--r-- | sysdep/unix/main.c | 4 | ||||
-rw-r--r-- | sysdep/unix/unix.h | 3 |
7 files changed, 630 insertions, 219 deletions
diff --git a/sysdep/unix/config.Y b/sysdep/unix/config.Y index 5c4b5bef..a50ec757 100644 --- a/sysdep/unix/config.Y +++ b/sysdep/unix/config.Y @@ -101,6 +101,12 @@ mrtdump_base: ; +conf: THREADS expr { + if ($2 < 1) cf_error("Number of threads must be at least one."); + new_config->thread_count = $2; +} + + conf: debug_unix ; debug_unix: @@ -145,6 +151,11 @@ CF_CLI_HELP(GRACEFUL, restart, [[Shut the daemon down for graceful restart]]) CF_CLI(GRACEFUL RESTART,,, [[Shut the daemon down for graceful restart]]) { cmd_graceful_restart(); } ; +CF_CLI(SHOW THREADS,,, [[Write out thread information]]) +{ cmd_show_threads(0); } ; + +CF_CLI(SHOW THREADS ALL,,, [[Write out thread and IO loop information]]) +{ cmd_show_threads(1); } ; cfg_name: /* empty */ { $$ = NULL; } diff --git a/sysdep/unix/domain.c b/sysdep/unix/domain.c index 0a5858a6..1cba540b 100644 --- a/sysdep/unix/domain.c +++ b/sysdep/unix/domain.c @@ -70,6 +70,12 @@ domain_free(struct domain_generic *dg) xfree(dg); } +const char * +domain_name(struct domain_generic *dg) +{ + return dg->name; +} + uint dg_order(struct domain_generic *dg) { return dg->order; diff --git a/sysdep/unix/io-loop.c b/sysdep/unix/io-loop.c index 5ce2d350..cc2e0523 100644 --- a/sysdep/unix/io-loop.c +++ b/sysdep/unix/io-loop.c @@ -18,6 +18,7 @@ #include "lib/buffer.h" #include "lib/lists.h" +#include "lib/locking.h" #include "lib/resource.h" #include "lib/event.h" #include "lib/timer.h" @@ -26,10 +27,37 @@ #include "lib/io-loop.h" #include "sysdep/unix/io-loop.h" #include "conf/conf.h" +#include "nest/cli.h" #define THREAD_STACK_SIZE 65536 /* To be lowered in near future */ /* + * Nanosecond time for accounting purposes + * + * A fixed point on startup is set as zero, all other values are relative to that. + * Caution: this overflows after like 500 years or so. If you plan to run + * BIRD for such a long time, please implement some means of overflow prevention. + */ + +static struct timespec ns_begin; + +static void ns_init(void) +{ + if (clock_gettime(CLOCK_MONOTONIC, &ns_begin)) + bug("clock_gettime: %m"); +} + +static u64 ns_now(void) +{ + struct timespec ts; + if (clock_gettime(CLOCK_MONOTONIC, &ts)) + bug("clock_gettime: %m"); + + return (u64) (ts.tv_sec - ns_begin.tv_sec) * 1000000000 + ts.tv_nsec - ns_begin.tv_nsec; +} + + +/* * Current thread context */ @@ -59,6 +87,12 @@ birdloop_inside(struct birdloop *loop) return 0; } +_Bool +birdloop_in_this_thread(struct birdloop *loop) +{ + return pthread_equal(pthread_self(), loop->thread->thread_id); +} + void birdloop_flag(struct birdloop *loop, u32 flag) { @@ -80,22 +114,10 @@ birdloop_process_flags(struct birdloop *loop) return 0; u32 flags = atomic_exchange_explicit(&loop->flags, 0, memory_order_acq_rel); - loop->flag_handler->hook(loop->flag_handler, flags); - return !!flags; -} - -static int -birdloop_run_events(struct birdloop *loop) -{ - btime begin = current_time(); - while (current_time() - begin < 5 MS) - { - if (!ev_run_list(&loop->event_list)) - return 0; - - times_update(); - } + if (!flags) + return 0; + loop->flag_handler->hook(loop->flag_handler, flags); return 1; } @@ -160,7 +182,7 @@ pipe_kick(struct pipe *p) while (1) { rv = write(p->fd[1], &v, sizeof(v)); - if ((rv >= 0) || (errno == EAGAIN)) + if ((rv >= 0) || (errno == EAGAIN)) return; if (errno != EINTR) bug("wakeup write: %m"); @@ -176,19 +198,19 @@ pipe_pollin(struct pipe *p, struct pollfd *pfd) } static inline void -wakeup_init(struct birdloop *loop) +wakeup_init(struct bird_thread *loop) { pipe_new(&loop->wakeup); } static inline void -wakeup_drain(struct birdloop *loop) +wakeup_drain(struct bird_thread *loop) { pipe_drain(&loop->wakeup); } static inline void -wakeup_do_kick(struct birdloop *loop) +wakeup_do_kick(struct bird_thread *loop) { pipe_kick(&loop->wakeup); } @@ -196,13 +218,16 @@ wakeup_do_kick(struct birdloop *loop) static inline void birdloop_do_ping(struct birdloop *loop) { - if (atomic_fetch_add_explicit(&loop->ping_sent, 1, memory_order_acq_rel)) + if (!loop->thread) + return; + + if (atomic_fetch_add_explicit(&loop->thread->ping_sent, 1, memory_order_acq_rel)) return; if (loop == birdloop_wakeup_masked) birdloop_wakeup_masked_count++; else - wakeup_do_kick(loop); + wakeup_do_kick(loop->thread); } void @@ -224,10 +249,6 @@ sockets_init(struct birdloop *loop) { init_list(&loop->sock_list); loop->sock_num = 0; - - BUFFER_INIT(loop->poll_sk, loop->pool, 4); - BUFFER_INIT(loop->poll_fd, loop->pool, 4); - loop->poll_changed = 1; /* add wakeup fd */ } static void @@ -237,7 +258,8 @@ sockets_add(struct birdloop *loop, sock *s) loop->sock_num++; s->index = -1; - loop->poll_changed = 1; + if (loop->thread) + atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release); birdloop_ping(loop); } @@ -255,19 +277,20 @@ sockets_remove(struct birdloop *loop, sock *s) if (!enlisted(&s->n)) return; + /* Decouple the socket from the loop at all. */ rem_node(&s->n); loop->sock_num--; + if (loop->thread) + atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release); - if (s->index >= 0) - { - loop->poll_sk.data[s->index] = NULL; - s->index = -1; - loop->poll_changed = 1; - loop->close_scheduled = 1; - birdloop_ping(loop); - } - else - close(s->fd); + s->index = -1; + + /* Close the filedescriptor. If it ever gets into the poll(), it just returns + * POLLNVAL for this fd which then is ignored because nobody checks for + * that result. Or some other routine opens another fd, getting this number, + * yet also in this case poll() at worst spuriously returns and nobody checks + * for the result in this fd. No further precaution is needed. */ + close(s->fd); } void @@ -279,123 +302,559 @@ sk_stop(sock *s) static inline uint sk_want_events(sock *s) { return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); } -/* -FIXME: this should be called from sock code - -static void -sockets_update(struct birdloop *loop, sock *s) -{ - if (s->index >= 0) - loop->poll_fd.data[s->index].events = sk_want_events(s); -} -*/ - -static void -sockets_prepare(struct birdloop *loop) +static struct pollfd * +sockets_prepare(struct birdloop *loop, struct pollfd *pfd, struct pollfd *end) { - BUFFER_SET(loop->poll_sk, loop->sock_num + 1); - BUFFER_SET(loop->poll_fd, loop->sock_num + 1); - - struct pollfd *pfd = loop->poll_fd.data; - sock **psk = loop->poll_sk.data; - uint i = 0; node *n; + loop->pfd = pfd; WALK_LIST(n, loop->sock_list) { sock *s = SKIP_BACK(sock, n, n); - ASSERT(i < loop->sock_num); + /* Out of space for pfds. Force reallocation. */ + if (pfd >= end) + return NULL; + + s->index = pfd - loop->pfd; - s->index = i; - *psk = s; pfd->fd = s->fd; pfd->events = sk_want_events(s); pfd->revents = 0; pfd++; - psk++; - i++; } - ASSERT(i == loop->sock_num); + return pfd; +} + +int sk_read(sock *s, int revents); +int sk_write(sock *s); + +static void +sockets_fire(struct birdloop *loop) +{ + struct pollfd *pfd = loop->pfd; + + times_update(); + + sock *s; node *n, *nxt; + WALK_LIST2_DELSAFE(s, n, nxt, loop->sock_list, n) + { + if (s->index < 0) + continue; + + int rev = pfd[s->index].revents; + + if (!rev) + continue; + + if (rev & POLLNVAL) + bug("poll: invalid fd %d", s->fd); + + int e = 1; + + if (rev & POLLIN) + while (e && s->rx_hook) + e = sk_read(s, rev); + + if (rev & POLLOUT) + { + atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release); + while (e = sk_write(s)) + ; + } + } +} + +/* + * Threads + */ + +DEFINE_DOMAIN(resource); +static DOMAIN(resource) birdloop_domain; +static list birdloop_pickup; +static list bird_thread_pickup; + +static _Thread_local struct bird_thread *this_thread; + +static void * +bird_thread_main(void *arg) +{ + struct bird_thread *thr = this_thread = arg; + + rcu_thread_start(&thr->rcu); + synchronize_rcu(); + + tmp_init(thr->pool); + init_list(&thr->loops); + + u32 refresh_sockets = 1; + + struct pollfd *pfd, *end; + + while (1) + { + /* Wakeup at least once a minute. */ + int timeout = 60000; + + /* Pickup new loops */ + LOCK_DOMAIN(resource, birdloop_domain); + if (!EMPTY_LIST(birdloop_pickup)) + { + struct birdloop *loop = SKIP_BACK(struct birdloop, n, HEAD(birdloop_pickup)); + rem_node(&loop->n); + UNLOCK_DOMAIN(resource, birdloop_domain); + + add_tail(&thr->loops, &loop->n); + + birdloop_enter(loop); + loop->thread = thr; + if (!EMPTY_LIST(loop->sock_list)) + refresh_sockets = 1; + birdloop_leave(loop); + + /* If there are more loops to be picked up, wakeup the next thread */ + LOCK_DOMAIN(resource, birdloop_domain); + rem_node(&thr->n); + add_tail(&bird_thread_pickup, &thr->n); + + if (!EMPTY_LIST(birdloop_pickup)) + wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup))); + } + UNLOCK_DOMAIN(resource, birdloop_domain); + + struct birdloop *loop; node *nn; + WALK_LIST2(loop, nn, thr->loops, n) + { + birdloop_enter(loop); + u64 after_enter = ns_now(); + + timer *t; + + times_update(); + timers_fire(&loop->time, 0); + int again = birdloop_process_flags(loop) + ev_run_list(&loop->event_list); + +#if 0 + if (loop->n.next->next) + __builtin_prefetch(SKIP_BACK(struct birdloop, n, loop->n.next)->time.domain); +#endif + + if (again) + timeout = MIN(0, timeout); + else if (t = timers_first(&loop->time)) + timeout = MIN(((tm_remains(t) TO_MS) + 1), timeout); + + u64 before_leave = ns_now(); + loop->total_time_spent_ns += (before_leave - after_enter); + birdloop_leave(loop); + + ev_run_list(&thr->priority_events); + } + + refresh_sockets += atomic_exchange_explicit(&thr->poll_changed, 0, memory_order_acq_rel); - /* Add internal wakeup fd */ - *psk = NULL; - pipe_pollin(&loop->wakeup, pfd); + if (!refresh_sockets && ((timeout < 0) || (timeout > 5000))) + flush_local_pages(); - loop->poll_changed = 0; + while (refresh_sockets) + { +sock_retry:; + end = (pfd = thr->pfd) + thr->pfd_max; + + /* Add internal wakeup fd */ + pipe_pollin(&thr->wakeup, pfd); + pfd++; + + WALK_LIST2(loop, nn, thr->loops, n) + { + birdloop_enter(loop); + pfd = sockets_prepare(loop, pfd, end); + birdloop_leave(loop); + + if (!pfd) + { + mb_free(thr->pfd); + thr->pfd = mb_alloc(thr->pool, sizeof(struct pollfd) * (thr->pfd_max *= 2)); + goto sock_retry; + } + } + + refresh_sockets = 0; + } + +poll_retry:; + int rv = poll(thr->pfd, pfd - thr->pfd, timeout); + if (rv < 0) + { + if (errno == EINTR || errno == EAGAIN) + goto poll_retry; + bug("poll in %p: %m", thr); + } + + /* Drain wakeup fd */ + if (thr->pfd[0].revents & POLLIN) + { + ASSERT_DIE(rv > 0); + rv--; + wakeup_drain(thr); + } + + atomic_exchange_explicit(&thr->ping_sent, 0, memory_order_acq_rel); + + if (!rv && !atomic_exchange_explicit(&thr->run_cleanup, 0, memory_order_acq_rel)) + continue; + + /* Process stops and regular sockets */ + node *nxt; + WALK_LIST2_DELSAFE(loop, nn, nxt, thr->loops, n) + { + birdloop_enter(loop); + + if (loop->stopped) + { + /* Flush remaining events */ + ASSERT_DIE(!ev_run_list(&loop->event_list)); + + /* Drop timers */ + timer *t; + while (t = timers_first(&loop->time)) + tm_stop(t); + + /* No sockets allowed */ + ASSERT_DIE(EMPTY_LIST(loop->sock_list)); + + /* Declare loop stopped */ + rem_node(&loop->n); + birdloop_leave(loop); + loop->stopped(loop->stop_data); + + /* Birdloop already left */ + continue; + } + else if (rv) + sockets_fire(loop); + + birdloop_leave(loop); + } + } } static void -sockets_close_fds(struct birdloop *loop) +bird_thread_cleanup(void *_thr) { - struct pollfd *pfd = loop->poll_fd.data; - sock **psk = loop->poll_sk.data; - int poll_num = loop->poll_fd.used - 1; + struct bird_thread *thr = _thr; + ASSERT_DIE(birdloop_inside(&main_birdloop)); - int i; - for (i = 0; i < poll_num; i++) - if (psk[i] == NULL) - close(pfd[i].fd); + /* Thread attributes no longer needed */ + pthread_attr_destroy(&thr->thread_attr); - loop->close_scheduled = 0; + /* Free all remaining memory */ + rfree(thr->pool); } -int sk_read(sock *s, int revents); -int sk_write(sock *s); +static struct bird_thread * +bird_thread_start(void) +{ + ASSERT_DIE(birdloop_inside(&main_birdloop)); + + pool *p = rp_new(&root_pool, "Thread"); + + struct bird_thread *thr = mb_allocz(p, sizeof(*thr)); + thr->pool = p; + thr->pfd = mb_alloc(p, sizeof(struct pollfd) * (thr->pfd_max = 16)); + thr->cleanup_event = (event) { .hook = bird_thread_cleanup, .data = thr, }; + + atomic_store_explicit(&thr->ping_sent, 0, memory_order_relaxed); + + wakeup_init(thr); + ev_init_list(&thr->priority_events, NULL, "Thread direct event list"); + + LOCK_DOMAIN(resource, birdloop_domain); + add_tail(&bird_thread_pickup, &thr->n); + UNLOCK_DOMAIN(resource, birdloop_domain); + + int e = 0; + + if (e = pthread_attr_init(&thr->thread_attr)) + die("pthread_attr_init() failed: %M", e); + + /* We don't have to worry about thread stack size so much. + if (e = pthread_attr_setstacksize(&thr->thread_attr, THREAD_STACK_SIZE)) + die("pthread_attr_setstacksize(%u) failed: %M", THREAD_STACK_SIZE, e); + */ + + if (e = pthread_attr_setdetachstate(&thr->thread_attr, PTHREAD_CREATE_DETACHED)) + die("pthread_attr_setdetachstate(PTHREAD_CREATE_DETACHED) failed: %M", e); + + if (e = pthread_create(&thr->thread_id, &thr->thread_attr, bird_thread_main, thr)) + die("pthread_create() failed: %M", e); + + return thr; +} + +static struct birdloop *thread_dropper; +static event *thread_dropper_event; +static uint thread_dropper_goal; static void -sockets_fire(struct birdloop *loop) +bird_thread_shutdown(void * _ UNUSED) { - struct pollfd *pfd = loop->poll_fd.data; - sock **psk = loop->poll_sk.data; - int poll_num = loop->poll_fd.used - 1; + LOCK_DOMAIN(resource, birdloop_domain); + int dif = list_length(&bird_thread_pickup) - thread_dropper_goal; + struct birdloop *tdl_stop = NULL; - times_update(); + if (dif > 0) + ev_send_loop(thread_dropper, thread_dropper_event); + else + { + tdl_stop = thread_dropper; + thread_dropper = NULL; + } - /* Last fd is internal wakeup fd */ - if (pfd[poll_num].revents & POLLIN) - wakeup_drain(loop); + UNLOCK_DOMAIN(resource, birdloop_domain); - int i; - for (i = 0; i < poll_num; pfd++, psk++, i++) + log(L_INFO "Thread pickup size differs from dropper goal by %d%s", dif, tdl_stop ? ", stopping" : ""); + + if (tdl_stop) { - int e = 1; + birdloop_stop_self(tdl_stop, NULL, NULL); + return; + } - if (! pfd->revents) - continue; + struct bird_thread *thr = this_thread; + + /* Leave the thread-picker list to get no more loops */ + LOCK_DOMAIN(resource, birdloop_domain); + rem_node(&thr->n); + + /* Drop loops including the thread dropper itself */ + while (!EMPTY_LIST(thr->loops)) + { + /* Remove loop from this thread's list */ + struct birdloop *loop = HEAD(thr->loops); + rem_node(&loop->n); + UNLOCK_DOMAIN(resource, birdloop_domain); + + /* Unset loop's thread */ + if (birdloop_inside(loop)) + loop->thread = NULL; + else + { + birdloop_enter(loop); + loop->thread = NULL; + birdloop_leave(loop); + } + + /* Put loop into pickup list */ + LOCK_DOMAIN(resource, birdloop_domain); + add_tail(&birdloop_pickup, &loop->n); + } + + /* Let others know about new loops */ + if (!EMPTY_LIST(birdloop_pickup)) + wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup))); + UNLOCK_DOMAIN(resource, birdloop_domain); + + /* Leave the thread-dropper loop as we aren't going to return. */ + birdloop_leave(thread_dropper); + + /* Local pages not needed anymore */ + flush_local_pages(); + + /* Unregister from RCU */ + rcu_thread_stop(&thr->rcu); + + /* Request thread cleanup from main loop */ + ev_send_loop(&main_birdloop, &thr->cleanup_event); + + /* Exit! */ + pthread_exit(NULL); +} + + +void +bird_thread_commit(struct config *new, struct config *old UNUSED) +{ + ASSERT_DIE(birdloop_inside(&main_birdloop)); - if (pfd->revents & POLLNVAL) - bug("poll: invalid fd %d", pfd->fd); + if (new->shutdown) + return; - if (pfd->revents & POLLIN) - while (e && *psk && (*psk)->rx_hook) - e = sk_read(*psk, pfd->revents); + if (!new->thread_count) + new->thread_count = 1; - e = 1; - if (pfd->revents & POLLOUT) + while (1) + { + LOCK_DOMAIN(resource, birdloop_domain); + int dif = list_length(&bird_thread_pickup) - (thread_dropper_goal = new->thread_count); + _Bool thread_dropper_running = !!thread_dropper; + UNLOCK_DOMAIN(resource, birdloop_domain); + + if (dif < 0) { - loop->poll_changed = 1; - while (e && *psk) - e = sk_write(*psk); + bird_thread_start(); + continue; } + + if ((dif > 0) && !thread_dropper_running) + { + struct birdloop *tdl = birdloop_new(&root_pool, DOMAIN_ORDER(control), "Thread dropper"); + event *tde = ev_new_init(tdl->pool, bird_thread_shutdown, NULL); + + LOCK_DOMAIN(resource, birdloop_domain); + thread_dropper = tdl; + thread_dropper_event = tde; + UNLOCK_DOMAIN(resource, birdloop_domain); + + ev_send_loop(thread_dropper, thread_dropper_event); + } + + return; + } +} + + +DEFINE_DOMAIN(control); + +struct bird_thread_show_data { + cli *cli; + pool *pool; + DOMAIN(control) lock; + uint total; + uint done; + u8 show_loops; +}; + +static void +bird_thread_show_cli_cont(struct cli *c UNUSED) +{ + /* Explicitly do nothing to prevent CLI from trying to parse another command. */ +} + +static int +bird_thread_show_cli_cleanup(struct cli *c UNUSED) +{ + return 1; /* Defer the cleanup until the writeout is finished. */ +} + +static void +bird_thread_show(void *data) +{ + struct bird_thread_show_data *tsd = data; + + LOCK_DOMAIN(control, tsd->lock); + if (tsd->show_loops) + cli_printf(tsd->cli, -1026, "Thread %p", this_thread); + + u64 total_time_ns = 0; + struct birdloop *loop; + WALK_LIST(loop, this_thread->loops) + { + if (tsd->show_loops) + cli_printf(tsd->cli, -1026, " Loop %s time: %t", domain_name(loop->time.domain), loop->total_time_spent_ns NS); + total_time_ns += loop->total_time_spent_ns; + } + + tsd->done++; + int last = (tsd->done == tsd->total); + + if (last) + { + tsd->cli->cont = NULL; + tsd->cli->cleanup = NULL; + } + + if (tsd->show_loops) + cli_printf(tsd->cli, (last ? 1 : -1) * 1026, " Total time: %t", total_time_ns NS); + else + cli_printf(tsd->cli, (last ? 1 : -1) * 1026, "Thread %p time %t", this_thread, total_time_ns NS); + + UNLOCK_DOMAIN(control, tsd->lock); + + if (last) + { + the_bird_lock(); + + LOCK_DOMAIN(resource, birdloop_domain); + if (!EMPTY_LIST(birdloop_pickup)) + if (tsd->show_loops) + { + cli_printf(tsd->cli, -1026, "Unassigned loops"); + WALK_LIST(loop, birdloop_pickup) + cli_printf(tsd->cli, -1026, " Loop %s time: %t", domain_name(loop->time.domain), loop->total_time_spent_ns NS); + } + else + { + uint count = 0; + u64 total_time_ns = 0; + WALK_LIST(loop, birdloop_pickup) + { + count++; + total_time_ns += loop->total_time_spent_ns; + } + cli_printf(tsd->cli, -1026, "Unassigned loops: %d, total time %t", count, total_time_ns NS); + } + UNLOCK_DOMAIN(resource, birdloop_domain); + + cli_write_trigger(tsd->cli); + DOMAIN_FREE(control, tsd->lock); + rfree(tsd->pool); + + the_bird_unlock(); } } +void +cmd_show_threads(int show_loops) +{ + pool *p = rp_new(&root_pool, "Show Threads"); + + struct bird_thread_show_data *tsd = mb_allocz(p, sizeof(struct bird_thread_show_data)); + tsd->lock = DOMAIN_NEW(control, "Show Threads"); + tsd->cli = this_cli; + tsd->pool = p; + tsd->show_loops = show_loops; + + this_cli->cont = bird_thread_show_cli_cont; + this_cli->cleanup = bird_thread_show_cli_cleanup; + + LOCK_DOMAIN(control, tsd->lock); + LOCK_DOMAIN(resource, birdloop_domain); + + struct bird_thread *thr; + WALK_LIST(thr, bird_thread_pickup) + { + tsd->total++; + ev_send(&thr->priority_events, ev_new_init(p, bird_thread_show, tsd)); + wakeup_do_kick(thr); + } + + UNLOCK_DOMAIN(resource, birdloop_domain); + UNLOCK_DOMAIN(control, tsd->lock); +} + /* * Birdloop */ -struct birdloop main_birdloop; +static struct bird_thread main_thread; +struct birdloop main_birdloop = { .thread = &main_thread, }; static void birdloop_enter_locked(struct birdloop *loop); void birdloop_init(void) { - wakeup_init(&main_birdloop); + ns_init(); + + birdloop_domain = DOMAIN_NEW(resource, "Loop Pickup"); + init_list(&birdloop_pickup); + init_list(&bird_thread_pickup); + + wakeup_init(main_birdloop.thread); main_birdloop.time.domain = the_bird_domain.the_bird; main_birdloop.time.loop = &main_birdloop; @@ -406,8 +865,6 @@ birdloop_init(void) birdloop_enter_locked(&main_birdloop); } -static void *birdloop_main(void *arg); - struct birdloop * birdloop_new(pool *pp, uint order, const char *name) { @@ -422,24 +879,14 @@ birdloop_new(pool *pp, uint order, const char *name) birdloop_enter(loop); - wakeup_init(loop); ev_init_list(&loop->event_list, loop, name); timers_init(&loop->time, p); sockets_init(loop); - int e = 0; - - if (e = pthread_attr_init(&loop->thread_attr)) - die("pthread_attr_init() failed: %M", e); - - if (e = pthread_attr_setstacksize(&loop->thread_attr, THREAD_STACK_SIZE)) - die("pthread_attr_setstacksize(%u) failed: %M", THREAD_STACK_SIZE, e); - - if (e = pthread_attr_setdetachstate(&loop->thread_attr, PTHREAD_CREATE_DETACHED)) - die("pthread_attr_setdetachstate(PTHREAD_CREATE_DETACHED) failed: %M", e); - - if (e = pthread_create(&loop->thread_id, &loop->thread_attr, birdloop_main, loop)) - die("pthread_create() failed: %M", e); + LOCK_DOMAIN(resource, birdloop_domain); + add_tail(&birdloop_pickup, &loop->n); + wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup))); + UNLOCK_DOMAIN(resource, birdloop_domain); birdloop_leave(loop); @@ -451,7 +898,11 @@ birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data) { loop->stopped = stopped; loop->stop_data = data; - wakeup_do_kick(loop); + if (loop->thread) + { + atomic_store_explicit(&loop->thread->run_cleanup, 1, memory_order_release); + wakeup_do_kick(loop->thread); + } } void @@ -475,10 +926,7 @@ void birdloop_free(struct birdloop *loop) { ASSERT_DIE(loop->links == 0); - ASSERT_DIE(pthread_equal(pthread_self(), loop->thread_id)); - - rcu_birdloop_stop(&loop->rcu); - pthread_attr_destroy(&loop->thread_attr); + ASSERT_DIE(birdloop_in_this_thread(loop)); domain_free(loop->time.domain); rfree(loop->pool); @@ -541,7 +989,7 @@ birdloop_unmask_wakeups(struct birdloop *loop) ASSERT_DIE(birdloop_wakeup_masked == loop); birdloop_wakeup_masked = NULL; if (birdloop_wakeup_masked_count) - wakeup_do_kick(loop); + wakeup_do_kick(loop->thread); birdloop_wakeup_masked_count = 0; } @@ -560,85 +1008,6 @@ birdloop_unlink(struct birdloop *loop) loop->links--; } -static void * -birdloop_main(void *arg) -{ - struct birdloop *loop = arg; - timer *t; - int rv, timeout; - - rcu_birdloop_start(&loop->rcu); - - btime loop_begin = current_time(); - - tmp_init(loop->pool); - - birdloop_enter(loop); - while (1) - { - timers_fire(&loop->time, 0); - if (birdloop_process_flags(loop) + birdloop_run_events(loop)) - timeout = 0; - else if (t = timers_first(&loop->time)) - timeout = (tm_remains(t) TO_MS) + 1; - else - timeout = -1; - - if (loop->poll_changed) - sockets_prepare(loop); - else - if ((timeout < 0) || (timeout > 5000)) - flush_local_pages(); - - btime duration = current_time() - loop_begin; - if (duration > config->watchdog_warning) - log(L_WARN "I/O loop cycle took %d ms", (int) (duration TO_MS)); - - birdloop_leave(loop); - - try: - rv = poll(loop->poll_fd.data, loop->poll_fd.used, timeout); - if (rv < 0) - { - if (errno == EINTR || errno == EAGAIN) - goto try; - bug("poll: %m"); - } - - birdloop_enter(loop); - - if (loop->close_scheduled) - sockets_close_fds(loop); - - if (loop->stopped) - break; - - loop_begin = current_time(); - - if (rv && !loop->poll_changed) - sockets_fire(loop); - - atomic_exchange_explicit(&loop->ping_sent, 0, memory_order_acq_rel); - } - - /* Flush remaining events */ - ASSERT_DIE(!ev_run_list(&loop->event_list)); - - /* Drop timers */ - while (t = timers_first(&loop->time)) - tm_stop(t); - - /* No sockets allowed */ - ASSERT_DIE(EMPTY_LIST(loop->sock_list)); - ASSERT_DIE(loop->sock_num == 0); - - birdloop_leave(loop); - loop->stopped(loop->stop_data); - - flush_local_pages(); - return NULL; -} - void birdloop_yield(void) { diff --git a/sysdep/unix/io-loop.h b/sysdep/unix/io-loop.h index 29ca96d6..33d529da 100644 --- a/sysdep/unix/io-loop.h +++ b/sysdep/unix/io-loop.h @@ -21,26 +21,16 @@ void pipe_kick(struct pipe *); struct birdloop { + node n; + pool *pool; struct timeloop time; event_list event_list; list sock_list; - uint sock_num; - - BUFFER(sock *) poll_sk; - BUFFER(struct pollfd) poll_fd; - u8 poll_changed; - u8 close_scheduled; + int sock_num; uint ping_pending; - _Atomic u32 ping_sent; - struct pipe wakeup; - - pthread_t thread_id; - pthread_attr_t thread_attr; - - struct rcu_birdloop rcu; uint links; @@ -51,6 +41,36 @@ struct birdloop void *stop_data; struct birdloop *prev_loop; + + struct bird_thread *thread; + struct pollfd *pfd; + + u64 total_time_spent_ns; +}; + +struct bird_thread +{ + node n; + + struct pollfd *pfd; + uint pfd_max; + + _Atomic u32 ping_sent; + _Atomic u32 run_cleanup; + _Atomic u32 poll_changed; + + struct pipe wakeup; + event_list priority_events; + + pthread_t thread_id; + pthread_attr_t thread_attr; + + struct rcu_thread rcu; + + list loops; + pool *pool; + + event cleanup_event; }; #endif diff --git a/sysdep/unix/io.c b/sysdep/unix/io.c index c91bd597..c1466b56 100644 --- a/sysdep/unix/io.c +++ b/sysdep/unix/io.c @@ -2254,7 +2254,7 @@ io_loop(void) } /* A hack to reload main io_loop() when something has changed asynchronously. */ - pipe_pollin(&main_birdloop.wakeup, &pfd[0]); + pipe_pollin(&main_birdloop.thread->wakeup, &pfd[0]); nfds = 1; @@ -2332,8 +2332,8 @@ io_loop(void) if (pfd[0].revents & POLLIN) { /* IO loop reload requested */ - pipe_drain(&main_birdloop.wakeup); - atomic_exchange_explicit(&main_birdloop.ping_sent, 0, memory_order_acq_rel); + pipe_drain(&main_birdloop.thread->wakeup); + atomic_exchange_explicit(&main_birdloop.thread->ping_sent, 0, memory_order_acq_rel); continue; } diff --git a/sysdep/unix/main.c b/sysdep/unix/main.c index 74827d98..01cc0c93 100644 --- a/sysdep/unix/main.c +++ b/sysdep/unix/main.c @@ -200,9 +200,11 @@ sysdep_preconfig(struct config *c) } int -sysdep_commit(struct config *new, struct config *old UNUSED) +sysdep_commit(struct config *new, struct config *old) { log_switch(0, &new->logfiles, new->syslog_name); + bird_thread_commit(new, old); + return 0; } diff --git a/sysdep/unix/unix.h b/sysdep/unix/unix.h index ad85d1ea..e26afe41 100644 --- a/sysdep/unix/unix.h +++ b/sysdep/unix/unix.h @@ -16,6 +16,7 @@ struct pool; struct iface; struct birdsock; struct rfile; +struct config; /* main.c */ @@ -32,6 +33,8 @@ void cmd_reconfig_undo(void); void cmd_reconfig_status(void); void cmd_shutdown(void); void cmd_graceful_restart(void); +void cmd_show_threads(int); +void bird_thread_commit(struct config *new, struct config *old); #define UNIX_DEFAULT_CONFIGURE_TIMEOUT 300 |