Skip to main content

Sqlalchemy

Установка
Ядро 
pip install sqlalchemy
Драйвер для postgres, mysql
pip install psycopg2
pip install psycopg2-binary
pip  install  pymysql
Подключение
from sqlalchemy import create_engine
engine = create_engine('postgresql+psycopg2://username:password@localhost:5432/mydb')
engine = create_engine('mysql+pymysql://cookiem:chip@mysql01.com/cookies', pool_recycle=3600)
engine = create_engine('sqlite:///cookies.db')
engine2 = create_engine('sqlite:///:memory:')
Доп. ключи через запятую:
echo булево. Лог запросов. По-умолчанию False.
encoding строка. По-умолчанию utf-8
isolation_level уровень изоляции
pool_recycle число секунд для переподключения, желательно выставлять 3600. При работе с mysql соединение может быть активным до 4 часов.

Создание engine не создает фактического соединения с БД. 

connection = engine.connect()

Сырой запрос:

result = connection.execute("select * from orders").fetchall()
Структура фреймворка
Таблицы -> MetaData -> Engine -> Dialect -> DB
MetaData: объект, в котором таблицы, индексы,...
  • Может получать информацию о существующих сейчас сущностях в БД
  • Может хранить шаблоны именования индексов и ограничений (constraint). Т е перед началом проекта добавляется настройка именования, затем при создании/удалении все ок. Это словарь ограничение:шаблон
Ограничение или индекс Описание
ix обычный индекс
uq уникальный индекс
ck ограничение проверки
fk foreign-ключ
pk primary-ключ
from sqlalchemy import MetaData
convention = {
'all_column_names': lambda constraint, table: '_'.join([
column.name for column in constraint.columns.values()
]),
'ix': 'ix__%(table_name)s__%(all_column_names)s',
'uq': 'uq__%(table_name)s__%(all_column_names)s',
'ck': 'ck__%(table_name)s__%(all_column_names)s',
'fk': ('fk__%(table_name)s__%(all_column_names)s' '%(referred_table_name)s'),
'pk': 'pk__%(table_name)s'
}
metadata_obj = MetaData(naming_convention=convention)
  • Должен быть инициализирован до обращения к нему в таблицах
  • Изначально пустой объект, можно или вручную занести данные, или получить из базы.
  • Получение информации об одной таблице по имени
    • Нельзя получить одновременно две таблицы. Либо одна, либо вся база
    • Нельзя (и одна таблица, и база) получить ограничения (CONSTRAINT), комментарии, триггеры, значения по умолчанию. Но можно вручную добавить данные. Но похоже проще импортировать описание.
from sqlalchemy import ForeignKeyConstraint
album.append_constraint(ForeignKeyConstraint(['ArtistId'], ['artist.ArtistId']))
    • при получении базы, имена таблиц с большой и маленькой буквы. Т е удваивается количество объектов.
engine = create_engine(...)
metadata = MetaData()
cookie_tbl = Table('cookies', metadata, autoload_with=engine)
s = select(cookie_tbl)
with engine.connect() as conn:
    m = conn.execute(s)
    print(m.keys())
  • Получение информации обо всей базе
from sqlalchemy import create_engine, MetaData

engine = create_engine(...)
metadata = MetaData()
metadata.reflect(bind=engine)
for table in metadata.sorted_tables:
    print(table.name)
#Потом получить объект таблицы: 
mytable = metadata.tables['mytable']
  • Получение информации обо всей базе через ORM + Automap

from sqlalchemy.ext.automap import automap_base
from sqlalchemy import create_engine

Base = automap_base()
engine = create_engine('sqlite:///Chinook_Sqlite.sqlite')
Base.prepare(engine, reflect=True)
# данные о классах загружены. 
#Например для получения списка классов:
Base.classes.keys()
Artist = Base.classes.Artist # создание классов
#Внешние связи - в свойстве <related_object>_collection
artist = session.query(Artist).first()
for album in artist.album_collection:
    print('{} - {}'.format(artist.Name, album.Title))

