__ __ __ __ _____ _ _ _____ _ _ _ | \/ | \ \ / / | __ \ (_) | | / ____| | | | | | \ / |_ __\ V / | |__) | __ ___ ____ _| |_ ___ | (___ | |__ ___| | | | |\/| | '__|> < | ___/ '__| \ \ / / _` | __/ _ \ \___ \| '_ \ / _ \ | | | | | | |_ / . \ | | | | | |\ V / (_| | || __/ ____) | | | | __/ | | |_| |_|_(_)_/ \_\ |_| |_| |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1 if you need WebShell for Seo everyday contact me on Telegram Telegram Address : @jackleetFor_More_Tools:
# -*- test-case-name: twisted.internet.test.test_defer_yieldfrom -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for C{yield from} support in Deferreds.
"""
import types
from twisted.internet.defer import Deferred, ensureDeferred, fail, succeed
from twisted.internet.task import Clock
from twisted.trial.unittest import TestCase
class YieldFromTests(TestCase):
"""
Tests for using Deferreds in conjunction with PEP-380.
"""
def test_ensureDeferred(self) -> None:
"""
L{ensureDeferred} will turn a coroutine into a L{Deferred}.
"""
def run():
d = succeed("foo")
res = yield from d
return res
# It's a generator...
r = run()
self.assertIsInstance(r, types.GeneratorType)
# Now it's a Deferred.
d = ensureDeferred(r)
self.assertIsInstance(d, Deferred)
# The Deferred has the result we want.
res = self.successResultOf(d)
self.assertEqual(res, "foo")
def test_DeferredfromCoroutine(self) -> None:
"""
L{Deferred.fromCoroutine} will turn a coroutine into a L{Deferred}.
"""
def run():
d = succeed("bar")
yield from d
res = yield from run2()
return res
def run2():
d = succeed("foo")
res = yield from d
return res
# It's a generator...
r = run()
self.assertIsInstance(r, types.GeneratorType)
# Now it's a Deferred.
d = Deferred.fromCoroutine(r)
self.assertIsInstance(d, Deferred)
# The Deferred has the result we want.
res = self.successResultOf(d)
self.assertEqual(res, "foo")
def test_basic(self) -> None:
"""
L{Deferred.fromCoroutine} allows a function to C{yield from} a
L{Deferred}.
"""
def run():
d = succeed("foo")
res = yield from d
return res
d = Deferred.fromCoroutine(run())
res = self.successResultOf(d)
self.assertEqual(res, "foo")
def test_exception(self) -> None:
"""
An exception in a generator scheduled with L{Deferred.fromCoroutine}
will cause the returned L{Deferred} to fire with a failure.
"""
def run():
d = succeed("foo")
yield from d
raise ValueError("Oh no!")
d = Deferred.fromCoroutine(run())
res = self.failureResultOf(d)
self.assertEqual(type(res.value), ValueError)
self.assertEqual(res.value.args, ("Oh no!",))
def test_twoDeep(self) -> None:
"""
An exception in a generator scheduled with L{Deferred.fromCoroutine}
will cause the returned L{Deferred} to fire with a failure.
"""
reactor = Clock()
sections = []
def runone():
sections.append(2)
d = Deferred()
reactor.callLater(1, d.callback, None)
yield from d
sections.append(3)
return "Yay!"
def run():
sections.append(1)
result = yield from runone()
sections.append(4)
d = Deferred()
reactor.callLater(1, d.callback, None)
yield from d
sections.append(5)
return result
d = Deferred.fromCoroutine(run())
reactor.advance(0.9)
self.assertEqual(sections, [1, 2])
reactor.advance(0.1)
self.assertEqual(sections, [1, 2, 3, 4])
reactor.advance(0.9)
self.assertEqual(sections, [1, 2, 3, 4])
reactor.advance(0.1)
self.assertEqual(sections, [1, 2, 3, 4, 5])
res = self.successResultOf(d)
self.assertEqual(res, "Yay!")
def test_reraise(self) -> None:
"""
Yielding from an already failed Deferred will raise the exception.
"""
def test():
try:
yield from fail(ValueError("Boom"))
except ValueError as e:
self.assertEqual(e.args, ("Boom",))
return 1
return 0
res = self.successResultOf(Deferred.fromCoroutine(test()))
self.assertEqual(res, 1)
def test_chained(self) -> None:
"""
Yielding from a paused & chained Deferred will give the result when it
has one.
"""
reactor = Clock()
def test():
d = Deferred()
d2 = Deferred()
d.addCallback(lambda ignored: d2)
d.callback(None)
reactor.callLater(0, d2.callback, "bye")
res = yield from d
return res
d = Deferred.fromCoroutine(test())
reactor.advance(0.1)
res = self.successResultOf(d)
self.assertEqual(res, "bye")
| Name | Type | Size | Permission | Actions |
|---|---|---|---|---|
| __pycache__ | Folder | 0755 |
|
|
| fake_CAs | Folder | 0755 |
|
|
| __init__.py | File | 112 B | 0644 |
|
| _posixifaces.py | File | 4.29 KB | 0644 |
|
| _win32ifaces.py | File | 3.88 KB | 0644 |
|
| connectionmixins.py | File | 19.71 KB | 0644 |
|
| fakeendpoint.py | File | 1.62 KB | 0644 |
|
| modulehelpers.py | File | 1.63 KB | 0644 |
|
| process_cli.py | File | 547 B | 0644 |
|
| process_connectionlost.py | File | 126 B | 0644 |
|
| process_gireactornocompat.py | File | 792 B | 0644 |
|
| process_helper.py | File | 1.22 KB | 0644 |
|
| reactormixins.py | File | 16.05 KB | 0644 |
|
| test_abstract.py | File | 2.15 KB | 0644 |
|
| test_address.py | File | 8.07 KB | 0644 |
|
| test_asyncioreactor.py | File | 9.7 KB | 0644 |
|
| test_base.py | File | 14.44 KB | 0644 |
|
| test_baseprocess.py | File | 2.53 KB | 0644 |
|
| test_cfreactor.py | File | 3.09 KB | 0644 |
|
| test_core.py | File | 10.86 KB | 0644 |
|
| test_default.py | File | 3.5 KB | 0644 |
|
| test_defer_await.py | File | 6.66 KB | 0644 |
|
| test_defer_yieldfrom.py | File | 4.79 KB | 0644 |
|
| test_endpoints.py | File | 147.25 KB | 0644 |
|
| test_epollreactor.py | File | 7.15 KB | 0644 |
|
| test_error.py | File | 1.08 KB | 0644 |
|
| test_fdset.py | File | 13.18 KB | 0644 |
|
| test_filedescriptor.py | File | 2.7 KB | 0644 |
|
| test_gireactor.py | File | 7.18 KB | 0644 |
|
| test_glibbase.py | File | 3.02 KB | 0644 |
|
| test_inlinecb.py | File | 11.21 KB | 0644 |
|
| test_inotify.py | File | 18.13 KB | 0644 |
|
| test_iocp.py | File | 7.52 KB | 0644 |
|
| test_kqueuereactor.py | File | 1.93 KB | 0644 |
|
| test_main.py | File | 1.32 KB | 0644 |
|
| test_newtls.py | File | 6.38 KB | 0644 |
|
| test_pollingfile.py | File | 1.28 KB | 0644 |
|
| test_posixbase.py | File | 11.56 KB | 0644 |
|
| test_posixprocess.py | File | 10.94 KB | 0644 |
|
| test_process.py | File | 44.6 KB | 0644 |
|
| test_protocol.py | File | 18.08 KB | 0644 |
|
| test_reactormixins.py | File | 1.93 KB | 0644 |
|
| test_resolver.py | File | 19.15 KB | 0644 |
|
| test_serialport.py | File | 1.94 KB | 0644 |
|
| test_sigchld.py | File | 3.25 KB | 0644 |
|
| test_socket.py | File | 9.22 KB | 0644 |
|
| test_stdio.py | File | 6.18 KB | 0644 |
|
| test_tcp.py | File | 105.7 KB | 0644 |
|
| test_testing.py | File | 16.49 KB | 0644 |
|
| test_threads.py | File | 7.94 KB | 0644 |
|
| test_time.py | File | 3.57 KB | 0644 |
|
| test_tls.py | File | 12.54 KB | 0644 |
|
| test_udp.py | File | 16.48 KB | 0644 |
|
| test_udp_internals.py | File | 4.98 KB | 0644 |
|
| test_unix.py | File | 34.04 KB | 0644 |
|
| test_win32events.py | File | 6.3 KB | 0644 |
|
| test_win32serialport.py | File | 5.18 KB | 0644 |
|