Skip to main content

Сопрограммы

ЭТО ТЕХНОЛОГИЯ УСКОРЕНИЯ РАБОТЫ В ОДНОМ ПОТОКЕ. ДЛЯ ПАРАЛЛЕЛЬНЫХ ВЫЧИСЛЕНИЙ THREADING,

Сопрограммы - функции с возможностью приостановки при длительной внешней операции. Async определяет сопрограмму, await приостанавливает сопрограмму на время выполнения внешней задачи. При вызове сопрограммы она напрямую не выполняется. Для старта нужна точка входа в асинхронные вычисления.

Пример: правильное использование. 

import asyncio

async def corutine_add_one(a: int) -> int:
    return b + 1

corutine_res = asyncio.run(corutine_add_one(5))
print(type(corutine_res), '  ', corutine_res)

Однако если просто запустить корутину, то результат будет интересным: 

import asyncio

async def corutine_add_one(a: int) -> int:
    return b + 1

corutine_res = corutine_add_one(5)
print(type(corutine_res), '  ', corutine_res)

Вывод: 

<class 'coroutine'>    <coroutine object corutine_add_one at 0x0000026850FD4270>

Существует цикл событий. При постановке корутины на паузу задача передается следующей корутине. Однако такой код выполняется в виде простого синхронного кода: 

import asyncio

async def sleep_10(x: int) -> int:
    await asyncio.sleep(10)
    return x+10

async def main():
    x1 = await sleep_10(1)
    x2 = await sleep_10(1)
    print(x1, '  ', x2)

asyncio.run(main())

Задачи

Для приближения к параллельным вычислениям необходимо использовать задачи. 

import asyncio
from util import delay

async def main():
    sleep1 = asyncio.create_task(delay(10))
    sleep2 = asyncio.create_task(delay(10))
    await sleep1
    await sleep2

asyncio.run(main())

К тому же, если убрать await sleep2, то вторая задача все равно выполнится, т.к. вроде все задачи из очереди запускаются. Это так, вот только не говорится о том, завершаются они или нет. Вот пример кода: 

async def delay(delay_seconds: int) -> int:
    print(f'Засыпаю на {delay_seconds} секунд.')
    await asyncio.sleep(delay_seconds)
    print(f'сон в течение {delay_seconds} закончился.')
    return delay_seconds

async def delay_with_inner(delay_seconds: int) -> int:
    print(f'Засыпаю на {delay_seconds} секунд.')
    t1 = asyncio.create_task(delay(15))
    await asyncio.sleep(delay_seconds)
    #await t1 
    print(f'сон в течение {delay_seconds} закончился.')
    return delay_seconds

async def main():
    sleep1 = asyncio.create_task(delay_with_inner(10))
    sleep2 = asyncio.create_task(delay_with_inner(10))
    sleep3 = asyncio.create_task(delay_with_inner(10))
    await sleep1
    await sleep2
    await sleep3

asyncio.run(main())

Если внешнюю мы запускаем на 10 секунд, то внутренняя (t1) тоже запускается - т.к. находится в очереди задач. Но вот только программа завершается до завершения t1.

Засыпаю на 10 секунд.
Засыпаю на 10 секунд.
Засыпаю на 10 секунд.
Засыпаю на 15 секунд.
Засыпаю на 15 секунд.
Засыпаю на 15 секунд.
сон в течение 10 закончился.
сон в течение 10 закончился.
сон в течение 10 закончился.

И все! А если раскомментировать строку с ожиданием t1, то тогда сначала завершится t1 

Засыпаю на 10 секунд.
Засыпаю на 10 секунд.
Засыпаю на 10 секунд.
Засыпаю на 15 секунд.
Засыпаю на 15 секунд.
Засыпаю на 15 секунд.
сон в течение 15 закончился.
сон в течение 15 закончился.
сон в течение 15 закончился.
сон в течение 10 закончился.
сон в течение 10 закончился.
сон в течение 10 закончился.

А для такого кода будет создано 3 файла. Но если вместо sleep3 установить sleep1, то все задачи стартуют, но будет создан только файл 5.txt.

import asyncio
from util import delay_with_inner, delay, delay_and_writefile
from time import sleep

async def main():
    sleep1 = asyncio.create_task(delay_and_writefile(5))
    sleep2 = asyncio.create_task(delay_and_writefile(10))
    sleep3 = asyncio.create_task(delay_and_writefile(11))
    await sleep3

asyncio.run(main())

Исходя из этого, важна не последовательность помещения в задачи, а последовательность при обращении (момент await).

Снятие задач и тайм-ауты

import asyncio
from asyncio import CancelledError
from util import delay
async def main():
    long_task = asyncio.create_task(delay(10))
    seconds_elapsed = 0
    while not long_task.done():
        print('Задача не закончилась, следующая проверка через секунду.')
        await asyncio.sleep(1)
        seconds_elapsed = seconds_elapsed + 1
        if seconds_elapsed == 5:
            long_task.cancel()
    try:
        await long_task
    except CancelledError:
        print('Наша задача была снята')
