diff options
author | Jo-Philipp Wich <jo@mein.io> | 2021-12-07 10:50:12 +0100 |
---|---|---|
committer | Jo-Philipp Wich <jo@mein.io> | 2021-12-08 15:53:40 +0100 |
commit | 66f7c00af29d2547ae541eba3ce768f5eea8d269 (patch) | |
tree | 42f994ce85360d0ea8aa43a89a7cfcc8b747f52d /lib/ubus.c | |
parent | 99fdafd8c03a7baee265ce3dd22af5b2d02fdeb6 (diff) |
ubus: add support for async requests
Introduce a new ubus.defer() method which initiates asynchroneous requests
and invokes the completion callback passed as 4th argument once the reply
is received.
This allows multiplexing mutliple ubus requests with the same ucode VM
context / the same uloop event loop.
In case the ucode context is not running under an active uloop, the ubus
module will spawn uloop itself once the first asynchroneous request is
launched and terminate the loop once all pending requests finished.
Signed-off-by: Jo-Philipp Wich <jo@mein.io>
Diffstat (limited to 'lib/ubus.c')
-rw-r--r-- | lib/ubus.c | 255 |
1 files changed, 254 insertions, 1 deletions
@@ -24,14 +24,29 @@ #define err_return(err) do { last_error = err; return NULL; } while(0) static enum ubus_msg_status last_error = 0; +static uc_resource_type_t *defer_type; static uc_resource_type_t *conn_type; +static uc_value_t *cb_registry; +static uint64_t n_cb_active; +static bool have_own_uloop; + typedef struct { int timeout; struct blob_buf buf; struct ubus_context *ctx; } ubus_connection; +typedef struct { + struct ubus_context *context; + struct ubus_request request; + struct uloop_timeout timeout; + bool complete; + uc_vm_t *vm; + uc_value_t *callback; + uc_value_t *response; +} ubus_deferred; + static uc_value_t * uc_ubus_error(uc_vm_t *vm, size_t nargs) { @@ -236,6 +251,89 @@ uc_ubus_call_cb(struct ubus_request *req, int type, struct blob_attr *msg) *res = msg ? uc_blob_array_to_json(NULL, blob_data(msg), blob_len(msg), true) : NULL; } +static void +uc_ubus_invoke_async_callback(ubus_deferred *defer, int ret, uc_value_t *reply) +{ + uc_resource_t *r; + size_t i; + + if (defer->callback) { + uc_vm_stack_push(defer->vm, ucv_get(defer->callback)); + uc_vm_stack_push(defer->vm, ucv_int64_new(ret)); + uc_vm_stack_push(defer->vm, ucv_get(reply)); + + if (uc_vm_call(defer->vm, false, 2) == EXCEPTION_NONE) + ucv_put(uc_vm_stack_pop(defer->vm)); + + defer->callback = NULL; + } + + for (i = 0; i < ucv_array_length(cb_registry); i += 2) { + r = (uc_resource_t *)ucv_array_get(cb_registry, i); + + if (r && r->data == defer) { + ucv_array_set(cb_registry, i, NULL); + ucv_array_set(cb_registry, i + 1, NULL); + break; + } + } + + n_cb_active--; + + if (have_own_uloop && n_cb_active == 0) + uloop_end(); +} + +static void +uc_ubus_call_data_cb(struct ubus_request *req, int type, struct blob_attr *msg) +{ + ubus_deferred *defer = container_of(req, ubus_deferred, request); + + if (defer->response == NULL) + defer->response = uc_blob_array_to_json(defer->vm, blob_data(msg), blob_len(msg), true); +} + +static void +uc_ubus_call_done_cb(struct ubus_request *req, int ret) +{ + ubus_deferred *defer = container_of(req, ubus_deferred, request); + + if (defer->complete) + return; + + defer->complete = true; + uloop_timeout_cancel(&defer->timeout); + + uc_ubus_invoke_async_callback(defer, ret, defer->response); +} + +static void +uc_ubus_call_timeout_cb(struct uloop_timeout *timeout) +{ + ubus_deferred *defer = container_of(timeout, ubus_deferred, timeout); + + if (defer->complete) + return; + + defer->complete = true; + ubus_abort_request(defer->context, &defer->request); + + uc_ubus_invoke_async_callback(defer, UBUS_STATUS_TIMEOUT, NULL); +} + +static bool +uc_ubus_have_uloop(void) +{ + bool prev = uloop_cancelled; + bool active; + + uloop_cancelled = true; + active = uloop_cancelling(); + uloop_cancelled = prev; + + return active; +} + static uc_value_t * uc_ubus_call(uc_vm_t *vm, size_t nargs) { @@ -244,8 +342,8 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) uc_value_t *funname = uc_fn_arg(1); uc_value_t *funargs = uc_fn_arg(2); uc_value_t *res = NULL; - json_object *o; enum ubus_msg_status rv; + json_object *o; uint32_t id; if (!c || !*c || !(*c)->ctx) @@ -282,6 +380,95 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) } static uc_value_t * +uc_ubus_defer(uc_vm_t *vm, size_t nargs) +{ + ubus_connection **c = uc_fn_this("ubus.connection"); + uc_value_t *objname = uc_fn_arg(0); + uc_value_t *funname = uc_fn_arg(1); + uc_value_t *funargs = uc_fn_arg(2); + uc_value_t *replycb = uc_fn_arg(3); + uc_value_t *res = NULL; + enum ubus_msg_status rv; + ubus_deferred *defer; + json_object *o; + uint32_t id; + size_t i; + + if (!c || !*c || !(*c)->ctx) + err_return(UBUS_STATUS_CONNECTION_FAILED); + + if (ucv_type(objname) != UC_STRING || + ucv_type(funname) != UC_STRING || + (funargs && ucv_type(funargs) != UC_OBJECT) || + (replycb && !ucv_is_callable(replycb))) + err_return(UBUS_STATUS_INVALID_ARGUMENT); + + blob_buf_init(&(*c)->buf, 0); + + if (funargs) { + o = ucv_to_json(funargs); + rv = blobmsg_add_object(&(*c)->buf, o); + json_object_put(o); + + if (!rv) + err_return(UBUS_STATUS_UNKNOWN_ERROR); + } + + rv = ubus_lookup_id((*c)->ctx, ucv_string_get(objname), &id); + + if (rv != UBUS_STATUS_OK) + err_return(rv); + + defer = xalloc(sizeof(*defer)); + + rv = ubus_invoke_async((*c)->ctx, id, ucv_string_get(funname), + (*c)->buf.head, &defer->request); + + if (rv == UBUS_STATUS_OK) { + defer->vm = vm; + defer->context = (*c)->ctx; + defer->callback = replycb; + + defer->request.data_cb = uc_ubus_call_data_cb; + defer->request.complete_cb = uc_ubus_call_done_cb; + ubus_complete_request_async((*c)->ctx, &defer->request); + + defer->timeout.cb = uc_ubus_call_timeout_cb; + uloop_timeout_set(&defer->timeout, (*c)->timeout * 1000); + + res = uc_resource_new(defer_type, defer); + + for (i = 0;; i += 2) { + if (ucv_array_get(cb_registry, i) == NULL) { + ucv_array_set(cb_registry, i, ucv_get(res)); + ucv_array_set(cb_registry, i + 1, ucv_get(replycb)); + n_cb_active++; + break; + } + } + + if (!uc_ubus_have_uloop()) { + have_own_uloop = true; + uloop_run(); + } + } + else { + uc_vm_stack_push(vm, ucv_get(replycb)); + uc_vm_stack_push(vm, ucv_int64_new(rv)); + + if (uc_vm_call(vm, false, 1) == EXCEPTION_NONE) + ucv_put(uc_vm_stack_pop(vm)); + + free(defer); + } + + if (rv != UBUS_STATUS_OK) + err_return(rv); + + return res; +} + +static uc_value_t * uc_ubus_disconnect(uc_vm_t *vm, size_t nargs) { ubus_connection **c = uc_fn_this("ubus.connection"); @@ -295,6 +482,54 @@ uc_ubus_disconnect(uc_vm_t *vm, size_t nargs) return ucv_boolean_new(true); } +static uc_value_t * +uc_ubus_defer_complete(uc_vm_t *vm, size_t nargs) +{ + ubus_deferred **d = uc_fn_this("ubus.deferred"); + + if (!d || !*d) + return NULL; + + return ucv_boolean_new((*d)->complete); +} + +static uc_value_t * +uc_ubus_defer_abort(uc_vm_t *vm, size_t nargs) +{ + ubus_deferred **d = uc_fn_this("ubus.deferred"); + uc_resource_t *r; + size_t i; + + if (!d || !*d) + return NULL; + + if ((*d)->complete) + return ucv_boolean_new(false); + + ubus_abort_request((*d)->context, &(*d)->request); + uloop_timeout_cancel(&(*d)->timeout); + + for (i = 0; i < ucv_array_length(cb_registry); i += 2) { + r = (uc_resource_t *)ucv_array_get(cb_registry, i); + + if (r && r->data == *d) { + ucv_array_set(cb_registry, i, NULL); + ucv_array_set(cb_registry, i + 1, NULL); + break; + } + } + + n_cb_active--; + + if (have_own_uloop && n_cb_active == 0) + uloop_end(); + + (*d)->callback = NULL; + (*d)->complete = true; + + return ucv_boolean_new(true); +} + static const uc_function_list_t global_fns[] = { { "error", uc_ubus_error }, @@ -304,10 +539,16 @@ static const uc_function_list_t global_fns[] = { static const uc_function_list_t conn_fns[] = { { "list", uc_ubus_list }, { "call", uc_ubus_call }, + { "defer", uc_ubus_defer }, { "error", uc_ubus_error }, { "disconnect", uc_ubus_disconnect }, }; +static const uc_function_list_t defer_fns[] = { + { "complete", uc_ubus_defer_complete }, + { "abort", uc_ubus_defer_abort }, +}; + static void close_connection(void *ud) { ubus_connection *conn = ud; @@ -320,9 +561,21 @@ static void close_connection(void *ud) { free(conn); } +static void close_deferred(void *ud) { + ubus_deferred *defer = ud; + + uloop_timeout_cancel(&defer->timeout); + ucv_put(defer->response); + free(defer); +} + void uc_module_init(uc_vm_t *vm, uc_value_t *scope) { uc_function_list_register(scope, global_fns); conn_type = uc_type_declare(vm, "ubus.connection", conn_fns, close_connection); + defer_type = uc_type_declare(vm, "ubus.deferred", defer_fns, close_deferred); + cb_registry = ucv_array_new(vm); + + uc_vm_registry_set(vm, "ubus.cb_registry", cb_registry); } |