Python

FastApi


FastApi

Общие команды

Установка

pip install fastapi uvicorn

Ручной запуск (api - имя файла, app - имя объекта FastApi)

uvicorn api:app --port 8000 --reload

Запуск uvicorn из python скрипта

Файл main.py

from uvicorn import run
...
app = FastAPI()
...
if __name__ == '__main__':
    run(app="main:app", host='0.0.0.0', port=8000, workers=4, log_level='warning')
    #run(app="main:app", host='0.0.0.0', port=8000, reload=True)

 

curl запросы

curl -X 'GET' 'http://127.0.0.1:8000/todo' -H 'accept: application/json' 
curl -X 'POST' \ 
'http://127.0.0.1:8000/todo' \ 
-H 'accept: application/json' \ 
-H 'Content-Type: application/json' \ 
-d '{ 
"id": 1, 
"item": "First Todo is to finish this book!" 
}'

Автоматическая документация

Swagger

http://ip:port/docs

Redoc

http://ip:port/redoc

Шаблоны Jinja

Поддерживает шаблоны Jinja при выводе данных (вплоть до циклов).

FastApi

Маршрутизация

Добавление маршрутов

Основной файл:

from fastapi import FastAPI 
from todo import todo_router 
 
app = FastAPI() 

@app.get("/") 
async def welcome() -> dict: 
    return { 
        "message": "Hello World" 
        } 

app.include_router(todo_router)

Файл дополнительных маршрутов

from fastapi import APIRouter 
 
todo_router = APIRouter()

todo_list = [] 
 
@todo_router.post("/todo") 
async def add_todo(todo: dict) -> dict: 
    todo_list.append(todo) 
    return {"message": "Todo added successfully"} 

@todo_router.get("/todo") 
async def retrieve_todos() -> dict: 
    return {"todos": todo_list}

Автоматическое добавление маршрутов в основной файл app из файлов в директории data/plugins имеющих шаблон имени объекта APIRouter modulename_router 

fpath = os.path.join('data', 'plugins')
flist = os.listdir(fpath)
sys.path.insert(0, fpath)
for fname in flist:
    if fname not in ['__pycache__', '__init__.py']:
        m = os.path.splitext(fname)[0]
        impmod = importlib.import_module(m)
        router_name = f'{m}_router'
        if router_name in dir(impmod):
            router_mod = getattr(impmod, router_name)
            app.include_router(router_mod)

Получаемые параметры

Параметры пути:

from fastapi import Path

@todo_router.get("/todo/{todo_id}") 
async def get_single_todo(todo_id: int = Path(..., title="The ID of the todo to retrieve.")) -> dict: 
    for todo in todo_list: 
        if todo.id == todo_id: 
            return {"todo": todo} 
    return { 
        "message": "Todo with supplied ID doesn't exist." 
        }

В Path ... - параметр пути обязательный, None - не обязательный

Параметры запроса (после ? в запросе):

