__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

www-data@216.73.216.10: ~ $
# -*- test-case-name: twisted.protocols.haproxy.test.test_v1parser -*-

# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
IProxyParser implementation for version one of the PROXY protocol.
"""
from typing import Tuple, Union

from zope.interface import implementer

from twisted.internet import address
from . import _info, _interfaces
from ._exceptions import (
    InvalidNetworkProtocol,
    InvalidProxyHeader,
    MissingAddressData,
    convertError,
)


@implementer(_interfaces.IProxyParser)
class V1Parser:
    """
    PROXY protocol version one header parser.

    Version one of the PROXY protocol is a human readable format represented
    by a single, newline delimited binary string that contains all of the
    relevant source and destination data.

    @ivar buffer: Bytes fed to the parser so far that do not yet contain a
        complete, newline terminated header.
    @type buffer: L{bytes}
    """

    PROXYSTR = b"PROXY"
    UNKNOWN_PROTO = b"UNKNOWN"
    TCP4_PROTO = b"TCP4"
    TCP6_PROTO = b"TCP6"
    ALLOWED_NET_PROTOS = (
        TCP4_PROTO,
        TCP6_PROTO,
        UNKNOWN_PROTO,
    )
    NEWLINE = b"\r\n"

    def __init__(self) -> None:
        self.buffer = b""

    def feed(
        self, data: bytes
    ) -> Union[Tuple[_info.ProxyInfo, bytes], Tuple[None, None]]:
        """
        Consume a chunk of data and attempt to parse it.

        @param data: A bytestring.
        @type data: L{bytes}

        @return: A two-tuple containing, in order, a
            L{_interfaces.IProxyInfo} and any bytes fed to the
            parser that followed the end of the header.  Both of these values
            are None until a complete header is parsed.

        @raises InvalidProxyHeader: If the bytes fed to the parser create an
            invalid PROXY header.
        """
        self.buffer += data
        # Give up once more than 107 bytes have accumulated without a line
        # terminator, so that a peer cannot make the buffer grow unbounded.
        if len(self.buffer) > 107 and self.NEWLINE not in self.buffer:
            raise InvalidProxyHeader()
        header, newline, remaining = self.buffer.partition(self.NEWLINE)
        if not newline:
            # No terminator yet: keep buffering.
            return (None, None)
        # The buffer is reset before parsing, so it is empty even when
        # parse() raises.
        self.buffer = b""
        info = self.parse(header)
        return (info, remaining)

    @classmethod
    def parse(cls, line: bytes) -> _info.ProxyInfo:
        """
        Parse a bytestring as a full PROXY protocol header line.

        @param line: A bytestring that represents a valid HAProxy PROXY
            protocol header line, without the trailing newline.
        @type line: bytes

        @return: A L{_interfaces.IProxyInfo} containing the parsed data.  For
            the C{UNKNOWN} protocol the source and destination are L{None}.

        @raises InvalidProxyHeader: If the bytestring does not represent a
            valid PROXY header.  This includes a port that is not an integer
            and an address that cannot be decoded as text.

        @raises InvalidNetworkProtocol: When no protocol can be parsed or is
            not one of the allowed values.

        @raises MissingAddressData: When the protocol is TCP* but the header
            does not contain a complete set of addresses and ports.
        """
        originalLine = line

        with convertError(ValueError, InvalidProxyHeader):
            proxyStr, line = line.split(b" ", 1)

        if proxyStr != cls.PROXYSTR:
            raise InvalidProxyHeader()

        # For UNKNOWN the remainder of the line may be omitted entirely, so a
        # bare "PROXY UNKNOWN" has no trailing space to split on.
        if line == cls.UNKNOWN_PROTO:
            return _info.ProxyInfo(originalLine, None, None)

        with convertError(ValueError, InvalidNetworkProtocol):
            networkProtocol, line = line.split(b" ", 1)

        if networkProtocol not in cls.ALLOWED_NET_PROTOS:
            raise InvalidNetworkProtocol()

        if networkProtocol == cls.UNKNOWN_PROTO:
            # Anything following UNKNOWN is ignored.
            return _info.ProxyInfo(originalLine, None, None)

        with convertError(ValueError, MissingAddressData):
            sourceAddr, line = line.split(b" ", 1)

        with convertError(ValueError, MissingAddressData):
            destAddr, line = line.split(b" ", 1)

        with convertError(ValueError, MissingAddressData):
            sourcePort, line = line.split(b" ", 1)

        # split() always yields at least one element, so an absent
        # destination port shows up as an empty string rather than an error.
        destPort = line.split(b" ")[0]
        if not destPort:
            raise MissingAddressData()

        # The header comes from the network: a non-numeric port or undecodable
        # address bytes must surface as a PROXY header error, not as a bare
        # ValueError / UnicodeDecodeError (the latter is a ValueError subclass).
        with convertError(ValueError, InvalidProxyHeader):
            sourceHost = sourceAddr.decode()
            destHost = destAddr.decode()
            sourcePortNumber = int(sourcePort)
            destPortNumber = int(destPort)

        if networkProtocol == cls.TCP4_PROTO:
            return _info.ProxyInfo(
                originalLine,
                address.IPv4Address("TCP", sourceHost, sourcePortNumber),
                address.IPv4Address("TCP", destHost, destPortNumber),
            )

        return _info.ProxyInfo(
            originalLine,
            address.IPv6Address("TCP", sourceHost, sourcePortNumber),
            address.IPv6Address("TCP", destHost, destPortNumber),
        )

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
test Folder 0755
__init__.py File 243 B 0644
_exceptions.py File 1.15 KB 0644
_info.py File 917 B 0644
_interfaces.py File 1.85 KB 0644
_parser.py File 2.1 KB 0644
_v1parser.py File 4.39 KB 0644
_v2parser.py File 6.47 KB 0644
_wrapper.py File 3.59 KB 0644
Filemanager