__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

www-data@216.73.216.148: ~ $
import gc
import mmap
import threading

import numpy as np

try:
    # If available, use pure python kms package
    import kms as pykms
except ImportError:
    import pykms

from libcamera import Transform

from picamera2.previews.null_preview import *


class DrmManager():
    def __init__(self):
        self.lock = threading.Lock()
        self.use_count = 0

    def add(self, drm_preview):
        with self.lock:
            if self.use_count == 0:
                self.card = pykms.Card()
                self.resman = pykms.ResourceManager(self.card)
                conn = self.resman.reserve_connector()
                self.crtc = self.resman.reserve_crtc(conn)
            self.use_count += 1
        drm_preview.card = self.card
        drm_preview.resman = self.resman
        drm_preview.crtc = self.crtc

    def remove(self, drm_preview):
        drm_preview.card = None
        drm_preview.resman = None
        drm_preview.crtc = None
        with self.lock:
            self.use_count -= 1
            if self.use_count == 0:
                self.crtc = None
                self.resman = None
                self.card = None
                gc.collect()


class DrmPreview(NullPreview):
    FMT_MAP = {
        "RGB888": pykms.PixelFormat.RGB888,
        "BGR888": pykms.PixelFormat.BGR888,
        # doesn't work "YUYV": pykms.PixelFormat.YUYV,
        # doesn't work "YVYU": pykms.PixelFormat.YVYU,
        "XRGB8888": pykms.PixelFormat.XRGB8888,
        "XBGR8888": pykms.PixelFormat.XBGR8888,
        "YUV420": pykms.PixelFormat.YUV420,
        "YVU420": pykms.PixelFormat.YVU420,
        "MJPEG": pykms.PixelFormat.BGR888,
    }

    _manager = DrmManager()

    def __init__(self, x=0, y=0, width=640, height=480, transform=None):
        self.init_drm(x, y, width, height, transform)
        self.stop_count = 0

        # Allocate a buffer for MJPEG decode. If "XB24" appears unsupported, try "XR24".
        self.fb = None
        try:
            self.fb = pykms.DumbFramebuffer(self.card, width, height, "XB24")
        except Exception:
            pass
        if not self.fb:
            try:
                self.fb = pykms.DumbFramebuffer(self.card, width, height, "XR24")
            except Exception:
                pass
        # Even if we don't have a buffer, only fail later if it turns out we need it.
        if self.fb:
            self.mem = mmap.mmap(self.fb.fd(0), width * height * 3, mmap.MAP_SHARED, mmap.PROT_WRITE)
            self.fd = self.fb.fd(0)

        super().__init__(width=width, height=height)

    def render_request(self, completed_request):
        """Draw the camera image using DRM."""
        with self.lock:
            self.render_drm(self.picam2, completed_request)
            if self.current and self.own_current:
                self.current.release()
            self.current = completed_request
            self.own_current = (completed_request.config['buffer_count'] > 1)
            if self.own_current:
                self.current.acquire()

    def handle_request(self, picam2):
        picam2.process_requests(self)

    def init_drm(self, x, y, width, height, transform):
        DrmPreview._manager.add(self)

        self.plane = None
        self.drmfbs = {}
        self.current = None
        self.own_current = False
        self.window = (x, y, width, height)
        self.transform = Transform() if transform is None else transform
        self.overlay_plane = None
        self.overlay_fb = None
        self.overlay_new_fb = None
        self.lock = threading.Lock()
        self.display_stream_name = None

    def set_overlay(self, overlay):
        if self.picam2 is None:
            raise RuntimeError("Preview must be started before setting an overlay")
        if not self.picam2.camera_config:
            raise RuntimeError("Preview must be configured before setting an overlay")
        if self.picam2.camera_config['buffer_count'] < 2:
            raise RuntimeError("Need at least buffer_count=2 to set overlay")
        if self.overlay_plane is None:
            raise RuntimeError("Overlays not supported on this device")

        if overlay is None:
            self.overlay_new_fb = None
        else:
            h, w, channels = overlay.shape
            # Should I be recycling these instead of making new ones all the time?
            new_fb = pykms.DumbFramebuffer(self.card, w, h, "AB24")
            with mmap.mmap(new_fb.fd(0), w * h * 4, mmap.MAP_SHARED, mmap.PROT_WRITE) as mm:
                mm.write(np.ascontiguousarray(overlay).data)
            self.overlay_new_fb = new_fb

        if self.picam2.display_stream_name is not None:
            with self.lock:
                self.render_drm(self.picam2, None)

    def render_drm(self, picam2, completed_request):
        if completed_request is not None:
            self.display_stream_name = completed_request.config['display']
            stream = completed_request.stream_map[self.display_stream_name]
        else:
            if self.display_stream_name is None:
                self.display_stream_name = picam2.display_stream_name
            stream = picam2.stream_map[self.display_stream_name]

        cfg = stream.configuration
        pixel_format = str(cfg.pixel_format)
        width, height = (cfg.size.width, cfg.size.height)

        x, y, w, h = self.window
        # Letter/pillar-box to preserve the image's aspect ratio.
        if width * h > w * height:
            new_h = w * height // width
            y += (h - new_h) // 2
            h = new_h
        else:
            new_w = h * width // height
            x += (w - new_w) // 2
            w = new_w

        if self.plane is None:
            if pixel_format not in self.FMT_MAP:
                raise RuntimeError(f"Format {pixel_format} not supported by DRM preview")
            fmt = self.FMT_MAP[pixel_format]

            self.plane = self.resman.reserve_overlay_plane(self.crtc, format=fmt)
            if self.plane is None:
                # Some display devices may not support "alpha".
                self.plane = self.resman.reserve_plane(self.crtc, type=pykms.PlaneType.Primary, format=fmt)
                if self.plane is None:
                    raise RuntimeError("Failed to reserve DRM plane")
            drm_rotation = 1
            if self.transform.hflip:
                drm_rotation |= 16
            if self.transform.vflip:
                drm_rotation |= 32
            try:
                self.plane.set_prop("rotation", drm_rotation)
            except RuntimeError:
                pass
            # The second plane we ask for will go on top of the first.
            self.overlay_plane = self.resman.reserve_overlay_plane(self.crtc, format=pykms.PixelFormat.ABGR8888)
            if self.overlay_plane is not None:
                # Want "coverage" mode, not pre-multiplied alpha. fkms doesn't seem to have this
                # property so we suppress the error, but it seems to have the right behaviour anyway.
                try:
                    self.overlay_plane.set_prop("pixel blend mode", 1)
                except RuntimeError:
                    pass

        # Use an atomic commit for rendering
        ctx = pykms.AtomicReq(self.card)
        if completed_request is not None:
            fb = completed_request.request.buffers[stream]

            if pixel_format == "MJPEG":
                img = completed_request.make_array(self.display_stream_name).tobytes()
                if not self.fb:
                    # This is point at which this buffer really needs to exist!
                    raise RuntimeError("Failed to allocate buffer for MJPEG frame")
                self.mem.seek(0)
                self.mem.write(img)
                fd = self.fd
                stride = width * 3
            else:
                fd = fb.planes[0].fd
                stride = cfg.stride

            if fb not in self.drmfbs:
                if self.stop_count != picam2.stop_count:
                    old_drmfbs = self.drmfbs  # hang on to these until after a new one is sent
                    self.drmfbs = {}
                    self.stop_count = picam2.stop_count
                fmt = self.FMT_MAP[pixel_format]

                if pixel_format in ("YUV420", "YVU420"):
                    h2 = height // 2
                    stride2 = stride // 2
                    size = height * stride
                    drmfb = pykms.DmabufFramebuffer(self.card, width, height, fmt,
                                                    [fd, fd, fd],
                                                    [stride, stride2, stride2],
                                                    [0, size, size + h2 * stride2])
                else:
                    drmfb = pykms.DmabufFramebuffer(self.card, width, height, fmt, [fd], [stride], [0])
                self.drmfbs[fb] = drmfb

            drmfb = self.drmfbs[fb]
            ctx.add_plane(self.plane, drmfb, self.crtc, (0, 0, width, height), (x, y, w, h))

        overlay_new_fb = self.overlay_new_fb
        if overlay_new_fb != self.overlay_fb:
            overlay_old_fb = self.overlay_fb  # Must hang on to this momentarily to avoid a "wink"
            self.overlay_fb = overlay_new_fb
            if self.overlay_fb is not None:
                width, height = self.overlay_fb.width, self.overlay_fb.height
                ctx.add_plane(self.overlay_plane, self.overlay_fb, self.crtc, (0, 0, width, height), (x, y, w, h))
        ctx.commit_sync()
        ctx = None
        overlay_old_fb = None  # noqa  The new one has been sent so it's safe to let this go now
        old_drmfbs = None  # noqa  Can chuck these away now too

    def stop(self):
        super().stop()
        # We may be hanging on to a request, return it to the camera system.
        if self.current is not None and self.own_current:
            self.current.release()
        self.current = None
        self.display_stream_name = None
        # Seem to need some of this in order to be able to create another DrmPreview.
        self.drmfbs = {}
        self.overlay_new_fb = None
        self.overlay_fb = None
        self.plane = None
        self.overlay_plane = None
        self.fd = None
        self.mem = None
        self.fb = None
        DrmPreview._manager.remove(self)

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 122 B 0644
drm_preview.py File 10.1 KB 0644
gl_helpers.py File 2.06 KB 0644
null_preview.py File 2.81 KB 0644
q_gl_picamera2.py File 19.05 KB 0644
q_picamera2.py File 9.59 KB 0644
qt.py File 1.53 KB 0644
qt_compatibility.py File 1.37 KB 0644
qt_previews.py File 5.08 KB 0644
Filemanager