summaryrefslogtreecommitdiff
path: root/sysdep/unix
diff options
context:
space:
mode:
authorMaria Matejka <mq@ucw.cz>2021-02-08 09:51:59 +0100
committerMaria Matejka <mq@ucw.cz>2021-11-22 19:05:43 +0100
commit1289c1c5eede5b3d015d06b725d30024ccac51bd (patch)
tree3461d23b870f3a1c739cdea91b8a2231a70171a6 /sysdep/unix
parent1db83a507a9ae287815d62733d1337074993b433 (diff)
Coroutines: A simple and lightweight parallel execution framework.
Diffstat (limited to 'sysdep/unix')
-rw-r--r--sysdep/unix/coroutine.c69
-rw-r--r--sysdep/unix/io.c27
-rw-r--r--sysdep/unix/log.c34
-rw-r--r--sysdep/unix/unix.h1
4 files changed, 127 insertions, 4 deletions
diff --git a/sysdep/unix/coroutine.c b/sysdep/unix/coroutine.c
index 05f101fb..71847505 100644
--- a/sysdep/unix/coroutine.c
+++ b/sysdep/unix/coroutine.c
@@ -17,7 +17,14 @@
#include "lib/birdlib.h"
#include "lib/locking.h"
+#include "lib/coro.h"
#include "lib/resource.h"
+#include "lib/timer.h"
+
+/* Using a rather big stack for coroutines to allow for stack-local allocations.
+ * In real world, the kernel doesn't alloc this memory until it is used.
+ * */
+#define CORO_STACK_SIZE 1048576
/*
* Implementation of coroutines based on POSIX threads
@@ -100,3 +107,65 @@ void do_unlock(struct domain_generic *dg, struct domain_generic **lsp)
pthread_mutex_unlock(&dg->mutex);
}
+/* Coroutines */
+struct coroutine {
+ resource r;
+ pthread_t id;
+ pthread_attr_t attr;
+ void (*entry)(void *);
+ void *data;
+};
+
+static _Thread_local _Bool coro_cleaned_up = 0;
+
+static void coro_free(resource *r)
+{
+ struct coroutine *c = (void *) r;
+ ASSERT_DIE(pthread_equal(pthread_self(), c->id));
+ pthread_attr_destroy(&c->attr);
+ coro_cleaned_up = 1;
+}
+
+static struct resclass coro_class = {
+ .name = "Coroutine",
+ .size = sizeof(struct coroutine),
+ .free = coro_free,
+};
+
+static void *coro_entry(void *p)
+{
+ struct coroutine *c = p;
+ ASSERT_DIE(c->entry);
+
+ c->entry(c->data);
+ ASSERT_DIE(coro_cleaned_up);
+
+ return NULL;
+}
+
+struct coroutine *coro_run(pool *p, void (*entry)(void *), void *data)
+{
+ ASSERT_DIE(entry);
+ ASSERT_DIE(p);
+
+ struct coroutine *c = ralloc(p, &coro_class);
+
+ c->entry = entry;
+ c->data = data;
+
+ int e = 0;
+
+ if (e = pthread_attr_init(&c->attr))
+ die("pthread_attr_init() failed: %M", e);
+
+ if (e = pthread_attr_setstacksize(&c->attr, CORO_STACK_SIZE))
+ die("pthread_attr_setstacksize(%u) failed: %M", CORO_STACK_SIZE, e);
+
+ if (e = pthread_attr_setdetachstate(&c->attr, PTHREAD_CREATE_DETACHED))
+ die("pthread_attr_setdetachstate(PTHREAD_CREATE_DETACHED) failed: %M", e);
+
+ if (e = pthread_create(&c->id, &c->attr, coro_entry, c))
+ die("pthread_create() failed: %M", e);
+
+ return c;
+}
diff --git a/sysdep/unix/io.c b/sysdep/unix/io.c
index 29467867..40841ea4 100644
--- a/sysdep/unix/io.c
+++ b/sysdep/unix/io.c
@@ -2176,6 +2176,15 @@ static int short_loops = 0;
#define SHORT_LOOP_MAX 10
#define WORK_EVENTS_MAX 10
+static int poll_reload_pipe[2];
+
+void
+io_loop_reload(void)
+{
+ char b;
+ write(poll_reload_pipe[1], &b, 1);
+}
+
void
io_loop(void)
{
@@ -2187,6 +2196,9 @@ io_loop(void)
int fdmax = 256;
struct pollfd *pfd = xmalloc(fdmax * sizeof(struct pollfd));
+ if (pipe(poll_reload_pipe) < 0)
+ die("pipe(poll_reload_pipe) failed: %m");
+
watchdog_start1();
for(;;)
{
@@ -2205,7 +2217,12 @@ io_loop(void)
poll_tout = MIN(poll_tout, timeout);
}
- nfds = 0;
+ /* A hack to reload main io_loop() when something has changed asynchronously. */
+ pfd[0].fd = poll_reload_pipe[0];
+ pfd[0].events = POLLIN;
+
+ nfds = 1;
+
WALK_LIST(n, sock_list)
{
pfd[nfds] = (struct pollfd) { .fd = -1 }; /* everything other set to 0 by this */
@@ -2277,6 +2294,14 @@ io_loop(void)
}
if (pout)
{
+ if (pfd[0].revents & POLLIN)
+ {
+ /* IO loop reload requested */
+ char b;
+ read(poll_reload_pipe[0], &b, 1);
+ continue;
+ }
+
times_update(&main_timeloop);
/* guaranteed to be non-empty */
diff --git a/sysdep/unix/log.c b/sysdep/unix/log.c
index a23903b7..dc2b14b3 100644
--- a/sysdep/unix/log.c
+++ b/sysdep/unix/log.c
@@ -15,6 +15,7 @@
* user's manual.
*/
+#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
@@ -35,6 +36,10 @@ static FILE *dbgf;
static list *current_log_list;
static char *current_syslog_name; /* NULL -> syslog closed */
+static _Atomic uint max_coro_id = ATOMIC_VAR_INIT(1);
+static _Thread_local uint this_coro_id;
+
+#define THIS_CORO_ID (this_coro_id ?: (this_coro_id = atomic_fetch_add_explicit(&max_coro_id, 1, memory_order_acq_rel)))
#include <pthread.h>
@@ -178,7 +183,7 @@ log_commit(int class, buffer *buf)
l->pos += msg_len;
}
- fprintf(l->fh, "%s <%s> ", tbuf, class_names[class]);
+ fprintf(l->fh, "%s [%04x] <%s> ", tbuf, THIS_CORO_ID, class_names[class]);
}
fputs(buf->start, l->fh);
fputc('\n', l->fh);
@@ -288,6 +293,8 @@ die(const char *msg, ...)
exit(1);
}
+static struct timespec dbg_time_start;
+
/**
* debug - write to debug output
* @msg: a printf-like message
@@ -300,12 +307,33 @@ debug(const char *msg, ...)
{
#define MAX_DEBUG_BUFSIZE 16384
va_list args;
- char buf[MAX_DEBUG_BUFSIZE];
+ char buf[MAX_DEBUG_BUFSIZE], *pos = buf;
+ int max = MAX_DEBUG_BUFSIZE;
va_start(args, msg);
if (dbgf)
{
- if (bvsnprintf(buf, MAX_DEBUG_BUFSIZE, msg, args) < 0)
+ struct timespec dbg_time;
+ clock_gettime(CLOCK_MONOTONIC, &dbg_time);
+ uint nsec;
+ uint sec;
+
+ if (dbg_time.tv_nsec > dbg_time_start.tv_nsec)
+ {
+ nsec = dbg_time.tv_nsec - dbg_time_start.tv_nsec;
+ sec = dbg_time.tv_sec - dbg_time_start.tv_sec;
+ }
+ else
+ {
+ nsec = 1000000000 + dbg_time.tv_nsec - dbg_time_start.tv_nsec;
+ sec = dbg_time.tv_sec - dbg_time_start.tv_sec - 1;
+ }
+
+ int n = bsnprintf(pos, max, "%u.%09u: [%04x] ", sec, nsec, THIS_CORO_ID);
+ pos += n;
+ max -= n;
+
+ if (bvsnprintf(pos, max, msg, args) < 0)
bug("Extremely long debug output, split it.");
fputs(buf, dbgf);
diff --git a/sysdep/unix/unix.h b/sysdep/unix/unix.h
index ad85d1ea..313c97c3 100644
--- a/sysdep/unix/unix.h
+++ b/sysdep/unix/unix.h
@@ -106,6 +106,7 @@ extern volatile sig_atomic_t async_shutdown_flag;
void io_init(void);
void io_loop(void);
+void io_loop_reload(void);
void io_log_dump(void);
int sk_open_unix(struct birdsock *s, char *name);
struct rfile *rf_open(struct pool *, const char *name, const char *mode);