异步写出:并发抓取网页并将内容写入本地文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import asyncio
import aiohttp
import aiofiles
import random

async def async_func(obj: dict) -> None:
    """Run ``obj['funcname'](obj)`` while holding the shared semaphore ``obj['sem']``.

    The semaphore caps how many handler coroutines execute at the same time.
    """
    async with obj['sem']:
        await obj['funcname'](obj)


async def async_run(funcname, arg_lists: list, max_num: int, cfg: dict) -> None:
    """Await ``funcname`` once per element of ``arg_lists``, at most ``max_num`` at a time.

    Each call receives a dict with the keys ``'arg'`` (the element), ``'sem'``
    (the shared semaphore), ``'funcname'`` and ``'cfg'`` (the same ``cfg``
    object for every call).

    A failing call does not stop the others; every exception is logged after
    all calls have finished.

    Raises:
        ValueError: if ``max_num`` is less than 1 (a semaphore with zero
            permits would block every call forever).
    """
    import logging

    if max_num < 1:
        raise ValueError('max_num must be >= 1, got %r' % (max_num,))
    args = list(arg_lists)  # iterated twice below, so materialize once
    sem = asyncio.Semaphore(max_num)  # limit concurrency
    coros = [async_func({'arg': arg, 'sem': sem, 'funcname': funcname, 'cfg': cfg})
             for arg in args]
    # gather() wraps coroutines in tasks itself (asyncio.wait() rejects bare
    # coroutines since Python 3.11) and accepts an empty list (asyncio.wait()
    # raises ValueError on one).  return_exceptions=True keeps the original
    # best-effort behaviour: one failure neither cancels nor hides the rest.
    results = await asyncio.gather(*coros, return_exceptions=True)
    logger = logging.getLogger(__name__)
    for arg, result in zip(args, results):
        if isinstance(result, BaseException):
            logger.error('task for %r failed', arg, exc_info=result)


def asyncio_run(func) -> object:
    """Minimal stand-in for ``asyncio.run`` on interpreters older than 3.7.

    Runs the coroutine ``func`` to completion on a fresh event loop and
    returns its result, like ``asyncio.run`` does.  The loop is installed as
    the current loop while running, then uninstalled and closed.
    """
    # A fresh loop avoids asyncio.get_event_loop(), which warns (3.12) or
    # raises (3.14+) when no loop exists, and may hand back a closed loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(func)
    finally:
        asyncio.set_event_loop(None)
        loop.close()


def async_Pool(funcname, urls: list, max_num: int = 200, cfg: "dict | None" = None) -> None:
    """Run the coroutine function ``funcname`` over every element of ``urls``.

    At most ``max_num`` calls are in flight at once.  The function blocks
    until all of them have finished.  ``cfg`` is passed unchanged to every
    call as ``obj['cfg']``.  When ``cfg`` is omitted, a fresh empty dict is
    used for this invocation only.
    """
    import sys

    if cfg is None:
        # Fresh dict per call: a ``{}`` default is one object shared by every
        # call, so a handler that mutates cfg would leak state into later calls.
        cfg = {}
    # Choose the runner locally instead of monkey-patching asyncio.run for the
    # whole process (asyncio.run only exists from Python 3.7 on).
    runner = asyncio.run if sys.version_info >= (3, 7) else asyncio_run
    runner(async_run(funcname, urls, max_num, cfg))

# Asynchronous fetch-and-write example.
async def gettext(obj):
    """Download ``obj['arg']`` and save the raw response body to a new local file.

    The file is created in the current working directory under a unique name
    of the form ``<32 hex chars>.html``.  TLS certificate verification is
    disabled and the whole request is limited to 60 seconds.
    """
    import uuid

    url = obj['arg']
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0;WOW64)AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3202.94 Safari/537.36'}
    timeout = aiohttp.ClientTimeout(total=60)
    async with aiohttp.ClientSession() as session:
        # ssl=False is the supported spelling of the deprecated verify_ssl=False.
        async with session.get(url, headers=headers, ssl=False, timeout=timeout) as r:
            rb = await r.read()
    # Write after the connection is released.  uuid4 names make collisions
    # practically impossible; a 4-digit random name could overwrite an
    # earlier download.
    filename = uuid.uuid4().hex + '.html'
    async with aiofiles.open(filename, mode='wb') as f:
        await f.write(rb)

# Demo: fetch the same local URL five times concurrently, saving each body.
urls = ['http://127.0.0.1'] * 5
async_Pool(gettext, urls)

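关于在 asyncio 中收集各协程的结果:可以把结果保存到对象的属性上(下例中用的是实例属性 `self.arr`,而非类变量)。由于所有协程都运行在同一个事件循环线程中,两次 `await` 之间执行的 `list.append` 不会被其他协程打断;更严格的内存安全问题这里不展开。下面给出一种不使用队列(`asyncio.Queue`)的做法。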

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import asyncio
import aiohttp
import aiofiles
import random

async def async_func(obj: dict) -> None:
    """Run ``obj['funcname'](obj)`` while holding the shared semaphore ``obj['sem']``.

    The semaphore caps how many handler coroutines execute at the same time.
    """
    async with obj['sem']:
        await obj['funcname'](obj)


async def async_run(funcname, arg_lists: list, max_num: int, cfg: dict) -> None:
    """Await ``funcname`` once per element of ``arg_lists``, at most ``max_num`` at a time.

    Each call receives a dict with the keys ``'arg'`` (the element), ``'sem'``
    (the shared semaphore), ``'funcname'`` and ``'cfg'`` (the same ``cfg``
    object for every call).

    A failing call does not stop the others; every exception is logged after
    all calls have finished.

    Raises:
        ValueError: if ``max_num`` is less than 1 (a semaphore with zero
            permits would block every call forever).
    """
    import logging

    if max_num < 1:
        raise ValueError('max_num must be >= 1, got %r' % (max_num,))
    args = list(arg_lists)  # iterated twice below, so materialize once
    sem = asyncio.Semaphore(max_num)  # limit concurrency
    coros = [async_func({'arg': arg, 'sem': sem, 'funcname': funcname, 'cfg': cfg})
             for arg in args]
    # gather() wraps coroutines in tasks itself (asyncio.wait() rejects bare
    # coroutines since Python 3.11) and accepts an empty list (asyncio.wait()
    # raises ValueError on one).  return_exceptions=True keeps the original
    # best-effort behaviour: one failure neither cancels nor hides the rest.
    results = await asyncio.gather(*coros, return_exceptions=True)
    logger = logging.getLogger(__name__)
    for arg, result in zip(args, results):
        if isinstance(result, BaseException):
            logger.error('task for %r failed', arg, exc_info=result)


def asyncio_run(func) -> object:
    """Minimal stand-in for ``asyncio.run`` on interpreters older than 3.7.

    Runs the coroutine ``func`` to completion on a fresh event loop and
    returns its result, like ``asyncio.run`` does.  The loop is installed as
    the current loop while running, then uninstalled and closed.
    """
    # A fresh loop avoids asyncio.get_event_loop(), which warns (3.12) or
    # raises (3.14+) when no loop exists, and may hand back a closed loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(func)
    finally:
        asyncio.set_event_loop(None)
        loop.close()


def async_Pool(funcname, urls: list, max_num: int = 200, cfg: "dict | None" = None) -> None:
    """Run the coroutine function ``funcname`` over every element of ``urls``.

    At most ``max_num`` calls are in flight at once.  The function blocks
    until all of them have finished.  ``cfg`` is passed unchanged to every
    call as ``obj['cfg']``.  When ``cfg`` is omitted, a fresh empty dict is
    used for this invocation only.
    """
    import sys

    if cfg is None:
        # Fresh dict per call: a ``{}`` default is one object shared by every
        # call, so a handler that mutates cfg would leak state into later calls.
        cfg = {}
    # Choose the runner locally instead of monkey-patching asyncio.run for the
    # whole process (asyncio.run only exists from Python 3.7 on).
    runner = asyncio.run if sys.version_info >= (3, 7) else asyncio_run
    runner(async_run(funcname, urls, max_num, cfg))


class Test:
    """Collect the text of every fetched page in ``self.arr``.

    All coroutines run on one event-loop thread.  Each append happens with
    no ``await`` in between, so other coroutines cannot interleave with it,
    and a plain list needs no extra locking.  Items are appended in completion
    order, which need not match the order of the input URLs.
    """

    def __init__(self):
        self.arr = []  # decoded response bodies, in completion order

    async def gettext(self, obj):
        """Fetch ``obj['arg']`` and append the decoded response text to ``self.arr``.

        TLS certificate verification is disabled and the whole request is
        limited to 60 seconds.
        """
        url = obj['arg']
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0;WOW64)AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3202.94 Safari/537.36'}
        timeout = aiohttp.ClientTimeout(total=60)
        async with aiohttp.ClientSession() as session:
            # ssl=False is the supported spelling of the deprecated verify_ssl=False.
            async with session.get(url, headers=headers, ssl=False, timeout=timeout) as r:
                rt = await r.text()
        self.arr.append(rt)


# Demo: fetch the same local URL five times and print the collected bodies.
ts = Test()
urls = ['http://127.0.0.1'] * 5
async_Pool(ts.gettext, urls)
print(ts.arr)