В этом разделе мы рассмотрим, как организовать процесс загрузки данных из системы-источника в хранилище данных на основе подхода ETL/ELT. Наш сценарий предполагает работу с данными из файла Airline_Delay_Cause.csv, которые необходимо сначала загрузить в Staging Area, а затем переместить в Raw Vault, соблюдая концепцию Data Vault.

Основные этапы ETL/ELT-процесса

  1. Extract (Извлечение): Данные извлекаются из файла и загружаются в промежуточную таблицу (Staging_Area).
  2. Load (Загрузка): Сырые данные из Staging Area перемещаются в таблицы Raw Vault (Hubs, Links, Satellites).
  3. Transform (Трансформация): Логика бизнес-преобразований применяется позже на этапе формирования витрин (Business Vault или Data Marts).

Реализация ETL/ELT-процесса

Мы будем использовать Python с Pandas и SQLAlchemy для извлечения и загрузки данных в Staging Area, а затем напишем SQL-скрипты для переноса данных из Staging в Raw Vault. Я намеренно не использую для данного раздела какой-либо ETL инструмент, т.к. это не является целью данного курса. Главное - это понимание основных прицепов загрузки данных, а это достаточно показать на простых скриптах SQL

1. Загрузка данных в Staging Area

Для загрузки данных из файла Airline_Delay_Cause.csv в таблицу Staging_AirlineData мы будем использовать библиотеку Pandas и драйвер pyodbc для подключения к MS SQL.

import pandas as pd
from sqlalchemy import create_engine

# Чтение файла CSV
file_path = 'Airline_Delay_Cause.csv'
data = pd.read_csv(file_path)

# Подключение к MS SQL Express
engine = create_engine('mssql+pyodbc://localhost\\SQLEXPRESS/Staging_Area?driver=SQL+Server+Native+Client+11.0')

# Загрузка данных в таблицу Staging_AirlineData
data.to_sql('Staging_AirlineData', con=engine, if_exists='replace', index=False, method='multi')

print("Данные успешно загружены в Staging Area!")

2. Перенос данных из Staging в Raw Vault

После загрузки данных в Staging Area, мы перенесем их в таблицы Raw Vault с использованием SQL-запросов.

Создание ETL-скриптов для переноса данных
  • Загрузка данных в хабы (Hubs):
INSERT INTO Raw_Vault.Hub.Hub_Airline (Airline_SurrogateKey, Airline_Code, Load_Date, Record_Source)
SELECT 
    NEWID() AS Airline_SurrogateKey,  -- Генерация surrogate key
    DISTINCT Airline_Code,
    GETDATE() AS Load_Date,
    'Staging_AirlineData' AS Record_Source
FROM 
    Staging_Area.dbo.Staging_AirlineData;
  • Загрузка данных в линки (Links):
INSERT INTO Raw_Vault.Link.Link_AirlineFlights (Link_SurrogateKey, Airline_SurrogateKey, Flight_SurrogateKey, Load_Date, Record_Source)
SELECT 
    NEWID() AS Link_SurrogateKey,
    h.Airline_SurrogateKey,
    f.Flight_SurrogateKey,
    GETDATE() AS Load_Date,
    'Staging_AirlineData' AS Record_Source
FROM 
    Staging_Area.dbo.Staging_AirlineData s
JOIN Raw_Vault.Hub.Hub_Airline h ON s.Airline_Code = h.Airline_Code
JOIN Raw_Vault.Hub.Hub_Flight f ON s.Flight_Code = f.Flight_Code;
  • Загрузка данных в сателлиты (Satellites):
INSERT INTO Raw_Vault.Sat.Sat_AirlineDetails (Airline_SurrogateKey, Airline_Name, Year, Month, Total_Delays, Load_Date, Record_Source)
SELECT 
    h.Airline_SurrogateKey,
    s.Airline_Name,
    s.Year,
    s.Month,
    s.Total_Delays,
    GETDATE() AS Load_Date,
    'Staging_AirlineData' AS Record_Source
FROM 
    Staging_Area.dbo.Staging_AirlineData s
JOIN Raw_Vault.Hub.Hub_Airline h ON s.Airline_Code = h.Airline_Code;

Пояснения и примеры

  1. Surrogate Keys:

    • Surrogate keys генерируются с использованием функции NEWID() для обеспечения уникальности каждой записи в хабах и линках.
    • Это позволяет связывать данные из разных систем и гарантирует целостность модели.
  2. Схемы и организация данных:

    • Использование отдельных схем для хабов (Hub), линков (Link) и сателлитов (Sat) в базе данных Raw_Vault упрощает управление и обеспечивает четкое разделение логики.

Заключение

Этот подход к разработке ETL/ELT-процесса позволяет:

  • Упрощать управление данными за счет четкой структуры.
  • Максимально использовать возможности SQL для обработки больших объемов данных.
  • Гибко интегрировать Python для начальной загрузки данных и предварительной обработки.

В следующем разделе мы рассмотрим, как добавить бизнес-логику в Business Vault.