1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
|
import sys import socket from datetime import datetime, timezone
class WSGIServer():
    """Minimal HTTP server that serves a single request through a WSGI app.

    Each call to ``handle_one_request()`` binds a listening socket, accepts
    one TCP connection, parses the request, builds the WSGI ``environ``,
    calls ``application`` and sends the fully buffered response before
    closing the connection.
    """

    # WSGI callable to invoke; normally assigned through set_app().
    application = None

    def __init__(self, host, port) -> None:
        """Store the bind address and reset all per-request state.

        Args:
            host: Host/interface passed to ``socket.bind``.
            port: TCP port passed to ``socket.bind``.
        """
        self.client_connection = None
        self.headers_set = None
        self.server_address = (host, port)
        self.request_data = None
        self.request_method = None
        self.path = None
        # Defined up front so finish_response() never hits AttributeError
        # when it is used before a request line has been parsed.
        self.request_version = None
        self.server_name = None
        self.server_port = None
        # Body chunks pushed through the write() callable that
        # start_response() returns.
        self._pending_writes = []

    def get_environ(self):
        """Build the WSGI ``environ`` dict for the request currently stored.

        Reads ``request_data``, ``request_method``, ``path``,
        ``request_version``, ``server_name`` and ``server_port``.

        Returns:
            dict: CGI-style and ``wsgi.*`` variables. ``wsgi.input`` is a
            binary file-like object positioned at the start of the request
            body; request headers appear as ``CONTENT_TYPE``,
            ``CONTENT_LENGTH`` and ``HTTP_*`` keys.
        """
        # Local import, mirroring demo_app(), so the file's import block
        # does not need to change.
        from io import BytesIO

        raw = self.request_data or b''
        head, _, body = raw.partition(b'\r\n\r\n')
        # PATH_INFO must not carry the query string; it goes to QUERY_STRING.
        path, _, query = (self.path or '').partition('?')

        env = {
            'wsgi.version': (1, 0),
            'wsgi.url_scheme': 'http',
            'wsgi.input': BytesIO(body),
            'wsgi.errors': sys.stderr,
            'wsgi.multithread': False,
            'wsgi.multiprocess': False,
            'wsgi.run_once': False,
            'REQUEST_METHOD': self.request_method,
            'SCRIPT_NAME': '',
            'PATH_INFO': path,
            'QUERY_STRING': query,
            'SERVER_NAME': self.server_name or '',
            # CGI variables are strings, so the numeric port is converted.
            'SERVER_PORT': '' if self.server_port is None else str(self.server_port),
            'SERVER_PROTOCOL': self.request_version,
        }

        # First line is the request line; the remaining ones are headers.
        for line in head.splitlines()[1:]:
            name, sep, value = line.decode('iso-8859-1').partition(':')
            if not sep:
                continue  # not a "Name: value" line; ignore it
            key = name.strip().upper().replace('-', '_')
            if key not in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
                key = 'HTTP_' + key
            value = value.strip()
            # Repeated headers are folded into one comma-separated value.
            env[key] = f'{env[key]},{value}' if key in env else value
        return env

    def handle_one_request(self):
        """Accept exactly one connection and serve it with the application.

        The listening socket is closed as soon as the connection has been
        accepted, and the client socket is closed on every exit path,
        including when the application raises. A connection that sends no
        data is closed without a reply; a request line that does not have
        exactly three parts is answered with ``400 Bad Request``.
        """
        # Forget anything left over from a previous request.
        self.headers_set = None
        self._pending_writes = []

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind(self.server_address)
            listener.listen(1)
            # Record what SERVER_NAME / SERVER_PORT should report. The port
            # comes from the bound socket so that port 0 resolves to the
            # port actually assigned.
            self.server_name = self.server_address[0] or socket.gethostname()
            self.server_port = listener.getsockname()[1]
            self.client_connection, client_address = listener.accept()

        try:
            # NOTE: a single recv() is used, so a request larger than 1024
            # bytes is truncated.
            self.request_data = self.client_connection.recv(1024)
            if not self.request_data:
                return  # peer connected and sent nothing

            request_line = self.parse_request(self.request_data)
            if len(request_line) != 3:
                self.client_connection.sendall(
                    b'HTTP/1.0 400 Bad Request\r\nContent-Length: 0\r\n\r\n')
                return
            self.request_method, self.path, self.request_version = request_line

            env = self.get_environ()
            result = self.application(env, self.start_response)
            try:
                self.finish_response(result)
            finally:
                # WSGI iterables may expose close() for resource cleanup.
                close = getattr(result, 'close', None)
                if close is not None:
                    close()
        finally:
            # socket.close() is idempotent, so this is safe even though
            # finish_response() has usually closed the connection already.
            self.client_connection.close()

    def start_response(self, status, response_headers, exc_info=None):
        """WSGI ``start_response`` callable: record status and headers.

        Args:
            status: Status string such as ``"200 OK"``.
            response_headers: Sequence of ``(name, value)`` tuples.
            exc_info: Optional ``sys.exc_info()`` tuple. Nothing is sent to
                the client before ``finish_response()`` runs, so a call
                carrying ``exc_info`` simply replaces the stored headers.

        Returns:
            A ``write(data)`` callable; bytes passed to it are buffered and
            sent ahead of the chunks of the application's iterable.
        """
        # formatdate() always produces English day/month names, unlike
        # strftime('%a ... %b'), which follows the process locale.
        from email.utils import formatdate

        server_headers = [
            ('Date', formatdate(usegmt=True)),
            ('Server', 'WSGIServerCustomer 0.1'),
        ]
        # list() lets applications pass any sequence of header tuples.
        self.headers_set = [status, list(response_headers) + server_headers]
        self._pending_writes = []
        return self._pending_writes.append

    def finish_response(self, result):
        """Send status line, headers and body, then close the connection.

        Args:
            result: Iterable of ``bytes`` chunks returned by the application.

        Raises:
            TypeError: If the application never called ``start_response``.
        """
        try:
            if self.headers_set is None:
                raise TypeError(
                    'start_response() was not called by the application')
            status, response_headers = self.headers_set

            version = self.request_version or 'HTTP/1.0'
            head_lines = [f'{version} {status}']
            head_lines.extend(
                '{0}: {1}'.format(*header) for header in response_headers)
            head = '\r\n'.join(head_lines) + '\r\n\r\n'

            # The body stays as bytes end to end, so non-UTF-8 payloads
            # (images, archives, ...) are sent unmodified.
            body = b''.join(self._pending_writes) + b''.join(result)

            # Debug trace of the outgoing response; undecodable body bytes
            # are replaced for display only.
            trace = head + body.decode('utf-8', errors='replace')
            print(''.join('>{line}\n'.format(line=line)
                          for line in trace.splitlines()))

            self.client_connection.sendall(head.encode() + body)
        finally:
            self.client_connection.close()

    def parse_request(self, data):
        """Split the HTTP request line into its whitespace-separated parts.

        Args:
            data: Raw request bytes as received from the client.

        Returns:
            list[str]: Normally ``[method, path, version]``; an empty list
            when ``data`` contains no line at all.
        """
        lines = data.splitlines()
        if not lines:
            return []
        # iso-8859-1 maps every byte to a character, so arbitrary request
        # bytes cannot raise UnicodeDecodeError here.
        return lines[0].decode('iso-8859-1').split()

    def get_app(self):
        """Return the WSGI application currently attached to the server."""
        return self.application

    def set_app(self,application):
        """Attach the WSGI application that requests are dispatched to."""
        self.application = application
