use core::fmt::Debug;
use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap, HashSet};

use anyhow::anyhow;
use clickhouse::insert::Insert;
use clickhouse::{Client as ClickHouseClient, Row as ClickHouseRow};
use itertools::Itertools;
use phf::{Set, phf_set};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
use serde::Serialize;
use serde::ser::{SerializeSeq, SerializeStruct};
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::{
    DecoupleCheckpointLogSinkerOf, default_commit_checkpoint_interval,
};
use super::writer::SinkWriter;
use super::{DummySinkCommitCoordinator, SinkWriterMetrics, SinkWriterParam};
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink, SinkError, SinkParam,
};

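// Introspection queries against the ClickHouse system tables. `?fields` is expanded by the
// `clickhouse` client into the field names of the row type being fetched, and the remaining
// `?` placeholders are bound at the call site (database, table, and the ordering column).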
const QUERY_ENGINE: &str =
    "select distinct ?fields from system.tables where database = ? and name = ?";
const QUERY_COLUMN: &str =
    "select distinct ?fields from system.columns where database = ? and table = ? order by ?";
pub const CLICKHOUSE_SINK: &str = "clickhouse";

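/// Connection and table options shared by the ClickHouse sink. The `clickhouse.*` keys of the
/// sink's properties map onto these fields via `serde(rename)`; `commit_checkpoint_interval`
/// controls how many checkpoints pass between commits of the underlying inserter.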
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ClickHouseCommon {
    #[serde(rename = "clickhouse.url")]
    pub url: String,
    #[serde(rename = "clickhouse.user")]
    pub user: String,
    #[serde(rename = "clickhouse.password")]
    pub password: String,
    #[serde(rename = "clickhouse.database")]
    pub database: String,
    #[serde(rename = "clickhouse.table")]
    pub table: String,
    #[serde(rename = "clickhouse.delete.column")]
    pub delete_column: Option<String>,
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for ClickHouseCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "clickhouse.password", "clickhouse.user"
    };
}

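/// The table engine of the target ClickHouse table, as reported by `system.tables`.
/// Collapsing variants carry the name of their sign column; Replacing variants carry the
/// optional delete column configured via `clickhouse.delete.column`.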
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum ClickHouseEngine {
    MergeTree,
    ReplacingMergeTree(Option<String>),
    SummingMergeTree,
    AggregatingMergeTree,
    CollapsingMergeTree(String),
    VersionedCollapsingMergeTree(String),
    GraphiteMergeTree,
    ReplicatedMergeTree,
    ReplicatedReplacingMergeTree(Option<String>),
    ReplicatedSummingMergeTree,
    ReplicatedAggregatingMergeTree,
    ReplicatedCollapsingMergeTree(String),
    ReplicatedVersionedCollapsingMergeTree(String),
    ReplicatedGraphiteMergeTree,
    SharedMergeTree,
    SharedReplacingMergeTree(Option<String>),
    SharedSummingMergeTree,
    SharedAggregatingMergeTree,
    SharedCollapsingMergeTree(String),
    SharedVersionedCollapsingMergeTree(String),
    SharedGraphiteMergeTree,
    Null,
}

impl ClickHouseEngine {
    pub fn is_collapsing_engine(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::CollapsingMergeTree(_)
                | ClickHouseEngine::VersionedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
        )
    }

    pub fn is_delete_replacing_engine(&self) -> bool {
        match self {
            ClickHouseEngine::ReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::ReplicatedReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::SharedReplacingMergeTree(delete_col) => delete_col.is_some(),
            _ => false,
        }
    }

    pub fn get_delete_col(&self) -> Option<String> {
        match self {
            ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.to_string()),
            ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.to_string())
            }
            ClickHouseEngine::SharedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.to_string())
            }
            _ => None,
        }
    }

    pub fn get_sign_name(&self) -> Option<String> {
        match self {
            ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.to_string()),
            ClickHouseEngine::VersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.to_string())
            }
            ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name) => {
                Some(sign_name.to_string())
            }
            ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.to_string())
            }
            ClickHouseEngine::SharedCollapsingMergeTree(sign_name) => Some(sign_name.to_string()),
            ClickHouseEngine::SharedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.to_string())
            }
            _ => None,
        }
    }

    pub fn is_shared_tree(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::SharedMergeTree
                | ClickHouseEngine::SharedReplacingMergeTree(_)
                | ClickHouseEngine::SharedSummingMergeTree
                | ClickHouseEngine::SharedAggregatingMergeTree
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedGraphiteMergeTree
        )
    }

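    /// Resolves the engine variant from a `system.tables` row. For Collapsing engines the sign
    /// column name is parsed out of `create_table_query`; for Replacing engines the optional
    /// delete column comes from `clickhouse.delete.column` in the sink config.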
    pub fn from_query_engine(
        engine_name: &ClickhouseQueryEngine,
        config: &ClickHouseConfig,
    ) -> Result<Self> {
        match engine_name.engine.as_str() {
            "MergeTree" => Ok(ClickHouseEngine::MergeTree),
            "Null" => Ok(ClickHouseEngine::Null),
            "ReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplacingMergeTree(delete_column))
            }
            "SummingMergeTree" => Ok(ClickHouseEngine::SummingMergeTree),
            "AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree),
            "VersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("VersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name))
            }
            "CollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("CollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::CollapsingMergeTree(sign_name))
            }
            "GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree),
            "ReplicatedMergeTree" => Ok(ClickHouseEngine::ReplicatedMergeTree),
            "ReplicatedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplicatedReplacingMergeTree(
                    delete_column,
                ))
            }
            "ReplicatedSummingMergeTree" => Ok(ClickHouseEngine::ReplicatedSummingMergeTree),
            "ReplicatedAggregatingMergeTree" => {
                Ok(ClickHouseEngine::ReplicatedAggregatingMergeTree)
            }
            "ReplicatedVersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            "ReplicatedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name))
            }
            "ReplicatedGraphiteMergeTree" => Ok(ClickHouseEngine::ReplicatedGraphiteMergeTree),
            "SharedMergeTree" => Ok(ClickHouseEngine::SharedMergeTree),
            "SharedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::SharedReplacingMergeTree(delete_column))
            }
            "SharedSummingMergeTree" => Ok(ClickHouseEngine::SharedSummingMergeTree),
            "SharedAggregatingMergeTree" => Ok(ClickHouseEngine::SharedAggregatingMergeTree),
            "SharedVersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            "SharedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedCollapsingMergeTree(sign_name))
            }
            "SharedGraphiteMergeTree" => Ok(ClickHouseEngine::SharedGraphiteMergeTree),
            _ => Err(SinkError::ClickHouse(format!(
                "Cannot find clickhouse engine {:?}",
                engine_name.engine
            ))),
        }
    }
}

impl ClickHouseCommon {
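    /// Builds a `clickhouse` client for the configured URL, credentials, and database.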
    pub(crate) fn build_client(&self) -> ConnectorResult<ClickHouseClient> {
        let client = ClickHouseClient::default()
            .with_url(&self.url)
            .with_user(&self.user)
            .with_password(&self.password)
            .with_database(&self.database);
        Ok(client)
    }
}

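/// Complete sink configuration: the shared `clickhouse.*` connection options plus the sink
/// `type`, which must be `append-only` or `upsert`.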
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct ClickHouseConfig {
    #[serde(flatten)]
    pub common: ClickHouseCommon,

    pub r#type: String,
}

impl EnforceSecret for ClickHouseConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        ClickHouseCommon::enforce_one(prop)
    }

    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            ClickHouseCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

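/// The ClickHouse sink. Validation inspects the live target table (engine, columns, primary
/// key) before any writer is created.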
#[derive(Clone, Debug)]
pub struct ClickHouseSink {
    pub config: ClickHouseConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl EnforceSecret for ClickHouseSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            ClickHouseConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl ClickHouseConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<ClickHouseConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}

impl TryFrom<SinkParam> for ClickHouseSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = ClickHouseConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
            is_append_only: param.sink_type.is_append_only(),
        })
    }
}

impl ClickHouseSink {
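    /// Checks that every sink column exists in the ClickHouse table with a compatible type;
    /// the sink's columns must therefore be a subset of the target table's columns.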
    fn check_column_name_and_type(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
        let rw_fields_name = build_fields_name_type_from_schema(&self.schema)?;
        let clickhouse_columns_desc: HashMap<String, SystemColumn> = clickhouse_columns_desc
            .iter()
            .map(|s| (s.name.clone(), s.clone()))
            .collect();

        if rw_fields_name.len() > clickhouse_columns_desc.len() {
            return Err(SinkError::ClickHouse(
                "The columns of the sink must be a subset of the target table's columns."
                    .to_owned(),
            ));
        }

        for i in rw_fields_name {
            let value = clickhouse_columns_desc.get(&i.0).ok_or_else(|| {
                SinkError::ClickHouse(format!(
                    "Column {:?} from RisingWave was not found in the ClickHouse table",
                    i.0
                ))
            })?;

            Self::check_and_correct_column_type(&i.1, value)?;
        }
        Ok(())
    }

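    /// Verifies that the sink's downstream primary key columns are exactly the ClickHouse
    /// table's primary key columns (required for upsert sinks).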
    fn check_pk_match(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
        let mut clickhouse_pks: HashSet<String> = clickhouse_columns_desc
            .iter()
            .filter(|s| s.is_in_primary_key == 1)
            .map(|s| s.name.clone())
            .collect();

        for (_, field) in self
            .schema
            .fields()
            .iter()
            .enumerate()
            .filter(|(index, _)| self.pk_indices.contains(index))
        {
            if !clickhouse_pks.remove(&field.name) {
                return Err(SinkError::ClickHouse(
                    "ClickHouse and RisingWave primary keys do not match".to_owned(),
                ));
            }
        }

        if !clickhouse_pks.is_empty() {
            return Err(SinkError::ClickHouse(
                "ClickHouse and RisingWave primary keys do not match".to_owned(),
            ));
        }
        Ok(())
    }

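    /// Checks that a RisingWave data type is compatible with the declared ClickHouse column
    /// type. Matching is done by substring, so wrappers such as `Nullable(...)` are accepted;
    /// list element types are checked recursively.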
    fn check_and_correct_column_type(
        fields_type: &DataType,
        ck_column: &SystemColumn,
    ) -> Result<()> {
        let is_match = match fields_type {
            DataType::Boolean => Ok(ck_column.r#type.contains("Bool")),
            DataType::Int16 => Ok(ck_column.r#type.contains("UInt16")
                || ck_column.r#type.contains("Int16")
                || ck_column.r#type.contains("Enum16")),
            DataType::Int32 => {
                Ok(ck_column.r#type.contains("UInt32") || ck_column.r#type.contains("Int32"))
            }
            DataType::Int64 => {
                Ok(ck_column.r#type.contains("UInt64") || ck_column.r#type.contains("Int64"))
            }
            DataType::Float32 => Ok(ck_column.r#type.contains("Float32")),
            DataType::Float64 => Ok(ck_column.r#type.contains("Float64")),
            DataType::Decimal => Ok(ck_column.r#type.contains("Decimal")),
            DataType::Date => Ok(ck_column.r#type.contains("Date32")),
            DataType::Varchar => Ok(ck_column.r#type.contains("String")),
            DataType::Time => Err(SinkError::ClickHouse(
                "the ClickHouse sink does not support the Time type".to_owned(),
            )),
            DataType::Timestamp => Err(SinkError::ClickHouse(
                "ClickHouse does not have a type corresponding to naive timestamp".to_owned(),
            )),
            DataType::Timestamptz => Ok(ck_column.r#type.contains("DateTime64")),
            DataType::Interval => Err(SinkError::ClickHouse(
                "the ClickHouse sink does not support the Interval type".to_owned(),
            )),
            DataType::Struct(_) => Err(SinkError::ClickHouse(
                "struct needs to be converted into a list".to_owned(),
            )),
            DataType::List(list) => {
                Self::check_and_correct_column_type(list.as_ref(), ck_column)?;
                Ok(ck_column.r#type.contains("Array"))
            }
            DataType::Bytea => Err(SinkError::ClickHouse(
                "the ClickHouse sink does not support the Bytea type".to_owned(),
            )),
            DataType::Jsonb => Err(SinkError::ClickHouse(
                "the ClickHouse Rust client does not support JSON".to_owned(),
            )),
            DataType::Serial => {
                Ok(ck_column.r#type.contains("UInt64") || ck_column.r#type.contains("Int64"))
            }
            DataType::Int256 => Err(SinkError::ClickHouse(
                "the ClickHouse sink does not support Int256".to_owned(),
            )),
            DataType::Map(_) => Err(SinkError::ClickHouse(
                "the ClickHouse sink does not support the Map type".to_owned(),
            )),
        };
        if !is_match? {
            return Err(SinkError::ClickHouse(format!(
                "Column type mismatch for column {:?}: RisingWave type is {:?}, ClickHouse type is {:?}",
                ck_column.name, fields_type, ck_column.r#type
            )));
        }

        Ok(())
    }
}

impl Sink for ClickHouseSink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = DecoupleCheckpointLogSinkerOf<ClickHouseSinkWriter>;

    const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &["commit_checkpoint_interval"];
    const SINK_NAME: &'static str = CLICKHOUSE_SINK;

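    /// Validates the sink against the live ClickHouse table: builds a client, inspects the
    /// table's engine and columns, and rejects configurations that cannot support the
    /// requested sink type (e.g. upsert into a plain `MergeTree`).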
    async fn validate(&self) -> Result<()> {
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert clickhouse sink (please define in `primary_key` field)"
            )));
        }

        let client = self.config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client, &self.config).await?;
        if clickhouse_engine.is_shared_tree() {
            risingwave_common::license::Feature::ClickHouseSharedEngine
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        if !self.is_append_only
            && !clickhouse_engine.is_collapsing_engine()
            && !clickhouse_engine.is_delete_replacing_engine()
        {
            return match clickhouse_engine {
                ClickHouseEngine::ReplicatedReplacingMergeTree(None)
                | ClickHouseEngine::ReplacingMergeTree(None)
                | ClickHouseEngine::SharedReplacingMergeTree(None) => {
                    Err(SinkError::ClickHouse("To enable upsert with a `ReplacingMergeTree`, you must set a `clickhouse.delete.column` to the UInt8 column in ClickHouse used to signify deletes. See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree#is_deleted for more information".to_owned()))
                }
                _ => Err(SinkError::ClickHouse("If you want to use upsert, please use either `VersionedCollapsingMergeTree`, `CollapsingMergeTree` or the `ReplacingMergeTree` in ClickHouse".to_owned())),
            };
        }

        self.check_column_name_and_type(&clickhouse_column)?;
        if !self.is_append_only {
            self.check_pk_match(&clickhouse_column)?;
        }

        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        ClickHouseConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = ClickHouseSinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        Ok(DecoupleCheckpointLogSinkerOf::new(
            writer,
            SinkWriterMetrics::new(&writer_param),
            commit_checkpoint_interval,
        ))
    }
}

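/// Streaming writer for the ClickHouse sink. Rows are buffered in a lazily created `Insert`
/// and flushed on checkpoint barriers.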
pub struct ClickHouseSinkWriter {
    pub config: ClickHouseConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: ClickHouseClient,
    #[expect(dead_code)]
    is_append_only: bool,
    column_correct_vec: Vec<ClickHouseSchemaFeature>,
    rw_fields_name_after_calibration: Vec<String>,
    clickhouse_engine: ClickHouseEngine,
    inserter: Option<Insert<ClickHouseColumn>>,
}

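/// Per-column features parsed from the ClickHouse type string: nullability, `DateTime64`
/// precision, and `Decimal(precision, scale)`.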
#[derive(Debug)]
struct ClickHouseSchemaFeature {
    can_null: bool,
    /// Precision of a `DateTime64(p)` column; 0 otherwise.
    accuracy_time: u8,
    /// `(precision, scale)` of a `Decimal(p, s)` column; `(0, 0)` otherwise.
    accuracy_decimal: (u8, u8),
}

impl ClickHouseSinkWriter {
    pub async fn new(
        config: ClickHouseConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        let client = config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client.clone(), &config).await?;

        let column_correct_vec: Result<Vec<ClickHouseSchemaFeature>> = clickhouse_column
            .iter()
            .map(Self::build_column_correct_vec)
            .collect();
        let mut rw_fields_name_after_calibration = build_fields_name_type_from_schema(&schema)?
            .iter()
            .map(|(a, _)| a.clone())
            .collect_vec();

        if let Some(sign) = clickhouse_engine.get_sign_name() {
            rw_fields_name_after_calibration.push(sign);
        }
        if let Some(delete_col) = clickhouse_engine.get_delete_col() {
            rw_fields_name_after_calibration.push(delete_col);
        }
        Ok(Self {
            config,
            schema,
            pk_indices,
            client,
            is_append_only,
            column_correct_vec: column_correct_vec?,
            rw_fields_name_after_calibration,
            clickhouse_engine,
            inserter: None,
        })
    }

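    /// Derives a `ClickHouseSchemaFeature` from a `system.columns` row by parsing the type
    /// string: `Nullable(...)`, the precision of `DateTime64(p)`, and the precision/scale of
    /// `Decimal(p, s)`. Precisions above 38 (`Decimal256`) are rejected.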
    fn build_column_correct_vec(ck_column: &SystemColumn) -> Result<ClickHouseSchemaFeature> {
        let can_null = ck_column.r#type.contains("Nullable");
        let accuracy_time = if ck_column.r#type.contains("DateTime64(") {
            ck_column
                .r#type
                .split("DateTime64(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(',')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?
        } else {
            0_u8
        };
        let accuracy_decimal = if ck_column.r#type.contains("Decimal(") {
            let decimal_all = ck_column
                .r#type
                .split("Decimal(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(", ")
                .collect_vec();
            let length = decimal_all
                .first()
                .ok_or_else(|| SinkError::ClickHouse("must have first".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;

            if length > 38 {
                return Err(SinkError::ClickHouse(
                    "RisingWave does not support Decimal256".to_owned(),
                ));
            }

            let scale = decimal_all
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;
            (length, scale)
        } else {
            (0_u8, 0_u8)
        };
        Ok(ClickHouseSchemaFeature {
            can_null,
            accuracy_time,
            accuracy_decimal,
        })
    }

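    /// Converts a stream chunk into ClickHouse rows and appends them to the current inserter,
    /// creating one lazily on first use. A sign value (+1/-1) is appended for collapsing
    /// engines, and an `is_deleted` flag (0/1) for Replacing engines with a delete column.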
    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.inserter.is_none() {
            self.inserter = Some(self.client.insert_with_fields_name(
                &self.config.common.table,
                self.rw_fields_name_after_calibration.clone(),
            )?);
        }
        for (op, row) in chunk.rows() {
            let mut clickhouse_field_vec = vec![];
            for (index, data) in row.iter().enumerate() {
                clickhouse_field_vec.extend(ClickHouseFieldWithNull::from_scalar_ref(
                    data,
                    &self.column_correct_vec,
                    index,
                )?);
            }
            match op {
                Op::Insert | Op::UpdateInsert => {
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(0),
                        ))
                    }
                }
                Op::Delete | Op::UpdateDelete => {
                    if !self.clickhouse_engine.is_collapsing_engine()
                        && !self.clickhouse_engine.is_delete_replacing_engine()
                    {
                        return Err(SinkError::ClickHouse(
                            "this ClickHouse engine does not support upsert".to_owned(),
                        ));
                    }
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(-1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ))
                    }
                }
            }
            let clickhouse_column = ClickHouseColumn {
                row: clickhouse_field_vec,
            };
            self.inserter
                .as_mut()
                .unwrap()
                .write(&clickhouse_column)
                .await?;
        }
        Ok(())
    }
}

#[async_trait]
impl SinkWriter for ClickHouseSinkWriter {
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        if is_checkpoint && let Some(inserter) = self.inserter.take() {
            inserter.end().await?;
        }
        Ok(())
    }
}

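/// One row of the `system.columns` query: column name, declared type, and whether the column
/// is part of the table's primary key.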
#[derive(ClickHouseRow, Deserialize, Clone)]
struct SystemColumn {
    name: String,
    r#type: String,
    is_in_primary_key: u8,
}

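/// One row of the `system.tables` query: table name, engine, and the original `CREATE TABLE`
/// statement (used to recover engine parameters such as the sign column).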
#[derive(ClickHouseRow, Deserialize)]
struct ClickhouseQueryEngine {
    #[expect(dead_code)]
    name: String,
    engine: String,
    create_table_query: String,
}

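/// Fetches the target table's engine and column metadata from ClickHouse. The sign column of
/// a collapsing engine and the configured delete column are removed from the returned column
/// list, since the sink appends those values itself.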
async fn query_column_engine_from_ck(
    client: ClickHouseClient,
    config: &ClickHouseConfig,
) -> Result<(Vec<SystemColumn>, ClickHouseEngine)> {
    let query_engine = QUERY_ENGINE;
    let query_column = QUERY_COLUMN;

    let clickhouse_engine = client
        .query(query_engine)
        .bind(config.common.database.clone())
        .bind(config.common.table.clone())
        .fetch_all::<ClickhouseQueryEngine>()
        .await?;
    let mut clickhouse_column = client
        .query(query_column)
        .bind(config.common.database.clone())
        .bind(config.common.table.clone())
        .bind("position")
        .fetch_all::<SystemColumn>()
        .await?;
    if clickhouse_engine.is_empty() || clickhouse_column.is_empty() {
        return Err(SinkError::ClickHouse(format!(
            "table {:?}.{:?} was not found in ClickHouse",
            config.common.database, config.common.table
        )));
    }

    let clickhouse_engine =
        ClickHouseEngine::from_query_engine(clickhouse_engine.first().unwrap(), config)?;

    if let Some(sign) = &clickhouse_engine.get_sign_name() {
        clickhouse_column.retain(|a| sign.ne(&a.name))
    }

    if let Some(delete_col) = &clickhouse_engine.get_delete_col() {
        clickhouse_column.retain(|a| delete_col.ne(&a.name))
    }

    Ok((clickhouse_column, clickhouse_engine))
}

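/// A single row to be inserted, with one field per calibrated column (including any sign or
/// delete marker appended by the writer).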
#[derive(ClickHouseRow, Debug)]
struct ClickHouseColumn {
    row: Vec<ClickHouseFieldWithNull>,
}

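/// A scalar value converted into the representation written to ClickHouse.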
#[derive(Debug)]
enum ClickHouseField {
    Int16(i16),
    Int32(i32),
    Int64(i64),
    Serial(Serial),
    Float32(f32),
    Float64(f64),
    String(String),
    Bool(bool),
    List(Vec<ClickHouseFieldWithNull>),
    Int8(i8),
    Decimal(ClickHouseDecimal),
}

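/// A decimal mantissa rescaled to the target column's scale, sized to match the column's
/// `Decimal32`/`Decimal64`/`Decimal128` representation.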
#[derive(Debug)]
enum ClickHouseDecimal {
    Decimal32(i32),
    Decimal64(i64),
    Decimal128(i128),
}

impl Serialize for ClickHouseDecimal {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseDecimal::Decimal32(v) => serializer.serialize_i32(*v),
            ClickHouseDecimal::Decimal64(v) => serializer.serialize_i64(*v),
            ClickHouseDecimal::Decimal128(v) => serializer.serialize_i128(*v),
        }
    }
}

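/// A field wrapper tracking nullability: `WithSome` serializes as an option for a
/// `Nullable(...)` column, `WithoutSome` serializes the bare value, and `None` serializes as
/// NULL.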
#[derive(Debug)]
enum ClickHouseFieldWithNull {
    WithSome(ClickHouseField),
    WithoutSome(ClickHouseField),
    None,
}

impl ClickHouseFieldWithNull {
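    /// Converts one RisingWave scalar into the ClickHouse field(s) for its column. Decimals
    /// are rescaled to the column's scale, `Timestamptz` is converted to `DateTime64` ticks at
    /// the column's precision, structs expand into one field per member, and unsupported types
    /// are rejected.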
    pub fn from_scalar_ref(
        data: Option<ScalarRefImpl<'_>>,
        clickhouse_schema_feature_vec: &Vec<ClickHouseSchemaFeature>,
        clickhouse_schema_feature_index: usize,
    ) -> Result<Vec<ClickHouseFieldWithNull>> {
        let clickhouse_schema_feature = clickhouse_schema_feature_vec
            .get(clickhouse_schema_feature_index)
            .ok_or_else(|| {
                SinkError::ClickHouse(format!(
                    "No column found in the ClickHouse table schema at index {clickhouse_schema_feature_index}"
                ))
            })?;
        if data.is_none() {
            if !clickhouse_schema_feature.can_null {
                return Err(SinkError::ClickHouse(
                    "cannot insert NULL into a non-nullable ClickHouse column".to_owned(),
                ));
            } else {
                return Ok(vec![ClickHouseFieldWithNull::None]);
            }
        }
        let data = match data.unwrap() {
            ScalarRefImpl::Int16(v) => ClickHouseField::Int16(v),
            ScalarRefImpl::Int32(v) => ClickHouseField::Int32(v),
            ScalarRefImpl::Int64(v) => ClickHouseField::Int64(v),
            ScalarRefImpl::Int256(_) => {
                return Err(SinkError::ClickHouse(
                    "the ClickHouse sink does not support Int256".to_owned(),
                ));
            }
            ScalarRefImpl::Serial(v) => ClickHouseField::Serial(v),
            ScalarRefImpl::Float32(v) => ClickHouseField::Float32(v.into_inner()),
            ScalarRefImpl::Float64(v) => ClickHouseField::Float64(v.into_inner()),
            ScalarRefImpl::Utf8(v) => ClickHouseField::String(v.to_owned()),
            ScalarRefImpl::Bool(v) => ClickHouseField::Bool(v),
            ScalarRefImpl::Decimal(d) => {
                let d = if let Decimal::Normalized(d) = d {
                    let scale =
                        clickhouse_schema_feature.accuracy_decimal.1 as i32 - d.scale() as i32;
                    if scale < 0 {
                        d.mantissa() / 10_i128.pow(scale.unsigned_abs())
                    } else {
                        d.mantissa() * 10_i128.pow(scale as u32)
                    }
                } else if clickhouse_schema_feature.can_null {
                    warn!("Inf, -Inf, or NaN decimal values are written to ClickHouse as NULL");
                    return Ok(vec![ClickHouseFieldWithNull::None]);
                } else {
                    warn!("Inf, -Inf, or NaN decimal values are written to ClickHouse as 0");
                    0_i128
                };
                if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(d as i32))
                } else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(d as i64))
                } else {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(d))
                }
            }
            ScalarRefImpl::Interval(_) => {
                return Err(SinkError::ClickHouse(
                    "the ClickHouse sink does not support the Interval type".to_owned(),
                ));
            }
            ScalarRefImpl::Date(v) => {
                let days = v.get_nums_days_unix_epoch();
                ClickHouseField::Int32(days)
            }
            ScalarRefImpl::Time(_) => {
                return Err(SinkError::ClickHouse(
                    "the ClickHouse sink does not support the Time type".to_owned(),
                ));
            }
            ScalarRefImpl::Timestamp(_) => {
                return Err(SinkError::ClickHouse(
                    "ClickHouse does not have a type corresponding to naive timestamp".to_owned(),
                ));
            }
            ScalarRefImpl::Timestamptz(v) => {
                let micros = v.timestamp_micros();
                let ticks = match clickhouse_schema_feature.accuracy_time <= 6 {
                    true => {
                        micros / 10_i64.pow((6 - clickhouse_schema_feature.accuracy_time).into())
                    }
                    false => micros
                        .checked_mul(
                            10_i64.pow((clickhouse_schema_feature.accuracy_time - 6).into()),
                        )
                        .ok_or_else(|| SinkError::ClickHouse("DateTime64 overflow".to_owned()))?,
                };
                ClickHouseField::Int64(ticks)
            }
            ScalarRefImpl::Jsonb(_) => {
                return Err(SinkError::ClickHouse(
                    "the ClickHouse Rust client does not support JSON".to_owned(),
                ));
            }
            ScalarRefImpl::Struct(v) => {
                let mut struct_vec = vec![];
                for (index, field) in v.iter_fields_ref().enumerate() {
                    let a = Self::from_scalar_ref(
                        field,
                        clickhouse_schema_feature_vec,
                        clickhouse_schema_feature_index + index,
                    )?;
                    struct_vec.push(ClickHouseFieldWithNull::WithoutSome(ClickHouseField::List(
                        a,
                    )));
                }
                return Ok(struct_vec);
            }
            ScalarRefImpl::List(v) => {
                let mut vec = vec![];
                for i in v.iter() {
                    vec.extend(Self::from_scalar_ref(
                        i,
                        clickhouse_schema_feature_vec,
                        clickhouse_schema_feature_index,
                    )?)
                }
                return Ok(vec![ClickHouseFieldWithNull::WithoutSome(
                    ClickHouseField::List(vec),
                )]);
            }
            ScalarRefImpl::Bytea(_) => {
                return Err(SinkError::ClickHouse(
                    "the ClickHouse sink does not support the Bytea type".to_owned(),
                ));
            }
            ScalarRefImpl::Map(_) => {
                return Err(SinkError::ClickHouse(
                    "the ClickHouse sink does not support the Map type".to_owned(),
                ));
            }
        };
        let data = if clickhouse_schema_feature.can_null {
            vec![ClickHouseFieldWithNull::WithSome(data)]
        } else {
            vec![ClickHouseFieldWithNull::WithoutSome(data)]
        };
        Ok(data)
    }
}

impl Serialize for ClickHouseField {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseField::Int16(v) => serializer.serialize_i16(*v),
            ClickHouseField::Int32(v) => serializer.serialize_i32(*v),
            ClickHouseField::Int64(v) => serializer.serialize_i64(*v),
            ClickHouseField::Serial(v) => v.serialize(serializer),
            ClickHouseField::Float32(v) => serializer.serialize_f32(*v),
            ClickHouseField::Float64(v) => serializer.serialize_f64(*v),
            ClickHouseField::String(v) => serializer.serialize_str(v),
            ClickHouseField::Bool(v) => serializer.serialize_bool(*v),
            ClickHouseField::List(v) => {
                let mut s = serializer.serialize_seq(Some(v.len()))?;
                for i in v {
                    s.serialize_element(i)?;
                }
                s.end()
            }
            ClickHouseField::Decimal(v) => v.serialize(serializer),
            ClickHouseField::Int8(v) => serializer.serialize_i8(*v),
        }
    }
}

impl Serialize for ClickHouseFieldWithNull {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseFieldWithNull::WithSome(v) => serializer.serialize_some(v),
            ClickHouseFieldWithNull::WithoutSome(v) => v.serialize(serializer),
            ClickHouseFieldWithNull::None => serializer.serialize_none(),
        }
    }
}

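// The row encoding used by the `clickhouse` crate is positional, so the struct and field
// names passed to the serializer are placeholders; only the order of the fields matters.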
impl Serialize for ClickHouseColumn {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut s = serializer.serialize_struct("useless", self.row.len())?;
        for data in &self.row {
            s.serialize_field("useless", &data)?
        }
        s.end()
    }
}

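/// Flattens the sink schema into the `(column name, data type)` pairs written to ClickHouse:
/// a struct field `s` with member `m` becomes a column named `s.m` whose type is a list of
/// the member type, and only one level of struct nesting is allowed.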
pub fn build_fields_name_type_from_schema(schema: &Schema) -> Result<Vec<(String, DataType)>> {
    let mut vec = vec![];
    for field in schema.fields() {
        if let DataType::Struct(st) = &field.data_type {
            for (name, data_type) in st.iter() {
                if matches!(data_type, DataType::Struct(_)) {
                    return Err(SinkError::ClickHouse(
                        "Only one level of nesting is supported for struct".to_owned(),
                    ));
                } else {
                    vec.push((
                        format!("{}.{}", field.name, name),
                        DataType::List(Box::new(data_type.clone())),
                    ))
                }
            }
        } else {
            vec.push((field.name.clone(), field.data_type()));
        }
    }
    Ok(vec)
}