Практическая работа
Синхронизация данных SQL Server и Managed Kafka в облаке с помощью CDC на основе Debezium
Из описания сценария вы узнаете, как обеспечить передачу данных о складских остатках из внешней базы SQL Server в облако при помощи Debezium Kafka CDC. Аналогичный подход будет работать и для других БД, поддерживаемых Debezium: Oracle, DB2 и др. Для того, чтобы извлечь информацию из БД SQL Server в режиме CDC (захват изменения данных), мы должны разрешить эти изменения сначала на уровне базы данных, а потом и для каждой таблицы, которая будет участвовать в процессе репликации. Для приема данных будет использоваться контейнер Debezium, в котором будет развернут компонент Kafka Connect. Затем эти изменения будут передаваться в Apache Kafka в облаке — Managed Service for Kafka. И наконец, мы настроим интеграцию Managed Service for Kafka и хранилищем данных на базе Clickhouse.
Шаг 1.
Создайте виртуальную машину по инструкции.

Для передачи данных по сети нам нужно настроить параметры группы безопасности. Для этого перейдите в параметры ВМ, раздел Сеть и кликните на ссылку с именем группы безопасности. Добавьте разрешение на входящий трафик с портов 80, 443, 9091, 8083, а также с порта SQL Server 1433.
Шаг 2.
Создадим кластер Kafka:

В консоли управления выберите каталог, в котором нужно создать кластер БД.
Выберите сервис Managed Service for Kafka и нажмите Создать кластер.
Задайте Имя кластераinventory-cluster.

Поставьте галочку — Управление топиками через API. Она включает режим unmanaged-topics, который позволяет использовать API внешних приложений для создания данных в кластере Apache Kafka.

Включите Публичный доступ для обращения к кластеру извне облака.

В разделе Сетевые настройки укажите только одну зону доступности, потому как в рамках данной лабораторной работы не требуется отказоустойчивая конфигурация.

Остальные параметры оставьте по умолчанию.

Нажмите кнопку Создать кластер.
Шаг 3.
Создадим пользователя кластера Managed Service for Kafka с правами администратора, воспользовавшись интерфейсом командной строки (CLI). Инструкцию по установке и настройке CLI см. здесь.

$ yc managed-kafka user create inventory --cluster-name inventory-cluster --folder-id <your-yc-folder-id> --password=pass@word1 --permission topic="*",role=ACCESS_ROLE_ADMIN 
<your-yc-folder-id> замените на идентификатор своего облачного каталога.

Данные будут передаваться в Apache Kafka в формате *.json. В данном случае, стейджинговое хранилище выступает в роли платформы массивной передачи сообщений. Сначала передаются снэпшоты, а потом и изменения.

Чтобы для каждой таблицы автоматически создавался свой собственный топик, установим флаг auto-create-topics-enable:
$ yc managed-kafka cluster update --id <cluster_id> --auto-create-topics-enable
<cluster_id> замените на идентификатор кластера.
Шаг 4.
Настроим Debezium, воспользовавшись преднастроенным контейнером.
Фактически, это стандартный контейнер с Debezium, в который дополнительно установлен корневой сертификат Яндекс. Облако и создано хранилище ключей client.truststore.jks с паролем pass@word1, в которое так же добавлен корневой сертификат Яндекс.Облако, чтобы упростить задачу аутентификации в процессе выполнения работы.
Подключитесь к ВМ по ssh:
$ ssh yc-user@<ip-address-vm>
<ip-address-vm> замените на публичный IP-адрес ВМ.

Установите обновления и пакеты git, docker и docker-compose:
$ sudo apt update && sudo apt install git docker.io docker-compose
Добавьте текущего пользователя в группу докер:
$ sudo gpasswd -a $USER docker
$ newgrp docker
Клонируйте репозиторий с исходным кодом:
$ git clone https://github.com/MaxKhlupnov/yc-cdc-datamart
В тестовом примере пароль для файла client.truststore.jks – pass@word1
Мы будет использовать Debezium® connector в контейнере Docker®.
см. https://hub.docker.com/r/debezium/connect , который содержит конвекторы для работы со следующими БД: mongodb, mysql, postgres, sqlserver, oracle, db2

Чтобы обеспечить работу с управляемыми сервисами Yandex Cloud, нам необходимо модифицировать стандартный контейнер коннектора Debezium®, для возможности установить соединение с управляемыми сервисами Yandex Cloud по SSL.

В папке debezium-cdc репозитория находится код контейнера yc-connect.
Этот контейнер наследует от штатного коннектора Debezium®, при этом добавляет в него корневые сертификаты Yandex Cloud и файл настроек конфигурации Kafka Connect.

