"""Tests for the queue classes exposed by ``gevent.queue``.

Covers ``SimpleQueue``, ``Queue``, ``LifoQueue``, ``PriorityQueue`` and
``Channel``, plus get/put interruption behaviour driven by
``AbstractGenericGetTestCase``.
"""
import unittest

import gevent.testing as greentest
from gevent.testing import TestCase
import gevent
from gevent.hub import get_hub, LoopExit
from gevent import util
from gevent import queue
from gevent.queue import Empty, Full
from gevent.event import AsyncResult
from gevent.testing.timing import AbstractGenericGetTestCase

# pylint:disable=too-many-ancestors


def _receive_expecting_timeout(q, evt):
    """Try ``q.get()`` under a zero-second timeout and report via *evt*.

    The ``gevent.Timeout`` is armed with a ``RuntimeError`` instance; when
    that error is caught, ``'timed out'`` is stored in *evt*. If ``get``
    returns instead, the received item is stored (callers treat that as
    the unexpected path).
    """
    with gevent.Timeout(0, RuntimeError()):
        try:
            result = q.get()
            evt.set(result)  # pragma: no cover (should have raised)
        except RuntimeError:
            evt.set('timed out')


class UsesOnlyOneItemMixin:
    """Tests that only ever hold a single item in the queue.

    Relies on the concrete test class providing ``_makeOne`` and
    ``_getFUT``.
    """
    # These tests only place one item at a time in
    # the queue, so they can work for Queue, SimpleQueue, LifoQueue,
    # and Channel

    # Subclasses set this to False when an item cannot be put unless a
    # getter is already waiting (see TestChannel).
    SUPPORTS_PUTTING_WITHOUT_GETTING = True

    def test_put_nowait_simple(self):
        # Schedule two put_nowait calls as hub loop callbacks on a queue
        # created with maxsize 1 and inspect what each call produced.
        result = []
        q = self._makeOne(1)

        def store_result(func, *args):
            result.append(func(*args))

        run_callback = get_hub().loop.run_callback

        # wrap_errors is given ``Full`` so that it is recorded in
        # ``result`` rather than propagating out of the callback.
        run_callback(store_result, util.wrap_errors(Full, q.put_nowait), 2)
        run_callback(store_result, util.wrap_errors(Full, q.put_nowait), 3)
        gevent.sleep(0)
        self.assertEqual(len(result), 2)
        if self.SUPPORTS_PUTTING_WITHOUT_GETTING:
            self.assertIsNone(result[0], result)
        else:
            self.assertIsInstance(result[0], queue.Full, result)
        self.assertIsInstance(result[1], queue.Full, result)

    # put_nowait must work from the mainloop
    def test_put_nowait_unlock(self):
        # A greenlet is spawned on q.get; put_nowait is then invoked from
        # a hub loop callback and the getter is expected to finish.
        result = []
        q = self._makeOne()
        p = gevent.spawn(q.get)

        def store_result(func, *args):
            result.append(func(*args))

        self.assertTrue(q.empty(), q)
        # The expected value of q.full() depends on whether this queue
        # kind can hold an item without a getter.
        if self.SUPPORTS_PUTTING_WITHOUT_GETTING:
            assertFull = self.assertFalse
        else:
            assertFull = self.assertTrue
        assertFull(q.full(), q)
        gevent.sleep(0.001)

        self.assertTrue(q.empty(), q)
        assertFull(q.full(), q)

        get_hub().loop.run_callback(store_result, q.put_nowait, 10)

        self.assertFalse(p.ready(), p)
        gevent.sleep(0.001)

        self.assertEqual(result, [None])
        self.assertTrue(p.ready(), p)
        assertFull(q.full(), q)
        self.assertTrue(q.empty(), q)

    def test_send_last(self):
        # The getter is spawned first; the item is put afterwards.
        q = self._makeOne()

        def waiter(q):
            # A longer timeout is used when running on AppVeyor.
            with gevent.Timeout(0.1 if not greentest.RUNNING_ON_APPVEYOR else 0.5):
                self.assertEqual(q.get(), 'hi2')
            return "OK"

        p = gevent.spawn(waiter, q)
        gevent.sleep(0.01)
        q.put('hi2')
        gevent.sleep(0.01)
        self.assertEqual(p.get(timeout=0), "OK")

    def test_init_and_bottleneck_methods(self):
        # A subclass overriding _init/_get/_put must see each of them
        # called by the public constructor/get/put.
        if not self.SUPPORTS_PUTTING_WITHOUT_GETTING:
            self.skipTest('Needs to be able to put and get')

        # subclasses of stdlib queues.
        class X(self._getFUT()):
            initted = None
            get_count = 0
            put_count = 0

            def _init(self, maxsize):
                super()._init(maxsize)
                self.initted = True

            def _get(self):
                self.get_count += 1
                return super()._get()

            def _put(self, item):
                self.put_count += 1
                return super()._put(item)

        x = X()
        x.put('hi')
        self.assertEqual(x.get(), 'hi')
        self.assertEqual(x.put_count, 1)
        self.assertEqual(x.get_count, 1)
        self.assertTrue(x.initted)


