#!/usr/bin/env lua

-- Command-line RPC plugin for MiniUPnPd port mappings.
--
-- Usage:
--   <script> list            print the argument signature of every method
--   <script> call <method>   read a JSON argument object from stdin, run the
--                            method and print its JSON result
--
-- NOTE(review): the `ubus_rpc_session` field is stripped from the incoming
-- arguments, so this is presumably driven by rpcd -- confirm against the
-- deployment.

local json = require "luci.jsonc"
local UCI = require "luci.model.uci"
local fs = require "nixio.fs"
local sys = require "luci.sys"

--- Escape every punctuation character of `s` so that it matches itself
-- literally when embedded in a Lua pattern.
-- @param s string to escape
-- @return escaped string
local function pattern_escape(s)
	-- Extra parentheses drop gsub's second return value (the match count).
	return (s:gsub("%p", "%%%0"))
end

--- Find the host hint for an address.
-- @param hints table of entries whose [1] is an address and [2] the hint
-- @param addr  address to look up
-- @return the hint of the first entry whose address equals `addr`, or nil
local function lookup_host_hint(hints, addr)
	for _, entry in pairs(hints) do
		if entry[1] == addr then
			return entry[2]
		end
	end
	return nil
end

local methods = {
	get_status = {
		--- List the redirects currently present in the MINIUPNPD nat chain.
		-- @return table `{ rules = { ... } }`; each rule carries num, proto,
		--         extport, intaddr, host_hint, intport and descr
		call = function()
			local uci = UCI.cursor()
			local lease_file = uci:get("upnpd", "config", "upnp_lease_file")
			local ipv4_hints = sys.net.ipv4_hints()
			local rules = {}

			local ipt = io.popen("iptables --line-numbers -t nat -xnvL MINIUPNPD 2>/dev/null")
			if ipt then
				-- The lease file is optional; upnpf stays nil when it is not
				-- configured or cannot be opened.
				local upnpf = lease_file and io.open(lease_file, "r")

				for ln in ipt:lines() do
					-- Only lines starting with a rule number are rules.
					if ln:match("^%d+") then
						local num, proto, extport, intaddr, intport =
							ln:match("^(%d+).-([a-z]+).-dpt:(%d+) to:(%S-):(%d+)")

						if num and proto and extport and intaddr and intport then
							extport = tonumber(extport)
							intport = tonumber(intport)

							-- One lease line is consumed per parsed rule, i.e.
							-- the lease file is assumed to list the mappings in
							-- the same order as the chain.
							local descr = ""
							if upnpf then
								local uln = upnpf:read("*l")
								if uln then
									-- intaddr is escaped so that its dots match
									-- literally instead of "any character".
									local lease_pattern = string.format(
										"^%s:%d:%s:%d:%%d*:(.*)$",
										proto:upper(), extport,
										pattern_escape(intaddr), intport)
									descr = uln:match(lease_pattern) or ""
								end
							end

							rules[#rules + 1] = {
								num = num,
								proto = proto:upper(),
								extport = extport,
								intaddr = intaddr,
								host_hint = lookup_host_hint(ipv4_hints, intaddr),
								intport = intport,
								descr = descr
							}
						end
					end
				end

				if upnpf then
					upnpf:close()
				end
				ipt:close()
			end

			return { rules = rules }
		end
	},

	delete_rule = {
		args = { token = "token" },

		--- Delete the rule with the given index from the MINIUPNPD chains
		-- (filter and nat) and drop the same-numbered line from the lease
		-- file.
		-- @param args table with a `token` field holding the rule index
		-- @return `{ result = "OK" }`, or `{ result = "Bad request" }` when
		--         the token is not a positive integer
		call = function(args)
			local util = require "luci.util"
			local idx = args and tonumber(args.token)

			-- The index is interpolated into shell commands with %d, so accept
			-- positive integers only (fractions, inf and nan fail `% 1 == 0`).
			if not (idx and idx > 0 and idx % 1 == 0) then
				return { result = "Bad request" }
			end

			local uci = UCI.cursor()

			sys.call(string.format("iptables -t filter -D MINIUPNPD %d 2>/dev/null", idx))
			sys.call(string.format("iptables -t nat -D MINIUPNPD %d 2>/dev/null", idx))

			local lease_file = uci:get("upnpd", "config", "upnp_lease_file")
			if lease_file and fs.access(lease_file) then
				sys.call(string.format("sed -i -e '%dd' %s",
					idx, util.shellquote(lease_file)))
			end

			-- Method call with the package name: the cursor must be passed as
			-- receiver and the config to release must be named.
			uci:unload("upnpd")

			return { result = "OK" }
		end
	}
}

--- Print `{ "error": msg }` as JSON and terminate with exit status 1.
-- @param msg error message to report
local function fail(msg)
	print(json.stringify({ error = msg }))
	os.exit(1)
end

--- Read all of stdin and parse it as a single JSON value.
-- Exits with status 1 and a JSON error when the input is malformed or
-- incomplete.
-- @return the decoded value
local function parseInput()
	local parse = json.new()
	local done, err

	while true do
		local chunk = io.read(4096)
		if not chunk then
			break
		elseif not done and not err then
			-- Once the parser has finished or failed, the remaining input is
			-- still read but no longer fed to it.
			done, err = parse:parse(chunk)
		end
	end

	if not done then
		fail(err or "Incomplete input")
	end

	return parse:get()
end

--- Resolve a method by name and check the caller's arguments against its
-- declared signature.  Exits with status 1 and a JSON error on failure.
-- @param func  method name (key into `methods`)
-- @param uargs decoded argument table supplied by the caller
-- @return the method descriptor
local function validateArgs(func, uargs)
	local method = methods[func]
	if not method then
		fail("Method not found")
	end

	if type(uargs) ~= "table" then
		fail("Invalid arguments")
	end

	-- Not part of any method signature; remove it before validation.
	uargs.ubus_rpc_session = nil

	-- Every supplied key must be declared and have the same Lua type as the
	-- sample value in the method's `args` table.
	local margs = method.args or {}
	for k, v in pairs(uargs) do
		if margs[k] == nil or type(v) ~= type(margs[k]) then
			fail("Invalid arguments")
		end
	end

	return method
end

if arg[1] == "list" then
	local rv = {}
	for name, method in pairs(methods) do
		rv[name] = method.args or {}
	end
	-- Rewrite any `:[]` to `:{}` so that an empty argument spec is emitted as
	-- an object rather than an array.
	print((json.stringify(rv):gsub(":%[%]", ":{}")))
elseif arg[1] == "call" then
	local args = parseInput()
	local method = validateArgs(arg[2], args)
	local result, code = method.call(args)
	-- A result that serializes to a bare `[]` is emitted as `{}` instead.
	print((json.stringify(result):gsub("^%[%]$", "{}")))
	os.exit(code or 0)
end