__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

www-data@216.73.216.10: ~ $
# -*- test-case-name: twisted.logger.test.test_observer -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Basic log observers.
"""

from typing import Callable, Optional

from zope.interface import implementer

from twisted.python.failure import Failure
from ._interfaces import ILogObserver, LogEvent
from ._logger import Logger

# Log format used by LogPublisher.__call__ to report an observer that raised
# while handling an event.  The {observer} field is filled from the
# ``observer=`` keyword passed alongside it; {log_failure} is presumably
# supplied by Logger.failure() from its ``failure=`` argument (confirm in
# _logger.py).
# NOTE(review): despite the wording, nothing in this module removes the
# failing observer from the publisher; it is only left out of the temporary
# publisher that receives this report.
OBSERVER_DISABLED = (
    "Temporarily disabling observer {observer} due to exception: {log_failure}"
)


@implementer(ILogObserver)
class LogPublisher:
    """
    I{ILogObserver} that fans out events to other observers.

    Keeps track of a set of L{ILogObserver} objects and forwards
    events to each.

    @ivar log: A L{Logger} whose observer is this publisher.
    """

    def __init__(self, *observers: ILogObserver) -> None:
        """
        @param observers: The L{ILogObserver}s to forward events to initially.
        """
        self._observers = list(observers)
        self.log = Logger(observer=self)

    def addObserver(self, observer: ILogObserver) -> None:
        """
        Registers an observer with this publisher.

        Adding an observer that is already registered is a no-op.

        @param observer: An L{ILogObserver} to add.

        @raise TypeError: If C{observer} is not callable.
        """
        if not callable(observer):
            raise TypeError(f"Observer is not callable: {observer!r}")
        if observer not in self._observers:
            self._observers.append(observer)

    def removeObserver(self, observer: ILogObserver) -> None:
        """
        Unregisters an observer with this publisher.

        Removing an observer that is not registered is silently ignored.

        @param observer: An L{ILogObserver} to remove.
        """
        try:
            self._observers.remove(observer)
        except ValueError:
            # Not registered: removal is deliberately idempotent.
            pass

    def __call__(self, event: LogEvent) -> None:
        """
        Forward events to contained observers.

        Every observer registered at the time of the call receives the event.
        An observer that raises an L{Exception} does not prevent the remaining
        observers from being called; once all have been called, each failure
        is reported (using L{OBSERVER_DISABLED}) to the other observers.

        @param event: The event to forward.  If it has a C{"log_trace"} key,
            a C{(publisher, observer)} tuple is appended to that value before
            each observer is called.
        """
        if "log_trace" not in event:
            trace: Optional[Callable[[ILogObserver], None]] = None

        else:

            def trace(observer: ILogObserver) -> None:
                """
                Add tracing information for an observer.

                @param observer: an observer being forwarded to
                """
                event["log_trace"].append((self, observer))

        brokenObservers = []

        # Iterate over a snapshot: an observer may call addObserver() or
        # removeObserver() on this publisher while handling the event, and
        # mutating the list being iterated would make the loop skip (or
        # unexpectedly pick up) observers for the event in flight.
        for observer in tuple(self._observers):
            if trace is not None:
                trace(observer)

            try:
                observer(event)
            except Exception:
                # Capture the active exception; reporting is deferred until
                # every observer has had its chance to see the event.
                brokenObservers.append((observer, Failure()))

        for brokenObserver, failure in brokenObservers:
            errorLogger = self._errorLoggerForObserver(brokenObserver)
            errorLogger.failure(
                OBSERVER_DISABLED,
                failure=failure,
                observer=brokenObserver,
            )

    def _errorLoggerForObserver(self, observer: ILogObserver) -> Logger:
        """
        Create an error-logger based on this logger, which does not contain the
        given bad observer.

        The bad observer is excluded by identity, so that the report about its
        failure is not delivered back to it.

        @param observer: The observer which previously had an error.

        @return: A L{Logger} without the given observer.
        """
        errorPublisher = LogPublisher(
            *(obs for obs in self._observers if obs is not observer)
        )
        return Logger(observer=errorPublisher)


@implementer(ILogObserver)
def bitbucketLogObserver(event: LogEvent) -> None:
    """
    I{ILogObserver} that does nothing with the events it sees.

    @param event: The log event; it is ignored.
    """

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
test Folder 0755
__init__.py File 3.29 KB 0644
_buffer.py File 1.49 KB 0644
_capture.py File 624 B 0644
_file.py File 2.28 KB 0644
_filter.py File 6.71 KB 0644
_flatten.py File 4.88 KB 0644
_format.py File 13.16 KB 0644
_global.py File 8.43 KB 0644
_interfaces.py File 2.29 KB 0644
_io.py File 4.44 KB 0644
_json.py File 8.21 KB 0644
_legacy.py File 5.12 KB 0644
_levels.py File 2.89 KB 0644
_logger.py File 9.75 KB 0644
_observer.py File 3.17 KB 0644
_stdlib.py File 4.42 KB 0644
_util.py File 1.34 KB 0644
Filemanager