大神论坛

找回密码
快速注册
查看: 542 | 回复: 1

[原创] 利用uiautomator2加模拟器自动刷抖音下载无水印视频

主题

帖子

7

积分

初入江湖

UID
226
积分
7
精华
威望
14 点
违规
大神币
68 枚
注册时间
2021-09-21 08:31
发表于 2021-10-08 21:16
本帖最后由 爱菜菜没蛀牙 于 2021-10-08 21:16 编辑

准备

1.adb
2.任意模拟器,我这里用的是夜神

成果展示

试了半天也没找到压缩 GIF 的办法(文件有 40 多 MB),所以放到蓝奏云了,请自行下载查看。

颜值检测效果

代码

import base64
import os
import time
import re
import uiautomator2 as u2
import requests
from jsonpath import jsonpath
from concurrent.futures import ThreadPoolExecutor

def get_mid_str(s, start_str, stop_str):
    """Return the part of ``s`` between ``start_str`` and ``stop_str``.

    The first occurrence of ``start_str`` is used, and ``stop_str`` is only
    searched for after the end of that occurrence.

    Args:
        s: Text to search.
        start_str: Delimiter on the left of the wanted text.
        stop_str: Delimiter on the right of the wanted text.

    Returns:
        The text between the two delimiters (possibly empty), or ``None``
        when either delimiter is not found.
    """
    # Find where the left-hand delimiter ends.
    start_pos = s.find(start_str)
    if start_pos == -1:
        return None
    start_pos += len(start_str)

    # Find where the right-hand delimiter starts, looking only past the left one.
    stop_pos = s.find(stop_str, start_pos)
    if stop_pos == -1:
        return None

    # Slice out the text in between.
    return s[start_pos:stop_pos]

