__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

www-data@216.73.216.10: ~ $
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.internet.defer.deferredGenerator} and related APIs.
"""

import traceback

from twisted.internet import defer, reactor, task
from twisted.internet.defer import (
    Deferred,
    deferredGenerator,
    inlineCallbacks,
    returnValue,
    waitForDeferred,
)
from twisted.python.util import runWithWarningsSuppressed
from twisted.trial import unittest
from twisted.trial.util import suppress as SUPPRESS


def getThing():
    d = Deferred()
    reactor.callLater(0, d.callback, "hi")
    return d


def getOwie():
    d = Deferred()

    def CRAP():
        d.errback(ZeroDivisionError("OMG"))

    reactor.callLater(0, CRAP)
    return d


# NOTE: most of the tests in DeferredGeneratorTests are duplicated
# with slightly different syntax for the InlineCallbacksTests below.


class TerminalException(Exception):
    pass


class BaseDefgenTests:
    """
    This class sets up a bunch of test cases which will test both
    deferredGenerator and inlineCallbacks based generators. The subclasses
    DeferredGeneratorTests and InlineCallbacksTests each provide the actual
    generator implementations tested.
    """

    def testBasics(self):
        """
        Test that a normal deferredGenerator works.  Tests yielding a
        deferred which callbacks, as well as a deferred errbacks. Also
        ensures returning a final value works.
        """

        return self._genBasics().addCallback(self.assertEqual, "WOOSH")

    def testBuggy(self):
        """
        Ensure that a buggy generator properly signals a Failure
        condition on result deferred.
        """
        return self.assertFailure(self._genBuggy(), ZeroDivisionError)

    def testNothing(self):
        """Test that a generator which never yields results in None."""

        return self._genNothing().addCallback(self.assertEqual, None)

    def testHandledTerminalFailure(self):
        """
        Create a Deferred Generator which yields a Deferred which fails and
        handles the exception which results.  Assert that the Deferred
        Generator does not errback its Deferred.
        """
        return self._genHandledTerminalFailure().addCallback(self.assertEqual, None)

    def testHandledTerminalAsyncFailure(self):
        """
        Just like testHandledTerminalFailure, only with a Deferred which fires
        asynchronously with an error.
        """
        d = defer.Deferred()
        deferredGeneratorResultDeferred = self._genHandledTerminalAsyncFailure(d)
        d.errback(TerminalException("Handled Terminal Failure"))
        return deferredGeneratorResultDeferred.addCallback(self.assertEqual, None)

    def testStackUsage(self):
        """
        Make sure we don't blow the stack when yielding immediately
        available deferreds.
        """
        return self._genStackUsage().addCallback(self.assertEqual, 0)

    def testStackUsage2(self):
        """
        Make sure we don't blow the stack when yielding immediately
        available values.
        """
        return self._genStackUsage2().addCallback(self.assertEqual, 0)


def deprecatedDeferredGenerator(f):
    """
    Calls L{deferredGenerator} while suppressing the deprecation warning.

    @param f: Function to call
    @return: Return value of function.
    """
    return runWithWarningsSuppressed(
        [
            SUPPRESS(
                message="twisted.internet.defer.deferredGenerator was " "deprecated"
            )
        ],
        deferredGenerator,
        f,
    )


class DeferredGeneratorTests(BaseDefgenTests, unittest.TestCase):
    # First provide all the generator impls necessary for BaseDefgenTests
    @deprecatedDeferredGenerator
    def _genBasics(self):
        x = waitForDeferred(getThing())
        yield x
        x = x.getResult()

        self.assertEqual(x, "hi")

        ow = waitForDeferred(getOwie())
        yield ow
        try:
            ow.getResult()
        except ZeroDivisionError as e:
            self.assertEqual(str(e), "OMG")
        yield "WOOSH"
        return

    @deprecatedDeferredGenerator
    def _genBuggy(self):
        yield waitForDeferred(getThing())
        1 // 0

    @deprecatedDeferredGenerator
    def _genNothing(self):
        if False:
            yield 1

    @deprecatedDeferredGenerator
    def _genHandledTerminalFailure(self):
        x = waitForDeferred(defer.fail(TerminalException("Handled Terminal Failure")))
        yield x
        try:
            x.getResult()
        except TerminalException:
            pass

    @deprecatedDeferredGenerator
    def _genHandledTerminalAsyncFailure(self, d):
        x = waitForDeferred(d)
        yield x
        try:
            x.getResult()
        except TerminalException:
            pass

    def _genStackUsage(self):
        for x in range(5000):
            # Test with yielding a deferred
            x = waitForDeferred(defer.succeed(1))
            yield x
            x = x.getResult()
        yield 0

    _genStackUsage = deprecatedDeferredGenerator(_genStackUsage)

    def _genStackUsage2(self):
        for x in range(5000):
            # Test with yielding a random value
            yield 1
        yield 0

    _genStackUsage2 = deprecatedDeferredGenerator(_genStackUsage2)

    # Tests unique to deferredGenerator

    def testDeferredYielding(self):
        """
        Ensure that yielding a Deferred directly is trapped as an
        error.
        """

        # See the comment _deferGenerator about d.callback(Deferred).
        def _genDeferred():
            yield getThing()

        _genDeferred = deprecatedDeferredGenerator(_genDeferred)

        return self.assertFailure(_genDeferred(), TypeError)

    suppress = [
        SUPPRESS(message="twisted.internet.defer.waitForDeferred was " "deprecated")
    ]


class InlineCallbacksTests(BaseDefgenTests, unittest.TestCase):
    # First provide all the generator impls necessary for BaseDefgenTests

    def _genBasics(self):
        x = yield getThing()

        self.assertEqual(x, "hi")

        try:
            yield getOwie()
        except ZeroDivisionError as e:
            self.assertEqual(str(e), "OMG")
        returnValue("WOOSH")

    _genBasics = inlineCallbacks(_genBasics)

    def _genBuggy(self):
        yield getThing()
        1 / 0

    _genBuggy = inlineCallbacks(_genBuggy)

    def _genNothing(self):
        if False:
            yield 1

    _genNothing = inlineCallbacks(_genNothing)

    def _genHandledTerminalFailure(self):
        try:
            yield defer.fail(TerminalException("Handled Terminal Failure"))
        except TerminalException:
            pass

    _genHandledTerminalFailure = inlineCallbacks(_genHandledTerminalFailure)

    def _genHandledTerminalAsyncFailure(self, d):
        try:
            yield d
        except TerminalException:
            pass

    _genHandledTerminalAsyncFailure = inlineCallbacks(_genHandledTerminalAsyncFailure)

    def _genStackUsage(self):
        for x in range(5000):
            # Test with yielding a deferred
            yield defer.succeed(1)
        returnValue(0)

    _genStackUsage = inlineCallbacks(_genStackUsage)

    def _genStackUsage2(self):
        for x in range(5000):
            # Test with yielding a random value
            yield 1
        returnValue(0)

    _genStackUsage2 = inlineCallbacks(_genStackUsage2)

    # Tests unique to inlineCallbacks

    def testYieldNonDeferred(self):
        """
        Ensure that yielding a non-deferred passes it back as the
        result of the yield expression.

        @return: A L{twisted.internet.defer.Deferred}
        @rtype: L{twisted.internet.defer.Deferred}
        """

        def _test():
            yield 5
            returnValue(5)

        _test = inlineCallbacks(_test)

        return _test().addCallback(self.assertEqual, 5)

    def testReturnNoValue(self):
        """Ensure a standard python return results in a None result."""

        def _noReturn():
            yield 5
            return

        _noReturn = inlineCallbacks(_noReturn)

        return _noReturn().addCallback(self.assertEqual, None)

    def testReturnValue(self):
        """Ensure that returnValue works."""

        def _return():
            yield 5
            returnValue(6)

        _return = inlineCallbacks(_return)

        return _return().addCallback(self.assertEqual, 6)

    def test_nonGeneratorReturn(self):
        """
        Ensure that C{TypeError} with a message about L{inlineCallbacks} is
        raised when a non-generator returns something other than a generator.
        """

        def _noYield():
            return 5

        _noYield = inlineCallbacks(_noYield)

        self.assertIn("inlineCallbacks", str(self.assertRaises(TypeError, _noYield)))

    def test_nonGeneratorReturnValue(self):
        """
        Ensure that C{TypeError} with a message about L{inlineCallbacks} is
        raised when a non-generator calls L{returnValue}.
        """

        def _noYield():
            returnValue(5)

        _noYield = inlineCallbacks(_noYield)

        self.assertIn("inlineCallbacks", str(self.assertRaises(TypeError, _noYield)))

    def test_internalDefGenReturnValueDoesntLeak(self):
        """
        When one inlineCallbacks calls another, the internal L{_DefGen_Return}
        flow control exception raised by calling L{defer.returnValue} doesn't
        leak into tracebacks captured in the caller.
        """
        clock = task.Clock()

        @inlineCallbacks
        def _returns():
            """
            This is the inner function using returnValue.
            """
            yield task.deferLater(clock, 0)
            returnValue("actual-value-not-used-for-the-test")

        @inlineCallbacks
        def _raises():
            try:
                yield _returns()
                raise TerminalException("boom returnValue")
            except TerminalException:
                return traceback.format_exc()

        d = _raises()
        clock.advance(0)
        tb = self.successResultOf(d)

        # The internal exception is not in the traceback.
        self.assertNotIn("_DefGen_Return", tb)
        # No other extra exception is in the traceback.
        self.assertNotIn(
            "During handling of the above exception, another exception occurred", tb
        )
        # Our targeted exception is in the traceback
        self.assertIn("test_defgen.TerminalException: boom returnValue", tb)

    def test_internalStopIterationDoesntLeak(self):
        """
        When one inlineCallbacks calls another, the internal L{StopIteration}
        flow control exception generated when the inner generator returns
        doesn't leak into tracebacks captured in the caller.

        This is similar to C{test_internalDefGenReturnValueDoesntLeak} but the
        inner function uses the "normal" return statemement rather than the
        C{returnValue} helper.
        """
        clock = task.Clock()

        @inlineCallbacks
        def _returns():
            yield task.deferLater(clock, 0)
            return 6

        @inlineCallbacks
        def _raises():
            try:
                yield _returns()
                raise TerminalException("boom normal return")
            except TerminalException:
                return traceback.format_exc()

        d = _raises()
        clock.advance(0)
        tb = self.successResultOf(d)

        # The internal exception is not in the traceback.
        self.assertNotIn("StopIteration", tb)
        # No other extra exception is in the traceback.
        self.assertNotIn(
            "During handling of the above exception, another exception occurred", tb
        )
        # Our targeted exception is in the traceback
        self.assertIn("test_defgen.TerminalException: boom normal return", tb)


class DeprecateDeferredGeneratorTests(unittest.SynchronousTestCase):
    """
    Tests that L{DeferredGeneratorTests} and L{waitForDeferred} are
    deprecated.
    """

    def test_deferredGeneratorDeprecated(self):
        """
        L{deferredGenerator} is deprecated.
        """

        @deferredGenerator
        def decoratedFunction():
            yield None

        warnings = self.flushWarnings([self.test_deferredGeneratorDeprecated])
        self.assertEqual(len(warnings), 1)
        self.assertEqual(warnings[0]["category"], DeprecationWarning)
        self.assertEqual(
            warnings[0]["message"],
            "twisted.internet.defer.deferredGenerator was deprecated in "
            "Twisted 15.0.0; please use "
            "twisted.internet.defer.inlineCallbacks instead",
        )

    def test_waitForDeferredDeprecated(self):
        """
        L{waitForDeferred} is deprecated.
        """
        d = Deferred()
        waitForDeferred(d)

        warnings = self.flushWarnings([self.test_waitForDeferredDeprecated])
        self.assertEqual(len(warnings), 1)
        self.assertEqual(warnings[0]["category"], DeprecationWarning)
        self.assertEqual(
            warnings[0]["message"],
            "twisted.internet.defer.waitForDeferred was deprecated in "
            "Twisted 15.0.0; please use "
            "twisted.internet.defer.inlineCallbacks instead",
        )

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 475 B 0644
cert.pem.no_trailing_newline File 1.46 KB 0644
crash_test_dummy.py File 654 B 0644
ignore_test_failure.py File 34.88 KB 0644
iosim.py File 18.4 KB 0644
key.pem.no_trailing_newline File 1.63 KB 0644
mock_win32process.py File 1.25 KB 0644
myrebuilder1.py File 172 B 0644
myrebuilder2.py File 172 B 0644
plugin_basic.py File 925 B 0644
plugin_extra1.py File 400 B 0644
plugin_extra2.py File 566 B 0644
process_cmdline.py File 123 B 0644
process_echoer.py File 214 B 0644
process_fds.py File 983 B 0644
process_getargv.py File 233 B 0644
process_getenv.py File 268 B 0644
process_linger.py File 297 B 0644
process_reader.py File 178 B 0644
process_signal.py File 220 B 0644
process_stdinreader.py File 739 B 0644
process_tester.py File 787 B 0644
process_tty.py File 130 B 0644
process_twisted.py File 1.15 KB 0644
proto_helpers.py File 1.34 KB 0644
reflect_helper_IE.py File 60 B 0644
reflect_helper_VE.py File 81 B 0644
reflect_helper_ZDE.py File 48 B 0644
server.pem File 5.23 KB 0644
ssl_helpers.py File 1.72 KB 0644
stdio_test_consumer.py File 1.14 KB 0644
stdio_test_halfclose.py File 2 KB 0644
stdio_test_hostpeer.py File 1.06 KB 0644
stdio_test_lastwrite.py File 1.13 KB 0644
stdio_test_loseconn.py File 1.55 KB 0644
stdio_test_producer.py File 1.45 KB 0644
stdio_test_write.py File 902 B 0644
stdio_test_writeseq.py File 894 B 0644
test_abstract.py File 3.66 KB 0644
test_adbapi.py File 25.47 KB 0644
test_amp.py File 108.04 KB 0644
test_application.py File 33.34 KB 0644
test_compat.py File 17.75 KB 0644
test_context.py File 1.43 KB 0644
test_cooperator.py File 20.84 KB 0644
test_defer.py File 143.37 KB 0644
test_defgen.py File 13.01 KB 0644
test_dirdbm.py File 6.9 KB 0644
test_error.py File 9.6 KB 0644
test_factories.py File 4.46 KB 0644
test_fdesc.py File 7.28 KB 0644
test_finger.py File 1.89 KB 0644
test_formmethod.py File 4.27 KB 0644
test_ftp.py File 126.96 KB 0644
test_ftp_options.py File 2.65 KB 0644
test_htb.py File 3.19 KB 0644
test_ident.py File 6.56 KB 0644
test_internet.py File 45.38 KB 0644
test_iosim.py File 9.58 KB 0644
test_iutils.py File 13.31 KB 0644
test_lockfile.py File 15.54 KB 0644
test_log.py File 36.86 KB 0644
test_logfile.py File 17.79 KB 0644
test_loopback.py File 13.99 KB 0644
test_main.py File 2.12 KB 0644
test_memcache.py File 24.69 KB 0644
test_modules.py File 17.84 KB 0644
test_monkey.py File 6.38 KB 0644
test_paths.py File 73.64 KB 0644
test_pcp.py File 12.23 KB 0644
test_persisted.py File 14.73 KB 0644
test_plugin.py File 26.02 KB 0644
test_policies.py File 32.28 KB 0644
test_postfix.py File 4.32 KB 0644
test_process.py File 86.29 KB 0644
test_protocols.py File 7.16 KB 0644
test_randbytes.py File 3.63 KB 0644
test_rebuild.py File 7.4 KB 0644
test_reflect.py File 23.89 KB 0644
test_roots.py File 1.65 KB 0644
test_shortcut.py File 1.91 KB 0644
test_sip.py File 24.9 KB 0644
test_sob.py File 5.53 KB 0644
test_socks.py File 17.09 KB 0644
test_ssl.py File 22.73 KB 0644
test_sslverify.py File 113.84 KB 0644
test_stateful.py File 1.97 KB 0644
test_stdio.py File 12.43 KB 0644
test_strerror.py File 5.1 KB 0644
test_strports.py File 1.67 KB 0644
test_task.py File 47.73 KB 0644
test_tcp.py File 64.26 KB 0644
test_tcp_internals.py File 12.73 KB 0644
test_text.py File 6.47 KB 0644
test_threadable.py File 3.26 KB 0644
test_threadpool.py File 21.64 KB 0644
test_threads.py File 12.9 KB 0644
test_tpfile.py File 1.69 KB 0644
test_twistd.py File 72.29 KB 0644
test_twisted.py File 6.13 KB 0644
test_udp.py File 26.79 KB 0644
test_unix.py File 13.26 KB 0644
test_usage.py File 22.76 KB 0644
testutils.py File 5.06 KB 0644
Filemanager