__ __ __ __ _____ _ _ _____ _ _ _ | \/ | \ \ / / | __ \ (_) | | / ____| | | | | | \ / |_ __\ V / | |__) | __ ___ ____ _| |_ ___ | (___ | |__ ___| | | | |\/| | '__|> < | ___/ '__| \ \ / / _` | __/ _ \ \___ \| '_ \ / _ \ | | | | | | |_ / . \ | | | | | |\ V / (_| | || __/ ____) | | | | __/ | | |_| |_|_(_)_/ \_\ |_| |_| |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1 if you need WebShell for Seo everyday contact me on Telegram Telegram Address : @jackleetFor_More_Tools:
import asyncio
import time
from .compatibility import guarantee_single_callable
from .timeout import timeout as async_timeout
class ApplicationCommunicator:
    """
    Runs an ASGI application in a test mode, allowing sending of
    messages to it and retrieval of messages it sends.

    The application is started on construction and exchanges messages with
    the communicator through two queues: ``input_queue`` (its ``get`` is
    handed to the application) and ``output_queue`` (its ``put`` is handed
    to the application).
    """

    def __init__(self, application, scope):
        """
        Schedules ``application`` with the given ``scope``.

        The application is first passed through ``guarantee_single_callable``
        and then called with ``scope``, ``input_queue.get`` and
        ``output_queue.put``; the resulting awaitable is scheduled with
        ``asyncio.ensure_future`` and kept in ``self.future``.
        """
        self.application = guarantee_single_callable(application)
        self.scope = scope
        self.input_queue = asyncio.Queue()
        self.output_queue = asyncio.Queue()
        self.future = asyncio.ensure_future(
            self.application(scope, self.input_queue.get, self.output_queue.put)
        )

    async def wait(self, timeout=1):
        """
        Waits up to ``timeout`` for the application to stop itself.

        Any exception the application raised (other than cancellation) is
        re-raised here. If the application is still running once the timed
        block has been left, it is cancelled and that cancellation is
        absorbed.
        """
        try:
            async with async_timeout(timeout):
                try:
                    await self.future
                    self.future.result()
                except asyncio.CancelledError:
                    pass
        finally:
            # Never leave the application running behind the caller's back.
            if not self.future.done():
                self.future.cancel()
                try:
                    await self.future
                except asyncio.CancelledError:
                    pass

    def stop(self, exceptions=True):
        """
        Stops the application.

        Cancels the application if it is still running. If it has already
        finished and ``exceptions`` is true, its result is fetched so that
        any exception it raised surfaces to the caller.
        """
        if not self.future.done():
            self.future.cancel()
        elif exceptions:
            # Give a chance to raise any exceptions
            self.future.result()

    def __del__(self):
        # Clean up on deletion. ``__init__`` may have raised before
        # ``self.future`` was assigned, in which case there is nothing to
        # stop and touching the attribute would raise AttributeError.
        if getattr(self, "future", None) is None:
            return
        try:
            self.stop(exceptions=False)
        except RuntimeError:
            # Event loop already stopped
            pass

    async def send_input(self, message):
        """
        Sends a single message to the application
        """
        # Give it the message
        await self.input_queue.put(message)

    async def receive_output(self, timeout=1):
        """
        Receives a single message from the application, with optional timeout.

        If the application has already finished with an exception, that
        exception is raised instead. When no message arrives in time the
        application is cancelled (if still running) and
        ``asyncio.TimeoutError`` is raised.
        """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout(timeout):
                return await self.output_queue.get()
        except asyncio.TimeoutError:
            # See if we have another error to raise inside
            if self.future.done():
                self.future.result()
            else:
                self.future.cancel()
                try:
                    await self.future
                except asyncio.CancelledError:
                    pass
            # Re-raise the timeout itself with its original traceback.
            raise

    async def receive_nothing(self, timeout=0.1, interval=0.01):
        """
        Checks that there is no message to receive in the given time.

        Polls the output queue every ``interval`` seconds for up to
        ``timeout`` seconds. Returns False as soon as a message is found,
        otherwise True if the queue is still empty at the end.
        """
        # `interval` has precedence over `timeout`: each sleep runs to
        # completion, so the total wait may overshoot `timeout` by up to
        # one `interval`.
        start = time.monotonic()
        while time.monotonic() - start < timeout:
            if not self.output_queue.empty():
                return False
            await asyncio.sleep(interval)
        return self.output_queue.empty()
| Name | Type | Size | Permission | Actions |
|---|---|---|---|---|
| __pycache__ | Folder | | 0755 | |
| __init__.py | File | 22 B | 0644 | |
| compatibility.py | File | 1.57 KB | 0644 | |
| current_thread_executor.py | File | 2.74 KB | 0644 | |
| local.py | File | 4.74 KB | 0644 | |
| py.typed | File | 0 B | 0644 | |
| server.py | File | 5.86 KB | 0644 | |
| sync.py | File | 20.18 KB | 0644 | |
| testing.py | File | 3.05 KB | 0644 | |
| timeout.py | File | 3.54 KB | 0644 | |
| typing.py | File | 5.71 KB | 0644 | |
| wsgi.py | File | 6.42 KB | 0644 | |