def demo_app(environ, start_response):
    """Sample WSGI application that echoes the request environment.

    Responds ``200 OK`` with a UTF-8 plain-text body: a greeting, a blank
    line, then one ``key = repr(value)`` line per ``environ`` entry in
    sorted order.

    Args:
        environ: WSGI environment mapping.
        start_response: WSGI callable receiving the status and headers.

    Returns:
        A one-element list holding the encoded body.
    """
    report = ["Hello world!", ""]
    report.extend(
        f"{key!s} = {value!r}" for key, value in sorted(environ.items())
    )
    # Every line, including the last, is newline-terminated.
    text = "".join(entry + "\n" for entry in report)

    start_response("200 OK", [('Content-Type','text/plain; charset=utf-8')])
    return [text.encode("utf-8")]
def make_server(host, port, app):
    """Create a WSGIServer for ``(host, port)`` with ``app`` attached.

    Args:
        host: Host passed to the WSGIServer constructor.
        port: Port passed to the WSGIServer constructor.
        app: WSGI application registered through ``set_app``.

    Returns:
        The configured WSGIServer instance; no socket is opened here.
    """
    httpd = WSGIServer(host, port)
    httpd.set_app(app)
    return httpd
if __name__ == '__main__':
    # Script entry point: build a server for host '' and port 8000 around
    # demo_app, then serve a single request and exit.
    httpd = make_server(host='', port=8000, app=demo_app)
    httpd.handle_one_request()
|