[PATCH] ubus: lua: Add support for async calls in lua

Maciej Krüger mkg20001 at mkg20001.io
Sat Jan 21 11:31:01 PST 2023


From: Maciej Krüger <mkg20001 at gmail.com>

This extends the conn:call function to take a function
as its last parameter, which makes the library
use ubus_invoke_async.

This allows streaming the logs from ubus,
among other things.

An example is provided in lua/stream_logs.lua.

Signed-off-by: Maciej Krüger <mkg20001 at gmail.com>
---
 lua/stream_logs.lua |  29 ++++++++
 lua/ubus.c          | 169 +++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 196 insertions(+), 2 deletions(-)
 create mode 100644 lua/stream_logs.lua

diff --git a/lua/stream_logs.lua b/lua/stream_logs.lua
new file mode 100644
index 0000000..5490b02
--- /dev/null
+++ b/lua/stream_logs.lua
@@ -0,0 +1,29 @@
+-- Example: stream log entries from the ubus "log" object via an async call
+require "ubus"
+require "uloop"
+
+uloop.init()
+
+-- Establish connection
+local conn = ubus.connect()
+if not conn then
+        error("Failed to connect to ubusd")
+end
+
+-- Invoked with (log) per log entry and with (nil, control) for control events
+local function on_event(log, control)
+        if control then
+                print('Control event', control.type)
+                return
+        end
+        for k, v in pairs(log or {}) do
+                print(k , v)
+        end
+end
+
+-- Stream logs; a failed invoke returns two values (second presumably the ubus status)
+local _, err = conn:call("log", "read", { stream = true, oneshot = false, lines = 0 }, on_event)
+if err then error("Failed to start log stream: " .. tostring(err)) end
+
+uloop.run()
+
diff --git a/lua/ubus.c b/lua/ubus.c
index 07b816d..51643df 100644
--- a/lua/ubus.c
+++ b/lua/ubus.c
@@ -19,6 +19,8 @@
 #include <libubox/blobmsg_json.h>
 #include <lauxlib.h>
 #include <lua.h>
+#include <libubox/ustream.h>
+#include <unistd.h>
 
 #define MODNAME		"ubus"
 #define METANAME	MODNAME ".meta"
@@ -42,6 +43,12 @@ struct ubus_lua_event {
 	int r;
 };
 
+struct ubus_lua_request {
+	struct ubus_request r;	/* request handed to ubus_invoke_async() */
+	struct ustream_fd fd;	/* initialised by ubus_lua_async_fd_cb() from the fd it receives */
+	int fnc;	/* luaL_ref() reference to the Lua callback in __ubus_cb_async */
+};
+
 struct ubus_lua_subscriber {
 	struct ubus_subscriber s;
 	int rnotify;
@@ -660,6 +667,134 @@ ubus_lua_call_cb(struct ubus_request *req, int type, struct blob_attr *msg)
 		ubus_lua_parse_blob_array(L, blob_data(msg), blob_len(msg), true);
 }
 
