mmpSearch/venv/lib/python3.12/site-packages/socketio/async_redis_manager.py

165 lines
6.9 KiB
Python

import asyncio
from urllib.parse import urlparse
try:
from redis import asyncio as aioredis
from redis.exceptions import RedisError
except ImportError: # pragma: no cover
try:
import aioredis
from aioredis.exceptions import RedisError
except ImportError:
aioredis = None
RedisError = None
try:
from valkey import asyncio as aiovalkey
from valkey.exceptions import ValkeyError
except ImportError: # pragma: no cover
aiovalkey = None
ValkeyError = None
from engineio import json
from .async_pubsub_manager import AsyncPubSubManager
from .redis_manager import parse_redis_sentinel_url
class AsyncRedisManager(AsyncPubSubManager):
"""Redis based client manager for asyncio servers.
This class implements a Redis backend for event sharing across multiple
processes.
To use a Redis backend, initialize the :class:`AsyncServer` instance as
follows::
url = 'redis://hostname:port/0'
server = socketio.AsyncServer(
client_manager=socketio.AsyncRedisManager(url))
:param url: The connection URL for the Redis server. For a default Redis
store running on the same host, use ``redis://``. To use a
TLS connection, use ``rediss://``. To use Redis Sentinel, use
``redis+sentinel://`` with a comma-separated list of hosts
and the service name after the db in the URL path. Example:
``redis+sentinel://user:pw@host1:1234,host2:2345/0/myredis``.
:param channel: The channel name on which the server sends and receives
notifications. Must be the same in all the servers.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
:param redis_options: additional keyword arguments to be passed to
``Redis.from_url()`` or ``Sentinel()``.
"""
name = 'aioredis'
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None, redis_options=None):
if aioredis and \
not hasattr(aioredis.Redis, 'from_url'): # pragma: no cover
raise RuntimeError('Version 2 of aioredis package is required.')
super().__init__(channel=channel, write_only=write_only, logger=logger)
self.redis_url = url
self.redis_options = redis_options or {}
self._redis_connect()
def _get_redis_module_and_error(self):
parsed_url = urlparse(self.redis_url)
scheme = parsed_url.scheme.split('+', 1)[0].lower()
if scheme in ['redis', 'rediss']:
if aioredis is None or RedisError is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" '
'in your virtualenv).')
return aioredis, RedisError
if scheme in ['valkey', 'valkeys']:
if aiovalkey is None or ValkeyError is None:
raise RuntimeError('Valkey package is not installed '
'(Run "pip install valkey" '
'in your virtualenv).')
return aiovalkey, ValkeyError
if scheme == 'unix':
if aioredis is None or RedisError is None:
if aiovalkey is None or ValkeyError is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" '
'or "pip install valkey" '
'in your virtualenv).')
else:
return aiovalkey, ValkeyError
else:
return aioredis, RedisError
error_msg = f'Unsupported Redis URL scheme: {scheme}'
raise ValueError(error_msg)
def _redis_connect(self):
module, _ = self._get_redis_module_and_error()
parsed_url = urlparse(self.redis_url)
if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}:
sentinels, service_name, connection_kwargs = \
parse_redis_sentinel_url(self.redis_url)
kwargs = self.redis_options
kwargs.update(connection_kwargs)
sentinel = module.sentinel.Sentinel(sentinels, **kwargs)
self.redis = sentinel.master_for(service_name or self.channel)
else:
self.redis = module.Redis.from_url(self.redis_url,
**self.redis_options)
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
async def _publish(self, data): # pragma: no cover
retry = True
_, error = self._get_redis_module_and_error()
while True:
try:
if not retry:
self._redis_connect()
return await self.redis.publish(
self.channel, json.dumps(data))
except error as exc:
if retry:
self._get_logger().error(
'Cannot publish to redis... '
'retrying',
extra={"redis_exception": str(exc)})
retry = False
else:
self._get_logger().error(
'Cannot publish to redis... '
'giving up',
extra={"redis_exception": str(exc)})
break
async def _redis_listen_with_retries(self): # pragma: no cover
retry_sleep = 1
connect = False
_, error = self._get_redis_module_and_error()
while True:
try:
if connect:
self._redis_connect()
await self.pubsub.subscribe(self.channel)
retry_sleep = 1
async for message in self.pubsub.listen():
yield message
except error as exc:
self._get_logger().error('Cannot receive from redis... '
'retrying in '
f'{retry_sleep} secs',
extra={"redis_exception": str(exc)})
connect = True
await asyncio.sleep(retry_sleep)
retry_sleep *= 2
if retry_sleep > 60:
retry_sleep = 60
async def _listen(self): # pragma: no cover
channel = self.channel.encode('utf-8')
await self.pubsub.subscribe(self.channel)
async for message in self._redis_listen_with_retries():
if message['channel'] == channel and \
message['type'] == 'message' and 'data' in message:
yield message['data']
await self.pubsub.unsubscribe(self.channel)