risingwave_connector/sink/clickhouse.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::fmt::Debug;
use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap, HashSet};

use anyhow::anyhow;
use clickhouse::insert::Insert;
use clickhouse::{Client as ClickHouseClient, Row as ClickHouseRow};
use itertools::Itertools;
use phf::{Set, phf_set};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
use serde::Serialize;
use serde::ser::{SerializeSeq, SerializeStruct};
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::{
    DecoupleCheckpointLogSinkerOf, default_commit_checkpoint_interval,
};
use super::writer::SinkWriter;
use super::{SinkWriterMetrics, SinkWriterParam};
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink, SinkError, SinkParam,
};

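// In the queries below, the `clickhouse` client substitutes `?fields` with the field names of the
// row type passed to `fetch_all`, and each remaining `?` is filled by a subsequent `.bind(...)`
// call.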
const QUERY_ENGINE: &str =
    "select distinct ?fields from system.tables where database = ? and name = ?";
const QUERY_COLUMN: &str =
    "select distinct ?fields from system.columns where database = ? and table = ? order by ?";
pub const CLICKHOUSE_SINK: &str = "clickhouse";

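// ClickHouse settings applied to every connection in `build_client` below: they enable the
// (experimental) JSON column type and make JSON columns travel as plain strings over the binary
// protocol, which is how this sink sends JSONB values.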
const ALLOW_EXPERIMENTAL_JSON_TYPE: &str = "allow_experimental_json_type";
const INPUT_FORMAT_BINARY_READ_JSON_AS_STRING: &str = "input_format_binary_read_json_as_string";
const OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING: &str = "output_format_binary_write_json_as_string";

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ClickHouseCommon {
    #[serde(rename = "clickhouse.url")]
    pub url: String,
    #[serde(rename = "clickhouse.user")]
    pub user: String,
    #[serde(rename = "clickhouse.password")]
    pub password: String,
    #[serde(rename = "clickhouse.database")]
    pub database: String,
    #[serde(rename = "clickhouse.table")]
    pub table: String,
    #[serde(rename = "clickhouse.delete.column")]
    pub delete_column: Option<String>,
    /// Commit every n (> 0) checkpoints; the default is 10.
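    /// Used by the decoupled-checkpoint log sinker (see `new_log_sinker`): the writer's in-flight
    /// insert is only finalized when a checkpoint barrier is forwarded to it.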
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,
}
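
// Illustrative only: these fields are filled from the sink's WITH options, e.g. options along the
// lines of `connector = 'clickhouse'`, `clickhouse.url = 'http://localhost:8123'`,
// `clickhouse.user`, `clickhouse.password`, `clickhouse.database`, `clickhouse.table`, plus
// `type = 'append-only'` or `'upsert'` (see `ClickHouseConfig` below); the exact option names
// follow the `serde(rename)` attributes above.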

impl EnforceSecret for ClickHouseCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "clickhouse.password", "clickhouse.user"
    };
}

#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum ClickHouseEngine {
    MergeTree,
    ReplacingMergeTree(Option<String>),
    SummingMergeTree,
    AggregatingMergeTree,
    CollapsingMergeTree(String),
    VersionedCollapsingMergeTree(String),
    GraphiteMergeTree,
    ReplicatedMergeTree,
    ReplicatedReplacingMergeTree(Option<String>),
    ReplicatedSummingMergeTree,
    ReplicatedAggregatingMergeTree,
    ReplicatedCollapsingMergeTree(String),
    ReplicatedVersionedCollapsingMergeTree(String),
    ReplicatedGraphiteMergeTree,
    SharedMergeTree,
    SharedReplacingMergeTree(Option<String>),
    SharedSummingMergeTree,
    SharedAggregatingMergeTree,
    SharedCollapsingMergeTree(String),
    SharedVersionedCollapsingMergeTree(String),
    SharedGraphiteMergeTree,
    Null,
}

impl ClickHouseEngine {
    pub fn is_collapsing_engine(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::CollapsingMergeTree(_)
                | ClickHouseEngine::VersionedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
        )
    }

    pub fn is_delete_replacing_engine(&self) -> bool {
        match self {
            ClickHouseEngine::ReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::ReplicatedReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::SharedReplacingMergeTree(delete_col) => delete_col.is_some(),
            _ => false,
        }
    }

    pub fn get_delete_col(&self) -> Option<String> {
        match self {
            ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.clone()),
            ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.clone())
            }
            ClickHouseEngine::SharedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.clone())
            }
            _ => None,
        }
    }

    pub fn get_sign_name(&self) -> Option<String> {
        match self {
            ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::VersionedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.clone())
            }
            ClickHouseEngine::SharedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::SharedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.clone())
            }
            _ => None,
        }
    }

    pub fn is_shared_tree(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::SharedMergeTree
                | ClickHouseEngine::SharedReplacingMergeTree(_)
                | ClickHouseEngine::SharedSummingMergeTree
                | ClickHouseEngine::SharedAggregatingMergeTree
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedGraphiteMergeTree
        )
    }

    pub fn from_query_engine(
        engine_name: &ClickhouseQueryEngine,
        config: &ClickHouseConfig,
    ) -> Result<Self> {
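        // `system.tables` does not expose engine parameters separately, so the sign column of the
        // collapsing engines is recovered by string-splitting `create_table_query`, e.g.
        // `ENGINE = CollapsingMergeTree(Sign)` yields the sign column name `Sign`.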
        match engine_name.engine.as_str() {
            "MergeTree" => Ok(ClickHouseEngine::MergeTree),
            "Null" => Ok(ClickHouseEngine::Null),
            "ReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplacingMergeTree(delete_column))
            }
            "SummingMergeTree" => Ok(ClickHouseEngine::SummingMergeTree),
            "AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree),
            // VersionedCollapsingMergeTree(sign_name,"a")
            "VersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("VersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name))
            }
            // CollapsingMergeTree(sign_name)
            "CollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("CollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::CollapsingMergeTree(sign_name))
            }
            "GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree),
            "ReplicatedMergeTree" => Ok(ClickHouseEngine::ReplicatedMergeTree),
            "ReplicatedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplicatedReplacingMergeTree(
                    delete_column,
                ))
            }
            "ReplicatedSummingMergeTree" => Ok(ClickHouseEngine::ReplicatedSummingMergeTree),
            "ReplicatedAggregatingMergeTree" => {
                Ok(ClickHouseEngine::ReplicatedAggregatingMergeTree)
            }
            // ReplicatedVersionedCollapsingMergeTree("a","b",sign_name,"c")
            "ReplicatedVersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            // ReplicatedCollapsingMergeTree("a","b",sign_name)
            "ReplicatedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name))
            }
            "ReplicatedGraphiteMergeTree" => Ok(ClickHouseEngine::ReplicatedGraphiteMergeTree),
            "SharedMergeTree" => Ok(ClickHouseEngine::SharedMergeTree),
            "SharedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::SharedReplacingMergeTree(delete_column))
            }
            "SharedSummingMergeTree" => Ok(ClickHouseEngine::SharedSummingMergeTree),
            "SharedAggregatingMergeTree" => Ok(ClickHouseEngine::SharedAggregatingMergeTree),
            // SharedVersionedCollapsingMergeTree("a","b",sign_name,"c")
            "SharedVersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            // SharedCollapsingMergeTree("a","b",sign_name)
            "SharedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedCollapsingMergeTree(sign_name))
            }
            "SharedGraphiteMergeTree" => Ok(ClickHouseEngine::SharedGraphiteMergeTree),
            _ => Err(SinkError::ClickHouse(format!(
                "Cannot find clickhouse engine {:?}",
                engine_name.engine
            ))),
        }
    }
}

impl ClickHouseCommon {
    pub(crate) fn build_client(&self) -> ConnectorResult<ClickHouseClient> {
        let client = ClickHouseClient::default() // hyper(0.14) client inside
            .with_url(&self.url)
            .with_user(&self.user)
            .with_password(&self.password)
            .with_database(&self.database)
            .with_option(ALLOW_EXPERIMENTAL_JSON_TYPE, "1")
            .with_option(INPUT_FORMAT_BINARY_READ_JSON_AS_STRING, "1")
            .with_option(OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING, "1");
        Ok(client)
    }
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct ClickHouseConfig {
    #[serde(flatten)]
    pub common: ClickHouseCommon,

    pub r#type: String, // accepts "append-only" or "upsert"
}

impl EnforceSecret for ClickHouseConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        ClickHouseCommon::enforce_one(prop)
    }

    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            ClickHouseCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

#[derive(Clone, Debug)]
pub struct ClickHouseSink {
    pub config: ClickHouseConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl EnforceSecret for ClickHouseSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            ClickHouseConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl ClickHouseConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<ClickHouseConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be `{}` or `{}`",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}

impl TryFrom<SinkParam> for ClickHouseSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = ClickHouseConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
            is_append_only: param.sink_type.is_append_only(),
        })
    }
}

impl ClickHouseSink {
    /// Check that the column names and types of RisingWave and ClickHouse are compatible
    fn check_column_name_and_type(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
        let rw_fields_name = build_fields_name_type_from_schema(&self.schema)?;
        let clickhouse_columns_desc: HashMap<String, SystemColumn> = clickhouse_columns_desc
            .iter()
            .map(|s| (s.name.clone(), s.clone()))
            .collect();

        if rw_fields_name.len().gt(&clickhouse_columns_desc.len()) {
            return Err(SinkError::ClickHouse("The ClickHouse table's columns must be equal to or a superset of the sink's columns.".to_owned()));
        }

        for i in rw_fields_name {
            let value = clickhouse_columns_desc.get(&i.0).ok_or_else(|| {
                SinkError::ClickHouse(format!(
                    "Column {:?} of the sink was not found in the ClickHouse table",
                    i.0
                ))
            })?;

            Self::check_and_correct_column_type(&i.1, value)?;
        }
        Ok(())
    }

    /// Check that the primary keys of RisingWave and ClickHouse match
    fn check_pk_match(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
        let mut clickhouse_pks: HashSet<String> = clickhouse_columns_desc
            .iter()
            .filter(|s| s.is_in_primary_key == 1)
            .map(|s| s.name.clone())
            .collect();

        for (_, field) in self
            .schema
            .fields()
            .iter()
            .enumerate()
            .filter(|(index, _)| self.pk_indices.contains(index))
        {
            if !clickhouse_pks.remove(&field.name) {
                return Err(SinkError::ClickHouse(
                    "ClickHouse and RisingWave primary keys do not match".to_owned(),
                ));
            }
        }

        if !clickhouse_pks.is_empty() {
            return Err(SinkError::ClickHouse(
                "ClickHouse and RisingWave primary keys do not match".to_owned(),
            ));
        }
        Ok(())
    }

    /// Check that the column types of RisingWave and ClickHouse are compatible
    fn check_and_correct_column_type(
        fields_type: &DataType,
        ck_column: &SystemColumn,
    ) -> Result<()> {
        // FIXME: the "contains" based implementation is wrong
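        // (substring matching is too lax: e.g. `DataType::Int32` accepts not only `Int32` and
        // `Nullable(Int32)` but also `Array(Int32)`, and `Decimal` accepts any Decimal width)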
        let is_match = match fields_type {
            risingwave_common::types::DataType::Boolean => Ok(ck_column.r#type.contains("Bool")),
            risingwave_common::types::DataType::Int16 => Ok(ck_column.r#type.contains("UInt16")
                | ck_column.r#type.contains("Int16")
                // Allow Int16 to be pushed to Enum16; they share an encoding and value range.
                // No special care is taken to ensure values are valid.
                | ck_column.r#type.contains("Enum16")),
            risingwave_common::types::DataType::Int32 => {
                Ok(ck_column.r#type.contains("UInt32") | ck_column.r#type.contains("Int32"))
            }
            risingwave_common::types::DataType::Int64 => {
                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
            }
            risingwave_common::types::DataType::Float32 => Ok(ck_column.r#type.contains("Float32")),
            risingwave_common::types::DataType::Float64 => Ok(ck_column.r#type.contains("Float64")),
            risingwave_common::types::DataType::Decimal => Ok(ck_column.r#type.contains("Decimal")),
            risingwave_common::types::DataType::Date => Ok(ck_column.r#type.contains("Date32")),
            risingwave_common::types::DataType::Varchar => Ok(ck_column.r#type.contains("String")),
            risingwave_common::types::DataType::Time => Err(SinkError::ClickHouse(
                "TIME is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
            )),
            risingwave_common::types::DataType::Timestamp => Err(SinkError::ClickHouse(
                "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
            )),
            risingwave_common::types::DataType::Timestamptz => {
                Ok(ck_column.r#type.contains("DateTime64"))
            }
            risingwave_common::types::DataType::Interval => Err(SinkError::ClickHouse(
                "INTERVAL is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
            )),
            risingwave_common::types::DataType::Struct(_) => Err(SinkError::ClickHouse(
                "struct needs to be converted into a list".to_owned(),
            )),
            risingwave_common::types::DataType::List(list) => {
                Self::check_and_correct_column_type(list.as_ref(), ck_column)?;
                Ok(ck_column.r#type.contains("Array"))
            }
            risingwave_common::types::DataType::Bytea => Err(SinkError::ClickHouse(
                "BYTEA is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
            )),
            risingwave_common::types::DataType::Jsonb => Ok(ck_column.r#type.contains("JSON")),
            risingwave_common::types::DataType::Serial => {
                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
            }
            risingwave_common::types::DataType::Int256 => Err(SinkError::ClickHouse(
                "INT256 is not supported for ClickHouse sink.".to_owned(),
            )),
            risingwave_common::types::DataType::Map(_) => Err(SinkError::ClickHouse(
                "MAP is not supported for ClickHouse sink.".to_owned(),
            )),
            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        };
        if !is_match? {
            return Err(SinkError::ClickHouse(format!(
                "Column type mismatch for column {:?}: RisingWave type is {:?}, ClickHouse type is {:?}",
                ck_column.name, fields_type, ck_column.r#type
            )));
        }

        Ok(())
    }
}

impl Sink for ClickHouseSink {
    type LogSinker = DecoupleCheckpointLogSinkerOf<ClickHouseSinkWriter>;

    const SINK_NAME: &'static str = CLICKHOUSE_SINK;

    async fn validate(&self) -> Result<()> {
        // For upsert clickhouse sink, the primary key must be defined.
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert clickhouse sink (please define in `primary_key` field)"
            )));
        }

        // check reachability
        let client = self.config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client, &self.config).await?;
        if clickhouse_engine.is_shared_tree() {
            risingwave_common::license::Feature::ClickHouseSharedEngine
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        if !self.is_append_only
            && !clickhouse_engine.is_collapsing_engine()
            && !clickhouse_engine.is_delete_replacing_engine()
        {
            return match clickhouse_engine {
                ClickHouseEngine::ReplicatedReplacingMergeTree(None) | ClickHouseEngine::ReplacingMergeTree(None) | ClickHouseEngine::SharedReplacingMergeTree(None) => {
                    Err(SinkError::ClickHouse("To enable upsert with a `ReplacingMergeTree`, you must set a `clickhouse.delete.column` to the UInt8 column in ClickHouse used to signify deletes. See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree#is_deleted for more information".to_owned()))
                }
                _ => Err(SinkError::ClickHouse("If you want to use upsert, please use either `VersionedCollapsingMergeTree`, `CollapsingMergeTree` or the `ReplacingMergeTree` in ClickHouse".to_owned()))
            };
        }

        self.check_column_name_and_type(&clickhouse_column)?;
        if !self.is_append_only {
            self.check_pk_match(&clickhouse_column)?;
        }

        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        ClickHouseConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = ClickHouseSinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        Ok(DecoupleCheckpointLogSinkerOf::new(
            writer,
            SinkWriterMetrics::new(&writer_param),
            commit_checkpoint_interval,
        ))
    }
}

pub struct ClickHouseSinkWriter {
    pub config: ClickHouseConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: ClickHouseClient,
    #[expect(dead_code)]
    is_append_only: bool,
    // Saves some features of the ClickHouse column types
    column_correct_vec: Vec<ClickHouseSchemaFeature>,
    rw_fields_name_after_calibration: Vec<String>,
    clickhouse_engine: ClickHouseEngine,
    inserter: Option<Insert<ClickHouseColumn>>,
}
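
/// Per-column features extracted from `system.columns`. Illustrative values: a
/// `Nullable(DateTime64(6))` column yields `can_null: true, accuracy_time: 6`, and a
/// `Decimal(38, 10)` column yields `accuracy_decimal: (38, 10)`.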
#[derive(Debug)]
struct ClickHouseSchemaFeature {
    can_null: bool,
    // `DateTime64` precision, used for converting RisingWave timestamps to ClickHouse ticks
    accuracy_time: u8,

    accuracy_decimal: (u8, u8),
}

impl ClickHouseSinkWriter {
    pub async fn new(
        config: ClickHouseConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        let client = config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client.clone(), &config).await?;

        let column_correct_vec: Result<Vec<ClickHouseSchemaFeature>> = clickhouse_column
            .iter()
            .map(Self::build_column_correct_vec)
            .collect();
        let mut rw_fields_name_after_calibration = build_fields_name_type_from_schema(&schema)?
            .iter()
            .map(|(a, _)| a.clone())
            .collect_vec();

        if let Some(sign) = clickhouse_engine.get_sign_name() {
            rw_fields_name_after_calibration.push(sign);
        }
        if let Some(delete_col) = clickhouse_engine.get_delete_col() {
            rw_fields_name_after_calibration.push(delete_col);
        }
        Ok(Self {
            config,
            schema,
            pk_indices,
            client,
            is_append_only,
            column_correct_vec: column_correct_vec?,
            rw_fields_name_after_calibration,
            clickhouse_engine,
            inserter: None,
        })
    }

    /// Check whether the ClickHouse column is `Nullable` and extract the precision of
    /// `DateTime64`/`Decimal` columns, saving the result in `column_correct_vec`.
    fn build_column_correct_vec(ck_column: &SystemColumn) -> Result<ClickHouseSchemaFeature> {
        let can_null = ck_column.r#type.contains("Nullable");
        // `DateTime64` without precision is already displayed as `DateTime64(3)` in `system.columns`.
        let accuracy_time = if ck_column.r#type.contains("DateTime64(") {
            ck_column
                .r#type
                .split("DateTime64(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(',')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?
        } else {
            0_u8
        };
        let accuracy_decimal = if ck_column.r#type.contains("Decimal(") {
            let decimal_all = ck_column
                .r#type
                .split("Decimal(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(", ")
                .collect_vec();
            let length = decimal_all
                .first()
                .ok_or_else(|| SinkError::ClickHouse("must have first".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;

            if length > 38 {
                return Err(SinkError::ClickHouse(
                    "RisingWave does not support Decimal256".to_owned(),
                ));
            }

            let scale = decimal_all
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;
            (length, scale)
        } else {
            (0_u8, 0_u8)
        };
        Ok(ClickHouseSchemaFeature {
            can_null,
            accuracy_time,
            accuracy_decimal,
        })
    }

    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.inserter.is_none() {
            self.inserter = Some(self.client.insert_with_fields_name(
                &self.config.common.table,
                self.rw_fields_name_after_calibration.clone(),
            )?);
        }
        for (op, row) in chunk.rows() {
            let mut clickhouse_field_vec = vec![];
            for (index, data) in row.iter().enumerate() {
                clickhouse_field_vec.extend(ClickHouseFieldWithNull::from_scalar_ref(
                    data,
                    &self.column_correct_vec,
                    index,
                )?);
            }
            match op {
                Op::Insert | Op::UpdateInsert => {
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(0),
                        ))
                    }
                }
                Op::Delete | Op::UpdateDelete => {
                    if !self.clickhouse_engine.is_collapsing_engine()
                        && !self.clickhouse_engine.is_delete_replacing_engine()
                    {
                        return Err(SinkError::ClickHouse(
                            "This ClickHouse engine doesn't support upsert".to_owned(),
                        ));
                    }
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(-1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ))
                    }
                }
            }
            let clickhouse_column = ClickHouseColumn {
                row: clickhouse_field_vec,
            };
            self.inserter
                .as_mut()
                .unwrap()
                .write(&clickhouse_column)
                .await?;
        }
        Ok(())
    }
}

#[async_trait]
impl SinkWriter for ClickHouseSinkWriter {
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        if is_checkpoint && let Some(inserter) = self.inserter.take() {
            inserter.end().await?;
        }
        Ok(())
    }
}

#[derive(ClickHouseRow, Deserialize, Clone)]
struct SystemColumn {
    name: String,
    r#type: String,
    is_in_primary_key: u8,
}

#[derive(ClickHouseRow, Deserialize)]
struct ClickhouseQueryEngine {
    #[expect(dead_code)]
    name: String,
    engine: String,
    create_table_query: String,
}

async fn query_column_engine_from_ck(
    client: ClickHouseClient,
    config: &ClickHouseConfig,
) -> Result<(Vec<SystemColumn>, ClickHouseEngine)> {
    let query_engine = QUERY_ENGINE;
    let query_column = QUERY_COLUMN;

    let clickhouse_engine = client
        .query(query_engine)
        .bind(config.common.database.clone())
        .bind(config.common.table.clone())
        .fetch_all::<ClickhouseQueryEngine>()
        .await?;
    let mut clickhouse_column = client
        .query(query_column)
        .bind(config.common.database.clone())
        .bind(config.common.table.clone())
        .bind("position")
        .fetch_all::<SystemColumn>()
        .await?;
    if clickhouse_engine.is_empty() || clickhouse_column.is_empty() {
        return Err(SinkError::ClickHouse(format!(
            "table {:?}.{:?} was not found in ClickHouse",
            config.common.database, config.common.table
        )));
    }

    let clickhouse_engine =
        ClickHouseEngine::from_query_engine(clickhouse_engine.first().unwrap(), config)?;

    if let Some(sign) = &clickhouse_engine.get_sign_name() {
        clickhouse_column.retain(|a| sign.ne(&a.name))
    }

    if let Some(delete_col) = &clickhouse_engine.get_delete_col() {
        clickhouse_column.retain(|a| delete_col.ne(&a.name))
    }

    Ok((clickhouse_column, clickhouse_engine))
}

/// One row, serialized as a `struct` to match the ClickHouse client interface
#[derive(ClickHouseRow, Debug)]
struct ClickHouseColumn {
    row: Vec<ClickHouseFieldWithNull>,
}

/// Basic data types for use with the ClickHouse interface
#[derive(Debug)]
enum ClickHouseField {
    Int16(i16),
    Int32(i32),
    Int64(i64),
    Serial(Serial),
    Float32(f32),
    Float64(f64),
    String(String),
    Bool(bool),
    List(Vec<ClickHouseFieldWithNull>),
    Int8(i8),
    Decimal(ClickHouseDecimal),
}

#[derive(Debug)]
enum ClickHouseDecimal {
    Decimal32(i32),
    Decimal64(i64),
    Decimal128(i128),
}

impl Serialize for ClickHouseDecimal {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseDecimal::Decimal32(v) => serializer.serialize_i32(*v),
            ClickHouseDecimal::Decimal64(v) => serializer.serialize_i64(*v),
            ClickHouseDecimal::Decimal128(v) => serializer.serialize_i128(*v),
        }
    }
}

/// Enum that supports ClickHouse nullable columns
#[derive(Debug)]
enum ClickHouseFieldWithNull {
    WithSome(ClickHouseField),
    WithoutSome(ClickHouseField),
    None,
}

impl ClickHouseFieldWithNull {
    pub fn from_scalar_ref(
        data: Option<ScalarRefImpl<'_>>,
        clickhouse_schema_feature_vec: &Vec<ClickHouseSchemaFeature>,
        clickhouse_schema_feature_index: usize,
    ) -> Result<Vec<ClickHouseFieldWithNull>> {
        let clickhouse_schema_feature = clickhouse_schema_feature_vec
            .get(clickhouse_schema_feature_index)
            .ok_or_else(|| SinkError::ClickHouse(format!("No column found from clickhouse table schema, index is {clickhouse_schema_feature_index}")))?;
        if data.is_none() {
            if !clickhouse_schema_feature.can_null {
                return Err(SinkError::ClickHouse(
                    "Cannot insert null value into non-nullable ClickHouse column".to_owned(),
                ));
            } else {
                return Ok(vec![ClickHouseFieldWithNull::None]);
            }
        }
        let data = match data.unwrap() {
            ScalarRefImpl::Int16(v) => ClickHouseField::Int16(v),
            ScalarRefImpl::Int32(v) => ClickHouseField::Int32(v),
            ScalarRefImpl::Int64(v) => ClickHouseField::Int64(v),
            ScalarRefImpl::Int256(_) => {
                return Err(SinkError::ClickHouse(
                    "INT256 is not supported for ClickHouse sink.".to_owned(),
                ));
            }
            ScalarRefImpl::Serial(v) => ClickHouseField::Serial(v),
            ScalarRefImpl::Float32(v) => ClickHouseField::Float32(v.into_inner()),
            ScalarRefImpl::Float64(v) => ClickHouseField::Float64(v.into_inner()),
            ScalarRefImpl::Utf8(v) => ClickHouseField::String(v.to_owned()),
            ScalarRefImpl::Bool(v) => ClickHouseField::Bool(v),
            ScalarRefImpl::Decimal(d) => {
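                // Rescale the RisingWave decimal mantissa to the column's fixed scale before
                // choosing a ClickHouse Decimal width. E.g. for a `Decimal(18, 4)` column and the
                // value 1.23 (mantissa 123, scale 2), the scale difference is 2 and the stored
                // integer becomes 12300, encoded as Decimal64.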
                let d = if let Decimal::Normalized(d) = d {
                    let scale =
                        clickhouse_schema_feature.accuracy_decimal.1 as i32 - d.scale() as i32;
                    if scale < 0 {
                        d.mantissa() / 10_i128.pow(scale.unsigned_abs())
                    } else {
                        d.mantissa() * 10_i128.pow(scale as u32)
                    }
                } else if clickhouse_schema_feature.can_null {
                    warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse null!");
                    return Ok(vec![ClickHouseFieldWithNull::None]);
                } else {
                    warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse 0!");
                    0_i128
                };
                if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(d as i32))
                } else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(d as i64))
                } else {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(d))
                }
            }
            ScalarRefImpl::Interval(_) => {
                return Err(SinkError::ClickHouse(
                    "INTERVAL is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
                ));
            }
            ScalarRefImpl::Date(v) => {
                let days = v.get_nums_days_unix_epoch();
                ClickHouseField::Int32(days)
            }
            ScalarRefImpl::Time(_) => {
                return Err(SinkError::ClickHouse(
                    "TIME is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
                ));
            }
            ScalarRefImpl::Timestamp(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
                ));
            }
            ScalarRefImpl::Timestamptz(v) => {
                let micros = v.timestamp_micros();
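                // Rescale microseconds to the column's `DateTime64` precision: precision 3 divides
                // by 10^3 (micros -> millis), precision 9 multiplies by 10^3 (micros -> nanos).
                // The multiplication can overflow i64, hence the `checked_mul` below.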
                let ticks = match clickhouse_schema_feature.accuracy_time <= 6 {
                    true => {
                        micros / 10_i64.pow((6 - clickhouse_schema_feature.accuracy_time).into())
                    }
                    false => micros
                        .checked_mul(
                            10_i64.pow((clickhouse_schema_feature.accuracy_time - 6).into()),
                        )
                        .ok_or_else(|| SinkError::ClickHouse("DateTime64 overflow".to_owned()))?,
                };
                ClickHouseField::Int64(ticks)
            }
            ScalarRefImpl::Jsonb(v) => {
                let json_str = v.to_string();
                ClickHouseField::String(json_str)
            }
            ScalarRefImpl::Struct(v) => {
                let mut struct_vec = vec![];
                for (index, field) in v.iter_fields_ref().enumerate() {
                    let a = Self::from_scalar_ref(
                        field,
                        clickhouse_schema_feature_vec,
                        clickhouse_schema_feature_index + index,
                    )?;
                    struct_vec.push(ClickHouseFieldWithNull::WithoutSome(ClickHouseField::List(
                        a,
                    )));
                }
                return Ok(struct_vec);
            }
            ScalarRefImpl::List(v) => {
                let mut vec = vec![];
                for i in v.iter() {
                    vec.extend(Self::from_scalar_ref(
                        i,
                        clickhouse_schema_feature_vec,
                        clickhouse_schema_feature_index,
                    )?)
                }
                return Ok(vec![ClickHouseFieldWithNull::WithoutSome(
                    ClickHouseField::List(vec),
                )]);
            }
            ScalarRefImpl::Bytea(_) => {
                return Err(SinkError::ClickHouse(
                    "BYTEA is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
                ));
            }
            ScalarRefImpl::Map(_) => {
                return Err(SinkError::ClickHouse(
                    "MAP is not supported for ClickHouse sink.".to_owned(),
                ));
            }
            ScalarRefImpl::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        };
        let data = if clickhouse_schema_feature.can_null {
            vec![ClickHouseFieldWithNull::WithSome(data)]
        } else {
            vec![ClickHouseFieldWithNull::WithoutSome(data)]
        };
        Ok(data)
    }
}

impl Serialize for ClickHouseField {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseField::Int16(v) => serializer.serialize_i16(*v),
            ClickHouseField::Int32(v) => serializer.serialize_i32(*v),
            ClickHouseField::Int64(v) => serializer.serialize_i64(*v),
            ClickHouseField::Serial(v) => v.serialize(serializer),
            ClickHouseField::Float32(v) => serializer.serialize_f32(*v),
            ClickHouseField::Float64(v) => serializer.serialize_f64(*v),
            ClickHouseField::String(v) => serializer.serialize_str(v),
            ClickHouseField::Bool(v) => serializer.serialize_bool(*v),
            ClickHouseField::List(v) => {
                let mut s = serializer.serialize_seq(Some(v.len()))?;
                for i in v {
                    s.serialize_element(i)?;
                }
                s.end()
            }
            ClickHouseField::Decimal(v) => v.serialize(serializer),
            ClickHouseField::Int8(v) => serializer.serialize_i8(*v),
        }
    }
}

impl Serialize for ClickHouseFieldWithNull {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseFieldWithNull::WithSome(v) => serializer.serialize_some(v),
            ClickHouseFieldWithNull::WithoutSome(v) => v.serialize(serializer),
            ClickHouseFieldWithNull::None => serializer.serialize_none(),
        }
    }
}

impl Serialize for ClickHouseColumn {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut s = serializer.serialize_struct("useless", self.row.len())?;
        for data in &self.row {
            s.serialize_field("useless", &data)?
        }
        s.end()
    }
}

/// A RisingWave `Struct` maps to ClickHouse's `Nested` type, which ClickHouse stores as a set of
/// arrays, so struct fields are flattened here into `List` columns named `struct.field`.
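/// For example, a struct column `addr` with fields `city: varchar` and `zip: int` is flattened
/// into the entries `("addr.city", List(Varchar))` and `("addr.zip", List(Int32))`.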
pub fn build_fields_name_type_from_schema(schema: &Schema) -> Result<Vec<(String, DataType)>> {
    let mut vec = vec![];
    for field in schema.fields() {
        if let DataType::Struct(st) = &field.data_type {
            for (name, data_type) in st.iter() {
                if matches!(data_type, DataType::Struct(_)) {
                    return Err(SinkError::ClickHouse(
                        "Only one level of nesting is supported for struct".to_owned(),
                    ));
                } else {
                    vec.push((
                        format!("{}.{}", field.name, name),
                        DataType::List(Box::new(data_type.clone())),
                    ))
                }
            }
        } else {
            vec.push((field.name.clone(), field.data_type()));
        }
    }
    Ok(vec)
}