Source code for airtest.core.android.adb

# -*- coding: utf-8 -*-
import os
import re
import sys
import time
import random
import platform
import psutil
import warnings
import subprocess
import threading
from copy import copy
from six import PY3, text_type, binary_type
from six.moves import reduce

from airtest.core.android.constant import (DEFAULT_ADB_PATH, IP_PATTERN,
                                           SDK_VERISON_ANDROID7)
from airtest.core.error import (AdbError, AdbShellError, AirtestError,
                                DeviceConnectionError)
from airtest.utils.compat import decode_path, raisefrom, proc_communicate_timeout, SUBPROCESS_FLAG
from airtest.utils.logger import get_logger
from airtest.utils.nbsp import NonBlockingStreamReader
from airtest.utils.retry import retries
from airtest.utils.apkparser import APK
from airtest.utils.snippet import get_std_encoding, reg_cleanup, split_cmd, make_file_executable

LOGGING = get_logger(__name__)


[docs]class ADB(object): """adb client object class""" _instances = [] status_device = "device" status_offline = "offline" SHELL_ENCODING = "utf-8" def __init__(self, serialno=None, adb_path=None, server_addr=None, display_id=None, input_event=None): self.serialno = serialno self.adb_path = adb_path or self.get_adb_path() self.display_id = display_id self.input_event = input_event self._set_cmd_options(server_addr) self.connect() self._sdk_version = None self._line_breaker = None self._display_info = {} self._display_info_lock = threading.Lock() self._forward_local_using = [] self.__class__._instances.append(self)
[docs] @staticmethod def get_adb_path(): """ Returns the path to the adb executable. Checks if adb process is running, returns the running process path. If adb process is not running, checks if ANDROID_HOME environment variable is set. Constructs the adb path using ANDROID_HOME and returns it if set. If adb process is not running and ANDROID_HOME is not set, uses built-in adb path. Returns: str: The path to the adb executable. """ if platform.system() == "Windows": ADB_NAME = "adb.exe" else: ADB_NAME = "adb" # Check if adb process is already running try: for process in psutil.process_iter(['name', 'exe']): if process.info['name'] == ADB_NAME and process.info['exe'] and os.path.exists(process.info['exe']): return process.info['exe'] except: # maybe OSError pass # Check if ANDROID_HOME environment variable exists android_home = os.environ.get('ANDROID_HOME') if android_home: adb_path = os.path.join(android_home, 'platform-tools', ADB_NAME) if os.path.exists(adb_path): return adb_path # Use airtest builtin adb path builtin_adb_path = ADB.builtin_adb_path() return builtin_adb_path
[docs] @staticmethod def builtin_adb_path(): """ Return built-in adb executable path Returns: adb executable path """ system = platform.system() machine = platform.machine() adb_path = DEFAULT_ADB_PATH.get('{}-{}'.format(system, machine)) if not adb_path: adb_path = DEFAULT_ADB_PATH.get(system) if not adb_path: raise RuntimeError("No adb executable supports this platform({}-{}).".format(system, machine)) if system != "Windows": # chmod +x adb make_file_executable(adb_path) return adb_path
def _set_cmd_options(self, server_addr=None): """ Set communication parameters (host and port) between adb server and adb client Args: server_addr: adb server address, default is ["127.0.0.1", 5037] Returns: None """ self.host = server_addr[0] if server_addr else "127.0.0.1" self.port = server_addr[1] if server_addr else 5037 self.cmd_options = [self.adb_path] if self.host not in ("localhost", "127.0.0.1"): self.cmd_options += ['-H', self.host] if self.port != 5037: self.cmd_options += ['-P', str(self.port)]
[docs] def start_server(self): """ Perform `adb start-server` command to start the adb server Returns: None """ return self.cmd("start-server", device=False)
[docs] def kill_server(self): """ Perform `adb kill-server` command to kill the adb server Returns: None """ return self.cmd("kill-server", device=False)
[docs] def version(self): """ Perform `adb version` command and return the command output Returns: command output """ return self.cmd("version", device=False).strip()
[docs] def start_cmd(self, cmds, device=True): """ Start a subprocess with adb command(s) Args: cmds: command(s) to be run device: if True, the device serial number must be specified by `-s serialno` argument Raises: RuntimeError: if `device` is True and serialno is not specified Returns: a subprocess """ if device: if not self.serialno: raise RuntimeError("please set serialno first") cmd_options = self.cmd_options + ['-s', self.serialno] else: cmd_options = self.cmd_options cmds = cmd_options + split_cmd(cmds) LOGGING.debug(" ".join(cmds)) if not PY3: cmds = [c.encode(get_std_encoding(sys.stdin)) for c in cmds] proc = subprocess.Popen( cmds, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, creationflags=SUBPROCESS_FLAG ) return proc
[docs] def cmd(self, cmds, device=True, ensure_unicode=True, timeout=None): """ Run the adb command(s) in subprocess and return the standard output Args: cmds: command(s) to be run device: if True, the device serial number must be specified by -s serialno argument ensure_unicode: encode/decode unicode of standard outputs (stdout, stderr) timeout: timeout in seconds Raises: DeviceConnectionError: if any error occurs when connecting the device AdbError: if any other adb error occurs Returns: command(s) standard output (stdout) """ proc = self.start_cmd(cmds, device) if timeout: stdout, stderr = proc_communicate_timeout(proc, timeout) else: stdout, stderr = proc.communicate() if ensure_unicode: stdout = stdout.decode(get_std_encoding(sys.stdout)) stderr = stderr.decode(get_std_encoding(sys.stderr)) if proc.returncode > 0: # adb connection error pattern = DeviceConnectionError.DEVICE_CONNECTION_ERROR if isinstance(stderr, binary_type): pattern = pattern.encode("utf-8") if re.search(pattern, stderr): raise DeviceConnectionError(stderr) else: raise AdbError(stdout, stderr) return stdout
[docs] def close_proc_pipe(self, proc): """close stdin/stdout/stderr of subprocess.Popen.""" def close_pipe(pipe): if pipe: pipe.close() close_pipe(proc.stdin) close_pipe(proc.stdout) close_pipe(proc.stderr)
[docs] def devices(self, state=None): """ Perform `adb devices` command and return the list of adb devices Args: state: optional parameter to filter devices in specific state Returns: list od adb devices """ patten = re.compile(r'[\w\d.:-]+\t[\w]+$') device_list = [] # self.start_server() output = self.cmd("devices", device=False) for line in output.splitlines(): line = line.strip() if not line or not patten.match(line): continue serialno, cstate = line.split('\t') if state and cstate != state: continue device_list.append((serialno, cstate)) return device_list
[docs] def connect(self, force=False): """ Perform `adb connect` command, remote devices are preferred to connect first Args: force: force connection, default is False Returns: None """ if self.serialno and ":" in self.serialno and (force or self.get_status() != "device"): connect_result = self.cmd("connect %s" % self.serialno) LOGGING.info(connect_result)
[docs] def disconnect(self): """ Perform `adb disconnect` command Returns: None """ if ":" in self.serialno: self.cmd("disconnect %s" % self.serialno)
[docs] def get_status(self): """ Perform `adb get-state` and return the device status Raises: AdbError: if status cannot be obtained from the device Returns: None if status is `not found`, otherwise return the standard output from `adb get-state` command """ proc = self.start_cmd("get-state") stdout, stderr = proc.communicate() stdout = stdout.decode(get_std_encoding(sys.stdout)) stderr = stderr.decode(get_std_encoding(sys.stdout)) if proc.returncode == 0: return stdout.strip() elif "not found" in stderr: return None else: raise AdbError(stdout, stderr)
[docs] def wait_for_device(self, timeout=5): """ Perform `adb wait-for-device` command Args: timeout: time interval in seconds to wait for device Raises: DeviceConnectionError: if device is not available after timeout Returns: None """ try: self.cmd("wait-for-device", timeout=timeout) except RuntimeError as e: raisefrom(DeviceConnectionError, "device not ready", e)
[docs] def start_shell(self, cmds): """ Handle `adb shell` command(s) Args: cmds: adb shell command(s) Returns: None """ cmds = ['shell'] + split_cmd(cmds) return self.start_cmd(cmds)
[docs] def raw_shell(self, cmds, ensure_unicode=True): """ Handle `adb shell` command(s) with unicode support Args: cmds: adb shell command(s) ensure_unicode: decode/encode unicode True or False, default is True Returns: command(s) output """ cmds = ['shell'] + split_cmd(cmds) out = self.cmd(cmds, ensure_unicode=False) if not ensure_unicode: return out # use shell encoding to decode output try: return out.decode(self.SHELL_ENCODING) except UnicodeDecodeError: warnings.warn("shell output decode {} fail. repr={}".format(self.SHELL_ENCODING, repr(out))) return text_type(repr(out))
[docs] def shell(self, cmd): """ Run the `adb shell` command on the device Args: cmd: a command to be run Raises: AdbShellError: if command return value is non-zero or if any other `AdbError` occurred Returns: command output """ if self.sdk_version < SDK_VERISON_ANDROID7: # for sdk_version < 25, adb shell do not raise error # https://stackoverflow.com/questions/9379400/adb-error-codes cmd = split_cmd(cmd) + [";", "echo", "---$?---"] out = self.raw_shell(cmd).rstrip() m = re.match("(.*)---(\d+)---$", out, re.DOTALL) if not m: warnings.warn("return code not matched") stdout = out returncode = 0 else: stdout = m.group(1) returncode = int(m.group(2)) if returncode > 0: raise AdbShellError("", stdout) return stdout else: try: out = self.raw_shell(cmd) except AdbError as err: raise AdbShellError(err.stdout, err.stderr) else: return out
[docs] def keyevent(self, keyname): """ Perform `adb shell input keyevent` command on the device Args: keyname: key event name Returns: None """ self.shell(["input", "keyevent", keyname.upper()])
[docs] def getprop(self, key, strip=True): """ Perform `adb shell getprop` on the device Args: key: key value for property strip: True or False to strip the return carriage and line break from returned string Returns: propery value """ prop = self.raw_shell(['getprop', key]) if strip: if "\r\r\n" in prop: # Some mobile phones will output multiple lines of extra log prop = prop.split("\r\r\n") if len(prop) > 1: prop = prop[-2] else: prop = prop[-1] else: prop = prop.strip("\r\n") return prop
@property @retries(max_tries=3) def sdk_version(self): """ Get the SDK version from the device Returns: SDK version """ if self._sdk_version is None: keyname = 'ro.build.version.sdk' self._sdk_version = int(self.getprop(keyname)) return self._sdk_version
[docs] def push(self, local, remote): """ Perform `adb push` command Note: If there is a space (or special symbol) in the file name, it will be forced to add escape characters, and the new file name will be added with quotation marks and returned as the return value 注意:文件名中如果带有空格(或特殊符号),将会被强制增加转义符,并将新的文件名添加引号,作为返回值返回 Args: local: local file to be copied to the device remote: destination on the device where the file will be copied Returns: The file path saved in the phone may be enclosed in quotation marks, eg. '"test\ file.txt"' Examples: >>> adb = device().adb >>> adb.push("test.txt", "/data/local/tmp") >>> new_name = adb.push("test space.txt", "/data/local/tmp") # test the space in file name >>> print(new_name) "/data/local/tmp/test\ space.txt" >>> adb.shell("rm " + new_name) """ local = decode_path(local) # py2 if os.path.isfile(local) and os.path.splitext(local)[-1] != os.path.splitext(remote)[-1]: # If remote is a folder, add the filename and escape filename = os.path.basename(local) # Add escape characters for spaces, parentheses, etc. in filenames filename = re.sub(r"[ \(\)\&]", lambda m: "\\" + m.group(0), filename) remote = '%s/%s' % (remote, filename) self.cmd(["push", local, remote], ensure_unicode=False) return '\"%s\"' % remote
[docs] def pull(self, remote, local): """ Perform `adb pull` command Args: remote: remote file to be downloaded from the device local: local destination where the file will be downloaded from the device Note: If <=PY3, the path in Windows cannot be the root directory, and cannot contain symbols such as /g in the path 注意:如果低于PY3,windows中路径不能为根目录,并且不能包含/g等符号在路径里 Returns: None """ local = decode_path(local) # py2 if PY3: # If python3, use Path to force / convert to \ from pathlib import Path local = Path(local).as_posix() self.cmd(["pull", remote, local], ensure_unicode=False)
[docs] def forward(self, local, remote, no_rebind=True): """ Perform `adb forward` command Args: local: local tcp port to be forwarded remote: tcp port of the device where the local tcp port will be forwarded no_rebind: True or False Returns: None """ cmds = ['forward'] if no_rebind: cmds += ['--no-rebind'] self.cmd(cmds + [local, remote]) # register for cleanup atexit if local not in self._forward_local_using: self._forward_local_using.append(local)
[docs] def get_forwards(self): """ Perform `adb forward --list`command Yields: serial number, local tcp port, remote tcp port Returns: None """ out = self.cmd(['forward', '--list']) for line in out.splitlines(): line = line.strip() if not line: continue cols = line.split() if len(cols) != 3: continue serialno, local, remote = cols yield serialno, local, remote
[docs] @classmethod def get_available_forward_local(cls): """ Generate a pseudo random number between 11111 and 20000 that will be used as local forward port Returns: integer between 11111 and 20000 Note: use `forward --no-rebind` to check if port is available """ return random.randint(11111, 20000)
[docs] @retries(3) def setup_forward(self, device_port, no_rebind=True): """ Generate pseudo random local port and check if the port is available. Args: device_port: it can be string or the value of the `function(localport)`, e.g. `"tcp:5001"` or `"localabstract:{}".format` no_rebind: adb forward --no-rebind option Returns: local port and device port """ localport = self.get_available_forward_local() if callable(device_port): device_port = device_port(localport) self.forward("tcp:%s" % localport, device_port, no_rebind=no_rebind) return localport, device_port
[docs] def remove_forward(self, local=None): """ Perform `adb forward --remove` command Args: local: local tcp port Returns: None """ if local: cmds = ["forward", "--remove", local] else: cmds = ["forward", "--remove-all"] try: self.cmd(cmds) except AdbError as e: # ignore if already removed or disconnected if "not found" in (repr(e.stdout) + repr(e.stderr)): pass except DeviceConnectionError: pass # unregister for cleanup if local in self._forward_local_using: self._forward_local_using.remove(local)
[docs] def install_app(self, filepath, replace=False, install_options=None): """ Perform `adb install` command Args: filepath: full path to file to be installed on the device replace: force to replace existing application, default is False e.g.["-t", # allow test packages "-l", # forward lock application, "-s", # install application on sdcard, "-d", # allow version code downgrade (debuggable packages only) "-g", # grant all runtime permissions ] Returns: command output """ if isinstance(filepath, str): filepath = decode_path(filepath) if not os.path.isfile(filepath): raise RuntimeError("file: %s does not exists" % (repr(filepath))) if not install_options or type(install_options) != list: install_options = [] if replace: install_options.append("-r") cmds = ["install", ] + install_options + [filepath, ] try: out = self.cmd(cmds) except AdbError as e: out = repr(e.stderr) + repr(e.stdout) # If the signatures are inconsistent, uninstall the old version first if "INSTALL_FAILED_UPDATE_INCOMPATIBLE" in out and replace: try: package_name = re.search(r"package (.*?) .*", out).group(1) except: # get package name package_name = APK(filepath).get_package() self.uninstall_app(package_name) out = self.cmd(cmds) else: raise if re.search(r"Failure \[.*?\]", out): raise AdbShellError("Installation Failure", repr(out)) return out
[docs] def install_multiple_app(self, filepath, replace=False, install_options=None): """ Perform `adb install-multiple` command Args: filepath: full path to file to be installed on the device replace: force to replace existing application, default is False install_options: list of options e.g.["-t", # allow test packages "-l", # forward lock application, "-s", # install application on sdcard, "-d", # allow version code downgrade (debuggable packages only) "-g", # grant all runtime permissions "-p", # partial application install (install-multiple only) ] Returns: command output """ if isinstance(filepath, str): filepath = decode_path(filepath) if not os.path.isfile(filepath): raise RuntimeError("file: %s does not exists" % (repr(filepath))) if not install_options or type(install_options) != list: install_options = [] if replace: install_options.append("-r") cmds = ["install-multiple", ] + install_options + [filepath, ] try: out = self.cmd(cmds) except AdbError as err: if "Failed to finalize session".lower() in err.stderr.lower(): return "Success" else: return self.install_app(filepath, replace) if re.search(r"Failure \[.*?\]", out): raise AdbShellError("Installation Failure", repr(out)) return out
[docs] def pm_install(self, filepath, replace=False, install_options=None): """ Perform `adb push` and `adb install` commands Note: This is more reliable and recommended way of installing `.apk` files Args: filepath: full path to file to be installed on the device replace: force to replace existing application, default is False install_options: list of options e.g.["-t", # allow test packages "-l", # forward lock application, "-s", # install application on sdcard, "-d", # allow version code downgrade (debuggable packages only) "-g", # grant all runtime permissions ] Returns: None """ if isinstance(filepath, str): filepath = decode_path(filepath) if not os.path.isfile(filepath): raise RuntimeError("file: %s does not exists" % (repr(filepath))) if not install_options or type(install_options) != list: install_options = [] if replace: install_options.append("-r") filename = os.path.basename(filepath) device_dir = "/data/local/tmp" try: # if the apk file path contains spaces, the path must be escaped device_path = self.push(filepath, device_dir) except AdbError as err: # Error: no space left on device raise err try: cmds = ["pm", "install", ] + install_options + [device_path] self.shell(cmds) except AdbError as e: out = repr(e.stderr) + repr(e.stdout) # If the signatures are inconsistent, uninstall the old version first if re.search(r"INSTALL_FAILED_UPDATE_INCOMPATIBLE", out): try: package_name = re.search(r"package (.*?) .*", out).group(1) except: # get package name package_name = APK(filepath).get_package() self.uninstall_app(package_name) out = self.shell(cmds) else: raise finally: # delete apk file self.cmd(["shell", "rm", device_path], timeout=30)
[docs] def uninstall_app(self, package): """ Perform `adb uninstall` command Args: package: package name to be uninstalled from the device Returns: command output """ return self.cmd(['uninstall', package])
[docs] def pm_uninstall(self, package, keepdata=False): """ Perform `adb uninstall` command and delete all related application data Args: package: package name to be uninstalled from the device keepdata: True or False, keep application data after removing the app from the device Returns: command output """ cmd = ['pm', 'uninstall', package] if keepdata: cmd.append('-k') self.shell(cmd)
[docs] def snapshot(self): """ Take the screenshot of the device display Returns: command output (stdout) """ if self.display_id: raw = self.cmd('shell screencap -d {0} -p'.format(self.display_id), ensure_unicode=False) else: raw = self.cmd('shell screencap -p', ensure_unicode=False) return raw.replace(self.line_breaker, b"\n")
# PEP 3113 -- Removal of Tuple Parameter Unpacking # https://www.python.org/dev/peps/pep-3113/
[docs] def touch(self, tuple_xy): """ Perform user input (touchscreen) on given coordinates Args: tuple_xy: coordinates (x, y) Returns: None """ x, y = tuple_xy self.shell('input tap %d %d' % (x, y)) time.sleep(0.1)
[docs] def swipe(self, tuple_x0y0, tuple_x1y1, duration=500): """ Perform user input (swipe screen) from start point (x,y) to end point (x,y) Args: tuple_x0y0: start point coordinates (x, y) tuple_x1y1: end point coordinates (x, y) duration: time interval for action, default 500 Raises: AirtestError: if SDK version is not supported Returns: None """ # prot python 3 x0, y0 = tuple_x0y0 x1, y1 = tuple_x1y1 version = self.sdk_version if version <= 15: raise AirtestError('swipe: API <= 15 not supported (version=%d)' % version) elif version <= 17: self.shell('input swipe %d %d %d %d' % (x0, y0, x1, y1)) else: self.shell('input touchscreen swipe %d %d %d %d %d' % (x0, y0, x1, y1, duration))
[docs] def logcat(self, grep_str="", extra_args="", read_timeout=10): """ Perform `adb shell logcat` command and search for given patterns Args: grep_str: pattern to filter from the logcat output extra_args: additional logcat arguments read_timeout: time interval to read the logcat, default is 10 Yields: logcat lines containing filtered patterns Returns: None """ cmds = "shell logcat" if extra_args: cmds += " " + extra_args if grep_str: cmds += " | grep " + grep_str logcat_proc = self.start_cmd(cmds) nbsp = NonBlockingStreamReader(logcat_proc.stdout, print_output=False) while True: line = nbsp.readline(read_timeout) if line is None: break else: yield line nbsp.kill() logcat_proc.kill() return
[docs] def exists_file(self, filepath): """ Check if the file exits on the device Args: filepath: path to the file Returns: True or False if file found or not """ try: out = self.shell(["ls", filepath]) except AdbShellError: return False else: return not ("No such file or directory" in out)
[docs] def file_size(self, filepath): """ Get the file size Args: filepath: path to the file Returns: The file size Raises: AdbShellError if no such file """ out = self.shell(["ls", "-l", filepath]) try: file_size = int(out.split()[4]) except ValueError: # 安卓6.0.1系统得到的结果是[3]为文件大小 file_size = int(out.split()[3]) return file_size
def _cleanup_forwards(self): """ Remove the local forward ports Returns: None """ # remove forward成功后,会改变self._forward_local_using的内容,因此需要copy一遍 # After remove_forward() is successful, self._forward_local_using will be changed, so it needs to be copied forward_local_list = copy(self._forward_local_using) for local in forward_local_list: try: self.remove_forward(local) except DeviceConnectionError: # if device is offline, ignore pass @property def line_breaker(self): """ Set carriage return and line break property for various platforms and SDK versions Returns: carriage return and line break string """ if not self._line_breaker: if self.sdk_version >= SDK_VERISON_ANDROID7: line_breaker = os.linesep else: line_breaker = '\r' + os.linesep self._line_breaker = line_breaker.encode("ascii") return self._line_breaker @property def display_info(self): """ Set device display properties (orientation, rotation and max values for x and y coordinates) Notes: if there is a lock screen detected, the function tries to unlock the device first Returns: device screen properties """ self._display_info_lock.acquire() if not self._display_info: self._display_info = self.get_display_info() self._display_info_lock.release() return self._display_info
[docs] def get_display_info(self): """ Get information about device physical display (orientation, rotation and max values for x and y coordinates) Returns: device screen properties e.g { 'width': 1440, 'height': 2960, 'density': 4.0, 'orientation': 3, 'rotation': 270, 'max_x': 4095, 'max_y': 4095 } """ display_info = self.getPhysicalDisplayInfo() orientation = self.getDisplayOrientation() max_x, max_y = self.getMaxXY() display_info.update({ "orientation": orientation, "rotation": orientation * 90, "max_x": max_x, "max_y": max_y, }) return display_info
[docs] def getMaxXY(self): """ Get device display maximum values for x and y coordinates Returns: max x and max y coordinates """ ret = self.shell('getevent -p').split('\n') max_x, max_y = None, None for i in ret: if i.find("0035") != -1: patten = re.compile(r'max [0-9]+') ret = patten.search(i) if ret: max_x = int(ret.group(0).split()[1]) if i.find("0036") != -1: patten = re.compile(r'max [0-9]+') ret = patten.search(i) if ret: max_y = int(ret.group(0).split()[1]) return max_x, max_y
[docs] def getRestrictedScreen(self): """ Get value for mRestrictedScreen (without black border / virtual keyboard)` Returns: screen resolution mRestrictedScreen value as tuple (x, y) """ # get the effective screen resolution of the device result = None # get the corresponding mRestrictedScreen parameters according to the device serial number dumpsys_info = self.shell("dumpsys window") match = re.search(r'mRestrictedScreen=.+', dumpsys_info) if match: infoline = match.group(0).strip() # like 'mRestrictedScreen=(0,0) 720x1184' resolution = infoline.split(" ")[1].split("x") if isinstance(resolution, list) and len(resolution) == 2: result = int(str(resolution[0])), int(str(resolution[1])) return result
[docs] def getPhysicalDisplayInfo(self): """ Get value for display dimension and density from `mPhysicalDisplayInfo` value obtained from `dumpsys` command. Returns: physical display info for dimension and density """ # use adb shell wm size displayInfo = {} try: wm_size = re.search(r'(?P<width>\d+)x(?P<height>\d+)\s*$', self.cmd('shell wm size', timeout=5)) except (AdbError, RuntimeError) as e: LOGGING.error(e) else: if wm_size: displayInfo = dict((k, int(v)) for k, v in wm_size.groupdict().items()) displayInfo['density'] = self._getDisplayDensity(strip=True) return displayInfo phyDispRE = re.compile('.*PhysicalDisplayInfo{(?P<width>\d+) x (?P<height>\d+), .*, density (?P<density>[\d.]+).*') out = self.raw_shell('dumpsys display') m = phyDispRE.search(out) if m: for prop in ['width', 'height']: displayInfo[prop] = int(m.group(prop)) for prop in ['density']: # In mPhysicalDisplayInfo density is already a factor, no need to calculate displayInfo[prop] = float(m.group(prop)) return displayInfo # This could also be mSystem or mOverscanScreen phyDispRE = re.compile('\s*mUnrestrictedScreen=\((?P<x>\d+),(?P<y>\d+)\) (?P<width>\d+)x(?P<height>\d+)') # This is known to work on older versions (i.e. API 10) where mrestrictedScreen is not available dispWHRE = re.compile('\s*DisplayWidth=(?P<width>\d+) *DisplayHeight=(?P<height>\d+)') out = self.raw_shell('dumpsys window') m = phyDispRE.search(out, 0) if not m: m = dispWHRE.search(out, 0) if m: for prop in ['width', 'height']: displayInfo[prop] = int(m.group(prop)) for prop in ['density']: d = self._getDisplayDensity(strip=True) if d: displayInfo[prop] = d else: # No available density information displayInfo[prop] = -1.0 return displayInfo # gets C{mPhysicalDisplayInfo} values from dumpsys. This is a method to obtain display dimensions and density phyDispRE = re.compile('Physical size: (?P<width>\d+)x(?P<height>\d+).*Physical density: (?P<density>\d+)', re.S) m = phyDispRE.search(self.cmd('shell wm size; wm density', timeout=3)) if m: for prop in ['width', 'height']: displayInfo[prop] = int(m.group(prop)) for prop in ['density']: displayInfo[prop] = float(m.group(prop)) return displayInfo if not displayInfo: raise DeviceConnectionError("Getting device screen information timed out")
def _getDisplayDensity(self, strip=True): """ Get display density Args: strip: strip the output Returns: display density """ BASE_DPI = 160.0 d = self.getprop('ro.sf.lcd_density', strip) if d: return float(d) / BASE_DPI d = self.getprop('qemu.sf.lcd_density', strip) if d: return float(d) / BASE_DPI return -1.0
[docs] def getDisplayOrientation(self): """ Another way to get the display orientation, this works well for older devices (SDK version 15) Returns: display orientation information """ # another way to get orientation, for old sumsung device(sdk version 15) from xiaoma SurfaceFlingerRE = re.compile('orientation=(\d+)') output = self.shell('dumpsys SurfaceFlinger') m = SurfaceFlingerRE.search(output) if m: return int(m.group(1)) # Fallback method to obtain the orientation # See https://github.com/dtmilano/AndroidViewClient/issues/128 surfaceOrientationRE = re.compile('SurfaceOrientation:\s+(\d+)') output = self.shell('dumpsys input') m = surfaceOrientationRE.search(output) if m: return int(m.group(1)) displayFramesRE = re.compile(r"DisplayFrames.*r=(\d+)") output = self.shell('dumpsys window displays') m = displayFramesRE.search(output) if m: return int(m.group(1)) # We couldn't obtain the orientation warnings.warn("Could not obtain the orientation, return 0") return 0
[docs] def update_cur_display(self, display_info): """ Some phones support resolution modification, try to get the modified resolution from dumpsys adb shell dumpsys window displays | find "cur=" 本方法虽然可以更好地获取到部分修改过分辨率的手机信息 但是会因为cur=(\d+)x(\d+)的数值在不同设备上width和height的顺序可能不同,导致横竖屏识别出现问题 airtest不再使用本方法作为通用的屏幕尺寸获取方法,但依然可用于部分设备获取当前被修改过的分辨率 Examples: >>> # 部分三星和华为设备,若分辨率没有指定为最高,可能会导致点击偏移,可以用这个方式强制修改: >>> # For some Samsung and Huawei devices, if the resolution is not specified as the highest, >>> # it may cause click offset, which can be modified in this way: >>> dev = device() >>> info = dev.display_info >>> info2 = dev.adb.update_cur_display(info) >>> dev.display_info.update(info2) Args: display_info: the return of self.getPhysicalDisplayInfo() Returns: display_info """ # adb shell dumpsys window displays | find "init=" # 上面的命令行在dumpsys window里查找init=widthxheight,得到的结果是物理分辨率,且部分型号手机不止一个结果 # 如果改为读取 cur=widthxheight 的数据,得到的是修改过分辨率手机的结果(例如三星S8) actual = self.shell("dumpsys window displays") arr = re.findall(r'cur=(\d+)x(\d+)', actual) if len(arr) > 0: # 强制设定宽度width为更小的数字、height为更大的数字,避免因为各手机厂商返回结果的顺序不同导致问题 # Set the width to a smaller number and the height to a larger number width, height = min(list(map(int, arr[0]))), max(list(map(int, arr[0]))) display_info['physical_width'] = display_info['width'] display_info['physical_height'] = display_info['height'] display_info['width'], display_info['height'] = width, height return display_info
[docs] def get_top_activity(self): """ Perform `adb shell dumpsys activity top` command search for the top activity Raises: AirtestError: if top activity cannot be obtained Returns: top activity as a tuple: (package_name, activity_name, pid) """ dat = self.shell('dumpsys activity top') activityRE = re.compile(r'\s*ACTIVITY ([A-Za-z0-9_.$]+)/([A-Za-z0-9_.$]+) \w+ pid=(\d+)') # in Android8.0 or higher, the result may be more than one m = activityRE.findall(dat) if m: return m[-1] else: raise AirtestError("Can not get top activity, output:%s" % dat)
[docs] def is_keyboard_shown(self): """ Perform `adb shell dumpsys input_method` command and search for information if keyboard is shown Returns: True or False whether the keyboard is shown or not """ dim = self.shell('dumpsys input_method') if dim: return "mInputShown=true" in dim return False
[docs] def is_screenon(self): """ Perform `adb shell dumpsys window policy` command and search for information if screen is turned on or off Raises: AirtestError: if screen state can't be detected Returns: True or False whether the screen is turned on or off """ screenOnRE = re.compile('mScreenOnFully=(true|false)') m = screenOnRE.search(self.shell('dumpsys window policy')) if m: return m.group(1) == 'true' else: # MIUI11 screenOnRE = re.compile('screenState=(SCREEN_STATE_ON|SCREEN_STATE_OFF)') m = screenOnRE.search(self.shell('dumpsys window policy')) if m: return m.group(1) == 'SCREEN_STATE_ON' raise AirtestError("Couldn't determine screen ON state")
[docs] @retries(max_tries=3) def is_locked(self): """ Perform `adb shell dumpsys window policy` command and search for information if screen is locked or not Raises: AirtestError: if lock screen can't be detected Returns: True or False whether the screen is locked or not """ lockScreenRE = re.compile('(?:mShowingLockscreen|isStatusBarKeyguard|showing)=(true|false)') m = lockScreenRE.search(self.shell('dumpsys window policy')) if not m: raise AirtestError("Couldn't determine screen lock state") return (m.group(1) == 'true')
[docs] def unlock(self): """ Perform `adb shell input keyevent MENU` and `adb shell input keyevent BACK` commands to attempt to unlock the screen Returns: None Warnings: Might not work on all devices """ self.shell('input keyevent MENU') self.shell('input keyevent BACK')
[docs] def get_package_version(self, package): """ Perform `adb shell dumpsys package` and search for information about given package version Args: package: package name Returns: None if no info has been found, otherwise package version """ package_info = self.shell(['dumpsys', 'package', package]) matcher = re.search(r'versionCode=(\d+)', package_info) if matcher: return int(matcher.group(1)) return None
[docs] def list_app(self, third_only=False): """ Perform `adb shell pm list packages` to print all packages, optionally only those whose package name contains the text in FILTER. Options -f: see their associated file -d: filter to only show disabled packages -e: filter to only show enabled packages -s: filter to only show system packages -3: filter to only show third party packages -i: see the installer for the packages -u: also include uninstalled packages Args: third_only: print only third party packages Returns: list of packages """ cmd = ["pm", "list", "packages"] if third_only: cmd.append("-3") output = self.shell(cmd) packages = output.splitlines() # remove all empty string; "package:xxx" -> "xxx" packages = [p.split(":")[1] for p in packages if p] return packages
[docs] def path_app(self, package): """ Perform `adb shell pm path` command to print the path to the package Args: package: package name Raises: AdbShellError: if any adb error occurs AirtestError: if package is not found on the device Returns: path to the package """ try: output = self.shell(['pm', 'path', package]) except AdbShellError: output = "" if 'package:' not in output: raise AirtestError('package not found, output:[%s]' % output) return output.split("package:")[1].strip()
[docs] def check_app(self, package): """ Perform `adb shell dumpsys package` command and check if package exists on the device Args: package: package name Raises: AirtestError: if package is not found Returns: True if package has been found """ output = self.shell(['dumpsys', 'package', package]) pattern = r'Package\s+\[' + str(package) + '\]' match = re.search(pattern, output) if match is None: raise AirtestError('package "{}" not found'.format(package)) return True
[docs] def start_app(self, package, activity=None): """ Perform `adb shell monkey` commands to start the application, if `activity` argument is `None`, then `adb shell am start` command is used. Args: package: package name activity: activity name Returns: None """ if not activity: try: ret = self.shell(['monkey', '-p', package, '-c', 'android.intent.category.LAUNCHER', '1']) except AdbShellError as e: raise AirtestError("Starting App: %s Failed! No activities found to run." % package) if "No activities found to run" in ret: raise AirtestError("Starting App: %s Failed! No activities found to run." % package) else: self.shell(['am', 'start', '-n', '%s/%s.%s' % (package, package, activity)])
[docs] def start_app_timing(self, package, activity): """ Start the application and activity, and measure time Args: package: package name activity: activity name Returns: app launch time """ out = self.shell(['am', 'start', '-S', '-W', '%s/%s' % (package, activity), '-c', 'android.intent.category.LAUNCHER', '-a', 'android.intent.action.MAIN']) if not re.search(r"Status:\s*ok", out): raise AirtestError("Starting App: %s/%s Failed!" % (package, activity)) # matcher = re.search(r"TotalTime:\s*(\d+)", out) matcher = re.search(r"TotalTime:\s*(\d+)", out) if matcher: return int(matcher.group(1)) else: return 0
[docs] def stop_app(self, package): """ Perform `adb shell am force-stop` command to force stop the application Args: package: package name Returns: None """ self.shell(['am', 'force-stop', package])
[docs] def clear_app(self, package): """ Perform `adb shell pm clear` command to clear all application data Args: package: package name Returns: None """ self.shell(['pm', 'clear', package])
[docs] def text(self, content): """ Use adb shell input for text input Args: content: text to input Returns: None Examples: >>> dev = connect_device("Android:///") >>> dev.text("Hello World") >>> dev.text("test123") """ if content.isalpha(): self.shell(["input", "text", content]) else: # If it contains letters and numbers, input with `adb shell input text` may result in the wrong order for i in content: if i == " ": self.keyevent("KEYCODE_SPACE") else: self.shell(["input", "text", i])
[docs] def get_ip_address(self): """ Perform several set of commands to obtain the IP address. * `adb shell netcfg | grep wlan0` * `adb shell ifconfig` * `adb getprop dhcp.wlan0.ipaddress` Returns: None if no IP address has been found, otherwise return the IP address """ def get_ip_address_from_interface(interface): """Get device ip from target network interface.""" # android >= 6.0: ip -f inet addr show {interface} try: res = self.shell('ip -f inet addr show {}'.format(interface)) except AdbShellError: res = '' matcher = re.search(r"inet (\d+\.){3}\d+", res) if matcher: return matcher.group().split(" ")[-1] # android >= 6.0 backup method: ifconfig try: res = self.shell('ifconfig') except AdbShellError: res = '' matcher = re.search(interface + r'.*?inet addr:((\d+\.){3}\d+)', res, re.DOTALL) if matcher: return matcher.group(1) # android <= 6.0: netcfg try: res = self.shell('netcfg') except AdbShellError: res = '' matcher = re.search(interface + r'.* ((\d+\.){3}\d+)/\d+', res) if matcher: return matcher.group(1) # android <= 6.0 backup method: getprop dhcp.{}.ipaddress try: res = self.shell('getprop dhcp.{}.ipaddress'.format(interface)) except AdbShellError: res = '' matcher = IP_PATTERN.search(res) if matcher: return matcher.group(0) # sorry, no more methods... return None interfaces = ('eth0', 'eth1', 'wlan0') for i in interfaces: ip = get_ip_address_from_interface(i) if ip and not ip.startswith('127.') and not ip.startswith('169.'): return ip return None
[docs] def get_gateway_address(self): """ Perform several set of commands to obtain the gateway address. * `adb getprop dhcp.wlan0.gateway` * `adb shell netcfg | grep wlan0` Returns: None if no gateway address has been found, otherwise return the gateway address """ ip2int = lambda ip: reduce(lambda a, b: (a << 8) + b, map(int, ip.split('.')), 0) int2ip = lambda n: '.'.join([str(n >> (i << 3) & 0xFF) for i in range(0, 4)[::-1]]) try: res = self.shell('getprop dhcp.wlan0.gateway') except AdbShellError: res = '' matcher = IP_PATTERN.search(res) if matcher: return matcher.group(0) ip = self.get_ip_address() if not ip: return None mask_len = self._get_subnet_mask_len() gateway = (ip2int(ip) & (((1 << mask_len) - 1) << (32 - mask_len))) + 1 return int2ip(gateway)
def _get_subnet_mask_len(self): """ Perform `adb shell netcfg | grep wlan0` command to obtain mask length Returns: 17 if mask length could not be detected, otherwise the mask length """ try: res = self.shell('netcfg') except AdbShellError: pass else: matcher = re.search(r'wlan0.* (\d+\.){3}\d+/(\d+) ', res) if matcher: return int(matcher.group(2)) # 获取不到网段长度就默认取17 print('[iputils WARNING] fail to get subnet mask len. use 17 as default.') return 17
[docs] def get_memory(self): res = self.shell("dumpsys meminfo") pat = re.compile(r".*Total RAM:\s+(\S+)\s+", re.DOTALL) _str = pat.match(res).group(1) if ',' in _str: _list = _str.split(',') _num = int(_list[0]) _num = round(_num + (float(_list[1]) / 1000.0)) else: _num = round(float(_str) / 1000.0 / 1000.0) res = str(_num) + 'G' return res
[docs] def get_storage(self): res = self.shell("df /data") pat = re.compile(r".*\/data\s+(\S+)", re.DOTALL) if pat.match(res): _str = pat.match(res).group(1) else: pat = re.compile(r".*\s+(\S+)\s+\S+\s+\S+\s+\S+\s+\/data", re.DOTALL) _str = pat.match(res).group(1) if 'G' in _str: _num = round(float(_str[:-1])) elif 'M' in _str: _num = round(float(_str[:-1]) / 1000.0) else: _num = round(float(_str) / 1000.0 / 1000.0) if _num > 64: res = '128G' elif _num > 32: res = '64G' elif _num > 16: res = '32G' elif _num > 8: res = '16G' else: res = '8G' return res
[docs] def get_cpuinfo(self): res = self.shell("cat /proc/cpuinfo").strip() cpuNum = res.count("processor") pat = re.compile(r'Hardware\s+:\s+(\w+.*)') m = pat.search(res) if not m: pat = re.compile(r'Processor\s+:\s+(\w+.*)') m = pat.match(res) cpuName = m.group(1).replace('\r', '') return dict(cpuNum=cpuNum, cpuName=cpuName)
[docs] def get_cpufreq(self): res = self.shell("cat /sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq") num = round(float(res) / 1000 / 1000, 1) res = str(num) + 'GHz' return res.strip()
[docs] def get_cpuabi(self): res = self.shell("getprop ro.product.cpu.abi") return res.strip()
[docs] def get_gpu(self): res = self.shell("dumpsys SurfaceFlinger") pat = re.compile(r'GLES:\s+(.*)') m = pat.search(res) if not m: return None _list = m.group(1).split(',') gpuModel = "" opengl = "" if len(_list) > 0: gpuModel = _list[1].strip() if len(_list) > 1: m2 = re.search(r'(\S+\s+\S+\s+\S+).*', _list[2]) if m2: opengl = m2.group(1) return dict(gpuModel=gpuModel, opengl=opengl)
[docs] def get_model(self): return self.getprop("ro.product.model")
[docs] def get_manufacturer(self): return self.getprop("ro.product.manufacturer")
[docs] def get_device_info(self): """ Get android device information, including: memory/storage/display/cpu/gpu/model/manufacturer... Returns: Dict of info """ handlers = { "platform": "Android", "serialno": self.serialno, "memory": self.get_memory, "storage": self.get_storage, "display": self.getPhysicalDisplayInfo, "cpuinfo": self.get_cpuinfo, "cpufreq": self.get_cpufreq, "cpuabi": self.get_cpuabi, "sdkversion": self.sdk_version, "gpu": self.get_gpu, "model": self.get_model, "manufacturer": self.get_manufacturer, # "battery": getBatteryCapacity } ret = {} for k, v in handlers.items(): if callable(v): try: value = v() except Exception: value = None ret[k] = value else: ret[k] = v return ret
[docs] def get_display_of_all_screen(self, info, package=None): """ Perform `adb shell dumpsys window windows` commands to get window display of application. Args: info: device screen properties package: package name, default to the package of top activity Returns: None if adb command failed to run, otherwise return device screen properties(portrait mode) eg. (offset_x, offset_y, screen_width, screen_height) """ output = self.shell("dumpsys window windows") windows = output.split("Window #") offsetx, offsety, width, height = 0, 0, info['width'], info['height'] package = self._search_for_current_package(output) if package is None else package if package: for w in windows: if "package=%s" % package in w: arr = re.findall(r'Frames: containing=\[(\d+\.?\d*),(\d+\.?\d*)]\[(\d+\.?\d*),(\d+\.?\d*)]', w) if len(arr) >= 1 and len(arr[0]) == 4: offsetx, offsety, width, height = float(arr[0][0]), float(arr[0][1]), float(arr[0][2]), float(arr[0][3]) if info["orientation"] in [1, 3]: offsetx, offsety, width, height = offsety, offsetx, height, width width, height = width - offsetx, height - offsety return { "offset_x": offsetx, "offset_y": offsety, "offset_width": width, "offset_height": height, }
def _search_for_current_package(self, ret): """ Search for current app package name from the output of command "adb shell dumpsys window windows" Returns: package name if exists else "" """ try: packageRE = re.compile('\s*mCurrentFocus=Window{.* ([A-Za-z0-9_.]+)/[A-Za-z0-9_.]+}') m = packageRE.findall(ret) if m: return m[-1] else: return self.get_top_activity()[0] except Exception as e: print("[Error] Cannot get current top activity") return ""
[docs]def cleanup_adb_forward(): for adb in ADB._instances: adb._cleanup_forwards()
reg_cleanup(cleanup_adb_forward)