Sqlalchemy
Установка
Ядро
pip install sqlalchemy
Драйвер для postgres, mysql
pip install psycopg2
pip install psycopg2-binary
pip install pymysql
Подключение
from sqlalchemy import create_engine
engine = create_engine('postgresql+psycopg2://username:password@localhost:5432/mydb')
engine = create_engine('mysql+pymysql://cookiem:chip@mysql01.com/cookies', pool_recycle=3600)
engine = create_engine('sqlite:///cookies.db')
engine2 = create_engine('sqlite:///:memory:')
Доп. ключи через запятую:
echo | булево. Лог запросов. По-умолчанию False. |
encoding | строка. По-умолчанию utf-8 |
isolation_level | уровень изоляции |
pool_recycle | число секунд для переподключения, желательно выставлять 3600. При работе с mysql соединение может быть активным до 4 часов. |
Создание engine не создает фактического соединения с БД.
connection = engine.connect()
Сырой запрос:
result = connection.execute("select * from orders").fetchall()
Структура фреймворка
Таблицы -> MetaData -> Engine -> Dialect -> DB
MetaData: объект, в котором таблицы, индексы,...
- Может получать информацию о существующих сейчас сущностях в БД
- Может хранить шаблоны именования индексов и ограничений (constraint). Т е перед началом проекта добавляется настройка именования, затем при создании/удалении все ок. Это словарь ограничение:шаблон
Ограничение или индекс | Описание |
ix | обычный индекс |
uq | уникальный индекс |
ck | ограничение проверки |
fk | foreign-ключ |
pk | primary-ключ |
from sqlalchemy import MetaData
convention = {
'all_column_names': lambda constraint, table: '_'.join([
column.name for column in constraint.columns.values()
]),
'ix': 'ix__%(table_name)s__%(all_column_names)s',
'uq': 'uq__%(table_name)s__%(all_column_names)s',
'ck': 'ck__%(table_name)s__%(all_column_names)s',
'fk': ('fk__%(table_name)s__%(all_column_names)s' '%(referred_table_name)s'),
'pk': 'pk__%(table_name)s'
}
metadata_obj = MetaData(naming_convention=convention)
- Должен быть инициализирован до обращения к нему в таблицах
- Изначально пустой объект, можно или вручную занести данные, или получить из базы.
- Получение информации об одной таблице по имени
- Нельзя получить одновременно две таблицы. Либо одна, либо вся база
- Нельзя (и одна таблица, и база) получить ограничения (CONSTRAINT), комментарии, триггеры, значения по умолчанию. Но можно вручную добавить данные. Но похоже проще импортировать описание.
from sqlalchemy import ForeignKeyConstraint
album.append_constraint(ForeignKeyConstraint(['ArtistId'], ['artist.ArtistId']))
-
- при получении базы, имена таблиц с большой и маленькой буквы. Т е удваивается количество объектов.
engine = create_engine(...)
metadata = MetaData()
cookie_tbl = Table('cookies', metadata, autoload_with=engine)
s = select(cookie_tbl)
with engine.connect() as conn:
m = conn.execute(s)
print(m.keys())
- Получение информации обо всей базе
from sqlalchemy import create_engine, MetaData
engine = create_engine(...)
metadata = MetaData()
metadata.reflect(bind=engine)
for table in metadata.sorted_tables:
print(table.name)
#Потом получить объект таблицы:
mytable = metadata.tables['mytable']
- Получение информации обо всей базе через ORM + Automap
from sqlalchemy.ext.automap import automap_base
from sqlalchemy import create_engine
Base = automap_base()
engine = create_engine('sqlite:///Chinook_Sqlite.sqlite')
Base.prepare(engine, reflect=True)
# данные о классах загружены.
#Например для получения списка классов:
Base.classes.keys()
Artist = Base.classes.Artist # создание классов
#Внешние связи - в свойстве <related_object>_collection
artist = session.query(Artist).first()
for album in artist.album_collection:
print('{} - {}'.format(artist.Name, album.Title))
Engine: Скрывает пул подключений и диалект.
Dialect: Скрывает детали реализации в конкретной базе
Core: SQL в чистом виде
ORM: абстракции
Dialect: Скрывает детали реализации в конкретной базе
Core: SQL в чистом виде
ORM: абстракции
========== Неформатированный текст ===========
Внешние связи:
from sqlalchemy import ForeignKey
orders = Table('orders', metadata,
Column('order_id', Integer(), primary_key=True),
Column('user_id', ForeignKey('users.user_id')),
Column('shipped', Boolean(), default=False)
)
line_items = Table('line_items', metadata,
Column('line_items_id', Integer(), primary_key=True),
Column('order_id', ForeignKey('orders.order_id')),
Column('quantity', Integer()),
Column('extended_cost', Numeric(12, 2))
)
Связь для поля order_id:
ForeignKeyConstraint(['order_id'], ['orders.order_id'])
============================================================Работа с данными=============================================================
INSERT
Вариант 1:
quantity="12",
unit_cost="0.50"
)
result = connection.execute(ins)
print(result.inserted_primary_key)
Вариант 2:
from sqlalchemy import insert
quantity="12",
unit_cost="0.50"
)
Вариант 3
result = connection.execute(
ins,
quantity='1',
unit_cost='0.75'
)
result.inserted_primary_key
Вариант 4
inventory_list = [
{
'quantity': '24',
'unit_cost': '0.25'
},
{
'quantity': '100',
'unit_cost': '1.00'
}
]
result = connection.execute(ins, inventory_list)
SELECT
Вариант 1
from sqlalchemy.sql import select
rp = connection.execute(s)
results = rp.fetchall()
Вариант 2
rp = connection.execute(s)
results = rp.fetchall()
Вариант 3
rp = connection.execute(s)
for record in rp:
print(record.cookie_name)
SELECT определенных столбцов
ORDERING
s = s.order_by(cookies.c.quantity) - прямая сортировка
или s = s.order_by(desc(cookies.c.quantity)) - обратная сортировка
rp = connection.execute(s)
LIMITING
s = s.order_by(cookies.c.quantity)
s = s.limit(2)
rp = connection.execute(s)
Встроенные функции SQL
Сумма: func.sum
from sqlalchemy.sql import func
s = select([func.sum(cookies.c.quantity)])
rp = connection.execute(s)
print(rp.scalar())
Количество: func.count
s = select([func.count(cookies.c.cookie_name)])
rp = connection.execute(s)
record = rp.first()
print(record.keys()) - ключи могут быть разные.
print(record.count_1)
При помощи label можно определить название свойства
s = select([func.count(cookies.c.cookie_name).label('inventory_count')])
rp = connection.execute(s)
record = rp.first()
print(record.keys())
print(record.inventory_count)
WHERE
Их можно комбинировать, используется AND
rp = connection.execute(s)
Варианты модификаторов:
Метод Описание
between(cleft, cright) Значение столбца между cleft и cright
concat(column_two) Объединение column и column_two
distinct() Находит только уникальные значения в столбце
in_([list]) Только если значения столбца в списке
is_(None) Проверка на пустые значения
contains(string) Значение столбца содержит строку
endswith(string) Оканчивается строкой, зависит от больших букв
like(string) зависит от больших букв
startswith(string) зависит от больших букв
ilike(string) не зависит от больших букв
Есть отрицательные модификаторы not<method>, исключение - метод isnot()
Пример для LIKE
rp = connection.execute(s)
for record in rp.fetchall():
print(record.cookie_name)
Можно модифицировать полученные данные по шаблону
Вариант 1
Вариант 2 - через функцию cast
from sqlalchemy import cast
cast((cookies.c.quantity * cookies.c.unit_cost),
Numeric(12,2)).label('inv_cost')])
for row in connection.execute(s):
print('{} - {}'.format(row.cookie_name, row.inv_cost))
Логические функции:
from sqlalchemy import and_, or_, not_
and_(
)
)
извлечение данных
first_row = results[0]
first_row[1]
Для rp есть следующие варианты, однако без limit() извлекаются все данные, затем одна строка:
first() - Первая запись и закрытие соединения
fetchone() - Одна запись и оставляет открытый курсор для последующих запросов
scalar() - Одно значение если запрос возвращает одно значение в одной строке
rp.keys() - список столбцов
Обновление данных
from sqlalchemy import update
u = u.values(quantity=(cookies.c.quantity + 120))
result = connection.execute(u)
Удаление данных
from sqlalchemy import delete
result = connection.execute(u)
JOIN
columns = [orders.c.order_id, users.c.username, users.c.phone,
line_items.c.extended_cost]
line_items).join(cookies)).where(users.c.username ==
'cookiemon')
result = connection.execute(cookiemon_orders).fetchall()
OUTER JOIN:
Для получения обратной статистики
columns = [users.c.username, func.count(orders.c.order_id)]
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders))
all_orders = all_orders.group_by(users.c.username)
result = connection.execute(all_orders).fetchall()
Есть поддержка ALIAS
Группировка данных:
columns = [users.c.username, func.count(orders.c.order_id)]
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders))
all_orders = all_orders.group_by(users.c.username)
result = connection.execute(all_orders).fetchall()
Объединение запросов по условию
def get_orders_by_customer(cust_name, shipped=None, details=False):
columns = [orders.c.order_id, users.c.username, users.c.phone]
joins = users.join(orders)
if details:
columns.extend([cookies.c.cookie_name, line_items.c.quantity,
line_items.c.extended_cost])
joins = joins.join(line_items).join(cookies)
cust_orders = select(columns)
cust_orders = cust_orders.select_from(joins)
34 | Chapter 2: Working with Data via SQLAlchemy Core
cust_orders = cust_orders.where(users.c.username == cust_name)
if shipped is not None:
cust_orders = cust_orders.where(orders.c.shipped == shipped)
result = connection.execute(cust_orders).fetchall()
return result
get_orders_by_customer('cakeeater')
get_orders_by_customer('cakeeater', details=True)
get_orders_by_customer('cakeeater', shipped=True)
get_orders_by_customer('cakeeater', shipped=False)
get_orders_by_customer('cakeeater', shipped=False, details=True)
Сырые запросы
result = connection.execute("select * from orders").fetchall()
print(result)