diff options
author | Steven Barth <steven@midlink.org> | 2008-06-25 16:38:48 +0000 |
---|---|---|
committer | Steven Barth <steven@midlink.org> | 2008-06-25 16:38:48 +0000 |
commit | 7a4aa85dd64f72b9edcbf9310d0d95e59960d84e (patch) | |
tree | a1f18b6294c6423ed16d15eb76212030f7f068f6 /libs/httpd | |
parent | b85d292bcd13f7527d6a2c1d204ebf78bc417e06 (diff) |
* libs/httpd: Introduced keep-alive and pipelining support
Diffstat (limited to 'libs/httpd')
-rw-r--r-- | libs/httpd/luasrc/httpd.lua | 119 | ||||
-rw-r--r-- | libs/httpd/luasrc/httpd/handler/file.lua | 16 | ||||
-rw-r--r-- | libs/httpd/luasrc/httpd/handler/luci.lua | 15 | ||||
-rw-r--r-- | libs/httpd/luasrc/httpd/module.lua | 28 | ||||
-rw-r--r-- | libs/httpd/luasrc/httpd/server.lua | 247 |
5 files changed, 276 insertions, 149 deletions
diff --git a/libs/httpd/luasrc/httpd.lua b/libs/httpd/luasrc/httpd.lua index 6524bc1de..a9b1ccbb4 100644 --- a/libs/httpd/luasrc/httpd.lua +++ b/libs/httpd/luasrc/httpd.lua @@ -27,13 +27,59 @@ function Socket(ip, port) return sock, err end +Thread = luci.util.class() + +function Thread.__init__(self, socket, func) + self.socket = socket + self.routine = coroutine.create(func) + self.stamp = os.time() + self.waiting = false +end + +function Thread.getidletime(self) + return os.difftime(os.time(), self.stamp) +end + +function Thread.iswaiting(self) + return self.waiting +end + +function Thread.receive(self, ...) + local chunk, err, part + self.waiting = true + + repeat + coroutine.yield() + chunk, err, part = self.socket:receive(...) + until err ~= "timeout" + + self.waiting = false + return chunk, err, part +end + +function Thread.resume(self, ...) + return coroutine.resume(self.routine, self, ...) +end + +function Thread.status(self) + return coroutine.status(self.routine) +end + +function Thread.touch(self) + self.stamp = os.time() +end Daemon = luci.util.class() function Daemon.__init__(self, threadlimit, timeout) self.reading = {} - self.running = {} + self.threads = {} self.handler = {} + self.waiting = {} + self.threadc = 0 + + setmetatable(self.waiting, {__mode = "v"}) + self.debug = false self.threadlimit = threadlimit self.timeout = timeout or 0.1 @@ -58,10 +104,7 @@ end function Daemon.step(self) local input, output, err = socket.select( self.reading, nil, 0 ) - - if err == "timeout" and #self.running == 0 then - socket.sleep(self.timeout) - end + local working = false -- accept new connections for i, connection in ipairs(input) do @@ -70,19 +113,18 @@ function Daemon.step(self) if sock then -- check capacity - if not self.threadlimit or #self.running < self.threadlimit then + if not self.threadlimit or self.threadc < self.threadlimit then if self.debug then self:dprint("Accepted incoming connection from " .. sock:getpeername()) end - - table.insert( self.running, { - coroutine.create( self.handler[connection].clhandler ), - sock - } ) + + local t = Thread(sock, self.handler[connection].clhandler) + self.threads[sock] = t + self.threadc = self.threadc + 1 if self.debug then - self:dprint("Created " .. tostring(self.running[#self.running][1])) + self:dprint("Created " .. tostring(t)) end -- reject client @@ -101,27 +143,62 @@ function Daemon.step(self) end -- create client handler - for i, client in ipairs( self.running ) do + for sock, thread in pairs( self.threads ) do -- reap dead clients - if coroutine.status( client[1] ) == "dead" then + if thread:status() == "dead" then if self.debug then - self:dprint("Completed " .. tostring(client[1])) + self:dprint("Completed " .. tostring(thread)) end - table.remove( self.running, i ) - else + sock:close() + self.threadc = self.threadc - 1 + self.threads[sock] = nil + -- resume working threads + elseif not thread:iswaiting() then if self.debug then - self:dprint("Resuming " .. tostring(client[1])) + self:dprint("Resuming " .. tostring(thread)) end - local stat, err = coroutine.resume( client[1], client[2] ) + local stat, err = thread:resume() + if stat then + thread:touch() + if not thread:iswaiting() then + working = true + else + table.insert(self.waiting, sock) + end + end if self.debug then - self:dprint(tostring(client[1]) .. " returned") + self:dprint(tostring(thread) .. " returned") if not stat then - self:dprint("Error in " .. tostring(client[1]) .. " " .. err) + self:dprint("Error in " .. tostring(thread) .. " " .. err) + end + end + end + end + + -- check for data on waiting threads + input, output, err = socket.select( self.waiting, nil, 0 ) + + for i, sock in ipairs(input) do + self.threads[sock]:resume() + self.threads[sock]:touch() + + if not self.threads[sock]:iswaiting() then + for i, s in ipairs(self.waiting) do + if s == sock then + table.remove(self.waiting, i) + break end end + if not working then + working = true + end end end + + if err == "timeout" and not working then + socket.sleep(self.timeout) + end end diff --git a/libs/httpd/luasrc/httpd/handler/file.lua b/libs/httpd/luasrc/httpd/handler/file.lua index 83549f338..8d75edc9a 100644 --- a/libs/httpd/luasrc/httpd/handler/file.lua +++ b/libs/httpd/luasrc/httpd/handler/file.lua @@ -11,18 +11,28 @@ function Simple.__init__(self, docroot) self.docroot = docroot end -function Simple.handle(self, request, sourcein, sinkerr) - local uri = request.env.PATH_INFO +function Simple.getfile(self, uri) local file = self.docroot .. uri:gsub("%.%./", "") local stat = luci.fs.stat(file) + + return file, stat +end + +function Simple.handle_get(self, request, sourcein, sinkerr) + local file, stat = self:getfile(request.env.PATH_INFO) if stat then if stat.type == "regular" then return Response(200, {["Content-Length"] = stat.size}), ltn12.source.file(io.open(file)) else - return self:failure(403, "Unable to transmit " .. stat.type .. " " .. uri) + return self:failure(403, "Unable to transmit " .. stat.type .. " " .. request.env.PATH_INFO) end else return self:failure(404, "No such file: " .. uri) end +end + +function Simple.handle_head(self, ...) + local response, sourceout = self:handle_get(...) + return response end
\ No newline at end of file diff --git a/libs/httpd/luasrc/httpd/handler/luci.lua b/libs/httpd/luasrc/httpd/handler/luci.lua index e4916bd2c..35f832d45 100644 --- a/libs/httpd/luasrc/httpd/handler/luci.lua +++ b/libs/httpd/luasrc/httpd/handler/luci.lua @@ -10,7 +10,16 @@ function Luci.__init__(self) luci.httpd.module.Handler.__init__(self) end -function Luci.handle(self, request, sourcein, sinkerr) +function Luci.handle_head(self, ...) + local response, sourceout = self:handle_get(...) + return response +end + +function Luci.handle_post(self, ...) + return self:handle_get(...) +end + +function Luci.handle_get(self, request, sourcein, sinkerr) local r = luci.http.Request( request.env, sourcein, @@ -22,7 +31,7 @@ function Luci.handle(self, request, sourcein, sinkerr) local status = 200 local x = coroutine.create(luci.dispatcher.httpdispatch) - while id < 3 do + while not id or id < 3 do coroutine.yield() res, id, data1, data2 = coroutine.resume(x, r) @@ -45,6 +54,8 @@ function Luci.handle(self, request, sourcein, sinkerr) local res, id, data = coroutine.resume(x) if not res then return nil, id + elseif not id then + return true elseif id == 5 then return nil else diff --git a/libs/httpd/luasrc/httpd/module.lua b/libs/httpd/luasrc/httpd/module.lua index c321856a8..46cfa54ff 100644 --- a/libs/httpd/luasrc/httpd/module.lua +++ b/libs/httpd/luasrc/httpd/module.lua @@ -24,6 +24,7 @@ Handler = luci.util.class() -- Constructor function Handler.__init__(self) self.filters = {} + self.handler = {} end @@ -41,9 +42,10 @@ function Handler.failure(self, code, message) return response, sourceout end - -- Processes a request -function Handler.process(self, request, sourcein, sinkout, sinkerr) +function Handler.process(self, request, sourcein, sinkerr, ...) + local stat, response, sourceout + -- Process incoming filters for i, f in ipairs(self.filters) do local i = f:get("input") @@ -57,14 +59,20 @@ function Handler.process(self, request, sourcein, sinkout, sinkerr) end end - -- Run the handler - local stat, response, sourceout = luci.util.copcall( - self.handle, self, request, sourcein, sinkerr - ) + -- Detect request Method + local hname = "handle_" .. request.request_method + if self[hname] then + -- Run the handler + stat, response, sourceout = luci.util.copcall( + self[hname], self, request, sourcein, sinkerr, ... + ) - -- Check for any errors - if not stat then - response, sourceout = self:failure(500, response) + -- Check for any errors + if not stat then + response, sourceout = self:failure(500, response) + end + else + response, sourceout = self:failure(405, luci.http.protocol.statusmsg[405]) end -- Check data @@ -85,7 +93,7 @@ function Handler.process(self, request, sourcein, sinkout, sinkerr) end end - luci.http.protocol.push_response(request, response, sourceout, sinkout, sinkerr) + return response, sourceout end diff --git a/libs/httpd/luasrc/httpd/server.lua b/libs/httpd/luasrc/httpd/server.lua index 90fdd7ed3..181ca24a1 100644 --- a/libs/httpd/luasrc/httpd/server.lua +++ b/libs/httpd/luasrc/httpd/server.lua @@ -14,6 +14,8 @@ $Id$ ]]-- module("luci.httpd.server", package.seeall) +require("socket") +require("socket.http") require("luci.util") READ_BUFSIZE = 1024 @@ -26,7 +28,7 @@ function VHost.__init__(self, handler) self.dhandler = {} end -function VHost.process(self, request, sourcein, sinkout, sinkerr) +function VHost.process(self, request, sourcein, sinkerr, ...) local handler = self.handler local uri = request.env.REQUEST_URI:match("^([^?]*)") @@ -47,10 +49,7 @@ function VHost.process(self, request, sourcein, sinkout, sinkerr) end if handler then - handler:process(request, sourcein, sinkout, sinkerr) - return true - else - return false + return handler:process(request, sourcein, sinkerr, ...) end end @@ -69,8 +68,6 @@ end Server = luci.util.class() function Server.__init__(self, host) - self.clhandler = client_handler - self.errhandler = error503 self.host = host self.vhosts = {} end @@ -86,129 +83,153 @@ end function Server.create_daemon_handlers(self) return function(...) return self:process(...) end, - function(...) return self:error503(...) end + function(...) return self:error_overload(...) end end -function Server.create_client_sources(self, client) - -- Create LTN12 block source - local block_source = function() - -- Yielding here may cause chaos in coroutine based modules, be careful - -- coroutine.yield() - - local chunk, err, part = client:receive( READ_BUFSIZE ) - - if chunk == nil and err == "timeout" then - return part - elseif chunk ~= nil then - return chunk - else - return nil, err - end - - end - - - -- Create LTN12 line source - local line_source = ltn12.source.simplify( function() - - coroutine.yield() - - local chunk, err, part = client:receive("*l") - - -- Line too long - if chunk == nil and err ~= "timeout" then - - return nil, part - and "Line exceeds maximum allowed length["..part.."]" - or "Unexpected EOF" - - -- Line ok - elseif chunk ~= nil then - - -- Strip trailing CR - chunk = chunk:gsub("\r$","") - - -- We got end of headers, switch to dummy source - if #chunk == 0 then - return "", function() - return nil - end - else - return chunk, nil - end - end - end ) - - return block_source, line_source -end - - -function Server.error400(self, socket, msg) - socket:send( "HTTP/1.0 400 Bad request\r\n" ) - socket:send( "Content-Type: text/plain\r\n\r\n" ) - - if msg then - socket:send( msg .. "\r\n" ) - end - - socket:close() -end - -function Server.error500(self, socket, msg) - socket:send( "HTTP/1.0 500 Internal Server Error\r\n" ) +function Server.error(self, socket, code, msg) + hcode = tostring(code) + + socket:send( "HTTP/1.1 " .. hcode .. " " .. + luci.http.protocol.statusmsg[code] .. "\r\n" ) + socket:send( "Connection: close\r\n" ) socket:send( "Content-Type: text/plain\r\n\r\n" ) if msg then - socket:send( msg .. "\r\n" ) + socket:send( "HTTP-Error " .. code .. ": " .. msg .. "\r\n" ) end - - socket:close() end -function Server.error503(self, socket) - socket:send( "HTTP/1.0 503 Server unavailable\r\n" ) - socket:send( "Content-Type: text/plain\r\n\r\n" ) - socket:send( "There are too many clients connected, try again later\r\n" ) - socket:close() +function Server.error_overload(self, socket) + self:error(socket, 503, "Too many simultaneous connections") end -function Server.process(self, client) +function Server.process( self, thread ) + -- Setup sockets and sources + local client = thread.socket client:settimeout( 0 ) - local sourcein, sourcehdr = self:create_client_sources(client) - local sinkerr = ltn12.sink.file(io.stderr) - - -- FIXME: Add keep-alive support - local sinkout = socket.sink("close-when-done", client) - - coroutine.yield() - - -- parse headers - local message, err = luci.http.protocol.parse_message_header( sourcehdr ) - - if message then - -- If we have a HTTP/1.1 client and an Expect: 100-continue header then - -- respond with HTTP 100 Continue message - if message.http_version == 1.1 and message.headers['Expect'] and - message.headers['Expect'] == '100-continue' - then - client:send("HTTP/1.1 100 Continue\r\n\r\n") + local sourcein = ltn12.source.empty() + local sourcehdr = luci.http.protocol.header_source( thread ) + local sinkerr = ltn12.sink.file( io.stderr ) + + local close = false + + local reading = { client } + + local message, err + + socket.sleep(5) + + repeat + -- parse headers + message, err = luci.http.protocol.parse_message_header( sourcehdr ) + + if not message then + self:error( client, 400, err ) + break + end + + coroutine.yield() + + -- keep-alive + if message.http_version == 1.1 then + close = (message.env.HTTP_CONNECTION == "close") + else + close = not message.env.HTTP_CONNECTION or message.env.HTTP_CONNECTION == "close" end - - local host = self.vhosts[message.env.HTTP_HOST] or self.host - if host then - if host:process(message, sourcein, sinkout, sinkerr) then - sinkout() + + if message.request_method == "get" or message.request_method == "head" then + -- Be happy + + elseif message.request_method == "post" then + -- If we have a HTTP/1.1 client and an Expect: 100-continue header then + -- respond with HTTP 100 Continue message + if message.http_version == 1.1 and message.headers['Expect'] and + message.headers['Expect'] == '100-continue' + then + client:send("HTTP/1.1 100 Continue\r\n\r\n") + end + + if message.headers['Transfer-Encoding'] and + message.headers['Transfer-Encoding'] ~= "identity" then + sourcein = socket.source("http-chunked", thread) + elseif message.env.CONTENT_LENGTH then + sourcein = socket.source("by-length", thread, + tonumber(message.env.CONTENT_LENGTH)) else - self:error500( client, "No suitable path handler found" ) + self:error( client, 411, luci.http.protocol.statusmsg[411] ) + break; end + else - self:error500( client, "No suitable host handler found" ) + self:error( client, 405, luci.http.protocol.statusmsg[405] ) + break; + end - else - self:error400( client, err ) - return nil - end + + + local host = self.vhosts[message.env.HTTP_HOST] or self.host + if not host then + self:error( client, 500, "Unable to find matching host" ) + break; + end + + coroutine.yield() + + local response, sourceout = host:process( + message, sourcein, sinkerr, + client, io.stderr + ) + if not response then + self:error( client, 500, "Error processing handler" ) + end + + coroutine.yield() + + -- Post process response + local sinkmode = close and "close-when-done" or "keep-open" + + if sourceout then + if not response.headers["Content-Length"] then + if message.http_version == 1.1 then + response.headers["Transfer-Encoding"] = "chunked" + sinkmode = "http-chunked" + else + close = true + sinkmode = "close-when-done" + end + end + end + + if close then + response.headers["Connection"] = "close" + end + + + local sinkout = socket.sink(sinkmode, client) + + local header = + message.env.SERVER_PROTOCOL .. " " .. + tostring(response.status) .. " " .. + luci.http.protocol.statusmsg[response.status] .. "\r\n" + + + for k,v in pairs(response.headers) do + header = header .. k .. ": " .. v .. "\r\n" + end + + client:send(header .. "\r\n") + + if sourceout then + local eof = false + repeat + coroutine.yield() + eof = not ltn12.pump.step(sourceout, sinkout) + until eof + end + until close + + client:close() end |