summaryrefslogtreecommitdiff
path: root/nest/proto.c
diff options
context:
space:
mode:
Diffstat (limited to 'nest/proto.c')
-rw-r--r--nest/proto.c158
1 files changed, 158 insertions, 0 deletions
diff --git a/nest/proto.c b/nest/proto.c
index 7b359152..6bd2427b 100644
--- a/nest/proto.c
+++ b/nest/proto.c
@@ -48,6 +48,7 @@ static char *e_states[] = { "DOWN", "FEEDING", "READY" };
extern struct protocol proto_unix_iface;
+static void channel_request_reload(struct channel *c);
static void proto_shutdown_loop(timer *);
static void proto_rethink_goal(struct proto *p);
static char *proto_state_name(struct proto *p);
@@ -180,6 +181,8 @@ proto_add_channel(struct proto *p, struct channel_config *cf)
c->last_state_change = current_time();
c->reloadable = 1;
+ init_list(&c->roa_subscriptions);
+
CALL(c->channel->init, c, cf);
add_tail(&p->channels, &c->n);
@@ -256,10 +259,15 @@ channel_feed_loop(void *ptr)
if (c->export_state != ES_FEEDING)
return;
+ /* Start feeding */
if (!c->feed_active)
+ {
if (c->proto->feed_begin)
c->proto->feed_begin(c, !c->refeeding);
+ c->refeed_pending = 0;
+ }
+
// DBG("Feeding protocol %s continued\n", p->name);
if (!rt_feed_channel(c))
{
@@ -289,10 +297,133 @@ channel_feed_loop(void *ptr)
if (c->proto->feed_end)
c->proto->feed_end(c);
+
+ /* Restart feeding */
+ if (c->refeed_pending)
+ channel_request_feeding(c);
+}
+
+
+static void
+channel_roa_in_changed(struct rt_subscription *s)
+{
+ struct channel *c = s->data;
+ int active = c->reload_event && ev_active(c->reload_event);
+
+ CD(c, "Reload triggered by RPKI change%s", active ? " - already active" : "");
+
+ if (!active)
+ channel_request_reload(c);
+ else
+ c->reload_pending = 1;
+}
+
+static void
+channel_roa_out_changed(struct rt_subscription *s)
+{
+ struct channel *c = s->data;
+ int active = (c->export_state == ES_FEEDING);
+
+ CD(c, "Feeding triggered by RPKI change%s", active ? " - already active" : "");
+
+ if (!active)
+ channel_request_feeding(c);
+ else
+ c->refeed_pending = 1;
+}
+
+/* Temporary code, subscriptions should be changed to resources */
+struct roa_subscription {
+ struct rt_subscription s;
+ node roa_node;
+};
+
+static int
+channel_roa_is_subscribed(struct channel *c, rtable *tab, int dir)
+{
+ void (*hook)(struct rt_subscription *) =
+ dir ? channel_roa_in_changed : channel_roa_out_changed;
+
+ struct roa_subscription *s;
+ node *n;
+
+ WALK_LIST2(s, n, c->roa_subscriptions, roa_node)
+ if ((s->s.tab == tab) && (s->s.hook == hook))
+ return 1;
+
+ return 0;
}
static void
+channel_roa_subscribe(struct channel *c, rtable *tab, int dir)
+{
+ if (channel_roa_is_subscribed(c, tab, dir))
+ return;
+
+ struct roa_subscription *s = mb_allocz(c->proto->pool, sizeof(struct roa_subscription));
+
+ s->s.hook = dir ? channel_roa_in_changed : channel_roa_out_changed;
+ s->s.data = c;
+ rt_subscribe(tab, &s->s);
+
+ add_tail(&c->roa_subscriptions, &s->roa_node);
+}
+
+static void
+channel_roa_unsubscribe(struct roa_subscription *s)
+{
+ rt_unsubscribe(&s->s);
+ rem_node(&s->roa_node);
+ mb_free(s);
+}
+
+static void
+channel_roa_subscribe_filter(struct channel *c, int dir)
+{
+ const struct filter *f = dir ? c->in_filter : c->out_filter;
+ struct rtable *tab;
+
+ if ((f == FILTER_ACCEPT) || (f == FILTER_REJECT))
+ return;
+
+ struct filter_iterator fit;
+ FILTER_ITERATE_INIT(&fit, f, c->proto->pool);
+
+ FILTER_ITERATE(&fit, fi)
+ {
+ switch (fi->fi_code)
+ {
+ case FI_ROA_CHECK_IMPLICIT:
+ tab = fi->i_FI_ROA_CHECK_IMPLICIT.rtc->table;
+ channel_roa_subscribe(c, tab, dir);
+ break;
+
+ case FI_ROA_CHECK_EXPLICIT:
+ tab = fi->i_FI_ROA_CHECK_EXPLICIT.rtc->table;
+ channel_roa_subscribe(c, tab, dir);
+ break;
+
+ default:
+ break;
+ }
+ }
+ FILTER_ITERATE_END;
+
+ FILTER_ITERATE_CLEANUP(&fit);
+}
+
+static void
+channel_roa_unsubscribe_all(struct channel *c)
+{
+ struct roa_subscription *s;
+ node *n, *x;
+
+ WALK_LIST2_DELSAFE(s, n, x, c->roa_subscriptions, roa_node)
+ channel_roa_unsubscribe(s);
+}
+
+static void
channel_start_export(struct channel *c)
{
ASSERT(c->channel_state == CS_UP);
@@ -329,11 +460,19 @@ channel_reload_loop(void *ptr)
{
struct channel *c = ptr;
+ /* Start reload */
+ if (!c->reload_active)
+ c->reload_pending = 0;
+
if (!rt_reload_channel(c))
{
ev_schedule(c->reload_event);
return;
}
+
+ /* Restart reload */
+ if (c->reload_pending)
+ channel_request_reload(c);
}
static void
@@ -400,6 +539,14 @@ channel_do_start(struct channel *c)
}
static void
+channel_do_up(struct channel *c)
+{
+ /* Register RPKI/ROA subscriptions */
+ channel_roa_subscribe_filter(c, 1);
+ channel_roa_subscribe_filter(c, 0);
+}
+
+static void
channel_do_flush(struct channel *c)
{
rt_schedule_prune(c->table);
@@ -415,6 +562,8 @@ channel_do_flush(struct channel *c)
c->in_table = NULL;
c->reload_event = NULL;
c->out_table = NULL;
+
+ channel_roa_unsubscribe_all(c);
}
static void
@@ -484,6 +633,7 @@ channel_set_state(struct channel *c, uint state)
if (!c->gr_wait && c->proto->rt_notify)
channel_start_export(c);
+ channel_do_up(c);
break;
case CS_FLUSHING:
@@ -694,6 +844,14 @@ channel_reconfigure(struct channel *c, struct channel_config *cf)
if (c->channel_state != CS_UP)
goto done;
+ /* Update RPKI/ROA subscriptions */
+ if (import_changed || export_changed)
+ {
+ channel_roa_unsubscribe_all(c);
+ channel_roa_subscribe_filter(c, 1);
+ channel_roa_subscribe_filter(c, 0);
+ }
+
if (reconfigure_type == RECONFIG_SOFT)
{
if (import_changed)