diff options
author | Felix Fietkau <nbd@nbd.name> | 2025-01-02 13:41:30 +0100 |
---|---|---|
committer | Felix Fietkau <nbd@nbd.name> | 2025-02-04 19:39:03 +0100 |
commit | c0d1654a450b62f5767ebd28352141f33c7ef38e (patch) | |
tree | 50c7bc5bcd12a8a29fd26b6cd876d37c61331e83 /lib | |
parent | 22b9523565a5b59d98e8ba2fa578fa5d994ecfdb (diff) |
ubus: add support for channels
A channel is a context that is directly connected to a peer instead of going
through ubusd. The use of this context is limited to calling ubus_invoke and
receiving requests not bound to any registered object.
The main use case for this is having a more stateful interaction between
processes. A service using channels can attach metadata to each individual
channel and keep track of its lifetime, which is not possible through
the regular subscribe/notify mechanism.
Using channels also improves request latency, since messages are passed
directly between processes.
A channel can either be opened by fd using ubus.open_channel(), or created
from within a request by using req.new_channel(). When calling req.new_channel,
the fd for the other side of the channel is automatically passed to the
remote caller.
Signed-off-by: Felix Fietkau <nbd@nbd.name>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/ubus.c | 256 |
1 files changed, 230 insertions, 26 deletions
@@ -130,6 +130,7 @@ static uc_resource_type_t *notify_type; static uc_resource_type_t *object_type; static uc_resource_type_t *defer_type; static uc_resource_type_t *conn_type; +static uc_resource_type_t *chan_type; static uint64_t n_cb_active; static bool have_own_uloop; @@ -140,6 +141,9 @@ typedef struct { struct ubus_context ctx; struct blob_buf buf; int timeout; + + uc_vm_t *vm; + int registry_index; } uc_ubus_connection_t; typedef struct { @@ -292,6 +296,16 @@ _uc_reg_clear(uc_vm_t *vm, const char *key, size_t idx, size_t nptrs) } +#define connection_reg_add(vm, conn, cb, disconnect_cb, fd) \ + _uc_reg_add(vm, "ubus.connections", 4, conn, cb, disconnect_cb, fd) + +#define connection_reg_get(vm, idx, conn, cb, disconnect_cb) \ + _uc_reg_get(vm, "ubus.connections", idx, 3, conn, cb, disconnect_cb) + +#define connection_reg_clear(vm, idx) \ + _uc_reg_clear(vm, "ubus.connections", idx, 4) + + #define request_reg_add(vm, request, cb, fdcb, conn, fd) \ _uc_reg_add(vm, "ubus.requests", 5, request, cb, fdcb, conn, fd) @@ -512,6 +526,7 @@ uc_ubus_connect(uc_vm_t *vm, size_t nargs) "timeout", UC_INTEGER, true, &timeout); c = xalloc(sizeof(*c)); + c->registry_index = -1; c->timeout = timeout ? ucv_int64_get(timeout) : 30; if (ubus_connect_ctx(&c->ctx, socket ? ucv_string_get(socket) : NULL)) { @@ -556,6 +571,8 @@ _conn_get(uc_vm_t *vm, uc_ubus_connection_t **conn) uc_ubus_connection_t *c = uc_fn_thisval("ubus.connection"); if (!c) + c = uc_fn_thisval("ubus.channel"); + if (!c) err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid connection context"); if (c->ctx.sock.fd < 0) @@ -744,26 +761,14 @@ get_fd(uc_vm_t *vm, uc_value_t *val) return (int)n; } -static uc_value_t * -uc_ubus_call(uc_vm_t *vm, size_t nargs) +static int +uc_ubus_call_common(uc_vm_t *vm, uc_ubus_connection_t *c, uc_ubus_call_res_t *res, + uint32_t id, uc_value_t *funname, uc_value_t *funargs, + uc_value_t *fd, uc_value_t *fdcb, uc_value_t *mret) { - uc_value_t *objname, *funname, *funargs, *fd, *fdcb, *mret = NULL; - uc_ubus_call_res_t res = { 0 }; uc_ubus_deferred_t defer = {}; - uc_ubus_connection_t *c; enum ubus_msg_status rv; int fd_val = -1; - uint32_t id; - - conn_get(vm, &c); - - args_get_named(vm, nargs, - "object", UC_STRING, REQUIRED, &objname, - "method", UC_STRING, REQUIRED, &funname, - "data", UC_OBJECT, OPTIONAL, &funargs, - "multiple_return", UC_BOOLEAN, OPTIONAL, &mret, - "fd", 0, NAMED, &fd, - "fd_cb", UC_CLOSURE, NAMED, &fdcb); blob_buf_init(&c->buf, 0); @@ -771,24 +776,21 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) ucv_object_to_blob(funargs, &c->buf); if (fd) { fd_val = get_fd(vm, fd); - if (fd_val < 0) - err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid file descriptor argument"); + if (fd_val < 0) { + rv = UBUS_STATUS_INVALID_ARGUMENT; + set_error(rv, "Invalid file descriptor argument"); + return rv; + } } - rv = ubus_lookup_id(&c->ctx, ucv_string_get(objname), &id); - - if (rv != UBUS_STATUS_OK) - err_return(rv, "Failed to resolve object name '%s'", - ucv_string_get(objname)); - - res.mret = ucv_is_truish(mret); + res->mret = ucv_is_truish(mret); rv = ubus_invoke_async_fd(&c->ctx, id, ucv_string_get(funname), c->buf.head, &defer.request, fd_val); defer.vm = vm; defer.ctx = &c->ctx; defer.request.data_cb = uc_ubus_call_cb; - defer.request.priv = &res; + defer.request.priv = res; if (ucv_is_callable(fdcb)) { defer.request.fd_cb = uc_ubus_call_fd_cb; defer.registry_index = request_reg_add(vm, NULL, NULL, ucv_get(fdcb), NULL, NULL); @@ -800,6 +802,34 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) if (defer.request.fd_cb) request_reg_clear(vm, defer.registry_index); + return rv; +} + +static uc_value_t * +uc_ubus_call(uc_vm_t *vm, size_t nargs) +{ + uc_value_t *objname, *funname, *funargs, *fd, *fdcb, *mret = NULL; + uc_ubus_call_res_t res = { 0 }; + uc_ubus_connection_t *c; + enum ubus_msg_status rv; + uint32_t id; + + args_get_named(vm, nargs, + "object", UC_STRING, REQUIRED, &objname, + "method", UC_STRING, REQUIRED, &funname, + "data", UC_OBJECT, OPTIONAL, &funargs, + "multiple_return", UC_BOOLEAN, OPTIONAL, &mret, + "fd", 0, NAMED, &fd, + "fd_cb", UC_CLOSURE, NAMED, &fdcb); + + conn_get(vm, &c); + + rv = ubus_lookup_id(&c->ctx, ucv_string_get(objname), &id); + if (rv != UBUS_STATUS_OK) + err_return(rv, "Failed to resolve object name '%s'", + ucv_string_get(objname)); + + rv = uc_ubus_call_common(vm, c, &res, id, funname, funargs, fd, fdcb, mret); if (rv != UBUS_STATUS_OK) err_return(rv, "Failed to invoke function '%s' on object '%s'", ucv_string_get(funname), ucv_string_get(objname)); @@ -808,6 +838,31 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) } static uc_value_t * +uc_ubus_chan_request(uc_vm_t *vm, size_t nargs) +{ + uc_value_t *funname, *funargs, *fd, *fdcb, *mret = NULL; + uc_ubus_call_res_t res = { 0 }; + uc_ubus_connection_t *c; + enum ubus_msg_status rv; + + args_get_named(vm, nargs, + "method", UC_STRING, REQUIRED, &funname, + "data", UC_OBJECT, OPTIONAL, &funargs, + "multiple_return", UC_BOOLEAN, OPTIONAL, &mret, + "fd", 0, NAMED, &fd, + "fd_cb", UC_CLOSURE, NAMED, &fdcb); + + conn_get(vm, &c); + + rv = uc_ubus_call_common(vm, c, &res, 0, funname, funargs, fd, fdcb, mret); + if (rv != UBUS_STATUS_OK) + err_return(rv, "Failed to send request '%s' on channel", + ucv_string_get(funname)); + + ok_return(res.res); +} + +static uc_value_t * uc_ubus_defer(uc_vm_t *vm, size_t nargs) { uc_value_t *objname, *funname, *funargs, *replycb, *fd, *fdcb, *conn, *res = NULL; @@ -2153,10 +2208,149 @@ uc_ubus_defer_abort(uc_vm_t *vm, size_t nargs) ok_return(ucv_boolean_new(true)); } +/* + * channel related methods + * -------------------------------------------------------------------------- + */ + +#ifdef HAVE_UBUS_CHANNEL_SUPPORT +static int +uc_ubus_channel_req_cb(struct ubus_context *ctx, struct ubus_object *obj, + struct ubus_request_data *req, const char *method, + struct blob_attr *msg) +{ + uc_ubus_connection_t *c = container_of(ctx, uc_ubus_connection_t, ctx); + uc_value_t *this, *func, *args, *reqproto; + + connection_reg_get(c->vm, c->registry_index, &this, &func, NULL); + if (!ucv_is_callable(func)) + return UBUS_STATUS_METHOD_NOT_FOUND; + + args = blob_array_to_ucv(c->vm, blob_data(msg), blob_len(msg), true); + reqproto = ucv_object_new(c->vm); + ucv_object_add(reqproto, "args", ucv_get(args)); + if (method) + ucv_object_add(reqproto, "type", ucv_get(ucv_string_new(method))); + + return uc_ubus_handle_reply_common(ctx, req, c->vm, this, func, reqproto); +} + +static void +uc_ubus_channel_disconnect_cb(struct ubus_context *ctx) +{ + uc_ubus_connection_t *c = container_of(ctx, uc_ubus_connection_t, ctx); + uc_value_t *this, *func; + + connection_reg_get(c->vm, c->registry_index, &this, NULL, &func); + if (ucv_is_callable(func)) { + uc_vm_stack_push(c->vm, ucv_get(this)); + uc_vm_stack_push(c->vm, ucv_get(func)); + + if (uc_vm_call(c->vm, true, 0) == EXCEPTION_NONE) + ucv_put(uc_vm_stack_pop(c->vm)); + else + uloop_end(); + } + + blob_buf_free(&c->buf); + if (c->registry_index >= 0) + connection_reg_clear(c->vm, c->registry_index); + if (c->ctx.sock.fd >= 0) { + ubus_shutdown(&c->ctx); + c->ctx.sock.fd = -1; + } +} + +static uc_value_t * +uc_ubus_channel_add(uc_vm_t *vm, uc_ubus_connection_t *c, uc_value_t *cb, + uc_value_t *disconnect_cb, uc_value_t *fd) +{ + uc_value_t *chan; + + c->vm = vm; + if (c->timeout < 0) + c->timeout = 30; + + chan = uc_resource_new(chan_type, c); + c->registry_index = connection_reg_add(vm, ucv_get(chan), ucv_get(cb), ucv_get(disconnect_cb), ucv_get(fd)); + c->ctx.connection_lost = uc_ubus_channel_disconnect_cb; + ubus_add_uloop(&c->ctx); + + ok_return(chan); +} +#endif + +static uc_value_t * +uc_ubus_request_new_channel(uc_vm_t *vm, size_t nargs) +{ +#ifdef HAVE_UBUS_CHANNEL_SUPPORT + uc_ubus_request_t *callctx = uc_fn_thisval("ubus.request"); + uc_value_t *cb, *disconnect_cb, *timeout; + uc_ubus_connection_t *c; + int fd; + + if (!callctx) + err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid call context"); + + args_get(vm, nargs, + "cb", UC_CLOSURE, true, &cb, + "disconnect_cb", UC_CLOSURE, true, &disconnect_cb, + "timeout", UC_INTEGER, true, &timeout); + + c = xalloc(sizeof(*c)); + c->timeout = timeout ? ucv_int64_get(timeout) : 30; + + if (ubus_channel_create(&c->ctx, &fd, cb ? uc_ubus_channel_req_cb : NULL)) { + free(c); + err_return(UBUS_STATUS_UNKNOWN_ERROR, "Unable to create ubus channel"); + } + + ubus_request_set_fd(callctx->ctx, &callctx->req, fd); + + return uc_ubus_channel_add(vm, c, cb, disconnect_cb, NULL); +#else + err_return(UBUS_STATUS_NOT_SUPPORTED, "No ubus channel support"); +#endif +} + + +static uc_value_t * +uc_ubus_channel_connect(uc_vm_t *vm, size_t nargs) +{ +#ifdef HAVE_UBUS_CHANNEL_SUPPORT + uc_value_t *fd, *cb, *disconnect_cb, *timeout; + uc_ubus_connection_t *c; + int fd_val; + + args_get(vm, nargs, + "fd", UC_NULL, false, &fd, + "cb", UC_CLOSURE, true, &cb, + "disconnect_cb", UC_CLOSURE, true, &disconnect_cb, + "timeout", UC_INTEGER, true, &timeout); + + fd_val = get_fd(vm, fd); + if (fd_val < 0) + err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid file descriptor argument"); + + c = xalloc(sizeof(*c)); + c->timeout = timeout ? ucv_int64_get(timeout) : 30; + + if (ubus_channel_connect(&c->ctx, fd_val, cb ? uc_ubus_channel_req_cb : NULL)) { + free(c); + err_return(UBUS_STATUS_UNKNOWN_ERROR, "Unable to create ubus channel"); + } + + return uc_ubus_channel_add(vm, c, cb, disconnect_cb, fd); +#else + err_return(UBUS_STATUS_NOT_SUPPORTED, "No ubus channel support"); +#endif +} + static const uc_function_list_t global_fns[] = { { "error", uc_ubus_error }, { "connect", uc_ubus_connect }, + { "open_channel", uc_ubus_channel_connect }, }; static const uc_function_list_t conn_fns[] = { @@ -2172,6 +2366,12 @@ static const uc_function_list_t conn_fns[] = { { "disconnect", uc_ubus_disconnect }, }; +static const uc_function_list_t chan_fns[] = { + { "request", uc_ubus_chan_request }, + { "error", uc_ubus_error }, + { "disconnect", uc_ubus_disconnect }, +}; + static const uc_function_list_t defer_fns[] = { { "await", uc_ubus_defer_await }, { "completed", uc_ubus_defer_completed }, @@ -2190,6 +2390,7 @@ static const uc_function_list_t request_fns[] = { { "defer", uc_ubus_request_defer }, { "get_fd", uc_ubus_request_get_fd }, { "set_fd", uc_ubus_request_set_fd }, + { "new_channel", uc_ubus_request_new_channel }, }; static const uc_function_list_t notify_fns[] = { @@ -2214,6 +2415,8 @@ static void free_connection(void *ud) { if (conn->ctx.sock.fd >= 0) ubus_shutdown(&conn->ctx); + if (conn->registry_index >= 0) + connection_reg_clear(conn->vm, conn->registry_index); free(conn); } @@ -2292,6 +2495,7 @@ void uc_module_init(uc_vm_t *vm, uc_value_t *scope) #endif conn_type = uc_type_declare(vm, "ubus.connection", conn_fns, free_connection); + chan_type = uc_type_declare(vm, "ubus.channel", chan_fns, free_connection); defer_type = uc_type_declare(vm, "ubus.deferred", defer_fns, free_deferred); object_type = uc_type_declare(vm, "ubus.object", object_fns, free_object); notify_type = uc_type_declare(vm, "ubus.notify", notify_fns, free_notify); |