diff options
Diffstat (limited to 'sysdep/unix')
-rw-r--r-- | sysdep/unix/Makefile | 2 | ||||
-rw-r--r-- | sysdep/unix/alloc.c | 397 | ||||
-rw-r--r-- | sysdep/unix/config.Y | 11 | ||||
-rw-r--r-- | sysdep/unix/domain.c (renamed from sysdep/unix/coroutine.c) | 108 | ||||
-rw-r--r-- | sysdep/unix/io-loop.c | 1297 | ||||
-rw-r--r-- | sysdep/unix/io-loop.h | 84 | ||||
-rw-r--r-- | sysdep/unix/io.c | 377 | ||||
-rw-r--r-- | sysdep/unix/krt.Y | 5 | ||||
-rw-r--r-- | sysdep/unix/krt.c | 267 | ||||
-rw-r--r-- | sysdep/unix/krt.h | 7 | ||||
-rw-r--r-- | sysdep/unix/log.c | 55 | ||||
-rw-r--r-- | sysdep/unix/main.c | 62 | ||||
-rw-r--r-- | sysdep/unix/unix.h | 9 |
13 files changed, 1699 insertions, 982 deletions
diff --git a/sysdep/unix/Makefile b/sysdep/unix/Makefile index 07f454ab..6f6b0d26 100644 --- a/sysdep/unix/Makefile +++ b/sysdep/unix/Makefile @@ -1,4 +1,4 @@ -src := alloc.c io.c io-loop.c krt.c log.c main.c random.c coroutine.c +src := alloc.c io.c io-loop.c krt.c log.c main.c random.c domain.c obj := $(src-o-files) $(all-daemon) $(cf-local) diff --git a/sysdep/unix/alloc.c b/sysdep/unix/alloc.c index c09a8356..cafcc8dd 100644 --- a/sysdep/unix/alloc.c +++ b/sysdep/unix/alloc.c @@ -10,239 +10,324 @@ #include "lib/resource.h" #include "lib/lists.h" #include "lib/event.h" +#include "lib/rcu.h" -#include "sysdep/unix/io-loop.h" - +#include <errno.h> #include <stdlib.h> #include <unistd.h> -#include <stdatomic.h> -#include <errno.h> #ifdef HAVE_MMAP #include <sys/mman.h> #endif +#ifdef CONFIG_DISABLE_THP +#include <sys/prctl.h> +#ifndef PR_SET_THP_DISABLE +#define PR_SET_THP_DISABLE 41 +#endif +#endif + long page_size = 0; #ifdef HAVE_MMAP -#if DEBUGGING -#define FP_NODE_OFFSET 42 -#else -#define FP_NODE_OFFSET 1 -#endif +#define KEEP_PAGES_MAX 512 +#define KEEP_PAGES_MIN 32 +#define KEEP_PAGES_MAX_LOCAL 16 +#define ALLOC_PAGES_AT_ONCE 8 + +STATIC_ASSERT(KEEP_PAGES_MIN * 4 < KEEP_PAGES_MAX); +STATIC_ASSERT(ALLOC_PAGES_AT_ONCE < KEEP_PAGES_MAX_LOCAL); + static _Bool use_fake = 0; +static _Bool initialized = 0; + +#if DEBUGGING +struct free_page { + node unused[42]; + struct free_page * _Atomic next; +}; #else -static _Bool use_fake = 1; +struct free_page { + struct free_page * _Atomic next; +}; #endif +#define EP_POS_MAX ((page_size - OFFSETOF(struct empty_pages, pages)) / sizeof (void *)) + +struct empty_pages { + struct empty_pages *next; + uint pos; + void *pages[0]; +}; + +DEFINE_DOMAIN(resource); +static DOMAIN(resource) empty_pages_domain; +static struct empty_pages *empty_pages = NULL; + +static struct free_page * _Atomic page_stack = NULL; +static _Thread_local struct free_page * local_page_stack = NULL; + +static void page_cleanup(void *); +static event page_cleanup_event = { .hook = page_cleanup, }; +#define SCHEDULE_CLEANUP do if (initialized && !shutting_down) ev_send(&global_event_list, &page_cleanup_event); while (0) + +_Atomic int pages_kept = 0; +_Atomic int pages_kept_locally = 0; +static _Thread_local int pages_kept_here = 0; + static void * alloc_sys_page(void) { - void *ptr = mmap(NULL, page_size, PROT_WRITE | PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + void *ptr = mmap(NULL, page_size * ALLOC_PAGES_AT_ONCE, PROT_WRITE | PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); if (ptr == MAP_FAILED) - bug("mmap(%lu) failed: %m", page_size); + die("mmap(%ld) failed: %m", (s64) page_size); return ptr; } extern int shutting_down; /* Shutdown requested. */ +#else // ! HAVE_MMAP +#define use_fake 1 +#endif + void * alloc_page(void) { -#ifdef HAVE_MMAP - if (!use_fake) + /* If the system page allocator is goofy, we use posix_memalign to get aligned blocks of memory. */ + if (use_fake) { - struct free_pages *fp = &birdloop_current->pages; - if (!fp->cnt) - return alloc_sys_page(); - - node *n = HEAD(fp->list); - rem_node(n); - if ((--fp->cnt < fp->min) && !shutting_down) - ev_send(fp->cleanup->list, fp->cleanup); - - void *ptr = n - FP_NODE_OFFSET; - memset(ptr, 0, page_size); + void *ptr = NULL; + int err = posix_memalign(&ptr, page_size, page_size); + + if (err || !ptr) + die("posix_memalign(%ld) failed", (s64) page_size); + return ptr; } - else -#endif - { -#ifdef HAVE_ALIGNED_ALLOC - void *ret = aligned_alloc(page_size, page_size); - if (!ret) - bug("aligned_alloc(%lu) failed", page_size); - return ret; -#else - bug("BIRD should have already died on fatal error."); -#endif - } -} -void -free_page(void *ptr) -{ #ifdef HAVE_MMAP - if (!use_fake) + /* If there is any free page kept hot in this thread, we use it. */ + struct free_page *fp = local_page_stack; + if (fp) { - struct free_pages *fp = &birdloop_current->pages; - struct node *n = ptr; - n += FP_NODE_OFFSET; - - memset(n, 0, sizeof(node)); - add_tail(&fp->list, n); - if ((++fp->cnt > fp->max) && !shutting_down) - ev_send(fp->cleanup->list, fp->cleanup); + local_page_stack = atomic_load_explicit(&fp->next, memory_order_acquire); + atomic_fetch_sub_explicit(&pages_kept_locally, 1, memory_order_relaxed); + pages_kept_here--; + return fp; } - else -#endif - free(ptr); -} -#ifdef HAVE_MMAP + ASSERT_DIE(pages_kept_here == 0); -#define GFP (&main_birdloop.pages) + /* If there is any free page kept hot in global storage, we use it. */ + rcu_read_lock(); + fp = atomic_load_explicit(&page_stack, memory_order_acquire); + while (fp && !atomic_compare_exchange_strong_explicit( + &page_stack, &fp, atomic_load_explicit(&fp->next, memory_order_acquire), + memory_order_acq_rel, memory_order_acquire)) + ; + rcu_read_unlock(); -void -flush_pages(struct birdloop *loop) -{ - ASSERT_DIE(birdloop_inside(loop->parent->loop)); + if (fp) + { + atomic_fetch_sub_explicit(&pages_kept, 1, memory_order_relaxed); + return fp; + } - struct free_pages *fp = &loop->pages; - struct free_pages *pfp = &loop->parent->loop->pages; + /* If there is any free page kept cold, we use that. */ + LOCK_DOMAIN(resource, empty_pages_domain); + if (empty_pages) { + if (empty_pages->pos) + /* Either the keeper page contains at least one cold page pointer, return that */ + fp = empty_pages->pages[--empty_pages->pos]; + else + { + /* Or the keeper page has no more cold page pointer, return the keeper page */ + fp = (struct free_page *) empty_pages; + empty_pages = empty_pages->next; + } + } + UNLOCK_DOMAIN(resource, empty_pages_domain); - add_tail_list(&pfp->list, &fp->list); - pfp->cnt += fp->cnt; - - fp->cnt = 0; - fp->list = (list) {}; - fp->min = 0; - fp->max = 0; + if (fp) + return fp; - rfree(fp->cleanup); - fp->cleanup = NULL; + /* And in the worst case, allocate some new pages by mmap() */ + void *ptr = alloc_sys_page(); + for (int i=1; i<ALLOC_PAGES_AT_ONCE; i++) + free_page(ptr + page_size * i); + + return ptr; +#endif } -static void -cleanup_pages(void *data) +void +free_page(void *ptr) { - struct birdloop *loop = data; - birdloop_enter(loop); - - ASSERT_DIE(birdloop_inside(loop->parent->loop)); - - struct free_pages *fp = &loop->pages; - struct free_pages *pfp = &loop->parent->loop->pages; - - while ((fp->cnt < fp->min) && (pfp->cnt > pfp->min)) + /* If the system page allocator is goofy, we just free the block and care no more. */ + if (use_fake) { - node *n = HEAD(pfp->list); - rem_node(n); - add_tail(&fp->list, n); - fp->cnt++; - pfp->cnt--; - } - - while (fp->cnt < fp->min) - { - node *n = alloc_sys_page(); - add_tail(&fp->list, n + FP_NODE_OFFSET); - fp->cnt++; + free(ptr); + return; } - while (fp->cnt > fp->max) +#ifdef HAVE_MMAP + /* We primarily try to keep the pages locally. */ + struct free_page *fp = ptr; + if (shutting_down || (pages_kept_here < KEEP_PAGES_MAX_LOCAL)) { - node *n = HEAD(fp->list); - rem_node(n); - add_tail(&pfp->list, n); - fp->cnt--; - pfp->cnt++; + atomic_store_explicit(&fp->next, local_page_stack, memory_order_relaxed); + local_page_stack = fp; + + atomic_fetch_add_explicit(&pages_kept_locally, 1, memory_order_relaxed); + pages_kept_here++; + return; } - birdloop_leave(loop); + /* If there are too many local pages, we add the free page to the global hot-free-page list */ + rcu_read_lock(); + struct free_page *next = atomic_load_explicit(&page_stack, memory_order_acquire); - if (!shutting_down && (pfp->cnt > pfp->max)) - ev_send(pfp->cleanup->list, pfp->cleanup); + do atomic_store_explicit(&fp->next, next, memory_order_release); + while (!atomic_compare_exchange_strong_explicit( + &page_stack, &next, fp, + memory_order_acq_rel, memory_order_acquire)); + rcu_read_unlock(); + + /* And if there are too many global hot free pages, we ask for page cleanup */ + if (atomic_fetch_add_explicit(&pages_kept, 1, memory_order_relaxed) >= KEEP_PAGES_MAX) + SCHEDULE_CLEANUP; +#endif } -static void -cleanup_global_pages(void *data UNUSED) +/* When the routine is going to sleep for a long time, we flush the local + * hot page cache to not keep dirty pages for nothing. */ +void +flush_local_pages(void) { - while (GFP->cnt < GFP->max) - { - node *n = alloc_sys_page(); - add_tail(&GFP->list, n + FP_NODE_OFFSET); - GFP->cnt++; - } + if (use_fake || !local_page_stack || shutting_down) + return; - for (uint limit = GFP->cnt; (limit > 0) && (GFP->cnt > GFP->max); limit--) + /* We first count the pages to enable consistency checking. + * Also, we need to know the last page. */ + struct free_page *last = local_page_stack, *next; + int check_count = 1; + while (next = atomic_load_explicit(&last->next, memory_order_acquire)) { - node *n = TAIL(GFP->list); - rem_node(n); - - if (munmap(n - FP_NODE_OFFSET, page_size) == 0) - GFP->cnt--; - else if (errno == ENOMEM) - add_head(&GFP->list, n); - else - bug("munmap(%p) failed: %m", n - FP_NODE_OFFSET); + check_count++; + last = next; } + + /* The actual number of pages must be equal to the counter value. */ + ASSERT_DIE(check_count == pages_kept_here); + + /* Repeatedly trying to insert the whole page list into global page stack at once. */ + rcu_read_lock(); + next = atomic_load_explicit(&page_stack, memory_order_acquire); + + /* First we set the outwards pointer (from our last), + * then we try to set the inwards pointer to our first page. */ + do atomic_store_explicit(&last->next, next, memory_order_release); + while (!atomic_compare_exchange_strong_explicit( + &page_stack, &next, local_page_stack, + memory_order_acq_rel, memory_order_acquire)); + rcu_read_unlock(); + + /* Finished. Now the local stack is empty. */ + local_page_stack = NULL; + pages_kept_here = 0; + + /* Check the state of global page cache and maybe schedule its cleanup. */ + atomic_fetch_sub_explicit(&pages_kept_locally, check_count, memory_order_relaxed); + if (atomic_fetch_add_explicit(&pages_kept, check_count, memory_order_relaxed) >= KEEP_PAGES_MAX) + SCHEDULE_CLEANUP; } -void -init_pages(struct birdloop *loop) +#ifdef HAVE_MMAP +static void +page_cleanup(void *_ UNUSED) { - struct free_pages *fp = &loop->pages; + /* Cleanup on shutdown is ignored. All pages may be kept hot, OS will take care. */ + if (shutting_down) + return; + + struct free_page *stack = atomic_exchange_explicit(&page_stack, NULL, memory_order_acq_rel); + if (!stack) + return; - init_list(&fp->list); - fp->cleanup = ev_new_init(loop->parent->loop->pool, cleanup_pages, loop); - fp->cleanup->list = (loop->parent->loop == &main_birdloop) ? &global_work_list : birdloop_event_list(loop->parent->loop); - fp->min = 4; - fp->max = 16; - for (fp->cnt = 0; fp->cnt < fp->min; fp->cnt++) + do { + synchronize_rcu(); + struct free_page *fp = stack; + stack = atomic_load_explicit(&fp->next, memory_order_acquire); + + LOCK_DOMAIN(resource, empty_pages_domain); + /* Empty pages are stored as pointers. To store them, we need a pointer block. */ + if (!empty_pages || (empty_pages->pos == EP_POS_MAX)) + { + /* There is either no pointer block or the last block is full. We use this block as a pointer block. */ + empty_pages = (struct empty_pages *) fp; + *empty_pages = (struct empty_pages) {}; + } + else + { + /* We store this block as a pointer into the first free place + * and tell the OS that the underlying memory is trash. */ + empty_pages->pages[empty_pages->pos++] = fp; + if (madvise(fp, page_size, +#ifdef CONFIG_MADV_DONTNEED_TO_FREE + MADV_DONTNEED +#else + MADV_FREE +#endif + ) < 0) + bug("madvise(%p) failed: %m", fp); + } + UNLOCK_DOMAIN(resource, empty_pages_domain); + } + while ((atomic_fetch_sub_explicit(&pages_kept, 1, memory_order_relaxed) >= KEEP_PAGES_MAX / 2) && stack); + + while (stack) { - node *n = alloc_sys_page(); - add_tail(&fp->list, n + FP_NODE_OFFSET); + struct free_page *f = stack; + stack = atomic_load_explicit(&f->next, memory_order_acquire); + free_page(f); + + atomic_fetch_sub_explicit(&pages_kept, 1, memory_order_relaxed); } } +#endif -static event global_free_pages_cleanup_event = { .hook = cleanup_global_pages, .list = &global_work_list }; - -void resource_sys_init(void) +void +resource_sys_init(void) { +#ifdef CONFIG_DISABLE_THP + /* Disable transparent huge pages, they do not work properly with madvice(MADV_DONTNEED) */ + if (prctl(PR_SET_THP_DISABLE, (unsigned long) 1, (unsigned long) 0, (unsigned long) 0, (unsigned long) 0) < 0) + log(L_WARN "Cannot disable transparent huge pages: prctl(PR_SET_THP_DISABLE) failed: %m"); +#endif + +#ifdef HAVE_MMAP + /* Check what page size the system supports */ if (!(page_size = sysconf(_SC_PAGESIZE))) die("System page size must be non-zero"); - if (u64_popcount(page_size) == 1) + if ((u64_popcount(page_size) == 1) && (page_size >= (1 << 10)) && (page_size <= (1 << 18))) { - init_list(&GFP->list); - GFP->cleanup = &global_free_pages_cleanup_event; - GFP->min = 0; - GFP->max = 256; + /* We assume that page size has only one bit and is between 1K and 256K (incl.). + * Otherwise, the assumptions in lib/slab.c (sl_head's num_full range) aren't met. */ + + empty_pages_domain = DOMAIN_NEW(resource, "Empty Pages"); + initialized = 1; return; } -#ifdef HAVE_ALIGNED_ALLOC - log(L_WARN "Got strange memory page size (%lu), using the aligned allocator instead", page_size); -#else - die("Got strange memory page size (%lu) and aligned_alloc is not available", page_size); -#endif - /* Too big or strange page, use the aligned allocator instead */ - page_size = 4096; + log(L_WARN "Got strange memory page size (%ld), using the aligned allocator instead", (s64) page_size); use_fake = 1; -} - -#else +#endif -void -resource_sys_init(void) -{ page_size = 4096; - use_fake = 1; + initialized = 1; } - -#endif diff --git a/sysdep/unix/config.Y b/sysdep/unix/config.Y index 5c4b5bef..a50ec757 100644 --- a/sysdep/unix/config.Y +++ b/sysdep/unix/config.Y @@ -101,6 +101,12 @@ mrtdump_base: ; +conf: THREADS expr { + if ($2 < 1) cf_error("Number of threads must be at least one."); + new_config->thread_count = $2; +} + + conf: debug_unix ; debug_unix: @@ -145,6 +151,11 @@ CF_CLI_HELP(GRACEFUL, restart, [[Shut the daemon down for graceful restart]]) CF_CLI(GRACEFUL RESTART,,, [[Shut the daemon down for graceful restart]]) { cmd_graceful_restart(); } ; +CF_CLI(SHOW THREADS,,, [[Write out thread information]]) +{ cmd_show_threads(0); } ; + +CF_CLI(SHOW THREADS ALL,,, [[Write out thread and IO loop information]]) +{ cmd_show_threads(1); } ; cfg_name: /* empty */ { $$ = NULL; } diff --git a/sysdep/unix/coroutine.c b/sysdep/unix/domain.c index 2068afd5..f4ee595d 100644 --- a/sysdep/unix/coroutine.c +++ b/sysdep/unix/domain.c @@ -1,7 +1,6 @@ /* - * BIRD Coroutines + * BIRD Locking * - * (c) 2017 Martin Mares <mj@ucw.cz> * (c) 2020 Maria Matejka <mq@jmq.cz> * * Can be freely distributed and used under the terms of the GNU GPL. @@ -17,19 +16,11 @@ #include "lib/birdlib.h" #include "lib/locking.h" -#include "lib/coro.h" -#include "lib/rcu.h" #include "lib/resource.h" #include "lib/timer.h" #include "conf/conf.h" -#define CORO_STACK_SIZE 65536 - -/* - * Implementation of coroutines based on POSIX threads - */ - #include <errno.h> #include <fcntl.h> #include <poll.h> @@ -79,6 +70,12 @@ domain_free(struct domain_generic *dg) xfree(dg); } +const char * +domain_name(struct domain_generic *dg) +{ + return dg->name; +} + uint dg_order(struct domain_generic *dg) { return dg->order; @@ -86,11 +83,15 @@ uint dg_order(struct domain_generic *dg) void do_lock(struct domain_generic *dg, struct domain_generic **lsp) { - if ((char *) lsp - (char *) &locking_stack != (int) dg->order) + struct lock_order stack_copy; + memcpy(&stack_copy, &locking_stack, sizeof(stack_copy)); + struct domain_generic **lll = last_locked; + + if ((char *) lsp - (char *) &locking_stack != dg->order) bug("Trying to lock on bad position: order=%u, lsp=%p, base=%p", dg->order, lsp, &locking_stack); if (lsp <= last_locked) - bug("Trying to lock in a bad order"); + bug("Trying to lock in a bad order: %p %p", &stack_copy, lll); if (*lsp) bug("Inconsistent locking stack state on lock"); @@ -110,7 +111,7 @@ void do_lock(struct domain_generic *dg, struct domain_generic **lsp) void do_unlock(struct domain_generic *dg, struct domain_generic **lsp) { - if ((char *) lsp - (char *) &locking_stack != (int) dg->order) + if ((char *) lsp - (char *) &locking_stack != dg->order) bug("Trying to unlock on bad position: order=%u, lsp=%p, base=%p", dg->order, lsp, &locking_stack); if (dg->locked_by != &locking_stack) @@ -123,84 +124,3 @@ void do_unlock(struct domain_generic *dg, struct domain_generic **lsp) dg->prev = NULL; pthread_mutex_unlock(&dg->mutex); } - -/* Coroutines */ -struct coroutine { - resource r; - pthread_t id; - pthread_attr_t attr; - struct rcu_coro rcu; - void (*entry)(void *); - void *data; -}; - -static _Thread_local _Bool coro_cleaned_up = 0; - -static void coro_free(resource *r) -{ - struct coroutine *c = (void *) r; - rcu_coro_stop(&c->rcu); - ASSERT_DIE(pthread_equal(pthread_self(), c->id)); - pthread_attr_destroy(&c->attr); - coro_cleaned_up = 1; -} - -static void coro_dump(resource *r UNUSED) { } - -static struct resclass coro_class = { - .name = "Coroutine", - .size = sizeof(struct coroutine), - .free = coro_free, - .dump = coro_dump, -}; - -_Thread_local struct coroutine *this_coro = NULL; - -static void *coro_entry(void *p) -{ - struct coroutine *c = p; - - ASSERT_DIE(c->entry); - - this_coro = c; - rcu_coro_start(&c->rcu); - - c->entry(c->data); - ASSERT_DIE(coro_cleaned_up); - - return NULL; -} - -struct coroutine *coro_run(pool *p, void (*entry)(void *), void *data) -{ - ASSERT_DIE(entry); - ASSERT_DIE(p); - - struct coroutine *c = ralloc(p, &coro_class); - - c->entry = entry; - c->data = data; - - int e = 0; - - if (e = pthread_attr_init(&c->attr)) - die("pthread_attr_init() failed: %M", e); - - if (e = pthread_attr_setstacksize(&c->attr, CORO_STACK_SIZE)) - die("pthread_attr_setstacksize(%u) failed: %M", CORO_STACK_SIZE, e); - - if (e = pthread_attr_setdetachstate(&c->attr, PTHREAD_CREATE_DETACHED)) - die("pthread_attr_setdetachstate(PTHREAD_CREATE_DETACHED) failed: %M", e); - - if (e = pthread_create(&c->id, &c->attr, coro_entry, c)) - die("pthread_create() failed: %M", e); - - return c; -} - -void -coro_yield(void) -{ - const struct timespec req = { .tv_nsec = 100 }; - nanosleep(&req, NULL); -} diff --git a/sysdep/unix/io-loop.c b/sysdep/unix/io-loop.c index 9b107a1a..efb408e0 100644 --- a/sysdep/unix/io-loop.c +++ b/sysdep/unix/io-loop.c @@ -17,8 +17,8 @@ #include "nest/bird.h" #include "lib/buffer.h" -#include "lib/coro.h" #include "lib/lists.h" +#include "lib/locking.h" #include "lib/resource.h" #include "lib/event.h" #include "lib/timer.h" @@ -27,15 +27,54 @@ #include "lib/io-loop.h" #include "sysdep/unix/io-loop.h" #include "conf/conf.h" +#include "nest/cli.h" + +#define THREAD_STACK_SIZE 65536 /* To be lowered in near future */ + +static struct birdloop *birdloop_new_internal(pool *pp, uint order, const char *name, int request_pickup, struct birdloop_pickup_group *group); + +/* + * Nanosecond time for accounting purposes + * + * A fixed point on startup is set as zero, all other values are relative to that. + * Caution: this overflows after like 500 years or so. If you plan to run + * BIRD for such a long time, please implement some means of overflow prevention. + */ + +static struct timespec ns_begin; + +static void ns_init(void) +{ + if (clock_gettime(CLOCK_MONOTONIC, &ns_begin)) + bug("clock_gettime: %m"); +} + +static u64 ns_now(void) +{ + struct timespec ts; + if (clock_gettime(CLOCK_MONOTONIC, &ts)) + bug("clock_gettime: %m"); + + return (u64) (ts.tv_sec - ns_begin.tv_sec) * 1000000000 + ts.tv_nsec - ns_begin.tv_nsec; +} + /* * Current thread context */ -_Thread_local struct birdloop *birdloop_current = NULL; +_Thread_local struct birdloop *birdloop_current; static _Thread_local struct birdloop *birdloop_wakeup_masked; static _Thread_local uint birdloop_wakeup_masked_count; +#define LOOP_NAME(loop) domain_name((loop)->time.domain) + +#define LOOP_TRACE(loop, fmt, args...) do { if (config && config->latency_debug) log(L_TRACE "%s (%p): " fmt, LOOP_NAME(loop), (loop), ##args); } while (0) +#define THREAD_TRACE(...) do { if (config && config->latency_debug) log(L_TRACE "Thread: " __VA_ARGS__); } while (0) + +#define LOOP_WARN(loop, fmt, args...) log(L_TRACE "%s (%p): " fmt, LOOP_NAME(loop), (loop), ##args) + + event_list * birdloop_event_list(struct birdloop *loop) { @@ -48,12 +87,6 @@ birdloop_time_loop(struct birdloop *loop) return &loop->time; } -pool * -birdloop_pool(struct birdloop *loop) -{ - return loop->pool; -} - _Bool birdloop_inside(struct birdloop *loop) { @@ -64,91 +97,210 @@ birdloop_inside(struct birdloop *loop) return 0; } +_Bool +birdloop_in_this_thread(struct birdloop *loop) +{ + return pthread_equal(pthread_self(), loop->thread->thread_id); +} + +void +birdloop_flag(struct birdloop *loop, u32 flag) +{ + atomic_fetch_or_explicit(&loop->flags, flag, memory_order_acq_rel); + birdloop_ping(loop); +} + +void +birdloop_flag_set_handler(struct birdloop *loop, struct birdloop_flag_handler *fh) +{ + ASSERT_DIE(birdloop_inside(loop)); + loop->flag_handler = fh; +} + +static int +birdloop_process_flags(struct birdloop *loop) +{ + if (!loop->flag_handler) + return 0; + + u32 flags = atomic_exchange_explicit(&loop->flags, 0, memory_order_acq_rel); + if (!flags) + return 0; + + loop->flag_handler->hook(loop->flag_handler, flags); + return 1; +} + /* * Wakeup code for birdloop */ -static void -pipe_new(int *pfds) +void +pipe_new(struct pipe *p) { - int rv = pipe(pfds); + int rv = pipe(p->fd); if (rv < 0) die("pipe: %m"); - if (fcntl(pfds[0], F_SETFL, O_NONBLOCK) < 0) + if (fcntl(p->fd[0], F_SETFL, O_NONBLOCK) < 0) die("fcntl(O_NONBLOCK): %m"); - if (fcntl(pfds[1], F_SETFL, O_NONBLOCK) < 0) + if (fcntl(p->fd[1], F_SETFL, O_NONBLOCK) < 0) die("fcntl(O_NONBLOCK): %m"); } void -pipe_drain(int fd) +pipe_drain(struct pipe *p) { - char buf[64]; - int rv; - - try: - rv = read(fd, buf, 64); - if (rv < 0) - { - if (errno == EINTR) - goto try; - if (errno == EAGAIN) + while (1) { + char buf[64]; + int rv = read(p->fd[0], buf, sizeof(buf)); + if ((rv < 0) && (errno == EAGAIN)) return; - die("wakeup read: %m"); + + if (rv == 0) + bug("wakeup read eof"); + if ((rv < 0) && (errno != EINTR)) + bug("wakeup read: %m"); + } +} + +int +pipe_read_one(struct pipe *p) +{ + while (1) { + char v; + int rv = read(p->fd[0], &v, sizeof(v)); + if (rv == 1) + return 1; + if ((rv < 0) && (errno == EAGAIN)) + return 0; + if (rv > 1) + bug("wakeup read more bytes than expected: %d", rv); + if (rv == 0) + bug("wakeup read eof"); + if (errno != EINTR) + bug("wakeup read: %m"); } - if (rv == 64) - goto try; } void -pipe_kick(int fd) +pipe_kick(struct pipe *p) { - u64 v = 1; + char v = 1; int rv; - try: - rv = write(fd, &v, sizeof(u64)); - if (rv < 0) - { - if (errno == EINTR) - goto try; - if (errno == EAGAIN) + while (1) { + rv = write(p->fd[1], &v, sizeof(v)); + if ((rv >= 0) || (errno == EAGAIN)) return; - die("wakeup write: %m"); + if (errno != EINTR) + bug("wakeup write: %m"); } } +void +pipe_pollin(struct pipe *p, struct pfd *pfd) +{ + BUFFER_PUSH(pfd->pfd) = (struct pollfd) { + .fd = p->fd[0], + .events = POLLIN, + }; + BUFFER_PUSH(pfd->loop) = NULL; +} + static inline void -wakeup_init(struct birdloop *loop) +wakeup_init(struct bird_thread *loop) { - pipe_new(loop->wakeup_fds); + pipe_new(&loop->wakeup); } static inline void -wakeup_drain(struct birdloop *loop) +wakeup_drain(struct bird_thread *loop) { - pipe_drain(loop->wakeup_fds[0]); + pipe_drain(&loop->wakeup); } static inline void -wakeup_do_kick(struct birdloop *loop) +wakeup_do_kick(struct bird_thread *loop) { - pipe_kick(loop->wakeup_fds[1]); + pipe_kick(&loop->wakeup); } -void -birdloop_ping(struct birdloop *loop) +static inline _Bool +birdloop_try_ping(struct birdloop *loop, u32 ltt) { - u32 ping_sent = atomic_fetch_add_explicit(&loop->ping_sent, 1, memory_order_acq_rel); - if (ping_sent) - return; + /* Somebody else is already pinging, be idempotent */ + if (ltt & LTT_PING) + { + LOOP_TRACE(loop, "already being pinged"); + return 0; + } + + /* Thread moving is an implicit ping */ + if (ltt & LTT_MOVE) + { + LOOP_TRACE(loop, "ping while moving"); + return 1; + } + /* No more flags allowed */ + ASSERT_DIE(!ltt); + + /* No ping when not picked up */ + if (!loop->thread) + { + LOOP_TRACE(loop, "not picked up yet, can't ping"); + return 1; + } + + /* No ping when masked */ if (loop == birdloop_wakeup_masked) + { + LOOP_TRACE(loop, "wakeup masked, can't ping"); birdloop_wakeup_masked_count++; + return 1; + } + + /* Send meta event to ping */ + if ((loop != loop->thread->meta) && (loop != &main_birdloop)) + { + LOOP_TRACE(loop, "Ping by meta event to %p", loop->thread->meta); + ev_send_loop(loop->thread->meta, &loop->event); + return 1; + } + + /* Do the real ping */ + LOOP_TRACE(loop, "sending pipe ping"); + wakeup_do_kick(loop->thread); + return 0; +} + +static inline void +birdloop_do_ping(struct birdloop *loop) +{ + /* Register our ping effort */ + u32 ltt = atomic_fetch_or_explicit(&loop->thread_transition, LTT_PING, memory_order_acq_rel); + + /* Try to ping in multiple ways */ + if (birdloop_try_ping(loop, ltt)) + atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_PING, memory_order_acq_rel); +} + +void +birdloop_ping(struct birdloop *loop) +{ + if (!birdloop_inside(loop)) + { + LOOP_TRACE(loop, "ping from outside"); + birdloop_do_ping(loop); + } else - wakeup_do_kick(loop); + { + LOOP_TRACE(loop, "ping from inside, pending=%d", loop->ping_pending); + if (!loop->ping_pending) + loop->ping_pending++; + } } @@ -161,204 +313,748 @@ sockets_init(struct birdloop *loop) { init_list(&loop->sock_list); loop->sock_num = 0; +} + +void +socket_changed(sock *s) +{ + struct birdloop *loop = s->loop; + ASSERT_DIE(birdloop_inside(loop)); - BUFFER_INIT(loop->poll_sk, loop->pool, 4); - BUFFER_INIT(loop->poll_fd, loop->pool, 4); - loop->poll_changed = 1; /* add wakeup fd */ + loop->sock_changed++; + birdloop_ping(loop); } -static void -sockets_add(struct birdloop *loop, sock *s) +void +birdloop_add_socket(struct birdloop *loop, sock *s) { - ASSERT_DIE(!enlisted(&s->n)); + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(!s->loop); + LOOP_TRACE(loop, "adding socket %p (total=%d)", s, loop->sock_num); add_tail(&loop->sock_list, &s->n); loop->sock_num++; s->loop = loop; s->index = -1; - loop->poll_changed = 1; - birdloop_ping(loop); + socket_changed(s); } +extern sock *stored_sock; /* mainloop hack */ + void -sk_start(sock *s) +birdloop_remove_socket(struct birdloop *loop, sock *s) { - ASSERT_DIE(birdloop_current != &main_birdloop); - sockets_add(birdloop_current, s); -} + ASSERT_DIE(!enlisted(&s->n) == !s->loop); -static void -sockets_remove(struct birdloop *loop, sock *s) -{ + if (!s->loop) + return; + + ASSERT_DIE(birdloop_inside(loop)); ASSERT_DIE(s->loop == loop); - if (!enlisted(&s->n)) - return; + /* Decouple the socket from the loop at all. */ + LOOP_TRACE(loop, "removing socket %p (total=%d)", s, loop->sock_num); + + if (loop->sock_active == s) + loop->sock_active = sk_next(s); + + if ((loop == &main_birdloop) && (s == stored_sock)) + stored_sock = sk_next(s); rem_node(&s->n); loop->sock_num--; - if (s->index >= 0) - { - loop->poll_sk.data[s->index] = NULL; - s->index = -1; - loop->poll_changed = 1; - birdloop_ping(loop); - } + socket_changed(s); s->loop = NULL; + s->index = -1; } void -sk_stop(sock *s) +sk_reloop(sock *s, struct birdloop *loop) { - sockets_remove(birdloop_current, s); + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(birdloop_inside(s->loop)); + + if (loop == s->loop) + return; + + birdloop_remove_socket(s->loop, s); + birdloop_add_socket(loop, s); +} + +void +sk_pause_rx(struct birdloop *loop, sock *s) +{ + ASSERT_DIE(birdloop_inside(loop)); + s->rx_hook = NULL; + socket_changed(s); +} + +void +sk_resume_rx(struct birdloop *loop, sock *s, int (*hook)(sock *, uint)) +{ + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(hook); + s->rx_hook = hook; + socket_changed(s); } static inline uint sk_want_events(sock *s) +{ return (s->rx_hook ? POLLIN : 0) | (sk_tx_pending(s) ? POLLOUT : 0); } + +void +sockets_prepare(struct birdloop *loop, struct pfd *pfd) { - uint out = ((s->ttx != s->tpos) ? POLLOUT : 0); - if (s->rx_hook) - if (s->cork) + node *n; + WALK_LIST(n, loop->sock_list) + { + sock *s = SKIP_BACK(sock, n, n); + uint w = sk_want_events(s); + + if (!w) { - LOCK_DOMAIN(cork, s->cork->lock); - if (!enlisted(&s->cork_node)) - if (s->cork->count) - { -// log(L_TRACE "Socket %p corked", s); - add_tail(&s->cork->sockets, &s->cork_node); - } - else - out |= POLLIN; - UNLOCK_DOMAIN(cork, s->cork->lock); + s->index = -1; + continue; } - else - out |= POLLIN; -// log(L_TRACE "sk_want_events(%p) = %x", s, out); - return out; + s->index = pfd->pfd.used; + LOOP_TRACE(loop, "socket %p poll index is %d", s, s->index); + + BUFFER_PUSH(pfd->pfd) = (struct pollfd) { + .fd = s->fd, + .events = sk_want_events(s), + }; + BUFFER_PUSH(pfd->loop) = loop; + } } +int sk_read(sock *s, int revents); +int sk_write(sock *s); +void sk_err(sock *s, int revents); -void -sk_ping(sock *s) +static int +sockets_fire(struct birdloop *loop) { - s->loop->poll_changed = 1; - birdloop_ping(s->loop); + if (EMPTY_LIST(loop->sock_list)) + return 0; + + int repeat = 0; + + times_update(); + + struct pollfd *pfd = loop->thread->pfd->pfd.data; + loop->sock_active = SKIP_BACK(sock, n, HEAD(loop->sock_list)); + + while (loop->sock_active) + { + sock *s = loop->sock_active; + + int rev; + if ((s->index >= 0) && (rev = pfd[s->index].revents) && !(rev & POLLNVAL)) + { + int e = 1; + + if (rev & POLLOUT) + { + /* Write everything. */ + while ((s == loop->sock_active) && (e = sk_write(s))) + ; + + if (s != loop->sock_active) + continue; + + if (!sk_tx_pending(s)) + loop->thread->sock_changed++; + } + + if (rev & POLLIN) + /* Read just one packet and request repeat. */ + if ((s == loop->sock_active) && s->rx_hook) + if (sk_read(s, rev)) + repeat++; + + if (s != loop->sock_active) + continue; + + if (!(rev & (POLLOUT | POLLIN)) && (rev & POLLERR)) + sk_err(s, rev); + + if (s != loop->sock_active) + continue; + } + + loop->sock_active = sk_next(s); + } + + return repeat; } /* -FIXME: this should be called from sock code + * Threads + */ + +DEFINE_DOMAIN(resource); + +struct birdloop_pickup_group { + DOMAIN(resource) domain; + list loops; + list threads; + btime max_latency; +} pickup_groups[2] = { + { + /* all zeroes */ + }, + { + /* FIXME: make this dynamic, now it copies the loop_max_latency value from proto/bfd/config.Y */ + .max_latency = 10 MS, + }, +}; + +static _Thread_local struct bird_thread *this_thread; static void -sockets_update(struct birdloop *loop, sock *s) +birdloop_set_thread(struct birdloop *loop, struct bird_thread *thr, struct birdloop_pickup_group *group) +{ + struct bird_thread *old = loop->thread; + ASSERT_DIE(!thr != !old); + + /* Signal our moving effort */ + u32 ltt = atomic_fetch_or_explicit(&loop->thread_transition, LTT_MOVE, memory_order_acq_rel); + ASSERT_DIE((ltt & LTT_MOVE) == 0); + + while (ltt & LTT_PING) + { + birdloop_yield(); + ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire); + ASSERT_DIE(ltt & LTT_MOVE); + } + /* Now we are free of running pings */ + + if (loop->thread = thr) + { + add_tail(&thr->loops, &loop->n); + thr->loop_count++; + } + else + { + old->loop_count--; + + LOCK_DOMAIN(resource, group->domain); + add_tail(&group->loops, &loop->n); + UNLOCK_DOMAIN(resource, group->domain); + } + + /* Finished */ + atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_MOVE, memory_order_acq_rel); + + /* Request to run by force */ + ev_send_loop(loop->thread->meta, &loop->event); +} + +static struct birdloop * +birdloop_take(struct birdloop_pickup_group *group) { - if (s->index >= 0) - loop->poll_fd.data[s->index].events = sk_want_events(s); + struct birdloop *loop = NULL; + + LOCK_DOMAIN(resource, group->domain); + if (!EMPTY_LIST(group->loops)) + { + /* Take the first loop from the pickup list and unlock */ + loop = SKIP_BACK(struct birdloop, n, HEAD(group->loops)); + rem_node(&loop->n); + UNLOCK_DOMAIN(resource, group->domain); + + birdloop_set_thread(loop, this_thread, group); + + /* This thread goes to the end of the pickup list */ + LOCK_DOMAIN(resource, group->domain); + rem_node(&this_thread->n); + add_tail(&group->threads, &this_thread->n); + + /* If there are more loops to be picked up, wakeup the next thread in order */ + if (!EMPTY_LIST(group->loops)) + wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(group->threads))); + } + UNLOCK_DOMAIN(resource, group->domain); + + return loop; } -*/ static void -sockets_prepare(struct birdloop *loop) +birdloop_drop(struct birdloop *loop, struct birdloop_pickup_group *group) { - BUFFER_SET(loop->poll_sk, loop->sock_num + 1); - BUFFER_SET(loop->poll_fd, loop->sock_num + 1); + /* Remove loop from this thread's list */ + rem_node(&loop->n); - struct pollfd *pfd = loop->poll_fd.data; - sock **psk = loop->poll_sk.data; - uint i = 0; - node *n; + /* Unset loop's thread */ + if (birdloop_inside(loop)) + birdloop_set_thread(loop, NULL, group); + else + { + birdloop_enter(loop); + birdloop_set_thread(loop, NULL, group); + birdloop_leave(loop); + } - WALK_LIST(n, loop->sock_list) + /* Put loop into pickup list */ + LOCK_DOMAIN(resource, group->domain); + add_tail(&group->loops, &loop->n); + UNLOCK_DOMAIN(resource, group->domain); +} + +static int +poll_timeout(struct birdloop *loop) +{ + timer *t = timers_first(&loop->time); + if (!t) + return -1; + + btime remains = tm_remains(t); + return remains TO_MS + ((remains TO_MS) MS < remains); +} + +static void * +bird_thread_main(void *arg) +{ + struct bird_thread *thr = this_thread = arg; + + rcu_thread_start(&thr->rcu); + synchronize_rcu(); + + tmp_init(thr->pool); + init_list(&thr->loops); + + thr->meta = birdloop_new_internal(thr->pool, DOMAIN_ORDER(meta), "Thread Meta", 0, thr->group); + thr->meta->thread = thr; + birdloop_enter(thr->meta); + + thr->sock_changed = 1; + + struct pfd pfd; + BUFFER_INIT(pfd.pfd, thr->pool, 16); + BUFFER_INIT(pfd.loop, thr->pool, 16); + thr->pfd = &pfd; + + while (1) { - sock *s = SKIP_BACK(sock, n, n); + u64 thr_loop_start = ns_now(); + int timeout; + + /* Pickup new loops */ + struct birdloop *loop = birdloop_take(thr->group); + if (loop) + { + birdloop_enter(loop); + if (!EMPTY_LIST(loop->sock_list)) + thr->sock_changed = 1; + birdloop_leave(loop); + } + + /* Schedule all loops with timed out timers */ + timers_fire(&thr->meta->time, 0); + + /* Compute maximal time per loop */ + u64 thr_before_run = ns_now(); + if (thr->loop_count > 0) + thr->max_loop_time_ns = (thr->max_latency_ns / 2 - (thr_before_run - thr_loop_start)) / (u64) thr->loop_count; + + /* Run all scheduled loops */ + int more_events = ev_run_list(&thr->meta->event_list); + if (more_events) + { + THREAD_TRACE("More events to run"); + timeout = 0; + } + else + { + timeout = poll_timeout(thr->meta); + if (timeout == -1) + THREAD_TRACE("No timers, no events"); + else + THREAD_TRACE("Next timer in %d ms", timeout); + } + + /* Run priority events before sleeping */ + ev_run_list(&thr->priority_events); + + /* Do we have to refresh sockets? */ + if (thr->sock_changed) + { + thr->sock_changed = 0; + + BUFFER_FLUSH(pfd.pfd); + BUFFER_FLUSH(pfd.loop); + + pipe_pollin(&thr->wakeup, &pfd); + + node *nn; + WALK_LIST2(loop, nn, thr->loops, n) + { + birdloop_enter(loop); + sockets_prepare(loop, &pfd); + birdloop_leave(loop); + } + + ASSERT_DIE(pfd.loop.used == pfd.pfd.used); + } + /* Nothing to do in at least 5 seconds, flush local hot page cache */ + else if (timeout > 5000) + flush_local_pages(); + +poll_retry:; + int rv = poll(pfd.pfd.data, pfd.pfd.used, timeout); + if (rv < 0) + { + if (errno == EINTR || errno == EAGAIN) + goto poll_retry; + bug("poll in %p: %m", thr); + } - ASSERT(i < loop->sock_num); + /* Drain wakeup fd */ + if (pfd.pfd.data[0].revents & POLLIN) + { + ASSERT_DIE(rv > 0); + rv--; + wakeup_drain(thr); + } - s->index = i; - *psk = s; - pfd->fd = s->fd; - pfd->events = sk_want_events(s); - pfd->revents = 0; + atomic_fetch_and_explicit(&thr->meta->thread_transition, ~LTT_PING, memory_order_acq_rel); - pfd++; - psk++; - i++; + /* Schedule loops with active sockets */ + if (rv) + for (uint i = 1; i < pfd.pfd.used; i++) + if (pfd.pfd.data[i].revents) + { + LOOP_TRACE(pfd.loop.data[i], "socket id %d got revents=%d", i, pfd.pfd.data[i].revents); + ev_send_loop(thr->meta, &pfd.loop.data[i]->event); + } } - ASSERT(i == loop->sock_num); + bug("An infinite loop has ended."); +} + +static void +bird_thread_cleanup(void *_thr) +{ + struct bird_thread *thr = _thr; + ASSERT_DIE(birdloop_inside(&main_birdloop)); + + /* Thread attributes no longer needed */ + pthread_attr_destroy(&thr->thread_attr); + + /* Free all remaining memory */ + rfree(thr->pool); +} + +static struct bird_thread * +bird_thread_start(struct birdloop_pickup_group *group) +{ + ASSERT_DIE(birdloop_inside(&main_birdloop)); + ASSERT_DIE(DOMAIN_IS_LOCKED(resource, group->domain)); + + pool *p = rp_new(&root_pool, "Thread"); + + struct bird_thread *thr = mb_allocz(p, sizeof(*thr)); + thr->pool = p; + thr->cleanup_event = (event) { .hook = bird_thread_cleanup, .data = thr, }; + thr->group = group; + thr->max_latency_ns = (group->max_latency ?: 5 S) TO_NS; + + wakeup_init(thr); + ev_init_list(&thr->priority_events, NULL, "Thread direct event list"); - /* Add internal wakeup fd */ - *psk = NULL; - pfd->fd = loop->wakeup_fds[0]; - pfd->events = POLLIN; - pfd->revents = 0; + add_tail(&group->threads, &thr->n); - loop->poll_changed = 0; + int e = 0; + + if (e = pthread_attr_init(&thr->thread_attr)) + die("pthread_attr_init() failed: %M", e); + + /* We don't have to worry about thread stack size so much. + if (e = pthread_attr_setstacksize(&thr->thread_attr, THREAD_STACK_SIZE)) + die("pthread_attr_setstacksize(%u) failed: %M", THREAD_STACK_SIZE, e); + */ + + if (e = pthread_attr_setdetachstate(&thr->thread_attr, PTHREAD_CREATE_DETACHED)) + die("pthread_attr_setdetachstate(PTHREAD_CREATE_DETACHED) failed: %M", e); + + if (e = pthread_create(&thr->thread_id, &thr->thread_attr, bird_thread_main, thr)) + die("pthread_create() failed: %M", e); + + return thr; } -int sk_read(sock *s, int revents); -int sk_write(sock *s); +static struct birdloop *thread_dropper; +static event *thread_dropper_event; +static uint thread_dropper_goal; static void -sockets_fire(struct birdloop *loop) +bird_thread_shutdown(void * _ UNUSED) { - struct pollfd *pfd = loop->poll_fd.data; - sock **psk = loop->poll_sk.data; - int poll_num = loop->poll_fd.used - 1; + struct birdloop_pickup_group *group = this_thread->group; + LOCK_DOMAIN(resource, group->domain); + int dif = list_length(&group->threads) - thread_dropper_goal; + struct birdloop *tdl_stop = NULL; - times_update(); + if (dif > 0) + ev_send_loop(thread_dropper, thread_dropper_event); + else + { + tdl_stop = thread_dropper; + thread_dropper = NULL; + } + + UNLOCK_DOMAIN(resource, group->domain); - /* Last fd is internal wakeup fd */ - if (pfd[poll_num].revents & POLLIN) + DBG("Thread pickup size differs from dropper goal by %d%s\n", dif, tdl_stop ? ", stopping" : ""); + + if (tdl_stop) { - wakeup_drain(loop); - loop->poll_changed = 1; + birdloop_stop_self(tdl_stop, NULL, NULL); + return; } - int i; - for (i = 0; i < poll_num; pfd++, psk++, i++) + struct bird_thread *thr = this_thread; + + /* Leave the thread-picker list to get no more loops */ + LOCK_DOMAIN(resource, group->domain); + rem_node(&thr->n); + UNLOCK_DOMAIN(resource, group->domain); + + /* Drop loops including the thread dropper itself */ + while (!EMPTY_LIST(thr->loops)) + birdloop_drop(HEAD(thr->loops), group); + + /* Let others know about new loops */ + if (!EMPTY_LIST(group->loops)) + wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(group->threads))); + UNLOCK_DOMAIN(resource, group->domain); + + /* Leave the thread-dropper loop as we aren't going to return. */ + birdloop_leave(thread_dropper); + + /* Stop the meta loop */ + birdloop_leave(thr->meta); + domain_free(thr->meta->time.domain); + rfree(thr->meta->pool); + + /* Local pages not needed anymore */ + flush_local_pages(); + + /* Unregister from RCU */ + rcu_thread_stop(&thr->rcu); + + /* Request thread cleanup from main loop */ + ev_send_loop(&main_birdloop, &thr->cleanup_event); + + /* Exit! */ + pthread_exit(NULL); +} + + +void +bird_thread_commit(struct config *new, struct config *old UNUSED) +{ + ASSERT_DIE(birdloop_inside(&main_birdloop)); + + if (new->shutdown) + return; + + if (!new->thread_count) + new->thread_count = 1; + + while (1) { - if (!*psk) - continue; + struct birdloop_pickup_group *group = &pickup_groups[0]; + LOCK_DOMAIN(resource, group->domain); + + int dif = list_length(&group->threads) - (thread_dropper_goal = new->thread_count); + _Bool thread_dropper_running = !!thread_dropper; - if (! pfd->revents) + if (dif < 0) + { + bird_thread_start(group); + UNLOCK_DOMAIN(resource, group->domain); continue; + } + + UNLOCK_DOMAIN(resource, group->domain); + + if ((dif > 0) && !thread_dropper_running) + { + struct birdloop *tdl = birdloop_new(&root_pool, DOMAIN_ORDER(control), "Thread dropper", group->max_latency); + event *tde = ev_new_init(tdl->pool, bird_thread_shutdown, NULL); + + LOCK_DOMAIN(resource, group->domain); + thread_dropper = tdl; + thread_dropper_event = tde; + UNLOCK_DOMAIN(resource, group->domain); + + ev_send_loop(thread_dropper, thread_dropper_event); + } + + return; + } +} + + +DEFINE_DOMAIN(control); + +struct bird_thread_show_data { + cli *cli; + pool *pool; + DOMAIN(control) lock; + uint total; + uint done; + u8 show_loops; +}; - if (pfd->revents & POLLNVAL) - bug("poll: invalid fd %d", pfd->fd); +static void +bird_thread_show_cli_cont(struct cli *c UNUSED) +{ + /* Explicitly do nothing to prevent CLI from trying to parse another command. */ +} + +static int +bird_thread_show_cli_cleanup(struct cli *c UNUSED) +{ + return 1; /* Defer the cleanup until the writeout is finished. */ +} - int e = 1; +static void +bird_thread_show(void *data) +{ + struct bird_thread_show_data *tsd = data; - if (pfd->revents & POLLIN) - while (e && *psk && (*psk)->rx_hook) - e = sk_read(*psk, pfd->revents); + LOCK_DOMAIN(control, tsd->lock); + if (tsd->show_loops) + cli_printf(tsd->cli, -1026, "Thread %p", this_thread); + + u64 total_time_ns = 0; + struct birdloop *loop; + WALK_LIST(loop, this_thread->loops) + { + if (tsd->show_loops) + cli_printf(tsd->cli, -1026, " Loop %s time: %t", domain_name(loop->time.domain), loop->total_time_spent_ns NS); + total_time_ns += loop->total_time_spent_ns; + } + + tsd->done++; + int last = (tsd->done == tsd->total); + + if (last) + { + tsd->cli->cont = NULL; + tsd->cli->cleanup = NULL; + } - e = 1; - if (pfd->revents & POLLOUT) + if (tsd->show_loops) + cli_printf(tsd->cli, (last ? 1 : -1) * 1026, " Total time: %t", total_time_ns NS); + else + cli_printf(tsd->cli, (last ? 1 : -1) * 1026, "Thread %p time %t", this_thread, total_time_ns NS); + + UNLOCK_DOMAIN(control, tsd->lock); + + if (last) + { + the_bird_lock(); + + for (int i=0; i<2; i++) { - loop->poll_changed = 1; - while (e && *psk) - e = sk_write(*psk); + struct birdloop_pickup_group *group = &pickup_groups[i]; + + LOCK_DOMAIN(resource, group->domain); + if (!EMPTY_LIST(group->loops)) + if (tsd->show_loops) + { + cli_printf(tsd->cli, -1026, "Unassigned loops"); + WALK_LIST(loop, group->loops) + cli_printf(tsd->cli, -1026, " Loop %s time: %t", domain_name(loop->time.domain), loop->total_time_spent_ns NS); + } + else + { + uint count = 0; + u64 total_time_ns = 0; + WALK_LIST(loop, group->loops) + { + count++; + total_time_ns += loop->total_time_spent_ns; + } + cli_printf(tsd->cli, -1026, "Unassigned loops: %d, total time %t", count, total_time_ns NS); + } + UNLOCK_DOMAIN(resource, group->domain); } + + cli_write_trigger(tsd->cli); + DOMAIN_FREE(control, tsd->lock); + rfree(tsd->pool); + + the_bird_unlock(); } } +void +cmd_show_threads(int show_loops) +{ + pool *p = rp_new(&root_pool, "Show Threads"); + + struct bird_thread_show_data *tsd = mb_allocz(p, sizeof(struct bird_thread_show_data)); + tsd->lock = DOMAIN_NEW(control, "Show Threads"); + tsd->cli = this_cli; + tsd->pool = p; + tsd->show_loops = show_loops; + + this_cli->cont = bird_thread_show_cli_cont; + this_cli->cleanup = bird_thread_show_cli_cleanup; + + for (int i=0; i<2; i++) + { + struct birdloop_pickup_group *group = &pickup_groups[i]; + + LOCK_DOMAIN(control, tsd->lock); + LOCK_DOMAIN(resource, group->domain); + + struct bird_thread *thr; + WALK_LIST(thr, group->threads) + { + tsd->total++; + ev_send(&thr->priority_events, ev_new_init(p, bird_thread_show, tsd)); + wakeup_do_kick(thr); + } + + UNLOCK_DOMAIN(resource, group->domain); + UNLOCK_DOMAIN(control, tsd->lock); + } +} + /* * Birdloop */ -struct birdloop main_birdloop; +static struct bird_thread main_thread; +struct birdloop main_birdloop = { .thread = &main_thread, }; static void birdloop_enter_locked(struct birdloop *loop); void birdloop_init(void) { - wakeup_init(&main_birdloop); + ns_init(); + + for (int i=0; i<2; i++) + { + struct birdloop_pickup_group *group = &pickup_groups[i]; + + group->domain = DOMAIN_NEW(resource, "Loop Pickup"); + init_list(&group->loops); + init_list(&group->threads); + } + + wakeup_init(main_birdloop.thread); main_birdloop.time.domain = the_bird_domain.the_bird; main_birdloop.time.loop = &main_birdloop; @@ -366,85 +1062,177 @@ birdloop_init(void) times_update(); timers_init(&main_birdloop.time, &root_pool); - root_pool.loop = &main_birdloop; - main_birdloop.pool = &root_pool; - birdloop_enter_locked(&main_birdloop); } -static void birdloop_main(void *arg); - -void -birdloop_free(resource *r) +static void +birdloop_stop_internal(struct birdloop *loop) { - struct birdloop *loop = (void *) r; + LOOP_TRACE(loop, "Stopping"); - ASSERT_DIE(loop->links == 0); - domain_free(loop->time.domain); -} + /* Block incoming pings */ + u32 ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire); + while (!atomic_compare_exchange_strong_explicit( + &loop->thread_transition, <t, LTT_PING, + memory_order_acq_rel, memory_order_acquire)) + ; -void -birdloop_dump(resource *r) -{ - struct birdloop *loop = (void *) r; + /* Flush remaining events */ + ASSERT_DIE(!ev_run_list(&loop->event_list)); + + /* Drop timers */ + timer *t; + while (t = timers_first(&loop->time)) + tm_stop(t); - debug("%s\n", loop->pool->name); + /* Drop sockets */ + sock *s; + WALK_LIST_FIRST2(s, n, loop->sock_list) + birdloop_remove_socket(loop, s); + + /* Unschedule from Meta */ + ev_postpone(&loop->event); + tm_stop(&loop->timer); + + /* Remove from thread loop list */ + rem_node(&loop->n); + loop->thread = NULL; + + /* Leave the loop context without causing any other fuss */ + ASSERT_DIE(!ev_active(&loop->event)); + loop->ping_pending = 0; + birdloop_leave(loop); + + /* Request local socket reload */ + this_thread->sock_changed++; + + /* Tail-call the stopped hook */ + loop->stopped(loop->stop_data); } -struct resmem birdloop_memsize(resource *r) +static void +birdloop_run(void *_loop) { - struct birdloop *loop = (void *) r; + /* Run priority events before the loop is executed */ + ev_run_list(&this_thread->priority_events); + + u64 start_time = ns_now(); + u64 end_time = start_time + this_thread->max_loop_time_ns; + + struct birdloop *loop = _loop; + birdloop_enter(loop); + + u64 locked_time = ns_now(), task_done_time; + if (locked_time > end_time) + LOOP_WARN(loop, "locked %luns after its scheduled end time", locked_time - end_time); + + uint repeat, loop_runs = 0; + do { + repeat = 0; + LOOP_TRACE(loop, "Regular run"); + loop_runs++; + + if (loop->stopped) + /* Birdloop left inside the helper function */ + return birdloop_stop_internal(loop); + + /* Process sockets */ + repeat += sockets_fire(loop); - return (struct resmem) { - .effective = sizeof(struct birdloop) - sizeof(resource) - ALLOC_OVERHEAD, - .overhead = ALLOC_OVERHEAD + sizeof(resource) + page_size * list_length(&loop->pages.list), - }; + /* Run timers */ + timers_fire(&loop->time, 0); + + /* Run flag handlers */ + repeat += birdloop_process_flags(loop); + + /* Run events */ + repeat += ev_run_list(&loop->event_list); + + /* Check end time */ + } while (((task_done_time = ns_now()) < end_time) && repeat); + + /* Request meta timer */ + timer *t = timers_first(&loop->time); + if (t) + tm_start_in(&loop->timer, tm_remains(t), this_thread->meta); + else + tm_stop(&loop->timer); + + /* Request re-run if needed */ + if (repeat) + ev_send_loop(this_thread->meta, &loop->event); + + /* Collect socket change requests */ + this_thread->sock_changed += loop->sock_changed; + loop->sock_changed = 0; + + birdloop_leave(loop); } -struct resclass birdloop_class = { - .name = "IO Loop", - .size = sizeof(struct birdloop), - .free = birdloop_free, - .dump = birdloop_dump, - .memsize = birdloop_memsize, -}; +static void +birdloop_run_timer(timer *tm) +{ + struct birdloop *loop = tm->data; + LOOP_TRACE(loop, "Timer ready, requesting run"); + ev_send_loop(loop->thread->meta, &loop->event); +} -struct birdloop * -birdloop_new(pool *pp, uint order, const char *name) +static struct birdloop * +birdloop_new_internal(pool *pp, uint order, const char *name, int request_pickup, struct birdloop_pickup_group *group) { struct domain_generic *dg = domain_new(name, order); - struct birdloop *loop = ralloc(pp, &birdloop_class); + pool *p = rp_new(pp, name); + struct birdloop *loop = mb_allocz(p, sizeof(struct birdloop)); + loop->pool = p; loop->time.domain = dg; loop->time.loop = loop; - birdloop_enter(loop); + atomic_store_explicit(&loop->thread_transition, 0, memory_order_relaxed); - loop->pool = rp_new(pp, loop, name); - loop->parent = pp; - rmove(&loop->r, loop->pool); + birdloop_enter(loop); - wakeup_init(loop); ev_init_list(&loop->event_list, loop, name); - timers_init(&loop->time, loop->pool); + timers_init(&loop->time, p); sockets_init(loop); - init_pages(loop); + loop->event = (event) { .hook = birdloop_run, .data = loop, }; + loop->timer = (timer) { .hook = birdloop_run_timer, .data = loop, }; - loop->time.coro = coro_run(loop->pool, birdloop_main, loop); + if (request_pickup) + { + LOCK_DOMAIN(resource, group->domain); + add_tail(&group->loops, &loop->n); + if (EMPTY_LIST(group->threads)) + bird_thread_start(group); + + wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(group->threads))); + UNLOCK_DOMAIN(resource, group->domain); + } + else + loop->n.next = loop->n.prev = &loop->n; birdloop_leave(loop); return loop; } +struct birdloop * +birdloop_new(pool *pp, uint order, const char *name, btime max_latency) +{ + return birdloop_new_internal(pp, order, name, 1, max_latency ? &pickup_groups[1] : &pickup_groups[0]); +} + static void birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data) { + LOOP_TRACE(loop, "Stop requested"); + loop->stopped = stopped; loop->stop_data = data; - wakeup_do_kick(loop); + + birdloop_do_ping(loop); } void @@ -464,6 +1252,15 @@ birdloop_stop_self(struct birdloop *loop, void (*stopped)(void *data), void *dat birdloop_do_stop(loop, stopped, data); } +void +birdloop_free(struct birdloop *loop) +{ + ASSERT_DIE(loop->thread == NULL); + + domain_free(loop->time.domain); + rfree(loop->pool); +} + static void birdloop_enter_locked(struct birdloop *loop) { @@ -490,6 +1287,14 @@ birdloop_leave_locked(struct birdloop *loop) /* Check the current context */ ASSERT_DIE(birdloop_current == loop); + /* Send pending pings */ + if (loop->ping_pending) + { + LOOP_TRACE(loop, "sending pings on leave"); + loop->ping_pending = 0; + birdloop_do_ping(loop); + } + /* Restore the old context */ birdloop_current = loop->prev_loop; } @@ -514,107 +1319,13 @@ birdloop_unmask_wakeups(struct birdloop *loop) ASSERT_DIE(birdloop_wakeup_masked == loop); birdloop_wakeup_masked = NULL; if (birdloop_wakeup_masked_count) - wakeup_do_kick(loop); + wakeup_do_kick(loop->thread); birdloop_wakeup_masked_count = 0; } void -birdloop_link(struct birdloop *loop) -{ - ASSERT_DIE(birdloop_inside(loop)); - loop->links++; -} - -void -birdloop_unlink(struct birdloop *loop) -{ - ASSERT_DIE(birdloop_inside(loop)); - ASSERT_DIE(loop->links); - if (!--loop->links) - birdloop_ping(loop); -} - -static void -birdloop_main(void *arg) +birdloop_yield(void) { - struct birdloop *loop = arg; - timer *t; - int rv, timeout; - - btime loop_begin = current_time(); - - birdloop_enter(loop); - while (1) - { - timers_fire(&loop->time, 0); - if (ev_run_list(&loop->event_list)) - timeout = 0; - else if (t = timers_first(&loop->time)) - timeout = (tm_remains(t) TO_MS) + 1; - else - timeout = -1; - - if (loop->poll_changed) - sockets_prepare(loop); - - btime duration = current_time_update() - loop_begin; - if (duration > config->watchdog_warning) - log(L_WARN "I/O loop cycle took %d ms", (int) (duration TO_MS)); - - birdloop_leave(loop); - - try: - rv = poll(loop->poll_fd.data, loop->poll_fd.used, timeout); - if (rv < 0) - { - if (errno == EINTR || errno == EAGAIN) - goto try; - die("poll: %m"); - } - - birdloop_enter(loop); - - if (loop->stopped && !loop->links) - break; - - loop_begin = current_time_update(); - - if (rv) - sockets_fire(loop); - - atomic_exchange_explicit(&loop->ping_sent, 0, memory_order_acq_rel); - } - - /* Flush remaining events */ - ASSERT_DIE(!ev_run_list(&loop->event_list)); - - /* No timers allowed */ - ASSERT_DIE(timers_count(&loop->time) == 0); - ASSERT_DIE(EMPTY_LIST(loop->sock_list)); - ASSERT_DIE(loop->sock_num == 0); - - birdloop_leave(loop); - - /* Lock parent loop */ - pool *parent = loop->parent; - birdloop_enter(parent->loop); - - /* Move the loop temporarily to parent pool */ - birdloop_enter(loop); - rmove(&loop->r, parent); - birdloop_leave(loop); - - /* Announce loop stop */ - loop->stopped(loop->stop_data); - - /* Free the pool and loop */ - birdloop_enter(loop); - rp_free(loop->pool, parent); - flush_pages(loop); - birdloop_leave(loop); - rfree(&loop->r); - - /* And finally leave the parent loop before finishing */ - birdloop_leave(parent->loop); + usleep(100); } diff --git a/sysdep/unix/io-loop.h b/sysdep/unix/io-loop.h index 1727637a..2b0b7ebf 100644 --- a/sysdep/unix/io-loop.h +++ b/sysdep/unix/io-loop.h @@ -7,52 +7,88 @@ #ifndef _BIRD_SYSDEP_UNIX_IO_LOOP_H_ #define _BIRD_SYSDEP_UNIX_IO_LOOP_H_ -#include "nest/bird.h" +#include "lib/rcu.h" -#include "lib/lists.h" -#include "lib/event.h" -#include "lib/timer.h" +#include <pthread.h> -struct free_pages +struct pipe { - list list; /* List of empty pages */ - event *cleanup; /* Event to call when number of pages is outside bounds */ - u16 min, max; /* Minimal and maximal number of free pages kept */ - uint cnt; /* Number of empty pages */ + int fd[2]; }; +struct pfd { + BUFFER(struct pollfd) pfd; + BUFFER(struct birdloop *) loop; +}; + +void sockets_prepare(struct birdloop *, struct pfd *); +void socket_changed(struct birdsock *); + +void pipe_new(struct pipe *); +void pipe_pollin(struct pipe *, struct pfd *); +void pipe_drain(struct pipe *); +void pipe_kick(struct pipe *); + struct birdloop { - resource r; + node n; + + event event; + timer timer; pool *pool; - pool *parent; struct timeloop time; event_list event_list; list sock_list; - uint sock_num; + struct birdsock *sock_active; + int sock_num; + uint sock_changed; - BUFFER(sock *) poll_sk; - BUFFER(struct pollfd) poll_fd; - u8 poll_changed; + uint ping_pending; - _Atomic u32 ping_sent; - int wakeup_fds[2]; - - uint links; - - struct free_pages pages; + _Atomic u32 thread_transition; +#define LTT_PING 1 +#define LTT_MOVE 2 + _Atomic u32 flags; + struct birdloop_flag_handler *flag_handler; void (*stopped)(void *data); void *stop_data; struct birdloop *prev_loop; + + struct bird_thread *thread; + + u64 total_time_spent_ns; }; -extern _Thread_local struct birdloop *birdloop_current; +struct bird_thread +{ + node n; -void init_pages(struct birdloop *loop); -void flush_pages(struct birdloop *loop); + struct pipe wakeup; + event_list priority_events; + + struct birdloop *meta; + + pthread_t thread_id; + pthread_attr_t thread_attr; + + struct rcu_thread rcu; + + list loops; + struct birdloop_pickup_group *group; + pool *pool; + struct pfd *pfd; + + event cleanup_event; + + int sock_changed; + uint loop_count; + + u64 max_latency_ns; + u64 max_loop_time_ns; +}; #endif diff --git a/sysdep/unix/io.c b/sysdep/unix/io.c index 0e5adc14..88d187a4 100644 --- a/sysdep/unix/io.c +++ b/sysdep/unix/io.c @@ -76,7 +76,7 @@ rf_free(resource *r) } static void -rf_dump(resource *r) +rf_dump(resource *r, unsigned indent UNUSED) { struct rfile *a = (struct rfile *) r; @@ -140,7 +140,7 @@ times_update(void) if ((ts.tv_sec < 0) || (((u64) ts.tv_sec) > ((u64) 1 << 40))) log(L_WARN "Monotonic clock is crazy"); - + btime new_time = ts.tv_sec S + ts.tv_nsec NS; if (new_time < old_time) @@ -722,11 +722,7 @@ sk_log_error(sock *s, const char *p) * Actual struct birdsock code */ -static list sock_list; -static struct birdsock *current_sock; -static struct birdsock *stored_sock; - -static inline sock * +sock * sk_next(sock *s) { if (!s->n.next->next) @@ -774,8 +770,7 @@ sk_ssh_free(sock *s) if (ssh->channel) { - if (ssh_channel_is_open(ssh->channel)) - ssh_channel_close(ssh->channel); + ssh_channel_close(ssh->channel); ssh_channel_free(ssh->channel); ssh->channel = NULL; } @@ -789,12 +784,12 @@ sk_ssh_free(sock *s) } #endif + static void sk_free(resource *r) { sock *s = (sock *) r; - ASSERT_DIE(!s->loop || birdloop_inside(s->loop)); sk_free_bufs(s); #ifdef HAVE_LIBSSH @@ -802,30 +797,10 @@ sk_free(resource *r) sk_ssh_free(s); #endif - if (s->cork) - { - LOCK_DOMAIN(cork, s->cork->lock); - if (enlisted(&s->cork_node)) - rem_node(&s->cork_node); - UNLOCK_DOMAIN(cork, s->cork->lock); - } - - if (!s->loop) - ; - else if (s->flags & SKF_THREAD) - sk_stop(s); - else - { - if (s == current_sock) - current_sock = sk_next(s); - if (s == stored_sock) - stored_sock = sk_next(s); - - if (enlisted(&s->n)) - rem_node(&s->n); - } + if (s->loop) + birdloop_remove_socket(s->loop, s); - if (s->type != SK_SSH && s->type != SK_SSH_ACTIVE && s->fd != -1) + if (s->fd >= 0 && s->type != SK_SSH && s->type != SK_SSH_ACTIVE) close(s->fd); s->fd = -1; @@ -876,7 +851,7 @@ sk_reallocate(sock *s) } static void -sk_dump(resource *r) +sk_dump(resource *r, unsigned indent UNUSED) { sock *s = (sock *) r; static char *sk_type_names[] = { "TCP<", "TCP>", "TCP", "UDP", NULL, "IP", NULL, "MAGIC", "UNIX<", "UNIX", "SSH>", "SSH", "DEL!" }; @@ -1038,12 +1013,6 @@ sk_setup(sock *s) } static void -sk_insert(sock *s) -{ - add_tail(&sock_list, &s->n); -} - -static void sk_tcp_connected(sock *s) { sockaddr sa; @@ -1116,14 +1085,7 @@ sk_passive_connected(sock *s, int type) return 1; } - if (s->flags & SKF_PASSIVE_THREAD) - t->flags |= SKF_THREAD; - else - { - ASSERT_DIE(s->loop == &main_birdloop); - t->loop = &main_birdloop; - sk_insert(t); - } + birdloop_add_socket(s->loop, t); sk_alloc_bufs(t); s->rx_hook(t, 0); @@ -1169,34 +1131,45 @@ sk_ssh_connect(sock *s) { int server_identity_is_ok = 1; +#ifdef HAVE_SSH_OLD_SERVER_VALIDATION_API +#define ssh_session_is_known_server ssh_is_server_known +#define SSH_KNOWN_HOSTS_OK SSH_SERVER_KNOWN_OK +#define SSH_KNOWN_HOSTS_UNKNOWN SSH_SERVER_NOT_KNOWN +#define SSH_KNOWN_HOSTS_CHANGED SSH_SERVER_KNOWN_CHANGED +#define SSH_KNOWN_HOSTS_NOT_FOUND SSH_SERVER_FILE_NOT_FOUND +#define SSH_KNOWN_HOSTS_ERROR SSH_SERVER_ERROR +#define SSH_KNOWN_HOSTS_OTHER SSH_SERVER_FOUND_OTHER +#endif + /* Check server identity */ - switch (ssh_is_server_known(s->ssh->session)) + switch (ssh_session_is_known_server(s->ssh->session)) { #define LOG_WARN_ABOUT_SSH_SERVER_VALIDATION(s,msg,args...) log(L_WARN "SSH Identity %s@%s:%u: " msg, (s)->ssh->username, (s)->host, (s)->dport, ## args); - case SSH_SERVER_KNOWN_OK: + case SSH_KNOWN_HOSTS_OK: /* The server is known and has not changed. */ break; - case SSH_SERVER_NOT_KNOWN: + case SSH_KNOWN_HOSTS_UNKNOWN: LOG_WARN_ABOUT_SSH_SERVER_VALIDATION(s, "The server is unknown, its public key was not found in the known host file %s", s->ssh->server_hostkey_path); + server_identity_is_ok = 0; break; - case SSH_SERVER_KNOWN_CHANGED: + case SSH_KNOWN_HOSTS_CHANGED: LOG_WARN_ABOUT_SSH_SERVER_VALIDATION(s, "The server key has changed. Either you are under attack or the administrator changed the key."); server_identity_is_ok = 0; break; - case SSH_SERVER_FILE_NOT_FOUND: + case SSH_KNOWN_HOSTS_NOT_FOUND: LOG_WARN_ABOUT_SSH_SERVER_VALIDATION(s, "The known host file %s does not exist", s->ssh->server_hostkey_path); server_identity_is_ok = 0; break; - case SSH_SERVER_ERROR: + case SSH_KNOWN_HOSTS_ERROR: LOG_WARN_ABOUT_SSH_SERVER_VALIDATION(s, "Some error happened"); server_identity_is_ok = 0; break; - case SSH_SERVER_FOUND_OTHER: + case SSH_KNOWN_HOSTS_OTHER: LOG_WARN_ABOUT_SSH_SERVER_VALIDATION(s, "The server gave use a key of a type while we had an other type recorded. " \ "It is a possible attack."); server_identity_is_ok = 0; @@ -1327,6 +1300,7 @@ sk_open_ssh(sock *s) /** * sk_open - open a socket + * @loop: loop * @s: socket * * This function takes a socket resource created by sk_new() and @@ -1336,7 +1310,7 @@ sk_open_ssh(sock *s) * Result: 0 for success, -1 for an error. */ int -sk_open(sock *s) +sk_open(sock *s, struct birdloop *loop) { int af = AF_UNSPEC; int fd = -1; @@ -1345,17 +1319,6 @@ sk_open(sock *s) ip_addr bind_addr = IPA_NONE; sockaddr sa; - if (s->flags & SKF_THREAD) - { - ASSERT_DIE(s->loop && (s->loop != &main_birdloop)); - ASSERT_DIE(birdloop_inside(s->loop)); - } - else - { - ASSERT_DIE(!s->loop); - s->loop = &main_birdloop; - } - if (s->type <= SK_IP) { /* @@ -1500,9 +1463,7 @@ sk_open(sock *s) sk_alloc_bufs(s); } - if (!(s->flags & SKF_THREAD)) - sk_insert(s); - + birdloop_add_socket(loop, s); return 0; err: @@ -1512,7 +1473,7 @@ err: } int -sk_open_unix(sock *s, char *name) +sk_open_unix(sock *s, struct birdloop *loop, char *name) { struct sockaddr_un sa; int fd; @@ -1539,41 +1500,10 @@ sk_open_unix(sock *s, char *name) return -1; s->fd = fd; - s->loop = &main_birdloop; - sk_insert(s); + birdloop_add_socket(loop, s); return 0; } -static void -sk_reloop_hook(void *_vs) -{ - sock *s = _vs; - if (birdloop_inside(&main_birdloop)) - { - s->flags &= ~SKF_THREAD; - sk_insert(s); - } - else - { - s->flags |= SKF_THREAD; - sk_start(s); - } -} - -void -sk_reloop(sock *s, struct birdloop *loop) -{ - if (enlisted(&s->n)) - rem_node(&s->n); - - s->reloop = (event) { - .hook = sk_reloop_hook, - .data = s, - }; - - ev_send_loop(loop, &s->reloop); -} - #define CMSG_RX_SPACE MAX(CMSG4_SPACE_PKTINFO+CMSG4_SPACE_TTL, \ CMSG6_SPACE_PKTINFO+CMSG6_SPACE_TTL) @@ -1696,6 +1626,13 @@ sk_recvmsg(sock *s) static inline void reset_tx_buffer(sock *s) { s->ttx = s->tpos = s->tbuf; } +_Bool +sk_tx_pending(sock *s) +{ + return s->ttx != s->tpos; +} + + static int sk_maybe_write(sock *s) { @@ -1706,7 +1643,7 @@ sk_maybe_write(sock *s) case SK_TCP: case SK_MAGIC: case SK_UNIX: - while (s->ttx != s->tpos) + while (sk_tx_pending(s)) { e = write(s->fd, s->ttx, s->tpos - s->ttx); @@ -1719,7 +1656,6 @@ sk_maybe_write(sock *s) s->err_hook(s, (errno != EPIPE) ? errno : 0); return -1; } - sk_ping(s); return 0; } s->ttx += e; @@ -1729,7 +1665,7 @@ sk_maybe_write(sock *s) #ifdef HAVE_LIBSSH case SK_SSH: - while (s->ttx != s->tpos) + while (sk_tx_pending(s)) { e = ssh_channel_write(s->ssh->channel, s->ttx, s->tpos - s->ttx); @@ -1812,7 +1748,12 @@ sk_send(sock *s, unsigned len) { s->ttx = s->tbuf; s->tpos = s->tbuf + len; - return sk_maybe_write(s); + + int e = sk_maybe_write(s); + if (e == 0) /* Trigger thread poll reload to poll this socket's write. */ + socket_changed(s); + + return e; } /** @@ -1859,7 +1800,7 @@ call_rx_hook(sock *s, int size) if (s->rx_hook(s, size)) { /* We need to be careful since the socket could have been deleted by the hook */ - if (current_sock == s) + if (s->loop->sock_active == s) s->rpos = s->rbuf; } } @@ -1913,8 +1854,8 @@ sk_read_ssh(sock *s) /* sk_read() and sk_write() are called from BFD's event loop */ -int -sk_read(sock *s, int revents) +static inline int +sk_read_noflush(sock *s, int revents) { switch (s->type) { @@ -1927,25 +1868,6 @@ sk_read(sock *s, int revents) case SK_TCP: case SK_UNIX: { - if (s->cork) - { - int cont = 0; - LOCK_DOMAIN(cork, s->cork->lock); - if (!enlisted(&s->cork_node)) - if (s->cork->count) - { -// log(L_TRACE "Socket %p corked", s); - add_tail(&s->cork->sockets, &s->cork_node); - sk_ping(s); - } - else - cont = 1; - UNLOCK_DOMAIN(cork, s->cork->lock); - - if (!cont) - return 0; - } - int c = read(s->fd, s->rpos, s->rbuf + s->rbsize - s->rpos); if (c < 0) @@ -1996,7 +1918,15 @@ sk_read(sock *s, int revents) } int -sk_write(sock *s) +sk_read(sock *s, int revents) +{ + int e = sk_read_noflush(s, revents); + tmp_flush(); + return e; +} + +static inline int +sk_write_noflush(sock *s) { switch (s->type) { @@ -2034,7 +1964,7 @@ sk_write(sock *s) #endif default: - if (s->ttx != s->tpos && sk_maybe_write(s) > 0) + if (sk_tx_pending(s) && sk_maybe_write(s) > 0) { if (s->tx_hook) s->tx_hook(s); @@ -2044,6 +1974,14 @@ sk_write(sock *s) } } +int +sk_write(sock *s) +{ + int e = sk_write_noflush(s); + tmp_flush(); + return e; +} + int sk_is_ipv4(sock *s) { return s->af == AF_INET; } @@ -2062,6 +2000,7 @@ sk_err(sock *s, int revents) } s->err_hook(s, se); + tmp_flush(); } void @@ -2071,11 +2010,11 @@ sk_dump_all(void) sock *s; debug("Open sockets:\n"); - WALK_LIST(n, sock_list) + WALK_LIST(n, main_birdloop.sock_list) { s = SKIP_BACK(sock, n, n); debug("%p ", s); - sk_dump(&s->r); + sk_dump(&s->r, 3); } debug("\n"); } @@ -2104,15 +2043,15 @@ static btime loop_time; static void io_update_time(void) { - last_io_time = current_time_update(); + last_io_time = current_time(); if (event_open) { event_open->duration = last_io_time - event_open->timestamp; if (event_open->duration > config->latency_limit) - log(L_WARN "Event 0x%p 0x%p took %d ms", - event_open->hook, event_open->data, (int) (event_open->duration TO_MS)); + log(L_WARN "Event 0x%p 0x%p took %u.%03u ms", + event_open->hook, event_open->data, (uint) (event_open->duration TO_MS), (uint) (event_open->duration % 1000)); event_open = NULL; } @@ -2176,6 +2115,8 @@ watchdog_sigalrm(int sig UNUSED) config->latency_limit = 0xffffffff; io_update_time(); + debug_safe("Watchdog timer timed out\n"); + /* We want core dump */ abort(); } @@ -2216,8 +2157,8 @@ watchdog_stop(void) btime duration = last_io_time - loop_time; if (duration > config->watchdog_warning) - log(L_WARN "I/O loop cycle took %d ms for %d events", - (int) (duration TO_MS), event_log_num); + log(L_WARN "I/O loop cycle took %u.%03u ms for %d events", + (uint) (duration TO_MS), (uint) (duration % 1000), event_log_num); } @@ -2228,7 +2169,7 @@ watchdog_stop(void) void io_init(void) { - init_list(&sock_list); + init_list(&main_birdloop.sock_list); ev_init_list(&global_event_list, &main_birdloop, "Global event list"); ev_init_list(&global_work_list, &main_birdloop, "Global work list"); ev_init_list(&main_birdloop.event_list, &main_birdloop, "Global fast event list"); @@ -2245,20 +2186,17 @@ static int short_loops = 0; #define SHORT_LOOP_MAX 10 #define WORK_EVENTS_MAX 10 -void pipe_drain(int fd); -void check_stored_pages(void); +sock *stored_sock; void io_loop(void) { int poll_tout, timeout; - int nfds, events, pout; - int reload_requested = 0; + int events, pout; timer *t; - sock *s; - node *n; - int fdmax = 256; - struct pollfd *pfd = xmalloc(fdmax * sizeof(struct pollfd)); + struct pfd pfd; + BUFFER_INIT(pfd.pfd, &root_pool, 16); + BUFFER_INIT(pfd.loop, &root_pool, 16); watchdog_start1(); for(;;) @@ -2270,14 +2208,8 @@ io_loop(void) timers_fire(&main_birdloop.time, 1); io_close_event(); -#if DEBUGGING -#define PERIODIC_WAKEUP 86400000 -#else -#define PERIODIC_WAKEUP 3000 -#endif -restart_poll: // FIXME - poll_tout = ((reload_requested || events) ? 0 : PERIODIC_WAKEUP); /* Time in milliseconds */ + poll_tout = (events ? 0 : 3000); /* Time in milliseconds */ if (t = timers_first(&main_birdloop.time)) { times_update(); @@ -2285,40 +2217,11 @@ restart_poll: poll_tout = MIN(poll_tout, timeout); } - /* A hack to reload main io_loop() when something has changed asynchronously. */ - pfd[0].fd = main_birdloop.wakeup_fds[0]; - pfd[0].events = POLLIN; - - nfds = 1; - - WALK_LIST(n, sock_list) - { - pfd[nfds] = (struct pollfd) { .fd = -1 }; /* everything other set to 0 by this */ - s = SKIP_BACK(sock, n, n); - if (s->rx_hook && !ev_corked(s->cork)) - { - pfd[nfds].fd = s->fd; - pfd[nfds].events |= POLLIN; - } - if (s->tx_hook && s->ttx != s->tpos) - { - pfd[nfds].fd = s->fd; - pfd[nfds].events |= POLLOUT; - } - if (pfd[nfds].fd != -1) - { - s->index = nfds; - nfds++; - } - else - s->index = -1; + BUFFER_FLUSH(pfd.pfd); + BUFFER_FLUSH(pfd.loop); - if (nfds >= fdmax) - { - fdmax *= 2; - pfd = xrealloc(pfd, fdmax * sizeof(struct pollfd)); - } - } + pipe_pollin(&main_birdloop.thread->wakeup, &pfd); + sockets_prepare(&main_birdloop, &pfd); /* * Yes, this is racy. But even if the signal comes before this test @@ -2350,7 +2253,7 @@ restart_poll: /* And finally enter poll() to find active sockets */ watchdog_stop(); birdloop_leave(&main_birdloop); - pout = poll(pfd, nfds, poll_tout); + pout = poll(pfd.pfd.data, pfd.pfd.used, poll_tout); birdloop_enter(&main_birdloop); watchdog_start(); @@ -2358,111 +2261,99 @@ restart_poll: { if (errno == EINTR || errno == EAGAIN) continue; - die("poll: %m"); + bug("poll: %m"); } - - if (pout && (pfd[0].revents & POLLIN)) - { - /* IO loop reload requested */ - pipe_drain(main_birdloop.wakeup_fds[0]); - reload_requested = 1; - goto restart_poll; - } - - if (reload_requested) - { - reload_requested = 0; - atomic_exchange_explicit(&main_birdloop.ping_sent, 0, memory_order_acq_rel); - } - if (pout) { + if (pfd.pfd.data[0].revents & POLLIN) + { + /* IO loop reload requested */ + pipe_drain(&main_birdloop.thread->wakeup); + atomic_fetch_and_explicit(&main_birdloop.thread_transition, ~LTT_PING, memory_order_acq_rel); + continue; + } + times_update(); /* guaranteed to be non-empty */ - current_sock = SKIP_BACK(sock, n, HEAD(sock_list)); + main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); - while (current_sock) + while (main_birdloop.sock_active) + { + sock *s = main_birdloop.sock_active; + if (s->index != -1) { - sock *s = current_sock; - if (s->index == -1) - { - current_sock = sk_next(s); - goto next; - } - int e; int steps; steps = MAX_STEPS; - if (s->fast_rx && (pfd[s->index].revents & POLLIN) && s->rx_hook) + if (s->fast_rx && (pfd.pfd.data[s->index].revents & POLLIN) && s->rx_hook) do { steps--; io_log_event(s->rx_hook, s->data); - e = sk_read(s, pfd[s->index].revents); - if (s != current_sock) - goto next; + e = sk_read(s, pfd.pfd.data[s->index].revents); } - while (e && s->rx_hook && steps); + while (e && (main_birdloop.sock_active == s) && s->rx_hook && steps); + + if (s != main_birdloop.sock_active) + continue; steps = MAX_STEPS; - if (pfd[s->index].revents & POLLOUT) + if (pfd.pfd.data[s->index].revents & POLLOUT) do { steps--; io_log_event(s->tx_hook, s->data); e = sk_write(s); - if (s != current_sock) - goto next; } - while (e && steps); + while (e && (main_birdloop.sock_active == s) && steps); - current_sock = sk_next(s); - next: ; + if (s != main_birdloop.sock_active) + continue; } + main_birdloop.sock_active = sk_next(s); + } + short_loops++; if (events && (short_loops < SHORT_LOOP_MAX)) continue; short_loops = 0; int count = 0; - current_sock = stored_sock; - if (current_sock == NULL) - current_sock = SKIP_BACK(sock, n, HEAD(sock_list)); + main_birdloop.sock_active = stored_sock; + if (main_birdloop.sock_active == NULL) + main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list)); - while (current_sock && count < MAX_RX_STEPS) + while (main_birdloop.sock_active && count < MAX_RX_STEPS) { - sock *s = current_sock; + sock *s = main_birdloop.sock_active; if (s->index == -1) - { - current_sock = sk_next(s); - goto next2; - } + goto next2; - if (!s->fast_rx && (pfd[s->index].revents & POLLIN) && s->rx_hook) + if (!s->fast_rx && (pfd.pfd.data[s->index].revents & POLLIN) && s->rx_hook) { count++; io_log_event(s->rx_hook, s->data); - sk_read(s, pfd[s->index].revents); - if (s != current_sock) - goto next2; + sk_read(s, pfd.pfd.data[s->index].revents); + if (s != main_birdloop.sock_active) + continue; } - if (pfd[s->index].revents & (POLLHUP | POLLERR)) + if (pfd.pfd.data[s->index].revents & (POLLHUP | POLLERR)) { - sk_err(s, pfd[s->index].revents); - if (s != current_sock) - goto next2; + sk_err(s, pfd.pfd.data[s->index].revents); + if (s != main_birdloop.sock_active) + continue; } - current_sock = sk_next(s); next2: ; + main_birdloop.sock_active = sk_next(s); } - stored_sock = current_sock; + stored_sock = main_birdloop.sock_active; } } } diff --git a/sysdep/unix/krt.Y b/sysdep/unix/krt.Y index 95b54d65..4ce9a328 100644 --- a/sysdep/unix/krt.Y +++ b/sysdep/unix/krt.Y @@ -29,7 +29,7 @@ kif_set_preferred(ip_addr ip) CF_DECLS -CF_KEYWORDS(KERNEL, PERSIST, SCAN, TIME, LEARN, DEVICE, ROUTES, GRACEFUL, RESTART, KRT_SOURCE, KRT_METRIC, MERGE, PATHS) +CF_KEYWORDS(KERNEL, PERSIST, SCAN, TIME, LEARN, DEVICE, ROUTES, GRACEFUL, RESTART, MERGE, PATHS) CF_KEYWORDS(INTERFACE, PREFERRED) %type <i> kern_mp_limit @@ -122,9 +122,6 @@ kif_iface: kif_iface_start iface_patt_list_nopx kif_iface_opt_list; -dynamic_attr: KRT_SOURCE { $$ = f_new_dynamic_attr(EAF_TYPE_INT, T_INT, EA_KRT_SOURCE); } ; -dynamic_attr: KRT_METRIC { $$ = f_new_dynamic_attr(EAF_TYPE_INT, T_INT, EA_KRT_METRIC); } ; - CF_CODE CF_END diff --git a/sysdep/unix/krt.c b/sysdep/unix/krt.c index 0cb86213..9e6ddb45 100644 --- a/sysdep/unix/krt.c +++ b/sysdep/unix/krt.c @@ -53,7 +53,7 @@ #include "nest/bird.h" #include "nest/iface.h" -#include "nest/route.h" +#include "nest/rt.h" #include "nest/protocol.h" #include "filter/filter.h" #include "conf/conf.h" @@ -74,7 +74,7 @@ static list krt_proto_list; void krt_io_init(void) { - krt_pool = rp_new(&root_pool, &main_birdloop, "Kernel Syncer"); + krt_pool = rp_new(&root_pool, "Kernel Syncer"); krt_filter_lp = lp_new_default(krt_pool); init_list(&krt_proto_list); krt_sys_io_init(); @@ -163,6 +163,15 @@ kif_shutdown(struct proto *P) return PS_DOWN; } +static void +kif_cleanup(struct proto *p) +{ + if (p->debug & D_EVENTS) + log(L_TRACE "%s: Flushing interfaces", p->name); + if_start_update(); + if_end_update(); +} + static int kif_reconfigure(struct proto *p, struct proto_config *new) { @@ -232,17 +241,24 @@ kif_copy_config(struct proto_config *dest, struct proto_config *src) struct protocol proto_unix_iface = { .name = "Device", .template = "device%d", - .class = PROTOCOL_DEVICE, .proto_size = sizeof(struct kif_proto), .config_size = sizeof(struct kif_config), .preconfig = kif_preconfig, .init = kif_init, .start = kif_start, .shutdown = kif_shutdown, + .cleanup = kif_cleanup, .reconfigure = kif_reconfigure, .copy_config = kif_copy_config }; +void +kif_build(void) +{ + proto_build(&proto_unix_iface); +} + + /* * Tracing of routes */ @@ -280,30 +296,40 @@ static struct tbf rl_alien = TBF_DEFAULT_LOG_LIMITS; static inline u32 krt_metric(rte *a) { - eattr *ea = ea_find(a->attrs->eattrs, EA_KRT_METRIC); + eattr *ea = ea_find(a->attrs, &ea_krt_metric); return ea ? ea->u.data : 0; } -static inline int -krt_rte_better(rte *a, rte *b) +static void +krt_learn_alien_attr(struct channel *c, rte *e) { - return (krt_metric(a) > krt_metric(b)); + ea_set_attr_u32(&e->attrs, &ea_gen_preference, 0, c->preference); } /* Called when alien route is discovered during scan */ static void -krt_learn_rte(struct krt_proto *p, rte *e) +krt_learn_scan(struct krt_proto *p, rte *e) { - struct rte_src *src = e->src = rt_get_source(&p->p, krt_metric(e)); - rte_update(p->p.main_channel, e->net, e, e->src); - rt_unlock_source(src); + rte e0 = { + .attrs = e->attrs, + .src = rt_get_source(&p->p, krt_metric(e)), + }; + + krt_learn_alien_attr(p->p.main_channel, &e0); + + rte_update(p->p.main_channel, e->net, &e0, e0.src); + rt_unlock_source(e0.src); } static void -krt_learn_init(struct krt_proto *p) +krt_learn_async(struct krt_proto *p, rte *e, int new) { - if (KRT_CF->learn) - channel_setup_in_table(p->p.main_channel, 1); + if (new) + return krt_learn_scan(p, e); + + struct rte_src *src = rt_get_source(&p->p, krt_metric(e)); + rte_update(p->p.main_channel, e->net, NULL, src); + rt_unlock_source(src); } #endif @@ -323,17 +349,17 @@ rte_feed_count(net *n) { uint count = 0; for (struct rte_storage *e = n->routes; e; e = e->next) - if (rte_is_valid(RTES_OR_NULL(e))) + if (rte_is_valid(RTE_OR_NULL(e))) count++; return count; } static void -rte_feed_obtain(net *n, rte **feed, uint count) +rte_feed_obtain(net *n, const rte **feed, uint count) { uint i = 0; for (struct rte_storage *e = n->routes; e; e = e->next) - if (rte_is_valid(RTES_OR_NULL(e))) + if (rte_is_valid(RTE_OR_NULL(e))) { ASSERT_DIE(i < count); feed[i++] = &e->rte; @@ -344,6 +370,13 @@ rte_feed_obtain(net *n, rte **feed, uint count) static struct rte * krt_export_net(struct krt_proto *p, net *net) { + /* FIXME: Here we are calling filters in table-locked context when exporting + * to kernel. Here BIRD can crash if the user requested ROA check in kernel + * export filter. It doesn't make much sense to write the filters like this, + * therefore we may keep this unfinished piece of work here for later as it + * won't really affect anybody. */ + ASSERT_DIE(RT_IS_LOCKED(p->p.main_channel->table)); + struct channel *c = p->p.main_channel; const struct filter *filter = c->out_filter; @@ -353,7 +386,7 @@ krt_export_net(struct krt_proto *p, net *net) if (!count) return NULL; - rte **feed = alloca(count * sizeof(rte *)); + const rte **feed = alloca(count * sizeof(rte *)); rte_feed_obtain(net, feed, count); return rt_export_merged(c, feed, count, krt_filter_lp, 1); } @@ -372,7 +405,7 @@ krt_export_net(struct krt_proto *p, net *net) if (filter == FILTER_ACCEPT) goto accept; - if (f_run(filter, &rt, krt_filter_lp, FF_SILENT) > F_ACCEPT) + if (f_run(filter, &rt, FF_SILENT) > F_ACCEPT) goto reject; @@ -386,15 +419,12 @@ reject: static int krt_same_dest(rte *k, rte *e) { - rta *ka = k->attrs, *ea = e->attrs; + ea_list *ka = k->attrs, *ea = e->attrs; - if (ka->dest != ea->dest) - return 0; + eattr *nhea_k = ea_find(ka, &ea_gen_nexthop); + eattr *nhea_e = ea_find(ea, &ea_gen_nexthop); - if (ka->dest == RTD_UNICAST) - return nexthop_same(&(ka->nh), &(ea->nh)); - - return 1; + return (!nhea_k == !nhea_e) && adata_same(nhea_k->u.ptr, nhea_e->u.ptr); } /* @@ -412,22 +442,28 @@ krt_got_route(struct krt_proto *p, rte *e, s8 src) switch (src) { case KRT_SRC_KERNEL: - goto ignore; + krt_trace_in(p, e, "ignored"); + return; case KRT_SRC_REDIRECT: - goto delete; + krt_trace_in(p, e, "deleting"); + krt_replace_rte(p, e->net, NULL, e); + return; case KRT_SRC_ALIEN: if (KRT_CF->learn) - krt_learn_rte(p, e); + krt_learn_scan(p, e); else krt_trace_in_rl(&rl_alien, p, e, "[alien] ignored"); return; } #endif + /* The rest is for KRT_SRC_BIRD (or KRT_SRC_UNKNOWN) */ - RT_LOCK(p->p.main_channel->table); + RT_LOCKED(p->p.main_channel->table, tab) + { + /* Deleting all routes if flush is requested */ if (p->flush_routes) goto delete; @@ -436,7 +472,7 @@ krt_got_route(struct krt_proto *p, rte *e, s8 src) if (!p->ready) goto ignore; - net *net = net_find(RT_PRIV(p->p.main_channel->table), e->net); + net *net = net_find(tab, e->net); if (!net || !krt_is_installed(p, net)) goto delete; @@ -481,8 +517,9 @@ delete: krt_replace_rte(p, e->net, NULL, e); goto done; -done: - RT_UNLOCK(p->p.main_channel->table); +done:; + } + lp_flush(krt_filter_lp); } @@ -490,18 +527,13 @@ static void krt_init_scan(struct krt_proto *p) { bmap_reset(&p->seen_map, 1024); - -#ifdef KRT_ALLOW_LEARN - if (KRT_CF->learn) - channel_refresh_begin(p->p.main_channel); -#endif } static void krt_prune(struct krt_proto *p) { - RT_LOCK(p->p.main_channel->table); - rtable_private *t = RT_PRIV(p->p.main_channel->table); + RT_LOCKED(p->p.main_channel->table, t) + { KRT_TRACE(p, D_EVENTS, "Pruning table %s", t->name); FIB_WALK(&t->fib, net, n) @@ -521,15 +553,10 @@ krt_prune(struct krt_proto *p) } FIB_WALK_END; - RT_UNLOCK(p->p.main_channel->table); - -#ifdef KRT_ALLOW_LEARN - if (KRT_CF->learn) - channel_refresh_end(p->p.main_channel); -#endif - if (p->ready) p->initialized = 1; + + } } static void @@ -567,25 +594,24 @@ krt_got_route_async(struct krt_proto *p, rte *e, int new, s8 src) case KRT_SRC_ALIEN: if (KRT_CF->learn) { - krt_learn_rte(p, e); + krt_learn_async(p, e, new); return; } #endif } } + /* * Periodic scanning */ - -#ifdef CONFIG_ALL_TABLES_AT_ONCE - -static timer *krt_scan_timer; -static int krt_scan_count; +static timer *krt_scan_all_timer; +static int krt_scan_all_count; +static _Bool krt_scan_all_tables; static void -krt_scan(timer *t UNUSED) +krt_scan_all(timer *t UNUSED) { struct krt_proto *p; node *n; @@ -606,35 +632,42 @@ krt_scan(timer *t UNUSED) } static void -krt_scan_timer_start(struct krt_proto *p) +krt_scan_all_timer_start(struct krt_proto *p) { - if (!krt_scan_count) - krt_scan_timer = tm_new_init(krt_pool, krt_scan, NULL, KRT_CF->scan_time, 0); + if (!krt_scan_all_count) + krt_scan_all_timer = tm_new_init(krt_pool, krt_scan_all, NULL, KRT_CF->scan_time, 0); - krt_scan_count++; + krt_scan_all_count++; - tm_start(krt_scan_timer, 1 S); + tm_start(krt_scan_all_timer, 1 S); } static void -krt_scan_timer_stop(struct krt_proto *p UNUSED) +krt_scan_all_timer_stop(void) { - krt_scan_count--; + ASSERT(krt_scan_all_count > 0); + + krt_scan_all_count--; - if (!krt_scan_count) + if (!krt_scan_all_count) { - rfree(krt_scan_timer); - krt_scan_timer = NULL; + rfree(krt_scan_all_timer); + krt_scan_all_timer = NULL; } } static void -krt_scan_timer_kick(struct krt_proto *p UNUSED) +krt_scan_all_timer_kick(void) { - tm_start(krt_scan_timer, 0); + tm_start(krt_scan_all_timer, 0); +} + +void +krt_use_shared_scan(void) +{ + krt_scan_all_tables = 1; } -#else static void krt_scan(timer *t) @@ -652,37 +685,44 @@ krt_scan(timer *t) static void krt_scan_timer_start(struct krt_proto *p) { - p->scan_timer = tm_new_init(p->p.pool, krt_scan, p, KRT_CF->scan_time, 0); - tm_start(p->scan_timer, 1 S); + if (krt_scan_all_tables) + krt_scan_all_timer_start(p); + else + { + p->scan_timer = tm_new_init(p->p.pool, krt_scan, p, KRT_CF->scan_time, 0); + tm_start(p->scan_timer, 1 S); + } } static void krt_scan_timer_stop(struct krt_proto *p) { - tm_stop(p->scan_timer); + if (krt_scan_all_tables) + krt_scan_all_timer_stop(); + else + tm_stop(p->scan_timer); } static void krt_scan_timer_kick(struct krt_proto *p) { - tm_start(p->scan_timer, 0); + if (krt_scan_all_tables) + krt_scan_all_timer_kick(); + else + tm_start(p->scan_timer, 0); } -#endif - - - /* * Updates */ static int -krt_preexport(struct channel *c, rte *e) +krt_preexport(struct channel *C, rte *e) { - if (e->src->owner == &c->proto->sources) + if (e->src->owner == &C->proto->sources) #ifdef CONFIG_SINGLE_ROUTE - return 1; /* Passing the route directly for rt_notify() to ignore */ + return 1; #else return -1; #endif @@ -703,11 +743,8 @@ krt_rt_notify(struct proto *P, struct channel *ch UNUSED, const net_addr *net, return; #ifdef CONFIG_SINGLE_ROUTE - /* - * When the imported kernel route becomes the best one, we get it directly and - * we simply know that it is already there. Nothing to do. - */ - if (new->src->owner == &P->sources) + /* Got the same route as we imported. Keep it, do nothing. */ + if (new && new->src->owner == &P->sources) return; #endif @@ -755,6 +792,14 @@ krt_feed_end(struct channel *C) krt_scan_timer_kick(p); } +static int +krt_rte_better(const rte *new, const rte *old) +{ + u32 n = ea_get_int(new->attrs, &ea_krt_metric, IGP_METRIC_UNKNOWN); + u32 o = ea_get_int(old->attrs, &ea_krt_metric, IGP_METRIC_UNKNOWN); + + return (n < o); +} /* * Protocol glue @@ -781,11 +826,6 @@ krt_postconfig(struct proto_config *CF) if (! proto_cf_main_channel(CF)) cf_error("Channel not specified"); -#ifdef CONFIG_ALL_TABLES_AT_ONCE - if (krt_cf->scan_time != cf->scan_time) - cf_error("All kernel syncers must use the same table scan interval"); -#endif - struct channel_config *cc = proto_cf_main_channel(CF); struct rtable_config *tab = cc->table; if (tab->krt_attached) @@ -801,6 +841,10 @@ krt_postconfig(struct proto_config *CF) krt_sys_postconfig(cf); } +struct rte_owner_class krt_rte_owner_class = { + .rte_better = krt_rte_better, +}; + static struct proto * krt_init(struct proto_config *CF) { @@ -811,10 +855,11 @@ krt_init(struct proto_config *CF) p->p.preexport = krt_preexport; p->p.rt_notify = krt_rt_notify; - p->p.if_notify = krt_if_notify; + p->p.iface_sub.if_notify = krt_if_notify; p->p.reload_routes = krt_reload_routes; p->p.feed_end = krt_feed_end; - p->p.rte_better = krt_rte_better; + + p->p.sources.class = &krt_rte_owner_class; krt_sys_init(p); return &p->p; @@ -840,10 +885,6 @@ krt_start(struct proto *P) bmap_init(&p->seen_map, p->p.pool, 1024); add_tail(&krt_proto_list, &p->krt_node); -#ifdef KRT_ALLOW_LEARN - krt_learn_init(p); -#endif - if (!krt_sys_start(p)) { rem_node(&p->krt_node); @@ -923,24 +964,15 @@ krt_copy_config(struct proto_config *dest, struct proto_config *src) krt_sys_copy_config(d, s); } -static int -krt_get_attr(const eattr *a, byte *buf, int buflen) -{ - switch (a->id) - { - case EA_KRT_SOURCE: - bsprintf(buf, "source"); - return GA_NAME; - - case EA_KRT_METRIC: - bsprintf(buf, "metric"); - return GA_NAME; - - default: - return krt_sys_get_attr(a, buf, buflen); - } -} +struct ea_class ea_krt_source = { + .name = "krt_source", + .type = T_INT, +}; +struct ea_class ea_krt_metric = { + .name = "krt_metric", + .type = T_INT, +}; #ifdef CONFIG_IP6_SADR_KERNEL #define MAYBE_IP6_SADR NB_IP6_SADR @@ -957,7 +989,6 @@ krt_get_attr(const eattr *a, byte *buf, int buflen) struct protocol proto_unix_kernel = { .name = "Kernel", .template = "kernel%d", - .class = PROTOCOL_KERNEL, .preference = DEF_PREF_INHERITED, .channel_mask = NB_IP | MAYBE_IP6_SADR | MAYBE_MPLS, .proto_size = sizeof(struct krt_proto), @@ -969,5 +1000,15 @@ struct protocol proto_unix_kernel = { .shutdown = krt_shutdown, .reconfigure = krt_reconfigure, .copy_config = krt_copy_config, - .get_attr = krt_get_attr, }; + +void +krt_build(void) +{ + proto_build(&proto_unix_kernel); + + EA_REGISTER_ALL( + &ea_krt_source, + &ea_krt_metric, + ); +} diff --git a/sysdep/unix/krt.h b/sysdep/unix/krt.h index 968c5b16..9f7ebb4f 100644 --- a/sysdep/unix/krt.h +++ b/sysdep/unix/krt.h @@ -21,8 +21,7 @@ struct kif_proto; #define KRT_DEFAULT_ECMP_LIMIT 16 -#define EA_KRT_SOURCE EA_CODE(PROTOCOL_KERNEL, 0) -#define EA_KRT_METRIC EA_CODE(PROTOCOL_KERNEL, 1) +extern struct ea_class ea_krt_source, ea_krt_metric; #define KRT_REF_SEEN 0x1 /* Seen in table */ #define KRT_REF_BEST 0x2 /* Best in table */ @@ -51,10 +50,7 @@ struct krt_proto { struct proto p; struct krt_state sys; /* Sysdep state */ -#ifndef CONFIG_ALL_TABLES_AT_ONCE timer *scan_timer; -#endif - struct bmap sync_map; /* Keeps track which exported routes were successfully written to kernel */ struct bmap seen_map; /* Routes seen during last periodic scan */ node krt_node; /* Node in krt_proto_list */ @@ -76,6 +72,7 @@ extern pool *krt_pool; struct proto_config * kif_init_config(int class); void kif_request_scan(void); +void krt_use_shared_scan(void); void krt_got_route(struct krt_proto *p, struct rte *e, s8 src); void krt_got_route_async(struct krt_proto *p, struct rte *e, int new, s8 src); diff --git a/sysdep/unix/log.c b/sysdep/unix/log.c index 68a04e78..be7f8adf 100644 --- a/sysdep/unix/log.c +++ b/sysdep/unix/log.c @@ -32,14 +32,15 @@ #include "lib/lists.h" #include "sysdep/unix/unix.h" +static int dbg_fd = -1; static FILE *dbgf; static list *current_log_list; static char *current_syslog_name; /* NULL -> syslog closed */ -static _Atomic uint max_coro_id = ATOMIC_VAR_INIT(1); -static _Thread_local uint this_coro_id; +static _Atomic uint max_thread_id = ATOMIC_VAR_INIT(1); +static _Thread_local uint this_thread_id; -#define THIS_CORO_ID (this_coro_id ?: (this_coro_id = atomic_fetch_add_explicit(&max_coro_id, 1, memory_order_acq_rel))) +#define THIS_THREAD_ID (this_thread_id ?: (this_thread_id = atomic_fetch_add_explicit(&max_thread_id, 1, memory_order_acq_rel))) #include <pthread.h> @@ -183,7 +184,7 @@ log_commit(int class, buffer *buf) l->pos += msg_len; } - fprintf(l->fh, "%s [%04x] <%s> ", tbuf, THIS_CORO_ID, class_names[class]); + fprintf(l->fh, "%s [%04x] <%s> ", tbuf, THIS_THREAD_ID, class_names[class]); } fputs(buf->start, l->fh); fputc('\n', l->fh); @@ -313,6 +314,7 @@ debug(const char *msg, ...) va_start(args, msg); if (dbgf) { +#if 0 struct timespec dbg_time; clock_gettime(CLOCK_MONOTONIC, &dbg_time); uint nsec; @@ -329,10 +331,10 @@ debug(const char *msg, ...) sec = dbg_time.tv_sec - dbg_time_start.tv_sec - 1; } - int n = bsnprintf(pos, max, "%u.%09u: [%04x] ", sec, nsec, THIS_CORO_ID); + int n = bsnprintf(pos, max, "%u.%09u: [%04x] ", sec, nsec, THIS_THREAD_ID); pos += n; max -= n; - +#endif if (bvsnprintf(pos, max, msg, args) < 0) bug("Extremely long debug output, split it."); @@ -341,6 +343,21 @@ debug(const char *msg, ...) va_end(args); } +/** + * debug_safe - async-safe write to debug output + * @msg: a string message + * + * This function prints the message @msg to the debugging output in a + * way that is async safe and can be used in signal handlers. No newline + * character is appended. + */ +void +debug_safe(const char *msg) +{ + if (dbg_fd >= 0) + write(dbg_fd, msg, strlen(msg)); +} + static list * default_log_list(int initial, const char **syslog_name) { @@ -388,21 +405,6 @@ default_log_list(int initial, const char **syslog_name) } void -log_cleanup(int syslog) -{ - struct log_config *l; - - if (current_log_list) - WALK_LIST(l, *current_log_list) - if (l->rf) - log_close(l); - - if (syslog && current_syslog_name) - closelog(); -} - - -void log_switch(int initial, list *logs, const char *new_syslog_name) { struct log_config *l; @@ -414,7 +416,10 @@ log_switch(int initial, list *logs, const char *new_syslog_name) logs = default_log_list(initial, &new_syslog_name); /* Close the logs to avoid pinning them on disk when deleted */ - log_cleanup(0); + if (current_log_list) + WALK_LIST(l, *current_log_list) + if (l->rf) + log_close(l); /* Reopen the logs, needed for 'configure undo' */ if (logs) @@ -453,8 +458,10 @@ log_init_debug(char *f) { clock_gettime(CLOCK_MONOTONIC, &dbg_time_start); + dbg_fd = -1; if (dbgf && dbgf != stderr) fclose(dbgf); + if (!f) dbgf = NULL; else if (!*f) @@ -465,6 +472,10 @@ log_init_debug(char *f) fprintf(stderr, "bird: Unable to open debug file %s: %s\n", f, strerror(errno)); exit(1); } + if (dbgf) + { setvbuf(dbgf, NULL, _IONBF, 0); + dbg_fd = fileno(dbgf); + } } diff --git a/sysdep/unix/main.c b/sysdep/unix/main.c index c326ba2b..0337c755 100644 --- a/sysdep/unix/main.c +++ b/sysdep/unix/main.c @@ -31,7 +31,7 @@ #include "lib/locking.h" #include "lib/timer.h" #include "lib/string.h" -#include "nest/route.h" +#include "nest/rt.h" #include "nest/protocol.h" #include "nest/iface.h" #include "nest/cli.h" @@ -52,12 +52,12 @@ async_dump(void) { debug("INTERNAL STATE DUMP\n\n"); - rp_dump(&root_pool); + rdump(&root_pool, 0); sk_dump_all(); // XXXX tm_dump_all(); if_dump_all(); neigh_dump_all(); - rta_dump_all(); + ea_dump_all(); rt_dump_all(); protos_dump_all(); @@ -117,7 +117,7 @@ add_num_const(char *name, int val, const char *file, const uint line) struct f_val *v = cfg_alloc(sizeof(struct f_val)); *v = (struct f_val) { .type = T_INT, .val.i = val }; struct symbol *sym = cf_get_symbol(name); - if (sym->class && (sym->scope == conf_this_scope)) + if (sym->class && cf_symbol_is_local(sym)) cf_error("Error reading value for %s from %s:%d: already defined", name, file, line); cf_define_symbol(sym, SYM_CONSTANT | T_INT, val, v); @@ -200,10 +200,10 @@ sysdep_preconfig(struct config *c) } int -sysdep_commit(struct config *new, struct config *old UNUSED) +sysdep_commit(struct config *new, struct config *old) { - if (!new->shutdown) - log_switch(0, &new->logfiles, new->syslog_name); + log_switch(0, &new->logfiles, new->syslog_name); + bird_thread_commit(new, old); return 0; } @@ -245,6 +245,8 @@ async_config(void) { struct config *conf; + config_free_old(); + log(L_INFO "Reconfiguration requested by SIGHUP"); if (!unix_read_config(&conf, config_name)) { @@ -283,6 +285,9 @@ cmd_read_config(const char *name) void cmd_check_config(const char *name) { + if (cli_access_restricted()) + return; + struct config *conf = cmd_read_config(name); if (!conf) return; @@ -327,6 +332,8 @@ cmd_reconfig(const char *name, int type, uint timeout) if (cli_access_restricted()) return; + config_free_old(); + struct config *conf = cmd_read_config(name); if (!conf) return; @@ -397,7 +404,8 @@ static char *path_control_socket = PATH_CONTROL_SOCKET; static void cli_write(cli *c) { - sock *s = c->priv; + sock *s = c->sock; + ASSERT_DIE(c->sock); while (c->tx_pos) { @@ -421,7 +429,9 @@ cli_write(cli *c) void cli_write_trigger(cli *c) { - sock *s = c->priv; + sock *s = c->sock; + if (!s) + return; if (s->tbuf == NULL) cli_write(c); @@ -436,8 +446,9 @@ cli_tx(sock *s) int cli_get_command(cli *c) { - sock *s = c->priv; - byte *t = c->rx_aux ? : s->rbuf; + sock *s = c->sock; + ASSERT_DIE(c->sock); + byte *t = s->rbuf; byte *tend = s->rpos; byte *d = c->rx_pos; byte *dend = c->rx_buf + CLI_RX_BUF_SIZE - 2; @@ -448,16 +459,22 @@ cli_get_command(cli *c) t++; else if (*t == '\n') { + *d = 0; t++; + + /* Move remaining data and reset pointers */ + uint rest = (t < tend) ? (tend - t) : 0; + memmove(s->rbuf, t, rest); + s->rpos = s->rbuf + rest; c->rx_pos = c->rx_buf; - c->rx_aux = t; - *d = 0; + return (d < dend) ? 1 : -1; } else if (d < dend) *d++ = *t++; } - c->rx_aux = s->rpos = s->rbuf; + + s->rpos = s->rbuf; c->rx_pos = d; return 0; } @@ -479,6 +496,7 @@ cli_err(sock *s, int err) else log(L_INFO "CLI connection closed"); } + cli_free(s->data); } @@ -504,7 +522,6 @@ cli_connect(sock *s, uint size UNUSED) s->pool = c->pool; /* We need to have all the socket buffers allocated in the cli pool */ s->fast_rx = 1; c->rx_pos = c->rx_buf; - c->rx_aux = NULL; rmove(s, c->pool); return 1; } @@ -525,7 +542,7 @@ cli_init_unix(uid_t use_uid, gid_t use_gid) /* Return value intentionally ignored */ unlink(path_control_socket); - if (sk_open_unix(s, path_control_socket) < 0) + if (sk_open_unix(s, &main_birdloop, path_control_socket) < 0) die("Cannot create control socket %s: %m", path_control_socket); if (use_uid || use_gid) @@ -615,7 +632,6 @@ sysdep_shutdown_done(void) unlink_pid_file(); unlink(path_control_socket); log_msg(L_FATAL "Shutdown completed"); - log_cleanup(1); exit(0); } @@ -686,7 +702,7 @@ signal_init(void) * Parsing of command-line arguments */ -static char *opt_list = "c:dD:ps:P:u:g:flRh"; +static char *opt_list = "bc:dD:ps:P:u:g:flRh"; int parse_and_exit; char *bird_name; static char *use_user; @@ -865,8 +881,6 @@ parse_args(int argc, char **argv) } } -void resource_sys_init(void); - /* * Hic Est main() */ @@ -880,19 +894,17 @@ main(int argc, char **argv) #endif times_update(); - resource_sys_init(); parse_args(argc, argv); log_switch(1, NULL, NULL); the_bird_lock(); random_init(); - net_init(); resource_init(); birdloop_init(); olock_init(); - io_init(); rt_init(); + io_init(); if_init(); // roa_init(); config_init(); @@ -916,14 +928,14 @@ main(int argc, char **argv) open_pid_file(); protos_build(); - proto_build(&proto_unix_kernel); - proto_build(&proto_unix_iface); struct config *conf = read_config(); if (parse_and_exit) exit(0); + flush_local_pages(); + if (!run_in_foreground) { pid_t pid = fork(); diff --git a/sysdep/unix/unix.h b/sysdep/unix/unix.h index 51ec1b1e..606b79cd 100644 --- a/sysdep/unix/unix.h +++ b/sysdep/unix/unix.h @@ -9,6 +9,9 @@ #ifndef _BIRD_UNIX_H_ #define _BIRD_UNIX_H_ +#include "nest/bird.h" +#include "lib/io-loop.h" + #include <sys/socket.h> #include <signal.h> @@ -16,6 +19,7 @@ struct pool; struct iface; struct birdsock; struct rfile; +struct config; /* main.c */ @@ -32,6 +36,8 @@ void cmd_reconfig_undo(void); void cmd_reconfig_status(void); void cmd_shutdown(void); void cmd_graceful_restart(void); +void cmd_show_threads(int); +void bird_thread_commit(struct config *new, struct config *old); #define UNIX_DEFAULT_CONFIGURE_TIMEOUT 300 @@ -107,7 +113,7 @@ extern volatile sig_atomic_t async_shutdown_flag; void io_init(void); void io_loop(void); void io_log_dump(void); -int sk_open_unix(struct birdsock *s, char *name); +int sk_open_unix(struct birdsock *s, struct birdloop *, char *name); struct rfile *rf_open(struct pool *, const char *name, const char *mode); void *rf_file(struct rfile *f); int rf_fileno(struct rfile *f); @@ -122,7 +128,6 @@ void krt_io_init(void); void main_thread_init(void); void log_init_debug(char *); /* Initialize debug dump to given file (NULL=stderr, ""=off) */ void log_switch(int initial, list *l, const char *); -void log_cleanup(int syslog); struct log_config { node n; |