airtest.report.report 源代码

#!/usr/bin/env python
# -*- coding:utf8 -*-
import json
import os
import io
import re
import six
import sys
from PIL import Image
import shutil
import jinja2
import traceback
from copy import deepcopy
from datetime import datetime
from markupsafe import Markup, escape
from six.moves.urllib.parse import parse_qsl, urlparse
try:
    from jinja2 import evalcontextfilter as pass_eval_context  # jinja2<3.1
except:
    from jinja2 import pass_eval_context  # jinja2>=3.1
from airtest.aircv import imread, get_resolution
from airtest.aircv.error import FileNotExistError
from airtest.core.settings import Settings as ST
from airtest.aircv.utils import compress_image
from airtest.utils.compat import decode_path, script_dir_name
from airtest.cli.info import get_script_info
from airtest.utils.logger import get_logger
from airtest.utils.snippet import parse_device_uri
from six import PY3

LOGGING = get_logger(__name__)
DEFAULT_LOG_DIR = "log"
DEFAULT_LOG_FILE = "log.txt"
HTML_TPL = "log_template.html"
HTML_FILE = "log.html"
STATIC_DIR = os.path.dirname(__file__)


_paragraph_re = re.compile(r'(?:\r\n|\r|\n){2,}')


[文档]@pass_eval_context def nl2br(eval_ctx, value): result = u'\n\n'.join(u'<p>%s</p>' % p.replace('\n', '<br>\n') for p in _paragraph_re.split(escape(value))) if eval_ctx.autoescape: result = Markup(result) return result
[文档]def timefmt(timestamp): """ Formatting of timestamp in Jinja2 templates :param timestamp: timestamp of steps :return: "%Y-%m-%d %H:%M:%S" """ return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
[文档]class LogToHtml(object): """Convert log to html display """ scale = 0.5 def __init__(self, script_root, log_root="", static_root="", export_dir=None, script_name="", logfile=None, lang="en", plugins=None): self.log = [] self.devices = {} self.script_root = script_root self.script_name = script_name if not self.script_name or os.path.isfile(self.script_root): self.script_root, self.script_name = script_dir_name(self.script_root) self.log_root = log_root or ST.LOG_DIR or os.path.join(".", DEFAULT_LOG_DIR) self.static_root = static_root or STATIC_DIR self.test_result = True self.run_start = None self.run_end = None self.export_dir = export_dir self.logfile = logfile or getattr(ST, "LOG_FILE", DEFAULT_LOG_FILE) self.lang = lang self.init_plugin_modules(plugins)
[文档] @staticmethod def init_plugin_modules(plugins): if not plugins: return for plugin_name in plugins: LOGGING.debug("try loading plugin: %s" % plugin_name) try: __import__(plugin_name) except: LOGGING.error(traceback.format_exc())
def _load(self): logfile = os.path.join(self.log_root, self.logfile) if not PY3: logfile = logfile.encode(sys.getfilesystemencoding()) with io.open(logfile, encoding="utf-8") as f: for line in f.readlines(): self.log.append(json.loads(line)) def _analyse(self): """ 解析log成可渲染的dict """ steps = [] children_steps = [] for log in self.log: depth = log['depth'] if not self.run_start: self.run_start = log.get('data', {}).get('start_time', '') or log["time"] self.run_end = log["time"] if depth == 0: # single log line, not in stack steps.append(log) elif depth == 1: step = deepcopy(log) step["__children__"] = children_steps steps.append(step) children_steps = [] else: children_steps.insert(0, log) translated_steps = [self._translate_step(s) for s in steps] if len(translated_steps) > 0 and translated_steps[-1].get("traceback"): # Final Error self.test_result = False return translated_steps def _translate_step(self, step): """translate single step""" name = step["data"]["name"] title = self._translate_title(name, step) code = self._translate_code(step) desc = self._translate_desc(step, code) screen = self._translate_screen(step, code) info = self._translate_info(step) assertion = self._translate_assertion(step) self._translate_device(step) translated = { "title": title, "time": step["time"], "code": code, "screen": screen, "desc": desc, "traceback": info[0], "log": info[1], "assert": assertion, } return translated def _translate_assertion(self, step): if "assert_" in step["data"].get("name", "") and "msg" in step["data"].get("call_args", {}): return step["data"]["call_args"]["msg"] def _translate_screen(self, step, code): if step['tag'] not in ["function", "info"] or not step.get("__children__"): return None screen = { "src": None, "rect": [], "pos": [], "vector": [], "confidence": None, } for item in step["__children__"]: if item["data"]["name"] == "try_log_screen": snapshot = item["data"].get("ret", None) if isinstance(snapshot, six.text_type): src = snapshot elif isinstance(snapshot, dict): src = snapshot['screen'] screen['resolution'] = snapshot['resolution'] else: continue if self.export_dir: # all relative path screen['_filepath'] = os.path.join(DEFAULT_LOG_DIR, src) else: screen['_filepath'] = os.path.abspath(os.path.join(self.log_root, src)) screen['src'] = screen['_filepath'] self.get_thumbnail(os.path.join(self.log_root, src)) screen['thumbnail'] = self.get_small_name(screen['src']) break display_pos = None for item in step["__children__"]: if item["data"]["name"] == "_cv_match" and isinstance(item["data"].get("ret"), dict): cv_result = item["data"]["ret"] pos = cv_result['result'] if self.is_pos(pos): display_pos = [round(pos[0]), round(pos[1])] rect = self.div_rect(cv_result['rectangle']) screen['rect'].append(rect) screen['confidence'] = cv_result['confidence'] break if step["data"]["name"] in ["touch", "assert_exists", "wait", "exists"]: # 将图像匹配得到的pos修正为最终pos if self.is_pos(step["data"].get("ret")): display_pos = step["data"]["ret"] elif self.is_pos(step["data"]["call_args"].get("v")): display_pos = step["data"]["call_args"]["v"] elif step["data"]["name"] == "swipe": if "ret" in step["data"]: screen["pos"].append(step["data"]["ret"][0]) target_pos = step["data"]["ret"][1] origin_pos = step["data"]["ret"][0] screen["vector"].append([target_pos[0] - origin_pos[0], target_pos[1] - origin_pos[1]]) if display_pos: screen["pos"].append(display_pos) return screen def _translate_device(self, step): if step["tag"] == "function" and step["data"]["name"] == "connect_device": uri = step["data"]["call_args"]["uri"] platform, uuid, params = parse_device_uri(uri) if "name" in params: uuid = params["name"] if uuid not in self.devices: self.devices[uuid] = uri return uuid return None
[文档] @classmethod def get_thumbnail(cls, path): """compress screenshot""" new_path = cls.get_small_name(path) if not os.path.isfile(new_path): try: img = Image.open(path) compress_image(img, new_path, ST.SNAPSHOT_QUALITY, max_size=300) except Exception: LOGGING.error(traceback.format_exc()) return new_path else: return None
[文档] @classmethod def get_small_name(cls, filename): name, ext = os.path.splitext(filename) return "%s_small%s" % (name, ext)
def _translate_info(self, step): trace_msg, log_msg = "", "" if "traceback" in step["data"]: # 若包含有traceback内容,将会认定步骤失败 trace_msg = step["data"]["traceback"] if step["tag"] == "info": if "log" in step["data"]: # 普通文本log内容,仅显示 log_msg = step["data"]["log"] return trace_msg, log_msg def _translate_code(self, step): if step["tag"] != "function": return None step_data = step["data"] args = [] code = { "name": step_data["name"], "args": args, } for key, value in step_data["call_args"].items(): args.append({ "key": key, "value": value, }) for k, arg in enumerate(args): value = arg["value"] if isinstance(value, dict) and value.get("__class__") == "Template": if self.export_dir: # all relative path image_path = value['filename'] if not os.path.isfile(os.path.join(self.script_root, image_path)) and value['_filepath']: # copy image used by using statement shutil.copyfile(value['_filepath'], os.path.join(self.script_root, value['filename'])) else: image_path = os.path.abspath(value['_filepath'] or value['filename']) arg["image"] = image_path try: if not value['_filepath'] and not os.path.exists(value['filename']): crop_img = imread(os.path.join(self.script_root, value['filename'])) else: crop_img = imread(value['_filepath'] or value['filename']) except FileNotExistError: # 在某些情况下会报图片不存在的错误(偶现),但不应该影响主流程 if os.path.exists(image_path): arg["resolution"] = get_resolution(imread(image_path)) else: arg["resolution"] = (0, 0) else: arg["resolution"] = get_resolution(crop_img) return code
[文档] @staticmethod def div_rect(r): """count rect for js use""" xs = [p[0] for p in r] ys = [p[1] for p in r] left = min(xs) top = min(ys) w = max(xs) - left h = max(ys) - top return {'left': left, 'top': top, 'width': w, 'height': h}
def _translate_desc(self, step, code): """ 函数描述 """ if step['tag'] != "function": return None name = step['data']['name'] res = step['data'].get('ret') args = {i["key"]: i["value"] for i in code["args"]} desc = { "snapshot": lambda: u"Screenshot description: %s" % args.get("msg"), "touch": lambda: u"Touch %s" % ("target image" if isinstance(args['v'], dict) else "coordinates %s" % args['v']), "swipe": u"Swipe on screen", "wait": u"Wait for target image to appear", "exists": lambda: u"Image %s exists" % ("" if res else "not"), "text": lambda: u"Input text:%s" % args.get('text'), "keyevent": lambda: u"Click [%s] button" % args.get('keyname'), "sleep": lambda: u"Wait for %s seconds" % args.get('secs'), "assert_exists": u"Assert target image exists", "assert_not_exists": u"Assert target image does not exists", "connect_device": lambda : u"Connect device: %s" % args.get("uri"), } # todo: 最好用js里的多语言实现 desc_zh = { "snapshot": lambda: u"截图描述: %s" % args.get("msg"), "touch": lambda: u"点击 %s" % (u"目标图片" if isinstance(args['v'], dict) else u"屏幕坐标 %s" % args['v']), "swipe": u"滑动操作", "wait": u"等待目标图片出现", "exists": lambda: u"图片%s存在" % ("" if res else u"不"), "text": lambda: u"输入文字:%s" % args.get('text'), "keyevent": lambda: u"点击[%s]按键" % args.get('keyname'), "sleep": lambda: u"等待%s秒" % args.get('secs'), "assert_exists": u"断言目标图片存在", "assert_not_exists": u"断言目标图片不存在", "connect_device": lambda : u"连接设备: %s" % args.get("uri"), } if self.lang == "zh": desc = desc_zh ret = desc.get(name) if callable(ret): ret = ret() return ret def _translate_title(self, name, step): title = { "touch": u"Touch", "swipe": u"Swipe", "wait": u"Wait", "exists": u"Exists", "text": u"Text", "keyevent": u"Keyevent", "sleep": u"Sleep", "assert_exists": u"Assert exists", "assert_not_exists": u"Assert not exists", "snapshot": u"Snapshot", "assert_equal": u"Assert equal", "assert_not_equal": u"Assert not equal", "connect_device": u"Connect device", } return title.get(name, name) @staticmethod def _render(template_name, output_file=None, **template_vars): """ 用jinja2渲染html""" env = jinja2.Environment( loader=jinja2.FileSystemLoader(STATIC_DIR), extensions=(), autoescape=True ) env.filters['nl2br'] = nl2br env.filters['datetime'] = timefmt template = env.get_template(template_name) html = template.render(**template_vars) if output_file: with io.open(output_file, 'w', encoding="utf-8") as f: f.write(html) LOGGING.info(output_file) return html
[文档] def is_pos(self, v): return isinstance(v, (list, tuple))
[文档] def copy_tree(self, src, dst, ignore=None): try: shutil.copytree(src, dst, ignore=ignore) except: LOGGING.error(traceback.format_exc())
def _make_export_dir(self): """mkdir & copy /staticfiles/screenshots""" # let dirname = <script name>.log dirname = self.script_name.replace(os.path.splitext(self.script_name)[1], ".log") # mkdir dirpath = os.path.join(self.export_dir, dirname) if os.path.isdir(dirpath): shutil.rmtree(dirpath, ignore_errors=True) # copy script def ignore_export_dir(dirname, filenames): # 忽略当前导出的目录,防止递归导出 if os.path.commonprefix([dirpath, dirname]) == dirpath: return filenames return [] self.copy_tree(self.script_root, dirpath, ignore=ignore_export_dir) # copy log logpath = os.path.join(dirpath, DEFAULT_LOG_DIR) if os.path.normpath(logpath) != os.path.normpath(self.log_root): if os.path.isdir(logpath): shutil.rmtree(logpath, ignore_errors=True) self.copy_tree(self.log_root, logpath, ignore=shutil.ignore_patterns(dirname)) # if self.static_root is not a http server address, copy static files from local directory if not self.static_root.startswith("http"): for subdir in ["css", "fonts", "image", "js"]: self.copy_tree(os.path.join(self.static_root, subdir), os.path.join(dirpath, "static", subdir)) return dirpath, logpath
[文档] def get_relative_log(self, output_file): """ Try to get the relative path of log.txt :param output_file: output file: log.html :return: ./log.txt or "" """ try: html_dir = os.path.dirname(output_file) if self.export_dir: # When exporting reports, the log directory will be named log/ (DEFAULT_LOG_DIR), # so the relative path of log.txt is log/log.txt return os.path.join(DEFAULT_LOG_DIR, os.path.basename(self.logfile)) return os.path.relpath(os.path.join(self.log_root, self.logfile), html_dir) except: LOGGING.error(traceback.format_exc()) return ""
[文档] def get_console(self, output_file): html_dir = os.path.dirname(output_file) file = os.path.join(html_dir, 'console.txt') content = "" if os.path.isfile(file): try: content = self.readFile(file) except Exception: try: content = self.readFile(file, "gbk") except Exception: content = traceback.format_exc() + content content = content + "Can not read console.txt. Please check file in:\n" + file return content
[文档] def readFile(self, filename, code='utf-8'): content = "" with io.open(filename, encoding=code) as f: for line in f.readlines(): content = content + line return content
[文档] def report_data(self, output_file=None, record_list=None): """ Generate data for the report page :param output_file: The file name or full path of the output file, default HTML_FILE :param record_list: List of screen recording files :return: """ self._load() steps = self._analyse() script_path = os.path.join(self.script_root, self.script_name) info = json.loads(get_script_info(script_path)) info['devices'] = self.devices if record_list: records = [os.path.join(DEFAULT_LOG_DIR, f) if self.export_dir else os.path.abspath(os.path.join(self.log_root, f)) for f in record_list] else: records = [] if not self.static_root.endswith(os.path.sep): self.static_root = self.static_root.replace("\\", "/") self.static_root += "/" if not output_file: output_file = HTML_FILE data = {} data['steps'] = steps data['name'] = self.script_root data['scale'] = self.scale data['test_result'] = self.test_result data['run_end'] = self.run_end data['run_start'] = self.run_start data['static_root'] = self.static_root data['lang'] = self.lang data['records'] = records data['info'] = info data['log'] = self.get_relative_log(output_file) data['console'] = self.get_console(output_file) # 如果带有<>符号,容易被highlight.js认为是特殊语法,有可能导致页面显示异常,尝试替换成不常用的{} info = json.dumps(data).replace("<", "{").replace(">", "}") data['data'] = info return data
[文档] def report(self, template_name=HTML_TPL, output_file=HTML_FILE, record_list=None): """ Generate the report page, you can add custom data and overload it if needed :param template_name: default is HTML_TPL :param output_file: The file name or full path of the output file, default HTML_FILE :param record_list: List of screen recording files :return: """ if not self.script_name: path, self.script_name = script_dir_name(self.script_root) if self.export_dir: self.script_root, self.log_root = self._make_export_dir() # output_file可传入文件名,或绝对路径 output_file = output_file if output_file and os.path.isabs(output_file) \ else os.path.join(self.script_root, output_file or HTML_FILE) if not self.static_root.startswith("http"): self.static_root = "static/" if not record_list: record_list = [f for f in os.listdir(self.log_root) if f.endswith(".mp4")] data = self.report_data(output_file=output_file, record_list=record_list) return self._render(template_name, output_file, **data)
[文档]def simple_report(filepath, logpath=True, logfile=None, output=HTML_FILE): path, name = script_dir_name(filepath) if logpath is True: logpath = os.path.join(path, getattr(ST, "LOG_DIR", DEFAULT_LOG_DIR)) rpt = LogToHtml(path, logpath, logfile=logfile or getattr(ST, "LOG_FILE", DEFAULT_LOG_FILE), script_name=name) rpt.report(HTML_TPL, output_file=output)
[文档]def get_parger(ap): ap.add_argument("script", help="script filepath") ap.add_argument("--outfile", help="output html filepath, default to be log.html", default=HTML_FILE) ap.add_argument("--static_root", help="static files root dir") ap.add_argument("--log_root", help="log & screen data root dir, logfile should be log_root/log.txt") ap.add_argument("--record", help="custom screen record file path", nargs="+") ap.add_argument("--export", help="export a portable report dir containing all resources") ap.add_argument("--lang", help="report language", default="en") ap.add_argument("--plugins", help="load reporter plugins", nargs="+") ap.add_argument("--report", help="placeholder for report cmd", default=True, nargs="?") return ap
[文档]def main(args): # script filepath path, name = script_dir_name(args.script) record_list = args.record or [] log_root = decode_path(args.log_root) or decode_path(os.path.join(path, DEFAULT_LOG_DIR)) static_root = args.static_root or STATIC_DIR static_root = decode_path(static_root) export = decode_path(args.export) if args.export else None lang = args.lang if args.lang in ['zh', 'en'] else 'en' plugins = args.plugins # gen html report rpt = LogToHtml(path, log_root, static_root, export_dir=export, script_name=name, lang=lang, plugins=plugins) rpt.report(HTML_TPL, output_file=args.outfile, record_list=record_list)
if __name__ == "__main__": import argparse ap = argparse.ArgumentParser() args = get_parger(ap).parse_args() main(args)