class SubscriptMixin:
    """Provides ``_makeOne`` and a test that the queue class is subscriptable."""

    def _getFUT(self):
        """Return the queue class under test; concrete classes override this."""
        raise NotImplementedError

    def _makeOne(self, *args, **kwargs):
        """Instantiate the class under test with the given arguments."""
        return self._getFUT()(*args, **kwargs)

    def test_subscript(self):
        # ``kind[int]`` must work, and have the same type as the
        # subscripted stdlib class of the same name when one exists.
        import queue as stdlib_queue
        kind = self._getFUT()
        try:
            stdlib_kind = getattr(stdlib_queue, kind.__name__)
        except AttributeError:
            # Only Channel is expected to lack a stdlib counterpart.
            self.assertEqual(kind.__name__, 'Channel')
            import types
            self.assertIsInstance(kind[int], types.GenericAlias)
        else:
            self.assertIsNot(kind, stdlib_kind)
            self.assertIsInstance(kind[int], type(stdlib_kind[int]))


class TestSimpleQueue(SubscriptMixin, UsesOnlyOneItemMixin, TestCase):
    """Tests run against ``queue.SimpleQueue`` (and reused by ``TestQueue``)."""

    def _getFUT(self):
        return queue.SimpleQueue

    def test_get_nowait_simple(self):
        # One item is queued; two get_nowait calls are scheduled as hub
        # loop callbacks. The first yields the item, the second Empty.
        result = []
        q = self._makeOne(1)
        q.put(4)

        def store_result(func, *args):
            result.append(func(*args))

        run_callback = get_hub().loop.run_callback

        run_callback(store_result, util.wrap_errors(Empty, q.get_nowait))
        run_callback(store_result, util.wrap_errors(Empty, q.get_nowait))
        gevent.sleep(0)
        self.assertEqual(len(result), 2)
        self.assertEqual(result[0], 4)
        self.assertIsInstance(result[1], Empty)

    # get_nowait must work from the mainloop
    def test_get_nowait_unlock(self):
        # A putter greenlet fills the size-1 queue; get_nowait is then
        # invoked from a hub loop callback.
        result = []
        q = self._makeOne(1)
        p = gevent.spawn(q.put, 5)

        def store_result(func, *args):
            result.append(func(*args))

        self.assertTrue(q.empty(), q)
        gevent.sleep(0)
        self.assertTrue(q.full(), q)
        get_hub().loop.run_callback(store_result, q.get_nowait)
        gevent.sleep(0)
        self.assertTrue(q.empty(), q)
        self.assertEqual(result, [5])
        self.assertTrue(p.ready(), p)
        self.assertTrue(p.dead, p)
        self.assertTrue(q.empty(), q)

    def test_send_first(self):
        # The item is put before anyone asks for it; peek and get both
        # return it.
        self.switch_expected = False
        q = self._makeOne()
        q.put('hi')
        self.assertEqual(q.peek(), 'hi')
        self.assertEqual(q.get(), 'hi')

    def test_peek_empty(self):
        q = self._makeOne()
        # No putters waiting, in the main loop: LoopExit
        with self.assertRaises(LoopExit):
            q.peek()

        def waiter(q):
            # With a timeout, peek on the empty queue raises Empty.
            self.assertRaises(Empty, q.peek, timeout=0.01)

        g = gevent.spawn(waiter, q)
        gevent.sleep(0.1)
        g.join()

    def test_peek_multi_greenlet(self):
        # A greenlet is spawned on peek before the item is put; it must
        # finish without an exception and the item must still be peekable.
        q = self._makeOne()
        g = gevent.spawn(q.peek)
        g.start()
        gevent.sleep(0)
        q.put(1)
        g.join()
        self.assertIsNone(g.exception)
        self.assertEqual(q.peek(), 1)

    def test_max_size(self):
        # With maxsize 2 the putter records two puts, and records the
        # third only after an item has been removed.
        q = self._makeOne(2)
        results = []

        def putter(q):
            q.put('a')
            results.append('a')
            q.put('b')
            results.append('b')
            q.put('c')
            results.append('c')
            return "OK"

        p = gevent.spawn(putter, q)
        gevent.sleep(0)
        self.assertEqual(results, ['a', 'b'])
        self.assertEqual(q.get(), 'a')
        gevent.sleep(0)
        self.assertEqual(results, ['a', 'b', 'c'])
        self.assertEqual(q.get(), 'b')
        self.assertEqual(q.get(), 'c')
        self.assertEqual(p.get(timeout=0), "OK")

    def test_multiple_waiters(self):
        # tests that multiple waiters get their results back
        q = self._makeOne()

        def waiter(q, evt):
            evt.set(q.get())

        sendings = ['1', '2', '3', '4']
        # One AsyncResult (and one waiter greenlet) per item to be sent.
        evts = [AsyncResult() for _ in sendings]
        for evt in evts:
            gevent.spawn(waiter, q, evt)  # XXX use waitall for them

        gevent.sleep(0.01)  # get 'em all waiting

        results = set()

        def collect_pending_results():
            # Poll every event briefly; Timeout(..., False) exits the
            # ``with`` block silently for events that are not set yet.
            for e in evts:
                with gevent.Timeout(0.001, False):
                    x = e.get()
                    results.add(x)
            return len(results)

        q.put(sendings[0])
        self.assertEqual(collect_pending_results(), 1)
        q.put(sendings[1])
        self.assertEqual(collect_pending_results(), 2)
        q.put(sendings[2])
        q.put(sendings[3])
        self.assertEqual(collect_pending_results(), 4)

    def test_waiters_that_cancel(self):
        # A getter that timed out must not consume an item put later.
        q = self._makeOne()

        evt = AsyncResult()
        gevent.spawn(_receive_expecting_timeout, q, evt)
        self.assertEqual(evt.get(), 'timed out')

        q.put('hi')
        self.assertEqual(q.get(), 'hi')

    def test_senders_that_die(self):
        # The item put by a spawned greenlet is received by the caller.
        q = self._makeOne()

        def do_send(q):
            q.put('sent')

        gevent.spawn(do_send, q)
        self.assertEqual(q.get(), 'sent')

    def test_two_waiters_one_dies(self):
        # Of two getters, the one that timed out reports 'timed out' and
        # the surviving one receives the item.
        def waiter(q, evt):
            evt.set(q.get())

        q = self._makeOne()
        dying_evt = AsyncResult()
        waiting_evt = AsyncResult()
        gevent.spawn(_receive_expecting_timeout, q, dying_evt)
        gevent.spawn(waiter, q, waiting_evt)
        gevent.sleep(0.1)
        q.put('hi')
        self.assertEqual(dying_evt.get(), 'timed out')
        self.assertEqual(waiting_evt.get(), 'hi')

    def test_two_bogus_waiters(self):
        # Both getters time out; the item put afterwards is still there
        # for a regular get.
        q = self._makeOne()
        e1 = AsyncResult()
        e2 = AsyncResult()
        gevent.spawn(_receive_expecting_timeout, q, e1)
        gevent.spawn(_receive_expecting_timeout, q, e2)
        gevent.sleep(0.1)
        q.put('sent')
        self.assertEqual(e1.get(), 'timed out')
        self.assertEqual(e2.get(), 'timed out')
        self.assertEqual(q.get(), 'sent')

    def test_subclass_assign_queue(self):
        # https://github.com/gevent/gevent/issues/2136
        # Instances expose a ``queue`` attribute, and a subclass may
        # assign its own object to it from ``_init``.
        self.assertTrue(hasattr(self._makeOne(), 'queue'))
        my_queue = []

        class Q(self._getFUT()):
            def _init(self, _maxsize):  # pylint: disable=arguments-differ
                self.queue = my_queue

        q = Q()
        self.assertIs(q.queue, my_queue)


