Programowanie asynchroniczne w Pythonie (asyncio module)

Wyobraźcie sobie, że tworzycie crawlera, który ma za zadanie pobierać eventy z API githuba przez internet jak najszybciej. Prawdopodobnie od razu przyjdzie wam do głowy, żeby skorzystać z przetwarzania równoległego. Co mamy do wyboru? Wątki, procesy i przetwarzanie asynchroniczne.

W tym artykule skupię się na przetwarzaniu asynchronicznym, które całkiem dobrze nadaje się przy wielu operacjach I/O, takich jak na przykład pobieranie danych przez sieć.

Zaczniemy od wersji nieasynchronicznej.

from urllib import request
import json

API_URL = 'https://api.github.com'

EVENTS_URL = API_URL + '/events'

# client id z serwisu github
CLIENT_ID = 'XXX'

# client secret z serwisu github
CLIENT_SECRET = 'XXX'

NUMBER_OF_PAGES = 10

# pobieramy dane z API githuba
def fetch_page(page):
    response = request.urlopen(EVENTS_URL + '?page={}&client_id={}&client_secret={}'.format(page, CLIENT_ID, CLIENT_SECRET))
    return json.loads(response.read().decode('utf-8'))


# wyswietlamy wyniki
def print_result(events):
    for event in events:
        print("{} - {}".format(event['type'], event['repo']['name']))


def main():
    for page in range(1, NUMBER_OF_PAGES):
        print_result(fetch_page(page))


if __name__ == "__main__":
    main()

Jak widać, zadanie trywialne, pobieramy eventy z github API dla 10 stron i tyle 😉

Jaki mamy tu problem? Kod wykonywany jest sekwencyjnie, linia po lini. Musimy czekać za każdym razem na pobranie danych przez API.

A nie byłoby lepiej, żeby za każdym razem jak czekamy na dane, w tym samym czasie wykonać request po kolejną stronę z wynikami? Po co tracić niepotrzebnie czas na wejście/wyjście 😉

Zanim zaczniemy wprowadzę kilka pojęć:

  • korutyna/współprogram (ang. coroutine) – współprogram A (w naszym przypadku funkcja), która posiada możliwość zawieszenia wykonywania i przeniesienie wykonywania do innego współprogramu B (innej funkcji). Praca współprogramu A może zostać wznowiona w miejscu w którym została zawieszona.
  • cooperative coroutine scheduler (pętla zdarzeń) – za każdym razem jak korutyna zawiesi wykonywanie, scheduler wkracza do akcji i decyduje jaka inna korutyna dostanie zielone światło do wykonania swojego kodu.

Słowa kluczowe async i await są głównymi elementami programowania asynchronicznego w Pythonie (wersja 3.5+).

Punkty wstrzymania i wznowienia definiujemy przy pomocy słowa kluczowego „await” – w ten sposób możemy uruchomić korutynę. Korutyna, którą chcemy uruchomić przekazywana jest do schedulera gdzie umieszczana jest na liście korutyn do uruchomienia.

Scheduler sprawdza czy jest jakaś korutyna, która oczekuje na uruchomienie i dlaczego: np. nowe dane z sieci, pobrane dane z bazy, etc. Na tej podstawie wybiera jedną z korutyn i wznawia jej wykonywanie.

Korutyny definiujemy przy pomocy słowa kluczowego async. Funkcje zdefiniowane przy pomocy async podczas wywołania nie rozpoczynają wykonania swojego kodu, a zwracają specjalny obiekty korutyny. Korutyna nie robi nic, póki jej wykonanie nie zostanie zgłoszone do schedulera.

Spróbujmy przepisać powyższy kod, tym razem wykorzystując „asyncio” i możliwości współprogramów (korutyn). Niestety urllib z biblioteki standardowej nie wspiera asynchroniczności, dlatego posłużymy się aiohttp.

import aiohttp, asyncio

API_URL = 'https://api.github.com'

EVENTS_URL = API_URL + '/events'

# client id z serwisu github
CLIENT_ID = 'XXX'

# client secret z serwisu github
CLIENT_SECRET = 'XXX'

NUMBER_OF_PAGES = 10

session = aiohttp.ClientSession()


# korutyna do pobierania danych z githuba przy pomocy klienta aiohttp
async def fetch_page(page):
    async with session.get(EVENTS_URL + '?page={}&client_id={}&client_secret={}'.format(page, CLIENT_ID, CLIENT_SECRET)) as response:
        return await response.json()


# korutyna do wyświetlania wyników pobranych przy pomocy fetch_page
async def print_result(result):
    events = await result
    for event in events:
        print("{} - {}".format(event['type'], event['repo']['name']))


# głowna korutyna
async def main():
    await asyncio.wait([
        print_result(fetch_page(page)) for page in range(1, NUMBER_OF_PAGES)
    ])

if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    # pętla zdarzeń czeka na zakończenie działania main(),
    # czyli tak naprawdę na zakończenie wszystkich korutyn,
    # które są dodane w asyncio.wait
    loop.run_until_complete(main())

    loop.run_until_complete(session.close())
    loop.close()

Jeszcze dla porównania czasy wykonania obu skryptów:

time python downloader.py
0,35s user 0,09s system 5% cpu 7,819 total
time python downloader_asyncio.py
0,39s user 0,11s system 32% cpu 1,541 total

Jak widać skrypt asynchroniczny jest szybszy 🙂

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *