__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

www-data@216.73.216.10: ~ $
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for implementations of L{IReactorSocket}.

Generally only tests for failure cases are found here.  Success cases for
this interface are tested elsewhere.  For example, the success case for
I{AF_INET} is in L{twisted.internet.test.test_tcp}, since that case should
behave exactly the same as L{IReactorTCP.listenTCP}.
"""

import errno
import socket

from zope.interface import verify

from twisted.internet.error import UnsupportedAddressFamily
from twisted.internet.interfaces import IReactorSocket
from twisted.internet.protocol import DatagramProtocol, ServerFactory
from twisted.internet.test.reactormixins import ReactorBuilder, needsRunningReactor
from twisted.python.log import err
from twisted.python.runtime import platform


class IReactorSocketVerificationTestsBuilder(ReactorBuilder):
    """
    Builder for testing L{IReactorSocket} implementations for required
    methods and method signatures.

    L{ReactorBuilder} already runs L{IReactorSocket.providedBy} to
    ensure that these tests will only be run on reactor classes that
    claim to implement L{IReactorSocket}.

    These tests ensure that reactors which claim to provide the
    L{IReactorSocket} interface actually have all the required methods
    and that those methods have the expected number of arguments.

    These tests will be skipped for reactors which do not claim to
    provide L{IReactorSocket}.
    """

    requiredInterfaces = [IReactorSocket]

    def test_provider(self):
        """
        The reactor instance returned by C{buildReactor} provides
        L{IReactorSocket}.
        """
        reactor = self.buildReactor()
        self.assertTrue(verify.verifyObject(IReactorSocket, reactor))


class AdoptStreamPortErrorsTestsBuilder(ReactorBuilder):
    """
    Builder for testing L{IReactorSocket.adoptStreamPort} implementations.

    Generally only tests for failure cases are found here.  Success cases for
    this interface are tested elsewhere.  For example, the success case for
    I{AF_INET} is in L{twisted.internet.test.test_tcp}, since that case should
    behave exactly the same as L{IReactorTCP.listenTCP}.
    """

    requiredInterfaces = [IReactorSocket]

    def test_invalidDescriptor(self):
        """
        An implementation of L{IReactorSocket.adoptStreamPort} raises
        L{socket.error} if passed an integer which is not associated with a
        socket.
        """
        reactor = self.buildReactor()

        probe = socket.socket()
        fileno = probe.fileno()
        probe.close()

        exc = self.assertRaises(
            socket.error,
            reactor.adoptStreamPort,
            fileno,
            socket.AF_INET,
            ServerFactory(),
        )
        if platform.isWindows():
            self.assertEqual(exc.args[0], errno.WSAENOTSOCK)
        else:
            self.assertEqual(exc.args[0], errno.EBADF)

    def test_invalidAddressFamily(self):
        """
        An implementation of L{IReactorSocket.adoptStreamPort} raises
        L{UnsupportedAddressFamily} if passed an address family it does not
        support.
        """
        reactor = self.buildReactor()

        port = socket.socket()
        port.bind(("127.0.0.1", 0))
        port.listen(1)
        self.addCleanup(port.close)

        arbitrary = 2**16 + 7

        self.assertRaises(
            UnsupportedAddressFamily,
            reactor.adoptStreamPort,
            port.fileno(),
            arbitrary,
            ServerFactory(),
        )

    def test_stopOnlyCloses(self):
        """
        When the L{IListeningPort} returned by
        L{IReactorSocket.adoptStreamPort} is stopped using
        C{stopListening}, the underlying socket is closed but not
        shutdown.  This allows another process which still has a
        reference to it to continue accepting connections over it.
        """
        reactor = self.buildReactor()

        portSocket = socket.socket()
        self.addCleanup(portSocket.close)

        portSocket.bind(("127.0.0.1", 0))
        portSocket.listen(1)
        portSocket.setblocking(False)

        # The file descriptor is duplicated by adoptStreamPort
        port = reactor.adoptStreamPort(
            portSocket.fileno(), portSocket.family, ServerFactory()
        )
        d = port.stopListening()

        def stopped(ignored):
            # Should still be possible to accept a connection on
            # portSocket.  If it was shutdown, the exception would be
            # EINVAL instead.
            exc = self.assertRaises(socket.error, portSocket.accept)
            if platform.isWindows():
                self.assertEqual(exc.args[0], errno.WSAEWOULDBLOCK)
            else:
                self.assertEqual(exc.args[0], errno.EAGAIN)

        d.addCallback(stopped)
        d.addErrback(err, "Failed to accept on original port.")

        needsRunningReactor(
            reactor, lambda: d.addCallback(lambda ignored: reactor.stop())
        )

        reactor.run()


class AdoptStreamConnectionErrorsTestsBuilder(ReactorBuilder):
    """
    Builder for testing L{IReactorSocket.adoptStreamConnection}
    implementations.

    Generally only tests for failure cases are found here.  Success cases for
    this interface are tested elsewhere.  For example, the success case for
    I{AF_INET} is in L{twisted.internet.test.test_tcp}, since that case should
    behave exactly the same as L{IReactorTCP.listenTCP}.
    """

    requiredInterfaces = [IReactorSocket]

    def test_invalidAddressFamily(self):
        """
        An implementation of L{IReactorSocket.adoptStreamConnection} raises
        L{UnsupportedAddressFamily} if passed an address family it does not
        support.
        """
        reactor = self.buildReactor()

        connection = socket.socket()
        self.addCleanup(connection.close)

        arbitrary = 2**16 + 7

        self.assertRaises(
            UnsupportedAddressFamily,
            reactor.adoptStreamConnection,
            connection.fileno(),
            arbitrary,
            ServerFactory(),
        )


class AdoptDatagramPortErrorsTestsBuilder(ReactorBuilder):
    """
    Builder for testing L{IReactorSocket.adoptDatagramPort} implementations.
    """

    requiredInterfaces = [IReactorSocket]

    def test_invalidDescriptor(self):
        """
        An implementation of L{IReactorSocket.adoptDatagramPort} raises
        L{socket.error} if passed an integer which is not associated with a
        socket.
        """
        reactor = self.buildReactor()

        probe = socket.socket()
        fileno = probe.fileno()
        probe.close()

        exc = self.assertRaises(
            socket.error,
            reactor.adoptDatagramPort,
            fileno,
            socket.AF_INET,
            DatagramProtocol(),
        )
        if platform.isWindows():
            self.assertEqual(exc.args[0], errno.WSAENOTSOCK)
        else:
            self.assertEqual(exc.args[0], errno.EBADF)

    def test_invalidAddressFamily(self):
        """
        An implementation of L{IReactorSocket.adoptDatagramPort} raises
        L{UnsupportedAddressFamily} if passed an address family it does not
        support.
        """
        reactor = self.buildReactor()

        port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.addCleanup(port.close)

        arbitrary = 2**16 + 7

        self.assertRaises(
            UnsupportedAddressFamily,
            reactor.adoptDatagramPort,
            port.fileno(),
            arbitrary,
            DatagramProtocol(),
        )

    def test_stopOnlyCloses(self):
        """
        When the L{IListeningPort} returned by
        L{IReactorSocket.adoptDatagramPort} is stopped using
        C{stopListening}, the underlying socket is closed but not
        shutdown.  This allows another process which still has a
        reference to it to continue reading and writing to it.
        """
        reactor = self.buildReactor()

        portSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.addCleanup(portSocket.close)

        portSocket.bind(("127.0.0.1", 0))
        portSocket.setblocking(False)

        # The file descriptor is duplicated by adoptDatagramPort
        port = reactor.adoptDatagramPort(
            portSocket.fileno(), portSocket.family, DatagramProtocol()
        )
        d = port.stopListening()

        def stopped(ignored):
            # Should still be possible to recv on portSocket.  If
            # it was shutdown, the exception would be EINVAL instead.
            exc = self.assertRaises(socket.error, portSocket.recvfrom, 1)
            if platform.isWindows():
                self.assertEqual(exc.args[0], errno.WSAEWOULDBLOCK)
            else:
                self.assertEqual(exc.args[0], errno.EAGAIN)

        d.addCallback(stopped)
        d.addErrback(err, "Failed to read on original port.")

        needsRunningReactor(
            reactor, lambda: d.addCallback(lambda ignored: reactor.stop())
        )

        reactor.run()


globals().update(IReactorSocketVerificationTestsBuilder.makeTestCaseClasses())
globals().update(AdoptStreamPortErrorsTestsBuilder.makeTestCaseClasses())
globals().update(AdoptStreamConnectionErrorsTestsBuilder.makeTestCaseClasses())
globals().update(AdoptDatagramPortErrorsTestsBuilder.makeTestCaseClasses())

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
fake_CAs Folder 0755
__init__.py File 112 B 0644
_posixifaces.py File 4.29 KB 0644
_win32ifaces.py File 3.88 KB 0644
connectionmixins.py File 19.71 KB 0644
fakeendpoint.py File 1.62 KB 0644
modulehelpers.py File 1.63 KB 0644
process_cli.py File 547 B 0644
process_connectionlost.py File 126 B 0644
process_gireactornocompat.py File 792 B 0644
process_helper.py File 1.22 KB 0644
reactormixins.py File 16.05 KB 0644
test_abstract.py File 2.15 KB 0644
test_address.py File 8.07 KB 0644
test_asyncioreactor.py File 9.7 KB 0644
test_base.py File 14.44 KB 0644
test_baseprocess.py File 2.53 KB 0644
test_cfreactor.py File 3.09 KB 0644
test_core.py File 10.86 KB 0644
test_default.py File 3.5 KB 0644
test_defer_await.py File 6.66 KB 0644
test_defer_yieldfrom.py File 4.79 KB 0644
test_endpoints.py File 147.25 KB 0644
test_epollreactor.py File 7.15 KB 0644
test_error.py File 1.08 KB 0644
test_fdset.py File 13.18 KB 0644
test_filedescriptor.py File 2.7 KB 0644
test_gireactor.py File 7.18 KB 0644
test_glibbase.py File 3.02 KB 0644
test_inlinecb.py File 11.21 KB 0644
test_inotify.py File 18.13 KB 0644
test_iocp.py File 7.52 KB 0644
test_kqueuereactor.py File 1.93 KB 0644
test_main.py File 1.32 KB 0644
test_newtls.py File 6.38 KB 0644
test_pollingfile.py File 1.28 KB 0644
test_posixbase.py File 11.56 KB 0644
test_posixprocess.py File 10.94 KB 0644
test_process.py File 44.6 KB 0644
test_protocol.py File 18.08 KB 0644
test_reactormixins.py File 1.93 KB 0644
test_resolver.py File 19.15 KB 0644
test_serialport.py File 1.94 KB 0644
test_sigchld.py File 3.25 KB 0644
test_socket.py File 9.22 KB 0644
test_stdio.py File 6.18 KB 0644
test_tcp.py File 105.7 KB 0644
test_testing.py File 16.49 KB 0644
test_threads.py File 7.94 KB 0644
test_time.py File 3.57 KB 0644
test_tls.py File 12.54 KB 0644
test_udp.py File 16.48 KB 0644
test_udp_internals.py File 4.98 KB 0644
test_unix.py File 34.04 KB 0644
test_win32events.py File 6.3 KB 0644
test_win32serialport.py File 5.18 KB 0644
Filemanager