Engine: Скрывает пул подключений и диалект. 
Dialect: Скрывает детали реализации в конкретной базе
Core: SQL в чистом виде
ORM: абстракции

========== Неформатированный текст ===========
============================================================Работа с данными=============================================================
INSERT
Вариант 1:
ins = cookies.insert().values(
      quantity="12",
      unit_cost="0.50"
      )
      result = connection.execute(ins)
      print(result.inserted_primary_key)
Вариант 2:
from sqlalchemy import insert
ins = insert(cookies).values( 
      quantity="12",
      unit_cost="0.50"
)
)
Вариант 3
ins = cookies.insert()
result = connection.execute(
      ins, 
      quantity='1',
      unit_cost='0.75'
) result.inserted_primary_key
)
result.inserted_primary_key
Вариант 4
inventory_list = [ 
      {
          'cookie_name': 'peanut butter',
          'cookie_recipe_url': 'http://some.aweso.me/cookie/peanut.html',
          'cookie_sku': 'PB01',
          'quantity': '24',
          'unit_cost': '0.25'
      },
      {
          'cookie_name': 'oatmeal raisin',
          'cookie_recipe_url': 'http://some.okay.me/cookie/raisin.html',
          'cookie_sku': 'EWW01',
          'quantity': '100',
          'unit_cost': '1.00'
      }
]
result = connection.execute(ins, inventory_list)
SELECT 
Вариант 1
from sqlalchemy.sql import select
s = select([cookies]) 
rp = connection.execute(s)
results = rp.fetchall()
Вариант 2
s = cookies.select()
rp = connection.execute(s)
results = rp.fetchall()
Вариант 3
s = cookies.select()
rp = connection.execute(s) 
for record in rp:
      print(record.cookie_name)
