用uiautomator2自动化添加微信好友

# NOTE: the two wildcard imports are kept in their original order on purpose:
# names exported by ttkbootstrap shadow the plain tkinter ones imported first.
from tkinter import *
from ttkbootstrap import *
from tkinter import filedialog
import uiautomator2 as u2
import subprocess
import xlrd2
import time
import threading


class WinGUI(Tk):
    """Main window of the "add WeChat friends through adb" tool.

    The window holds three inputs (how many contacts to process before
    pausing, how many seconds to pause, and the friend-request message),
    a start/stop button and a log box.  Pressing the button starts a
    worker thread that reads names (column 0) and phone numbers
    (column 1) from sheet "Sheet1" of a workbook chosen by the user and
    drives the WeChat UI through uiautomator2 for each row.

    NOTE(review): widgets are read and updated from the worker thread,
    exactly as in the original design.  Tkinter is not guaranteed to be
    thread-safe; consider marshalling these calls through ``after`` if
    this ever misbehaves.
    """

    def __init__(self):
        super().__init__()
        self.__win()
        self.count = 0  # contacts processed so far (never reset between runs)
        self.thread_running = False  # True while the button shows "停止"
        self.thread_should_stop = False  # kept for compatibility; not read here
        self.stop_event = threading.Event()  # set to ask the worker to stop
        self._worker = None  # most recently started worker thread, if any
        self.tk_button_button2 = self.__tk_button_button2(self, command=self.start_thread)  # start/stop
        self.tk_text_textlog = self.__tk_text_textlog(self)  # log box
        self.tk_text_text1 = self.__tk_text_text1(self)  # "pause after N contacts"
        self.tk_text_text2 = self.__tk_text_text2(self)  # "pause for N seconds"
        self.tk_text_text3 = self.__tk_text_text3(self)  # friend-request message
        self.tk_label_lwod5rba = self.__tk_label_lwod5rba(self)
        self.tk_label_lwod635a = self.__tk_label_lwod635a(self)

    def __win(self):
        """Set the title and a fixed 594x544 size, centred on the screen."""
        self.title("adb微信加好友")
        width = 594
        height = 544
        screenwidth = self.winfo_screenwidth()
        screenheight = self.winfo_screenheight()
        geometry = '%dx%d+%d+%d' % (width, height, (screenwidth - width) / 2, (screenheight - height) / 2)
        self.geometry(geometry)
        self.resizable(width=False, height=False)

    def scrollbar_autohide(self, vbar, hbar, widget):
        """Show the scrollbars only while the pointer is over them or *widget*.

        Either scrollbar may be ``None``, in which case it is ignored.
        """
        def show():
            if vbar is not None:
                vbar.lift(widget)
            if hbar is not None:
                hbar.lift(widget)

        def hide():
            if vbar is not None:
                vbar.lower(widget)
            if hbar is not None:
                hbar.lower(widget)

        hide()
        widget.bind("<Enter>", lambda e: show())
        if vbar is not None:
            vbar.bind("<Enter>", lambda e: show())
            vbar.bind("<Leave>", lambda e: hide())
        if hbar is not None:
            hbar.bind("<Enter>", lambda e: show())
            hbar.bind("<Leave>", lambda e: hide())
        widget.bind("<Leave>", lambda e: hide())

    def v_scrollbar(self, vbar, widget, x, y, w, h, pw, ph):
        """Wire *vbar* to *widget*'s vertical view and place it on the right edge.

        ``x, y, w, h`` describe the widget's rectangle and ``pw, ph`` the
        parent's size; they are turned into relative placement values.
        """
        widget.configure(yscrollcommand=vbar.set)
        vbar.config(command=widget.yview)
        vbar.place(relx=(w + x) / pw, rely=y / ph, relheight=h / ph, anchor='ne')

    def h_scrollbar(self, hbar, widget, x, y, w, h, pw, ph):
        """Wire *hbar* to *widget*'s horizontal view and place it on the bottom edge."""
        widget.configure(xscrollcommand=hbar.set)
        hbar.config(command=widget.xview)
        hbar.place(relx=x / pw, rely=(y + h) / ph, relwidth=w / pw, anchor='sw')

    def create_bar(self, master, widget, is_vbar, is_hbar, x, y, w, h, pw, ph):
        """Create the requested scrollbars for *widget* and enable auto-hide."""
        vbar, hbar = None, None
        if is_vbar:
            vbar = Scrollbar(master)
            self.v_scrollbar(vbar, widget, x, y, w, h, pw, ph)
        if is_hbar:
            hbar = Scrollbar(master, orient="horizontal")
            self.h_scrollbar(hbar, widget, x, y, w, h, pw, ph)
        self.scrollbar_autohide(vbar, hbar, widget)

    def __tk_button_button2(self, parent, command=None):
        """Create the start/stop button."""
        btn = Button(parent, text="开始", takefocus=False, command=command)
        btn.place(x=343, y=95, width=52, height=33)
        return btn

    def __tk_text_textlog(self, parent):
        """Create the log box."""
        text = Text(parent)
        text.place(x=7, y=172, width=576, height=356)
        return text

    def __tk_text_text1(self, parent):
        """Create the "pause after N contacts" entry (default 10)."""
        text = Entry(parent)
        text.place(x=6, y=46, width=51, height=32)
        text.insert('end', '10')
        return text

    def __tk_text_text2(self, parent):
        """Create the "pause for N seconds" entry (default 50)."""
        text = Entry(parent)
        text.place(x=91, y=44, width=48, height=33)
        text.insert('end', '50')
        return text

    def __tk_text_text3(self, parent):
        """Create the friend-request message entry (default "留言")."""
        text = Entry(parent)
        text.place(x=245, y=44, width=278, height=31)
        text.insert('end', '留言')
        return text

    def __tk_label_lwod5rba(self, parent):
        """Create the caption above the "pause after N contacts" entry."""
        label = Label(parent, text="多少次暂停", anchor="center", )
        label.place(x=0, y=10, width=74, height=30)
        return label

    def __tk_label_lwod635a(self, parent):
        """Create the caption above the "pause for N seconds" entry."""
        label = Label(parent, text="暂停多少秒", anchor="center", )
        label.place(x=79, y=9, width=74, height=30)
        return label

    def _log(self, message):
        """Append *message* as one line to the log box and scroll to it."""
        self.tk_text_textlog.insert('end', f"{message}\n")
        self.updat_log()

    def _run_adb(self, *args):
        """Run ``adb <args>`` and return ``(succeeded, stderr_text)``.

        A missing or non-executable ``adb`` binary is reported as a failure
        instead of raising, so callers only have to check the flag.
        """
        try:
            result = subprocess.run(
                ['adb', *args],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
        except OSError as e:
            return False, str(e)
        return result.returncode == 0, result.stderr

    @staticmethod
    def _append_record(filename, line):
        """Append *line* plus a newline to the UTF-8 text file *filename*."""
        with open(filename, 'a', encoding='utf-8') as file:
            file.write(line + '\n')

    def ini_adb(self):
        """Start the adb server, connect to the device and restart WeChat.

        Returns:
            The connected uiautomator2 device, or ``None`` when adb could
            not be started or the device/app could not be reached.  Every
            failure is written to the log box.
        """
        ok, err = self._run_adb('start-server')
        if not ok:
            self._log(f"adb启动失败: {err}")
            return None
        print("ADB启动成功")
        self._log("ADB启动成功")
        try:
            d = u2.connect("")
            package_name = "com.tencent.mm"
            activity_name = ".ui.LauncherUI"
            self._log("如果有两个微信请手动选择")
            d.app_stop(package_name)
            d.app_start(package_name, activity_name, wait=True)
        except Exception as e:
            # Previously this path fell through to ``return d`` with ``d``
            # possibly unbound; report the failure to the caller instead.
            self._log(f"链接手机或模拟器失败错误:{e}")
            return None
        return d

    def wx_audo(self, d, name_key, keyword):
        """Search WeChat for *keyword* and send a friend request.

        Args:
            d: connected uiautomator2 device.
            name_key: contact name, used as the remark and in the logs.
            keyword: phone number (as text) typed into the search box.

        Returns:
            ``True`` when a request was sent; ``False`` when the user was
            not found or the profile button did not read "添加到通讯录".

        The outcome is appended to 添加成功.txt / 添加失败.txt / 已是好友.txt.
        NOTE(review): the resource ids below presumably belong to one
        specific WeChat build - verify them against the installed version.
        """
        greeting = self.tk_text_text3.get()  # request text comes from the message box
        txt = f"{name_key}*****{keyword}"
        time.sleep(1)
        d(resourceId="com.tencent.mm:id/meb").click()  # search box
        d.send_keys(keyword, clear=True)
        d.press('enter')
        d(resourceId="com.tencent.mm:id/mdq").click()  # search result
        time.sleep(1)

        # An alert with an OK button means the number matched no user.
        if d(resourceId="com.tencent.mm:id/mm_alert_ok_btn").exists(timeout=5):
            d(resourceId="com.tencent.mm:id/mm_alert_ok_btn").click()
            print(f"添加失败:{name_key}*****{keyword}")
            self._log(f"添加失败,手机号搜索不到用户:{txt}")
            self._append_record('添加失败.txt', txt)
            d.press("back")
            time.sleep(2)
            return False

        text = d(resourceId="com.tencent.mm:id/o3b").get_text()
        if text != "添加到通讯录":
            # Any other caption is treated as "already a friend".
            self._log(f"已是好友:{txt}")
            self._append_record('已是好友.txt', txt)
            d.press("back")
            time.sleep(1)
            d.press("back")
            time.sleep(2)
            return False

        d(resourceId="com.tencent.mm:id/o3b").click()  # "add to contacts"
        d(resourceId="com.tencent.mm:id/m9y").click()  # request-message field
        d.send_keys(greeting, clear=True)
        d.press('enter')
        d(resourceId="com.tencent.mm:id/m_1").click()  # remark field
        d.send_keys(name_key, clear=True)
        d.press('enter')
        d(resourceId="com.tencent.mm:id/g68").click()  # send
        time.sleep(2)
        print(f"添加成功:{txt}")
        self._log(f"添加成功:{txt}")
        d.press("back")
        d.press("back")
        self._append_record('添加成功.txt', txt)
        time.sleep(2)
        return True

    def kill_adb(self):
        """Stop the adb server; return ``True`` on success, ``False`` otherwise."""
        ok, _ = self._run_adb('kill-server')
        if ok:
            print("ADB关闭成功")
            self._log("ADB关闭成功")
        return ok

    def start_thread(self):
        """Button handler: start the worker, or ask a running one to stop."""
        if self.thread_running:
            self.stop_event.set()
            self.tk_button_button2.config(text="开始")
            self.thread_running = False
            return
        # A worker that was just asked to stop may still be finishing its
        # current contact; clearing the event now would let it carry on
        # alongside a second worker.
        if self._worker is not None and self._worker.is_alive():
            self._log("上一个任务正在停止,请稍后再试")
            return
        self.stop_event.clear()
        self.tk_button_button2.config(text="停止")
        self.thread_running = True
        self.start_main_thread()

    def start_main_thread(self):
        """Create and start the worker thread running ``main_in_thread``."""
        # Daemon thread: closing the window must not leave the process alive.
        self._worker = threading.Thread(target=self.main_in_thread, daemon=True)
        self._worker.start()

    def updat_log(self):
        """Flush pending redraws of the log box and scroll to its end."""
        self.tk_text_textlog.update_idletasks()
        self.tk_text_textlog.see('end')

    def main_in_thread(self):
        """Worker body: read the workbook and process every row once.

        Stops early when ``stop_event`` is set.  Any error is logged, the
        adb server is shut down and the stop event is set.  The button is
        always reset to "开始" on exit.
        """
        try:
            file_path = filedialog.askopenfilename()
            if not file_path:  # dialog cancelled
                self._log("未选择文件")
                return
            self._log(file_path)
            infor = xlrd2.open_workbook(file_path)
            sheet = infor.sheet_by_name("Sheet1")
            phonenumbers = sheet.col_values(1)
            name = sheet.col_values(0)
            stop = int(self.tk_text_text1.get())  # contacts per batch
            m = int(self.tk_text_text2.get())  # pause length in seconds
            self._log(f"你设置了{stop}次暂停{m}秒")
            d = self.ini_adb()
            if d is None:
                raise RuntimeError("设备未连接")

            for a, j in zip(name, phonenumbers):
                if self.stop_event.is_set():
                    break
                try:
                    # Numeric cells arrive as floats; drop the ".0".
                    i = str(int(j))
                except (TypeError, ValueError):
                    # Header, blank or otherwise non-numeric cell.
                    self._log(f"跳过无效号码:{a}*****{j}")
                    continue
                self.wx_audo(d, name_key=a, keyword=i)
                self.count += 1
                if stop > 0 and self.count % stop == 0:
                    print(f"暂停{m}秒...")
                    self.stop_event.wait(1)
                    self._log(f"开始暂停{m}秒")
                    # Event.wait returns as soon as stop is requested, so
                    # the pause no longer blocks the stop button.
                    self.stop_event.wait(m)
            else:
                # Reached only when the loop was not interrupted by stop.
                self._log("全部处理完成")
        except Exception as e:
            print(f"出错了:{e}")
            self.kill_adb()
            self._log(f"出错了:{e}")
            self.stop_event.set()
        finally:
            self.thread_running = False
            self.tk_button_button2.config(text="开始")


if __name__ == "__main__":
    win = WinGUI()
    win.mainloop()
如果觉得我的文章对你有用,请随意赞赏