class TestChannel(SubscriptMixin, UsesOnlyOneItemMixin, TestCase):
    """Tests run against ``queue.Channel``."""

    SUPPORTS_PUTTING_WITHOUT_GETTING = False

    def _getFUT(self):
        return queue.Channel

    def test_get_nowait_unlock_channel(self):
        # get_nowait runs fine in the hub, and
        # it switches to a waiting putter if needed.
        result = []
        q = self._makeOne()
        p = gevent.spawn(q.put, 5)

        def store_result(func, *args):
            result.append(func(*args))

        # The channel reports itself both empty and full throughout.
        self.assertTrue(q.empty())
        self.assertTrue(q.full())

        gevent.sleep(0.001)
        self.assertTrue(q.empty())
        self.assertTrue(q.full())

        get_hub().loop.run_callback(store_result, q.get_nowait)

        gevent.sleep(0.001)
        self.assertTrue(q.empty())
        self.assertTrue(q.full())
        self.assertEqual(result, [5])
        self.assertTrue(p.ready())
        self.assertTrue(p.dead)
        self.assertTrue(q.empty())

    def test_zero_max_size(self):
        # The sender does not complete its put until a receiver arrives.
        q = self._makeOne()

        def sender(evt, q):
            q.put('hi')
            evt.set('done')

        def receiver(evt, q):
            x = q.get()
            evt.set(x)

        e1 = AsyncResult()
        e2 = AsyncResult()

        p1 = gevent.spawn(sender, e1, q)
        gevent.sleep(0.001)
        self.assertFalse(e1.ready())
        p2 = gevent.spawn(receiver, e2, q)
        self.assertEqual(e2.get(), 'hi')
        self.assertEqual(e1.get(), 'done')
        with gevent.Timeout(0):
            gevent.joinall([p1, p2])

    def test_send(self):
        # Records the interleaving of events when this greenlet puts and
        # a spawned greenlet gets.
        channel = self._makeOne()

        events = []

        def another_greenlet():
            events.append(channel.get())
            events.append(channel.get())

        g = gevent.spawn(another_greenlet)

        events.append('sending')
        channel.put('hello')
        events.append('sent hello')
        channel.put('world')
        events.append('sent world')

        self.assertEqual(['sending', 'hello', 'sent hello', 'world', 'sent world'], events)
        g.get()

    def test_wait(self):
        # Records the interleaving of events when this greenlet gets and
        # a spawned greenlet puts.
        channel = self._makeOne()
        events = []

        def another_greenlet():
            events.append('sending hello')
            channel.put('hello')
            events.append('sending world')
            channel.put('world')
            events.append('sent world')

        g = gevent.spawn(another_greenlet)

        events.append('waiting')
        events.append(channel.get())
        events.append(channel.get())

        self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world'], events)
        gevent.sleep(0)
        self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world', 'sent world'], events)
        g.get()

    def test_iterable(self):
        # Putting StopIteration ends iteration over the channel, so the
        # resulting list is empty.
        channel = self._makeOne()
        gevent.spawn(channel.put, StopIteration)
        r = list(channel)
        self.assertEqual(r, [])