class DouyinDown:
    """Collect Douyin share links from an emulator and download the videos.

    ``get_downurl`` drives the Douyin Android app through uiautomator2 and
    appends the copied share text of every video that passes
    ``check_beauty`` to ``self.down_url``.  ``down_video`` consumes that list
    and saves each video as ``<video id>.mp4`` in the working directory.
    ``main`` runs both loops in two worker threads.
    """

    API_KEY = ''  # Baidu open platform api_key
    SECRET_KEY = ''  # Baidu open platform secret_key

    # Filter applied to the detected face in check_beauty().
    MAX_AGE = 30
    MIN_BEAUTY = 60
    TARGET_GENDER = 'female'

    MAX_PENDING = 5  # browsing pauses while more than this many links are queued
    HTTP_TIMEOUT = 30  # seconds, applied to every HTTP request

    def __init__(self):
        self.device = u2.connect('127.0.0.1:62001')  # change this to your own device
        self.down_url = []
        self._access_token = None

    def get_downurl(self):
        """Swipe through the feed forever, queueing links of matching videos."""
        de = self.device
        de.app_start('com.ss.android.ugc.aweme')
        while True:
            if len(self.down_url) > self.MAX_PENDING:
                # Enough links are waiting; give the downloader time to catch up.
                time.sleep(5)
                continue
            if de(text='点击进入直播间').exists:
                de.swipe_ext('up', 1)
                print("直播间,跳过")
                continue
            if self.check_beauty() is True:
                self._copy_share_link()
            de.swipe_ext('up', 1)

    def _copy_share_link(self):
        """Tap share -> "复制链接" on the current video and queue the clipboard text."""
        de = self.device
        share_btn = de(resourceId="com.ss.android.ugc.aweme:id/share_container")
        share_btn.click()
        if share_btn.exists:
            # The share button is still visible, so presumably the first tap
            # did not open the share panel; tap once more.
            share_btn.click()
        copy_link = de(text='复制链接')
        if not copy_link.exists:
            # The entry is not on screen: drag the located row 400 px to the
            # left (zb[0], zb[1] are presumably its left/top corner).
            zb = de.xpath('//*[@resource-id="com.ss.android.ugc.aweme:id/is-"]').rect
            de.swipe(zb[0] + 400, zb[1] + 50, zb[0], zb[1] + 20, 0.05)
        copy_link.click()
        share_text = de.clipboard
        if share_text:
            # An empty clipboard would only make the downloader fail later.
            self.down_url.append(share_text)

    def _get_access_token(self):
        """Return the Baidu access token, or ``None`` when it cannot be obtained.

        The token is requested once and then reused for the lifetime of this
        object instead of being fetched again for every video.
        """
        token = getattr(self, '_access_token', None)
        if token:
            return token
        try:
            reply = requests.get(
                'https://aip.baidubce.com/oauth/2.0/token',
                params={
                    'grant_type': 'client_credentials',
                    'client_id': self.API_KEY,
                    'client_secret': self.SECRET_KEY,
                },
                timeout=self.HTTP_TIMEOUT,
            ).json()
        except (requests.RequestException, ValueError) as exc:
            print('获取access_token失败:', exc)
            return None
        # jsonpath() returns False (not an empty list) when nothing matches.
        found = jsonpath(reply, '$..access_token')
        if not found:
            print('获取access_token失败,请检查api_key和secret_key:', reply)
            return None
        self._access_token = found[0]
        return self._access_token

    @classmethod
    def _passes_filter(cls, age, beauty, gender):
        """Return True when the detected face satisfies the class thresholds."""
        return (int(age) <= cls.MAX_AGE
                and int(beauty) >= cls.MIN_BEAUTY
                and gender == cls.TARGET_GENDER)

    def check_beauty(self):
        """Screenshot the current video and run Baidu face detection on it.

        Returns:
            ``True`` when a face is detected and it passes ``_passes_filter``;
            ``False`` otherwise, including when the token or the detection
            request fails.
        """
        access_token = self._get_access_token()
        if access_token is None:
            return False
        de = self.device
        time.sleep(2)
        screenshot = de.xpath('//*[@resource-id="com.ss.android.ugc.aweme:id/ge0"]').screenshot()
        screenshot.save('screenshot.jpg')
        with open('screenshot.jpg', 'rb') as f:
            # Decode to str so the image can be embedded in a JSON body.
            image_b64 = base64.b64encode(f.read()).decode('ascii')
        post_data = {
            "image": image_b64,
            "image_type": "BASE64",
            "face_field": "age,beauty,gender",
            "face_type": "LIVE"
        }
        try:
            # json= sends a real JSON body with the matching Content-Type.
            response = requests.post(
                "https://aip.baidubce.com/rest/2.0/face/v3/detect",
                params={"access_token": access_token},
                json=post_data,
                timeout=self.HTTP_TIMEOUT,
            )
            json_result = response.json()
        except (requests.RequestException, ValueError) as exc:
            print('人脸检测请求失败:', exc)
            return False
        error_msg = jsonpath(json_result, '$..error_msg')
        if not error_msg or error_msg[0] != 'SUCCESS':
            print('未识别到人脸')
            return False
        age = jsonpath(json_result, '$..age')
        beauty = jsonpath(json_result, '$..beauty')
        gender = jsonpath(json_result, '$..type')
        if not (age and beauty and gender):
            # A "SUCCESS" reply without the requested fields cannot be judged.
            print('未识别到人脸')
            return False
        print(age[0], beauty[0], gender[0])
        return self._passes_filter(age[0], beauty[0], gender[0])

    @staticmethod
    def _extract_share_url(share_text):
        """Return the first https link inside ``share_text``, or ``None``."""
        match = re.search(r'https://[a-zA-Z0-9./_-]+', share_text or '')
        return match.group() if match else None

    @staticmethod
    def _extract_video_id(url):
        """Return the digits that follow ``video/`` in ``url``, or ``None``."""
        match = re.search(r'video/(\d+)', url or '')
        return match.group(1) if match else None

    def _download_one(self, share_text, headers):
        """Resolve one copied share text to a video file and save it.

        Raises:
            ValueError: when no link, video id or play address can be found.
            requests.RequestException: when an HTTP request fails.
            OSError: when the file cannot be written.
        """
        share_url = self._extract_share_url(share_text)
        if share_url is None:
            raise ValueError('no share link in %r' % (share_text,))
        # The share link redirects; the final URL carries the video id.
        landing_url = requests.get(share_url, timeout=self.HTTP_TIMEOUT).url
        video_id = self._extract_video_id(landing_url)
        if video_id is None:
            raise ValueError('no video id in %r' % (landing_url,))
        respone = requests.get(
            'https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/',
            params={'item_ids': video_id},
            headers=headers,
            timeout=self.HTTP_TIMEOUT,
        )
        play_urls = jsonpath(respone.json(), '$.item_list[0].video.play_addr.url_list[0]')
        if not play_urls:
            raise ValueError('no play address for video %s' % video_id)
        # NOTE(review): 'playwm' -> 'play' is presumably what selects the
        # watermark-free stream; confirm against the current API.
        video_url = play_urls[0].replace('playwm', 'play')
        video = requests.get(video_url, headers=headers, timeout=self.HTTP_TIMEOUT)
        video.raise_for_status()  # do not save an error page as an .mp4
        with open(video_id + '.mp4', 'wb') as data:
            data.write(video.content)
        print('下载成功')

    def down_video(self):
        """Download queued links forever, dropping each link after one attempt."""
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36'
        }
        while True:
            if not self.down_url:
                time.sleep(5)
                continue
            try:
                self._download_one(self.down_url[0], headers)
            except (requests.RequestException, ValueError, OSError) as exc:
                # One bad link must not stop the downloader.
                print('下载失败:', exc)
            finally:
                # Always drop the link so a failing one is not retried forever.
                del self.down_url[0]

    @staticmethod
    def _report_crash(future):
        """Print the exception that ended a worker thread, if any."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            print('线程异常退出:', repr(exc))

    def main(self):
        """Run the browsing loop and the download loop in two threads."""
        with ThreadPoolExecutor(max_workers=2) as t:
            for worker in (self.get_downurl, self.down_video):
                # Without a callback an exception in a worker is stored on the
                # future and never shown.
                t.submit(worker).add_done_callback(self._report_crash)

# Script entry point: start the browsing and download threads.
if __name__ == '__main__':
    DouyinDown().main()


网页版

import base64
import time
import requests
from selenium.common import exceptions
from jsonpath import jsonpath
from selenium import webdriver


def check_beauty():
    """Run Baidu face detection on ``screenshot.png`` and apply the filter.

    Returns:
        ``True`` when a face is detected whose age is at most 30, whose
        beauty score is at least 60 and whose gender is ``'female'``;
        ``False`` otherwise, including when the access token or the
        detection request cannot be obtained.
    """
    api_key = ''  # Baidu open platform api_key
    secret_key = ''  # Baidu open platform secret_key
    timeout = 30  # seconds, applied to both HTTP requests

    with open('screenshot.png', 'rb') as f:
        # Decode to str so the image can be embedded in a JSON body.
        image_b64 = base64.b64encode(f.read()).decode('ascii')
    post_data = {
        "image": image_b64,
        "image_type": "BASE64",
        "face_field": "age,beauty,gender",
        "face_type": "LIVE"
    }
    try:
        token_reply = requests.get(
            'https://aip.baidubce.com/oauth/2.0/token',
            params={
                'grant_type': 'client_credentials',
                'client_id': api_key,
                'client_secret': secret_key,
            },
            timeout=timeout,
        ).json()
        # jsonpath() returns False (not an empty list) when nothing matches.
        tokens = jsonpath(token_reply, '$..access_token')
        if not tokens:
            print('获取access_token失败,请检查api_key和secret_key:', token_reply)
            return False
        # json= sends a real JSON body with the matching Content-Type.
        response = requests.post(
            "https://aip.baidubce.com/rest/2.0/face/v3/detect",
            params={"access_token": tokens[0]},
            json=post_data,
            timeout=timeout,
        )
        json_result = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Network failure or a non-JSON reply: report it instead of hiding it.
        print('人脸检测请求失败:', exc)
        return False

    error_msg = jsonpath(json_result, '$..error_msg')
    if not error_msg or error_msg[0] != 'SUCCESS':
        print('未识别到人脸')
        return False
    age = jsonpath(json_result, '$..age')
    beauty = jsonpath(json_result, '$..beauty')
    gender = jsonpath(json_result, '$..type')
    if not (age and beauty and gender):
        # A "SUCCESS" reply without the requested fields cannot be judged.
        print('未识别到人脸')
        return False
    return int(age[0]) <= 30 and int(beauty[0]) >= 60 and gender[0] == 'female'


class DouyinDown:
    """Browse douyin.com in Chrome and download videos that pass ``check_beauty``.

    ``get_downurl_web`` walks the recommend feed; for every matching video it
    stores the video address in ``self.down_url`` and the joined caption text
    in ``self.url_title`` and then calls ``down_video_web`` to save the file.
    """

    HTTP_TIMEOUT = 30  # seconds
    # Characters replaced with "_" when a caption is used as a file name.
    _FORBIDDEN_FILENAME_CHARS = '\\/:*?"<>|\r\n\t'

    def __init__(self):
        options = webdriver.ChromeOptions()
        self.webdriver = webdriver.Chrome(options=options)
        self.down_url = ''
        self.url_title = ''

    @classmethod
    def _safe_filename(cls, title, fallback, max_len=80):
        """Turn ``title`` into a usable file name, or return ``fallback``.

        Forbidden characters become ``_``, the result is cut to ``max_len``
        characters and stripped of surrounding spaces and dots.
        """
        cleaned = ''.join(
            '_' if ch in cls._FORBIDDEN_FILENAME_CHARS else ch
            for ch in (title or '')
        )
        cleaned = cleaned[:max_len].strip(' .')
        return cleaned or fallback

    def get_downurl_web(self):
        """Walk the recommend feed forever, downloading matching videos."""
        driver = self.webdriver
        driver.implicitly_wait(10)
        driver.get('https://www.douyin.com/recommend')
        input('请手动登录继续,完成后输入任意键')
        # Clicked after every video; presumably the player's "next" control.
        next_button = '//*[@id="root"]//xg-bar[3]/div[1]/div/div/div[2]'
        while True:
            time.sleep(4)
            try:
                # find_element('xpath', ...) works on Selenium 3 and 4; the
                # find_element_by_* helpers were removed in Selenium 4.3.
                driver.find_element(
                    'xpath',
                    '//*[@id="root"]/div/div[2]/div[2]/div/div/div/div[2]').screenshot(
                    'screenshot.png')
                state_code = check_beauty()
                if state_code is True:
                    driver.find_element('xpath', '//*[@id="root"]//div[2]/div/div[2]/div[1]/div[2]').click()
                    url = driver.find_element('xpath', '//*[@id="root"]//video').get_attribute('src')
                    caption_spans = driver.find_elements('xpath', '//*[@id="root"]//span/span/span/span/span')
                    url_name = '_'.join([span.text for span in caption_spans])
                    print('下载中...')
                    self.down_url = url
                    self.url_title = url_name
                    self.down_video_web()
                elif state_code is False:
                    print('跳过')
                driver.find_element('xpath', next_button).click()
            except exceptions.StaleElementReferenceException:
                print('元素被刷新,下一个')
                driver.find_element('xpath', next_button).click()
            except Exception as msg:
                # Any other failure pauses for the user (the prompt suggests
                # this is usually a captcha) and then the loop resumes.
                print(msg)
                print('异常')
                input('请完成验证码后继续')

    def down_video_web(self):
        """Download ``self.down_url`` to ``<self.url_title>.mp4``.

        Failures are printed and the method returns without raising, so the
        browsing loop does not mistake a failed download for a captcha.
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36'
        }
        video_url = self.down_url
        if not video_url or not video_url.startswith('http'):
            # e.g. an empty src or a blob: URL, which requests cannot fetch.
            print('下载失败: 无法直接下载的视频地址', video_url)
            return
        video_name = self._safe_filename(self.url_title, 'video_%d' % int(time.time()))
        try:
            response = requests.get(video_url, headers=headers, timeout=self.HTTP_TIMEOUT)
            response.raise_for_status()  # do not save an error page as an .mp4
            with open(video_name + '.mp4', 'wb') as data:
                data.write(response.content)
        except (requests.RequestException, OSError) as exc:
            print('下载失败:', exc)
            return
        print('下载成功')


# Script entry point: open the browser and start walking the feed.
if __name__ == '__main__':
    DouyinDown().get_downurl_web()

TODO

如果需要的人多的话,后期考虑更新。
目前我发现可以优化的点有:
1.过滤广告
2.监测某一个用户
3.根据颜值,性别等选择是否下载(已完成)

更新

已加入颜值检查,使用的是百度智能云的人脸检测接口。目前的筛选条件是:性别为女、年龄不超过30岁、颜值不低于60分。


下方隐藏内容为本帖所有文件或源码下载链接:

游客你好,如果您要查看本帖隐藏链接需要登录才能查看, 请先登录

主题

帖子

18

积分

初入江湖

UID
47
积分
18
精华
威望
36 点
违规
大神币
63 枚
注册时间
2021-05-04 18:39
发表于 2022-06-01 12:22:43.0
1

返回顶部