В рамках данной лабораторной работы необходимо:
- создать GitHub-репозиторий.
- произвести генерацию синтетических данных (одна таблица) с первым или вторым типом историчности, категориальными и количественными признаками (полей в выходящей таблице должно быть десять).
- сделать выгрузку таблицы в базу данных PostgreSQL, трансформировав данные от аномалий.
Детальное описание лабораторной работы:
- Поиск данных:
- Генерация синтетических данных: данные генерировать можно любым доступным способом. Можно прикрутить LLM (API, либо развернуть на мощности своей машины), так и сгенерировать любым другим способом. Как вариант: создать функцию get_dataset(...), в рамках которой будет проводиться генерация. Важно! Данные ОБЯЗАНЫ быть "сломанными" - чтобы их можно было почистить или структурировать далее.
- После генерации данных приступаем к загрузке ее в базу в формате "как есть" (например, с помощью Python или любого другого ETL-инструмента или языка программирования):
- Создать функцию, например, load_data_to_db(...) в рамках нее реализовать загрузку данных в базу в таблицу t_sql_source_unstructured.
- Создать схему s_psql_dds. Создать таблицы в этой схеме t_sql_source_unstructured (в этой таблице должны быть данные, которые вы загрузили в п.2) и t_sql_source_structured (то, куда вы загрузите очищенные и структурированные данные). Создать SQL-процедуру/SQL-функцию, которая будет убирать аномалии в данных и приводить данные в работоспособный вид (замечание: код избавления от аномалий должен быть написан исключительно на SQL! В качестве параметров функции/процедуры должны быть указаны) start_date и end_date - параметры с типом данных DATE.
- Например, можно создать PostgreSQL-функцию с названием fn_etl_data_load(start_date date, end_date date), в рамках которой будет проводиться трансформация данных и загрузка из загруженного в базу источника в предварительно подготовленную таблицу.
- Создать структуру репозитория: data-pipeline/"тут ваш проект на любом языке программирования, начиная с корневой папки (для питона: где main-py-файл - обычно, это корень)", sql/"dds(папки: table - Тут храниться будут ddl-таблиц, которые будут принимать участие в расчете, function - тут будут все функции, которые вы будете создавать для расчета, view - тут будут все представления") и выгрузить код в репозиторий.
- После написания SQL-функции создать функцию fill_structured_table(...) в приложении, которая будет просто запускать функцию fn_etl_data_load на PostgreSQL.
- Объединить функцию генерации датасета, функцию загрузки в неструктурированную таблицу и функцию запуска заполнения структурированной таблицы в одну верхнеуровневую функцию etl().
Пример структуры репозитория для пары Python - SQL (где написано init.txt - пустой файл, временное решение, постепенно будет наполняться кодом):
|-----------------------------------------------|
| data-pipeline/
|---- src/
|-------- get_dataset.py
|-------- load_data_to_db.py
|-------- fill_structured_table.py
|-------- etl.py
|-------- ...
|---- main.py
|---- config.py
|---- ...
| sql/
|---- dds/
|-------- s_sql_dds/
|------------ table/
|---------------- t_sql_source_unstructured.sql
|---------------- t_sql_source_structured.sql
|------------ function/
|---------------- fn_etl_data_load.sql
|------------ view/
|---------------- init.txt
|------------ trigger/
|---------------- init.txt
|------------ ...
|---- dm/
|-------- s_sql_dm/
|------------ table/
|---------------- init.txt
|------------ function/
|---------------- init.txt
|------------ view/
|---------------- init.txt
|------------ trigger/
|---------------- init.txt
|------------ ...
| .gitignore
| README.md
|-----------------------------------------------|
- Писать код маленькими буквами.
- Любой объект НАЧИНАТЬ называть следующим образом:
- для таблицы - t_: t_table
- для представления - v_: v_view
- для функции - fn_: fn_function
- для схемы - s_: s_schema
- использование Apache Airflow, DAG, DBT
- Код приложения и SQL-код работают и выдают данные в целевой таблице.
- Параметры функции в виде временного периода применяются в расчете.
- Наличие верхнеуровневой функции etl() в приложении, которая состоит из:
- запуска функции get_dataset(...)
- запуска функции load_data_to_db(...)
- запуска функции fill_structured_table(...)
- функция etl() должна быть без входных параметров. Три функции, перечисленные выше, могут иметь входные параметры, если это необходимо.
- Написанный и работающий Dockerfile для сборки проекта в контейнер.
- Написание автотестов (unit-tests) на работоспособность процедур:
- создание fn_etl_data_load_test(start_date date, end_date date), которая будет заполнять итоговую таблицу t_sql_source_structured_copy
- подключение библиотек pytest/junit/testng/др. для создания автотестов
- создание теста на запуск процедуры SQL (только локально, для п.6 сделать заглушку)
- покрытие кода тестами
- Создание GitHub Actions Pipeline, показывающий, что проект билдится с помощью докерфайла и тесты проходят.
Из получившихся таблиц реализовать схему данных: в центре будет таблица с ИД всех справочников и количественных признаков (фактов)
- Спроектировать схему данных из получившихся таблиц в формате снежинки или звезды.
- Создать таблицы-справочники и загрузить в них данные в формате (id serial, name varchar) из категориальных признаков таблицы t_sql_source_structured. Количественные признаки (факты) не трогаем.
- Создать функцию fn_dm_data_load(start_dt date, end_dt date) и таблицу t_dm_task (все в схеме из первой лабораторной работы), которая будет брать данные из таблицы t_sql_source_structured, в функции будет происходить джойн этой таблицы на справочники по name и вытаскивание ИД со вставкой в таблицу. Структура таблицы t_dm_task: аналогична таблице t_sql_source_structured, но с добавлением id из справочников. Поля ID должны быть названы соответствующе и иметь тип данных INT/BIGINT.
- Создать представление VIEW с названием v_dm_task, которое будет смотреть на таблицу t_dm_task. Не писать "select * from t_dm_task" (!!!). Перечислить все поля их получившейся таблицы t_dm_task при создании представления (представление в данном случае является витриной).
- Создать на стороне вашего приложения функцию с формированием витрины, например, с названием fill_dm_table(...).
- Развернуть СУБД MySQL. Создать в ней таблицы t_dm_task и t_dm_stg_task с аналогичной структурой как в t_dm_task. Переложить данные из витрины v_dm_task в таблицу t_dm_stg_task, подготовленную на стороне MySQL.
- На стороне MySQL создать функцию или процедуру fn_dm_data_stg_to_dm_load(start_dt date, end_dt date), которая будет перекладывать данные из t_dm_stg_task в t_dm_task за временной промежуток с предварительной очисткой данных в целевой таблице за промежуток времени.
- На стороне приложения создать функцию, которая будет запускать перекладку данных и вызов MySQL-функции, которая будет обеспечивать загрузку в t_dm_task.
- Код приложения и SQL-код работают.
- В представлении присутствуют данные, насчитанные SQL-функцией/процедурой, вызванной из приложения.
- Схема данных корректна и не требует доработок.
- Перекладка данных из приложения работает и данные на стороне MySQL присутствуют.
|-----------------------------------------------|
| data-pipeline/
|---- src/
|-------- get_dataset.py
|-------- load_data_to_db.py
|-------- fill_structured_table.py
|-------- etl.py
|-------- fill_dm_table.py
|---- main.py
|---- config.py
| sql/
|---- dds/
|-------- s_sql_dds/
|------------ table/
|---------------- t_sql_source_unstructured.sql
|---------------- t_sql_source_structured.sql
|---------------- t_dm_task.sql
|------------ function/
|---------------- fn_etl_data_load.sql
|---------------- fn_dm_data_load.sql
|------------ view/
|---------------- v_dm_task.sql
|---- dm/
|-------- s_sql_dm/
|------------ table/
|---------------- t_dm_stg_task.sql
|---------------- t_dm_task.sql
|------------ function/
|---------------- fn_dm_data_stg_to_dm_load.sql
|------------ view/
|---------------- v_dm_task.sql
| .gitignore
| README.md
|-----------------------------------------------|
В рамках данной лабораторной работы необходимо:
- Определить набор правил и метрик для проверки качества данных в витрине (v_dm_task) и целевой таблице в MySQL.
- Разработать автоматизированный DataQuality-пайплайн, который будет выполнять эти проверки на регулярной основе.
- Реализовать хранение истории выполнения проверок и их результатов.
- Создать простой дашборд для визуализации ключевых метрик качества данных.
- Настроить базовый механизм алертинга о критических нарушениях.
- Проектирование метрик контроля качества.
На основе структуры данных в витрине v_dm_task определить 5 проверок, например:
- Правильность: сравнение данных между витриной и истичником, например, по сумме или количеству чего-нибудь.
- Полнота: процент пропусков в критических полях (например, ID заказа, сумма).
- Непротиворечивость: проверка бизнес-правил (например, сумма по конкретному объекту атрибута не должна превышать сумму по всему множеству объектов этого атрибута).
- Уникальность: отсутствие дубликатов по ключевым полям.
- Валидность: значения в колонках соответствуют допустимым диапазонам или справочникам (например, status IN ('new', 'processed', 'shipped', 'cancelled')).
- Хранение результатов расчета метрик качества данных.
- Создать таблицу s_sql_dds.t_dq_check_results со структурой:
drop table if exists s_sql_dds.t_dq_check_results;
create table s_sql_dds.t_dq_check_results (
check_id serial primary key, -- уникальный идентификатор проверки, автоинкремент.
check_type varchar, -- тип проверки: полнота, правильность, точность и т. д.
table_name varchar, -- название проверяемой таблицы
execution_date timestamp(6) default current_timestamp, -- время запуска проверки
status varchar, -- passed, failed, error, cancelled...
error_message varchar -- текст ошибки или детали проверки
);
- Заполнение функции качества данных.
- Создать функцию s_sql_dds.fn_dq_checks_load(start_dt date, end_dt date), в которой будут последовательно запускаться все запланированные SQL-проверки качества данных и заполнится таблица из п.2.
- Добавить последним шагом в пайплайн приложения запуск функции расчета метрик контроля качества данных.
- Приложение запускается и работает.
- SQL-код запускается и работает.
- Все запланированные проверки должны быть пройдены.
- Приложение обернуто в докер и готово к запуску в контейнере.