diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/ubus.c | 255 |
1 files changed, 254 insertions, 1 deletions
@@ -24,14 +24,29 @@ #define err_return(err) do { last_error = err; return NULL; } while(0) static enum ubus_msg_status last_error = 0; +static uc_resource_type_t *defer_type; static uc_resource_type_t *conn_type; +static uc_value_t *cb_registry; +static uint64_t n_cb_active; +static bool have_own_uloop; + typedef struct { int timeout; struct blob_buf buf; struct ubus_context *ctx; } ubus_connection; +typedef struct { + struct ubus_context *context; + struct ubus_request request; + struct uloop_timeout timeout; + bool complete; + uc_vm_t *vm; + uc_value_t *callback; + uc_value_t *response; +} ubus_deferred; + static uc_value_t * uc_ubus_error(uc_vm_t *vm, size_t nargs) { @@ -236,6 +251,89 @@ uc_ubus_call_cb(struct ubus_request *req, int type, struct blob_attr *msg) *res = msg ? uc_blob_array_to_json(NULL, blob_data(msg), blob_len(msg), true) : NULL; } +static void +uc_ubus_invoke_async_callback(ubus_deferred *defer, int ret, uc_value_t *reply) +{ + uc_resource_t *r; + size_t i; + + if (defer->callback) { + uc_vm_stack_push(defer->vm, ucv_get(defer->callback)); + uc_vm_stack_push(defer->vm, ucv_int64_new(ret)); + uc_vm_stack_push(defer->vm, ucv_get(reply)); + + if (uc_vm_call(defer->vm, false, 2) == EXCEPTION_NONE) + ucv_put(uc_vm_stack_pop(defer->vm)); + + defer->callback = NULL; + } + + for (i = 0; i < ucv_array_length(cb_registry); i += 2) { + r = (uc_resource_t *)ucv_array_get(cb_registry, i); + + if (r && r->data == defer) { + ucv_array_set(cb_registry, i, NULL); + ucv_array_set(cb_registry, i + 1, NULL); + break; + } + } + + n_cb_active--; + + if (have_own_uloop && n_cb_active == 0) + uloop_end(); +} + +static void +uc_ubus_call_data_cb(struct ubus_request *req, int type, struct blob_attr *msg) +{ + ubus_deferred *defer = container_of(req, ubus_deferred, request); + + if (defer->response == NULL) + defer->response = uc_blob_array_to_json(defer->vm, blob_data(msg), blob_len(msg), true); +} + +static void +uc_ubus_call_done_cb(struct ubus_request *req, int ret) +{ + ubus_deferred *defer = container_of(req, ubus_deferred, request); + + if (defer->complete) + return; + + defer->complete = true; + uloop_timeout_cancel(&defer->timeout); + + uc_ubus_invoke_async_callback(defer, ret, defer->response); +} + +static void +uc_ubus_call_timeout_cb(struct uloop_timeout *timeout) +{ + ubus_deferred *defer = container_of(timeout, ubus_deferred, timeout); + + if (defer->complete) + return; + + defer->complete = true; + ubus_abort_request(defer->context, &defer->request); + + uc_ubus_invoke_async_callback(defer, UBUS_STATUS_TIMEOUT, NULL); +} + +static bool +uc_ubus_have_uloop(void) +{ + bool prev = uloop_cancelled; + bool active; + + uloop_cancelled = true; + active = uloop_cancelling(); + uloop_cancelled = prev; + + return active; +} + static uc_value_t * uc_ubus_call(uc_vm_t *vm, size_t nargs) { @@ -244,8 +342,8 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) uc_value_t *funname = uc_fn_arg(1); uc_value_t *funargs = uc_fn_arg(2); uc_value_t *res = NULL; - json_object *o; enum ubus_msg_status rv; + json_object *o; uint32_t id; if (!c || !*c || !(*c)->ctx) @@ -282,6 +380,95 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs) } static uc_value_t * +uc_ubus_defer(uc_vm_t *vm, size_t nargs) +{ + ubus_connection **c = uc_fn_this("ubus.connection"); + uc_value_t *objname = uc_fn_arg(0); + uc_value_t *funname = uc_fn_arg(1); + uc_value_t *funargs = uc_fn_arg(2); + uc_value_t *replycb = uc_fn_arg(3); + uc_value_t *res = NULL; + enum ubus_msg_status rv; + ubus_deferred *defer; + json_object *o; + uint32_t id; + size_t i; + + if (!c || !*c || !(*c)->ctx) + err_return(UBUS_STATUS_CONNECTION_FAILED); + + if (ucv_type(objname) != UC_STRING || + ucv_type(funname) != UC_STRING || + (funargs && ucv_type(funargs) != UC_OBJECT) || + (replycb && !ucv_is_callable(replycb))) + err_return(UBUS_STATUS_INVALID_ARGUMENT); + + blob_buf_init(&(*c)->buf, 0); + + if (funargs) { + o = ucv_to_json(funargs); + rv = blobmsg_add_object(&(*c)->buf, o); + json_object_put(o); + + if (!rv) + err_return(UBUS_STATUS_UNKNOWN_ERROR); + } + + rv = ubus_lookup_id((*c)->ctx, ucv_string_get(objname), &id); + + if (rv != UBUS_STATUS_OK) + err_return(rv); + + defer = xalloc(sizeof(*defer)); + + rv = ubus_invoke_async((*c)->ctx, id, ucv_string_get(funname), + (*c)->buf.head, &defer->request); + + if (rv == UBUS_STATUS_OK) { + defer->vm = vm; + defer->context = (*c)->ctx; + defer->callback = replycb; + + defer->request.data_cb = uc_ubus_call_data_cb; + defer->request.complete_cb = uc_ubus_call_done_cb; + ubus_complete_request_async((*c)->ctx, &defer->request); + + defer->timeout.cb = uc_ubus_call_timeout_cb; + uloop_timeout_set(&defer->timeout, (*c)->timeout * 1000); + + res = uc_resource_new(defer_type, defer); + + for (i = 0;; i += 2) { + if (ucv_array_get(cb_registry, i) == NULL) { + ucv_array_set(cb_registry, i, ucv_get(res)); + ucv_array_set(cb_registry, i + 1, ucv_get(replycb)); + n_cb_active++; + break; + } + } + + if (!uc_ubus_have_uloop()) { + have_own_uloop = true; + uloop_run(); + } + } + else { + uc_vm_stack_push(vm, ucv_get(replycb)); + uc_vm_stack_push(vm, ucv_int64_new(rv)); + + if (uc_vm_call(vm, false, 1) == EXCEPTION_NONE) + ucv_put(uc_vm_stack_pop(vm)); + + free(defer); + } + + if (rv != UBUS_STATUS_OK) + err_return(rv); + + return res; +} + +static uc_value_t * uc_ubus_disconnect(uc_vm_t *vm, size_t nargs) { ubus_connection **c = uc_fn_this("ubus.connection"); @@ -295,6 +482,54 @@ uc_ubus_disconnect(uc_vm_t *vm, size_t nargs) return ucv_boolean_new(true); } +static uc_value_t * +uc_ubus_defer_complete(uc_vm_t *vm, size_t nargs) +{ + ubus_deferred **d = uc_fn_this("ubus.deferred"); + + if (!d || !*d) + return NULL; + + return ucv_boolean_new((*d)->complete); +} + +static uc_value_t * +uc_ubus_defer_abort(uc_vm_t *vm, size_t nargs) +{ + ubus_deferred **d = uc_fn_this("ubus.deferred"); + uc_resource_t *r; + size_t i; + + if (!d || !*d) + return NULL; + + if ((*d)->complete) + return ucv_boolean_new(false); + + ubus_abort_request((*d)->context, &(*d)->request); + uloop_timeout_cancel(&(*d)->timeout); + + for (i = 0; i < ucv_array_length(cb_registry); i += 2) { + r = (uc_resource_t *)ucv_array_get(cb_registry, i); + + if (r && r->data == *d) { + ucv_array_set(cb_registry, i, NULL); + ucv_array_set(cb_registry, i + 1, NULL); + break; + } + } + + n_cb_active--; + + if (have_own_uloop && n_cb_active == 0) + uloop_end(); + + (*d)->callback = NULL; + (*d)->complete = true; + + return ucv_boolean_new(true); +} + static const uc_function_list_t global_fns[] = { { "error", uc_ubus_error }, @@ -304,10 +539,16 @@ static const uc_function_list_t global_fns[] = { static const uc_function_list_t conn_fns[] = { { "list", uc_ubus_list }, { "call", uc_ubus_call }, + { "defer", uc_ubus_defer }, { "error", uc_ubus_error }, { "disconnect", uc_ubus_disconnect }, }; +static const uc_function_list_t defer_fns[] = { + { "complete", uc_ubus_defer_complete }, + { "abort", uc_ubus_defer_abort }, +}; + static void close_connection(void *ud) { ubus_connection *conn = ud; @@ -320,9 +561,21 @@ static void close_connection(void *ud) { free(conn); } +static void close_deferred(void *ud) { + ubus_deferred *defer = ud; + + uloop_timeout_cancel(&defer->timeout); + ucv_put(defer->response); + free(defer); +} + void uc_module_init(uc_vm_t *vm, uc_value_t *scope) { uc_function_list_register(scope, global_fns); conn_type = uc_type_declare(vm, "ubus.connection", conn_fns, close_connection); + defer_type = uc_type_declare(vm, "ubus.deferred", defer_fns, close_deferred); + cb_registry = ucv_array_new(vm); + + uc_vm_registry_set(vm, "ubus.cb_registry", cb_registry); } |