+/* Report completion to Lua; free the request unless a stream fd is attached */
+static void
+ubus_lua_async_complete_cb(struct ubus_request *req, int ret)
+{
+	struct ubus_lua_request *lureq = container_of(req, struct ubus_lua_request, r);
+	bool streaming = lureq->fd.stream.notify_read != NULL;	/* set by fd_cb */
+
+	lua_getglobal(state, "__ubus_cb_async");
+	lua_rawgeti(state, -1, lureq->fnc);
+	if (!streaming) {
+		luaL_unref(state, -2, lureq->fnc);	/* fetched function stays on the stack */
+		free(lureq);
+	}
+	lua_remove(state, -2);
+
+	if (!lua_isfunction(state, -1)) {
+		lua_pop(state, 1);
+		return;
+	}
+	lua_pushnil(state);
+	lua_newtable(state);
+	lua_pushstring(state, "connected");
+	lua_setfield(state, -2, "type");
+	lua_pushnumber(state, ret);
+	lua_setfield(state, -2, "return");
+	lua_call(state, 2, 0);
+}
+
+/* Pass one received blob message (or nil) to the request's Lua callback */
+static void
+ubus_lua_async_cb(struct ustream *s, struct blob_attr *msg)
+{
+	struct ubus_lua_request *owner = container_of(s, struct ubus_lua_request, fd.stream);
+
+	lua_getglobal(state, "__ubus_cb_async");
+	lua_rawgeti(state, -1, owner->fnc);
+	lua_remove(state, -2);
+
+	if (!lua_isfunction(state, -1)) {
+		lua_pop(state, 1);
+		return;
+	}
+	if (msg)
+		ubus_lua_parse_blob_array(state, blob_data(msg), blob_len(msg), true);
+	else
+		lua_pushnil(state);
+	lua_call(state, 1, 0);
+}
+
+/* notify_read handler: dispatch every complete blob buffered on the stream */
+static void
+ubus_lua_async_data_cb(struct ustream *s, int bytes)
+{
+	struct blob_attr *attr;
+	int avail, need;
+
+	for (;;) {
+		attr = (void *) ustream_get_read_buf(s, &avail);
+		if (avail < (int)sizeof(*attr))
+			return;		/* blob header still incomplete */
+		need = blob_len(attr) + sizeof(*attr);
+		if (avail < need)
+			return;		/* blob payload still incomplete */
+
+		ubus_lua_async_cb(s, attr);
+		ustream_consume(s, need);
+	}
+}
+
+/* notify_state handler: release the finished request, then tell Lua it closed */
+static void
+ubus_lua_async_state_cb(struct ustream *s)
+{
+	struct ubus_lua_request *lureq = container_of(s, struct ubus_lua_request, fd.stream);
+
+	lua_getglobal(state, "__ubus_cb_async");
+	lua_rawgeti(state, -1, lureq->fnc);
+	luaL_unref(state, -2, lureq->fnc);	/* fetched function stays on the stack */
+	lua_remove(state, -2);
+	ustream_free(s);		/* the fd itself is closed separately below */
+	close(lureq->fd.fd.fd);
+	free(lureq);
+	if (!lua_isfunction(state, -1)) {
+		lua_pop(state, 1);
+		return;
+	}
+	lua_pushnil(state);
+	lua_newtable(state);
+	lua_pushstring(state, "closed");
+	lua_setfield(state, -2, "type");
+	lua_call(state, 2, 0);
+}
+
+/* fd_cb handler: wrap the fd delivered for the request in a ustream */
+static void
+ubus_lua_async_fd_cb(struct ubus_request *req, int fd)
+{
+	struct ustream_fd *sfd = &container_of(req, struct ubus_lua_request, r)->fd;
+	sfd->stream.notify_state = ubus_lua_async_state_cb;
+	sfd->stream.notify_read = ubus_lua_async_data_cb;
+	ustream_fd_init(sfd, fd);
+}
+
+/* Register the Lua function at stack index fnc in __ubus_cb_async and remove
+ * it from the stack; returns 0 with *retlureq set, or raises a Lua error. */
+static int
+ubus_lua_register_async( struct ubus_lua_request ** retlureq, struct ubus_context *ctx, lua_State *L,
+                       int fnc )
+{
+	struct ubus_lua_request *lureq;
+
+	luaL_checktype( L, fnc, LUA_TFUNCTION );	/* reject non-functions before allocating */
+	lureq = calloc( 1, sizeof( struct ubus_lua_request ) );
+	if( !lureq ){
+		lua_pushstring( L, "Out of memory" );
+		return lua_error(L);
+	}
+
+	lua_getglobal(L, "__ubus_cb_async");
+	lua_pushvalue(L, fnc);
+	lureq->fnc = luaL_ref(L, -2);
+	lua_pop(L, 1);
+	lua_remove(L, fnc);	/* drop the function even when it is not the stack top */
+
+	*retlureq = lureq;
+	return 0;
+}
+
 static int
 ubus_lua_call(lua_State *L)
 {
@@ -669,6 +804,20 @@ ubus_lua_call(lua_State *L)
 	const char *path = luaL_checkstring(L, 2);
 	const char *func = luaL_checkstring(L, 3);
 
+	bool isAsync = lua_isfunction(L, 5);
+	struct ubus_lua_request * req = NULL;
+
+	if (isAsync) {
+		int ret = ubus_lua_register_async(&req, c->ctx, L, lua_gettop(L));
+		if (ret) {
+			return ret;
+		}
+		if (!req) {
+			lua_pushstring(L, "Failed to register async callback");
+			return lua_error( L );
+		}
+	}
+
 	luaL_checktype(L, 4, LUA_TTABLE);
 	blob_buf_init(&c->buf, 0);
 
@@ -689,7 +838,14 @@ ubus_lua_call(lua_State *L)
 	}
 
 	top = lua_gettop(L);
-	rv = ubus_invoke(c->ctx, id, func, c->buf.head, ubus_lua_call_cb, L, c->timeout * 1000);
+
+	if (isAsync) {
+		rv = ubus_invoke_async(c->ctx, id, func, c->buf.head, &req->r);
+		req->r.fd_cb = ubus_lua_async_fd_cb;
+		req->r.complete_cb = ubus_lua_async_complete_cb;
+	} else {
+		rv = ubus_invoke(c->ctx, id, func, c->buf.head, ubus_lua_call_cb, L, c->timeout * 1000);
+	}
 
 	if (rv != UBUS_STATUS_OK)
 	{
@@ -699,6 +855,10 @@ ubus_lua_call(lua_State *L)
 		return 2;
 	}
 
+	if (isAsync) {
+		ubus_complete_request_async(c->ctx, &req->r);
+	}
+
 	return lua_gettop(L) - top;
 }
 
@@ -731,7 +891,7 @@ ubus_lua_load_event(lua_State *L)
 
 	event->e.cb = ubus_event_handler;
 
-	/* update the he callback lookup table */
+	/* update the callback lookup table */
 	lua_getglobal(L, "__ubus_cb_event");
 	lua_pushvalue(L, -2);
 	event->r = luaL_ref(L, -2);
@@ -1021,5 +1181,9 @@ luaopen_ubus(lua_State *L)
 	/* create the publisher table - notifications of new subs */
 	lua_createtable(L, 1, 0);
 	lua_setglobal(L, "__ubus_cb_publisher");
+
+	/* create the async table - callbacks for invoke_async */
+	lua_createtable(L, 1, 0);
+	lua_setglobal(L, "__ubus_cb_async");
 	return 0;
 }
-- 
2.38.1




More information about the openwrt-devel mailing list