本帖最后由 无厘头大 于 2023-10-06 19:51 编辑
看教程的时候看到一个,生产者跟消费者的概念比较有意思,但是给的代码有问题无法正常运行,于是我就捣鼓了一下。 基本概念就是: 生产者: 一个进程获取网页没页的图片连接(主进程) 消费者: 一个进程下载图片,不同的是每个页面有20张图片,所以在进程中又开了二十线程(子进程) 嗯,概念就是这样接下来看代码 # -*- coding: utf-8 -*- # @Time :2023/9/30 22:46 # [url=home.php?mod=space&uid=686208]@AuThor[/url] : # @FileName :进程池.线程池配合使用与进程共用变量的使用.py # [url=home.php?mod=space&uid=2097534]@IED[/url] :PyCharm """ """ import multiprocessing import requests import os from concurrent.futures import ThreadPoolExecutor from lxml import etree path = r'D:\img' headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' 'AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/117.0.0.0 Safari/537.36' } # 获取图片地址 def get_url(url, que): resp = requests.get(url, headers=headers) resp.encoding = resp.apparent_encoding date = resp.text tree = etree.HTML(date) list_url = tree.xpath('//ul[@class="clearfix"]/li/a') for i in list_url: url_img = i.xpath('./img/@src') que.put('https://pic.netbian.com' + str(*url_img)) # put写入进程变量队列中 resp.close() # 下载并保存图片 def download_ove(url, name): try: resp = requests.get(url, headers=headers) path_img = os.path.join(path, f'{name.value}.jpg') with open(path_img, 'wb') as f: f.write(resp.content) print(f'保存成功{name.value}.jpg') name.value += 1 # 数值型进程变量自增 except Exception as ex: print('下载出错', ex) # 获取进程队列中的url并启用线程池下载保存图片 def download_img(que, name): # 创建线程池,指定20个线程处理数据 with ThreadPoolExecutor(20) as t: while True: try: s = que.get(timeout=3) # 获取进程队列中的数据,等待3秒若是还没获取到数据抛出异常 t.submit(download_ove, s, name) # 添加进线程池 # t.submit(download_ove, s).add_done_callback(err_call_back) # 获取线程池异常 except Exception as ec: print(ec) break # 接收进程池与线程池异常的回调函数 def err_call_back(err): print(f'出错啦~ error:{str(err)}') if __name__ == '__main__': if not os.path.exists(path): os.mkdir(path) que = multiprocessing.Manager().Queue() # 创建一个进程之间共享的队列变量 name = multiprocessing.Manager().Value('i', 0) # 创建一个进程之间共享的数值型变量,'i'表示整型数字,0表示从0开始 for i in range(1, 4): if i != 1: url = f'https://pic.netbian.com/4kdongman/index_{i}.html' else: url = 'https://pic.netbian.com/4kdongman/' get_url(url, que) # 创建进程池,并将函数添加到进程池内,指定最多开辟1个进程处理数据(开多了怕把网址玩崩) with multiprocessing.Pool(1) as pool: pool.apply(download_img, args=(que, name)) # 将函数以同步的方式添加进进程池 # apply_async异步方式无法使用不知道为啥,不报异常也不运行download_img函数 # pool.apply_async(download_img, args=(que, name), error_callback=err_call_back) pool.close() # 结束进程池,不在往内添加数据 pool.join() # 等待进程池运行结束 print('结束')
本代码仅作为练习所用,大家别把人家网站搞崩了。
|