__ __ __ __ _____ _ _ _____ _ _ _ | \/ | \ \ / / | __ \ (_) | | / ____| | | | | | \ / |_ __\ V / | |__) | __ ___ ____ _| |_ ___ | (___ | |__ ___| | | | |\/| | '__|> < | ___/ '__| \ \ / / _` | __/ _ \ \___ \| '_ \ / _ \ | | | | | | |_ / . \ | | | | | |\ V / (_| | || __/ ____) | | | | __/ | | |_| |_|_(_)_/ \_\ |_| |_| |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1 if you need WebShell for Seo everyday contact me on Telegram Telegram Address : @jackleetFor_More_Tools:
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.internet._newtls}.
"""
from twisted.internet import interfaces
from twisted.internet.test.connectionmixins import (
ConnectableProtocol,
runProtocolsWithReactor,
)
from twisted.internet.test.reactormixins import ReactorBuilder
from twisted.internet.test.test_tcp import TCPCreator
from twisted.internet.test.test_tls import (
ContextGeneratingMixin,
SSLCreator,
StartTLSClientCreator,
TLSMixin,
)
from twisted.trial import unittest
try:
from twisted.internet import _newtls as __newtls
from twisted.protocols import tls
except ImportError:
_newtls = None
else:
_newtls = __newtls
from zope.interface import implementer
class BypassTLSTests(unittest.TestCase):
"""
Tests for the L{_newtls._BypassTLS} class.
"""
if not _newtls:
skip = "Couldn't import _newtls, perhaps pyOpenSSL is old or missing"
def test_loseConnectionPassThrough(self):
"""
C{_BypassTLS.loseConnection} calls C{loseConnection} on the base
class, while preserving any default argument in the base class'
C{loseConnection} implementation.
"""
default = object()
result = []
class FakeTransport:
def loseConnection(self, _connDone=default):
result.append(_connDone)
bypass = _newtls._BypassTLS(FakeTransport, FakeTransport())
# The default from FakeTransport is used:
bypass.loseConnection()
self.assertEqual(result, [default])
# And we can pass our own:
notDefault = object()
bypass.loseConnection(notDefault)
self.assertEqual(result, [default, notDefault])
class FakeProducer:
"""
A producer that does nothing.
"""
def pauseProducing(self):
pass
def resumeProducing(self):
pass
def stopProducing(self):
pass
@implementer(interfaces.IHandshakeListener)
class ProducerProtocol(ConnectableProtocol):
"""
Register a producer, unregister it, and verify the producer hooks up to
innards of C{TLSMemoryBIOProtocol}.
"""
def __init__(self, producer, result):
self.producer = producer
self.result = result
def handshakeCompleted(self):
if not isinstance(self.transport.protocol, tls.BufferingTLSTransport):
# Either the test or the code have a bug...
raise RuntimeError("TLSMemoryBIOProtocol not hooked up.")
self.transport.registerProducer(self.producer, True)
# The producer was registered with the TLSMemoryBIOProtocol:
self.result.append(self.transport.protocol._producer._producer)
self.transport.unregisterProducer()
# The producer was unregistered from the TLSMemoryBIOProtocol:
self.result.append(self.transport.protocol._producer)
self.transport.loseConnection()
class ProducerTestsMixin(ReactorBuilder, TLSMixin, ContextGeneratingMixin):
"""
Test the new TLS code integrates C{TLSMemoryBIOProtocol} correctly.
"""
if not _newtls:
skip = "Could not import twisted.internet._newtls"
def test_producerSSLFromStart(self):
"""
C{registerProducer} and C{unregisterProducer} on TLS transports
created as SSL from the get go are passed to the
C{TLSMemoryBIOProtocol}, not the underlying transport directly.
"""
result = []
producer = FakeProducer()
runProtocolsWithReactor(
self,
ConnectableProtocol(),
ProducerProtocol(producer, result),
SSLCreator(),
)
self.assertEqual(result, [producer, None])
def test_producerAfterStartTLS(self):
"""
C{registerProducer} and C{unregisterProducer} on TLS transports
created by C{startTLS} are passed to the C{TLSMemoryBIOProtocol}, not
the underlying transport directly.
"""
result = []
producer = FakeProducer()
runProtocolsWithReactor(
self,
ConnectableProtocol(),
ProducerProtocol(producer, result),
StartTLSClientCreator(),
)
self.assertEqual(result, [producer, None])
def startTLSAfterRegisterProducer(self, streaming):
"""
When a producer is registered, and then startTLS is called,
the producer is re-registered with the C{TLSMemoryBIOProtocol}.
"""
clientContext = self.getClientContext()
serverContext = self.getServerContext()
result = []
producer = FakeProducer()
class RegisterTLSProtocol(ConnectableProtocol):
def connectionMade(self):
self.transport.registerProducer(producer, streaming)
self.transport.startTLS(serverContext)
# Store TLSMemoryBIOProtocol and underlying transport producer
# status:
if streaming:
# _ProducerMembrane -> producer:
result.append(self.transport.protocol._producer._producer)
result.append(self.transport.producer._producer)
else:
# _ProducerMembrane -> _PullToPush -> producer:
result.append(self.transport.protocol._producer._producer._producer)
result.append(self.transport.producer._producer._producer)
self.transport.unregisterProducer()
self.transport.loseConnection()
class StartTLSProtocol(ConnectableProtocol):
def connectionMade(self):
self.transport.startTLS(clientContext)
runProtocolsWithReactor(
self, RegisterTLSProtocol(), StartTLSProtocol(), TCPCreator()
)
self.assertEqual(result, [producer, producer])
def test_startTLSAfterRegisterProducerStreaming(self):
"""
When a streaming producer is registered, and then startTLS is called,
the producer is re-registered with the C{TLSMemoryBIOProtocol}.
"""
self.startTLSAfterRegisterProducer(True)
def test_startTLSAfterRegisterProducerNonStreaming(self):
"""
When a non-streaming producer is registered, and then startTLS is
called, the producer is re-registered with the
C{TLSMemoryBIOProtocol}.
"""
self.startTLSAfterRegisterProducer(False)
globals().update(ProducerTestsMixin.makeTestCaseClasses())
| Name | Type | Size | Permission | Actions |
|---|---|---|---|---|
| __pycache__ | Folder | 0755 |
|
|
| fake_CAs | Folder | 0755 |
|
|
| __init__.py | File | 112 B | 0644 |
|
| _posixifaces.py | File | 4.29 KB | 0644 |
|
| _win32ifaces.py | File | 3.88 KB | 0644 |
|
| connectionmixins.py | File | 19.71 KB | 0644 |
|
| fakeendpoint.py | File | 1.62 KB | 0644 |
|
| modulehelpers.py | File | 1.63 KB | 0644 |
|
| process_cli.py | File | 547 B | 0644 |
|
| process_connectionlost.py | File | 126 B | 0644 |
|
| process_gireactornocompat.py | File | 792 B | 0644 |
|
| process_helper.py | File | 1.22 KB | 0644 |
|
| reactormixins.py | File | 16.05 KB | 0644 |
|
| test_abstract.py | File | 2.15 KB | 0644 |
|
| test_address.py | File | 8.07 KB | 0644 |
|
| test_asyncioreactor.py | File | 9.7 KB | 0644 |
|
| test_base.py | File | 14.44 KB | 0644 |
|
| test_baseprocess.py | File | 2.53 KB | 0644 |
|
| test_cfreactor.py | File | 3.09 KB | 0644 |
|
| test_core.py | File | 10.86 KB | 0644 |
|
| test_default.py | File | 3.5 KB | 0644 |
|
| test_defer_await.py | File | 6.66 KB | 0644 |
|
| test_defer_yieldfrom.py | File | 4.79 KB | 0644 |
|
| test_endpoints.py | File | 147.25 KB | 0644 |
|
| test_epollreactor.py | File | 7.15 KB | 0644 |
|
| test_error.py | File | 1.08 KB | 0644 |
|
| test_fdset.py | File | 13.18 KB | 0644 |
|
| test_filedescriptor.py | File | 2.7 KB | 0644 |
|
| test_gireactor.py | File | 7.18 KB | 0644 |
|
| test_glibbase.py | File | 3.02 KB | 0644 |
|
| test_inlinecb.py | File | 11.21 KB | 0644 |
|
| test_inotify.py | File | 18.13 KB | 0644 |
|
| test_iocp.py | File | 7.52 KB | 0644 |
|
| test_kqueuereactor.py | File | 1.93 KB | 0644 |
|
| test_main.py | File | 1.32 KB | 0644 |
|
| test_newtls.py | File | 6.38 KB | 0644 |
|
| test_pollingfile.py | File | 1.28 KB | 0644 |
|
| test_posixbase.py | File | 11.56 KB | 0644 |
|
| test_posixprocess.py | File | 10.94 KB | 0644 |
|
| test_process.py | File | 44.6 KB | 0644 |
|
| test_protocol.py | File | 18.08 KB | 0644 |
|
| test_reactormixins.py | File | 1.93 KB | 0644 |
|
| test_resolver.py | File | 19.15 KB | 0644 |
|
| test_serialport.py | File | 1.94 KB | 0644 |
|
| test_sigchld.py | File | 3.25 KB | 0644 |
|
| test_socket.py | File | 9.22 KB | 0644 |
|
| test_stdio.py | File | 6.18 KB | 0644 |
|
| test_tcp.py | File | 105.7 KB | 0644 |
|
| test_testing.py | File | 16.49 KB | 0644 |
|
| test_threads.py | File | 7.94 KB | 0644 |
|
| test_time.py | File | 3.57 KB | 0644 |
|
| test_tls.py | File | 12.54 KB | 0644 |
|
| test_udp.py | File | 16.48 KB | 0644 |
|
| test_udp_internals.py | File | 4.98 KB | 0644 |
|
| test_unix.py | File | 34.04 KB | 0644 |
|
| test_win32events.py | File | 6.3 KB | 0644 |
|
| test_win32serialport.py | File | 5.18 KB | 0644 |
|