risingwave_connector/sink/clickhouse.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::fmt::Debug;
use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap, HashSet};

use anyhow::anyhow;
use clickhouse::insert::Insert;
use clickhouse::{Client as ClickHouseClient, Row as ClickHouseRow};
use itertools::Itertools;
use phf::{Set, phf_set};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
use serde::Serialize;
use serde::ser::{SerializeSeq, SerializeStruct};
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::{
    DecoupleCheckpointLogSinkerOf, default_commit_checkpoint_interval,
};
use super::writer::SinkWriter;
use super::{DummySinkCommitCoordinator, SinkWriterMetrics, SinkWriterParam};
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink, SinkError, SinkParam,
};

const QUERY_ENGINE: &str =
    "select distinct ?fields from system.tables where database = ? and name = ?";
const QUERY_COLUMN: &str =
    "select distinct ?fields from system.columns where database = ? and table = ? order by ?";
pub const CLICKHOUSE_SINK: &str = "clickhouse";

const ALLOW_EXPERIMENTAL_JSON_TYPE: &str = "allow_experimental_json_type";
const INPUT_FORMAT_BINARY_READ_JSON_AS_STRING: &str = "input_format_binary_read_json_as_string";
const OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING: &str = "output_format_binary_write_json_as_string";

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ClickHouseCommon {
    #[serde(rename = "clickhouse.url")]
    pub url: String,
    #[serde(rename = "clickhouse.user")]
    pub user: String,
    #[serde(rename = "clickhouse.password")]
    pub password: String,
    #[serde(rename = "clickhouse.database")]
    pub database: String,
    #[serde(rename = "clickhouse.table")]
    pub table: String,
    #[serde(rename = "clickhouse.delete.column")]
    pub delete_column: Option<String>,
    /// Commit every n(>0) checkpoints, default is 10.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for ClickHouseCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "clickhouse.password", "clickhouse.user"
    };
}

#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum ClickHouseEngine {
    MergeTree,
    ReplacingMergeTree(Option<String>),
    SummingMergeTree,
    AggregatingMergeTree,
    CollapsingMergeTree(String),
    VersionedCollapsingMergeTree(String),
    GraphiteMergeTree,
    ReplicatedMergeTree,
    ReplicatedReplacingMergeTree(Option<String>),
    ReplicatedSummingMergeTree,
    ReplicatedAggregatingMergeTree,
    ReplicatedCollapsingMergeTree(String),
    ReplicatedVersionedCollapsingMergeTree(String),
    ReplicatedGraphiteMergeTree,
    SharedMergeTree,
    SharedReplacingMergeTree(Option<String>),
    SharedSummingMergeTree,
    SharedAggregatingMergeTree,
    SharedCollapsingMergeTree(String),
    SharedVersionedCollapsingMergeTree(String),
    SharedGraphiteMergeTree,
    Null,
}

impl ClickHouseEngine {
    pub fn is_collapsing_engine(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::CollapsingMergeTree(_)
                | ClickHouseEngine::VersionedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
        )
    }

    pub fn is_delete_replacing_engine(&self) -> bool {
        match self {
            ClickHouseEngine::ReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::ReplicatedReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::SharedReplacingMergeTree(delete_col) => delete_col.is_some(),
            _ => false,
        }
    }

    pub fn get_delete_col(&self) -> Option<String> {
        match self {
            ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.clone()),
            ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.clone())
            }
            ClickHouseEngine::SharedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.clone())
            }
            _ => None,
        }
    }

    pub fn get_sign_name(&self) -> Option<String> {
        match self {
            ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::VersionedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.clone())
            }
            ClickHouseEngine::SharedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::SharedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.clone())
            }
            _ => None,
        }
    }

    pub fn is_shared_tree(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::SharedMergeTree
                | ClickHouseEngine::SharedReplacingMergeTree(_)
                | ClickHouseEngine::SharedSummingMergeTree
                | ClickHouseEngine::SharedAggregatingMergeTree
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedGraphiteMergeTree
        )
    }

    pub fn from_query_engine(
        engine_name: &ClickhouseQueryEngine,
        config: &ClickHouseConfig,
    ) -> Result<Self> {
        match engine_name.engine.as_str() {
            "MergeTree" => Ok(ClickHouseEngine::MergeTree),
            "Null" => Ok(ClickHouseEngine::Null),
            "ReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplacingMergeTree(delete_column))
            }
            "SummingMergeTree" => Ok(ClickHouseEngine::SummingMergeTree),
            "AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree),
            // VersionedCollapsingMergeTree(sign_name,"a")
            "VersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("VersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name))
            }
            // CollapsingMergeTree(sign_name)
            "CollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("CollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::CollapsingMergeTree(sign_name))
            }
            "GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree),
            "ReplicatedMergeTree" => Ok(ClickHouseEngine::ReplicatedMergeTree),
            "ReplicatedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplicatedReplacingMergeTree(
                    delete_column,
                ))
            }
            "ReplicatedSummingMergeTree" => Ok(ClickHouseEngine::ReplicatedSummingMergeTree),
            "ReplicatedAggregatingMergeTree" => {
                Ok(ClickHouseEngine::ReplicatedAggregatingMergeTree)
            }
            // ReplicatedVersionedCollapsingMergeTree("a","b",sign_name,"c")
            "ReplicatedVersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            // ReplicatedCollapsingMergeTree("a","b",sign_name)
            "ReplicatedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name))
            }
            "ReplicatedGraphiteMergeTree" => Ok(ClickHouseEngine::ReplicatedGraphiteMergeTree),
            "SharedMergeTree" => Ok(ClickHouseEngine::SharedMergeTree),
            "SharedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::SharedReplacingMergeTree(delete_column))
            }
            "SharedSummingMergeTree" => Ok(ClickHouseEngine::SharedSummingMergeTree),
            "SharedAggregatingMergeTree" => Ok(ClickHouseEngine::SharedAggregatingMergeTree),
            // SharedVersionedCollapsingMergeTree("a","b",sign_name,"c")
            "SharedVersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            // SharedCollapsingMergeTree("a","b",sign_name)
            "SharedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedCollapsingMergeTree(sign_name))
            }
            "SharedGraphiteMergeTree" => Ok(ClickHouseEngine::SharedGraphiteMergeTree),
            _ => Err(SinkError::ClickHouse(format!(
                "Cannot find clickhouse engine {:?}",
                engine_name.engine
            ))),
        }
    }
}
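
// Worked examples of the sign-column extraction above, on illustrative `create_table_query`
// strings (the exact DDL of a real deployment may differ):
//
//   ENGINE = VersionedCollapsingMergeTree(sign, version)
//     -> split on "VersionedCollapsingMergeTree(", keep text up to the first ','   => "sign"
//   ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/t', 'replica_1', sign)
//     -> keep text up to the first ')', then take the last ','-separated item      => "sign"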

impl ClickHouseCommon {
    pub(crate) fn build_client(&self) -> ConnectorResult<ClickHouseClient> {
        let client = ClickHouseClient::default() // hyper(0.14) client inside
            .with_url(&self.url)
            .with_user(&self.user)
            .with_password(&self.password)
            .with_database(&self.database)
            .with_option(ALLOW_EXPERIMENTAL_JSON_TYPE, "1")
            .with_option(INPUT_FORMAT_BINARY_READ_JSON_AS_STRING, "1")
            .with_option(OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING, "1");
        Ok(client)
    }
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct ClickHouseConfig {
    #[serde(flatten)]
    pub common: ClickHouseCommon,

    pub r#type: String, // accept "append-only" or "upsert"
}

impl EnforceSecret for ClickHouseConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        ClickHouseCommon::enforce_one(prop)
    }

    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            ClickHouseCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

#[derive(Clone, Debug)]
pub struct ClickHouseSink {
    pub config: ClickHouseConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl EnforceSecret for ClickHouseSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            ClickHouseConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl ClickHouseConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<ClickHouseConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}
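
// A minimal usage sketch (hypothetical property values, not part of this module): the sink's
// `WITH` options arrive as a string map and are parsed into a typed config before validation.
//
//     let props = BTreeMap::from([
//         ("clickhouse.url".to_owned(), "http://localhost:8123".to_owned()),
//         ("clickhouse.user".to_owned(), "default".to_owned()),
//         ("clickhouse.password".to_owned(), "".to_owned()),
//         ("clickhouse.database".to_owned(), "default".to_owned()),
//         ("clickhouse.table".to_owned(), "demo_table".to_owned()),
//         ("type".to_owned(), "upsert".to_owned()),
//     ]);
//     let config = ClickHouseConfig::from_btreemap(props)?;
//     assert_eq!(config.common.commit_checkpoint_interval, 10); // serde default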

impl TryFrom<SinkParam> for ClickHouseSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = ClickHouseConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
            is_append_only: param.sink_type.is_append_only(),
        })
    }
}

impl ClickHouseSink {
    /// Check that the column names and types of RisingWave and ClickHouse are compatible
    fn check_column_name_and_type(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
        let rw_fields_name = build_fields_name_type_from_schema(&self.schema)?;
        let clickhouse_columns_desc: HashMap<String, SystemColumn> = clickhouse_columns_desc
            .iter()
            .map(|s| (s.name.clone(), s.clone()))
            .collect();

        if rw_fields_name.len().gt(&clickhouse_columns_desc.len()) {
            return Err(SinkError::ClickHouse(
                "The target table's columns must be equal to or a superset of the sink's columns."
                    .to_owned(),
            ));
        }

        for i in rw_fields_name {
            let value = clickhouse_columns_desc.get(&i.0).ok_or_else(|| {
                SinkError::ClickHouse(format!(
                    "Column {:?} was not found in the clickhouse table",
                    i.0
                ))
            })?;

            Self::check_and_correct_column_type(&i.1, value)?;
        }
        Ok(())
    }

    /// Check that the primary keys of RisingWave and ClickHouse match
    fn check_pk_match(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
        let mut clickhouse_pks: HashSet<String> = clickhouse_columns_desc
            .iter()
            .filter(|s| s.is_in_primary_key == 1)
            .map(|s| s.name.clone())
            .collect();

        for (_, field) in self
            .schema
            .fields()
            .iter()
            .enumerate()
            .filter(|(index, _)| self.pk_indices.contains(index))
        {
            if !clickhouse_pks.remove(&field.name) {
                return Err(SinkError::ClickHouse(
                    "ClickHouse and RisingWave primary keys do not match".to_owned(),
                ));
            }
        }

        if !clickhouse_pks.is_empty() {
            return Err(SinkError::ClickHouse(
                "ClickHouse and RisingWave primary keys do not match".to_owned(),
            ));
        }
        Ok(())
    }

    /// Check that the column types of RisingWave and ClickHouse are compatible
    fn check_and_correct_column_type(
        fields_type: &DataType,
        ck_column: &SystemColumn,
    ) -> Result<()> {
        // FIXME: the "contains" based implementation is wrong
        let is_match = match fields_type {
            risingwave_common::types::DataType::Boolean => Ok(ck_column.r#type.contains("Bool")),
            risingwave_common::types::DataType::Int16 => Ok(ck_column.r#type.contains("UInt16")
                | ck_column.r#type.contains("Int16")
                // Allow Int16 to be pushed to Enum16; they share an encoding and value range.
                // No special care is taken to ensure values are valid.
                | ck_column.r#type.contains("Enum16")),
            risingwave_common::types::DataType::Int32 => {
                Ok(ck_column.r#type.contains("UInt32") | ck_column.r#type.contains("Int32"))
            }
            risingwave_common::types::DataType::Int64 => {
                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
            }
            risingwave_common::types::DataType::Float32 => Ok(ck_column.r#type.contains("Float32")),
            risingwave_common::types::DataType::Float64 => Ok(ck_column.r#type.contains("Float64")),
            risingwave_common::types::DataType::Decimal => Ok(ck_column.r#type.contains("Decimal")),
            risingwave_common::types::DataType::Date => Ok(ck_column.r#type.contains("Date32")),
            risingwave_common::types::DataType::Varchar => Ok(ck_column.r#type.contains("String")),
            risingwave_common::types::DataType::Time => Err(SinkError::ClickHouse(
                "clickhouse does not support Time".to_owned(),
            )),
            risingwave_common::types::DataType::Timestamp => Err(SinkError::ClickHouse(
                "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
            )),
            risingwave_common::types::DataType::Timestamptz => {
                Ok(ck_column.r#type.contains("DateTime64"))
            }
            risingwave_common::types::DataType::Interval => Err(SinkError::ClickHouse(
                "clickhouse does not support Interval".to_owned(),
            )),
            risingwave_common::types::DataType::Struct(_) => Err(SinkError::ClickHouse(
                "struct needs to be converted into a list".to_owned(),
            )),
            risingwave_common::types::DataType::List(list) => {
                Self::check_and_correct_column_type(list.as_ref(), ck_column)?;
                Ok(ck_column.r#type.contains("Array"))
            }
            risingwave_common::types::DataType::Bytea => Err(SinkError::ClickHouse(
                "clickhouse does not support Bytea".to_owned(),
            )),
            risingwave_common::types::DataType::Jsonb => Ok(ck_column.r#type.contains("JSON")),
            risingwave_common::types::DataType::Serial => {
                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
            }
            risingwave_common::types::DataType::Int256 => Err(SinkError::ClickHouse(
                "clickhouse does not support Int256".to_owned(),
            )),
            risingwave_common::types::DataType::Map(_) => Err(SinkError::ClickHouse(
                "clickhouse does not support Map".to_owned(),
            )),
            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        };
        if !is_match? {
            return Err(SinkError::ClickHouse(format!(
                "Column {:?} type mismatch: risingwave type is {:?}, clickhouse type is {:?}",
                ck_column.name, fields_type, ck_column.r#type
            )));
        }

        Ok(())
    }
}
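
// Examples of what the substring-based check above accepts (hence the FIXME): a RisingWave
// `Int64` column matches ClickHouse `Int64`, `UInt64`, or `Nullable(Int64)`, and a `Varchar`
// matches `String`, `Nullable(String)`, `LowCardinality(String)`, or even `FixedString(8)`,
// because only `contains` is used instead of proper type parsing.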

impl Sink for ClickHouseSink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = DecoupleCheckpointLogSinkerOf<ClickHouseSinkWriter>;

    const SINK_NAME: &'static str = CLICKHOUSE_SINK;

    async fn validate(&self) -> Result<()> {
        // For upsert clickhouse sink, the primary key must be defined.
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert clickhouse sink (please define in `primary_key` field)"
            )));
        }

        // check reachability
        let client = self.config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client, &self.config).await?;
        if clickhouse_engine.is_shared_tree() {
            risingwave_common::license::Feature::ClickHouseSharedEngine
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        if !self.is_append_only
            && !clickhouse_engine.is_collapsing_engine()
            && !clickhouse_engine.is_delete_replacing_engine()
        {
            return match clickhouse_engine {
                ClickHouseEngine::ReplicatedReplacingMergeTree(None)
                | ClickHouseEngine::ReplacingMergeTree(None)
                | ClickHouseEngine::SharedReplacingMergeTree(None) => {
                    Err(SinkError::ClickHouse("To enable upsert with a `ReplacingMergeTree`, you must set `clickhouse.delete.column` to the UInt8 column in ClickHouse used to signify deletes. See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree#is_deleted for more information".to_owned()))
                }
                _ => Err(SinkError::ClickHouse("If you want to use upsert, please use either `VersionedCollapsingMergeTree`, `CollapsingMergeTree` or `ReplacingMergeTree` in ClickHouse".to_owned()))
            };
        }

        self.check_column_name_and_type(&clickhouse_column)?;
        if !self.is_append_only {
            self.check_pk_match(&clickhouse_column)?;
        }

        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        ClickHouseConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = ClickHouseSinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        Ok(DecoupleCheckpointLogSinkerOf::new(
            writer,
            SinkWriterMetrics::new(&writer_param),
            commit_checkpoint_interval,
        ))
    }
}
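
// Note on commit semantics: rows are buffered in the ClickHouse `Insert` held by
// `ClickHouseSinkWriter` and are only finalized when `barrier(true)` is reached, which the
// `DecoupleCheckpointLogSinkerOf` triggers every `commit_checkpoint_interval` checkpoints.
// Data therefore generally becomes visible in ClickHouse in checkpoint-sized batches rather
// than per row.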

pub struct ClickHouseSinkWriter {
    pub config: ClickHouseConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: ClickHouseClient,
    #[expect(dead_code)]
    is_append_only: bool,
    // Save some features of the clickhouse column type
    column_correct_vec: Vec<ClickHouseSchemaFeature>,
    rw_fields_name_after_calibration: Vec<String>,
    clickhouse_engine: ClickHouseEngine,
    inserter: Option<Insert<ClickHouseColumn>>,
}

#[derive(Debug)]
struct ClickHouseSchemaFeature {
    can_null: bool,
    // Time accuracy in clickhouse for rw and ck conversions
    accuracy_time: u8,

    accuracy_decimal: (u8, u8),
}

impl ClickHouseSinkWriter {
    pub async fn new(
        config: ClickHouseConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        let client = config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client.clone(), &config).await?;

        let column_correct_vec: Result<Vec<ClickHouseSchemaFeature>> = clickhouse_column
            .iter()
            .map(Self::build_column_correct_vec)
            .collect();
        let mut rw_fields_name_after_calibration = build_fields_name_type_from_schema(&schema)?
            .iter()
            .map(|(a, _)| a.clone())
            .collect_vec();

        if let Some(sign) = clickhouse_engine.get_sign_name() {
            rw_fields_name_after_calibration.push(sign);
        }
        if let Some(delete_col) = clickhouse_engine.get_delete_col() {
            rw_fields_name_after_calibration.push(delete_col);
        }
        Ok(Self {
            config,
            schema,
            pk_indices,
            client,
            is_append_only,
            column_correct_vec: column_correct_vec?,
            rw_fields_name_after_calibration,
            clickhouse_engine,
            inserter: None,
        })
    }

    /// Check whether the clickhouse column is `Nullable`, and record the precision of
    /// `DateTime64`/`Decimal` columns. The result is saved in `column_correct_vec`.
    fn build_column_correct_vec(ck_column: &SystemColumn) -> Result<ClickHouseSchemaFeature> {
        let can_null = ck_column.r#type.contains("Nullable");
        // `DateTime64` without precision is already displayed as `DateTime64(3)` in `system.columns`.
        let accuracy_time = if ck_column.r#type.contains("DateTime64(") {
            ck_column
                .r#type
                .split("DateTime64(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(',')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?
        } else {
            0_u8
        };
        let accuracy_decimal = if ck_column.r#type.contains("Decimal(") {
            let decimal_all = ck_column
                .r#type
                .split("Decimal(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(", ")
                .collect_vec();
            let length = decimal_all
                .first()
                .ok_or_else(|| SinkError::ClickHouse("must have first".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;

            if length > 38 {
                return Err(SinkError::ClickHouse(
                    "RW doesn't support Decimal256".to_owned(),
                ));
            }

            let scale = decimal_all
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;
            (length, scale)
        } else {
            (0_u8, 0_u8)
        };
        Ok(ClickHouseSchemaFeature {
            can_null,
            accuracy_time,
            accuracy_decimal,
        })
    }
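
    // Worked examples of the parsing above, on illustrative `system.columns` type strings:
    //   "Nullable(DateTime64(6, 'UTC'))" -> can_null = true,  accuracy_time = 6
    //   "DateTime64(3)"                  -> can_null = false, accuracy_time = 3
    //   "Decimal(18, 4)"                 -> accuracy_decimal = (18, 4)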

    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.inserter.is_none() {
            self.inserter = Some(self.client.insert_with_fields_name(
                &self.config.common.table,
                self.rw_fields_name_after_calibration.clone(),
            )?);
        }
        for (op, row) in chunk.rows() {
            let mut clickhouse_field_vec = vec![];
            for (index, data) in row.iter().enumerate() {
                clickhouse_field_vec.extend(ClickHouseFieldWithNull::from_scalar_ref(
                    data,
                    &self.column_correct_vec,
                    index,
                )?);
            }
            match op {
                Op::Insert | Op::UpdateInsert => {
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(0),
                        ))
                    }
                }
                Op::Delete | Op::UpdateDelete => {
                    if !self.clickhouse_engine.is_collapsing_engine()
                        && !self.clickhouse_engine.is_delete_replacing_engine()
                    {
                        return Err(SinkError::ClickHouse(
                            "This clickhouse engine doesn't support upsert".to_owned(),
                        ));
                    }
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(-1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ))
                    }
                }
            }
            let clickhouse_column = ClickHouseColumn {
                row: clickhouse_field_vec,
            };
            self.inserter
                .as_mut()
                .unwrap()
                .write(&clickhouse_column)
                .await?;
        }
        Ok(())
    }
}
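
// How ops map to the extra engine columns appended in `write` above, when the engine needs them:
//   Insert / UpdateInsert -> sign = 1  (collapsing engines), is_deleted = 0 (ReplacingMergeTree)
//   Delete / UpdateDelete -> sign = -1 (collapsing engines), is_deleted = 1 (ReplacingMergeTree)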

#[async_trait]
impl SinkWriter for ClickHouseSinkWriter {
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        if is_checkpoint && let Some(inserter) = self.inserter.take() {
            inserter.end().await?;
        }
        Ok(())
    }
}

#[derive(ClickHouseRow, Deserialize, Clone)]
struct SystemColumn {
    name: String,
    r#type: String,
    is_in_primary_key: u8,
}

#[derive(ClickHouseRow, Deserialize)]
struct ClickhouseQueryEngine {
    #[expect(dead_code)]
    name: String,
    engine: String,
    create_table_query: String,
}

async fn query_column_engine_from_ck(
    client: ClickHouseClient,
    config: &ClickHouseConfig,
) -> Result<(Vec<SystemColumn>, ClickHouseEngine)> {
    let query_engine = QUERY_ENGINE;
    let query_column = QUERY_COLUMN;

    let clickhouse_engine = client
        .query(query_engine)
        .bind(config.common.database.clone())
        .bind(config.common.table.clone())
        .fetch_all::<ClickhouseQueryEngine>()
        .await?;
    let mut clickhouse_column = client
        .query(query_column)
        .bind(config.common.database.clone())
        .bind(config.common.table.clone())
        .bind("position")
        .fetch_all::<SystemColumn>()
        .await?;
    if clickhouse_engine.is_empty() || clickhouse_column.is_empty() {
        return Err(SinkError::ClickHouse(format!(
            "table {:?}.{:?} was not found in clickhouse",
            config.common.database, config.common.table
        )));
    }

    let clickhouse_engine =
        ClickHouseEngine::from_query_engine(clickhouse_engine.first().unwrap(), config)?;

    if let Some(sign) = &clickhouse_engine.get_sign_name() {
        clickhouse_column.retain(|a| sign.ne(&a.name))
    }

    if let Some(delete_col) = &clickhouse_engine.get_delete_col() {
        clickhouse_column.retain(|a| delete_col.ne(&a.name))
    }

    Ok((clickhouse_column, clickhouse_engine))
}
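
// In the two queries above, `?fields` is expanded by the clickhouse client into the field list
// of the row struct being fetched (e.g. `name, engine, create_table_query` for
// `ClickhouseQueryEngine`), while the remaining `?` placeholders are filled by `bind()`.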

/// Serialize this structure to emulate the row `struct` expected by the clickhouse interface
#[derive(ClickHouseRow, Debug)]
struct ClickHouseColumn {
    row: Vec<ClickHouseFieldWithNull>,
}

/// Basic data types for use with the clickhouse interface
#[derive(Debug)]
enum ClickHouseField {
    Int16(i16),
    Int32(i32),
    Int64(i64),
    Serial(Serial),
    Float32(f32),
    Float64(f64),
    String(String),
    Bool(bool),
    List(Vec<ClickHouseFieldWithNull>),
    Int8(i8),
    Decimal(ClickHouseDecimal),
}

#[derive(Debug)]
enum ClickHouseDecimal {
    Decimal32(i32),
    Decimal64(i64),
    Decimal128(i128),
}

impl Serialize for ClickHouseDecimal {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseDecimal::Decimal32(v) => serializer.serialize_i32(*v),
            ClickHouseDecimal::Decimal64(v) => serializer.serialize_i64(*v),
            ClickHouseDecimal::Decimal128(v) => serializer.serialize_i128(*v),
        }
    }
}

/// Enum that supports clickhouse nullable values
#[derive(Debug)]
enum ClickHouseFieldWithNull {
    WithSome(ClickHouseField),
    WithoutSome(ClickHouseField),
    None,
}

impl ClickHouseFieldWithNull {
    pub fn from_scalar_ref(
        data: Option<ScalarRefImpl<'_>>,
        clickhouse_schema_feature_vec: &Vec<ClickHouseSchemaFeature>,
        clickhouse_schema_feature_index: usize,
    ) -> Result<Vec<ClickHouseFieldWithNull>> {
        let clickhouse_schema_feature = clickhouse_schema_feature_vec
            .get(clickhouse_schema_feature_index)
            .ok_or_else(|| SinkError::ClickHouse(format!("No column found from clickhouse table schema, index is {clickhouse_schema_feature_index}")))?;
        if data.is_none() {
            if !clickhouse_schema_feature.can_null {
                return Err(SinkError::ClickHouse(
                    "cannot insert null into a non-nullable clickhouse column".to_owned(),
                ));
            } else {
                return Ok(vec![ClickHouseFieldWithNull::None]);
            }
        }
        let data = match data.unwrap() {
            ScalarRefImpl::Int16(v) => ClickHouseField::Int16(v),
            ScalarRefImpl::Int32(v) => ClickHouseField::Int32(v),
            ScalarRefImpl::Int64(v) => ClickHouseField::Int64(v),
            ScalarRefImpl::Int256(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse does not support Int256".to_owned(),
                ));
            }
            ScalarRefImpl::Serial(v) => ClickHouseField::Serial(v),
            ScalarRefImpl::Float32(v) => ClickHouseField::Float32(v.into_inner()),
            ScalarRefImpl::Float64(v) => ClickHouseField::Float64(v.into_inner()),
            ScalarRefImpl::Utf8(v) => ClickHouseField::String(v.to_owned()),
            ScalarRefImpl::Bool(v) => ClickHouseField::Bool(v),
            ScalarRefImpl::Decimal(d) => {
                let d = if let Decimal::Normalized(d) = d {
                    let scale =
                        clickhouse_schema_feature.accuracy_decimal.1 as i32 - d.scale() as i32;
                    if scale < 0 {
                        d.mantissa() / 10_i128.pow(scale.unsigned_abs())
                    } else {
                        d.mantissa() * 10_i128.pow(scale as u32)
                    }
                } else if clickhouse_schema_feature.can_null {
                    warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse null!");
                    return Ok(vec![ClickHouseFieldWithNull::None]);
                } else {
                    warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse 0!");
                    0_i128
                };
                if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(d as i32))
                } else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(d as i64))
                } else {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(d))
                }
            }
            ScalarRefImpl::Interval(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse does not support Interval".to_owned(),
                ));
            }
            ScalarRefImpl::Date(v) => {
                let days = v.get_nums_days_unix_epoch();
                ClickHouseField::Int32(days)
            }
            ScalarRefImpl::Time(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse does not support Time".to_owned(),
                ));
            }
            ScalarRefImpl::Timestamp(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
                ));
            }
            ScalarRefImpl::Timestamptz(v) => {
                let micros = v.timestamp_micros();
                let ticks = match clickhouse_schema_feature.accuracy_time <= 6 {
                    true => {
                        micros / 10_i64.pow((6 - clickhouse_schema_feature.accuracy_time).into())
                    }
                    false => micros
                        .checked_mul(
                            10_i64.pow((clickhouse_schema_feature.accuracy_time - 6).into()),
                        )
                        .ok_or_else(|| SinkError::ClickHouse("DateTime64 overflow".to_owned()))?,
                };
                ClickHouseField::Int64(ticks)
            }
            ScalarRefImpl::Jsonb(v) => {
                let json_str = v.to_string();
                ClickHouseField::String(json_str)
            }
            ScalarRefImpl::Struct(v) => {
                let mut struct_vec = vec![];
                for (index, field) in v.iter_fields_ref().enumerate() {
                    let a = Self::from_scalar_ref(
                        field,
                        clickhouse_schema_feature_vec,
                        clickhouse_schema_feature_index + index,
                    )?;
                    struct_vec.push(ClickHouseFieldWithNull::WithoutSome(ClickHouseField::List(
                        a,
                    )));
                }
                return Ok(struct_vec);
            }
            ScalarRefImpl::List(v) => {
                let mut vec = vec![];
                for i in v.iter() {
                    vec.extend(Self::from_scalar_ref(
                        i,
                        clickhouse_schema_feature_vec,
                        clickhouse_schema_feature_index,
                    )?)
                }
                return Ok(vec![ClickHouseFieldWithNull::WithoutSome(
                    ClickHouseField::List(vec),
                )]);
            }
            ScalarRefImpl::Bytea(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse does not support Bytea".to_owned(),
                ));
            }
            ScalarRefImpl::Map(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse does not support Map".to_owned(),
                ));
            }
            ScalarRefImpl::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        };
        let data = if clickhouse_schema_feature.can_null {
            vec![ClickHouseFieldWithNull::WithSome(data)]
        } else {
            vec![ClickHouseFieldWithNull::WithoutSome(data)]
        };
        Ok(data)
    }
}
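
// Worked examples of the conversions above (hypothetical values):
//   Decimal: RW `12.345` (mantissa 12345, scale 3) into ClickHouse `Decimal(18, 4)`:
//     target scale 4 - source scale 3 = 1, so 12345 * 10^1 = 123450, sent as `Decimal64`.
//   Timestamptz: 1_700_000_000_000_000 microseconds into `DateTime64(3)`:
//     precision 3 <= 6, so micros / 10^(6 - 3) = 1_700_000_000_000 ticks (milliseconds).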

impl Serialize for ClickHouseField {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseField::Int16(v) => serializer.serialize_i16(*v),
            ClickHouseField::Int32(v) => serializer.serialize_i32(*v),
            ClickHouseField::Int64(v) => serializer.serialize_i64(*v),
            ClickHouseField::Serial(v) => v.serialize(serializer),
            ClickHouseField::Float32(v) => serializer.serialize_f32(*v),
            ClickHouseField::Float64(v) => serializer.serialize_f64(*v),
            ClickHouseField::String(v) => serializer.serialize_str(v),
            ClickHouseField::Bool(v) => serializer.serialize_bool(*v),
            ClickHouseField::List(v) => {
                let mut s = serializer.serialize_seq(Some(v.len()))?;
                for i in v {
                    s.serialize_element(i)?;
                }
                s.end()
            }
            ClickHouseField::Decimal(v) => v.serialize(serializer),
            ClickHouseField::Int8(v) => serializer.serialize_i8(*v),
        }
    }
}

impl Serialize for ClickHouseFieldWithNull {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseFieldWithNull::WithSome(v) => serializer.serialize_some(v),
            ClickHouseFieldWithNull::WithoutSome(v) => v.serialize(serializer),
            ClickHouseFieldWithNull::None => serializer.serialize_none(),
        }
    }
}

impl Serialize for ClickHouseColumn {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut s = serializer.serialize_struct("useless", self.row.len())?;
        for data in &self.row {
            s.serialize_field("useless", &data)?
        }
        s.end()
    }
}
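
// The struct and field names passed to `serialize_struct` / `serialize_field` above appear to
// be ignored by the clickhouse client's row encoder (hence the placeholder "useless"); only the
// order of the serialized values matters, and it must line up with
// `rw_fields_name_after_calibration`.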

/// `Struct` (the corresponding ClickHouse type is `Nested`) is converted into a set of arrays by
/// ClickHouse, so we need to perform the corresponding conversion here.
pub fn build_fields_name_type_from_schema(schema: &Schema) -> Result<Vec<(String, DataType)>> {
    let mut vec = vec![];
    for field in schema.fields() {
        if let DataType::Struct(st) = &field.data_type {
            for (name, data_type) in st.iter() {
                if matches!(data_type, DataType::Struct(_)) {
                    return Err(SinkError::ClickHouse(
                        "Only one level of nesting is supported for struct".to_owned(),
                    ));
                } else {
                    vec.push((
                        format!("{}.{}", field.name, name),
                        DataType::List(Box::new(data_type.clone())),
                    ))
                }
            }
        } else {
            vec.push((field.name.clone(), field.data_type()));
        }
    }
    Ok(vec)
}
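
// Example of the flattening above (hypothetical schema): a RisingWave column
//   `address STRUCT<city VARCHAR, zip INT>`
// is emitted as two fields, `address.city: List(Varchar)` and `address.zip: List(Int32)`,
// matching how ClickHouse stores a `Nested` column as one array per nested field.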