__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

www-data@216.73.216.10: ~ $
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for implementations of L{IReactorThreads}.
"""


import gc
import threading
from weakref import ref

from twisted.internet.interfaces import IReactorThreads
from twisted.internet.test.reactormixins import ReactorBuilder
from twisted.python.threadable import isInIOThread
from twisted.python.threadpool import ThreadPool
from twisted.python.versions import Version


class ThreadTestsBuilder(ReactorBuilder):
    """
    Builder for defining tests relating to L{IReactorThreads}.

    Every test builds its own reactor with C{self.buildReactor()} and, where
    the reactor has to run, drives it with C{self.runReactor()}.
    """

    # Only reactors providing this interface are exercised by these tests.
    requiredInterfaces = (IReactorThreads,)

    def test_getThreadPool(self):
        """
        C{reactor.getThreadPool()} returns an instance of L{ThreadPool} which
        starts when C{reactor.run()} is called and stops before it returns.
        """
        state = []
        reactor = self.buildReactor()

        pool = reactor.getThreadPool()
        self.assertIsInstance(pool, ThreadPool)
        self.assertFalse(pool.started, "Pool should not start before reactor.run")

        def f():
            # Record the state for later assertions; the pool's flags are
            # sampled here, while the reactor is still running.
            state.append(pool.started)
            state.append(pool.joined)
            reactor.stop()

        reactor.callWhenRunning(f)
        self.runReactor(reactor, 2)

        self.assertTrue(state[0], "Pool should start after reactor.run")
        self.assertFalse(state[1], "Pool should not be joined before reactor.stop")
        self.assertTrue(pool.joined, "Pool should be stopped after reactor.run returns")

    def test_suggestThreadPoolSize(self):
        """
        C{reactor.suggestThreadPoolSize()} sets the maximum size of the reactor
        threadpool.
        """
        reactor = self.buildReactor()
        reactor.suggestThreadPoolSize(17)
        pool = reactor.getThreadPool()
        self.assertEqual(pool.max, 17)

    def test_delayedCallFromThread(self):
        """
        A function scheduled with L{IReactorThreads.callFromThread} invoked
        from a delayed call is run immediately in the next reactor iteration.

        When invoked from the reactor thread, previous implementations of
        L{IReactorThreads.callFromThread} would skip the pipe/socket based wake
        up step, assuming the reactor would wake up on its own.  However, this
        resulted in the reactor not noticing an insert into the thread queue at
        the right time (in this case, after the thread queue has been processed
        for that reactor iteration).
        """
        reactor = self.buildReactor()

        def threadCall():
            reactor.stop()

        # Set up the use of callFromThread being tested.
        reactor.callLater(0, reactor.callFromThread, threadCall)

        before = reactor.seconds()
        self.runReactor(reactor, 60)
        after = reactor.seconds()

        # We specified a timeout of 60 seconds.  The timeout code in runReactor
        # probably won't actually work, though.  If the reactor comes out of
        # the event notification API just a little bit early, say after 59.9999
        # seconds instead of after 60 seconds, then the queued thread call will
        # get processed but the timeout delayed call runReactor sets up won't!
        # Then the reactor will stop and runReactor will return without the
        # timeout firing.  As it turns out, select() and poll() are quite
        # likely to return *slightly* earlier than we ask them to, so the
        # timeout will rarely happen, even if callFromThread is broken.  So,
        # instead we'll measure the elapsed time and make sure it's something
        # less than about half of the timeout we specified.  This is heuristic.
        # It assumes that select() won't ever return after 30 seconds when we
        # asked it to timeout after 60 seconds.  And of course like all
        # time-based tests, it's slightly non-deterministic.  If the OS doesn't
        # schedule this process for 30 seconds, then the test might fail even
        # if callFromThread is working.
        #
        # assertLess (rather than assertTrue on a comparison) so that a
        # failure reports the elapsed time that was actually measured.
        self.assertLess(after - before, 30)

    def test_callFromThread(self):
        """
        A function scheduled with L{IReactorThreads.callFromThread} invoked
        from another thread is run in the reactor thread.
        """
        reactor = self.buildReactor()
        result = []

        def threadCall():
            result.append(threading.current_thread())
            reactor.stop()

        # Hop into a pool thread first, and from there schedule threadCall
        # back onto the reactor with callFromThread.
        reactor.callLater(0, reactor.callInThread, reactor.callFromThread, threadCall)
        self.runReactor(reactor, 5)

        # The reactor ran in this (the test's) thread, so threadCall must have
        # observed the very same thread object.
        self.assertEqual(result, [threading.current_thread()])

    def test_stopThreadPool(self):
        """
        When the reactor stops, L{ReactorBase._stopThreadPool} drops the
        reactor's direct reference to its internal threadpool and removes
        the associated startup and shutdown triggers.

        This is the case of the thread pool being created before the reactor
        is run.
        """
        reactor = self.buildReactor()
        # Hold only a weak reference so the test itself does not keep the
        # pool alive.
        threadpool = ref(reactor.getThreadPool())
        reactor.callWhenRunning(reactor.stop)
        self.runReactor(reactor)
        # Force a collection so reference cycles cannot mask the result.
        gc.collect()
        self.assertIsNone(threadpool())

    def test_stopThreadPoolWhenStartedAfterReactorRan(self):
        """
        We must handle the case of shutting down the thread pool when it was
        started after the reactor was run in a special way.

        Some implementation background: The thread pool is started with
        callWhenRunning, which only returns a system trigger ID when it is
        invoked before the reactor is started.

        This is the case of the thread pool being created after the reactor
        is started.
        """
        reactor = self.buildReactor()
        threadPoolRefs = []

        def acquireThreadPool():
            # Only a weak reference is kept, so the pool can be collected
            # once the reactor lets go of it.
            threadPoolRefs.append(ref(reactor.getThreadPool()))
            reactor.stop()

        reactor.callWhenRunning(acquireThreadPool)
        self.runReactor(reactor)
        gc.collect()
        self.assertIsNone(threadPoolRefs[0]())

    def test_cleanUpThreadPoolEvenBeforeReactorIsRun(self):
        """
        When the reactor has its shutdown event fired before it is run, the
        thread pool is completely destroyed.

        For what it's worth, the reason we support this behavior at all is
        because Trial does this.

        This is the case of the thread pool being created without the reactor
        being started at all.
        """
        reactor = self.buildReactor()
        threadPoolRef = ref(reactor.getThreadPool())
        # Fire the shutdown event directly; the reactor is never run.
        reactor.fireSystemEvent("shutdown")
        gc.collect()
        self.assertIsNone(threadPoolRef())

    def test_isInIOThread(self):
        """
        The reactor registers itself as the I/O thread when it runs so that
        L{twisted.python.threadable.isInIOThread} returns C{True} if it is
        called in the thread the reactor is running in.
        """
        results = []
        reactor = self.buildReactor()

        def check():
            results.append(isInIOThread())
            reactor.stop()

        reactor.callWhenRunning(check)
        self.runReactor(reactor)
        self.assertEqual([True], results)

    def test_isNotInIOThread(self):
        """
        The reactor registers itself as the I/O thread when it runs so that
        L{twisted.python.threadable.isInIOThread} returns C{False} if it is
        called in a different thread than the reactor is running in.
        """
        results = []
        reactor = self.buildReactor()

        def check():
            results.append(isInIOThread())
            # check() runs via callInThread, so the stop request is handed
            # back to the reactor through callFromThread.
            reactor.callFromThread(reactor.stop)

        reactor.callInThread(check)
        self.runReactor(reactor)
        self.assertEqual([False], results)

    def test_threadPoolCurrentThreadDeprecated(self):
        """
        Calling L{ThreadPool.currentThread} emits a deprecation warning for
        Twisted 22.1.0 which names C{threading.current_thread} as the
        replacement.
        """
        self.callDeprecated(
            version=(
                Version("Twisted", 22, 1, 0),
                "threading.current_thread",
            ),
            f=ThreadPool.currentThread,
        )


# Publish whatever ThreadTestsBuilder.makeTestCaseClasses() returns into this
# module's namespace.  NOTE(review): presumably that is one concrete test case
# class per supported reactor, exposed here so the test runner can discover
# them -- confirm against ReactorBuilder.makeTestCaseClasses.
globals().update(ThreadTestsBuilder.makeTestCaseClasses())

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
fake_CAs Folder 0755
__init__.py File 112 B 0644
_posixifaces.py File 4.29 KB 0644
_win32ifaces.py File 3.88 KB 0644
connectionmixins.py File 19.71 KB 0644
fakeendpoint.py File 1.62 KB 0644
modulehelpers.py File 1.63 KB 0644
process_cli.py File 547 B 0644
process_connectionlost.py File 126 B 0644
process_gireactornocompat.py File 792 B 0644
process_helper.py File 1.22 KB 0644
reactormixins.py File 16.05 KB 0644
test_abstract.py File 2.15 KB 0644
test_address.py File 8.07 KB 0644
test_asyncioreactor.py File 9.7 KB 0644
test_base.py File 14.44 KB 0644
test_baseprocess.py File 2.53 KB 0644
test_cfreactor.py File 3.09 KB 0644
test_core.py File 10.86 KB 0644
test_default.py File 3.5 KB 0644
test_defer_await.py File 6.66 KB 0644
test_defer_yieldfrom.py File 4.79 KB 0644
test_endpoints.py File 147.25 KB 0644
test_epollreactor.py File 7.15 KB 0644
test_error.py File 1.08 KB 0644
test_fdset.py File 13.18 KB 0644
test_filedescriptor.py File 2.7 KB 0644
test_gireactor.py File 7.18 KB 0644
test_glibbase.py File 3.02 KB 0644
test_inlinecb.py File 11.21 KB 0644
test_inotify.py File 18.13 KB 0644
test_iocp.py File 7.52 KB 0644
test_kqueuereactor.py File 1.93 KB 0644
test_main.py File 1.32 KB 0644
test_newtls.py File 6.38 KB 0644
test_pollingfile.py File 1.28 KB 0644
test_posixbase.py File 11.56 KB 0644
test_posixprocess.py File 10.94 KB 0644
test_process.py File 44.6 KB 0644
test_protocol.py File 18.08 KB 0644
test_reactormixins.py File 1.93 KB 0644
test_resolver.py File 19.15 KB 0644
test_serialport.py File 1.94 KB 0644
test_sigchld.py File 3.25 KB 0644
test_socket.py File 9.22 KB 0644
test_stdio.py File 6.18 KB 0644
test_tcp.py File 105.7 KB 0644
test_testing.py File 16.49 KB 0644
test_threads.py File 7.94 KB 0644
test_time.py File 3.57 KB 0644
test_tls.py File 12.54 KB 0644
test_udp.py File 16.48 KB 0644
test_udp_internals.py File 4.98 KB 0644
test_unix.py File 34.04 KB 0644
test_win32events.py File 6.3 KB 0644
test_win32serialport.py File 5.18 KB 0644
Filemanager