1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# 4 * 50 * 800 (process count * thread count * greenlets per thread)
import warnings
warnings.filterwarnings("ignore")
from multiprocessing import cpu_count,Manager as ProcessManager,Pool as ProcessPool # 进程池
from gevent import monkey;monkey.patch_all(socket=True,select=True,thread=False)
from gevent.pool import Pool as GeventPool# 异步池
from multiprocessing.dummy import Pool as ThreadPool # 线程池


# Thread-pool layer: middle tier of the process/thread/greenlet fan-out.
def bPool2(arg):
    """Spawn arg['tnum'] threads, each running one gevent pool via gPool2.

    arg is a dict carrying the slot indices (cnum/tnum/pnum), the shared
    payload ('arg') and the per-greenlet callback ('callback').
    """
    thread_count = arg['tnum']
    pool = ThreadPool(thread_count)
    # One task dict per thread; 'tnum' is rebound to that thread's slot index.
    tasks = [
        {
            'cnum': arg['cnum'],
            'tnum': slot,
            'pnum': arg['pnum'],
            'arg': arg['arg'],
            'callback': arg['callback'],
        }
        for slot in range(thread_count)
    ]
    pool.map(gPool2, tasks)
    pool.close()
    pool.join()

# Gevent layer: innermost tier — one greenlet per concurrent slot.
def gPool2(arg):
    """Run arg['pnum'] greenlets, each invoking arg['callback'] once.

    Each greenlet receives a dict with its process/thread indices plus its
    own greenlet slot rebound into 'pnum', and the shared payload 'arg'.
    """
    greenlet_count = arg['pnum']
    pool = GeventPool(greenlet_count)
    jobs = [
        {'cnum': arg['cnum'], 'tnum': arg['tnum'], 'pnum': slot, 'arg': arg['arg']}
        for slot in range(greenlet_count)
    ]
    pool.map(arg['callback'], jobs)

# Process-pool layer: top entry point of the process/thread/greenlet fan-out.
def sPool(callback, tnum=20, pnum=800, cnum='', arg=None):
    """Start cnum worker processes, each running bPool2 with tnum threads
    of pnum greenlets apiece.

    callback -- function every greenlet runs; receives a dict of slot
                indices plus the shared payload.
    tnum     -- threads per process.
    pnum     -- greenlets per thread.
    cnum     -- number of processes; '' (the default) means cpu_count().
    arg      -- payload forwarded untouched to the callback (default: []).
    """
    # Fix: the original used a mutable default (arg=[]), which is shared
    # across calls; use a None sentinel and build a fresh list instead.
    if arg is None:
        arg = []
    process_count = cpu_count() if cnum == '' else cnum
    pool = ProcessPool(process_count)
    tasks = [
        {'cnum': i, 'tnum': tnum, 'pnum': pnum, 'arg': arg, 'callback': callback}
        for i in range(process_count)
    ]
    pool.map(bPool2, tasks)
    pool.close()
    pool.join()

def gethtml2(arg):
    """Demo greenlet worker: pop one URL from the shared queue and print it.

    arg['arg'] is the (queue, lock) pair produced by Manager(); 'cnum',
    'tnum' and 'pnum' are the process/thread/greenlet slot indices the
    pools assigned to this worker.
    """
    q, lock = arg['arg']
    # Fix: acquire()/release() leaked the lock if q.get() raised in between;
    # `with` guarantees release on every exit path.
    with lock:
        url = q.get()
        print('进程序号:'+str(arg['cnum']),'线程序号:'+str(arg['tnum']),'异步序号:'+str(arg['pnum']),url)
    import time
    # Block here so the fan-out can be observed; a real worker would loop:
    # while 1:
    #     try:
    #         ...  # pull URLs from the shared queue and fetch them
    #     except Exception as e:
    #         print(e)
    time.sleep(100000)
def Manager():
    """Create a multiprocessing manager and return a shared (queue, lock) pair.

    The returned proxies are safe to hand to worker processes; they keep the
    manager server alive for as long as they are referenced.
    """
    server = ProcessManager()
    return server.Queue(), server.Lock()

def feed(q, urls):
    """Enqueue every element of *urls* onto the shared queue *q*.

    Order is preserved: urls[0] is the first item put.
    """
    # Fix: the original abused a list comprehension purely for its side
    # effects, building a throwaway list of None; use a plain loop.
    for url in urls:
        q.put(url)

if __name__ == '__main__':
    # Build the shared queue/lock, preload 160 000 work items, then fan out
    # into 4 processes x 50 threads x 800 greenlets running gethtml2.
    q, lock = Manager()
    feed(q, list(range(1, 160001)))
    sPool(gethtml2, tnum=50, pnum=800, cnum=4, arg=[q, lock])

# P.S. In practice network-I/O concurrency never gets this high — roughly
# 100~400 URLs per second on a 5~10 Mb link. You also need to tune the kernel
# TCP parameters and the open-file-descriptor limit accordingly.