__ __ __ __ _____ _ _ _____ _ _ _ | \/ | \ \ / / | __ \ (_) | | / ____| | | | | | \ / |_ __\ V / | |__) | __ ___ ____ _| |_ ___ | (___ | |__ ___| | | | |\/| | '__|> < | ___/ '__| \ \ / / _` | __/ _ \ \___ \| '_ \ / _ \ | | | | | | |_ / . \ | | | | | |\ V / (_| | || __/ ____) | | | | __/ | | |_| |_|_(_)_/ \_\ |_| |_| |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1 if you need WebShell for Seo everyday contact me on Telegram Telegram Address : @jackleetFor_More_Tools:
from typing import TYPE_CHECKING, List
from twisted.trial.unittest import SynchronousTestCase
from .reactormixins import ReactorBuilder
# Base class used alongside ReactorBuilder in the test mixin below.
# Static type checkers take the TYPE_CHECKING branch and see a
# SynchronousTestCase (presumably so the self.assert* calls in the mixin
# type-check); at runtime TYPE_CHECKING is False, so the base is plain object.
if TYPE_CHECKING:
    fakeBase = SynchronousTestCase
else:
    fakeBase = object
def noop() -> None:
    """
    Do-nothing callable.  Stub for testing.

    Takes no arguments and returns C{None}; it exists only so the tests have
    something harmless to schedule.
    """
# Exercise noop once for coverage: the tests below only schedule it and
# never let it actually run.
noop()
class CoreFoundationSpecificTests(ReactorBuilder, fakeBase):
    """
    Tests for platform interactions of the CoreFoundation-based reactor.
    """

    # The only reactor these tests are built for.
    _reactors = ["twisted.internet.cfreactor.CFReactor"]

    def test_whiteboxStopSimulating(self) -> None:
        """
        CFReactor's simulation timer is None after CFReactor crashes.
        """
        r = self.buildReactor()
        r.callLater(0, r.crash)
        # A long-delay call is still pending at the moment the reactor crashes.
        r.callLater(100, noop)
        self.runReactor(r)
        self.assertIs(r._currentSimulator, None)

    def test_callLaterLeakage(self) -> None:
        """
        callLater should not leak global state into CoreFoundation which will
        be invoked by a different reactor running the main loop.

        @note: this test may actually be usable for other reactors as well, so
            we may wish to promote it to ensure this invariant across other
            foreign-main-loop reactors.
        """
        # r gets a pending call but is never run; only r2 is run below.
        r = self.buildReactor()
        delayed = r.callLater(0, noop)
        r2 = self.buildReactor()

        def stopBlocking() -> None:
            # Second hop: schedule the actual stop through r2's callLater.
            r2.callLater(0, r2stop)

        def r2stop() -> None:
            r2.stop()

        r2.callLater(0, stopBlocking)
        self.runReactor(r2)
        # Running r2 must not have consumed the call scheduled on r.
        self.assertEqual(r.getDelayedCalls(), [delayed])

    def test_whiteboxIterate(self) -> None:
        """
        C{.iterate()} should remove the CFTimer that will run Twisted's
        callLaters from the loop, even if one is still pending.  We test this
        state indirectly with a white-box assertion by verifying the
        C{_currentSimulator} is set to C{None}, since CoreFoundation does not
        allow us to enumerate all active timers or sources.
        """
        r = self.buildReactor()
        x: List[int] = []
        r.callLater(0, x.append, 1)
        delayed = r.callLater(100, noop)
        r.iterate()
        self.assertIs(r._currentSimulator, None)
        # The long-delay call is still pending; the zero-delay one has run.
        self.assertEqual(r.getDelayedCalls(), [delayed])
        self.assertEqual(x, [1])

    def test_noTimers(self) -> None:
        """
        The loop can wake up just fine even if there are no timers in it.
        """
        r = self.buildReactor()
        stopped: List[str] = []

        def doStop() -> None:
            r.stop()
            stopped.append("yes")

        def sleepThenStop() -> None:
            # Handed to callInThread below; hands the stop request back to
            # the reactor via callFromThread.
            r.callFromThread(doStop)

        r.callLater(0, r.callInThread, sleepThenStop)
        # Can't use runReactor here because it does a callLater.  This is
        # therefore a somewhat risky test: inherently, this is the "no timed
        # events anywhere in the reactor" test case and so we can't have a
        # timeout for it.
        r.run()
        self.assertEqual(stopped, ["yes"])
# Publish whatever makeTestCaseClasses() returns as module-level names.
# NOTE(review): makeTestCaseClasses is not defined in this file (presumably
# inherited from ReactorBuilder); it looks like it builds one concrete test
# case class per entry in _reactors so the test runner can discover them --
# confirm against reactormixins.
globals().update(CoreFoundationSpecificTests.makeTestCaseClasses())
| Name | Type | Size | Permission | Actions |
|---|---|---|---|---|
| __pycache__ | Folder | | 0755 | |
| fake_CAs | Folder | | 0755 | |
| __init__.py | File | 112 B | 0644 | |
| _posixifaces.py | File | 4.29 KB | 0644 | |
| _win32ifaces.py | File | 3.88 KB | 0644 | |
| connectionmixins.py | File | 19.71 KB | 0644 | |
| fakeendpoint.py | File | 1.62 KB | 0644 | |
| modulehelpers.py | File | 1.63 KB | 0644 | |
| process_cli.py | File | 547 B | 0644 | |
| process_connectionlost.py | File | 126 B | 0644 | |
| process_gireactornocompat.py | File | 792 B | 0644 | |
| process_helper.py | File | 1.22 KB | 0644 | |
| reactormixins.py | File | 16.05 KB | 0644 | |
| test_abstract.py | File | 2.15 KB | 0644 | |
| test_address.py | File | 8.07 KB | 0644 | |
| test_asyncioreactor.py | File | 9.7 KB | 0644 | |
| test_base.py | File | 14.44 KB | 0644 | |
| test_baseprocess.py | File | 2.53 KB | 0644 | |
| test_cfreactor.py | File | 3.09 KB | 0644 | |
| test_core.py | File | 10.86 KB | 0644 | |
| test_default.py | File | 3.5 KB | 0644 | |
| test_defer_await.py | File | 6.66 KB | 0644 | |
| test_defer_yieldfrom.py | File | 4.79 KB | 0644 | |
| test_endpoints.py | File | 147.25 KB | 0644 | |
| test_epollreactor.py | File | 7.15 KB | 0644 | |
| test_error.py | File | 1.08 KB | 0644 | |
| test_fdset.py | File | 13.18 KB | 0644 | |
| test_filedescriptor.py | File | 2.7 KB | 0644 | |
| test_gireactor.py | File | 7.18 KB | 0644 | |
| test_glibbase.py | File | 3.02 KB | 0644 | |
| test_inlinecb.py | File | 11.21 KB | 0644 | |
| test_inotify.py | File | 18.13 KB | 0644 | |
| test_iocp.py | File | 7.52 KB | 0644 | |
| test_kqueuereactor.py | File | 1.93 KB | 0644 | |
| test_main.py | File | 1.32 KB | 0644 | |
| test_newtls.py | File | 6.38 KB | 0644 | |
| test_pollingfile.py | File | 1.28 KB | 0644 | |
| test_posixbase.py | File | 11.56 KB | 0644 | |
| test_posixprocess.py | File | 10.94 KB | 0644 | |
| test_process.py | File | 44.6 KB | 0644 | |
| test_protocol.py | File | 18.08 KB | 0644 | |
| test_reactormixins.py | File | 1.93 KB | 0644 | |
| test_resolver.py | File | 19.15 KB | 0644 | |
| test_serialport.py | File | 1.94 KB | 0644 | |
| test_sigchld.py | File | 3.25 KB | 0644 | |
| test_socket.py | File | 9.22 KB | 0644 | |
| test_stdio.py | File | 6.18 KB | 0644 | |
| test_tcp.py | File | 105.7 KB | 0644 | |
| test_testing.py | File | 16.49 KB | 0644 | |
| test_threads.py | File | 7.94 KB | 0644 | |
| test_time.py | File | 3.57 KB | 0644 | |
| test_tls.py | File | 12.54 KB | 0644 | |
| test_udp.py | File | 16.48 KB | 0644 | |
| test_udp_internals.py | File | 4.98 KB | 0644 | |
| test_unix.py | File | 34.04 KB | 0644 | |
| test_win32events.py | File | 6.3 KB | 0644 | |
| test_win32serialport.py | File | 5.18 KB | 0644 | |