airtest.aircv.screen_recorder 源代码

# coding=utf-8
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import

import cv2
import ffmpeg
import threading
import time
import numpy as np
import subprocess


RECORDER_ORI = {
    "PORTRAIT": 1,
    "LANDSCAPE": 2,
    "ROTATION": 0,  # The screen is centered in a square
}

[文档]def resize_by_max(img, max_size=800): if img is None: return np.zeros((max_size, max_size, 3), dtype=np.uint8) max_len = max(img.shape[0], img.shape[1]) if max_len > max_size: scale = max_size / max_len img = cv2.resize(img, (int(img.shape[1] * scale), int(img.shape[0] * scale))) return img
[文档]def get_max_size(max_size): try: max_size = int(max_size) except: max_size = None else: if max_size <= 0: max_size = None return max_size
[文档]class FfmpegVidWriter: """ Generate a video using FFMPEG. """ def __init__(self, outfile, width, height, fps=10, orientation=0): self.fps = fps # 三种横竖屏录屏模式 1 竖屏 2 横屏 0 方形居中 self.orientation = RECORDER_ORI.get(str(orientation).upper(), orientation) if self.orientation == 1: self.height = max(width, height) self.width = min(width, height) elif self.orientation == 2: self.width = max(width, height) self.height = min(width, height) else: self.width = self.height = max(width, height) # 满足视频宽高条件 self.height = height = self.height - (self.height % 32) + 32 self.width = width = self.width - (self.width % 32) + 32 self.cache_frame = np.zeros((height, width, 3), dtype=np.uint8) try: subprocess.Popen("ffmpeg", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).wait() except FileNotFoundError: from airtest.utils.ffmpeg import ffmpeg_setter try: ffmpeg_setter.add_paths() except Exception as e: print("Error: setting ffmpeg path failed, please download it at https://ffmpeg.org/download.html then add ffmpeg path to PATH") raise self.process = ( ffmpeg .input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height), framerate=self.fps) .output(outfile, pix_fmt='yuv420p', vcodec='libx264', crf=25, preset="veryfast", framerate=self.fps) .global_args("-loglevel", "error") .overwrite_output() .run_async(pipe_stdin=True) ) self.writer = self.process.stdin
[文档] def process_frame(self, frame): assert len(frame.shape) == 3 frame = frame[..., ::-1] if self.orientation == 1 and frame.shape[1] > frame.shape[0]: frame = cv2.resize(frame, (self.width, int(self.width*self.width/self.height))) elif self.orientation == 2 and frame.shape[1] < frame.shape[0]: frame = cv2.resize(frame, (int(self.height*self.height/self.width), self.height)) h_st = max(self.cache_frame.shape[0]//2 - frame.shape[0]//2, 0) w_st = max(self.cache_frame.shape[1]//2 - frame.shape[1]//2, 0) h_ed = min(h_st+frame.shape[0], self.cache_frame.shape[0]) w_ed = min(w_st+frame.shape[1], self.cache_frame.shape[1]) self.cache_frame[:] = 0 self.cache_frame[h_st:h_ed, w_st:w_ed, :] = frame[:(h_ed-h_st), :(w_ed-w_st)] return self.cache_frame.copy()
[文档] def write(self, frame): self.writer.write(frame.astype(np.uint8))
[文档] def close(self): self.writer.close() self.process.wait() self.process.terminate()
[文档]class ScreenRecorder: def __init__(self, outfile, get_frame_func, fps=10, snapshot_sleep=0.001, orientation=0): self.get_frame_func = get_frame_func self.tmp_frame = self.get_frame_func() self.snapshot_sleep = snapshot_sleep width, height = self.tmp_frame.shape[1], self.tmp_frame.shape[0] self.writer = FfmpegVidWriter(outfile, width, height, fps, orientation) self.tmp_frame = self.writer.process_frame(self.tmp_frame) self._is_running = False self._stop_flag = False self._stop_time = 0
[文档] def is_running(self): return self._is_running
@property def stop_time(self): return self._stop_time @stop_time.setter def stop_time(self, max_time): if isinstance(max_time, int) and max_time > 0: self._stop_time = time.time() + max_time else: print("failed to set stop time")
[文档] def is_stop(self): if self._stop_flag: return True if self._stop_time > 0 and time.time() >= self._stop_time: return True return False
[文档] def start(self): if self._is_running: print("recording is already running, please don't call again") return False self._is_running = True self.t_stream = threading.Thread(target=self.get_frame_loop) self.t_stream.setDaemon(True) self.t_stream.start() self.t_write = threading.Thread(target=self.write_frame_loop) self.t_write.setDaemon(True) self.t_write.start() return True
[文档] def stop(self): self._is_running = False self._stop_flag = True self.t_write.join() self.t_stream.join()
[文档] def get_frame_loop(self): # 单独一个线程持续截图 try: while True: tmp_frame = self.get_frame_func() self.tmp_frame = self.writer.process_frame(tmp_frame) time.sleep(self.snapshot_sleep) if self.is_stop(): break self._stop_flag = True except Exception as e: print("record thread error", e) self._stop_flag = True raise
[文档] def write_frame_loop(self): # 按帧率间隔获取图像写入视频 try: duration = 1.0/self.writer.fps last_time = time.time() self._stop_flag = False while True: if time.time()-last_time >= duration: last_time += duration self.writer.write(self.tmp_frame) if self.is_stop(): break time.sleep(0.0001) self.writer.close() self._stop_flag = True except Exception as e: print("write thread error", e) self._stop_flag = True raise