class TestQueue(TestSimpleQueue):
    """Re-runs the ``TestSimpleQueue`` tests against ``queue.Queue`` and
    adds task tracking and shutdown tests."""

    # Exposes the module as ``self.queue`` so the shutdown tests can
    # refer to ``self.queue.ShutDown``.
    queue = queue

    def _getFUT(self):
        return queue.Queue

    def test_task_done(self):
        # unfinished_tasks goes 1 -> 0 across get()/task_done().
        channel = self._makeOne()
        X = object()
        gevent.spawn(channel.put, X)
        result = channel.get()
        self.assertIs(result, X)
        self.assertEqual(1, channel.unfinished_tasks)
        channel.task_done()
        self.assertEqual(0, channel.unfinished_tasks)

    def _shutdown_all_methods_in_one_thread(self, immediate):
        """Exercise put/get/task_done/join after ``shutdown(immediate)``.

        :param immediate: Passed straight through to ``q.shutdown``.
        """
        q = self._makeOne()
        q.put("L")
        q.put_nowait("O")
        q.shutdown(immediate)

        # Putting is rejected after shutdown in both modes.
        with self.assertRaises(self.queue.ShutDown):
            q.put("E")
        with self.assertRaises(self.queue.ShutDown):
            q.put_nowait("W")
        if immediate:
            with self.assertRaises(self.queue.ShutDown):
                q.get()
            with self.assertRaises(self.queue.ShutDown):
                q.get_nowait()
            with self.assertRaises(ValueError):
                q.task_done()
            q.join()
        else:
            # The two items queued before shutdown can still be drained.
            self.assertIn(q.get(), "LO")
            q.task_done()
            self.assertIn(q.get(), "LO")
            q.task_done()
            q.join()
            # on shutdown(immediate=False)
            # when queue is empty, should raise ShutDown Exception
            with self.assertRaises(self.queue.ShutDown):
                q.get()  # p.get(True)
            with self.assertRaises(self.queue.ShutDown):
                q.get_nowait()  # p.get(False)
            with self.assertRaises(self.queue.ShutDown):
                q.get(True, 1.0)

    def test_shutdown_all_methods_in_one_thread(self):
        return self._shutdown_all_methods_in_one_thread(False)

    def test_shutdown_immediate_all_methods_in_one_thread(self):
        return self._shutdown_all_methods_in_one_thread(True)

    def test_issue_45(self):
        """Test that join() exits immediately if no jobs were put into the queue"""
        self.switch_expected = False
        q = self._makeOne()
        q.join()


