summaryrefslogtreecommitdiffhomepage
path: root/libs/httpd/luasrc/copas.lua
diff options
context:
space:
mode:
authorSteven Barth <steven@midlink.org>2008-06-16 19:47:57 +0000
committerSteven Barth <steven@midlink.org>2008-06-16 19:47:57 +0000
commitdfe85d7e518cc35a55e372b0ac31345788a486a8 (patch)
tree5651cf8017bbb58f2da9eb93ca2013236b30eac6 /libs/httpd/luasrc/copas.lua
parent77da39e7762f7494399e7bad88709764cd6d8f44 (diff)
* Added preliminary HTTPD construct
Diffstat (limited to 'libs/httpd/luasrc/copas.lua')
-rw-r--r--libs/httpd/luasrc/copas.lua439
1 files changed, 439 insertions, 0 deletions
diff --git a/libs/httpd/luasrc/copas.lua b/libs/httpd/luasrc/copas.lua
new file mode 100644
index 0000000000..262096ac00
--- /dev/null
+++ b/libs/httpd/luasrc/copas.lua
@@ -0,0 +1,439 @@
+-------------------------------------------------------------------------------
+-- Copas - Coroutine Oriented Portable Asynchronous Services
+--
+-- Offers a dispatcher and socket operations based on coroutines.
+-- Usage:
+-- copas.addserver(server, handler, timeout)
+-- copas.addthread(thread, ...) Create a new coroutine thread and run it with args
+-- copas.loop(timeout) - listens infinetely
+-- copas.step(timeout) - executes one listening step
+-- copas.receive(pattern or number) - receives data from a socket
+-- copas.settimeout(client, time) if time=0 copas.receive(bufferSize) - receives partial data from a socket were data<=bufferSize
+-- copas.send - sends data through a socket
+-- copas.wrap - wraps a LuaSocket socket with Copas methods
+-- copas.connect - blocks only the thread until connection completes
+-- copas.flush - *deprecated* do nothing
+--
+-- Authors: Andre Carregal and Javier Guerra
+-- Contributors: Diego Nehab, Mike Pall, David Burgess, Leonardo Godinho,
+-- Thomas Harning Jr. and Gary NG
+--
+-- Copyright 2005 - Kepler Project (www.keplerproject.org)
+--
+-- $Id: copas.lua,v 1.31 2008/05/19 18:57:13 carregal Exp $
+-------------------------------------------------------------------------------
+local socket = require "socket"
+
+require"luci.util"
+local copcall = luci.util.copcall
+
+
+local WATCH_DOG_TIMEOUT = 120
+
+-- Redefines LuaSocket functions with coroutine safe versions
+-- (this allows the use of socket.http from within copas)
+local function statusHandler(status, ...)
+ if status then return ... end
+ return nil, ...
+end
+function socket.protect(func)
+ return function (...)
+ return statusHandler(copcall(func, ...))
+ end
+end
+
+function socket.newtry(finalizer)
+ return function (...)
+ local status = (...) or false
+ if (status==false)then
+ copcall(finalizer, select(2, ...) )
+ error((select(2, ...)), 0)
+ end
+ return ...
+ end
+end
+-- end of LuaSocket redefinitions
+
+
+module ("copas", package.seeall)
+
+-- Meta information is public even if begining with an "_"
+_COPYRIGHT = "Copyright (C) 2005 Kepler Project"
+_DESCRIPTION = "Coroutine Oriented Portable Asynchronous Services"
+_VERSION = "Copas 1.1.3"
+
+-------------------------------------------------------------------------------
+-- Simple set implementation based on LuaSocket's tinyirc.lua example
+-- adds a FIFO queue for each value in the set
+-------------------------------------------------------------------------------
+local function newset()
+ local reverse = {}
+ local set = {}
+ local q = {}
+ setmetatable(set, { __index = {
+ insert = function(set, value)
+ if not reverse[value] then
+ set[#set + 1] = value
+ reverse[value] = #set
+ end
+ end,
+
+ remove = function(set, value)
+ local index = reverse[value]
+ if index then
+ reverse[value] = nil
+ local top = set[#set]
+ set[#set] = nil
+ if top ~= value then
+ reverse[top] = index
+ set[index] = top
+ end
+ end
+ end,
+
+ push = function (set, key, itm)
+ local qKey = q[key]
+ if qKey == nil then
+ q[key] = {itm}
+ else
+ qKey[#qKey + 1] = itm
+ end
+ end,
+
+ pop = function (set, key)
+ local t = q[key]
+ if t ~= nil then
+ local ret = table.remove (t, 1)
+ if t[1] == nil then
+ q[key] = nil
+ end
+ return ret
+ end
+ end
+ }})
+ return set
+end
+
+local _servers = newset() -- servers being handled
+local _reading_log = {}
+local _writing_log = {}
+
+local _reading = newset() -- sockets currently being read
+local _writing = newset() -- sockets currently being written
+
+-------------------------------------------------------------------------------
+-- Coroutine based socket I/O functions.
+-------------------------------------------------------------------------------
+-- reads a pattern from a client and yields to the reading set on timeouts
+function receive(client, pattern, part)
+ local s, err
+ pattern = pattern or "*l"
+ repeat
+ s, err, part = client:receive(pattern, part)
+ if s or err ~= "timeout" then
+ _reading_log[client] = nil
+ return s, err, part
+ end
+ _reading_log[client] = os.time()
+ coroutine.yield(client, _reading)
+ until false
+end
+
+-- same as above but with special treatment when reading chunks,
+-- unblocks on any data received.
+function receivePartial(client, pattern)
+ local s, err, part
+ pattern = pattern or "*l"
+ repeat
+ s, err, part = client:receive(pattern)
+ if s or ( (type(pattern)=="number") and part~="" and part ~=nil ) or
+ err ~= "timeout" then
+ _reading_log[client] = nil
+ return s, err, part
+ end
+ _reading_log[client] = os.time()
+ coroutine.yield(client, _reading)
+ until false
+end
+
+-- sends data to a client. The operation is buffered and
+-- yields to the writing set on timeouts
+function send(client,data, from, to)
+ local s, err,sent
+ from = from or 1
+ local lastIndex = from - 1
+
+ repeat
+ s, err, lastIndex = client:send(data, lastIndex + 1, to)
+ -- adds extra corrotine swap
+ -- garantees that high throuput dont take other threads to starvation
+ if (math.random(100) > 90) then
+ _writing_log[client] = os.time()
+ coroutine.yield(client, _writing)
+ end
+ if s or err ~= "timeout" then
+ _writing_log[client] = nil
+ return s, err,lastIndex
+ end
+ _writing_log[client] = os.time()
+ coroutine.yield(client, _writing)
+ until false
+end
+
+-- waits until connection is completed
+function connect(skt,host, port)
+ skt:settimeout(0)
+ local ret,err = skt:connect (host, port)
+ if ret or err ~= "timeout" then
+ return ret, err
+ end
+ _writing_log[skt] = os.time()
+ coroutine.yield(skt, _writing)
+ ret,err = skt:connect (host, port)
+ _writing_log[skt] = nil
+ if (err=="already connected") then
+ return 1
+ end
+ return ret, err
+end
+
+-- flushes a client write buffer (deprecated)
+function flush(client)
+end
+
+-- wraps a socket to use Copas methods (send, receive, flush and settimeout)
+local _skt_mt = {__index = {
+ send = function (self, data, from, to)
+ return send (self.socket, data, from, to)
+ end,
+
+ receive = function (self, pattern)
+ if (self.timeout==0) then
+ return receivePartial(self.socket, pattern)
+ end
+ return receive (self.socket, pattern)
+ end,
+
+ flush = function (self)
+ return flush (self.socket)
+ end,
+
+ settimeout = function (self,time)
+ self.timeout=time
+ return
+ end,
+}}
+
+function wrap (skt)
+ return setmetatable ({socket = skt}, _skt_mt)
+end
+
+--------------------------------------------------
+-- Error handling
+--------------------------------------------------
+
+local _errhandlers = {} -- error handler per coroutine
+
+function setErrorHandler (err)
+ local co = coroutine.running()
+ if co then
+ _errhandlers [co] = err
+ end
+end
+
+local function _deferror (msg, co, skt)
+ print (msg, co, skt)
+end
+
+-------------------------------------------------------------------------------
+-- Thread handling
+-------------------------------------------------------------------------------
+
+local function _doTick (co, skt, ...)
+ if not co then return end
+
+ local ok, res, new_q = coroutine.resume(co, skt, ...)
+
+ if ok and res and new_q then
+ new_q:insert (res)
+ new_q:push (res, co)
+ else
+ if not ok then copcall (_errhandlers [co] or _deferror, res, co, skt) end
+ if skt then skt:close() end
+ _errhandlers [co] = nil
+ end
+end
+
+-- accepts a connection on socket input
+local function _accept(input, handler)
+ local client = input:accept()
+ if client then
+ client:settimeout(0)
+ local co = coroutine.create(handler)
+ _doTick (co, client)
+ --_reading:insert(client)
+ end
+ return client
+end
+
+-- handle threads on a queue
+local function _tickRead (skt)
+ _doTick (_reading:pop (skt), skt)
+end
+
+local function _tickWrite (skt)
+ _doTick (_writing:pop (skt), skt)
+end
+
+-------------------------------------------------------------------------------
+-- Adds a server/handler pair to Copas dispatcher
+-------------------------------------------------------------------------------
+function addserver(server, handler, timeout)
+ server:settimeout(timeout or 0.1)
+ _servers[server] = handler
+ _reading:insert(server)
+end
+
+-------------------------------------------------------------------------------
+-- Adds an new courotine thread to Copas dispatcher
+-------------------------------------------------------------------------------
+function addthread(thread, ...)
+ local co = coroutine.create(thread)
+ _doTick (co, nil, ...)
+end
+
+-------------------------------------------------------------------------------
+-- tasks registering
+-------------------------------------------------------------------------------
+
+local _tasks = {}
+
+local function addtaskRead (tsk)
+ -- lets tasks call the default _tick()
+ tsk.def_tick = _tickRead
+
+ _tasks [tsk] = true
+end
+
+local function addtaskWrite (tsk)
+ -- lets tasks call the default _tick()
+ tsk.def_tick = _tickWrite
+
+ _tasks [tsk] = true
+end
+
+local function tasks ()
+ return next, _tasks
+end
+
+-------------------------------------------------------------------------------
+-- main tasks: manage readable and writable socket sets
+-------------------------------------------------------------------------------
+-- a task to check ready to read events
+local _readable_t = {
+ events = function(self)
+ local i = 0
+ return function ()
+ i = i + 1
+ return self._evs [i]
+ end
+ end,
+
+ tick = function (self, input)
+ local handler = _servers[input]
+ if handler then
+ input = _accept(input, handler)
+ else
+ _reading:remove (input)
+ self.def_tick (input)
+ end
+ end
+}
+
+addtaskRead (_readable_t)
+
+
+-- a task to check ready to write events
+local _writable_t = {
+ events = function (self)
+ local i = 0
+ return function ()
+ i = i+1
+ return self._evs [i]
+ end
+ end,
+
+ tick = function (self, output)
+ _writing:remove (output)
+ self.def_tick (output)
+ end
+}
+
+addtaskWrite (_writable_t)
+
+local last_cleansing = 0
+local function _select (timeout)
+ local err
+ local readable={}
+ local writable={}
+ local r={}
+ local w={}
+ local now = os.time()
+ local duration = os.difftime
+
+
+ _readable_t._evs, _writable_t._evs, err = socket.select(_reading, _writing, timeout)
+ local r_evs, w_evs = _readable_t._evs, _writable_t._evs
+
+ if duration(now, last_cleansing) > WATCH_DOG_TIMEOUT then
+ last_cleansing = now
+ for k,v in pairs(_reading_log) do
+ if not r_evs[k] and duration(now, v) > WATCH_DOG_TIMEOUT then
+ _reading_log[k] = nil
+ r_evs[#r_evs + 1] = k
+ r_evs[k] = #r_evs
+ end
+ end
+
+ for k,v in pairs(_writing_log) do
+ if not w_evs[k] and duration(now, v) > WATCH_DOG_TIMEOUT then
+ _writing_log[k] = nil
+ w_evs[#w_evs + 1] = k
+ w_evs[k] = #w_evs
+ end
+ end
+ end
+
+ if err == "timeout" and #r_evs + #w_evs > 0 then return nil
+ else return err end
+end
+
+
+-------------------------------------------------------------------------------
+-- Dispatcher loop step.
+-- Listen to client requests and handles them
+-------------------------------------------------------------------------------
+function step(timeout)
+ local err = _select (timeout)
+ if err == "timeout" then return end
+
+ if err then
+ error(err)
+ end
+
+ for tsk in tasks() do
+ for ev in tsk:events () do
+ tsk:tick (ev)
+ end
+ end
+end
+
+-------------------------------------------------------------------------------
+-- Dispatcher endless loop.
+-- Listen to client requests and handles them forever
+-------------------------------------------------------------------------------
+function loop(timeout)
+ while true do
+ step(timeout)
+ end
+end