summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJo-Philipp Wich <jo@mein.io>2021-12-08 20:04:35 +0100
committerGitHub <noreply@github.com>2021-12-08 20:04:35 +0100
commit70c87cd08da5a42a30c3a039565530f28803385e (patch)
tree42f994ce85360d0ea8aa43a89a7cfcc8b747f52d
parent0d29b2558987eda5d8a913638f40d506172606ac (diff)
parent66f7c00af29d2547ae541eba3ce768f5eea8d269 (diff)
Merge pull request #27 from jow-/ubus-defer-support
ubus: add support for async requests
-rw-r--r--include/ucode/types.h1
-rw-r--r--include/ucode/vm.h5
-rw-r--r--lib/ubus.c255
-rw-r--r--types.c1
-rw-r--r--vm.c31
5 files changed, 292 insertions, 1 deletions
diff --git a/include/ucode/types.h b/include/ucode/types.h
index ccb0e1b..49d910b 100644
--- a/include/ucode/types.h
+++ b/include/ucode/types.h
@@ -250,6 +250,7 @@ struct uc_vm {
uc_upvalref_t *open_upvals;
uc_parse_config_t *config;
uc_value_t *globals;
+ uc_value_t *registry;
uc_source_t *sources;
uc_weakref_t values;
uc_resource_types_t restypes;
diff --git a/include/ucode/vm.h b/include/ucode/vm.h
index 1e9357f..caebb7a 100644
--- a/include/ucode/vm.h
+++ b/include/ucode/vm.h
@@ -119,6 +119,11 @@ void uc_vm_free(uc_vm_t *vm);
uc_value_t *uc_vm_scope_get(uc_vm_t *vm);
void uc_vm_scope_set(uc_vm_t *vm, uc_value_t *ctx);
+bool uc_vm_registry_exists(uc_vm_t *vm, const char *key);
+uc_value_t *uc_vm_registry_get(uc_vm_t *vm, const char *key);
+void uc_vm_registry_set(uc_vm_t *vm, const char *key, uc_value_t *value);
+bool uc_vm_registry_delete(uc_vm_t *vm, const char *key);
+
void uc_vm_stack_push(uc_vm_t *vm, uc_value_t *value);
uc_value_t *uc_vm_stack_pop(uc_vm_t *vm);
uc_value_t *uc_vm_stack_peek(uc_vm_t *vm, size_t offset);
diff --git a/lib/ubus.c b/lib/ubus.c
index b5d5633..6605736 100644
--- a/lib/ubus.c
+++ b/lib/ubus.c
@@ -24,14 +24,29 @@
#define err_return(err) do { last_error = err; return NULL; } while(0)
static enum ubus_msg_status last_error = 0;
+static uc_resource_type_t *defer_type;
static uc_resource_type_t *conn_type;
+static uc_value_t *cb_registry;
+static uint64_t n_cb_active;
+static bool have_own_uloop;
+
typedef struct {
int timeout;
struct blob_buf buf;
struct ubus_context *ctx;
} ubus_connection;
+typedef struct {
+ struct ubus_context *context;
+ struct ubus_request request;
+ struct uloop_timeout timeout;
+ bool complete;
+ uc_vm_t *vm;
+ uc_value_t *callback;
+ uc_value_t *response;
+} ubus_deferred;
+
static uc_value_t *
uc_ubus_error(uc_vm_t *vm, size_t nargs)
{
@@ -236,6 +251,89 @@ uc_ubus_call_cb(struct ubus_request *req, int type, struct blob_attr *msg)
*res = msg ? uc_blob_array_to_json(NULL, blob_data(msg), blob_len(msg), true) : NULL;
}
+static void
+uc_ubus_invoke_async_callback(ubus_deferred *defer, int ret, uc_value_t *reply)
+{
+ uc_resource_t *r;
+ size_t i;
+
+ if (defer->callback) {
+ uc_vm_stack_push(defer->vm, ucv_get(defer->callback));
+ uc_vm_stack_push(defer->vm, ucv_int64_new(ret));
+ uc_vm_stack_push(defer->vm, ucv_get(reply));
+
+ if (uc_vm_call(defer->vm, false, 2) == EXCEPTION_NONE)
+ ucv_put(uc_vm_stack_pop(defer->vm));
+
+ defer->callback = NULL;
+ }
+
+ for (i = 0; i < ucv_array_length(cb_registry); i += 2) {
+ r = (uc_resource_t *)ucv_array_get(cb_registry, i);
+
+ if (r && r->data == defer) {
+ ucv_array_set(cb_registry, i, NULL);
+ ucv_array_set(cb_registry, i + 1, NULL);
+ break;
+ }
+ }
+
+ n_cb_active--;
+
+ if (have_own_uloop && n_cb_active == 0)
+ uloop_end();
+}
+
+static void
+uc_ubus_call_data_cb(struct ubus_request *req, int type, struct blob_attr *msg)
+{
+ ubus_deferred *defer = container_of(req, ubus_deferred, request);
+
+ if (defer->response == NULL)
+ defer->response = uc_blob_array_to_json(defer->vm, blob_data(msg), blob_len(msg), true);
+}
+
+static void
+uc_ubus_call_done_cb(struct ubus_request *req, int ret)
+{
+ ubus_deferred *defer = container_of(req, ubus_deferred, request);
+
+ if (defer->complete)
+ return;
+
+ defer->complete = true;
+ uloop_timeout_cancel(&defer->timeout);
+
+ uc_ubus_invoke_async_callback(defer, ret, defer->response);
+}
+
+static void
+uc_ubus_call_timeout_cb(struct uloop_timeout *timeout)
+{
+ ubus_deferred *defer = container_of(timeout, ubus_deferred, timeout);
+
+ if (defer->complete)
+ return;
+
+ defer->complete = true;
+ ubus_abort_request(defer->context, &defer->request);
+
+ uc_ubus_invoke_async_callback(defer, UBUS_STATUS_TIMEOUT, NULL);
+}
+
+static bool
+uc_ubus_have_uloop(void)
+{
+ bool prev = uloop_cancelled;
+ bool active;
+
+ uloop_cancelled = true;
+ active = uloop_cancelling();
+ uloop_cancelled = prev;
+
+ return active;
+}
+
static uc_value_t *
uc_ubus_call(uc_vm_t *vm, size_t nargs)
{
@@ -244,8 +342,8 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs)
uc_value_t *funname = uc_fn_arg(1);
uc_value_t *funargs = uc_fn_arg(2);
uc_value_t *res = NULL;
- json_object *o;
enum ubus_msg_status rv;
+ json_object *o;
uint32_t id;
if (!c || !*c || !(*c)->ctx)
@@ -282,6 +380,95 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs)
}
static uc_value_t *
+uc_ubus_defer(uc_vm_t *vm, size_t nargs)
+{
+ ubus_connection **c = uc_fn_this("ubus.connection");
+ uc_value_t *objname = uc_fn_arg(0);
+ uc_value_t *funname = uc_fn_arg(1);
+ uc_value_t *funargs = uc_fn_arg(2);
+ uc_value_t *replycb = uc_fn_arg(3);
+ uc_value_t *res = NULL;
+ enum ubus_msg_status rv;
+ ubus_deferred *defer;
+ json_object *o;
+ uint32_t id;
+ size_t i;
+
+ if (!c || !*c || !(*c)->ctx)
+ err_return(UBUS_STATUS_CONNECTION_FAILED);
+
+ if (ucv_type(objname) != UC_STRING ||
+ ucv_type(funname) != UC_STRING ||
+ (funargs && ucv_type(funargs) != UC_OBJECT) ||
+ (replycb && !ucv_is_callable(replycb)))
+ err_return(UBUS_STATUS_INVALID_ARGUMENT);
+
+ blob_buf_init(&(*c)->buf, 0);
+
+ if (funargs) {
+ o = ucv_to_json(funargs);
+ rv = blobmsg_add_object(&(*c)->buf, o);
+ json_object_put(o);
+
+ if (!rv)
+ err_return(UBUS_STATUS_UNKNOWN_ERROR);
+ }
+
+ rv = ubus_lookup_id((*c)->ctx, ucv_string_get(objname), &id);
+
+ if (rv != UBUS_STATUS_OK)
+ err_return(rv);
+
+ defer = xalloc(sizeof(*defer));
+
+ rv = ubus_invoke_async((*c)->ctx, id, ucv_string_get(funname),
+ (*c)->buf.head, &defer->request);
+
+ if (rv == UBUS_STATUS_OK) {
+ defer->vm = vm;
+ defer->context = (*c)->ctx;
+ defer->callback = replycb;
+
+ defer->request.data_cb = uc_ubus_call_data_cb;
+ defer->request.complete_cb = uc_ubus_call_done_cb;
+ ubus_complete_request_async((*c)->ctx, &defer->request);
+
+ defer->timeout.cb = uc_ubus_call_timeout_cb;
+ uloop_timeout_set(&defer->timeout, (*c)->timeout * 1000);
+
+ res = uc_resource_new(defer_type, defer);
+
+ for (i = 0;; i += 2) {
+ if (ucv_array_get(cb_registry, i) == NULL) {
+ ucv_array_set(cb_registry, i, ucv_get(res));
+ ucv_array_set(cb_registry, i + 1, ucv_get(replycb));
+ n_cb_active++;
+ break;
+ }
+ }
+
+ if (!uc_ubus_have_uloop()) {
+ have_own_uloop = true;
+ uloop_run();
+ }
+ }
+ else {
+ uc_vm_stack_push(vm, ucv_get(replycb));
+ uc_vm_stack_push(vm, ucv_int64_new(rv));
+
+ if (uc_vm_call(vm, false, 1) == EXCEPTION_NONE)
+ ucv_put(uc_vm_stack_pop(vm));
+
+ free(defer);
+ }
+
+ if (rv != UBUS_STATUS_OK)
+ err_return(rv);
+
+ return res;
+}
+
+static uc_value_t *
uc_ubus_disconnect(uc_vm_t *vm, size_t nargs)
{
ubus_connection **c = uc_fn_this("ubus.connection");
@@ -295,6 +482,54 @@ uc_ubus_disconnect(uc_vm_t *vm, size_t nargs)
return ucv_boolean_new(true);
}
+static uc_value_t *
+uc_ubus_defer_complete(uc_vm_t *vm, size_t nargs)
+{
+ ubus_deferred **d = uc_fn_this("ubus.deferred");
+
+ if (!d || !*d)
+ return NULL;
+
+ return ucv_boolean_new((*d)->complete);
+}
+
+static uc_value_t *
+uc_ubus_defer_abort(uc_vm_t *vm, size_t nargs)
+{
+ ubus_deferred **d = uc_fn_this("ubus.deferred");
+ uc_resource_t *r;
+ size_t i;
+
+ if (!d || !*d)
+ return NULL;
+
+ if ((*d)->complete)
+ return ucv_boolean_new(false);
+
+ ubus_abort_request((*d)->context, &(*d)->request);
+ uloop_timeout_cancel(&(*d)->timeout);
+
+ for (i = 0; i < ucv_array_length(cb_registry); i += 2) {
+ r = (uc_resource_t *)ucv_array_get(cb_registry, i);
+
+ if (r && r->data == *d) {
+ ucv_array_set(cb_registry, i, NULL);
+ ucv_array_set(cb_registry, i + 1, NULL);
+ break;
+ }
+ }
+
+ n_cb_active--;
+
+ if (have_own_uloop && n_cb_active == 0)
+ uloop_end();
+
+ (*d)->callback = NULL;
+ (*d)->complete = true;
+
+ return ucv_boolean_new(true);
+}
+
static const uc_function_list_t global_fns[] = {
{ "error", uc_ubus_error },
@@ -304,10 +539,16 @@ static const uc_function_list_t global_fns[] = {
static const uc_function_list_t conn_fns[] = {
{ "list", uc_ubus_list },
{ "call", uc_ubus_call },
+ { "defer", uc_ubus_defer },
{ "error", uc_ubus_error },
{ "disconnect", uc_ubus_disconnect },
};
+static const uc_function_list_t defer_fns[] = {
+ { "complete", uc_ubus_defer_complete },
+ { "abort", uc_ubus_defer_abort },
+};
+
static void close_connection(void *ud) {
ubus_connection *conn = ud;
@@ -320,9 +561,21 @@ static void close_connection(void *ud) {
free(conn);
}
+static void close_deferred(void *ud) {
+ ubus_deferred *defer = ud;
+
+ uloop_timeout_cancel(&defer->timeout);
+ ucv_put(defer->response);
+ free(defer);
+}
+
void uc_module_init(uc_vm_t *vm, uc_value_t *scope)
{
uc_function_list_register(scope, global_fns);
conn_type = uc_type_declare(vm, "ubus.connection", conn_fns, close_connection);
+ defer_type = uc_type_declare(vm, "ubus.deferred", defer_fns, close_deferred);
+ cb_registry = ucv_array_new(vm);
+
+ uc_vm_registry_set(vm, "ubus.cb_registry", cb_registry);
}
diff --git a/types.c b/types.c
index c3032e1..a8ca023 100644
--- a/types.c
+++ b/types.c
@@ -2078,6 +2078,7 @@ ucv_gc_common(uc_vm_t *vm, bool final)
if (!final) {
/* mark reachable objects */
ucv_gc_mark(vm->globals);
+ ucv_gc_mark(vm->registry);
ucv_gc_mark(vm->exception.stacktrace);
for (i = 0; i < vm->callframes.count; i++) {
diff --git a/vm.c b/vm.c
index a1be51b..e21bc4b 100644
--- a/vm.c
+++ b/vm.c
@@ -2480,3 +2480,34 @@ uc_vm_trace_set(uc_vm_t *vm, uint32_t level)
{
vm->trace = level;
}
+
+bool
+uc_vm_registry_exists(uc_vm_t *vm, const char *key)
+{
+ bool exists;
+
+ ucv_object_get(vm->registry, key, &exists);
+
+ return exists;
+}
+
+uc_value_t *
+uc_vm_registry_get(uc_vm_t *vm, const char *key)
+{
+ return ucv_object_get(vm->registry, key, NULL);
+}
+
+void
+uc_vm_registry_set(uc_vm_t *vm, const char *key, uc_value_t *value)
+{
+ if (!vm->registry)
+ vm->registry = ucv_object_new(vm);
+
+ ucv_object_add(vm->registry, key, value);
+}
+
+bool
+uc_vm_registry_delete(uc_vm_t *vm, const char *key)
+{
+ return ucv_object_delete(vm->registry, key);
+}