summaryrefslogtreecommitdiff
path: root/sysdep/unix/io-loop.c
diff options
context:
space:
mode:
Diffstat (limited to 'sysdep/unix/io-loop.c')
-rw-r--r--sysdep/unix/io-loop.c773
1 files changed, 571 insertions, 202 deletions
diff --git a/sysdep/unix/io-loop.c b/sysdep/unix/io-loop.c
index 5ce2d350..cc2e0523 100644
--- a/sysdep/unix/io-loop.c
+++ b/sysdep/unix/io-loop.c
@@ -18,6 +18,7 @@
#include "lib/buffer.h"
#include "lib/lists.h"
+#include "lib/locking.h"
#include "lib/resource.h"
#include "lib/event.h"
#include "lib/timer.h"
@@ -26,10 +27,37 @@
#include "lib/io-loop.h"
#include "sysdep/unix/io-loop.h"
#include "conf/conf.h"
+#include "nest/cli.h"
#define THREAD_STACK_SIZE 65536 /* To be lowered in near future */
/*
+ * Nanosecond time for accounting purposes
+ *
+ * A fixed point on startup is set as zero, all other values are relative to that.
+ * Caution: this overflows after like 500 years or so. If you plan to run
+ * BIRD for such a long time, please implement some means of overflow prevention.
+ */
+
+static struct timespec ns_begin;
+
+static void ns_init(void)
+{
+ if (clock_gettime(CLOCK_MONOTONIC, &ns_begin))
+ bug("clock_gettime: %m");
+}
+
+static u64 ns_now(void)
+{
+ struct timespec ts;
+ if (clock_gettime(CLOCK_MONOTONIC, &ts))
+ bug("clock_gettime: %m");
+
+ return (u64) (ts.tv_sec - ns_begin.tv_sec) * 1000000000 + ts.tv_nsec - ns_begin.tv_nsec;
+}
+
+
+/*
* Current thread context
*/
@@ -59,6 +87,12 @@ birdloop_inside(struct birdloop *loop)
return 0;
}
+_Bool
+birdloop_in_this_thread(struct birdloop *loop)
+{
+ return pthread_equal(pthread_self(), loop->thread->thread_id);
+}
+
void
birdloop_flag(struct birdloop *loop, u32 flag)
{
@@ -80,22 +114,10 @@ birdloop_process_flags(struct birdloop *loop)
return 0;
u32 flags = atomic_exchange_explicit(&loop->flags, 0, memory_order_acq_rel);
- loop->flag_handler->hook(loop->flag_handler, flags);
- return !!flags;
-}
-
-static int
-birdloop_run_events(struct birdloop *loop)
-{
- btime begin = current_time();
- while (current_time() - begin < 5 MS)
- {
- if (!ev_run_list(&loop->event_list))
- return 0;
-
- times_update();
- }
+ if (!flags)
+ return 0;
+ loop->flag_handler->hook(loop->flag_handler, flags);
return 1;
}
@@ -160,7 +182,7 @@ pipe_kick(struct pipe *p)
while (1) {
rv = write(p->fd[1], &v, sizeof(v));
- if ((rv >= 0) || (errno == EAGAIN))
+ if ((rv >= 0) || (errno == EAGAIN))
return;
if (errno != EINTR)
bug("wakeup write: %m");
@@ -176,19 +198,19 @@ pipe_pollin(struct pipe *p, struct pollfd *pfd)
}
static inline void
-wakeup_init(struct birdloop *loop)
+wakeup_init(struct bird_thread *loop)
{
pipe_new(&loop->wakeup);
}
static inline void
-wakeup_drain(struct birdloop *loop)
+wakeup_drain(struct bird_thread *loop)
{
pipe_drain(&loop->wakeup);
}
static inline void
-wakeup_do_kick(struct birdloop *loop)
+wakeup_do_kick(struct bird_thread *loop)
{
pipe_kick(&loop->wakeup);
}
@@ -196,13 +218,16 @@ wakeup_do_kick(struct birdloop *loop)
static inline void
birdloop_do_ping(struct birdloop *loop)
{
- if (atomic_fetch_add_explicit(&loop->ping_sent, 1, memory_order_acq_rel))
+ if (!loop->thread)
+ return;
+
+ if (atomic_fetch_add_explicit(&loop->thread->ping_sent, 1, memory_order_acq_rel))
return;
if (loop == birdloop_wakeup_masked)
birdloop_wakeup_masked_count++;
else
- wakeup_do_kick(loop);
+ wakeup_do_kick(loop->thread);
}
void
@@ -224,10 +249,6 @@ sockets_init(struct birdloop *loop)
{
init_list(&loop->sock_list);
loop->sock_num = 0;
-
- BUFFER_INIT(loop->poll_sk, loop->pool, 4);
- BUFFER_INIT(loop->poll_fd, loop->pool, 4);
- loop->poll_changed = 1; /* add wakeup fd */
}
static void
@@ -237,7 +258,8 @@ sockets_add(struct birdloop *loop, sock *s)
loop->sock_num++;
s->index = -1;
- loop->poll_changed = 1;
+ if (loop->thread)
+ atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release);
birdloop_ping(loop);
}
@@ -255,19 +277,20 @@ sockets_remove(struct birdloop *loop, sock *s)
if (!enlisted(&s->n))
return;
+ /* Decouple the socket from the loop at all. */
rem_node(&s->n);
loop->sock_num--;
+ if (loop->thread)
+ atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release);
- if (s->index >= 0)
- {
- loop->poll_sk.data[s->index] = NULL;
- s->index = -1;
- loop->poll_changed = 1;
- loop->close_scheduled = 1;
- birdloop_ping(loop);
- }
- else
- close(s->fd);
+ s->index = -1;
+
+ /* Close the filedescriptor. If it ever gets into the poll(), it just returns
+ * POLLNVAL for this fd which then is ignored because nobody checks for
+ * that result. Or some other routine opens another fd, getting this number,
+ * yet also in this case poll() at worst spuriously returns and nobody checks
+ * for the result in this fd. No further precaution is needed. */
+ close(s->fd);
}
void
@@ -279,123 +302,559 @@ sk_stop(sock *s)
static inline uint sk_want_events(sock *s)
{ return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); }
-/*
-FIXME: this should be called from sock code
-
-static void
-sockets_update(struct birdloop *loop, sock *s)
-{
- if (s->index >= 0)
- loop->poll_fd.data[s->index].events = sk_want_events(s);
-}
-*/
-
-static void
-sockets_prepare(struct birdloop *loop)
+static struct pollfd *
+sockets_prepare(struct birdloop *loop, struct pollfd *pfd, struct pollfd *end)
{
- BUFFER_SET(loop->poll_sk, loop->sock_num + 1);
- BUFFER_SET(loop->poll_fd, loop->sock_num + 1);
-
- struct pollfd *pfd = loop->poll_fd.data;
- sock **psk = loop->poll_sk.data;
- uint i = 0;
node *n;
+ loop->pfd = pfd;
WALK_LIST(n, loop->sock_list)
{
sock *s = SKIP_BACK(sock, n, n);
- ASSERT(i < loop->sock_num);
+ /* Out of space for pfds. Force reallocation. */
+ if (pfd >= end)
+ return NULL;
+
+ s->index = pfd - loop->pfd;
- s->index = i;
- *psk = s;
pfd->fd = s->fd;
pfd->events = sk_want_events(s);
pfd->revents = 0;
pfd++;
- psk++;
- i++;
}
- ASSERT(i == loop->sock_num);
+ return pfd;
+}
+
+int sk_read(sock *s, int revents);
+int sk_write(sock *s);
+
+static void
+sockets_fire(struct birdloop *loop)
+{
+ struct pollfd *pfd = loop->pfd;
+
+ times_update();
+
+ sock *s; node *n, *nxt;
+ WALK_LIST2_DELSAFE(s, n, nxt, loop->sock_list, n)
+ {
+ if (s->index < 0)
+ continue;
+
+ int rev = pfd[s->index].revents;
+
+ if (!rev)
+ continue;
+
+ if (rev & POLLNVAL)
+ bug("poll: invalid fd %d", s->fd);
+
+ int e = 1;
+
+ if (rev & POLLIN)
+ while (e && s->rx_hook)
+ e = sk_read(s, rev);
+
+ if (rev & POLLOUT)
+ {
+ atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release);
+ while (e = sk_write(s))
+ ;
+ }
+ }
+}
+
+/*
+ * Threads
+ */
+
+DEFINE_DOMAIN(resource);
+static DOMAIN(resource) birdloop_domain;
+static list birdloop_pickup;
+static list bird_thread_pickup;
+
+static _Thread_local struct bird_thread *this_thread;
+
+static void *
+bird_thread_main(void *arg)
+{
+ struct bird_thread *thr = this_thread = arg;
+
+ rcu_thread_start(&thr->rcu);
+ synchronize_rcu();
+
+ tmp_init(thr->pool);
+ init_list(&thr->loops);
+
+ u32 refresh_sockets = 1;
+
+ struct pollfd *pfd, *end;
+
+ while (1)
+ {
+ /* Wakeup at least once a minute. */
+ int timeout = 60000;
+
+ /* Pickup new loops */
+ LOCK_DOMAIN(resource, birdloop_domain);
+ if (!EMPTY_LIST(birdloop_pickup))
+ {
+ struct birdloop *loop = SKIP_BACK(struct birdloop, n, HEAD(birdloop_pickup));
+ rem_node(&loop->n);
+ UNLOCK_DOMAIN(resource, birdloop_domain);
+
+ add_tail(&thr->loops, &loop->n);
+
+ birdloop_enter(loop);
+ loop->thread = thr;
+ if (!EMPTY_LIST(loop->sock_list))
+ refresh_sockets = 1;
+ birdloop_leave(loop);
+
+ /* If there are more loops to be picked up, wakeup the next thread */
+ LOCK_DOMAIN(resource, birdloop_domain);
+ rem_node(&thr->n);
+ add_tail(&bird_thread_pickup, &thr->n);
+
+ if (!EMPTY_LIST(birdloop_pickup))
+ wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup)));
+ }
+ UNLOCK_DOMAIN(resource, birdloop_domain);
+
+ struct birdloop *loop; node *nn;
+ WALK_LIST2(loop, nn, thr->loops, n)
+ {
+ birdloop_enter(loop);
+ u64 after_enter = ns_now();
+
+ timer *t;
+
+ times_update();
+ timers_fire(&loop->time, 0);
+ int again = birdloop_process_flags(loop) + ev_run_list(&loop->event_list);
+
+#if 0
+ if (loop->n.next->next)
+ __builtin_prefetch(SKIP_BACK(struct birdloop, n, loop->n.next)->time.domain);
+#endif
+
+ if (again)
+ timeout = MIN(0, timeout);
+ else if (t = timers_first(&loop->time))
+ timeout = MIN(((tm_remains(t) TO_MS) + 1), timeout);
+
+ u64 before_leave = ns_now();
+ loop->total_time_spent_ns += (before_leave - after_enter);
+ birdloop_leave(loop);
+
+ ev_run_list(&thr->priority_events);
+ }
+
+ refresh_sockets += atomic_exchange_explicit(&thr->poll_changed, 0, memory_order_acq_rel);
- /* Add internal wakeup fd */
- *psk = NULL;
- pipe_pollin(&loop->wakeup, pfd);
+ if (!refresh_sockets && ((timeout < 0) || (timeout > 5000)))
+ flush_local_pages();
- loop->poll_changed = 0;
+ while (refresh_sockets)
+ {
+sock_retry:;
+ end = (pfd = thr->pfd) + thr->pfd_max;
+
+ /* Add internal wakeup fd */
+ pipe_pollin(&thr->wakeup, pfd);
+ pfd++;
+
+ WALK_LIST2(loop, nn, thr->loops, n)
+ {
+ birdloop_enter(loop);
+ pfd = sockets_prepare(loop, pfd, end);
+ birdloop_leave(loop);
+
+ if (!pfd)
+ {
+ mb_free(thr->pfd);
+ thr->pfd = mb_alloc(thr->pool, sizeof(struct pollfd) * (thr->pfd_max *= 2));
+ goto sock_retry;
+ }
+ }
+
+ refresh_sockets = 0;
+ }
+
+poll_retry:;
+ int rv = poll(thr->pfd, pfd - thr->pfd, timeout);
+ if (rv < 0)
+ {
+ if (errno == EINTR || errno == EAGAIN)
+ goto poll_retry;
+ bug("poll in %p: %m", thr);
+ }
+
+ /* Drain wakeup fd */
+ if (thr->pfd[0].revents & POLLIN)
+ {
+ ASSERT_DIE(rv > 0);
+ rv--;
+ wakeup_drain(thr);
+ }
+
+ atomic_exchange_explicit(&thr->ping_sent, 0, memory_order_acq_rel);
+
+ if (!rv && !atomic_exchange_explicit(&thr->run_cleanup, 0, memory_order_acq_rel))
+ continue;
+
+ /* Process stops and regular sockets */
+ node *nxt;
+ WALK_LIST2_DELSAFE(loop, nn, nxt, thr->loops, n)
+ {
+ birdloop_enter(loop);
+
+ if (loop->stopped)
+ {
+ /* Flush remaining events */
+ ASSERT_DIE(!ev_run_list(&loop->event_list));
+
+ /* Drop timers */
+ timer *t;
+ while (t = timers_first(&loop->time))
+ tm_stop(t);
+
+ /* No sockets allowed */
+ ASSERT_DIE(EMPTY_LIST(loop->sock_list));
+
+ /* Declare loop stopped */
+ rem_node(&loop->n);
+ birdloop_leave(loop);
+ loop->stopped(loop->stop_data);
+
+ /* Birdloop already left */
+ continue;
+ }
+ else if (rv)
+ sockets_fire(loop);
+
+ birdloop_leave(loop);
+ }
+ }
}
static void
-sockets_close_fds(struct birdloop *loop)
+bird_thread_cleanup(void *_thr)
{
- struct pollfd *pfd = loop->poll_fd.data;
- sock **psk = loop->poll_sk.data;
- int poll_num = loop->poll_fd.used - 1;
+ struct bird_thread *thr = _thr;
+ ASSERT_DIE(birdloop_inside(&main_birdloop));
- int i;
- for (i = 0; i < poll_num; i++)
- if (psk[i] == NULL)
- close(pfd[i].fd);
+ /* Thread attributes no longer needed */
+ pthread_attr_destroy(&thr->thread_attr);
- loop->close_scheduled = 0;
+ /* Free all remaining memory */
+ rfree(thr->pool);
}
-int sk_read(sock *s, int revents);
-int sk_write(sock *s);
+static struct bird_thread *
+bird_thread_start(void)
+{
+ ASSERT_DIE(birdloop_inside(&main_birdloop));
+
+ pool *p = rp_new(&root_pool, "Thread");
+
+ struct bird_thread *thr = mb_allocz(p, sizeof(*thr));
+ thr->pool = p;
+ thr->pfd = mb_alloc(p, sizeof(struct pollfd) * (thr->pfd_max = 16));
+ thr->cleanup_event = (event) { .hook = bird_thread_cleanup, .data = thr, };
+
+ atomic_store_explicit(&thr->ping_sent, 0, memory_order_relaxed);
+
+ wakeup_init(thr);
+ ev_init_list(&thr->priority_events, NULL, "Thread direct event list");
+
+ LOCK_DOMAIN(resource, birdloop_domain);
+ add_tail(&bird_thread_pickup, &thr->n);
+ UNLOCK_DOMAIN(resource, birdloop_domain);
+
+ int e = 0;
+
+ if (e = pthread_attr_init(&thr->thread_attr))
+ die("pthread_attr_init() failed: %M", e);
+
+ /* We don't have to worry about thread stack size so much.
+ if (e = pthread_attr_setstacksize(&thr->thread_attr, THREAD_STACK_SIZE))
+ die("pthread_attr_setstacksize(%u) failed: %M", THREAD_STACK_SIZE, e);
+ */
+
+ if (e = pthread_attr_setdetachstate(&thr->thread_attr, PTHREAD_CREATE_DETACHED))
+ die("pthread_attr_setdetachstate(PTHREAD_CREATE_DETACHED) failed: %M", e);
+
+ if (e = pthread_create(&thr->thread_id, &thr->thread_attr, bird_thread_main, thr))
+ die("pthread_create() failed: %M", e);
+
+ return thr;
+}
+
+static struct birdloop *thread_dropper;
+static event *thread_dropper_event;
+static uint thread_dropper_goal;
static void
-sockets_fire(struct birdloop *loop)
+bird_thread_shutdown(void * _ UNUSED)
{
- struct pollfd *pfd = loop->poll_fd.data;
- sock **psk = loop->poll_sk.data;
- int poll_num = loop->poll_fd.used - 1;
+ LOCK_DOMAIN(resource, birdloop_domain);
+ int dif = list_length(&bird_thread_pickup) - thread_dropper_goal;
+ struct birdloop *tdl_stop = NULL;
- times_update();
+ if (dif > 0)
+ ev_send_loop(thread_dropper, thread_dropper_event);
+ else
+ {
+ tdl_stop = thread_dropper;
+ thread_dropper = NULL;
+ }
- /* Last fd is internal wakeup fd */
- if (pfd[poll_num].revents & POLLIN)
- wakeup_drain(loop);
+ UNLOCK_DOMAIN(resource, birdloop_domain);
- int i;
- for (i = 0; i < poll_num; pfd++, psk++, i++)
+ log(L_INFO "Thread pickup size differs from dropper goal by %d%s", dif, tdl_stop ? ", stopping" : "");
+
+ if (tdl_stop)
{
- int e = 1;
+ birdloop_stop_self(tdl_stop, NULL, NULL);
+ return;
+ }
- if (! pfd->revents)
- continue;
+ struct bird_thread *thr = this_thread;
+
+ /* Leave the thread-picker list to get no more loops */
+ LOCK_DOMAIN(resource, birdloop_domain);
+ rem_node(&thr->n);
+
+ /* Drop loops including the thread dropper itself */
+ while (!EMPTY_LIST(thr->loops))
+ {
+ /* Remove loop from this thread's list */
+ struct birdloop *loop = HEAD(thr->loops);
+ rem_node(&loop->n);
+ UNLOCK_DOMAIN(resource, birdloop_domain);
+
+ /* Unset loop's thread */
+ if (birdloop_inside(loop))
+ loop->thread = NULL;
+ else
+ {
+ birdloop_enter(loop);
+ loop->thread = NULL;
+ birdloop_leave(loop);
+ }
+
+ /* Put loop into pickup list */
+ LOCK_DOMAIN(resource, birdloop_domain);
+ add_tail(&birdloop_pickup, &loop->n);
+ }
+
+ /* Let others know about new loops */
+ if (!EMPTY_LIST(birdloop_pickup))
+ wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup)));
+ UNLOCK_DOMAIN(resource, birdloop_domain);
+
+ /* Leave the thread-dropper loop as we aren't going to return. */
+ birdloop_leave(thread_dropper);
+
+ /* Local pages not needed anymore */
+ flush_local_pages();
+
+ /* Unregister from RCU */
+ rcu_thread_stop(&thr->rcu);
+
+ /* Request thread cleanup from main loop */
+ ev_send_loop(&main_birdloop, &thr->cleanup_event);
+
+ /* Exit! */
+ pthread_exit(NULL);
+}
+
+
+void
+bird_thread_commit(struct config *new, struct config *old UNUSED)
+{
+ ASSERT_DIE(birdloop_inside(&main_birdloop));
- if (pfd->revents & POLLNVAL)
- bug("poll: invalid fd %d", pfd->fd);
+ if (new->shutdown)
+ return;
- if (pfd->revents & POLLIN)
- while (e && *psk && (*psk)->rx_hook)
- e = sk_read(*psk, pfd->revents);
+ if (!new->thread_count)
+ new->thread_count = 1;
- e = 1;
- if (pfd->revents & POLLOUT)
+ while (1)
+ {
+ LOCK_DOMAIN(resource, birdloop_domain);
+ int dif = list_length(&bird_thread_pickup) - (thread_dropper_goal = new->thread_count);
+ _Bool thread_dropper_running = !!thread_dropper;
+ UNLOCK_DOMAIN(resource, birdloop_domain);
+
+ if (dif < 0)
{
- loop->poll_changed = 1;
- while (e && *psk)
- e = sk_write(*psk);
+ bird_thread_start();
+ continue;
}
+
+ if ((dif > 0) && !thread_dropper_running)
+ {
+ struct birdloop *tdl = birdloop_new(&root_pool, DOMAIN_ORDER(control), "Thread dropper");
+ event *tde = ev_new_init(tdl->pool, bird_thread_shutdown, NULL);
+
+ LOCK_DOMAIN(resource, birdloop_domain);
+ thread_dropper = tdl;
+ thread_dropper_event = tde;
+ UNLOCK_DOMAIN(resource, birdloop_domain);
+
+ ev_send_loop(thread_dropper, thread_dropper_event);
+ }
+
+ return;
+ }
+}
+
+
+DEFINE_DOMAIN(control);
+
+struct bird_thread_show_data {
+ cli *cli;
+ pool *pool;
+ DOMAIN(control) lock;
+ uint total;
+ uint done;
+ u8 show_loops;
+};
+
+static void
+bird_thread_show_cli_cont(struct cli *c UNUSED)
+{
+ /* Explicitly do nothing to prevent CLI from trying to parse another command. */
+}
+
+static int
+bird_thread_show_cli_cleanup(struct cli *c UNUSED)
+{
+ return 1; /* Defer the cleanup until the writeout is finished. */
+}
+
+static void
+bird_thread_show(void *data)
+{
+ struct bird_thread_show_data *tsd = data;
+
+ LOCK_DOMAIN(control, tsd->lock);
+ if (tsd->show_loops)
+ cli_printf(tsd->cli, -1026, "Thread %p", this_thread);
+
+ u64 total_time_ns = 0;
+ struct birdloop *loop;
+ WALK_LIST(loop, this_thread->loops)
+ {
+ if (tsd->show_loops)
+ cli_printf(tsd->cli, -1026, " Loop %s time: %t", domain_name(loop->time.domain), loop->total_time_spent_ns NS);
+ total_time_ns += loop->total_time_spent_ns;
+ }
+
+ tsd->done++;
+ int last = (tsd->done == tsd->total);
+
+ if (last)
+ {
+ tsd->cli->cont = NULL;
+ tsd->cli->cleanup = NULL;
+ }
+
+ if (tsd->show_loops)
+ cli_printf(tsd->cli, (last ? 1 : -1) * 1026, " Total time: %t", total_time_ns NS);
+ else
+ cli_printf(tsd->cli, (last ? 1 : -1) * 1026, "Thread %p time %t", this_thread, total_time_ns NS);
+
+ UNLOCK_DOMAIN(control, tsd->lock);
+
+ if (last)
+ {
+ the_bird_lock();
+
+ LOCK_DOMAIN(resource, birdloop_domain);
+ if (!EMPTY_LIST(birdloop_pickup))
+ if (tsd->show_loops)
+ {
+ cli_printf(tsd->cli, -1026, "Unassigned loops");
+ WALK_LIST(loop, birdloop_pickup)
+ cli_printf(tsd->cli, -1026, " Loop %s time: %t", domain_name(loop->time.domain), loop->total_time_spent_ns NS);
+ }
+ else
+ {
+ uint count = 0;
+ u64 total_time_ns = 0;
+ WALK_LIST(loop, birdloop_pickup)
+ {
+ count++;
+ total_time_ns += loop->total_time_spent_ns;
+ }
+ cli_printf(tsd->cli, -1026, "Unassigned loops: %d, total time %t", count, total_time_ns NS);
+ }
+ UNLOCK_DOMAIN(resource, birdloop_domain);
+
+ cli_write_trigger(tsd->cli);
+ DOMAIN_FREE(control, tsd->lock);
+ rfree(tsd->pool);
+
+ the_bird_unlock();
}
}
+void
+cmd_show_threads(int show_loops)
+{
+ pool *p = rp_new(&root_pool, "Show Threads");
+
+ struct bird_thread_show_data *tsd = mb_allocz(p, sizeof(struct bird_thread_show_data));
+ tsd->lock = DOMAIN_NEW(control, "Show Threads");
+ tsd->cli = this_cli;
+ tsd->pool = p;
+ tsd->show_loops = show_loops;
+
+ this_cli->cont = bird_thread_show_cli_cont;
+ this_cli->cleanup = bird_thread_show_cli_cleanup;
+
+ LOCK_DOMAIN(control, tsd->lock);
+ LOCK_DOMAIN(resource, birdloop_domain);
+
+ struct bird_thread *thr;
+ WALK_LIST(thr, bird_thread_pickup)
+ {
+ tsd->total++;
+ ev_send(&thr->priority_events, ev_new_init(p, bird_thread_show, tsd));
+ wakeup_do_kick(thr);
+ }
+
+ UNLOCK_DOMAIN(resource, birdloop_domain);
+ UNLOCK_DOMAIN(control, tsd->lock);
+}
+
/*
* Birdloop
*/
-struct birdloop main_birdloop;
+static struct bird_thread main_thread;
+struct birdloop main_birdloop = { .thread = &main_thread, };
static void birdloop_enter_locked(struct birdloop *loop);
void
birdloop_init(void)
{
- wakeup_init(&main_birdloop);
+ ns_init();
+
+ birdloop_domain = DOMAIN_NEW(resource, "Loop Pickup");
+ init_list(&birdloop_pickup);
+ init_list(&bird_thread_pickup);
+
+ wakeup_init(main_birdloop.thread);
main_birdloop.time.domain = the_bird_domain.the_bird;
main_birdloop.time.loop = &main_birdloop;
@@ -406,8 +865,6 @@ birdloop_init(void)
birdloop_enter_locked(&main_birdloop);
}
-static void *birdloop_main(void *arg);
-
struct birdloop *
birdloop_new(pool *pp, uint order, const char *name)
{
@@ -422,24 +879,14 @@ birdloop_new(pool *pp, uint order, const char *name)
birdloop_enter(loop);
- wakeup_init(loop);
ev_init_list(&loop->event_list, loop, name);
timers_init(&loop->time, p);
sockets_init(loop);
- int e = 0;
-
- if (e = pthread_attr_init(&loop->thread_attr))
- die("pthread_attr_init() failed: %M", e);
-
- if (e = pthread_attr_setstacksize(&loop->thread_attr, THREAD_STACK_SIZE))
- die("pthread_attr_setstacksize(%u) failed: %M", THREAD_STACK_SIZE, e);
-
- if (e = pthread_attr_setdetachstate(&loop->thread_attr, PTHREAD_CREATE_DETACHED))
- die("pthread_attr_setdetachstate(PTHREAD_CREATE_DETACHED) failed: %M", e);
-
- if (e = pthread_create(&loop->thread_id, &loop->thread_attr, birdloop_main, loop))
- die("pthread_create() failed: %M", e);
+ LOCK_DOMAIN(resource, birdloop_domain);
+ add_tail(&birdloop_pickup, &loop->n);
+ wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup)));
+ UNLOCK_DOMAIN(resource, birdloop_domain);
birdloop_leave(loop);
@@ -451,7 +898,11 @@ birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data)
{
loop->stopped = stopped;
loop->stop_data = data;
- wakeup_do_kick(loop);
+ if (loop->thread)
+ {
+ atomic_store_explicit(&loop->thread->run_cleanup, 1, memory_order_release);
+ wakeup_do_kick(loop->thread);
+ }
}
void
@@ -475,10 +926,7 @@ void
birdloop_free(struct birdloop *loop)
{
ASSERT_DIE(loop->links == 0);
- ASSERT_DIE(pthread_equal(pthread_self(), loop->thread_id));
-
- rcu_birdloop_stop(&loop->rcu);
- pthread_attr_destroy(&loop->thread_attr);
+ ASSERT_DIE(birdloop_in_this_thread(loop));
domain_free(loop->time.domain);
rfree(loop->pool);
@@ -541,7 +989,7 @@ birdloop_unmask_wakeups(struct birdloop *loop)
ASSERT_DIE(birdloop_wakeup_masked == loop);
birdloop_wakeup_masked = NULL;
if (birdloop_wakeup_masked_count)
- wakeup_do_kick(loop);
+ wakeup_do_kick(loop->thread);
birdloop_wakeup_masked_count = 0;
}
@@ -560,85 +1008,6 @@ birdloop_unlink(struct birdloop *loop)
loop->links--;
}
-static void *
-birdloop_main(void *arg)
-{
- struct birdloop *loop = arg;
- timer *t;
- int rv, timeout;
-
- rcu_birdloop_start(&loop->rcu);
-
- btime loop_begin = current_time();
-
- tmp_init(loop->pool);
-
- birdloop_enter(loop);
- while (1)
- {
- timers_fire(&loop->time, 0);
- if (birdloop_process_flags(loop) + birdloop_run_events(loop))
- timeout = 0;
- else if (t = timers_first(&loop->time))
- timeout = (tm_remains(t) TO_MS) + 1;
- else
- timeout = -1;
-
- if (loop->poll_changed)
- sockets_prepare(loop);
- else
- if ((timeout < 0) || (timeout > 5000))
- flush_local_pages();
-
- btime duration = current_time() - loop_begin;
- if (duration > config->watchdog_warning)
- log(L_WARN "I/O loop cycle took %d ms", (int) (duration TO_MS));
-
- birdloop_leave(loop);
-
- try:
- rv = poll(loop->poll_fd.data, loop->poll_fd.used, timeout);
- if (rv < 0)
- {
- if (errno == EINTR || errno == EAGAIN)
- goto try;
- bug("poll: %m");
- }
-
- birdloop_enter(loop);
-
- if (loop->close_scheduled)
- sockets_close_fds(loop);
-
- if (loop->stopped)
- break;
-
- loop_begin = current_time();
-
- if (rv && !loop->poll_changed)
- sockets_fire(loop);
-
- atomic_exchange_explicit(&loop->ping_sent, 0, memory_order_acq_rel);
- }
-
- /* Flush remaining events */
- ASSERT_DIE(!ev_run_list(&loop->event_list));
-
- /* Drop timers */
- while (t = timers_first(&loop->time))
- tm_stop(t);
-
- /* No sockets allowed */
- ASSERT_DIE(EMPTY_LIST(loop->sock_list));
- ASSERT_DIE(loop->sock_num == 0);
-
- birdloop_leave(loop);
- loop->stopped(loop->stop_data);
-
- flush_local_pages();
- return NULL;
-}
-
void
birdloop_yield(void)
{