diff options
Diffstat (limited to 'proto/aggregator/aggregator.c')
-rw-r--r-- | proto/aggregator/aggregator.c | 731 |
1 files changed, 731 insertions, 0 deletions
diff --git a/proto/aggregator/aggregator.c b/proto/aggregator/aggregator.c new file mode 100644 index 00000000..9d95c7db --- /dev/null +++ b/proto/aggregator/aggregator.c @@ -0,0 +1,731 @@ +/* + * BIRD Internet Routing Daemon -- Route aggregation + * + * (c) 2023--2023 Igor Putovny <igor.putovny@nic.cz> + * (c) 2023 CZ.NIC, z.s.p.o. + * + * Can be freely distributed and used under the terms of the GNU GPL. + */ + +/** + * DOC: Route aggregation + * + * This is an implementation of route aggregation functionality. + * It enables the user to specify a set of route attributes in the configuration file + * and then, for a given destination (net), aggregate routes with the same + * values of these attributes into a single multi-path route. + * + * Structure &channel contains pointer to aggregation list which is represented + * by &aggr_list_linearized. In rt_notify_aggregated(), attributes from this + * list are evaluated for every route of a given net and results are stored + * in &rte_val_list which contains pointer to this route and array of &f_val. + * Array of pointers to &rte_val_list entries is sorted using + * sort_rte_val_list(). For comparison of &f_val structures, val_compare() + * is used. Comparator function is written so that sorting is stable. If all + * attributes have the same values, routes are compared by their global IDs. + * + * After sorting, &rte_val_list entries containing equivalent routes will be + * adjacent to each other. Function process_rte_list() iterates through these + * entries to identify sequences of equivalent routes. New route will be + * created for each such sequence, even if only from a single route. + * Only attributes from the aggregation list will be set for the new route. + * New &rta is created and prepare_rta() is used to copy static and dynamic + * attributes to new &rta from &rta of the original route. New route is created + * by create_merged_rte() from new &rta and exported to the routing table. 
+ */ + +#undef LOCAL_DEBUG + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + +#include "nest/bird.h" +#include "nest/iface.h" +#include "filter/filter.h" +#include "aggregator.h" + +#include <stdlib.h> +/* +#include "nest/route.h" +#include "nest/iface.h" +#include "lib/resource.h" +#include "lib/event.h" +#include "lib/timer.h" +#include "lib/string.h" +#include "conf/conf.h" +#include "filter/filter.h" +#include "filter/data.h" +#include "lib/hash.h" +#include "lib/string.h" +#include "lib/alloca.h" +#include "lib/flowspec.h" +*/ + +/* Context of &f_val comparison. */ +struct cmp_ctx { + const struct aggregator_proto *p; + const struct network *net; + const int val_count; + u32 failed:1; +}; + +extern linpool *rte_update_pool; + +/* + * Set static attribute in @rta from static attribute in @old according to @sa. + */ +static void +rta_set_static_attr(struct rta *rta, const struct rta *old, struct f_static_attr sa) +{ + switch (sa.sa_code) + { + case SA_NET: + break; + + case SA_FROM: + rta->from = old->from; + break; + + case SA_GW: + rta->dest = RTD_UNICAST; + rta->nh.gw = old->nh.gw; + rta->nh.iface = old->nh.iface; + rta->nh.next = NULL; + rta->hostentry = NULL; + rta->nh.labels = 0; + break; + + case SA_SCOPE: + rta->scope = old->scope; + break; + + case SA_DEST: + rta->dest = old->dest; + rta->nh.gw = IPA_NONE; + rta->nh.iface = NULL; + rta->nh.next = NULL; + rta->hostentry = NULL; + rta->nh.labels = 0; + break; + + case SA_IFNAME: + rta->dest = RTD_UNICAST; + rta->nh.gw = IPA_NONE; + rta->nh.iface = old->nh.iface; + rta->nh.next = NULL; + rta->hostentry = NULL; + rta->nh.labels = 0; + break; + + case SA_GW_MPLS: + rta->nh.labels = old->nh.labels; + memcpy(&rta->nh.label, &old->nh.label, sizeof(u32) * old->nh.labels); + break; + + case SA_WEIGHT: + rta->nh.weight = old->nh.weight; + break; + + case SA_PREF: + rta->pref = old->pref; + break; + + default: + bug("Invalid static attribute access (%u/%u)", sa.f_type, sa.sa_code); + } +} + +/* + * Compare list 
of &f_val entries. + * @count: number of &f_val entries + */ +static int +same_val_list(const struct f_val *v1, const struct f_val *v2, uint len) +{ + for (uint i = 0; i < len; i++) + if (!val_same(&v1[i], &v2[i])) + return 0; + + return 1; +} + +/* + * Create and export new merged route. + * @old: first route in a sequence of equivalent routes that are to be merged + * @rte_val: first element in a sequence of equivalent rte_val_list entries + * @length: number of equivalent routes that are to be merged (at least 1) + * @ail: aggregation list + */ +static void +aggregator_bucket_update(struct aggregator_proto *p, struct aggregator_bucket *bucket, struct network *net) +{ + /* Empty bucket */ + if (!bucket->rte) + { + rte_update2(p->dst, net->n.addr, NULL, bucket->last_src); + bucket->last_src = NULL; + return; + } + + /* Allocate RTA and EA list */ + struct rta *rta = allocz(rta_size(bucket->rte->attrs)); + rta->dest = RTD_UNREACHABLE; + rta->source = RTS_AGGREGATED; + rta->scope = SCOPE_UNIVERSE; + + struct ea_list *eal = allocz(sizeof(struct ea_list) + sizeof(struct eattr) * p->aggr_on_da_count); + eal->next = NULL; + eal->count = 0; + rta->eattrs = eal; + + /* Seed the attributes from aggregator rule */ + for (uint i = 0; i < p->aggr_on_count; i++) + { + if (p->aggr_on[i].type == AGGR_ITEM_DYNAMIC_ATTR) + { + u32 ea_code = p->aggr_on[i].da.ea_code; + const struct eattr *e = ea_find(bucket->rte->attrs->eattrs, ea_code); + + if (e) + eal->attrs[eal->count++] = *e; + } + else if (p->aggr_on[i].type == AGGR_ITEM_STATIC_ATTR) + rta_set_static_attr(rta, bucket->rte->attrs, p->aggr_on[i].sa); + } + + struct rte *new = rte_get_temp(rta, bucket->rte->src); + new->net = net; + + /* + log("=============== CREATE MERGED ROUTE ==============="); + log("New route created: id = %d, protocol: %s", new->src->global_id, new->src->proto->name); + log("==================================================="); + */ + + /* merge filter needs one argument called "routes" */ + struct f_val 
val = { + .type = T_ROUTES_BLOCK, + .val.rte = bucket->rte, + }; + + /* Actually run the filter */ + enum filter_return fret = f_eval_rte(p->merge_by, &new, rte_update_pool, 1, &val, 0); + + /* Src must be stored now, rte_update2() may return new */ + struct rte_src *new_src = new ? new->src : NULL; + + /* Finally import the route */ + switch (fret) + { + /* Pass the route to the protocol */ + case F_ACCEPT: + rte_update2(p->dst, net->n.addr, new, bucket->last_src ?: new->src); + break; + + /* Something bad happened */ + default: + ASSERT_DIE(fret == F_ERROR); + /* fall through */ + + /* We actually don't want this route */ + case F_REJECT: + if (bucket->last_src) + rte_update2(p->dst, net->n.addr, NULL, bucket->last_src); + break; + } + + /* Switch source lock for bucket->last_src */ + if (bucket->last_src != new_src) + { + if (new_src) + rt_lock_source(new_src); + if (bucket->last_src) + rt_unlock_source(bucket->last_src); + + bucket->last_src = new_src; + } +} + +/* + * Reload all the buckets on reconfiguration if merge filter has changed. + * TODO: make this splitted + */ +static void +aggregator_reload_buckets(void *data) +{ + struct aggregator_proto *p = data; + + HASH_WALK(p->buckets, next_hash, b) + if (b->rte) + aggregator_bucket_update(p, b, b->rte->net); + HASH_WALK_END; +} + + +/* + * Evaluate static attribute of @rt1 according to @sa + * and store result in @pos. 
+ */ +static void +eval_static_attr(const struct rte *rt1, struct f_static_attr sa, struct f_val *pos) +{ + const struct rta *rta = rt1->attrs; + +#define RESULT(_type, value, result) \ + do { \ + pos->type = _type; \ + pos->val.value = result; \ + } while (0) + + switch (sa.sa_code) + { + case SA_NET: RESULT(sa.f_type, net, rt1->net->n.addr); break; + case SA_FROM: RESULT(sa.f_type, ip, rta->from); break; + case SA_GW: RESULT(sa.f_type, ip, rta->nh.gw); break; + case SA_PROTO: RESULT(sa.f_type, s, rt1->src->proto->name); break; + case SA_SOURCE: RESULT(sa.f_type, i, rta->source); break; + case SA_SCOPE: RESULT(sa.f_type, i, rta->scope); break; + case SA_DEST: RESULT(sa.f_type, i, rta->dest); break; + case SA_IFNAME: RESULT(sa.f_type, s, rta->nh.iface ? rta->nh.iface->name : ""); break; + case SA_IFINDEX: RESULT(sa.f_type, i, rta->nh.iface ? rta->nh.iface->index : 0); break; + case SA_WEIGHT: RESULT(sa.f_type, i, rta->nh.weight + 1); break; + case SA_PREF: RESULT(sa.f_type, i, rta->pref); break; + case SA_GW_MPLS: RESULT(sa.f_type, i, rta->nh.labels ? rta->nh.label[0] : MPLS_NULL); break; + default: + bug("Invalid static attribute access (%u/%u)", sa.f_type, sa.sa_code); + } + +#undef RESULT +} + +/* + * Evaluate dynamic attribute of @rt1 according to @da + * and store result in @pos. 
+ */ +static void +eval_dynamic_attr(const struct rte *rt1, struct f_dynamic_attr da, struct f_val *pos) +{ + const struct rta *rta = rt1->attrs; + const struct eattr *e = ea_find(rta->eattrs, da.ea_code); + +#define RESULT(_type, value, result) \ + do { \ + pos->type = _type; \ + pos->val.value = result; \ + } while (0) + +#define RESULT_VOID \ + do { \ + pos->type = T_VOID; \ + } while (0) + + if (!e) + { + /* A special case: undefined as_path looks like empty as_path */ + if (da.type == EAF_TYPE_AS_PATH) + { + RESULT(T_PATH, ad, &null_adata); + return; + } + + /* The same special case for int_set */ + if (da.type == EAF_TYPE_INT_SET) + { + RESULT(T_CLIST, ad, &null_adata); + return; + } + + /* The same special case for ec_set */ + if (da.type == EAF_TYPE_EC_SET) + { + RESULT(T_ECLIST, ad, &null_adata); + return; + } + + /* The same special case for lc_set */ + if (da.type == EAF_TYPE_LC_SET) + { + RESULT(T_LCLIST, ad, &null_adata); + return; + } + + /* Undefined value */ + RESULT_VOID; + return; + } + + switch (e->type & EAF_TYPE_MASK) + { + case EAF_TYPE_INT: + RESULT(da.f_type, i, e->u.data); + break; + case EAF_TYPE_ROUTER_ID: + RESULT(T_QUAD, i, e->u.data); + break; + case EAF_TYPE_OPAQUE: + RESULT(T_ENUM_EMPTY, i, 0); + break; + case EAF_TYPE_IP_ADDRESS: + RESULT(T_IP, ip, *((ip_addr *) e->u.ptr->data)); + break; + case EAF_TYPE_AS_PATH: + RESULT(T_PATH, ad, e->u.ptr); + break; + case EAF_TYPE_BITFIELD: + RESULT(T_BOOL, i, !!(e->u.data & (1u << da.bit))); + break; + case EAF_TYPE_INT_SET: + RESULT(T_CLIST, ad, e->u.ptr); + break; + case EAF_TYPE_EC_SET: + RESULT(T_ECLIST, ad, e->u.ptr); + break; + case EAF_TYPE_LC_SET: + RESULT(T_LCLIST, ad, e->u.ptr); + break; + default: + bug("Unknown dynamic attribute type"); + } + +#undef RESULT +#undef RESULT_VOID +} + +static inline u32 aggr_route_hash(const rte *e) +{ + struct { + net *net; + struct rte_src *src; + } obj = { + .net = e->net, + .src = e->src, + }; + + return mem_hash(&obj, sizeof obj); +} + +#define 
AGGR_RTE_KEY(n) (&(n)->rte) +#define AGGR_RTE_NEXT(n) ((n)->next_hash) +#define AGGR_RTE_EQ(a,b) (((a)->src == (b)->src) && ((a)->net == (b)->net)) +#define AGGR_RTE_FN(_n) aggr_route_hash(_n) +#define AGGR_RTE_ORDER 4 /* Initial */ + +#define AGGR_RTE_REHASH aggr_rte_rehash +#define AGGR_RTE_PARAMS /8, *2, 2, 2, 4, 24 + +HASH_DEFINE_REHASH_FN(AGGR_RTE, struct aggregator_route); + + +#define AGGR_BUCK_KEY(n) (n) +#define AGGR_BUCK_NEXT(n) ((n)->next_hash) +#define AGGR_BUCK_EQ(a,b) (((a)->hash == (b)->hash) && (same_val_list((a)->aggr_data, (b)->aggr_data, p->aggr_on_count))) +#define AGGR_BUCK_FN(n) ((n)->hash) +#define AGGR_BUCK_ORDER 4 /* Initial */ + +#define AGGR_BUCK_REHASH aggr_buck_rehash +#define AGGR_BUCK_PARAMS /8, *2, 2, 2, 4, 24 + +HASH_DEFINE_REHASH_FN(AGGR_BUCK, struct aggregator_bucket); + + +#define AGGR_DATA_MEMSIZE (sizeof(struct f_val) * p->aggr_on_count) + +static void +aggregator_rt_notify(struct proto *P, struct channel *src_ch, net *net, rte *new, rte *old) +{ + struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, P); + ASSERT_DIE(src_ch == p->src); + struct aggregator_bucket *new_bucket = NULL, *old_bucket = NULL; + struct aggregator_route *old_route = NULL; + + /* Find the objects for the old route */ + if (old) + old_route = HASH_FIND(p->routes, AGGR_RTE, old); + + if (old_route) + old_bucket = old_route->bucket; + + /* Find the bucket for the new route */ + if (new) + { + /* Routes are identical, do nothing */ + if (old_route && rte_same(&old_route->rte, new)) + return; + + /* Evaluate route attributes. 
*/ + struct aggregator_bucket *tmp_bucket = sl_allocz(p->bucket_slab); + + for (uint val_idx = 0; val_idx < p->aggr_on_count; val_idx++) + { + int type = p->aggr_on[val_idx].type; + + switch (type) + { + case AGGR_ITEM_TERM: { + const struct f_line *line = p->aggr_on[val_idx].line; + struct rte *rt1 = new; + enum filter_return fret = f_eval_rte(line, &new, rte_update_pool, 0, NULL, &tmp_bucket->aggr_data[val_idx]); + + if (rt1 != new) + { + rte_free(rt1); + log(L_WARN "Aggregator rule modifies the route, reverting"); + } + + if (fret > F_RETURN) + log(L_WARN "%s.%s: Wrong number of items left on stack after evaluation of aggregation list", rt1->src->proto->name, rt1->sender); + + break; + } + + case AGGR_ITEM_STATIC_ATTR: { + struct f_val *pos = &tmp_bucket->aggr_data[val_idx]; + eval_static_attr(new, p->aggr_on[val_idx].sa, pos); + break; + } + + case AGGR_ITEM_DYNAMIC_ATTR: { + struct f_val *pos = &tmp_bucket->aggr_data[val_idx]; + eval_dynamic_attr(new, p->aggr_on[val_idx].da, pos); + break; + } + + default: + break; + } + } + + /* Compute the hash */ + tmp_bucket->hash = mem_hash(tmp_bucket->aggr_data, AGGR_DATA_MEMSIZE); + + /* Find the existing bucket */ + if (new_bucket = HASH_FIND(p->buckets, AGGR_BUCK, tmp_bucket)) + sl_free(tmp_bucket); + else + { + new_bucket = tmp_bucket; + HASH_INSERT2(p->buckets, AGGR_BUCK, p->p.pool, new_bucket); + } + + /* Store the route attributes */ + if (rta_is_cached(new->attrs)) + rta_clone(new->attrs); + else + new->attrs = rta_lookup(new->attrs); + + /* Insert the new route into the bucket */ + struct aggregator_route *arte = sl_alloc(p->route_slab); + *arte = (struct aggregator_route) { + .bucket = new_bucket, + .rte = *new, + }; + arte->rte.next = new_bucket->rte, + new_bucket->rte = &arte->rte; + new_bucket->count++; + HASH_INSERT2(p->routes, AGGR_RTE, p->p.pool, arte); + } + + /* Remove the old route from its bucket */ + if (old_bucket) + { + for (struct rte **k = &old_bucket->rte; *k; k = &(*k)->next) + if (*k == 
&old_route->rte) + { + *k = (*k)->next; + break; + } + + old_bucket->count--; + HASH_REMOVE2(p->routes, AGGR_RTE, p->p.pool, old_route); + rta_free(old_route->rte.attrs); + sl_free(old_route); + } + + /* Announce changes */ + if (old_bucket) + aggregator_bucket_update(p, old_bucket, net); + + if (new_bucket && (new_bucket != old_bucket)) + aggregator_bucket_update(p, new_bucket, net); + + /* Cleanup the old bucket if empty */ + if (old_bucket && (!old_bucket->rte || !old_bucket->count)) + { + ASSERT_DIE(!old_bucket->rte && !old_bucket->count); + HASH_REMOVE2(p->buckets, AGGR_BUCK, p->p.pool, old_bucket); + sl_free(old_bucket); + } +} + +static int +aggregator_preexport(struct channel *C, struct rte *new) +{ + struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, C->proto); + /* Reject our own routes */ + if (new->sender == p->dst) + return -1; + + /* Disallow aggregating already aggregated routes */ + if (new->attrs->source == RTS_AGGREGATED) + { + log(L_ERR "Multiple aggregations of the same route not supported in BIRD 2."); + return -1; + } + + return 0; +} + +static void +aggregator_postconfig(struct proto_config *CF) +{ + struct aggregator_config *cf = SKIP_BACK(struct aggregator_config, c, CF); + + if (!cf->dst->table) + cf_error("Source table not specified"); + + if (!cf->src->table) + cf_error("Destination table not specified"); + + if (cf->dst->table->addr_type != cf->src->table->addr_type) + cf_error("Both tables must be of the same type"); + + cf->dst->in_filter = cf->src->in_filter; + + cf->src->in_filter = FILTER_REJECT; + cf->dst->out_filter = FILTER_REJECT; + + cf->dst->debug = cf->src->debug; +} + +static struct proto * +aggregator_init(struct proto_config *CF) +{ + struct proto *P = proto_new(CF); + struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, P); + struct aggregator_config *cf = SKIP_BACK(struct aggregator_config, c, CF); + + proto_configure_channel(P, &p->src, cf->src); + proto_configure_channel(P, &p->dst, 
cf->dst); + + p->aggr_on_count = cf->aggr_on_count; + p->aggr_on_da_count = cf->aggr_on_da_count; + p->aggr_on = cf->aggr_on; + p->merge_by = cf->merge_by; + + P->rt_notify = aggregator_rt_notify; + P->preexport = aggregator_preexport; + + return P; +} + +static int +aggregator_start(struct proto *P) +{ + struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, P); + + p->bucket_slab = sl_new(P->pool, sizeof(struct aggregator_bucket) + AGGR_DATA_MEMSIZE); + HASH_INIT(p->buckets, P->pool, AGGR_BUCK_ORDER); + + p->route_slab = sl_new(P->pool, sizeof(struct aggregator_route)); + HASH_INIT(p->routes, P->pool, AGGR_RTE_ORDER); + + p->reload_buckets = (event) { + .hook = aggregator_reload_buckets, + .data = p, + }; + + return PS_UP; +} + +static int +aggregator_shutdown(struct proto *P) +{ + struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, P); + + HASH_WALK_DELSAFE(p->buckets, next_hash, b) + { + while (b->rte) + { + struct aggregator_route *arte = SKIP_BACK(struct aggregator_route, rte, b->rte); + b->rte = arte->rte.next; + b->count--; + HASH_REMOVE(p->routes, AGGR_RTE, arte); + rta_free(arte->rte.attrs); + sl_free(arte); + } + + ASSERT_DIE(b->count == 0); + HASH_REMOVE(p->buckets, AGGR_BUCK, b); + sl_free(b); + } + HASH_WALK_END; + + return PS_DOWN; +} + +static int +aggregator_reconfigure(struct proto *P, struct proto_config *CF) +{ + struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, P); + struct aggregator_config *cf = SKIP_BACK(struct aggregator_config, c, CF); + + TRACE(D_EVENTS, "Reconfiguring"); + + /* Compare numeric values (shortcut) */ + if (cf->aggr_on_count != p->aggr_on_count) + return 0; + + if (cf->aggr_on_da_count != p->aggr_on_da_count) + return 0; + + /* Compare aggregator rule */ + for (uint i = 0; i < p->aggr_on_count; i++) + switch (cf->aggr_on[i].type) + { + case AGGR_ITEM_TERM: + if (!f_same(cf->aggr_on[i].line, p->aggr_on[i].line)) + return 0; + break; + case AGGR_ITEM_STATIC_ATTR: + if 
(memcmp(&cf->aggr_on[i].sa, &p->aggr_on[i].sa, sizeof(struct f_static_attr)) != 0) + return 0; + break; + case AGGR_ITEM_DYNAMIC_ATTR: + if (memcmp(&cf->aggr_on[i].da, &p->aggr_on[i].da, sizeof(struct f_dynamic_attr)) != 0) + return 0; + break; + default: + bug("Broken aggregator rule"); + } + + /* Compare merge filter */ + if (!f_same(cf->merge_by, p->merge_by)) + ev_schedule(&p->reload_buckets); + + p->aggr_on = cf->aggr_on; + p->merge_by = cf->merge_by; + + return 1; +} + +struct protocol proto_aggregator = { + .name = "Aggregator", + .template = "aggregator%d", + .class = PROTOCOL_AGGREGATOR, + .preference = 1, + .channel_mask = NB_ANY, + .proto_size = sizeof(struct aggregator_proto), + .config_size = sizeof(struct aggregator_config), + .postconfig = aggregator_postconfig, + .init = aggregator_init, + .start = aggregator_start, + .shutdown = aggregator_shutdown, + .reconfigure = aggregator_reconfigure, +}; + +void +aggregator_build(void) +{ + proto_build(&proto_aggregator); +} |