用uiautomator2自动化添加微信好友
from tkinter import *
from ttkbootstrap import *
from tkinter import filedialog
import uiautomator2 as u2
import subprocess
import xlrd2
import time
import threading
class WinGUI (Tk):
def __init__(self):
super().__init__()
self.__win()
self.count = 0
self.thread_running = False # 新增变量,用于追踪线程是否正在运行
self.thread_should_stop = False # 新增变量,用于指示线程是否应该停止
self.stop_event = threading.Event() # 创建线程事件
self.tk_button_button2 = self.__tk_button_button2(self,command=self.start_thread) #开始
self.tk_text_textlog = self.__tk_text_textlog(self) #文本框
self.tk_text_text1 = self.__tk_text_text1(self) #次数框
self.tk_text_text2 = self.__tk_text_text2(self) #秒数框
self.tk_text_text3 = self.__tk_text_text3(self) #申请信息
self.tk_label_lwod5rba = self.__tk_label_lwod5rba(self)
self.tk_label_lwod635a = self.__tk_label_lwod635a(self)
def __win(self):
self.title("adb微信加好友")
# 设置窗口大小、居中
width = 594
height = 544
screenwidth = self.winfo_screenwidth()
screenheight = self.winfo_screenheight()
geometry = '%dx%d+%d+%d' % (width, height, (screenwidth - width) / 2, (screenheight - height) / 2)
self.geometry(geometry)
self.resizable(width=False, height=False)
def scrollbar_autohide(self,vbar, hbar, widget):
"""自动隐藏滚动条"""
def show():
if vbar: vbar.lift(widget)
if hbar: hbar.lift(widget)
def hide():
if vbar: vbar.lower(widget)
if hbar: hbar.lower(widget)
hide()
widget.bind("<Enter>", lambda e: show())
if vbar: vbar.bind("<Enter>", lambda e: show())
if vbar: vbar.bind("<Leave>", lambda e: hide())
if hbar: hbar.bind("<Enter>", lambda e: show())
if hbar: hbar.bind("<Leave>", lambda e: hide())
widget.bind("<Leave>", lambda e: hide())
def v_scrollbar(self,vbar, widget, x, y, w, h, pw, ph):
widget.configure(yscrollcommand=vbar.set)
vbar.config(command=widget.yview)
vbar.place(relx=(w + x) / pw, rely=y / ph, relheight=h / ph, anchor='ne')
def h_scrollbar(self,hbar, widget, x, y, w, h, pw, ph):
widget.configure(xscrollcommand=hbar.set)
hbar.config(command=widget.xview)
hbar.place(relx=x / pw, rely=(y + h) / ph, relwidth=w / pw, anchor='sw')
def create_bar(self,master, widget,is_vbar,is_hbar, x, y, w, h, pw, ph):
vbar, hbar = None, None
if is_vbar:
vbar = Scrollbar(master)
self.v_scrollbar(vbar, widget, x, y, w, h, pw, ph)
if is_hbar:
hbar = Scrollbar(master, orient="horizontal")
self.h_scrollbar(hbar, widget, x, y, w, h, pw, ph)
self.scrollbar_autohide(vbar, hbar, widget)
def __tk_button_button2(self,parent,command=None):
btn = Button(parent, text="开始", takefocus=False,command=command)
btn.place(x=343, y=95, width=52, height=33)
return btn
def __tk_text_textlog(self,parent):
text = Text(parent)
text.place(x=7, y=172, width=576, height=356)
return text
def __tk_text_text1(self,parent):
text = Entry(parent)
text.place(x=6, y=46, width=51, height=32)
text.insert('end', '10')
return text
def __tk_text_text2(self,parent):
text = Entry(parent)
text.place(x=91, y=44, width=48, height=33)
text.insert('end', '50')
return text
def __tk_text_text3(self,parent):
text = Entry(parent)
text.place(x=245, y=44, width=278, height=31)
text.insert('end', '留言')
return text
def __tk_label_lwod5rba(self,parent):
label = Label(parent,text="多少次暂停",anchor="center", )
label.place(x=0, y=10, width=74, height=30)
return label
def __tk_label_lwod635a(self,parent):
label = Label(parent,text="暂停多少秒",anchor="center", )
label.place(x=79, y=9, width=74, height=30)
return label
def ini_adb(self):
cmd_command = 'adb start-server'
result = subprocess.run(cmd_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode == 0:
print("ADB启动成功")
self.tk_text_textlog.insert('end', f"ADB启动成功\n")
self.updat_log()
try:
d = u2.connect("")
package_name = "com.tencent.mm"
activity_name = ".ui.LauncherUI"
self.tk_text_textlog.insert('end', f"如果有两个微信请手动选择\n")
self.updat_log()
d.app_stop(package_name)
d.app_start(package_name, activity_name, wait=True )
except Exception as e:
self.tk_text_textlog.insert('end', f"链接手机或模拟器失败错误:{e}\n")
self.updat_log()
return d
else:
d=f"abd启动失败: {result.stderr}"
self.tk_text_textlog.insert('end', f"{d}\n")
return d
def wx_audo(self,d,name_key,keyword):
m9y = self.tk_text_text2.get()
txt=f"{name_key}*****{keyword}"
time.sleep(1)
d(resourceId="com.tencent.mm:id/meb").click() # 点击搜索框
d.send_keys(keyword, clear=True) # 输入搜索关键字
d.press('enter')
d(resourceId="com.tencent.mm:id/mdq").click() # 点击搜索结果
time.sleep(1)
if d(resourceId="com.tencent.mm:id/mm_alert_ok_btn").exists(timeout=5):
d(resourceId="com.tencent.mm:id/mm_alert_ok_btn").click()
print(f"添加失败:{name_key}*****{keyword}")
self.tk_text_textlog.insert('end', f"添加失败,手机号搜索不到用户:{txt}\n")
self.updat_log()
with open('添加失败.txt', 'a', encoding='utf-8') as file:
file.write(txt + '\n') # 每个URL后面添加一个换行符面添加一个换行符
d.press("back")
time.sleep(2)
else:
text=d(resourceId="com.tencent.mm:id/o3b").get_text() #添加到通讯录
if text == "添加到通讯录":
d(resourceId="com.tencent.mm:id/o3b").click() #添加到通讯录
d(resourceId="com.tencent.mm:id/m9y").click() #发送添加朋友申请
d.send_keys(m9y, clear=True) # 输入搜索关键字
d.press('enter')
d(resourceId="com.tencent.mm:id/m_1").click() #设置备注
d.send_keys(name_key, clear=True)
d.press('enter')
d(resourceId="com.tencent.mm:id/g68").click() #发送
time.sleep(2)
print(f"添加成功:{txt}")
self.tk_text_textlog.insert('end', f"添加成功:{txt}\n")
self.updat_log()
d.press("back")
d.press("back")
with open('添加成功.txt', 'a', encoding='utf-8') as file:
file.write(txt + '\n') # 每个URL后面添加一个换行符面添加一个换行符
time.sleep(2)
return True
else:
self.tk_text_textlog.insert('end', f"已是好友:{txt}\n")
self.updat_log()
with open('已是好友.txt', 'a', encoding='utf-8') as file:
file.write(txt + '\n') # 每个URL后面添加一个换行符面添加一个换行符
d.press("back")
time.sleep(1)
d.press("back")
time.sleep(2)
return False
def kill_adb(self):
cmd_command = 'adb kill-server' # 这是一个Windows CMD命令,用于列出当前目录的文件和文件夹
result = subprocess.run(cmd_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode == 0:
print("ADB关闭成功")
self.tk_text_textlog.insert('end', f"ADB关闭成功\n")
self.updat_log()
return True
def start_thread(self):
if self.thread_running:
# 触发事件以停止线程
self.stop_event.set()
self.tk_button_button2.config(text="开始")
self.thread_running = False
else:
# 清除事件以便下次可以重新设置
self.stop_event.clear()
self.tk_button_button2.config(text="停止")
self.thread_running = True
self.start_main_thread()
def start_main_thread(self):
# 创建并启动线程
thread = threading.Thread(target=self.main_in_thread)
thread.start()
def updat_log(self):
self.tk_text_textlog.update_idletasks() # 强制处理待处理的更新
self.tk_text_textlog.see('end')
def main_in_thread(self):
while not self.stop_event.is_set(): # 使用事件检查是否应停止
try:
file_path = filedialog.askopenfilename()
self.tk_text_textlog.insert('end', f"{file_path}\n")
self.updat_log()
infor = xlrd2.open_workbook(file_path)
sheet = infor.sheet_by_name("Sheet1")
phonenumbers = sheet.col_values(1)
name = sheet.col_values(0)
stop = int(self.tk_text_text1.get()) # 次数
m = int(self.tk_text_text3.get()) # 秒
self.tk_text_textlog.insert('end', f"你设置了{stop}次暂停{m}秒\n")
self.updat_log()
d = self.ini_adb()
while not self.stop_event.is_set(): # 等待停止事件
for a, j in zip(name, phonenumbers):
if self.stop_event.is_set():
break
k = int(j)
i = str(k)
self.wx_audo(d, name_key=a, keyword=i)
self.count += 1
if self.count % stop == 0:
print("暂停{m}秒...")
time.sleep(1)
self.tk_text_textlog.insert('end', f"开始暂停{m}秒\n")
self.updat_log()
time.sleep(m) # 假设暂停m分钟
except Exception as e:
print(f"出错了:{e}")
self.kill_adb()
self.tk_text_textlog.insert('end', f"出错了:{e}\n")
self.updat_log()
self.stop_event.set() # 在异常时触发停止事件
break
self.thread_running = False
self.tk_button_button2.config(text="开始")
if __name__ == "__main__":
win = WinGUI()
win.mainloop()