__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

www-data@216.73.216.10: ~ $
import itertools
import re
import sys
import threading
import time
from functools import wraps
from typing import Optional

from uaclient import (
    actions,
    api,
    daemon,
    entitlements,
    event_logger,
    exceptions,
    lock,
    messages,
    status,
    util,
)
from uaclient.api.u.pro.security.fix._common import CVE_OR_USN_REGEX
from uaclient.api.u.pro.status.is_attached.v1 import _is_attached
from uaclient.apt import AptProxyScope, setup_apt_proxy
from uaclient.config import UAConfig
from uaclient.files import machine_token

event = event_logger.get_event_logger()


CLEAR_LINE_ANSI_CODE = "\033[K"


class CLIEnableDisableProgress(api.AbstractProgress):
    def __init__(self, *, assume_yes: bool):
        self.is_interactive = not assume_yes
        self.assume_yes = assume_yes

    def progress(
        self,
        *,
        total_steps: int,
        done_steps: int,
        previous_step_message: Optional[str],
        current_step_message: Optional[str]
    ):
        if current_step_message is not None:
            print(current_step_message)

    def _on_event(self, event: str, payload):
        if event == "info":
            print(payload)
        elif event == "message_operation":
            if not util.handle_message_operations(payload, self.assume_yes):
                raise exceptions.PromptDeniedError()


def _null_print(*args, **kwargs):
    pass


def create_interactive_only_print_function(json_output: bool):
    if json_output:
        return _null_print
    else:
        return print


def assert_lock_file(lock_holder=None):
    """Decorator asserting exclusive access to lock file"""

    def wrapper(f):
        @wraps(f)
        def new_f(*args, cfg, **kwargs):
            with lock.RetryLock(lock_holder=lock_holder, sleep_time=1):
                retval = f(*args, cfg=cfg, **kwargs)
            return retval

        return new_f

    return wrapper


def assert_root(f):
    """Decorator asserting root user"""

    @wraps(f)
    def new_f(*args, **kwargs):
        if not util.we_are_currently_root():
            raise exceptions.NonRootUserError()
        else:
            return f(*args, **kwargs)

    return new_f


def verify_json_format_args(f):
    """Decorator to verify if correct params are used for json format"""

    @wraps(f)
    def new_f(cmd_args, *args, **kwargs):
        if not cmd_args:
            return f(cmd_args, *args, **kwargs)

        if cmd_args.format == "json" and not cmd_args.assume_yes:
            raise exceptions.CLIJSONFormatRequireAssumeYes()
        else:
            return f(cmd_args, *args, **kwargs)

    return new_f


def assert_attached(raise_custom_error_function=None):
    """Decorator asserting attached config.
    :param msg_function: Optional function to generate a custom message
    if raising an UnattachedError
    """

    def wrapper(f):
        @wraps(f)
        def new_f(args, cfg, **kwargs):
            if not _is_attached(cfg).is_attached:
                if raise_custom_error_function:
                    command = getattr(args, "command", "")
                    service_names = getattr(args, "service", "")
                    raise_custom_error_function(
                        command=command, service_names=service_names, cfg=cfg
                    )
                else:
                    raise exceptions.UnattachedError()
            return f(args, cfg=cfg, **kwargs)

        return new_f

    return wrapper


def assert_not_attached(f):
    """Decorator asserting unattached config."""

    @wraps(f)
    def new_f(args, cfg, **kwargs):
        if _is_attached(cfg).is_attached:
            machine_token_file = machine_token.get_machine_token_file()
            raise exceptions.AlreadyAttachedError(
                account_name=machine_token_file.account.get("name", "")
            )
        return f(args, cfg=cfg, **kwargs)

    return new_f


def assert_vulnerability_issue_valid(cmd):
    def wrapper(f):
        @wraps(f)
        def new_f(args, cfg, **kwargs):
            security_issue = getattr(args, "security_issue", "")
            if not re.match(CVE_OR_USN_REGEX, security_issue):
                raise exceptions.InvalidSecurityIssueIdFormat(
                    issue=security_issue,
                    cmd=cmd,
                )

            return f(args, cfg=cfg, **kwargs)

        return new_f

    return wrapper


