Source code for airtest.core.ios.ios

#! /usr/bin/env python
# -*- coding: utf-8 -*-

import re
import time
import base64
import traceback
import wda
import inspect
from functools import wraps
from airtest import aircv
from airtest.core.device import Device
from airtest.core.ios.constant import CAP_METHOD, TOUCH_METHOD, IME_METHOD, ROTATION_MODE, KEY_EVENTS, \
from airtest.core.ios.rotation import XYTransformer, RotationWatcher
from airtest.core.ios.instruct_cmd import InstructHelper
from airtest.utils.logger import get_logger
from airtest.core.ios.mjpeg_cap import MJpegcap

LOGGING = get_logger(__name__)

DEFAULT_ADDR = "http://localhost:8100/"

[docs]def decorator_retry_session(func): """ When the operation fails due to session failure, try to re-acquire the session, retry at most 3 times 当因为session失效而操作失败时,尝试重新获取session,最多重试3次 """ @wraps(func) def wrapper(self, *args, **kwargs): try: return func(self, *args, **kwargs) except (RuntimeError, wda.WDAError): for i in range(3): try: self._fetch_new_session() return func(self, *args, **kwargs) except: time.sleep(0.5) continue raise return wrapper
[docs]def decorator_retry_for_class(cls): """ Add decorators to all methods in the class 为class里的所有method添加装饰器 ``decorator_retry_session`` """ for name, method in inspect.getmembers(cls): # Ignore built-in methods and private methods named _xxx # 忽略内置方法和下划线开头命名的私有方法 _xxx if (not inspect.ismethod(method) and not inspect.isfunction(method)) \ or inspect.isbuiltin(method) or name.startswith("_"): continue setattr(cls, name, decorator_retry_session(method)) return cls
[docs]@decorator_retry_for_class class IOS(Device): """ios client - before this you have to run `WebDriverAgent <>`_ - ``xcodebuild -project path/to/WebDriverAgent.xcodeproj -scheme WebDriverAgentRunner -destination "id=$(idevice_id -l)" test`` - ``iproxy $port 8100 $udid`` """ def __init__(self, addr=DEFAULT_ADDR, cap_method=CAP_METHOD.MJPEG, mjpeg_port=None): super(IOS, self).__init__() # if none or empty, use default addr self.addr = addr or DEFAULT_ADDR # fit wda format, make url start with http:// # eg. http://localhost:8100/ or http+usbmux://00008020-001270842E88002E # with mjpeg_port: http://localhost:8100/?mjpeg_port=9100 if not self.addr.startswith("http"): self.addr = "http://" + addr """here now use these supported cap touch and ime method""" self.cap_method = cap_method self.touch_method = TOUCH_METHOD.WDATOUCH self.ime_method = IME_METHOD.WDAIME # wda driver, use to home, start app # init wda session, updata when start app # use to click/swipe/close app/get wda size wda.DEBUG = False self.driver = wda.Client(self.addr) # record device's width self._size = {'width': None, 'height': None} self._current_orientation = None self._touch_factor = None self._last_orientation = None self._is_pad = None self._using_ios_tagent = None self._device_info = {} info = self.device_info self.instruct_helper = InstructHelper(info['uuid']) self.mjpegcap = MJpegcap(self.instruct_helper, ori_function=lambda: self.display_info, ip=self.ip, port=mjpeg_port) # start up RotationWatcher with default session self.rotation_watcher = RotationWatcher(self) self._register_rotation_watcher() self.alert_watch_and_click = self.driver.alert.watch_and_click @property def ip(self): """ Returns the IP address of the host connected to the iOS phone It is not the IP address of the iOS phone. If you want to get the IP address of the phone, you can access the interface `get_ip_address` For example: when the device is connected via http://localhost:8100, return localhost If it is a remote device http://192.168.xx.xx:8100, it returns the IP address of 192.168.xx.xx Returns: """ match =, self.addr) if match: ip = else: ip = 'localhost' return ip @property def uuid(self): return self.addr @property def using_ios_tagent(self): """ 当前基础版本:appium/WebDriverAgent 4.1.4 基于上述版本,2022.3.30之后发布的iOS-Tagent版本,在/status接口中新增了一个Version参数,如果能检查到本参数,说明使用了新版本ios-Tagent 该版本基于Appium版的WDA做了一些改动,可以更快地进行点击和滑动,并优化了部分UI树的获取逻辑 但是所有的坐标都为竖屏坐标,需要airtest自行根据方向做转换 同时,大于4.1.4版本的appium/WebDriverAgent不再需要针对ipad进行特殊的横屏坐标处理了 Returns: """ if self._using_ios_tagent is None: status = self.driver.status() if 'Version' in status: self._using_ios_tagent = True else: self._using_ios_tagent = False return self._using_ios_tagent def _fetch_new_session(self): """ Re-acquire a new session 重新获取新的session :return: """ # 根据facebook-wda的逻辑,直接设为None就会自动获取一个新的默认session self.driver.session_id = None @property def is_pad(self): """ Determine whether it is an ipad(or 6P/7P/8P), if it is, in the case of horizontal screen + desktop, the coordinates need to be switched to vertical screen coordinates to click correctly (WDA bug) 判断是否是ipad(或 6P/7P/8P),如果是,在横屏+桌面的情况下,坐标需要切换成竖屏坐标才能正确点击(WDA的bug) Returns: """ if self._is_pad is None: info = self.device_info if info["model"] == "iPad" or \ (self.display_info["width"], self.display_info["height"]) in LANDSCAPE_PAD_RESOLUTION: # ipad与6P/7P/8P等设备,桌面横屏时的表现一样,都会变横屏 self._is_pad = True else: self._is_pad = False return self._is_pad @property def device_info(self): """ get the device info. .. note:: Might not work on all devices Returns: dict for device info, eg. AttrDict({ 'timeZone': 'GMT+0800', 'currentLocale': 'zh_CN', 'model': 'iPhone', 'uuid': '90CD6AB7-11C7-4E52-B2D3-61FA31D791EC', 'userInterfaceIdiom': 0, 'userInterfaceStyle': 'light', 'name': 'iPhone', 'isSimulator': False}) """ if not self._device_info: self._device_info = return self._device_info def _register_rotation_watcher(self): """ Register callbacks for Android and minicap when rotation of screen has changed callback is called in another thread, so be careful about thread-safety Returns: None """ self.rotation_watcher.reg_callback(lambda x: setattr(self, "_current_orientation", x))
[docs] def window_size(self): """ return window size namedtuple: Size(width , height) """ try: return self.driver.window_size() except wda.exceptions.WDAStaleElementReferenceError: print("iOS connection failed, please try pressing the home button to return to the desktop and try again.") print("iOS连接失败,请尝试按home键回到桌面后再重试") raise
@property def orientation(self): """ return device oritantation status in LANDSACPE POR """ if not self._current_orientation: self._current_orientation = self.get_orientation() return self._current_orientation @orientation.setter def orientation(self, value): """ Args: value(string): LANDSCAPE | PORTRAIT | UIA_DEVICE_ORIENTATION_LANDSCAPERIGHT | UIA_DEVICE_ORIENTATION_PORTRAIT_UPSIDEDOWN Returns: """ # 可以对屏幕进行旋转,但是可能会导致问题 self.driver.orientation = value
[docs] def get_orientation(self): # self.driver.orientation只能拿到LANDSCAPE,不能拿到左转/右转的确切方向 # 因此手动调用/rotation获取屏幕实际方向 rotation = self.driver._session_http.get('/rotation') # rotation dict eg. {'value': {'x': 0, 'y': 0, 'z': 90}, 'sessionId': 'xx', 'status': 0} if rotation: return ROTATION_MODE.get(rotation['value']['z'], wda.PORTRAIT)
@property def display_info(self): if not self._size['width'] or not self._size['height']: self._display_info() self._size['orientation'] = self.orientation return self._size def _display_info(self): # function window_size() return UIKit size, While screenshot() image size is Native Resolution window_size = self.window_size() # when use screenshot, the image size is pixels size. eg(1080 x 1920) snapshot = self.snapshot() if self.orientation in [wda.LANDSCAPE, wda.LANDSCAPE_RIGHT]: self._size['window_width'], self._size['window_height'] = window_size.height, window_size.width width, height = snapshot.shape[:2] else: self._size['window_width'], self._size['window_height'] = window_size.width, window_size.height height, width = snapshot.shape[:2] self._size["width"] = width self._size["height"] = height # use session.scale can get UIKit scale factor # so self._touch_factor = 1 / self.driver.scale, but the result is incorrect on some devices(6P/7P/8P) self._touch_factor = float(self._size['window_height']) / float(height) self.rotation_watcher.get_ready() # 当前版本: wda 4.1.4 获取到的/window/size,在ipad+桌面横屏下拿到的是 height * height,需要修正 if self.is_pad and self.home_interface(): self._size['window_width'] = int(width * self._touch_factor) @property def touch_factor(self): if not self._touch_factor: self._display_info() return self._touch_factor @touch_factor.setter def touch_factor(self, factor): """ touch_factor is used to convert click coordinates: mobile phone real coordinates = touch_factor * screen coordinates In general, no manual settings are required touch_factor用于换算点击坐标:手机真实坐标 = touch_factor * 屏幕坐标 默认计算方式是: self.display_info['window_height'] / self.display_info['height'] 但在部分特殊型号手机上可能不准确,例如iOS14.4的7P,默认值为 1/3,但部分7P点击位置不准确,可自行设置为:self.touch_factor = 1 / 3.3 (一般情况下,无需手动设置!) Examples: >>> device = connect_device("iOS:///") >>> device.touch((100, 100)) # wrong position >>> print(device.touch_factor) 0.333333 >>> device.touch_factor = 1 / 3.3 # default is 1/3 >>> device.touch((100, 100)) Args: factor: real_pos / pixel_pos, e.g: 1/self.driver.scale Returns: """ self._touch_factor = float(factor)
[docs] def get_render_resolution(self): """ Return render resolution after rotation Returns: offset_x, offset_y, offset_width and offset_height of the display """ w, h = self.get_current_resolution() return 0, 0, w, h
[docs] def get_current_resolution(self): w, h = self.display_info["width"], self.display_info["height"] if self.display_info["orientation"] in [wda.LANDSCAPE, wda.LANDSCAPE_RIGHT]: w, h = h, w return w, h
[docs] def home(self): # press("home") faster than home() return"home")
def _neo_wda_screenshot(self): """ this is almost same as wda implementation, but without png header check, as response data is now jpg format in mid quality """ value = self.driver.http.get('screenshot').value raw_value = base64.b64decode(value) return raw_value
[docs] def snapshot(self, filename=None, quality=10, max_size=None): """ take snapshot Args: filename: save screenshot to filename quality: The image quality, integer in range [1, 99] max_size: the maximum size of the picture, e.g 1200 Returns: """ data = self._neo_wda_screenshot() # output cv2 object try: screen = aircv.utils.string_2_img(data) except: # may be black/locked screen or other reason, print exc for debugging traceback.print_exc() return None # save as file if needed if filename: aircv.imwrite(filename, screen, quality, max_size=max_size) return screen
[docs] def get_frame_from_stream(self): if self.cap_method == CAP_METHOD.MJPEG: try: return self.mjpegcap.get_frame_from_stream() except ConnectionRefusedError: self.cap_method = CAP_METHOD.WDACAP return self._neo_wda_screenshot()
[docs] def touch(self, pos, duration=0.01): """ Args: pos: coordinates (x, y), can be float(percent) or int duration (optional): tap_hold duration Returns: None Examples: >>> touch((100, 100)) >>> touch((0.5, 0.5), duration=1) """ # trans pos of click, pos can be percentage or real coordinate x, y = self._transform_xy(pos), y, duration)
[docs] def double_click(self, pos): x, y = self._transform_xy(pos) self.driver.double_tap(x, y)
[docs] def swipe(self, fpos, tpos, duration=0, *args, **kwargs): """ Args: fpos: start point tpos: end point duration (float): start coordinate press duration (seconds), default is 0 Returns: None Examples: >>> swipe((1050, 1900), (150, 1900)) >>> swipe((0.2, 0.5), (0.8, 0.5)) """ fx, fy = self._transform_xy(fpos) tx, ty = self._transform_xy(tpos) self.driver.swipe(fx, fy, tx, ty, duration)
[docs] def keyevent(self, keyname, **kwargs): """ Perform keyevent on the device Args: keyname: home/volumeUp/volumeDown **kwargs: Returns: """ try: keyname = KEY_EVENTS[keyname.lower()] except KeyError: raise ValueError("Invalid name: %s, should be one of ('home', 'volumeUp', 'volumeDown')" % keyname) else:
[docs] def press(self, keys): """some keys in ["home", "volumeUp", "volumeDown"] can be pressed"""
[docs] def text(self, text, enter=True): """ Input text on the device Args: text: text to input enter: True if you need to enter a newline at the end Returns: None Examples: >>> text("test") >>> text("中文") """ if enter: text += '\n' self.driver.send_keys(text)
[docs] def install_app(self, uri, package): """ curl -X POST $JSON_HEADER \ -d "{\"desiredCapabilities\":{\"bundleId\":\"\", \"app\":\"[host_path]/\"}}" \ $DEVICE_URL/session """ raise NotImplementedError
[docs] def start_app(self, package, *args): """ Args: package: the app bundle id, e.g ```` Returns: None Examples: >>> start_app('') """ self.driver.app_launch(bundle_id=package)
[docs] def stop_app(self, package): """ Args: package: the app bundle id, e.g ```` Returns: """ self.driver.app_stop(bundle_id=package)
[docs] def app_state(self, package): """ Args: package: Returns: { "value": 4, "sessionId": "0363BDC5-4335-47ED-A54E-F7CCB65C6A65" } value 1(not running) 2(running in background) 3(running in foreground)? 4(running) Examples: >>> dev = device() >>> start_app('') >>> print(dev.app_state('')["value"]) # --> output is 4 >>> home() >>> print(dev.app_state('')["value"]) # --> output is 3 >>> stop_app('') >>> print(dev.app_state('')["value"]) # --> output is 1 """ # output {"value": 4, "sessionId": "xxxxxx"} # different value means 1: die, 2: background, 4: running return self.driver.app_state(bundle_id=package)
[docs] def app_current(self): """ get the app current Notes: Might not work on all devices Returns: current app state dict, eg: {"pid": 1281, "name": "", "bundleId": "com.netease.cloudmusic"} """ return self.driver.app_current()
[docs] def get_ip_address(self): """ get ip address from webDriverAgent Returns: raise if no IP address has been found, otherwise return the IP address """ return self.driver.status()['ios']['ip']
[docs] def device_status(self): """ show status return by webDriverAgent Return dicts of infos """ return self.driver.status()
def _touch_point_by_orientation(self, tuple_xy): """ Convert image coordinates to physical display coordinates, the arbitrary point (origin) is upper left corner of the device physical display Args: tuple_xy: image coordinates (x, y) Returns: """ x, y = tuple_xy # 1. 如果使用了2022.03.30之后发布的iOS-Tagent版本,则必须要进行竖屏坐标转换 # 2. 如果使用了appium/WebDriverAgent>=4.1.4版本,直接使用原坐标即可,无需转换 # 3. 如果使用了appium/WebDriverAgent<4.1.4版本,或低版本的iOS-Tagent,并且ipad下横屏点击异常,请改用airtest<=1.2.4 if self.using_ios_tagent: width = self.display_info["width"] height = self.display_info["height"] if self.orientation in [wda.LANDSCAPE, wda.LANDSCAPE_RIGHT]: width, height = height, width if x < 1 and y < 1: x = x * width y = y * height x, y = XYTransformer.up_2_ori( (x, y), (width, height), self.orientation ) return x, y def _transform_xy(self, pos): x, y = self._touch_point_by_orientation(pos) # scale touch postion if not (x < 1 and y < 1): x, y = int(x * self.touch_factor), int(y * self.touch_factor) return x, y def _check_orientation_change(self): pass
[docs] def is_locked(self): """ Return True or False whether the device is locked or not Notes: Might not work on some devices Returns: True or False """ return self.driver.locked()
[docs] def unlock(self): """ Unlock the device, unlock screen, double press home Notes: Might not work on all devices Returns: None """ return self.driver.unlock()
[docs] def lock(self): """ lock the device, lock screen Notes: Might not work on all devices Returns: None """ return self.driver.lock()
[docs] def alert_accept(self): """ Alert accept-Actually do click first alert button Notes: Might not work on all devices Returns: None """ return self.driver.alert.accept()
[docs] def alert_dismiss(self): """ Alert dissmiss-Actually do click second alert button Notes: Might not work on all devices Returns: None """ return self.driver.alert.dismiss()
[docs] def alert_wait(self, time_counter=2): """ if alert apper in time_counter second it will return True,else return False (default 20.0) time_counter default is 2 seconds Notes: Might not work on all devices Returns: None """ return self.driver.alert.wait(time_counter)
[docs] def alert_buttons(self): """ get alert buttons text. Notes: Might not work on all devices Returns: # example return: ("设置", "好") """ return self.driver.alert.buttons()
[docs] def alert_exists(self): """ get True for alert exists or False. Notes: Might not work on all devices Returns: True or False """ return self.driver.alert.exists
[docs] def alert_click(self, buttons): """ when Arg type is list, click the first match, raise ValueError if no match eg. ["设置", "信任", "安装"] Notes: Might not work on all devices Returns: None """ return
[docs] def home_interface(self): """ get True for the device status is on home interface. Reason: some devices can Horizontal screen on the home interface Notes: Might not work on all devices Returns: True or False """ try: app_current_dict = self.app_current() app_current_bundleId = app_current_dict.get('bundleId')"app_current_bundleId %s", app_current_bundleId) except: return False else: if app_current_bundleId in ['']: return True return False
[docs] def disconnect(self): """ discconect mjpeg and rotation_watcher Returns: None """ if self.cap_method == CAP_METHOD.MJPEG: self.mjpegcap.teardown_stream() if self.rotation_watcher: self.rotation_watcher.teardown()
if __name__ == "__main__": start = time.time() ios = IOS("") ios.snapshot() # ios.touch((242 * 2 + 10, 484 * 2 + 20)) # ios.start_app("") ios.home() ios.start_app('') ios.touch((88, 88)) ios.stop_app('') ios.swipe((100, 100), (800, 100)) print(ios.device_status()) print(ios.get_ip_address())