From 99fdafd8c03a7baee265ce3dd22af5b2d02fdeb6 Mon Sep 17 00:00:00 2001 From: Jo-Philipp Wich Date: Tue, 7 Dec 2021 13:40:51 +0100 Subject: vm: introduce value registry Introduce a new, lazily allocated value registry which can be used by C code to store values which should not be garbage collected. The registry is a plain ucode object internally and treated as GC root but not exposed to ucode script code, this allows it to retain references to values which are otherwise completely unreachable from ucode scripts. Signed-off-by: Jo-Philipp Wich --- include/ucode/types.h | 1 + include/ucode/vm.h | 5 +++++ types.c | 1 + vm.c | 31 +++++++++++++++++++++++++++++++ 4 files changed, 38 insertions(+) diff --git a/include/ucode/types.h b/include/ucode/types.h index ccb0e1b..49d910b 100644 --- a/include/ucode/types.h +++ b/include/ucode/types.h @@ -250,6 +250,7 @@ struct uc_vm { uc_upvalref_t *open_upvals; uc_parse_config_t *config; uc_value_t *globals; + uc_value_t *registry; uc_source_t *sources; uc_weakref_t values; uc_resource_types_t restypes; diff --git a/include/ucode/vm.h b/include/ucode/vm.h index 1e9357f..caebb7a 100644 --- a/include/ucode/vm.h +++ b/include/ucode/vm.h @@ -119,6 +119,11 @@ void uc_vm_free(uc_vm_t *vm); uc_value_t *uc_vm_scope_get(uc_vm_t *vm); void uc_vm_scope_set(uc_vm_t *vm, uc_value_t *ctx); +bool uc_vm_registry_exists(uc_vm_t *vm, const char *key); +uc_value_t *uc_vm_registry_get(uc_vm_t *vm, const char *key); +void uc_vm_registry_set(uc_vm_t *vm, const char *key, uc_value_t *value); +bool uc_vm_registry_delete(uc_vm_t *vm, const char *key); + void uc_vm_stack_push(uc_vm_t *vm, uc_value_t *value); uc_value_t *uc_vm_stack_pop(uc_vm_t *vm); uc_value_t *uc_vm_stack_peek(uc_vm_t *vm, size_t offset); diff --git a/types.c b/types.c index c3032e1..a8ca023 100644 --- a/types.c +++ b/types.c @@ -2078,6 +2078,7 @@ ucv_gc_common(uc_vm_t *vm, bool final) if (!final) { /* mark reachable objects */ ucv_gc_mark(vm->globals); + ucv_gc_mark(vm->registry); ucv_gc_mark(vm->exception.stacktrace); for (i = 0; i < vm->callframes.count; i++) { diff --git a/vm.c b/vm.c index a1be51b..e21bc4b 100644 --- a/vm.c +++ b/vm.c @@ -2480,3 +2480,34 @@ uc_vm_trace_set(uc_vm_t *vm, uint32_t level) { vm->trace = level; } + +bool +uc_vm_registry_exists(uc_vm_t *vm, const char *key) +{ + bool exists; + + ucv_object_get(vm->registry, key, &exists); + + return exists; +} + +uc_value_t * +uc_vm_registry_get(uc_vm_t *vm, const char *key) +{ + return ucv_object_get(vm->registry, key, NULL); +} + +void +uc_vm_registry_set(uc_vm_t *vm, const char *key, uc_value_t *value) +{ + if (!vm->registry) + vm->registry = ucv_object_new(vm); + + ucv_object_add(vm->registry, key, value); +} + +bool +uc_vm_registry_delete(uc_vm_t *vm, const char *key) +{ + return ucv_object_delete(vm->registry, key); +} -- cgit v1.2.3 From 66f7c00af29d2547ae541eba3ce768f5eea8d269 Mon Sep 17 00:00:00 2001 From: Jo-Philipp Wich Date: Tue, 7 Dec 2021 10:50:12 +0100 Subject: ubus: add support for async requests Introduce a new ubus.defer() method which initiates asynchroneous requests and invokes the completion callback passed as 4th argument once the reply is received. This allows multiplexing mutliple ubus requests with the same ucode VM context / the same uloop event loop. In case the ucode context is not running under an active uloop, the ubus module will spawn uloop itself once the first asynchroneous request is launched and terminate the loop once all pending requests finished. Signed-off-by: Jo-Philipp Wich --- lib/ubus.c | 255 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 254 insertions(+), 1 deletion(-) diff --git a/lib/ubus.c b/lib/ubus.c index b5d5633..6605736 100644 --- a/lib/ubus.c +++ b/lib/ubus.c @@ -24,14 +24,29 @@ #define err_return(err) do { last_error = err; return NULL; } while(0) static enum ubus_msg_status last_error = 0; +static uc_resource_type_t *defer_type; static uc_resource_type_t *conn_type; +static uc_value_t *cb_registry; +static uint64_t n_cb_active; +static bool have_own_uloop; + typedef struct { int timeout; struct blob_buf buf; struct ubus_context *ctx; } ubus_connection; +typedef struct { + struct ubus_context *context; + struct ubus_request request; + struct uloop_timeout timeout; + bool complete; + uc_vm_t *vm; + uc_value_t *callback; + uc_value_t *response; +} ubus_deferred; + static uc_value_t * uc_ubus_error(uc_vm_t *vm, size_t nargs) { @@ -236,6 +251,89 @@ uc_ubus_call_cb(struct ubus_request *req, int type, struct blob_attr *msg) *res = msg ? uc_blob_array_to_json(NULL, blob_data(msg), blob_len(msg), true) : NULL; } +static void +uc_ubus_invoke_async_callback(ubus_deferred *defer, int ret, uc_value_t *reply) +{ + uc_resource_t *r; + size_t i; + + if (defer->callback) { + uc_vm_stack_push(defer->vm, ucv_get(defer->callback)); + uc_vm_stack_push(defer->vm, ucv_int64_new(ret)); + uc_vm_stack_push(defer->vm, ucv_get(reply)); + + if (uc_vm_call(defer->vm, false, 2) == EXCEPTION_NONE) + ucv_put(uc_vm_stack_pop(defer->vm)); + + defer->callback = NULL; + } + + for (i = 0; i < ucv_array_length(cb_registry); i += 2) { + r = (uc_resource_t *)ucv_array_get(cb_registry, i); + + if (r && r->data == defer) { + ucv_array_set(cb_registry, i, NULL); + ucv_array_set(cb_registry, i + 1, NULL); + break; + } + } + + n_cb_active--; + + if (have_own_uloop && n_cb_active == 0) + uloop_end(); +} + +static void +uc_ubus_call_data_cb(struct ubus_request *req, int type, struct blob_attr *msg) +{ + ubus_deferred *defer = container_of(req, ubus_deferred, request); + + if (defer->response == NULL) + defer->response = uc_blob_array_to_json(defer->vm, blob_data(msg), blob_len(msg), true); +} + +static void +uc_ubus_call_done_cb(struct ubus_request *req, int ret) +{ + ubus_deferred *defer = container_of(req, ubus_deferred, request); + + if (defer->complete) + return; + + defer->complete = true; + uloop_timeout_cancel(&defer->timeout); + + uc_ubus_invoke_async_callback(defer, ret, defer->response); +} + +static void +uc_ubus_call_timeout_cb(struct uloop_timeout *timeout) +{ + ubus_deferred *defer = container_of(timeout, ubus_deferred, timeout); + + if (defer->complete) + return; + + defer->complete = true; + ubus_abort_request(defer->context, &defer->request); + + uc_ubus_invoke_async_callback(defer, UBUS_STATUS_TIMEOUT, NULL); +} + +static bool +uc_ubus_have_uloop(void) +{ + bool prev = uloop_cancelled; + bool active; + + uloop_cancelled = true; + active = uloop_cancelling(); + uloop_cancelled = prev; + + return active; +} + static uc_value_t * uc_ubus_call(uc_vm_t *vm, size_t nargs) { @@ -244,8 +342,8 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) uc_value_t *funname = uc_fn_arg(1); uc_value_t *funargs = uc_fn_arg(2); uc_value_t *res = NULL; - json_object *o; enum ubus_msg_status rv; + json_object *o; uint32_t id; if (!c || !*c || !(*c)->ctx) @@ -281,6 +379,95 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) return res; } +static uc_value_t * +uc_ubus_defer(uc_vm_t *vm, size_t nargs) +{ + ubus_connection **c = uc_fn_this("ubus.connection"); + uc_value_t *objname = uc_fn_arg(0); + uc_value_t *funname = uc_fn_arg(1); + uc_value_t *funargs = uc_fn_arg(2); + uc_value_t *replycb = uc_fn_arg(3); + uc_value_t *res = NULL; + enum ubus_msg_status rv; + ubus_deferred *defer; + json_object *o; + uint32_t id; + size_t i; + + if (!c || !*c || !(*c)->ctx) + err_return(UBUS_STATUS_CONNECTION_FAILED); + + if (ucv_type(objname) != UC_STRING || + ucv_type(funname) != UC_STRING || + (funargs && ucv_type(funargs) != UC_OBJECT) || + (replycb && !ucv_is_callable(replycb))) + err_return(UBUS_STATUS_INVALID_ARGUMENT); + + blob_buf_init(&(*c)->buf, 0); + + if (funargs) { + o = ucv_to_json(funargs); + rv = blobmsg_add_object(&(*c)->buf, o); + json_object_put(o); + + if (!rv) + err_return(UBUS_STATUS_UNKNOWN_ERROR); + } + + rv = ubus_lookup_id((*c)->ctx, ucv_string_get(objname), &id); + + if (rv != UBUS_STATUS_OK) + err_return(rv); + + defer = xalloc(sizeof(*defer)); + + rv = ubus_invoke_async((*c)->ctx, id, ucv_string_get(funname), + (*c)->buf.head, &defer->request); + + if (rv == UBUS_STATUS_OK) { + defer->vm = vm; + defer->context = (*c)->ctx; + defer->callback = replycb; + + defer->request.data_cb = uc_ubus_call_data_cb; + defer->request.complete_cb = uc_ubus_call_done_cb; + ubus_complete_request_async((*c)->ctx, &defer->request); + + defer->timeout.cb = uc_ubus_call_timeout_cb; + uloop_timeout_set(&defer->timeout, (*c)->timeout * 1000); + + res = uc_resource_new(defer_type, defer); + + for (i = 0;; i += 2) { + if (ucv_array_get(cb_registry, i) == NULL) { + ucv_array_set(cb_registry, i, ucv_get(res)); + ucv_array_set(cb_registry, i + 1, ucv_get(replycb)); + n_cb_active++; + break; + } + } + + if (!uc_ubus_have_uloop()) { + have_own_uloop = true; + uloop_run(); + } + } + else { + uc_vm_stack_push(vm, ucv_get(replycb)); + uc_vm_stack_push(vm, ucv_int64_new(rv)); + + if (uc_vm_call(vm, false, 1) == EXCEPTION_NONE) + ucv_put(uc_vm_stack_pop(vm)); + + free(defer); + } + + if (rv != UBUS_STATUS_OK) + err_return(rv); + + return res; +} + static uc_value_t * uc_ubus_disconnect(uc_vm_t *vm, size_t nargs) { @@ -295,6 +482,54 @@ uc_ubus_disconnect(uc_vm_t *vm, size_t nargs) return ucv_boolean_new(true); } +static uc_value_t * +uc_ubus_defer_complete(uc_vm_t *vm, size_t nargs) +{ + ubus_deferred **d = uc_fn_this("ubus.deferred"); + + if (!d || !*d) + return NULL; + + return ucv_boolean_new((*d)->complete); +} + +static uc_value_t * +uc_ubus_defer_abort(uc_vm_t *vm, size_t nargs) +{ + ubus_deferred **d = uc_fn_this("ubus.deferred"); + uc_resource_t *r; + size_t i; + + if (!d || !*d) + return NULL; + + if ((*d)->complete) + return ucv_boolean_new(false); + + ubus_abort_request((*d)->context, &(*d)->request); + uloop_timeout_cancel(&(*d)->timeout); + + for (i = 0; i < ucv_array_length(cb_registry); i += 2) { + r = (uc_resource_t *)ucv_array_get(cb_registry, i); + + if (r && r->data == *d) { + ucv_array_set(cb_registry, i, NULL); + ucv_array_set(cb_registry, i + 1, NULL); + break; + } + } + + n_cb_active--; + + if (have_own_uloop && n_cb_active == 0) + uloop_end(); + + (*d)->callback = NULL; + (*d)->complete = true; + + return ucv_boolean_new(true); +} + static const uc_function_list_t global_fns[] = { { "error", uc_ubus_error }, @@ -304,10 +539,16 @@ static const uc_function_list_t global_fns[] = { static const uc_function_list_t conn_fns[] = { { "list", uc_ubus_list }, { "call", uc_ubus_call }, + { "defer", uc_ubus_defer }, { "error", uc_ubus_error }, { "disconnect", uc_ubus_disconnect }, }; +static const uc_function_list_t defer_fns[] = { + { "complete", uc_ubus_defer_complete }, + { "abort", uc_ubus_defer_abort }, +}; + static void close_connection(void *ud) { ubus_connection *conn = ud; @@ -320,9 +561,21 @@ static void close_connection(void *ud) { free(conn); } +static void close_deferred(void *ud) { + ubus_deferred *defer = ud; + + uloop_timeout_cancel(&defer->timeout); + ucv_put(defer->response); + free(defer); +} + void uc_module_init(uc_vm_t *vm, uc_value_t *scope) { uc_function_list_register(scope, global_fns); conn_type = uc_type_declare(vm, "ubus.connection", conn_fns, close_connection); + defer_type = uc_type_declare(vm, "ubus.deferred", defer_fns, close_deferred); + cb_registry = ucv_array_new(vm); + + uc_vm_registry_set(vm, "ubus.cb_registry", cb_registry); } -- cgit v1.2.3