diff options
| author | 2026-04-28 10:45:41 +0200 | |
|---|---|---|
| committer | 2026-04-28 10:45:41 +0200 | |
| commit | b8d5952b83e7601c5df646efd976879f0dbd30c2 (patch) | |
| tree | 6366e383f6fc484e96a7e618015374561452d841 /ttun_server | |
| parent | d93199684f159d59ef62e4c90d16516f6fd9526e (diff) | |
| download | server-b8d5952b83e7601c5df646efd976879f0dbd30c2.tar.gz server-b8d5952b83e7601c5df646efd976879f0dbd30c2.tar.bz2 server-b8d5952b83e7601c5df646efd976879f0dbd30c2.zip | |
Fixed issue with the redis proxy queue not working due to enqueing on the wrong queuev2.2.1
Diffstat (limited to 'ttun_server')
| -rw-r--r-- | ttun_server/proxy_queue.py | 2 | ||||
| -rw-r--r-- | ttun_server/redis.py | 14 | ||||
| -rw-r--r-- | ttun_server/websockets.py | 6 |
3 files changed, 7 insertions, 15 deletions
diff --git a/ttun_server/proxy_queue.py b/ttun_server/proxy_queue.py index c6c8067..cfa0f3c 100644 --- a/ttun_server/proxy_queue.py +++ b/ttun_server/proxy_queue.py | |||
| @@ -109,7 +109,7 @@ class RedisProxyQueue(BaseProxyQueue): | |||
| 109 | async def enqueue(self, message: Message): | 109 | async def enqueue(self, message: Message): |
| 110 | await RedisConnectionPool \ | 110 | await RedisConnectionPool \ |
| 111 | .get_connection() \ | 111 | .get_connection() \ |
| 112 | .publish(self.identifier, json.dumps(message)) | 112 | .publish(f'request_{self.identifier}', json.dumps(message)) |
| 113 | 113 | ||
| 114 | async def dequeue(self) -> Message: | 114 | async def dequeue(self) -> Message: |
| 115 | message = await self.wait_for_message() | 115 | message = await self.wait_for_message() |
diff --git a/ttun_server/redis.py b/ttun_server/redis.py index 18fbca2..fb9ff81 100644 --- a/ttun_server/redis.py +++ b/ttun_server/redis.py | |||
| @@ -1,19 +1,13 @@ | |||
| 1 | import asyncio | ||
| 2 | import os | 1 | import os |
| 3 | from asyncio import get_running_loop | ||
| 4 | 2 | ||
| 5 | from redis.asyncio import ConnectionPool, Redis | 3 | from redis.asyncio import ConnectionPool, Redis |
| 6 | 4 | ||
| 7 | 5 | ||
| 8 | class RedisConnectionPool: | 6 | class RedisConnectionPool: |
| 9 | instance: 'RedisConnectionPool' = None | 7 | _pool: ConnectionPool | None = None |
| 10 | |||
| 11 | def __init__(self): | ||
| 12 | self.pool = ConnectionPool.from_url(os.environ.get('REDIS_URL')) | ||
| 13 | 8 | ||
| 14 | @classmethod | 9 | @classmethod |
| 15 | def get_connection(cls) -> Redis: | 10 | def get_connection(cls) -> Redis: |
| 16 | if cls.instance is None: | 11 | if cls._pool is None: |
| 17 | cls.instance = RedisConnectionPool() | 12 | cls._pool = ConnectionPool.from_url(os.environ.get('REDIS_URL')) |
| 18 | 13 | return Redis(connection_pool=cls._pool) \ No newline at end of file | |
| 19 | return Redis(connection_pool=cls.instance.pool) | ||
diff --git a/ttun_server/websockets.py b/ttun_server/websockets.py index 0800cbc..f80359f 100644 --- a/ttun_server/websockets.py +++ b/ttun_server/websockets.py | |||
| @@ -46,7 +46,7 @@ class WebsocketProxy(WebSocketEndpoint): | |||
| 46 | else: | 46 | else: |
| 47 | yield | 47 | yield |
| 48 | except AssertionError: | 48 | except AssertionError: |
| 49 | pass | 49 | yield None |
| 50 | 50 | ||
| 51 | async def listen_for_messages(self, websocket: WebSocket): | 51 | async def listen_for_messages(self, websocket: WebSocket): |
| 52 | [subdomain, *_] = websocket.url.hostname.split('.') | 52 | [subdomain, *_] = websocket.url.hostname.split('.') |
| @@ -74,9 +74,7 @@ class WebsocketProxy(WebSocketEndpoint): | |||
| 74 | ) | 74 | ) |
| 75 | 75 | ||
| 76 | async with self.proxy(websocket, message) as m: | 76 | async with self.proxy(websocket, message) as m: |
| 77 | type = WebsocketMessageType(m['type']) | 77 | if m is not None and WebsocketMessageType(m['type']) == WebsocketMessageType.ack: |
| 78 | |||
| 79 | if type == WebsocketMessageType.ack: | ||
| 80 | await super().on_connect(websocket) | 78 | await super().on_connect(websocket) |
| 81 | 79 | ||
| 82 | self.websocket_listen_task = asyncio.create_task(self.listen_for_messages(websocket)) | 80 | self.websocket_listen_task = asyncio.create_task(self.listen_for_messages(websocket)) |
