diff options
Diffstat (limited to 'lib/event.c')
-rw-r--r-- | lib/event.c | 295 |
1 files changed, 216 insertions, 79 deletions
diff --git a/lib/event.c b/lib/event.c index 33dc00b0..68ee4c06 100644 --- a/lib/event.c +++ b/lib/event.c @@ -19,20 +19,152 @@ * events in them and explicitly ask to run them. */ +#undef LOCAL_DEBUG + #include "nest/bird.h" #include "lib/event.h" +#include "lib/io-loop.h" event_list global_event_list; event_list global_work_list; -inline void -ev_postpone(event *e) +//#ifdef DEBUGGING +#if 0 +#define EDL_MAX 16384 +enum edl_caller { + EDL_REMOVE_FROM = 1, + EDL_POSTPONE = 2, + EDL_RUN = 3, + EDL_SEND = 4, + EDL_RUN_LIST = 5, +} caller; +static struct event_debug_log { + event_list *target_list; + event *event; + event *receiver; + uint pos; + uint prev_edl_pos; + uint thread; + enum edl_caller caller; +} edl[EDL_MAX]; +static _Atomic uint edl_cnt; +_Thread_local static uint edl_thread; +_Thread_local static uint prev_edl_pos = ~0; +static inline void edlog(event_list *list, event *e, event *receiver, uint pos, enum edl_caller caller) +{ + uint edl_pos = atomic_fetch_add_explicit(&edl_cnt, 1, memory_order_acq_rel); + if (!edl_thread) + edl_thread = edl_pos; + + edl[edl_pos % EDL_MAX] = (struct event_debug_log) { + .target_list = list, + .event = e, + .receiver = receiver, + .pos = pos, + .prev_edl_pos = prev_edl_pos, + .thread = edl_thread, + .caller = caller, + }; + + prev_edl_pos = edl_pos; +} +#else +#define edlog(...) +#endif + + +void +ev_init_list(event_list *el, struct birdloop *loop, const char *name) { - if (ev_active(e)) + el->name = name; + el->loop = loop; + + atomic_store_explicit(&el->receiver, NULL, memory_order_release); + atomic_store_explicit(&el->_executor, NULL, memory_order_release); +} + +/* + * The event list should work as a message passing point. Sending a message + * must be a fairly fast process with no locks and low waiting times. OTOH, + * processing messages always involves running the assigned code and the + * receiver is always a single one thread with no concurrency at all. There is + * also a postponing requirement to synchronously remove an event from a queue, + * yet we allow this only when the caller has its receiver event loop locked. + * It still means that the event may get postponed from other event in the same + * list, therefore we have to be careful. + */ + +static inline int +ev_remove_from(event *e, event * _Atomic * head) +{ + /* The head pointer stores where cur is pointed to from */ + event * _Atomic *prev = head; + + /* The current event in queue to check */ + event *cur = atomic_load_explicit(prev, memory_order_acquire); + + /* This part of queue is empty! */ + if (!cur) + return 0; + + edlog(NULL, e, cur, 1, EDL_REMOVE_FROM); + while (cur) + { + /* Pre-loaded next pointer */ + event *next = atomic_load_explicit(&cur->next, memory_order_acquire); + + if (e == cur) { - rem_node(&e->n); - e->n.next = NULL; + edlog(NULL, e, next, 3, EDL_REMOVE_FROM); + + /* Check whether we have collided with somebody else + * adding an item to the queue. */ + if (!atomic_compare_exchange_strong_explicit( + prev, &cur, next, + memory_order_acq_rel, memory_order_acquire)) + { + /* This may happen only on list head */ + ASSERT_DIE(prev == head); + + /* Restart. The collision should never happen again. */ + return ev_remove_from(e, head); + } + + /* Successfully removed from the list; inactivate this event. */ + atomic_store_explicit(&cur->next, NULL, memory_order_release); + return 1; } + + edlog(NULL, e, next, 2, EDL_REMOVE_FROM); + + /* Go to the next event. */ + prev = &cur->next; + cur = next; + } + + edlog(NULL, e, cur, 4, EDL_REMOVE_FROM); + + return 0; +} + +inline void +ev_postpone(event *e) +{ + /* Find the list to remove the event from */ + event_list *sl = ev_get_list(e); + edlog(sl, e, NULL, 1, EDL_POSTPONE); + if (!sl) + return; + + /* Postponing allowed only from the target loop */ + ASSERT_DIE(birdloop_inside(sl->loop)); + + /* Remove from one of these lists. */ + ASSERT(ev_remove_from(e, &sl->_executor) || ev_remove_from(e, &sl->receiver)); + + /* Mark as inactive */ + ASSERT_DIE(sl == atomic_exchange_explicit(&e->list, NULL, memory_order_acq_rel)); + edlog(sl, e, NULL, 2, EDL_POSTPONE); } static void @@ -43,7 +175,7 @@ ev_dump(resource *r) debug("(code %p, data %p, %s)\n", e->hook, e->data, - e->n.next ? "scheduled" : "inactive"); + atomic_load_explicit(&e->next, memory_order_relaxed) ? "scheduled" : "inactive"); } static struct resclass ev_class = { @@ -82,8 +214,10 @@ ev_new(pool *p) inline void ev_run(event *e) { + edlog(NULL, e, NULL, 1, EDL_RUN); ev_postpone(e); e->hook(e->data); + edlog(NULL, e, NULL, 2, EDL_RUN); } /** @@ -95,40 +229,39 @@ ev_run(event *e) * list @l which can be run by calling ev_run_list(). */ inline void -ev_enqueue(event_list *l, event *e) +ev_send(event_list *l, event *e) { - ev_postpone(e); - add_tail(l, &e->n); -} + edlog(l, e, NULL, 1, EDL_SEND); + /* Set the target list */ + event_list *ol = NULL; + if (!atomic_compare_exchange_strong_explicit( + &e->list, &ol, l, + memory_order_acq_rel, memory_order_acquire)) + if (ol == l) + return; + else + bug("Queuing an already queued event to another queue is not supported."); -/** - * ev_schedule - schedule an event - * @e: an event - * - * This function schedules an event by enqueueing it to a system-wide - * event list which is run by the platform dependent code whenever - * appropriate. - */ -void -ev_schedule(event *e) -{ - ev_enqueue(&global_event_list, e); -} + /* Here should be no concurrent senders */ + event *next = atomic_load_explicit(&l->receiver, memory_order_acquire); + edlog(l, e, next, 2, EDL_SEND); + event *old_next = NULL; + do + if (!atomic_compare_exchange_strong_explicit( + &e->next, &old_next, next, + memory_order_acq_rel, memory_order_acquire)) + bug("Event %p in inconsistent state"); + else + { + old_next = next; + edlog(l, old_next, next, 3, EDL_SEND); + } + while (!atomic_compare_exchange_strong_explicit( + &l->receiver, &next, e, + memory_order_acq_rel, memory_order_acquire)); -/** - * ev_schedule_work - schedule a work-event. - * @e: an event - * - * This function schedules an event by enqueueing it to a system-wide work-event - * list which is run by the platform dependent code whenever appropriate. This - * is designated for work-events instead of regular events. They are executed - * less often in order to not clog I/O loop. - */ -void -ev_schedule_work(event *e) -{ - if (!ev_active(e)) - add_tail(&global_work_list, &e->n); + edlog(l, e, next, 4, EDL_SEND); + birdloop_ping(l->loop); } void io_log_event(void *hook, void *data); @@ -140,62 +273,66 @@ void io_log_event(void *hook, void *data); * This function calls ev_run() for all events enqueued in the list @l. */ int -ev_run_list(event_list *l) +ev_run_list_limited(event_list *l, uint limit) { - node *n; - list tmp_list; - - init_list(&tmp_list); - add_tail_list(&tmp_list, l); - init_list(l); - WALK_LIST_FIRST(n, tmp_list) - { - event *e = SKIP_BACK(event, n, n); + event * _Atomic *ep = &l->_executor; + edlog(l, NULL, NULL, 1, EDL_RUN_LIST); - /* This is ugly hack, we want to log just events executed from the main I/O loop */ - if ((l == &global_event_list) || (l == &global_work_list)) - io_log_event(e->hook, e->data); + /* No pending events, refill the queue. */ + if (!atomic_load_explicit(ep, memory_order_acquire)) + { + /* Move the current event list aside and create a new one. */ + event *received = atomic_exchange_explicit(&l->receiver, NULL, memory_order_acq_rel); + edlog(l, NULL, received, 2, EDL_RUN_LIST); - ev_run(e); - tmp_flush(); - } + /* No event to run. */ + if (!received) + return 0; - return !EMPTY_LIST(*l); -} + /* Setup the executor queue */ + event *head = NULL; -int -ev_run_list_limited(event_list *l, uint limit) -{ - node *n; - list tmp_list; + /* Flip the order of the events by relinking them one by one (push-pop) */ + while (received) + { + event *cur = received; + received = atomic_exchange_explicit(&cur->next, head, memory_order_acq_rel); + edlog(l, head, received, 3, EDL_RUN_LIST); + head = cur; + } - init_list(&tmp_list); - add_tail_list(&tmp_list, l); - init_list(l); + /* Store the executor queue to its designated place */ + ASSERT_DIE(atomic_exchange_explicit(ep, head, memory_order_acq_rel) == NULL); + edlog(l, NULL, head, 4, EDL_RUN_LIST); + } - WALK_LIST_FIRST(n, tmp_list) + /* Run the events in order. */ + event *e; + while (e = atomic_load_explicit(ep, memory_order_acquire)) { - event *e = SKIP_BACK(event, n, n); - - if (!limit) - break; + edlog(l, e, NULL, 5, EDL_RUN_LIST); + /* Check limit */ + if (!--limit) + return 1; /* This is ugly hack, we want to log just events executed from the main I/O loop */ if ((l == &global_event_list) || (l == &global_work_list)) io_log_event(e->hook, e->data); - ev_run(e); + edlog(l, e, NULL, 6, EDL_RUN_LIST); + /* Inactivate the event */ + event *next = atomic_load_explicit(&e->next, memory_order_relaxed); + ASSERT_DIE(e == atomic_exchange_explicit(ep, next, memory_order_acq_rel)); + ASSERT_DIE(next == atomic_exchange_explicit(&e->next, NULL, memory_order_acq_rel)); + ASSERT_DIE(l == atomic_exchange_explicit(&e->list, NULL, memory_order_acq_rel)); + edlog(l, e, next, 7, EDL_RUN_LIST); + + /* Run the event */ + e->hook(e->data); tmp_flush(); - limit--; - } - if (!EMPTY_LIST(tmp_list)) - { - /* Attach new items after the unprocessed old items */ - add_tail_list(&tmp_list, l); - init_list(l); - add_tail_list(l, &tmp_list); - } + edlog(l, e, next, 8, EDL_RUN_LIST); + } - return !EMPTY_LIST(*l); + return !!atomic_load_explicit(&l->receiver, memory_order_acquire); } |