SELECT определенных столбцов
s = select([cookies.c.cookie_name, cookies.c.quantity])
ORDERING
ORDERING
s = select([cookies.c.cookie_name, cookies.c.quantity])
s = s.order_by(cookies.c.quantity)  - пПрямая сортировка
или
s = s.order_by(desc(cookies.c.quantity))
- о
Обратная сортировка
s = s.order_by(desc(cookies.c.quantity))
rp = connection.execute(s)
LIMITING
s = select([cookies.c.cookie_name, cookies.c.quantity])
s = s.order_by(cookies.c.quantity)
s = s.limit(2)
rp = connection.execute(s)
Встроенные функции SQL
Сумма: func.sum
from sqlalchemy.sql import func
s = select([func.sum(cookies.c.quantity)])
rp = connection.execute(s)
print(rp.scalar()) 
Количество: func.count
s = select([func.count(cookies.c.cookie_name)])
rp = connection.execute(s)
record = rp.first()
print(record.keys()) -# ключи могут быть разные.
print(record.count_1) 
При помощи label можно определить нНазвание свойства: label 
s = select([func.count(cookies.c.cookie_name).label('inventory_count')]) 
rp = connection.execute(s)
record = rp.first()
print(record.keys()) print(record.inventory_count)
print(record.inventory_count)
WHERE
Их можно комбинировать, используется AND
s = select([cookies]).where(cookies.c.cookie_name == 'chocolate chip')
rp = connection.execute(s)
Варианты модификаторов:
Метод
    Описание
  • between(cleft, cright) Значение столбца между cleft и cright
  • concat(column_two) Объединение column и column_two
  • distinct() Находит только уникальные значения в столбце
  • in_([list]) Только если значения столбца в списке
  • is_(None) Проверка на пустые значения 
  • contains(string) Значение столбца содержит строку
  • endswith(string) Оканчивается строкой, зависит от больших букв
  • like(string) зависит от больших букв
  • startswith(string) зависит от больших букв
  • ilike(string) не зависит от больших букв 
  • Есть отрицательные модификаторы not<method>, исключение - метод isnot() 
  • Пример для LIKE
    s = select([cookies]).where(cookies.c.cookie_name.like('%chocolate%'))
    rp = connection.execute(s)
    for record in rp.fetchall():
          print(record.cookie_name)
    Можно модификацироватья полученныех данныех по шаблону
    Вариант 11: к каждому значению столбца
    s = select([cookies.c.cookie_name, 'SKU-' + cookies.c.cookie_sku]) - добавит строку 'SKU-' к каждому значению столбца
    Вариант 2 -2: через функцию cast
    from sqlalchemy import cast 
    s = select([cookies.c.cookie_name,
                cast((cookies.c.quantity * cookies.c.unit_cost),
                     Numeric(12,2)).label('inv_cost')]) 
    for row in connection.execute(s):
          print('{} - {}'.format(row.cookie_name, row.inv_cost))
    Логические функции:
    from sqlalchemy import and_, or_, not_
    s = select([cookies]).where(
          and_(
              cookies.c.quantity > 23,
              cookies.c.unit_cost < 0.40    ) )
       )
    )
    иИзвлечение данных
    first_row = results[0] 
    first_row[1] 
    first_row.cookie_name 
    first_row[cookies.c.cookie_name]
    Для rp есть следующие варианты, однако без limit() извлекаются все данные, затем одна строка:
    • first() - Первая запись и закрытие соединения
  • fetchone() - Одна запись и оставляет открытый курсор для последующих запросов
  • scalar() - Одно значение если запрос возвращает одно значение в одной строке
  • rp.keys() - список столбцов
  • Обновление данных
    from sqlalchemy import update
    u = update(cookies).where(cookies.c.cookie_name == "chocolate chip")
    u = u.values(quantity=(cookies.c.quantity + 120)) 
    result = connection.execute(u)
    Удаление данных
    from sqlalchemy import delete
    u = delete(cookies).where(cookies.c.cookie_name == "dark chocolate chip")
    result = connection.execute(u)
    JOIN
    columns = [orders.c.order_id, users.c.username, users.c.phone,
                 cookies.c.cookie_name, line_items.c.quantity,
                 line_items.c.extended_cost]
    cookiemon_orders = select(columns)
    cookiemon_orders = cookiemon_orders.select_from(orders.join(users).join( 
                              line_items).join(cookies)).where(users.c.username ==
                                  'cookiemon')
    result = connection.execute(cookiemon_orders).fetchall()
    OUTER JOIN:JOIN
    Для получения обратной статистики
    columns = [users.c.username, func.count(orders.c.order_id)]
    all_orders = select(columns)
    all_orders = all_orders.select_from(users.outerjoin(orders)) 
    all_orders = all_orders.group_by(users.c.username)
    result = connection.execute(all_orders).fetchall()
    Есть поддержка ALIAS
    Группировка данных:
    columns = [users.c.username, func.count(orders.c.order_id)] 
    all_orders = select(columns)
    all_orders = all_orders.select_from(users.outerjoin(orders))
    all_orders = all_orders.group_by(users.c.username) 
    result = connection.execute(all_orders).fetchall()
    Объединение запросов по условию
    def get_orders_by_customer(cust_name, shipped=None, details=False):
          columns = [orders.c.order_id, users.c.username, users.c.phone]
          joins = users.join(orders)
          if details:
              columns.extend([cookies.c.cookie_name, line_items.c.quantity,
                             line_items.c.extended_cost])
              joins = joins.join(line_items).join(cookies)
          cust_orders = select(columns)
          cust_orders = cust_orders.select_from(joins)

          cust_orders = cust_orders.where(users.c.username == cust_name)
          if shipped is not None:
              cust_orders = cust_orders.where(orders.c.shipped == shipped)
          result = connection.execute(cust_orders).fetchall()
          return result
    get_orders_by_customer('cakeeater') 
    get_orders_by_customer('cakeeater', details=True) 
    get_orders_by_customer('cakeeater', shipped=True) 
    get_orders_by_customer('cakeeater', shipped=False) 
    get_orders_by_customer('cakeeater', shipped=False, details=True)