# -*- coding: utf-8 -*-
import os
import time
import threading
import traceback
from airtest.utils.snippet import reg_cleanup, is_exiting, on_method_ready, kill_proc
from airtest.utils.nbsp import NonBlockingStreamReader
from airtest.utils.logger import get_logger
from airtest.core.android.constant import ORI_METHOD, ROTATIONWATCHER_JAR
LOGGING = get_logger(__name__)
[docs]class RotationWatcher(object):
"""
RotationWatcher class
"""
def __init__(self, adb, ori_method=ORI_METHOD.MINICAP):
self.adb = adb
self.ow_proc = None
self.nbsp = None
self.ow_callback = []
self.ori_method = ori_method
self._t = None
self._t_kill_event = threading.Event()
self.current_orientation = None
self.path_in_android = "/data/local/tmp/" + os.path.basename(ROTATIONWATCHER_JAR)
reg_cleanup(self.teardown)
[docs] @on_method_ready('start')
def get_ready(self):
pass
[docs] def install(self):
"""
Install the RotationWatcher package
Returns:
None
"""
try:
exists_file = self.adb.file_size(self.path_in_android)
except:
pass
else:
local_minitouch_size = int(os.path.getsize(ROTATIONWATCHER_JAR))
if exists_file and exists_file == local_minitouch_size:
LOGGING.debug("install_rotationwatcher skipped")
return
self.uninstall()
self.adb.push(ROTATIONWATCHER_JAR, self.path_in_android)
self.adb.shell("chmod 755 %s" % self.path_in_android)
LOGGING.info("install rotationwacher finished")
[docs] def uninstall(self):
"""
Uninstall the RotationWatcher package
Returns:
None
"""
self.adb.raw_shell("rm %s" % self.path_in_android)
[docs] def setup_server(self):
"""
Setup rotation wacher server
Returns:
server process
"""
self.install()
if self.ow_proc:
self.ow_proc.kill()
self.ow_proc = None
p = self.adb.start_shell(
"app_process -Djava.class.path={0} /data/local/tmp com.example.rotationwatcher.Main".format(
self.path_in_android))
self.nbsp = NonBlockingStreamReader(p.stdout, name="rotation_server", auto_kill=True)
if p.poll() is not None:
# server setup error, may be already setup by others
# subprocess exit immediately
raise RuntimeError("rotation watcher server quit immediately")
self.ow_proc = p
return p
[docs] def teardown(self):
self._t_kill_event.set()
if self.ow_proc:
kill_proc(self.ow_proc)
if self.nbsp:
self.nbsp.kill()
self.ow_callback = []
setattr(self, "_start_ready", None)
[docs] def start(self):
"""
Start the RotationWatcher daemon thread
Returns:
initial orientation
"""
if self.ori_method == ORI_METHOD.MINICAP:
try:
self.setup_server()
except:
# install or setup failed
LOGGING.error(traceback.format_exc())
LOGGING.error("RotationWatcher setup failed, use ADBORI instead.")
self.ori_method = ORI_METHOD.ADB
def _refresh_by_ow():
# 在产生旋转时,nbsp读取到的内容为b"90\r\n",平时读到的是空数据None,进程结束时读到的是b""
line = self.nbsp.readline()
if line is not None:
if line == b"":
self.teardown()
if LOGGING is not None: # may be None atexit
LOGGING.debug("orientationWatcher has ended")
else:
print("orientationWatcher has ended")
return None
ori = int(int(line) / 90)
return ori
# 每隔1秒读取一次
time.sleep(1)
def _refresh_by_adb():
ori = self.adb.getDisplayOrientation()
return ori
def _run(kill_event):
while not kill_event.is_set():
if self.ori_method == ORI_METHOD.ADB:
ori = _refresh_by_adb()
if self.current_orientation == ori:
time.sleep(3)
continue
else:
ori = _refresh_by_ow()
if ori is None:
# 以前ori=None是进程结束,现在屏幕方向不变时会返回None
continue
LOGGING.info('update orientation %s->%s' % (self.current_orientation, ori))
self.current_orientation = ori
if is_exiting():
self.teardown()
for cb in self.ow_callback:
try:
cb(ori)
except:
LOGGING.error("cb: %s error" % cb)
traceback.print_exc()
self.current_orientation = _refresh_by_ow() if self.ori_method != ORI_METHOD.ADB else _refresh_by_adb()
self._t = threading.Thread(target=_run, args=(self._t_kill_event, ), name="rotationwatcher")
self._t.daemon = True
self._t.start()
return self.current_orientation
[docs] def reg_callback(self, ow_callback):
"""
Args:
ow_callback:
Returns:
"""
"""方向变化的时候的回调函数,参数一定是ori,如果断掉了,ori传None"""
if ow_callback not in self.ow_callback:
self.ow_callback.append(ow_callback)