diff options
author | Maria Matejka <mq@ucw.cz> | 2021-09-27 13:04:16 +0200 |
---|---|---|
committer | Maria Matejka <mq@ucw.cz> | 2021-11-22 19:05:43 +0100 |
commit | c70b3198dc349127273d202ab8c36afeebb6d9d0 (patch) | |
tree | 27ea26fc1200195849897f47ec2e7ce7bae94d15 | |
parent | f18968f52f461946b64aaea6c4a0e88c5235fb07 (diff) |
Route export is now asynchronous.
To allow for multithreaded execution, we need to break the import-export
chain and buffer the exports before actually processing them.
-rw-r--r-- | conf/conf.h | 2 | ||||
-rw-r--r-- | nest/config.Y | 2 | ||||
-rw-r--r-- | nest/proto.c | 72 | ||||
-rw-r--r-- | nest/route.h | 33 | ||||
-rw-r--r-- | nest/rt-table.c | 697 |
5 files changed, 666 insertions, 140 deletions
diff --git a/conf/conf.h b/conf/conf.h index 69ef8a10..4f6aa6eb 100644 --- a/conf/conf.h +++ b/conf/conf.h @@ -45,7 +45,7 @@ struct config { int cli_debug; /* Tracing of CLI connections and commands */ int latency_debug; /* I/O loop tracks duration of each event */ - int pipe_debug; /* Track route propagation through pipes */ + int table_debug; /* Track route propagation through tables */ u32 latency_limit; /* Events with longer duration are logged (us) */ u32 watchdog_warning; /* I/O loop watchdog limit for warning (us) */ u32 watchdog_timeout; /* Watchdog timeout (in seconds, 0 = disabled) */ diff --git a/nest/config.Y b/nest/config.Y index a56b25be..92f1aad2 100644 --- a/nest/config.Y +++ b/nest/config.Y @@ -348,7 +348,7 @@ debug_default: DEBUG PROTOCOLS debug_mask { new_config->proto_default_debug = $3; } | DEBUG CHANNELS debug_mask { new_config->channel_default_debug = $3; } | DEBUG COMMANDS expr { new_config->cli_debug = $3; } - | DEBUG PIPE bool { new_config->pipe_debug = $3; } + | DEBUG TABLES bool { new_config->table_debug = $3; } ; /* MRTDUMP PROTOCOLS is in systep/unix/config.Y */ diff --git a/nest/proto.c b/nest/proto.c index cf448fd9..fae0647a 100644 --- a/nest/proto.c +++ b/nest/proto.c @@ -649,7 +649,7 @@ channel_aux_import_stopped(struct rt_import_request *req) static void channel_aux_export_stopped(struct rt_export_request *req) { - struct channel_aux_table *cat = SKIP_BACK(struct channel_aux_table, push, req); + struct channel_aux_table *cat = SKIP_BACK(struct channel_aux_table, get, req); req->hook = NULL; if (cat->refeed_pending && !cat->stop) @@ -669,7 +669,6 @@ channel_aux_stop(struct channel_aux_table *cat) rt_stop_import(&cat->push, channel_aux_import_stopped); rt_stop_export(&cat->get, channel_aux_export_stopped); - rt_lock_table(cat->tab); cat->tab->deleted = channel_aux_stopped; cat->tab->del_data = cat; rt_unlock_table(cat->tab); @@ -714,17 +713,51 @@ channel_get_log_state_change(struct rt_export_request *req, u8 state) void rte_update_direct(struct channel *c, const net_addr *n, rte *new, struct rte_src *src); +static int +channel_aux_export_one_any(struct rt_export_request *req, struct rt_pending_export *rpe, rte **new, rte **old) +{ + struct rte_src *src = rpe->new ? rpe->new->rte.src : rpe->old->rte.src; + *old = RTES_OR_NULL(rpe->old); + struct rte_storage *new_stored; + + while (rpe) + { + new_stored = rpe->new; + rpe_mark_seen(req->hook, rpe); + rpe = rpe_next(rpe, src); + } + + *new = RTES_CLONE(new_stored, *new); + + return (*new || *old) && (&new_stored->rte != *old); +} + +static int +channel_aux_export_one_best(struct rt_export_request *req, struct rt_pending_export *rpe, rte **new, rte **old) +{ + *old = RTES_OR_NULL(rpe->old_best); + struct rte_storage *new_stored; + + while (rpe) + { + new_stored = rpe->new_best; + rpe_mark_seen(req->hook, rpe); + rpe = rpe_next(rpe, NULL); + } + + *new = RTES_CLONE(new_stored, *new); + + return (*new || *old) && (&new_stored->rte != *old); +} + static void channel_in_export_one_any(struct rt_export_request *req, const net_addr *net, struct rt_pending_export *rpe) { struct channel_aux_table *cat = SKIP_BACK(struct channel_aux_table, get, req); - if (!rpe->new && !rpe->old) - return; - - rte n0; - struct rte_src *src = rpe->new ? rpe->new->rte.src : rpe->old->rte.src; - rte_update_direct(cat->c, net, RTES_CLONE(rpe->new, &n0), src); + rte n0, *new = &n0, *old; + if (channel_aux_export_one_any(req, rpe, &new, &old)) + rte_update_direct(cat->c, net, new, old ? old->src : new->src); } static void @@ -732,12 +765,9 @@ channel_in_export_one_best(struct rt_export_request *req, const net_addr *net, s { struct channel_aux_table *cat = SKIP_BACK(struct channel_aux_table, get, req); - if (!rpe->new && !rpe->old) - return; - - rte n0; - struct rte_src *src = rpe->old_best ? rpe->old_best->rte.src : rpe->new_best->rte.src; - rte_update_direct(cat->c, net, RTES_CLONE(rpe->new_best, &n0), src); + rte n0, *new = &n0, *old; + if (channel_aux_export_one_best(req, rpe, &new, &old)) + rte_update_direct(cat->c, net, new, old ? old->src : new->src); } static void @@ -768,16 +798,18 @@ static void channel_out_export_one_any(struct rt_export_request *req, const net_addr *net, struct rt_pending_export *rpe) { struct channel_aux_table *cat = SKIP_BACK(struct channel_aux_table, get, req); - rte n0; - do_rt_notify_direct(cat->c, net, RTES_CLONE(rpe->new, &n0), RTES_OR_NULL(rpe->old)); + rte n0, *new = &n0, *old; + if (channel_aux_export_one_any(req, rpe, &new, &old)) + do_rt_notify_direct(cat->c, net, new, old); } static void channel_out_export_one_best(struct rt_export_request *req, const net_addr *net, struct rt_pending_export *rpe) { struct channel_aux_table *cat = SKIP_BACK(struct channel_aux_table, get, req); - rte n0; - do_rt_notify_direct(cat->c, net, RTES_CLONE(rpe->new_best, &n0), RTES_OR_NULL(rpe->old_best)); + rte n0, *new = &n0, *old; + if (channel_aux_export_one_best(req, rpe, &new, &old)) + do_rt_notify_direct(cat->c, net, new, old); } static void @@ -831,6 +863,7 @@ channel_setup_in_table(struct channel *c, int best) c->in_table->c = c; c->in_table->tab = rt_setup(c->proto->pool, &cat->tab_cf); self_link(&c->in_table->tab->n); + rt_lock_table(c->in_table->tab); rt_request_import(c->in_table->tab, &c->in_table->push); rt_request_export(c->in_table->tab, &c->in_table->get); @@ -872,6 +905,7 @@ channel_setup_out_table(struct channel *c) c->out_table->c = c; c->out_table->tab = rt_setup(c->proto->pool, &cat->tab_cf); self_link(&c->out_table->tab->n); + rt_lock_table(c->out_table->tab); rt_request_import(c->out_table->tab, &c->out_table->push); rt_request_export(c->out_table->tab, &c->out_table->get); @@ -1051,6 +1085,8 @@ channel_request_table_feeding(struct channel *c) void channel_request_feeding(struct channel *c) { + CD(c, "Refeed requested"); + ASSERT(c->out_req.hook); if (c->out_table) diff --git a/nest/route.h b/nest/route.h index cb66be2a..d6474e09 100644 --- a/nest/route.h +++ b/nest/route.h @@ -15,6 +15,8 @@ #include "lib/resource.h" #include "lib/net.h" +#include <stdatomic.h> + struct ea_list; struct protocol; struct proto; @@ -152,6 +154,7 @@ struct rtable_config { byte sorted; /* Routes of network are sorted according to rte_better() */ btime min_settle_time; /* Minimum settle time for notifications */ btime max_settle_time; /* Maximum settle time for notifications */ + btime export_settle_time; /* Delay before exports are announced */ }; typedef struct rtable { @@ -181,12 +184,20 @@ typedef struct rtable { byte prune_state; /* Table prune state, 1 -> scheduled, 2-> running */ byte hcu_scheduled; /* Hostcache update is scheduled */ byte nhu_state; /* Next Hop Update state */ + byte export_used; /* Export journal setup scheduled */ struct fib_iterator prune_fit; /* Rtable prune FIB iterator */ struct fib_iterator nhu_fit; /* Next Hop Update FIB iterator */ struct tbf rl_pipe; /* Rate limiting token buffer for pipe collisions */ list subscribers; /* Subscribers for notifications */ struct timer *settle_timer; /* Settle time for notifications */ + + list pending_exports; /* List of packed struct rt_pending_export */ + btime base_export_time; /* When first pending export was announced */ + struct timer *export_timer; + + struct rt_pending_export *first_export; /* First export to announce */ + u64 next_export_seq; /* The next export will have this ID */ } rtable; struct rt_subscription { @@ -203,6 +214,7 @@ struct rt_subscription { typedef struct network { struct rte_storage *routes; /* Available routes for this network */ + struct rt_pending_export *last, *first; /* Routes with unfinished exports */ struct fib_node n; /* FIB flags reserved for kernel syncer */ } net; @@ -294,6 +306,7 @@ struct rt_import_hook { u32 withdraws_accepted; /* Number of route withdraws accepted and processed */ } stats; + u64 flush_seq; /* Table export seq when the channel announced flushing */ btime last_state_change; /* Time of last state transition */ u8 import_state; /* IS_* */ @@ -306,7 +319,9 @@ struct rt_import_hook { }; struct rt_pending_export { + struct rt_pending_export * _Atomic next; /* Next export for the same destination */ struct rte_storage *new, *new_best, *old, *old_best; + u64 seq; /* Sequential ID (table-local) of the pending export */ }; struct rt_export_request { @@ -333,7 +348,6 @@ struct rt_export_hook { rtable *table; /* The connected table */ pool *pool; - linpool *lp; struct rt_export_request *req; /* The requestor */ @@ -345,10 +359,15 @@ struct rt_export_hook { struct fib_iterator feed_fit; /* Routing table iterator used during feeding */ + struct bmap seq_map; /* Keep track which exports were already procesed */ + + struct rt_pending_export * _Atomic last_export;/* Last export processed */ + struct rt_pending_export *rpe_next; /* Next pending export to process */ + btime last_state_change; /* Time of last state transition */ u8 refeed_pending; /* Refeeding and another refeed is scheduled */ - u8 export_state; /* Route export state (TES_*, see below) */ + _Atomic u8 export_state; /* Route export state (TES_*, see below) */ struct event *event; /* Event running all the export operations */ @@ -384,6 +403,16 @@ static inline u8 rt_export_get_state(struct rt_export_hook *eh) { return eh ? eh void rte_import(struct rt_import_request *req, const net_addr *net, rte *new, struct rte_src *src); +/* Get next rpe. If src is given, it must match. */ +struct rt_pending_export *rpe_next(struct rt_pending_export *rpe, struct rte_src *src); + +/* Mark the pending export processed */ +void rpe_mark_seen(struct rt_export_hook *hook, struct rt_pending_export *rpe); + +/* Get pending export seen status */ +int rpe_get_seen(struct rt_export_hook *hook, struct rt_pending_export *rpe); + + /* Types of route announcement, also used as flags */ #define RA_UNDEF 0 /* Undefined RA type */ #define RA_OPTIMAL 1 /* Announcement of optimal route change */ diff --git a/nest/rt-table.c b/nest/rt-table.c index 2dcb2e26..c049101a 100644 --- a/nest/rt-table.c +++ b/nest/rt-table.c @@ -45,12 +45,24 @@ #include "lib/string.h" #include "lib/alloca.h" +#include <stdatomic.h> + pool *rt_table_pool; static linpool *rte_update_pool; list routing_tables; +/* Data structures for export journal */ +#define RT_PENDING_EXPORT_ITEMS (page_size - sizeof(struct rt_export_block)) / sizeof(struct rt_pending_export) + +struct rt_export_block { + node n; + _Atomic u32 end; + _Atomic _Bool not_last; + struct rt_pending_export export[]; +}; + static void rt_free_hostcache(rtable *tab); static void rt_notify_hostcache(rtable *tab, net *net); static void rt_update_hostcache(rtable *tab); @@ -59,6 +71,12 @@ static inline void rt_prune_table(rtable *tab); static inline void rt_schedule_notify(rtable *tab); static void rt_feed_channel(void *); +static inline void rt_export_used(rtable *tab); +static void rt_export_cleanup(rtable *tab); + +static inline void rte_update_lock(void); +static inline void rte_update_unlock(void); + const char *rt_import_state_name_array[TIS_MAX] = { [TIS_DOWN] = "DOWN", [TIS_UP] = "UP", @@ -385,9 +403,9 @@ rte_mergable(rte *pri, rte *sec) static void rte_trace(const char *name, const rte *e, int dir, const char *msg) { - log(L_TRACE "%s %c %s %N src %uL %uG %uS %s%s", + log(L_TRACE "%s %c %s %N src %uL %uG %uS id %u %s%s", name, dir, msg, e->net, - e->src->private_id, e->src->global_id, e->stale_cycle, + e->src->private_id, e->src->global_id, e->stale_cycle, e->id, rta_dest_name(e->attrs->dest), rte_is_filtered(e) ? " (filtered)" : ""); } @@ -583,6 +601,16 @@ rt_notify_basic(struct channel *c, const net_addr *net, rte *new, rte *old) do_rt_notify(c, net, new, old); } +static void +channel_rpe_mark_seen(struct rt_export_request *req, struct rt_pending_export *rpe) +{ + struct channel *c = SKIP_BACK(struct channel, out_req, req); + + rpe_mark_seen(req->hook, rpe); + if (rpe->old) + bmap_clear(&c->export_reject_map, rpe->old->rte.id); +} + void rt_notify_accepted(struct rt_export_request *req, const net_addr *n, struct rt_pending_export *rpe, struct rte **feed, uint count) @@ -626,38 +654,30 @@ rt_notify_accepted(struct rt_export_request *req, const net_addr *n, struct rt_p } } +done: /* Check obsolete routes for previously exported */ - if (!old_best) - if (rpe && rpe->old && bmap_test(&c->export_map, rpe->old->rte.id)) - old_best = &rpe->old->rte; - -/* for (; rpe; rpe = atomic_load_explicit(&rpe->next, memory_order_relaxed)) + while (rpe) + { + channel_rpe_mark_seen(req, rpe); + if (rpe->old) { - if (rpe->old && bmap_test(&hook->accept_map, rpe->old->id)) + if (bmap_test(&c->export_map, rpe->old->rte.id)) { - old_best = &rpe->old.rte; - break; + ASSERT_DIE(old_best == NULL); + old_best = &rpe->old->rte; } - - if (rpe == rpe_last) - break; } - */ + rpe = rpe_next(rpe, NULL); + } /* Nothing to export */ if (!new_best && !old_best) { DBG("rt_notify_accepted: nothing to export\n"); - goto done; + return; } do_rt_notify(c, n, new_best, old_best); - -done: - /* Drop the old stored rejection if applicable. - * new->id == old->id happens when updating hostentries. */ - if (rpe && rpe->old && (!rpe->new || (rpe->new->rte.id != rpe->old->rte.id))) - bmap_clear(&c->export_reject_map, rpe->old->rte.id); } @@ -750,63 +770,70 @@ rt_notify_merged(struct rt_export_request *req, const net_addr *n, struct rt_pen } /* Check obsolete routes for previously exported */ - if (!old_best) - if (rpe && rpe->old && bmap_test(&c->export_map, rpe->old->rte.id)) - old_best = &rpe->old->rte; - -/* for (; rpe; rpe = atomic_load_explicit(&rpe->next, memory_order_relaxed)) + while (rpe) + { + channel_rpe_mark_seen(req, rpe); + if (rpe->old) { - if (rpe->old && bmap_test(&hook->accept_map, rpe->old->id)) + if (bmap_test(&c->export_map, rpe->old->rte.id)) { - old_best = &rpe->old.rte; - break; + ASSERT_DIE(old_best == NULL); + old_best = &rpe->old->rte; } - - if (rpe == rpe_last) - break; } - */ + rpe = rpe_next(rpe, NULL); + } /* Prepare new merged route */ rte *new_merged = count ? rt_export_merged(c, feed, count, rte_update_pool, 0) : NULL; if (new_merged || old_best) do_rt_notify(c, n, new_merged, old_best); - - /* Drop the old stored rejection if applicable. - * new->id == old->id happens when updating hostentries. */ - if (rpe && rpe->old && (!rpe->new || (rpe->new->rte.id != rpe->old->rte.id))) - bmap_clear(&c->export_reject_map, rpe->old->rte.id); } void rt_notify_optimal(struct rt_export_request *req, const net_addr *net, struct rt_pending_export *rpe) { struct channel *c = SKIP_BACK(struct channel, out_req, req); - rte n0; - if (rpe->new_best != rpe->old_best) - rt_notify_basic(c, net, RTES_CLONE(rpe->new_best, &n0), RTES_OR_NULL(rpe->old_best)); + rte *old = RTES_OR_NULL(rpe->old_best); + struct rte_storage *new_best = rpe->new_best; - /* Drop the old stored rejection if applicable. - * new->id == old->id happens when updating hostentries. */ - if (rpe->old && (!rpe->new || (rpe->new->rte.id != rpe->old->rte.id))) - bmap_clear(&c->export_reject_map, rpe->old->rte.id); + while (rpe) + { + channel_rpe_mark_seen(req, rpe); + new_best = rpe->new_best; + rpe = rpe_next(rpe, NULL); + } + + if (&new_best->rte != old) + { + rte n0, *new = RTES_CLONE(new_best, &n0); + rt_notify_basic(c, net, new, old); + } } void rt_notify_any(struct rt_export_request *req, const net_addr *net, struct rt_pending_export *rpe) { struct channel *c = SKIP_BACK(struct channel, out_req, req); - rte n0; - if (rpe->new != rpe->old) - rt_notify_basic(c, net, RTES_CLONE(rpe->new, &n0), RTES_OR_NULL(rpe->old)); + struct rte_src *src = rpe->new ? rpe->new->rte.src : rpe->old->rte.src; + rte *old = RTES_OR_NULL(rpe->old); + struct rte_storage *new_any = rpe->new; - /* Drop the old stored rejection if applicable. - * new->id == old->id happens when updating hostentries. */ - if (rpe->old && (!rpe->new || (rpe->new->rte.id != rpe->old->rte.id))) - bmap_clear(&c->export_reject_map, rpe->old->rte.id); + while (rpe) + { + channel_rpe_mark_seen(req, rpe); + new_any = rpe->new; + rpe = rpe_next(rpe, src); + } + + if (&new_any->rte != old) + { + rte n0, *new = RTES_CLONE(new_any, &n0); + rt_notify_basic(c, net, new, old); + } } void @@ -821,6 +848,76 @@ rt_feed_any(struct rt_export_request *req, const net_addr *net, struct rt_pendin } } +void +rpe_mark_seen(struct rt_export_hook *hook, struct rt_pending_export *rpe) +{ + bmap_set(&hook->seq_map, rpe->seq); +} + +struct rt_pending_export * +rpe_next(struct rt_pending_export *rpe, struct rte_src *src) +{ + struct rt_pending_export *next = atomic_load_explicit(&rpe->next, memory_order_acquire); + + if (!next) + return NULL; + + if (!src) + return next; + + while (rpe = next) + if (src == (rpe->new ? rpe->new->rte.src : rpe->old->rte.src)) + return rpe; + else + next = atomic_load_explicit(&rpe->next, memory_order_acquire); + + return NULL; +} + +static struct rt_pending_export * rt_next_export_fast(struct rt_pending_export *last); +static void +rte_export(struct rt_export_hook *hook, struct rt_pending_export *rpe) +{ + if (bmap_test(&hook->seq_map, rpe->seq)) + goto seen; + + const net_addr *n = rpe->new_best ? rpe->new_best->rte.net : rpe->old_best->rte.net; + + if (rpe->new) + hook->stats.updates_received++; + else + hook->stats.withdraws_received++; + + if (hook->req->export_one) + hook->req->export_one(hook->req, n, rpe); + else if (hook->req->export_bulk) + { + net *net = SKIP_BACK(struct network, n.addr, (net_addr (*)[0]) n); + uint count = rte_feed_count(net); + rte **feed = NULL; + if (count) + { + feed = alloca(count * sizeof(rte *)); + rte_feed_obtain(net, feed, count); + } + hook->req->export_bulk(hook->req, n, rpe, feed, count); + } + else + bug("Export request must always provide an export method"); + +seen: + /* Get the next export if exists */ + hook->rpe_next = rt_next_export_fast(rpe); + + /* The last block may be available to free */ + if (PAGE_HEAD(hook->rpe_next) != PAGE_HEAD(rpe)) + rt_export_used(hook->table); + + /* Releasing this export for cleanup routine */ + DBG("store hook=%p last_export=%p seq=%lu\n", hook, rpe, rpe->seq); + atomic_store_explicit(&hook->last_export, rpe, memory_order_release); +} + /** * rte_announce - announce a routing table change * @tab: table the route has been added to @@ -856,19 +953,23 @@ static void rte_announce(rtable *tab, net *net, struct rte_storage *new, struct rte_storage *old, struct rte_storage *new_best, struct rte_storage *old_best) { - if (!new || !rte_is_valid(&new->rte)) - new = NULL; - - if (!old || !rte_is_valid(&old->rte)) - old = NULL; - if (!new_best || !rte_is_valid(&new_best->rte)) new_best = NULL; if (!old_best || !rte_is_valid(&old_best->rte)) old_best = NULL; - if (!new && !old && !new_best && !old_best) + if (!new || !rte_is_valid(&new->rte)) + new = NULL; + + if (old && !rte_is_valid(&old->rte)) + { + /* Filtered old route isn't announced, should be freed immediately. */ + rte_free(old, tab); + old = NULL; + } + + if ((new == old) && (new_best == old_best)) return; if (new_best != old_best) @@ -884,35 +985,194 @@ rte_announce(rtable *tab, net *net, struct rte_storage *new, struct rte_storage rt_schedule_notify(tab); - struct rt_pending_export rpe = { .new = new, .old = old, .new_best = new_best, .old_best = old_best }; - uint count = rte_feed_count(net); - rte **feed = NULL; - if (count) + if (EMPTY_LIST(tab->exports) && EMPTY_LIST(tab->pending_exports)) + { + /* No export hook and no pending exports to cleanup. We may free the route immediately. */ + if (!old) + return; + + hmap_clear(&tab->id_map, old->rte.id); + rte_free(old, tab); + return; + } + + /* Get the pending export structure */ + struct rt_export_block *rpeb = NULL, *rpebsnl = NULL; + u32 end = 0; + + if (!EMPTY_LIST(tab->pending_exports)) { - feed = alloca(count * sizeof(rte *)); - rte_feed_obtain(net, feed, count); + rpeb = TAIL(tab->pending_exports); + end = atomic_load_explicit(&rpeb->end, memory_order_relaxed); + if (end >= RT_PENDING_EXPORT_ITEMS) + { + ASSERT_DIE(end == RT_PENDING_EXPORT_ITEMS); + rpebsnl = rpeb; + + rpeb = NULL; + end = 0; + } } - struct rt_export_hook *eh; - WALK_LIST(eh, tab->exports) + if (!rpeb) + { + rpeb = alloc_page(tab->rp); + *rpeb = (struct rt_export_block) {}; + add_tail(&tab->pending_exports, &rpeb->n); + } + + /* Fill the pending export */ + struct rt_pending_export *rpe = &rpeb->export[rpeb->end]; + *rpe = (struct rt_pending_export) { + .new = new, + .new_best = new_best, + .old = old, + .old_best = old_best, + .seq = tab->next_export_seq++, + }; + + DBG("rte_announce: table=%s net=%N new=%p from %p old=%p from %p new_best=%p old_best=%p seq=%lu\n", tab->name, net->n.addr, new, new ? new->sender : NULL, old, old ? old->sender : NULL, new_best, old_best, rpe->seq); + + ASSERT_DIE(atomic_fetch_add_explicit(&rpeb->end, 1, memory_order_release) == end); + + if (rpebsnl) + { + _Bool f = 0; + ASSERT_DIE(atomic_compare_exchange_strong_explicit(&rpebsnl->not_last, &f, 1, + memory_order_release, memory_order_relaxed)); + } + + /* Append to the same-network squasher list */ + if (net->last) + { + struct rt_pending_export *rpenull = NULL; + ASSERT_DIE(atomic_compare_exchange_strong_explicit( + &net->last->next, &rpenull, rpe, + memory_order_relaxed, + memory_order_relaxed)); + + } + + net->last = rpe; + + if (!net->first) + net->first = rpe; + + if (tab->first_export == NULL) + tab->first_export = rpe; + + if (!tm_active(tab->export_timer)) + tm_start(tab->export_timer, tab->config->export_settle_time); +} + +static struct rt_pending_export * +rt_next_export_fast(struct rt_pending_export *last) +{ + /* Get the whole export block and find our position in there. */ + struct rt_export_block *rpeb = PAGE_HEAD(last); + u32 pos = (last - &rpeb->export[0]); + u32 end = atomic_load_explicit(&rpeb->end, memory_order_acquire); + ASSERT_DIE(pos < end); + + /* Next is in the same block. */ + if (++pos < end) + return &rpeb->export[pos]; + + /* There is another block. */ + if (atomic_load_explicit(&rpeb->not_last, memory_order_acquire)) + { + /* This is OK to do non-atomically because of the not_last flag. */ + rpeb = NODE_NEXT(rpeb); + return &rpeb->export[0]; + } + + /* There is nothing more. */ + return NULL; +} + +static struct rt_pending_export * +rt_next_export(struct rt_export_hook *hook, rtable *tab) +{ + /* As the table is locked, it is safe to reload the last export pointer */ + struct rt_pending_export *last = atomic_load_explicit(&hook->last_export, memory_order_acquire); + + /* It is still valid, let's reuse it */ + if (last) + return rt_next_export_fast(last); + + /* No, therefore we must process the table's first pending export */ + else + return tab->first_export; +} + +static void +rt_announce_exports(timer *tm) +{ + rtable *tab = tm->data; + + struct rt_export_hook *c; node *n; + WALK_LIST2(c, n, tab->exports, n) { - if (eh->export_state == TES_STOP) + if (atomic_load_explicit(&c->export_state, memory_order_acquire) != TES_READY) continue; - if (new) - eh->stats.updates_received++; - else - eh->stats.withdraws_received++; + ev_schedule_work(c->event); + } +} - if (eh->req->export_one) - eh->req->export_one(eh->req, net->n.addr, &rpe); - else if (eh->req->export_bulk) - eh->req->export_bulk(eh->req, net->n.addr, &rpe, feed, count); - else - bug("Export request must always provide an export method"); +static struct rt_pending_export * +rt_last_export(rtable *tab) +{ + struct rt_pending_export *rpe = NULL; + + if (!EMPTY_LIST(tab->pending_exports)) + { + /* We'll continue processing exports from this export on */ + struct rt_export_block *reb = TAIL(tab->pending_exports); + ASSERT_DIE(reb->end); + rpe = &reb->export[reb->end - 1]; + } + + return rpe; +} + +#define RT_EXPORT_BULK 1024 + +static void +rt_export_hook(void *_data) +{ + struct rt_export_hook *c = _data; + + ASSERT_DIE(atomic_load_explicit(&c->export_state, memory_order_relaxed) == TES_READY); + + if (!c->rpe_next) + { + c->rpe_next = rt_next_export(c, c->table); + + if (!c->rpe_next) + { + rt_export_used(c->table); + return; + } } + + /* Process the export */ + for (uint i=0; i<RT_EXPORT_BULK; i++) + { + rte_update_lock(); + + rte_export(c, c->rpe_next); + + if (!c->rpe_next) + break; + + rte_update_unlock(); + } + + ev_schedule_work(c->event); } + static inline int rte_validate(struct channel *ch, rte *e) { @@ -1133,14 +1393,8 @@ rte_recalculate(struct rt_import_hook *c, net *net, rte *new, struct rte_src *sr if (new_stored) { new_stored->rte.lastmod = current_time(); - - if (!old) - { - new_stored->rte.id = hmap_first_zero(&table->id_map); - hmap_set(&table->id_map, new_stored->rte.id); - } - else - new_stored->rte.id = old->id; + new_stored->rte.id = hmap_first_zero(&table->id_map); + hmap_set(&table->id_map, new_stored->rte.id); } _Bool nb = (new_stored == net->routes); @@ -1178,13 +1432,6 @@ rte_recalculate(struct rt_import_hook *c, net *net, rte *new, struct rte_src *sr p->rte_insert(net, &new_stored->rte); #endif - if (old) - { - if (!new_stored) - hmap_clear(&table->id_map, old->id); - - rte_free(old_stored, table); - } } static int rte_update_nest_cnt; /* Nesting counter to allow recursive updates */ @@ -1384,13 +1631,16 @@ rt_export_stopped(void *data) struct rt_export_hook *hook = data; rtable *tab = hook->table; + /* Drop pending exports */ + rt_export_used(tab); + /* Unlist */ rem_node(&hook->n); - /* Reporting the channel as stopped. */ + /* Report the channel as stopped. */ hook->stopped(hook->req); - /* Freeing the hook together with its coroutine. */ + /* Free the hook together with its coroutine. */ rfree(hook->pool); rt_unlock_table(tab); @@ -1412,7 +1662,7 @@ static inline void rt_set_export_state(struct rt_export_hook *hook, u8 state) { hook->last_state_change = current_time(); - hook->export_state = state; + atomic_store_explicit(&hook->export_state, state, memory_order_release); if (hook->req->log_state_change) hook->req->log_state_change(hook->req, state); @@ -1460,15 +1710,20 @@ rt_request_export(rtable *tab, struct rt_export_request *req) pool *p = rp_new(tab->rp, "Export hook"); struct rt_export_hook *hook = req->hook = mb_allocz(p, sizeof(struct rt_export_hook)); hook->pool = p; - hook->lp = lp_new_default(p); hook->req = req; hook->table = tab; /* stats zeroed by mb_allocz */ + bmap_init(&hook->seq_map, p, 1024); + rt_set_export_state(hook, TES_HUNGRY); + struct rt_pending_export *rpe = rt_last_export(hook->table); + DBG("store hook=%p last_export=%p seq=%lu\n", hook, rpe, rpe ? rpe->seq : 0); + atomic_store_explicit(&hook->last_export, rpe, memory_order_relaxed); + hook->n = (node) {}; add_tail(&tab->exports, &hook->n); @@ -1476,10 +1731,10 @@ rt_request_export(rtable *tab, struct rt_export_request *req) DBG("New export hook %p req %p in table %s uc=%u\n", hook, req, tab->name, tab->use_count); - rt_set_export_state(hook, TES_FEEDING); - hook->event = ev_new_init(p, rt_feed_channel, hook); ev_schedule_work(hook->event); + + rt_set_export_state(hook, TES_FEEDING); } void @@ -1493,16 +1748,18 @@ rt_stop_export(struct rt_export_request *req, void (*stopped)(struct rt_export_r /* Stop feeding */ ev_postpone(hook->event); - if (hook->export_state == TES_FEEDING) + if (atomic_load_explicit(&hook->export_state, memory_order_relaxed) == TES_FEEDING) fit_get(&tab->fib, &hook->feed_fit); hook->event->hook = rt_export_stopped; hook->stopped = stopped; - rt_set_export_state(hook, TES_STOP); ev_schedule(hook->event); + + rt_set_export_state(hook, TES_STOP); } + /** * rt_refresh_begin - start a refresh cycle * @t: related routing table @@ -1649,8 +1906,8 @@ rt_dump_hooks(rtable *tab) { eh->req->dump_req(eh->req); debug(" Export hook %p requested by %p:" - " refeed_pending=%u last_state_change=%t export_state=%u stopped=%p\n", - eh, eh->req, eh->refeed_pending, eh->last_state_change, eh->export_state, eh->stopped); + " refeed_pending=%u last_state_change=%t export_state=%u\n", + eh, eh->req, eh->refeed_pending, eh->last_state_change, atomic_load_explicit(&eh->export_state, memory_order_relaxed)); } debug("\n"); } @@ -1700,6 +1957,18 @@ rt_schedule_prune(rtable *tab) tab->prune_state |= 1; } +void +rt_export_used(rtable *tab) +{ + if (config->table_debug) + log(L_TRACE "%s: Export cleanup requested", tab->name); + + if (tab->export_used) + return; + + tab->export_used = 1; + ev_schedule(tab->rt_event); +} static void rt_event(void *ptr) @@ -1708,6 +1977,9 @@ rt_event(void *ptr) rt_lock_table(tab); + if (tab->export_used) + rt_export_cleanup(tab); + if (tab->hcu_scheduled) rt_update_hostcache(tab); @@ -1857,13 +2129,17 @@ rt_setup(pool *pp, struct rtable_config *cf) init_list(&t->imports); init_list(&t->exports); + hmap_init(&t->id_map, p, 1024); hmap_set(&t->id_map, 0); + init_list(&t->pending_exports); init_list(&t->subscribers); t->rt_event = ev_new_init(p, rt_event, t); + t->export_timer = tm_new_init(p, rt_announce_exports, t, 0, 0); t->last_rt_change = t->gc_time = current_time(); + t->next_export_seq = 1; t->rl_pipe = (struct tbf) TBF_DEFAULT_LOG_LIMITS; @@ -1961,7 +2237,7 @@ again: } } - if (!n->routes) /* Orphaned FIB entry */ + if (!n->routes && !n->first) /* Orphaned FIB entry */ { FIB_ITERATE_PUT(fit); fib_delete(&tab->fib, n); @@ -1982,15 +2258,15 @@ again: rt_prune_sources(); + uint flushed_channels = 0; + /* Close flushed channels */ WALK_LIST2_DELSAFE(ih, n, x, tab->imports, n) if (ih->import_state == TIS_FLUSHING) { - rt_set_import_state(ih, TIS_CLEARED); - ih->stopped(ih->req); - rem_node(&ih->n); - mb_free(ih); - rt_unlock_table(tab); + ih->flush_seq = tab->next_export_seq; + rt_set_import_state(ih, TIS_WAITING); + flushed_channels++; } else if (ih->stale_pruning != ih->stale_pruned) { @@ -1999,6 +2275,185 @@ again: if (ih->req->trace_routes & D_STATES) log(L_TRACE "%s: table prune after refresh end [%u]", ih->req->name, ih->stale_pruned); } + + /* In some cases, we may want to directly proceed to export cleanup */ + if (EMPTY_LIST(tab->exports) && flushed_channels) + rt_export_cleanup(tab); +} + +static void +rt_export_cleanup(rtable *tab) +{ + tab->export_used = 0; + + u64 min_seq = ~((u64) 0); + struct rt_pending_export *last_export_to_free = NULL; + struct rt_pending_export *first_export = tab->first_export; + + struct rt_export_hook *eh; + node *n; + WALK_LIST2(eh, n, tab->exports, n) + { + switch (atomic_load_explicit(&eh->export_state, memory_order_acquire)) + { + case TES_DOWN: + case TES_HUNGRY: + continue; + + case TES_READY: + { + struct rt_pending_export *last = atomic_load_explicit(&eh->last_export, memory_order_acquire); + if (!last) + /* No last export means that the channel has exported nothing since last cleanup */ + goto done; + + else if (min_seq > last->seq) + { + min_seq = last->seq; + last_export_to_free = last; + } + continue; + } + + default: + /* It's only safe to cleanup when the export state is idle or regular. No feeding or stopping allowed. */ + goto done; + } + } + + tab->first_export = last_export_to_free ? rt_next_export_fast(last_export_to_free) : NULL; + + if (config->table_debug) + log(L_TRACE "%s: Export cleanup, old first_export seq %lu, new %lu, min_seq %ld", + tab->name, + first_export ? first_export->seq : 0, + tab->first_export ? tab->first_export->seq : 0, + min_seq); + + WALK_LIST2(eh, n, tab->exports, n) + { + if (atomic_load_explicit(&eh->export_state, memory_order_acquire) != TES_READY) + continue; + + struct rt_pending_export *last = atomic_load_explicit(&eh->last_export, memory_order_acquire); + if (last == last_export_to_free) + { + /* This may fail when the channel managed to export more inbetween. This is OK. */ + atomic_compare_exchange_strong_explicit( + &eh->last_export, &last, NULL, + memory_order_release, + memory_order_relaxed); + + DBG("store hook=%p last_export=NULL\n", eh); + } + } + + while (first_export && (first_export->seq <= min_seq)) + { + ASSERT_DIE(first_export->new || first_export->old); + + const net_addr *n = first_export->new ? + first_export->new->rte.net : + first_export->old->rte.net; + net *net = SKIP_BACK(struct network, n.addr, (net_addr (*)[0]) n); + + ASSERT_DIE(net->first == first_export); + + if (first_export == net->last) + /* The only export here */ + net->last = net->first = NULL; + else + /* First is now the next one */ + net->first = atomic_load_explicit(&first_export->next, memory_order_relaxed); + + /* For now, the old route may be finally freed */ + if (first_export->old) + { + rt_rte_trace_in(D_ROUTES, first_export->old->rte.sender->req, &first_export->old->rte, "freed"); + hmap_clear(&tab->id_map, first_export->old->rte.id); + rte_free(first_export->old, tab); + } + +#ifdef LOCAL_DEBUG + memset(first_export, 0xbd, sizeof(struct rt_pending_export)); +#endif + + struct rt_export_block *reb = HEAD(tab->pending_exports); + ASSERT_DIE(reb == PAGE_HEAD(first_export)); + + u32 pos = (first_export - &reb->export[0]); + u32 end = atomic_load_explicit(&reb->end, memory_order_relaxed); + ASSERT_DIE(pos < end); + + struct rt_pending_export *next = NULL; + + if (++pos < end) + next = &reb->export[pos]; + else + { + rem_node(&reb->n); + +#ifdef LOCAL_DEBUG + memset(reb, 0xbe, page_size); +#endif + + free_page(tab->rp, reb); + + if (EMPTY_LIST(tab->pending_exports)) + { + if (config->table_debug) + log(L_TRACE "%s: Resetting export seq", tab->name); + + node *n; + WALK_LIST2(eh, n, tab->exports, n) + { + if (atomic_load_explicit(&eh->export_state, memory_order_acquire) != TES_READY) + continue; + + ASSERT_DIE(atomic_load_explicit(&eh->last_export, memory_order_acquire) == NULL); + bmap_reset(&eh->seq_map, 1024); + } + + tab->next_export_seq = 1; + } + else + { + reb = HEAD(tab->pending_exports); + next = &reb->export[0]; + } + } + + first_export = next; + } + +done:; + struct rt_import_hook *ih; node *x; + _Bool imports_stopped = 0; + WALK_LIST2_DELSAFE(ih, n, x, tab->imports, n) + if (ih->import_state == TIS_WAITING) + if (!first_export || (first_export->seq >= ih->flush_seq)) + { + ih->import_state = TIS_CLEARED; + ih->stopped(ih->req); + rem_node(&ih->n); + mb_free(ih); + rt_unlock_table(tab); + imports_stopped = 1; + } + + if (tab->export_used) + ev_schedule(tab->rt_event); + + if (imports_stopped) + { + if (config->table_debug) + log(L_TRACE "%s: Sources pruning routine requested", tab->name); + + rt_prune_sources(); + } + + if (EMPTY_LIST(tab->pending_exports) && tm_active(tab->export_timer)) + tm_stop(tab->export_timer); } void @@ -2180,6 +2635,11 @@ rt_next_hop_update_net(rtable *tab, net *n) /* Replace the route in the list */ new->next = e->next; *k = e = new; + + /* Get a new ID for the route */ + new->rte.lastmod = current_time(); + new->rte.id = hmap_first_zero(&tab->id_map); + hmap_set(&tab->id_map, new->rte.id); } ASSERT_DIE(pos == count); @@ -2213,9 +2673,6 @@ rt_next_hop_update_net(rtable *tab, net *n) rte_announce_i(tab, n, updates[i].new, updates[i].old, new, old_best); } - for (int i=0; i<count; i++) - rte_free(updates[i].old, tab); - return count; } @@ -2405,7 +2862,7 @@ rt_feed_channel(void *data) struct fib_iterator *fit = &c->feed_fit; int max_feed = 256; - ASSERT(c->export_state == TES_FEEDING); + ASSERT(atomic_load_explicit(&c->export_state, memory_order_relaxed) == TES_FEEDING); FIB_ITERATE_START(&c->table->fib, fit, net, n) { @@ -2416,8 +2873,8 @@ rt_feed_channel(void *data) return; } - if (c->export_state != TES_FEEDING) - goto done; + if (atomic_load_explicit(&c->export_state, memory_order_acquire) != TES_FEEDING) + return; if (c->req->export_bulk) { @@ -2427,8 +2884,7 @@ rt_feed_channel(void *data) rte_update_lock(); rte **feed = alloca(count * sizeof(rte *)); rte_feed_obtain(n, feed, count); - struct rt_pending_export rpe = { .new_best = n->routes }; - c->req->export_bulk(c->req, n->n.addr, &rpe, feed, count); + c->req->export_bulk(c->req, n->n.addr, NULL, feed, count); max_feed -= count; rte_update_unlock(); } @@ -2441,10 +2897,15 @@ rt_feed_channel(void *data) max_feed--; rte_update_unlock(); } + + for (struct rt_pending_export *rpe = n->first; rpe; rpe = rpe_next(rpe, NULL)) + rpe_mark_seen(c, rpe); } FIB_ITERATE_END; -done: + c->event->hook = rt_export_hook; + ev_schedule_work(c->event); + rt_set_export_state(c, TES_READY); } |