author     Maria Matejka <mq@ucw.cz>    2022-07-15 14:57:02 +0200
committer  Maria Matejka <mq@ucw.cz>    2022-07-15 14:57:02 +0200
commit  05673b16a87792baf8734dfcbf12ac2fd867f80b (patch)
tree    7dd306058ea07dc41ec22d5429e8d1a9a6e7793b /nest/rt-table.c
parent  1c2851ecfa94f3d0b732a267c6c2db8b817c37f4 (diff)
parent  c70b3198dc349127273d202ab8c36afeebb6d9d0 (diff)
Merge commit 'c70b3198' into thread-next [lots of conflicts]
There were more conflicts than I'd like to see, most notably in route export. If a bisect identifies this commit with something related, it may simply be true that this commit introduces that bug. Let's hope it doesn't happen.
Diffstat (limited to 'nest/rt-table.c')
-rw-r--r--    nest/rt-table.c    801
1 file changed, 641 insertions(+), 160 deletions(-)
diff --git a/nest/rt-table.c b/nest/rt-table.c
index 5e07c129..85a6faf7 100644
--- a/nest/rt-table.c
+++ b/nest/rt-table.c
@@ -110,6 +110,8 @@
#include "proto/bgp/bgp.h"
#endif
+#include <stdatomic.h>
+
pool *rt_table_pool;
static linpool *rte_update_pool;
@@ -117,6 +119,16 @@ static linpool *rte_update_pool;
list routing_tables;
list deleted_routing_tables;
+/* Data structures for export journal */
+#define RT_PENDING_EXPORT_ITEMS (page_size - sizeof(struct rt_export_block)) / sizeof(struct rt_pending_export)
+
+struct rt_export_block {
+ node n;
+ _Atomic u32 end;
+ _Atomic _Bool not_last;
+ struct rt_pending_export export[];
+};
+
static void rt_free_hostcache(rtable *tab);
static void rt_notify_hostcache(rtable *tab, net *net);
static void rt_update_hostcache(rtable *tab);
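The structures added above implement the export journal: pending exports are batched into page-sized blocks, and RT_PENDING_EXPORT_ITEMS computes how many entries fit after the block header. A self-contained sketch of the same sizing arithmetic (editor's illustration; the _sketch types and the 4096-byte page are assumptions, BIRD determines page_size at runtime):

    #include <stdatomic.h>
    #include <stddef.h>
    #include <stdio.h>

    struct rt_pending_export_sketch {
      void *new, *new_best, *old, *old_best;     /* route pointers */
      unsigned long seq;                         /* journal sequence number */
      void *next;                                /* same-net chain */
    };

    struct rt_export_block_sketch {
      void *prev, *next;                         /* stand-in for the list node */
      _Atomic unsigned end;                      /* first free slot in export[] */
      _Atomic _Bool not_last;                    /* set once a newer block exists */
      struct rt_pending_export_sketch export[];  /* flexible array fills the page */
    };

    int main(void)
    {
      size_t page_size = 4096;                   /* assumed page size */
      size_t items = (page_size - sizeof(struct rt_export_block_sketch))
                   / sizeof(struct rt_pending_export_sketch);
      printf("%zu pending exports per block\n", items);
      return 0;
    }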
@@ -133,6 +145,14 @@ static void rt_feed_equal(void *);
static void rt_feed_for(void *);
static uint rt_feed_net(struct rt_export_hook *c, net *n);
+static inline void rt_export_used(struct rt_exporter *);
+static void rt_export_cleanup(rtable *tab);
+
+static inline void rte_update_lock(void);
+static inline void rte_update_unlock(void);
+
+static int rte_same(rte *x, rte *y);
+
const char *rt_import_state_name_array[TIS_MAX] = {
[TIS_DOWN] = "DOWN",
[TIS_UP] = "UP",
@@ -685,8 +705,9 @@ rte_mergable(rte *pri, rte *sec)
static void
rte_trace(const char *name, const rte *e, int dir, const char *msg)
{
- log(L_TRACE "%s %c %s %N %uL %uG %s",
- name, dir, msg, e->net, e->src->private_id, e->src->global_id,
+ log(L_TRACE "%s %c %s %N src %uL %uG %uS id %u %s",
+ name, dir, msg, e->net,
+ e->src->private_id, e->src->global_id, e->stale_cycle, e->id,
rta_dest_name(rte_dest(e)));
}
@@ -848,6 +869,16 @@ do_rt_notify(struct channel *c, const net_addr *net, rte *new, const rte *old)
static void
rt_notify_basic(struct channel *c, const net_addr *net, rte *new, rte *old)
{
+ if (new && old && rte_same(new, old))
+ {
+ if ((new->id != old->id) && bmap_test(&c->export_map, old->id))
+ {
+ bmap_set(&c->export_map, new->id);
+ bmap_clear(&c->export_map, old->id);
+ }
+ return;
+ }
+
if (new)
new = export_filter(c, new, 0);
@@ -860,6 +891,16 @@ rt_notify_basic(struct channel *c, const net_addr *net, rte *new, rte *old)
do_rt_notify(c, net, new, old);
}
+static void
+channel_rpe_mark_seen(struct rt_export_request *req, struct rt_pending_export *rpe)
+{
+ struct channel *c = SKIP_BACK(struct channel, out_req, req);
+
+ rpe_mark_seen(req->hook, rpe);
+ if (rpe->old)
+ bmap_clear(&c->export_reject_map, rpe->old->rte.id);
+}
+
void
rt_notify_accepted(struct rt_export_request *req, const net_addr *n, struct rt_pending_export *rpe,
struct rte **feed, uint count)
@@ -903,38 +944,30 @@ rt_notify_accepted(struct rt_export_request *req, const net_addr *n, struct rt_p
}
}
+done:
/* Check obsolete routes for previously exported */
- if (!old_best)
- if (rpe && rpe->old && bmap_test(&c->export_map, rpe->old->rte.id))
- old_best = &rpe->old->rte;
-
-/* for (; rpe; rpe = atomic_load_explicit(&rpe->next, memory_order_relaxed))
+ while (rpe)
+ {
+ channel_rpe_mark_seen(req, rpe);
+ if (rpe->old)
{
- if (rpe->old && bmap_test(&hook->accept_map, rpe->old->id))
+ if (bmap_test(&c->export_map, rpe->old->rte.id))
{
- old_best = &rpe->old.rte;
- break;
+ ASSERT_DIE(old_best == NULL);
+ old_best = &rpe->old->rte;
}
-
- if (rpe == rpe_last)
- break;
}
- */
+ rpe = rpe_next(rpe, NULL);
+ }
/* Nothing to export */
if (!new_best && !old_best)
{
DBG("rt_notify_accepted: nothing to export\n");
- goto done;
+ return;
}
do_rt_notify(c, n, new_best, old_best);
-
-done:
- /* Drop the old stored rejection if applicable.
- * new->id == old->id happens when updating hostentries. */
- if (rpe && rpe->old && (!rpe->new || (rpe->new->rte.id != rpe->old->rte.id)))
- bmap_clear(&c->export_reject_map, rpe->old->rte.id);
}
rte *
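The while loop added above replaces the commented-out draft that this diff removes: every pending export for the net is marked seen, and the previously exported route (if any) is recovered via the channel's export_map. rt_notify_merged below uses the identical walk; distilled into an editor's sketch built from the helpers in this diff:

    static rte *
    find_exported_old(struct rt_export_request *req, struct channel *c,
                      struct rt_pending_export *rpe)
    {
      rte *old_best = NULL;
      while (rpe)
      {
        channel_rpe_mark_seen(req, rpe);     /* record seq, drop stored rejection */
        if (rpe->old && bmap_test(&c->export_map, rpe->old->rte.id))
          old_best = &rpe->old->rte;         /* this route had been exported before */
        rpe = rpe_next(rpe, NULL);           /* NULL src: walk all pending exports */
      }
      return old_best;
    }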
@@ -1026,33 +1059,25 @@ rt_notify_merged(struct rt_export_request *req, const net_addr *n, struct rt_pen
}
/* Check obsolete routes for previously exported */
- if (!old_best)
- if (rpe && rpe->old && bmap_test(&c->export_map, rpe->old->rte.id))
- old_best = &rpe->old->rte;
-
-/* for (; rpe; rpe = atomic_load_explicit(&rpe->next, memory_order_relaxed))
+ while (rpe)
+ {
+ channel_rpe_mark_seen(req, rpe);
+ if (rpe->old)
{
- if (rpe->old && bmap_test(&hook->accept_map, rpe->old->id))
+ if (bmap_test(&c->export_map, rpe->old->rte.id))
{
- old_best = &rpe->old.rte;
- break;
+ ASSERT_DIE(old_best == NULL);
+ old_best = &rpe->old->rte;
}
-
- if (rpe == rpe_last)
- break;
}
- */
+ rpe = rpe_next(rpe, NULL);
+ }
/* Prepare new merged route */
rte *new_merged = count ? rt_export_merged(c, feed, count, rte_update_pool, 0) : NULL;
if (new_merged || old_best)
do_rt_notify(c, n, new_merged, old_best);
-
- /* Drop the old stored rejection if applicable.
- * new->id == old->id happens when updating hostentries. */
- if (rpe && rpe->old && (!rpe->new || (rpe->new->rte.id != rpe->old->rte.id)))
- bmap_clear(&c->export_reject_map, rpe->old->rte.id);
}
void
@@ -1060,19 +1085,19 @@ rt_notify_optimal(struct rt_export_request *req, const net_addr *net, struct rt_
{
struct channel *c = SKIP_BACK(struct channel, out_req, req);
- if (rpe->new_best != rpe->old_best)
- {
- rte n0 = RTE_COPY_VALID(rpe->new_best);
- rte *o = RTE_VALID_OR_NULL(rpe->old_best);
+ rte *o = RTE_VALID_OR_NULL(rpe->old_best);
+ struct rte_storage *new_best = rpe->new_best;
- if (n0.src || o)
- rt_notify_basic(c, net, n0.src ? &n0 : NULL, o);
+ while (rpe)
+ {
+ channel_rpe_mark_seen(req, rpe);
+ new_best = rpe->new_best;
+ rpe = rpe_next(rpe, NULL);
}
- /* Drop the old stored rejection if applicable.
- * new->id == old->id happens when updating hostentries. */
- if (rpe->old && (!rpe->new || (rpe->new->rte.id != rpe->old->rte.id)))
- bmap_clear(&c->export_reject_map, rpe->old->rte.id);
+ rte n0 = RTE_COPY_VALID(new_best);
+ if (n0.src || o)
+ rt_notify_basic(c, net, n0.src ? &n0 : NULL, o);
}
void
@@ -1080,18 +1105,28 @@ rt_notify_any(struct rt_export_request *req, const net_addr *net, struct rt_pend
{
struct channel *c = SKIP_BACK(struct channel, out_req, req);
- if (rpe->new != rpe->old)
+ rte *n = RTE_VALID_OR_NULL(rpe->new);
+ rte *o = RTE_VALID_OR_NULL(rpe->old);
+
+ if (!n && !o)
{
- rte n0 = RTE_COPY_VALID(rpe->new);
- rte *o = RTE_VALID_OR_NULL(rpe->old);
- if (n0.src || o)
- rt_notify_basic(c, net, n0.src ? &n0 : NULL, o);
+ channel_rpe_mark_seen(req, rpe);
+ return;
}
- /* Drop the old stored rejection if applicable.
- * new->id == old->id happens when updating hostentries. */
- if (rpe->old && (!rpe->new || (rpe->new->rte.id != rpe->old->rte.id)))
- bmap_clear(&c->export_reject_map, rpe->old->rte.id);
+ struct rte_src *src = n ? n->src : o->src;
+ struct rte_storage *new_latest = rpe->new;
+
+ while (rpe)
+ {
+ channel_rpe_mark_seen(req, rpe);
+ new_latest = rpe->new;
+ rpe = rpe_next(rpe, src);
+ }
+
+ rte n0 = RTE_COPY_VALID(new_latest);
+ if (n0.src || o)
+ rt_notify_basic(c, net, n0.src ? &n0 : NULL, o);
}
void
@@ -1107,6 +1142,98 @@ rt_feed_any(struct rt_export_request *req, const net_addr *net, struct rt_pendin
}
}
+void
+rpe_mark_seen(struct rt_export_hook *hook, struct rt_pending_export *rpe)
+{
+ bmap_set(&hook->seq_map, rpe->seq);
+}
+
+struct rt_pending_export *
+rpe_next(struct rt_pending_export *rpe, struct rte_src *src)
+{
+ struct rt_pending_export *next = atomic_load_explicit(&rpe->next, memory_order_acquire);
+
+ if (!next)
+ return NULL;
+
+ if (!src)
+ return next;
+
+ while (rpe = next)
+ if (src == (rpe->new ? rpe->new->rte.src : rpe->old->rte.src))
+ return rpe;
+ else
+ next = atomic_load_explicit(&rpe->next, memory_order_acquire);
+
+ return NULL;
+}
+
+static struct rt_pending_export * rt_next_export_fast(struct rt_pending_export *last);
+static void
+rte_export(struct rt_export_hook *hook, struct rt_pending_export *rpe)
+{
+ if (bmap_test(&hook->seq_map, rpe->seq))
+ goto ignore; /* Seen already */
+
+ const net_addr *n = rpe->new_best ? rpe->new_best->rte.net : rpe->old_best->rte.net;
+
+ switch (hook->req->addr_mode)
+ {
+ case TE_ADDR_NONE:
+ break;
+
+ case TE_ADDR_IN:
+ if (!net_in_netX(n, hook->req->addr))
+ goto ignore;
+ break;
+
+ case TE_ADDR_EQUAL:
+ if (!net_equal(n, hook->req->addr))
+ goto ignore;
+ break;
+
+ case TE_ADDR_FOR:
+ bug("Continuos export of best prefix match not implemented yet.");
+
+ default:
+ bug("Strange table export address mode: %d", hook->req->addr_mode);
+ }
+
+ if (rpe->new)
+ hook->stats.updates_received++;
+ else
+ hook->stats.withdraws_received++;
+
+ if (hook->req->export_one)
+ hook->req->export_one(hook->req, n, rpe);
+ else if (hook->req->export_bulk)
+ {
+ net *net = SKIP_BACK(struct network, n.addr, (net_addr (*)[0]) n);
+ uint count = rte_feed_count(net);
+ rte **feed = NULL;
+ if (count)
+ {
+ feed = alloca(count * sizeof(rte *));
+ rte_feed_obtain(net, feed, count);
+ }
+ hook->req->export_bulk(hook->req, n, rpe, feed, count);
+ }
+ else
+ bug("Export request must always provide an export method");
+
+ignore:
+ /* Get the next export if exists */
+ hook->rpe_next = rt_next_export_fast(rpe);
+
+ /* The last block may be available to free */
+ if (PAGE_HEAD(hook->rpe_next) != PAGE_HEAD(rpe))
+ CALL(hook->table->used, hook->table);
+
+ /* Releasing this export for cleanup routine */
+ DBG("store hook=%p last_export=%p seq=%lu\n", hook, rpe, rpe->seq);
+ atomic_store_explicit(&hook->last_export, rpe, memory_order_release);
+}
+
/**
* rte_announce - announce a routing table change
* @tab: table the route has been added to
@@ -1145,7 +1272,7 @@ rte_announce(rtable *tab, net *net, struct rte_storage *new, struct rte_storage
int new_best_valid = rte_is_valid(RTE_OR_NULL(new_best));
int old_best_valid = rte_is_valid(RTE_OR_NULL(old_best));
- if (!new && !old && !new_best && !old_best)
+ if ((new == old) && (new_best == old_best))
return;
if (new_best_valid || old_best_valid)
@@ -1164,57 +1291,198 @@ rte_announce(rtable *tab, net *net, struct rte_storage *new, struct rte_storage
rt_schedule_notify(tab);
- struct rt_pending_export rpe = { .new = new, .old = old, .new_best = new_best, .old_best = old_best };
- uint count = rte_feed_count(net);
- rte **feed = NULL;
- if (count)
+ if (EMPTY_LIST(tab->exporter.hooks) && EMPTY_LIST(tab->exporter.pending))
+ {
+ /* No export hook and no pending exports to cleanup. We may free the route immediately. */
+ if (!old)
+ return;
+
+ hmap_clear(&tab->id_map, old->rte.id);
+ rte_free(old);
+ return;
+ }
+
+ /* Get the pending export structure */
+ struct rt_export_block *rpeb = NULL, *rpebsnl = NULL;
+ u32 end = 0;
+
+ if (!EMPTY_LIST(tab->exporter.pending))
{
- feed = alloca(count * sizeof(rte *));
- rte_feed_obtain(net, feed, count);
+ rpeb = TAIL(tab->exporter.pending);
+ end = atomic_load_explicit(&rpeb->end, memory_order_relaxed);
+ if (end >= RT_PENDING_EXPORT_ITEMS)
+ {
+ ASSERT_DIE(end == RT_PENDING_EXPORT_ITEMS);
+ rpebsnl = rpeb;
+
+ rpeb = NULL;
+ end = 0;
+ }
}
- struct rt_export_hook *eh;
- WALK_LIST(eh, tab->exporter.hooks)
+ if (!rpeb)
+ {
+ rpeb = alloc_page();
+ *rpeb = (struct rt_export_block) {};
+ add_tail(&tab->exporter.pending, &rpeb->n);
+ }
+
+ /* Fill the pending export */
+ struct rt_pending_export *rpe = &rpeb->export[rpeb->end];
+ *rpe = (struct rt_pending_export) {
+ .new = new,
+ .new_best = new_best,
+ .old = old,
+ .old_best = old_best,
+ .seq = tab->exporter.next_seq++,
+ };
+
+ DBGL("rte_announce: table=%s net=%N new=%p id %u from %s old=%p id %u from %s new_best=%p id %u old_best=%p id %u seq=%lu",
+ tab->name, net->n.addr,
+ new, new ? new->rte.id : 0, new ? new->rte.sender->req->name : NULL,
+ old, old ? old->rte.id : 0, old ? old->rte.sender->req->name : NULL,
+ new_best, old_best, rpe->seq);
+
+ ASSERT_DIE(atomic_fetch_add_explicit(&rpeb->end, 1, memory_order_release) == end);
+
+ if (rpebsnl)
{
- if (eh->export_state == TES_STOP)
+ _Bool f = 0;
+ ASSERT_DIE(atomic_compare_exchange_strong_explicit(&rpebsnl->not_last, &f, 1,
+ memory_order_release, memory_order_relaxed));
+ }
+
+ /* Append to the same-network squasher list */
+ if (net->last)
+ {
+ struct rt_pending_export *rpenull = NULL;
+ ASSERT_DIE(atomic_compare_exchange_strong_explicit(
+ &net->last->next, &rpenull, rpe,
+ memory_order_relaxed,
+ memory_order_relaxed));
+
+ }
+
+ net->last = rpe;
+
+ if (!net->first)
+ net->first = rpe;
+
+ if (tab->exporter.first == NULL)
+ tab->exporter.first = rpe;
+
+ if (!tm_active(tab->exporter.export_timer))
+ tm_start(tab->exporter.export_timer, tab->config->export_settle_time);
+}
+
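The append path in rte_announce() above publishes journal entries lock-free: a slot is fully written first, and only then is the block's end counter bumped with release ordering, pairing with the acquire load in rt_next_export_fast() below. The ordering contract, as an editor's illustration (not part of the diff):

    /* writer (rte_announce):
     *   rpeb->export[end] = (struct rt_pending_export) { ... };   -- fill the slot
     *   atomic_fetch_add_explicit(&rpeb->end, 1, memory_order_release);
     *
     * reader (rt_next_export_fast):
     *   u32 e = atomic_load_explicit(&rpeb->end, memory_order_acquire);
     *   -- every slot with index < e is guaranteed fully written
     *
     * The not_last flag provides the same release/acquire edge across
     * block boundaries. */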
+static struct rt_pending_export *
+rt_next_export_fast(struct rt_pending_export *last)
+{
+ /* Get the whole export block and find our position in there. */
+ struct rt_export_block *rpeb = PAGE_HEAD(last);
+ u32 pos = (last - &rpeb->export[0]);
+ u32 end = atomic_load_explicit(&rpeb->end, memory_order_acquire);
+ ASSERT_DIE(pos < end);
+
+ /* Next is in the same block. */
+ if (++pos < end)
+ return &rpeb->export[pos];
+
+ /* There is another block. */
+ if (atomic_load_explicit(&rpeb->not_last, memory_order_acquire))
+ {
+ /* This is OK to do non-atomically because of the not_last flag. */
+ rpeb = NODE_NEXT(rpeb);
+ return &rpeb->export[0];
+ }
+
+ /* There is nothing more. */
+ return NULL;
+}
+
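rt_next_export_fast() above recovers the enclosing rt_export_block from a pointer into it via PAGE_HEAD(). This works because alloc_page() returns page-aligned memory, so masking off the low bits of any interior pointer yields the block start. A hedged sketch of such a macro (the actual definition lives elsewhere in the tree and may differ):

    /* assumes page_size is a power of two and blocks are page-aligned */
    #define PAGE_HEAD(p) ((void *) ((uintptr_t) (p) & ~((uintptr_t) page_size - 1)))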
+static struct rt_pending_export *
+rt_next_export(struct rt_export_hook *hook, struct rt_exporter *tab)
+{
+ /* As the table is locked, it is safe to reload the last export pointer */
+ struct rt_pending_export *last = atomic_load_explicit(&hook->last_export, memory_order_acquire);
+
+ /* It is still valid, let's reuse it */
+ if (last)
+ return rt_next_export_fast(last);
+
+ /* No, therefore we must process the table's first pending export */
+ else
+ return tab->first;
+}
+
+static void
+rt_announce_exports(timer *tm)
+{
+ rtable *tab = tm->data;
+
+ struct rt_export_hook *c; node *n;
+ WALK_LIST2(c, n, tab->exporter.hooks, n)
+ {
+ if (atomic_load_explicit(&c->export_state, memory_order_acquire) != TES_READY)
continue;
- switch (eh->req->addr_mode)
- {
- case TE_ADDR_NONE:
- break;
+ ev_schedule_work(c->event);
+ }
+}
- case TE_ADDR_IN:
- if (!net_in_netX(net->n.addr, eh->req->addr))
- continue;
- break;
+static struct rt_pending_export *
+rt_last_export(struct rt_exporter *tab)
+{
+ struct rt_pending_export *rpe = NULL;
- case TE_ADDR_EQUAL:
- if (!net_equal(net->n.addr, eh->req->addr))
- continue;
- break;
+ if (!EMPTY_LIST(tab->pending))
+ {
+ /* We'll continue processing exports from this one on */
+ struct rt_export_block *reb = TAIL(tab->pending);
+ ASSERT_DIE(reb->end);
+ rpe = &reb->export[reb->end - 1];
+ }
- case TE_ADDR_FOR:
- bug("Continuos export of best prefix match not implemented yet.");
+ return rpe;
+}
- default:
- bug("Strange table export address mode: %d", eh->req->addr_mode);
+#define RT_EXPORT_BULK 1024
+
+static void
+rt_export_hook(void *_data)
+{
+ struct rt_export_hook *c = _data;
+
+ ASSERT_DIE(atomic_load_explicit(&c->export_state, memory_order_relaxed) == TES_READY);
+
+ if (!c->rpe_next)
+ {
+ c->rpe_next = rt_next_export(c, c->table);
+
+ if (!c->rpe_next)
+ {
+ CALL(c->table->used, c->table);
+ return;
}
+ }
- if (new)
- eh->stats.updates_received++;
- else
- eh->stats.withdraws_received++;
+ /* Process the export */
+ for (uint i=0; i<RT_EXPORT_BULK; i++)
+ {
+ rte_update_lock();
- if (eh->req->export_one)
- eh->req->export_one(eh->req, net->n.addr, &rpe);
- else if (eh->req->export_bulk)
- eh->req->export_bulk(eh->req, net->n.addr, &rpe, feed, count);
- else
- bug("Export request must always provide an export method");
+ rte_export(c, c->rpe_next);
+
+ if (!c->rpe_next)
+ break;
+
+ rte_update_unlock();
}
+
+ ev_schedule_work(c->event);
}
+
static inline int
rte_validate(struct channel *ch, rte *e)
{
@@ -1459,14 +1727,8 @@ rte_recalculate(struct rt_import_hook *c, net *net, rte *new, struct rte_src *sr
if (new_stored)
{
new_stored->rte.lastmod = current_time();
-
- if (!old)
- {
- new_stored->rte.id = hmap_first_zero(&table->id_map);
- hmap_set(&table->id_map, new_stored->rte.id);
- }
- else
- new_stored->rte.id = old->id;
+ new_stored->rte.id = hmap_first_zero(&table->id_map);
+ hmap_set(&table->id_map, new_stored->rte.id);
}
/* Log the route change */
@@ -1498,13 +1760,6 @@ rte_recalculate(struct rt_import_hook *c, net *net, rte *new, struct rte_src *sr
p->rte_insert(net, &new_stored->rte);
#endif
- if (old)
- {
- if (!new_stored)
- hmap_clear(&table->id_map, old->id);
-
- rte_free(old_stored);
- }
}
static int rte_update_nest_cnt; /* Nesting counter to allow recursive updates */
@@ -1711,16 +1966,19 @@ rt_export_stopped(void *data)
struct rt_export_hook *hook = data;
struct rt_exporter *tab = hook->table;
+ /* Drop pending exports */
+ CALL(tab->used, tab);
+
/* Unlist */
rem_node(&hook->n);
- /* Reporting the channel as stopped. */
+ /* Report the channel as stopped. */
hook->stopped(hook->req);
/* Reporting the hook as finished. */
CALL(tab->done, hook);
- /* Freeing the hook together with its coroutine. */
+ /* Free the hook together with its coroutine. */
rfree(hook->pool);
}
@@ -1738,7 +1996,7 @@ void
rt_set_export_state(struct rt_export_hook *hook, u8 state)
{
hook->last_state_change = current_time();
- hook->export_state = state;
+ atomic_store_explicit(&hook->export_state, state, memory_order_release);
if (hook->req->log_state_change)
hook->req->log_state_change(hook->req, state);
@@ -1784,7 +2042,6 @@ rt_table_export_start(struct rt_exporter *re, struct rt_export_request *req)
pool *p = rp_new(tab->rp, "Export hook");
struct rt_export_hook *hook = mb_allocz(p, sizeof(struct rt_export_hook));
hook->pool = p;
- hook->lp = lp_new_default(p);
/* stats zeroed by mb_allocz */
switch (req->addr_mode)
@@ -1829,6 +2086,12 @@ rt_request_export(struct rt_exporter *re, struct rt_export_request *req)
hook->req = req;
hook->table = re;
+ bmap_init(&hook->seq_map, hook->pool, 1024);
+
+ struct rt_pending_export *rpe = rt_last_export(hook->table);
+ DBG("store hook=%p last_export=%p seq=%lu\n", hook, rpe, rpe ? rpe->seq : 0);
+ atomic_store_explicit(&hook->last_export, rpe, memory_order_relaxed);
+
hook->n = (node) {};
add_tail(&re->hooks, &hook->n);
@@ -1842,7 +2105,7 @@ rt_table_export_stop(struct rt_export_hook *hook)
{
rtable *tab = SKIP_BACK(rtable, exporter, hook->table);
- if (hook->export_state != TES_FEEDING)
+ if (atomic_load_explicit(&hook->export_state, memory_order_relaxed) != TES_FEEDING)
return;
switch (hook->req->addr_mode)
@@ -2037,8 +2300,8 @@ rt_dump_hooks(rtable *tab)
{
eh->req->dump_req(eh->req);
debug(" Export hook %p requested by %p:"
- " refeed_pending=%u last_state_change=%t export_state=%u stopped=%p\n",
- eh, eh->req, eh->refeed_pending, eh->last_state_change, eh->export_state, eh->stopped);
+ " refeed_pending=%u last_state_change=%t export_state=%u\n",
+ eh, eh->req, eh->refeed_pending, eh->last_state_change, atomic_load_explicit(&eh->export_state, memory_order_relaxed));
}
debug("\n");
}
@@ -2091,6 +2354,20 @@ rt_schedule_prune(rtable *tab)
tab->prune_state |= 1;
}
+static void
+rt_export_used(struct rt_exporter *e)
+{
+ rtable *tab = SKIP_BACK(rtable, exporter, e);
+
+ if (config->table_debug)
+ log(L_TRACE "%s: Export cleanup requested", tab->name);
+
+ if (tab->export_used)
+ return;
+
+ tab->export_used = 1;
+ ev_schedule(tab->rt_event);
+}
static void
rt_event(void *ptr)
@@ -2099,6 +2376,9 @@ rt_event(void *ptr)
rt_lock_table(tab);
+ if (tab->export_used)
+ rt_export_cleanup(tab);
+
if (tab->hcu_scheduled)
rt_update_hostcache(tab);
@@ -2362,8 +2642,11 @@ rt_setup(pool *pp, struct rtable_config *cf)
.start = rt_table_export_start,
.stop = rt_table_export_stop,
.done = rt_table_export_done,
+ .used = rt_export_used,
};
+
init_list(&t->exporter.hooks);
+ init_list(&t->exporter.pending);
init_list(&t->imports);
@@ -2374,7 +2657,9 @@ rt_setup(pool *pp, struct rtable_config *cf)
t->rt_event = ev_new_init(p, rt_event, t);
t->prune_timer = tm_new_init(p, rt_prune_timer, t, 0, 0);
+ t->exporter.export_timer = tm_new_init(p, rt_announce_exports, t, 0, 0);
t->last_rt_change = t->gc_time = current_time();
+ t->exporter.next_seq = 1;
t->rl_pipe = (struct tbf) TBF_DEFAULT_LOG_LIMITS;
@@ -2488,7 +2773,7 @@ again:
}
}
- if (!n->routes) /* Orphaned FIB entry */
+ if (!n->routes && !n->first) /* Orphaned FIB entry */
{
FIB_ITERATE_PUT(fit);
fib_delete(&tab->fib, n);
@@ -2543,15 +2828,15 @@ again:
rt_prune_sources();
+ uint flushed_channels = 0;
+
/* Close flushed channels */
WALK_LIST2_DELSAFE(ih, n, x, tab->imports, n)
if (ih->import_state == TIS_FLUSHING)
{
- rt_set_import_state(ih, TIS_CLEARED);
- ih->stopped(ih->req);
- rem_node(&ih->n);
- mb_free(ih);
- rt_unlock_table(tab);
+ ih->flush_seq = tab->exporter.next_seq;
+ rt_set_import_state(ih, TIS_WAITING);
+ flushed_channels++;
}
else if (ih->stale_pruning != ih->stale_pruned)
{
@@ -2559,6 +2844,184 @@ again:
if (ih->req->trace_routes & D_STATES)
log(L_TRACE "%s: table prune after refresh end [%u]", ih->req->name, ih->stale_pruned);
}
+
+ /* In some cases, we may want to directly proceed to export cleanup */
+ if (EMPTY_LIST(tab->exporter.hooks) && flushed_channels)
+ rt_export_cleanup(tab);
+}
+
+static void
+rt_export_cleanup(rtable *tab)
+{
+ tab->export_used = 0;
+
+ u64 min_seq = ~((u64) 0);
+ struct rt_pending_export *last_export_to_free = NULL;
+ struct rt_pending_export *first = tab->exporter.first;
+
+ struct rt_export_hook *eh;
+ node *n;
+ WALK_LIST2(eh, n, tab->exporter.hooks, n)
+ {
+ switch (atomic_load_explicit(&eh->export_state, memory_order_acquire))
+ {
+ case TES_DOWN:
+ continue;
+
+ case TES_READY:
+ {
+ struct rt_pending_export *last = atomic_load_explicit(&eh->last_export, memory_order_acquire);
+ if (!last)
+ /* No last export means that the channel has exported nothing since last cleanup */
+ goto done;
+
+ else if (min_seq > last->seq)
+ {
+ min_seq = last->seq;
+ last_export_to_free = last;
+ }
+ continue;
+ }
+
+ default:
+ /* It's only safe to clean up when the export state is idle or regular. No feeding or stopping allowed. */
+ goto done;
+ }
+ }
+
+ tab->exporter.first = last_export_to_free ? rt_next_export_fast(last_export_to_free) : NULL;
+
+ if (config->table_debug)
+ log(L_TRACE "%s: Export cleanup, old exporter.first seq %lu, new %lu, min_seq %ld",
+ tab->name,
+ first ? first->seq : 0,
+ tab->exporter.first ? tab->exporter.first->seq : 0,
+ min_seq);
+
+ WALK_LIST2(eh, n, tab->exporter.hooks, n)
+ {
+ if (atomic_load_explicit(&eh->export_state, memory_order_acquire) != TES_READY)
+ continue;
+
+ struct rt_pending_export *last = atomic_load_explicit(&eh->last_export, memory_order_acquire);
+ if (last == last_export_to_free)
+ {
+ /* This may fail when the channel managed to export more in between. This is OK. */
+ atomic_compare_exchange_strong_explicit(
+ &eh->last_export, &last, NULL,
+ memory_order_release,
+ memory_order_relaxed);
+
+ DBG("store hook=%p last_export=NULL\n", eh);
+ }
+ }
+
+ while (first && (first->seq <= min_seq))
+ {
+ ASSERT_DIE(first->new || first->old);
+
+ const net_addr *n = first->new ?
+ first->new->rte.net :
+ first->old->rte.net;
+ net *net = SKIP_BACK(struct network, n.addr, (net_addr (*)[0]) n);
+
+ ASSERT_DIE(net->first == first);
+
+ if (first == net->last)
+ /* The only export here */
+ net->last = net->first = NULL;
+ else
+ /* First is now the next one */
+ net->first = atomic_load_explicit(&first->next, memory_order_relaxed);
+
+ /* For now, the old route may be finally freed */
+ if (first->old)
+ {
+ rt_rte_trace_in(D_ROUTES, first->old->rte.sender->req, &first->old->rte, "freed");
+ hmap_clear(&tab->id_map, first->old->rte.id);
+ rte_free(first->old);
+ }
+
+#ifdef LOCAL_DEBUG
+ memset(first, 0xbd, sizeof(struct rt_pending_export));
+#endif
+
+ struct rt_export_block *reb = HEAD(tab->exporter.pending);
+ ASSERT_DIE(reb == PAGE_HEAD(first));
+
+ u32 pos = (first - &reb->export[0]);
+ u32 end = atomic_load_explicit(&reb->end, memory_order_relaxed);
+ ASSERT_DIE(pos < end);
+
+ struct rt_pending_export *next = NULL;
+
+ if (++pos < end)
+ next = &reb->export[pos];
+ else
+ {
+ rem_node(&reb->n);
+
+#ifdef LOCAL_DEBUG
+ memset(reb, 0xbe, page_size);
+#endif
+
+ free_page(reb);
+
+ if (EMPTY_LIST(tab->exporter.pending))
+ {
+ if (config->table_debug)
+ log(L_TRACE "%s: Resetting export seq", tab->name);
+
+ node *n;
+ WALK_LIST2(eh, n, tab->exporter.hooks, n)
+ {
+ if (atomic_load_explicit(&eh->export_state, memory_order_acquire) != TES_READY)
+ continue;
+
+ ASSERT_DIE(atomic_load_explicit(&eh->last_export, memory_order_acquire) == NULL);
+ bmap_reset(&eh->seq_map, 1024);
+ }
+
+ tab->exporter.next_seq = 1;
+ }
+ else
+ {
+ reb = HEAD(tab->exporter.pending);
+ next = &reb->export[0];
+ }
+ }
+
+ first = next;
+ }
+
+done:;
+ struct rt_import_hook *ih; node *x;
+ _Bool imports_stopped = 0;
+ WALK_LIST2_DELSAFE(ih, n, x, tab->imports, n)
+ if (ih->import_state == TIS_WAITING)
+ if (!first || (first->seq >= ih->flush_seq))
+ {
+ ih->import_state = TIS_CLEARED;
+ ih->stopped(ih->req);
+ rem_node(&ih->n);
+ mb_free(ih);
+ rt_unlock_table(tab);
+ imports_stopped = 1;
+ }
+
+ if (tab->export_used)
+ ev_schedule(tab->rt_event);
+
+ if (imports_stopped)
+ {
+ if (config->table_debug)
+ log(L_TRACE "%s: Sources pruning routine requested", tab->name);
+
+ rt_prune_sources();
+ }
+
+ if (EMPTY_LIST(tab->exporter.pending) && tm_active(tab->exporter.export_timer))
+ tm_stop(tab->exporter.export_timer);
}
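rt_export_cleanup() above may free journal entries only up to the lowest sequence number that every ready export hook has already processed; anything newer must be kept for the slowest consumer. The core of that computation, distilled into an editor's sketch (state handling and early-exit paths simplified):

    u64 min_seq = ~((u64) 0);     /* start at UINT64_MAX */
    struct rt_export_hook *eh;
    node *n;
    WALK_LIST2(eh, n, tab->exporter.hooks, n)
    {
      struct rt_pending_export *last =
        atomic_load_explicit(&eh->last_export, memory_order_acquire);
      if (!last)
        return;                   /* some hook has exported nothing yet: free nothing */
      if (last->seq < min_seq)
        min_seq = last->seq;      /* the slowest consumer bounds the cleanup */
    }
    /* journal entries with seq <= min_seq may now be freed */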
/**
@@ -3040,6 +3503,11 @@ rt_next_hop_update_net(rtable *tab, net *n)
/* Replace the route in the list */
new->next = e->next;
*k = e = new;
+
+ /* Get a new ID for the route */
+ new->rte.lastmod = current_time();
+ new->rte.id = hmap_first_zero(&tab->id_map);
+ hmap_set(&tab->id_map, new->rte.id);
}
ASSERT_DIE(pos <= count);
@@ -3066,14 +3534,14 @@ rt_next_hop_update_net(rtable *tab, net *n)
for (int i=0; i<count; i++)
{
_Bool nb = (new == updates[i].new), ob = (old_best == updates[i].old);
- const char *best_indicator[2][2] = { { "updated", "updated [-best]" }, { "updated [+best]", "updated [best]" } };
+ const char *best_indicator[2][2] = {
+ { "autoupdated", "autoupdated [-best]" },
+ { "autoupdated [+best]", "autoupdated [best]" }
+ };
rt_rte_trace_in(D_ROUTES, updates[i].new->rte.sender->req, &updates[i].new->rte, best_indicator[nb][ob]);
rte_announce_i(tab, n, updates[i].new, updates[i].old, new, old_best);
}
- for (int i=0; i<count; i++)
- rte_free(updates[i].old);
-
return count;
}
@@ -3252,6 +3720,16 @@ rt_commit(struct config *new, struct config *old)
DBG("\tdone\n");
}
+static void
+rt_feed_done(struct rt_export_hook *c)
+{
+ c->event->hook = rt_export_hook;
+
+ rt_set_export_state(c, TES_READY);
+
+ ev_schedule_work(c->event);
+}
+
/**
* rt_feed_by_fib - advertise all routes to a channel by walking a fib
* @c: channel to be fed
@@ -3269,7 +3747,7 @@ rt_feed_by_fib(void *data)
struct fib_iterator *fit = &c->feed_fit;
int max_feed = 256;
- ASSERT(c->export_state == TES_FEEDING);
+ ASSERT(atomic_load_explicit(&c->export_state, memory_order_relaxed) == TES_FEEDING);
rtable *tab = SKIP_BACK(rtable, exporter, c->table);
@@ -3282,14 +3760,15 @@ rt_feed_by_fib(void *data)
return;
}
- ASSERT(c->export_state == TES_FEEDING);
+ if (atomic_load_explicit(&c->export_state, memory_order_acquire) != TES_FEEDING)
+ return;
if ((c->req->addr_mode == TE_ADDR_NONE) || net_in_netX(n->n.addr, c->req->addr))
max_feed -= rt_feed_net(c, n);
}
FIB_ITERATE_END;
- rt_set_export_state(c, TES_READY);
+ rt_feed_done(c);
}
static void
@@ -3303,7 +3782,7 @@ rt_feed_by_trie(void *data)
int max_feed = 256;
- ASSERT_DIE(c->export_state == TES_FEEDING);
+ ASSERT(atomic_load_explicit(&c->export_state, memory_order_relaxed) == TES_FEEDING);
net_addr addr;
while (trie_walk_next(ws, &addr))
@@ -3315,7 +3794,8 @@ rt_feed_by_trie(void *data)
if ((max_feed -= rt_feed_net(c, n)) <= 0)
return;
- ASSERT_DIE(c->export_state == TES_FEEDING);
+ if (atomic_load_explicit(&c->export_state, memory_order_acquire) != TES_FEEDING)
+ return;
}
rt_unlock_trie(tab, c->walk_lock);
@@ -3324,7 +3804,7 @@ rt_feed_by_trie(void *data)
mb_free(c->walk_state);
c->walk_state = NULL;
- rt_set_export_state(c, TES_READY);
+ rt_feed_done(c);
}
static void
@@ -3333,14 +3813,14 @@ rt_feed_equal(void *data)
struct rt_export_hook *c = data;
rtable *tab = SKIP_BACK(rtable, exporter, c->table);
- ASSERT_DIE(c->export_state == TES_FEEDING);
+ ASSERT_DIE(atomic_load_explicit(&c->export_state, memory_order_relaxed) == TES_FEEDING);
ASSERT_DIE(c->req->addr_mode == TE_ADDR_EQUAL);
net *n = net_find(tab, c->req->addr);
if (n)
rt_feed_net(c, n);
- rt_set_export_state(c, TES_READY);
+ rt_feed_done(c);
}
static void
@@ -3349,52 +3829,53 @@ rt_feed_for(void *data)
struct rt_export_hook *c = data;
rtable *tab = SKIP_BACK(rtable, exporter, c->table);
- ASSERT_DIE(c->export_state == TES_FEEDING);
+ ASSERT_DIE(atomic_load_explicit(&c->export_state, memory_order_relaxed) == TES_FEEDING);
ASSERT_DIE(c->req->addr_mode == TE_ADDR_FOR);
net *n = net_route(tab, c->req->addr);
if (n)
rt_feed_net(c, n);
- rt_set_export_state(c, TES_READY);
+ rt_feed_done(c);
}
static uint
rt_feed_net(struct rt_export_hook *c, net *n)
{
- if (c->req->export_bulk)
- {
- uint count = rte_feed_count(n);
- if (count)
- {
- rte_update_lock();
- rte **feed = alloca(count * sizeof(rte *));
- rte_feed_obtain(n, feed, count);
- struct rt_pending_export rpe = { .new_best = n->routes };
- c->req->export_bulk(c->req, n->n.addr, &rpe, feed, count);
- rte_update_unlock();
- }
- return count;
- }
+ uint count = 0;
- if (n->routes && rte_is_valid(&n->routes->rte))
- {
- rte_update_lock();
- struct rt_pending_export rpe = { .new = n->routes, .new_best = n->routes };
- c->req->export_one(c->req, n->n.addr, &rpe);
- rte_update_unlock();
- return 1;
- }
+ if (c->req->export_bulk)
+ {
+ count = rte_feed_count(n);
+ if (count)
+ {
+ rte_update_lock();
+ rte **feed = alloca(count * sizeof(rte *));
+ rte_feed_obtain(n, feed, count);
+ c->req->export_bulk(c->req, n->n.addr, NULL, feed, count);
+ rte_update_unlock();
+ }
+ }
- return 0;
-}
+ else if (n->routes)
+ {
+ rte_update_lock();
+ struct rt_pending_export rpe = { .new = n->routes, .new_best = n->routes };
+ c->req->export_one(c->req, n->n.addr, &rpe);
+ rte_update_unlock();
+ count = 1;
+ }
+ for (struct rt_pending_export *rpe = n->first; rpe; rpe = rpe_next(rpe, NULL))
+ rpe_mark_seen(c, rpe);
+
+ return count;
+}
/*
* Import table
*/
-
void channel_reload_export_bulk(struct rt_export_request *req, const net_addr *net, struct rt_pending_export *rpe UNUSED, rte **feed, uint count)
{
struct channel *c = SKIP_BACK(struct channel, reload_req, req);