From f0e865fe3dddb4c6681c7f5be25438f1d0f4ae68 Mon Sep 17 00:00:00 2001 From: Felix Fietkau Date: Wed, 1 Jan 2025 16:26:50 +0100 Subject: ubus: add named parameter support in functions with many params When a function supports many parameters, the order can be somewhat confusing, especially when dealing with several optional ones. In order to make this easier to use, support for passing an object with named parameters. for example: obj.notify("test", data, null, null, null, 1000); can be written as: obj.notify({ method: "test", data, timeout: 1000 }); Signed-off-by: Felix Fietkau --- lib/ubus.c | 64 +++++++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 40 insertions(+), 24 deletions(-) (limited to 'lib/ubus.c') diff --git a/lib/ubus.c b/lib/ubus.c index 40711d1..78838f1 100644 --- a/lib/ubus.c +++ b/lib/ubus.c @@ -23,6 +23,10 @@ #define ok_return(expr) do { set_error(0, NULL); return (expr); } while(0) #define err_return(err, ...) do { set_error(err, __VA_ARGS__); return NULL; } while(0) +#define REQUIRED 0 +#define OPTIONAL 1 +#define NAMED 2 + static struct { enum ubus_msg_status code; char *msg; @@ -62,14 +66,20 @@ _arg_type(uc_type_t type) } static bool -_args_get(uc_vm_t *vm, size_t nargs, ...) +_args_get(uc_vm_t *vm, bool named, size_t nargs, ...) { - uc_value_t **ptr, *arg; + uc_value_t **ptr, *arg, *obj; uc_type_t type, t; const char *name; size_t index = 0; va_list ap; - bool opt; + int opt; + + if (named) { + obj = uc_fn_arg(0); + if (nargs != 1 || ucv_type(obj) != UC_OBJECT) + named = false; + } va_start(ap, nargs); @@ -79,13 +89,18 @@ _args_get(uc_vm_t *vm, size_t nargs, ...) if (!name) break; - arg = uc_fn_arg(index++); - type = va_arg(ap, uc_type_t); opt = va_arg(ap, int); ptr = va_arg(ap, uc_value_t **); - if (!opt && !arg) + if (named) + arg = ucv_object_get(obj, name, NULL); + else if (opt != NAMED) + arg = uc_fn_arg(index++); + else + arg = NULL; + + if (opt == REQUIRED && !arg) err_return(UBUS_STATUS_INVALID_ARGUMENT, "Argument %s is required", name); t = ucv_type(arg); @@ -104,7 +119,8 @@ _args_get(uc_vm_t *vm, size_t nargs, ...) ok_return(true); } -#define args_get(vm, nargs, ...) do { if (!_args_get(vm, nargs, __VA_ARGS__, NULL)) return NULL; } while(0) +#define args_get_named(vm, nargs, ...) do { if (!_args_get(vm, true, nargs, __VA_ARGS__, NULL)) return NULL; } while(0) +#define args_get(vm, nargs, ...) do { if (!_args_get(vm, false, nargs, __VA_ARGS__, NULL)) return NULL; } while(0) static uc_resource_type_t *subscriber_type; static uc_resource_type_t *listener_type; @@ -688,11 +704,11 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) conn_get(vm, &c); - args_get(vm, nargs, - "object name", UC_STRING, false, &objname, - "function name", UC_STRING, false, &funname, - "function arguments", UC_OBJECT, true, &funargs, - "multiple return", UC_BOOLEAN, true, &mret); + args_get_named(vm, nargs, + "object", UC_STRING, REQUIRED, &objname, + "method", UC_STRING, REQUIRED, &funname, + "data", UC_OBJECT, OPTIONAL, &funargs, + "multiple_return", UC_BOOLEAN, OPTIONAL, &mret); blob_buf_init(&c->buf, 0); @@ -729,11 +745,11 @@ uc_ubus_defer(uc_vm_t *vm, size_t nargs) conn_get(vm, &c); - args_get(vm, nargs, - "object name", UC_STRING, false, &objname, - "function name", UC_STRING, false, &funname, - "function arguments", UC_OBJECT, true, &funargs, - "reply callback", UC_CLOSURE, true, &replycb); + args_get_named(vm, nargs, + "object", UC_STRING, REQUIRED, &objname, + "method", UC_STRING, REQUIRED, &funname, + "data", UC_OBJECT, OPTIONAL, &funargs, + "cb", UC_CLOSURE, OPTIONAL, &replycb); blob_buf_init(&c->buf, 0); @@ -1005,13 +1021,13 @@ uc_ubus_object_notify(uc_vm_t *vm, size_t nargs) if (!uuobj || !*uuobj) err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid object context"); - args_get(vm, nargs, - "typename", UC_STRING, false, &typename, - "message", UC_OBJECT, true, &message, - "data callback", UC_CLOSURE, true, &data_cb, - "status callback", UC_CLOSURE, true, &status_cb, - "completion callback", UC_CLOSURE, true, &complete_cb, - "timeout", UC_INTEGER, true, &timeout); + args_get_named(vm, nargs, + "type", UC_STRING, REQUIRED, &typename, + "data", UC_OBJECT, OPTIONAL, &message, + "data_cb", UC_CLOSURE, OPTIONAL, &data_cb, + "status_cb", UC_CLOSURE, OPTIONAL, &status_cb, + "cb", UC_CLOSURE, OPTIONAL, &complete_cb, + "timeout", UC_INTEGER, OPTIONAL, &timeout); t = timeout ? ucv_int64_get(timeout) : -1; -- cgit v1.2.3 From 9da8b8aa4ad0ff12bc150892965cedf1a507064d Mon Sep 17 00:00:00 2001 From: Felix Fietkau Date: Wed, 1 Jan 2025 15:19:48 +0100 Subject: ubus: add defer.await() method This can be used to synchronously complete a deferred ubus request, waiting for completion or timeout. Signed-off-by: Felix Fietkau --- CMakeLists.txt | 4 ++++ lib/ubus.c | 24 ++++++++++++++++++++++++ 2 files changed, 28 insertions(+) (limited to 'lib/ubus.c') diff --git a/CMakeLists.txt b/CMakeLists.txt index bdf0738..0e6025d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -181,6 +181,10 @@ if(UBUS_SUPPORT) ${CMAKE_BINARY_DIR} "${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeTmp/test.c") check_symbol_exists(uloop_fd_set_cb "libubox/uloop.h" FD_SET_CB_EXISTS) + check_function_exists(uloop_timeout_remaining64 REMAINING64_FUNCTION_EXISTS) + if(REMAINING64_FUNCTION_EXISTS) + target_compile_definitions(ubus_lib PUBLIC HAVE_ULOOP_TIMEOUT_REMAINING64) + endif() if(HAVE_NEW_UBUS_STATUS_CODES) add_definitions(-DHAVE_NEW_UBUS_STATUS_CODES) endif() diff --git a/lib/ubus.c b/lib/ubus.c index 78838f1..b49a134 100644 --- a/lib/ubus.c +++ b/lib/ubus.c @@ -1980,6 +1980,29 @@ uc_ubus_defer_completed(uc_vm_t *vm, size_t nargs) ok_return(ucv_boolean_new((*d)->complete)); } +static uc_value_t * +uc_ubus_defer_await(uc_vm_t *vm, size_t nargs) +{ + uc_ubus_deferred_t *d = uc_fn_thisval("ubus.deferred"); + int64_t remaining; + + if (!d) + err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid deferred context"); + + if (d->complete) + ok_return(ucv_boolean_new(false)); + +#ifdef HAVE_ULOOP_TIMEOUT_REMAINING64 + remaining = uloop_timeout_remaining64(&d->timeout); +#else + remaining = uloop_timeout_remaining(&d->timeout); +#endif + + ubus_complete_request(d->ctx, &d->request, remaining); + + ok_return(ucv_boolean_new(true)); +} + static uc_value_t * uc_ubus_defer_abort(uc_vm_t *vm, size_t nargs) { @@ -2026,6 +2049,7 @@ static const uc_function_list_t conn_fns[] = { }; static const uc_function_list_t defer_fns[] = { + { "await", uc_ubus_defer_await }, { "completed", uc_ubus_defer_completed }, { "abort", uc_ubus_defer_abort }, }; -- cgit v1.2.3 From 17dbf0bef4b6471e55331e86fc5983bd3e2602de Mon Sep 17 00:00:00 2001 From: Felix Fietkau Date: Wed, 1 Jan 2025 15:58:26 +0100 Subject: ubus: add request get_fd/set_fd methods This can be used to send and receive file descriptors from within an object method call. Signed-off-by: Felix Fietkau --- lib/ubus.c | 91 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 80 insertions(+), 11 deletions(-) (limited to 'lib/ubus.c') diff --git a/lib/ubus.c b/lib/ubus.c index b49a134..330ef5d 100644 --- a/lib/ubus.c +++ b/lib/ubus.c @@ -15,6 +15,7 @@ */ #include +#include #include #include @@ -693,6 +694,34 @@ uc_ubus_have_uloop(void) return active; } +static int +get_fd(uc_vm_t *vm, uc_value_t *val) +{ + uc_value_t *fn; + int64_t n; + + fn = ucv_property_get(val, "fileno"); + if (ucv_is_callable(fn)) { + uc_vm_stack_push(vm, ucv_get(val)); + uc_vm_stack_push(vm, ucv_get(fn)); + + if (uc_vm_call(vm, true, 0) != EXCEPTION_NONE) + return -1; + + val = uc_vm_stack_pop(vm); + n = ucv_int64_get(val); + ucv_put(val); + } + else { + n = ucv_int64_get(val); + } + + if (errno || n < 0 || n > (int64_t)INT_MAX) + return -1; + + return (int)n; +} + static uc_value_t * uc_ubus_call(uc_vm_t *vm, size_t nargs) { @@ -814,6 +843,19 @@ uc_ubus_defer(uc_vm_t *vm, size_t nargs) * -------------------------------------------------------------------------- */ +static void +uc_ubus_request_finish_common(uc_ubus_request_t *callctx, int code) +{ + int fd; + + fd = ubus_request_get_caller_fd(&callctx->req); + if (fd >= 0) + close(fd); + + callctx->replied = true; + ubus_complete_deferred_request(callctx->ctx, &callctx->req, code); +} + static void uc_ubus_request_finish(uc_ubus_request_t *callctx, int code, uc_value_t *reply) { @@ -826,9 +868,7 @@ uc_ubus_request_finish(uc_ubus_request_t *callctx, int code, uc_value_t *reply) ubus_send_reply(callctx->ctx, &callctx->req, buf.head); } - callctx->replied = true; - - ubus_complete_deferred_request(callctx->ctx, &callctx->req, code); + uc_ubus_request_finish_common(callctx, code); request_reg_clear(callctx->vm, callctx->registry_index); } @@ -881,6 +921,34 @@ uc_ubus_request_defer(uc_vm_t *vm, size_t nargs) return ucv_boolean_new(true); } +static uc_value_t * +uc_ubus_request_get_fd(uc_vm_t *vm, size_t nargs) +{ + uc_ubus_request_t *callctx = uc_fn_thisval("ubus.request"); + + if (!callctx) + return NULL; + + return ucv_int64_new(ubus_request_get_caller_fd(&callctx->req)); +} + +static uc_value_t * +uc_ubus_request_set_fd(uc_vm_t *vm, size_t nargs) +{ + uc_ubus_request_t *callctx = uc_fn_thisval("ubus.request"); + int fd; + + if (!callctx) + err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid call context"); + + fd = get_fd(vm, uc_fn_arg(0)); + if (fd < 0) + err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid file descriptor"); + + ubus_request_set_fd(callctx->ctx, &callctx->req, fd); + return ucv_boolean_new(true); +} + static uc_value_t * uc_ubus_request_error(uc_vm_t *vm, size_t nargs) { @@ -1251,6 +1319,9 @@ uc_ubus_handle_reply_common(struct ubus_context *ctx, ubus_defer_request(ctx, req, &callctx->req); + /* fd is copied to deferred request. ensure it does not get closed early */ + ubus_request_get_caller_fd(req); + /* create ucode request type object and set properties */ reqobj = uc_resource_new(request_type, callctx); @@ -1285,8 +1356,7 @@ uc_ubus_handle_reply_common(struct ubus_context *ctx, ucv_object_to_blob(res, &buf); ubus_send_reply(ctx, &callctx->req, buf.head); - ubus_complete_deferred_request(ctx, &callctx->req, UBUS_STATUS_OK); - callctx->replied = true; + uc_ubus_request_finish_common(callctx, UBUS_STATUS_OK); } /* If neither a deferred ubus request, nor a plain object were @@ -1302,8 +1372,7 @@ uc_ubus_handle_reply_common(struct ubus_context *ctx, rv = UBUS_STATUS_UNKNOWN_ERROR; } - ubus_complete_deferred_request(ctx, &callctx->req, rv); - callctx->replied = true; + uc_ubus_request_finish_common(callctx, rv); } ucv_put(res); @@ -1317,15 +1386,13 @@ uc_ubus_handle_reply_common(struct ubus_context *ctx, if (rv < UBUS_STATUS_OK || rv >= __UBUS_STATUS_LAST) rv = UBUS_STATUS_UNKNOWN_ERROR; - ubus_complete_deferred_request(ctx, &callctx->req, rv); - callctx->replied = true; + uc_ubus_request_finish_common(callctx, rv); break; /* treat other exceptions as fatal and halt uloop */ default: - ubus_complete_deferred_request(ctx, &callctx->req, UBUS_STATUS_UNKNOWN_ERROR); + uc_ubus_request_finish_common(callctx, UBUS_STATUS_UNKNOWN_ERROR); uloop_end(); - callctx->replied = true; break; } @@ -2064,6 +2131,8 @@ static const uc_function_list_t request_fns[] = { { "reply", uc_ubus_request_reply }, { "error", uc_ubus_request_error }, { "defer", uc_ubus_request_defer }, + { "get_fd", uc_ubus_request_get_fd }, + { "set_fd", uc_ubus_request_set_fd }, }; static const uc_function_list_t notify_fns[] = { -- cgit v1.2.3 From 4acb960c90f77de933afeac0c6a8ba77209c68d2 Mon Sep 17 00:00:00 2001 From: Felix Fietkau Date: Wed, 1 Jan 2025 17:28:23 +0100 Subject: ubus: add support for sending file descriptors via ubus.call/defer File descriptors can be passed via the named fd argument Signed-off-by: Felix Fietkau --- lib/ubus.c | 43 +++++++++++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 14 deletions(-) (limited to 'lib/ubus.c') diff --git a/lib/ubus.c b/lib/ubus.c index 330ef5d..2e37f36 100644 --- a/lib/ubus.c +++ b/lib/ubus.c @@ -109,7 +109,7 @@ _args_get(uc_vm_t *vm, bool named, size_t nargs, ...) if (t == UC_CFUNCTION) t = UC_CLOSURE; - if (arg && t != type) + if (arg && type && t != type) err_return(UBUS_STATUS_INVALID_ARGUMENT, "Argument %s is not %s", name, _arg_type(type)); *ptr = arg; @@ -292,14 +292,14 @@ _uc_reg_clear(uc_vm_t *vm, const char *key, size_t idx, size_t nptrs) } -#define request_reg_add(vm, request, cb, conn) \ - _uc_reg_add(vm, "ubus.requests", 3, request, cb, conn) +#define request_reg_add(vm, request, cb, conn, fd) \ + _uc_reg_add(vm, "ubus.requests", 4, request, cb, conn, fd) #define request_reg_get(vm, idx, request, cb) \ _uc_reg_get(vm, "ubus.requests", idx, 2, request, cb) #define request_reg_clear(vm, idx) \ - _uc_reg_clear(vm, "ubus.requests", idx, 3) + _uc_reg_clear(vm, "ubus.requests", idx, 4) #define object_reg_add(vm, obj, msg, cb) \ @@ -725,10 +725,11 @@ get_fd(uc_vm_t *vm, uc_value_t *val) static uc_value_t * uc_ubus_call(uc_vm_t *vm, size_t nargs) { - uc_value_t *objname, *funname, *funargs, *mret = NULL; + uc_value_t *objname, *funname, *funargs, *fd, *mret = NULL; uc_ubus_call_res_t res = { 0 }; uc_ubus_connection_t *c; enum ubus_msg_status rv; + int fd_val = -1; uint32_t id; conn_get(vm, &c); @@ -737,12 +738,18 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) "object", UC_STRING, REQUIRED, &objname, "method", UC_STRING, REQUIRED, &funname, "data", UC_OBJECT, OPTIONAL, &funargs, - "multiple_return", UC_BOOLEAN, OPTIONAL, &mret); + "multiple_return", UC_BOOLEAN, OPTIONAL, &mret, + "fd", 0, NAMED, &fd); blob_buf_init(&c->buf, 0); if (funargs) ucv_object_to_blob(funargs, &c->buf); + if (fd) { + fd_val = get_fd(vm, fd); + if (fd_val < 0) + err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid file descriptor argument"); + } rv = ubus_lookup_id(&c->ctx, ucv_string_get(objname), &id); @@ -752,8 +759,8 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) res.mret = ucv_is_truish(mret); - rv = ubus_invoke(&c->ctx, id, ucv_string_get(funname), c->buf.head, - uc_ubus_call_cb, &res, c->timeout * 1000); + rv = ubus_invoke_fd(&c->ctx, id, ucv_string_get(funname), c->buf.head, + uc_ubus_call_cb, &res, c->timeout * 1000, fd_val); if (rv != UBUS_STATUS_OK) err_return(rv, "Failed to invoke function '%s' on object '%s'", @@ -765,12 +772,13 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) static uc_value_t * uc_ubus_defer(uc_vm_t *vm, size_t nargs) { - uc_value_t *objname, *funname, *funargs, *replycb, *conn, *res = NULL; + uc_value_t *objname, *funname, *funargs, *replycb, *fd, *conn, *res = NULL; uc_ubus_deferred_t *defer; uc_ubus_connection_t *c; enum ubus_msg_status rv; uc_callframe_t *frame; uint32_t id; + int fd_val = -1; conn_get(vm, &c); @@ -778,13 +786,20 @@ uc_ubus_defer(uc_vm_t *vm, size_t nargs) "object", UC_STRING, REQUIRED, &objname, "method", UC_STRING, REQUIRED, &funname, "data", UC_OBJECT, OPTIONAL, &funargs, - "cb", UC_CLOSURE, OPTIONAL, &replycb); + "cb", UC_CLOSURE, OPTIONAL, &replycb, + "fd", 0, NAMED, &fd); blob_buf_init(&c->buf, 0); if (funargs) ucv_object_to_blob(funargs, &c->buf); + if (fd) { + fd_val = get_fd(vm, fd); + if (fd_val < 0) + err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid file descriptor argument"); + } + rv = ubus_lookup_id(&c->ctx, ucv_string_get(objname), &id); if (rv != UBUS_STATUS_OK) @@ -793,8 +808,8 @@ uc_ubus_defer(uc_vm_t *vm, size_t nargs) defer = xalloc(sizeof(*defer)); - rv = ubus_invoke_async(&c->ctx, id, ucv_string_get(funname), - c->buf.head, &defer->request); + rv = ubus_invoke_async_fd(&c->ctx, id, ucv_string_get(funname), + c->buf.head, &defer->request, fd_val); if (rv == UBUS_STATUS_OK) { defer->vm = vm; @@ -811,7 +826,7 @@ uc_ubus_defer(uc_vm_t *vm, size_t nargs) frame = uc_vector_last(&vm->callframes); conn = frame ? frame->ctx : NULL; - defer->registry_index = request_reg_add(vm, ucv_get(res), ucv_get(replycb), ucv_get(conn)); + defer->registry_index = request_reg_add(vm, ucv_get(res), ucv_get(replycb), ucv_get(conn), ucv_get(fd)); if (!uc_ubus_have_uloop()) { have_own_uloop = true; @@ -1346,7 +1361,7 @@ uc_ubus_handle_reply_common(struct ubus_context *ctx, /* Add wrapped request context into registry to prevent GC'ing * until reply or timeout occurred */ - callctx->registry_index = request_reg_add(vm, ucv_get(reqobj), NULL, NULL); + callctx->registry_index = request_reg_add(vm, ucv_get(reqobj), NULL, NULL, NULL); } /* Otherwise, when the function returned an object, treat it as -- cgit v1.2.3 From 22b9523565a5b59d98e8ba2fa578fa5d994ecfdb Mon Sep 17 00:00:00 2001 From: Felix Fietkau Date: Wed, 1 Jan 2025 15:34:30 +0100 Subject: ubus: add support for receiving file descriptors in call and defer Add a named parameter fd_cb, which is called when the callee returned a file descriptor. Signed-off-by: Felix Fietkau --- lib/ubus.c | 70 +++++++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 56 insertions(+), 14 deletions(-) (limited to 'lib/ubus.c') diff --git a/lib/ubus.c b/lib/ubus.c index 2e37f36..20e25d2 100644 --- a/lib/ubus.c +++ b/lib/ubus.c @@ -292,14 +292,14 @@ _uc_reg_clear(uc_vm_t *vm, const char *key, size_t idx, size_t nptrs) } -#define request_reg_add(vm, request, cb, conn, fd) \ - _uc_reg_add(vm, "ubus.requests", 4, request, cb, conn, fd) +#define request_reg_add(vm, request, cb, fdcb, conn, fd) \ + _uc_reg_add(vm, "ubus.requests", 5, request, cb, fdcb, conn, fd) -#define request_reg_get(vm, idx, request, cb) \ - _uc_reg_get(vm, "ubus.requests", idx, 2, request, cb) +#define request_reg_get(vm, idx, request, cb, fdcb) \ + _uc_reg_get(vm, "ubus.requests", idx, 3, request, cb, fdcb) #define request_reg_clear(vm, idx) \ - _uc_reg_clear(vm, "ubus.requests", idx, 4) + _uc_reg_clear(vm, "ubus.requests", idx, 5) #define object_reg_add(vm, obj, msg, cb) \ @@ -619,7 +619,7 @@ uc_ubus_call_user_cb(uc_ubus_deferred_t *defer, int ret, uc_value_t *reply) { uc_value_t *this, *func; - request_reg_get(defer->vm, defer->registry_index, &this, &func); + request_reg_get(defer->vm, defer->registry_index, &this, &func, NULL); if (ucv_is_callable(func)) { uc_vm_stack_push(defer->vm, ucv_get(this)); @@ -648,6 +648,28 @@ uc_ubus_call_data_cb(struct ubus_request *req, int type, struct blob_attr *msg) defer->response = blob_array_to_ucv(defer->vm, blob_data(msg), blob_len(msg), true); } +static void +uc_ubus_call_fd_cb(struct ubus_request *req, int fd) +{ + uc_ubus_deferred_t *defer = container_of(req, uc_ubus_deferred_t, request); + uc_value_t *this, *func; + + if (defer->complete) + return; + + request_reg_get(defer->vm, defer->registry_index, &this, NULL, &func); + if (ucv_is_callable(func)) { + uc_vm_stack_push(defer->vm, ucv_get(this)); + uc_vm_stack_push(defer->vm, ucv_get(func)); + uc_vm_stack_push(defer->vm, ucv_int64_new(fd)); + + if (uc_vm_call(defer->vm, true, 1) == EXCEPTION_NONE) + ucv_put(uc_vm_stack_pop(defer->vm)); + else + uloop_end(); + } +} + static void uc_ubus_call_done_cb(struct ubus_request *req, int ret) { @@ -725,8 +747,9 @@ get_fd(uc_vm_t *vm, uc_value_t *val) static uc_value_t * uc_ubus_call(uc_vm_t *vm, size_t nargs) { - uc_value_t *objname, *funname, *funargs, *fd, *mret = NULL; + uc_value_t *objname, *funname, *funargs, *fd, *fdcb, *mret = NULL; uc_ubus_call_res_t res = { 0 }; + uc_ubus_deferred_t defer = {}; uc_ubus_connection_t *c; enum ubus_msg_status rv; int fd_val = -1; @@ -739,7 +762,8 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) "method", UC_STRING, REQUIRED, &funname, "data", UC_OBJECT, OPTIONAL, &funargs, "multiple_return", UC_BOOLEAN, OPTIONAL, &mret, - "fd", 0, NAMED, &fd); + "fd", 0, NAMED, &fd, + "fd_cb", UC_CLOSURE, NAMED, &fdcb); blob_buf_init(&c->buf, 0); @@ -759,8 +783,22 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) res.mret = ucv_is_truish(mret); - rv = ubus_invoke_fd(&c->ctx, id, ucv_string_get(funname), c->buf.head, - uc_ubus_call_cb, &res, c->timeout * 1000, fd_val); + rv = ubus_invoke_async_fd(&c->ctx, id, ucv_string_get(funname), + c->buf.head, &defer.request, fd_val); + defer.vm = vm; + defer.ctx = &c->ctx; + defer.request.data_cb = uc_ubus_call_cb; + defer.request.priv = &res; + if (ucv_is_callable(fdcb)) { + defer.request.fd_cb = uc_ubus_call_fd_cb; + defer.registry_index = request_reg_add(vm, NULL, NULL, ucv_get(fdcb), NULL, NULL); + } + + if (rv == UBUS_STATUS_OK) + rv = ubus_complete_request(&c->ctx, &defer.request, c->timeout * 1000); + + if (defer.request.fd_cb) + request_reg_clear(vm, defer.registry_index); if (rv != UBUS_STATUS_OK) err_return(rv, "Failed to invoke function '%s' on object '%s'", @@ -772,7 +810,7 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) static uc_value_t * uc_ubus_defer(uc_vm_t *vm, size_t nargs) { - uc_value_t *objname, *funname, *funargs, *replycb, *fd, *conn, *res = NULL; + uc_value_t *objname, *funname, *funargs, *replycb, *fd, *fdcb, *conn, *res = NULL; uc_ubus_deferred_t *defer; uc_ubus_connection_t *c; enum ubus_msg_status rv; @@ -787,7 +825,8 @@ uc_ubus_defer(uc_vm_t *vm, size_t nargs) "method", UC_STRING, REQUIRED, &funname, "data", UC_OBJECT, OPTIONAL, &funargs, "cb", UC_CLOSURE, OPTIONAL, &replycb, - "fd", 0, NAMED, &fd); + "fd", 0, NAMED, &fd, + "fd_cb", UC_CLOSURE, NAMED, &fdcb); blob_buf_init(&c->buf, 0); @@ -816,7 +855,10 @@ uc_ubus_defer(uc_vm_t *vm, size_t nargs) defer->ctx = &c->ctx; defer->request.data_cb = uc_ubus_call_data_cb; + if (ucv_is_callable(fdcb)) + defer->request.fd_cb = uc_ubus_call_fd_cb; defer->request.complete_cb = uc_ubus_call_done_cb; + ubus_complete_request_async(&c->ctx, &defer->request); defer->timeout.cb = uc_ubus_call_timeout_cb; @@ -826,7 +868,7 @@ uc_ubus_defer(uc_vm_t *vm, size_t nargs) frame = uc_vector_last(&vm->callframes); conn = frame ? frame->ctx : NULL; - defer->registry_index = request_reg_add(vm, ucv_get(res), ucv_get(replycb), ucv_get(conn), ucv_get(fd)); + defer->registry_index = request_reg_add(vm, ucv_get(res), ucv_get(replycb), ucv_get(fdcb), ucv_get(conn), ucv_get(fd)); if (!uc_ubus_have_uloop()) { have_own_uloop = true; @@ -1361,7 +1403,7 @@ uc_ubus_handle_reply_common(struct ubus_context *ctx, /* Add wrapped request context into registry to prevent GC'ing * until reply or timeout occurred */ - callctx->registry_index = request_reg_add(vm, ucv_get(reqobj), NULL, NULL, NULL); + callctx->registry_index = request_reg_add(vm, ucv_get(reqobj), NULL, NULL, NULL, NULL); } /* Otherwise, when the function returned an object, treat it as -- cgit v1.2.3 From c0d1654a450b62f5767ebd28352141f33c7ef38e Mon Sep 17 00:00:00 2001 From: Felix Fietkau Date: Thu, 2 Jan 2025 13:41:30 +0100 Subject: ubus: add support for channels A channel is a context that is directly connected to a peer instead of going through ubusd. The use of this context is limited to calling ubus_invoke and receiving requests not bound to any registered object. The main use case for this is having a more stateful interaction between processes. A service using channels can attach metadata to each individual channel and keep track of its lifetime, which is not possible through the regular subscribe/notify mechanism. Using channels also improves request latency, since messages are passed directly between processes. A channel can either be opened by fd using ubus.open_channel(), or created from within a request by using req.new_channel(). When calling req.new_channel, the fd for the other side of the channel is automatically passed to the remote caller. Signed-off-by: Felix Fietkau --- CMakeLists.txt | 6 +- lib/ubus.c | 256 +++++++++++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 235 insertions(+), 27 deletions(-) (limited to 'lib/ubus.c') diff --git a/CMakeLists.txt b/CMakeLists.txt index 0e6025d..858557e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -172,7 +172,7 @@ if(UBUS_SUPPORT) set_target_properties(ubus_lib PROPERTIES OUTPUT_NAME ubus PREFIX "") target_link_options(ubus_lib PRIVATE ${UCODE_MODULE_LINK_OPTIONS}) target_link_libraries(ubus_lib ${libubus} ${libblobmsg_json}) - list(APPEND CMAKE_REQUIRED_LIBRARIES ${libubox}) + list(APPEND CMAKE_REQUIRED_LIBRARIES ${libubox} ${libubus}) file(WRITE "${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeTmp/test.c" " #include int main() { return UBUS_STATUS_NO_MEMORY; } @@ -182,9 +182,13 @@ if(UBUS_SUPPORT) "${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeTmp/test.c") check_symbol_exists(uloop_fd_set_cb "libubox/uloop.h" FD_SET_CB_EXISTS) check_function_exists(uloop_timeout_remaining64 REMAINING64_FUNCTION_EXISTS) + check_function_exists(ubus_channel_connect HAVE_CHANNEL_SUPPORT) if(REMAINING64_FUNCTION_EXISTS) target_compile_definitions(ubus_lib PUBLIC HAVE_ULOOP_TIMEOUT_REMAINING64) endif() + if(HAVE_CHANNEL_SUPPORT) + target_compile_definitions(ubus_lib PUBLIC HAVE_UBUS_CHANNEL_SUPPORT) + endif() if(HAVE_NEW_UBUS_STATUS_CODES) add_definitions(-DHAVE_NEW_UBUS_STATUS_CODES) endif() diff --git a/lib/ubus.c b/lib/ubus.c index 20e25d2..c177a91 100644 --- a/lib/ubus.c +++ b/lib/ubus.c @@ -130,6 +130,7 @@ static uc_resource_type_t *notify_type; static uc_resource_type_t *object_type; static uc_resource_type_t *defer_type; static uc_resource_type_t *conn_type; +static uc_resource_type_t *chan_type; static uint64_t n_cb_active; static bool have_own_uloop; @@ -140,6 +141,9 @@ typedef struct { struct ubus_context ctx; struct blob_buf buf; int timeout; + + uc_vm_t *vm; + int registry_index; } uc_ubus_connection_t; typedef struct { @@ -292,6 +296,16 @@ _uc_reg_clear(uc_vm_t *vm, const char *key, size_t idx, size_t nptrs) } +#define connection_reg_add(vm, conn, cb, disconnect_cb, fd) \ + _uc_reg_add(vm, "ubus.connections", 4, conn, cb, disconnect_cb, fd) + +#define connection_reg_get(vm, idx, conn, cb, disconnect_cb) \ + _uc_reg_get(vm, "ubus.connections", idx, 3, conn, cb, disconnect_cb) + +#define connection_reg_clear(vm, idx) \ + _uc_reg_clear(vm, "ubus.connections", idx, 4) + + #define request_reg_add(vm, request, cb, fdcb, conn, fd) \ _uc_reg_add(vm, "ubus.requests", 5, request, cb, fdcb, conn, fd) @@ -512,6 +526,7 @@ uc_ubus_connect(uc_vm_t *vm, size_t nargs) "timeout", UC_INTEGER, true, &timeout); c = xalloc(sizeof(*c)); + c->registry_index = -1; c->timeout = timeout ? ucv_int64_get(timeout) : 30; if (ubus_connect_ctx(&c->ctx, socket ? ucv_string_get(socket) : NULL)) { @@ -555,6 +570,8 @@ _conn_get(uc_vm_t *vm, uc_ubus_connection_t **conn) { uc_ubus_connection_t *c = uc_fn_thisval("ubus.connection"); + if (!c) + c = uc_fn_thisval("ubus.channel"); if (!c) err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid connection context"); @@ -744,26 +761,14 @@ get_fd(uc_vm_t *vm, uc_value_t *val) return (int)n; } -static uc_value_t * -uc_ubus_call(uc_vm_t *vm, size_t nargs) +static int +uc_ubus_call_common(uc_vm_t *vm, uc_ubus_connection_t *c, uc_ubus_call_res_t *res, + uint32_t id, uc_value_t *funname, uc_value_t *funargs, + uc_value_t *fd, uc_value_t *fdcb, uc_value_t *mret) { - uc_value_t *objname, *funname, *funargs, *fd, *fdcb, *mret = NULL; - uc_ubus_call_res_t res = { 0 }; uc_ubus_deferred_t defer = {}; - uc_ubus_connection_t *c; enum ubus_msg_status rv; int fd_val = -1; - uint32_t id; - - conn_get(vm, &c); - - args_get_named(vm, nargs, - "object", UC_STRING, REQUIRED, &objname, - "method", UC_STRING, REQUIRED, &funname, - "data", UC_OBJECT, OPTIONAL, &funargs, - "multiple_return", UC_BOOLEAN, OPTIONAL, &mret, - "fd", 0, NAMED, &fd, - "fd_cb", UC_CLOSURE, NAMED, &fdcb); blob_buf_init(&c->buf, 0); @@ -771,24 +776,21 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) ucv_object_to_blob(funargs, &c->buf); if (fd) { fd_val = get_fd(vm, fd); - if (fd_val < 0) - err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid file descriptor argument"); + if (fd_val < 0) { + rv = UBUS_STATUS_INVALID_ARGUMENT; + set_error(rv, "Invalid file descriptor argument"); + return rv; + } } - rv = ubus_lookup_id(&c->ctx, ucv_string_get(objname), &id); - - if (rv != UBUS_STATUS_OK) - err_return(rv, "Failed to resolve object name '%s'", - ucv_string_get(objname)); - - res.mret = ucv_is_truish(mret); + res->mret = ucv_is_truish(mret); rv = ubus_invoke_async_fd(&c->ctx, id, ucv_string_get(funname), c->buf.head, &defer.request, fd_val); defer.vm = vm; defer.ctx = &c->ctx; defer.request.data_cb = uc_ubus_call_cb; - defer.request.priv = &res; + defer.request.priv = res; if (ucv_is_callable(fdcb)) { defer.request.fd_cb = uc_ubus_call_fd_cb; defer.registry_index = request_reg_add(vm, NULL, NULL, ucv_get(fdcb), NULL, NULL); @@ -800,6 +802,34 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) if (defer.request.fd_cb) request_reg_clear(vm, defer.registry_index); + return rv; +} + +static uc_value_t * +uc_ubus_call(uc_vm_t *vm, size_t nargs) +{ + uc_value_t *objname, *funname, *funargs, *fd, *fdcb, *mret = NULL; + uc_ubus_call_res_t res = { 0 }; + uc_ubus_connection_t *c; + enum ubus_msg_status rv; + uint32_t id; + + args_get_named(vm, nargs, + "object", UC_STRING, REQUIRED, &objname, + "method", UC_STRING, REQUIRED, &funname, + "data", UC_OBJECT, OPTIONAL, &funargs, + "multiple_return", UC_BOOLEAN, OPTIONAL, &mret, + "fd", 0, NAMED, &fd, + "fd_cb", UC_CLOSURE, NAMED, &fdcb); + + conn_get(vm, &c); + + rv = ubus_lookup_id(&c->ctx, ucv_string_get(objname), &id); + if (rv != UBUS_STATUS_OK) + err_return(rv, "Failed to resolve object name '%s'", + ucv_string_get(objname)); + + rv = uc_ubus_call_common(vm, c, &res, id, funname, funargs, fd, fdcb, mret); if (rv != UBUS_STATUS_OK) err_return(rv, "Failed to invoke function '%s' on object '%s'", ucv_string_get(funname), ucv_string_get(objname)); @@ -807,6 +837,31 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) ok_return(res.res); } +static uc_value_t * +uc_ubus_chan_request(uc_vm_t *vm, size_t nargs) +{ + uc_value_t *funname, *funargs, *fd, *fdcb, *mret = NULL; + uc_ubus_call_res_t res = { 0 }; + uc_ubus_connection_t *c; + enum ubus_msg_status rv; + + args_get_named(vm, nargs, + "method", UC_STRING, REQUIRED, &funname, + "data", UC_OBJECT, OPTIONAL, &funargs, + "multiple_return", UC_BOOLEAN, OPTIONAL, &mret, + "fd", 0, NAMED, &fd, + "fd_cb", UC_CLOSURE, NAMED, &fdcb); + + conn_get(vm, &c); + + rv = uc_ubus_call_common(vm, c, &res, 0, funname, funargs, fd, fdcb, mret); + if (rv != UBUS_STATUS_OK) + err_return(rv, "Failed to send request '%s' on channel", + ucv_string_get(funname)); + + ok_return(res.res); +} + static uc_value_t * uc_ubus_defer(uc_vm_t *vm, size_t nargs) { @@ -2153,10 +2208,149 @@ uc_ubus_defer_abort(uc_vm_t *vm, size_t nargs) ok_return(ucv_boolean_new(true)); } +/* + * channel related methods + * -------------------------------------------------------------------------- + */ + +#ifdef HAVE_UBUS_CHANNEL_SUPPORT +static int +uc_ubus_channel_req_cb(struct ubus_context *ctx, struct ubus_object *obj, + struct ubus_request_data *req, const char *method, + struct blob_attr *msg) +{ + uc_ubus_connection_t *c = container_of(ctx, uc_ubus_connection_t, ctx); + uc_value_t *this, *func, *args, *reqproto; + + connection_reg_get(c->vm, c->registry_index, &this, &func, NULL); + if (!ucv_is_callable(func)) + return UBUS_STATUS_METHOD_NOT_FOUND; + + args = blob_array_to_ucv(c->vm, blob_data(msg), blob_len(msg), true); + reqproto = ucv_object_new(c->vm); + ucv_object_add(reqproto, "args", ucv_get(args)); + if (method) + ucv_object_add(reqproto, "type", ucv_get(ucv_string_new(method))); + + return uc_ubus_handle_reply_common(ctx, req, c->vm, this, func, reqproto); +} + +static void +uc_ubus_channel_disconnect_cb(struct ubus_context *ctx) +{ + uc_ubus_connection_t *c = container_of(ctx, uc_ubus_connection_t, ctx); + uc_value_t *this, *func; + + connection_reg_get(c->vm, c->registry_index, &this, NULL, &func); + if (ucv_is_callable(func)) { + uc_vm_stack_push(c->vm, ucv_get(this)); + uc_vm_stack_push(c->vm, ucv_get(func)); + + if (uc_vm_call(c->vm, true, 0) == EXCEPTION_NONE) + ucv_put(uc_vm_stack_pop(c->vm)); + else + uloop_end(); + } + + blob_buf_free(&c->buf); + if (c->registry_index >= 0) + connection_reg_clear(c->vm, c->registry_index); + if (c->ctx.sock.fd >= 0) { + ubus_shutdown(&c->ctx); + c->ctx.sock.fd = -1; + } +} + +static uc_value_t * +uc_ubus_channel_add(uc_vm_t *vm, uc_ubus_connection_t *c, uc_value_t *cb, + uc_value_t *disconnect_cb, uc_value_t *fd) +{ + uc_value_t *chan; + + c->vm = vm; + if (c->timeout < 0) + c->timeout = 30; + + chan = uc_resource_new(chan_type, c); + c->registry_index = connection_reg_add(vm, ucv_get(chan), ucv_get(cb), ucv_get(disconnect_cb), ucv_get(fd)); + c->ctx.connection_lost = uc_ubus_channel_disconnect_cb; + ubus_add_uloop(&c->ctx); + + ok_return(chan); +} +#endif + +static uc_value_t * +uc_ubus_request_new_channel(uc_vm_t *vm, size_t nargs) +{ +#ifdef HAVE_UBUS_CHANNEL_SUPPORT + uc_ubus_request_t *callctx = uc_fn_thisval("ubus.request"); + uc_value_t *cb, *disconnect_cb, *timeout; + uc_ubus_connection_t *c; + int fd; + + if (!callctx) + err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid call context"); + + args_get(vm, nargs, + "cb", UC_CLOSURE, true, &cb, + "disconnect_cb", UC_CLOSURE, true, &disconnect_cb, + "timeout", UC_INTEGER, true, &timeout); + + c = xalloc(sizeof(*c)); + c->timeout = timeout ? ucv_int64_get(timeout) : 30; + + if (ubus_channel_create(&c->ctx, &fd, cb ? uc_ubus_channel_req_cb : NULL)) { + free(c); + err_return(UBUS_STATUS_UNKNOWN_ERROR, "Unable to create ubus channel"); + } + + ubus_request_set_fd(callctx->ctx, &callctx->req, fd); + + return uc_ubus_channel_add(vm, c, cb, disconnect_cb, NULL); +#else + err_return(UBUS_STATUS_NOT_SUPPORTED, "No ubus channel support"); +#endif +} + + +static uc_value_t * +uc_ubus_channel_connect(uc_vm_t *vm, size_t nargs) +{ +#ifdef HAVE_UBUS_CHANNEL_SUPPORT + uc_value_t *fd, *cb, *disconnect_cb, *timeout; + uc_ubus_connection_t *c; + int fd_val; + + args_get(vm, nargs, + "fd", UC_NULL, false, &fd, + "cb", UC_CLOSURE, true, &cb, + "disconnect_cb", UC_CLOSURE, true, &disconnect_cb, + "timeout", UC_INTEGER, true, &timeout); + + fd_val = get_fd(vm, fd); + if (fd_val < 0) + err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid file descriptor argument"); + + c = xalloc(sizeof(*c)); + c->timeout = timeout ? ucv_int64_get(timeout) : 30; + + if (ubus_channel_connect(&c->ctx, fd_val, cb ? uc_ubus_channel_req_cb : NULL)) { + free(c); + err_return(UBUS_STATUS_UNKNOWN_ERROR, "Unable to create ubus channel"); + } + + return uc_ubus_channel_add(vm, c, cb, disconnect_cb, fd); +#else + err_return(UBUS_STATUS_NOT_SUPPORTED, "No ubus channel support"); +#endif +} + static const uc_function_list_t global_fns[] = { { "error", uc_ubus_error }, { "connect", uc_ubus_connect }, + { "open_channel", uc_ubus_channel_connect }, }; static const uc_function_list_t conn_fns[] = { @@ -2172,6 +2366,12 @@ static const uc_function_list_t conn_fns[] = { { "disconnect", uc_ubus_disconnect }, }; +static const uc_function_list_t chan_fns[] = { + { "request", uc_ubus_chan_request }, + { "error", uc_ubus_error }, + { "disconnect", uc_ubus_disconnect }, +}; + static const uc_function_list_t defer_fns[] = { { "await", uc_ubus_defer_await }, { "completed", uc_ubus_defer_completed }, @@ -2190,6 +2390,7 @@ static const uc_function_list_t request_fns[] = { { "defer", uc_ubus_request_defer }, { "get_fd", uc_ubus_request_get_fd }, { "set_fd", uc_ubus_request_set_fd }, + { "new_channel", uc_ubus_request_new_channel }, }; static const uc_function_list_t notify_fns[] = { @@ -2214,6 +2415,8 @@ static void free_connection(void *ud) { if (conn->ctx.sock.fd >= 0) ubus_shutdown(&conn->ctx); + if (conn->registry_index >= 0) + connection_reg_clear(conn->vm, conn->registry_index); free(conn); } @@ -2292,6 +2495,7 @@ void uc_module_init(uc_vm_t *vm, uc_value_t *scope) #endif conn_type = uc_type_declare(vm, "ubus.connection", conn_fns, free_connection); + chan_type = uc_type_declare(vm, "ubus.channel", chan_fns, free_connection); defer_type = uc_type_declare(vm, "ubus.deferred", defer_fns, free_deferred); object_type = uc_type_declare(vm, "ubus.object", object_fns, free_object); notify_type = uc_type_declare(vm, "ubus.notify", notify_fns, free_notify); -- cgit v1.2.3