summary refs log tree commit diff
path: root/sysdep
diff options
context:
space:
mode:
Diffstat (limited to 'sysdep')
-rw-r--r-- sysdep/unix/io-loop.c 514
-rw-r--r-- sysdep/unix/io-loop.h 24
-rw-r--r-- sysdep/unix/io.c 78
3 files changed, 386 insertions, 230 deletions
diff --git a/sysdep/unix/io-loop.c b/sysdep/unix/io-loop.c
index cc2e0523..df162c6e 100644
--- a/sysdep/unix/io-loop.c
+++ b/sysdep/unix/io-loop.c
@@ -31,6 +31,8 @@
#define THREAD_STACK_SIZE 65536 /* To be lowered in near future */
+static struct birdloop *birdloop_new_internal(pool *pp, uint order, const char *name, int request_pickup);
+
/*
* Nanosecond time for accounting purposes
*
@@ -65,6 +67,9 @@ _Thread_local struct birdloop *birdloop_current;
static _Thread_local struct birdloop *birdloop_wakeup_masked;
static _Thread_local uint birdloop_wakeup_masked_count;
+/*
+ * Debug tracing helpers. Both log at L_TRACE level, and only when config is
+ * non-NULL and its latency_debug flag is set.
+ *
+ * LOOP_TRACE(loop, fmt, ...) prefixes the message with the name of the
+ * loop's time domain and the loop pointer.
+ * THREAD_TRACE(fmt, ...) prefixes the message with "Thread: ".
+ */
+#define LOOP_TRACE(loop, fmt, args...) do { if (config && config->latency_debug) log(L_TRACE "%s (%p): " fmt, domain_name((loop)->time.domain), (loop), ##args); } while (0)
+#define THREAD_TRACE(...) do { if (config && config->latency_debug) log(L_TRACE "Thread: " __VA_ARGS__); } while (0)
+
event_list *
birdloop_event_list(struct birdloop *loop)
{
@@ -190,11 +195,13 @@ pipe_kick(struct pipe *p)
}
void
-pipe_pollin(struct pipe *p, struct pollfd *pfd)
+pipe_pollin(struct pipe *p, struct pfd *pfd)
{
- pfd->fd = p->fd[0];
- pfd->events = POLLIN;
- pfd->revents = 0;
+ BUFFER_PUSH(pfd->pfd) = (struct pollfd) {
+ .fd = p->fd[0],
+ .events = POLLIN,
+ };
+ BUFFER_PUSH(pfd->loop) = NULL;
}
static inline void
@@ -215,28 +222,80 @@ wakeup_do_kick(struct bird_thread *loop)
pipe_kick(&loop->wakeup);
}
-static inline void
-birdloop_do_ping(struct birdloop *loop)
+static inline _Bool
+birdloop_try_ping(struct birdloop *loop, u32 ltt)
{
- if (!loop->thread)
- return;
+ /* Somebody else is already pinging, be idempotent */
+ if (ltt & LTT_PING)
+ {
+ LOOP_TRACE(loop, "already being pinged");
+ return 0;
+ }
- if (atomic_fetch_add_explicit(&loop->thread->ping_sent, 1, memory_order_acq_rel))
- return;
+ /* Thread moving is an implicit ping */
+ if (ltt & LTT_MOVE)
+ {
+ LOOP_TRACE(loop, "ping while moving");
+ return 1;
+ }
+
+ /* No more flags allowed */
+ ASSERT_DIE(!ltt);
+ /* No ping when not picked up */
+ if (!loop->thread)
+ {
+ LOOP_TRACE(loop, "not picked up yet, can't ping");
+ return 1;
+ }
+
+ /* No ping when masked */
if (loop == birdloop_wakeup_masked)
+ {
+ LOOP_TRACE(loop, "wakeup masked, can't ping");
birdloop_wakeup_masked_count++;
- else
- wakeup_do_kick(loop->thread);
+ return 1;
+ }
+
+ /* Send meta event to ping */
+ if ((loop != loop->thread->meta) && (loop != &main_birdloop))
+ {
+ LOOP_TRACE(loop, "Ping by meta event to %p", loop->thread->meta);
+ ev_send_loop(loop->thread->meta, &loop->event);
+ return 1;
+ }
+
+ /* Do the real ping */
+ LOOP_TRACE(loop, "sending pipe ping");
+ wakeup_do_kick(loop->thread);
+ return 0;
+}
+
+/*
+ * Ping a loop unconditionally.
+ *
+ * LTT_PING is raised in loop->thread_transition first, and the previous
+ * flag value is handed to birdloop_try_ping(). When that helper returns
+ * nonzero, the flag is dropped again right here; when it returns zero,
+ * the flag is left as it is.
+ */
+static inline void
+birdloop_do_ping(struct birdloop *loop)
+{
+  /* Announce the ping and remember which flags were set before us */
+  const u32 before = atomic_fetch_or_explicit(&loop->thread_transition, LTT_PING, memory_order_acq_rel);
+
+  /* Nothing more to do when the helper asks us to keep the flag */
+  if (!birdloop_try_ping(loop, before))
+    return;
+
+  /* The helper finished the ping by itself, withdraw the flag */
+  atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_PING, memory_order_acq_rel);
+}
void
birdloop_ping(struct birdloop *loop)
{
- if (birdloop_inside(loop) && !loop->ping_pending)
- loop->ping_pending++;
- else
+ if (!birdloop_inside(loop))
+ {
+ LOOP_TRACE(loop, "ping from outside");
birdloop_do_ping(loop);
+ }
+ else
+ {
+ LOOP_TRACE(loop, "ping from inside, pending=%d", loop->ping_pending);
+ if (!loop->ping_pending)
+ loop->ping_pending++;
+ }
}
@@ -254,6 +313,7 @@ sockets_init(struct birdloop *loop)
static void
sockets_add(struct birdloop *loop, sock *s)
{
+ LOOP_TRACE(loop, "adding socket %p (total=%d)", s, loop->sock_num);
add_tail(&loop->sock_list, &s->n);
loop->sock_num++;
@@ -278,6 +338,8 @@ sockets_remove(struct birdloop *loop, sock *s)
return;
/* Decouple the socket from the loop at all. */
+ LOOP_TRACE(loop, "removing socket %p (total=%d)", s, loop->sock_num);
+
rem_node(&s->n);
loop->sock_num--;
if (loop->thread)
@@ -302,30 +364,30 @@ sk_stop(sock *s)
static inline uint sk_want_events(sock *s)
{ return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); }
-static struct pollfd *
-sockets_prepare(struct birdloop *loop, struct pollfd *pfd, struct pollfd *end)
+void
+sockets_prepare(struct birdloop *loop, struct pfd *pfd)
{
node *n;
- loop->pfd = pfd;
-
WALK_LIST(n, loop->sock_list)
{
sock *s = SKIP_BACK(sock, n, n);
+ uint w = sk_want_events(s);
- /* Out of space for pfds. Force reallocation. */
- if (pfd >= end)
- return NULL;
-
- s->index = pfd - loop->pfd;
+ if (!w)
+ {
+ s->index = -1;
+ continue;
+ }
- pfd->fd = s->fd;
- pfd->events = sk_want_events(s);
- pfd->revents = 0;
+ s->index = pfd->pfd.used;
+ LOOP_TRACE(loop, "socket %p poll index is %d", s, s->index);
- pfd++;
+ BUFFER_PUSH(pfd->pfd) = (struct pollfd) {
+ .fd = s->fd,
+ .events = sk_want_events(s),
+ };
+ BUFFER_PUSH(pfd->loop) = loop;
}
-
- return pfd;
}
int sk_read(sock *s, int revents);
@@ -334,10 +396,9 @@ int sk_write(sock *s);
static void
sockets_fire(struct birdloop *loop)
{
- struct pollfd *pfd = loop->pfd;
-
times_update();
+ struct pollfd *pfd = loop->thread->pfd->pfd.data;
sock *s; node *n, *nxt;
WALK_LIST2_DELSAFE(s, n, nxt, loop->sock_list, n)
{
@@ -378,6 +439,99 @@ static list bird_thread_pickup;
static _Thread_local struct bird_thread *this_thread;
+/*
+ * Move a loop to a thread, or detach it from its thread.
+ *
+ * @loop: the loop being moved; callers unlink loop->n before calling
+ * @thr: the new owning thread, or NULL to put the loop on the global
+ *       pickup list (birdloop_pickup) for some thread to take later
+ *
+ * The move is announced by LTT_MOVE in loop->thread_transition, which
+ * birdloop_try_ping() treats as an implicit ping, returning before it
+ * looks at loop->thread.
+ */
+static void
+birdloop_set_thread(struct birdloop *loop, struct bird_thread *thr)
+{
+  /* Signal our moving effort; no other move may be in progress */
+  u32 ltt = atomic_fetch_or_explicit(&loop->thread_transition, LTT_MOVE, memory_order_acq_rel);
+  ASSERT_DIE((ltt & LTT_MOVE) == 0);
+
+  /* Wait for a ping that started before the move to finish */
+  while (ltt & LTT_PING)
+  {
+    birdloop_yield();
+    ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire);
+    ASSERT_DIE(ltt & LTT_MOVE);
+  }
+  /* Now we are free of running pings */
+
+  loop->thread = thr;
+  if (thr)
+    add_tail(&thr->loops, &loop->n);
+  else
+  {
+    LOCK_DOMAIN(resource, birdloop_domain);
+    add_tail(&birdloop_pickup, &loop->n);
+    UNLOCK_DOMAIN(resource, birdloop_domain);
+  }
+
+  /* Finished */
+  atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_MOVE, memory_order_acq_rel);
+
+  /*
+   * Request to run by force. A detached loop has no thread and therefore
+   * no meta loop to be scheduled on; dereferencing loop->thread here would
+   * be a NULL dereference. The request is sent when a thread takes the
+   * loop and calls this function again with a non-NULL thr.
+   */
+  if (thr)
+    ev_send_loop(thr->meta, &loop->event);
+}
+
+/*
+ * Take one loop from the global pickup list for the current thread.
+ *
+ * Returns the loop now assigned to this_thread, or NULL when the pickup
+ * list was empty. After a successful take, this thread is moved to the end
+ * of bird_thread_pickup, and if loops are still waiting, the thread at the
+ * head of that list is woken up.
+ */
+static struct birdloop *
+birdloop_take(void)
+{
+  LOCK_DOMAIN(resource, birdloop_domain);
+
+  /* Nothing is waiting for pickup */
+  if (EMPTY_LIST(birdloop_pickup))
+  {
+    UNLOCK_DOMAIN(resource, birdloop_domain);
+    return NULL;
+  }
+
+  /* Unlink the first waiting loop; the lock is released before the move */
+  struct birdloop *taken = SKIP_BACK(struct birdloop, n, HEAD(birdloop_pickup));
+  rem_node(&taken->n);
+  UNLOCK_DOMAIN(resource, birdloop_domain);
+
+  birdloop_set_thread(taken, this_thread);
+
+  /* Requeue this thread behind all the others in the pickup order */
+  LOCK_DOMAIN(resource, birdloop_domain);
+  rem_node(&this_thread->n);
+  add_tail(&bird_thread_pickup, &this_thread->n);
+
+  /* Loops still waiting: pass the baton to the next thread in order */
+  if (!EMPTY_LIST(birdloop_pickup))
+    wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup)));
+
+  UNLOCK_DOMAIN(resource, birdloop_domain);
+  return taken;
+}
+
+/*
+ * Detach a loop from the current thread and offer it for pickup.
+ *
+ * @loop: a loop linked in this thread's loop list
+ *
+ * birdloop_set_thread(loop, NULL) itself appends loop->n to
+ * birdloop_pickup under the birdloop_domain lock. The node must therefore
+ * not be appended a second time here: linking one node twice corrupts the
+ * list, and by then another thread may already have taken the loop.
+ */
+static void
+birdloop_drop(struct birdloop *loop)
+{
+  /* Remove loop from this thread's list */
+  rem_node(&loop->n);
+
+  /* Unset loop's thread; this also puts the loop into the pickup list */
+  if (birdloop_inside(loop))
+    birdloop_set_thread(loop, NULL);
+  else
+  {
+    birdloop_enter(loop);
+    birdloop_set_thread(loop, NULL);
+    birdloop_leave(loop);
+  }
+}
+
+/*
+ * Compute the poll() timeout for a loop from its nearest timer.
+ *
+ * Returns -1 when the loop has no timer, otherwise the remaining time of
+ * the first timer in milliseconds, rounded up.
+ */
+static int
+poll_timeout(struct birdloop *loop)
+{
+  timer *nearest = timers_first(&loop->time);
+
+  if (nearest)
+  {
+    btime left = tm_remains(nearest);
+    btime whole_ms = left TO_MS;
+
+    /* Add one millisecond when the conversion dropped a fraction */
+    return whole_ms + ((whole_ms MS) < left);
+  }
+
+  return -1;
+}
+
static void *
bird_thread_main(void *arg)
{
@@ -389,103 +543,79 @@ bird_thread_main(void *arg)
tmp_init(thr->pool);
init_list(&thr->loops);
- u32 refresh_sockets = 1;
+ thr->meta = birdloop_new_internal(thr->pool, DOMAIN_ORDER(meta), "Thread Meta", 0);
+ thr->meta->thread = thr;
+ birdloop_enter(thr->meta);
- struct pollfd *pfd, *end;
+ u32 refresh_sockets = 1;
+ struct pfd pfd;
+ BUFFER_INIT(pfd.pfd, thr->pool, 16);
+ BUFFER_INIT(pfd.loop, thr->pool, 16);
+ thr->pfd = &pfd;
while (1)
{
- /* Wakeup at least once a minute. */
- int timeout = 60000;
+ int timeout;
/* Pickup new loops */
- LOCK_DOMAIN(resource, birdloop_domain);
- if (!EMPTY_LIST(birdloop_pickup))
+ struct birdloop *loop = birdloop_take();
+ if (loop)
{
- struct birdloop *loop = SKIP_BACK(struct birdloop, n, HEAD(birdloop_pickup));
- rem_node(&loop->n);
- UNLOCK_DOMAIN(resource, birdloop_domain);
-
- add_tail(&thr->loops, &loop->n);
-
birdloop_enter(loop);
- loop->thread = thr;
if (!EMPTY_LIST(loop->sock_list))
refresh_sockets = 1;
birdloop_leave(loop);
+ }
- /* If there are more loops to be picked up, wakeup the next thread */
- LOCK_DOMAIN(resource, birdloop_domain);
- rem_node(&thr->n);
- add_tail(&bird_thread_pickup, &thr->n);
+ /* Schedule all loops with timed out timers */
+ timers_fire(&thr->meta->time, 0);
- if (!EMPTY_LIST(birdloop_pickup))
- wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup)));
+ /* Run all scheduled loops */
+ int more_events = ev_run_list(&thr->meta->event_list);
+ if (more_events)
+ {
+ THREAD_TRACE("More events to run");
+ timeout = 0;
}
- UNLOCK_DOMAIN(resource, birdloop_domain);
-
- struct birdloop *loop; node *nn;
- WALK_LIST2(loop, nn, thr->loops, n)
+ else
{
- birdloop_enter(loop);
- u64 after_enter = ns_now();
-
- timer *t;
-
- times_update();
- timers_fire(&loop->time, 0);
- int again = birdloop_process_flags(loop) + ev_run_list(&loop->event_list);
-
-#if 0
- if (loop->n.next->next)
- __builtin_prefetch(SKIP_BACK(struct birdloop, n, loop->n.next)->time.domain);
-#endif
-
- if (again)
- timeout = MIN(0, timeout);
- else if (t = timers_first(&loop->time))
- timeout = MIN(((tm_remains(t) TO_MS) + 1), timeout);
-
- u64 before_leave = ns_now();
- loop->total_time_spent_ns += (before_leave - after_enter);
- birdloop_leave(loop);
-
- ev_run_list(&thr->priority_events);
+ timeout = poll_timeout(thr->meta);
+ if (timeout == -1)
+ THREAD_TRACE("No timers, no events");
+ else
+ THREAD_TRACE("Next timer in %d ms", timeout);
}
- refresh_sockets += atomic_exchange_explicit(&thr->poll_changed, 0, memory_order_acq_rel);
+ /* Run priority events before sleeping */
+ ev_run_list(&thr->priority_events);
- if (!refresh_sockets && ((timeout < 0) || (timeout > 5000)))
- flush_local_pages();
+ /* Do we have to refresh sockets? */
+ refresh_sockets += atomic_exchange_explicit(&thr->poll_changed, 0, memory_order_acq_rel);
- while (refresh_sockets)
+ if (refresh_sockets)
{
-sock_retry:;
- end = (pfd = thr->pfd) + thr->pfd_max;
+ BUFFER_FLUSH(pfd.pfd);
+ BUFFER_FLUSH(pfd.loop);
- /* Add internal wakeup fd */
- pipe_pollin(&thr->wakeup, pfd);
- pfd++;
+ pipe_pollin(&thr->wakeup, &pfd);
+ node *nn;
WALK_LIST2(loop, nn, thr->loops, n)
{
birdloop_enter(loop);
- pfd = sockets_prepare(loop, pfd, end);
+ sockets_prepare(loop, &pfd);
birdloop_leave(loop);
-
- if (!pfd)
- {
- mb_free(thr->pfd);
- thr->pfd = mb_alloc(thr->pool, sizeof(struct pollfd) * (thr->pfd_max *= 2));
- goto sock_retry;
- }
}
+ ASSERT_DIE(pfd.loop.used == pfd.pfd.used);
refresh_sockets = 0;
}
+ /* Nothing to do in at least 5 seconds, flush local hot page cache */
+ else if (timeout > 5000)
+ flush_local_pages();
poll_retry:;
- int rv = poll(thr->pfd, pfd - thr->pfd, timeout);
+ int rv = poll(pfd.pfd.data, pfd.pfd.used, timeout);
if (rv < 0)
{
if (errno == EINTR || errno == EAGAIN)
@@ -494,51 +624,26 @@ poll_retry:;
}
/* Drain wakeup fd */
- if (thr->pfd[0].revents & POLLIN)
+ if (pfd.pfd.data[0].revents & POLLIN)
{
ASSERT_DIE(rv > 0);
rv--;
wakeup_drain(thr);
}
- atomic_exchange_explicit(&thr->ping_sent, 0, memory_order_acq_rel);
+ atomic_fetch_and_explicit(&thr->meta->thread_transition, ~LTT_PING, memory_order_acq_rel);
- if (!rv && !atomic_exchange_explicit(&thr->run_cleanup, 0, memory_order_acq_rel))
- continue;
-
- /* Process stops and regular sockets */
- node *nxt;
- WALK_LIST2_DELSAFE(loop, nn, nxt, thr->loops, n)
- {
- birdloop_enter(loop);
-
- if (loop->stopped)
- {
- /* Flush remaining events */
- ASSERT_DIE(!ev_run_list(&loop->event_list));
-
- /* Drop timers */
- timer *t;
- while (t = timers_first(&loop->time))
- tm_stop(t);
-
- /* No sockets allowed */
- ASSERT_DIE(EMPTY_LIST(loop->sock_list));
-
- /* Declare loop stopped */
- rem_node(&loop->n);
- birdloop_leave(loop);
- loop->stopped(loop->stop_data);
-
- /* Birdloop already left */
- continue;
- }
- else if (rv)
- sockets_fire(loop);
-
- birdloop_leave(loop);
- }
+ /* Schedule loops with active sockets */
+ if (rv)
+ for (uint i = 1; i < pfd.pfd.used; i++)
+ if (pfd.pfd.data[i].revents)
+ {
+ LOOP_TRACE(pfd.loop.data[i], "socket id %d got revents=%d", i, pfd.pfd.data[i].revents);
+ ev_send_loop(thr->meta, &pfd.loop.data[i]->event);
+ }
}
+
+ bug("An infinite loop has ended.");
}
static void
@@ -563,11 +668,8 @@ bird_thread_start(void)
struct bird_thread *thr = mb_allocz(p, sizeof(*thr));
thr->pool = p;
- thr->pfd = mb_alloc(p, sizeof(struct pollfd) * (thr->pfd_max = 16));
thr->cleanup_event = (event) { .hook = bird_thread_cleanup, .data = thr, };
- atomic_store_explicit(&thr->ping_sent, 0, memory_order_relaxed);
-
wakeup_init(thr);
ev_init_list(&thr->priority_events, NULL, "Thread direct event list");
@@ -615,7 +717,7 @@ bird_thread_shutdown(void * _ UNUSED)
UNLOCK_DOMAIN(resource, birdloop_domain);
- log(L_INFO "Thread pickup size differs from dropper goal by %d%s", dif, tdl_stop ? ", stopping" : "");
+ DBG("Thread pickup size differs from dropper goal by %d%s\n", dif, tdl_stop ? ", stopping" : "");
if (tdl_stop)
{
@@ -628,29 +730,11 @@ bird_thread_shutdown(void * _ UNUSED)
/* Leave the thread-picker list to get no more loops */
LOCK_DOMAIN(resource, birdloop_domain);
rem_node(&thr->n);
+ UNLOCK_DOMAIN(resource, birdloop_domain);
/* Drop loops including the thread dropper itself */
while (!EMPTY_LIST(thr->loops))
- {
- /* Remove loop from this thread's list */
- struct birdloop *loop = HEAD(thr->loops);
- rem_node(&loop->n);
- UNLOCK_DOMAIN(resource, birdloop_domain);
-
- /* Unset loop's thread */
- if (birdloop_inside(loop))
- loop->thread = NULL;
- else
- {
- birdloop_enter(loop);
- loop->thread = NULL;
- birdloop_leave(loop);
- }
-
- /* Put loop into pickup list */
- LOCK_DOMAIN(resource, birdloop_domain);
- add_tail(&birdloop_pickup, &loop->n);
- }
+ birdloop_drop(HEAD(thr->loops));
/* Let others know about new loops */
if (!EMPTY_LIST(birdloop_pickup))
@@ -660,6 +744,11 @@ bird_thread_shutdown(void * _ UNUSED)
/* Leave the thread-dropper loop as we aren't going to return. */
birdloop_leave(thread_dropper);
+ /* Stop the meta loop */
+ birdloop_leave(thr->meta);
+ domain_free(thr->meta->time.domain);
+ rfree(thr->meta->pool);
+
/* Local pages not needed anymore */
flush_local_pages();
@@ -865,8 +954,81 @@ birdloop_init(void)
birdloop_enter_locked(&main_birdloop);
}
-struct birdloop *
-birdloop_new(pool *pp, uint order, const char *name)
+/*
+ * Finish stopping a loop whose stop was requested (loop->stopped is set).
+ *
+ * Called from birdloop_run() with the loop entered. The loop is left
+ * inside this function, before the stopped hook is called, so the caller
+ * must not leave it again.
+ */
+static void
+birdloop_stop_internal(struct birdloop *loop)
+{
+ /* Flush remaining events */
+ ASSERT_DIE(!ev_run_list(&loop->event_list));
+
+ /* Drop timers */
+ timer *t;
+ while (t = timers_first(&loop->time))
+ tm_stop(t);
+
+ /* No sockets allowed */
+ ASSERT_DIE(EMPTY_LIST(loop->sock_list));
+
+ /* Unschedule from Meta */
+ /* loop->event and loop->timer are what gets sent to / started on the thread's meta loop */
+ ev_postpone(&loop->event);
+ tm_stop(&loop->timer);
+
+ /* Declare loop stopped */
+ rem_node(&loop->n);
+ birdloop_leave(loop);
+
+ /* Tail-call the stopped hook */
+ /* NOTE(review): presumably the hook may dispose of the loop, hence it runs last -- confirm */
+ loop->stopped(loop->stop_data);
+}
+
+/*
+ * Event hook executing one round of a loop.
+ *
+ * @_loop: the struct birdloop to run, passed as the event's data
+ *
+ * The thread's priority events are run first. Then the loop is entered and
+ * its sockets, timers, flag handlers and events are processed in this
+ * order. Finally the loop's timer is armed on the thread's meta loop for
+ * the loop's nearest timer, or stopped when the loop has none. A loop with
+ * a stop requested is finished by birdloop_stop_internal() instead.
+ */
+static void
+birdloop_run(void *_loop)
+{
+  struct birdloop *self = _loop;
+
+  /* Priority events go before the loop itself */
+  ev_run_list(&this_thread->priority_events);
+
+  birdloop_enter(self);
+
+  if (self->stopped)
+  {
+    /* The helper leaves the loop by itself */
+    birdloop_stop_internal(self);
+    return;
+  }
+
+  /* Sockets first */
+  sockets_fire(self);
+
+  /* Then timers */
+  timers_fire(&self->time, 0);
+
+  /* Flag handlers may ask for one more round */
+  if (birdloop_process_flags(self))
+  {
+    LOOP_TRACE(self, "Flag processing needs another run");
+    ev_send_loop(this_thread->meta, &self->event);
+  }
+
+  /* And events last */
+  ev_run_list(&self->event_list);
+
+  /* Let the meta loop wake us up for our nearest timer, if there is any */
+  timer *nearest = timers_first(&self->time);
+  if (!nearest)
+    tm_stop(&self->timer);
+  else
+    tm_start_in(&self->timer, tm_remains(nearest), this_thread->meta);
+
+  birdloop_leave(self);
+}
+
+/*
+ * Timer hook of loop->timer: the loop's nearest timer is due, so ask the
+ * meta loop of the loop's thread to run the loop.
+ */
+static void
+birdloop_run_timer(timer *tm)
+{
+  struct birdloop *due = tm->data;
+
+  LOOP_TRACE(due, "Timer ready, requesting run");
+
+  struct birdloop *meta = due->thread->meta;
+  ev_send_loop(meta, &due->event);
+}
+
+static struct birdloop *
+birdloop_new_internal(pool *pp, uint order, const char *name, int request_pickup)
{
struct domain_generic *dg = domain_new(name, order);
@@ -877,32 +1039,45 @@ birdloop_new(pool *pp, uint order, const char *name)
loop->time.domain = dg;
loop->time.loop = loop;
+ atomic_store_explicit(&loop->thread_transition, 0, memory_order_relaxed);
+
birdloop_enter(loop);
ev_init_list(&loop->event_list, loop, name);
timers_init(&loop->time, p);
sockets_init(loop);
- LOCK_DOMAIN(resource, birdloop_domain);
- add_tail(&birdloop_pickup, &loop->n);
- wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup)));
- UNLOCK_DOMAIN(resource, birdloop_domain);
+ loop->event = (event) { .hook = birdloop_run, .data = loop, };
+ loop->timer = (timer) { .hook = birdloop_run_timer, .data = loop, };
+
+ if (request_pickup)
+ {
+ LOCK_DOMAIN(resource, birdloop_domain);
+ add_tail(&birdloop_pickup, &loop->n);
+ wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup)));
+ UNLOCK_DOMAIN(resource, birdloop_domain);
+ }
+ else
+ loop->n.next = loop->n.prev = &loop->n;
birdloop_leave(loop);
return loop;
}
+/*
+ * Create a new loop and request its pickup by a thread.
+ * Thin public wrapper around birdloop_new_internal().
+ */
+struct birdloop *
+birdloop_new(pool *pp, uint order, const char *name)
+{
+  const int request_pickup = 1;
+
+  return birdloop_new_internal(pp, order, name, request_pickup);
+}
+
static void
birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data)
{
loop->stopped = stopped;
loop->stop_data = data;
- if (loop->thread)
- {
- atomic_store_explicit(&loop->thread->run_cleanup, 1, memory_order_release);
- wakeup_do_kick(loop->thread);
- }
+
+ birdloop_do_ping(loop);
}
void
@@ -961,6 +1136,7 @@ birdloop_leave_locked(struct birdloop *loop)
/* Send pending pings */
if (loop->ping_pending)
{
+ LOOP_TRACE(loop, "sending pings on leave");
loop->ping_pending = 0;
birdloop_do_ping(loop);
}
diff --git a/sysdep/unix/io-loop.h b/sysdep/unix/io-loop.h
index e23a9be0..e606f07e 100644
--- a/sysdep/unix/io-loop.h
+++ b/sysdep/unix/io-loop.h
@@ -16,8 +16,15 @@ struct pipe
int fd[2];
};
+/*
+ * Set of descriptors handed to poll(), kept as two parallel buffers.
+ * Both are pushed in lock-step: entry i of `loop` belongs to entry i of
+ * `pfd`. pipe_pollin() pushes a NULL loop for the wakeup pipe and
+ * sockets_prepare() pushes the loop owning each socket.
+ */
+struct pfd {
+ BUFFER(struct pollfd) pfd; /* the pollfd array passed to poll() */
+ BUFFER(struct birdloop *) loop; /* loop owning the descriptor at the same index, or NULL */
+};
+
+void sockets_prepare(struct birdloop *, struct pfd *);
+
void pipe_new(struct pipe *);
-void pipe_pollin(struct pipe *, struct pollfd *);
+void pipe_pollin(struct pipe *, struct pfd *);
void pipe_drain(struct pipe *);
void pipe_kick(struct pipe *);
@@ -25,6 +32,9 @@ struct birdloop
{
node n;
+ event event;
+ timer timer;
+
pool *pool;
struct timeloop time;
@@ -36,6 +46,9 @@ struct birdloop
uint links;
+ _Atomic u32 thread_transition;
+#define LTT_PING 1
+#define LTT_MOVE 2
_Atomic u32 flags;
struct birdloop_flag_handler *flag_handler;
@@ -45,7 +58,6 @@ struct birdloop
struct birdloop *prev_loop;
struct bird_thread *thread;
- struct pollfd *pfd;
u64 total_time_spent_ns;
};
@@ -54,16 +66,13 @@ struct bird_thread
{
node n;
- struct pollfd *pfd;
- uint pfd_max;
-
- _Atomic u32 ping_sent;
- _Atomic u32 run_cleanup;
_Atomic u32 poll_changed;
struct pipe wakeup;
event_list priority_events;
+ struct birdloop *meta;
+
pthread_t thread_id;
pthread_attr_t thread_attr;
@@ -71,6 +80,7 @@ struct bird_thread
list loops;
pool *pool;
+ struct pfd *pfd;
event cleanup_event;
};
diff --git a/sysdep/unix/io.c b/sysdep/unix/io.c
index 78908676..06797096 100644
--- a/sysdep/unix/io.c
+++ b/sysdep/unix/io.c
@@ -140,7 +140,7 @@ times_update(void)
if ((ts.tv_sec < 0) || (((u64) ts.tv_sec) > ((u64) 1 << 40)))
log(L_WARN "Monotonic clock is crazy");
-
+
btime new_time = ts.tv_sec S + ts.tv_nsec NS;
if (new_time < old_time)
@@ -722,7 +722,6 @@ sk_log_error(sock *s, const char *p)
* Actual struct birdsock code
*/
-static list sock_list;
static struct birdsock *current_sock;
static struct birdsock *stored_sock;
@@ -1026,7 +1025,7 @@ sk_setup(sock *s)
static void
sk_insert(sock *s)
{
- add_tail(&sock_list, &s->n);
+ add_tail(&main_birdloop.sock_list, &s->n);
}
static void
@@ -2049,7 +2048,7 @@ sk_dump_all(void)
sock *s;
debug("Open sockets:\n");
- WALK_LIST(n, sock_list)
+ WALK_LIST(n, main_birdloop.sock_list)
{
s = SKIP_BACK(sock, n, n);
debug("%p ", s);
@@ -2208,7 +2207,7 @@ watchdog_stop(void)
void
io_init(void)
{
- init_list(&sock_list);
+ init_list(&main_birdloop.sock_list);
ev_init_list(&global_event_list, &main_birdloop, "Global event list");
ev_init_list(&global_work_list, &main_birdloop, "Global work list");
ev_init_list(&main_birdloop.event_list, &main_birdloop, "Global fast event list");
@@ -2229,12 +2228,11 @@ void
io_loop(void)
{
int poll_tout, timeout;
- int nfds, events, pout;
+ int events, pout;
timer *t;
- sock *s;
- node *n;
- int fdmax = 256;
- struct pollfd *pfd = xmalloc(fdmax * sizeof(struct pollfd));
+ struct pfd pfd;
+ BUFFER_INIT(pfd.pfd, &root_pool, 16);
+ BUFFER_INIT(pfd.loop, &root_pool, 16);
watchdog_start1();
for(;;)
@@ -2255,39 +2253,11 @@ io_loop(void)
poll_tout = MIN(poll_tout, timeout);
}
- /* A hack to reload main io_loop() when something has changed asynchronously. */
- pipe_pollin(&main_birdloop.thread->wakeup, &pfd[0]);
-
- nfds = 1;
+ BUFFER_FLUSH(pfd.pfd);
+ BUFFER_FLUSH(pfd.loop);
- WALK_LIST(n, sock_list)
- {
- pfd[nfds] = (struct pollfd) { .fd = -1 }; /* everything other set to 0 by this */
- s = SKIP_BACK(sock, n, n);
- if (s->rx_hook)
- {
- pfd[nfds].fd = s->fd;
- pfd[nfds].events |= POLLIN;
- }
- if (s->tx_hook && s->ttx != s->tpos)
- {
- pfd[nfds].fd = s->fd;
- pfd[nfds].events |= POLLOUT;
- }
- if (pfd[nfds].fd != -1)
- {
- s->index = nfds;
- nfds++;
- }
- else
- s->index = -1;
-
- if (nfds >= fdmax)
- {
- fdmax *= 2;
- pfd = xrealloc(pfd, fdmax * sizeof(struct pollfd));
- }
- }
+ pipe_pollin(&main_birdloop.thread->wakeup, &pfd);
+ sockets_prepare(&main_birdloop, &pfd);
/*
* Yes, this is racy. But even if the signal comes before this test
@@ -2319,7 +2289,7 @@ io_loop(void)
/* And finally enter poll() to find active sockets */
watchdog_stop();
birdloop_leave(&main_birdloop);
- pout = poll(pfd, nfds, poll_tout);
+ pout = poll(pfd.pfd.data, pfd.pfd.used, poll_tout);
birdloop_enter(&main_birdloop);
watchdog_start();
@@ -2331,18 +2301,18 @@ io_loop(void)
}
if (pout)
{
- if (pfd[0].revents & POLLIN)
+ if (pfd.pfd.data[0].revents & POLLIN)
{
/* IO loop reload requested */
pipe_drain(&main_birdloop.thread->wakeup);
- atomic_exchange_explicit(&main_birdloop.thread->ping_sent, 0, memory_order_acq_rel);
+ atomic_fetch_and_explicit(&main_birdloop.thread_transition, ~LTT_PING, memory_order_acq_rel);
continue;
}
times_update();
/* guaranteed to be non-empty */
- current_sock = SKIP_BACK(sock, n, HEAD(sock_list));
+ current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
while (current_sock)
{
@@ -2357,19 +2327,19 @@ io_loop(void)
int steps;
steps = MAX_STEPS;
- if (s->fast_rx && (pfd[s->index].revents & POLLIN) && s->rx_hook)
+ if (s->fast_rx && (pfd.pfd.data[s->index].revents & POLLIN) && s->rx_hook)
do
{
steps--;
io_log_event(s->rx_hook, s->data);
- e = sk_read(s, pfd[s->index].revents);
+ e = sk_read(s, pfd.pfd.data[s->index].revents);
if (s != current_sock)
goto next;
}
while (e && s->rx_hook && steps);
steps = MAX_STEPS;
- if (pfd[s->index].revents & POLLOUT)
+ if (pfd.pfd.data[s->index].revents & POLLOUT)
do
{
steps--;
@@ -2392,7 +2362,7 @@ io_loop(void)
int count = 0;
current_sock = stored_sock;
if (current_sock == NULL)
- current_sock = SKIP_BACK(sock, n, HEAD(sock_list));
+ current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
while (current_sock && count < MAX_RX_STEPS)
{
@@ -2403,18 +2373,18 @@ io_loop(void)
goto next2;
}
- if (!s->fast_rx && (pfd[s->index].revents & POLLIN) && s->rx_hook)
+ if (!s->fast_rx && (pfd.pfd.data[s->index].revents & POLLIN) && s->rx_hook)
{
count++;
io_log_event(s->rx_hook, s->data);
- sk_read(s, pfd[s->index].revents);
+ sk_read(s, pfd.pfd.data[s->index].revents);
if (s != current_sock)
goto next2;
}
- if (pfd[s->index].revents & (POLLHUP | POLLERR))
+ if (pfd.pfd.data[s->index].revents & (POLLHUP | POLLERR))
{
- sk_err(s, pfd[s->index].revents);
+ sk_err(s, pfd.pfd.data[s->index].revents);
if (s != current_sock)
goto next2;
}