diff options
author | Maria Matejka <mq@ucw.cz> | 2021-06-18 18:23:41 +0200 |
---|---|---|
committer | Maria Matejka <mq@ucw.cz> | 2021-11-22 19:05:43 +0100 |
commit | c84ed603714db2c42a781f8dbb5b3fd540ff689f (patch) | |
tree | 59b5c772dd1a9209ded29efc3ec58ba21582a810 /sysdep/unix/io-loop.c | |
parent | a4451535c69b8f934523905a8131ae2f16be2146 (diff) |
Moved BFD IO loop out of BFD as we want to use it as socket-io coroutine
Diffstat (limited to 'sysdep/unix/io-loop.c')
-rw-r--r-- | sysdep/unix/io-loop.c | 508 |
1 files changed, 508 insertions, 0 deletions
diff --git a/sysdep/unix/io-loop.c b/sysdep/unix/io-loop.c new file mode 100644 index 00000000..a15d866a --- /dev/null +++ b/sysdep/unix/io-loop.c @@ -0,0 +1,508 @@ +/* + * BIRD -- I/O and event loop + * + * Can be freely distributed and used under the terms of the GNU GPL. + */ + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <errno.h> +#include <fcntl.h> +#include <poll.h> +#include <pthread.h> +#include <time.h> +#include <sys/time.h> + +#include "nest/bird.h" +#include "sysdep/unix/io-loop.h" + +#include "lib/buffer.h" +#include "lib/lists.h" +#include "lib/resource.h" +#include "lib/event.h" +#include "lib/timer.h" +#include "lib/socket.h" + + +struct birdloop +{ + pool *pool; + pthread_t thread; + pthread_mutex_t mutex; + + u8 stop_called; + u8 poll_active; + u8 wakeup_masked; + int wakeup_fds[2]; + + struct timeloop time; + list event_list; + list sock_list; + uint sock_num; + + BUFFER(sock *) poll_sk; + BUFFER(struct pollfd) poll_fd; + u8 poll_changed; + u8 close_scheduled; +}; + + +/* + * Current thread context + */ + +static _Thread_local struct birdloop *birdloop_current; + +static inline void +birdloop_set_current(struct birdloop *loop) +{ + birdloop_current = loop; + local_timeloop = loop ? &loop->time : &main_timeloop; +} + +/* + * Wakeup code for birdloop + */ + +static void +pipe_new(int *pfds) +{ + int rv = pipe(pfds); + if (rv < 0) + die("pipe: %m"); + + if (fcntl(pfds[0], F_SETFL, O_NONBLOCK) < 0) + die("fcntl(O_NONBLOCK): %m"); + + if (fcntl(pfds[1], F_SETFL, O_NONBLOCK) < 0) + die("fcntl(O_NONBLOCK): %m"); +} + +void +pipe_drain(int fd) +{ + char buf[64]; + int rv; + + try: + rv = read(fd, buf, 64); + if (rv < 0) + { + if (errno == EINTR) + goto try; + if (errno == EAGAIN) + return; + die("wakeup read: %m"); + } + if (rv == 64) + goto try; +} + +void +pipe_kick(int fd) +{ + u64 v = 1; + int rv; + + try: + rv = write(fd, &v, sizeof(u64)); + if (rv < 0) + { + if (errno == EINTR) + goto try; + if (errno == EAGAIN) + return; + die("wakeup write: %m"); + } +} + +static inline void +wakeup_init(struct birdloop *loop) +{ + pipe_new(loop->wakeup_fds); +} + +static inline void +wakeup_drain(struct birdloop *loop) +{ + pipe_drain(loop->wakeup_fds[0]); +} + +static inline void +wakeup_do_kick(struct birdloop *loop) +{ + pipe_kick(loop->wakeup_fds[1]); +} + +static inline void +wakeup_kick(struct birdloop *loop) +{ + if (!loop->wakeup_masked) + wakeup_do_kick(loop); + else + loop->wakeup_masked = 2; +} + +/* For notifications from outside */ +void +wakeup_kick_current(void) +{ + if (birdloop_current && birdloop_current->poll_active) + wakeup_kick(birdloop_current); +} + + +/* + * Events + */ + +static inline uint +events_waiting(struct birdloop *loop) +{ + return !EMPTY_LIST(loop->event_list); +} + +static inline void +events_init(struct birdloop *loop) +{ + init_list(&loop->event_list); +} + +static void +events_fire(struct birdloop *loop) +{ + times_update(); + ev_run_list(&loop->event_list); +} + +void +ev2_schedule(event *e) +{ + if (birdloop_current->poll_active && EMPTY_LIST(birdloop_current->event_list)) + wakeup_kick(birdloop_current); + + if (e->n.next) + rem_node(&e->n); + + add_tail(&birdloop_current->event_list, &e->n); +} + + +/* + * Sockets + */ + +static void +sockets_init(struct birdloop *loop) +{ + init_list(&loop->sock_list); + loop->sock_num = 0; + + BUFFER_INIT(loop->poll_sk, loop->pool, 4); + BUFFER_INIT(loop->poll_fd, loop->pool, 4); + loop->poll_changed = 1; /* add wakeup fd */ +} + +static void +sockets_add(struct birdloop *loop, sock *s) +{ + add_tail(&loop->sock_list, &s->n); + loop->sock_num++; + + s->index = -1; + loop->poll_changed = 1; + + if (loop->poll_active) + wakeup_kick(loop); +} + +void +sk_start(sock *s) +{ + sockets_add(birdloop_current, s); +} + +static void +sockets_remove(struct birdloop *loop, sock *s) +{ + rem_node(&s->n); + loop->sock_num--; + + if (s->index >= 0) + loop->poll_sk.data[s->index] = NULL; + + s->index = -1; + loop->poll_changed = 1; + + /* Wakeup moved to sk_stop() */ +} + +void +sk_stop(sock *s) +{ + sockets_remove(birdloop_current, s); + + if (birdloop_current->poll_active) + { + birdloop_current->close_scheduled = 1; + wakeup_kick(birdloop_current); + } + else + close(s->fd); + + s->fd = -1; +} + +static inline uint sk_want_events(sock *s) +{ return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); } + +/* +FIXME: this should be called from sock code + +static void +sockets_update(struct birdloop *loop, sock *s) +{ + if (s->index >= 0) + loop->poll_fd.data[s->index].events = sk_want_events(s); +} +*/ + +static void +sockets_prepare(struct birdloop *loop) +{ + BUFFER_SET(loop->poll_sk, loop->sock_num + 1); + BUFFER_SET(loop->poll_fd, loop->sock_num + 1); + + struct pollfd *pfd = loop->poll_fd.data; + sock **psk = loop->poll_sk.data; + uint i = 0; + node *n; + + WALK_LIST(n, loop->sock_list) + { + sock *s = SKIP_BACK(sock, n, n); + + ASSERT(i < loop->sock_num); + + s->index = i; + *psk = s; + pfd->fd = s->fd; + pfd->events = sk_want_events(s); + pfd->revents = 0; + + pfd++; + psk++; + i++; + } + + ASSERT(i == loop->sock_num); + + /* Add internal wakeup fd */ + *psk = NULL; + pfd->fd = loop->wakeup_fds[0]; + pfd->events = POLLIN; + pfd->revents = 0; + + loop->poll_changed = 0; +} + +static void +sockets_close_fds(struct birdloop *loop) +{ + struct pollfd *pfd = loop->poll_fd.data; + sock **psk = loop->poll_sk.data; + int poll_num = loop->poll_fd.used - 1; + + int i; + for (i = 0; i < poll_num; i++) + if (psk[i] == NULL) + close(pfd[i].fd); + + loop->close_scheduled = 0; +} + +int sk_read(sock *s, int revents); +int sk_write(sock *s); + +static void +sockets_fire(struct birdloop *loop) +{ + struct pollfd *pfd = loop->poll_fd.data; + sock **psk = loop->poll_sk.data; + int poll_num = loop->poll_fd.used - 1; + + times_update(); + + /* Last fd is internal wakeup fd */ + if (pfd[poll_num].revents & POLLIN) + wakeup_drain(loop); + + int i; + for (i = 0; i < poll_num; pfd++, psk++, i++) + { + int e = 1; + + if (! pfd->revents) + continue; + + if (pfd->revents & POLLNVAL) + die("poll: invalid fd %d", pfd->fd); + + if (pfd->revents & POLLIN) + while (e && *psk && (*psk)->rx_hook) + e = sk_read(*psk, 0); + + e = 1; + if (pfd->revents & POLLOUT) + while (e && *psk) + e = sk_write(*psk); + } +} + + +/* + * Birdloop + */ + +static void *birdloop_main(void *arg); + +struct birdloop * +birdloop_new(void) +{ + pool *p = rp_new(NULL, "Birdloop root"); + struct birdloop *loop = mb_allocz(p, sizeof(struct birdloop)); + loop->pool = p; + pthread_mutex_init(&loop->mutex, NULL); + + wakeup_init(loop); + + events_init(loop); + timers_init(&loop->time, p); + sockets_init(loop); + + return loop; +} + +void +birdloop_start(struct birdloop *loop) +{ + int rv = pthread_create(&loop->thread, NULL, birdloop_main, loop); + if (rv) + die("pthread_create(): %M", rv); +} + +void +birdloop_stop(struct birdloop *loop) +{ + pthread_mutex_lock(&loop->mutex); + loop->stop_called = 1; + wakeup_do_kick(loop); + pthread_mutex_unlock(&loop->mutex); + + int rv = pthread_join(loop->thread, NULL); + if (rv) + die("pthread_join(): %M", rv); +} + +void +birdloop_free(struct birdloop *loop) +{ + rfree(loop->pool); +} + + +void +birdloop_enter(struct birdloop *loop) +{ + /* TODO: these functions could save and restore old context */ + pthread_mutex_lock(&loop->mutex); + birdloop_set_current(loop); +} + +void +birdloop_leave(struct birdloop *loop) +{ + /* TODO: these functions could save and restore old context */ + birdloop_set_current(NULL); + pthread_mutex_unlock(&loop->mutex); +} + +void +birdloop_mask_wakeups(struct birdloop *loop) +{ + pthread_mutex_lock(&loop->mutex); + loop->wakeup_masked = 1; + pthread_mutex_unlock(&loop->mutex); +} + +void +birdloop_unmask_wakeups(struct birdloop *loop) +{ + pthread_mutex_lock(&loop->mutex); + if (loop->wakeup_masked == 2) + wakeup_do_kick(loop); + loop->wakeup_masked = 0; + pthread_mutex_unlock(&loop->mutex); +} + +static void * +birdloop_main(void *arg) +{ + struct birdloop *loop = arg; + timer *t; + int rv, timeout; + + birdloop_set_current(loop); + + pthread_mutex_lock(&loop->mutex); + while (1) + { + events_fire(loop); + timers_fire(&loop->time); + + times_update(); + if (events_waiting(loop)) + timeout = 0; + else if (t = timers_first(&loop->time)) + timeout = (tm_remains(t) TO_MS) + 1; + else + timeout = -1; + + if (loop->poll_changed) + sockets_prepare(loop); + + loop->poll_active = 1; + pthread_mutex_unlock(&loop->mutex); + + try: + rv = poll(loop->poll_fd.data, loop->poll_fd.used, timeout); + if (rv < 0) + { + if (errno == EINTR || errno == EAGAIN) + goto try; + die("poll: %m"); + } + + pthread_mutex_lock(&loop->mutex); + loop->poll_active = 0; + + if (loop->close_scheduled) + sockets_close_fds(loop); + + if (loop->stop_called) + break; + + if (rv) + sockets_fire(loop); + + timers_fire(&loop->time); + } + + loop->stop_called = 0; + pthread_mutex_unlock(&loop->mutex); + + return NULL; +} + + |