summaryrefslogtreecommitdiffhomepage
path: root/lib
diff options
context:
space:
mode:
authorFelix Fietkau <nbd@nbd.name>2025-01-02 13:41:30 +0100
committerFelix Fietkau <nbd@nbd.name>2025-02-04 19:39:03 +0100
commitc0d1654a450b62f5767ebd28352141f33c7ef38e (patch)
tree50c7bc5bcd12a8a29fd26b6cd876d37c61331e83 /lib
parent22b9523565a5b59d98e8ba2fa578fa5d994ecfdb (diff)
ubus: add support for channels
A channel is a context that is directly connected to a peer instead of going through ubusd. The use of this context is limited to calling ubus_invoke and receiving requests not bound to any registered object. The main use case for this is having a more stateful interaction between processes. A service using channels can attach metadata to each individual channel and keep track of its lifetime, which is not possible through the regular subscribe/notify mechanism. Using channels also improves request latency, since messages are passed directly between processes. A channel can either be opened by fd using ubus.open_channel(), or created from within a request by using req.new_channel(). When calling req.new_channel, the fd for the other side of the channel is automatically passed to the remote caller. Signed-off-by: Felix Fietkau <nbd@nbd.name>
Diffstat (limited to 'lib')
-rw-r--r--lib/ubus.c256
1 files changed, 230 insertions, 26 deletions
diff --git a/lib/ubus.c b/lib/ubus.c
index 20e25d2..c177a91 100644
--- a/lib/ubus.c
+++ b/lib/ubus.c
@@ -130,6 +130,7 @@ static uc_resource_type_t *notify_type;
static uc_resource_type_t *object_type;
static uc_resource_type_t *defer_type;
static uc_resource_type_t *conn_type;
+static uc_resource_type_t *chan_type;
static uint64_t n_cb_active;
static bool have_own_uloop;
@@ -140,6 +141,9 @@ typedef struct {
struct ubus_context ctx;
struct blob_buf buf;
int timeout;
+
+ uc_vm_t *vm;
+ int registry_index;
} uc_ubus_connection_t;
typedef struct {
@@ -292,6 +296,16 @@ _uc_reg_clear(uc_vm_t *vm, const char *key, size_t idx, size_t nptrs)
}
+#define connection_reg_add(vm, conn, cb, disconnect_cb, fd) \
+ _uc_reg_add(vm, "ubus.connections", 4, conn, cb, disconnect_cb, fd)
+
+#define connection_reg_get(vm, idx, conn, cb, disconnect_cb) \
+ _uc_reg_get(vm, "ubus.connections", idx, 3, conn, cb, disconnect_cb)
+
+#define connection_reg_clear(vm, idx) \
+ _uc_reg_clear(vm, "ubus.connections", idx, 4)
+
+
#define request_reg_add(vm, request, cb, fdcb, conn, fd) \
_uc_reg_add(vm, "ubus.requests", 5, request, cb, fdcb, conn, fd)
@@ -512,6 +526,7 @@ uc_ubus_connect(uc_vm_t *vm, size_t nargs)
"timeout", UC_INTEGER, true, &timeout);
c = xalloc(sizeof(*c));
+ c->registry_index = -1;
c->timeout = timeout ? ucv_int64_get(timeout) : 30;
if (ubus_connect_ctx(&c->ctx, socket ? ucv_string_get(socket) : NULL)) {
@@ -556,6 +571,8 @@ _conn_get(uc_vm_t *vm, uc_ubus_connection_t **conn)
uc_ubus_connection_t *c = uc_fn_thisval("ubus.connection");
if (!c)
+ c = uc_fn_thisval("ubus.channel");
+ if (!c)
err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid connection context");
if (c->ctx.sock.fd < 0)
@@ -744,26 +761,14 @@ get_fd(uc_vm_t *vm, uc_value_t *val)
return (int)n;
}
-static uc_value_t *
-uc_ubus_call(uc_vm_t *vm, size_t nargs)
+static int
+uc_ubus_call_common(uc_vm_t *vm, uc_ubus_connection_t *c, uc_ubus_call_res_t *res,
+ uint32_t id, uc_value_t *funname, uc_value_t *funargs,
+ uc_value_t *fd, uc_value_t *fdcb, uc_value_t *mret)
{
- uc_value_t *objname, *funname, *funargs, *fd, *fdcb, *mret = NULL;
- uc_ubus_call_res_t res = { 0 };
uc_ubus_deferred_t defer = {};
- uc_ubus_connection_t *c;
enum ubus_msg_status rv;
int fd_val = -1;
- uint32_t id;
-
- conn_get(vm, &c);
-
- args_get_named(vm, nargs,
- "object", UC_STRING, REQUIRED, &objname,
- "method", UC_STRING, REQUIRED, &funname,
- "data", UC_OBJECT, OPTIONAL, &funargs,
- "multiple_return", UC_BOOLEAN, OPTIONAL, &mret,
- "fd", 0, NAMED, &fd,
- "fd_cb", UC_CLOSURE, NAMED, &fdcb);
blob_buf_init(&c->buf, 0);
@@ -771,24 +776,21 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs)
ucv_object_to_blob(funargs, &c->buf);
if (fd) {
fd_val = get_fd(vm, fd);
- if (fd_val < 0)
- err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid file descriptor argument");
+ if (fd_val < 0) {
+ rv = UBUS_STATUS_INVALID_ARGUMENT;
+ set_error(rv, "Invalid file descriptor argument");
+ return rv;
+ }
}
- rv = ubus_lookup_id(&c->ctx, ucv_string_get(objname), &id);
-
- if (rv != UBUS_STATUS_OK)
- err_return(rv, "Failed to resolve object name '%s'",
- ucv_string_get(objname));
-
- res.mret = ucv_is_truish(mret);
+ res->mret = ucv_is_truish(mret);
rv = ubus_invoke_async_fd(&c->ctx, id, ucv_string_get(funname),
c->buf.head, &defer.request, fd_val);
defer.vm = vm;
defer.ctx = &c->ctx;
defer.request.data_cb = uc_ubus_call_cb;
- defer.request.priv = &res;
+ defer.request.priv = res;
if (ucv_is_callable(fdcb)) {
defer.request.fd_cb = uc_ubus_call_fd_cb;
defer.registry_index = request_reg_add(vm, NULL, NULL, ucv_get(fdcb), NULL, NULL);
@@ -800,6 +802,34 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs)
if (defer.request.fd_cb)
request_reg_clear(vm, defer.registry_index);
+ return rv;
+}
+
+static uc_value_t *
+uc_ubus_call(uc_vm_t *vm, size_t nargs)
+{
+ uc_value_t *objname, *funname, *funargs, *fd, *fdcb, *mret = NULL;
+ uc_ubus_call_res_t res = { 0 };
+ uc_ubus_connection_t *c;
+ enum ubus_msg_status rv;
+ uint32_t id;
+
+ args_get_named(vm, nargs,
+ "object", UC_STRING, REQUIRED, &objname,
+ "method", UC_STRING, REQUIRED, &funname,
+ "data", UC_OBJECT, OPTIONAL, &funargs,
+ "multiple_return", UC_BOOLEAN, OPTIONAL, &mret,
+ "fd", 0, NAMED, &fd,
+ "fd_cb", UC_CLOSURE, NAMED, &fdcb);
+
+ conn_get(vm, &c);
+
+ rv = ubus_lookup_id(&c->ctx, ucv_string_get(objname), &id);
+ if (rv != UBUS_STATUS_OK)
+ err_return(rv, "Failed to resolve object name '%s'",
+ ucv_string_get(objname));
+
+ rv = uc_ubus_call_common(vm, c, &res, id, funname, funargs, fd, fdcb, mret);
if (rv != UBUS_STATUS_OK)
err_return(rv, "Failed to invoke function '%s' on object '%s'",
ucv_string_get(funname), ucv_string_get(objname));
@@ -808,6 +838,31 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs)
}
static uc_value_t *
+uc_ubus_chan_request(uc_vm_t *vm, size_t nargs)
+{
+ uc_value_t *funname, *funargs, *fd, *fdcb, *mret = NULL;
+ uc_ubus_call_res_t res = { 0 };
+ uc_ubus_connection_t *c;
+ enum ubus_msg_status rv;
+
+ args_get_named(vm, nargs,
+ "method", UC_STRING, REQUIRED, &funname,
+ "data", UC_OBJECT, OPTIONAL, &funargs,
+ "multiple_return", UC_BOOLEAN, OPTIONAL, &mret,
+ "fd", 0, NAMED, &fd,
+ "fd_cb", UC_CLOSURE, NAMED, &fdcb);
+
+ conn_get(vm, &c);
+
+ rv = uc_ubus_call_common(vm, c, &res, 0, funname, funargs, fd, fdcb, mret);
+ if (rv != UBUS_STATUS_OK)
+ err_return(rv, "Failed to send request '%s' on channel",
+ ucv_string_get(funname));
+
+ ok_return(res.res);
+}
+
+static uc_value_t *
uc_ubus_defer(uc_vm_t *vm, size_t nargs)
{
uc_value_t *objname, *funname, *funargs, *replycb, *fd, *fdcb, *conn, *res = NULL;
@@ -2153,10 +2208,149 @@ uc_ubus_defer_abort(uc_vm_t *vm, size_t nargs)
ok_return(ucv_boolean_new(true));
}
+/*
+ * channel related methods
+ * --------------------------------------------------------------------------
+ */
+
+#ifdef HAVE_UBUS_CHANNEL_SUPPORT
+static int
+uc_ubus_channel_req_cb(struct ubus_context *ctx, struct ubus_object *obj,
+ struct ubus_request_data *req, const char *method,
+ struct blob_attr *msg)
+{
+ uc_ubus_connection_t *c = container_of(ctx, uc_ubus_connection_t, ctx);
+ uc_value_t *this, *func, *args, *reqproto;
+
+ connection_reg_get(c->vm, c->registry_index, &this, &func, NULL);
+ if (!ucv_is_callable(func))
+ return UBUS_STATUS_METHOD_NOT_FOUND;
+
+ args = blob_array_to_ucv(c->vm, blob_data(msg), blob_len(msg), true);
+ reqproto = ucv_object_new(c->vm);
+ ucv_object_add(reqproto, "args", ucv_get(args));
+ if (method)
+ ucv_object_add(reqproto, "type", ucv_get(ucv_string_new(method)));
+
+ return uc_ubus_handle_reply_common(ctx, req, c->vm, this, func, reqproto);
+}
+
+static void
+uc_ubus_channel_disconnect_cb(struct ubus_context *ctx)
+{
+ uc_ubus_connection_t *c = container_of(ctx, uc_ubus_connection_t, ctx);
+ uc_value_t *this, *func;
+
+ connection_reg_get(c->vm, c->registry_index, &this, NULL, &func);
+ if (ucv_is_callable(func)) {
+ uc_vm_stack_push(c->vm, ucv_get(this));
+ uc_vm_stack_push(c->vm, ucv_get(func));
+
+ if (uc_vm_call(c->vm, true, 0) == EXCEPTION_NONE)
+ ucv_put(uc_vm_stack_pop(c->vm));
+ else
+ uloop_end();
+ }
+
+ blob_buf_free(&c->buf);
+ if (c->registry_index >= 0)
+ connection_reg_clear(c->vm, c->registry_index);
+ if (c->ctx.sock.fd >= 0) {
+ ubus_shutdown(&c->ctx);
+ c->ctx.sock.fd = -1;
+ }
+}
+
+static uc_value_t *
+uc_ubus_channel_add(uc_vm_t *vm, uc_ubus_connection_t *c, uc_value_t *cb,
+ uc_value_t *disconnect_cb, uc_value_t *fd)
+{
+ uc_value_t *chan;
+
+ c->vm = vm;
+ if (c->timeout < 0)
+ c->timeout = 30;
+
+ chan = uc_resource_new(chan_type, c);
+ c->registry_index = connection_reg_add(vm, ucv_get(chan), ucv_get(cb), ucv_get(disconnect_cb), ucv_get(fd));
+ c->ctx.connection_lost = uc_ubus_channel_disconnect_cb;
+ ubus_add_uloop(&c->ctx);
+
+ ok_return(chan);
+}
+#endif
+
+static uc_value_t *
+uc_ubus_request_new_channel(uc_vm_t *vm, size_t nargs)
+{
+#ifdef HAVE_UBUS_CHANNEL_SUPPORT
+ uc_ubus_request_t *callctx = uc_fn_thisval("ubus.request");
+ uc_value_t *cb, *disconnect_cb, *timeout;
+ uc_ubus_connection_t *c;
+ int fd;
+
+ if (!callctx)
+ err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid call context");
+
+ args_get(vm, nargs,
+ "cb", UC_CLOSURE, true, &cb,
+ "disconnect_cb", UC_CLOSURE, true, &disconnect_cb,
+ "timeout", UC_INTEGER, true, &timeout);
+
+ c = xalloc(sizeof(*c));
+ c->timeout = timeout ? ucv_int64_get(timeout) : 30;
+
+ if (ubus_channel_create(&c->ctx, &fd, cb ? uc_ubus_channel_req_cb : NULL)) {
+ free(c);
+ err_return(UBUS_STATUS_UNKNOWN_ERROR, "Unable to create ubus channel");
+ }
+
+ ubus_request_set_fd(callctx->ctx, &callctx->req, fd);
+
+ return uc_ubus_channel_add(vm, c, cb, disconnect_cb, NULL);
+#else
+ err_return(UBUS_STATUS_NOT_SUPPORTED, "No ubus channel support");
+#endif
+}
+
+
+static uc_value_t *
+uc_ubus_channel_connect(uc_vm_t *vm, size_t nargs)
+{
+#ifdef HAVE_UBUS_CHANNEL_SUPPORT
+ uc_value_t *fd, *cb, *disconnect_cb, *timeout;
+ uc_ubus_connection_t *c;
+ int fd_val;
+
+ args_get(vm, nargs,
+ "fd", UC_NULL, false, &fd,
+ "cb", UC_CLOSURE, true, &cb,
+ "disconnect_cb", UC_CLOSURE, true, &disconnect_cb,
+ "timeout", UC_INTEGER, true, &timeout);
+
+ fd_val = get_fd(vm, fd);
+ if (fd_val < 0)
+ err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid file descriptor argument");
+
+ c = xalloc(sizeof(*c));
+ c->timeout = timeout ? ucv_int64_get(timeout) : 30;
+
+ if (ubus_channel_connect(&c->ctx, fd_val, cb ? uc_ubus_channel_req_cb : NULL)) {
+ free(c);
+ err_return(UBUS_STATUS_UNKNOWN_ERROR, "Unable to create ubus channel");
+ }
+
+ return uc_ubus_channel_add(vm, c, cb, disconnect_cb, fd);
+#else
+ err_return(UBUS_STATUS_NOT_SUPPORTED, "No ubus channel support");
+#endif
+}
+
static const uc_function_list_t global_fns[] = {
{ "error", uc_ubus_error },
{ "connect", uc_ubus_connect },
+ { "open_channel", uc_ubus_channel_connect },
};
static const uc_function_list_t conn_fns[] = {
@@ -2172,6 +2366,12 @@ static const uc_function_list_t conn_fns[] = {
{ "disconnect", uc_ubus_disconnect },
};
+static const uc_function_list_t chan_fns[] = {
+ { "request", uc_ubus_chan_request },
+ { "error", uc_ubus_error },
+ { "disconnect", uc_ubus_disconnect },
+};
+
static const uc_function_list_t defer_fns[] = {
{ "await", uc_ubus_defer_await },
{ "completed", uc_ubus_defer_completed },
@@ -2190,6 +2390,7 @@ static const uc_function_list_t request_fns[] = {
{ "defer", uc_ubus_request_defer },
{ "get_fd", uc_ubus_request_get_fd },
{ "set_fd", uc_ubus_request_set_fd },
+ { "new_channel", uc_ubus_request_new_channel },
};
static const uc_function_list_t notify_fns[] = {
@@ -2214,6 +2415,8 @@ static void free_connection(void *ud) {
if (conn->ctx.sock.fd >= 0)
ubus_shutdown(&conn->ctx);
+ if (conn->registry_index >= 0)
+ connection_reg_clear(conn->vm, conn->registry_index);
free(conn);
}
@@ -2292,6 +2495,7 @@ void uc_module_init(uc_vm_t *vm, uc_value_t *scope)
#endif
conn_type = uc_type_declare(vm, "ubus.connection", conn_fns, free_connection);
+ chan_type = uc_type_declare(vm, "ubus.channel", chan_fns, free_connection);
defer_type = uc_type_declare(vm, "ubus.deferred", defer_fns, free_deferred);
object_type = uc_type_declare(vm, "ubus.object", object_fns, free_object);
notify_type = uc_type_declare(vm, "ubus.notify", notify_fns, free_notify);