airtest.core.ios.relay 源代码

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Use python to forward the iOS mobile phone port
# refer to: http://iphonedevwiki.net/index.php/SSH_Over_USB
# https://github.com/nabla-c0d3/multcprelay
# 通过Python对iOS端口进行转发,可以参考网上的tcprelay代码,或是tidevice的relay.py(使用了tornado)
import socketserver as SocketServer
import select
import sys
import threading
from optparse import OptionParser
from airtest.utils.logger import get_logger


LOGGING = get_logger(__name__)


[文档]class SocketRelay(object): def __init__(self, a, b, maxbuf=65535): self.a = a self.b = b self.atob = b"" self.btoa = b"" self.maxbuf = maxbuf
[文档] def handle(self): while True: rlist = [] wlist = [] xlist = [self.a, self.b] if self.atob: wlist.append(self.b) if self.btoa: wlist.append(self.a) if len(self.atob) < self.maxbuf: rlist.append(self.a) if len(self.btoa) < self.maxbuf: rlist.append(self.b) rlo, wlo, xlo = select.select(rlist, wlist, xlist) if xlo: return if self.a in wlo: n = self.a.send(self.btoa) self.btoa = self.btoa[n:] if self.b in wlo: n = self.b.send(self.atob) self.atob = self.atob[n:] if self.a in rlo: s = self.a.recv(self.maxbuf - len(self.atob)) if not s: return self.atob += s if self.b in rlo: s = self.b.recv(self.maxbuf - len(self.btoa)) if not s: return self.btoa += s
[文档]class TCPRelay(SocketServer.BaseRequestHandler):
[文档] def handle(self): dev = self.server.device dsock = dev.create_inner_connection(self.server.rport)._sock lsock = self.request LOGGING.info("Connection established, relaying data") try: fwd = SocketRelay(dsock, lsock, self.server.bufsize * 1024) fwd.handle() finally: dsock.close() lsock.close() LOGGING.info("Connection closed")
[文档]class TCPServer(SocketServer.TCPServer): allow_reuse_address = True
[文档]class ThreadedTCPServer(SocketServer.ThreadingMixIn, TCPServer): # 显式指定为True,否则脚本运行完毕时,因为连接没有断开,导致线程不会终止 daemon_threads = True
if __name__ == '__main__': """ 本文件可以在usb仅插入一台iOS手机时,执行命令行:python relay.py -t 5001:5001 """ from wda.usbmux import Usbmux HOST = "localhost" parser = OptionParser(usage="usage: %prog [OPTIONS] RemotePort[:LocalPort] [RemotePort[:LocalPort]]...") parser.add_option("-t", "--threaded", dest='threaded', action='store_true', default=False, help="use threading to handle multiple connections at once") parser.add_option("-b", "--bufsize", dest='bufsize', action='store', metavar='KILOBYTES', type='int', default=128, help="specify buffer size for socket forwarding") parser.add_option("-s", "--socket", dest='sockpath', action='store', metavar='PATH', type='str', default=None, help="specify the path of the usbmuxd socket") options, args = parser.parse_args() serverclass = TCPServer if options.threaded: serverclass = ThreadedTCPServer if len(args) == 0: parser.print_help() sys.exit(1) ports = [] for arg in args: try: if ':' in arg: rport, lport = arg.split(":") rport = int(rport) lport = int(lport) ports.append((rport, lport)) else: ports.append((int(arg), int(arg))) except: parser.print_help() sys.exit(1) servers=[] for rport, lport in ports: LOGGING.info("Forwarding local port %d to remote port %d"%(lport, rport)) server = serverclass((HOST, lport), TCPRelay) # 当前仅有一台iOS手机连接usb时 dev_uuid = Usbmux().get_single_device_udid() dev = Usbmux().device(dev_uuid) server.rport = rport server.device = dev server.bufsize = options.bufsize servers.append(server) alive = True while alive: # sleep(1) try: rl, wl, xl = select.select(servers, [], []) for server in rl: server.handle_request() except: alive = False