__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

www-data@216.73.216.10: ~ $
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
GObject Introspection reactor tests; i.e. `gireactor` module for gio/glib/gtk
integration.
"""
from __future__ import annotations

from unittest import skipIf

try:
    from gi.repository import Gio
except ImportError:
    giImported = False
    gtkVersion = None
else:
    giImported = True
    # If we can import Gio, we ought to be able to import our reactor.
    from os import environ

    from gi import get_required_version, require_version

    from twisted.internet import gireactor

    def requireEach(someVersion: str) -> str:
        try:
            require_version("Gtk", someVersion)
        except ValueError as ve:
            return str(ve)
        else:
            return ""

    errorMessage = ", ".join(
        requireEach(version)
        for version in environ.get("TWISTED_TEST_GTK_VERSION", "4.0,3.0").split(",")
    )

    actualVersion = get_required_version("Gtk")
    gtkVersion = actualVersion if actualVersion is not None else errorMessage


from twisted.internet.error import ReactorAlreadyRunning
from twisted.internet.test.reactormixins import ReactorBuilder
from twisted.trial.unittest import SkipTest, TestCase

# Skip all tests if gi is unavailable:
if not giImported:
    skip = "GObject Introspection `gi` module not importable"

noGtkSkip = (gtkVersion is None) or (gtkVersion not in ("3.0", "4.0"))
noGtkMessage = f"Unknown GTK version: {repr(gtkVersion)}"

if not noGtkSkip:
    from gi.repository import Gtk


class GApplicationRegistrationTests(ReactorBuilder, TestCase):
    """
    GtkApplication and GApplication are supported by
    L{twisted.internet.gtk3reactor} and L{twisted.internet.gireactor}.

    We inherit from L{ReactorBuilder} in order to use some of its
    reactor-running infrastructure, but don't need its test-creation
    functionality.
    """

    def runReactor(  # type: ignore[override]
        self,
        app: Gio.Application,
        reactor: gireactor.GIReactor,
    ) -> None:
        """
        Register the app, run the reactor, make sure app was activated, and
        that reactor was running, and that reactor can be stopped.
        """
        if not hasattr(app, "quit"):
            raise SkipTest("Version of PyGObject is too old.")

        result = []

        def stop() -> None:
            result.append("stopped")
            reactor.stop()

        def activate(widget: object) -> None:
            result.append("activated")
            reactor.callLater(0, stop)

        app.connect("activate", activate)

        # We want reactor.stop() to *always* stop the event loop, even if
        # someone has called hold() on the application and never done the
        # corresponding release() -- for more details see
        # http://developer.gnome.org/gio/unstable/GApplication.html.
        app.hold()

        reactor.registerGApplication(app)
        ReactorBuilder.runReactor(self, reactor)
        self.assertEqual(result, ["activated", "stopped"])

    def test_gApplicationActivate(self) -> None:
        """
        L{Gio.Application} instances can be registered with a gireactor.
        """
        self.reactorFactory = lambda: gireactor.GIReactor(useGtk=False)
        reactor = self.buildReactor()
        app = Gio.Application(
            application_id="com.twistedmatrix.trial.gireactor",
            flags=Gio.ApplicationFlags.FLAGS_NONE,
        )

        self.runReactor(app, reactor)

    @skipIf(noGtkSkip, noGtkMessage)
    def test_gtkAliases(self) -> None:
        """
        L{twisted.internet.gtk3reactor} is now just a set of compatibility
        aliases for L{twisted.internet.GIReactor}.
        """
        from twisted.internet.gtk3reactor import (
            Gtk3Reactor,
            PortableGtk3Reactor,
            install,
        )

        self.assertIs(Gtk3Reactor, gireactor.GIReactor)
        self.assertIs(PortableGtk3Reactor, gireactor.PortableGIReactor)
        self.assertIs(install, gireactor.install)
        warnings = self.flushWarnings()
        self.assertEqual(len(warnings), 1)
        self.assertIn(
            "twisted.internet.gtk3reactor was deprecated", warnings[0]["message"]
        )

    @skipIf(noGtkSkip, noGtkMessage)
    def test_gtkApplicationActivate(self) -> None:
        """
        L{Gtk.Application} instances can be registered with a gtk3reactor.
        """
        self.reactorFactory = gireactor.GIReactor
        reactor = self.buildReactor()
        app = Gtk.Application(
            application_id="com.twistedmatrix.trial.gtk3reactor",
            flags=Gio.ApplicationFlags.FLAGS_NONE,
        )
        self.runReactor(app, reactor)

    def test_portable(self) -> None:
        """
        L{gireactor.PortableGIReactor} doesn't support application
        registration at this time.
        """
        self.reactorFactory = gireactor.PortableGIReactor
        reactor = self.buildReactor()
        app = Gio.Application(
            application_id="com.twistedmatrix.trial.gireactor",
            flags=Gio.ApplicationFlags.FLAGS_NONE,
        )
        self.assertRaises(NotImplementedError, reactor.registerGApplication, app)

    def test_noQuit(self) -> None:
        """
        Older versions of PyGObject lack C{Application.quit}, and so won't
        allow registration.
        """
        self.reactorFactory = lambda: gireactor.GIReactor(useGtk=False)
        reactor = self.buildReactor()
        # An app with no "quit" method:
        app = object()
        exc = self.assertRaises(RuntimeError, reactor.registerGApplication, app)
        self.assertTrue(exc.args[0].startswith("Application registration is not"))

    def test_cantRegisterAfterRun(self) -> None:
        """
        It is not possible to register a C{Application} after the reactor has
        already started.
        """
        self.reactorFactory = lambda: gireactor.GIReactor(useGtk=False)
        reactor = self.buildReactor()
        app = Gio.Application(
            application_id="com.twistedmatrix.trial.gireactor",
            flags=Gio.ApplicationFlags.FLAGS_NONE,
        )

        def tryRegister() -> None:
            exc = self.assertRaises(
                ReactorAlreadyRunning, reactor.registerGApplication, app
            )
            self.assertEqual(
                exc.args[0], "Can't register application after reactor was started."
            )
            reactor.stop()

        reactor.callLater(0, tryRegister)
        ReactorBuilder.runReactor(self, reactor)

    def test_cantRegisterTwice(self) -> None:
        """
        It is not possible to register more than one C{Application}.
        """
        self.reactorFactory = lambda: gireactor.GIReactor(useGtk=False)
        reactor = self.buildReactor()
        app = Gio.Application(
            application_id="com.twistedmatrix.trial.gireactor",
            flags=Gio.ApplicationFlags.FLAGS_NONE,
        )
        reactor.registerGApplication(app)
        app2 = Gio.Application(
            application_id="com.twistedmatrix.trial.gireactor2",
            flags=Gio.ApplicationFlags.FLAGS_NONE,
        )
        exc = self.assertRaises(RuntimeError, reactor.registerGApplication, app2)
        self.assertEqual(
            exc.args[0], "Can't register more than one application instance."
        )

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
fake_CAs Folder 0755
__init__.py File 112 B 0644
_posixifaces.py File 4.29 KB 0644
_win32ifaces.py File 3.88 KB 0644
connectionmixins.py File 19.71 KB 0644
fakeendpoint.py File 1.62 KB 0644
modulehelpers.py File 1.63 KB 0644
process_cli.py File 547 B 0644
process_connectionlost.py File 126 B 0644
process_gireactornocompat.py File 792 B 0644
process_helper.py File 1.22 KB 0644
reactormixins.py File 16.05 KB 0644
test_abstract.py File 2.15 KB 0644
test_address.py File 8.07 KB 0644
test_asyncioreactor.py File 9.7 KB 0644
test_base.py File 14.44 KB 0644
test_baseprocess.py File 2.53 KB 0644
test_cfreactor.py File 3.09 KB 0644
test_core.py File 10.86 KB 0644
test_default.py File 3.5 KB 0644
test_defer_await.py File 6.66 KB 0644
test_defer_yieldfrom.py File 4.79 KB 0644
test_endpoints.py File 147.25 KB 0644
test_epollreactor.py File 7.15 KB 0644
test_error.py File 1.08 KB 0644
test_fdset.py File 13.18 KB 0644
test_filedescriptor.py File 2.7 KB 0644
test_gireactor.py File 7.18 KB 0644
test_glibbase.py File 3.02 KB 0644
test_inlinecb.py File 11.21 KB 0644
test_inotify.py File 18.13 KB 0644
test_iocp.py File 7.52 KB 0644
test_kqueuereactor.py File 1.93 KB 0644
test_main.py File 1.32 KB 0644
test_newtls.py File 6.38 KB 0644
test_pollingfile.py File 1.28 KB 0644
test_posixbase.py File 11.56 KB 0644
test_posixprocess.py File 10.94 KB 0644
test_process.py File 44.6 KB 0644
test_protocol.py File 18.08 KB 0644
test_reactormixins.py File 1.93 KB 0644
test_resolver.py File 19.15 KB 0644
test_serialport.py File 1.94 KB 0644
test_sigchld.py File 3.25 KB 0644
test_socket.py File 9.22 KB 0644
test_stdio.py File 6.18 KB 0644
test_tcp.py File 105.7 KB 0644
test_testing.py File 16.49 KB 0644
test_threads.py File 7.94 KB 0644
test_time.py File 3.57 KB 0644
test_tls.py File 12.54 KB 0644
test_udp.py File 16.48 KB 0644
test_udp_internals.py File 4.98 KB 0644
test_unix.py File 34.04 KB 0644
test_win32events.py File 6.3 KB 0644
test_win32serialport.py File 5.18 KB 0644
Filemanager