1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import time
from requests.packages.urllib3 import disable_warnings

from concurrent.futures import ThreadPoolExecutor as TPE_POOL # 线程池的一种实现1
from multiprocessing.dummy import Pool as MPD_POOL # 线程池的一种实现2
from threadpool import makeRequests, ThreadPool as TTP_POOL # 线程池的一种实现3

import requests

# Silence urllib3 warnings for the whole process (get_url issues its requests
# with verify=False).
disable_warnings()

# Benchmark targets: the same local address repeated 1000 times.
# Public sites that could be used instead:
#   https://www.163.com, https://www.baidu.com/, https://www.qq.com/
URLS = ['http://127.0.0.1'] * 1000

# HTTP headers sent with every request; only a desktop Safari User-Agent is set.
headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50",
}


def exp():
    """Return a decorator that prints how long each call of a function took.

    ``exp`` is a decorator factory, so it is applied with parentheses::

        @exp()
        def work(): ...

    After every call that returns normally, the wrapper prints
    ``CALL <function name> <seconds>s`` (four decimal places) and passes the
    wrapped function's return value through unchanged.  If the wrapped
    function raises, the exception propagates and nothing is printed.
    """
    import functools
    import time

    def deco(func):
        # functools.wraps keeps func's __name__/__doc__ on the wrapper so
        # decorated functions stay identifiable.
        @functools.wraps(func)
        def wrapper(*arg, **kw):
            # perf_counter is a high-resolution clock meant for measuring
            # intervals, so a zero reading no longer has to be replaced by a
            # made-up 0.1 s value.
            t0 = time.perf_counter()
            res = func(*arg, **kw)
            elapsed = time.perf_counter() - t0
            print('CALL %s %.4fs' % (func.__name__, elapsed))
            return res
        return wrapper
    return deco


class HappyPool:
    """Run one callable over many inputs with a choice of three thread pools.

    The back ends are ``concurrent.futures.ThreadPoolExecutor`` ('tpe'),
    ``multiprocessing.dummy.Pool`` ('mpd') and ``threadpool.ThreadPool``
    ('ttp').  ``start`` selects one of them by name; ``get_url`` is a ready
    made worker that stores its results on ``self.ret``.
    """

    def __init__(self):
        # Instance attribute that collects the results of every get_url call.
        self.ret = []

    def _tpe_pool(self, func, datas, max_workers=10):
        """Apply ``func`` to each item of ``datas`` on a ThreadPoolExecutor.

        Blocks until every submitted call has finished.  The iterator returned
        by ``map`` is never consumed, so return values are discarded and
        exceptions raised inside ``func`` are not re-raised here.
        """
        # Leaving the with-block shuts the executor down and waits for all
        # workers, even if submitting the work fails part-way.
        with TPE_POOL(max_workers=max_workers) as pool:
            pool.map(func, datas)

    def _mpd_pool(self, func, datas, max_workers=10):
        """Apply ``func`` to each item of ``datas`` on a multiprocessing.dummy pool.

        Blocks until all items are processed.  An exception raised by ``func``
        is re-raised by ``map``; the pool is closed and joined in either case
        so its threads are not left behind.
        """
        pool = MPD_POOL(max_workers)
        try:
            pool.map(func, datas)
        finally:
            pool.close()
            pool.join()  # wait for all pool threads to finish

    def _ttp_pool(self, func, datas, max_workers=10):
        """Apply ``func`` to each item of ``datas`` on a threadpool.ThreadPool.

        Builds one work request per item, queues them all, then waits for the
        pool to finish.
        """
        pool = TTP_POOL(max_workers)
        for req in makeRequests(func, datas):
            pool.putRequest(req)
        pool.wait()  # wait for all queued requests to finish

    def get_url(self, url):
        """GET ``url`` and append ``{url: response}`` to ``self.ret``.

        The request uses the module-level ``headers``, a 7 second timeout,
        follows redirects and skips TLS certificate verification.  Any
        exception raised by ``requests.get`` propagates to the caller.
        """
        resp = requests.get(url, headers=headers, verify=False, timeout=7, allow_redirects=True)
        # NOTE(review): called concurrently from pool threads; relies on
        # list.append being safe to call from several threads — confirm.
        self.ret.append({url: resp})

    def start(self, func, urls, max_workers, pool='tpe'):
        """Run ``func`` over ``urls`` on the thread pool named by ``pool``.

        Args:
            func: callable taking one item of ``urls``.
            urls: iterable of inputs, one call per item.
            max_workers: number of worker threads.
            pool: 'tpe' (concurrent.futures.ThreadPoolExecutor),
                'mpd' (multiprocessing.dummy.Pool) or
                'ttp' (threadpool.ThreadPool).

        Raises:
            ValueError: if ``pool`` is not one of the three known names
                (previously such a call silently did nothing).
        """
        if pool == 'tpe':
            self._tpe_pool(func, urls, max_workers)
        elif pool == 'mpd':
            self._mpd_pool(func, urls, max_workers)
        elif pool == 'ttp':
            self._ttp_pool(func, urls, max_workers)
        else:
            raise ValueError(
                "unknown pool %r; expected 'tpe', 'mpd' or 'ttp'" % (pool,)
            )


@exp()
def run_tpe_pool():
    """Fetch every entry of URLS with 20 workers on the 'tpe' back end."""
    bench = HappyPool()
    bench.start(func=bench.get_url, urls=URLS, max_workers=20, pool='tpe')
    # The collected responses are available on bench.ret at this point.


@exp()
def run_mpd_pool():
    """Fetch every entry of URLS with 20 workers on the 'mpd' back end."""
    bench = HappyPool()
    bench.start(func=bench.get_url, urls=URLS, max_workers=20, pool='mpd')
    # The collected responses are available on bench.ret at this point.


@exp()
def run_ttp_pool():
    """Fetch every entry of URLS with 20 workers on the 'ttp' back end."""
    bench = HappyPool()
    bench.start(func=bench.get_url, urls=URLS, max_workers=20, pool='ttp')
    # The collected responses are available on bench.ret at this point.


if __name__ == '__main__':
    # Run the three benchmarks back to back.  Timings noted in the original
    # comments: tpe 3.66 s, mpd 3.65 s, ttp 3.86 s.
    for benchmark in (run_tpe_pool, run_mpd_pool, run_ttp_pool):
        benchmark()