summaryrefslogtreecommitdiffstats
path: root/ttun_server/endpoints.py
diff options
context:
space:
mode:
Diffstat (limited to 'ttun_server/endpoints.py')
-rw-r--r--ttun_server/endpoints.py52
1 files changed, 34 insertions, 18 deletions
diff --git a/ttun_server/endpoints.py b/ttun_server/endpoints.py
index b25ffe4..6728c31 100644
--- a/ttun_server/endpoints.py
+++ b/ttun_server/endpoints.py
@@ -12,7 +12,7 @@ from starlette.types import Scope, Receive, Send
12from starlette.websockets import WebSocket 12from starlette.websockets import WebSocket
13 13
14from ttun_server.proxy_queue import ProxyQueue 14from ttun_server.proxy_queue import ProxyQueue
15from ttun_server.types import RequestData, Config 15from ttun_server.types import RequestData, Config, Message, MessageType
16 16
17logger = logging.getLogger(__name__) 17logger = logging.getLogger(__name__)
18 18
@@ -33,26 +33,39 @@ class Proxy(HTTPEndpoint):
33 [subdomain, *_] = request.headers['host'].split('.') 33 [subdomain, *_] = request.headers['host'].split('.')
34 response = Response(content='Not Found', status_code=404) 34 response = Response(content='Not Found', status_code=404)
35 35
36 identifier = str(uuid4())
37 response_queue = await ProxyQueue.create_for_identifier(f'{subdomain}_{identifier}')
38
36 try: 39 try:
37 queue = await ProxyQueue.get_for_identifier(subdomain)
38 40
39 await queue.send_request(RequestData( 41 request_queue = await ProxyQueue.get_for_identifier(subdomain)
40 method=request.method, 42
41 path=str(request.url).replace(str(request.base_url), '/'), 43 await request_queue.enqueue(
42 headers=list(request.headers.items()), 44 Message(
43 body=b64encode(await request.body()).decode() 45 type=MessageType.request,
44 )) 46 identifier=identifier,
47 payload=
48 RequestData(
49 method=request.method,
50 path=str(request.url).replace(str(request.base_url), '/'),
51 headers=list(request.headers.items()),
52 body=b64encode(await request.body()).decode()
53 )
54 )
55 )
45 56
46 _response = await queue.handle_response() 57 _response = await response_queue.dequeue()
58 payload = _response['payload']
47 response = Response( 59 response = Response(
48 status_code=_response['status'], 60 status_code=payload['status'],
49 headers=HeaderMapping(_response['headers']), 61 headers=HeaderMapping(payload['headers']),
50 content=b64decode(_response['body'].encode()) 62 content=b64decode(payload['body'].encode())
51 ) 63 )
52 except AssertionError: 64 except AssertionError:
53 pass 65 pass
54 66 finally:
55 await response(self.scope, self.receive, self.send) 67 await response(self.scope, self.receive, self.send)
68 await response_queue.delete()
56 69
57 70
58class Health(HTTPEndpoint): 71class Health(HTTPEndpoint):
@@ -62,7 +75,6 @@ class Health(HTTPEndpoint):
62 await response(self.scope, self.receive, self.send) 75 await response(self.scope, self.receive, self.send)
63 76
64 77
65
66class Tunnel(WebSocketEndpoint): 78class Tunnel(WebSocketEndpoint):
67 encoding = 'json' 79 encoding = 'json'
68 80
@@ -72,7 +84,7 @@ class Tunnel(WebSocketEndpoint):
72 self.config: Optional[Config] = None 84 self.config: Optional[Config] = None
73 85
74 async def handle_requests(self, websocket: WebSocket): 86 async def handle_requests(self, websocket: WebSocket):
75 while request := await self.proxy_queue.handle_request(): 87 while request := await self.proxy_queue.dequeue():
76 await websocket.send_json(request) 88 await websocket.send_json(request)
77 89
78 async def on_connect(self, websocket: WebSocket) -> None: 90 async def on_connect(self, websocket: WebSocket) -> None:
@@ -94,8 +106,12 @@ class Tunnel(WebSocketEndpoint):
94 106
95 self.request_task = asyncio.create_task(self.handle_requests(websocket)) 107 self.request_task = asyncio.create_task(self.handle_requests(websocket))
96 108
97 async def on_receive(self, websocket: WebSocket, data: Any): 109 async def on_receive(self, websocket: WebSocket, data: Message):
98 await self.proxy_queue.send_response(data) 110 try:
111 response_queue = await ProxyQueue.get_for_identifier(f"{self.config['subdomain']}_{data['identifier']}")
112 await response_queue.enqueue(data)
113 except AssertionError:
114 pass
99 115
100 async def on_disconnect(self, websocket: WebSocket, close_code: int): 116 async def on_disconnect(self, websocket: WebSocket, close_code: int):
101 await self.proxy_queue.delete() 117 await self.proxy_queue.delete()