__ __ __ __ _____ _ _ _____ _ _ _ | \/ | \ \ / / | __ \ (_) | | / ____| | | | | | \ / |_ __\ V / | |__) | __ ___ ____ _| |_ ___ | (___ | |__ ___| | | | |\/| | '__|> < | ___/ '__| \ \ / / _` | __/ _ \ \___ \| '_ \ / _ \ | | | | | | |_ / . \ | | | | | |\ V / (_| | || __/ ____) | | | | __/ | | |_| |_|_(_)_/ \_\ |_| |_| |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1 if you need WebShell for Seo everyday contact me on Telegram Telegram Address : @jackleetFor_More_Tools:
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.internet._sigchld}, an alternate, superior SIGCHLD
monitoring API.
"""
from __future__ import annotations
import errno
import os
import signal
from twisted.python.runtime import platformType
from twisted.trial.unittest import SynchronousTestCase
if platformType == "posix":
    # These helpers are imported only on POSIX; the tests below rely on them.
    from twisted.internet._signals import installHandler, isDefaultHandler
    from twisted.internet.fdesc import setNonBlocking
else:
    # Module-level ``skip`` reason: the names imported in the POSIX branch are
    # not defined here, so the tests in this module must not be run.
    skip = "These tests can only run on POSIX platforms."
class SetWakeupSIGCHLDTests(SynchronousTestCase):
    """
    Tests for the L{signal.set_wakeup_fd} implementation of the
    L{installHandler} and L{isDefaultHandler} APIs.
    """

    def pipe(self) -> tuple[int, int]:
        """
        Create a non-blocking pipe which will be closed after the currently
        running test.

        @return: A two-tuple of the read and write file descriptors, in the
            order produced by L{os.pipe}.
        """
        read, write = os.pipe()
        # Register the cleanups before touching the descriptors so they are
        # closed even if making them non-blocking fails.
        self.addCleanup(os.close, read)
        self.addCleanup(os.close, write)
        setNonBlocking(read)
        setNonBlocking(write)
        return read, write

    def setUp(self) -> None:
        """
        Save the current SIGCHLD handler as reported by L{signal.signal} and
        the current file descriptor registered with L{installHandler}.
        """
        self.signalModuleHandler = signal.getsignal(signal.SIGCHLD)
        # Passing -1 hands back the previously registered descriptor (kept so
        # tearDown can restore it) while leaving no descriptor installed for
        # the test itself.
        self.oldFD = installHandler(-1)

    def tearDown(self) -> None:
        """
        Restore whatever signal handler was present when setUp ran.
        """
        # If tests set up any kind of handlers, clear them out.
        installHandler(self.oldFD)
        signal.signal(signal.SIGCHLD, self.signalModuleHandler)

    def test_isDefaultHandler(self) -> None:
        """
        L{isDefaultHandler} returns true if the SIGCHLD handler is SIG_DFL,
        false otherwise.
        """
        self.assertTrue(isDefaultHandler())
        signal.signal(signal.SIGCHLD, signal.SIG_IGN)
        self.assertFalse(isDefaultHandler())
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
        self.assertTrue(isDefaultHandler())
        # An arbitrary Python-level callable is not the default handler
        # either.
        signal.signal(signal.SIGCHLD, lambda *args: None)
        self.assertFalse(isDefaultHandler())

    def test_returnOldFD(self) -> None:
        """
        L{installHandler} returns the previously registered file descriptor.
        """
        # Only the write end is registered; the read end is not needed here.
        _, write = self.pipe()
        oldFD = installHandler(write)
        self.assertEqual(installHandler(oldFD), write)

    def test_uninstallHandler(self) -> None:
        """
        C{installHandler(-1)} removes the SIGCHLD handler completely.
        """
        # Only the write end is registered; the read end is not needed here.
        _, write = self.pipe()
        self.assertTrue(isDefaultHandler())
        installHandler(write)
        self.assertFalse(isDefaultHandler())
        installHandler(-1)
        self.assertTrue(isDefaultHandler())

    def test_installHandler(self) -> None:
        """
        The file descriptor passed to L{installHandler} has a byte written to
        it when SIGCHLD is delivered to the process.
        """
        read, write = self.pipe()
        installHandler(write)

        # Before any signal arrives the pipe is empty, so the non-blocking
        # read must fail with EAGAIN rather than return data.
        exc = self.assertRaises(OSError, os.read, read, 1)
        self.assertEqual(exc.errno, errno.EAGAIN)

        os.kill(os.getpid(), signal.SIGCHLD)

        # Ask for more than one byte so the assertion shows that exactly one
        # byte was written for the single signal.
        self.assertEqual(len(os.read(read, 5)), 1)
| Name | Type | Size | Permission | Actions |
|---|---|---|---|---|
| __pycache__ | Folder | 0755 |
|
|
| fake_CAs | Folder | 0755 |
|
|
| __init__.py | File | 112 B | 0644 |
|
| _posixifaces.py | File | 4.29 KB | 0644 |
|
| _win32ifaces.py | File | 3.88 KB | 0644 |
|
| connectionmixins.py | File | 19.71 KB | 0644 |
|
| fakeendpoint.py | File | 1.62 KB | 0644 |
|
| modulehelpers.py | File | 1.63 KB | 0644 |
|
| process_cli.py | File | 547 B | 0644 |
|
| process_connectionlost.py | File | 126 B | 0644 |
|
| process_gireactornocompat.py | File | 792 B | 0644 |
|
| process_helper.py | File | 1.22 KB | 0644 |
|
| reactormixins.py | File | 16.05 KB | 0644 |
|
| test_abstract.py | File | 2.15 KB | 0644 |
|
| test_address.py | File | 8.07 KB | 0644 |
|
| test_asyncioreactor.py | File | 9.7 KB | 0644 |
|
| test_base.py | File | 14.44 KB | 0644 |
|
| test_baseprocess.py | File | 2.53 KB | 0644 |
|
| test_cfreactor.py | File | 3.09 KB | 0644 |
|
| test_core.py | File | 10.86 KB | 0644 |
|
| test_default.py | File | 3.5 KB | 0644 |
|
| test_defer_await.py | File | 6.66 KB | 0644 |
|
| test_defer_yieldfrom.py | File | 4.79 KB | 0644 |
|
| test_endpoints.py | File | 147.25 KB | 0644 |
|
| test_epollreactor.py | File | 7.15 KB | 0644 |
|
| test_error.py | File | 1.08 KB | 0644 |
|
| test_fdset.py | File | 13.18 KB | 0644 |
|
| test_filedescriptor.py | File | 2.7 KB | 0644 |
|
| test_gireactor.py | File | 7.18 KB | 0644 |
|
| test_glibbase.py | File | 3.02 KB | 0644 |
|
| test_inlinecb.py | File | 11.21 KB | 0644 |
|
| test_inotify.py | File | 18.13 KB | 0644 |
|
| test_iocp.py | File | 7.52 KB | 0644 |
|
| test_kqueuereactor.py | File | 1.93 KB | 0644 |
|
| test_main.py | File | 1.32 KB | 0644 |
|
| test_newtls.py | File | 6.38 KB | 0644 |
|
| test_pollingfile.py | File | 1.28 KB | 0644 |
|
| test_posixbase.py | File | 11.56 KB | 0644 |
|
| test_posixprocess.py | File | 10.94 KB | 0644 |
|
| test_process.py | File | 44.6 KB | 0644 |
|
| test_protocol.py | File | 18.08 KB | 0644 |
|
| test_reactormixins.py | File | 1.93 KB | 0644 |
|
| test_resolver.py | File | 19.15 KB | 0644 |
|
| test_serialport.py | File | 1.94 KB | 0644 |
|
| test_sigchld.py | File | 3.25 KB | 0644 |
|
| test_socket.py | File | 9.22 KB | 0644 |
|
| test_stdio.py | File | 6.18 KB | 0644 |
|
| test_tcp.py | File | 105.7 KB | 0644 |
|
| test_testing.py | File | 16.49 KB | 0644 |
|
| test_threads.py | File | 7.94 KB | 0644 |
|
| test_time.py | File | 3.57 KB | 0644 |
|
| test_tls.py | File | 12.54 KB | 0644 |
|
| test_udp.py | File | 16.48 KB | 0644 |
|
| test_udp_internals.py | File | 4.98 KB | 0644 |
|
| test_unix.py | File | 34.04 KB | 0644 |
|
| test_win32events.py | File | 6.3 KB | 0644 |
|
| test_win32serialport.py | File | 5.18 KB | 0644 |
|