def _raise_enable_disable_unattached_error(command, service_names, cfg):
    """Raises a custom error for enable/disable commands when unattached.

    Takes into consideration if the services exist or not, and notify the user
    accordingly."""
    (
        entitlements_found,
        entitlements_not_found,
    ) = entitlements.get_valid_entitlement_names(names=service_names, cfg=cfg)
    if entitlements_found and entitlements_not_found:
        raise exceptions.UnattachedMixedServicesError(
            valid_service=", ".join(entitlements_found),
            operation=command,
            invalid_service=", ".join(entitlements_not_found),
            service_msg="",
        )
    elif entitlements_found:
        raise exceptions.UnattachedValidServicesError(
            valid_service=", ".join(entitlements_found), operation=command
        )
    else:
        raise exceptions.UnattachedInvalidServicesError(
            operation=command,
            invalid_service=", ".join(entitlements_not_found),
            service_msg="",
        )


def post_cli_attach(cfg: UAConfig) -> None:
    machine_token_file = machine_token.get_machine_token_file(cfg)
    contract_name = machine_token_file.contract_name

    if contract_name:
        event.info(
            messages.ATTACH_SUCCESS_TMPL.format(contract_name=contract_name)
        )
    else:
        event.info(messages.ATTACH_SUCCESS_NO_CONTRACT_NAME)

    daemon.stop()
    daemon.cleanup(cfg)

    status_dict, _ret = actions.status(cfg)
    output = status.format_tabular(status_dict)
    event.info(util.handle_unicode_characters(output))
    event.process_events()


def configure_apt_proxy(
    cfg: UAConfig,
    scope: AptProxyScope,
    set_key: str,
    set_value: Optional[str],
) -> None:
    """
    Handles setting part the apt proxies - global and uaclient scoped proxies
    """
    if scope == AptProxyScope.GLOBAL:
        http_proxy = cfg.global_apt_http_proxy
        https_proxy = cfg.global_apt_https_proxy
    elif scope == AptProxyScope.UACLIENT:
        http_proxy = cfg.ua_apt_http_proxy
        https_proxy = cfg.ua_apt_https_proxy
    if "https" in set_key:
        https_proxy = set_value
    else:
        http_proxy = set_value
    setup_apt_proxy(
        http_proxy=http_proxy, https_proxy=https_proxy, proxy_scope=scope
    )


def run_spinner(stop_spinner, msg):
    spinner = itertools.cycle(["|", "/", "-", "\\"])
    while not stop_spinner.is_set():
        sys.stdout.write(
            "\r{}{}{}".format(msg, next(spinner), CLEAR_LINE_ANSI_CODE)
        )
        sys.stdout.flush()
        time.sleep(0.1)
        sys.stdout.write("\b")


def with_spinner(msg: Optional[str] = ""):
    def wrapper(f):
        @wraps(f)
        def new_f(args, *, cfg, **kwargs):
            if not sys.stdout.isatty():
                return f(args, cfg=cfg, **kwargs)

            stop_spinner = threading.Event()

            spinner_thread = threading.Thread(
                target=run_spinner, args=(stop_spinner, msg)
            )
            spinner_thread.start()

            retval = f(args, cfg=cfg, **kwargs)

            stop_spinner.set()
            spinner_thread.join()

            sys.stdout.write("\r" + CLEAR_LINE_ANSI_CODE)
            sys.stdout.flush()

            return retval

        return new_f

    return wrapper


def colorize_priority(priority):
    if priority == "low":
        return messages.TxtColor.INFOBLUE + priority + messages.TxtColor.ENDC
    elif priority == "medium":
        return (
            messages.TxtColor.WARNINGYELLOW + priority + messages.TxtColor.ENDC
        )
    elif priority == "high":
        return messages.TxtColor.ORANGE + priority + messages.TxtColor.ENDC
    elif priority == "critical":
        return messages.TxtColor.FAIL + priority + messages.TxtColor.ENDC
    else:
        return priority

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 9.28 KB 0644
api.py File 2.4 KB 0644
attach.py File 5.48 KB 0644
auto_attach.py File 958 B 0644
cli_util.py File 8.08 KB 0644
collect_logs.py File 1.6 KB 0644
commands.py File 3.68 KB 0644
config.py File 10.72 KB 0644
cve.py File 6.73 KB 0644
cves.py File 3.78 KB 0644
detach.py File 3.38 KB 0644
disable.py File 10.65 KB 0644
enable.py File 18.81 KB 0644
fix.py File 28.63 KB 0644
formatter.py File 8.75 KB 0644
help.py File 1.67 KB 0644
parser.py File 2.72 KB 0644
refresh.py File 2.42 KB 0644
security_status.py File 2.79 KB 0644
status.py File 2.37 KB 0644
system.py File 1.05 KB 0644
Filemanager