Python
- FastApi
- SQLAlchemy
- Alembic
- Модули
- Rabbitmq
- Авторизация через ВК
- Тестирование Playwright
- Начало
- Локаторы
- Actions
- События (Events)
- Аутентификация
- Pytest & Playwright
- Дополнительные возможности
- Ожидание
- Asyncio
FastApi
Общие команды
Установка
pip install fastapi uvicorn
Ручной запуск (api - имя файла, app - имя объекта FastApi)
uvicorn api:app --port 8000 --reload
Запуск uvicorn из python скрипта
Файл main.py
from uvicorn import run
...
app = FastAPI()
...
if __name__ == '__main__':
run(app="main:app", host='0.0.0.0', port=8000, workers=4, log_level='warning')
#run(app="main:app", host='0.0.0.0', port=8000, reload=True)
curl запросы
curl -X 'GET' 'http://127.0.0.1:8000/todo' -H 'accept: application/json'
curl -X 'POST' \
'http://127.0.0.1:8000/todo' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"id": 1,
"item": "First Todo is to finish this book!"
}'
Автоматическая документация
Swagger
http://ip:port/docs
Redoc
http://ip:port/redoc
Шаблоны Jinja
Поддерживает шаблоны Jinja при выводе данных (вплоть до циклов).
Маршрутизация
Добавление маршрутов
Основной файл:
from fastapi import FastAPI
from todo import todo_router
app = FastAPI()
@app.get("/")
async def welcome() -> dict:
return {
"message": "Hello World"
}
app.include_router(todo_router)
Файл дополнительных маршрутов
from fastapi import APIRouter
todo_router = APIRouter()
todo_list = []
@todo_router.post("/todo")
async def add_todo(todo: dict) -> dict:
todo_list.append(todo)
return {"message": "Todo added successfully"}
@todo_router.get("/todo")
async def retrieve_todos() -> dict:
return {"todos": todo_list}
Автоматическое добавление маршрутов в основной файл app из файлов в директории data/plugins имеющих шаблон имени объекта APIRouter modulename_router
fpath = os.path.join('data', 'plugins')
flist = os.listdir(fpath)
sys.path.insert(0, fpath)
for fname in flist:
if fname not in ['__pycache__', '__init__.py']:
m = os.path.splitext(fname)[0]
impmod = importlib.import_module(m)
router_name = f'{m}_router'
if router_name in dir(impmod):
router_mod = getattr(impmod, router_name)
app.include_router(router_mod)
Получаемые параметры
Параметры пути:
from fastapi import Path
@todo_router.get("/todo/{todo_id}")
async def get_single_todo(todo_id: int = Path(..., title="The ID of the todo to retrieve.")) -> dict:
for todo in todo_list:
if todo.id == todo_id:
return {"todo": todo}
return {
"message": "Todo with supplied ID doesn't exist."
}
В Path ... - параметр пути обязательный, None - не обязательный
Параметры запроса (после ? в запросе):
async query_route(query: str = Query(None):
return query
Передаваемые параметры
При помощи response_model можно фильтровать отдаваемые данные. Т е можно в отдаваемой модели указать неполный набор.
from typing import List
class TodoItem(BaseModel):
item: str
class TodoItems(BaseModel):
todos: List[TodoItem]
@todo_router.get("/todo", response_model=TodoItems)
async def retrieve_todo() -> dict:
return {
"todos": todo_list
}
У содержащихся в списке словарей будет оставлен только item
Исключения
Класс HTTPException принимает три аргумента:
- status_code: Код состояния, который будет возвращен для этого сбоя
- detail: Сопроводительное сообщение для отправки клиенту
- headers: Необязательный параметр для ответов, требующих заголовков
from fastapi import APIRouter, Path, HTTPException, status
@todo_router.get("/todo/{todo_id}")
async def get_single_todo(todo_id: int = Path(..., title="The ID of the todo to retrieve.")) -> dict:
for todo in todo_list:
if todo.id == todo_id:
return { "todo": todo }
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Todo with supplied ID doesn't exist",
)
Можно переопределить код успешного возврата в декораторе
@todo_router.post("/todo", status_code=201)
async def add_todo(todo: Todo) -> dict:
todo_list.append(todo)
return { "message": "Todo added successfully." }
Pydantic
class Item(BaseModel):
item: str
status: str
class Todo(BaseModel):
id: int
item: Item
Jinja2
{# This is a great API book! #} комментарии
Способ | Описание |
{% include %} | Позволяет включить содержимое другого шаблона целиком.
|
{% extends %} + {% block %} |
Наследование шаблонов и переопределение блоков. Базовый шаблон
Дочерний шаблон
|
{{ variable | filter_name(*args) }}
Название | Описание |
default(strdefault) |
Замена вывода переданного значения, если оно оказывается None
|
escape | Отображение необработанного вывода HTML |
striptags | Удаление HTML тетов перед отправкой |
int float |
Преобразование типов перед ответом
|
join(whitespace) | Объединение элементов списка в строку
|
length | Длина переданного объекта
|
{% if user %}
Hello, {{ user.name }}!
{% else %}
Hello, Unknown!
{% endif %}
{% for comment in comments %}
<b>{{ comment }}</b>
{% endfor %}
Переменная | Описание |
loop.index | Текущее значение итерации (1 - первая итерация) |
loop.index0 | Текущее значение итерации (0 - первая итерация) |
loop.revindex loop.revindex0 | Кол-во оставшихся итераций |
loop.first | True если первая итерация |
loop.last | |
loop.length | |
loop.pervitem loop.nextitem | Значение предыдущей/следующей итерации (пусто если не существует) |
Авторизация и аутентификация
Ссылки:
Подготовка проекта
python -m venv --system-site-packages env
python -m pip install fastapi uvicorn
SQLAlchemy
Sqlalchemy
pip install sqlalchemy
pip install psycopg2
pip install psycopg2-binary
pip install pymysql
from sqlalchemy import create_engine
engine = create_engine('postgresql+psycopg2://username:password@localhost:5432/mydb')
engine = create_engine('mysql+pymysql://cookiem:chip@mysql01.com/cookies', pool_recycle=3600)
engine = create_engine('sqlite:///cookies.db')
engine2 = create_engine('sqlite:///:memory:')
echo | булево. Лог запросов. По-умолчанию False. |
encoding | строка. По-умолчанию utf-8 |
isolation_level | уровень изоляции |
pool_recycle | число секунд для переподключения, желательно выставлять 3600. При работе с mysql соединение может быть активным до 4 часов. |
Создание engine не создает фактического соединения с БД.
connection = engine.connect()
Сырой запрос:
result = connection.execute("select * from orders").fetchall()
- Может получать информацию о существующих сейчас сущностях в БД
- Может хранить шаблоны именования индексов и ограничений (constraint). Т е перед началом проекта добавляется настройка именования, затем при создании/удалении все ок. Это словарь ограничение:шаблон
Ограничение или индекс | Описание |
ix | обычный индекс |
uq | уникальный индекс |
ck | ограничение проверки |
fk | foreign-ключ |
pk | primary-ключ |
from sqlalchemy import MetaData
convention = {
'all_column_names': lambda constraint, table: '_'.join([
column.name for column in constraint.columns.values()
]),
'ix': 'ix__%(table_name)s__%(all_column_names)s',
'uq': 'uq__%(table_name)s__%(all_column_names)s',
'ck': 'ck__%(table_name)s__%(all_column_names)s',
'fk': ('fk__%(table_name)s__%(all_column_names)s' '%(referred_table_name)s'),
'pk': 'pk__%(table_name)s'
}
metadata_obj = MetaData(naming_convention=convention)
- Должен быть инициализирован до обращения к нему в таблицах
- Изначально пустой объект, можно или вручную занести данные, или получить из базы.
- Получение информации об одной таблице по имени
- Нельзя получить одновременно две таблицы. Либо одна, либо вся база
- Нельзя (и одна таблица, и база) получить ограничения (CONSTRAINT), комментарии, триггеры, значения по умолчанию. Но можно вручную добавить данные. Но похоже проще импортировать описание.
from sqlalchemy import ForeignKeyConstraint
album.append_constraint(ForeignKeyConstraint(['ArtistId'], ['artist.ArtistId']))
-
- при получении базы, имена таблиц с большой и маленькой буквы. Т е удваивается количество объектов.
engine = create_engine(...)
metadata = MetaData()
cookie_tbl = Table('cookies', metadata, autoload_with=engine)
s = select(cookie_tbl)
with engine.connect() as conn:
m = conn.execute(s)
print(m.keys())
- Получение информации обо всей базе
from sqlalchemy import create_engine, MetaData
engine = create_engine(...)
metadata = MetaData()
metadata.reflect(bind=engine)
for table in metadata.sorted_tables:
print(table.name)
#Потом получить объект таблицы:
mytable = metadata.tables['mytable']
- Получение информации обо всей базе через ORM + Automap
from sqlalchemy.ext.automap import automap_base
from sqlalchemy import create_engine
Base = automap_base()
engine = create_engine('sqlite:///Chinook_Sqlite.sqlite')
Base.prepare(engine, reflect=True)
# данные о классах загружены.
#Например для получения списка классов:
Base.classes.keys()
Artist = Base.classes.Artist # создание классов
#Внешние связи - в свойстве <related_object>_collection
artist = session.query(Artist).first()
for album in artist.album_collection:
print('{} - {}'.format(artist.Name, album.Title))
Dialect: Скрывает детали реализации в конкретной базе
Core: SQL в чистом виде
ORM: абстракции
ins = cookies.insert().values(
cookie_name="chocolate chip",
cookie_recipe_url="http://some.aweso.me/cookie/recipe.html",
cookie_sku="CC01",
quantity="12",
unit_cost="0.50"
)
result = connection.execute(ins)
print(result.inserted_primary_key)
from sqlalchemy import insert
ins = insert(cookies).values(
cookie_name="chocolate chip",
cookie_recipe_url="http://some.aweso.me/cookie/recipe.html",
cookie_sku="CC01",
quantity="12",
unit_cost="0.50"
)
ins = cookies.insert()
result = connection.execute(
ins,
cookie_name='dark chocolate chip',
cookie_recipe_url='http://some.aweso.me/cookie/recipe_dark.html',
cookie_sku='CC02',
quantity='1',
unit_cost='0.75'
)
result.inserted_primary_key
inventory_list = [
{
'cookie_name': 'peanut butter',
'cookie_recipe_url': 'http://some.aweso.me/cookie/peanut.html',
'cookie_sku': 'PB01',
'quantity': '24',
'unit_cost': '0.25'
},
{
'cookie_name': 'oatmeal raisin',
'cookie_recipe_url': 'http://some.okay.me/cookie/raisin.html',
'cookie_sku': 'EWW01',
'quantity': '100',
'unit_cost': '1.00'
}
]
result = connection.execute(ins, inventory_list)
from sqlalchemy.sql import select
s = select([cookies])
rp = connection.execute(s)
results = rp.fetchall()
s = cookies.select()
rp = connection.execute(s)
results = rp.fetchall()
s = cookies.select()
rp = connection.execute(s)
for record in rp:
print(record.cookie_name)
s = select([cookies.c.cookie_name, cookies.c.quantity])
s = select([cookies.c.cookie_name, cookies.c.quantity])
s = s.order_by(cookies.c.quantity)
s = s.order_by(desc(cookies.c.quantity))
rp = connection.execute(s)
s = select([cookies.c.cookie_name, cookies.c.quantity])
s = s.order_by(cookies.c.quantity)
s = s.limit(2)
rp = connection.execute(s)
from sqlalchemy.sql import func
s = select([func.sum(cookies.c.quantity)])
rp = connection.execute(s)
print(rp.scalar())
s = select([func.count(cookies.c.cookie_name)])
rp = connection.execute(s)
record = rp.first()
print(record.keys()) # ключи могут быть разные.
print(record.count_1)
s = select([func.count(cookies.c.cookie_name).label('inventory_count')])
rp = connection.execute(s)
record = rp.first()
print(record.keys())
print(record.inventory_count)
s = select([cookies]).where(cookies.c.cookie_name == 'chocolate chip')
rp = connection.execute(s)
- between(cleft, cright) Значение столбца между cleft и cright
- concat(column_two) Объединение column и column_two
- distinct() Находит только уникальные значения в столбце
- in_([list]) Только если значения столбца в списке
- is_(None) Проверка на пустые значения
- contains(string) Значение столбца содержит строку
- endswith(string) Оканчивается строкой, зависит от больших букв
- like(string) зависит от больших букв
- startswith(string) зависит от больших букв
- ilike(string) не зависит от больших букв
- Есть отрицательные модификаторы not<method>, исключение - метод isnot()
s = select([cookies]).where(cookies.c.cookie_name.like('%chocolate%'))
rp = connection.execute(s)
for record in rp.fetchall():
print(record.cookie_name)
s = select([cookies.c.cookie_name, 'SKU-' + cookies.c.cookie_sku]) - добавит строку 'SKU-'
from sqlalchemy import cast
s = select([cookies.c.cookie_name,
cast((cookies.c.quantity * cookies.c.unit_cost),
Numeric(12,2)).label('inv_cost')])
for row in connection.execute(s):
print('{} - {}'.format(row.cookie_name, row.inv_cost))
from sqlalchemy import and_, or_, not_
s = select([cookies]).where(
and_(
cookies.c.quantity > 23,
cookies.c.unit_cost < 0.40
)
)
first_row = results[0]
first_row[1]
first_row.cookie_name
first_row[cookies.c.cookie_name]
- first() - Первая запись и закрытие соединения
- fetchone() - Одна запись и оставляет открытый курсор для последующих запросов
- scalar() - Одно значение если запрос возвращает одно значение в одной строке
- rp.keys() - список столбцов
from sqlalchemy import update
u = update(cookies).where(cookies.c.cookie_name == "chocolate chip")
u = u.values(quantity=(cookies.c.quantity + 120))
result = connection.execute(u)
from sqlalchemy import delete
u = delete(cookies).where(cookies.c.cookie_name == "dark chocolate chip")
result = connection.execute(u)
columns = [orders.c.order_id, users.c.username, users.c.phone,
cookies.c.cookie_name, line_items.c.quantity,
line_items.c.extended_cost]
cookiemon_orders = select(columns)
cookiemon_orders = cookiemon_orders.select_from(orders.join(users).join(
line_items).join(cookies)).where(users.c.username ==
'cookiemon')
result = connection.execute(cookiemon_orders).fetchall()
columns = [users.c.username, func.count(orders.c.order_id)]
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders))
all_orders = all_orders.group_by(users.c.username)
result = connection.execute(all_orders).fetchall()
columns = [users.c.username, func.count(orders.c.order_id)]
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders))
all_orders = all_orders.group_by(users.c.username)
result = connection.execute(all_orders).fetchall()
def get_orders_by_customer(cust_name, shipped=None, details=False):
columns = [orders.c.order_id, users.c.username, users.c.phone]
joins = users.join(orders)
if details:
columns.extend([cookies.c.cookie_name, line_items.c.quantity,
line_items.c.extended_cost])
joins = joins.join(line_items).join(cookies)
cust_orders = select(columns)
cust_orders = cust_orders.select_from(joins)
cust_orders = cust_orders.where(users.c.username == cust_name)
if shipped is not None:
cust_orders = cust_orders.where(orders.c.shipped == shipped)
result = connection.execute(cust_orders).fetchall()
return result
get_orders_by_customer('cakeeater')
get_orders_by_customer('cakeeater', details=True)
get_orders_by_customer('cakeeater', shipped=True)
get_orders_by_customer('cakeeater', shipped=False)
get_orders_by_customer('cakeeater', shipped=False, details=True)
ORM режим
Таблица это класс с требованиями:
- Потомок объекта, возвращаемого функцией declarative_base
- Включает __tablename__ с именем таблицы
- Включает 1+ атрибутов, являющихся объектом Column
- При определении не включает имя столбца в конструкторе Column, имя столбца = имя атрибута
- 1+ атрибутов определяют первичный ключ
- __table_args__ свойства таблицы (ограничения,...)
from sqlalchemy import Table, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Cookie(Base):
__tablename__ = 'cookies'
__table_args__ = (CheckConstraint('quantity >= 0', name='quantity_positive'),)
cookie_id = Column(Integer(), primary_key=True)
cookie_name = Column(String(50), index=True)
quantity = Column(Integer())
Создание таблиц
from sqlalchemy import create_engine
from dataclasses import Base
engine = create_engine(...)
Base.metadata.create_all(engine)
Ограничения
__table_args__ = (ForeignKeyConstraint(['id'], ['other_table.id']), CheckConstraint(unit_cost >= 0.00', name='unit_cost_positive'))
Внешние связи
- Определяется столбец с ForeignKey
- Определяется дополнительный атрибут с relationship и необязательным backref
- При определении backref, relationship будет определен в указанном классе с указанным именем.
Один к одному:
cookie = relationship("Cookie", uselist=False)
Один ко многим:
user = relationship("User", backref=backref('orders'))
На себя (дерево) - неоднозначное решение.
class Employee(Base):
__tablename__ = 'employees'
id = Column(Integer(), primary_key=True)
manager_id = Column(Integer(), ForeignKey('employees.id'))
name = Column(String(255), nullable=False)
manager = relationship("Employee", backref=backref('reports'), remote_side=[id])
Сессии
В ORM режиме, сессия упаковывает
- соединение с БД через engine и предоставляет словарь объектов, загруженных через сессию или ассоциированных с сессией
- транзакции, которые открыты до коммита сессии
Это похожая на хэш-систему, состоящую из списка объектов, таблиц и ключей. Сессия создается через sessionmaker, один раз. Соединяется с базой в момент необходимости
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine('sqlite:///:memory:')
Session = sessionmaker(bind=engine)
session = Session()
Состояния объекта в сессии:
Transient | Объект не в сессии и не в БД |
Pending | Объект добавлен в сессию add(), но не flush или commit |
Persistent | Объект в сессии имеет связанную запись в БД |
Detached | Объект больше не в сессии, но в БД есть соответствующая запись |
Modified | Объект изменен |
Просмотр состояния:
from sqlalchemy import inspect
insp = inspect(cc_cookie)
Отключение объекта от сессии:
session.expunge(cc_cookie)
Просмотр списка атрибутов и выяснение, что было модифицировано
for attr, attr_state in insp.attrs.items():
if attr_state.history.has_changes():
print('{}: {}'.format(attr, attr_state.value))
print('History: {}\n'.format(attr_state.history))
Добавление данных
Создаем объект класса, добавляем в сессию и коммитим. Множественное добавление данных:
dcc = Cookie(...)
mcc = Cookie(...)
session.add(dcc)
session.add(mcc)
session.flush()
flush: не выполняет коммит и не завершает транзакцию, но получает id. Потом нужно в пределах сессии сделать commit. Commit в пределах чужой сессии не влияет. Дальше можно использовать объект.
Если дальше не нужно выполнять операции с объектами:
session.bulk_save_objects([dcc,mcc])
session.commit()
Получение данных
all() | все |
first() | возвращает одну запись если она единственная и закрывает соединение |
one() | возвращает одну запись и оставляет соединение. Аккуратно! |
scalar() | возвращает одно значение если результат запроса одна строка с одним столбцом |
cookies = session.query(Cookie).all()
print(cookies)
Если использовать через итератор, то без all:
for cookie in session.query(Cookie):
print(cookie)
Получение определенных столбцов:
cookies = session.query(Cookie.cookie_id).all()
Сортировка:
session.query(Cookie).order_by(Cookie.quantity).all()
.order_by(desc(Cookie.quantity))
Ограничения через срезы или .limit(2)
Встроенные функции:
from sqlalchemy import func
inv_count = session.query(func.sum(Cookie.quantity)).scalar()
print(inv_count)
rec_count = session.query(func.count(Cookie.cookie_name)).first()
Метки: можно добавить для дальнейшего обращения
rec_count = session.query(func.count(Cookie.cookie_name).label('inventory_count')).first()
print(rec_count.inventory_count)
Фильтрация
record = session.query(Cookie).filter(cookie_name=='chocolate chip').first()
record = session.query(Cookie).filter_by(cookie_name='chocolate chip').first()
query = session.query(Cookie).filter(Cookie.cookie_name.like('%chocolate%'))
query = session.query(Cookie).filter(Cookie.quantity > 23, Cookie.unit_cost < 0.40)
Изменение строк при выводе
results = session.query(Cookie.cookie_name, 'SKU-' + Cookie.cookie_sku).all()
query = session.query(Cookie.cookie_name,
cast((Cookie.quantity * Cookie.unit_cost),
Numeric(12,2)).label('inv_cost'))
Join:
query = session.query(Order.order_id, User.username)
results = query.join(User).all()
query = session.query(User.username, func.count(Order.order_id))
query = query.outerjoin(Order).group_by(User.username)
Group:
query = session.query(User.username, func.count(Order.order_id))
query = query.outerjoin(Order).group_by(User.username)
Сырые запросы
from sqlalchemy import text
query = session.query(User).filter(text("username='cookiemon'"))
Обновление данных
Через объект
query = session.query(Cookie)
cc_cookie = query.filter(Cookie.cookie_name == "chocolate chip").first()
cc_cookie.quantity = cc_cookie.quantity + 120
session.commit()
Через метод update
query = session.query(Cookie)
query = query.filter(Cookie.cookie_name == "chocolate chip")
query.update({Cookie.quantity: Cookie.quantity - 20})
Удаление
session.delete(dcc_cookie)
session.commit()
Исключения
from sqlalchemy.orm.exc import MultipleResultsFound
try:
results = session.query(Cookie).one()
except MultipleResultsFound as error:
print('We found too many cookies... is that even possible?')
Транзакции
В ORM транзакция создается автоматически до очередного коммита.
session.add(order)
try:
session.commit()
except IntegrityError as error:
session.rollback()
Пример структурирования
db.py
from datetime import datetime
from sqlalchemy import (MetaData, Table, Column, Integer, Numeric, String,
DateTime, ForeignKey, Boolean, create_engine)
class DataAccessLayer:
connection = None
engine = None
conn_string = None
metadata = MetaData()
cookies = Table('cookies',
metadata,
Column('cookie_id', Integer(), primary_key=True),
Column('cookie_name', String(50), index=True),
Column('cookie_recipe_url', String(255)),
Column('cookie_sku', String(55)),
Column('quantity', Integer()),
Column('unit_cost', Numeric(12, 2))
)
def db_init(self, conn_string):
self.engine = create_engine(conn_string or self.conn_string)
self.metadata.create_all(self.engine)
self.connection = self.engine.connect()
dal = DataAccessLayer()
app.py
from db import dal
from sqlalchemy.sql import select
def get_orders_by_customer(cust_name, shipped=None, details=False):
columns = [dal.orders.c.order_id, dal.users.c.username, dal.users.c.phone]
joins = dal.users.join(dal.orders)
if details:
columns.extend([dal.cookies.c.cookie_name,
dal.line_items.c.quantity,
dal.line_items.c.extended_cost])
joins = joins.join(dal.line_items).join(dal.cookies)
cust_orders = select(columns)
cust_orders = cust_orders.select_from(joins).where(
dal.users.c.username == cust_name)
if shipped is not None:
cust_orders = cust_orders.where(dal.orders.c.shipped == shipped)
return dal.connection.execute(cust_orders).fetchall()
test.py
import unittest
class TestApp(unittest.TestCase):
@classmethod
def setUpClass(cls):
dal.db_init('sqlite:///:memory:')
def test_one(self):
res = get_orders_by_customer('', False)
self.assert_equal(res, [])
Core режим
Сначала необходимо определить, как данные хранятся в таблице. Варианты определения:
- Объект Table
- Декларативный класс
- Получение структуры из базы данных
Сопоставление типов
SQLAlchemy | Python | SQL |
BigInteger | int | BIGINT |
Boolean | bool | BOOLEAN or SMALLINT |
Date | datetime.date | DATE (SQLite: STRING) |
DateTime | datetime.datetime | DATETIME (SQLite: STRING) |
Time | datetime.time | DATETIME |
Enum | str | ENUM or VARCHAR |
Float | float or Decimal | FLOAT or REAL |
Integer | int | INTEGER |
Interval | datetime.timedelta | INTERVAL or DATE from epoch |
LargeBinary | byte | BLOB or BYTEA |
Numeric | decimal.Decimal | NUMERIC or DECIMAL |
Unicode | unicode | UNICODE or VARCHAR |
Text | str | CLOB or TEXT |
Metadata
Каталог объектов Table с опциональной информацией о engine и соединении.
from sqlalchemy import MetaData
metadata = MetaData()
Создание таблицы
metadata.create_all(engine)
Метод ...create_all не пересоздает таблицы.
Объект таблицы состоит из названия, переменной метаданных и столбцов.
from sqlalchemy import Table, Column, Integer, Numeric, String, ForeignKey
from datetime import datetime
from sqlalchemy import DateTime
cookies = Table('cookies', metadata,
Column('cookie_id', Integer(), primary_key=True),
Column('cookie_name', String(50), index=True),
Column('cookie_recipe_url', String(255)),
Column('cookie_sku', String(55)),
Column('quantity', Integer()),
Column('unit_cost', Numeric(12, 2))
)
users = Table('users', metadata,
Column('user_id', Integer(), primary_key=True),
Column('username', String(15), nullable=False, unique=True),
Column('email_address', String(255), nullable=False),
Column('phone', String(20), nullable=False),
Column('password', String(25), nullable=False),
Column('created_on', DateTime(), default=datetime.now),
Column('updated_on', DateTime(), default=datetime.now, onupdate=datetime.now)
)
Класс Column
- название столбца
- тип данных
- в String обязательно указывать длину
- Numeric(11,2) означает 11
- доп. параметры
- тип данных
primary_key=True
index=True
nullable=False
unique=True
default=datetime.now
onupdate=datetime.now
Ключи, ограничения и индексы
Могут быть определены в конструкторе столбца (primary_key=True) или позже в конструкторе таблицы.
from sqlalchemy import PrimaryKeyConstraint, UniqueConstraint, CheckConstraint
users = Table(...
PrimaryKeyConstraint('user_id', name='user_pk'),
UniqueConstraint('username', name='uix_username'),
CheckConstraint('unit_cost >= 0.00', name='unit_cost_positive'),
...)
Множественные ключи перечисляются через запятую.
from sqlalchemy import Index
Index('ix_cookies_cookie_name', 'cookie_name')
Index('ix_test', mytable.c.cookie_sku, mytable.c.cookie_name)
Внешние связи
from sqlalchemy import ForeignKey
orders = Table('orders', metadata,
Column('order_id', Integer(), primary_key=True),
Column('user_id', ForeignKey('users.user_id')),
Column('shipped', Boolean(), default=False)
)
line_items = Table('line_items', metadata,
Column('line_items_id', Integer(), primary_key=True),
Column('order_id', ForeignKey('orders.order_id')),
Column('cookie_id', ForeignKey('cookies.cookie_id')),
Column('quantity', Integer()),
Column('extended_cost', Numeric(12, 2))
)
Связь для поля order_id:
Column('user_id', ForeignKey('users.user_id'))
#При желании - ограничение
ForeignKeyConstraint(['order_id'], ['orders.order_id'])
Добавление данных
from sqlalchemy import insert
перем = таблица.insert().values()
Лучше (?) вариант
from sqlalchemy import insert
перем = insert(таблица).values()
Строковое представление запроса
str(перем)
Компиляция запроса
перем.compile()
перем.compile().params
Примеры
with engine.connect() as connection:
metadata = ...
cookies = Table...
ins = insert(cookies).values(...)
res = connection.execute(ins)
#res.inserted_primary_key - какой в будущем будет ключ (сейчас фактически в БД нет данных)
connection.commit()
ins = cookies.insert()
inventory_list = [
{
'cookie_name': 'peanut butter',
'cookie_recipe_url': 'http://some.aweso.me/cookie/peanut.html',
},
{
'cookie_name': 'oatmeal raisin',
'cookie_recipe_url': 'http://some.okay.me/cookie/raisin.html',
}
]
result = connection.execute(ins, inventory_list)
from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy import Table, Column, Integer, Numeric, String, ForeignKey
metadata = MetaData()
cookies = Table('cookies', metadata,
Column('cookie_id', Integer(), primary_key=True),
Column('cookie_name', String(50), index=True),
Column('cookie_recipe_url', String(255))
)
engine = create_engine('sqlite:///:memory:')
connection = engine.connect()
metadata.create_all(engine)
from sqlalchemy import insert
ins = cookies.insert().values(
cookie_name="chocolate chip",
cookie_recipe_url="http://some.aweso.me/cookie/recipe.html"
)
print(str(ins))
Получение данных
from sqlalchemy.sql import select
s = select(cookies)
rp = connection.execute(s)
Список столбцов
rp.keys()
Получение результата
results = rp.fetchall()
fetchall() | Все записи |
first() | Возвращает одну запись если она единственная и закрывает соединение |
fetchone() | Возвращает одну запись и оставляет соединение. Аккуратно! |
scalar() | Возвращает одно значение если результат запроса одна строка с одним столбцом |
Доступ возможен по:
first_row = results[0] | по индексу результата |
first_row[1] | по номеру столбца в результате |
first_row.cookie_name | по имени столбца |
first_row[cookies.c.cookie_name] | через объект таблицы |
Сортировка
s = select(cookies.c.cookie_name, cookies.c.quantity)
s = s.order_by(cookies.c.quantity)
s = s.order_by(desc(cookies.c.quantity))
Ограничения количества
s = s.limit(2)
Встроенные функции
from sqlalchemy.sql import func
s = select([func.sum(cookies.c.quantity)])
rp = connection.execute(s)
print(rp.scalar())
Фильтрация
s = select([cookies]).where(cookies.c.cookie_name == 'chocolate chip')
Оператор | Описание |
== | Точное равенство |
like('%chocolate%') | Вхождение элемента (регистрозависимый) |
ilike(string) | Вхождение элемента |
between(cleft, cright) | Элемент между значениями |
concat(column_two) | Объединение столбцов |
distinct() | Только уникальные значения столбца |
in_([list]) | Значения столбца в списке |
is_(None) | Значение в столбце None |
contains(string) | Содержит в себе строку (регистрозависимый) |
endswith(string) | Заканчивается строкой (регистрозависимый) |
startswith(string) | Начинается строкой (регистрозависимый) |
notin_() | Отрицание |
isnot() | Исключение |
Внутри where можно использовать and_, or_, not_
from sqlalchemy import and_, or_, not_
s = select([cookies]).where(
and_(
cookies.c.quantity > 23,
cookies.c.unit_cost < 0.40
)
)
Join
columns = [orders.c.order_id, users.c.username, users.c.phone,
cookies.c.cookie_name, line_items.c.quantity, line_items.c.extended_cost]
cookiemon_orders = select(*columns)
cookiemon_orders = cookiemon_orders.select_from(orders.join(users).join(line_items).join(cookies)).where(users.c.username == 'cookiemon')
result = connection.execute(cookiemon_orders).fetchall()
for row in result:
print(row)
Для outerjoin: join -> outerjoin
Алиасы
manager = employee_table.alias('mgr')
Grouping:
columns = [users.c.username, func.count(orders.c.order_id)]
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders))
all_orders = all_orders.group_by(users.c.username)
Обновление данных:
from sqlalchemy import update
u = update(cookies).where(cookies.c.cookie_name == "chocolate chip")
u = u.values(quantity=(cookies.c.quantity + 120))
result = connection.execute(u)
Удаление данных:
from sqlalchemy import delete
u = delete(cookies).where(cookies.c.cookie_name == "dark chocolate chip")
result = connection.execute(u)
Сырые запросы (raw)
result = connection.execute("select * from orders").fetchall()
Обработка исключений
AttributeError - ошибка набора данных
IntegrityError - ошибка ограничений
Стандартная обработка подходит если исполняется один независимый запрос.
try:
result = connection.execute(ins)
except IntegrityError as error:
print(error.orig.message, error.params)
В случае нескольких взаимозависимых запросов необходимо использовать транзакции.
transaction = connection.begin()
try:
...
transaction.commit()
except IntegrityError as error:
transaction.rollback()
Пример:
transaction = connection.begin()
cookies_to_ship = connection.execute(s).fetchall()
try:
for cookie in cookies_to_ship:
u = update(cookies).where(cookies.c.cookie_id == cookie.cookie_id)
u = u.values(quantity = cookies.c.quantity-cookie.quantity)
result = connection.execute(u)
u = update(orders).where(orders.c.order_id == order_id)
u = u.values(shipped=True)
result = connection.execute(u)
print("Shipped order ID: {}".format(order_id))
transaction.commit()
except IntegrityError as error:
transaction.rollback()
Пример проекта
Структура проекта
Директория / файл | Описание |
alembic/ |
Настройки alembic |
conf/ | Настройки окружений. |
conf/settings |
Файлы основных настроек. |
db/ |
Описание структуры базы данных. initializer.py - Инициализация базы данных, метаданных |
db/tablesdefinition |
Файлы описания структур таблиц и методов взаимодействия с данными. |
docker/ | Настройки контейнера |
docker/data | Данные БД |
docker/docker-entrypoint-initdb.d |
Скрипты инициализации БД main.sql - Файл скрипта иницализации |
docker/docker-compose.yml | Compose файл |
src/ | Дополнительные модули |
main.py | Точка входа |
error.log |
Файл лога. |
Предварительная настройка
Для работы примера необходимо установить docker.
Клонировать проекта с репозитория
git clone https://gitverse.ru/bobrobot/alembictemplate.git
Перейти в директорию проекта, создать виртуальное окружение и активировать
cd alembictemplate
python3 -m venv env
source env/bin/activate
Установить дополнительные модули
pip install -r requirements.txt
Перейти в директорию docker и в файле docker-compose.yml настроить пути, имя БД, логин и пароль к новой базе данных.
services:
postgres:
image: postgres:latest
environment:
POSTGRES_DB: "learnsqlalchemy"
POSTGRES_USER: "learner"
POSTGRES_PASSWORD: "StrongPassword123"
PGDATA: "/home/sergey/projects/alembictemplate/docker/data/pgdata"
volumes:
- .:/docker-entrypoint-initdb.d
- mydata:/home/sergey/projects/alembictemplate/docker/data
ports:
- "5430:5432"
volumes:
mydata:
В файле docker-entrypoint-initdb.d/main.sql изменить имя БД, логин и пароль.
CREATE DATABASE learnsqlalchemy;
CREATE USER learner WITH PASSWORD 'StrongPassword123';
ALTER ROLE learner WITH PASSWORD 'StrongPassword123';
GRANT ALL PRIVILEGES ON DATABASE learnsqlalchemy to learner;
В директории docker запустить контейнер БД в фоновом режиме.
docker compose up -d
Для остановки контейнера:
docker compose stop
Сейчас, запустив контейнер, при помощи консольного клиента psql можно проверить соединение с базой данных для пользователя.
psql -d learnsqlalchemy -U learner -W -h 127.0.0.1 -p 5430
Для работы с настройками в формате json используется библиотека src/libsettings.py Описание библиотеки Создать папку src, скопировать из проекта библиотеку libsettings.py
Настройки системы
Файлы основных настроек расположены в conf/settings/ Файл base.py
'''Loading settings to project'''
import os
from src.libsettings import Jsettings
settingspath = os.path.join('conf', 'settings', 'settings.json')
schemapath = os.path.join('conf', 'settings', 'schema.json')
# dev settings
mysettings = Jsettings(settingsfname= settingspath,
schemafname=schemapath)
mysettings.load_settings()
Файл schema.json
{
"type": "object",
"properties": {
"db_username": {"type": "string"},
"db_password": {"type": "string"},
"db_host": {"type": "string"},
"db_port": {"type": "string"},
"db_name": {"type": "string"}
},
"required": ["db_username", "db_password", "db_host",
"db_port", "db_name"]
}
Файл settings.json
{
"db_username": "learner",
"db_password": "StrongPassword123",
"db_host": "127.0.0.1",
"db_port": "5430",
"db_name": "learnsqlalchemy"
}
Файл base.py проверяет схему и создает объект настроек mysettings из файла settings.json. Для получения объекта настроек нужно импортировать объект mysettings в нужном модуле. В данный момент присутствуют настройки базы данных с префиксом db_*
Если такая усложненная система управления настройками покажется излишней, возможно использовать экспорт настроек напрямую из файла.
Инициализация базы данных
Создаем папку db, в ней создаем файл initializer.py
'''Db classes and initialization'''
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base
from conf.settings.base import mysettings
#load engine settings
engine = create_engine(
"postgresql+psycopg2://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}".format(
db_username=mysettings.db_username,
db_password=mysettings.db_password,
db_host=mysettings.db_host,
db_port=mysettings.db_port,
db_name=mysettings.db_name
),
echo=True)
Base = declarative_base()
Здесь только создается engine для подключения к БД и класс Base. При настройке структуры таблиц данный файл будет обновлен, сейчас нужен только класс Base.
Настройка системы версионирования базы данных alembic
Инициализируем alembic в корне проекта.
alembic init alembic
В корне проекта будет создан файл alembic.ini, будет создана папка alembic с файлами инициализации. В большинстве инструкций параметры подключения задаются в файле alembic.ini однако, для доступа к настройкам из единой точки будет изпользоваться способ установки параметров в файле env.py Поэтому в файле alembic.ini переменная sqlalchemy.url должна быть закомментирована. Часть файла alembic.ini
#sqlalchemy.url = driver://user:pass@localhost/dbname
В файле env.py
- импортируем путь
import os
import sys
sys.path.append(os.getcwd())
Импортируем настройки, создаем строку соединения и создаем закоментированный ранее в файле alembic.ini параметр sqlalchemy.url
from conf.settings.base import mysettings
connstring = "postgresql+psycopg2://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}".format(
db_username=mysettings.db_username,
db_password=mysettings.db_password,
db_host=mysettings.db_host,
db_port=mysettings.db_port,
db_name=mysettings.db_name
)
config.set_main_option(name="sqlalchemy.url", value=connstring)
Импортируем db.initializer и создаем метаданные
import db.initializer
target_metadata = db.initializer.Base.metadata
Остальные параметры оставляем неизменными. Результирующий файл настроек окружения alembic env.py:
from logging.config import fileConfig
import os
import sys
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from conf.settings.base import mysettings
sys.path.append(os.getcwd())
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
connstring = "postgresql+psycopg2://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}".format(
db_username=mysettings.db_username,
db_password=mysettings.db_password,
db_host=mysettings.db_host,
db_port=mysettings.db_port,
db_name=mysettings.db_name
)
config.set_main_option(name="sqlalchemy.url", value=connstring)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
import db.initializer
target_metadata = db.initializer.Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
Обновление конфигурации таблиц
Для проверки создаем первую пустую миграцию. После ее выполнения создастся таблица alembic_version
alembic revision -m "Empty Init"
В данный момент фактического соединения с БД не было. В папке versions сформируется файл вида <id>_empty_init.py
После выполнения команды
alembic upgrade head
в таблице alembic_version появится одна запись - идентификатор текущей версии базы данных.
Сейчас в папке db создаем папку tablesdefinition. В ней будем хранить файлы описаний таблиц и методы для работы с таблицами. Создадим файл userprofile.py
'''Definition tables of userprofile'''
import logging
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
import db.initializer
from db.initializer import engine
def create_userprofile_class(Curbase):
class Userprofile(Curbase):
'''Class Userprofile definition'''
__tablename__ = 'userprofile'
user_id = Column(Integer(), primary_key=True)
username = Column(String(15), nullable=False, unique=True)
password = Column(String(255), nullable=False)
email = Column(String(255))
name = Column(String(100))
second_name = Column(String(100))
photo = Column(String(255))
balance = Column(Integer())
return Userprofile
def create_one_userprofile(username, password, email='', name='',
second_name='', photo='', balance=0):
''' Create one userprofile '''
try:
with engine.connect():
Session = sessionmaker(bind=engine)
with Session() as sess:
upelem = db.initializer.userprofile(username=username, password=password,
email=email, name=name, second_name=second_name,
photo=photo, balance=balance)
sess.add(upelem)
sess.commit()
except Exception as e:
logging.error(e)
И в файле initializer.py после инициализации переменной Base добавим раздел инициализации описания таблицы
#============================ Creation classes definitions =========================
# === Import Userprofile class ===
from db.tablesdefinition.userprofile import create_userprofile_class
userprofile = create_userprofile_class(Base)
# ==================================================================================
Теперь после выполнения команды
alembic revision --autogenerate -m "Added userprofile model"
будет автоматически сгененрирован файл миграции, и после
alembic upgrade head
создастся таблица userprofile.
P.s. В точке входа необходимо полностью импортировать initializer иначе будет ошибка, пример:
'''Main learning module'''
import logging
from db import initializer
from db.tablesdefinition.userprofile import create_one_userprofile
logging.basicConfig(level=logging.INFO,
filename='error.log',
format="%(levelname)s %(message)s")
if __name__ == '__main__':
create_one_userprofile(username = 'first6',
password = 'first',
balance = 1)
Alembic
Установка и настройка
Стандартная установка:
pip install alembic
Первая инициализация: alembic init folder_for_dbdata
alembic init alembic
Будет создан файл alembic.ini и директория в соответствии с названием.
В alembic.ini обновляем параметр sqlalchemy.url
В файл env.py добавляем код
import os
import sys
sys.path.append(os.getcwd())
from app.db import Base
target_metadata = Base.metadata
Миграции
Создание первой (пустой) миграции.
После создания пустой миграции, в БД создастся таблица alembic_version в которой хранится идентификатор текущей версии.
alembic revision -m "Empty Init"
Обновление базы данных после создания миграции
alembic upgrade head
Хэш текущей миграции
alembic current
История миграций.
Считываются все файлы миграций. Т.е. в случае загрузки предыдущего состояния, последующие миграции будут отображаться до тех пор, пока ненужные файлы миграций не будут удалены.
alembic history
Возврат к предыдущему состоянию
alembic downgrade migration_id
Для возврата в стартовое состояние, нужно выполнить
alembic downgrade base
Если что-то пошло не так, для возврата в нулевое состояние нужно удалить из таблицы alembic_version текущий номер.
DELETE FROM public.alembic_version;
Пропуск состояния
alembic stamp migration_id
Экспорт в формате sql
alembic upgrade migration_id_start:migration_id_stop --sql > migration.sql
Автогенерация миграции
alembic revision --autogenerate -m "Added Cookie model"
Поддерживаемые и неподдерживаемые действия при автоматической миграции
Тип элемента | Поддерживаемые | Неподдерживаемые |
Таблицы | Добавление и удаление | Изменение имени |
Столбец | Добавление, удаление, изменение нулевого статуса на столбце | Изменение имени |
Индекс | Основные изменения в индексах и явно обозначенных уникальных ограничениях, поддержка автоматической генерации индексов и уникальных ограничений |
|
Ограничения | Ограничения без явного имени | |
Ключи | Переименование | |
Типы | Типы, которые явно не поддерживаются базой данных |
Чтобы alembic увидел класс данных, необходимо его непосредственно импортировать. Импорт всей директории не работает.
Ручное создание миграций
На примере изменения имени таблицы
- Изменить имя.
- Выполнить генерацию миграции
alembic revision -m "Renaming table"
- В созданном файле изменить upgrade/downgrade
def upgrade():
op.rename_table('old_name', 'new_name')
def downgrade():
op.rename_table('new_name', 'old_name')
- Выполнить миграцию
alembic upgrade head
Команды alembic
add_column | Добавить столбец |
alter_column | Изменить тип столбца, имя или значение по-умолчанию |
create_check_constraint | Добавить ограничение |
create_foreign_key | Добавить внешний ключ |
create_index | Создать индекс |
create_primary_key | Создать основной ключ |
create_table | Создать таблицу |
create_unique_constraint | Создать ограничение уникальности |
drop_column | Удалить столбец |
drop_constraint | Удалить ограничение |
drop_index | Удалить индекс |
drop_table | Удалить таблицу |
execute | Выполнить сырую SQL команду |
rename_table | Переименовать таблицу |
Модули
Описание модулей
Хранение конфигурации
Configparser стандартная библиотека для чтения и записи .ini файлов. Инструкция 1
Jsonschema модуль для проверки соответствия json существующей схеме. Документация
Libsettings модуль на основе jsonschema для чтения конфигурации из json файла и проверки конфигурации на соответствие схеме. Gitverse проекта
import logging
from libsettings import Jsettings
logging.basicConfig(level=logging.ERROR,
filename='error.log',
format="%(levelname)s %(message)s")
mysettings = Jsettings(settingsfname='mysettings.json',
schemafname='myschema.json')
mysettings.load_settings()
jsonschema
Используется для валидации json схемы. По умолчанию дополнительно указанные ключи (не существующие в схеме, но присутствующие в документе) не проверяются.
Установка
pip install jsonschema
Базовое использование
from jsonschema import validate
validate(instance=json_to_check, schema=schema)
Исключения
jsonschema.exceptions.ValidationError - если документ не соответствует структуре
jsonschema.exceptions.SchemaError - если сама схема некорректна
Пример схемы:
schema = {
"type": "object",
"properties": {
"name": {"type": "string"},
"age": {"type": "number"},
},
"required": ["name"],
}
Данная схема определяет json объект с 2 свойствами: name и age. Обязательное свойство name.
Ключевые слова
Для некоторых типов используются дополнительные ключевые слова.
Ключевое слово | Описание |
type |
Тип. Для корня часто object. string - строка number - число object - объект array - список |
$defs | Вложенный шаблон для случая, когда шаблон элемента встречается в нескольких местах. |
$ref | Подстановка вложенного шаблона. |
$schema | Ссылка на шаблон шаблона. При обновлении версии библиотеки будет использоваться новый шаблон шаблона, что может привести к проблемам. Желательно указывать. |
Дополнительные ключевые слова для типов.
Тип array
Ключевое слово | Описание |
items |
Тип элементов списка.
|
minItems | Минимальное количество элементов |
Тип object
Ключевое слово | Описание |
required |
Список обязательных ключей.
|
properties |
Определяет ключи объекта и их тип.
|
additionalProperties |
Если True, то дополнительно указанные ключи приводят к исключению. |
Локальные вложенные шаблоны.
Для определения используется переменная $defs, для использования - $ref.
schema = {
"type": "object",
"properties": {
"address": {"$ref": "#/$defs/address"},
},
"$defs": {
"address": {
"type": "object",
"properties": {
"street": {"type": "string"},
}
},
},
}
Pydantic 2
Описание
Библиотека валидации (проверка на соответствие типов) и трансформации (автоматическое приведение к нужным типам и форматам) данных.
Модели наследуются от класса BaseModel. Модель описывает набор полей, представляющих структуру данных и условия валидации.
Установка
pip install pydantic
Типизация:
- Простая: указание типа, например, name: str.
- Объект Field(): дополнительные параметры, например, значения по умолчанию, ограничения и т.д.
Внутри класса можно комбинировать способы типизации.
from pydantic import BaseModel, Field
class User(BaseModel):
name: str
email: str = Field(..., alias='email_address')
Валидация:
- Минимальная валидация: встроенные типы Python (например, str, int).
- Валидаторы: например EmailStr для проверки email-адресов. Требуется установка дополнительных зависимостей: pydantic[email] или pydantic[all].
- @field_validator — добавляет логику валидации поля. Вызывается при создании или изменении модели.
from pydantic import BaseModel, field_validator
class User(BaseModel):
age: int
@field_validator('age')
def check_age(cls, value):
if value < 18:
raise ValueError('Возраст должен быть больше 18 лет')
return value
- @computed_field — вычисляемое поле на основе данных в модели. Его можно использовать для автоматической генерации значений, а также для валидации.
from pydantic import BaseModel, computed_field
class User(BaseModel):
name: str
surname: str
@computed_field
def full_name(self) -> str:
return f"{self.name} {self.surname}"
- @model_validator - валидация всей модели.
Работает со всей моделью (а не с отдельными полями), может изменять данные перед валидацией (mode='before') или после (mode='after'), полезен для комплексных проверок, где одно поле зависит от другого, может возвращать новую версию модели (если нужно модифицировать данные).
@model_validator(mode='before')
def validate_before(cls, data: dict):
if 'username' not in data:
data['username'] = "guest_" + str(data.get('id', 0))
return data
@model_validator(mode='after')
def validate_after(self):
if self.age < 18 and self.is_premium:
raise ValueError("Minors cannot have premium accounts!")
return self
При проверке before передается класс, при after - объект. Можно делать два валидатора: before для подстановки вычисляемых значений и after для финальной проверки
Интеграция с SQLAlchemy:
Для настройки используется параметр ConfigDict с флагом from_attributes=True.
from datetime import date
from pydantic import BaseModel, ConfigDict
class User(BaseModel):
id: int
name: str = 'John Doe'
birthday_date: date
config = ConfigDict(from_attributes=True)
Для создания модели Pydantic из объекта ORM используется метод from_orm.
user = User.from_orm(orm_instance)
Ссылки:
Pyinstaller
Установка:
python -m pip install pyinstaller
Использование
pyinstaller [параметры] script.py
Параметры
Параметр | Описание |
--onefile | собирает всё в один .exe файл |
--windowed | скрывает консоль (если у вас GUI-приложение). Если нужна консоль, уберите этот флаг. |
--icon=ваша_иконка.ico | иконка |
--name "МояПрограмма" | имя программы |
Rabbitmq
Авторизация через ВК
Перейти на VK для разработчиков
Создать новое приложение, страницы настроек:
В настройках приложения будут защищенный ключ и сервисный ключ.
Тестирование Playwright
Начало
Установка
python -m pip install playwright
Проверка установки
playwright --version
Установка драйверов для браузеров
playwright install #Все браузеры
playwright install name #Только name браузеры
playwright install chromium #Chrome
Установка pytest
python -m pip install pytest
Установка плагина pytest-playwright
python -m pip install pytest-playwright
Описание
Два режима работы: синхронный и асинхронный. Для синхронного:
from playwright.sync_api import sync_playwright
Для асинхронного режима:
import asyncio
from playwright.async_api import async_playwright
async def main():
async with async_playwright() as playwright:
browser = await playwright.chromium.launch()
page = await browser.new_page()
await page.goto("https://playwright.dev")
print(await page.title())
await browser.close()
asyncio.run(main())
Чаще используется синхронный режим.
Запуск и закрытие браузера:
from playwright.sync_api import sync_playwright
with sync_playwright() as playwright:
browser = playwright.chromium.launch(headless=False, slow_mo=500)
page = browser.new_page()
page.goto("https://playwright.dev/python")
docs_button = page.get_by_role('link', name="Docs")
docs_button.click()
browser.close()
headless=False обозначает визуальное открытие, slow_mo задержка
Использование интерактивной консоли
Иногда для удобства можно использовать консоль python для ручного тестирования покомандного ввода.
python
Python 3.13.1 (tags/v3.13.1:0671451, Dec 3 2024, 19:06:28) [MSC v.1942 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from playwright.sync_api import sync_playwright
>>> playwright = sync_playwright().start()
>>> browser = playwright.chromium.launch(headless=False, slow_mo=100)
>>> page = browser.new_page()
>>> browser.close()
>>> playwright.stop()
Если создать docs_button, у нее будет метод highlight() для визуальной подсветки найденного элемента.
Локаторы
Локаторы: способ поиска элементов на странице. Поэтому они являются методами page
В VSC Ctrl+Click по методу выводит код метода.
Локатор | Описание |
page.get_by_role('link', name="Docs") |
Поиск элемента по роли name - текст link <a> heading <h> radio, checkbox, button |
page.get_by_label("Email address") | Для выделения элементов, у которых есть привязанная метка. Например
|
page.get_by_placeholder("Enter email") | Поиск элементов по placeholder |
page.get_by_text("Something", exact=False) | Поиск по тексту. Exact=False ищет вхождение. |
page.get_by_alt_text(text) | Поиск по атрибуту alt у изображений |
page.get_by_title(text) | Атрибут title |
page.locator(text) |
Поиск по CSS. Можно использовать tagname, classname, id, attribute/value Примеры^ css=h1 footer <tagname>.<classname> button.btn-outline-sucess <tagname>#<idname> button#BtnGroupDrop1 <tagname>[attribute] input[readonly] <tagname>[attribute=somevalue] input[value='correct value'] |
Поиск по иерархии элементов. Если через пробелы-то вложенные элементы. Если через точку - то у элемента несколько классов. Но они не обязательно непосредственно вложенные. nav.bg-dark a.nav-link.active Для непосредственного вложения: nav.bg-dark > a.nav-link.active |
|
Называются sudo классами, Класс и текст в теге. Для вхождения: h1:text('Navbars') Для полного соответствия: h1:text-is('Navbars') div.dropdown-menu:visible Для определения по номеру вхождения, когда их много :nth-match(button.btn-primary, 4) |
|
XPath Абсолютный путь: xpath=/html/head/title С любого начала: xpath=//h1/h2 С указанием атрибута xpath=//h1[ @id='navbars' ] |
|
Функции XPath Для поиска по тексту, точно: //h1[text()='Headling1'] Для поиска по тексту, содержит: //h1[contains(text(), 'Headling1')] Для поиска по тексту, содержит: //h1[contains(@class, 'btn')] |
|
Множественные условия |
Поиск родительского элемента page.get_by_label("Email address").locator("..") Фильтрация page.get_by_role("heading").filter(has_text="First") По дочернему элементу page.locator("div.form-group").filter(has=page.get_by_label("Password")) |
Доступ к iframe
'''Test for auth module'''
import pytest
from playwright.sync_api import Browser, Page, expect
AUTH_URL = "https://wood.bobrobotirk.ru/auth"
@pytest.fixture
def page_and_auth(browser: Browser):
context = browser.new_context(
storage_state="playwright/.auth/vk.json"
)
page = context.new_page()
yield page
context.close()
def test_first(page_and_auth: Page):
page_and_auth.goto(AUTH_URL)
vkframe = page_and_auth.frame(url=lambda url: "id.vk.com" in url)
if vkframe:
authbutton = vkframe.get_by_role("button", name="Продолжить как")
expect(authbutton, "Кнопка Продолжить как... отсутствует").to_be_visible(timeout=20000)
authbutton.click()
back_button = page_and_auth.get_by_role("button", name="Авторизация успешна")
expect(back_button, "Сервис не произвел авторизацию").to_be_visible()
back_button.click()
else:
assert False, "VK фрейм не найден."
Actions
Действие | Описание |
click() |
Однократное нажатие. Опции: button="left" modifiers=["Shift", "Alt"] с зажатой кнопкой Shift timeout=2_000 Задержка перед ошибкой. Обычно 30 сек. force=True Ошибка сразу же если не найден. |
dblclick() |
Двойной щелчок. Опции как у click +: delay=100 - задержка в миллисекундах
|
hover() | Навести мышь на выбранный элемент |
fill("my text") | Заполнить поле ввода текстом my text. Аналогично Ctrl-V |
clear() | Очистить поле ввода |
type("my text", delay=100) | Имитация побуквенного ввода |
check() Еще: set_checked(True) |
Выбор radiobutton, checkbox, switch is_checked() для checkbox проверяет, выбран ли checkbox |
uncheck() |
убрать выбор |
select_option("text") |
Выбор опции из раскрывающегося списка. Но если отсутствует - будет Timeout Error. Если передать список - будет множественный выбор. |
Для раскрытия Dropdown элемента: нажатие на него, выбор элемента и нажатие |
|
set_input_files("") |
Для элемента позволяющего загружать файлы, имя файла из директории, из которой запускается скрипт. Можно передать список. |
Если по кнопке открывается меню выбора файла, то
|
|
press("KeyW") press("Shift+KeyW") press("Control+ArrowLeft") |
События (Events)
События в page.goto
В переменной wait_until.
- load: загрузка всего контента
- domcontentloaded: загрузка dom
- commit: при получении ответа от сервера
- networkidle: до завершения всех событий сети. Для динамического контента не меньше чем load.
Можно считать время загрузки.
from playwright.sync_api import sync_playwright
from time import perf_counter
with sync_playwright() as playwright:
browser = playwright.chromium.launch(headless=False, slow_mo=500)
page = browser.new_page()
print('Page loading...')
start = perf_counter()
page.goto("https://playwright.dev/python", wait_until='load')
delta = perf_counter() - start
print(f'Page loaded in {delta} s.')
browser.close()
События динамического контента (React, ...)
Находим динамический элемент, кликаем по нему и при помощи wait_for() ждем.
from playwright.sync_api import sync_playwright
from time import perf_counter
with sync_playwright() as playwright:
browser = playwright.chromium.launch(headless=False, slow_mo=500)
page = browser.new_page()
page.goto("https://www.scrapethissite.com/pages/ajax-javascript/", timeout=60_000)
mylink = page.get_by_role("link", name="2014")
mylink.click()
print('Loading movie...')
start = perf_counter()
loadedcont = page.locator("td.film-title").first
loadedcont.wait_for()
delta = perf_counter() - start
print(f'Movie loaded in {delta} s.')
browser.close()
Ожидание события.
Указываем тип события и функцию, выполняемую при наступлении события.
from playwright.sync_api import sync_playwright
def onload(page):
print("Page loaded", page)
with sync_playwright() as playwright:
browser = playwright.chromium.launch(headless=False, slow_mo=500)
page = browser.new_page()
page.on("load", onload)
page.goto("https://bootswatch.com/default")
browser.close()
Пример просмотра событий запросов
from playwright.sync_api import sync_playwright
def onrequest(request):
print("Request send: ", request)
with sync_playwright() as playwright:
browser = playwright.chromium.launch(headless=False, slow_mo=500)
page = browser.new_page()
page.on("request", onrequest)
page.goto("https://bootswatch.com/default")
browser.close()
Вывод:
Request send: <Request url='https://bootswatch.com/default' method='GET'>
Request send: <Request url='http://bootswatch.com/default/' method='GET'>
Request send: <Request url='https://bootswatch.com/default/' method='GET'>
Request send: <Request url='https://bootswatch.com/_vendor/bootstrap/dist/css/bootstrap.css' method='GET'>
Request send: <Request url='https://bootswatch.com/_vendor/bootstrap-icons/font/bootstrap-icons.min.css' method='GET'>
Request send: <Request url='https://bootswatch.com/_vendor/prismjs/themes/prism-okaidia.css' method='GET'>
Request send: <Request url='https://bootswatch.com/_assets/css/custom.min.css' method='GET'>
Request send: <Request url='https://www.googletagmanager.com/gtag/js?id=G-KGDJBEFF3W' method='GET'>
Request send: <Request url='https://cdn.carbonads.com/carbon.js?serve=CKYIE23N&placement=bootswatchcom' method='GET'>
Request send: <Request url='https://bootswatch.com/_vendor/bootstrap/dist/js/bootstrap.bundle.min.js' method='GET'>
Request send: <Request url='https://bootswatch.com/_vendor/prismjs/prism.js' method='GET'>
Request send: <Request url='https://bootswatch.com/_assets/js/custom.js' method='GET'>
Request send: <Request url='https://bootswatch.com/_vendor/bootstrap-icons/font/fonts/bootstrap-icons.woff2?1fa40e8900654d2863d011707b9fb6f2' method='GET'>
Request send: <Request url='https://www.google-analytics.com/g/collect?v=2&tid=G-KGDJBEFF3W>m=45je54g0v9135688085za200&_p=1744912703288&gcd=13l3l3l3l1l1&npa=0&dma=0&tag_exp=102509682~102803279~102813109~102887800~102926062~103027016~103051953~103055465~103077950~103106314~103106316~103130495~103130497&cid=128797986.1744912704&ul=ru-ru&sr=1280x720&uaa=x86&uab=64&uafvl=Not%253AA-Brand%3B24.0.0.0%7CChromium%3B134.0.6998.35&uamb=0&uam=&uap=Windows&uapv=10.0.0&uaw=0&are=1&frm=0&pscdl=noapi&_s=1&sid=1744912703&sct=1&seg=0&dl=https%3A%2F%2Fbootswatch.com%2Fdefault%2F&dt=Bootswatch%3A%20Default&en=page_view&_fv=1&_nsi=1&_ss=1&_ee=1&tfd=2018' method='POST'>
Request send: <Request url='https://srv.carbonads.net/ads/CKYIE23N.json?segment=placement:bootswatchcom&v=true' method='GET'>
Есть событие при выборе файла.
Удаление прослушивания события: page.remove_listener("name_event", func)
События всплывающих окон (alert, confirm, prompt)
Событие page.on("dialog", func), устанавливаем ожидание до возможного появления окна.
Функция:
def on_dialog(dialog)
dialog.accept()
dialog.dismiss()
Если диалог prompt, то для заполнения поля ввода в функции accept нужно добавить строковую переменную.
def on_dialog(dialog)
dialog.accept("Text for enter")
Событие скачивания файла
from playwright.sync_api import sync_playwright
with sync_playwright() as playwright:
browser = playwright.chromium.launch(headless=False, slow_mo=500)
page = browser.new_page()
page.goto("https://bootswatch.com/default")
btn = page.get_by_role("link", name="Download me")
with page.expect_download() as download_info:
btn.click()
download = download_info.value
download.save_as(fname)
browser.close()
Второй вариант: добавить listener
from playwright.sync_api import sync_playwright
def saving_func(download):
fname = "first.jpg"
download.save_as(fname)
with sync_playwright() as playwright:
browser = playwright.chromium.launch(headless=False, slow_mo=500)
page = browser.new_page()
page.goto("https://bootswatch.com/default")
page.once("download", saving_func)
btn = page.get_by_role("link", name="Download me")
with page.expect_download() as download_info:
btn.click()
browser.close()
Аутентификация
При 2FA аутентификации возникают проблемы при повторном исполнении скрипта. Для обхода этого используют контекст браузера.
Шаг 1. Сохранение контекста.
from playwright.sync_api import sync_playwright
with sync_playwright() as playwright:
browser = playwright.firefox.launch(headless=False, slow_mo=500)
context = browser.new_context()
page = context.new_page()
page.goto("https://vk.ru")
page.pause() # Откроется доп. окно. Проходим авторизацию, в доп.окне play
context.storage_state(path="playwright/.auth/vk.json")
context.close()
Шаг 2. Использование контекста
from playwright.sync_api import sync_playwright
with sync_playwright() as playwright:
browser = playwright.firefox.launch(headless=False, slow_mo=500)
context = browser.new_context(storage_state="playwright/.auth/vk.json")
page = context.new_page()
page.goto("https://vk.ru")
page.pause() #в реальных скриптах это убирается)
context.close()
Pytest & Playwright
Pytest
Имена файлов тестов должны иметь префикс test_ или постфикс _test. Имена тестов должны иметь префикс test_
В модуле utils функция root, отнимающая 1 от входного параметра. Пример теста:
import utils
def test_first():
num24 = utils.root(25)
assert num24 == 24
Запуск теста:
pytest second_test.py
Ключ -v отображает расширенную информацию.
Ключ -s разрешает вывод данных из тестируемых функций.
Запуск без указания имени файла исполняет все тесты.
Желательно определение типов в функциях.
Виды проверок
Проверка | Описание |
assert mynum == 32 | Проверка на значение |
assert type(dt) == dict | Проверка типа |
assert "timestamp" in dt_list | Проверка присутствия в списке |
Для вывода доп. информации:
assert mynum == 32, "Число mynum должно быть равно 32"
Фикстуры:
Для упаковки повторяющихся действий.
import json
import report
import pytest
@pytest.fixtures
def report_json():
report.generate_report()
with open("report.json") as file:
return json.load(file)
def test_report_json(report_json):
assert type(report_json) == dict
Фикстура исполняется каждый раз при вызове. Для однократного исполнения фикстуры нужно добавить scope="session"
import json
import report
import pytest
@pytest.fixtures(scope="session")
def report_json():
report.generate_report()
with open("report.json") as file:
return json.load(file)
def test_report_json(report_json):
assert type(report_json) == dict
Виды scope
session | Один раз в пределах сессии |
function | Каждый раз |
module | Если несколько тестовых файлов, то один раз в пределе запуска. |
Pytest-playwright
Данный плагин упрощает работу с pytest. Встроенные фикстуры. Задачу создания и удаления playwright, browser берет на себя.
from playwright.sync_api import Page
def test_first(page: Page):
page.goto("https://playwright.dev/python")
link = page.get_by_role("link", name="GET STARTED")
link.click()
assert page.url == "https://playwright.dev/python/docs/intro"
Настройка pytest
Через консоль:
- --headed Отображать окно
- --slowmo=500 Замедление работы
- --browser=firefox Настройка браузера
Через конфигурационный файл pytest.ini
[pytest]
addopts = --headed --slowmo=500 --browser=firefox
Фикстуры
Добавить фикстуру
import pytest
from playwright.sync_api import Page
@pytest.fixture
def load_testpage(page: Page):
page.goto("https://playwright.dev/python")
return page
def test_first(load_testpage: Page):
link = load_testpage.get_by_role("link", name="GET STARTED")
link.click()
assert load_testpage.url == "https://playwright.dev/python/docs/intro"
Автоматическое исполнение фикстуры перед каждой функцией:
import pytest
from playwright.sync_api import Page
@pytest.fixture(autouse=True)
def load_testpage(page: Page):
page.goto("https://playwright.dev/python")
return page
def test_first(page: Page):
link = page.get_by_role("link", name="GET STARTED")
link.click()
assert page.url == "https://playwright.dev/python/docs/intro"
Пред и пост исполнение кода:
import pytest
from playwright.sync_api import Page
@pytest.fixture(autouse=True)
def load_testpage(page: Page):
page.goto("https://playwright.dev/python")
yield page
page.goto("https://yandex.ru")
def test_first(page: Page):
link = page.get_by_role("link", name="GET STARTED")
link.click()
assert page.url == "https://playwright.dev/python/docs/intro"
Пример проверки авторизации ВК
'''Test for auth module'''
import pytest
from playwright.sync_api import Browser, Page, expect
AUTH_URL = "https://wood.bobrobotirk.ru/auth"
@pytest.fixture
def page_and_auth(browser: Browser):
context = browser.new_context(
storage_state="playwright/.auth/vk.json"
)
#page = context.new_page()
yield context
context.close()
def get_cookie(all_cookies, cname):
val = next((cookie['value'] for cookie in all_cookies if cookie.get('name') == cname), None)
return val
def test_first(page_and_auth: Browser):
page = page_and_auth.new_page()
page.goto(AUTH_URL)
vkframe = page.frame(url=lambda url: "id.vk.com" in url)
if vkframe:
authbutton = vkframe.get_by_role("button", name="Продолжить как")
expect(authbutton, "Кнопка Продолжить как... отсутствует").to_be_visible(timeout=20000)
authbutton.click()
back_button = page.get_by_role("button", name="Авторизация успешна")
expect(back_button, "Сервис не произвел авторизацию").to_be_visible()
back_button.click()
all_cookies = page_and_auth.cookies()
session_id_value = get_cookie(all_cookies, 'session_id')
assert session_id_value, "Cookie session_id не установлен."
else:
assert False, "VK фрейм не найден."
Дополнительные возможности
Скриншоты
Скрин страницы
page.screenshot(path="", full_page=True)
Скрин элемента тоже работает.
link.screenshot(path="")
Запись видео
from playwright.sync_api import Browser
def test_first(browser: Browser):
context = browser.new_context(
storage_state="playwright/.auth/vk.json",
record_video_dir="video/"
)
page = context.new_page()
page.goto("https://playwright.dev/python")
link = page.get_by_role("link", name="GET STARTED")
link.click()
assert page.url == "https://playwright.dev/python/docs/intro"
page.goto("https://vk.ru")
Вариант с текстурой:
import pytest
from playwright.sync_api import Browser, Page
@pytest.fixture
def recordable(browser: Browser):
context = browser.new_context(
storage_state="playwright/.auth/vk.json",
record_video_dir="video/"
)
page = context.new_page()
yield page
context.close()
def test_first(recordable: Page):
recordable.goto("https://playwright.dev/python")
link = recordable.get_by_role("link", name="GET STARTED")
link.click()
assert recordable.url == "https://playwright.dev/python/docs/intro"
recordable.goto("https://vk.ru")
Трассировка данных
Создание трассировки:
import pytest
from playwright.sync_api import Page, BrowserContext
@pytest.fixture(autouse=True)
def trace_test(context: BrowserContext):
context.tracing.start(
name="playwrite",
screenshots=True,
snapshots=True,
sources=True
)
yield
context.tracing.stop(path="trace.zip")
def test_first(page: Page):
page.goto("https://playwright.dev/python")
link = page.get_by_role("link", name="GET STARTED")
link.click()
assert page.url == "https://playwright.dev/python/docs/intro"
Просмотр трассировки
playwright show-trace trace.zip
Запись действий
Старт записи
playwright codegen playwright.dev
В окне кода есть поле Target, позволяет переключать тип библиотек (playwright sync/async, pytest). Потом сохраняем полученную последовательность и устанавливаем нужные условия для проверки.
Ожидание
from playwright.sync_api import Page, expect
DOCS_URL = "https://playwright.dev/python/docs/intro"
def test_first(page: Page):
page.goto("https://playwright.dev/python")
link = page.get_by_role("link", name="GET STARTED")
link.click()
#assert page.url == DOCS_URL
expect(page).to_have_url(DOCS_URL)
expect(page).to_have_url | наличие url |
expect(page).to_have_title | наличие title |
link = page.get_by_role("link", name="GET STARTEDer")
expect(link).to_be_visible()
|
Видимость элемента в переменной link |
expect(link).to_be_enabled()
|
Доступный элемент |
expect(heading).to_contain_text()
|
Присутствие текста (часть) |
expect(heading).to_have_text()
|
Присутствие текста (полное совпадение) |
expect(mylink).to_have_class()
|
Наличие класса у элемента Несколько классов: "class1 class2" Должно быть полное соответствие. Но можно использовать регулярки.
|
expect(mylink).to_have_id()
|
Наличие id |
expect(mylink).to_have_attribute(attr_name, attr_value)
|
Наличие атрибута. При необходимости можно указать значение атрибута. |
expect(mylink).to_be_editable()
|
|
expect(mylink).to_be_empty()
|
|
expect(mycheckbox).to_be_checked()
|
|
expect(mymenu).to_have_value()
|
Элемент в меню выбора |
expect(mymultimenu).to_have_values([])
|
Несколько выбранных |
not_ - префикс отрицания
Asyncio
Сопрограммы
ЭТО ТЕХНОЛОГИЯ УСКОРЕНИЯ РАБОТЫ В ОДНОМ ПОТОКЕ. ДЛЯ ПАРАЛЛЕЛЬНЫХ ВЫЧИСЛЕНИЙ THREADING,
Сопрограммы - функции с возможностью приостановки при длительной внешней операции. Async определяет сопрограмму, await приостанавливает сопрограмму на время выполнения внешней задачи. При вызове сопрограммы она напрямую не выполняется. Для старта нужна точка входа в асинхронные вычисления.
Пример: правильное использование.
import asyncio
async def corutine_add_one(a: int) -> int:
return b + 1
corutine_res = asyncio.run(corutine_add_one(5))
print(type(corutine_res), ' ', corutine_res)
Однако если просто запустить корутину, то результат будет интересным:
import asyncio
async def corutine_add_one(a: int) -> int:
return b + 1
corutine_res = corutine_add_one(5)
print(type(corutine_res), ' ', corutine_res)
Вывод:
<class 'coroutine'> <coroutine object corutine_add_one at 0x0000026850FD4270>
Существует цикл событий. При постановке корутины на паузу задача передается следующей корутине. Однако такой код выполняется в виде простого синхронного кода:
import asyncio
async def sleep_10(x: int) -> int:
await asyncio.sleep(10)
return x+10
async def main():
x1 = await sleep_10(1)
x2 = await sleep_10(1)
print(x1, ' ', x2)
asyncio.run(main())
Задачи
Для приближения к параллельным вычислениям необходимо использовать задачи.
import asyncio
from util import delay
async def main():
sleep1 = asyncio.create_task(delay(10))
sleep2 = asyncio.create_task(delay(10))
await sleep1
await sleep2
asyncio.run(main())
К тому же, если убрать await sleep2, то вторая задача все равно выполнится, т.к. вроде все задачи из очереди запускаются. Это так, вот только не говорится о том, завершаются они или нет. Вот пример кода:
async def delay(delay_seconds: int) -> int:
print(f'Засыпаю на {delay_seconds} секунд.')
await asyncio.sleep(delay_seconds)
print(f'сон в течение {delay_seconds} закончился.')
return delay_seconds
async def delay_with_inner(delay_seconds: int) -> int:
print(f'Засыпаю на {delay_seconds} секунд.')
t1 = asyncio.create_task(delay(15))
await asyncio.sleep(delay_seconds)
#await t1
print(f'сон в течение {delay_seconds} закончился.')
return delay_seconds
async def main():
sleep1 = asyncio.create_task(delay_with_inner(10))
sleep2 = asyncio.create_task(delay_with_inner(10))
sleep3 = asyncio.create_task(delay_with_inner(10))
await sleep1
await sleep2
await sleep3
asyncio.run(main())
Если внешнюю мы запускаем на 10 секунд, то внутренняя (t1) тоже запускается - т.к. находится в очереди задач. Но вот только программа завершается до завершения t1.
Засыпаю на 10 секунд.
Засыпаю на 10 секунд.
Засыпаю на 10 секунд.
Засыпаю на 15 секунд.
Засыпаю на 15 секунд.
Засыпаю на 15 секунд.
сон в течение 10 закончился.
сон в течение 10 закончился.
сон в течение 10 закончился.
И все! А если раскомментировать строку с ожиданием t1, то тогда сначала завершится t1
Засыпаю на 10 секунд.
Засыпаю на 10 секунд.
Засыпаю на 10 секунд.
Засыпаю на 15 секунд.
Засыпаю на 15 секунд.
Засыпаю на 15 секунд.
сон в течение 15 закончился.
сон в течение 15 закончился.
сон в течение 15 закончился.
сон в течение 10 закончился.
сон в течение 10 закончился.
сон в течение 10 закончился.
А для такого кода будет создано 3 файла. Но если вместо sleep3 установить sleep1, то все задачи стартуют, но будет создан только файл 5.txt.
import asyncio
from util import delay_with_inner, delay, delay_and_writefile
from time import sleep
async def main():
sleep1 = asyncio.create_task(delay_and_writefile(5))
sleep2 = asyncio.create_task(delay_and_writefile(10))
sleep3 = asyncio.create_task(delay_and_writefile(11))
await sleep3
asyncio.run(main())
Исходя из этого, важна не последовательность помещения в задачи, а последовательность при обращении (момент await).
Снятие задач и тайм-ауты
import asyncio
from asyncio import CancelledError
from util import delay
async def main():
long_task = asyncio.create_task(delay(10))
seconds_elapsed = 0
while not long_task.done():
print('Задача не закончилась, следующая проверка через секунду.')
await asyncio.sleep(1)
seconds_elapsed = seconds_elapsed + 1
if seconds_elapsed == 5:
long_task.cancel()
try:
await long_task
except CancelledError:
print('Наша задача была снята')
asyncio.run(main())
Тайм-аут:
import asyncio
from util import delay
async def main():
delay_task = asyncio.create_task(delay(2))
try:
result = await asyncio.wait_for(delay_task, timeout=1)
print(result)
except asyncio.exceptions.TimeoutError:
print('Тайм-аут!')
print(f'Задача была снята? {delay_task.cancelled()}')
asyncio.run(main())
При помощи shield можно игнорировать запросы на снятие:
import asyncio
from util import delay
async def main():
task = asyncio.create_task(delay(10))
try:
result = await asyncio.wait_for(asyncio.shield(task), 5)
print(result)
except TimeoutError:
print("Задача заняла более 5 с, скоро она закончится!")
result = await task
print(result)
asyncio.run(main())
Список задач:
for task in asyncio.tasks.all_tasks():
print(task)
Множественные task в цикле.
Проблема: при создании в цикле при возникновении исключения в одной задаче, остальные задачи могут не завершить выполнение.
Функция gather
import asyncio
import aiohttp
from aiohttp import ClientSession
from chapter_04 import fetch_status
async def main():
async with aiohttp.ClientSession() as session:
urls = ['https://example.com' for _ in range(1000)]
requests = [fetch_status(session, url) for url in urls]
status_codes = await asyncio.gather(*requests)
print(status_codes)
asyncio.run(main())
Gather возвращает статусы в порядке передаче объектов, независимо от времени исполнения. Возвращает все данные одновременно.
Обработка исключений - необязательный bool параметр return_exceptions.
- return_exceptions=False по умолчанию. Если хотя бы одна сопрограмма возбуждает исключение, то gather возбуждает то же исключение в точке await. Но, даже если какая-то сопрограмма откажет, остальные не снимаются и продолжат работать при условии, что мы обработаем исключение и оно не приведет к остановке цикла событий и снятию задач;
- return_exceptions=True – в этом случае исключения возвращаются в том же списке, что результаты. Сам по себе вызов gather не возбуждает исключений, и мы можем обработать исключения, как нам удобно.
async def main():
async with aiohttp.ClientSession() as session:
urls = ['https://example.com', 'python://example.com']
tasks = [fetch_status_code(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
exceptions = [res for res in results if isinstance(res, Exception)]
successful_results = [res for res in results if not isinstance(res, Exception)]
print(f'Все результаты: {results}')
print(f'Завершились успешно: {successful_results}')
print(f'Завершились с исключением: {exceptions}')
Функция as_completed
Начинает возвращать данные по мере поступления. Однако порядок не определен.
import asyncio
import aiohttp
from aiohttp import ClientSession
from chapter_04 import fetch_status
async def main():
async with aiohttp.ClientSession() as session:
fetchers = [fetch_status(session, 'https://www.example.com', 1),
fetch_status(session, 'https://www.example.com', 1),
fetch_status(session, 'https://www.example.com', 10)]
for finished_task in asyncio.as_completed(fetchers):
print(await finished_task)
asyncio.run(main())
asyncio.as_completed(fetchers, timeout=2) задает тайм-аут. Но созданные задачи продолжают работать в фоновом режиме.
Точный контроль над функциями
Используется wait:
async def main():
async with aiohttp.ClientSession() as session:
fetchers = \
[asyncio.create_task(fetch_status(session, 'https://example.com')),
asyncio.create_task(fetch_status(session, 'https://example.com'))]
done, pending = await asyncio.wait(fetchers)
print(f'Число завершившихся задач: {len(done)}')
print(f'Число ожидающих задач: {len(pending)}')
for done_task in done:
result = await done_task
print(result)
Сигнатура wait – список допускающих ожидание объектов, за которым следует факультативный тайм-аут и факультативный
параметр return_when, который может принимать значения ALL_COMPLETED, FIRST_EXCEPTION и FIRST_COMPLETED, а по умолчанию равен ALL_COMPLETED.
wait не возбуждает исключений, исключения содержатся в методе done_task.exception()
Пример обработки исключений
import asyncio
import logging
async def main():
async with aiohttp.ClientSession() as session:
good_request = fetch_status(session, 'https://www.example.com')
bad_request = fetch_status(session, 'python://bad')
fetchers = [asyncio.create_task(good_request),
asyncio.create_task(bad_request)]
done, pending = await asyncio.wait(fetchers)
print(f'Число завершившихся задач: {len(done)}')
print(f'Число ожидающих задач: {len(pending)}')
for done_task in done:
# result = await done_task возбудит исключение
if done_task.exception() is None:
print(done_task.result())
else:
logging.error("При выполнении запроса возникло исключение",
exc_info=done_task.exception())
asyncio.run(main())
Типы future и awaitable
Практически редко применяются, но нужны для понимания
Будущие объекты можно использовать в выражениях await. Это означает «я посплю, пока в будущем объекте не будет установлено значение, с которым я могу работать, а когда оно появится, разбуди меня и дай возможность его обработать».
from asyncio import Future
my_future = Future()
print(f'my_future готов? {my_future.done()}')
my_future.set_result(42)
print(f'my_future готов? {my_future.done()}')
print(f'Какой результат хранится в my_future? {my_future.result()}')
Вариант для использования не до конца определенной переменной:
from asyncio import Future
import asyncio
def make_request() -> Future:
future = Future()
asyncio.create_task(set_future_value(future))
return future
async def set_future_value(future) -> None:
await asyncio.sleep(1)
future.set_result(42)
async def main():
future = make_request()
print(f'Будущий объект готов? {future.done()}')
value = await future
print(f'Будущий объект готов? {future.done()}')
print(value)
asyncio.run(main())
Любой объект, который реализует метод __await__, можно использовать в выражении await. Это объекты, допускающие ожидание (Awaitaible).
Асинхронный контекстный менеджер
Асинхронный контекстный менеджер.
Это класс, реализующий два специальных метода-сопрограммы: __aenter__, который асинхронно захватывает ресурс, и __aexit__, который закрывает ресурс. Сопрограмма __aexit__ принимает несколько аргументов, относящихся к обработке исключений. Пример для сокетов:
import asyncio
import socket
from types import TracebackType
from typing import Optional, Type
class ConnectedSocket:
def __init__(self, server_socket):
self._connection = None
self._server_socket = server_socket
async def __aenter__(self):
print('Вход в контекстный менеджер, ожидание подключения')
loop = asyncio.get_event_loop()
connection, address = await loop.sock_accept(self._server_socket)
self._connection = connection
print('Подключение подтверждено')
return self._connection
async def __aexit__(self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType]):
print('Выход из контекстного менеджера')
self._connection.close()
print('Подключение закрыто')
async def main():
loop = asyncio.get_event_loop()
server_socket = socket.socket()
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ('127.0.0.1', 8000)
server_socket.setblocking(False)
server_socket.bind(server_address)
server_socket.listen()
async with ConnectedSocket(server_socket) as connection:
data = await loop.sock_recv(connection, 1024)
print(data)
asyncio.run(main())
Aiohttp
Сеансовый асинхронный http(s) клиент с автоматической поддержкой cookies. Пул подключений использует один сеанс.
Установка:
pip install aiohttp
Использование:
import asyncio
import aiohttp
from aiohttp import ClientSession
from util import async_timed
async def fetch_status(session: ClientSession, url: str) -> int:
async with session.get(url) as result:
return result.status
async def main():
async with aiohttp.ClientSession() as session:
url = 'https://www.example.com'
status = await fetch_status(session, url)
print(f'Состояние для {url} было равно {status}')
asyncio.run(main())
По умолчанию в сеансе не более 100 подключений. Для увеличения можно создать экземпляр класса TCPConnector, входящего в состав aiohttp, указав максимальное число подключений, и передать его конструктору ClientSession. Подробнее в документации aiohttp.
Тайм-аут:
По умолчанию тайм-аут запроса 5 минут. Можно устанавливать на уровне сеанса или запроса.
import asyncio
import aiohttp
from aiohttp import ClientSession
async def fetch_status(session: ClientSession, url: str) -> int:
ten_millis = aiohttp.ClientTimeout(total=.01)
async with session.get(url, timeout=ten_millis) as result:
return result.status
async def main():
session_timeout = aiohttp.ClientTimeout(total=1, connect=.1)
async with aiohttp.ClientSession(timeout=session_timeout) as session:
await fetch_status(session, 'https://example.com')
asyncio.run(main())
В этом случае полный тайм-аут 1 секунда, для установки соединения - 100мс. В функции fetch_status переопределяется в 10мс.
Множественные запросы
import asyncio
import aiohttp
from aiohttp import ClientSession
from chapter_04 import fetch_status
async def main():
async with aiohttp.ClientSession() as session:
urls = ['https://example.com' for _ in range(1000)]
requests = [fetch_status(session, url) for url in urls]
status_codes = await asyncio.gather(*requests)
print(status_codes)
asyncio.run(main())
Обработка ошибок
async def main():
async with aiohttp.ClientSession() as session:
urls = ['https://example.com', 'python://example.com']
tasks = [fetch_status_code(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
exceptions = [res for res in results if isinstance(res, Exception)]
successful_results = [res for res in results if not isinstance(res, Exception)]
print(f'Все результаты: {results}')
print(f'Завершились успешно: {successful_results}')
print(f'Завершились с исключением: {exceptions}')