airtest.core.android.cap_methods.javacap 源代码

# -*- coding: utf-8 -*-
from functools import partial
from airtest.utils.logger import get_logger
from airtest.utils.safesocket import SafeSocket
from airtest.utils.nbsp import NonBlockingStreamReader
from airtest.utils.snippet import on_method_ready, reg_cleanup, kill_proc
from airtest.core.android.yosemite import Yosemite
from airtest.core.android.cap_methods.base_cap import BaseCap
from airtest.utils.threadsafe import threadsafe_generator
from airtest.core.error import ScreenError
import struct
LOGGING = get_logger(__name__)


[文档]class Javacap(Yosemite, BaseCap): """ This is another screencap class, it is slower in performance than minicap, but it provides the better compatibility """ APP_PKG = "com.netease.nie.yosemite" SCREENCAP_SERVICE = "com.netease.nie.yosemite.Capture" RECVTIMEOUT = None def __init__(self, adb, *args, **kwargs): super(Javacap, self).__init__(adb) self.frame_gen = None self.cleanup_func = [] @on_method_ready('install_or_upgrade') def _setup_stream_server(self): """ Setup stream server Returns: adb shell process, non-blocking stream reader and local port """ localport, deviceport = self.adb.setup_forward("localabstract:javacap_{}".format) deviceport = deviceport[len("localabstract:"):] # setup agent proc apkpath = self.adb.path_app(self.APP_PKG) cmds = ["CLASSPATH=" + apkpath, 'exec', 'app_process', '/system/bin', self.SCREENCAP_SERVICE, "--scale", "100", "--socket", "%s" % deviceport, "-lazy", "2>&1"] proc = self.adb.start_shell(cmds) # check proc output nbsp = NonBlockingStreamReader(proc.stdout, print_output=True, name="javacap_sever", auto_kill=True) while True: line = nbsp.readline(timeout=5.0) if line is None: raise ScreenError("javacap server setup timeout") if b"Capture server listening on" in line: break if b"Address already in use" in line: raise ScreenError("javacap server setup error: %s" % line) reg_cleanup(kill_proc, proc) return proc, nbsp, localport
[文档] @threadsafe_generator def get_frames(self): """ Get the screen frames Returns: None """ proc, nbsp, localport = self._setup_stream_server() s = SafeSocket() s.connect((self.adb.host, localport)) try: t = s.recv(24) except Exception as e: # On some phones, the connection can be established successfully, # but an error is reported when starting to fetch data raise ScreenError(e) # javacap header LOGGING.debug(struct.unpack("<2B5I2B", t)) stopping = False # reg_cleanup(s.close) self.cleanup_func.append(s.close) self.cleanup_func.append(nbsp.kill) self.cleanup_func.append(partial(kill_proc, proc)) self.cleanup_func.append(partial(self.adb.remove_forward, "tcp:%s" % localport)) while not stopping: s.send(b"1") # recv frame header, count frame_size if self.RECVTIMEOUT is not None: header = s.recv_with_timeout(4, self.RECVTIMEOUT) else: header = s.recv(4) if header is None: LOGGING.error("javacap header is None") # recv timeout, if not frame updated, maybe screen locked stopping = yield None else: frame_size = struct.unpack("<I", header)[0] frame_data = s.recv(frame_size) stopping = yield frame_data LOGGING.debug("javacap stream ends") self._cleanup()
[文档] def get_frame_from_stream(self): """ Get frame from the stream Returns: frame """ if self.frame_gen is None: self.frame_gen = self.get_frames() return self.frame_gen.send(None)
def _cleanup(self): """ Cleanup javacap process and stream reader Returns: """ for func in self.cleanup_func: func() self.cleanup_func = []
[文档] def teardown_stream(self): """ End stream Returns: None """ self._cleanup() if not self.frame_gen: return try: self.frame_gen.send(1) except (TypeError, StopIteration): pass else: LOGGING.warn("%s tear down failed" % self.frame_gen) self.frame_gen = None