Файл client.truststore.jks реализует предварительно подготовленный репозиторий хранилище доверенных сертификатов (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера.
К хранилищу доверенных сертификатов требуется установить пароль.
Перейдите в каталог ~/yc-cdc-datamart/debezium-cdc/
Выполним сборку контейнера yc-connect :
$ docker build -t yc-connect -f ./yc-connect/Dockerfile yc-connect/
Выполним сборку контейнера yc-connect :
Запустим команду:
$ docker-compose up
Будут развернуты два контейнера — в одном из них коннектор Debezium из собранного ранее образа yc-connect, в другом — SQL Server из стандартного образа Microsoft. Таким образом, мы эмулируем распределенную инфраструктуру. В реальном сценарии контейнер и SQL Server будут находиться, скорее всего, на разных физических и/или виртуальных серверах.

Обратите внимание, что если Вы прервёте терминальную сессию – выполнение docker-compose завершится, и SQL Server и коннектор Debezium будут остановлены.
Чтобы запустить docker-compose в фоновом режиме используйте флаг -d
$ docker-compose up -d
Как только запускаются коннектор Debezium из образа yc-connect, он автоматически подключаются к Managed Apache Kafka с использованием тех характеристик, которые мы указали при настройке, и создают системные топики, которые необходимы для передачи изменений — offset, config и status. Эти топики показывают смещение между чтением и текущим положением временного указателя, конфигурацию и информацию об ошибках.
В консоли можно наблюдать диагностические сообщения контейнеров. Если docker-compose запущен с флагом -d , то для просмотра логов контейнера можно воспользоваться конструкцией docker logs.
Шаг 5.
Следующим шагом нам необходимо настроить получение информации из SQL Server.


Для отображения схемы БД воспользуемся SQL Server management studio (SSMS) или DBeaver.

В параметрах соединения укажите адрес виртуальной машины с контейнером Debezium-а, Тип аутентификацииSQL Server Authentication, Имя пользователяsa и ПарольPassword!

Используем скрипт ~/yc-cdc-datamart/debezium-cdc/SQL/inventory-mssql.sql для создания БД и заполнения ее данными, а также включим для БД CDC.
Используем скрипт ~/yc-cdc-datamart/debezium-cdc/SQL/starting-agent.sql для включения SQL Server Agent-а, он создаст задания, которые будут обеспечивать синхронизацию. Убедитесь, что SQL Server Agent запущен и успешно создались задания для формирования таблицы изменений и второе – для обеспечения очистки. Подробнее см. в документации Microsoft.
Чтобы наблюдать за процессом передачи сообщений и создания топиков в Kafka графическом интерфейсе - установите инструмент Conduktor. Как альтернатива – Вы можете воспользоваться утилитой командной строки kafkacat, в документации описывается процесс подключения и использования kafkacat для подключения и работы с кластером.
Создайте подключение к кластеру. Файл сертификата client.truststore.jks, находящийся в репозитории, необходимо сохранить на локальный компьютер и прописать путь к нему. В строке bootstrap-servers пропишите адрес хоста своего кластера Kafka.
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="inventory" password="pass@word1";
ssl.truststore.location=C:/JavaKeyStore/client.truststore.jks
ssl.truststore.password=pass@word1
Сохраните параметры и откройте созданное подключение. Проследите, что автоматически создались системные топики (offsets, configs и statuses), о которых говорилось выше.
Шаг 6.
Запустим процесс синхронизации. Для этого настроим клиента Debezium. Это можно сделать через REST веб-сервис, который публикует клиент Debezium.
Выполним REST-запрос на сервис
Для выполнения REST запроса можно воспользоваться VS Code с установленным plugin REST Client или командой curl.
Отредактируйте параметры запроса в файле, укажите идентификатор кластера Kafka, FQDN или IP-адрес ВМ, на которой запущен контейнеры с SQL-Server.
Если Вы используете VS Code – используйте шаблон из файла ~/yc-cdc-datamart/debezium-cdc/SQL/register-connectors.http, а затем выполните запрос, нажав Send Request.
Если Вы используете curl – используйте шаблон из файла ~/yc-cdc-datamart/debezium-cdc/SQL/register-connector.json , а затем выполните запрос:
$ curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-connector.json
Убедитесь, что в кластере создались топики для каждой таблицы БД MS SQL Server, для которой был настроен CDC.
Убедитесь, что в топиках таблиц, что в топиках появились JSON, соответствующие записям в таблицах.
Конец документа