__ __ __ __ _____ _ _ _____ _ _ _ | \/ | \ \ / / | __ \ (_) | | / ____| | | | | | \ / |_ __\ V / | |__) | __ ___ ____ _| |_ ___ | (___ | |__ ___| | | | |\/| | '__|> < | ___/ '__| \ \ / / _` | __/ _ \ \___ \| '_ \ / _ \ | | | | | | |_ / . \ | | | | | |\ V / (_| | || __/ ____) | | | | __/ | | |_| |_|_(_)_/ \_\ |_| |_| |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1 if you need WebShell for Seo everyday contact me on Telegram Telegram Address : @jackleetFor_More_Tools:
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.internet.epollreactor}.
"""
from unittest import skipIf
from twisted.internet.error import ConnectionDone
from twisted.internet.posixbase import _ContinuousPolling
from twisted.internet.task import Clock
from twisted.trial.unittest import TestCase
try:
from twisted.internet import epollreactor
except ImportError:
epollreactor = None # type: ignore[assignment]
class Descriptor:
"""
Records reads and writes, as if it were a C{FileDescriptor}.
"""
def __init__(self):
self.events = []
def fileno(self):
return 1
def doRead(self):
self.events.append("read")
def doWrite(self):
self.events.append("write")
def connectionLost(self, reason):
reason.trap(ConnectionDone)
self.events.append("lost")
@skipIf(not epollreactor, "epoll not supported in this environment.")
class ContinuousPollingTests(TestCase):
"""
L{_ContinuousPolling} can be used to read and write from C{FileDescriptor}
objects.
"""
def test_addReader(self):
"""
Adding a reader when there was previously no reader starts up a
C{LoopingCall}.
"""
poller = _ContinuousPolling(Clock())
self.assertIsNone(poller._loop)
reader = object()
self.assertFalse(poller.isReading(reader))
poller.addReader(reader)
self.assertIsNotNone(poller._loop)
self.assertTrue(poller._loop.running)
self.assertIs(poller._loop.clock, poller._reactor)
self.assertTrue(poller.isReading(reader))
def test_addWriter(self):
"""
Adding a writer when there was previously no writer starts up a
C{LoopingCall}.
"""
poller = _ContinuousPolling(Clock())
self.assertIsNone(poller._loop)
writer = object()
self.assertFalse(poller.isWriting(writer))
poller.addWriter(writer)
self.assertIsNotNone(poller._loop)
self.assertTrue(poller._loop.running)
self.assertIs(poller._loop.clock, poller._reactor)
self.assertTrue(poller.isWriting(writer))
def test_removeReader(self):
"""
Removing a reader stops the C{LoopingCall}.
"""
poller = _ContinuousPolling(Clock())
reader = object()
poller.addReader(reader)
poller.removeReader(reader)
self.assertIsNone(poller._loop)
self.assertEqual(poller._reactor.getDelayedCalls(), [])
self.assertFalse(poller.isReading(reader))
def test_removeWriter(self):
"""
Removing a writer stops the C{LoopingCall}.
"""
poller = _ContinuousPolling(Clock())
writer = object()
poller.addWriter(writer)
poller.removeWriter(writer)
self.assertIsNone(poller._loop)
self.assertEqual(poller._reactor.getDelayedCalls(), [])
self.assertFalse(poller.isWriting(writer))
def test_removeUnknown(self):
"""
Removing unknown readers and writers silently does nothing.
"""
poller = _ContinuousPolling(Clock())
poller.removeWriter(object())
poller.removeReader(object())
def test_multipleReadersAndWriters(self):
"""
Adding multiple readers and writers results in a single
C{LoopingCall}.
"""
poller = _ContinuousPolling(Clock())
writer = object()
poller.addWriter(writer)
self.assertIsNotNone(poller._loop)
poller.addWriter(object())
self.assertIsNotNone(poller._loop)
poller.addReader(object())
self.assertIsNotNone(poller._loop)
poller.addReader(object())
poller.removeWriter(writer)
self.assertIsNotNone(poller._loop)
self.assertTrue(poller._loop.running)
self.assertEqual(len(poller._reactor.getDelayedCalls()), 1)
def test_readerPolling(self):
"""
Adding a reader causes its C{doRead} to be called every 1
milliseconds.
"""
reactor = Clock()
poller = _ContinuousPolling(reactor)
desc = Descriptor()
poller.addReader(desc)
self.assertEqual(desc.events, [])
reactor.advance(0.00001)
self.assertEqual(desc.events, ["read"])
reactor.advance(0.00001)
self.assertEqual(desc.events, ["read", "read"])
reactor.advance(0.00001)
self.assertEqual(desc.events, ["read", "read", "read"])
def test_writerPolling(self):
"""
Adding a writer causes its C{doWrite} to be called every 1
milliseconds.
"""
reactor = Clock()
poller = _ContinuousPolling(reactor)
desc = Descriptor()
poller.addWriter(desc)
self.assertEqual(desc.events, [])
reactor.advance(0.001)
self.assertEqual(desc.events, ["write"])
reactor.advance(0.001)
self.assertEqual(desc.events, ["write", "write"])
reactor.advance(0.001)
self.assertEqual(desc.events, ["write", "write", "write"])
def test_connectionLostOnRead(self):
"""
If a C{doRead} returns a value indicating disconnection,
C{connectionLost} is called on it.
"""
reactor = Clock()
poller = _ContinuousPolling(reactor)
desc = Descriptor()
desc.doRead = lambda: ConnectionDone()
poller.addReader(desc)
self.assertEqual(desc.events, [])
reactor.advance(0.001)
self.assertEqual(desc.events, ["lost"])
def test_connectionLostOnWrite(self):
"""
If a C{doWrite} returns a value indicating disconnection,
C{connectionLost} is called on it.
"""
reactor = Clock()
poller = _ContinuousPolling(reactor)
desc = Descriptor()
desc.doWrite = lambda: ConnectionDone()
poller.addWriter(desc)
self.assertEqual(desc.events, [])
reactor.advance(0.001)
self.assertEqual(desc.events, ["lost"])
def test_removeAll(self):
"""
L{_ContinuousPolling.removeAll} removes all descriptors and returns
the readers and writers.
"""
poller = _ContinuousPolling(Clock())
reader = object()
writer = object()
both = object()
poller.addReader(reader)
poller.addReader(both)
poller.addWriter(writer)
poller.addWriter(both)
removed = poller.removeAll()
self.assertEqual(poller.getReaders(), [])
self.assertEqual(poller.getWriters(), [])
self.assertEqual(len(removed), 3)
self.assertEqual(set(removed), {reader, writer, both})
def test_getReaders(self):
"""
L{_ContinuousPolling.getReaders} returns a list of the read
descriptors.
"""
poller = _ContinuousPolling(Clock())
reader = object()
poller.addReader(reader)
self.assertIn(reader, poller.getReaders())
def test_getWriters(self):
"""
L{_ContinuousPolling.getWriters} returns a list of the write
descriptors.
"""
poller = _ContinuousPolling(Clock())
writer = object()
poller.addWriter(writer)
self.assertIn(writer, poller.getWriters())
| Name | Type | Size | Permission | Actions |
|---|---|---|---|---|
| __pycache__ | Folder | 0755 |
|
|
| fake_CAs | Folder | 0755 |
|
|
| __init__.py | File | 112 B | 0644 |
|
| _posixifaces.py | File | 4.29 KB | 0644 |
|
| _win32ifaces.py | File | 3.88 KB | 0644 |
|
| connectionmixins.py | File | 19.71 KB | 0644 |
|
| fakeendpoint.py | File | 1.62 KB | 0644 |
|
| modulehelpers.py | File | 1.63 KB | 0644 |
|
| process_cli.py | File | 547 B | 0644 |
|
| process_connectionlost.py | File | 126 B | 0644 |
|
| process_gireactornocompat.py | File | 792 B | 0644 |
|
| process_helper.py | File | 1.22 KB | 0644 |
|
| reactormixins.py | File | 16.05 KB | 0644 |
|
| test_abstract.py | File | 2.15 KB | 0644 |
|
| test_address.py | File | 8.07 KB | 0644 |
|
| test_asyncioreactor.py | File | 9.7 KB | 0644 |
|
| test_base.py | File | 14.44 KB | 0644 |
|
| test_baseprocess.py | File | 2.53 KB | 0644 |
|
| test_cfreactor.py | File | 3.09 KB | 0644 |
|
| test_core.py | File | 10.86 KB | 0644 |
|
| test_default.py | File | 3.5 KB | 0644 |
|
| test_defer_await.py | File | 6.66 KB | 0644 |
|
| test_defer_yieldfrom.py | File | 4.79 KB | 0644 |
|
| test_endpoints.py | File | 147.25 KB | 0644 |
|
| test_epollreactor.py | File | 7.15 KB | 0644 |
|
| test_error.py | File | 1.08 KB | 0644 |
|
| test_fdset.py | File | 13.18 KB | 0644 |
|
| test_filedescriptor.py | File | 2.7 KB | 0644 |
|
| test_gireactor.py | File | 7.18 KB | 0644 |
|
| test_glibbase.py | File | 3.02 KB | 0644 |
|
| test_inlinecb.py | File | 11.21 KB | 0644 |
|
| test_inotify.py | File | 18.13 KB | 0644 |
|
| test_iocp.py | File | 7.52 KB | 0644 |
|
| test_kqueuereactor.py | File | 1.93 KB | 0644 |
|
| test_main.py | File | 1.32 KB | 0644 |
|
| test_newtls.py | File | 6.38 KB | 0644 |
|
| test_pollingfile.py | File | 1.28 KB | 0644 |
|
| test_posixbase.py | File | 11.56 KB | 0644 |
|
| test_posixprocess.py | File | 10.94 KB | 0644 |
|
| test_process.py | File | 44.6 KB | 0644 |
|
| test_protocol.py | File | 18.08 KB | 0644 |
|
| test_reactormixins.py | File | 1.93 KB | 0644 |
|
| test_resolver.py | File | 19.15 KB | 0644 |
|
| test_serialport.py | File | 1.94 KB | 0644 |
|
| test_sigchld.py | File | 3.25 KB | 0644 |
|
| test_socket.py | File | 9.22 KB | 0644 |
|
| test_stdio.py | File | 6.18 KB | 0644 |
|
| test_tcp.py | File | 105.7 KB | 0644 |
|
| test_testing.py | File | 16.49 KB | 0644 |
|
| test_threads.py | File | 7.94 KB | 0644 |
|
| test_time.py | File | 3.57 KB | 0644 |
|
| test_tls.py | File | 12.54 KB | 0644 |
|
| test_udp.py | File | 16.48 KB | 0644 |
|
| test_udp_internals.py | File | 4.98 KB | 0644 |
|
| test_unix.py | File | 34.04 KB | 0644 |
|
| test_win32events.py | File | 6.3 KB | 0644 |
|
| test_win32serialport.py | File | 5.18 KB | 0644 |
|