risingwave_connector/sink/
clickhouse.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::fmt::Debug;
16use core::num::NonZeroU64;
17use std::collections::{BTreeMap, HashMap, HashSet};
18
19use anyhow::anyhow;
20use clickhouse::insert::Insert;
21use clickhouse::{Client as ClickHouseClient, Row as ClickHouseRow};
22use itertools::Itertools;
23use phf::{Set, phf_set};
24use risingwave_common::array::{Op, StreamChunk};
25use risingwave_common::catalog::Schema;
26use risingwave_common::row::Row;
27use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
28use serde::Serialize;
29use serde::ser::{SerializeSeq, SerializeStruct};
30use serde_derive::Deserialize;
31use serde_with::{DisplayFromStr, serde_as};
32use thiserror_ext::AsReport;
33use tonic::async_trait;
34use tracing::warn;
35use with_options::WithOptions;
36
37use super::decouple_checkpoint_log_sink::{
38    DecoupleCheckpointLogSinkerOf, default_commit_checkpoint_interval,
39};
40use super::writer::SinkWriter;
41use super::{DummySinkCommitCoordinator, SinkWriterMetrics, SinkWriterParam};
42use crate::enforce_secret::EnforceSecret;
43use crate::error::ConnectorResult;
44use crate::sink::{
45    Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink, SinkError, SinkParam,
46};
47
/// Query for the engine metadata of the target table in `system.tables`. `?fields` and the
/// two `?` placeholders (database, table name) are filled in by the `clickhouse` query builder.
const QUERY_ENGINE: &str = concat!(
    "select distinct ?fields from system.tables",
    " where database = ? and name = ?",
);
/// Query for the column descriptions of the target table in `system.columns`.
// NOTE(review): the `clickhouse` crate renders a bound `?` as a value, so a string bound to the
// trailing `order by ?` presumably becomes a quoted literal rather than a column reference and
// may not order the rows at all — confirm against the caller.
const QUERY_COLUMN: &str = concat!(
    "select distinct ?fields from system.columns",
    " where database = ? and table = ? order by ?",
);
/// Name of the ClickHouse sink connector (used as `Sink::SINK_NAME`).
pub const CLICKHOUSE_SINK: &str = "clickhouse";
53
// Connection and target-table options of the ClickHouse sink; flattened into
// `ClickHouseConfig`.
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ClickHouseCommon {
    // ClickHouse endpoint, passed to the client builder (`with_url`).
    #[serde(rename = "clickhouse.url")]
    pub url: String,
    // Credentials passed to the client builder; both keys are listed as secret properties.
    #[serde(rename = "clickhouse.user")]
    pub user: String,
    #[serde(rename = "clickhouse.password")]
    pub password: String,
    // Database the client is bound to (`with_database`).
    #[serde(rename = "clickhouse.database")]
    pub database: String,
    // Target table that rows are inserted into.
    #[serde(rename = "clickhouse.table")]
    pub table: String,
    // Optional delete-marker column for `ReplacingMergeTree`-family engines; when set, the
    // writer appends `0` for inserts and `1` for deletes. Required for upsert on those engines.
    #[serde(rename = "clickhouse.delete.column")]
    pub delete_column: Option<String>,
    /// Commit every n(>0) checkpoints, default is 10.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    pub commit_checkpoint_interval: u64,
}
74
75impl EnforceSecret for ClickHouseCommon {
76    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
77        "clickhouse.password", "clickhouse.user"
78    };
79}
80
/// ClickHouse table engines recognized by the sink, as reported by `system.tables`.
///
/// Payloads carry what upserts need: the configured delete column for the
/// `ReplacingMergeTree` family, and the sign column (parsed from the table's engine clause)
/// for the collapsing family.
// Variants mirror ClickHouse engine names, most of which end in `MergeTree`.
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum ClickHouseEngine {
    MergeTree,
    /// Delete column from `clickhouse.delete.column`, if configured.
    ReplacingMergeTree(Option<String>),
    SummingMergeTree,
    AggregatingMergeTree,
    /// Sign column name.
    CollapsingMergeTree(String),
    /// Sign column name.
    VersionedCollapsingMergeTree(String),
    GraphiteMergeTree,
    ReplicatedMergeTree,
    /// Delete column from `clickhouse.delete.column`, if configured.
    ReplicatedReplacingMergeTree(Option<String>),
    ReplicatedSummingMergeTree,
    ReplicatedAggregatingMergeTree,
    /// Sign column name.
    ReplicatedCollapsingMergeTree(String),
    /// Sign column name.
    ReplicatedVersionedCollapsingMergeTree(String),
    ReplicatedGraphiteMergeTree,
    SharedMergeTree,
    /// Delete column from `clickhouse.delete.column`, if configured.
    SharedReplacingMergeTree(Option<String>),
    SharedSummingMergeTree,
    SharedAggregatingMergeTree,
    /// Sign column name.
    SharedCollapsingMergeTree(String),
    /// Sign column name.
    SharedVersionedCollapsingMergeTree(String),
    SharedGraphiteMergeTree,
    Null,
}
107impl ClickHouseEngine {
108    pub fn is_collapsing_engine(&self) -> bool {
109        matches!(
110            self,
111            ClickHouseEngine::CollapsingMergeTree(_)
112                | ClickHouseEngine::VersionedCollapsingMergeTree(_)
113                | ClickHouseEngine::ReplicatedCollapsingMergeTree(_)
114                | ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(_)
115                | ClickHouseEngine::SharedCollapsingMergeTree(_)
116                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
117        )
118    }
119
120    pub fn is_delete_replacing_engine(&self) -> bool {
121        match self {
122            ClickHouseEngine::ReplacingMergeTree(delete_col) => delete_col.is_some(),
123            ClickHouseEngine::ReplicatedReplacingMergeTree(delete_col) => delete_col.is_some(),
124            ClickHouseEngine::SharedReplacingMergeTree(delete_col) => delete_col.is_some(),
125            _ => false,
126        }
127    }
128
129    pub fn get_delete_col(&self) -> Option<String> {
130        match self {
131            ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.to_string()),
132            ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => {
133                Some(delete_col.to_string())
134            }
135            ClickHouseEngine::SharedReplacingMergeTree(Some(delete_col)) => {
136                Some(delete_col.to_string())
137            }
138            _ => None,
139        }
140    }
141
142    pub fn get_sign_name(&self) -> Option<String> {
143        match self {
144            ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.to_string()),
145            ClickHouseEngine::VersionedCollapsingMergeTree(sign_name) => {
146                Some(sign_name.to_string())
147            }
148            ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name) => {
149                Some(sign_name.to_string())
150            }
151            ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(sign_name) => {
152                Some(sign_name.to_string())
153            }
154            ClickHouseEngine::SharedCollapsingMergeTree(sign_name) => Some(sign_name.to_string()),
155            ClickHouseEngine::SharedVersionedCollapsingMergeTree(sign_name) => {
156                Some(sign_name.to_string())
157            }
158            _ => None,
159        }
160    }
161
162    pub fn is_shared_tree(&self) -> bool {
163        matches!(
164            self,
165            ClickHouseEngine::SharedMergeTree
166                | ClickHouseEngine::SharedReplacingMergeTree(_)
167                | ClickHouseEngine::SharedSummingMergeTree
168                | ClickHouseEngine::SharedAggregatingMergeTree
169                | ClickHouseEngine::SharedCollapsingMergeTree(_)
170                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
171                | ClickHouseEngine::SharedGraphiteMergeTree
172        )
173    }
174
175    pub fn from_query_engine(
176        engine_name: &ClickhouseQueryEngine,
177        config: &ClickHouseConfig,
178    ) -> Result<Self> {
179        match engine_name.engine.as_str() {
180            "MergeTree" => Ok(ClickHouseEngine::MergeTree),
181            "Null" => Ok(ClickHouseEngine::Null),
182            "ReplacingMergeTree" => {
183                let delete_column = config.common.delete_column.clone();
184                Ok(ClickHouseEngine::ReplacingMergeTree(delete_column))
185            }
186            "SummingMergeTree" => Ok(ClickHouseEngine::SummingMergeTree),
187            "AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree),
188            // VersionedCollapsingMergeTree(sign_name,"a")
189            "VersionedCollapsingMergeTree" => {
190                let sign_name = engine_name
191                    .create_table_query
192                    .split("VersionedCollapsingMergeTree(")
193                    .last()
194                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
195                    .split(',')
196                    .next()
197                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
198                    .trim()
199                    .to_owned();
200                Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name))
201            }
202            // CollapsingMergeTree(sign_name)
203            "CollapsingMergeTree" => {
204                let sign_name = engine_name
205                    .create_table_query
206                    .split("CollapsingMergeTree(")
207                    .last()
208                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
209                    .split(')')
210                    .next()
211                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
212                    .trim()
213                    .to_owned();
214                Ok(ClickHouseEngine::CollapsingMergeTree(sign_name))
215            }
216            "GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree),
217            "ReplicatedMergeTree" => Ok(ClickHouseEngine::ReplicatedMergeTree),
218            "ReplicatedReplacingMergeTree" => {
219                let delete_column = config.common.delete_column.clone();
220                Ok(ClickHouseEngine::ReplicatedReplacingMergeTree(
221                    delete_column,
222                ))
223            }
224            "ReplicatedSummingMergeTree" => Ok(ClickHouseEngine::ReplicatedSummingMergeTree),
225            "ReplicatedAggregatingMergeTree" => {
226                Ok(ClickHouseEngine::ReplicatedAggregatingMergeTree)
227            }
228            // ReplicatedVersionedCollapsingMergeTree("a","b",sign_name,"c")
229            "ReplicatedVersionedCollapsingMergeTree" => {
230                let sign_name = engine_name
231                    .create_table_query
232                    .split("ReplicatedVersionedCollapsingMergeTree(")
233                    .last()
234                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
235                    .split(',')
236                    .rev()
237                    .nth(1)
238                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
239                    .trim()
240                    .to_owned();
241                Ok(ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(
242                    sign_name,
243                ))
244            }
245            // ReplicatedCollapsingMergeTree("a","b",sign_name)
246            "ReplicatedCollapsingMergeTree" => {
247                let sign_name = engine_name
248                    .create_table_query
249                    .split("ReplicatedCollapsingMergeTree(")
250                    .last()
251                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
252                    .split(')')
253                    .next()
254                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
255                    .split(',')
256                    .next_back()
257                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
258                    .trim()
259                    .to_owned();
260                Ok(ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name))
261            }
262            "ReplicatedGraphiteMergeTree" => Ok(ClickHouseEngine::ReplicatedGraphiteMergeTree),
263            "SharedMergeTree" => Ok(ClickHouseEngine::SharedMergeTree),
264            "SharedReplacingMergeTree" => {
265                let delete_column = config.common.delete_column.clone();
266                Ok(ClickHouseEngine::SharedReplacingMergeTree(delete_column))
267            }
268            "SharedSummingMergeTree" => Ok(ClickHouseEngine::SharedSummingMergeTree),
269            "SharedAggregatingMergeTree" => Ok(ClickHouseEngine::SharedAggregatingMergeTree),
270            // SharedVersionedCollapsingMergeTree("a","b",sign_name,"c")
271            "SharedVersionedCollapsingMergeTree" => {
272                let sign_name = engine_name
273                    .create_table_query
274                    .split("SharedVersionedCollapsingMergeTree(")
275                    .last()
276                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
277                    .split(',')
278                    .rev()
279                    .nth(1)
280                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
281                    .trim()
282                    .to_owned();
283                Ok(ClickHouseEngine::SharedVersionedCollapsingMergeTree(
284                    sign_name,
285                ))
286            }
287            // SharedCollapsingMergeTree("a","b",sign_name)
288            "SharedCollapsingMergeTree" => {
289                let sign_name = engine_name
290                    .create_table_query
291                    .split("SharedCollapsingMergeTree(")
292                    .last()
293                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
294                    .split(')')
295                    .next()
296                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
297                    .split(',')
298                    .next_back()
299                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
300                    .trim()
301                    .to_owned();
302                Ok(ClickHouseEngine::SharedCollapsingMergeTree(sign_name))
303            }
304            "SharedGraphiteMergeTree" => Ok(ClickHouseEngine::SharedGraphiteMergeTree),
305            _ => Err(SinkError::ClickHouse(format!(
306                "Cannot find clickhouse engine {:?}",
307                engine_name.engine
308            ))),
309        }
310    }
311}
312
313impl ClickHouseCommon {
314    pub(crate) fn build_client(&self) -> ConnectorResult<ClickHouseClient> {
315        let client = ClickHouseClient::default() // hyper(0.14) client inside
316            .with_url(&self.url)
317            .with_user(&self.user)
318            .with_password(&self.password)
319            .with_database(&self.database);
320        Ok(client)
321    }
322}
323
// Full ClickHouse sink configuration: the shared connection options plus the sink type.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct ClickHouseConfig {
    // Connection/table options, read from the same flat property map.
    #[serde(flatten)]
    pub common: ClickHouseCommon,

    // Validated in `from_btreemap` to be one of the two accepted sink types.
    pub r#type: String, // accept "append-only" or "upsert"
}
332
333impl EnforceSecret for ClickHouseConfig {
334    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
335        ClickHouseCommon::enforce_one(prop)
336    }
337
338    fn enforce_secret<'a>(
339        prop_iter: impl Iterator<Item = &'a str>,
340    ) -> crate::error::ConnectorResult<()> {
341        for prop in prop_iter {
342            ClickHouseCommon::enforce_one(prop)?;
343        }
344        Ok(())
345    }
346}
347
/// The ClickHouse sink: parsed config plus the sink's schema and primary key, built from a
/// [`SinkParam`] via `TryFrom`.
#[derive(Clone, Debug)]
pub struct ClickHouseSink {
    pub config: ClickHouseConfig,
    /// Schema of the rows written to ClickHouse.
    schema: Schema,
    /// Indices of the primary-key columns in `schema` (from `SinkParam::downstream_pk`).
    pk_indices: Vec<usize>,
    /// Whether the sink only receives inserts. Upsert sinks need a collapsing engine or a
    /// replacing engine with a delete column.
    is_append_only: bool,
}
355
356impl EnforceSecret for ClickHouseSink {
357    fn enforce_secret<'a>(
358        prop_iter: impl Iterator<Item = &'a str>,
359    ) -> crate::error::ConnectorResult<()> {
360        for prop in prop_iter {
361            ClickHouseConfig::enforce_one(prop)?;
362        }
363        Ok(())
364    }
365}
366
367impl ClickHouseConfig {
368    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
369        let config =
370            serde_json::from_value::<ClickHouseConfig>(serde_json::to_value(properties).unwrap())
371                .map_err(|e| SinkError::Config(anyhow!(e)))?;
372        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
373            return Err(SinkError::Config(anyhow!(
374                "`{}` must be {}, or {}",
375                SINK_TYPE_OPTION,
376                SINK_TYPE_APPEND_ONLY,
377                SINK_TYPE_UPSERT
378            )));
379        }
380        Ok(config)
381    }
382}
383
384impl TryFrom<SinkParam> for ClickHouseSink {
385    type Error = SinkError;
386
387    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
388        let schema = param.schema();
389        let config = ClickHouseConfig::from_btreemap(param.properties)?;
390        Ok(Self {
391            config,
392            schema,
393            pk_indices: param.downstream_pk,
394            is_append_only: param.sink_type.is_append_only(),
395        })
396    }
397}
398
399impl ClickHouseSink {
400    /// Check that the column names and types of risingwave and clickhouse are identical
401    fn check_column_name_and_type(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
402        let rw_fields_name = build_fields_name_type_from_schema(&self.schema)?;
403        let clickhouse_columns_desc: HashMap<String, SystemColumn> = clickhouse_columns_desc
404            .iter()
405            .map(|s| (s.name.clone(), s.clone()))
406            .collect();
407
408        if rw_fields_name.len().gt(&clickhouse_columns_desc.len()) {
409            return Err(SinkError::ClickHouse("The columns of the sink must be equal to or a superset of the target table's columns.".to_owned()));
410        }
411
412        for i in rw_fields_name {
413            let value = clickhouse_columns_desc.get(&i.0).ok_or_else(|| {
414                SinkError::ClickHouse(format!(
415                    "Column name don't find in clickhouse, risingwave is {:?} ",
416                    i.0
417                ))
418            })?;
419
420            Self::check_and_correct_column_type(&i.1, value)?;
421        }
422        Ok(())
423    }
424
425    /// Check that the column names and types of risingwave and clickhouse are identical
426    fn check_pk_match(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
427        let mut clickhouse_pks: HashSet<String> = clickhouse_columns_desc
428            .iter()
429            .filter(|s| s.is_in_primary_key == 1)
430            .map(|s| s.name.clone())
431            .collect();
432
433        for (_, field) in self
434            .schema
435            .fields()
436            .iter()
437            .enumerate()
438            .filter(|(index, _)| self.pk_indices.contains(index))
439        {
440            if !clickhouse_pks.remove(&field.name) {
441                return Err(SinkError::ClickHouse(
442                    "Clicklhouse and RisingWave pk is not match".to_owned(),
443                ));
444            }
445        }
446
447        if !clickhouse_pks.is_empty() {
448            return Err(SinkError::ClickHouse(
449                "Clicklhouse and RisingWave pk is not match".to_owned(),
450            ));
451        }
452        Ok(())
453    }
454
455    /// Check that the column types of risingwave and clickhouse are identical
456    fn check_and_correct_column_type(
457        fields_type: &DataType,
458        ck_column: &SystemColumn,
459    ) -> Result<()> {
460        // FIXME: the "contains" based implementation is wrong
461        let is_match = match fields_type {
462            risingwave_common::types::DataType::Boolean => Ok(ck_column.r#type.contains("Bool")),
463            risingwave_common::types::DataType::Int16 => Ok(ck_column.r#type.contains("UInt16")
464                | ck_column.r#type.contains("Int16")
465                // Allow Int16 to be pushed to Enum16, they share an encoding and value range
466                // No special care is taken to ensure values are valid.
467                | ck_column.r#type.contains("Enum16")),
468            risingwave_common::types::DataType::Int32 => {
469                Ok(ck_column.r#type.contains("UInt32") | ck_column.r#type.contains("Int32"))
470            }
471            risingwave_common::types::DataType::Int64 => {
472                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
473            }
474            risingwave_common::types::DataType::Float32 => Ok(ck_column.r#type.contains("Float32")),
475            risingwave_common::types::DataType::Float64 => Ok(ck_column.r#type.contains("Float64")),
476            risingwave_common::types::DataType::Decimal => Ok(ck_column.r#type.contains("Decimal")),
477            risingwave_common::types::DataType::Date => Ok(ck_column.r#type.contains("Date32")),
478            risingwave_common::types::DataType::Varchar => Ok(ck_column.r#type.contains("String")),
479            risingwave_common::types::DataType::Time => Err(SinkError::ClickHouse(
480                "clickhouse can not support Time".to_owned(),
481            )),
482            risingwave_common::types::DataType::Timestamp => Err(SinkError::ClickHouse(
483                "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
484            )),
485            risingwave_common::types::DataType::Timestamptz => {
486                Ok(ck_column.r#type.contains("DateTime64"))
487            }
488            risingwave_common::types::DataType::Interval => Err(SinkError::ClickHouse(
489                "clickhouse can not support Interval".to_owned(),
490            )),
491            risingwave_common::types::DataType::Struct(_) => Err(SinkError::ClickHouse(
492                "struct needs to be converted into a list".to_owned(),
493            )),
494            risingwave_common::types::DataType::List(list) => {
495                Self::check_and_correct_column_type(list.as_ref(), ck_column)?;
496                Ok(ck_column.r#type.contains("Array"))
497            }
498            risingwave_common::types::DataType::Bytea => Err(SinkError::ClickHouse(
499                "clickhouse can not support Bytea".to_owned(),
500            )),
501            risingwave_common::types::DataType::Jsonb => Err(SinkError::ClickHouse(
502                "clickhouse rust can not support Json".to_owned(),
503            )),
504            risingwave_common::types::DataType::Serial => {
505                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
506            }
507            risingwave_common::types::DataType::Int256 => Err(SinkError::ClickHouse(
508                "clickhouse can not support Int256".to_owned(),
509            )),
510            risingwave_common::types::DataType::Map(_) => Err(SinkError::ClickHouse(
511                "clickhouse can not support Map".to_owned(),
512            )),
513        };
514        if !is_match? {
515            return Err(SinkError::ClickHouse(format!(
516                "Column type can not match name is {:?}, risingwave is {:?} and clickhouse is {:?}",
517                ck_column.name, fields_type, ck_column.r#type
518            )));
519        }
520
521        Ok(())
522    }
523}
524
525impl Sink for ClickHouseSink {
526    type Coordinator = DummySinkCommitCoordinator;
527    type LogSinker = DecoupleCheckpointLogSinkerOf<ClickHouseSinkWriter>;
528
529    const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &["commit_checkpoint_interval"];
530    const SINK_NAME: &'static str = CLICKHOUSE_SINK;
531
532    async fn validate(&self) -> Result<()> {
533        // For upsert clickhouse sink, the primary key must be defined.
534        if !self.is_append_only && self.pk_indices.is_empty() {
535            return Err(SinkError::Config(anyhow!(
536                "Primary key not defined for upsert clickhouse sink (please define in `primary_key` field)"
537            )));
538        }
539
540        // check reachability
541        let client = self.config.common.build_client()?;
542
543        let (clickhouse_column, clickhouse_engine) =
544            query_column_engine_from_ck(client, &self.config).await?;
545        if clickhouse_engine.is_shared_tree() {
546            risingwave_common::license::Feature::ClickHouseSharedEngine
547                .check_available()
548                .map_err(|e| anyhow::anyhow!(e))?;
549        }
550
551        if !self.is_append_only
552            && !clickhouse_engine.is_collapsing_engine()
553            && !clickhouse_engine.is_delete_replacing_engine()
554        {
555            return match clickhouse_engine {
556                ClickHouseEngine::ReplicatedReplacingMergeTree(None) | ClickHouseEngine::ReplacingMergeTree(None) | ClickHouseEngine::SharedReplacingMergeTree(None) =>  {
557                    Err(SinkError::ClickHouse("To enable upsert with a `ReplacingMergeTree`, you must set a `clickhouse.delete.column` to the UInt8 column in ClickHouse used to signify deletes. See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree#is_deleted for more information".to_owned()))
558                }
559                _ => Err(SinkError::ClickHouse("If you want to use upsert, please use either `VersionedCollapsingMergeTree`, `CollapsingMergeTree` or the `ReplacingMergeTree` in ClickHouse".to_owned()))
560            };
561        }
562
563        self.check_column_name_and_type(&clickhouse_column)?;
564        if !self.is_append_only {
565            self.check_pk_match(&clickhouse_column)?;
566        }
567
568        if self.config.common.commit_checkpoint_interval == 0 {
569            return Err(SinkError::Config(anyhow!(
570                "`commit_checkpoint_interval` must be greater than 0"
571            )));
572        }
573        Ok(())
574    }
575
576    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
577        ClickHouseConfig::from_btreemap(config.clone())?;
578        Ok(())
579    }
580
581    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
582        let writer = ClickHouseSinkWriter::new(
583            self.config.clone(),
584            self.schema.clone(),
585            self.pk_indices.clone(),
586            self.is_append_only,
587        )
588        .await?;
589        let commit_checkpoint_interval =
590    NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
591        "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
592    );
593
594        Ok(DecoupleCheckpointLogSinkerOf::new(
595            writer,
596            SinkWriterMetrics::new(&writer_param),
597            commit_checkpoint_interval,
598        ))
599    }
600}
/// Writer that turns stream chunks into rows of a ClickHouse insert.
pub struct ClickHouseSinkWriter {
    pub config: ClickHouseConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    /// Client used to open inserts into `config.common.table`.
    client: ClickHouseClient,
    #[expect(dead_code)]
    is_append_only: bool,
    /// Per-column type features (nullability, `DateTime64` precision, `Decimal`
    /// precision/scale). In `write`, each row field's index is passed together with this vec to
    /// `ClickHouseFieldWithNull::from_scalar_ref`.
    column_correct_vec: Vec<ClickHouseSchemaFeature>,
    /// Insert column names: the sink's schema fields, followed by the sign column (collapsing
    /// engines) and/or the delete column (replacing engines with a delete column).
    rw_fields_name_after_calibration: Vec<String>,
    /// Engine of the target table; decides which sign/delete marker is appended to each row.
    clickhouse_engine: ClickHouseEngine,
    /// Open insert, created lazily by the first `write` call.
    inserter: Option<Insert<ClickHouseColumn>>,
}
/// Type features of one ClickHouse column needed to convert RisingWave values, derived from
/// its `system.columns` type string.
#[derive(Debug)]
struct ClickHouseSchemaFeature {
    /// Whether the column type contains `Nullable`.
    can_null: bool,
    /// Time accuracy in clickhouse for rw and ck conversions: the precision of a
    /// `DateTime64(P)` column, `0` for other types.
    accuracy_time: u8,

    /// `(precision, scale)` of a `Decimal(P, S)` column, `(0, 0)` for other types.
    accuracy_decimal: (u8, u8),
}
624
625impl ClickHouseSinkWriter {
626    pub async fn new(
627        config: ClickHouseConfig,
628        schema: Schema,
629        pk_indices: Vec<usize>,
630        is_append_only: bool,
631    ) -> Result<Self> {
632        let client = config.common.build_client()?;
633
634        let (clickhouse_column, clickhouse_engine) =
635            query_column_engine_from_ck(client.clone(), &config).await?;
636
637        let column_correct_vec: Result<Vec<ClickHouseSchemaFeature>> = clickhouse_column
638            .iter()
639            .map(Self::build_column_correct_vec)
640            .collect();
641        let mut rw_fields_name_after_calibration = build_fields_name_type_from_schema(&schema)?
642            .iter()
643            .map(|(a, _)| a.clone())
644            .collect_vec();
645
646        if let Some(sign) = clickhouse_engine.get_sign_name() {
647            rw_fields_name_after_calibration.push(sign);
648        }
649        if let Some(delete_col) = clickhouse_engine.get_delete_col() {
650            rw_fields_name_after_calibration.push(delete_col);
651        }
652        Ok(Self {
653            config,
654            schema,
655            pk_indices,
656            client,
657            is_append_only,
658            column_correct_vec: column_correct_vec?,
659            rw_fields_name_after_calibration,
660            clickhouse_engine,
661            inserter: None,
662        })
663    }
664
665    /// Check if clickhouse's column is 'Nullable', valid bits of `DateTime64`. And save it in
666    /// `column_correct_vec`
667    fn build_column_correct_vec(ck_column: &SystemColumn) -> Result<ClickHouseSchemaFeature> {
668        let can_null = ck_column.r#type.contains("Nullable");
669        // `DateTime64` without precision is already displayed as `DateTime(3)` in `system.columns`.
670        let accuracy_time = if ck_column.r#type.contains("DateTime64(") {
671            ck_column
672                .r#type
673                .split("DateTime64(")
674                .last()
675                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
676                .split(')')
677                .next()
678                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
679                .split(',')
680                .next()
681                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
682                .parse::<u8>()
683                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?
684        } else {
685            0_u8
686        };
687        let accuracy_decimal = if ck_column.r#type.contains("Decimal(") {
688            let decimal_all = ck_column
689                .r#type
690                .split("Decimal(")
691                .last()
692                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
693                .split(')')
694                .next()
695                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
696                .split(", ")
697                .collect_vec();
698            let length = decimal_all
699                .first()
700                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
701                .parse::<u8>()
702                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;
703
704            if length > 38 {
705                return Err(SinkError::ClickHouse(
706                    "RW don't support Decimal256".to_owned(),
707                ));
708            }
709
710            let scale = decimal_all
711                .last()
712                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
713                .parse::<u8>()
714                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;
715            (length, scale)
716        } else {
717            (0_u8, 0_u8)
718        };
719        Ok(ClickHouseSchemaFeature {
720            can_null,
721            accuracy_time,
722            accuracy_decimal,
723        })
724    }
725
726    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
727        if self.inserter.is_none() {
728            self.inserter = Some(self.client.insert_with_fields_name(
729                &self.config.common.table,
730                self.rw_fields_name_after_calibration.clone(),
731            )?);
732        }
733        for (op, row) in chunk.rows() {
734            let mut clickhouse_filed_vec = vec![];
735            for (index, data) in row.iter().enumerate() {
736                clickhouse_filed_vec.extend(ClickHouseFieldWithNull::from_scalar_ref(
737                    data,
738                    &self.column_correct_vec,
739                    index,
740                )?);
741            }
742            match op {
743                Op::Insert | Op::UpdateInsert => {
744                    if self.clickhouse_engine.is_collapsing_engine() {
745                        clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
746                            ClickHouseField::Int8(1),
747                        ));
748                    }
749                    if self.clickhouse_engine.is_delete_replacing_engine() {
750                        clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
751                            ClickHouseField::Int8(0),
752                        ))
753                    }
754                }
755                Op::Delete | Op::UpdateDelete => {
756                    if !self.clickhouse_engine.is_collapsing_engine()
757                        && !self.clickhouse_engine.is_delete_replacing_engine()
758                    {
759                        return Err(SinkError::ClickHouse(
760                            "Clickhouse engine don't support upsert".to_owned(),
761                        ));
762                    }
763                    if self.clickhouse_engine.is_collapsing_engine() {
764                        clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
765                            ClickHouseField::Int8(-1),
766                        ));
767                    }
768                    if self.clickhouse_engine.is_delete_replacing_engine() {
769                        clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
770                            ClickHouseField::Int8(1),
771                        ))
772                    }
773                }
774            }
775            let clickhouse_column = ClickHouseColumn {
776                row: clickhouse_filed_vec,
777            };
778            self.inserter
779                .as_mut()
780                .unwrap()
781                .write(&clickhouse_column)
782                .await?;
783        }
784        Ok(())
785    }
786}
787
788#[async_trait]
789impl SinkWriter for ClickHouseSinkWriter {
790    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
791        self.write(chunk).await
792    }
793
794    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
795        Ok(())
796    }
797
798    async fn abort(&mut self) -> Result<()> {
799        Ok(())
800    }
801
802    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
803        if is_checkpoint && let Some(inserter) = self.inserter.take() {
804            inserter.end().await?;
805        }
806        Ok(())
807    }
808}
809
/// One row returned by the `QUERY_COLUMN` metadata query for the sink's target table.
///
/// NOTE(review): rows are decoded by the `clickhouse` client through `ClickHouseRow`, which
/// presumably maps fields by position, so keep the field order stable — confirm before reordering.
#[derive(ClickHouseRow, Deserialize, Clone)]
struct SystemColumn {
    /// Column name as reported by ClickHouse.
    name: String,
    /// ClickHouse type name as text; the `Decimal(P, S)` form is parsed for precision/scale.
    r#type: String,
    /// Primary-key membership flag (presumably 0/1 — confirm against ClickHouse docs).
    is_in_primary_key: u8,
}
816
/// One row returned by `QUERY_ENGINE` (`system.tables`) for the sink's target table; it is
/// handed to `ClickHouseEngine::from_query_engine`.
#[derive(ClickHouseRow, Deserialize)]
struct ClickhouseQueryEngine {
    // Unused, but kept so the struct matches the queried column list.
    #[expect(dead_code)]
    name: String,
    /// Table engine name.
    engine: String,
    /// The table's DDL text (presumably inspected for engine parameters such as the sign
    /// column — confirm in `ClickHouseEngine::from_query_engine`).
    create_table_query: String,
}
824
825async fn query_column_engine_from_ck(
826    client: ClickHouseClient,
827    config: &ClickHouseConfig,
828) -> Result<(Vec<SystemColumn>, ClickHouseEngine)> {
829    let query_engine = QUERY_ENGINE;
830    let query_column = QUERY_COLUMN;
831
832    let clickhouse_engine = client
833        .query(query_engine)
834        .bind(config.common.database.clone())
835        .bind(config.common.table.clone())
836        .fetch_all::<ClickhouseQueryEngine>()
837        .await?;
838    let mut clickhouse_column = client
839        .query(query_column)
840        .bind(config.common.database.clone())
841        .bind(config.common.table.clone())
842        .bind("position")
843        .fetch_all::<SystemColumn>()
844        .await?;
845    if clickhouse_engine.is_empty() || clickhouse_column.is_empty() {
846        return Err(SinkError::ClickHouse(format!(
847            "table {:?}.{:?} is not find in clickhouse",
848            config.common.database, config.common.table
849        )));
850    }
851
852    let clickhouse_engine =
853        ClickHouseEngine::from_query_engine(clickhouse_engine.first().unwrap(), config)?;
854
855    if let Some(sign) = &clickhouse_engine.get_sign_name() {
856        clickhouse_column.retain(|a| sign.ne(&a.name))
857    }
858
859    if let Some(delete_col) = &clickhouse_engine.get_delete_col() {
860        clickhouse_column.retain(|a| delete_col.ne(&a.name))
861    }
862
863    Ok((clickhouse_column, clickhouse_engine))
864}
865
/// One encoded row to insert, holding the values in insert-column order.
///
/// Its manual `Serialize` impl emits each value as a field of a struct, so the `clickhouse`
/// client serializes it like a derived row struct with one field per column.
#[derive(ClickHouseRow, Debug)]
struct ClickHouseColumn {
    /// Encoded values, including a trailing sign / delete-flag value when the engine needs one.
    row: Vec<ClickHouseFieldWithNull>,
}
871
/// A single non-null value in the form written to ClickHouse (see its `Serialize` impl).
#[derive(Debug)]
enum ClickHouseField {
    Int16(i16),
    /// Also carries `Date` values as days since the Unix epoch.
    Int32(i32),
    /// Also carries `Timestamptz` values as `DateTime64` ticks at the column's precision.
    Int64(i64),
    /// Serialized through `Serial`'s own `Serialize` impl.
    Serial(Serial),
    Float32(f32),
    Float64(f64),
    String(String),
    Bool(bool),
    /// An array value; also used for each field of a struct, which is written as an array column.
    List(Vec<ClickHouseFieldWithNull>),
    /// Used for the sign / delete-flag value appended for collapsing / delete-replacing engines.
    Int8(i8),
    /// A decimal already rescaled to the column's scale.
    Decimal(ClickHouseDecimal),
}
/// A decimal's scaled integer value, stored at the width chosen from the column precision:
/// precision <= 9 uses 32 bits, <= 18 uses 64 bits, and anything larger uses 128 bits.
#[derive(Debug)]
enum ClickHouseDecimal {
    Decimal32(i32),
    Decimal64(i64),
    Decimal128(i128),
}
893impl Serialize for ClickHouseDecimal {
894    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
895    where
896        S: serde::Serializer,
897    {
898        match self {
899            ClickHouseDecimal::Decimal32(v) => serializer.serialize_i32(*v),
900            ClickHouseDecimal::Decimal64(v) => serializer.serialize_i64(*v),
901            ClickHouseDecimal::Decimal128(v) => serializer.serialize_i128(*v),
902        }
903    }
904}
905
/// A value together with how its nullability is encoded for ClickHouse.
#[derive(Debug)]
enum ClickHouseFieldWithNull {
    /// Non-null value for a nullable column; serialized with `serialize_some`.
    WithSome(ClickHouseField),
    /// Value for a non-nullable column (or an array / struct field); serialized directly.
    WithoutSome(ClickHouseField),
    /// NULL; serialized with `serialize_none`.
    None,
}
913
914impl ClickHouseFieldWithNull {
915    pub fn from_scalar_ref(
916        data: Option<ScalarRefImpl<'_>>,
917        clickhouse_schema_feature_vec: &Vec<ClickHouseSchemaFeature>,
918        clickhouse_schema_feature_index: usize,
919    ) -> Result<Vec<ClickHouseFieldWithNull>> {
920        let clickhouse_schema_feature = clickhouse_schema_feature_vec
921            .get(clickhouse_schema_feature_index)
922            .ok_or_else(|| SinkError::ClickHouse(format!("No column found from clickhouse table schema, index is {clickhouse_schema_feature_index}")))?;
923        if data.is_none() {
924            if !clickhouse_schema_feature.can_null {
925                return Err(SinkError::ClickHouse(
926                    "clickhouse column can not insert null".to_owned(),
927                ));
928            } else {
929                return Ok(vec![ClickHouseFieldWithNull::None]);
930            }
931        }
932        let data = match data.unwrap() {
933            ScalarRefImpl::Int16(v) => ClickHouseField::Int16(v),
934            ScalarRefImpl::Int32(v) => ClickHouseField::Int32(v),
935            ScalarRefImpl::Int64(v) => ClickHouseField::Int64(v),
936            ScalarRefImpl::Int256(_) => {
937                return Err(SinkError::ClickHouse(
938                    "clickhouse can not support Int256".to_owned(),
939                ));
940            }
941            ScalarRefImpl::Serial(v) => ClickHouseField::Serial(v),
942            ScalarRefImpl::Float32(v) => ClickHouseField::Float32(v.into_inner()),
943            ScalarRefImpl::Float64(v) => ClickHouseField::Float64(v.into_inner()),
944            ScalarRefImpl::Utf8(v) => ClickHouseField::String(v.to_owned()),
945            ScalarRefImpl::Bool(v) => ClickHouseField::Bool(v),
946            ScalarRefImpl::Decimal(d) => {
947                let d = if let Decimal::Normalized(d) = d {
948                    let scale =
949                        clickhouse_schema_feature.accuracy_decimal.1 as i32 - d.scale() as i32;
950                    if scale < 0 {
951                        d.mantissa() / 10_i128.pow(scale.unsigned_abs())
952                    } else {
953                        d.mantissa() * 10_i128.pow(scale as u32)
954                    }
955                } else if clickhouse_schema_feature.can_null {
956                    warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse null!");
957                    return Ok(vec![ClickHouseFieldWithNull::None]);
958                } else {
959                    warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse 0!");
960                    0_i128
961                };
962                if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
963                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(d as i32))
964                } else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
965                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(d as i64))
966                } else {
967                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(d))
968                }
969            }
970            ScalarRefImpl::Interval(_) => {
971                return Err(SinkError::ClickHouse(
972                    "clickhouse can not support Interval".to_owned(),
973                ));
974            }
975            ScalarRefImpl::Date(v) => {
976                let days = v.get_nums_days_unix_epoch();
977                ClickHouseField::Int32(days)
978            }
979            ScalarRefImpl::Time(_) => {
980                return Err(SinkError::ClickHouse(
981                    "clickhouse can not support Time".to_owned(),
982                ));
983            }
984            ScalarRefImpl::Timestamp(_) => {
985                return Err(SinkError::ClickHouse(
986                    "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
987                ));
988            }
989            ScalarRefImpl::Timestamptz(v) => {
990                let micros = v.timestamp_micros();
991                let ticks = match clickhouse_schema_feature.accuracy_time <= 6 {
992                    true => {
993                        micros / 10_i64.pow((6 - clickhouse_schema_feature.accuracy_time).into())
994                    }
995                    false => micros
996                        .checked_mul(
997                            10_i64.pow((clickhouse_schema_feature.accuracy_time - 6).into()),
998                        )
999                        .ok_or_else(|| SinkError::ClickHouse("DateTime64 overflow".to_owned()))?,
1000                };
1001                ClickHouseField::Int64(ticks)
1002            }
1003            ScalarRefImpl::Jsonb(_) => {
1004                return Err(SinkError::ClickHouse(
1005                    "clickhouse rust interface can not support Json".to_owned(),
1006                ));
1007            }
1008            ScalarRefImpl::Struct(v) => {
1009                let mut struct_vec = vec![];
1010                for (index, field) in v.iter_fields_ref().enumerate() {
1011                    let a = Self::from_scalar_ref(
1012                        field,
1013                        clickhouse_schema_feature_vec,
1014                        clickhouse_schema_feature_index + index,
1015                    )?;
1016                    struct_vec.push(ClickHouseFieldWithNull::WithoutSome(ClickHouseField::List(
1017                        a,
1018                    )));
1019                }
1020                return Ok(struct_vec);
1021            }
1022            ScalarRefImpl::List(v) => {
1023                let mut vec = vec![];
1024                for i in v.iter() {
1025                    vec.extend(Self::from_scalar_ref(
1026                        i,
1027                        clickhouse_schema_feature_vec,
1028                        clickhouse_schema_feature_index,
1029                    )?)
1030                }
1031                return Ok(vec![ClickHouseFieldWithNull::WithoutSome(
1032                    ClickHouseField::List(vec),
1033                )]);
1034            }
1035            ScalarRefImpl::Bytea(_) => {
1036                return Err(SinkError::ClickHouse(
1037                    "clickhouse can not support Bytea".to_owned(),
1038                ));
1039            }
1040            ScalarRefImpl::Map(_) => {
1041                return Err(SinkError::ClickHouse(
1042                    "clickhouse can not support Map".to_owned(),
1043                ));
1044            }
1045        };
1046        let data = if clickhouse_schema_feature.can_null {
1047            vec![ClickHouseFieldWithNull::WithSome(data)]
1048        } else {
1049            vec![ClickHouseFieldWithNull::WithoutSome(data)]
1050        };
1051        Ok(data)
1052    }
1053}
1054
1055impl Serialize for ClickHouseField {
1056    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1057    where
1058        S: serde::Serializer,
1059    {
1060        match self {
1061            ClickHouseField::Int16(v) => serializer.serialize_i16(*v),
1062            ClickHouseField::Int32(v) => serializer.serialize_i32(*v),
1063            ClickHouseField::Int64(v) => serializer.serialize_i64(*v),
1064            ClickHouseField::Serial(v) => v.serialize(serializer),
1065            ClickHouseField::Float32(v) => serializer.serialize_f32(*v),
1066            ClickHouseField::Float64(v) => serializer.serialize_f64(*v),
1067            ClickHouseField::String(v) => serializer.serialize_str(v),
1068            ClickHouseField::Bool(v) => serializer.serialize_bool(*v),
1069            ClickHouseField::List(v) => {
1070                let mut s = serializer.serialize_seq(Some(v.len()))?;
1071                for i in v {
1072                    s.serialize_element(i)?;
1073                }
1074                s.end()
1075            }
1076            ClickHouseField::Decimal(v) => v.serialize(serializer),
1077            ClickHouseField::Int8(v) => serializer.serialize_i8(*v),
1078        }
1079    }
1080}
1081impl Serialize for ClickHouseFieldWithNull {
1082    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1083    where
1084        S: serde::Serializer,
1085    {
1086        match self {
1087            ClickHouseFieldWithNull::WithSome(v) => serializer.serialize_some(v),
1088            ClickHouseFieldWithNull::WithoutSome(v) => v.serialize(serializer),
1089            ClickHouseFieldWithNull::None => serializer.serialize_none(),
1090        }
1091    }
1092}
1093impl Serialize for ClickHouseColumn {
1094    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1095    where
1096        S: serde::Serializer,
1097    {
1098        let mut s = serializer.serialize_struct("useless", self.row.len())?;
1099        for data in &self.row {
1100            s.serialize_field("useless", &data)?
1101        }
1102        s.end()
1103    }
1104}
1105
1106/// 'Struct'(clickhouse type name is nested) will be converted into some arrays by clickhouse. So we
1107/// need to make some conversions
1108pub fn build_fields_name_type_from_schema(schema: &Schema) -> Result<Vec<(String, DataType)>> {
1109    let mut vec = vec![];
1110    for field in schema.fields() {
1111        if let DataType::Struct(st) = &field.data_type {
1112            for (name, data_type) in st.iter() {
1113                if matches!(data_type, DataType::Struct(_)) {
1114                    return Err(SinkError::ClickHouse(
1115                        "Only one level of nesting is supported for struct".to_owned(),
1116                    ));
1117                } else {
1118                    vec.push((
1119                        format!("{}.{}", field.name, name),
1120                        DataType::List(Box::new(data_type.clone())),
1121                    ))
1122                }
1123            }
1124        } else {
1125            vec.push((field.name.clone(), field.data_type()));
1126        }
1127    }
1128    Ok(vec)
1129}