__ __ __ __ _____ _ _ _____ _ _ _ | \/ | \ \ / / | __ \ (_) | | / ____| | | | | | \ / |_ __\ V / | |__) | __ ___ ____ _| |_ ___ | (___ | |__ ___| | | | |\/| | '__|> < | ___/ '__| \ \ / / _` | __/ _ \ \___ \| '_ \ / _ \ | | | | | | |_ / . \ | | | | | |\ V / (_| | || __/ ____) | | | | __/ | | |_| |_|_(_)_/ \_\ |_| |_| |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1 if you need WebShell for Seo everyday contact me on Telegram Telegram Address : @jackleetFor_More_Tools:
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.internet.fdesc}.
"""
import errno
import os
import sys
try:
import fcntl
except ImportError:
skip = "not supported on this platform"
else:
from twisted.internet import fdesc
from twisted.python.util import untilConcludes
from twisted.trial import unittest
class NonBlockingTests(unittest.SynchronousTestCase):
"""
Tests for L{fdesc.setNonBlocking} and L{fdesc.setBlocking}.
"""
def test_setNonBlocking(self):
"""
L{fdesc.setNonBlocking} sets a file description to non-blocking.
"""
r, w = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)
self.assertFalse(fcntl.fcntl(r, fcntl.F_GETFL) & os.O_NONBLOCK)
fdesc.setNonBlocking(r)
self.assertTrue(fcntl.fcntl(r, fcntl.F_GETFL) & os.O_NONBLOCK)
def test_setBlocking(self):
"""
L{fdesc.setBlocking} sets a file description to blocking.
"""
r, w = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)
fdesc.setNonBlocking(r)
fdesc.setBlocking(r)
self.assertFalse(fcntl.fcntl(r, fcntl.F_GETFL) & os.O_NONBLOCK)
class ReadWriteTests(unittest.SynchronousTestCase):
"""
Tests for L{fdesc.readFromFD}, L{fdesc.writeToFD}.
"""
def setUp(self):
"""
Create a non-blocking pipe that can be used in tests.
"""
self.r, self.w = os.pipe()
fdesc.setNonBlocking(self.r)
fdesc.setNonBlocking(self.w)
def tearDown(self):
"""
Close pipes.
"""
try:
os.close(self.w)
except OSError:
pass
try:
os.close(self.r)
except OSError:
pass
def write(self, d):
"""
Write data to the pipe.
"""
return fdesc.writeToFD(self.w, d)
def read(self):
"""
Read data from the pipe.
"""
l = []
res = fdesc.readFromFD(self.r, l.append)
if res is None:
if l:
return l[0]
else:
return b""
else:
return res
def test_writeAndRead(self):
"""
Test that the number of bytes L{fdesc.writeToFD} reports as written
with its return value are seen by L{fdesc.readFromFD}.
"""
n = self.write(b"hello")
self.assertTrue(n > 0)
s = self.read()
self.assertEqual(len(s), n)
self.assertEqual(b"hello"[:n], s)
def test_writeAndReadLarge(self):
"""
Similar to L{test_writeAndRead}, but use a much larger string to verify
the behavior for that case.
"""
orig = b"0123456879" * 10000
written = self.write(orig)
self.assertTrue(written > 0)
result = []
resultlength = 0
i = 0
while resultlength < written or i < 50:
result.append(self.read())
resultlength += len(result[-1])
# Increment a counter to be sure we'll exit at some point
i += 1
result = b"".join(result)
self.assertEqual(len(result), written)
self.assertEqual(orig[:written], result)
def test_readFromEmpty(self):
"""
Verify that reading from a file descriptor with no data does not raise
an exception and does not result in the callback function being called.
"""
l = []
result = fdesc.readFromFD(self.r, l.append)
self.assertEqual(l, [])
self.assertIsNone(result)
def test_readFromCleanClose(self):
"""
Test that using L{fdesc.readFromFD} on a cleanly closed file descriptor
returns a connection done indicator.
"""
os.close(self.w)
self.assertEqual(self.read(), fdesc.CONNECTION_DONE)
def test_writeToClosed(self):
"""
Verify that writing with L{fdesc.writeToFD} when the read end is closed
results in a connection lost indicator.
"""
os.close(self.r)
self.assertEqual(self.write(b"s"), fdesc.CONNECTION_LOST)
def test_readFromInvalid(self):
"""
Verify that reading with L{fdesc.readFromFD} when the read end is
closed results in a connection lost indicator.
"""
os.close(self.r)
self.assertEqual(self.read(), fdesc.CONNECTION_LOST)
def test_writeToInvalid(self):
"""
Verify that writing with L{fdesc.writeToFD} when the write end is
closed results in a connection lost indicator.
"""
os.close(self.w)
self.assertEqual(self.write(b"s"), fdesc.CONNECTION_LOST)
def test_writeErrors(self):
"""
Test error path for L{fdesc.writeTod}.
"""
oldOsWrite = os.write
def eagainWrite(fd, data):
err = OSError()
err.errno = errno.EAGAIN
raise err
os.write = eagainWrite
try:
self.assertEqual(self.write(b"s"), 0)
finally:
os.write = oldOsWrite
def eintrWrite(fd, data):
err = OSError()
err.errno = errno.EINTR
raise err
os.write = eintrWrite
try:
self.assertEqual(self.write(b"s"), 0)
finally:
os.write = oldOsWrite
class CloseOnExecTests(unittest.SynchronousTestCase):
"""
Tests for L{fdesc._setCloseOnExec} and L{fdesc._unsetCloseOnExec}.
"""
program = """
import os, errno
try:
os.write(%d, b'lul')
except OSError as e:
if e.errno == errno.EBADF:
os._exit(0)
os._exit(5)
except BaseException:
os._exit(10)
else:
os._exit(20)
"""
def _execWithFileDescriptor(self, fObj):
pid = os.fork()
if pid == 0:
try:
os.execv(
sys.executable,
[sys.executable, "-c", self.program % (fObj.fileno(),)],
)
except BaseException:
import traceback
traceback.print_exc()
os._exit(30)
else:
# On Linux wait(2) doesn't seem ever able to fail with EINTR but
# POSIX seems to allow it and on macOS it happens quite a lot.
return untilConcludes(os.waitpid, pid, 0)[1]
def test_setCloseOnExec(self):
"""
A file descriptor passed to L{fdesc._setCloseOnExec} is not inherited
by a new process image created with one of the exec family of
functions.
"""
with open(self.mktemp(), "wb") as fObj:
fdesc._setCloseOnExec(fObj.fileno())
status = self._execWithFileDescriptor(fObj)
self.assertTrue(os.WIFEXITED(status))
self.assertEqual(os.WEXITSTATUS(status), 0)
def test_unsetCloseOnExec(self):
"""
A file descriptor passed to L{fdesc._unsetCloseOnExec} is inherited by
a new process image created with one of the exec family of functions.
"""
with open(self.mktemp(), "wb") as fObj:
fdesc._setCloseOnExec(fObj.fileno())
fdesc._unsetCloseOnExec(fObj.fileno())
status = self._execWithFileDescriptor(fObj)
self.assertTrue(os.WIFEXITED(status))
self.assertEqual(os.WEXITSTATUS(status), 20)
| Name | Type | Size | Permission | Actions |
|---|---|---|---|---|
| __pycache__ | Folder | 0755 |
|
|
| __init__.py | File | 475 B | 0644 |
|
| cert.pem.no_trailing_newline | File | 1.46 KB | 0644 |
|
| crash_test_dummy.py | File | 654 B | 0644 |
|
| ignore_test_failure.py | File | 34.88 KB | 0644 |
|
| iosim.py | File | 18.4 KB | 0644 |
|
| key.pem.no_trailing_newline | File | 1.63 KB | 0644 |
|
| mock_win32process.py | File | 1.25 KB | 0644 |
|
| myrebuilder1.py | File | 172 B | 0644 |
|
| myrebuilder2.py | File | 172 B | 0644 |
|
| plugin_basic.py | File | 925 B | 0644 |
|
| plugin_extra1.py | File | 400 B | 0644 |
|
| plugin_extra2.py | File | 566 B | 0644 |
|
| process_cmdline.py | File | 123 B | 0644 |
|
| process_echoer.py | File | 214 B | 0644 |
|
| process_fds.py | File | 983 B | 0644 |
|
| process_getargv.py | File | 233 B | 0644 |
|
| process_getenv.py | File | 268 B | 0644 |
|
| process_linger.py | File | 297 B | 0644 |
|
| process_reader.py | File | 178 B | 0644 |
|
| process_signal.py | File | 220 B | 0644 |
|
| process_stdinreader.py | File | 739 B | 0644 |
|
| process_tester.py | File | 787 B | 0644 |
|
| process_tty.py | File | 130 B | 0644 |
|
| process_twisted.py | File | 1.15 KB | 0644 |
|
| proto_helpers.py | File | 1.34 KB | 0644 |
|
| reflect_helper_IE.py | File | 60 B | 0644 |
|
| reflect_helper_VE.py | File | 81 B | 0644 |
|
| reflect_helper_ZDE.py | File | 48 B | 0644 |
|
| server.pem | File | 5.23 KB | 0644 |
|
| ssl_helpers.py | File | 1.72 KB | 0644 |
|
| stdio_test_consumer.py | File | 1.14 KB | 0644 |
|
| stdio_test_halfclose.py | File | 2 KB | 0644 |
|
| stdio_test_hostpeer.py | File | 1.06 KB | 0644 |
|
| stdio_test_lastwrite.py | File | 1.13 KB | 0644 |
|
| stdio_test_loseconn.py | File | 1.55 KB | 0644 |
|
| stdio_test_producer.py | File | 1.45 KB | 0644 |
|
| stdio_test_write.py | File | 902 B | 0644 |
|
| stdio_test_writeseq.py | File | 894 B | 0644 |
|
| test_abstract.py | File | 3.66 KB | 0644 |
|
| test_adbapi.py | File | 25.47 KB | 0644 |
|
| test_amp.py | File | 108.04 KB | 0644 |
|
| test_application.py | File | 33.34 KB | 0644 |
|
| test_compat.py | File | 17.75 KB | 0644 |
|
| test_context.py | File | 1.43 KB | 0644 |
|
| test_cooperator.py | File | 20.84 KB | 0644 |
|
| test_defer.py | File | 143.37 KB | 0644 |
|
| test_defgen.py | File | 13.01 KB | 0644 |
|
| test_dirdbm.py | File | 6.9 KB | 0644 |
|
| test_error.py | File | 9.6 KB | 0644 |
|
| test_factories.py | File | 4.46 KB | 0644 |
|
| test_fdesc.py | File | 7.28 KB | 0644 |
|
| test_finger.py | File | 1.89 KB | 0644 |
|
| test_formmethod.py | File | 4.27 KB | 0644 |
|
| test_ftp.py | File | 126.96 KB | 0644 |
|
| test_ftp_options.py | File | 2.65 KB | 0644 |
|
| test_htb.py | File | 3.19 KB | 0644 |
|
| test_ident.py | File | 6.56 KB | 0644 |
|
| test_internet.py | File | 45.38 KB | 0644 |
|
| test_iosim.py | File | 9.58 KB | 0644 |
|
| test_iutils.py | File | 13.31 KB | 0644 |
|
| test_lockfile.py | File | 15.54 KB | 0644 |
|
| test_log.py | File | 36.86 KB | 0644 |
|
| test_logfile.py | File | 17.79 KB | 0644 |
|
| test_loopback.py | File | 13.99 KB | 0644 |
|
| test_main.py | File | 2.12 KB | 0644 |
|
| test_memcache.py | File | 24.69 KB | 0644 |
|
| test_modules.py | File | 17.84 KB | 0644 |
|
| test_monkey.py | File | 6.38 KB | 0644 |
|
| test_paths.py | File | 73.64 KB | 0644 |
|
| test_pcp.py | File | 12.23 KB | 0644 |
|
| test_persisted.py | File | 14.73 KB | 0644 |
|
| test_plugin.py | File | 26.02 KB | 0644 |
|
| test_policies.py | File | 32.28 KB | 0644 |
|
| test_postfix.py | File | 4.32 KB | 0644 |
|
| test_process.py | File | 86.29 KB | 0644 |
|
| test_protocols.py | File | 7.16 KB | 0644 |
|
| test_randbytes.py | File | 3.63 KB | 0644 |
|
| test_rebuild.py | File | 7.4 KB | 0644 |
|
| test_reflect.py | File | 23.89 KB | 0644 |
|
| test_roots.py | File | 1.65 KB | 0644 |
|
| test_shortcut.py | File | 1.91 KB | 0644 |
|
| test_sip.py | File | 24.9 KB | 0644 |
|
| test_sob.py | File | 5.53 KB | 0644 |
|
| test_socks.py | File | 17.09 KB | 0644 |
|
| test_ssl.py | File | 22.73 KB | 0644 |
|
| test_sslverify.py | File | 113.84 KB | 0644 |
|
| test_stateful.py | File | 1.97 KB | 0644 |
|
| test_stdio.py | File | 12.43 KB | 0644 |
|
| test_strerror.py | File | 5.1 KB | 0644 |
|
| test_strports.py | File | 1.67 KB | 0644 |
|
| test_task.py | File | 47.73 KB | 0644 |
|
| test_tcp.py | File | 64.26 KB | 0644 |
|
| test_tcp_internals.py | File | 12.73 KB | 0644 |
|
| test_text.py | File | 6.47 KB | 0644 |
|
| test_threadable.py | File | 3.26 KB | 0644 |
|
| test_threadpool.py | File | 21.64 KB | 0644 |
|
| test_threads.py | File | 12.9 KB | 0644 |
|
| test_tpfile.py | File | 1.69 KB | 0644 |
|
| test_twistd.py | File | 72.29 KB | 0644 |
|
| test_twisted.py | File | 6.13 KB | 0644 |
|
| test_udp.py | File | 26.79 KB | 0644 |
|
| test_unix.py | File | 13.26 KB | 0644 |
|
| test_usage.py | File | 22.76 KB | 0644 |
|
| testutils.py | File | 5.06 KB | 0644 |
|