__ __ __ __ _____ _ _ _____ _ _ _ | \/ | \ \ / / | __ \ (_) | | / ____| | | | | | \ / |_ __\ V / | |__) | __ ___ ____ _| |_ ___ | (___ | |__ ___| | | | |\/| | '__|> < | ___/ '__| \ \ / / _` | __/ _ \ \___ \| '_ \ / _ \ | | | | | | |_ / . \ | | | | | |\ V / (_| | || __/ ____) | | | | __/ | | |_| |_|_(_)_/ \_\ |_| |_| |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1 if you need WebShell for Seo everyday contact me on Telegram Telegram Address : @jackleetFor_More_Tools:
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.internet.defer.deferredGenerator} and related APIs.
"""
import traceback
from twisted.internet import defer, reactor, task
from twisted.internet.defer import (
Deferred,
deferredGenerator,
inlineCallbacks,
returnValue,
waitForDeferred,
)
from twisted.python.util import runWithWarningsSuppressed
from twisted.trial import unittest
from twisted.trial.util import suppress as SUPPRESS
def getThing():
d = Deferred()
reactor.callLater(0, d.callback, "hi")
return d
def getOwie():
d = Deferred()
def CRAP():
d.errback(ZeroDivisionError("OMG"))
reactor.callLater(0, CRAP)
return d
# NOTE: most of the tests in DeferredGeneratorTests are duplicated
# with slightly different syntax for the InlineCallbacksTests below.
class TerminalException(Exception):
pass
class BaseDefgenTests:
"""
This class sets up a bunch of test cases which will test both
deferredGenerator and inlineCallbacks based generators. The subclasses
DeferredGeneratorTests and InlineCallbacksTests each provide the actual
generator implementations tested.
"""
def testBasics(self):
"""
Test that a normal deferredGenerator works. Tests yielding a
deferred which callbacks, as well as a deferred errbacks. Also
ensures returning a final value works.
"""
return self._genBasics().addCallback(self.assertEqual, "WOOSH")
def testBuggy(self):
"""
Ensure that a buggy generator properly signals a Failure
condition on result deferred.
"""
return self.assertFailure(self._genBuggy(), ZeroDivisionError)
def testNothing(self):
"""Test that a generator which never yields results in None."""
return self._genNothing().addCallback(self.assertEqual, None)
def testHandledTerminalFailure(self):
"""
Create a Deferred Generator which yields a Deferred which fails and
handles the exception which results. Assert that the Deferred
Generator does not errback its Deferred.
"""
return self._genHandledTerminalFailure().addCallback(self.assertEqual, None)
def testHandledTerminalAsyncFailure(self):
"""
Just like testHandledTerminalFailure, only with a Deferred which fires
asynchronously with an error.
"""
d = defer.Deferred()
deferredGeneratorResultDeferred = self._genHandledTerminalAsyncFailure(d)
d.errback(TerminalException("Handled Terminal Failure"))
return deferredGeneratorResultDeferred.addCallback(self.assertEqual, None)
def testStackUsage(self):
"""
Make sure we don't blow the stack when yielding immediately
available deferreds.
"""
return self._genStackUsage().addCallback(self.assertEqual, 0)
def testStackUsage2(self):
"""
Make sure we don't blow the stack when yielding immediately
available values.
"""
return self._genStackUsage2().addCallback(self.assertEqual, 0)
def deprecatedDeferredGenerator(f):
"""
Calls L{deferredGenerator} while suppressing the deprecation warning.
@param f: Function to call
@return: Return value of function.
"""
return runWithWarningsSuppressed(
[
SUPPRESS(
message="twisted.internet.defer.deferredGenerator was " "deprecated"
)
],
deferredGenerator,
f,
)
class DeferredGeneratorTests(BaseDefgenTests, unittest.TestCase):
# First provide all the generator impls necessary for BaseDefgenTests
@deprecatedDeferredGenerator
def _genBasics(self):
x = waitForDeferred(getThing())
yield x
x = x.getResult()
self.assertEqual(x, "hi")
ow = waitForDeferred(getOwie())
yield ow
try:
ow.getResult()
except ZeroDivisionError as e:
self.assertEqual(str(e), "OMG")
yield "WOOSH"
return
@deprecatedDeferredGenerator
def _genBuggy(self):
yield waitForDeferred(getThing())
1 // 0
@deprecatedDeferredGenerator
def _genNothing(self):
if False:
yield 1
@deprecatedDeferredGenerator
def _genHandledTerminalFailure(self):
x = waitForDeferred(defer.fail(TerminalException("Handled Terminal Failure")))
yield x
try:
x.getResult()
except TerminalException:
pass
@deprecatedDeferredGenerator
def _genHandledTerminalAsyncFailure(self, d):
x = waitForDeferred(d)
yield x
try:
x.getResult()
except TerminalException:
pass
def _genStackUsage(self):
for x in range(5000):
# Test with yielding a deferred
x = waitForDeferred(defer.succeed(1))
yield x
x = x.getResult()
yield 0
_genStackUsage = deprecatedDeferredGenerator(_genStackUsage)
def _genStackUsage2(self):
for x in range(5000):
# Test with yielding a random value
yield 1
yield 0
_genStackUsage2 = deprecatedDeferredGenerator(_genStackUsage2)
# Tests unique to deferredGenerator
def testDeferredYielding(self):
"""
Ensure that yielding a Deferred directly is trapped as an
error.
"""
# See the comment _deferGenerator about d.callback(Deferred).
def _genDeferred():
yield getThing()
_genDeferred = deprecatedDeferredGenerator(_genDeferred)
return self.assertFailure(_genDeferred(), TypeError)
suppress = [
SUPPRESS(message="twisted.internet.defer.waitForDeferred was " "deprecated")
]
class InlineCallbacksTests(BaseDefgenTests, unittest.TestCase):
# First provide all the generator impls necessary for BaseDefgenTests
def _genBasics(self):
x = yield getThing()
self.assertEqual(x, "hi")
try:
yield getOwie()
except ZeroDivisionError as e:
self.assertEqual(str(e), "OMG")
returnValue("WOOSH")
_genBasics = inlineCallbacks(_genBasics)
def _genBuggy(self):
yield getThing()
1 / 0
_genBuggy = inlineCallbacks(_genBuggy)
def _genNothing(self):
if False:
yield 1
_genNothing = inlineCallbacks(_genNothing)
def _genHandledTerminalFailure(self):
try:
yield defer.fail(TerminalException("Handled Terminal Failure"))
except TerminalException:
pass
_genHandledTerminalFailure = inlineCallbacks(_genHandledTerminalFailure)
def _genHandledTerminalAsyncFailure(self, d):
try:
yield d
except TerminalException:
pass
_genHandledTerminalAsyncFailure = inlineCallbacks(_genHandledTerminalAsyncFailure)
def _genStackUsage(self):
for x in range(5000):
# Test with yielding a deferred
yield defer.succeed(1)
returnValue(0)
_genStackUsage = inlineCallbacks(_genStackUsage)
def _genStackUsage2(self):
for x in range(5000):
# Test with yielding a random value
yield 1
returnValue(0)
_genStackUsage2 = inlineCallbacks(_genStackUsage2)
# Tests unique to inlineCallbacks
def testYieldNonDeferred(self):
"""
Ensure that yielding a non-deferred passes it back as the
result of the yield expression.
@return: A L{twisted.internet.defer.Deferred}
@rtype: L{twisted.internet.defer.Deferred}
"""
def _test():
yield 5
returnValue(5)
_test = inlineCallbacks(_test)
return _test().addCallback(self.assertEqual, 5)
def testReturnNoValue(self):
"""Ensure a standard python return results in a None result."""
def _noReturn():
yield 5
return
_noReturn = inlineCallbacks(_noReturn)
return _noReturn().addCallback(self.assertEqual, None)
def testReturnValue(self):
"""Ensure that returnValue works."""
def _return():
yield 5
returnValue(6)
_return = inlineCallbacks(_return)
return _return().addCallback(self.assertEqual, 6)
def test_nonGeneratorReturn(self):
"""
Ensure that C{TypeError} with a message about L{inlineCallbacks} is
raised when a non-generator returns something other than a generator.
"""
def _noYield():
return 5
_noYield = inlineCallbacks(_noYield)
self.assertIn("inlineCallbacks", str(self.assertRaises(TypeError, _noYield)))
def test_nonGeneratorReturnValue(self):
"""
Ensure that C{TypeError} with a message about L{inlineCallbacks} is
raised when a non-generator calls L{returnValue}.
"""
def _noYield():
returnValue(5)
_noYield = inlineCallbacks(_noYield)
self.assertIn("inlineCallbacks", str(self.assertRaises(TypeError, _noYield)))
def test_internalDefGenReturnValueDoesntLeak(self):
"""
When one inlineCallbacks calls another, the internal L{_DefGen_Return}
flow control exception raised by calling L{defer.returnValue} doesn't
leak into tracebacks captured in the caller.
"""
clock = task.Clock()
@inlineCallbacks
def _returns():
"""
This is the inner function using returnValue.
"""
yield task.deferLater(clock, 0)
returnValue("actual-value-not-used-for-the-test")
@inlineCallbacks
def _raises():
try:
yield _returns()
raise TerminalException("boom returnValue")
except TerminalException:
return traceback.format_exc()
d = _raises()
clock.advance(0)
tb = self.successResultOf(d)
# The internal exception is not in the traceback.
self.assertNotIn("_DefGen_Return", tb)
# No other extra exception is in the traceback.
self.assertNotIn(
"During handling of the above exception, another exception occurred", tb
)
# Our targeted exception is in the traceback
self.assertIn("test_defgen.TerminalException: boom returnValue", tb)
def test_internalStopIterationDoesntLeak(self):
"""
When one inlineCallbacks calls another, the internal L{StopIteration}
flow control exception generated when the inner generator returns
doesn't leak into tracebacks captured in the caller.
This is similar to C{test_internalDefGenReturnValueDoesntLeak} but the
inner function uses the "normal" return statemement rather than the
C{returnValue} helper.
"""
clock = task.Clock()
@inlineCallbacks
def _returns():
yield task.deferLater(clock, 0)
return 6
@inlineCallbacks
def _raises():
try:
yield _returns()
raise TerminalException("boom normal return")
except TerminalException:
return traceback.format_exc()
d = _raises()
clock.advance(0)
tb = self.successResultOf(d)
# The internal exception is not in the traceback.
self.assertNotIn("StopIteration", tb)
# No other extra exception is in the traceback.
self.assertNotIn(
"During handling of the above exception, another exception occurred", tb
)
# Our targeted exception is in the traceback
self.assertIn("test_defgen.TerminalException: boom normal return", tb)
class DeprecateDeferredGeneratorTests(unittest.SynchronousTestCase):
"""
Tests that L{DeferredGeneratorTests} and L{waitForDeferred} are
deprecated.
"""
def test_deferredGeneratorDeprecated(self):
"""
L{deferredGenerator} is deprecated.
"""
@deferredGenerator
def decoratedFunction():
yield None
warnings = self.flushWarnings([self.test_deferredGeneratorDeprecated])
self.assertEqual(len(warnings), 1)
self.assertEqual(warnings[0]["category"], DeprecationWarning)
self.assertEqual(
warnings[0]["message"],
"twisted.internet.defer.deferredGenerator was deprecated in "
"Twisted 15.0.0; please use "
"twisted.internet.defer.inlineCallbacks instead",
)
def test_waitForDeferredDeprecated(self):
"""
L{waitForDeferred} is deprecated.
"""
d = Deferred()
waitForDeferred(d)
warnings = self.flushWarnings([self.test_waitForDeferredDeprecated])
self.assertEqual(len(warnings), 1)
self.assertEqual(warnings[0]["category"], DeprecationWarning)
self.assertEqual(
warnings[0]["message"],
"twisted.internet.defer.waitForDeferred was deprecated in "
"Twisted 15.0.0; please use "
"twisted.internet.defer.inlineCallbacks instead",
)
| Name | Type | Size | Permission | Actions |
|---|---|---|---|---|
| __pycache__ | Folder | 0755 |
|
|
| __init__.py | File | 475 B | 0644 |
|
| cert.pem.no_trailing_newline | File | 1.46 KB | 0644 |
|
| crash_test_dummy.py | File | 654 B | 0644 |
|
| ignore_test_failure.py | File | 34.88 KB | 0644 |
|
| iosim.py | File | 18.4 KB | 0644 |
|
| key.pem.no_trailing_newline | File | 1.63 KB | 0644 |
|
| mock_win32process.py | File | 1.25 KB | 0644 |
|
| myrebuilder1.py | File | 172 B | 0644 |
|
| myrebuilder2.py | File | 172 B | 0644 |
|
| plugin_basic.py | File | 925 B | 0644 |
|
| plugin_extra1.py | File | 400 B | 0644 |
|
| plugin_extra2.py | File | 566 B | 0644 |
|
| process_cmdline.py | File | 123 B | 0644 |
|
| process_echoer.py | File | 214 B | 0644 |
|
| process_fds.py | File | 983 B | 0644 |
|
| process_getargv.py | File | 233 B | 0644 |
|
| process_getenv.py | File | 268 B | 0644 |
|
| process_linger.py | File | 297 B | 0644 |
|
| process_reader.py | File | 178 B | 0644 |
|
| process_signal.py | File | 220 B | 0644 |
|
| process_stdinreader.py | File | 739 B | 0644 |
|
| process_tester.py | File | 787 B | 0644 |
|
| process_tty.py | File | 130 B | 0644 |
|
| process_twisted.py | File | 1.15 KB | 0644 |
|
| proto_helpers.py | File | 1.34 KB | 0644 |
|
| reflect_helper_IE.py | File | 60 B | 0644 |
|
| reflect_helper_VE.py | File | 81 B | 0644 |
|
| reflect_helper_ZDE.py | File | 48 B | 0644 |
|
| server.pem | File | 5.23 KB | 0644 |
|
| ssl_helpers.py | File | 1.72 KB | 0644 |
|
| stdio_test_consumer.py | File | 1.14 KB | 0644 |
|
| stdio_test_halfclose.py | File | 2 KB | 0644 |
|
| stdio_test_hostpeer.py | File | 1.06 KB | 0644 |
|
| stdio_test_lastwrite.py | File | 1.13 KB | 0644 |
|
| stdio_test_loseconn.py | File | 1.55 KB | 0644 |
|
| stdio_test_producer.py | File | 1.45 KB | 0644 |
|
| stdio_test_write.py | File | 902 B | 0644 |
|
| stdio_test_writeseq.py | File | 894 B | 0644 |
|
| test_abstract.py | File | 3.66 KB | 0644 |
|
| test_adbapi.py | File | 25.47 KB | 0644 |
|
| test_amp.py | File | 108.04 KB | 0644 |
|
| test_application.py | File | 33.34 KB | 0644 |
|
| test_compat.py | File | 17.75 KB | 0644 |
|
| test_context.py | File | 1.43 KB | 0644 |
|
| test_cooperator.py | File | 20.84 KB | 0644 |
|
| test_defer.py | File | 143.37 KB | 0644 |
|
| test_defgen.py | File | 13.01 KB | 0644 |
|
| test_dirdbm.py | File | 6.9 KB | 0644 |
|
| test_error.py | File | 9.6 KB | 0644 |
|
| test_factories.py | File | 4.46 KB | 0644 |
|
| test_fdesc.py | File | 7.28 KB | 0644 |
|
| test_finger.py | File | 1.89 KB | 0644 |
|
| test_formmethod.py | File | 4.27 KB | 0644 |
|
| test_ftp.py | File | 126.96 KB | 0644 |
|
| test_ftp_options.py | File | 2.65 KB | 0644 |
|
| test_htb.py | File | 3.19 KB | 0644 |
|
| test_ident.py | File | 6.56 KB | 0644 |
|
| test_internet.py | File | 45.38 KB | 0644 |
|
| test_iosim.py | File | 9.58 KB | 0644 |
|
| test_iutils.py | File | 13.31 KB | 0644 |
|
| test_lockfile.py | File | 15.54 KB | 0644 |
|
| test_log.py | File | 36.86 KB | 0644 |
|
| test_logfile.py | File | 17.79 KB | 0644 |
|
| test_loopback.py | File | 13.99 KB | 0644 |
|
| test_main.py | File | 2.12 KB | 0644 |
|
| test_memcache.py | File | 24.69 KB | 0644 |
|
| test_modules.py | File | 17.84 KB | 0644 |
|
| test_monkey.py | File | 6.38 KB | 0644 |
|
| test_paths.py | File | 73.64 KB | 0644 |
|
| test_pcp.py | File | 12.23 KB | 0644 |
|
| test_persisted.py | File | 14.73 KB | 0644 |
|
| test_plugin.py | File | 26.02 KB | 0644 |
|
| test_policies.py | File | 32.28 KB | 0644 |
|
| test_postfix.py | File | 4.32 KB | 0644 |
|
| test_process.py | File | 86.29 KB | 0644 |
|
| test_protocols.py | File | 7.16 KB | 0644 |
|
| test_randbytes.py | File | 3.63 KB | 0644 |
|
| test_rebuild.py | File | 7.4 KB | 0644 |
|
| test_reflect.py | File | 23.89 KB | 0644 |
|
| test_roots.py | File | 1.65 KB | 0644 |
|
| test_shortcut.py | File | 1.91 KB | 0644 |
|
| test_sip.py | File | 24.9 KB | 0644 |
|
| test_sob.py | File | 5.53 KB | 0644 |
|
| test_socks.py | File | 17.09 KB | 0644 |
|
| test_ssl.py | File | 22.73 KB | 0644 |
|
| test_sslverify.py | File | 113.84 KB | 0644 |
|
| test_stateful.py | File | 1.97 KB | 0644 |
|
| test_stdio.py | File | 12.43 KB | 0644 |
|
| test_strerror.py | File | 5.1 KB | 0644 |
|
| test_strports.py | File | 1.67 KB | 0644 |
|
| test_task.py | File | 47.73 KB | 0644 |
|
| test_tcp.py | File | 64.26 KB | 0644 |
|
| test_tcp_internals.py | File | 12.73 KB | 0644 |
|
| test_text.py | File | 6.47 KB | 0644 |
|
| test_threadable.py | File | 3.26 KB | 0644 |
|
| test_threadpool.py | File | 21.64 KB | 0644 |
|
| test_threads.py | File | 12.9 KB | 0644 |
|
| test_tpfile.py | File | 1.69 KB | 0644 |
|
| test_twistd.py | File | 72.29 KB | 0644 |
|
| test_twisted.py | File | 6.13 KB | 0644 |
|
| test_udp.py | File | 26.79 KB | 0644 |
|
| test_unix.py | File | 13.26 KB | 0644 |
|
| test_usage.py | File | 22.76 KB | 0644 |
|
| testutils.py | File | 5.06 KB | 0644 |
|