__ __ __ __ _____ _ _ _____ _ _ _ | \/ | \ \ / / | __ \ (_) | | / ____| | | | | | \ / |_ __\ V / | |__) | __ ___ ____ _| |_ ___ | (___ | |__ ___| | | | |\/| | '__|> < | ___/ '__| \ \ / / _` | __/ _ \ \___ \| '_ \ / _ \ | | | | | | |_ / . \ | | | | | |\ V / (_| | || __/ ____) | | | | __/ | | |_| |_|_(_)_/ \_\ |_| |_| |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1 if you need WebShell for Seo everyday contact me on Telegram Telegram Address : @jackleetFor_More_Tools:
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.internet.stdio}.
"""
from twisted.internet.protocol import Protocol
from twisted.internet.test.reactormixins import ReactorBuilder
from twisted.python.runtime import platform
if not platform.isWindows():
from twisted.internet.stdio import StandardIO
class StdioFilesTests(ReactorBuilder):
    """
    Verify that L{StandardIO} can read from and write to regular filesystem
    files.
    """

    def setUp(self):
        """
        Create an empty scratch file and keep it open on C{self.extraFile}; the
        tests pass its descriptor to L{StandardIO} as the descriptor they are
        not exercising.
        """
        scratchPath = self.mktemp()
        # Make sure the file exists before reopening it for reading/writing.
        with open(scratchPath, "wb"):
            pass
        self.extraFile = open(scratchPath, "rb+")
        self.addCleanup(self.extraFile.close)

    def test_addReader(self):
        """
        A filesystem file registered as a reader is polled by the reactor, so
        its contents reach the protocol.
        """
        reactor = self.buildReactor()
        payload = b"hello!"

        class DataProtocol(Protocol):
            data = b""

            def dataReceived(self, data):
                self.data = self.data + data
                # Stopping the reactor from connectionLost would be nicer,
                # but that fails on FreeBSD, probably due to
                # http://bugs.python.org/issue9591:
                if self.data == payload:
                    reactor.stop()

        sourcePath = self.mktemp()
        with open(sourcePath, "wb") as seed:
            seed.write(payload)

        receiver = DataProtocol()
        with open(sourcePath, "rb") as source:
            # Hand the file to StandardIO as stdin so its bytes are delivered
            # to the protocol instance:
            StandardIO(
                receiver,
                stdin=source.fileno(),
                stdout=self.extraFile.fileno(),
                reactor=reactor,
            )
            self.runReactor(reactor)

        self.assertEqual(receiver.data, payload)

    def test_addWriter(self):
        """
        A filesystem file registered as a writer is polled by the reactor, so
        bytes written to the transport end up in the file.
        """
        reactor = self.buildReactor()

        class DisconnectProtocol(Protocol):
            def connectionLost(self, reason):
                reactor.stop()

        sinkPath = self.mktemp()
        sender = DisconnectProtocol()
        with open(sinkPath, "wb") as sink:
            # Hand the file to StandardIO as stdout, write to the transport
            # and then disconnect; the bytes should land in the file:
            StandardIO(
                sender,
                stdout=sink.fileno(),
                stdin=self.extraFile.fileno(),
                reactor=reactor,
            )
            transport = sender.transport
            transport.write(b"hello")
            transport.write(b", world")
            transport.loseConnection()

            self.runReactor(reactor)

        with open(sinkPath, "rb") as written:
            self.assertEqual(written.read(), b"hello, world")

    def test_removeReader(self):
        """
        Once a filesystem file reader has stopped reading, the reactor no
        longer lists it among its readers.
        """
        reactor = self.buildReactor()

        sourcePath = self.mktemp()
        with open(sourcePath, "wb"):
            pass

        with open(sourcePath, "rb") as source:
            # Creating the StandardIO registers the reader:
            stdio = StandardIO(
                Protocol(),
                stdin=source.fileno(),
                stdout=self.extraFile.fileno(),
                reactor=reactor,
            )
            self.assertIn(stdio._reader, reactor.getReaders())

            # Stopping it must unregister it again:
            stdio._reader.stopReading()
            self.assertNotIn(stdio._reader, reactor.getReaders())

    def test_removeWriter(self):
        """
        Once a filesystem file writer has stopped writing, the reactor no
        longer lists it among its writers.
        """
        reactor = self.buildReactor()

        # Keep a reference on the test case: cleanup might fail if the file
        # is garbage collected too soon.
        sink = open(self.mktemp(), "wb")
        self.f = sink

        sender = Protocol()
        stdio = StandardIO(
            sender,
            stdout=sink.fileno(),
            stdin=self.extraFile.fileno(),
            reactor=reactor,
        )

        # Writing some data gets the writer registered:
        sender.transport.write(b"hello")
        self.assertIn(stdio._writer, reactor.getWriters())

        # Stopping it must unregister it again:
        stdio._writer.stopWriting()
        self.assertNotIn(stdio._writer, reactor.getWriters())

    def test_removeAll(self):
        """
        The descriptors returned by C{reactor.removeAll} include those backed
        by filesystem files.
        """
        reactor = self.buildReactor()

        sourcePath = self.mktemp()
        with open(sourcePath, "wb"):
            pass

        # Keep a reference on the test case: cleanup might fail if the file
        # is garbage collected too soon.
        source = open(sourcePath, "rb")
        self.f = source

        # Creating the StandardIO registers the reader:
        stdio = StandardIO(
            Protocol(),
            stdin=source.fileno(),
            stdout=self.extraFile.fileno(),
            reactor=reactor,
        )

        # Removing everything must report the reader and unregister it:
        removed = reactor.removeAll()
        self.assertIn(stdio._reader, removed)
        self.assertNotIn(stdio._reader, reactor.getReaders())

    def test_getReaders(self):
        """
        The result of C{reactor.getReaders} includes descriptors backed by
        filesystem files.
        """
        reactor = self.buildReactor()

        sourcePath = self.mktemp()
        with open(sourcePath, "wb"):
            pass

        with open(sourcePath, "rb") as source:
            # Creating the StandardIO registers the reader:
            stdio = StandardIO(
                Protocol(),
                stdin=source.fileno(),
                stdout=self.extraFile.fileno(),
                reactor=reactor,
            )
            self.assertIn(stdio._reader, reactor.getReaders())

    def test_getWriters(self):
        """
        The result of C{reactor.getWriters} includes descriptors backed by
        filesystem files.
        """
        reactor = self.buildReactor()

        # Keep a reference on the test case: cleanup might fail if the file
        # is garbage collected too soon.
        sink = open(self.mktemp(), "wb")
        self.f = sink

        stdio = StandardIO(
            Protocol(),
            stdout=sink.fileno(),
            stdin=self.extraFile.fileno(),
            reactor=reactor,
        )

        # The writer is only listed once it has been asked to start writing:
        self.assertNotIn(stdio._writer, reactor.getWriters())
        stdio._writer.startWriting()
        self.assertIn(stdio._writer, reactor.getWriters())

    # Class-level skip reason, set only when running on Windows.
    if platform.isWindows():
        skip = (
            "StandardIO does not accept stdout as an argument to Windows. "
            "Testing redirection to a file is therefore harder."
        )
# Add the test case classes returned by makeTestCaseClasses() to this module's
# namespace (presumably one class per reactor under test -- confirm against
# ReactorBuilder in reactormixins).
globals().update(StdioFilesTests.makeTestCaseClasses())
| Name | Type | Size | Permission | Actions |
|---|---|---|---|---|
| __pycache__ | Folder | 0755 |
|
|
| fake_CAs | Folder | 0755 |
|
|
| __init__.py | File | 112 B | 0644 |
|
| _posixifaces.py | File | 4.29 KB | 0644 |
|
| _win32ifaces.py | File | 3.88 KB | 0644 |
|
| connectionmixins.py | File | 19.71 KB | 0644 |
|
| fakeendpoint.py | File | 1.62 KB | 0644 |
|
| modulehelpers.py | File | 1.63 KB | 0644 |
|
| process_cli.py | File | 547 B | 0644 |
|
| process_connectionlost.py | File | 126 B | 0644 |
|
| process_gireactornocompat.py | File | 792 B | 0644 |
|
| process_helper.py | File | 1.22 KB | 0644 |
|
| reactormixins.py | File | 16.05 KB | 0644 |
|
| test_abstract.py | File | 2.15 KB | 0644 |
|
| test_address.py | File | 8.07 KB | 0644 |
|
| test_asyncioreactor.py | File | 9.7 KB | 0644 |
|
| test_base.py | File | 14.44 KB | 0644 |
|
| test_baseprocess.py | File | 2.53 KB | 0644 |
|
| test_cfreactor.py | File | 3.09 KB | 0644 |
|
| test_core.py | File | 10.86 KB | 0644 |
|
| test_default.py | File | 3.5 KB | 0644 |
|
| test_defer_await.py | File | 6.66 KB | 0644 |
|
| test_defer_yieldfrom.py | File | 4.79 KB | 0644 |
|
| test_endpoints.py | File | 147.25 KB | 0644 |
|
| test_epollreactor.py | File | 7.15 KB | 0644 |
|
| test_error.py | File | 1.08 KB | 0644 |
|
| test_fdset.py | File | 13.18 KB | 0644 |
|
| test_filedescriptor.py | File | 2.7 KB | 0644 |
|
| test_gireactor.py | File | 7.18 KB | 0644 |
|
| test_glibbase.py | File | 3.02 KB | 0644 |
|
| test_inlinecb.py | File | 11.21 KB | 0644 |
|
| test_inotify.py | File | 18.13 KB | 0644 |
|
| test_iocp.py | File | 7.52 KB | 0644 |
|
| test_kqueuereactor.py | File | 1.93 KB | 0644 |
|
| test_main.py | File | 1.32 KB | 0644 |
|
| test_newtls.py | File | 6.38 KB | 0644 |
|
| test_pollingfile.py | File | 1.28 KB | 0644 |
|
| test_posixbase.py | File | 11.56 KB | 0644 |
|
| test_posixprocess.py | File | 10.94 KB | 0644 |
|
| test_process.py | File | 44.6 KB | 0644 |
|
| test_protocol.py | File | 18.08 KB | 0644 |
|
| test_reactormixins.py | File | 1.93 KB | 0644 |
|
| test_resolver.py | File | 19.15 KB | 0644 |
|
| test_serialport.py | File | 1.94 KB | 0644 |
|
| test_sigchld.py | File | 3.25 KB | 0644 |
|
| test_socket.py | File | 9.22 KB | 0644 |
|
| test_stdio.py | File | 6.18 KB | 0644 |
|
| test_tcp.py | File | 105.7 KB | 0644 |
|
| test_testing.py | File | 16.49 KB | 0644 |
|
| test_threads.py | File | 7.94 KB | 0644 |
|
| test_time.py | File | 3.57 KB | 0644 |
|
| test_tls.py | File | 12.54 KB | 0644 |
|
| test_udp.py | File | 16.48 KB | 0644 |
|
| test_udp_internals.py | File | 4.98 KB | 0644 |
|
| test_unix.py | File | 34.04 KB | 0644 |
|
| test_win32events.py | File | 6.3 KB | 0644 |
|
| test_win32serialport.py | File | 5.18 KB | 0644 |
|