__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

www-data@216.73.216.10: ~ $
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.web.twcgi}.
"""

import json
import os
import sys
from io import BytesIO

from twisted.internet import address, error, interfaces, reactor
from twisted.internet.error import ConnectionLost
from twisted.python import failure, log, util
from twisted.trial import unittest
from twisted.web import client, http, http_headers, resource, server, twcgi
from twisted.web.http import INTERNAL_SERVER_ERROR, NOT_FOUND
from twisted.web.test._util import _render
from twisted.web.test.requesthelper import DummyChannel, DummyRequest

DUMMY_CGI = """\
print("Header: OK")
print("")
print("cgi output")
"""

DUAL_HEADER_CGI = """\
print("Header: spam")
print("Header: eggs")
print("")
print("cgi output")
"""

BROKEN_HEADER_CGI = """\
print("XYZ")
print("")
print("cgi output")
"""

SPECIAL_HEADER_CGI = """\
print("Server: monkeys")
print("Date: last year")
print("")
print("cgi output")
"""

READINPUT_CGI = """\
# This is an example of a correctly-written CGI script which reads a body
# from stdin, which only reads env['CONTENT_LENGTH'] bytes.

import os, sys

body_length = int(os.environ.get('CONTENT_LENGTH',0))
indata = sys.stdin.read(body_length)
print("Header: OK")
print("")
print("readinput ok")
"""

READALLINPUT_CGI = """\
# This is an example of the typical (incorrect) CGI script which expects
# the server to close stdin when the body of the request is complete.
# A correct CGI should only read env['CONTENT_LENGTH'] bytes.

import sys

