summaryrefslogtreecommitdiff
path: root/lib/event.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/event.c')
-rw-r--r--lib/event.c373
1 files changed, 208 insertions, 165 deletions
diff --git a/lib/event.c b/lib/event.c
index 10f83c28..68888a4c 100644
--- a/lib/event.c
+++ b/lib/event.c
@@ -23,38 +23,159 @@
#include "nest/bird.h"
#include "lib/event.h"
-#include "lib/locking.h"
#include "lib/io-loop.h"
-extern _Thread_local struct coroutine *this_coro;
-
event_list global_event_list;
event_list global_work_list;
+//#ifdef DEBUGGING
+#if 0
+#define EDL_MAX 16384
+enum edl_caller {
+ EDL_REMOVE_FROM = 1,
+ EDL_POSTPONE = 2,
+ EDL_RUN = 3,
+ EDL_SEND = 4,
+ EDL_RUN_LIST = 5,
+} caller;
+static struct event_debug_log {
+ event_list *target_list;
+ event *event;
+ event *receiver;
+ uint pos;
+ uint prev_edl_pos;
+ uint thread;
+ enum edl_caller caller;
+} edl[EDL_MAX];
+static _Atomic uint edl_cnt;
+_Thread_local static uint edl_thread;
+_Thread_local static uint prev_edl_pos = ~0;
+static inline void edlog(event_list *list, event *e, event *receiver, uint pos, enum edl_caller caller)
+{
+ uint edl_pos = atomic_fetch_add_explicit(&edl_cnt, 1, memory_order_acq_rel);
+ if (!edl_thread)
+ edl_thread = edl_pos;
+
+ edl[edl_pos % EDL_MAX] = (struct event_debug_log) {
+ .target_list = list,
+ .event = e,
+ .receiver = receiver,
+ .pos = pos,
+ .prev_edl_pos = prev_edl_pos,
+ .thread = edl_thread,
+ .caller = caller,
+ };
+
+ prev_edl_pos = edl_pos;
+}
+#else
+#define edlog(...)
+#endif
+
+
+void
+ev_init_list(event_list *el, struct birdloop *loop, const char *name)
+{
+ el->name = name;
+ el->loop = loop;
+
+ atomic_store_explicit(&el->receiver, NULL, memory_order_release);
+ atomic_store_explicit(&el->_executor, NULL, memory_order_release);
+}
+
+/*
+ * The event list should work as a message passing point. Sending a message
+ * must be a fairly fast process with no locks and low waiting times. OTOH,
+ * processing messages always involves running the assigned code and the
+ * receiver is always a single one thread with no concurrency at all. There is
+ * also a postponing requirement to synchronously remove an event from a queue,
+ * yet we allow this only when the caller has its receiver event loop locked.
+ * It still means that the event may get postponed from other event in the same
+ * list, therefore we have to be careful.
+ */
+
+static inline int
+ev_remove_from(event *e, event * _Atomic * head)
+{
+ /* The head pointer stores where cur is pointed to from */
+ event * _Atomic *prev = head;
+
+ /* The current event in queue to check */
+ event *cur = atomic_load_explicit(prev, memory_order_acquire);
+
+ /* This part of queue is empty! */
+ if (!cur)
+ return 0;
+
+ edlog(NULL, e, cur, 1, EDL_REMOVE_FROM);
+ while (cur)
+ {
+ /* Pre-loaded next pointer */
+ event *next = atomic_load_explicit(&cur->next, memory_order_acquire);
+
+ if (e == cur)
+ {
+ edlog(NULL, e, next, 3, EDL_REMOVE_FROM);
+
+ /* Check whether we have collided with somebody else
+ * adding an item to the queue. */
+ if (!atomic_compare_exchange_strong_explicit(
+ prev, &cur, next,
+ memory_order_acq_rel, memory_order_acquire))
+ {
+ /* This may happen only on list head */
+ ASSERT_DIE(prev == head);
+
+ /* Restart. The collision should never happen again. */
+ return ev_remove_from(e, head);
+ }
+
+ /* Successfully removed from the list; inactivate this event. */
+ atomic_store_explicit(&cur->next, NULL, memory_order_release);
+ return 1;
+ }
+
+ edlog(NULL, e, next, 2, EDL_REMOVE_FROM);
+
+ /* Go to the next event. */
+ prev = &cur->next;
+ cur = next;
+ }
+
+ edlog(NULL, e, cur, 4, EDL_REMOVE_FROM);
+
+ return 0;
+}
+
inline void
ev_postpone(event *e)
{
- event_list *el = e->list;
- if (!el)
+ /* Find the list to remove the event from */
+ event_list *sl = ev_get_list(e);
+ edlog(sl, e, NULL, 1, EDL_POSTPONE);
+ if (!sl)
return;
- ASSERT_DIE(birdloop_inside(el->loop));
+ /* Postponing allowed only from the target loop */
+ ASSERT_DIE(birdloop_inside(sl->loop));
+
+ /* Remove from one of these lists. */
+ ASSERT(ev_remove_from(e, &sl->_executor) || ev_remove_from(e, &sl->receiver));
- LOCK_DOMAIN(event, el->lock);
- if (ev_active(e))
- rem_node(&e->n);
- UNLOCK_DOMAIN(event, el->lock);
+ /* Mark as inactive */
+ ASSERT_DIE(sl == atomic_exchange_explicit(&e->list, NULL, memory_order_acq_rel));
+ edlog(sl, e, NULL, 2, EDL_POSTPONE);
}
static void
-ev_dump(resource *r)
+ev_dump(resource *r, unsigned indent UNUSED)
{
event *e = (event *) r;
debug("(code %p, data %p, %s)\n",
e->hook,
e->data,
- e->n.next ? "scheduled" : "inactive");
+ atomic_load_explicit(&e->next, memory_order_relaxed) ? "scheduled" : "inactive");
}
static struct resclass ev_class = {
@@ -93,8 +214,10 @@ ev_new(pool *p)
inline void
ev_run(event *e)
{
+ edlog(NULL, e, NULL, 1, EDL_RUN);
ev_postpone(e);
e->hook(e->data);
+ edlog(NULL, e, NULL, 2, EDL_RUN);
}
/**
@@ -108,48 +231,37 @@ ev_run(event *e)
inline void
ev_send(event_list *l, event *e)
{
- DBG("ev_send(%p, %p)\n", l, e);
- ASSERT_DIE(e->hook);
- ASSERT_DIE(!e->list || (e->list == l) || (e->list->loop == l->loop));
-
- e->list = l;
-
- struct event_cork *ec = e->cork;
-
- uint ping = 0;
-
- if (ec)
- {
- LOCK_DOMAIN(cork, ec->lock);
- LOCK_DOMAIN(event, l->lock);
-
- if (!enlisted(&e->n))
- if (ec->count)
- add_tail(&ec->events, &e->n);
- else
- {
- add_tail(&l->events, &e->n);
- ping = 1;
- }
-
- UNLOCK_DOMAIN(event, l->lock);
- UNLOCK_DOMAIN(cork, ec->lock);
- }
- else
- {
- LOCK_DOMAIN(event, l->lock);
-
- if (!enlisted(&e->n))
+ edlog(l, e, NULL, 1, EDL_SEND);
+ /* Set the target list */
+ event_list *ol = NULL;
+ if (!atomic_compare_exchange_strong_explicit(
+ &e->list, &ol, l,
+ memory_order_acq_rel, memory_order_acquire))
+ if (ol == l)
+ return;
+ else
+ bug("Queuing an already queued event to another queue is not supported.");
+
+ /* Here should be no concurrent senders */
+ event *next = atomic_load_explicit(&l->receiver, memory_order_acquire);
+ edlog(l, e, next, 2, EDL_SEND);
+ event *old_next = NULL;
+ do
+ if (!atomic_compare_exchange_strong_explicit(
+ &e->next, &old_next, next,
+ memory_order_acq_rel, memory_order_acquire))
+ bug("Event %p in inconsistent state");
+ else
{
- add_tail(&l->events, &e->n);
- ping = 1;
+ old_next = next;
+ edlog(l, old_next, next, 3, EDL_SEND);
}
+ while (!atomic_compare_exchange_strong_explicit(
+ &l->receiver, &next, e,
+ memory_order_acq_rel, memory_order_acquire));
- UNLOCK_DOMAIN(event, l->lock);
- }
-
- if (ping)
- birdloop_ping(l->loop);
+ edlog(l, e, next, 4, EDL_SEND);
+ if (l->loop) birdloop_ping(l->loop);
}
void io_log_event(void *hook, void *data);
@@ -161,135 +273,66 @@ void io_log_event(void *hook, void *data);
* This function calls ev_run() for all events enqueued in the list @l.
*/
int
-ev_run_list(event_list *l)
-{
- const _Bool legacy = LEGACY_EVENT_LIST(l);
-
- if (legacy)
- ASSERT_THE_BIRD_LOCKED;
-
- node *n;
-
- list tmp_list;
- init_list(&tmp_list);
-
- /* Move the event list contents to a local list to avoid executing repeatedly added events */
- LOCK_DOMAIN(event, l->lock);
- add_tail_list(&tmp_list, &l->events);
- init_list(&l->events);
- UNLOCK_DOMAIN(event, l->lock);
-
- WALK_LIST_FIRST(n, tmp_list)
- {
- event *e = SKIP_BACK(event, n, n);
- ASSERT_DIE(n->next->prev == n);
-
- if (legacy)
- {
- /* The legacy way of event execution */
- io_log_event(e->hook, e->data);
- ev_postpone(e);
- e->hook(e->data);
- }
- else
- {
- // io_log_event(e->hook, e->data); /* TODO: add support for event logging in other io loops */
- ASSERT_DIE(e->list == l);
- LOCK_DOMAIN(event, l->lock);
- rem_node(&e->n);
- UNLOCK_DOMAIN(event, l->lock);
- e->hook(e->data);
- }
- }
-
- LOCK_DOMAIN(event, l->lock);
- int repeat = ! EMPTY_LIST(l->events);
- UNLOCK_DOMAIN(event, l->lock);
- return repeat;
-}
-
-int
ev_run_list_limited(event_list *l, uint limit)
{
- ASSERT_DIE(LEGACY_EVENT_LIST(l));
- ASSERT_THE_BIRD_LOCKED;
+ event * _Atomic *ep = &l->_executor;
+ edlog(l, NULL, NULL, 1, EDL_RUN_LIST);
- node *n;
- list tmp_list;
-
- LOCK_DOMAIN(event, l->lock);
- init_list(&tmp_list);
- add_tail_list(&tmp_list, &l->events);
- init_list(&l->events);
- UNLOCK_DOMAIN(event, l->lock);
-
- WALK_LIST_FIRST(n, tmp_list)
- {
- event *e = SKIP_BACK(event, n, n);
-
- if (!limit)
- break;
-
- io_log_event(e->hook, e->data);
-
- ev_run(e);
- limit--;
- }
-
- LOCK_DOMAIN(event, l->lock);
- if (!EMPTY_LIST(tmp_list))
+ /* No pending events, refill the queue. */
+ if (!atomic_load_explicit(ep, memory_order_acquire))
{
- /* Attach new items after the unprocessed old items */
- add_tail_list(&tmp_list, &l->events);
- init_list(&l->events);
- add_tail_list(&l->events, &tmp_list);
- }
+ /* Move the current event list aside and create a new one. */
+ event *received = atomic_exchange_explicit(&l->receiver, NULL, memory_order_acq_rel);
+ edlog(l, NULL, received, 2, EDL_RUN_LIST);
- int repeat = ! EMPTY_LIST(l->events);
- UNLOCK_DOMAIN(event, l->lock);
+ /* No event to run. */
+ if (!received)
+ return 0;
- return repeat;
-}
+ /* Setup the executor queue */
+ event *head = NULL;
-void ev_cork(struct event_cork *ec)
-{
- LOCK_DOMAIN(cork, ec->lock);
- ec->count++;
- UNLOCK_DOMAIN(cork, ec->lock);
-}
-
-void ev_uncork(struct event_cork *ec)
-{
- LOCK_DOMAIN(cork, ec->lock);
+ /* Flip the order of the events by relinking them one by one (push-pop) */
+ while (received)
+ {
+ event *cur = received;
+ received = atomic_exchange_explicit(&cur->next, head, memory_order_acq_rel);
+ edlog(l, head, received, 3, EDL_RUN_LIST);
+ head = cur;
+ }
- if (--ec->count)
- {
- UNLOCK_DOMAIN(cork, ec->lock);
- return;
+ /* Store the executor queue to its designated place */
+ ASSERT_DIE(atomic_exchange_explicit(ep, head, memory_order_acq_rel) == NULL);
+ edlog(l, NULL, head, 4, EDL_RUN_LIST);
}
- node *n;
- WALK_LIST_FIRST(n, ec->events)
+ /* Run the events in order. */
+ event *e;
+ while (e = atomic_load_explicit(ep, memory_order_acquire))
{
- event *e = SKIP_BACK(event, n, n);
- event_list *el = e->list;
+ edlog(l, e, NULL, 5, EDL_RUN_LIST);
+ /* Check limit */
+ if (!--limit)
+ return 1;
- rem_node(&e->n);
+ /* This is ugly hack, we want to log just events executed from the main I/O loop */
+ if ((l == &global_event_list) || (l == &global_work_list))
+ io_log_event(e->hook, e->data);
- LOCK_DOMAIN(event, el->lock);
- add_tail(&el->events, &e->n);
- UNLOCK_DOMAIN(event, el->lock);
+ edlog(l, e, NULL, 6, EDL_RUN_LIST);
+ /* Inactivate the event */
+ event *next = atomic_load_explicit(&e->next, memory_order_relaxed);
+ ASSERT_DIE(e == atomic_exchange_explicit(ep, next, memory_order_acq_rel));
+ ASSERT_DIE(next == atomic_exchange_explicit(&e->next, NULL, memory_order_acq_rel));
+ ASSERT_DIE(l == atomic_exchange_explicit(&e->list, NULL, memory_order_acq_rel));
+ edlog(l, e, next, 7, EDL_RUN_LIST);
- birdloop_ping(el->loop);
- }
+ /* Run the event */
+ e->hook(e->data);
+ tmp_flush();
- struct birdsock *sk;
- WALK_LIST_FIRST2(sk, cork_node, ec->sockets)
- {
-// log(L_TRACE "Socket %p uncorked", sk);
- rem_node(&sk->cork_node);
- sk_ping(sk);
+ edlog(l, e, next, 8, EDL_RUN_LIST);
}
- UNLOCK_DOMAIN(cork, ec->lock);
+ return !!atomic_load_explicit(&l->receiver, memory_order_acquire);
}