__ __ __ __ _____ _ _ _____ _ _ _ | \/ | \ \ / / | __ \ (_) | | / ____| | | | | | \ / |_ __\ V / | |__) | __ ___ ____ _| |_ ___ | (___ | |__ ___| | | | |\/| | '__|> < | ___/ '__| \ \ / / _` | __/ _ \ \___ \| '_ \ / _ \ | | | | | | |_ / . \ | | | | | |\ V / (_| | || __/ ____) | | | | __/ | | |_| |_|_(_)_/ \_\ |_| |_| |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1 if you need WebShell for Seo everyday contact me on Telegram Telegram Address : @jackleetFor_More_Tools:
# -*- test-case-name: twisted.internet.test.test_defer_await -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for C{await} support in Deferreds.
"""
import types
from typing import NoReturn
from twisted.internet.defer import (
Deferred,
ensureDeferred,
fail,
maybeDeferred,
succeed,
)
from twisted.internet.task import Clock
from twisted.python.failure import Failure
from twisted.trial.unittest import TestCase
class SampleException(Exception):
    """
    A specific sample exception for testing.

    Tests in this module raise it on purpose to produce a known failure.
    """
class AwaitTests(TestCase):
    """
    Tests for using Deferreds in conjunction with PEP-492.
    """

    def test_awaitReturnsIterable(self) -> None:
        """
        C{Deferred.__await__} returns an iterable.
        """
        d: Deferred[None] = Deferred()
        awaitedDeferred = d.__await__()
        # Calling iter() on the returned object must hand back an object
        # that compares equal to it.
        self.assertEqual(awaitedDeferred, iter(awaitedDeferred))

    def test_deferredFromCoroutine(self) -> None:
        """
        L{Deferred.fromCoroutine} will turn a coroutine into a L{Deferred}.
        """

        async def run() -> str:
            # The result of this first await ("bar") is deliberately
            # discarded; only the value from run2() is returned.
            d = succeed("bar")
            await d
            res = await run2()
            return res

        async def run2() -> str:
            d = succeed("foo")
            res = await d
            return res

        # It's a coroutine...
        r = run()
        self.assertIsInstance(r, types.CoroutineType)

        # Now it's a Deferred.
        d = Deferred.fromCoroutine(r)
        self.assertIsInstance(d, Deferred)

        # The Deferred has the result we want.
        res = self.successResultOf(d)
        self.assertEqual(res, "foo")

    def test_basic(self) -> None:
        """
        L{Deferred.fromCoroutine} allows a function to C{await} on a
        L{Deferred}.
        """

        async def run() -> str:
            d = succeed("foo")
            res = await d
            return res

        d = Deferred.fromCoroutine(run())
        res = self.successResultOf(d)
        self.assertEqual(res, "foo")

    def test_basicEnsureDeferred(self) -> None:
        """
        L{ensureDeferred} allows a function to C{await} on a L{Deferred}.
        """

        async def run() -> str:
            d = succeed("foo")
            res = await d
            return res

        d = ensureDeferred(run())
        res = self.successResultOf(d)
        self.assertEqual(res, "foo")

    def test_exception(self) -> None:
        """
        An exception in a coroutine scheduled with L{Deferred.fromCoroutine}
        will cause the returned L{Deferred} to fire with a failure.
        """

        async def run() -> NoReturn:
            d = succeed("foo")
            await d
            raise ValueError("Oh no!")

        d = Deferred.fromCoroutine(run())
        res = self.failureResultOf(d)
        # Check the exact exception type and its arguments, not just that
        # some failure occurred.
        self.assertEqual(type(res.value), ValueError)
        self.assertEqual(res.value.args, ("Oh no!",))

    def test_synchronousDeferredFailureTraceback(self) -> None:
        """
        When a Deferred is awaited upon that has already failed with a Failure
        that has a traceback, both the place that the synchronous traceback
        comes from and the awaiting line are shown in the traceback.
        """

        def raises() -> None:
            raise SampleException()

        # The Deferred is created (and has already failed) before the
        # coroutine that awaits it starts running.
        it = maybeDeferred(raises)

        async def doomed() -> None:
            return await it

        failure = self.failureResultOf(Deferred.fromCoroutine(doomed()))

        # The helper names "doomed" and "raises" are asserted on literally,
        # so they must not be renamed.
        self.assertIn(", in doomed\n", failure.getTraceback())
        self.assertIn(", in raises\n", failure.getTraceback())

    def test_asyncDeferredFailureTraceback(self) -> None:
        """
        When a Deferred is awaited upon that later fails with a Failure that
        has a traceback, both the place that the synchronous traceback comes
        from and the awaiting line are shown in the traceback.
        """

        def returnsFailure() -> Failure:
            # Build the Failure inside the except block so it is created
            # while the exception is being handled.
            try:
                raise SampleException()
            except SampleException:
                return Failure()

        it: Deferred[None] = Deferred()

        async def doomed() -> None:
            return await it

        started = Deferred.fromCoroutine(doomed())
        # The awaited Deferred has not fired yet, so neither has the
        # coroutine's Deferred.
        self.assertNoResult(started)

        it.errback(returnsFailure())

        failure = self.failureResultOf(started)

        # The helper names "doomed" and "returnsFailure" are asserted on
        # literally, so they must not be renamed.
        self.assertIn(", in doomed\n", failure.getTraceback())
        self.assertIn(", in returnsFailure\n", failure.getTraceback())

    def test_twoDeep(self) -> None:
        """
        A coroutine scheduled with L{Deferred.fromCoroutine} that awaits a
        L{Deferred} suspends its execution until the inner L{Deferred} fires.
        """
        reactor = Clock()
        # Records the order in which the numbered checkpoints are reached.
        sections = []

        async def runone() -> str:
            sections.append(2)
            d: Deferred[int] = Deferred()
            reactor.callLater(1, d.callback, 2)
            await d
            sections.append(3)
            return "Yay!"

        async def run() -> str:
            sections.append(1)
            result = await runone()
            sections.append(4)
            d: Deferred[int] = Deferred()
            reactor.callLater(1, d.callback, 1)
            await d
            sections.append(5)
            return result

        d = Deferred.fromCoroutine(run())

        # Just short of the first 1-second delayed call: both coroutines
        # have started but are suspended.
        reactor.advance(0.9)
        self.assertEqual(sections, [1, 2])

        # The first delayed call fires: runone() finishes and run() proceeds
        # to schedule its own 1-second delayed call.
        reactor.advance(0.1)
        self.assertEqual(sections, [1, 2, 3, 4])

        # Just short of the second delayed call: nothing new happens.
        reactor.advance(0.9)
        self.assertEqual(sections, [1, 2, 3, 4])

        # The second delayed call fires and run() completes.
        reactor.advance(0.1)
        self.assertEqual(sections, [1, 2, 3, 4, 5])

        res = self.successResultOf(d)
        self.assertEqual(res, "Yay!")

    def test_reraise(self) -> None:
        """
        Awaiting an already failed Deferred will raise the exception.
        """

        async def test() -> int:
            try:
                await fail(ValueError("Boom"))
            except ValueError as e:
                self.assertEqual(e.args, ("Boom",))
                return 1
            # Only reached if the await did not raise.
            return 0

        res = self.successResultOf(Deferred.fromCoroutine(test()))
        self.assertEqual(res, 1)

    def test_chained(self) -> None:
        """
        Awaiting a paused & chained Deferred will give the result when it has
        one.
        """
        reactor = Clock()

        async def test() -> None:
            d: Deferred[None] = Deferred()
            d2: Deferred[None] = Deferred()
            # The callback returns the still-unfired d2, so firing d
            # leaves it chained to d2.
            d.addCallback(lambda ignored: d2)

            d.callback(None)
            reactor.callLater(0, d2.callback, "bye")
            return await d

        d = Deferred.fromCoroutine(test())
        # Advancing the clock runs the delayed call that fires d2.
        reactor.advance(0.1)

        res = self.successResultOf(d)
        self.assertEqual(res, "bye")
| Name | Type | Size | Permission | Actions |
|---|---|---|---|---|
| __pycache__ | Folder | 0755 |
|
|
| fake_CAs | Folder | 0755 |
|
|
| __init__.py | File | 112 B | 0644 |
|
| _posixifaces.py | File | 4.29 KB | 0644 |
|
| _win32ifaces.py | File | 3.88 KB | 0644 |
|
| connectionmixins.py | File | 19.71 KB | 0644 |
|
| fakeendpoint.py | File | 1.62 KB | 0644 |
|
| modulehelpers.py | File | 1.63 KB | 0644 |
|
| process_cli.py | File | 547 B | 0644 |
|
| process_connectionlost.py | File | 126 B | 0644 |
|
| process_gireactornocompat.py | File | 792 B | 0644 |
|
| process_helper.py | File | 1.22 KB | 0644 |
|
| reactormixins.py | File | 16.05 KB | 0644 |
|
| test_abstract.py | File | 2.15 KB | 0644 |
|
| test_address.py | File | 8.07 KB | 0644 |
|
| test_asyncioreactor.py | File | 9.7 KB | 0644 |
|
| test_base.py | File | 14.44 KB | 0644 |
|
| test_baseprocess.py | File | 2.53 KB | 0644 |
|
| test_cfreactor.py | File | 3.09 KB | 0644 |
|
| test_core.py | File | 10.86 KB | 0644 |
|
| test_default.py | File | 3.5 KB | 0644 |
|
| test_defer_await.py | File | 6.66 KB | 0644 |
|
| test_defer_yieldfrom.py | File | 4.79 KB | 0644 |
|
| test_endpoints.py | File | 147.25 KB | 0644 |
|
| test_epollreactor.py | File | 7.15 KB | 0644 |
|
| test_error.py | File | 1.08 KB | 0644 |
|
| test_fdset.py | File | 13.18 KB | 0644 |
|
| test_filedescriptor.py | File | 2.7 KB | 0644 |
|
| test_gireactor.py | File | 7.18 KB | 0644 |
|
| test_glibbase.py | File | 3.02 KB | 0644 |
|
| test_inlinecb.py | File | 11.21 KB | 0644 |
|
| test_inotify.py | File | 18.13 KB | 0644 |
|
| test_iocp.py | File | 7.52 KB | 0644 |
|
| test_kqueuereactor.py | File | 1.93 KB | 0644 |
|
| test_main.py | File | 1.32 KB | 0644 |
|
| test_newtls.py | File | 6.38 KB | 0644 |
|
| test_pollingfile.py | File | 1.28 KB | 0644 |
|
| test_posixbase.py | File | 11.56 KB | 0644 |
|
| test_posixprocess.py | File | 10.94 KB | 0644 |
|
| test_process.py | File | 44.6 KB | 0644 |
|
| test_protocol.py | File | 18.08 KB | 0644 |
|
| test_reactormixins.py | File | 1.93 KB | 0644 |
|
| test_resolver.py | File | 19.15 KB | 0644 |
|
| test_serialport.py | File | 1.94 KB | 0644 |
|
| test_sigchld.py | File | 3.25 KB | 0644 |
|
| test_socket.py | File | 9.22 KB | 0644 |
|
| test_stdio.py | File | 6.18 KB | 0644 |
|
| test_tcp.py | File | 105.7 KB | 0644 |
|
| test_testing.py | File | 16.49 KB | 0644 |
|
| test_threads.py | File | 7.94 KB | 0644 |
|
| test_time.py | File | 3.57 KB | 0644 |
|
| test_tls.py | File | 12.54 KB | 0644 |
|
| test_udp.py | File | 16.48 KB | 0644 |
|
| test_udp_internals.py | File | 4.98 KB | 0644 |
|
| test_unix.py | File | 34.04 KB | 0644 |
|
| test_win32events.py | File | 6.3 KB | 0644 |
|
| test_win32serialport.py | File | 5.18 KB | 0644 |
|