可乐博客

asyncio python异步编程

异步写出

import asyncio
import aiohttp
import aiofiles
import random

async def async_func(obj: dict) -> None:
    """Run one worker call while holding the shared semaphore.

    Args:
        obj: Task descriptor. ``obj['sem']`` is entered as an async context
            manager, and ``obj['funcname']`` is awaited with ``obj`` itself
            as its only argument.
    """
    limiter = obj['sem']
    async with limiter:
        # The worker is looked up only after the slot has been acquired.
        worker = obj['funcname']
        await worker(obj)


async def async_run(funcname, arg_lists: list, max_num: int, cfg: dict) -> None:
    """Run ``funcname`` once per argument, at most ``max_num`` at a time.

    Args:
        funcname: Coroutine function. Each call receives a dict with the
            keys ``'arg'``, ``'sem'``, ``'funcname'`` and ``'cfg'``.
        arg_lists: One entry per task, passed to the worker as ``obj['arg']``.
        max_num: Initial value of the semaphore shared by all tasks.
        cfg: Object handed unchanged to every task as ``obj['cfg']``.

    Returns once every task has finished. An empty ``arg_lists`` is a no-op.
    """
    sem = asyncio.Semaphore(max_num)  # caps the number of concurrent workers
    # asyncio.wait() no longer accepts bare coroutines on Python 3.11+, so
    # each one is wrapped in a Task first. ensure_future() is used instead of
    # create_task() because it also exists on Python versions before 3.7.
    tasks = [
        asyncio.ensure_future(
            async_func({'arg': arg, 'sem': sem, 'funcname': funcname, 'cfg': cfg})
        )
        for arg in arg_lists
    ]
    if not tasks:
        # asyncio.wait() raises ValueError when given an empty collection.
        return
    await asyncio.wait(tasks)


def asyncio_run(func):
    """Run a coroutine to completion; stand-in for ``asyncio.run``.

    Args:
        func: The coroutine object (or other awaitable) to run.

    Returns:
        Whatever ``func`` returns, mirroring ``asyncio.run``.

    The loop is deliberately left open so repeated calls can reuse it.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current loop (e.g. called from a worker thread): make one and
        # install it so later calls in this thread find it.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(func)


def async_Pool(funcname, urls: list, max_num: int = 200, cfg: dict = None) -> None:
    """Synchronously run ``funcname`` over every entry of ``urls``.

    Args:
        funcname: Coroutine function invoked once per entry; see
            ``async_run`` for the dict it receives.
        urls: Arguments to fan out, one task each.
        max_num: Maximum number of tasks allowed to run at the same time.
        cfg: Optional shared configuration passed to every task. A fresh
            empty dict is created per call when omitted, so state written by
            workers does not leak between calls.
    """
    if cfg is None:
        cfg = {}
    # asyncio.run only exists on Python 3.7+; fall back to the local shim on
    # older interpreters instead of monkeypatching the asyncio module.
    runner = getattr(asyncio, 'run', asyncio_run)
    runner(async_run(funcname, urls, max_num, cfg))

# Asynchronous download-and-write example
async def gettext(obj):
    """Download ``obj['arg']`` and write the raw response body to a file.

    The file is created in the current working directory and named with a
    random four-digit number plus ``.html``. Names are not checked for
    uniqueness, so two tasks can draw the same number and overwrite each
    other.

    Args:
        obj: Task descriptor; only ``obj['arg']`` (the URL) is read.
    """
    url = obj['arg']
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0;WOW64)AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3202.94 Safari/537.36'}
    async with aiohttp.ClientSession() as session:
        # ssl=False skips certificate verification; it replaces the
        # deprecated verify_ssl=False keyword.
        async with session.get(url, headers=headers, ssl=False, timeout=60) as r:
            rb = await r.read()
            # Extension was misspelt '.hmtl' in the original.
            async with aiofiles.open(str(random.randint(1000, 9999)) + '.html', mode='wb') as f:
                await f.write(rb)

# Demo: request the same local address five times through the pool.
urls = ['http://127.0.0.1'] * 5
async_Pool(gettext, urls)

关于在asyncio中收集各个任务的返回结果,可以利用类的实例属性来保存;这种做法是否内存安全有待进一步讨论,这里只给出一种不使用队列的方法

import asyncio
import aiohttp
import aiofiles
import random

async def async_func(obj: dict) -> None:
    """Run one worker call while holding the shared semaphore.

    Args:
        obj: Task descriptor. ``obj['sem']`` is entered as an async context
            manager, and ``obj['funcname']`` is awaited with ``obj`` itself
            as its only argument.
    """
    limiter = obj['sem']
    async with limiter:
        # The worker is looked up only after the slot has been acquired.
        worker = obj['funcname']
        await worker(obj)


async def async_run(funcname, arg_lists: list, max_num: int, cfg: dict) -> None:
    """Run ``funcname`` once per argument, at most ``max_num`` at a time.

    Args:
        funcname: Coroutine function. Each call receives a dict with the
            keys ``'arg'``, ``'sem'``, ``'funcname'`` and ``'cfg'``.
        arg_lists: One entry per task, passed to the worker as ``obj['arg']``.
        max_num: Initial value of the semaphore shared by all tasks.
        cfg: Object handed unchanged to every task as ``obj['cfg']``.

    Returns once every task has finished. An empty ``arg_lists`` is a no-op.
    """
    sem = asyncio.Semaphore(max_num)  # caps the number of concurrent workers
    # asyncio.wait() no longer accepts bare coroutines on Python 3.11+, so
    # each one is wrapped in a Task first. ensure_future() is used instead of
    # create_task() because it also exists on Python versions before 3.7.
    tasks = [
        asyncio.ensure_future(
            async_func({'arg': arg, 'sem': sem, 'funcname': funcname, 'cfg': cfg})
        )
        for arg in arg_lists
    ]
    if not tasks:
        # asyncio.wait() raises ValueError when given an empty collection.
        return
    await asyncio.wait(tasks)


def asyncio_run(func):
    """Run a coroutine to completion; stand-in for ``asyncio.run``.

    Args:
        func: The coroutine object (or other awaitable) to run.

    Returns:
        Whatever ``func`` returns, mirroring ``asyncio.run``.

    The loop is deliberately left open so repeated calls can reuse it.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current loop (e.g. called from a worker thread): make one and
        # install it so later calls in this thread find it.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(func)


