Практическая работа
Загрузка данных из промежуточного слоя в аналитические витрины
В ходе выполнения задания вы обеспечите преобразование и загрузку данных из промежуточного (стейджингового) хранилища на базе Managed Service for Kafka в витрину данных нашего аналитического хранилища.
Шаг 1.
Настроим кластер ClickHouse для интеграции с Apache Kafka.
В консоли управления Yandex.Cloud выберите Managed Service for ClickHouse. В правой верхней части страницы нажмите Изменить кластер. Откроется страница с параметрами кластера ClickHouse.
В нижней части страницы в блоке Настройки СУБД нажмите кнопку Настроить. В открывшемся окне укажите параметры Kafka:
  • Sasl mechanismSCRAM-SHA-.
  • Sasl password — В нашем случае pass@word1.
  • Sasl username — В нашем случае inventory.
  • Security protocol SASL_SSL.
На данном этапе в таблице хранится избыточная иерархическая структура json с большой вложенностью. Для загрузки в таблицы ClickHouse её необходимо упростить.
Шаг 2.
Для этого в настройках коннектора Debezium включим преобразование записи single message transformation (SMT). В результате запись будет содержать только собственно данные, что сделает ее пригодной для вставки в ClickHouse с использованием выражения JSONEachRow (https://debezium.io/documentation/reference/configuration/event-flattening.html).
Чтобы включить преобразование, измените настройки коннектора Debezium: добавьте в файл register-connectors.http следующие строки.
"transforms" : "unwrap",
"transforms.unwrap.type" : "io.debezium.transforms.ExtractNewRecordState",
Выполните REST-запрос на сервис для применения настроек. Подобнее о работе с коннектором – см. урок 2.2
Для проверки добавим несколько записей в таблицу products: подключитесь к Microsoft SQL Server ивыполните следующий SQL запрос:
INSERT INTO products(name,description,weight) 
VALUES ('bike 12','Small 2-wheel scooter',3.14);
INSERT INTO products(name,description,weight)
VALUES ('car battery','12V car battery',8.1);
INSERT INTO products(name,description,weight)
VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8); 
INSERT INTO products(name,description,weight)
VALUES ('hammer','12oz carpenter''s hammer',0.75);
Убедитесь, что в кластере Kafka в топиках появилась более простая структура данных.
Подключитесь к ClickHouse используя DBeaver и создайте таблицу ClickHouse для чтения топика Kafka. В редакторе введите запрос на создание таблицы kafka_store_data:
CREATE TABLE kafka_store_data ( 
                          store_id UInt32,
                          store_name String, 
                          store_address Nullable(String), 
                          store_location Nullable(String), 
                          description Nullable(String)
) ENGINE = Kafka SETTINGS kafka_broker_list = '<kafka-broker-fqdn>:9091', 
                                 kafka_topic_list = 'inventory.dbo.store_data',
                                 kafka_group_name = 'inventory-consumer-group',
                                 kafka_format = 'JSONEachRow';
В запросе замените:
  • < kafka-broker-fqdn > на хост кластера Kafka.
Создайте основную таблицу ch_store_data на кластере ClickHouse для репликации изменений:
В редакторе введите запрос на создание таблицы:
CREATE TABLE ch_store_data  ON CLUSTER  '{cluster}'(
	 store_id  UInt32,
	 store_name String,
	 store_address Nullable(String),
	 description Nullable(String)
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/ch_store_data', '{replica}') 
ORDER BY (store_id);
Чтобы посмотреть содержимое созданной таблицы, выполните запрос:
SELECT *FROM ch_store_data;
Создайте материализованное представление, выполнив следующий запрос:
CREATE MATERIALIZED VIEW materialized_store_data TO ch_store_data
		AS SELECT  store_id, store_name, store_address, store_location, description
FROM kafka_store_data;
Шаг 3.
Для демонстрации сценария с построением отчетов в DataLens (урок 5) нам понадобится таблица, содержащая информацию о точках отгрузки (включая их географические координаты/ адреса для отображения на карте).

Создадим эту таблицу, включим для нее возможность CDC и выполним заполнение таблицы, путем импорта данных из csv- файла

Подключитесь к Microsoft SQL Server используя DBeaver и выполните следующий скрипт:
CREATE TABLE store_data (
	store_id bigint NULL,
	store_name nvarchar(11) NULL,
	store_address nvarchar(105) NULL,
	store_location varchar(27) NULL,
	description nvarchar(257) NULL
);

EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'store_data', @role_name = NULL, @supports_net_changes = 0;
В результате вы создадите таблицу store_data и включите CDC.


Заполните таблицу, импортировав данные из файла примера store_data.csv: вызовите контекстное меню правой кнопкой мыши и выберите Import Data.
В открывшемся окне укажите источник CSV и целевую базу данных Inventory.dbo.store_data.
В поле Source Container выберите файл store_data.csv с данными для импорта.
Укажите в параметрах импорта: Column delimeter; и нажмите Next.
В открывшемся окне вы можете нажать Preview data для просмотра импортируемой информации.
На следующем шаге проверьте настройки импорта и нажмите Start для запуска операции.
Убедитесь, что в Kafka был создан топик для таблицы store_data, с данными json, соответствующие записям в таблицах.
Данные успешно переместились в нашу витрину данных в ClickHouse. Для просмотра данных в ClickHouse выполните запрос:
SELECT * FROM ch_store_data;
Конец документа