__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

www-data@216.73.216.10: ~ $
# Copyright (c) 2005 Divmod, Inc.
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.python.lockfile}.
"""
from __future__ import annotations

import errno
import os
from unittest import skipIf, skipUnless

from typing import NoReturn

from twisted.python import lockfile
from twisted.python.reflect import requireModule
from twisted.python.runtime import platform
from twisted.trial.unittest import TestCase

skipKill = False
skipKillReason = ""
if platform.isWindows():
    if (
        requireModule("win32api.OpenProcess") is None
        and requireModule("pywintypes") is None
    ):
        skipKill = True
        skipKillReason = (
            "On windows, lockfile.kill is not implemented "
            "in the absence of win32api and/or pywintypes."
        )


class UtilTests(TestCase):
    """
    Tests for the helper functions used to implement L{FilesystemLock}.
    """

    def test_symlinkEEXIST(self) -> None:
        """
        L{lockfile.symlink} raises L{OSError} with C{errno} set to L{EEXIST}
        when an attempt is made to create a symlink which already exists.
        """
        name = self.mktemp()
        lockfile.symlink("foo", name)
        exc = self.assertRaises(OSError, lockfile.symlink, "foo", name)
        self.assertEqual(exc.errno, errno.EEXIST)

    @skipUnless(
        platform.isWindows(),
        "special rename EIO handling only necessary and correct on " "Windows.",
    )
    def test_symlinkEIOWindows(self) -> None:
        """
        L{lockfile.symlink} raises L{OSError} with C{errno} set to L{EIO} when
        the underlying L{rename} call fails with L{EIO}.

        Renaming a file on Windows may fail if the target of the rename is in
        the process of being deleted (directory deletion appears not to be
        atomic).
        """
        name = self.mktemp()

        def fakeRename(src: str, dst: str) -> NoReturn:
            raise OSError(errno.EIO, None)

        self.patch(lockfile, "rename", fakeRename)
        exc = self.assertRaises(IOError, lockfile.symlink, name, "foo")
        self.assertEqual(exc.errno, errno.EIO)

    def test_readlinkENOENT(self) -> None:
        """
        L{lockfile.readlink} raises L{OSError} with C{errno} set to L{ENOENT}
        when an attempt is made to read a symlink which does not exist.
        """
        name = self.mktemp()
        exc = self.assertRaises(OSError, lockfile.readlink, name)
        self.assertEqual(exc.errno, errno.ENOENT)

    @skipUnless(
        platform.isWindows(),
        "special readlink EACCES handling only necessary and " "correct on Windows.",
    )
    def test_readlinkEACCESWindows(self) -> None:
        """
        L{lockfile.readlink} raises L{OSError} with C{errno} set to L{EACCES}
        on Windows when the underlying file open attempt fails with C{EACCES}.

        Opening a file on Windows may fail if the path is inside a directory
        which is in the process of being deleted (directory deletion appears
        not to be atomic).
        """
        name = self.mktemp()

        def fakeOpen(path: str, mode: str) -> NoReturn:
            raise OSError(errno.EACCES, None)

        self.patch(lockfile, "_open", fakeOpen)
        exc = self.assertRaises(IOError, lockfile.readlink, name)
        self.assertEqual(exc.errno, errno.EACCES)

    @skipIf(skipKill, skipKillReason)
    def test_kill(self) -> None:
        """
        L{lockfile.kill} returns without error if passed the PID of a
        process which exists and signal C{0}.
        """
        lockfile.kill(os.getpid(), 0)

    @skipIf(skipKill, skipKillReason)
    def test_killESRCH(self) -> None:
        """
        L{lockfile.kill} raises L{OSError} with errno of L{ESRCH} if
        passed a PID which does not correspond to any process.
        """
        # Hopefully there is no process with PID 2 ** 31 - 1
        exc = self.assertRaises(OSError, lockfile.kill, 2**31 - 1, 0)
        self.assertEqual(exc.errno, errno.ESRCH)

    def test_noKillCall(self) -> None:
        """
        Verify that when L{lockfile.kill} does end up as None (e.g. on Windows
        without pywin32), it doesn't end up being called and raising a
        L{TypeError}.
        """
        self.patch(lockfile, "kill", None)
        fl = lockfile.FilesystemLock(self.mktemp())
        fl.lock()
        self.assertFalse(fl.lock())


class LockingTests(TestCase):
    def _symlinkErrorTest(self, errno: int) -> None:
        def fakeSymlink(source: str, dest: str) -> NoReturn:
            raise OSError(errno, None)

        self.patch(lockfile, "symlink", fakeSymlink)

        lockf = self.mktemp()
        lock = lockfile.FilesystemLock(lockf)
        exc = self.assertRaises(OSError, lock.lock)
        self.assertEqual(exc.errno, errno)

    def test_symlinkError(self) -> None:
        """
        An exception raised by C{symlink} other than C{EEXIST} is passed up to
        the caller of L{FilesystemLock.lock}.
        """
        self._symlinkErrorTest(errno.ENOSYS)

    @skipIf(
        platform.isWindows(),
        "POSIX-specific error propagation not expected on Windows.",
    )
    def test_symlinkErrorPOSIX(self) -> None:
        """
        An L{OSError} raised by C{symlink} on a POSIX platform with an errno of
        C{EACCES} or C{EIO} is passed to the caller of L{FilesystemLock.lock}.

        On POSIX, unlike on Windows, these are unexpected errors which cannot
        be handled by L{FilesystemLock}.
        """
        self._symlinkErrorTest(errno.EACCES)
        self._symlinkErrorTest(errno.EIO)

    def test_cleanlyAcquire(self) -> None:
        """
        If the lock has never been held, it can be acquired and the C{clean}
        and C{locked} attributes are set to C{True}.
        """
        lockf = self.mktemp()
        lock = lockfile.FilesystemLock(lockf)
        self.assertTrue(lock.lock())
        self.assertTrue(lock.clean)
        self.assertTrue(lock.locked)

    def test_cleanlyRelease(self) -> None:
        """
        If a lock is released cleanly, it can be re-acquired and the C{clean}
        and C{locked} attributes are set to C{True}.
        """
        lockf = self.mktemp()
        lock = lockfile.FilesystemLock(lockf)
        self.assertTrue(lock.lock())
        lock.unlock()
        self.assertFalse(lock.locked)

        lock = lockfile.FilesystemLock(lockf)
        self.assertTrue(lock.lock())
        self.assertTrue(lock.clean)
        self.assertTrue(lock.locked)

    def test_cannotLockLocked(self) -> None:
        """
        If a lock is currently locked, it cannot be locked again.
        """
        lockf = self.mktemp()
        firstLock = lockfile.FilesystemLock(lockf)
        self.assertTrue(firstLock.lock())

        secondLock = lockfile.FilesystemLock(lockf)
        self.assertFalse(secondLock.lock())
        self.assertFalse(secondLock.locked)

    def test_uncleanlyAcquire(self) -> None:
        """
        If a lock was held by a process which no longer exists, it can be
        acquired, the C{clean} attribute is set to C{False}, and the
        C{locked} attribute is set to C{True}.
        """
        owner = 12345

        def fakeKill(pid: int, signal: int) -> None:
            if signal != 0:
                raise OSError(errno.EPERM, None)
            if pid == owner:
                raise OSError(errno.ESRCH, None)

        lockf = self.mktemp()
        self.patch(lockfile, "kill", fakeKill)
        lockfile.symlink(str(owner), lockf)

        lock = lockfile.FilesystemLock(lockf)
        self.assertTrue(lock.lock())
        self.assertFalse(lock.clean)
        self.assertTrue(lock.locked)

        self.assertEqual(lockfile.readlink(lockf), str(os.getpid()))

    def test_lockReleasedBeforeCheck(self) -> None:
        """
        If the lock is initially held but then released before it can be
        examined to determine if the process which held it still exists, it is
        acquired and the C{clean} and C{locked} attributes are set to C{True}.
        """

        def fakeReadlink(name: str) -> str:
            # Pretend to be another process releasing the lock.
            lockfile.rmlink(lockf)
            # Fall back to the real implementation of readlink.
            readlinkPatch.restore()
            return lockfile.readlink(name)

        readlinkPatch = self.patch(lockfile, "readlink", fakeReadlink)

        def fakeKill(pid: int, signal: int) -> None:
            if signal != 0:
                raise OSError(errno.EPERM, None)
            if pid == 43125:
                raise OSError(errno.ESRCH, None)

        self.patch(lockfile, "kill", fakeKill)

        lockf = self.mktemp()
        lock = lockfile.FilesystemLock(lockf)
        lockfile.symlink(str(43125), lockf)
        self.assertTrue(lock.lock())
        self.assertTrue(lock.clean)
        self.assertTrue(lock.locked)

    @skipUnless(
        platform.isWindows(),
        "special rename EIO handling only necessary and correct on " "Windows.",
    )
    def test_lockReleasedDuringAcquireSymlink(self) -> None:
        """
        If the lock is released while an attempt is made to acquire
        it, the lock attempt fails and C{FilesystemLock.lock} returns
        C{False}.  This can happen on Windows when L{lockfile.symlink}
        fails with L{IOError} of C{EIO} because another process is in
        the middle of a call to L{os.rmdir} (implemented in terms of
        RemoveDirectory) which is not atomic.
        """

        def fakeSymlink(src: str, dst: str) -> NoReturn:
            # While another process id doing os.rmdir which the Windows
            # implementation of rmlink does, a rename call will fail with EIO.
            raise OSError(errno.EIO, None)

        self.patch(lockfile, "symlink", fakeSymlink)

        lockf = self.mktemp()
        lock = lockfile.FilesystemLock(lockf)
        self.assertFalse(lock.lock())
        self.assertFalse(lock.locked)

    @skipUnless(
        platform.isWindows(),
        "special readlink EACCES handling only necessary and " "correct on Windows.",
    )
    def test_lockReleasedDuringAcquireReadlink(self) -> None:
        """
        If the lock is initially held but is released while an attempt
        is made to acquire it, the lock attempt fails and
        L{FilesystemLock.lock} returns C{False}.
        """

        def fakeReadlink(name: str) -> NoReturn:
            # While another process is doing os.rmdir which the
            # Windows implementation of rmlink does, a readlink call
            # will fail with EACCES.
            raise OSError(errno.EACCES, None)

        self.patch(lockfile, "readlink", fakeReadlink)

        lockf = self.mktemp()
        lock = lockfile.FilesystemLock(lockf)
        lockfile.symlink(str(43125), lockf)
        self.assertFalse(lock.lock())
        self.assertFalse(lock.locked)

    def _readlinkErrorTest(
        self, exceptionType: type[OSError] | type[IOError], errno: int
    ) -> None:
        def fakeReadlink(name: str) -> NoReturn:
            raise exceptionType(errno, None)

        self.patch(lockfile, "readlink", fakeReadlink)

        lockf = self.mktemp()

        # Make it appear locked so it has to use readlink
        lockfile.symlink(str(43125), lockf)

        lock = lockfile.FilesystemLock(lockf)
        exc = self.assertRaises(exceptionType, lock.lock)
        self.assertEqual(exc.errno, errno)
        self.assertFalse(lock.locked)

    def test_readlinkError(self) -> None:
        """
        An exception raised by C{readlink} other than C{ENOENT} is passed up to
        the caller of L{FilesystemLock.lock}.
        """
        self._readlinkErrorTest(OSError, errno.ENOSYS)
        self._readlinkErrorTest(IOError, errno.ENOSYS)

    @skipIf(
        platform.isWindows(),
        "POSIX-specific error propagation not expected on Windows.",
    )
    def test_readlinkErrorPOSIX(self) -> None:
        """
        Any L{IOError} raised by C{readlink} on a POSIX platform passed to the
        caller of L{FilesystemLock.lock}.

        On POSIX, unlike on Windows, these are unexpected errors which cannot
        be handled by L{FilesystemLock}.
        """
        self._readlinkErrorTest(IOError, errno.ENOSYS)
        self._readlinkErrorTest(IOError, errno.EACCES)

    def test_lockCleanedUpConcurrently(self) -> None:
        """
        If a second process cleans up the lock after a first one checks the
        lock and finds that no process is holding it, the first process does
        not fail when it tries to clean up the lock.
        """

        def fakeRmlink(name: str) -> None:
            rmlinkPatch.restore()
            # Pretend to be another process cleaning up the lock.
            lockfile.rmlink(lockf)
            # Fall back to the real implementation of rmlink.
            return lockfile.rmlink(name)

        rmlinkPatch = self.patch(lockfile, "rmlink", fakeRmlink)

        def fakeKill(pid: int, signal: int) -> None:
            if signal != 0:
                raise OSError(errno.EPERM, None)
            if pid == 43125:
                raise OSError(errno.ESRCH, None)

        self.patch(lockfile, "kill", fakeKill)

        lockf = self.mktemp()
        lock = lockfile.FilesystemLock(lockf)
        lockfile.symlink(str(43125), lockf)
        self.assertTrue(lock.lock())
        self.assertTrue(lock.clean)
        self.assertTrue(lock.locked)

    def test_rmlinkError(self) -> None:
        """
        An exception raised by L{rmlink} other than C{ENOENT} is passed up
        to the caller of L{FilesystemLock.lock}.
        """

        def fakeRmlink(name: str) -> NoReturn:
            raise OSError(errno.ENOSYS, None)

        self.patch(lockfile, "rmlink", fakeRmlink)

        def fakeKill(pid: int, signal: int) -> None:
            if signal != 0:
                raise OSError(errno.EPERM, None)
            if pid == 43125:
                raise OSError(errno.ESRCH, None)

        self.patch(lockfile, "kill", fakeKill)

        lockf = self.mktemp()

        # Make it appear locked so it has to use readlink
        lockfile.symlink(str(43125), lockf)

        lock = lockfile.FilesystemLock(lockf)
        exc = self.assertRaises(OSError, lock.lock)
        self.assertEqual(exc.errno, errno.ENOSYS)
        self.assertFalse(lock.locked)

    def test_killError(self) -> None:
        """
        If L{kill} raises an exception other than L{OSError} with errno set to
        C{ESRCH}, the exception is passed up to the caller of
        L{FilesystemLock.lock}.
        """

        def fakeKill(pid: int, signal: int) -> NoReturn:
            raise OSError(errno.EPERM, None)

        self.patch(lockfile, "kill", fakeKill)

        lockf = self.mktemp()

        # Make it appear locked so it has to use readlink
        lockfile.symlink(str(43125), lockf)

        lock = lockfile.FilesystemLock(lockf)
        exc = self.assertRaises(OSError, lock.lock)
        self.assertEqual(exc.errno, errno.EPERM)
        self.assertFalse(lock.locked)

    def test_unlockOther(self) -> None:
        """
        L{FilesystemLock.unlock} raises L{ValueError} if called for a lock
        which is held by a different process.
        """
        lockf = self.mktemp()
        lockfile.symlink(str(os.getpid() + 1), lockf)
        lock = lockfile.FilesystemLock(lockf)
        self.assertRaises(ValueError, lock.unlock)

    def test_isLocked(self) -> None:
        """
        L{isLocked} returns C{True} if the named lock is currently locked,
        C{False} otherwise.
        """
        lockf = self.mktemp()
        self.assertFalse(lockfile.isLocked(lockf))
        lock = lockfile.FilesystemLock(lockf)
        self.assertTrue(lock.lock())
        self.assertTrue(lockfile.isLocked(lockf))
        lock.unlock()
        self.assertFalse(lockfile.isLocked(lockf))

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 475 B 0644
cert.pem.no_trailing_newline File 1.46 KB 0644
crash_test_dummy.py File 654 B 0644
ignore_test_failure.py File 34.88 KB 0644
iosim.py File 18.4 KB 0644
key.pem.no_trailing_newline File 1.63 KB 0644
mock_win32process.py File 1.25 KB 0644
myrebuilder1.py File 172 B 0644
myrebuilder2.py File 172 B 0644
plugin_basic.py File 925 B 0644
plugin_extra1.py File 400 B 0644
plugin_extra2.py File 566 B 0644
process_cmdline.py File 123 B 0644
process_echoer.py File 214 B 0644
process_fds.py File 983 B 0644
process_getargv.py File 233 B 0644
process_getenv.py File 268 B 0644
process_linger.py File 297 B 0644
process_reader.py File 178 B 0644
process_signal.py File 220 B 0644
process_stdinreader.py File 739 B 0644
process_tester.py File 787 B 0644
process_tty.py File 130 B 0644
process_twisted.py File 1.15 KB 0644
proto_helpers.py File 1.34 KB 0644
reflect_helper_IE.py File 60 B 0644
reflect_helper_VE.py File 81 B 0644
reflect_helper_ZDE.py File 48 B 0644
server.pem File 5.23 KB 0644
ssl_helpers.py File 1.72 KB 0644
stdio_test_consumer.py File 1.14 KB 0644
stdio_test_halfclose.py File 2 KB 0644
stdio_test_hostpeer.py File 1.06 KB 0644
stdio_test_lastwrite.py File 1.13 KB 0644
stdio_test_loseconn.py File 1.55 KB 0644
stdio_test_producer.py File 1.45 KB 0644
stdio_test_write.py File 902 B 0644
stdio_test_writeseq.py File 894 B 0644
test_abstract.py File 3.66 KB 0644
test_adbapi.py File 25.47 KB 0644
test_amp.py File 108.04 KB 0644
test_application.py File 33.34 KB 0644
test_compat.py File 17.75 KB 0644
test_context.py File 1.43 KB 0644
test_cooperator.py File 20.84 KB 0644
test_defer.py File 143.37 KB 0644
test_defgen.py File 13.01 KB 0644
test_dirdbm.py File 6.9 KB 0644
test_error.py File 9.6 KB 0644
test_factories.py File 4.46 KB 0644
test_fdesc.py File 7.28 KB 0644
test_finger.py File 1.89 KB 0644
test_formmethod.py File 4.27 KB 0644
test_ftp.py File 126.96 KB 0644
test_ftp_options.py File 2.65 KB 0644
test_htb.py File 3.19 KB 0644
test_ident.py File 6.56 KB 0644
test_internet.py File 45.38 KB 0644
test_iosim.py File 9.58 KB 0644
test_iutils.py File 13.31 KB 0644
test_lockfile.py File 15.54 KB 0644
test_log.py File 36.86 KB 0644
test_logfile.py File 17.79 KB 0644
test_loopback.py File 13.99 KB 0644
test_main.py File 2.12 KB 0644
test_memcache.py File 24.69 KB 0644
test_modules.py File 17.84 KB 0644
test_monkey.py File 6.38 KB 0644
test_paths.py File 73.64 KB 0644
test_pcp.py File 12.23 KB 0644
test_persisted.py File 14.73 KB 0644
test_plugin.py File 26.02 KB 0644
test_policies.py File 32.28 KB 0644
test_postfix.py File 4.32 KB 0644
test_process.py File 86.29 KB 0644
test_protocols.py File 7.16 KB 0644
test_randbytes.py File 3.63 KB 0644
test_rebuild.py File 7.4 KB 0644
test_reflect.py File 23.89 KB 0644
test_roots.py File 1.65 KB 0644
test_shortcut.py File 1.91 KB 0644
test_sip.py File 24.9 KB 0644
test_sob.py File 5.53 KB 0644
test_socks.py File 17.09 KB 0644
test_ssl.py File 22.73 KB 0644
test_sslverify.py File 113.84 KB 0644
test_stateful.py File 1.97 KB 0644
test_stdio.py File 12.43 KB 0644
test_strerror.py File 5.1 KB 0644
test_strports.py File 1.67 KB 0644
test_task.py File 47.73 KB 0644
test_tcp.py File 64.26 KB 0644
test_tcp_internals.py File 12.73 KB 0644
test_text.py File 6.47 KB 0644
test_threadable.py File 3.26 KB 0644
test_threadpool.py File 21.64 KB 0644
test_threads.py File 12.9 KB 0644
test_tpfile.py File 1.69 KB 0644
test_twistd.py File 72.29 KB 0644
test_twisted.py File 6.13 KB 0644
test_udp.py File 26.79 KB 0644
test_unix.py File 13.26 KB 0644
test_usage.py File 22.76 KB 0644
testutils.py File 5.06 KB 0644
Filemanager