summaryrefslogtreecommitdiffhomepage
path: root/lib
diff options
context:
space:
mode:
authorJo-Philipp Wich <jo@mein.io>2022-03-20 23:07:43 +0100
committerJo-Philipp Wich <jo@mein.io>2022-03-21 10:49:16 +0100
commit28ee7e13854bd518348ede61de74fdceef61e2fd (patch)
tree7f4e3c53b9109e2dac02d648fcf4cbf8ade8ad79 /lib
parent753dea91bcfecb82fb5db646e72c9a022d2d2cbf (diff)
uloop: add support for tasks
Tasks are similar to processes but instead of executing a new process, an ucode function is invoked instead, running independently of the main process. Example usage: uloop.init(); let t = uloop.task( // program function function(pipe) { let input = pipe.receive(); pipe.send({ got_input: input }); return { result: true }; }, // parent recv function, invoked when task function calls pipe.send() function(res) { printf("Received output message: %.J\n", res); }, // parent send function, invoked when task function calls pipe.receive() function() { let input = { test: "Example" }; printf("Sending input message: %.J\n", input); return input; } ); uloop.run(); Signed-off-by: Jo-Philipp Wich <jo@mein.io>
Diffstat (limited to 'lib')
-rw-r--r--lib/uloop.c517
1 files changed, 516 insertions, 1 deletions
diff --git a/lib/uloop.c b/lib/uloop.c
index b74633d..6acb347 100644
--- a/lib/uloop.c
+++ b/lib/uloop.c
@@ -18,6 +18,7 @@
#include <string.h>
#include <unistd.h>
#include <limits.h>
+#include <fcntl.h>
#include <libubox/uloop.h>
@@ -25,7 +26,7 @@
#define err_return(err) do { last_error = err; return NULL; } while(0)
-static uc_resource_type_t *timer_type, *handle_type, *process_type;
+static uc_resource_type_t *timer_type, *handle_type, *process_type, *task_type, *pipe_type;
static uc_value_t *object_registry;
static int last_error = 0;
@@ -560,6 +561,475 @@ uc_uloop_process(uc_vm_t *vm, size_t nargs)
}
+static bool
+readall(int fd, void *buf, size_t len)
+{
+ ssize_t rlen;
+
+ while (len > 0) {
+ rlen = read(fd, buf, len);
+
+ if (rlen == -1) {
+ if (errno == EINTR)
+ continue;
+
+ return false;
+ }
+
+ if (rlen == 0) {
+ errno = EINTR;
+
+ return false;
+ }
+
+ buf += rlen;
+ len -= rlen;
+ }
+
+ return true;
+}
+
+static bool
+writeall(int fd, void *buf, size_t len)
+{
+ ssize_t wlen;
+
+ while (len > 0) {
+ wlen = write(fd, buf, len);
+
+ if (wlen == -1) {
+ if (errno == EINTR)
+ continue;
+
+ return false;
+ }
+
+ buf += wlen;
+ len -= wlen;
+ }
+
+ return true;
+}
+
+typedef struct {
+ int input;
+ int output;
+ bool has_sender;
+ bool has_receiver;
+} uc_uloop_pipe_t;
+
+static uc_value_t *
+uc_uloop_pipe_send_common(uc_vm_t *vm, uc_value_t *msg, int fd)
+{
+ uc_stringbuf_t *buf;
+ size_t len;
+ bool rv;
+
+ buf = xprintbuf_new();
+
+ printbuf_memset(buf, 0, 0, sizeof(len));
+ ucv_to_stringbuf(vm, buf, msg, true);
+
+ len = printbuf_length(buf);
+ memcpy(buf->buf, &len, sizeof(len));
+
+ rv = writeall(fd, buf->buf, len);
+
+ printbuf_free(buf);
+
+ if (!rv)
+ err_return(errno);
+
+ return ucv_boolean_new(true);
+}
+
+static uc_value_t *
+uc_uloop_pipe_send(uc_vm_t *vm, size_t nargs)
+{
+ uc_uloop_pipe_t **pipe = uc_fn_this("uloop.pipe");
+ uc_value_t *msg = uc_fn_arg(0);
+
+ if (!pipe || !*pipe)
+ err_return(EINVAL);
+
+ if (!(*pipe)->has_receiver)
+ err_return(EPIPE);
+
+ return uc_uloop_pipe_send_common(vm, msg, (*pipe)->output);
+}
+
+static bool
+uc_uloop_pipe_receive_common(uc_vm_t *vm, int fd, uc_value_t **res, bool skip)
+{
+ enum json_tokener_error err = json_tokener_error_parse_eof;
+ json_tokener *tok = NULL;
+ json_object *jso = NULL;
+ char buf[1024];
+ ssize_t rlen;
+ size_t len;
+
+ *res = NULL;
+
+ if (!readall(fd, &len, sizeof(len)))
+ err_return(errno);
+
+ /* message length 0 is special, means input requested on other pipe */
+ if (len == 0)
+ err_return(ENODATA);
+
+ /* valid messages should be at least sizeof(len) plus one byte of payload */
+ if (len <= sizeof(len))
+ err_return(EINVAL);
+
+ len -= sizeof(len);
+
+ while (len > 0) {
+ rlen = read(fd, buf, len < sizeof(buf) ? len : sizeof(buf));
+
+ if (rlen == -1) {
+ if (errno == EINTR)
+ continue;
+
+ goto read_fail;
+ }
+
+ /* premature EOF */
+ if (rlen == 0) {
+ errno = EPIPE;
+ goto read_fail;
+ }
+
+ if (!skip) {
+ if (!tok)
+ tok = xjs_new_tokener();
+
+ jso = json_tokener_parse_ex(tok, buf, rlen);
+ err = json_tokener_get_error(tok);
+ }
+
+ len -= rlen;
+ }
+
+ if (!skip) {
+ if (err == json_tokener_continue) {
+ jso = json_tokener_parse_ex(tok, "\0", 1);
+ err = json_tokener_get_error(tok);
+ }
+
+ json_tokener_free(tok);
+
+ if (err != json_tokener_success) {
+ errno = EINVAL;
+ goto read_fail;
+ }
+
+ *res = ucv_from_json(vm, jso);
+
+ json_object_put(jso);
+ }
+
+ return true;
+
+read_fail:
+ if (tok)
+ json_tokener_free(tok);
+
+ json_object_put(jso);
+ err_return(errno);
+}
+
+static uc_value_t *
+uc_uloop_pipe_receive(uc_vm_t *vm, size_t nargs)
+{
+ uc_uloop_pipe_t **pipe = uc_fn_this("uloop.pipe");
+ uc_value_t *rv;
+ size_t len = 0;
+
+ if (!pipe || !*pipe)
+ err_return(EINVAL);
+
+ if (!(*pipe)->has_sender)
+ err_return(EPIPE);
+
+ /* send zero-length message to signal input request */
+ writeall((*pipe)->output, &len, sizeof(len));
+
+ /* receive input message */
+ uc_uloop_pipe_receive_common(vm, (*pipe)->input, &rv, false);
+
+ return rv;
+}
+
+static uc_value_t *
+uc_uloop_pipe_sending(uc_vm_t *vm, size_t nargs)
+{
+ uc_uloop_pipe_t **pipe = uc_fn_this("uloop.pipe");
+
+ if (!pipe || !*pipe)
+ err_return(EINVAL);
+
+ return ucv_boolean_new((*pipe)->has_sender);
+}
+
+static uc_value_t *
+uc_uloop_pipe_receiving(uc_vm_t *vm, size_t nargs)
+{
+ uc_uloop_pipe_t **pipe = uc_fn_this("uloop.pipe");
+
+ if (!pipe || !*pipe)
+ err_return(EINVAL);
+
+ return ucv_boolean_new((*pipe)->has_receiver);
+}
+
+
+typedef struct {
+ struct uloop_process process;
+ struct uloop_fd output;
+ size_t registry_index;
+ bool finished;
+ int input_fd;
+ uc_vm_t *vm;
+ uc_value_t *input_cb;
+ uc_value_t *output_cb;
+} uc_uloop_task_t;
+
+static int
+patch_devnull(int fd, bool write)
+{
+ int devnull = open("/dev/null", write ? O_WRONLY : O_RDONLY);
+
+ if (devnull != -1) {
+ dup2(fd, devnull);
+ close(fd);
+ }
+
+ return devnull;
+}
+
+static void
+uloop_fd_close(struct uloop_fd *fd) {
+ if (fd->fd == -1)
+ return;
+
+ close(fd->fd);
+ fd->fd = -1;
+}
+
+static void
+uc_uloop_task_clear(uc_uloop_task_t **task)
+{
+ /* drop registry entries and clear data to prevent reuse */
+ uc_uloop_reg_remove((*task)->registry_index);
+ *task = NULL;
+}
+
+static uc_value_t *
+uc_uloop_task_pid(uc_vm_t *vm, size_t nargs)
+{
+ uc_uloop_task_t **task = uc_fn_this("uloop.task");
+
+ if (!task || !*task)
+ err_return(EINVAL);
+
+ if ((*task)->finished)
+ err_return(ESRCH);
+
+ return ucv_int64_new((*task)->process.pid);
+}
+
+static uc_value_t *
+uc_uloop_task_kill(uc_vm_t *vm, size_t nargs)
+{
+ uc_uloop_task_t **task = uc_fn_this("uloop.task");
+ int rv;
+
+ if (!task || !*task)
+ err_return(EINVAL);
+
+ if ((*task)->finished)
+ err_return(ESRCH);
+
+ rv = kill((*task)->process.pid, SIGTERM);
+
+ if (rv == -1)
+ err_return(errno);
+
+ return ucv_boolean_new(true);
+}
+
+static uc_value_t *
+uc_uloop_task_finished(uc_vm_t *vm, size_t nargs)
+{
+ uc_uloop_task_t **task = uc_fn_this("uloop.task");
+
+ if (!task || !*task)
+ err_return(EINVAL);
+
+ return ucv_boolean_new((*task)->finished);
+}
+
+static void
+uc_uloop_task_output_cb(struct uloop_fd *fd, unsigned int flags)
+{
+ uc_uloop_task_t *task = container_of(fd, uc_uloop_task_t, output);
+ uc_value_t *obj = ucv_array_get(object_registry, task->registry_index);
+ uc_value_t *msg;
+
+ if (flags & ULOOP_READ) {
+ while (true) {
+ if (!uc_uloop_pipe_receive_common(task->vm, fd->fd, &msg, !task->output_cb)) {
+ /* input requested */
+ if (last_error == ENODATA) {
+ uc_vm_stack_push(task->vm, ucv_get(obj));
+ uc_vm_stack_push(task->vm, ucv_get(task->input_cb));
+
+ if (uc_vm_call(task->vm, true, 0) == EXCEPTION_NONE)
+ msg = uc_vm_stack_pop(task->vm);
+ else
+ msg = NULL;
+
+ uc_uloop_pipe_send_common(task->vm, msg, task->input_fd);
+ ucv_put(msg);
+
+ continue;
+ }
+
+ /* error */
+ break;
+ }
+
+ uc_vm_stack_push(task->vm, ucv_get(obj));
+ uc_vm_stack_push(task->vm, ucv_get(task->output_cb));
+ uc_vm_stack_push(task->vm, msg);
+
+ if (uc_vm_call(task->vm, true, 1) == EXCEPTION_NONE)
+ ucv_put(uc_vm_stack_pop(task->vm));
+ }
+ }
+
+ if (!fd->registered && task->finished) {
+ close(task->input_fd);
+ task->input_fd = -1;
+
+ uloop_fd_close(&task->output);
+ uloop_process_delete(&task->process);
+
+ uc_uloop_task_clear(&task);
+ }
+}
+
+static void
+uc_uloop_task_process_cb(struct uloop_process *proc, int exitcode)
+{
+ uc_uloop_task_t *task = container_of(proc, uc_uloop_task_t, process);
+
+ task->finished = true;
+
+ uc_uloop_task_output_cb(&task->output, ULOOP_READ);
+}
+
+static uc_value_t *
+uc_uloop_task(uc_vm_t *vm, size_t nargs)
+{
+ uc_value_t *func = uc_fn_arg(0);
+ uc_value_t *output_cb = uc_fn_arg(1);
+ uc_value_t *input_cb = uc_fn_arg(2);
+ int outpipe[2] = { -1, -1 };
+ int inpipe[2] = { -1, -1 };
+ uc_value_t *res, *cbs, *p;
+ uc_uloop_pipe_t *tpipe;
+ uc_uloop_task_t *task;
+ pid_t pid;
+ int err;
+
+ if (!ucv_is_callable(func) ||
+ (output_cb && !ucv_is_callable(output_cb)) ||
+ (input_cb && !ucv_is_callable(input_cb)))
+ err_return(EINVAL);
+
+ if (pipe(outpipe) == -1 || pipe(inpipe) == -1) {
+ err = errno;
+
+ close(outpipe[0]); close(outpipe[1]);
+ close(inpipe[0]); close(inpipe[1]);
+
+ err_return(err);
+ }
+
+ pid = fork();
+
+ if (pid == -1)
+ err_return(errno);
+
+ if (pid == 0) {
+ patch_devnull(0, false);
+ patch_devnull(1, true);
+ patch_devnull(2, true);
+
+ vm->output = fdopen(1, "w");
+
+ close(inpipe[1]);
+ close(outpipe[0]);
+
+ tpipe = xalloc(sizeof(*tpipe));
+ tpipe->input = inpipe[0];
+ tpipe->output = outpipe[1];
+ tpipe->has_sender = input_cb;
+ tpipe->has_receiver = output_cb;
+
+ p = uc_resource_new(pipe_type, tpipe);
+
+ uc_vm_stack_push(vm, func);
+ uc_vm_stack_push(vm, ucv_get(p));
+
+ if (uc_vm_call(vm, false, 1) == EXCEPTION_NONE) {
+ res = uc_vm_stack_pop(vm);
+ uc_uloop_pipe_send_common(vm, res, tpipe->output);
+ ucv_put(res);
+ }
+
+ ucv_put(p);
+
+ _exit(0);
+ }
+
+ close(inpipe[0]);
+ close(outpipe[1]);
+
+ task = xalloc(sizeof(*task));
+ task->process.pid = pid;
+ task->process.cb = uc_uloop_task_process_cb;
+
+ task->vm = vm;
+
+ task->output.fd = outpipe[0];
+ task->output.cb = uc_uloop_task_output_cb;
+ task->output_cb = output_cb;
+ uloop_fd_add(&task->output, ULOOP_READ);
+
+ if (input_cb) {
+ task->input_fd = inpipe[1];
+ task->input_cb = input_cb;
+ }
+ else {
+ task->input_fd = -1;
+ close(inpipe[1]);
+ }
+
+ uloop_process_add(&task->process);
+
+ res = uc_resource_new(task_type, task);
+
+ cbs = ucv_array_new(NULL);
+ ucv_array_set(cbs, 0, ucv_get(output_cb));
+ ucv_array_set(cbs, 1, ucv_get(input_cb));
+
+ task->registry_index = uc_uloop_reg_add(res, cbs);
+
+ return res;
+}
+
static const uc_function_list_t timer_fns[] = {
{ "set", uc_uloop_timer_set },
@@ -578,6 +1048,19 @@ static const uc_function_list_t process_fns[] = {
{ "delete", uc_uloop_process_delete },
};
+static const uc_function_list_t task_fns[] = {
+ { "pid", uc_uloop_task_pid },
+ { "kill", uc_uloop_task_kill },
+ { "finished", uc_uloop_task_finished },
+};
+
+static const uc_function_list_t pipe_fns[] = {
+ { "send", uc_uloop_pipe_send },
+ { "receive", uc_uloop_pipe_receive },
+ { "sending", uc_uloop_pipe_sending },
+ { "receiving", uc_uloop_pipe_receiving },
+};
+
static const uc_function_list_t global_fns[] = {
{ "error", uc_uloop_error },
{ "init", uc_uloop_init },
@@ -585,6 +1068,7 @@ static const uc_function_list_t global_fns[] = {
{ "timer", uc_uloop_timer },
{ "handle", uc_uloop_handle },
{ "process", uc_uloop_process },
+ { "task", uc_uloop_task },
{ "cancelling", uc_uloop_cancelling },
{ "running", uc_uloop_running },
{ "done", uc_uloop_done },
@@ -626,6 +1110,35 @@ static void close_process(void *ud)
free(process);
}
+static void close_task(void *ud)
+{
+ uc_uloop_task_t *task = ud;
+
+ if (!task)
+ return;
+
+ uloop_process_delete(&task->process);
+ uloop_fd_close(&task->output);
+
+ if (task->input_fd != -1)
+ close(task->input_fd);
+
+ free(task);
+}
+
+static void close_pipe(void *ud)
+{
+ uc_uloop_pipe_t *pipe = ud;
+
+ if (!pipe)
+ return;
+
+ close(pipe->input);
+ close(pipe->output);
+
+ free(pipe);
+}
+
void uc_module_init(uc_vm_t *vm, uc_value_t *scope)
{
@@ -641,6 +1154,8 @@ void uc_module_init(uc_vm_t *vm, uc_value_t *scope)
timer_type = uc_type_declare(vm, "uloop.timer", timer_fns, close_timer);
handle_type = uc_type_declare(vm, "uloop.handle", handle_fns, close_handle);
process_type = uc_type_declare(vm, "uloop.process", process_fns, close_process);
+ task_type = uc_type_declare(vm, "uloop.task", task_fns, close_task);
+ pipe_type = uc_type_declare(vm, "uloop.pipe", pipe_fns, close_pipe);
object_registry = ucv_array_new(vm);