summary | refs | log | tree | commit | diff
path: root/sysdep/unix/io.c
diff options
context:
space:
mode:
Diffstat (limited to 'sysdep/unix/io.c')
-rw-r--r-- sysdep/unix/io.c 235
1 file changed, 167 insertions, 68 deletions
diff --git a/sysdep/unix/io.c b/sysdep/unix/io.c
index 4fd77453..0e5adc14 100644
--- a/sysdep/unix/io.c
+++ b/sysdep/unix/io.c
@@ -36,12 +36,14 @@
#include "lib/resource.h"
#include "lib/socket.h"
#include "lib/event.h"
+#include "lib/locking.h"
#include "lib/timer.h"
#include "lib/string.h"
#include "nest/iface.h"
#include "conf/conf.h"
#include "sysdep/unix/unix.h"
+#include "sysdep/unix/io-loop.h"
#include CONFIG_INCLUDE_SYSIO_H
/* Maximum number of calls of tx handler for one socket in one
@@ -122,55 +124,50 @@ rf_fileno(struct rfile *f)
btime boot_time;
+
void
-times_init(struct timeloop *loop)
+times_update(void)
{
struct timespec ts;
int rv;
+ btime old_time = current_time();
+ btime old_real_time = current_real_time();
+
rv = clock_gettime(CLOCK_MONOTONIC, &ts);
if (rv < 0)
die("Monotonic clock is missing");
if ((ts.tv_sec < 0) || (((u64) ts.tv_sec) > ((u64) 1 << 40)))
log(L_WARN "Monotonic clock is crazy");
-
- loop->last_time = ts.tv_sec S + ts.tv_nsec NS;
- loop->real_time = 0;
-}
-
-void
-times_update(struct timeloop *loop)
-{
- struct timespec ts;
- int rv;
-
- rv = clock_gettime(CLOCK_MONOTONIC, &ts);
- if (rv < 0)
- die("clock_gettime: %m");
-
+
btime new_time = ts.tv_sec S + ts.tv_nsec NS;
- if (new_time < loop->last_time)
+ if (new_time < old_time)
log(L_ERR "Monotonic clock is broken");
- loop->last_time = new_time;
- loop->real_time = 0;
-}
-
-void
-times_update_real_time(struct timeloop *loop)
-{
- struct timespec ts;
- int rv;
-
rv = clock_gettime(CLOCK_REALTIME, &ts);
if (rv < 0)
die("clock_gettime: %m");
- loop->real_time = ts.tv_sec S + ts.tv_nsec NS;
-}
+ btime new_real_time = ts.tv_sec S + ts.tv_nsec NS;
+
+ if (!atomic_compare_exchange_strong_explicit(
+ &last_time,
+ &old_time,
+ new_time,
+ memory_order_acq_rel,
+ memory_order_relaxed))
+ DBG("Time update collision: last_time");
+ if (!atomic_compare_exchange_strong_explicit(
+ &real_time,
+ &old_real_time,
+ new_real_time,
+ memory_order_acq_rel,
+ memory_order_relaxed))
+ DBG("Time update collision: real_time");
+}
/**
* DOC: Sockets
@@ -797,6 +794,7 @@ sk_free(resource *r)
{
sock *s = (sock *) r;
+ ASSERT_DIE(!s->loop || birdloop_inside(s->loop));
sk_free_bufs(s);
#ifdef HAVE_LIBSSH
@@ -804,20 +802,30 @@ sk_free(resource *r)
sk_ssh_free(s);
#endif
- if (s->fd < 0)
- return;
+ if (s->cork)
+ {
+ LOCK_DOMAIN(cork, s->cork->lock);
+ if (enlisted(&s->cork_node))
+ rem_node(&s->cork_node);
+ UNLOCK_DOMAIN(cork, s->cork->lock);
+ }
- /* FIXME: we should call sk_stop() for SKF_THREAD sockets */
- if (!(s->flags & SKF_THREAD))
+ if (!s->loop)
+ ;
+ else if (s->flags & SKF_THREAD)
+ sk_stop(s);
+ else
{
if (s == current_sock)
current_sock = sk_next(s);
if (s == stored_sock)
stored_sock = sk_next(s);
- rem_node(&s->n);
+
+ if (enlisted(&s->n))
+ rem_node(&s->n);
}
- if (s->type != SK_SSH && s->type != SK_SSH_ACTIVE)
+ if (s->type != SK_SSH && s->type != SK_SSH_ACTIVE && s->fd != -1)
close(s->fd);
s->fd = -1;
@@ -1108,7 +1116,15 @@ sk_passive_connected(sock *s, int type)
return 1;
}
- sk_insert(t);
+ if (s->flags & SKF_PASSIVE_THREAD)
+ t->flags |= SKF_THREAD;
+ else
+ {
+ ASSERT_DIE(s->loop == &main_birdloop);
+ t->loop = &main_birdloop;
+ sk_insert(t);
+ }
+
sk_alloc_bufs(t);
s->rx_hook(t, 0);
return 1;
@@ -1329,6 +1345,17 @@ sk_open(sock *s)
ip_addr bind_addr = IPA_NONE;
sockaddr sa;
+ if (s->flags & SKF_THREAD)
+ {
+ ASSERT_DIE(s->loop && (s->loop != &main_birdloop));
+ ASSERT_DIE(birdloop_inside(s->loop));
+ }
+ else
+ {
+ ASSERT_DIE(!s->loop);
+ s->loop = &main_birdloop;
+ }
+
if (s->type <= SK_IP)
{
/*
@@ -1512,10 +1539,41 @@ sk_open_unix(sock *s, char *name)
return -1;
s->fd = fd;
+ s->loop = &main_birdloop;
sk_insert(s);
return 0;
}
+static void
+sk_reloop_hook(void *_vs)
+{
+ sock *s = _vs;
+ if (birdloop_inside(&main_birdloop))
+ {
+ s->flags &= ~SKF_THREAD;
+ sk_insert(s);
+ }
+ else
+ {
+ s->flags |= SKF_THREAD;
+ sk_start(s);
+ }
+}
+
+void
+sk_reloop(sock *s, struct birdloop *loop)
+{
+ if (enlisted(&s->n))
+ rem_node(&s->n);
+
+ s->reloop = (event) {
+ .hook = sk_reloop_hook,
+ .data = s,
+ };
+
+ ev_send_loop(loop, &s->reloop);
+}
+
#define CMSG_RX_SPACE MAX(CMSG4_SPACE_PKTINFO+CMSG4_SPACE_TTL, \
CMSG6_SPACE_PKTINFO+CMSG6_SPACE_TTL)
@@ -1661,6 +1719,7 @@ sk_maybe_write(sock *s)
s->err_hook(s, (errno != EPIPE) ? errno : 0);
return -1;
}
+ sk_ping(s);
return 0;
}
s->ttx += e;
@@ -1868,6 +1927,25 @@ sk_read(sock *s, int revents)
case SK_TCP:
case SK_UNIX:
{
+ if (s->cork)
+ {
+ int cont = 0;
+ LOCK_DOMAIN(cork, s->cork->lock);
+ if (!enlisted(&s->cork_node))
+ if (s->cork->count)
+ {
+// log(L_TRACE "Socket %p corked", s);
+ add_tail(&s->cork->sockets, &s->cork_node);
+ sk_ping(s);
+ }
+ else
+ cont = 1;
+ UNLOCK_DOMAIN(cork, s->cork->lock);
+
+ if (!cont)
+ return 0;
+ }
+
int c = read(s->fd, s->rpos, s->rbuf + s->rbsize - s->rpos);
if (c < 0)
@@ -2020,30 +2098,17 @@ struct event_log_entry
static struct event_log_entry event_log[EVENT_LOG_LENGTH];
static struct event_log_entry *event_open;
static int event_log_pos, event_log_num, watchdog_active;
-static btime last_time;
+static btime last_io_time;
static btime loop_time;
static void
io_update_time(void)
{
- struct timespec ts;
- int rv;
-
- /*
- * This is third time-tracking procedure (after update_times() above and
- * times_update() in BFD), dedicated to internal event log and latency
- * tracking. Hopefully, we consolidate these sometimes.
- */
-
- rv = clock_gettime(CLOCK_MONOTONIC, &ts);
- if (rv < 0)
- die("clock_gettime: %m");
-
- last_time = ts.tv_sec S + ts.tv_nsec NS;
+ last_io_time = current_time_update();
if (event_open)
{
- event_open->duration = last_time - event_open->timestamp;
+ event_open->duration = last_io_time - event_open->timestamp;
if (event_open->duration > config->latency_limit)
log(L_WARN "Event 0x%p 0x%p took %d ms",
@@ -2072,7 +2137,7 @@ io_log_event(void *hook, void *data)
en->hook = hook;
en->data = data;
- en->timestamp = last_time;
+ en->timestamp = last_io_time;
en->duration = 0;
event_log_num++;
@@ -2100,14 +2165,14 @@ io_log_dump(void)
struct event_log_entry *en = event_log + (event_log_pos + i) % EVENT_LOG_LENGTH;
if (en->hook)
log(L_DEBUG " Event 0x%p 0x%p at %8d for %d ms", en->hook, en->data,
- (int) ((last_time - en->timestamp) TO_MS), (int) (en->duration TO_MS));
+ (int) ((last_io_time - en->timestamp) TO_MS), (int) (en->duration TO_MS));
}
}
void
watchdog_sigalrm(int sig UNUSED)
{
- /* Update last_time and duration, but skip latency check */
+ /* Update last_io_time and duration, but skip latency check */
config->latency_limit = 0xffffffff;
io_update_time();
@@ -2120,7 +2185,7 @@ watchdog_start1(void)
{
io_update_time();
- loop_time = last_time;
+ loop_time = last_io_time;
}
static inline void
@@ -2128,7 +2193,7 @@ watchdog_start(void)
{
io_update_time();
- loop_time = last_time;
+ loop_time = last_io_time;
event_log_num = 0;
if (config->watchdog_timeout)
@@ -2149,7 +2214,7 @@ watchdog_stop(void)
watchdog_active = 0;
}
- btime duration = last_time - loop_time;
+ btime duration = last_io_time - loop_time;
if (duration > config->watchdog_warning)
log(L_WARN "I/O loop cycle took %d ms for %d events",
(int) (duration TO_MS), event_log_num);
@@ -2164,8 +2229,9 @@ void
io_init(void)
{
init_list(&sock_list);
- init_list(&global_event_list);
- init_list(&global_work_list);
+ ev_init_list(&global_event_list, &main_birdloop, "Global event list");
+ ev_init_list(&global_work_list, &main_birdloop, "Global work list");
+ ev_init_list(&main_birdloop.event_list, &main_birdloop, "Global fast event list");
krt_io_init();
// XXX init_times();
// XXX update_times();
@@ -2179,11 +2245,15 @@ static int short_loops = 0;
#define SHORT_LOOP_MAX 10
#define WORK_EVENTS_MAX 10
+void pipe_drain(int fd);
+void check_stored_pages(void);
+
void
io_loop(void)
{
int poll_tout, timeout;
int nfds, events, pout;
+ int reload_requested = 0;
timer *t;
sock *s;
node *n;
@@ -2193,27 +2263,39 @@ io_loop(void)
watchdog_start1();
for(;;)
{
- times_update(&main_timeloop);
+ times_update();
events = ev_run_list(&global_event_list);
events = ev_run_list_limited(&global_work_list, WORK_EVENTS_MAX) || events;
- timers_fire(&main_timeloop);
+ events = ev_run_list(&main_birdloop.event_list) || events;
+ timers_fire(&main_birdloop.time, 1);
io_close_event();
+#if DEBUGGING
+#define PERIODIC_WAKEUP 86400000
+#else
+#define PERIODIC_WAKEUP 3000
+#endif
+restart_poll:
// FIXME
- poll_tout = (events ? 0 : 3000); /* Time in milliseconds */
- if (t = timers_first(&main_timeloop))
+ poll_tout = ((reload_requested || events) ? 0 : PERIODIC_WAKEUP); /* Time in milliseconds */
+ if (t = timers_first(&main_birdloop.time))
{
- times_update(&main_timeloop);
+ times_update();
timeout = (tm_remains(t) TO_MS) + 1;
poll_tout = MIN(poll_tout, timeout);
}
- nfds = 0;
+ /* A hack to reload main io_loop() when something has changed asynchronously. */
+ pfd[0].fd = main_birdloop.wakeup_fds[0];
+ pfd[0].events = POLLIN;
+
+ nfds = 1;
+
WALK_LIST(n, sock_list)
{
pfd[nfds] = (struct pollfd) { .fd = -1 }; /* everything other set to 0 by this */
s = SKIP_BACK(sock, n, n);
- if (s->rx_hook)
+ if (s->rx_hook && !ev_corked(s->cork))
{
pfd[nfds].fd = s->fd;
pfd[nfds].events |= POLLIN;
@@ -2267,7 +2349,9 @@ io_loop(void)
/* And finally enter poll() to find active sockets */
watchdog_stop();
+ birdloop_leave(&main_birdloop);
pout = poll(pfd, nfds, poll_tout);
+ birdloop_enter(&main_birdloop);
watchdog_start();
if (pout < 0)
@@ -2276,9 +2360,24 @@ io_loop(void)
continue;
die("poll: %m");
}
+
+ if (pout && (pfd[0].revents & POLLIN))
+ {
+ /* IO loop reload requested */
+ pipe_drain(main_birdloop.wakeup_fds[0]);
+ reload_requested = 1;
+ goto restart_poll;
+ }
+
+ if (reload_requested)
+ {
+ reload_requested = 0;
+ atomic_exchange_explicit(&main_birdloop.ping_sent, 0, memory_order_acq_rel);
+ }
+
if (pout)
{
- times_update(&main_timeloop);
+ times_update();
/* guaranteed to be non-empty */
current_sock = SKIP_BACK(sock, n, HEAD(sock_list));