async query_route(query: str = Query(None): 
    return query

Передаваемые параметры

При помощи response_model можно фильтровать отдаваемые данные. Т е можно в отдаваемой модели указать неполный набор.

from typing import List 
 
class TodoItem(BaseModel): 
    item: str 
 
class TodoItems(BaseModel): 
    todos: List[TodoItem]
@todo_router.get("/todo", response_model=TodoItems) 
async def retrieve_todo() -> dict: 
    return { 
        "todos": todo_list 
        }

У содержащихся в списке словарей будет оставлен только item

Исключения

Класс HTTPException принимает три аргумента: 

from fastapi import APIRouter, Path, HTTPException, status 

@todo_router.get("/todo/{todo_id}") 
async def get_single_todo(todo_id: int = Path(..., title="The ID of the todo to retrieve.")) -> dict: 
    for todo in todo_list:
      if todo.id == todo_id: 
        return { "todo": todo } 
    raise HTTPException( 
        status_code=status.HTTP_404_NOT_FOUND, 
        detail="Todo with supplied ID doesn't exist", 
        )

Можно переопределить код успешного возврата в декораторе

@todo_router.post("/todo", status_code=201) 
async def add_todo(todo: Todo) -> dict: 
  todo_list.append(todo) 
  return { "message": "Todo added successfully." }

FastApi

Pydantic

 

class Item(BaseModel): 
    item: str 
    status: str 
 
class Todo(BaseModel):
    id: int 
    item: Item

 

 

FastApi

Jinja2

Формат Jinja2
Переменные шаблона Jinja могут относиться к любому типу или объекту Python, если их можно преобразовать в строки. Тип модели, списка или словаря можно передать шаблону и отобразить его атрибуты, поместив эти атрибуты во второй блок, указанный ранее. 
Виды синтаксиса
{% … %} управляющие структуры
{{ todo.item }} вывод значений переданных ему выражений
{# This is a great API book! #} комментарии
Иерархия шаблонов
Способ Описание
{% include %} Позволяет включить содержимое другого шаблона целиком. 
<!-- основной шаблон -->
<h1>Главная страница</h1>
{% include 'partials/header.html' %}
<p>Основное содержимое...</p>
{% extends %} + {% block %}

Наследование шаблонов и переопределение блоков. 

Базовый шаблон 

<!DOCTYPE html>
<html>
<head>
    <title>{% block title %}Default Title{% endblock %}</title>
</head>
<body>
    {% block content %}{% endblock %}
</body>
</html>

Дочерний шаблон 

{% extends "base.html" %}

{% block title %}Custom Title{% endblock %}

{% block content %}
    <h1>Привет, мир!</h1>
    {% include 'partials/footer.html' %}
{% endblock %}

 

Фильтры
{{ variable | filter_name(*args) }}
Виды фильтров
Название Описание
 default(strdefault)

Замена вывода переданного значения, если оно оказывается None

{{ todo.item | default('This is a default todo item') }}
escape Отображение необработанного вывода HTML
striptags Удаление HTML тетов перед отправкой

int

float

Преобразование типов перед ответом 

{{ 3.142 | int }} 
3 
{{ 31 | float }} 
31.0
join(whitespace) Объединение элементов списка в строку 
{{ ['Packt', 'produces', 'great', 'books!'] | join(' ') }} 
Packt produces great books!
length Длина переданного объекта 
Todo count: {{ todos | length }} 
Todo count: 4
Полный список фильтров
Условия:
{% if user %}
    Hello, {{ user.name }}!
{% else %}
    Hello, Unknown!
{% endif %}
Циклы
{% for comment in comments %}
   <b>{{ comment }}</b>
{% endfor %}
Можно также обратиться к переменной loop.index для получения дополнительной информации
Переменная Описание
loop.index Текущее значение итерации (1 - первая итерация)
loop.index0 Текущее значение итерации (0 - первая итерация)
loop.revindex loop.revindex0 Кол-во оставшихся итераций
loop.first True если первая итерация
loop.last
loop.length
loop.pervitem loop.nextitem Значение предыдущей/следующей итерации (пусто если не существует)
Макросы:
{% macro render_comment(comment) %}
    <li>{{ comment }}</li>
{% endmacro %}
<ul>
    {% for comment in comments %}
        {{ render_comment(comment) }}
    {% endfor %}
</ul>
Макросы можно импортировать из файлов
{% import 'macros.html' as macros %}


В Jinja двойные фигурные скобки {{ }} позволяют получить результат выражение, переменную или вызвать функцию и вывести значение в шаблоне.
class Foo:
 def __str__(self):
     return "This is an instance of Foo class"
Template("{{ var }}").render(var=Foo())
'This is an instance of Foo class'
Если обратится к индексу, который не существует, Jinja просто выведет пустую строку.

Вызов функции
В Jinja для определения функции ее нужно просто вызвать.
 def foo():
     return "foo() called"
Template("{{ foo() }}").render(foo=foo)
'foo() called'

Объявление переменных

Внутри шаблона можно задать переменную с помощью инструкции set.

{% set fruit = 'apple' %}

{% set name, age = 'Tom', 20 %}

Переменные определяются для хранения результатов сложных операций, так чтобы их можно было использовать дальше в шаблоне. Переменные, определенные вне управляющих конструкций (о них дальше), ведут себя как глобальные переменные и доступны внутри любой структуры. Тем не менее переменные, созданные внутри конструкций, ведут себя как локальные переменные и видимы только внутри этих конкретных конструкций. Единственное исключение — инструкция if.
Цикл и условные выражения
Реклама

Управляющие конструкции позволяют добавлять в шаблоны элементы управления потоком и циклы. По умолчанию, управляющие конструкции используют разделитель {% … %} вместо двойных фигурных скобок {{ ... }}.
Инструкция if

Инструкция if в Jinja имитирует выражение if в Python, а значение условия определяет набор инструкции. Например:

{% if bookmarks %}
    <p>User has some bookmarks</p>
{% endif %}

Если значение переменной bookmarks – True, тогда будет выведена строка <p>User has some bookmarks</p>. Стоит запомнить, что в Jinja, если у переменной нет значения, она возвращает False.

Также можно использовать условия elif и else, как в обычном коде Python. Например:

{% if user.newbie %}
    <p>Display newbie stages</p>
{% elif user.pro %}
    <p>Display pro stages</p>
{% elif user.ninja %}
    <p>Display ninja stages</p>
{% else %}
    <p>You have completed all stages</p>
{% endif %}

Управляющие инструкции также могут быть вложенными. Например:

{% if user %}
    {% if user.newbie %}
        <p>Display newbie stages</p>
    {% elif user.pro %}
        <p>Display pro stages</p>
    {% elif user.ninja %}
        <p>Display ninja stages</p>
    {% else %}
        <p>You have completed all states</p>
    {% endif %}
{% else %}
    <p>User is not defined</p>
{% endif %}

Реклама

В определенных случаях достаточно удобно записывать инструкцию if в одну строку. Jinja поддерживает такой тип записи, но называет это выражением if, потому что оно записывается с помощью двойных фигурных скобок {{ … }}, а не {% … %}. Например:

{{ "User is logged in" if loggedin else "User is not logged in" }}

Здесь если переменная loggedin вернет True, тогда будет выведена строка “User is logged in”. В противном случае — “User is not logged in”.

Условие else использовать необязательно. Если его нет, тогда блок else вернет объект undefined.

{{ "User is logged in" if loggedin }}

Здесь, если переменная loggedin вернет True, будет выведена строка “User is logged in”. В противном случае — ничего.

Как и в Python можно использовать операторы сравнения, присваивания и логические операторы для управляющих конструкций, чтобы создавать более сложные условия. Вот несколько примеров:

{# Если user.count ревен 1000, код '<p>User count is 1000</p>' отобразится #}
{% if users.count == 1000 %}
    <p>User count is 1000</p>
{% endif %}

{# Если выражение 10 >= 2 верно, код '<p>10 >= 2</p>' отобразится #}
{% if 10 >= 2 %}
    <p>10 >= 2</p>
{% endif %}

{# Если выражение "car" <= "train" верно, код '<p>car <= train</p>' отобразится #}
{% if "car" <= "train" %}
    <p>car <= train</p>
{% endif %}

{#
    Если user залогинен и superuser, код
    '<p>User is logged in and is a superuser</p>' отобразится
#}
{% if user.loggedin and user.is_superuser %}
    <p>User is logged in and is a superuser</p>
{% endif %}

{#
    Если user является superuser, moderator или author, код
    '<a href="#">Edit</a>' отобразится
#}
{% if user.is_superuser or user.is_moderator or user.is_author %}
    <a href="#">Edit</a>
{% endif %}

{#
    Если user и current_user один и тот же объект, код 
    <p>user and current_user are same</p> отобразится
#}
{% if user is current_user %}
    <p>user and current_user are same</p>
{% endif %}

{#
    Если "Flask" есть в списке, код 
    '<p>Flask is in the dictionary</p>' отобразится
#}
{% if ["Flask"] in ["Django", "web2py", "Flask"] %}
    <p>Flask is in the dictionary</p>
{% endif %}

Если условия становятся слишком сложными, или просто есть желание поменять приоритет оператора, можно обернуть выражения скобками ():

{% if (user.marks > 80) and (user.marks < 90) %}
    <p>You grade is B</p>
{% endif %}

Цикл for

Цикл for позволяет перебирать последовательность. Например:

{% set user_list = ['tom', 'jerry', 'spike'] %}

<ul>
{% for user in user_list %}
    <li>{{ user }}</li>
{% endfor %}
</ul>

Вывод:

<ul>

    <li>tom</li>

    <li>jerry</li>

    <li>spike</li>

</ul>

Вот как можно перебирать значения словаря:

{% set employee = { 'name': 'tom', 'age': 25, 'designation': 'Manager' } %}

<ul>
{% for key in employee.items() %}
<li>{{ key }} : {{ employee[key] }}</li>
{% endfor %}
</ul>

Вывод:

<ul>

    <li>designation : Manager</li>

    <li>name : tom</li>

    <li>age : 25</li>

</ul>

Примечание: в Python элементы словаря не хранятся в конкретном порядке, поэтому вывод может отличаться.

Если нужно получить ключ и значение словаря вместе, используйте метод items().

{% set employee = { 'name': 'tom', 'age': 25, 'designation': 'Manager' } %}

<ul>
{% for key, value in employee.items() %}
<li>{{ key }} : {{ value }}</li>
{% endfor %}
</ul>

Вывод:

<ul>

    <li>designation : Manager</li>

    <li>name : tom</li>

    <li>age : 25</li>

</ul>

Цикл for также может использовать дополнительное условие else, как в Python, но зачастую способ его применения отличается. Стоит вспомнить, что в Python, если else идет следом за циклом for, условие else выполняется только в том случае, если цикл завершается после перебора всей последовательности, или если она пуста. Оно не выполняется, если цикл остановить оператором break.

Когда условие else используется в цикле for в Jinja, оно исполняется только в том случае, если последовательность пустая или не определена. Например:
Реклама

{% set user_list = [] %}

<ul>
{% for user in user_list %}
    <li>{{ user }}</li>
{% else %}
    <li>user_list is empty</li>
{% endfor %}
</ul>

Вывод:

<ul>

    <li>user_list is empty</li>

</ul>

По аналогии с вложенными инструкциями if, можно использовать вложенные циклы for. На самом деле, любые управляющие конструкции можно вкладывать одна в другую.

{% for user in user_list %}
    <p>{{ user.full_name }}</p>
    <p>
        <ul class="follower-list">
            {% for follower in user.followers %}
            <li>{{ follower }}</li>
            {% endfor %}
        </ul>
    </p>
{% endfor %}

Цикл for предоставляет специальную переменную loop для отслеживания прогресса цикла. Например:

<ul>
{% for user in user_list %}
    <li>{{ loop.index }} - {{ user }}</li>
{% endfor %}
</ul>

loop.index внутри цикла for начинает отсчет с 1. В таблице упомянуты остальные широко используемые атрибуты переменной loop.
Метод Значение
loop.index0 то же самое что и loop.index, но с индексом 0, то есть, начинает считать с 0, а не с 1.
loop.revindex возвращает номер итерации с конца цикла (считает с 1).
loop.revindex0 возвращает номер итерации с конца цикла (считает с 0).
loop.first возвращает True, если итерация первая. В противном случае — False.
loop.last возвращает True, если итерация последняя. В противном случае — False.
loop.length возвращает длину цикла(количество итераций).

Примечание: полный список есть в документации Flask.
Фильтры

Фильтры изменяют переменные до процесса рендеринга. Синтаксис использования фильтров следующий:

variable_or_value|filter_name

Вот пример:

{{ comment|title }}

Фильтр title делает заглавной первую букву в каждом слове. Если значение переменной comment — "dust in the wind", то вывод будет "Dust In The Wind".

Можно использовать несколько фильтров, чтобы точнее настраивать вывод. Например:

{{ full_name|striptags|title }}

Фильтр striptags удалит из переменной все HTML-теги. В приведенном выше коде сначала будет применен фильтр striptags, а затем — title.

У некоторых фильтров есть аргументы. Чтобы передать их фильтру, нужно вызвать фильтр как функцию. Например:

{{ number|round(2) }}

Фильтр round округляет число до конкретного количества символов.

В следующей таблице указаны широко используемые фильтры.
Название Описание
upper делает все символы заглавными
lower приводит все символы к нижнему регистру
capitalize делает заглавной первую букву и приводит остальные к нижнему регистру
escape экранирует значение
safe предотвращает экранирование
length возвращает количество элементов в последовательности
trim удаляет пустые символы в начале и в конце
random возвращает случайный элемент последовательности

Примечание: полный список фильтров доступен здесь.
Макросы

Макросы в Jinja напоминают функции в Python. Суть в том, чтобы сделать код, который можно использовать повторно, просто присвоив ему название. Например:

{% macro render_posts(post_list, sep=False) %}
    <div>
        {% for post in post_list %}
            <h2>{{ post.title }}</h2>
            <article>
                {{ post.html|safe }}
            </article>
        {% endfor %}
        {% if sep %}<hr>{% endif %}
    </div>
{% endmacro %}

В этом примере создан макрос render_posts, который принимает обязательный аргумент post_list и необязательный аргумент sep. Использовать его нужно следующим образом:

{{ render_posts(posts) }}

Определение макроса должно идти до первого вызова, иначе выйдет ошибка.
Реклама

Вместо того чтобы использовать макросы прямо в шаблоне, лучше хранить их в отдельном файле и импортировать по надобности.

Предположим, все макросы хранятся в файле macros.html в папке templates. Чтобы импортировать их из файла, нужно использовать инструкцию import:

{% import "macros.html" as  macros %}

Теперь можно ссылаться на макросы в файле macros.html с помощью переменной macros. Например:

{{ macros.render_posts(posts) }}

Инструкция {% import “macros.html” as macros %} импортирует все макросы и переменные (определенные на высшем уровне) из файла macros.html в шаблон. Также можно импортировать определенные макросы с помощью from:

{% from "macros.html" import render_posts %}

При использовании макросов будут ситуации, когда потребуется передать им произвольное число аргументов.

По аналогии с *args и **kwargs в Python внутри макросов можно получить доступ к varargs и kwargs.

varags: сохраняет дополнительные позиционные аргументы, переданные макросу, в виде кортежа.

lwargs: сохраняет дополнительные позиционные аргументы, переданные макросу, в виде словаря.

Хотя к ним можно получить доступ внутри макроса, объявлять их отдельно в заголовке макроса не нужно. Вот пример:

{% macro custom_renderer(para) %}
    <p>{{ para }}</p>
    <p>varargs: {{ varargs }}</p>
    <p>kwargs: {{ kwargs }}</p>
{%  endmacro  %}

{{ custom_renderer("some content", "apple", name='spike', age=15) }}

В этом случае дополнительный позиционный аргумент, "apple", присваивается varargs, а дополнительные аргументы-ключевые слова (name=’spike’, age=15) — kwargs.
Экранирование

Jinja по умолчанию автоматически экранирует вывод переменной в целях безопасности. Поэтому если переменная содержит, например, такой HTML-код: "<p>Escaping in Jinja</p>", он отрендерится в виде "&lt;p&gt;Escaping in Jinja&lt;/p&gt;". Благодаря этому HTML-коды будут отображаться в браузере, а не интерпретироваться. Если есть уверенность, что данные безопасны и их точно можно рендерить, стоит воспользоваться фильтром safe. Например:

{% set html = "<p>Escaping in Jinja</p>" %}
{{ html|safe }}

Вывод:

<p>Escaping in Jinja</p>

Использовать фильтр safe в большом блоке кода будет неудобно, поэтому в Jinja есть оператор autoescape, который используется, чтобы отключить экранирование для большого объема данных. Он может принимать аргументы true или false для включения и отключения экранирования, соответственно. Например:

{% autoescape true %}
    Escaping enabled
{% endautoescape %}

{% autoescape false %}
    Escaping disabled
{% endautoescape %}

Все между {% autoescape false %} и {% endautoescape %} отрендерится без экранирования символов. Если нужно экранировать отдельные символы при выключенном экранировании, стоит использовать фильтр escape. Например:

{% autoescape false %}
    <div class="post">
        {% for post in post_list %}
            <h2>{{ post.title }}</h2>
            <article>
                {{ post.html }}
            </article>
        {% endfor %}
    </div>
    <div>
        {% for comment in comment_list %}
            <p>{{ comment|escape }}</p> # escaping is on for comments
        {% endfor %}
    </div>
{% endautoescape %}

Вложенные шаблоны

Инструкция include рендерит шаблон внутри другого шаблона. Она широко используется, чтобы рендерить статический раздел, который повторяется в разных местах сайта. Вот синтаксис include:

Предположим, что навигационное меню хранится в файле nav.html, сохраненном в папке templates:

<nav>
    <a href="/home">Home</a>
    <a href="/blog">Blog</a>
    <a href="/contact">Contact</a>
</nav>

Чтобы добавить это меню в home.html, нужно использовать следующий код:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>

    {# добавляем панель навигации из nav.html #}
    {% include 'nav.html' %}

</body>
</html>

Вывод:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>

<nav>
    <a href="/home">Home</a>
    <a href="/blog">Blog</a>
    <a href="/contact">Contact</a>
</nav>

</body>
</html>

Наследование шаблонов

Наследование шаблонов — один из самых мощных элементов шаблонизатора Jinja. Его принцип похож на ООП (объектно-ориентированное программирование). Все начинается с создания базового шаблона, который содержит в себе скелет HTML и отдельные маркеры, которые дочерние шаблоны смогут переопределять. Маркеры создаются с помощью инструкции block. Дочерние шаблоны используют инструкцию extends для наследования или расширения основного шаблона. Вот пример:

{# Это шаблон templates/base.html #}
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>{% block title %}Default Title{% endblock %}</title>
</head>
<body>

    {% block nav %}
        <ul>
            <li><a href="/home">Home</a></li>
            <li><a href="/api">API</a></li>
        </ul>
    {% endblock %}
    
    {% block content %}
    
    {% endblock %}
</body>
</html>

Это базовый шаблон base.html. Он создает три блока с помощью block, которые впоследствии будут заполнены дочерними шаблонами. Инструкция block принимает один аргумент — название блока. Внутри шаблона это название должно быть уникальным, иначе возникнет ошибка.

Дочерний шаблон — это шаблон, который растягивает базовый шаблон. Он может добавлять, перезаписывать или оставлять элементы родительского блока. Вот как можно создать дочерний шаблон.

{# Это шаблон templates/child.html #}
{% extends 'base.html' %}


{% block content %}
    {% for bookmark in bookmarks %}
        <p>{{ bookmark.title }}</p>
    {% endfor %}
{% endblock %}

Инструкция extends сообщает Jinja, что child.html — это дочерний элемент, наследник base.html. Когда Jinja обнаруживает инструкцию extends, он загружает базовый шаблон, то есть base.html, а затем заменяет блоки контента внутри родительского шаблона блоками с теми же именами из дочерних шаблонов. Если блок с соответствующим названием не найден, используется блок родительского шаблона.

Стоит отметить, что в дочернем шаблоне перезаписывается только блок content, так что содержимое по умолчанию из title и nav будет использоваться при рендеринге дочернего шаблона. Вывод должен выглядеть следующим образом:
Реклама

<!DOCTYPE html>
<head>
    <meta charset="UTF-8">
    <title>Default Title</title>
</head>
<body>

    <ul>
        <li><a href="/home">Home</a></li>
        <li><a href="/api">API</a></li>
    </ul>

    <p>Bookmark title 1</p>
    <p>Bookmark title 2</p>
    <p>Bookmark title 3</p>
    <p>Bookmark title 4</p>


</body>
</html>

Если нужно, можно поменять заголовок по умолчанию, переписав блок title в child.html:

{# Это шаблон templates/child.html #}
{% extends 'base.html' %}

{% block title %}
    Child Title
{% endblock %}

{% block content %}
    {% for bookmark in bookmarks %}
        <p>{{ bookmark.title }}</p>
    {% endfor %}
{% endblock %}

После перезаписи блока на контент из родительского шаблона все еще можно ссылаться с помощью функции super(). Обычно она используется, когда в дополнение к контенту дочернего шаблона нужно добавить содержимое из родительского. Например:

{# Это шаблон templates/child.html #}
{% extends 'base.html' %}

{% block title %}
    Child Title
{% endblock %}

{% block nav %}
    {{ super() }} {# referring to the content in the parent templates #}
    <li><a href="/contact">Contact</a></li>
    <li><a href="/career">Career</a></li>
{% endblock %}

{% block content %}
    {% for bookmark in bookmarks %}
        <p>{{ bookmark.title }}</p>
    {% endfor %}
{% endblock %}

Вывод:

<!DOCTYPE html>
<head>
    <meta charset="UTF-8">
    <title>Child Title</title>
</head>
<body>

    <ul>
        <li><a href="/home">Home</a></li>
        <li><a href="/api">API</a></li>
        <li><a href="/contact">Contact</a></li>
        <li><a href="/career">Career</a></li>
    </ul>

    <p>Bookmark title 1</p>
    <p>Bookmark title 2</p>
    <p>Bookmark title 3</p>
    <p>Bookmark title 4</p>


</body>
</html>

FastApi

Авторизация и аутентификация

Ссылки:

Fastapi users документация

Role-based authentification

Подготовка проекта

python -m venv --system-site-packages env
python -m pip install fastapi uvicorn

 

 

 

SQLAlchemy

SQLAlchemy

Sqlalchemy

Установка
Ядро 
pip install sqlalchemy
Драйвер для postgres, mysql
pip install psycopg2
pip install psycopg2-binary
pip  install  pymysql
Подключение
from sqlalchemy import create_engine
engine = create_engine('postgresql+psycopg2://username:password@localhost:5432/mydb')
engine = create_engine('mysql+pymysql://cookiem:chip@mysql01.com/cookies', pool_recycle=3600)
engine = create_engine('sqlite:///cookies.db')
engine2 = create_engine('sqlite:///:memory:')
Доп. ключи через запятую:
echo булево. Лог запросов. По-умолчанию False.
encoding строка. По-умолчанию utf-8
isolation_level уровень изоляции
pool_recycle число секунд для переподключения, желательно выставлять 3600. При работе с mysql соединение может быть активным до 4 часов.

Создание engine не создает фактического соединения с БД. 

connection = engine.connect()

Сырой запрос:

result = connection.execute("select * from orders").fetchall()
Структура фреймворка
Таблицы -> MetaData -> Engine -> Dialect -> DB
MetaData: объект, в котором таблицы, индексы,...
Ограничение или индекс Описание
ix обычный индекс
uq уникальный индекс
ck ограничение проверки
fk foreign-ключ
pk primary-ключ
from sqlalchemy import MetaData
convention = {
'all_column_names': lambda constraint, table: '_'.join([
column.name for column in constraint.columns.values()
]),
'ix': 'ix__%(table_name)s__%(all_column_names)s',
'uq': 'uq__%(table_name)s__%(all_column_names)s',
'ck': 'ck__%(table_name)s__%(all_column_names)s',
'fk': ('fk__%(table_name)s__%(all_column_names)s' '%(referred_table_name)s'),
'pk': 'pk__%(table_name)s'
}
metadata_obj = MetaData(naming_convention=convention)
from sqlalchemy import ForeignKeyConstraint
album.append_constraint(ForeignKeyConstraint(['ArtistId'], ['artist.ArtistId']))
    • при получении базы, имена таблиц с большой и маленькой буквы. Т е удваивается количество объектов.
engine = create_engine(...)
metadata = MetaData()
cookie_tbl = Table('cookies', metadata, autoload_with=engine)
s = select(cookie_tbl)
with engine.connect() as conn:
    m = conn.execute(s)
    print(m.keys())
  • Получение информации обо всей базе
from sqlalchemy import create_engine, MetaData

engine = create_engine(...)
metadata = MetaData()
metadata.reflect(bind=engine)
for table in metadata.sorted_tables:
    print(table.name)
#Потом получить объект таблицы: 
mytable = metadata.tables['mytable']
  • Получение информации обо всей базе через ORM + Automap

from sqlalchemy.ext.automap import automap_base
from sqlalchemy import create_engine

Base = automap_base()
engine = create_engine('sqlite:///Chinook_Sqlite.sqlite')
Base.prepare(engine, reflect=True)
# данные о классах загружены. 
#Например для получения списка классов:
Base.classes.keys()
Artist = Base.classes.Artist # создание классов
#Внешние связи - в свойстве <related_object>_collection
artist = session.query(Artist).first()
for album in artist.album_collection:
    print('{} - {}'.format(artist.Name, album.Title))

Engine: Скрывает пул подключений и диалект. 
Dialect: Скрывает детали реализации в конкретной базе
Core: SQL в чистом виде
ORM: абстракции
Работа с данными
INSERT
Вариант 1:
ins = cookies.insert().values(
    cookie_name="chocolate chip",
    cookie_recipe_url="http://some.aweso.me/cookie/recipe.html",
    cookie_sku="CC01",
    quantity="12",
    unit_cost="0.50"
    )
    result = connection.execute(ins)
    print(result.inserted_primary_key)
Вариант 2:
from sqlalchemy import insert
ins = insert(cookies).values( 
    cookie_name="chocolate chip",
    cookie_recipe_url="http://some.aweso.me/cookie/recipe.html",
    cookie_sku="CC01",
    quantity="12",
    unit_cost="0.50"
)
Вариант 3
ins = cookies.insert()
result = connection.execute(
    ins, 
    cookie_name='dark chocolate chip', 
    cookie_recipe_url='http://some.aweso.me/cookie/recipe_dark.html',
    cookie_sku='CC02',
    quantity='1',
    unit_cost='0.75'
)
result.inserted_primary_key
Вариант 4
inventory_list = [ 
    {
        'cookie_name': 'peanut butter',
        'cookie_recipe_url': 'http://some.aweso.me/cookie/peanut.html',
        'cookie_sku': 'PB01',
        'quantity': '24',
        'unit_cost': '0.25'
    },
    {
        'cookie_name': 'oatmeal raisin',
        'cookie_recipe_url': 'http://some.okay.me/cookie/raisin.html',
        'cookie_sku': 'EWW01',
        'quantity': '100',
        'unit_cost': '1.00'
    }
]
result = connection.execute(ins, inventory_list)
SELECT 
Вариант 1
from sqlalchemy.sql import select
s = select([cookies]) 
rp = connection.execute(s)
results = rp.fetchall()
Вариант 2
s = cookies.select()
rp = connection.execute(s)
results = rp.fetchall()
Вариант 3
s = cookies.select()
rp = connection.execute(s) 
for record in rp:
    print(record.cookie_name)
SELECT определенных столбцов
s = select([cookies.c.cookie_name, cookies.c.quantity])
ORDERING
s = select([cookies.c.cookie_name, cookies.c.quantity])
Прямая сортировка
s = s.order_by(cookies.c.quantity)
Обратная сортировка
s = s.order_by(desc(cookies.c.quantity))
rp = connection.execute(s)
LIMITING
s = select([cookies.c.cookie_name, cookies.c.quantity])
s = s.order_by(cookies.c.quantity)
s = s.limit(2)
rp = connection.execute(s)
Встроенные функции SQL
Сумма: func.sum
from sqlalchemy.sql import func
s = select([func.sum(cookies.c.quantity)])
rp = connection.execute(s)
print(rp.scalar())
Количество: func.count
s = select([func.count(cookies.c.cookie_name)])
rp = connection.execute(s)
record = rp.first()
print(record.keys()) # ключи могут быть разные.
print(record.count_1) 
Название свойства: label 
s = select([func.count(cookies.c.cookie_name).label('inventory_count')]) 
rp = connection.execute(s)
record = rp.first()
print(record.keys())
print(record.inventory_count)
WHERE
Их можно комбинировать, используется AND
s = select([cookies]).where(cookies.c.cookie_name == 'chocolate chip')
rp = connection.execute(s)
Варианты модификаторов:
Пример для LIKE
s = select([cookies]).where(cookies.c.cookie_name.like('%chocolate%'))
rp = connection.execute(s)
for record in rp.fetchall():
    print(record.cookie_name)
Модификация полученных данных по шаблону
Вариант 1: к каждому значению столбца
s = select([cookies.c.cookie_name, 'SKU-' + cookies.c.cookie_sku]) - добавит строку 'SKU-'
Вариант 2: через функцию cast
from sqlalchemy import cast 
s = select([cookies.c.cookie_name,
          cast((cookies.c.quantity * cookies.c.unit_cost),
               Numeric(12,2)).label('inv_cost')]) 
for row in connection.execute(s):
    print('{} - {}'.format(row.cookie_name, row.inv_cost))
Логические функции
from sqlalchemy import and_, or_, not_
s = select([cookies]).where(
    and_(
        cookies.c.quantity > 23,
        cookies.c.unit_cost < 0.40
   )
)
Извлечение данных
first_row = results[0] 
first_row[1] 
first_row.cookie_name 
first_row[cookies.c.cookie_name]
Для rp есть следующие варианты, однако без limit() извлекаются все данные, затем одна строка:
Обновление данных
from sqlalchemy import update
u = update(cookies).where(cookies.c.cookie_name == "chocolate chip")
u = u.values(quantity=(cookies.c.quantity + 120)) 
result = connection.execute(u)
Удаление данных
from sqlalchemy import delete
u = delete(cookies).where(cookies.c.cookie_name == "dark chocolate chip")
result = connection.execute(u)
JOIN
columns = [orders.c.order_id, users.c.username, users.c.phone,
           cookies.c.cookie_name, line_items.c.quantity,
           line_items.c.extended_cost]
cookiemon_orders = select(columns)
cookiemon_orders = cookiemon_orders.select_from(orders.join(users).join( 
                        line_items).join(cookies)).where(users.c.username ==
                            'cookiemon')
result = connection.execute(cookiemon_orders).fetchall()
OUTER JOIN
Для получения обратной статистики
columns = [users.c.username, func.count(orders.c.order_id)]
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders)) 
all_orders = all_orders.group_by(users.c.username)
result = connection.execute(all_orders).fetchall()
Есть поддержка ALIAS
Группировка данных
columns = [users.c.username, func.count(orders.c.order_id)] 
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders))
all_orders = all_orders.group_by(users.c.username) 
result = connection.execute(all_orders).fetchall()
Объединение запросов по условию
def get_orders_by_customer(cust_name, shipped=None, details=False):
    columns = [orders.c.order_id, users.c.username, users.c.phone]
    joins = users.join(orders)
    if details:
        columns.extend([cookies.c.cookie_name, line_items.c.quantity,
                       line_items.c.extended_cost])
        joins = joins.join(line_items).join(cookies)
    cust_orders = select(columns)
    cust_orders = cust_orders.select_from(joins)


    cust_orders = cust_orders.where(users.c.username == cust_name)
    if shipped is not None:
        cust_orders = cust_orders.where(orders.c.shipped == shipped)
    result = connection.execute(cust_orders).fetchall()
    return result
get_orders_by_customer('cakeeater') 
get_orders_by_customer('cakeeater', details=True) 
get_orders_by_customer('cakeeater', shipped=True) 
get_orders_by_customer('cakeeater', shipped=False) 
get_orders_by_customer('cakeeater', shipped=False, details=True) 

SQLAlchemy

ORM режим

Таблица это класс с требованиями:

from sqlalchemy import Table, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
			
	Base = declarative_base() 
	
	class Cookie(Base): 
		__tablename__ = 'cookies' 
		__table_args__ = (CheckConstraint('quantity >= 0', name='quantity_positive'),)
		cookie_id = Column(Integer(), primary_key=True) 
		cookie_name = Column(String(50), index=True)
		quantity = Column(Integer())

Создание таблиц

from sqlalchemy import create_engine
from dataclasses import Base

engine = create_engine(...)
Base.metadata.create_all(engine)

Ограничения

 

__table_args__ = (ForeignKeyConstraint(['id'], ['other_table.id']), CheckConstraint(unit_cost >= 0.00', name='unit_cost_positive'))

 

Внешние связи

Один к одному:

Один ко многим:

user = relationship("User", backref=backref('orders'))

На себя (дерево) - неоднозначное решение. 

class Employee(Base):
	__tablename__ = 'employees'
	id = Column(Integer(), primary_key=True)
	manager_id = Column(Integer(), ForeignKey('employees.id'))
	name = Column(String(255), nullable=False)
	manager = relationship("Employee", backref=backref('reports'), remote_side=[id])

Сессии

В ORM режиме, сессия упаковывает

Это похожая на хэш-систему, состоящую из списка объектов, таблиц и ключей. Сессия создается через sessionmaker, один раз. Соединяется с базой в момент необходимости

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker 

engine = create_engine('sqlite:///:memory:')
Session = sessionmaker(bind=engine) 
session = Session()

Состояния объекта в сессии:

Transient Объект не в сессии и не в БД
Pending Объект добавлен в сессию add(), но не flush или commit
Persistent Объект в сессии имеет связанную запись в БД
Detached Объект больше не в сессии, но в БД есть соответствующая запись
Modified Объект изменен

Просмотр состояния:

from sqlalchemy import inspect
insp = inspect(cc_cookie)

Отключение объекта от сессии:

session.expunge(cc_cookie)

Просмотр списка атрибутов и выяснение, что было модифицировано

for attr, attr_state in insp.attrs.items():
	if attr_state.history.has_changes(): 
		print('{}: {}'.format(attr, attr_state.value))
		print('History: {}\n'.format(attr_state.history))

Добавление данных

Создаем объект класса, добавляем в сессию и коммитим. Множественное добавление данных:

dcc = Cookie(...)
mcc = Cookie(...)
session.add(dcc)
session.add(mcc)
session.flush()

flush: не выполняет коммит и не завершает транзакцию, но получает id. Потом нужно в пределах сессии сделать commit. Commit в пределах чужой сессии не влияет. Дальше можно использовать объект.

Если дальше не нужно выполнять операции с объектами:

session.bulk_save_objects([dcc,mcc])
session.commit()

Получение данных

all() все
first() возвращает одну запись если она единственная и закрывает соединение
one() возвращает одну запись и оставляет соединение. Аккуратно!
scalar() возвращает одно значение если результат запроса одна строка с одним столбцом
cookies = session.query(Cookie).all() 
print(cookies)

Если использовать через итератор, то без all:

Получение определенных столбцов:

cookies = session.query(Cookie.cookie_id).all()

Сортировка:

session.query(Cookie).order_by(Cookie.quantity).all()
.order_by(desc(Cookie.quantity))

Ограничения через срезы или .limit(2)

Встроенные функции:

from sqlalchemy import func
inv_count = session.query(func.sum(Cookie.quantity)).scalar() 
print(inv_count)
rec_count = session.query(func.count(Cookie.cookie_name)).first()

Метки: можно добавить для дальнейшего обращения

rec_count = session.query(func.count(Cookie.cookie_name).label('inventory_count')).first()
print(rec_count.inventory_count)

Фильтрация

record = session.query(Cookie).filter(cookie_name=='chocolate chip').first() 
record = session.query(Cookie).filter_by(cookie_name='chocolate chip').first()
query = session.query(Cookie).filter(Cookie.cookie_name.like('%chocolate%'))
query = session.query(Cookie).filter(Cookie.quantity > 23, Cookie.unit_cost < 0.40)

Изменение строк при выводе

results = session.query(Cookie.cookie_name, 'SKU-' + Cookie.cookie_sku).all()
query = session.query(Cookie.cookie_name,
      cast((Cookie.quantity * Cookie.unit_cost),
           Numeric(12,2)).label('inv_cost'))

Join:

query = session.query(Order.order_id, User.username)
results = query.join(User).all()

query = session.query(User.username, func.count(Order.order_id))
query = query.outerjoin(Order).group_by(User.username)

Group:

query = session.query(User.username, func.count(Order.order_id))
query = query.outerjoin(Order).group_by(User.username)

Сырые запросы

from sqlalchemy import text
query = session.query(User).filter(text("username='cookiemon'"))

Обновление данных

Через объект

query = session.query(Cookie) 
cc_cookie = query.filter(Cookie.cookie_name == "chocolate chip").first() 
cc_cookie.quantity = cc_cookie.quantity + 120
session.commit()

Через метод update

query = session.query(Cookie)
query = query.filter(Cookie.cookie_name == "chocolate chip")
query.update({Cookie.quantity: Cookie.quantity - 20})

Удаление

session.delete(dcc_cookie)
session.commit()

Исключения

from sqlalchemy.orm.exc import MultipleResultsFound 
try:
	results = session.query(Cookie).one()
except MultipleResultsFound as error: 
	print('We found too many cookies... is that even possible?')

Транзакции

В ORM транзакция создается автоматически до очередного коммита.

session.add(order)
try:
	session.commit()
except IntegrityError as error: 
	session.rollback() 

Пример структурирования

db.py

from datetime import datetime
from sqlalchemy import (MetaData, Table, Column, Integer, Numeric, String,
                        DateTime, ForeignKey, Boolean, create_engine)
class DataAccessLayer:
    connection = None
    engine = None
    conn_string = None
    metadata = MetaData()
    cookies = Table('cookies',
                    metadata,
                    Column('cookie_id', Integer(), primary_key=True),
                    Column('cookie_name', String(50), index=True),
                    Column('cookie_recipe_url', String(255)),
                    Column('cookie_sku', String(55)),
                    Column('quantity', Integer()),
                    Column('unit_cost', Numeric(12, 2))
              ) 
			  
    def db_init(self, conn_string): 
        self.engine = create_engine(conn_string or self.conn_string)
        self.metadata.create_all(self.engine)
        self.connection = self.engine.connect()
dal = DataAccessLayer()

app.py

from db import dal 
from sqlalchemy.sql import select

def get_orders_by_customer(cust_name, shipped=None, details=False):
    columns = [dal.orders.c.order_id, dal.users.c.username, dal.users.c.phone]
    joins = dal.users.join(dal.orders) 
    if details:
        columns.extend([dal.cookies.c.cookie_name,
                        dal.line_items.c.quantity,
                        dal.line_items.c.extended_cost])
        joins = joins.join(dal.line_items).join(dal.cookies)
    cust_orders = select(columns)
    cust_orders = cust_orders.select_from(joins).where(
        dal.users.c.username == cust_name)
    if shipped is not None:
        cust_orders = cust_orders.where(dal.orders.c.shipped == shipped)
    return dal.connection.execute(cust_orders).fetchall()

test.py 

import unittest
class TestApp(unittest.TestCase): 
    
	@classmethod
    def setUpClass(cls): 
        dal.db_init('sqlite:///:memory:')
	
	def test_one(self):
		res = get_orders_by_customer('', False)
		self.assert_equal(res, [])


SQLAlchemy

Core режим

Сначала необходимо определить, как данные хранятся в таблице. Варианты определения:

Сопоставление типов

SQLAlchemy Python SQL
BigInteger int BIGINT
Boolean bool BOOLEAN or SMALLINT
Date datetime.date DATE (SQLite: STRING)
DateTime datetime.datetime DATETIME (SQLite: STRING)
Time datetime.time DATETIME
Enum str ENUM or VARCHAR
Float float or Decimal FLOAT or REAL
Integer int INTEGER
Interval datetime.timedelta INTERVAL or DATE from epoch
LargeBinary byte BLOB or BYTEA
Numeric decimal.Decimal NUMERIC or DECIMAL
Unicode unicode UNICODE or VARCHAR
Text str CLOB or TEXT

Metadata

Каталог объектов Table с опциональной информацией о engine и соединении.

from sqlalchemy import MetaData
metadata = MetaData()

Создание таблицы

metadata.create_all(engine)

Метод ...create_all не пересоздает таблицы.

Объект таблицы состоит из названия, переменной метаданных и столбцов.

from sqlalchemy import Table, Column, Integer, Numeric, String, ForeignKey
from datetime import datetime
from sqlalchemy import DateTime

cookies = Table('cookies', metadata,
    Column('cookie_id', Integer(), primary_key=True), 
    Column('cookie_name', String(50), index=True), 
    Column('cookie_recipe_url', String(255)),
    Column('cookie_sku', String(55)),
    Column('quantity', Integer()),
    Column('unit_cost', Numeric(12, 2)) 
)

users = Table('users', metadata,
    Column('user_id', Integer(), primary_key=True),
    Column('username', String(15), nullable=False, unique=True), 
    Column('email_address', String(255), nullable=False),
    Column('phone', String(20), nullable=False),
    Column('password', String(25), nullable=False),
    Column('created_on', DateTime(), default=datetime.now), 
    Column('updated_on', DateTime(), default=datetime.now, onupdate=datetime.now) 
)

Класс Column

primary_key=True
index=True
nullable=False
unique=True
default=datetime.now
onupdate=datetime.now

Ключи, ограничения и индексы

Могут быть определены в конструкторе столбца (primary_key=True) или позже в конструкторе таблицы.

from sqlalchemy import PrimaryKeyConstraint, UniqueConstraint, CheckConstraint

users = Table(...
             PrimaryKeyConstraint('user_id', name='user_pk'),
             UniqueConstraint('username', name='uix_username'),
             CheckConstraint('unit_cost >= 0.00', name='unit_cost_positive'),
             ...)

Множественные ключи перечисляются через запятую.

from sqlalchemy import Index
 
Index('ix_cookies_cookie_name', 'cookie_name')
Index('ix_test', mytable.c.cookie_sku, mytable.c.cookie_name)

Внешние связи

from sqlalchemy import ForeignKey
orders = Table('orders', metadata,
    Column('order_id', Integer(), primary_key=True),
    Column('user_id', ForeignKey('users.user_id')),
    Column('shipped', Boolean(), default=False)
)
line_items = Table('line_items', metadata,
    Column('line_items_id', Integer(), primary_key=True),
    Column('order_id', ForeignKey('orders.order_id')),
    Column('cookie_id', ForeignKey('cookies.cookie_id')),
    Column('quantity', Integer()),
    Column('extended_cost', Numeric(12, 2))
)

Связь для поля order_id:

Column('user_id', ForeignKey('users.user_id'))
#При желании - ограничение
ForeignKeyConstraint(['order_id'], ['orders.order_id'])

Добавление данных

from sqlalchemy import insert
перем = таблица.insert().values()

Лучше (?) вариант

from sqlalchemy import insert
перем = insert(таблица).values()

Строковое представление запроса

str(перем)

Компиляция запроса

перем.compile()
перем.compile().params

Примеры

with engine.connect() as connection:
	metadata = ...
	cookies = Table...
	ins = insert(cookies).values(...)
	res = connection.execute(ins)
	#res.inserted_primary_key - какой в будущем будет ключ (сейчас фактически в БД нет данных)
	connection.commit()
ins = cookies.insert()
inventory_list = [ 
	{
		'cookie_name': 'peanut butter',
		'cookie_recipe_url': 'http://some.aweso.me/cookie/peanut.html',
	},
	{
		'cookie_name': 'oatmeal raisin',
		'cookie_recipe_url': 'http://some.okay.me/cookie/raisin.html',
	}
]
result = connection.execute(ins, inventory_list)
from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy import Table, Column, Integer, Numeric, String, ForeignKey

metadata = MetaData()
cookies = Table('cookies', metadata,
	Column('cookie_id', Integer(), primary_key=True), 
	Column('cookie_name', String(50), index=True), 
	Column('cookie_recipe_url', String(255))
)
engine = create_engine('sqlite:///:memory:')
connection = engine.connect()
metadata.create_all(engine)

from sqlalchemy import insert

ins = cookies.insert().values(
	cookie_name="chocolate chip",
	cookie_recipe_url="http://some.aweso.me/cookie/recipe.html"
	)
print(str(ins))

Получение данных

from sqlalchemy.sql import select
s = select(cookies) 
rp = connection.execute(s)

Список столбцов

rp.keys()

Получение результата

results = rp.fetchall()
fetchall() Все записи
first() Возвращает одну запись если она единственная и закрывает соединение
fetchone() Возвращает одну запись и оставляет соединение. Аккуратно!
scalar() Возвращает одно значение если результат запроса одна строка с одним столбцом

Доступ возможен по:

first_row = results[0] по индексу результата
first_row[1] по номеру столбца в результате
first_row.cookie_name по имени столбца
first_row[cookies.c.cookie_name] через объект таблицы

Сортировка

s = select(cookies.c.cookie_name, cookies.c.quantity)
s = s.order_by(cookies.c.quantity)
s = s.order_by(desc(cookies.c.quantity))

Ограничения количества

s = s.limit(2)

Встроенные функции

from sqlalchemy.sql import func
s = select([func.sum(cookies.c.quantity)])
rp = connection.execute(s)
print(rp.scalar())

Фильтрация

s = select([cookies]).where(cookies.c.cookie_name == 'chocolate chip')
Оператор Описание
== Точное равенство
like('%chocolate%') Вхождение элемента (регистрозависимый)
ilike(string) Вхождение элемента
between(cleft, cright) Элемент между значениями
concat(column_two) Объединение столбцов
distinct() Только уникальные значения столбца
in_([list]) Значения столбца в списке
is_(None) Значение в столбце None
contains(string) Содержит в себе строку (регистрозависимый)
endswith(string) Заканчивается строкой (регистрозависимый)
startswith(string) Начинается строкой (регистрозависимый)
notin_() Отрицание
isnot() Исключение

Внутри where можно использовать and_, or_, not_

from sqlalchemy import and_, or_, not_
s = select([cookies]).where(
	and_(
		cookies.c.quantity > 23,
		cookies.c.unit_cost < 0.40
	)
)

Join

columns = [orders.c.order_id, users.c.username, users.c.phone,
		cookies.c.cookie_name, line_items.c.quantity, line_items.c.extended_cost]
cookiemon_orders = select(*columns)
cookiemon_orders = cookiemon_orders.select_from(orders.join(users).join(line_items).join(cookies)).where(users.c.username == 'cookiemon')
result = connection.execute(cookiemon_orders).fetchall()
for row in result:
	print(row)

Для outerjoin: join -> outerjoin

Алиасы

manager = employee_table.alias('mgr')

Grouping:

columns = [users.c.username, func.count(orders.c.order_id)] 
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders))
all_orders = all_orders.group_by(users.c.username)

Обновление данных:

from sqlalchemy import update
u = update(cookies).where(cookies.c.cookie_name == "chocolate chip")
u = u.values(quantity=(cookies.c.quantity + 120)) 
result = connection.execute(u)

Удаление данных:

from sqlalchemy import delete
u = delete(cookies).where(cookies.c.cookie_name == "dark chocolate chip")
result = connection.execute(u)

Сырые запросы (raw)

result = connection.execute("select * from orders").fetchall()

Обработка исключений

AttributeError - ошибка набора данных
IntegrityError - ошибка ограничений
Стандартная обработка подходит если исполняется один независимый запрос.

try:
	result = connection.execute(ins)
except IntegrityError as error: 
	print(error.orig.message, error.params)

В случае нескольких взаимозависимых запросов необходимо использовать транзакции.

transaction = connection.begin() 
try:
	...
	transaction.commit() 
except IntegrityError as error:
	transaction.rollback()

Пример:

transaction = connection.begin() 
cookies_to_ship = connection.execute(s).fetchall() 
try:
	for cookie in cookies_to_ship:
		u = update(cookies).where(cookies.c.cookie_id == cookie.cookie_id)
		u = u.values(quantity = cookies.c.quantity-cookie.quantity)
		result = connection.execute(u)
	u = update(orders).where(orders.c.order_id == order_id)
	u = u.values(shipped=True)
	result = connection.execute(u)
	print("Shipped order ID: {}".format(order_id))
	transaction.commit() 
except IntegrityError as error:
	transaction.rollback() 

 

 

SQLAlchemy

Пример проекта

Структура проекта

Директория / файл Описание
alembic/
Настройки alembic
conf/ Настройки окружений. 
conf/settings

Файлы основных настроек. 

db/

Описание структуры базы данных.

initializer.py - Инициализация базы данных, метаданных 

db/tablesdefinition

Файлы описания структур таблиц и методов взаимодействия с данными.

docker/ Настройки контейнера
docker/data Данные БД
docker/docker-entrypoint-initdb.d

Скрипты инициализации БД

main.sql - Файл скрипта иницализации

docker/docker-compose.yml Compose файл
src/ Дополнительные модули
main.py Точка входа
error.log
Файл лога.

Предварительная настройка

Для работы примера необходимо установить docker. 

Клонировать проекта с репозитория 

git clone https://gitverse.ru/bobrobot/alembictemplate.git

Перейти в директорию проекта, создать виртуальное окружение и активировать 

cd alembictemplate
python3 -m venv env
source env/bin/activate

Установить дополнительные модули 

pip install -r requirements.txt

Перейти в директорию docker и в файле docker-compose.yml настроить пути, имя БД, логин и пароль к новой базе данных. 

services:
  postgres:
    image: postgres:latest
    environment:
      POSTGRES_DB: "learnsqlalchemy"
      POSTGRES_USER: "learner"
      POSTGRES_PASSWORD: "StrongPassword123"
      PGDATA: "/home/sergey/projects/alembictemplate/docker/data/pgdata"
    volumes:
      - .:/docker-entrypoint-initdb.d
      - mydata:/home/sergey/projects/alembictemplate/docker/data
    ports:
      - "5430:5432"
volumes:
  mydata:

В файле docker-entrypoint-initdb.d/main.sql изменить имя БД, логин и пароль.

CREATE DATABASE learnsqlalchemy;
CREATE USER learner WITH PASSWORD 'StrongPassword123';
ALTER ROLE learner WITH PASSWORD 'StrongPassword123';
GRANT ALL PRIVILEGES ON DATABASE learnsqlalchemy to learner;

В директории docker запустить контейнер БД в фоновом режиме. 

docker compose up -d

Для остановки контейнера: 

docker compose stop

Сейчас, запустив контейнер, при помощи консольного клиента psql можно проверить соединение с базой данных для пользователя. 

psql -d learnsqlalchemy -U learner -W -h 127.0.0.1 -p 5430

Для работы с настройками в формате json используется библиотека src/libsettings.py Описание библиотеки Создать папку src, скопировать из проекта библиотеку libsettings.py 

Настройки системы

Файлы основных настроек расположены в conf/settings/  Файл base.py 

'''Loading settings to project'''
import os
from src.libsettings import Jsettings

settingspath = os.path.join('conf', 'settings', 'settings.json')
schemapath = os.path.join('conf', 'settings', 'schema.json')
# dev settings
mysettings = Jsettings(settingsfname= settingspath,
                       schemafname=schemapath)
mysettings.load_settings()

Файл schema.json 

{
    "type": "object",
    "properties": {
        "db_username": {"type": "string"},
        "db_password": {"type": "string"},
        "db_host": {"type": "string"},
        "db_port": {"type": "string"},
        "db_name": {"type": "string"}
    }, 
    "required": ["db_username", "db_password", "db_host",
                "db_port", "db_name"]
}

Файл settings.json 

{
    "db_username": "learner",
    "db_password": "StrongPassword123",
    "db_host": "127.0.0.1",
    "db_port": "5430",
    "db_name": "learnsqlalchemy"
}

Файл base.py проверяет схему и создает объект настроек mysettings из файла settings.json. Для получения объекта настроек нужно импортировать объект mysettings в нужном модуле. В данный момент присутствуют настройки базы данных с префиксом db_*

Если такая усложненная система управления настройками покажется излишней, возможно использовать экспорт настроек напрямую из файла.

Инициализация базы данных

Создаем папку db, в ней создаем файл initializer.py 

'''Db classes and  initialization'''
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base
from conf.settings.base import mysettings

#load engine settings
engine = create_engine(
    "postgresql+psycopg2://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}".format(
        db_username=mysettings.db_username,
        db_password=mysettings.db_password,
        db_host=mysettings.db_host,
        db_port=mysettings.db_port,
        db_name=mysettings.db_name
    ),
    echo=True)
Base = declarative_base()

Здесь только создается engine для подключения к БД и класс Base. При настройке структуры таблиц данный файл будет обновлен, сейчас нужен только класс Base.

Настройка системы версионирования базы данных alembic

Инициализируем alembic в корне проекта. 

alembic init alembic

В корне проекта будет создан файл alembic.ini, будет создана папка alembic с файлами инициализации. В большинстве инструкций параметры подключения задаются в файле alembic.ini однако, для доступа к настройкам из единой точки будет изпользоваться способ установки параметров в файле env.py Поэтому в файле alembic.ini переменная sqlalchemy.url должна быть закомментирована. Часть файла alembic.ini 

#sqlalchemy.url = driver://user:pass@localhost/dbname

В файле env.py

import os
import sys

sys.path.append(os.getcwd())

Импортируем настройки, создаем строку соединения и создаем закоментированный ранее в файле alembic.ini параметр sqlalchemy.url

from conf.settings.base import mysettings

connstring = "postgresql+psycopg2://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}".format(
        db_username=mysettings.db_username,
        db_password=mysettings.db_password,
        db_host=mysettings.db_host,
        db_port=mysettings.db_port,
        db_name=mysettings.db_name
    )
config.set_main_option(name="sqlalchemy.url", value=connstring)

Импортируем db.initializer и создаем метаданные 

import db.initializer

target_metadata = db.initializer.Base.metadata

Остальные параметры оставляем неизменными. Результирующий файл настроек окружения alembic env.py: 

from logging.config import fileConfig
import os
import sys

from sqlalchemy import engine_from_config
from sqlalchemy import pool

from alembic import context

from conf.settings.base import mysettings

sys.path.append(os.getcwd())

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

connstring = "postgresql+psycopg2://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}".format(
        db_username=mysettings.db_username,
        db_password=mysettings.db_password,
        db_host=mysettings.db_host,
        db_port=mysettings.db_port,
        db_name=mysettings.db_name
    )
config.set_main_option(name="sqlalchemy.url", value=connstring)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
import db.initializer

target_metadata = db.initializer.Base.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection, target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

Обновление конфигурации таблиц

Для проверки создаем первую пустую миграцию. После ее выполнения создастся таблица alembic_version

alembic revision -m "Empty Init"

В данный момент фактического соединения с БД не было. В папке versions сформируется файл вида <id>_empty_init.py 

После выполнения команды 

alembic upgrade head

в таблице alembic_version появится одна запись - идентификатор текущей версии базы данных.

Сейчас в папке db создаем папку tablesdefinition. В ней будем хранить файлы описаний таблиц и методы для работы с таблицами. Создадим файл userprofile.py 

'''Definition tables of userprofile'''
import logging
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
import db.initializer
from db.initializer import engine

def create_userprofile_class(Curbase):
    class Userprofile(Curbase):
        '''Class Userprofile definition'''
        __tablename__ = 'userprofile'

        user_id = Column(Integer(), primary_key=True)
        username = Column(String(15), nullable=False, unique=True)
        password = Column(String(255), nullable=False)
        email = Column(String(255))
        name = Column(String(100))
        second_name = Column(String(100))
        photo = Column(String(255))
        balance = Column(Integer())
    return Userprofile

def create_one_userprofile(username, password, email='', name='',
                           second_name='', photo='', balance=0):
    ''' Create one userprofile '''
    try:
        with engine.connect():
            Session = sessionmaker(bind=engine)
            with Session() as sess:
                upelem = db.initializer.userprofile(username=username, password=password,
                                     email=email, name=name, second_name=second_name,
                                     photo=photo, balance=balance)
                sess.add(upelem)
                sess.commit()
    except Exception as e:
        logging.error(e)

И в файле initializer.py после инициализации переменной Base добавим раздел инициализации описания таблицы 

#============================ Creation classes definitions =========================
# === Import Userprofile class ===
from db.tablesdefinition.userprofile import create_userprofile_class
userprofile = create_userprofile_class(Base)
# ==================================================================================

Теперь после выполнения команды 

alembic revision --autogenerate -m "Added userprofile model"

будет автоматически сгененрирован файл миграции, и после

alembic upgrade head

создастся таблица userprofile.

P.s. В точке входа необходимо полностью импортировать initializer иначе будет ошибка, пример: 

'''Main learning module'''
import logging

from db import initializer
from db.tablesdefinition.userprofile import create_one_userprofile

logging.basicConfig(level=logging.INFO,
                    filename='error.log',
                    format="%(levelname)s %(message)s")

if __name__ == '__main__':
    create_one_userprofile(username = 'first6',
                           password = 'first',
                           balance = 1)

Alembic

Alembic

Установка и настройка

Стандартная установка:

pip install alembic

Первая инициализация: alembic init folder_for_dbdata 

alembic init alembic

Будет создан файл alembic.ini и директория в соответствии с названием. 

В alembic.ini обновляем параметр sqlalchemy.url

В файл env.py добавляем код

import os
import sys
sys.path.append(os.getcwd())
from app.db import Base 
target_metadata = Base.metadata 

 

 

Alembic

Миграции

Создание первой (пустой) миграции.

После создания пустой миграции, в БД создастся таблица alembic_version в которой хранится идентификатор текущей версии.

alembic revision -m "Empty Init"

Обновление базы данных после создания миграции

alembic upgrade head

Хэш текущей миграции

alembic current

История миграций.

Считываются все файлы миграций. Т.е. в случае загрузки предыдущего состояния, последующие миграции будут отображаться до тех пор, пока ненужные файлы миграций не будут удалены.

alembic history

Возврат к предыдущему состоянию

alembic downgrade migration_id

Для возврата в стартовое состояние, нужно выполнить 

alembic downgrade base

Если что-то пошло не так, для возврата в нулевое состояние нужно удалить из таблицы alembic_version текущий номер.

DELETE FROM public.alembic_version;

Пропуск состояния

alembic stamp migration_id

Экспорт в формате sql 

alembic upgrade migration_id_start:migration_id_stop --sql > migration.sql

Автогенерация миграции

alembic revision --autogenerate -m "Added Cookie model"

Поддерживаемые и неподдерживаемые действия при автоматической миграции

Тип элемента Поддерживаемые Неподдерживаемые
Таблицы Добавление и удаление Изменение имени
Столбец Добавление, удаление, изменение нулевого статуса на столбце Изменение имени
Индекс Основные изменения в индексах и явно обозначенных уникальных ограничениях, поддержка автоматической генерации индексов
и уникальных ограничений

Ограничения
Ограничения без явного имени
Ключи Переименование
Типы
Типы, которые явно не поддерживаются базой данных

Чтобы alembic увидел класс данных, необходимо его непосредственно импортировать. Импорт всей директории не работает.

Ручное создание миграций

На примере изменения имени таблицы

alembic revision -m "Renaming table"
def upgrade():
    op.rename_table('old_name', 'new_name')
def downgrade():
    op.rename_table('new_name', 'old_name')
alembic upgrade head

Команды alembic

add_column Добавить столбец
alter_column Изменить тип столбца, имя или значение по-умолчанию
create_check_constraint Добавить ограничение
create_foreign_key Добавить внешний ключ
create_index Создать индекс
create_primary_key Создать основной ключ
create_table Создать таблицу
create_unique_constraint Создать ограничение уникальности
drop_column Удалить столбец
drop_constraint Удалить ограничение
drop_index Удалить индекс
drop_table Удалить таблицу
execute Выполнить сырую SQL команду
rename_table Переименовать таблицу


Модули

Модули

Описание модулей

Хранение конфигурации

Configparser стандартная библиотека для чтения и записи .ini файлов. Инструкция 1 

Jsonschema модуль для проверки соответствия json существующей схеме. Документация 

Libsettings модуль на основе jsonschema для чтения конфигурации из json файла и проверки конфигурации на соответствие схеме. Gitverse проекта

import logging
from libsettings import Jsettings

logging.basicConfig(level=logging.ERROR,
                    filename='error.log',
                    format="%(levelname)s %(message)s")
mysettings = Jsettings(settingsfname='mysettings.json',
                          schemafname='myschema.json')
mysettings.load_settings()

Модули

jsonschema

Используется для валидации json схемы. По умолчанию дополнительно указанные ключи (не существующие в схеме, но присутствующие в документе) не проверяются.

Установка

pip install jsonschema

Базовое использование

from jsonschema import validate

validate(instance=json_to_check, schema=schema)

 

 

Исключения

jsonschema.exceptions.ValidationError - если документ не соответствует структуре

jsonschema.exceptions.SchemaError - если сама схема некорректна

Пример схемы:

schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "age": {"type": "number"},
    },
    "required": ["name"],
}

Данная схема определяет json объект с 2 свойствами: name и age. Обязательное свойство name.

Ключевые слова

Для некоторых типов используются дополнительные ключевые слова.

Ключевое слово Описание
type

Тип. Для корня часто object.

string - строка

number - число

object - объект

array - список

$defs Вложенный шаблон для случая, когда шаблон элемента встречается в нескольких местах.
$ref Подстановка вложенного шаблона.
$schema Ссылка на шаблон шаблона. При обновлении версии библиотеки будет использоваться новый шаблон шаблона, что может привести к проблемам. Желательно указывать.

Дополнительные ключевые слова для типов.

 Тип array

Ключевое слово Описание
items

Тип элементов списка.

"scores": {
            "type": "array",
            "items": {"type": "number"},
        }
minItems Минимальное количество элементов


Тип object

Ключевое слово Описание
required

Список обязательных ключей.

"required": ["name"]
properties

Определяет ключи объекта и их тип.

"properties": {
        "name": {"type": "string"}
    },
additionalProperties

Если True, то дополнительно указанные ключи приводят к исключению.

Локальные вложенные шаблоны.

Для определения используется переменная $defs, для использования - $ref. 

schema = {
    "type": "object",
    "properties": {
        "address": {"$ref": "#/$defs/address"},
    },
    "$defs": {
        "address": {
            "type": "object",
            "properties": {
                "street": {"type": "string"},
            }
        },
    },
}

Модули

Pydantic 2

Описание

Библиотека валидации (проверка на соответствие типов) и трансформации (автоматическое приведение к нужным типам и форматам) данных.

Модели наследуются от класса BaseModel. Модель описывает набор полей, представляющих структуру данных и условия  валидации.

Установка

pip install pydantic

Типизация:

Внутри класса можно комбинировать способы типизации. 

from pydantic import BaseModel, Field

class User(BaseModel):
    name: str
    email: str = Field(..., alias='email_address')

Валидация:

from pydantic import BaseModel, field_validator

class User(BaseModel):
    age: int

    @field_validator('age')
    def check_age(cls, value):
        if value < 18:
            raise ValueError('Возраст должен быть больше 18 лет')
        return value
from pydantic import BaseModel, computed_field

class User(BaseModel):
    name: str
    surname: str

    @computed_field
    def full_name(self) -> str:
        return f"{self.name} {self.surname}"

Работает со всей моделью (а не с отдельными полями), может изменять данные перед валидацией (mode='before') или после (mode='after'), полезен для комплексных проверок, где одно поле зависит от другого, может возвращать новую версию модели (если нужно модифицировать данные).

@model_validator(mode='before')
def validate_before(cls, data: dict):
    if 'username' not in data:
        data['username'] = "guest_" + str(data.get('id', 0))
    return data
@model_validator(mode='after')
def validate_after(self):
    if self.age < 18 and self.is_premium:
        raise ValueError("Minors cannot have premium accounts!")
    return self

При проверке before передается класс, при after - объект. Можно делать два валидатора: before для подстановки вычисляемых значений и after для финальной проверки

Интеграция с SQLAlchemy:

Для настройки используется параметр ConfigDict с флагом from_attributes=True. 

from datetime import date
from pydantic import BaseModel, ConfigDict

class User(BaseModel):
    id: int
    name: str = 'John Doe'
    birthday_date: date

    config = ConfigDict(from_attributes=True)

Для создания модели Pydantic из объекта ORM используется метод from_orm. 

user = User.from_orm(orm_instance)

Ссылки:

Основа для текста

Модули

Pyinstaller

Установка: 

python -m pip install pyinstaller

Использование 

pyinstaller [параметры] script.py

Параметры

Параметр Описание
--onefile собирает всё в один .exe файл
--windowed скрывает консоль (если у вас GUI-приложение). Если нужна консоль, уберите этот флаг.
--icon=ваша_иконка.ico иконка
--name "МояПрограмма" имя программы

 

 

 

Rabbitmq

Базовая информация

 

Авторизация через ВК

Перейти на VK для разработчиков

Создать новое приложение, страницы настроек: 

python-vk-1.JPG 

python-vk-2.JPGpython-vk-3.JPGВ настройках приложения будут защищенный ключ и сервисный ключ.

 

Тестирование Playwright


Тестирование Playwright

Начало

Официальный сайт проекта

Установка

python -m pip install playwright

Проверка установки

playwright --version

Установка драйверов для браузеров 

playwright install #Все браузеры
playwright install name #Только name браузеры
playwright install chromium #Chrome

Установка pytest 

python -m pip install pytest

Установка плагина pytest-playwright 

python -m pip install pytest-playwright

Описание 

Два режима работы: синхронный и асинхронный. Для синхронного: 

from playwright.sync_api import sync_playwright

Для асинхронного режима: 

import asyncio
from playwright.async_api import async_playwright

async def main():
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch()
        page = await browser.new_page()
        await page.goto("https://playwright.dev")
        print(await page.title())
        await browser.close()

asyncio.run(main())

Чаще используется синхронный режим.

Запуск и закрытие браузера: 

from playwright.sync_api import sync_playwright

with sync_playwright() as playwright:
    browser = playwright.chromium.launch(headless=False, slow_mo=500)
    page = browser.new_page()
    page.goto("https://playwright.dev/python")

    docs_button = page.get_by_role('link', name="Docs")
    docs_button.click()

    browser.close()

headless=False обозначает визуальное открытие, slow_mo задержка

Использование интерактивной консоли

Иногда для удобства можно использовать консоль python для ручного тестирования покомандного ввода. 

python
Python 3.13.1 (tags/v3.13.1:0671451, Dec  3 2024, 19:06:28) [MSC v.1942 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from playwright.sync_api import sync_playwright
>>> playwright = sync_playwright().start()
>>> browser = playwright.chromium.launch(headless=False, slow_mo=100)
>>> page = browser.new_page()
>>> browser.close()
>>> playwright.stop()

Если создать docs_button, у нее будет метод highlight() для визуальной подсветки найденного элемента.

Тестирование Playwright

Локаторы

Локаторы: способ поиска элементов на странице. Поэтому они являются методами page

В VSC Ctrl+Click по методу выводит код метода.

Локатор Описание
page.get_by_role('link', name="Docs")

Поиск элемента по роли name - текст

link <a>

heading  <h>

radio, checkbox, button

page.get_by_label("Email address") Для выделения элементов, у которых есть привязанная метка. Например 
<div>
  <label for="exampleInputEmail1" class="form-label mt-4">Email address</label>
  <input type="email" class="form-control" id="exampleInputEmail1" aria-describedby="emailHelp" placeholder="Enter email">
  <small id="emailHelp" class="form-text text-muted">We'll never share your email with anyone else.</small>
</div>

 

page.get_by_placeholder("Enter email") Поиск элементов по placeholder
page.get_by_text("Something", exact=False) Поиск по тексту. Exact=False ищет вхождение.
page.get_by_alt_text(text) Поиск по атрибуту alt у изображений
page.get_by_title(text) Атрибут title
page.locator(text)

Поиск по CSS. Можно использовать tagname, classname, id, attribute/value

Примеры^

css=h1

footer

<tagname>.<classname>  button.btn-outline-sucess

<tagname>#<idname>   button#BtnGroupDrop1

<tagname>[attribute]    input[readonly]

<tagname>[attribute=somevalue]    input[value='correct value']


Поиск по иерархии элементов.

Если через пробелы-то вложенные элементы. Если через точку - то у элемента несколько классов. Но они не обязательно непосредственно вложенные.

nav.bg-dark a.nav-link.active 

Для непосредственного вложения:

nav.bg-dark > a.nav-link.active 


Называются sudo классами,

Класс и текст в теге. Для вхождения:

h1:text('Navbars')

Для полного соответствия:

h1:text-is('Navbars')

div.dropdown-menu:visible

Для определения по номеру вхождения, когда их много

 :nth-match(button.btn-primary, 4)


XPath

Абсолютный путь:  xpath=/html/head/title

С любого начала: xpath=//h1/h2

С указанием атрибута xpath=//h1[ @id='navbars' ]


Функции XPath

Для поиска по тексту, точно: //h1[text()='Headling1']

Для поиска по тексту, содержит: //h1[contains(text(), 'Headling1')]

Для поиска по тексту, содержит: //h1[contains(@class, 'btn')]

Множественные условия

Поиск родительского элемента

page.get_by_label("Email address").locator("..")

Фильтрация

page.get_by_role("heading").filter(has_text="First")

По дочернему элементу

page.locator("div.form-group").filter(has=page.get_by_label("Password"))

Доступ к iframe

'''Test for auth module'''
import pytest
from playwright.sync_api import Browser, Page, expect

AUTH_URL = "https://wood.bobrobotirk.ru/auth"

@pytest.fixture
def page_and_auth(browser: Browser):
    context = browser.new_context(
        storage_state="playwright/.auth/vk.json"
        )
    page = context.new_page()
    yield page
    context.close()

def test_first(page_and_auth: Page):
    page_and_auth.goto(AUTH_URL)
    vkframe = page_and_auth.frame(url=lambda url: "id.vk.com" in url)

    if vkframe:
        authbutton = vkframe.get_by_role("button", name="Продолжить как")
        expect(authbutton, "Кнопка Продолжить как... отсутствует").to_be_visible(timeout=20000)
        authbutton.click()
        back_button = page_and_auth.get_by_role("button", name="Авторизация успешна")
        expect(back_button, "Сервис не произвел авторизацию").to_be_visible()
        back_button.click()
    else:
        assert False, "VK фрейм не найден."

Тестирование Playwright

Actions

Действие Описание
click()

Однократное нажатие. Опции:

button="left"

modifiers=["Shift", "Alt"] с зажатой кнопкой Shift

timeout=2_000 Задержка перед ошибкой. Обычно 30 сек.

force=True Ошибка сразу же если не найден.

dblclick()

Двойной щелчок. Опции как у click +:

delay=100 - задержка в миллисекундах

 

hover() Навести мышь на выбранный элемент
fill("my text") Заполнить поле ввода текстом my text. Аналогично Ctrl-V
clear() Очистить поле ввода
type("my text", delay=100) Имитация побуквенного ввода

check()

Еще: set_checked(True)

Выбор radiobutton, checkbox, switch

is_checked() для checkbox проверяет, выбран ли checkbox

uncheck()

убрать выбор

select_option("text")

Выбор опции из раскрывающегося списка. Но если отсутствует - будет Timeout Error. 

Если передать список - будет множественный выбор.


Для раскрытия Dropdown элемента: нажатие на него, выбор элемента и нажатие

set_input_files("")

Для элемента позволяющего загружать файлы, имя файла из директории, из которой запускается скрипт. Можно передать список.


Если по кнопке открывается меню выбора файла, то 

with page.expect_file_chooser() as fc_info:
    file_input.click() #до этого через локатор найден file_input
file_chooser = fc_info.value
file_chooser.set_files("first.txt")

 


press("KeyW")

press("Shift+KeyW")

press("Control+ArrowLeft")

Тестирование Playwright

События (Events)

События в page.goto

В переменной wait_until.

Можно считать время загрузки. 

from playwright.sync_api import sync_playwright
from time import perf_counter

with sync_playwright() as playwright:
    browser = playwright.chromium.launch(headless=False, slow_mo=500)
    page = browser.new_page()
    print('Page loading...')
    start = perf_counter()
    page.goto("https://playwright.dev/python", wait_until='load')
    delta = perf_counter() - start
    print(f'Page loaded in {delta} s.')
    
    browser.close()

События динамического контента (React, ...)

Находим динамический элемент, кликаем по нему и при помощи wait_for() ждем.

from playwright.sync_api import sync_playwright
from time import perf_counter

with sync_playwright() as playwright:
    browser = playwright.chromium.launch(headless=False, slow_mo=500)
    page = browser.new_page()
    page.goto("https://www.scrapethissite.com/pages/ajax-javascript/", timeout=60_000)
    mylink = page.get_by_role("link", name="2014")
    mylink.click()
    print('Loading movie...')
    start = perf_counter()
    loadedcont = page.locator("td.film-title").first
    loadedcont.wait_for()
    delta = perf_counter() - start
    print(f'Movie loaded in {delta} s.')
    
    browser.close()

Ожидание события.

Указываем тип события и функцию, выполняемую при наступлении события. 

from playwright.sync_api import sync_playwright

def onload(page):
    print("Page loaded", page)

with sync_playwright() as playwright:
    browser = playwright.chromium.launch(headless=False, slow_mo=500)
    page = browser.new_page()
    page.on("load", onload)
    page.goto("https://bootswatch.com/default")
    browser.close()

Пример просмотра событий запросов 

from playwright.sync_api import sync_playwright

def onrequest(request):
    print("Request send: ", request)

with sync_playwright() as playwright:
    browser = playwright.chromium.launch(headless=False, slow_mo=500)
    page = browser.new_page()
    page.on("request", onrequest)
    page.goto("https://bootswatch.com/default")
    browser.close()

Вывод: 

Request send:  <Request url='https://bootswatch.com/default' method='GET'>
Request send:  <Request url='http://bootswatch.com/default/' method='GET'>
Request send:  <Request url='https://bootswatch.com/default/' method='GET'>
Request send:  <Request url='https://bootswatch.com/_vendor/bootstrap/dist/css/bootstrap.css' method='GET'>
Request send:  <Request url='https://bootswatch.com/_vendor/bootstrap-icons/font/bootstrap-icons.min.css' method='GET'>
Request send:  <Request url='https://bootswatch.com/_vendor/prismjs/themes/prism-okaidia.css' method='GET'>
Request send:  <Request url='https://bootswatch.com/_assets/css/custom.min.css' method='GET'>
Request send:  <Request url='https://www.googletagmanager.com/gtag/js?id=G-KGDJBEFF3W' method='GET'>
Request send:  <Request url='https://cdn.carbonads.com/carbon.js?serve=CKYIE23N&placement=bootswatchcom' method='GET'>
Request send:  <Request url='https://bootswatch.com/_vendor/bootstrap/dist/js/bootstrap.bundle.min.js' method='GET'>
Request send:  <Request url='https://bootswatch.com/_vendor/prismjs/prism.js' method='GET'>
Request send:  <Request url='https://bootswatch.com/_assets/js/custom.js' method='GET'>
Request send:  <Request url='https://bootswatch.com/_vendor/bootstrap-icons/font/fonts/bootstrap-icons.woff2?1fa40e8900654d2863d011707b9fb6f2' method='GET'>
Request send:  <Request url='https://www.google-analytics.com/g/collect?v=2&tid=G-KGDJBEFF3W&gtm=45je54g0v9135688085za200&_p=1744912703288&gcd=13l3l3l3l1l1&npa=0&dma=0&tag_exp=102509682~102803279~102813109~102887800~102926062~103027016~103051953~103055465~103077950~103106314~103106316~103130495~103130497&cid=128797986.1744912704&ul=ru-ru&sr=1280x720&uaa=x86&uab=64&uafvl=Not%253AA-Brand%3B24.0.0.0%7CChromium%3B134.0.6998.35&uamb=0&uam=&uap=Windows&uapv=10.0.0&uaw=0&are=1&frm=0&pscdl=noapi&_s=1&sid=1744912703&sct=1&seg=0&dl=https%3A%2F%2Fbootswatch.com%2Fdefault%2F&dt=Bootswatch%3A%20Default&en=page_view&_fv=1&_nsi=1&_ss=1&_ee=1&tfd=2018' method='POST'>
Request send:  <Request url='https://srv.carbonads.net/ads/CKYIE23N.json?segment=placement:bootswatchcom&v=true' method='GET'>

Есть событие при выборе файла.

Удаление прослушивания события: page.remove_listener("name_event", func)

События всплывающих окон (alert, confirm, prompt)

Событие page.on("dialog", func), устанавливаем ожидание до возможного появления окна.

Функция: 

def on_dialog(dialog)
    dialog.accept()
    dialog.dismiss()

Если диалог prompt, то для заполнения поля ввода в функции accept нужно добавить строковую переменную. 

def on_dialog(dialog)
    dialog.accept("Text for enter")

Событие скачивания файла

from playwright.sync_api import sync_playwright

with sync_playwright() as playwright:
    browser = playwright.chromium.launch(headless=False, slow_mo=500)
    page = browser.new_page()
    page.goto("https://bootswatch.com/default")
    btn = page.get_by_role("link", name="Download me")
    with page.expect_download() as download_info:
        btn.click()
    download = download_info.value
    download.save_as(fname)
    browser.close()

Второй вариант: добавить listener 

from playwright.sync_api import sync_playwright

def saving_func(download):
    fname = "first.jpg"
    download.save_as(fname)

with sync_playwright() as playwright:
    browser = playwright.chromium.launch(headless=False, slow_mo=500)
    page = browser.new_page()
    page.goto("https://bootswatch.com/default")
    page.once("download", saving_func)
    btn = page.get_by_role("link", name="Download me")
    with page.expect_download() as download_info:
        btn.click()
    browser.close()

 

 

Тестирование Playwright

Аутентификация

При 2FA аутентификации возникают проблемы при повторном исполнении скрипта. Для обхода этого используют контекст браузера. 

Шаг 1. Сохранение контекста. 

from playwright.sync_api import sync_playwright

with sync_playwright() as playwright:
    browser = playwright.firefox.launch(headless=False, slow_mo=500)
    context = browser.new_context()
    page = context.new_page()
    page.goto("https://vk.ru")
    page.pause() # Откроется доп. окно. Проходим авторизацию, в доп.окне play
    context.storage_state(path="playwright/.auth/vk.json")
    context.close()

Шаг 2. Использование контекста 

from playwright.sync_api import sync_playwright

with sync_playwright() as playwright:
    browser = playwright.firefox.launch(headless=False, slow_mo=500)
    context = browser.new_context(storage_state="playwright/.auth/vk.json")
    page = context.new_page()
    page.goto("https://vk.ru")
    page.pause() #в реальных скриптах это убирается)
    context.close()

 

 

 

Тестирование Playwright

Pytest & Playwright

Pytest

Имена файлов тестов должны иметь префикс test_ или постфикс _test. Имена тестов должны иметь префикс test_

В модуле utils функция root, отнимающая 1 от входного параметра. Пример теста: 

import utils

def test_first():
    num24 = utils.root(25)
    assert num24 == 24

Запуск теста: 

pytest second_test.py

Ключ -v отображает расширенную информацию.
Ключ -s разрешает вывод данных из тестируемых функций. 

Запуск без указания имени файла исполняет все тесты.

Желательно определение типов в функциях.

Виды проверок

Проверка Описание
assert mynum == 32 Проверка на значение
assert type(dt) == dict Проверка типа
assert "timestamp" in dt_list Проверка присутствия в списке

Для вывода доп. информации: 

assert mynum == 32, "Число mynum должно быть равно 32"

Фикстуры:

Для упаковки повторяющихся действий. 

import json
import report
import pytest

@pytest.fixtures
def report_json():
    report.generate_report()
    with open("report.json") as file:
        return json.load(file)

def test_report_json(report_json):
    assert type(report_json) == dict

Фикстура исполняется каждый раз при вызове. Для однократного исполнения фикстуры нужно добавить scope="session" 

import json
import report
import pytest

@pytest.fixtures(scope="session")
def report_json():
    report.generate_report()
    with open("report.json") as file:
        return json.load(file)

def test_report_json(report_json):
    assert type(report_json) == dict

Виды scope

session Один раз в пределах сессии
function Каждый раз
module Если несколько тестовых файлов, то один раз в пределе запуска.

Pytest-playwright

Данный плагин упрощает работу с pytest. Встроенные фикстуры. Задачу создания и удаления playwright, browser берет на себя.

from playwright.sync_api import Page

def test_first(page: Page):
    page.goto("https://playwright.dev/python")
    link = page.get_by_role("link", name="GET STARTED")
    link.click()
    assert page.url == "https://playwright.dev/python/docs/intro"

Настройка pytest

Через консоль:

Через конфигурационный файл pytest.ini

[pytest]
addopts = --headed --slowmo=500 --browser=firefox

Фикстуры

Добавить фикстуру 

import pytest
from playwright.sync_api import Page

@pytest.fixture
def load_testpage(page: Page):
    page.goto("https://playwright.dev/python")
    return page

def test_first(load_testpage: Page):
    link = load_testpage.get_by_role("link", name="GET STARTED")
    link.click()
    assert load_testpage.url == "https://playwright.dev/python/docs/intro"

Автоматическое исполнение фикстуры перед каждой функцией:

import pytest
from playwright.sync_api import Page

@pytest.fixture(autouse=True)
def load_testpage(page: Page):
    page.goto("https://playwright.dev/python")
    return page

def test_first(page: Page):
    link = page.get_by_role("link", name="GET STARTED")
    link.click()
    assert page.url == "https://playwright.dev/python/docs/intro"

Пред и пост исполнение кода:

import pytest
from playwright.sync_api import Page

@pytest.fixture(autouse=True)
def load_testpage(page: Page):
    page.goto("https://playwright.dev/python")
    yield page
    page.goto("https://yandex.ru")

def test_first(page: Page):
    link = page.get_by_role("link", name="GET STARTED")
    link.click()
    assert page.url == "https://playwright.dev/python/docs/intro"

Пример проверки авторизации ВК 

'''Test for auth module'''
import pytest
from playwright.sync_api import Browser, Page, expect

AUTH_URL = "https://wood.bobrobotirk.ru/auth"

@pytest.fixture
def page_and_auth(browser: Browser):
    context = browser.new_context(
        storage_state="playwright/.auth/vk.json"
        )
    #page = context.new_page()
    yield context
    context.close()

def get_cookie(all_cookies, cname):
    val = next((cookie['value'] for cookie in all_cookies if cookie.get('name') == cname), None)
    return val

def test_first(page_and_auth: Browser):
    page = page_and_auth.new_page()
    page.goto(AUTH_URL)
    vkframe = page.frame(url=lambda url: "id.vk.com" in url)

    if vkframe:
        authbutton = vkframe.get_by_role("button", name="Продолжить как")
        expect(authbutton, "Кнопка Продолжить как... отсутствует").to_be_visible(timeout=20000)
        authbutton.click()
        back_button = page.get_by_role("button", name="Авторизация успешна")
        expect(back_button, "Сервис не произвел авторизацию").to_be_visible()
        back_button.click()
        all_cookies = page_and_auth.cookies()
        session_id_value = get_cookie(all_cookies, 'session_id')
        assert session_id_value, "Cookie session_id не установлен."
    else:
        assert False, "VK фрейм не найден."

Тестирование Playwright

Дополнительные возможности

Скриншоты

Скрин страницы

page.screenshot(path="", full_page=True)

Скрин элемента тоже работает.

link.screenshot(path="")

Запись видео

from playwright.sync_api import Browser

def test_first(browser: Browser):
    context = browser.new_context(
        storage_state="playwright/.auth/vk.json",
        record_video_dir="video/"
        )
    page = context.new_page()
    page.goto("https://playwright.dev/python")
    link = page.get_by_role("link", name="GET STARTED")
    link.click()
    assert page.url == "https://playwright.dev/python/docs/intro"
    page.goto("https://vk.ru")

Вариант с текстурой: 

import pytest
from playwright.sync_api import Browser, Page

@pytest.fixture
def recordable(browser: Browser):
    context = browser.new_context(
        storage_state="playwright/.auth/vk.json",
        record_video_dir="video/"
        )
    page = context.new_page()
    yield page
    context.close()

def test_first(recordable: Page):
    recordable.goto("https://playwright.dev/python")
    link = recordable.get_by_role("link", name="GET STARTED")
    link.click()
    assert recordable.url == "https://playwright.dev/python/docs/intro"
    recordable.goto("https://vk.ru")

Трассировка данных

Создание трассировки: 

import pytest
from playwright.sync_api import Page, BrowserContext

@pytest.fixture(autouse=True)
def trace_test(context: BrowserContext):
    context.tracing.start(
        name="playwrite",
        screenshots=True,
        snapshots=True,
        sources=True
    )
    yield 
    context.tracing.stop(path="trace.zip")

def test_first(page: Page):
    page.goto("https://playwright.dev/python")
    link = page.get_by_role("link", name="GET STARTED")
    link.click()
    assert page.url == "https://playwright.dev/python/docs/intro"

Просмотр трассировки 

playwright show-trace trace.zip

Запись действий

Старт записи 

playwright codegen playwright.dev

В окне кода есть поле Target, позволяет переключать тип библиотек (playwright sync/async, pytest). Потом сохраняем полученную последовательность и устанавливаем нужные условия для проверки.

 

 

 

 

 

 

Тестирование Playwright

Ожидание

 

from playwright.sync_api import Page, expect

DOCS_URL = "https://playwright.dev/python/docs/intro"

def test_first(page: Page):
    page.goto("https://playwright.dev/python")
    link = page.get_by_role("link", name="GET STARTED")
    link.click()
    #assert page.url == DOCS_URL
    expect(page).to_have_url(DOCS_URL)
expect(page).to_have_url наличие url
expect(page).to_have_title наличие title
link = page.get_by_role("link", name="GET STARTEDer")
expect(link).to_be_visible()
Видимость элемента в переменной link
expect(link).to_be_enabled()
Доступный элемент
expect(heading).to_contain_text()
Присутствие текста (часть)
expect(heading).to_have_text()
Присутствие текста (полное совпадение)
expect(mylink).to_have_class()

Наличие класса у элемента Несколько классов: "class1 class2"

Должно быть полное соответствие. Но можно использовать регулярки. 

expect(mylink).to_have_class(
    re.compile(r"navbar__link")
    )
expect(mylink).to_have_id()

Наличие id

expect(mylink).to_have_attribute(attr_name, attr_value)

Наличие атрибута. При необходимости можно указать значение атрибута.

expect(mylink).to_be_editable()


expect(mylink).to_be_empty()


expect(mycheckbox).to_be_checked()


expect(mymenu).to_have_value()

Элемент в меню выбора

expect(mymultimenu).to_have_values([])

Несколько выбранных 

not_ - префикс отрицания

Asyncio

Asyncio

Сопрограммы

ЭТО ТЕХНОЛОГИЯ УСКОРЕНИЯ РАБОТЫ В ОДНОМ ПОТОКЕ. ДЛЯ ПАРАЛЛЕЛЬНЫХ ВЫЧИСЛЕНИЙ THREADING,

Сопрограммы - функции с возможностью приостановки при длительной внешней операции. Async определяет сопрограмму, await приостанавливает сопрограмму на время выполнения внешней задачи. При вызове сопрограммы она напрямую не выполняется. Для старта нужна точка входа в асинхронные вычисления.

Пример: правильное использование. 

import asyncio

async def corutine_add_one(a: int) -> int:
    return b + 1

corutine_res = asyncio.run(corutine_add_one(5))
print(type(corutine_res), '  ', corutine_res)

Однако если просто запустить корутину, то результат будет интересным: 

import asyncio

async def corutine_add_one(a: int) -> int:
    return b + 1

corutine_res = corutine_add_one(5)
print(type(corutine_res), '  ', corutine_res)

Вывод: 

<class 'coroutine'>    <coroutine object corutine_add_one at 0x0000026850FD4270>

Существует цикл событий. При постановке корутины на паузу задача передается следующей корутине. Однако такой код выполняется в виде простого синхронного кода: 

import asyncio

async def sleep_10(x: int) -> int:
    await asyncio.sleep(10)
    return x+10

async def main():
    x1 = await sleep_10(1)
    x2 = await sleep_10(1)
    print(x1, '  ', x2)

asyncio.run(main())

Задачи

Для приближения к параллельным вычислениям необходимо использовать задачи. 

import asyncio
from util import delay

async def main():
    sleep1 = asyncio.create_task(delay(10))
    sleep2 = asyncio.create_task(delay(10))
    await sleep1
    await sleep2

asyncio.run(main())

К тому же, если убрать await sleep2, то вторая задача все равно выполнится, т.к. вроде все задачи из очереди запускаются. Это так, вот только не говорится о том, завершаются они или нет. Вот пример кода: 

async def delay(delay_seconds: int) -> int:
    print(f'Засыпаю на {delay_seconds} секунд.')
    await asyncio.sleep(delay_seconds)
    print(f'сон в течение {delay_seconds} закончился.')
    return delay_seconds

async def delay_with_inner(delay_seconds: int) -> int:
    print(f'Засыпаю на {delay_seconds} секунд.')
    t1 = asyncio.create_task(delay(15))
    await asyncio.sleep(delay_seconds)
    #await t1 
    print(f'сон в течение {delay_seconds} закончился.')
    return delay_seconds

async def main():
    sleep1 = asyncio.create_task(delay_with_inner(10))
    sleep2 = asyncio.create_task(delay_with_inner(10))
    sleep3 = asyncio.create_task(delay_with_inner(10))
    await sleep1
    await sleep2
    await sleep3

asyncio.run(main())

Если внешнюю мы запускаем на 10 секунд, то внутренняя (t1) тоже запускается - т.к. находится в очереди задач. Но вот только программа завершается до завершения t1.

Засыпаю на 10 секунд.
Засыпаю на 10 секунд.
Засыпаю на 10 секунд.
Засыпаю на 15 секунд.
Засыпаю на 15 секунд.
Засыпаю на 15 секунд.
сон в течение 10 закончился.
сон в течение 10 закончился.
сон в течение 10 закончился.

И все! А если раскомментировать строку с ожиданием t1, то тогда сначала завершится t1 

Засыпаю на 10 секунд.
Засыпаю на 10 секунд.
Засыпаю на 10 секунд.
Засыпаю на 15 секунд.
Засыпаю на 15 секунд.
Засыпаю на 15 секунд.
сон в течение 15 закончился.
сон в течение 15 закончился.
сон в течение 15 закончился.
сон в течение 10 закончился.
сон в течение 10 закончился.
сон в течение 10 закончился.

А для такого кода будет создано 3 файла. Но если вместо sleep3 установить sleep1, то все задачи стартуют, но будет создан только файл 5.txt.

import asyncio
from util import delay_with_inner, delay, delay_and_writefile
from time import sleep

async def main():
    sleep1 = asyncio.create_task(delay_and_writefile(5))
    sleep2 = asyncio.create_task(delay_and_writefile(10))
    sleep3 = asyncio.create_task(delay_and_writefile(11))
    await sleep3

asyncio.run(main())

Исходя из этого, важна не последовательность помещения в задачи, а последовательность при обращении (момент await).

Снятие задач и тайм-ауты

import asyncio
from asyncio import CancelledError
from util import delay
async def main():
    long_task = asyncio.create_task(delay(10))
    seconds_elapsed = 0
    while not long_task.done():
        print('Задача не закончилась, следующая проверка через секунду.')
        await asyncio.sleep(1)
        seconds_elapsed = seconds_elapsed + 1
        if seconds_elapsed == 5:
            long_task.cancel()
    try:
        await long_task
    except CancelledError:
        print('Наша задача была снята')
asyncio.run(main())

 Тайм-аут: 

import asyncio
from util import delay
async def main():
    delay_task = asyncio.create_task(delay(2))
    try:
        result = await asyncio.wait_for(delay_task, timeout=1)
        print(result)
    except asyncio.exceptions.TimeoutError:
        print('Тайм-аут!')
        print(f'Задача была снята? {delay_task.cancelled()}')
asyncio.run(main())

При помощи shield можно игнорировать запросы на снятие: 

import asyncio
from util import delay
async def main():
    task = asyncio.create_task(delay(10))
    try:
    result = await asyncio.wait_for(asyncio.shield(task), 5)
        print(result)
    except TimeoutError:
        print("Задача заняла более 5 с, скоро она закончится!")
        result = await task
        print(result)
asyncio.run(main())

Список задач: 

for task in asyncio.tasks.all_tasks():
    print(task)

Множественные task в цикле.

Проблема: при создании в цикле при возникновении исключения в одной задаче, остальные задачи могут не завершить выполнение. 

Функция gather

import asyncio
import aiohttp
from aiohttp import ClientSession
from chapter_04 import fetch_status

async def main():
    async with aiohttp.ClientSession() as session:
        urls = ['https://example.com' for _ in range(1000)]
        requests = [fetch_status(session, url) for url in urls]
        status_codes = await asyncio.gather(*requests)
        print(status_codes)
asyncio.run(main())

Gather возвращает статусы в порядке передаче объектов, независимо от времени исполнения. Возвращает все данные одновременно.

Обработка исключений - необязательный bool параметр return_exceptions. 

async def main():
    async with aiohttp.ClientSession() as session:
        urls = ['https://example.com', 'python://example.com']
        tasks = [fetch_status_code(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        exceptions = [res for res in results if isinstance(res, Exception)]
        successful_results = [res for res in results if not isinstance(res, Exception)]
        print(f'Все результаты: {results}')
        print(f'Завершились успешно: {successful_results}')
        print(f'Завершились с исключением: {exceptions}')

Функция as_completed

Начинает возвращать данные по мере поступления. Однако порядок не определен.  

import asyncio
import aiohttp
from aiohttp import ClientSession
from chapter_04 import fetch_status

async def main():
    async with aiohttp.ClientSession() as session:
        fetchers = [fetch_status(session, 'https://www.example.com', 1),
                    fetch_status(session, 'https://www.example.com', 1),
                    fetch_status(session, 'https://www.example.com', 10)]
        for finished_task in asyncio.as_completed(fetchers):
            print(await finished_task)
asyncio.run(main())

asyncio.as_completed(fetchers, timeout=2) задает тайм-аут. Но созданные задачи продолжают работать в фоновом режиме.

Точный контроль над функциями

Используется wait: 

async def main():
    async with aiohttp.ClientSession() as session:
    fetchers = \
        [asyncio.create_task(fetch_status(session, 'https://example.com')),
        asyncio.create_task(fetch_status(session, 'https://example.com'))]
    done, pending = await asyncio.wait(fetchers)
    print(f'Число завершившихся задач: {len(done)}')
    print(f'Число ожидающих задач: {len(pending)}')
    for done_task in done:
        result = await done_task
        print(result)

Сигнатура  wait –  список  допускающих  ожидание  объектов, за которым следует факультативный тайм-аут и факультативный 
параметр return_when, который может принимать значения ALL_COMPLETED, FIRST_EXCEPTION и FIRST_COMPLETED, а по умолчанию равен ALL_COMPLETED.

wait не возбуждает исключений, исключения содержатся в методе done_task.exception()

Пример обработки исключений 

import asyncio
import logging

async def main():
    async with aiohttp.ClientSession() as session:
        good_request = fetch_status(session, 'https://www.example.com')
        bad_request = fetch_status(session, 'python://bad')
        fetchers = [asyncio.create_task(good_request),
                    asyncio.create_task(bad_request)]
        done, pending = await asyncio.wait(fetchers)
        print(f'Число завершившихся задач: {len(done)}')
        print(f'Число ожидающих задач: {len(pending)}')
        for done_task in done:
            # result = await done_task возбудит исключение
            if done_task.exception() is None:
                print(done_task.result())
            else:
                logging.error("При выполнении запроса возникло исключение",
                              exc_info=done_task.exception())
asyncio.run(main())


 

 

Asyncio

Типы future и awaitable

Практически редко применяются, но нужны для понимания 

Будущие объекты можно использовать в выражениях await. Это означает «я посплю, пока в будущем объекте не будет установлено значение, с которым я могу работать, а когда оно появится, разбуди меня и дай возможность его обработать». 

from asyncio import Future
my_future = Future()
print(f'my_future готов? {my_future.done()}')
my_future.set_result(42)
print(f'my_future готов? {my_future.done()}')
print(f'Какой результат хранится в my_future? {my_future.result()}')

Вариант для использования не до конца определенной переменной: 

from asyncio import Future
import asyncio
def make_request() -> Future:
    future = Future()
    asyncio.create_task(set_future_value(future))
    return future
async def set_future_value(future) -> None:
    await asyncio.sleep(1)
    future.set_result(42)
async def main():
    future = make_request()
    print(f'Будущий объект готов? {future.done()}')
    value = await future
    print(f'Будущий объект готов? {future.done()}')
    print(value)
asyncio.run(main())

Любой объект, который реализует метод __await__, можно использовать в выражении await. Это объекты, допускающие ожидание (Awaitaible).

 

 

Asyncio

Асинхронный контекстный менеджер

Асинхронный контекстный менеджер.

Это  класс,  реализующий два  специальных  метода-сопрограммы:  __aenter__,  который  асинхронно захватывает ресурс, и __aexit__, который закрывает ресурс. Сопрограмма  __aexit__  принимает  несколько  аргументов,  относящихся к обработке исключений. Пример для сокетов:

import asyncio
import socket
from types import TracebackType
from typing import Optional, Type
class ConnectedSocket:
    def __init__(self, server_socket):
        self._connection = None
        self._server_socket = server_socket
    async def __aenter__(self):
        print('Вход в контекстный менеджер, ожидание подключения')
        loop = asyncio.get_event_loop()
        connection, address = await loop.sock_accept(self._server_socket)
        self._connection = connection
        print('Подключение подтверждено')
        return self._connection
    async def __aexit__(self,
                        exc_type: Optional[Type[BaseException]],
                        exc_val: Optional[BaseException],
                        exc_tb: Optional[TracebackType]):
        print('Выход из контекстного менеджера')
        self._connection.close()
        print('Подключение закрыто')
async def main():
    loop = asyncio.get_event_loop()
    server_socket = socket.socket()
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_address = ('127.0.0.1', 8000)
    server_socket.setblocking(False)
    server_socket.bind(server_address)
    server_socket.listen()
    async with ConnectedSocket(server_socket) as connection:
        data = await loop.sock_recv(connection, 1024)
        print(data)
asyncio.run(main())

Asyncio

Aiohttp

Сеансовый асинхронный http(s) клиент с автоматической поддержкой cookies. Пул подключений использует один сеанс. 

Установка: 

pip install aiohttp

Использование: 

import asyncio
import aiohttp
from aiohttp import ClientSession
from util import async_timed

async def fetch_status(session: ClientSession, url: str) -> int:
    async with session.get(url) as result:
        return result.status

async def main():
    async with aiohttp.ClientSession() as session:
        url = 'https://www.example.com'
        status = await fetch_status(session, url)
        print(f'Состояние для {url} было равно {status}')
asyncio.run(main())

По умолчанию в сеансе не более 100 подключений. Для увеличения можно создать экземпляр класса TCPConnector, входящего в состав aiohttp, указав максимальное число подключений, и передать его конструктору ClientSession. Подробнее в документации aiohttp.

Тайм-аут:

По умолчанию тайм-аут запроса 5 минут. Можно устанавливать на уровне сеанса или запроса. 

import asyncio
import aiohttp
from aiohttp import ClientSession
async def fetch_status(session: ClientSession, url: str) -> int:
    ten_millis = aiohttp.ClientTimeout(total=.01)
    async with session.get(url, timeout=ten_millis) as result:
        return result.status

async def main():
    session_timeout = aiohttp.ClientTimeout(total=1, connect=.1)
    async with aiohttp.ClientSession(timeout=session_timeout) as session:
        await fetch_status(session, 'https://example.com')
asyncio.run(main())

В этом случае полный тайм-аут 1 секунда, для установки соединения - 100мс. В функции fetch_status переопределяется в 10мс.

Множественные запросы

import asyncio
import aiohttp
from aiohttp import ClientSession
from chapter_04 import fetch_status

async def main():
    async with aiohttp.ClientSession() as session:
        urls = ['https://example.com' for _ in range(1000)]
        requests = [fetch_status(session, url) for url in urls]
        status_codes = await asyncio.gather(*requests)
        print(status_codes)
asyncio.run(main())

Обработка ошибок

async def main():
    async with aiohttp.ClientSession() as session:
        urls = ['https://example.com', 'python://example.com']
        tasks = [fetch_status_code(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        exceptions = [res for res in results if isinstance(res, Exception)]
        successful_results = [res for res in results if not isinstance(res, Exception)]
        print(f'Все результаты: {results}')
        print(f'Завершились успешно: {successful_results}')
        print(f'Завершились с исключением: {exceptions}')