mmpSearch/venv/lib/python3.12/site-packages/gevent/tests/test__selectors.py

172 lines
5.6 KiB
Python

# Tests for gevent.selectors in its native form, without
# monkey-patching.
import gevent
from gevent import socket
from gevent import selectors
import gevent.testing as greentest
from gevent.testing import timing
class SelectorTestMixin(object):
    """Helpers shared by test cases that drive a selector over sockets."""

    # When true, the read callback unregisters and closes the socket
    # after it has replied.
    unregister_after_send = True

    @staticmethod
    def run_selector_once(sel, timeout=3):
        """
        Call ``sel.select`` a single time and dispatch every ready key.

        The ``data`` registered with each key is invoked as
        ``data(sel, fileobj, mask)``, and we yield to other greenlets
        after each invocation.
        """
        # Run in a background greenlet, leaving the main
        # greenlet free to send data.
        for ready_key, ready_mask in sel.select(timeout=timeout):
            ready_key.data(sel, ready_key.fileobj, ready_mask)
            gevent.sleep()

    def read_from_ready_socket_and_reply(self, selector, conn, _events):
        """
        Selector callback: echo back whatever *conn* has pending.

        Afterwards, if ``unregister_after_send`` is set, *conn* is
        unregistered from *selector* and closed.
        """
        received = conn.recv(100)  # Should be ready
        if received:
            conn.send(received)  # Hope it won't block

        if not self.unregister_after_send:
            return
        # Must unregister before we close.
        selector.unregister(conn)
        conn.close()

    def _check_selector(self, sel):
        """
        Verify that bytes sent over a socket pair are echoed back via *sel*.

        The server end is registered for reading with the echo callback
        and serviced by a background greenlet; the client end sends a
        payload and must read the same bytes back. *sel* and both sockets
        are closed before returning.
        """
        server, client = socket.socketpair()
        runner = None
        payload = b'abcdef'
        try:
            sel.register(server, selectors.EVENT_READ,
                         self.read_from_ready_socket_and_reply)
            runner = gevent.spawn(self.run_selector_once, sel)
            client.send(payload)
            # here is probably where we yield to the event loop
            echoed = client.recv(50)
            self.assertEqual(echoed, payload)
        finally:
            sel.close()
            server.close()
            client.close()
            if runner is not None:
                # Bounded wait so a stuck greenlet can't hang the test run.
                runner.join(10)
        self.assertTrue(runner is not None and runner.ready())
class GeventSelectorTest(SelectorTestMixin,
                         greentest.TestCase):
    """Exercise ``GeventSelector`` directly over connected socket pairs."""

    def test_select_using_socketpair(self):
        # Basic test.
        with selectors.GeventSelector() as sel:
            self._check_selector(sel)

    def test_select_many_sockets(self):
        """
        Register ten socket pairs at once and check every one is serviced.

        Each client sends its index; the server side echoes it back via
        the shared read callback, and each client must then read back
        exactly the index it was registered with.
        """
        # Not every platform has AF_UNIX; None never matches a real family.
        AF_UNIX = getattr(socket, 'AF_UNIX', None)

        pairs = [socket.socketpair() for _ in range(10)]

        try:
            # The selectors are managed by ``with`` instead of being closed
            # by name in ``finally``: if constructing either one failed,
            # the cleanup would hit an unbound local and never reach the
            # loop that closes the sockets.
            with selectors.GeventSelector() as server_sel, \
                    selectors.GeventSelector() as client_sel:
                for i, (server, client) in enumerate(pairs):
                    server_sel.register(server, selectors.EVENT_READ,
                                        self.read_from_ready_socket_and_reply)
                    client_sel.register(client, selectors.EVENT_READ, i)
                    # Prime them all to be ready at once.
                    data = str(i).encode('ascii')
                    client.send(data)

                # Read and reply to all the clients..
                # Everyone should be ready, so we ask not to block.
                # The call to gevent.idle() is there to make sure that
                # all event loop implementations (looking at you, libuv)
                # get a chance to poll for IO. Without it, libuv
                # doesn't find any results here.
                # Not blocking only works for AF_UNIX sockets, though.
                # If we got AF_INET (Windows) the data may need some time to
                # traverse through the layers.
                gevent.idle()
                self.run_selector_once(
                    server_sel,
                    timeout=-1 if pairs[0][0].family == AF_UNIX else 3)

                found = 0
                for key, _ in client_sel.select(timeout=3):
                    expected = str(key.data).encode('ascii')
                    data = key.fileobj.recv(50)
                    self.assertEqual(data, expected)
                    found += 1
                self.assertEqual(found, len(pairs))
        finally:
            # Runs after the selectors have been closed by the ``with``.
            for pair in pairs:
                for s in pair:
                    s.close()
@greentest.skipOnWindows("Things like os.close don't work on Windows")
class TestPossibleCrashes(greentest.TestCase):
    """
    Tests for the crashes and unexpected exceptions
    that happen when we try to use or create (depending on
    loop implementation) an IO watcher for a closed/invalid file descriptor.

    See https://github.com/gevent/gevent/issues/2100
    See test__select.py
    """

    def test_closing_object_while_selecting(self):
        """Close the socket object from another greenlet during ``select``."""
        victim = socket.socket()
        self.addCleanup(victim.close)
        gevent.spawn(victim.close)

        selector = selectors.GeventSelector()
        selector.register(victim, selectors.EVENT_READ)
        # This call needs to be blocking so we get all the way
        # to having an open, started IO watcher when the
        # socket gets closed.
        with selector:
            selector.select(timeout=timing.SMALLEST_RELIABLE_DELAY)

    def _close_invalid_sock(self, sock):
        """Close *sock*, ignoring the ``OSError`` from its already-closed FD."""
        # Because we closed the FD already (which raises EBADF when done again), but we
        # still need to take care of the gevent-resources
        try:
            sock.close()
        except OSError:
            pass

    def test_closing_fd_while_selecting(self):
        """Close the raw file descriptor from another greenlet during ``select``."""
        # using regular os.close will
        # crash under libuv
        from gevent import os
        victim = socket.socket()
        self.addCleanup(self._close_invalid_sock, victim)
        gevent.spawn(os.close, victim.fileno())

        selector = selectors.GeventSelector()
        selector.register(victim, selectors.EVENT_READ)
        with selector:
            selector.select(timeout=timing.SMALLEST_RELIABLE_DELAY)

    def test_closing_fd_before_selecting(self):
        """Registering a socket whose FD is already closed raises ``ValueError``."""
        import os
        victim = socket.socket()
        self.addCleanup(self._close_invalid_sock, victim)
        os.close(victim.fileno())

        selector = selectors.GeventSelector()
        with self.assertRaisesRegex(ValueError, 'Invalid file'):
            selector.register(victim, selectors.EVENT_READ)
# Run the tests through gevent's test runner when executed as a script.
if __name__ == '__main__':
    greentest.main()