From 66f7c00af29d2547ae541eba3ce768f5eea8d269 Mon Sep 17 00:00:00 2001 From: Jo-Philipp Wich Date: Tue, 7 Dec 2021 10:50:12 +0100 Subject: ubus: add support for async requests Introduce a new ubus.defer() method which initiates asynchroneous requests and invokes the completion callback passed as 4th argument once the reply is received. This allows multiplexing mutliple ubus requests with the same ucode VM context / the same uloop event loop. In case the ucode context is not running under an active uloop, the ubus module will spawn uloop itself once the first asynchroneous request is launched and terminate the loop once all pending requests finished. Signed-off-by: Jo-Philipp Wich --- lib/ubus.c | 255 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 254 insertions(+), 1 deletion(-) (limited to 'lib/ubus.c') diff --git a/lib/ubus.c b/lib/ubus.c index b5d5633..6605736 100644 --- a/lib/ubus.c +++ b/lib/ubus.c @@ -24,14 +24,29 @@ #define err_return(err) do { last_error = err; return NULL; } while(0) static enum ubus_msg_status last_error = 0; +static uc_resource_type_t *defer_type; static uc_resource_type_t *conn_type; +static uc_value_t *cb_registry; +static uint64_t n_cb_active; +static bool have_own_uloop; + typedef struct { int timeout; struct blob_buf buf; struct ubus_context *ctx; } ubus_connection; +typedef struct { + struct ubus_context *context; + struct ubus_request request; + struct uloop_timeout timeout; + bool complete; + uc_vm_t *vm; + uc_value_t *callback; + uc_value_t *response; +} ubus_deferred; + static uc_value_t * uc_ubus_error(uc_vm_t *vm, size_t nargs) { @@ -236,6 +251,89 @@ uc_ubus_call_cb(struct ubus_request *req, int type, struct blob_attr *msg) *res = msg ? uc_blob_array_to_json(NULL, blob_data(msg), blob_len(msg), true) : NULL; } +static void +uc_ubus_invoke_async_callback(ubus_deferred *defer, int ret, uc_value_t *reply) +{ + uc_resource_t *r; + size_t i; + + if (defer->callback) { + uc_vm_stack_push(defer->vm, ucv_get(defer->callback)); + uc_vm_stack_push(defer->vm, ucv_int64_new(ret)); + uc_vm_stack_push(defer->vm, ucv_get(reply)); + + if (uc_vm_call(defer->vm, false, 2) == EXCEPTION_NONE) + ucv_put(uc_vm_stack_pop(defer->vm)); + + defer->callback = NULL; + } + + for (i = 0; i < ucv_array_length(cb_registry); i += 2) { + r = (uc_resource_t *)ucv_array_get(cb_registry, i); + + if (r && r->data == defer) { + ucv_array_set(cb_registry, i, NULL); + ucv_array_set(cb_registry, i + 1, NULL); + break; + } + } + + n_cb_active--; + + if (have_own_uloop && n_cb_active == 0) + uloop_end(); +} + +static void +uc_ubus_call_data_cb(struct ubus_request *req, int type, struct blob_attr *msg) +{ + ubus_deferred *defer = container_of(req, ubus_deferred, request); + + if (defer->response == NULL) + defer->response = uc_blob_array_to_json(defer->vm, blob_data(msg), blob_len(msg), true); +} + +static void +uc_ubus_call_done_cb(struct ubus_request *req, int ret) +{ + ubus_deferred *defer = container_of(req, ubus_deferred, request); + + if (defer->complete) + return; + + defer->complete = true; + uloop_timeout_cancel(&defer->timeout); + + uc_ubus_invoke_async_callback(defer, ret, defer->response); +} + +static void +uc_ubus_call_timeout_cb(struct uloop_timeout *timeout) +{ + ubus_deferred *defer = container_of(timeout, ubus_deferred, timeout); + + if (defer->complete) + return; + + defer->complete = true; + ubus_abort_request(defer->context, &defer->request); + + uc_ubus_invoke_async_callback(defer, UBUS_STATUS_TIMEOUT, NULL); +} + +static bool +uc_ubus_have_uloop(void) +{ + bool prev = uloop_cancelled; + bool active; + + uloop_cancelled = true; + active = uloop_cancelling(); + uloop_cancelled = prev; + + return active; +} + static uc_value_t * uc_ubus_call(uc_vm_t *vm, size_t nargs) { @@ -244,8 +342,8 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) uc_value_t *funname = uc_fn_arg(1); uc_value_t *funargs = uc_fn_arg(2); uc_value_t *res = NULL; - json_object *o; enum ubus_msg_status rv; + json_object *o; uint32_t id; if (!c || !*c || !(*c)->ctx) @@ -281,6 +379,95 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) return res; } +static uc_value_t * +uc_ubus_defer(uc_vm_t *vm, size_t nargs) +{ + ubus_connection **c = uc_fn_this("ubus.connection"); + uc_value_t *objname = uc_fn_arg(0); + uc_value_t *funname = uc_fn_arg(1); + uc_value_t *funargs = uc_fn_arg(2); + uc_value_t *replycb = uc_fn_arg(3); + uc_value_t *res = NULL; + enum ubus_msg_status rv; + ubus_deferred *defer; + json_object *o; + uint32_t id; + size_t i; + + if (!c || !*c || !(*c)->ctx) + err_return(UBUS_STATUS_CONNECTION_FAILED); + + if (ucv_type(objname) != UC_STRING || + ucv_type(funname) != UC_STRING || + (funargs && ucv_type(funargs) != UC_OBJECT) || + (replycb && !ucv_is_callable(replycb))) + err_return(UBUS_STATUS_INVALID_ARGUMENT); + + blob_buf_init(&(*c)->buf, 0); + + if (funargs) { + o = ucv_to_json(funargs); + rv = blobmsg_add_object(&(*c)->buf, o); + json_object_put(o); + + if (!rv) + err_return(UBUS_STATUS_UNKNOWN_ERROR); + } + + rv = ubus_lookup_id((*c)->ctx, ucv_string_get(objname), &id); + + if (rv != UBUS_STATUS_OK) + err_return(rv); + + defer = xalloc(sizeof(*defer)); + + rv = ubus_invoke_async((*c)->ctx, id, ucv_string_get(funname), + (*c)->buf.head, &defer->request); + + if (rv == UBUS_STATUS_OK) { + defer->vm = vm; + defer->context = (*c)->ctx; + defer->callback = replycb; + + defer->request.data_cb = uc_ubus_call_data_cb; + defer->request.complete_cb = uc_ubus_call_done_cb; + ubus_complete_request_async((*c)->ctx, &defer->request); + + defer->timeout.cb = uc_ubus_call_timeout_cb; + uloop_timeout_set(&defer->timeout, (*c)->timeout * 1000); + + res = uc_resource_new(defer_type, defer); + + for (i = 0;; i += 2) { + if (ucv_array_get(cb_registry, i) == NULL) { + ucv_array_set(cb_registry, i, ucv_get(res)); + ucv_array_set(cb_registry, i + 1, ucv_get(replycb)); + n_cb_active++; + break; + } + } + + if (!uc_ubus_have_uloop()) { + have_own_uloop = true; + uloop_run(); + } + } + else { + uc_vm_stack_push(vm, ucv_get(replycb)); + uc_vm_stack_push(vm, ucv_int64_new(rv)); + + if (uc_vm_call(vm, false, 1) == EXCEPTION_NONE) + ucv_put(uc_vm_stack_pop(vm)); + + free(defer); + } + + if (rv != UBUS_STATUS_OK) + err_return(rv); + + return res; +} + static uc_value_t * uc_ubus_disconnect(uc_vm_t *vm, size_t nargs) { @@ -295,6 +482,54 @@ uc_ubus_disconnect(uc_vm_t *vm, size_t nargs) return ucv_boolean_new(true); } +static uc_value_t * +uc_ubus_defer_complete(uc_vm_t *vm, size_t nargs) +{ + ubus_deferred **d = uc_fn_this("ubus.deferred"); + + if (!d || !*d) + return NULL; + + return ucv_boolean_new((*d)->complete); +} + +static uc_value_t * +uc_ubus_defer_abort(uc_vm_t *vm, size_t nargs) +{ + ubus_deferred **d = uc_fn_this("ubus.deferred"); + uc_resource_t *r; + size_t i; + + if (!d || !*d) + return NULL; + + if ((*d)->complete) + return ucv_boolean_new(false); + + ubus_abort_request((*d)->context, &(*d)->request); + uloop_timeout_cancel(&(*d)->timeout); + + for (i = 0; i < ucv_array_length(cb_registry); i += 2) { + r = (uc_resource_t *)ucv_array_get(cb_registry, i); + + if (r && r->data == *d) { + ucv_array_set(cb_registry, i, NULL); + ucv_array_set(cb_registry, i + 1, NULL); + break; + } + } + + n_cb_active--; + + if (have_own_uloop && n_cb_active == 0) + uloop_end(); + + (*d)->callback = NULL; + (*d)->complete = true; + + return ucv_boolean_new(true); +} + static const uc_function_list_t global_fns[] = { { "error", uc_ubus_error }, @@ -304,10 +539,16 @@ static const uc_function_list_t global_fns[] = { static const uc_function_list_t conn_fns[] = { { "list", uc_ubus_list }, { "call", uc_ubus_call }, + { "defer", uc_ubus_defer }, { "error", uc_ubus_error }, { "disconnect", uc_ubus_disconnect }, }; +static const uc_function_list_t defer_fns[] = { + { "complete", uc_ubus_defer_complete }, + { "abort", uc_ubus_defer_abort }, +}; + static void close_connection(void *ud) { ubus_connection *conn = ud; @@ -320,9 +561,21 @@ static void close_connection(void *ud) { free(conn); } +static void close_deferred(void *ud) { + ubus_deferred *defer = ud; + + uloop_timeout_cancel(&defer->timeout); + ucv_put(defer->response); + free(defer); +} + void uc_module_init(uc_vm_t *vm, uc_value_t *scope) { uc_function_list_register(scope, global_fns); conn_type = uc_type_declare(vm, "ubus.connection", conn_fns, close_connection); + defer_type = uc_type_declare(vm, "ubus.deferred", defer_fns, close_deferred); + cb_registry = ucv_array_new(vm); + + uc_vm_registry_set(vm, "ubus.cb_registry", cb_registry); } -- cgit v1.2.3