diff options
-rw-r--r-- | lib/locking.h | 1 | ||||
-rw-r--r-- | nest/proto.c | 1 | ||||
-rw-r--r-- | nest/rt-table.c | 1 | ||||
-rw-r--r-- | proto/bgp/bgp.c | 8 | ||||
-rw-r--r-- | sysdep/unix/io-loop.c | 514 | ||||
-rw-r--r-- | sysdep/unix/io-loop.h | 24 | ||||
-rw-r--r-- | sysdep/unix/io.c | 78 |
7 files changed, 396 insertions, 231 deletions
diff --git a/lib/locking.h b/lib/locking.h index 7e014bd0..6bed14bf 100644 --- a/lib/locking.h +++ b/lib/locking.h @@ -13,6 +13,7 @@ struct domain_generic; /* Here define the global lock order; first to last. */ struct lock_order { + struct domain_generic *meta; struct domain_generic *the_bird; struct domain_generic *control; struct domain_generic *proto; diff --git a/nest/proto.c b/nest/proto.c index e02e232d..21460e56 100644 --- a/nest/proto.c +++ b/nest/proto.c @@ -1131,6 +1131,7 @@ proto_loop_stopped(void *ptr) birdloop_enter(&main_birdloop); + birdloop_free(p->loop); p->loop = &main_birdloop; proto_cleanup(p); diff --git a/nest/rt-table.c b/nest/rt-table.c index 6be47470..54aa90a6 100644 --- a/nest/rt-table.c +++ b/nest/rt-table.c @@ -4073,6 +4073,7 @@ rt_delete(void *tab_) RT_UNLOCK(RT_PUB(tab)); + birdloop_free(tab->loop); rfree(tab->rp); config_del_obstacle(conf); diff --git a/proto/bgp/bgp.c b/proto/bgp/bgp.c index cca1283c..122b0c22 100644 --- a/proto/bgp/bgp.c +++ b/proto/bgp/bgp.c @@ -217,6 +217,8 @@ bgp_open(struct bgp_proto *p) req->port = p->cf->local_port; req->flags = p->cf->free_bind ? SKF_FREEBIND : 0; + BGP_TRACE(D_EVENTS, "Requesting listen socket at %I%J port %u", req->addr, req->iface, req->port); + add_tail(&bgp_listen_pending, &req->n); ev_schedule(&bgp_listen_event); } @@ -243,7 +245,9 @@ bgp_listen_create(void *_ UNUSED) break; /* Not found any */ - if (!NODE_VALID(bs)) + if (NODE_VALID(bs)) + BGP_TRACE(D_EVENTS, "Found a listening socket: %p", bs); + else { sock *sk = sk_new(proto_pool); sk->type = SK_TCP_PASSIVE; @@ -275,6 +279,8 @@ bgp_listen_create(void *_ UNUSED) init_list(&bs->requests); add_tail(&bgp_sockets, &bs->n); + + BGP_TRACE(D_EVENTS, "Created new listening socket: %p", bs); } add_tail(&bs->requests, &req->n); diff --git a/sysdep/unix/io-loop.c b/sysdep/unix/io-loop.c index cc2e0523..df162c6e 100644 --- a/sysdep/unix/io-loop.c +++ b/sysdep/unix/io-loop.c @@ -31,6 +31,8 @@ #define THREAD_STACK_SIZE 65536 /* To be lowered in near future */ +static struct birdloop *birdloop_new_internal(pool *pp, uint order, const char *name, int request_pickup); + /* * Nanosecond time for accounting purposes * @@ -65,6 +67,9 @@ _Thread_local struct birdloop *birdloop_current; static _Thread_local struct birdloop *birdloop_wakeup_masked; static _Thread_local uint birdloop_wakeup_masked_count; +#define LOOP_TRACE(loop, fmt, args...) do { if (config && config->latency_debug) log(L_TRACE "%s (%p): " fmt, domain_name((loop)->time.domain), (loop), ##args); } while (0) +#define THREAD_TRACE(...) do { if (config && config->latency_debug) log(L_TRACE "Thread: " __VA_ARGS__); } while (0) + event_list * birdloop_event_list(struct birdloop *loop) { @@ -190,11 +195,13 @@ pipe_kick(struct pipe *p) } void -pipe_pollin(struct pipe *p, struct pollfd *pfd) +pipe_pollin(struct pipe *p, struct pfd *pfd) { - pfd->fd = p->fd[0]; - pfd->events = POLLIN; - pfd->revents = 0; + BUFFER_PUSH(pfd->pfd) = (struct pollfd) { + .fd = p->fd[0], + .events = POLLIN, + }; + BUFFER_PUSH(pfd->loop) = NULL; } static inline void @@ -215,28 +222,80 @@ wakeup_do_kick(struct bird_thread *loop) pipe_kick(&loop->wakeup); } -static inline void -birdloop_do_ping(struct birdloop *loop) +static inline _Bool +birdloop_try_ping(struct birdloop *loop, u32 ltt) { - if (!loop->thread) - return; + /* Somebody else is already pinging, be idempotent */ + if (ltt & LTT_PING) + { + LOOP_TRACE(loop, "already being pinged"); + return 0; + } - if (atomic_fetch_add_explicit(&loop->thread->ping_sent, 1, memory_order_acq_rel)) - return; + /* Thread moving is an implicit ping */ + if (ltt & LTT_MOVE) + { + LOOP_TRACE(loop, "ping while moving"); + return 1; + } + + /* No more flags allowed */ + ASSERT_DIE(!ltt); + /* No ping when not picked up */ + if (!loop->thread) + { + LOOP_TRACE(loop, "not picked up yet, can't ping"); + return 1; + } + + /* No ping when masked */ if (loop == birdloop_wakeup_masked) + { + LOOP_TRACE(loop, "wakeup masked, can't ping"); birdloop_wakeup_masked_count++; - else - wakeup_do_kick(loop->thread); + return 1; + } + + /* Send meta event to ping */ + if ((loop != loop->thread->meta) && (loop != &main_birdloop)) + { + LOOP_TRACE(loop, "Ping by meta event to %p", loop->thread->meta); + ev_send_loop(loop->thread->meta, &loop->event); + return 1; + } + + /* Do the real ping */ + LOOP_TRACE(loop, "sending pipe ping"); + wakeup_do_kick(loop->thread); + return 0; +} + +static inline void +birdloop_do_ping(struct birdloop *loop) +{ + /* Register our ping effort */ + u32 ltt = atomic_fetch_or_explicit(&loop->thread_transition, LTT_PING, memory_order_acq_rel); + + /* Try to ping in multiple ways */ + if (birdloop_try_ping(loop, ltt)) + atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_PING, memory_order_acq_rel); } void birdloop_ping(struct birdloop *loop) { - if (birdloop_inside(loop) && !loop->ping_pending) - loop->ping_pending++; - else + if (!birdloop_inside(loop)) + { + LOOP_TRACE(loop, "ping from outside"); birdloop_do_ping(loop); + } + else + { + LOOP_TRACE(loop, "ping from inside, pending=%d", loop->ping_pending); + if (!loop->ping_pending) + loop->ping_pending++; + } } @@ -254,6 +313,7 @@ sockets_init(struct birdloop *loop) static void sockets_add(struct birdloop *loop, sock *s) { + LOOP_TRACE(loop, "adding socket %p (total=%d)", s, loop->sock_num); add_tail(&loop->sock_list, &s->n); loop->sock_num++; @@ -278,6 +338,8 @@ sockets_remove(struct birdloop *loop, sock *s) return; /* Decouple the socket from the loop at all. */ + LOOP_TRACE(loop, "removing socket %p (total=%d)", s, loop->sock_num); + rem_node(&s->n); loop->sock_num--; if (loop->thread) @@ -302,30 +364,30 @@ sk_stop(sock *s) static inline uint sk_want_events(sock *s) { return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); } -static struct pollfd * -sockets_prepare(struct birdloop *loop, struct pollfd *pfd, struct pollfd *end) +void +sockets_prepare(struct birdloop *loop, struct pfd *pfd) { node *n; - loop->pfd = pfd; - WALK_LIST(n, loop->sock_list) { sock *s = SKIP_BACK(sock, n, n); + uint w = sk_want_events(s); - /* Out of space for pfds. Force reallocation. */ - if (pfd >= end) - return NULL; - - s->index = pfd - loop->pfd; + if (!w) + { + s->index = -1; + continue; + } - pfd->fd = s->fd; - pfd->events = sk_want_events(s); - pfd->revents = 0; + s->index = pfd->pfd.used; + LOOP_TRACE(loop, "socket %p poll index is %d", s, s->index); - pfd++; + BUFFER_PUSH(pfd->pfd) = (struct pollfd) { + .fd = s->fd, + .events = sk_want_events(s), + }; + BUFFER_PUSH(pfd->loop) = loop; } - - return pfd; } int sk_read(sock *s, int revents); @@ -334,10 +396,9 @@ int sk_write(sock *s); static void sockets_fire(struct birdloop *loop) { - struct pollfd *pfd = loop->pfd; - times_update(); + struct pollfd *pfd = loop->thread->pfd->pfd.data; sock *s; node *n, *nxt; WALK_LIST2_DELSAFE(s, n, nxt, loop->sock_list, n) { @@ -378,6 +439,99 @@ static list bird_thread_pickup; static _Thread_local struct bird_thread *this_thread; +static void +birdloop_set_thread(struct birdloop *loop, struct bird_thread *thr) +{ + /* Signal our moving effort */ + u32 ltt = atomic_fetch_or_explicit(&loop->thread_transition, LTT_MOVE, memory_order_acq_rel); + ASSERT_DIE((ltt & LTT_MOVE) == 0); + + while (ltt & LTT_PING) + { + birdloop_yield(); + ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire); + ASSERT_DIE(ltt & LTT_MOVE); + } + /* Now we are free of running pings */ + + if (loop->thread = thr) + add_tail(&thr->loops, &loop->n); + else + { + LOCK_DOMAIN(resource, birdloop_domain); + add_tail(&birdloop_pickup, &loop->n); + UNLOCK_DOMAIN(resource, birdloop_domain); + } + + /* Finished */ + atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_MOVE, memory_order_acq_rel); + + /* Request to run by force */ + ev_send_loop(loop->thread->meta, &loop->event); +} + +static struct birdloop * +birdloop_take(void) +{ + struct birdloop *loop = NULL; + + LOCK_DOMAIN(resource, birdloop_domain); + if (!EMPTY_LIST(birdloop_pickup)) + { + /* Take the first loop from the pickup list and unlock */ + loop = SKIP_BACK(struct birdloop, n, HEAD(birdloop_pickup)); + rem_node(&loop->n); + UNLOCK_DOMAIN(resource, birdloop_domain); + + birdloop_set_thread(loop, this_thread); + + /* This thread goes to the end of the pickup list */ + LOCK_DOMAIN(resource, birdloop_domain); + rem_node(&this_thread->n); + add_tail(&bird_thread_pickup, &this_thread->n); + + /* If there are more loops to be picked up, wakeup the next thread in order */ + if (!EMPTY_LIST(birdloop_pickup)) + wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup))); + } + UNLOCK_DOMAIN(resource, birdloop_domain); + + return loop; +} + +static void +birdloop_drop(struct birdloop *loop) +{ + /* Remove loop from this thread's list */ + rem_node(&loop->n); + + /* Unset loop's thread */ + if (birdloop_inside(loop)) + birdloop_set_thread(loop, NULL); + else + { + birdloop_enter(loop); + birdloop_set_thread(loop, NULL); + birdloop_leave(loop); + } + + /* Put loop into pickup list */ + LOCK_DOMAIN(resource, birdloop_domain); + add_tail(&birdloop_pickup, &loop->n); + UNLOCK_DOMAIN(resource, birdloop_domain); +} + +static int +poll_timeout(struct birdloop *loop) +{ + timer *t = timers_first(&loop->time); + if (!t) + return -1; + + btime remains = tm_remains(t); + return remains TO_MS + ((remains TO_MS) MS < remains); +} + static void * bird_thread_main(void *arg) { @@ -389,103 +543,79 @@ bird_thread_main(void *arg) tmp_init(thr->pool); init_list(&thr->loops); - u32 refresh_sockets = 1; + thr->meta = birdloop_new_internal(thr->pool, DOMAIN_ORDER(meta), "Thread Meta", 0); + thr->meta->thread = thr; + birdloop_enter(thr->meta); - struct pollfd *pfd, *end; + u32 refresh_sockets = 1; + struct pfd pfd; + BUFFER_INIT(pfd.pfd, thr->pool, 16); + BUFFER_INIT(pfd.loop, thr->pool, 16); + thr->pfd = &pfd; while (1) { - /* Wakeup at least once a minute. */ - int timeout = 60000; + int timeout; /* Pickup new loops */ - LOCK_DOMAIN(resource, birdloop_domain); - if (!EMPTY_LIST(birdloop_pickup)) + struct birdloop *loop = birdloop_take(); + if (loop) { - struct birdloop *loop = SKIP_BACK(struct birdloop, n, HEAD(birdloop_pickup)); - rem_node(&loop->n); - UNLOCK_DOMAIN(resource, birdloop_domain); - - add_tail(&thr->loops, &loop->n); - birdloop_enter(loop); - loop->thread = thr; if (!EMPTY_LIST(loop->sock_list)) refresh_sockets = 1; birdloop_leave(loop); + } - /* If there are more loops to be picked up, wakeup the next thread */ - LOCK_DOMAIN(resource, birdloop_domain); - rem_node(&thr->n); - add_tail(&bird_thread_pickup, &thr->n); + /* Schedule all loops with timed out timers */ + timers_fire(&thr->meta->time, 0); - if (!EMPTY_LIST(birdloop_pickup)) - wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup))); + /* Run all scheduled loops */ + int more_events = ev_run_list(&thr->meta->event_list); + if (more_events) + { + THREAD_TRACE("More events to run"); + timeout = 0; } - UNLOCK_DOMAIN(resource, birdloop_domain); - - struct birdloop *loop; node *nn; - WALK_LIST2(loop, nn, thr->loops, n) + else { - birdloop_enter(loop); - u64 after_enter = ns_now(); - - timer *t; - - times_update(); - timers_fire(&loop->time, 0); - int again = birdloop_process_flags(loop) + ev_run_list(&loop->event_list); - -#if 0 - if (loop->n.next->next) - __builtin_prefetch(SKIP_BACK(struct birdloop, n, loop->n.next)->time.domain); -#endif - - if (again) - timeout = MIN(0, timeout); - else if (t = timers_first(&loop->time)) - timeout = MIN(((tm_remains(t) TO_MS) + 1), timeout); - - u64 before_leave = ns_now(); - loop->total_time_spent_ns += (before_leave - after_enter); - birdloop_leave(loop); - - ev_run_list(&thr->priority_events); + timeout = poll_timeout(thr->meta); + if (timeout == -1) + THREAD_TRACE("No timers, no events"); + else + THREAD_TRACE("Next timer in %d ms", timeout); } - refresh_sockets += atomic_exchange_explicit(&thr->poll_changed, 0, memory_order_acq_rel); + /* Run priority events before sleeping */ + ev_run_list(&thr->priority_events); - if (!refresh_sockets && ((timeout < 0) || (timeout > 5000))) - flush_local_pages(); + /* Do we have to refresh sockets? */ + refresh_sockets += atomic_exchange_explicit(&thr->poll_changed, 0, memory_order_acq_rel); - while (refresh_sockets) + if (refresh_sockets) { -sock_retry:; - end = (pfd = thr->pfd) + thr->pfd_max; + BUFFER_FLUSH(pfd.pfd); + BUFFER_FLUSH(pfd.loop); - /* Add internal wakeup fd */ - pipe_pollin(&thr->wakeup, pfd); - pfd++; + pipe_pollin(&thr->wakeup, &pfd); + node *nn; WALK_LIST2(loop, nn, thr->loops, n) { birdloop_enter(loop); - pfd = sockets_prepare(loop, pfd, end); + sockets_prepare(loop, &pfd); birdloop_leave(loop); - - if (!pfd) - { - mb_free(thr->pfd); - thr->pfd = mb_alloc(thr->pool, sizeof(struct pollfd) * (thr->pfd_max *= 2)); - goto sock_retry; - } } + ASSERT_DIE(pfd.loop.used == pfd.pfd.used); refresh_sockets = 0; } + /* Nothing to do in at least 5 seconds, flush local hot page cache */ + else if (timeout > 5000) + flush_local_pages(); poll_retry:; - int rv = poll(thr->pfd, pfd - thr->pfd, timeout); + int rv = poll(pfd.pfd.data, pfd.pfd.used, timeout); if (rv < 0) { if (errno == EINTR || errno == EAGAIN) @@ -494,51 +624,26 @@ poll_retry:; } /* Drain wakeup fd */ - if (thr->pfd[0].revents & POLLIN) + if (pfd.pfd.data[0].revents & POLLIN) { ASSERT_DIE(rv > 0); rv--; wakeup_drain(thr); } - atomic_exchange_explicit(&thr->ping_sent, 0, memory_order_acq_rel); + atomic_fetch_and_explicit(&thr->meta->thread_transition, ~LTT_PING, memory_order_acq_rel); - if (!rv && !atomic_exchange_explicit(&thr->run_cleanup, 0, memory_order_acq_rel)) - continue; - - /* Process stops and regular sockets */ - node *nxt; - WALK_LIST2_DELSAFE(loop, nn, nxt, thr->loops, n) - { - birdloop_enter(loop); - - if (loop->stopped) - { - /* Flush remaining events */ - ASSERT_DIE(!ev_run_list(&loop->event_list)); - - /* Drop timers */ - timer *t; - while (t = timers_first(&loop->time)) - tm_stop(t); - - /* No sockets allowed */ - ASSERT_DIE(EMPTY_LIST(loop->sock_list)); - - /* Declare loop stopped */ - rem_node(&loop->n); - birdloop_leave(loop); - loop->stopped(loop->stop_data); - - /* Birdloop already left */ - continue; - } - else if (rv) - sockets_fire(loop); - - birdloop_leave(loop); - } + /* Schedule loops with active sockets */ + if (rv) + for (uint i = 1; i < pfd.pfd.used; i++) + if (pfd.pfd.data[i].revents) + { + LOOP_TRACE(pfd.loop.data[i], "socket id %d got revents=%d", i, pfd.pfd.data[i].revents); + ev_send_loop(thr->meta, &pfd.loop.data[i]->event); + } } + + bug("An infinite loop has ended."); } static void @@ -563,11 +668,8 @@ bird_thread_start(void) struct bird_thread *thr = mb_allocz(p, sizeof(*thr)); thr->pool = p; - thr->pfd = mb_alloc(p, sizeof(struct pollfd) * (thr->pfd_max = 16)); thr->cleanup_event = (event) { .hook = bird_thread_cleanup, .data = thr, }; - atomic_store_explicit(&thr->ping_sent, 0, memory_order_relaxed); - wakeup_init(thr); ev_init_list(&thr->priority_events, NULL, "Thread direct event list"); @@ -615,7 +717,7 @@ bird_thread_shutdown(void * _ UNUSED) UNLOCK_DOMAIN(resource, birdloop_domain); - log(L_INFO "Thread pickup size differs from dropper goal by %d%s", dif, tdl_stop ? ", stopping" : ""); + DBG("Thread pickup size differs from dropper goal by %d%s\n", dif, tdl_stop ? ", stopping" : ""); if (tdl_stop) { @@ -628,29 +730,11 @@ bird_thread_shutdown(void * _ UNUSED) /* Leave the thread-picker list to get no more loops */ LOCK_DOMAIN(resource, birdloop_domain); rem_node(&thr->n); + UNLOCK_DOMAIN(resource, birdloop_domain); /* Drop loops including the thread dropper itself */ while (!EMPTY_LIST(thr->loops)) - { - /* Remove loop from this thread's list */ - struct birdloop *loop = HEAD(thr->loops); - rem_node(&loop->n); - UNLOCK_DOMAIN(resource, birdloop_domain); - - /* Unset loop's thread */ - if (birdloop_inside(loop)) - loop->thread = NULL; - else - { - birdloop_enter(loop); - loop->thread = NULL; - birdloop_leave(loop); - } - - /* Put loop into pickup list */ - LOCK_DOMAIN(resource, birdloop_domain); - add_tail(&birdloop_pickup, &loop->n); - } + birdloop_drop(HEAD(thr->loops)); /* Let others know about new loops */ if (!EMPTY_LIST(birdloop_pickup)) @@ -660,6 +744,11 @@ bird_thread_shutdown(void * _ UNUSED) /* Leave the thread-dropper loop as we aren't going to return. */ birdloop_leave(thread_dropper); + /* Stop the meta loop */ + birdloop_leave(thr->meta); + domain_free(thr->meta->time.domain); + rfree(thr->meta->pool); + /* Local pages not needed anymore */ flush_local_pages(); @@ -865,8 +954,81 @@ birdloop_init(void) birdloop_enter_locked(&main_birdloop); } -struct birdloop * -birdloop_new(pool *pp, uint order, const char *name) +static void +birdloop_stop_internal(struct birdloop *loop) +{ + /* Flush remaining events */ + ASSERT_DIE(!ev_run_list(&loop->event_list)); + + /* Drop timers */ + timer *t; + while (t = timers_first(&loop->time)) + tm_stop(t); + + /* No sockets allowed */ + ASSERT_DIE(EMPTY_LIST(loop->sock_list)); + + /* Unschedule from Meta */ + ev_postpone(&loop->event); + tm_stop(&loop->timer); + + /* Declare loop stopped */ + rem_node(&loop->n); + birdloop_leave(loop); + + /* Tail-call the stopped hook */ + loop->stopped(loop->stop_data); +} + +static void +birdloop_run(void *_loop) +{ + /* Run priority events before the loop is executed */ + ev_run_list(&this_thread->priority_events); + + struct birdloop *loop = _loop; + birdloop_enter(loop); + + if (loop->stopped) + /* Birdloop left inside the helper function */ + return birdloop_stop_internal(loop); + + /* Process sockets */ + sockets_fire(loop); + + /* Run timers */ + timers_fire(&loop->time, 0); + + /* Run flag handlers */ + if (birdloop_process_flags(loop)) + { + LOOP_TRACE(loop, "Flag processing needs another run"); + ev_send_loop(this_thread->meta, &loop->event); + } + + /* Run events */ + ev_run_list(&loop->event_list); + + /* Request meta timer */ + timer *t = timers_first(&loop->time); + if (t) + tm_start_in(&loop->timer, tm_remains(t), this_thread->meta); + else + tm_stop(&loop->timer); + + birdloop_leave(loop); +} + +static void +birdloop_run_timer(timer *tm) +{ + struct birdloop *loop = tm->data; + LOOP_TRACE(loop, "Timer ready, requesting run"); + ev_send_loop(loop->thread->meta, &loop->event); +} + +static struct birdloop * +birdloop_new_internal(pool *pp, uint order, const char *name, int request_pickup) { struct domain_generic *dg = domain_new(name, order); @@ -877,32 +1039,45 @@ birdloop_new(pool *pp, uint order, const char *name) loop->time.domain = dg; loop->time.loop = loop; + atomic_store_explicit(&loop->thread_transition, 0, memory_order_relaxed); + birdloop_enter(loop); ev_init_list(&loop->event_list, loop, name); timers_init(&loop->time, p); sockets_init(loop); - LOCK_DOMAIN(resource, birdloop_domain); - add_tail(&birdloop_pickup, &loop->n); - wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup))); - UNLOCK_DOMAIN(resource, birdloop_domain); + loop->event = (event) { .hook = birdloop_run, .data = loop, }; + loop->timer = (timer) { .hook = birdloop_run_timer, .data = loop, }; + + if (request_pickup) + { + LOCK_DOMAIN(resource, birdloop_domain); + add_tail(&birdloop_pickup, &loop->n); + wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup))); + UNLOCK_DOMAIN(resource, birdloop_domain); + } + else + loop->n.next = loop->n.prev = &loop->n; birdloop_leave(loop); return loop; } +struct birdloop * +birdloop_new(pool *pp, uint order, const char *name) +{ + return birdloop_new_internal(pp, order, name, 1); +} + static void birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data) { loop->stopped = stopped; loop->stop_data = data; - if (loop->thread) - { - atomic_store_explicit(&loop->thread->run_cleanup, 1, memory_order_release); - wakeup_do_kick(loop->thread); - } + + birdloop_do_ping(loop); } void @@ -961,6 +1136,7 @@ birdloop_leave_locked(struct birdloop *loop) /* Send pending pings */ if (loop->ping_pending) { + LOOP_TRACE(loop, "sending pings on leave"); loop->ping_pending = 0; birdloop_do_ping(loop); } diff --git a/sysdep/unix/io-loop.h b/sysdep/unix/io-loop.h index e23a9be0..e606f07e 100644 --- a/sysdep/unix/io-loop.h +++ b/sysdep/unix/io-loop.h @@ -16,8 +16,15 @@ struct pipe int fd[2]; }; +struct pfd { + BUFFER(struct pollfd) pfd; + BUFFER(struct birdloop *) loop; +}; + +void sockets_prepare(struct birdloop *, struct pfd *); + void pipe_new(struct pipe *); -void pipe_pollin(struct pipe *, struct pollfd *); +void pipe_pollin(struct pipe *, struct pfd *); void pipe_drain(struct pipe *); void pipe_kick(struct pipe *); @@ -25,6 +32,9 @@ struct birdloop { node n; + event event; + timer timer; + pool *pool; struct timeloop time; @@ -36,6 +46,9 @@ struct birdloop uint links; + _Atomic u32 thread_transition; +#define LTT_PING 1 +#define LTT_MOVE 2 _Atomic u32 flags; struct birdloop_flag_handler *flag_handler; @@ -45,7 +58,6 @@ struct birdloop struct birdloop *prev_loop; struct bird_thread *thread; - struct pollfd *pfd; u64 total_time_spent_ns; }; @@ -54,16 +66,13 @@ struct bird_thread { node n; - struct pollfd *pfd; - uint pfd_max; - - _Atomic u32 ping_sent; - _Atomic u32 run_cleanup; _Atomic u32 poll_changed; struct pipe wakeup; event_list priority_events; + struct birdloop *meta; + pthread_t thread_id; pthread_attr_t thread_attr; @@ -71,6 +80,7 @@ struct bird_thread list loops; pool *pool; + struct pfd *pfd; event cleanup_event; }; diff --git a/sysdep/unix/io.c b/sysdep/unix/io.c index 78908676..06797096 100644 --- a/sysdep/unix/io.c +++ b/sysdep/unix/io.c @@ -140,7 +140,7 @@ times_update(void) if ((ts.tv_sec < 0) || (((u64) ts.tv_sec) > ((u64) 1 << 40))) log(L_WARN "Monotonic clock is crazy"); - + btime new_time = ts.tv_sec S + ts.tv_nsec NS; if (new_time < old_time) @@ -722,7 +722,6 @@ sk_log_error(sock *s, const char *p) * Actual struct birdsock code */ -static list sock_list; static struct birdsock *current_sock; static struct birdsock *stored_sock; @@ -1026,7 +1025,7 @@ sk_setup(sock *s) static void sk_insert(sock *s) { - add_tail(&sock_list, &s->n); + add_tail(&main_birdloop.sock_list, &s->n); } static void @@ -2049,7 +2048,7 @@ sk_dump_all(void) sock *s; debug("Open sockets:\n"); - WALK_LIST(n, sock_list) + WALK_LIST(n, main_birdloop.sock_list) { s = SKIP_BACK(sock, n, n); debug("%p ", s); @@ -2208,7 +2207,7 @@ watchdog_stop(void) void io_init(void) { - init_list(&sock_list); + init_list(&main_birdloop.sock_list); ev_init_list(&global_event_list, &main_birdloop, "Global event list"); ev_init_list(&global_work_list, &main_birdloop, "Global work list"); ev_init_list(&main_birdloop.event_list, &main_birdloop, "Global fast event list"); @@ -2229,12 +2228,11 @@ void io_loop(void) { int poll_tout, timeout; - int nfds, events, pout; + int events, pout; timer *t; - sock *s; - node *n; - int fdmax = 256; - struct pollfd *pfd = xmalloc(fdmax * sizeof(struct pollfd)); + struct pfd pfd; + BUFFER_INIT(pfd.pfd, &root_pool, 16); + BUFFER_INIT(pfd.loop, &root_pool, 16); watchdog_start1(); for(;;) @@ -2255,39 +2253,11 @@ io_loop(void) poll_tout = MIN(poll_tout, timeout); } - /* A hack to reload main io_loop() when something has changed asynchronously. */ - pipe_pollin(&main_birdloop.thread->wakeup, &pfd[0]); - - nfds = 1; + BUFFER_FLUSH(pfd.pfd); + BUFFER_FLUSH(pfd.loop); - WALK_LIST(n, sock_list) - { - pfd[nfds] = (struct pollfd) { .fd = -1 }; /* everything other set to 0 by this */ - s = SKIP_BACK(sock, n, n); - if (s->rx_hook) - { - pfd[nfds].fd = s->fd; - pfd[nfds].events |= POLLIN; - } - if (s->tx_hook && s->ttx != s->tpos) - { - pfd[nfds].fd = s->fd; - pfd[nfds].events |= POLLOUT; - } - if (pfd[nfds].fd != -1) - { - s->index = nfds; - nfds++; - } - else - s->index = -1; - - if (nfds >= fdmax) - { - fdmax *= 2; - pfd = xrealloc(pfd, fdmax * sizeof(struct pollfd)); - } - } + pipe_pollin(&main_birdloop.thread->wakeup, &pfd); + sockets_prepare(&main_birdloop, &pfd); /* * Yes, this is racy. But even if the signal comes before this test @@ -2319,7 +2289,7 @@ io_loop(void) /* And finally enter poll() to find active sockets */ watchdog_stop(); birdloop_leave(&main_birdloop); - pout = poll(pfd, nfds, poll_tout); + pout = poll(pfd.pfd.data, pfd.pfd.used, poll_tout); birdloop_enter(&main_birdloop); watchdog_start(); @@ -2331,18 +2301,18 @@ io_loop(void) } if (pout) { - if (pfd[0].revents & POLLIN) + if (pfd.pfd.data[0].revents & POLLIN) { /* IO loop reload requested */ pipe_drain(&main_birdloop.thread->wakeup); - atomic_exchange_explicit(&main_birdloop.thread->ping_sent, 0, memory_order_acq_rel); + atomic_fetch_and_explicit(&main_birdloop.thread_transition, ~LTT_PING, memory_order_acq_rel); continue; } times_update(); /* guaranteed to be non-empty */ - current_sock = SKIP_BACK(sock, n, HEAD(sock_list)); + current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); while (current_sock) { @@ -2357,19 +2327,19 @@ io_loop(void) int steps; steps = MAX_STEPS; - if (s->fast_rx && (pfd[s->index].revents & POLLIN) && s->rx_hook) + if (s->fast_rx && (pfd.pfd.data[s->index].revents & POLLIN) && s->rx_hook) do { steps--; io_log_event(s->rx_hook, s->data); - e = sk_read(s, pfd[s->index].revents); + e = sk_read(s, pfd.pfd.data[s->index].revents); if (s != current_sock) goto next; } while (e && s->rx_hook && steps); steps = MAX_STEPS; - if (pfd[s->index].revents & POLLOUT) + if (pfd.pfd.data[s->index].revents & POLLOUT) do { steps--; @@ -2392,7 +2362,7 @@ io_loop(void) int count = 0; current_sock = stored_sock; if (current_sock == NULL) - current_sock = SKIP_BACK(sock, n, HEAD(sock_list)); + current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); while (current_sock && count < MAX_RX_STEPS) { @@ -2403,18 +2373,18 @@ io_loop(void) goto next2; } - if (!s->fast_rx && (pfd[s->index].revents & POLLIN) && s->rx_hook) + if (!s->fast_rx && (pfd.pfd.data[s->index].revents & POLLIN) && s->rx_hook) { count++; io_log_event(s->rx_hook, s->data); - sk_read(s, pfd[s->index].revents); + sk_read(s, pfd.pfd.data[s->index].revents); if (s != current_sock) goto next2; } - if (pfd[s->index].revents & (POLLHUP | POLLERR)) + if (pfd.pfd.data[s->index].revents & (POLLHUP | POLLERR)) { - sk_err(s, pfd[s->index].revents); + sk_err(s, pfd.pfd.data[s->index].revents); if (s != current_sock) goto next2; } |