From 7ac28203290a211a6e17ae0b91bc2b609f110514 Mon Sep 17 00:00:00 2001 From: Tom van der Lee Date: Thu, 8 Jun 2023 08:24:43 +0200 Subject: WIP --- ttun_server/endpoints.py | 52 +++++++++++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 18 deletions(-) (limited to 'ttun_server/endpoints.py') diff --git a/ttun_server/endpoints.py b/ttun_server/endpoints.py index b25ffe4..6728c31 100644 --- a/ttun_server/endpoints.py +++ b/ttun_server/endpoints.py @@ -12,7 +12,7 @@ from starlette.types import Scope, Receive, Send from starlette.websockets import WebSocket from ttun_server.proxy_queue import ProxyQueue -from ttun_server.types import RequestData, Config +from ttun_server.types import RequestData, Config, Message, MessageType logger = logging.getLogger(__name__) @@ -33,26 +33,39 @@ class Proxy(HTTPEndpoint): [subdomain, *_] = request.headers['host'].split('.') response = Response(content='Not Found', status_code=404) + identifier = str(uuid4()) + response_queue = await ProxyQueue.create_for_identifier(f'{subdomain}_{identifier}') + try: - queue = await ProxyQueue.get_for_identifier(subdomain) - await queue.send_request(RequestData( - method=request.method, - path=str(request.url).replace(str(request.base_url), '/'), - headers=list(request.headers.items()), - body=b64encode(await request.body()).decode() - )) + request_queue = await ProxyQueue.get_for_identifier(subdomain) + + await request_queue.enqueue( + Message( + type=MessageType.request, + identifier=identifier, + payload= + RequestData( + method=request.method, + path=str(request.url).replace(str(request.base_url), '/'), + headers=list(request.headers.items()), + body=b64encode(await request.body()).decode() + ) + ) + ) - _response = await queue.handle_response() + _response = await response_queue.dequeue() + payload = _response['payload'] response = Response( - status_code=_response['status'], - headers=HeaderMapping(_response['headers']), - content=b64decode(_response['body'].encode()) + status_code=payload['status'], + headers=HeaderMapping(payload['headers']), + content=b64decode(payload['body'].encode()) ) except AssertionError: pass - - await response(self.scope, self.receive, self.send) + finally: + await response(self.scope, self.receive, self.send) + await response_queue.delete() class Health(HTTPEndpoint): @@ -62,7 +75,6 @@ class Health(HTTPEndpoint): await response(self.scope, self.receive, self.send) - class Tunnel(WebSocketEndpoint): encoding = 'json' @@ -72,7 +84,7 @@ class Tunnel(WebSocketEndpoint): self.config: Optional[Config] = None async def handle_requests(self, websocket: WebSocket): - while request := await self.proxy_queue.handle_request(): + while request := await self.proxy_queue.dequeue(): await websocket.send_json(request) async def on_connect(self, websocket: WebSocket) -> None: @@ -94,8 +106,12 @@ class Tunnel(WebSocketEndpoint): self.request_task = asyncio.create_task(self.handle_requests(websocket)) - async def on_receive(self, websocket: WebSocket, data: Any): - await self.proxy_queue.send_response(data) + async def on_receive(self, websocket: WebSocket, data: Message): + try: + response_queue = await ProxyQueue.get_for_identifier(f"{self.config['subdomain']}_{data['identifier']}") + await response_queue.enqueue(data) + except AssertionError: + pass async def on_disconnect(self, websocket: WebSocket, close_code: int): await self.proxy_queue.delete() -- cgit v1.2.3