"transforms" : "unwrap",
"transforms.unwrap.type" : "io.debezium.transforms.ExtractNewRecordState",
-- Adds four rows to products, one INSERT statement per row.
-- Only name, description and weight are supplied.
INSERT INTO products (name, description, weight)
    VALUES ('bike 12', 'Small 2-wheel scooter', 3.14);

INSERT INTO products (name, description, weight)
    VALUES ('car battery', '12V car battery', 8.1);

INSERT INTO products (name, description, weight)
    VALUES ('12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8);

-- The doubled single quote is an escaped apostrophe inside the literal.
INSERT INTO products (name, description, weight)
    VALUES ('hammer', '12oz carpenter''s hammer', 0.75);
-- Kafka-engine table: consumes JSONEachRow messages from the
-- 'inventory.dbo.store_data' topic as consumer group 'inventory-consumer-group'.
-- '<kafka-broker-fqdn>' is a placeholder to replace with the real broker host.
-- NOTE(review): store_id is UInt32 here while the SQL Server source column is
-- declared `bigint NULL` — confirm the value range fits.
CREATE TABLE kafka_store_data
(
    store_id       UInt32,
    store_name     String,
    store_address  Nullable(String),
    store_location Nullable(String),
    description    Nullable(String)
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = '<kafka-broker-fqdn>:9091',
    kafka_topic_list  = 'inventory.dbo.store_data',
    kafka_group_name  = 'inventory-consumer-group',
    kafka_format      = 'JSONEachRow';
-- Replicated MergeTree table that holds the store rows, created on the
-- cluster named by the {cluster} macro and sorted by store_id.
-- The {shard} and {replica} macros are substituted per host by ClickHouse.
CREATE TABLE ch_store_data ON CLUSTER '{cluster}'
(
    store_id       UInt32,
    store_name     String,
    store_address  Nullable(String),
    -- store_location is selected by the materialized_store_data view and is
    -- present in kafka_store_data; it must exist here for the value to be kept.
    store_location Nullable(String),
    description    Nullable(String)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/ch_store_data', '{replica}')
ORDER BY (store_id);
-- Ad-hoc check: return every column of every row currently in ch_store_data.
SELECT *
FROM ch_store_data;
-- Materialized view that forwards rows read from kafka_store_data
-- into the ch_store_data table (the TO target).
-- NOTE(review): ClickHouse matches the selected columns to the target table
-- by name — confirm ch_store_data declares each column listed here.
CREATE MATERIALIZED VIEW materialized_store_data TO ch_store_data
AS
SELECT
    store_id,
    store_name,
    store_address,
    store_location,
    description
FROM kafka_store_data;
-- Source table for store records.
-- Every column is declared nullable and no key or constraint is defined.
CREATE TABLE store_data
(
    store_id       bigint        NULL,
    store_name     nvarchar(11)  NULL,
    store_address  nvarchar(105) NULL,
    store_location varchar(27)   NULL,
    description    nvarchar(257) NULL
);
-- Turns on change data capture for the dbo.store_data table.
-- @role_name is NULL and @supports_net_changes is 0, exactly as passed below.
-- NOTE(review): presumably CDC has already been enabled for the database
-- itself (sys.sp_cdc_enable_db) — that step is not shown here; confirm.
EXEC sys.sp_cdc_enable_table
    @source_schema        = 'dbo',
    @source_name          = 'store_data',
    @role_name            = NULL,
    @supports_net_changes = 0;
-- Read back all rows and columns from ch_store_data to inspect its contents.
SELECT *
FROM ch_store_data;