indata = sys.stdin.read()
print("Header: OK")
print("")
print("readallinput ok")
"""

NO_DUPLICATE_CONTENT_TYPE_HEADER_CGI = """\
print("content-type: text/cgi-duplicate-test")
print("")
print("cgi output")
"""

HEADER_OUTPUT_CGI = """\
import json
import os
print("")
print("")
vals = {x:y for x,y in os.environ.items() if x.startswith("HTTP_")}
print(json.dumps(vals))
"""

URL_PARAMETER_CGI = """\
import os
param = str(os.environ['QUERY_STRING'])
print("Header: OK")
print("")
print(param)
"""


class PythonScript(twcgi.FilteredScript):
    filter = sys.executable


class _StartServerAndTearDownMixin:
    def startServer(self, cgi):
        root = resource.Resource()
        cgipath = util.sibpath(__file__, cgi)
        root.putChild(b"cgi", PythonScript(cgipath))
        site = server.Site(root)
        self.p = reactor.listenTCP(0, site)
        return self.p.getHost().port

    def tearDown(self):
        if getattr(self, "p", None):
            return self.p.stopListening()

    def writeCGI(self, source):
        cgiFilename = os.path.abspath(self.mktemp())
        with open(cgiFilename, "wt") as cgiFile:
            cgiFile.write(source)
        return cgiFilename


class CGITests(_StartServerAndTearDownMixin, unittest.TestCase):
    """
    Tests for L{twcgi.FilteredScript}.
    """

    if not interfaces.IReactorProcess.providedBy(reactor):
        skip = "CGI tests require a functional reactor.spawnProcess()"

    def test_CGI(self):
        cgiFilename = self.writeCGI(DUMMY_CGI)

        portnum = self.startServer(cgiFilename)
        url = "http://localhost:%d/cgi" % (portnum,)
        url = url.encode("ascii")
        d = client.Agent(reactor).request(b"GET", url)
        d.addCallback(client.readBody)
        d.addCallback(self._testCGI_1)
        return d

    def _testCGI_1(self, res):
        self.assertEqual(res, b"cgi output" + os.linesep.encode("ascii"))

    def test_protectedServerAndDate(self):
        """
        If the CGI script emits a I{Server} or I{Date} header, these are
        ignored.
        """
        cgiFilename = self.writeCGI(SPECIAL_HEADER_CGI)

        portnum = self.startServer(cgiFilename)
        url = "http://localhost:%d/cgi" % (portnum,)
        url = url.encode("ascii")
        agent = client.Agent(reactor)
        d = agent.request(b"GET", url)
        d.addCallback(discardBody)

        def checkResponse(response):
            self.assertNotIn("monkeys", response.headers.getRawHeaders("server"))
            self.assertNotIn("last year", response.headers.getRawHeaders("date"))

        d.addCallback(checkResponse)
        return d

    def test_noDuplicateContentTypeHeaders(self):
        """
        If the CGI script emits a I{content-type} header, make sure that the
        server doesn't add an additional (duplicate) one, as per ticket 4786.
        """
        cgiFilename = self.writeCGI(NO_DUPLICATE_CONTENT_TYPE_HEADER_CGI)

        portnum = self.startServer(cgiFilename)
        url = "http://localhost:%d/cgi" % (portnum,)
        url = url.encode("ascii")
        agent = client.Agent(reactor)
        d = agent.request(b"GET", url)
        d.addCallback(discardBody)

        def checkResponse(response):
            self.assertEqual(
                response.headers.getRawHeaders("content-type"),
                ["text/cgi-duplicate-test"],
            )
            return response

        d.addCallback(checkResponse)
        return d

    def test_noProxyPassthrough(self):
        """
        The CGI script is never called with the Proxy header passed through.
        """
        cgiFilename = self.writeCGI(HEADER_OUTPUT_CGI)

        portnum = self.startServer(cgiFilename)
        url = "http://localhost:%d/cgi" % (portnum,)
        url = url.encode("ascii")

        agent = client.Agent(reactor)

        headers = http_headers.Headers(
            {b"Proxy": [b"foo"], b"X-Innocent-Header": [b"bar"]}
        )
        d = agent.request(b"GET", url, headers=headers)

        def checkResponse(response):
            headers = json.loads(response.decode("ascii"))
            self.assertEqual(
                set(headers.keys()),
                {"HTTP_HOST", "HTTP_CONNECTION", "HTTP_X_INNOCENT_HEADER"},
            )

        d.addCallback(client.readBody)
        d.addCallback(checkResponse)
        return d

    def test_duplicateHeaderCGI(self):
        """
        If a CGI script emits two instances of the same header, both are sent
        in the response.
        """
        cgiFilename = self.writeCGI(DUAL_HEADER_CGI)

        portnum = self.startServer(cgiFilename)
        url = "http://localhost:%d/cgi" % (portnum,)
        url = url.encode("ascii")
        agent = client.Agent(reactor)
        d = agent.request(b"GET", url)
        d.addCallback(discardBody)

        def checkResponse(response):
            self.assertEqual(response.headers.getRawHeaders("header"), ["spam", "eggs"])

        d.addCallback(checkResponse)
        return d

    def test_malformedHeaderCGI(self):
        """
        Check for the error message in the duplicated header
        """
        cgiFilename = self.writeCGI(BROKEN_HEADER_CGI)

        portnum = self.startServer(cgiFilename)
        url = "http://localhost:%d/cgi" % (portnum,)
        url = url.encode("ascii")
        agent = client.Agent(reactor)
        d = agent.request(b"GET", url)
        d.addCallback(discardBody)
        loggedMessages = []

        def addMessage(eventDict):
            loggedMessages.append(log.textFromEventDict(eventDict))

        log.addObserver(addMessage)
        self.addCleanup(log.removeObserver, addMessage)

        def checkResponse(ignored):
            self.assertIn(
                "ignoring malformed CGI header: " + repr(b"XYZ"), loggedMessages
            )

        d.addCallback(checkResponse)
        return d

    def test_ReadEmptyInput(self):
        cgiFilename = os.path.abspath(self.mktemp())
        with open(cgiFilename, "wt") as cgiFile:
            cgiFile.write(READINPUT_CGI)

        portnum = self.startServer(cgiFilename)
        agent = client.Agent(reactor)
        url = "http://localhost:%d/cgi" % (portnum,)
        url = url.encode("ascii")
        d = agent.request(b"GET", url)
        d.addCallback(client.readBody)
        d.addCallback(self._test_ReadEmptyInput_1)
        return d

    test_ReadEmptyInput.timeout = 5  # type: ignore[attr-defined]

    def _test_ReadEmptyInput_1(self, res):
        expected = f"readinput ok{os.linesep}"
        expected = expected.encode("ascii")
        self.assertEqual(res, expected)

    def test_ReadInput(self):
        cgiFilename = os.path.abspath(self.mktemp())
        with open(cgiFilename, "wt") as cgiFile:
            cgiFile.write(READINPUT_CGI)

        portnum = self.startServer(cgiFilename)
        agent = client.Agent(reactor)
        url = "http://localhost:%d/cgi" % (portnum,)
        url = url.encode("ascii")
        d = agent.request(
            uri=url,
            method=b"POST",
            bodyProducer=client.FileBodyProducer(BytesIO(b"Here is your stdin")),
        )
        d.addCallback(client.readBody)
        d.addCallback(self._test_ReadInput_1)
        return d

    test_ReadInput.timeout = 5  # type: ignore[attr-defined]

    def _test_ReadInput_1(self, res):
        expected = f"readinput ok{os.linesep}"
        expected = expected.encode("ascii")
        self.assertEqual(res, expected)

    def test_ReadAllInput(self):
        cgiFilename = os.path.abspath(self.mktemp())
        with open(cgiFilename, "wt") as cgiFile:
            cgiFile.write(READALLINPUT_CGI)

        portnum = self.startServer(cgiFilename)
        url = "http://localhost:%d/cgi" % (portnum,)
        url = url.encode("ascii")
        d = client.Agent(reactor).request(
            uri=url,
            method=b"POST",
            bodyProducer=client.FileBodyProducer(BytesIO(b"Here is your stdin")),
        )
        d.addCallback(client.readBody)
        d.addCallback(self._test_ReadAllInput_1)
        return d

    test_ReadAllInput.timeout = 5  # type: ignore[attr-defined]

    def _test_ReadAllInput_1(self, res):
        expected = f"readallinput ok{os.linesep}"
        expected = expected.encode("ascii")
        self.assertEqual(res, expected)

    def test_useReactorArgument(self):
        """
        L{twcgi.FilteredScript.runProcess} uses the reactor passed as an
        argument to the constructor.
        """

        class FakeReactor:
            """
            A fake reactor recording whether spawnProcess is called.
            """

            called = False

            def spawnProcess(self, *args, **kwargs):
                """
                Set the C{called} flag to C{True} if C{spawnProcess} is called.

                @param args: Positional arguments.
                @param kwargs: Keyword arguments.
                """
                self.called = True

        fakeReactor = FakeReactor()
        request = DummyRequest(["a", "b"])
        request.client = address.IPv4Address("TCP", "127.0.0.1", 12345)
        resource = twcgi.FilteredScript("dummy-file", reactor=fakeReactor)
        _render(resource, request)

        self.assertTrue(fakeReactor.called)


class CGIScriptTests(_StartServerAndTearDownMixin, unittest.TestCase):
    """
    Tests for L{twcgi.CGIScript}.
    """

    def test_urlParameters(self):
        """
        If the CGI script is passed URL parameters, do not fall over,
        as per ticket 9887.
        """
        cgiFilename = self.writeCGI(URL_PARAMETER_CGI)
        portnum = self.startServer(cgiFilename)
        url = b"http://localhost:%d/cgi?param=1234" % (portnum,)
        agent = client.Agent(reactor)
        d = agent.request(b"GET", url)
        d.addCallback(client.readBody)
        d.addCallback(self._test_urlParameters_1)
        return d

    def _test_urlParameters_1(self, res):
        expected = f"param=1234{os.linesep}"
        expected = expected.encode("ascii")
        self.assertEqual(res, expected)

    def test_pathInfo(self):
        """
        L{twcgi.CGIScript.render} sets the process environment
        I{PATH_INFO} from the request path.
        """

        class FakeReactor:
            """
            A fake reactor recording the environment passed to spawnProcess.
            """

            def spawnProcess(self, process, filename, args, env, wdir):
                """
                Store the C{env} L{dict} to an instance attribute.

                @param process: Ignored
                @param filename: Ignored
                @param args: Ignored
                @param env: The environment L{dict} which will be stored
                @param wdir: Ignored
                """
                self.process_env = env

        _reactor = FakeReactor()
        resource = twcgi.CGIScript(self.mktemp(), reactor=_reactor)
        request = DummyRequest(["a", "b"])
        request.client = address.IPv4Address("TCP", "127.0.0.1", 12345)
        _render(resource, request)

        self.assertEqual(_reactor.process_env["PATH_INFO"], "/a/b")


class CGIDirectoryTests(unittest.TestCase):
    """
    Tests for L{twcgi.CGIDirectory}.
    """

    def test_render(self):
        """
        L{twcgi.CGIDirectory.render} sets the HTTP response code to I{NOT
        FOUND}.
        """
        resource = twcgi.CGIDirectory(self.mktemp())
        request = DummyRequest([""])
        d = _render(resource, request)

        def cbRendered(ignored):
            self.assertEqual(request.responseCode, NOT_FOUND)

        d.addCallback(cbRendered)
        return d

    def test_notFoundChild(self):
        """
        L{twcgi.CGIDirectory.getChild} returns a resource which renders an
        response with the HTTP I{NOT FOUND} status code if the indicated child
        does not exist as an entry in the directory used to initialized the
        L{twcgi.CGIDirectory}.
        """
        path = self.mktemp()
        os.makedirs(path)
        resource = twcgi.CGIDirectory(path)
        request = DummyRequest(["foo"])
        child = resource.getChild("foo", request)
        d = _render(child, request)

        def cbRendered(ignored):
            self.assertEqual(request.responseCode, NOT_FOUND)

        d.addCallback(cbRendered)
        return d


class CGIProcessProtocolTests(unittest.TestCase):
    """
    Tests for L{twcgi.CGIProcessProtocol}.
    """

    def test_prematureEndOfHeaders(self):
        """
        If the process communicating with L{CGIProcessProtocol} ends before
        finishing writing out headers, the response has I{INTERNAL SERVER
        ERROR} as its status code.
        """
        request = DummyRequest([""])
        protocol = twcgi.CGIProcessProtocol(request)
        protocol.processEnded(failure.Failure(error.ProcessTerminated()))
        self.assertEqual(request.responseCode, INTERNAL_SERVER_ERROR)

    def test_connectionLost(self):
        """
        Ensure that the CGI process ends cleanly when the request connection
        is lost.
        """
        d = DummyChannel()
        request = http.Request(d, True)
        protocol = twcgi.CGIProcessProtocol(request)
        request.connectionLost(failure.Failure(ConnectionLost("Connection done")))
        protocol.processEnded(failure.Failure(error.ProcessTerminated()))


def discardBody(response):
    """
    Discard the body of a HTTP response.

    @param response: The response.

    @return: The response.
    """
    return client.readBody(response).addCallback(lambda _: response)

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 107 B 0644
_util.py File 3.26 KB 0644
injectionhelpers.py File 5.46 KB 0644
requesthelper.py File 15.07 KB 0644
test_agent.py File 119.75 KB 0644
test_cgi.py File 14.76 KB 0644
test_client.py File 1.52 KB 0644
test_distrib.py File 17.58 KB 0644
test_domhelpers.py File 11.18 KB 0644
test_error.py File 16.37 KB 0644
test_flatten.py File 25.67 KB 0644
test_html.py File 1.21 KB 0644
test_http.py File 153.63 KB 0644
test_http2.py File 105.17 KB 0644
test_http_headers.py File 24.38 KB 0644
test_httpauth.py File 23.23 KB 0644
test_newclient.py File 106.8 KB 0644
test_pages.py File 3.56 KB 0644
test_proxy.py File 19.57 KB 0644
test_resource.py File 10.37 KB 0644
test_script.py File 3.91 KB 0644
test_soap.py File 3.04 KB 0644
test_stan.py File 7.08 KB 0644
test_static.py File 66.6 KB 0644
test_tap.py File 11.86 KB 0644
test_template.py File 28.38 KB 0644
test_util.py File 14.76 KB 0644
test_vhost.py File 7.49 KB 0644
test_web.py File 67.52 KB 0644
test_web__responses.py File 837 B 0644
test_webclient.py File 11.52 KB 0644
test_wsgi.py File 73.83 KB 0644
test_xml.py File 42.28 KB 0644
test_xmlrpc.py File 29.85 KB 0644
Filemanager