author     Maria Matejka <mq@ucw.cz>    2022-06-24 19:53:34 +0200
committer  Maria Matejka <mq@ucw.cz>    2022-07-18 13:28:35 +0200
commit     e91754f5b9ef68d52f5ff3abcd24661034a0feed (patch)
tree       390d7d4fcdbaeb726fe1c74b192a0a5971ea93e5 /lib
parent     08c84846089a131a0e7e9e0185b6c6ccb4ed4e2d (diff)
Event lists rewritten to a single linked list
In a multithreaded environment, we need to pass messages between workers. This is done by queuing events to their respective queues. A doubly-linked list is not well suited for this, as it needs locking everywhere. This commit rewrites the event subsystem to use a singly-linked list where events are enqueued by a single atomic instruction and the whole queue is processed after being atomically moved aside.
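The enqueue/drain pattern introduced here is a classic multi-producer, single-consumer (MPSC) list. Below is a minimal standalone sketch of it — not BIRD code: the names are made up, and a NULL terminator stands in for the sentinel node the real list uses.

#include <stdatomic.h>
#include <stddef.h>

struct node {
  struct node * _Atomic next;
  void (*hook)(void *);
  void *data;
};

static struct node * _Atomic head;   /* receiver side of the queue */

/* Producer side: push with a single CAS loop (callable from any thread). */
void push(struct node *n)
{
  struct node *old = atomic_load_explicit(&head, memory_order_acquire);
  do
    atomic_store_explicit(&n->next, old, memory_order_relaxed);
  while (!atomic_compare_exchange_strong_explicit(
        &head, &old, n, memory_order_acq_rel, memory_order_acquire));
}

/* Consumer side: detach the whole queue at once, then process it. */
void drain(void)
{
  /* One atomic exchange moves the queue aside; producers now start a fresh list. */
  struct node *lifo = atomic_exchange_explicit(&head, NULL, memory_order_acq_rel);

  /* Pushing builds the list newest-first; reverse it to run in FIFO order. */
  struct node *fifo = NULL;
  while (lifo)
  {
    struct node *cur = lifo;
    lifo = atomic_load_explicit(&cur->next, memory_order_relaxed);
    atomic_store_explicit(&cur->next, fifo, memory_order_relaxed);
    fifo = cur;
  }

  for (struct node *e = fifo; e; )
  {
    /* Read the link before running the hook; the hook may free or requeue e. */
    struct node *next = atomic_load_explicit(&e->next, memory_order_relaxed);
    e->hook(e->data);
    e = next;
  }
}

The actual commit terminates the list with a per-list sentinel node instead of NULL, which is what allows ev_get_list() to recover the owning list from any queued event (see the event.h diff below).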
Diffstat (limited to 'lib')
-rw-r--r--  lib/event.c    221
-rw-r--r--  lib/event.h     55
-rw-r--r--  lib/io-loop.h    4
-rw-r--r--  lib/locking.h    1
4 files changed, 154 insertions, 127 deletions
diff --git a/lib/event.c b/lib/event.c
index b0abd1d0..7effc315 100644
--- a/lib/event.c
+++ b/lib/event.c
@@ -23,27 +23,91 @@
#include "nest/bird.h"
#include "lib/event.h"
-#include "lib/locking.h"
#include "lib/io-loop.h"
-extern _Thread_local struct coroutine *this_coro;
-
event_list global_event_list;
event_list global_work_list;
+STATIC_ASSERT(OFFSETOF(event_list, _sentinel.next) >= OFFSETOF(event_list, _end[0]));
+
+void
+ev_init_list(event_list *el, struct birdloop *loop, const char *name)
+{
+ el->name = name;
+ el->loop = loop;
+
+ atomic_store_explicit(&el->receiver, &el->_sentinel, memory_order_relaxed);
+ atomic_store_explicit(&el->_executor, &el->_sentinel, memory_order_relaxed);
+ atomic_store_explicit(&el->_sentinel.next, NULL, memory_order_relaxed);
+}
+
+/*
+ * The event list should work as a message passing point. Sending a message
+ * must be a fairly fast process with no locks and low waiting times. OTOH,
+ * processing messages always involves running the assigned code and the
+ * receiver is always a single thread with no concurrency at all. There is
+ * also a postponing requirement to synchronously remove an event from a queue,
+ * yet we allow this only when the caller has its receiver event loop locked.
+ * It still means that the event may be postponed by another event in the same
+ * list, therefore we have to be careful.
+ */
+
+static inline int
+ev_remove_from(event *e, event * _Atomic * head)
+{
+ /* Location through which cur is linked into the list */
+ event * _Atomic *prev = head;
+
+ /* The current event in queue to check */
+ event *cur = atomic_load_explicit(prev, memory_order_acquire);
+
+ /* Pre-loaded next pointer; if NULL, cur is the sentinel */
+ event *next = atomic_load_explicit(&cur->next, memory_order_acquire);
+
+ while (next)
+ {
+ if (e == cur)
+ {
+ /* Check whether we have collided with somebody else
+ * adding an item to the queue. */
+ if (!atomic_compare_exchange_strong_explicit(
+ prev, &cur, next,
+ memory_order_acq_rel, memory_order_acquire))
+ {
+ /* This may happen only on list head */
+ ASSERT_DIE(prev == head);
+
+ /* Restart. The collision should never happen again. */
+ return ev_remove_from(e, head);
+ }
+
+ /* Successfully removed from the list; inactivate this event. */
+ atomic_store_explicit(&cur->next, NULL, memory_order_release);
+ return 1;
+ }
+
+ /* Go to the next event. */
+ prev = &cur->next;
+ cur = next;
+ next = atomic_load_explicit(&cur->next, memory_order_acquire);
+ }
+
+ return 0;
+}
+
inline void
ev_postpone(event *e)
{
- event_list *el = e->list;
- if (!el)
+ /* Find the list to remove the event from */
+ event_list *sl = ev_get_list(e);
+ if (!sl)
return;
- ASSERT_DIE(birdloop_inside(el->loop));
+ /* Postponing allowed only from the target loop */
+ ASSERT_DIE(birdloop_inside(sl->loop));
- LOCK_DOMAIN(event, el->lock);
- if (ev_active(e))
- rem_node(&e->n);
- UNLOCK_DOMAIN(event, el->lock);
+ /* Remove from one of these lists. */
+ ASSERT(ev_remove_from(e, &sl->_executor) || ev_remove_from(e, &sl->receiver));
}
static void
@@ -54,7 +118,7 @@ ev_dump(resource *r)
debug("(code %p, data %p, %s)\n",
e->hook,
e->data,
- e->n.next ? "scheduled" : "inactive");
+ atomic_load_explicit(&e->next, memory_order_relaxed) ? "scheduled" : "inactive");
}
static struct resclass ev_class = {
@@ -108,23 +172,17 @@ ev_run(event *e)
inline void
ev_send(event_list *l, event *e)
{
- DBG("ev_send(%p, %p)\n", l, e);
- ASSERT_DIE(e->hook);
- ASSERT_DIE(!e->list || (e->list == l) || (e->list->loop == l->loop));
-
- e->list = l;
-
- LOCK_DOMAIN(event, l->lock);
- if (enlisted(&e->n))
- {
- UNLOCK_DOMAIN(event, l->lock);
+ event_list *sl = ev_get_list(e);
+ if (sl == l)
return;
- }
-
- add_tail(&l->events, &e->n);
- UNLOCK_DOMAIN(event, l->lock);
-
- birdloop_ping(l->loop);
+ if (sl)
+ bug("Queuing an already queued event to another queue is not supported.");
+
+ event *next = atomic_load_explicit(&l->receiver, memory_order_acquire);
+ do atomic_store_explicit(&e->next, next, memory_order_relaxed);
+ while (!atomic_compare_exchange_strong_explicit(
+ &l->receiver, &next, e,
+ memory_order_acq_rel, memory_order_acquire));
}
void io_log_event(void *hook, void *data);
@@ -136,93 +194,56 @@ void io_log_event(void *hook, void *data);
* This function calls ev_run() for all events enqueued in the list @l.
*/
int
-ev_run_list(event_list *l)
+ev_run_list_limited(event_list *l, uint limit)
{
- const _Bool legacy = LEGACY_EVENT_LIST(l);
-
- if (legacy)
- ASSERT_THE_BIRD_LOCKED;
+ event * _Atomic *ep = &l->_executor;
- node *n;
+ /* No pending events, refill the queue. */
+ if (atomic_load_explicit(ep, memory_order_relaxed) == &l->_sentinel)
+ {
+ /* Move the current event list aside and create a new one. */
+ event *received = atomic_exchange_explicit(
+ &l->receiver, &l->_sentinel, memory_order_acq_rel);
- list tmp_list;
- init_list(&tmp_list);
+ /* No event to run. */
+ if (received == &l->_sentinel)
+ return 0;
- /* Move the event list contents to a local list to avoid executing repeatedly added events */
- LOCK_DOMAIN(event, l->lock);
- add_tail_list(&tmp_list, &l->events);
- init_list(&l->events);
- UNLOCK_DOMAIN(event, l->lock);
+ /* Set up the executor queue */
+ event *head = &l->_sentinel;
- WALK_LIST_FIRST(n, tmp_list)
+ /* Flip the order of the events by relinking them one by one (push-pop) */
+ while (received != &l->_sentinel)
{
- event *e = SKIP_BACK(event, n, n);
-
- if (legacy)
- {
- /* The legacy way of event execution */
- io_log_event(e->hook, e->data);
- ev_postpone(e);
- e->hook(e->data);
- }
- else
- {
- // io_log_event(e->hook, e->data); /* TODO: add support for event logging in other io loops */
- ASSERT_DIE(e->list == l);
- LOCK_DOMAIN(event, l->lock);
- rem_node(&e->n);
- UNLOCK_DOMAIN(event, l->lock);
- e->hook(e->data);
- }
- tmp_flush();
+ event *cur = received;
+ received = atomic_exchange_explicit(&cur->next, head, memory_order_relaxed);
+ head = cur;
}
- LOCK_DOMAIN(event, l->lock);
- int repeat = ! EMPTY_LIST(l->events);
- UNLOCK_DOMAIN(event, l->lock);
- return repeat;
-}
-
-int
-ev_run_list_limited(event_list *l, uint limit)
-{
- ASSERT_DIE(LEGACY_EVENT_LIST(l));
- ASSERT_THE_BIRD_LOCKED;
-
- node *n;
- list tmp_list;
-
- LOCK_DOMAIN(event, l->lock);
- init_list(&tmp_list);
- add_tail_list(&tmp_list, &l->events);
- init_list(&l->events);
- UNLOCK_DOMAIN(event, l->lock);
+ /* Store the executor queue to its designated place */
+ atomic_store_explicit(ep, head, memory_order_relaxed);
+ }
- WALK_LIST_FIRST(n, tmp_list)
+ /* Run the events in order. */
+ event *e;
+ while ((e = atomic_load_explicit(ep, memory_order_relaxed)) != &l->_sentinel)
{
- event *e = SKIP_BACK(event, n, n);
+ /* Check limit */
+ if (!--limit)
+ return 1;
- if (!limit)
- break;
+ /* This is an ugly hack: we want to log just the events executed from the main I/O loop */
+ if ((l == &global_event_list) || (l == &global_work_list))
+ io_log_event(e->hook, e->data);
- io_log_event(e->hook, e->data);
+ /* Inactivate the event */
+ atomic_store_explicit(ep, atomic_load_explicit(&e->next, memory_order_relaxed), memory_order_relaxed);
+ atomic_store_explicit(&e->next, NULL, memory_order_relaxed);
- ev_run(e);
+ /* Run the event */
+ e->hook(e->data);
tmp_flush();
- limit--;
}
- LOCK_DOMAIN(event, l->lock);
- if (!EMPTY_LIST(tmp_list))
- {
- /* Attach new items after the unprocessed old items */
- add_tail_list(&tmp_list, &l->events);
- init_list(&l->events);
- add_tail_list(&l->events, &tmp_list);
- }
-
- int repeat = ! EMPTY_LIST(l->events);
- UNLOCK_DOMAIN(event, l->lock);
-
- return repeat;
+ return atomic_load_explicit(&l->receiver, memory_order_relaxed) != &l->_sentinel;
}
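A usage note on the new return value: ev_run_list_limited() reports whether the receiver side refilled while the executor queue was being drained, so the caller can decide whether to reschedule. A hypothetical caller, not part of this commit (the spinning placeholder is an assumption; BIRD's loops would ping themselves instead):

/* Drain a small batch of events per pass; nonzero means more work is pending. */
while (ev_run_list_limited(&global_work_list, 16))
  ;  /* placeholder: reschedule/ping the loop here rather than spin */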
diff --git a/lib/event.h b/lib/event.h
index 6c358f84..9773c3a9 100644
--- a/lib/event.h
+++ b/lib/event.h
@@ -14,21 +14,24 @@
#include <stdatomic.h>
-DEFINE_DOMAIN(event);
+struct birdloop;
typedef struct event {
resource r;
void (*hook)(void *);
void *data;
- node n; /* Internal link */
- struct event_list *list; /* List where this event is put in */
+ struct event * _Atomic next;
} event;
-typedef struct event_list {
- list events;
- pool *pool;
- struct birdloop *loop;
- DOMAIN(event) lock;
+typedef union event_list {
+ struct {
+ event * _Atomic receiver; /* Event receive list */
+ event * _Atomic _executor; /* Event execute list */
+ const char *name;
+ struct birdloop *loop; /* The executor loop */
+ char _end[0];
+ };
+ event _sentinel; /* Sentinel node to actively detect list end */
} event_list;
extern event_list global_event_list;
@@ -36,36 +39,40 @@ extern event_list global_work_list;
event *ev_new(pool *);
void ev_run(event *);
-
-static inline void ev_init_list(event_list *el, struct birdloop *loop, const char *name)
-{
- init_list(&el->events);
- el->loop = loop;
- el->lock = DOMAIN_NEW(event, name);
-}
-
-void ev_send(event_list *, event *);
+void ev_init_list(event_list *, struct birdloop *loop, const char *name);
+void ev_enqueue(event_list *, event *);
+#define ev_send ev_enqueue
#define ev_send_loop(l, e) ev_send(birdloop_event_list((l)), (e))
#define ev_schedule(e) ({ ASSERT_THE_BIRD_LOCKED; if (!ev_active((e))) ev_send(&global_event_list, (e)); })
#define ev_schedule_work(e) ({ ASSERT_THE_BIRD_LOCKED; if (!ev_active((e))) ev_send(&global_work_list, (e)); })
void ev_postpone(event *);
-int ev_run_list(event_list *);
int ev_run_list_limited(event_list *, uint);
+#define ev_run_list(l) ev_run_list_limited((l), ~0)
#define LEGACY_EVENT_LIST(l) (((l) == &global_event_list) || ((l) == &global_work_list))
-_Bool birdloop_inside(struct birdloop *loop);
-
static inline int
ev_active(event *e)
{
- if (e->list == NULL)
- return 0;
+ return atomic_load_explicit(&e->next, memory_order_relaxed) != NULL;
+}
- ASSERT_DIE(birdloop_inside(e->list->loop));
- return enlisted(&e->n);
+static inline event_list *
+ev_get_list(event *e)
+{
+ /* We are looking for the sentinel node at the list end;
+ * after this loop, we have s->next == NULL */
+ event *s = e;
+ for (event *sn; sn = atomic_load_explicit(&s->next, memory_order_acquire); s = sn)
+ ;
+
+ /* No sentinel, no list. */
+ if (s == e)
+ return NULL;
+ else
+ return SKIP_BACK(event_list, _sentinel, s);
}
static inline event*
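The union-with-sentinel layout above is what makes ev_get_list() work: every queued event's next chain ends at the sentinel embedded in its list, and the STATIC_ASSERT guarantees the sentinel's next field lies past the live list fields, so storing NULL into it is harmless. A simplified standalone sketch of the trick (GNU C, as in the original; field names shortened, the resource header omitted):

#include <stddef.h>
#include <stdatomic.h>

struct ev {
  void (*hook)(void *);
  void *data;
  struct ev * _Atomic next;   /* must land past the live list fields */
};

union ev_list {
  struct {
    struct ev * _Atomic receiver;
    struct ev * _Atomic executor;
    char _end[0];             /* marks the end of the live fields */
  };
  struct ev sentinel;         /* overlays the struct from offset 0 */
};

/* Mirrors the STATIC_ASSERT in this commit: writing NULL into
 * sentinel.next must not clobber receiver/executor. */
_Static_assert(offsetof(union ev_list, sentinel.next) >= offsetof(union ev_list, _end),
               "sentinel.next overlaps live fields");

/* Walk the next chain; the node with next == NULL is the sentinel,
 * whose address is the address of the owning list (SKIP_BACK with
 * offset zero in the original). */
static union ev_list *list_of(struct ev *e)
{
  struct ev *s = e, *sn;
  while ((sn = atomic_load_explicit(&s->next, memory_order_acquire)))
    s = sn;
  return (s == e) ? NULL : (union ev_list *)s;
}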
diff --git a/lib/io-loop.h b/lib/io-loop.h
index 25f1b2a3..dec7d040 100644
--- a/lib/io-loop.h
+++ b/lib/io-loop.h
@@ -14,12 +14,12 @@
#include "lib/event.h"
#include "lib/socket.h"
+extern struct birdloop main_birdloop;
+
void sk_start(sock *s);
void sk_stop(sock *s);
void sk_reloop(sock *s, struct birdloop *loop);
-extern struct birdloop main_birdloop;
-
/* Start a new birdloop owned by given pool and domain */
struct birdloop *birdloop_new(pool *p, uint order, const char *name);
diff --git a/lib/locking.h b/lib/locking.h
index ab5c06af..8ea1c968 100644
--- a/lib/locking.h
+++ b/lib/locking.h
@@ -16,7 +16,6 @@ struct lock_order {
struct domain_generic *the_bird;
struct domain_generic *proto;
struct domain_generic *rtable;
- struct domain_generic *event;
};
extern _Thread_local struct lock_order locking_stack;