В этом разделе мы рассмотрим, как организовать процесс загрузки данных из системы-источника в хранилище данных на основе подхода ETL/ELT. Наш сценарий предполагает работу с данными из файла Airline_Delay_Cause.csv, которые необходимо сначала загрузить в Staging Area, а затем переместить в Raw Vault, соблюдая концепцию Data Vault.
Основные этапы ETL/ELT-процесса
- Extract (Извлечение): Данные извлекаются из файла и загружаются в промежуточную таблицу (Staging_Area).
- Load (Загрузка): Сырые данные из Staging Area перемещаются в таблицы Raw Vault (Hubs, Links, Satellites).
- Transform (Трансформация): Логика бизнес-преобразований применяется позже на этапе формирования витрин (Business Vault или Data Marts).
Реализация ETL/ELT-процесса
Мы будем использовать Python с Pandas и SQLAlchemy для извлечения и загрузки данных в Staging Area, а затем напишем SQL-скрипты для переноса данных из Staging в Raw Vault. Я намеренно не использую для данного раздела какой-либо ETL инструмент, т.к. это не является целью данного курса. Главное - это понимание основных прицепов загрузки данных, а это достаточно показать на простых скриптах SQL
1. Загрузка данных в Staging Area
Для загрузки данных из файла Airline_Delay_Cause.csv
в таблицу Staging_AirlineData мы будем использовать библиотеку Pandas и драйвер pyodbc для подключения к MS SQL.
import pandas as pd
from sqlalchemy import create_engine
# Чтение файла CSV
file_path = 'Airline_Delay_Cause.csv'
data = pd.read_csv(file_path)
# Подключение к MS SQL Express
engine = create_engine('mssql+pyodbc://localhost\\SQLEXPRESS/Staging_Area?driver=SQL+Server+Native+Client+11.0')
# Загрузка данных в таблицу Staging_AirlineData
data.to_sql('Staging_AirlineData', con=engine, if_exists='replace', index=False, method='multi')
print("Данные успешно загружены в Staging Area!")
2. Перенос данных из Staging в Raw Vault
После загрузки данных в Staging Area, мы перенесем их в таблицы Raw Vault с использованием SQL-запросов.
Создание ETL-скриптов для переноса данных
- Загрузка данных в хабы (Hubs):
INSERT INTO Raw_Vault.Hub.Hub_Airline (Airline_SurrogateKey, Airline_Code, Load_Date, Record_Source)
SELECT
NEWID() AS Airline_SurrogateKey, -- Генерация surrogate key
DISTINCT Airline_Code,
GETDATE() AS Load_Date,
'Staging_AirlineData' AS Record_Source
FROM
Staging_Area.dbo.Staging_AirlineData;
- Загрузка данных в линки (Links):
INSERT INTO Raw_Vault.Link.Link_AirlineFlights (Link_SurrogateKey, Airline_SurrogateKey, Flight_SurrogateKey, Load_Date, Record_Source)
SELECT
NEWID() AS Link_SurrogateKey,
h.Airline_SurrogateKey,
f.Flight_SurrogateKey,
GETDATE() AS Load_Date,
'Staging_AirlineData' AS Record_Source
FROM
Staging_Area.dbo.Staging_AirlineData s
JOIN Raw_Vault.Hub.Hub_Airline h ON s.Airline_Code = h.Airline_Code
JOIN Raw_Vault.Hub.Hub_Flight f ON s.Flight_Code = f.Flight_Code;
- Загрузка данных в сателлиты (Satellites):
INSERT INTO Raw_Vault.Sat.Sat_AirlineDetails (Airline_SurrogateKey, Airline_Name, Year, Month, Total_Delays, Load_Date, Record_Source)
SELECT
h.Airline_SurrogateKey,
s.Airline_Name,
s.Year,
s.Month,
s.Total_Delays,
GETDATE() AS Load_Date,
'Staging_AirlineData' AS Record_Source
FROM
Staging_Area.dbo.Staging_AirlineData s
JOIN Raw_Vault.Hub.Hub_Airline h ON s.Airline_Code = h.Airline_Code;
Пояснения и примеры
-
Surrogate Keys:
- Surrogate keys генерируются с использованием функции NEWID() для обеспечения уникальности каждой записи в хабах и линках.
- Это позволяет связывать данные из разных систем и гарантирует целостность модели.
-
Схемы и организация данных:
- Использование отдельных схем для хабов (Hub), линков (Link) и сателлитов (Sat) в базе данных Raw_Vault упрощает управление и обеспечивает четкое разделение логики.
Заключение
Этот подход к разработке ETL/ELT-процесса позволяет:
- Упрощать управление данными за счет четкой структуры.
- Максимально использовать возможности SQL для обработки больших объемов данных.
- Гибко интегрировать Python для начальной загрузки данных и предварительной обработки.
В следующем разделе мы рассмотрим, как добавить бизнес-логику в Business Vault.