risingwave_connector/sink/
clickhouse.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::fmt::Debug;
16use core::num::NonZeroU64;
17use std::collections::{BTreeMap, HashMap, HashSet};
18
19use anyhow::anyhow;
20use clickhouse::insert::Insert;
21use clickhouse::{Client as ClickHouseClient, Row as ClickHouseRow};
22use itertools::Itertools;
23use phf::{Set, phf_set};
24use risingwave_common::array::{Op, StreamChunk};
25use risingwave_common::catalog::Schema;
26use risingwave_common::row::Row;
27use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
28use serde::Serialize;
29use serde::ser::{SerializeSeq, SerializeStruct};
30use serde_derive::Deserialize;
31use serde_with::{DisplayFromStr, serde_as};
32use thiserror_ext::AsReport;
33use tonic::async_trait;
34use tracing::warn;
35use with_options::WithOptions;
36
37use super::decouple_checkpoint_log_sink::{
38    DecoupleCheckpointLogSinkerOf, default_commit_checkpoint_interval,
39};
40use super::writer::SinkWriter;
41use super::{DummySinkCommitCoordinator, SinkWriterMetrics, SinkWriterParam};
42use crate::enforce_secret::EnforceSecret;
43use crate::error::ConnectorResult;
44use crate::sink::{
45    Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink, SinkError, SinkParam,
46};
47
48const QUERY_ENGINE: &str =
49    "select distinct ?fields from system.tables where database = ? and name = ?";
50const QUERY_COLUMN: &str =
51    "select distinct ?fields from system.columns where database = ? and table = ? order by ?";
52pub const CLICKHOUSE_SINK: &str = "clickhouse";
53
54const ALLOW_EXPERIMENTAL_JSON_TYPE: &str = "allow_experimental_json_type";
55const INPUT_FORMAT_BINARY_READ_JSON_AS_STRING: &str = "input_format_binary_read_json_as_string";
56const OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING: &str = "output_format_binary_write_json_as_string";
57
/// Connection and target-table options of the ClickHouse sink.
///
/// Deserialized from the sink properties map (see `ClickHouseConfig::from_btreemap`); the
/// `clickhouse.*` property keys are mapped onto the fields via `serde(rename)`.
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ClickHouseCommon {
    // URL passed to the ClickHouse client in `build_client`.
    #[serde(rename = "clickhouse.url")]
    pub url: String,
    // User name passed to the ClickHouse client in `build_client`.
    #[serde(rename = "clickhouse.user")]
    pub user: String,
    // Password passed to the ClickHouse client in `build_client`.
    #[serde(rename = "clickhouse.password")]
    pub password: String,
    // Database passed to the ClickHouse client in `build_client`.
    #[serde(rename = "clickhouse.database")]
    pub database: String,
    // Name of the table the sink inserts into.
    #[serde(rename = "clickhouse.table")]
    pub table: String,
    // Optional delete-marker column; `ClickHouseEngine::from_query_engine` attaches it to the
    // `*ReplacingMergeTree` engine variants.
    #[serde(rename = "clickhouse.delete.column")]
    pub delete_column: Option<String>,
    /// Commit every n(>0) checkpoints, default is 10.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    pub commit_checkpoint_interval: u64,
}
78
/// Declares which ClickHouse properties are listed in `ENFORCE_SECRET_PROPERTIES`.
// NOTE(review): presumably these keys must be supplied as secrets rather than plain text —
// confirm against the `EnforceSecret` trait definition.
impl EnforceSecret for ClickHouseCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "clickhouse.password", "clickhouse.user"
    };
}
84
/// ClickHouse table engines recognised by this sink.
///
/// The `String` payload of the `*CollapsingMergeTree` variants is the name of the sign column
/// (see `get_sign_name`). The `Option<String>` payload of the `*ReplacingMergeTree` variants is
/// the delete column configured through `clickhouse.delete.column` (see `from_query_engine`).
// NOTE(review): the lint is presumably silenced because almost every variant ends in
// `MergeTree` — confirm.
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum ClickHouseEngine {
    // Plain (non-replicated) engines.
    MergeTree,
    ReplacingMergeTree(Option<String>),
    SummingMergeTree,
    AggregatingMergeTree,
    CollapsingMergeTree(String),
    VersionedCollapsingMergeTree(String),
    GraphiteMergeTree,
    // `Replicated*` counterparts.
    ReplicatedMergeTree,
    ReplicatedReplacingMergeTree(Option<String>),
    ReplicatedSummingMergeTree,
    ReplicatedAggregatingMergeTree,
    ReplicatedCollapsingMergeTree(String),
    ReplicatedVersionedCollapsingMergeTree(String),
    ReplicatedGraphiteMergeTree,
    // `Shared*` counterparts (reported by `is_shared_tree`).
    SharedMergeTree,
    SharedReplacingMergeTree(Option<String>),
    SharedSummingMergeTree,
    SharedAggregatingMergeTree,
    SharedCollapsingMergeTree(String),
    SharedVersionedCollapsingMergeTree(String),
    SharedGraphiteMergeTree,
    // The `Null` engine.
    Null,
}
111impl ClickHouseEngine {
112    pub fn is_collapsing_engine(&self) -> bool {
113        matches!(
114            self,
115            ClickHouseEngine::CollapsingMergeTree(_)
116                | ClickHouseEngine::VersionedCollapsingMergeTree(_)
117                | ClickHouseEngine::ReplicatedCollapsingMergeTree(_)
118                | ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(_)
119                | ClickHouseEngine::SharedCollapsingMergeTree(_)
120                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
121        )
122    }
123
124    pub fn is_delete_replacing_engine(&self) -> bool {
125        match self {
126            ClickHouseEngine::ReplacingMergeTree(delete_col) => delete_col.is_some(),
127            ClickHouseEngine::ReplicatedReplacingMergeTree(delete_col) => delete_col.is_some(),
128            ClickHouseEngine::SharedReplacingMergeTree(delete_col) => delete_col.is_some(),
129            _ => false,
130        }
131    }
132
133    pub fn get_delete_col(&self) -> Option<String> {
134        match self {
135            ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.to_string()),
136            ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => {
137                Some(delete_col.to_string())
138            }
139            ClickHouseEngine::SharedReplacingMergeTree(Some(delete_col)) => {
140                Some(delete_col.to_string())
141            }
142            _ => None,
143        }
144    }
145
146    pub fn get_sign_name(&self) -> Option<String> {
147        match self {
148            ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.to_string()),
149            ClickHouseEngine::VersionedCollapsingMergeTree(sign_name) => {
150                Some(sign_name.to_string())
151            }
152            ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name) => {
153                Some(sign_name.to_string())
154            }
155            ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(sign_name) => {
156                Some(sign_name.to_string())
157            }
158            ClickHouseEngine::SharedCollapsingMergeTree(sign_name) => Some(sign_name.to_string()),
159            ClickHouseEngine::SharedVersionedCollapsingMergeTree(sign_name) => {
160                Some(sign_name.to_string())
161            }
162            _ => None,
163        }
164    }
165
166    pub fn is_shared_tree(&self) -> bool {
167        matches!(
168            self,
169            ClickHouseEngine::SharedMergeTree
170                | ClickHouseEngine::SharedReplacingMergeTree(_)
171                | ClickHouseEngine::SharedSummingMergeTree
172                | ClickHouseEngine::SharedAggregatingMergeTree
173                | ClickHouseEngine::SharedCollapsingMergeTree(_)
174                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
175                | ClickHouseEngine::SharedGraphiteMergeTree
176        )
177    }
178
179    pub fn from_query_engine(
180        engine_name: &ClickhouseQueryEngine,
181        config: &ClickHouseConfig,
182    ) -> Result<Self> {
183        match engine_name.engine.as_str() {
184            "MergeTree" => Ok(ClickHouseEngine::MergeTree),
185            "Null" => Ok(ClickHouseEngine::Null),
186            "ReplacingMergeTree" => {
187                let delete_column = config.common.delete_column.clone();
188                Ok(ClickHouseEngine::ReplacingMergeTree(delete_column))
189            }
190            "SummingMergeTree" => Ok(ClickHouseEngine::SummingMergeTree),
191            "AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree),
192            // VersionedCollapsingMergeTree(sign_name,"a")
193            "VersionedCollapsingMergeTree" => {
194                let sign_name = engine_name
195                    .create_table_query
196                    .split("VersionedCollapsingMergeTree(")
197                    .last()
198                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
199                    .split(',')
200                    .next()
201                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
202                    .trim()
203                    .to_owned();
204                Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name))
205            }
206            // CollapsingMergeTree(sign_name)
207            "CollapsingMergeTree" => {
208                let sign_name = engine_name
209                    .create_table_query
210                    .split("CollapsingMergeTree(")
211                    .last()
212                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
213                    .split(')')
214                    .next()
215                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
216                    .trim()
217                    .to_owned();
218                Ok(ClickHouseEngine::CollapsingMergeTree(sign_name))
219            }
220            "GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree),
221            "ReplicatedMergeTree" => Ok(ClickHouseEngine::ReplicatedMergeTree),
222            "ReplicatedReplacingMergeTree" => {
223                let delete_column = config.common.delete_column.clone();
224                Ok(ClickHouseEngine::ReplicatedReplacingMergeTree(
225                    delete_column,
226                ))
227            }
228            "ReplicatedSummingMergeTree" => Ok(ClickHouseEngine::ReplicatedSummingMergeTree),
229            "ReplicatedAggregatingMergeTree" => {
230                Ok(ClickHouseEngine::ReplicatedAggregatingMergeTree)
231            }
232            // ReplicatedVersionedCollapsingMergeTree("a","b",sign_name,"c")
233            "ReplicatedVersionedCollapsingMergeTree" => {
234                let sign_name = engine_name
235                    .create_table_query
236                    .split("ReplicatedVersionedCollapsingMergeTree(")
237                    .last()
238                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
239                    .split(',')
240                    .rev()
241                    .nth(1)
242                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
243                    .trim()
244                    .to_owned();
245                Ok(ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(
246                    sign_name,
247                ))
248            }
249            // ReplicatedCollapsingMergeTree("a","b",sign_name)
250            "ReplicatedCollapsingMergeTree" => {
251                let sign_name = engine_name
252                    .create_table_query
253                    .split("ReplicatedCollapsingMergeTree(")
254                    .last()
255                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
256                    .split(')')
257                    .next()
258                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
259                    .split(',')
260                    .next_back()
261                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
262                    .trim()
263                    .to_owned();
264                Ok(ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name))
265            }
266            "ReplicatedGraphiteMergeTree" => Ok(ClickHouseEngine::ReplicatedGraphiteMergeTree),
267            "SharedMergeTree" => Ok(ClickHouseEngine::SharedMergeTree),
268            "SharedReplacingMergeTree" => {
269                let delete_column = config.common.delete_column.clone();
270                Ok(ClickHouseEngine::SharedReplacingMergeTree(delete_column))
271            }
272            "SharedSummingMergeTree" => Ok(ClickHouseEngine::SharedSummingMergeTree),
273            "SharedAggregatingMergeTree" => Ok(ClickHouseEngine::SharedAggregatingMergeTree),
274            // SharedVersionedCollapsingMergeTree("a","b",sign_name,"c")
275            "SharedVersionedCollapsingMergeTree" => {
276                let sign_name = engine_name
277                    .create_table_query
278                    .split("SharedVersionedCollapsingMergeTree(")
279                    .last()
280                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
281                    .split(',')
282                    .rev()
283                    .nth(1)
284                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
285                    .trim()
286                    .to_owned();
287                Ok(ClickHouseEngine::SharedVersionedCollapsingMergeTree(
288                    sign_name,
289                ))
290            }
291            // SharedCollapsingMergeTree("a","b",sign_name)
292            "SharedCollapsingMergeTree" => {
293                let sign_name = engine_name
294                    .create_table_query
295                    .split("SharedCollapsingMergeTree(")
296                    .last()
297                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
298                    .split(')')
299                    .next()
300                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
301                    .split(',')
302                    .next_back()
303                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
304                    .trim()
305                    .to_owned();
306                Ok(ClickHouseEngine::SharedCollapsingMergeTree(sign_name))
307            }
308            "SharedGraphiteMergeTree" => Ok(ClickHouseEngine::SharedGraphiteMergeTree),
309            _ => Err(SinkError::ClickHouse(format!(
310                "Cannot find clickhouse engine {:?}",
311                engine_name.engine
312            ))),
313        }
314    }
315}
316
317impl ClickHouseCommon {
318    pub(crate) fn build_client(&self) -> ConnectorResult<ClickHouseClient> {
319        let client = ClickHouseClient::default() // hyper(0.14) client inside
320            .with_url(&self.url)
321            .with_user(&self.user)
322            .with_password(&self.password)
323            .with_database(&self.database)
324            .with_option(ALLOW_EXPERIMENTAL_JSON_TYPE, "1")
325            .with_option(INPUT_FORMAT_BINARY_READ_JSON_AS_STRING, "1")
326            .with_option(OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING, "1");
327        Ok(client)
328    }
329}
330
/// Full configuration of a ClickHouse sink: the shared ClickHouse options plus the sink type.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct ClickHouseConfig {
    // Flattened, so the `clickhouse.*` keys live at the top level of the properties map.
    #[serde(flatten)]
    pub common: ClickHouseCommon,

    // Validated in `from_btreemap` against `SINK_TYPE_APPEND_ONLY` / `SINK_TYPE_UPSERT`.
    pub r#type: String, // accept "append-only" or "upsert"
}
339
340impl EnforceSecret for ClickHouseConfig {
341    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
342        ClickHouseCommon::enforce_one(prop)
343    }
344
345    fn enforce_secret<'a>(
346        prop_iter: impl Iterator<Item = &'a str>,
347    ) -> crate::error::ConnectorResult<()> {
348        for prop in prop_iter {
349            ClickHouseCommon::enforce_one(prop)?;
350        }
351        Ok(())
352    }
353}
354
/// The ClickHouse sink: its configuration plus the shape of the stream being sunk.
#[derive(Clone, Debug)]
pub struct ClickHouseSink {
    /// Parsed sink configuration.
    pub config: ClickHouseConfig,
    /// Schema of the sink, taken from `SinkParam::schema()`.
    schema: Schema,
    /// Indices into `schema` of the downstream primary key columns.
    pk_indices: Vec<usize>,
    /// Whether the sink type is append-only; when `false`, `validate` requires a primary key.
    is_append_only: bool,
}
362
363impl EnforceSecret for ClickHouseSink {
364    fn enforce_secret<'a>(
365        prop_iter: impl Iterator<Item = &'a str>,
366    ) -> crate::error::ConnectorResult<()> {
367        for prop in prop_iter {
368            ClickHouseConfig::enforce_one(prop)?;
369        }
370        Ok(())
371    }
372}
373
374impl ClickHouseConfig {
375    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
376        let config =
377            serde_json::from_value::<ClickHouseConfig>(serde_json::to_value(properties).unwrap())
378                .map_err(|e| SinkError::Config(anyhow!(e)))?;
379        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
380            return Err(SinkError::Config(anyhow!(
381                "`{}` must be {}, or {}",
382                SINK_TYPE_OPTION,
383                SINK_TYPE_APPEND_ONLY,
384                SINK_TYPE_UPSERT
385            )));
386        }
387        Ok(config)
388    }
389}
390
391impl TryFrom<SinkParam> for ClickHouseSink {
392    type Error = SinkError;
393
394    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
395        let schema = param.schema();
396        let config = ClickHouseConfig::from_btreemap(param.properties)?;
397        Ok(Self {
398            config,
399            schema,
400            pk_indices: param.downstream_pk,
401            is_append_only: param.sink_type.is_append_only(),
402        })
403    }
404}
405
406impl ClickHouseSink {
407    /// Check that the column names and types of risingwave and clickhouse are identical
408    fn check_column_name_and_type(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
409        let rw_fields_name = build_fields_name_type_from_schema(&self.schema)?;
410        let clickhouse_columns_desc: HashMap<String, SystemColumn> = clickhouse_columns_desc
411            .iter()
412            .map(|s| (s.name.clone(), s.clone()))
413            .collect();
414
415        if rw_fields_name.len().gt(&clickhouse_columns_desc.len()) {
416            return Err(SinkError::ClickHouse("The columns of the sink must be equal to or a superset of the target table's columns.".to_owned()));
417        }
418
419        for i in rw_fields_name {
420            let value = clickhouse_columns_desc.get(&i.0).ok_or_else(|| {
421                SinkError::ClickHouse(format!(
422                    "Column name don't find in clickhouse, risingwave is {:?} ",
423                    i.0
424                ))
425            })?;
426
427            Self::check_and_correct_column_type(&i.1, value)?;
428        }
429        Ok(())
430    }
431
432    /// Check that the column names and types of risingwave and clickhouse are identical
433    fn check_pk_match(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
434        let mut clickhouse_pks: HashSet<String> = clickhouse_columns_desc
435            .iter()
436            .filter(|s| s.is_in_primary_key == 1)
437            .map(|s| s.name.clone())
438            .collect();
439
440        for (_, field) in self
441            .schema
442            .fields()
443            .iter()
444            .enumerate()
445            .filter(|(index, _)| self.pk_indices.contains(index))
446        {
447            if !clickhouse_pks.remove(&field.name) {
448                return Err(SinkError::ClickHouse(
449                    "Clicklhouse and RisingWave pk is not match".to_owned(),
450                ));
451            }
452        }
453
454        if !clickhouse_pks.is_empty() {
455            return Err(SinkError::ClickHouse(
456                "Clicklhouse and RisingWave pk is not match".to_owned(),
457            ));
458        }
459        Ok(())
460    }
461
462    /// Check that the column types of risingwave and clickhouse are identical
463    fn check_and_correct_column_type(
464        fields_type: &DataType,
465        ck_column: &SystemColumn,
466    ) -> Result<()> {
467        // FIXME: the "contains" based implementation is wrong
468        let is_match = match fields_type {
469            risingwave_common::types::DataType::Boolean => Ok(ck_column.r#type.contains("Bool")),
470            risingwave_common::types::DataType::Int16 => Ok(ck_column.r#type.contains("UInt16")
471                | ck_column.r#type.contains("Int16")
472                // Allow Int16 to be pushed to Enum16, they share an encoding and value range
473                // No special care is taken to ensure values are valid.
474                | ck_column.r#type.contains("Enum16")),
475            risingwave_common::types::DataType::Int32 => {
476                Ok(ck_column.r#type.contains("UInt32") | ck_column.r#type.contains("Int32"))
477            }
478            risingwave_common::types::DataType::Int64 => {
479                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
480            }
481            risingwave_common::types::DataType::Float32 => Ok(ck_column.r#type.contains("Float32")),
482            risingwave_common::types::DataType::Float64 => Ok(ck_column.r#type.contains("Float64")),
483            risingwave_common::types::DataType::Decimal => Ok(ck_column.r#type.contains("Decimal")),
484            risingwave_common::types::DataType::Date => Ok(ck_column.r#type.contains("Date32")),
485            risingwave_common::types::DataType::Varchar => Ok(ck_column.r#type.contains("String")),
486            risingwave_common::types::DataType::Time => Err(SinkError::ClickHouse(
487                "clickhouse can not support Time".to_owned(),
488            )),
489            risingwave_common::types::DataType::Timestamp => Err(SinkError::ClickHouse(
490                "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
491            )),
492            risingwave_common::types::DataType::Timestamptz => {
493                Ok(ck_column.r#type.contains("DateTime64"))
494            }
495            risingwave_common::types::DataType::Interval => Err(SinkError::ClickHouse(
496                "clickhouse can not support Interval".to_owned(),
497            )),
498            risingwave_common::types::DataType::Struct(_) => Err(SinkError::ClickHouse(
499                "struct needs to be converted into a list".to_owned(),
500            )),
501            risingwave_common::types::DataType::List(list) => {
502                Self::check_and_correct_column_type(list.as_ref(), ck_column)?;
503                Ok(ck_column.r#type.contains("Array"))
504            }
505            risingwave_common::types::DataType::Bytea => Err(SinkError::ClickHouse(
506                "clickhouse can not support Bytea".to_owned(),
507            )),
508            risingwave_common::types::DataType::Jsonb => Ok(ck_column.r#type.contains("JSON")),
509            risingwave_common::types::DataType::Serial => {
510                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
511            }
512            risingwave_common::types::DataType::Int256 => Err(SinkError::ClickHouse(
513                "clickhouse can not support Int256".to_owned(),
514            )),
515            risingwave_common::types::DataType::Map(_) => Err(SinkError::ClickHouse(
516                "clickhouse can not support Map".to_owned(),
517            )),
518            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
519        };
520        if !is_match? {
521            return Err(SinkError::ClickHouse(format!(
522                "Column type can not match name is {:?}, risingwave is {:?} and clickhouse is {:?}",
523                ck_column.name, fields_type, ck_column.r#type
524            )));
525        }
526
527        Ok(())
528    }
529}
530
531impl Sink for ClickHouseSink {
532    type Coordinator = DummySinkCommitCoordinator;
533    type LogSinker = DecoupleCheckpointLogSinkerOf<ClickHouseSinkWriter>;
534
535    const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &["commit_checkpoint_interval"];
536    const SINK_NAME: &'static str = CLICKHOUSE_SINK;
537
538    async fn validate(&self) -> Result<()> {
539        // For upsert clickhouse sink, the primary key must be defined.
540        if !self.is_append_only && self.pk_indices.is_empty() {
541            return Err(SinkError::Config(anyhow!(
542                "Primary key not defined for upsert clickhouse sink (please define in `primary_key` field)"
543            )));
544        }
545
546        // check reachability
547        let client = self.config.common.build_client()?;
548
549        let (clickhouse_column, clickhouse_engine) =
550            query_column_engine_from_ck(client, &self.config).await?;
551        if clickhouse_engine.is_shared_tree() {
552            risingwave_common::license::Feature::ClickHouseSharedEngine
553                .check_available()
554                .map_err(|e| anyhow::anyhow!(e))?;
555        }
556
557        if !self.is_append_only
558            && !clickhouse_engine.is_collapsing_engine()
559            && !clickhouse_engine.is_delete_replacing_engine()
560        {
561            return match clickhouse_engine {
562                ClickHouseEngine::ReplicatedReplacingMergeTree(None) | ClickHouseEngine::ReplacingMergeTree(None) | ClickHouseEngine::SharedReplacingMergeTree(None) =>  {
563                    Err(SinkError::ClickHouse("To enable upsert with a `ReplacingMergeTree`, you must set a `clickhouse.delete.column` to the UInt8 column in ClickHouse used to signify deletes. See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree#is_deleted for more information".to_owned()))
564                }
565                _ => Err(SinkError::ClickHouse("If you want to use upsert, please use either `VersionedCollapsingMergeTree`, `CollapsingMergeTree` or the `ReplacingMergeTree` in ClickHouse".to_owned()))
566            };
567        }
568
569        self.check_column_name_and_type(&clickhouse_column)?;
570        if !self.is_append_only {
571            self.check_pk_match(&clickhouse_column)?;
572        }
573
574        if self.config.common.commit_checkpoint_interval == 0 {
575            return Err(SinkError::Config(anyhow!(
576                "`commit_checkpoint_interval` must be greater than 0"
577            )));
578        }
579        Ok(())
580    }
581
582    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
583        ClickHouseConfig::from_btreemap(config.clone())?;
584        Ok(())
585    }
586
587    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
588        let writer = ClickHouseSinkWriter::new(
589            self.config.clone(),
590            self.schema.clone(),
591            self.pk_indices.clone(),
592            self.is_append_only,
593        )
594        .await?;
595        let commit_checkpoint_interval =
596    NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
597        "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
598    );
599
600        Ok(DecoupleCheckpointLogSinkerOf::new(
601            writer,
602            SinkWriterMetrics::new(&writer_param),
603            commit_checkpoint_interval,
604        ))
605    }
606}
/// Writer that converts stream chunks into ClickHouse rows and inserts them.
pub struct ClickHouseSinkWriter {
    /// Sink configuration; supplies the target table name for inserts.
    pub config: ClickHouseConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    /// Client built from `config.common` in `new`.
    client: ClickHouseClient,
    #[expect(dead_code)]
    is_append_only: bool,
    // Save some features of the clickhouse column type
    column_correct_vec: Vec<ClickHouseSchemaFeature>,
    /// Sink column names, followed by the engine's sign column and delete column when the
    /// engine has them (see `new`).
    rw_fields_name_after_calibration: Vec<String>,
    /// Engine of the target table, as queried from ClickHouse in `new`.
    clickhouse_engine: ClickHouseEngine,
    /// Starts as `None`; `write` creates the inserter when it finds none.
    inserter: Option<Insert<ClickHouseColumn>>,
}
/// Features parsed from a ClickHouse column type string by
/// `ClickHouseSinkWriter::build_column_correct_vec`.
#[derive(Debug)]
struct ClickHouseSchemaFeature {
    /// Whether the column type mentions `Nullable`.
    can_null: bool,
    // Time accuracy in clickhouse for rw and ck conversions
    // (the `DateTime64` precision; 0 when the column is not a `DateTime64(..)`).
    accuracy_time: u8,

    /// `(precision, scale)` of a `Decimal(..)` column; `(0, 0)` for any other column.
    accuracy_decimal: (u8, u8),
}
630
631impl ClickHouseSinkWriter {
632    pub async fn new(
633        config: ClickHouseConfig,
634        schema: Schema,
635        pk_indices: Vec<usize>,
636        is_append_only: bool,
637    ) -> Result<Self> {
638        let client = config.common.build_client()?;
639
640        let (clickhouse_column, clickhouse_engine) =
641            query_column_engine_from_ck(client.clone(), &config).await?;
642
643        let column_correct_vec: Result<Vec<ClickHouseSchemaFeature>> = clickhouse_column
644            .iter()
645            .map(Self::build_column_correct_vec)
646            .collect();
647        let mut rw_fields_name_after_calibration = build_fields_name_type_from_schema(&schema)?
648            .iter()
649            .map(|(a, _)| a.clone())
650            .collect_vec();
651
652        if let Some(sign) = clickhouse_engine.get_sign_name() {
653            rw_fields_name_after_calibration.push(sign);
654        }
655        if let Some(delete_col) = clickhouse_engine.get_delete_col() {
656            rw_fields_name_after_calibration.push(delete_col);
657        }
658        Ok(Self {
659            config,
660            schema,
661            pk_indices,
662            client,
663            is_append_only,
664            column_correct_vec: column_correct_vec?,
665            rw_fields_name_after_calibration,
666            clickhouse_engine,
667            inserter: None,
668        })
669    }
670
671    /// Check if clickhouse's column is 'Nullable', valid bits of `DateTime64`. And save it in
672    /// `column_correct_vec`
673    fn build_column_correct_vec(ck_column: &SystemColumn) -> Result<ClickHouseSchemaFeature> {
674        let can_null = ck_column.r#type.contains("Nullable");
675        // `DateTime64` without precision is already displayed as `DateTime(3)` in `system.columns`.
676        let accuracy_time = if ck_column.r#type.contains("DateTime64(") {
677            ck_column
678                .r#type
679                .split("DateTime64(")
680                .last()
681                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
682                .split(')')
683                .next()
684                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
685                .split(',')
686                .next()
687                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
688                .parse::<u8>()
689                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?
690        } else {
691            0_u8
692        };
693        let accuracy_decimal = if ck_column.r#type.contains("Decimal(") {
694            let decimal_all = ck_column
695                .r#type
696                .split("Decimal(")
697                .last()
698                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
699                .split(')')
700                .next()
701                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
702                .split(", ")
703                .collect_vec();
704            let length = decimal_all
705                .first()
706                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
707                .parse::<u8>()
708                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;
709
710            if length > 38 {
711                return Err(SinkError::ClickHouse(
712                    "RW don't support Decimal256".to_owned(),
713                ));
714            }
715
716            let scale = decimal_all
717                .last()
718                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
719                .parse::<u8>()
720                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;
721            (length, scale)
722        } else {
723            (0_u8, 0_u8)
724        };
725        Ok(ClickHouseSchemaFeature {
726            can_null,
727            accuracy_time,
728            accuracy_decimal,
729        })
730    }
731
732    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
733        if self.inserter.is_none() {
734            self.inserter = Some(self.client.insert_with_fields_name(
735                &self.config.common.table,
736                self.rw_fields_name_after_calibration.clone(),
737            )?);
738        }
739        for (op, row) in chunk.rows() {
740            let mut clickhouse_filed_vec = vec![];
741            for (index, data) in row.iter().enumerate() {
742                clickhouse_filed_vec.extend(ClickHouseFieldWithNull::from_scalar_ref(
743                    data,
744                    &self.column_correct_vec,
745                    index,
746                )?);
747            }
748            match op {
749                Op::Insert | Op::UpdateInsert => {
750                    if self.clickhouse_engine.is_collapsing_engine() {
751                        clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
752                            ClickHouseField::Int8(1),
753                        ));
754                    }
755                    if self.clickhouse_engine.is_delete_replacing_engine() {
756                        clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
757                            ClickHouseField::Int8(0),
758                        ))
759                    }
760                }
761                Op::Delete | Op::UpdateDelete => {
762                    if !self.clickhouse_engine.is_collapsing_engine()
763                        && !self.clickhouse_engine.is_delete_replacing_engine()
764                    {
765                        return Err(SinkError::ClickHouse(
766                            "Clickhouse engine don't support upsert".to_owned(),
767                        ));
768                    }
769                    if self.clickhouse_engine.is_collapsing_engine() {
770                        clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
771                            ClickHouseField::Int8(-1),
772                        ));
773                    }
774                    if self.clickhouse_engine.is_delete_replacing_engine() {
775                        clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
776                            ClickHouseField::Int8(1),
777                        ))
778                    }
779                }
780            }
781            let clickhouse_column = ClickHouseColumn {
782                row: clickhouse_filed_vec,
783            };
784            self.inserter
785                .as_mut()
786                .unwrap()
787                .write(&clickhouse_column)
788                .await?;
789        }
790        Ok(())
791    }
792}
793
794#[async_trait]
795impl SinkWriter for ClickHouseSinkWriter {
796    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
797        self.write(chunk).await
798    }
799
800    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
801        Ok(())
802    }
803
804    async fn abort(&mut self) -> Result<()> {
805        Ok(())
806    }
807
808    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
809        if is_checkpoint && let Some(inserter) = self.inserter.take() {
810            inserter.end().await?;
811        }
812        Ok(())
813    }
814}
815
/// One row of the column-metadata query (`QUERY_COLUMN`) issued by
/// `query_column_engine_from_ck`, describing a single column of the target ClickHouse table.
#[derive(ClickHouseRow, Deserialize, Clone)]
struct SystemColumn {
    /// Column name; compared against the engine's sign / delete column names to filter
    /// those columns out.
    name: String,
    /// ClickHouse type string of the column; it is scanned for fragments such as
    /// `Decimal(` to derive per-column conversion parameters.
    r#type: String,
    // NOTE(review): presumably 1 when the column is part of the table's primary key and 0
    // otherwise — confirm against the query in `QUERY_COLUMN`.
    is_in_primary_key: u8,
}
822
/// One row of the engine query (`QUERY_ENGINE`, which reads `system.tables`) issued by
/// `query_column_engine_from_ck`.
#[derive(ClickHouseRow, Deserialize)]
struct ClickhouseQueryEngine {
    /// Table name; fetched as part of the row but never read afterwards.
    #[expect(dead_code)]
    name: String,
    /// Engine of the table as reported by ClickHouse.
    engine: String,
    /// `CREATE TABLE` statement of the table as reported by ClickHouse.
    create_table_query: String,
}
830
831async fn query_column_engine_from_ck(
832    client: ClickHouseClient,
833    config: &ClickHouseConfig,
834) -> Result<(Vec<SystemColumn>, ClickHouseEngine)> {
835    let query_engine = QUERY_ENGINE;
836    let query_column = QUERY_COLUMN;
837
838    let clickhouse_engine = client
839        .query(query_engine)
840        .bind(config.common.database.clone())
841        .bind(config.common.table.clone())
842        .fetch_all::<ClickhouseQueryEngine>()
843        .await?;
844    let mut clickhouse_column = client
845        .query(query_column)
846        .bind(config.common.database.clone())
847        .bind(config.common.table.clone())
848        .bind("position")
849        .fetch_all::<SystemColumn>()
850        .await?;
851    if clickhouse_engine.is_empty() || clickhouse_column.is_empty() {
852        return Err(SinkError::ClickHouse(format!(
853            "table {:?}.{:?} is not find in clickhouse",
854            config.common.database, config.common.table
855        )));
856    }
857
858    let clickhouse_engine =
859        ClickHouseEngine::from_query_engine(clickhouse_engine.first().unwrap(), config)?;
860
861    if let Some(sign) = &clickhouse_engine.get_sign_name() {
862        clickhouse_column.retain(|a| sign.ne(&a.name))
863    }
864
865    if let Some(delete_col) = &clickhouse_engine.get_delete_col() {
866        clickhouse_column.retain(|a| delete_col.ne(&a.name))
867    }
868
869    Ok((clickhouse_column, clickhouse_engine))
870}
871
/// A single row to insert, held as an ordered list of already-converted field values.
///
/// The ClickHouse client interface expects a `struct`; the hand-written `Serialize`
/// implementation of this type imitates one by emitting each element of `row` as a struct
/// field.
#[derive(ClickHouseRow, Debug)]
struct ClickHouseColumn {
    /// Field values of the row, in the order they are serialized.
    row: Vec<ClickHouseFieldWithNull>,
}
877
/// Basic (non-null) values that can be serialized for the ClickHouse client interface.
#[derive(Debug)]
enum ClickHouseField {
    Int16(i16),
    /// 32-bit integer; also carries dates (the result of `get_nums_days_unix_epoch`).
    Int32(i32),
    /// 64-bit integer; also carries `Timestamptz` values scaled to the column's precision.
    Int64(i64),
    Serial(Serial),
    Float32(f32),
    Float64(f64),
    /// Text; also carries the string form of JSONB values.
    String(String),
    Bool(bool),
    /// A sequence of values; used for lists and for the per-field wrappers of structs.
    List(Vec<ClickHouseFieldWithNull>),
    /// 8-bit integer; used for the sign / delete marker columns appended by the writer.
    Int8(i8),
    Decimal(ClickHouseDecimal),
}
/// A decimal carried as an integer (the mantissa rescaled to the target column's scale).
///
/// The variant is picked from the target column's precision: at most 9 digits uses
/// `Decimal32`, at most 18 uses `Decimal64`, anything larger uses `Decimal128`.
#[derive(Debug)]
enum ClickHouseDecimal {
    Decimal32(i32),
    Decimal64(i64),
    Decimal128(i128),
}
899impl Serialize for ClickHouseDecimal {
900    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
901    where
902        S: serde::Serializer,
903    {
904        match self {
905            ClickHouseDecimal::Decimal32(v) => serializer.serialize_i32(*v),
906            ClickHouseDecimal::Decimal64(v) => serializer.serialize_i64(*v),
907            ClickHouseDecimal::Decimal128(v) => serializer.serialize_i128(*v),
908        }
909    }
910}
911
/// A field value together with the way its nullability is encoded on serialization.
#[derive(Debug)]
enum ClickHouseFieldWithNull {
    /// A present value for a nullable column; serialized through `serialize_some`.
    WithSome(ClickHouseField),
    /// A value for a non-nullable column; serialized directly, without an option wrapper.
    WithoutSome(ClickHouseField),
    /// A null value; serialized through `serialize_none`.
    None,
}
919
920impl ClickHouseFieldWithNull {
921    pub fn from_scalar_ref(
922        data: Option<ScalarRefImpl<'_>>,
923        clickhouse_schema_feature_vec: &Vec<ClickHouseSchemaFeature>,
924        clickhouse_schema_feature_index: usize,
925    ) -> Result<Vec<ClickHouseFieldWithNull>> {
926        let clickhouse_schema_feature = clickhouse_schema_feature_vec
927            .get(clickhouse_schema_feature_index)
928            .ok_or_else(|| SinkError::ClickHouse(format!("No column found from clickhouse table schema, index is {clickhouse_schema_feature_index}")))?;
929        if data.is_none() {
930            if !clickhouse_schema_feature.can_null {
931                return Err(SinkError::ClickHouse(
932                    "clickhouse column can not insert null".to_owned(),
933                ));
934            } else {
935                return Ok(vec![ClickHouseFieldWithNull::None]);
936            }
937        }
938        let data = match data.unwrap() {
939            ScalarRefImpl::Int16(v) => ClickHouseField::Int16(v),
940            ScalarRefImpl::Int32(v) => ClickHouseField::Int32(v),
941            ScalarRefImpl::Int64(v) => ClickHouseField::Int64(v),
942            ScalarRefImpl::Int256(_) => {
943                return Err(SinkError::ClickHouse(
944                    "clickhouse can not support Int256".to_owned(),
945                ));
946            }
947            ScalarRefImpl::Serial(v) => ClickHouseField::Serial(v),
948            ScalarRefImpl::Float32(v) => ClickHouseField::Float32(v.into_inner()),
949            ScalarRefImpl::Float64(v) => ClickHouseField::Float64(v.into_inner()),
950            ScalarRefImpl::Utf8(v) => ClickHouseField::String(v.to_owned()),
951            ScalarRefImpl::Bool(v) => ClickHouseField::Bool(v),
952            ScalarRefImpl::Decimal(d) => {
953                let d = if let Decimal::Normalized(d) = d {
954                    let scale =
955                        clickhouse_schema_feature.accuracy_decimal.1 as i32 - d.scale() as i32;
956                    if scale < 0 {
957                        d.mantissa() / 10_i128.pow(scale.unsigned_abs())
958                    } else {
959                        d.mantissa() * 10_i128.pow(scale as u32)
960                    }
961                } else if clickhouse_schema_feature.can_null {
962                    warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse null!");
963                    return Ok(vec![ClickHouseFieldWithNull::None]);
964                } else {
965                    warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse 0!");
966                    0_i128
967                };
968                if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
969                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(d as i32))
970                } else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
971                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(d as i64))
972                } else {
973                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(d))
974                }
975            }
976            ScalarRefImpl::Interval(_) => {
977                return Err(SinkError::ClickHouse(
978                    "clickhouse can not support Interval".to_owned(),
979                ));
980            }
981            ScalarRefImpl::Date(v) => {
982                let days = v.get_nums_days_unix_epoch();
983                ClickHouseField::Int32(days)
984            }
985            ScalarRefImpl::Time(_) => {
986                return Err(SinkError::ClickHouse(
987                    "clickhouse can not support Time".to_owned(),
988                ));
989            }
990            ScalarRefImpl::Timestamp(_) => {
991                return Err(SinkError::ClickHouse(
992                    "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
993                ));
994            }
995            ScalarRefImpl::Timestamptz(v) => {
996                let micros = v.timestamp_micros();
997                let ticks = match clickhouse_schema_feature.accuracy_time <= 6 {
998                    true => {
999                        micros / 10_i64.pow((6 - clickhouse_schema_feature.accuracy_time).into())
1000                    }
1001                    false => micros
1002                        .checked_mul(
1003                            10_i64.pow((clickhouse_schema_feature.accuracy_time - 6).into()),
1004                        )
1005                        .ok_or_else(|| SinkError::ClickHouse("DateTime64 overflow".to_owned()))?,
1006                };
1007                ClickHouseField::Int64(ticks)
1008            }
1009            ScalarRefImpl::Jsonb(v) => {
1010                let json_str = v.to_string();
1011                ClickHouseField::String(json_str)
1012            }
1013            ScalarRefImpl::Struct(v) => {
1014                let mut struct_vec = vec![];
1015                for (index, field) in v.iter_fields_ref().enumerate() {
1016                    let a = Self::from_scalar_ref(
1017                        field,
1018                        clickhouse_schema_feature_vec,
1019                        clickhouse_schema_feature_index + index,
1020                    )?;
1021                    struct_vec.push(ClickHouseFieldWithNull::WithoutSome(ClickHouseField::List(
1022                        a,
1023                    )));
1024                }
1025                return Ok(struct_vec);
1026            }
1027            ScalarRefImpl::List(v) => {
1028                let mut vec = vec![];
1029                for i in v.iter() {
1030                    vec.extend(Self::from_scalar_ref(
1031                        i,
1032                        clickhouse_schema_feature_vec,
1033                        clickhouse_schema_feature_index,
1034                    )?)
1035                }
1036                return Ok(vec![ClickHouseFieldWithNull::WithoutSome(
1037                    ClickHouseField::List(vec),
1038                )]);
1039            }
1040            ScalarRefImpl::Bytea(_) => {
1041                return Err(SinkError::ClickHouse(
1042                    "clickhouse can not support Bytea".to_owned(),
1043                ));
1044            }
1045            ScalarRefImpl::Map(_) => {
1046                return Err(SinkError::ClickHouse(
1047                    "clickhouse can not support Map".to_owned(),
1048                ));
1049            }
1050            ScalarRefImpl::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
1051        };
1052        let data = if clickhouse_schema_feature.can_null {
1053            vec![ClickHouseFieldWithNull::WithSome(data)]
1054        } else {
1055            vec![ClickHouseFieldWithNull::WithoutSome(data)]
1056        };
1057        Ok(data)
1058    }
1059}
1060
1061impl Serialize for ClickHouseField {
1062    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1063    where
1064        S: serde::Serializer,
1065    {
1066        match self {
1067            ClickHouseField::Int16(v) => serializer.serialize_i16(*v),
1068            ClickHouseField::Int32(v) => serializer.serialize_i32(*v),
1069            ClickHouseField::Int64(v) => serializer.serialize_i64(*v),
1070            ClickHouseField::Serial(v) => v.serialize(serializer),
1071            ClickHouseField::Float32(v) => serializer.serialize_f32(*v),
1072            ClickHouseField::Float64(v) => serializer.serialize_f64(*v),
1073            ClickHouseField::String(v) => serializer.serialize_str(v),
1074            ClickHouseField::Bool(v) => serializer.serialize_bool(*v),
1075            ClickHouseField::List(v) => {
1076                let mut s = serializer.serialize_seq(Some(v.len()))?;
1077                for i in v {
1078                    s.serialize_element(i)?;
1079                }
1080                s.end()
1081            }
1082            ClickHouseField::Decimal(v) => v.serialize(serializer),
1083            ClickHouseField::Int8(v) => serializer.serialize_i8(*v),
1084        }
1085    }
1086}
1087impl Serialize for ClickHouseFieldWithNull {
1088    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1089    where
1090        S: serde::Serializer,
1091    {
1092        match self {
1093            ClickHouseFieldWithNull::WithSome(v) => serializer.serialize_some(v),
1094            ClickHouseFieldWithNull::WithoutSome(v) => v.serialize(serializer),
1095            ClickHouseFieldWithNull::None => serializer.serialize_none(),
1096        }
1097    }
1098}
1099impl Serialize for ClickHouseColumn {
1100    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1101    where
1102        S: serde::Serializer,
1103    {
1104        let mut s = serializer.serialize_struct("useless", self.row.len())?;
1105        for data in &self.row {
1106            s.serialize_field("useless", &data)?
1107        }
1108        s.end()
1109    }
1110}
1111
1112/// 'Struct'(clickhouse type name is nested) will be converted into some arrays by clickhouse. So we
1113/// need to make some conversions
1114pub fn build_fields_name_type_from_schema(schema: &Schema) -> Result<Vec<(String, DataType)>> {
1115    let mut vec = vec![];
1116    for field in schema.fields() {
1117        if let DataType::Struct(st) = &field.data_type {
1118            for (name, data_type) in st.iter() {
1119                if matches!(data_type, DataType::Struct(_)) {
1120                    return Err(SinkError::ClickHouse(
1121                        "Only one level of nesting is supported for struct".to_owned(),
1122                    ));
1123                } else {
1124                    vec.push((
1125                        format!("{}.{}", field.name, name),
1126                        DataType::List(Box::new(data_type.clone())),
1127                    ))
1128                }
1129            }
1130        } else {
1131            vec.push((field.name.clone(), field.data_type()));
1132        }
1133    }
1134    Ok(vec)
1135}