def async_Pool(funcname, urls: list, max_num: int = 200, cfg: dict = None) -> None:
    """Synchronously run ``funcname`` over every entry of ``urls``.

    Args:
        funcname: Coroutine function invoked once per entry; see
            ``async_run`` for the dict it receives.
        urls: Arguments to fan out, one task each.
        max_num: Maximum number of tasks allowed to run at the same time.
        cfg: Optional shared configuration passed to every task. A fresh
            empty dict is created per call when omitted, so state written by
            workers does not leak between calls.
    """
    if cfg is None:
        cfg = {}
    # asyncio.run only exists on Python 3.7+; fall back to the local shim on
    # older interpreters instead of monkeypatching the asyncio module.
    runner = getattr(asyncio, 'run', asyncio_run)
    runner(async_run(funcname, urls, max_num, cfg))


class Test:
    """Collect response bodies from concurrent fetches on one instance."""

    def __init__(self):
        # Response texts, appended in the order the requests finish.
        self.arr = []

    async def gettext(self, obj):
        """Fetch ``obj['arg']`` and append the decoded body to ``self.arr``.

        Args:
            obj: Task descriptor; only ``obj['arg']`` (the URL) is read.
        """
        url = obj['arg']
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0;WOW64)AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3202.94 Safari/537.36'}
        async with aiohttp.ClientSession() as session:
            # ssl=False skips certificate verification; it replaces the
            # deprecated verify_ssl=False keyword.
            async with session.get(url, headers=headers, ssl=False, timeout=60) as r:
                rt = await r.text()
                self.arr.append(rt)


# Demo: five fetches of the same local address, gathered on one Test instance.
ts = Test()
urls = ['http://127.0.0.1'] * 5
async_Pool(ts.gettext, urls)
# Print every collected response body.
print(ts.arr)
Pythonic