asyncio.run(main())

 Тайм-аут: 

import asyncio
from util import delay
async def main():
    delay_task = asyncio.create_task(delay(2))
    try:
        result = await asyncio.wait_for(delay_task, timeout=1)
        print(result)
    except asyncio.exceptions.TimeoutError:
        print('Тайм-аут!')
        print(f'Задача была снята? {delay_task.cancelled()}')
asyncio.run(main())

При помощи shield можно игнорировать запросы на снятие: 

import asyncio
from util import delay
async def main():
    task = asyncio.create_task(delay(10))
    try:
    result = await asyncio.wait_for(asyncio.shield(task), 5)
        print(result)
    except TimeoutError:
        print("Задача заняла более 5 с, скоро она закончится!")
        result = await task
        print(result)
asyncio.run(main())

Список задач: 

for task in asyncio.tasks.all_tasks():
    print(task)

Мниожественные task в цикле.

Проблема: при создании в цикле при возникновении исключения в одной задаче, остальные задачи могут не завершить выполнение. Для

решения данной проблемы есть ф

Функция gather 

import asyncio
import aiohttp
from aiohttp import ClientSession
from chapter_04 import fetch_status

async def main():
    async with aiohttp.ClientSession() as session:
        urls = ['https://example.com' for _ in range(1000)]
        requests = [fetch_status(session, url) for url in urls]
        status_codes = await asyncio.gather(*requests)
        print(status_codes)
asyncio.run(main())

Gather возвращает статусы в порядке передаче объектов, независимо от времени исполнения.
Возвращает все данные одновременно.

Обработка исключений - необязательный bool параметр return_exceptions. 

  • return_exceptions=False по умолчанию. Если хотя бы  одна  сопрограмма  возбуждает  исключение,  то  gather  возбуждает то же исключение в точке await. Но, даже если какая-то сопрограмма откажет, остальные не снимаются и продолжат работать при условии, что мы обработаем исключение и оно не приведет к остановке цикла событий и снятию задач; 
  • return_exceptions=True – в этом случае исключения возвращаются в том же списке, что результаты. Сам по себе вызов gather не возбуждает исключений, и мы можем обработать исключения, как нам удобно.
async def main():
    async with aiohttp.ClientSession() as session:
        urls = ['https://example.com', 'python://example.com']
        tasks = [fetch_status_code(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        exceptions = [res for res in results if isinstance(res, Exception)]
        successful_results = [res for res in results if not isinstance(res, Exception)]
        print(f'Все результаты: {results}')
        print(f'Завершились успешно: {successful_results}')
        print(f'Завершились с исключением: {exceptions}')

Функция as_completed

Начинает возвращать данные по мере поступления. Однако порядок не определен.  

import asyncio
import aiohttp
from aiohttp import ClientSession
from chapter_04 import fetch_status

async def main():
    async with aiohttp.ClientSession() as session:
        fetchers = [fetch_status(session, 'https://www.example.com', 1),
                    fetch_status(session, 'https://www.example.com', 1),
                    fetch_status(session, 'https://www.example.com', 10)]
        for finished_task in asyncio.as_completed(fetchers):
            print(await finished_task)
asyncio.run(main())

asyncio.as_completed(fetchers, timeout=2) задает тайм-аут. Но созданные задачи продолжают работать в фоновом режиме.

Точный контроль над функциями

Используется wait: 

async def main():
    async with aiohttp.ClientSession() as session:
    fetchers = \
        [asyncio.create_task(fetch_status(session, 'https://example.com')),
        asyncio.create_task(fetch_status(session, 'https://example.com'))]
    done, pending = await asyncio.wait(fetchers)
    print(f'Число завершившихся задач: {len(done)}')
    print(f'Число ожидающих задач: {len(pending)}')
    for done_task in done:
        result = await done_task
        print(result)

Сигнатура  wait –  список  допускающих  ожидание  объектов, за которым следует факультативный тайм-аут и факультативный 
параметр return_when, который может принимать значения ALL_COMPLETED, FIRST_EXCEPTION и FIRST_COMPLETED, а по умолчанию равен ALL_COMPLETED.

wait не возбуждает исключений, исключения содержатся в методе done_task.exception()

Пример обработки исключений 

import asyncio
import logging

async def main():
    async with aiohttp.ClientSession() as session:
        good_request = fetch_status(session, 'https://www.example.com')
        bad_request = fetch_status(session, 'python://bad')
        fetchers = [asyncio.create_task(good_request),
                    asyncio.create_task(bad_request)]
        done, pending = await asyncio.wait(fetchers)
        print(f'Число завершившихся задач: {len(done)}')
        print(f'Число ожидающих задач: {len(pending)}')
        for done_task in done:
            # result = await done_task возбудит исключение
            if done_task.exception() is None:
                print(done_task.result())
            else:
                logging.error("При выполнении запроса возникло исключение",
                              exc_info=done_task.exception())
asyncio.run(main())