可乐博客

Python高并发小记

# python3
# make:v0.0.1,Kele,2018-11-29 19:46:17 email:kele@masong.me

# python高并发三种实现方式 多线程、多进程、异步
# 由于python只运行在单进程内 所以可以在多进程内开多线程
# 多进程一般为cpu数,多线程一般为20+ 异步数800+
# 在并发时,注意常量、锁、队列
# 队列 先进先出、可以将1万个url放在队列里 在任意子线程/子进程/子异步函数get
# 常量 在任意子线程/子进程/子异步函数共享的常量值
# 锁 加锁解锁防止同一时间的干扰

# 原理
# 函数A(参数a)
# 参数集合Lists
# 并发池(函数A名称,参数集合Lists,并发数20)

# 难点
# 在于数据的控制、分发、合并

# 技巧
# 利用线程池、进程池、异步池
# 有些时候 为了防止可能会频繁创建销毁时 在每个子函数中写while 1 
# 上述技巧难点在于全局控制销毁 可以用os._exit(0)结束

# 例子
# 爬虫的难点在于高并发与反爬虫机制
# so 高并发时假设url数量大于10万时 可以存在数据库加一个state值进行updata

# 快速构建爬虫框架
# 入口函数->参数构造器->文件缓存区->http请求器->编码处理器->文本处理器

#线程池
def pool(callback, lists,threadNum=10):
    import threadpool         
    pool = threadpool.ThreadPool(threadNum) 
    requests = threadpool.makeRequests(callback, lists) 
    [pool.putRequest(req) for req in requests] 
    pool.wait()

def bPool(arg):
    from multiprocessing.dummy import Pool as ThreadPool # 线程池
    tpool = ThreadPool(arg['tnum'])
    arr=list(map(lambda i:{'cnum':arg['cnum'],'tnum':i,'arg':arg['arg']},range(arg['tnum'])))
    tpool.map(arg['callback'], arr)
    tpool.close()  
    tpool.join() 

#进程池
def sPool(callback,tnum=20,cnum='',arg=[]):
    from multiprocessing import Pool as ProcessPool # 进程池
    from multiprocessing import cpu_count #cpu数量
    if cnum=='':
        spool = ProcessPool(cpu_count())
    else:
        spool = ProcessPool(cnum)
    arr=list(map(lambda i:{'cnum':i,'tnum':tnum,'callback':callback,'arg':arg},range(cnum)))
    spool.map(bPool, arr)
    spool.close()  
    spool.join()

def Manager():
    from multiprocessing import Manager
    manager = Manager()
    q = manager.Queue()
    lock = manager.Lock()
    return q,lock

#gevent协程
def gPool(callback,urls=[],pnum=800):
    from gevent import monkey; monkey.patch_all(socket=True,select=True)
    from gevent.pool import Pool
    gpool = Pool(pnum)
    gpool.map(callback, urls)

#gevent常量
def getGevent():
    from gevent import monkey; monkey.patch_all(socket=True,select=True)
    from gevent.queue import Queue#get,put
    from gevent.local import local
    try:
        from gevent.lock import BoundedSemaphore
    except:
        from gevent.coros import BoundedSemaphore 
    sem = BoundedSemaphore(2)#acquire,release
    return local,Queue,sem#返回常量,队列,锁

def gethtml(url):
    import requests
    # return len(requests.get(url).text)
    print(len(requests.get(url).text))

def hello(arg):
    print('进程序号:'+str(arg['cnum']),'线程序号:'+str(arg['tnum']))

if __name__ == '__main__':
    import warnings
    warnings.filterwarnings("ignore")#忽略警告
    url='http://www.baidu.com'
    urls=[url for _ in range(10)]
    #正常访问一遍
    gethtml(url)
    print('='*20)

    #多线程访问
    pool(gethtml,urls,10)
    print('='*20)

    #多进程访问
    # q,lock = rq.Manager()#队列,锁
    # rq.feed(q,urls)
    # rq.sPool(Downloader,tnum=25,cnum=4,arg=[q,lock])
    sPool(hello,tnum=20,cnum=4)

    #异步访问
    #l,q,s=rq.getGevent()#返回常量,队列,锁
    gPool(gethtml,urls,10)
    print('='*20)


    pass
Pythonic