diff options
author | Jo-Philipp Wich <jo@mein.io> | 2022-03-20 23:07:43 +0100 |
---|---|---|
committer | Jo-Philipp Wich <jo@mein.io> | 2022-03-21 10:49:16 +0100 |
commit | 28ee7e13854bd518348ede61de74fdceef61e2fd (patch) | |
tree | 7f4e3c53b9109e2dac02d648fcf4cbf8ade8ad79 /lib | |
parent | 753dea91bcfecb82fb5db646e72c9a022d2d2cbf (diff) |
uloop: add support for tasks
Tasks are similar to processes but instead of executing a new process, an
ucode function is invoked instead, running independently of the main
process.
Example usage:
uloop.init();
let t = uloop.task(
// program function
function(pipe) {
let input = pipe.receive();
pipe.send({ got_input: input });
return { result: true };
},
// parent recv function, invoked when task function calls pipe.send()
function(res) {
printf("Received output message: %.J\n", res);
},
// parent send function, invoked when task function calls pipe.receive()
function() {
let input = { test: "Example" };
printf("Sending input message: %.J\n", input);
return input;
}
);
uloop.run();
Signed-off-by: Jo-Philipp Wich <jo@mein.io>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/uloop.c | 517 |
1 files changed, 516 insertions, 1 deletions
diff --git a/lib/uloop.c b/lib/uloop.c index b74633d..6acb347 100644 --- a/lib/uloop.c +++ b/lib/uloop.c @@ -18,6 +18,7 @@ #include <string.h> #include <unistd.h> #include <limits.h> +#include <fcntl.h> #include <libubox/uloop.h> @@ -25,7 +26,7 @@ #define err_return(err) do { last_error = err; return NULL; } while(0) -static uc_resource_type_t *timer_type, *handle_type, *process_type; +static uc_resource_type_t *timer_type, *handle_type, *process_type, *task_type, *pipe_type; static uc_value_t *object_registry; static int last_error = 0; @@ -560,6 +561,475 @@ uc_uloop_process(uc_vm_t *vm, size_t nargs) } +static bool +readall(int fd, void *buf, size_t len) +{ + ssize_t rlen; + + while (len > 0) { + rlen = read(fd, buf, len); + + if (rlen == -1) { + if (errno == EINTR) + continue; + + return false; + } + + if (rlen == 0) { + errno = EINTR; + + return false; + } + + buf += rlen; + len -= rlen; + } + + return true; +} + +static bool +writeall(int fd, void *buf, size_t len) +{ + ssize_t wlen; + + while (len > 0) { + wlen = write(fd, buf, len); + + if (wlen == -1) { + if (errno == EINTR) + continue; + + return false; + } + + buf += wlen; + len -= wlen; + } + + return true; +} + +typedef struct { + int input; + int output; + bool has_sender; + bool has_receiver; +} uc_uloop_pipe_t; + +static uc_value_t * +uc_uloop_pipe_send_common(uc_vm_t *vm, uc_value_t *msg, int fd) +{ + uc_stringbuf_t *buf; + size_t len; + bool rv; + + buf = xprintbuf_new(); + + printbuf_memset(buf, 0, 0, sizeof(len)); + ucv_to_stringbuf(vm, buf, msg, true); + + len = printbuf_length(buf); + memcpy(buf->buf, &len, sizeof(len)); + + rv = writeall(fd, buf->buf, len); + + printbuf_free(buf); + + if (!rv) + err_return(errno); + + return ucv_boolean_new(true); +} + +static uc_value_t * +uc_uloop_pipe_send(uc_vm_t *vm, size_t nargs) +{ + uc_uloop_pipe_t **pipe = uc_fn_this("uloop.pipe"); + uc_value_t *msg = uc_fn_arg(0); + + if (!pipe || !*pipe) + err_return(EINVAL); + + if (!(*pipe)->has_receiver) + err_return(EPIPE); + + return uc_uloop_pipe_send_common(vm, msg, (*pipe)->output); +} + +static bool +uc_uloop_pipe_receive_common(uc_vm_t *vm, int fd, uc_value_t **res, bool skip) +{ + enum json_tokener_error err = json_tokener_error_parse_eof; + json_tokener *tok = NULL; + json_object *jso = NULL; + char buf[1024]; + ssize_t rlen; + size_t len; + + *res = NULL; + + if (!readall(fd, &len, sizeof(len))) + err_return(errno); + + /* message length 0 is special, means input requested on other pipe */ + if (len == 0) + err_return(ENODATA); + + /* valid messages should be at least sizeof(len) plus one byte of payload */ + if (len <= sizeof(len)) + err_return(EINVAL); + + len -= sizeof(len); + + while (len > 0) { + rlen = read(fd, buf, len < sizeof(buf) ? len : sizeof(buf)); + + if (rlen == -1) { + if (errno == EINTR) + continue; + + goto read_fail; + } + + /* premature EOF */ + if (rlen == 0) { + errno = EPIPE; + goto read_fail; + } + + if (!skip) { + if (!tok) + tok = xjs_new_tokener(); + + jso = json_tokener_parse_ex(tok, buf, rlen); + err = json_tokener_get_error(tok); + } + + len -= rlen; + } + + if (!skip) { + if (err == json_tokener_continue) { + jso = json_tokener_parse_ex(tok, "\0", 1); + err = json_tokener_get_error(tok); + } + + json_tokener_free(tok); + + if (err != json_tokener_success) { + errno = EINVAL; + goto read_fail; + } + + *res = ucv_from_json(vm, jso); + + json_object_put(jso); + } + + return true; + +read_fail: + if (tok) + json_tokener_free(tok); + + json_object_put(jso); + err_return(errno); +} + +static uc_value_t * +uc_uloop_pipe_receive(uc_vm_t *vm, size_t nargs) +{ + uc_uloop_pipe_t **pipe = uc_fn_this("uloop.pipe"); + uc_value_t *rv; + size_t len = 0; + + if (!pipe || !*pipe) + err_return(EINVAL); + + if (!(*pipe)->has_sender) + err_return(EPIPE); + + /* send zero-length message to signal input request */ + writeall((*pipe)->output, &len, sizeof(len)); + + /* receive input message */ + uc_uloop_pipe_receive_common(vm, (*pipe)->input, &rv, false); + + return rv; +} + +static uc_value_t * +uc_uloop_pipe_sending(uc_vm_t *vm, size_t nargs) +{ + uc_uloop_pipe_t **pipe = uc_fn_this("uloop.pipe"); + + if (!pipe || !*pipe) + err_return(EINVAL); + + return ucv_boolean_new((*pipe)->has_sender); +} + +static uc_value_t * +uc_uloop_pipe_receiving(uc_vm_t *vm, size_t nargs) +{ + uc_uloop_pipe_t **pipe = uc_fn_this("uloop.pipe"); + + if (!pipe || !*pipe) + err_return(EINVAL); + + return ucv_boolean_new((*pipe)->has_receiver); +} + + +typedef struct { + struct uloop_process process; + struct uloop_fd output; + size_t registry_index; + bool finished; + int input_fd; + uc_vm_t *vm; + uc_value_t *input_cb; + uc_value_t *output_cb; +} uc_uloop_task_t; + +static int +patch_devnull(int fd, bool write) +{ + int devnull = open("/dev/null", write ? O_WRONLY : O_RDONLY); + + if (devnull != -1) { + dup2(fd, devnull); + close(fd); + } + + return devnull; +} + +static void +uloop_fd_close(struct uloop_fd *fd) { + if (fd->fd == -1) + return; + + close(fd->fd); + fd->fd = -1; +} + +static void +uc_uloop_task_clear(uc_uloop_task_t **task) +{ + /* drop registry entries and clear data to prevent reuse */ + uc_uloop_reg_remove((*task)->registry_index); + *task = NULL; +} + +static uc_value_t * +uc_uloop_task_pid(uc_vm_t *vm, size_t nargs) +{ + uc_uloop_task_t **task = uc_fn_this("uloop.task"); + + if (!task || !*task) + err_return(EINVAL); + + if ((*task)->finished) + err_return(ESRCH); + + return ucv_int64_new((*task)->process.pid); +} + +static uc_value_t * +uc_uloop_task_kill(uc_vm_t *vm, size_t nargs) +{ + uc_uloop_task_t **task = uc_fn_this("uloop.task"); + int rv; + + if (!task || !*task) + err_return(EINVAL); + + if ((*task)->finished) + err_return(ESRCH); + + rv = kill((*task)->process.pid, SIGTERM); + + if (rv == -1) + err_return(errno); + + return ucv_boolean_new(true); +} + +static uc_value_t * +uc_uloop_task_finished(uc_vm_t *vm, size_t nargs) +{ + uc_uloop_task_t **task = uc_fn_this("uloop.task"); + + if (!task || !*task) + err_return(EINVAL); + + return ucv_boolean_new((*task)->finished); +} + +static void +uc_uloop_task_output_cb(struct uloop_fd *fd, unsigned int flags) +{ + uc_uloop_task_t *task = container_of(fd, uc_uloop_task_t, output); + uc_value_t *obj = ucv_array_get(object_registry, task->registry_index); + uc_value_t *msg; + + if (flags & ULOOP_READ) { + while (true) { + if (!uc_uloop_pipe_receive_common(task->vm, fd->fd, &msg, !task->output_cb)) { + /* input requested */ + if (last_error == ENODATA) { + uc_vm_stack_push(task->vm, ucv_get(obj)); + uc_vm_stack_push(task->vm, ucv_get(task->input_cb)); + + if (uc_vm_call(task->vm, true, 0) == EXCEPTION_NONE) + msg = uc_vm_stack_pop(task->vm); + else + msg = NULL; + + uc_uloop_pipe_send_common(task->vm, msg, task->input_fd); + ucv_put(msg); + + continue; + } + + /* error */ + break; + } + + uc_vm_stack_push(task->vm, ucv_get(obj)); + uc_vm_stack_push(task->vm, ucv_get(task->output_cb)); + uc_vm_stack_push(task->vm, msg); + + if (uc_vm_call(task->vm, true, 1) == EXCEPTION_NONE) + ucv_put(uc_vm_stack_pop(task->vm)); + } + } + + if (!fd->registered && task->finished) { + close(task->input_fd); + task->input_fd = -1; + + uloop_fd_close(&task->output); + uloop_process_delete(&task->process); + + uc_uloop_task_clear(&task); + } +} + +static void +uc_uloop_task_process_cb(struct uloop_process *proc, int exitcode) +{ + uc_uloop_task_t *task = container_of(proc, uc_uloop_task_t, process); + + task->finished = true; + + uc_uloop_task_output_cb(&task->output, ULOOP_READ); +} + +static uc_value_t * +uc_uloop_task(uc_vm_t *vm, size_t nargs) +{ + uc_value_t *func = uc_fn_arg(0); + uc_value_t *output_cb = uc_fn_arg(1); + uc_value_t *input_cb = uc_fn_arg(2); + int outpipe[2] = { -1, -1 }; + int inpipe[2] = { -1, -1 }; + uc_value_t *res, *cbs, *p; + uc_uloop_pipe_t *tpipe; + uc_uloop_task_t *task; + pid_t pid; + int err; + + if (!ucv_is_callable(func) || + (output_cb && !ucv_is_callable(output_cb)) || + (input_cb && !ucv_is_callable(input_cb))) + err_return(EINVAL); + + if (pipe(outpipe) == -1 || pipe(inpipe) == -1) { + err = errno; + + close(outpipe[0]); close(outpipe[1]); + close(inpipe[0]); close(inpipe[1]); + + err_return(err); + } + + pid = fork(); + + if (pid == -1) + err_return(errno); + + if (pid == 0) { + patch_devnull(0, false); + patch_devnull(1, true); + patch_devnull(2, true); + + vm->output = fdopen(1, "w"); + + close(inpipe[1]); + close(outpipe[0]); + + tpipe = xalloc(sizeof(*tpipe)); + tpipe->input = inpipe[0]; + tpipe->output = outpipe[1]; + tpipe->has_sender = input_cb; + tpipe->has_receiver = output_cb; + + p = uc_resource_new(pipe_type, tpipe); + + uc_vm_stack_push(vm, func); + uc_vm_stack_push(vm, ucv_get(p)); + + if (uc_vm_call(vm, false, 1) == EXCEPTION_NONE) { + res = uc_vm_stack_pop(vm); + uc_uloop_pipe_send_common(vm, res, tpipe->output); + ucv_put(res); + } + + ucv_put(p); + + _exit(0); + } + + close(inpipe[0]); + close(outpipe[1]); + + task = xalloc(sizeof(*task)); + task->process.pid = pid; + task->process.cb = uc_uloop_task_process_cb; + + task->vm = vm; + + task->output.fd = outpipe[0]; + task->output.cb = uc_uloop_task_output_cb; + task->output_cb = output_cb; + uloop_fd_add(&task->output, ULOOP_READ); + + if (input_cb) { + task->input_fd = inpipe[1]; + task->input_cb = input_cb; + } + else { + task->input_fd = -1; + close(inpipe[1]); + } + + uloop_process_add(&task->process); + + res = uc_resource_new(task_type, task); + + cbs = ucv_array_new(NULL); + ucv_array_set(cbs, 0, ucv_get(output_cb)); + ucv_array_set(cbs, 1, ucv_get(input_cb)); + + task->registry_index = uc_uloop_reg_add(res, cbs); + + return res; +} + static const uc_function_list_t timer_fns[] = { { "set", uc_uloop_timer_set }, @@ -578,6 +1048,19 @@ static const uc_function_list_t process_fns[] = { { "delete", uc_uloop_process_delete }, }; +static const uc_function_list_t task_fns[] = { + { "pid", uc_uloop_task_pid }, + { "kill", uc_uloop_task_kill }, + { "finished", uc_uloop_task_finished }, +}; + +static const uc_function_list_t pipe_fns[] = { + { "send", uc_uloop_pipe_send }, + { "receive", uc_uloop_pipe_receive }, + { "sending", uc_uloop_pipe_sending }, + { "receiving", uc_uloop_pipe_receiving }, +}; + static const uc_function_list_t global_fns[] = { { "error", uc_uloop_error }, { "init", uc_uloop_init }, @@ -585,6 +1068,7 @@ static const uc_function_list_t global_fns[] = { { "timer", uc_uloop_timer }, { "handle", uc_uloop_handle }, { "process", uc_uloop_process }, + { "task", uc_uloop_task }, { "cancelling", uc_uloop_cancelling }, { "running", uc_uloop_running }, { "done", uc_uloop_done }, @@ -626,6 +1110,35 @@ static void close_process(void *ud) free(process); } +static void close_task(void *ud) +{ + uc_uloop_task_t *task = ud; + + if (!task) + return; + + uloop_process_delete(&task->process); + uloop_fd_close(&task->output); + + if (task->input_fd != -1) + close(task->input_fd); + + free(task); +} + +static void close_pipe(void *ud) +{ + uc_uloop_pipe_t *pipe = ud; + + if (!pipe) + return; + + close(pipe->input); + close(pipe->output); + + free(pipe); +} + void uc_module_init(uc_vm_t *vm, uc_value_t *scope) { @@ -641,6 +1154,8 @@ void uc_module_init(uc_vm_t *vm, uc_value_t *scope) timer_type = uc_type_declare(vm, "uloop.timer", timer_fns, close_timer); handle_type = uc_type_declare(vm, "uloop.handle", handle_fns, close_handle); process_type = uc_type_declare(vm, "uloop.process", process_fns, close_process); + task_type = uc_type_declare(vm, "uloop.task", task_fns, close_task); + pipe_type = uc_type_declare(vm, "uloop.pipe", pipe_fns, close_pipe); object_registry = ucv_array_new(vm); |