[PATCH] ubus: lua: Add support for async calls in lua
Maciej Krüger
mkg20001 at mkg20001.io
Sat Jan 21 11:31:01 PST 2023
From: Maciej Krüger <mkg20001 at gmail.com>
This extends the conn:call function to take a function
as its last parameter, which will make the library
use ubus_invoke_async.
This allows streaming the logs from ubus,
among other things.
An example has been provided in lua/stream_logs.lua.
Signed-off-by: Maciej Krüger <mkg20001 at gmail.com>
---
lua/stream_logs.lua | 29 ++++++++
lua/ubus.c | 168 +++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 195 insertions(+), 2 deletions(-)
create mode 100644 lua/stream_logs.lua
diff --git a/lua/stream_logs.lua b/lua/stream_logs.lua
new file mode 100644
index 0000000..5490b02
--- /dev/null
+++ b/lua/stream_logs.lua
@@ -0,0 +1,29 @@
+-- Example: stream log entries from the ubus "log" object using an
+-- asynchronous conn:call().
+
+-- Load modules (used below through the globals `ubus` and `uloop`)
+require "ubus"
+require "uloop"
+
+uloop.init()
+
+-- Establish connection
+local conn = ubus.connect()
+if not conn then
+	error("Failed to connect to ubusd")
+end
+
+-- Print every field of a single log entry.
+local function handleLog (log)
+	for k, v in pairs(log) do
+		print(k , v)
+	end
+end
+
+-- Stream logs. Passing a function as the last argument makes the call
+-- asynchronous: the callback receives (log) for each entry and
+-- (nil, control) for control events.
+local _, err = conn:call("log", "read", { stream = true, oneshot = false, lines = 0 }, function (log, control)
+	if control then
+		print('Control event', control.type)
+	elseif log then
+		handleLog(log)
+	end
+end)
+
+-- On failure conn:call returns nil plus a status code; without this check
+-- the script would sit in uloop.run() forever with nothing to deliver.
+if err then
+	error("Failed to start log stream, ubus status " .. tostring(err))
+end
+
+-- Run the event loop so the callback gets invoked.
+uloop.run()
+
diff --git a/lua/ubus.c b/lua/ubus.c
index 07b816d..51643df 100644
--- a/lua/ubus.c
+++ b/lua/ubus.c
@@ -19,6 +19,7 @@
#include <libubox/blobmsg_json.h>
#include <lauxlib.h>
#include <lua.h>
+#include <libubox/ustream.h>
#define MODNAME "ubus"
#define METANAME MODNAME ".meta"
@@ -42,6 +43,12 @@ struct ubus_lua_event {
int r;
};
+struct ubus_lua_request { /* state kept for one asynchronous conn:call() */
+ struct ubus_request r; /* request handed to ubus_invoke_async() */
+ struct ustream_fd fd; /* stream set up by ubus_lua_async_fd_cb() on the received fd */
+ int fnc; /* luaL_ref() index of the Lua callback in __ubus_cb_async */
+};
+
struct ubus_lua_subscriber {
struct ubus_subscriber s;
int rnotify;
@@ -660,6 +667,134 @@ ubus_lua_call_cb(struct ubus_request *req, int type, struct blob_attr *msg)
ubus_lua_parse_blob_array(L, blob_data(msg), blob_len(msg), true);
}
+/*
+ * Completion handler for async requests. Looks up the Lua callback stored
+ * for this request and, if it is a function, invokes it as
+ * callback(nil, { type = "connected", ["return"] = ret }).
+ */
+static void
+ubus_lua_async_complete_cb(struct ubus_request *req, int ret)
+{
+	struct ubus_lua_request *areq = container_of(req, struct ubus_lua_request, r);
+
+	/* fetch __ubus_cb_async[fnc], leaving only that value on the stack */
+	lua_getglobal(state, "__ubus_cb_async");
+	lua_rawgeti(state, -1, areq->fnc);
+	lua_remove(state, -2);
+
+	if (!lua_isfunction(state, -1)) {
+		lua_pop(state, 1);
+		return;
+	}
+
+	/* first argument (the data slot) is nil for control events */
+	lua_pushnil(state);
+
+	/* second argument: the control table */
+	lua_newtable(state);
+	lua_pushstring(state, "connected");
+	lua_setfield(state, -2, "type");
+	lua_pushnumber(state, ret);
+	lua_setfield(state, -2, "return");
+
+	lua_call(state, 2, 0);
+}
+
+/*
+ * Deliver one message read from the request's stream to the Lua callback.
+ * The callback gets a single argument: the value produced by
+ * ubus_lua_parse_blob_array() for msg, or nil when msg is NULL.
+ */
+static void
+ubus_lua_async_cb(struct ustream *s, struct blob_attr *msg)
+{
+	struct ubus_lua_request *areq = container_of(s, struct ubus_lua_request, fd.stream);
+
+	/* fetch __ubus_cb_async[fnc], leaving only that value on the stack */
+	lua_getglobal(state, "__ubus_cb_async");
+	lua_rawgeti(state, -1, areq->fnc);
+	lua_remove(state, -2);
+
+	if (!lua_isfunction(state, -1)) {
+		lua_pop(state, 1);
+		return;
+	}
+
+	if (msg)
+		ubus_lua_parse_blob_array(state, blob_data(msg), blob_len(msg), true);
+	else
+		lua_pushnil(state);
+
+	lua_call(state, 1, 0);
+}
+
+/*
+ * notify_read handler: hand every complete blob currently buffered on the
+ * stream to ubus_lua_async_cb() and consume it. An incomplete blob is left
+ * in the buffer untouched.
+ */
+static void
+ubus_lua_async_data_cb(struct ustream *s, int bytes)
+{
+	for (;;) {
+		int avail, need;
+		struct blob_attr *attr = (void*) ustream_get_read_buf(s, &avail);
+
+		/* not even a full attribute header buffered yet */
+		if (avail < (int)sizeof(*attr))
+			break;
+
+		/* header plus payload must both be available */
+		need = blob_len(attr) + sizeof(*attr);
+		if (avail < need)
+			break;
+
+		ubus_lua_async_cb(s, attr);
+		ustream_consume(s, need);
+	}
+}
+
+/*
+ * notify_state handler: reports a stream state change to the Lua callback
+ * by invoking it as callback(nil, { type = "closed" }).
+ */
+static void
+ubus_lua_async_state_cb(struct ustream *s)
+{
+	struct ubus_lua_request *areq = container_of(s, struct ubus_lua_request, fd.stream);
+
+	/* fetch __ubus_cb_async[fnc], leaving only that value on the stack */
+	lua_getglobal(state, "__ubus_cb_async");
+	lua_rawgeti(state, -1, areq->fnc);
+	lua_remove(state, -2);
+
+	if (!lua_isfunction(state, -1)) {
+		lua_pop(state, 1);
+		return;
+	}
+
+	/* first argument (the data slot) is nil for control events */
+	lua_pushnil(state);
+
+	/* second argument: the control table */
+	lua_newtable(state);
+	lua_pushstring(state, "closed");
+	lua_setfield(state, -2, "type");
+
+	lua_call(state, 2, 0);
+}
+
+/*
+ * fd_cb for async requests: attach the read and state handlers to the
+ * request's stream, then initialise its ustream_fd with the received
+ * descriptor.
+ */
+static void
+ubus_lua_async_fd_cb(struct ubus_request *req, int fd)
+{
+	struct ubus_lua_request *areq = container_of(req, struct ubus_lua_request, r);
+	struct ustream *stream = &areq->fd.stream;
+
+	stream->notify_state = ubus_lua_async_state_cb;
+	stream->notify_read = ubus_lua_async_data_cb;
+	ustream_fd_init(&areq->fd, fd);
+}
+
+/*
+ * Allocate the per-call state for an async invoke and store the Lua callback
+ * found at stack index fnc in the global __ubus_cb_async table.
+ *
+ * retlureq: receives the newly allocated request on success
+ * ctx:      not used by this function
+ * L:        Lua state whose stack holds the callback
+ * fnc:      absolute (positive) stack index of the callback function
+ *
+ * The callback is removed from the stack. Returns 0 on success; raises a
+ * Lua error if the allocation fails.
+ */
+static int
+ubus_lua_register_async( struct ubus_lua_request ** retlureq, struct ubus_context *ctx, lua_State *L,
+	int fnc )
+{
+	struct ubus_lua_request *lureq;
+
+	lureq = calloc( 1, sizeof( *lureq ) );
+	if( !lureq ){
+		lua_pushstring( L, "Out of memory" );
+		return lua_error(L);
+	}
+
+	/* keep a reference to the callback in the lookup table */
+	lua_getglobal(L, "__ubus_cb_async");
+	lua_pushvalue(L, fnc);
+	lureq->fnc = luaL_ref(L, -2);
+	lua_pop(L, 1);
+
+	/*
+	 * Drop the callback from the argument list. Removing it at its own
+	 * index (instead of popping the top of the stack) stays correct even
+	 * when the callback is not the topmost stack slot.
+	 */
+	lua_remove(L, fnc);
+
+	*retlureq = lureq;
+
+	return 0;
+}
+
static int
ubus_lua_call(lua_State *L)
{
@@ -669,6 +804,20 @@ ubus_lua_call(lua_State *L)
const char *path = luaL_checkstring(L, 2);
const char *func = luaL_checkstring(L, 3);
+ bool isAsync = lua_isfunction(L, 5);
+ struct ubus_lua_request * req = NULL;
+
+ if (isAsync) {
+ int ret = ubus_lua_register_async(&req, c->ctx, L, lua_gettop(L));
+ if (ret) {
+ return ret;
+ }
+ if (!req) {
+ lua_pushstring(L, "Failed to register async callback");
+ return lua_error( L );
+ }
+ }
+
luaL_checktype(L, 4, LUA_TTABLE);
blob_buf_init(&c->buf, 0);
@@ -689,7 +838,14 @@ ubus_lua_call(lua_State *L)
}
top = lua_gettop(L);
- rv = ubus_invoke(c->ctx, id, func, c->buf.head, ubus_lua_call_cb, L, c->timeout * 1000);
+
+ if (isAsync) {
+ rv = ubus_invoke_async(c->ctx, id, func, c->buf.head, &req->r);
+ req->r.fd_cb = ubus_lua_async_fd_cb;
+ req->r.complete_cb = ubus_lua_async_complete_cb;
+ } else {
+ rv = ubus_invoke(c->ctx, id, func, c->buf.head, ubus_lua_call_cb, L, c->timeout * 1000);
+ }
if (rv != UBUS_STATUS_OK)
{
@@ -699,6 +855,10 @@ ubus_lua_call(lua_State *L)
return 2;
}
+ if (isAsync) {
+ ubus_complete_request_async(c->ctx, &req->r);
+ }
+
return lua_gettop(L) - top;
}
@@ -731,7 +891,7 @@ ubus_lua_load_event(lua_State *L)
event->e.cb = ubus_event_handler;
- /* update the he callback lookup table */
+ /* update the callback lookup table */
lua_getglobal(L, "__ubus_cb_event");
lua_pushvalue(L, -2);
event->r = luaL_ref(L, -2);
@@ -1021,5 +1181,9 @@ luaopen_ubus(lua_State *L)
/* create the publisher table - notifications of new subs */
lua_createtable(L, 1, 0);
lua_setglobal(L, "__ubus_cb_publisher");
+
+ /* create the async table - callbacks for invoke_async */
+ lua_createtable(L, 1, 0);
+ lua_setglobal(L, "__ubus_cb_async");
return 0;
}
--
2.38.1
More information about the openwrt-devel
mailing list