diff options
author | Maria Matejka <mq@ucw.cz> | 2022-06-24 19:53:34 +0200 |
---|---|---|
committer | Maria Matejka <mq@ucw.cz> | 2022-07-18 13:28:35 +0200 |
commit | e91754f5b9ef68d52f5ff3abcd24661034a0feed (patch) | |
tree | 390d7d4fcdbaeb726fe1c74b192a0a5971ea93e5 | |
parent | 08c84846089a131a0e7e9e0185b6c6ccb4ed4e2d (diff) |
Event lists rewritten to a single linked list
In multithreaded environment, we need to pass messages between workers.
This is done by queuing events to their respective queues. The
double-linked list is not really useful for that as it needs locking
everywhere.
This commit rewrites the event subsystem to use a single-linked list
where events are enqueued by a single atomic instruction and the queue
is processed after atomically moving the whole queue aside.
-rw-r--r-- | configure.ac | 2 | ||||
-rw-r--r-- | lib/event.c | 221 | ||||
-rw-r--r-- | lib/event.h | 55 | ||||
-rw-r--r-- | lib/io-loop.h | 4 | ||||
-rw-r--r-- | lib/locking.h | 1 | ||||
-rw-r--r-- | nest/proto.c | 3 |
6 files changed, 155 insertions, 131 deletions
diff --git a/configure.ac b/configure.ac index 5e5f2c4e..330add87 100644 --- a/configure.ac +++ b/configure.ac @@ -337,7 +337,7 @@ case $sysdesc in ;; esac -AC_CHECK_HEADERS_ONCE([alloca.h syslog.h]) +AC_CHECK_HEADERS_ONCE([alloca.h syslog.h stdatomic.h]) AC_CHECK_HEADER([sys/mman.h], [AC_DEFINE([HAVE_MMAP], [1], [Define to 1 if mmap() is available.])], have_mman=no) AC_CHECK_FUNC([aligned_alloc], [AC_DEFINE([HAVE_ALIGNED_ALLOC], [1], [Define to 1 if aligned_alloc() is available.])], have_aligned_alloc=no) AC_CHECK_MEMBERS([struct sockaddr.sa_len], [], [], [#include <sys/socket.h>]) diff --git a/lib/event.c b/lib/event.c index b0abd1d0..7effc315 100644 --- a/lib/event.c +++ b/lib/event.c @@ -23,27 +23,91 @@ #include "nest/bird.h" #include "lib/event.h" -#include "lib/locking.h" #include "lib/io-loop.h" -extern _Thread_local struct coroutine *this_coro; - event_list global_event_list; event_list global_work_list; +STATIC_ASSERT(OFFSETOF(event_list, _sentinel.next) >= OFFSETOF(event_list, _end[0])); + +void +ev_init_list(event_list *el, struct birdloop *loop, const char *name) +{ + el->name = name; + el->loop = loop; + + atomic_store_explicit(&el->receiver, &el->_sentinel, memory_order_relaxed); + atomic_store_explicit(&el->_executor, &el->_sentinel, memory_order_relaxed); + atomic_store_explicit(&el->_sentinel.next, NULL, memory_order_relaxed); +} + +/* + * The event list should work as a message passing point. Sending a message + * must be a fairly fast process with no locks and low waiting times. OTOH, + * processing messages always involves running the assigned code and the + * receiver is always a single one thread with no concurrency at all. There is + * also a postponing requirement to synchronously remove an event from a queue, + * yet we allow this only when the caller has its receiver event loop locked. + * It still means that the event may get postponed from other event in the same + * list, therefore we have to be careful. + */ + +static inline int +ev_remove_from(event *e, event * _Atomic * head) +{ + /* The head pointer stores where cur is pointed to from */ + event * _Atomic *prev = head; + + /* The current event in queue to check */ + event *cur = atomic_load_explicit(prev, memory_order_acquire); + + /* Pre-loaded next pointer; if NULL, this is sentinel */ + event *next = atomic_load_explicit(&cur->next, memory_order_acquire); + + while (next) + { + if (e == cur) + { + /* Check whether we have collided with somebody else + * adding an item to the queue. */ + if (!atomic_compare_exchange_strong_explicit( + prev, &cur, next, + memory_order_acq_rel, memory_order_acquire)) + { + /* This may happen only on list head */ + ASSERT_DIE(prev == head); + + /* Restart. The collision should never happen again. */ + return ev_remove_from(e, head); + } + + /* Successfully removed from the list; inactivate this event. */ + atomic_store_explicit(&cur->next, NULL, memory_order_release); + return 1; + } + + /* Go to the next event. */ + prev = &cur->next; + cur = next; + next = atomic_load_explicit(&cur->next, memory_order_acquire); + } + + return 0; +} + inline void ev_postpone(event *e) { - event_list *el = e->list; - if (!el) + /* Find the list to remove the event from */ + event_list *sl = ev_get_list(e); + if (!sl) return; - ASSERT_DIE(birdloop_inside(el->loop)); + /* Postponing allowed only from the target loop */ + ASSERT_DIE(birdloop_inside(sl->loop)); - LOCK_DOMAIN(event, el->lock); - if (ev_active(e)) - rem_node(&e->n); - UNLOCK_DOMAIN(event, el->lock); + /* Remove from one of these lists. */ + ASSERT(ev_remove_from(e, &sl->_executor) || ev_remove_from(e, &sl->receiver)); } static void @@ -54,7 +118,7 @@ ev_dump(resource *r) debug("(code %p, data %p, %s)\n", e->hook, e->data, - e->n.next ? "scheduled" : "inactive"); + atomic_load_explicit(&e->next, memory_order_relaxed) ? "scheduled" : "inactive"); } static struct resclass ev_class = { @@ -108,23 +172,17 @@ ev_run(event *e) inline void ev_send(event_list *l, event *e) { - DBG("ev_send(%p, %p)\n", l, e); - ASSERT_DIE(e->hook); - ASSERT_DIE(!e->list || (e->list == l) || (e->list->loop == l->loop)); - - e->list = l; - - LOCK_DOMAIN(event, l->lock); - if (enlisted(&e->n)) - { - UNLOCK_DOMAIN(event, l->lock); + event_list *sl = ev_get_list(e); + if (sl == l) return; - } - - add_tail(&l->events, &e->n); - UNLOCK_DOMAIN(event, l->lock); - - birdloop_ping(l->loop); + if (sl) + bug("Queuing an already queued event to another queue is not supported."); + + event *next = atomic_load_explicit(&l->receiver, memory_order_acquire); + do atomic_store_explicit(&e->next, next, memory_order_relaxed); + while (!atomic_compare_exchange_strong_explicit( + &l->receiver, &next, e, + memory_order_acq_rel, memory_order_acquire)); } void io_log_event(void *hook, void *data); @@ -136,93 +194,56 @@ void io_log_event(void *hook, void *data); * This function calls ev_run() for all events enqueued in the list @l. */ int -ev_run_list(event_list *l) +ev_run_list_limited(event_list *l, uint limit) { - const _Bool legacy = LEGACY_EVENT_LIST(l); - - if (legacy) - ASSERT_THE_BIRD_LOCKED; + event * _Atomic *ep = &l->_executor; - node *n; + /* No pending events, refill the queue. */ + if (atomic_load_explicit(ep, memory_order_relaxed) == &l->_sentinel) + { + /* Move the current event list aside and create a new one. */ + event *received = atomic_exchange_explicit( + &l->receiver, &l->_sentinel, memory_order_acq_rel); - list tmp_list; - init_list(&tmp_list); + /* No event to run. */ + if (received == &l->_sentinel) + return 0; - /* Move the event list contents to a local list to avoid executing repeatedly added events */ - LOCK_DOMAIN(event, l->lock); - add_tail_list(&tmp_list, &l->events); - init_list(&l->events); - UNLOCK_DOMAIN(event, l->lock); + /* Setup the executor queue */ + event *head = &l->_sentinel; - WALK_LIST_FIRST(n, tmp_list) + /* Flip the order of the events by relinking them one by one (push-pop) */ + while (received != &l->_sentinel) { - event *e = SKIP_BACK(event, n, n); - - if (legacy) - { - /* The legacy way of event execution */ - io_log_event(e->hook, e->data); - ev_postpone(e); - e->hook(e->data); - } - else - { - // io_log_event(e->hook, e->data); /* TODO: add support for event logging in other io loops */ - ASSERT_DIE(e->list == l); - LOCK_DOMAIN(event, l->lock); - rem_node(&e->n); - UNLOCK_DOMAIN(event, l->lock); - e->hook(e->data); - } - tmp_flush(); + event *cur = received; + received = atomic_exchange_explicit(&cur->next, head, memory_order_relaxed); + head = cur; } - LOCK_DOMAIN(event, l->lock); - int repeat = ! EMPTY_LIST(l->events); - UNLOCK_DOMAIN(event, l->lock); - return repeat; -} - -int -ev_run_list_limited(event_list *l, uint limit) -{ - ASSERT_DIE(LEGACY_EVENT_LIST(l)); - ASSERT_THE_BIRD_LOCKED; - - node *n; - list tmp_list; - - LOCK_DOMAIN(event, l->lock); - init_list(&tmp_list); - add_tail_list(&tmp_list, &l->events); - init_list(&l->events); - UNLOCK_DOMAIN(event, l->lock); + /* Store the executor queue to its designated place */ + atomic_store_explicit(ep, head, memory_order_relaxed); + } - WALK_LIST_FIRST(n, tmp_list) + /* Run the events in order. */ + event *e; + while ((e = atomic_load_explicit(ep, memory_order_relaxed)) != &l->_sentinel) { - event *e = SKIP_BACK(event, n, n); + /* Check limit */ + if (!--limit) + return 1; - if (!limit) - break; + /* This is ugly hack, we want to log just events executed from the main I/O loop */ + if ((l == &global_event_list) || (l == &global_work_list)) + io_log_event(e->hook, e->data); - io_log_event(e->hook, e->data); + /* Inactivate the event */ + atomic_store_explicit(ep, atomic_load_explicit(&e->next, memory_order_relaxed), memory_order_relaxed); + atomic_store_explicit(&e->next, NULL, memory_order_relaxed); - ev_run(e); + /* Run the event */ + e->hook(e->data); tmp_flush(); - limit--; } - LOCK_DOMAIN(event, l->lock); - if (!EMPTY_LIST(tmp_list)) - { - /* Attach new items after the unprocessed old items */ - add_tail_list(&tmp_list, &l->events); - init_list(&l->events); - add_tail_list(&l->events, &tmp_list); - } - - int repeat = ! EMPTY_LIST(l->events); - UNLOCK_DOMAIN(event, l->lock); - - return repeat; + return atomic_load_explicit(&l->receiver, memory_order_relaxed) != &l->_sentinel; } diff --git a/lib/event.h b/lib/event.h index 6c358f84..9773c3a9 100644 --- a/lib/event.h +++ b/lib/event.h @@ -14,21 +14,24 @@ #include <stdatomic.h> -DEFINE_DOMAIN(event); +struct birdloop; typedef struct event { resource r; void (*hook)(void *); void *data; - node n; /* Internal link */ - struct event_list *list; /* List where this event is put in */ + struct event * _Atomic next; } event; -typedef struct event_list { - list events; - pool *pool; - struct birdloop *loop; - DOMAIN(event) lock; +typedef union event_list { + struct { + event * _Atomic receiver; /* Event receive list */ + event * _Atomic _executor; /* Event execute list */ + const char *name; + struct birdloop *loop; /* The executor loop */ + char _end[0]; + }; + event _sentinel; /* Sentinel node to actively detect list end */ } event_list; extern event_list global_event_list; @@ -36,36 +39,40 @@ extern event_list global_work_list; event *ev_new(pool *); void ev_run(event *); - -static inline void ev_init_list(event_list *el, struct birdloop *loop, const char *name) -{ - init_list(&el->events); - el->loop = loop; - el->lock = DOMAIN_NEW(event, name); -} - -void ev_send(event_list *, event *); +void ev_init_list(event_list *, struct birdloop *loop, const char *name); +void ev_enqueue(event_list *, event *); +#define ev_send ev_enqueue #define ev_send_loop(l, e) ev_send(birdloop_event_list((l)), (e)) #define ev_schedule(e) ({ ASSERT_THE_BIRD_LOCKED; if (!ev_active((e))) ev_send(&global_event_list, (e)); }) #define ev_schedule_work(e) ({ ASSERT_THE_BIRD_LOCKED; if (!ev_active((e))) ev_send(&global_work_list, (e)); }) void ev_postpone(event *); -int ev_run_list(event_list *); int ev_run_list_limited(event_list *, uint); +#define ev_run_list(l) ev_run_list_limited((l), ~0) #define LEGACY_EVENT_LIST(l) (((l) == &global_event_list) || ((l) == &global_work_list)) -_Bool birdloop_inside(struct birdloop *loop); - static inline int ev_active(event *e) { - if (e->list == NULL) - return 0; + return atomic_load_explicit(&e->next, memory_order_relaxed) != NULL; +} - ASSERT_DIE(birdloop_inside(e->list->loop)); - return enlisted(&e->n); +static inline event_list * +ev_get_list(event *e) +{ + /* We are looking for the sentinel node at the list end. + * After this, we have s->next == NULL */ + event *s = e; + for (event *sn; sn = atomic_load_explicit(&s->next, memory_order_acquire); s = sn) + ; + + /* No sentinel, no list. */ + if (s == e) + return NULL; + else + return SKIP_BACK(event_list, _sentinel, s); } static inline event* diff --git a/lib/io-loop.h b/lib/io-loop.h index 25f1b2a3..dec7d040 100644 --- a/lib/io-loop.h +++ b/lib/io-loop.h @@ -14,12 +14,12 @@ #include "lib/event.h" #include "lib/socket.h" +extern struct birdloop main_birdloop; + void sk_start(sock *s); void sk_stop(sock *s); void sk_reloop(sock *s, struct birdloop *loop); -extern struct birdloop main_birdloop; - /* Start a new birdloop owned by given pool and domain */ struct birdloop *birdloop_new(pool *p, uint order, const char *name); diff --git a/lib/locking.h b/lib/locking.h index ab5c06af..8ea1c968 100644 --- a/lib/locking.h +++ b/lib/locking.h @@ -16,7 +16,6 @@ struct lock_order { struct domain_generic *the_bird; struct domain_generic *proto; struct domain_generic *rtable; - struct domain_generic *event; }; extern _Thread_local struct lock_order locking_stack; diff --git a/nest/proto.c b/nest/proto.c index e9bced3b..17cd08f7 100644 --- a/nest/proto.c +++ b/nest/proto.c @@ -1072,7 +1072,6 @@ proto_loop_stopped(void *ptr) birdloop_enter(&main_birdloop); p->loop = &main_birdloop; - p->event->list = NULL; proto_cleanup(p); birdloop_leave(&main_birdloop); @@ -1163,8 +1162,6 @@ proto_start(struct proto *p) if (p->cf->loop_order != DOMAIN_ORDER(the_bird)) p->loop = birdloop_new(p->pool, p->cf->loop_order, p->pool->name); - p->event->list = proto_event_list(p); - PROTO_LOCKED_FROM_MAIN(p) proto_notify_state(p, (p->proto->start ? p->proto->start(p) : PS_UP)); } |