diff options
author | Jo-Philipp Wich <jo@mein.io> | 2021-12-08 20:04:35 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-08 20:04:35 +0100 |
commit | 70c87cd08da5a42a30c3a039565530f28803385e (patch) | |
tree | 42f994ce85360d0ea8aa43a89a7cfcc8b747f52d | |
parent | 0d29b2558987eda5d8a913638f40d506172606ac (diff) | |
parent | 66f7c00af29d2547ae541eba3ce768f5eea8d269 (diff) |
Merge pull request #27 from jow-/ubus-defer-support
ubus: add support for async requests
-rw-r--r-- | include/ucode/types.h | 1 | ||||
-rw-r--r-- | include/ucode/vm.h | 5 | ||||
-rw-r--r-- | lib/ubus.c | 255 | ||||
-rw-r--r-- | types.c | 1 | ||||
-rw-r--r-- | vm.c | 31 |
5 files changed, 292 insertions, 1 deletions
diff --git a/include/ucode/types.h b/include/ucode/types.h index ccb0e1b..49d910b 100644 --- a/include/ucode/types.h +++ b/include/ucode/types.h @@ -250,6 +250,7 @@ struct uc_vm { uc_upvalref_t *open_upvals; uc_parse_config_t *config; uc_value_t *globals; + uc_value_t *registry; uc_source_t *sources; uc_weakref_t values; uc_resource_types_t restypes; diff --git a/include/ucode/vm.h b/include/ucode/vm.h index 1e9357f..caebb7a 100644 --- a/include/ucode/vm.h +++ b/include/ucode/vm.h @@ -119,6 +119,11 @@ void uc_vm_free(uc_vm_t *vm); uc_value_t *uc_vm_scope_get(uc_vm_t *vm); void uc_vm_scope_set(uc_vm_t *vm, uc_value_t *ctx); +bool uc_vm_registry_exists(uc_vm_t *vm, const char *key); +uc_value_t *uc_vm_registry_get(uc_vm_t *vm, const char *key); +void uc_vm_registry_set(uc_vm_t *vm, const char *key, uc_value_t *value); +bool uc_vm_registry_delete(uc_vm_t *vm, const char *key); + void uc_vm_stack_push(uc_vm_t *vm, uc_value_t *value); uc_value_t *uc_vm_stack_pop(uc_vm_t *vm); uc_value_t *uc_vm_stack_peek(uc_vm_t *vm, size_t offset); @@ -24,14 +24,29 @@ #define err_return(err) do { last_error = err; return NULL; } while(0) static enum ubus_msg_status last_error = 0; +static uc_resource_type_t *defer_type; static uc_resource_type_t *conn_type; +static uc_value_t *cb_registry; +static uint64_t n_cb_active; +static bool have_own_uloop; + typedef struct { int timeout; struct blob_buf buf; struct ubus_context *ctx; } ubus_connection; +typedef struct { + struct ubus_context *context; + struct ubus_request request; + struct uloop_timeout timeout; + bool complete; + uc_vm_t *vm; + uc_value_t *callback; + uc_value_t *response; +} ubus_deferred; + static uc_value_t * uc_ubus_error(uc_vm_t *vm, size_t nargs) { @@ -236,6 +251,89 @@ uc_ubus_call_cb(struct ubus_request *req, int type, struct blob_attr *msg) *res = msg ? uc_blob_array_to_json(NULL, blob_data(msg), blob_len(msg), true) : NULL; } +static void +uc_ubus_invoke_async_callback(ubus_deferred *defer, int ret, uc_value_t *reply) +{ + uc_resource_t *r; + size_t i; + + if (defer->callback) { + uc_vm_stack_push(defer->vm, ucv_get(defer->callback)); + uc_vm_stack_push(defer->vm, ucv_int64_new(ret)); + uc_vm_stack_push(defer->vm, ucv_get(reply)); + + if (uc_vm_call(defer->vm, false, 2) == EXCEPTION_NONE) + ucv_put(uc_vm_stack_pop(defer->vm)); + + defer->callback = NULL; + } + + for (i = 0; i < ucv_array_length(cb_registry); i += 2) { + r = (uc_resource_t *)ucv_array_get(cb_registry, i); + + if (r && r->data == defer) { + ucv_array_set(cb_registry, i, NULL); + ucv_array_set(cb_registry, i + 1, NULL); + break; + } + } + + n_cb_active--; + + if (have_own_uloop && n_cb_active == 0) + uloop_end(); +} + +static void +uc_ubus_call_data_cb(struct ubus_request *req, int type, struct blob_attr *msg) +{ + ubus_deferred *defer = container_of(req, ubus_deferred, request); + + if (defer->response == NULL) + defer->response = uc_blob_array_to_json(defer->vm, blob_data(msg), blob_len(msg), true); +} + +static void +uc_ubus_call_done_cb(struct ubus_request *req, int ret) +{ + ubus_deferred *defer = container_of(req, ubus_deferred, request); + + if (defer->complete) + return; + + defer->complete = true; + uloop_timeout_cancel(&defer->timeout); + + uc_ubus_invoke_async_callback(defer, ret, defer->response); +} + +static void +uc_ubus_call_timeout_cb(struct uloop_timeout *timeout) +{ + ubus_deferred *defer = container_of(timeout, ubus_deferred, timeout); + + if (defer->complete) + return; + + defer->complete = true; + ubus_abort_request(defer->context, &defer->request); + + uc_ubus_invoke_async_callback(defer, UBUS_STATUS_TIMEOUT, NULL); +} + +static bool +uc_ubus_have_uloop(void) +{ + bool prev = uloop_cancelled; + bool active; + + uloop_cancelled = true; + active = uloop_cancelling(); + uloop_cancelled = prev; + + return active; +} + static uc_value_t * uc_ubus_call(uc_vm_t *vm, size_t nargs) { @@ -244,8 +342,8 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) uc_value_t *funname = uc_fn_arg(1); uc_value_t *funargs = uc_fn_arg(2); uc_value_t *res = NULL; - json_object *o; enum ubus_msg_status rv; + json_object *o; uint32_t id; if (!c || !*c || !(*c)->ctx) @@ -282,6 +380,95 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) } static uc_value_t * +uc_ubus_defer(uc_vm_t *vm, size_t nargs) +{ + ubus_connection **c = uc_fn_this("ubus.connection"); + uc_value_t *objname = uc_fn_arg(0); + uc_value_t *funname = uc_fn_arg(1); + uc_value_t *funargs = uc_fn_arg(2); + uc_value_t *replycb = uc_fn_arg(3); + uc_value_t *res = NULL; + enum ubus_msg_status rv; + ubus_deferred *defer; + json_object *o; + uint32_t id; + size_t i; + + if (!c || !*c || !(*c)->ctx) + err_return(UBUS_STATUS_CONNECTION_FAILED); + + if (ucv_type(objname) != UC_STRING || + ucv_type(funname) != UC_STRING || + (funargs && ucv_type(funargs) != UC_OBJECT) || + (replycb && !ucv_is_callable(replycb))) + err_return(UBUS_STATUS_INVALID_ARGUMENT); + + blob_buf_init(&(*c)->buf, 0); + + if (funargs) { + o = ucv_to_json(funargs); + rv = blobmsg_add_object(&(*c)->buf, o); + json_object_put(o); + + if (!rv) + err_return(UBUS_STATUS_UNKNOWN_ERROR); + } + + rv = ubus_lookup_id((*c)->ctx, ucv_string_get(objname), &id); + + if (rv != UBUS_STATUS_OK) + err_return(rv); + + defer = xalloc(sizeof(*defer)); + + rv = ubus_invoke_async((*c)->ctx, id, ucv_string_get(funname), + (*c)->buf.head, &defer->request); + + if (rv == UBUS_STATUS_OK) { + defer->vm = vm; + defer->context = (*c)->ctx; + defer->callback = replycb; + + defer->request.data_cb = uc_ubus_call_data_cb; + defer->request.complete_cb = uc_ubus_call_done_cb; + ubus_complete_request_async((*c)->ctx, &defer->request); + + defer->timeout.cb = uc_ubus_call_timeout_cb; + uloop_timeout_set(&defer->timeout, (*c)->timeout * 1000); + + res = uc_resource_new(defer_type, defer); + + for (i = 0;; i += 2) { + if (ucv_array_get(cb_registry, i) == NULL) { + ucv_array_set(cb_registry, i, ucv_get(res)); + ucv_array_set(cb_registry, i + 1, ucv_get(replycb)); + n_cb_active++; + break; + } + } + + if (!uc_ubus_have_uloop()) { + have_own_uloop = true; + uloop_run(); + } + } + else { + uc_vm_stack_push(vm, ucv_get(replycb)); + uc_vm_stack_push(vm, ucv_int64_new(rv)); + + if (uc_vm_call(vm, false, 1) == EXCEPTION_NONE) + ucv_put(uc_vm_stack_pop(vm)); + + free(defer); + } + + if (rv != UBUS_STATUS_OK) + err_return(rv); + + return res; +} + +static uc_value_t * uc_ubus_disconnect(uc_vm_t *vm, size_t nargs) { ubus_connection **c = uc_fn_this("ubus.connection"); @@ -295,6 +482,54 @@ uc_ubus_disconnect(uc_vm_t *vm, size_t nargs) return ucv_boolean_new(true); } +static uc_value_t * +uc_ubus_defer_complete(uc_vm_t *vm, size_t nargs) +{ + ubus_deferred **d = uc_fn_this("ubus.deferred"); + + if (!d || !*d) + return NULL; + + return ucv_boolean_new((*d)->complete); +} + +static uc_value_t * +uc_ubus_defer_abort(uc_vm_t *vm, size_t nargs) +{ + ubus_deferred **d = uc_fn_this("ubus.deferred"); + uc_resource_t *r; + size_t i; + + if (!d || !*d) + return NULL; + + if ((*d)->complete) + return ucv_boolean_new(false); + + ubus_abort_request((*d)->context, &(*d)->request); + uloop_timeout_cancel(&(*d)->timeout); + + for (i = 0; i < ucv_array_length(cb_registry); i += 2) { + r = (uc_resource_t *)ucv_array_get(cb_registry, i); + + if (r && r->data == *d) { + ucv_array_set(cb_registry, i, NULL); + ucv_array_set(cb_registry, i + 1, NULL); + break; + } + } + + n_cb_active--; + + if (have_own_uloop && n_cb_active == 0) + uloop_end(); + + (*d)->callback = NULL; + (*d)->complete = true; + + return ucv_boolean_new(true); +} + static const uc_function_list_t global_fns[] = { { "error", uc_ubus_error }, @@ -304,10 +539,16 @@ static const uc_function_list_t global_fns[] = { static const uc_function_list_t conn_fns[] = { { "list", uc_ubus_list }, { "call", uc_ubus_call }, + { "defer", uc_ubus_defer }, { "error", uc_ubus_error }, { "disconnect", uc_ubus_disconnect }, }; +static const uc_function_list_t defer_fns[] = { + { "complete", uc_ubus_defer_complete }, + { "abort", uc_ubus_defer_abort }, +}; + static void close_connection(void *ud) { ubus_connection *conn = ud; @@ -320,9 +561,21 @@ static void close_connection(void *ud) { free(conn); } +static void close_deferred(void *ud) { + ubus_deferred *defer = ud; + + uloop_timeout_cancel(&defer->timeout); + ucv_put(defer->response); + free(defer); +} + void uc_module_init(uc_vm_t *vm, uc_value_t *scope) { uc_function_list_register(scope, global_fns); conn_type = uc_type_declare(vm, "ubus.connection", conn_fns, close_connection); + defer_type = uc_type_declare(vm, "ubus.deferred", defer_fns, close_deferred); + cb_registry = ucv_array_new(vm); + + uc_vm_registry_set(vm, "ubus.cb_registry", cb_registry); } @@ -2078,6 +2078,7 @@ ucv_gc_common(uc_vm_t *vm, bool final) if (!final) { /* mark reachable objects */ ucv_gc_mark(vm->globals); + ucv_gc_mark(vm->registry); ucv_gc_mark(vm->exception.stacktrace); for (i = 0; i < vm->callframes.count; i++) { @@ -2480,3 +2480,34 @@ uc_vm_trace_set(uc_vm_t *vm, uint32_t level) { vm->trace = level; } + +bool +uc_vm_registry_exists(uc_vm_t *vm, const char *key) +{ + bool exists; + + ucv_object_get(vm->registry, key, &exists); + + return exists; +} + +uc_value_t * +uc_vm_registry_get(uc_vm_t *vm, const char *key) +{ + return ucv_object_get(vm->registry, key, NULL); +} + +void +uc_vm_registry_set(uc_vm_t *vm, const char *key, uc_value_t *value) +{ + if (!vm->registry) + vm->registry = ucv_object_new(vm); + + ucv_object_add(vm->registry, key, value); +} + +bool +uc_vm_registry_delete(uc_vm_t *vm, const char *key) +{ + return ucv_object_delete(vm->registry, key); +} |