class TestLifoQueue(SubscriptMixin, TestCase):
    """Runs the subscript test against ``queue.LifoQueue``."""

    def _getFUT(self):
        return queue.LifoQueue


class TestPriorityQueue(SubscriptMixin, TestCase):
    """Runs the subscript test against ``queue.PriorityQueue``."""

    def _getFUT(self):
        return queue.PriorityQueue


class AbstractTestWeakRefMixin:
    """Adds a test that instances built by ``_makeOne`` can be weakly referenced."""

    def test_weak_reference(self):
        import weakref
        one = self._makeOne()
        ref = weakref.ref(one)
        self.assertIs(one, ref())


class TestGetInterrupt(AbstractTestWeakRefMixin, AbstractGenericGetTestCase):
    """Supplies ``wait``/``Timeout`` to ``AbstractGenericGetTestCase`` for a
    blocking ``get``.

    The test methods themselves come from the base class, which is not
    defined in this module.
    """

    # Exception the base class is told to expect from ``wait``.
    Timeout = Empty

    # Queue class to instantiate; subclasses override this.
    kind = queue.SimpleQueue

    def wait(self, timeout):
        return self._makeOne().get(timeout=timeout)

    def _makeOne(self):
        return self.kind()


class TestGetInterruptJoinableQueue(TestGetInterrupt):
    kind = queue.Queue


class TestGetInterruptLifoQueue(TestGetInterrupt):
    kind = queue.LifoQueue


class TestGetInterruptPriorityQueue(TestGetInterrupt):
    kind = queue.PriorityQueue


class TestGetInterruptChannel(TestGetInterrupt):
    kind = queue.Channel


class TestPutInterrupt(AbstractGenericGetTestCase):
    """Supplies ``wait``/``Timeout`` to ``AbstractGenericGetTestCase`` for a
    blocking ``put``."""

    kind = queue.SimpleQueue
    Timeout = Full

    def setUp(self):
        super().setUp()
        self.queue = self._makeOne()

    def wait(self, timeout):
        # Fill the queue first so that the final put has to wait.
        while not self.queue.full():
            self.queue.put(1)
        return self.queue.put(2, timeout=timeout)

    def _makeOne(self):
        return self.kind(1)


class TestPutInterruptJoinableQueue(TestPutInterrupt):
    kind = queue.Queue


class TestPutInterruptLifoQueue(TestPutInterrupt):
    kind = queue.LifoQueue


class TestPutInterruptPriorityQueue(TestPutInterrupt):
    kind = queue.PriorityQueue


class TestPutInterruptChannel(TestPutInterrupt):
    kind = queue.Channel

    def _makeOne(self):
        # Unlike the other kinds, Channel is built without a size argument.
        return self.kind()


class TestGetInterruptSimpleQueue(TestGetInterrupt):
    """``TestGetInterrupt`` for ``SimpleQueue`` with three inherited tests
    skipped as "Not supported"."""

    kind = queue.SimpleQueue

    def test_raises_timeout_Timeout(self):
        raise unittest.SkipTest("Not supported")

    # The same skipping stub replaces these two inherited tests as well.
    test_raises_timeout_Timeout_exc_customized = test_raises_timeout_Timeout
    test_outer_timeout_is_not_lost = test_raises_timeout_Timeout


# Remove the imported abstract base from the module namespace.
del AbstractGenericGetTestCase


if __name__ == '__main__':
    greentest.main()