summaryrefslogtreecommitdiff
path: root/sysdep/unix
diff options
context:
space:
mode:
Diffstat (limited to 'sysdep/unix')
-rw-r--r--sysdep/unix/io-loop.c206
-rw-r--r--sysdep/unix/io-loop.h9
-rw-r--r--sysdep/unix/io.c155
-rw-r--r--sysdep/unix/main.c2
-rw-r--r--sysdep/unix/unix.h5
5 files changed, 204 insertions, 173 deletions
diff --git a/sysdep/unix/io-loop.c b/sysdep/unix/io-loop.c
index df162c6e..0e2396f7 100644
--- a/sysdep/unix/io-loop.c
+++ b/sysdep/unix/io-loop.c
@@ -310,59 +310,95 @@ sockets_init(struct birdloop *loop)
loop->sock_num = 0;
}
-static void
-sockets_add(struct birdloop *loop, sock *s)
+void
+socket_changed(sock *s)
+{
+ struct birdloop *loop = s->loop;
+ ASSERT_DIE(birdloop_inside(loop));
+
+ loop->sock_changed++;
+ birdloop_ping(loop);
+}
+
+void
+birdloop_add_socket(struct birdloop *loop, sock *s)
{
+ ASSERT_DIE(birdloop_inside(loop));
+ ASSERT_DIE(!s->loop);
+
LOOP_TRACE(loop, "adding socket %p (total=%d)", s, loop->sock_num);
add_tail(&loop->sock_list, &s->n);
loop->sock_num++;
+ s->loop = loop;
s->index = -1;
- if (loop->thread)
- atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release);
- birdloop_ping(loop);
+ socket_changed(s);
}
+extern sock *stored_sock; /* mainloop hack */
+
void
-sk_start(sock *s)
+birdloop_remove_socket(struct birdloop *loop, sock *s)
{
- ASSERT_DIE(birdloop_current != &main_birdloop);
- sockets_add(birdloop_current, s);
-}
+ ASSERT_DIE(!enlisted(&s->n) == !s->loop);
-static void
-sockets_remove(struct birdloop *loop, sock *s)
-{
- if (!enlisted(&s->n))
+ if (!s->loop)
return;
+ ASSERT_DIE(birdloop_inside(loop));
+ ASSERT_DIE(s->loop == loop);
+
/* Decouple the socket from the loop at all. */
LOOP_TRACE(loop, "removing socket %p (total=%d)", s, loop->sock_num);
+ if (loop->sock_active == s)
+ loop->sock_active = sk_next(s);
+
+ if ((loop == &main_birdloop) && (s == stored_sock))
+ stored_sock = sk_next(s);
+
rem_node(&s->n);
loop->sock_num--;
- if (loop->thread)
- atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release);
+ socket_changed(s);
+
+ s->loop = NULL;
s->index = -1;
+}
- /* Close the filedescriptor. If it ever gets into the poll(), it just returns
- * POLLNVAL for this fd which then is ignored because nobody checks for
- * that result. Or some other routine opens another fd, getting this number,
- * yet also in this case poll() at worst spuriously returns and nobody checks
- * for the result in this fd. No further precaution is needed. */
- close(s->fd);
+void
+sk_reloop(sock *s, struct birdloop *loop)
+{
+ ASSERT_DIE(birdloop_inside(loop));
+ ASSERT_DIE(birdloop_inside(s->loop));
+
+ if (loop == s->loop)
+ return;
+
+ birdloop_remove_socket(s->loop, s);
+ birdloop_add_socket(loop, s);
+}
+
+void
+sk_pause_rx(struct birdloop *loop, sock *s)
+{
+ ASSERT_DIE(birdloop_inside(loop));
+ s->rx_hook = NULL;
+ socket_changed(s);
}
void
-sk_stop(sock *s)
+sk_resume_rx(struct birdloop *loop, sock *s, int (*hook)(sock *, uint))
{
- sockets_remove(birdloop_current, s);
+ ASSERT_DIE(birdloop_inside(loop));
+ ASSERT_DIE(hook);
+ s->rx_hook = hook;
+ socket_changed(s);
}
static inline uint sk_want_events(sock *s)
-{ return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); }
+{ return (s->rx_hook ? POLLIN : 0) | (sk_tx_pending(s) ? POLLOUT : 0); }
void
sockets_prepare(struct birdloop *loop, struct pfd *pfd)
@@ -392,40 +428,60 @@ sockets_prepare(struct birdloop *loop, struct pfd *pfd)
int sk_read(sock *s, int revents);
int sk_write(sock *s);
+void sk_err(sock *s, int revents);
-static void
+static int
sockets_fire(struct birdloop *loop)
{
+ if (EMPTY_LIST(loop->sock_list))
+ return 0;
+
+ int sch = 0;
+
times_update();
struct pollfd *pfd = loop->thread->pfd->pfd.data;
- sock *s; node *n, *nxt;
- WALK_LIST2_DELSAFE(s, n, nxt, loop->sock_list, n)
+ loop->sock_active = SKIP_BACK(sock, n, HEAD(loop->sock_list));
+
+ while (loop->sock_active)
{
- if (s->index < 0)
- continue;
+ sock *s = loop->sock_active;
- int rev = pfd[s->index].revents;
+ int rev;
+ if ((s->index >= 0) && (rev = pfd[s->index].revents) && !(rev & POLLNVAL))
+ {
+ int e = 1;
- if (!rev)
- continue;
+ if (rev & POLLOUT)
+ {
+ while ((s == loop->sock_active) && (e = sk_write(s)))
+ ;
- if (rev & POLLNVAL)
- bug("poll: invalid fd %d", s->fd);
+ if (s != loop->sock_active)
+ continue;
- int e = 1;
+ if (!sk_tx_pending(s))
+ sch++;
+ }
- if (rev & POLLIN)
- while (e && s->rx_hook)
- e = sk_read(s, rev);
+ if (rev & POLLIN)
+ while (e && (s == loop->sock_active) && s->rx_hook)
+ e = sk_read(s, rev);
- if (rev & POLLOUT)
- {
- atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release);
- while (e = sk_write(s))
- ;
+ if (s != loop->sock_active)
+ continue;
+
+ if (!(rev & (POLLOUT | POLLIN)) && (rev & POLLERR))
+ sk_err(s, rev);
+
+ if (s != loop->sock_active)
+ continue;
}
+
+ loop->sock_active = sk_next(s);
}
+
+ return sch;
}
/*
@@ -547,7 +603,8 @@ bird_thread_main(void *arg)
thr->meta->thread = thr;
birdloop_enter(thr->meta);
- u32 refresh_sockets = 1;
+ thr->sock_changed = 1;
+
struct pfd pfd;
BUFFER_INIT(pfd.pfd, thr->pool, 16);
BUFFER_INIT(pfd.loop, thr->pool, 16);
@@ -563,7 +620,7 @@ bird_thread_main(void *arg)
{
birdloop_enter(loop);
if (!EMPTY_LIST(loop->sock_list))
- refresh_sockets = 1;
+ thr->sock_changed = 1;
birdloop_leave(loop);
}
@@ -590,10 +647,10 @@ bird_thread_main(void *arg)
ev_run_list(&thr->priority_events);
/* Do we have to refresh sockets? */
- refresh_sockets += atomic_exchange_explicit(&thr->poll_changed, 0, memory_order_acq_rel);
-
- if (refresh_sockets)
+ if (thr->sock_changed)
{
+ thr->sock_changed = 0;
+
BUFFER_FLUSH(pfd.pfd);
BUFFER_FLUSH(pfd.loop);
@@ -608,7 +665,6 @@ bird_thread_main(void *arg)
}
ASSERT_DIE(pfd.loop.used == pfd.pfd.used);
- refresh_sockets = 0;
}
/* Nothing to do in at least 5 seconds, flush local hot page cache */
else if (timeout > 5000)
@@ -957,6 +1013,15 @@ birdloop_init(void)
static void
birdloop_stop_internal(struct birdloop *loop)
{
+ LOOP_TRACE(loop, "Stopping");
+
+ /* Block incoming pings */
+ u32 ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire);
+ while (!atomic_compare_exchange_strong_explicit(
+ &loop->thread_transition, &ltt, LTT_PING,
+ memory_order_acq_rel, memory_order_acquire))
+ ;
+
/* Flush remaining events */
ASSERT_DIE(!ev_run_list(&loop->event_list));
@@ -965,17 +1030,27 @@ birdloop_stop_internal(struct birdloop *loop)
while (t = timers_first(&loop->time))
tm_stop(t);
- /* No sockets allowed */
- ASSERT_DIE(EMPTY_LIST(loop->sock_list));
+ /* Drop sockets */
+ sock *s;
+ WALK_LIST_FIRST2(s, n, loop->sock_list)
+ birdloop_remove_socket(loop, s);
/* Unschedule from Meta */
ev_postpone(&loop->event);
tm_stop(&loop->timer);
- /* Declare loop stopped */
+ /* Remove from thread loop list */
rem_node(&loop->n);
+ loop->thread = NULL;
+
+ /* Leave the loop context without causing any other fuss */
+ ASSERT_DIE(!ev_active(&loop->event));
+ loop->ping_pending = 0;
birdloop_leave(loop);
+ /* Request local socket reload */
+ this_thread->sock_changed++;
+
/* Tail-call the stopped hook */
loop->stopped(loop->stop_data);
}
@@ -989,12 +1064,14 @@ birdloop_run(void *_loop)
struct birdloop *loop = _loop;
birdloop_enter(loop);
+ LOOP_TRACE(loop, "Regular run");
+
if (loop->stopped)
/* Birdloop left inside the helper function */
return birdloop_stop_internal(loop);
/* Process sockets */
- sockets_fire(loop);
+ this_thread->sock_changed += sockets_fire(loop);
/* Run timers */
timers_fire(&loop->time, 0);
@@ -1016,6 +1093,10 @@ birdloop_run(void *_loop)
else
tm_stop(&loop->timer);
+ /* Collect socket change requests */
+ this_thread->sock_changed += loop->sock_changed;
+ loop->sock_changed = 0;
+
birdloop_leave(loop);
}
@@ -1074,6 +1155,8 @@ birdloop_new(pool *pp, uint order, const char *name)
static void
birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data)
{
+ LOOP_TRACE(loop, "Stop requested");
+
loop->stopped = stopped;
loop->stop_data = data;
@@ -1100,8 +1183,7 @@ birdloop_stop_self(struct birdloop *loop, void (*stopped)(void *data), void *dat
void
birdloop_free(struct birdloop *loop)
{
- ASSERT_DIE(loop->links == 0);
- ASSERT_DIE(birdloop_in_this_thread(loop));
+ ASSERT_DIE(loop->thread == NULL);
domain_free(loop->time.domain);
rfree(loop->pool);
@@ -1171,20 +1253,6 @@ birdloop_unmask_wakeups(struct birdloop *loop)
}
void
-birdloop_link(struct birdloop *loop)
-{
- ASSERT_DIE(birdloop_inside(loop));
- loop->links++;
-}
-
-void
-birdloop_unlink(struct birdloop *loop)
-{
- ASSERT_DIE(birdloop_inside(loop));
- loop->links--;
-}
-
-void
birdloop_yield(void)
{
usleep(100);
diff --git a/sysdep/unix/io-loop.h b/sysdep/unix/io-loop.h
index e606f07e..7ec903af 100644
--- a/sysdep/unix/io-loop.h
+++ b/sysdep/unix/io-loop.h
@@ -22,6 +22,7 @@ struct pfd {
};
void sockets_prepare(struct birdloop *, struct pfd *);
+void socket_changed(struct birdsock *);
void pipe_new(struct pipe *);
void pipe_pollin(struct pipe *, struct pfd *);
@@ -40,12 +41,12 @@ struct birdloop
struct timeloop time;
event_list event_list;
list sock_list;
+ struct birdsock *sock_active;
int sock_num;
+ uint sock_changed;
uint ping_pending;
- uint links;
-
_Atomic u32 thread_transition;
#define LTT_PING 1
#define LTT_MOVE 2
@@ -66,8 +67,6 @@ struct bird_thread
{
node n;
- _Atomic u32 poll_changed;
-
struct pipe wakeup;
event_list priority_events;
@@ -83,6 +82,8 @@ struct bird_thread
struct pfd *pfd;
event cleanup_event;
+
+ int sock_changed;
};
#endif
diff --git a/sysdep/unix/io.c b/sysdep/unix/io.c
index 06797096..88d187a4 100644
--- a/sysdep/unix/io.c
+++ b/sysdep/unix/io.c
@@ -722,10 +722,7 @@ sk_log_error(sock *s, const char *p)
* Actual struct birdsock code
*/
-static struct birdsock *current_sock;
-static struct birdsock *stored_sock;
-
-static inline sock *
+sock *
sk_next(sock *s)
{
if (!s->n.next->next)
@@ -787,6 +784,7 @@ sk_ssh_free(sock *s)
}
#endif
+
static void
sk_free(resource *r)
{
@@ -799,18 +797,10 @@ sk_free(resource *r)
sk_ssh_free(s);
#endif
- if ((s->fd < 0) || (s->flags & SKF_THREAD))
- return;
-
- if (s == current_sock)
- current_sock = sk_next(s);
- if (s == stored_sock)
- stored_sock = sk_next(s);
-
- if (enlisted(&s->n))
- rem_node(&s->n);
+ if (s->loop)
+ birdloop_remove_socket(s->loop, s);
- if (s->type != SK_SSH && s->type != SK_SSH_ACTIVE)
+ if (s->fd >= 0 && s->type != SK_SSH && s->type != SK_SSH_ACTIVE)
close(s->fd);
s->fd = -1;
@@ -1023,12 +1013,6 @@ sk_setup(sock *s)
}
static void
-sk_insert(sock *s)
-{
- add_tail(&main_birdloop.sock_list, &s->n);
-}
-
-static void
sk_tcp_connected(sock *s)
{
sockaddr sa;
@@ -1101,10 +1085,7 @@ sk_passive_connected(sock *s, int type)
return 1;
}
- if (s->flags & SKF_PASSIVE_THREAD)
- t->flags |= SKF_THREAD;
- else
- sk_insert(t);
+ birdloop_add_socket(s->loop, t);
sk_alloc_bufs(t);
s->rx_hook(t, 0);
@@ -1319,6 +1300,7 @@ sk_open_ssh(sock *s)
/**
* sk_open - open a socket
+ * @loop: loop
* @s: socket
*
* This function takes a socket resource created by sk_new() and
@@ -1328,7 +1310,7 @@ sk_open_ssh(sock *s)
* Result: 0 for success, -1 for an error.
*/
int
-sk_open(sock *s)
+sk_open(sock *s, struct birdloop *loop)
{
int af = AF_UNSPEC;
int fd = -1;
@@ -1481,9 +1463,7 @@ sk_open(sock *s)
sk_alloc_bufs(s);
}
- if (!(s->flags & SKF_THREAD))
- sk_insert(s);
-
+ birdloop_add_socket(loop, s);
return 0;
err:
@@ -1493,7 +1473,7 @@ err:
}
int
-sk_open_unix(sock *s, char *name)
+sk_open_unix(sock *s, struct birdloop *loop, char *name)
{
struct sockaddr_un sa;
int fd;
@@ -1520,40 +1500,10 @@ sk_open_unix(sock *s, char *name)
return -1;
s->fd = fd;
- sk_insert(s);
+ birdloop_add_socket(loop, s);
return 0;
}
-static void
-sk_reloop_hook(void *_vs)
-{
- sock *s = _vs;
- if (birdloop_inside(&main_birdloop))
- {
- s->flags &= ~SKF_THREAD;
- sk_insert(s);
- }
- else
- {
- s->flags |= SKF_THREAD;
- sk_start(s);
- }
-}
-
-void
-sk_reloop(sock *s, struct birdloop *loop)
-{
- if (enlisted(&s->n))
- rem_node(&s->n);
-
- s->reloop = (event) {
- .hook = sk_reloop_hook,
- .data = s,
- };
-
- ev_send_loop(loop, &s->reloop);
-}
-
#define CMSG_RX_SPACE MAX(CMSG4_SPACE_PKTINFO+CMSG4_SPACE_TTL, \
CMSG6_SPACE_PKTINFO+CMSG6_SPACE_TTL)
@@ -1676,6 +1626,13 @@ sk_recvmsg(sock *s)
static inline void reset_tx_buffer(sock *s) { s->ttx = s->tpos = s->tbuf; }
+_Bool
+sk_tx_pending(sock *s)
+{
+ return s->ttx != s->tpos;
+}
+
+
static int
sk_maybe_write(sock *s)
{
@@ -1686,7 +1643,7 @@ sk_maybe_write(sock *s)
case SK_TCP:
case SK_MAGIC:
case SK_UNIX:
- while (s->ttx != s->tpos)
+ while (sk_tx_pending(s))
{
e = write(s->fd, s->ttx, s->tpos - s->ttx);
@@ -1708,7 +1665,7 @@ sk_maybe_write(sock *s)
#ifdef HAVE_LIBSSH
case SK_SSH:
- while (s->ttx != s->tpos)
+ while (sk_tx_pending(s))
{
e = ssh_channel_write(s->ssh->channel, s->ttx, s->tpos - s->ttx);
@@ -1791,7 +1748,12 @@ sk_send(sock *s, unsigned len)
{
s->ttx = s->tbuf;
s->tpos = s->tbuf + len;
- return sk_maybe_write(s);
+
+ int e = sk_maybe_write(s);
+ if (e == 0) /* Trigger thread poll reload to poll this socket's write. */
+ socket_changed(s);
+
+ return e;
}
/**
@@ -1838,7 +1800,7 @@ call_rx_hook(sock *s, int size)
if (s->rx_hook(s, size))
{
/* We need to be careful since the socket could have been deleted by the hook */
- if (current_sock == s)
+ if (s->loop->sock_active == s)
s->rpos = s->rbuf;
}
}
@@ -2002,7 +1964,7 @@ sk_write_noflush(sock *s)
#endif
default:
- if (s->ttx != s->tpos && sk_maybe_write(s) > 0)
+ if (sk_tx_pending(s) && sk_maybe_write(s) > 0)
{
if (s->tx_hook)
s->tx_hook(s);
@@ -2224,6 +2186,8 @@ static int short_loops = 0;
#define SHORT_LOOP_MAX 10
#define WORK_EVENTS_MAX 10
+sock *stored_sock;
+
void
io_loop(void)
{
@@ -2312,17 +2276,13 @@ io_loop(void)
times_update();
/* guaranteed to be non-empty */
- current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
+ main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
- while (current_sock)
+ while (main_birdloop.sock_active)
+ {
+ sock *s = main_birdloop.sock_active;
+ if (s->index != -1)
{
- sock *s = current_sock;
- if (s->index == -1)
- {
- current_sock = sk_next(s);
- goto next;
- }
-
int e;
int steps;
@@ -2333,10 +2293,11 @@ io_loop(void)
steps--;
io_log_event(s->rx_hook, s->data);
e = sk_read(s, pfd.pfd.data[s->index].revents);
- if (s != current_sock)
- goto next;
}
- while (e && s->rx_hook && steps);
+ while (e && (main_birdloop.sock_active == s) && s->rx_hook && steps);
+
+ if (s != main_birdloop.sock_active)
+ continue;
steps = MAX_STEPS;
if (pfd.pfd.data[s->index].revents & POLLOUT)
@@ -2345,56 +2306,54 @@ io_loop(void)
steps--;
io_log_event(s->tx_hook, s->data);
e = sk_write(s);
- if (s != current_sock)
- goto next;
}
- while (e && steps);
+ while (e && (main_birdloop.sock_active == s) && steps);
- current_sock = sk_next(s);
- next: ;
+ if (s != main_birdloop.sock_active)
+ continue;
}
+ main_birdloop.sock_active = sk_next(s);
+ }
+
short_loops++;
if (events && (short_loops < SHORT_LOOP_MAX))
continue;
short_loops = 0;
int count = 0;
- current_sock = stored_sock;
- if (current_sock == NULL)
- current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
+ main_birdloop.sock_active = stored_sock;
+ if (main_birdloop.sock_active == NULL)
+ main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
- while (current_sock && count < MAX_RX_STEPS)
+ while (main_birdloop.sock_active && count < MAX_RX_STEPS)
{
- sock *s = current_sock;
+ sock *s = main_birdloop.sock_active;
if (s->index == -1)
- {
- current_sock = sk_next(s);
- goto next2;
- }
+ goto next2;
if (!s->fast_rx && (pfd.pfd.data[s->index].revents & POLLIN) && s->rx_hook)
{
count++;
io_log_event(s->rx_hook, s->data);
sk_read(s, pfd.pfd.data[s->index].revents);
- if (s != current_sock)
- goto next2;
+ if (s != main_birdloop.sock_active)
+ continue;
}
if (pfd.pfd.data[s->index].revents & (POLLHUP | POLLERR))
{
sk_err(s, pfd.pfd.data[s->index].revents);
- if (s != current_sock)
- goto next2;
+ if (s != main_birdloop.sock_active)
+ continue;
}
- current_sock = sk_next(s);
next2: ;
+ main_birdloop.sock_active = sk_next(s);
}
- stored_sock = current_sock;
+ stored_sock = main_birdloop.sock_active;
}
}
}
diff --git a/sysdep/unix/main.c b/sysdep/unix/main.c
index a9ae842a..79db32d3 100644
--- a/sysdep/unix/main.c
+++ b/sysdep/unix/main.c
@@ -542,7 +542,7 @@ cli_init_unix(uid_t use_uid, gid_t use_gid)
/* Return value intentionally ignored */
unlink(path_control_socket);
- if (sk_open_unix(s, path_control_socket) < 0)
+ if (sk_open_unix(s, &main_birdloop, path_control_socket) < 0)
die("Cannot create control socket %s: %m", path_control_socket);
if (use_uid || use_gid)
diff --git a/sysdep/unix/unix.h b/sysdep/unix/unix.h
index e26afe41..606b79cd 100644
--- a/sysdep/unix/unix.h
+++ b/sysdep/unix/unix.h
@@ -9,6 +9,9 @@
#ifndef _BIRD_UNIX_H_
#define _BIRD_UNIX_H_
+#include "nest/bird.h"
+#include "lib/io-loop.h"
+
#include <sys/socket.h>
#include <signal.h>
@@ -110,7 +113,7 @@ extern volatile sig_atomic_t async_shutdown_flag;
void io_init(void);
void io_loop(void);
void io_log_dump(void);
-int sk_open_unix(struct birdsock *s, char *name);
+int sk_open_unix(struct birdsock *s, struct birdloop *, char *name);
struct rfile *rf_open(struct pool *, const char *name, const char *mode);
void *rf_file(struct rfile *f);
int rf_fileno(struct rfile *f);