risingwave_connector/sink/clickhouse.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::fmt::Debug;
use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap, HashSet};

use anyhow::anyhow;
use clickhouse::insert::Insert;
use clickhouse::{Client as ClickHouseClient, Row as ClickHouseRow};
use itertools::Itertools;
use phf::{Set, phf_set};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
use serde::ser::{SerializeSeq, SerializeStruct};
use serde::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::{
    DecoupleCheckpointLogSinkerOf, default_commit_checkpoint_interval,
};
use super::writer::SinkWriter;
use super::{SinkWriterMetrics, SinkWriterParam};
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink, SinkError, SinkParam,
};

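// Queries against ClickHouse's `system.tables` / `system.columns` used during validation.
// `?fields` expands to the fetched row's field names and `?` binds the query parameters,
// as supported by the `clickhouse` crate's query builder.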
const QUERY_ENGINE: &str =
    "select distinct ?fields from system.tables where database = ? and name = ?";
const QUERY_COLUMN: &str =
    "select distinct ?fields from system.columns where database = ? and table = ? order by ?";
pub const CLICKHOUSE_SINK: &str = "clickhouse";

const ALLOW_EXPERIMENTAL_JSON_TYPE: &str = "allow_experimental_json_type";
const INPUT_FORMAT_BINARY_READ_JSON_AS_STRING: &str = "input_format_binary_read_json_as_string";
const OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING: &str = "output_format_binary_write_json_as_string";

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ClickHouseCommon {
    #[serde(rename = "clickhouse.url")]
    pub url: String,
    #[serde(rename = "clickhouse.user")]
    pub user: String,
    #[serde(rename = "clickhouse.password")]
    pub password: String,
    #[serde(rename = "clickhouse.database")]
    pub database: String,
    #[serde(rename = "clickhouse.table")]
    pub table: String,
    #[serde(rename = "clickhouse.delete.column")]
    pub delete_column: Option<String>,
    /// Commit every n (> 0) checkpoints; default is 10.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for ClickHouseCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "clickhouse.password", "clickhouse.user"
    };
}

#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum ClickHouseEngine {
    MergeTree,
    ReplacingMergeTree(Option<String>),
    SummingMergeTree,
    AggregatingMergeTree,
    CollapsingMergeTree(String),
    VersionedCollapsingMergeTree(String),
    GraphiteMergeTree,
    ReplicatedMergeTree,
    ReplicatedReplacingMergeTree(Option<String>),
    ReplicatedSummingMergeTree,
    ReplicatedAggregatingMergeTree,
    ReplicatedCollapsingMergeTree(String),
    ReplicatedVersionedCollapsingMergeTree(String),
    ReplicatedGraphiteMergeTree,
    SharedMergeTree,
    SharedReplacingMergeTree(Option<String>),
    SharedSummingMergeTree,
    SharedAggregatingMergeTree,
    SharedCollapsingMergeTree(String),
    SharedVersionedCollapsingMergeTree(String),
    SharedGraphiteMergeTree,
    Null,
}

impl ClickHouseEngine {
    pub fn is_collapsing_engine(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::CollapsingMergeTree(_)
                | ClickHouseEngine::VersionedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
        )
    }

    pub fn is_delete_replacing_engine(&self) -> bool {
        match self {
            ClickHouseEngine::ReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::ReplicatedReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::SharedReplacingMergeTree(delete_col) => delete_col.is_some(),
            _ => false,
        }
    }

    pub fn get_delete_col(&self) -> Option<String> {
        match self {
            ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.clone()),
            ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.clone())
            }
            ClickHouseEngine::SharedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.clone())
            }
            _ => None,
        }
    }

    pub fn get_sign_name(&self) -> Option<String> {
        match self {
            ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::VersionedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.clone())
            }
            ClickHouseEngine::SharedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::SharedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.clone())
            }
            _ => None,
        }
    }

    pub fn is_shared_tree(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::SharedMergeTree
                | ClickHouseEngine::SharedReplacingMergeTree(_)
                | ClickHouseEngine::SharedSummingMergeTree
                | ClickHouseEngine::SharedAggregatingMergeTree
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedGraphiteMergeTree
        )
    }

    pub fn from_query_engine(
        engine_name: &ClickhouseQueryEngine,
        config: &ClickHouseConfig,
    ) -> Result<Self> {
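        // ClickHouse does not expose engine parameters (e.g. the sign column of the
        // Collapsing engines) as separate fields, so they are recovered below by
        // string-parsing `create_table_query`.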
        match engine_name.engine.as_str() {
            "MergeTree" => Ok(ClickHouseEngine::MergeTree),
            "Null" => Ok(ClickHouseEngine::Null),
            "ReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplacingMergeTree(delete_column))
            }
            "SummingMergeTree" => Ok(ClickHouseEngine::SummingMergeTree),
            "AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree),
            // VersionedCollapsingMergeTree(sign_name,"a")
            "VersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("VersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name))
            }
            // CollapsingMergeTree(sign_name)
            "CollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("CollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::CollapsingMergeTree(sign_name))
            }
            "GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree),
            "ReplicatedMergeTree" => Ok(ClickHouseEngine::ReplicatedMergeTree),
            "ReplicatedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplicatedReplacingMergeTree(
                    delete_column,
                ))
            }
            "ReplicatedSummingMergeTree" => Ok(ClickHouseEngine::ReplicatedSummingMergeTree),
            "ReplicatedAggregatingMergeTree" => {
                Ok(ClickHouseEngine::ReplicatedAggregatingMergeTree)
            }
            // ReplicatedVersionedCollapsingMergeTree("a","b",sign_name,"c")
            "ReplicatedVersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            // ReplicatedCollapsingMergeTree("a","b",sign_name)
            "ReplicatedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name))
            }
            "ReplicatedGraphiteMergeTree" => Ok(ClickHouseEngine::ReplicatedGraphiteMergeTree),
            "SharedMergeTree" => Ok(ClickHouseEngine::SharedMergeTree),
            "SharedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::SharedReplacingMergeTree(delete_column))
            }
            "SharedSummingMergeTree" => Ok(ClickHouseEngine::SharedSummingMergeTree),
            "SharedAggregatingMergeTree" => Ok(ClickHouseEngine::SharedAggregatingMergeTree),
            // SharedVersionedCollapsingMergeTree("a","b",sign_name,"c")
            "SharedVersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            // SharedCollapsingMergeTree("a","b",sign_name)
            "SharedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedCollapsingMergeTree(sign_name))
            }
            "SharedGraphiteMergeTree" => Ok(ClickHouseEngine::SharedGraphiteMergeTree),
            _ => Err(SinkError::ClickHouse(format!(
                "Cannot find clickhouse engine {:?}",
                engine_name.engine
            ))),
        }
    }
}

impl ClickHouseCommon {
    pub(crate) fn build_client(&self) -> ConnectorResult<ClickHouseClient> {
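        // The JSON-related options below allow JSONB values to be sent as strings and
        // stored into ClickHouse JSON columns over the RowBinary protocol.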
        let client = ClickHouseClient::default() // hyper(0.14) client inside
            .with_url(&self.url)
            .with_user(&self.user)
            .with_password(&self.password)
            .with_database(&self.database)
            .with_option(ALLOW_EXPERIMENTAL_JSON_TYPE, "1")
            .with_option(INPUT_FORMAT_BINARY_READ_JSON_AS_STRING, "1")
            .with_option(OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING, "1");
        Ok(client)
    }
}

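/// Configuration for the ClickHouse sink, taken from the `WITH` options of `CREATE SINK`.
///
/// An illustrative sketch (the sink, upstream and table names are placeholders; adjust the
/// connection options to your deployment):
///
/// ```sql
/// CREATE SINK clickhouse_sink FROM mv
/// WITH (
///     connector = 'clickhouse',
///     type = 'append-only',
///     clickhouse.url = 'http://127.0.0.1:8123',
///     clickhouse.user = 'default',
///     clickhouse.password = '',
///     clickhouse.database = 'demo',
///     clickhouse.table = 'demo_table'
/// );
/// ```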
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct ClickHouseConfig {
    #[serde(flatten)]
    pub common: ClickHouseCommon,

    pub r#type: String, // accept "append-only" or "upsert"
}

impl EnforceSecret for ClickHouseConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        ClickHouseCommon::enforce_one(prop)
    }

    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            ClickHouseCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

#[derive(Clone, Debug)]
pub struct ClickHouseSink {
    pub config: ClickHouseConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl EnforceSecret for ClickHouseSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            ClickHouseConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl ClickHouseConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<ClickHouseConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}

impl TryFrom<SinkParam> for ClickHouseSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let pk_indices = param.downstream_pk_or_empty();
        let config = ClickHouseConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices,
            is_append_only: param.sink_type.is_append_only(),
        })
    }
}

impl ClickHouseSink {
    /// Check that the column names and types of RisingWave and ClickHouse are compatible.
    fn check_column_name_and_type(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
        let rw_fields_name = build_fields_name_type_from_schema(&self.schema)?;
        let clickhouse_columns_desc: HashMap<String, SystemColumn> = clickhouse_columns_desc
            .iter()
            .map(|s| (s.name.clone(), s.clone()))
            .collect();

        if rw_fields_name.len() > clickhouse_columns_desc.len() {
            return Err(SinkError::ClickHouse("The columns of the sink must be a subset of the target ClickHouse table's columns.".to_owned()));
        }

        for i in rw_fields_name {
            let value = clickhouse_columns_desc.get(&i.0).ok_or_else(|| {
                SinkError::ClickHouse(format!(
                    "Column {:?} not found in the ClickHouse table",
                    i.0
                ))
            })?;

            Self::check_and_correct_column_type(&i.1, value)?;
        }
        Ok(())
    }

    /// Check that the primary keys of RisingWave and ClickHouse match.
    fn check_pk_match(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
        let mut clickhouse_pks: HashSet<String> = clickhouse_columns_desc
            .iter()
            .filter(|s| s.is_in_primary_key == 1)
            .map(|s| s.name.clone())
            .collect();

        for (_, field) in self
            .schema
            .fields()
            .iter()
            .enumerate()
            .filter(|(index, _)| self.pk_indices.contains(index))
        {
            if !clickhouse_pks.remove(&field.name) {
                return Err(SinkError::ClickHouse(
                    "The primary keys of ClickHouse and RisingWave do not match".to_owned(),
                ));
            }
        }

        if !clickhouse_pks.is_empty() {
            return Err(SinkError::ClickHouse(
                "The primary keys of ClickHouse and RisingWave do not match".to_owned(),
            ));
        }
        Ok(())
    }

    /// Check that the column types of RisingWave and ClickHouse are compatible.
    fn check_and_correct_column_type(
        fields_type: &DataType,
        ck_column: &SystemColumn,
    ) -> Result<()> {
        // FIXME: the "contains" based implementation is wrong
        let is_match = match fields_type {
            risingwave_common::types::DataType::Boolean => Ok(ck_column.r#type.contains("Bool")),
            risingwave_common::types::DataType::Int16 => Ok(ck_column.r#type.contains("UInt16")
                | ck_column.r#type.contains("Int16")
                // Allow Int16 to be pushed to Enum16, they share an encoding and value range
                // No special care is taken to ensure values are valid.
                | ck_column.r#type.contains("Enum16")),
            risingwave_common::types::DataType::Int32 => {
                Ok(ck_column.r#type.contains("UInt32") | ck_column.r#type.contains("Int32"))
            }
            risingwave_common::types::DataType::Int64 => {
                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
            }
            risingwave_common::types::DataType::Float32 => Ok(ck_column.r#type.contains("Float32")),
            risingwave_common::types::DataType::Float64 => Ok(ck_column.r#type.contains("Float64")),
            risingwave_common::types::DataType::Decimal => Ok(ck_column.r#type.contains("Decimal")),
            risingwave_common::types::DataType::Date => Ok(ck_column.r#type.contains("Date32")),
            risingwave_common::types::DataType::Varchar => Ok(ck_column.r#type.contains("String")),
            risingwave_common::types::DataType::Time => Err(SinkError::ClickHouse(
                "TIME is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
            )),
            risingwave_common::types::DataType::Timestamp => Err(SinkError::ClickHouse(
                "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
            )),
            risingwave_common::types::DataType::Timestamptz => {
                Ok(ck_column.r#type.contains("DateTime64"))
            }
            risingwave_common::types::DataType::Interval => Err(SinkError::ClickHouse(
                "INTERVAL is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
            )),
            risingwave_common::types::DataType::Struct(_) => Err(SinkError::ClickHouse(
                "struct needs to be converted into a list".to_owned(),
            )),
            risingwave_common::types::DataType::List(list) => {
                Self::check_and_correct_column_type(list.elem(), ck_column)?;
                Ok(ck_column.r#type.contains("Array"))
            }
            risingwave_common::types::DataType::Bytea => Err(SinkError::ClickHouse(
                "BYTEA is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
            )),
            risingwave_common::types::DataType::Jsonb => Ok(ck_column.r#type.contains("JSON")),
            risingwave_common::types::DataType::Serial => {
                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
            }
            risingwave_common::types::DataType::Int256 => Err(SinkError::ClickHouse(
                "INT256 is not supported for ClickHouse sink.".to_owned(),
            )),
            risingwave_common::types::DataType::Map(_) => Err(SinkError::ClickHouse(
                "MAP is not supported for ClickHouse sink.".to_owned(),
            )),
            DataType::Vector(_) => Err(SinkError::ClickHouse(
                "VECTOR is not supported for ClickHouse sink.".to_owned(),
            )),
        };
        if !is_match? {
            return Err(SinkError::ClickHouse(format!(
                "Column type mismatch for column {:?}: RisingWave type is {:?}, ClickHouse type is {:?}",
                ck_column.name, fields_type, ck_column.r#type
            )));
        }

        Ok(())
    }
}

impl Sink for ClickHouseSink {
    type LogSinker = DecoupleCheckpointLogSinkerOf<ClickHouseSinkWriter>;

    const SINK_NAME: &'static str = CLICKHOUSE_SINK;

    async fn validate(&self) -> Result<()> {
        // For an upsert clickhouse sink, the primary key must be defined.
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert clickhouse sink (please define in `primary_key` field)"
            )));
        }

        // check reachability
        let client = self.config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client, &self.config).await?;
        if clickhouse_engine.is_shared_tree() {
            risingwave_common::license::Feature::ClickHouseSharedEngine
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        if !self.is_append_only
            && !clickhouse_engine.is_collapsing_engine()
            && !clickhouse_engine.is_delete_replacing_engine()
        {
            return match clickhouse_engine {
                ClickHouseEngine::ReplacingMergeTree(None)
                | ClickHouseEngine::ReplicatedReplacingMergeTree(None)
                | ClickHouseEngine::SharedReplacingMergeTree(None) => {
                    Err(SinkError::ClickHouse("To enable upsert with a `ReplacingMergeTree`, you must set `clickhouse.delete.column` to the UInt8 column in ClickHouse used to signify deletes. See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree#is_deleted for more information".to_owned()))
                }
                _ => Err(SinkError::ClickHouse("If you want to use upsert, please use `VersionedCollapsingMergeTree`, `CollapsingMergeTree` or `ReplacingMergeTree` in ClickHouse".to_owned())),
            };
        }

        self.check_column_name_and_type(&clickhouse_column)?;
        if !self.is_append_only {
            self.check_pk_match(&clickhouse_column)?;
        }

        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        ClickHouseConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
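        // Writes are committed to ClickHouse once every `commit_checkpoint_interval`
        // checkpoints (checkpoint-decoupled sink); see `DecoupleCheckpointLogSinkerOf`.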
        let writer = ClickHouseSinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        Ok(DecoupleCheckpointLogSinkerOf::new(
            writer,
            SinkWriterMetrics::new(&writer_param),
            commit_checkpoint_interval,
        ))
    }
}

pub struct ClickHouseSinkWriter {
    pub config: ClickHouseConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: ClickHouseClient,
    #[expect(dead_code)]
    is_append_only: bool,
    // Save some features of the clickhouse column types
    column_correct_vec: Vec<ClickHouseSchemaFeature>,
    rw_fields_name_after_calibration: Vec<String>,
    clickhouse_engine: ClickHouseEngine,
    inserter: Option<Insert<ClickHouseColumn>>,
}

#[derive(Debug)]
struct ClickHouseSchemaFeature {
    can_null: bool,
    // `DateTime64` precision of the ClickHouse column, used for RW <-> CK time conversions
    accuracy_time: u8,

    accuracy_decimal: (u8, u8),
}

impl ClickHouseSinkWriter {
    pub async fn new(
        config: ClickHouseConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        let client = config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client.clone(), &config).await?;

        let column_correct_vec: Result<Vec<ClickHouseSchemaFeature>> = clickhouse_column
            .iter()
            .map(Self::build_column_correct_vec)
            .collect();
        let mut rw_fields_name_after_calibration = build_fields_name_type_from_schema(&schema)?
            .iter()
            .map(|(a, _)| a.clone())
            .collect_vec();

        if let Some(sign) = clickhouse_engine.get_sign_name() {
            rw_fields_name_after_calibration.push(sign);
        }
        if let Some(delete_col) = clickhouse_engine.get_delete_col() {
            rw_fields_name_after_calibration.push(delete_col);
        }
        Ok(Self {
            config,
            schema,
            pk_indices,
            client,
            is_append_only,
            column_correct_vec: column_correct_vec?,
            rw_fields_name_after_calibration,
            clickhouse_engine,
            inserter: None,
        })
    }

    /// Record whether the ClickHouse column is `Nullable`, along with the precision of
    /// `DateTime64` and `Decimal` columns, and save it in `column_correct_vec`.
    fn build_column_correct_vec(ck_column: &SystemColumn) -> Result<ClickHouseSchemaFeature> {
        let can_null = ck_column.r#type.contains("Nullable");
        // `DateTime64` without an explicit precision is already displayed as `DateTime64(3)` in `system.columns`.
        let accuracy_time = if ck_column.r#type.contains("DateTime64(") {
            ck_column
                .r#type
                .split("DateTime64(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(',')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?
        } else {
            0_u8
        };
        let accuracy_decimal = if ck_column.r#type.contains("Decimal(") {
            let decimal_all = ck_column
                .r#type
                .split("Decimal(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(", ")
                .collect_vec();
            let length = decimal_all
                .first()
                .ok_or_else(|| SinkError::ClickHouse("must have first".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;

            if length > 38 {
                return Err(SinkError::ClickHouse(
                    "RisingWave does not support Decimal256".to_owned(),
                ));
            }

            let scale = decimal_all
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;
            (length, scale)
        } else {
            (0_u8, 0_u8)
        };
        Ok(ClickHouseSchemaFeature {
            can_null,
            accuracy_time,
            accuracy_decimal,
        })
    }

    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.inserter.is_none() {
            self.inserter = Some(self.client.insert_with_fields_name(
                &self.config.common.table,
                self.rw_fields_name_after_calibration.clone(),
            )?);
        }
        for (op, row) in chunk.rows() {
            let mut clickhouse_field_vec = vec![];
            for (index, data) in row.iter().enumerate() {
                clickhouse_field_vec.extend(ClickHouseFieldWithNull::from_scalar_ref(
                    data,
                    &self.column_correct_vec,
                    index,
                )?);
            }
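            // Collapsing engines take an extra sign column (+1 for insert, -1 for delete);
            // ReplacingMergeTree with `clickhouse.delete.column` takes an extra is_deleted
            // column (0 for insert, 1 for delete). The sink fills these columns in itself.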
            match op {
                Op::Insert | Op::UpdateInsert => {
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(0),
                        ))
                    }
                }
                Op::Delete | Op::UpdateDelete => {
                    if !self.clickhouse_engine.is_collapsing_engine()
                        && !self.clickhouse_engine.is_delete_replacing_engine()
                    {
                        return Err(SinkError::ClickHouse(
                            "This ClickHouse engine does not support upsert".to_owned(),
                        ));
                    }
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(-1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ))
                    }
                }
            }
            let clickhouse_column = ClickHouseColumn {
                row: clickhouse_field_vec,
            };
            self.inserter
                .as_mut()
                .unwrap()
                .write(&clickhouse_column)
                .await?;
        }
        Ok(())
    }
}

#[async_trait]
impl SinkWriter for ClickHouseSinkWriter {
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
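        // Only finalize the in-flight insert on checkpoint barriers, so data becomes
        // visible in ClickHouse per committed checkpoint rather than per chunk.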
        if is_checkpoint && let Some(inserter) = self.inserter.take() {
            inserter.end().await?;
        }
        Ok(())
    }
}

#[derive(ClickHouseRow, Deserialize, Clone)]
struct SystemColumn {
    name: String,
    r#type: String,
    is_in_primary_key: u8,
}

#[derive(ClickHouseRow, Deserialize)]
struct ClickhouseQueryEngine {
    #[expect(dead_code)]
    name: String,
    engine: String,
    create_table_query: String,
}

async fn query_column_engine_from_ck(
    client: ClickHouseClient,
    config: &ClickHouseConfig,
) -> Result<(Vec<SystemColumn>, ClickHouseEngine)> {
    let query_engine = QUERY_ENGINE;
    let query_column = QUERY_COLUMN;

    let clickhouse_engine = client
        .query(query_engine)
        .bind(config.common.database.clone())
        .bind(config.common.table.clone())
        .fetch_all::<ClickhouseQueryEngine>()
        .await?;
    let mut clickhouse_column = client
        .query(query_column)
        .bind(config.common.database.clone())
        .bind(config.common.table.clone())
        .bind("position")
        .fetch_all::<SystemColumn>()
        .await?;
    if clickhouse_engine.is_empty() || clickhouse_column.is_empty() {
        return Err(SinkError::ClickHouse(format!(
            "table {:?}.{:?} not found in ClickHouse",
            config.common.database, config.common.table
        )));
    }

    let clickhouse_engine =
        ClickHouseEngine::from_query_engine(clickhouse_engine.first().unwrap(), config)?;

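    // The sign / is_deleted columns are populated by the sink itself, so exclude them
    // from the columns expected to come from the upstream schema.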
    if let Some(sign) = &clickhouse_engine.get_sign_name() {
        clickhouse_column.retain(|a| sign.ne(&a.name))
    }

    if let Some(delete_col) = &clickhouse_engine.get_delete_col() {
        clickhouse_column.retain(|a| delete_col.ne(&a.name))
    }

    Ok((clickhouse_column, clickhouse_engine))
}

/// Serialized as an anonymous struct so that rows match the format expected by the
/// `clickhouse` crate's insert interface.
#[derive(ClickHouseRow, Debug)]
struct ClickHouseColumn {
    row: Vec<ClickHouseFieldWithNull>,
}

/// Basic data types for use with the clickhouse interface
#[derive(Debug)]
enum ClickHouseField {
    Int16(i16),
    Int32(i32),
    Int64(i64),
    Serial(Serial),
    Float32(f32),
    Float64(f64),
    String(String),
    Bool(bool),
    List(Vec<ClickHouseFieldWithNull>),
    Int8(i8),
    Decimal(ClickHouseDecimal),
}

#[derive(Debug)]
enum ClickHouseDecimal {
    Decimal32(i32),
    Decimal64(i64),
    Decimal128(i128),
}

impl Serialize for ClickHouseDecimal {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseDecimal::Decimal32(v) => serializer.serialize_i32(*v),
            ClickHouseDecimal::Decimal64(v) => serializer.serialize_i64(*v),
            ClickHouseDecimal::Decimal128(v) => serializer.serialize_i128(*v),
        }
    }
}

/// Wrapper that supports ClickHouse `Nullable` columns: `WithSome` serializes the value via
/// `serialize_some` (for `Nullable(T)` columns), `WithoutSome` serializes it directly, and
/// `None` serializes a ClickHouse NULL.
#[derive(Debug)]
enum ClickHouseFieldWithNull {
    WithSome(ClickHouseField),
    WithoutSome(ClickHouseField),
    None,
}

impl ClickHouseFieldWithNull {
    pub fn from_scalar_ref(
        data: Option<ScalarRefImpl<'_>>,
        clickhouse_schema_feature_vec: &Vec<ClickHouseSchemaFeature>,
        clickhouse_schema_feature_index: usize,
    ) -> Result<Vec<ClickHouseFieldWithNull>> {
        let clickhouse_schema_feature = clickhouse_schema_feature_vec
            .get(clickhouse_schema_feature_index)
            .ok_or_else(|| SinkError::ClickHouse(format!("No column found in the ClickHouse table schema, index is {clickhouse_schema_feature_index}")))?;
        if data.is_none() {
            if !clickhouse_schema_feature.can_null {
                return Err(SinkError::ClickHouse(
                    "Cannot insert null value into non-nullable ClickHouse column".to_owned(),
                ));
            } else {
                return Ok(vec![ClickHouseFieldWithNull::None]);
            }
        }
        let data = match data.unwrap() {
            ScalarRefImpl::Int16(v) => ClickHouseField::Int16(v),
            ScalarRefImpl::Int32(v) => ClickHouseField::Int32(v),
            ScalarRefImpl::Int64(v) => ClickHouseField::Int64(v),
            ScalarRefImpl::Int256(_) => {
                return Err(SinkError::ClickHouse(
                    "INT256 is not supported for ClickHouse sink.".to_owned(),
                ));
            }
            ScalarRefImpl::Serial(v) => ClickHouseField::Serial(v),
            ScalarRefImpl::Float32(v) => ClickHouseField::Float32(v.into_inner()),
            ScalarRefImpl::Float64(v) => ClickHouseField::Float64(v.into_inner()),
            ScalarRefImpl::Utf8(v) => ClickHouseField::String(v.to_owned()),
            ScalarRefImpl::Bool(v) => ClickHouseField::Bool(v),
            ScalarRefImpl::Decimal(d) => {
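                // Rescale the decimal mantissa to the scale declared by the ClickHouse
                // Decimal(p, s) column before choosing the 32/64/128-bit representation.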
                let d = if let Decimal::Normalized(d) = d {
                    let scale =
                        clickhouse_schema_feature.accuracy_decimal.1 as i32 - d.scale() as i32;
                    if scale < 0 {
                        d.mantissa() / 10_i128.pow(scale.unsigned_abs())
                    } else {
                        d.mantissa() * 10_i128.pow(scale as u32)
                    }
                } else if clickhouse_schema_feature.can_null {
                    warn!("Inf, -Inf, or NaN RisingWave decimals are converted to ClickHouse NULL!");
                    return Ok(vec![ClickHouseFieldWithNull::None]);
                } else {
                    warn!("Inf, -Inf, or NaN RisingWave decimals are converted to ClickHouse 0!");
                    0_i128
                };
                if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(d as i32))
                } else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(d as i64))
                } else {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(d))
                }
            }
            ScalarRefImpl::Interval(_) => {
                return Err(SinkError::ClickHouse(
                    "INTERVAL is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
                ));
            }
            ScalarRefImpl::Date(v) => {
                let days = v.get_nums_days_unix_epoch();
                ClickHouseField::Int32(days)
            }
            ScalarRefImpl::Time(_) => {
                return Err(SinkError::ClickHouse(
                    "TIME is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
                ));
            }
            ScalarRefImpl::Timestamp(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
                ));
            }
            ScalarRefImpl::Timestamptz(v) => {
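                // DateTime64(p) stores ticks of 10^-p seconds; convert from microseconds
                // to the column's declared precision.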
                let micros = v.timestamp_micros();
                let ticks = match clickhouse_schema_feature.accuracy_time <= 6 {
                    true => {
                        micros / 10_i64.pow((6 - clickhouse_schema_feature.accuracy_time).into())
                    }
                    false => micros
                        .checked_mul(
                            10_i64.pow((clickhouse_schema_feature.accuracy_time - 6).into()),
                        )
                        .ok_or_else(|| SinkError::ClickHouse("DateTime64 overflow".to_owned()))?,
                };
                ClickHouseField::Int64(ticks)
            }
            ScalarRefImpl::Jsonb(v) => {
                let json_str = v.to_string();
                ClickHouseField::String(json_str)
            }
            ScalarRefImpl::Struct(v) => {
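                // A struct maps to ClickHouse Nested sub-columns: each field is emitted as its
                // own array value, using consecutive entries of `clickhouse_schema_feature_vec`.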
                let mut struct_vec = vec![];
                for (index, field) in v.iter_fields_ref().enumerate() {
                    let a = Self::from_scalar_ref(
                        field,
                        clickhouse_schema_feature_vec,
                        clickhouse_schema_feature_index + index,
                    )?;
                    struct_vec.push(ClickHouseFieldWithNull::WithoutSome(ClickHouseField::List(
                        a,
                    )));
                }
                return Ok(struct_vec);
            }
            ScalarRefImpl::List(v) => {
                let mut vec = vec![];
                for i in v.iter() {
                    vec.extend(Self::from_scalar_ref(
                        i,
                        clickhouse_schema_feature_vec,
                        clickhouse_schema_feature_index,
                    )?)
                }
                return Ok(vec![ClickHouseFieldWithNull::WithoutSome(
                    ClickHouseField::List(vec),
                )]);
            }
            ScalarRefImpl::Bytea(_) => {
                return Err(SinkError::ClickHouse(
                    "BYTEA is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
                ));
            }
            ScalarRefImpl::Map(_) => {
                return Err(SinkError::ClickHouse(
                    "MAP is not supported for ClickHouse sink.".to_owned(),
                ));
            }
            ScalarRefImpl::Vector(_) => {
                return Err(SinkError::ClickHouse(
                    "VECTOR is not supported for ClickHouse sink.".to_owned(),
                ));
            }
        };
        let data = if clickhouse_schema_feature.can_null {
            vec![ClickHouseFieldWithNull::WithSome(data)]
        } else {
            vec![ClickHouseFieldWithNull::WithoutSome(data)]
        };
        Ok(data)
    }
}

impl Serialize for ClickHouseField {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseField::Int16(v) => serializer.serialize_i16(*v),
            ClickHouseField::Int32(v) => serializer.serialize_i32(*v),
            ClickHouseField::Int64(v) => serializer.serialize_i64(*v),
            ClickHouseField::Serial(v) => v.serialize(serializer),
            ClickHouseField::Float32(v) => serializer.serialize_f32(*v),
            ClickHouseField::Float64(v) => serializer.serialize_f64(*v),
            ClickHouseField::String(v) => serializer.serialize_str(v),
            ClickHouseField::Bool(v) => serializer.serialize_bool(*v),
            ClickHouseField::List(v) => {
                let mut s = serializer.serialize_seq(Some(v.len()))?;
                for i in v {
                    s.serialize_element(i)?;
                }
                s.end()
            }
            ClickHouseField::Decimal(v) => v.serialize(serializer),
            ClickHouseField::Int8(v) => serializer.serialize_i8(*v),
        }
    }
}

impl Serialize for ClickHouseFieldWithNull {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseFieldWithNull::WithSome(v) => serializer.serialize_some(v),
            ClickHouseFieldWithNull::WithoutSome(v) => v.serialize(serializer),
            ClickHouseFieldWithNull::None => serializer.serialize_none(),
        }
    }
}

impl Serialize for ClickHouseColumn {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut s = serializer.serialize_struct("useless", self.row.len())?;
        for data in &self.row {
            s.serialize_field("useless", &data)?
        }
        s.end()
    }
}

/// A RisingWave `Struct` maps to ClickHouse's `Nested` type, which ClickHouse represents as a
/// set of arrays (`field.subfield` columns), so the schema needs to be flattened accordingly.
pub fn build_fields_name_type_from_schema(schema: &Schema) -> Result<Vec<(String, DataType)>> {
    let mut vec = vec![];
    for field in schema.fields() {
        if let DataType::Struct(st) = &field.data_type {
            for (name, data_type) in st.iter() {
                if matches!(data_type, DataType::Struct(_)) {
                    return Err(SinkError::ClickHouse(
                        "Only one level of nesting is supported for struct".to_owned(),
                    ));
                } else {
                    vec.push((
                        format!("{}.{}", field.name, name),
                        DataType::list(data_type.clone()),
                    ))
                }
            }
        } else {
            vec.push((field.name.clone(), field.data_type()));
        }
    }
    Ok(vec)
}